• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4836

31 Oct 2025 03:37AM UTC coverage: 58.728% (+0.2%) from 58.506%
#4836

push

travis-ci

SallyHuo-TAOS
Merge remote-tracking branch 'origin/cover/3.0' into cover/3.0

# Conflicts:
#	test/ci/run.sh

149727 of 324176 branches covered (46.19%)

Branch coverage included in aggregate %.

198923 of 269498 relevant lines covered (73.81%)

238054213.11 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

58.0
/source/dnode/vnode/src/tsdb/tsdbCache.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#include "functionMgt.h"
16
#include "tsdb.h"
17
#include "tsdbDataFileRW.h"
18
#include "tsdbIter.h"
19
#include "tsdbReadUtil.h"
20
#include "tss.h"
21
#include "vnd.h"
22

23
#define ROCKS_BATCH_SIZE (4096)
24

25
void tsdbLRUCacheRelease(SLRUCache *cache, LRUHandle *handle, bool eraseIfLastRef) {
86,614,482✔
26
  if (!taosLRUCacheRelease(cache, handle, eraseIfLastRef)) {
86,614,482✔
27
    tsdbTrace(" release lru cache failed");
11,473,655✔
28
  }
29
}
86,616,229✔
30

31
#ifdef USE_SHARED_STORAGE
32

33
static int32_t tsdbOpenBCache(STsdb *pTsdb) {
×
34
  int32_t code = 0, lino = 0;
×
35
  int32_t szPage = pTsdb->pVnode->config.tsdbPageSize;
×
36
  int64_t szBlock = tsSsBlockSize <= 1024 ? 1024 : tsSsBlockSize;
×
37

38
  SLRUCache *pCache = taosLRUCacheInit((int64_t)tsSsBlockCacheSize * szBlock * szPage, 0, .5);
×
39
  if (pCache == NULL) {
×
40
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err);
×
41
  }
42

43
  taosLRUCacheSetStrictCapacity(pCache, false);
×
44

45
  (void)taosThreadMutexInit(&pTsdb->bMutex, NULL);
×
46

47
  pTsdb->bCache = pCache;
×
48

49
_err:
×
50
  if (code) {
×
51
    tsdbError("tsdb/bcache: vgId:%d, %s failed at line %d since %s.", TD_VID(pTsdb->pVnode), __func__, lino,
×
52
              tstrerror(code));
53
  }
54

55
  TAOS_RETURN(code);
×
56
}
57

58
static void tsdbCloseBCache(STsdb *pTsdb) {
×
59
  SLRUCache *pCache = pTsdb->bCache;
×
60
  if (pCache) {
×
61
    int32_t elems = taosLRUCacheGetElems(pCache);
×
62
    tsdbTrace("vgId:%d, elems: %d", TD_VID(pTsdb->pVnode), elems);
×
63
    taosLRUCacheEraseUnrefEntries(pCache);
×
64
    elems = taosLRUCacheGetElems(pCache);
×
65
    tsdbTrace("vgId:%d, elems: %d", TD_VID(pTsdb->pVnode), elems);
×
66

67
    taosLRUCacheCleanup(pCache);
×
68

69
    (void)taosThreadMutexDestroy(&pTsdb->bMutex);
×
70
  }
71
}
×
72

73
static int32_t tsdbOpenPgCache(STsdb *pTsdb) {
×
74
  int32_t code = 0, lino = 0;
×
75
  int32_t szPage = pTsdb->pVnode->config.tsdbPageSize;
×
76

77
  SLRUCache *pCache = taosLRUCacheInit((int64_t)tsSsPageCacheSize * szPage, 0, .5);
×
78
  if (pCache == NULL) {
×
79
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err);
×
80
  }
81

82
  taosLRUCacheSetStrictCapacity(pCache, false);
×
83

84
  (void)taosThreadMutexInit(&pTsdb->pgMutex, NULL);
×
85

86
  pTsdb->pgCache = pCache;
×
87

88
_err:
×
89
  if (code) {
×
90
    tsdbError("tsdb/pgcache: vgId:%d, open failed at line %d since %s.", TD_VID(pTsdb->pVnode), lino, tstrerror(code));
×
91
  }
92

93
  TAOS_RETURN(code);
×
94
}
95

96
static void tsdbClosePgCache(STsdb *pTsdb) {
×
97
  SLRUCache *pCache = pTsdb->pgCache;
×
98
  if (pCache) {
×
99
    int32_t elems = taosLRUCacheGetElems(pCache);
×
100
    tsdbTrace("vgId:%d, elems: %d", TD_VID(pTsdb->pVnode), elems);
×
101
    taosLRUCacheEraseUnrefEntries(pCache);
×
102
    elems = taosLRUCacheGetElems(pCache);
×
103
    tsdbTrace("vgId:%d, elems: %d", TD_VID(pTsdb->pVnode), elems);
×
104

105
    taosLRUCacheCleanup(pCache);
×
106

107
    (void)taosThreadMutexDestroy(&pTsdb->bMutex);
×
108
  }
109
}
×
110

111
#endif  // USE_SHARED_STORAGE
112

113
#define ROCKS_KEY_LEN (sizeof(tb_uid_t) + sizeof(int16_t) + sizeof(int8_t))
114

115
enum {
116
  LFLAG_LAST_ROW = 0,
117
  LFLAG_LAST = 1,
118
};
119

120
typedef struct {
121
  tb_uid_t uid;
122
  int16_t  cid;
123
  int8_t   lflag;
124
} SLastKey;
125

126
#define IS_LAST_ROW_KEY(k) (((k).lflag & LFLAG_LAST) == LFLAG_LAST_ROW)
127
#define IS_LAST_KEY(k)     (((k).lflag & LFLAG_LAST) == LFLAG_LAST)
128

129
static void tsdbGetRocksPath(STsdb *pTsdb, char *path) {
12,345,846✔
130
  SVnode *pVnode = pTsdb->pVnode;
12,345,846✔
131
  vnodeGetPrimaryPath(pVnode, false, path, TSDB_FILENAME_LEN);
12,346,628✔
132

133
  int32_t offset = strlen(path);
12,357,626!
134
  snprintf(path + offset, TSDB_FILENAME_LEN - offset - 1, "%s%s%scache.rdb", TD_DIRSEP, pTsdb->name, TD_DIRSEP);
12,357,626✔
135
}
12,355,874✔
136

137
static const char *myCmpName(void *state) {
65,266,602✔
138
  (void)state;
139
  return "myCmp";
65,266,602✔
140
}
141

142
static void myCmpDestroy(void *state) { (void)state; }
12,359,720✔
143

144
static int myCmp(void *state, const char *a, size_t alen, const char *b, size_t blen) {
2,035,329,788✔
145
  (void)state;
146
  (void)alen;
147
  (void)blen;
148
  SLastKey *lhs = (SLastKey *)a;
2,035,329,788✔
149
  SLastKey *rhs = (SLastKey *)b;
2,035,329,788✔
150

151
  if (lhs->uid < rhs->uid) {
2,035,329,788✔
152
    return -1;
1,209,362,281✔
153
  } else if (lhs->uid > rhs->uid) {
826,111,763✔
154
    return 1;
303,410,845✔
155
  }
156

157
  if (lhs->cid < rhs->cid) {
522,707,449✔
158
    return -1;
201,965,524✔
159
  } else if (lhs->cid > rhs->cid) {
320,765,439✔
160
    return 1;
129,600,003✔
161
  }
162

163
  if ((lhs->lflag & LFLAG_LAST) < (rhs->lflag & LFLAG_LAST)) {
191,171,842✔
164
    return -1;
74,477,025✔
165
  } else if ((lhs->lflag & LFLAG_LAST) > (rhs->lflag & LFLAG_LAST)) {
116,693,516✔
166
    return 1;
110,421,042✔
167
  }
168

169
  return 0;
6,277,526✔
170
}
171

172
static int32_t tsdbOpenRocksCache(STsdb *pTsdb) {
12,338,358✔
173
  int32_t code = 0, lino = 0;
12,338,358✔
174
#ifdef USE_ROCKSDB
175
  rocksdb_comparator_t *cmp = rocksdb_comparator_create(NULL, myCmpDestroy, myCmp, myCmpName);
12,360,576✔
176
  if (NULL == cmp) {
12,359,644!
177
    TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
×
178
  }
179

180
  rocksdb_block_based_table_options_t *tableoptions = rocksdb_block_based_options_create();
12,359,644✔
181
  pTsdb->rCache.tableoptions = tableoptions;
12,358,057✔
182

183
  rocksdb_options_t *options = rocksdb_options_create();
12,359,756✔
184
  if (NULL == options) {
12,355,678!
185
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err);
×
186
  }
187

188
  rocksdb_options_set_create_if_missing(options, 1);
12,355,678✔
189
  rocksdb_options_set_comparator(options, cmp);
12,352,707✔
190
  rocksdb_options_set_block_based_table_factory(options, tableoptions);
12,353,825✔
191
  rocksdb_options_set_info_log_level(options, 2);  // WARN_LEVEL
12,357,400✔
192
  // rocksdb_options_set_inplace_update_support(options, 1);
193
  // rocksdb_options_set_allow_concurrent_memtable_write(options, 0);
194

195
  rocksdb_writeoptions_t *writeoptions = rocksdb_writeoptions_create();
12,358,155✔
196
  if (NULL == writeoptions) {
12,354,727!
197
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err2);
×
198
  }
199
  rocksdb_writeoptions_disable_WAL(writeoptions, 1);
12,354,727✔
200

201
  rocksdb_readoptions_t *readoptions = rocksdb_readoptions_create();
12,356,335✔
202
  if (NULL == readoptions) {
12,352,596!
203
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err3);
×
204
  }
205

206
  char *err = NULL;
12,352,596✔
207
  char  cachePath[TSDB_FILENAME_LEN] = {0};
12,352,916✔
208
  tsdbGetRocksPath(pTsdb, cachePath);
12,354,219✔
209

210
  rocksdb_t *db = rocksdb_open(options, cachePath, &err);
12,359,388✔
211
  if (NULL == db) {
12,360,576!
212
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err);
×
213
    rocksdb_free(err);
×
214

215
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err4);
×
216
  }
217

218
  rocksdb_flushoptions_t *flushoptions = rocksdb_flushoptions_create();
12,360,576✔
219
  if (NULL == flushoptions) {
12,360,576!
220
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err5);
×
221
  }
222

223
  rocksdb_writebatch_t *writebatch = rocksdb_writebatch_create();
12,360,576✔
224

225
  TAOS_CHECK_GOTO(taosThreadMutexInit(&pTsdb->rCache.writeBatchMutex, NULL), &lino, _err6);
12,358,994!
226

227
  pTsdb->rCache.writebatch = writebatch;
12,358,994✔
228
  pTsdb->rCache.my_comparator = cmp;
12,358,994✔
229
  pTsdb->rCache.options = options;
12,360,576✔
230
  pTsdb->rCache.writeoptions = writeoptions;
12,358,994✔
231
  pTsdb->rCache.readoptions = readoptions;
12,358,994✔
232
  pTsdb->rCache.flushoptions = flushoptions;
12,358,994✔
233
  pTsdb->rCache.db = db;
12,358,994✔
234
  pTsdb->rCache.sver = -1;
12,358,994✔
235
  pTsdb->rCache.suid = -1;
12,358,994✔
236
  pTsdb->rCache.uid = -1;
12,358,994✔
237
  pTsdb->rCache.pTSchema = NULL;
12,358,994✔
238
  pTsdb->rCache.ctxArray = taosArrayInit(16, sizeof(SLastUpdateCtx));
12,358,994✔
239
  if (!pTsdb->rCache.ctxArray) {
12,360,576!
240
    TAOS_CHECK_GOTO(terrno, &lino, _err7);
×
241
  }
242

243
  TAOS_RETURN(code);
12,360,576✔
244

245
_err7:
×
246
  (void)taosThreadMutexDestroy(&pTsdb->rCache.writeBatchMutex);
×
247
_err6:
×
248
  rocksdb_writebatch_destroy(writebatch);
×
249
_err5:
×
250
  rocksdb_close(pTsdb->rCache.db);
×
251
_err4:
×
252
  rocksdb_readoptions_destroy(readoptions);
×
253
_err3:
×
254
  rocksdb_writeoptions_destroy(writeoptions);
×
255
_err2:
×
256
  rocksdb_options_destroy(options);
×
257
  rocksdb_block_based_options_destroy(tableoptions);
×
258
_err:
×
259
  rocksdb_comparator_destroy(cmp);
×
260
#endif
261
  TAOS_RETURN(code);
×
262
}
263

264
static void tsdbCloseRocksCache(STsdb *pTsdb) {
12,360,576✔
265
#ifdef USE_ROCKSDB
266
  rocksdb_close(pTsdb->rCache.db);
12,360,576✔
267
  (void)taosThreadMutexDestroy(&pTsdb->rCache.writeBatchMutex);
12,360,576✔
268
  rocksdb_flushoptions_destroy(pTsdb->rCache.flushoptions);
12,360,576✔
269
  rocksdb_writebatch_destroy(pTsdb->rCache.writebatch);
12,360,576✔
270
  rocksdb_readoptions_destroy(pTsdb->rCache.readoptions);
12,359,291✔
271
  rocksdb_writeoptions_destroy(pTsdb->rCache.writeoptions);
12,359,291✔
272
  rocksdb_options_destroy(pTsdb->rCache.options);
12,358,904✔
273
  rocksdb_block_based_options_destroy(pTsdb->rCache.tableoptions);
12,359,331✔
274
  rocksdb_comparator_destroy(pTsdb->rCache.my_comparator);
12,359,720✔
275
  taosMemoryFree(pTsdb->rCache.pTSchema);
12,359,254!
276
  taosArrayDestroy(pTsdb->rCache.ctxArray);
12,358,241✔
277
#endif
278
}
12,359,254✔
279

280
static void rocksMayWrite(STsdb *pTsdb, bool force) {
100,695,744✔
281
#ifdef USE_ROCKSDB
282
  rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
100,695,744✔
283

284
  int count = rocksdb_writebatch_count(wb);
100,695,744✔
285
  if ((force && count > 0) || count >= ROCKS_BATCH_SIZE) {
100,694,996✔
286
    char *err = NULL;
784,911✔
287

288
    rocksdb_write(pTsdb->rCache.db, pTsdb->rCache.writeoptions, wb, &err);
784,911✔
289
    if (NULL != err) {
785,659!
290
      tsdbError("vgId:%d, %s failed at line %d, count: %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, count,
×
291
                err);
292
      rocksdb_free(err);
×
293
    }
294

295
    rocksdb_writebatch_clear(wb);
785,659✔
296
  }
297
#endif
298
}
100,695,744✔
299

300
typedef struct {
301
  TSKEY  ts;
302
  int8_t dirty;
303
  struct {
304
    int16_t cid;
305
    int8_t  type;
306
    int8_t  flag;
307
    union {
308
      int64_t val;
309
      struct {
310
        uint32_t nData;
311
        uint8_t *pData;
312
      };
313
    } value;
314
  } colVal;
315
} SLastColV0;
316

317
static int32_t tsdbCacheDeserializeV0(char const *value, SLastCol *pLastCol) {
673,050✔
318
  SLastColV0 *pLastColV0 = (SLastColV0 *)value;
673,050✔
319

320
  pLastCol->rowKey.ts = pLastColV0->ts;
673,050✔
321
  pLastCol->rowKey.numOfPKs = 0;
673,050✔
322
  pLastCol->dirty = pLastColV0->dirty;
673,050✔
323
  pLastCol->colVal.cid = pLastColV0->colVal.cid;
673,050✔
324
  pLastCol->colVal.flag = pLastColV0->colVal.flag;
673,050✔
325
  pLastCol->colVal.value.type = pLastColV0->colVal.type;
673,050✔
326

327
  pLastCol->cacheStatus = TSDB_LAST_CACHE_VALID;
673,050✔
328

329
  if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) {
673,050!
330
    pLastCol->colVal.value.nData = pLastColV0->colVal.value.nData;
36,786✔
331
    pLastCol->colVal.value.pData = NULL;
36,786✔
332
    if (pLastCol->colVal.value.nData > 0) {
36,786✔
333
      pLastCol->colVal.value.pData = (uint8_t *)(&pLastColV0[1]);
13,660✔
334
    }
335
    return sizeof(SLastColV0) + pLastColV0->colVal.value.nData;
36,786✔
336
  } else if (pLastCol->colVal.value.type == TSDB_DATA_TYPE_DECIMAL) {
636,264✔
337
    pLastCol->colVal.value.nData = pLastColV0->colVal.value.nData;
61,240✔
338
    pLastCol->colVal.value.pData = (uint8_t *)(&pLastColV0[1]);
61,240✔
339
    return sizeof(SLastColV0) + pLastColV0->colVal.value.nData;
61,240✔
340
  } else {
341
    pLastCol->colVal.value.val = pLastColV0->colVal.value.val;
575,024✔
342
    return sizeof(SLastColV0);
575,024✔
343
  }
344
}
345

346
static int32_t tsdbCacheDeserialize(char const *value, size_t size, SLastCol **ppLastCol) {
673,050✔
347
  if (!value) {
673,050!
348
    return TSDB_CODE_INVALID_PARA;
×
349
  }
350

351
  SLastCol *pLastCol = taosMemoryCalloc(1, sizeof(SLastCol));
673,050!
352
  if (NULL == pLastCol) {
673,050!
353
    return terrno;
×
354
  }
355

356
  int32_t offset = tsdbCacheDeserializeV0(value, pLastCol);
673,050✔
357
  if (offset == size) {
673,050!
358
    // version 0
359
    *ppLastCol = pLastCol;
×
360

361
    TAOS_RETURN(TSDB_CODE_SUCCESS);
×
362
  } else if (offset > size) {
673,050!
363
    taosMemoryFreeClear(pLastCol);
×
364

365
    TAOS_RETURN(TSDB_CODE_INVALID_DATA_FMT);
×
366
  }
367

368
  // version
369
  int8_t version = *(int8_t *)(value + offset);
673,050✔
370
  offset += sizeof(int8_t);
673,050✔
371

372
  // numOfPKs
373
  pLastCol->rowKey.numOfPKs = *(uint8_t *)(value + offset);
673,050✔
374
  offset += sizeof(uint8_t);
673,050✔
375

376
  // pks
377
  for (int32_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) {
673,050!
378
    pLastCol->rowKey.pks[i] = *(SValue *)(value + offset);
×
379
    offset += sizeof(SValue);
×
380

381
    if (IS_VAR_DATA_TYPE(pLastCol->rowKey.pks[i].type)) {
×
382
      pLastCol->rowKey.pks[i].pData = NULL;
×
383
      if (pLastCol->rowKey.pks[i].nData > 0) {
×
384
        pLastCol->rowKey.pks[i].pData = (uint8_t *)value + offset;
×
385
        offset += pLastCol->rowKey.pks[i].nData;
×
386
      }
387
    }
388
  }
389

390
  if (version >= LAST_COL_VERSION_2) {
673,050!
391
    pLastCol->cacheStatus = *(uint8_t *)(value + offset);
673,050✔
392
  }
393

394
  if (offset > size) {
673,050!
395
    taosMemoryFreeClear(pLastCol);
×
396

397
    TAOS_RETURN(TSDB_CODE_INVALID_DATA_FMT);
×
398
  }
399

400
  *ppLastCol = pLastCol;
673,050✔
401

402
  TAOS_RETURN(TSDB_CODE_SUCCESS);
673,050✔
403
}
404

405
/*
406
typedef struct {
407
  SLastColV0 lastColV0;
408
  char       colData[];
409
  int8_t     version;
410
  uint8_t    numOfPKs;
411
  SValue     pks[0];
412
  char       pk0Data[];
413
  SValue     pks[1];
414
  char       pk1Data[];
415
  ...
416
} SLastColDisk;
417
*/
418
static int32_t tsdbCacheSerializeV0(char const *value, SLastCol *pLastCol) {
93,784,836✔
419
  SLastColV0 *pLastColV0 = (SLastColV0 *)value;
93,784,836✔
420

421
  pLastColV0->ts = pLastCol->rowKey.ts;
93,784,836✔
422
  pLastColV0->dirty = pLastCol->dirty;
93,785,584✔
423
  pLastColV0->colVal.cid = pLastCol->colVal.cid;
93,784,110✔
424
  pLastColV0->colVal.flag = pLastCol->colVal.flag;
93,783,362✔
425
  pLastColV0->colVal.type = pLastCol->colVal.value.type;
93,783,362✔
426
  if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) {
93,783,905!
427
    pLastColV0->colVal.value.nData = pLastCol->colVal.value.nData;
3,965,416✔
428
    if (pLastCol->colVal.value.nData > 0) {
3,967,140✔
429
      memcpy(&pLastColV0[1], pLastCol->colVal.value.pData, pLastCol->colVal.value.nData);
1,830,685!
430
    }
431
    return sizeof(SLastColV0) + pLastCol->colVal.value.nData;
3,967,140✔
432
  } else if (pLastCol->colVal.value.type == TSDB_DATA_TYPE_DECIMAL) {
89,818,239✔
433
    pLastColV0->colVal.value.nData = pLastCol->colVal.value.nData;
1,086,154✔
434
    if (pLastCol->colVal.value.nData > 0) {
1,086,154✔
435
      memcpy(&pLastColV0[1], pLastCol->colVal.value.pData, pLastCol->colVal.value.nData);
625,994!
436
    }
437
    return sizeof(SLastColV0) + pLastCol->colVal.value.nData;
1,086,154✔
438
  } else {
439
    pLastColV0->colVal.value.val = pLastCol->colVal.value.val;
88,731,337✔
440
    return sizeof(SLastColV0);
88,730,611✔
441
  }
442

443
  return 0;
444
}
445

446
static int32_t tsdbCacheSerialize(SLastCol *pLastCol, char **value, size_t *size) {
93,785,379✔
447
  *size = sizeof(SLastColV0);
93,785,379✔
448
  if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) {
93,785,379!
449
    *size += pLastCol->colVal.value.nData;
3,964,896✔
450
  }
451
  if (pLastCol->colVal.value.type == TSDB_DATA_TYPE_DECIMAL) {
93,785,379✔
452
    *size += DECIMAL128_BYTES;
1,086,154✔
453
  }
454
  *size += sizeof(uint8_t) + sizeof(uint8_t) + sizeof(uint8_t);  // version + numOfPKs + cacheStatus
93,785,379✔
455

456
  for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) {
105,615,129✔
457
    *size += sizeof(SValue);
11,830,296✔
458
    if (IS_VAR_DATA_TYPE(pLastCol->rowKey.pks[i].type)) {
11,829,753!
459
      *size += pLastCol->rowKey.pks[i].nData;
3,973,546✔
460
    }
461
  }
462

463
  *value = taosMemoryMalloc(*size);
93,784,833!
464
  if (NULL == *value) {
93,785,584!
465
    TAOS_RETURN(terrno);
×
466
  }
467

468
  int32_t offset = tsdbCacheSerializeV0(*value, pLastCol);
93,785,584✔
469

470
  // version
471
  ((uint8_t *)(*value + offset))[0] = LAST_COL_VERSION;
93,782,431✔
472
  offset++;
93,782,431✔
473

474
  // numOfPKs
475
  ((uint8_t *)(*value + offset))[0] = pLastCol->rowKey.numOfPKs;
93,782,431✔
476
  offset++;
93,781,885✔
477

478
  // pks
479
  for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) {
105,612,181✔
480
    ((SValue *)(*value + offset))[0] = pLastCol->rowKey.pks[i];
11,829,750✔
481
    offset += sizeof(SValue);
11,829,750✔
482
    if (IS_VAR_DATA_TYPE(pLastCol->rowKey.pks[i].type)) {
11,829,750!
483
      if (pLastCol->rowKey.pks[i].nData > 0) {
3,973,000!
484
        memcpy(*value + offset, pLastCol->rowKey.pks[i].pData, pLastCol->rowKey.pks[i].nData);
3,973,000!
485
      }
486
      offset += pLastCol->rowKey.pks[i].nData;
3,973,000✔
487
    }
488
  }
489

490
  ((uint8_t *)(*value + offset))[0] = pLastCol->cacheStatus;
93,782,431✔
491

492
  TAOS_RETURN(TSDB_CODE_SUCCESS);
93,783,179✔
493
}
494

495
static int32_t tsdbCachePutToRocksdb(STsdb *pTsdb, SLastKey *pLastKey, SLastCol *pLastCol);
496

497
int tsdbCacheFlushDirty(const void *key, size_t klen, void *value, void *ud) {
105,076,315✔
498
  SLastCol *pLastCol = (SLastCol *)value;
105,076,315✔
499

500
  if (pLastCol->dirty) {
105,076,315✔
501
    STsdb *pTsdb = (STsdb *)ud;
79,807,803✔
502

503
    int32_t code = tsdbCachePutToRocksdb(pTsdb, (SLastKey *)key, pLastCol);
79,807,803✔
504
    if (code) {
79,811,387!
505
      tsdbError("tsdb/cache: vgId:%d, flush dirty lru failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code));
×
506
      return code;
×
507
    }
508

509
    pLastCol->dirty = 0;
79,811,387✔
510

511
    rocksMayWrite(pTsdb, false);
79,811,387✔
512
  }
513

514
  return 0;
105,081,487✔
515
}
516

517
static bool tsdbKeyDeleted(TSDBKEY *key, SArray *pSkyline, int64_t *iSkyline) {
832,676,540✔
518
  bool deleted = false;
832,676,540✔
519
  while (*iSkyline > 0) {
832,676,540✔
520
    TSDBKEY *pItemBack = (TSDBKEY *)taosArrayGet(pSkyline, *iSkyline);
17,482✔
521
    TSDBKEY *pItemFront = (TSDBKEY *)taosArrayGet(pSkyline, *iSkyline - 1);
88,799✔
522

523
    if (key->ts > pItemBack->ts) {
88,799✔
524
      return false;
20,224✔
525
    } else if (key->ts >= pItemFront->ts && key->ts <= pItemBack->ts) {
68,575!
526
      if (key->version <= pItemFront->version || (key->ts == pItemBack->ts && key->version <= pItemBack->version)) {
26,227!
527
        // if (key->version <= pItemFront->version || key->version <= pItemBack->version) {
528
        return true;
26,227✔
529
      } else {
530
        if (*iSkyline > 1) {
×
531
          --*iSkyline;
×
532
        } else {
533
          return false;
×
534
        }
535
      }
536
    } else {
537
      if (*iSkyline > 1) {
42,348!
538
        --*iSkyline;
×
539
      } else {
540
        return false;
42,348✔
541
      }
542
    }
543
  }
544

545
  return deleted;
834,061,573✔
546
}
547

548
// Get next non-deleted row from imem
549
static TSDBROW *tsdbImemGetNextRow(STbDataIter *pTbIter, SArray *pSkyline, int64_t *piSkyline) {
235,157,309✔
550
  int32_t code = 0;
235,157,309✔
551

552
  if (tsdbTbDataIterNext(pTbIter)) {
235,157,309✔
553
    TSDBROW *pMemRow = tsdbTbDataIterGet(pTbIter);
234,755,729✔
554
    TSDBKEY  rowKey = TSDBROW_KEY(pMemRow);
234,755,729✔
555
    bool     deleted = tsdbKeyDeleted(&rowKey, pSkyline, piSkyline);
234,754,029✔
556
    if (!deleted) {
234,754,231!
557
      return pMemRow;
234,754,231✔
558
    }
559
  }
560

561
  return NULL;
398,522✔
562
}
563

564
// Get first non-deleted row from imem
565
static TSDBROW *tsdbImemGetFirstRow(SMemTable *imem, STbData *pIMem, STbDataIter *pTbIter, SArray *pSkyline,
12,075,993✔
566
                                    int64_t *piSkyline) {
567
  int32_t code = 0;
12,075,993✔
568

569
  tsdbTbDataIterOpen(pIMem, NULL, 1, pTbIter);
12,075,993✔
570
  TSDBROW *pMemRow = tsdbTbDataIterGet(pTbIter);
12,075,993✔
571
  if (pMemRow) {
12,075,993!
572
    // if non deleted, return the found row.
573
    TSDBKEY rowKey = TSDBROW_KEY(pMemRow);
12,075,993✔
574
    bool    deleted = tsdbKeyDeleted(&rowKey, pSkyline, piSkyline);
12,075,993✔
575
    if (!deleted) {
12,075,993✔
576
      return pMemRow;
12,067,833✔
577
    }
578
  } else {
579
    return NULL;
×
580
  }
581

582
  // continue to find the non-deleted first row from imem, using get next row
583
  return tsdbImemGetNextRow(pTbIter, pSkyline, piSkyline);
8,160✔
584
}
585

586
void tsdbCacheInvalidateSchema(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int32_t sver) {
217,558✔
587
  SRocksCache *pRCache = &pTsdb->rCache;
217,558✔
588
  if (!pRCache->pTSchema || sver <= pTsdb->rCache.sver) return;
217,558!
589

590
  if (suid > 0 && suid == pRCache->suid) {
14,740!
591
    pRCache->sver = -1;
×
592
    pRCache->suid = -1;
×
593
  }
594
  if (suid == 0 && uid == pRCache->uid) {
14,740!
595
    pRCache->sver = -1;
1,474✔
596
    pRCache->uid = -1;
1,474✔
597
  }
598
}
599

600
static int32_t tsdbUpdateSkm(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int32_t sver) {
235,148,741✔
601
  SRocksCache *pRCache = &pTsdb->rCache;
235,148,741✔
602
  if (pRCache->pTSchema && sver == pRCache->sver) {
235,148,741✔
603
    if (suid > 0 && suid == pRCache->suid) {
235,010,021✔
604
      return 0;
232,742,371✔
605
    }
606
    if (suid == 0 && uid == pRCache->uid) {
2,267,650✔
607
      return 0;
1,786,116✔
608
    }
609
  }
610

611
  pRCache->suid = suid;
621,002✔
612
  pRCache->uid = uid;
623,789✔
613
  pRCache->sver = sver;
623,789✔
614
  tDestroyTSchema(pRCache->pTSchema);
623,789!
615
  return metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pRCache->pTSchema);
623,789✔
616
}
617

618
static int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SArray *updCtxArray);
619

620
int32_t tsdbLoadFromImem(SMemTable *imem, int64_t suid, int64_t uid) {
12,075,993✔
621
  int32_t      code = 0;
12,075,993✔
622
  int32_t      lino = 0;
12,075,993✔
623
  STsdb       *pTsdb = imem->pTsdb;
12,075,993✔
624
  SArray      *pMemDelData = NULL;
12,075,993✔
625
  SArray      *pSkyline = NULL;
12,075,993✔
626
  int64_t      iSkyline = 0;
12,075,993✔
627
  STbDataIter  tbIter = {0};
12,075,993✔
628
  TSDBROW     *pMemRow = NULL;
12,075,993✔
629
  STSchema    *pTSchema = NULL;
12,075,993✔
630
  SSHashObj   *iColHash = NULL;
12,075,993✔
631
  int32_t      sver;
632
  int32_t      nCol;
633
  SArray      *ctxArray = pTsdb->rCache.ctxArray;
12,075,993✔
634
  STsdbRowKey  tsdbRowKey = {0};
12,075,993✔
635
  STSDBRowIter iter = {0};
12,075,993✔
636

637
  STbData *pIMem = tsdbGetTbDataFromMemTable(imem, suid, uid);
12,075,993✔
638

639
  // load imem tomb data and build skyline
640
  TAOS_CHECK_GOTO(loadMemTombData(&pMemDelData, NULL, pIMem, INT64_MAX), &lino, _exit);
12,075,993!
641

642
  // tsdbBuildDeleteSkyline
643
  size_t delSize = TARRAY_SIZE(pMemDelData);
12,075,993✔
644
  if (delSize > 0) {
12,075,993✔
645
    pSkyline = taosArrayInit(32, sizeof(TSDBKEY));
11,364✔
646
    if (!pSkyline) {
11,364!
647
      TAOS_CHECK_EXIT(terrno);
×
648
    }
649

650
    TAOS_CHECK_EXIT(tsdbBuildDeleteSkyline(pMemDelData, 0, (int32_t)(delSize - 1), pSkyline));
11,364!
651
    iSkyline = taosArrayGetSize(pSkyline) - 1;
11,364✔
652
  }
653

654
  pMemRow = tsdbImemGetFirstRow(imem, pIMem, &tbIter, pSkyline, &iSkyline);
12,075,993✔
655
  if (!pMemRow) {
12,075,993!
656
    goto _exit;
×
657
  }
658

659
  // iter first row to last_row/last col values to ctxArray, and mark last null col ids
660
  sver = TSDBROW_SVERSION(pMemRow);
12,075,993✔
661
  TAOS_CHECK_GOTO(tsdbUpdateSkm(pTsdb, suid, uid, sver), &lino, _exit);
12,075,993!
662
  pTSchema = pTsdb->rCache.pTSchema;
12,075,993✔
663
  nCol = pTSchema->numOfCols;
12,075,993✔
664

665
  tsdbRowGetKey(pMemRow, &tsdbRowKey);
12,075,993✔
666

667
  TAOS_CHECK_EXIT(tsdbRowIterOpen(&iter, pMemRow, pTSchema));
12,075,993!
668

669
  int32_t iCol = 0;
12,075,993✔
670
  for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal && iCol < nCol; pColVal = tsdbRowIterNext(&iter), iCol++) {
57,837,890!
671
    SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST_ROW, .tsdbRowKey = tsdbRowKey, .colVal = *pColVal};
45,761,897✔
672
    if (!taosArrayPush(ctxArray, &updateCtx)) {
45,761,897!
673
      TAOS_CHECK_EXIT(terrno);
×
674
    }
675

676
    if (COL_VAL_IS_VALUE(pColVal)) {
45,761,897✔
677
      updateCtx.lflag = LFLAG_LAST;
37,521,078✔
678
      if (!taosArrayPush(ctxArray, &updateCtx)) {
37,521,078!
679
        TAOS_CHECK_EXIT(terrno);
×
680
      }
681
    } else {
682
      if (!iColHash) {
8,240,819✔
683
        iColHash = tSimpleHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT));
438,103✔
684
        if (iColHash == NULL) {
438,103!
685
          TAOS_CHECK_EXIT(terrno);
×
686
        }
687
      }
688

689
      if (tSimpleHashPut(iColHash, &pColVal->cid, sizeof(pColVal->cid), &pColVal->cid, sizeof(pColVal->cid))) {
8,240,819!
690
        TAOS_CHECK_EXIT(terrno);
×
691
      }
692
    }
693
  }
694
  tsdbRowClose(&iter);
12,075,993✔
695

696
  // continue to get next row to fill null last col values
697
  pMemRow = tsdbImemGetNextRow(&tbIter, pSkyline, &iSkyline);
12,075,993✔
698
  while (pMemRow) {
235,143,845✔
699
    if (tSimpleHashGetSize(iColHash) == 0) {
234,745,323✔
700
      break;
11,677,471✔
701
    }
702

703
    sver = TSDBROW_SVERSION(pMemRow);
223,065,608!
704
    TAOS_CHECK_EXIT(tsdbUpdateSkm(pTsdb, suid, uid, sver));
223,069,349!
705
    pTSchema = pTsdb->rCache.pTSchema;
223,076,283✔
706

707
    STsdbRowKey tsdbRowKey = {0};
223,076,283✔
708
    tsdbRowGetKey(pMemRow, &tsdbRowKey);
223,076,283✔
709

710
    TAOS_CHECK_EXIT(tsdbRowIterOpen(&iter, pMemRow, pTSchema));
223,071,319!
711

712
    int32_t iCol = 0;
223,070,700✔
713
    for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal && iCol < nCol; pColVal = tsdbRowIterNext(&iter), iCol++) {
2,147,483,647!
714
      if (tSimpleHashGet(iColHash, &pColVal->cid, sizeof(pColVal->cid)) && COL_VAL_IS_VALUE(pColVal)) {
2,147,483,647✔
715
        SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST, .tsdbRowKey = tsdbRowKey, .colVal = *pColVal};
4,882,342✔
716
        if (!taosArrayPush(ctxArray, &updateCtx)) {
4,882,342!
717
          TAOS_CHECK_EXIT(terrno);
×
718
        }
719

720
        TAOS_CHECK_EXIT(tSimpleHashRemove(iColHash, &pColVal->cid, sizeof(pColVal->cid)));
4,882,342!
721
      }
722
    }
723
    tsdbRowClose(&iter);
222,937,669✔
724

725
    pMemRow = tsdbImemGetNextRow(&tbIter, pSkyline, &iSkyline);
223,072,203✔
726
  }
727

728
  TAOS_CHECK_GOTO(tsdbCacheUpdate(pTsdb, suid, uid, ctxArray), &lino, _exit);
12,075,993!
729

730
_exit:
12,075,993✔
731
  if (code) {
12,075,993!
732
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(pTsdb->pVnode), __func__, __FILE__, lino, tstrerror(code));
×
733

734
    tsdbRowClose(&iter);
×
735
  }
736

737
  taosArrayClear(ctxArray);
12,075,993✔
738
  // destroy any allocated resource
739
  tSimpleHashCleanup(iColHash);
12,075,993✔
740
  if (pMemDelData) {
12,075,993!
741
    taosArrayDestroy(pMemDelData);
12,075,993✔
742
  }
743
  if (pSkyline) {
12,075,993✔
744
    taosArrayDestroy(pSkyline);
11,364✔
745
  }
746

747
  TAOS_RETURN(code);
12,075,993✔
748
}
749

750
static int32_t tsdbCacheUpdateFromIMem(STsdb *pTsdb) {
223,789✔
751
  if (!pTsdb) return 0;
223,789!
752
  if (!pTsdb->imem) return 0;
223,789✔
753

754
  int32_t    code = 0;
176,821✔
755
  int32_t    lino = 0;
176,821✔
756
  SMemTable *imem = pTsdb->imem;
176,821✔
757
  int32_t    nTbData = imem->nTbData;
176,821✔
758
  int64_t    nRow = imem->nRow;
176,821✔
759
  int64_t    nDel = imem->nDel;
176,821✔
760

761
  if (nRow == 0 || nTbData == 0) return 0;
176,821!
762

763
  TAOS_CHECK_EXIT(tsdbMemTableSaveToCache(imem, tsdbLoadFromImem));
176,821!
764

765
_exit:
176,821✔
766
  if (code) {
176,821!
767
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(pTsdb->pVnode), __func__, __FILE__, lino, tstrerror(code));
×
768
  } else {
769
    tsdbInfo("vgId:%d %s done, nRow:%" PRId64 " nDel:%" PRId64, TD_VID(pTsdb->pVnode), __func__, nRow, nDel);
176,821!
770
  }
771

772
  TAOS_RETURN(code);
176,821✔
773
}
774

775
int32_t tsdbCacheCommit(STsdb *pTsdb) {
223,789✔
776
  int32_t code = 0;
223,789✔
777

778
  // 0, tsdbCacheUpdateFromIMem if updateCacheBatch
779
  // flush dirty data of lru into rocks
780
  // 4, and update when writing if !updateCacheBatch
781
  // 5, merge cache & mem if updateCacheBatch
782

783
  if (tsUpdateCacheBatch) {
223,789!
784
    code = tsdbCacheUpdateFromIMem(pTsdb);
223,789✔
785
    if (code) {
223,789!
786
      tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
×
787

788
      TAOS_RETURN(code);
×
789
    }
790
  }
791

792
  char      *err = NULL;
223,789✔
793
  SLRUCache *pCache = pTsdb->lruCache;
223,789✔
794
  // rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
795

796
  (void)taosThreadMutexLock(&pTsdb->lruMutex);
223,789✔
797

798
  taosLRUCacheApply(pCache, tsdbCacheFlushDirty, pTsdb);
223,789✔
799

800
#ifdef USE_ROCKSDB
801
  rocksMayWrite(pTsdb, true);
223,789✔
802
  rocksdb_flush(pTsdb->rCache.db, pTsdb->rCache.flushoptions, &err);
223,789✔
803
#endif
804
  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
223,789✔
805
#ifdef USE_ROCKSDB
806
  if (NULL != err) {
223,789!
807
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err);
×
808
    rocksdb_free(err);
×
809
    code = TSDB_CODE_FAILED;
×
810
  }
811
#endif
812
  TAOS_RETURN(code);
223,789✔
813
}
814

815
static int32_t reallocVarDataVal(SValue *pValue) {
10,532,176✔
816
  if (IS_VAR_DATA_TYPE(pValue->type)) {
10,532,176!
817
    uint8_t *pVal = pValue->pData;
10,532,176✔
818
    uint32_t nData = pValue->nData;
10,531,633✔
819
    if (nData > 0) {
10,532,176✔
820
      uint8_t *p = taosMemoryMalloc(nData);
7,250,861!
821
      if (!p) {
7,250,861!
822
        TAOS_RETURN(terrno);
×
823
      }
824
      pValue->pData = p;
7,250,861✔
825
      (void)memcpy(pValue->pData, pVal, nData);
7,250,861!
826
    } else {
827
      pValue->pData = NULL;
3,281,315✔
828
    }
829
  }
830

831
  TAOS_RETURN(TSDB_CODE_SUCCESS);
10,532,176✔
832
}
833

834
static int32_t reallocVarData(SColVal *pColVal) { return reallocVarDataVal(&pColVal->value); }
6,005,404✔
835

836
// realloc pk data and col data.
837
static int32_t tsdbCacheReallocSLastCol(SLastCol *pCol, size_t *pCharge) {
184,184,298✔
838
  int32_t code = TSDB_CODE_SUCCESS, lino = 0;
184,184,298✔
839
  size_t  charge = sizeof(SLastCol);
184,184,298✔
840

841
  int8_t i = 0;
184,184,298✔
842
  for (; i < pCol->rowKey.numOfPKs; i++) {
197,868,742✔
843
    SValue *pValue = &pCol->rowKey.pks[i];
13,684,444✔
844
    if (IS_VAR_DATA_TYPE(pValue->type)) {
13,684,444!
845
      TAOS_CHECK_EXIT(reallocVarDataVal(pValue));
4,526,772!
846
      charge += pValue->nData;
4,526,772✔
847
    }
848
  }
849

850
  if (IS_VAR_DATA_TYPE(pCol->colVal.value.type)) {
184,182,194!
851
    TAOS_CHECK_EXIT(reallocVarData(&pCol->colVal));
6,010,870!
852
    charge += pCol->colVal.value.nData;
6,005,404✔
853
  }
854

855
  if (pCol->colVal.value.type == TSDB_DATA_TYPE_DECIMAL) {
184,188,770✔
856
    if (pCol->colVal.value.nData > 0) {
1,551,596✔
857
      void *p = taosMemoryMalloc(pCol->colVal.value.nData);
750,534!
858
      if (!p) TAOS_CHECK_EXIT(terrno);
750,534!
859
      (void)memcpy(p, pCol->colVal.value.pData, pCol->colVal.value.nData);
750,534!
860
      pCol->colVal.value.pData = p;
750,534✔
861
    }
862
    charge += pCol->colVal.value.nData;
1,551,596✔
863
  }
864

865
  if (pCharge) {
184,094,182✔
866
    *pCharge = charge;
165,238,439✔
867
  }
868

869
_exit:
18,855,743✔
870
  if (TSDB_CODE_SUCCESS != code) {
184,186,737!
871
    for (int8_t j = 0; j < i; j++) {
×
872
      if (IS_VAR_DATA_TYPE(pCol->rowKey.pks[j].type)) {
×
873
        taosMemoryFree(pCol->rowKey.pks[j].pData);
×
874
      }
875
    }
876

877
    (void)memset(pCol, 0, sizeof(SLastCol));
×
878
  }
879

880
  TAOS_RETURN(code);
184,186,737✔
881
}
882

883
void tsdbCacheFreeSLastColItem(void *pItem) {
21,396,308✔
884
  SLastCol *pCol = (SLastCol *)pItem;
21,396,308✔
885
  for (int i = 0; i < pCol->rowKey.numOfPKs; i++) {
23,736,791✔
886
    if (IS_VAR_DATA_TYPE(pCol->rowKey.pks[i].type)) {
2,341,026!
887
      taosMemoryFree(pCol->rowKey.pks[i].pData);
685,252!
888
    }
889
  }
890

891
  if ((IS_VAR_DATA_TYPE(pCol->colVal.value.type) || pCol->colVal.value.type == TSDB_DATA_TYPE_DECIMAL) &&
21,396,128!
892
      pCol->colVal.value.pData) {
1,902,348✔
893
    taosMemoryFree(pCol->colVal.value.pData);
1,225,408!
894
  }
895
}
21,399,697✔
896

897
static void tsdbCacheDeleter(const void *key, size_t klen, void *value, void *ud) {
165,336,783✔
898
  SLastCol *pLastCol = (SLastCol *)value;
165,336,783✔
899

900
  if (pLastCol->dirty) {
165,336,783✔
901
    if (tsdbCacheFlushDirty(key, klen, pLastCol, ud) != 0) {
2,103,292!
902
      STsdb *pTsdb = (STsdb *)ud;
×
903
      tsdbTrace("tsdb/cache: vgId:%d, flush cache %s failed at line %d.", TD_VID(pTsdb->pVnode), __func__, __LINE__);
×
904
    }
905
  }
906

907
  for (uint8_t i = 0; i < pLastCol->rowKey.numOfPKs; ++i) {
177,167,906✔
908
    SValue *pValue = &pLastCol->rowKey.pks[i];
11,830,296✔
909
    if (IS_VAR_DATA_TYPE(pValue->type)) {
11,830,296!
910
      taosMemoryFree(pValue->pData);
3,973,000!
911
    }
912
  }
913

914
  if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type) ||
165,337,610!
915
      pLastCol->colVal.value.type == TSDB_DATA_TYPE_DECIMAL /* && pLastCol->colVal.value.nData > 0*/) {
160,865,044✔
916
    taosMemoryFree(pLastCol->colVal.value.pData);
5,876,714!
917
  }
918

919
  taosMemoryFree(value);
165,337,610!
920
}
165,337,610✔
921

922
static void tsdbCacheOverWriter(const void *key, size_t klen, void *value, void *ud) {
74,616,437✔
923
  SLastCol *pLastCol = (SLastCol *)value;
74,616,437✔
924
  pLastCol->dirty = 0;
74,616,437✔
925
}
74,616,437✔
926

927
static int32_t tsdbCachePutToLRU(STsdb *pTsdb, SLastKey *pLastKey, SLastCol *pLastCol, int8_t dirty);
928

929
static int32_t tsdbCacheNewTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, int8_t col_type, int8_t lflag) {
76,673,547✔
930
  int32_t code = 0, lino = 0;
76,673,547✔
931

932
  SLRUCache *pCache = pTsdb->lruCache;
76,673,547✔
933
  // rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
934
  SRowKey  emptyRowKey = {.ts = TSKEY_MIN, .numOfPKs = 0};
76,677,980✔
935
  SLastCol emptyCol = {
76,677,980✔
936
      .rowKey = emptyRowKey, .colVal = COL_VAL_NONE(cid, col_type), .dirty = 1, .cacheStatus = TSDB_LAST_CACHE_VALID};
937

938
  SLastKey *pLastKey = &(SLastKey){.lflag = lflag, .uid = uid, .cid = cid};
76,677,980✔
939
  code = tsdbCachePutToLRU(pTsdb, pLastKey, &emptyCol, 1);
76,677,928✔
940
  if (code) {
76,682,315!
941
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
×
942
  }
943

944
  TAOS_RETURN(code);
76,682,315✔
945
}
946

947
int32_t tsdbCacheCommitNoLock(STsdb *pTsdb) {
2,797,055✔
948
  int32_t code = 0;
2,797,055✔
949
  char   *err = NULL;
2,797,055✔
950

951
  SLRUCache *pCache = pTsdb->lruCache;
2,810,811✔
952
  // rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
953

954
  taosLRUCacheApply(pCache, tsdbCacheFlushDirty, pTsdb);
2,809,521✔
955
#ifdef USE_ROCKSDB
956
  rocksMayWrite(pTsdb, true);
2,814,031✔
957
  rocksdb_flush(pTsdb->rCache.db, pTsdb->rCache.flushoptions, &err);
2,814,031✔
958
  if (NULL != err) {
2,812,510!
959
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err);
×
960
    rocksdb_free(err);
×
961
    code = TSDB_CODE_FAILED;
×
962
  }
963
#endif
964
  TAOS_RETURN(code);
2,812,510✔
965
}
966

967
static int32_t tsdbCacheGetValuesFromRocks(STsdb *pTsdb, size_t numKeys, const char *const *ppKeysList,
23,524,291✔
968
                                           size_t *pKeysListSizes, char ***pppValuesList, size_t **ppValuesListSizes) {
969
#ifdef USE_ROCKSDB
970
  char **valuesList = taosMemoryCalloc(numKeys, sizeof(char *));
23,524,291!
971
  if (!valuesList) return terrno;
23,521,366!
972
  size_t *valuesListSizes = taosMemoryCalloc(numKeys, sizeof(size_t));
23,521,366!
973
  if (!valuesListSizes) {
23,522,228!
974
    taosMemoryFreeClear(valuesList);
×
975
    return terrno;
×
976
  }
977
  char **errs = taosMemoryCalloc(numKeys, sizeof(char *));
23,522,228!
978
  if (!errs) {
23,524,664!
979
    taosMemoryFreeClear(valuesList);
×
980
    taosMemoryFreeClear(valuesListSizes);
×
981
    return terrno;
×
982
  }
983
  rocksdb_multi_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, numKeys, ppKeysList, pKeysListSizes, valuesList,
23,524,664✔
984
                    valuesListSizes, errs);
985
  for (size_t i = 0; i < numKeys; ++i) {
141,638,164✔
986
    rocksdb_free(errs[i]);
118,115,555✔
987
  }
988
  taosMemoryFreeClear(errs);
23,522,609!
989

990
  *pppValuesList = valuesList;
23,527,074✔
991
  *ppValuesListSizes = valuesListSizes;
23,524,226✔
992
#endif
993
  TAOS_RETURN(TSDB_CODE_SUCCESS);
23,525,784✔
994
}
995

996
static int32_t tsdbCacheDropTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, bool hasPrimaryKey) {
15,982,198✔
997
  int32_t code = 0;
15,982,198✔
998

999
  // build keys & multi get from rocks
1000
  char **keys_list = taosMemoryCalloc(2, sizeof(char *));
15,982,198!
1001
  if (!keys_list) {
15,977,635!
1002
    return terrno;
×
1003
  }
1004
  size_t *keys_list_sizes = taosMemoryCalloc(2, sizeof(size_t));
15,977,635!
1005
  if (!keys_list_sizes) {
15,978,285!
1006
    taosMemoryFree(keys_list);
×
1007
    return terrno;
×
1008
  }
1009
  const size_t klen = ROCKS_KEY_LEN;
15,978,285✔
1010

1011
  char *keys = taosMemoryCalloc(2, sizeof(SLastKey));
15,978,285!
1012
  if (!keys) {
15,974,644!
1013
    taosMemoryFree(keys_list);
×
1014
    taosMemoryFree(keys_list_sizes);
×
1015
    return terrno;
×
1016
  }
1017
  ((SLastKey *)keys)[0] = (SLastKey){.lflag = LFLAG_LAST, .uid = uid, .cid = cid};
15,974,644✔
1018
  ((SLastKey *)keys)[1] = (SLastKey){.lflag = LFLAG_LAST_ROW, .uid = uid, .cid = cid};
15,973,126✔
1019

1020
  keys_list[0] = keys;
15,974,653✔
1021
  keys_list[1] = keys + sizeof(SLastKey);
15,974,653✔
1022
  keys_list_sizes[0] = klen;
15,977,723✔
1023
  keys_list_sizes[1] = klen;
15,977,038✔
1024

1025
  char  **values_list = NULL;
15,981,626✔
1026
  size_t *values_list_sizes = NULL;
15,980,105✔
1027

1028
  // was written by caller
1029
  // rocksMayWrite(pTsdb, true); // flush writebatch cache
1030

1031
  TAOS_CHECK_GOTO(tsdbCacheGetValuesFromRocks(pTsdb, 2, (const char *const *)keys_list, keys_list_sizes, &values_list,
15,980,967!
1032
                                              &values_list_sizes),
1033
                  NULL, _exit);
1034
#ifdef USE_ROCKSDB
1035
  rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
15,976,640✔
1036
#endif
1037
  {
1038
#ifdef USE_ROCKSDB
1039
    SLastCol *pLastCol = NULL;
15,978,189✔
1040
    if (values_list[0] != NULL) {
15,979,710✔
1041
      code = tsdbCacheDeserialize(values_list[0], values_list_sizes[0], &pLastCol);
285,300✔
1042
      if (code != TSDB_CODE_SUCCESS) {
285,300!
1043
        tsdbError("vgId:%d, %s deserialize failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1044
                  tstrerror(code));
1045
        goto _exit;
×
1046
      }
1047
      if (NULL != pLastCol) {
285,300!
1048
        rocksdb_writebatch_delete(wb, keys_list[0], klen);
285,300✔
1049
      }
1050
      taosMemoryFreeClear(pLastCol);
285,300!
1051
    }
1052

1053
    pLastCol = NULL;
15,975,119✔
1054
    if (values_list[1] != NULL) {
15,975,119✔
1055
      code = tsdbCacheDeserialize(values_list[1], values_list_sizes[1], &pLastCol);
285,300✔
1056
      if (code != TSDB_CODE_SUCCESS) {
285,300!
1057
        tsdbError("vgId:%d, %s deserialize failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1058
                  tstrerror(code));
1059
        goto _exit;
×
1060
      }
1061
      if (NULL != pLastCol) {
285,300!
1062
        rocksdb_writebatch_delete(wb, keys_list[1], klen);
285,300✔
1063
      }
1064
      taosMemoryFreeClear(pLastCol);
285,300!
1065
    }
1066

1067
    rocksdb_free(values_list[0]);
15,975,245✔
1068
    rocksdb_free(values_list[1]);
15,977,370✔
1069
#endif
1070

1071
    for (int i = 0; i < 2; i++) {
47,932,978✔
1072
      LRUHandle *h = taosLRUCacheLookup(pTsdb->lruCache, keys_list[i], klen);
31,951,051✔
1073
      if (h) {
31,965,445✔
1074
        tsdbLRUCacheRelease(pTsdb->lruCache, h, true);
570,600✔
1075
        taosLRUCacheErase(pTsdb->lruCache, keys_list[i], klen);
570,600✔
1076
      }
1077
    }
1078
  }
1079

1080
_exit:
15,983,476✔
1081
  taosMemoryFree(keys_list[0]);
15,985,006!
1082

1083
  taosMemoryFree(keys_list);
15,983,476!
1084
  taosMemoryFree(keys_list_sizes);
15,979,190!
1085
  taosMemoryFree(values_list);
15,972,716!
1086
  taosMemoryFree(values_list_sizes);
15,972,100!
1087

1088
  TAOS_RETURN(code);
15,975,224✔
1089
}
1090

1091
int32_t tsdbCacheNewTable(STsdb *pTsdb, tb_uid_t uid, tb_uid_t suid, const SSchemaWrapper *pSchemaRow) {
11,919,354✔
1092
  int32_t code = 0;
11,919,354✔
1093

1094
  (void)taosThreadMutexLock(&pTsdb->lruMutex);
11,919,354✔
1095

1096
  if (suid < 0) {
11,920,479✔
1097
    for (int i = 0; i < pSchemaRow->nCols; ++i) {
317,929✔
1098
      int16_t cid = pSchemaRow->pSchema[i].colId;
275,017✔
1099
      int8_t  col_type = pSchemaRow->pSchema[i].type;
275,017✔
1100

1101
      code = tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, LFLAG_LAST_ROW);
275,017✔
1102
      if (code != TSDB_CODE_SUCCESS) {
275,017!
1103
        tsdbTrace("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1104
                  tstrerror(code));
1105
      }
1106
      code = tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, LFLAG_LAST);
275,017✔
1107
      if (code != TSDB_CODE_SUCCESS) {
275,017!
1108
        tsdbTrace("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1109
                  tstrerror(code));
1110
      }
1111
    }
1112
  } else {
1113
    STSchema *pTSchema = NULL;
11,877,567✔
1114
    code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, -1, &pTSchema);
11,877,567✔
1115
    if (code != TSDB_CODE_SUCCESS) {
11,875,913!
1116
      (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
×
1117

1118
      TAOS_RETURN(code);
×
1119
    }
1120

1121
    for (int i = 0; i < pTSchema->numOfCols; ++i) {
49,657,934✔
1122
      int16_t cid = pTSchema->columns[i].colId;
37,781,194✔
1123
      int8_t  col_type = pTSchema->columns[i].type;
37,781,194✔
1124

1125
      code = tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, LFLAG_LAST_ROW);
37,781,194✔
1126
      if (code != TSDB_CODE_SUCCESS) {
37,777,059!
1127
        tsdbTrace("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1128
                  tstrerror(code));
1129
      }
1130
      code = tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, LFLAG_LAST);
37,777,059✔
1131
      if (code != TSDB_CODE_SUCCESS) {
37,782,021!
1132
        tsdbTrace("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1133
                  tstrerror(code));
1134
      }
1135
    }
1136

1137
    taosMemoryFree(pTSchema);
11,875,913!
1138
  }
1139

1140
  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
11,922,960✔
1141

1142
  TAOS_RETURN(code);
11,922,960✔
1143
}
1144

1145
int32_t tsdbCacheDropTable(STsdb *pTsdb, tb_uid_t uid, tb_uid_t suid, SSchemaWrapper *pSchemaRow) {
59,088✔
1146
  int32_t code = 0;
59,088✔
1147

1148
  (void)taosThreadMutexLock(&pTsdb->lruMutex);
59,088✔
1149

1150
  code = tsdbCacheCommitNoLock(pTsdb);
59,088✔
1151
  if (code != TSDB_CODE_SUCCESS) {
59,088!
1152
    tsdbTrace("vgId:%d, %s commit with no lock failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1153
              tstrerror(code));
1154
  }
1155

1156
  if (pSchemaRow != NULL) {
59,088!
1157
    bool hasPrimayKey = false;
×
1158
    int  nCols = pSchemaRow->nCols;
×
1159
    if (nCols >= 2) {
×
1160
      hasPrimayKey = (pSchemaRow->pSchema[1].flags & COL_IS_KEY) ? true : false;
×
1161
    }
1162
    for (int i = 0; i < nCols; ++i) {
×
1163
      int16_t cid = pSchemaRow->pSchema[i].colId;
×
1164
      int8_t  col_type = pSchemaRow->pSchema[i].type;
×
1165

1166
      code = tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey);
×
1167
      if (code != TSDB_CODE_SUCCESS) {
×
1168
        tsdbTrace("vgId:%d, %s drop table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1169
                  tstrerror(code));
1170
      }
1171
    }
1172
  } else {
1173
    STSchema *pTSchema = NULL;
59,088✔
1174
    code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, -1, &pTSchema);
59,088✔
1175
    if (code != TSDB_CODE_SUCCESS) {
59,088!
1176
      (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
×
1177

1178
      TAOS_RETURN(code);
×
1179
    }
1180

1181
    bool hasPrimayKey = false;
59,088✔
1182
    int  nCols = pTSchema->numOfCols;
59,088✔
1183
    if (nCols >= 2) {
59,088!
1184
      hasPrimayKey = (pTSchema->columns[1].flags & COL_IS_KEY) ? true : false;
59,088✔
1185
    }
1186
    for (int i = 0; i < nCols; ++i) {
201,708✔
1187
      int16_t cid = pTSchema->columns[i].colId;
142,620✔
1188
      int8_t  col_type = pTSchema->columns[i].type;
142,620✔
1189

1190
      code = tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey);
142,620✔
1191
      if (code != TSDB_CODE_SUCCESS) {
142,620!
1192
        tsdbTrace("vgId:%d, %s drop table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1193
                  tstrerror(code));
1194
      }
1195
    }
1196

1197
    taosMemoryFree(pTSchema);
59,088!
1198
  }
1199

1200
  rocksMayWrite(pTsdb, false);
59,088✔
1201

1202
  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
59,088✔
1203

1204
  TAOS_RETURN(code);
59,088✔
1205
}
1206

1207
int32_t tsdbCacheDropSubTables(STsdb *pTsdb, SArray *uids, tb_uid_t suid) {
2,700,144✔
1208
  int32_t code = 0;
2,700,144✔
1209

1210
  (void)taosThreadMutexLock(&pTsdb->lruMutex);
2,700,144✔
1211

1212
  code = tsdbCacheCommitNoLock(pTsdb);
2,708,629✔
1213
  if (code != TSDB_CODE_SUCCESS) {
2,710,119!
1214
    tsdbTrace("vgId:%d, %s commit with no lock failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1215
              tstrerror(code));
1216
  }
1217

1218
  STSchema *pTSchema = NULL;
2,710,119✔
1219
  code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, suid, -1, &pTSchema);
2,708,629✔
1220
  if (code != TSDB_CODE_SUCCESS) {
2,705,587✔
1221
    (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
6,096✔
1222

1223
    TAOS_RETURN(code);
6,096✔
1224
  }
1225

1226
  for (int i = 0; i < TARRAY_SIZE(uids); ++i) {
4,695,693✔
1227
    int64_t uid = ((tb_uid_t *)TARRAY_DATA(uids))[i];
1,991,611✔
1228

1229
    bool hasPrimayKey = false;
1,993,132✔
1230
    int  nCols = pTSchema->numOfCols;
1,993,132✔
1231
    if (nCols >= 2) {
1,993,132!
1232
      hasPrimayKey = (pTSchema->columns[1].flags & COL_IS_KEY) ? true : false;
1,993,132✔
1233
    }
1234

1235
    for (int i = 0; i < nCols; ++i) {
17,684,658✔
1236
      int16_t cid = pTSchema->columns[i].colId;
15,688,456✔
1237
      int8_t  col_type = pTSchema->columns[i].type;
15,685,383✔
1238

1239
      code = tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey);
15,689,983✔
1240
      if (code != TSDB_CODE_SUCCESS) {
15,690,003!
1241
        tsdbTrace("vgId:%d, %s drop table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1242
                  tstrerror(code));
1243
      }
1244
    }
1245
  }
1246

1247
  taosMemoryFree(pTSchema);
2,701,012!
1248

1249
  rocksMayWrite(pTsdb, false);
2,702,493✔
1250

1251
  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
2,704,023✔
1252

1253
  TAOS_RETURN(code);
2,704,023✔
1254
}
1255

1256
int32_t tsdbCacheNewNTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, int8_t col_type) {
×
1257
  int32_t code = 0;
×
1258

1259
  (void)taosThreadMutexLock(&pTsdb->lruMutex);
×
1260

1261
  code = tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, 0);
×
1262
  if (code != TSDB_CODE_SUCCESS) {
×
1263
    tsdbTrace("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1264
              tstrerror(code));
1265
  }
1266
  code = tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, 1);
×
1267
  if (code != TSDB_CODE_SUCCESS) {
×
1268
    tsdbTrace("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1269
              tstrerror(code));
1270
  }
1271
  // rocksMayWrite(pTsdb, true, false, false);
1272
  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
×
1273

1274
  TAOS_RETURN(code);
×
1275
}
1276

1277
int32_t tsdbCacheDropNTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, bool hasPrimayKey) {
×
1278
  int32_t code = 0;
×
1279

1280
  (void)taosThreadMutexLock(&pTsdb->lruMutex);
×
1281

1282
  code = tsdbCacheCommitNoLock(pTsdb);
×
1283
  if (code != TSDB_CODE_SUCCESS) {
×
1284
    tsdbTrace("vgId:%d, %s commit with no lock failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1285
              tstrerror(code));
1286
  }
1287

1288
  code = tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey);
×
1289
  if (code != TSDB_CODE_SUCCESS) {
×
1290
    tsdbTrace("vgId:%d, %s drop table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1291
              tstrerror(code));
1292
  }
1293

1294
  rocksMayWrite(pTsdb, false);
×
1295

1296
  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
×
1297

1298
  TAOS_RETURN(code);
×
1299
}
1300

1301
int32_t tsdbCacheNewSTableColumn(STsdb *pTsdb, SArray *uids, int16_t cid, int8_t col_type) {
89,648✔
1302
  int32_t code = 0;
89,648✔
1303

1304
  (void)taosThreadMutexLock(&pTsdb->lruMutex);
89,648✔
1305

1306
  for (int i = 0; i < TARRAY_SIZE(uids); ++i) {
375,008✔
1307
    tb_uid_t uid = ((tb_uid_t *)TARRAY_DATA(uids))[i];
285,360✔
1308

1309
    code = tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, 0);
285,360✔
1310
    if (code != TSDB_CODE_SUCCESS) {
285,360!
1311
      tsdbTrace("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1312
                tstrerror(code));
1313
    }
1314
    code = tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, 1);
285,360✔
1315
    if (code != TSDB_CODE_SUCCESS) {
285,360!
1316
      tsdbTrace("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1317
                tstrerror(code));
1318
    }
1319
  }
1320

1321
  // rocksMayWrite(pTsdb, true, false, false);
1322
  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
89,648✔
1323
  TAOS_RETURN(code);
89,648✔
1324
}
1325

1326
int32_t tsdbCacheDropSTableColumn(STsdb *pTsdb, SArray *uids, int16_t cid, bool hasPrimayKey) {
44,076✔
1327
  int32_t code = 0;
44,076✔
1328

1329
  (void)taosThreadMutexLock(&pTsdb->lruMutex);
44,076✔
1330

1331
  code = tsdbCacheCommitNoLock(pTsdb);
44,824✔
1332
  if (code != TSDB_CODE_SUCCESS) {
44,824!
1333
    tsdbTrace("vgId:%d, %s commit with no lock failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1334
              tstrerror(code));
1335
  }
1336

1337
  for (int i = 0; i < TARRAY_SIZE(uids); ++i) {
187,504✔
1338
    int64_t uid = ((tb_uid_t *)TARRAY_DATA(uids))[i];
142,680✔
1339

1340
    code = tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey);
142,680✔
1341
    if (code != TSDB_CODE_SUCCESS) {
142,680!
1342
      tsdbTrace("vgId:%d, %s drop table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1343
                tstrerror(code));
1344
    }
1345
  }
1346

1347
  rocksMayWrite(pTsdb, false);
44,824✔
1348

1349
  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
44,824✔
1350

1351
  TAOS_RETURN(code);
44,824✔
1352
}
1353

1354
typedef struct {
1355
  int      idx;
1356
  SLastKey key;
1357
} SIdxKey;
1358

1359
static void tsdbCacheUpdateLastColToNone(SLastCol *pLastCol, ELastCacheStatus cacheStatus) {
×
1360
  // update rowkey
1361
  pLastCol->rowKey.ts = TSKEY_MIN;
×
1362
  for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) {
×
1363
    SValue *pPKValue = &pLastCol->rowKey.pks[i];
×
1364
    if (IS_VAR_DATA_TYPE(pPKValue->type) && pPKValue->nData > 0) {
×
1365
      taosMemoryFreeClear(pPKValue->pData);
×
1366
      pPKValue->nData = 0;
×
1367
    } else {
1368
      valueClearDatum(pPKValue, pPKValue->type);
×
1369
    }
1370
  }
1371
  pLastCol->rowKey.numOfPKs = 0;
×
1372

1373
  // update colval
1374
  if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type) && pLastCol->colVal.value.nData > 0) {
×
1375
    taosMemoryFreeClear(pLastCol->colVal.value.pData);
×
1376
    pLastCol->colVal.value.nData = 0;
×
1377
  } else {
1378
    valueClearDatum(&pLastCol->colVal.value, pLastCol->colVal.value.type);
×
1379
  }
1380

1381
  pLastCol->colVal = COL_VAL_NONE(pLastCol->colVal.cid, pLastCol->colVal.value.type);
×
1382
  pLastCol->dirty = 1;
×
1383
  pLastCol->cacheStatus = cacheStatus;
×
1384
}
×
1385

1386
static int32_t tsdbCachePutToRocksdb(STsdb *pTsdb, SLastKey *pLastKey, SLastCol *pLastCol) {
93,783,383✔
1387
  int32_t code = 0;
93,783,383✔
1388
#ifdef USE_ROCKSDB
1389
  char  *rocks_value = NULL;
93,783,383✔
1390
  size_t vlen = 0;
93,785,379✔
1391

1392
  code = tsdbCacheSerialize(pLastCol, &rocks_value, &vlen);
93,785,379✔
1393
  if (code) {
93,781,683!
1394
    tsdbError("tsdb/cache/putrocks: vgId:%d, serialize failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code));
×
1395
    TAOS_RETURN(code);
×
1396
  }
1397

1398
  rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
93,781,683✔
1399
  (void)taosThreadMutexLock(&pTsdb->rCache.writeBatchMutex);
93,782,431✔
1400
  rocksdb_writebatch_put(wb, (char *)pLastKey, ROCKS_KEY_LEN, rocks_value, vlen);
93,781,954✔
1401
  (void)taosThreadMutexUnlock(&pTsdb->rCache.writeBatchMutex);
93,785,379✔
1402

1403
  taosMemoryFree(rocks_value);
93,784,381!
1404
#endif
1405
  TAOS_RETURN(code);
93,784,836✔
1406
}
1407

1408
static int32_t tsdbCachePutToLRU(STsdb *pTsdb, SLastKey *pLastKey, SLastCol *pLastCol, int8_t dirty) {
165,326,176✔
1409
  int32_t code = 0, lino = 0;
165,326,176✔
1410

1411
  SLastCol *pLRULastCol = taosMemoryCalloc(1, sizeof(SLastCol));
165,326,176!
1412
  if (!pLRULastCol) {
165,305,573!
1413
    return terrno;
×
1414
  }
1415

1416
  size_t charge = 0;
165,305,573✔
1417
  *pLRULastCol = *pLastCol;
165,305,573✔
1418
  pLRULastCol->dirty = dirty;
165,307,975✔
1419
  TAOS_CHECK_EXIT(tsdbCacheReallocSLastCol(pLRULastCol, &charge));
165,321,753!
1420

1421
  LRUStatus status = taosLRUCacheInsert(pTsdb->lruCache, pLastKey, ROCKS_KEY_LEN, pLRULastCol, charge, tsdbCacheDeleter,
165,329,219✔
1422
                                        tsdbCacheOverWriter, NULL, TAOS_LRU_PRIORITY_LOW, pTsdb);
1423
  if (TAOS_LRU_STATUS_OK != status && TAOS_LRU_STATUS_OK_OVERWRITTEN != status) {
165,332,758!
1424
    tsdbError("vgId:%d, %s failed at line %d status %d.", TD_VID(pTsdb->pVnode), __func__, __LINE__, status);
×
1425
    code = TSDB_CODE_FAILED;
×
1426
    pLRULastCol = NULL;
×
1427
  }
1428

1429
_exit:
165,332,758✔
1430
  if (TSDB_CODE_SUCCESS != code) {
165,331,931!
1431
    taosMemoryFree(pLRULastCol);
×
1432
    tsdbError("tsdb/cache/putlru: vgId:%d, failed at line %d since %s.", TD_VID(pTsdb->pVnode), lino, tstrerror(code));
×
1433
  }
1434

1435
  TAOS_RETURN(code);
165,331,931✔
1436
}
1437

1438
static int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SArray *updCtxArray) {
12,075,993✔
1439
  if (!updCtxArray || TARRAY_SIZE(updCtxArray) == 0) {
12,075,993!
1440
    TAOS_RETURN(TSDB_CODE_SUCCESS);
×
1441
  }
1442

1443
  int32_t code = 0, lino = 0;
12,075,993✔
1444

1445
  int        num_keys = TARRAY_SIZE(updCtxArray);
12,075,993✔
1446
  SArray    *remainCols = NULL;
12,075,993✔
1447
  SLRUCache *pCache = pTsdb->lruCache;
12,075,993✔
1448

1449
  (void)taosThreadMutexLock(&pTsdb->lruMutex);
12,075,993✔
1450
  for (int i = 0; i < num_keys; ++i) {
100,239,766✔
1451
    SLastUpdateCtx *updCtx = &((SLastUpdateCtx *)TARRAY_DATA(updCtxArray))[i];
88,163,773✔
1452
    int8_t          lflag = updCtx->lflag;
88,162,775✔
1453
    SRowKey        *pRowKey = &updCtx->tsdbRowKey.key;
88,163,773✔
1454
    SColVal        *pColVal = &updCtx->colVal;
88,163,773✔
1455

1456
    if (lflag == LFLAG_LAST && !COL_VAL_IS_VALUE(pColVal)) {
88,163,227!
1457
      continue;
×
1458
    }
1459

1460
    SLastKey  *key = &(SLastKey){.lflag = lflag, .uid = uid, .cid = pColVal->cid};
88,162,681✔
1461
    LRUHandle *h = taosLRUCacheLookup(pCache, key, ROCKS_KEY_LEN);
88,164,225✔
1462
    if (h) {
88,161,231✔
1463
      SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h);
74,573,600✔
1464
      if (pLastCol->cacheStatus != TSDB_LAST_CACHE_NO_CACHE) {
74,574,598✔
1465
        int32_t cmp_res = tRowKeyCompare(&pLastCol->rowKey, pRowKey);
74,568,359✔
1466
        if (cmp_res < 0 || (cmp_res == 0 && !COL_VAL_IS_NONE(pColVal))) {
74,569,357!
1467
          SLastCol newLastCol = {
74,564,986✔
1468
              .rowKey = *pRowKey, .colVal = *pColVal, .dirty = 1, .cacheStatus = TSDB_LAST_CACHE_VALID};
1469
          code = tsdbCachePutToLRU(pTsdb, key, &newLastCol, 1);
74,565,984✔
1470
        }
1471
      }
1472

1473
      tsdbLRUCacheRelease(pCache, h, false);
74,575,596✔
1474
      TAOS_CHECK_EXIT(code);
74,574,598!
1475
    } else {
1476
      if (!remainCols) {
13,587,631✔
1477
        remainCols = taosArrayInit(num_keys * 2, sizeof(SIdxKey));
370,036✔
1478
        if (!remainCols) {
370,036!
1479
          TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
1480
        }
1481
      }
1482
      if (!taosArrayPush(remainCols, &(SIdxKey){i, *key})) {
27,175,808!
1483
        TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
1484
      }
1485
    }
1486
  }
1487

1488
  if (remainCols) {
12,075,993✔
1489
    num_keys = TARRAY_SIZE(remainCols);
370,036✔
1490
  }
1491
  if (remainCols && num_keys > 0) {
12,075,993!
1492
    char  **keys_list = NULL;
370,036✔
1493
    size_t *keys_list_sizes = NULL;
370,036✔
1494
    char  **values_list = NULL;
370,036✔
1495
    size_t *values_list_sizes = NULL;
370,036✔
1496
    char  **errs = NULL;
370,036✔
1497
    keys_list = taosMemoryCalloc(num_keys, sizeof(char *));
370,036!
1498
    if (!keys_list) {
370,036!
1499
      (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
×
1500
      return terrno;
×
1501
    }
1502
    keys_list_sizes = taosMemoryCalloc(num_keys, sizeof(size_t));
370,036!
1503
    if (!keys_list_sizes) {
370,036!
1504
      taosMemoryFree(keys_list);
×
1505
      (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
×
1506
      return terrno;
×
1507
    }
1508
    for (int i = 0; i < num_keys; ++i) {
13,958,759✔
1509
      SIdxKey *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[i];
13,588,723✔
1510

1511
      keys_list[i] = (char *)&idxKey->key;
13,588,723✔
1512
      keys_list_sizes[i] = ROCKS_KEY_LEN;
13,588,723✔
1513
    }
1514

1515
    rocksMayWrite(pTsdb, true);  // flush writebatch cache
370,036✔
1516

1517
    code = tsdbCacheGetValuesFromRocks(pTsdb, num_keys, (const char *const *)keys_list, keys_list_sizes, &values_list,
370,036✔
1518
                                       &values_list_sizes);
1519
    if (code) {
370,036!
1520
      taosMemoryFree(keys_list);
×
1521
      taosMemoryFree(keys_list_sizes);
×
1522
      goto _exit;
×
1523
    }
1524

1525
    // rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
1526
    for (int i = 0; i < num_keys; ++i) {
13,957,665✔
1527
      SIdxKey        *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[i];
13,587,629✔
1528
      SLastUpdateCtx *updCtx = (SLastUpdateCtx *)taosArrayGet(updCtxArray, idxKey->idx);
13,587,081✔
1529
      SRowKey        *pRowKey = &updCtx->tsdbRowKey.key;
13,587,083✔
1530
      SColVal        *pColVal = &updCtx->colVal;
13,587,083✔
1531

1532
      SLastCol *pLastCol = NULL;
13,587,083✔
1533
      if (values_list[i] != NULL) {
13,587,083!
1534
        code = tsdbCacheDeserialize(values_list[i], values_list_sizes[i], &pLastCol);
×
1535
        if (code != TSDB_CODE_SUCCESS) {
×
1536
          tsdbError("vgId:%d, %s deserialize failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1537
                    tstrerror(code));
1538
          goto _exit;
×
1539
        }
1540
      }
1541
      /*
1542
      if (code) {
1543
        tsdbError("tsdb/cache: vgId:%d, deserialize failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code));
1544
      }
1545
      */
1546
      SLastCol *pToFree = pLastCol;
13,587,631✔
1547

1548
      if (pLastCol && pLastCol->cacheStatus == TSDB_LAST_CACHE_NO_CACHE) {
13,587,631!
1549
        if ((code = tsdbCachePutToLRU(pTsdb, &idxKey->key, pLastCol, 0)) != TSDB_CODE_SUCCESS) {
×
1550
          tsdbError("tsdb/cache: vgId:%d, put lru failed at line %d since %s.", TD_VID(pTsdb->pVnode), lino,
×
1551
                    tstrerror(code));
1552
          taosMemoryFreeClear(pToFree);
×
1553
          break;
×
1554
        }
1555

1556
        // cache invalid => skip update
1557
        taosMemoryFreeClear(pToFree);
×
1558
        continue;
×
1559
      }
1560

1561
      if (IS_LAST_KEY(idxKey->key) && !COL_VAL_IS_VALUE(pColVal)) {
13,587,631!
1562
        taosMemoryFreeClear(pToFree);
×
1563
        continue;
×
1564
      }
1565

1566
      int32_t cmp_res = 1;
13,587,631✔
1567
      if (pLastCol) {
13,587,631!
1568
        cmp_res = tRowKeyCompare(&pLastCol->rowKey, pRowKey);
×
1569
      }
1570

1571
      if (NULL == pLastCol || cmp_res < 0 || (cmp_res == 0 && !COL_VAL_IS_NONE(pColVal))) {
13,587,631!
1572
        SLastCol lastColTmp = {
13,587,631✔
1573
            .rowKey = *pRowKey, .colVal = *pColVal, .dirty = 0, .cacheStatus = TSDB_LAST_CACHE_VALID};
1574
        if ((code = tsdbCachePutToRocksdb(pTsdb, &idxKey->key, &lastColTmp)) != TSDB_CODE_SUCCESS) {
13,588,177!
1575
          tsdbError("tsdb/cache: vgId:%d, put rocks failed at line %d since %s.", TD_VID(pTsdb->pVnode), lino,
×
1576
                    tstrerror(code));
1577
          taosMemoryFreeClear(pToFree);
×
1578
          break;
×
1579
        }
1580
        if ((code = tsdbCachePutToLRU(pTsdb, &idxKey->key, &lastColTmp, 0)) != TSDB_CODE_SUCCESS) {
13,588,180!
1581
          tsdbError("tsdb/cache: vgId:%d, put lru failed at line %d since %s.", TD_VID(pTsdb->pVnode), lino,
×
1582
                    tstrerror(code));
1583
          taosMemoryFreeClear(pToFree);
×
1584
          break;
×
1585
        }
1586
      }
1587

1588
      taosMemoryFreeClear(pToFree);
13,587,629!
1589
    }
1590

1591
    rocksMayWrite(pTsdb, false);
370,036✔
1592

1593
    taosMemoryFree(keys_list);
370,036!
1594
    taosMemoryFree(keys_list_sizes);
370,036!
1595
    if (values_list) {
370,036!
1596
#ifdef USE_ROCKSDB
1597
      for (int i = 0; i < num_keys; ++i) {
13,958,759✔
1598
        rocksdb_free(values_list[i]);
13,588,723✔
1599
      }
1600
#endif
1601
      taosMemoryFree(values_list);
370,036!
1602
    }
1603
    taosMemoryFree(values_list_sizes);
370,036!
1604
  }
1605

1606
_exit:
12,075,993✔
1607
  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
12,075,993✔
1608
  taosArrayDestroy(remainCols);
12,075,993✔
1609

1610
  if (code) {
12,075,993!
1611
    tsdbError("tsdb/cache: vgId:%d, update failed at line %d since %s.", TD_VID(pTsdb->pVnode), __LINE__,
×
1612
              tstrerror(code));
1613
  }
1614

1615
  TAOS_RETURN(code);
12,075,993✔
1616
}
1617

1618
int32_t tsdbCacheRowFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int64_t version, int32_t nRow,
×
1619
                                 SRow **aRow) {
1620
  int32_t code = 0, lino = 0;
×
1621

1622
  // 1. prepare last
1623
  TSDBROW      lRow = {.type = TSDBROW_ROW_FMT, .pTSRow = aRow[nRow - 1], .version = version};
×
1624
  STSchema    *pTSchema = NULL;
×
1625
  int32_t      sver = TSDBROW_SVERSION(&lRow);
×
1626
  SSHashObj   *iColHash = NULL;
×
1627
  STSDBRowIter iter = {0};
×
1628

1629
  TAOS_CHECK_GOTO(tsdbUpdateSkm(pTsdb, suid, uid, sver), &lino, _exit);
×
1630
  pTSchema = pTsdb->rCache.pTSchema;
×
1631

1632
  TSDBROW tRow = {.type = TSDBROW_ROW_FMT, .version = version};
×
1633
  int32_t nCol = pTSchema->numOfCols;
×
1634
  SArray *ctxArray = pTsdb->rCache.ctxArray;
×
1635

1636
  // 1. prepare by lrow
1637
  STsdbRowKey tsdbRowKey = {0};
×
1638
  tsdbRowGetKey(&lRow, &tsdbRowKey);
×
1639

1640
  TAOS_CHECK_GOTO(tsdbRowIterOpen(&iter, &lRow, pTSchema), &lino, _exit);
×
1641

1642
  int32_t iCol = 0;
×
1643
  for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal && iCol < nCol; pColVal = tsdbRowIterNext(&iter), iCol++) {
×
1644
    SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST_ROW, .tsdbRowKey = tsdbRowKey, .colVal = *pColVal};
×
1645
    if (!taosArrayPush(ctxArray, &updateCtx)) {
×
1646
      TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
1647
    }
1648

1649
    if (COL_VAL_IS_VALUE(pColVal)) {
×
1650
      updateCtx.lflag = LFLAG_LAST;
×
1651
      if (!taosArrayPush(ctxArray, &updateCtx)) {
×
1652
        TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
1653
      }
1654
    } else {
1655
      if (!iColHash) {
×
1656
        iColHash = tSimpleHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
×
1657
        if (iColHash == NULL) {
×
1658
          TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit);
×
1659
        }
1660
      }
1661

1662
      if (tSimpleHashPut(iColHash, &iCol, sizeof(iCol), NULL, 0)) {
×
1663
        TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit);
×
1664
      }
1665
    }
1666
  }
1667

1668
  // 2. prepare by the other rows
1669
  for (int32_t iRow = nRow - 2; iRow >= 0; --iRow) {
×
1670
    if (tSimpleHashGetSize(iColHash) == 0) {
×
1671
      break;
×
1672
    }
1673

1674
    tRow.pTSRow = aRow[iRow];
×
1675

1676
    STsdbRowKey tsdbRowKey = {0};
×
1677
    tsdbRowGetKey(&tRow, &tsdbRowKey);
×
1678

1679
    void   *pIte = NULL;
×
1680
    int32_t iter = 0;
×
1681
    while ((pIte = tSimpleHashIterate(iColHash, pIte, &iter)) != NULL) {
×
1682
      int32_t iCol = ((int32_t *)pIte)[0];
×
1683
      SColVal colVal = COL_VAL_NONE(0, 0);
×
1684
      tsdbRowGetColVal(&tRow, pTSchema, iCol, &colVal);
×
1685

1686
      if (COL_VAL_IS_VALUE(&colVal)) {
×
1687
        SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST, .tsdbRowKey = tsdbRowKey, .colVal = colVal};
×
1688
        if (!taosArrayPush(ctxArray, &updateCtx)) {
×
1689
          TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
1690
        }
1691
        code = tSimpleHashIterateRemove(iColHash, &iCol, sizeof(iCol), &pIte, &iter);
×
1692
        if (code != TSDB_CODE_SUCCESS) {
×
1693
          tsdbTrace("vgId:%d, %s tSimpleHashIterateRemove failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__,
×
1694
                    __LINE__, tstrerror(code));
1695
        }
1696
      }
1697
    }
1698
  }
1699

1700
  TAOS_CHECK_GOTO(tsdbCacheUpdate(pTsdb, suid, uid, ctxArray), &lino, _exit);
×
1701

1702
_exit:
×
1703
  if (code) {
×
1704
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
×
1705
  }
1706

1707
  tsdbRowClose(&iter);
×
1708
  tSimpleHashCleanup(iColHash);
×
1709
  taosArrayClear(ctxArray);
×
1710

1711
  TAOS_RETURN(code);
×
1712
}
1713

1714
int32_t tsdbCacheColFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SBlockData *pBlockData) {
×
1715
  int32_t      code = 0, lino = 0;
×
1716
  STSDBRowIter iter = {0};
×
1717
  STSchema    *pTSchema = NULL;
×
1718
  SArray      *ctxArray = NULL;
×
1719

1720
  TSDBROW lRow = tsdbRowFromBlockData(pBlockData, pBlockData->nRow - 1);
×
1721
  int32_t sver = TSDBROW_SVERSION(&lRow);
×
1722

1723
  TAOS_CHECK_RETURN(metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pTSchema));
×
1724

1725
  ctxArray = taosArrayInit(pBlockData->nColData, sizeof(SLastUpdateCtx));
×
1726
  if (ctxArray == NULL) {
×
1727
    TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
1728
  }
1729

1730
  // 1. prepare last
1731
  STsdbRowKey tsdbRowKey = {0};
×
1732
  tsdbRowGetKey(&lRow, &tsdbRowKey);
×
1733

1734
  {
1735
    SValue tsVal = {.type = TSDB_DATA_TYPE_TIMESTAMP};
×
1736
    VALUE_SET_TRIVIAL_DATUM(&tsVal, lRow.pBlockData->aTSKEY[lRow.iRow]);
×
1737
    SLastUpdateCtx updateCtx = {
×
1738
        .lflag = LFLAG_LAST, .tsdbRowKey = tsdbRowKey, .colVal = COL_VAL_VALUE(PRIMARYKEY_TIMESTAMP_COL_ID, tsVal)};
1739
    if (!taosArrayPush(ctxArray, &updateCtx)) {
×
1740
      TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
1741
    }
1742
  }
1743

1744
  TSDBROW tRow = tsdbRowFromBlockData(pBlockData, 0);
×
1745

1746
  for (int32_t iColData = 0; iColData < pBlockData->nColData; ++iColData) {
×
1747
    SColData *pColData = &pBlockData->aColData[iColData];
×
1748
    if ((pColData->flag & HAS_VALUE) != HAS_VALUE) {
×
1749
      continue;
×
1750
    }
1751

1752
    for (tRow.iRow = pBlockData->nRow - 1; tRow.iRow >= 0; --tRow.iRow) {
×
1753
      STsdbRowKey tsdbRowKey = {0};
×
1754
      tsdbRowGetKey(&tRow, &tsdbRowKey);
×
1755

1756
      uint8_t colType = tColDataGetBitValue(pColData, tRow.iRow);
×
1757
      if (colType == 2) {
×
1758
        SColVal colVal = COL_VAL_NONE(pColData->cid, pColData->type);
×
1759
        TAOS_CHECK_GOTO(tColDataGetValue(pColData, tRow.iRow, &colVal), &lino, _exit);
×
1760

1761
        SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST, .tsdbRowKey = tsdbRowKey, .colVal = colVal};
×
1762
        if (!taosArrayPush(ctxArray, &updateCtx)) {
×
1763
          TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
1764
        }
1765
        break;
×
1766
      }
1767
    }
1768
  }
1769

1770
  // 2. prepare last row
1771
  TAOS_CHECK_GOTO(tsdbRowIterOpen(&iter, &lRow, pTSchema), &lino, _exit);
×
1772
  for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal; pColVal = tsdbRowIterNext(&iter)) {
×
1773
    SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST_ROW, .tsdbRowKey = tsdbRowKey, .colVal = *pColVal};
×
1774
    if (!taosArrayPush(ctxArray, &updateCtx)) {
×
1775
      TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
1776
    }
1777
  }
1778

1779
  TAOS_CHECK_GOTO(tsdbCacheUpdate(pTsdb, suid, uid, ctxArray), &lino, _exit);
×
1780

1781
_exit:
×
1782
  tsdbRowClose(&iter);
×
1783
  taosMemoryFreeClear(pTSchema);
×
1784
  taosArrayDestroy(ctxArray);
×
1785

1786
  TAOS_RETURN(code);
×
1787
}
1788

1789
static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols,
1790
                            int nCols, int16_t *slotIds);
1791

1792
static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols,
1793
                               int nCols, int16_t *slotIds);
1794

1795
static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SArray *remainCols,
131,552✔
1796
                                    SCacheRowsReader *pr, int8_t ltype) {
1797
  int32_t code = 0, lino = 0;
131,552✔
1798
  // rocksdb_writebatch_t *wb = NULL;
1799
  SArray *pTmpColArray = NULL;
131,552✔
1800
  bool    extraTS = false;
131,552✔
1801

1802
  SIdxKey *idxKey = taosArrayGet(remainCols, 0);
131,552✔
1803
  if (idxKey->key.cid != PRIMARYKEY_TIMESTAMP_COL_ID) {
131,552✔
1804
    // ignore 'ts' loaded from cache and load it from tsdb
1805
    // SLastCol *pLastCol = taosArrayGet(pLastArray, 0);
1806
    // tsdbCacheUpdateLastColToNone(pLastCol, TSDB_LAST_CACHE_NO_CACHE);
1807

1808
    SLastKey *key = &(SLastKey){.lflag = ltype, .uid = uid, .cid = PRIMARYKEY_TIMESTAMP_COL_ID};
40,720✔
1809
    if (!taosArrayInsert(remainCols, 0, &(SIdxKey){0, *key})) {
40,720!
1810
      TAOS_RETURN(terrno);
×
1811
    }
1812

1813
    extraTS = true;
40,720✔
1814
  }
1815

1816
  int      num_keys = TARRAY_SIZE(remainCols);
131,552✔
1817
  int16_t *slotIds = taosMemoryMalloc(num_keys * sizeof(int16_t));
131,552!
1818

1819
  int16_t *lastColIds = NULL, *lastSlotIds = NULL, *lastrowColIds = NULL, *lastrowSlotIds = NULL;
131,552✔
1820
  lastColIds = taosMemoryMalloc(num_keys * sizeof(int16_t));
131,552!
1821
  lastSlotIds = taosMemoryMalloc(num_keys * sizeof(int16_t));
131,552!
1822
  lastrowColIds = taosMemoryMalloc(num_keys * sizeof(int16_t));
131,552!
1823
  lastrowSlotIds = taosMemoryMalloc(num_keys * sizeof(int16_t));
131,552!
1824
  SArray *lastTmpColArray = NULL, *lastTmpIndexArray = NULL, *lastrowTmpColArray = NULL, *lastrowTmpIndexArray = NULL;
131,552✔
1825

1826
  int lastIndex = 0;
131,552✔
1827
  int lastrowIndex = 0;
131,552✔
1828

1829
  if (!slotIds || !lastColIds || !lastSlotIds || !lastrowColIds || !lastrowSlotIds) {
131,552!
1830
    TAOS_CHECK_EXIT(terrno);
×
1831
  }
1832

1833
  for (int i = 0; i < num_keys; ++i) {
517,569✔
1834
    SIdxKey *idxKey = taosArrayGet(remainCols, i);
386,017✔
1835
    if (extraTS && !i) {
386,017✔
1836
      slotIds[i] = 0;
40,720✔
1837
    } else {
1838
      slotIds[i] = pr->pSlotIds[idxKey->idx];
345,297✔
1839
    }
1840

1841
    if (IS_LAST_KEY(idxKey->key)) {
386,017✔
1842
      if (NULL == lastTmpIndexArray) {
197,980✔
1843
        lastTmpIndexArray = taosArrayInit(num_keys, sizeof(int32_t));
66,780✔
1844
        if (!lastTmpIndexArray) {
66,780!
1845
          TAOS_CHECK_EXIT(terrno);
×
1846
        }
1847
      }
1848
      if (!taosArrayPush(lastTmpIndexArray, &(i))) {
197,980!
1849
        TAOS_CHECK_EXIT(terrno);
×
1850
      }
1851
      lastColIds[lastIndex] = idxKey->key.cid;
197,980✔
1852
      if (extraTS && !i) {
197,980✔
1853
        lastSlotIds[lastIndex] = 0;
40,720✔
1854
      } else {
1855
        lastSlotIds[lastIndex] = pr->pSlotIds[idxKey->idx];
157,260✔
1856
      }
1857
      lastIndex++;
197,980✔
1858
    } else {
1859
      if (NULL == lastrowTmpIndexArray) {
188,037✔
1860
        lastrowTmpIndexArray = taosArrayInit(num_keys, sizeof(int32_t));
64,772✔
1861
        if (!lastrowTmpIndexArray) {
64,772!
1862
          TAOS_CHECK_EXIT(terrno);
×
1863
        }
1864
      }
1865
      if (!taosArrayPush(lastrowTmpIndexArray, &(i))) {
188,037!
1866
        TAOS_CHECK_EXIT(terrno);
×
1867
      }
1868
      lastrowColIds[lastrowIndex] = idxKey->key.cid;
188,037✔
1869
      if (extraTS && !i) {
188,037!
1870
        lastrowSlotIds[lastrowIndex] = 0;
×
1871
      } else {
1872
        lastrowSlotIds[lastrowIndex] = pr->pSlotIds[idxKey->idx];
188,037✔
1873
      }
1874
      lastrowIndex++;
188,037✔
1875
    }
1876
  }
1877

1878
  pTmpColArray = taosArrayInit(lastIndex + lastrowIndex, sizeof(SLastCol));
131,552✔
1879
  if (!pTmpColArray) {
131,552!
1880
    TAOS_CHECK_EXIT(terrno);
×
1881
  }
1882

1883
  if (lastTmpIndexArray != NULL) {
131,552✔
1884
    TAOS_CHECK_EXIT(mergeLastCid(uid, pTsdb, &lastTmpColArray, pr, lastColIds, lastIndex, lastSlotIds));
66,780!
1885
    for (int i = 0; i < taosArrayGetSize(lastTmpColArray); i++) {
253,860✔
1886
      if (!taosArrayInsert(pTmpColArray, *(int32_t *)taosArrayGet(lastTmpIndexArray, i),
186,332!
1887
                           taosArrayGet(lastTmpColArray, i))) {
187,080✔
1888
        TAOS_CHECK_EXIT(terrno);
×
1889
      }
1890
    }
1891
  }
1892

1893
  if (lastrowTmpIndexArray != NULL) {
131,552✔
1894
    TAOS_CHECK_EXIT(mergeLastRowCid(uid, pTsdb, &lastrowTmpColArray, pr, lastrowColIds, lastrowIndex, lastrowSlotIds));
64,772!
1895
    for (int i = 0; i < taosArrayGetSize(lastrowTmpColArray); i++) {
241,909✔
1896
      if (!taosArrayInsert(pTmpColArray, *(int32_t *)taosArrayGet(lastrowTmpIndexArray, i),
177,137!
1897
                           taosArrayGet(lastrowTmpColArray, i))) {
177,137✔
1898
        TAOS_CHECK_EXIT(terrno);
×
1899
      }
1900
    }
1901
  }
1902

1903
  SLRUCache *pCache = pTsdb->lruCache;
131,552✔
1904
  for (int i = 0; i < num_keys; ++i) {
517,569✔
1905
    SIdxKey  *idxKey = taosArrayGet(remainCols, i);
386,017✔
1906
    SLastCol *pLastCol = NULL;
385,269✔
1907

1908
    if (pTmpColArray && TARRAY_SIZE(pTmpColArray) >= i + 1) {
385,269!
1909
      pLastCol = taosArrayGet(pTmpColArray, i);
363,469✔
1910
    }
1911

1912
    // still null, then make up a none col value
1913
    SLastCol noneCol = {.rowKey.ts = TSKEY_MIN,
385,269✔
1914
                        .colVal = COL_VAL_NONE(idxKey->key.cid, pr->pSchema->columns[slotIds[i]].type),
385,269✔
1915
                        .cacheStatus = TSDB_LAST_CACHE_VALID};
1916
    if (!pLastCol) {
385,269✔
1917
      pLastCol = &noneCol;
21,800✔
1918
    }
1919

1920
    if (!extraTS || i > 0) {
385,269✔
1921
      taosArraySet(pLastArray, idxKey->idx, pLastCol);
345,297✔
1922
    }
1923

1924
    // taosArrayRemove(remainCols, i);
1925

1926
    if (/*!pTmpColArray*/ lastTmpIndexArray && !lastTmpColArray) {
385,269!
1927
      continue;
×
1928
    }
1929
    if (/*!pTmpColArray*/ lastrowTmpIndexArray && !lastrowTmpColArray) {
385,269!
1930
      continue;
×
1931
    }
1932

1933
    // store result back to rocks cache
1934
    code = tsdbCachePutToRocksdb(pTsdb, &idxKey->key, pLastCol);
385,269✔
1935
    if (code) {
384,521!
1936
      tsdbError("vgId:%d, %s failed at line %d since %s.", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
×
1937
      TAOS_CHECK_EXIT(code);
×
1938
    }
1939

1940
    code = tsdbCachePutToLRU(pTsdb, &idxKey->key, pLastCol, 0);
384,521✔
1941
    if (code) {
386,017!
1942
      tsdbError("vgId:%d, %s failed at line %d since %s.", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
×
1943
      TAOS_CHECK_EXIT(code);
×
1944
    }
1945

1946
    if (extraTS && i == 0) {
386,017✔
1947
      tsdbCacheFreeSLastColItem(pLastCol);
40,720✔
1948
    }
1949
  }
1950

1951
  rocksMayWrite(pTsdb, false);
131,552✔
1952

1953
_exit:
131,552✔
1954
  taosArrayDestroy(lastrowTmpIndexArray);
131,552✔
1955
  taosArrayDestroy(lastrowTmpColArray);
131,552✔
1956
  taosArrayDestroy(lastTmpIndexArray);
131,552✔
1957
  taosArrayDestroy(lastTmpColArray);
131,552✔
1958

1959
  taosMemoryFree(lastColIds);
131,552!
1960
  taosMemoryFree(lastSlotIds);
131,552!
1961
  taosMemoryFree(lastrowColIds);
131,552!
1962
  taosMemoryFree(lastrowSlotIds);
131,552!
1963

1964
  taosArrayDestroy(pTmpColArray);
131,552✔
1965

1966
  taosMemoryFree(slotIds);
131,552!
1967

1968
  TAOS_RETURN(code);
131,552✔
1969
}
1970

1971
static int32_t tsdbCacheLoadFromRocks(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SArray *remainCols,
184,826✔
1972
                                      SArray *ignoreFromRocks, SCacheRowsReader *pr, int8_t ltype) {
1973
  int32_t code = 0, lino = 0;
184,826✔
1974
  int     num_keys = TARRAY_SIZE(remainCols);
184,826✔
1975
  char  **keys_list = taosMemoryMalloc(num_keys * sizeof(char *));
184,826!
1976
  size_t *keys_list_sizes = taosMemoryMalloc(num_keys * sizeof(size_t));
184,826!
1977
  char   *key_list = taosMemoryMalloc(num_keys * ROCKS_KEY_LEN);
184,826!
1978
  if (!keys_list || !keys_list_sizes || !key_list) {
184,826!
1979
    taosMemoryFree(keys_list);
×
1980
    taosMemoryFree(keys_list_sizes);
×
1981
    TAOS_RETURN(terrno);
×
1982
  }
1983
  char  **values_list = NULL;
184,826✔
1984
  size_t *values_list_sizes = NULL;
184,826✔
1985
  for (int i = 0; i < num_keys; ++i) {
632,573✔
1986
    memcpy(key_list + i * ROCKS_KEY_LEN, &((SIdxKey *)taosArrayGet(remainCols, i))->key, ROCKS_KEY_LEN);
447,747!
1987
    keys_list[i] = key_list + i * ROCKS_KEY_LEN;
447,747✔
1988
    keys_list_sizes[i] = ROCKS_KEY_LEN;
447,747✔
1989
  }
1990

1991
  rocksMayWrite(pTsdb, true);  // flush writebatch cache
184,826✔
1992

1993
  code = tsdbCacheGetValuesFromRocks(pTsdb, num_keys, (const char *const *)keys_list, keys_list_sizes, &values_list,
184,826✔
1994
                                     &values_list_sizes);
1995
  if (code) {
184,826!
1996
    taosMemoryFree(key_list);
×
1997
    taosMemoryFree(keys_list);
×
1998
    taosMemoryFree(keys_list_sizes);
×
1999
    TAOS_RETURN(code);
×
2000
  }
2001

2002
  SLRUCache *pCache = pTsdb->lruCache;
184,826✔
2003
  for (int i = 0, j = 0; i < num_keys && j < TARRAY_SIZE(remainCols); ++i) {
632,573!
2004
    SLastCol *pLastCol = NULL;
447,747✔
2005
    bool      ignore = ((bool *)TARRAY_DATA(ignoreFromRocks))[i];
447,747!
2006
    if (ignore) {
447,747✔
2007
      ++j;
1,747✔
2008
      continue;
1,747✔
2009
    }
2010

2011
    if (values_list[i] != NULL) {
446,000✔
2012
      code = tsdbCacheDeserialize(values_list[i], values_list_sizes[i], &pLastCol);
102,450✔
2013
      if (code != TSDB_CODE_SUCCESS) {
102,450!
2014
        tsdbError("vgId:%d, %s deserialize failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
2015
                  tstrerror(code));
2016
        goto _exit;
×
2017
      }
2018
    }
2019
    SLastCol *pToFree = pLastCol;
446,000✔
2020
    SIdxKey  *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[j];
446,000✔
2021
    if (pLastCol && pLastCol->cacheStatus != TSDB_LAST_CACHE_NO_CACHE) {
548,450!
2022
      code = tsdbCachePutToLRU(pTsdb, &idxKey->key, pLastCol, 0);
102,450✔
2023
      if (code) {
102,450!
2024
        tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
×
2025
        taosMemoryFreeClear(pToFree);
×
2026
        TAOS_CHECK_EXIT(code);
×
2027
      }
2028

2029
      SLastCol lastCol = *pLastCol;
102,450✔
2030
      code = tsdbCacheReallocSLastCol(&lastCol, NULL);
102,450✔
2031
      if (TSDB_CODE_SUCCESS != code) {
102,450!
2032
        taosMemoryFreeClear(pToFree);
×
2033
        TAOS_CHECK_EXIT(code);
×
2034
      }
2035

2036
      taosArraySet(pLastArray, idxKey->idx, &lastCol);
102,450✔
2037
      taosArrayRemove(remainCols, j);
102,450✔
2038
      taosArrayRemove(ignoreFromRocks, j);
102,450✔
2039
    } else {
2040
      ++j;
343,550✔
2041
    }
2042

2043
    taosMemoryFreeClear(pToFree);
446,000!
2044
  }
2045

2046
  if (TARRAY_SIZE(remainCols) > 0) {
184,826✔
2047
    // tsdbTrace("tsdb/cache: vgId: %d, load %" PRId64 " from raw", TD_VID(pTsdb->pVnode), uid);
2048
    code = tsdbCacheLoadFromRaw(pTsdb, uid, pLastArray, remainCols, pr, ltype);
131,552✔
2049
  }
2050

2051
_exit:
184,826✔
2052
  taosMemoryFree(key_list);
184,826!
2053
  taosMemoryFree(keys_list);
184,826!
2054
  taosMemoryFree(keys_list_sizes);
184,826!
2055
  if (values_list) {
184,826!
2056
#ifdef USE_ROCKSDB
2057
    for (int i = 0; i < num_keys; ++i) {
632,573✔
2058
      rocksdb_free(values_list[i]);
447,747✔
2059
    }
2060
#endif
2061
    taosMemoryFree(values_list);
184,826!
2062
  }
2063
  taosMemoryFree(values_list_sizes);
184,826!
2064

2065
  TAOS_RETURN(code);
184,826✔
2066
}
2067

2068
static int32_t tsdbCacheGetBatchFromLru(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr,
5,141,628✔
2069
                                        int8_t ltype, SArray *keyArray) {
2070
  int32_t    code = 0, lino = 0;
5,141,628✔
2071
  SArray    *remainCols = NULL;
5,141,628✔
2072
  SArray    *ignoreFromRocks = NULL;
5,141,628✔
2073
  SLRUCache *pCache = pTsdb->lruCache;
5,141,628✔
2074
  SArray    *pCidList = pr->pCidList;
5,141,628✔
2075
  int        numKeys = TARRAY_SIZE(pCidList);
5,141,628✔
2076

2077
  for (int i = 0; i < numKeys; ++i) {
16,843,368✔
2078
    int16_t cid = ((int16_t *)TARRAY_DATA(pCidList))[i];
11,703,090✔
2079

2080
    SLastKey key = {.lflag = ltype, .uid = uid, .cid = cid};
11,703,090✔
2081
    // for select last_row, last case
2082
    int32_t funcType = FUNCTION_TYPE_CACHE_LAST;
11,703,659✔
2083
    if (pr->pFuncTypeList != NULL && taosArrayGetSize(pr->pFuncTypeList) > i) {
11,703,659!
2084
      funcType = ((int32_t *)TARRAY_DATA(pr->pFuncTypeList))[i];
×
2085
    }
2086
    if (((pr->type & CACHESCAN_RETRIEVE_LAST) == CACHESCAN_RETRIEVE_LAST) && FUNCTION_TYPE_CACHE_LAST_ROW == funcType) {
11,703,659!
2087
      int8_t tempType = CACHESCAN_RETRIEVE_LAST_ROW | (pr->type ^ CACHESCAN_RETRIEVE_LAST);
×
2088
      key.lflag = (tempType & CACHESCAN_RETRIEVE_LAST) >> 3;
×
2089
    }
2090

2091
    if (!taosArrayPush(keyArray, &key)) {
11,704,408!
2092
      TAOS_CHECK_EXIT(terrno);
×
2093
    }
2094

2095
    LRUHandle *h = taosLRUCacheLookup(pCache, &key, ROCKS_KEY_LEN);
11,704,408✔
2096
    SLastCol  *pLastCol = h ? (SLastCol *)taosLRUCacheValue(pCache, h) : NULL;
11,703,659✔
2097
    if (h && pLastCol->cacheStatus != TSDB_LAST_CACHE_NO_CACHE) {
22,954,441✔
2098
      SLastCol lastCol = *pLastCol;
11,255,163✔
2099
      if (TSDB_CODE_SUCCESS != (code = tsdbCacheReallocSLastCol(&lastCol, NULL))) {
11,255,163!
2100
        tsdbLRUCacheRelease(pCache, h, false);
×
2101
        TAOS_CHECK_GOTO(code, NULL, _exit);
×
2102
      }
2103

2104
      if (taosArrayPush(pLastArray, &lastCol) == NULL) {
11,253,270!
2105
        code = terrno;
×
2106
        tsdbLRUCacheRelease(pCache, h, false);
×
2107
        goto _exit;
×
2108
      }
2109
    } else {
2110
      // no cache or cache is invalid
2111
      SLastCol noneCol = {.rowKey.ts = TSKEY_MIN,
447,146✔
2112
                          .colVal = COL_VAL_NONE(cid, pr->pSchema->columns[pr->pSlotIds[i]].type)};
447,747✔
2113

2114
      if (taosArrayPush(pLastArray, &noneCol) == NULL) {
447,747!
2115
        code = terrno;
×
2116
        tsdbLRUCacheRelease(pCache, h, false);
×
2117
        goto _exit;
×
2118
      }
2119

2120
      if (!remainCols) {
447,747✔
2121
        if ((remainCols = taosArrayInit(numKeys, sizeof(SIdxKey))) == NULL) {
184,826!
2122
          code = terrno;
×
2123
          tsdbLRUCacheRelease(pCache, h, false);
×
2124
          goto _exit;
×
2125
        }
2126
      }
2127
      if (!ignoreFromRocks) {
447,747✔
2128
        if ((ignoreFromRocks = taosArrayInit(numKeys, sizeof(bool))) == NULL) {
184,826!
2129
          code = terrno;
×
2130
          tsdbLRUCacheRelease(pCache, h, false);
×
2131
          goto _exit;
×
2132
        }
2133
      }
2134
      if (taosArrayPush(remainCols, &(SIdxKey){i, key}) == NULL) {
895,494!
2135
        code = terrno;
×
2136
        tsdbLRUCacheRelease(pCache, h, false);
×
2137
        goto _exit;
×
2138
      }
2139
      bool ignoreRocks = pLastCol ? (pLastCol->cacheStatus == TSDB_LAST_CACHE_NO_CACHE) : false;
447,747!
2140
      if (taosArrayPush(ignoreFromRocks, &ignoreRocks) == NULL) {
447,747!
2141
        code = terrno;
×
2142
        tsdbLRUCacheRelease(pCache, h, false);
×
2143
        goto _exit;
×
2144
      }
2145
    }
2146

2147
    if (h) {
11,700,448✔
2148
      tsdbLRUCacheRelease(pCache, h, false);
11,253,369✔
2149
    }
2150
  }
2151

2152
  if (remainCols && TARRAY_SIZE(remainCols) > 0) {
5,140,278!
2153
    (void)taosThreadMutexLock(&pTsdb->lruMutex);
184,826✔
2154

2155
    for (int i = 0; i < TARRAY_SIZE(remainCols);) {
632,573✔
2156
      SIdxKey   *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[i];
447,747✔
2157
      LRUHandle *h = taosLRUCacheLookup(pCache, &idxKey->key, ROCKS_KEY_LEN);
447,747✔
2158
      SLastCol  *pLastCol = h ? (SLastCol *)taosLRUCacheValue(pCache, h) : NULL;
447,747✔
2159
      if (h && pLastCol->cacheStatus != TSDB_LAST_CACHE_NO_CACHE) {
447,747!
2160
        SLastCol lastCol = *pLastCol;
×
2161
        code = tsdbCacheReallocSLastCol(&lastCol, NULL);
×
2162
        if (code) {
×
2163
          tsdbLRUCacheRelease(pCache, h, false);
×
2164
          (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
×
2165
          TAOS_RETURN(code);
×
2166
        }
2167

2168
        taosArraySet(pLastArray, idxKey->idx, &lastCol);
×
2169

2170
        taosArrayRemove(remainCols, i);
×
2171
        taosArrayRemove(ignoreFromRocks, i);
×
2172
      } else {
2173
        // no cache or cache is invalid
2174
        ++i;
447,747✔
2175
      }
2176
      if (h) {
447,747✔
2177
        tsdbLRUCacheRelease(pCache, h, false);
1,747✔
2178
      }
2179
    }
2180

2181
    // tsdbTrace("tsdb/cache: vgId: %d, load %" PRId64 " from rocks", TD_VID(pTsdb->pVnode), uid);
2182
    code = tsdbCacheLoadFromRocks(pTsdb, uid, pLastArray, remainCols, ignoreFromRocks, pr, ltype);
184,826✔
2183

2184
    (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
184,826✔
2185
  }
2186

2187
_exit:
4,955,452✔
2188
  if (remainCols) {
5,139,709✔
2189
    taosArrayDestroy(remainCols);
184,826✔
2190
  }
2191
  if (ignoreFromRocks) {
5,139,709✔
2192
    taosArrayDestroy(ignoreFromRocks);
184,826✔
2193
  }
2194

2195
  TAOS_RETURN(code);
5,139,709✔
2196
}
2197

2198
typedef enum SMEMNEXTROWSTATES {
2199
  SMEMNEXTROW_ENTER,
2200
  SMEMNEXTROW_NEXT,
2201
} SMEMNEXTROWSTATES;
2202

2203
typedef struct SMemNextRowIter {
2204
  SMEMNEXTROWSTATES state;
2205
  STbData          *pMem;  // [input]
2206
  STbDataIter       iter;  // mem buffer skip list iterator
2207
  int64_t           lastTs;
2208
} SMemNextRowIter;
2209

2210
static int32_t getNextRowFromMem(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast, int16_t *aCols,
583,757,709✔
2211
                                 int nCols) {
2212
  SMemNextRowIter *state = (SMemNextRowIter *)iter;
583,757,709✔
2213
  int32_t          code = 0;
583,757,709✔
2214
  *pIgnoreEarlierTs = false;
583,757,709✔
2215
  switch (state->state) {
585,871,811!
2216
    case SMEMNEXTROW_ENTER: {
3,645,438✔
2217
      if (state->pMem != NULL) {
3,645,438✔
2218
        /*
2219
        if (state->pMem->maxKey <= state->lastTs) {
2220
          *ppRow = NULL;
2221
          *pIgnoreEarlierTs = true;
2222

2223
          TAOS_RETURN(code);
2224
        }
2225
        */
2226
        tsdbTbDataIterOpen(state->pMem, NULL, 1, &state->iter);
3,644,689✔
2227

2228
        TSDBROW *pMemRow = tsdbTbDataIterGet(&state->iter);
3,646,188!
2229
        if (pMemRow) {
3,644,838!
2230
          *ppRow = pMemRow;
3,645,586✔
2231
          state->state = SMEMNEXTROW_NEXT;
3,645,586✔
2232

2233
          TAOS_RETURN(code);
3,645,586✔
2234
        }
2235
      }
2236

2237
      *ppRow = NULL;
750✔
2238

2239
      TAOS_RETURN(code);
×
2240
    }
2241
    case SMEMNEXTROW_NEXT:
584,763,500✔
2242
      if (tsdbTbDataIterNext(&state->iter)) {
584,763,500✔
2243
        *ppRow = tsdbTbDataIterGet(&state->iter);
1,168,615,111!
2244

2245
        TAOS_RETURN(code);
584,401,897✔
2246
      } else {
2247
        *ppRow = NULL;
236,923✔
2248

2249
        TAOS_RETURN(code);
236,923✔
2250
      }
2251
    default:
×
2252
      break;
×
2253
  }
2254

2255
_err:
×
2256
  *ppRow = NULL;
×
2257

2258
  TAOS_RETURN(code);
×
2259
}
2260

2261
typedef int32_t (*_next_row_fn_t)(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast, int16_t *aCols,
2262
                                  int nCols);
2263
typedef int32_t (*_next_row_clear_fn_t)(void *iter);
2264

2265
typedef struct {
2266
  TSDBROW             *pRow;
2267
  bool                 stop;
2268
  bool                 next;
2269
  bool                 ignoreEarlierTs;
2270
  void                *iter;
2271
  _next_row_fn_t       nextRowFn;
2272
  _next_row_clear_fn_t nextRowClearFn;
2273
} TsdbNextRowState;
2274

2275
typedef struct {
2276
  SArray           *pMemDelData;
2277
  SArray           *pSkyline;
2278
  int64_t           iSkyline;
2279
  SBlockIdx         idx;
2280
  SMemNextRowIter   memState;
2281
  SMemNextRowIter   imemState;
2282
  TSDBROW           memRow, imemRow;
2283
  TsdbNextRowState  input[2];
2284
  SCacheRowsReader *pr;
2285
  STsdb            *pTsdb;
2286
} MemNextRowIter;
2287

2288
static int32_t memRowIterOpen(MemNextRowIter *pIter, tb_uid_t uid, STsdb *pTsdb, STSchema *pTSchema, tb_uid_t suid,
5,138,960✔
2289
                              STsdbReadSnap *pReadSnap, SCacheRowsReader *pr) {
2290
  int32_t code = 0, lino = 0;
5,138,960✔
2291

2292
  STbData *pMem = NULL;
5,139,529✔
2293
  if (pReadSnap->pMem) {
5,139,529!
2294
    pMem = tsdbGetTbDataFromMemTable(pReadSnap->pMem, suid, uid);
5,140,879✔
2295
  }
2296

2297
  STbData *pIMem = NULL;
5,140,278✔
2298
  if (pReadSnap->pIMem) {
5,140,278✔
2299
    pIMem = tsdbGetTbDataFromMemTable(pReadSnap->pIMem, suid, uid);
1,588✔
2300
  }
2301

2302
  pIter->pTsdb = pTsdb;
5,141,628✔
2303

2304
  pIter->pMemDelData = NULL;
5,141,085✔
2305

2306
  TAOS_CHECK_GOTO(loadMemTombData(&pIter->pMemDelData, pMem, pIMem, pr->info.verRange.maxVer), &lino, _exit);
5,141,085!
2307

2308
  pIter->idx = (SBlockIdx){.suid = suid, .uid = uid};
5,141,059✔
2309

2310
  pIter->input[0] = (TsdbNextRowState){&pIter->memRow, true, false, false, &pIter->memState, getNextRowFromMem, NULL};
5,140,310✔
2311
  pIter->input[1] = (TsdbNextRowState){&pIter->imemRow, true, false, false, &pIter->imemState, getNextRowFromMem, NULL};
5,140,130✔
2312

2313
  if (pMem) {
5,138,032✔
2314
    pIter->memState.pMem = pMem;
3,564,323✔
2315
    pIter->memState.state = SMEMNEXTROW_ENTER;
3,565,072✔
2316
    pIter->input[0].stop = false;
3,564,323✔
2317
    pIter->input[0].next = true;
3,563,575✔
2318
  }
2319

2320
  if (pIMem) {
5,138,781✔
2321
    pIter->imemState.pMem = pIMem;
1,588✔
2322
    pIter->imemState.state = SMEMNEXTROW_ENTER;
1,588✔
2323
    pIter->input[1].stop = false;
1,588✔
2324
    pIter->input[1].next = true;
1,588✔
2325
  }
2326

2327
  pIter->pr = pr;
5,138,781✔
2328

2329
_exit:
5,140,131✔
2330
  if (code) {
5,140,131!
2331
    tsdbError("tsdb/cache: %s failed at line %d since %s.", __func__, lino, tstrerror(code));
×
2332
  }
2333

2334
  TAOS_RETURN(code);
5,140,131✔
2335
}
2336

2337
static void memRowIterClose(MemNextRowIter *pIter) {
5,139,588✔
2338
  for (int i = 0; i < 2; ++i) {
15,418,221✔
2339
    if (pIter->input[i].nextRowClearFn) {
10,280,131!
2340
      (void)pIter->input[i].nextRowClearFn(pIter->input[i].iter);
×
2341
    }
2342
  }
2343

2344
  if (pIter->pSkyline) {
5,138,090✔
2345
    taosArrayDestroy(pIter->pSkyline);
3,564,324✔
2346
  }
2347

2348
  if (pIter->pMemDelData) {
5,139,587!
2349
    taosArrayDestroy(pIter->pMemDelData);
5,139,793✔
2350
  }
2351
}
5,140,879✔
2352

2353
static void freeTableInfoFunc(void *param) {
3,611,837✔
2354
  void **p = (void **)param;
3,611,837✔
2355
  taosMemoryFreeClear(*p);
3,611,837!
2356
}
3,611,089✔
2357

2358
static STableLoadInfo *getTableLoadInfo(SCacheRowsReader *pReader, uint64_t uid) {
3,737,520✔
2359
  if (!pReader->pTableMap) {
3,737,520✔
2360
    pReader->pTableMap = tSimpleHashInit(pReader->numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
1,923,151✔
2361
    if (!pReader->pTableMap) {
1,925,397!
2362
      return NULL;
×
2363
    }
2364

2365
    tSimpleHashSetFreeFp(pReader->pTableMap, freeTableInfoFunc);
1,924,648✔
2366
  }
2367

2368
  STableLoadInfo  *pInfo = NULL;
3,736,170✔
2369
  STableLoadInfo **ppInfo = tSimpleHashGet(pReader->pTableMap, &uid, sizeof(uid));
3,733,176✔
2370
  if (!ppInfo) {
3,735,275✔
2371
    pInfo = taosMemoryCalloc(1, sizeof(STableLoadInfo));
3,610,161!
2372
    if (pInfo) {
3,612,406!
2373
      if (tSimpleHashPut(pReader->pTableMap, &uid, sizeof(uint64_t), &pInfo, POINTER_BYTES)) {
3,612,406!
2374
        return NULL;
×
2375
      }
2376
    }
2377

2378
    return pInfo;
3,609,559✔
2379
  }
2380

2381
  return *ppInfo;
125,114✔
2382
}
2383

2384
static TSDBROW *memRowIterGet(MemNextRowIter *pIter, bool isLast, int16_t *aCols, int nCols) {
578,165,450✔
2385
  int32_t code = 0, lino = 0;
578,165,450✔
2386

2387
  for (;;) {
2,353,813✔
2388
    for (int i = 0; i < 2; ++i) {
1,760,267,905✔
2389
      if (pIter->input[i].next && !pIter->input[i].stop) {
1,168,484,237!
2390
        TAOS_CHECK_GOTO(pIter->input[i].nextRowFn(pIter->input[i].iter, &pIter->input[i].pRow,
590,556,950!
2391
                                                  &pIter->input[i].ignoreEarlierTs, isLast, aCols, nCols),
2392
                        &lino, _exit);
2393

2394
        if (pIter->input[i].pRow == NULL) {
587,850,001✔
2395
          pIter->input[i].stop = true;
227,333✔
2396
          pIter->input[i].next = false;
227,333✔
2397
        }
2398
      }
2399
    }
2400

2401
    if (pIter->input[0].stop && pIter->input[1].stop) {
591,783,668!
2402
      return NULL;
1,803,140✔
2403
    }
2404

2405
    TSDBROW *max[2] = {0};
587,740,553✔
2406
    int      iMax[2] = {-1, -1};
583,114,728✔
2407
    int      nMax = 0;
588,187,942✔
2408
    SRowKey  maxKey = {.ts = TSKEY_MIN};
588,187,942✔
2409

2410
    for (int i = 0; i < 2; ++i) {
1,767,770,790✔
2411
      if (!pIter->input[i].stop && pIter->input[i].pRow != NULL) {
1,175,634,438!
2412
        STsdbRowKey tsdbRowKey = {0};
590,204,632✔
2413
        tsdbRowGetKey(pIter->input[i].pRow, &tsdbRowKey);
590,984,410✔
2414

2415
        // merging & deduplicating on client side
2416
        int c = tRowKeyCompare(&maxKey, &tsdbRowKey.key);
588,177,804✔
2417
        if (c <= 0) {
587,931,609✔
2418
          if (c < 0) {
588,804,947✔
2419
            nMax = 0;
587,475,517✔
2420
            maxKey = tsdbRowKey.key;
587,475,517✔
2421
          }
2422

2423
          iMax[nMax] = i;
588,804,947✔
2424
          max[nMax++] = pIter->input[i].pRow;
590,608,667✔
2425
        }
2426
        pIter->input[i].next = false;
590,531,823✔
2427
      }
2428
    }
2429

2430
    TSDBROW *merge[2] = {0};
592,136,352✔
2431
    int      iMerge[2] = {-1, -1};
592,255,539✔
2432
    int      nMerge = 0;
584,370,184✔
2433
    for (int i = 0; i < nMax; ++i) {
1,174,462,901✔
2434
      TSDBKEY maxKey1 = TSDBROW_KEY(max[i]);
584,089,865!
2435

2436
      if (!pIter->pSkyline) {
584,454,547✔
2437
        pIter->pSkyline = taosArrayInit(32, sizeof(TSDBKEY));
3,565,072✔
2438
        TSDB_CHECK_NULL(pIter->pSkyline, code, lino, _exit, terrno);
3,565,821!
2439

2440
        uint64_t        uid = pIter->idx.uid;
3,565,821✔
2441
        STableLoadInfo *pInfo = getTableLoadInfo(pIter->pr, uid);
3,565,821✔
2442
        TSDB_CHECK_NULL(pInfo, code, lino, _exit, TSDB_CODE_OUT_OF_MEMORY);
3,564,471!
2443

2444
        if (pInfo->pTombData == NULL) {
3,564,471✔
2445
          pInfo->pTombData = taosArrayInit(4, sizeof(SDelData));
3,484,944✔
2446
          TSDB_CHECK_NULL(pInfo->pTombData, code, lino, _exit, terrno);
3,483,446!
2447
        }
2448

2449
        if (!taosArrayAddAll(pInfo->pTombData, pIter->pMemDelData)) {
3,562,973!
2450
          TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
2451
        }
2452

2453
        size_t delSize = TARRAY_SIZE(pInfo->pTombData);
3,565,821✔
2454
        if (delSize > 0) {
3,565,821✔
2455
          code = tsdbBuildDeleteSkyline(pInfo->pTombData, 0, (int32_t)(delSize - 1), pIter->pSkyline);
32,637✔
2456
          TAOS_CHECK_GOTO(code, &lino, _exit);
32,637!
2457
        }
2458
        pIter->iSkyline = taosArrayGetSize(pIter->pSkyline) - 1;
3,565,821✔
2459
      }
2460

2461
      bool deleted = tsdbKeyDeleted(&maxKey1, pIter->pSkyline, &pIter->iSkyline);
585,938,742✔
2462
      if (!deleted) {
586,166,241✔
2463
        iMerge[nMerge] = iMax[i];
586,886,482✔
2464
        merge[nMerge++] = max[i];
587,632,790✔
2465
      }
2466

2467
      pIter->input[iMax[i]].next = deleted;
588,401,437✔
2468
    }
2469

2470
    if (nMerge > 0) {
590,373,036✔
2471
      pIter->input[iMerge[0]].next = true;
586,875,920✔
2472

2473
      return merge[0];
591,346,329✔
2474
    }
2475
  }
2476

2477
_exit:
×
2478
  if (code) {
×
2479
    tsdbError("tsdb/cache: %s failed at line %d since %s.", __func__, lino, tstrerror(code));
×
2480
  }
2481

2482
  return NULL;
×
2483
}
2484

2485
static int32_t cloneTSchema(STSchema *pSrc, STSchema **ppDst) {
1,531,143✔
2486
  int32_t len = sizeof(STSchema) + sizeof(STColumn) * pSrc->numOfCols;
1,531,143✔
2487
  *ppDst = taosMemoryMalloc(len);
1,531,143!
2488
  if (NULL == *ppDst) {
1,531,143!
2489
    TAOS_RETURN(terrno);
×
2490
  }
2491
  memcpy(*ppDst, pSrc, len);
1,531,143!
2492

2493
  TAOS_RETURN(TSDB_CODE_SUCCESS);
1,531,143✔
2494
}
2495

2496
static int32_t updateTSchema(int32_t sversion, SCacheRowsReader *pReader, uint64_t uid) {
580,568,001✔
2497
  if (NULL == pReader->pCurrSchema && sversion == pReader->pSchema->version) {
580,568,001✔
2498
    TAOS_RETURN(cloneTSchema(pReader->pSchema, &pReader->pCurrSchema));
1,531,143✔
2499
  }
2500

2501
  if (NULL != pReader->pCurrSchema && sversion == pReader->pCurrSchema->version) {
582,272,485✔
2502
    TAOS_RETURN(TSDB_CODE_SUCCESS);
586,922,311✔
2503
  }
2504

2505
  taosMemoryFreeClear(pReader->pCurrSchema);
125,727!
2506
  TAOS_RETURN(
368,124✔
2507
      metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->info.suid, uid, sversion, &pReader->pCurrSchema));
2508
}
2509

2510
static int32_t tsdbCacheGetBatchFromMem(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr,
5,138,960✔
2511
                                        SArray *keyArray) {
2512
  int32_t        code = 0;
5,138,960✔
2513
  int32_t        lino = 0;
5,138,960✔
2514
  STSchema      *pTSchema = pr->pSchema;
5,138,960✔
2515
  SLRUCache     *pCache = pTsdb->lruCache;
5,138,780✔
2516
  SArray        *pCidList = pr->pCidList;
5,140,130✔
2517
  int            numKeys = TARRAY_SIZE(pCidList);
5,139,709✔
2518
  MemNextRowIter iter = {0};
5,138,960✔
2519
  SSHashObj     *iColHash = NULL;
5,138,960✔
2520
  STSDBRowIter   rowIter = {0};
5,138,960✔
2521

2522
  // 1, get from mem, imem filtered with delete info
2523
  TAOS_CHECK_EXIT(memRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->info.suid, pr->pReadSnap, pr));
5,138,960!
2524

2525
  TSDBROW *pRow = memRowIterGet(&iter, false, NULL, 0);
5,139,382✔
2526
  if (!pRow) {
5,139,735✔
2527
    goto _exit;
1,575,264✔
2528
  }
2529

2530
  int32_t sversion = TSDBROW_SVERSION(pRow);
3,564,471!
2531
  if (sversion != -1) {
3,563,722!
2532
    TAOS_CHECK_EXIT(updateTSchema(sversion, pr, uid));
3,565,821!
2533

2534
    pTSchema = pr->pCurrSchema;
3,564,324✔
2535
  }
2536
  int32_t nCol = pTSchema->numOfCols;
3,561,476✔
2537

2538
  STsdbRowKey rowKey = {0};
3,565,821✔
2539
  tsdbRowGetKey(pRow, &rowKey);
3,565,821✔
2540

2541
  TAOS_CHECK_EXIT(tsdbRowIterOpen(&rowIter, pRow, pTSchema));
3,565,821!
2542

2543
  int32_t iCol = 0, jCol = 0, jnCol = TARRAY_SIZE(pLastArray);
3,565,821✔
2544
  for (SColVal *pColVal = tsdbRowIterNext(&rowIter); pColVal && iCol < nCol && jCol < jnCol;) {
31,427,749!
2545
    SLastCol *pTargetCol = &((SLastCol *)TARRAY_DATA(pLastArray))[jCol];
27,861,511✔
2546
    if (pColVal->cid < pTargetCol->colVal.cid) {
27,858,696✔
2547
      pColVal = tsdbRowIterNext(&rowIter), ++iCol;
20,455,994✔
2548

2549
      continue;
20,455,245✔
2550
    }
2551
    if (pColVal->cid > pTargetCol->colVal.cid) {
7,404,767!
2552
      break;
×
2553
    }
2554

2555
    int32_t cmp_res = tRowKeyCompare(&pTargetCol->rowKey, &rowKey.key);
7,403,597✔
2556
    if (!IS_LAST_KEY(((SLastKey *)TARRAY_DATA(keyArray))[jCol])) {
7,403,120✔
2557
      if (cmp_res < 0 || (cmp_res == 0 && !COL_VAL_IS_NONE(pColVal))) {
2,080,079!
2558
        SLastCol lastCol = {.rowKey = rowKey.key, .colVal = *pColVal, .dirty = 1, .cacheStatus = TSDB_LAST_CACHE_VALID};
2,080,079✔
2559
        TAOS_CHECK_EXIT(tsdbCacheReallocSLastCol(&lastCol, NULL));
2,077,981!
2560

2561
        tsdbCacheFreeSLastColItem(pTargetCol);
2,078,729✔
2562
        taosArraySet(pLastArray, jCol, &lastCol);
2,080,079✔
2563
      }
2564
    } else {
2565
      if (COL_VAL_IS_VALUE(pColVal)) {
5,323,040✔
2566
        if (cmp_res <= 0) {
4,658,111!
2567
          SLastCol lastCol = {
4,659,039✔
2568
              .rowKey = rowKey.key, .colVal = *pColVal, .dirty = 1, .cacheStatus = TSDB_LAST_CACHE_VALID};
2569
          TAOS_CHECK_EXIT(tsdbCacheReallocSLastCol(&lastCol, NULL));
4,656,463!
2570

2571
          tsdbCacheFreeSLastColItem(pTargetCol);
4,658,291✔
2572
          taosArraySet(pLastArray, jCol, &lastCol);
4,657,541✔
2573
        }
2574
      } else {
2575
        if (!iColHash) {
665,829✔
2576
          iColHash = tSimpleHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT));
402,193✔
2577
          if (iColHash == NULL) {
402,193!
2578
            TAOS_CHECK_EXIT(terrno);
×
2579
          }
2580
        }
2581

2582
        if (tSimpleHashPut(iColHash, &pColVal->cid, sizeof(pColVal->cid), &jCol, sizeof(jCol))) {
665,829!
2583
          TAOS_CHECK_EXIT(terrno);
×
2584
        }
2585
      }
2586
    }
2587

2588
    ++jCol;
7,404,199✔
2589

2590
    if (jCol < jnCol && ((SLastCol *)TARRAY_DATA(pLastArray))[jCol].colVal.cid > pColVal->cid) {
7,404,199✔
2591
      pColVal = tsdbRowIterNext(&rowIter), ++iCol;
4,585,549✔
2592
    }
2593
  }
2594
  tsdbRowClose(&rowIter);
3,564,503✔
2595

2596
  if (iColHash && tSimpleHashGetSize(iColHash) > 0) {
3,564,323!
2597
    pRow = memRowIterGet(&iter, false, NULL, 0);
402,193✔
2598
    while (pRow) {
587,032,406✔
2599
      if (tSimpleHashGetSize(iColHash) == 0) {
586,805,073✔
2600
        break;
174,860✔
2601
      }
2602

2603
      sversion = TSDBROW_SVERSION(pRow);
585,592,620!
2604
      if (sversion != -1) {
587,415,754✔
2605
        TAOS_CHECK_EXIT(updateTSchema(sversion, pr, uid));
585,094,901!
2606

2607
        pTSchema = pr->pCurrSchema;
583,724,967✔
2608
      }
2609
      nCol = pTSchema->numOfCols;
587,058,571✔
2610

2611
      STsdbRowKey tsdbRowKey = {0};
584,874,621✔
2612
      tsdbRowGetKey(pRow, &tsdbRowKey);
585,434,769✔
2613

2614
      TAOS_CHECK_EXIT(tsdbRowIterOpen(&rowIter, pRow, pTSchema));
586,143,569!
2615

2616
      iCol = 0;
574,036,874✔
2617
      for (SColVal *pColVal = tsdbRowIterNext(&rowIter); pColVal && iCol < nCol;
2,147,483,647!
2618
           pColVal = tsdbRowIterNext(&rowIter), iCol++) {
2,147,483,647✔
2619
        int32_t *pjCol = tSimpleHashGet(iColHash, &pColVal->cid, sizeof(pColVal->cid));
2,147,483,647✔
2620
        if (pjCol && COL_VAL_IS_VALUE(pColVal)) {
2,147,483,647✔
2621
          SLastCol *pTargetCol = &((SLastCol *)TARRAY_DATA(pLastArray))[*pjCol];
428,942✔
2622

2623
          int32_t cmp_res = tRowKeyCompare(&pTargetCol->rowKey, &tsdbRowKey.key);
428,942✔
2624
          if (cmp_res <= 0) {
428,942!
2625
            SLastCol lastCol = {
428,942✔
2626
                .rowKey = tsdbRowKey.key, .colVal = *pColVal, .dirty = 1, .cacheStatus = TSDB_LAST_CACHE_VALID};
2627
            TAOS_CHECK_EXIT(tsdbCacheReallocSLastCol(&lastCol, NULL));
428,942!
2628

2629
            tsdbCacheFreeSLastColItem(pTargetCol);
428,942✔
2630
            taosArraySet(pLastArray, *pjCol, &lastCol);
428,942✔
2631
          }
2632

2633
          TAOS_CHECK_EXIT(tSimpleHashRemove(iColHash, &pColVal->cid, sizeof(pColVal->cid)));
428,942!
2634
        }
2635
      }
2636
      tsdbRowClose(&rowIter);
493,048,969✔
2637

2638
      pRow = memRowIterGet(&iter, false, NULL, 0);
574,086,167✔
2639
    }
2640
  }
2641

2642
_exit:
3,114,559✔
2643
  if (code) {
5,141,628!
2644
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(pTsdb->pVnode), __func__, __FILE__, lino, tstrerror(code));
×
2645

2646
    tsdbRowClose(&rowIter);
×
2647
  }
2648

2649
  tSimpleHashCleanup(iColHash);
5,141,628✔
2650

2651
  memRowIterClose(&iter);
5,139,588✔
2652

2653
  TAOS_RETURN(code);
5,141,628✔
2654
}
2655

2656
int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, int8_t ltype) {
5,140,336✔
2657
  int32_t code = 0;
5,140,336✔
2658
  int32_t lino = 0;
5,140,336✔
2659

2660
  SArray *keyArray = taosArrayInit(16, sizeof(SLastKey));
5,140,336✔
2661
  if (!keyArray) {
5,140,336!
2662
    TAOS_CHECK_EXIT(terrno);
×
2663
  }
2664

2665
  TAOS_CHECK_EXIT(tsdbCacheGetBatchFromLru(pTsdb, uid, pLastArray, pr, ltype, keyArray));
5,140,336!
2666

2667
  if (tsUpdateCacheBatch) {
5,139,709!
2668
    TAOS_CHECK_EXIT(tsdbCacheGetBatchFromMem(pTsdb, uid, pLastArray, pr, keyArray));
5,136,994!
2669
  }
2670

2671
_exit:
5,144,343✔
2672
  if (code) {
5,144,343!
2673
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(pTsdb->pVnode), __func__, __FILE__, lino, tstrerror(code));
×
2674
  }
2675

2676
  if (keyArray) {
5,140,336!
2677
    taosArrayDestroy(keyArray);
5,140,336✔
2678
  }
2679

2680
  TAOS_RETURN(code);
5,141,628✔
2681
}
2682

2683
int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey) {
6,990,230✔
2684
  int32_t   code = 0, lino = 0;
6,990,230✔
2685
  STSchema *pTSchema = NULL;
6,990,230✔
2686
  int       sver = -1;
6,990,230✔
2687
  int       numKeys = 0;
6,990,230✔
2688
  SArray   *remainCols = NULL;
6,990,230✔
2689

2690
  TAOS_CHECK_RETURN(metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pTSchema));
6,990,230!
2691

2692
  int numCols = pTSchema->numOfCols;
6,990,230✔
2693

2694
  (void)taosThreadMutexLock(&pTsdb->lruMutex);
6,991,076✔
2695

2696
  for (int i = 0; i < numCols; ++i) {
43,167,483✔
2697
    int16_t cid = pTSchema->columns[i].colId;
36,176,407✔
2698
    for (int8_t lflag = LFLAG_LAST_ROW; lflag <= LFLAG_LAST; ++lflag) {
108,527,283✔
2699
      SLastKey   lastKey = {.lflag = lflag, .uid = uid, .cid = cid};
72,350,876✔
2700
      LRUHandle *h = taosLRUCacheLookup(pTsdb->lruCache, &lastKey, ROCKS_KEY_LEN);
72,350,876✔
2701
      if (h) {
72,351,968✔
2702
        SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pTsdb->lruCache, h);
213,544✔
2703
        if (pLastCol->rowKey.ts <= eKey && pLastCol->rowKey.ts >= sKey) {
213,544!
2704
          SLastCol noneCol = {.rowKey.ts = TSKEY_MIN,
6,988✔
2705
                              .colVal = COL_VAL_NONE(cid, pTSchema->columns[i].type),
6,988✔
2706
                              .dirty = 1,
2707
                              .cacheStatus = TSDB_LAST_CACHE_NO_CACHE};
2708
          code = tsdbCachePutToLRU(pTsdb, &lastKey, &noneCol, 1);
6,988✔
2709
        }
2710
        tsdbLRUCacheRelease(pTsdb->lruCache, h, false);
213,544✔
2711
        TAOS_CHECK_EXIT(code);
213,544!
2712
      } else {
2713
        if (!remainCols) {
72,138,424✔
2714
          remainCols = taosArrayInit(numCols * 2, sizeof(SIdxKey));
6,963,625✔
2715
        }
2716
        if (!taosArrayPush(remainCols, &(SIdxKey){i, lastKey})) {
144,276,848!
2717
          TAOS_CHECK_EXIT(terrno);
×
2718
        }
2719
      }
2720
    }
2721
  }
2722

2723
  if (remainCols) {
6,991,076✔
2724
    numKeys = TARRAY_SIZE(remainCols);
6,963,625✔
2725
  }
2726

2727
  char  **keys_list = taosMemoryCalloc(numKeys, sizeof(char *));
6,991,076!
2728
  size_t *keys_list_sizes = taosMemoryCalloc(numKeys, sizeof(size_t));
6,991,076!
2729
  char  **values_list = NULL;
6,989,984✔
2730
  size_t *values_list_sizes = NULL;
6,989,984✔
2731

2732
  if (!keys_list || !keys_list_sizes) {
6,989,984!
2733
    code = terrno;
×
2734
    goto _exit;
×
2735
  }
2736
  const size_t klen = ROCKS_KEY_LEN;
6,989,984✔
2737

2738
  for (int i = 0; i < numKeys; ++i) {
79,129,254✔
2739
    char *key = taosMemoryCalloc(1, sizeof(SLastKey));
72,138,178!
2740
    if (!key) {
72,136,240!
2741
      code = terrno;
×
2742
      goto _exit;
×
2743
    }
2744
    SIdxKey *idxKey = taosArrayGet(remainCols, i);
72,136,240✔
2745

2746
    ((SLastKey *)key)[0] = idxKey->key;
72,136,240✔
2747

2748
    keys_list[i] = key;
72,136,240✔
2749
    keys_list_sizes[i] = klen;
72,138,178✔
2750
  }
2751

2752
  rocksMayWrite(pTsdb, true);  // flush writebatch cache
6,991,076✔
2753

2754
  TAOS_CHECK_GOTO(tsdbCacheGetValuesFromRocks(pTsdb, numKeys, (const char *const *)keys_list, keys_list_sizes,
6,991,076!
2755
                                              &values_list, &values_list_sizes),
2756
                  NULL, _exit);
2757

2758
  // rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
2759
  for (int i = 0; i < numKeys; ++i) {
79,115,058✔
2760
    SLastCol *pLastCol = NULL;
72,123,982✔
2761
    if (values_list[i] != NULL) {
72,122,890!
2762
      code = tsdbCacheDeserialize(values_list[i], values_list_sizes[i], &pLastCol);
×
2763
      if (code != TSDB_CODE_SUCCESS) {
×
2764
        tsdbError("vgId:%d, %s deserialize failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
2765
                  tstrerror(code));
2766
        goto _exit;
×
2767
      }
2768
    }
2769
    SIdxKey  *idxKey = taosArrayGet(remainCols, i);
72,123,982✔
2770
    SLastKey *pLastKey = &idxKey->key;
72,134,902✔
2771
    if (NULL != pLastCol && (pLastCol->rowKey.ts <= eKey && pLastCol->rowKey.ts >= sKey)) {
72,134,902!
2772
      SLastCol noCacheCol = {.rowKey.ts = TSKEY_MIN,
×
2773
                             .colVal = COL_VAL_NONE(pLastKey->cid, pTSchema->columns[idxKey->idx].type),
×
2774
                             .dirty = 0,
2775
                             .cacheStatus = TSDB_LAST_CACHE_NO_CACHE};
2776

2777
      if ((code = tsdbCachePutToRocksdb(pTsdb, pLastKey, &noCacheCol)) != TSDB_CODE_SUCCESS) {
×
2778
        taosMemoryFreeClear(pLastCol);
×
2779
        tsdbError("tsdb/cache/del: vgId:%d, put to rocks failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code));
×
2780
        goto _exit;
×
2781
      }
2782
      if ((code = tsdbCachePutToLRU(pTsdb, pLastKey, &noCacheCol, 0)) != TSDB_CODE_SUCCESS) {
×
2783
        taosMemoryFreeClear(pLastCol);
×
2784
        tsdbError("tsdb/cache/del: vgId:%d, put to lru failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code));
×
2785
        goto _exit;
×
2786
      }
2787
    }
2788

2789
    if (pLastCol == NULL) {
72,134,902✔
2790
      tsdbDebug("tsdb/cache/del: vgId:%d, no cache found for uid:%d ,cid:%" PRId64 ", lflag:%d.", TD_VID(pTsdb->pVnode),
72,123,982✔
2791
                pLastKey->cid, pLastKey->uid, pLastKey->lflag);
2792
    }
2793

2794
    taosMemoryFreeClear(pLastCol);
72,123,982!
2795
  }
2796

2797
  rocksMayWrite(pTsdb, false);
6,991,076✔
2798

2799
_exit:
6,991,076✔
2800
  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
6,991,076✔
2801

2802
  for (int i = 0; i < numKeys; ++i) {
79,121,610✔
2803
    taosMemoryFree(keys_list[i]);
72,130,534!
2804
  }
2805
  taosMemoryFree(keys_list);
6,991,076!
2806
  taosMemoryFree(keys_list_sizes);
6,991,076!
2807
  if (values_list) {
6,991,076!
2808
#if USE_ROCKSDB
2809
    for (int i = 0; i < numKeys; ++i) {
79,116,150✔
2810
      rocksdb_free(values_list[i]);
72,125,074✔
2811
    }
2812
#endif
2813
    taosMemoryFree(values_list);
6,991,076!
2814
  }
2815
  taosMemoryFree(values_list_sizes);
6,991,076!
2816
  taosArrayDestroy(remainCols);
6,991,076✔
2817
  taosMemoryFree(pTSchema);
6,991,076!
2818

2819
  TAOS_RETURN(code);
6,991,076✔
2820
}
2821

2822
int32_t tsdbOpenCache(STsdb *pTsdb) {
12,347,818✔
2823
  int32_t code = 0, lino = 0;
12,347,818✔
2824
  size_t  cfgCapacity = (size_t)pTsdb->pVnode->config.cacheLastSize * 1024 * 1024;
12,360,576✔
2825

2826
  SLRUCache *pCache = taosLRUCacheInit(cfgCapacity, 0, .5);
12,360,576✔
2827
  if (pCache == NULL) {
12,360,576!
2828
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err);
×
2829
  }
2830

2831
#ifdef USE_SHARED_STORAGE
2832
  if (tsSsEnabled) {
12,360,576!
2833
    TAOS_CHECK_GOTO(tsdbOpenBCache(pTsdb), &lino, _err);
×
2834
    TAOS_CHECK_GOTO(tsdbOpenPgCache(pTsdb), &lino, _err);
×
2835
  }
2836
#endif
2837

2838
  TAOS_CHECK_GOTO(tsdbOpenRocksCache(pTsdb), &lino, _err);
12,360,576!
2839

2840
  taosLRUCacheSetStrictCapacity(pCache, false);
12,360,576✔
2841

2842
  (void)taosThreadMutexInit(&pTsdb->lruMutex, NULL);
12,358,994✔
2843

2844
_err:
12,355,018✔
2845
  if (code) {
12,355,018!
2846
    tsdbError("tsdb/cache: vgId:%d, open failed at line %d since %s.", TD_VID(pTsdb->pVnode), lino, tstrerror(code));
×
2847
  }
2848

2849
  pTsdb->lruCache = pCache;
12,355,018✔
2850

2851
  TAOS_RETURN(code);
12,359,335✔
2852
}
2853

2854
void tsdbCloseCache(STsdb *pTsdb) {
12,357,331✔
2855
  SLRUCache *pCache = pTsdb->lruCache;
12,357,331✔
2856
  if (pCache) {
12,360,576✔
2857
    taosLRUCacheEraseUnrefEntries(pCache);
12,359,612✔
2858

2859
    taosLRUCacheCleanup(pCache);
12,358,923✔
2860

2861
    (void)taosThreadMutexDestroy(&pTsdb->lruMutex);
12,360,576✔
2862
  }
2863

2864
#ifdef USE_SHARED_STORAGE
2865
  if (tsSsEnabled) {
12,360,576!
2866
    tsdbCloseBCache(pTsdb);
×
2867
    tsdbClosePgCache(pTsdb);
×
2868
  }
2869
#endif
2870

2871
  tsdbCloseRocksCache(pTsdb);
12,360,576✔
2872
}
12,359,720✔
2873

2874
static void getTableCacheKey(tb_uid_t uid, int cacheType, char *key, int *len) {
×
2875
  if (cacheType == 0) {  // last_row
×
2876
    *(uint64_t *)key = (uint64_t)uid;
×
2877
  } else {  // last
2878
    *(uint64_t *)key = ((uint64_t)uid) | 0x8000000000000000;
×
2879
  }
2880

2881
  *len = sizeof(uint64_t);
×
2882
}
×
2883

2884
static tb_uid_t getTableSuidByUid(tb_uid_t uid, STsdb *pTsdb) {
×
2885
  tb_uid_t suid = 0;
×
2886

2887
  SMetaReader mr = {0};
×
2888
  metaReaderDoInit(&mr, pTsdb->pVnode->pMeta, META_READER_LOCK);
×
2889
  if (metaReaderGetTableEntryByUidCache(&mr, uid) < 0) {
×
2890
    metaReaderClear(&mr);  // table not esist
×
2891
    return 0;
×
2892
  }
2893

2894
  if (mr.me.type == TSDB_CHILD_TABLE) {
×
2895
    suid = mr.me.ctbEntry.suid;
×
2896
  } else if (mr.me.type == TSDB_NORMAL_TABLE) {
×
2897
    suid = 0;
×
2898
  } else {
2899
    suid = 0;
×
2900
  }
2901

2902
  metaReaderClear(&mr);
×
2903

2904
  return suid;
×
2905
}
2906

2907
static int32_t getTableDelDataFromDelIdx(SDelFReader *pDelReader, SDelIdx *pDelIdx, SArray *aDelData) {
×
2908
  int32_t code = 0;
×
2909

2910
  if (pDelIdx) {
×
2911
    code = tsdbReadDelDatav1(pDelReader, pDelIdx, aDelData, INT64_MAX);
×
2912
  }
2913

2914
  TAOS_RETURN(code);
×
2915
}
2916

2917
static int32_t getTableDelDataFromTbData(STbData *pTbData, SArray *aDelData) {
×
2918
  int32_t   code = 0;
×
2919
  SDelData *pDelData = pTbData ? pTbData->pHead : NULL;
×
2920

2921
  for (; pDelData; pDelData = pDelData->pNext) {
×
2922
    if (!taosArrayPush(aDelData, pDelData)) {
×
2923
      TAOS_RETURN(terrno);
×
2924
    }
2925
  }
2926

2927
  TAOS_RETURN(code);
×
2928
}
2929

2930
static uint64_t *getUidList(SCacheRowsReader *pReader) {
490,516✔
2931
  if (!pReader->uidList) {
490,516✔
2932
    int32_t numOfTables = pReader->numOfTables;
30,617✔
2933

2934
    pReader->uidList = taosMemoryMalloc(numOfTables * sizeof(uint64_t));
30,617!
2935
    if (!pReader->uidList) {
30,617!
2936
      return NULL;
×
2937
    }
2938

2939
    for (int32_t i = 0; i < numOfTables; ++i) {
98,129✔
2940
      uint64_t uid = pReader->pTableList[i].uid;
67,512✔
2941
      pReader->uidList[i] = uid;
67,512✔
2942
    }
2943

2944
    taosSort(pReader->uidList, numOfTables, sizeof(uint64_t), uidComparFunc);
30,617✔
2945
  }
2946

2947
  return pReader->uidList;
490,516✔
2948
}
2949

2950
static int32_t loadTombFromBlk(const TTombBlkArray *pTombBlkArray, SCacheRowsReader *pReader, void *pFileReader,
490,516✔
2951
                               bool isFile) {
2952
  int32_t   code = 0;
490,516✔
2953
  int32_t   numOfTables = pReader->numOfTables;
490,516✔
2954
  int64_t   suid = pReader->info.suid;
490,516✔
2955
  uint64_t *uidList = getUidList(pReader);
490,516✔
2956

2957
  if (!uidList) {
490,516!
2958
    TAOS_RETURN(terrno);
×
2959
  }
2960

2961
  for (int i = 0, j = 0; i < pTombBlkArray->size && j < numOfTables; ++i) {
512,436!
2962
    STombBlk *pTombBlk = &pTombBlkArray->data[i];
21,920✔
2963
    if (pTombBlk->maxTbid.suid < suid || (pTombBlk->maxTbid.suid == suid && pTombBlk->maxTbid.uid < uidList[0])) {
21,920!
2964
      continue;
×
2965
    }
2966

2967
    if (pTombBlk->minTbid.suid > suid ||
21,920!
2968
        (pTombBlk->minTbid.suid == suid && pTombBlk->minTbid.uid > uidList[numOfTables - 1])) {
21,920!
2969
      break;
2970
    }
2971

2972
    STombBlock block = {0};
21,920✔
2973
    code = isFile ? tsdbDataFileReadTombBlock(pFileReader, &pTombBlkArray->data[i], &block)
21,920✔
2974
                  : tsdbSttFileReadTombBlock(pFileReader, &pTombBlkArray->data[i], &block);
21,920!
2975
    if (code != TSDB_CODE_SUCCESS) {
21,920!
2976
      TAOS_RETURN(code);
×
2977
    }
2978

2979
    uint64_t        uid = uidList[j];
21,920✔
2980
    STableLoadInfo *pInfo = getTableLoadInfo(pReader, uid);
21,920✔
2981
    if (!pInfo) {
21,920!
2982
      tTombBlockDestroy(&block);
×
2983
      TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
×
2984
    }
2985

2986
    if (pInfo->pTombData == NULL) {
21,920✔
2987
      pInfo->pTombData = taosArrayInit(4, sizeof(SDelData));
2,740✔
2988
    }
2989

2990
    STombRecord record = {0};
21,920✔
2991
    bool        finished = false;
21,920✔
2992
    for (int32_t k = 0; k < TOMB_BLOCK_SIZE(&block); ++k) {
43,840✔
2993
      code = tTombBlockGet(&block, k, &record);
21,920✔
2994
      if (code != TSDB_CODE_SUCCESS) {
21,920!
2995
        finished = true;
×
2996
        break;
×
2997
      }
2998

2999
      if (record.suid < suid) {
21,920!
3000
        continue;
×
3001
      }
3002
      if (record.suid > suid) {
21,920!
3003
        finished = true;
×
3004
        break;
×
3005
      }
3006

3007
      bool newTable = false;
21,920✔
3008
      if (uid < record.uid) {
21,920!
3009
        while (j < numOfTables && uidList[j] < record.uid) {
131,520!
3010
          ++j;
109,600✔
3011
          newTable = true;
109,600✔
3012
        }
3013

3014
        if (j >= numOfTables) {
21,920!
3015
          finished = true;
×
3016
          break;
×
3017
        }
3018

3019
        uid = uidList[j];
21,920✔
3020
      }
3021

3022
      if (record.uid < uid) {
21,920!
3023
        continue;
×
3024
      }
3025

3026
      if (newTable) {
21,920!
3027
        pInfo = getTableLoadInfo(pReader, uid);
21,920✔
3028
        if (!pInfo) {
21,920!
3029
          code = TSDB_CODE_OUT_OF_MEMORY;
×
3030
          finished = true;
×
3031
          break;
×
3032
        }
3033
        if (pInfo->pTombData == NULL) {
21,920✔
3034
          pInfo->pTombData = taosArrayInit(4, sizeof(SDelData));
2,740✔
3035
          if (!pInfo->pTombData) {
2,740!
3036
            code = terrno;
×
3037
            finished = true;
×
3038
            break;
×
3039
          }
3040
        }
3041
      }
3042

3043
      if (record.version <= pReader->info.verRange.maxVer) {
21,920!
3044
        /*tsdbError("tomb xx load/cache: vgId:%d fid:%d record %" PRId64 "~%" PRId64 "~%" PRId64 " tomb records",
3045
          TD_VID(pReader->pTsdb->pVnode), pReader->pCurFileSet->fid, record.skey, record.ekey, uid);*/
3046

3047
        SDelData delData = {.version = record.version, .sKey = record.skey, .eKey = record.ekey};
21,920✔
3048
        if (!taosArrayPush(pInfo->pTombData, &delData)) {
43,840!
3049
          TAOS_RETURN(terrno);
×
3050
        }
3051
      }
3052
    }
3053

3054
    tTombBlockDestroy(&block);
21,920✔
3055

3056
    if (finished) {
21,920!
3057
      TAOS_RETURN(code);
×
3058
    }
3059
  }
3060

3061
  TAOS_RETURN(TSDB_CODE_SUCCESS);
490,516✔
3062
}
3063

3064
static int32_t loadDataTomb(SCacheRowsReader *pReader, SDataFileReader *pFileReader) {
21,920✔
3065
  const TTombBlkArray *pBlkArray = NULL;
21,920✔
3066

3067
  TAOS_CHECK_RETURN(tsdbDataFileReadTombBlk(pFileReader, &pBlkArray));
21,920!
3068

3069
  TAOS_RETURN(loadTombFromBlk(pBlkArray, pReader, pFileReader, true));
21,920✔
3070
}
3071

3072
static int32_t loadSttTomb(STsdbReader *pTsdbReader, SSttFileReader *pSttFileReader, SSttBlockLoadInfo *pLoadInfo) {
468,596✔
3073
  SCacheRowsReader    *pReader = (SCacheRowsReader *)pTsdbReader;
468,596✔
3074
  const TTombBlkArray *pBlkArray = NULL;
468,596✔
3075

3076
  TAOS_CHECK_RETURN(tsdbSttFileReadTombBlk(pSttFileReader, &pBlkArray));
468,596!
3077

3078
  TAOS_RETURN(loadTombFromBlk(pBlkArray, pReader, pSttFileReader, false));
468,096✔
3079
}
3080

3081
typedef struct {
3082
  SMergeTree  mergeTree;
3083
  SMergeTree *pMergeTree;
3084
} SFSLastIter;
3085

3086
static int32_t lastIterOpen(SFSLastIter *iter, STFileSet *pFileSet, STsdb *pTsdb, STSchema *pTSchema, tb_uid_t suid,
490,516✔
3087
                            tb_uid_t uid, SCacheRowsReader *pr, int64_t lastTs, int16_t *aCols, int nCols) {
3088
  int32_t code = 0;
490,516✔
3089
  destroySttBlockReader(pr->pLDataIterArray, NULL);
490,516✔
3090
  pr->pLDataIterArray = taosArrayInit(4, POINTER_BYTES);
490,516✔
3091
  if (pr->pLDataIterArray == NULL) return terrno;
490,516!
3092

3093
  SMergeTreeConf conf = {
490,516✔
3094
      .uid = uid,
3095
      .suid = suid,
3096
      .pTsdb = pTsdb,
3097
      .timewindow = (STimeWindow){.skey = lastTs, .ekey = TSKEY_MAX},
3098
      .verRange = (SVersionRange){.minVer = 0, .maxVer = INT64_MAX},
3099
      .strictTimeRange = false,
3100
      .pSchema = pTSchema,
3101
      .pCurrentFileset = pFileSet,
3102
      .backward = 1,
3103
      .pSttFileBlockIterArray = pr->pLDataIterArray,
490,516✔
3104
      .pCols = aCols,
3105
      .numOfCols = nCols,
3106
      .loadTombFn = loadSttTomb,
3107
      .pReader = pr,
3108
      .idstr = pr->idstr,
490,516✔
3109
      .pCurRowKey = &pr->rowKey,
490,516✔
3110
  };
3111

3112
  TAOS_CHECK_RETURN(tMergeTreeOpen2(&iter->mergeTree, &conf, NULL));
490,516!
3113

3114
  iter->pMergeTree = &iter->mergeTree;
490,516✔
3115

3116
  TAOS_RETURN(code);
490,516✔
3117
}
3118

3119
static int32_t lastIterClose(SFSLastIter **iter) {
2,740✔
3120
  int32_t code = 0;
2,740✔
3121

3122
  if ((*iter)->pMergeTree) {
2,740!
3123
    tMergeTreeClose((*iter)->pMergeTree);
2,740✔
3124
    (*iter)->pMergeTree = NULL;
2,740✔
3125
  }
3126

3127
  *iter = NULL;
2,740✔
3128

3129
  TAOS_RETURN(code);
2,740✔
3130
}
3131

3132
static int32_t lastIterNext(SFSLastIter *iter, TSDBROW **ppRow) {
496,353✔
3133
  bool hasVal = false;
496,353✔
3134
  *ppRow = NULL;
496,353✔
3135

3136
  int32_t code = tMergeTreeNext(iter->pMergeTree, &hasVal);
496,353✔
3137
  if (code != 0) {
496,353!
3138
    return code;
×
3139
  }
3140

3141
  if (!hasVal) {
496,353!
3142
    *ppRow = NULL;
449,411✔
3143
    TAOS_RETURN(code);
449,411✔
3144
  }
3145

3146
  *ppRow = tMergeTreeGetRow(iter->pMergeTree);
46,942✔
3147
  TAOS_RETURN(code);
46,942✔
3148
}
3149

3150
typedef enum SFSNEXTROWSTATES {
3151
  SFSNEXTROW_FS,
3152
  SFSNEXTROW_FILESET,
3153
  SFSNEXTROW_INDEXLIST,
3154
  SFSNEXTROW_BRINBLOCK,
3155
  SFSNEXTROW_BRINRECORD,
3156
  SFSNEXTROW_BLOCKDATA,
3157
  SFSNEXTROW_BLOCKROW,
3158
  SFSNEXTROW_NEXTSTTROW
3159
} SFSNEXTROWSTATES;
3160

3161
struct CacheNextRowIter;
3162

3163
typedef struct SFSNextRowIter {
3164
  SFSNEXTROWSTATES         state;         // [input]
3165
  SBlockIdx               *pBlockIdxExp;  // [input]
3166
  STSchema                *pTSchema;      // [input]
3167
  tb_uid_t                 suid;
3168
  tb_uid_t                 uid;
3169
  int32_t                  iFileSet;
3170
  STFileSet               *pFileSet;
3171
  TFileSetArray           *aDFileSet;
3172
  SArray                  *pIndexList;
3173
  int32_t                  iBrinIndex;
3174
  SBrinBlock               brinBlock;
3175
  SBrinBlock              *pBrinBlock;
3176
  int32_t                  iBrinRecord;
3177
  SBrinRecord              brinRecord;
3178
  SBlockData               blockData;
3179
  SBlockData              *pBlockData;
3180
  int32_t                  nRow;
3181
  int32_t                  iRow;
3182
  TSDBROW                  row;
3183
  int64_t                  lastTs;
3184
  SFSLastIter              lastIter;
3185
  SFSLastIter             *pLastIter;
3186
  int8_t                   lastEmpty;
3187
  TSDBROW                 *pLastRow;
3188
  SRow                    *pTSRow;
3189
  SRowMerger               rowMerger;
3190
  SCacheRowsReader        *pr;
3191
  struct CacheNextRowIter *pRowIter;
3192
} SFSNextRowIter;
3193

3194
static void clearLastFileSet(SFSNextRowIter *state);
3195

3196
static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast, int16_t *aCols,
133,901✔
3197
                                int nCols) {
3198
  int32_t         code = 0, lino = 0;
133,901✔
3199
  SFSNextRowIter *state = (SFSNextRowIter *)iter;
133,901✔
3200
  STsdb          *pTsdb = state->pr->pTsdb;
133,901✔
3201

3202
  if (SFSNEXTROW_FS == state->state) {
133,901✔
3203
    state->iFileSet = TARRAY2_SIZE(state->aDFileSet);
130,804✔
3204

3205
    state->state = SFSNEXTROW_FILESET;
131,552✔
3206
  }
3207

3208
  if (SFSNEXTROW_FILESET == state->state) {
134,649✔
3209
  _next_fileset:
575,483✔
3210
    clearLastFileSet(state);
575,483✔
3211

3212
    if (--state->iFileSet < 0) {
575,483✔
3213
      *ppRow = NULL;
84,967✔
3214

3215
      TAOS_RETURN(code);
84,967✔
3216
    } else {
3217
      state->pFileSet = TARRAY2_GET(state->aDFileSet, state->iFileSet);
490,516✔
3218
    }
3219

3220
    STFileObj **pFileObj = state->pFileSet->farr;
490,516✔
3221
    if (pFileObj[0] != NULL || pFileObj[3] != NULL) {
490,516!
3222
      if (state->pFileSet != state->pr->pCurFileSet) {
21,920!
3223
        SDataFileReaderConfig conf = {.tsdb = pTsdb, .szPage = pTsdb->pVnode->config.tsdbPageSize};
21,920✔
3224
        const char           *filesName[4] = {0};
21,920✔
3225
        if (pFileObj[0] != NULL) {
21,920!
3226
          conf.files[0].file = *pFileObj[0]->f;
21,920✔
3227
          conf.files[0].exist = true;
21,920✔
3228
          filesName[0] = pFileObj[0]->fname;
21,920✔
3229

3230
          conf.files[1].file = *pFileObj[1]->f;
21,920✔
3231
          conf.files[1].exist = true;
21,920✔
3232
          filesName[1] = pFileObj[1]->fname;
21,920✔
3233

3234
          conf.files[2].file = *pFileObj[2]->f;
21,920✔
3235
          conf.files[2].exist = true;
21,920✔
3236
          filesName[2] = pFileObj[2]->fname;
21,920✔
3237
        }
3238

3239
        if (pFileObj[3] != NULL) {
21,920!
3240
          conf.files[3].exist = true;
21,920✔
3241
          conf.files[3].file = *pFileObj[3]->f;
21,920✔
3242
          filesName[3] = pFileObj[3]->fname;
21,920✔
3243
        }
3244

3245
        TAOS_CHECK_GOTO(tsdbDataFileReaderOpen(filesName, &conf, &state->pr->pFileReader), &lino, _err);
21,920!
3246

3247
        state->pr->pCurFileSet = state->pFileSet;
21,920✔
3248

3249
        code = loadDataTomb(state->pr, state->pr->pFileReader);
21,920✔
3250
        if (code != TSDB_CODE_SUCCESS) {
21,920!
3251
          tsdbError("vgId:%d, %s load tomb failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
3252
                    tstrerror(code));
3253
          TAOS_CHECK_GOTO(code, &lino, _err);
×
3254
        }
3255

3256
        TAOS_CHECK_GOTO(tsdbDataFileReadBrinBlk(state->pr->pFileReader, &state->pr->pBlkArray), &lino, _err);
21,920!
3257
      }
3258

3259
      if (!state->pIndexList) {
21,920!
3260
        state->pIndexList = taosArrayInit(1, sizeof(SBrinBlk));
21,920✔
3261
        if (!state->pIndexList) {
21,920!
3262
          TAOS_CHECK_GOTO(terrno, &lino, _err);
×
3263
        }
3264
      } else {
3265
        taosArrayClear(state->pIndexList);
×
3266
      }
3267

3268
      const TBrinBlkArray *pBlkArray = state->pr->pBlkArray;
21,920✔
3269

3270
      for (int i = TARRAY2_SIZE(pBlkArray) - 1; i >= 0; --i) {
43,840✔
3271
        SBrinBlk *pBrinBlk = &pBlkArray->data[i];
21,920✔
3272
        if (state->suid >= pBrinBlk->minTbid.suid && state->suid <= pBrinBlk->maxTbid.suid) {
21,920!
3273
          if (state->uid >= pBrinBlk->minTbid.uid && state->uid <= pBrinBlk->maxTbid.uid) {
21,920✔
3274
            if (!taosArrayPush(state->pIndexList, pBrinBlk)) {
5,480!
3275
              TAOS_CHECK_GOTO(terrno, &lino, _err);
×
3276
            }
3277
          }
3278
        } else if (state->suid > pBrinBlk->maxTbid.suid ||
×
3279
                   (state->suid == pBrinBlk->maxTbid.suid && state->uid > pBrinBlk->maxTbid.uid)) {
×
3280
          break;
3281
        }
3282
      }
3283

3284
      int indexSize = TARRAY_SIZE(state->pIndexList);
21,920✔
3285
      if (indexSize <= 0) {
21,920✔
3286
        goto _check_stt_data;
19,180✔
3287
      }
3288

3289
      state->state = SFSNEXTROW_INDEXLIST;
2,740✔
3290
      state->iBrinIndex = 1;
2,740✔
3291
    }
3292

3293
  _check_stt_data:
490,516✔
3294
    if (state->pFileSet != state->pr->pCurFileSet) {
490,516✔
3295
      state->pr->pCurFileSet = state->pFileSet;
466,596✔
3296
    }
3297

3298
    TAOS_CHECK_GOTO(lastIterOpen(&state->lastIter, state->pFileSet, pTsdb, state->pTSchema, state->suid, state->uid,
490,516!
3299
                                 state->pr, state->lastTs, aCols, nCols),
3300
                    &lino, _err);
3301

3302
    TAOS_CHECK_GOTO(lastIterNext(&state->lastIter, &state->pLastRow), &lino, _err);
490,516!
3303

3304
    if (!state->pLastRow) {
490,516✔
3305
      state->lastEmpty = 1;
444,924✔
3306

3307
      if (SFSNEXTROW_INDEXLIST != state->state) {
444,924✔
3308
        clearLastFileSet(state);
442,184✔
3309
        goto _next_fileset;
442,184✔
3310
      }
3311
    } else {
3312
      state->lastEmpty = 0;
45,592✔
3313

3314
      if (SFSNEXTROW_INDEXLIST != state->state) {
45,592!
3315
        state->state = SFSNEXTROW_NEXTSTTROW;
45,592✔
3316

3317
        *ppRow = state->pLastRow;
45,592✔
3318
        state->pLastRow = NULL;
45,592✔
3319

3320
        TAOS_RETURN(code);
45,592✔
3321
      }
3322
    }
3323

3324
    state->pLastIter = &state->lastIter;
2,740✔
3325
  }
3326

3327
  if (SFSNEXTROW_NEXTSTTROW == state->state) {
5,089✔
3328
    TAOS_CHECK_GOTO(lastIterNext(&state->lastIter, &state->pLastRow), &lino, _err);
3,097!
3329

3330
    if (!state->pLastRow) {
3,097✔
3331
      if (state->pLastIter) {
1,747!
3332
        code = lastIterClose(&state->pLastIter);
×
3333
        if (code != TSDB_CODE_SUCCESS) {
×
3334
          tsdbError("vgId:%d, %s close last iter failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
3335
                    tstrerror(code));
3336
          TAOS_RETURN(code);
×
3337
        }
3338
      }
3339

3340
      clearLastFileSet(state);
1,747✔
3341
      state->state = SFSNEXTROW_FILESET;
1,747✔
3342
      goto _next_fileset;
1,747✔
3343
    } else {
3344
      *ppRow = state->pLastRow;
1,350✔
3345
      state->pLastRow = NULL;
1,350✔
3346

3347
      TAOS_RETURN(code);
1,350✔
3348
    }
3349
  }
3350

3351
  if (SFSNEXTROW_INDEXLIST == state->state) {
2,740!
3352
    SBrinBlk *pBrinBlk = NULL;
2,740✔
3353
  _next_brinindex:
2,740✔
3354
    if (--state->iBrinIndex < 0) {
2,740!
3355
      if (state->pLastRow) {
×
3356
        state->state = SFSNEXTROW_NEXTSTTROW;
×
3357
        *ppRow = state->pLastRow;
×
3358
        state->pLastRow = NULL;
×
3359
        return code;
×
3360
      }
3361

3362
      clearLastFileSet(state);
×
3363
      goto _next_fileset;
×
3364
    } else {
3365
      pBrinBlk = taosArrayGet(state->pIndexList, state->iBrinIndex);
2,740✔
3366
    }
3367

3368
    if (!state->pBrinBlock) {
2,740!
3369
      state->pBrinBlock = &state->brinBlock;
2,740✔
3370
    } else {
3371
      tBrinBlockClear(&state->brinBlock);
×
3372
    }
3373

3374
    TAOS_CHECK_GOTO(tsdbDataFileReadBrinBlock(state->pr->pFileReader, pBrinBlk, &state->brinBlock), &lino, _err);
2,740!
3375

3376
    state->iBrinRecord = state->brinBlock.numOfRecords - 1;
2,740✔
3377
    state->state = SFSNEXTROW_BRINBLOCK;
2,740✔
3378
  }
3379

3380
  if (SFSNEXTROW_BRINBLOCK == state->state) {
2,740!
3381
  _next_brinrecord:
2,740✔
3382
    if (state->iBrinRecord < 0) {  // empty brin block, goto _next_brinindex
2,740!
3383
      tBrinBlockClear(&state->brinBlock);
×
3384
      goto _next_brinindex;
×
3385
    }
3386

3387
    TAOS_CHECK_GOTO(tBrinBlockGet(&state->brinBlock, state->iBrinRecord, &state->brinRecord), &lino, _err);
2,740!
3388

3389
    SBrinRecord *pRecord = &state->brinRecord;
2,740✔
3390
    if (pRecord->uid != state->uid) {
2,740!
3391
      // TODO: goto next brin block early
3392
      --state->iBrinRecord;
×
3393
      goto _next_brinrecord;
×
3394
    }
3395

3396
    state->state = SFSNEXTROW_BRINRECORD;
2,740✔
3397
  }
3398

3399
  if (SFSNEXTROW_BRINRECORD == state->state) {
2,740!
3400
    SBrinRecord *pRecord = &state->brinRecord;
2,740✔
3401

3402
    if (!state->pBlockData) {
2,740!
3403
      state->pBlockData = &state->blockData;
2,740✔
3404

3405
      TAOS_CHECK_GOTO(tBlockDataCreate(&state->blockData), &lino, _err);
2,740!
3406
    } else {
3407
      tBlockDataReset(state->pBlockData);
×
3408
    }
3409

3410
    if (aCols[0] == PRIMARYKEY_TIMESTAMP_COL_ID) {
2,740!
3411
      --nCols;
2,740✔
3412
      ++aCols;
2,740✔
3413
    }
3414

3415
    TAOS_CHECK_GOTO(tsdbDataFileReadBlockDataByColumn(state->pr->pFileReader, pRecord, state->pBlockData,
2,740!
3416
                                                      state->pTSchema, aCols, nCols),
3417
                    &lino, _err);
3418

3419
    state->nRow = state->blockData.nRow;
2,740✔
3420
    state->iRow = state->nRow - 1;
2,740✔
3421

3422
    state->state = SFSNEXTROW_BLOCKROW;
2,740✔
3423
  }
3424

3425
  if (SFSNEXTROW_BLOCKROW == state->state) {
2,740!
3426
    if (state->iRow < 0) {
2,740!
3427
      --state->iBrinRecord;
×
3428
      goto _next_brinrecord;
×
3429
    }
3430

3431
    state->row = tsdbRowFromBlockData(state->pBlockData, state->iRow);
2,740✔
3432
    if (!state->pLastIter) {
2,740!
3433
      *ppRow = &state->row;
×
3434
      --state->iRow;
×
3435
      return code;
×
3436
    }
3437

3438
    if (!state->pLastRow) {
2,740!
3439
      // get next row from fslast and process with fs row, --state->Row if select fs row
3440
      TAOS_CHECK_GOTO(lastIterNext(&state->lastIter, &state->pLastRow), &lino, _err);
2,740!
3441
    }
3442

3443
    if (!state->pLastRow) {
2,740!
3444
      if (state->pLastIter) {
2,740!
3445
        code = lastIterClose(&state->pLastIter);
2,740✔
3446
        if (code != TSDB_CODE_SUCCESS) {
2,740!
3447
          tsdbError("vgId:%d, %s close last iter failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
3448
                    tstrerror(code));
3449
          TAOS_RETURN(code);
×
3450
        }
3451
      }
3452

3453
      *ppRow = &state->row;
2,740✔
3454
      --state->iRow;
2,740✔
3455
      return code;
2,740✔
3456
    }
3457

3458
    // process state->pLastRow & state->row
3459
    TSKEY rowTs = TSDBROW_TS(&state->row);
×
3460
    TSKEY lastRowTs = TSDBROW_TS(state->pLastRow);
×
3461
    if (lastRowTs > rowTs) {
×
3462
      *ppRow = state->pLastRow;
×
3463
      state->pLastRow = NULL;
×
3464

3465
      TAOS_RETURN(code);
×
3466
    } else if (lastRowTs < rowTs) {
×
3467
      *ppRow = &state->row;
×
3468
      --state->iRow;
×
3469

3470
      TAOS_RETURN(code);
×
3471
    } else {
3472
      // TODO: merge rows and *ppRow = mergedRow
3473
      SRowMerger *pMerger = &state->rowMerger;
×
3474
      code = tsdbRowMergerInit(pMerger, state->pTSchema);
×
3475
      if (code != TSDB_CODE_SUCCESS) {
×
3476
        tsdbError("vgId:%d, %s init row merger failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
3477
                  tstrerror(code));
3478
        TAOS_RETURN(code);
×
3479
      }
3480

3481
      TAOS_CHECK_GOTO(tsdbRowMergerAdd(pMerger, &state->row, state->pTSchema), &lino, _err);
×
3482
      TAOS_CHECK_GOTO(tsdbRowMergerAdd(pMerger, state->pLastRow, state->pTSchema), &lino, _err);
×
3483

3484
      if (state->pTSRow) {
×
3485
        taosMemoryFree(state->pTSRow);
×
3486
        state->pTSRow = NULL;
×
3487
      }
3488

3489
      TAOS_CHECK_GOTO(tsdbRowMergerGetRow(pMerger, &state->pTSRow), &lino, _err);
×
3490

3491
      state->row = tsdbRowFromTSRow(TSDBROW_VERSION(&state->row), state->pTSRow);
×
3492
      *ppRow = &state->row;
×
3493
      --state->iRow;
×
3494

3495
      tsdbRowMergerClear(pMerger);
×
3496

3497
      TAOS_RETURN(code);
×
3498
    }
3499
  }
3500

3501
_err:
×
3502
  clearLastFileSet(state);
×
3503

3504
  *ppRow = NULL;
×
3505

3506
  if (code) {
×
3507
    tsdbError("tsdb/cache: vgId:%d, %s failed at line %d since %s.", TD_VID(pTsdb->pVnode), __func__, lino,
×
3508
              tstrerror(code));
3509
  }
3510

3511
  TAOS_RETURN(code);
×
3512
}
3513

3514
typedef struct CacheNextRowIter {
3515
  SArray           *pMemDelData;
3516
  SArray           *pSkyline;
3517
  int64_t           iSkyline;
3518
  SBlockIdx         idx;
3519
  SMemNextRowIter   memState;
3520
  SMemNextRowIter   imemState;
3521
  SFSNextRowIter    fsState;
3522
  TSDBROW           memRow, imemRow, fsLastRow, fsRow;
3523
  TsdbNextRowState  input[3];
3524
  SCacheRowsReader *pr;
3525
  STsdb            *pTsdb;
3526
} CacheNextRowIter;
3527

3528
int32_t clearNextRowFromFS(void *iter) {
131,552✔
3529
  int32_t code = 0;
131,552✔
3530

3531
  SFSNextRowIter *state = (SFSNextRowIter *)iter;
131,552✔
3532
  if (!state) {
131,552!
3533
    TAOS_RETURN(code);
×
3534
  }
3535

3536
  if (state->pLastIter) {
131,552!
3537
    code = lastIterClose(&state->pLastIter);
×
3538
    if (code != TSDB_CODE_SUCCESS) {
×
3539
      tsdbError("%s close last iter failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
3540
      TAOS_RETURN(code);
×
3541
    }
3542
  }
3543

3544
  if (state->pBlockData) {
131,552✔
3545
    tBlockDataDestroy(state->pBlockData);
2,740✔
3546
    state->pBlockData = NULL;
2,740✔
3547
  }
3548

3549
  if (state->pBrinBlock) {
131,552✔
3550
    tBrinBlockDestroy(state->pBrinBlock);
2,740✔
3551
    state->pBrinBlock = NULL;
2,740✔
3552
  }
3553

3554
  if (state->pIndexList) {
131,552✔
3555
    taosArrayDestroy(state->pIndexList);
21,920✔
3556
    state->pIndexList = NULL;
21,920✔
3557
  }
3558

3559
  if (state->pTSRow) {
131,552!
3560
    taosMemoryFree(state->pTSRow);
×
3561
    state->pTSRow = NULL;
×
3562
  }
3563

3564
  if (state->pRowIter->pSkyline) {
131,552✔
3565
    taosArrayDestroy(state->pRowIter->pSkyline);
126,112✔
3566
    state->pRowIter->pSkyline = NULL;
126,112✔
3567
  }
3568

3569
  TAOS_RETURN(code);
131,552✔
3570
}
3571

3572
static void clearLastFileSet(SFSNextRowIter *state) {
1,019,414✔
3573
  if (state->pLastIter) {
1,019,414!
3574
    int code = lastIterClose(&state->pLastIter);
×
3575
    if (code != TSDB_CODE_SUCCESS) {
×
3576
      tsdbError("%s close last iter failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
3577
      return;
×
3578
    }
3579
  }
3580

3581
  if (state->pBlockData) {
1,019,414!
3582
    tBlockDataDestroy(state->pBlockData);
×
3583
    state->pBlockData = NULL;
×
3584
  }
3585

3586
  if (state->pr->pFileReader) {
1,018,666✔
3587
    tsdbDataFileReaderClose(&state->pr->pFileReader);
21,920✔
3588
    state->pr->pFileReader = NULL;
21,920✔
3589

3590
    state->pr->pCurFileSet = NULL;
21,920✔
3591
  }
3592

3593
  if (state->pTSRow) {
1,019,414!
3594
    taosMemoryFree(state->pTSRow);
×
3595
    state->pTSRow = NULL;
×
3596
  }
3597

3598
  if (state->pRowIter->pSkyline) {
1,018,666✔
3599
    taosArrayDestroy(state->pRowIter->pSkyline);
1,747✔
3600
    state->pRowIter->pSkyline = NULL;
1,747✔
3601

3602
    void   *pe = NULL;
1,747✔
3603
    int32_t iter = 0;
1,747✔
3604
    while ((pe = tSimpleHashIterate(state->pr->pTableMap, pe, &iter)) != NULL) {
3,494✔
3605
      STableLoadInfo *pInfo = *(STableLoadInfo **)pe;
1,747✔
3606
      taosArrayDestroy(pInfo->pTombData);
1,747✔
3607
      pInfo->pTombData = NULL;
1,747✔
3608
    }
3609
  }
3610
}
3611

3612
static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTsdb, STSchema *pTSchema, tb_uid_t suid,
130,804✔
3613
                               SArray *pLDataIterArray, STsdbReadSnap *pReadSnap, int64_t lastTs,
3614
                               SCacheRowsReader *pr) {
3615
  int32_t code = 0, lino = 0;
130,804✔
3616

3617
  STbData *pMem = NULL;
131,552✔
3618
  if (pReadSnap->pMem) {
131,552!
3619
    pMem = tsdbGetTbDataFromMemTable(pReadSnap->pMem, suid, uid);
131,552✔
3620
  }
3621

3622
  STbData *pIMem = NULL;
131,552✔
3623
  if (pReadSnap->pIMem) {
131,552!
3624
    pIMem = tsdbGetTbDataFromMemTable(pReadSnap->pIMem, suid, uid);
×
3625
  }
3626

3627
  pIter->pTsdb = pTsdb;
131,552✔
3628

3629
  pIter->pMemDelData = NULL;
131,552✔
3630

3631
  TAOS_CHECK_GOTO(loadMemTombData(&pIter->pMemDelData, pMem, pIMem, pr->info.verRange.maxVer), &lino, _err);
131,552!
3632

3633
  pIter->idx = (SBlockIdx){.suid = suid, .uid = uid};
131,552✔
3634

3635
  pIter->fsState.pRowIter = pIter;
130,804✔
3636
  pIter->fsState.state = SFSNEXTROW_FS;
131,552✔
3637
  pIter->fsState.aDFileSet = pReadSnap->pfSetArray;
131,552✔
3638
  pIter->fsState.pBlockIdxExp = &pIter->idx;
130,804✔
3639
  pIter->fsState.pTSchema = pTSchema;
131,552✔
3640
  pIter->fsState.suid = suid;
131,552✔
3641
  pIter->fsState.uid = uid;
131,552✔
3642
  pIter->fsState.lastTs = lastTs;
130,804✔
3643
  pIter->fsState.pr = pr;
130,804✔
3644

3645
  pIter->input[0] = (TsdbNextRowState){&pIter->memRow, true, false, false, &pIter->memState, getNextRowFromMem, NULL};
131,552✔
3646
  pIter->input[1] = (TsdbNextRowState){&pIter->imemRow, true, false, false, &pIter->imemState, getNextRowFromMem, NULL};
131,552✔
3647
  pIter->input[2] =
131,552✔
3648
      (TsdbNextRowState){&pIter->fsRow, false, true, false, &pIter->fsState, getNextRowFromFS, clearNextRowFromFS};
130,804✔
3649

3650
  if (pMem) {
131,552✔
3651
    pIter->memState.pMem = pMem;
79,527✔
3652
    pIter->memState.state = SMEMNEXTROW_ENTER;
78,779✔
3653
    pIter->memState.lastTs = lastTs;
79,527✔
3654
    pIter->input[0].stop = false;
78,779✔
3655
    pIter->input[0].next = true;
78,779✔
3656
  }
3657

3658
  if (pIMem) {
130,804!
3659
    pIter->imemState.pMem = pIMem;
×
3660
    pIter->imemState.state = SMEMNEXTROW_ENTER;
×
3661
    pIter->imemState.lastTs = lastTs;
×
3662
    pIter->input[1].stop = false;
×
3663
    pIter->input[1].next = true;
×
3664
  }
3665

3666
  pIter->pr = pr;
130,804✔
3667

3668
_err:
131,552✔
3669
  TAOS_RETURN(code);
131,552✔
3670
}
3671

3672
static void nextRowIterClose(CacheNextRowIter *pIter) {
131,552✔
3673
  for (int i = 0; i < 3; ++i) {
526,208✔
3674
    if (pIter->input[i].nextRowClearFn) {
394,656✔
3675
      (void)pIter->input[i].nextRowClearFn(pIter->input[i].iter);
131,552✔
3676
    }
3677
  }
3678

3679
  if (pIter->pSkyline) {
131,552!
3680
    taosArrayDestroy(pIter->pSkyline);
×
3681
  }
3682

3683
  if (pIter->pMemDelData) {
131,552!
3684
    taosArrayDestroy(pIter->pMemDelData);
130,804✔
3685
  }
3686
}
131,552✔
3687

3688
// iterate next row non deleted backward ts, version (from high to low)
3689
static int32_t nextRowIterGet(CacheNextRowIter *pIter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast,
149,342✔
3690
                              int16_t *aCols, int nCols) {
3691
  int32_t code = 0, lino = 0;
149,342✔
3692

3693
  for (;;) {
999✔
3694
    for (int i = 0; i < 3; ++i) {
602,860✔
3695
      if (pIter->input[i].next && !pIter->input[i].stop) {
452,519!
3696
        TAOS_CHECK_GOTO(pIter->input[i].nextRowFn(pIter->input[i].iter, &pIter->input[i].pRow,
229,868!
3697
                                                  &pIter->input[i].ignoreEarlierTs, isLast, aCols, nCols),
3698
                        &lino, _err);
3699

3700
        if (pIter->input[i].pRow == NULL) {
230,616✔
3701
          pIter->input[i].stop = true;
94,557✔
3702
          pIter->input[i].next = false;
94,557✔
3703
        }
3704
      }
3705
    }
3706

3707
    if (pIter->input[0].stop && pIter->input[1].stop && pIter->input[2].stop) {
150,341!
3708
      *ppRow = NULL;
15,030✔
3709
      *pIgnoreEarlierTs =
30,060✔
3710
          (pIter->input[0].ignoreEarlierTs || pIter->input[1].ignoreEarlierTs || pIter->input[2].ignoreEarlierTs);
15,030!
3711

3712
      TAOS_RETURN(code);
15,030✔
3713
    }
3714

3715
    // select maxpoint(s) from mem, imem, fs and last
3716
    TSDBROW *max[4] = {0};
136,059✔
3717
    int      iMax[4] = {-1, -1, -1, -1};
136,059✔
3718
    int      nMax = 0;
136,059✔
3719
    SRowKey  maxKey = {.ts = TSKEY_MIN};
136,059✔
3720

3721
    for (int i = 0; i < 3; ++i) {
541,992✔
3722
      if (!pIter->input[i].stop && pIter->input[i].pRow != NULL) {
406,681!
3723
        STsdbRowKey tsdbRowKey = {0};
137,806✔
3724
        tsdbRowGetKey(pIter->input[i].pRow, &tsdbRowKey);
137,806✔
3725

3726
        // merging & deduplicating on client side
3727
        int c = tRowKeyCompare(&maxKey, &tsdbRowKey.key);
137,806✔
3728
        if (c <= 0) {
137,806!
3729
          if (c < 0) {
137,806!
3730
            nMax = 0;
137,806✔
3731
            maxKey = tsdbRowKey.key;
137,806✔
3732
          }
3733

3734
          iMax[nMax] = i;
137,806✔
3735
          max[nMax++] = pIter->input[i].pRow;
137,058✔
3736
        }
3737
        pIter->input[i].next = false;
137,058✔
3738
      }
3739
    }
3740

3741
    // delete detection
3742
    TSDBROW *merge[4] = {0};
135,311✔
3743
    int      iMerge[4] = {-1, -1, -1, -1};
135,311✔
3744
    int      nMerge = 0;
136,059✔
3745
    for (int i = 0; i < nMax; ++i) {
271,370✔
3746
      TSDBKEY maxKey1 = TSDBROW_KEY(max[i]);
136,059✔
3747

3748
      if (!pIter->pSkyline) {
135,311✔
3749
        pIter->pSkyline = taosArrayInit(32, sizeof(TSDBKEY));
127,111✔
3750
        TSDB_CHECK_NULL(pIter->pSkyline, code, lino, _err, terrno);
127,859!
3751

3752
        uint64_t        uid = pIter->idx.uid;
127,859✔
3753
        STableLoadInfo *pInfo = getTableLoadInfo(pIter->pr, uid);
127,859✔
3754
        TSDB_CHECK_NULL(pInfo, code, lino, _err, TSDB_CODE_OUT_OF_MEMORY);
127,111!
3755

3756
        if (pInfo->pTombData == NULL) {
127,111✔
3757
          pInfo->pTombData = taosArrayInit(4, sizeof(SDelData));
121,631✔
3758
          TSDB_CHECK_NULL(pInfo->pTombData, code, lino, _err, terrno);
122,379!
3759
        }
3760

3761
        if (!taosArrayAddAll(pInfo->pTombData, pIter->pMemDelData)) {
127,859!
3762
          TAOS_CHECK_GOTO(terrno, &lino, _err);
×
3763
        }
3764

3765
        size_t delSize = TARRAY_SIZE(pInfo->pTombData);
127,859✔
3766
        if (delSize > 0) {
127,859✔
3767
          code = tsdbBuildDeleteSkyline(pInfo->pTombData, 0, (int32_t)(delSize - 1), pIter->pSkyline);
6,234✔
3768
          TAOS_CHECK_GOTO(code, &lino, _err);
6,234!
3769
        }
3770
        pIter->iSkyline = taosArrayGetSize(pIter->pSkyline) - 1;
127,859✔
3771
      }
3772

3773
      bool deleted = tsdbKeyDeleted(&maxKey1, pIter->pSkyline, &pIter->iSkyline);
136,807✔
3774
      if (!deleted) {
136,059✔
3775
        iMerge[nMerge] = iMax[i];
134,312✔
3776
        merge[nMerge++] = max[i];
133,564✔
3777
      }
3778

3779
      pIter->input[iMax[i]].next = deleted;
135,311✔
3780
    }
3781

3782
    if (nMerge > 0) {
135,311✔
3783
      pIter->input[iMerge[0]].next = true;
133,564✔
3784

3785
      *ppRow = merge[0];
133,564✔
3786

3787
      TAOS_RETURN(code);
133,564✔
3788
    }
3789
  }
3790

3791
_err:
×
3792
  if (code) {
×
3793
    tsdbError("tsdb/cache: %s failed at line %d since %s.", __func__, lino, tstrerror(code));
×
3794
  }
3795

3796
  TAOS_RETURN(code);
×
3797
}
3798

3799
static int32_t initLastColArrayPartial(STSchema *pTSchema, SArray **ppColArray, int16_t *slotIds, int nCols) {
131,552✔
3800
  SArray *pColArray = taosArrayInit(nCols, sizeof(SLastCol));
131,552✔
3801
  if (NULL == pColArray) {
131,552!
3802
    TAOS_RETURN(terrno);
×
3803
  }
3804

3805
  for (int32_t i = 0; i < nCols; ++i) {
517,569✔
3806
    int16_t  slotId = slotIds[i];
386,017✔
3807
    SLastCol col = {.rowKey.ts = 0,
386,017✔
3808
                    .colVal = COL_VAL_NULL(pTSchema->columns[slotId].colId, pTSchema->columns[slotId].type)};
386,017✔
3809
    if (!taosArrayPush(pColArray, &col)) {
386,017!
3810
      TAOS_RETURN(terrno);
×
3811
    }
3812
  }
3813
  *ppColArray = pColArray;
131,552✔
3814

3815
  TAOS_RETURN(TSDB_CODE_SUCCESS);
131,552✔
3816
}
3817

3818
static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols,
66,780✔
3819
                            int nCols, int16_t *slotIds) {
3820
  int32_t   code = 0, lino = 0;
66,780✔
3821
  STSchema *pTSchema = pr->pSchema;  // metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1, 1);
66,780✔
3822
  int16_t   nLastCol = nCols;
66,780✔
3823
  int16_t   noneCol = 0;
66,780✔
3824
  bool      setNoneCol = false;
66,780✔
3825
  bool      hasRow = false;
66,780✔
3826
  bool      ignoreEarlierTs = false;
66,780✔
3827
  SArray   *pColArray = NULL;
66,780✔
3828
  SColVal  *pColVal = &(SColVal){0};
66,780✔
3829

3830
  TAOS_CHECK_RETURN(initLastColArrayPartial(pTSchema, &pColArray, slotIds, nCols));
66,780!
3831

3832
  SArray *aColArray = taosArrayInit(nCols, sizeof(int16_t));
66,780✔
3833
  if (NULL == aColArray) {
66,780!
3834
    taosArrayDestroy(pColArray);
×
3835

3836
    TAOS_RETURN(terrno);
×
3837
  }
3838

3839
  for (int i = 0; i < nCols; ++i) {
264,760✔
3840
    if (!taosArrayPush(aColArray, &aCols[i])) {
395,960!
3841
      taosArrayDestroy(pColArray);
×
3842

3843
      TAOS_RETURN(terrno);
×
3844
    }
3845
  }
3846

3847
  STsdbRowKey lastRowKey = {.key.ts = TSKEY_MAX};
66,780✔
3848

3849
  // inverse iterator
3850
  CacheNextRowIter iter = {0};
66,780✔
3851
  code =
3852
      nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->info.suid, pr->pLDataIterArray, pr->pReadSnap, pr->lastTs, pr);
66,780✔
3853
  TAOS_CHECK_GOTO(code, &lino, _err);
66,780!
3854

3855
  do {
3856
    TSDBROW *pRow = NULL;
84,570✔
3857
    code = nextRowIterGet(&iter, &pRow, &ignoreEarlierTs, true, TARRAY_DATA(aColArray), TARRAY_SIZE(aColArray));
84,570✔
3858

3859
    if (!pRow) {
84,570✔
3860
      break;
12,310✔
3861
    }
3862

3863
    hasRow = true;
72,260✔
3864

3865
    int32_t sversion = TSDBROW_SVERSION(pRow);
72,260✔
3866
    if (sversion != -1) {
72,260✔
3867
      TAOS_CHECK_GOTO(updateTSchema(sversion, pr, uid), &lino, _err);
68,190!
3868

3869
      pTSchema = pr->pCurrSchema;
68,190✔
3870
    }
3871
    // int16_t nCol = pTSchema->numOfCols;
3872

3873
    STsdbRowKey rowKey = {0};
72,260✔
3874
    tsdbRowGetKey(pRow, &rowKey);
72,260✔
3875

3876
    if (lastRowKey.key.ts == TSKEY_MAX) {  // first time
72,260✔
3877
      lastRowKey = rowKey;
64,060✔
3878

3879
      for (int16_t iCol = noneCol; iCol < nCols; ++iCol) {
250,392✔
3880
        if (iCol >= nLastCol) {
187,080!
3881
          break;
×
3882
        }
3883
        SLastCol *pCol = taosArrayGet(pColArray, iCol);
187,080✔
3884
        if (slotIds[iCol] > pTSchema->numOfCols - 1) {
187,080!
3885
          if (!setNoneCol) {
×
3886
            noneCol = iCol;
×
3887
            setNoneCol = true;
×
3888
          }
3889
          continue;
×
3890
        }
3891
        if (pCol->colVal.cid != pTSchema->columns[slotIds[iCol]].colId) {
187,080✔
3892
          continue;
40,720✔
3893
        }
3894
        if (slotIds[iCol] == 0) {
145,612✔
3895
          STColumn *pTColumn = &pTSchema->columns[0];
63,312✔
3896
          SValue    val = {.type = pTColumn->type};
63,312✔
3897
          VALUE_SET_TRIVIAL_DATUM(&val, rowKey.key.ts);
63,312✔
3898
          *pColVal = COL_VAL_VALUE(pTColumn->colId, val);
63,312✔
3899

3900
          SLastCol colTmp = {.rowKey = rowKey.key, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID};
64,060✔
3901
          TAOS_CHECK_GOTO(tsdbCacheReallocSLastCol(&colTmp, NULL), &lino, _err);
64,060!
3902

3903
          taosArraySet(pColArray, 0, &colTmp);
64,060✔
3904
          continue;
64,060✔
3905
        }
3906
        tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal);
82,300✔
3907

3908
        *pCol = (SLastCol){.rowKey = rowKey.key, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID};
82,300✔
3909
        TAOS_CHECK_GOTO(tsdbCacheReallocSLastCol(pCol, NULL), &lino, _err);
82,300!
3910

3911
        if (!COL_VAL_IS_VALUE(pColVal)) {
82,300✔
3912
          if (!setNoneCol) {
24,620✔
3913
            noneCol = iCol;
15,050✔
3914
            setNoneCol = true;
15,050✔
3915
          }
3916
        } else {
3917
          int32_t aColIndex = taosArraySearchIdx(aColArray, &pColVal->cid, compareInt16Val, TD_EQ);
57,680✔
3918
          if (aColIndex >= 0) {
57,680!
3919
            taosArrayRemove(aColArray, aColIndex);
57,680✔
3920
          }
3921
        }
3922
      }
3923
      if (!setNoneCol) {
63,312✔
3924
        // done, goto return pColArray
3925
        break;
49,010✔
3926
      } else {
3927
        continue;
14,302✔
3928
      }
3929
    }
3930

3931
    // merge into pColArray
3932
    setNoneCol = false;
8,200✔
3933
    for (int16_t iCol = noneCol; iCol < nCols; ++iCol) {
28,710✔
3934
      if (iCol >= nLastCol) {
20,510!
3935
        break;
×
3936
      }
3937
      // high version's column value
3938
      if (slotIds[iCol] > pTSchema->numOfCols - 1) {
20,510!
3939
        continue;
×
3940
      }
3941

3942
      SLastCol *lastColVal = (SLastCol *)taosArrayGet(pColArray, iCol);
20,510✔
3943
      if (lastColVal->colVal.cid != pTSchema->columns[slotIds[iCol]].colId) {
20,510!
3944
        continue;
×
3945
      }
3946
      SColVal *tColVal = &lastColVal->colVal;
20,510✔
3947
      if (COL_VAL_IS_VALUE(tColVal)) continue;
20,510✔
3948

3949
      tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal);
13,660✔
3950
      if (COL_VAL_IS_VALUE(pColVal)) {
13,660✔
3951
        SLastCol lastCol = {.rowKey = rowKey.key, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID};
10,920✔
3952
        TAOS_CHECK_GOTO(tsdbCacheReallocSLastCol(&lastCol, NULL), &lino, _err);
10,920!
3953

3954
        tsdbCacheFreeSLastColItem(lastColVal);
10,920✔
3955
        taosArraySet(pColArray, iCol, &lastCol);
10,920✔
3956
        int32_t aColIndex = taosArraySearchIdx(aColArray, &lastCol.colVal.cid, compareInt16Val, TD_EQ);
10,920✔
3957
        if (aColIndex >= 0) {
10,920!
3958
          taosArrayRemove(aColArray, aColIndex);
10,920✔
3959
        }
3960
      } else if (!COL_VAL_IS_VALUE(pColVal) && !setNoneCol) {
2,740!
3961
        noneCol = iCol;
2,740✔
3962
        setNoneCol = true;
2,740✔
3963
      }
3964
    }
3965
  } while (setNoneCol);
23,250✔
3966

3967
  if (!hasRow) {
66,780✔
3968
    if (ignoreEarlierTs) {
2,720!
3969
      taosArrayDestroy(pColArray);
×
3970
      pColArray = NULL;
×
3971
    } else {
3972
      taosArrayClear(pColArray);
2,720✔
3973
    }
3974
  }
3975
  *ppLastArray = pColArray;
66,780✔
3976

3977
  nextRowIterClose(&iter);
66,780✔
3978
  taosArrayDestroy(aColArray);
66,780✔
3979

3980
  TAOS_RETURN(code);
66,780✔
3981

3982
_err:
×
3983
  nextRowIterClose(&iter);
×
3984
  // taosMemoryFreeClear(pTSchema);
3985
  *ppLastArray = NULL;
×
3986
  taosArrayDestroyEx(pColArray, tsdbCacheFreeSLastColItem);
×
3987
  taosArrayDestroy(aColArray);
×
3988

3989
  if (code) {
×
3990
    tsdbError("tsdb/cache: vgId:%d, %s failed at line %d since %s.", TD_VID(pTsdb->pVnode), __func__, lino,
×
3991
              tstrerror(code));
3992
  }
3993

3994
  TAOS_RETURN(code);
×
3995
}
3996

3997
static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols,
64,772✔
3998
                               int nCols, int16_t *slotIds) {
3999
  int32_t   code = 0, lino = 0;
64,772✔
4000
  STSchema *pTSchema = pr->pSchema;  // metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1, 1);
64,772✔
4001
  int16_t   nLastCol = nCols;
64,772✔
4002
  int16_t   noneCol = 0;
64,772✔
4003
  bool      setNoneCol = false;
64,772✔
4004
  bool      hasRow = false;
64,772✔
4005
  bool      ignoreEarlierTs = false;
64,772✔
4006
  SArray   *pColArray = NULL;
64,772✔
4007
  SColVal  *pColVal = &(SColVal){0};
64,772✔
4008

4009
  TAOS_CHECK_RETURN(initLastColArrayPartial(pTSchema, &pColArray, slotIds, nCols));
64,772!
4010

4011
  SArray *aColArray = taosArrayInit(nCols, sizeof(int16_t));
64,772✔
4012
  if (NULL == aColArray) {
64,772!
4013
    taosArrayDestroy(pColArray);
×
4014

4015
    TAOS_RETURN(terrno);
×
4016
  }
4017

4018
  for (int i = 0; i < nCols; ++i) {
252,809✔
4019
    if (!taosArrayPush(aColArray, &aCols[i])) {
376,074!
4020
      taosArrayDestroy(pColArray);
×
4021

4022
      TAOS_RETURN(terrno);
×
4023
    }
4024
  }
4025

4026
  // inverse iterator
4027
  CacheNextRowIter iter = {0};
64,772✔
4028
  code =
4029
      nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->info.suid, pr->pLDataIterArray, pr->pReadSnap, pr->lastTs, pr);
64,772✔
4030
  TAOS_CHECK_GOTO(code, &lino, _err);
64,772!
4031

4032
  do {
4033
    TSDBROW *pRow = NULL;
64,772✔
4034
    code = nextRowIterGet(&iter, &pRow, &ignoreEarlierTs, false, TARRAY_DATA(aColArray), TARRAY_SIZE(aColArray));
64,772✔
4035

4036
    if (!pRow) {
64,772✔
4037
      break;
2,720✔
4038
    }
4039

4040
    hasRow = true;
62,052✔
4041

4042
    int32_t sversion = TSDBROW_SVERSION(pRow);
62,052✔
4043
    if (sversion != -1) {
62,052✔
4044
      TAOS_CHECK_GOTO(updateTSchema(sversion, pr, uid), &lino, _err);
18,187!
4045

4046
      pTSchema = pr->pCurrSchema;
18,187✔
4047
    }
4048
    // int16_t nCol = pTSchema->numOfCols;
4049

4050
    STsdbRowKey rowKey = {0};
62,052✔
4051
    tsdbRowGetKey(pRow, &rowKey);
62,052✔
4052

4053
    for (int16_t iCol = noneCol; iCol < nCols; ++iCol) {
239,189✔
4054
      if (iCol >= nLastCol) {
177,137!
4055
        break;
×
4056
      }
4057
      SLastCol *pCol = taosArrayGet(pColArray, iCol);
177,137✔
4058
      if (slotIds[iCol] > pTSchema->numOfCols - 1) {
177,137!
4059
        continue;
×
4060
      }
4061
      if (pCol->colVal.cid != pTSchema->columns[slotIds[iCol]].colId) {
177,137!
4062
        continue;
×
4063
      }
4064
      if (slotIds[iCol] == 0) {
177,137✔
4065
        STColumn *pTColumn = &pTSchema->columns[0];
62,052✔
4066
        SValue    val = {.type = pTColumn->type};
62,052✔
4067
        VALUE_SET_TRIVIAL_DATUM(&val, rowKey.key.ts);
62,052✔
4068
        *pColVal = COL_VAL_VALUE(pTColumn->colId, val);
62,052✔
4069

4070
        SLastCol colTmp = {.rowKey = rowKey.key, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID};
62,052✔
4071
        TAOS_CHECK_GOTO(tsdbCacheReallocSLastCol(&colTmp, NULL), &lino, _err);
62,052!
4072

4073
        taosArraySet(pColArray, 0, &colTmp);
62,052✔
4074
        continue;
62,052✔
4075
      }
4076
      tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal);
115,085✔
4077

4078
      *pCol = (SLastCol){.rowKey = rowKey.key, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID};
115,085✔
4079
      TAOS_CHECK_GOTO(tsdbCacheReallocSLastCol(pCol, NULL), &lino, _err);
115,085!
4080

4081
      int32_t aColIndex = taosArraySearchIdx(aColArray, &pColVal->cid, compareInt16Val, TD_EQ);
115,085✔
4082
      if (aColIndex >= 0) {
115,085!
4083
        taosArrayRemove(aColArray, aColIndex);
115,085✔
4084
      }
4085
    }
4086

4087
    break;
62,052✔
4088
  } while (1);
4089

4090
  if (!hasRow) {
64,772✔
4091
    if (ignoreEarlierTs) {
2,720!
4092
      taosArrayDestroy(pColArray);
×
4093
      pColArray = NULL;
×
4094
    } else {
4095
      taosArrayClear(pColArray);
2,720✔
4096
    }
4097
  }
4098
  *ppLastArray = pColArray;
64,772✔
4099

4100
  nextRowIterClose(&iter);
64,772✔
4101
  taosArrayDestroy(aColArray);
64,772✔
4102

4103
  TAOS_RETURN(code);
64,772✔
4104

4105
_err:
×
4106
  nextRowIterClose(&iter);
×
4107

4108
  *ppLastArray = NULL;
×
4109
  taosArrayDestroyEx(pColArray, tsdbCacheFreeSLastColItem);
×
4110
  taosArrayDestroy(aColArray);
×
4111

4112
  if (code) {
×
4113
    tsdbError("tsdb/cache: vgId:%d, %s failed at line %d since %s.", TD_VID(pTsdb->pVnode), __func__, lino,
×
4114
              tstrerror(code));
4115
  }
4116

4117
  TAOS_RETURN(code);
×
4118
}
4119

4120
void tsdbCacheRelease(SLRUCache *pCache, LRUHandle *h) { tsdbLRUCacheRelease(pCache, h, false); }
×
4121

4122
void tsdbCacheSetCapacity(SVnode *pVnode, size_t capacity) {
72,555✔
4123
  taosLRUCacheSetCapacity(pVnode->pTsdb->lruCache, capacity);
72,555✔
4124
}
72,555✔
4125

4126
#ifdef BUILD_NO_CALL
4127
size_t tsdbCacheGetCapacity(SVnode *pVnode) { return taosLRUCacheGetCapacity(pVnode->pTsdb->lruCache); }
4128
#endif
4129

4130
size_t tsdbCacheGetUsage(SVnode *pVnode) {
443,833,225✔
4131
  size_t usage = 0;
443,833,225✔
4132
  if (pVnode->pTsdb != NULL) {
443,833,225!
4133
    usage = taosLRUCacheGetUsage(pVnode->pTsdb->lruCache);
443,833,225✔
4134
  }
4135

4136
  return usage;
443,833,225✔
4137
}
4138

4139
int32_t tsdbCacheGetElems(SVnode *pVnode) {
443,833,225✔
4140
  int32_t elems = 0;
443,833,225✔
4141
  if (pVnode->pTsdb != NULL) {
443,833,225!
4142
    elems = taosLRUCacheGetElems(pVnode->pTsdb->lruCache);
443,833,225✔
4143
  }
4144

4145
  return elems;
443,833,225✔
4146
}
4147

4148
#ifdef USE_SHARED_STORAGE
4149
// block cache
4150
static void getBCacheKey(int32_t fid, int64_t commitID, int64_t blkno, char *key, int *len) {
×
4151
  struct {
4152
    int32_t fid;
4153
    int64_t commitID;
4154
    int64_t blkno;
4155
  } bKey = {0};
×
4156

4157
  bKey.fid = fid;
×
4158
  bKey.commitID = commitID;
×
4159
  bKey.blkno = blkno;
×
4160

4161
  *len = sizeof(bKey);
×
4162
  memcpy(key, &bKey, *len);
×
4163
}
×
4164

4165
static int32_t tsdbCacheLoadBlockSs(STsdbFD *pFD, uint8_t **ppBlock) {
×
4166
  int32_t code = 0;
×
4167

4168
  int64_t block_size = tsSsBlockSize * pFD->szPage;
×
4169
  int64_t block_offset = (pFD->blkno - 1) * block_size;
×
4170

4171
  char *buf = taosMemoryMalloc(block_size);
×
4172
  if (buf == NULL) {
×
4173
    code = TSDB_CODE_OUT_OF_MEMORY;
×
4174
    goto _exit;
×
4175
  }
4176

4177
  // TODO: pFD->objName is not initialized, but this function is never called.
4178
  code = tssReadFileFromDefault(pFD->objName, block_offset, buf, &block_size);
×
4179
  if (code != TSDB_CODE_SUCCESS) {
×
4180
    taosMemoryFree(buf);
×
4181
    goto _exit;
×
4182
  }
4183
  *ppBlock = buf;
×
4184

4185
_exit:
×
4186
  return code;
×
4187
}
4188

4189
static void deleteBCache(const void *key, size_t keyLen, void *value, void *ud) {
×
4190
  (void)ud;
4191
  uint8_t *pBlock = (uint8_t *)value;
×
4192

4193
  taosMemoryFree(pBlock);
×
4194
}
×
4195

4196
int32_t tsdbCacheGetBlockSs(SLRUCache *pCache, STsdbFD *pFD, LRUHandle **handle) {
×
4197
  int32_t code = 0;
×
4198
  char    key[128] = {0};
×
4199
  int     keyLen = 0;
×
4200

4201
  getBCacheKey(pFD->fid, pFD->cid, pFD->blkno, key, &keyLen);
×
4202
  LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen);
×
4203
  if (!h) {
×
4204
    STsdb *pTsdb = pFD->pTsdb;
×
4205
    (void)taosThreadMutexLock(&pTsdb->bMutex);
×
4206

4207
    h = taosLRUCacheLookup(pCache, key, keyLen);
×
4208
    if (!h) {
×
4209
      uint8_t *pBlock = NULL;
×
4210
      code = tsdbCacheLoadBlockSs(pFD, &pBlock);
×
4211
      //  if table's empty or error, return code of -1
4212
      if (code != TSDB_CODE_SUCCESS || pBlock == NULL) {
×
4213
        (void)taosThreadMutexUnlock(&pTsdb->bMutex);
×
4214

4215
        *handle = NULL;
×
4216
        if (code == TSDB_CODE_SUCCESS && !pBlock) {
×
4217
          code = TSDB_CODE_OUT_OF_MEMORY;
×
4218
        }
4219

4220
        TAOS_RETURN(code);
×
4221
      }
4222

4223
      size_t              charge = tsSsBlockSize * pFD->szPage;
×
4224
      _taos_lru_deleter_t deleter = deleteBCache;
×
4225
      LRUStatus           status =
4226
          taosLRUCacheInsert(pCache, key, keyLen, pBlock, charge, deleter, NULL, &h, TAOS_LRU_PRIORITY_LOW, NULL);
×
4227
      if (status != TAOS_LRU_STATUS_OK) {
4228
        // code = -1;
4229
      }
4230
    }
4231

4232
    (void)taosThreadMutexUnlock(&pTsdb->bMutex);
×
4233
  }
4234

4235
  *handle = h;
×
4236

4237
  TAOS_RETURN(code);
×
4238
}
4239

4240
int32_t tsdbCacheGetPageSs(SLRUCache *pCache, STsdbFD *pFD, int64_t pgno, LRUHandle **handle) {
×
4241
  if (!tsSsEnabled) {
×
4242
    return TSDB_CODE_OPS_NOT_SUPPORT;
×
4243
  }
4244

4245
  int32_t code = 0;
×
4246
  char    key[128] = {0};
×
4247
  int     keyLen = 0;
×
4248

4249
  getBCacheKey(pFD->fid, pFD->cid, pgno, key, &keyLen);
×
4250
  *handle = taosLRUCacheLookup(pCache, key, keyLen);
×
4251

4252
  return code;
×
4253
}
4254

4255
void tsdbCacheSetPageSs(SLRUCache *pCache, STsdbFD *pFD, int64_t pgno, uint8_t *pPage) {
×
4256
  if (!tsSsEnabled) {
×
4257
    return;
×
4258
  }
4259

4260
  char       key[128] = {0};
×
4261
  int        keyLen = 0;
×
4262
  LRUHandle *handle = NULL;
×
4263

4264
  getBCacheKey(pFD->fid, pFD->cid, pgno, key, &keyLen);
×
4265
  (void)taosThreadMutexLock(&pFD->pTsdb->pgMutex);
×
4266
  handle = taosLRUCacheLookup(pFD->pTsdb->pgCache, key, keyLen);
×
4267
  if (!handle) {
×
4268
    size_t              charge = pFD->szPage;
×
4269
    _taos_lru_deleter_t deleter = deleteBCache;
×
4270
    uint8_t            *pPg = taosMemoryMalloc(charge);
×
4271
    if (!pPg) {
×
4272
      (void)taosThreadMutexUnlock(&pFD->pTsdb->pgMutex);
×
4273

4274
      return;  // ignore error with ss cache and leave error untouched
×
4275
    }
4276
    memcpy(pPg, pPage, charge);
×
4277

4278
    LRUStatus status =
4279
        taosLRUCacheInsert(pCache, key, keyLen, pPg, charge, deleter, NULL, &handle, TAOS_LRU_PRIORITY_LOW, NULL);
×
4280
    if (status != TAOS_LRU_STATUS_OK) {
4281
      // ignore cache updating if not ok
4282
      // code = TSDB_CODE_OUT_OF_MEMORY;
4283
    }
4284
  }
4285
  (void)taosThreadMutexUnlock(&pFD->pTsdb->pgMutex);
×
4286

4287
  tsdbCacheRelease(pFD->pTsdb->pgCache, handle);
×
4288
}
4289
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc