• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5013

03 Apr 2026 03:59PM UTC coverage: 72.317% (+0.01%) from 72.305%
#5013

push

travis-ci

web-flow
merge: from main to 3.0 branch #35067

4053 of 5985 new or added lines in 68 files covered. (67.72%)

13131 existing lines in 160 files now uncovered.

257489 of 356056 relevant lines covered (72.32%)

129893134.08 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.33
/source/dnode/vnode/src/tsdb/tsdbCache.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#include "functionMgt.h"
16
#include "tsdb.h"
17
#include "tsdbDataFileRW.h"
18
#include "tsdbIter.h"
19
#include "tsdbReadUtil.h"
20
#include "tss.h"
21
#include "vnd.h"
22

23
#define ROCKS_BATCH_SIZE (4096)
24

25
/**
 * Release one handle of an LRU cache.
 *
 * Thin wrapper around taosLRUCacheRelease(); when that call reports false the
 * event is only traced, nothing is returned to the caller.
 *
 * @param cache          cache that owns the handle
 * @param handle         handle to release
 * @param eraseIfLastRef forwarded unchanged to taosLRUCacheRelease()
 */
void tsdbLRUCacheRelease(SLRUCache *cache, LRUHandle *handle, bool eraseIfLastRef) {
  if (taosLRUCacheRelease(cache, handle, eraseIfLastRef)) {
    return;
  }

  tsdbTrace(" release lru cache failed");
}
65,664,033✔
30

31
#ifdef USE_SHARED_STORAGE
32

33
/**
 * Create the block cache (pTsdb->bCache) and its mutex (pTsdb->bMutex).
 *
 * Capacity is tsSsBlockCacheSize * max(tsSsBlockSize, 1024) * tsdbPageSize.
 * The cache is created with strict capacity disabled.
 *
 * @return 0 on success, TSDB_CODE_OUT_OF_MEMORY when the cache cannot be created.
 */
static int32_t tsdbOpenBCache(STsdb *pTsdb) {
  int32_t code = 0, lino = 0;

  int32_t pageSize = pTsdb->pVnode->config.tsdbPageSize;
  int64_t blockSize = tsSsBlockSize <= 1024 ? 1024 : tsSsBlockSize;  // clamp to at least 1024
  int64_t capacity = (int64_t)tsSsBlockCacheSize * blockSize * pageSize;

  SLRUCache *pNewCache = taosLRUCacheInit(capacity, 0, .5);
  if (NULL == pNewCache) {
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err);
  }

  taosLRUCacheSetStrictCapacity(pNewCache, false);
  (void)taosThreadMutexInit(&pTsdb->bMutex, NULL);
  pTsdb->bCache = pNewCache;

_err:
  if (code) {
    tsdbError("tsdb/bcache: vgId:%d, %s failed at line %d since %s.", TD_VID(pTsdb->pVnode), __func__, lino,
              tstrerror(code));
  }

  TAOS_RETURN(code);
}
57

58
/**
 * Tear down the block cache opened by tsdbOpenBCache().
 *
 * Does nothing when pTsdb->bCache is NULL. Otherwise unreferenced entries are
 * erased (element count traced before and after), the cache is cleaned up and
 * pTsdb->bMutex is destroyed.
 */
static void tsdbCloseBCache(STsdb *pTsdb) {
  SLRUCache *pCache = pTsdb->bCache;
  if (NULL == pCache) {
    return;
  }

  int32_t nElems = taosLRUCacheGetElems(pCache);
  tsdbTrace("vgId:%d, elems: %d", TD_VID(pTsdb->pVnode), nElems);

  taosLRUCacheEraseUnrefEntries(pCache);

  nElems = taosLRUCacheGetElems(pCache);
  tsdbTrace("vgId:%d, elems: %d", TD_VID(pTsdb->pVnode), nElems);

  taosLRUCacheCleanup(pCache);

  (void)taosThreadMutexDestroy(&pTsdb->bMutex);
}
23,040✔
72

73
/**
 * Create the page cache (pTsdb->pgCache) and its mutex (pTsdb->pgMutex).
 *
 * Capacity is tsSsPageCacheSize * tsdbPageSize; strict capacity is disabled.
 *
 * @return 0 on success, TSDB_CODE_OUT_OF_MEMORY when the cache cannot be created.
 */
static int32_t tsdbOpenPgCache(STsdb *pTsdb) {
  int32_t code = 0, lino = 0;

  int32_t pageSize = pTsdb->pVnode->config.tsdbPageSize;
  int64_t capacity = (int64_t)tsSsPageCacheSize * pageSize;

  SLRUCache *pNewCache = taosLRUCacheInit(capacity, 0, .5);
  if (NULL == pNewCache) {
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err);
  }

  taosLRUCacheSetStrictCapacity(pNewCache, false);
  (void)taosThreadMutexInit(&pTsdb->pgMutex, NULL);
  pTsdb->pgCache = pNewCache;

_err:
  if (code) {
    tsdbError("tsdb/pgcache: vgId:%d, open failed at line %d since %s.", TD_VID(pTsdb->pVnode), lino, tstrerror(code));
  }

  TAOS_RETURN(code);
}
95

96
/**
 * Tear down the page cache opened by tsdbOpenPgCache().
 *
 * Does nothing when pTsdb->pgCache is NULL. Otherwise unreferenced entries are
 * erased (element count traced before and after), the cache is cleaned up and
 * the page-cache mutex is destroyed.
 */
static void tsdbClosePgCache(STsdb *pTsdb) {
  SLRUCache *pCache = pTsdb->pgCache;
  if (pCache) {
    int32_t elems = taosLRUCacheGetElems(pCache);
    tsdbTrace("vgId:%d, elems: %d", TD_VID(pTsdb->pVnode), elems);
    taosLRUCacheEraseUnrefEntries(pCache);
    elems = taosLRUCacheGetElems(pCache);
    tsdbTrace("vgId:%d, elems: %d", TD_VID(pTsdb->pVnode), elems);

    taosLRUCacheCleanup(pCache);

    // Destroy the mutex that tsdbOpenPgCache() initialised. The previous code
    // destroyed bMutex here, which belongs to the block cache and is already
    // destroyed by tsdbCloseBCache(), while pgMutex was never destroyed.
    (void)taosThreadMutexDestroy(&pTsdb->pgMutex);
  }
}
23,040✔
110

111
#endif  // USE_SHARED_STORAGE
112

113
#define ROCKS_KEY_LEN (sizeof(tb_uid_t) + sizeof(int16_t) + sizeof(int8_t))
114

115
enum {
116
  LFLAG_LAST_ROW = 0,
117
  LFLAG_LAST = 1,
118
};
119

120
typedef struct {
121
  tb_uid_t uid;
122
  int16_t  cid;
123
  int8_t   lflag;
124
} SLastKey;
125

126
#define IS_LAST_ROW_KEY(k) (((k).lflag & LFLAG_LAST) == LFLAG_LAST_ROW)
127
#define IS_LAST_KEY(k)     (((k).lflag & LFLAG_LAST) == LFLAG_LAST)
128

129
/**
 * Build the path of the rocksdb cache directory for this tsdb.
 *
 * The vnode primary path is written into `path` first, then
 * "<sep><tsdb name><sep>cache.rdb" is appended.
 *
 * @param path output buffer; treated as TSDB_FILENAME_LEN bytes long
 */
static void tsdbGetRocksPath(STsdb *pTsdb, char *path) {
  vnodeGetPrimaryPath(pTsdb->pVnode, false, path, TSDB_FILENAME_LEN);

  int32_t used = strlen(path);
  char   *tail = path + used;
  snprintf(tail, TSDB_FILENAME_LEN - used - 1, "%s%s%scache.rdb", TD_DIRSEP, pTsdb->name, TD_DIRSEP);
}
4,307,580✔
136

137
/* Name callback handed to rocksdb_comparator_create(); `state` is ignored. */
static const char *myCmpName(void *state) {
  static const char *const kName = "myCmp";
  (void)state;
  return kName;
}
141

142
/* Destructor callback handed to rocksdb_comparator_create(); there is no state to free. */
static void myCmpDestroy(void *state) {
  (void)state;
}
4,308,785✔
143

144
static int myCmp(void *state, const char *a, size_t alen, const char *b, size_t blen) {
1,249,103,818✔
145
  (void)state;
146
  (void)alen;
147
  (void)blen;
148
  SLastKey *lhs = (SLastKey *)a;
1,249,103,818✔
149
  SLastKey *rhs = (SLastKey *)b;
1,249,103,818✔
150

151
  if (lhs->uid < rhs->uid) {
1,249,103,818✔
152
    return -1;
748,768,428✔
153
  } else if (lhs->uid > rhs->uid) {
501,725,791✔
154
    return 1;
213,206,980✔
155
  }
156

157
  if (lhs->cid < rhs->cid) {
288,553,361✔
158
    return -1;
106,934,540✔
159
  } else if (lhs->cid > rhs->cid) {
181,653,949✔
160
    return 1;
69,584,493✔
161
  }
162

163
  if ((lhs->lflag & LFLAG_LAST) < (rhs->lflag & LFLAG_LAST)) {
112,070,455✔
164
    return -1;
42,199,028✔
165
  } else if ((lhs->lflag & LFLAG_LAST) > (rhs->lflag & LFLAG_LAST)) {
69,873,082✔
166
    return 1;
66,615,600✔
167
  }
168

169
  return 0;
3,257,482✔
170
}
171

172
/**
 * Open the rocksdb instance backing the last/last_row cache and publish all
 * related handles in pTsdb->rCache.
 *
 * Without USE_ROCKSDB this is a no-op returning 0.
 *
 * On failure everything created so far is destroyed in reverse order and
 * pTsdb->rCache is left untouched (handles are published only after every
 * step has succeeded, so no dangling pointers remain in the struct).
 *
 * @return 0 on success, otherwise an error code (TSDB_CODE_OUT_OF_MEMORY for
 *         every rocksdb object that cannot be created or opened, the mutex
 *         init result, or terrno when the ctx array cannot be allocated).
 */
static int32_t tsdbOpenRocksCache(STsdb *pTsdb) {
  int32_t code = 0, lino = 0;
#ifdef USE_ROCKSDB
  rocksdb_comparator_t *cmp = rocksdb_comparator_create(NULL, myCmpDestroy, myCmp, myCmpName);
  if (NULL == cmp) {
    TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
  }

  rocksdb_block_based_table_options_t *tableoptions = rocksdb_block_based_options_create();

  rocksdb_options_t *options = rocksdb_options_create();
  if (NULL == options) {
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err);
  }

  rocksdb_options_set_create_if_missing(options, 1);
  rocksdb_options_set_comparator(options, cmp);
  rocksdb_options_set_block_based_table_factory(options, tableoptions);
  rocksdb_options_set_info_log_level(options, 2);  // WARN_LEVEL
  // rocksdb_options_set_inplace_update_support(options, 1);
  // rocksdb_options_set_allow_concurrent_memtable_write(options, 0);

  rocksdb_writeoptions_t *writeoptions = rocksdb_writeoptions_create();
  if (NULL == writeoptions) {
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err2);
  }
  rocksdb_writeoptions_disable_WAL(writeoptions, 1);

  rocksdb_readoptions_t *readoptions = rocksdb_readoptions_create();
  if (NULL == readoptions) {
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err3);
  }

  char *err = NULL;
  char  cachePath[TSDB_FILENAME_LEN] = {0};
  tsdbGetRocksPath(pTsdb, cachePath);

  rocksdb_t *db = rocksdb_open(options, cachePath, &err);
  if (NULL == db) {
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err);
    rocksdb_free(err);

    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err4);
  }

  rocksdb_flushoptions_t *flushoptions = rocksdb_flushoptions_create();
  if (NULL == flushoptions) {
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err5);
  }

  rocksdb_writebatch_t *writebatch = rocksdb_writebatch_create();

  TAOS_CHECK_GOTO(taosThreadMutexInit(&pTsdb->rCache.writeBatchMutex, NULL), &lino, _err6);

  SArray *ctxArray = taosArrayInit(16, sizeof(SLastUpdateCtx));
  if (!ctxArray) {
    TAOS_CHECK_GOTO(terrno, &lino, _err7);
  }

  // Every step succeeded: publish the handles.
  pTsdb->rCache.tableoptions = tableoptions;
  pTsdb->rCache.writebatch = writebatch;
  pTsdb->rCache.my_comparator = cmp;
  pTsdb->rCache.options = options;
  pTsdb->rCache.writeoptions = writeoptions;
  pTsdb->rCache.readoptions = readoptions;
  pTsdb->rCache.flushoptions = flushoptions;
  pTsdb->rCache.db = db;
  pTsdb->rCache.sver = -1;
  pTsdb->rCache.suid = -1;
  pTsdb->rCache.uid = -1;
  pTsdb->rCache.pTSchema = NULL;
  pTsdb->rCache.ctxArray = ctxArray;

  TAOS_RETURN(code);

  // Cleanup ladder: each label releases what was acquired just before the
  // failing step and falls through to the older resources.
_err7:
  (void)taosThreadMutexDestroy(&pTsdb->rCache.writeBatchMutex);
_err6:
  rocksdb_writebatch_destroy(writebatch);
  rocksdb_flushoptions_destroy(flushoptions);  // was leaked on the _err6/_err7 paths
_err5:
  rocksdb_close(db);  // close the local handle; rCache.db has not been assigned on these paths
_err4:
  rocksdb_readoptions_destroy(readoptions);
_err3:
  rocksdb_writeoptions_destroy(writeoptions);
_err2:
  rocksdb_options_destroy(options);
_err:
  rocksdb_block_based_options_destroy(tableoptions);  // also reached when options creation fails
  rocksdb_comparator_destroy(cmp);
#endif
  TAOS_RETURN(code);
}
263

264
/**
 * Close the rocksdb cache and release everything held in pTsdb->rCache:
 * the db, the write-batch mutex, all option objects, the write batch, the
 * comparator, the cached schema and the update-context array.
 *
 * Without USE_ROCKSDB this is a no-op.
 */
static void tsdbCloseRocksCache(STsdb *pTsdb) {
#ifdef USE_ROCKSDB
  SRocksCache *pRCache = &pTsdb->rCache;

  rocksdb_close(pRCache->db);
  (void)taosThreadMutexDestroy(&pRCache->writeBatchMutex);

  rocksdb_flushoptions_destroy(pRCache->flushoptions);
  rocksdb_writebatch_destroy(pRCache->writebatch);
  rocksdb_readoptions_destroy(pRCache->readoptions);
  rocksdb_writeoptions_destroy(pRCache->writeoptions);
  rocksdb_options_destroy(pRCache->options);
  rocksdb_block_based_options_destroy(pRCache->tableoptions);
  rocksdb_comparator_destroy(pRCache->my_comparator);

  taosMemoryFree(pRCache->pTSchema);
  taosArrayDestroy(pRCache->ctxArray);
#endif
}
4,308,785✔
279

280
/**
 * Write the pending rocksdb write batch when it is large enough.
 *
 * The batch is written when it holds at least ROCKS_BATCH_SIZE entries, or
 * when `force` is set and it holds at least one entry. A write error is
 * logged; the batch is cleared afterwards in either case.
 *
 * Without USE_ROCKSDB this is a no-op.
 */
static void rocksMayWrite(STsdb *pTsdb, bool force) {
#ifdef USE_ROCKSDB
  rocksdb_writebatch_t *pBatch = pTsdb->rCache.writebatch;

  int  count = rocksdb_writebatch_count(pBatch);
  bool writeNow = (force && count > 0) || count >= ROCKS_BATCH_SIZE;
  if (!writeNow) {
    return;
  }

  char *err = NULL;
  rocksdb_write(pTsdb->rCache.db, pTsdb->rCache.writeoptions, pBatch, &err);
  if (NULL != err) {
    tsdbError("vgId:%d, %s failed at line %d, count: %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, count,
              err);
    rocksdb_free(err);
  }

  rocksdb_writebatch_clear(pBatch);
#endif
}
58,564,636✔
299

300
typedef struct {
301
  TSKEY  ts;
302
  int8_t dirty;
303
  struct {
304
    int16_t cid;
305
    int8_t  type;
306
    int8_t  flag;
307
    union {
308
      int64_t val;
309
      struct {
310
        uint32_t nData;
311
        uint8_t *pData;
312
      };
313
    } value;
314
  } colVal;
315
} SLastColV0;
316

317
static int32_t tsdbCacheDeserializeV0(char const *value, SLastCol *pLastCol) {
196,351✔
318
  SLastColV0 *pLastColV0 = (SLastColV0 *)value;
196,351✔
319

320
  pLastCol->rowKey.ts = pLastColV0->ts;
196,351✔
321
  pLastCol->rowKey.numOfPKs = 0;
196,351✔
322
  pLastCol->dirty = pLastColV0->dirty;
196,351✔
323
  pLastCol->colVal.cid = pLastColV0->colVal.cid;
196,351✔
324
  pLastCol->colVal.flag = pLastColV0->colVal.flag;
197,119✔
325
  pLastCol->colVal.value.type = pLastColV0->colVal.type;
196,351✔
326

327
  pLastCol->cacheStatus = TSDB_LAST_CACHE_VALID;
196,351✔
328

329
  if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) {
196,351✔
330
    pLastCol->colVal.value.nData = pLastColV0->colVal.value.nData;
11,835✔
331
    pLastCol->colVal.value.pData = NULL;
11,835✔
332
    if (pLastCol->colVal.value.nData > 0) {
11,835✔
333
      pLastCol->colVal.value.pData = (uint8_t *)(&pLastColV0[1]);
4,410✔
334
    }
335
    return sizeof(SLastColV0) + pLastColV0->colVal.value.nData;
11,835✔
336
  } else if (pLastCol->colVal.value.type == TSDB_DATA_TYPE_DECIMAL) {
184,516✔
337
    pLastCol->colVal.value.nData = pLastColV0->colVal.value.nData;
45,260✔
338
    pLastCol->colVal.value.pData = (uint8_t *)(&pLastColV0[1]);
45,260✔
339
    return sizeof(SLastColV0) + pLastColV0->colVal.value.nData;
45,260✔
340
  } else {
341
    pLastCol->colVal.value.val = pLastColV0->colVal.value.val;
139,256✔
342
    return sizeof(SLastColV0);
139,256✔
343
  }
344
}
345

346
/**
 * Decode a serialized last-column entry into a newly allocated SLastCol.
 *
 * Layout: SLastColV0 header + payload, optionally followed by a trailer of
 * version (1 byte), numOfPKs (1 byte), the primary-key SValues (each followed
 * by its bytes when var-length) and, for version >= LAST_COL_VERSION_2, one
 * cacheStatus byte. A buffer that ends right after the V0 part is accepted as
 * version 0.
 *
 * Every read is bounds-checked against `size` before it happens, and numOfPKs
 * is checked against the capacity of rowKey.pks, so truncated or corrupted
 * input yields TSDB_CODE_INVALID_DATA_FMT instead of out-of-bounds access.
 *
 * Var-length / DECIMAL pData pointers in the result point into `value`; they
 * are not copied here.
 *
 * @param value     serialized bytes
 * @param size      number of bytes available at `value`
 * @param ppLastCol receives the decoded entry on success (caller frees)
 * @return 0 on success, TSDB_CODE_INVALID_PARA when value is NULL, terrno on
 *         allocation failure, TSDB_CODE_INVALID_DATA_FMT on malformed input
 */
static int32_t tsdbCacheDeserialize(char const *value, size_t size, SLastCol **ppLastCol) {
  int8_t  version = 0;
  uint8_t numOfPKs = 0;

  if (!value) {
    return TSDB_CODE_INVALID_PARA;
  }

  // The V0 decoder reads a full header unconditionally.
  if (size < sizeof(SLastColV0)) {
    TAOS_RETURN(TSDB_CODE_INVALID_DATA_FMT);
  }

  SLastCol *pLastCol = taosMemoryCalloc(1, sizeof(SLastCol));
  if (NULL == pLastCol) {
    return terrno;
  }

  size_t offset = (size_t)tsdbCacheDeserializeV0(value, pLastCol);
  if (offset == size) {
    // version 0
    *ppLastCol = pLastCol;

    TAOS_RETURN(TSDB_CODE_SUCCESS);
  } else if (offset > size) {
    goto _invalid;
  }

  // version + numOfPKs
  if (size - offset < sizeof(int8_t) + sizeof(uint8_t)) {
    goto _invalid;
  }
  version = *(int8_t *)(value + offset);
  offset += sizeof(int8_t);
  numOfPKs = *(uint8_t *)(value + offset);
  offset += sizeof(uint8_t);

  if (numOfPKs > sizeof(pLastCol->rowKey.pks) / sizeof(pLastCol->rowKey.pks[0])) {
    goto _invalid;
  }
  pLastCol->rowKey.numOfPKs = numOfPKs;

  // pks
  for (uint8_t i = 0; i < numOfPKs; i++) {
    SValue *pPk = &pLastCol->rowKey.pks[i];

    if (size - offset < sizeof(SValue)) {
      goto _invalid;
    }
    // memcpy: the serialized SValue is not guaranteed to be aligned.
    (void)memcpy(pPk, value + offset, sizeof(SValue));
    offset += sizeof(SValue);

    if (IS_VAR_DATA_TYPE(pPk->type)) {
      pPk->pData = NULL;
      if (pPk->nData > 0) {
        if (pPk->nData > size - offset) {
          goto _invalid;
        }
        pPk->pData = (uint8_t *)value + offset;
        offset += pPk->nData;
      }
    }
  }

  if (version >= LAST_COL_VERSION_2) {
    if (offset >= size) {
      goto _invalid;
    }
    pLastCol->cacheStatus = *(uint8_t *)(value + offset);
  }

  *ppLastCol = pLastCol;

  TAOS_RETURN(TSDB_CODE_SUCCESS);

_invalid:
  taosMemoryFreeClear(pLastCol);

  TAOS_RETURN(TSDB_CODE_INVALID_DATA_FMT);
}
404

405
/*
406
typedef struct {
407
  SLastColV0 lastColV0;
408
  char       colData[];
409
  int8_t     version;
410
  uint8_t    numOfPKs;
411
  SValue     pks[0];
412
  char       pk0Data[];
413
  SValue     pks[1];
414
  char       pk1Data[];
415
  ...
416
} SLastColDisk;
417
*/
418
static int32_t tsdbCacheSerializeV0(char const *value, SLastCol *pLastCol) {
56,036,751✔
419
  SLastColV0 *pLastColV0 = (SLastColV0 *)value;
56,036,751✔
420

421
  pLastColV0->ts = pLastCol->rowKey.ts;
56,036,751✔
422
  pLastColV0->dirty = pLastCol->dirty;
56,036,750✔
423
  pLastColV0->colVal.cid = pLastCol->colVal.cid;
56,036,422✔
424
  pLastColV0->colVal.flag = pLastCol->colVal.flag;
56,036,422✔
425
  pLastColV0->colVal.type = pLastCol->colVal.value.type;
56,036,422✔
426
  if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) {
56,036,422✔
427
    pLastColV0->colVal.value.nData = pLastCol->colVal.value.nData;
1,387,463✔
428
    if (pLastCol->colVal.value.nData > 0) {
1,387,806✔
429
      memcpy(&pLastColV0[1], pLastCol->colVal.value.pData, pLastCol->colVal.value.nData);
635,756✔
430
    }
431
    return sizeof(SLastColV0) + pLastCol->colVal.value.nData;
1,387,806✔
432
  } else if (pLastCol->colVal.value.type == TSDB_DATA_TYPE_DECIMAL) {
54,647,290✔
433
    pLastColV0->colVal.value.nData = pLastCol->colVal.value.nData;
607,738✔
434
    if (pLastCol->colVal.value.nData > 0) {
607,738✔
435
      memcpy(&pLastColV0[1], pLastCol->colVal.value.pData, pLastCol->colVal.value.nData);
315,355✔
436
    }
437
    return sizeof(SLastColV0) + pLastCol->colVal.value.nData;
607,738✔
438
  } else {
439
    pLastColV0->colVal.value.val = pLastCol->colVal.value.val;
54,039,881✔
440
    return sizeof(SLastColV0);
54,040,224✔
441
  }
442

443
  return 0;
444
}
445

446
/**
 * Serialize `pLastCol` into a newly allocated buffer.
 *
 * The buffer holds the V0 part (see tsdbCacheSerializeV0()), then version,
 * numOfPKs, every primary-key SValue (followed by its bytes when var-length)
 * and finally the cacheStatus byte. For DECIMAL columns DECIMAL128_BYTES are
 * reserved for the value.
 *
 * @param value receives the buffer (caller frees); NULL on allocation failure
 * @param size  receives the computed buffer size
 * @return 0 on success, terrno when the allocation fails
 */
static int32_t tsdbCacheSerialize(SLastCol *pLastCol, char **value, size_t *size) {
  // Pass 1: compute the total size.
  size_t total = sizeof(SLastColV0);
  if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) {
    total += pLastCol->colVal.value.nData;
  }
  if (pLastCol->colVal.value.type == TSDB_DATA_TYPE_DECIMAL) {
    total += DECIMAL128_BYTES;
  }
  total += sizeof(uint8_t) + sizeof(uint8_t) + sizeof(uint8_t);  // version + numOfPKs + cacheStatus

  for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) {
    total += sizeof(SValue);
    if (IS_VAR_DATA_TYPE(pLastCol->rowKey.pks[i].type)) {
      total += pLastCol->rowKey.pks[i].nData;
    }
  }
  *size = total;

  *value = taosMemoryMalloc(*size);
  if (NULL == *value) {
    TAOS_RETURN(terrno);
  }

  // Pass 2: fill the buffer; `pos` is the write position.
  int32_t pos = tsdbCacheSerializeV0(*value, pLastCol);

  // version
  *(uint8_t *)(*value + pos) = LAST_COL_VERSION;
  pos += 1;

  // numOfPKs
  *(uint8_t *)(*value + pos) = pLastCol->rowKey.numOfPKs;
  pos += 1;

  // pks
  for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) {
    SValue *pPk = &pLastCol->rowKey.pks[i];

    *(SValue *)(*value + pos) = *pPk;
    pos += sizeof(SValue);

    if (IS_VAR_DATA_TYPE(pPk->type)) {
      if (pPk->nData > 0) {
        memcpy(*value + pos, pPk->pData, pPk->nData);
      }
      pos += pPk->nData;
    }
  }

  // cacheStatus
  *(uint8_t *)(*value + pos) = pLastCol->cacheStatus;

  TAOS_RETURN(TSDB_CODE_SUCCESS);
}
494

495
static int32_t tsdbCachePutToRocksdb(STsdb *pTsdb, SLastKey *pLastKey, SLastCol *pLastCol);
496

497
int tsdbCacheFlushDirty(const void *key, size_t klen, void *value, void *ud) {
60,978,873✔
498
  SLastCol *pLastCol = (SLastCol *)value;
60,978,873✔
499

500
  if (pLastCol->dirty) {
60,978,873✔
501
    STsdb *pTsdb = (STsdb *)ud;
51,674,716✔
502

503
    int32_t code = tsdbCachePutToRocksdb(pTsdb, (SLastKey *)key, pLastCol);
51,674,716✔
504
    if (code) {
51,684,224✔
505
      tsdbError("tsdb/cache: vgId:%d, flush dirty lru failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code));
×
506
      return code;
×
507
    }
508

509
    pLastCol->dirty = 0;
51,684,224✔
510

511
    rocksMayWrite(pTsdb, false);
51,684,224✔
512
  }
513

514
  return 0;
60,987,726✔
515
}
516

517
/**
 * Test whether `key` is covered by a delete recorded in the skyline.
 *
 * The skyline is walked from index *iSkyline towards index 1, looking at the
 * pair (front = i-1, back = i) each time; *iSkyline is moved as the walk
 * advances, so it also serves as the start position for the next call.
 *
 * @return true when the key's timestamp lies within a pair and its version is
 *         <= the front version (or <= the back version when ts equals the
 *         back ts); false otherwise, including when *iSkyline <= 0
 */
static bool tsdbKeyDeleted(TSDBKEY *key, SArray *pSkyline, int64_t *iSkyline) {
  while (*iSkyline > 0) {
    TSDBKEY *pBack = (TSDBKEY *)taosArrayGet(pSkyline, *iSkyline);
    TSDBKEY *pFront = (TSDBKEY *)taosArrayGet(pSkyline, *iSkyline - 1);

    // Key is newer than this segment: nothing further down can cover it.
    if (key->ts > pBack->ts) {
      return false;
    }

    // Here key->ts <= pBack->ts already holds.
    if (key->ts >= pFront->ts &&
        (key->version <= pFront->version || (key->ts == pBack->ts && key->version <= pBack->version))) {
      return true;
    }

    // Not covered by this segment: step to the previous one, if any.
    if (*iSkyline <= 1) {
      return false;
    }
    --*iSkyline;
  }

  return false;
}
547

548
// Get next non-deleted row from imem
549
static TSDBROW *tsdbImemGetNextRow(STbDataIter *pTbIter, SArray *pSkyline, int64_t *piSkyline) {
14,048,454✔
550
  int32_t code = 0;
14,048,454✔
551

552
  if (tsdbTbDataIterNext(pTbIter)) {
14,048,454✔
553
    TSDBROW *pMemRow = tsdbTbDataIterGet(pTbIter);
13,922,661✔
554
    TSDBKEY  rowKey = TSDBROW_KEY(pMemRow);
13,922,661✔
555
    bool     deleted = tsdbKeyDeleted(&rowKey, pSkyline, piSkyline);
13,922,985✔
556
    if (!deleted) {
13,922,329✔
557
      return pMemRow;
13,921,017✔
558
    }
559
  }
560

561
  return NULL;
128,417✔
562
}
563

564
// Get first non-deleted row from imem
565
static TSDBROW *tsdbImemGetFirstRow(SMemTable *imem, STbData *pIMem, STbDataIter *pTbIter, SArray *pSkyline,
7,972,039✔
566
                                    int64_t *piSkyline) {
567
  int32_t code = 0;
7,972,039✔
568

569
  tsdbTbDataIterOpen(pIMem, NULL, 1, pTbIter);
7,972,039✔
570
  TSDBROW *pMemRow = tsdbTbDataIterGet(pTbIter);
7,974,335✔
571
  if (pMemRow) {
7,974,335✔
572
    // if non deleted, return the found row.
573
    TSDBKEY rowKey = TSDBROW_KEY(pMemRow);
7,974,335✔
574
    bool    deleted = tsdbKeyDeleted(&rowKey, pSkyline, piSkyline);
7,974,335✔
575
    if (!deleted) {
7,974,335✔
576
      return pMemRow;
7,971,743✔
577
    }
578
  } else {
579
    return NULL;
×
580
  }
581

582
  // continue to find the non-deleted first row from imem, using get next row
583
  return tsdbImemGetNextRow(pTbIter, pSkyline, piSkyline);
2,592✔
584
}
585

586
/**
 * Invalidate the schema cached in pTsdb->rCache when it belongs to the given
 * table and is older than `sver`.
 *
 * Nothing happens when no schema is cached or when sver <= the cached version.
 * For suid > 0 the cached suid must match; for suid == 0 the cached uid must
 * match. On a match the cached version and the matching id are reset to -1
 * (the schema object itself is not freed here).
 */
void tsdbCacheInvalidateSchema(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int32_t sver) {
  SRocksCache *pRCache = &pTsdb->rCache;

  if (!pRCache->pTSchema) return;
  if (sver <= pRCache->sver) return;

  if (suid > 0) {
    if (suid == pRCache->suid) {
      pRCache->sver = -1;
      pRCache->suid = -1;
    }
  } else if (suid == 0) {
    if (uid == pRCache->uid) {
      pRCache->sver = -1;
      pRCache->uid = -1;
    }
  }
}
599

600
/**
 * Make pTsdb->rCache.pTSchema hold the schema of (suid, uid) at version `sver`.
 *
 * The cached schema is reused when its version matches and either suid > 0
 * matches the cached suid or suid == 0 and uid matches the cached uid.
 * Otherwise the old schema is destroyed and a new one is fetched with
 * metaGetTbTSchemaEx().
 *
 * The cache keys (suid/uid/sver) are recorded only after a successful fetch.
 * On failure the keys are reset to -1 and the pointer freed here is cleared
 * first, so a failed fetch that leaves the out-pointer untouched can neither
 * satisfy the fast path later nor leave a freed pointer to be freed again.
 *
 * @return 0 on success, the metaGetTbTSchemaEx() error code otherwise
 */
static int32_t tsdbUpdateSkm(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int32_t sver) {
  SRocksCache *pRCache = &pTsdb->rCache;
  if (pRCache->pTSchema && sver == pRCache->sver) {
    if (suid > 0 && suid == pRCache->suid) {
      return 0;
    }
    if (suid == 0 && uid == pRCache->uid) {
      return 0;
    }
  }

  tDestroyTSchema(pRCache->pTSchema);
  pRCache->pTSchema = NULL;  // never keep a pointer to the schema just freed

  int32_t code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pRCache->pTSchema);
  if (code) {
    pRCache->suid = -1;
    pRCache->uid = -1;
    pRCache->sver = -1;
    return code;
  }

  pRCache->suid = suid;
  pRCache->uid = uid;
  pRCache->sver = sver;
  return code;
}
617

618
static int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SArray *updCtxArray);
619

620
/**
 * Feed the last/last_row cache with the rows of one table held in `imem`.
 *
 * Steps:
 *  1. Load the table's in-memory tombstones and, if any, build a delete skyline.
 *  2. Take the first row returned by tsdbImemGetFirstRow(). Every column of it
 *     yields a LFLAG_LAST_ROW update; columns holding a value also yield a
 *     LFLAG_LAST update, the others are remembered by column id.
 *  3. While remembered columns remain, visit further rows and emit a
 *     LFLAG_LAST update for each remembered column that has a value there.
 *  4. Hand the collected updates to tsdbCacheUpdate().
 *
 * The shared pTsdb->rCache.ctxArray is used as scratch space and is cleared
 * before returning.
 *
 * NOTE(review): the column bound `nCol` is taken from the first row's schema
 * and is not refreshed when a later row switches the schema version.
 *
 * @return 0 on success (also when the table yields no usable row), otherwise an error code
 */
int32_t tsdbLoadFromImem(SMemTable *imem, int64_t suid, int64_t uid) {
  int32_t      code = 0;
  int32_t      lino = 0;
  STsdb       *pTsdb = imem->pTsdb;
  SArray      *ctxArray = pTsdb->rCache.ctxArray;
  SArray      *pMemDelData = NULL;
  SArray      *pSkyline = NULL;
  SSHashObj   *pNullCols = NULL;  // ids of columns still lacking a "last" value
  STSchema    *pTSchema = NULL;
  TSDBROW     *pMemRow = NULL;
  int64_t      iSkyline = 0;
  int32_t      sver = 0;
  int32_t      nCol = 0;
  STbDataIter  tbIter = {0};
  STsdbRowKey  firstRowKey = {0};
  STSDBRowIter iter = {0};

  STbData *pIMem = tsdbGetTbDataFromMemTable(imem, suid, uid);

  // load imem tomb data and build skyline
  TAOS_CHECK_GOTO(loadMemTombData(&pMemDelData, NULL, pIMem, INT64_MAX), &lino, _exit);

  size_t nDel = TARRAY_SIZE(pMemDelData);
  if (nDel > 0) {
    pSkyline = taosArrayInit(32, sizeof(TSDBKEY));
    if (!pSkyline) {
      TAOS_CHECK_EXIT(terrno);
    }

    TAOS_CHECK_EXIT(tsdbBuildDeleteSkyline(pMemDelData, 0, (int32_t)(nDel - 1), pSkyline));
    iSkyline = taosArrayGetSize(pSkyline) - 1;
  }

  pMemRow = tsdbImemGetFirstRow(imem, pIMem, &tbIter, pSkyline, &iSkyline);
  if (!pMemRow) {
    goto _exit;
  }

  // first row: last_row for every column, last for valued columns, remember the rest
  sver = TSDBROW_SVERSION(pMemRow);
  TAOS_CHECK_GOTO(tsdbUpdateSkm(pTsdb, suid, uid, sver), &lino, _exit);
  pTSchema = pTsdb->rCache.pTSchema;
  nCol = pTSchema->numOfCols;

  tsdbRowGetKey(pMemRow, &firstRowKey);

  TAOS_CHECK_EXIT(tsdbRowIterOpen(&iter, pMemRow, pTSchema));

  int32_t iFirst = 0;
  for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal && iFirst < nCol;
       pColVal = tsdbRowIterNext(&iter), iFirst++) {
    SLastUpdateCtx ctx = {.lflag = LFLAG_LAST_ROW, .tsdbRowKey = firstRowKey, .colVal = *pColVal};
    if (!taosArrayPush(ctxArray, &ctx)) {
      TAOS_CHECK_EXIT(terrno);
    }

    if (COL_VAL_IS_VALUE(pColVal)) {
      ctx.lflag = LFLAG_LAST;
      if (!taosArrayPush(ctxArray, &ctx)) {
        TAOS_CHECK_EXIT(terrno);
      }
      continue;
    }

    // no value in the first row: a later row has to supply "last"
    if (!pNullCols) {
      pNullCols = tSimpleHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT));
      if (pNullCols == NULL) {
        TAOS_CHECK_EXIT(terrno);
      }
    }

    if (tSimpleHashPut(pNullCols, &pColVal->cid, sizeof(pColVal->cid), &pColVal->cid, sizeof(pColVal->cid))) {
      TAOS_CHECK_EXIT(terrno);
    }
  }
  tsdbRowClose(&iter);

  // continue to get next row to fill null last col values
  for (pMemRow = tsdbImemGetNextRow(&tbIter, pSkyline, &iSkyline);
       pMemRow && tSimpleHashGetSize(pNullCols) != 0;
       pMemRow = tsdbImemGetNextRow(&tbIter, pSkyline, &iSkyline)) {
    sver = TSDBROW_SVERSION(pMemRow);
    TAOS_CHECK_EXIT(tsdbUpdateSkm(pTsdb, suid, uid, sver));
    pTSchema = pTsdb->rCache.pTSchema;

    STsdbRowKey curRowKey = {0};
    tsdbRowGetKey(pMemRow, &curRowKey);

    TAOS_CHECK_EXIT(tsdbRowIterOpen(&iter, pMemRow, pTSchema));

    int32_t iNext = 0;
    for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal && iNext < nCol;
         pColVal = tsdbRowIterNext(&iter), iNext++) {
      if (!tSimpleHashGet(pNullCols, &pColVal->cid, sizeof(pColVal->cid)) || !COL_VAL_IS_VALUE(pColVal)) {
        continue;
      }

      SLastUpdateCtx ctx = {.lflag = LFLAG_LAST, .tsdbRowKey = curRowKey, .colVal = *pColVal};
      if (!taosArrayPush(ctxArray, &ctx)) {
        TAOS_CHECK_EXIT(terrno);
      }

      TAOS_CHECK_EXIT(tSimpleHashRemove(pNullCols, &pColVal->cid, sizeof(pColVal->cid)));
    }
    tsdbRowClose(&iter);
  }

  TAOS_CHECK_GOTO(tsdbCacheUpdate(pTsdb, suid, uid, ctxArray), &lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(pTsdb->pVnode), __func__, __FILE__, lino, tstrerror(code));

    tsdbRowClose(&iter);
  }

  taosArrayClear(ctxArray);
  // destroy any allocated resource
  tSimpleHashCleanup(pNullCols);
  if (pMemDelData) {
    taosArrayDestroy(pMemDelData);
  }
  if (pSkyline) {
    taosArrayDestroy(pSkyline);
  }

  TAOS_RETURN(code);
}
749

750
/**
 * Push the contents of the immutable memtable (pTsdb->imem) into the cache by
 * running tsdbLoadFromImem() through tsdbMemTableSaveToCache().
 *
 * Returns 0 without doing anything when pTsdb or its imem is NULL, or when the
 * memtable has no rows or no table data.
 *
 * @return 0 on success, otherwise the tsdbMemTableSaveToCache() error code
 */
static int32_t tsdbCacheUpdateFromIMem(STsdb *pTsdb) {
  if (!pTsdb || !pTsdb->imem) {
    return 0;
  }

  int32_t    code = 0;
  int32_t    lino = 0;
  SMemTable *pIMem = pTsdb->imem;

  // Snapshot the counters up front; they are logged after the save.
  int32_t nTbData = pIMem->nTbData;
  int64_t nRow = pIMem->nRow;
  int64_t nDel = pIMem->nDel;

  if (nRow == 0 || nTbData == 0) {
    return 0;
  }

  TAOS_CHECK_EXIT(tsdbMemTableSaveToCache(pIMem, tsdbLoadFromImem));

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(pTsdb->pVnode), __func__, __FILE__, lino, tstrerror(code));
  } else {
    tsdbInfo("vgId:%d %s done, nRow:%" PRId64 " nDel:%" PRId64, TD_VID(pTsdb->pVnode), __func__, nRow, nDel);
  }

  TAOS_RETURN(code);
}
774

775
/**
 * Commit the last/last_row cache.
 *
 * 1. When tsUpdateCacheBatch is set, first load the immutable memtable into
 *    the cache (tsdbCacheUpdateFromIMem()); a failure there aborts the commit.
 * 2. Under lruMutex, run tsdbCacheFlushDirty() over every LRU entry, then
 *    (with USE_ROCKSDB) write the remaining batch and flush rocksdb.
 *
 * @return 0 on success, the tsdbCacheUpdateFromIMem() error code, or
 *         TSDB_CODE_FAILED when rocksdb_flush() reports an error
 */
int32_t tsdbCacheCommit(STsdb *pTsdb) {
  int32_t code = 0;

  if (tsUpdateCacheBatch) {
    code = tsdbCacheUpdateFromIMem(pTsdb);
    if (code) {
      tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));

      TAOS_RETURN(code);
    }
  }

  char *err = NULL;

  (void)taosThreadMutexLock(&pTsdb->lruMutex);

  // flush dirty data of lru into rocks
  taosLRUCacheApply(pTsdb->lruCache, tsdbCacheFlushDirty, pTsdb);

#ifdef USE_ROCKSDB
  rocksMayWrite(pTsdb, true);
  rocksdb_flush(pTsdb->rCache.db, pTsdb->rCache.flushoptions, &err);
#endif

  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);

#ifdef USE_ROCKSDB
  // The flush error is reported after the lock has been released.
  if (NULL != err) {
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err);
    rocksdb_free(err);
    code = TSDB_CODE_FAILED;
  }
#endif

  TAOS_RETURN(code);
}
814

815
/*
 * Give a variable-length SValue its own private copy of the payload.
 *
 * Values whose type is not variable-length are left untouched. For
 * variable-length values, pData is replaced by a freshly allocated copy of
 * nData bytes, or set to NULL when nData is 0. The previous buffer is not
 * freed here.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when the allocation fails (pValue is
 * left unmodified in that case).
 */
static int32_t reallocVarDataVal(SValue *pValue) {
  if (!IS_VAR_DATA_TYPE(pValue->type)) {
    TAOS_RETURN(TSDB_CODE_SUCCESS);
  }

  uint8_t *pSrc = pValue->pData;
  uint32_t len = pValue->nData;

  if (len == 0) {
    pValue->pData = NULL;
    TAOS_RETURN(TSDB_CODE_SUCCESS);
  }

  uint8_t *pCopy = taosMemoryMalloc(len);
  if (pCopy == NULL) {
    TAOS_RETURN(terrno);
  }

  (void)memcpy(pCopy, pSrc, len);
  pValue->pData = pCopy;

  TAOS_RETURN(TSDB_CODE_SUCCESS);
}
833

834
/* Column-value wrapper: deep-copies the payload held in pColVal->value via reallocVarDataVal(). */
static int32_t reallocVarData(SColVal *pColVal) {
  SValue *pValue = &pColVal->value;
  return reallocVarDataVal(pValue);
}
2,813,585✔
835

836
// realloc pk data and col data.
837
static int32_t tsdbCacheReallocSLastCol(SLastCol *pCol, size_t *pCharge) {
113,069,903✔
838
  int32_t code = TSDB_CODE_SUCCESS, lino = 0;
113,069,903✔
839
  size_t  charge = sizeof(SLastCol);
113,069,903✔
840

841
  int8_t i = 0;
113,069,903✔
842
  for (; i < pCol->rowKey.numOfPKs; i++) {
121,460,727✔
843
    SValue *pValue = &pCol->rowKey.pks[i];
8,392,837✔
844
    if (IS_VAR_DATA_TYPE(pValue->type)) {
8,392,166✔
845
      TAOS_CHECK_EXIT(reallocVarDataVal(pValue));
2,605,897✔
846
      charge += pValue->nData;
2,603,884✔
847
    }
848
  }
849

850
  if (IS_VAR_DATA_TYPE(pCol->colVal.value.type)) {
113,424,426✔
851
    TAOS_CHECK_EXIT(reallocVarData(&pCol->colVal));
2,790,973✔
852
    charge += pCol->colVal.value.nData;
2,813,585✔
853
  }
854

855
  if (pCol->colVal.value.type == TSDB_DATA_TYPE_DECIMAL) {
113,435,571✔
856
    if (pCol->colVal.value.nData > 0) {
904,064✔
857
      void *p = taosMemoryMalloc(pCol->colVal.value.nData);
444,536✔
858
      if (!p) TAOS_CHECK_EXIT(terrno);
444,536✔
859
      (void)memcpy(p, pCol->colVal.value.pData, pCol->colVal.value.nData);
444,536✔
860
      pCol->colVal.value.pData = p;
444,536✔
861
    }else {
862
      pCol->colVal.value.pData = NULL;
459,528✔
863
    }
864
    charge += pCol->colVal.value.nData;
904,064✔
865
  }
866

867
  if (pCharge) {
113,418,757✔
868
    *pCharge = charge;
103,049,685✔
869
  }
870

871
_exit:
10,369,072✔
872
  if (TSDB_CODE_SUCCESS != code) {
113,421,053✔
873
    for (int8_t j = 0; j < i; j++) {
×
874
      if (IS_VAR_DATA_TYPE(pCol->rowKey.pks[j].type)) {
×
875
        taosMemoryFree(pCol->rowKey.pks[j].pData);
×
876
      }
877
    }
878

879
    (void)memset(pCol, 0, sizeof(SLastCol));
×
880
  }
881

882
  TAOS_RETURN(code);
113,421,053✔
883
}
884

885
void tsdbCacheFreeSLastColItem(void *pItem) {
11,284,572✔
886
  SLastCol *pCol = (SLastCol *)pItem;
11,284,572✔
887
  for (int i = 0; i < pCol->rowKey.numOfPKs; i++) {
16,362,670✔
888
    if (IS_VAR_DATA_TYPE(pCol->rowKey.pks[i].type)) {
5,076,085✔
889
      taosMemoryFree(pCol->rowKey.pks[i].pData);
1,453,992✔
890
    }
891
  }
892

893
  if ((IS_VAR_DATA_TYPE(pCol->colVal.value.type) || pCol->colVal.value.type == TSDB_DATA_TYPE_DECIMAL) &&
11,284,378✔
894
      pCol->colVal.value.pData) {
1,589,099✔
895
    taosMemoryFree(pCol->colVal.value.pData);
1,197,127✔
896
  }
897
}
11,286,895✔
898

899
static void tsdbCacheDeleter(const void *key, size_t klen, void *value, void *ud) {
103,027,066✔
900
  SLastCol *pLastCol = (SLastCol *)value;
103,027,066✔
901

902
  if (pLastCol->dirty) {
103,027,066✔
903
    if (tsdbCacheFlushDirty(key, klen, pLastCol, ud) != 0) {
2,143,552✔
904
      STsdb *pTsdb = (STsdb *)ud;
×
905
      tsdbTrace("tsdb/cache: vgId:%d, flush cache %s failed at line %d.", TD_VID(pTsdb->pVnode), __func__, __LINE__);
×
906
    }
907
  }
908

909
  for (uint8_t i = 0; i < pLastCol->rowKey.numOfPKs; ++i) {
106,925,634✔
910
    SValue *pValue = &pLastCol->rowKey.pks[i];
3,884,580✔
911
    if (IS_VAR_DATA_TYPE(pValue->type)) {
3,884,580✔
912
      taosMemoryFree(pValue->pData);
1,289,276✔
913
    }
914
  }
915

916
  if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type) ||
103,061,062✔
917
      pLastCol->colVal.value.type == TSDB_DATA_TYPE_DECIMAL /* && pLastCol->colVal.value.nData > 0*/) {
101,516,324✔
918
    taosMemoryFree(pLastCol->colVal.value.pData);
2,243,307✔
919
  }
920

921
  taosMemoryFree(value);
103,034,822✔
922
}
103,052,747✔
923

924
static void tsdbCacheOverWriter(const void *key, size_t klen, void *value, void *ud) {
48,381,516✔
925
  SLastCol *pLastCol = (SLastCol *)value;
48,381,516✔
926
  pLastCol->dirty = 0;
48,381,516✔
927
}
48,412,348✔
928

929
static int32_t tsdbCachePutToLRU(STsdb *pTsdb, SLastKey *pLastKey, SLastCol *pLastCol, int8_t dirty);
930

931
/*
 * Seed the LRU cache with a dirty "no value yet" entry for one column of one
 * table.
 *
 * The entry carries rowKey.ts = TSKEY_MIN, no primary keys, a NONE column
 * value of the given type and cacheStatus TSDB_LAST_CACHE_VALID. The cache key
 * is built from (lflag, uid, cid).
 *
 * Returns the result of tsdbCachePutToLRU(); failures are logged.
 */
static int32_t tsdbCacheNewTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, int8_t col_type, int8_t lflag) {
  int32_t code = 0, lino = 0;

  SLastKey lastKey = {.lflag = lflag, .uid = uid, .cid = cid};
  SLastCol placeholder = {.rowKey = (SRowKey){.ts = TSKEY_MIN, .numOfPKs = 0},
                          .colVal = COL_VAL_NONE(cid, col_type),
                          .dirty = 1,
                          .cacheStatus = TSDB_LAST_CACHE_VALID};

  code = tsdbCachePutToLRU(pTsdb, &lastKey, &placeholder, 1);
  if (code != 0) {
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
  }

  TAOS_RETURN(code);
}
948

949
/*
 * Flush the cache without touching lruMutex (this function never locks it).
 *
 * Visits every LRU entry with tsdbCacheFlushDirty and, when built with
 * USE_ROCKSDB, writes the pending batch and flushes rocksdb.
 *
 * Returns 0, or TSDB_CODE_FAILED when rocksdb_flush() reports an error.
 */
int32_t tsdbCacheCommitNoLock(STsdb *pTsdb) {
  int32_t code = 0;
  char   *flushErr = NULL;

  taosLRUCacheApply(pTsdb->lruCache, tsdbCacheFlushDirty, pTsdb);

#ifdef USE_ROCKSDB
  rocksMayWrite(pTsdb, true);
  rocksdb_flush(pTsdb->rCache.db, pTsdb->rCache.flushoptions, &flushErr);
  if (flushErr != NULL) {
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, flushErr);
    rocksdb_free(flushErr);
    code = TSDB_CODE_FAILED;
  }
#endif

  TAOS_RETURN(code);
}
968

969
/*
 * Batch-read `numKeys` keys from the rocksdb cache store.
 *
 * ppKeysList / pKeysListSizes  key pointers and their lengths.
 * pppValuesList                out: array of numKeys value buffers; an entry is
 *                              NULL when rocksdb returned nothing for that key.
 *                              Each non-NULL entry must be released with
 *                              rocksdb_free(), the array itself with
 *                              taosMemoryFree().
 * ppValuesListSizes            out: array of numKeys value lengths, released
 *                              with taosMemoryFree().
 *
 * Per-key read errors reported by rocksdb_multi_get() do not fail the call
 * (the affected value stays NULL), but they are now logged instead of being
 * discarded silently.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when an allocation fails; the output
 * parameters are written only on success. Without USE_ROCKSDB the function is
 * a no-op that leaves the outputs untouched.
 */
static int32_t tsdbCacheGetValuesFromRocks(STsdb *pTsdb, size_t numKeys, const char *const *ppKeysList,
                                           size_t *pKeysListSizes, char ***pppValuesList, size_t **ppValuesListSizes) {
#ifdef USE_ROCKSDB
  char **valuesList = taosMemoryCalloc(numKeys, sizeof(char *));
  if (!valuesList) return terrno;

  size_t *valuesListSizes = taosMemoryCalloc(numKeys, sizeof(size_t));
  if (!valuesListSizes) {
    int32_t code = terrno;  // capture before any cleanup call can touch terrno
    taosMemoryFreeClear(valuesList);
    return code;
  }

  char **errs = taosMemoryCalloc(numKeys, sizeof(char *));
  if (!errs) {
    int32_t code = terrno;
    taosMemoryFreeClear(valuesList);
    taosMemoryFreeClear(valuesListSizes);
    return code;
  }

  rocksdb_multi_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, numKeys, ppKeysList, pKeysListSizes, valuesList,
                    valuesListSizes, errs);

  for (size_t i = 0; i < numKeys; ++i) {
    if (errs[i] != NULL) {
      tsdbError("vgId:%d, %s failed to read key %d since %s", TD_VID(pTsdb->pVnode), __func__, (int32_t)i, errs[i]);
      rocksdb_free(errs[i]);
    }
  }
  taosMemoryFreeClear(errs);

  *pppValuesList = valuesList;
  *ppValuesListSizes = valuesListSizes;
#endif
  TAOS_RETURN(TSDB_CODE_SUCCESS);
}
997

998
/*
 * Remove the cached "last" and "last_row" entries of one column of one table
 * from both the rocksdb store and the LRU cache.
 *
 * The two keys (LFLAG_LAST, LFLAG_LAST_ROW) are looked up in rocksdb; each
 * value that is present and deserializes is scheduled for deletion in the
 * shared write batch (the batch is not written here). Afterwards both keys
 * are erased from the LRU cache.
 *
 * hasPrimaryKey is currently unused.
 *
 * Returns 0 on success, terrno on allocation failure, or the error from the
 * rocksdb lookup / deserialization. On a deserialization error the LRU erase
 * step is skipped. The rocksdb value buffers are released on every exit
 * path.
 */
static int32_t tsdbCacheDropTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, bool hasPrimaryKey) {
  int32_t code = 0;

  // build keys & multi get from rocks
  char **keys_list = taosMemoryCalloc(2, sizeof(char *));
  if (!keys_list) {
    return terrno;
  }
  size_t *keys_list_sizes = taosMemoryCalloc(2, sizeof(size_t));
  if (!keys_list_sizes) {
    code = terrno;
    taosMemoryFree(keys_list);
    return code;
  }
  const size_t klen = ROCKS_KEY_LEN;

  // both keys live in one allocation; keys_list[0] owns it
  char *keys = taosMemoryCalloc(2, sizeof(SLastKey));
  if (!keys) {
    code = terrno;
    taosMemoryFree(keys_list);
    taosMemoryFree(keys_list_sizes);
    return code;
  }
  ((SLastKey *)keys)[0] = (SLastKey){.lflag = LFLAG_LAST, .uid = uid, .cid = cid};
  ((SLastKey *)keys)[1] = (SLastKey){.lflag = LFLAG_LAST_ROW, .uid = uid, .cid = cid};

  keys_list[0] = keys;
  keys_list[1] = keys + sizeof(SLastKey);
  keys_list_sizes[0] = klen;
  keys_list_sizes[1] = klen;

  char  **values_list = NULL;
  size_t *values_list_sizes = NULL;

  // the write batch was already flushed by the caller
  TAOS_CHECK_GOTO(tsdbCacheGetValuesFromRocks(pTsdb, 2, (const char *const *)keys_list, keys_list_sizes, &values_list,
                                              &values_list_sizes),
                  NULL, _exit);

#ifdef USE_ROCKSDB
  rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;

  for (int i = 0; i < 2; i++) {
    if (values_list[i] == NULL) continue;

    SLastCol *pLastCol = NULL;
    code = tsdbCacheDeserialize(values_list[i], values_list_sizes[i], &pLastCol);
    if (code != TSDB_CODE_SUCCESS) {
      tsdbError("vgId:%d, %s deserialize failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
                tstrerror(code));
      goto _exit;
    }
    if (NULL != pLastCol) {
      rocksdb_writebatch_delete(wb, keys_list[i], klen);
    }
    taosMemoryFreeClear(pLastCol);
  }
#endif

  for (int i = 0; i < 2; i++) {
    LRUHandle *h = taosLRUCacheLookup(pTsdb->lruCache, keys_list[i], klen);
    if (h) {
      tsdbLRUCacheRelease(pTsdb->lruCache, h, true);
      taosLRUCacheErase(pTsdb->lruCache, keys_list[i], klen);
    }
  }

_exit:
#ifdef USE_ROCKSDB
  // released here so that the deserialize-error path no longer leaks them
  if (values_list) {
    rocksdb_free(values_list[0]);
    rocksdb_free(values_list[1]);
  }
#endif
  taosMemoryFree(keys_list[0]);

  taosMemoryFree(keys_list);
  taosMemoryFree(keys_list_sizes);
  taosMemoryFree(values_list);
  taosMemoryFree(values_list_sizes);

  TAOS_RETURN(code);
}
1092

1093
/*
 * Create placeholder cache entries (last_row and last) for every column of a
 * newly created table, under lruMutex.
 *
 * When suid < 0 the columns come from pSchemaRow; otherwise the schema is
 * fetched with metaGetTbTSchemaEx(suid, uid) and pSchemaRow is not read.
 *
 * Per-column failures are only traced and do not stop the loop; the value
 * returned is the status of the last tsdbCacheNewTableColumn() call (or the
 * schema lookup error, which aborts immediately).
 */
int32_t tsdbCacheNewTable(STsdb *pTsdb, tb_uid_t uid, tb_uid_t suid, const SSchemaWrapper *pSchemaRow) {
  int32_t      code = 0;
  const int8_t lflags[2] = {LFLAG_LAST_ROW, LFLAG_LAST};  // order matters for the returned status

  (void)taosThreadMutexLock(&pTsdb->lruMutex);

  if (suid >= 0) {
    STSchema *pTSchema = NULL;
    code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, -1, &pTSchema);
    if (code != TSDB_CODE_SUCCESS) {
      (void)taosThreadMutexUnlock(&pTsdb->lruMutex);

      TAOS_RETURN(code);
    }

    for (int iCol = 0; iCol < pTSchema->numOfCols; ++iCol) {
      int16_t cid = pTSchema->columns[iCol].colId;
      int8_t  col_type = pTSchema->columns[iCol].type;

      for (int f = 0; f < 2; ++f) {
        code = tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, lflags[f]);
        if (code != TSDB_CODE_SUCCESS) {
          tsdbTrace("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__,
                    __LINE__, tstrerror(code));
        }
      }
    }

    taosMemoryFree(pTSchema);
  } else {
    for (int iCol = 0; iCol < pSchemaRow->nCols; ++iCol) {
      int16_t cid = pSchemaRow->pSchema[iCol].colId;
      int8_t  col_type = pSchemaRow->pSchema[iCol].type;

      for (int f = 0; f < 2; ++f) {
        code = tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, lflags[f]);
        if (code != TSDB_CODE_SUCCESS) {
          tsdbTrace("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__,
                    __LINE__, tstrerror(code));
        }
      }
    }
  }

  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);

  TAOS_RETURN(code);
}
1146

1147
/*
 * Drop all cached last/last_row entries of a table, under lruMutex.
 *
 * The cache is first flushed with tsdbCacheCommitNoLock() (failure only
 * traced). Columns are taken from pSchemaRow when it is non-NULL, otherwise
 * from the schema returned by metaGetTbTSchemaEx(suid, uid). The second
 * column's COL_IS_KEY flag decides the hasPrimaryKey argument passed down.
 * Finally rocksMayWrite(pTsdb, false) is invoked.
 *
 * Per-column failures are traced and do not stop the loop; the returned value
 * is the status of the last tsdbCacheDropTableColumn() call, or the schema
 * lookup error (which aborts immediately).
 */
int32_t tsdbCacheDropTable(STsdb *pTsdb, tb_uid_t uid, tb_uid_t suid, SSchemaWrapper *pSchemaRow) {
  int32_t code = 0;

  (void)taosThreadMutexLock(&pTsdb->lruMutex);

  code = tsdbCacheCommitNoLock(pTsdb);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbTrace("vgId:%d, %s commit with no lock failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
              tstrerror(code));
  }

  if (pSchemaRow == NULL) {
    STSchema *pTSchema = NULL;
    code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, -1, &pTSchema);
    if (code != TSDB_CODE_SUCCESS) {
      (void)taosThreadMutexUnlock(&pTsdb->lruMutex);

      TAOS_RETURN(code);
    }

    int  nCols = pTSchema->numOfCols;
    bool withPk = (nCols >= 2) && ((pTSchema->columns[1].flags & COL_IS_KEY) != 0);

    for (int iCol = 0; iCol < nCols; ++iCol) {
      code = tsdbCacheDropTableColumn(pTsdb, uid, pTSchema->columns[iCol].colId, withPk);
      if (code != TSDB_CODE_SUCCESS) {
        tsdbTrace("vgId:%d, %s drop table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
                  tstrerror(code));
      }
    }

    taosMemoryFree(pTSchema);
  } else {
    int  nCols = pSchemaRow->nCols;
    bool withPk = (nCols >= 2) && ((pSchemaRow->pSchema[1].flags & COL_IS_KEY) != 0);

    for (int iCol = 0; iCol < nCols; ++iCol) {
      code = tsdbCacheDropTableColumn(pTsdb, uid, pSchemaRow->pSchema[iCol].colId, withPk);
      if (code != TSDB_CODE_SUCCESS) {
        tsdbTrace("vgId:%d, %s drop table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
                  tstrerror(code));
      }
    }
  }

  rocksMayWrite(pTsdb, false);

  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);

  TAOS_RETURN(code);
}
1208

1209
/*
 * Drop the cached last/last_row entries of every child table listed in `uids`
 * (array of tb_uid_t), using the schema looked up for `suid`. Runs under
 * lruMutex.
 *
 * The cache is flushed first with tsdbCacheCommitNoLock() (failure only
 * traced). A schema lookup failure aborts and is returned. Per-column drop
 * failures are traced and skipped; the returned value is the status of the
 * last tsdbCacheDropTableColumn() call (or the schema lookup status when no
 * column was processed).
 */
int32_t tsdbCacheDropSubTables(STsdb *pTsdb, SArray *uids, tb_uid_t suid) {
  int32_t   code = 0;
  STSchema *pTSchema = NULL;

  (void)taosThreadMutexLock(&pTsdb->lruMutex);

  code = tsdbCacheCommitNoLock(pTsdb);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbTrace("vgId:%d, %s commit with no lock failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
              tstrerror(code));
  }

  code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, suid, -1, &pTSchema);
  if (code != TSDB_CODE_SUCCESS) {
    (void)taosThreadMutexUnlock(&pTsdb->lruMutex);

    TAOS_RETURN(code);
  }

  for (int iTb = 0; iTb < TARRAY_SIZE(uids); ++iTb) {
    int64_t uid = ((tb_uid_t *)TARRAY_DATA(uids))[iTb];
    int     nCols = pTSchema->numOfCols;
    bool    withPk = (nCols >= 2) && ((pTSchema->columns[1].flags & COL_IS_KEY) != 0);

    for (int iCol = 0; iCol < nCols; ++iCol) {
      code = tsdbCacheDropTableColumn(pTsdb, uid, pTSchema->columns[iCol].colId, withPk);
      if (code != TSDB_CODE_SUCCESS) {
        tsdbTrace("vgId:%d, %s drop table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
                  tstrerror(code));
      }
    }
  }

  taosMemoryFree(pTSchema);

  rocksMayWrite(pTsdb, false);

  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);

  TAOS_RETURN(code);
}
1257

1258
/*
 * Create the two placeholder cache entries (lflag 0 and lflag 1) for a column
 * added to a single table, under lruMutex.
 *
 * A failure of either insertion is only traced; the returned value is the
 * status of the second (lflag 1) insertion.
 */
int32_t tsdbCacheNewNTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, int8_t col_type) {
  int32_t code = 0;

  (void)taosThreadMutexLock(&pTsdb->lruMutex);

  for (int8_t lflag = 0; lflag <= 1; ++lflag) {
    code = tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, lflag);
    if (code != TSDB_CODE_SUCCESS) {
      tsdbTrace("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
                tstrerror(code));
    }
  }

  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);

  TAOS_RETURN(code);
}
1278

1279
/*
 * Drop the cached entries of one column of a single table, under lruMutex.
 *
 * Sequence: tsdbCacheCommitNoLock() (failure only traced),
 * tsdbCacheDropTableColumn(), then rocksMayWrite(pTsdb, false).
 *
 * Returns the status of tsdbCacheDropTableColumn().
 */
int32_t tsdbCacheDropNTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, bool hasPrimayKey) {
  int32_t code = 0;

  (void)taosThreadMutexLock(&pTsdb->lruMutex);

  if ((code = tsdbCacheCommitNoLock(pTsdb)) != TSDB_CODE_SUCCESS) {
    tsdbTrace("vgId:%d, %s commit with no lock failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
              tstrerror(code));
  }

  if ((code = tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey)) != TSDB_CODE_SUCCESS) {
    tsdbTrace("vgId:%d, %s drop table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
              tstrerror(code));
  }

  rocksMayWrite(pTsdb, false);

  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);

  TAOS_RETURN(code);
}
1302

1303
/*
 * Create placeholder cache entries (lflag 0 and lflag 1) for a new column on
 * every table listed in `uids` (array of tb_uid_t), under lruMutex.
 *
 * Failures are only traced and do not stop the loop; the returned value is
 * the status of the last insertion attempted (0 when `uids` is empty).
 */
int32_t tsdbCacheNewSTableColumn(STsdb *pTsdb, SArray *uids, int16_t cid, int8_t col_type) {
  int32_t code = 0;

  (void)taosThreadMutexLock(&pTsdb->lruMutex);

  for (int iTb = 0; iTb < TARRAY_SIZE(uids); ++iTb) {
    tb_uid_t uid = ((tb_uid_t *)TARRAY_DATA(uids))[iTb];

    for (int8_t lflag = 0; lflag <= 1; ++lflag) {
      code = tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, lflag);
      if (code != TSDB_CODE_SUCCESS) {
        tsdbTrace("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
                  tstrerror(code));
      }
    }
  }

  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
  TAOS_RETURN(code);
}
1327

1328
/*
 * Drop the cached entries of one column for every table listed in `uids`
 * (array of tb_uid_t), under lruMutex.
 *
 * The cache is flushed first with tsdbCacheCommitNoLock() (failure only
 * traced); after the loop rocksMayWrite(pTsdb, false) is invoked. Per-table
 * failures are traced and skipped; the returned value is the status of the
 * last call made.
 */
int32_t tsdbCacheDropSTableColumn(STsdb *pTsdb, SArray *uids, int16_t cid, bool hasPrimayKey) {
  int32_t code = 0;

  (void)taosThreadMutexLock(&pTsdb->lruMutex);

  if ((code = tsdbCacheCommitNoLock(pTsdb)) != TSDB_CODE_SUCCESS) {
    tsdbTrace("vgId:%d, %s commit with no lock failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
              tstrerror(code));
  }

  for (int iTb = 0; iTb < TARRAY_SIZE(uids); ++iTb) {
    int64_t uid = ((tb_uid_t *)TARRAY_DATA(uids))[iTb];

    if ((code = tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey)) != TSDB_CODE_SUCCESS) {
      tsdbTrace("vgId:%d, %s drop table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
                tstrerror(code));
    }
  }

  rocksMayWrite(pTsdb, false);

  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);

  TAOS_RETURN(code);
}
1355

1356
typedef struct {
1357
  int      idx;
1358
  SLastKey key;
1359
} SIdxKey;
1360

1361
/*
 * Reset a cached SLastCol to the "no value" state in place.
 *
 * The row key becomes (TSKEY_MIN, 0 primary keys); non-empty variable-length
 * buffers of the primary keys and of the column value are freed, every other
 * value goes through valueClearDatum(). The column value is replaced by a
 * NONE value with the same cid and type, the entry is marked dirty and its
 * cacheStatus is set to the given status.
 */
static void tsdbCacheUpdateLastColToNone(SLastCol *pLastCol, ELastCacheStatus cacheStatus) {
  SRowKey *pKey = &pLastCol->rowKey;
  SValue  *pVal = &pLastCol->colVal.value;

  // reset the row key
  pKey->ts = TSKEY_MIN;
  for (int8_t iPk = 0; iPk < pKey->numOfPKs; iPk++) {
    SValue *pPk = &pKey->pks[iPk];
    if (!(IS_VAR_DATA_TYPE(pPk->type) && pPk->nData > 0)) {
      valueClearDatum(pPk, pPk->type);
      continue;
    }
    taosMemoryFreeClear(pPk->pData);
    pPk->nData = 0;
  }
  pKey->numOfPKs = 0;

  // reset the column value
  if (!(IS_VAR_DATA_TYPE(pVal->type) && pVal->nData > 0)) {
    valueClearDatum(pVal, pVal->type);
  } else {
    taosMemoryFreeClear(pVal->pData);
    pVal->nData = 0;
  }

  pLastCol->colVal = COL_VAL_NONE(pLastCol->colVal.cid, pLastCol->colVal.value.type);
  pLastCol->dirty = 1;
  pLastCol->cacheStatus = cacheStatus;
}
×
1387

1388
/*
 * Serialize an SLastCol and queue it in the shared rocksdb write batch under
 * the given key (ROCKS_KEY_LEN bytes of *pLastKey).
 *
 * The batch is modified while holding rCache.writeBatchMutex; it is not
 * written to rocksdb here. Without USE_ROCKSDB the function does nothing.
 *
 * Returns 0, or the serialization error (logged).
 */
static int32_t tsdbCachePutToRocksdb(STsdb *pTsdb, SLastKey *pLastKey, SLastCol *pLastCol) {
  int32_t code = 0;
#ifdef USE_ROCKSDB
  char  *pBuf = NULL;
  size_t bufLen = 0;

  code = tsdbCacheSerialize(pLastCol, &pBuf, &bufLen);
  if (code != 0) {
    tsdbError("tsdb/cache/putrocks: vgId:%d, serialize failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code));
    TAOS_RETURN(code);
  }

  rocksdb_writebatch_t *pBatch = pTsdb->rCache.writebatch;

  (void)taosThreadMutexLock(&pTsdb->rCache.writeBatchMutex);
  rocksdb_writebatch_put(pBatch, (char *)pLastKey, ROCKS_KEY_LEN, pBuf, bufLen);
  (void)taosThreadMutexUnlock(&pTsdb->rCache.writeBatchMutex);

  // the serialized buffer is released right after being queued
  taosMemoryFree(pBuf);
#endif
  TAOS_RETURN(code);
}
1409

1410
/*
 * Insert a private deep copy of *pLastCol into the LRU cache under *pLastKey.
 *
 * The copy gets the given dirty flag; its charge is the size computed by
 * tsdbCacheReallocSLastCol(). tsdbCacheDeleter / tsdbCacheOverWriter are
 * registered as callbacks with pTsdb as user data.
 *
 * Returns 0 on success, terrno when the copy cannot be allocated, the deep
 * copy error, or TSDB_CODE_FAILED when the insert status is neither OK nor
 * OK_OVERWRITTEN.
 */
static int32_t tsdbCachePutToLRU(STsdb *pTsdb, SLastKey *pLastKey, SLastCol *pLastCol, int8_t dirty) {
  int32_t   code = 0, lino = 0;
  size_t    charge = 0;
  LRUStatus status;

  SLastCol *pEntry = taosMemoryCalloc(1, sizeof(SLastCol));
  if (pEntry == NULL) {
    return terrno;
  }

  *pEntry = *pLastCol;
  pEntry->dirty = dirty;
  TAOS_CHECK_EXIT(tsdbCacheReallocSLastCol(pEntry, &charge));

  status = taosLRUCacheInsert(pTsdb->lruCache, pLastKey, ROCKS_KEY_LEN, pEntry, charge, tsdbCacheDeleter,
                              tsdbCacheOverWriter, NULL, TAOS_LRU_PRIORITY_LOW, pTsdb);
  if (!(status == TAOS_LRU_STATUS_OK || status == TAOS_LRU_STATUS_OK_OVERWRITTEN)) {
    tsdbError("vgId:%d, %s failed at line %d status %d.", TD_VID(pTsdb->pVnode), __func__, __LINE__, status);
    code = TSDB_CODE_FAILED;
    // the entry is dropped from our hands here so it is not freed below
    pEntry = NULL;
  }

_exit:
  if (code != TSDB_CODE_SUCCESS) {
    taosMemoryFree(pEntry);
    tsdbError("tsdb/cache/putlru: vgId:%d, failed at line %d since %s.", TD_VID(pTsdb->pVnode), lino, tstrerror(code));
  }

  TAOS_RETURN(code);
}
1439

1440
/*
 * Apply a batch of last/last_row updates for table `uid` to the cache.
 *
 * updCtxArray holds SLastUpdateCtx items (lflag, row key, column value). The
 * whole operation runs under lruMutex:
 *
 *  1. Keys found in the LRU are overwritten in place (dirty) when the
 *     incoming row key is newer, or equal with a non-NONE value. Entries
 *     whose cacheStatus is TSDB_LAST_CACHE_NO_CACHE are left alone. "last"
 *     updates without a real value are skipped.
 *  2. Keys missing from the LRU are collected, the write batch is flushed and
 *     the keys are looked up in rocksdb. A stored NO_CACHE entry is only
 *     loaded back into the LRU; otherwise, when nothing is stored or the
 *     incoming key wins the comparison, the new value is queued for rocksdb
 *     and inserted into the LRU as clean.
 *
 * suid is currently unused.
 *
 * Returns 0 on success or the first error encountered. All temporary buffers
 * (key/value arrays, rocksdb values, remainCols) are released in the single
 * exit path, so allocation and deserialization failures no longer leak them.
 */
static int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SArray *updCtxArray) {
  if (!updCtxArray || TARRAY_SIZE(updCtxArray) == 0) {
    TAOS_RETURN(TSDB_CODE_SUCCESS);
  }

  int32_t code = 0, lino = 0;

  int        num_keys = TARRAY_SIZE(updCtxArray);
  int        num_remain = 0;  // number of keys that missed the LRU
  SArray    *remainCols = NULL;
  SLRUCache *pCache = pTsdb->lruCache;
  char     **keys_list = NULL;
  size_t    *keys_list_sizes = NULL;
  char     **values_list = NULL;
  size_t    *values_list_sizes = NULL;

  (void)taosThreadMutexLock(&pTsdb->lruMutex);

  // pass 1: update entries that are already in the LRU, remember the misses
  for (int i = 0; i < num_keys; ++i) {
    SLastUpdateCtx *updCtx = &((SLastUpdateCtx *)TARRAY_DATA(updCtxArray))[i];
    int8_t          lflag = updCtx->lflag;
    SRowKey        *pRowKey = &updCtx->tsdbRowKey.key;
    SColVal        *pColVal = &updCtx->colVal;

    if (lflag == LFLAG_LAST && !COL_VAL_IS_VALUE(pColVal)) {
      continue;
    }

    SLastKey  *key = &(SLastKey){.lflag = lflag, .uid = uid, .cid = pColVal->cid};
    LRUHandle *h = taosLRUCacheLookup(pCache, key, ROCKS_KEY_LEN);
    if (h) {
      SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h);
      if (pLastCol->cacheStatus != TSDB_LAST_CACHE_NO_CACHE) {
        int32_t cmp_res = tRowKeyCompare(&pLastCol->rowKey, pRowKey);
        if (cmp_res < 0 || (cmp_res == 0 && !COL_VAL_IS_NONE(pColVal))) {
          SLastCol newLastCol = {
              .rowKey = *pRowKey, .colVal = *pColVal, .dirty = 1, .cacheStatus = TSDB_LAST_CACHE_VALID};
          code = tsdbCachePutToLRU(pTsdb, key, &newLastCol, 1);
        }
      }

      // release the handle before acting on the insert result
      tsdbLRUCacheRelease(pCache, h, false);
      TAOS_CHECK_EXIT(code);
    } else {
      if (!remainCols) {
        remainCols = taosArrayInit(num_keys * 2, sizeof(SIdxKey));
        if (!remainCols) {
          code = terrno;
          lino = __LINE__;
          goto _exit;
        }
      }
      if (!taosArrayPush(remainCols, &(SIdxKey){i, *key})) {
        code = terrno;
        lino = __LINE__;
        goto _exit;
      }
    }
  }

  if (remainCols) {
    num_remain = TARRAY_SIZE(remainCols);
  }

  // pass 2: resolve the LRU misses against rocksdb
  if (num_remain > 0) {
    keys_list = taosMemoryCalloc(num_remain, sizeof(char *));
    if (!keys_list) {
      code = terrno;
      lino = __LINE__;
      goto _exit;
    }
    keys_list_sizes = taosMemoryCalloc(num_remain, sizeof(size_t));
    if (!keys_list_sizes) {
      code = terrno;
      lino = __LINE__;
      goto _exit;
    }
    for (int i = 0; i < num_remain; ++i) {
      SIdxKey *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[i];

      // keys point into remainCols, which outlives these arrays
      keys_list[i] = (char *)&idxKey->key;
      keys_list_sizes[i] = ROCKS_KEY_LEN;
    }

    rocksMayWrite(pTsdb, true);  // flush writebatch cache

    code = tsdbCacheGetValuesFromRocks(pTsdb, num_remain, (const char *const *)keys_list, keys_list_sizes,
                                       &values_list, &values_list_sizes);
    if (code) {
      lino = __LINE__;
      goto _exit;
    }

    for (int i = 0; i < num_remain; ++i) {
      SIdxKey        *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[i];
      SLastUpdateCtx *updCtx = (SLastUpdateCtx *)taosArrayGet(updCtxArray, idxKey->idx);
      SRowKey        *pRowKey = &updCtx->tsdbRowKey.key;
      SColVal        *pColVal = &updCtx->colVal;
      SLastCol       *pLastCol = NULL;

      if (values_list[i] != NULL) {
        code = tsdbCacheDeserialize(values_list[i], values_list_sizes[i], &pLastCol);
        if (code != TSDB_CODE_SUCCESS) {
          tsdbError("vgId:%d, %s deserialize failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
                    tstrerror(code));
          goto _exit;
        }
      }

      if (pLastCol && pLastCol->cacheStatus == TSDB_LAST_CACHE_NO_CACHE) {
        // stored entry is marked NO_CACHE: load it back as-is and skip the update
        code = tsdbCachePutToLRU(pTsdb, &idxKey->key, pLastCol, 0);
        taosMemoryFreeClear(pLastCol);
        if (code != TSDB_CODE_SUCCESS) {
          tsdbError("tsdb/cache: vgId:%d, put lru failed at line %d since %s.", TD_VID(pTsdb->pVnode), __LINE__,
                    tstrerror(code));
          break;
        }
        continue;
      }

      if (IS_LAST_KEY(idxKey->key) && !COL_VAL_IS_VALUE(pColVal)) {
        taosMemoryFreeClear(pLastCol);
        continue;
      }

      bool needUpdate = (pLastCol == NULL);
      if (!needUpdate) {
        int32_t cmp_res = tRowKeyCompare(&pLastCol->rowKey, pRowKey);
        needUpdate = cmp_res < 0 || (cmp_res == 0 && !COL_VAL_IS_NONE(pColVal));
      }
      // the stored entry is only needed for the comparison above
      taosMemoryFreeClear(pLastCol);

      if (needUpdate) {
        SLastCol lastColTmp = {
            .rowKey = *pRowKey, .colVal = *pColVal, .dirty = 0, .cacheStatus = TSDB_LAST_CACHE_VALID};
        if ((code = tsdbCachePutToRocksdb(pTsdb, &idxKey->key, &lastColTmp)) != TSDB_CODE_SUCCESS) {
          tsdbError("tsdb/cache: vgId:%d, put rocks failed at line %d since %s.", TD_VID(pTsdb->pVnode), __LINE__,
                    tstrerror(code));
          break;
        }
        if ((code = tsdbCachePutToLRU(pTsdb, &idxKey->key, &lastColTmp, 0)) != TSDB_CODE_SUCCESS) {
          tsdbError("tsdb/cache: vgId:%d, put lru failed at line %d since %s.", TD_VID(pTsdb->pVnode), __LINE__,
                    tstrerror(code));
          break;
        }
      }
    }

    rocksMayWrite(pTsdb, false);
  }

_exit:
  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);

  // single cleanup point for every path, including early failures
  if (values_list) {
#ifdef USE_ROCKSDB
    for (int i = 0; i < num_remain; ++i) {
      rocksdb_free(values_list[i]);
    }
#endif
    taosMemoryFree(values_list);
  }
  taosMemoryFree(values_list_sizes);
  taosMemoryFree(keys_list);
  taosMemoryFree(keys_list_sizes);
  taosArrayDestroy(remainCols);

  if (code) {
    tsdbError("tsdb/cache: vgId:%d, update failed at line %d since %s.", TD_VID(pTsdb->pVnode), __LINE__,
              tstrerror(code));
  }

  TAOS_RETURN(code);
}
1619

1620
int32_t tsdbCacheRowFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int64_t version, int32_t nRow,
×
1621
                                 SRow **aRow) {
1622
  int32_t code = 0, lino = 0;
×
1623

1624
  // 1. prepare last
1625
  TSDBROW      lRow = {.type = TSDBROW_ROW_FMT, .pTSRow = aRow[nRow - 1], .version = version};
×
1626
  STSchema    *pTSchema = NULL;
×
1627
  int32_t      sver = TSDBROW_SVERSION(&lRow);
×
1628
  SSHashObj   *iColHash = NULL;
×
1629
  STSDBRowIter iter = {0};
×
1630

1631
  TAOS_CHECK_GOTO(tsdbUpdateSkm(pTsdb, suid, uid, sver), &lino, _exit);
×
1632
  pTSchema = pTsdb->rCache.pTSchema;
×
1633

1634
  TSDBROW tRow = {.type = TSDBROW_ROW_FMT, .version = version};
×
1635
  int32_t nCol = pTSchema->numOfCols;
×
1636
  SArray *ctxArray = pTsdb->rCache.ctxArray;
×
1637

1638
  // 1. prepare by lrow
1639
  STsdbRowKey tsdbRowKey = {0};
×
1640
  tsdbRowGetKey(&lRow, &tsdbRowKey);
×
1641

1642
  TAOS_CHECK_GOTO(tsdbRowIterOpen(&iter, &lRow, pTSchema), &lino, _exit);
×
1643

1644
  int32_t iCol = 0;
×
1645
  for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal && iCol < nCol; pColVal = tsdbRowIterNext(&iter), iCol++) {
×
1646
    SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST_ROW, .tsdbRowKey = tsdbRowKey, .colVal = *pColVal};
×
1647
    if (!taosArrayPush(ctxArray, &updateCtx)) {
×
1648
      TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
1649
    }
1650

1651
    if (COL_VAL_IS_VALUE(pColVal)) {
×
1652
      updateCtx.lflag = LFLAG_LAST;
×
1653
      if (!taosArrayPush(ctxArray, &updateCtx)) {
×
1654
        TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
1655
      }
1656
    } else {
1657
      if (!iColHash) {
×
1658
        iColHash = tSimpleHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
×
1659
        if (iColHash == NULL) {
×
1660
          TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit);
×
1661
        }
1662
      }
1663

1664
      if (tSimpleHashPut(iColHash, &iCol, sizeof(iCol), NULL, 0)) {
×
1665
        TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit);
×
1666
      }
1667
    }
1668
  }
1669

1670
  // 2. prepare by the other rows
1671
  for (int32_t iRow = nRow - 2; iRow >= 0; --iRow) {
×
1672
    if (tSimpleHashGetSize(iColHash) == 0) {
×
1673
      break;
×
1674
    }
1675

1676
    tRow.pTSRow = aRow[iRow];
×
1677

1678
    STsdbRowKey tsdbRowKey = {0};
×
1679
    tsdbRowGetKey(&tRow, &tsdbRowKey);
×
1680

1681
    void   *pIte = NULL;
×
1682
    int32_t iter = 0;
×
1683
    while ((pIte = tSimpleHashIterate(iColHash, pIte, &iter)) != NULL) {
×
1684
      int32_t iCol = ((int32_t *)pIte)[0];
×
1685
      SColVal colVal = COL_VAL_NONE(0, 0);
×
1686
      tsdbRowGetColVal(&tRow, pTSchema, iCol, &colVal);
×
1687

1688
      if (COL_VAL_IS_VALUE(&colVal)) {
×
1689
        SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST, .tsdbRowKey = tsdbRowKey, .colVal = colVal};
×
1690
        if (!taosArrayPush(ctxArray, &updateCtx)) {
×
1691
          TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
1692
        }
1693
        code = tSimpleHashIterateRemove(iColHash, &iCol, sizeof(iCol), &pIte, &iter);
×
1694
        if (code != TSDB_CODE_SUCCESS) {
×
1695
          tsdbTrace("vgId:%d, %s tSimpleHashIterateRemove failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__,
×
1696
                    __LINE__, tstrerror(code));
1697
        }
1698
      }
1699
    }
1700
  }
1701

1702
  TAOS_CHECK_GOTO(tsdbCacheUpdate(pTsdb, suid, uid, ctxArray), &lino, _exit);
×
1703

1704
_exit:
×
1705
  if (code) {
×
1706
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
×
1707
  }
1708

1709
  tsdbRowClose(&iter);
×
1710
  tSimpleHashCleanup(iColHash);
×
1711
  taosArrayClear(ctxArray);
×
1712

1713
  TAOS_RETURN(code);
×
1714
}
1715

1716
/**
 * Refresh the last / last_row cache of one table from a column-format data block.
 *
 * Contexts are collected in this order and then passed to tsdbCacheUpdate():
 *   1. a LFLAG_LAST entry for the primary timestamp column, taken from the last row of the block;
 *   2. for every column flagged HAS_VALUE, a LFLAG_LAST entry built from the latest row whose
 *      bit value is 2, found by scanning the rows backwards;
 *   3. a LFLAG_LAST_ROW entry for every column of the last row of the block.
 *
 * @param pTsdb      tsdb instance
 * @param suid       super table uid
 * @param uid        table uid
 * @param pBlockData column-format block; row index nRow - 1 is read unconditionally
 * @return 0 on success, otherwise the error code of the failing step
 */
int32_t tsdbCacheColFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SBlockData *pBlockData) {
  int32_t      code = 0, lino = 0;
  STSDBRowIter rowIter = {0};
  STSchema    *pSchema = NULL;
  SArray      *pCtxList = NULL;

  TSDBROW lastRow = tsdbRowFromBlockData(pBlockData, pBlockData->nRow - 1);
  int32_t schemaVer = TSDBROW_SVERSION(&lastRow);

  // nothing has been allocated yet, so a failure here can return directly
  TAOS_CHECK_RETURN(metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, schemaVer, &pSchema));

  pCtxList = taosArrayInit(pBlockData->nColData, sizeof(SLastUpdateCtx));
  if (pCtxList == NULL) {
    TAOS_CHECK_GOTO(terrno, &lino, _exit);
  }

  // 1. "last" of the timestamp column comes straight from the last row
  STsdbRowKey lastRowKey = {0};
  tsdbRowGetKey(&lastRow, &lastRowKey);

  {
    SValue tsVal = {.type = TSDB_DATA_TYPE_TIMESTAMP};
    VALUE_SET_TRIVIAL_DATUM(&tsVal, lastRow.pBlockData->aTSKEY[lastRow.iRow]);
    SLastUpdateCtx tsCtx = {
        .lflag = LFLAG_LAST, .tsdbRowKey = lastRowKey, .colVal = COL_VAL_VALUE(PRIMARYKEY_TIMESTAMP_COL_ID, tsVal)};
    if (!taosArrayPush(pCtxList, &tsCtx)) {
      TAOS_CHECK_GOTO(terrno, &lino, _exit);
    }
  }

  // 2. "last" of every other column: scan each column backwards for its latest value
  TSDBROW scanRow = tsdbRowFromBlockData(pBlockData, 0);

  for (int32_t iColData = 0; iColData < pBlockData->nColData; ++iColData) {
    SColData *pColData = &pBlockData->aColData[iColData];
    if ((pColData->flag & HAS_VALUE) == HAS_VALUE) {
      scanRow.iRow = pBlockData->nRow - 1;
      while (scanRow.iRow >= 0) {
        STsdbRowKey scanKey = {0};
        tsdbRowGetKey(&scanRow, &scanKey);

        uint8_t bitValue = tColDataGetBitValue(pColData, scanRow.iRow);
        if (bitValue != 2) {
          // this row holds no value for the column: keep looking in the previous row
          --scanRow.iRow;
          continue;
        }

        SColVal colVal = COL_VAL_NONE(pColData->cid, pColData->type);
        TAOS_CHECK_GOTO(tColDataGetValue(pColData, scanRow.iRow, &colVal), &lino, _exit);

        SLastUpdateCtx lastCtx = {.lflag = LFLAG_LAST, .tsdbRowKey = scanKey, .colVal = colVal};
        if (!taosArrayPush(pCtxList, &lastCtx)) {
          TAOS_CHECK_GOTO(terrno, &lino, _exit);
        }
        break;
      }
    }
  }

  // 3. "last_row": every column of the last row, value or not
  TAOS_CHECK_GOTO(tsdbRowIterOpen(&rowIter, &lastRow, pSchema), &lino, _exit);
  SColVal *pColVal = tsdbRowIterNext(&rowIter);
  while (pColVal != NULL) {
    SLastUpdateCtx lastRowCtx = {.lflag = LFLAG_LAST_ROW, .tsdbRowKey = lastRowKey, .colVal = *pColVal};
    if (!taosArrayPush(pCtxList, &lastRowCtx)) {
      TAOS_CHECK_GOTO(terrno, &lino, _exit);
    }
    pColVal = tsdbRowIterNext(&rowIter);
  }

  TAOS_CHECK_GOTO(tsdbCacheUpdate(pTsdb, suid, uid, pCtxList), &lino, _exit);

_exit:
  tsdbRowClose(&rowIter);
  taosMemoryFreeClear(pSchema);
  taosArrayDestroy(pCtxList);

  TAOS_RETURN(code);
}
1790

1791
static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols,
1792
                            int nCols, int16_t *slotIds);
1793

1794
static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols,
1795
                               int nCols, int16_t *slotIds);
1796

1797
static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SArray *remainCols,
106,849✔
1798
                                    SCacheRowsReader *pr, int8_t ltype) {
1799
  int32_t code = 0, lino = 0;
106,849✔
1800
  // rocksdb_writebatch_t *wb = NULL;
1801
  SArray *pTmpColArray = NULL;
106,849✔
1802
  bool    extraTS = false;
106,849✔
1803

1804
  SIdxKey *idxKey = taosArrayGet(remainCols, 0);
106,849✔
1805
  if (idxKey->key.cid != PRIMARYKEY_TIMESTAMP_COL_ID) {
106,849✔
1806
    // ignore 'ts' loaded from cache and load it from tsdb
1807
    // SLastCol *pLastCol = taosArrayGet(pLastArray, 0);
1808
    // tsdbCacheUpdateLastColToNone(pLastCol, TSDB_LAST_CACHE_NO_CACHE);
1809

1810
    SLastKey *key = &(SLastKey){.lflag = ltype, .uid = uid, .cid = PRIMARYKEY_TIMESTAMP_COL_ID};
×
1811
    if (!taosArrayInsert(remainCols, 0, &(SIdxKey){0, *key})) {
×
1812
      TAOS_RETURN(terrno);
×
1813
    }
1814

1815
    extraTS = true;
×
1816
  }
1817

1818
  int      num_keys = TARRAY_SIZE(remainCols);
106,849✔
1819
  int16_t *slotIds = taosMemoryMalloc(num_keys * sizeof(int16_t));
106,849✔
1820

1821
  int16_t *lastColIds = NULL, *lastSlotIds = NULL, *lastrowColIds = NULL, *lastrowSlotIds = NULL;
106,849✔
1822
  lastColIds = taosMemoryMalloc(num_keys * sizeof(int16_t));
106,849✔
1823
  lastSlotIds = taosMemoryMalloc(num_keys * sizeof(int16_t));
106,849✔
1824
  lastrowColIds = taosMemoryMalloc(num_keys * sizeof(int16_t));
106,849✔
1825
  lastrowSlotIds = taosMemoryMalloc(num_keys * sizeof(int16_t));
106,178✔
1826
  SArray *lastTmpColArray = NULL, *lastTmpIndexArray = NULL, *lastrowTmpColArray = NULL, *lastrowTmpIndexArray = NULL;
106,849✔
1827

1828
  int lastIndex = 0;
106,849✔
1829
  int lastrowIndex = 0;
106,849✔
1830

1831
  if (!slotIds || !lastColIds || !lastSlotIds || !lastrowColIds || !lastrowSlotIds) {
106,849✔
1832
    TAOS_CHECK_EXIT(terrno);
671✔
1833
  }
1834

1835
  for (int i = 0; i < num_keys; ++i) {
358,553✔
1836
    SIdxKey *idxKey = taosArrayGet(remainCols, i);
251,704✔
1837
    if (extraTS && !i) {
251,704✔
1838
      slotIds[i] = 0;
×
1839
    } else {
1840
      slotIds[i] = pr->pSlotIds[idxKey->idx];
251,704✔
1841
    }
1842

1843
    if (IS_LAST_KEY(idxKey->key)) {
251,033✔
1844
      if (NULL == lastTmpIndexArray) {
129,790✔
1845
        lastTmpIndexArray = taosArrayInit(num_keys, sizeof(int32_t));
54,978✔
1846
        if (!lastTmpIndexArray) {
54,978✔
1847
          TAOS_CHECK_EXIT(terrno);
×
1848
        }
1849
      }
1850
      if (!taosArrayPush(lastTmpIndexArray, &(i))) {
129,790✔
1851
        TAOS_CHECK_EXIT(terrno);
×
1852
      }
1853
      lastColIds[lastIndex] = idxKey->key.cid;
129,790✔
1854
      if (extraTS && !i) {
129,790✔
1855
        lastSlotIds[lastIndex] = 0;
×
1856
      } else {
1857
        lastSlotIds[lastIndex] = pr->pSlotIds[idxKey->idx];
129,790✔
1858
      }
1859
      lastIndex++;
129,790✔
1860
    } else {
1861
      if (NULL == lastrowTmpIndexArray) {
121,914✔
1862
        lastrowTmpIndexArray = taosArrayInit(num_keys, sizeof(int32_t));
51,871✔
1863
        if (!lastrowTmpIndexArray) {
51,200✔
1864
          TAOS_CHECK_EXIT(terrno);
×
1865
        }
1866
      }
1867
      if (!taosArrayPush(lastrowTmpIndexArray, &(i))) {
121,914✔
1868
        TAOS_CHECK_EXIT(terrno);
×
1869
      }
1870
      lastrowColIds[lastrowIndex] = idxKey->key.cid;
121,914✔
1871
      if (extraTS && !i) {
121,914✔
1872
        lastrowSlotIds[lastrowIndex] = 0;
×
1873
      } else {
1874
        lastrowSlotIds[lastrowIndex] = pr->pSlotIds[idxKey->idx];
121,914✔
1875
      }
1876
      lastrowIndex++;
122,585✔
1877
    }
1878
  }
1879

1880
  pTmpColArray = taosArrayInit(lastIndex + lastrowIndex, sizeof(SLastCol));
106,849✔
1881
  if (!pTmpColArray) {
106,849✔
1882
    TAOS_CHECK_EXIT(terrno);
×
1883
  }
1884

1885
  if (lastTmpIndexArray != NULL) {
106,849✔
1886
    TAOS_CHECK_EXIT(mergeLastCid(uid, pTsdb, &lastTmpColArray, pr, lastColIds, lastIndex, lastSlotIds));
54,978✔
1887
    for (int i = 0; i < taosArrayGetSize(lastTmpColArray); i++) {
177,605✔
1888
      if (!taosArrayInsert(pTmpColArray, *(int32_t *)taosArrayGet(lastTmpIndexArray, i),
122,627✔
1889
                           taosArrayGet(lastTmpColArray, i))) {
122,627✔
1890
        TAOS_CHECK_EXIT(terrno);
×
1891
      }
1892
    }
1893
  }
1894

1895
  if (lastrowTmpIndexArray != NULL) {
106,849✔
1896
    TAOS_CHECK_EXIT(mergeLastRowCid(uid, pTsdb, &lastrowTmpColArray, pr, lastrowColIds, lastrowIndex, lastrowSlotIds));
51,871✔
1897
    for (int i = 0; i < taosArrayGetSize(lastrowTmpColArray); i++) {
170,989✔
1898
      if (!taosArrayInsert(pTmpColArray, *(int32_t *)taosArrayGet(lastrowTmpIndexArray, i),
119,118✔
1899
                           taosArrayGet(lastrowTmpColArray, i))) {
119,118✔
1900
        TAOS_CHECK_EXIT(terrno);
×
1901
      }
1902
    }
1903
  }
1904

1905
  SLRUCache *pCache = pTsdb->lruCache;
106,849✔
1906
  for (int i = 0; i < num_keys; ++i) {
359,224✔
1907
    SIdxKey  *idxKey = taosArrayGet(remainCols, i);
252,375✔
1908
    SLastCol *pLastCol = NULL;
252,375✔
1909

1910
    if (pTmpColArray && TARRAY_SIZE(pTmpColArray) >= i + 1) {
252,375✔
1911
      pLastCol = taosArrayGet(pTmpColArray, i);
241,745✔
1912
    }
1913

1914
    // still null, then make up a none col value
1915
    SLastCol noneCol = {.rowKey.ts = TSKEY_MIN,
252,375✔
1916
                        .colVal = COL_VAL_NONE(idxKey->key.cid, pr->pSchema->columns[slotIds[i]].type),
252,375✔
1917
                        .cacheStatus = TSDB_LAST_CACHE_VALID};
1918
    if (!pLastCol) {
251,704✔
1919
      pLastCol = &noneCol;
10,630✔
1920
    }
1921

1922
    if (!extraTS || i > 0) {
251,704✔
1923
      tsdbDebug("vgId:%d, %s qid:%s uid:%" PRId64 " from tsdb, col_id:%d col_flag:%d ts:%" PRId64,
251,704✔
1924
                TD_VID(pTsdb->pVnode), __func__, pr && pr->idstr ? pr->idstr : "null", uid, pLastCol->colVal.cid,
1925
                pLastCol->colVal.flag, pLastCol->rowKey.ts);
1926
      taosArraySet(pLastArray, idxKey->idx, pLastCol);
251,704✔
1927
    }
1928

1929
    // taosArrayRemove(remainCols, i);
1930

1931
    if (/*!pTmpColArray*/ lastTmpIndexArray && !lastTmpColArray) {
252,375✔
1932
      continue;
×
1933
    }
1934
    if (/*!pTmpColArray*/ lastrowTmpIndexArray && !lastrowTmpColArray) {
252,375✔
1935
      continue;
×
1936
    }
1937

1938
    // store result back to rocks cache
1939
    code = tsdbCachePutToRocksdb(pTsdb, &idxKey->key, pLastCol);
252,375✔
1940
    if (code) {
252,375✔
1941
      tsdbError("vgId:%d, %s failed at line %d since %s.", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
×
1942
      TAOS_CHECK_EXIT(code);
×
1943
    }
1944

1945
    code = tsdbCachePutToLRU(pTsdb, &idxKey->key, pLastCol, 0);
252,375✔
1946
    if (code) {
252,375✔
1947
      tsdbError("vgId:%d, %s failed at line %d since %s.", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
×
1948
      TAOS_CHECK_EXIT(code);
×
1949
    }
1950

1951
    if (extraTS && i == 0) {
252,375✔
1952
      tsdbCacheFreeSLastColItem(pLastCol);
×
1953
    }
1954
  }
1955

1956
  rocksMayWrite(pTsdb, false);
106,849✔
1957

1958
_exit:
106,849✔
1959
  taosArrayDestroy(lastrowTmpIndexArray);
106,849✔
1960
  taosArrayDestroy(lastrowTmpColArray);
106,849✔
1961
  taosArrayDestroy(lastTmpIndexArray);
106,849✔
1962
  taosArrayDestroy(lastTmpColArray);
106,849✔
1963

1964
  taosMemoryFree(lastColIds);
106,849✔
1965
  taosMemoryFree(lastSlotIds);
106,849✔
1966
  taosMemoryFree(lastrowColIds);
106,849✔
1967
  taosMemoryFree(lastrowSlotIds);
106,849✔
1968

1969
  taosArrayDestroy(pTmpColArray);
106,849✔
1970

1971
  taosMemoryFree(slotIds);
106,849✔
1972

1973
  TAOS_RETURN(code);
106,849✔
1974
}
1975

1976
/**
 * Try to satisfy the columns listed in remainCols from rocksdb; whatever is still missing
 * afterwards is loaded from tsdb through tsdbCacheLoadFromRaw().
 *
 * remainCols and ignoreFromRocks are parallel arrays. Every column that rocksdb can serve is
 * pushed into the LRU cache, copied into pLastArray and removed from both arrays. A column whose
 * ignore flag is set is left in place without looking at the rocksdb value.
 *
 * The visible caller invokes this function with pTsdb->lruMutex held.
 *
 * @param pTsdb            tsdb instance
 * @param uid              table uid
 * @param pLastArray       result array; entry idxKey->idx is overwritten for every column found
 * @param remainCols       SIdxKey array of the columns still missing; shrinks as columns are found
 * @param ignoreFromRocks  bool array parallel to remainCols; true = do not use the rocksdb value
 * @param pr               cache rows reader (query id string, forwarded to the raw loader)
 * @param ltype            forwarded to tsdbCacheLoadFromRaw()
 * @return 0 on success, otherwise the error code of the failing step
 */
static int32_t tsdbCacheLoadFromRocks(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SArray *remainCols,
                                      SArray *ignoreFromRocks, SCacheRowsReader *pr, int8_t ltype) {
  int32_t code = 0, lino = 0;
  int     num_keys = TARRAY_SIZE(remainCols);
  char  **keys_list = taosMemoryMalloc(num_keys * sizeof(char *));
  size_t *keys_list_sizes = taosMemoryMalloc(num_keys * sizeof(size_t));
  char   *key_list = taosMemoryMalloc(num_keys * ROCKS_KEY_LEN);
  if (!keys_list || !keys_list_sizes || !key_list) {
    // any subset of the three may have been allocated: release all of them
    code = terrno;
    taosMemoryFree(key_list);
    taosMemoryFree(keys_list);
    taosMemoryFree(keys_list_sizes);
    TAOS_RETURN(code);
  }
  char  **values_list = NULL;
  size_t *values_list_sizes = NULL;

  // key_list is one contiguous buffer; keys_list[i] points at the i-th key inside it
  for (int i = 0; i < num_keys; ++i) {
    memcpy(key_list + i * ROCKS_KEY_LEN, &((SIdxKey *)taosArrayGet(remainCols, i))->key, ROCKS_KEY_LEN);
    keys_list[i] = key_list + i * ROCKS_KEY_LEN;
    keys_list_sizes[i] = ROCKS_KEY_LEN;
  }

  rocksMayWrite(pTsdb, true);  // flush writebatch cache

  code = tsdbCacheGetValuesFromRocks(pTsdb, num_keys, (const char *const *)keys_list, keys_list_sizes, &values_list,
                                     &values_list_sizes);
  if (code) {
    taosMemoryFree(key_list);
    taosMemoryFree(keys_list);
    taosMemoryFree(keys_list_sizes);
    TAOS_RETURN(code);
  }

  // i walks the fetched values (original key order); j walks remainCols / ignoreFromRocks,
  // which are compacted in lockstep whenever a column is served from rocksdb.
  for (int i = 0, j = 0; i < num_keys && j < TARRAY_SIZE(remainCols); ++i) {
    SLastCol *pLastCol = NULL;
    // The flag belongs to the column currently at position j. Indexing with i would read a
    // shifted or stale element as soon as one entry has been removed.
    bool ignore = ((bool *)TARRAY_DATA(ignoreFromRocks))[j];
    if (ignore) {
      ++j;
      continue;
    }

    if (values_list[i] != NULL) {
      code = tsdbCacheDeserialize(values_list[i], values_list_sizes[i], &pLastCol);
      if (code != TSDB_CODE_SUCCESS) {
        tsdbError("vgId:%d, %s deserialize failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
                  tstrerror(code));
        goto _exit;
      }
    }
    SLastCol *pToFree = pLastCol;
    SIdxKey  *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[j];
    if (pLastCol && pLastCol->cacheStatus != TSDB_LAST_CACHE_NO_CACHE) {
      code = tsdbCachePutToLRU(pTsdb, &idxKey->key, pLastCol, 0);
      if (code) {
        tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
        taosMemoryFreeClear(pToFree);
        TAOS_CHECK_EXIT(code);
      }

      // the result array gets its own copy; the deserialized object is freed below
      SLastCol lastCol = *pLastCol;
      code = tsdbCacheReallocSLastCol(&lastCol, NULL);
      if (TSDB_CODE_SUCCESS != code) {
        taosMemoryFreeClear(pToFree);
        TAOS_CHECK_EXIT(code);
      }

      tsdbDebug("vgId:%d, %s qid:%s uid:%" PRId64 " from rocksdb, col_id:%d col_flag:%d ts:%" PRId64,
                TD_VID(pTsdb->pVnode), __func__, pr && pr->idstr ? pr->idstr : "null", uid, lastCol.colVal.cid,
                lastCol.colVal.flag, lastCol.rowKey.ts);

      taosArraySet(pLastArray, idxKey->idx, &lastCol);
      // served: drop the column from both parallel arrays; j now addresses the next column
      taosArrayRemove(remainCols, j);
      taosArrayRemove(ignoreFromRocks, j);
    } else {
      ++j;
    }

    taosMemoryFreeClear(pToFree);
  }

  if (TARRAY_SIZE(remainCols) > 0) {
    // tsdbTrace("tsdb/cache: vgId: %d, load %" PRId64 " from raw", TD_VID(pTsdb->pVnode), uid);
    code = tsdbCacheLoadFromRaw(pTsdb, uid, pLastArray, remainCols, pr, ltype);
  }

_exit:
  taosMemoryFree(key_list);
  taosMemoryFree(keys_list);
  taosMemoryFree(keys_list_sizes);
  if (values_list) {
#ifdef USE_ROCKSDB
    for (int i = 0; i < num_keys; ++i) {
      rocksdb_free(values_list[i]);
    }
#endif
    taosMemoryFree(values_list);
  }
  taosMemoryFree(values_list_sizes);

  TAOS_RETURN(code);
}
2076

2077
static int32_t tsdbCacheGetBatchFromLru(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr,
2,244,561✔
2078
                                        int8_t ltype, SArray *keyArray) {
2079
  int32_t    code = 0, lino = 0;
2,244,561✔
2080
  SArray    *remainCols = NULL;
2,244,561✔
2081
  SArray    *ignoreFromRocks = NULL;
2,244,561✔
2082
  SLRUCache *pCache = pTsdb->lruCache;
2,244,561✔
2083
  SArray    *pCidList = pr->pCidList;
2,243,890✔
2084
  int        numKeys = TARRAY_SIZE(pCidList);
2,243,890✔
2085

2086
  for (int i = 0; i < numKeys; ++i) {
7,781,024✔
2087
    int16_t cid = ((int16_t *)TARRAY_DATA(pCidList))[i];
5,536,463✔
2088

2089
    SLastKey key = {.lflag = ltype, .uid = uid, .cid = cid};
5,536,463✔
2090
    // for select last_row, last case
2091
    int32_t funcType = FUNCTION_TYPE_CACHE_LAST;
5,536,463✔
2092
    if (pr->pFuncTypeList != NULL && taosArrayGetSize(pr->pFuncTypeList) > i) {
5,536,463✔
2093
      funcType = ((int32_t *)TARRAY_DATA(pr->pFuncTypeList))[i];
×
2094
    }
2095
    if (((pr->type & CACHESCAN_RETRIEVE_LAST) == CACHESCAN_RETRIEVE_LAST) && FUNCTION_TYPE_CACHE_LAST_ROW == funcType) {
5,535,792✔
2096
      int8_t tempType = CACHESCAN_RETRIEVE_LAST_ROW | (pr->type ^ CACHESCAN_RETRIEVE_LAST);
×
2097
      key.lflag = (tempType & CACHESCAN_RETRIEVE_LAST) >> 3;
×
2098
    }
2099

2100
    if (!taosArrayPush(keyArray, &key)) {
5,536,463✔
2101
      TAOS_CHECK_EXIT(terrno);
×
2102
    }
2103

2104
    LRUHandle *h = taosLRUCacheLookup(pCache, &key, ROCKS_KEY_LEN);
5,536,463✔
2105
    SLastCol  *pLastCol = h ? (SLastCol *)taosLRUCacheValue(pCache, h) : NULL;
5,536,463✔
2106
    if (h && pLastCol->cacheStatus != TSDB_LAST_CACHE_NO_CACHE) {
10,764,436✔
2107
      SLastCol lastCol = *pLastCol;
5,227,554✔
2108
      if (TSDB_CODE_SUCCESS != (code = tsdbCacheReallocSLastCol(&lastCol, NULL))) {
5,227,554✔
2109
        tsdbLRUCacheRelease(pCache, h, false);
×
2110
        TAOS_CHECK_GOTO(code, NULL, _exit);
×
2111
      }
2112

2113
      tsdbDebug("vgId:%d, %s qid:%s uid:%" PRId64 " from lru, col_id:%d col_flag:%d ts:%" PRId64, TD_VID(pTsdb->pVnode),
5,226,812✔
2114
                __func__, pr && pr->idstr ? pr->idstr : "null", uid, lastCol.colVal.cid, lastCol.colVal.flag,
2115
                lastCol.rowKey.ts);
2116

2117
      if (taosArrayPush(pLastArray, &lastCol) == NULL) {
5,227,973✔
2118
        code = terrno;
×
2119
        tsdbLRUCacheRelease(pCache, h, false);
×
2120
        goto _exit;
×
2121
      }
2122
    } else {
2123
      // no cache or cache is invalid
2124
      SLastCol noneCol = {.rowKey.ts = TSKEY_MIN,
308,909✔
2125
                          .colVal = COL_VAL_NONE(cid, pr->pSchema->columns[pr->pSlotIds[i]].type)};
308,490✔
2126

2127
      if (taosArrayPush(pLastArray, &noneCol) == NULL) {
308,490✔
2128
        code = terrno;
×
2129
        tsdbLRUCacheRelease(pCache, h, false);
×
2130
        goto _exit;
×
2131
      }
2132

2133
      if (!remainCols) {
308,490✔
2134
        if ((remainCols = taosArrayInit(numKeys, sizeof(SIdxKey))) == NULL) {
131,728✔
2135
          code = terrno;
×
2136
          tsdbLRUCacheRelease(pCache, h, false);
×
2137
          goto _exit;
×
2138
        }
2139
      }
2140
      if (!ignoreFromRocks) {
307,819✔
2141
        if ((ignoreFromRocks = taosArrayInit(numKeys, sizeof(bool))) == NULL) {
131,057✔
2142
          code = terrno;
×
2143
          tsdbLRUCacheRelease(pCache, h, false);
×
2144
          goto _exit;
×
2145
        }
2146
      }
2147
      if (taosArrayPush(remainCols, &(SIdxKey){i, key}) == NULL) {
616,980✔
2148
        code = terrno;
×
2149
        tsdbLRUCacheRelease(pCache, h, false);
×
2150
        goto _exit;
×
2151
      }
2152
      bool ignoreRocks = pLastCol ? (pLastCol->cacheStatus == TSDB_LAST_CACHE_NO_CACHE) : false;
308,490✔
2153
      if (taosArrayPush(ignoreFromRocks, &ignoreRocks) == NULL) {
308,490✔
2154
        code = terrno;
×
2155
        tsdbLRUCacheRelease(pCache, h, false);
×
2156
        goto _exit;
×
2157
      }
2158
    }
2159

2160
    if (h) {
5,536,463✔
2161
      tsdbLRUCacheRelease(pCache, h, false);
5,228,532✔
2162
    }
2163
  }
2164

2165
  if (remainCols && TARRAY_SIZE(remainCols) > 0) {
2,244,561✔
2166
    (void)taosThreadMutexLock(&pTsdb->lruMutex);
131,728✔
2167

2168
    for (int i = 0; i < TARRAY_SIZE(remainCols);) {
440,218✔
2169
      SIdxKey   *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[i];
308,490✔
2170
      LRUHandle *h = taosLRUCacheLookup(pCache, &idxKey->key, ROCKS_KEY_LEN);
308,490✔
2171
      SLastCol  *pLastCol = h ? (SLastCol *)taosLRUCacheValue(pCache, h) : NULL;
308,490✔
2172
      if (h && pLastCol->cacheStatus != TSDB_LAST_CACHE_NO_CACHE) {
308,490✔
2173
        SLastCol lastCol = *pLastCol;
×
2174
        code = tsdbCacheReallocSLastCol(&lastCol, NULL);
×
2175
        if (code) {
×
2176
          tsdbLRUCacheRelease(pCache, h, false);
×
2177
          (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
×
2178
          TAOS_RETURN(code);
×
2179
        }
2180

2181
        tsdbDebug("vgId:%d, %s qid:%s uid:%" PRId64 " from lru(2nd lookup), col_id:%d col_flag:%d ts:%" PRId64,
×
2182
                  TD_VID(pTsdb->pVnode), __func__, pr && pr->idstr ? pr->idstr : "null", uid, lastCol.colVal.cid,
2183
                  lastCol.colVal.flag, lastCol.rowKey.ts);
2184

2185
        taosArraySet(pLastArray, idxKey->idx, &lastCol);
×
2186

2187
        taosArrayRemove(remainCols, i);
×
2188
        taosArrayRemove(ignoreFromRocks, i);
×
2189
      } else {
2190
        // no cache or cache is invalid
2191
        ++i;
308,490✔
2192
      }
2193
      if (h) {
308,490✔
2194
        tsdbLRUCacheRelease(pCache, h, false);
559✔
2195
      }
2196
    }
2197

2198
    // tsdbTrace("tsdb/cache: vgId: %d, load %" PRId64 " from rocks", TD_VID(pTsdb->pVnode), uid);
2199
    code = tsdbCacheLoadFromRocks(pTsdb, uid, pLastArray, remainCols, ignoreFromRocks, pr, ltype);
131,728✔
2200

2201
    (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
131,728✔
2202
  }
2203

2204
_exit:
2,112,833✔
2205
  if (remainCols) {
2,244,561✔
2206
    taosArrayDestroy(remainCols);
131,728✔
2207
  }
2208
  if (ignoreFromRocks) {
2,244,561✔
2209
    taosArrayDestroy(ignoreFromRocks);
131,728✔
2210
  }
2211

2212
  TAOS_RETURN(code);
2,244,561✔
2213
}
2214

2215
/* Progress of a SMemNextRowIter, as driven by getNextRowFromMem(). */
typedef enum SMEMNEXTROWSTATES {
  SMEMNEXTROW_ENTER = 0,  // iterator not opened yet; the next call opens it and fetches the first row
  SMEMNEXTROW_NEXT,       // iterator already open; the next call advances it by one row
} SMEMNEXTROWSTATES;
2219

2220
typedef struct SMemNextRowIter {
2221
  SMEMNEXTROWSTATES state;
2222
  STbData          *pMem;  // [input]
2223
  STbDataIter       iter;  // mem buffer skip list iterator
2224
  int64_t           lastTs;
2225
} SMemNextRowIter;
2226

2227
/**
 * _next_row_fn_t implementation that walks the rows of one memtable table.
 *
 * In state SMEMNEXTROW_ENTER the iterator is opened on state->pMem and its first row is
 * returned. The state only switches to SMEMNEXTROW_NEXT once a row has actually been found.
 * In state SMEMNEXTROW_NEXT the iterator is advanced and the row it lands on is returned.
 *
 * @param iter              a SMemNextRowIter
 * @param ppRow             output: the row, or NULL when there is none
 * @param pIgnoreEarlierTs  output: always set to false
 * @param isLast            not read by this implementation
 * @param aCols             not read by this implementation
 * @param nCols             not read by this implementation
 * @return always 0
 */
static int32_t getNextRowFromMem(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast, int16_t *aCols,
                                 int nCols) {
  SMemNextRowIter *pMemIter = (SMemNextRowIter *)iter;
  int32_t          code = 0;
  TSDBROW         *pRow = NULL;

  *pIgnoreEarlierTs = false;

  if (pMemIter->state == SMEMNEXTROW_ENTER) {
    if (pMemIter->pMem != NULL) {
      // NOTE(review): the third argument (1) presumably selects the iteration direction - confirm
      tsdbTbDataIterOpen(pMemIter->pMem, NULL, 1, &pMemIter->iter);

      pRow = tsdbTbDataIterGet(&pMemIter->iter);
      if (pRow != NULL) {
        pMemIter->state = SMEMNEXTROW_NEXT;
      }
    }
  } else if (pMemIter->state == SMEMNEXTROW_NEXT) {
    if (tsdbTbDataIterNext(&pMemIter->iter)) {
      pRow = tsdbTbDataIterGet(&pMemIter->iter);
    }
  }
  // any other state value yields no row

  *ppRow = pRow;

  TAOS_RETURN(code);
}
2277

2278
/*
 * Row source callback: stores the next row of `iter` in *ppRow (NULL when exhausted) and
 * returns an error code. getNextRowFromMem() is the implementation visible in this file.
 */
typedef int32_t (*_next_row_fn_t)(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast,
                                  int16_t *aCols, int nCols);

/* Optional cleanup callback for a row source; receives the same `iter` object. */
typedef int32_t (*_next_row_clear_fn_t)(void *iter);
2281

2282
typedef struct {
2283
  TSDBROW             *pRow;
2284
  bool                 stop;
2285
  bool                 next;
2286
  bool                 ignoreEarlierTs;
2287
  void                *iter;
2288
  _next_row_fn_t       nextRowFn;
2289
  _next_row_clear_fn_t nextRowClearFn;
2290
} TsdbNextRowState;
2291

2292
typedef struct {
2293
  SArray           *pMemDelData;
2294
  SArray           *pSkyline;
2295
  int64_t           iSkyline;
2296
  SBlockIdx         idx;
2297
  SMemNextRowIter   memState;
2298
  SMemNextRowIter   imemState;
2299
  TSDBROW           memRow, imemRow;
2300
  TsdbNextRowState  input[2];
2301
  SCacheRowsReader *pr;
2302
  STsdb            *pTsdb;
2303
} MemNextRowIter;
2304

2305
static int32_t memRowIterOpen(MemNextRowIter *pIter, tb_uid_t uid, STsdb *pTsdb, STSchema *pTSchema, tb_uid_t suid,
2,244,561✔
2306
                              STsdbReadSnap *pReadSnap, SCacheRowsReader *pr) {
2307
  int32_t code = 0, lino = 0;
2,244,561✔
2308

2309
  STbData *pMem = NULL;
2,243,890✔
2310
  if (pReadSnap->pMem) {
2,243,890✔
2311
    pMem = tsdbGetTbDataFromMemTable(pReadSnap->pMem, suid, uid);
2,243,890✔
2312
  }
2313

2314
  STbData *pIMem = NULL;
2,244,561✔
2315
  if (pReadSnap->pIMem) {
2,244,561✔
2316
    pIMem = tsdbGetTbDataFromMemTable(pReadSnap->pIMem, suid, uid);
6,771✔
2317
  }
2318

2319
  pIter->pTsdb = pTsdb;
2,244,561✔
2320

2321
  pIter->pMemDelData = NULL;
2,244,561✔
2322

2323
  TAOS_CHECK_GOTO(loadMemTombData(&pIter->pMemDelData, pMem, pIMem, pr->info.verRange.maxVer), &lino, _exit);
2,244,561✔
2324

2325
  pIter->idx = (SBlockIdx){.suid = suid, .uid = uid};
2,244,561✔
2326

2327
  pIter->input[0] = (TsdbNextRowState){&pIter->memRow, true, false, false, &pIter->memState, getNextRowFromMem, NULL};
2,244,561✔
2328
  pIter->input[1] = (TsdbNextRowState){&pIter->imemRow, true, false, false, &pIter->imemState, getNextRowFromMem, NULL};
2,244,561✔
2329

2330
  if (pMem) {
2,244,561✔
2331
    pIter->memState.pMem = pMem;
1,991,588✔
2332
    pIter->memState.state = SMEMNEXTROW_ENTER;
1,992,259✔
2333
    pIter->input[0].stop = false;
1,992,259✔
2334
    pIter->input[0].next = true;
1,991,588✔
2335
  }
2336

2337
  if (pIMem) {
2,244,561✔
2338
    pIter->imemState.pMem = pIMem;
6,771✔
2339
    pIter->imemState.state = SMEMNEXTROW_ENTER;
6,771✔
2340
    pIter->input[1].stop = false;
6,771✔
2341
    pIter->input[1].next = true;
6,771✔
2342
  }
2343

2344
  pIter->pr = pr;
2,244,561✔
2345

2346
_exit:
2,243,890✔
2347
  if (code) {
2,243,890✔
2348
    tsdbError("tsdb/cache: %s failed at line %d since %s.", __func__, lino, tstrerror(code));
×
2349
  }
2350

2351
  TAOS_RETURN(code);
2,243,890✔
2352
}
2353

2354
/**
 * Release what a MemNextRowIter owns: runs the optional cleanup callback of both row sources,
 * then destroys the skyline and tomb-data arrays when they are set.
 */
static void memRowIterClose(MemNextRowIter *pIter) {
  TsdbNextRowState *pSource = pIter->input;
  TsdbNextRowState *pEnd = pSource + 2;

  for (; pSource != pEnd; ++pSource) {
    if (pSource->nextRowClearFn != NULL) {
      // the callback's return code is deliberately ignored during teardown
      (void)pSource->nextRowClearFn(pSource->iter);
    }
  }

  if (pIter->pSkyline != NULL) {
    taosArrayDestroy(pIter->pSkyline);
  }

  if (pIter->pMemDelData != NULL) {
    taosArrayDestroy(pIter->pMemDelData);
  }
}
2,243,219✔
2369

2370
// Free callback installed on the reader's table map (see getTableLoadInfo):
// `param` addresses a stored pointer slot; the pointee is freed and the slot
// is cleared.
static void freeTableInfoFunc(void *param) {
  void **ppSlot = (void **)param;

  taosMemoryFreeClear(*ppSlot);
}
1,995,367✔
2374

2375
// Look up (or lazily create) the per-table load info for `uid`.
//
// The reader's table map is created on first use and registered with
// freeTableInfoFunc as its value free callback. A missing entry is allocated
// zero-initialized and inserted into the map.
//
// Returns the entry, or NULL when the map, the entry, or the insertion could
// not be allocated.
static STableLoadInfo *getTableLoadInfo(SCacheRowsReader *pReader, uint64_t uid) {
  if (!pReader->pTableMap) {
    pReader->pTableMap = tSimpleHashInit(pReader->numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
    if (!pReader->pTableMap) {
      return NULL;
    }

    tSimpleHashSetFreeFp(pReader->pTableMap, freeTableInfoFunc);
  }

  STableLoadInfo **ppInfo = tSimpleHashGet(pReader->pTableMap, &uid, sizeof(uid));
  if (ppInfo) {
    return *ppInfo;
  }

  STableLoadInfo *pInfo = taosMemoryCalloc(1, sizeof(STableLoadInfo));
  if (!pInfo) {
    return NULL;
  }

  if (tSimpleHashPut(pReader->pTableMap, &uid, sizeof(uint64_t), &pInfo, POINTER_BYTES)) {
    // The map did not take ownership, so release the entry here instead of
    // leaking it.
    taosMemoryFree(pInfo);
    return NULL;
  }

  return pInfo;
}
2400

2401
// Return the next row from the two memtable inputs (input[0] and input[1])
// that is not covered by the delete skyline, or NULL when both inputs are
// exhausted or an error occurred (errors are logged, not returned).
//
// Each round: (1) advance every input flagged `next`, (2) collect the input
// row(s) holding the largest row key, (3) drop candidates the skyline marks as
// deleted, and (4) hand back the first survivor. The skyline is built lazily on
// the first candidate from the table's tombstones plus pIter->pMemDelData.
static TSDBROW *memRowIterGet(MemNextRowIter *pIter, bool isLast, int16_t *aCols, int nCols) {
  int32_t code = 0, lino = 0;

  while (1) {
    // (1) pull a fresh row from each input that was consumed last round
    for (int src = 0; src < 2; ++src) {
      TsdbNextRowState *pIn = &pIter->input[src];
      if (!pIn->next || pIn->stop) {
        continue;
      }

      TAOS_CHECK_GOTO(pIn->nextRowFn(pIn->iter, &pIn->pRow, &pIn->ignoreEarlierTs, isLast, aCols, nCols), &lino,
                      _exit);

      if (pIn->pRow == NULL) {
        pIn->stop = true;
        pIn->next = false;
      }
    }

    if (pIter->input[0].stop && pIter->input[1].stop) {
      return NULL;
    }

    // (2) gather the row(s) with the greatest key; equal keys yield two candidates
    TSDBROW *aCand[2] = {0};
    int      aCandSrc[2] = {-1, -1};
    int      nCand = 0;
    SRowKey  topKey = {.ts = TSKEY_MIN};

    for (int src = 0; src < 2; ++src) {
      TsdbNextRowState *pIn = &pIter->input[src];
      if (pIn->stop || pIn->pRow == NULL) {
        continue;
      }

      STsdbRowKey curKey = {0};
      tsdbRowGetKey(pIn->pRow, &curKey);

      // merging & deduplicating on client side
      int cmp = tRowKeyCompare(&topKey, &curKey.key);
      if (cmp < 0) {
        nCand = 0;
        topKey = curKey.key;
      }
      if (cmp <= 0) {
        aCandSrc[nCand] = src;
        aCand[nCand] = pIn->pRow;
        ++nCand;
      }
      pIn->next = false;
    }

    // (3) filter the candidates through the delete skyline
    TSDBROW *aLive[2] = {0};
    int      aLiveSrc[2] = {-1, -1};
    int      nLive = 0;
    for (int k = 0; k < nCand; ++k) {
      TSDBKEY candKey = TSDBROW_KEY(aCand[k]);

      if (pIter->pSkyline == NULL) {
        pIter->pSkyline = taosArrayInit(32, sizeof(TSDBKEY));
        TSDB_CHECK_NULL(pIter->pSkyline, code, lino, _exit, terrno);

        uint64_t        uid = pIter->idx.uid;
        STableLoadInfo *pInfo = getTableLoadInfo(pIter->pr, uid);
        TSDB_CHECK_NULL(pInfo, code, lino, _exit, TSDB_CODE_OUT_OF_MEMORY);

        if (pInfo->pTombData == NULL) {
          pInfo->pTombData = taosArrayInit(4, sizeof(SDelData));
          TSDB_CHECK_NULL(pInfo->pTombData, code, lino, _exit, terrno);
        }

        if (!taosArrayAddAll(pInfo->pTombData, pIter->pMemDelData)) {
          TAOS_CHECK_GOTO(terrno, &lino, _exit);
        }

        size_t delSize = TARRAY_SIZE(pInfo->pTombData);
        if (delSize > 0) {
          code = tsdbBuildDeleteSkyline(pInfo->pTombData, 0, (int32_t)(delSize - 1), pIter->pSkyline);
          TAOS_CHECK_GOTO(code, &lino, _exit);
        }
        pIter->iSkyline = taosArrayGetSize(pIter->pSkyline) - 1;
      }

      bool deleted = tsdbKeyDeleted(&candKey, pIter->pSkyline, &pIter->iSkyline);
      if (!deleted) {
        aLiveSrc[nLive] = aCandSrc[k];
        aLive[nLive] = aCand[k];
        ++nLive;
      }

      // a deleted row must be skipped: ask its input for another one next round
      pIter->input[aCandSrc[k]].next = deleted;
    }

    // (4) deliver the first surviving row and mark its input as consumed
    if (nLive > 0) {
      pIter->input[aLiveSrc[0]].next = true;

      return aLive[0];
    }
  }

_exit:
  if (code) {
    tsdbError("tsdb/cache: %s failed at line %d since %s.", __func__, lino, tstrerror(code));
  }

  return NULL;
}
2501

2502
// Duplicate a schema (header plus its numOfCols column descriptors) into a
// newly allocated buffer stored in *ppDst. Returns terrno when the allocation
// fails (*ppDst is NULL in that case), TSDB_CODE_SUCCESS otherwise.
static int32_t cloneTSchema(STSchema *pSrc, STSchema **ppDst) {
  int32_t nBytes = sizeof(STSchema) + sizeof(STColumn) * pSrc->numOfCols;

  STSchema *pCopy = taosMemoryMalloc(nBytes);
  *ppDst = pCopy;
  if (pCopy == NULL) {
    TAOS_RETURN(terrno);
  }

  memcpy(pCopy, pSrc, nBytes);

  TAOS_RETURN(TSDB_CODE_SUCCESS);
}
2512

2513
// Make pReader->pCurrSchema match schema version `sversion`.
//
//  - no current schema and the reader's base schema has that version: clone it
//  - current schema already has that version: nothing to do
//  - otherwise: drop the current schema and fetch the requested version from
//    the meta store for table `uid`
static int32_t updateTSchema(int32_t sversion, SCacheRowsReader *pReader, uint64_t uid) {
  int32_t   code = TSDB_CODE_SUCCESS;
  STSchema *pCurr = pReader->pCurrSchema;

  if (pCurr == NULL) {
    if (sversion == pReader->pSchema->version) {
      code = cloneTSchema(pReader->pSchema, &pReader->pCurrSchema);
      TAOS_RETURN(code);
    }
  } else if (sversion == pCurr->version) {
    TAOS_RETURN(TSDB_CODE_SUCCESS);
  }

  taosMemoryFreeClear(pReader->pCurrSchema);

  code = metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->info.suid, uid, sversion, &pReader->pCurrSchema);
  TAOS_RETURN(code);
}
2526

2527
// Refresh the entries of pLastArray with rows still held in the memtables.
//
// pLastArray holds one SLastCol per requested column and keyArray the matching
// SLastKey at the same index. The newest non-deleted memtable row is walked
// column by column in step with pLastArray:
//   - for a non-"last" key the entry is replaced when the row is newer, or has
//     the same key and the column is not NONE;
//   - for a "last" key the entry is replaced when the column carries a value
//     and the row key is not older; columns without a value are remembered in
//     iColHash (cid -> index in pLastArray).
// Older memtable rows are then scanned until every remembered column has been
// resolved or the rows run out.
//
// Returns 0 on success or the first error code hit; all iterators and the
// hash are released on every path.
static int32_t tsdbCacheGetBatchFromMem(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr,
                                        SArray *keyArray) {
  int32_t        code = 0;
  int32_t        lino = 0;
  STSchema      *pTSchema = pr->pSchema;
  MemNextRowIter iter = {0};
  SSHashObj     *iColHash = NULL;
  STSDBRowIter   rowIter = {0};

  // 1, get from mem, imem filtered with delete info
  TAOS_CHECK_EXIT(memRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->info.suid, pr->pReadSnap, pr));

  TSDBROW *pRow = memRowIterGet(&iter, false, NULL, 0);
  if (!pRow) {
    goto _exit;
  }

  // -1 leaves the reader's base schema in use; otherwise switch to the row's version
  int32_t sversion = TSDBROW_SVERSION(pRow);
  if (sversion != -1) {
    TAOS_CHECK_EXIT(updateTSchema(sversion, pr, uid));

    pTSchema = pr->pCurrSchema;
  }
  int32_t nCol = pTSchema->numOfCols;

  STsdbRowKey rowKey = {0};
  tsdbRowGetKey(pRow, &rowKey);

  TAOS_CHECK_EXIT(tsdbRowIterOpen(&rowIter, pRow, pTSchema));

  // Merge-walk: iCol counts row columns consumed, jCol indexes pLastArray.
  int32_t iCol = 0, jCol = 0, jnCol = TARRAY_SIZE(pLastArray);
  for (SColVal *pColVal = tsdbRowIterNext(&rowIter); pColVal && iCol < nCol && jCol < jnCol;) {
    SLastCol *pTargetCol = &((SLastCol *)TARRAY_DATA(pLastArray))[jCol];
    if (pColVal->cid < pTargetCol->colVal.cid) {
      pColVal = tsdbRowIterNext(&rowIter), ++iCol;

      continue;
    }
    if (pColVal->cid > pTargetCol->colVal.cid) {
      break;
    }

    int32_t cmp_res = tRowKeyCompare(&pTargetCol->rowKey, &rowKey.key);
    if (!IS_LAST_KEY(((SLastKey *)TARRAY_DATA(keyArray))[jCol])) {
      if (cmp_res < 0 || (cmp_res == 0 && !COL_VAL_IS_NONE(pColVal))) {
        SLastCol lastCol = {.rowKey = rowKey.key, .colVal = *pColVal, .dirty = 1, .cacheStatus = TSDB_LAST_CACHE_VALID};
        TAOS_CHECK_EXIT(tsdbCacheReallocSLastCol(&lastCol, NULL));

        tsdbDebug("vgId:%d, %s qid:%s uid:%" PRId64 " from memtable, col_id:%d col_flag:%d ts:%" PRId64,
                  TD_VID(pTsdb->pVnode), __func__, pr && pr->idstr ? pr->idstr : "null", uid, lastCol.colVal.cid,
                  lastCol.colVal.flag, rowKey.key.ts);

        tsdbCacheFreeSLastColItem(pTargetCol);
        taosArraySet(pLastArray, jCol, &lastCol);
      }
    } else {
      if (COL_VAL_IS_VALUE(pColVal)) {
        if (cmp_res <= 0) {
          SLastCol lastCol = {
              .rowKey = rowKey.key, .colVal = *pColVal, .dirty = 1, .cacheStatus = TSDB_LAST_CACHE_VALID};
          TAOS_CHECK_EXIT(tsdbCacheReallocSLastCol(&lastCol, NULL));

          tsdbDebug("vgId:%d, %s qid:%s uid:%" PRId64
                    " from memtable(last) and memtable(newer), col_id:%d col_flag:%d ts:%" PRId64,
                    TD_VID(pTsdb->pVnode), __func__, pr && pr->idstr ? pr->idstr : "null", uid, lastCol.colVal.cid,
                    lastCol.colVal.flag, rowKey.key.ts);

          tsdbCacheFreeSLastColItem(pTargetCol);
          taosArraySet(pLastArray, jCol, &lastCol);
        }
      } else {
        // No value in the newest row: remember the column for the older-row scan below.
        if (!iColHash) {
          iColHash = tSimpleHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT));
          if (iColHash == NULL) {
            TAOS_CHECK_EXIT(terrno);
          }
        }

        if (tSimpleHashPut(iColHash, &pColVal->cid, sizeof(pColVal->cid), &jCol, sizeof(jCol))) {
          TAOS_CHECK_EXIT(terrno);
        }
      }
    }

    ++jCol;

    // Only advance the row column when the next target refers to a later column id.
    if (jCol < jnCol && ((SLastCol *)TARRAY_DATA(pLastArray))[jCol].colVal.cid > pColVal->cid) {
      pColVal = tsdbRowIterNext(&rowIter), ++iCol;
    }
  }
  tsdbRowClose(&rowIter);

  // 2, resolve the remembered columns from older memtable rows
  if (iColHash && tSimpleHashGetSize(iColHash) > 0) {
    pRow = memRowIterGet(&iter, false, NULL, 0);
    while (pRow) {
      if (tSimpleHashGetSize(iColHash) == 0) {
        break;
      }

      sversion = TSDBROW_SVERSION(pRow);
      if (sversion != -1) {
        TAOS_CHECK_EXIT(updateTSchema(sversion, pr, uid));

        pTSchema = pr->pCurrSchema;
      }
      nCol = pTSchema->numOfCols;

      STsdbRowKey tsdbRowKey = {0};
      tsdbRowGetKey(pRow, &tsdbRowKey);

      TAOS_CHECK_EXIT(tsdbRowIterOpen(&rowIter, pRow, pTSchema));

      iCol = 0;
      for (SColVal *pColVal = tsdbRowIterNext(&rowIter); pColVal && iCol < nCol;
           pColVal = tsdbRowIterNext(&rowIter), iCol++) {
        int32_t *pjCol = tSimpleHashGet(iColHash, &pColVal->cid, sizeof(pColVal->cid));
        if (pjCol && COL_VAL_IS_VALUE(pColVal)) {
          SLastCol *pTargetCol = &((SLastCol *)TARRAY_DATA(pLastArray))[*pjCol];

          int32_t cmp_res = tRowKeyCompare(&pTargetCol->rowKey, &tsdbRowKey.key);
          if (cmp_res <= 0) {
            SLastCol lastCol = {
                .rowKey = tsdbRowKey.key, .colVal = *pColVal, .dirty = 1, .cacheStatus = TSDB_LAST_CACHE_VALID};
            TAOS_CHECK_EXIT(tsdbCacheReallocSLastCol(&lastCol, NULL));

            tsdbDebug("vgId:%d, %s qid:%s uid:%" PRId64 " from memtable(hash), col_id:%d col_flag:%d ts:%" PRId64,
                      TD_VID(pTsdb->pVnode), __func__, pr && pr->idstr ? pr->idstr : "null", uid, lastCol.colVal.cid,
                      lastCol.colVal.flag, tsdbRowKey.key.ts);

            tsdbCacheFreeSLastColItem(pTargetCol);
            taosArraySet(pLastArray, *pjCol, &lastCol);
          }

          // The column is resolved whether or not the cached entry was replaced.
          TAOS_CHECK_EXIT(tSimpleHashRemove(iColHash, &pColVal->cid, sizeof(pColVal->cid)));
        }
      }
      tsdbRowClose(&rowIter);

      pRow = memRowIterGet(&iter, false, NULL, 0);
    }
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(pTsdb->pVnode), __func__, __FILE__, lino, tstrerror(code));

    tsdbRowClose(&rowIter);
  }

  tSimpleHashCleanup(iColHash);

  memRowIterClose(&iter);

  TAOS_RETURN(code);
}
2685

2686
// Fill pLastArray for table `uid`: first via tsdbCacheGetBatchFromLru, then,
// when tsUpdateCacheBatch is set, via tsdbCacheGetBatchFromMem. A scratch
// array of SLastKey is shared between the two steps and released before
// returning.
int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, int8_t ltype) {
  int32_t code = 0, lino = 0;
  SArray *keyArray = NULL;

  tsdbDebug("vgId:%d, %s start, qid:%s uid:%" PRId64 " ltype:%d", TD_VID(pTsdb->pVnode), __func__,
            pr && pr->idstr ? pr->idstr : "null", uid, ltype);

  keyArray = taosArrayInit(16, sizeof(SLastKey));
  if (keyArray == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }

  TAOS_CHECK_EXIT(tsdbCacheGetBatchFromLru(pTsdb, uid, pLastArray, pr, ltype, keyArray));

  if (tsUpdateCacheBatch) {
    TAOS_CHECK_EXIT(tsdbCacheGetBatchFromMem(pTsdb, uid, pLastArray, pr, keyArray));
  }

_exit:
  if (code != 0) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(pTsdb->pVnode), __func__, __FILE__, lino, tstrerror(code));
  }

  if (keyArray != NULL) {
    taosArrayDestroy(keyArray);
  }

  TAOS_RETURN(code);
}
2715

2716
// Invalidate cached last/last_row entries of table `uid` whose timestamp lies
// in [sKey, eKey].
//
// For every column and both lflag values the LRU cache is consulted first; a
// hit inside the range is overwritten with a NONE value marked
// TSDB_LAST_CACHE_NO_CACHE. Keys missing from the LRU are looked up in
// RocksDB in one batch and, when inside the range, overwritten there and in
// the LRU. The whole operation runs under pTsdb->lruMutex.
//
// Returns 0 on success, otherwise the first error code encountered.
int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey) {
  int32_t   code = 0, lino = 0;
  STSchema *pTSchema = NULL;
  int       sver = -1;
  int       numKeys = 0;
  SArray   *remainCols = NULL;

  // Declared and nulled up front: `goto _exit` from the first loop must not
  // jump over their initialization, since _exit frees all of them.
  char       **keys_list = NULL;
  size_t      *keys_list_sizes = NULL;
  char       **values_list = NULL;
  size_t      *values_list_sizes = NULL;
  const size_t klen = ROCKS_KEY_LEN;

  TAOS_CHECK_RETURN(metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pTSchema));

  int numCols = pTSchema->numOfCols;

  (void)taosThreadMutexLock(&pTsdb->lruMutex);

  for (int i = 0; i < numCols; ++i) {
    int16_t cid = pTSchema->columns[i].colId;
    for (int8_t lflag = LFLAG_LAST_ROW; lflag <= LFLAG_LAST; ++lflag) {
      SLastKey   lastKey = {.lflag = lflag, .uid = uid, .cid = cid};
      LRUHandle *h = taosLRUCacheLookup(pTsdb->lruCache, &lastKey, ROCKS_KEY_LEN);
      if (h) {
        SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pTsdb->lruCache, h);
        if (pLastCol->rowKey.ts <= eKey && pLastCol->rowKey.ts >= sKey) {
          SLastCol noneCol = {.rowKey.ts = TSKEY_MIN,
                              .colVal = COL_VAL_NONE(cid, pTSchema->columns[i].type),
                              .dirty = 1,
                              .cacheStatus = TSDB_LAST_CACHE_NO_CACHE};
          code = tsdbCachePutToLRU(pTsdb, &lastKey, &noneCol, 1);
        }
        // Release the handle before acting on the put result.
        tsdbLRUCacheRelease(pTsdb->lruCache, h, false);
        TAOS_CHECK_EXIT(code);
      } else {
        // Not in the LRU: queue the key for the RocksDB batch lookup.
        if (!remainCols) {
          remainCols = taosArrayInit(numCols * 2, sizeof(SIdxKey));
          if (!remainCols) {
            code = terrno;
            goto _exit;
          }
        }
        if (!taosArrayPush(remainCols, &(SIdxKey){i, lastKey})) {
          TAOS_CHECK_EXIT(terrno);
        }
      }
    }
  }

  if (remainCols) {
    numKeys = TARRAY_SIZE(remainCols);
  }

  keys_list = taosMemoryCalloc(numKeys, sizeof(char *));
  keys_list_sizes = taosMemoryCalloc(numKeys, sizeof(size_t));

  if (!keys_list || !keys_list_sizes) {
    code = terrno;
    goto _exit;
  }

  for (int i = 0; i < numKeys; ++i) {
    char *key = taosMemoryCalloc(1, sizeof(SLastKey));
    if (!key) {
      code = terrno;
      goto _exit;
    }
    SIdxKey *idxKey = taosArrayGet(remainCols, i);

    ((SLastKey *)key)[0] = idxKey->key;

    keys_list[i] = key;
    keys_list_sizes[i] = klen;
  }

  rocksMayWrite(pTsdb, true);  // flush writebatch cache

  TAOS_CHECK_GOTO(tsdbCacheGetValuesFromRocks(pTsdb, numKeys, (const char *const *)keys_list, keys_list_sizes,
                                              &values_list, &values_list_sizes),
                  NULL, _exit);

  for (int i = 0; i < numKeys; ++i) {
    SLastCol *pLastCol = NULL;
    if (values_list[i] != NULL) {
      code = tsdbCacheDeserialize(values_list[i], values_list_sizes[i], &pLastCol);
      if (code != TSDB_CODE_SUCCESS) {
        tsdbError("vgId:%d, %s deserialize failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
                  tstrerror(code));
        goto _exit;
      }
    }
    SIdxKey  *idxKey = taosArrayGet(remainCols, i);
    SLastKey *pLastKey = &idxKey->key;
    if (NULL != pLastCol && (pLastCol->rowKey.ts <= eKey && pLastCol->rowKey.ts >= sKey)) {
      SLastCol noCacheCol = {.rowKey.ts = TSKEY_MIN,
                             .colVal = COL_VAL_NONE(pLastKey->cid, pTSchema->columns[idxKey->idx].type),
                             .dirty = 0,
                             .cacheStatus = TSDB_LAST_CACHE_NO_CACHE};

      if ((code = tsdbCachePutToRocksdb(pTsdb, pLastKey, &noCacheCol)) != TSDB_CODE_SUCCESS) {
        taosMemoryFreeClear(pLastCol);
        tsdbError("tsdb/cache/del: vgId:%d, put to rocks failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code));
        goto _exit;
      }
      if ((code = tsdbCachePutToLRU(pTsdb, pLastKey, &noCacheCol, 0)) != TSDB_CODE_SUCCESS) {
        taosMemoryFreeClear(pLastCol);
        tsdbError("tsdb/cache/del: vgId:%d, put to lru failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code));
        goto _exit;
      }
    }

    if (pLastCol == NULL) {
      tsdbDebug("tsdb/cache/del: vgId:%d, no cache found for uid:%" PRId64 ", cid:%d, lflag:%d.",
                TD_VID(pTsdb->pVnode), pLastKey->uid, pLastKey->cid, pLastKey->lflag);
    }

    taosMemoryFreeClear(pLastCol);
  }

  rocksMayWrite(pTsdb, false);

_exit:
  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);

  // keys_list may be NULL here (early exit or failed allocation).
  if (keys_list) {
    for (int i = 0; i < numKeys; ++i) {
      taosMemoryFree(keys_list[i]);
    }
  }
  taosMemoryFree(keys_list);
  taosMemoryFree(keys_list_sizes);
  if (values_list) {
#if USE_ROCKSDB
    for (int i = 0; i < numKeys; ++i) {
      rocksdb_free(values_list[i]);
    }
#endif
    taosMemoryFree(values_list);
  }
  taosMemoryFree(values_list_sizes);
  taosArrayDestroy(remainCols);
  taosMemoryFree(pTSchema);

  TAOS_RETURN(code);
}
2854

2855
// Create the last/last_row LRU cache for a tsdb instance and open the
// companion caches (block/page caches when shared storage is enabled, and the
// RocksDB cache). On success the LRU cache is stored in pTsdb->lruCache and
// pTsdb->lruMutex is initialized; on failure the LRU cache is cleaned up and
// pTsdb->lruCache is set to NULL.
int32_t tsdbOpenCache(STsdb *pTsdb) {
  int32_t code = 0, lino = 0;

  // cacheLastSize is multiplied by 1024 * 1024 to obtain the capacity in bytes
  size_t  cfgCapacity = (size_t)pTsdb->pVnode->config.cacheLastSize * 1024 * 1024;
  int32_t numShardBits = pTsdb->pVnode->config.cacheLastShardBits;

  // Use configured shard bits, or -1 to auto-calculate based on cache size
  // This enables multi-shard LRU cache for better concurrency
  SLRUCache *pLru = taosLRUCacheInit(cfgCapacity, numShardBits, .5);
  if (NULL == pLru) {
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err);
  }

#ifdef USE_SHARED_STORAGE
  if (tsSsEnabled) {
    TAOS_CHECK_GOTO(tsdbOpenBCache(pTsdb), &lino, _err);
    TAOS_CHECK_GOTO(tsdbOpenPgCache(pTsdb), &lino, _err);
  }
#endif

  TAOS_CHECK_GOTO(tsdbOpenRocksCache(pTsdb), &lino, _err);

  taosLRUCacheSetStrictCapacity(pLru, false);
  (void)taosThreadMutexInit(&pTsdb->lruMutex, NULL);
  pTsdb->lruCache = pLru;

  tsdbInfo("vgId:%d, lruCache opened with capacity:%zu bytes, numShards:%d (configured:%d)",
           TD_VID(pTsdb->pVnode), cfgCapacity, taosLRUCacheGetNumShards(pLru), numShardBits);

  TAOS_RETURN(0);

_err:
  if (code != 0) {
    tsdbError("tsdb/cache: vgId:%d, open failed at line %d since %s.", TD_VID(pTsdb->pVnode), lino, tstrerror(code));
    if (pLru != NULL) {
      taosLRUCacheCleanup(pLru);
      pLru = NULL;
    }
  }

  pTsdb->lruCache = pLru;
  TAOS_RETURN(code);
}
2899

2900
// Counterpart of tsdbOpenCache: empty and destroy the LRU cache together with
// its mutex (only when a cache exists), then close the block/page caches when
// shared storage is enabled, and finally the RocksDB cache.
void tsdbCloseCache(STsdb *pTsdb) {
  SLRUCache *pLru = pTsdb->lruCache;

  if (pLru != NULL) {
    taosLRUCacheEraseUnrefEntries(pLru);
    taosLRUCacheCleanup(pLru);
    (void)taosThreadMutexDestroy(&pTsdb->lruMutex);
  }

#ifdef USE_SHARED_STORAGE
  if (tsSsEnabled) {
    tsdbCloseBCache(pTsdb);
    tsdbClosePgCache(pTsdb);
  }
#endif

  tsdbCloseRocksCache(pTsdb);
}
4,308,785✔
2919

2920
// Rebuild only the last cache (lruCache) with a new shard count.
2921
// Must be called with pTsdb->lruMutex held by the caller.
2922
int32_t tsdbRebuildLastCache(STsdb *pTsdb, int32_t numShardBits) {
1,536✔
2923
  size_t     cfgCapacity = (size_t)pTsdb->pVnode->config.cacheLastSize * 1024 * 1024;
1,536✔
2924
  SLRUCache *pNewCache = taosLRUCacheInit(cfgCapacity, numShardBits, .5);
1,536✔
2925
  if (pNewCache == NULL) {
1,536✔
2926
    TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
×
2927
  }
2928
  taosLRUCacheSetStrictCapacity(pNewCache, false);
1,536✔
2929

2930
  // Swap in the new cache and clean up the old one
2931
  SLRUCache *pOldCache = pTsdb->lruCache;
1,536✔
2932
  pTsdb->lruCache = pNewCache;
1,536✔
2933

2934
  if (pOldCache) {
1,536✔
2935
    int64_t start = taosGetTimestampMs();
1,536✔
2936
    taosLRUCacheEraseUnrefEntries(pOldCache);
1,536✔
2937
    taosLRUCacheCleanup(pOldCache);
1,536✔
2938
    int64_t end = taosGetTimestampMs();
1,536✔
2939
    tsdbInfo("vgId:%d, lruCache erase unref entries and cleanup time:%" PRId64 " ms", TD_VID(pTsdb->pVnode),
1,536✔
2940
             end - start);
2941
  }
2942

2943
  tsdbInfo("vgId:%d, lruCache rebuilt with capacity:%zu bytes, numShards:%d (configured:%d)", TD_VID(pTsdb->pVnode),
1,536✔
2944
           cfgCapacity, taosLRUCacheGetNumShards(pNewCache), numShardBits);
2945

2946
  TAOS_RETURN(0);
1,536✔
2947
}
2948

2949
// Encode a table-level cache key into `key` and report its length in `*len`.
//
// The key is the 64-bit uid in host byte order; for any cacheType other than
// 0 (last_row) the top bit is set to mark a "last" key. `key` must have room
// for sizeof(uint64_t) bytes.
static void getTableCacheKey(tb_uid_t uid, int cacheType, char *key, int *len) {
  uint64_t encoded = (uint64_t)uid;

  if (cacheType != 0) {  // last
    encoded |= 0x8000000000000000;
  }

  // Copy bytewise: `key` is a char buffer with no alignment guarantee, so it
  // must not be written through a uint64_t pointer.
  memcpy(key, &encoded, sizeof(encoded));

  *len = sizeof(uint64_t);
}
×
2958

2959
// Resolve the super-table uid of table `uid` through the meta reader.
// Returns the suid for a child table, and 0 for every other table type or
// when the table entry cannot be read (table does not exist).
static tb_uid_t getTableSuidByUid(tb_uid_t uid, STsdb *pTsdb) {
  tb_uid_t    suid = 0;
  SMetaReader mr = {0};

  metaReaderDoInit(&mr, pTsdb->pVnode->pMeta, META_READER_LOCK);

  if (metaReaderGetTableEntryByUidCache(&mr, uid) >= 0 && mr.me.type == TSDB_CHILD_TABLE) {
    suid = mr.me.ctbEntry.suid;
  }

  metaReaderClear(&mr);

  return suid;
}
2981

2982
// Read the delete records referenced by `pDelIdx` into `aDelData` via
// tsdbReadDelDatav1 (with INT64_MAX as its last argument). A NULL index is
// not an error: nothing is read and 0 is returned.
static int32_t getTableDelDataFromDelIdx(SDelFReader *pDelReader, SDelIdx *pDelIdx, SArray *aDelData) {
  int32_t code = (pDelIdx != NULL) ? tsdbReadDelDatav1(pDelReader, pDelIdx, aDelData, INT64_MAX) : 0;

  TAOS_RETURN(code);
}
2991

2992
// Append every delete record chained off pTbData->pHead to `aDelData`.
// A NULL pTbData yields no records. Returns terrno if a push fails, else 0.
static int32_t getTableDelDataFromTbData(STbData *pTbData, SArray *aDelData) {
  int32_t   code = 0;
  SDelData *pNode = NULL;

  if (pTbData != NULL) {
    pNode = pTbData->pHead;
  }

  while (pNode != NULL) {
    if (taosArrayPush(aDelData, pNode) == NULL) {
      TAOS_RETURN(terrno);
    }
    pNode = pNode->pNext;
  }

  TAOS_RETURN(code);
}
3004

3005
// Return the reader's sorted uid list, building it on first use from
// pReader->pTableList (numOfTables entries, sorted with uidComparFunc).
// Returns NULL when the list cannot be allocated.
static uint64_t *getUidList(SCacheRowsReader *pReader) {
  if (pReader->uidList != NULL) {
    return pReader->uidList;
  }

  int32_t   nTables = pReader->numOfTables;
  uint64_t *pList = taosMemoryMalloc(nTables * sizeof(uint64_t));

  pReader->uidList = pList;
  if (pList == NULL) {
    return NULL;
  }

  for (int32_t idx = 0; idx < nTables; ++idx) {
    pList[idx] = pReader->pTableList[idx].uid;
  }

  taosSort(pList, nTables, sizeof(uint64_t), uidComparFunc);

  return pReader->uidList;
}
3024

3025
// Load tombstone (delete) records for the reader's tables from a tomb block
// index and append them to each table's STableLoadInfo::pTombData.
//
//   pTombBlkArray  tomb block index to scan
//   pReader        cache rows reader; supplies suid, the table list and the
//                  maximum version to accept
//   pFileReader    data-file reader when isFile is true, stt-file reader
//                  otherwise
//
// Blocks whose id range lies entirely before the reader's tables are skipped,
// and the scan stops at the first block lying entirely after them. Within a
// block, records are matched against the sorted uid list with a moving cursor
// `j`. Only records with version <= pReader->info.verRange.maxVer are kept.
//
// Returns TSDB_CODE_SUCCESS or the first error code; the tomb block read for
// the current iteration is destroyed on every path after it has been loaded.
static int32_t loadTombFromBlk(const TTombBlkArray *pTombBlkArray, SCacheRowsReader *pReader, void *pFileReader,
                               bool isFile) {
  int32_t   code = 0;
  int32_t   numOfTables = pReader->numOfTables;
  int64_t   suid = pReader->info.suid;
  uint64_t *uidList = getUidList(pReader);

  if (!uidList) {
    TAOS_RETURN(terrno);
  }

  for (int i = 0, j = 0; i < pTombBlkArray->size && j < numOfTables; ++i) {
    STombBlk *pTombBlk = &pTombBlkArray->data[i];
    if (pTombBlk->maxTbid.suid < suid || (pTombBlk->maxTbid.suid == suid && pTombBlk->maxTbid.uid < uidList[0])) {
      continue;
    }

    if (pTombBlk->minTbid.suid > suid ||
        (pTombBlk->minTbid.suid == suid && pTombBlk->minTbid.uid > uidList[numOfTables - 1])) {
      break;
    }

    STombBlock block = {0};
    code = isFile ? tsdbDataFileReadTombBlock(pFileReader, &pTombBlkArray->data[i], &block)
                  : tsdbSttFileReadTombBlock(pFileReader, &pTombBlkArray->data[i], &block);
    if (code != TSDB_CODE_SUCCESS) {
      TAOS_RETURN(code);
    }

    uint64_t        uid = uidList[j];
    STableLoadInfo *pInfo = getTableLoadInfo(pReader, uid);
    if (!pInfo) {
      tTombBlockDestroy(&block);
      TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
    }

    if (pInfo->pTombData == NULL) {
      pInfo->pTombData = taosArrayInit(4, sizeof(SDelData));
      if (pInfo->pTombData == NULL) {
        // Capture the error before the cleanup call can disturb terrno.
        code = terrno;
        tTombBlockDestroy(&block);
        TAOS_RETURN(code);
      }
    }

    STombRecord record = {0};
    bool        finished = false;  // set when the whole scan (not just this block) must stop
    for (int32_t k = 0; k < TOMB_BLOCK_SIZE(&block); ++k) {
      code = tTombBlockGet(&block, k, &record);
      if (code != TSDB_CODE_SUCCESS) {
        finished = true;
        break;
      }

      if (record.suid < suid) {
        continue;
      }
      if (record.suid > suid) {
        finished = true;
        break;
      }

      // Advance the table cursor until it reaches or passes the record's uid.
      bool newTable = false;
      if (uid < record.uid) {
        while (j < numOfTables && uidList[j] < record.uid) {
          ++j;
          newTable = true;
        }

        if (j >= numOfTables) {
          finished = true;
          break;
        }

        uid = uidList[j];
      }

      if (record.uid < uid) {
        continue;
      }

      if (newTable) {
        pInfo = getTableLoadInfo(pReader, uid);
        if (!pInfo) {
          code = TSDB_CODE_OUT_OF_MEMORY;
          finished = true;
          break;
        }
        if (pInfo->pTombData == NULL) {
          pInfo->pTombData = taosArrayInit(4, sizeof(SDelData));
          if (!pInfo->pTombData) {
            code = terrno;
            finished = true;
            break;
          }
        }
      }

      if (record.version <= pReader->info.verRange.maxVer) {
        SDelData delData = {.version = record.version, .sKey = record.skey, .eKey = record.ekey};
        if (!taosArrayPush(pInfo->pTombData, &delData)) {
          // Leave through the common path so the tomb block is destroyed.
          code = terrno;
          finished = true;
          break;
        }
      }
    }

    tTombBlockDestroy(&block);

    if (finished) {
      TAOS_RETURN(code);
    }
  }

  TAOS_RETURN(TSDB_CODE_SUCCESS);
}
3138

3139
// Load tombstones from a data file: fetch its tomb block index, then feed it
// to loadTombFromBlk in data-file mode.
static int32_t loadDataTomb(SCacheRowsReader *pReader, SDataFileReader *pFileReader) {
  const TTombBlkArray *pTombBlks = NULL;

  int32_t code = tsdbDataFileReadTombBlk(pFileReader, &pTombBlks);
  if (code != TSDB_CODE_SUCCESS) {
    TAOS_RETURN(code);
  }

  code = loadTombFromBlk(pTombBlks, pReader, pFileReader, true);
  TAOS_RETURN(code);
}
3146

3147
/*
 * Tombstone loader callback for stt files (installed as SMergeTreeConf.loadTombFn
 * by lastIterOpen).
 *
 * pTsdbReader is really the SCacheRowsReader that owns the iteration; it is
 * cast back before use. pLoadInfo is not used here.
 * Returns the first non-success code from reading the tomb block index or
 * from loadTombFromBlk (called with the "data file" flag set to false).
 */
static int32_t loadSttTomb(STsdbReader *pTsdbReader, SSttFileReader *pSttFileReader, SSttBlockLoadInfo *pLoadInfo) {
  const TTombBlkArray *pTombBlks = NULL;
  SCacheRowsReader    *pCacheReader = (SCacheRowsReader *)pTsdbReader;

  TAOS_CHECK_RETURN(tsdbSttFileReadTombBlk(pSttFileReader, &pTombBlks));

  int32_t code = loadTombFromBlk(pTombBlks, pCacheReader, pSttFileReader, false);
  TAOS_RETURN(code);
}
3155

3156
// Row iterator over the merge tree opened by lastIterOpen for one file set
// (presumably the file set's stt files, given the stt-specific config it is opened with).
typedef struct {
3157
  // merge tree storage; opened in place by lastIterOpen via tMergeTreeOpen2
  SMergeTree  mergeTree;
3158
  // set to &mergeTree once opened; lastIterClose closes it and resets it to NULL
  SMergeTree *pMergeTree;
3159
} SFSLastIter;
3160

3161
/*
 * Open the merge-tree row iterator of one file set for table (suid, uid).
 *
 * The block iterators still held in pr->pLDataIterArray are destroyed and a
 * fresh array is installed before the merge tree is opened in backward mode
 * over the time window [lastTs, TSKEY_MAX] with version range [0, INT64_MAX].
 *
 * Returns terrno when the iterator array cannot be allocated, the code
 * reported by tMergeTreeOpen2 on failure, and 0 on success.
 */
static int32_t lastIterOpen(SFSLastIter *iter, STFileSet *pFileSet, STsdb *pTsdb, STSchema *pTSchema, tb_uid_t suid,
                            tb_uid_t uid, SCacheRowsReader *pr, int64_t lastTs, int16_t *aCols, int nCols) {
  // iterators left over from the previously visited file set are stale
  destroySttBlockReader(pr->pLDataIterArray, NULL);
  pr->pLDataIterArray = taosArrayInit(4, POINTER_BYTES);
  if (NULL == pr->pLDataIterArray) {
    return terrno;
  }

  SMergeTreeConf conf = {0};
  conf.uid = uid;
  conf.suid = suid;
  conf.pTsdb = pTsdb;
  conf.timewindow = (STimeWindow){.skey = lastTs, .ekey = TSKEY_MAX};
  conf.verRange = (SVersionRange){.minVer = 0, .maxVer = INT64_MAX};
  conf.strictTimeRange = false;
  conf.cacheStatis = false;
  conf.pSchema = pTSchema;
  conf.pCurrentFileset = pFileSet;
  conf.backward = 1;
  conf.pSttFileBlockIterArray = pr->pLDataIterArray;
  conf.pCols = aCols;
  conf.numOfCols = nCols;
  conf.loadTombFn = loadSttTomb;
  conf.pReader = pr;
  conf.idstr = pr->idstr;
  conf.pCurRowKey = &pr->rowKey;

  TAOS_CHECK_RETURN(tMergeTreeOpen2(&iter->mergeTree, &conf, NULL));

  // a non-NULL pMergeTree marks the iterator as open for lastIterClose
  iter->pMergeTree = &iter->mergeTree;

  TAOS_RETURN(0);
}
3194

3195
/*
 * Close the iterator referenced by *iter and reset the caller's pointer.
 *
 * The merge tree is closed only if it was opened; *iter is always set to NULL.
 * Always returns 0.
 */
static int32_t lastIterClose(SFSLastIter **iter) {
  SFSLastIter *pLast = *iter;

  if (pLast->pMergeTree != NULL) {
    tMergeTreeClose(pLast->pMergeTree);
    pLast->pMergeTree = NULL;
  }

  *iter = NULL;

  TAOS_RETURN(0);
}
3207

3208
/*
 * Advance the merge tree and fetch its current row.
 *
 * On success *ppRow is the row returned by tMergeTreeGetRow, or NULL when the
 * tree has no more values. On failure *ppRow is NULL and the code from
 * tMergeTreeNext is returned.
 */
static int32_t lastIterNext(SFSLastIter *iter, TSDBROW **ppRow) {
  *ppRow = NULL;

  bool    hasRow = false;
  int32_t code = tMergeTreeNext(iter->pMergeTree, &hasRow);
  if (code != 0) {
    return code;
  }

  if (hasRow) {
    *ppRow = tMergeTreeGetRow(iter->pMergeTree);
  }

  TAOS_RETURN(code);
}
3225

3226
// Steps of the resumable state machine implemented by getNextRowFromFS.
typedef enum SFSNEXTROWSTATES {
3227
  // initial step: position iFileSet past the last file set, then move to FILESET
  SFSNEXTROW_FS,
3228
  // select the next file set (counting down), open its reader and its merge-tree iterator
  SFSNEXTROW_FILESET,
3229
  // pick the next entry of pIndexList and read its brin block
  SFSNEXTROW_INDEXLIST,
3230
  // scan the loaded brin block backward for a record of the target uid
  SFSNEXTROW_BRINBLOCK,
3231
  // load the block data referenced by the current brin record
  SFSNEXTROW_BRINRECORD,
3232
  // NOTE(review): not referenced by the state-machine code visible in this part of the file
  SFSNEXTROW_BLOCKDATA,
3233
  // hand out rows of the loaded block backward, merged with the pending merge-tree row
  SFSNEXTROW_BLOCKROW,
3234
  // only merge-tree rows remain for the current file set
  SFSNEXTROW_NEXTSTTROW
3235
} SFSNEXTROWSTATES;
3236

3237
struct CacheNextRowIter;
3238

3239
// Persistent state of getNextRowFromFS: walks the file sets of a read snapshot
// backward and yields the rows of one table (suid, uid).
typedef struct SFSNextRowIter {
3240
  // current step of the state machine; nextRowIterOpen starts it at SFSNEXTROW_FS
  SFSNEXTROWSTATES         state;         // [input]
3241
  // set by nextRowIterOpen to the enclosing iterator's idx ({suid, uid})
  SBlockIdx               *pBlockIdxExp;  // [input]
3242
  // schema used when reading block data and merging rows
  STSchema                *pTSchema;      // [input]
3243
  tb_uid_t                 suid;
3244
  tb_uid_t                 uid;
3245
  // index of the current file set in aDFileSet; decremented before each use
  int32_t                  iFileSet;
3246
  // file set currently being read
  STFileSet               *pFileSet;
3247
  // file sets of the read snapshot (pReadSnap->pfSetArray)
  TFileSetArray           *aDFileSet;
3248
  // SBrinBlk entries of the current file set whose table-id range covers (suid, uid)
  SArray                  *pIndexList;
3249
  // cursor into pIndexList; decremented before each use
  int32_t                  iBrinIndex;
3250
  // storage for the brin block being scanned
  SBrinBlock               brinBlock;
3251
  // &brinBlock once the block has been used; destroyed by clearNextRowFromFS
  SBrinBlock              *pBrinBlock;
3252
  // cursor into brinBlock, moving from the last record toward the first
  int32_t                  iBrinRecord;
3253
  // record fetched at iBrinRecord
  SBrinRecord              brinRecord;
3254
  // storage for the data block referenced by brinRecord
  SBlockData               blockData;
3255
  // &blockData once created; destroyed by the clear functions
  SBlockData              *pBlockData;
3256
  // number of rows in blockData
  int32_t                  nRow;
3257
  // cursor into blockData, moving from the last row toward the first
  int32_t                  iRow;
3258
  // row handed out through *ppRow (block row or merged row)
  TSDBROW                  row;
3259
  // lower bound of the time window used when opening the merge-tree iterator
  int64_t                  lastTs;
3260
  // merge-tree iterator storage for the current file set
  SFSLastIter              lastIter;
3261
  // &lastIter while block rows are merged against it; NULL otherwise
  SFSLastIter             *pLastIter;
3262
  // 1 when the merge-tree iterator yielded no row right after being opened, else 0
  int8_t                   lastEmpty;
3263
  // merge-tree row fetched but not yet returned to the caller
  TSDBROW                 *pLastRow;
3264
  // heap row produced by the row merger; freed by the clear functions
  SRow                    *pTSRow;
3265
  // merger used when a block row and a merge-tree row share a timestamp
  SRowMerger               rowMerger;
3266
  // cache reader that owns the file reader, current file set and tomb data
  SCacheRowsReader        *pr;
3267
  // back-pointer to the enclosing CacheNextRowIter (its pSkyline is reset here)
  struct CacheNextRowIter *pRowIter;
3268
} SFSNextRowIter;
3269

3270
static void clearLastFileSet(SFSNextRowIter *state);
3271

3272
static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast, int16_t *aCols,
107,827✔
3273
                                int nCols) {
3274
  int32_t         code = 0, lino = 0;
107,827✔
3275
  SFSNextRowIter *state = (SFSNextRowIter *)iter;
107,827✔
3276
  STsdb          *pTsdb = state->pr->pTsdb;
107,827✔
3277

3278
  if (SFSNEXTROW_FS == state->state) {
107,827✔
3279
    state->iFileSet = TARRAY2_SIZE(state->aDFileSet);
106,849✔
3280

3281
    state->state = SFSNEXTROW_FILESET;
106,849✔
3282
  }
3283

3284
  if (SFSNEXTROW_FILESET == state->state) {
107,827✔
3285
  _next_fileset:
119,140✔
3286
    clearLastFileSet(state);
119,140✔
3287

3288
    if (--state->iFileSet < 0) {
119,140✔
3289
      *ppRow = NULL;
103,309✔
3290

3291
      TAOS_RETURN(code);
103,309✔
3292
    } else {
3293
      state->pFileSet = TARRAY2_GET(state->aDFileSet, state->iFileSet);
15,831✔
3294
    }
3295

3296
    STFileObj **pFileObj = state->pFileSet->farr;
15,831✔
3297
    if (pFileObj[0] != NULL || pFileObj[3] != NULL) {
15,831✔
3298
      if (state->pFileSet != state->pr->pCurFileSet) {
7,072✔
3299
        SDataFileReaderConfig conf = {.tsdb = pTsdb, .szPage = pTsdb->pVnode->config.tsdbPageSize};
7,072✔
3300
        const char           *filesName[4] = {0};
7,072✔
3301
        if (pFileObj[0] != NULL) {
7,072✔
3302
          conf.files[0].file = *pFileObj[0]->f;
7,072✔
3303
          conf.files[0].exist = true;
7,072✔
3304
          filesName[0] = pFileObj[0]->fname;
7,072✔
3305

3306
          conf.files[1].file = *pFileObj[1]->f;
7,072✔
3307
          conf.files[1].exist = true;
7,072✔
3308
          filesName[1] = pFileObj[1]->fname;
7,072✔
3309

3310
          conf.files[2].file = *pFileObj[2]->f;
7,072✔
3311
          conf.files[2].exist = true;
7,072✔
3312
          filesName[2] = pFileObj[2]->fname;
7,072✔
3313
        }
3314

3315
        if (pFileObj[3] != NULL) {
7,072✔
3316
          conf.files[3].exist = true;
7,072✔
3317
          conf.files[3].file = *pFileObj[3]->f;
7,072✔
3318
          filesName[3] = pFileObj[3]->fname;
7,072✔
3319
        }
3320

3321
        TAOS_CHECK_GOTO(tsdbDataFileReaderOpen(filesName, &conf, &state->pr->pFileReader), &lino, _err);
7,072✔
3322

3323
        state->pr->pCurFileSet = state->pFileSet;
7,072✔
3324

3325
        code = loadDataTomb(state->pr, state->pr->pFileReader);
7,072✔
3326
        if (code != TSDB_CODE_SUCCESS) {
7,072✔
3327
          tsdbError("vgId:%d, %s load tomb failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
3328
                    tstrerror(code));
3329
          TAOS_CHECK_GOTO(code, &lino, _err);
×
3330
        }
3331

3332
        TAOS_CHECK_GOTO(tsdbDataFileReadBrinBlk(state->pr->pFileReader, &state->pr->pBlkArray), &lino, _err);
7,072✔
3333
      }
3334

3335
      if (!state->pIndexList) {
7,072✔
3336
        state->pIndexList = taosArrayInit(1, sizeof(SBrinBlk));
7,072✔
3337
        if (!state->pIndexList) {
7,072✔
3338
          TAOS_CHECK_GOTO(terrno, &lino, _err);
×
3339
        }
3340
      } else {
3341
        taosArrayClear(state->pIndexList);
×
3342
      }
3343

3344
      const TBrinBlkArray *pBlkArray = state->pr->pBlkArray;
7,072✔
3345

3346
      for (int i = TARRAY2_SIZE(pBlkArray) - 1; i >= 0; --i) {
14,144✔
3347
        SBrinBlk *pBrinBlk = &pBlkArray->data[i];
7,072✔
3348
        if (state->suid >= pBrinBlk->minTbid.suid && state->suid <= pBrinBlk->maxTbid.suid) {
7,072✔
3349
          if (state->uid >= pBrinBlk->minTbid.uid && state->uid <= pBrinBlk->maxTbid.uid) {
7,072✔
3350
            if (!taosArrayPush(state->pIndexList, pBrinBlk)) {
1,768✔
3351
              TAOS_CHECK_GOTO(terrno, &lino, _err);
×
3352
            }
3353
          }
3354
        } else if (state->suid > pBrinBlk->maxTbid.suid ||
×
3355
                   (state->suid == pBrinBlk->maxTbid.suid && state->uid > pBrinBlk->maxTbid.uid)) {
×
3356
          break;
3357
        }
3358
      }
3359

3360
      int indexSize = TARRAY_SIZE(state->pIndexList);
7,072✔
3361
      if (indexSize <= 0) {
7,072✔
3362
        goto _check_stt_data;
6,188✔
3363
      }
3364

3365
      state->state = SFSNEXTROW_INDEXLIST;
884✔
3366
      state->iBrinIndex = 1;
884✔
3367
    }
3368

3369
  _check_stt_data:
15,831✔
3370
    if (state->pFileSet != state->pr->pCurFileSet) {
15,831✔
3371
      state->pr->pCurFileSet = state->pFileSet;
7,835✔
3372
    }
3373

3374
    TAOS_CHECK_GOTO(lastIterOpen(&state->lastIter, state->pFileSet, pTsdb, state->pTSchema, state->suid, state->uid,
15,831✔
3375
                                 state->pr, state->lastTs, aCols, nCols),
3376
                    &lino, _err);
3377

3378
    TAOS_CHECK_GOTO(lastIterNext(&state->lastIter, &state->pLastRow), &lino, _err);
15,831✔
3379

3380
    if (!state->pLastRow) {
15,831✔
3381
      state->lastEmpty = 1;
12,616✔
3382

3383
      if (SFSNEXTROW_INDEXLIST != state->state) {
12,616✔
3384
        clearLastFileSet(state);
11,732✔
3385
        goto _next_fileset;
11,732✔
3386
      }
3387
    } else {
3388
      state->lastEmpty = 0;
3,215✔
3389

3390
      if (SFSNEXTROW_INDEXLIST != state->state) {
3,215✔
3391
        state->state = SFSNEXTROW_NEXTSTTROW;
3,215✔
3392

3393
        *ppRow = state->pLastRow;
3,215✔
3394
        state->pLastRow = NULL;
3,215✔
3395

3396
        TAOS_RETURN(code);
3,215✔
3397
      }
3398
    }
3399

3400
    state->pLastIter = &state->lastIter;
884✔
3401
  }
3402

3403
  if (SFSNEXTROW_NEXTSTTROW == state->state) {
1,862✔
3404
    TAOS_CHECK_GOTO(lastIterNext(&state->lastIter, &state->pLastRow), &lino, _err);
978✔
3405

3406
    if (!state->pLastRow) {
978✔
3407
      if (state->pLastIter) {
559✔
3408
        code = lastIterClose(&state->pLastIter);
×
3409
        if (code != TSDB_CODE_SUCCESS) {
×
3410
          tsdbError("vgId:%d, %s close last iter failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
3411
                    tstrerror(code));
3412
          TAOS_RETURN(code);
×
3413
        }
3414
      }
3415

3416
      clearLastFileSet(state);
559✔
3417
      state->state = SFSNEXTROW_FILESET;
559✔
3418
      goto _next_fileset;
559✔
3419
    } else {
3420
      *ppRow = state->pLastRow;
419✔
3421
      state->pLastRow = NULL;
419✔
3422

3423
      TAOS_RETURN(code);
419✔
3424
    }
3425
  }
3426

3427
  if (SFSNEXTROW_INDEXLIST == state->state) {
884✔
3428
    SBrinBlk *pBrinBlk = NULL;
884✔
3429
  _next_brinindex:
884✔
3430
    if (--state->iBrinIndex < 0) {
884✔
3431
      if (state->pLastRow) {
×
3432
        state->state = SFSNEXTROW_NEXTSTTROW;
×
3433
        *ppRow = state->pLastRow;
×
3434
        state->pLastRow = NULL;
×
3435
        return code;
×
3436
      }
3437

3438
      clearLastFileSet(state);
×
3439
      goto _next_fileset;
×
3440
    } else {
3441
      pBrinBlk = taosArrayGet(state->pIndexList, state->iBrinIndex);
884✔
3442
    }
3443

3444
    if (!state->pBrinBlock) {
884✔
3445
      state->pBrinBlock = &state->brinBlock;
884✔
3446
    } else {
3447
      tBrinBlockClear(&state->brinBlock);
×
3448
    }
3449

3450
    TAOS_CHECK_GOTO(tsdbDataFileReadBrinBlock(state->pr->pFileReader, pBrinBlk, &state->brinBlock), &lino, _err);
884✔
3451

3452
    state->iBrinRecord = state->brinBlock.numOfRecords - 1;
884✔
3453
    state->state = SFSNEXTROW_BRINBLOCK;
884✔
3454
  }
3455

3456
  if (SFSNEXTROW_BRINBLOCK == state->state) {
884✔
3457
  _next_brinrecord:
884✔
3458
    if (state->iBrinRecord < 0) {  // empty brin block, goto _next_brinindex
884✔
3459
      tBrinBlockClear(&state->brinBlock);
×
3460
      goto _next_brinindex;
×
3461
    }
3462

3463
    TAOS_CHECK_GOTO(tBrinBlockGet(&state->brinBlock, state->iBrinRecord, &state->brinRecord), &lino, _err);
884✔
3464

3465
    SBrinRecord *pRecord = &state->brinRecord;
884✔
3466
    if (pRecord->uid != state->uid) {
884✔
3467
      // TODO: goto next brin block early
3468
      --state->iBrinRecord;
×
3469
      goto _next_brinrecord;
×
3470
    }
3471

3472
    state->state = SFSNEXTROW_BRINRECORD;
884✔
3473
  }
3474

3475
  if (SFSNEXTROW_BRINRECORD == state->state) {
884✔
3476
    SBrinRecord *pRecord = &state->brinRecord;
884✔
3477

3478
    if (!state->pBlockData) {
884✔
3479
      state->pBlockData = &state->blockData;
884✔
3480

3481
      TAOS_CHECK_GOTO(tBlockDataCreate(&state->blockData), &lino, _err);
884✔
3482
    } else {
3483
      tBlockDataReset(state->pBlockData);
×
3484
    }
3485

3486
    if (aCols[0] == PRIMARYKEY_TIMESTAMP_COL_ID) {
884✔
3487
      --nCols;
884✔
3488
      ++aCols;
884✔
3489
    }
3490

3491
    TAOS_CHECK_GOTO(tsdbDataFileReadBlockDataByColumn(state->pr->pFileReader, pRecord, state->pBlockData,
884✔
3492
                                                      state->pTSchema, aCols, nCols),
3493
                    &lino, _err);
3494

3495
    state->nRow = state->blockData.nRow;
884✔
3496
    state->iRow = state->nRow - 1;
884✔
3497

3498
    state->state = SFSNEXTROW_BLOCKROW;
884✔
3499
  }
3500

3501
  if (SFSNEXTROW_BLOCKROW == state->state) {
884✔
3502
    if (state->iRow < 0) {
884✔
3503
      --state->iBrinRecord;
×
3504
      goto _next_brinrecord;
×
3505
    }
3506

3507
    state->row = tsdbRowFromBlockData(state->pBlockData, state->iRow);
884✔
3508
    if (!state->pLastIter) {
884✔
3509
      *ppRow = &state->row;
×
3510
      --state->iRow;
×
3511
      return code;
×
3512
    }
3513

3514
    if (!state->pLastRow) {
884✔
3515
      // get next row from fslast and process with fs row, --state->Row if select fs row
3516
      TAOS_CHECK_GOTO(lastIterNext(&state->lastIter, &state->pLastRow), &lino, _err);
884✔
3517
    }
3518

3519
    if (!state->pLastRow) {
884✔
3520
      if (state->pLastIter) {
884✔
3521
        code = lastIterClose(&state->pLastIter);
884✔
3522
        if (code != TSDB_CODE_SUCCESS) {
884✔
3523
          tsdbError("vgId:%d, %s close last iter failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
3524
                    tstrerror(code));
3525
          TAOS_RETURN(code);
×
3526
        }
3527
      }
3528

3529
      *ppRow = &state->row;
884✔
3530
      --state->iRow;
884✔
3531
      return code;
884✔
3532
    }
3533

3534
    // process state->pLastRow & state->row
3535
    TSKEY rowTs = TSDBROW_TS(&state->row);
×
3536
    TSKEY lastRowTs = TSDBROW_TS(state->pLastRow);
×
3537
    if (lastRowTs > rowTs) {
×
3538
      *ppRow = state->pLastRow;
×
3539
      state->pLastRow = NULL;
×
3540

3541
      TAOS_RETURN(code);
×
3542
    } else if (lastRowTs < rowTs) {
×
3543
      *ppRow = &state->row;
×
3544
      --state->iRow;
×
3545

3546
      TAOS_RETURN(code);
×
3547
    } else {
3548
      // TODO: merge rows and *ppRow = mergedRow
3549
      SRowMerger *pMerger = &state->rowMerger;
×
3550
      code = tsdbRowMergerInit(pMerger, state->pTSchema);
×
3551
      if (code != TSDB_CODE_SUCCESS) {
×
3552
        tsdbError("vgId:%d, %s init row merger failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
3553
                  tstrerror(code));
3554
        TAOS_RETURN(code);
×
3555
      }
3556

3557
      TAOS_CHECK_GOTO(tsdbRowMergerAdd(pMerger, &state->row, state->pTSchema), &lino, _err);
×
3558
      TAOS_CHECK_GOTO(tsdbRowMergerAdd(pMerger, state->pLastRow, state->pTSchema), &lino, _err);
×
3559

3560
      if (state->pTSRow) {
×
3561
        taosMemoryFree(state->pTSRow);
×
3562
        state->pTSRow = NULL;
×
3563
      }
3564

3565
      TAOS_CHECK_GOTO(tsdbRowMergerGetRow(pMerger, &state->pTSRow), &lino, _err);
×
3566

3567
      state->row = tsdbRowFromTSRow(TSDBROW_VERSION(&state->row), state->pTSRow);
×
3568
      *ppRow = &state->row;
×
3569
      --state->iRow;
×
3570

3571
      tsdbRowMergerClear(pMerger);
×
3572

3573
      TAOS_RETURN(code);
×
3574
    }
3575
  }
3576

3577
_err:
×
3578
  clearLastFileSet(state);
×
3579

3580
  *ppRow = NULL;
×
3581

3582
  if (code) {
×
3583
    tsdbError("tsdb/cache: vgId:%d, %s failed at line %d since %s.", TD_VID(pTsdb->pVnode), __func__, lino,
×
3584
              tstrerror(code));
3585
  }
3586

3587
  TAOS_RETURN(code);
×
3588
}
3589

3590
// Backward row iterator for one table that merges up to three row sources
// (see input[]) and filters out deleted rows.
typedef struct CacheNextRowIter {
3591
  // filled by loadMemTombData in nextRowIterOpen; destroyed by nextRowIterClose
  SArray           *pMemDelData;
3592
  // TSDBKEY array built lazily by nextRowIterGet via tsdbBuildDeleteSkyline
  SArray           *pSkyline;
3593
  // cursor into pSkyline; initialised to its size minus one
  int64_t           iSkyline;
3594
  // {suid, uid} of the table being iterated
  SBlockIdx         idx;
3595
  // state of input[0] (rows from pReadSnap->pMem)
  SMemNextRowIter   memState;
3596
  // state of input[1] (rows from pReadSnap->pIMem)
  SMemNextRowIter   imemState;
3597
  // state of input[2] (rows from the file sets)
  SFSNextRowIter    fsState;
3598
  // row slots whose addresses seed the inputs; fsLastRow is not referenced in the code visible here
  TSDBROW           memRow, imemRow, fsLastRow, fsRow;
3599
  // row sources in fixed order: 0 = mem, 1 = imem, 2 = file sets
  TsdbNextRowState  input[3];
3600
  // cache reader that supplies the per-table load info (tomb data)
  SCacheRowsReader *pr;
3601
  STsdb            *pTsdb;
3602
} CacheNextRowIter;
3603

3604
int32_t clearNextRowFromFS(void *iter) {
106,849✔
3605
  int32_t code = 0;
106,849✔
3606

3607
  SFSNextRowIter *state = (SFSNextRowIter *)iter;
106,849✔
3608
  if (!state) {
106,849✔
3609
    TAOS_RETURN(code);
×
3610
  }
3611

3612
  if (state->pLastIter) {
106,849✔
3613
    code = lastIterClose(&state->pLastIter);
×
3614
    if (code != TSDB_CODE_SUCCESS) {
×
3615
      tsdbError("%s close last iter failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
3616
      TAOS_RETURN(code);
×
3617
    }
3618
  }
3619

3620
  if (state->pBlockData) {
106,849✔
3621
    tBlockDataDestroy(state->pBlockData);
884✔
3622
    state->pBlockData = NULL;
884✔
3623
  }
3624

3625
  if (state->pBrinBlock) {
106,849✔
3626
    tBrinBlockDestroy(state->pBrinBlock);
884✔
3627
    state->pBrinBlock = NULL;
884✔
3628
  }
3629

3630
  if (state->pIndexList) {
106,849✔
3631
    taosArrayDestroy(state->pIndexList);
7,072✔
3632
    state->pIndexList = NULL;
7,072✔
3633
  }
3634

3635
  if (state->pTSRow) {
106,849✔
3636
    taosMemoryFree(state->pTSRow);
×
3637
    state->pTSRow = NULL;
×
3638
  }
3639

3640
  if (state->pRowIter->pSkyline) {
106,849✔
3641
    taosArrayDestroy(state->pRowIter->pSkyline);
103,279✔
3642
    state->pRowIter->pSkyline = NULL;
103,279✔
3643
  }
3644

3645
  TAOS_RETURN(code);
106,178✔
3646
}
3647

3648
/*
 * Release the resources tied to the file set that was being read, so that the
 * next file set starts from a clean slate.
 *
 * Closes the merge-tree iterator, destroys the block data, closes the data
 * file reader (also forgetting the reader's current file set) and frees the
 * merged row. When a delete skyline exists it is destroyed and the tomb data
 * of every table in the reader's table map is dropped with it.
 * If closing the merge-tree iterator fails, the error is logged and the
 * function returns without releasing anything else.
 */
static void clearLastFileSet(SFSNextRowIter *state) {
  if (state->pLastIter != NULL) {
    int code = lastIterClose(&state->pLastIter);
    if (code != TSDB_CODE_SUCCESS) {
      tsdbError("%s close last iter failed at line %d since %s", __func__, __LINE__, tstrerror(code));
      return;
    }
  }

  if (state->pBlockData != NULL) {
    tBlockDataDestroy(state->pBlockData);
    state->pBlockData = NULL;
  }

  SCacheRowsReader *pr = state->pr;
  if (pr->pFileReader != NULL) {
    tsdbDataFileReaderClose(&pr->pFileReader);
    pr->pFileReader = NULL;

    pr->pCurFileSet = NULL;
  }

  if (state->pTSRow != NULL) {
    taosMemoryFree(state->pTSRow);
    state->pTSRow = NULL;
  }

  if (NULL == state->pRowIter->pSkyline) {
    return;
  }

  taosArrayDestroy(state->pRowIter->pSkyline);
  state->pRowIter->pSkyline = NULL;

  // the skyline was derived from the tomb data, so drop that as well
  void   *pEntry = NULL;
  int32_t pos = 0;
  while ((pEntry = tSimpleHashIterate(state->pr->pTableMap, pEntry, &pos)) != NULL) {
    STableLoadInfo *pInfo = *(STableLoadInfo **)pEntry;
    taosArrayDestroy(pInfo->pTombData);
    pInfo->pTombData = NULL;
  }
}
3687

3688
static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTsdb, STSchema *pTSchema, tb_uid_t suid,
106,849✔
3689
                               SArray *pLDataIterArray, STsdbReadSnap *pReadSnap, int64_t lastTs,
3690
                               SCacheRowsReader *pr) {
3691
  int32_t code = 0, lino = 0;
106,849✔
3692

3693
  STbData *pMem = NULL;
106,849✔
3694
  if (pReadSnap->pMem) {
106,849✔
3695
    pMem = tsdbGetTbDataFromMemTable(pReadSnap->pMem, suid, uid);
106,178✔
3696
  }
3697

3698
  STbData *pIMem = NULL;
106,849✔
3699
  if (pReadSnap->pIMem) {
106,849✔
3700
    pIMem = tsdbGetTbDataFromMemTable(pReadSnap->pIMem, suid, uid);
×
3701
  }
3702

3703
  pIter->pTsdb = pTsdb;
106,849✔
3704

3705
  pIter->pMemDelData = NULL;
106,849✔
3706

3707
  TAOS_CHECK_GOTO(loadMemTombData(&pIter->pMemDelData, pMem, pIMem, pr->info.verRange.maxVer), &lino, _err);
106,849✔
3708

3709
  pIter->idx = (SBlockIdx){.suid = suid, .uid = uid};
106,849✔
3710

3711
  pIter->fsState.pRowIter = pIter;
106,178✔
3712
  pIter->fsState.state = SFSNEXTROW_FS;
106,849✔
3713
  pIter->fsState.aDFileSet = pReadSnap->pfSetArray;
106,849✔
3714
  pIter->fsState.pBlockIdxExp = &pIter->idx;
106,849✔
3715
  pIter->fsState.pTSchema = pTSchema;
106,178✔
3716
  pIter->fsState.suid = suid;
106,849✔
3717
  pIter->fsState.uid = uid;
106,849✔
3718
  pIter->fsState.lastTs = lastTs;
106,849✔
3719
  pIter->fsState.pr = pr;
106,178✔
3720

3721
  pIter->input[0] = (TsdbNextRowState){&pIter->memRow, true, false, false, &pIter->memState, getNextRowFromMem, NULL};
106,178✔
3722
  pIter->input[1] = (TsdbNextRowState){&pIter->imemRow, true, false, false, &pIter->imemState, getNextRowFromMem, NULL};
106,849✔
3723
  pIter->input[2] =
106,849✔
3724
      (TsdbNextRowState){&pIter->fsRow, false, true, false, &pIter->fsState, getNextRowFromFS, clearNextRowFromFS};
106,849✔
3725

3726
  if (pMem) {
106,178✔
3727
    pIter->memState.pMem = pMem;
100,171✔
3728
    pIter->memState.state = SMEMNEXTROW_ENTER;
100,171✔
3729
    pIter->memState.lastTs = lastTs;
100,171✔
3730
    pIter->input[0].stop = false;
100,171✔
3731
    pIter->input[0].next = true;
100,171✔
3732
  }
3733

3734
  if (pIMem) {
106,178✔
3735
    pIter->imemState.pMem = pIMem;
×
3736
    pIter->imemState.state = SMEMNEXTROW_ENTER;
×
3737
    pIter->imemState.lastTs = lastTs;
×
3738
    pIter->input[1].stop = false;
×
3739
    pIter->input[1].next = true;
×
3740
  }
3741

3742
  pIter->pr = pr;
106,178✔
3743

3744
_err:
106,849✔
3745
  TAOS_RETURN(code);
106,849✔
3746
}
3747

3748
/*
 * Tear down a CacheNextRowIter.
 *
 * Runs the clear callback of every row source that has one (its return value
 * is deliberately ignored), then destroys the delete skyline and the in-memory
 * tombstone array. The freed pointers are reset to NULL, matching
 * clearNextRowFromFS/clearLastFileSet, so closing the same iterator twice
 * does not free the arrays twice.
 */
static void nextRowIterClose(CacheNextRowIter *pIter) {
  const size_t nInput = sizeof(pIter->input) / sizeof(pIter->input[0]);

  for (size_t i = 0; i < nInput; ++i) {
    if (pIter->input[i].nextRowClearFn) {
      (void)pIter->input[i].nextRowClearFn(pIter->input[i].iter);
    }
  }

  // usually already released by the file-set source's clear callback
  if (pIter->pSkyline) {
    taosArrayDestroy(pIter->pSkyline);
    pIter->pSkyline = NULL;
  }

  if (pIter->pMemDelData) {
    taosArrayDestroy(pIter->pMemDelData);
    pIter->pMemDelData = NULL;
  }
}
106,178✔
3763

3764
// iterate next row non deleted backward ts, version (from high to low)
3765
static int32_t nextRowIterGet(CacheNextRowIter *pIter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast,
112,572✔
3766
                              int16_t *aCols, int nCols) {
3767
  int32_t code = 0, lino = 0;
112,572✔
3768

3769
  for (;;) {
559✔
3770
    for (int i = 0; i < 3; ++i) {
451,182✔
3771
      if (pIter->input[i].next && !pIter->input[i].stop) {
338,722✔
3772
        TAOS_CHECK_GOTO(pIter->input[i].nextRowFn(pIter->input[i].iter, &pIter->input[i].pRow,
213,302✔
3773
                                                  &pIter->input[i].ignoreEarlierTs, isLast, aCols, nCols),
3774
                        &lino, _err);
3775

3776
        if (pIter->input[i].pRow == NULL) {
212,631✔
3777
          pIter->input[i].stop = true;
106,403✔
3778
          pIter->input[i].next = false;
106,403✔
3779
        }
3780
      }
3781
    }
3782

3783
    if (pIter->input[0].stop && pIter->input[1].stop && pIter->input[2].stop) {
112,460✔
3784
      *ppRow = NULL;
6,664✔
3785
      *pIgnoreEarlierTs =
13,328✔
3786
          (pIter->input[0].ignoreEarlierTs || pIter->input[1].ignoreEarlierTs || pIter->input[2].ignoreEarlierTs);
6,664✔
3787

3788
      TAOS_RETURN(code);
6,664✔
3789
    }
3790

3791
    // select maxpoint(s) from mem, imem, fs and last
3792
    TSDBROW *max[4] = {0};
106,467✔
3793
    int      iMax[4] = {-1, -1, -1, -1};
106,467✔
3794
    int      nMax = 0;
106,467✔
3795
    SRowKey  maxKey = {.ts = TSKEY_MIN};
106,467✔
3796

3797
    for (int i = 0; i < 3; ++i) {
423,855✔
3798
      if (!pIter->input[i].stop && pIter->input[i].pRow != NULL) {
318,059✔
3799
        STsdbRowKey tsdbRowKey = {0};
106,787✔
3800
        tsdbRowGetKey(pIter->input[i].pRow, &tsdbRowKey);
106,787✔
3801

3802
        // merging & deduplicating on client side
3803
        int c = tRowKeyCompare(&maxKey, &tsdbRowKey.key);
107,458✔
3804
        if (c <= 0) {
107,458✔
3805
          if (c < 0) {
107,458✔
3806
            nMax = 0;
107,458✔
3807
            maxKey = tsdbRowKey.key;
107,458✔
3808
          }
3809

3810
          iMax[nMax] = i;
107,458✔
3811
          max[nMax++] = pIter->input[i].pRow;
107,458✔
3812
        }
3813
        pIter->input[i].next = false;
107,458✔
3814
      }
3815
    }
3816

3817
    // delete detection
3818
    TSDBROW *merge[4] = {0};
105,796✔
3819
    int      iMerge[4] = {-1, -1, -1, -1};
105,796✔
3820
    int      nMerge = 0;
107,138✔
3821
    for (int i = 0; i < nMax; ++i) {
212,934✔
3822
      TSDBKEY maxKey1 = TSDBROW_KEY(max[i]);
106,467✔
3823

3824
      if (!pIter->pSkyline) {
106,467✔
3825
        pIter->pSkyline = taosArrayInit(32, sizeof(TSDBKEY));
103,838✔
3826
        TSDB_CHECK_NULL(pIter->pSkyline, code, lino, _err, terrno);
103,838✔
3827

3828
        uint64_t        uid = pIter->idx.uid;
103,838✔
3829
        STableLoadInfo *pInfo = getTableLoadInfo(pIter->pr, uid);
103,838✔
3830
        TSDB_CHECK_NULL(pInfo, code, lino, _err, TSDB_CODE_OUT_OF_MEMORY);
103,167✔
3831

3832
        if (pInfo->pTombData == NULL) {
103,167✔
3833
          pInfo->pTombData = taosArrayInit(4, sizeof(SDelData));
102,070✔
3834
          TSDB_CHECK_NULL(pInfo->pTombData, code, lino, _err, terrno);
101,399✔
3835
        }
3836

3837
        if (!taosArrayAddAll(pInfo->pTombData, pIter->pMemDelData)) {
103,167✔
3838
          TAOS_CHECK_GOTO(terrno, &lino, _err);
×
3839
        }
3840

3841
        size_t delSize = TARRAY_SIZE(pInfo->pTombData);
103,838✔
3842
        if (delSize > 0) {
103,838✔
3843
          code = tsdbBuildDeleteSkyline(pInfo->pTombData, 0, (int32_t)(delSize - 1), pIter->pSkyline);
2,002✔
3844
          TAOS_CHECK_GOTO(code, &lino, _err);
2,002✔
3845
        }
3846
        pIter->iSkyline = taosArrayGetSize(pIter->pSkyline) - 1;
103,838✔
3847
      }
3848

3849
      bool deleted = tsdbKeyDeleted(&maxKey1, pIter->pSkyline, &pIter->iSkyline);
106,467✔
3850
      if (!deleted) {
106,467✔
3851
        iMerge[nMerge] = iMax[i];
105,908✔
3852
        merge[nMerge++] = max[i];
105,908✔
3853
      }
3854

3855
      pIter->input[iMax[i]].next = deleted;
105,796✔
3856
    }
3857

3858
    if (nMerge > 0) {
106,467✔
3859
      pIter->input[iMerge[0]].next = true;
105,908✔
3860

3861
      *ppRow = merge[0];
105,908✔
3862

3863
      TAOS_RETURN(code);
105,237✔
3864
    }
3865
  }
3866

3867
_err:
×
3868
  if (code) {
×
3869
    tsdbError("tsdb/cache: %s failed at line %d since %s.", __func__, lino, tstrerror(code));
×
3870
  }
3871

3872
  TAOS_RETURN(code);
×
3873
}
3874

3875
/*
 * Build an array of nCols SLastCol entries, one per requested schema slot,
 * each initialised to a NULL column value (rowKey.ts = 0) carrying the column
 * id and type found at pTSchema->columns[slotIds[i]].
 *
 * On success *ppColArray receives the new array (owned by the caller) and
 * TSDB_CODE_SUCCESS is returned. On allocation failure terrno is returned,
 * *ppColArray is left untouched and the partially built array is released.
 */
static int32_t initLastColArrayPartial(STSchema *pTSchema, SArray **ppColArray, int16_t *slotIds, int nCols) {
  SArray *pColArray = taosArrayInit(nCols, sizeof(SLastCol));
  if (NULL == pColArray) {
    TAOS_RETURN(terrno);
  }

  for (int32_t i = 0; i < nCols; ++i) {
    int16_t  slotId = slotIds[i];
    SLastCol col = {.rowKey.ts = 0,
                    .colVal = COL_VAL_NULL(pTSchema->columns[slotId].colId, pTSchema->columns[slotId].type)};
    if (!taosArrayPush(pColArray, &col)) {
      // capture the error first, then free the array the caller never gets to see
      int32_t code = terrno;
      taosArrayDestroy(pColArray);
      TAOS_RETURN(code);
    }
  }
  *ppColArray = pColArray;

  TAOS_RETURN(TSDB_CODE_SUCCESS);
}
3893

3894
/*
 * Compute the "last" (most recent non-NULL) value of each requested column of
 * table uid by walking its rows backward.
 *
 * aCols holds the requested column ids and slotIds the matching schema slots
 * (nCols entries each). The first row fills every column; as long as some
 * column is still without a value (tracked via noneCol/setNoneCol) older rows
 * are fetched and only the missing columns are filled. Column ids that have
 * received a value are removed from aColArray, the column list passed to the
 * row iterator.
 *
 * On success *ppLastArray receives an array of SLastCol (owned by the caller):
 * empty when the table has no row, or NULL when it has no row and the iterator
 * reported ignoreEarlierTs. On failure through the _err path *ppLastArray is
 * NULL and everything allocated here is released.
 *
 * NOTE(review): the code returned by nextRowIterGet is not checked inside the
 * loop; on failure pRow stays NULL, the loop ends and that code is returned
 * together with whatever pColArray holds at that point.
 */
static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols,
                            int nCols, int16_t *slotIds) {
  int32_t   code = 0, lino = 0;
  STSchema *pTSchema = pr->pSchema;  // metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1, 1);
  int16_t   nLastCol = nCols;
  int16_t   noneCol = 0;
  bool      setNoneCol = false;
  bool      hasRow = false;
  bool      ignoreEarlierTs = false;
  SArray   *pColArray = NULL;
  SColVal  *pColVal = &(SColVal){0};

  TAOS_CHECK_RETURN(initLastColArrayPartial(pTSchema, &pColArray, slotIds, nCols));

  SArray *aColArray = taosArrayInit(nCols, sizeof(int16_t));
  if (NULL == aColArray) {
    // read terrno before any cleanup call gets a chance to touch it
    code = terrno;
    taosArrayDestroy(pColArray);

    TAOS_RETURN(code);
  }

  for (int i = 0; i < nCols; ++i) {
    if (!taosArrayPush(aColArray, &aCols[i])) {
      code = terrno;
      taosArrayDestroy(pColArray);
      taosArrayDestroy(aColArray);  // was leaked on this path

      TAOS_RETURN(code);
    }
  }

  STsdbRowKey lastRowKey = {.key.ts = TSKEY_MAX};

  // inverse iterator
  CacheNextRowIter iter = {0};
  code =
      nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->info.suid, pr->pLDataIterArray, pr->pReadSnap, pr->lastTs, pr);
  TAOS_CHECK_GOTO(code, &lino, _err);

  do {
    TSDBROW *pRow = NULL;
    code = nextRowIterGet(&iter, &pRow, &ignoreEarlierTs, true, TARRAY_DATA(aColArray), TARRAY_SIZE(aColArray));

    if (!pRow) {
      break;
    }

    hasRow = true;

    // rows carrying a schema version are decoded with that version's schema
    int32_t sversion = TSDBROW_SVERSION(pRow);
    if (sversion != -1) {
      TAOS_CHECK_GOTO(updateTSchema(sversion, pr, uid), &lino, _err);

      pTSchema = pr->pCurrSchema;
    }

    STsdbRowKey rowKey = {0};
    tsdbRowGetKey(pRow, &rowKey);

    if (lastRowKey.key.ts == TSKEY_MAX) {  // first time
      lastRowKey = rowKey;

      for (int16_t iCol = noneCol; iCol < nCols; ++iCol) {
        if (iCol >= nLastCol) {
          break;
        }
        SLastCol *pCol = taosArrayGet(pColArray, iCol);
        if (slotIds[iCol] > pTSchema->numOfCols - 1) {
          // slot does not exist in this row's schema: still missing
          if (!setNoneCol) {
            noneCol = iCol;
            setNoneCol = true;
          }
          continue;
        }
        if (pCol->colVal.cid != pTSchema->columns[slotIds[iCol]].colId) {
          continue;
        }
        if (slotIds[iCol] == 0) {
          // slot 0 is the timestamp column: its value comes from the row key
          STColumn *pTColumn = &pTSchema->columns[0];
          SValue    val = {.type = pTColumn->type};
          VALUE_SET_TRIVIAL_DATUM(&val, rowKey.key.ts);
          *pColVal = COL_VAL_VALUE(pTColumn->colId, val);

          SLastCol colTmp = {.rowKey = rowKey.key, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID};
          TAOS_CHECK_GOTO(tsdbCacheReallocSLastCol(&colTmp, NULL), &lino, _err);

          // store at the column's own position (previously hard-coded to 0, which
          // is only correct when the timestamp is the first requested column)
          taosArraySet(pColArray, iCol, &colTmp);
          continue;
        }
        tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal);

        *pCol = (SLastCol){.rowKey = rowKey.key, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID};
        TAOS_CHECK_GOTO(tsdbCacheReallocSLastCol(pCol, NULL), &lino, _err);

        if (!COL_VAL_IS_VALUE(pColVal)) {
          if (!setNoneCol) {
            noneCol = iCol;
            setNoneCol = true;
          }
        } else {
          // column resolved: stop requesting it from the row iterator
          int32_t aColIndex = taosArraySearchIdx(aColArray, &pColVal->cid, compareInt16Val, TD_EQ);
          if (aColIndex >= 0) {
            taosArrayRemove(aColArray, aColIndex);
          }
        }
      }
      if (!setNoneCol) {
        // done, goto return pColArray
        break;
      } else {
        continue;
      }
    }

    // merge into pColArray
    setNoneCol = false;
    for (int16_t iCol = noneCol; iCol < nCols; ++iCol) {
      if (iCol >= nLastCol) {
        break;
      }
      // high version's column value
      if (slotIds[iCol] > pTSchema->numOfCols - 1) {
        continue;
      }

      SLastCol *lastColVal = (SLastCol *)taosArrayGet(pColArray, iCol);
      if (lastColVal->colVal.cid != pTSchema->columns[slotIds[iCol]].colId) {
        continue;
      }
      SColVal *tColVal = &lastColVal->colVal;
      if (COL_VAL_IS_VALUE(tColVal)) continue;  // already resolved by a newer row

      tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal);
      if (COL_VAL_IS_VALUE(pColVal)) {
        SLastCol lastCol = {.rowKey = rowKey.key, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID};
        TAOS_CHECK_GOTO(tsdbCacheReallocSLastCol(&lastCol, NULL), &lino, _err);

        // release the placeholder before overwriting it
        tsdbCacheFreeSLastColItem(lastColVal);
        taosArraySet(pColArray, iCol, &lastCol);
        int32_t aColIndex = taosArraySearchIdx(aColArray, &lastCol.colVal.cid, compareInt16Val, TD_EQ);
        if (aColIndex >= 0) {
          taosArrayRemove(aColArray, aColIndex);
        }
      } else if (!setNoneCol) {
        // first column still missing: the next row resumes from here
        noneCol = iCol;
        setNoneCol = true;
      }
    }
  } while (setNoneCol);

  if (!hasRow) {
    if (ignoreEarlierTs) {
      taosArrayDestroy(pColArray);
      pColArray = NULL;
    } else {
      taosArrayClear(pColArray);
    }
  }
  *ppLastArray = pColArray;

  nextRowIterClose(&iter);
  taosArrayDestroy(aColArray);

  TAOS_RETURN(code);

_err:
  nextRowIterClose(&iter);
  *ppLastArray = NULL;
  taosArrayDestroyEx(pColArray, tsdbCacheFreeSLastColItem);
  taosArrayDestroy(aColArray);

  if (code) {
    tsdbError("tsdb/cache: vgId:%d, %s failed at line %d since %s.", TD_VID(pTsdb->pVnode), __func__, lino,
              tstrerror(code));
  }

  TAOS_RETURN(code);
}
4072

4073
/**
 * Collect the values of the most recent row of table `uid` for a subset of columns.
 *
 * A row iterator is opened over the table and at most one row is fetched from it. For
 * every requested slot whose column id still matches the row's schema, the row's value
 * is copied into the result array (one SLastCol per requested column).
 *
 * @param uid          table uid to read.
 * @param pTsdb        tsdb instance (used by the iterator and for error logging).
 * @param ppLastArray  [out] result array. On success with a row: the filled array.
 *                     With no row: NULL when the iterator reported `ignoreEarlierTs`,
 *                     otherwise the (emptied) array. On the `_err` path: NULL.
 * @param pr           cache rows reader supplying schema, snapshot and iterator state.
 * @param aCols        column ids requested, `nCols` entries.
 * @param nCols        number of requested columns.
 * @param slotIds      schema slot index for each requested column, `nCols` entries.
 * @return 0 on success, otherwise an error code.
 */
static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols,
                               int nCols, int16_t *slotIds) {
  int32_t   code = 0, lino = 0;
  STSchema *pTSchema = pr->pSchema;
  int16_t   nLastCol = nCols;
  int16_t   noneCol = 0;
  bool      hasRow = false;
  bool      ignoreEarlierTs = false;
  SArray   *pColArray = NULL;
  SColVal  *pColVal = &(SColVal){0};

  TAOS_CHECK_RETURN(initLastColArrayPartial(pTSchema, &pColArray, slotIds, nCols));

  // Working copy of the requested column ids; entries are removed as columns get filled.
  SArray *aColArray = taosArrayInit(nCols, sizeof(int16_t));
  if (NULL == aColArray) {
    taosArrayDestroy(pColArray);

    TAOS_RETURN(terrno);
  }

  for (int i = 0; i < nCols; ++i) {
    if (!taosArrayPush(aColArray, &aCols[i])) {
      // Capture the error before cleanup, and release BOTH arrays: the iterator is not
      // open yet, so the shared _err path cannot be used here.
      code = terrno;
      taosArrayDestroy(aColArray);
      taosArrayDestroy(pColArray);

      TAOS_RETURN(code);
    }
  }

  // inverse iterator
  CacheNextRowIter iter = {0};
  code =
      nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->info.suid, pr->pLDataIterArray, pr->pReadSnap, pr->lastTs, pr);
  TAOS_CHECK_GOTO(code, &lino, _err);

  // Single pass: only the first row returned by the iterator is consumed.
  do {
    TSDBROW *pRow = NULL;
    // NOTE(review): when no row is returned, `code` from this call is only surfaced via the
    // final TAOS_RETURN while *ppLastArray may still be set — confirm callers handle that.
    code = nextRowIterGet(&iter, &pRow, &ignoreEarlierTs, false, TARRAY_DATA(aColArray), TARRAY_SIZE(aColArray));

    if (!pRow) {
      break;
    }

    hasRow = true;

    // Switch to the schema version the row was written with, when it carries one.
    int32_t sversion = TSDBROW_SVERSION(pRow);
    if (sversion != -1) {
      TAOS_CHECK_GOTO(updateTSchema(sversion, pr, uid), &lino, _err);

      pTSchema = pr->pCurrSchema;
    }

    STsdbRowKey rowKey = {0};
    tsdbRowGetKey(pRow, &rowKey);

    for (int16_t iCol = noneCol; iCol < nCols; ++iCol) {
      if (iCol >= nLastCol) {
        break;
      }
      SLastCol *pCol = taosArrayGet(pColArray, iCol);
      // Slot is beyond the row's schema: leave the pre-initialized entry untouched.
      if (slotIds[iCol] > pTSchema->numOfCols - 1) {
        continue;
      }
      // Slot now refers to a different column id in this schema version.
      if (pCol->colVal.cid != pTSchema->columns[slotIds[iCol]].colId) {
        continue;
      }
      if (slotIds[iCol] == 0) {
        // Slot 0 is synthesized from the row key's timestamp instead of being read from the row.
        STColumn *pTColumn = &pTSchema->columns[0];
        SValue    val = {.type = pTColumn->type};
        VALUE_SET_TRIVIAL_DATUM(&val, rowKey.key.ts);
        *pColVal = COL_VAL_VALUE(pTColumn->colId, val);

        SLastCol colTmp = {.rowKey = rowKey.key, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID};
        TAOS_CHECK_GOTO(tsdbCacheReallocSLastCol(&colTmp, NULL), &lino, _err);

        // NOTE(review): stores at index 0 rather than iCol; presumably slot 0 is always
        // requested first — confirm against callers.
        taosArraySet(pColArray, 0, &colTmp);
        continue;
      }
      tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal);

      *pCol = (SLastCol){.rowKey = rowKey.key, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID};
      TAOS_CHECK_GOTO(tsdbCacheReallocSLastCol(pCol, NULL), &lino, _err);

      // This column is resolved; drop it from the list of outstanding column ids.
      int32_t aColIndex = taosArraySearchIdx(aColArray, &pColVal->cid, compareInt16Val, TD_EQ);
      if (aColIndex >= 0) {
        taosArrayRemove(aColArray, aColIndex);
      }
    }

    break;
  } while (1);

  if (!hasRow) {
    if (ignoreEarlierTs) {
      taosArrayDestroy(pColArray);
      pColArray = NULL;
    } else {
      taosArrayClear(pColArray);
    }
  }
  *ppLastArray = pColArray;

  nextRowIterClose(&iter);
  taosArrayDestroy(aColArray);

  TAOS_RETURN(code);

_err:
  nextRowIterClose(&iter);

  *ppLastArray = NULL;
  taosArrayDestroyEx(pColArray, tsdbCacheFreeSLastColItem);
  taosArrayDestroy(aColArray);

  if (code) {
    tsdbError("tsdb/cache: vgId:%d, %s failed at line %d since %s.", TD_VID(pTsdb->pVnode), __func__, lino,
              tstrerror(code));
  }

  TAOS_RETURN(code);
}
4195

4196
/* Drop one reference on cache handle `h` without forcing erasure of the entry. */
void tsdbCacheRelease(SLRUCache *pCache, LRUHandle *h) {
  const bool eraseIfLastRef = false;

  tsdbLRUCacheRelease(pCache, h, eraseIfLastRef);
}
11,810,304✔
4197

4198
/* Set the capacity of the vnode's tsdb LRU cache to `capacity`. */
void tsdbCacheSetCapacity(SVnode *pVnode, size_t capacity) {
  SLRUCache *pLru = pVnode->pTsdb->lruCache;

  taosLRUCacheSetCapacity(pLru, capacity);
}
11,031✔
4201

4202
#ifdef BUILD_NO_CALL
/* Report the capacity of the vnode's tsdb LRU cache (compiled only with BUILD_NO_CALL). */
size_t tsdbCacheGetCapacity(SVnode *pVnode) {
  SLRUCache *pLru = pVnode->pTsdb->lruCache;

  return taosLRUCacheGetCapacity(pLru);
}
#endif
4205

4206
/* Current usage of the vnode's tsdb LRU cache; 0 when the vnode has no tsdb attached. */
size_t tsdbCacheGetUsage(SVnode *pVnode) {
  if (pVnode->pTsdb == NULL) {
    return 0;
  }

  return taosLRUCacheGetUsage(pVnode->pTsdb->lruCache);
}
4214

4215
/* Number of elements in the vnode's tsdb LRU cache; 0 when the vnode has no tsdb attached. */
int32_t tsdbCacheGetElems(SVnode *pVnode) {
  if (pVnode->pTsdb == NULL) {
    return 0;
  }

  return taosLRUCacheGetElems(pVnode->pTsdb->lruCache);
}
4223

4224
#ifdef USE_SHARED_STORAGE
4225
// block cache
4226
/**
 * Serialize (fid, commitID, blkno) into a binary cache key.
 *
 * The key is written as three consecutive int64_t values so that it contains no
 * struct padding: padding bytes of a struct may hold unspecified values after its
 * members are assigned, which would make byte-compared keys for identical inputs
 * differ. Widening `fid` to int64_t keeps the encoding unique per input triple.
 *
 * @param fid       file set id.
 * @param commitID  commit id of the file.
 * @param blkno     block (or page) number.
 * @param key       [out] destination buffer; must hold at least 3 * sizeof(int64_t)
 *                  bytes (callers in this file pass 128-byte buffers).
 * @param len       [out] number of key bytes written.
 */
static void getBCacheKey(int32_t fid, int64_t commitID, int64_t blkno, char *key, int *len) {
  const int64_t parts[3] = {(int64_t)fid, commitID, blkno};

  *len = (int)sizeof(parts);
  memcpy(key, parts, sizeof(parts));
}
11,814,912✔
4240

4241
/**
 * Read the block addressed by `pFD->blkno` from shared storage into a newly allocated buffer.
 *
 * @param pFD      file descriptor; `szPage`, `blkno` and `objName` are read.
 * @param ppBlock  [out] set to the allocated buffer on success; left untouched on failure.
 *                 The buffer is allocated with taosMemoryMalloc and owned by the caller.
 * @return 0 on success, TSDB_CODE_OUT_OF_MEMORY if allocation fails, or the error
 *         returned by tssReadFileFromDefault.
 */
static int32_t tsdbCacheLoadBlockSs(STsdbFD *pFD, uint8_t **ppBlock) {
  int32_t code = 0;

  // Widen before multiplying so the product is computed in 64 bits; if both operands are
  // 32-bit the original expression could overflow before being stored in an int64_t.
  int64_t block_size = (int64_t)tsSsBlockSize * pFD->szPage;
  int64_t block_offset = (pFD->blkno - 1) * block_size;  // blkno is treated as 1-based here

  char *buf = taosMemoryMalloc(block_size);
  if (buf == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

  // TODO: pFD->objName is not initialized, but this function is never called.
  // NOTE(review): block_size is passed by address and may be updated by the callee
  // (presumably with the number of bytes read); the updated value is not inspected.
  code = tssReadFileFromDefault(pFD->objName, block_offset, buf, &block_size);
  if (code != TSDB_CODE_SUCCESS) {
    taosMemoryFree(buf);
    goto _exit;
  }
  *ppBlock = (uint8_t *)buf;

_exit:
  return code;
}
4264

4265
/* LRU deleter callback for block/page cache entries: frees the cached buffer. */
static void deleteBCache(const void *key, size_t keyLen, void *value, void *ud) {
  (void)ud;

  uint8_t *pCached = (uint8_t *)value;
  taosMemoryFree(pCached);
}
9,823,488✔
4271

4272
/**
 * Look up the shared-storage block for (`pFD->fid`, `pFD->cid`, `pFD->blkno`) in `pCache`,
 * loading and inserting it on a miss.
 *
 * The first lookup is done without a lock. On a miss, `pTsdb->bMutex` is taken and the
 * lookup is repeated before the block is loaded and inserted.
 *
 * @param pCache  block cache to query and populate.
 * @param pFD     file descriptor identifying the block.
 * @param handle  [out] cache handle for the block; NULL when loading failed.
 * @return 0 on success, the load error, or TSDB_CODE_OUT_OF_MEMORY when the loader
 *         reported success but produced no block.
 */
int32_t tsdbCacheGetBlockSs(SLRUCache *pCache, STsdbFD *pFD, LRUHandle **handle) {
  int32_t code = 0;
  char    key[128] = {0};
  int     keyLen = 0;

  getBCacheKey(pFD->fid, pFD->cid, pFD->blkno, key, &keyLen);
  LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen);
  if (!h) {
    STsdb *pTsdb = pFD->pTsdb;
    (void)taosThreadMutexLock(&pTsdb->bMutex);

    // Re-check under the lock: the block may have been inserted since the first lookup.
    h = taosLRUCacheLookup(pCache, key, keyLen);
    if (!h) {
      uint8_t *pBlock = NULL;
      code = tsdbCacheLoadBlockSs(pFD, &pBlock);
      if (code != TSDB_CODE_SUCCESS || pBlock == NULL) {
        (void)taosThreadMutexUnlock(&pTsdb->bMutex);

        *handle = NULL;
        if (code == TSDB_CODE_SUCCESS) {
          // Loader succeeded but returned no buffer: report it as an allocation failure.
          code = TSDB_CODE_OUT_OF_MEMORY;
        }

        TAOS_RETURN(code);
      }

      // Compute the charge in 64 bits before converting, so a product of two 32-bit
      // operands cannot overflow.
      size_t charge = (size_t)((int64_t)tsSsBlockSize * pFD->szPage);
      // NOTE(review): the insert status is deliberately ignored, as before; whether pBlock
      // is released when the insert fails depends on taosLRUCacheInsert — confirm.
      (void)taosLRUCacheInsert(pCache, key, keyLen, pBlock, charge, deleteBCache, NULL, &h, TAOS_LRU_PRIORITY_LOW,
                               NULL);
    }

    (void)taosThreadMutexUnlock(&pTsdb->bMutex);
  }

  *handle = h;

  TAOS_RETURN(code);
}
4315

4316
/**
 * Look up page `pgno` of file (`pFD->fid`, `pFD->cid`) in `pCache`.
 *
 * @param handle  [out] result of the lookup; not written when shared storage is disabled.
 * @return TSDB_CODE_OPS_NOT_SUPPORT when tsSsEnabled is false, otherwise 0.
 */
int32_t tsdbCacheGetPageSs(SLRUCache *pCache, STsdbFD *pFD, int64_t pgno, LRUHandle **handle) {
  if (tsSsEnabled) {
    char pageKey[128] = {0};
    int  pageKeyLen = 0;

    getBCacheKey(pFD->fid, pFD->cid, pgno, pageKey, &pageKeyLen);
    *handle = taosLRUCacheLookup(pCache, pageKey, pageKeyLen);

    return 0;
  }

  return TSDB_CODE_OPS_NOT_SUPPORT;
}
4330

4331
/**
 * Cache a private copy of page `pgno` of file (`pFD->fid`, `pFD->cid`) unless it is already cached.
 *
 * Does nothing when tsSsEnabled is false. Allocation failure is silently ignored: the
 * cache is best-effort and no error is reported to the caller.
 *
 * NOTE(review): the lookup and the final release use pFD->pTsdb->pgCache while the insert
 * uses `pCache`; presumably callers always pass the same cache — confirm.
 */
void tsdbCacheSetPageSs(SLRUCache *pCache, STsdbFD *pFD, int64_t pgno, uint8_t *pPage) {
  if (!tsSsEnabled) {
    return;
  }

  char       pageKey[128] = {0};
  int        pageKeyLen = 0;
  LRUHandle *hPage = NULL;

  getBCacheKey(pFD->fid, pFD->cid, pgno, pageKey, &pageKeyLen);

  (void)taosThreadMutexLock(&pFD->pTsdb->pgMutex);

  hPage = taosLRUCacheLookup(pFD->pTsdb->pgCache, pageKey, pageKeyLen);
  if (hPage == NULL) {
    size_t   pageBytes = pFD->szPage;
    uint8_t *pCopy = taosMemoryMalloc(pageBytes);
    if (pCopy == NULL) {
      (void)taosThreadMutexUnlock(&pFD->pTsdb->pgMutex);

      return;  // ignore error with ss cache and leave error untouched
    }
    memcpy(pCopy, pPage, pageBytes);

    // The insert status is intentionally discarded: a failed cache update is not an error here.
    (void)taosLRUCacheInsert(pCache, pageKey, pageKeyLen, pCopy, pageBytes, deleteBCache, NULL, &hPage,
                             TAOS_LRU_PRIORITY_LOW, NULL);
  }

  (void)taosThreadMutexUnlock(&pFD->pTsdb->pgMutex);

  // Drop the reference obtained by the lookup or the insert above.
  tsdbCacheRelease(pFD->pTsdb->pgCache, hPage);
}
4365
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc