• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4872

04 Dec 2025 01:55AM UTC coverage: 64.678% (+0.02%) from 64.654%
#4872

push

travis-ci

guanshengliang
Merge branch '3.0' into cover/3.0

880 of 2219 new or added lines in 36 files covered. (39.66%)

6146 existing lines in 122 files now uncovered.

159679 of 246882 relevant lines covered (64.68%)

110947965.82 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.2
/source/dnode/vnode/src/tsdb/tsdbCache.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#include "functionMgt.h"
16
#include "tsdb.h"
17
#include "tsdbDataFileRW.h"
18
#include "tsdbIter.h"
19
#include "tsdbReadUtil.h"
20
#include "tss.h"
21
#include "vnd.h"
22

23
#define ROCKS_BATCH_SIZE (4096)
24

25
void tsdbLRUCacheRelease(SLRUCache *cache, LRUHandle *handle, bool eraseIfLastRef) {
13,151,304✔
26
  if (!taosLRUCacheRelease(cache, handle, eraseIfLastRef)) {
13,151,304✔
27
    tsdbTrace(" release lru cache failed");
2,326,348✔
28
  }
29
}
13,151,444✔
30

31
#ifdef USE_SHARED_STORAGE
32

33
static int32_t tsdbOpenBCache(STsdb *pTsdb) {
×
34
  int32_t code = 0, lino = 0;
×
35
  int32_t szPage = pTsdb->pVnode->config.tsdbPageSize;
×
36
  int64_t szBlock = tsSsBlockSize <= 1024 ? 1024 : tsSsBlockSize;
×
37

38
  SLRUCache *pCache = taosLRUCacheInit((int64_t)tsSsBlockCacheSize * szBlock * szPage, 0, .5);
×
39
  if (pCache == NULL) {
×
40
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err);
×
41
  }
42

43
  taosLRUCacheSetStrictCapacity(pCache, false);
×
44

45
  (void)taosThreadMutexInit(&pTsdb->bMutex, NULL);
×
46

47
  pTsdb->bCache = pCache;
×
48

49
_err:
×
50
  if (code) {
×
51
    tsdbError("tsdb/bcache: vgId:%d, %s failed at line %d since %s.", TD_VID(pTsdb->pVnode), __func__, lino,
×
52
              tstrerror(code));
53
  }
54

55
  TAOS_RETURN(code);
×
56
}
57

58
static void tsdbCloseBCache(STsdb *pTsdb) {
×
59
  SLRUCache *pCache = pTsdb->bCache;
×
60
  if (pCache) {
×
61
    int32_t elems = taosLRUCacheGetElems(pCache);
×
62
    tsdbTrace("vgId:%d, elems: %d", TD_VID(pTsdb->pVnode), elems);
×
63
    taosLRUCacheEraseUnrefEntries(pCache);
×
64
    elems = taosLRUCacheGetElems(pCache);
×
65
    tsdbTrace("vgId:%d, elems: %d", TD_VID(pTsdb->pVnode), elems);
×
66

67
    taosLRUCacheCleanup(pCache);
×
68

69
    (void)taosThreadMutexDestroy(&pTsdb->bMutex);
×
70
  }
71
}
×
72

73
static int32_t tsdbOpenPgCache(STsdb *pTsdb) {
×
74
  int32_t code = 0, lino = 0;
×
75
  int32_t szPage = pTsdb->pVnode->config.tsdbPageSize;
×
76

77
  SLRUCache *pCache = taosLRUCacheInit((int64_t)tsSsPageCacheSize * szPage, 0, .5);
×
78
  if (pCache == NULL) {
×
79
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err);
×
80
  }
81

82
  taosLRUCacheSetStrictCapacity(pCache, false);
×
83

84
  (void)taosThreadMutexInit(&pTsdb->pgMutex, NULL);
×
85

86
  pTsdb->pgCache = pCache;
×
87

88
_err:
×
89
  if (code) {
×
90
    tsdbError("tsdb/pgcache: vgId:%d, open failed at line %d since %s.", TD_VID(pTsdb->pVnode), lino, tstrerror(code));
×
91
  }
92

93
  TAOS_RETURN(code);
×
94
}
95

96
static void tsdbClosePgCache(STsdb *pTsdb) {
×
97
  SLRUCache *pCache = pTsdb->pgCache;
×
98
  if (pCache) {
×
99
    int32_t elems = taosLRUCacheGetElems(pCache);
×
100
    tsdbTrace("vgId:%d, elems: %d", TD_VID(pTsdb->pVnode), elems);
×
101
    taosLRUCacheEraseUnrefEntries(pCache);
×
102
    elems = taosLRUCacheGetElems(pCache);
×
103
    tsdbTrace("vgId:%d, elems: %d", TD_VID(pTsdb->pVnode), elems);
×
104

105
    taosLRUCacheCleanup(pCache);
×
106

107
    (void)taosThreadMutexDestroy(&pTsdb->bMutex);
×
108
  }
109
}
×
110

111
#endif  // USE_SHARED_STORAGE
112

113
#define ROCKS_KEY_LEN (sizeof(tb_uid_t) + sizeof(int16_t) + sizeof(int8_t))
114

115
enum {
116
  LFLAG_LAST_ROW = 0,
117
  LFLAG_LAST = 1,
118
};
119

120
typedef struct {
121
  tb_uid_t uid;
122
  int16_t  cid;
123
  int8_t   lflag;
124
} SLastKey;
125

126
#define IS_LAST_ROW_KEY(k) (((k).lflag & LFLAG_LAST) == LFLAG_LAST_ROW)
127
#define IS_LAST_KEY(k)     (((k).lflag & LFLAG_LAST) == LFLAG_LAST)
128

129
static void tsdbGetRocksPath(STsdb *pTsdb, char *path) {
4,171,574✔
130
  SVnode *pVnode = pTsdb->pVnode;
4,171,574✔
131
  vnodeGetPrimaryPath(pVnode, false, path, TSDB_FILENAME_LEN);
4,172,389✔
132

133
  int32_t offset = strlen(path);
4,174,522✔
134
  snprintf(path + offset, TSDB_FILENAME_LEN - offset - 1, "%s%s%scache.rdb", TD_DIRSEP, pTsdb->name, TD_DIRSEP);
4,174,522✔
135
}
4,174,522✔
136

137
static const char *myCmpName(void *state) {
22,171,681✔
138
  (void)state;
139
  return "myCmp";
22,171,681✔
140
}
141

142
static void myCmpDestroy(void *state) { (void)state; }
4,175,284✔
143

144
static int myCmp(void *state, const char *a, size_t alen, const char *b, size_t blen) {
263,484,076✔
145
  (void)state;
146
  (void)alen;
147
  (void)blen;
148
  SLastKey *lhs = (SLastKey *)a;
263,484,076✔
149
  SLastKey *rhs = (SLastKey *)b;
263,484,076✔
150

151
  if (lhs->uid < rhs->uid) {
263,484,076✔
152
    return -1;
134,399,833✔
153
  } else if (lhs->uid > rhs->uid) {
129,089,081✔
154
    return 1;
40,628,973✔
155
  }
156

157
  if (lhs->cid < rhs->cid) {
88,461,528✔
158
    return -1;
35,521,845✔
159
  } else if (lhs->cid > rhs->cid) {
52,942,321✔
160
    return 1;
21,315,919✔
161
  }
162

163
  if ((lhs->lflag & LFLAG_LAST) < (rhs->lflag & LFLAG_LAST)) {
31,626,884✔
164
    return -1;
9,933,608✔
165
  } else if ((lhs->lflag & LFLAG_LAST) > (rhs->lflag & LFLAG_LAST)) {
21,693,276✔
166
    return 1;
18,131,541✔
167
  }
168

169
  return 0;
3,561,735✔
170
}
171

172
static int32_t tsdbOpenRocksCache(STsdb *pTsdb) {
4,171,167✔
173
  int32_t code = 0, lino = 0;
4,171,167✔
174
#ifdef USE_ROCKSDB
175
  rocksdb_comparator_t *cmp = rocksdb_comparator_create(NULL, myCmpDestroy, myCmp, myCmpName);
4,175,114✔
176
  if (NULL == cmp) {
4,174,980✔
177
    TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
×
178
  }
179

180
  rocksdb_block_based_table_options_t *tableoptions = rocksdb_block_based_options_create();
4,174,980✔
181
  pTsdb->rCache.tableoptions = tableoptions;
4,174,242✔
182

183
  rocksdb_options_t *options = rocksdb_options_create();
4,175,004✔
184
  if (NULL == options) {
4,174,525✔
185
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err);
×
186
  }
187

188
  rocksdb_options_set_create_if_missing(options, 1);
4,174,525✔
189
  rocksdb_options_set_comparator(options, cmp);
4,174,582✔
190
  rocksdb_options_set_block_based_table_factory(options, tableoptions);
4,174,558✔
191
  rocksdb_options_set_info_log_level(options, 2);  // WARN_LEVEL
4,175,072✔
192
  // rocksdb_options_set_inplace_update_support(options, 1);
193
  // rocksdb_options_set_allow_concurrent_memtable_write(options, 0);
194

195
  rocksdb_writeoptions_t *writeoptions = rocksdb_writeoptions_create();
4,175,072✔
196
  if (NULL == writeoptions) {
4,175,114✔
197
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err2);
×
198
  }
199
  rocksdb_writeoptions_disable_WAL(writeoptions, 1);
4,175,114✔
200

201
  rocksdb_readoptions_t *readoptions = rocksdb_readoptions_create();
4,175,284✔
202
  if (NULL == readoptions) {
4,174,412✔
203
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err3);
×
204
  }
205

206
  char *err = NULL;
4,174,412✔
207
  char  cachePath[TSDB_FILENAME_LEN] = {0};
4,173,970✔
208
  tsdbGetRocksPath(pTsdb, cachePath);
4,174,370✔
209

210
  rocksdb_t *db = rocksdb_open(options, cachePath, &err);
4,175,284✔
211
  if (NULL == db) {
4,175,284✔
212
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err);
×
213
    rocksdb_free(err);
×
214

215
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err4);
×
216
  }
217

218
  rocksdb_flushoptions_t *flushoptions = rocksdb_flushoptions_create();
4,175,284✔
219
  if (NULL == flushoptions) {
4,175,284✔
220
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err5);
×
221
  }
222

223
  rocksdb_writebatch_t *writebatch = rocksdb_writebatch_create();
4,175,284✔
224

225
  TAOS_CHECK_GOTO(taosThreadMutexInit(&pTsdb->rCache.writeBatchMutex, NULL), &lino, _err6);
4,175,284✔
226

227
  pTsdb->rCache.writebatch = writebatch;
4,175,284✔
228
  pTsdb->rCache.my_comparator = cmp;
4,175,284✔
229
  pTsdb->rCache.options = options;
4,175,284✔
230
  pTsdb->rCache.writeoptions = writeoptions;
4,174,801✔
231
  pTsdb->rCache.readoptions = readoptions;
4,175,284✔
232
  pTsdb->rCache.flushoptions = flushoptions;
4,175,284✔
233
  pTsdb->rCache.db = db;
4,175,284✔
234
  pTsdb->rCache.sver = -1;
4,175,284✔
235
  pTsdb->rCache.suid = -1;
4,175,284✔
236
  pTsdb->rCache.uid = -1;
4,175,284✔
237
  pTsdb->rCache.pTSchema = NULL;
4,175,284✔
238
  pTsdb->rCache.ctxArray = taosArrayInit(16, sizeof(SLastUpdateCtx));
4,174,801✔
239
  if (!pTsdb->rCache.ctxArray) {
4,175,284✔
240
    TAOS_CHECK_GOTO(terrno, &lino, _err7);
×
241
  }
242

243
  TAOS_RETURN(code);
4,175,284✔
244

245
_err7:
×
246
  (void)taosThreadMutexDestroy(&pTsdb->rCache.writeBatchMutex);
×
247
_err6:
×
248
  rocksdb_writebatch_destroy(writebatch);
×
249
_err5:
×
250
  rocksdb_close(pTsdb->rCache.db);
×
251
_err4:
×
252
  rocksdb_readoptions_destroy(readoptions);
×
253
_err3:
×
254
  rocksdb_writeoptions_destroy(writeoptions);
×
255
_err2:
×
256
  rocksdb_options_destroy(options);
×
257
  rocksdb_block_based_options_destroy(tableoptions);
×
258
_err:
×
259
  rocksdb_comparator_destroy(cmp);
×
260
#endif
261
  TAOS_RETURN(code);
×
262
}
263

264
static void tsdbCloseRocksCache(STsdb *pTsdb) {
4,175,284✔
265
#ifdef USE_ROCKSDB
266
  rocksdb_close(pTsdb->rCache.db);
4,175,284✔
267
  (void)taosThreadMutexDestroy(&pTsdb->rCache.writeBatchMutex);
4,175,284✔
268
  rocksdb_flushoptions_destroy(pTsdb->rCache.flushoptions);
4,175,284✔
269
  rocksdb_writebatch_destroy(pTsdb->rCache.writebatch);
4,175,284✔
270
  rocksdb_readoptions_destroy(pTsdb->rCache.readoptions);
4,175,284✔
271
  rocksdb_writeoptions_destroy(pTsdb->rCache.writeoptions);
4,175,284✔
272
  rocksdb_options_destroy(pTsdb->rCache.options);
4,175,284✔
273
  rocksdb_block_based_options_destroy(pTsdb->rCache.tableoptions);
4,175,230✔
274
  rocksdb_comparator_destroy(pTsdb->rCache.my_comparator);
4,175,284✔
275
  taosMemoryFree(pTsdb->rCache.pTSchema);
4,175,284✔
276
  taosArrayDestroy(pTsdb->rCache.ctxArray);
4,175,284✔
277
#endif
278
}
4,175,284✔
279

280
static void rocksMayWrite(STsdb *pTsdb, bool force) {
20,455,212✔
281
#ifdef USE_ROCKSDB
282
  rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
20,455,212✔
283

284
  int count = rocksdb_writebatch_count(wb);
20,455,212✔
285
  if ((force && count > 0) || count >= ROCKS_BATCH_SIZE) {
20,455,212✔
286
    char *err = NULL;
105,288✔
287

288
    rocksdb_write(pTsdb->rCache.db, pTsdb->rCache.writeoptions, wb, &err);
105,288✔
289
    if (NULL != err) {
105,288✔
290
      tsdbError("vgId:%d, %s failed at line %d, count: %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, count,
×
291
                err);
292
      rocksdb_free(err);
×
293
    }
294

295
    rocksdb_writebatch_clear(wb);
105,288✔
296
  }
297
#endif
298
}
20,455,212✔
299

300
typedef struct {
301
  TSKEY  ts;
302
  int8_t dirty;
303
  struct {
304
    int16_t cid;
305
    int8_t  type;
306
    int8_t  flag;
307
    union {
308
      int64_t val;
309
      struct {
310
        uint32_t nData;
311
        uint8_t *pData;
312
      };
313
    } value;
314
  } colVal;
315
} SLastColV0;
316

317
static int32_t tsdbCacheDeserializeV0(char const *value, SLastCol *pLastCol) {
206,635✔
318
  SLastColV0 *pLastColV0 = (SLastColV0 *)value;
206,635✔
319

320
  pLastCol->rowKey.ts = pLastColV0->ts;
206,635✔
321
  pLastCol->rowKey.numOfPKs = 0;
206,635✔
322
  pLastCol->dirty = pLastColV0->dirty;
206,635✔
323
  pLastCol->colVal.cid = pLastColV0->colVal.cid;
206,635✔
324
  pLastCol->colVal.flag = pLastColV0->colVal.flag;
206,635✔
325
  pLastCol->colVal.value.type = pLastColV0->colVal.type;
206,635✔
326

327
  pLastCol->cacheStatus = TSDB_LAST_CACHE_VALID;
206,635✔
328

329
  if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) {
206,635✔
330
    pLastCol->colVal.value.nData = pLastColV0->colVal.value.nData;
12,999✔
331
    pLastCol->colVal.value.pData = NULL;
12,999✔
332
    if (pLastCol->colVal.value.nData > 0) {
12,999✔
333
      pLastCol->colVal.value.pData = (uint8_t *)(&pLastColV0[1]);
4,810✔
334
    }
335
    return sizeof(SLastColV0) + pLastColV0->colVal.value.nData;
12,999✔
336
  } else if (pLastCol->colVal.value.type == TSDB_DATA_TYPE_DECIMAL) {
193,636✔
337
    pLastCol->colVal.value.nData = pLastColV0->colVal.value.nData;
59,740✔
338
    pLastCol->colVal.value.pData = (uint8_t *)(&pLastColV0[1]);
59,740✔
339
    return sizeof(SLastColV0) + pLastColV0->colVal.value.nData;
59,740✔
340
  } else {
341
    pLastCol->colVal.value.val = pLastColV0->colVal.value.val;
133,896✔
342
    return sizeof(SLastColV0);
133,896✔
343
  }
344
}
345

346
static int32_t tsdbCacheDeserialize(char const *value, size_t size, SLastCol **ppLastCol) {
206,635✔
347
  if (!value) {
206,635✔
348
    return TSDB_CODE_INVALID_PARA;
×
349
  }
350

351
  SLastCol *pLastCol = taosMemoryCalloc(1, sizeof(SLastCol));
206,635✔
352
  if (NULL == pLastCol) {
206,635✔
353
    return terrno;
×
354
  }
355

356
  int32_t offset = tsdbCacheDeserializeV0(value, pLastCol);
206,635✔
357
  if (offset == size) {
206,635✔
358
    // version 0
359
    *ppLastCol = pLastCol;
×
360

361
    TAOS_RETURN(TSDB_CODE_SUCCESS);
×
362
  } else if (offset > size) {
206,635✔
363
    taosMemoryFreeClear(pLastCol);
×
364

365
    TAOS_RETURN(TSDB_CODE_INVALID_DATA_FMT);
×
366
  }
367

368
  // version
369
  int8_t version = *(int8_t *)(value + offset);
206,635✔
370
  offset += sizeof(int8_t);
206,635✔
371

372
  // numOfPKs
373
  pLastCol->rowKey.numOfPKs = *(uint8_t *)(value + offset);
206,635✔
374
  offset += sizeof(uint8_t);
206,635✔
375

376
  // pks
377
  for (int32_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) {
206,635✔
378
    pLastCol->rowKey.pks[i] = *(SValue *)(value + offset);
×
379
    offset += sizeof(SValue);
×
380

381
    if (IS_VAR_DATA_TYPE(pLastCol->rowKey.pks[i].type)) {
×
382
      pLastCol->rowKey.pks[i].pData = NULL;
×
383
      if (pLastCol->rowKey.pks[i].nData > 0) {
×
384
        pLastCol->rowKey.pks[i].pData = (uint8_t *)value + offset;
×
385
        offset += pLastCol->rowKey.pks[i].nData;
×
386
      }
387
    }
388
  }
389

390
  if (version >= LAST_COL_VERSION_2) {
206,635✔
391
    pLastCol->cacheStatus = *(uint8_t *)(value + offset);
206,635✔
392
  }
393

394
  if (offset > size) {
206,635✔
395
    taosMemoryFreeClear(pLastCol);
×
396

397
    TAOS_RETURN(TSDB_CODE_INVALID_DATA_FMT);
×
398
  }
399

400
  *ppLastCol = pLastCol;
206,635✔
401

402
  TAOS_RETURN(TSDB_CODE_SUCCESS);
206,635✔
403
}
404

405
/*
406
typedef struct {
407
  SLastColV0 lastColV0;
408
  char       colData[];
409
  int8_t     version;
410
  uint8_t    numOfPKs;
411
  SValue     pks[0];
412
  char       pk0Data[];
413
  SValue     pks[1];
414
  char       pk1Data[];
415
  ...
416
} SLastColDisk;
417
*/
418
static int32_t tsdbCacheSerializeV0(char const *value, SLastCol *pLastCol) {
14,860,786✔
419
  SLastColV0 *pLastColV0 = (SLastColV0 *)value;
14,860,786✔
420

421
  pLastColV0->ts = pLastCol->rowKey.ts;
14,860,786✔
422
  pLastColV0->dirty = pLastCol->dirty;
14,860,035✔
423
  pLastColV0->colVal.cid = pLastCol->colVal.cid;
14,860,035✔
424
  pLastColV0->colVal.flag = pLastCol->colVal.flag;
14,860,786✔
425
  pLastColV0->colVal.type = pLastCol->colVal.value.type;
14,860,786✔
426
  if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) {
14,860,786✔
427
    pLastColV0->colVal.value.nData = pLastCol->colVal.value.nData;
850,824✔
428
    if (pLastCol->colVal.value.nData > 0) {
850,824✔
429
      memcpy(&pLastColV0[1], pLastCol->colVal.value.pData, pLastCol->colVal.value.nData);
492,711✔
430
    }
431
    return sizeof(SLastColV0) + pLastCol->colVal.value.nData;
850,824✔
432
  } else if (pLastCol->colVal.value.type == TSDB_DATA_TYPE_DECIMAL) {
14,009,962✔
433
    pLastColV0->colVal.value.nData = pLastCol->colVal.value.nData;
817,910✔
434
    if (pLastCol->colVal.value.nData > 0) {
817,910✔
435
      memcpy(&pLastColV0[1], pLastCol->colVal.value.pData, pLastCol->colVal.value.nData);
411,893✔
436
    }
437
    return sizeof(SLastColV0) + pLastCol->colVal.value.nData;
817,910✔
438
  } else {
439
    pLastColV0->colVal.value.val = pLastCol->colVal.value.val;
13,192,052✔
440
    return sizeof(SLastColV0);
13,192,052✔
441
  }
442

443
  return 0;
444
}
445

446
static int32_t tsdbCacheSerialize(SLastCol *pLastCol, char **value, size_t *size) {
14,858,019✔
447
  *size = sizeof(SLastColV0);
14,858,019✔
448
  if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) {
14,858,019✔
449
    *size += pLastCol->colVal.value.nData;
848,057✔
450
  }
451
  if (pLastCol->colVal.value.type == TSDB_DATA_TYPE_DECIMAL) {
14,860,786✔
452
    *size += DECIMAL128_BYTES;
817,910✔
453
  }
454
  *size += sizeof(uint8_t) + sizeof(uint8_t) + sizeof(uint8_t);  // version + numOfPKs + cacheStatus
14,860,303✔
455

456
  for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) {
14,902,871✔
457
    *size += sizeof(SValue);
42,568✔
458
    if (IS_VAR_DATA_TYPE(pLastCol->rowKey.pks[i].type)) {
42,568✔
459
      *size += pLastCol->rowKey.pks[i].nData;
8,496✔
460
    }
461
  }
462

463
  *value = taosMemoryMalloc(*size);
14,860,303✔
464
  if (NULL == *value) {
14,859,267✔
465
    TAOS_RETURN(terrno);
×
466
  }
467

468
  int32_t offset = tsdbCacheSerializeV0(*value, pLastCol);
14,859,267✔
469

470
  // version
471
  ((uint8_t *)(*value + offset))[0] = LAST_COL_VERSION;
14,860,786✔
472
  offset++;
14,860,786✔
473

474
  // numOfPKs
475
  ((uint8_t *)(*value + offset))[0] = pLastCol->rowKey.numOfPKs;
14,860,786✔
476
  offset++;
14,860,028✔
477

478
  // pks
479
  for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) {
14,902,596✔
480
    ((SValue *)(*value + offset))[0] = pLastCol->rowKey.pks[i];
42,568✔
481
    offset += sizeof(SValue);
42,568✔
482
    if (IS_VAR_DATA_TYPE(pLastCol->rowKey.pks[i].type)) {
42,568✔
483
      if (pLastCol->rowKey.pks[i].nData > 0) {
8,496✔
484
        memcpy(*value + offset, pLastCol->rowKey.pks[i].pData, pLastCol->rowKey.pks[i].nData);
8,496✔
485
      }
486
      offset += pLastCol->rowKey.pks[i].nData;
8,496✔
487
    }
488
  }
489

490
  ((uint8_t *)(*value + offset))[0] = pLastCol->cacheStatus;
14,860,786✔
491

492
  TAOS_RETURN(TSDB_CODE_SUCCESS);
14,860,786✔
493
}
494

495
static int32_t tsdbCachePutToRocksdb(STsdb *pTsdb, SLastKey *pLastKey, SLastCol *pLastCol);
496

497
int tsdbCacheFlushDirty(const void *key, size_t klen, void *value, void *ud) {
23,372,374✔
498
  SLastCol *pLastCol = (SLastCol *)value;
23,372,374✔
499

500
  if (pLastCol->dirty) {
23,372,374✔
501
    STsdb *pTsdb = (STsdb *)ud;
14,753,585✔
502

503
    int32_t code = tsdbCachePutToRocksdb(pTsdb, (SLastKey *)key, pLastCol);
14,753,585✔
504
    if (code) {
14,756,579✔
505
      tsdbError("tsdb/cache: vgId:%d, flush dirty lru failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code));
×
506
      return code;
×
507
    }
508

509
    pLastCol->dirty = 0;
14,756,579✔
510

511
    rocksMayWrite(pTsdb, false);
14,756,579✔
512
  }
513

514
  return 0;
23,376,126✔
515
}
516

517
static bool tsdbKeyDeleted(TSDBKEY *key, SArray *pSkyline, int64_t *iSkyline) {
45,049,377✔
518
  bool deleted = false;
45,049,377✔
519
  while (*iSkyline > 0) {
45,049,377✔
520
    TSDBKEY *pItemBack = (TSDBKEY *)taosArrayGet(pSkyline, *iSkyline);
29,629✔
521
    TSDBKEY *pItemFront = (TSDBKEY *)taosArrayGet(pSkyline, *iSkyline - 1);
29,629✔
522

523
    if (key->ts > pItemBack->ts) {
29,629✔
524
      return false;
7,866✔
525
    } else if (key->ts >= pItemFront->ts && key->ts <= pItemBack->ts) {
21,763✔
526
      if (key->version <= pItemFront->version || (key->ts == pItemBack->ts && key->version <= pItemBack->version)) {
8,695✔
527
        // if (key->version <= pItemFront->version || key->version <= pItemBack->version) {
528
        return true;
8,695✔
529
      } else {
530
        if (*iSkyline > 1) {
×
531
          --*iSkyline;
×
532
        } else {
533
          return false;
×
534
        }
535
      }
536
    } else {
537
      if (*iSkyline > 1) {
13,068✔
538
        --*iSkyline;
×
539
      } else {
540
        return false;
13,068✔
541
      }
542
    }
543
  }
544

545
  return deleted;
45,025,516✔
546
}
547

548
// Get next non-deleted row from imem
549
static TSDBROW *tsdbImemGetNextRow(STbDataIter *pTbIter, SArray *pSkyline, int64_t *piSkyline) {
11,919,506✔
550
  int32_t code = 0;
11,919,506✔
551

552
  if (tsdbTbDataIterNext(pTbIter)) {
11,919,506✔
553
    TSDBROW *pMemRow = tsdbTbDataIterGet(pTbIter);
11,893,167✔
554
    TSDBKEY  rowKey = TSDBROW_KEY(pMemRow);
11,893,167✔
555
    bool     deleted = tsdbKeyDeleted(&rowKey, pSkyline, piSkyline);
11,893,167✔
556
    if (!deleted) {
11,893,167✔
557
      return pMemRow;
11,893,167✔
558
    }
559
  }
560

561
  return NULL;
26,339✔
562
}
563

564
// Get first non-deleted row from imem
565
static TSDBROW *tsdbImemGetFirstRow(SMemTable *imem, STbData *pIMem, STbDataIter *pTbIter, SArray *pSkyline,
1,571,761✔
566
                                    int64_t *piSkyline) {
567
  int32_t code = 0;
1,571,761✔
568

569
  tsdbTbDataIterOpen(pIMem, NULL, 1, pTbIter);
1,571,761✔
570
  TSDBROW *pMemRow = tsdbTbDataIterGet(pTbIter);
1,571,761✔
571
  if (pMemRow) {
1,571,761✔
572
    // if non deleted, return the found row.
573
    TSDBKEY rowKey = TSDBROW_KEY(pMemRow);
1,571,761✔
574
    bool    deleted = tsdbKeyDeleted(&rowKey, pSkyline, piSkyline);
1,571,761✔
575
    if (!deleted) {
1,571,761✔
576
      return pMemRow;
1,568,929✔
577
    }
578
  } else {
579
    return NULL;
×
580
  }
581

582
  // continue to find the non-deleted first row from imem, using get next row
583
  return tsdbImemGetNextRow(pTbIter, pSkyline, piSkyline);
2,832✔
584
}
585

586
void tsdbCacheInvalidateSchema(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int32_t sver) {
86,893✔
587
  SRocksCache *pRCache = &pTsdb->rCache;
86,893✔
588
  if (!pRCache->pTSchema || sver <= pTsdb->rCache.sver) return;
86,893✔
589

590
  if (suid > 0 && suid == pRCache->suid) {
1,516✔
591
    pRCache->sver = -1;
×
592
    pRCache->suid = -1;
×
593
  }
594
  if (suid == 0 && uid == pRCache->uid) {
1,516✔
595
    pRCache->sver = -1;
758✔
596
    pRCache->uid = -1;
758✔
597
  }
598
}
599

600
static int32_t tsdbUpdateSkm(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int32_t sver) {
11,916,674✔
601
  SRocksCache *pRCache = &pTsdb->rCache;
11,916,674✔
602
  if (pRCache->pTSchema && sver == pRCache->sver) {
11,916,674✔
603
    if (suid > 0 && suid == pRCache->suid) {
11,882,805✔
604
      return 0;
11,779,697✔
605
    }
606
    if (suid == 0 && uid == pRCache->uid) {
103,108✔
607
      return 0;
5,765✔
608
    }
609
  }
610

611
  pRCache->suid = suid;
131,212✔
612
  pRCache->uid = uid;
131,212✔
613
  pRCache->sver = sver;
131,212✔
614
  tDestroyTSchema(pRCache->pTSchema);
131,212✔
615
  return metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pRCache->pTSchema);
131,212✔
616
}
617

618
static int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SArray *updCtxArray);
619

620
int32_t tsdbLoadFromImem(SMemTable *imem, int64_t suid, int64_t uid) {
1,571,761✔
621
  int32_t      code = 0;
1,571,761✔
622
  int32_t      lino = 0;
1,571,761✔
623
  STsdb       *pTsdb = imem->pTsdb;
1,571,761✔
624
  SArray      *pMemDelData = NULL;
1,571,761✔
625
  SArray      *pSkyline = NULL;
1,571,761✔
626
  int64_t      iSkyline = 0;
1,571,761✔
627
  STbDataIter  tbIter = {0};
1,571,761✔
628
  TSDBROW     *pMemRow = NULL;
1,571,761✔
629
  STSchema    *pTSchema = NULL;
1,571,761✔
630
  SSHashObj   *iColHash = NULL;
1,571,761✔
631
  int32_t      sver;
632
  int32_t      nCol;
633
  SArray      *ctxArray = pTsdb->rCache.ctxArray;
1,571,761✔
634
  STsdbRowKey  tsdbRowKey = {0};
1,571,761✔
635
  STSDBRowIter iter = {0};
1,571,761✔
636

637
  STbData *pIMem = tsdbGetTbDataFromMemTable(imem, suid, uid);
1,571,761✔
638

639
  // load imem tomb data and build skyline
640
  TAOS_CHECK_GOTO(loadMemTombData(&pMemDelData, NULL, pIMem, INT64_MAX), &lino, _exit);
1,571,761✔
641

642
  // tsdbBuildDeleteSkyline
643
  size_t delSize = TARRAY_SIZE(pMemDelData);
1,571,761✔
644
  if (delSize > 0) {
1,571,761✔
645
    pSkyline = taosArrayInit(32, sizeof(TSDBKEY));
3,606✔
646
    if (!pSkyline) {
3,606✔
647
      TAOS_CHECK_EXIT(terrno);
×
648
    }
649

650
    TAOS_CHECK_EXIT(tsdbBuildDeleteSkyline(pMemDelData, 0, (int32_t)(delSize - 1), pSkyline));
3,606✔
651
    iSkyline = taosArrayGetSize(pSkyline) - 1;
3,606✔
652
  }
653

654
  pMemRow = tsdbImemGetFirstRow(imem, pIMem, &tbIter, pSkyline, &iSkyline);
1,571,761✔
655
  if (!pMemRow) {
1,571,761✔
656
    goto _exit;
×
657
  }
658

659
  // iter first row to last_row/last col values to ctxArray, and mark last null col ids
660
  sver = TSDBROW_SVERSION(pMemRow);
1,571,761✔
661
  TAOS_CHECK_GOTO(tsdbUpdateSkm(pTsdb, suid, uid, sver), &lino, _exit);
1,571,761✔
662
  pTSchema = pTsdb->rCache.pTSchema;
1,571,761✔
663
  nCol = pTSchema->numOfCols;
1,571,761✔
664

665
  tsdbRowGetKey(pMemRow, &tsdbRowKey);
1,571,761✔
666

667
  TAOS_CHECK_EXIT(tsdbRowIterOpen(&iter, pMemRow, pTSchema));
1,571,761✔
668

669
  int32_t iCol = 0;
1,571,761✔
670
  for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal && iCol < nCol; pColVal = tsdbRowIterNext(&iter), iCol++) {
6,929,461✔
671
    SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST_ROW, .tsdbRowKey = tsdbRowKey, .colVal = *pColVal};
5,357,700✔
672
    if (!taosArrayPush(ctxArray, &updateCtx)) {
5,357,728✔
673
      TAOS_CHECK_EXIT(terrno);
×
674
    }
675

676
    if (COL_VAL_IS_VALUE(pColVal)) {
5,357,728✔
677
      updateCtx.lflag = LFLAG_LAST;
5,271,307✔
678
      if (!taosArrayPush(ctxArray, &updateCtx)) {
5,271,279✔
679
        TAOS_CHECK_EXIT(terrno);
×
680
      }
681
    } else {
682
      if (!iColHash) {
86,421✔
683
        iColHash = tSimpleHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT));
23,985✔
684
        if (iColHash == NULL) {
23,985✔
685
          TAOS_CHECK_EXIT(terrno);
×
686
        }
687
      }
688

689
      if (tSimpleHashPut(iColHash, &pColVal->cid, sizeof(pColVal->cid), &pColVal->cid, sizeof(pColVal->cid))) {
86,421✔
690
        TAOS_CHECK_EXIT(terrno);
×
691
      }
692
    }
693
  }
694
  tsdbRowClose(&iter);
1,571,789✔
695

696
  // continue to get next row to fill null last col values
697
  pMemRow = tsdbImemGetNextRow(&tbIter, pSkyline, &iSkyline);
1,571,761✔
698
  while (pMemRow) {
11,916,674✔
699
    if (tSimpleHashGetSize(iColHash) == 0) {
11,890,335✔
700
      break;
1,545,422✔
701
    }
702

703
    sver = TSDBROW_SVERSION(pMemRow);
10,344,913✔
704
    TAOS_CHECK_EXIT(tsdbUpdateSkm(pTsdb, suid, uid, sver));
10,344,913✔
705
    pTSchema = pTsdb->rCache.pTSchema;
10,344,913✔
706

707
    STsdbRowKey tsdbRowKey = {0};
10,344,913✔
708
    tsdbRowGetKey(pMemRow, &tsdbRowKey);
10,344,913✔
709

710
    TAOS_CHECK_EXIT(tsdbRowIterOpen(&iter, pMemRow, pTSchema));
10,344,913✔
711

712
    int32_t iCol = 0;
10,344,913✔
713
    for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal && iCol < nCol; pColVal = tsdbRowIterNext(&iter), iCol++) {
124,092,418✔
714
      if (tSimpleHashGet(iColHash, &pColVal->cid, sizeof(pColVal->cid)) && COL_VAL_IS_VALUE(pColVal)) {
113,744,621✔
715
        SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST, .tsdbRowKey = tsdbRowKey, .colVal = *pColVal};
52,587✔
716
        if (!taosArrayPush(ctxArray, &updateCtx)) {
52,587✔
717
          TAOS_CHECK_EXIT(terrno);
×
718
        }
719

720
        TAOS_CHECK_EXIT(tSimpleHashRemove(iColHash, &pColVal->cid, sizeof(pColVal->cid)));
52,587✔
721
      }
722
    }
723
    tsdbRowClose(&iter);
10,348,415✔
724

725
    pMemRow = tsdbImemGetNextRow(&tbIter, pSkyline, &iSkyline);
10,344,913✔
726
  }
727

728
  TAOS_CHECK_GOTO(tsdbCacheUpdate(pTsdb, suid, uid, ctxArray), &lino, _exit);
1,571,761✔
729

730
_exit:
1,571,761✔
731
  if (code) {
1,571,761✔
732
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(pTsdb->pVnode), __func__, __FILE__, lino, tstrerror(code));
×
733

734
    tsdbRowClose(&iter);
×
735
  }
736

737
  taosArrayClear(ctxArray);
1,571,761✔
738
  // destroy any allocated resource
739
  tSimpleHashCleanup(iColHash);
1,571,761✔
740
  if (pMemDelData) {
1,571,761✔
741
    taosArrayDestroy(pMemDelData);
1,571,761✔
742
  }
743
  if (pSkyline) {
1,571,761✔
744
    taosArrayDestroy(pSkyline);
3,606✔
745
  }
746

747
  TAOS_RETURN(code);
1,571,761✔
748
}
749

750
static int32_t tsdbCacheUpdateFromIMem(STsdb *pTsdb) {
76,704✔
751
  if (!pTsdb) return 0;
76,704✔
752
  if (!pTsdb->imem) return 0;
76,704✔
753

754
  int32_t    code = 0;
56,010✔
755
  int32_t    lino = 0;
56,010✔
756
  SMemTable *imem = pTsdb->imem;
56,010✔
757
  int32_t    nTbData = imem->nTbData;
56,010✔
758
  int64_t    nRow = imem->nRow;
56,010✔
759
  int64_t    nDel = imem->nDel;
56,010✔
760

761
  if (nRow == 0 || nTbData == 0) return 0;
56,010✔
762

763
  TAOS_CHECK_EXIT(tsdbMemTableSaveToCache(imem, tsdbLoadFromImem));
56,010✔
764

765
_exit:
56,010✔
766
  if (code) {
56,010✔
767
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(pTsdb->pVnode), __func__, __FILE__, lino, tstrerror(code));
×
768
  } else {
769
    tsdbInfo("vgId:%d %s done, nRow:%" PRId64 " nDel:%" PRId64, TD_VID(pTsdb->pVnode), __func__, nRow, nDel);
56,010✔
770
  }
771

772
  TAOS_RETURN(code);
56,010✔
773
}
774

775
int32_t tsdbCacheCommit(STsdb *pTsdb) {
76,704✔
776
  int32_t code = 0;
76,704✔
777

778
  // 0, tsdbCacheUpdateFromIMem if updateCacheBatch
779
  // flush dirty data of lru into rocks
780
  // 4, and update when writing if !updateCacheBatch
781
  // 5, merge cache & mem if updateCacheBatch
782

783
  if (tsUpdateCacheBatch) {
76,704✔
784
    code = tsdbCacheUpdateFromIMem(pTsdb);
76,704✔
785
    if (code) {
76,704✔
786
      tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
×
787

788
      TAOS_RETURN(code);
×
789
    }
790
  }
791

792
  char      *err = NULL;
76,704✔
793
  SLRUCache *pCache = pTsdb->lruCache;
76,704✔
794
  // rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
795

796
  (void)taosThreadMutexLock(&pTsdb->lruMutex);
76,704✔
797

798
  taosLRUCacheApply(pCache, tsdbCacheFlushDirty, pTsdb);
76,704✔
799

800
#ifdef USE_ROCKSDB
801
  rocksMayWrite(pTsdb, true);
76,704✔
802
  rocksdb_flush(pTsdb->rCache.db, pTsdb->rCache.flushoptions, &err);
76,704✔
803
#endif
804
  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
76,704✔
805
#ifdef USE_ROCKSDB
806
  if (NULL != err) {
76,704✔
807
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err);
×
808
    rocksdb_free(err);
×
809
    code = TSDB_CODE_FAILED;
×
810
  }
811
#endif
812
  TAOS_RETURN(code);
76,704✔
813
}
814

815
static int32_t reallocVarDataVal(SValue *pValue) {
1,404,019✔
816
  if (IS_VAR_DATA_TYPE(pValue->type)) {
1,404,019✔
817
    uint8_t *pVal = pValue->pData;
1,404,019✔
818
    uint32_t nData = pValue->nData;
1,404,019✔
819
    if (nData > 0) {
1,404,019✔
820
      uint8_t *p = taosMemoryMalloc(nData);
695,461✔
821
      if (!p) {
695,461✔
822
        TAOS_RETURN(terrno);
×
823
      }
824
      pValue->pData = p;
695,461✔
825
      (void)memcpy(pValue->pData, pVal, nData);
695,461✔
826
    } else {
827
      pValue->pData = NULL;
708,558✔
828
    }
829
  }
830

831
  TAOS_RETURN(TSDB_CODE_SUCCESS);
1,404,019✔
832
}
833

834
static int32_t reallocVarData(SColVal *pColVal) { return reallocVarDataVal(&pColVal->value); }
1,378,531✔
835

836
// realloc pk data and col data.
837
static int32_t tsdbCacheReallocSLastCol(SLastCol *pCol, size_t *pCharge) {
27,521,850✔
838
  int32_t code = TSDB_CODE_SUCCESS, lino = 0;
27,521,850✔
839
  size_t  charge = sizeof(SLastCol);
27,521,850✔
840

841
  int8_t i = 0;
27,521,850✔
842
  for (; i < pCol->rowKey.numOfPKs; i++) {
27,633,226✔
843
    SValue *pValue = &pCol->rowKey.pks[i];
111,376✔
844
    if (IS_VAR_DATA_TYPE(pValue->type)) {
111,376✔
845
      TAOS_CHECK_EXIT(reallocVarDataVal(pValue));
25,488✔
846
      charge += pValue->nData;
25,488✔
847
    }
848
  }
849

850
  if (IS_VAR_DATA_TYPE(pCol->colVal.value.type)) {
27,521,616✔
851
    TAOS_CHECK_EXIT(reallocVarData(&pCol->colVal));
1,376,822✔
852
    charge += pCol->colVal.value.nData;
1,378,531✔
853
  }
854

855
  if (pCol->colVal.value.type == TSDB_DATA_TYPE_DECIMAL) {
27,522,047✔
856
    if (pCol->colVal.value.nData > 0) {
999,320✔
857
      void *p = taosMemoryMalloc(pCol->colVal.value.nData);
491,377✔
858
      if (!p) TAOS_CHECK_EXIT(terrno);
491,377✔
859
      (void)memcpy(p, pCol->colVal.value.pData, pCol->colVal.value.nData);
491,377✔
860
      pCol->colVal.value.pData = p;
491,377✔
861
    }
862
    charge += pCol->colVal.value.nData;
999,320✔
863
  }
864

865
  if (pCharge) {
27,521,466✔
866
    *pCharge = charge;
23,599,218✔
867
  }
868

869
_exit:
3,922,248✔
870
  if (TSDB_CODE_SUCCESS != code) {
27,521,752✔
871
    for (int8_t j = 0; j < i; j++) {
×
872
      if (IS_VAR_DATA_TYPE(pCol->rowKey.pks[j].type)) {
×
873
        taosMemoryFree(pCol->rowKey.pks[j].pData);
×
874
      }
875
    }
876

877
    (void)memset(pCol, 0, sizeof(SLastCol));
×
878
  }
879

880
  TAOS_RETURN(code);
27,521,752✔
881
}
882

883
void tsdbCacheFreeSLastColItem(void *pItem) {
4,328,512✔
884
  SLastCol *pCol = (SLastCol *)pItem;
4,328,512✔
885
  for (int i = 0; i < pCol->rowKey.numOfPKs; i++) {
4,410,704✔
886
    if (IS_VAR_DATA_TYPE(pCol->rowKey.pks[i].type)) {
82,192✔
887
      taosMemoryFree(pCol->rowKey.pks[i].pData);
22,656✔
888
    }
889
  }
890

891
  if ((IS_VAR_DATA_TYPE(pCol->colVal.value.type) || pCol->colVal.value.type == TSDB_DATA_TYPE_DECIMAL) &&
4,328,924✔
892
      pCol->colVal.value.pData) {
511,413✔
893
    taosMemoryFree(pCol->colVal.value.pData);
301,924✔
894
  }
895
}
4,328,718✔
896

897
static void tsdbCacheDeleter(const void *key, size_t klen, void *value, void *ud) {
23,600,160✔
898
  SLastCol *pLastCol = (SLastCol *)value;
23,600,160✔
899

900
  if (pLastCol->dirty) {
23,600,160✔
901
    if (tsdbCacheFlushDirty(key, klen, pLastCol, ud) != 0) {
2,124,260✔
902
      STsdb *pTsdb = (STsdb *)ud;
×
903
      tsdbTrace("tsdb/cache: vgId:%d, flush cache %s failed at line %d.", TD_VID(pTsdb->pVnode), __func__, __LINE__);
×
904
    }
905
  }
906

907
  for (uint8_t i = 0; i < pLastCol->rowKey.numOfPKs; ++i) {
23,642,700✔
908
    SValue *pValue = &pLastCol->rowKey.pks[i];
42,568✔
909
    if (IS_VAR_DATA_TYPE(pValue->type)) {
42,568✔
910
      taosMemoryFree(pValue->pData);
8,496✔
911
    }
912
  }
913

914
  if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type) ||
23,600,160✔
915
      pLastCol->colVal.value.type == TSDB_DATA_TYPE_DECIMAL /* && pLastCol->colVal.value.nData > 0*/) {
22,575,035✔
916
    taosMemoryFree(pLastCol->colVal.value.pData);
1,918,081✔
917
  }
918

919
  taosMemoryFree(value);
23,600,160✔
920
}
23,600,160✔
921

922
static void tsdbCacheOverWriter(const void *key, size_t klen, void *value, void *ud) {
10,656,795✔
923
  SLastCol *pLastCol = (SLastCol *)value;
10,656,795✔
924
  pLastCol->dirty = 0;
10,656,795✔
925
}
10,656,795✔
926

927
static int32_t tsdbCachePutToLRU(STsdb *pTsdb, SLastKey *pLastKey, SLastCol *pLastCol, int8_t dirty);
928

929
static int32_t tsdbCacheNewTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, int8_t col_type, int8_t lflag) {
12,803,520✔
930
  int32_t code = 0, lino = 0;
12,803,520✔
931

932
  SLRUCache *pCache = pTsdb->lruCache;
12,803,520✔
933
  // rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
934
  SRowKey  emptyRowKey = {.ts = TSKEY_MIN, .numOfPKs = 0};
12,804,344✔
935
  SLastCol emptyCol = {
12,804,344✔
936
      .rowKey = emptyRowKey, .colVal = COL_VAL_NONE(cid, col_type), .dirty = 1, .cacheStatus = TSDB_LAST_CACHE_VALID};
937

938
  SLastKey *pLastKey = &(SLastKey){.lflag = lflag, .uid = uid, .cid = cid};
12,804,434✔
939
  code = tsdbCachePutToLRU(pTsdb, pLastKey, &emptyCol, 1);
12,804,536✔
940
  if (code) {
12,804,400✔
941
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
×
942
  }
943

944
  TAOS_RETURN(code);
12,804,400✔
945
}
946

947
int32_t tsdbCacheCommitNoLock(STsdb *pTsdb) {
1,011,179✔
948
  int32_t code = 0;
1,011,179✔
949
  char   *err = NULL;
1,011,179✔
950

951
  SLRUCache *pCache = pTsdb->lruCache;
1,012,876✔
952
  // rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
953

954
  taosLRUCacheApply(pCache, tsdbCacheFlushDirty, pTsdb);
1,012,876✔
955
#ifdef USE_ROCKSDB
956
  rocksMayWrite(pTsdb, true);
1,012,876✔
957
  rocksdb_flush(pTsdb->rCache.db, pTsdb->rCache.flushoptions, &err);
1,012,876✔
958
  if (NULL != err) {
1,012,876✔
959
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err);
×
960
    rocksdb_free(err);
×
961
    code = TSDB_CODE_FAILED;
×
962
  }
963
#endif
964
  TAOS_RETURN(code);
1,012,876✔
965
}
966

967
static int32_t tsdbCacheGetValuesFromRocks(STsdb *pTsdb, size_t numKeys, const char *const *ppKeysList,
7,684,186✔
968
                                           size_t *pKeysListSizes, char ***pppValuesList, size_t **ppValuesListSizes) {
969
#ifdef USE_ROCKSDB
970
  char **valuesList = taosMemoryCalloc(numKeys, sizeof(char *));
7,684,186✔
971
  if (!valuesList) return terrno;
7,683,340✔
972
  size_t *valuesListSizes = taosMemoryCalloc(numKeys, sizeof(size_t));
7,683,340✔
973
  if (!valuesListSizes) {
7,682,962✔
974
    taosMemoryFreeClear(valuesList);
×
975
    return terrno;
×
976
  }
977
  char **errs = taosMemoryCalloc(numKeys, sizeof(char *));
7,682,962✔
978
  if (!errs) {
7,683,574✔
979
    taosMemoryFreeClear(valuesList);
×
980
    taosMemoryFreeClear(valuesListSizes);
×
981
    return terrno;
×
982
  }
983
  rocksdb_multi_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, numKeys, ppKeysList, pKeysListSizes, valuesList,
7,683,574✔
984
                    valuesListSizes, errs);
985
  for (size_t i = 0; i < numKeys; ++i) {
27,865,604✔
986
    rocksdb_free(errs[i]);
20,183,014✔
987
  }
988
  taosMemoryFreeClear(errs);
7,682,590✔
989

990
  *pppValuesList = valuesList;
7,682,727✔
991
  *ppValuesListSizes = valuesListSizes;
7,683,822✔
992
#endif
993
  TAOS_RETURN(TSDB_CODE_SUCCESS);
7,683,822✔
994
}
995

996
static int32_t tsdbCacheDropTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, bool hasPrimaryKey) {
5,874,780✔
997
  int32_t code = 0;
5,874,780✔
998

999
  // build keys & multi get from rocks
1000
  char **keys_list = taosMemoryCalloc(2, sizeof(char *));
5,874,780✔
1001
  if (!keys_list) {
5,875,392✔
1002
    return terrno;
×
1003
  }
1004
  size_t *keys_list_sizes = taosMemoryCalloc(2, sizeof(size_t));
5,875,392✔
1005
  if (!keys_list_sizes) {
5,875,392✔
1006
    taosMemoryFree(keys_list);
×
1007
    return terrno;
×
1008
  }
1009
  const size_t klen = ROCKS_KEY_LEN;
5,875,392✔
1010

1011
  char *keys = taosMemoryCalloc(2, sizeof(SLastKey));
5,875,392✔
1012
  if (!keys) {
5,875,158✔
1013
    taosMemoryFree(keys_list);
×
1014
    taosMemoryFree(keys_list_sizes);
×
1015
    return terrno;
×
1016
  }
1017
  ((SLastKey *)keys)[0] = (SLastKey){.lflag = LFLAG_LAST, .uid = uid, .cid = cid};
5,875,158✔
1018
  ((SLastKey *)keys)[1] = (SLastKey){.lflag = LFLAG_LAST_ROW, .uid = uid, .cid = cid};
5,875,158✔
1019

1020
  keys_list[0] = keys;
5,874,546✔
1021
  keys_list[1] = keys + sizeof(SLastKey);
5,875,158✔
1022
  keys_list_sizes[0] = klen;
5,875,392✔
1023
  keys_list_sizes[1] = klen;
5,875,392✔
1024

1025
  char  **values_list = NULL;
5,874,780✔
1026
  size_t *values_list_sizes = NULL;
5,874,780✔
1027

1028
  // was written by caller
1029
  // rocksMayWrite(pTsdb, true); // flush writebatch cache
1030

1031
  TAOS_CHECK_GOTO(tsdbCacheGetValuesFromRocks(pTsdb, 2, (const char *const *)keys_list, keys_list_sizes, &values_list,
5,874,780✔
1032
                                              &values_list_sizes),
1033
                  NULL, _exit);
1034
#ifdef USE_ROCKSDB
1035
  rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
5,875,028✔
1036
#endif
1037
  {
1038
#ifdef USE_ROCKSDB
1039
    SLastCol *pLastCol = NULL;
5,875,028✔
1040
    if (values_list[0] != NULL) {
5,874,416✔
1041
      code = tsdbCacheDeserialize(values_list[0], values_list_sizes[0], &pLastCol);
85,280✔
1042
      if (code != TSDB_CODE_SUCCESS) {
85,280✔
1043
        tsdbError("vgId:%d, %s deserialize failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1044
                  tstrerror(code));
1045
        goto _exit;
×
1046
      }
1047
      if (NULL != pLastCol) {
85,280✔
1048
        rocksdb_writebatch_delete(wb, keys_list[0], klen);
85,280✔
1049
      }
1050
      taosMemoryFreeClear(pLastCol);
85,280✔
1051
    }
1052

1053
    pLastCol = NULL;
5,875,028✔
1054
    if (values_list[1] != NULL) {
5,875,028✔
1055
      code = tsdbCacheDeserialize(values_list[1], values_list_sizes[1], &pLastCol);
85,280✔
1056
      if (code != TSDB_CODE_SUCCESS) {
85,280✔
1057
        tsdbError("vgId:%d, %s deserialize failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1058
                  tstrerror(code));
1059
        goto _exit;
×
1060
      }
1061
      if (NULL != pLastCol) {
85,280✔
1062
        rocksdb_writebatch_delete(wb, keys_list[1], klen);
85,280✔
1063
      }
1064
      taosMemoryFreeClear(pLastCol);
85,280✔
1065
    }
1066

1067
    rocksdb_free(values_list[0]);
5,875,028✔
1068
    rocksdb_free(values_list[1]);
5,875,338✔
1069
#endif
1070

1071
    for (int i = 0; i < 2; i++) {
17,623,810✔
1072
      LRUHandle *h = taosLRUCacheLookup(pTsdb->lruCache, keys_list[i], klen);
11,748,418✔
1073
      if (h) {
11,750,701✔
1074
        tsdbLRUCacheRelease(pTsdb->lruCache, h, true);
170,560✔
1075
        taosLRUCacheErase(pTsdb->lruCache, keys_list[i], klen);
170,560✔
1076
      }
1077
    }
1078
  }
1079

1080
_exit:
5,875,392✔
1081
  taosMemoryFree(keys_list[0]);
5,875,392✔
1082

1083
  taosMemoryFree(keys_list);
5,874,726✔
1084
  taosMemoryFree(keys_list_sizes);
5,875,392✔
1085
  taosMemoryFree(values_list);
5,874,492✔
1086
  taosMemoryFree(values_list_sizes);
5,875,392✔
1087

1088
  TAOS_RETURN(code);
5,875,392✔
1089
}
1090

1091
int32_t tsdbCacheNewTable(STsdb *pTsdb, tb_uid_t uid, tb_uid_t suid, const SSchemaWrapper *pSchemaRow) {
1,635,963✔
1092
  int32_t code = 0;
1,635,963✔
1093

1094
  (void)taosThreadMutexLock(&pTsdb->lruMutex);
1,635,963✔
1095

1096
  if (suid < 0) {
1,635,481✔
1097
    for (int i = 0; i < pSchemaRow->nCols; ++i) {
123,315✔
1098
      int16_t cid = pSchemaRow->pSchema[i].colId;
107,379✔
1099
      int8_t  col_type = pSchemaRow->pSchema[i].type;
107,379✔
1100

1101
      code = tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, LFLAG_LAST_ROW);
107,379✔
1102
      if (code != TSDB_CODE_SUCCESS) {
107,379✔
1103
        tsdbTrace("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1104
                  tstrerror(code));
1105
      }
1106
      code = tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, LFLAG_LAST);
107,379✔
1107
      if (code != TSDB_CODE_SUCCESS) {
107,379✔
1108
        tsdbTrace("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1109
                  tstrerror(code));
1110
      }
1111
    }
1112
  } else {
1113
    STSchema *pTSchema = NULL;
1,619,545✔
1114
    code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, -1, &pTSchema);
1,620,285✔
1115
    if (code != TSDB_CODE_SUCCESS) {
1,620,229✔
1116
      (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
×
1117

1118
      TAOS_RETURN(code);
×
1119
    }
1120

1121
    for (int i = 0; i < pTSchema->numOfCols; ++i) {
7,787,801✔
1122
      int16_t cid = pTSchema->columns[i].colId;
6,167,544✔
1123
      int8_t  col_type = pTSchema->columns[i].type;
6,167,174✔
1124

1125
      code = tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, LFLAG_LAST_ROW);
6,166,462✔
1126
      if (code != TSDB_CODE_SUCCESS) {
6,167,202✔
1127
        tsdbTrace("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1128
                  tstrerror(code));
1129
      }
1130
      code = tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, LFLAG_LAST);
6,167,202✔
1131
      if (code != TSDB_CODE_SUCCESS) {
6,167,544✔
1132
        tsdbTrace("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1133
                  tstrerror(code));
1134
      }
1135
    }
1136

1137
    taosMemoryFree(pTSchema);
1,620,229✔
1138
  }
1139

1140
  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
1,636,221✔
1141

1142
  TAOS_RETURN(code);
1,636,221✔
1143
}
1144

1145
int32_t tsdbCacheDropTable(STsdb *pTsdb, tb_uid_t uid, tb_uid_t suid, SSchemaWrapper *pSchemaRow) {
6,372✔
1146
  int32_t code = 0;
6,372✔
1147

1148
  (void)taosThreadMutexLock(&pTsdb->lruMutex);
6,372✔
1149

1150
  code = tsdbCacheCommitNoLock(pTsdb);
6,372✔
1151
  if (code != TSDB_CODE_SUCCESS) {
6,372✔
1152
    tsdbTrace("vgId:%d, %s commit with no lock failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1153
              tstrerror(code));
1154
  }
1155

1156
  if (pSchemaRow != NULL) {
6,372✔
1157
    bool hasPrimayKey = false;
×
1158
    int  nCols = pSchemaRow->nCols;
×
1159
    if (nCols >= 2) {
×
1160
      hasPrimayKey = (pSchemaRow->pSchema[1].flags & COL_IS_KEY) ? true : false;
×
1161
    }
1162
    for (int i = 0; i < nCols; ++i) {
×
1163
      int16_t cid = pSchemaRow->pSchema[i].colId;
×
1164
      int8_t  col_type = pSchemaRow->pSchema[i].type;
×
1165

1166
      code = tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey);
×
1167
      if (code != TSDB_CODE_SUCCESS) {
×
1168
        tsdbTrace("vgId:%d, %s drop table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1169
                  tstrerror(code));
1170
      }
1171
    }
1172
  } else {
1173
    STSchema *pTSchema = NULL;
6,372✔
1174
    code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, -1, &pTSchema);
6,372✔
1175
    if (code != TSDB_CODE_SUCCESS) {
6,372✔
1176
      (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
×
1177

1178
      TAOS_RETURN(code);
×
1179
    }
1180

1181
    bool hasPrimayKey = false;
6,372✔
1182
    int  nCols = pTSchema->numOfCols;
6,372✔
1183
    if (nCols >= 2) {
6,372✔
1184
      hasPrimayKey = (pTSchema->columns[1].flags & COL_IS_KEY) ? true : false;
6,372✔
1185
    }
1186
    for (int i = 0; i < nCols; ++i) {
27,792✔
1187
      int16_t cid = pTSchema->columns[i].colId;
21,420✔
1188
      int8_t  col_type = pTSchema->columns[i].type;
21,420✔
1189

1190
      code = tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey);
21,420✔
1191
      if (code != TSDB_CODE_SUCCESS) {
21,420✔
1192
        tsdbTrace("vgId:%d, %s drop table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1193
                  tstrerror(code));
1194
      }
1195
    }
1196

1197
    taosMemoryFree(pTSchema);
6,372✔
1198
  }
1199

1200
  rocksMayWrite(pTsdb, false);
6,372✔
1201

1202
  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
6,372✔
1203

1204
  TAOS_RETURN(code);
6,372✔
1205
}
1206

1207
int32_t tsdbCacheDropSubTables(STsdb *pTsdb, SArray *uids, tb_uid_t suid) {
991,871✔
1208
  int32_t code = 0;
991,871✔
1209

1210
  (void)taosThreadMutexLock(&pTsdb->lruMutex);
991,871✔
1211

1212
  code = tsdbCacheCommitNoLock(pTsdb);
992,296✔
1213
  if (code != TSDB_CODE_SUCCESS) {
992,908✔
1214
    tsdbTrace("vgId:%d, %s commit with no lock failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1215
              tstrerror(code));
1216
  }
1217

1218
  STSchema *pTSchema = NULL;
992,908✔
1219
  code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, suid, -1, &pTSchema);
992,296✔
1220
  if (code != TSDB_CODE_SUCCESS) {
992,304✔
1221
    (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
2,652✔
1222

1223
    TAOS_RETURN(code);
2,652✔
1224
  }
1225

1226
  for (int i = 0; i < TARRAY_SIZE(uids); ++i) {
1,687,261✔
1227
    int64_t uid = ((tb_uid_t *)TARRAY_DATA(uids))[i];
696,997✔
1228

1229
    bool hasPrimayKey = false;
696,997✔
1230
    int  nCols = pTSchema->numOfCols;
696,997✔
1231
    if (nCols >= 2) {
696,997✔
1232
      hasPrimayKey = (pTSchema->columns[1].flags & COL_IS_KEY) ? true : false;
696,997✔
1233
    }
1234

1235
    for (int i = 0; i < nCols; ++i) {
6,487,109✔
1236
      int16_t cid = pTSchema->columns[i].colId;
5,789,500✔
1237
      int8_t  col_type = pTSchema->columns[i].type;
5,788,888✔
1238

1239
      code = tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey);
5,789,500✔
1240
      if (code != TSDB_CODE_SUCCESS) {
5,790,112✔
1241
        tsdbTrace("vgId:%d, %s drop table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1242
                  tstrerror(code));
1243
      }
1244
    }
1245
  }
1246

1247
  taosMemoryFree(pTSchema);
989,652✔
1248

1249
  rocksMayWrite(pTsdb, false);
990,256✔
1250

1251
  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
990,256✔
1252

1253
  TAOS_RETURN(code);
990,256✔
1254
}
1255

1256
int32_t tsdbCacheNewNTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, int8_t col_type) {
×
1257
  int32_t code = 0;
×
1258

1259
  (void)taosThreadMutexLock(&pTsdb->lruMutex);
×
1260

1261
  code = tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, 0);
×
1262
  if (code != TSDB_CODE_SUCCESS) {
×
1263
    tsdbTrace("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1264
              tstrerror(code));
1265
  }
1266
  code = tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, 1);
×
1267
  if (code != TSDB_CODE_SUCCESS) {
×
1268
    tsdbTrace("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1269
              tstrerror(code));
1270
  }
1271
  // rocksMayWrite(pTsdb, true, false, false);
1272
  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
×
1273

1274
  TAOS_RETURN(code);
×
1275
}
1276

1277
int32_t tsdbCacheDropNTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, bool hasPrimayKey) {
×
1278
  int32_t code = 0;
×
1279

1280
  (void)taosThreadMutexLock(&pTsdb->lruMutex);
×
1281

1282
  code = tsdbCacheCommitNoLock(pTsdb);
×
1283
  if (code != TSDB_CODE_SUCCESS) {
×
1284
    tsdbTrace("vgId:%d, %s commit with no lock failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1285
              tstrerror(code));
1286
  }
1287

1288
  code = tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey);
×
1289
  if (code != TSDB_CODE_SUCCESS) {
×
1290
    tsdbTrace("vgId:%d, %s drop table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1291
              tstrerror(code));
1292
  }
1293

1294
  rocksMayWrite(pTsdb, false);
×
1295

1296
  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
×
1297

1298
  TAOS_RETURN(code);
×
1299
}
1300

1301
int32_t tsdbCacheNewSTableColumn(STsdb *pTsdb, SArray *uids, int16_t cid, int8_t col_type) {
27,192✔
1302
  int32_t code = 0;
27,192✔
1303

1304
  (void)taosThreadMutexLock(&pTsdb->lruMutex);
27,192✔
1305

1306
  for (int i = 0; i < TARRAY_SIZE(uids); ++i) {
154,912✔
1307
    tb_uid_t uid = ((tb_uid_t *)TARRAY_DATA(uids))[i];
127,720✔
1308

1309
    code = tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, 0);
127,720✔
1310
    if (code != TSDB_CODE_SUCCESS) {
127,720✔
1311
      tsdbTrace("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1312
                tstrerror(code));
1313
    }
1314
    code = tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, 1);
127,720✔
1315
    if (code != TSDB_CODE_SUCCESS) {
127,720✔
1316
      tsdbTrace("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1317
                tstrerror(code));
1318
    }
1319
  }
1320

1321
  // rocksMayWrite(pTsdb, true, false, false);
1322
  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
27,192✔
1323
  TAOS_RETURN(code);
27,192✔
1324
}
1325

1326
int32_t tsdbCacheDropSTableColumn(STsdb *pTsdb, SArray *uids, int16_t cid, bool hasPrimayKey) {
13,596✔
1327
  int32_t code = 0;
13,596✔
1328

1329
  (void)taosThreadMutexLock(&pTsdb->lruMutex);
13,596✔
1330

1331
  code = tsdbCacheCommitNoLock(pTsdb);
13,596✔
1332
  if (code != TSDB_CODE_SUCCESS) {
13,596✔
1333
    tsdbTrace("vgId:%d, %s commit with no lock failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1334
              tstrerror(code));
1335
  }
1336

1337
  for (int i = 0; i < TARRAY_SIZE(uids); ++i) {
77,456✔
1338
    int64_t uid = ((tb_uid_t *)TARRAY_DATA(uids))[i];
63,860✔
1339

1340
    code = tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey);
63,860✔
1341
    if (code != TSDB_CODE_SUCCESS) {
63,860✔
1342
      tsdbTrace("vgId:%d, %s drop table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1343
                tstrerror(code));
1344
    }
1345
  }
1346

1347
  rocksMayWrite(pTsdb, false);
13,596✔
1348

1349
  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
13,596✔
1350

1351
  TAOS_RETURN(code);
13,596✔
1352
}
1353

1354
typedef struct {
1355
  int      idx;
1356
  SLastKey key;
1357
} SIdxKey;
1358

1359
static void tsdbCacheUpdateLastColToNone(SLastCol *pLastCol, ELastCacheStatus cacheStatus) {
×
1360
  // update rowkey
1361
  pLastCol->rowKey.ts = TSKEY_MIN;
×
1362
  for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) {
×
1363
    SValue *pPKValue = &pLastCol->rowKey.pks[i];
×
1364
    if (IS_VAR_DATA_TYPE(pPKValue->type) && pPKValue->nData > 0) {
×
1365
      taosMemoryFreeClear(pPKValue->pData);
×
1366
      pPKValue->nData = 0;
×
1367
    } else {
1368
      valueClearDatum(pPKValue, pPKValue->type);
×
1369
    }
1370
  }
1371
  pLastCol->rowKey.numOfPKs = 0;
×
1372

1373
  // update colval
1374
  if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type) && pLastCol->colVal.value.nData > 0) {
×
1375
    taosMemoryFreeClear(pLastCol->colVal.value.pData);
×
1376
    pLastCol->colVal.value.nData = 0;
×
1377
  } else {
1378
    valueClearDatum(&pLastCol->colVal.value, pLastCol->colVal.value.type);
×
1379
  }
1380

1381
  pLastCol->colVal = COL_VAL_NONE(pLastCol->colVal.cid, pLastCol->colVal.value.type);
×
1382
  pLastCol->dirty = 1;
×
1383
  pLastCol->cacheStatus = cacheStatus;
×
1384
}
×
1385

1386
static int32_t tsdbCachePutToRocksdb(STsdb *pTsdb, SLastKey *pLastKey, SLastCol *pLastCol) {
14,857,309✔
1387
  int32_t code = 0;
14,857,309✔
1388
#ifdef USE_ROCKSDB
1389
  char  *rocks_value = NULL;
14,857,309✔
1390
  size_t vlen = 0;
14,858,019✔
1391

1392
  code = tsdbCacheSerialize(pLastCol, &rocks_value, &vlen);
14,858,019✔
1393
  if (code) {
14,860,786✔
1394
    tsdbError("tsdb/cache/putrocks: vgId:%d, serialize failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code));
×
1395
    TAOS_RETURN(code);
×
1396
  }
1397

1398
  rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
14,860,786✔
1399
  (void)taosThreadMutexLock(&pTsdb->rCache.writeBatchMutex);
14,860,786✔
1400
  rocksdb_writebatch_put(wb, (char *)pLastKey, ROCKS_KEY_LEN, rocks_value, vlen);
14,860,018✔
1401
  (void)taosThreadMutexUnlock(&pTsdb->rCache.writeBatchMutex);
14,860,018✔
1402

1403
  taosMemoryFree(rocks_value);
14,860,035✔
1404
#endif
1405
  TAOS_RETURN(code);
14,860,786✔
1406
}
1407

1408
static int32_t tsdbCachePutToLRU(STsdb *pTsdb, SLastKey *pLastKey, SLastCol *pLastCol, int8_t dirty) {
23,599,532✔
1409
  int32_t code = 0, lino = 0;
23,599,532✔
1410

1411
  SLastCol *pLRULastCol = taosMemoryCalloc(1, sizeof(SLastCol));
23,599,532✔
1412
  if (!pLRULastCol) {
23,597,998✔
1413
    return terrno;
×
1414
  }
1415

1416
  size_t charge = 0;
23,597,998✔
1417
  *pLRULastCol = *pLastCol;
23,597,998✔
1418
  pLRULastCol->dirty = dirty;
23,597,712✔
1419
  TAOS_CHECK_EXIT(tsdbCacheReallocSLastCol(pLRULastCol, &charge));
23,598,652✔
1420

1421
  LRUStatus status = taosLRUCacheInsert(pTsdb->lruCache, pLastKey, ROCKS_KEY_LEN, pLRULastCol, charge, tsdbCacheDeleter,
23,599,022✔
1422
                                        tsdbCacheOverWriter, NULL, TAOS_LRU_PRIORITY_LOW, pTsdb);
1423
  if (TAOS_LRU_STATUS_OK != status && TAOS_LRU_STATUS_OK_OVERWRITTEN != status) {
23,599,796✔
1424
    tsdbError("vgId:%d, %s failed at line %d status %d.", TD_VID(pTsdb->pVnode), __func__, __LINE__, status);
×
1425
    code = TSDB_CODE_FAILED;
×
1426
    pLRULastCol = NULL;
×
1427
  }
1428

1429
_exit:
23,599,796✔
1430
  if (TSDB_CODE_SUCCESS != code) {
23,599,824✔
1431
    taosMemoryFree(pLRULastCol);
×
1432
    tsdbError("tsdb/cache/putlru: vgId:%d, failed at line %d since %s.", TD_VID(pTsdb->pVnode), lino, tstrerror(code));
×
1433
  }
1434

1435
  TAOS_RETURN(code);
23,599,824✔
1436
}
1437

1438
static int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SArray *updCtxArray) {
1,571,761✔
1439
  if (!updCtxArray || TARRAY_SIZE(updCtxArray) == 0) {
1,571,761✔
1440
    TAOS_RETURN(TSDB_CODE_SUCCESS);
×
1441
  }
1442

1443
  int32_t code = 0, lino = 0;
1,571,761✔
1444

1445
  int        num_keys = TARRAY_SIZE(updCtxArray);
1,571,761✔
1446
  SArray    *remainCols = NULL;
1,571,761✔
1447
  SLRUCache *pCache = pTsdb->lruCache;
1,571,761✔
1448

1449
  (void)taosThreadMutexLock(&pTsdb->lruMutex);
1,571,761✔
1450
  for (int i = 0; i < num_keys; ++i) {
12,253,383✔
1451
    SLastUpdateCtx *updCtx = &((SLastUpdateCtx *)TARRAY_DATA(updCtxArray))[i];
10,681,622✔
1452
    int8_t          lflag = updCtx->lflag;
10,681,622✔
1453
    SRowKey        *pRowKey = &updCtx->tsdbRowKey.key;
10,681,622✔
1454
    SColVal        *pColVal = &updCtx->colVal;
10,681,622✔
1455

1456
    if (lflag == LFLAG_LAST && !COL_VAL_IS_VALUE(pColVal)) {
10,681,622✔
1457
      continue;
×
1458
    }
1459

1460
    SLastKey  *key = &(SLastKey){.lflag = lflag, .uid = uid, .cid = pColVal->cid};
10,681,622✔
1461
    LRUHandle *h = taosLRUCacheLookup(pCache, key, ROCKS_KEY_LEN);
10,681,622✔
1462
    if (h) {
10,681,594✔
1463
      SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h);
10,656,034✔
1464
      if (pLastCol->cacheStatus != TSDB_LAST_CACHE_NO_CACHE) {
10,656,034✔
1465
        int32_t cmp_res = tRowKeyCompare(&pLastCol->rowKey, pRowKey);
10,655,437✔
1466
        if (cmp_res < 0 || (cmp_res == 0 && !COL_VAL_IS_NONE(pColVal))) {
10,655,437✔
1467
          SLastCol newLastCol = {
10,653,712✔
1468
              .rowKey = *pRowKey, .colVal = *pColVal, .dirty = 1, .cacheStatus = TSDB_LAST_CACHE_VALID};
1469
          code = tsdbCachePutToLRU(pTsdb, key, &newLastCol, 1);
10,653,712✔
1470
        }
1471
      }
1472

1473
      tsdbLRUCacheRelease(pCache, h, false);
10,655,922✔
1474
      TAOS_CHECK_EXIT(code);
10,656,062✔
1475
    } else {
1476
      if (!remainCols) {
25,560✔
1477
        remainCols = taosArrayInit(num_keys * 2, sizeof(SIdxKey));
6,271✔
1478
        if (!remainCols) {
6,271✔
1479
          TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
1480
        }
1481
      }
1482
      if (!taosArrayPush(remainCols, &(SIdxKey){i, *key})) {
51,120✔
1483
        TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
1484
      }
1485
    }
1486
  }
1487

1488
  if (remainCols) {
1,571,761✔
1489
    num_keys = TARRAY_SIZE(remainCols);
6,271✔
1490
  }
1491
  if (remainCols && num_keys > 0) {
1,571,761✔
1492
    char  **keys_list = NULL;
6,271✔
1493
    size_t *keys_list_sizes = NULL;
6,271✔
1494
    char  **values_list = NULL;
6,271✔
1495
    size_t *values_list_sizes = NULL;
6,271✔
1496
    char  **errs = NULL;
6,271✔
1497
    keys_list = taosMemoryCalloc(num_keys, sizeof(char *));
6,271✔
1498
    if (!keys_list) {
6,271✔
1499
      (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
×
1500
      return terrno;
×
1501
    }
1502
    keys_list_sizes = taosMemoryCalloc(num_keys, sizeof(size_t));
6,271✔
1503
    if (!keys_list_sizes) {
6,271✔
1504
      taosMemoryFree(keys_list);
×
1505
      (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
×
1506
      return terrno;
×
1507
    }
1508
    for (int i = 0; i < num_keys; ++i) {
31,831✔
1509
      SIdxKey *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[i];
25,560✔
1510

1511
      keys_list[i] = (char *)&idxKey->key;
25,560✔
1512
      keys_list_sizes[i] = ROCKS_KEY_LEN;
25,560✔
1513
    }
1514

1515
    rocksMayWrite(pTsdb, true);  // flush writebatch cache
6,271✔
1516

1517
    code = tsdbCacheGetValuesFromRocks(pTsdb, num_keys, (const char *const *)keys_list, keys_list_sizes, &values_list,
6,271✔
1518
                                       &values_list_sizes);
1519
    if (code) {
6,271✔
1520
      taosMemoryFree(keys_list);
×
1521
      taosMemoryFree(keys_list_sizes);
×
1522
      goto _exit;
×
1523
    }
1524

1525
    // rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
1526
    for (int i = 0; i < num_keys; ++i) {
31,831✔
1527
      SIdxKey        *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[i];
25,560✔
1528
      SLastUpdateCtx *updCtx = (SLastUpdateCtx *)taosArrayGet(updCtxArray, idxKey->idx);
25,560✔
1529
      SRowKey        *pRowKey = &updCtx->tsdbRowKey.key;
25,560✔
1530
      SColVal        *pColVal = &updCtx->colVal;
25,560✔
1531

1532
      SLastCol *pLastCol = NULL;
25,560✔
1533
      if (values_list[i] != NULL) {
25,560✔
1534
        code = tsdbCacheDeserialize(values_list[i], values_list_sizes[i], &pLastCol);
×
1535
        if (code != TSDB_CODE_SUCCESS) {
×
1536
          tsdbError("vgId:%d, %s deserialize failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1537
                    tstrerror(code));
1538
          goto _exit;
×
1539
        }
1540
      }
1541
      /*
1542
      if (code) {
1543
        tsdbError("tsdb/cache: vgId:%d, deserialize failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code));
1544
      }
1545
      */
1546
      SLastCol *pToFree = pLastCol;
25,560✔
1547

1548
      if (pLastCol && pLastCol->cacheStatus == TSDB_LAST_CACHE_NO_CACHE) {
25,560✔
1549
        if ((code = tsdbCachePutToLRU(pTsdb, &idxKey->key, pLastCol, 0)) != TSDB_CODE_SUCCESS) {
×
1550
          tsdbError("tsdb/cache: vgId:%d, put lru failed at line %d since %s.", TD_VID(pTsdb->pVnode), lino,
×
1551
                    tstrerror(code));
1552
          taosMemoryFreeClear(pToFree);
×
1553
          break;
×
1554
        }
1555

1556
        // cache invalid => skip update
1557
        taosMemoryFreeClear(pToFree);
×
1558
        continue;
×
1559
      }
1560

1561
      if (IS_LAST_KEY(idxKey->key) && !COL_VAL_IS_VALUE(pColVal)) {
25,560✔
1562
        taosMemoryFreeClear(pToFree);
×
1563
        continue;
×
1564
      }
1565

1566
      int32_t cmp_res = 1;
25,560✔
1567
      if (pLastCol) {
25,560✔
1568
        cmp_res = tRowKeyCompare(&pLastCol->rowKey, pRowKey);
×
1569
      }
1570

1571
      if (NULL == pLastCol || cmp_res < 0 || (cmp_res == 0 && !COL_VAL_IS_NONE(pColVal))) {
25,560✔
1572
        SLastCol lastColTmp = {
25,560✔
1573
            .rowKey = *pRowKey, .colVal = *pColVal, .dirty = 0, .cacheStatus = TSDB_LAST_CACHE_VALID};
1574
        if ((code = tsdbCachePutToRocksdb(pTsdb, &idxKey->key, &lastColTmp)) != TSDB_CODE_SUCCESS) {
25,560✔
1575
          tsdbError("tsdb/cache: vgId:%d, put rocks failed at line %d since %s.", TD_VID(pTsdb->pVnode), lino,
×
1576
                    tstrerror(code));
1577
          taosMemoryFreeClear(pToFree);
×
1578
          break;
×
1579
        }
1580
        if ((code = tsdbCachePutToLRU(pTsdb, &idxKey->key, &lastColTmp, 0)) != TSDB_CODE_SUCCESS) {
25,560✔
1581
          tsdbError("tsdb/cache: vgId:%d, put lru failed at line %d since %s.", TD_VID(pTsdb->pVnode), lino,
×
1582
                    tstrerror(code));
1583
          taosMemoryFreeClear(pToFree);
×
1584
          break;
×
1585
        }
1586
      }
1587

1588
      taosMemoryFreeClear(pToFree);
25,560✔
1589
    }
1590

1591
    rocksMayWrite(pTsdb, false);
6,271✔
1592

1593
    taosMemoryFree(keys_list);
6,271✔
1594
    taosMemoryFree(keys_list_sizes);
6,271✔
1595
    if (values_list) {
6,271✔
1596
#ifdef USE_ROCKSDB
1597
      for (int i = 0; i < num_keys; ++i) {
31,831✔
1598
        rocksdb_free(values_list[i]);
25,560✔
1599
      }
1600
#endif
1601
      taosMemoryFree(values_list);
6,271✔
1602
    }
1603
    taosMemoryFree(values_list_sizes);
6,271✔
1604
  }
1605

1606
_exit:
1,571,761✔
1607
  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
1,571,761✔
1608
  taosArrayDestroy(remainCols);
1,571,761✔
1609

1610
  if (code) {
1,571,761✔
1611
    tsdbError("tsdb/cache: vgId:%d, update failed at line %d since %s.", TD_VID(pTsdb->pVnode), __LINE__,
×
1612
              tstrerror(code));
1613
  }
1614

1615
  TAOS_RETURN(code);
1,571,761✔
1616
}
1617

1618
int32_t tsdbCacheRowFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int64_t version, int32_t nRow,
×
1619
                                 SRow **aRow) {
1620
  int32_t code = 0, lino = 0;
×
1621

1622
  // 1. prepare last
1623
  TSDBROW      lRow = {.type = TSDBROW_ROW_FMT, .pTSRow = aRow[nRow - 1], .version = version};
×
1624
  STSchema    *pTSchema = NULL;
×
1625
  int32_t      sver = TSDBROW_SVERSION(&lRow);
×
1626
  SSHashObj   *iColHash = NULL;
×
1627
  STSDBRowIter iter = {0};
×
1628

1629
  TAOS_CHECK_GOTO(tsdbUpdateSkm(pTsdb, suid, uid, sver), &lino, _exit);
×
1630
  pTSchema = pTsdb->rCache.pTSchema;
×
1631

1632
  TSDBROW tRow = {.type = TSDBROW_ROW_FMT, .version = version};
×
1633
  int32_t nCol = pTSchema->numOfCols;
×
1634
  SArray *ctxArray = pTsdb->rCache.ctxArray;
×
1635

1636
  // 1. prepare by lrow
1637
  STsdbRowKey tsdbRowKey = {0};
×
1638
  tsdbRowGetKey(&lRow, &tsdbRowKey);
×
1639

1640
  TAOS_CHECK_GOTO(tsdbRowIterOpen(&iter, &lRow, pTSchema), &lino, _exit);
×
1641

1642
  int32_t iCol = 0;
×
1643
  for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal && iCol < nCol; pColVal = tsdbRowIterNext(&iter), iCol++) {
×
1644
    SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST_ROW, .tsdbRowKey = tsdbRowKey, .colVal = *pColVal};
×
1645
    if (!taosArrayPush(ctxArray, &updateCtx)) {
×
1646
      TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
1647
    }
1648

1649
    if (COL_VAL_IS_VALUE(pColVal)) {
×
1650
      updateCtx.lflag = LFLAG_LAST;
×
1651
      if (!taosArrayPush(ctxArray, &updateCtx)) {
×
1652
        TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
1653
      }
1654
    } else {
1655
      if (!iColHash) {
×
1656
        iColHash = tSimpleHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
×
1657
        if (iColHash == NULL) {
×
1658
          TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit);
×
1659
        }
1660
      }
1661

1662
      if (tSimpleHashPut(iColHash, &iCol, sizeof(iCol), NULL, 0)) {
×
1663
        TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit);
×
1664
      }
1665
    }
1666
  }
1667

1668
  // 2. prepare by the other rows
1669
  for (int32_t iRow = nRow - 2; iRow >= 0; --iRow) {
×
1670
    if (tSimpleHashGetSize(iColHash) == 0) {
×
1671
      break;
×
1672
    }
1673

1674
    tRow.pTSRow = aRow[iRow];
×
1675

1676
    STsdbRowKey tsdbRowKey = {0};
×
1677
    tsdbRowGetKey(&tRow, &tsdbRowKey);
×
1678

1679
    void   *pIte = NULL;
×
1680
    int32_t iter = 0;
×
1681
    while ((pIte = tSimpleHashIterate(iColHash, pIte, &iter)) != NULL) {
×
1682
      int32_t iCol = ((int32_t *)pIte)[0];
×
1683
      SColVal colVal = COL_VAL_NONE(0, 0);
×
1684
      tsdbRowGetColVal(&tRow, pTSchema, iCol, &colVal);
×
1685

1686
      if (COL_VAL_IS_VALUE(&colVal)) {
×
1687
        SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST, .tsdbRowKey = tsdbRowKey, .colVal = colVal};
×
1688
        if (!taosArrayPush(ctxArray, &updateCtx)) {
×
1689
          TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
1690
        }
1691
        code = tSimpleHashIterateRemove(iColHash, &iCol, sizeof(iCol), &pIte, &iter);
×
1692
        if (code != TSDB_CODE_SUCCESS) {
×
1693
          tsdbTrace("vgId:%d, %s tSimpleHashIterateRemove failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__,
×
1694
                    __LINE__, tstrerror(code));
1695
        }
1696
      }
1697
    }
1698
  }
1699

1700
  TAOS_CHECK_GOTO(tsdbCacheUpdate(pTsdb, suid, uid, ctxArray), &lino, _exit);
×
1701

1702
_exit:
×
1703
  if (code) {
×
1704
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
×
1705
  }
1706

1707
  tsdbRowClose(&iter);
×
1708
  tSimpleHashCleanup(iColHash);
×
1709
  taosArrayClear(ctxArray);
×
1710

1711
  TAOS_RETURN(code);
×
1712
}
1713

1714
int32_t tsdbCacheColFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SBlockData *pBlockData) {
×
1715
  int32_t      code = 0, lino = 0;
×
1716
  STSDBRowIter iter = {0};
×
1717
  STSchema    *pTSchema = NULL;
×
1718
  SArray      *ctxArray = NULL;
×
1719

1720
  TSDBROW lRow = tsdbRowFromBlockData(pBlockData, pBlockData->nRow - 1);
×
1721
  int32_t sver = TSDBROW_SVERSION(&lRow);
×
1722

1723
  TAOS_CHECK_RETURN(metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pTSchema));
×
1724

1725
  ctxArray = taosArrayInit(pBlockData->nColData, sizeof(SLastUpdateCtx));
×
1726
  if (ctxArray == NULL) {
×
1727
    TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
1728
  }
1729

1730
  // 1. prepare last
1731
  STsdbRowKey tsdbRowKey = {0};
×
1732
  tsdbRowGetKey(&lRow, &tsdbRowKey);
×
1733

1734
  {
1735
    SValue tsVal = {.type = TSDB_DATA_TYPE_TIMESTAMP};
×
1736
    VALUE_SET_TRIVIAL_DATUM(&tsVal, lRow.pBlockData->aTSKEY[lRow.iRow]);
×
1737
    SLastUpdateCtx updateCtx = {
×
1738
        .lflag = LFLAG_LAST, .tsdbRowKey = tsdbRowKey, .colVal = COL_VAL_VALUE(PRIMARYKEY_TIMESTAMP_COL_ID, tsVal)};
1739
    if (!taosArrayPush(ctxArray, &updateCtx)) {
×
1740
      TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
1741
    }
1742
  }
1743

1744
  TSDBROW tRow = tsdbRowFromBlockData(pBlockData, 0);
×
1745

1746
  for (int32_t iColData = 0; iColData < pBlockData->nColData; ++iColData) {
×
1747
    SColData *pColData = &pBlockData->aColData[iColData];
×
1748
    if ((pColData->flag & HAS_VALUE) != HAS_VALUE) {
×
1749
      continue;
×
1750
    }
1751

1752
    for (tRow.iRow = pBlockData->nRow - 1; tRow.iRow >= 0; --tRow.iRow) {
×
1753
      STsdbRowKey tsdbRowKey = {0};
×
1754
      tsdbRowGetKey(&tRow, &tsdbRowKey);
×
1755

1756
      uint8_t colType = tColDataGetBitValue(pColData, tRow.iRow);
×
1757
      if (colType == 2) {
×
1758
        SColVal colVal = COL_VAL_NONE(pColData->cid, pColData->type);
×
1759
        TAOS_CHECK_GOTO(tColDataGetValue(pColData, tRow.iRow, &colVal), &lino, _exit);
×
1760

1761
        SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST, .tsdbRowKey = tsdbRowKey, .colVal = colVal};
×
1762
        if (!taosArrayPush(ctxArray, &updateCtx)) {
×
1763
          TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
1764
        }
1765
        break;
×
1766
      }
1767
    }
1768
  }
1769

1770
  // 2. prepare last row
1771
  TAOS_CHECK_GOTO(tsdbRowIterOpen(&iter, &lRow, pTSchema), &lino, _exit);
×
1772
  for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal; pColVal = tsdbRowIterNext(&iter)) {
×
1773
    SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST_ROW, .tsdbRowKey = tsdbRowKey, .colVal = *pColVal};
×
1774
    if (!taosArrayPush(ctxArray, &updateCtx)) {
×
1775
      TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
1776
    }
1777
  }
1778

1779
  TAOS_CHECK_GOTO(tsdbCacheUpdate(pTsdb, suid, uid, ctxArray), &lino, _exit);
×
1780

1781
_exit:
×
1782
  tsdbRowClose(&iter);
×
1783
  taosMemoryFreeClear(pTSchema);
×
1784
  taosArrayDestroy(ctxArray);
×
1785

1786
  TAOS_RETURN(code);
×
1787
}
1788

1789
static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols,
1790
                            int nCols, int16_t *slotIds);
1791

1792
static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols,
1793
                               int nCols, int16_t *slotIds);
1794

1795
static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SArray *remainCols,
17,929✔
1796
                                    SCacheRowsReader *pr, int8_t ltype) {
1797
  int32_t code = 0, lino = 0;
17,929✔
1798
  // rocksdb_writebatch_t *wb = NULL;
1799
  SArray *pTmpColArray = NULL;
17,929✔
1800
  bool    extraTS = false;
17,929✔
1801

1802
  SIdxKey *idxKey = taosArrayGet(remainCols, 0);
17,929✔
1803
  if (idxKey->key.cid != PRIMARYKEY_TIMESTAMP_COL_ID) {
17,929✔
1804
    // ignore 'ts' loaded from cache and load it from tsdb
1805
    // SLastCol *pLastCol = taosArrayGet(pLastArray, 0);
1806
    // tsdbCacheUpdateLastColToNone(pLastCol, TSDB_LAST_CACHE_NO_CACHE);
1807

1808
    SLastKey *key = &(SLastKey){.lflag = ltype, .uid = uid, .cid = PRIMARYKEY_TIMESTAMP_COL_ID};
2,060✔
1809
    if (!taosArrayInsert(remainCols, 0, &(SIdxKey){0, *key})) {
2,060✔
1810
      TAOS_RETURN(terrno);
×
1811
    }
1812

1813
    extraTS = true;
2,060✔
1814
  }
1815

1816
  int      num_keys = TARRAY_SIZE(remainCols);
17,929✔
1817
  int16_t *slotIds = taosMemoryMalloc(num_keys * sizeof(int16_t));
17,929✔
1818

1819
  int16_t *lastColIds = NULL, *lastSlotIds = NULL, *lastrowColIds = NULL, *lastrowSlotIds = NULL;
17,929✔
1820
  lastColIds = taosMemoryMalloc(num_keys * sizeof(int16_t));
17,929✔
1821
  lastSlotIds = taosMemoryMalloc(num_keys * sizeof(int16_t));
17,929✔
1822
  lastrowColIds = taosMemoryMalloc(num_keys * sizeof(int16_t));
17,929✔
1823
  lastrowSlotIds = taosMemoryMalloc(num_keys * sizeof(int16_t));
17,929✔
1824
  SArray *lastTmpColArray = NULL, *lastTmpIndexArray = NULL, *lastrowTmpColArray = NULL, *lastrowTmpIndexArray = NULL;
17,929✔
1825

1826
  int lastIndex = 0;
17,929✔
1827
  int lastrowIndex = 0;
17,929✔
1828

1829
  if (!slotIds || !lastColIds || !lastSlotIds || !lastrowColIds || !lastrowSlotIds) {
17,929✔
1830
    TAOS_CHECK_EXIT(terrno);
×
1831
  }
1832

1833
  for (int i = 0; i < num_keys; ++i) {
96,576✔
1834
    SIdxKey *idxKey = taosArrayGet(remainCols, i);
78,164✔
1835
    if (extraTS && !i) {
78,164✔
1836
      slotIds[i] = 0;
2,060✔
1837
    } else {
1838
      slotIds[i] = pr->pSlotIds[idxKey->idx];
76,104✔
1839
    }
1840

1841
    if (IS_LAST_KEY(idxKey->key)) {
78,647✔
1842
      if (NULL == lastTmpIndexArray) {
41,824✔
1843
        lastTmpIndexArray = taosArrayInit(num_keys, sizeof(int32_t));
10,030✔
1844
        if (!lastTmpIndexArray) {
10,030✔
1845
          TAOS_CHECK_EXIT(terrno);
×
1846
        }
1847
      }
1848
      if (!taosArrayPush(lastTmpIndexArray, &(i))) {
41,824✔
1849
        TAOS_CHECK_EXIT(terrno);
×
1850
      }
1851
      lastColIds[lastIndex] = idxKey->key.cid;
41,824✔
1852
      if (extraTS && !i) {
41,824✔
1853
        lastSlotIds[lastIndex] = 0;
2,060✔
1854
      } else {
1855
        lastSlotIds[lastIndex] = pr->pSlotIds[idxKey->idx];
39,764✔
1856
      }
1857
      lastIndex++;
41,824✔
1858
    } else {
1859
      if (NULL == lastrowTmpIndexArray) {
36,340✔
1860
        lastrowTmpIndexArray = taosArrayInit(num_keys, sizeof(int32_t));
7,416✔
1861
        if (!lastrowTmpIndexArray) {
7,899✔
1862
          TAOS_CHECK_EXIT(terrno);
×
1863
        }
1864
      }
1865
      if (!taosArrayPush(lastrowTmpIndexArray, &(i))) {
36,823✔
1866
        TAOS_CHECK_EXIT(terrno);
×
1867
      }
1868
      lastrowColIds[lastrowIndex] = idxKey->key.cid;
36,823✔
1869
      if (extraTS && !i) {
36,823✔
1870
        lastrowSlotIds[lastrowIndex] = 0;
×
1871
      } else {
1872
        lastrowSlotIds[lastrowIndex] = pr->pSlotIds[idxKey->idx];
36,823✔
1873
      }
1874
      lastrowIndex++;
36,823✔
1875
    }
1876
  }
1877

1878
  pTmpColArray = taosArrayInit(lastIndex + lastrowIndex, sizeof(SLastCol));
17,929✔
1879
  if (!pTmpColArray) {
17,929✔
1880
    TAOS_CHECK_EXIT(terrno);
×
1881
  }
1882

1883
  if (lastTmpIndexArray != NULL) {
17,929✔
1884
    TAOS_CHECK_EXIT(mergeLastCid(uid, pTsdb, &lastTmpColArray, pr, lastColIds, lastIndex, lastSlotIds));
10,030✔
1885
    for (int i = 0; i < taosArrayGetSize(lastTmpColArray); i++) {
48,032✔
1886
      if (!taosArrayInsert(pTmpColArray, *(int32_t *)taosArrayGet(lastTmpIndexArray, i),
38,002✔
1887
                           taosArrayGet(lastTmpColArray, i))) {
38,002✔
1888
        TAOS_CHECK_EXIT(terrno);
×
1889
      }
1890
    }
1891
  }
1892

1893
  if (lastrowTmpIndexArray != NULL) {
17,929✔
1894
    TAOS_CHECK_EXIT(mergeLastRowCid(uid, pTsdb, &lastrowTmpColArray, pr, lastrowColIds, lastrowIndex, lastrowSlotIds));
7,899✔
1895
    for (int i = 0; i < taosArrayGetSize(lastrowTmpColArray); i++) {
40,900✔
1896
      if (!taosArrayInsert(pTmpColArray, *(int32_t *)taosArrayGet(lastrowTmpIndexArray, i),
33,001✔
1897
                           taosArrayGet(lastrowTmpColArray, i))) {
33,001✔
1898
        TAOS_CHECK_EXIT(terrno);
×
1899
      }
1900
    }
1901
  }
1902

1903
  SLRUCache *pCache = pTsdb->lruCache;
17,929✔
1904
  for (int i = 0; i < num_keys; ++i) {
96,576✔
1905
    SIdxKey  *idxKey = taosArrayGet(remainCols, i);
78,647✔
1906
    SLastCol *pLastCol = NULL;
78,647✔
1907

1908
    if (pTmpColArray && TARRAY_SIZE(pTmpColArray) >= i + 1) {
78,647✔
1909
      pLastCol = taosArrayGet(pTmpColArray, i);
71,003✔
1910
    }
1911

1912
    // still null, then make up a none col value
1913
    SLastCol noneCol = {.rowKey.ts = TSKEY_MIN,
78,647✔
1914
                        .colVal = COL_VAL_NONE(idxKey->key.cid, pr->pSchema->columns[slotIds[i]].type),
78,647✔
1915
                        .cacheStatus = TSDB_LAST_CACHE_VALID};
1916
    if (!pLastCol) {
78,647✔
1917
      pLastCol = &noneCol;
7,644✔
1918
    }
1919

1920
    if (!extraTS || i > 0) {
78,647✔
1921
      tsdbDebug("vgId:%d, %s qid:%s uid:%" PRId64 " from tsdb, col_id:%d col_flag:%d ts:%" PRId64,
76,587✔
1922
                TD_VID(pTsdb->pVnode), __func__, pr && pr->idstr ? pr->idstr : "null", uid, pLastCol->colVal.cid,
1923
                pLastCol->colVal.flag, pLastCol->rowKey.ts);
1924
      taosArraySet(pLastArray, idxKey->idx, pLastCol);
76,587✔
1925
    }
1926

1927
    // taosArrayRemove(remainCols, i);
1928

1929
    if (/*!pTmpColArray*/ lastTmpIndexArray && !lastTmpColArray) {
78,164✔
UNCOV
1930
      continue;
×
1931
    }
1932
    if (/*!pTmpColArray*/ lastrowTmpIndexArray && !lastrowTmpColArray) {
78,164✔
1933
      continue;
×
1934
    }
1935

1936
    // store result back to rocks cache
1937
    code = tsdbCachePutToRocksdb(pTsdb, &idxKey->key, pLastCol);
78,164✔
1938
    if (code) {
78,647✔
UNCOV
1939
      tsdbError("vgId:%d, %s failed at line %d since %s.", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
×
UNCOV
1940
      TAOS_CHECK_EXIT(code);
×
1941
    }
1942

1943
    code = tsdbCachePutToLRU(pTsdb, &idxKey->key, pLastCol, 0);
78,647✔
1944
    if (code) {
78,647✔
UNCOV
1945
      tsdbError("vgId:%d, %s failed at line %d since %s.", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
×
UNCOV
1946
      TAOS_CHECK_EXIT(code);
×
1947
    }
1948

1949
    if (extraTS && i == 0) {
78,647✔
1950
      tsdbCacheFreeSLastColItem(pLastCol);
2,060✔
1951
    }
1952
  }
1953

1954
  rocksMayWrite(pTsdb, false);
17,929✔
1955

1956
_exit:
17,929✔
1957
  taosArrayDestroy(lastrowTmpIndexArray);
17,929✔
1958
  taosArrayDestroy(lastrowTmpColArray);
17,929✔
1959
  taosArrayDestroy(lastTmpIndexArray);
17,929✔
1960
  taosArrayDestroy(lastTmpColArray);
17,929✔
1961

1962
  taosMemoryFree(lastColIds);
17,929✔
1963
  taosMemoryFree(lastSlotIds);
17,929✔
1964
  taosMemoryFree(lastrowColIds);
17,929✔
1965
  taosMemoryFree(lastrowSlotIds);
17,929✔
1966

1967
  taosArrayDestroy(pTmpColArray);
17,929✔
1968

1969
  taosMemoryFree(slotIds);
17,929✔
1970

1971
  TAOS_RETURN(code);
17,929✔
1972
}
1973

1974
static int32_t tsdbCacheLoadFromRocks(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SArray *remainCols,
36,688✔
1975
                                      SArray *ignoreFromRocks, SCacheRowsReader *pr, int8_t ltype) {
1976
  int32_t code = 0, lino = 0;
36,688✔
1977
  int     num_keys = TARRAY_SIZE(remainCols);
36,688✔
1978
  char  **keys_list = taosMemoryMalloc(num_keys * sizeof(char *));
36,688✔
1979
  size_t *keys_list_sizes = taosMemoryMalloc(num_keys * sizeof(size_t));
36,688✔
1980
  char   *key_list = taosMemoryMalloc(num_keys * ROCKS_KEY_LEN);
36,688✔
1981
  if (!keys_list || !keys_list_sizes || !key_list) {
36,688✔
UNCOV
1982
    taosMemoryFree(keys_list);
×
UNCOV
1983
    taosMemoryFree(keys_list_sizes);
×
UNCOV
1984
    TAOS_RETURN(terrno);
×
1985
  }
1986
  char  **values_list = NULL;
36,688✔
1987
  size_t *values_list_sizes = NULL;
36,688✔
1988
  for (int i = 0; i < num_keys; ++i) {
149,350✔
1989
    memcpy(key_list + i * ROCKS_KEY_LEN, &((SIdxKey *)taosArrayGet(remainCols, i))->key, ROCKS_KEY_LEN);
112,662✔
1990
    keys_list[i] = key_list + i * ROCKS_KEY_LEN;
112,662✔
1991
    keys_list_sizes[i] = ROCKS_KEY_LEN;
112,179✔
1992
  }
1993

1994
  rocksMayWrite(pTsdb, true);  // flush writebatch cache
36,688✔
1995

1996
  code = tsdbCacheGetValuesFromRocks(pTsdb, num_keys, (const char *const *)keys_list, keys_list_sizes, &values_list,
36,688✔
1997
                                     &values_list_sizes);
1998
  if (code) {
36,688✔
UNCOV
1999
    taosMemoryFree(key_list);
×
UNCOV
2000
    taosMemoryFree(keys_list);
×
UNCOV
2001
    taosMemoryFree(keys_list_sizes);
×
2002
    TAOS_RETURN(code);
×
2003
  }
2004

2005
  SLRUCache *pCache = pTsdb->lruCache;
36,688✔
2006
  for (int i = 0, j = 0; i < num_keys && j < TARRAY_SIZE(remainCols); ++i) {
148,384✔
2007
    SLastCol *pLastCol = NULL;
111,696✔
2008
    bool      ignore = ((bool *)TARRAY_DATA(ignoreFromRocks))[i];
111,696✔
2009
    if (ignore) {
112,179✔
2010
      ++j;
199✔
2011
      continue;
199✔
2012
    }
2013

2014
    if (values_list[i] != NULL) {
111,980✔
2015
      code = tsdbCacheDeserialize(values_list[i], values_list_sizes[i], &pLastCol);
36,075✔
2016
      if (code != TSDB_CODE_SUCCESS) {
36,075✔
UNCOV
2017
        tsdbError("vgId:%d, %s deserialize failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
2018
                  tstrerror(code));
UNCOV
2019
        goto _exit;
×
2020
      }
2021
    }
2022
    SLastCol *pToFree = pLastCol;
111,980✔
2023
    SIdxKey  *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[j];
111,980✔
2024
    if (pLastCol && pLastCol->cacheStatus != TSDB_LAST_CACHE_NO_CACHE) {
147,572✔
2025
      code = tsdbCachePutToLRU(pTsdb, &idxKey->key, pLastCol, 0);
36,075✔
2026
      if (code) {
36,075✔
UNCOV
2027
        tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
×
UNCOV
2028
        taosMemoryFreeClear(pToFree);
×
UNCOV
2029
        TAOS_CHECK_EXIT(code);
×
2030
      }
2031

2032
      SLastCol lastCol = *pLastCol;
36,075✔
2033
      code = tsdbCacheReallocSLastCol(&lastCol, NULL);
36,075✔
2034
      if (TSDB_CODE_SUCCESS != code) {
36,075✔
UNCOV
2035
        taosMemoryFreeClear(pToFree);
×
UNCOV
2036
        TAOS_CHECK_EXIT(code);
×
2037
      }
2038

2039
      tsdbDebug("vgId:%d, %s qid:%s uid:%" PRId64 " from rocksdb, col_id:%d col_flag:%d ts:%" PRId64,
36,075✔
2040
                TD_VID(pTsdb->pVnode), __func__, pr && pr->idstr ? pr->idstr : "null", uid, lastCol.colVal.cid,
2041
                lastCol.colVal.flag, lastCol.rowKey.ts);
2042

2043
      taosArraySet(pLastArray, idxKey->idx, &lastCol);
36,075✔
2044
      taosArrayRemove(remainCols, j);
36,075✔
2045
      taosArrayRemove(ignoreFromRocks, j);
36,075✔
2046
    } else {
2047
      ++j;
75,422✔
2048
    }
2049

2050
    taosMemoryFreeClear(pToFree);
111,497✔
2051
  }
2052

2053
  if (TARRAY_SIZE(remainCols) > 0) {
37,171✔
2054
    // tsdbTrace("tsdb/cache: vgId: %d, load %" PRId64 " from raw", TD_VID(pTsdb->pVnode), uid);
2055
    code = tsdbCacheLoadFromRaw(pTsdb, uid, pLastArray, remainCols, pr, ltype);
17,929✔
2056
  }
2057

2058
_exit:
36,688✔
2059
  taosMemoryFree(key_list);
36,688✔
2060
  taosMemoryFree(keys_list);
36,688✔
2061
  taosMemoryFree(keys_list_sizes);
36,688✔
2062
  if (values_list) {
36,688✔
2063
#ifdef USE_ROCKSDB
2064
    for (int i = 0; i < num_keys; ++i) {
149,350✔
2065
      rocksdb_free(values_list[i]);
112,662✔
2066
    }
2067
#endif
2068
    taosMemoryFree(values_list);
36,688✔
2069
  }
2070
  taosMemoryFree(values_list_sizes);
36,688✔
2071

2072
  TAOS_RETURN(code);
36,688✔
2073
}
2074

2075
static int32_t tsdbCacheGetBatchFromLru(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr,
884,212✔
2076
                                        int8_t ltype, SArray *keyArray) {
2077
  int32_t    code = 0, lino = 0;
884,212✔
2078
  SArray    *remainCols = NULL;
884,212✔
2079
  SArray    *ignoreFromRocks = NULL;
884,212✔
2080
  SLRUCache *pCache = pTsdb->lruCache;
884,212✔
2081
  SArray    *pCidList = pr->pCidList;
884,695✔
2082
  int        numKeys = TARRAY_SIZE(pCidList);
884,695✔
2083

2084
  for (int i = 0; i < numKeys; ++i) {
3,241,525✔
2085
    int16_t cid = ((int16_t *)TARRAY_DATA(pCidList))[i];
2,356,830✔
2086

2087
    SLastKey key = {.lflag = ltype, .uid = uid, .cid = cid};
2,356,830✔
2088
    // for select last_row, last case
2089
    int32_t funcType = FUNCTION_TYPE_CACHE_LAST;
2,356,830✔
2090
    if (pr->pFuncTypeList != NULL && taosArrayGetSize(pr->pFuncTypeList) > i) {
2,356,830✔
UNCOV
2091
      funcType = ((int32_t *)TARRAY_DATA(pr->pFuncTypeList))[i];
×
2092
    }
2093
    if (((pr->type & CACHESCAN_RETRIEVE_LAST) == CACHESCAN_RETRIEVE_LAST) && FUNCTION_TYPE_CACHE_LAST_ROW == funcType) {
2,356,830✔
UNCOV
2094
      int8_t tempType = CACHESCAN_RETRIEVE_LAST_ROW | (pr->type ^ CACHESCAN_RETRIEVE_LAST);
×
UNCOV
2095
      key.lflag = (tempType & CACHESCAN_RETRIEVE_LAST) >> 3;
×
2096
    }
2097

2098
    if (!taosArrayPush(keyArray, &key)) {
2,356,830✔
UNCOV
2099
      TAOS_CHECK_EXIT(terrno);
×
2100
    }
2101

2102
    LRUHandle *h = taosLRUCacheLookup(pCache, &key, ROCKS_KEY_LEN);
2,356,830✔
2103
    SLastCol  *pLastCol = h ? (SLastCol *)taosLRUCacheValue(pCache, h) : NULL;
2,356,830✔
2104
    if (h && pLastCol->cacheStatus != TSDB_LAST_CACHE_NO_CACHE) {
4,600,998✔
2105
      SLastCol lastCol = *pLastCol;
2,244,168✔
2106
      if (TSDB_CODE_SUCCESS != (code = tsdbCacheReallocSLastCol(&lastCol, NULL))) {
2,244,168✔
UNCOV
2107
        tsdbLRUCacheRelease(pCache, h, false);
×
UNCOV
2108
        TAOS_CHECK_GOTO(code, NULL, _exit);
×
2109
      }
2110

2111
      tsdbDebug("vgId:%d, %s qid:%s uid:%" PRId64 " from lru, col_id:%d col_flag:%d ts:%" PRId64, TD_VID(pTsdb->pVnode),
2,243,699✔
2112
                __func__, pr && pr->idstr ? pr->idstr : "null", uid, lastCol.colVal.cid, lastCol.colVal.flag,
2113
                lastCol.rowKey.ts);
2114

2115
      if (taosArrayPush(pLastArray, &lastCol) == NULL) {
2,244,168✔
UNCOV
2116
        code = terrno;
×
UNCOV
2117
        tsdbLRUCacheRelease(pCache, h, false);
×
2118
        goto _exit;
×
2119
      }
2120
    } else {
2121
      // no cache or cache is invalid
2122
      SLastCol noneCol = {.rowKey.ts = TSKEY_MIN,
112,662✔
2123
                          .colVal = COL_VAL_NONE(cid, pr->pSchema->columns[pr->pSlotIds[i]].type)};
112,662✔
2124

2125
      if (taosArrayPush(pLastArray, &noneCol) == NULL) {
112,662✔
UNCOV
2126
        code = terrno;
×
2127
        tsdbLRUCacheRelease(pCache, h, false);
×
2128
        goto _exit;
×
2129
      }
2130

2131
      if (!remainCols) {
112,662✔
2132
        if ((remainCols = taosArrayInit(numKeys, sizeof(SIdxKey))) == NULL) {
36,688✔
UNCOV
2133
          code = terrno;
×
UNCOV
2134
          tsdbLRUCacheRelease(pCache, h, false);
×
UNCOV
2135
          goto _exit;
×
2136
        }
2137
      }
2138
      if (!ignoreFromRocks) {
112,662✔
2139
        if ((ignoreFromRocks = taosArrayInit(numKeys, sizeof(bool))) == NULL) {
36,688✔
UNCOV
2140
          code = terrno;
×
UNCOV
2141
          tsdbLRUCacheRelease(pCache, h, false);
×
UNCOV
2142
          goto _exit;
×
2143
        }
2144
      }
2145
      if (taosArrayPush(remainCols, &(SIdxKey){i, key}) == NULL) {
225,324✔
2146
        code = terrno;
×
UNCOV
2147
        tsdbLRUCacheRelease(pCache, h, false);
×
UNCOV
2148
        goto _exit;
×
2149
      }
2150
      bool ignoreRocks = pLastCol ? (pLastCol->cacheStatus == TSDB_LAST_CACHE_NO_CACHE) : false;
112,662✔
2151
      if (taosArrayPush(ignoreFromRocks, &ignoreRocks) == NULL) {
112,662✔
2152
        code = terrno;
×
2153
        tsdbLRUCacheRelease(pCache, h, false);
×
UNCOV
2154
        goto _exit;
×
2155
      }
2156
    }
2157

2158
    if (h) {
2,356,830✔
2159
      tsdbLRUCacheRelease(pCache, h, false);
2,244,367✔
2160
    }
2161
  }
2162

2163
  if (remainCols && TARRAY_SIZE(remainCols) > 0) {
884,695✔
2164
    (void)taosThreadMutexLock(&pTsdb->lruMutex);
36,688✔
2165

2166
    for (int i = 0; i < TARRAY_SIZE(remainCols);) {
149,350✔
2167
      SIdxKey   *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[i];
112,662✔
2168
      LRUHandle *h = taosLRUCacheLookup(pCache, &idxKey->key, ROCKS_KEY_LEN);
112,662✔
2169
      SLastCol  *pLastCol = h ? (SLastCol *)taosLRUCacheValue(pCache, h) : NULL;
112,662✔
2170
      if (h && pLastCol->cacheStatus != TSDB_LAST_CACHE_NO_CACHE) {
112,662✔
UNCOV
2171
        SLastCol lastCol = *pLastCol;
×
UNCOV
2172
        code = tsdbCacheReallocSLastCol(&lastCol, NULL);
×
UNCOV
2173
        if (code) {
×
UNCOV
2174
          tsdbLRUCacheRelease(pCache, h, false);
×
UNCOV
2175
          (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
×
UNCOV
2176
          TAOS_RETURN(code);
×
2177
        }
2178

NEW
2179
        tsdbDebug("vgId:%d, %s qid:%s uid:%" PRId64 " from lru(2nd lookup), col_id:%d col_flag:%d ts:%" PRId64,
×
2180
                  TD_VID(pTsdb->pVnode), __func__, pr && pr->idstr ? pr->idstr : "null", uid, lastCol.colVal.cid,
2181
                  lastCol.colVal.flag, lastCol.rowKey.ts);
2182

UNCOV
2183
        taosArraySet(pLastArray, idxKey->idx, &lastCol);
×
2184

UNCOV
2185
        taosArrayRemove(remainCols, i);
×
2186
        taosArrayRemove(ignoreFromRocks, i);
×
2187
      } else {
2188
        // no cache or cache is invalid
2189
        ++i;
112,662✔
2190
      }
2191
      if (h) {
112,662✔
2192
        tsdbLRUCacheRelease(pCache, h, false);
199✔
2193
      }
2194
    }
2195

2196
    // tsdbTrace("tsdb/cache: vgId: %d, load %" PRId64 " from rocks", TD_VID(pTsdb->pVnode), uid);
2197
    code = tsdbCacheLoadFromRocks(pTsdb, uid, pLastArray, remainCols, ignoreFromRocks, pr, ltype);
36,688✔
2198

2199
    (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
36,688✔
2200
  }
2201

2202
_exit:
848,007✔
2203
  if (remainCols) {
884,695✔
2204
    taosArrayDestroy(remainCols);
36,688✔
2205
  }
2206
  if (ignoreFromRocks) {
884,695✔
2207
    taosArrayDestroy(ignoreFromRocks);
36,688✔
2208
  }
2209

2210
  TAOS_RETURN(code);
884,695✔
2211
}
2212

2213
typedef enum SMEMNEXTROWSTATES {
2214
  SMEMNEXTROW_ENTER,
2215
  SMEMNEXTROW_NEXT,
2216
} SMEMNEXTROWSTATES;
2217

2218
typedef struct SMemNextRowIter {
2219
  SMEMNEXTROWSTATES state;
2220
  STbData          *pMem;  // [input]
2221
  STbDataIter       iter;  // mem buffer skip list iterator
2222
  int64_t           lastTs;
2223
} SMemNextRowIter;
2224

2225
static int32_t getNextRowFromMem(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast, int16_t *aCols,
31,458,037✔
2226
                                 int nCols) {
2227
  SMemNextRowIter *state = (SMemNextRowIter *)iter;
31,458,037✔
2228
  int32_t          code = 0;
31,458,037✔
2229
  *pIgnoreEarlierTs = false;
31,458,037✔
2230
  switch (state->state) {
31,601,825✔
2231
    case SMEMNEXTROW_ENTER: {
645,429✔
2232
      if (state->pMem != NULL) {
645,429✔
2233
        /*
2234
        if (state->pMem->maxKey <= state->lastTs) {
2235
          *ppRow = NULL;
2236
          *pIgnoreEarlierTs = true;
2237

2238
          TAOS_RETURN(code);
2239
        }
2240
        */
2241
        tsdbTbDataIterOpen(state->pMem, NULL, 1, &state->iter);
645,912✔
2242

2243
        TSDBROW *pMemRow = tsdbTbDataIterGet(&state->iter);
645,635✔
2244
        if (pMemRow) {
646,118✔
2245
          *ppRow = pMemRow;
646,118✔
2246
          state->state = SMEMNEXTROW_NEXT;
646,118✔
2247

2248
          TAOS_RETURN(code);
646,118✔
2249
        }
2250
      }
2251

UNCOV
2252
      *ppRow = NULL;
×
2253

UNCOV
2254
      TAOS_RETURN(code);
×
2255
    }
2256
    case SMEMNEXTROW_NEXT:
30,998,555✔
2257
      if (tsdbTbDataIterNext(&state->iter)) {
30,998,555✔
2258
        *ppRow = tsdbTbDataIterGet(&state->iter);
61,960,314✔
2259

2260
        TAOS_RETURN(code);
30,987,058✔
2261
      } else {
2262
        *ppRow = NULL;
66,293✔
2263

2264
        TAOS_RETURN(code);
66,293✔
2265
      }
UNCOV
2266
    default:
×
2267
      break;
×
2268
  }
2269

UNCOV
2270
_err:
×
UNCOV
2271
  *ppRow = NULL;
×
2272

UNCOV
2273
  TAOS_RETURN(code);
×
2274
}
2275

2276
typedef int32_t (*_next_row_fn_t)(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast, int16_t *aCols,
2277
                                  int nCols);
2278
typedef int32_t (*_next_row_clear_fn_t)(void *iter);
2279

2280
typedef struct {
2281
  TSDBROW             *pRow;
2282
  bool                 stop;
2283
  bool                 next;
2284
  bool                 ignoreEarlierTs;
2285
  void                *iter;
2286
  _next_row_fn_t       nextRowFn;
2287
  _next_row_clear_fn_t nextRowClearFn;
2288
} TsdbNextRowState;
2289

2290
typedef struct {
2291
  SArray           *pMemDelData;
2292
  SArray           *pSkyline;
2293
  int64_t           iSkyline;
2294
  SBlockIdx         idx;
2295
  SMemNextRowIter   memState;
2296
  SMemNextRowIter   imemState;
2297
  TSDBROW           memRow, imemRow;
2298
  TsdbNextRowState  input[2];
2299
  SCacheRowsReader *pr;
2300
  STsdb            *pTsdb;
2301
} MemNextRowIter;
2302

2303
static int32_t memRowIterOpen(MemNextRowIter *pIter, tb_uid_t uid, STsdb *pTsdb, STSchema *pTSchema, tb_uid_t suid,
884,695✔
2304
                              STsdbReadSnap *pReadSnap, SCacheRowsReader *pr) {
2305
  int32_t code = 0, lino = 0;
884,695✔
2306

2307
  STbData *pMem = NULL;
884,695✔
2308
  if (pReadSnap->pMem) {
884,695✔
2309
    pMem = tsdbGetTbDataFromMemTable(pReadSnap->pMem, suid, uid);
884,695✔
2310
  }
2311

2312
  STbData *pIMem = NULL;
884,695✔
2313
  if (pReadSnap->pIMem) {
884,695✔
2314
    pIMem = tsdbGetTbDataFromMemTable(pReadSnap->pIMem, suid, uid);
13,253✔
2315
  }
2316

2317
  pIter->pTsdb = pTsdb;
884,695✔
2318

2319
  pIter->pMemDelData = NULL;
884,695✔
2320

2321
  TAOS_CHECK_GOTO(loadMemTombData(&pIter->pMemDelData, pMem, pIMem, pr->info.verRange.maxVer), &lino, _exit);
884,695✔
2322

2323
  pIter->idx = (SBlockIdx){.suid = suid, .uid = uid};
884,695✔
2324

2325
  pIter->input[0] = (TsdbNextRowState){&pIter->memRow, true, false, false, &pIter->memState, getNextRowFromMem, NULL};
884,695✔
2326
  pIter->input[1] = (TsdbNextRowState){&pIter->imemRow, true, false, false, &pIter->imemState, getNextRowFromMem, NULL};
884,489✔
2327

2328
  if (pMem) {
884,695✔
2329
    pIter->memState.pMem = pMem;
618,744✔
2330
    pIter->memState.state = SMEMNEXTROW_ENTER;
618,744✔
2331
    pIter->input[0].stop = false;
618,744✔
2332
    pIter->input[0].next = true;
618,744✔
2333
  }
2334

2335
  if (pIMem) {
884,695✔
2336
    pIter->imemState.pMem = pIMem;
13,253✔
2337
    pIter->imemState.state = SMEMNEXTROW_ENTER;
13,253✔
2338
    pIter->input[1].stop = false;
13,253✔
2339
    pIter->input[1].next = true;
13,253✔
2340
  }
2341

2342
  pIter->pr = pr;
884,695✔
2343

2344
_exit:
884,695✔
2345
  if (code) {
884,695✔
UNCOV
2346
    tsdbError("tsdb/cache: %s failed at line %d since %s.", __func__, lino, tstrerror(code));
×
2347
  }
2348

2349
  TAOS_RETURN(code);
884,695✔
2350
}
2351

2352
static void memRowIterClose(MemNextRowIter *pIter) {
884,695✔
2353
  for (int i = 0; i < 2; ++i) {
2,654,085✔
2354
    if (pIter->input[i].nextRowClearFn) {
1,769,390✔
UNCOV
2355
      (void)pIter->input[i].nextRowClearFn(pIter->input[i].iter);
×
2356
    }
2357
  }
2358

2359
  if (pIter->pSkyline) {
884,695✔
2360
    taosArrayDestroy(pIter->pSkyline);
618,744✔
2361
  }
2362

2363
  if (pIter->pMemDelData) {
884,489✔
2364
    taosArrayDestroy(pIter->pMemDelData);
884,489✔
2365
  }
2366
}
884,489✔
2367

2368
static void freeTableInfoFunc(void *param) {
620,648✔
2369
  void **p = (void **)param;
620,648✔
2370
  taosMemoryFreeClear(*p);
620,648✔
2371
}
620,648✔
2372

2373
static STableLoadInfo *getTableLoadInfo(SCacheRowsReader *pReader, uint64_t uid) {
650,424✔
2374
  if (!pReader->pTableMap) {
650,424✔
2375
    pReader->pTableMap = tSimpleHashInit(pReader->numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
403,370✔
2376
    if (!pReader->pTableMap) {
403,164✔
UNCOV
2377
      return NULL;
×
2378
    }
2379

2380
    tSimpleHashSetFreeFp(pReader->pTableMap, freeTableInfoFunc);
403,164✔
2381
  }
2382

2383
  STableLoadInfo  *pInfo = NULL;
650,218✔
2384
  STableLoadInfo **ppInfo = tSimpleHashGet(pReader->pTableMap, &uid, sizeof(uid));
650,218✔
2385
  if (!ppInfo) {
650,424✔
2386
    pInfo = taosMemoryCalloc(1, sizeof(STableLoadInfo));
620,648✔
2387
    if (pInfo) {
620,442✔
2388
      if (tSimpleHashPut(pReader->pTableMap, &uid, sizeof(uint64_t), &pInfo, POINTER_BYTES)) {
620,442✔
UNCOV
2389
        return NULL;
×
2390
      }
2391
    }
2392

2393
    return pInfo;
620,648✔
2394
  }
2395

2396
  return *ppInfo;
29,776✔
2397
}
2398

2399
static TSDBROW *memRowIterGet(MemNextRowIter *pIter, bool isLast, int16_t *aCols, int nCols) {
31,703,630✔
2400
  int32_t code = 0, lino = 0;
31,703,630✔
2401

2402
  for (;;) {
43,568✔
2403
    for (int i = 0; i < 2; ++i) {
95,579,228✔
2404
      if (pIter->input[i].next && !pIter->input[i].stop) {
63,598,632✔
2405
        TAOS_CHECK_GOTO(pIter->input[i].nextRowFn(pIter->input[i].iter, &pIter->input[i].pRow,
31,682,713✔
2406
                                                  &pIter->input[i].ignoreEarlierTs, isLast, aCols, nCols),
2407
                        &lino, _exit);
2408

2409
        if (pIter->input[i].pRow == NULL) {
31,661,154✔
2410
          pIter->input[i].stop = true;
62,642✔
2411
          pIter->input[i].next = false;
62,642✔
2412
        }
2413
      }
2414
    }
2415

2416
    if (pIter->input[0].stop && pIter->input[1].stop) {
31,980,596✔
2417
      return NULL;
328,593✔
2418
    }
2419

2420
    TSDBROW *max[2] = {0};
31,582,375✔
2421
    int      iMax[2] = {-1, -1};
31,422,242✔
2422
    int      nMax = 0;
31,586,012✔
2423
    SRowKey  maxKey = {.ts = TSKEY_MIN};
31,586,012✔
2424

2425
    for (int i = 0; i < 2; ++i) {
94,735,241✔
2426
      if (!pIter->input[i].stop && pIter->input[i].pRow != NULL) {
63,132,131✔
2427
        STsdbRowKey tsdbRowKey = {0};
31,643,143✔
2428
        tsdbRowGetKey(pIter->input[i].pRow, &tsdbRowKey);
31,637,652✔
2429

2430
        // merging & deduplicating on client side
2431
        int c = tRowKeyCompare(&maxKey, &tsdbRowKey.key);
31,588,624✔
2432
        if (c <= 0) {
31,577,706✔
2433
          if (c < 0) {
31,586,701✔
2434
            nMax = 0;
31,592,469✔
2435
            maxKey = tsdbRowKey.key;
31,592,469✔
2436
          }
2437

2438
          iMax[nMax] = i;
31,586,701✔
2439
          max[nMax++] = pIter->input[i].pRow;
31,612,039✔
2440
        }
2441
        pIter->input[i].next = false;
31,595,557✔
2442
      }
2443
    }
2444

2445
    TSDBROW *merge[2] = {0};
31,603,110✔
2446
    int      iMerge[2] = {-1, -1};
31,606,200✔
2447
    int      nMerge = 0;
31,554,153✔
2448
    for (int i = 0; i < nMax; ++i) {
63,146,359✔
2449
      TSDBKEY maxKey1 = TSDBROW_KEY(max[i]);
31,543,441✔
2450

2451
      if (!pIter->pSkyline) {
31,624,399✔
2452
        pIter->pSkyline = taosArrayInit(32, sizeof(TSDBKEY));
618,744✔
2453
        TSDB_CHECK_NULL(pIter->pSkyline, code, lino, _exit, terrno);
618,744✔
2454

2455
        uint64_t        uid = pIter->idx.uid;
618,538✔
2456
        STableLoadInfo *pInfo = getTableLoadInfo(pIter->pr, uid);
618,744✔
2457
        TSDB_CHECK_NULL(pInfo, code, lino, _exit, TSDB_CODE_OUT_OF_MEMORY);
618,538✔
2458

2459
        if (pInfo->pTombData == NULL) {
618,538✔
2460
          pInfo->pTombData = taosArrayInit(4, sizeof(SDelData));
604,687✔
2461
          TSDB_CHECK_NULL(pInfo->pTombData, code, lino, _exit, terrno);
604,687✔
2462
        }
2463

2464
        if (!taosArrayAddAll(pInfo->pTombData, pIter->pMemDelData)) {
618,538✔
UNCOV
2465
          TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
2466
        }
2467

2468
        size_t delSize = TARRAY_SIZE(pInfo->pTombData);
618,744✔
2469
        if (delSize > 0) {
618,744✔
2470
          code = tsdbBuildDeleteSkyline(pInfo->pTombData, 0, (int32_t)(delSize - 1), pIter->pSkyline);
11,613✔
2471
          TAOS_CHECK_GOTO(code, &lino, _exit);
11,613✔
2472
        }
2473
        pIter->iSkyline = taosArrayGetSize(pIter->pSkyline) - 1;
618,744✔
2474
      }
2475

2476
      bool deleted = tsdbKeyDeleted(&maxKey1, pIter->pSkyline, &pIter->iSkyline);
31,623,369✔
2477
      if (!deleted) {
31,524,077✔
2478
        iMerge[nMerge] = iMax[i];
31,519,443✔
2479
        merge[nMerge++] = max[i];
31,307,469✔
2480
      }
2481

2482
      pIter->input[iMax[i]].next = deleted;
31,593,236✔
2483
    }
2484

2485
    if (nMerge > 0) {
31,602,918✔
2486
      pIter->input[iMerge[0]].next = true;
31,602,198✔
2487

2488
      return merge[0];
31,613,116✔
2489
    }
2490
  }
2491

UNCOV
2492
_exit:
×
UNCOV
2493
  if (code) {
×
UNCOV
2494
    tsdbError("tsdb/cache: %s failed at line %d since %s.", __func__, lino, tstrerror(code));
×
2495
  }
2496

UNCOV
2497
  return NULL;
×
2498
}
2499

2500
static int32_t cloneTSchema(STSchema *pSrc, STSchema **ppDst) {
382,102✔
2501
  int32_t len = sizeof(STSchema) + sizeof(STColumn) * pSrc->numOfCols;
382,102✔
2502
  *ppDst = taosMemoryMalloc(len);
382,102✔
2503
  if (NULL == *ppDst) {
382,102✔
UNCOV
2504
    TAOS_RETURN(terrno);
×
2505
  }
2506
  memcpy(*ppDst, pSrc, len);
382,102✔
2507

2508
  TAOS_RETURN(TSDB_CODE_SUCCESS);
382,102✔
2509
}
2510

2511
static int32_t updateTSchema(int32_t sversion, SCacheRowsReader *pReader, uint64_t uid) {
31,418,551✔
2512
  if (NULL == pReader->pCurrSchema && sversion == pReader->pSchema->version) {
31,418,551✔
2513
    TAOS_RETURN(cloneTSchema(pReader->pSchema, &pReader->pCurrSchema));
382,102✔
2514
  }
2515

2516
  if (NULL != pReader->pCurrSchema && sversion == pReader->pCurrSchema->version) {
31,086,301✔
2517
    TAOS_RETURN(TSDB_CODE_SUCCESS);
31,182,773✔
2518
  }
2519

2520
  taosMemoryFreeClear(pReader->pCurrSchema);
1,172✔
2521
  TAOS_RETURN(
20,330✔
2522
      metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->info.suid, uid, sversion, &pReader->pCurrSchema));
2523
}
2524

2525
static int32_t tsdbCacheGetBatchFromMem(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr,
884,489✔
2526
                                        SArray *keyArray) {
2527
  int32_t        code = 0;
884,489✔
2528
  int32_t        lino = 0;
884,489✔
2529
  STSchema      *pTSchema = pr->pSchema;
884,489✔
2530
  SLRUCache     *pCache = pTsdb->lruCache;
884,695✔
2531
  SArray        *pCidList = pr->pCidList;
884,695✔
2532
  int            numKeys = TARRAY_SIZE(pCidList);
884,695✔
2533
  MemNextRowIter iter = {0};
884,695✔
2534
  SSHashObj     *iColHash = NULL;
884,695✔
2535
  STSDBRowIter   rowIter = {0};
884,695✔
2536

2537
  // 1, get from mem, imem filtered with delete info
2538
  TAOS_CHECK_EXIT(memRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->info.suid, pr->pReadSnap, pr));
884,695✔
2539

2540
  TSDBROW *pRow = memRowIterGet(&iter, false, NULL, 0);
884,489✔
2541
  if (!pRow) {
884,226✔
2542
    goto _exit;
265,951✔
2543
  }
2544

2545
  int32_t sversion = TSDBROW_SVERSION(pRow);
618,275✔
2546
  if (sversion != -1) {
618,538✔
2547
    TAOS_CHECK_EXIT(updateTSchema(sversion, pr, uid));
618,744✔
2548

2549
    pTSchema = pr->pCurrSchema;
618,744✔
2550
  }
2551
  int32_t nCol = pTSchema->numOfCols;
618,538✔
2552

2553
  STsdbRowKey rowKey = {0};
618,744✔
2554
  tsdbRowGetKey(pRow, &rowKey);
618,744✔
2555

2556
  TAOS_CHECK_EXIT(tsdbRowIterOpen(&rowIter, pRow, pTSchema));
618,744✔
2557

2558
  int32_t iCol = 0, jCol = 0, jnCol = TARRAY_SIZE(pLastArray);
618,744✔
2559
  for (SColVal *pColVal = tsdbRowIterNext(&rowIter); pColVal && iCol < nCol && jCol < jnCol;) {
3,579,095✔
2560
    SLastCol *pTargetCol = &((SLastCol *)TARRAY_DATA(pLastArray))[jCol];
2,960,351✔
2561
    if (pColVal->cid < pTargetCol->colVal.cid) {
2,960,351✔
2562
      pColVal = tsdbRowIterNext(&rowIter), ++iCol;
1,324,405✔
2563

2564
      continue;
1,324,199✔
2565
    }
2566
    if (pColVal->cid > pTargetCol->colVal.cid) {
1,635,946✔
UNCOV
2567
      break;
×
2568
    }
2569

2570
    int32_t cmp_res = tRowKeyCompare(&pTargetCol->rowKey, &rowKey.key);
1,635,946✔
2571
    if (!IS_LAST_KEY(((SLastKey *)TARRAY_DATA(keyArray))[jCol])) {
1,635,740✔
2572
      if (cmp_res < 0 || (cmp_res == 0 && !COL_VAL_IS_NONE(pColVal))) {
559,609✔
2573
        SLastCol lastCol = {.rowKey = rowKey.key, .colVal = *pColVal, .dirty = 1, .cacheStatus = TSDB_LAST_CACHE_VALID};
559,609✔
2574
        TAOS_CHECK_EXIT(tsdbCacheReallocSLastCol(&lastCol, NULL));
559,609✔
2575

2576
        tsdbDebug("vgId:%d, %s qid:%s uid:%" PRId64 " from memtable, col_id:%d col_flag:%d ts:%" PRId64,
559,609✔
2577
                  TD_VID(pTsdb->pVnode), __func__, pr && pr->idstr ? pr->idstr : "null", uid, lastCol.colVal.cid,
2578
                  lastCol.colVal.flag, rowKey.key.ts);
2579

2580
        tsdbCacheFreeSLastColItem(pTargetCol);
559,609✔
2581
        taosArraySet(pLastArray, jCol, &lastCol);
559,609✔
2582
      }
2583
    } else {
2584
      if (COL_VAL_IS_VALUE(pColVal)) {
1,076,337✔
2585
        if (cmp_res <= 0) {
880,691✔
2586
          SLastCol lastCol = {
880,691✔
2587
              .rowKey = rowKey.key, .colVal = *pColVal, .dirty = 1, .cacheStatus = TSDB_LAST_CACHE_VALID};
2588
          TAOS_CHECK_EXIT(tsdbCacheReallocSLastCol(&lastCol, NULL));
880,485✔
2589

2590
          tsdbDebug("vgId:%d, %s qid:%s uid:%" PRId64
880,691✔
2591
                    " from memtable(last) and memtable(newer), col_id:%d col_flag:%d ts:%" PRId64,
2592
                    TD_VID(pTsdb->pVnode), __func__, pr && pr->idstr ? pr->idstr : "null", uid, lastCol.colVal.cid,
2593
                    lastCol.colVal.flag, rowKey.key.ts);
2594

2595
          tsdbCacheFreeSLastColItem(pTargetCol);
880,691✔
2596
          taosArraySet(pLastArray, jCol, &lastCol);
880,691✔
2597
        }
2598
      } else {
2599
        if (!iColHash) {
195,646✔
2600
          iColHash = tSimpleHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT));
103,438✔
2601
          if (iColHash == NULL) {
103,438✔
UNCOV
2602
            TAOS_CHECK_EXIT(terrno);
×
2603
          }
2604
        }
2605

2606
        if (tSimpleHashPut(iColHash, &pColVal->cid, sizeof(pColVal->cid), &jCol, sizeof(jCol))) {
195,646✔
UNCOV
2607
          TAOS_CHECK_EXIT(terrno);
×
2608
        }
2609
      }
2610
    }
2611

2612
    ++jCol;
1,635,946✔
2613

2614
    if (jCol < jnCol && ((SLastCol *)TARRAY_DATA(pLastArray))[jCol].colVal.cid > pColVal->cid) {
1,635,946✔
2615
      pColVal = tsdbRowIterNext(&rowIter), ++iCol;
1,054,346✔
2616
    }
2617
  }
2618
  tsdbRowClose(&rowIter);
618,744✔
2619

2620
  if (iColHash && tSimpleHashGetSize(iColHash) > 0) {
618,744✔
2621
    pRow = memRowIterGet(&iter, false, NULL, 0);
103,438✔
2622
    while (pRow) {
31,037,501✔
2623
      if (tSimpleHashGetSize(iColHash) == 0) {
30,974,859✔
2624
        break;
40,796✔
2625
      }
2626

2627
      sversion = TSDBROW_SVERSION(pRow);
30,944,775✔
2628
      if (sversion != -1) {
30,950,543✔
2629
        TAOS_CHECK_EXIT(updateTSchema(sversion, pr, uid));
30,947,659✔
2630

2631
        pTSchema = pr->pCurrSchema;
30,901,927✔
2632
      }
2633
      nCol = pTSchema->numOfCols;
30,912,227✔
2634

2635
      STsdbRowKey tsdbRowKey = {0};
30,912,227✔
2636
      tsdbRowGetKey(pRow, &tsdbRowKey);
30,922,321✔
2637

2638
      TAOS_CHECK_EXIT(tsdbRowIterOpen(&rowIter, pRow, pTSchema));
30,934,269✔
2639

2640
      iCol = 0;
30,771,529✔
2641
      for (SColVal *pColVal = tsdbRowIterNext(&rowIter); pColVal && iCol < nCol;
366,119,688✔
2642
           pColVal = tsdbRowIterNext(&rowIter), iCol++) {
334,638,901✔
2643
        int32_t *pjCol = tSimpleHashGet(iColHash, &pColVal->cid, sizeof(pColVal->cid));
336,387,223✔
2644
        if (pjCol && COL_VAL_IS_VALUE(pColVal)) {
334,632,309✔
2645
          SLastCol *pTargetCol = &((SLastCol *)TARRAY_DATA(pLastArray))[*pjCol];
129,665✔
2646

2647
          int32_t cmp_res = tRowKeyCompare(&pTargetCol->rowKey, &tsdbRowKey.key);
129,665✔
2648
          if (cmp_res <= 0) {
129,665✔
2649
            SLastCol lastCol = {
129,665✔
2650
                .rowKey = tsdbRowKey.key, .colVal = *pColVal, .dirty = 1, .cacheStatus = TSDB_LAST_CACHE_VALID};
2651
            TAOS_CHECK_EXIT(tsdbCacheReallocSLastCol(&lastCol, NULL));
129,665✔
2652

2653
            tsdbDebug("vgId:%d, %s qid:%s uid:%" PRId64 " from memtable(hash), col_id:%d col_flag:%d ts:%" PRId64,
129,665✔
2654
                      TD_VID(pTsdb->pVnode), __func__, pr && pr->idstr ? pr->idstr : "null", uid, lastCol.colVal.cid,
2655
                      lastCol.colVal.flag, tsdbRowKey.key.ts);
2656

2657
            tsdbCacheFreeSLastColItem(pTargetCol);
129,665✔
2658
            taosArraySet(pLastArray, *pjCol, &lastCol);
129,665✔
2659
          }
2660

2661
          TAOS_CHECK_EXIT(tSimpleHashRemove(iColHash, &pColVal->cid, sizeof(pColVal->cid)));
129,665✔
2662
        }
2663
      }
2664
      tsdbRowClose(&rowIter);
29,839,379✔
2665

2666
      pRow = memRowIterGet(&iter, false, NULL, 0);
30,264,769✔
2667
    }
2668
  }
2669

2670
_exit:
846,997✔
2671
  if (code) {
884,695✔
UNCOV
2672
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(pTsdb->pVnode), __func__, __FILE__, lino, tstrerror(code));
×
2673

UNCOV
2674
    tsdbRowClose(&rowIter);
×
2675
  }
2676

2677
  tSimpleHashCleanup(iColHash);
884,695✔
2678

2679
  memRowIterClose(&iter);
884,695✔
2680

2681
  TAOS_RETURN(code);
884,489✔
2682
}
2683

2684
int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, int8_t ltype) {
884,695✔
2685
  int32_t code = 0;
884,695✔
2686
  int32_t lino = 0;
884,695✔
2687

2688
  tsdbDebug("vgId:%d, %s start, qid:%s uid:%" PRId64 " ltype:%d", TD_VID(pTsdb->pVnode), __func__,
884,695✔
2689
            pr && pr->idstr ? pr->idstr : "null", uid, ltype);
2690

2691
  SArray *keyArray = taosArrayInit(16, sizeof(SLastKey));
884,695✔
2692
  if (!keyArray) {
884,695✔
UNCOV
2693
    TAOS_CHECK_EXIT(terrno);
×
2694
  }
2695

2696
  TAOS_CHECK_EXIT(tsdbCacheGetBatchFromLru(pTsdb, uid, pLastArray, pr, ltype, keyArray));
884,695✔
2697

2698
  if (tsUpdateCacheBatch) {
884,695✔
2699
    TAOS_CHECK_EXIT(tsdbCacheGetBatchFromMem(pTsdb, uid, pLastArray, pr, keyArray));
884,695✔
2700
  }
2701

2702
_exit:
884,489✔
2703
  if (code) {
884,489✔
UNCOV
2704
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(pTsdb->pVnode), __func__, __FILE__, lino, tstrerror(code));
×
2705
  }
2706

2707
  if (keyArray) {
884,695✔
2708
    taosArrayDestroy(keyArray);
884,695✔
2709
  }
2710

2711
  TAOS_RETURN(code);
884,695✔
2712
}
2713

2714
int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey) {
1,765,835✔
2715
  int32_t   code = 0, lino = 0;
1,765,835✔
2716
  STSchema *pTSchema = NULL;
1,765,835✔
2717
  int       sver = -1;
1,765,835✔
2718
  int       numKeys = 0;
1,765,835✔
2719
  SArray   *remainCols = NULL;
1,765,835✔
2720

2721
  TAOS_CHECK_RETURN(metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pTSchema));
1,765,835✔
2722

2723
  int numCols = pTSchema->numOfCols;
1,765,835✔
2724

2725
  (void)taosThreadMutexLock(&pTsdb->lruMutex);
1,765,835✔
2726

2727
  for (int i = 0; i < numCols; ++i) {
5,954,385✔
2728
    int16_t cid = pTSchema->columns[i].colId;
4,188,550✔
2729
    for (int8_t lflag = LFLAG_LAST_ROW; lflag <= LFLAG_LAST; ++lflag) {
12,565,650✔
2730
      SLastKey   lastKey = {.lflag = lflag, .uid = uid, .cid = cid};
8,377,100✔
2731
      LRUHandle *h = taosLRUCacheLookup(pTsdb->lruCache, &lastKey, ROCKS_KEY_LEN);
8,377,100✔
2732
      if (h) {
8,377,100✔
2733
        SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pTsdb->lruCache, h);
80,256✔
2734
        if (pLastCol->rowKey.ts <= eKey && pLastCol->rowKey.ts >= sKey) {
80,256✔
2735
          SLastCol noneCol = {.rowKey.ts = TSKEY_MIN,
796✔
2736
                              .colVal = COL_VAL_NONE(cid, pTSchema->columns[i].type),
796✔
2737
                              .dirty = 1,
2738
                              .cacheStatus = TSDB_LAST_CACHE_NO_CACHE};
2739
          code = tsdbCachePutToLRU(pTsdb, &lastKey, &noneCol, 1);
796✔
2740
        }
2741
        tsdbLRUCacheRelease(pTsdb->lruCache, h, false);
80,256✔
2742
        TAOS_CHECK_EXIT(code);
80,256✔
2743
      } else {
2744
        if (!remainCols) {
8,296,844✔
2745
          remainCols = taosArrayInit(numCols * 2, sizeof(SIdxKey));
1,755,812✔
2746
        }
2747
        if (!taosArrayPush(remainCols, &(SIdxKey){i, lastKey})) {
16,593,688✔
UNCOV
2748
          TAOS_CHECK_EXIT(terrno);
×
2749
        }
2750
      }
2751
    }
2752
  }
2753

2754
  if (remainCols) {
1,765,835✔
2755
    numKeys = TARRAY_SIZE(remainCols);
1,755,812✔
2756
  }
2757

2758
  char  **keys_list = taosMemoryCalloc(numKeys, sizeof(char *));
1,765,835✔
2759
  size_t *keys_list_sizes = taosMemoryCalloc(numKeys, sizeof(size_t));
1,765,835✔
2760
  char  **values_list = NULL;
1,765,835✔
2761
  size_t *values_list_sizes = NULL;
1,765,835✔
2762

2763
  if (!keys_list || !keys_list_sizes) {
1,765,835✔
UNCOV
2764
    code = terrno;
×
UNCOV
2765
    goto _exit;
×
2766
  }
2767
  const size_t klen = ROCKS_KEY_LEN;
1,765,835✔
2768

2769
  for (int i = 0; i < numKeys; ++i) {
10,062,679✔
2770
    char *key = taosMemoryCalloc(1, sizeof(SLastKey));
8,296,844✔
2771
    if (!key) {
8,296,844✔
UNCOV
2772
      code = terrno;
×
UNCOV
2773
      goto _exit;
×
2774
    }
2775
    SIdxKey *idxKey = taosArrayGet(remainCols, i);
8,296,844✔
2776

2777
    ((SLastKey *)key)[0] = idxKey->key;
8,296,844✔
2778

2779
    keys_list[i] = key;
8,296,844✔
2780
    keys_list_sizes[i] = klen;
8,296,844✔
2781
  }
2782

2783
  rocksMayWrite(pTsdb, true);  // flush writebatch cache
1,765,835✔
2784

2785
  TAOS_CHECK_GOTO(tsdbCacheGetValuesFromRocks(pTsdb, numKeys, (const char *const *)keys_list, keys_list_sizes,
1,765,835✔
2786
                                              &values_list, &values_list_sizes),
2787
                  NULL, _exit);
2788

2789
  // rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
2790
  for (int i = 0; i < numKeys; ++i) {
10,062,679✔
2791
    SLastCol *pLastCol = NULL;
8,296,844✔
2792
    if (values_list[i] != NULL) {
8,296,844✔
UNCOV
2793
      code = tsdbCacheDeserialize(values_list[i], values_list_sizes[i], &pLastCol);
×
UNCOV
2794
      if (code != TSDB_CODE_SUCCESS) {
×
2795
        tsdbError("vgId:%d, %s deserialize failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
2796
                  tstrerror(code));
UNCOV
2797
        goto _exit;
×
2798
      }
2799
    }
2800
    SIdxKey  *idxKey = taosArrayGet(remainCols, i);
8,296,844✔
2801
    SLastKey *pLastKey = &idxKey->key;
8,296,844✔
2802
    if (NULL != pLastCol && (pLastCol->rowKey.ts <= eKey && pLastCol->rowKey.ts >= sKey)) {
8,296,844✔
2803
      SLastCol noCacheCol = {.rowKey.ts = TSKEY_MIN,
×
2804
                             .colVal = COL_VAL_NONE(pLastKey->cid, pTSchema->columns[idxKey->idx].type),
×
2805
                             .dirty = 0,
2806
                             .cacheStatus = TSDB_LAST_CACHE_NO_CACHE};
2807

UNCOV
2808
      if ((code = tsdbCachePutToRocksdb(pTsdb, pLastKey, &noCacheCol)) != TSDB_CODE_SUCCESS) {
×
UNCOV
2809
        taosMemoryFreeClear(pLastCol);
×
UNCOV
2810
        tsdbError("tsdb/cache/del: vgId:%d, put to rocks failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code));
×
UNCOV
2811
        goto _exit;
×
2812
      }
UNCOV
2813
      if ((code = tsdbCachePutToLRU(pTsdb, pLastKey, &noCacheCol, 0)) != TSDB_CODE_SUCCESS) {
×
UNCOV
2814
        taosMemoryFreeClear(pLastCol);
×
UNCOV
2815
        tsdbError("tsdb/cache/del: vgId:%d, put to lru failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code));
×
UNCOV
2816
        goto _exit;
×
2817
      }
2818
    }
2819

2820
    if (pLastCol == NULL) {
8,296,844✔
2821
      tsdbDebug("tsdb/cache/del: vgId:%d, no cache found for uid:%d ,cid:%" PRId64 ", lflag:%d.", TD_VID(pTsdb->pVnode),
8,296,844✔
2822
                pLastKey->cid, pLastKey->uid, pLastKey->lflag);
2823
    }
2824

2825
    taosMemoryFreeClear(pLastCol);
8,296,844✔
2826
  }
2827

2828
  rocksMayWrite(pTsdb, false);
1,765,835✔
2829

2830
_exit:
1,765,835✔
2831
  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
1,765,835✔
2832

2833
  for (int i = 0; i < numKeys; ++i) {
10,062,679✔
2834
    taosMemoryFree(keys_list[i]);
8,296,844✔
2835
  }
2836
  taosMemoryFree(keys_list);
1,765,835✔
2837
  taosMemoryFree(keys_list_sizes);
1,765,835✔
2838
  if (values_list) {
1,765,835✔
2839
#if USE_ROCKSDB
2840
    for (int i = 0; i < numKeys; ++i) {
10,062,679✔
2841
      rocksdb_free(values_list[i]);
8,296,844✔
2842
    }
2843
#endif
2844
    taosMemoryFree(values_list);
1,765,835✔
2845
  }
2846
  taosMemoryFree(values_list_sizes);
1,765,835✔
2847
  taosArrayDestroy(remainCols);
1,765,835✔
2848
  taosMemoryFree(pTSchema);
1,765,835✔
2849

2850
  TAOS_RETURN(code);
1,765,835✔
2851
}
2852

2853
int32_t tsdbOpenCache(STsdb *pTsdb) {
4,173,999✔
2854
  int32_t code = 0, lino = 0;
4,173,999✔
2855
  size_t  cfgCapacity = (size_t)pTsdb->pVnode->config.cacheLastSize * 1024 * 1024;
4,175,284✔
2856

2857
  SLRUCache *pCache = taosLRUCacheInit(cfgCapacity, 0, .5);
4,175,284✔
2858
  if (pCache == NULL) {
4,174,218✔
UNCOV
2859
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err);
×
2860
  }
2861

2862
#ifdef USE_SHARED_STORAGE
2863
  if (tsSsEnabled) {
4,174,218✔
UNCOV
2864
    TAOS_CHECK_GOTO(tsdbOpenBCache(pTsdb), &lino, _err);
×
UNCOV
2865
    TAOS_CHECK_GOTO(tsdbOpenPgCache(pTsdb), &lino, _err);
×
2866
  }
2867
#endif
2868

2869
  TAOS_CHECK_GOTO(tsdbOpenRocksCache(pTsdb), &lino, _err);
4,174,218✔
2870

2871
  taosLRUCacheSetStrictCapacity(pCache, false);
4,175,284✔
2872

2873
  (void)taosThreadMutexInit(&pTsdb->lruMutex, NULL);
4,175,284✔
2874

2875
_err:
4,175,284✔
2876
  if (code) {
4,175,284✔
UNCOV
2877
    tsdbError("tsdb/cache: vgId:%d, open failed at line %d since %s.", TD_VID(pTsdb->pVnode), lino, tstrerror(code));
×
2878
  }
2879

2880
  pTsdb->lruCache = pCache;
4,175,284✔
2881

2882
  TAOS_RETURN(code);
4,175,284✔
2883
}
2884

2885
void tsdbCloseCache(STsdb *pTsdb) {
4,174,547✔
2886
  SLRUCache *pCache = pTsdb->lruCache;
4,174,547✔
2887
  if (pCache) {
4,174,547✔
2888
    taosLRUCacheEraseUnrefEntries(pCache);
4,172,989✔
2889

2890
    taosLRUCacheCleanup(pCache);
4,175,106✔
2891

2892
    (void)taosThreadMutexDestroy(&pTsdb->lruMutex);
4,175,284✔
2893
  }
2894

2895
#ifdef USE_SHARED_STORAGE
2896
  if (tsSsEnabled) {
4,175,284✔
UNCOV
2897
    tsdbCloseBCache(pTsdb);
×
UNCOV
2898
    tsdbClosePgCache(pTsdb);
×
2899
  }
2900
#endif
2901

2902
  tsdbCloseRocksCache(pTsdb);
4,175,284✔
2903
}
4,175,284✔
2904

UNCOV
2905
static void getTableCacheKey(tb_uid_t uid, int cacheType, char *key, int *len) {
×
UNCOV
2906
  if (cacheType == 0) {  // last_row
×
UNCOV
2907
    *(uint64_t *)key = (uint64_t)uid;
×
2908
  } else {  // last
UNCOV
2909
    *(uint64_t *)key = ((uint64_t)uid) | 0x8000000000000000;
×
2910
  }
2911

UNCOV
2912
  *len = sizeof(uint64_t);
×
UNCOV
2913
}
×
2914

UNCOV
2915
static tb_uid_t getTableSuidByUid(tb_uid_t uid, STsdb *pTsdb) {
×
UNCOV
2916
  tb_uid_t suid = 0;
×
2917

UNCOV
2918
  SMetaReader mr = {0};
×
UNCOV
2919
  metaReaderDoInit(&mr, pTsdb->pVnode->pMeta, META_READER_LOCK);
×
UNCOV
2920
  if (metaReaderGetTableEntryByUidCache(&mr, uid) < 0) {
×
UNCOV
2921
    metaReaderClear(&mr);  // table not esist
×
UNCOV
2922
    return 0;
×
2923
  }
2924

UNCOV
2925
  if (mr.me.type == TSDB_CHILD_TABLE) {
×
UNCOV
2926
    suid = mr.me.ctbEntry.suid;
×
UNCOV
2927
  } else if (mr.me.type == TSDB_NORMAL_TABLE) {
×
2928
    suid = 0;
×
2929
  } else {
UNCOV
2930
    suid = 0;
×
2931
  }
2932

UNCOV
2933
  metaReaderClear(&mr);
×
2934

UNCOV
2935
  return suid;
×
2936
}
2937

2938
static int32_t getTableDelDataFromDelIdx(SDelFReader *pDelReader, SDelIdx *pDelIdx, SArray *aDelData) {
×
UNCOV
2939
  int32_t code = 0;
×
2940

UNCOV
2941
  if (pDelIdx) {
×
UNCOV
2942
    code = tsdbReadDelDatav1(pDelReader, pDelIdx, aDelData, INT64_MAX);
×
2943
  }
2944

UNCOV
2945
  TAOS_RETURN(code);
×
2946
}
2947

UNCOV
2948
static int32_t getTableDelDataFromTbData(STbData *pTbData, SArray *aDelData) {
×
2949
  int32_t   code = 0;
×
2950
  SDelData *pDelData = pTbData ? pTbData->pHead : NULL;
×
2951

2952
  for (; pDelData; pDelData = pDelData->pNext) {
×
2953
    if (!taosArrayPush(aDelData, pDelData)) {
×
UNCOV
2954
      TAOS_RETURN(terrno);
×
2955
    }
2956
  }
2957

2958
  TAOS_RETURN(code);
×
2959
}
2960

2961
static uint64_t *getUidList(SCacheRowsReader *pReader) {
9,135✔
2962
  if (!pReader->uidList) {
9,135✔
2963
    int32_t numOfTables = pReader->numOfTables;
2,373✔
2964

2965
    pReader->uidList = taosMemoryMalloc(numOfTables * sizeof(uint64_t));
2,373✔
2966
    if (!pReader->uidList) {
2,373✔
UNCOV
2967
      return NULL;
×
2968
    }
2969

2970
    for (int32_t i = 0; i < numOfTables; ++i) {
11,508✔
2971
      uint64_t uid = pReader->pTableList[i].uid;
9,135✔
2972
      pReader->uidList[i] = uid;
9,135✔
2973
    }
2974

2975
    taosSort(pReader->uidList, numOfTables, sizeof(uint64_t), uidComparFunc);
2,373✔
2976
  }
2977

2978
  return pReader->uidList;
9,135✔
2979
}
2980

2981
static int32_t loadTombFromBlk(const TTombBlkArray *pTombBlkArray, SCacheRowsReader *pReader, void *pFileReader,
9,135✔
2982
                               bool isFile) {
2983
  int32_t   code = 0;
9,135✔
2984
  int32_t   numOfTables = pReader->numOfTables;
9,135✔
2985
  int64_t   suid = pReader->info.suid;
9,135✔
2986
  uint64_t *uidList = getUidList(pReader);
9,135✔
2987

2988
  if (!uidList) {
9,135✔
2989
    TAOS_RETURN(terrno);
×
2990
  }
2991

2992
  for (int i = 0, j = 0; i < pTombBlkArray->size && j < numOfTables; ++i) {
16,863✔
2993
    STombBlk *pTombBlk = &pTombBlkArray->data[i];
7,728✔
2994
    if (pTombBlk->maxTbid.suid < suid || (pTombBlk->maxTbid.suid == suid && pTombBlk->maxTbid.uid < uidList[0])) {
7,728✔
UNCOV
2995
      continue;
×
2996
    }
2997

2998
    if (pTombBlk->minTbid.suid > suid ||
7,728✔
2999
        (pTombBlk->minTbid.suid == suid && pTombBlk->minTbid.uid > uidList[numOfTables - 1])) {
7,728✔
3000
      break;
3001
    }
3002

3003
    STombBlock block = {0};
7,728✔
3004
    code = isFile ? tsdbDataFileReadTombBlock(pFileReader, &pTombBlkArray->data[i], &block)
7,728✔
3005
                  : tsdbSttFileReadTombBlock(pFileReader, &pTombBlkArray->data[i], &block);
7,728✔
3006
    if (code != TSDB_CODE_SUCCESS) {
7,728✔
UNCOV
3007
      TAOS_RETURN(code);
×
3008
    }
3009

3010
    uint64_t        uid = uidList[j];
7,728✔
3011
    STableLoadInfo *pInfo = getTableLoadInfo(pReader, uid);
7,728✔
3012
    if (!pInfo) {
7,728✔
UNCOV
3013
      tTombBlockDestroy(&block);
×
UNCOV
3014
      TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
×
3015
    }
3016

3017
    if (pInfo->pTombData == NULL) {
7,728✔
3018
      pInfo->pTombData = taosArrayInit(4, sizeof(SDelData));
966✔
3019
    }
3020

3021
    STombRecord record = {0};
7,728✔
3022
    bool        finished = false;
7,728✔
3023
    for (int32_t k = 0; k < TOMB_BLOCK_SIZE(&block); ++k) {
15,456✔
3024
      code = tTombBlockGet(&block, k, &record);
7,728✔
3025
      if (code != TSDB_CODE_SUCCESS) {
7,728✔
3026
        finished = true;
×
UNCOV
3027
        break;
×
3028
      }
3029

3030
      if (record.suid < suid) {
7,728✔
UNCOV
3031
        continue;
×
3032
      }
3033
      if (record.suid > suid) {
7,728✔
UNCOV
3034
        finished = true;
×
UNCOV
3035
        break;
×
3036
      }
3037

3038
      bool newTable = false;
7,728✔
3039
      if (uid < record.uid) {
7,728✔
3040
        while (j < numOfTables && uidList[j] < record.uid) {
46,368✔
3041
          ++j;
38,640✔
3042
          newTable = true;
38,640✔
3043
        }
3044

3045
        if (j >= numOfTables) {
7,728✔
UNCOV
3046
          finished = true;
×
UNCOV
3047
          break;
×
3048
        }
3049

3050
        uid = uidList[j];
7,728✔
3051
      }
3052

3053
      if (record.uid < uid) {
7,728✔
UNCOV
3054
        continue;
×
3055
      }
3056

3057
      if (newTable) {
7,728✔
3058
        pInfo = getTableLoadInfo(pReader, uid);
7,728✔
3059
        if (!pInfo) {
7,728✔
UNCOV
3060
          code = TSDB_CODE_OUT_OF_MEMORY;
×
UNCOV
3061
          finished = true;
×
3062
          break;
×
3063
        }
3064
        if (pInfo->pTombData == NULL) {
7,728✔
3065
          pInfo->pTombData = taosArrayInit(4, sizeof(SDelData));
966✔
3066
          if (!pInfo->pTombData) {
966✔
UNCOV
3067
            code = terrno;
×
UNCOV
3068
            finished = true;
×
UNCOV
3069
            break;
×
3070
          }
3071
        }
3072
      }
3073

3074
      if (record.version <= pReader->info.verRange.maxVer) {
7,728✔
3075
        /*tsdbError("tomb xx load/cache: vgId:%d fid:%d record %" PRId64 "~%" PRId64 "~%" PRId64 " tomb records",
3076
          TD_VID(pReader->pTsdb->pVnode), pReader->pCurFileSet->fid, record.skey, record.ekey, uid);*/
3077

3078
        SDelData delData = {.version = record.version, .sKey = record.skey, .eKey = record.ekey};
7,728✔
3079
        if (!taosArrayPush(pInfo->pTombData, &delData)) {
15,456✔
UNCOV
3080
          TAOS_RETURN(terrno);
×
3081
        }
3082
      }
3083
    }
3084

3085
    tTombBlockDestroy(&block);
7,728✔
3086

3087
    if (finished) {
7,728✔
UNCOV
3088
      TAOS_RETURN(code);
×
3089
    }
3090
  }
3091

3092
  TAOS_RETURN(TSDB_CODE_SUCCESS);
9,135✔
3093
}
3094

3095
static int32_t loadDataTomb(SCacheRowsReader *pReader, SDataFileReader *pFileReader) {
7,728✔
3096
  const TTombBlkArray *pBlkArray = NULL;
7,728✔
3097

3098
  TAOS_CHECK_RETURN(tsdbDataFileReadTombBlk(pFileReader, &pBlkArray));
7,728✔
3099

3100
  TAOS_RETURN(loadTombFromBlk(pBlkArray, pReader, pFileReader, true));
7,728✔
3101
}
3102

3103
static int32_t loadSttTomb(STsdbReader *pTsdbReader, SSttFileReader *pSttFileReader, SSttBlockLoadInfo *pLoadInfo) {
1,407✔
3104
  SCacheRowsReader    *pReader = (SCacheRowsReader *)pTsdbReader;
1,407✔
3105
  const TTombBlkArray *pBlkArray = NULL;
1,407✔
3106

3107
  TAOS_CHECK_RETURN(tsdbSttFileReadTombBlk(pSttFileReader, &pBlkArray));
1,407✔
3108

3109
  TAOS_RETURN(loadTombFromBlk(pBlkArray, pReader, pSttFileReader, false));
1,407✔
3110
}
3111

3112
typedef struct {
3113
  SMergeTree  mergeTree;
3114
  SMergeTree *pMergeTree;
3115
} SFSLastIter;
3116

3117
static int32_t lastIterOpen(SFSLastIter *iter, STFileSet *pFileSet, STsdb *pTsdb, STSchema *pTSchema, tb_uid_t suid,
9,135✔
3118
                            tb_uid_t uid, SCacheRowsReader *pr, int64_t lastTs, int16_t *aCols, int nCols) {
3119
  int32_t code = 0;
9,135✔
3120
  destroySttBlockReader(pr->pLDataIterArray, NULL);
9,135✔
3121
  pr->pLDataIterArray = taosArrayInit(4, POINTER_BYTES);
9,135✔
3122
  if (pr->pLDataIterArray == NULL) return terrno;
9,135✔
3123

3124
  SMergeTreeConf conf = {
9,135✔
3125
      .uid = uid,
3126
      .suid = suid,
3127
      .pTsdb = pTsdb,
3128
      .timewindow = (STimeWindow){.skey = lastTs, .ekey = TSKEY_MAX},
3129
      .verRange = (SVersionRange){.minVer = 0, .maxVer = INT64_MAX},
3130
      .strictTimeRange = false,
3131
      .cacheStatis = false,
3132
      .pSchema = pTSchema,
3133
      .pCurrentFileset = pFileSet,
3134
      .backward = 1,
3135
      .pSttFileBlockIterArray = pr->pLDataIterArray,
9,135✔
3136
      .pCols = aCols,
3137
      .numOfCols = nCols,
3138
      .loadTombFn = loadSttTomb,
3139
      .pReader = pr,
3140
      .idstr = pr->idstr,
9,135✔
3141
      .pCurRowKey = &pr->rowKey,
9,135✔
3142
  };
3143

3144
  TAOS_CHECK_RETURN(tMergeTreeOpen2(&iter->mergeTree, &conf, NULL));
9,135✔
3145

3146
  iter->pMergeTree = &iter->mergeTree;
9,135✔
3147

3148
  TAOS_RETURN(code);
9,135✔
3149
}
3150

3151
static int32_t lastIterClose(SFSLastIter **iter) {
966✔
3152
  int32_t code = 0;
966✔
3153

3154
  if ((*iter)->pMergeTree) {
966✔
3155
    tMergeTreeClose((*iter)->pMergeTree);
966✔
3156
    (*iter)->pMergeTree = NULL;
966✔
3157
  }
3158

3159
  *iter = NULL;
966✔
3160

3161
  TAOS_RETURN(code);
966✔
3162
}
3163

3164
static int32_t lastIterNext(SFSLastIter *iter, TSDBROW **ppRow) {
11,039✔
3165
  bool hasVal = false;
11,039✔
3166
  *ppRow = NULL;
11,039✔
3167

3168
  int32_t code = tMergeTreeNext(iter->pMergeTree, &hasVal);
11,039✔
3169
  if (code != 0) {
11,039✔
UNCOV
3170
    return code;
×
3171
  }
3172

3173
  if (!hasVal) {
11,039✔
3174
    *ppRow = NULL;
9,163✔
3175
    TAOS_RETURN(code);
9,163✔
3176
  }
3177

3178
  *ppRow = tMergeTreeGetRow(iter->pMergeTree);
1,876✔
3179
  TAOS_RETURN(code);
1,876✔
3180
}
3181

3182
typedef enum SFSNEXTROWSTATES {
3183
  SFSNEXTROW_FS,
3184
  SFSNEXTROW_FILESET,
3185
  SFSNEXTROW_INDEXLIST,
3186
  SFSNEXTROW_BRINBLOCK,
3187
  SFSNEXTROW_BRINRECORD,
3188
  SFSNEXTROW_BLOCKDATA,
3189
  SFSNEXTROW_BLOCKROW,
3190
  SFSNEXTROW_NEXTSTTROW
3191
} SFSNEXTROWSTATES;
3192

3193
struct CacheNextRowIter;
3194

3195
typedef struct SFSNextRowIter {
3196
  SFSNEXTROWSTATES         state;         // [input]
3197
  SBlockIdx               *pBlockIdxExp;  // [input]
3198
  STSchema                *pTSchema;      // [input]
3199
  tb_uid_t                 suid;
3200
  tb_uid_t                 uid;
3201
  int32_t                  iFileSet;
3202
  STFileSet               *pFileSet;
3203
  TFileSetArray           *aDFileSet;
3204
  SArray                  *pIndexList;
3205
  int32_t                  iBrinIndex;
3206
  SBrinBlock               brinBlock;
3207
  SBrinBlock              *pBrinBlock;
3208
  int32_t                  iBrinRecord;
3209
  SBrinRecord              brinRecord;
3210
  SBlockData               blockData;
3211
  SBlockData              *pBlockData;
3212
  int32_t                  nRow;
3213
  int32_t                  iRow;
3214
  TSDBROW                  row;
3215
  int64_t                  lastTs;
3216
  SFSLastIter              lastIter;
3217
  SFSLastIter             *pLastIter;
3218
  int8_t                   lastEmpty;
3219
  TSDBROW                 *pLastRow;
3220
  SRow                    *pTSRow;
3221
  SRowMerger               rowMerger;
3222
  SCacheRowsReader        *pr;
3223
  struct CacheNextRowIter *pRowIter;
3224
} SFSNextRowIter;
3225

3226
static void clearLastFileSet(SFSNextRowIter *state);
3227

3228
static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast, int16_t *aCols,
18,384✔
3229
                                int nCols) {
3230
  int32_t         code = 0, lino = 0;
18,384✔
3231
  SFSNextRowIter *state = (SFSNextRowIter *)iter;
18,867✔
3232
  STsdb          *pTsdb = state->pr->pTsdb;
18,867✔
3233

3234
  if (SFSNEXTROW_FS == state->state) {
18,867✔
3235
    state->iFileSet = TARRAY2_SIZE(state->aDFileSet);
17,929✔
3236

3237
    state->state = SFSNEXTROW_FILESET;
17,929✔
3238
  }
3239

3240
  if (SFSNEXTROW_FILESET == state->state) {
18,867✔
3241
  _next_fileset:
25,160✔
3242
    clearLastFileSet(state);
25,160✔
3243

3244
    if (--state->iFileSet < 0) {
25,160✔
3245
      *ppRow = NULL;
16,025✔
3246

3247
      TAOS_RETURN(code);
16,025✔
3248
    } else {
3249
      state->pFileSet = TARRAY2_GET(state->aDFileSet, state->iFileSet);
9,135✔
3250
    }
3251

3252
    STFileObj **pFileObj = state->pFileSet->farr;
9,135✔
3253
    if (pFileObj[0] != NULL || pFileObj[3] != NULL) {
9,135✔
3254
      if (state->pFileSet != state->pr->pCurFileSet) {
7,728✔
3255
        SDataFileReaderConfig conf = {.tsdb = pTsdb, .szPage = pTsdb->pVnode->config.tsdbPageSize};
7,728✔
3256
        const char           *filesName[4] = {0};
7,728✔
3257
        if (pFileObj[0] != NULL) {
7,728✔
3258
          conf.files[0].file = *pFileObj[0]->f;
7,728✔
3259
          conf.files[0].exist = true;
7,728✔
3260
          filesName[0] = pFileObj[0]->fname;
7,728✔
3261

3262
          conf.files[1].file = *pFileObj[1]->f;
7,728✔
3263
          conf.files[1].exist = true;
7,728✔
3264
          filesName[1] = pFileObj[1]->fname;
7,728✔
3265

3266
          conf.files[2].file = *pFileObj[2]->f;
7,728✔
3267
          conf.files[2].exist = true;
7,728✔
3268
          filesName[2] = pFileObj[2]->fname;
7,728✔
3269
        }
3270

3271
        if (pFileObj[3] != NULL) {
7,728✔
3272
          conf.files[3].exist = true;
7,728✔
3273
          conf.files[3].file = *pFileObj[3]->f;
7,728✔
3274
          filesName[3] = pFileObj[3]->fname;
7,728✔
3275
        }
3276

3277
        TAOS_CHECK_GOTO(tsdbDataFileReaderOpen(filesName, &conf, &state->pr->pFileReader), &lino, _err);
7,728✔
3278

3279
        state->pr->pCurFileSet = state->pFileSet;
7,728✔
3280

3281
        code = loadDataTomb(state->pr, state->pr->pFileReader);
7,728✔
3282
        if (code != TSDB_CODE_SUCCESS) {
7,728✔
UNCOV
3283
          tsdbError("vgId:%d, %s load tomb failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
3284
                    tstrerror(code));
UNCOV
3285
          TAOS_CHECK_GOTO(code, &lino, _err);
×
3286
        }
3287

3288
        TAOS_CHECK_GOTO(tsdbDataFileReadBrinBlk(state->pr->pFileReader, &state->pr->pBlkArray), &lino, _err);
7,728✔
3289
      }
3290

3291
      if (!state->pIndexList) {
7,728✔
3292
        state->pIndexList = taosArrayInit(1, sizeof(SBrinBlk));
7,728✔
3293
        if (!state->pIndexList) {
7,728✔
UNCOV
3294
          TAOS_CHECK_GOTO(terrno, &lino, _err);
×
3295
        }
3296
      } else {
UNCOV
3297
        taosArrayClear(state->pIndexList);
×
3298
      }
3299

3300
      const TBrinBlkArray *pBlkArray = state->pr->pBlkArray;
7,728✔
3301

3302
      for (int i = TARRAY2_SIZE(pBlkArray) - 1; i >= 0; --i) {
15,456✔
3303
        SBrinBlk *pBrinBlk = &pBlkArray->data[i];
7,728✔
3304
        if (state->suid >= pBrinBlk->minTbid.suid && state->suid <= pBrinBlk->maxTbid.suid) {
7,728✔
3305
          if (state->uid >= pBrinBlk->minTbid.uid && state->uid <= pBrinBlk->maxTbid.uid) {
7,728✔
3306
            if (!taosArrayPush(state->pIndexList, pBrinBlk)) {
1,932✔
UNCOV
3307
              TAOS_CHECK_GOTO(terrno, &lino, _err);
×
3308
            }
3309
          }
UNCOV
3310
        } else if (state->suid > pBrinBlk->maxTbid.suid ||
×
UNCOV
3311
                   (state->suid == pBrinBlk->maxTbid.suid && state->uid > pBrinBlk->maxTbid.uid)) {
×
3312
          break;
3313
        }
3314
      }
3315

3316
      int indexSize = TARRAY_SIZE(state->pIndexList);
7,728✔
3317
      if (indexSize <= 0) {
7,728✔
3318
        goto _check_stt_data;
6,762✔
3319
      }
3320

3321
      state->state = SFSNEXTROW_INDEXLIST;
966✔
3322
      state->iBrinIndex = 1;
966✔
3323
    }
3324

3325
  _check_stt_data:
9,135✔
3326
    if (state->pFileSet != state->pr->pCurFileSet) {
9,135✔
3327
      state->pr->pCurFileSet = state->pFileSet;
1,407✔
3328
    }
3329

3330
    TAOS_CHECK_GOTO(lastIterOpen(&state->lastIter, state->pFileSet, pTsdb, state->pTSchema, state->suid, state->uid,
9,135✔
3331
                                 state->pr, state->lastTs, aCols, nCols),
3332
                    &lino, _err);
3333

3334
    TAOS_CHECK_GOTO(lastIterNext(&state->lastIter, &state->pLastRow), &lino, _err);
9,135✔
3335

3336
    if (!state->pLastRow) {
9,135✔
3337
      state->lastEmpty = 1;
7,728✔
3338

3339
      if (SFSNEXTROW_INDEXLIST != state->state) {
7,728✔
3340
        clearLastFileSet(state);
6,762✔
3341
        goto _next_fileset;
6,762✔
3342
      }
3343
    } else {
3344
      state->lastEmpty = 0;
1,407✔
3345

3346
      if (SFSNEXTROW_INDEXLIST != state->state) {
1,407✔
3347
        state->state = SFSNEXTROW_NEXTSTTROW;
1,407✔
3348

3349
        *ppRow = state->pLastRow;
1,407✔
3350
        state->pLastRow = NULL;
1,407✔
3351

3352
        TAOS_RETURN(code);
1,407✔
3353
      }
3354
    }
3355

3356
    state->pLastIter = &state->lastIter;
966✔
3357
  }
3358

3359
  if (SFSNEXTROW_NEXTSTTROW == state->state) {
1,904✔
3360
    TAOS_CHECK_GOTO(lastIterNext(&state->lastIter, &state->pLastRow), &lino, _err);
938✔
3361

3362
    if (!state->pLastRow) {
938✔
3363
      if (state->pLastIter) {
469✔
UNCOV
3364
        code = lastIterClose(&state->pLastIter);
×
UNCOV
3365
        if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
3366
          tsdbError("vgId:%d, %s close last iter failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
3367
                    tstrerror(code));
UNCOV
3368
          TAOS_RETURN(code);
×
3369
        }
3370
      }
3371

3372
      clearLastFileSet(state);
469✔
3373
      state->state = SFSNEXTROW_FILESET;
469✔
3374
      goto _next_fileset;
469✔
3375
    } else {
3376
      *ppRow = state->pLastRow;
469✔
3377
      state->pLastRow = NULL;
469✔
3378

3379
      TAOS_RETURN(code);
469✔
3380
    }
3381
  }
3382

3383
  if (SFSNEXTROW_INDEXLIST == state->state) {
966✔
3384
    SBrinBlk *pBrinBlk = NULL;
966✔
3385
  _next_brinindex:
966✔
3386
    if (--state->iBrinIndex < 0) {
966✔
UNCOV
3387
      if (state->pLastRow) {
×
UNCOV
3388
        state->state = SFSNEXTROW_NEXTSTTROW;
×
UNCOV
3389
        *ppRow = state->pLastRow;
×
UNCOV
3390
        state->pLastRow = NULL;
×
UNCOV
3391
        return code;
×
3392
      }
3393

UNCOV
3394
      clearLastFileSet(state);
×
3395
      goto _next_fileset;
×
3396
    } else {
3397
      pBrinBlk = taosArrayGet(state->pIndexList, state->iBrinIndex);
966✔
3398
    }
3399

3400
    if (!state->pBrinBlock) {
966✔
3401
      state->pBrinBlock = &state->brinBlock;
966✔
3402
    } else {
UNCOV
3403
      tBrinBlockClear(&state->brinBlock);
×
3404
    }
3405

3406
    TAOS_CHECK_GOTO(tsdbDataFileReadBrinBlock(state->pr->pFileReader, pBrinBlk, &state->brinBlock), &lino, _err);
966✔
3407

3408
    state->iBrinRecord = state->brinBlock.numOfRecords - 1;
966✔
3409
    state->state = SFSNEXTROW_BRINBLOCK;
966✔
3410
  }
3411

3412
  if (SFSNEXTROW_BRINBLOCK == state->state) {
966✔
3413
  _next_brinrecord:
966✔
3414
    if (state->iBrinRecord < 0) {  // empty brin block, goto _next_brinindex
966✔
UNCOV
3415
      tBrinBlockClear(&state->brinBlock);
×
UNCOV
3416
      goto _next_brinindex;
×
3417
    }
3418

3419
    TAOS_CHECK_GOTO(tBrinBlockGet(&state->brinBlock, state->iBrinRecord, &state->brinRecord), &lino, _err);
966✔
3420

3421
    SBrinRecord *pRecord = &state->brinRecord;
966✔
3422
    if (pRecord->uid != state->uid) {
966✔
3423
      // TODO: goto next brin block early
UNCOV
3424
      --state->iBrinRecord;
×
3425
      goto _next_brinrecord;
×
3426
    }
3427

3428
    state->state = SFSNEXTROW_BRINRECORD;
966✔
3429
  }
3430

3431
  if (SFSNEXTROW_BRINRECORD == state->state) {
966✔
3432
    SBrinRecord *pRecord = &state->brinRecord;
966✔
3433

3434
    if (!state->pBlockData) {
966✔
3435
      state->pBlockData = &state->blockData;
966✔
3436

3437
      TAOS_CHECK_GOTO(tBlockDataCreate(&state->blockData), &lino, _err);
966✔
3438
    } else {
UNCOV
3439
      tBlockDataReset(state->pBlockData);
×
3440
    }
3441

3442
    if (aCols[0] == PRIMARYKEY_TIMESTAMP_COL_ID) {
966✔
3443
      --nCols;
966✔
3444
      ++aCols;
966✔
3445
    }
3446

3447
    TAOS_CHECK_GOTO(tsdbDataFileReadBlockDataByColumn(state->pr->pFileReader, pRecord, state->pBlockData,
966✔
3448
                                                      state->pTSchema, aCols, nCols),
3449
                    &lino, _err);
3450

3451
    state->nRow = state->blockData.nRow;
966✔
3452
    state->iRow = state->nRow - 1;
966✔
3453

3454
    state->state = SFSNEXTROW_BLOCKROW;
966✔
3455
  }
3456

3457
  if (SFSNEXTROW_BLOCKROW == state->state) {
966✔
3458
    if (state->iRow < 0) {
966✔
UNCOV
3459
      --state->iBrinRecord;
×
UNCOV
3460
      goto _next_brinrecord;
×
3461
    }
3462

3463
    state->row = tsdbRowFromBlockData(state->pBlockData, state->iRow);
966✔
3464
    if (!state->pLastIter) {
966✔
UNCOV
3465
      *ppRow = &state->row;
×
UNCOV
3466
      --state->iRow;
×
UNCOV
3467
      return code;
×
3468
    }
3469

3470
    if (!state->pLastRow) {
966✔
3471
      // get next row from fslast and process with fs row, --state->Row if select fs row
3472
      TAOS_CHECK_GOTO(lastIterNext(&state->lastIter, &state->pLastRow), &lino, _err);
966✔
3473
    }
3474

3475
    if (!state->pLastRow) {
966✔
3476
      if (state->pLastIter) {
966✔
3477
        code = lastIterClose(&state->pLastIter);
966✔
3478
        if (code != TSDB_CODE_SUCCESS) {
966✔
UNCOV
3479
          tsdbError("vgId:%d, %s close last iter failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
3480
                    tstrerror(code));
UNCOV
3481
          TAOS_RETURN(code);
×
3482
        }
3483
      }
3484

3485
      *ppRow = &state->row;
966✔
3486
      --state->iRow;
966✔
3487
      return code;
966✔
3488
    }
3489

3490
    // process state->pLastRow & state->row
3491
    TSKEY rowTs = TSDBROW_TS(&state->row);
×
UNCOV
3492
    TSKEY lastRowTs = TSDBROW_TS(state->pLastRow);
×
UNCOV
3493
    if (lastRowTs > rowTs) {
×
UNCOV
3494
      *ppRow = state->pLastRow;
×
UNCOV
3495
      state->pLastRow = NULL;
×
3496

3497
      TAOS_RETURN(code);
×
3498
    } else if (lastRowTs < rowTs) {
×
UNCOV
3499
      *ppRow = &state->row;
×
UNCOV
3500
      --state->iRow;
×
3501

UNCOV
3502
      TAOS_RETURN(code);
×
3503
    } else {
3504
      // TODO: merge rows and *ppRow = mergedRow
UNCOV
3505
      SRowMerger *pMerger = &state->rowMerger;
×
UNCOV
3506
      code = tsdbRowMergerInit(pMerger, state->pTSchema);
×
UNCOV
3507
      if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
3508
        tsdbError("vgId:%d, %s init row merger failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
3509
                  tstrerror(code));
3510
        TAOS_RETURN(code);
×
3511
      }
3512

UNCOV
3513
      TAOS_CHECK_GOTO(tsdbRowMergerAdd(pMerger, &state->row, state->pTSchema), &lino, _err);
×
UNCOV
3514
      TAOS_CHECK_GOTO(tsdbRowMergerAdd(pMerger, state->pLastRow, state->pTSchema), &lino, _err);
×
3515

UNCOV
3516
      if (state->pTSRow) {
×
UNCOV
3517
        taosMemoryFree(state->pTSRow);
×
UNCOV
3518
        state->pTSRow = NULL;
×
3519
      }
3520

UNCOV
3521
      TAOS_CHECK_GOTO(tsdbRowMergerGetRow(pMerger, &state->pTSRow), &lino, _err);
×
3522

3523
      state->row = tsdbRowFromTSRow(TSDBROW_VERSION(&state->row), state->pTSRow);
×
3524
      *ppRow = &state->row;
×
3525
      --state->iRow;
×
3526

UNCOV
3527
      tsdbRowMergerClear(pMerger);
×
3528

3529
      TAOS_RETURN(code);
×
3530
    }
3531
  }
3532

3533
_err:
×
UNCOV
3534
  clearLastFileSet(state);
×
3535

3536
  *ppRow = NULL;
×
3537

3538
  if (code) {
×
3539
    tsdbError("tsdb/cache: vgId:%d, %s failed at line %d since %s.", TD_VID(pTsdb->pVnode), __func__, lino,
×
3540
              tstrerror(code));
3541
  }
3542

UNCOV
3543
  TAOS_RETURN(code);
×
3544
}
3545

3546
typedef struct CacheNextRowIter {
3547
  SArray           *pMemDelData;
3548
  SArray           *pSkyline;
3549
  int64_t           iSkyline;
3550
  SBlockIdx         idx;
3551
  SMemNextRowIter   memState;
3552
  SMemNextRowIter   imemState;
3553
  SFSNextRowIter    fsState;
3554
  TSDBROW           memRow, imemRow, fsLastRow, fsRow;
3555
  TsdbNextRowState  input[3];
3556
  SCacheRowsReader *pr;
3557
  STsdb            *pTsdb;
3558
} CacheNextRowIter;
3559

3560
int32_t clearNextRowFromFS(void *iter) {
17,929✔
3561
  int32_t code = 0;
17,929✔
3562

3563
  SFSNextRowIter *state = (SFSNextRowIter *)iter;
17,929✔
3564
  if (!state) {
17,929✔
3565
    TAOS_RETURN(code);
×
3566
  }
3567

3568
  if (state->pLastIter) {
17,929✔
3569
    code = lastIterClose(&state->pLastIter);
×
3570
    if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
3571
      tsdbError("%s close last iter failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
UNCOV
3572
      TAOS_RETURN(code);
×
3573
    }
3574
  }
3575

3576
  if (state->pBlockData) {
17,929✔
3577
    tBlockDataDestroy(state->pBlockData);
966✔
3578
    state->pBlockData = NULL;
966✔
3579
  }
3580

3581
  if (state->pBrinBlock) {
17,929✔
3582
    tBrinBlockDestroy(state->pBrinBlock);
966✔
3583
    state->pBrinBlock = NULL;
966✔
3584
  }
3585

3586
  if (state->pIndexList) {
17,929✔
3587
    taosArrayDestroy(state->pIndexList);
7,728✔
3588
    state->pIndexList = NULL;
7,728✔
3589
  }
3590

3591
  if (state->pTSRow) {
17,929✔
UNCOV
3592
    taosMemoryFree(state->pTSRow);
×
UNCOV
3593
    state->pTSRow = NULL;
×
3594
  }
3595

3596
  if (state->pRowIter->pSkyline) {
17,929✔
3597
    taosArrayDestroy(state->pRowIter->pSkyline);
15,755✔
3598
    state->pRowIter->pSkyline = NULL;
15,755✔
3599
  }
3600

3601
  TAOS_RETURN(code);
17,929✔
3602
}
3603

3604
static void clearLastFileSet(SFSNextRowIter *state) {
32,391✔
3605
  if (state->pLastIter) {
32,391✔
UNCOV
3606
    int code = lastIterClose(&state->pLastIter);
×
UNCOV
3607
    if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
3608
      tsdbError("%s close last iter failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
UNCOV
3609
      return;
×
3610
    }
3611
  }
3612

3613
  if (state->pBlockData) {
32,391✔
UNCOV
3614
    tBlockDataDestroy(state->pBlockData);
×
UNCOV
3615
    state->pBlockData = NULL;
×
3616
  }
3617

3618
  if (state->pr->pFileReader) {
32,391✔
3619
    tsdbDataFileReaderClose(&state->pr->pFileReader);
7,728✔
3620
    state->pr->pFileReader = NULL;
7,728✔
3621

3622
    state->pr->pCurFileSet = NULL;
7,728✔
3623
  }
3624

3625
  if (state->pTSRow) {
32,391✔
UNCOV
3626
    taosMemoryFree(state->pTSRow);
×
UNCOV
3627
    state->pTSRow = NULL;
×
3628
  }
3629

3630
  if (state->pRowIter->pSkyline) {
31,908✔
3631
    taosArrayDestroy(state->pRowIter->pSkyline);
469✔
3632
    state->pRowIter->pSkyline = NULL;
469✔
3633

3634
    void   *pe = NULL;
469✔
3635
    int32_t iter = 0;
469✔
3636
    while ((pe = tSimpleHashIterate(state->pr->pTableMap, pe, &iter)) != NULL) {
938✔
3637
      STableLoadInfo *pInfo = *(STableLoadInfo **)pe;
469✔
3638
      taosArrayDestroy(pInfo->pTombData);
469✔
3639
      pInfo->pTombData = NULL;
469✔
3640
    }
3641
  }
3642
}
3643

3644
static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTsdb, STSchema *pTSchema, tb_uid_t suid,
17,929✔
3645
                               SArray *pLDataIterArray, STsdbReadSnap *pReadSnap, int64_t lastTs,
3646
                               SCacheRowsReader *pr) {
3647
  int32_t code = 0, lino = 0;
17,929✔
3648

3649
  STbData *pMem = NULL;
17,929✔
3650
  if (pReadSnap->pMem) {
17,929✔
3651
    pMem = tsdbGetTbDataFromMemTable(pReadSnap->pMem, suid, uid);
17,929✔
3652
  }
3653

3654
  STbData *pIMem = NULL;
17,929✔
3655
  if (pReadSnap->pIMem) {
17,929✔
UNCOV
3656
    pIMem = tsdbGetTbDataFromMemTable(pReadSnap->pIMem, suid, uid);
×
3657
  }
3658

3659
  pIter->pTsdb = pTsdb;
17,929✔
3660

3661
  pIter->pMemDelData = NULL;
17,929✔
3662

3663
  TAOS_CHECK_GOTO(loadMemTombData(&pIter->pMemDelData, pMem, pIMem, pr->info.verRange.maxVer), &lino, _err);
17,929✔
3664

3665
  pIter->idx = (SBlockIdx){.suid = suid, .uid = uid};
17,929✔
3666

3667
  pIter->fsState.pRowIter = pIter;
17,929✔
3668
  pIter->fsState.state = SFSNEXTROW_FS;
17,929✔
3669
  pIter->fsState.aDFileSet = pReadSnap->pfSetArray;
17,929✔
3670
  pIter->fsState.pBlockIdxExp = &pIter->idx;
17,929✔
3671
  pIter->fsState.pTSchema = pTSchema;
17,929✔
3672
  pIter->fsState.suid = suid;
17,929✔
3673
  pIter->fsState.uid = uid;
17,929✔
3674
  pIter->fsState.lastTs = lastTs;
17,929✔
3675
  pIter->fsState.pr = pr;
17,929✔
3676

3677
  pIter->input[0] = (TsdbNextRowState){&pIter->memRow, true, false, false, &pIter->memState, getNextRowFromMem, NULL};
17,929✔
3678
  pIter->input[1] = (TsdbNextRowState){&pIter->imemRow, true, false, false, &pIter->imemState, getNextRowFromMem, NULL};
17,929✔
3679
  pIter->input[2] =
17,929✔
3680
      (TsdbNextRowState){&pIter->fsRow, false, true, false, &pIter->fsState, getNextRowFromFS, clearNextRowFromFS};
17,929✔
3681

3682
  if (pMem) {
17,446✔
3683
    pIter->memState.pMem = pMem;
13,638✔
3684
    pIter->memState.state = SMEMNEXTROW_ENTER;
14,121✔
3685
    pIter->memState.lastTs = lastTs;
13,638✔
3686
    pIter->input[0].stop = false;
14,121✔
3687
    pIter->input[0].next = true;
14,121✔
3688
  }
3689

3690
  if (pIMem) {
17,446✔
UNCOV
3691
    pIter->imemState.pMem = pIMem;
×
UNCOV
3692
    pIter->imemState.state = SMEMNEXTROW_ENTER;
×
UNCOV
3693
    pIter->imemState.lastTs = lastTs;
×
UNCOV
3694
    pIter->input[1].stop = false;
×
UNCOV
3695
    pIter->input[1].next = true;
×
3696
  }
3697

3698
  pIter->pr = pr;
17,446✔
3699

3700
_err:
17,446✔
3701
  TAOS_RETURN(code);
17,446✔
3702
}
3703

3704
static void nextRowIterClose(CacheNextRowIter *pIter) {
17,929✔
3705
  for (int i = 0; i < 3; ++i) {
71,716✔
3706
    if (pIter->input[i].nextRowClearFn) {
53,787✔
3707
      (void)pIter->input[i].nextRowClearFn(pIter->input[i].iter);
17,929✔
3708
    }
3709
  }
3710

3711
  if (pIter->pSkyline) {
17,929✔
UNCOV
3712
    taosArrayDestroy(pIter->pSkyline);
×
3713
  }
3714

3715
  if (pIter->pMemDelData) {
17,929✔
3716
    taosArrayDestroy(pIter->pMemDelData);
17,929✔
3717
  }
3718
}
17,929✔
3719

3720
// iterate next row non deleted backward ts, version (from high to low)
3721
static int32_t nextRowIterGet(CacheNextRowIter *pIter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast,
24,251✔
3722
                              int16_t *aCols, int nCols) {
3723
  int32_t code = 0, lino = 0;
24,251✔
3724

3725
  for (;;) {
199✔
3726
    for (int i = 0; i < 3; ++i) {
98,766✔
3727
      if (pIter->input[i].next && !pIter->input[i].stop) {
74,316✔
3728
        TAOS_CHECK_GOTO(pIter->input[i].nextRowFn(pIter->input[i].iter, &pIter->input[i].pRow,
39,054✔
3729
                                                  &pIter->input[i].ignoreEarlierTs, isLast, aCols, nCols),
3730
                        &lino, _err);
3731

3732
        if (pIter->input[i].pRow == NULL) {
38,571✔
3733
          pIter->input[i].stop = true;
19,676✔
3734
          pIter->input[i].next = false;
19,676✔
3735
        }
3736
      }
3737
    }
3738

3739
    if (pIter->input[0].stop && pIter->input[1].stop && pIter->input[2].stop) {
24,450✔
3740
      *ppRow = NULL;
5,555✔
3741
      *pIgnoreEarlierTs =
11,110✔
3742
          (pIter->input[0].ignoreEarlierTs || pIter->input[1].ignoreEarlierTs || pIter->input[2].ignoreEarlierTs);
5,555✔
3743

3744
      TAOS_RETURN(code);
5,555✔
3745
    }
3746

3747
    // select maxpoint(s) from mem, imem, fs and last
3748
    TSDBROW *max[4] = {0};
19,378✔
3749
    int      iMax[4] = {-1, -1, -1, -1};
19,378✔
3750
    int      nMax = 0;
19,378✔
3751
    SRowKey  maxKey = {.ts = TSKEY_MIN};
19,378✔
3752

3753
    for (int i = 0; i < 3; ++i) {
77,512✔
3754
      if (!pIter->input[i].stop && pIter->input[i].pRow != NULL) {
58,134✔
3755
        STsdbRowKey tsdbRowKey = {0};
19,847✔
3756
        tsdbRowGetKey(pIter->input[i].pRow, &tsdbRowKey);
19,847✔
3757

3758
        // merging & deduplicating on client side
3759
        int c = tRowKeyCompare(&maxKey, &tsdbRowKey.key);
19,847✔
3760
        if (c <= 0) {
19,847✔
3761
          if (c < 0) {
19,577✔
3762
            nMax = 0;
19,577✔
3763
            maxKey = tsdbRowKey.key;
19,577✔
3764
          }
3765

3766
          iMax[nMax] = i;
19,577✔
3767
          max[nMax++] = pIter->input[i].pRow;
19,577✔
3768
        }
3769
        pIter->input[i].next = false;
19,847✔
3770
      }
3771
    }
3772

3773
    // delete detection
3774
    TSDBROW *merge[4] = {0};
19,378✔
3775
    int      iMerge[4] = {-1, -1, -1, -1};
19,378✔
3776
    int      nMerge = 0;
19,378✔
3777
    for (int i = 0; i < nMax; ++i) {
38,756✔
3778
      TSDBKEY maxKey1 = TSDBROW_KEY(max[i]);
19,378✔
3779

3780
      if (!pIter->pSkyline) {
19,378✔
3781
        pIter->pSkyline = taosArrayInit(32, sizeof(TSDBKEY));
16,224✔
3782
        TSDB_CHECK_NULL(pIter->pSkyline, code, lino, _err, terrno);
16,224✔
3783

3784
        uint64_t        uid = pIter->idx.uid;
16,224✔
3785
        STableLoadInfo *pInfo = getTableLoadInfo(pIter->pr, uid);
16,224✔
3786
        TSDB_CHECK_NULL(pInfo, code, lino, _err, TSDB_CODE_OUT_OF_MEMORY);
16,224✔
3787

3788
        if (pInfo->pTombData == NULL) {
16,224✔
3789
          pInfo->pTombData = taosArrayInit(4, sizeof(SDelData));
14,292✔
3790
          TSDB_CHECK_NULL(pInfo->pTombData, code, lino, _err, terrno);
14,292✔
3791
        }
3792

3793
        if (!taosArrayAddAll(pInfo->pTombData, pIter->pMemDelData)) {
16,224✔
UNCOV
3794
          TAOS_CHECK_GOTO(terrno, &lino, _err);
×
3795
        }
3796

3797
        size_t delSize = TARRAY_SIZE(pInfo->pTombData);
16,224✔
3798
        if (delSize > 0) {
16,224✔
3799
          code = tsdbBuildDeleteSkyline(pInfo->pTombData, 0, (int32_t)(delSize - 1), pIter->pSkyline);
1,364✔
3800
          TAOS_CHECK_GOTO(code, &lino, _err);
1,364✔
3801
        }
3802
        pIter->iSkyline = taosArrayGetSize(pIter->pSkyline) - 1;
16,224✔
3803
      }
3804

3805
      bool deleted = tsdbKeyDeleted(&maxKey1, pIter->pSkyline, &pIter->iSkyline);
19,378✔
3806
      if (!deleted) {
19,378✔
3807
        iMerge[nMerge] = iMax[i];
19,179✔
3808
        merge[nMerge++] = max[i];
19,179✔
3809
      }
3810

3811
      pIter->input[iMax[i]].next = deleted;
19,378✔
3812
    }
3813

3814
    if (nMerge > 0) {
19,378✔
3815
      pIter->input[iMerge[0]].next = true;
19,179✔
3816

3817
      *ppRow = merge[0];
19,179✔
3818

3819
      TAOS_RETURN(code);
19,179✔
3820
    }
3821
  }
3822

UNCOV
3823
_err:
×
UNCOV
3824
  if (code) {
×
3825
    tsdbError("tsdb/cache: %s failed at line %d since %s.", __func__, lino, tstrerror(code));
×
3826
  }
3827

UNCOV
3828
  TAOS_RETURN(code);
×
3829
}
3830

3831
static int32_t initLastColArrayPartial(STSchema *pTSchema, SArray **ppColArray, int16_t *slotIds, int nCols) {
17,446✔
3832
  SArray *pColArray = taosArrayInit(nCols, sizeof(SLastCol));
17,446✔
3833
  if (NULL == pColArray) {
17,929✔
UNCOV
3834
    TAOS_RETURN(terrno);
×
3835
  }
3836

3837
  for (int32_t i = 0; i < nCols; ++i) {
96,093✔
3838
    int16_t  slotId = slotIds[i];
78,647✔
3839
    SLastCol col = {.rowKey.ts = 0,
78,647✔
3840
                    .colVal = COL_VAL_NULL(pTSchema->columns[slotId].colId, pTSchema->columns[slotId].type)};
78,164✔
3841
    if (!taosArrayPush(pColArray, &col)) {
77,681✔
UNCOV
3842
      TAOS_RETURN(terrno);
×
3843
    }
3844
  }
3845
  *ppColArray = pColArray;
17,446✔
3846

3847
  TAOS_RETURN(TSDB_CODE_SUCCESS);
17,446✔
3848
}
3849

3850
static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols,
10,030✔
3851
                            int nCols, int16_t *slotIds) {
3852
  int32_t   code = 0, lino = 0;
10,030✔
3853
  STSchema *pTSchema = pr->pSchema;  // metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1, 1);
10,030✔
3854
  int16_t   nLastCol = nCols;
10,030✔
3855
  int16_t   noneCol = 0;
10,030✔
3856
  bool      setNoneCol = false;
10,030✔
3857
  bool      hasRow = false;
10,030✔
3858
  bool      ignoreEarlierTs = false;
10,030✔
3859
  SArray   *pColArray = NULL;
10,030✔
3860
  SColVal  *pColVal = &(SColVal){0};
10,030✔
3861

3862
  TAOS_CHECK_RETURN(initLastColArrayPartial(pTSchema, &pColArray, slotIds, nCols));
10,030✔
3863

3864
  SArray *aColArray = taosArrayInit(nCols, sizeof(int16_t));
10,030✔
3865
  if (NULL == aColArray) {
10,030✔
UNCOV
3866
    taosArrayDestroy(pColArray);
×
3867

UNCOV
3868
    TAOS_RETURN(terrno);
×
3869
  }
3870

3871
  for (int i = 0; i < nCols; ++i) {
51,854✔
3872
    if (!taosArrayPush(aColArray, &aCols[i])) {
83,648✔
3873
      taosArrayDestroy(pColArray);
×
3874

UNCOV
3875
      TAOS_RETURN(terrno);
×
3876
    }
3877
  }
3878

3879
  STsdbRowKey lastRowKey = {.key.ts = TSKEY_MAX};
10,030✔
3880

3881
  // inverse iterator
3882
  CacheNextRowIter iter = {0};
10,030✔
3883
  code =
3884
      nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->info.suid, pr->pLDataIterArray, pr->pReadSnap, pr->lastTs, pr);
10,030✔
3885
  TAOS_CHECK_GOTO(code, &lino, _err);
10,030✔
3886

3887
  do {
3888
    TSDBROW *pRow = NULL;
16,835✔
3889
    code = nextRowIterGet(&iter, &pRow, &ignoreEarlierTs, true, TARRAY_DATA(aColArray), TARRAY_SIZE(aColArray));
16,835✔
3890

3891
    if (!pRow) {
16,835✔
3892
      break;
4,603✔
3893
    }
3894

3895
    hasRow = true;
12,232✔
3896

3897
    int32_t sversion = TSDBROW_SVERSION(pRow);
12,232✔
3898
    if (sversion != -1) {
12,232✔
3899
      TAOS_CHECK_GOTO(updateTSchema(sversion, pr, uid), &lino, _err);
10,541✔
3900

3901
      pTSchema = pr->pCurrSchema;
10,541✔
3902
    }
3903
    // int16_t nCol = pTSchema->numOfCols;
3904

3905
    STsdbRowKey rowKey = {0};
12,232✔
3906
    tsdbRowGetKey(pRow, &rowKey);
12,232✔
3907

3908
    if (lastRowKey.key.ts == TSKEY_MAX) {  // first time
12,232✔
3909
      lastRowKey = rowKey;
9,078✔
3910

3911
      for (int16_t iCol = noneCol; iCol < nCols; ++iCol) {
47,080✔
3912
        if (iCol >= nLastCol) {
38,002✔
UNCOV
3913
          break;
×
3914
        }
3915
        SLastCol *pCol = taosArrayGet(pColArray, iCol);
38,002✔
3916
        if (slotIds[iCol] > pTSchema->numOfCols - 1) {
38,002✔
3917
          if (!setNoneCol) {
270✔
UNCOV
3918
            noneCol = iCol;
×
UNCOV
3919
            setNoneCol = true;
×
3920
          }
3921
          continue;
270✔
3922
        }
3923
        if (pCol->colVal.cid != pTSchema->columns[slotIds[iCol]].colId) {
37,732✔
3924
          continue;
2,060✔
3925
        }
3926
        if (slotIds[iCol] == 0) {
35,672✔
3927
          STColumn *pTColumn = &pTSchema->columns[0];
9,078✔
3928
          SValue    val = {.type = pTColumn->type};
9,078✔
3929
          VALUE_SET_TRIVIAL_DATUM(&val, rowKey.key.ts);
9,078✔
3930
          *pColVal = COL_VAL_VALUE(pTColumn->colId, val);
9,078✔
3931

3932
          SLastCol colTmp = {.rowKey = rowKey.key, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID};
9,078✔
3933
          TAOS_CHECK_GOTO(tsdbCacheReallocSLastCol(&colTmp, NULL), &lino, _err);
9,078✔
3934

3935
          taosArraySet(pColArray, 0, &colTmp);
9,078✔
3936
          continue;
9,078✔
3937
        }
3938
        tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal);
26,594✔
3939

3940
        *pCol = (SLastCol){.rowKey = rowKey.key, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID};
26,594✔
3941
        TAOS_CHECK_GOTO(tsdbCacheReallocSLastCol(pCol, NULL), &lino, _err);
26,594✔
3942

3943
        if (!COL_VAL_IS_VALUE(pColVal)) {
26,594✔
3944
          if (!setNoneCol) {
8,936✔
3945
            noneCol = iCol;
5,569✔
3946
            setNoneCol = true;
5,569✔
3947
          }
3948
        } else {
3949
          int32_t aColIndex = taosArraySearchIdx(aColArray, &pColVal->cid, compareInt16Val, TD_EQ);
17,658✔
3950
          if (aColIndex >= 0) {
17,658✔
3951
            taosArrayRemove(aColArray, aColIndex);
17,658✔
3952
          }
3953
        }
3954
      }
3955
      if (!setNoneCol) {
9,078✔
3956
        // done, goto return pColArray
3957
        break;
3,509✔
3958
      } else {
3959
        continue;
5,569✔
3960
      }
3961
    }
3962

3963
    // merge into pColArray
3964
    setNoneCol = false;
3,154✔
3965
    for (int16_t iCol = noneCol; iCol < nCols; ++iCol) {
10,911✔
3966
      if (iCol >= nLastCol) {
7,757✔
UNCOV
3967
        break;
×
3968
      }
3969
      // high version's column value
3970
      if (slotIds[iCol] > pTSchema->numOfCols - 1) {
7,757✔
3971
        continue;
270✔
3972
      }
3973

3974
      SLastCol *lastColVal = (SLastCol *)taosArrayGet(pColArray, iCol);
7,487✔
3975
      if (lastColVal->colVal.cid != pTSchema->columns[slotIds[iCol]].colId) {
7,487✔
UNCOV
3976
        continue;
×
3977
      }
3978
      SColVal *tColVal = &lastColVal->colVal;
7,487✔
3979
      if (COL_VAL_IS_VALUE(tColVal)) continue;
7,487✔
3980

3981
      tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal);
5,072✔
3982
      if (COL_VAL_IS_VALUE(pColVal)) {
5,072✔
3983
        SLastCol lastCol = {.rowKey = rowKey.key, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID};
3,836✔
3984
        TAOS_CHECK_GOTO(tsdbCacheReallocSLastCol(&lastCol, NULL), &lino, _err);
3,836✔
3985

3986
        tsdbCacheFreeSLastColItem(lastColVal);
3,836✔
3987
        taosArraySet(pColArray, iCol, &lastCol);
3,836✔
3988
        int32_t aColIndex = taosArraySearchIdx(aColArray, &lastCol.colVal.cid, compareInt16Val, TD_EQ);
3,836✔
3989
        if (aColIndex >= 0) {
3,836✔
3990
          taosArrayRemove(aColArray, aColIndex);
3,836✔
3991
        }
3992
      } else if (!COL_VAL_IS_VALUE(pColVal) && !setNoneCol) {
1,236✔
3993
        noneCol = iCol;
1,236✔
3994
        setNoneCol = true;
1,236✔
3995
      }
3996
    }
3997
  } while (setNoneCol);
8,723✔
3998

3999
  if (!hasRow) {
10,030✔
4000
    if (ignoreEarlierTs) {
952✔
UNCOV
4001
      taosArrayDestroy(pColArray);
×
4002
      pColArray = NULL;
×
4003
    } else {
4004
      taosArrayClear(pColArray);
952✔
4005
    }
4006
  }
4007
  *ppLastArray = pColArray;
10,030✔
4008

4009
  nextRowIterClose(&iter);
10,030✔
4010
  taosArrayDestroy(aColArray);
10,030✔
4011

4012
  TAOS_RETURN(code);
10,030✔
4013

UNCOV
4014
_err:
×
UNCOV
4015
  nextRowIterClose(&iter);
×
4016
  // taosMemoryFreeClear(pTSchema);
UNCOV
4017
  *ppLastArray = NULL;
×
UNCOV
4018
  taosArrayDestroyEx(pColArray, tsdbCacheFreeSLastColItem);
×
UNCOV
4019
  taosArrayDestroy(aColArray);
×
4020

UNCOV
4021
  if (code) {
×
UNCOV
4022
    tsdbError("tsdb/cache: vgId:%d, %s failed at line %d since %s.", TD_VID(pTsdb->pVnode), __func__, lino,
×
4023
              tstrerror(code));
4024
  }
4025

UNCOV
4026
  TAOS_RETURN(code);
×
4027
}
4028

4029
static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols,
7,899✔
4030
                               int nCols, int16_t *slotIds) {
4031
  int32_t   code = 0, lino = 0;
7,899✔
4032
  STSchema *pTSchema = pr->pSchema;  // metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1, 1);
7,899✔
4033
  int16_t   nLastCol = nCols;
7,899✔
4034
  int16_t   noneCol = 0;
7,899✔
4035
  bool      setNoneCol = false;
7,899✔
4036
  bool      hasRow = false;
7,899✔
4037
  bool      ignoreEarlierTs = false;
7,899✔
4038
  SArray   *pColArray = NULL;
7,416✔
4039
  SColVal  *pColVal = &(SColVal){0};
7,416✔
4040

4041
  TAOS_CHECK_RETURN(initLastColArrayPartial(pTSchema, &pColArray, slotIds, nCols));
7,899✔
4042

4043
  SArray *aColArray = taosArrayInit(nCols, sizeof(int16_t));
7,899✔
4044
  if (NULL == aColArray) {
7,899✔
4045
    taosArrayDestroy(pColArray);
×
4046

UNCOV
4047
    TAOS_RETURN(terrno);
×
4048
  }
4049

4050
  for (int i = 0; i < nCols; ++i) {
44,722✔
4051
    if (!taosArrayPush(aColArray, &aCols[i])) {
73,646✔
4052
      taosArrayDestroy(pColArray);
×
4053

UNCOV
4054
      TAOS_RETURN(terrno);
×
4055
    }
4056
  }
4057

4058
  // inverse iterator
4059
  CacheNextRowIter iter = {0};
7,899✔
4060
  code =
4061
      nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->info.suid, pr->pLDataIterArray, pr->pReadSnap, pr->lastTs, pr);
7,899✔
4062
  TAOS_CHECK_GOTO(code, &lino, _err);
7,899✔
4063

4064
  do {
4065
    TSDBROW *pRow = NULL;
7,899✔
4066
    code = nextRowIterGet(&iter, &pRow, &ignoreEarlierTs, false, TARRAY_DATA(aColArray), TARRAY_SIZE(aColArray));
7,899✔
4067

4068
    if (!pRow) {
7,899✔
4069
      break;
952✔
4070
    }
4071

4072
    hasRow = true;
6,947✔
4073

4074
    int32_t sversion = TSDBROW_SVERSION(pRow);
6,947✔
4075
    if (sversion != -1) {
6,947✔
4076
      TAOS_CHECK_GOTO(updateTSchema(sversion, pr, uid), &lino, _err);
5,995✔
4077

4078
      pTSchema = pr->pCurrSchema;
5,995✔
4079
    }
4080
    // int16_t nCol = pTSchema->numOfCols;
4081

4082
    STsdbRowKey rowKey = {0};
6,947✔
4083
    tsdbRowGetKey(pRow, &rowKey);
6,947✔
4084

4085
    for (int16_t iCol = noneCol; iCol < nCols; ++iCol) {
39,948✔
4086
      if (iCol >= nLastCol) {
33,001✔
UNCOV
4087
        break;
×
4088
      }
4089
      SLastCol *pCol = taosArrayGet(pColArray, iCol);
33,001✔
4090
      if (slotIds[iCol] > pTSchema->numOfCols - 1) {
33,001✔
UNCOV
4091
        continue;
×
4092
      }
4093
      if (pCol->colVal.cid != pTSchema->columns[slotIds[iCol]].colId) {
33,001✔
UNCOV
4094
        continue;
×
4095
      }
4096
      if (slotIds[iCol] == 0) {
33,001✔
4097
        STColumn *pTColumn = &pTSchema->columns[0];
6,947✔
4098
        SValue    val = {.type = pTColumn->type};
6,947✔
4099
        VALUE_SET_TRIVIAL_DATUM(&val, rowKey.key.ts);
6,947✔
4100
        *pColVal = COL_VAL_VALUE(pTColumn->colId, val);
6,947✔
4101

4102
        SLastCol colTmp = {.rowKey = rowKey.key, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID};
6,947✔
4103
        TAOS_CHECK_GOTO(tsdbCacheReallocSLastCol(&colTmp, NULL), &lino, _err);
6,947✔
4104

4105
        taosArraySet(pColArray, 0, &colTmp);
6,947✔
4106
        continue;
6,947✔
4107
      }
4108
      tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal);
26,054✔
4109

4110
      *pCol = (SLastCol){.rowKey = rowKey.key, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID};
26,054✔
4111
      TAOS_CHECK_GOTO(tsdbCacheReallocSLastCol(pCol, NULL), &lino, _err);
26,054✔
4112

4113
      int32_t aColIndex = taosArraySearchIdx(aColArray, &pColVal->cid, compareInt16Val, TD_EQ);
26,054✔
4114
      if (aColIndex >= 0) {
26,054✔
4115
        taosArrayRemove(aColArray, aColIndex);
26,054✔
4116
      }
4117
    }
4118

4119
    break;
6,947✔
4120
  } while (1);
4121

4122
  if (!hasRow) {
7,899✔
4123
    if (ignoreEarlierTs) {
952✔
UNCOV
4124
      taosArrayDestroy(pColArray);
×
4125
      pColArray = NULL;
×
4126
    } else {
4127
      taosArrayClear(pColArray);
952✔
4128
    }
4129
  }
4130
  *ppLastArray = pColArray;
7,899✔
4131

4132
  nextRowIterClose(&iter);
7,899✔
4133
  taosArrayDestroy(aColArray);
7,899✔
4134

4135
  TAOS_RETURN(code);
7,899✔
4136

UNCOV
4137
_err:
×
UNCOV
4138
  nextRowIterClose(&iter);
×
4139

UNCOV
4140
  *ppLastArray = NULL;
×
UNCOV
4141
  taosArrayDestroyEx(pColArray, tsdbCacheFreeSLastColItem);
×
UNCOV
4142
  taosArrayDestroy(aColArray);
×
4143

UNCOV
4144
  if (code) {
×
UNCOV
4145
    tsdbError("tsdb/cache: vgId:%d, %s failed at line %d since %s.", TD_VID(pTsdb->pVnode), __func__, lino,
×
4146
              tstrerror(code));
4147
  }
4148

UNCOV
4149
  TAOS_RETURN(code);
×
4150
}
4151

UNCOV
4152
void tsdbCacheRelease(SLRUCache *pCache, LRUHandle *h) { tsdbLRUCacheRelease(pCache, h, false); }
×
4153

4154
void tsdbCacheSetCapacity(SVnode *pVnode, size_t capacity) {
4,961✔
4155
  taosLRUCacheSetCapacity(pVnode->pTsdb->lruCache, capacity);
4,961✔
4156
}
4,961✔
4157

4158
#ifdef BUILD_NO_CALL
4159
size_t tsdbCacheGetCapacity(SVnode *pVnode) { return taosLRUCacheGetCapacity(pVnode->pTsdb->lruCache); }
4160
#endif
4161

4162
size_t tsdbCacheGetUsage(SVnode *pVnode) {
113,909,232✔
4163
  size_t usage = 0;
113,909,232✔
4164
  if (pVnode->pTsdb != NULL) {
113,909,232✔
4165
    usage = taosLRUCacheGetUsage(pVnode->pTsdb->lruCache);
113,909,232✔
4166
  }
4167

4168
  return usage;
113,909,232✔
4169
}
4170

4171
int32_t tsdbCacheGetElems(SVnode *pVnode) {
113,909,232✔
4172
  int32_t elems = 0;
113,909,232✔
4173
  if (pVnode->pTsdb != NULL) {
113,909,232✔
4174
    elems = taosLRUCacheGetElems(pVnode->pTsdb->lruCache);
113,909,232✔
4175
  }
4176

4177
  return elems;
113,909,232✔
4178
}
4179

4180
#ifdef USE_SHARED_STORAGE
4181
// block cache
UNCOV
4182
static void getBCacheKey(int32_t fid, int64_t commitID, int64_t blkno, char *key, int *len) {
×
4183
  struct {
4184
    int32_t fid;
4185
    int64_t commitID;
4186
    int64_t blkno;
UNCOV
4187
  } bKey = {0};
×
4188

UNCOV
4189
  bKey.fid = fid;
×
UNCOV
4190
  bKey.commitID = commitID;
×
UNCOV
4191
  bKey.blkno = blkno;
×
4192

UNCOV
4193
  *len = sizeof(bKey);
×
UNCOV
4194
  memcpy(key, &bKey, *len);
×
UNCOV
4195
}
×
4196

UNCOV
4197
static int32_t tsdbCacheLoadBlockSs(STsdbFD *pFD, uint8_t **ppBlock) {
×
UNCOV
4198
  int32_t code = 0;
×
4199

UNCOV
4200
  int64_t block_size = tsSsBlockSize * pFD->szPage;
×
UNCOV
4201
  int64_t block_offset = (pFD->blkno - 1) * block_size;
×
4202

UNCOV
4203
  char *buf = taosMemoryMalloc(block_size);
×
UNCOV
4204
  if (buf == NULL) {
×
UNCOV
4205
    code = TSDB_CODE_OUT_OF_MEMORY;
×
UNCOV
4206
    goto _exit;
×
4207
  }
4208

4209
  // TODO: pFD->objName is not initialized, but this function is never called.
UNCOV
4210
  code = tssReadFileFromDefault(pFD->objName, block_offset, buf, &block_size);
×
UNCOV
4211
  if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
4212
    taosMemoryFree(buf);
×
4213
    goto _exit;
×
4214
  }
UNCOV
4215
  *ppBlock = buf;
×
4216

UNCOV
4217
_exit:
×
4218
  return code;
×
4219
}
4220

4221
static void deleteBCache(const void *key, size_t keyLen, void *value, void *ud) {
×
4222
  (void)ud;
UNCOV
4223
  uint8_t *pBlock = (uint8_t *)value;
×
4224

4225
  taosMemoryFree(pBlock);
×
4226
}
×
4227

4228
int32_t tsdbCacheGetBlockSs(SLRUCache *pCache, STsdbFD *pFD, LRUHandle **handle) {
×
4229
  int32_t code = 0;
×
UNCOV
4230
  char    key[128] = {0};
×
4231
  int     keyLen = 0;
×
4232

UNCOV
4233
  getBCacheKey(pFD->fid, pFD->cid, pFD->blkno, key, &keyLen);
×
4234
  LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen);
×
4235
  if (!h) {
×
4236
    STsdb *pTsdb = pFD->pTsdb;
×
4237
    (void)taosThreadMutexLock(&pTsdb->bMutex);
×
4238

UNCOV
4239
    h = taosLRUCacheLookup(pCache, key, keyLen);
×
UNCOV
4240
    if (!h) {
×
4241
      uint8_t *pBlock = NULL;
×
4242
      code = tsdbCacheLoadBlockSs(pFD, &pBlock);
×
4243
      //  if table's empty or error, return code of -1
4244
      if (code != TSDB_CODE_SUCCESS || pBlock == NULL) {
×
UNCOV
4245
        (void)taosThreadMutexUnlock(&pTsdb->bMutex);
×
4246

UNCOV
4247
        *handle = NULL;
×
4248
        if (code == TSDB_CODE_SUCCESS && !pBlock) {
×
4249
          code = TSDB_CODE_OUT_OF_MEMORY;
×
4250
        }
4251

4252
        TAOS_RETURN(code);
×
4253
      }
4254

UNCOV
4255
      size_t              charge = tsSsBlockSize * pFD->szPage;
×
4256
      _taos_lru_deleter_t deleter = deleteBCache;
×
4257
      LRUStatus           status =
UNCOV
4258
          taosLRUCacheInsert(pCache, key, keyLen, pBlock, charge, deleter, NULL, &h, TAOS_LRU_PRIORITY_LOW, NULL);
×
4259
      if (status != TAOS_LRU_STATUS_OK) {
4260
        // code = -1;
4261
      }
4262
    }
4263

4264
    (void)taosThreadMutexUnlock(&pTsdb->bMutex);
×
4265
  }
4266

4267
  *handle = h;
×
4268

UNCOV
4269
  TAOS_RETURN(code);
×
4270
}
4271

4272
int32_t tsdbCacheGetPageSs(SLRUCache *pCache, STsdbFD *pFD, int64_t pgno, LRUHandle **handle) {
×
4273
  if (!tsSsEnabled) {
×
UNCOV
4274
    return TSDB_CODE_OPS_NOT_SUPPORT;
×
4275
  }
4276

UNCOV
4277
  int32_t code = 0;
×
4278
  char    key[128] = {0};
×
4279
  int     keyLen = 0;
×
4280

UNCOV
4281
  getBCacheKey(pFD->fid, pFD->cid, pgno, key, &keyLen);
×
UNCOV
4282
  *handle = taosLRUCacheLookup(pCache, key, keyLen);
×
4283

UNCOV
4284
  return code;
×
4285
}
4286

4287
void tsdbCacheSetPageSs(SLRUCache *pCache, STsdbFD *pFD, int64_t pgno, uint8_t *pPage) {
×
UNCOV
4288
  if (!tsSsEnabled) {
×
4289
    return;
×
4290
  }
4291

UNCOV
4292
  char       key[128] = {0};
×
UNCOV
4293
  int        keyLen = 0;
×
UNCOV
4294
  LRUHandle *handle = NULL;
×
4295

UNCOV
4296
  getBCacheKey(pFD->fid, pFD->cid, pgno, key, &keyLen);
×
UNCOV
4297
  (void)taosThreadMutexLock(&pFD->pTsdb->pgMutex);
×
4298
  handle = taosLRUCacheLookup(pFD->pTsdb->pgCache, key, keyLen);
×
UNCOV
4299
  if (!handle) {
×
4300
    size_t              charge = pFD->szPage;
×
UNCOV
4301
    _taos_lru_deleter_t deleter = deleteBCache;
×
UNCOV
4302
    uint8_t            *pPg = taosMemoryMalloc(charge);
×
4303
    if (!pPg) {
×
4304
      (void)taosThreadMutexUnlock(&pFD->pTsdb->pgMutex);
×
4305

UNCOV
4306
      return;  // ignore error with ss cache and leave error untouched
×
4307
    }
4308
    memcpy(pPg, pPage, charge);
×
4309

4310
    LRUStatus status =
UNCOV
4311
        taosLRUCacheInsert(pCache, key, keyLen, pPg, charge, deleter, NULL, &handle, TAOS_LRU_PRIORITY_LOW, NULL);
×
4312
    if (status != TAOS_LRU_STATUS_OK) {
4313
      // ignore cache updating if not ok
4314
      // code = TSDB_CODE_OUT_OF_MEMORY;
4315
    }
4316
  }
UNCOV
4317
  (void)taosThreadMutexUnlock(&pFD->pTsdb->pgMutex);
×
4318

4319
  tsdbCacheRelease(pFD->pTsdb->pgCache, handle);
×
4320
}
4321
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc