• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5060

17 May 2026 01:15AM UTC coverage: 73.425% (-0.02%) from 73.443%
#5060

push

travis-ci

web-flow
feat (TDgpt): Dynamic Model Synchronization Enhancements (#35344)

* refactor: do some internal refactor.

* fix: fix multiprocess sync issue.

* feat: add dynamic anomaly detection and forecasting services

* fix: log error message for undeploying model in exception handling

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* fix: handle undeploy when model exists only on disk

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/286aafa0-c3ce-4c27-b803-2707571e9dc1

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: guard dynamic registry concurrent access

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/5e4db858-6458-40f4-ac28-d1b1b7f97c18

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: tighten service list locking scope

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/5e4db858-6458-40f4-ac28-d1b1b7f97c18

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: restore prophet support and update tests per review feedback

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/92298ae1-7da6-4d07-b20e-101c7cd0b26b

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: improve test name and move copy inside lock scope

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/92298ae1-7da6-4d07-b20e-101c7cd0b26b

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* Potential fix for pull request finding

Co-au... (continued)

281800 of 383795 relevant lines covered (73.42%)

134332207.97 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.05
/source/dnode/vnode/src/tsdb/tsdbCache.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#include "functionMgt.h"
16
#include "tsdb.h"
17
#include "tsdbDataFileRW.h"
18
#include "tsdbIter.h"
19
#include "tsdbReadUtil.h"
20
#include "tss.h"
21
#include "vnd.h"
22

23
#define ROCKS_BATCH_SIZE (4096)
24

25
/*
 * Release one handle of an LRU cache.
 *
 * Forwards to taosLRUCacheRelease() and emits a trace line when that call
 * reports false.
 *
 * cache          - cache that owns the handle
 * handle         - handle to release
 * eraseIfLastRef - passed through unchanged to taosLRUCacheRelease()
 */
void tsdbLRUCacheRelease(SLRUCache *cache, LRUHandle *handle, bool eraseIfLastRef) {
  bool released = taosLRUCacheRelease(cache, handle, eraseIfLastRef);
  if (released) {
    return;
  }

  tsdbTrace(" release lru cache failed");
}
71,541,677✔
30

31
#ifdef USE_SHARED_STORAGE
32

33
/*
 * Create the block LRU cache of a tsdb instance.
 *
 * The capacity is tsSsBlockCacheSize * block size * page size, where the block
 * size is tsSsBlockSize clamped to a minimum of 1024. On success the cache is
 * stored in pTsdb->bCache and pTsdb->bMutex is initialised.
 *
 * Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY when the cache cannot be created.
 */
static int32_t tsdbOpenBCache(STsdb *pTsdb) {
  int32_t code = 0, lino = 0;

  const int32_t pageSize = pTsdb->pVnode->config.tsdbPageSize;
  int64_t       blockSize = tsSsBlockSize;
  if (blockSize <= 1024) {
    blockSize = 1024;
  }
  const int64_t capacity = (int64_t)tsSsBlockCacheSize * blockSize * pageSize;

  SLRUCache *pNewCache = taosLRUCacheInit(capacity, 0, .5);
  if (NULL == pNewCache) {
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err);
  }

  taosLRUCacheSetStrictCapacity(pNewCache, false);
  (void)taosThreadMutexInit(&pTsdb->bMutex, NULL);
  pTsdb->bCache = pNewCache;

_err:
  if (code != 0) {
    tsdbError("tsdb/bcache: vgId:%d, %s failed at line %d since %s.", TD_VID(pTsdb->pVnode), __func__, lino,
              tstrerror(code));
  }

  TAOS_RETURN(code);
}
57

58
/*
 * Tear down the block LRU cache created by tsdbOpenBCache().
 *
 * Does nothing when pTsdb->bCache is NULL. Otherwise traces the element count
 * before and after erasing unreferenced entries, cleans the cache up and
 * destroys pTsdb->bMutex.
 */
static void tsdbCloseBCache(STsdb *pTsdb) {
  SLRUCache *pBlockCache = pTsdb->bCache;
  if (NULL == pBlockCache) {
    return;
  }

  int32_t nElems = taosLRUCacheGetElems(pBlockCache);
  tsdbTrace("vgId:%d, elems: %d", TD_VID(pTsdb->pVnode), nElems);

  taosLRUCacheEraseUnrefEntries(pBlockCache);

  nElems = taosLRUCacheGetElems(pBlockCache);
  tsdbTrace("vgId:%d, elems: %d", TD_VID(pTsdb->pVnode), nElems);

  taosLRUCacheCleanup(pBlockCache);

  (void)taosThreadMutexDestroy(&pTsdb->bMutex);
}
24,810✔
72

73
/*
 * Create the page LRU cache of a tsdb instance.
 *
 * The capacity is tsSsPageCacheSize * page size. On success the cache is stored
 * in pTsdb->pgCache and pTsdb->pgMutex is initialised.
 *
 * Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY when the cache cannot be created.
 */
static int32_t tsdbOpenPgCache(STsdb *pTsdb) {
  int32_t code = 0, lino = 0;

  const int32_t pageSize = pTsdb->pVnode->config.tsdbPageSize;
  const int64_t capacity = (int64_t)tsSsPageCacheSize * pageSize;

  SLRUCache *pNewCache = taosLRUCacheInit(capacity, 0, .5);
  if (NULL == pNewCache) {
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err);
  }

  taosLRUCacheSetStrictCapacity(pNewCache, false);
  (void)taosThreadMutexInit(&pTsdb->pgMutex, NULL);
  pTsdb->pgCache = pNewCache;

_err:
  if (code != 0) {
    tsdbError("tsdb/pgcache: vgId:%d, open failed at line %d since %s.", TD_VID(pTsdb->pVnode), lino, tstrerror(code));
  }

  TAOS_RETURN(code);
}
95

96
/*
 * Tear down the page LRU cache created by tsdbOpenPgCache().
 *
 * Does nothing when pTsdb->pgCache is NULL. Otherwise traces the element count
 * before and after erasing unreferenced entries, cleans the cache up and
 * destroys the mutex that belongs to the page cache.
 */
static void tsdbClosePgCache(STsdb *pTsdb) {
  SLRUCache *pCache = pTsdb->pgCache;
  if (pCache) {
    int32_t elems = taosLRUCacheGetElems(pCache);
    tsdbTrace("vgId:%d, elems: %d", TD_VID(pTsdb->pVnode), elems);
    taosLRUCacheEraseUnrefEntries(pCache);
    elems = taosLRUCacheGetElems(pCache);
    tsdbTrace("vgId:%d, elems: %d", TD_VID(pTsdb->pVnode), elems);

    taosLRUCacheCleanup(pCache);

    // tsdbOpenPgCache() initialises pgMutex; bMutex belongs to the block cache
    // and is destroyed by tsdbCloseBCache(). Destroying bMutex here would
    // destroy it twice and never release pgMutex.
    (void)taosThreadMutexDestroy(&pTsdb->pgMutex);
  }
}
24,810✔
110

111
#endif  // USE_SHARED_STORAGE
112

113
#define ROCKS_KEY_LEN (sizeof(tb_uid_t) + sizeof(int16_t) + sizeof(int8_t))
114

115
enum {
116
  LFLAG_LAST_ROW = 0,
117
  LFLAG_LAST = 1,
118
};
119

120
typedef struct {
121
  tb_uid_t uid;
122
  int16_t  cid;
123
  int8_t   lflag;
124
} SLastKey;
125

126
#define IS_LAST_ROW_KEY(k) (((k).lflag & LFLAG_LAST) == LFLAG_LAST_ROW)
127
#define IS_LAST_KEY(k)     (((k).lflag & LFLAG_LAST) == LFLAG_LAST)
128

129
/*
 * Build the path of the RocksDB cache directory of a tsdb instance.
 *
 * The vnode primary path is written into `path` first, then
 * "<sep><tsdb name><sep>cache.rdb" is appended. `path` is treated as a buffer
 * of TSDB_FILENAME_LEN bytes.
 */
static void tsdbGetRocksPath(STsdb *pTsdb, char *path) {
  vnodeGetPrimaryPath(pTsdb->pVnode, false, path, TSDB_FILENAME_LEN);

  int32_t used = strlen(path);
  char   *tail = path + used;
  snprintf(tail, TSDB_FILENAME_LEN - used - 1, "%s%s%scache.rdb", TD_DIRSEP, pTsdb->name, TD_DIRSEP);
}
4,989,360✔
136

137
// Comparator name callback handed to rocksdb_comparator_create(); the state
// argument is not used.
static const char *myCmpName(void *state) {
  static const char *const kName = "myCmp";
  (void)state;
  return kName;
}
141

142
// Comparator destructor callback handed to rocksdb_comparator_create(); there
// is no per-comparator state to release, so this is a no-op.
static void myCmpDestroy(void *state) {
  (void)state;
}
4,992,260✔
143

144
static int myCmp(void *state, const char *a, size_t alen, const char *b, size_t blen) {
1,397,930,729✔
145
  (void)state;
146
  (void)alen;
147
  (void)blen;
148
  SLastKey *lhs = (SLastKey *)a;
1,397,930,729✔
149
  SLastKey *rhs = (SLastKey *)b;
1,397,930,729✔
150

151
  if (lhs->uid < rhs->uid) {
1,397,930,729✔
152
    return -1;
840,586,774✔
153
  } else if (lhs->uid > rhs->uid) {
559,745,230✔
154
    return 1;
239,164,522✔
155
  }
156

157
  if (lhs->cid < rhs->cid) {
321,081,480✔
158
    return -1;
119,166,379✔
159
  } else if (lhs->cid > rhs->cid) {
202,024,867✔
160
    return 1;
76,977,985✔
161
  }
162

163
  if ((lhs->lflag & LFLAG_LAST) < (rhs->lflag & LFLAG_LAST)) {
125,113,467✔
164
    return -1;
46,812,582✔
165
  } else if ((lhs->lflag & LFLAG_LAST) > (rhs->lflag & LFLAG_LAST)) {
78,329,389✔
166
    return 1;
74,291,940✔
167
  }
168

169
  return 0;
4,038,183✔
170
}
171

172
/*
 * Open the RocksDB instance that backs the last/last_row cache and publish all
 * related handles in pTsdb->rCache.
 *
 * Every resource is created into a local first; pTsdb->rCache is only written
 * once everything succeeded, so a failed open leaves no dangling handles
 * behind. (The write-batch mutex lives inside rCache and is initialised in
 * place; it is destroyed again on the failure path.)
 *
 * Returns 0 on success. Allocation failures and a failed rocksdb_open() are
 * reported as TSDB_CODE_OUT_OF_MEMORY; a failed array allocation returns
 * terrno. Without USE_ROCKSDB the function is a no-op returning 0.
 */
static int32_t tsdbOpenRocksCache(STsdb *pTsdb) {
  int32_t code = 0, lino = 0;
#ifdef USE_ROCKSDB
  rocksdb_comparator_t *cmp = rocksdb_comparator_create(NULL, myCmpDestroy, myCmp, myCmpName);
  if (NULL == cmp) {
    TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
  }

  rocksdb_block_based_table_options_t *tableoptions = rocksdb_block_based_options_create();
  if (NULL == tableoptions) {
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err);
  }

  rocksdb_options_t *options = rocksdb_options_create();
  if (NULL == options) {
    // tableoptions already exists here and must be released as well
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err1);
  }

  rocksdb_options_set_create_if_missing(options, 1);
  rocksdb_options_set_comparator(options, cmp);
  rocksdb_options_set_block_based_table_factory(options, tableoptions);
  rocksdb_options_set_info_log_level(options, 2);  // WARN_LEVEL
  // rocksdb_options_set_inplace_update_support(options, 1);
  // rocksdb_options_set_allow_concurrent_memtable_write(options, 0);

  rocksdb_writeoptions_t *writeoptions = rocksdb_writeoptions_create();
  if (NULL == writeoptions) {
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err2);
  }
  rocksdb_writeoptions_disable_WAL(writeoptions, 1);

  rocksdb_readoptions_t *readoptions = rocksdb_readoptions_create();
  if (NULL == readoptions) {
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err3);
  }

  char *err = NULL;
  char  cachePath[TSDB_FILENAME_LEN] = {0};
  tsdbGetRocksPath(pTsdb, cachePath);

  rocksdb_t *db = rocksdb_open(options, cachePath, &err);
  if (NULL == db) {
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err);
    rocksdb_free(err);

    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err4);
  }

  rocksdb_flushoptions_t *flushoptions = rocksdb_flushoptions_create();
  if (NULL == flushoptions) {
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err5);
  }

  rocksdb_writebatch_t *writebatch = rocksdb_writebatch_create();
  if (NULL == writebatch) {
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err6);
  }

  TAOS_CHECK_GOTO(taosThreadMutexInit(&pTsdb->rCache.writeBatchMutex, NULL), &lino, _err7);

  SArray *ctxArray = taosArrayInit(16, sizeof(SLastUpdateCtx));
  if (!ctxArray) {
    TAOS_CHECK_GOTO(terrno, &lino, _err8);
  }

  // everything succeeded: publish the handles
  pTsdb->rCache.tableoptions = tableoptions;
  pTsdb->rCache.writebatch = writebatch;
  pTsdb->rCache.my_comparator = cmp;
  pTsdb->rCache.options = options;
  pTsdb->rCache.writeoptions = writeoptions;
  pTsdb->rCache.readoptions = readoptions;
  pTsdb->rCache.flushoptions = flushoptions;
  pTsdb->rCache.db = db;
  pTsdb->rCache.sver = -1;
  pTsdb->rCache.suid = -1;
  pTsdb->rCache.uid = -1;
  pTsdb->rCache.pTSchema = NULL;
  pTsdb->rCache.ctxArray = ctxArray;

  TAOS_RETURN(code);

  // Unwind in reverse order of creation; each label releases exactly the
  // resource created right before the corresponding failure point.
_err8:
  (void)taosThreadMutexDestroy(&pTsdb->rCache.writeBatchMutex);
_err7:
  rocksdb_writebatch_destroy(writebatch);
_err6:
  rocksdb_flushoptions_destroy(flushoptions);
_err5:
  // close the local handle: pTsdb->rCache.db has not been assigned on this path
  rocksdb_close(db);
_err4:
  rocksdb_readoptions_destroy(readoptions);
_err3:
  rocksdb_writeoptions_destroy(writeoptions);
_err2:
  rocksdb_options_destroy(options);
_err1:
  rocksdb_block_based_options_destroy(tableoptions);
_err:
  rocksdb_comparator_destroy(cmp);
  tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
#endif
  TAOS_RETURN(code);
}
263

264
/*
 * Release everything tsdbOpenRocksCache() stored in pTsdb->rCache: the database
 * handle, the write-batch mutex, the RocksDB option objects, the write batch,
 * the comparator, the cached schema and the update-context array.
 *
 * Without USE_ROCKSDB the function does nothing.
 */
static void tsdbCloseRocksCache(STsdb *pTsdb) {
#ifdef USE_ROCKSDB
  SRocksCache *pRCache = &pTsdb->rCache;

  rocksdb_close(pRCache->db);
  (void)taosThreadMutexDestroy(&pRCache->writeBatchMutex);

  rocksdb_flushoptions_destroy(pRCache->flushoptions);
  rocksdb_writebatch_destroy(pRCache->writebatch);
  rocksdb_readoptions_destroy(pRCache->readoptions);
  rocksdb_writeoptions_destroy(pRCache->writeoptions);
  rocksdb_options_destroy(pRCache->options);
  rocksdb_block_based_options_destroy(pRCache->tableoptions);
  rocksdb_comparator_destroy(pRCache->my_comparator);

  taosMemoryFree(pRCache->pTSchema);
  taosArrayDestroy(pRCache->ctxArray);
#endif
}
4,992,796✔
279

280
/*
 * Write the pending RocksDB write batch when it is worth doing so.
 *
 * The batch is written when it holds at least ROCKS_BATCH_SIZE entries, or when
 * `force` is set and it holds at least one entry. A write error is logged; the
 * batch is cleared afterwards in either case.
 *
 * Without USE_ROCKSDB the function does nothing.
 */
static void rocksMayWrite(STsdb *pTsdb, bool force) {
#ifdef USE_ROCKSDB
  rocksdb_writebatch_t *pBatch = pTsdb->rCache.writebatch;

  int  count = rocksdb_writebatch_count(pBatch);
  bool batchFull = count >= ROCKS_BATCH_SIZE;
  bool forcedNonEmpty = force && count > 0;
  if (!forcedNonEmpty && !batchFull) {
    return;
  }

  char *err = NULL;
  rocksdb_write(pTsdb->rCache.db, pTsdb->rCache.writeoptions, pBatch, &err);
  if (err != NULL) {
    tsdbError("vgId:%d, %s failed at line %d, count: %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, count,
              err);
    rocksdb_free(err);
  }

  rocksdb_writebatch_clear(pBatch);
#endif
}
65,266,277✔
299

300
typedef struct {
301
  TSKEY  ts;
302
  int8_t dirty;
303
  struct {
304
    int16_t cid;
305
    int8_t  type;
306
    int8_t  flag;
307
    union {
308
      int64_t val;
309
      struct {
310
        uint32_t nData;
311
        uint8_t *pData;
312
      };
313
    } value;
314
  } colVal;
315
} SLastColV0;
316

317
/*
 * Decode the SLastColV0 header at `value` into `pLastCol`.
 *
 * The row key gets the stored timestamp and zero primary keys, and the cache
 * status is set to TSDB_LAST_CACHE_VALID. For variable-length and DECIMAL
 * types the payload pointer refers into the `value` buffer (no copy is made);
 * a variable-length value of length 0 gets a NULL payload pointer.
 *
 * No bounds checking is done here; the caller compares the returned length
 * against the buffer size.
 *
 * Returns the number of bytes the header plus payload occupy.
 */
static int32_t tsdbCacheDeserializeV0(char const *value, SLastCol *pLastCol) {
  SLastColV0 *pHdr = (SLastColV0 *)value;
  uint8_t    *pPayload = (uint8_t *)(&pHdr[1]);

  pLastCol->rowKey.ts = pHdr->ts;
  pLastCol->rowKey.numOfPKs = 0;
  pLastCol->dirty = pHdr->dirty;
  pLastCol->colVal.cid = pHdr->colVal.cid;
  pLastCol->colVal.flag = pHdr->colVal.flag;
  pLastCol->colVal.value.type = pHdr->colVal.type;

  pLastCol->cacheStatus = TSDB_LAST_CACHE_VALID;

  if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) {
    pLastCol->colVal.value.nData = pHdr->colVal.value.nData;
    pLastCol->colVal.value.pData = (pLastCol->colVal.value.nData > 0) ? pPayload : NULL;
    return sizeof(SLastColV0) + pHdr->colVal.value.nData;
  }

  if (pLastCol->colVal.value.type == TSDB_DATA_TYPE_DECIMAL) {
    pLastCol->colVal.value.nData = pHdr->colVal.value.nData;
    pLastCol->colVal.value.pData = pPayload;
    return sizeof(SLastColV0) + pHdr->colVal.value.nData;
  }

  pLastCol->colVal.value.val = pHdr->colVal.value.val;
  return sizeof(SLastColV0);
}
345

346
/*
 * Decode a serialized cache entry into a newly allocated SLastCol.
 *
 * Layout: SLastColV0 header [+ payload], then optionally a version byte, a
 * primary-key count, the primary keys (SValue [+ payload] each) and, from
 * LAST_COL_VERSION_2 on, a cache-status byte. A buffer that ends right after
 * the V0 part is accepted as a version-0 entry.
 *
 * Pointers inside the result (column payload, primary-key payloads) refer into
 * `value`; the buffer must outlive the returned object.
 *
 * value     - serialized bytes
 * size      - number of valid bytes at `value`
 * ppLastCol - receives the decoded object on success (caller frees it);
 *             untouched on failure
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA for NULL arguments, terrno
 * when the allocation fails, or TSDB_CODE_INVALID_DATA_FMT when the buffer is
 * truncated or inconsistent.
 */
int32_t tsdbCacheDeserialize(char const *value, size_t size, SLastCol **ppLastCol) {
  if (!value || !ppLastCol) {
    return TSDB_CODE_INVALID_PARA;
  }

  // the V0 header is read unconditionally below, so it must be fully present
  if (size < sizeof(SLastColV0)) {
    TAOS_RETURN(TSDB_CODE_INVALID_DATA_FMT);
  }

  SLastCol *pLastCol = taosMemoryCalloc(1, sizeof(SLastCol));
  if (NULL == pLastCol) {
    return terrno;
  }

  int32_t v0Len = tsdbCacheDeserializeV0(value, pLastCol);

  // Validate the payload length on the untruncated field: the int32_t length
  // returned above can wrap for a corrupt nData and then look plausible.
  if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type) || pLastCol->colVal.value.type == TSDB_DATA_TYPE_DECIMAL) {
    if (pLastCol->colVal.value.nData > size - sizeof(SLastColV0)) {
      goto _invalid;
    }
  }
  if (v0Len < 0 || (size_t)v0Len > size) {
    goto _invalid;
  }

  // From here on `offset <= size` always holds, so `size - offset` cannot wrap.
  size_t offset = (size_t)v0Len;
  if (offset == size) {
    // version 0
    *ppLastCol = pLastCol;

    TAOS_RETURN(TSDB_CODE_SUCCESS);
  }

  // version
  if (size - offset < sizeof(int8_t)) {
    goto _invalid;
  }
  int8_t version = *(const int8_t *)(value + offset);
  offset += sizeof(int8_t);

  // numOfPKs
  if (size - offset < sizeof(uint8_t)) {
    goto _invalid;
  }
  pLastCol->rowKey.numOfPKs = *(const uint8_t *)(value + offset);
  offset += sizeof(uint8_t);

  if (pLastCol->rowKey.numOfPKs > TD_MAX_PK_COLS) {
    goto _invalid;
  }

  // pks
  for (int32_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) {
    if (size - offset < sizeof(SValue)) {
      goto _invalid;
    }
    // the serialized SValue sits at an arbitrary byte offset; copy it instead
    // of dereferencing a possibly misaligned pointer
    memcpy(&pLastCol->rowKey.pks[i], value + offset, sizeof(SValue));
    offset += sizeof(SValue);

    if (IS_VAR_DATA_TYPE(pLastCol->rowKey.pks[i].type)) {
      pLastCol->rowKey.pks[i].pData = NULL;
      if (pLastCol->rowKey.pks[i].nData > 0) {
        // compare against the remaining bytes so a huge nData cannot wrap the sum
        if (pLastCol->rowKey.pks[i].nData > size - offset) {
          goto _invalid;
        }
        pLastCol->rowKey.pks[i].pData = (uint8_t *)value + offset;
        offset += pLastCol->rowKey.pks[i].nData;
      }
    }
  }

  if (version >= LAST_COL_VERSION_2) {
    if (size - offset < sizeof(uint8_t)) {
      goto _invalid;
    }
    pLastCol->cacheStatus = *(const uint8_t *)(value + offset);
    offset += sizeof(uint8_t);
  }

  *ppLastCol = pLastCol;

  TAOS_RETURN(TSDB_CODE_SUCCESS);

_invalid:
  taosMemoryFreeClear(pLastCol);

  TAOS_RETURN(TSDB_CODE_INVALID_DATA_FMT);
}
433

434
/*
435
typedef struct {
436
  SLastColV0 lastColV0;
437
  char       colData[];
438
  int8_t     version;
439
  uint8_t    numOfPKs;
440
  SValue     pks[0];
441
  char       pk0Data[];
442
  SValue     pks[1];
443
  char       pk1Data[];
444
  ...
445
} SLastColDisk;
446
*/
447
/*
 * Write the SLastColV0 header (and, for variable-length and DECIMAL types, the
 * payload) of `pLastCol` to the buffer at `value`.
 *
 * The buffer is written through despite the const qualifier on the parameter;
 * the caller must have sized it for header plus payload.
 *
 * Returns the number of bytes the header plus payload occupy.
 */
static int32_t tsdbCacheSerializeV0(char const *value, SLastCol *pLastCol) {
  SLastColV0 *pHdr = (SLastColV0 *)value;

  pHdr->ts = pLastCol->rowKey.ts;
  pHdr->dirty = pLastCol->dirty;
  pHdr->colVal.cid = pLastCol->colVal.cid;
  pHdr->colVal.flag = pLastCol->colVal.flag;
  pHdr->colVal.type = pLastCol->colVal.value.type;

  bool hasPayload =
      IS_VAR_DATA_TYPE(pLastCol->colVal.value.type) || pLastCol->colVal.value.type == TSDB_DATA_TYPE_DECIMAL;
  if (!hasPayload) {
    pHdr->colVal.value.val = pLastCol->colVal.value.val;
    return sizeof(SLastColV0);
  }

  pHdr->colVal.value.nData = pLastCol->colVal.value.nData;
  if (pLastCol->colVal.value.nData > 0) {
    // payload is stored directly behind the header
    memcpy(&pHdr[1], pLastCol->colVal.value.pData, pLastCol->colVal.value.nData);
  }
  return sizeof(SLastColV0) + pLastCol->colVal.value.nData;
}
474

475
/*
 * Serialize `pLastCol` into a newly allocated buffer.
 *
 * Layout: SLastColV0 header [+ payload], version byte (LAST_COL_VERSION),
 * primary-key count, the primary keys (SValue [+ payload] each) and the
 * cache-status byte.
 *
 * pLastCol - entry to serialize
 * value    - receives the buffer (caller frees it); NULL on allocation failure
 * size     - receives the buffer size in bytes
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when the allocation fails.
 */
static int32_t tsdbCacheSerialize(SLastCol *pLastCol, char **value, size_t *size) {
  size_t total = sizeof(SLastColV0);
  if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) {
    total += pLastCol->colVal.value.nData;
  }
  if (pLastCol->colVal.value.type == TSDB_DATA_TYPE_DECIMAL) {
    // Keep the historical DECIMAL128_BYTES reservation, but never reserve less
    // than what tsdbCacheSerializeV0() is going to copy.
    size_t nData = pLastCol->colVal.value.nData;
    total += (nData > (size_t)DECIMAL128_BYTES) ? nData : (size_t)DECIMAL128_BYTES;
  }
  total += sizeof(uint8_t) + sizeof(uint8_t) + sizeof(uint8_t);  // version + numOfPKs + cacheStatus

  for (int32_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) {
    total += sizeof(SValue);
    if (IS_VAR_DATA_TYPE(pLastCol->rowKey.pks[i].type)) {
      total += pLastCol->rowKey.pks[i].nData;
    }
  }
  *size = total;

  // Zero-initialise: struct padding, the unused pointer slot of the header and
  // any unused decimal reservation would otherwise carry uninitialised heap
  // bytes into the stored value.
  *value = taosMemoryCalloc(1, total);
  if (NULL == *value) {
    TAOS_RETURN(terrno);
  }

  int32_t offset = tsdbCacheSerializeV0(*value, pLastCol);

  // version
  ((uint8_t *)(*value + offset))[0] = LAST_COL_VERSION;
  offset++;

  // numOfPKs
  ((uint8_t *)(*value + offset))[0] = pLastCol->rowKey.numOfPKs;
  offset++;

  // pks
  for (int32_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) {
    // destination is at an arbitrary byte offset; copy instead of storing
    // through a possibly misaligned SValue pointer
    memcpy(*value + offset, &pLastCol->rowKey.pks[i], sizeof(SValue));
    offset += sizeof(SValue);
    if (IS_VAR_DATA_TYPE(pLastCol->rowKey.pks[i].type)) {
      if (pLastCol->rowKey.pks[i].nData > 0) {
        memcpy(*value + offset, pLastCol->rowKey.pks[i].pData, pLastCol->rowKey.pks[i].nData);
      }
      offset += pLastCol->rowKey.pks[i].nData;
    }
  }

  ((uint8_t *)(*value + offset))[0] = pLastCol->cacheStatus;

  TAOS_RETURN(TSDB_CODE_SUCCESS);
}
523

524
static int32_t tsdbCachePutToRocksdb(STsdb *pTsdb, SLastKey *pLastKey, SLastCol *pLastCol);
525

526
int tsdbCacheFlushDirty(const void *key, size_t klen, void *value, void *ud) {
67,282,015✔
527
  SLastCol *pLastCol = (SLastCol *)value;
67,282,015✔
528

529
  if (pLastCol->dirty) {
67,282,015✔
530
    STsdb *pTsdb = (STsdb *)ud;
57,488,254✔
531

532
    int32_t code = tsdbCachePutToRocksdb(pTsdb, (SLastKey *)key, pLastCol);
57,488,254✔
533
    if (code) {
57,479,864✔
534
      tsdbError("tsdb/cache: vgId:%d, flush dirty lru failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code));
×
535
      return code;
×
536
    }
537

538
    pLastCol->dirty = 0;
57,479,864✔
539

540
    rocksMayWrite(pTsdb, false);
57,479,864✔
541
  }
542

543
  return 0;
67,292,619✔
544
}
545

546
/*
 * Decide whether `key` is covered by the delete skyline.
 *
 * The skyline is examined as pairs (pSkyline[i - 1], pSkyline[i]) starting at
 * i = *iSkyline and moving towards the front; *iSkyline is decremented while
 * the key lies before the current pair (or inside it but with a newer version)
 * and never drops below 1 once it was positive.
 *
 * Returns true when the key's timestamp lies inside a pair and its version is
 * not newer than the front item's version (or, when it sits exactly on the back
 * item's timestamp, not newer than the back item's version). Returns false
 * otherwise, including when *iSkyline is not positive.
 */
static bool tsdbKeyDeleted(TSDBKEY *key, SArray *pSkyline, int64_t *iSkyline) {
  while (*iSkyline > 0) {
    TSDBKEY *pBack = (TSDBKEY *)taosArrayGet(pSkyline, *iSkyline);
    TSDBKEY *pFront = (TSDBKEY *)taosArrayGet(pSkyline, *iSkyline - 1);

    // key is newer than the current segment: nothing further back can cover it
    if (key->ts > pBack->ts) {
      return false;
    }

    // key->ts <= pBack->ts is known here, so only the lower bound decides
    if (key->ts >= pFront->ts) {
      bool coveredByFront = key->version <= pFront->version;
      bool coveredByBack = (key->ts == pBack->ts) && (key->version <= pBack->version);
      if (coveredByFront || coveredByBack) {
        return true;
      }
    }

    // move one segment towards the front, or give up at the first segment
    if (*iSkyline <= 1) {
      return false;
    }
    --*iSkyline;
  }

  return false;
}
576

577
// Get next non-deleted row from imem
578
/*
 * Advance the iterator to the next row that is not covered by the delete
 * skyline.
 *
 * Rows for which tsdbKeyDeleted() reports true are skipped; the search only
 * ends when a live row is found or the iterator is exhausted. (Returning NULL
 * on the first deleted row would end the caller's scan early and hide older
 * live rows.)
 *
 * pTbIter   - iterator positioned on the previously returned row
 * pSkyline  - delete skyline, may be NULL when *piSkyline is 0
 * piSkyline - skyline cursor shared across calls
 *
 * Returns the next live row, or NULL when there is none.
 */
static TSDBROW *tsdbImemGetNextRow(STbDataIter *pTbIter, SArray *pSkyline, int64_t *piSkyline) {
  while (tsdbTbDataIterNext(pTbIter)) {
    TSDBROW *pMemRow = tsdbTbDataIterGet(pTbIter);
    if (pMemRow == NULL) {
      break;
    }

    TSDBKEY rowKey = TSDBROW_KEY(pMemRow);
    if (!tsdbKeyDeleted(&rowKey, pSkyline, piSkyline)) {
      return pMemRow;
    }
    // deleted row: keep scanning
  }

  return NULL;
}
592

593
// Get first non-deleted row from imem
594
/*
 * Open `pTbIter` on `pIMem` (third argument 1, as passed to
 * tsdbTbDataIterOpen) and return the first row that is not covered by the
 * delete skyline.
 *
 * When the iterator's first row is deleted, the result of
 * tsdbImemGetNextRow() is returned instead. `imem` is not used.
 *
 * Returns NULL when the table data yields no row.
 */
static TSDBROW *tsdbImemGetFirstRow(SMemTable *imem, STbData *pIMem, STbDataIter *pTbIter, SArray *pSkyline,
                                    int64_t *piSkyline) {
  tsdbTbDataIterOpen(pIMem, NULL, 1, pTbIter);

  TSDBROW *pFirst = tsdbTbDataIterGet(pTbIter);
  if (pFirst == NULL) {
    return NULL;
  }

  TSDBKEY firstKey = TSDBROW_KEY(pFirst);
  if (!tsdbKeyDeleted(&firstKey, pSkyline, piSkyline)) {
    return pFirst;
  }

  // first row is deleted: hand over to the "next row" lookup
  return tsdbImemGetNextRow(pTbIter, pSkyline, piSkyline);
}
614

615
/*
 * Invalidate the schema cached in pTsdb->rCache when it belongs to the given
 * table and is older than `sver`.
 *
 * Nothing happens when no schema is cached or `sver` is not greater than the
 * cached version. Otherwise the cached version and the matching id are reset to
 * -1: the super-table id when suid > 0 and equal to the cached suid, the table
 * id when suid == 0 and uid equals the cached uid.
 */
void tsdbCacheInvalidateSchema(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int32_t sver) {
  SRocksCache *pRCache = &pTsdb->rCache;

  if (pRCache->pTSchema == NULL) {
    return;
  }
  if (sver <= pTsdb->rCache.sver) {
    return;
  }

  if (suid > 0) {
    if (suid == pRCache->suid) {
      pRCache->sver = -1;
      pRCache->suid = -1;
    }
  } else if (suid == 0) {
    if (uid == pRCache->uid) {
      pRCache->sver = -1;
      pRCache->uid = -1;
    }
  }
}
628

629
/*
 * Make pTsdb->rCache.pTSchema hold the schema of (suid, uid) at version `sver`.
 *
 * The cached schema is reused when its version matches and it belongs to the
 * same super table (suid > 0) or the same table (suid == 0). Otherwise the old
 * schema is destroyed and a new one is fetched with metaGetTbTSchemaEx().
 *
 * The cached identity (suid/uid/sver) is only recorded after a successful
 * fetch; on failure the pointer is left as the fetch set it (NULL unless it
 * wrote something) and the identity is reset, so a later call can never treat
 * a destroyed schema as a cache hit.
 *
 * Returns 0 on success or the error code of metaGetTbTSchemaEx().
 */
static int32_t tsdbUpdateSkm(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int32_t sver) {
  SRocksCache *pRCache = &pTsdb->rCache;
  if (pRCache->pTSchema && sver == pRCache->sver) {
    if (suid > 0 && suid == pRCache->suid) {
      return 0;
    }
    if (suid == 0 && uid == pRCache->uid) {
      return 0;
    }
  }

  // drop the stale schema and clear the pointer before refetching, so that no
  // dangling pointer survives a failed fetch
  tDestroyTSchema(pRCache->pTSchema);
  pRCache->pTSchema = NULL;

  int32_t code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pRCache->pTSchema);
  if (code != 0) {
    pRCache->suid = -1;
    pRCache->uid = -1;
    pRCache->sver = -1;
    return code;
  }

  pRCache->suid = suid;
  pRCache->uid = uid;
  pRCache->sver = sver;
  return code;
}
646

647
static int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SArray *updCtxArray);
648

649
/*
 * Feed the newest rows of one table from an immutable memtable into the
 * last/last_row cache.
 *
 * Steps:
 *  1. load the table's in-memory tomb data and, when there is any, build the
 *     delete skyline;
 *  2. take the first live row and queue a LFLAG_LAST_ROW update for every
 *     column, plus a LFLAG_LAST update for every column that has a value;
 *     columns without a value are remembered by column id;
 *  3. walk further rows while remembered columns remain, queueing a LFLAG_LAST
 *     update for the first value found for each of them;
 *  4. apply the queued updates with tsdbCacheUpdate().
 *
 * The update queue is pTsdb->rCache.ctxArray; it is cleared before returning.
 *
 * Returns 0 on success (including when the table has no live row), otherwise
 * the first error code encountered.
 */
int32_t tsdbLoadFromImem(SMemTable *imem, int64_t suid, int64_t uid) {
  int32_t      code = 0;
  int32_t      lino = 0;
  STsdb       *pTsdb = imem->pTsdb;
  SArray      *pTombs = NULL;
  SArray      *pSkyline = NULL;
  int64_t      skylineIdx = 0;
  STbDataIter  rowIter = {0};
  TSDBROW     *pRow = NULL;
  STSchema    *pSchema = NULL;
  SSHashObj   *pNullCols = NULL;  // ids of columns still lacking a "last" value
  int32_t      schemaVer;
  int32_t      numCols;
  SArray      *pCtxs = pTsdb->rCache.ctxArray;
  STsdbRowKey  firstKey = {0};
  STSDBRowIter iter = {0};

  STbData *pTbData = tsdbGetTbDataFromMemTable(imem, suid, uid);

  // load imem tomb data and build skyline
  TAOS_CHECK_GOTO(loadMemTombData(&pTombs, NULL, pTbData, INT64_MAX), &lino, _exit);

  size_t nTombs = TARRAY_SIZE(pTombs);
  if (nTombs > 0) {
    pSkyline = taosArrayInit(32, sizeof(TSDBKEY));
    if (!pSkyline) {
      TAOS_CHECK_EXIT(terrno);
    }

    TAOS_CHECK_EXIT(tsdbBuildDeleteSkyline(pTombs, 0, (int32_t)(nTombs - 1), pSkyline));
    skylineIdx = taosArrayGetSize(pSkyline) - 1;
  }

  pRow = tsdbImemGetFirstRow(imem, pTbData, &rowIter, pSkyline, &skylineIdx);
  if (!pRow) {
    goto _exit;
  }

  // first row: queue last_row/last updates and remember the value-less columns
  schemaVer = TSDBROW_SVERSION(pRow);
  TAOS_CHECK_GOTO(tsdbUpdateSkm(pTsdb, suid, uid, schemaVer), &lino, _exit);
  pSchema = pTsdb->rCache.pTSchema;
  numCols = pSchema->numOfCols;

  tsdbRowGetKey(pRow, &firstKey);

  TAOS_CHECK_EXIT(tsdbRowIterOpen(&iter, pRow, pSchema));

  int32_t  firstIdx = 0;
  SColVal *pFirstVal = tsdbRowIterNext(&iter);
  while (pFirstVal && firstIdx < numCols) {
    SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST_ROW, .tsdbRowKey = firstKey, .colVal = *pFirstVal};
    if (!taosArrayPush(pCtxs, &updateCtx)) {
      TAOS_CHECK_EXIT(terrno);
    }

    if (!COL_VAL_IS_VALUE(pFirstVal)) {
      if (!pNullCols) {
        pNullCols = tSimpleHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT));
        if (pNullCols == NULL) {
          TAOS_CHECK_EXIT(terrno);
        }
      }

      if (tSimpleHashPut(pNullCols, &pFirstVal->cid, sizeof(pFirstVal->cid), &pFirstVal->cid,
                         sizeof(pFirstVal->cid))) {
        TAOS_CHECK_EXIT(terrno);
      }
    } else {
      updateCtx.lflag = LFLAG_LAST;
      if (!taosArrayPush(pCtxs, &updateCtx)) {
        TAOS_CHECK_EXIT(terrno);
      }
    }

    pFirstVal = tsdbRowIterNext(&iter);
    firstIdx++;
  }
  tsdbRowClose(&iter);

  // following rows: fill the "last" value of the remembered columns
  pRow = tsdbImemGetNextRow(&rowIter, pSkyline, &skylineIdx);
  while (pRow && tSimpleHashGetSize(pNullCols) != 0) {
    schemaVer = TSDBROW_SVERSION(pRow);
    TAOS_CHECK_EXIT(tsdbUpdateSkm(pTsdb, suid, uid, schemaVer));
    pSchema = pTsdb->rCache.pTSchema;

    STsdbRowKey rowKey = {0};
    tsdbRowGetKey(pRow, &rowKey);

    TAOS_CHECK_EXIT(tsdbRowIterOpen(&iter, pRow, pSchema));

    int32_t  colIdx = 0;
    SColVal *pColVal = tsdbRowIterNext(&iter);
    while (pColVal && colIdx < numCols) {
      if (tSimpleHashGet(pNullCols, &pColVal->cid, sizeof(pColVal->cid)) && COL_VAL_IS_VALUE(pColVal)) {
        SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST, .tsdbRowKey = rowKey, .colVal = *pColVal};
        if (!taosArrayPush(pCtxs, &updateCtx)) {
          TAOS_CHECK_EXIT(terrno);
        }

        TAOS_CHECK_EXIT(tSimpleHashRemove(pNullCols, &pColVal->cid, sizeof(pColVal->cid)));
      }

      pColVal = tsdbRowIterNext(&iter);
      colIdx++;
    }
    tsdbRowClose(&iter);

    pRow = tsdbImemGetNextRow(&rowIter, pSkyline, &skylineIdx);
  }

  TAOS_CHECK_GOTO(tsdbCacheUpdate(pTsdb, suid, uid, pCtxs), &lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(pTsdb->pVnode), __func__, __FILE__, lino, tstrerror(code));

    tsdbRowClose(&iter);
  }

  taosArrayClear(pCtxs);
  // destroy any allocated resource
  tSimpleHashCleanup(pNullCols);
  if (pTombs) {
    taosArrayDestroy(pTombs);
  }
  if (pSkyline) {
    taosArrayDestroy(pSkyline);
  }

  TAOS_RETURN(code);
}
778

779
// Pushes the contents of the immutable memtable (pTsdb->imem) into the last/last_row
// cache by handing tsdbLoadFromImem to tsdbMemTableSaveToCache.
//
// Returns 0 without doing anything when pTsdb or its imem is NULL, or when the
// memtable reports no rows or no table data. Otherwise returns the result of
// tsdbMemTableSaveToCache (also stored in terrno through TAOS_RETURN).
static int32_t tsdbCacheUpdateFromIMem(STsdb *pTsdb) {
  int32_t code = 0;
  int32_t lino = 0;

  if (NULL == pTsdb || NULL == pTsdb->imem) {
    return 0;
  }

  SMemTable *pImem = pTsdb->imem;
  int32_t    tbDataCount = pImem->nTbData;
  int64_t    rowCount = pImem->nRow;
  int64_t    delCount = pImem->nDel;

  // nothing buffered => nothing to merge into the cache
  if (0 == rowCount || 0 == tbDataCount) {
    return 0;
  }

  TAOS_CHECK_EXIT(tsdbMemTableSaveToCache(pImem, tsdbLoadFromImem));

_exit:
  if (0 == code) {
    tsdbInfo("vgId:%d %s done, nRow:%" PRId64 " nDel:%" PRId64, TD_VID(pTsdb->pVnode), __func__, rowCount, delCount);
  } else {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(pTsdb->pVnode), __func__, __FILE__, lino, tstrerror(code));
  }

  TAOS_RETURN(code);
}
803

804
// Commits the last/last_row cache of a tsdb instance.
//
// Steps, in order:
//   1. when tsUpdateCacheBatch is set, merge the immutable memtable into the
//      cache (tsdbCacheUpdateFromIMem); a failure there is returned immediately;
//   2. under lruMutex, apply tsdbCacheFlushDirty to every LRU entry and, when
//      built with USE_ROCKSDB, force the write batch out and flush rocksdb;
//   3. after the mutex is released, report a rocksdb flush error (if any) as
//      TSDB_CODE_FAILED.
int32_t tsdbCacheCommit(STsdb *pTsdb) {
  int32_t code = 0;

  if (tsUpdateCacheBatch && (code = tsdbCacheUpdateFromIMem(pTsdb)) != 0) {
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));

    TAOS_RETURN(code);
  }

  char      *flushErr = NULL;
  SLRUCache *pLru = pTsdb->lruCache;

  (void)taosThreadMutexLock(&pTsdb->lruMutex);

  // visit every cached entry with the dirty-flush callback
  taosLRUCacheApply(pLru, tsdbCacheFlushDirty, pTsdb);

#ifdef USE_ROCKSDB
  rocksMayWrite(pTsdb, true);
  rocksdb_flush(pTsdb->rCache.db, pTsdb->rCache.flushoptions, &flushErr);
#endif

  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);

#ifdef USE_ROCKSDB
  // the error string is owned by rocksdb and must be released with rocksdb_free
  if (flushErr != NULL) {
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, flushErr);
    rocksdb_free(flushErr);
    code = TSDB_CODE_FAILED;
  }
#endif

  TAOS_RETURN(code);
}
843

844
// Gives a variable-length SValue its own private copy of the payload.
//
// For a var-data typed value with nData > 0, a new buffer of nData bytes is
// allocated, the current payload is copied into it and pData is re-pointed at
// the copy (the previous buffer is NOT freed here). With nData == 0, pData is
// set to NULL. Values of any other type are left untouched.
//
// Returns TSDB_CODE_SUCCESS, or terrno when the allocation fails (in which
// case pData still refers to the original buffer).
static int32_t reallocVarDataVal(SValue *pValue) {
  if (!IS_VAR_DATA_TYPE(pValue->type)) {
    TAOS_RETURN(TSDB_CODE_SUCCESS);
  }

  uint8_t *pSrc = pValue->pData;
  uint32_t len = pValue->nData;

  if (len > 0) {
    uint8_t *pCopy = taosMemoryMalloc(len);
    if (NULL == pCopy) {
      TAOS_RETURN(terrno);
    }

    pValue->pData = pCopy;
    (void)memcpy(pValue->pData, pSrc, len);
  } else {
    pValue->pData = NULL;
  }

  TAOS_RETURN(TSDB_CODE_SUCCESS);
}
862

863
// Convenience wrapper: re-allocates the payload of the value embedded in a
// column value by delegating to reallocVarDataVal.
static int32_t reallocVarData(SColVal *pColVal) {
  SValue *pValue = &pColVal->value;
  return reallocVarDataVal(pValue);
}
2,949,564✔
864

865
// realloc pk data and col data.
866
static int32_t tsdbCacheReallocSLastCol(SLastCol *pCol, size_t *pCharge) {
125,017,066✔
867
  int32_t code = TSDB_CODE_SUCCESS, lino = 0;
125,017,066✔
868
  size_t  charge = sizeof(SLastCol);
125,017,066✔
869

870
  int8_t i = 0;
125,017,066✔
871
  for (; i < pCol->rowKey.numOfPKs; i++) {
133,522,222✔
872
    SValue *pValue = &pCol->rowKey.pks[i];
8,503,672✔
873
    if (IS_VAR_DATA_TYPE(pValue->type)) {
8,502,930✔
874
      TAOS_CHECK_EXIT(reallocVarDataVal(pValue));
2,561,356✔
875
      charge += pValue->nData;
2,562,098✔
876
    }
877
  }
878

879
  if (IS_VAR_DATA_TYPE(pCol->colVal.value.type)) {
125,015,860✔
880
    TAOS_CHECK_EXIT(reallocVarData(&pCol->colVal));
2,936,328✔
881
    charge += pCol->colVal.value.nData;
2,949,479✔
882
  }
883

884
  if (pCol->colVal.value.type == TSDB_DATA_TYPE_DECIMAL) {
125,053,368✔
885
    if (pCol->colVal.value.nData > 0) {
909,960✔
886
      void *p = taosMemoryMalloc(pCol->colVal.value.nData);
444,769✔
887
      if (!p) TAOS_CHECK_EXIT(terrno);
444,769✔
888
      (void)memcpy(p, pCol->colVal.value.pData, pCol->colVal.value.nData);
444,769✔
889
      pCol->colVal.value.pData = p;
444,769✔
890
    }else {
891
      pCol->colVal.value.pData = NULL;
465,191✔
892
    }
893
    charge += pCol->colVal.value.nData;
909,960✔
894
  }
895

896
  if (pCharge) {
125,050,822✔
897
    *pCharge = charge;
114,638,051✔
898
  }
899

900
_exit:
10,412,771✔
901
  if (TSDB_CODE_SUCCESS != code) {
125,057,415✔
902
    for (int8_t j = 0; j < i; j++) {
×
903
      if (IS_VAR_DATA_TYPE(pCol->rowKey.pks[j].type)) {
×
904
        taosMemoryFree(pCol->rowKey.pks[j].pData);
×
905
      }
906
    }
907

908
    (void)memset(pCol, 0, sizeof(SLastCol));
×
909
  }
910

911
  TAOS_RETURN(code);
125,057,415✔
912
}
913

914
void tsdbCacheFreeSLastColItem(void *pItem) {
11,153,344✔
915
  SLastCol *pCol = (SLastCol *)pItem;
11,153,344✔
916
  for (int i = 0; i < pCol->rowKey.numOfPKs; i++) {
15,690,426✔
917
    if (IS_VAR_DATA_TYPE(pCol->rowKey.pks[i].type)) {
4,532,630✔
918
      taosMemoryFree(pCol->rowKey.pks[i].pData);
1,220,366✔
919
    }
920
  }
921

922
  if ((IS_VAR_DATA_TYPE(pCol->colVal.value.type) || pCol->colVal.value.type == TSDB_DATA_TYPE_DECIMAL) &&
11,152,762✔
923
      pCol->colVal.value.pData) {
1,477,575✔
924
    taosMemoryFree(pCol->colVal.value.pData);
1,082,523✔
925
  }
926
}
11,160,258✔
927

928
static void tsdbCacheDeleter(const void *key, size_t klen, void *value, void *ud) {
114,633,079✔
929
  SLastCol *pLastCol = (SLastCol *)value;
114,633,079✔
930

931
  if (pLastCol->dirty) {
114,633,079✔
932
    if (tsdbCacheFlushDirty(key, klen, pLastCol, ud) != 0) {
2,465,504✔
933
      STsdb *pTsdb = (STsdb *)ud;
×
934
      tsdbTrace("tsdb/cache: vgId:%d, flush cache %s failed at line %d.", TD_VID(pTsdb->pVnode), __func__, __LINE__);
×
935
    }
936
  }
937

938
  for (uint8_t i = 0; i < pLastCol->rowKey.numOfPKs; ++i) {
118,945,030✔
939
    SValue *pValue = &pLastCol->rowKey.pks[i];
4,323,250✔
940
    if (IS_VAR_DATA_TYPE(pValue->type)) {
4,323,992✔
941
      taosMemoryFree(pValue->pData);
1,435,168✔
942
    }
943
  }
944

945
  if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type) ||
114,607,872✔
946
      pLastCol->colVal.value.type == TSDB_DATA_TYPE_DECIMAL /* && pLastCol->colVal.value.nData > 0*/) {
112,925,927✔
947
    taosMemoryFree(pLastCol->colVal.value.pData);
2,483,432✔
948
  }
949

950
  taosMemoryFree(value);
114,638,949✔
951
}
114,588,837✔
952

953
static void tsdbCacheOverWriter(const void *key, size_t klen, void *value, void *ud) {
53,766,688✔
954
  SLastCol *pLastCol = (SLastCol *)value;
53,766,688✔
955
  pLastCol->dirty = 0;
53,766,688✔
956
}
53,779,114✔
957

958
static int32_t tsdbCachePutToLRU(STsdb *pTsdb, SLastKey *pLastKey, SLastCol *pLastCol, int8_t dirty);
959

960
// Inserts a placeholder ("none") entry for one column of one table into the LRU
// cache. The entry uses row key {TSKEY_MIN, no primary keys}, a COL_VAL_NONE
// column value, and is marked dirty with status TSDB_LAST_CACHE_VALID.
//
// uid/cid/lflag together form the cache key; col_type is the column's data type.
// Returns the result of tsdbCachePutToLRU; failures are logged.
static int32_t tsdbCacheNewTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, int8_t col_type, int8_t lflag) {
  SLastKey lastKey = {.lflag = lflag, .uid = uid, .cid = cid};
  SLastCol noneCol = {.rowKey = {.ts = TSKEY_MIN, .numOfPKs = 0},
                      .colVal = COL_VAL_NONE(cid, col_type),
                      .dirty = 1,
                      .cacheStatus = TSDB_LAST_CACHE_VALID};

  int32_t code = tsdbCachePutToLRU(pTsdb, &lastKey, &noneCol, 1);
  if (0 != code) {
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
  }

  TAOS_RETURN(code);
}
977

978
// Same flush sequence as the locked commit, but without taking lruMutex itself:
// applies tsdbCacheFlushDirty to every LRU entry and, when built with
// USE_ROCKSDB, forces the write batch out and flushes rocksdb.
//
// Returns 0 on success or TSDB_CODE_FAILED when rocksdb_flush reports an error.
int32_t tsdbCacheCommitNoLock(STsdb *pTsdb) {
  char      *flushErr = NULL;
  int32_t    code = 0;
  SLRUCache *pLru = pTsdb->lruCache;

  taosLRUCacheApply(pLru, tsdbCacheFlushDirty, pTsdb);

#ifdef USE_ROCKSDB
  rocksMayWrite(pTsdb, true);
  rocksdb_flush(pTsdb->rCache.db, pTsdb->rCache.flushoptions, &flushErr);

  // the error string is owned by rocksdb and must be released with rocksdb_free
  if (flushErr != NULL) {
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, flushErr);
    rocksdb_free(flushErr);
    code = TSDB_CODE_FAILED;
  }
#endif

  TAOS_RETURN(code);
}
997

998
// Looks up numKeys keys in rocksdb with a single rocksdb_multi_get call.
//
// On success *pppValuesList and *ppValuesListSizes receive newly allocated
// arrays of numKeys entries each; both arrays, and the non-NULL value buffers
// stored in the first one, become the caller's responsibility. Per-key error
// strings returned by rocksdb are released and otherwise ignored.
//
// Without USE_ROCKSDB the output parameters are not touched and success is
// returned. Returns terrno when one of the arrays cannot be allocated.
static int32_t tsdbCacheGetValuesFromRocks(STsdb *pTsdb, size_t numKeys, const char *const *ppKeysList,
                                           size_t *pKeysListSizes, char ***pppValuesList, size_t **ppValuesListSizes) {
#ifdef USE_ROCKSDB
  char **ppVals = taosMemoryCalloc(numKeys, sizeof(char *));
  if (NULL == ppVals) {
    return terrno;
  }

  size_t *pValLens = taosMemoryCalloc(numKeys, sizeof(size_t));
  if (NULL == pValLens) {
    taosMemoryFreeClear(ppVals);
    return terrno;
  }

  char **ppErrs = taosMemoryCalloc(numKeys, sizeof(char *));
  if (NULL == ppErrs) {
    taosMemoryFreeClear(ppVals);
    taosMemoryFreeClear(pValLens);
    return terrno;
  }

  rocksdb_multi_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, numKeys, ppKeysList, pKeysListSizes, ppVals,
                    pValLens, ppErrs);

  // per-key error strings are dropped
  size_t k = 0;
  while (k < numKeys) {
    rocksdb_free(ppErrs[k]);
    ++k;
  }
  taosMemoryFreeClear(ppErrs);

  *pppValuesList = ppVals;
  *ppValuesListSizes = pValLens;
#endif
  TAOS_RETURN(TSDB_CODE_SUCCESS);
}
1026

1027
// Removes the cached "last" and "last_row" entries of one column of one table,
// both from rocksdb (through the pending write batch) and from the LRU cache.
//
// uid/cid        identify the table and the column
// hasPrimaryKey  currently unused by this function
//
// The caller is expected to have flushed the write batch beforehand (see the
// note in the body). Returns 0 on success, terrno on allocation failure, or
// the error reported by the rocksdb lookup / value deserialization.
//
// All temporary buffers, including the value buffers handed out by rocksdb,
// are released on every exit path.
static int32_t tsdbCacheDropTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, bool hasPrimaryKey) {
  int32_t code = 0;

  // build keys & multi get from rocks
  char **keys_list = taosMemoryCalloc(2, sizeof(char *));
  if (!keys_list) {
    return terrno;
  }
  size_t *keys_list_sizes = taosMemoryCalloc(2, sizeof(size_t));
  if (!keys_list_sizes) {
    code = terrno;  // capture before the free below can disturb it
    taosMemoryFree(keys_list);
    return code;
  }
  const size_t klen = ROCKS_KEY_LEN;

  // both keys live in one allocation; keys_list[0] is the base pointer that
  // gets freed at _exit
  char *keys = taosMemoryCalloc(2, sizeof(SLastKey));
  if (!keys) {
    code = terrno;
    taosMemoryFree(keys_list);
    taosMemoryFree(keys_list_sizes);
    return code;
  }
  ((SLastKey *)keys)[0] = (SLastKey){.lflag = LFLAG_LAST, .uid = uid, .cid = cid};
  ((SLastKey *)keys)[1] = (SLastKey){.lflag = LFLAG_LAST_ROW, .uid = uid, .cid = cid};

  keys_list[0] = keys;
  keys_list[1] = keys + sizeof(SLastKey);
  keys_list_sizes[0] = klen;
  keys_list_sizes[1] = klen;

  char  **values_list = NULL;
  size_t *values_list_sizes = NULL;

  // was written by caller
  // rocksMayWrite(pTsdb, true); // flush writebatch cache

  TAOS_CHECK_GOTO(tsdbCacheGetValuesFromRocks(pTsdb, 2, (const char *const *)keys_list, keys_list_sizes, &values_list,
                                              &values_list_sizes),
                  NULL, _exit);

#ifdef USE_ROCKSDB
  rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;

  // queue a delete for every key that currently has a (deserializable) value
  for (int i = 0; i < 2; i++) {
    if (values_list[i] == NULL) {
      continue;
    }

    SLastCol *pLastCol = NULL;
    code = tsdbCacheDeserialize(values_list[i], values_list_sizes[i], &pLastCol);
    if (code != TSDB_CODE_SUCCESS) {
      tsdbError("vgId:%d, %s deserialize failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
                tstrerror(code));
      goto _exit;
    }
    if (NULL != pLastCol) {
      rocksdb_writebatch_delete(wb, keys_list[i], klen);
    }
    taosMemoryFreeClear(pLastCol);
  }
#endif

  // drop the in-memory copies as well
  for (int i = 0; i < 2; i++) {
    LRUHandle *h = taosLRUCacheLookup(pTsdb->lruCache, keys_list[i], klen);
    if (h) {
      tsdbLRUCacheRelease(pTsdb->lruCache, h, true);
      taosLRUCacheErase(pTsdb->lruCache, keys_list[i], klen);
    }
  }

_exit:
#ifdef USE_ROCKSDB
  // value buffers come from rocksdb and must go back through rocksdb_free;
  // doing it here also covers the deserialize-failure path
  if (values_list) {
    rocksdb_free(values_list[0]);
    rocksdb_free(values_list[1]);
  }
#endif
  taosMemoryFree(keys_list[0]);

  taosMemoryFree(keys_list);
  taosMemoryFree(keys_list_sizes);
  taosMemoryFree(values_list);
  taosMemoryFree(values_list_sizes);

  TAOS_RETURN(code);
}
1121

1122
// Creates placeholder cache entries (one LFLAG_LAST_ROW and one LFLAG_LAST) for
// every column of a newly created table, under lruMutex.
//
// When suid < 0 the column list is taken from pSchemaRow; otherwise the schema
// is fetched with metaGetTbTSchemaEx(suid, uid) and a failure of that lookup is
// returned right away. Failures of individual column insertions are only
// traced; the value returned is the code of the last insertion attempted.
int32_t tsdbCacheNewTable(STsdb *pTsdb, tb_uid_t uid, tb_uid_t suid, const SSchemaWrapper *pSchemaRow) {
  int32_t code = 0;

  (void)taosThreadMutexLock(&pTsdb->lruMutex);

  if (suid >= 0) {
    STSchema *pTSchema = NULL;

    code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, -1, &pTSchema);
    if (TSDB_CODE_SUCCESS != code) {
      (void)taosThreadMutexUnlock(&pTsdb->lruMutex);

      TAOS_RETURN(code);
    }

    for (int iCol = 0; iCol < pTSchema->numOfCols; ++iCol) {
      int16_t colId = pTSchema->columns[iCol].colId;
      int8_t  colType = pTSchema->columns[iCol].type;

      code = tsdbCacheNewTableColumn(pTsdb, uid, colId, colType, LFLAG_LAST_ROW);
      if (TSDB_CODE_SUCCESS != code) {
        tsdbTrace("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
                  tstrerror(code));
      }

      code = tsdbCacheNewTableColumn(pTsdb, uid, colId, colType, LFLAG_LAST);
      if (TSDB_CODE_SUCCESS != code) {
        tsdbTrace("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
                  tstrerror(code));
      }
    }

    taosMemoryFree(pTSchema);
  } else {
    for (int iCol = 0; iCol < pSchemaRow->nCols; ++iCol) {
      int16_t colId = pSchemaRow->pSchema[iCol].colId;
      int8_t  colType = pSchemaRow->pSchema[iCol].type;

      code = tsdbCacheNewTableColumn(pTsdb, uid, colId, colType, LFLAG_LAST_ROW);
      if (TSDB_CODE_SUCCESS != code) {
        tsdbTrace("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
                  tstrerror(code));
      }

      code = tsdbCacheNewTableColumn(pTsdb, uid, colId, colType, LFLAG_LAST);
      if (TSDB_CODE_SUCCESS != code) {
        tsdbTrace("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
                  tstrerror(code));
      }
    }
  }

  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);

  TAOS_RETURN(code);
}
1175

1176
// Removes the cache entries of every column of a table, under lruMutex.
//
// The cache is first committed with tsdbCacheCommitNoLock (a failure is only
// traced). The column list comes from pSchemaRow when it is non-NULL,
// otherwise from metaGetTbTSchemaEx(suid, uid); a failure of that lookup is
// returned right away. Per-column drop failures are only traced; the value
// returned is the code of the last operation performed.
int32_t tsdbCacheDropTable(STsdb *pTsdb, tb_uid_t uid, tb_uid_t suid, SSchemaWrapper *pSchemaRow) {
  int32_t code = 0;

  (void)taosThreadMutexLock(&pTsdb->lruMutex);

  code = tsdbCacheCommitNoLock(pTsdb);
  if (TSDB_CODE_SUCCESS != code) {
    tsdbTrace("vgId:%d, %s commit with no lock failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
              tstrerror(code));
  }

  if (NULL == pSchemaRow) {
    STSchema *pTSchema = NULL;

    code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, -1, &pTSchema);
    if (TSDB_CODE_SUCCESS != code) {
      (void)taosThreadMutexUnlock(&pTsdb->lruMutex);

      TAOS_RETURN(code);
    }

    int  colCount = pTSchema->numOfCols;
    // the second column carries the COL_IS_KEY flag when the table has a primary key
    bool withPk = (colCount >= 2) && ((pTSchema->columns[1].flags & COL_IS_KEY) ? true : false);

    for (int iCol = 0; iCol < colCount; ++iCol) {
      int16_t colId = pTSchema->columns[iCol].colId;

      code = tsdbCacheDropTableColumn(pTsdb, uid, colId, withPk);
      if (TSDB_CODE_SUCCESS != code) {
        tsdbTrace("vgId:%d, %s drop table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
                  tstrerror(code));
      }
    }

    taosMemoryFree(pTSchema);
  } else {
    int  colCount = pSchemaRow->nCols;
    bool withPk = (colCount >= 2) && ((pSchemaRow->pSchema[1].flags & COL_IS_KEY) ? true : false);

    for (int iCol = 0; iCol < colCount; ++iCol) {
      int16_t colId = pSchemaRow->pSchema[iCol].colId;

      code = tsdbCacheDropTableColumn(pTsdb, uid, colId, withPk);
      if (TSDB_CODE_SUCCESS != code) {
        tsdbTrace("vgId:%d, %s drop table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
                  tstrerror(code));
      }
    }
  }

  rocksMayWrite(pTsdb, false);

  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);

  TAOS_RETURN(code);
}
1237

1238
// Removes the cache entries of every column for each table uid in `uids`,
// using the schema obtained from metaGetTbTSchemaEx(suid, suid). Runs under
// lruMutex.
//
// The cache is committed first (failure only traced). A schema lookup failure
// is returned right away. Per-column drop failures are only traced; the value
// returned is the code of the last operation performed.
int32_t tsdbCacheDropSubTables(STsdb *pTsdb, SArray *uids, tb_uid_t suid) {
  int32_t   code = 0;
  STSchema *pTSchema = NULL;

  (void)taosThreadMutexLock(&pTsdb->lruMutex);

  code = tsdbCacheCommitNoLock(pTsdb);
  if (TSDB_CODE_SUCCESS != code) {
    tsdbTrace("vgId:%d, %s commit with no lock failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
              tstrerror(code));
  }

  code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, suid, -1, &pTSchema);
  if (TSDB_CODE_SUCCESS != code) {
    (void)taosThreadMutexUnlock(&pTsdb->lruMutex);

    TAOS_RETURN(code);
  }

  for (int iTb = 0; iTb < TARRAY_SIZE(uids); ++iTb) {
    int64_t tbUid = ((tb_uid_t *)TARRAY_DATA(uids))[iTb];
    int     colCount = pTSchema->numOfCols;
    // the second column carries the COL_IS_KEY flag when the table has a primary key
    bool    withPk = (colCount >= 2) && ((pTSchema->columns[1].flags & COL_IS_KEY) ? true : false);

    for (int iCol = 0; iCol < colCount; ++iCol) {
      int16_t colId = pTSchema->columns[iCol].colId;

      code = tsdbCacheDropTableColumn(pTsdb, tbUid, colId, withPk);
      if (TSDB_CODE_SUCCESS != code) {
        tsdbTrace("vgId:%d, %s drop table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
                  tstrerror(code));
      }
    }
  }

  taosMemoryFree(pTSchema);

  rocksMayWrite(pTsdb, false);

  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);

  TAOS_RETURN(code);
}
1286

1287
// Adds placeholder cache entries for a column newly added to one table: one
// entry with lflag 0 and one with lflag 1, both inserted under lruMutex.
// Insertion failures are traced; the code of the second insertion is returned.
int32_t tsdbCacheNewNTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, int8_t col_type) {
  int32_t code;

  (void)taosThreadMutexLock(&pTsdb->lruMutex);

  if ((code = tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, 0)) != TSDB_CODE_SUCCESS) {
    tsdbTrace("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
              tstrerror(code));
  }

  if ((code = tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, 1)) != TSDB_CODE_SUCCESS) {
    tsdbTrace("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
              tstrerror(code));
  }

  // rocksMayWrite(pTsdb, true, false, false);
  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);

  TAOS_RETURN(code);
}
1307

1308
// Removes the cache entries of one column of one table, under lruMutex: the
// cache is committed first, then the column is dropped and rocksMayWrite is
// called with `false`. Failures of the commit and of the drop are traced; the
// code of the drop is returned.
int32_t tsdbCacheDropNTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, bool hasPrimayKey) {
  int32_t code;

  (void)taosThreadMutexLock(&pTsdb->lruMutex);

  if ((code = tsdbCacheCommitNoLock(pTsdb)) != TSDB_CODE_SUCCESS) {
    tsdbTrace("vgId:%d, %s commit with no lock failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
              tstrerror(code));
  }

  if ((code = tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey)) != TSDB_CODE_SUCCESS) {
    tsdbTrace("vgId:%d, %s drop table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
              tstrerror(code));
  }

  rocksMayWrite(pTsdb, false);

  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);

  TAOS_RETURN(code);
}
1331

1332
// Adds placeholder cache entries for a newly added column to every table uid
// in `uids` (one entry with lflag 0 and one with lflag 1 per table), under
// lruMutex. Insertion failures are traced; the code of the last insertion
// attempted is returned (0 when `uids` is empty).
int32_t tsdbCacheNewSTableColumn(STsdb *pTsdb, SArray *uids, int16_t cid, int8_t col_type) {
  int32_t code = 0;

  (void)taosThreadMutexLock(&pTsdb->lruMutex);

  for (int iTb = 0; iTb < TARRAY_SIZE(uids); ++iTb) {
    tb_uid_t tbUid = ((tb_uid_t *)TARRAY_DATA(uids))[iTb];

    if ((code = tsdbCacheNewTableColumn(pTsdb, tbUid, cid, col_type, 0)) != TSDB_CODE_SUCCESS) {
      tsdbTrace("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
                tstrerror(code));
    }

    if ((code = tsdbCacheNewTableColumn(pTsdb, tbUid, cid, col_type, 1)) != TSDB_CODE_SUCCESS) {
      tsdbTrace("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
                tstrerror(code));
    }
  }

  // rocksMayWrite(pTsdb, true, false, false);
  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
  TAOS_RETURN(code);
}
1356

1357
// Removes the cache entries of one column for every table uid in `uids`, under
// lruMutex: the cache is committed first, each table's column is dropped, and
// rocksMayWrite is called with `false` at the end. Failures are traced; the
// code of the last operation performed is returned.
int32_t tsdbCacheDropSTableColumn(STsdb *pTsdb, SArray *uids, int16_t cid, bool hasPrimayKey) {
  int32_t code;

  (void)taosThreadMutexLock(&pTsdb->lruMutex);

  if ((code = tsdbCacheCommitNoLock(pTsdb)) != TSDB_CODE_SUCCESS) {
    tsdbTrace("vgId:%d, %s commit with no lock failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
              tstrerror(code));
  }

  for (int iTb = 0; iTb < TARRAY_SIZE(uids); ++iTb) {
    int64_t tbUid = ((tb_uid_t *)TARRAY_DATA(uids))[iTb];

    if ((code = tsdbCacheDropTableColumn(pTsdb, tbUid, cid, hasPrimayKey)) != TSDB_CODE_SUCCESS) {
      tsdbTrace("vgId:%d, %s drop table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
                tstrerror(code));
    }
  }

  rocksMayWrite(pTsdb, false);

  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);

  TAOS_RETURN(code);
}
1384

1385
// Pairs a cache key with the position of the update context it was built from.
// tsdbCacheUpdate collects one of these for every key that misses the LRU
// cache, so the matching entry of the update-context array can be found again
// after the rocksdb lookup.
typedef struct {
1386
  int      idx;  // index into the update-context array
1387
  SLastKey key;  // cache key (lflag, uid, cid) looked up in rocksdb
1388
} SIdxKey;
1389

1390
// Resets a cached SLastCol to the "none" state in place.
//
// The row key timestamp becomes TSKEY_MIN and the primary keys are cleared
// (var-data payloads with nData > 0 are freed, everything else goes through
// valueClearDatum) before numOfPKs is set to 0. The column value is cleared
// the same way and then replaced by COL_VAL_NONE with the same cid and type.
// Finally the entry is marked dirty and given the requested cacheStatus.
static void tsdbCacheUpdateLastColToNone(SLastCol *pLastCol, ELastCacheStatus cacheStatus) {
  SValue *pColValue = &pLastCol->colVal.value;

  // update rowkey
  pLastCol->rowKey.ts = TSKEY_MIN;

  int8_t iPk = 0;
  while (iPk < pLastCol->rowKey.numOfPKs) {
    SValue *pPk = &pLastCol->rowKey.pks[iPk];

    if (!IS_VAR_DATA_TYPE(pPk->type) || !(pPk->nData > 0)) {
      valueClearDatum(pPk, pPk->type);
    } else {
      taosMemoryFreeClear(pPk->pData);
      pPk->nData = 0;
    }

    iPk++;
  }
  pLastCol->rowKey.numOfPKs = 0;

  // update colval
  if (!IS_VAR_DATA_TYPE(pColValue->type) || !(pColValue->nData > 0)) {
    valueClearDatum(pColValue, pColValue->type);
  } else {
    taosMemoryFreeClear(pColValue->pData);
    pColValue->nData = 0;
  }

  pLastCol->colVal = COL_VAL_NONE(pLastCol->colVal.cid, pLastCol->colVal.value.type);
  pLastCol->cacheStatus = cacheStatus;
  pLastCol->dirty = 1;
}
×
1416

1417
// Serializes an SLastCol and queues it as a put on the tsdb's rocksdb write
// batch under key pLastKey (ROCKS_KEY_LEN bytes). The batch is modified while
// holding rCache.writeBatchMutex; the serialized buffer is freed afterwards.
//
// Returns 0 on success or the serialization error. Without USE_ROCKSDB this is
// a no-op returning 0.
static int32_t tsdbCachePutToRocksdb(STsdb *pTsdb, SLastKey *pLastKey, SLastCol *pLastCol) {
  int32_t code = 0;
#ifdef USE_ROCKSDB
  size_t valueLen = 0;
  char  *pSerialized = NULL;

  code = tsdbCacheSerialize(pLastCol, &pSerialized, &valueLen);
  if (0 != code) {
    tsdbError("tsdb/cache/putrocks: vgId:%d, serialize failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code));
    TAOS_RETURN(code);
  }

  rocksdb_writebatch_t *pBatch = pTsdb->rCache.writebatch;

  (void)taosThreadMutexLock(&pTsdb->rCache.writeBatchMutex);
  rocksdb_writebatch_put(pBatch, (char *)pLastKey, ROCKS_KEY_LEN, pSerialized, valueLen);
  (void)taosThreadMutexUnlock(&pTsdb->rCache.writeBatchMutex);

  taosMemoryFree(pSerialized);
#endif
  TAOS_RETURN(code);
}
1438

1439
// Inserts a deep copy of *pLastCol into the LRU cache under pLastKey.
//
// A heap SLastCol is allocated, filled from *pLastCol with its dirty flag set
// to `dirty`, given private payload buffers via tsdbCacheReallocSLastCol, and
// handed to taosLRUCacheInsert together with the computed charge,
// tsdbCacheDeleter and tsdbCacheOverWriter.
//
// Returns 0 on success, terrno when the entry cannot be allocated, the
// re-allocation error, or TSDB_CODE_FAILED when the insert reports a status
// other than OK / OK_OVERWRITTEN.
static int32_t tsdbCachePutToLRU(STsdb *pTsdb, SLastKey *pLastKey, SLastCol *pLastCol, int8_t dirty) {
  int32_t code = 0;
  int32_t lino = 0;
  size_t  cost = 0;

  SLastCol *pEntry = taosMemoryCalloc(1, sizeof(SLastCol));
  if (NULL == pEntry) {
    return terrno;
  }

  *pEntry = *pLastCol;
  pEntry->dirty = dirty;
  TAOS_CHECK_EXIT(tsdbCacheReallocSLastCol(pEntry, &cost));

  LRUStatus status = taosLRUCacheInsert(pTsdb->lruCache, pLastKey, ROCKS_KEY_LEN, pEntry, cost, tsdbCacheDeleter,
                                        tsdbCacheOverWriter, NULL, TAOS_LRU_PRIORITY_LOW, pTsdb);
  if (!(TAOS_LRU_STATUS_OK == status || TAOS_LRU_STATUS_OK_OVERWRITTEN == status)) {
    tsdbError("vgId:%d, %s failed at line %d status %d.", TD_VID(pTsdb->pVnode), __func__, __LINE__, status);
    code = TSDB_CODE_FAILED;
    // the entry is not freed again below on this path
    pEntry = NULL;
  }

_exit:
  if (code != TSDB_CODE_SUCCESS) {
    taosMemoryFree(pEntry);
    tsdbError("tsdb/cache/putlru: vgId:%d, failed at line %d since %s.", TD_VID(pTsdb->pVnode), lino, tstrerror(code));
  }

  TAOS_RETURN(code);
}
1468

1469
// Applies a batch of column updates for table `uid` to the last/last_row cache.
//
// updCtxArray holds SLastUpdateCtx items (lflag, row key, column value). For
// each item, under lruMutex:
//   * LFLAG_LAST updates whose column value is not an actual value are skipped;
//   * if the key is present in the LRU cache and its status is not
//     TSDB_LAST_CACHE_NO_CACHE, the entry is replaced (dirty) when the new row
//     key is greater, or equal with a column value that is not NONE;
//   * keys missing from the LRU cache are collected, looked up in rocksdb in
//     one batch, and written to both rocksdb and the LRU cache (not dirty)
//     when absent there or older than the update.
//
// `suid` is not used by this function. Returns 0 on success (including an
// empty or NULL array) or the first error encountered. All temporary buffers
// are released at the single exit point, whichever path reaches it.
static int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SArray *updCtxArray) {
  if (!updCtxArray || TARRAY_SIZE(updCtxArray) == 0) {
    TAOS_RETURN(TSDB_CODE_SUCCESS);
  }

  int32_t code = 0, lino = 0;

  int        num_keys = TARRAY_SIZE(updCtxArray);
  SArray    *remainCols = NULL;
  SLRUCache *pCache = pTsdb->lruCache;

  // temporaries of the rocksdb lookup; declared here so that _exit can always
  // release them
  char  **keys_list = NULL;
  size_t *keys_list_sizes = NULL;
  char  **values_list = NULL;
  size_t *values_list_sizes = NULL;

  (void)taosThreadMutexLock(&pTsdb->lruMutex);
  for (int i = 0; i < num_keys; ++i) {
    SLastUpdateCtx *updCtx = &((SLastUpdateCtx *)TARRAY_DATA(updCtxArray))[i];
    int8_t          lflag = updCtx->lflag;
    SRowKey        *pRowKey = &updCtx->tsdbRowKey.key;
    SColVal        *pColVal = &updCtx->colVal;

    if (lflag == LFLAG_LAST && !COL_VAL_IS_VALUE(pColVal)) {
      continue;
    }

    SLastKey  *key = &(SLastKey){.lflag = lflag, .uid = uid, .cid = pColVal->cid};
    LRUHandle *h = taosLRUCacheLookup(pCache, key, ROCKS_KEY_LEN);
    if (h) {
      SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h);
      if (pLastCol->cacheStatus != TSDB_LAST_CACHE_NO_CACHE) {
        int32_t cmp_res = tRowKeyCompare(&pLastCol->rowKey, pRowKey);
        if (cmp_res < 0 || (cmp_res == 0 && !COL_VAL_IS_NONE(pColVal))) {
          SLastCol newLastCol = {
              .rowKey = *pRowKey, .colVal = *pColVal, .dirty = 1, .cacheStatus = TSDB_LAST_CACHE_VALID};
          code = tsdbCachePutToLRU(pTsdb, key, &newLastCol, 1);
        }
      }

      // release the handle before acting on a possible insert failure
      tsdbLRUCacheRelease(pCache, h, false);
      TAOS_CHECK_EXIT(code);
    } else {
      if (!remainCols) {
        remainCols = taosArrayInit(num_keys * 2, sizeof(SIdxKey));
        if (!remainCols) {
          TAOS_CHECK_GOTO(terrno, &lino, _exit);
        }
      }
      if (!taosArrayPush(remainCols, &(SIdxKey){i, *key})) {
        TAOS_CHECK_GOTO(terrno, &lino, _exit);
      }
    }
  }

  if (remainCols) {
    num_keys = TARRAY_SIZE(remainCols);
  }
  if (remainCols && num_keys > 0) {
    keys_list = taosMemoryCalloc(num_keys, sizeof(char *));
    if (!keys_list) {
      code = terrno;
      lino = __LINE__;
      goto _exit;
    }
    keys_list_sizes = taosMemoryCalloc(num_keys, sizeof(size_t));
    if (!keys_list_sizes) {
      code = terrno;
      lino = __LINE__;
      goto _exit;
    }
    // the key pointers refer into remainCols, which outlives them
    for (int i = 0; i < num_keys; ++i) {
      SIdxKey *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[i];

      keys_list[i] = (char *)&idxKey->key;
      keys_list_sizes[i] = ROCKS_KEY_LEN;
    }

    rocksMayWrite(pTsdb, true);  // flush writebatch cache

    code = tsdbCacheGetValuesFromRocks(pTsdb, num_keys, (const char *const *)keys_list, keys_list_sizes, &values_list,
                                       &values_list_sizes);
    if (code) {
      goto _exit;
    }

    for (int i = 0; i < num_keys; ++i) {
      SIdxKey        *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[i];
      SLastUpdateCtx *updCtx = (SLastUpdateCtx *)taosArrayGet(updCtxArray, idxKey->idx);
      SRowKey        *pRowKey = &updCtx->tsdbRowKey.key;
      SColVal        *pColVal = &updCtx->colVal;

      SLastCol *pLastCol = NULL;
      // values_list stays NULL when the lookup produced no output arrays
      if (values_list != NULL && values_list[i] != NULL) {
        code = tsdbCacheDeserialize(values_list[i], values_list_sizes[i], &pLastCol);
        if (code != TSDB_CODE_SUCCESS) {
          tsdbError("vgId:%d, %s deserialize failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
                    tstrerror(code));
          goto _exit;
        }
      }
      SLastCol *pToFree = pLastCol;

      if (pLastCol && pLastCol->cacheStatus == TSDB_LAST_CACHE_NO_CACHE) {
        if ((code = tsdbCachePutToLRU(pTsdb, &idxKey->key, pLastCol, 0)) != TSDB_CODE_SUCCESS) {
          tsdbError("tsdb/cache: vgId:%d, put lru failed at line %d since %s.", TD_VID(pTsdb->pVnode), __LINE__,
                    tstrerror(code));
          taosMemoryFreeClear(pToFree);
          break;
        }

        // cache invalid => skip update
        taosMemoryFreeClear(pToFree);
        continue;
      }

      if (IS_LAST_KEY(idxKey->key) && !COL_VAL_IS_VALUE(pColVal)) {
        taosMemoryFreeClear(pToFree);
        continue;
      }

      // with no stored value the update always wins
      int32_t cmp_res = 1;
      if (pLastCol) {
        cmp_res = tRowKeyCompare(&pLastCol->rowKey, pRowKey);
      }

      if (NULL == pLastCol || cmp_res < 0 || (cmp_res == 0 && !COL_VAL_IS_NONE(pColVal))) {
        SLastCol lastColTmp = {
            .rowKey = *pRowKey, .colVal = *pColVal, .dirty = 0, .cacheStatus = TSDB_LAST_CACHE_VALID};
        if ((code = tsdbCachePutToRocksdb(pTsdb, &idxKey->key, &lastColTmp)) != TSDB_CODE_SUCCESS) {
          tsdbError("tsdb/cache: vgId:%d, put rocks failed at line %d since %s.", TD_VID(pTsdb->pVnode), __LINE__,
                    tstrerror(code));
          taosMemoryFreeClear(pToFree);
          break;
        }
        if ((code = tsdbCachePutToLRU(pTsdb, &idxKey->key, &lastColTmp, 0)) != TSDB_CODE_SUCCESS) {
          tsdbError("tsdb/cache: vgId:%d, put lru failed at line %d since %s.", TD_VID(pTsdb->pVnode), __LINE__,
                    tstrerror(code));
          taosMemoryFreeClear(pToFree);
          break;
        }
      }

      taosMemoryFreeClear(pToFree);
    }

    rocksMayWrite(pTsdb, false);
  }

_exit:
  // single cleanup point: every pointer below is either NULL or owned here
  taosMemoryFree(keys_list);
  taosMemoryFree(keys_list_sizes);
  if (values_list) {
#ifdef USE_ROCKSDB
    // values_list is only set after a successful lookup of num_keys keys
    for (int i = 0; i < num_keys; ++i) {
      rocksdb_free(values_list[i]);
    }
#endif
    taosMemoryFree(values_list);
  }
  taosMemoryFree(values_list_sizes);

  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
  taosArrayDestroy(remainCols);

  if (code) {
    tsdbError("tsdb/cache: vgId:%d, update failed at line %d since %s.", TD_VID(pTsdb->pVnode), __LINE__,
              tstrerror(code));
  }

  TAOS_RETURN(code);
}
1648

1649
int32_t tsdbCacheRowFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int64_t version, int32_t nRow,
×
1650
                                 SRow **aRow) {
1651
  int32_t code = 0, lino = 0;
×
1652

1653
  // 1. prepare last
1654
  TSDBROW      lRow = {.type = TSDBROW_ROW_FMT, .pTSRow = aRow[nRow - 1], .version = version};
×
1655
  STSchema    *pTSchema = NULL;
×
1656
  int32_t      sver = TSDBROW_SVERSION(&lRow);
×
1657
  SSHashObj   *iColHash = NULL;
×
1658
  STSDBRowIter iter = {0};
×
1659

1660
  TAOS_CHECK_GOTO(tsdbUpdateSkm(pTsdb, suid, uid, sver), &lino, _exit);
×
1661
  pTSchema = pTsdb->rCache.pTSchema;
×
1662

1663
  TSDBROW tRow = {.type = TSDBROW_ROW_FMT, .version = version};
×
1664
  int32_t nCol = pTSchema->numOfCols;
×
1665
  SArray *ctxArray = pTsdb->rCache.ctxArray;
×
1666

1667
  // 1. prepare by lrow
1668
  STsdbRowKey tsdbRowKey = {0};
×
1669
  tsdbRowGetKey(&lRow, &tsdbRowKey);
×
1670

1671
  TAOS_CHECK_GOTO(tsdbRowIterOpen(&iter, &lRow, pTSchema), &lino, _exit);
×
1672

1673
  int32_t iCol = 0;
×
1674
  for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal && iCol < nCol; pColVal = tsdbRowIterNext(&iter), iCol++) {
×
1675
    SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST_ROW, .tsdbRowKey = tsdbRowKey, .colVal = *pColVal};
×
1676
    if (!taosArrayPush(ctxArray, &updateCtx)) {
×
1677
      TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
1678
    }
1679

1680
    if (COL_VAL_IS_VALUE(pColVal)) {
×
1681
      updateCtx.lflag = LFLAG_LAST;
×
1682
      if (!taosArrayPush(ctxArray, &updateCtx)) {
×
1683
        TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
1684
      }
1685
    } else {
1686
      if (!iColHash) {
×
1687
        iColHash = tSimpleHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
×
1688
        if (iColHash == NULL) {
×
1689
          TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit);
×
1690
        }
1691
      }
1692

1693
      if (tSimpleHashPut(iColHash, &iCol, sizeof(iCol), NULL, 0)) {
×
1694
        TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit);
×
1695
      }
1696
    }
1697
  }
1698

1699
  // 2. prepare by the other rows
1700
  for (int32_t iRow = nRow - 2; iRow >= 0; --iRow) {
×
1701
    if (tSimpleHashGetSize(iColHash) == 0) {
×
1702
      break;
×
1703
    }
1704

1705
    tRow.pTSRow = aRow[iRow];
×
1706

1707
    STsdbRowKey tsdbRowKey = {0};
×
1708
    tsdbRowGetKey(&tRow, &tsdbRowKey);
×
1709

1710
    void   *pIte = NULL;
×
1711
    int32_t iter = 0;
×
1712
    while ((pIte = tSimpleHashIterate(iColHash, pIte, &iter)) != NULL) {
×
1713
      int32_t iCol = ((int32_t *)pIte)[0];
×
1714
      SColVal colVal = COL_VAL_NONE(0, 0);
×
1715
      tsdbRowGetColVal(&tRow, pTSchema, iCol, &colVal);
×
1716

1717
      if (COL_VAL_IS_VALUE(&colVal)) {
×
1718
        SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST, .tsdbRowKey = tsdbRowKey, .colVal = colVal};
×
1719
        if (!taosArrayPush(ctxArray, &updateCtx)) {
×
1720
          TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
1721
        }
1722
        code = tSimpleHashIterateRemove(iColHash, &iCol, sizeof(iCol), &pIte, &iter);
×
1723
        if (code != TSDB_CODE_SUCCESS) {
×
1724
          tsdbTrace("vgId:%d, %s tSimpleHashIterateRemove failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__,
×
1725
                    __LINE__, tstrerror(code));
1726
        }
1727
      }
1728
    }
1729
  }
1730

1731
  TAOS_CHECK_GOTO(tsdbCacheUpdate(pTsdb, suid, uid, ctxArray), &lino, _exit);
×
1732

1733
_exit:
×
1734
  if (code) {
×
1735
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
×
1736
  }
1737

1738
  tsdbRowClose(&iter);
×
1739
  tSimpleHashCleanup(iColHash);
×
1740
  taosArrayClear(ctxArray);
×
1741

1742
  TAOS_RETURN(code);
×
1743
}
1744

1745
/*
 * Update the last / last_row cache of table `uid` from a column-format data block.
 *
 * Collected contexts, in order:
 *   - an LFLAG_LAST context for the timestamp column, taken from the last row of the block;
 *   - for every column whose flag has HAS_VALUE set, an LFLAG_LAST context for the first row,
 *     scanning from the last row backwards, whose bit value is 2;
 *   - an LFLAG_LAST_ROW context for every column value of the last row.
 * The contexts are then passed to tsdbCacheUpdate().
 *
 * Returns 0 on success, otherwise the error code of the failing step. The schema fetched here and
 * the context array are released before returning.
 */
int32_t tsdbCacheColFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SBlockData *pBlockData) {
  int32_t      code = 0, lino = 0;
  STSDBRowIter rowIter = {0};
  STSchema    *pSchema = NULL;
  SArray      *pCtxList = NULL;

  TSDBROW lastRow = tsdbRowFromBlockData(pBlockData, pBlockData->nRow - 1);
  int32_t sver = TSDBROW_SVERSION(&lastRow);

  TAOS_CHECK_RETURN(metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pSchema));

  if ((pCtxList = taosArrayInit(pBlockData->nColData, sizeof(SLastUpdateCtx))) == NULL) {
    TAOS_CHECK_GOTO(terrno, &lino, _exit);
  }

  // key of the last row; shared by the timestamp context and all last_row contexts
  STsdbRowKey lastRowKey = {0};
  tsdbRowGetKey(&lastRow, &lastRowKey);

  // timestamp column
  SValue tsVal = {.type = TSDB_DATA_TYPE_TIMESTAMP};
  VALUE_SET_TRIVIAL_DATUM(&tsVal, lastRow.pBlockData->aTSKEY[lastRow.iRow]);
  SLastUpdateCtx tsCtx = {
      .lflag = LFLAG_LAST, .tsdbRowKey = lastRowKey, .colVal = COL_VAL_VALUE(PRIMARYKEY_TIMESTAMP_COL_ID, tsVal)};
  if (!taosArrayPush(pCtxList, &tsCtx)) {
    TAOS_CHECK_GOTO(terrno, &lino, _exit);
  }

  // "last" of every data column: walk rows backwards until one carries a value
  TSDBROW scanRow = tsdbRowFromBlockData(pBlockData, 0);

  for (int32_t i = 0; i < pBlockData->nColData; ++i) {
    SColData *pCol = &pBlockData->aColData[i];

    if ((pCol->flag & HAS_VALUE) == HAS_VALUE) {
      for (scanRow.iRow = pBlockData->nRow - 1; scanRow.iRow >= 0; --scanRow.iRow) {
        STsdbRowKey curKey = {0};
        tsdbRowGetKey(&scanRow, &curKey);

        if (tColDataGetBitValue(pCol, scanRow.iRow) != 2) {
          continue;
        }

        SColVal found = COL_VAL_NONE(pCol->cid, pCol->type);
        TAOS_CHECK_GOTO(tColDataGetValue(pCol, scanRow.iRow, &found), &lino, _exit);

        SLastUpdateCtx lastCtx = {.lflag = LFLAG_LAST, .tsdbRowKey = curKey, .colVal = found};
        if (!taosArrayPush(pCtxList, &lastCtx)) {
          TAOS_CHECK_GOTO(terrno, &lino, _exit);
        }
        break;
      }
    }
  }

  // "last_row": every column value of the last row, whatever it holds
  TAOS_CHECK_GOTO(tsdbRowIterOpen(&rowIter, &lastRow, pSchema), &lino, _exit);

  SColVal *pColVal = tsdbRowIterNext(&rowIter);
  while (pColVal != NULL) {
    SLastUpdateCtx lastRowCtx = {.lflag = LFLAG_LAST_ROW, .tsdbRowKey = lastRowKey, .colVal = *pColVal};
    if (!taosArrayPush(pCtxList, &lastRowCtx)) {
      TAOS_CHECK_GOTO(terrno, &lino, _exit);
    }
    pColVal = tsdbRowIterNext(&rowIter);
  }

  TAOS_CHECK_GOTO(tsdbCacheUpdate(pTsdb, suid, uid, pCtxList), &lino, _exit);

_exit:
  tsdbRowClose(&rowIter);
  taosMemoryFreeClear(pSchema);
  taosArrayDestroy(pCtxList);

  TAOS_RETURN(code);
}
1819

1820
// Forward declarations of the two loaders used by tsdbCacheLoadFromRaw() below; their definitions
// are outside this part of the file. Each receives a list of column ids (aCols, nCols entries) with
// matching slot ids and returns its result through *ppLastArray.

// Called for the keys for which IS_LAST_KEY() holds.
static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols,
                            int nCols, int16_t *slotIds);

// Called for the remaining keys.
static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols,
                               int nCols, int16_t *slotIds);
1825

1826
/*
 * Resolve the columns listed in remainCols by loading them through mergeLastCid() /
 * mergeLastRowCid(), then publish each result to pLastArray and store it in both the rocksdb
 * cache and the LRU cache.
 *
 * If the first remaining key is not the primary timestamp column, a timestamp key is inserted at
 * position 0 of remainCols (extraTS). The value loaded for that extra key is written to the caches
 * but is not copied into pLastArray.
 *
 * @param pTsdb       tsdb instance
 * @param uid         table uid
 * @param pLastArray  result array; entries are overwritten at idxKey->idx
 * @param remainCols  array of SIdxKey still to be resolved; may grow by one (see above)
 * @param pr          reader supplying pSlotIds and pSchema
 * @param ltype       lflag used for the extra timestamp key
 * @return 0 on success, otherwise the error code of the failing step
 */
static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SArray *remainCols,
                                    SCacheRowsReader *pr, int8_t ltype) {
  int32_t code = 0, lino = 0;
  // rocksdb_writebatch_t *wb = NULL;
  SArray *pTmpColArray = NULL;
  bool    extraTS = false;

  SIdxKey *idxKey = taosArrayGet(remainCols, 0);
  if (idxKey->key.cid != PRIMARYKEY_TIMESTAMP_COL_ID) {
    // ignore 'ts' loaded from cache and load it from tsdb
    // SLastCol *pLastCol = taosArrayGet(pLastArray, 0);
    // tsdbCacheUpdateLastColToNone(pLastCol, TSDB_LAST_CACHE_NO_CACHE);

    SLastKey *key = &(SLastKey){.lflag = ltype, .uid = uid, .cid = PRIMARYKEY_TIMESTAMP_COL_ID};
    if (!taosArrayInsert(remainCols, 0, &(SIdxKey){0, *key})) {
      // nothing has been allocated yet, so a direct return does not leak
      TAOS_RETURN(terrno);
    }

    extraTS = true;
  }

  int      num_keys = TARRAY_SIZE(remainCols);
  int16_t *slotIds = taosMemoryMalloc(num_keys * sizeof(int16_t));

  // per-kind (last / last_row) column-id and slot-id lists handed to the merge functions
  int16_t *lastColIds = NULL, *lastSlotIds = NULL, *lastrowColIds = NULL, *lastrowSlotIds = NULL;
  lastColIds = taosMemoryMalloc(num_keys * sizeof(int16_t));
  lastSlotIds = taosMemoryMalloc(num_keys * sizeof(int16_t));
  lastrowColIds = taosMemoryMalloc(num_keys * sizeof(int16_t));
  lastrowSlotIds = taosMemoryMalloc(num_keys * sizeof(int16_t));
  SArray *lastTmpColArray = NULL, *lastTmpIndexArray = NULL, *lastrowTmpColArray = NULL, *lastrowTmpIndexArray = NULL;

  int lastIndex = 0;
  int lastrowIndex = 0;

  // checked only after every pointer has been declared, so _exit can free all of them
  if (!slotIds || !lastColIds || !lastSlotIds || !lastrowColIds || !lastrowSlotIds) {
    TAOS_CHECK_EXIT(terrno);
  }

  // Split the keys into a "last" group and a "last_row" group; the *TmpIndexArray of each group
  // records the position in remainCols that each of its entries came from.
  for (int i = 0; i < num_keys; ++i) {
    SIdxKey *idxKey = taosArrayGet(remainCols, i);
    if (extraTS && !i) {
      // the inserted timestamp key uses slot 0
      slotIds[i] = 0;
    } else {
      slotIds[i] = pr->pSlotIds[idxKey->idx];
    }

    if (IS_LAST_KEY(idxKey->key)) {
      if (NULL == lastTmpIndexArray) {
        lastTmpIndexArray = taosArrayInit(num_keys, sizeof(int32_t));
        if (!lastTmpIndexArray) {
          TAOS_CHECK_EXIT(terrno);
        }
      }
      if (!taosArrayPush(lastTmpIndexArray, &(i))) {
        TAOS_CHECK_EXIT(terrno);
      }
      lastColIds[lastIndex] = idxKey->key.cid;
      if (extraTS && !i) {
        lastSlotIds[lastIndex] = 0;
      } else {
        lastSlotIds[lastIndex] = pr->pSlotIds[idxKey->idx];
      }
      lastIndex++;
    } else {
      if (NULL == lastrowTmpIndexArray) {
        lastrowTmpIndexArray = taosArrayInit(num_keys, sizeof(int32_t));
        if (!lastrowTmpIndexArray) {
          TAOS_CHECK_EXIT(terrno);
        }
      }
      if (!taosArrayPush(lastrowTmpIndexArray, &(i))) {
        TAOS_CHECK_EXIT(terrno);
      }
      lastrowColIds[lastrowIndex] = idxKey->key.cid;
      if (extraTS && !i) {
        lastrowSlotIds[lastrowIndex] = 0;
      } else {
        lastrowSlotIds[lastrowIndex] = pr->pSlotIds[idxKey->idx];
      }
      lastrowIndex++;
    }
  }

  // merged result of both groups, inserted at the positions recorded in the index arrays
  pTmpColArray = taosArrayInit(lastIndex + lastrowIndex, sizeof(SLastCol));
  if (!pTmpColArray) {
    TAOS_CHECK_EXIT(terrno);
  }

  if (lastTmpIndexArray != NULL) {
    TAOS_CHECK_EXIT(mergeLastCid(uid, pTsdb, &lastTmpColArray, pr, lastColIds, lastIndex, lastSlotIds));
    for (int i = 0; i < taosArrayGetSize(lastTmpColArray); i++) {
      if (!taosArrayInsert(pTmpColArray, *(int32_t *)taosArrayGet(lastTmpIndexArray, i),
                           taosArrayGet(lastTmpColArray, i))) {
        TAOS_CHECK_EXIT(terrno);
      }
    }
  }

  if (lastrowTmpIndexArray != NULL) {
    TAOS_CHECK_EXIT(mergeLastRowCid(uid, pTsdb, &lastrowTmpColArray, pr, lastrowColIds, lastrowIndex, lastrowSlotIds));
    for (int i = 0; i < taosArrayGetSize(lastrowTmpColArray); i++) {
      if (!taosArrayInsert(pTmpColArray, *(int32_t *)taosArrayGet(lastrowTmpIndexArray, i),
                           taosArrayGet(lastrowTmpColArray, i))) {
        TAOS_CHECK_EXIT(terrno);
      }
    }
  }

  // NOTE(review): pCache is not used in the rest of this function.
  SLRUCache *pCache = pTsdb->lruCache;
  for (int i = 0; i < num_keys; ++i) {
    SIdxKey  *idxKey = taosArrayGet(remainCols, i);
    SLastCol *pLastCol = NULL;

    if (pTmpColArray && TARRAY_SIZE(pTmpColArray) >= i + 1) {
      pLastCol = taosArrayGet(pTmpColArray, i);
    }

    // still null, then make up a none col value
    SLastCol noneCol = {.rowKey.ts = TSKEY_MIN,
                        .colVal = COL_VAL_NONE(idxKey->key.cid, pr->pSchema->columns[slotIds[i]].type),
                        .cacheStatus = TSDB_LAST_CACHE_VALID};
    if (!pLastCol) {
      pLastCol = &noneCol;
    }

    // the extra timestamp key (index 0 when extraTS) was not requested, so it is not published
    if (!extraTS || i > 0) {
      tsdbDebug("vgId:%d, %s qid:%s uid:%" PRId64 " from tsdb, col_id:%d col_flag:%d ts:%" PRId64,
                TD_VID(pTsdb->pVnode), __func__, pr && pr->idstr ? pr->idstr : "null", uid, pLastCol->colVal.cid,
                pLastCol->colVal.flag, pLastCol->rowKey.ts);
      taosArraySet(pLastArray, idxKey->idx, pLastCol);
    }

    // taosArrayRemove(remainCols, i);

    // skip the cache write-back when a group was requested but its merge produced no array
    if (/*!pTmpColArray*/ lastTmpIndexArray && !lastTmpColArray) {
      continue;
    }
    if (/*!pTmpColArray*/ lastrowTmpIndexArray && !lastrowTmpColArray) {
      continue;
    }

    // store result back to rocks cache
    code = tsdbCachePutToRocksdb(pTsdb, &idxKey->key, pLastCol);
    if (code) {
      tsdbError("vgId:%d, %s failed at line %d since %s.", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
      TAOS_CHECK_EXIT(code);
    }

    code = tsdbCachePutToLRU(pTsdb, &idxKey->key, pLastCol, 0);
    if (code) {
      tsdbError("vgId:%d, %s failed at line %d since %s.", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
      TAOS_CHECK_EXIT(code);
    }

    // the extra timestamp value was not handed to pLastArray, so it is released here
    if (extraTS && i == 0) {
      tsdbCacheFreeSLastColItem(pLastCol);
    }
  }

  rocksMayWrite(pTsdb, false);

_exit:
  taosArrayDestroy(lastrowTmpIndexArray);
  taosArrayDestroy(lastrowTmpColArray);
  taosArrayDestroy(lastTmpIndexArray);
  taosArrayDestroy(lastTmpColArray);

  taosMemoryFree(lastColIds);
  taosMemoryFree(lastSlotIds);
  taosMemoryFree(lastrowColIds);
  taosMemoryFree(lastrowSlotIds);

  taosArrayDestroy(pTmpColArray);

  taosMemoryFree(slotIds);

  TAOS_RETURN(code);
}
2004

2005
/*
 * Resolve the columns listed in remainCols from the rocksdb cache.
 *
 * All remaining keys are fetched in one batch. A fetched entry whose cacheStatus is not
 * TSDB_LAST_CACHE_NO_CACHE is put into the LRU cache, copied into pLastArray at idxKey->idx, and
 * removed from remainCols / ignoreFromRocks. Keys flagged in ignoreFromRocks are left in
 * remainCols without looking at the fetched value. Whatever is still in remainCols afterwards is
 * passed on to tsdbCacheLoadFromRaw().
 *
 * @param pTsdb            tsdb instance
 * @param uid              table uid
 * @param pLastArray       result array, overwritten at idxKey->idx for every resolved column
 * @param remainCols       array of SIdxKey still to be resolved; shrinks as columns are resolved
 * @param ignoreFromRocks  array of bool kept parallel to remainCols (entries are removed together)
 * @param pr               reader, forwarded to tsdbCacheLoadFromRaw()
 * @param ltype            forwarded to tsdbCacheLoadFromRaw()
 * @return 0 on success, otherwise the error code of the failing step
 */
static int32_t tsdbCacheLoadFromRocks(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SArray *remainCols,
                                      SArray *ignoreFromRocks, SCacheRowsReader *pr, int8_t ltype) {
  int32_t code = 0, lino = 0;
  int     num_keys = TARRAY_SIZE(remainCols);
  char  **keys_list = taosMemoryMalloc(num_keys * sizeof(char *));
  size_t *keys_list_sizes = taosMemoryMalloc(num_keys * sizeof(size_t));
  char   *key_list = taosMemoryMalloc(num_keys * ROCKS_KEY_LEN);
  if (!keys_list || !keys_list_sizes || !key_list) {
    // capture the error first, then release whichever of the three allocations succeeded
    code = terrno;
    taosMemoryFree(key_list);
    taosMemoryFree(keys_list);
    taosMemoryFree(keys_list_sizes);
    TAOS_RETURN(code);
  }
  char  **values_list = NULL;
  size_t *values_list_sizes = NULL;
  // key_list is one contiguous buffer; keys_list[i] points at the i-th ROCKS_KEY_LEN slice of it
  for (int i = 0; i < num_keys; ++i) {
    memcpy(key_list + i * ROCKS_KEY_LEN, &((SIdxKey *)taosArrayGet(remainCols, i))->key, ROCKS_KEY_LEN);
    keys_list[i] = key_list + i * ROCKS_KEY_LEN;
    keys_list_sizes[i] = ROCKS_KEY_LEN;
  }

  rocksMayWrite(pTsdb, true);  // flush writebatch cache

  code = tsdbCacheGetValuesFromRocks(pTsdb, num_keys, (const char *const *)keys_list, keys_list_sizes, &values_list,
                                     &values_list_sizes);
  if (code) {
    taosMemoryFree(key_list);
    taosMemoryFree(keys_list);
    taosMemoryFree(keys_list_sizes);
    TAOS_RETURN(code);
  }

  // i walks the fetched values (fixed order of the original keys); j is the position of the same
  // key inside the shrinking remainCols / ignoreFromRocks arrays.
  for (int i = 0, j = 0; i < num_keys && j < TARRAY_SIZE(remainCols); ++i) {
    SLastCol *pLastCol = NULL;
    // ignoreFromRocks loses an element whenever remainCols does, so it must be indexed with j
    bool ignore = ((bool *)TARRAY_DATA(ignoreFromRocks))[j];
    if (ignore) {
      ++j;
      continue;
    }

    if (values_list[i] != NULL) {
      code = tsdbCacheDeserialize(values_list[i], values_list_sizes[i], &pLastCol);
      if (code != TSDB_CODE_SUCCESS) {
        tsdbError("vgId:%d, %s deserialize failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
                  tstrerror(code));
        goto _exit;
      }
    }
    SLastCol *pToFree = pLastCol;
    SIdxKey  *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[j];
    if (pLastCol && pLastCol->cacheStatus != TSDB_LAST_CACHE_NO_CACHE) {
      code = tsdbCachePutToLRU(pTsdb, &idxKey->key, pLastCol, 0);
      if (code) {
        tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
        taosMemoryFreeClear(pToFree);
        TAOS_CHECK_EXIT(code);
      }

      // hand the caller its own copy; the deserialized object is freed at the end of the iteration
      SLastCol lastCol = *pLastCol;
      code = tsdbCacheReallocSLastCol(&lastCol, NULL);
      if (TSDB_CODE_SUCCESS != code) {
        taosMemoryFreeClear(pToFree);
        TAOS_CHECK_EXIT(code);
      }

      tsdbDebug("vgId:%d, %s qid:%s uid:%" PRId64 " from rocksdb, col_id:%d col_flag:%d ts:%" PRId64,
                TD_VID(pTsdb->pVnode), __func__, pr && pr->idstr ? pr->idstr : "null", uid, lastCol.colVal.cid,
                lastCol.colVal.flag, lastCol.rowKey.ts);

      taosArraySet(pLastArray, idxKey->idx, &lastCol);
      // resolved: drop the key; j stays put because the next key slides into this position
      taosArrayRemove(remainCols, j);
      taosArrayRemove(ignoreFromRocks, j);
    } else {
      ++j;
    }

    taosMemoryFreeClear(pToFree);
  }

  if (TARRAY_SIZE(remainCols) > 0) {
    // tsdbTrace("tsdb/cache: vgId: %d, load %" PRId64 " from raw", TD_VID(pTsdb->pVnode), uid);
    code = tsdbCacheLoadFromRaw(pTsdb, uid, pLastArray, remainCols, pr, ltype);
  }

_exit:
  taosMemoryFree(key_list);
  taosMemoryFree(keys_list);
  taosMemoryFree(keys_list_sizes);
  if (values_list) {
#ifdef USE_ROCKSDB
    for (int i = 0; i < num_keys; ++i) {
      rocksdb_free(values_list[i]);
    }
#endif
    taosMemoryFree(values_list);
  }
  taosMemoryFree(values_list_sizes);

  TAOS_RETURN(code);
}
2105

2106
/*
 * Fill pLastArray with one SLastCol per column id in pr->pCidList, serving as many as possible
 * from the LRU cache.
 *
 * First pass (no lock taken here): every key is appended to keyArray and looked up in the LRU
 * cache. A hit whose cacheStatus is not TSDB_LAST_CACHE_NO_CACHE is copied into pLastArray; for
 * anything else a "none" placeholder is pushed and the key is recorded in remainCols, together
 * with a flag in ignoreFromRocks that is true when the LRU entry exists but is marked NO_CACHE.
 *
 * Second pass (under pTsdb->lruMutex): the remaining keys are looked up again, and whatever is
 * still missing is resolved through tsdbCacheLoadFromRocks().
 *
 * @param pTsdb       tsdb instance
 * @param uid         table uid
 * @param pLastArray  output array; receives numKeys entries in pCidList order
 * @param pr          reader supplying pCidList, pFuncTypeList, type, pSchema and pSlotIds
 * @param ltype       default lflag for the keys
 * @param keyArray    output array receiving the SLastKey built for every column
 * @return 0 on success, otherwise the error code of the failing step
 */
static int32_t tsdbCacheGetBatchFromLru(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr,
                                        int8_t ltype, SArray *keyArray) {
  int32_t    code = 0, lino = 0;
  SArray    *remainCols = NULL;
  SArray    *ignoreFromRocks = NULL;
  SLRUCache *pCache = pTsdb->lruCache;
  SArray    *pCidList = pr->pCidList;
  int        numKeys = TARRAY_SIZE(pCidList);

  for (int i = 0; i < numKeys; ++i) {
    int16_t cid = ((int16_t *)TARRAY_DATA(pCidList))[i];

    SLastKey key = {.lflag = ltype, .uid = uid, .cid = cid};
    // for select last_row, last case
    int32_t funcType = FUNCTION_TYPE_CACHE_LAST;
    if (pr->pFuncTypeList != NULL && taosArrayGetSize(pr->pFuncTypeList) > i) {
      funcType = ((int32_t *)TARRAY_DATA(pr->pFuncTypeList))[i];
    }
    if (((pr->type & CACHESCAN_RETRIEVE_LAST) == CACHESCAN_RETRIEVE_LAST) && FUNCTION_TYPE_CACHE_LAST_ROW == funcType) {
      int8_t tempType = CACHESCAN_RETRIEVE_LAST_ROW | (pr->type ^ CACHESCAN_RETRIEVE_LAST);
      key.lflag = (tempType & CACHESCAN_RETRIEVE_LAST) >> 3;
    }

    if (!taosArrayPush(keyArray, &key)) {
      TAOS_CHECK_EXIT(terrno);
    }

    LRUHandle *h = taosLRUCacheLookup(pCache, &key, ROCKS_KEY_LEN);
    SLastCol  *pLastCol = h ? (SLastCol *)taosLRUCacheValue(pCache, h) : NULL;
    if (h && pLastCol->cacheStatus != TSDB_LAST_CACHE_NO_CACHE) {
      // usable hit: give the caller its own copy of the cached column
      SLastCol lastCol = *pLastCol;
      if (TSDB_CODE_SUCCESS != (code = tsdbCacheReallocSLastCol(&lastCol, NULL))) {
        tsdbLRUCacheRelease(pCache, h, false);
        TAOS_CHECK_GOTO(code, NULL, _exit);
      }

      tsdbDebug("vgId:%d, %s qid:%s uid:%" PRId64 " from lru, col_id:%d col_flag:%d ts:%" PRId64, TD_VID(pTsdb->pVnode),
                __func__, pr && pr->idstr ? pr->idstr : "null", uid, lastCol.colVal.cid, lastCol.colVal.flag,
                lastCol.rowKey.ts);

      if (taosArrayPush(pLastArray, &lastCol) == NULL) {
        code = terrno;
        tsdbLRUCacheRelease(pCache, h, false);
        goto _exit;
      }
    } else {
      // no cache or cache is invalid
      SLastCol noneCol = {.rowKey.ts = TSKEY_MIN,
                          .colVal = COL_VAL_NONE(cid, pr->pSchema->columns[pr->pSlotIds[i]].type)};

      if (taosArrayPush(pLastArray, &noneCol) == NULL) {
        code = terrno;
        tsdbLRUCacheRelease(pCache, h, false);
        goto _exit;
      }

      if (!remainCols) {
        if ((remainCols = taosArrayInit(numKeys, sizeof(SIdxKey))) == NULL) {
          code = terrno;
          tsdbLRUCacheRelease(pCache, h, false);
          goto _exit;
        }
      }
      if (!ignoreFromRocks) {
        if ((ignoreFromRocks = taosArrayInit(numKeys, sizeof(bool))) == NULL) {
          code = terrno;
          tsdbLRUCacheRelease(pCache, h, false);
          goto _exit;
        }
      }
      if (taosArrayPush(remainCols, &(SIdxKey){i, key}) == NULL) {
        code = terrno;
        tsdbLRUCacheRelease(pCache, h, false);
        goto _exit;
      }
      // an LRU entry marked NO_CACHE means the rocksdb copy must not be used for this key
      bool ignoreRocks = pLastCol ? (pLastCol->cacheStatus == TSDB_LAST_CACHE_NO_CACHE) : false;
      if (taosArrayPush(ignoreFromRocks, &ignoreRocks) == NULL) {
        code = terrno;
        tsdbLRUCacheRelease(pCache, h, false);
        goto _exit;
      }
    }

    if (h) {
      tsdbLRUCacheRelease(pCache, h, false);
    }
  }

  if (remainCols && TARRAY_SIZE(remainCols) > 0) {
    (void)taosThreadMutexLock(&pTsdb->lruMutex);

    // look the missing keys up once more now that the mutex is held
    for (int i = 0; i < TARRAY_SIZE(remainCols);) {
      SIdxKey   *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[i];
      LRUHandle *h = taosLRUCacheLookup(pCache, &idxKey->key, ROCKS_KEY_LEN);
      SLastCol  *pLastCol = h ? (SLastCol *)taosLRUCacheValue(pCache, h) : NULL;
      if (h && pLastCol->cacheStatus != TSDB_LAST_CACHE_NO_CACHE) {
        SLastCol lastCol = *pLastCol;
        code = tsdbCacheReallocSLastCol(&lastCol, NULL);
        if (code) {
          tsdbLRUCacheRelease(pCache, h, false);
          (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
          // leave through _exit so remainCols and ignoreFromRocks are destroyed
          goto _exit;
        }

        tsdbDebug("vgId:%d, %s qid:%s uid:%" PRId64 " from lru(2nd lookup), col_id:%d col_flag:%d ts:%" PRId64,
                  TD_VID(pTsdb->pVnode), __func__, pr && pr->idstr ? pr->idstr : "null", uid, lastCol.colVal.cid,
                  lastCol.colVal.flag, lastCol.rowKey.ts);

        taosArraySet(pLastArray, idxKey->idx, &lastCol);

        // resolved: remove from both parallel arrays; i is not advanced
        taosArrayRemove(remainCols, i);
        taosArrayRemove(ignoreFromRocks, i);
      } else {
        // no cache or cache is invalid
        ++i;
      }
      if (h) {
        tsdbLRUCacheRelease(pCache, h, false);
      }
    }

    // tsdbTrace("tsdb/cache: vgId: %d, load %" PRId64 " from rocks", TD_VID(pTsdb->pVnode), uid);
    code = tsdbCacheLoadFromRocks(pTsdb, uid, pLastArray, remainCols, ignoreFromRocks, pr, ltype);

    (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
  }

_exit:
  if (remainCols) {
    taosArrayDestroy(remainCols);
  }
  if (ignoreFromRocks) {
    taosArrayDestroy(ignoreFromRocks);
  }

  TAOS_RETURN(code);
}
2243

2244
// State of a SMemNextRowIter, as driven by getNextRowFromMem().
typedef enum SMEMNEXTROWSTATES {
  SMEMNEXTROW_ENTER,  // first call: the table-data iterator still has to be opened
  SMEMNEXTROW_NEXT,   // iterator is open: each call advances it by one row
} SMEMNEXTROWSTATES;
2248

2249
// Per-memtable cursor consumed by getNextRowFromMem().
typedef struct SMemNextRowIter {
  SMEMNEXTROWSTATES state;  // SMEMNEXTROW_ENTER until the first row has been returned
  STbData          *pMem;   // [input]
  STbDataIter       iter;   // mem buffer skip list iterator
  int64_t           lastTs;  // NOTE(review): only referenced by disabled code in getNextRowFromMem()
} SMemNextRowIter;
2255

2256
/*
 * _next_row_fn_t implementation backed by a SMemNextRowIter.
 *
 * On the first call (SMEMNEXTROW_ENTER) the table-data iterator is opened and its current row is
 * returned; if a row was found the state switches to SMEMNEXTROW_NEXT. Later calls advance the
 * iterator and return the row it then points at. *ppRow is set to NULL when there is no table
 * data, no (further) row, or the state is unknown. *pIgnoreEarlierTs is always set to false.
 *
 * isLast, aCols and nCols are not used by this implementation. Always returns 0.
 */
static int32_t getNextRowFromMem(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast, int16_t *aCols,
                                 int nCols) {
  SMemNextRowIter *pMemIter = (SMemNextRowIter *)iter;
  int32_t          code = 0;
  TSDBROW         *pFound = NULL;

  *pIgnoreEarlierTs = false;

  if (pMemIter->state == SMEMNEXTROW_ENTER) {
    if (pMemIter->pMem != NULL) {
      tsdbTbDataIterOpen(pMemIter->pMem, NULL, 1, &pMemIter->iter);

      pFound = tsdbTbDataIterGet(&pMemIter->iter);
      if (pFound) {
        // only leave the ENTER state once a first row has actually been produced
        pMemIter->state = SMEMNEXTROW_NEXT;
      }
    }
  } else if (pMemIter->state == SMEMNEXTROW_NEXT) {
    if (tsdbTbDataIterNext(&pMemIter->iter)) {
      pFound = tsdbTbDataIterGet(&pMemIter->iter);
    }
  }

  *ppRow = pFound;

  TAOS_RETURN(code);
}
2306

2307
// Callback that yields the next row of a row source through *ppRow (NULL when exhausted, as
// getNextRowFromMem() does). Returns 0 or an error code.
typedef int32_t (*_next_row_fn_t)(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast, int16_t *aCols,
                                  int nCols);
// Optional callback invoked on a row source's iter by memRowIterClose().
typedef int32_t (*_next_row_clear_fn_t)(void *iter);
2310

2311
// One row source together with its iteration flags.
typedef struct {
  TSDBROW             *pRow;             // row most recently produced by nextRowFn
  bool                 stop;             // true once the source is exhausted (or was never enabled)
  bool                 next;             // true when nextRowFn should be called to fetch a row
  bool                 ignoreEarlierTs;  // written by nextRowFn through its pIgnoreEarlierTs argument
  void                *iter;             // opaque cursor passed to nextRowFn / nextRowClearFn
  _next_row_fn_t       nextRowFn;
  _next_row_clear_fn_t nextRowClearFn;  // may be NULL
} TsdbNextRowState;
2320

2321
// Iterator over the rows of one table in the mem and imem tables of a read snapshot;
// set up by memRowIterOpen() and released by memRowIterClose().
typedef struct {
  SArray           *pMemDelData;  // filled by loadMemTombData() in memRowIterOpen()
  SArray           *pSkyline;     // destroyed in memRowIterClose() when non-NULL
  int64_t           iSkyline;
  SBlockIdx         idx;        // holds the suid / uid of the table
  SMemNextRowIter   memState;   // cursor over pReadSnap->pMem
  SMemNextRowIter   imemState;  // cursor over pReadSnap->pIMem
  TSDBROW           memRow, imemRow;
  TsdbNextRowState  input[2];  // [0] = mem source, [1] = imem source
  SCacheRowsReader *pr;
  STsdb            *pTsdb;
} MemNextRowIter;
2333

2334
static int32_t memRowIterOpen(MemNextRowIter *pIter, tb_uid_t uid, STsdb *pTsdb, STSchema *pTSchema, tb_uid_t suid,
2,246,673✔
2335
                              STsdbReadSnap *pReadSnap, SCacheRowsReader *pr) {
2336
  int32_t code = 0, lino = 0;
2,246,673✔
2337

2338
  STbData *pMem = NULL;
2,243,450✔
2339
  if (pReadSnap->pMem) {
2,243,450✔
2340
    pMem = tsdbGetTbDataFromMemTable(pReadSnap->pMem, suid, uid);
2,244,277✔
2341
  }
2342

2343
  STbData *pIMem = NULL;
2,248,157✔
2344
  if (pReadSnap->pIMem) {
2,248,157✔
2345
    pIMem = tsdbGetTbDataFromMemTable(pReadSnap->pIMem, suid, uid);
5,815✔
2346
  }
2347

2348
  pIter->pTsdb = pTsdb;
2,247,415✔
2349

2350
  pIter->pMemDelData = NULL;
2,247,415✔
2351

2352
  TAOS_CHECK_GOTO(loadMemTombData(&pIter->pMemDelData, pMem, pIMem, pr->info.verRange.maxVer), &lino, _exit);
2,247,415✔
2353

2354
  pIter->idx = (SBlockIdx){.suid = suid, .uid = uid};
2,247,415✔
2355

2356
  pIter->input[0] = (TsdbNextRowState){&pIter->memRow, true, false, false, &pIter->memState, getNextRowFromMem, NULL};
2,245,189✔
2357
  pIter->input[1] = (TsdbNextRowState){&pIter->imemRow, true, false, false, &pIter->imemState, getNextRowFromMem, NULL};
2,245,104✔
2358

2359
  if (pMem) {
2,245,931✔
2360
    pIter->memState.pMem = pMem;
1,957,929✔
2361
    pIter->memState.state = SMEMNEXTROW_ENTER;
1,957,929✔
2362
    pIter->input[0].stop = false;
1,960,325✔
2363
    pIter->input[0].next = true;
1,958,671✔
2364
  }
2365

2366
  if (pIMem) {
2,247,415✔
2367
    pIter->imemState.pMem = pIMem;
5,815✔
2368
    pIter->imemState.state = SMEMNEXTROW_ENTER;
5,815✔
2369
    pIter->input[1].stop = false;
5,815✔
2370
    pIter->input[1].next = true;
5,815✔
2371
  }
2372

2373
  pIter->pr = pr;
2,247,415✔
2374

2375
_exit:
2,246,673✔
2376
  if (code) {
2,245,846✔
2377
    tsdbError("tsdb/cache: %s failed at line %d since %s.", __func__, lino, tstrerror(code));
×
2378
  }
2379

2380
  TAOS_RETURN(code);
2,245,846✔
2381
}
2382

2383
/*
 * Release what a MemNextRowIter owns: run the optional clear callback of each of the two row
 * sources, then destroy the skyline and tomb-data arrays if they exist.
 */
static void memRowIterClose(MemNextRowIter *pIter) {
  TsdbNextRowState *pSrc = pIter->input;
  TsdbNextRowState *pEnd = pIter->input + 2;

  for (; pSrc != pEnd; ++pSrc) {
    if (pSrc->nextRowClearFn) {
      (void)pSrc->nextRowClearFn(pSrc->iter);
    }
  }

  if (pIter->pSkyline != NULL) {
    taosArrayDestroy(pIter->pSkyline);
  }

  if (pIter->pMemDelData != NULL) {
    taosArrayDestroy(pIter->pMemDelData);
  }
}
2,245,931✔
2398

2399
/*
 * Free callback installed on the reader's table map: `param` addresses a slot
 * holding a heap pointer; free that pointer and clear the slot.
 */
static void freeTableInfoFunc(void *param) {
  void **ppSlot = param;
  taosMemoryFreeClear(*ppSlot);
}
1,962,929✔
2403

2404
/*
 * Look up (or lazily create) the STableLoadInfo for `uid` in pReader->pTableMap.
 *
 * The map itself is created on first use, sized by pReader->numOfTables, and
 * gets freeTableInfoFunc as its value free callback.
 *
 * Returns the existing or newly inserted entry, or NULL if the map or the entry
 * could not be allocated or inserted.
 */
static STableLoadInfo *getTableLoadInfo(SCacheRowsReader *pReader, uint64_t uid) {
  if (!pReader->pTableMap) {
    pReader->pTableMap = tSimpleHashInit(pReader->numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
    if (!pReader->pTableMap) {
      return NULL;
    }

    tSimpleHashSetFreeFp(pReader->pTableMap, freeTableInfoFunc);
  }

  STableLoadInfo **ppInfo = tSimpleHashGet(pReader->pTableMap, &uid, sizeof(uid));
  if (ppInfo) {
    return *ppInfo;
  }

  STableLoadInfo *pInfo = taosMemoryCalloc(1, sizeof(STableLoadInfo));
  if (!pInfo) {
    return NULL;
  }

  if (tSimpleHashPut(pReader->pTableMap, &uid, sizeof(uint64_t), &pInfo, POINTER_BYTES)) {
    // The map did not take ownership, so release the entry here instead of leaking it.
    taosMemoryFree(pInfo);
    return NULL;
  }

  return pInfo;
}
2429

2430
/*
 * Return the next non-deleted row from the two inputs of the iterator
 * (input[0] and input[1]), choosing the row with the largest row key.
 *
 * Each round:
 *   1. every input flagged `next` (and not stopped) is advanced via nextRowFn;
 *   2. the current rows are compared and those with the largest key are kept;
 *   3. the kept rows are checked against the delete skyline, which is built
 *      lazily on the first call from the table's tomb data plus pMemDelData;
 *   4. the first surviving row is returned and its input is flagged to advance
 *      on the next call; deleted rows cause their input to advance and the
 *      round to repeat.
 *
 * Returns NULL when both inputs are exhausted, and also when an error occurs
 * (the error is logged, not propagated).
 */
static TSDBROW *memRowIterGet(MemNextRowIter *pIter, bool isLast, int16_t *aCols, int nCols) {
  int32_t code = 0, lino = 0;

  for (;;) {
    // advance every input that was asked to move forward
    for (int k = 0; k < 2; ++k) {
      TsdbNextRowState *pIn = &pIter->input[k];
      if (!pIn->next || pIn->stop) {
        continue;
      }

      TAOS_CHECK_GOTO(pIn->nextRowFn(pIn->iter, &pIn->pRow, &pIn->ignoreEarlierTs, isLast, aCols, nCols), &lino,
                      _exit);

      if (pIn->pRow == NULL) {
        pIn->stop = true;
        pIn->next = false;
      }
    }

    if (pIter->input[0].stop && pIter->input[1].stop) {
      return NULL;
    }

    // collect the row(s) carrying the largest key among the live inputs
    TSDBROW *cand[2] = {0};
    int      candIdx[2] = {-1, -1};
    int      nCand = 0;
    SRowKey  newestKey = {.ts = TSKEY_MIN};

    for (int k = 0; k < 2; ++k) {
      TsdbNextRowState *pIn = &pIter->input[k];
      if (pIn->stop || pIn->pRow == NULL) {
        continue;
      }

      STsdbRowKey curKey = {0};
      tsdbRowGetKey(pIn->pRow, &curKey);

      int c = tRowKeyCompare(&newestKey, &curKey.key);
      if (c < 0) {
        // strictly newer key: restart the candidate list
        nCand = 0;
        newestKey = curKey.key;
      }
      if (c <= 0) {
        candIdx[nCand] = k;
        cand[nCand++] = pIn->pRow;
      }
      pIn->next = false;
    }

    // drop candidates covered by the delete skyline
    TSDBROW *live[2] = {0};
    int      liveIdx[2] = {-1, -1};
    int      nLive = 0;
    for (int k = 0; k < nCand; ++k) {
      TSDBKEY candKey = TSDBROW_KEY(cand[k]);

      if (!pIter->pSkyline) {
        pIter->pSkyline = taosArrayInit(32, sizeof(TSDBKEY));
        TSDB_CHECK_NULL(pIter->pSkyline, code, lino, _exit, terrno);

        STableLoadInfo *pLoadInfo = getTableLoadInfo(pIter->pr, pIter->idx.uid);
        TSDB_CHECK_NULL(pLoadInfo, code, lino, _exit, TSDB_CODE_OUT_OF_MEMORY);

        if (pLoadInfo->pTombData == NULL) {
          pLoadInfo->pTombData = taosArrayInit(4, sizeof(SDelData));
          TSDB_CHECK_NULL(pLoadInfo->pTombData, code, lino, _exit, terrno);
        }

        if (!taosArrayAddAll(pLoadInfo->pTombData, pIter->pMemDelData)) {
          TAOS_CHECK_GOTO(terrno, &lino, _exit);
        }

        size_t nTomb = TARRAY_SIZE(pLoadInfo->pTombData);
        if (nTomb > 0) {
          TAOS_CHECK_GOTO(tsdbBuildDeleteSkyline(pLoadInfo->pTombData, 0, (int32_t)(nTomb - 1), pIter->pSkyline),
                          &lino, _exit);
        }
        pIter->iSkyline = taosArrayGetSize(pIter->pSkyline) - 1;
      }

      bool deleted = tsdbKeyDeleted(&candKey, pIter->pSkyline, &pIter->iSkyline);
      if (!deleted) {
        liveIdx[nLive] = candIdx[k];
        live[nLive++] = cand[k];
      }

      // a deleted row must be skipped: advance its input on the next round
      pIter->input[candIdx[k]].next = deleted;
    }

    if (nLive > 0) {
      pIter->input[liveIdx[0]].next = true;

      return live[0];
    }
  }

_exit:
  if (code) {
    tsdbError("tsdb/cache: %s failed at line %d since %s.", __func__, lino, tstrerror(code));
  }

  return NULL;
}
2530

2531
/*
 * Duplicate a schema (header plus its numOfCols column descriptors) into a
 * newly allocated buffer stored in *ppDst.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when the allocation fails (in which
 * case *ppDst is NULL).
 */
static int32_t cloneTSchema(STSchema *pSrc, STSchema **ppDst) {
  int32_t   nBytes = sizeof(STSchema) + sizeof(STColumn) * pSrc->numOfCols;
  STSchema *pCopy = taosMemoryMalloc(nBytes);

  *ppDst = pCopy;
  if (pCopy == NULL) {
    TAOS_RETURN(terrno);
  }

  memcpy(pCopy, pSrc, nBytes);

  TAOS_RETURN(TSDB_CODE_SUCCESS);
}
2541

2542
/*
 * Make pReader->pCurrSchema match schema version `sversion`.
 *
 *  - no current schema and the reader's base schema has that version:
 *    clone the base schema;
 *  - current schema already has that version: nothing to do;
 *  - otherwise: drop the current schema and fetch the requested version
 *    through metaGetTbTSchemaEx.
 */
static int32_t updateTSchema(int32_t sversion, SCacheRowsReader *pReader, uint64_t uid) {
  if (pReader->pCurrSchema == NULL) {
    if (sversion == pReader->pSchema->version) {
      TAOS_RETURN(cloneTSchema(pReader->pSchema, &pReader->pCurrSchema));
    }
  } else if (sversion == pReader->pCurrSchema->version) {
    TAOS_RETURN(TSDB_CODE_SUCCESS);
  }

  taosMemoryFreeClear(pReader->pCurrSchema);
  TAOS_RETURN(
      metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->info.suid, uid, sversion, &pReader->pCurrSchema));
}
2555

2556
/*
 * Refresh the entries of pLastArray with newer values found in the memtables
 * (mem / imem, filtered by delete info) of table `uid`.
 *
 * pLastArray holds one SLastCol per requested column and keyArray the matching
 * SLastKey at the same index. The newest memtable row is merged column by
 * column into pLastArray. For keys where IS_LAST_KEY is true and the newest
 * row has no value for the column, the column is remembered in a hash and
 * older memtable rows are scanned until a value is found for each remembered
 * column (or the rows run out).
 *
 * Returns 0 on success or the first error code encountered.
 */
static int32_t tsdbCacheGetBatchFromMem(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr,
                                        SArray *keyArray) {
  int32_t        code = 0;
  int32_t        lino = 0;
  STSchema      *pTSchema = pr->pSchema;
  MemNextRowIter iter = {0};
  SSHashObj     *iColHash = NULL;  // column id -> index in pLastArray, for columns still lacking a value
  STSDBRowIter   rowIter = {0};

  // 1, get from mem, imem filtered with delete info
  TAOS_CHECK_EXIT(memRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->info.suid, pr->pReadSnap, pr));

  TSDBROW *pRow = memRowIterGet(&iter, false, NULL, 0);
  if (!pRow) {
    goto _exit;
  }

  int32_t sversion = TSDBROW_SVERSION(pRow);
  if (sversion != -1) {
    TAOS_CHECK_EXIT(updateTSchema(sversion, pr, uid));

    pTSchema = pr->pCurrSchema;
  }
  int32_t nCol = pTSchema->numOfCols;

  STsdbRowKey rowKey = {0};
  tsdbRowGetKey(pRow, &rowKey);

  TAOS_CHECK_EXIT(tsdbRowIterOpen(&rowIter, pRow, pTSchema));

  // Walk the row's columns (iCol) and the target columns (jCol) in lock step; both are compared by column id.
  int32_t iCol = 0, jCol = 0, jnCol = TARRAY_SIZE(pLastArray);
  for (SColVal *pColVal = tsdbRowIterNext(&rowIter); pColVal && iCol < nCol && jCol < jnCol;) {
    SLastCol *pTargetCol = &((SLastCol *)TARRAY_DATA(pLastArray))[jCol];
    if (pColVal->cid < pTargetCol->colVal.cid) {
      pColVal = tsdbRowIterNext(&rowIter), ++iCol;

      continue;
    }
    if (pColVal->cid > pTargetCol->colVal.cid) {
      break;
    }

    int32_t cmp_res = tRowKeyCompare(&pTargetCol->rowKey, &rowKey.key);
    if (!IS_LAST_KEY(((SLastKey *)TARRAY_DATA(keyArray))[jCol])) {
      // non-"last" key: take the memtable value when its row is newer, or equally new and not NONE
      if (cmp_res < 0 || (cmp_res == 0 && !COL_VAL_IS_NONE(pColVal))) {
        SLastCol lastCol = {.rowKey = rowKey.key, .colVal = *pColVal, .dirty = 1, .cacheStatus = TSDB_LAST_CACHE_VALID};
        TAOS_CHECK_EXIT(tsdbCacheReallocSLastCol(&lastCol, NULL));

        tsdbDebug("vgId:%d, %s qid:%s uid:%" PRId64 " from memtable, col_id:%d col_flag:%d ts:%" PRId64,
                  TD_VID(pTsdb->pVnode), __func__, pr && pr->idstr ? pr->idstr : "null", uid, lastCol.colVal.cid,
                  lastCol.colVal.flag, rowKey.key.ts);

        tsdbCacheFreeSLastColItem(pTargetCol);
        taosArraySet(pLastArray, jCol, &lastCol);
      }
    } else {
      if (COL_VAL_IS_VALUE(pColVal)) {
        if (cmp_res <= 0) {
          SLastCol lastCol = {
              .rowKey = rowKey.key, .colVal = *pColVal, .dirty = 1, .cacheStatus = TSDB_LAST_CACHE_VALID};
          TAOS_CHECK_EXIT(tsdbCacheReallocSLastCol(&lastCol, NULL));

          tsdbDebug("vgId:%d, %s qid:%s uid:%" PRId64
                    " from memtable(last) and memtable(newer), col_id:%d col_flag:%d ts:%" PRId64,
                    TD_VID(pTsdb->pVnode), __func__, pr && pr->idstr ? pr->idstr : "null", uid, lastCol.colVal.cid,
                    lastCol.colVal.flag, rowKey.key.ts);

          tsdbCacheFreeSLastColItem(pTargetCol);
          taosArraySet(pLastArray, jCol, &lastCol);
        }
      } else {
        // no value in the newest row: remember the column so older rows can supply one
        if (!iColHash) {
          iColHash = tSimpleHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT));
          if (iColHash == NULL) {
            TAOS_CHECK_EXIT(terrno);
          }
        }

        if (tSimpleHashPut(iColHash, &pColVal->cid, sizeof(pColVal->cid), &jCol, sizeof(jCol))) {
          TAOS_CHECK_EXIT(terrno);
        }
      }
    }

    ++jCol;

    // Only advance the row column when the next target refers to a larger column id; consecutive targets may
    // share one column id.
    if (jCol < jnCol && ((SLastCol *)TARRAY_DATA(pLastArray))[jCol].colVal.cid > pColVal->cid) {
      pColVal = tsdbRowIterNext(&rowIter), ++iCol;
    }
  }
  tsdbRowClose(&rowIter);

  // 2, scan older memtable rows for the columns still lacking a value
  if (iColHash && tSimpleHashGetSize(iColHash) > 0) {
    pRow = memRowIterGet(&iter, false, NULL, 0);
    while (pRow) {
      if (tSimpleHashGetSize(iColHash) == 0) {
        break;
      }

      sversion = TSDBROW_SVERSION(pRow);
      if (sversion != -1) {
        TAOS_CHECK_EXIT(updateTSchema(sversion, pr, uid));

        pTSchema = pr->pCurrSchema;
      }
      nCol = pTSchema->numOfCols;

      STsdbRowKey tsdbRowKey = {0};
      tsdbRowGetKey(pRow, &tsdbRowKey);

      TAOS_CHECK_EXIT(tsdbRowIterOpen(&rowIter, pRow, pTSchema));

      iCol = 0;
      for (SColVal *pColVal = tsdbRowIterNext(&rowIter); pColVal && iCol < nCol;
           pColVal = tsdbRowIterNext(&rowIter), iCol++) {
        int32_t *pjCol = tSimpleHashGet(iColHash, &pColVal->cid, sizeof(pColVal->cid));
        if (pjCol && COL_VAL_IS_VALUE(pColVal)) {
          SLastCol *pTargetCol = &((SLastCol *)TARRAY_DATA(pLastArray))[*pjCol];

          int32_t cmp_res = tRowKeyCompare(&pTargetCol->rowKey, &tsdbRowKey.key);
          if (cmp_res <= 0) {
            SLastCol lastCol = {
                .rowKey = tsdbRowKey.key, .colVal = *pColVal, .dirty = 1, .cacheStatus = TSDB_LAST_CACHE_VALID};
            TAOS_CHECK_EXIT(tsdbCacheReallocSLastCol(&lastCol, NULL));

            tsdbDebug("vgId:%d, %s qid:%s uid:%" PRId64 " from memtable(hash), col_id:%d col_flag:%d ts:%" PRId64,
                      TD_VID(pTsdb->pVnode), __func__, pr && pr->idstr ? pr->idstr : "null", uid, lastCol.colVal.cid,
                      lastCol.colVal.flag, tsdbRowKey.key.ts);

            tsdbCacheFreeSLastColItem(pTargetCol);
            taosArraySet(pLastArray, *pjCol, &lastCol);
          }

          // the column is settled either way; stop looking for it
          TAOS_CHECK_EXIT(tSimpleHashRemove(iColHash, &pColVal->cid, sizeof(pColVal->cid)));
        }
      }
      tsdbRowClose(&rowIter);

      pRow = memRowIterGet(&iter, false, NULL, 0);
    }
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(pTsdb->pVnode), __func__, __FILE__, lino, tstrerror(code));

    tsdbRowClose(&rowIter);
  }

  tSimpleHashCleanup(iColHash);

  memRowIterClose(&iter);

  TAOS_RETURN(code);
}
2714

2715
/*
 * Fill pLastArray for table `uid`: first through tsdbCacheGetBatchFromLru,
 * then, when tsUpdateCacheBatch is set, refine the result with the memtable
 * contents via tsdbCacheGetBatchFromMem.
 *
 * A temporary key array is shared between the two steps and released before
 * returning. Returns 0 on success or the failing step's error code.
 */
int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, int8_t ltype) {
  int32_t code = 0;
  int32_t lino = 0;
  SArray *keyArray = NULL;

  tsdbDebug("vgId:%d, %s start, qid:%s uid:%" PRId64 " ltype:%d", TD_VID(pTsdb->pVnode), __func__,
            pr && pr->idstr ? pr->idstr : "null", uid, ltype);

  keyArray = taosArrayInit(16, sizeof(SLastKey));
  if (keyArray == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }

  TAOS_CHECK_EXIT(tsdbCacheGetBatchFromLru(pTsdb, uid, pLastArray, pr, ltype, keyArray));

  if (tsUpdateCacheBatch) {
    TAOS_CHECK_EXIT(tsdbCacheGetBatchFromMem(pTsdb, uid, pLastArray, pr, keyArray));
  }

_exit:
  if (code != 0) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(pTsdb->pVnode), __func__, __FILE__, lino, tstrerror(code));
  }

  if (keyArray != NULL) {
    taosArrayDestroy(keyArray);
  }

  TAOS_RETURN(code);
}
2744

2745
/*
 * Invalidate cached last/last_row entries of table `uid` whose timestamp lies
 * in [sKey, eKey].
 *
 * For every column and both lflag values (LFLAG_LAST_ROW..LFLAG_LAST):
 *   - if the key is present in the LRU cache and its timestamp is in range, it
 *     is overwritten with a dirty NONE column marked TSDB_LAST_CACHE_NO_CACHE;
 *   - otherwise the key is collected and looked up in rocksdb; in-range values
 *     found there are replaced by a NO_CACHE placeholder in both rocksdb and
 *     the LRU cache.
 *
 * pTsdb->lruMutex is held from after the schema lookup until cleanup.
 * Returns 0 on success or the first error code encountered.
 */
int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey) {
  int32_t   code = 0, lino = 0;
  STSchema *pTSchema = NULL;
  int       sver = -1;
  int       numKeys = 0;
  SArray   *remainCols = NULL;
  // Declared and nulled up front: early `goto _exit` paths run the cleanup below, which must never see
  // indeterminate pointers.
  char    **keys_list = NULL;
  size_t   *keys_list_sizes = NULL;
  char    **values_list = NULL;
  size_t   *values_list_sizes = NULL;

  TAOS_CHECK_RETURN(metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pTSchema));

  int numCols = pTSchema->numOfCols;

  (void)taosThreadMutexLock(&pTsdb->lruMutex);

  for (int i = 0; i < numCols; ++i) {
    int16_t cid = pTSchema->columns[i].colId;
    for (int8_t lflag = LFLAG_LAST_ROW; lflag <= LFLAG_LAST; ++lflag) {
      SLastKey   lastKey = {.lflag = lflag, .uid = uid, .cid = cid};
      LRUHandle *h = taosLRUCacheLookup(pTsdb->lruCache, &lastKey, ROCKS_KEY_LEN);
      if (h) {
        SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pTsdb->lruCache, h);
        if (pLastCol->rowKey.ts <= eKey && pLastCol->rowKey.ts >= sKey) {
          SLastCol noneCol = {.rowKey.ts = TSKEY_MIN,
                              .colVal = COL_VAL_NONE(cid, pTSchema->columns[i].type),
                              .dirty = 1,
                              .cacheStatus = TSDB_LAST_CACHE_NO_CACHE};
          code = tsdbCachePutToLRU(pTsdb, &lastKey, &noneCol, 1);
        }
        // release the handle before bailing out on error
        tsdbLRUCacheRelease(pTsdb->lruCache, h, false);
        TAOS_CHECK_EXIT(code);
      } else {
        if (!remainCols) {
          remainCols = taosArrayInit(numCols * 2, sizeof(SIdxKey));
          if (!remainCols) {
            TAOS_CHECK_EXIT(terrno);
          }
        }
        if (!taosArrayPush(remainCols, &(SIdxKey){i, lastKey})) {
          TAOS_CHECK_EXIT(terrno);
        }
      }
    }
  }

  if (remainCols) {
    numKeys = TARRAY_SIZE(remainCols);
  }

  keys_list = taosMemoryCalloc(numKeys, sizeof(char *));
  keys_list_sizes = taosMemoryCalloc(numKeys, sizeof(size_t));

  if (!keys_list || !keys_list_sizes) {
    code = terrno;
    goto _exit;
  }
  const size_t klen = ROCKS_KEY_LEN;

  for (int i = 0; i < numKeys; ++i) {
    char *key = taosMemoryCalloc(1, sizeof(SLastKey));
    if (!key) {
      code = terrno;
      goto _exit;
    }
    SIdxKey *idxKey = taosArrayGet(remainCols, i);

    ((SLastKey *)key)[0] = idxKey->key;

    keys_list[i] = key;
    keys_list_sizes[i] = klen;
  }

  rocksMayWrite(pTsdb, true);  // flush writebatch cache

  TAOS_CHECK_GOTO(tsdbCacheGetValuesFromRocks(pTsdb, numKeys, (const char *const *)keys_list, keys_list_sizes,
                                              &values_list, &values_list_sizes),
                  NULL, _exit);

  for (int i = 0; i < numKeys; ++i) {
    SLastCol *pLastCol = NULL;
    if (values_list[i] != NULL) {
      code = tsdbCacheDeserialize(values_list[i], values_list_sizes[i], &pLastCol);
      if (code != TSDB_CODE_SUCCESS) {
        tsdbError("vgId:%d, %s deserialize failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
                  tstrerror(code));
        goto _exit;
      }
    }
    SIdxKey  *idxKey = taosArrayGet(remainCols, i);
    SLastKey *pLastKey = &idxKey->key;
    if (NULL != pLastCol && (pLastCol->rowKey.ts <= eKey && pLastCol->rowKey.ts >= sKey)) {
      SLastCol noCacheCol = {.rowKey.ts = TSKEY_MIN,
                             .colVal = COL_VAL_NONE(pLastKey->cid, pTSchema->columns[idxKey->idx].type),
                             .dirty = 0,
                             .cacheStatus = TSDB_LAST_CACHE_NO_CACHE};

      if ((code = tsdbCachePutToRocksdb(pTsdb, pLastKey, &noCacheCol)) != TSDB_CODE_SUCCESS) {
        taosMemoryFreeClear(pLastCol);
        tsdbError("tsdb/cache/del: vgId:%d, put to rocks failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code));
        goto _exit;
      }
      if ((code = tsdbCachePutToLRU(pTsdb, pLastKey, &noCacheCol, 0)) != TSDB_CODE_SUCCESS) {
        taosMemoryFreeClear(pLastCol);
        tsdbError("tsdb/cache/del: vgId:%d, put to lru failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code));
        goto _exit;
      }
    }

    if (pLastCol == NULL) {
      tsdbDebug("tsdb/cache/del: vgId:%d, no cache found for uid:%" PRId64 ", cid:%d, lflag:%d.",
                TD_VID(pTsdb->pVnode), pLastKey->uid, pLastKey->cid, pLastKey->lflag);
    }

    taosMemoryFreeClear(pLastCol);
  }

  rocksMayWrite(pTsdb, false);

_exit:
  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);

  // keys_list may be NULL here (early exit or allocation failure) while numKeys is already non-zero
  if (keys_list) {
    for (int i = 0; i < numKeys; ++i) {
      taosMemoryFree(keys_list[i]);
    }
  }
  taosMemoryFree(keys_list);
  taosMemoryFree(keys_list_sizes);
  if (values_list) {
#if USE_ROCKSDB
    for (int i = 0; i < numKeys; ++i) {
      rocksdb_free(values_list[i]);
    }
#endif
    taosMemoryFree(values_list);
  }
  taosMemoryFree(values_list_sizes);
  taosArrayDestroy(remainCols);
  taosMemoryFree(pTSchema);

  TAOS_RETURN(code);
}
2883

2884
/*
 * Open the caches attached to a tsdb instance: the last/last_row LRU cache
 * (capacity config.cacheLastSize MiB, shard bits from
 * config.cacheLastShardBits), the shared-storage block/page caches when built
 * with USE_SHARED_STORAGE and tsSsEnabled is set, and the rocksdb-backed cache.
 *
 * On success the LRU cache is published in pTsdb->lruCache and lruMutex is
 * initialised. On failure the LRU cache is destroyed, pTsdb->lruCache is set
 * to NULL and the error code is returned.
 */
int32_t tsdbOpenCache(STsdb *pTsdb) {
  int32_t code = 0, lino = 0;
  size_t  capacityBytes = (size_t)pTsdb->pVnode->config.cacheLastSize * 1024 * 1024;
  int32_t shardBits = pTsdb->pVnode->config.cacheLastShardBits;

  // shardBits comes straight from the vnode configuration
  SLRUCache *pLru = taosLRUCacheInit(capacityBytes, shardBits, .5);
  if (!pLru) {
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err);
  }

#ifdef USE_SHARED_STORAGE
  if (tsSsEnabled) {
    TAOS_CHECK_GOTO(tsdbOpenBCache(pTsdb), &lino, _err);
    TAOS_CHECK_GOTO(tsdbOpenPgCache(pTsdb), &lino, _err);
  }
#endif

  TAOS_CHECK_GOTO(tsdbOpenRocksCache(pTsdb), &lino, _err);

  taosLRUCacheSetStrictCapacity(pLru, false);

  (void)taosThreadMutexInit(&pTsdb->lruMutex, NULL);

  pTsdb->lruCache = pLru;

  tsdbInfo("vgId:%d, lruCache opened with capacity:%zu bytes, numShards:%d (configured:%d)",
           TD_VID(pTsdb->pVnode), capacityBytes, taosLRUCacheGetNumShards(pLru), shardBits);

  TAOS_RETURN(0);

_err:
  if (code != 0) {
    tsdbError("tsdb/cache: vgId:%d, open failed at line %d since %s.", TD_VID(pTsdb->pVnode), lino, tstrerror(code));
    if (pLru != NULL) {
      taosLRUCacheCleanup(pLru);
      pLru = NULL;
    }
  }

  pTsdb->lruCache = pLru;
  TAOS_RETURN(code);
}
2928

2929
/*
 * Tear down the caches opened by tsdbOpenCache: the LRU cache together with
 * lruMutex (only when an LRU cache exists), the shared-storage block/page
 * caches when enabled, and the rocksdb-backed cache.
 */
void tsdbCloseCache(STsdb *pTsdb) {
  SLRUCache *pCache = pTsdb->lruCache;
  if (pCache) {
    taosLRUCacheEraseUnrefEntries(pCache);

    taosLRUCacheCleanup(pCache);

    // do not leave a dangling pointer behind; tsdbOpenCache resets the field the same way on failure
    pTsdb->lruCache = NULL;

    (void)taosThreadMutexDestroy(&pTsdb->lruMutex);
  }

#ifdef USE_SHARED_STORAGE
  if (tsSsEnabled) {
    tsdbCloseBCache(pTsdb);
    tsdbClosePgCache(pTsdb);
  }
#endif

  tsdbCloseRocksCache(pTsdb);
}
4,992,820✔
2948

2949
// Rebuild only the last cache (lruCache) with a new shard count.
2950
// Must be called with pTsdb->lruMutex held by the caller.
2951
int32_t tsdbRebuildLastCache(STsdb *pTsdb, int32_t numShardBits) {
1,654✔
2952
  size_t     cfgCapacity = (size_t)pTsdb->pVnode->config.cacheLastSize * 1024 * 1024;
1,654✔
2953
  SLRUCache *pNewCache = taosLRUCacheInit(cfgCapacity, numShardBits, .5);
1,654✔
2954
  if (pNewCache == NULL) {
1,654✔
2955
    TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
×
2956
  }
2957
  taosLRUCacheSetStrictCapacity(pNewCache, false);
1,654✔
2958

2959
  // Swap in the new cache and clean up the old one
2960
  SLRUCache *pOldCache = pTsdb->lruCache;
1,654✔
2961
  pTsdb->lruCache = pNewCache;
1,654✔
2962

2963
  if (pOldCache) {
1,654✔
2964
    int64_t start = taosGetTimestampMs();
1,654✔
2965
    taosLRUCacheEraseUnrefEntries(pOldCache);
1,654✔
2966
    taosLRUCacheCleanup(pOldCache);
1,654✔
2967
    int64_t end = taosGetTimestampMs();
1,654✔
2968
    tsdbInfo("vgId:%d, lruCache erase unref entries and cleanup time:%" PRId64 " ms", TD_VID(pTsdb->pVnode),
1,654✔
2969
             end - start);
2970
  }
2971

2972
  tsdbInfo("vgId:%d, lruCache rebuilt with capacity:%zu bytes, numShards:%d (configured:%d)", TD_VID(pTsdb->pVnode),
1,654✔
2973
           cfgCapacity, taosLRUCacheGetNumShards(pNewCache), numShardBits);
2974

2975
  TAOS_RETURN(0);
1,654✔
2976
}
2977

2978
/*
 * Encode the cache key of a table into `key` and store the key length in *len.
 *
 * The key is the 64-bit uid in native byte order; for any cacheType other than
 * 0 (last_row) the top bit is set to tell the "last" key apart.
 * `key` must have room for sizeof(uint64_t) bytes; it need not be aligned.
 */
static void getTableCacheKey(tb_uid_t uid, int cacheType, char *key, int *len) {
  uint64_t encoded = (uint64_t)uid;
  if (cacheType != 0) {  // last
    encoded |= 0x8000000000000000ULL;
  }

  // memcpy instead of a cast store: `key` is a plain char buffer with no alignment guarantee
  memcpy(key, &encoded, sizeof(encoded));

  *len = sizeof(uint64_t);
}
×
2987

2988
/*
 * Resolve the super-table uid of table `uid` through the meta reader.
 *
 * Returns the suid for a child table, and 0 for every other table type or
 * when the table entry cannot be found.
 */
static tb_uid_t getTableSuidByUid(tb_uid_t uid, STsdb *pTsdb) {
  SMetaReader reader = {0};
  tb_uid_t    suid = 0;

  metaReaderDoInit(&reader, pTsdb->pVnode->pMeta, META_READER_LOCK);

  // only a successfully loaded child-table entry carries a super-table uid
  if (metaReaderGetTableEntryByUidCache(&reader, uid) >= 0 && reader.me.type == TSDB_CHILD_TABLE) {
    suid = reader.me.ctbEntry.suid;
  }

  metaReaderClear(&reader);

  return suid;
}
3010

3011
/*
 * Read the delete records referenced by `pDelIdx` into aDelData via
 * tsdbReadDelDatav1 (with INT64_MAX as its last argument).
 * A NULL pDelIdx is not an error: nothing is read and 0 is returned.
 */
static int32_t getTableDelDataFromDelIdx(SDelFReader *pDelReader, SDelIdx *pDelIdx, SArray *aDelData) {
  int32_t code = (pDelIdx != NULL) ? tsdbReadDelDatav1(pDelReader, pDelIdx, aDelData, INT64_MAX) : 0;

  TAOS_RETURN(code);
}
3020

3021
/*
 * Append every delete record chained from pTbData->pHead to aDelData.
 * A NULL pTbData yields no records. Returns 0, or terrno as soon as a push
 * into the array fails.
 */
static int32_t getTableDelDataFromTbData(STbData *pTbData, SArray *aDelData) {
  int32_t   code = 0;
  SDelData *pCur = NULL;

  if (pTbData != NULL) {
    pCur = pTbData->pHead;
  }

  while (pCur != NULL) {
    if (taosArrayPush(aDelData, pCur) == NULL) {
      TAOS_RETURN(terrno);
    }
    pCur = pCur->pNext;
  }

  TAOS_RETURN(code);
}
3033

3034
/*
 * Return the reader's sorted list of table uids, building and caching it in
 * pReader->uidList on first use (copied from pReader->pTableList and sorted
 * with uidComparFunc). Returns NULL when the list cannot be allocated.
 */
static uint64_t *getUidList(SCacheRowsReader *pReader) {
  if (pReader->uidList) {
    return pReader->uidList;
  }

  int32_t   nTables = pReader->numOfTables;
  uint64_t *pUids = taosMemoryMalloc(nTables * sizeof(uint64_t));

  pReader->uidList = pUids;
  if (pUids == NULL) {
    return NULL;
  }

  for (int32_t t = 0; t < nTables; ++t) {
    pUids[t] = pReader->pTableList[t].uid;
  }

  taosSort(pUids, nTables, sizeof(uint64_t), uidComparFunc);

  return pReader->uidList;
}
3053

3054
/*
 * Load tomb (delete) records for the reader's tables from a tomb block array.
 *
 * pFileReader is handed to tsdbDataFileReadTombBlock when `isFile` is true and
 * to tsdbSttFileReadTombBlock otherwise. Tomb blocks entirely before the
 * reader's (suid, first uid) are skipped, and scanning stops at the first
 * block entirely after (suid, last uid). Within a block, records and the
 * sorted uid list are advanced together; every record of a listed table with
 * version <= pReader->info.verRange.maxVer is appended to that table's
 * pTombData array.
 *
 * Returns TSDB_CODE_SUCCESS or the first error code encountered.
 */
static int32_t loadTombFromBlk(const TTombBlkArray *pTombBlkArray, SCacheRowsReader *pReader, void *pFileReader,
                               bool isFile) {
  int32_t   code = 0;
  int32_t   numOfTables = pReader->numOfTables;
  int64_t   suid = pReader->info.suid;
  uint64_t *uidList = getUidList(pReader);

  if (!uidList) {
    TAOS_RETURN(terrno);
  }

  for (int i = 0, j = 0; i < pTombBlkArray->size && j < numOfTables; ++i) {
    STombBlk *pTombBlk = &pTombBlkArray->data[i];
    if (pTombBlk->maxTbid.suid < suid || (pTombBlk->maxTbid.suid == suid && pTombBlk->maxTbid.uid < uidList[0])) {
      continue;
    }

    if (pTombBlk->minTbid.suid > suid ||
        (pTombBlk->minTbid.suid == suid && pTombBlk->minTbid.uid > uidList[numOfTables - 1])) {
      break;
    }

    STombBlock block = {0};
    code = isFile ? tsdbDataFileReadTombBlock(pFileReader, pTombBlk, &block)
                  : tsdbSttFileReadTombBlock(pFileReader, pTombBlk, &block);
    if (code != TSDB_CODE_SUCCESS) {
      TAOS_RETURN(code);
    }

    uint64_t        uid = uidList[j];
    STableLoadInfo *pInfo = getTableLoadInfo(pReader, uid);
    if (!pInfo) {
      tTombBlockDestroy(&block);
      TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
    }

    if (pInfo->pTombData == NULL) {
      pInfo->pTombData = taosArrayInit(4, sizeof(SDelData));
      if (!pInfo->pTombData) {
        // capture terrno before the cleanup call can disturb it
        code = terrno;
        tTombBlockDestroy(&block);
        TAOS_RETURN(code);
      }
    }

    STombRecord record = {0};
    bool        finished = false;
    for (int32_t k = 0; k < TOMB_BLOCK_SIZE(&block); ++k) {
      code = tTombBlockGet(&block, k, &record);
      if (code != TSDB_CODE_SUCCESS) {
        finished = true;
        break;
      }

      if (record.suid < suid) {
        continue;
      }
      if (record.suid > suid) {
        finished = true;
        break;
      }

      // advance the uid cursor until it reaches the record's table
      bool newTable = false;
      if (uid < record.uid) {
        while (j < numOfTables && uidList[j] < record.uid) {
          ++j;
          newTable = true;
        }

        if (j >= numOfTables) {
          finished = true;
          break;
        }

        uid = uidList[j];
      }

      // record belongs to a table that is not in the list
      if (record.uid < uid) {
        continue;
      }

      if (newTable) {
        pInfo = getTableLoadInfo(pReader, uid);
        if (!pInfo) {
          code = TSDB_CODE_OUT_OF_MEMORY;
          finished = true;
          break;
        }
        if (pInfo->pTombData == NULL) {
          pInfo->pTombData = taosArrayInit(4, sizeof(SDelData));
          if (!pInfo->pTombData) {
            code = terrno;
            finished = true;
            break;
          }
        }
      }

      if (record.version <= pReader->info.verRange.maxVer) {
        SDelData delData = {.version = record.version, .sKey = record.skey, .eKey = record.ekey};
        if (!taosArrayPush(pInfo->pTombData, &delData)) {
          // leave through the common path so the tomb block is released
          code = terrno;
          finished = true;
          break;
        }
      }
    }

    tTombBlockDestroy(&block);

    if (finished) {
      TAOS_RETURN(code);
    }
  }

  TAOS_RETURN(TSDB_CODE_SUCCESS);
}
3167

3168
/*
 * Load tomb records from a data file: fetch the file's tomb block index and
 * pass it to loadTombFromBlk in data-file mode (isFile = true).
 */
static int32_t loadDataTomb(SCacheRowsReader *pReader, SDataFileReader *pFileReader) {
  const TTombBlkArray *pTombBlks = NULL;

  TAOS_CHECK_RETURN(tsdbDataFileReadTombBlk(pFileReader, &pTombBlks));

  TAOS_RETURN(loadTombFromBlk(pTombBlks, pReader, pFileReader, true));
}
3175

3176
/*
 * Tomb-loading callback for stt files (installed as `loadTombFn` by lastIterOpen()).
 *
 * `pTsdbReader` is really the SCacheRowsReader passed in the merge-tree config;
 * it is cast back here. `pLoadInfo` is part of the callback signature and is not
 * used by this implementation.
 */
static int32_t loadSttTomb(STsdbReader *pTsdbReader, SSttFileReader *pSttFileReader, SSttBlockLoadInfo *pLoadInfo) {
  const TTombBlkArray *pTombBlks = NULL;
  SCacheRowsReader    *pCacheReader = (SCacheRowsReader *)pTsdbReader;

  TAOS_CHECK_RETURN(tsdbSttFileReadTombBlk(pSttFileReader, &pTombBlks));

  int32_t code = loadTombFromBlk(pTombBlks, pCacheReader, pSttFileReader, false);
  TAOS_RETURN(code);
}
3184

3185
// Iterator over the stt ("last") files of one file set, backed by a merge tree.
typedef struct {
3186
  SMergeTree  mergeTree;   // storage for the merge tree opened by lastIterOpen()
3187
  SMergeTree *pMergeTree;  // &mergeTree while open; reset to NULL by lastIterClose()
3188
} SFSLastIter;
3189

3190
/*
 * Open a backward merge-tree iterator over the stt files of `pFileSet` for table (suid, uid).
 *
 * The reader's previous stt block iterator array is destroyed and replaced by a fresh one,
 * which the merge tree then uses. Rows are restricted to [lastTs, TSKEY_MAX] and to all versions.
 * On success iter->pMergeTree points at iter->mergeTree.
 *
 * Returns terrno if the iterator array cannot be allocated, or the code from tMergeTreeOpen2().
 */
static int32_t lastIterOpen(SFSLastIter *iter, STFileSet *pFileSet, STsdb *pTsdb, STSchema *pTSchema, tb_uid_t suid,
                            tb_uid_t uid, SCacheRowsReader *pr, int64_t lastTs, int16_t *aCols, int nCols) {
  // start from a clean stt block iterator array for this file set
  destroySttBlockReader(pr->pLDataIterArray, NULL);
  pr->pLDataIterArray = taosArrayInit(4, POINTER_BYTES);
  if (NULL == pr->pLDataIterArray) {
    return terrno;
  }

  SMergeTreeConf treeConf = {0};
  treeConf.uid = uid;
  treeConf.suid = suid;
  treeConf.pTsdb = pTsdb;
  treeConf.timewindow = (STimeWindow){.skey = lastTs, .ekey = TSKEY_MAX};
  treeConf.verRange = (SVersionRange){.minVer = 0, .maxVer = INT64_MAX};
  treeConf.strictTimeRange = false;
  treeConf.cacheStatis = false;
  treeConf.pSchema = pTSchema;
  treeConf.pCurrentFileset = pFileSet;
  treeConf.backward = 1;
  treeConf.pSttFileBlockIterArray = pr->pLDataIterArray;
  treeConf.pCols = aCols;
  treeConf.numOfCols = nCols;
  treeConf.loadTombFn = loadSttTomb;
  treeConf.pReader = pr;
  treeConf.idstr = pr->idstr;
  treeConf.pCurRowKey = &pr->rowKey;

  TAOS_CHECK_RETURN(tMergeTreeOpen2(&iter->mergeTree, &treeConf, NULL));

  iter->pMergeTree = &iter->mergeTree;

  TAOS_RETURN(0);
}
3223

3224
/*
 * Close the merge tree held by `*iter` (if it is open) and clear the caller's pointer.
 * `iter` and `*iter` must be non-NULL. Always returns 0.
 */
static int32_t lastIterClose(SFSLastIter **iter) {
  SFSLastIter *pLast = *iter;

  if (pLast->pMergeTree != NULL) {
    tMergeTreeClose(pLast->pMergeTree);
    pLast->pMergeTree = NULL;
  }

  *iter = NULL;

  TAOS_RETURN(0);
}
3236

3237
/*
 * Advance the stt merge tree and fetch its current row.
 *
 * `*ppRow` is set to the row returned by tMergeTreeGetRow() when the tree has a value,
 * and to NULL otherwise (including when tMergeTreeNext() fails).
 * Returns the code from tMergeTreeNext().
 */
static int32_t lastIterNext(SFSLastIter *iter, TSDBROW **ppRow) {
  bool found = false;

  *ppRow = NULL;

  int32_t code = tMergeTreeNext(iter->pMergeTree, &found);
  if (code != 0) {
    return code;
  }

  if (found) {
    *ppRow = tMergeTreeGetRow(iter->pMergeTree);
  }

  TAOS_RETURN(code);
}
3254

3255
// States of the getNextRowFromFS() state machine. nextRowIterOpen() starts an iterator in SFSNEXTROW_FS.
typedef enum SFSNEXTROWSTATES {
3256
  SFSNEXTROW_FS,          // initial state: position iFileSet just past the last file set
3257
  SFSNEXTROW_FILESET,     // step to the previous file set and open its readers
3258
  SFSNEXTROW_INDEXLIST,   // pick a brin block from pIndexList and read it
3259
  SFSNEXTROW_BRINBLOCK,   // scan the brin block's records backward for the target uid
3260
  SFSNEXTROW_BRINRECORD,  // load the block data referenced by the current brin record
3261
  SFSNEXTROW_BLOCKDATA,   // not referenced by the code visible in this section
3262
  SFSNEXTROW_BLOCKROW,    // emit rows of the loaded block backward, interleaved with stt rows
3263
  SFSNEXTROW_NEXTSTTROW   // emit the remaining rows of the stt iterator
3264
} SFSNEXTROWSTATES;
3265

3266
struct CacheNextRowIter;
3267

3268
// Per-table iteration state used by getNextRowFromFS() to walk the on-disk file sets backward.
typedef struct SFSNextRowIter {
3269
  SFSNEXTROWSTATES         state;         // [input]
3270
  SBlockIdx               *pBlockIdxExp;  // [input]
3271
  STSchema                *pTSchema;      // [input]
3272
  tb_uid_t                 suid;
3273
  tb_uid_t                 uid;
3274
  int32_t                  iFileSet;  // index into aDFileSet; decremented before each file set is visited
3275
  STFileSet               *pFileSet;  // file set currently being read
3276
  TFileSetArray           *aDFileSet;
3277
  SArray                  *pIndexList;  // SBrinBlk entries of the current file set whose tbid range covers (suid, uid)
3278
  int32_t                  iBrinIndex;
3279
  SBrinBlock               brinBlock;
3280
  SBrinBlock              *pBrinBlock;  // NULL until brinBlock is first used, then &brinBlock
3281
  int32_t                  iBrinRecord;
3282
  SBrinRecord              brinRecord;
3283
  SBlockData               blockData;
3284
  SBlockData              *pBlockData;  // NULL until blockData is created, then &blockData
3285
  int32_t                  nRow;
3286
  int32_t                  iRow;  // next row of blockData to emit; counts down from nRow - 1
3287
  TSDBROW                  row;
3288
  int64_t                  lastTs;
3289
  SFSLastIter              lastIter;
3290
  SFSLastIter             *pLastIter;  // &lastIter while block rows are interleaved with stt rows; NULL once closed
3291
  int8_t                   lastEmpty;  // 1 when the stt iterator produced no row for the current file set
3292
  TSDBROW                 *pLastRow;   // pending stt row that has not been returned yet
3293
  SRow                    *pTSRow;     // heap row produced when a block row and an stt row are merged
3294
  SRowMerger               rowMerger;
3295
  SCacheRowsReader        *pr;
3296
  struct CacheNextRowIter *pRowIter;  // owning iterator; its pSkyline is released from the clear functions
3297
} SFSNextRowIter;
3298

3299
static void clearLastFileSet(SFSNextRowIter *state);
3300

3301
// Produce the next row of table (state->suid, state->uid) from the on-disk file sets.
//
// `iter` is an SFSNextRowIter. The function is a resumable state machine: each call continues
// from state->state and returns as soon as one row is available in *ppRow. File sets are visited
// from the last array entry toward the first; inside a file set, block rows are emitted from the
// highest row index down and are interleaved with rows from the stt merge iterator.
//
// *ppRow is set to NULL with code 0 once every file set has been visited. On failure the current
// file set is cleared, *ppRow is set to NULL, the error is logged and the code is returned.
// `pIgnoreEarlierTs` and `isLast` are not used by this function body.
static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast, int16_t *aCols,
119,169✔
3302
                                int nCols) {
3303
  int32_t         code = 0, lino = 0;
119,169✔
3304
  SFSNextRowIter *state = (SFSNextRowIter *)iter;
119,169✔
3305
  STsdb          *pTsdb = state->pr->pTsdb;
119,169✔
3306

3307
  if (SFSNEXTROW_FS == state->state) {
119,911✔
3308
    state->iFileSet = TARRAY2_SIZE(state->aDFileSet);
117,965✔
3309

3310
    state->state = SFSNEXTROW_FILESET;
118,707✔
3311
  }
3312

3313
  if (SFSNEXTROW_FILESET == state->state) {
119,169✔
3314
  _next_fileset:
132,646✔
3315
    clearLastFileSet(state);
132,646✔
3316

3317
    if (--state->iFileSet < 0) {
131,904✔
3318
      *ppRow = NULL;
114,701✔
3319

3320
      TAOS_RETURN(code);
113,959✔
3321
    } else {
3322
      state->pFileSet = TARRAY2_GET(state->aDFileSet, state->iFileSet);
17,945✔
3323
    }
3324

3325
    STFileObj **pFileObj = state->pFileSet->farr;
17,945✔
3326
    // farr[0..2] are opened together as a group and farr[3] independently; [1] and [2] are
    // dereferenced without a NULL check whenever [0] is present
    if (pFileObj[0] != NULL || pFileObj[3] != NULL) {
17,945✔
3327
      if (state->pFileSet != state->pr->pCurFileSet) {
8,000✔
3328
        SDataFileReaderConfig conf = {.tsdb = pTsdb, .szPage = pTsdb->pVnode->config.tsdbPageSize};
8,000✔
3329
        const char           *filesName[4] = {0};
8,000✔
3330
        if (pFileObj[0] != NULL) {
8,000✔
3331
          conf.files[0].file = *pFileObj[0]->f;
8,000✔
3332
          conf.files[0].exist = true;
8,000✔
3333
          filesName[0] = pFileObj[0]->fname;
8,000✔
3334

3335
          conf.files[1].file = *pFileObj[1]->f;
8,000✔
3336
          conf.files[1].exist = true;
8,000✔
3337
          filesName[1] = pFileObj[1]->fname;
8,000✔
3338

3339
          conf.files[2].file = *pFileObj[2]->f;
8,000✔
3340
          conf.files[2].exist = true;
8,000✔
3341
          filesName[2] = pFileObj[2]->fname;
8,000✔
3342
        }
3343

3344
        if (pFileObj[3] != NULL) {
8,000✔
3345
          conf.files[3].exist = true;
8,000✔
3346
          conf.files[3].file = *pFileObj[3]->f;
8,000✔
3347
          filesName[3] = pFileObj[3]->fname;
8,000✔
3348
        }
3349

3350
        TAOS_CHECK_GOTO(tsdbDataFileReaderOpen(filesName, &conf, &state->pr->pFileReader), &lino, _err);
8,000✔
3351

3352
        state->pr->pCurFileSet = state->pFileSet;
8,000✔
3353

3354
        code = loadDataTomb(state->pr, state->pr->pFileReader);
8,000✔
3355
        if (code != TSDB_CODE_SUCCESS) {
8,000✔
3356
          tsdbError("vgId:%d, %s load tomb failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
3357
                    tstrerror(code));
3358
          TAOS_CHECK_GOTO(code, &lino, _err);
×
3359
        }
3360

3361
        TAOS_CHECK_GOTO(tsdbDataFileReadBrinBlk(state->pr->pFileReader, &state->pr->pBlkArray), &lino, _err);
8,000✔
3362
      }
3363

3364
      if (!state->pIndexList) {
8,000✔
3365
        state->pIndexList = taosArrayInit(1, sizeof(SBrinBlk));
8,000✔
3366
        if (!state->pIndexList) {
8,000✔
3367
          TAOS_CHECK_GOTO(terrno, &lino, _err);
×
3368
        }
3369
      } else {
3370
        taosArrayClear(state->pIndexList);
×
3371
      }
3372

3373
      const TBrinBlkArray *pBlkArray = state->pr->pBlkArray;
8,000✔
3374

3375
      // collect, from the last brin block to the first, those whose table-id range covers (suid, uid)
      for (int i = TARRAY2_SIZE(pBlkArray) - 1; i >= 0; --i) {
16,000✔
3376
        SBrinBlk *pBrinBlk = &pBlkArray->data[i];
8,000✔
3377
        if (state->suid >= pBrinBlk->minTbid.suid && state->suid <= pBrinBlk->maxTbid.suid) {
8,000✔
3378
          if (state->uid >= pBrinBlk->minTbid.uid && state->uid <= pBrinBlk->maxTbid.uid) {
8,000✔
3379
            if (!taosArrayPush(state->pIndexList, pBrinBlk)) {
2,000✔
3380
              TAOS_CHECK_GOTO(terrno, &lino, _err);
×
3381
            }
3382
          }
3383
        } else if (state->suid > pBrinBlk->maxTbid.suid ||
×
3384
                   (state->suid == pBrinBlk->maxTbid.suid && state->uid > pBrinBlk->maxTbid.uid)) {
×
3385
          break;
3386
        }
3387
      }
3388

3389
      int indexSize = TARRAY_SIZE(state->pIndexList);
8,000✔
3390
      if (indexSize <= 0) {
8,000✔
3391
        goto _check_stt_data;
7,000✔
3392
      }
3393

3394
      state->state = SFSNEXTROW_INDEXLIST;
1,000✔
3395
      // NOTE(review): with iBrinIndex = 1 only pIndexList[0] is ever read for a file set, even when
      // indexSize > 1 (the next decrement yields -1 and moves on to the next file set) - confirm intended
      state->iBrinIndex = 1;
1,000✔
3396
    }
3397

3398
  _check_stt_data:
17,945✔
3399
    if (state->pFileSet != state->pr->pCurFileSet) {
17,945✔
3400
      state->pr->pCurFileSet = state->pFileSet;
8,909✔
3401
    }
3402

3403
    TAOS_CHECK_GOTO(lastIterOpen(&state->lastIter, state->pFileSet, pTsdb, state->pTSchema, state->suid, state->uid,
17,945✔
3404
                                 state->pr, state->lastTs, aCols, nCols),
3405
                    &lino, _err);
3406

3407
    TAOS_CHECK_GOTO(lastIterNext(&state->lastIter, &state->pLastRow), &lino, _err);
17,945✔
3408

3409
    if (!state->pLastRow) {
17,945✔
3410
      state->lastEmpty = 1;
14,216✔
3411

3412
      // no stt row and no brin index selected: nothing to read in this file set
      if (SFSNEXTROW_INDEXLIST != state->state) {
14,216✔
3413
        clearLastFileSet(state);
13,216✔
3414
        goto _next_fileset;
13,216✔
3415
      }
3416
    } else {
3417
      state->lastEmpty = 0;
3,729✔
3418

3419
      // only stt rows exist for this file set: hand them out directly
      if (SFSNEXTROW_INDEXLIST != state->state) {
3,729✔
3420
        state->state = SFSNEXTROW_NEXTSTTROW;
3,729✔
3421

3422
        *ppRow = state->pLastRow;
3,729✔
3423
        state->pLastRow = NULL;
3,729✔
3424

3425
        TAOS_RETURN(code);
3,729✔
3426
      }
3427
    }
3428

3429
    state->pLastIter = &state->lastIter;
1,000✔
3430
  }
3431

3432
  if (SFSNEXTROW_NEXTSTTROW == state->state) {
2,204✔
3433
    TAOS_CHECK_GOTO(lastIterNext(&state->lastIter, &state->pLastRow), &lino, _err);
1,204✔
3434

3435
    if (!state->pLastRow) {
1,204✔
3436
      if (state->pLastIter) {
723✔
3437
        code = lastIterClose(&state->pLastIter);
×
3438
        if (code != TSDB_CODE_SUCCESS) {
×
3439
          tsdbError("vgId:%d, %s close last iter failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
3440
                    tstrerror(code));
3441
          TAOS_RETURN(code);
×
3442
        }
3443
      }
3444

3445
      clearLastFileSet(state);
723✔
3446
      state->state = SFSNEXTROW_FILESET;
723✔
3447
      goto _next_fileset;
723✔
3448
    } else {
3449
      *ppRow = state->pLastRow;
481✔
3450
      state->pLastRow = NULL;
481✔
3451

3452
      TAOS_RETURN(code);
481✔
3453
    }
3454
  }
3455

3456
  if (SFSNEXTROW_INDEXLIST == state->state) {
1,000✔
3457
    SBrinBlk *pBrinBlk = NULL;
1,000✔
3458
  _next_brinindex:
1,000✔
3459
    if (--state->iBrinIndex < 0) {
1,000✔
3460
      // brin blocks exhausted: flush a pending stt row first, otherwise move to the next file set
      if (state->pLastRow) {
×
3461
        state->state = SFSNEXTROW_NEXTSTTROW;
×
3462
        *ppRow = state->pLastRow;
×
3463
        state->pLastRow = NULL;
×
3464
        return code;
×
3465
      }
3466

3467
      clearLastFileSet(state);
×
3468
      goto _next_fileset;
×
3469
    } else {
3470
      pBrinBlk = taosArrayGet(state->pIndexList, state->iBrinIndex);
1,000✔
3471
    }
3472

3473
    if (!state->pBrinBlock) {
1,000✔
3474
      state->pBrinBlock = &state->brinBlock;
1,000✔
3475
    } else {
3476
      tBrinBlockClear(&state->brinBlock);
×
3477
    }
3478

3479
    TAOS_CHECK_GOTO(tsdbDataFileReadBrinBlock(state->pr->pFileReader, pBrinBlk, &state->brinBlock), &lino, _err);
1,000✔
3480

3481
    state->iBrinRecord = state->brinBlock.numOfRecords - 1;
1,000✔
3482
    state->state = SFSNEXTROW_BRINBLOCK;
1,000✔
3483
  }
3484

3485
  if (SFSNEXTROW_BRINBLOCK == state->state) {
1,000✔
3486
  _next_brinrecord:
1,000✔
3487
    if (state->iBrinRecord < 0) {  // empty brin block, goto _next_brinindex
1,000✔
3488
      tBrinBlockClear(&state->brinBlock);
×
3489
      goto _next_brinindex;
×
3490
    }
3491

3492
    TAOS_CHECK_GOTO(tBrinBlockGet(&state->brinBlock, state->iBrinRecord, &state->brinRecord), &lino, _err);
1,000✔
3493

3494
    SBrinRecord *pRecord = &state->brinRecord;
1,000✔
3495
    if (pRecord->uid != state->uid) {
1,000✔
3496
      // TODO: goto next brin block early
3497
      --state->iBrinRecord;
×
3498
      goto _next_brinrecord;
×
3499
    }
3500

3501
    state->state = SFSNEXTROW_BRINRECORD;
1,000✔
3502
  }
3503

3504
  if (SFSNEXTROW_BRINRECORD == state->state) {
1,000✔
3505
    SBrinRecord *pRecord = &state->brinRecord;
1,000✔
3506

3507
    if (!state->pBlockData) {
1,000✔
3508
      state->pBlockData = &state->blockData;
1,000✔
3509

3510
      TAOS_CHECK_GOTO(tBlockDataCreate(&state->blockData), &lino, _err);
1,000✔
3511
    } else {
3512
      tBlockDataReset(state->pBlockData);
×
3513
    }
3514

3515
    // the primary timestamp column is dropped from the column list passed to the block read below;
    // aCols/nCols are by-value parameters, so the caller's view is unaffected
    if (aCols[0] == PRIMARYKEY_TIMESTAMP_COL_ID) {
1,000✔
3516
      --nCols;
1,000✔
3517
      ++aCols;
1,000✔
3518
    }
3519

3520
    TAOS_CHECK_GOTO(tsdbDataFileReadBlockDataByColumn(state->pr->pFileReader, pRecord, state->pBlockData,
1,000✔
3521
                                                      state->pTSchema, aCols, nCols),
3522
                    &lino, _err);
3523

3524
    state->nRow = state->blockData.nRow;
1,000✔
3525
    state->iRow = state->nRow - 1;
1,000✔
3526

3527
    state->state = SFSNEXTROW_BLOCKROW;
1,000✔
3528
  }
3529

3530
  if (SFSNEXTROW_BLOCKROW == state->state) {
1,000✔
3531
    if (state->iRow < 0) {
1,000✔
3532
      --state->iBrinRecord;
×
3533
      goto _next_brinrecord;
×
3534
    }
3535

3536
    state->row = tsdbRowFromBlockData(state->pBlockData, state->iRow);
1,000✔
3537
    // stt iterator already closed: only block rows remain
    if (!state->pLastIter) {
1,000✔
3538
      *ppRow = &state->row;
×
3539
      --state->iRow;
×
3540
      return code;
×
3541
    }
3542

3543
    if (!state->pLastRow) {
1,000✔
3544
      // get next row from fslast and process with fs row, --state->Row if select fs row
3545
      TAOS_CHECK_GOTO(lastIterNext(&state->lastIter, &state->pLastRow), &lino, _err);
1,000✔
3546
    }
3547

3548
    if (!state->pLastRow) {
1,000✔
3549
      if (state->pLastIter) {
1,000✔
3550
        code = lastIterClose(&state->pLastIter);
1,000✔
3551
        if (code != TSDB_CODE_SUCCESS) {
1,000✔
3552
          tsdbError("vgId:%d, %s close last iter failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
3553
                    tstrerror(code));
3554
          TAOS_RETURN(code);
×
3555
        }
3556
      }
3557

3558
      *ppRow = &state->row;
1,000✔
3559
      --state->iRow;
1,000✔
3560
      return code;
1,000✔
3561
    }
3562

3563
    // process state->pLastRow & state->row
3564
    TSKEY rowTs = TSDBROW_TS(&state->row);
×
3565
    TSKEY lastRowTs = TSDBROW_TS(state->pLastRow);
×
3566
    if (lastRowTs > rowTs) {
×
3567
      *ppRow = state->pLastRow;
×
3568
      state->pLastRow = NULL;
×
3569

3570
      TAOS_RETURN(code);
×
3571
    } else if (lastRowTs < rowTs) {
×
3572
      *ppRow = &state->row;
×
3573
      --state->iRow;
×
3574

3575
      TAOS_RETURN(code);
×
3576
    } else {
3577
      // TODO: merge rows and *ppRow = mergedRow
3578
      // NOTE(review): unlike the other branches that consume the stt row, state->pLastRow is not
      // reset to NULL after the merge, so the same stt row is still pending on the next call - confirm intended
      SRowMerger *pMerger = &state->rowMerger;
×
3579
      code = tsdbRowMergerInit(pMerger, state->pTSchema);
×
3580
      if (code != TSDB_CODE_SUCCESS) {
×
3581
        tsdbError("vgId:%d, %s init row merger failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
3582
                  tstrerror(code));
3583
        TAOS_RETURN(code);
×
3584
      }
3585

3586
      TAOS_CHECK_GOTO(tsdbRowMergerAdd(pMerger, &state->row, state->pTSchema), &lino, _err);
×
3587
      TAOS_CHECK_GOTO(tsdbRowMergerAdd(pMerger, state->pLastRow, state->pTSchema), &lino, _err);
×
3588

3589
      if (state->pTSRow) {
×
3590
        taosMemoryFree(state->pTSRow);
×
3591
        state->pTSRow = NULL;
×
3592
      }
3593

3594
      TAOS_CHECK_GOTO(tsdbRowMergerGetRow(pMerger, &state->pTSRow), &lino, _err);
×
3595

3596
      state->row = tsdbRowFromTSRow(TSDBROW_VERSION(&state->row), state->pTSRow);
×
3597
      *ppRow = &state->row;
×
3598
      --state->iRow;
×
3599

3600
      tsdbRowMergerClear(pMerger);
×
3601

3602
      TAOS_RETURN(code);
×
3603
    }
3604
  }
3605

3606
// reached on any TAOS_CHECK_GOTO failure, and also by fall-through when no state branch returned
_err:
×
3607
  clearLastFileSet(state);
×
3608

3609
  *ppRow = NULL;
×
3610

3611
  if (code) {
×
3612
    tsdbError("tsdb/cache: vgId:%d, %s failed at line %d since %s.", TD_VID(pTsdb->pVnode), __func__, lino,
×
3613
              tstrerror(code));
3614
  }
3615

3616
  TAOS_RETURN(code);
×
3617
}
3618

3619
// Backward row iterator for one table that combines three sources: mem, imem and the file system.
typedef struct CacheNextRowIter {
3620
  SArray           *pMemDelData;  // filled by loadMemTombData() in nextRowIterOpen()
3621
  SArray           *pSkyline;     // TSDBKEY delete skyline, built lazily in nextRowIterGet()
3622
  int64_t           iSkyline;     // skyline cursor; starts at the last skyline entry
3623
  SBlockIdx         idx;          // {suid, uid} of the table being iterated
3624
  SMemNextRowIter   memState;
3625
  SMemNextRowIter   imemState;
3626
  SFSNextRowIter    fsState;
3627
  TSDBROW           memRow, imemRow, fsLastRow, fsRow;
3628
  TsdbNextRowState  input[3];  // [0] = mem, [1] = imem, [2] = file system
3629
  SCacheRowsReader *pr;
3630
  STsdb            *pTsdb;
3631
} CacheNextRowIter;
3632

3633
int32_t clearNextRowFromFS(void *iter) {
118,707✔
3634
  int32_t code = 0;
118,707✔
3635

3636
  SFSNextRowIter *state = (SFSNextRowIter *)iter;
118,707✔
3637
  if (!state) {
118,707✔
3638
    TAOS_RETURN(code);
×
3639
  }
3640

3641
  if (state->pLastIter) {
118,707✔
3642
    code = lastIterClose(&state->pLastIter);
×
3643
    if (code != TSDB_CODE_SUCCESS) {
×
3644
      tsdbError("%s close last iter failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
3645
      TAOS_RETURN(code);
×
3646
    }
3647
  }
3648

3649
  if (state->pBlockData) {
117,965✔
3650
    tBlockDataDestroy(state->pBlockData);
1,000✔
3651
    state->pBlockData = NULL;
1,000✔
3652
  }
3653

3654
  if (state->pBrinBlock) {
117,965✔
3655
    tBrinBlockDestroy(state->pBrinBlock);
1,000✔
3656
    state->pBrinBlock = NULL;
1,000✔
3657
  }
3658

3659
  if (state->pIndexList) {
118,707✔
3660
    taosArrayDestroy(state->pIndexList);
8,000✔
3661
    state->pIndexList = NULL;
8,000✔
3662
  }
3663

3664
  if (state->pTSRow) {
118,707✔
3665
    taosMemoryFree(state->pTSRow);
×
3666
    state->pTSRow = NULL;
×
3667
  }
3668

3669
  if (state->pRowIter->pSkyline) {
118,707✔
3670
    taosArrayDestroy(state->pRowIter->pSkyline);
113,832✔
3671
    state->pRowIter->pSkyline = NULL;
113,832✔
3672
  }
3673

3674
  TAOS_RETURN(code);
118,707✔
3675
}
3676

3677
/*
 * Drop the per-file-set resources of `state` before moving to another file set:
 * the stt iterator, the block data, the reader's data file reader (plus its current
 * file set pointer) and the merged row.
 *
 * If the owning iterator currently holds a delete skyline, the skyline is destroyed and
 * the tomb data of every table in the reader's table map is released as well.
 * If closing the stt iterator fails the error is logged and the function returns early.
 */
static void clearLastFileSet(SFSNextRowIter *state) {
  if (state->pLastIter != NULL) {
    int rc = lastIterClose(&state->pLastIter);
    if (rc != TSDB_CODE_SUCCESS) {
      tsdbError("%s close last iter failed at line %d since %s", __func__, __LINE__, tstrerror(rc));
      return;
    }
  }

  if (state->pBlockData != NULL) {
    tBlockDataDestroy(state->pBlockData);
    state->pBlockData = NULL;
  }

  SCacheRowsReader *pReader = state->pr;
  if (pReader->pFileReader != NULL) {
    tsdbDataFileReaderClose(&pReader->pFileReader);
    pReader->pFileReader = NULL;

    pReader->pCurFileSet = NULL;
  }

  if (state->pTSRow != NULL) {
    taosMemoryFree(state->pTSRow);
    state->pTSRow = NULL;
  }

  CacheNextRowIter *pOwner = state->pRowIter;
  if (NULL == pOwner->pSkyline) {
    return;
  }

  taosArrayDestroy(pOwner->pSkyline);
  pOwner->pSkyline = NULL;

  // the skyline was derived from the tomb data, so discard that too
  int32_t pos = 0;
  for (void *pEntry = tSimpleHashIterate(state->pr->pTableMap, NULL, &pos); pEntry != NULL;
       pEntry = tSimpleHashIterate(state->pr->pTableMap, pEntry, &pos)) {
    STableLoadInfo *pLoadInfo = *(STableLoadInfo **)pEntry;
    taosArrayDestroy(pLoadInfo->pTombData);
    pLoadInfo->pTombData = NULL;
  }
}
3716

3717
static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTsdb, STSchema *pTSchema, tb_uid_t suid,
118,707✔
3718
                               SArray *pLDataIterArray, STsdbReadSnap *pReadSnap, int64_t lastTs,
3719
                               SCacheRowsReader *pr) {
3720
  int32_t code = 0, lino = 0;
118,707✔
3721

3722
  STbData *pMem = NULL;
118,707✔
3723
  if (pReadSnap->pMem) {
118,707✔
3724
    pMem = tsdbGetTbDataFromMemTable(pReadSnap->pMem, suid, uid);
118,707✔
3725
  }
3726

3727
  STbData *pIMem = NULL;
118,707✔
3728
  if (pReadSnap->pIMem) {
118,707✔
3729
    pIMem = tsdbGetTbDataFromMemTable(pReadSnap->pIMem, suid, uid);
×
3730
  }
3731

3732
  pIter->pTsdb = pTsdb;
118,707✔
3733

3734
  pIter->pMemDelData = NULL;
118,707✔
3735

3736
  TAOS_CHECK_GOTO(loadMemTombData(&pIter->pMemDelData, pMem, pIMem, pr->info.verRange.maxVer), &lino, _err);
118,707✔
3737

3738
  pIter->idx = (SBlockIdx){.suid = suid, .uid = uid};
118,707✔
3739

3740
  pIter->fsState.pRowIter = pIter;
118,707✔
3741
  pIter->fsState.state = SFSNEXTROW_FS;
118,707✔
3742
  pIter->fsState.aDFileSet = pReadSnap->pfSetArray;
118,707✔
3743
  pIter->fsState.pBlockIdxExp = &pIter->idx;
118,707✔
3744
  pIter->fsState.pTSchema = pTSchema;
118,707✔
3745
  pIter->fsState.suid = suid;
118,707✔
3746
  pIter->fsState.uid = uid;
117,965✔
3747
  pIter->fsState.lastTs = lastTs;
118,707✔
3748
  pIter->fsState.pr = pr;
118,707✔
3749

3750
  pIter->input[0] = (TsdbNextRowState){&pIter->memRow, true, false, false, &pIter->memState, getNextRowFromMem, NULL};
117,965✔
3751
  pIter->input[1] = (TsdbNextRowState){&pIter->imemRow, true, false, false, &pIter->imemState, getNextRowFromMem, NULL};
118,707✔
3752
  pIter->input[2] =
118,707✔
3753
      (TsdbNextRowState){&pIter->fsRow, false, true, false, &pIter->fsState, getNextRowFromFS, clearNextRowFromFS};
117,965✔
3754

3755
  if (pMem) {
118,707✔
3756
    pIter->memState.pMem = pMem;
111,157✔
3757
    pIter->memState.state = SMEMNEXTROW_ENTER;
111,157✔
3758
    pIter->memState.lastTs = lastTs;
110,415✔
3759
    pIter->input[0].stop = false;
110,415✔
3760
    pIter->input[0].next = true;
110,415✔
3761
  }
3762

3763
  if (pIMem) {
117,965✔
3764
    pIter->imemState.pMem = pIMem;
×
3765
    pIter->imemState.state = SMEMNEXTROW_ENTER;
×
3766
    pIter->imemState.lastTs = lastTs;
×
3767
    pIter->input[1].stop = false;
×
3768
    pIter->input[1].next = true;
×
3769
  }
3770

3771
  pIter->pr = pr;
117,965✔
3772

3773
_err:
117,965✔
3774
  TAOS_RETURN(code);
117,965✔
3775
}
3776

3777
/*
 * Tear down an iterator opened by nextRowIterOpen(): run each input's clear callback
 * (return value ignored), then free the skyline and the in-memory delete data if present.
 */
static void nextRowIterClose(CacheNextRowIter *pIter) {
  for (int i = 0; i < 3; ++i) {
    TsdbNextRowState *pInput = &pIter->input[i];
    if (!pInput->nextRowClearFn) {
      continue;
    }
    (void)pInput->nextRowClearFn(pInput->iter);
  }

  if (pIter->pSkyline != NULL) {
    taosArrayDestroy(pIter->pSkyline);
  }

  if (pIter->pMemDelData != NULL) {
    taosArrayDestroy(pIter->pMemDelData);
  }
}
118,707✔
3792

3793
// Return the next non-deleted row, iterating backward by row key (from high to low).
//
// Each round: (1) every input flagged `next` and not `stop` is advanced, and an input that yields
// no row is marked stopped; (2) if all three inputs are stopped, *ppRow is set to NULL and
// *pIgnoreEarlierTs to the OR of the inputs' ignoreEarlierTs flags; (3) the rows with the largest
// key are selected; (4) rows covered by the delete skyline are dropped and their input is flagged
// to advance; (5) if any row survives, the first survivor is returned and only its input is
// flagged to advance. Otherwise another round starts.
//
// On failure the error is logged and returned; *ppRow is not written on that path.
3794
static int32_t nextRowIterGet(CacheNextRowIter *pIter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast,
125,386✔
3795
                              int16_t *aCols, int nCols) {
3796
  int32_t code = 0, lino = 0;
125,386✔
3797

3798
  for (;;) {
×
3799
    for (int i = 0; i < 3; ++i) {
501,814✔
3800
      if (pIter->input[i].next && !pIter->input[i].stop) {
376,546✔
3801
        TAOS_CHECK_GOTO(pIter->input[i].nextRowFn(pIter->input[i].iter, &pIter->input[i].pRow,
237,167✔
3802
                                                  &pIter->input[i].ignoreEarlierTs, isLast, aCols, nCols),
3803
                        &lino, _err);
3804

3805
        if (pIter->input[i].pRow == NULL) {
236,425✔
3806
          pIter->input[i].stop = true;
118,300✔
3807
          pIter->input[i].next = false;
117,558✔
3808
        }
3809
      }
3810
    }
3811

3812
    if (pIter->input[0].stop && pIter->input[1].stop && pIter->input[2].stop) {
125,268✔
3813
      *ppRow = NULL;
7,633✔
3814
      *pIgnoreEarlierTs =
15,266✔
3815
          (pIter->input[0].ignoreEarlierTs || pIter->input[1].ignoreEarlierTs || pIter->input[2].ignoreEarlierTs);
7,633✔
3816

3817
      TAOS_RETURN(code);
7,633✔
3818
    }
3819

3820
    // select maxpoint(s) from mem, imem, fs and last
3821
    TSDBROW *max[4] = {0};
117,635✔
3822
    int      iMax[4] = {-1, -1, -1, -1};
117,635✔
3823
    int      nMax = 0;
117,635✔
3824
    SRowKey  maxKey = {.ts = TSKEY_MIN};
117,635✔
3825

3826
    for (int i = 0; i < 3; ++i) {
471,282✔
3827
      if (!pIter->input[i].stop && pIter->input[i].pRow != NULL) {
353,647✔
3828
        STsdbRowKey tsdbRowKey = {0};
118,848✔
3829
        tsdbRowGetKey(pIter->input[i].pRow, &tsdbRowKey);
119,590✔
3830

3831
        // merging & deduplicating on client side
3832
        int c = tRowKeyCompare(&maxKey, &tsdbRowKey.key);
119,590✔
3833
        if (c <= 0) {
118,848✔
3834
          // a strictly larger key restarts the candidate list; an equal key is appended to it
          if (c < 0) {
118,749✔
3835
            nMax = 0;
118,749✔
3836
            maxKey = tsdbRowKey.key;
118,749✔
3837
          }
3838

3839
          iMax[nMax] = i;
118,749✔
3840
          max[nMax++] = pIter->input[i].pRow;
119,491✔
3841
        }
3842
        pIter->input[i].next = false;
119,590✔
3843
      }
3844
    }
3845

3846
    // delete detection
3847
    TSDBROW *merge[4] = {0};
117,635✔
3848
    int      iMerge[4] = {-1, -1, -1, -1};
117,635✔
3849
    int      nMerge = 0;
118,377✔
3850
    for (int i = 0; i < nMax; ++i) {
236,754✔
3851
      TSDBKEY maxKey1 = TSDBROW_KEY(max[i]);
118,377✔
3852

3853
      // build the delete skyline on first use from the table's tomb data plus the in-memory delete data
      if (!pIter->pSkyline) {
117,635✔
3854
        pIter->pSkyline = taosArrayInit(32, sizeof(TSDBKEY));
114,555✔
3855
        TSDB_CHECK_NULL(pIter->pSkyline, code, lino, _err, terrno);
115,297✔
3856

3857
        uint64_t        uid = pIter->idx.uid;
115,297✔
3858
        STableLoadInfo *pInfo = getTableLoadInfo(pIter->pr, uid);
115,297✔
3859
        TSDB_CHECK_NULL(pInfo, code, lino, _err, TSDB_CODE_OUT_OF_MEMORY);
115,297✔
3860

3861
        if (pInfo->pTombData == NULL) {
115,297✔
3862
          pInfo->pTombData = taosArrayInit(4, sizeof(SDelData));
112,555✔
3863
          TSDB_CHECK_NULL(pInfo->pTombData, code, lino, _err, terrno);
112,555✔
3864
        }
3865

3866
        if (!taosArrayAddAll(pInfo->pTombData, pIter->pMemDelData)) {
115,297✔
3867
          TAOS_CHECK_GOTO(terrno, &lino, _err);
×
3868
        }
3869

3870
        size_t delSize = TARRAY_SIZE(pInfo->pTombData);
114,555✔
3871
        if (delSize > 0) {
115,297✔
3872
          code = tsdbBuildDeleteSkyline(pInfo->pTombData, 0, (int32_t)(delSize - 1), pIter->pSkyline);
2,248✔
3873
          TAOS_CHECK_GOTO(code, &lino, _err);
2,248✔
3874
        }
3875
        pIter->iSkyline = taosArrayGetSize(pIter->pSkyline) - 1;
115,297✔
3876
      }
3877

3878
      bool deleted = tsdbKeyDeleted(&maxKey1, pIter->pSkyline, &pIter->iSkyline);
118,377✔
3879
      if (!deleted) {
118,377✔
3880
        iMerge[nMerge] = iMax[i];
117,753✔
3881
        merge[nMerge++] = max[i];
117,011✔
3882
      }
3883

3884
      // a deleted row is skipped by advancing its input on the next round
      pIter->input[iMax[i]].next = deleted;
118,377✔
3885
    }
3886

3887
    if (nMerge > 0) {
118,377✔
3888
      pIter->input[iMerge[0]].next = true;
117,753✔
3889

3890
      *ppRow = merge[0];
117,753✔
3891

3892
      TAOS_RETURN(code);
117,011✔
3893
    }
3894
  }
3895

3896
_err:
×
3897
  if (code) {
×
3898
    tsdbError("tsdb/cache: %s failed at line %d since %s.", __func__, lino, tstrerror(code));
×
3899
  }
3900

3901
  TAOS_RETURN(code);
×
3902
}
3903

3904
/*
 * Build an array of `nCols` SLastCol entries, one per requested schema slot, each initialised
 * to a NULL column value (with the slot's column id and type) and row-key timestamp 0.
 *
 * @param pTSchema   schema the slot ids index into
 * @param ppColArray [out] receives the new array on success; left untouched on failure
 * @param slotIds    schema slot index for each requested column
 * @param nCols      number of entries in `slotIds`
 * @return TSDB_CODE_SUCCESS, or terrno when an allocation fails
 *
 * The partially filled array is released when an element cannot be appended, so nothing
 * leaks on the failure path.
 */
static int32_t initLastColArrayPartial(STSchema *pTSchema, SArray **ppColArray, int16_t *slotIds, int nCols) {
  SArray *pColArray = taosArrayInit(nCols, sizeof(SLastCol));
  if (NULL == pColArray) {
    TAOS_RETURN(terrno);
  }

  for (int32_t i = 0; i < nCols; ++i) {
    int16_t  slotId = slotIds[i];
    SLastCol col = {.rowKey.ts = 0,
                    .colVal = COL_VAL_NULL(pTSchema->columns[slotId].colId, pTSchema->columns[slotId].type)};
    if (!taosArrayPush(pColArray, &col)) {
      // capture the failure reason before the cleanup call can overwrite terrno
      int32_t code = terrno;
      taosArrayDestroy(pColArray);
      TAOS_RETURN(code);
    }
  }
  *ppColArray = pColArray;

  TAOS_RETURN(TSDB_CODE_SUCCESS);
}
3922

3923
/*
 * Compute the "last" (most recent non-NULL) value of each requested column for table `uid`.
 *
 * Rows are read newest-first through a CacheNextRowIter. The first row seeds every column;
 * further rows are read only while some column still has no value, and only fill those columns.
 * Column ids that have been resolved are removed from the id list handed to the iterator.
 *
 * @param uid         table uid
 * @param pTsdb       tsdb instance (used for iteration and for the vgId in error logs)
 * @param ppLastArray [out] array of SLastCol, one per requested column. When no row exists it is
 *                    an empty array, or NULL if the iterator reported ignoreEarlierTs.
 *                    Always NULL on failure.
 * @param pr          cache rows reader providing schema, snapshot and lastTs
 * @param aCols       column ids of the requested columns
 * @param nCols       number of requested columns
 * @param slotIds     schema slot index of each requested column
 * @return 0 on success, otherwise the error code of the failing step
 *
 * Every failure path releases both working arrays, and an error from nextRowIterGet() goes
 * through the common cleanup instead of being returned together with a live result array.
 */
static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols,
                            int nCols, int16_t *slotIds) {
  int32_t   code = 0, lino = 0;
  STSchema *pTSchema = pr->pSchema;
  int16_t   nLastCol = nCols;
  int16_t   noneCol = 0;  // first column index that may still lack a value
  bool      setNoneCol = false;
  bool      hasRow = false;
  bool      ignoreEarlierTs = false;
  SArray   *pColArray = NULL;
  SColVal  *pColVal = &(SColVal){0};

  TAOS_CHECK_RETURN(initLastColArrayPartial(pTSchema, &pColArray, slotIds, nCols));

  SArray *aColArray = taosArrayInit(nCols, sizeof(int16_t));
  if (NULL == aColArray) {
    // read terrno before the cleanup call can overwrite it
    code = terrno;
    taosArrayDestroy(pColArray);

    TAOS_RETURN(code);
  }

  for (int i = 0; i < nCols; ++i) {
    if (!taosArrayPush(aColArray, &aCols[i])) {
      code = terrno;
      taosArrayDestroy(aColArray);
      taosArrayDestroy(pColArray);

      TAOS_RETURN(code);
    }
  }

  STsdbRowKey lastRowKey = {.key.ts = TSKEY_MAX};

  // inverse iterator
  CacheNextRowIter iter = {0};
  code =
      nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->info.suid, pr->pLDataIterArray, pr->pReadSnap, pr->lastTs, pr);
  TAOS_CHECK_GOTO(code, &lino, _err);

  do {
    TSDBROW *pRow = NULL;
    code = nextRowIterGet(&iter, &pRow, &ignoreEarlierTs, true, TARRAY_DATA(aColArray), TARRAY_SIZE(aColArray));
    // a failed fetch must not hand a partially filled array back to the caller
    TAOS_CHECK_GOTO(code, &lino, _err);

    if (!pRow) {
      break;
    }

    hasRow = true;

    int32_t sversion = TSDBROW_SVERSION(pRow);
    if (sversion != -1) {
      TAOS_CHECK_GOTO(updateTSchema(sversion, pr, uid), &lino, _err);

      pTSchema = pr->pCurrSchema;
    }

    STsdbRowKey rowKey = {0};
    tsdbRowGetKey(pRow, &rowKey);

    if (lastRowKey.key.ts == TSKEY_MAX) {  // first time
      lastRowKey = rowKey;

      for (int16_t iCol = noneCol; iCol < nCols; ++iCol) {
        if (iCol >= nLastCol) {
          break;
        }
        SLastCol *pCol = taosArrayGet(pColArray, iCol);
        // the slot does not exist in this row's schema version
        if (slotIds[iCol] > pTSchema->numOfCols - 1) {
          if (!setNoneCol) {
            noneCol = iCol;
            setNoneCol = true;
          }
          continue;
        }
        if (pCol->colVal.cid != pTSchema->columns[slotIds[iCol]].colId) {
          continue;
        }
        // slot 0 is filled from the row key's timestamp
        if (slotIds[iCol] == 0) {
          STColumn *pTColumn = &pTSchema->columns[0];
          SValue    val = {.type = pTColumn->type};
          VALUE_SET_TRIVIAL_DATUM(&val, rowKey.key.ts);
          *pColVal = COL_VAL_VALUE(pTColumn->colId, val);

          SLastCol colTmp = {.rowKey = rowKey.key, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID};
          TAOS_CHECK_GOTO(tsdbCacheReallocSLastCol(&colTmp, NULL), &lino, _err);

          taosArraySet(pColArray, 0, &colTmp);
          continue;
        }
        tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal);

        *pCol = (SLastCol){.rowKey = rowKey.key, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID};
        TAOS_CHECK_GOTO(tsdbCacheReallocSLastCol(pCol, NULL), &lino, _err);

        if (!COL_VAL_IS_VALUE(pColVal)) {
          if (!setNoneCol) {
            noneCol = iCol;
            setNoneCol = true;
          }
        } else {
          // resolved: stop asking the iterator for this column
          int32_t aColIndex = taosArraySearchIdx(aColArray, &pColVal->cid, compareInt16Val, TD_EQ);
          if (aColIndex >= 0) {
            taosArrayRemove(aColArray, aColIndex);
          }
        }
      }
      if (!setNoneCol) {
        // done, goto return pColArray
        break;
      } else {
        continue;
      }
    }

    // merge into pColArray
    setNoneCol = false;
    for (int16_t iCol = noneCol; iCol < nCols; ++iCol) {
      if (iCol >= nLastCol) {
        break;
      }
      // high version's column value
      if (slotIds[iCol] > pTSchema->numOfCols - 1) {
        continue;
      }

      SLastCol *lastColVal = (SLastCol *)taosArrayGet(pColArray, iCol);
      if (lastColVal->colVal.cid != pTSchema->columns[slotIds[iCol]].colId) {
        continue;
      }
      SColVal *tColVal = &lastColVal->colVal;
      if (COL_VAL_IS_VALUE(tColVal)) continue;

      tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal);
      if (COL_VAL_IS_VALUE(pColVal)) {
        SLastCol lastCol = {.rowKey = rowKey.key, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID};
        TAOS_CHECK_GOTO(tsdbCacheReallocSLastCol(&lastCol, NULL), &lino, _err);

        tsdbCacheFreeSLastColItem(lastColVal);
        taosArraySet(pColArray, iCol, &lastCol);
        int32_t aColIndex = taosArraySearchIdx(aColArray, &lastCol.colVal.cid, compareInt16Val, TD_EQ);
        if (aColIndex >= 0) {
          taosArrayRemove(aColArray, aColIndex);
        }
      } else if (!COL_VAL_IS_VALUE(pColVal) && !setNoneCol) {
        noneCol = iCol;
        setNoneCol = true;
      }
    }
  } while (setNoneCol);

  if (!hasRow) {
    if (ignoreEarlierTs) {
      taosArrayDestroy(pColArray);
      pColArray = NULL;
    } else {
      taosArrayClear(pColArray);
    }
  }
  *ppLastArray = pColArray;

  nextRowIterClose(&iter);
  taosArrayDestroy(aColArray);

  TAOS_RETURN(code);

_err:
  nextRowIterClose(&iter);
  *ppLastArray = NULL;
  taosArrayDestroyEx(pColArray, tsdbCacheFreeSLastColItem);
  taosArrayDestroy(aColArray);

  if (code) {
    tsdbError("tsdb/cache: vgId:%d, %s failed at line %d since %s.", TD_VID(pTsdb->pVnode), __func__, lino,
              tstrerror(code));
  }

  TAOS_RETURN(code);
}
4101

4102
// Collect the "last row" values of the requested columns for table `uid`.
//
// One row is fetched through a CacheNextRowIter. For every requested column
// whose slot exists in the row's schema and whose column id matches the
// pre-initialised entry, an SLastCol (marked TSDB_LAST_CACHE_VALID) is stored
// at the same index of the result array.
//
// Parameters:
//   uid         - table uid handed to the row iterator and to updateTSchema()
//   pTsdb       - tsdb instance handed to the row iterator; also used for logging
//   ppLastArray - out: result array of SLastCol. Set to an emptied array when no
//                 row was found, to NULL when no row was found and the iterator
//                 reported ignoreEarlierTs, and to NULL on any failure after the
//                 iterator has been opened. It is not written when one of the
//                 early set-up steps fails.
//   pr          - reader state (schemas, snapshot, iterator array, lastTs)
//   aCols       - column ids of the requested columns (nCols entries)
//   nCols       - number of requested columns
//   slotIds     - schema slot index of every requested column (nCols entries)
//
// Returns 0 on success, otherwise the error code of the failing step.
static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols,
                               int nCols, int16_t *slotIds) {
  int32_t   code = 0, lino = 0;
  STSchema *pTSchema = pr->pSchema;
  int16_t   nLastCol = nCols;
  bool      hasRow = false;
  bool      ignoreEarlierTs = false;
  SArray   *pColArray = NULL;
  SColVal  *pColVal = &(SColVal){0};

  TAOS_CHECK_RETURN(initLastColArrayPartial(pTSchema, &pColArray, slotIds, nCols));

  // Working copy of the requested column ids; ids are removed once their column has been filled.
  SArray *aColArray = taosArrayInit(nCols, sizeof(int16_t));
  if (NULL == aColArray) {
    taosArrayDestroy(pColArray);

    TAOS_RETURN(terrno);
  }

  for (int i = 0; i < nCols; ++i) {
    if (!taosArrayPush(aColArray, &aCols[i])) {
      // Save the error first, then release BOTH arrays (aColArray used to be leaked here).
      code = terrno;
      taosArrayDestroy(aColArray);
      taosArrayDestroy(pColArray);

      TAOS_RETURN(code);
    }
  }

  // inverse iterator
  CacheNextRowIter iter = {0};
  code =
      nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->info.suid, pr->pLDataIterArray, pr->pReadSnap, pr->lastTs, pr);
  TAOS_CHECK_GOTO(code, &lino, _err);

  // Single pass: only one row is consumed for last_row.
  do {
    TSDBROW *pRow = NULL;
    code = nextRowIterGet(&iter, &pRow, &ignoreEarlierTs, false, TARRAY_DATA(aColArray), TARRAY_SIZE(aColArray));
    // A failed fetch must go through the error path so the caller never receives a result array
    // together with an error code.
    TAOS_CHECK_GOTO(code, &lino, _err);

    if (!pRow) {
      break;
    }

    hasRow = true;

    // Switch to the schema version the row was written with, when the row carries one.
    int32_t sversion = TSDBROW_SVERSION(pRow);
    if (sversion != -1) {
      TAOS_CHECK_GOTO(updateTSchema(sversion, pr, uid), &lino, _err);

      pTSchema = pr->pCurrSchema;
    }

    STsdbRowKey rowKey = {0};
    tsdbRowGetKey(pRow, &rowKey);

    for (int16_t iCol = 0; iCol < nCols; ++iCol) {
      if (iCol >= nLastCol) {
        break;
      }
      SLastCol *pCol = taosArrayGet(pColArray, iCol);
      // Slot does not exist in the schema currently in use.
      if (slotIds[iCol] > pTSchema->numOfCols - 1) {
        continue;
      }
      // Slot exists but holds a different column id than the one requested.
      if (pCol->colVal.cid != pTSchema->columns[slotIds[iCol]].colId) {
        continue;
      }
      if (slotIds[iCol] == 0) {
        // Slot 0: the value is synthesised from the timestamp of the row key.
        STColumn *pTColumn = &pTSchema->columns[0];
        SValue    val = {.type = pTColumn->type};
        VALUE_SET_TRIVIAL_DATUM(&val, rowKey.key.ts);
        *pColVal = COL_VAL_VALUE(pTColumn->colId, val);

        SLastCol colTmp = {.rowKey = rowKey.key, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID};
        TAOS_CHECK_GOTO(tsdbCacheReallocSLastCol(&colTmp, NULL), &lino, _err);

        // Store at the position of the requested column (was hard-coded to index 0, which would
        // overwrite another column whenever slot 0 is not requested first).
        taosArraySet(pColArray, iCol, &colTmp);
        continue;
      }
      tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal);

      *pCol = (SLastCol){.rowKey = rowKey.key, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID};
      TAOS_CHECK_GOTO(tsdbCacheReallocSLastCol(pCol, NULL), &lino, _err);

      // This column is done; drop its id from the working list.
      int32_t aColIndex = taosArraySearchIdx(aColArray, &pColVal->cid, compareInt16Val, TD_EQ);
      if (aColIndex >= 0) {
        taosArrayRemove(aColArray, aColIndex);
      }
    }
  } while (0);

  if (!hasRow) {
    if (ignoreEarlierTs) {
      taosArrayDestroy(pColArray);
      pColArray = NULL;
    } else {
      taosArrayClear(pColArray);
    }
  }
  *ppLastArray = pColArray;

  nextRowIterClose(&iter);
  taosArrayDestroy(aColArray);

  TAOS_RETURN(code);

_err:
  nextRowIterClose(&iter);

  *ppLastArray = NULL;
  taosArrayDestroyEx(pColArray, tsdbCacheFreeSLastColItem);
  taosArrayDestroy(aColArray);

  if (code) {
    tsdbError("tsdb/cache: vgId:%d, %s failed at line %d since %s.", TD_VID(pTsdb->pVnode), __func__, lino,
              tstrerror(code));
  }

  TAOS_RETURN(code);
}
4224

4225
// Release handle `h` of `pCache` by forwarding to tsdbLRUCacheRelease(); the third
// argument is always passed as false.
void tsdbCacheRelease(SLRUCache *pCache, LRUHandle *h) {
  tsdbLRUCacheRelease(pCache, h, false);
}
12,277,642✔
4226

4227
// Set the capacity of the vnode's tsdb LRU cache (pVnode->pTsdb->lruCache) to `capacity`.
// pVnode->pTsdb is dereferenced without a NULL check.
void tsdbCacheSetCapacity(SVnode *pVnode, size_t capacity) {
  STsdb *pTsdb = pVnode->pTsdb;

  taosLRUCacheSetCapacity(pTsdb->lruCache, capacity);
}
4,869✔
4230

4231
#ifdef BUILD_NO_CALL
4232
// Return the capacity reported by taosLRUCacheGetCapacity() for the vnode's tsdb LRU cache.
// Only compiled when BUILD_NO_CALL is defined.
size_t tsdbCacheGetCapacity(SVnode *pVnode) {
  STsdb *pTsdb = pVnode->pTsdb;

  return taosLRUCacheGetCapacity(pTsdb->lruCache);
}
4233
#endif
4234

4235
// Return the usage reported by taosLRUCacheGetUsage() for the vnode's tsdb LRU cache,
// or 0 when the vnode has no tsdb attached (pVnode->pTsdb == NULL).
size_t tsdbCacheGetUsage(SVnode *pVnode) {
  if (pVnode->pTsdb == NULL) {
    return 0;
  }

  return taosLRUCacheGetUsage(pVnode->pTsdb->lruCache);
}
4243

4244
// Return the element count reported by taosLRUCacheGetElems() for the vnode's tsdb LRU cache,
// or 0 when the vnode has no tsdb attached (pVnode->pTsdb == NULL).
int32_t tsdbCacheGetElems(SVnode *pVnode) {
  if (NULL == pVnode->pTsdb) {
    return 0;
  }

  int32_t nElems = taosLRUCacheGetElems(pVnode->pTsdb->lruCache);
  return nElems;
}
4252

4253
#ifdef USE_SHARED_STORAGE
4254
// block cache
4255
// Serialise (fid, commitID, blkno) into `key` for use as a block/page cache key.
//
//   fid, commitID, blkno - the three components identifying the cached item
//   key                  - out: receives the raw bytes of the key struct; the caller
//                          must provide at least sizeof(key struct) bytes
//   len                  - out: number of bytes written to `key`
//
// The key is the in-memory image of a struct that has padding after `fid`, and the
// cache compares keys bytewise. An initializer such as `= {0}` is not required to
// clear padding, so the whole object is cleared with memset() before the members are
// assigned; equal inputs then always yield byte-identical keys.
static void getBCacheKey(int32_t fid, int64_t commitID, int64_t blkno, char *key, int *len) {
  struct {
    int32_t fid;
    int64_t commitID;
    int64_t blkno;
  } bKey;

  memset(&bKey, 0, sizeof(bKey));
  bKey.fid = fid;
  bKey.commitID = commitID;
  bKey.blkno = blkno;

  *len = sizeof(bKey);
  memcpy(key, &bKey, sizeof(bKey));
}
12,282,604✔
4269

4270
// Read one shared-storage block of the file described by `pFD` into a newly
// allocated buffer.
//
//   pFD     - file descriptor; pFD->blkno (1-based) selects the block, pFD->szPage
//             is the page size, pFD->objName names the object to read
//   ppBlock - out: receives the buffer on success and is left untouched on failure;
//             the buffer is allocated with taosMemoryMalloc()
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY when the buffer cannot be
// allocated, or the error code of tssReadFileFromDefault().
static int32_t tsdbCacheLoadBlockSs(STsdbFD *pFD, uint8_t **ppBlock) {
  int32_t code = 0;

  // Widen before multiplying so the product is not computed in a narrower integer type.
  int64_t block_size = (int64_t)tsSsBlockSize * pFD->szPage;
  int64_t block_offset = (pFD->blkno - 1) * block_size;

  char *buf = taosMemoryMalloc(block_size);
  if (buf == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

  // TODO: pFD->objName is not initialized, but this function is never called.
  code = tssReadFileFromDefault(pFD->objName, block_offset, buf, &block_size);
  if (code != TSDB_CODE_SUCCESS) {
    taosMemoryFree(buf);
    goto _exit;
  }
  *ppBlock = (uint8_t *)buf;

_exit:
  return code;
}
4293

4294
// LRU deleter for block/page cache entries: frees the cached buffer with taosMemoryFree().
// `key`, `keyLen` and `ud` are not used.
static void deleteBCache(const void *key, size_t keyLen, void *value, void *ud) {
  (void)key;
  (void)keyLen;
  (void)ud;

  taosMemoryFree(value);
}
10,215,931✔
4300

4301
// Look up the shared-storage block addressed by (pFD->fid, pFD->cid, pFD->blkno) in
// `pCache`, loading and inserting it on a miss.
//
//   pCache - LRU cache to search and to insert into
//   pFD    - file descriptor identifying the block; pFD->pTsdb->bMutex serialises loading
//   handle - out: cache handle of the block, or NULL when loading failed (and possibly
//            when the insert did not hand back a handle - its status is not turned
//            into an error)
//
// Returns 0 on success, the error of tsdbCacheLoadBlockSs(), or TSDB_CODE_OUT_OF_MEMORY
// when the loader reported success without producing a block.
int32_t tsdbCacheGetBlockSs(SLRUCache *pCache, STsdbFD *pFD, LRUHandle **handle) {
  int32_t code = 0;
  char    key[128] = {0};
  int     keyLen = 0;

  getBCacheKey(pFD->fid, pFD->cid, pFD->blkno, key, &keyLen);
  LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen);
  if (!h) {
    STsdb *pTsdb = pFD->pTsdb;
    (void)taosThreadMutexLock(&pTsdb->bMutex);

    // Look up again under the mutex before loading.
    h = taosLRUCacheLookup(pCache, key, keyLen);
    if (!h) {
      uint8_t *pBlock = NULL;
      code = tsdbCacheLoadBlockSs(pFD, &pBlock);
      //  if table's empty or error, return code of -1
      if (code != TSDB_CODE_SUCCESS || pBlock == NULL) {
        (void)taosThreadMutexUnlock(&pTsdb->bMutex);

        *handle = NULL;
        if (code == TSDB_CODE_SUCCESS && !pBlock) {
          code = TSDB_CODE_OUT_OF_MEMORY;
        }

        TAOS_RETURN(code);
      }

      // Widen before multiplying so the charge matches the size the loader allocated
      // and is not computed in a narrower integer type.
      size_t              charge = (size_t)((int64_t)tsSsBlockSize * pFD->szPage);
      _taos_lru_deleter_t deleter = deleteBCache;
      LRUStatus           status =
          taosLRUCacheInsert(pCache, key, keyLen, pBlock, charge, deleter, NULL, &h, TAOS_LRU_PRIORITY_LOW, NULL);
      if (status != TAOS_LRU_STATUS_OK) {
        // The insert status is deliberately not converted into an error code.
        // code = -1;
      }
    }

    (void)taosThreadMutexUnlock(&pTsdb->bMutex);
  }

  *handle = h;

  TAOS_RETURN(code);
}
4344

4345
// Look up page `pgno` of the file described by `pFD` in `pCache`.
//
//   handle - out: result of taosLRUCacheLookup() for the page key; not written when
//            shared storage is disabled
//
// Returns TSDB_CODE_OPS_NOT_SUPPORT when tsSsEnabled is false, otherwise 0 (a lookup
// that yields no handle is not reported as an error).
int32_t tsdbCacheGetPageSs(SLRUCache *pCache, STsdbFD *pFD, int64_t pgno, LRUHandle **handle) {
  if (!tsSsEnabled) {
    return TSDB_CODE_OPS_NOT_SUPPORT;
  }

  char pageKey[128] = {0};
  int  pageKeyLen = 0;
  getBCacheKey(pFD->fid, pFD->cid, pgno, pageKey, &pageKeyLen);

  *handle = taosLRUCacheLookup(pCache, pageKey, pageKeyLen);

  return 0;
}
4359

4360
// Store a private copy of page `pgno` of the file described by `pFD` in the page cache,
// unless an entry with the same key is already present. Does nothing when tsSsEnabled
// is false. Failures (allocation or insert) are ignored on purpose.
//
//   pCache - cache the copy is inserted into
//   pFD    - file descriptor; pFD->szPage bytes are copied from pPage
//   pgno   - page number used in the cache key
//   pPage  - source page data
//
// NOTE(review): the lookup and the final release use pFD->pTsdb->pgCache while the
// insert uses pCache - presumably callers pass the same cache; confirm.
void tsdbCacheSetPageSs(SLRUCache *pCache, STsdbFD *pFD, int64_t pgno, uint8_t *pPage) {
  if (!tsSsEnabled) {
    return;
  }

  char       pageKey[128] = {0};
  int        pageKeyLen = 0;
  LRUHandle *hPage = NULL;

  getBCacheKey(pFD->fid, pFD->cid, pgno, pageKey, &pageKeyLen);

  (void)taosThreadMutexLock(&pFD->pTsdb->pgMutex);

  hPage = taosLRUCacheLookup(pFD->pTsdb->pgCache, pageKey, pageKeyLen);
  if (hPage == NULL) {
    size_t   pageBytes = pFD->szPage;
    uint8_t *pCopy = taosMemoryMalloc(pageBytes);
    if (pCopy == NULL) {
      (void)taosThreadMutexUnlock(&pFD->pTsdb->pgMutex);

      return;  // ignore error with ss cache and leave error untouched
    }
    memcpy(pCopy, pPage, pageBytes);

    // The insert status is intentionally discarded: a failed cache update is not an error.
    (void)taosLRUCacheInsert(pCache, pageKey, pageKeyLen, pCopy, pageBytes, deleteBCache, NULL, &hPage,
                             TAOS_LRU_PRIORITY_LOW, NULL);
  }

  (void)taosThreadMutexUnlock(&pFD->pTsdb->pgMutex);

  // Drop the reference obtained by the lookup or the insert above.
  tsdbCacheRelease(pFD->pTsdb->pgCache, hPage);
}
4394
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc