• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5051

13 May 2026 12:00PM UTC coverage: 73.358% (-0.04%) from 73.398%
#5051

push

travis-ci

web-flow
feat: taosdump support stream backup/restore (#35326)

139 of 170 new or added lines in 3 files covered. (81.76%)

714 existing lines in 146 files now uncovered.

281543 of 383795 relevant lines covered (73.36%)

135448694.71 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.05
/source/dnode/vnode/src/tsdb/tsdbCache.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#include "functionMgt.h"
16
#include "tsdb.h"
17
#include "tsdbDataFileRW.h"
18
#include "tsdbIter.h"
19
#include "tsdbReadUtil.h"
20
#include "tss.h"
21
#include "vnd.h"
22

23
#define ROCKS_BATCH_SIZE (4096)
24

25
void tsdbLRUCacheRelease(SLRUCache *cache, LRUHandle *handle, bool eraseIfLastRef) {
70,796,629✔
26
  if (!taosLRUCacheRelease(cache, handle, eraseIfLastRef)) {
70,796,629✔
27
    tsdbTrace(" release lru cache failed");
17,164,769✔
28
  }
29
}
70,831,685✔
30

31
#ifdef USE_SHARED_STORAGE
32

33
static int32_t tsdbOpenBCache(STsdb *pTsdb) {
24,780✔
34
  int32_t code = 0, lino = 0;
24,780✔
35
  int32_t szPage = pTsdb->pVnode->config.tsdbPageSize;
24,780✔
36
  int64_t szBlock = tsSsBlockSize <= 1024 ? 1024 : tsSsBlockSize;
24,780✔
37

38
  SLRUCache *pCache = taosLRUCacheInit((int64_t)tsSsBlockCacheSize * szBlock * szPage, 0, .5);
24,780✔
39
  if (pCache == NULL) {
24,780✔
40
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err);
×
41
  }
42

43
  taosLRUCacheSetStrictCapacity(pCache, false);
24,780✔
44

45
  (void)taosThreadMutexInit(&pTsdb->bMutex, NULL);
24,780✔
46

47
  pTsdb->bCache = pCache;
24,780✔
48

49
_err:
24,780✔
50
  if (code) {
24,780✔
51
    tsdbError("tsdb/bcache: vgId:%d, %s failed at line %d since %s.", TD_VID(pTsdb->pVnode), __func__, lino,
×
52
              tstrerror(code));
53
  }
54

55
  TAOS_RETURN(code);
24,780✔
56
}
57

58
static void tsdbCloseBCache(STsdb *pTsdb) {
24,780✔
59
  SLRUCache *pCache = pTsdb->bCache;
24,780✔
60
  if (pCache) {
24,780✔
61
    int32_t elems = taosLRUCacheGetElems(pCache);
24,780✔
62
    tsdbTrace("vgId:%d, elems: %d", TD_VID(pTsdb->pVnode), elems);
24,780✔
63
    taosLRUCacheEraseUnrefEntries(pCache);
24,780✔
64
    elems = taosLRUCacheGetElems(pCache);
24,780✔
65
    tsdbTrace("vgId:%d, elems: %d", TD_VID(pTsdb->pVnode), elems);
24,780✔
66

67
    taosLRUCacheCleanup(pCache);
24,780✔
68

69
    (void)taosThreadMutexDestroy(&pTsdb->bMutex);
24,780✔
70
  }
71
}
24,780✔
72

73
static int32_t tsdbOpenPgCache(STsdb *pTsdb) {
24,780✔
74
  int32_t code = 0, lino = 0;
24,780✔
75
  int32_t szPage = pTsdb->pVnode->config.tsdbPageSize;
24,780✔
76

77
  SLRUCache *pCache = taosLRUCacheInit((int64_t)tsSsPageCacheSize * szPage, 0, .5);
24,780✔
78
  if (pCache == NULL) {
24,780✔
79
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err);
×
80
  }
81

82
  taosLRUCacheSetStrictCapacity(pCache, false);
24,780✔
83

84
  (void)taosThreadMutexInit(&pTsdb->pgMutex, NULL);
24,780✔
85

86
  pTsdb->pgCache = pCache;
24,780✔
87

88
_err:
24,780✔
89
  if (code) {
24,780✔
90
    tsdbError("tsdb/pgcache: vgId:%d, open failed at line %d since %s.", TD_VID(pTsdb->pVnode), lino, tstrerror(code));
×
91
  }
92

93
  TAOS_RETURN(code);
24,780✔
94
}
95

96
static void tsdbClosePgCache(STsdb *pTsdb) {
24,780✔
97
  SLRUCache *pCache = pTsdb->pgCache;
24,780✔
98
  if (pCache) {
24,780✔
99
    int32_t elems = taosLRUCacheGetElems(pCache);
24,780✔
100
    tsdbTrace("vgId:%d, elems: %d", TD_VID(pTsdb->pVnode), elems);
24,780✔
101
    taosLRUCacheEraseUnrefEntries(pCache);
24,780✔
102
    elems = taosLRUCacheGetElems(pCache);
24,780✔
103
    tsdbTrace("vgId:%d, elems: %d", TD_VID(pTsdb->pVnode), elems);
24,780✔
104

105
    taosLRUCacheCleanup(pCache);
24,780✔
106

107
    (void)taosThreadMutexDestroy(&pTsdb->bMutex);
24,780✔
108
  }
109
}
24,780✔
110

111
#endif  // USE_SHARED_STORAGE
112

113
#define ROCKS_KEY_LEN (sizeof(tb_uid_t) + sizeof(int16_t) + sizeof(int8_t))
114

115
enum {
116
  LFLAG_LAST_ROW = 0,
117
  LFLAG_LAST = 1,
118
};
119

120
typedef struct {
121
  tb_uid_t uid;
122
  int16_t  cid;
123
  int8_t   lflag;
124
} SLastKey;
125

126
#define IS_LAST_ROW_KEY(k) (((k).lflag & LFLAG_LAST) == LFLAG_LAST_ROW)
127
#define IS_LAST_KEY(k)     (((k).lflag & LFLAG_LAST) == LFLAG_LAST)
128

129
static void tsdbGetRocksPath(STsdb *pTsdb, char *path) {
5,012,869✔
130
  SVnode *pVnode = pTsdb->pVnode;
5,012,869✔
131
  vnodeGetPrimaryPath(pVnode, false, path, TSDB_FILENAME_LEN);
5,012,377✔
132

133
  int32_t offset = strlen(path);
5,011,372✔
134
  snprintf(path + offset, TSDB_FILENAME_LEN - offset - 1, "%s%s%scache.rdb", TD_DIRSEP, pTsdb->name, TD_DIRSEP);
5,011,372✔
135
}
5,011,383✔
136

137
static const char *myCmpName(void *state) {
26,466,530✔
138
  (void)state;
139
  return "myCmp";
26,466,530✔
140
}
141

142
static void myCmpDestroy(void *state) { (void)state; }
5,014,031✔
143

144
static int myCmp(void *state, const char *a, size_t alen, const char *b, size_t blen) {
1,311,154,672✔
145
  (void)state;
146
  (void)alen;
147
  (void)blen;
148
  SLastKey *lhs = (SLastKey *)a;
1,311,154,672✔
149
  SLastKey *rhs = (SLastKey *)b;
1,311,154,672✔
150

151
  if (lhs->uid < rhs->uid) {
1,311,154,672✔
152
    return -1;
795,946,378✔
153
  } else if (lhs->uid > rhs->uid) {
534,085,868✔
154
    return 1;
238,788,563✔
155
  }
156

157
  if (lhs->cid < rhs->cid) {
297,529,584✔
158
    return -1;
107,453,145✔
159
  } else if (lhs->cid > rhs->cid) {
190,424,314✔
160
    return 1;
71,769,734✔
161
  }
162

163
  if ((lhs->lflag & LFLAG_LAST) < (rhs->lflag & LFLAG_LAST)) {
118,763,179✔
164
    return -1;
44,056,565✔
165
  } else if ((lhs->lflag & LFLAG_LAST) > (rhs->lflag & LFLAG_LAST)) {
74,753,626✔
166
    return 1;
71,706,180✔
167
  }
168

169
  return 0;
3,049,268✔
170
}
171

172
static int32_t tsdbOpenRocksCache(STsdb *pTsdb) {
5,013,909✔
173
  int32_t code = 0, lino = 0;
5,013,909✔
174
#ifdef USE_ROCKSDB
175
  rocksdb_comparator_t *cmp = rocksdb_comparator_create(NULL, myCmpDestroy, myCmp, myCmpName);
5,014,031✔
176
  if (NULL == cmp) {
5,014,031✔
177
    TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
×
178
  }
179

180
  rocksdb_block_based_table_options_t *tableoptions = rocksdb_block_based_options_create();
5,014,031✔
181
  pTsdb->rCache.tableoptions = tableoptions;
5,013,508✔
182

183
  rocksdb_options_t *options = rocksdb_options_create();
5,013,626✔
184
  if (NULL == options) {
5,013,728✔
185
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err);
×
186
  }
187

188
  rocksdb_options_set_create_if_missing(options, 1);
5,013,728✔
189
  rocksdb_options_set_comparator(options, cmp);
5,012,525✔
190
  rocksdb_options_set_block_based_table_factory(options, tableoptions);
5,012,478✔
191
  rocksdb_options_set_info_log_level(options, 2);  // WARN_LEVEL
5,012,890✔
192
  // rocksdb_options_set_inplace_update_support(options, 1);
193
  // rocksdb_options_set_allow_concurrent_memtable_write(options, 0);
194

195
  rocksdb_writeoptions_t *writeoptions = rocksdb_writeoptions_create();
5,014,010✔
196
  if (NULL == writeoptions) {
5,012,813✔
197
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err2);
×
198
  }
199
  rocksdb_writeoptions_disable_WAL(writeoptions, 1);
5,012,813✔
200

201
  rocksdb_readoptions_t *readoptions = rocksdb_readoptions_create();
5,014,010✔
202
  if (NULL == readoptions) {
5,009,072✔
203
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err3);
×
204
  }
205

206
  char *err = NULL;
5,009,072✔
207
  char  cachePath[TSDB_FILENAME_LEN] = {0};
5,010,928✔
208
  tsdbGetRocksPath(pTsdb, cachePath);
5,010,497✔
209

210
  rocksdb_t *db = rocksdb_open(options, cachePath, &err);
5,014,010✔
211
  if (NULL == db) {
5,013,847✔
212
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err);
×
213
    rocksdb_free(err);
×
214

215
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err4);
×
216
  }
217

218
  rocksdb_flushoptions_t *flushoptions = rocksdb_flushoptions_create();
5,013,847✔
219
  if (NULL == flushoptions) {
5,013,403✔
220
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err5);
×
221
  }
222

223
  rocksdb_writebatch_t *writebatch = rocksdb_writebatch_create();
5,013,403✔
224

225
  TAOS_CHECK_GOTO(taosThreadMutexInit(&pTsdb->rCache.writeBatchMutex, NULL), &lino, _err6);
5,014,031✔
226

227
  pTsdb->rCache.writebatch = writebatch;
5,013,403✔
228
  pTsdb->rCache.my_comparator = cmp;
5,013,403✔
229
  pTsdb->rCache.options = options;
5,013,626✔
230
  pTsdb->rCache.writeoptions = writeoptions;
5,012,998✔
231
  pTsdb->rCache.readoptions = readoptions;
5,013,847✔
232
  pTsdb->rCache.flushoptions = flushoptions;
5,013,626✔
233
  pTsdb->rCache.db = db;
5,013,442✔
234
  pTsdb->rCache.sver = -1;
5,013,442✔
235
  pTsdb->rCache.suid = -1;
5,013,847✔
236
  pTsdb->rCache.uid = -1;
5,013,403✔
237
  pTsdb->rCache.pTSchema = NULL;
5,013,847✔
238
  pTsdb->rCache.ctxArray = taosArrayInit(16, sizeof(SLastUpdateCtx));
5,012,197✔
239
  if (!pTsdb->rCache.ctxArray) {
5,014,031✔
240
    TAOS_CHECK_GOTO(terrno, &lino, _err7);
×
241
  }
242

243
  TAOS_RETURN(code);
5,013,246✔
244

245
_err7:
×
246
  (void)taosThreadMutexDestroy(&pTsdb->rCache.writeBatchMutex);
×
247
_err6:
×
248
  rocksdb_writebatch_destroy(writebatch);
×
249
_err5:
×
250
  rocksdb_close(pTsdb->rCache.db);
×
251
_err4:
×
252
  rocksdb_readoptions_destroy(readoptions);
×
253
_err3:
×
254
  rocksdb_writeoptions_destroy(writeoptions);
×
255
_err2:
×
256
  rocksdb_options_destroy(options);
×
257
  rocksdb_block_based_options_destroy(tableoptions);
×
258
_err:
×
259
  rocksdb_comparator_destroy(cmp);
×
260
#endif
261
  TAOS_RETURN(code);
×
262
}
263

264
static void tsdbCloseRocksCache(STsdb *pTsdb) {
5,012,816✔
265
#ifdef USE_ROCKSDB
266
  rocksdb_close(pTsdb->rCache.db);
5,012,816✔
267
  (void)taosThreadMutexDestroy(&pTsdb->rCache.writeBatchMutex);
5,014,031✔
268
  rocksdb_flushoptions_destroy(pTsdb->rCache.flushoptions);
5,014,031✔
269
  rocksdb_writebatch_destroy(pTsdb->rCache.writebatch);
5,014,031✔
270
  rocksdb_readoptions_destroy(pTsdb->rCache.readoptions);
5,013,129✔
271
  rocksdb_writeoptions_destroy(pTsdb->rCache.writeoptions);
5,014,031✔
272
  rocksdb_options_destroy(pTsdb->rCache.options);
5,013,129✔
273
  rocksdb_block_based_options_destroy(pTsdb->rCache.tableoptions);
5,014,031✔
274
  rocksdb_comparator_destroy(pTsdb->rCache.my_comparator);
5,013,235✔
275
  taosMemoryFree(pTsdb->rCache.pTSchema);
5,014,031✔
276
  taosArrayDestroy(pTsdb->rCache.ctxArray);
5,014,031✔
277
#endif
278
}
5,014,031✔
279

280
static void rocksMayWrite(STsdb *pTsdb, bool force) {
64,740,843✔
281
#ifdef USE_ROCKSDB
282
  rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
64,740,843✔
283

284
  int count = rocksdb_writebatch_count(wb);
64,741,935✔
285
  if ((force && count > 0) || count >= ROCKS_BATCH_SIZE) {
64,753,952✔
286
    char *err = NULL;
223,724✔
287

288
    rocksdb_write(pTsdb->rCache.db, pTsdb->rCache.writeoptions, wb, &err);
223,724✔
289
    if (NULL != err) {
223,724✔
290
      tsdbError("vgId:%d, %s failed at line %d, count: %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, count,
×
291
                err);
292
      rocksdb_free(err);
×
293
    }
294

295
    rocksdb_writebatch_clear(wb);
223,724✔
296
  }
297
#endif
298
}
64,753,952✔
299

300
typedef struct {
301
  TSKEY  ts;
302
  int8_t dirty;
303
  struct {
304
    int16_t cid;
305
    int8_t  type;
306
    int8_t  flag;
307
    union {
308
      int64_t val;
309
      struct {
310
        uint32_t nData;
311
        uint8_t *pData;
312
      };
313
    } value;
314
  } colVal;
315
} SLastColV0;
316

317
static int32_t tsdbCacheDeserializeV0(char const *value, SLastCol *pLastCol) {
216,516✔
318
  SLastColV0 *pLastColV0 = (SLastColV0 *)value;
216,516✔
319

320
  pLastCol->rowKey.ts = pLastColV0->ts;
216,516✔
321
  pLastCol->rowKey.numOfPKs = 0;
216,516✔
322
  pLastCol->dirty = pLastColV0->dirty;
216,516✔
323
  pLastCol->colVal.cid = pLastColV0->colVal.cid;
216,516✔
324
  pLastCol->colVal.flag = pLastColV0->colVal.flag;
216,516✔
325
  pLastCol->colVal.value.type = pLastColV0->colVal.type;
216,516✔
326

327
  pLastCol->cacheStatus = TSDB_LAST_CACHE_VALID;
217,010✔
328

329
  if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) {
217,010✔
330
    pLastCol->colVal.value.nData = pLastColV0->colVal.value.nData;
13,302✔
331
    pLastCol->colVal.value.pData = NULL;
13,302✔
332
    if (pLastCol->colVal.value.nData > 0) {
13,302✔
333
      pLastCol->colVal.value.pData = (uint8_t *)(&pLastColV0[1]);
4,940✔
334
    }
335
    return sizeof(SLastColV0) + pLastColV0->colVal.value.nData;
13,302✔
336
  } else if (pLastCol->colVal.value.type == TSDB_DATA_TYPE_DECIMAL) {
203,708✔
337
    pLastCol->colVal.value.nData = pLastColV0->colVal.value.nData;
50,080✔
338
    pLastCol->colVal.value.pData = (uint8_t *)(&pLastColV0[1]);
50,080✔
339
    return sizeof(SLastColV0) + pLastColV0->colVal.value.nData;
50,080✔
340
  } else {
341
    pLastCol->colVal.value.val = pLastColV0->colVal.value.val;
153,628✔
342
    return sizeof(SLastColV0);
153,628✔
343
  }
344
}
345

346
int32_t tsdbCacheDeserialize(char const *value, size_t size, SLastCol **ppLastCol) {
217,010✔
347
  if (!value) {
217,010✔
348
    return TSDB_CODE_INVALID_PARA;
×
349
  }
350

351
  SLastCol *pLastCol = taosMemoryCalloc(1, sizeof(SLastCol));
217,010✔
352
  if (NULL == pLastCol) {
216,516✔
353
    return terrno;
×
354
  }
355

356
  int32_t offset = tsdbCacheDeserializeV0(value, pLastCol);
216,516✔
357
  if (offset == size) {
217,010✔
358
    // version 0
359
    *ppLastCol = pLastCol;
×
360

361
    TAOS_RETURN(TSDB_CODE_SUCCESS);
×
362
  } else if (offset > size) {
217,010✔
363
    taosMemoryFreeClear(pLastCol);
×
364

365
    TAOS_RETURN(TSDB_CODE_INVALID_DATA_FMT);
×
366
  }
367

368
  // version - validate before read
369
  if (offset + sizeof(int8_t) > size) {
217,010✔
370
    taosMemoryFreeClear(pLastCol);
×
371
    TAOS_RETURN(TSDB_CODE_INVALID_DATA_FMT);
×
372
  }
373
  int8_t version = *(int8_t *)(value + offset);
217,010✔
374
  offset += sizeof(int8_t);
217,010✔
375

376
  // numOfPKs - validate before read
377
  if (offset + sizeof(uint8_t) > size) {
217,010✔
378
    taosMemoryFreeClear(pLastCol);
×
379
    TAOS_RETURN(TSDB_CODE_INVALID_DATA_FMT);
×
380
  }
381
  pLastCol->rowKey.numOfPKs = *(uint8_t *)(value + offset);
217,010✔
382
  offset += sizeof(uint8_t);
217,010✔
383

384
  if (pLastCol->rowKey.numOfPKs > TD_MAX_PK_COLS) {
217,010✔
385
    taosMemoryFreeClear(pLastCol);
×
386
    TAOS_RETURN(TSDB_CODE_INVALID_DATA_FMT);
×
387
  }
388

389
  // pks
390
  for (int32_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) {
217,010✔
391
    // validate before reading SValue
392
    if (offset + sizeof(SValue) > size) {
×
393
      taosMemoryFreeClear(pLastCol);
×
394
      TAOS_RETURN(TSDB_CODE_INVALID_DATA_FMT);
×
395
    }
396
    pLastCol->rowKey.pks[i] = *(SValue *)(value + offset);
×
397
    offset += sizeof(SValue);
×
398

399
    if (IS_VAR_DATA_TYPE(pLastCol->rowKey.pks[i].type)) {
×
400
      pLastCol->rowKey.pks[i].pData = NULL;
×
401
      if (pLastCol->rowKey.pks[i].nData > 0) {
×
402
        // validate before reading variable-length payload
403
        if (offset + pLastCol->rowKey.pks[i].nData > size) {
×
404
          taosMemoryFreeClear(pLastCol);
×
405
          TAOS_RETURN(TSDB_CODE_INVALID_DATA_FMT);
×
406
        }
407
        pLastCol->rowKey.pks[i].pData = (uint8_t *)value + offset;
×
408
        offset += pLastCol->rowKey.pks[i].nData;
×
409
      }
410
    }
411
  }
412

413
  if (version >= LAST_COL_VERSION_2) {
217,010✔
414
    // validate before reading cacheStatus
415
    if (offset + sizeof(uint8_t) > size) {
217,010✔
416
      taosMemoryFreeClear(pLastCol);
×
417
      TAOS_RETURN(TSDB_CODE_INVALID_DATA_FMT);
×
418
    }
419
    pLastCol->cacheStatus = *(uint8_t *)(value + offset);
217,010✔
420
    offset += sizeof(uint8_t);
217,010✔
421
  }
422

423
  // Final validation
424
  if (offset > size) {
217,010✔
425
    taosMemoryFreeClear(pLastCol);
×
426
    TAOS_RETURN(TSDB_CODE_INVALID_DATA_FMT);
×
427
  }
428

429
  *ppLastCol = pLastCol;
217,010✔
430

431
  TAOS_RETURN(TSDB_CODE_SUCCESS);
217,010✔
432
}
433

434
/*
435
typedef struct {
436
  SLastColV0 lastColV0;
437
  char       colData[];
438
  int8_t     version;
439
  uint8_t    numOfPKs;
440
  SValue     pks[0];
441
  char       pk0Data[];
442
  SValue     pks[1];
443
  char       pk1Data[];
444
  ...
445
} SLastColDisk;
446
*/
447
static int32_t tsdbCacheSerializeV0(char const *value, SLastCol *pLastCol) {
59,615,348✔
448
  SLastColV0 *pLastColV0 = (SLastColV0 *)value;
59,615,348✔
449

450
  pLastColV0->ts = pLastCol->rowKey.ts;
59,615,348✔
451
  pLastColV0->dirty = pLastCol->dirty;
59,615,348✔
452
  pLastColV0->colVal.cid = pLastCol->colVal.cid;
59,613,892✔
453
  pLastColV0->colVal.flag = pLastCol->colVal.flag;
59,616,079✔
454
  pLastColV0->colVal.type = pLastCol->colVal.value.type;
59,648,483✔
455
  if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) {
59,646,661✔
456
    pLastColV0->colVal.value.nData = pLastCol->colVal.value.nData;
1,104,792✔
457
    if (pLastCol->colVal.value.nData > 0) {
1,105,884✔
458
      memcpy(&pLastColV0[1], pLastCol->colVal.value.pData, pLastCol->colVal.value.nData);
521,787✔
459
    }
460
    return sizeof(SLastColV0) + pLastCol->colVal.value.nData;
1,105,884✔
461
  } else if (pLastCol->colVal.value.type == TSDB_DATA_TYPE_DECIMAL) {
58,543,693✔
462
    pLastColV0->colVal.value.nData = pLastCol->colVal.value.nData;
645,494✔
463
    if (pLastCol->colVal.value.nData > 0) {
645,494✔
464
      memcpy(&pLastColV0[1], pLastCol->colVal.value.pData, pLastCol->colVal.value.nData);
330,435✔
465
    }
466
    return sizeof(SLastColV0) + pLastCol->colVal.value.nData;
645,494✔
467
  } else {
468
    pLastColV0->colVal.value.val = pLastCol->colVal.value.val;
57,897,471✔
469
    return sizeof(SLastColV0);
57,898,564✔
470
  }
471

472
  return 0;
473
}
474

475
static int32_t tsdbCacheSerialize(SLastCol *pLastCol, char **value, size_t *size) {
59,648,838✔
476
  *size = sizeof(SLastColV0);
59,648,838✔
477
  if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) {
59,649,567✔
478
    *size += pLastCol->colVal.value.nData;
1,104,788✔
479
  }
480
  if (pLastCol->colVal.value.type == TSDB_DATA_TYPE_DECIMAL) {
59,652,848✔
481
    *size += DECIMAL128_BYTES;
645,494✔
482
  }
483
  *size += sizeof(uint8_t) + sizeof(uint8_t) + sizeof(uint8_t);  // version + numOfPKs + cacheStatus
59,652,483✔
484

485
  for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) {
61,820,207✔
486
    *size += sizeof(SValue);
2,167,724✔
487
    if (IS_VAR_DATA_TYPE(pLastCol->rowKey.pks[i].type)) {
2,167,724✔
488
      *size += pLastCol->rowKey.pks[i].nData;
716,428✔
489
    }
490
  }
491

492
  *value = taosMemoryMalloc(*size);
59,649,206✔
493
  if (NULL == *value) {
59,620,811✔
494
    TAOS_RETURN(terrno);
×
495
  }
496

497
  int32_t offset = tsdbCacheSerializeV0(*value, pLastCol);
59,613,529✔
498

499
  // version
500
  ((uint8_t *)(*value + offset))[0] = LAST_COL_VERSION;
59,648,118✔
501
  offset++;
59,648,482✔
502

503
  // numOfPKs
504
  ((uint8_t *)(*value + offset))[0] = pLastCol->rowKey.numOfPKs;
59,648,482✔
505
  offset++;
59,645,470✔
506

507
  // pks
508
  for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) {
61,813,194✔
509
    ((SValue *)(*value + offset))[0] = pLastCol->rowKey.pks[i];
2,167,724✔
510
    offset += sizeof(SValue);
2,167,724✔
511
    if (IS_VAR_DATA_TYPE(pLastCol->rowKey.pks[i].type)) {
2,167,724✔
512
      if (pLastCol->rowKey.pks[i].nData > 0) {
716,428✔
513
        memcpy(*value + offset, pLastCol->rowKey.pks[i].pData, pLastCol->rowKey.pks[i].nData);
716,428✔
514
      }
515
      offset += pLastCol->rowKey.pks[i].nData;
716,428✔
516
    }
517
  }
518

519
  ((uint8_t *)(*value + offset))[0] = pLastCol->cacheStatus;
59,643,650✔
520

521
  TAOS_RETURN(TSDB_CODE_SUCCESS);
59,644,378✔
522
}
523

524
static int32_t tsdbCachePutToRocksdb(STsdb *pTsdb, SLastKey *pLastKey, SLastCol *pLastCol);
525

526
int tsdbCacheFlushDirty(const void *key, size_t klen, void *value, void *ud) {
63,348,841✔
527
  SLastCol *pLastCol = (SLastCol *)value;
63,348,841✔
528

529
  if (pLastCol->dirty) {
63,348,841✔
530
    STsdb *pTsdb = (STsdb *)ud;
57,156,356✔
531

532
    int32_t code = tsdbCachePutToRocksdb(pTsdb, (SLastKey *)key, pLastCol);
57,156,356✔
533
    if (code) {
57,129,678✔
534
      tsdbError("tsdb/cache: vgId:%d, flush dirty lru failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code));
×
535
      return code;
×
536
    }
537

538
    pLastCol->dirty = 0;
57,129,678✔
539

540
    rocksMayWrite(pTsdb, false);
57,130,405✔
541
  }
542

543
  return 0;
63,365,588✔
544
}
545

546
static bool tsdbKeyDeleted(TSDBKEY *key, SArray *pSkyline, int64_t *iSkyline) {
22,558,007✔
547
  bool deleted = false;
22,558,007✔
548
  while (*iSkyline > 0) {
22,558,007✔
549
    TSDBKEY *pItemBack = (TSDBKEY *)taosArrayGet(pSkyline, *iSkyline);
33,164✔
550
    TSDBKEY *pItemFront = (TSDBKEY *)taosArrayGet(pSkyline, *iSkyline - 1);
33,164✔
551

552
    if (key->ts > pItemBack->ts) {
33,164✔
553
      return false;
8,484✔
554
    } else if (key->ts >= pItemFront->ts && key->ts <= pItemBack->ts) {
24,680✔
555
      if (key->version <= pItemFront->version || (key->ts == pItemBack->ts && key->version <= pItemBack->version)) {
9,444✔
556
        // if (key->version <= pItemFront->version || key->version <= pItemBack->version) {
557
        return true;
9,444✔
558
      } else {
559
        if (*iSkyline > 1) {
×
560
          --*iSkyline;
×
561
        } else {
562
          return false;
×
563
        }
564
      }
565
    } else {
566
      if (*iSkyline > 1) {
15,236✔
567
        --*iSkyline;
×
568
      } else {
569
        return false;
15,236✔
570
      }
571
    }
572
  }
573

574
  return deleted;
22,524,113✔
575
}
576

577
// Get next non-deleted row from imem
578
static TSDBROW *tsdbImemGetNextRow(STbDataIter *pTbIter, SArray *pSkyline, int64_t *piSkyline) {
12,190,499✔
579
  int32_t code = 0;
12,190,499✔
580

581
  if (tsdbTbDataIterNext(pTbIter)) {
12,190,499✔
582
    TSDBROW *pMemRow = tsdbTbDataIterGet(pTbIter);
12,105,899✔
583
    TSDBKEY  rowKey = TSDBROW_KEY(pMemRow);
12,105,899✔
584
    bool     deleted = tsdbKeyDeleted(&rowKey, pSkyline, piSkyline);
12,106,992✔
585
    if (!deleted) {
12,105,899✔
586
      return pMemRow;
12,105,899✔
587
    }
588
  }
589

590
  return NULL;
83,872✔
591
}
592

593
// Get first non-deleted row from imem
594
static TSDBROW *tsdbImemGetFirstRow(SMemTable *imem, STbData *pIMem, STbDataIter *pTbIter, SArray *pSkyline,
8,793,592✔
595
                                    int64_t *piSkyline) {
596
  int32_t code = 0;
8,793,592✔
597

598
  tsdbTbDataIterOpen(pIMem, NULL, 1, pTbIter);
8,793,592✔
599
  TSDBROW *pMemRow = tsdbTbDataIterGet(pTbIter);
8,793,228✔
600
  if (pMemRow) {
8,793,228✔
601
    // if non deleted, return the found row.
602
    TSDBKEY rowKey = TSDBROW_KEY(pMemRow);
8,793,228✔
603
    bool    deleted = tsdbKeyDeleted(&rowKey, pSkyline, piSkyline);
8,793,957✔
604
    if (!deleted) {
8,792,863✔
605
      return pMemRow;
8,790,287✔
606
    }
607
  } else {
608
    return NULL;
×
609
  }
610

611
  // continue to find the non-deleted first row from imem, using get next row
612
  return tsdbImemGetNextRow(pTbIter, pSkyline, piSkyline);
2,940✔
613
}
614

615
void tsdbCacheInvalidateSchema(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int32_t sver) {
63,824✔
616
  SRocksCache *pRCache = &pTsdb->rCache;
63,824✔
617
  if (!pRCache->pTSchema || sver <= pTsdb->rCache.sver) return;
63,824✔
618

619
  if (suid > 0 && suid == pRCache->suid) {
3,304✔
620
    pRCache->sver = -1;
×
621
    pRCache->suid = -1;
×
622
  }
623
  if (suid == 0 && uid == pRCache->uid) {
3,304✔
624
    pRCache->sver = -1;
826✔
625
    pRCache->uid = -1;
826✔
626
  }
627
}
628

629
static int32_t tsdbUpdateSkm(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int32_t sver) {
12,188,287✔
630
  SRocksCache *pRCache = &pTsdb->rCache;
12,188,287✔
631
  if (pRCache->pTSchema && sver == pRCache->sver) {
12,188,287✔
632
    if (suid > 0 && suid == pRCache->suid) {
12,138,589✔
633
      return 0;
11,716,585✔
634
    }
635
    if (suid == 0 && uid == pRCache->uid) {
422,004✔
636
      return 0;
309,610✔
637
    }
638
  }
639

640
  pRCache->suid = suid;
162,092✔
641
  pRCache->uid = uid;
162,457✔
642
  pRCache->sver = sver;
162,457✔
643
  tDestroyTSchema(pRCache->pTSchema);
162,457✔
644
  return metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pRCache->pTSchema);
162,457✔
645
}
646

647
static int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SArray *updCtxArray);
648

649
int32_t tsdbLoadFromImem(SMemTable *imem, int64_t suid, int64_t uid) {
8,793,958✔
650
  int32_t      code = 0;
8,793,958✔
651
  int32_t      lino = 0;
8,793,958✔
652
  STsdb       *pTsdb = imem->pTsdb;
8,793,593✔
653
  SArray      *pMemDelData = NULL;
8,793,593✔
654
  SArray      *pSkyline = NULL;
8,793,593✔
655
  int64_t      iSkyline = 0;
8,793,593✔
656
  STbDataIter  tbIter = {0};
8,793,593✔
657
  TSDBROW     *pMemRow = NULL;
8,793,593✔
658
  STSchema    *pTSchema = NULL;
8,793,593✔
659
  SSHashObj   *iColHash = NULL;
8,793,593✔
660
  int32_t      sver;
661
  int32_t      nCol;
662
  SArray      *ctxArray = pTsdb->rCache.ctxArray;
8,793,593✔
663
  STsdbRowKey  tsdbRowKey = {0};
8,793,228✔
664
  STSDBRowIter iter = {0};
8,793,593✔
665

666
  STbData *pIMem = tsdbGetTbDataFromMemTable(imem, suid, uid);
8,792,498✔
667

668
  // load imem tomb data and build skyline
669
  TAOS_CHECK_GOTO(loadMemTombData(&pMemDelData, NULL, pIMem, INT64_MAX), &lino, _exit);
8,793,592✔
670

671
  // tsdbBuildDeleteSkyline
672
  size_t delSize = TARRAY_SIZE(pMemDelData);
8,794,322✔
673
  if (delSize > 0) {
8,794,322✔
674
    pSkyline = taosArrayInit(32, sizeof(TSDBKEY));
4,188✔
675
    if (!pSkyline) {
4,188✔
676
      TAOS_CHECK_EXIT(terrno);
×
677
    }
678

679
    TAOS_CHECK_EXIT(tsdbBuildDeleteSkyline(pMemDelData, 0, (int32_t)(delSize - 1), pSkyline));
4,188✔
680
    iSkyline = taosArrayGetSize(pSkyline) - 1;
4,188✔
681
  }
682

683
  pMemRow = tsdbImemGetFirstRow(imem, pIMem, &tbIter, pSkyline, &iSkyline);
8,794,322✔
684
  if (!pMemRow) {
8,791,402✔
685
    goto _exit;
×
686
  }
687

688
  // iter first row to last_row/last col values to ctxArray, and mark last null col ids
689
  sver = TSDBROW_SVERSION(pMemRow);
8,791,402✔
690
  TAOS_CHECK_GOTO(tsdbUpdateSkm(pTsdb, suid, uid, sver), &lino, _exit);
8,793,592✔
691
  pTSchema = pTsdb->rCache.pTSchema;
8,793,957✔
692
  nCol = pTSchema->numOfCols;
8,793,957✔
693

694
  tsdbRowGetKey(pMemRow, &tsdbRowKey);
8,793,592✔
695

696
  TAOS_CHECK_EXIT(tsdbRowIterOpen(&iter, pMemRow, pTSchema));
8,793,228✔
697

698
  int32_t iCol = 0;
8,792,863✔
699
  for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal && iCol < nCol; pColVal = tsdbRowIterNext(&iter), iCol++) {
37,007,782✔
700
    SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST_ROW, .tsdbRowKey = tsdbRowKey, .colVal = *pColVal};
28,214,920✔
701
    if (!taosArrayPush(ctxArray, &updateCtx)) {
28,213,465✔
702
      TAOS_CHECK_EXIT(terrno);
×
703
    }
704

705
    if (COL_VAL_IS_VALUE(pColVal)) {
28,213,465✔
706
      updateCtx.lflag = LFLAG_LAST;
26,749,584✔
707
      if (!taosArrayPush(ctxArray, &updateCtx)) {
26,750,310✔
708
        TAOS_CHECK_EXIT(terrno);
×
709
      }
710
    } else {
711
      if (!iColHash) {
1,465,341✔
712
        iColHash = tSimpleHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT));
81,368✔
713
        if (iColHash == NULL) {
81,368✔
714
          TAOS_CHECK_EXIT(terrno);
×
715
        }
716
      }
717

718
      if (tSimpleHashPut(iColHash, &pColVal->cid, sizeof(pColVal->cid), &pColVal->cid, sizeof(pColVal->cid))) {
1,465,341✔
719
        TAOS_CHECK_EXIT(terrno);
×
720
      }
721
    }
722
  }
723
  tsdbRowClose(&iter);
8,792,137✔
724

725
  // continue to get next row to fill null last col values
726
  pMemRow = tsdbImemGetNextRow(&tbIter, pSkyline, &iSkyline);
8,792,864✔
727
  while (pMemRow) {
12,186,831✔
728
    if (tSimpleHashGetSize(iColHash) == 0) {
12,102,959✔
729
      break;
8,708,264✔
730
    }
731

732
    sver = TSDBROW_SVERSION(pMemRow);
3,394,695✔
733
    TAOS_CHECK_EXIT(tsdbUpdateSkm(pTsdb, suid, uid, sver));
3,394,695✔
734
    pTSchema = pTsdb->rCache.pTSchema;
3,394,695✔
735

736
    STsdbRowKey tsdbRowKey = {0};
3,394,695✔
737
    tsdbRowGetKey(pMemRow, &tsdbRowKey);
3,394,695✔
738

739
    TAOS_CHECK_EXIT(tsdbRowIterOpen(&iter, pMemRow, pTSchema));
3,394,695✔
740

741
    int32_t iCol = 0;
3,393,964✔
742
    for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal && iCol < nCol; pColVal = tsdbRowIterNext(&iter), iCol++) {
89,866,772✔
743
      if (tSimpleHashGet(iColHash, &pColVal->cid, sizeof(pColVal->cid)) && COL_VAL_IS_VALUE(pColVal)) {
86,537,136✔
744
        SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST, .tsdbRowKey = tsdbRowKey, .colVal = *pColVal};
858,032✔
745
        if (!taosArrayPush(ctxArray, &updateCtx)) {
858,032✔
746
          TAOS_CHECK_EXIT(terrno);
×
747
        }
748

749
        TAOS_CHECK_EXIT(tSimpleHashRemove(iColHash, &pColVal->cid, sizeof(pColVal->cid)));
858,032✔
750
      }
751
    }
752
    tsdbRowClose(&iter);
3,384,461✔
753

754
    pMemRow = tsdbImemGetNextRow(&tbIter, pSkyline, &iSkyline);
3,394,695✔
755
  }
756

757
  TAOS_CHECK_GOTO(tsdbCacheUpdate(pTsdb, suid, uid, ctxArray), &lino, _exit);
8,792,136✔
758

759
_exit:
8,791,403✔
760
  if (code) {
8,791,403✔
761
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(pTsdb->pVnode), __func__, __FILE__, lino, tstrerror(code));
×
762

763
    tsdbRowClose(&iter);
×
764
  }
765

766
  taosArrayClear(ctxArray);
8,791,403✔
767
  // destroy any allocated resource
768
  tSimpleHashCleanup(iColHash);
8,791,768✔
769
  if (pMemDelData) {
8,792,862✔
770
    taosArrayDestroy(pMemDelData);
8,792,497✔
771
  }
772
  if (pSkyline) {
8,793,958✔
773
    taosArrayDestroy(pSkyline);
4,188✔
774
  }
775

776
  TAOS_RETURN(code);
8,793,958✔
777
}
778

779
static int32_t tsdbCacheUpdateFromIMem(STsdb *pTsdb) {
79,589✔
780
  if (!pTsdb) return 0;
79,589✔
781
  if (!pTsdb->imem) return 0;
79,589✔
782

783
  int32_t    code = 0;
63,093✔
784
  int32_t    lino = 0;
63,093✔
785
  SMemTable *imem = pTsdb->imem;
63,093✔
786
  int32_t    nTbData = imem->nTbData;
63,093✔
787
  int64_t    nRow = imem->nRow;
63,093✔
788
  int64_t    nDel = imem->nDel;
63,093✔
789

790
  if (nRow == 0 || nTbData == 0) return 0;
63,093✔
791

792
  TAOS_CHECK_EXIT(tsdbMemTableSaveToCache(imem, tsdbLoadFromImem));
63,093✔
793

794
_exit:
63,093✔
795
  if (code) {
63,093✔
796
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(pTsdb->pVnode), __func__, __FILE__, lino, tstrerror(code));
×
797
  } else {
798
    tsdbInfo("vgId:%d %s done, nRow:%" PRId64 " nDel:%" PRId64, TD_VID(pTsdb->pVnode), __func__, nRow, nDel);
63,093✔
799
  }
800

801
  TAOS_RETURN(code);
63,093✔
802
}
803

804
int32_t tsdbCacheCommit(STsdb *pTsdb) {
79,589✔
805
  int32_t code = 0;
79,589✔
806

807
  // 0, tsdbCacheUpdateFromIMem if updateCacheBatch
808
  // flush dirty data of lru into rocks
809
  // 4, and update when writing if !updateCacheBatch
810
  // 5, merge cache & mem if updateCacheBatch
811

812
  if (tsUpdateCacheBatch) {
79,589✔
813
    code = tsdbCacheUpdateFromIMem(pTsdb);
79,589✔
814
    if (code) {
79,589✔
815
      tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
×
816

817
      TAOS_RETURN(code);
×
818
    }
819
  }
820

821
  char      *err = NULL;
79,589✔
822
  SLRUCache *pCache = pTsdb->lruCache;
79,589✔
823
  // rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
824

825
  (void)taosThreadMutexLock(&pTsdb->lruMutex);
79,589✔
826

827
  taosLRUCacheApply(pCache, tsdbCacheFlushDirty, pTsdb);
79,589✔
828

829
#ifdef USE_ROCKSDB
830
  rocksMayWrite(pTsdb, true);
79,589✔
831
  rocksdb_flush(pTsdb->rCache.db, pTsdb->rCache.flushoptions, &err);
79,589✔
832
#endif
833
  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
79,589✔
834
#ifdef USE_ROCKSDB
835
  if (NULL != err) {
79,589✔
836
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err);
×
837
    rocksdb_free(err);
×
838
    code = TSDB_CODE_FAILED;
×
839
  }
840
#endif
841
  TAOS_RETURN(code);
79,589✔
842
}
843

844
static int32_t reallocVarDataVal(SValue *pValue) {
3,785,358✔
845
  if (IS_VAR_DATA_TYPE(pValue->type)) {
3,785,358✔
846
    uint8_t *pVal = pValue->pData;
3,785,358✔
847
    uint32_t nData = pValue->nData;
3,785,358✔
848
    if (nData > 0) {
3,785,358✔
849
      uint8_t *p = taosMemoryMalloc(nData);
2,659,097✔
850
      if (!p) {
2,659,097✔
851
        TAOS_RETURN(terrno);
×
852
      }
853
      pValue->pData = p;
2,659,097✔
854
      (void)memcpy(pValue->pData, pVal, nData);
2,659,097✔
855
    } else {
856
      pValue->pData = NULL;
1,126,261✔
857
    }
858
  }
859

860
  TAOS_RETURN(TSDB_CODE_SUCCESS);
3,785,358✔
861
}
862

863
static int32_t reallocVarData(SColVal *pColVal) { return reallocVarDataVal(&pColVal->value); }
2,343,682✔
864

865
// realloc pk data and col data.
866
static int32_t tsdbCacheReallocSLastCol(SLastCol *pCol, size_t *pCharge) {
120,317,046✔
867
  int32_t code = TSDB_CODE_SUCCESS, lino = 0;
120,317,046✔
868
  size_t  charge = sizeof(SLastCol);
120,317,046✔
869

870
  int8_t i = 0;
120,317,046✔
871
  for (; i < pCol->rowKey.numOfPKs; i++) {
124,510,772✔
872
    SValue *pValue = &pCol->rowKey.pks[i];
4,193,726✔
873
    if (IS_VAR_DATA_TYPE(pValue->type)) {
4,193,726✔
874
      TAOS_CHECK_EXIT(reallocVarDataVal(pValue));
1,441,676✔
875
      charge += pValue->nData;
1,441,676✔
876
    }
877
  }
878

879
  if (IS_VAR_DATA_TYPE(pCol->colVal.value.type)) {
120,315,956✔
880
    TAOS_CHECK_EXIT(reallocVarData(&pCol->colVal));
2,328,259✔
881
    charge += pCol->colVal.value.nData;
2,343,682✔
882
  }
883

884
  if (pCol->colVal.value.type == TSDB_DATA_TYPE_DECIMAL) {
120,336,222✔
885
    if (pCol->colVal.value.nData > 0) {
919,108✔
886
      void *p = taosMemoryMalloc(pCol->colVal.value.nData);
442,621✔
887
      if (!p) TAOS_CHECK_EXIT(terrno);
442,621✔
888
      (void)memcpy(p, pCol->colVal.value.pData, pCol->colVal.value.nData);
442,621✔
889
      pCol->colVal.value.pData = p;
442,621✔
890
    }else {
891
      pCol->colVal.value.pData = NULL;
476,487✔
892
    }
893
    charge += pCol->colVal.value.nData;
919,108✔
894
  }
895

896
  if (pCharge) {
120,330,438✔
897
    *pCharge = charge;
111,965,586✔
898
  }
899

900
_exit:
8,364,852✔
901
  if (TSDB_CODE_SUCCESS != code) {
120,331,540✔
902
    for (int8_t j = 0; j < i; j++) {
×
903
      if (IS_VAR_DATA_TYPE(pCol->rowKey.pks[j].type)) {
×
904
        taosMemoryFree(pCol->rowKey.pks[j].pData);
×
905
      }
906
    }
907

908
    (void)memset(pCol, 0, sizeof(SLastCol));
×
909
  }
910

911
  TAOS_RETURN(code);
120,331,540✔
912
}
913

914
void tsdbCacheFreeSLastColItem(void *pItem) {
9,034,275✔
915
  SLastCol *pCol = (SLastCol *)pItem;
9,034,275✔
916
  for (int i = 0; i < pCol->rowKey.numOfPKs; i++) {
11,319,271✔
917
    if (IS_VAR_DATA_TYPE(pCol->rowKey.pks[i].type)) {
2,284,996✔
918
      taosMemoryFree(pCol->rowKey.pks[i].pData);
848,088✔
919
    }
920
  }
921

922
  if ((IS_VAR_DATA_TYPE(pCol->colVal.value.type) || pCol->colVal.value.type == TSDB_DATA_TYPE_DECIMAL) &&
9,034,284✔
923
      pCol->colVal.value.pData) {
1,315,991✔
924
    taosMemoryFree(pCol->colVal.value.pData);
912,838✔
925
  }
926
}
9,035,749✔
927

928
static void tsdbCacheDeleter(const void *key, size_t klen, void *value, void *ud) {
111,916,664✔
929
  SLastCol *pLastCol = (SLastCol *)value;
111,916,664✔
930

931
  if (pLastCol->dirty) {
111,916,664✔
932
    if (tsdbCacheFlushDirty(key, klen, pLastCol, ud) != 0) {
2,458,164✔
933
      STsdb *pTsdb = (STsdb *)ud;
×
934
      tsdbTrace("tsdb/cache: vgId:%d, flush cache %s failed at line %d.", TD_VID(pTsdb->pVnode), __func__, __LINE__);
×
935
    }
936
  }
937

938
  for (uint8_t i = 0; i < pLastCol->rowKey.numOfPKs; ++i) {
114,082,560✔
939
    SValue *pValue = &pLastCol->rowKey.pks[i];
2,167,724✔
940
    if (IS_VAR_DATA_TYPE(pValue->type)) {
2,167,724✔
941
      taosMemoryFree(pValue->pData);
716,428✔
942
    }
943
  }
944

945
  if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type) ||
111,884,541✔
946
      pLastCol->colVal.value.type == TSDB_DATA_TYPE_DECIMAL /* && pLastCol->colVal.value.nData > 0*/) {
110,598,468✔
947
    taosMemoryFree(pLastCol->colVal.value.pData);
2,058,322✔
948
  }
949

950
  taosMemoryFree(value);
111,894,766✔
951
}
111,621,437✔
952

953
static void tsdbCacheOverWriter(const void *key, size_t klen, void *value, void *ud) {
53,480,610✔
954
  SLastCol *pLastCol = (SLastCol *)value;
53,480,610✔
955
  pLastCol->dirty = 0;
53,480,610✔
956
}
53,507,619✔
957

958
static int32_t tsdbCachePutToLRU(STsdb *pTsdb, SLastKey *pLastKey, SLastCol *pLastCol, int8_t dirty);
959

960
static int32_t tsdbCacheNewTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, int8_t col_type, int8_t lflag) {
55,886,500✔
961
  int32_t code = 0, lino = 0;
55,886,500✔
962

963
  SLRUCache *pCache = pTsdb->lruCache;
55,886,500✔
964
  // rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
965
  SRowKey  emptyRowKey = {.ts = TSKEY_MIN, .numOfPKs = 0};
55,888,396✔
966
  SLastCol emptyCol = {
55,888,396✔
967
      .rowKey = emptyRowKey, .colVal = COL_VAL_NONE(cid, col_type), .dirty = 1, .cacheStatus = TSDB_LAST_CACHE_VALID};
968

969
  SLastKey *pLastKey = &(SLastKey){.lflag = lflag, .uid = uid, .cid = cid};
55,887,008✔
970
  code = tsdbCachePutToLRU(pTsdb, pLastKey, &emptyCol, 1);
55,884,566✔
971
  if (code) {
55,889,363✔
972
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
×
973
  }
974

975
  TAOS_RETURN(code);
55,889,363✔
976
}
977

978
int32_t tsdbCacheCommitNoLock(STsdb *pTsdb) {
1,089,626✔
979
  int32_t code = 0;
1,089,626✔
980
  char   *err = NULL;
1,089,626✔
981

982
  SLRUCache *pCache = pTsdb->lruCache;
1,090,478✔
983
  // rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
984

985
  taosLRUCacheApply(pCache, tsdbCacheFlushDirty, pTsdb);
1,089,073✔
986
#ifdef USE_ROCKSDB
987
  rocksMayWrite(pTsdb, true);
1,091,231✔
988
  rocksdb_flush(pTsdb->rCache.db, pTsdb->rCache.flushoptions, &err);
1,091,231✔
989
  if (NULL != err) {
1,091,231✔
990
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err);
×
991
    rocksdb_free(err);
×
992
    code = TSDB_CODE_FAILED;
×
993
  }
994
#endif
995
  TAOS_RETURN(code);
1,091,231✔
996
}
997

998
static int32_t tsdbCacheGetValuesFromRocks(STsdb *pTsdb, size_t numKeys, const char *const *ppKeysList,
9,942,059✔
999
                                           size_t *pKeysListSizes, char ***pppValuesList, size_t **ppValuesListSizes) {
1000
#ifdef USE_ROCKSDB
1001
  char **valuesList = taosMemoryCalloc(numKeys, sizeof(char *));
9,942,059✔
1002
  if (!valuesList) return terrno;
9,941,747✔
1003
  size_t *valuesListSizes = taosMemoryCalloc(numKeys, sizeof(size_t));
9,941,747✔
1004
  if (!valuesListSizes) {
9,942,366✔
1005
    taosMemoryFreeClear(valuesList);
×
1006
    return terrno;
×
1007
  }
1008
  char **errs = taosMemoryCalloc(numKeys, sizeof(char *));
9,942,366✔
1009
  if (!errs) {
9,940,458✔
1010
    taosMemoryFreeClear(valuesList);
×
1011
    taosMemoryFreeClear(valuesListSizes);
×
1012
    return terrno;
×
1013
  }
1014
  rocksdb_multi_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, numKeys, ppKeysList, pKeysListSizes, valuesList,
9,940,458✔
1015
                    valuesListSizes, errs);
1016
  for (size_t i = 0; i < numKeys; ++i) {
44,479,505✔
1017
    rocksdb_free(errs[i]);
34,537,806✔
1018
  }
1019
  taosMemoryFreeClear(errs);
9,941,699✔
1020

1021
  *pppValuesList = valuesList;
9,939,869✔
1022
  *ppValuesListSizes = valuesListSizes;
9,940,045✔
1023
#endif
1024
  TAOS_RETURN(TSDB_CODE_SUCCESS);
9,940,414✔
1025
}
1026

1027
static int32_t tsdbCacheDropTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, bool hasPrimaryKey) {
7,264,591✔
1028
  int32_t code = 0;
7,264,591✔
1029

1030
  // build keys & multi get from rocks
1031
  char **keys_list = taosMemoryCalloc(2, sizeof(char *));
7,264,591✔
1032
  if (!keys_list) {
7,264,222✔
1033
    return terrno;
×
1034
  }
1035
  size_t *keys_list_sizes = taosMemoryCalloc(2, sizeof(size_t));
7,264,222✔
1036
  if (!keys_list_sizes) {
7,262,683✔
1037
    taosMemoryFree(keys_list);
×
1038
    return terrno;
×
1039
  }
1040
  const size_t klen = ROCKS_KEY_LEN;
7,262,683✔
1041

1042
  char *keys = taosMemoryCalloc(2, sizeof(SLastKey));
7,262,683✔
1043
  if (!keys) {
7,262,439✔
1044
    taosMemoryFree(keys_list);
×
1045
    taosMemoryFree(keys_list_sizes);
×
1046
    return terrno;
×
1047
  }
1048
  ((SLastKey *)keys)[0] = (SLastKey){.lflag = LFLAG_LAST, .uid = uid, .cid = cid};
7,262,439✔
1049
  ((SLastKey *)keys)[1] = (SLastKey){.lflag = LFLAG_LAST_ROW, .uid = uid, .cid = cid};
7,263,240✔
1050

1051
  keys_list[0] = keys;
7,262,808✔
1052
  keys_list[1] = keys + sizeof(SLastKey);
7,263,240✔
1053
  keys_list_sizes[0] = klen;
7,263,240✔
1054
  keys_list_sizes[1] = klen;
7,264,222✔
1055

1056
  char  **values_list = NULL;
7,263,546✔
1057
  size_t *values_list_sizes = NULL;
7,263,546✔
1058

1059
  // was written by caller
1060
  // rocksMayWrite(pTsdb, true); // flush writebatch cache
1061

1062
  TAOS_CHECK_GOTO(tsdbCacheGetValuesFromRocks(pTsdb, 2, (const char *const *)keys_list, keys_list_sizes, &values_list,
7,263,546✔
1063
                                              &values_list_sizes),
1064
                  NULL, _exit);
1065
#ifdef USE_ROCKSDB
1066
  rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
7,263,924✔
1067
#endif
1068
  {
1069
#ifdef USE_ROCKSDB
1070
    SLastCol *pLastCol = NULL;
7,263,433✔
1071
    if (values_list[0] != NULL) {
7,263,433✔
1072
      code = tsdbCacheDeserialize(values_list[0], values_list_sizes[0], &pLastCol);
77,590✔
1073
      if (code != TSDB_CODE_SUCCESS) {
77,590✔
1074
        tsdbError("vgId:%d, %s deserialize failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1075
                  tstrerror(code));
1076
        goto _exit;
×
1077
      }
1078
      if (NULL != pLastCol) {
77,590✔
1079
        rocksdb_writebatch_delete(wb, keys_list[0], klen);
77,590✔
1080
      }
1081
      taosMemoryFreeClear(pLastCol);
77,590✔
1082
    }
1083

1084
    pLastCol = NULL;
7,263,433✔
1085
    if (values_list[1] != NULL) {
7,263,433✔
1086
      code = tsdbCacheDeserialize(values_list[1], values_list_sizes[1], &pLastCol);
77,590✔
1087
      if (code != TSDB_CODE_SUCCESS) {
77,590✔
1088
        tsdbError("vgId:%d, %s deserialize failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1089
                  tstrerror(code));
1090
        goto _exit;
×
1091
      }
1092
      if (NULL != pLastCol) {
77,590✔
1093
        rocksdb_writebatch_delete(wb, keys_list[1], klen);
77,590✔
1094
      }
1095
      taosMemoryFreeClear(pLastCol);
77,590✔
1096
    }
1097

1098
    rocksdb_free(values_list[0]);
7,263,433✔
1099
    rocksdb_free(values_list[1]);
7,264,568✔
1100
#endif
1101

1102
    for (int i = 0; i < 2; i++) {
21,788,042✔
1103
      LRUHandle *h = taosLRUCacheLookup(pTsdb->lruCache, keys_list[i], klen);
14,523,451✔
1104
      if (h) {
14,529,182✔
1105
        tsdbLRUCacheRelease(pTsdb->lruCache, h, true);
155,180✔
1106
        taosLRUCacheErase(pTsdb->lruCache, keys_list[i], klen);
155,180✔
1107
      }
1108
    }
1109
  }
1110

1111
_exit:
7,264,591✔
1112
  taosMemoryFree(keys_list[0]);
7,264,591✔
1113

1114
  taosMemoryFree(keys_list);
7,263,484✔
1115
  taosMemoryFree(keys_list_sizes);
7,263,853✔
1116
  taosMemoryFree(values_list);
7,263,362✔
1117
  taosMemoryFree(values_list_sizes);
7,263,802✔
1118

1119
  TAOS_RETURN(code);
7,264,100✔
1120
}
1121

1122
int32_t tsdbCacheNewTable(STsdb *pTsdb, tb_uid_t uid, tb_uid_t suid, const SSchemaWrapper *pSchemaRow) {
9,012,038✔
1123
  int32_t code = 0;
9,012,038✔
1124

1125
  (void)taosThreadMutexLock(&pTsdb->lruMutex);
9,012,038✔
1126

1127
  if (suid < 0) {
9,011,421✔
1128
    for (int i = 0; i < pSchemaRow->nCols; ++i) {
128,385✔
1129
      int16_t cid = pSchemaRow->pSchema[i].colId;
110,461✔
1130
      int8_t  col_type = pSchemaRow->pSchema[i].type;
110,461✔
1131

1132
      code = tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, LFLAG_LAST_ROW);
110,461✔
1133
      if (code != TSDB_CODE_SUCCESS) {
110,461✔
1134
        tsdbTrace("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1135
                  tstrerror(code));
1136
      }
1137
      code = tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, LFLAG_LAST);
110,461✔
1138
      if (code != TSDB_CODE_SUCCESS) {
110,461✔
1139
        tsdbTrace("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1140
                  tstrerror(code));
1141
      }
1142
    }
1143
  } else {
1144
    STSchema *pTSchema = NULL;
8,993,497✔
1145
    code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, -1, &pTSchema);
8,993,560✔
1146
    if (code != TSDB_CODE_SUCCESS) {
8,992,518✔
1147
      (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
×
1148

1149
      TAOS_RETURN(code);
×
1150
    }
1151

1152
    for (int i = 0; i < pTSchema->numOfCols; ++i) {
36,728,076✔
1153
      int16_t cid = pTSchema->columns[i].colId;
27,735,010✔
1154
      int8_t  col_type = pTSchema->columns[i].type;
27,733,477✔
1155

1156
      code = tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, LFLAG_LAST_ROW);
27,733,974✔
1157
      if (code != TSDB_CODE_SUCCESS) {
27,732,620✔
1158
        tsdbTrace("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1159
                  tstrerror(code));
1160
      }
1161
      code = tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, LFLAG_LAST);
27,732,620✔
1162
      if (code != TSDB_CODE_SUCCESS) {
27,735,558✔
1163
        tsdbTrace("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1164
                  tstrerror(code));
1165
      }
1166
    }
1167

1168
    taosMemoryFree(pTSchema);
8,993,557✔
1169
  }
1170

1171
  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
9,012,038✔
1172

1173
  TAOS_RETURN(code);
9,012,038✔
1174
}
1175

1176
int32_t tsdbCacheDropTable(STsdb *pTsdb, tb_uid_t uid, tb_uid_t suid, SSchemaWrapper *pSchemaRow) {
9,336✔
1177
  int32_t code = 0;
9,336✔
1178

1179
  (void)taosThreadMutexLock(&pTsdb->lruMutex);
9,336✔
1180

1181
  code = tsdbCacheCommitNoLock(pTsdb);
9,336✔
1182
  if (code != TSDB_CODE_SUCCESS) {
9,336✔
1183
    tsdbTrace("vgId:%d, %s commit with no lock failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1184
              tstrerror(code));
1185
  }
1186

1187
  if (pSchemaRow != NULL) {
9,336✔
1188
    bool hasPrimayKey = false;
×
1189
    int  nCols = pSchemaRow->nCols;
×
1190
    if (nCols >= 2) {
×
1191
      hasPrimayKey = (pSchemaRow->pSchema[1].flags & COL_IS_KEY) ? true : false;
×
1192
    }
1193
    for (int i = 0; i < nCols; ++i) {
×
1194
      int16_t cid = pSchemaRow->pSchema[i].colId;
×
1195
      int8_t  col_type = pSchemaRow->pSchema[i].type;
×
1196

1197
      code = tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey);
×
1198
      if (code != TSDB_CODE_SUCCESS) {
×
1199
        tsdbTrace("vgId:%d, %s drop table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1200
                  tstrerror(code));
1201
      }
1202
    }
1203
  } else {
1204
    STSchema *pTSchema = NULL;
9,336✔
1205
    code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, -1, &pTSchema);
9,336✔
1206
    if (code != TSDB_CODE_SUCCESS) {
9,336✔
1207
      (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
×
1208

1209
      TAOS_RETURN(code);
×
1210
    }
1211

1212
    bool hasPrimayKey = false;
9,336✔
1213
    int  nCols = pTSchema->numOfCols;
9,336✔
1214
    if (nCols >= 2) {
9,336✔
1215
      hasPrimayKey = (pTSchema->columns[1].flags & COL_IS_KEY) ? true : false;
9,336✔
1216
    }
1217
    for (int i = 0; i < nCols; ++i) {
36,846✔
1218
      int16_t cid = pTSchema->columns[i].colId;
27,510✔
1219
      int8_t  col_type = pTSchema->columns[i].type;
27,510✔
1220

1221
      code = tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey);
27,510✔
1222
      if (code != TSDB_CODE_SUCCESS) {
27,510✔
1223
        tsdbTrace("vgId:%d, %s drop table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1224
                  tstrerror(code));
1225
      }
1226
    }
1227

1228
    taosMemoryFree(pTSchema);
9,336✔
1229
  }
1230

1231
  rocksMayWrite(pTsdb, false);
9,336✔
1232

1233
  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
9,336✔
1234

1235
  TAOS_RETURN(code);
9,336✔
1236
}
1237

1238
int32_t tsdbCacheDropSubTables(STsdb *pTsdb, SArray *uids, tb_uid_t suid) {
1,070,274✔
1239
  int32_t code = 0;
1,070,274✔
1240

1241
  (void)taosThreadMutexLock(&pTsdb->lruMutex);
1,070,274✔
1242

1243
  code = tsdbCacheCommitNoLock(pTsdb);
1,071,879✔
1244
  if (code != TSDB_CODE_SUCCESS) {
1,071,145✔
1245
    tsdbTrace("vgId:%d, %s commit with no lock failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1246
              tstrerror(code));
1247
  }
1248

1249
  STSchema *pTSchema = NULL;
1,071,145✔
1250
  code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, suid, -1, &pTSchema);
1,070,966✔
1251
  if (code != TSDB_CODE_SUCCESS) {
1,071,145✔
1252
    (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
3,054✔
1253

1254
    TAOS_RETURN(code);
3,054✔
1255
  }
1256

1257
  for (int i = 0; i < TARRAY_SIZE(uids); ++i) {
2,005,243✔
1258
    int64_t uid = ((tb_uid_t *)TARRAY_DATA(uids))[i];
936,720✔
1259

1260
    bool hasPrimayKey = false;
936,720✔
1261
    int  nCols = pTSchema->numOfCols;
936,720✔
1262
    if (nCols >= 2) {
936,720✔
1263
      hasPrimayKey = (pTSchema->columns[1].flags & COL_IS_KEY) ? true : false;
936,720✔
1264
    }
1265

1266
    for (int i = 0; i < nCols; ++i) {
8,123,230✔
1267
      int16_t cid = pTSchema->columns[i].colId;
7,186,078✔
1268
      int8_t  col_type = pTSchema->columns[i].type;
7,186,569✔
1269

1270
      code = tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey);
7,186,569✔
1271
      if (code != TSDB_CODE_SUCCESS) {
7,186,510✔
1272
        tsdbTrace("vgId:%d, %s drop table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1273
                  tstrerror(code));
1274
      }
1275
    }
1276
  }
1277

1278
  taosMemoryFree(pTSchema);
1,067,857✔
1279

1280
  rocksMayWrite(pTsdb, false);
1,068,825✔
1281

1282
  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
1,068,825✔
1283

1284
  TAOS_RETURN(code);
1,068,825✔
1285
}
1286

1287
int32_t tsdbCacheNewNTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, int8_t col_type) {
×
1288
  int32_t code = 0;
×
1289

1290
  (void)taosThreadMutexLock(&pTsdb->lruMutex);
×
1291

1292
  code = tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, 0);
×
1293
  if (code != TSDB_CODE_SUCCESS) {
×
1294
    tsdbTrace("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1295
              tstrerror(code));
1296
  }
1297
  code = tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, 1);
×
1298
  if (code != TSDB_CODE_SUCCESS) {
×
1299
    tsdbTrace("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1300
              tstrerror(code));
1301
  }
1302
  // rocksMayWrite(pTsdb, true, false, false);
1303
  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
×
1304

1305
  TAOS_RETURN(code);
×
1306
}
1307

1308
int32_t tsdbCacheDropNTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, bool hasPrimayKey) {
×
1309
  int32_t code = 0;
×
1310

1311
  (void)taosThreadMutexLock(&pTsdb->lruMutex);
×
1312

1313
  code = tsdbCacheCommitNoLock(pTsdb);
×
1314
  if (code != TSDB_CODE_SUCCESS) {
×
1315
    tsdbTrace("vgId:%d, %s commit with no lock failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1316
              tstrerror(code));
1317
  }
1318

1319
  code = tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey);
×
1320
  if (code != TSDB_CODE_SUCCESS) {
×
1321
    tsdbTrace("vgId:%d, %s drop table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1322
              tstrerror(code));
1323
  }
1324

1325
  rocksMayWrite(pTsdb, false);
×
1326

1327
  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
×
1328

1329
  TAOS_RETURN(code);
×
1330
}
1331

1332
int32_t tsdbCacheNewSTableColumn(STsdb *pTsdb, SArray *uids, int16_t cid, int8_t col_type) {
20,032✔
1333
  int32_t code = 0;
20,032✔
1334

1335
  (void)taosThreadMutexLock(&pTsdb->lruMutex);
20,032✔
1336

1337
  for (int i = 0; i < TARRAY_SIZE(uids); ++i) {
120,192✔
1338
    tb_uid_t uid = ((tb_uid_t *)TARRAY_DATA(uids))[i];
100,160✔
1339

1340
    code = tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, 0);
100,160✔
1341
    if (code != TSDB_CODE_SUCCESS) {
100,160✔
1342
      tsdbTrace("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1343
                tstrerror(code));
1344
    }
1345
    code = tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, 1);
100,160✔
1346
    if (code != TSDB_CODE_SUCCESS) {
100,160✔
1347
      tsdbTrace("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1348
                tstrerror(code));
1349
    }
1350
  }
1351

1352
  // rocksMayWrite(pTsdb, true, false, false);
1353
  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
20,032✔
1354
  TAOS_RETURN(code);
20,032✔
1355
}
1356

1357
int32_t tsdbCacheDropSTableColumn(STsdb *pTsdb, SArray *uids, int16_t cid, bool hasPrimayKey) {
10,016✔
1358
  int32_t code = 0;
10,016✔
1359

1360
  (void)taosThreadMutexLock(&pTsdb->lruMutex);
10,016✔
1361

1362
  code = tsdbCacheCommitNoLock(pTsdb);
10,016✔
1363
  if (code != TSDB_CODE_SUCCESS) {
10,016✔
1364
    tsdbTrace("vgId:%d, %s commit with no lock failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1365
              tstrerror(code));
1366
  }
1367

1368
  for (int i = 0; i < TARRAY_SIZE(uids); ++i) {
60,096✔
1369
    int64_t uid = ((tb_uid_t *)TARRAY_DATA(uids))[i];
50,080✔
1370

1371
    code = tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey);
50,080✔
1372
    if (code != TSDB_CODE_SUCCESS) {
50,080✔
1373
      tsdbTrace("vgId:%d, %s drop table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1374
                tstrerror(code));
1375
    }
1376
  }
1377

1378
  rocksMayWrite(pTsdb, false);
10,016✔
1379

1380
  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
10,016✔
1381

1382
  TAOS_RETURN(code);
10,016✔
1383
}
1384

1385
typedef struct {
1386
  int      idx;
1387
  SLastKey key;
1388
} SIdxKey;
1389

1390
static void tsdbCacheUpdateLastColToNone(SLastCol *pLastCol, ELastCacheStatus cacheStatus) {
×
1391
  // update rowkey
1392
  pLastCol->rowKey.ts = TSKEY_MIN;
×
1393
  for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) {
×
1394
    SValue *pPKValue = &pLastCol->rowKey.pks[i];
×
1395
    if (IS_VAR_DATA_TYPE(pPKValue->type) && pPKValue->nData > 0) {
×
1396
      taosMemoryFreeClear(pPKValue->pData);
×
1397
      pPKValue->nData = 0;
×
1398
    } else {
1399
      valueClearDatum(pPKValue, pPKValue->type);
×
1400
    }
1401
  }
1402
  pLastCol->rowKey.numOfPKs = 0;
×
1403

1404
  // update colval
1405
  if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type) && pLastCol->colVal.value.nData > 0) {
×
1406
    taosMemoryFreeClear(pLastCol->colVal.value.pData);
×
1407
    pLastCol->colVal.value.nData = 0;
×
1408
  } else {
1409
    valueClearDatum(&pLastCol->colVal.value, pLastCol->colVal.value.type);
×
1410
  }
1411

1412
  pLastCol->colVal = COL_VAL_NONE(pLastCol->colVal.cid, pLastCol->colVal.value.type);
×
1413
  pLastCol->dirty = 1;
×
1414
  pLastCol->cacheStatus = cacheStatus;
×
1415
}
×
1416

1417
static int32_t tsdbCachePutToRocksdb(STsdb *pTsdb, SLastKey *pLastKey, SLastCol *pLastCol) {
59,639,733✔
1418
  int32_t code = 0;
59,639,733✔
1419
#ifdef USE_ROCKSDB
1420
  char  *rocks_value = NULL;
59,639,733✔
1421
  size_t vlen = 0;
59,641,922✔
1422

1423
  code = tsdbCacheSerialize(pLastCol, &rocks_value, &vlen);
59,648,111✔
1424
  if (code) {
59,645,564✔
1425
    tsdbError("tsdb/cache/putrocks: vgId:%d, serialize failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code));
×
1426
    TAOS_RETURN(code);
×
1427
  }
1428

1429
  rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
59,645,564✔
1430
  (void)taosThreadMutexLock(&pTsdb->rCache.writeBatchMutex);
59,645,199✔
1431
  rocksdb_writebatch_put(wb, (char *)pLastKey, ROCKS_KEY_LEN, rocks_value, vlen);
59,659,763✔
1432
  (void)taosThreadMutexUnlock(&pTsdb->rCache.writeBatchMutex);
59,585,854✔
1433

1434
  taosMemoryFree(rocks_value);
59,645,573✔
1435
#endif
1436
  TAOS_RETURN(code);
59,595,938✔
1437
}
1438

1439
static int32_t tsdbCachePutToLRU(STsdb *pTsdb, SLastKey *pLastKey, SLastCol *pLastCol, int8_t dirty) {
111,952,808✔
1440
  int32_t code = 0, lino = 0;
111,952,808✔
1441

1442
  SLastCol *pLRULastCol = taosMemoryCalloc(1, sizeof(SLastCol));
111,952,808✔
1443
  if (!pLRULastCol) {
111,911,680✔
1444
    return terrno;
×
1445
  }
1446

1447
  size_t charge = 0;
111,911,680✔
1448
  *pLRULastCol = *pLastCol;
111,911,315✔
1449
  pLRULastCol->dirty = dirty;
111,909,776✔
1450
  TAOS_CHECK_EXIT(tsdbCacheReallocSLastCol(pLRULastCol, &charge));
111,916,967✔
1451

1452
  LRUStatus status = taosLRUCacheInsert(pTsdb->lruCache, pLastKey, ROCKS_KEY_LEN, pLRULastCol, charge, tsdbCacheDeleter,
111,955,785✔
1453
                                        tsdbCacheOverWriter, NULL, TAOS_LRU_PRIORITY_LOW, pTsdb);
1454
  if (TAOS_LRU_STATUS_OK != status && TAOS_LRU_STATUS_OK_OVERWRITTEN != status) {
111,903,415✔
1455
    tsdbError("vgId:%d, %s failed at line %d status %d.", TD_VID(pTsdb->pVnode), __func__, __LINE__, status);
×
1456
    code = TSDB_CODE_FAILED;
×
1457
    pLRULastCol = NULL;
×
1458
  }
1459

1460
_exit:
111,903,415✔
1461
  if (TSDB_CODE_SUCCESS != code) {
111,903,347✔
1462
    taosMemoryFree(pLRULastCol);
×
1463
    tsdbError("tsdb/cache/putlru: vgId:%d, failed at line %d since %s.", TD_VID(pTsdb->pVnode), lino, tstrerror(code));
×
1464
  }
1465

1466
  TAOS_RETURN(code);
111,903,347✔
1467
}
1468

1469
static int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SArray *updCtxArray) {
8,792,865✔
1470
  if (!updCtxArray || TARRAY_SIZE(updCtxArray) == 0) {
8,792,865✔
1471
    TAOS_RETURN(TSDB_CODE_SUCCESS);
×
1472
  }
1473

1474
  int32_t code = 0, lino = 0;
8,792,865✔
1475

1476
  int        num_keys = TARRAY_SIZE(updCtxArray);
8,792,865✔
1477
  SArray    *remainCols = NULL;
8,792,865✔
1478
  SLRUCache *pCache = pTsdb->lruCache;
8,792,865✔
1479

1480
  (void)taosThreadMutexLock(&pTsdb->lruMutex);
8,792,865✔
1481
  for (int i = 0; i < num_keys; ++i) {
64,557,013✔
1482
    SLastUpdateCtx *updCtx = &((SLastUpdateCtx *)TARRAY_DATA(updCtxArray))[i];
55,764,880✔
1483
    int8_t          lflag = updCtx->lflag;
55,782,035✔
1484
    SRowKey        *pRowKey = &updCtx->tsdbRowKey.key;
55,782,400✔
1485
    SColVal        *pColVal = &updCtx->colVal;
55,786,780✔
1486

1487
    if (lflag == LFLAG_LAST && !COL_VAL_IS_VALUE(pColVal)) {
55,779,110✔
1488
      continue;
×
1489
    }
1490

1491
    SLastKey  *key = &(SLastKey){.lflag = lflag, .uid = uid, .cid = pColVal->cid};
55,779,110✔
1492
    LRUHandle *h = taosLRUCacheLookup(pCache, key, ROCKS_KEY_LEN);
55,789,327✔
1493
    if (h) {
55,797,713✔
1494
      SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h);
53,518,097✔
1495
      if (pLastCol->cacheStatus != TSDB_LAST_CACHE_NO_CACHE) {
53,509,703✔
1496
        int32_t cmp_res = tRowKeyCompare(&pLastCol->rowKey, pRowKey);
53,501,636✔
1497
        if (cmp_res < 0 || (cmp_res == 0 && !COL_VAL_IS_NONE(pColVal))) {
53,481,926✔
1498
          SLastCol newLastCol = {
53,478,584✔
1499
              .rowKey = *pRowKey, .colVal = *pColVal, .dirty = 1, .cacheStatus = TSDB_LAST_CACHE_VALID};
1500
          code = tsdbCachePutToLRU(pTsdb, key, &newLastCol, 1);
53,480,408✔
1501
        }
1502
      }
1503

1504
      tsdbLRUCacheRelease(pCache, h, false);
53,481,578✔
1505
      TAOS_CHECK_EXIT(code);
53,513,722✔
1506
    } else {
1507
      if (!remainCols) {
2,279,616✔
1508
        remainCols = taosArrayInit(num_keys * 2, sizeof(SIdxKey));
60,392✔
1509
        if (!remainCols) {
60,392✔
1510
          TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
1511
        }
1512
      }
1513
      if (!taosArrayPush(remainCols, &(SIdxKey){i, *key})) {
4,559,232✔
1514
        TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
1515
      }
1516
    }
1517
  }
1518

1519
  if (remainCols) {
8,792,133✔
1520
    num_keys = TARRAY_SIZE(remainCols);
60,392✔
1521
  }
1522
  if (remainCols && num_keys > 0) {
8,792,133✔
1523
    char  **keys_list = NULL;
60,392✔
1524
    size_t *keys_list_sizes = NULL;
60,392✔
1525
    char  **values_list = NULL;
60,392✔
1526
    size_t *values_list_sizes = NULL;
60,392✔
1527
    char  **errs = NULL;
60,392✔
1528
    keys_list = taosMemoryCalloc(num_keys, sizeof(char *));
60,392✔
1529
    if (!keys_list) {
60,392✔
1530
      (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
×
1531
      return terrno;
×
1532
    }
1533
    keys_list_sizes = taosMemoryCalloc(num_keys, sizeof(size_t));
60,392✔
1534
    if (!keys_list_sizes) {
60,392✔
1535
      taosMemoryFree(keys_list);
×
1536
      (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
×
1537
      return terrno;
×
1538
    }
1539
    for (int i = 0; i < num_keys; ++i) {
2,340,008✔
1540
      SIdxKey *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[i];
2,279,616✔
1541

1542
      keys_list[i] = (char *)&idxKey->key;
2,279,616✔
1543
      keys_list_sizes[i] = ROCKS_KEY_LEN;
2,279,616✔
1544
    }
1545

1546
    rocksMayWrite(pTsdb, true);  // flush writebatch cache
60,392✔
1547

1548
    code = tsdbCacheGetValuesFromRocks(pTsdb, num_keys, (const char *const *)keys_list, keys_list_sizes, &values_list,
60,392✔
1549
                                       &values_list_sizes);
1550
    if (code) {
60,392✔
1551
      taosMemoryFree(keys_list);
×
1552
      taosMemoryFree(keys_list_sizes);
×
1553
      goto _exit;
×
1554
    }
1555

1556
    // rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
1557
    for (int i = 0; i < num_keys; ++i) {
2,340,008✔
1558
      SIdxKey        *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[i];
2,279,616✔
1559
      SLastUpdateCtx *updCtx = (SLastUpdateCtx *)taosArrayGet(updCtxArray, idxKey->idx);
2,279,616✔
1560
      SRowKey        *pRowKey = &updCtx->tsdbRowKey.key;
2,279,616✔
1561
      SColVal        *pColVal = &updCtx->colVal;
2,279,616✔
1562

1563
      SLastCol *pLastCol = NULL;
2,279,616✔
1564
      if (values_list[i] != NULL) {
2,279,616✔
1565
        code = tsdbCacheDeserialize(values_list[i], values_list_sizes[i], &pLastCol);
×
1566
        if (code != TSDB_CODE_SUCCESS) {
×
1567
          tsdbError("vgId:%d, %s deserialize failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1568
                    tstrerror(code));
1569
          goto _exit;
×
1570
        }
1571
      }
1572
      /*
1573
      if (code) {
1574
        tsdbError("tsdb/cache: vgId:%d, deserialize failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code));
1575
      }
1576
      */
1577
      SLastCol *pToFree = pLastCol;
2,279,616✔
1578

1579
      if (pLastCol && pLastCol->cacheStatus == TSDB_LAST_CACHE_NO_CACHE) {
2,279,616✔
1580
        if ((code = tsdbCachePutToLRU(pTsdb, &idxKey->key, pLastCol, 0)) != TSDB_CODE_SUCCESS) {
×
1581
          tsdbError("tsdb/cache: vgId:%d, put lru failed at line %d since %s.", TD_VID(pTsdb->pVnode), lino,
×
1582
                    tstrerror(code));
1583
          taosMemoryFreeClear(pToFree);
×
1584
          break;
×
1585
        }
1586

1587
        // cache invalid => skip update
1588
        taosMemoryFreeClear(pToFree);
×
1589
        continue;
×
1590
      }
1591

1592
      if (IS_LAST_KEY(idxKey->key) && !COL_VAL_IS_VALUE(pColVal)) {
2,279,616✔
1593
        taosMemoryFreeClear(pToFree);
×
1594
        continue;
×
1595
      }
1596

1597
      int32_t cmp_res = 1;
2,279,616✔
1598
      if (pLastCol) {
2,279,616✔
1599
        cmp_res = tRowKeyCompare(&pLastCol->rowKey, pRowKey);
×
1600
      }
1601

1602
      if (NULL == pLastCol || cmp_res < 0 || (cmp_res == 0 && !COL_VAL_IS_NONE(pColVal))) {
2,279,616✔
1603
        SLastCol lastColTmp = {
2,279,616✔
1604
            .rowKey = *pRowKey, .colVal = *pColVal, .dirty = 0, .cacheStatus = TSDB_LAST_CACHE_VALID};
1605
        if ((code = tsdbCachePutToRocksdb(pTsdb, &idxKey->key, &lastColTmp)) != TSDB_CODE_SUCCESS) {
2,279,616✔
1606
          tsdbError("tsdb/cache: vgId:%d, put rocks failed at line %d since %s.", TD_VID(pTsdb->pVnode), lino,
×
1607
                    tstrerror(code));
1608
          taosMemoryFreeClear(pToFree);
×
1609
          break;
×
1610
        }
1611
        if ((code = tsdbCachePutToLRU(pTsdb, &idxKey->key, &lastColTmp, 0)) != TSDB_CODE_SUCCESS) {
2,279,616✔
1612
          tsdbError("tsdb/cache: vgId:%d, put lru failed at line %d since %s.", TD_VID(pTsdb->pVnode), lino,
×
1613
                    tstrerror(code));
1614
          taosMemoryFreeClear(pToFree);
×
1615
          break;
×
1616
        }
1617
      }
1618

1619
      taosMemoryFreeClear(pToFree);
2,279,616✔
1620
    }
1621

1622
    rocksMayWrite(pTsdb, false);
60,392✔
1623

1624
    taosMemoryFree(keys_list);
60,392✔
1625
    taosMemoryFree(keys_list_sizes);
60,392✔
1626
    if (values_list) {
60,392✔
1627
#ifdef USE_ROCKSDB
1628
      for (int i = 0; i < num_keys; ++i) {
2,340,008✔
1629
        rocksdb_free(values_list[i]);
2,279,616✔
1630
      }
1631
#endif
1632
      taosMemoryFree(values_list);
60,392✔
1633
    }
1634
    taosMemoryFree(values_list_sizes);
60,392✔
1635
  }
1636

1637
_exit:
8,790,673✔
1638
  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
8,791,768✔
1639
  taosArrayDestroy(remainCols);
8,792,498✔
1640

1641
  if (code) {
8,792,133✔
1642
    tsdbError("tsdb/cache: vgId:%d, update failed at line %d since %s.", TD_VID(pTsdb->pVnode), __LINE__,
×
1643
              tstrerror(code));
1644
  }
1645

1646
  TAOS_RETURN(code);
8,792,133✔
1647
}
1648

1649
int32_t tsdbCacheRowFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int64_t version, int32_t nRow,
×
1650
                                 SRow **aRow) {
1651
  int32_t code = 0, lino = 0;
×
1652

1653
  // 1. prepare last
1654
  TSDBROW      lRow = {.type = TSDBROW_ROW_FMT, .pTSRow = aRow[nRow - 1], .version = version};
×
1655
  STSchema    *pTSchema = NULL;
×
1656
  int32_t      sver = TSDBROW_SVERSION(&lRow);
×
1657
  SSHashObj   *iColHash = NULL;
×
1658
  STSDBRowIter iter = {0};
×
1659

1660
  TAOS_CHECK_GOTO(tsdbUpdateSkm(pTsdb, suid, uid, sver), &lino, _exit);
×
1661
  pTSchema = pTsdb->rCache.pTSchema;
×
1662

1663
  TSDBROW tRow = {.type = TSDBROW_ROW_FMT, .version = version};
×
1664
  int32_t nCol = pTSchema->numOfCols;
×
1665
  SArray *ctxArray = pTsdb->rCache.ctxArray;
×
1666

1667
  // 1. prepare by lrow
1668
  STsdbRowKey tsdbRowKey = {0};
×
1669
  tsdbRowGetKey(&lRow, &tsdbRowKey);
×
1670

1671
  TAOS_CHECK_GOTO(tsdbRowIterOpen(&iter, &lRow, pTSchema), &lino, _exit);
×
1672

1673
  int32_t iCol = 0;
×
1674
  for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal && iCol < nCol; pColVal = tsdbRowIterNext(&iter), iCol++) {
×
1675
    SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST_ROW, .tsdbRowKey = tsdbRowKey, .colVal = *pColVal};
×
1676
    if (!taosArrayPush(ctxArray, &updateCtx)) {
×
1677
      TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
1678
    }
1679

1680
    if (COL_VAL_IS_VALUE(pColVal)) {
×
1681
      updateCtx.lflag = LFLAG_LAST;
×
1682
      if (!taosArrayPush(ctxArray, &updateCtx)) {
×
1683
        TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
1684
      }
1685
    } else {
1686
      if (!iColHash) {
×
1687
        iColHash = tSimpleHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
×
1688
        if (iColHash == NULL) {
×
1689
          TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit);
×
1690
        }
1691
      }
1692

1693
      if (tSimpleHashPut(iColHash, &iCol, sizeof(iCol), NULL, 0)) {
×
1694
        TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit);
×
1695
      }
1696
    }
1697
  }
1698

1699
  // 2. prepare by the other rows
1700
  for (int32_t iRow = nRow - 2; iRow >= 0; --iRow) {
×
1701
    if (tSimpleHashGetSize(iColHash) == 0) {
×
1702
      break;
×
1703
    }
1704

1705
    tRow.pTSRow = aRow[iRow];
×
1706

1707
    STsdbRowKey tsdbRowKey = {0};
×
1708
    tsdbRowGetKey(&tRow, &tsdbRowKey);
×
1709

1710
    void   *pIte = NULL;
×
1711
    int32_t iter = 0;
×
1712
    while ((pIte = tSimpleHashIterate(iColHash, pIte, &iter)) != NULL) {
×
1713
      int32_t iCol = ((int32_t *)pIte)[0];
×
1714
      SColVal colVal = COL_VAL_NONE(0, 0);
×
1715
      tsdbRowGetColVal(&tRow, pTSchema, iCol, &colVal);
×
1716

1717
      if (COL_VAL_IS_VALUE(&colVal)) {
×
1718
        SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST, .tsdbRowKey = tsdbRowKey, .colVal = colVal};
×
1719
        if (!taosArrayPush(ctxArray, &updateCtx)) {
×
1720
          TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
1721
        }
1722
        code = tSimpleHashIterateRemove(iColHash, &iCol, sizeof(iCol), &pIte, &iter);
×
1723
        if (code != TSDB_CODE_SUCCESS) {
×
1724
          tsdbTrace("vgId:%d, %s tSimpleHashIterateRemove failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__,
×
1725
                    __LINE__, tstrerror(code));
1726
        }
1727
      }
1728
    }
1729
  }
1730

1731
  TAOS_CHECK_GOTO(tsdbCacheUpdate(pTsdb, suid, uid, ctxArray), &lino, _exit);
×
1732

1733
_exit:
×
1734
  if (code) {
×
1735
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
×
1736
  }
1737

1738
  tsdbRowClose(&iter);
×
1739
  tSimpleHashCleanup(iColHash);
×
1740
  taosArrayClear(ctxArray);
×
1741

1742
  TAOS_RETURN(code);
×
1743
}
1744

1745
int32_t tsdbCacheColFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SBlockData *pBlockData) {
×
1746
  int32_t      code = 0, lino = 0;
×
1747
  STSDBRowIter iter = {0};
×
1748
  STSchema    *pTSchema = NULL;
×
1749
  SArray      *ctxArray = NULL;
×
1750

1751
  TSDBROW lRow = tsdbRowFromBlockData(pBlockData, pBlockData->nRow - 1);
×
1752
  int32_t sver = TSDBROW_SVERSION(&lRow);
×
1753

1754
  TAOS_CHECK_RETURN(metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pTSchema));
×
1755

1756
  ctxArray = taosArrayInit(pBlockData->nColData, sizeof(SLastUpdateCtx));
×
1757
  if (ctxArray == NULL) {
×
1758
    TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
1759
  }
1760

1761
  // 1. prepare last
1762
  STsdbRowKey tsdbRowKey = {0};
×
1763
  tsdbRowGetKey(&lRow, &tsdbRowKey);
×
1764

1765
  {
1766
    SValue tsVal = {.type = TSDB_DATA_TYPE_TIMESTAMP};
×
1767
    VALUE_SET_TRIVIAL_DATUM(&tsVal, lRow.pBlockData->aTSKEY[lRow.iRow]);
×
1768
    SLastUpdateCtx updateCtx = {
×
1769
        .lflag = LFLAG_LAST, .tsdbRowKey = tsdbRowKey, .colVal = COL_VAL_VALUE(PRIMARYKEY_TIMESTAMP_COL_ID, tsVal)};
1770
    if (!taosArrayPush(ctxArray, &updateCtx)) {
×
1771
      TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
1772
    }
1773
  }
1774

1775
  TSDBROW tRow = tsdbRowFromBlockData(pBlockData, 0);
×
1776

1777
  for (int32_t iColData = 0; iColData < pBlockData->nColData; ++iColData) {
×
1778
    SColData *pColData = &pBlockData->aColData[iColData];
×
1779
    if ((pColData->flag & HAS_VALUE) != HAS_VALUE) {
×
1780
      continue;
×
1781
    }
1782

1783
    for (tRow.iRow = pBlockData->nRow - 1; tRow.iRow >= 0; --tRow.iRow) {
×
1784
      STsdbRowKey tsdbRowKey = {0};
×
1785
      tsdbRowGetKey(&tRow, &tsdbRowKey);
×
1786

1787
      uint8_t colType = tColDataGetBitValue(pColData, tRow.iRow);
×
1788
      if (colType == 2) {
×
1789
        SColVal colVal = COL_VAL_NONE(pColData->cid, pColData->type);
×
1790
        TAOS_CHECK_GOTO(tColDataGetValue(pColData, tRow.iRow, &colVal), &lino, _exit);
×
1791

1792
        SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST, .tsdbRowKey = tsdbRowKey, .colVal = colVal};
×
1793
        if (!taosArrayPush(ctxArray, &updateCtx)) {
×
1794
          TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
1795
        }
1796
        break;
×
1797
      }
1798
    }
1799
  }
1800

1801
  // 2. prepare last row
1802
  TAOS_CHECK_GOTO(tsdbRowIterOpen(&iter, &lRow, pTSchema), &lino, _exit);
×
1803
  for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal; pColVal = tsdbRowIterNext(&iter)) {
×
1804
    SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST_ROW, .tsdbRowKey = tsdbRowKey, .colVal = *pColVal};
×
1805
    if (!taosArrayPush(ctxArray, &updateCtx)) {
×
1806
      TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
1807
    }
1808
  }
1809

1810
  TAOS_CHECK_GOTO(tsdbCacheUpdate(pTsdb, suid, uid, ctxArray), &lino, _exit);
×
1811

1812
_exit:
×
1813
  tsdbRowClose(&iter);
×
1814
  taosMemoryFreeClear(pTSchema);
×
1815
  taosArrayDestroy(ctxArray);
×
1816

1817
  TAOS_RETURN(code);
×
1818
}
1819

1820
static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols,
1821
                            int nCols, int16_t *slotIds);
1822

1823
static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols,
1824
                               int nCols, int16_t *slotIds);
1825

1826
static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SArray *remainCols,
75,208✔
1827
                                    SCacheRowsReader *pr, int8_t ltype) {
1828
  int32_t code = 0, lino = 0;
75,208✔
1829
  // rocksdb_writebatch_t *wb = NULL;
1830
  SArray *pTmpColArray = NULL;
75,208✔
1831
  bool    extraTS = false;
75,208✔
1832

1833
  SIdxKey *idxKey = taosArrayGet(remainCols, 0);
75,208✔
1834
  if (idxKey->key.cid != PRIMARYKEY_TIMESTAMP_COL_ID) {
75,208✔
1835
    // ignore 'ts' loaded from cache and load it from tsdb
1836
    // SLastCol *pLastCol = taosArrayGet(pLastArray, 0);
1837
    // tsdbCacheUpdateLastColToNone(pLastCol, TSDB_LAST_CACHE_NO_CACHE);
1838

1839
    SLastKey *key = &(SLastKey){.lflag = ltype, .uid = uid, .cid = PRIMARYKEY_TIMESTAMP_COL_ID};
×
1840
    if (!taosArrayInsert(remainCols, 0, &(SIdxKey){0, *key})) {
×
1841
      TAOS_RETURN(terrno);
×
1842
    }
1843

1844
    extraTS = true;
×
1845
  }
1846

1847
  int      num_keys = TARRAY_SIZE(remainCols);
75,208✔
1848
  int16_t *slotIds = taosMemoryMalloc(num_keys * sizeof(int16_t));
75,208✔
1849

1850
  int16_t *lastColIds = NULL, *lastSlotIds = NULL, *lastrowColIds = NULL, *lastrowSlotIds = NULL;
75,208✔
1851
  lastColIds = taosMemoryMalloc(num_keys * sizeof(int16_t));
75,208✔
1852
  lastSlotIds = taosMemoryMalloc(num_keys * sizeof(int16_t));
75,208✔
1853
  lastrowColIds = taosMemoryMalloc(num_keys * sizeof(int16_t));
75,208✔
1854
  lastrowSlotIds = taosMemoryMalloc(num_keys * sizeof(int16_t));
75,208✔
1855
  SArray *lastTmpColArray = NULL, *lastTmpIndexArray = NULL, *lastrowTmpColArray = NULL, *lastrowTmpIndexArray = NULL;
75,208✔
1856

1857
  int lastIndex = 0;
75,208✔
1858
  int lastrowIndex = 0;
75,208✔
1859

1860
  if (!slotIds || !lastColIds || !lastSlotIds || !lastrowColIds || !lastrowSlotIds) {
75,208✔
1861
    TAOS_CHECK_EXIT(terrno);
×
1862
  }
1863

1864
  for (int i = 0; i < num_keys; ++i) {
275,686✔
1865
    SIdxKey *idxKey = taosArrayGet(remainCols, i);
200,478✔
1866
    if (extraTS && !i) {
200,478✔
1867
      slotIds[i] = 0;
×
1868
    } else {
1869
      slotIds[i] = pr->pSlotIds[idxKey->idx];
200,478✔
1870
    }
1871

1872
    if (IS_LAST_KEY(idxKey->key)) {
200,478✔
1873
      if (NULL == lastTmpIndexArray) {
162,108✔
1874
        lastTmpIndexArray = taosArrayInit(num_keys, sizeof(int32_t));
66,650✔
1875
        if (!lastTmpIndexArray) {
66,650✔
1876
          TAOS_CHECK_EXIT(terrno);
×
1877
        }
1878
      }
1879
      if (!taosArrayPush(lastTmpIndexArray, &(i))) {
162,108✔
1880
        TAOS_CHECK_EXIT(terrno);
×
1881
      }
1882
      lastColIds[lastIndex] = idxKey->key.cid;
162,108✔
1883
      if (extraTS && !i) {
162,108✔
1884
        lastSlotIds[lastIndex] = 0;
×
1885
      } else {
1886
        lastSlotIds[lastIndex] = pr->pSlotIds[idxKey->idx];
162,108✔
1887
      }
1888
      lastIndex++;
162,108✔
1889
    } else {
1890
      if (NULL == lastrowTmpIndexArray) {
38,370✔
1891
        lastrowTmpIndexArray = taosArrayInit(num_keys, sizeof(int32_t));
8,558✔
1892
        if (!lastrowTmpIndexArray) {
8,558✔
1893
          TAOS_CHECK_EXIT(terrno);
×
1894
        }
1895
      }
1896
      if (!taosArrayPush(lastrowTmpIndexArray, &(i))) {
38,370✔
1897
        TAOS_CHECK_EXIT(terrno);
×
1898
      }
1899
      lastrowColIds[lastrowIndex] = idxKey->key.cid;
38,370✔
1900
      if (extraTS && !i) {
38,370✔
1901
        lastrowSlotIds[lastrowIndex] = 0;
×
1902
      } else {
1903
        lastrowSlotIds[lastrowIndex] = pr->pSlotIds[idxKey->idx];
38,370✔
1904
      }
1905
      lastrowIndex++;
38,370✔
1906
    }
1907
  }
1908

1909
  pTmpColArray = taosArrayInit(lastIndex + lastrowIndex, sizeof(SLastCol));
75,208✔
1910
  if (!pTmpColArray) {
75,208✔
1911
    TAOS_CHECK_EXIT(terrno);
×
1912
  }
1913

1914
  if (lastTmpIndexArray != NULL) {
75,208✔
1915
    TAOS_CHECK_EXIT(mergeLastCid(uid, pTsdb, &lastTmpColArray, pr, lastColIds, lastIndex, lastSlotIds));
66,650✔
1916
    for (int i = 0; i < taosArrayGetSize(lastTmpColArray); i++) {
220,665✔
1917
      if (!taosArrayInsert(pTmpColArray, *(int32_t *)taosArrayGet(lastTmpIndexArray, i),
154,015✔
1918
                           taosArrayGet(lastTmpColArray, i))) {
154,015✔
1919
        TAOS_CHECK_EXIT(terrno);
×
1920
      }
1921
    }
1922
  }
1923

1924
  if (lastrowTmpIndexArray != NULL) {
75,208✔
1925
    TAOS_CHECK_EXIT(mergeLastRowCid(uid, pTsdb, &lastrowTmpColArray, pr, lastrowColIds, lastrowIndex, lastrowSlotIds));
8,558✔
1926
    for (int i = 0; i < taosArrayGetSize(lastrowTmpColArray); i++) {
42,995✔
1927
      if (!taosArrayInsert(pTmpColArray, *(int32_t *)taosArrayGet(lastrowTmpIndexArray, i),
34,437✔
1928
                           taosArrayGet(lastrowTmpColArray, i))) {
34,437✔
1929
        TAOS_CHECK_EXIT(terrno);
×
1930
      }
1931
    }
1932
  }
1933

1934
  SLRUCache *pCache = pTsdb->lruCache;
75,208✔
1935
  for (int i = 0; i < num_keys; ++i) {
275,686✔
1936
    SIdxKey  *idxKey = taosArrayGet(remainCols, i);
200,478✔
1937
    SLastCol *pLastCol = NULL;
200,478✔
1938

1939
    if (pTmpColArray && TARRAY_SIZE(pTmpColArray) >= i + 1) {
200,478✔
1940
      pLastCol = taosArrayGet(pTmpColArray, i);
188,452✔
1941
    }
1942

1943
    // still null, then make up a none col value
1944
    SLastCol noneCol = {.rowKey.ts = TSKEY_MIN,
200,478✔
1945
                        .colVal = COL_VAL_NONE(idxKey->key.cid, pr->pSchema->columns[slotIds[i]].type),
200,478✔
1946
                        .cacheStatus = TSDB_LAST_CACHE_VALID};
1947
    if (!pLastCol) {
200,478✔
1948
      pLastCol = &noneCol;
12,026✔
1949
    }
1950

1951
    if (!extraTS || i > 0) {
200,478✔
1952
      tsdbDebug("vgId:%d, %s qid:%s uid:%" PRId64 " from tsdb, col_id:%d col_flag:%d ts:%" PRId64,
200,478✔
1953
                TD_VID(pTsdb->pVnode), __func__, pr && pr->idstr ? pr->idstr : "null", uid, pLastCol->colVal.cid,
1954
                pLastCol->colVal.flag, pLastCol->rowKey.ts);
1955
      taosArraySet(pLastArray, idxKey->idx, pLastCol);
200,478✔
1956
    }
1957

1958
    // taosArrayRemove(remainCols, i);
1959

1960
    if (/*!pTmpColArray*/ lastTmpIndexArray && !lastTmpColArray) {
200,478✔
1961
      continue;
×
1962
    }
1963
    if (/*!pTmpColArray*/ lastrowTmpIndexArray && !lastrowTmpColArray) {
200,478✔
1964
      continue;
×
1965
    }
1966

1967
    // store result back to rocks cache
1968
    code = tsdbCachePutToRocksdb(pTsdb, &idxKey->key, pLastCol);
200,478✔
1969
    if (code) {
200,478✔
1970
      tsdbError("vgId:%d, %s failed at line %d since %s.", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
×
1971
      TAOS_CHECK_EXIT(code);
×
1972
    }
1973

1974
    code = tsdbCachePutToLRU(pTsdb, &idxKey->key, pLastCol, 0);
200,478✔
1975
    if (code) {
200,478✔
1976
      tsdbError("vgId:%d, %s failed at line %d since %s.", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
×
1977
      TAOS_CHECK_EXIT(code);
×
1978
    }
1979

1980
    if (extraTS && i == 0) {
200,478✔
1981
      tsdbCacheFreeSLastColItem(pLastCol);
×
1982
    }
1983
  }
1984

1985
  rocksMayWrite(pTsdb, false);
75,208✔
1986

1987
_exit:
75,208✔
1988
  taosArrayDestroy(lastrowTmpIndexArray);
75,208✔
1989
  taosArrayDestroy(lastrowTmpColArray);
75,208✔
1990
  taosArrayDestroy(lastTmpIndexArray);
75,208✔
1991
  taosArrayDestroy(lastTmpColArray);
75,208✔
1992

1993
  taosMemoryFree(lastColIds);
75,208✔
1994
  taosMemoryFree(lastSlotIds);
75,208✔
1995
  taosMemoryFree(lastrowColIds);
75,208✔
1996
  taosMemoryFree(lastrowSlotIds);
75,208✔
1997

1998
  taosArrayDestroy(pTmpColArray);
75,208✔
1999

2000
  taosMemoryFree(slotIds);
75,208✔
2001

2002
  TAOS_RETURN(code);
75,208✔
2003
}
2004

2005
static int32_t tsdbCacheLoadFromRocks(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SArray *remainCols,
102,734✔
2006
                                      SArray *ignoreFromRocks, SCacheRowsReader *pr, int8_t ltype) {
2007
  int32_t code = 0, lino = 0;
102,734✔
2008
  int     num_keys = TARRAY_SIZE(remainCols);
102,734✔
2009
  char  **keys_list = taosMemoryMalloc(num_keys * sizeof(char *));
102,734✔
2010
  size_t *keys_list_sizes = taosMemoryMalloc(num_keys * sizeof(size_t));
102,734✔
2011
  char   *key_list = taosMemoryMalloc(num_keys * ROCKS_KEY_LEN);
102,734✔
2012
  if (!keys_list || !keys_list_sizes || !key_list) {
102,734✔
UNCOV
2013
    taosMemoryFree(keys_list);
×
2014
    taosMemoryFree(keys_list_sizes);
×
2015
    TAOS_RETURN(terrno);
×
2016
  }
2017
  char  **values_list = NULL;
102,734✔
2018
  size_t *values_list_sizes = NULL;
102,734✔
2019
  for (int i = 0; i < num_keys; ++i) {
365,042✔
2020
    memcpy(key_list + i * ROCKS_KEY_LEN, &((SIdxKey *)taosArrayGet(remainCols, i))->key, ROCKS_KEY_LEN);
262,308✔
2021
    keys_list[i] = key_list + i * ROCKS_KEY_LEN;
262,308✔
2022
    keys_list_sizes[i] = ROCKS_KEY_LEN;
262,308✔
2023
  }
2024

2025
  rocksMayWrite(pTsdb, true);  // flush writebatch cache
102,734✔
2026

2027
  code = tsdbCacheGetValuesFromRocks(pTsdb, num_keys, (const char *const *)keys_list, keys_list_sizes, &values_list,
102,734✔
2028
                                     &values_list_sizes);
2029
  if (code) {
102,240✔
2030
    taosMemoryFree(key_list);
×
2031
    taosMemoryFree(keys_list);
×
2032
    taosMemoryFree(keys_list_sizes);
×
2033
    TAOS_RETURN(code);
×
2034
  }
2035

2036
  SLRUCache *pCache = pTsdb->lruCache;
102,240✔
2037
  for (int i = 0, j = 0; i < num_keys && j < TARRAY_SIZE(remainCols); ++i) {
364,548✔
2038
    SLastCol *pLastCol = NULL;
261,814✔
2039
    bool      ignore = ((bool *)TARRAY_DATA(ignoreFromRocks))[i];
261,814✔
2040
    if (ignore) {
261,814✔
2041
      ++j;
624✔
2042
      continue;
624✔
2043
    }
2044

2045
    if (values_list[i] != NULL) {
261,190✔
2046
      code = tsdbCacheDeserialize(values_list[i], values_list_sizes[i], &pLastCol);
61,830✔
2047
      if (code != TSDB_CODE_SUCCESS) {
61,830✔
2048
        tsdbError("vgId:%d, %s deserialize failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
2049
                  tstrerror(code));
2050
        goto _exit;
×
2051
      }
2052
    }
2053
    SLastCol *pToFree = pLastCol;
261,190✔
2054
    SIdxKey  *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[j];
261,190✔
2055
    if (pLastCol && pLastCol->cacheStatus != TSDB_LAST_CACHE_NO_CACHE) {
323,514✔
2056
      code = tsdbCachePutToLRU(pTsdb, &idxKey->key, pLastCol, 0);
61,830✔
2057
      if (code) {
61,830✔
2058
        tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
×
2059
        taosMemoryFreeClear(pToFree);
×
2060
        TAOS_CHECK_EXIT(code);
×
2061
      }
2062

2063
      SLastCol lastCol = *pLastCol;
61,830✔
2064
      code = tsdbCacheReallocSLastCol(&lastCol, NULL);
61,830✔
2065
      if (TSDB_CODE_SUCCESS != code) {
61,830✔
2066
        taosMemoryFreeClear(pToFree);
×
2067
        TAOS_CHECK_EXIT(code);
×
2068
      }
2069

2070
      tsdbDebug("vgId:%d, %s qid:%s uid:%" PRId64 " from rocksdb, col_id:%d col_flag:%d ts:%" PRId64,
61,830✔
2071
                TD_VID(pTsdb->pVnode), __func__, pr && pr->idstr ? pr->idstr : "null", uid, lastCol.colVal.cid,
2072
                lastCol.colVal.flag, lastCol.rowKey.ts);
2073

2074
      taosArraySet(pLastArray, idxKey->idx, &lastCol);
61,830✔
2075
      taosArrayRemove(remainCols, j);
61,830✔
2076
      taosArrayRemove(ignoreFromRocks, j);
61,830✔
2077
    } else {
2078
      ++j;
199,854✔
2079
    }
2080

2081
    taosMemoryFreeClear(pToFree);
261,684✔
2082
  }
2083

2084
  if (TARRAY_SIZE(remainCols) > 0) {
102,734✔
2085
    // tsdbTrace("tsdb/cache: vgId: %d, load %" PRId64 " from raw", TD_VID(pTsdb->pVnode), uid);
2086
    code = tsdbCacheLoadFromRaw(pTsdb, uid, pLastArray, remainCols, pr, ltype);
75,208✔
2087
  }
2088

2089
_exit:
102,734✔
2090
  taosMemoryFree(key_list);
102,734✔
2091
  taosMemoryFree(keys_list);
102,734✔
2092
  taosMemoryFree(keys_list_sizes);
102,734✔
2093
  if (values_list) {
102,734✔
2094
#ifdef USE_ROCKSDB
2095
    for (int i = 0; i < num_keys; ++i) {
365,042✔
2096
      rocksdb_free(values_list[i]);
262,308✔
2097
    }
2098
#endif
2099
    taosMemoryFree(values_list);
102,734✔
2100
  }
2101
  taosMemoryFree(values_list_sizes);
102,734✔
2102

2103
  TAOS_RETURN(code);
102,734✔
2104
}
2105

2106
static int32_t tsdbCacheGetBatchFromLru(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr,
1,723,165✔
2107
                                        int8_t ltype, SArray *keyArray) {
2108
  int32_t    code = 0, lino = 0;
1,723,165✔
2109
  SArray    *remainCols = NULL;
1,723,165✔
2110
  SArray    *ignoreFromRocks = NULL;
1,723,165✔
2111
  SLRUCache *pCache = pTsdb->lruCache;
1,723,165✔
2112
  SArray    *pCidList = pr->pCidList;
1,723,655✔
2113
  int        numKeys = TARRAY_SIZE(pCidList);
1,723,655✔
2114

2115
  for (int i = 0; i < numKeys; ++i) {
6,303,738✔
2116
    int16_t cid = ((int16_t *)TARRAY_DATA(pCidList))[i];
4,580,083✔
2117

2118
    SLastKey key = {.lflag = ltype, .uid = uid, .cid = cid};
4,580,573✔
2119
    // for select last_row, last case
2120
    int32_t funcType = FUNCTION_TYPE_CACHE_LAST;
4,580,573✔
2121
    if (pr->pFuncTypeList != NULL && taosArrayGetSize(pr->pFuncTypeList) > i) {
4,580,573✔
2122
      funcType = ((int32_t *)TARRAY_DATA(pr->pFuncTypeList))[i];
×
2123
    }
2124
    if (((pr->type & CACHESCAN_RETRIEVE_LAST) == CACHESCAN_RETRIEVE_LAST) && FUNCTION_TYPE_CACHE_LAST_ROW == funcType) {
4,579,585✔
2125
      int8_t tempType = CACHESCAN_RETRIEVE_LAST_ROW | (pr->type ^ CACHESCAN_RETRIEVE_LAST);
×
2126
      key.lflag = (tempType & CACHESCAN_RETRIEVE_LAST) >> 3;
×
2127
    }
2128

2129
    if (!taosArrayPush(keyArray, &key)) {
4,580,573✔
2130
      TAOS_CHECK_EXIT(terrno);
×
2131
    }
2132

2133
    LRUHandle *h = taosLRUCacheLookup(pCache, &key, ROCKS_KEY_LEN);
4,580,573✔
2134
    SLastCol  *pLastCol = h ? (SLastCol *)taosLRUCacheValue(pCache, h) : NULL;
4,580,573✔
2135
    if (h && pLastCol->cacheStatus != TSDB_LAST_CACHE_NO_CACHE) {
8,898,348✔
2136
      SLastCol lastCol = *pLastCol;
4,317,775✔
2137
      if (TSDB_CODE_SUCCESS != (code = tsdbCacheReallocSLastCol(&lastCol, NULL))) {
4,318,265✔
2138
        tsdbLRUCacheRelease(pCache, h, false);
×
2139
        TAOS_CHECK_GOTO(code, NULL, _exit);
×
2140
      }
2141

2142
      tsdbDebug("vgId:%d, %s qid:%s uid:%" PRId64 " from lru, col_id:%d col_flag:%d ts:%" PRId64, TD_VID(pTsdb->pVnode),
4,318,265✔
2143
                __func__, pr && pr->idstr ? pr->idstr : "null", uid, lastCol.colVal.cid, lastCol.colVal.flag,
2144
                lastCol.rowKey.ts);
2145

2146
      if (taosArrayPush(pLastArray, &lastCol) == NULL) {
4,317,775✔
2147
        code = terrno;
×
2148
        tsdbLRUCacheRelease(pCache, h, false);
×
2149
        goto _exit;
×
2150
      }
2151
    } else {
2152
      // no cache or cache is invalid
2153
      SLastCol noneCol = {.rowKey.ts = TSKEY_MIN,
262,798✔
2154
                          .colVal = COL_VAL_NONE(cid, pr->pSchema->columns[pr->pSlotIds[i]].type)};
262,308✔
2155

2156
      if (taosArrayPush(pLastArray, &noneCol) == NULL) {
262,308✔
2157
        code = terrno;
×
2158
        tsdbLRUCacheRelease(pCache, h, false);
×
2159
        goto _exit;
×
2160
      }
2161

2162
      if (!remainCols) {
262,308✔
2163
        if ((remainCols = taosArrayInit(numKeys, sizeof(SIdxKey))) == NULL) {
102,734✔
2164
          code = terrno;
×
2165
          tsdbLRUCacheRelease(pCache, h, false);
×
2166
          goto _exit;
×
2167
        }
2168
      }
2169
      if (!ignoreFromRocks) {
262,308✔
2170
        if ((ignoreFromRocks = taosArrayInit(numKeys, sizeof(bool))) == NULL) {
102,734✔
2171
          code = terrno;
×
2172
          tsdbLRUCacheRelease(pCache, h, false);
×
2173
          goto _exit;
×
2174
        }
2175
      }
2176
      if (taosArrayPush(remainCols, &(SIdxKey){i, key}) == NULL) {
524,616✔
2177
        code = terrno;
×
2178
        tsdbLRUCacheRelease(pCache, h, false);
×
2179
        goto _exit;
×
2180
      }
2181
      bool ignoreRocks = pLastCol ? (pLastCol->cacheStatus == TSDB_LAST_CACHE_NO_CACHE) : false;
262,308✔
2182
      if (taosArrayPush(ignoreFromRocks, &ignoreRocks) == NULL) {
262,308✔
2183
        code = terrno;
×
2184
        tsdbLRUCacheRelease(pCache, h, false);
×
2185
        goto _exit;
×
2186
      }
2187
    }
2188

2189
    if (h) {
4,580,083✔
2190
      tsdbLRUCacheRelease(pCache, h, false);
4,318,399✔
2191
    }
2192
  }
2193

2194
  if (remainCols && TARRAY_SIZE(remainCols) > 0) {
1,723,655✔
2195
    (void)taosThreadMutexLock(&pTsdb->lruMutex);
102,734✔
2196

2197
    for (int i = 0; i < TARRAY_SIZE(remainCols);) {
365,042✔
2198
      SIdxKey   *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[i];
262,308✔
2199
      LRUHandle *h = taosLRUCacheLookup(pCache, &idxKey->key, ROCKS_KEY_LEN);
262,308✔
2200
      SLastCol  *pLastCol = h ? (SLastCol *)taosLRUCacheValue(pCache, h) : NULL;
262,308✔
2201
      if (h && pLastCol->cacheStatus != TSDB_LAST_CACHE_NO_CACHE) {
262,308✔
2202
        SLastCol lastCol = *pLastCol;
×
2203
        code = tsdbCacheReallocSLastCol(&lastCol, NULL);
×
2204
        if (code) {
×
2205
          tsdbLRUCacheRelease(pCache, h, false);
×
2206
          (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
×
2207
          TAOS_RETURN(code);
×
2208
        }
2209

2210
        tsdbDebug("vgId:%d, %s qid:%s uid:%" PRId64 " from lru(2nd lookup), col_id:%d col_flag:%d ts:%" PRId64,
×
2211
                  TD_VID(pTsdb->pVnode), __func__, pr && pr->idstr ? pr->idstr : "null", uid, lastCol.colVal.cid,
2212
                  lastCol.colVal.flag, lastCol.rowKey.ts);
2213

2214
        taosArraySet(pLastArray, idxKey->idx, &lastCol);
×
2215

2216
        taosArrayRemove(remainCols, i);
×
2217
        taosArrayRemove(ignoreFromRocks, i);
×
2218
      } else {
2219
        // no cache or cache is invalid
2220
        ++i;
262,308✔
2221
      }
2222
      if (h) {
262,308✔
2223
        tsdbLRUCacheRelease(pCache, h, false);
624✔
2224
      }
2225
    }
2226

2227
    // tsdbTrace("tsdb/cache: vgId: %d, load %" PRId64 " from rocks", TD_VID(pTsdb->pVnode), uid);
2228
    code = tsdbCacheLoadFromRocks(pTsdb, uid, pLastArray, remainCols, ignoreFromRocks, pr, ltype);
102,734✔
2229

2230
    (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
102,734✔
2231
  }
2232

2233
_exit:
1,620,921✔
2234
  if (remainCols) {
1,723,165✔
2235
    taosArrayDestroy(remainCols);
102,734✔
2236
  }
2237
  if (ignoreFromRocks) {
1,723,165✔
2238
    taosArrayDestroy(ignoreFromRocks);
102,734✔
2239
  }
2240

2241
  TAOS_RETURN(code);
1,723,165✔
2242
}
2243

2244
typedef enum SMEMNEXTROWSTATES {
2245
  SMEMNEXTROW_ENTER,
2246
  SMEMNEXTROW_NEXT,
2247
} SMEMNEXTROWSTATES;
2248

2249
typedef struct SMemNextRowIter {
2250
  SMEMNEXTROWSTATES state;
2251
  STbData          *pMem;  // [input]
2252
  STbDataIter       iter;  // mem buffer skip list iterator
2253
  int64_t           lastTs;
2254
} SMemNextRowIter;
2255

2256
static int32_t getNextRowFromMem(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast, int16_t *aCols,
1,723,493✔
2257
                                 int nCols) {
2258
  SMemNextRowIter *state = (SMemNextRowIter *)iter;
1,723,493✔
2259
  int32_t          code = 0;
1,723,493✔
2260
  *pIgnoreEarlierTs = false;
1,723,493✔
2261
  switch (state->state) {
1,723,983✔
2262
    case SMEMNEXTROW_ENTER: {
1,510,709✔
2263
      if (state->pMem != NULL) {
1,510,709✔
2264
        /*
2265
        if (state->pMem->maxKey <= state->lastTs) {
2266
          *ppRow = NULL;
2267
          *pIgnoreEarlierTs = true;
2268

2269
          TAOS_RETURN(code);
2270
        }
2271
        */
2272
        tsdbTbDataIterOpen(state->pMem, NULL, 1, &state->iter);
1,510,709✔
2273

2274
        TSDBROW *pMemRow = tsdbTbDataIterGet(&state->iter);
1,510,709✔
2275
        if (pMemRow) {
1,510,709✔
2276
          *ppRow = pMemRow;
1,511,199✔
2277
          state->state = SMEMNEXTROW_NEXT;
1,511,199✔
2278

2279
          TAOS_RETURN(code);
1,511,199✔
2280
        }
2281
      }
2282

2283
      *ppRow = NULL;
×
2284

2285
      TAOS_RETURN(code);
×
2286
    }
2287
    case SMEMNEXTROW_NEXT:
212,784✔
2288
      if (tsdbTbDataIterNext(&state->iter)) {
212,784✔
2289
        *ppRow = tsdbTbDataIterGet(&state->iter);
295,884✔
2290

2291
        TAOS_RETURN(code);
147,942✔
2292
      } else {
2293
        *ppRow = NULL;
64,842✔
2294

2295
        TAOS_RETURN(code);
64,842✔
2296
      }
2297
    default:
×
2298
      break;
×
2299
  }
2300

2301
_err:
×
2302
  *ppRow = NULL;
×
2303

2304
  TAOS_RETURN(code);
×
2305
}
2306

2307
typedef int32_t (*_next_row_fn_t)(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast, int16_t *aCols,
2308
                                  int nCols);
2309
typedef int32_t (*_next_row_clear_fn_t)(void *iter);
2310

2311
typedef struct {
2312
  TSDBROW             *pRow;
2313
  bool                 stop;
2314
  bool                 next;
2315
  bool                 ignoreEarlierTs;
2316
  void                *iter;
2317
  _next_row_fn_t       nextRowFn;
2318
  _next_row_clear_fn_t nextRowClearFn;
2319
} TsdbNextRowState;
2320

2321
typedef struct {
2322
  SArray           *pMemDelData;
2323
  SArray           *pSkyline;
2324
  int64_t           iSkyline;
2325
  SBlockIdx         idx;
2326
  SMemNextRowIter   memState;
2327
  SMemNextRowIter   imemState;
2328
  TSDBROW           memRow, imemRow;
2329
  TsdbNextRowState  input[2];
2330
  SCacheRowsReader *pr;
2331
  STsdb            *pTsdb;
2332
} MemNextRowIter;
2333

2334
static int32_t memRowIterOpen(MemNextRowIter *pIter, tb_uid_t uid, STsdb *pTsdb, STSchema *pTSchema, tb_uid_t suid,
1,723,165✔
2335
                              STsdbReadSnap *pReadSnap, SCacheRowsReader *pr) {
2336
  int32_t code = 0, lino = 0;
1,723,165✔
2337

2338
  STbData *pMem = NULL;
1,723,165✔
2339
  if (pReadSnap->pMem) {
1,723,165✔
2340
    pMem = tsdbGetTbDataFromMemTable(pReadSnap->pMem, suid, uid);
1,723,165✔
2341
  }
2342

2343
  STbData *pIMem = NULL;
1,723,655✔
2344
  if (pReadSnap->pIMem) {
1,723,655✔
2345
    pIMem = tsdbGetTbDataFromMemTable(pReadSnap->pIMem, suid, uid);
5,437✔
2346
  }
2347

2348
  pIter->pTsdb = pTsdb;
1,723,655✔
2349

2350
  pIter->pMemDelData = NULL;
1,723,165✔
2351

2352
  TAOS_CHECK_GOTO(loadMemTombData(&pIter->pMemDelData, pMem, pIMem, pr->info.verRange.maxVer), &lino, _exit);
1,723,165✔
2353

2354
  pIter->idx = (SBlockIdx){.suid = suid, .uid = uid};
1,723,165✔
2355

2356
  pIter->input[0] = (TsdbNextRowState){&pIter->memRow, true, false, false, &pIter->memState, getNextRowFromMem, NULL};
1,723,165✔
2357
  pIter->input[1] = (TsdbNextRowState){&pIter->imemRow, true, false, false, &pIter->imemState, getNextRowFromMem, NULL};
1,723,165✔
2358

2359
  if (pMem) {
1,723,165✔
2360
    pIter->memState.pMem = pMem;
1,437,620✔
2361
    pIter->memState.state = SMEMNEXTROW_ENTER;
1,437,620✔
2362
    pIter->input[0].stop = false;
1,437,620✔
2363
    pIter->input[0].next = true;
1,436,794✔
2364
  }
2365

2366
  if (pIMem) {
1,723,165✔
2367
    pIter->imemState.pMem = pIMem;
5,437✔
2368
    pIter->imemState.state = SMEMNEXTROW_ENTER;
5,437✔
2369
    pIter->input[1].stop = false;
5,437✔
2370
    pIter->input[1].next = true;
5,437✔
2371
  }
2372

2373
  pIter->pr = pr;
1,723,165✔
2374

2375
_exit:
1,723,165✔
2376
  if (code) {
1,723,165✔
2377
    tsdbError("tsdb/cache: %s failed at line %d since %s.", __func__, lino, tstrerror(code));
×
2378
  }
2379

2380
  TAOS_RETURN(code);
1,723,165✔
2381
}
2382

2383
static void memRowIterClose(MemNextRowIter *pIter) {
1,723,165✔
2384
  for (int i = 0; i < 2; ++i) {
5,169,458✔
2385
    if (pIter->input[i].nextRowClearFn) {
3,446,820✔
2386
      (void)pIter->input[i].nextRowClearFn(pIter->input[i].iter);
×
2387
    }
2388
  }
2389

2390
  if (pIter->pSkyline) {
1,722,638✔
2391
    taosArrayDestroy(pIter->pSkyline);
1,438,565✔
2392
  }
2393

2394
  if (pIter->pMemDelData) {
1,723,655✔
2395
    taosArrayDestroy(pIter->pMemDelData);
1,723,655✔
2396
  }
2397
}
1,723,165✔
2398

2399
static void freeTableInfoFunc(void *param) {
1,442,083✔
2400
  void **p = (void **)param;
1,442,083✔
2401
  taosMemoryFreeClear(*p);
1,442,083✔
2402
}
1,442,083✔
2403

2404
static STableLoadInfo *getTableLoadInfo(SCacheRowsReader *pReader, uint64_t uid) {
1,525,324✔
2405
  if (!pReader->pTableMap) {
1,525,324✔
2406
    pReader->pTableMap = tSimpleHashInit(pReader->numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
930,465✔
2407
    if (!pReader->pTableMap) {
930,963✔
2408
      return NULL;
×
2409
    }
2410

2411
    tSimpleHashSetFreeFp(pReader->pTableMap, freeTableInfoFunc);
930,963✔
2412
  }
2413

2414
  STableLoadInfo  *pInfo = NULL;
1,525,831✔
2415
  STableLoadInfo **ppInfo = tSimpleHashGet(pReader->pTableMap, &uid, sizeof(uid));
1,525,814✔
2416
  if (!ppInfo) {
1,524,826✔
2417
    pInfo = taosMemoryCalloc(1, sizeof(STableLoadInfo));
1,440,614✔
2418
    if (pInfo) {
1,442,083✔
2419
      if (tSimpleHashPut(pReader->pTableMap, &uid, sizeof(uint64_t), &pInfo, POINTER_BYTES)) {
1,442,083✔
2420
        return NULL;
×
2421
      }
2422
    }
2423

2424
    return pInfo;
1,441,095✔
2425
  }
2426

2427
  return *ppInfo;
84,212✔
2428
}
2429

2430
static TSDBROW *memRowIterGet(MemNextRowIter *pIter, bool isLast, int16_t *aCols, int nCols) {
1,923,993✔
2431
  int32_t code = 0, lino = 0;
1,923,993✔
2432

2433
  for (;;) {
5,880✔
2434
    for (int i = 0; i < 2; ++i) {
5,788,623✔
2435
      if (pIter->input[i].next && !pIter->input[i].stop) {
3,859,248✔
2436
        TAOS_CHECK_GOTO(pIter->input[i].nextRowFn(pIter->input[i].iter, &pIter->input[i].pRow,
1,649,765✔
2437
                                                  &pIter->input[i].ignoreEarlierTs, isLast, aCols, nCols),
2438
                        &lino, _exit);
2439

2440
        if (pIter->input[i].pRow == NULL) {
1,650,255✔
2441
          pIter->input[i].stop = true;
61,256✔
2442
          pIter->input[i].next = false;
61,256✔
2443
        }
2444
      }
2445
    }
2446

2447
    if (pIter->input[0].stop && pIter->input[1].stop) {
1,929,375✔
2448
      return NULL;
346,346✔
2449
    }
2450

2451
    TSDBROW *max[2] = {0};
1,583,029✔
2452
    int      iMax[2] = {-1, -1};
1,583,519✔
2453
    int      nMax = 0;
1,584,017✔
2454
    SRowKey  maxKey = {.ts = TSKEY_MIN};
1,584,017✔
2455

2456
    for (int i = 0; i < 2; ++i) {
4,749,586✔
2457
      if (!pIter->input[i].stop && pIter->input[i].pRow != NULL) {
3,166,548✔
2458
        STsdbRowKey tsdbRowKey = {0};
1,588,011✔
2459
        tsdbRowGetKey(pIter->input[i].pRow, &tsdbRowKey);
1,588,011✔
2460

2461
        // merging & deduplicating on client side
2462
        int c = tRowKeyCompare(&maxKey, &tsdbRowKey.key);
1,588,999✔
2463
        if (c <= 0) {
1,588,011✔
2464
          if (c < 0) {
1,583,029✔
2465
            nMax = 0;
1,583,029✔
2466
            maxKey = tsdbRowKey.key;
1,583,029✔
2467
          }
2468

2469
          iMax[nMax] = i;
1,583,029✔
2470
          max[nMax++] = pIter->input[i].pRow;
1,583,029✔
2471
        }
2472
        pIter->input[i].next = false;
1,588,011✔
2473
      }
2474
    }
2475

2476
    TSDBROW *merge[2] = {0};
1,583,038✔
2477
    int      iMerge[2] = {-1, -1};
1,583,038✔
2478
    int      nMerge = 0;
1,583,527✔
2479
    for (int i = 0; i < nMax; ++i) {
3,167,046✔
2480
      TSDBKEY maxKey1 = TSDBROW_KEY(max[i]);
1,583,527✔
2481

2482
      if (!pIter->pSkyline) {
1,582,548✔
2483
        pIter->pSkyline = taosArrayInit(32, sizeof(TSDBKEY));
1,437,594✔
2484
        TSDB_CHECK_NULL(pIter->pSkyline, code, lino, _exit, terrno);
1,438,565✔
2485

2486
        uint64_t        uid = pIter->idx.uid;
1,438,565✔
2487
        STableLoadInfo *pInfo = getTableLoadInfo(pIter->pr, uid);
1,438,565✔
2488
        TSDB_CHECK_NULL(pInfo, code, lino, _exit, TSDB_CODE_OUT_OF_MEMORY);
1,437,577✔
2489

2490
        if (pInfo->pTombData == NULL) {
1,437,577✔
2491
          pInfo->pTombData = taosArrayInit(4, sizeof(SDelData));
1,370,025✔
2492
          TSDB_CHECK_NULL(pInfo->pTombData, code, lino, _exit, terrno);
1,370,515✔
2493
        }
2494

2495
        if (!taosArrayAddAll(pInfo->pTombData, pIter->pMemDelData)) {
1,438,067✔
2496
          TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
2497
        }
2498

2499
        size_t delSize = TARRAY_SIZE(pInfo->pTombData);
1,438,565✔
2500
        if (delSize > 0) {
1,438,565✔
2501
          code = tsdbBuildDeleteSkyline(pInfo->pTombData, 0, (int32_t)(delSize - 1), pIter->pSkyline);
12,744✔
2502
          TAOS_CHECK_GOTO(code, &lino, _exit);
12,744✔
2503
        }
2504
        pIter->iSkyline = taosArrayGetSize(pIter->pSkyline) - 1;
1,438,565✔
2505
      }
2506

2507
      bool deleted = tsdbKeyDeleted(&maxKey1, pIter->pSkyline, &pIter->iSkyline);
1,584,000✔
2508
      if (!deleted) {
1,584,017✔
2509
        iMerge[nMerge] = iMax[i];
1,578,137✔
2510
        merge[nMerge++] = max[i];
1,577,158✔
2511
      }
2512

2513
      pIter->input[iMax[i]].next = deleted;
1,583,519✔
2514
    }
2515

2516
    if (nMerge > 0) {
1,583,519✔
2517
      pIter->input[iMerge[0]].next = true;
1,578,137✔
2518

2519
      return merge[0];
1,577,656✔
2520
    }
2521
  }
2522

2523
_exit:
×
2524
  if (code) {
×
2525
    tsdbError("tsdb/cache: %s failed at line %d since %s.", __func__, lino, tstrerror(code));
×
2526
  }
2527

2528
  return NULL;
×
2529
}
2530

2531
static int32_t cloneTSchema(STSchema *pSrc, STSchema **ppDst) {
926,839✔
2532
  int32_t len = sizeof(STSchema) + sizeof(STColumn) * pSrc->numOfCols;
926,839✔
2533
  *ppDst = taosMemoryMalloc(len);
926,839✔
2534
  if (NULL == *ppDst) {
926,839✔
2535
    TAOS_RETURN(terrno);
×
2536
  }
2537
  memcpy(*ppDst, pSrc, len);
926,839✔
2538

2539
  TAOS_RETURN(TSDB_CODE_SUCCESS);
926,839✔
2540
}
2541

2542
static int32_t updateTSchema(int32_t sversion, SCacheRowsReader *pReader, uint64_t uid) {
1,601,334✔
2543
  if (NULL == pReader->pCurrSchema && sversion == pReader->pSchema->version) {
1,601,334✔
2544
    TAOS_RETURN(cloneTSchema(pReader->pSchema, &pReader->pCurrSchema));
926,839✔
2545
  }
2546

2547
  if (NULL != pReader->pCurrSchema && sversion == pReader->pCurrSchema->version) {
674,976✔
2548
    TAOS_RETURN(TSDB_CODE_SUCCESS);
672,403✔
2549
  }
2550

2551
  taosMemoryFreeClear(pReader->pCurrSchema);
2,092✔
2552
  TAOS_RETURN(
2,092✔
2553
      metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->info.suid, uid, sversion, &pReader->pCurrSchema));
2554
}
2555

2556
static int32_t tsdbCacheGetBatchFromMem(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr,
1,723,165✔
2557
                                        SArray *keyArray) {
2558
  int32_t        code = 0;
1,723,165✔
2559
  int32_t        lino = 0;
1,723,165✔
2560
  STSchema      *pTSchema = pr->pSchema;
1,723,165✔
2561
  SLRUCache     *pCache = pTsdb->lruCache;
1,723,165✔
2562
  SArray        *pCidList = pr->pCidList;
1,723,655✔
2563
  int            numKeys = TARRAY_SIZE(pCidList);
1,723,655✔
2564
  MemNextRowIter iter = {0};
1,723,165✔
2565
  SSHashObj     *iColHash = NULL;
1,723,165✔
2566
  STSDBRowIter   rowIter = {0};
1,723,165✔
2567

2568
  // 1, get from mem, imem filtered with delete info
2569
  TAOS_CHECK_EXIT(memRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->info.suid, pr->pReadSnap, pr));
1,723,165✔
2570

2571
  TSDBROW *pRow = memRowIterGet(&iter, false, NULL, 0);
1,723,165✔
2572
  if (!pRow) {
1,723,174✔
2573
    goto _exit;
285,090✔
2574
  }
2575

2576
  int32_t sversion = TSDBROW_SVERSION(pRow);
1,438,084✔
2577
  if (sversion != -1) {
1,438,565✔
2578
    TAOS_CHECK_EXIT(updateTSchema(sversion, pr, uid));
1,438,565✔
2579

2580
    pTSchema = pr->pCurrSchema;
1,438,565✔
2581
  }
2582
  int32_t nCol = pTSchema->numOfCols;
1,438,565✔
2583

2584
  STsdbRowKey rowKey = {0};
1,438,565✔
2585
  tsdbRowGetKey(pRow, &rowKey);
1,438,565✔
2586

2587
  TAOS_CHECK_EXIT(tsdbRowIterOpen(&rowIter, pRow, pTSchema));
1,438,067✔
2588

2589
  int32_t iCol = 0, jCol = 0, jnCol = TARRAY_SIZE(pLastArray);
1,438,075✔
2590
  for (SColVal *pColVal = tsdbRowIterNext(&rowIter); pColVal && iCol < nCol && jCol < jnCol;) {
7,228,952✔
2591
    SLastCol *pTargetCol = &((SLastCol *)TARRAY_DATA(pLastArray))[jCol];
5,790,396✔
2592
    if (pColVal->cid < pTargetCol->colVal.cid) {
5,790,396✔
2593
      pColVal = tsdbRowIterNext(&rowIter), ++iCol;
1,931,306✔
2594

2595
      continue;
1,930,808✔
2596
    }
2597
    if (pColVal->cid > pTargetCol->colVal.cid) {
3,859,588✔
2598
      break;
×
2599
    }
2600

2601
    int32_t cmp_res = tRowKeyCompare(&pTargetCol->rowKey, &rowKey.key);
3,858,127✔
2602
    if (!IS_LAST_KEY(((SLastKey *)TARRAY_DATA(keyArray))[jCol])) {
3,858,617✔
2603
      if (cmp_res < 0 || (cmp_res == 0 && !COL_VAL_IS_NONE(pColVal))) {
554,385✔
2604
        SLastCol lastCol = {.rowKey = rowKey.key, .colVal = *pColVal, .dirty = 1, .cacheStatus = TSDB_LAST_CACHE_VALID};
554,385✔
2605
        TAOS_CHECK_EXIT(tsdbCacheReallocSLastCol(&lastCol, NULL));
554,385✔
2606

2607
        tsdbDebug("vgId:%d, %s qid:%s uid:%" PRId64 " from memtable, col_id:%d col_flag:%d ts:%" PRId64,
554,866✔
2608
                  TD_VID(pTsdb->pVnode), __func__, pr && pr->idstr ? pr->idstr : "null", uid, lastCol.colVal.cid,
2609
                  lastCol.colVal.flag, rowKey.key.ts);
2610

2611
        tsdbCacheFreeSLastColItem(pTargetCol);
554,866✔
2612
        taosArraySet(pLastArray, jCol, &lastCol);
554,866✔
2613
      }
2614
    } else {
2615
      if (COL_VAL_IS_VALUE(pColVal)) {
3,304,713✔
2616
        if (cmp_res <= 0) {
3,103,650✔
2617
          SLastCol lastCol = {
3,102,180✔
2618
              .rowKey = rowKey.key, .colVal = *pColVal, .dirty = 1, .cacheStatus = TSDB_LAST_CACHE_VALID};
2619
          TAOS_CHECK_EXIT(tsdbCacheReallocSLastCol(&lastCol, NULL));
3,102,678✔
2620

2621
          tsdbDebug("vgId:%d, %s qid:%s uid:%" PRId64
3,103,168✔
2622
                    " from memtable(last) and memtable(newer), col_id:%d col_flag:%d ts:%" PRId64,
2623
                    TD_VID(pTsdb->pVnode), __func__, pr && pr->idstr ? pr->idstr : "null", uid, lastCol.colVal.cid,
2624
                    lastCol.colVal.flag, rowKey.key.ts);
2625

2626
          tsdbCacheFreeSLastColItem(pTargetCol);
3,103,168✔
2627
          taosArraySet(pLastArray, jCol, &lastCol);
3,101,653✔
2628
        }
2629
      } else {
2630
        if (!iColHash) {
200,565✔
2631
          iColHash = tSimpleHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT));
107,711✔
2632
          if (iColHash == NULL) {
107,711✔
2633
            TAOS_CHECK_EXIT(terrno);
×
2634
          }
2635
        }
2636

2637
        if (tSimpleHashPut(iColHash, &pColVal->cid, sizeof(pColVal->cid), &jCol, sizeof(jCol))) {
200,565✔
2638
          TAOS_CHECK_EXIT(terrno);
×
2639
        }
2640
      }
2641
    }
2642

2643
    ++jCol;
3,859,081✔
2644

2645
    if (jCol < jnCol && ((SLastCol *)TARRAY_DATA(pLastArray))[jCol].colVal.cid > pColVal->cid) {
3,859,081✔
2646
      pColVal = tsdbRowIterNext(&rowIter), ++iCol;
2,420,135✔
2647
    }
2648
  }
2649
  tsdbRowClose(&rowIter);
1,438,075✔
2650

2651
  if (iColHash && tSimpleHashGetSize(iColHash) > 0) {
1,438,565✔
2652
    pRow = memRowIterGet(&iter, false, NULL, 0);
107,711✔
2653
    while (pRow) {
200,828✔
2654
      if (tSimpleHashGetSize(iColHash) == 0) {
139,572✔
2655
        break;
46,455✔
2656
      }
2657

2658
      sversion = TSDBROW_SVERSION(pRow);
93,117✔
2659
      if (sversion != -1) {
93,117✔
2660
        TAOS_CHECK_EXIT(updateTSchema(sversion, pr, uid));
93,117✔
2661

2662
        pTSchema = pr->pCurrSchema;
93,117✔
2663
      }
2664
      nCol = pTSchema->numOfCols;
93,117✔
2665

2666
      STsdbRowKey tsdbRowKey = {0};
93,117✔
2667
      tsdbRowGetKey(pRow, &tsdbRowKey);
93,117✔
2668

2669
      TAOS_CHECK_EXIT(tsdbRowIterOpen(&rowIter, pRow, pTSchema));
93,117✔
2670

2671
      iCol = 0;
93,117✔
2672
      for (SColVal *pColVal = tsdbRowIterNext(&rowIter); pColVal && iCol < nCol;
619,045✔
2673
           pColVal = tsdbRowIterNext(&rowIter), iCol++) {
525,928✔
2674
        int32_t *pjCol = tSimpleHashGet(iColHash, &pColVal->cid, sizeof(pColVal->cid));
525,928✔
2675
        if (pjCol && COL_VAL_IS_VALUE(pColVal)) {
525,928✔
2676
          SLastCol *pTargetCol = &((SLastCol *)TARRAY_DATA(pLastArray))[*pjCol];
135,873✔
2677

2678
          int32_t cmp_res = tRowKeyCompare(&pTargetCol->rowKey, &tsdbRowKey.key);
135,873✔
2679
          if (cmp_res <= 0) {
135,873✔
2680
            SLastCol lastCol = {
135,873✔
2681
                .rowKey = tsdbRowKey.key, .colVal = *pColVal, .dirty = 1, .cacheStatus = TSDB_LAST_CACHE_VALID};
2682
            TAOS_CHECK_EXIT(tsdbCacheReallocSLastCol(&lastCol, NULL));
135,873✔
2683

2684
            tsdbDebug("vgId:%d, %s qid:%s uid:%" PRId64 " from memtable(hash), col_id:%d col_flag:%d ts:%" PRId64,
135,873✔
2685
                      TD_VID(pTsdb->pVnode), __func__, pr && pr->idstr ? pr->idstr : "null", uid, lastCol.colVal.cid,
2686
                      lastCol.colVal.flag, tsdbRowKey.key.ts);
2687

2688
            tsdbCacheFreeSLastColItem(pTargetCol);
135,873✔
2689
            taosArraySet(pLastArray, *pjCol, &lastCol);
135,873✔
2690
          }
2691

2692
          TAOS_CHECK_EXIT(tSimpleHashRemove(iColHash, &pColVal->cid, sizeof(pColVal->cid)));
135,873✔
2693
        }
2694
      }
2695
      tsdbRowClose(&rowIter);
93,117✔
2696

2697
      pRow = memRowIterGet(&iter, false, NULL, 0);
93,117✔
2698
    }
2699
  }
2700

2701
_exit:
1,724,643✔
2702
  if (code) {
1,723,655✔
2703
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(pTsdb->pVnode), __func__, __FILE__, lino, tstrerror(code));
×
2704

2705
    tsdbRowClose(&rowIter);
×
2706
  }
2707

2708
  tSimpleHashCleanup(iColHash);
1,723,655✔
2709

2710
  memRowIterClose(&iter);
1,723,165✔
2711

2712
  TAOS_RETURN(code);
1,723,165✔
2713
}
2714

2715
int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, int8_t ltype) {
1,723,165✔
2716
  int32_t code = 0;
1,723,165✔
2717
  int32_t lino = 0;
1,723,165✔
2718

2719
  tsdbDebug("vgId:%d, %s start, qid:%s uid:%" PRId64 " ltype:%d", TD_VID(pTsdb->pVnode), __func__,
1,723,165✔
2720
            pr && pr->idstr ? pr->idstr : "null", uid, ltype);
2721

2722
  SArray *keyArray = taosArrayInit(16, sizeof(SLastKey));
1,723,165✔
2723
  if (!keyArray) {
1,723,165✔
2724
    TAOS_CHECK_EXIT(terrno);
×
2725
  }
2726

2727
  TAOS_CHECK_EXIT(tsdbCacheGetBatchFromLru(pTsdb, uid, pLastArray, pr, ltype, keyArray));
1,723,165✔
2728

2729
  if (tsUpdateCacheBatch) {
1,722,684✔
2730
    TAOS_CHECK_EXIT(tsdbCacheGetBatchFromMem(pTsdb, uid, pLastArray, pr, keyArray));
1,723,165✔
2731
  }
2732

2733
_exit:
1,723,174✔
2734
  if (code) {
1,723,174✔
2735
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(pTsdb->pVnode), __func__, __FILE__, lino, tstrerror(code));
×
2736
  }
2737

2738
  if (keyArray) {
1,723,165✔
2739
    taosArrayDestroy(keyArray);
1,723,165✔
2740
  }
2741

2742
  TAOS_RETURN(code);
1,723,655✔
2743
}
2744

2745
int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey) {
2,515,018✔
2746
  int32_t   code = 0, lino = 0;
2,515,018✔
2747
  STSchema *pTSchema = NULL;
2,515,018✔
2748
  int       sver = -1;
2,515,018✔
2749
  int       numKeys = 0;
2,515,018✔
2750
  SArray   *remainCols = NULL;
2,515,018✔
2751

2752
  TAOS_CHECK_RETURN(metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pTSchema));
2,515,018✔
2753

2754
  int numCols = pTSchema->numOfCols;
2,515,018✔
2755

2756
  (void)taosThreadMutexLock(&pTsdb->lruMutex);
2,515,018✔
2757

2758
  for (int i = 0; i < numCols; ++i) {
11,295,060✔
2759
    int16_t cid = pTSchema->columns[i].colId;
8,780,042✔
2760
    for (int8_t lflag = LFLAG_LAST_ROW; lflag <= LFLAG_LAST; ++lflag) {
26,340,126✔
2761
      SLastKey   lastKey = {.lflag = lflag, .uid = uid, .cid = cid};
17,560,084✔
2762
      LRUHandle *h = taosLRUCacheLookup(pTsdb->lruCache, &lastKey, ROCKS_KEY_LEN);
17,560,084✔
2763
      if (h) {
17,560,084✔
2764
        SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pTsdb->lruCache, h);
87,936✔
2765
        if (pLastCol->rowKey.ts <= eKey && pLastCol->rowKey.ts >= sKey) {
87,936✔
2766
          SLastCol noneCol = {.rowKey.ts = TSKEY_MIN,
2,496✔
2767
                              .colVal = COL_VAL_NONE(cid, pTSchema->columns[i].type),
2,496✔
2768
                              .dirty = 1,
2769
                              .cacheStatus = TSDB_LAST_CACHE_NO_CACHE};
2770
          code = tsdbCachePutToLRU(pTsdb, &lastKey, &noneCol, 1);
2,496✔
2771
        }
2772
        tsdbLRUCacheRelease(pTsdb->lruCache, h, false);
87,936✔
2773
        TAOS_CHECK_EXIT(code);
87,936✔
2774
      } else {
2775
        if (!remainCols) {
17,472,148✔
2776
          remainCols = taosArrayInit(numCols * 2, sizeof(SIdxKey));
2,503,864✔
2777
        }
2778
        if (!taosArrayPush(remainCols, &(SIdxKey){i, lastKey})) {
34,944,296✔
2779
          TAOS_CHECK_EXIT(terrno);
×
2780
        }
2781
      }
2782
    }
2783
  }
2784

2785
  if (remainCols) {
2,515,018✔
2786
    numKeys = TARRAY_SIZE(remainCols);
2,503,864✔
2787
  }
2788

2789
  char  **keys_list = taosMemoryCalloc(numKeys, sizeof(char *));
2,515,018✔
2790
  size_t *keys_list_sizes = taosMemoryCalloc(numKeys, sizeof(size_t));
2,515,018✔
2791
  char  **values_list = NULL;
2,515,018✔
2792
  size_t *values_list_sizes = NULL;
2,515,018✔
2793

2794
  if (!keys_list || !keys_list_sizes) {
2,515,018✔
2795
    code = terrno;
×
2796
    goto _exit;
×
2797
  }
2798
  const size_t klen = ROCKS_KEY_LEN;
2,515,018✔
2799

2800
  for (int i = 0; i < numKeys; ++i) {
19,987,166✔
2801
    char *key = taosMemoryCalloc(1, sizeof(SLastKey));
17,472,148✔
2802
    if (!key) {
17,472,148✔
2803
      code = terrno;
×
2804
      goto _exit;
×
2805
    }
2806
    SIdxKey *idxKey = taosArrayGet(remainCols, i);
17,472,148✔
2807

2808
    ((SLastKey *)key)[0] = idxKey->key;
17,472,148✔
2809

2810
    keys_list[i] = key;
17,472,148✔
2811
    keys_list_sizes[i] = klen;
17,472,148✔
2812
  }
2813

2814
  rocksMayWrite(pTsdb, true);  // flush writebatch cache
2,515,018✔
2815

2816
  TAOS_CHECK_GOTO(tsdbCacheGetValuesFromRocks(pTsdb, numKeys, (const char *const *)keys_list, keys_list_sizes,
2,515,018✔
2817
                                              &values_list, &values_list_sizes),
2818
                  NULL, _exit);
2819

2820
  // rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
2821
  for (int i = 0; i < numKeys; ++i) {
19,987,166✔
2822
    SLastCol *pLastCol = NULL;
17,472,148✔
2823
    if (values_list[i] != NULL) {
17,472,148✔
2824
      code = tsdbCacheDeserialize(values_list[i], values_list_sizes[i], &pLastCol);
×
2825
      if (code != TSDB_CODE_SUCCESS) {
×
2826
        tsdbError("vgId:%d, %s deserialize failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
2827
                  tstrerror(code));
2828
        goto _exit;
×
2829
      }
2830
    }
2831
    SIdxKey  *idxKey = taosArrayGet(remainCols, i);
17,472,148✔
2832
    SLastKey *pLastKey = &idxKey->key;
17,472,148✔
2833
    if (NULL != pLastCol && (pLastCol->rowKey.ts <= eKey && pLastCol->rowKey.ts >= sKey)) {
17,472,148✔
2834
      SLastCol noCacheCol = {.rowKey.ts = TSKEY_MIN,
×
2835
                             .colVal = COL_VAL_NONE(pLastKey->cid, pTSchema->columns[idxKey->idx].type),
×
2836
                             .dirty = 0,
2837
                             .cacheStatus = TSDB_LAST_CACHE_NO_CACHE};
2838

2839
      if ((code = tsdbCachePutToRocksdb(pTsdb, pLastKey, &noCacheCol)) != TSDB_CODE_SUCCESS) {
×
2840
        taosMemoryFreeClear(pLastCol);
×
2841
        tsdbError("tsdb/cache/del: vgId:%d, put to rocks failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code));
×
2842
        goto _exit;
×
2843
      }
2844
      if ((code = tsdbCachePutToLRU(pTsdb, pLastKey, &noCacheCol, 0)) != TSDB_CODE_SUCCESS) {
×
2845
        taosMemoryFreeClear(pLastCol);
×
2846
        tsdbError("tsdb/cache/del: vgId:%d, put to lru failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code));
×
2847
        goto _exit;
×
2848
      }
2849
    }
2850

2851
    if (pLastCol == NULL) {
17,472,148✔
2852
      tsdbDebug("tsdb/cache/del: vgId:%d, no cache found for uid:%d ,cid:%" PRId64 ", lflag:%d.", TD_VID(pTsdb->pVnode),
17,472,148✔
2853
                pLastKey->cid, pLastKey->uid, pLastKey->lflag);
2854
    }
2855

2856
    taosMemoryFreeClear(pLastCol);
17,472,148✔
2857
  }
2858

2859
  rocksMayWrite(pTsdb, false);
2,515,018✔
2860

2861
_exit:
2,515,018✔
2862
  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
2,515,018✔
2863

2864
  for (int i = 0; i < numKeys; ++i) {
19,987,166✔
2865
    taosMemoryFree(keys_list[i]);
17,472,148✔
2866
  }
2867
  taosMemoryFree(keys_list);
2,515,018✔
2868
  taosMemoryFree(keys_list_sizes);
2,515,018✔
2869
  if (values_list) {
2,515,018✔
2870
#if USE_ROCKSDB
2871
    for (int i = 0; i < numKeys; ++i) {
19,987,166✔
2872
      rocksdb_free(values_list[i]);
17,472,148✔
2873
    }
2874
#endif
2875
    taosMemoryFree(values_list);
2,515,018✔
2876
  }
2877
  taosMemoryFree(values_list_sizes);
2,515,018✔
2878
  taosArrayDestroy(remainCols);
2,515,018✔
2879
  taosMemoryFree(pTSchema);
2,515,018✔
2880

2881
  TAOS_RETURN(code);
2,515,018✔
2882
}
2883

2884
int32_t tsdbOpenCache(STsdb *pTsdb) {
5,013,051✔
2885
  int32_t code = 0, lino = 0;
5,013,051✔
2886
  size_t  cfgCapacity = (size_t)pTsdb->pVnode->config.cacheLastSize * 1024 * 1024;
5,014,031✔
2887
  int32_t numShardBits = pTsdb->pVnode->config.cacheLastShardBits;
5,014,031✔
2888

2889
  // Use configured shard bits, or -1 to auto-calculate based on cache size
2890
  // This enables multi-shard LRU cache for better concurrency
2891
  SLRUCache *pCache = taosLRUCacheInit(cfgCapacity, numShardBits, .5);
5,014,031✔
2892
  if (pCache == NULL) {
5,014,031✔
2893
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err);
×
2894
  }
2895

2896
#ifdef USE_SHARED_STORAGE
2897
  if (tsSsEnabled) {
5,014,031✔
2898
    TAOS_CHECK_GOTO(tsdbOpenBCache(pTsdb), &lino, _err);
24,780✔
2899
    TAOS_CHECK_GOTO(tsdbOpenPgCache(pTsdb), &lino, _err);
24,780✔
2900
  }
2901
#endif
2902

2903
  TAOS_CHECK_GOTO(tsdbOpenRocksCache(pTsdb), &lino, _err);
5,014,031✔
2904

2905
  taosLRUCacheSetStrictCapacity(pCache, false);
5,013,385✔
2906

2907
  (void)taosThreadMutexInit(&pTsdb->lruMutex, NULL);
5,013,385✔
2908

2909
  pTsdb->lruCache = pCache;
5,012,584✔
2910

2911
  tsdbInfo("vgId:%d, lruCache opened with capacity:%zu bytes, numShards:%d (configured:%d)",
5,012,584✔
2912
           TD_VID(pTsdb->pVnode), cfgCapacity, taosLRUCacheGetNumShards(pCache), numShardBits);
2913
           
2914
  TAOS_RETURN(0);
5,013,230✔
2915

2916
_err:
×
2917
  if (code) {
×
2918
    tsdbError("tsdb/cache: vgId:%d, open failed at line %d since %s.", TD_VID(pTsdb->pVnode), lino, tstrerror(code));
×
2919
    if (pCache) {
×
2920
      taosLRUCacheCleanup(pCache);
×
2921
      pCache = NULL;
×
2922
    }
2923
  }
2924

2925
  pTsdb->lruCache = pCache;
×
2926
  TAOS_RETURN(code);
×
2927
}
2928

2929
void tsdbCloseCache(STsdb *pTsdb) {
5,013,769✔
2930
  SLRUCache *pCache = pTsdb->lruCache;
5,013,769✔
2931
  if (pCache) {
5,013,900✔
2932
    taosLRUCacheEraseUnrefEntries(pCache);
5,013,554✔
2933

2934
    taosLRUCacheCleanup(pCache);
5,013,900✔
2935

2936
    (void)taosThreadMutexDestroy(&pTsdb->lruMutex);
5,014,031✔
2937
  }
2938

2939
#ifdef USE_SHARED_STORAGE
2940
  if (tsSsEnabled) {
5,014,031✔
2941
    tsdbCloseBCache(pTsdb);
24,780✔
2942
    tsdbClosePgCache(pTsdb);
24,780✔
2943
  }
2944
#endif
2945

2946
  tsdbCloseRocksCache(pTsdb);
5,014,031✔
2947
}
5,014,031✔
2948

2949
// Rebuild only the last cache (lruCache) with a new shard count.
2950
// Must be called with pTsdb->lruMutex held by the caller.
2951
int32_t tsdbRebuildLastCache(STsdb *pTsdb, int32_t numShardBits) {
1,652✔
2952
  size_t     cfgCapacity = (size_t)pTsdb->pVnode->config.cacheLastSize * 1024 * 1024;
1,652✔
2953
  SLRUCache *pNewCache = taosLRUCacheInit(cfgCapacity, numShardBits, .5);
1,652✔
2954
  if (pNewCache == NULL) {
1,652✔
2955
    TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
×
2956
  }
2957
  taosLRUCacheSetStrictCapacity(pNewCache, false);
1,652✔
2958

2959
  // Swap in the new cache and clean up the old one
2960
  SLRUCache *pOldCache = pTsdb->lruCache;
1,652✔
2961
  pTsdb->lruCache = pNewCache;
1,652✔
2962

2963
  if (pOldCache) {
1,652✔
2964
    int64_t start = taosGetTimestampMs();
1,652✔
2965
    taosLRUCacheEraseUnrefEntries(pOldCache);
1,652✔
2966
    taosLRUCacheCleanup(pOldCache);
1,652✔
2967
    int64_t end = taosGetTimestampMs();
1,652✔
2968
    tsdbInfo("vgId:%d, lruCache erase unref entries and cleanup time:%" PRId64 " ms", TD_VID(pTsdb->pVnode),
1,652✔
2969
             end - start);
2970
  }
2971

2972
  tsdbInfo("vgId:%d, lruCache rebuilt with capacity:%zu bytes, numShards:%d (configured:%d)", TD_VID(pTsdb->pVnode),
1,652✔
2973
           cfgCapacity, taosLRUCacheGetNumShards(pNewCache), numShardBits);
2974

2975
  TAOS_RETURN(0);
1,652✔
2976
}
2977

2978
static void getTableCacheKey(tb_uid_t uid, int cacheType, char *key, int *len) {
×
2979
  if (cacheType == 0) {  // last_row
×
2980
    *(uint64_t *)key = (uint64_t)uid;
×
2981
  } else {  // last
2982
    *(uint64_t *)key = ((uint64_t)uid) | 0x8000000000000000;
×
2983
  }
2984

2985
  *len = sizeof(uint64_t);
×
2986
}
×
2987

2988
static tb_uid_t getTableSuidByUid(tb_uid_t uid, STsdb *pTsdb) {
×
2989
  tb_uid_t suid = 0;
×
2990

2991
  SMetaReader mr = {0};
×
2992
  metaReaderDoInit(&mr, pTsdb->pVnode->pMeta, META_READER_LOCK);
×
2993
  if (metaReaderGetTableEntryByUidCache(&mr, uid) < 0) {
×
2994
    metaReaderClear(&mr);  // table not esist
×
2995
    return 0;
×
2996
  }
2997

2998
  if (mr.me.type == TSDB_CHILD_TABLE) {
×
2999
    suid = mr.me.ctbEntry.suid;
×
3000
  } else if (mr.me.type == TSDB_NORMAL_TABLE) {
×
3001
    suid = 0;
×
3002
  } else {
3003
    suid = 0;
×
3004
  }
3005

3006
  metaReaderClear(&mr);
×
3007

3008
  return suid;
×
3009
}
3010

3011
static int32_t getTableDelDataFromDelIdx(SDelFReader *pDelReader, SDelIdx *pDelIdx, SArray *aDelData) {
×
3012
  int32_t code = 0;
×
3013

3014
  if (pDelIdx) {
×
3015
    code = tsdbReadDelDatav1(pDelReader, pDelIdx, aDelData, INT64_MAX);
×
3016
  }
3017

3018
  TAOS_RETURN(code);
×
3019
}
3020

3021
static int32_t getTableDelDataFromTbData(STbData *pTbData, SArray *aDelData) {
×
3022
  int32_t   code = 0;
×
3023
  SDelData *pDelData = pTbData ? pTbData->pHead : NULL;
×
3024

3025
  for (; pDelData; pDelData = pDelData->pNext) {
×
3026
    if (!taosArrayPush(aDelData, pDelData)) {
×
3027
      TAOS_RETURN(terrno);
×
3028
    }
3029
  }
3030

3031
  TAOS_RETURN(code);
×
3032
}
3033

3034
static uint64_t *getUidList(SCacheRowsReader *pReader) {
17,944✔
3035
  if (!pReader->uidList) {
17,944✔
3036
    int32_t numOfTables = pReader->numOfTables;
4,732✔
3037

3038
    pReader->uidList = taosMemoryMalloc(numOfTables * sizeof(uint64_t));
4,732✔
3039
    if (!pReader->uidList) {
4,732✔
3040
      return NULL;
×
3041
    }
3042

3043
    for (int32_t i = 0; i < numOfTables; ++i) {
18,516✔
3044
      uint64_t uid = pReader->pTableList[i].uid;
13,784✔
3045
      pReader->uidList[i] = uid;
13,784✔
3046
    }
3047

3048
    taosSort(pReader->uidList, numOfTables, sizeof(uint64_t), uidComparFunc);
4,732✔
3049
  }
3050

3051
  return pReader->uidList;
17,944✔
3052
}
3053

3054
static int32_t loadTombFromBlk(const TTombBlkArray *pTombBlkArray, SCacheRowsReader *pReader, void *pFileReader,
17,944✔
3055
                               bool isFile) {
3056
  int32_t   code = 0;
17,944✔
3057
  int32_t   numOfTables = pReader->numOfTables;
17,944✔
3058
  int64_t   suid = pReader->info.suid;
17,944✔
3059
  uint64_t *uidList = getUidList(pReader);
17,944✔
3060

3061
  if (!uidList) {
17,944✔
3062
    TAOS_RETURN(terrno);
×
3063
  }
3064

3065
  for (int i = 0, j = 0; i < pTombBlkArray->size && j < numOfTables; ++i) {
25,912✔
3066
    STombBlk *pTombBlk = &pTombBlkArray->data[i];
7,968✔
3067
    if (pTombBlk->maxTbid.suid < suid || (pTombBlk->maxTbid.suid == suid && pTombBlk->maxTbid.uid < uidList[0])) {
7,968✔
3068
      continue;
×
3069
    }
3070

3071
    if (pTombBlk->minTbid.suid > suid ||
7,968✔
3072
        (pTombBlk->minTbid.suid == suid && pTombBlk->minTbid.uid > uidList[numOfTables - 1])) {
7,968✔
3073
      break;
3074
    }
3075

3076
    STombBlock block = {0};
7,968✔
3077
    code = isFile ? tsdbDataFileReadTombBlock(pFileReader, &pTombBlkArray->data[i], &block)
7,968✔
3078
                  : tsdbSttFileReadTombBlock(pFileReader, &pTombBlkArray->data[i], &block);
7,968✔
3079
    if (code != TSDB_CODE_SUCCESS) {
7,968✔
3080
      TAOS_RETURN(code);
×
3081
    }
3082

3083
    uint64_t        uid = uidList[j];
7,968✔
3084
    STableLoadInfo *pInfo = getTableLoadInfo(pReader, uid);
7,968✔
3085
    if (!pInfo) {
7,968✔
3086
      tTombBlockDestroy(&block);
×
3087
      TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
×
3088
    }
3089

3090
    if (pInfo->pTombData == NULL) {
7,968✔
3091
      pInfo->pTombData = taosArrayInit(4, sizeof(SDelData));
996✔
3092
    }
3093

3094
    STombRecord record = {0};
7,968✔
3095
    bool        finished = false;
7,968✔
3096
    for (int32_t k = 0; k < TOMB_BLOCK_SIZE(&block); ++k) {
15,936✔
3097
      code = tTombBlockGet(&block, k, &record);
7,968✔
3098
      if (code != TSDB_CODE_SUCCESS) {
7,968✔
3099
        finished = true;
×
3100
        break;
×
3101
      }
3102

3103
      if (record.suid < suid) {
7,968✔
3104
        continue;
×
3105
      }
3106
      if (record.suid > suid) {
7,968✔
3107
        finished = true;
×
3108
        break;
×
3109
      }
3110

3111
      bool newTable = false;
7,968✔
3112
      if (uid < record.uid) {
7,968✔
3113
        while (j < numOfTables && uidList[j] < record.uid) {
47,808✔
3114
          ++j;
39,840✔
3115
          newTable = true;
39,840✔
3116
        }
3117

3118
        if (j >= numOfTables) {
7,968✔
3119
          finished = true;
×
3120
          break;
×
3121
        }
3122

3123
        uid = uidList[j];
7,968✔
3124
      }
3125

3126
      if (record.uid < uid) {
7,968✔
3127
        continue;
×
3128
      }
3129

3130
      if (newTable) {
7,968✔
3131
        pInfo = getTableLoadInfo(pReader, uid);
7,968✔
3132
        if (!pInfo) {
7,968✔
3133
          code = TSDB_CODE_OUT_OF_MEMORY;
×
3134
          finished = true;
×
3135
          break;
×
3136
        }
3137
        if (pInfo->pTombData == NULL) {
7,968✔
3138
          pInfo->pTombData = taosArrayInit(4, sizeof(SDelData));
996✔
3139
          if (!pInfo->pTombData) {
996✔
3140
            code = terrno;
×
3141
            finished = true;
×
3142
            break;
×
3143
          }
3144
        }
3145
      }
3146

3147
      if (record.version <= pReader->info.verRange.maxVer) {
7,968✔
3148
        /*tsdbError("tomb xx load/cache: vgId:%d fid:%d record %" PRId64 "~%" PRId64 "~%" PRId64 " tomb records",
3149
          TD_VID(pReader->pTsdb->pVnode), pReader->pCurFileSet->fid, record.skey, record.ekey, uid);*/
3150

3151
        SDelData delData = {.version = record.version, .sKey = record.skey, .eKey = record.ekey};
7,968✔
3152
        if (!taosArrayPush(pInfo->pTombData, &delData)) {
15,936✔
3153
          TAOS_RETURN(terrno);
×
3154
        }
3155
      }
3156
    }
3157

3158
    tTombBlockDestroy(&block);
7,968✔
3159

3160
    if (finished) {
7,968✔
3161
      TAOS_RETURN(code);
×
3162
    }
3163
  }
3164

3165
  TAOS_RETURN(TSDB_CODE_SUCCESS);
17,944✔
3166
}
3167

3168
static int32_t loadDataTomb(SCacheRowsReader *pReader, SDataFileReader *pFileReader) {
7,968✔
3169
  const TTombBlkArray *pBlkArray = NULL;
7,968✔
3170

3171
  TAOS_CHECK_RETURN(tsdbDataFileReadTombBlk(pFileReader, &pBlkArray));
7,968✔
3172

3173
  TAOS_RETURN(loadTombFromBlk(pBlkArray, pReader, pFileReader, true));
7,968✔
3174
}
3175

3176
static int32_t loadSttTomb(STsdbReader *pTsdbReader, SSttFileReader *pSttFileReader, SSttBlockLoadInfo *pLoadInfo) {
9,976✔
3177
  SCacheRowsReader    *pReader = (SCacheRowsReader *)pTsdbReader;
9,976✔
3178
  const TTombBlkArray *pBlkArray = NULL;
9,976✔
3179

3180
  TAOS_CHECK_RETURN(tsdbSttFileReadTombBlk(pSttFileReader, &pBlkArray));
9,976✔
3181

3182
  TAOS_RETURN(loadTombFromBlk(pBlkArray, pReader, pSttFileReader, false));
9,976✔
3183
}
3184

3185
typedef struct {
3186
  SMergeTree  mergeTree;
3187
  SMergeTree *pMergeTree;
3188
} SFSLastIter;
3189

3190
static int32_t lastIterOpen(SFSLastIter *iter, STFileSet *pFileSet, STsdb *pTsdb, STSchema *pTSchema, tb_uid_t suid,
17,944✔
3191
                            tb_uid_t uid, SCacheRowsReader *pr, int64_t lastTs, int16_t *aCols, int nCols) {
3192
  int32_t code = 0;
17,944✔
3193
  destroySttBlockReader(pr->pLDataIterArray, NULL);
17,944✔
3194
  pr->pLDataIterArray = taosArrayInit(4, POINTER_BYTES);
17,944✔
3195
  if (pr->pLDataIterArray == NULL) return terrno;
17,944✔
3196

3197
  SMergeTreeConf conf = {
17,944✔
3198
      .uid = uid,
3199
      .suid = suid,
3200
      .pTsdb = pTsdb,
3201
      .timewindow = (STimeWindow){.skey = lastTs, .ekey = TSKEY_MAX},
3202
      .verRange = (SVersionRange){.minVer = 0, .maxVer = INT64_MAX},
3203
      .strictTimeRange = false,
3204
      .cacheStatis = false,
3205
      .pSchema = pTSchema,
3206
      .pCurrentFileset = pFileSet,
3207
      .backward = 1,
3208
      .pSttFileBlockIterArray = pr->pLDataIterArray,
17,944✔
3209
      .pCols = aCols,
3210
      .numOfCols = nCols,
3211
      .loadTombFn = loadSttTomb,
3212
      .pReader = pr,
3213
      .idstr = pr->idstr,
17,944✔
3214
      .pCurRowKey = &pr->rowKey,
17,944✔
3215
  };
3216

3217
  TAOS_CHECK_RETURN(tMergeTreeOpen2(&iter->mergeTree, &conf, NULL));
17,944✔
3218

3219
  iter->pMergeTree = &iter->mergeTree;
17,944✔
3220

3221
  TAOS_RETURN(code);
17,944✔
3222
}
3223

3224
static int32_t lastIterClose(SFSLastIter **iter) {
996✔
3225
  int32_t code = 0;
996✔
3226

3227
  if ((*iter)->pMergeTree) {
996✔
3228
    tMergeTreeClose((*iter)->pMergeTree);
996✔
3229
    (*iter)->pMergeTree = NULL;
996✔
3230
  }
3231

3232
  *iter = NULL;
996✔
3233

3234
  TAOS_RETURN(code);
996✔
3235
}
3236

3237
static int32_t lastIterNext(SFSLastIter *iter, TSDBROW **ppRow) {
20,145✔
3238
  bool hasVal = false;
20,145✔
3239
  *ppRow = NULL;
20,145✔
3240

3241
  int32_t code = tMergeTreeNext(iter->pMergeTree, &hasVal);
20,145✔
3242
  if (code != 0) {
20,145✔
3243
    return code;
×
3244
  }
3245

3246
  if (!hasVal) {
20,145✔
3247
    *ppRow = NULL;
15,928✔
3248
    TAOS_RETURN(code);
15,928✔
3249
  }
3250

3251
  *ppRow = tMergeTreeGetRow(iter->pMergeTree);
4,217✔
3252
  TAOS_RETURN(code);
4,217✔
3253
}
3254

3255
typedef enum SFSNEXTROWSTATES {
3256
  SFSNEXTROW_FS,
3257
  SFSNEXTROW_FILESET,
3258
  SFSNEXTROW_INDEXLIST,
3259
  SFSNEXTROW_BRINBLOCK,
3260
  SFSNEXTROW_BRINRECORD,
3261
  SFSNEXTROW_BLOCKDATA,
3262
  SFSNEXTROW_BLOCKROW,
3263
  SFSNEXTROW_NEXTSTTROW
3264
} SFSNEXTROWSTATES;
3265

3266
struct CacheNextRowIter;
3267

3268
typedef struct SFSNextRowIter {
3269
  SFSNEXTROWSTATES         state;         // [input]
3270
  SBlockIdx               *pBlockIdxExp;  // [input]
3271
  STSchema                *pTSchema;      // [input]
3272
  tb_uid_t                 suid;
3273
  tb_uid_t                 uid;
3274
  int32_t                  iFileSet;
3275
  STFileSet               *pFileSet;
3276
  TFileSetArray           *aDFileSet;
3277
  SArray                  *pIndexList;
3278
  int32_t                  iBrinIndex;
3279
  SBrinBlock               brinBlock;
3280
  SBrinBlock              *pBrinBlock;
3281
  int32_t                  iBrinRecord;
3282
  SBrinRecord              brinRecord;
3283
  SBlockData               blockData;
3284
  SBlockData              *pBlockData;
3285
  int32_t                  nRow;
3286
  int32_t                  iRow;
3287
  TSDBROW                  row;
3288
  int64_t                  lastTs;
3289
  SFSLastIter              lastIter;
3290
  SFSLastIter             *pLastIter;
3291
  int8_t                   lastEmpty;
3292
  TSDBROW                 *pLastRow;
3293
  SRow                    *pTSRow;
3294
  SRowMerger               rowMerger;
3295
  SCacheRowsReader        *pr;
3296
  struct CacheNextRowIter *pRowIter;
3297
} SFSNextRowIter;
3298

3299
static void clearLastFileSet(SFSNextRowIter *state);
3300

3301
static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast, int16_t *aCols,
76,413✔
3302
                                int nCols) {
3303
  int32_t         code = 0, lino = 0;
76,413✔
3304
  SFSNextRowIter *state = (SFSNextRowIter *)iter;
76,413✔
3305
  STsdb          *pTsdb = state->pr->pTsdb;
76,413✔
3306

3307
  if (SFSNEXTROW_FS == state->state) {
76,413✔
3308
    state->iFileSet = TARRAY2_SIZE(state->aDFileSet);
75,208✔
3309

3310
    state->state = SFSNEXTROW_FILESET;
75,208✔
3311
  }
3312

3313
  if (SFSNEXTROW_FILESET == state->state) {
76,413✔
3314
  _next_fileset:
89,144✔
3315
    clearLastFileSet(state);
89,144✔
3316

3317
    if (--state->iFileSet < 0) {
89,144✔
3318
      *ppRow = NULL;
71,200✔
3319

3320
      TAOS_RETURN(code);
71,200✔
3321
    } else {
3322
      state->pFileSet = TARRAY2_GET(state->aDFileSet, state->iFileSet);
17,944✔
3323
    }
3324

3325
    STFileObj **pFileObj = state->pFileSet->farr;
17,944✔
3326
    if (pFileObj[0] != NULL || pFileObj[3] != NULL) {
17,944✔
3327
      if (state->pFileSet != state->pr->pCurFileSet) {
7,968✔
3328
        SDataFileReaderConfig conf = {.tsdb = pTsdb, .szPage = pTsdb->pVnode->config.tsdbPageSize};
7,968✔
3329
        const char           *filesName[4] = {0};
7,968✔
3330
        if (pFileObj[0] != NULL) {
7,968✔
3331
          conf.files[0].file = *pFileObj[0]->f;
7,968✔
3332
          conf.files[0].exist = true;
7,968✔
3333
          filesName[0] = pFileObj[0]->fname;
7,968✔
3334

3335
          conf.files[1].file = *pFileObj[1]->f;
7,968✔
3336
          conf.files[1].exist = true;
7,968✔
3337
          filesName[1] = pFileObj[1]->fname;
7,968✔
3338

3339
          conf.files[2].file = *pFileObj[2]->f;
7,968✔
3340
          conf.files[2].exist = true;
7,968✔
3341
          filesName[2] = pFileObj[2]->fname;
7,968✔
3342
        }
3343

3344
        if (pFileObj[3] != NULL) {
7,968✔
3345
          conf.files[3].exist = true;
7,968✔
3346
          conf.files[3].file = *pFileObj[3]->f;
7,968✔
3347
          filesName[3] = pFileObj[3]->fname;
7,968✔
3348
        }
3349

3350
        TAOS_CHECK_GOTO(tsdbDataFileReaderOpen(filesName, &conf, &state->pr->pFileReader), &lino, _err);
7,968✔
3351

3352
        state->pr->pCurFileSet = state->pFileSet;
7,968✔
3353

3354
        code = loadDataTomb(state->pr, state->pr->pFileReader);
7,968✔
3355
        if (code != TSDB_CODE_SUCCESS) {
7,968✔
3356
          tsdbError("vgId:%d, %s load tomb failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
3357
                    tstrerror(code));
3358
          TAOS_CHECK_GOTO(code, &lino, _err);
×
3359
        }
3360

3361
        TAOS_CHECK_GOTO(tsdbDataFileReadBrinBlk(state->pr->pFileReader, &state->pr->pBlkArray), &lino, _err);
7,968✔
3362
      }
3363

3364
      if (!state->pIndexList) {
7,968✔
3365
        state->pIndexList = taosArrayInit(1, sizeof(SBrinBlk));
7,968✔
3366
        if (!state->pIndexList) {
7,968✔
3367
          TAOS_CHECK_GOTO(terrno, &lino, _err);
×
3368
        }
3369
      } else {
3370
        taosArrayClear(state->pIndexList);
×
3371
      }
3372

3373
      const TBrinBlkArray *pBlkArray = state->pr->pBlkArray;
7,968✔
3374

3375
      for (int i = TARRAY2_SIZE(pBlkArray) - 1; i >= 0; --i) {
15,936✔
3376
        SBrinBlk *pBrinBlk = &pBlkArray->data[i];
7,968✔
3377
        if (state->suid >= pBrinBlk->minTbid.suid && state->suid <= pBrinBlk->maxTbid.suid) {
7,968✔
3378
          if (state->uid >= pBrinBlk->minTbid.uid && state->uid <= pBrinBlk->maxTbid.uid) {
7,968✔
3379
            if (!taosArrayPush(state->pIndexList, pBrinBlk)) {
1,992✔
3380
              TAOS_CHECK_GOTO(terrno, &lino, _err);
×
3381
            }
3382
          }
3383
        } else if (state->suid > pBrinBlk->maxTbid.suid ||
×
3384
                   (state->suid == pBrinBlk->maxTbid.suid && state->uid > pBrinBlk->maxTbid.uid)) {
×
3385
          break;
3386
        }
3387
      }
3388

3389
      int indexSize = TARRAY_SIZE(state->pIndexList);
7,968✔
3390
      if (indexSize <= 0) {
7,968✔
3391
        goto _check_stt_data;
6,972✔
3392
      }
3393

3394
      state->state = SFSNEXTROW_INDEXLIST;
996✔
3395
      state->iBrinIndex = 1;
996✔
3396
    }
3397

3398
  _check_stt_data:
17,944✔
3399
    if (state->pFileSet != state->pr->pCurFileSet) {
17,944✔
3400
      state->pr->pCurFileSet = state->pFileSet;
8,936✔
3401
    }
3402

3403
    TAOS_CHECK_GOTO(lastIterOpen(&state->lastIter, state->pFileSet, pTsdb, state->pTSchema, state->suid, state->uid,
17,944✔
3404
                                 state->pr, state->lastTs, aCols, nCols),
3405
                    &lino, _err);
3406

3407
    TAOS_CHECK_GOTO(lastIterNext(&state->lastIter, &state->pLastRow), &lino, _err);
17,944✔
3408

3409
    if (!state->pLastRow) {
17,944✔
3410
      state->lastEmpty = 1;
14,208✔
3411

3412
      if (SFSNEXTROW_INDEXLIST != state->state) {
14,208✔
3413
        clearLastFileSet(state);
13,212✔
3414
        goto _next_fileset;
13,212✔
3415
      }
3416
    } else {
3417
      state->lastEmpty = 0;
3,736✔
3418

3419
      if (SFSNEXTROW_INDEXLIST != state->state) {
3,736✔
3420
        state->state = SFSNEXTROW_NEXTSTTROW;
3,736✔
3421

3422
        *ppRow = state->pLastRow;
3,736✔
3423
        state->pLastRow = NULL;
3,736✔
3424

3425
        TAOS_RETURN(code);
3,736✔
3426
      }
3427
    }
3428

3429
    state->pLastIter = &state->lastIter;
996✔
3430
  }
3431

3432
  if (SFSNEXTROW_NEXTSTTROW == state->state) {
2,201✔
3433
    TAOS_CHECK_GOTO(lastIterNext(&state->lastIter, &state->pLastRow), &lino, _err);
1,205✔
3434

3435
    if (!state->pLastRow) {
1,205✔
3436
      if (state->pLastIter) {
724✔
3437
        code = lastIterClose(&state->pLastIter);
×
3438
        if (code != TSDB_CODE_SUCCESS) {
×
3439
          tsdbError("vgId:%d, %s close last iter failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
3440
                    tstrerror(code));
3441
          TAOS_RETURN(code);
×
3442
        }
3443
      }
3444

3445
      clearLastFileSet(state);
724✔
3446
      state->state = SFSNEXTROW_FILESET;
724✔
3447
      goto _next_fileset;
724✔
3448
    } else {
3449
      *ppRow = state->pLastRow;
481✔
3450
      state->pLastRow = NULL;
481✔
3451

3452
      TAOS_RETURN(code);
481✔
3453
    }
3454
  }
3455

3456
  if (SFSNEXTROW_INDEXLIST == state->state) {
996✔
3457
    SBrinBlk *pBrinBlk = NULL;
996✔
3458
  _next_brinindex:
996✔
3459
    if (--state->iBrinIndex < 0) {
996✔
3460
      if (state->pLastRow) {
×
3461
        state->state = SFSNEXTROW_NEXTSTTROW;
×
3462
        *ppRow = state->pLastRow;
×
3463
        state->pLastRow = NULL;
×
3464
        return code;
×
3465
      }
3466

3467
      clearLastFileSet(state);
×
3468
      goto _next_fileset;
×
3469
    } else {
3470
      pBrinBlk = taosArrayGet(state->pIndexList, state->iBrinIndex);
996✔
3471
    }
3472

3473
    if (!state->pBrinBlock) {
996✔
3474
      state->pBrinBlock = &state->brinBlock;
996✔
3475
    } else {
3476
      tBrinBlockClear(&state->brinBlock);
×
3477
    }
3478

3479
    TAOS_CHECK_GOTO(tsdbDataFileReadBrinBlock(state->pr->pFileReader, pBrinBlk, &state->brinBlock), &lino, _err);
996✔
3480

3481
    state->iBrinRecord = state->brinBlock.numOfRecords - 1;
996✔
3482
    state->state = SFSNEXTROW_BRINBLOCK;
996✔
3483
  }
3484

3485
  if (SFSNEXTROW_BRINBLOCK == state->state) {
996✔
3486
  _next_brinrecord:
996✔
3487
    if (state->iBrinRecord < 0) {  // empty brin block, goto _next_brinindex
996✔
3488
      tBrinBlockClear(&state->brinBlock);
×
3489
      goto _next_brinindex;
×
3490
    }
3491

3492
    TAOS_CHECK_GOTO(tBrinBlockGet(&state->brinBlock, state->iBrinRecord, &state->brinRecord), &lino, _err);
996✔
3493

3494
    SBrinRecord *pRecord = &state->brinRecord;
996✔
3495
    if (pRecord->uid != state->uid) {
996✔
3496
      // TODO: goto next brin block early
3497
      --state->iBrinRecord;
×
3498
      goto _next_brinrecord;
×
3499
    }
3500

3501
    state->state = SFSNEXTROW_BRINRECORD;
996✔
3502
  }
3503

3504
  if (SFSNEXTROW_BRINRECORD == state->state) {
996✔
3505
    SBrinRecord *pRecord = &state->brinRecord;
996✔
3506

3507
    if (!state->pBlockData) {
996✔
3508
      state->pBlockData = &state->blockData;
996✔
3509

3510
      TAOS_CHECK_GOTO(tBlockDataCreate(&state->blockData), &lino, _err);
996✔
3511
    } else {
3512
      tBlockDataReset(state->pBlockData);
×
3513
    }
3514

3515
    if (aCols[0] == PRIMARYKEY_TIMESTAMP_COL_ID) {
996✔
3516
      --nCols;
996✔
3517
      ++aCols;
996✔
3518
    }
3519

3520
    TAOS_CHECK_GOTO(tsdbDataFileReadBlockDataByColumn(state->pr->pFileReader, pRecord, state->pBlockData,
996✔
3521
                                                      state->pTSchema, aCols, nCols),
3522
                    &lino, _err);
3523

3524
    state->nRow = state->blockData.nRow;
996✔
3525
    state->iRow = state->nRow - 1;
996✔
3526

3527
    state->state = SFSNEXTROW_BLOCKROW;
996✔
3528
  }
3529

3530
  if (SFSNEXTROW_BLOCKROW == state->state) {
996✔
3531
    if (state->iRow < 0) {
996✔
3532
      --state->iBrinRecord;
×
3533
      goto _next_brinrecord;
×
3534
    }
3535

3536
    state->row = tsdbRowFromBlockData(state->pBlockData, state->iRow);
996✔
3537
    if (!state->pLastIter) {
996✔
3538
      *ppRow = &state->row;
×
3539
      --state->iRow;
×
3540
      return code;
×
3541
    }
3542

3543
    if (!state->pLastRow) {
996✔
3544
      // get next row from fslast and process with fs row, --state->Row if select fs row
3545
      TAOS_CHECK_GOTO(lastIterNext(&state->lastIter, &state->pLastRow), &lino, _err);
996✔
3546
    }
3547

3548
    if (!state->pLastRow) {
996✔
3549
      if (state->pLastIter) {
996✔
3550
        code = lastIterClose(&state->pLastIter);
996✔
3551
        if (code != TSDB_CODE_SUCCESS) {
996✔
3552
          tsdbError("vgId:%d, %s close last iter failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
3553
                    tstrerror(code));
3554
          TAOS_RETURN(code);
×
3555
        }
3556
      }
3557

3558
      *ppRow = &state->row;
996✔
3559
      --state->iRow;
996✔
3560
      return code;
996✔
3561
    }
3562

3563
    // process state->pLastRow & state->row
3564
    TSKEY rowTs = TSDBROW_TS(&state->row);
×
3565
    TSKEY lastRowTs = TSDBROW_TS(state->pLastRow);
×
3566
    if (lastRowTs > rowTs) {
×
3567
      *ppRow = state->pLastRow;
×
3568
      state->pLastRow = NULL;
×
3569

3570
      TAOS_RETURN(code);
×
3571
    } else if (lastRowTs < rowTs) {
×
3572
      *ppRow = &state->row;
×
3573
      --state->iRow;
×
3574

3575
      TAOS_RETURN(code);
×
3576
    } else {
3577
      // TODO: merge rows and *ppRow = mergedRow
3578
      SRowMerger *pMerger = &state->rowMerger;
×
3579
      code = tsdbRowMergerInit(pMerger, state->pTSchema);
×
3580
      if (code != TSDB_CODE_SUCCESS) {
×
3581
        tsdbError("vgId:%d, %s init row merger failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
3582
                  tstrerror(code));
3583
        TAOS_RETURN(code);
×
3584
      }
3585

3586
      TAOS_CHECK_GOTO(tsdbRowMergerAdd(pMerger, &state->row, state->pTSchema), &lino, _err);
×
3587
      TAOS_CHECK_GOTO(tsdbRowMergerAdd(pMerger, state->pLastRow, state->pTSchema), &lino, _err);
×
3588

3589
      if (state->pTSRow) {
×
3590
        taosMemoryFree(state->pTSRow);
×
3591
        state->pTSRow = NULL;
×
3592
      }
3593

3594
      TAOS_CHECK_GOTO(tsdbRowMergerGetRow(pMerger, &state->pTSRow), &lino, _err);
×
3595

3596
      state->row = tsdbRowFromTSRow(TSDBROW_VERSION(&state->row), state->pTSRow);
×
3597
      *ppRow = &state->row;
×
3598
      --state->iRow;
×
3599

3600
      tsdbRowMergerClear(pMerger);
×
3601

3602
      TAOS_RETURN(code);
×
3603
    }
3604
  }
3605

3606
_err:
×
3607
  clearLastFileSet(state);
×
3608

3609
  *ppRow = NULL;
×
3610

3611
  if (code) {
×
3612
    tsdbError("tsdb/cache: vgId:%d, %s failed at line %d since %s.", TD_VID(pTsdb->pVnode), __func__, lino,
×
3613
              tstrerror(code));
3614
  }
3615

3616
  TAOS_RETURN(code);
×
3617
}
3618

3619
typedef struct CacheNextRowIter {
3620
  SArray           *pMemDelData;
3621
  SArray           *pSkyline;
3622
  int64_t           iSkyline;
3623
  SBlockIdx         idx;
3624
  SMemNextRowIter   memState;
3625
  SMemNextRowIter   imemState;
3626
  SFSNextRowIter    fsState;
3627
  TSDBROW           memRow, imemRow, fsLastRow, fsRow;
3628
  TsdbNextRowState  input[3];
3629
  SCacheRowsReader *pr;
3630
  STsdb            *pTsdb;
3631
} CacheNextRowIter;
3632

3633
int32_t clearNextRowFromFS(void *iter) {
75,208✔
3634
  int32_t code = 0;
75,208✔
3635

3636
  SFSNextRowIter *state = (SFSNextRowIter *)iter;
75,208✔
3637
  if (!state) {
75,208✔
3638
    TAOS_RETURN(code);
×
3639
  }
3640

3641
  if (state->pLastIter) {
75,208✔
3642
    code = lastIterClose(&state->pLastIter);
×
3643
    if (code != TSDB_CODE_SUCCESS) {
×
3644
      tsdbError("%s close last iter failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
3645
      TAOS_RETURN(code);
×
3646
    }
3647
  }
3648

3649
  if (state->pBlockData) {
75,208✔
3650
    tBlockDataDestroy(state->pBlockData);
996✔
3651
    state->pBlockData = NULL;
996✔
3652
  }
3653

3654
  if (state->pBrinBlock) {
75,208✔
3655
    tBrinBlockDestroy(state->pBrinBlock);
996✔
3656
    state->pBrinBlock = NULL;
996✔
3657
  }
3658

3659
  if (state->pIndexList) {
75,208✔
3660
    taosArrayDestroy(state->pIndexList);
7,968✔
3661
    state->pIndexList = NULL;
7,968✔
3662
  }
3663

3664
  if (state->pTSRow) {
75,208✔
3665
    taosMemoryFree(state->pTSRow);
×
3666
    state->pTSRow = NULL;
×
3667
  }
3668

3669
  if (state->pRowIter->pSkyline) {
75,208✔
3670
    taosArrayDestroy(state->pRowIter->pSkyline);
71,070✔
3671
    state->pRowIter->pSkyline = NULL;
71,070✔
3672
  }
3673

3674
  TAOS_RETURN(code);
75,208✔
3675
}
3676

3677
static void clearLastFileSet(SFSNextRowIter *state) {
103,080✔
3678
  if (state->pLastIter) {
103,080✔
3679
    int code = lastIterClose(&state->pLastIter);
×
3680
    if (code != TSDB_CODE_SUCCESS) {
×
3681
      tsdbError("%s close last iter failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
3682
      return;
×
3683
    }
3684
  }
3685

3686
  if (state->pBlockData) {
103,080✔
3687
    tBlockDataDestroy(state->pBlockData);
×
3688
    state->pBlockData = NULL;
×
3689
  }
3690

3691
  if (state->pr->pFileReader) {
103,080✔
3692
    tsdbDataFileReaderClose(&state->pr->pFileReader);
7,968✔
3693
    state->pr->pFileReader = NULL;
7,968✔
3694

3695
    state->pr->pCurFileSet = NULL;
7,968✔
3696
  }
3697

3698
  if (state->pTSRow) {
103,080✔
3699
    taosMemoryFree(state->pTSRow);
×
3700
    state->pTSRow = NULL;
×
3701
  }
3702

3703
  if (state->pRowIter->pSkyline) {
103,080✔
3704
    taosArrayDestroy(state->pRowIter->pSkyline);
724✔
3705
    state->pRowIter->pSkyline = NULL;
724✔
3706

3707
    void   *pe = NULL;
724✔
3708
    int32_t iter = 0;
724✔
3709
    while ((pe = tSimpleHashIterate(state->pr->pTableMap, pe, &iter)) != NULL) {
1,448✔
3710
      STableLoadInfo *pInfo = *(STableLoadInfo **)pe;
724✔
3711
      taosArrayDestroy(pInfo->pTombData);
724✔
3712
      pInfo->pTombData = NULL;
724✔
3713
    }
3714
  }
3715
}
3716

3717
static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTsdb, STSchema *pTSchema, tb_uid_t suid,
75,208✔
3718
                               SArray *pLDataIterArray, STsdbReadSnap *pReadSnap, int64_t lastTs,
3719
                               SCacheRowsReader *pr) {
3720
  int32_t code = 0, lino = 0;
75,208✔
3721

3722
  STbData *pMem = NULL;
75,208✔
3723
  if (pReadSnap->pMem) {
75,208✔
3724
    pMem = tsdbGetTbDataFromMemTable(pReadSnap->pMem, suid, uid);
75,208✔
3725
  }
3726

3727
  STbData *pIMem = NULL;
75,208✔
3728
  if (pReadSnap->pIMem) {
75,208✔
3729
    pIMem = tsdbGetTbDataFromMemTable(pReadSnap->pIMem, suid, uid);
×
3730
  }
3731

3732
  pIter->pTsdb = pTsdb;
75,208✔
3733

3734
  pIter->pMemDelData = NULL;
75,208✔
3735

3736
  TAOS_CHECK_GOTO(loadMemTombData(&pIter->pMemDelData, pMem, pIMem, pr->info.verRange.maxVer), &lino, _err);
75,208✔
3737

3738
  pIter->idx = (SBlockIdx){.suid = suid, .uid = uid};
75,208✔
3739

3740
  pIter->fsState.pRowIter = pIter;
75,208✔
3741
  pIter->fsState.state = SFSNEXTROW_FS;
75,208✔
3742
  pIter->fsState.aDFileSet = pReadSnap->pfSetArray;
75,208✔
3743
  pIter->fsState.pBlockIdxExp = &pIter->idx;
75,208✔
3744
  pIter->fsState.pTSchema = pTSchema;
75,208✔
3745
  pIter->fsState.suid = suid;
75,208✔
3746
  pIter->fsState.uid = uid;
75,208✔
3747
  pIter->fsState.lastTs = lastTs;
75,208✔
3748
  pIter->fsState.pr = pr;
75,208✔
3749

3750
  pIter->input[0] = (TsdbNextRowState){&pIter->memRow, true, false, false, &pIter->memState, getNextRowFromMem, NULL};
75,208✔
3751
  pIter->input[1] = (TsdbNextRowState){&pIter->imemRow, true, false, false, &pIter->imemState, getNextRowFromMem, NULL};
75,208✔
3752
  pIter->input[2] =
75,208✔
3753
      (TsdbNextRowState){&pIter->fsRow, false, true, false, &pIter->fsState, getNextRowFromFS, clearNextRowFromFS};
75,208✔
3754

3755
  if (pMem) {
75,208✔
3756
    pIter->memState.pMem = pMem;
67,652✔
3757
    pIter->memState.state = SMEMNEXTROW_ENTER;
67,652✔
3758
    pIter->memState.lastTs = lastTs;
67,652✔
3759
    pIter->input[0].stop = false;
67,652✔
3760
    pIter->input[0].next = true;
67,652✔
3761
  }
3762

3763
  if (pIMem) {
75,208✔
3764
    pIter->imemState.pMem = pIMem;
×
3765
    pIter->imemState.state = SMEMNEXTROW_ENTER;
×
3766
    pIter->imemState.lastTs = lastTs;
×
3767
    pIter->input[1].stop = false;
×
3768
    pIter->input[1].next = true;
×
3769
  }
3770

3771
  pIter->pr = pr;
75,208✔
3772

3773
_err:
75,208✔
3774
  TAOS_RETURN(code);
75,208✔
3775
}
3776

3777
static void nextRowIterClose(CacheNextRowIter *pIter) {
75,208✔
3778
  for (int i = 0; i < 3; ++i) {
300,832✔
3779
    if (pIter->input[i].nextRowClearFn) {
225,624✔
3780
      (void)pIter->input[i].nextRowClearFn(pIter->input[i].iter);
75,208✔
3781
    }
3782
  }
3783

3784
  if (pIter->pSkyline) {
75,208✔
3785
    taosArrayDestroy(pIter->pSkyline);
×
3786
  }
3787

3788
  if (pIter->pMemDelData) {
75,208✔
3789
    taosArrayDestroy(pIter->pMemDelData);
75,208✔
3790
  }
3791
}
75,208✔
3792

3793
// iterate next row non deleted backward ts, version (from high to low)
3794
static int32_t nextRowIterGet(CacheNextRowIter *pIter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast,
81,865✔
3795
                              int16_t *aCols, int nCols) {
3796
  int32_t code = 0, lino = 0;
81,865✔
3797

3798
  for (;;) {
624✔
3799
    for (int i = 0; i < 3; ++i) {
329,956✔
3800
      if (pIter->input[i].next && !pIter->input[i].stop) {
247,467✔
3801
        TAOS_CHECK_GOTO(pIter->input[i].nextRowFn(pIter->input[i].iter, &pIter->input[i].pRow,
150,141✔
3802
                                                  &pIter->input[i].ignoreEarlierTs, isLast, aCols, nCols),
3803
                        &lino, _err);
3804

3805
        if (pIter->input[i].pRow == NULL) {
150,141✔
3806
          pIter->input[i].stop = true;
74,786✔
3807
          pIter->input[i].next = false;
74,786✔
3808
        }
3809
      }
3810
    }
3811

3812
    if (pIter->input[0].stop && pIter->input[1].stop && pIter->input[2].stop) {
82,489✔
3813
      *ppRow = NULL;
7,624✔
3814
      *pIgnoreEarlierTs =
15,248✔
3815
          (pIter->input[0].ignoreEarlierTs || pIter->input[1].ignoreEarlierTs || pIter->input[2].ignoreEarlierTs);
7,624✔
3816

3817
      TAOS_RETURN(code);
7,624✔
3818
    }
3819

3820
    // select maxpoint(s) from mem, imem, fs and last
3821
    TSDBROW *max[4] = {0};
74,865✔
3822
    int      iMax[4] = {-1, -1, -1, -1};
74,865✔
3823
    int      nMax = 0;
74,865✔
3824
    SRowKey  maxKey = {.ts = TSKEY_MIN};
74,865✔
3825

3826
    for (int i = 0; i < 3; ++i) {
299,460✔
3827
      if (!pIter->input[i].stop && pIter->input[i].pRow != NULL) {
224,595✔
3828
        STsdbRowKey tsdbRowKey = {0};
76,079✔
3829
        tsdbRowGetKey(pIter->input[i].pRow, &tsdbRowKey);
76,079✔
3830

3831
        // merging & deduplicating on client side
3832
        int c = tRowKeyCompare(&maxKey, &tsdbRowKey.key);
76,079✔
3833
        if (c <= 0) {
76,079✔
3834
          if (c < 0) {
75,979✔
3835
            nMax = 0;
75,979✔
3836
            maxKey = tsdbRowKey.key;
75,979✔
3837
          }
3838

3839
          iMax[nMax] = i;
75,979✔
3840
          max[nMax++] = pIter->input[i].pRow;
75,979✔
3841
        }
3842
        pIter->input[i].next = false;
76,079✔
3843
      }
3844
    }
3845

3846
    // delete detection
3847
    TSDBROW *merge[4] = {0};
74,865✔
3848
    int      iMerge[4] = {-1, -1, -1, -1};
74,865✔
3849
    int      nMerge = 0;
74,865✔
3850
    for (int i = 0; i < nMax; ++i) {
149,730✔
3851
      TSDBKEY maxKey1 = TSDBROW_KEY(max[i]);
74,865✔
3852

3853
      if (!pIter->pSkyline) {
74,865✔
3854
        pIter->pSkyline = taosArrayInit(32, sizeof(TSDBKEY));
71,794✔
3855
        TSDB_CHECK_NULL(pIter->pSkyline, code, lino, _err, terrno);
71,794✔
3856

3857
        uint64_t        uid = pIter->idx.uid;
71,794✔
3858
        STableLoadInfo *pInfo = getTableLoadInfo(pIter->pr, uid);
71,794✔
3859
        TSDB_CHECK_NULL(pInfo, code, lino, _err, TSDB_CODE_OUT_OF_MEMORY);
71,794✔
3860

3861
        if (pInfo->pTombData == NULL) {
71,794✔
3862
          pInfo->pTombData = taosArrayInit(4, sizeof(SDelData));
69,802✔
3863
          TSDB_CHECK_NULL(pInfo->pTombData, code, lino, _err, terrno);
69,802✔
3864
        }
3865

3866
        if (!taosArrayAddAll(pInfo->pTombData, pIter->pMemDelData)) {
71,794✔
3867
          TAOS_CHECK_GOTO(terrno, &lino, _err);
×
3868
        }
3869

3870
        size_t delSize = TARRAY_SIZE(pInfo->pTombData);
71,794✔
3871
        if (delSize > 0) {
71,794✔
3872
          code = tsdbBuildDeleteSkyline(pInfo->pTombData, 0, (int32_t)(delSize - 1), pIter->pSkyline);
2,244✔
3873
          TAOS_CHECK_GOTO(code, &lino, _err);
2,244✔
3874
        }
3875
        pIter->iSkyline = taosArrayGetSize(pIter->pSkyline) - 1;
71,794✔
3876
      }
3877

3878
      bool deleted = tsdbKeyDeleted(&maxKey1, pIter->pSkyline, &pIter->iSkyline);
74,865✔
3879
      if (!deleted) {
74,865✔
3880
        iMerge[nMerge] = iMax[i];
74,241✔
3881
        merge[nMerge++] = max[i];
74,241✔
3882
      }
3883

3884
      pIter->input[iMax[i]].next = deleted;
74,865✔
3885
    }
3886

3887
    if (nMerge > 0) {
74,865✔
3888
      pIter->input[iMerge[0]].next = true;
74,241✔
3889

3890
      *ppRow = merge[0];
74,241✔
3891

3892
      TAOS_RETURN(code);
74,241✔
3893
    }
3894
  }
3895

3896
_err:
×
3897
  if (code) {
×
3898
    tsdbError("tsdb/cache: %s failed at line %d since %s.", __func__, lino, tstrerror(code));
×
3899
  }
3900

3901
  TAOS_RETURN(code);
×
3902
}
3903

3904
static int32_t initLastColArrayPartial(STSchema *pTSchema, SArray **ppColArray, int16_t *slotIds, int nCols) {
75,208✔
3905
  SArray *pColArray = taosArrayInit(nCols, sizeof(SLastCol));
75,208✔
3906
  if (NULL == pColArray) {
75,208✔
3907
    TAOS_RETURN(terrno);
×
3908
  }
3909

3910
  for (int32_t i = 0; i < nCols; ++i) {
275,686✔
3911
    int16_t  slotId = slotIds[i];
200,478✔
3912
    SLastCol col = {.rowKey.ts = 0,
200,478✔
3913
                    .colVal = COL_VAL_NULL(pTSchema->columns[slotId].colId, pTSchema->columns[slotId].type)};
200,478✔
3914
    if (!taosArrayPush(pColArray, &col)) {
200,478✔
3915
      TAOS_RETURN(terrno);
×
3916
    }
3917
  }
3918
  *ppColArray = pColArray;
75,208✔
3919

3920
  TAOS_RETURN(TSDB_CODE_SUCCESS);
75,208✔
3921
}
3922

3923
static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols,
66,650✔
3924
                            int nCols, int16_t *slotIds) {
3925
  int32_t   code = 0, lino = 0;
66,650✔
3926
  STSchema *pTSchema = pr->pSchema;  // metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1, 1);
66,650✔
3927
  int16_t   nLastCol = nCols;
66,650✔
3928
  int16_t   noneCol = 0;
66,650✔
3929
  bool      setNoneCol = false;
66,650✔
3930
  bool      hasRow = false;
66,650✔
3931
  bool      ignoreEarlierTs = false;
66,650✔
3932
  SArray   *pColArray = NULL;
66,650✔
3933
  SColVal  *pColVal = &(SColVal){0};
66,650✔
3934

3935
  TAOS_CHECK_RETURN(initLastColArrayPartial(pTSchema, &pColArray, slotIds, nCols));
66,650✔
3936

3937
  SArray *aColArray = taosArrayInit(nCols, sizeof(int16_t));
66,650✔
3938
  if (NULL == aColArray) {
66,650✔
UNCOV
3939
    taosArrayDestroy(pColArray);
×
3940

3941
    TAOS_RETURN(terrno);
×
3942
  }
3943

3944
  for (int i = 0; i < nCols; ++i) {
228,758✔
3945
    if (!taosArrayPush(aColArray, &aCols[i])) {
324,216✔
3946
      taosArrayDestroy(pColArray);
×
3947

3948
      TAOS_RETURN(terrno);
×
3949
    }
3950
  }
3951

3952
  STsdbRowKey lastRowKey = {.key.ts = TSKEY_MAX};
66,650✔
3953

3954
  // inverse iterator
3955
  CacheNextRowIter iter = {0};
66,650✔
3956
  code =
3957
      nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->info.suid, pr->pLDataIterArray, pr->pReadSnap, pr->lastTs, pr);
66,650✔
3958
  TAOS_CHECK_GOTO(code, &lino, _err);
66,650✔
3959

3960
  do {
3961
    TSDBROW *pRow = NULL;
73,307✔
3962
    code = nextRowIterGet(&iter, &pRow, &ignoreEarlierTs, true, TARRAY_DATA(aColArray), TARRAY_SIZE(aColArray));
73,307✔
3963

3964
    if (!pRow) {
73,307✔
3965
      break;
6,645✔
3966
    }
3967

3968
    hasRow = true;
66,662✔
3969

3970
    int32_t sversion = TSDBROW_SVERSION(pRow);
66,662✔
3971
    if (sversion != -1) {
66,662✔
3972
      TAOS_CHECK_GOTO(updateTSchema(sversion, pr, uid), &lino, _err);
63,052✔
3973

3974
      pTSchema = pr->pCurrSchema;
63,052✔
3975
    }
3976
    // int16_t nCol = pTSchema->numOfCols;
3977

3978
    STsdbRowKey rowKey = {0};
66,662✔
3979
    tsdbRowGetKey(pRow, &rowKey);
66,662✔
3980

3981
    if (lastRowKey.key.ts == TSKEY_MAX) {  // first time
66,662✔
3982
      lastRowKey = rowKey;
63,591✔
3983

3984
      for (int16_t iCol = noneCol; iCol < nCols; ++iCol) {
217,606✔
3985
        if (iCol >= nLastCol) {
154,015✔
3986
          break;
×
3987
        }
3988
        SLastCol *pCol = taosArrayGet(pColArray, iCol);
154,015✔
3989
        if (slotIds[iCol] > pTSchema->numOfCols - 1) {
154,015✔
3990
          if (!setNoneCol) {
100✔
3991
            noneCol = iCol;
×
3992
            setNoneCol = true;
×
3993
          }
3994
          continue;
100✔
3995
        }
3996
        if (pCol->colVal.cid != pTSchema->columns[slotIds[iCol]].colId) {
153,915✔
3997
          continue;
×
3998
        }
3999
        if (slotIds[iCol] == 0) {
153,915✔
4000
          STColumn *pTColumn = &pTSchema->columns[0];
63,591✔
4001
          SValue    val = {.type = pTColumn->type};
63,591✔
4002
          VALUE_SET_TRIVIAL_DATUM(&val, rowKey.key.ts);
63,591✔
4003
          *pColVal = COL_VAL_VALUE(pTColumn->colId, val);
63,591✔
4004

4005
          SLastCol colTmp = {.rowKey = rowKey.key, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID};
63,591✔
4006
          TAOS_CHECK_GOTO(tsdbCacheReallocSLastCol(&colTmp, NULL), &lino, _err);
63,591✔
4007

4008
          taosArraySet(pColArray, 0, &colTmp);
63,591✔
4009
          continue;
63,591✔
4010
        }
4011
        tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal);
90,324✔
4012

4013
        *pCol = (SLastCol){.rowKey = rowKey.key, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID};
90,324✔
4014
        TAOS_CHECK_GOTO(tsdbCacheReallocSLastCol(pCol, NULL), &lino, _err);
90,324✔
4015

4016
        if (!COL_VAL_IS_VALUE(pColVal)) {
90,324✔
4017
          if (!setNoneCol) {
9,030✔
4018
            noneCol = iCol;
5,561✔
4019
            setNoneCol = true;
5,561✔
4020
          }
4021
        } else {
4022
          int32_t aColIndex = taosArraySearchIdx(aColArray, &pColVal->cid, compareInt16Val, TD_EQ);
81,294✔
4023
          if (aColIndex >= 0) {
81,294✔
4024
            taosArrayRemove(aColArray, aColIndex);
81,294✔
4025
          }
4026
        }
4027
      }
4028
      if (!setNoneCol) {
63,591✔
4029
        // done, goto return pColArray
4030
        break;
58,030✔
4031
      } else {
4032
        continue;
5,561✔
4033
      }
4034
    }
4035

4036
    // merge into pColArray
4037
    setNoneCol = false;
3,071✔
4038
    for (int16_t iCol = noneCol; iCol < nCols; ++iCol) {
10,707✔
4039
      if (iCol >= nLastCol) {
7,636✔
4040
        break;
×
4041
      }
4042
      // high version's column value
4043
      if (slotIds[iCol] > pTSchema->numOfCols - 1) {
7,636✔
4044
        continue;
100✔
4045
      }
4046

4047
      SLastCol *lastColVal = (SLastCol *)taosArrayGet(pColArray, iCol);
7,536✔
4048
      if (lastColVal->colVal.cid != pTSchema->columns[slotIds[iCol]].colId) {
7,536✔
4049
        continue;
×
4050
      }
4051
      SColVal *tColVal = &lastColVal->colVal;
7,536✔
4052
      if (COL_VAL_IS_VALUE(tColVal)) continue;
7,536✔
4053

4054
      tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal);
5,046✔
4055
      if (COL_VAL_IS_VALUE(pColVal)) {
5,046✔
4056
        SLastCol lastCol = {.rowKey = rowKey.key, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID};
3,950✔
4057
        TAOS_CHECK_GOTO(tsdbCacheReallocSLastCol(&lastCol, NULL), &lino, _err);
3,950✔
4058

4059
        tsdbCacheFreeSLastColItem(lastColVal);
3,950✔
4060
        taosArraySet(pColArray, iCol, &lastCol);
3,950✔
4061
        int32_t aColIndex = taosArraySearchIdx(aColArray, &lastCol.colVal.cid, compareInt16Val, TD_EQ);
3,950✔
4062
        if (aColIndex >= 0) {
3,950✔
4063
          taosArrayRemove(aColArray, aColIndex);
3,950✔
4064
        }
4065
      } else if (!COL_VAL_IS_VALUE(pColVal) && !setNoneCol) {
1,096✔
4066
        noneCol = iCol;
1,096✔
4067
        setNoneCol = true;
1,096✔
4068
      }
4069
    }
4070
  } while (setNoneCol);
8,632✔
4071

4072
  if (!hasRow) {
66,650✔
4073
    if (ignoreEarlierTs) {
3,059✔
4074
      taosArrayDestroy(pColArray);
×
4075
      pColArray = NULL;
×
4076
    } else {
4077
      taosArrayClear(pColArray);
3,059✔
4078
    }
4079
  }
4080
  *ppLastArray = pColArray;
66,650✔
4081

4082
  nextRowIterClose(&iter);
66,650✔
4083
  taosArrayDestroy(aColArray);
66,650✔
4084

4085
  TAOS_RETURN(code);
66,650✔
4086

4087
_err:
×
4088
  nextRowIterClose(&iter);
×
4089
  // taosMemoryFreeClear(pTSchema);
4090
  *ppLastArray = NULL;
×
4091
  taosArrayDestroyEx(pColArray, tsdbCacheFreeSLastColItem);
×
4092
  taosArrayDestroy(aColArray);
×
4093

4094
  if (code) {
×
4095
    tsdbError("tsdb/cache: vgId:%d, %s failed at line %d since %s.", TD_VID(pTsdb->pVnode), __func__, lino,
×
4096
              tstrerror(code));
4097
  }
4098

4099
  TAOS_RETURN(code);
×
4100
}
4101

4102
static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols,
8,558✔
4103
                               int nCols, int16_t *slotIds) {
4104
  int32_t   code = 0, lino = 0;
8,558✔
4105
  STSchema *pTSchema = pr->pSchema;  // metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1, 1);
8,558✔
4106
  int16_t   nLastCol = nCols;
8,558✔
4107
  int16_t   noneCol = 0;
8,558✔
4108
  bool      setNoneCol = false;
8,558✔
4109
  bool      hasRow = false;
8,558✔
4110
  bool      ignoreEarlierTs = false;
8,558✔
4111
  SArray   *pColArray = NULL;
8,558✔
4112
  SColVal  *pColVal = &(SColVal){0};
8,558✔
4113

4114
  TAOS_CHECK_RETURN(initLastColArrayPartial(pTSchema, &pColArray, slotIds, nCols));
8,558✔
4115

4116
  SArray *aColArray = taosArrayInit(nCols, sizeof(int16_t));
8,558✔
4117
  if (NULL == aColArray) {
8,558✔
4118
    taosArrayDestroy(pColArray);
×
4119

4120
    TAOS_RETURN(terrno);
×
4121
  }
4122

4123
  for (int i = 0; i < nCols; ++i) {
46,928✔
4124
    if (!taosArrayPush(aColArray, &aCols[i])) {
76,740✔
4125
      taosArrayDestroy(pColArray);
×
4126

4127
      TAOS_RETURN(terrno);
×
4128
    }
4129
  }
4130

4131
  // inverse iterator
4132
  CacheNextRowIter iter = {0};
8,558✔
4133
  code =
4134
      nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->info.suid, pr->pLDataIterArray, pr->pReadSnap, pr->lastTs, pr);
8,558✔
4135
  TAOS_CHECK_GOTO(code, &lino, _err);
8,558✔
4136

4137
  do {
4138
    TSDBROW *pRow = NULL;
8,558✔
4139
    code = nextRowIterGet(&iter, &pRow, &ignoreEarlierTs, false, TARRAY_DATA(aColArray), TARRAY_SIZE(aColArray));
8,558✔
4140

4141
    if (!pRow) {
8,558✔
4142
      break;
979✔
4143
    }
4144

4145
    hasRow = true;
7,579✔
4146

4147
    int32_t sversion = TSDBROW_SVERSION(pRow);
7,579✔
4148
    if (sversion != -1) {
7,579✔
4149
      TAOS_CHECK_GOTO(updateTSchema(sversion, pr, uid), &lino, _err);
6,600✔
4150

4151
      pTSchema = pr->pCurrSchema;
6,600✔
4152
    }
4153
    // int16_t nCol = pTSchema->numOfCols;
4154

4155
    STsdbRowKey rowKey = {0};
7,579✔
4156
    tsdbRowGetKey(pRow, &rowKey);
7,579✔
4157

4158
    for (int16_t iCol = noneCol; iCol < nCols; ++iCol) {
42,016✔
4159
      if (iCol >= nLastCol) {
34,437✔
4160
        break;
×
4161
      }
4162
      SLastCol *pCol = taosArrayGet(pColArray, iCol);
34,437✔
4163
      if (slotIds[iCol] > pTSchema->numOfCols - 1) {
34,437✔
4164
        continue;
×
4165
      }
4166
      if (pCol->colVal.cid != pTSchema->columns[slotIds[iCol]].colId) {
34,437✔
4167
        continue;
×
4168
      }
4169
      if (slotIds[iCol] == 0) {
34,437✔
4170
        STColumn *pTColumn = &pTSchema->columns[0];
7,579✔
4171
        SValue    val = {.type = pTColumn->type};
7,579✔
4172
        VALUE_SET_TRIVIAL_DATUM(&val, rowKey.key.ts);
7,579✔
4173
        *pColVal = COL_VAL_VALUE(pTColumn->colId, val);
7,579✔
4174

4175
        SLastCol colTmp = {.rowKey = rowKey.key, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID};
7,579✔
4176
        TAOS_CHECK_GOTO(tsdbCacheReallocSLastCol(&colTmp, NULL), &lino, _err);
7,579✔
4177

4178
        taosArraySet(pColArray, 0, &colTmp);
7,579✔
4179
        continue;
7,579✔
4180
      }
4181
      tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal);
26,858✔
4182

4183
      *pCol = (SLastCol){.rowKey = rowKey.key, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID};
26,858✔
4184
      TAOS_CHECK_GOTO(tsdbCacheReallocSLastCol(pCol, NULL), &lino, _err);
26,858✔
4185

4186
      int32_t aColIndex = taosArraySearchIdx(aColArray, &pColVal->cid, compareInt16Val, TD_EQ);
26,858✔
4187
      if (aColIndex >= 0) {
26,858✔
4188
        taosArrayRemove(aColArray, aColIndex);
26,858✔
4189
      }
4190
    }
4191

4192
    break;
7,579✔
4193
  } while (1);
4194

4195
  if (!hasRow) {
8,558✔
4196
    if (ignoreEarlierTs) {
979✔
4197
      taosArrayDestroy(pColArray);
×
4198
      pColArray = NULL;
×
4199
    } else {
4200
      taosArrayClear(pColArray);
979✔
4201
    }
4202
  }
4203
  *ppLastArray = pColArray;
8,558✔
4204

4205
  nextRowIterClose(&iter);
8,558✔
4206
  taosArrayDestroy(aColArray);
8,558✔
4207

4208
  TAOS_RETURN(code);
8,558✔
4209

4210
_err:
×
4211
  nextRowIterClose(&iter);
×
4212

4213
  *ppLastArray = NULL;
×
4214
  taosArrayDestroyEx(pColArray, tsdbCacheFreeSLastColItem);
×
4215
  taosArrayDestroy(aColArray);
×
4216

4217
  if (code) {
×
4218
    tsdbError("tsdb/cache: vgId:%d, %s failed at line %d since %s.", TD_VID(pTsdb->pVnode), __func__, lino,
×
4219
              tstrerror(code));
4220
  }
4221

4222
  TAOS_RETURN(code);
×
4223
}
4224

4225
void tsdbCacheRelease(SLRUCache *pCache, LRUHandle *h) { tsdbLRUCacheRelease(pCache, h, false); }
12,755,092✔
4226

4227
void tsdbCacheSetCapacity(SVnode *pVnode, size_t capacity) {
15,374✔
4228
  taosLRUCacheSetCapacity(pVnode->pTsdb->lruCache, capacity);
15,374✔
4229
}
15,374✔
4230

4231
#ifdef BUILD_NO_CALL
4232
size_t tsdbCacheGetCapacity(SVnode *pVnode) { return taosLRUCacheGetCapacity(pVnode->pTsdb->lruCache); }
4233
#endif
4234

4235
size_t tsdbCacheGetUsage(SVnode *pVnode) {
176,075,193✔
4236
  size_t usage = 0;
176,075,193✔
4237
  if (pVnode->pTsdb != NULL) {
176,075,193✔
4238
    usage = taosLRUCacheGetUsage(pVnode->pTsdb->lruCache);
176,075,193✔
4239
  }
4240

4241
  return usage;
176,075,193✔
4242
}
4243

4244
int32_t tsdbCacheGetElems(SVnode *pVnode) {
176,075,193✔
4245
  int32_t elems = 0;
176,075,193✔
4246
  if (pVnode->pTsdb != NULL) {
176,075,193✔
4247
    elems = taosLRUCacheGetElems(pVnode->pTsdb->lruCache);
176,075,193✔
4248
  }
4249

4250
  return elems;
176,075,193✔
4251
}
4252

4253
#ifdef USE_SHARED_STORAGE
4254
// block cache
4255
static void getBCacheKey(int32_t fid, int64_t commitID, int64_t blkno, char *key, int *len) {
12,760,048✔
4256
  struct {
4257
    int32_t fid;
4258
    int64_t commitID;
4259
    int64_t blkno;
4260
  } bKey = {0};
12,760,048✔
4261

4262
  bKey.fid = fid;
12,760,048✔
4263
  bKey.commitID = commitID;
12,760,048✔
4264
  bKey.blkno = blkno;
12,760,048✔
4265

4266
  *len = sizeof(bKey);
12,760,048✔
4267
  memcpy(key, &bKey, *len);
12,760,048✔
4268
}
12,760,048✔
4269

4270
static int32_t tsdbCacheLoadBlockSs(STsdbFD *pFD, uint8_t **ppBlock) {
×
4271
  int32_t code = 0;
×
4272

4273
  int64_t block_size = tsSsBlockSize * pFD->szPage;
×
4274
  int64_t block_offset = (pFD->blkno - 1) * block_size;
×
4275

4276
  char *buf = taosMemoryMalloc(block_size);
×
4277
  if (buf == NULL) {
×
4278
    code = TSDB_CODE_OUT_OF_MEMORY;
×
4279
    goto _exit;
×
4280
  }
4281

4282
  // TODO: pFD->objName is not initialized, but this function is never called.
4283
  code = tssReadFileFromDefault(pFD->objName, block_offset, buf, &block_size);
×
4284
  if (code != TSDB_CODE_SUCCESS) {
×
4285
    taosMemoryFree(buf);
×
4286
    goto _exit;
×
4287
  }
4288
  *ppBlock = buf;
×
4289

4290
_exit:
×
4291
  return code;
×
4292
}
4293

4294
static void deleteBCache(const void *key, size_t keyLen, void *value, void *ud) {
10,491,026✔
4295
  (void)ud;
4296
  uint8_t *pBlock = (uint8_t *)value;
10,491,026✔
4297

4298
  taosMemoryFree(pBlock);
10,491,026✔
4299
}
10,491,026✔
4300

4301
int32_t tsdbCacheGetBlockSs(SLRUCache *pCache, STsdbFD *pFD, LRUHandle **handle) {
×
4302
  int32_t code = 0;
×
4303
  char    key[128] = {0};
×
4304
  int     keyLen = 0;
×
4305

4306
  getBCacheKey(pFD->fid, pFD->cid, pFD->blkno, key, &keyLen);
×
4307
  LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen);
×
4308
  if (!h) {
×
4309
    STsdb *pTsdb = pFD->pTsdb;
×
4310
    (void)taosThreadMutexLock(&pTsdb->bMutex);
×
4311

4312
    h = taosLRUCacheLookup(pCache, key, keyLen);
×
4313
    if (!h) {
×
4314
      uint8_t *pBlock = NULL;
×
4315
      code = tsdbCacheLoadBlockSs(pFD, &pBlock);
×
4316
      //  if table's empty or error, return code of -1
4317
      if (code != TSDB_CODE_SUCCESS || pBlock == NULL) {
×
4318
        (void)taosThreadMutexUnlock(&pTsdb->bMutex);
×
4319

4320
        *handle = NULL;
×
4321
        if (code == TSDB_CODE_SUCCESS && !pBlock) {
×
4322
          code = TSDB_CODE_OUT_OF_MEMORY;
×
4323
        }
4324

4325
        TAOS_RETURN(code);
×
4326
      }
4327

4328
      size_t              charge = tsSsBlockSize * pFD->szPage;
×
4329
      _taos_lru_deleter_t deleter = deleteBCache;
×
4330
      LRUStatus           status =
4331
          taosLRUCacheInsert(pCache, key, keyLen, pBlock, charge, deleter, NULL, &h, TAOS_LRU_PRIORITY_LOW, NULL);
×
4332
      if (status != TAOS_LRU_STATUS_OK) {
4333
        // code = -1;
4334
      }
4335
    }
4336

4337
    (void)taosThreadMutexUnlock(&pTsdb->bMutex);
×
4338
  }
4339

4340
  *handle = h;
×
4341

4342
  TAOS_RETURN(code);
×
4343
}
4344

4345
int32_t tsdbCacheGetPageSs(SLRUCache *pCache, STsdbFD *pFD, int64_t pgno, LRUHandle **handle) {
2,269,022✔
4346
  if (!tsSsEnabled) {
2,269,022✔
4347
    return TSDB_CODE_OPS_NOT_SUPPORT;
×
4348
  }
4349

4350
  int32_t code = 0;
2,269,022✔
4351
  char    key[128] = {0};
2,269,022✔
4352
  int     keyLen = 0;
2,269,022✔
4353

4354
  getBCacheKey(pFD->fid, pFD->cid, pgno, key, &keyLen);
2,269,022✔
4355
  *handle = taosLRUCacheLookup(pCache, key, keyLen);
2,269,022✔
4356

4357
  return code;
2,269,022✔
4358
}
4359

4360
void tsdbCacheSetPageSs(SLRUCache *pCache, STsdbFD *pFD, int64_t pgno, uint8_t *pPage) {
10,491,026✔
4361
  if (!tsSsEnabled) {
10,491,026✔
4362
    return;
×
4363
  }
4364

4365
  char       key[128] = {0};
10,491,026✔
4366
  int        keyLen = 0;
10,491,026✔
4367
  LRUHandle *handle = NULL;
10,491,026✔
4368

4369
  getBCacheKey(pFD->fid, pFD->cid, pgno, key, &keyLen);
10,491,026✔
4370
  (void)taosThreadMutexLock(&pFD->pTsdb->pgMutex);
10,491,026✔
4371
  handle = taosLRUCacheLookup(pFD->pTsdb->pgCache, key, keyLen);
10,491,026✔
4372
  if (!handle) {
10,491,026✔
4373
    size_t              charge = pFD->szPage;
10,491,026✔
4374
    _taos_lru_deleter_t deleter = deleteBCache;
10,491,026✔
4375
    uint8_t            *pPg = taosMemoryMalloc(charge);
10,491,026✔
4376
    if (!pPg) {
10,491,026✔
4377
      (void)taosThreadMutexUnlock(&pFD->pTsdb->pgMutex);
×
4378

4379
      return;  // ignore error with ss cache and leave error untouched
×
4380
    }
4381
    memcpy(pPg, pPage, charge);
10,491,026✔
4382

4383
    LRUStatus status =
4384
        taosLRUCacheInsert(pCache, key, keyLen, pPg, charge, deleter, NULL, &handle, TAOS_LRU_PRIORITY_LOW, NULL);
10,491,026✔
4385
    if (status != TAOS_LRU_STATUS_OK) {
4386
      // ignore cache updating if not ok
4387
      // code = TSDB_CODE_OUT_OF_MEMORY;
4388
    }
4389
  }
4390
  (void)taosThreadMutexUnlock(&pFD->pTsdb->pgMutex);
10,491,026✔
4391

4392
  tsdbCacheRelease(pFD->pTsdb->pgCache, handle);
10,491,026✔
4393
}
4394
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc