• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4506

15 Jul 2025 12:33AM UTC coverage: 62.026% (-0.7%) from 62.706%
#4506

push

travis-ci

web-flow
docs: update stream docs (#31874)

155391 of 320094 branches covered (48.55%)

Branch coverage included in aggregate %.

240721 of 318525 relevant lines covered (75.57%)

6529048.03 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

59.87
/source/dnode/vnode/src/tsdb/tsdbCache.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#include "functionMgt.h"
16
#include "tss.h"
17
#include "tsdb.h"
18
#include "tsdbDataFileRW.h"
19
#include "tsdbIter.h"
20
#include "tsdbReadUtil.h"
21
#include "vnd.h"
22

23
#define ROCKS_BATCH_SIZE (4096)
24

25
/*
 * Release `handle` on `cache` through taosLRUCacheRelease(), forwarding `eraseIfLastRef`.
 * A trace message is emitted when the underlying call returns false.
 */
void tsdbLRUCacheRelease(SLRUCache *cache, LRUHandle *handle, bool eraseIfLastRef) {
  bool released = taosLRUCacheRelease(cache, handle, eraseIfLastRef);
  if (released) {
    return;
  }

  tsdbTrace(" release lru cache failed");
}
123,045✔
30

31
#ifdef USE_SHARED_STORAGE
32

33
/*
 * Create the block LRU cache and its mutex and attach them to `pTsdb`.
 *
 * Capacity is tsSsBlockCacheSize * block size * page size, where the block size is
 * tsSsBlockSize floored at 1024 and the page size comes from the vnode config.
 *
 * Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY when the cache cannot be created.
 */
static int32_t tsdbOpenBCache(STsdb *pTsdb) {
  int32_t code = 0, lino = 0;
  int32_t pageSize = pTsdb->pVnode->config.tsdbPageSize;
  int64_t blockSize = tsSsBlockSize;
  if (blockSize <= 1024) {
    blockSize = 1024;
  }

  SLRUCache *pLru = taosLRUCacheInit((int64_t)tsSsBlockCacheSize * blockSize * pageSize, 0, .5);
  if (NULL == pLru) {
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err);
  }

  taosLRUCacheSetStrictCapacity(pLru, false);
  (void)taosThreadMutexInit(&pTsdb->bMutex, NULL);
  pTsdb->bCache = pLru;

_err:
  if (code != 0) {
    tsdbError("tsdb/bcache: vgId:%d, %s failed at line %d since %s.", TD_VID(pTsdb->pVnode), __func__, lino,
              tstrerror(code));
  }

  TAOS_RETURN(code);
}
57

58
/*
 * Tear down the block cache created by tsdbOpenBCache(): erase unreferenced entries
 * (tracing the element count before and after), clean up the cache, then destroy bMutex.
 * Does nothing when no block cache is attached.
 */
static void tsdbCloseBCache(STsdb *pTsdb) {
  SLRUCache *pLru = pTsdb->bCache;
  if (NULL == pLru) {
    return;
  }

  int32_t nElems = taosLRUCacheGetElems(pLru);
  tsdbTrace("vgId:%d, elems: %d", TD_VID(pTsdb->pVnode), nElems);

  taosLRUCacheEraseUnrefEntries(pLru);

  nElems = taosLRUCacheGetElems(pLru);
  tsdbTrace("vgId:%d, elems: %d", TD_VID(pTsdb->pVnode), nElems);

  taosLRUCacheCleanup(pLru);

  (void)taosThreadMutexDestroy(&pTsdb->bMutex);
}
×
72

73
/*
 * Create the page LRU cache and its mutex and attach them to `pTsdb`.
 * Capacity is tsSsPageCacheSize * the vnode's configured tsdb page size.
 *
 * Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY when the cache cannot be created.
 */
static int32_t tsdbOpenPgCache(STsdb *pTsdb) {
  int32_t code = 0, lino = 0;
  int32_t pageSize = pTsdb->pVnode->config.tsdbPageSize;

  SLRUCache *pLru = taosLRUCacheInit((int64_t)tsSsPageCacheSize * pageSize, 0, .5);
  if (NULL == pLru) {
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err);
  }

  taosLRUCacheSetStrictCapacity(pLru, false);
  (void)taosThreadMutexInit(&pTsdb->pgMutex, NULL);
  pTsdb->pgCache = pLru;

_err:
  if (code != 0) {
    tsdbError("tsdb/pgcache: vgId:%d, open failed at line %d since %s.", TD_VID(pTsdb->pVnode), lino, tstrerror(code));
  }

  TAOS_RETURN(code);
}
95

96
/*
 * Tear down the page cache created by tsdbOpenPgCache(): erase unreferenced entries
 * (tracing the element count before and after), clean up the cache, then destroy the
 * mutex that belongs to it.
 * Does nothing when no page cache is attached.
 */
static void tsdbClosePgCache(STsdb *pTsdb) {
  SLRUCache *pCache = pTsdb->pgCache;
  if (pCache) {
    int32_t elems = taosLRUCacheGetElems(pCache);
    tsdbTrace("vgId:%d, elems: %d", TD_VID(pTsdb->pVnode), elems);
    taosLRUCacheEraseUnrefEntries(pCache);
    elems = taosLRUCacheGetElems(pCache);
    tsdbTrace("vgId:%d, elems: %d", TD_VID(pTsdb->pVnode), elems);

    taosLRUCacheCleanup(pCache);

    // tsdbOpenPgCache() initialises pgMutex; bMutex is owned by the block cache and is
    // destroyed by tsdbCloseBCache(). Destroying bMutex here would leak pgMutex and
    // destroy bMutex twice.
    (void)taosThreadMutexDestroy(&pTsdb->pgMutex);
  }
}
×
110

111
#endif // USE_SHARED_STORAGE
112

113
#define ROCKS_KEY_LEN (sizeof(tb_uid_t) + sizeof(int16_t) + sizeof(int8_t))
114

115
enum {
116
  LFLAG_LAST_ROW = 0,
117
  LFLAG_LAST = 1,
118
};
119

120
typedef struct {
121
  tb_uid_t uid;
122
  int16_t  cid;
123
  int8_t   lflag;
124
} SLastKey;
125

126
#define IS_LAST_ROW_KEY(k) (((k).lflag & LFLAG_LAST) == LFLAG_LAST_ROW)
127
#define IS_LAST_KEY(k)     (((k).lflag & LFLAG_LAST) == LFLAG_LAST)
128

129
/*
 * Write the RocksDB cache directory path for `pTsdb` into `path`:
 * the vnode primary path followed by "<sep><tsdb name><sep>cache.rdb".
 * `path` is treated as a buffer of TSDB_FILENAME_LEN bytes.
 */
static void tsdbGetRocksPath(STsdb *pTsdb, char *path) {
  vnodeGetPrimaryPath(pTsdb->pVnode, false, path, TSDB_FILENAME_LEN);

  int32_t used = strlen(path);
  char   *tail = path + used;
  snprintf(tail, TSDB_FILENAME_LEN - used - 1, "%s%s%scache.rdb", TD_DIRSEP, pTsdb->name, TD_DIRSEP);
}
14,568✔
136

137
/* Comparator name callback handed to rocksdb_comparator_create(); always "myCmp". */
static const char *myCmpName(void *state) {
  static const char cmpName[] = "myCmp";

  (void)state;
  return cmpName;
}
141

142
/* Comparator destructor callback; the comparator carries no state, so there is nothing to free. */
static void myCmpDestroy(void *state) {
  (void)state;
}
14,585✔
143

144
static int myCmp(void *state, const char *a, size_t alen, const char *b, size_t blen) {
8,311,884✔
145
  (void)state;
146
  (void)alen;
147
  (void)blen;
148
  SLastKey *lhs = (SLastKey *)a;
8,311,884✔
149
  SLastKey *rhs = (SLastKey *)b;
8,311,884✔
150

151
  if (lhs->uid < rhs->uid) {
8,311,884✔
152
    return -1;
5,452,237✔
153
  } else if (lhs->uid > rhs->uid) {
2,859,647✔
154
    return 1;
430,322✔
155
  }
156

157
  if (lhs->cid < rhs->cid) {
2,429,325✔
158
    return -1;
1,629,913✔
159
  } else if (lhs->cid > rhs->cid) {
799,412✔
160
    return 1;
276,612✔
161
  }
162

163
  if ((lhs->lflag & LFLAG_LAST) < (rhs->lflag & LFLAG_LAST)) {
522,800✔
164
    return -1;
419,326✔
165
  } else if ((lhs->lflag & LFLAG_LAST) > (rhs->lflag & LFLAG_LAST)) {
103,474!
166
    return 1;
239,148✔
167
  }
168

169
  return 0;
×
170
}
171

172
/*
 * Open the per-tsdb RocksDB instance that backs the last/last_row cache and create
 * every option object, the write batch, the write-batch mutex and the update-context
 * array that go with it.
 *
 * On success all handles are published into pTsdb->rCache and 0 is returned.
 * On failure every object created so far is released, pTsdb->rCache is left untouched
 * (nothing is published until all allocations have succeeded), and an error code is
 * returned.
 *
 * Without USE_ROCKSDB this is a no-op that returns 0.
 */
static int32_t tsdbOpenRocksCache(STsdb *pTsdb) {
  int32_t code = 0, lino = 0;
#ifdef USE_ROCKSDB
  rocksdb_block_based_table_options_t *tableoptions = NULL;
  rocksdb_options_t                   *options = NULL;
  rocksdb_writeoptions_t              *writeoptions = NULL;
  rocksdb_readoptions_t               *readoptions = NULL;
  rocksdb_flushoptions_t              *flushoptions = NULL;
  rocksdb_writebatch_t                *writebatch = NULL;
  rocksdb_t                           *db = NULL;
  SArray                              *ctxArray = NULL;
  char                                *err = NULL;
  char                                 cachePath[TSDB_FILENAME_LEN] = {0};

  rocksdb_comparator_t *cmp = rocksdb_comparator_create(NULL, myCmpDestroy, myCmp, myCmpName);
  if (NULL == cmp) {
    TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
  }

  tableoptions = rocksdb_block_based_options_create();
  if (NULL == tableoptions) {
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err);
  }

  options = rocksdb_options_create();
  if (NULL == options) {
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err1);
  }

  rocksdb_options_set_create_if_missing(options, 1);
  rocksdb_options_set_comparator(options, cmp);
  rocksdb_options_set_block_based_table_factory(options, tableoptions);
  rocksdb_options_set_info_log_level(options, 2);  // WARN_LEVEL

  writeoptions = rocksdb_writeoptions_create();
  if (NULL == writeoptions) {
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err2);
  }
  rocksdb_writeoptions_disable_WAL(writeoptions, 1);

  readoptions = rocksdb_readoptions_create();
  if (NULL == readoptions) {
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err3);
  }

  tsdbGetRocksPath(pTsdb, cachePath);

  db = rocksdb_open(options, cachePath, &err);
  if (NULL == db) {
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err);
    rocksdb_free(err);

    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err4);
  }

  flushoptions = rocksdb_flushoptions_create();
  if (NULL == flushoptions) {
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err5);
  }

  writebatch = rocksdb_writebatch_create();
  if (NULL == writebatch) {
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err6);
  }

  TAOS_CHECK_GOTO(taosThreadMutexInit(&pTsdb->rCache.writeBatchMutex, NULL), &lino, _err7);

  ctxArray = taosArrayInit(16, sizeof(SLastUpdateCtx));
  if (!ctxArray) {
    TAOS_CHECK_GOTO(terrno, &lino, _err8);
  }

  // Everything succeeded: publish the handles. Nothing is stored in pTsdb->rCache
  // before this point, so a failed open never leaves dangling pointers behind.
  pTsdb->rCache.tableoptions = tableoptions;
  pTsdb->rCache.writebatch = writebatch;
  pTsdb->rCache.my_comparator = cmp;
  pTsdb->rCache.options = options;
  pTsdb->rCache.writeoptions = writeoptions;
  pTsdb->rCache.readoptions = readoptions;
  pTsdb->rCache.flushoptions = flushoptions;
  pTsdb->rCache.db = db;
  pTsdb->rCache.sver = -1;
  pTsdb->rCache.suid = -1;
  pTsdb->rCache.uid = -1;
  pTsdb->rCache.pTSchema = NULL;
  pTsdb->rCache.ctxArray = ctxArray;

  TAOS_RETURN(code);

  // Unwind in reverse order of creation; each label releases exactly one resource.
_err8:
  (void)taosThreadMutexDestroy(&pTsdb->rCache.writeBatchMutex);
_err7:
  rocksdb_writebatch_destroy(writebatch);
_err6:
  rocksdb_flushoptions_destroy(flushoptions);
_err5:
  // Close the local handle: pTsdb->rCache.db has not been assigned on any error path.
  rocksdb_close(db);
_err4:
  rocksdb_readoptions_destroy(readoptions);
_err3:
  rocksdb_writeoptions_destroy(writeoptions);
_err2:
  rocksdb_options_destroy(options);
_err1:
  rocksdb_block_based_options_destroy(tableoptions);
_err:
  rocksdb_comparator_destroy(cmp);
#endif
  TAOS_RETURN(code);
}
263

264
/*
 * Close the RocksDB instance held in pTsdb->rCache and release the mutex, option
 * objects, write batch, comparator, cached schema and update-context array stored
 * alongside it. Without USE_ROCKSDB this is a no-op.
 */
static void tsdbCloseRocksCache(STsdb *pTsdb) {
#ifdef USE_ROCKSDB
  SRocksCache *pRCache = &pTsdb->rCache;

  // The database is closed first, then the objects it was opened with.
  rocksdb_close(pRCache->db);
  (void)taosThreadMutexDestroy(&pRCache->writeBatchMutex);

  rocksdb_flushoptions_destroy(pRCache->flushoptions);
  rocksdb_writebatch_destroy(pRCache->writebatch);
  rocksdb_readoptions_destroy(pRCache->readoptions);
  rocksdb_writeoptions_destroy(pRCache->writeoptions);
  rocksdb_options_destroy(pRCache->options);
  rocksdb_block_based_options_destroy(pRCache->tableoptions);
  rocksdb_comparator_destroy(pRCache->my_comparator);

  taosMemoryFree(pRCache->pTSchema);
  taosArrayDestroy(pRCache->ctxArray);
#endif
}
14,598✔
279

280
/*
 * Submit the pending write batch to RocksDB when it holds at least ROCKS_BATCH_SIZE
 * entries, or when `force` is set and it is non-empty. A write error is logged and
 * the batch is cleared either way. Without USE_ROCKSDB this is a no-op.
 */
static void rocksMayWrite(STsdb *pTsdb, bool force) {
#ifdef USE_ROCKSDB
  rocksdb_writebatch_t *pBatch = pTsdb->rCache.writebatch;
  int                   count = rocksdb_writebatch_count(pBatch);

  bool shouldWrite = (force && count > 0) || count >= ROCKS_BATCH_SIZE;
  if (!shouldWrite) {
    return;
  }

  char *err = NULL;
  rocksdb_write(pTsdb->rCache.db, pTsdb->rCache.writeoptions, pBatch, &err);
  if (err != NULL) {
    tsdbError("vgId:%d, %s failed at line %d, count: %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, count,
              err);
    rocksdb_free(err);
  }

  rocksdb_writebatch_clear(pBatch);
#endif
}
134,548✔
299

300
typedef struct {
301
  TSKEY  ts;
302
  int8_t dirty;
303
  struct {
304
    int16_t cid;
305
    int8_t  type;
306
    int8_t  flag;
307
    union {
308
      int64_t val;
309
      struct {
310
        uint32_t nData;
311
        uint8_t *pData;
312
      };
313
    } value;
314
  } colVal;
315
} SLastColV0;
316

317
/*
 * Decode the fixed SLastColV0 header found at `value` into `pLastCol`.
 *
 * numOfPKs is set to 0 and cacheStatus to TSDB_LAST_CACHE_VALID. For variable-length
 * and DECIMAL column types the payload is not copied: pData is pointed at the bytes
 * that immediately follow the header inside `value`.
 *
 * Returns the number of bytes consumed (header plus payload length, if any).
 * No bounds checking is done here; the length of `value` is not known to this function.
 */
static int32_t tsdbCacheDeserializeV0(char const *value, SLastCol *pLastCol) {
  SLastColV0 *pHdr = (SLastColV0 *)value;
  uint8_t    *pPayload = (uint8_t *)(pHdr + 1);

  pLastCol->rowKey.ts = pHdr->ts;
  pLastCol->rowKey.numOfPKs = 0;
  pLastCol->dirty = pHdr->dirty;
  pLastCol->cacheStatus = TSDB_LAST_CACHE_VALID;

  pLastCol->colVal.cid = pHdr->colVal.cid;
  pLastCol->colVal.flag = pHdr->colVal.flag;
  pLastCol->colVal.value.type = pHdr->colVal.type;

  if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) {
    pLastCol->colVal.value.nData = pHdr->colVal.value.nData;
    // An empty variable-length value carries no payload pointer.
    pLastCol->colVal.value.pData = (pLastCol->colVal.value.nData > 0) ? pPayload : NULL;
    return sizeof(SLastColV0) + pHdr->colVal.value.nData;
  }

  if (pLastCol->colVal.value.type == TSDB_DATA_TYPE_DECIMAL) {
    pLastCol->colVal.value.nData = pHdr->colVal.value.nData;
    pLastCol->colVal.value.pData = pPayload;
    return sizeof(SLastColV0) + pHdr->colVal.value.nData;
  }

  pLastCol->colVal.value.val = pHdr->colVal.value.val;
  return sizeof(SLastColV0);
}
345

346
/*
 * Decode a serialized last-column record of `size` bytes at `value` into a newly
 * allocated SLastCol returned through `ppLastCol`.
 *
 * Layout: SLastColV0 header [+ column payload] [version, numOfPKs, pk entries...,
 * cacheStatus]. A record that ends right after the V0 part is accepted as version 0.
 * Bytes after the last decoded field are ignored.
 *
 * Variable-length column/pk data is NOT copied: the pData pointers in the result point
 * into `value`, so the result must not outlive `value` unless the data is reallocated.
 *
 * Every read is bounds-checked against `size` and numOfPKs is checked against the
 * capacity of rowKey.pks before the pk array is filled.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA when `value` is NULL, terrno on
 * allocation failure, or TSDB_CODE_INVALID_DATA_FMT for a truncated/corrupt record.
 * `*ppLastCol` is only written on success.
 */
static int32_t tsdbCacheDeserialize(char const *value, size_t size, SLastCol **ppLastCol) {
  if (!value) {
    return TSDB_CODE_INVALID_PARA;
  }

  // The V0 decoder reads a full header unconditionally; make sure it is there.
  if (size < sizeof(SLastColV0)) {
    TAOS_RETURN(TSDB_CODE_INVALID_DATA_FMT);
  }

  SLastCol *pLastCol = taosMemoryCalloc(1, sizeof(SLastCol));
  if (NULL == pLastCol) {
    return terrno;
  }

  int32_t v0Len = tsdbCacheDeserializeV0(value, pLastCol);
  if (v0Len < 0 || (size_t)v0Len > size) {
    goto _invalid;
  }

  size_t offset = (size_t)v0Len;
  if (offset == size) {
    // version 0
    *ppLastCol = pLastCol;

    TAOS_RETURN(TSDB_CODE_SUCCESS);
  }

  // version + numOfPKs
  if (size - offset < sizeof(int8_t) + sizeof(uint8_t)) {
    goto _invalid;
  }

  int8_t version = *(int8_t *)(value + offset);
  offset += sizeof(int8_t);

  uint8_t numOfPKs = *(uint8_t *)(value + offset);
  offset += sizeof(uint8_t);

  // rowKey.pks is a fixed-size array inside SLastCol; never index past it.
  if (numOfPKs > sizeof(pLastCol->rowKey.pks) / sizeof(pLastCol->rowKey.pks[0])) {
    goto _invalid;
  }
  pLastCol->rowKey.numOfPKs = numOfPKs;

  // pks
  for (int32_t i = 0; i < numOfPKs; i++) {
    if (size - offset < sizeof(SValue)) {
      goto _invalid;
    }

    // memcpy rather than a struct load: the record is byte-packed, so the entry
    // is not necessarily aligned for SValue.
    (void)memcpy(&pLastCol->rowKey.pks[i], value + offset, sizeof(SValue));
    offset += sizeof(SValue);

    if (IS_VAR_DATA_TYPE(pLastCol->rowKey.pks[i].type)) {
      pLastCol->rowKey.pks[i].pData = NULL;
      if (pLastCol->rowKey.pks[i].nData > 0) {
        if (size - offset < pLastCol->rowKey.pks[i].nData) {
          goto _invalid;
        }
        pLastCol->rowKey.pks[i].pData = (uint8_t *)value + offset;
        offset += pLastCol->rowKey.pks[i].nData;
      }
    }
  }

  if (version >= LAST_COL_VERSION_2) {
    if (offset >= size) {
      goto _invalid;
    }
    pLastCol->cacheStatus = *(uint8_t *)(value + offset);
  }

  *ppLastCol = pLastCol;

  TAOS_RETURN(TSDB_CODE_SUCCESS);

_invalid:
  taosMemoryFreeClear(pLastCol);

  TAOS_RETURN(TSDB_CODE_INVALID_DATA_FMT);
}
404

405
/*
406
typedef struct {
407
  SLastColV0 lastColV0;
408
  char       colData[];
409
  int8_t     version;
410
  uint8_t    numOfPKs;
411
  SValue     pks[0];
412
  char       pk0Data[];
413
  SValue     pks[1];
414
  char       pk1Data[];
415
  ...
416
} SLastColDisk;
417
*/
418
/*
 * Write the SLastColV0 header for `pLastCol` at `value`, followed by the column
 * payload when the column type is variable-length or DECIMAL.
 *
 * The destination buffer is assumed to be large enough; it is sized by the caller.
 * Returns the number of bytes written (header plus payload length, if any).
 */
static int32_t tsdbCacheSerializeV0(char const *value, SLastCol *pLastCol) {
  SLastColV0 *pHdr = (SLastColV0 *)value;

  pHdr->ts = pLastCol->rowKey.ts;
  pHdr->dirty = pLastCol->dirty;
  pHdr->colVal.cid = pLastCol->colVal.cid;
  pHdr->colVal.flag = pLastCol->colVal.flag;
  pHdr->colVal.type = pLastCol->colVal.value.type;

  bool hasPayload =
      IS_VAR_DATA_TYPE(pLastCol->colVal.value.type) || pLastCol->colVal.value.type == TSDB_DATA_TYPE_DECIMAL;
  if (!hasPayload) {
    pHdr->colVal.value.val = pLastCol->colVal.value.val;
    return sizeof(SLastColV0);
  }

  // Variable-length and DECIMAL values are stored as a length plus trailing bytes.
  pHdr->colVal.value.nData = pLastCol->colVal.value.nData;
  if (pLastCol->colVal.value.nData > 0) {
    memcpy(pHdr + 1, pLastCol->colVal.value.pData, pLastCol->colVal.value.nData);
  }
  return sizeof(SLastColV0) + pLastCol->colVal.value.nData;
}
445

446
/*
 * Serialize `pLastCol` into a freshly allocated buffer.
 *
 * Layout: SLastColV0 part, version byte (LAST_COL_VERSION), numOfPKs byte, one SValue
 * per primary key (each followed by its bytes when variable-length), cacheStatus byte.
 *
 * `*size` receives the allocated size and `*value` the buffer, which the caller owns.
 * Returns TSDB_CODE_SUCCESS, or terrno when the allocation fails (`*value` is NULL then).
 */
static int32_t tsdbCacheSerialize(SLastCol *pLastCol, char **value, size_t *size) {
  // First pass: compute the buffer size.
  *size = sizeof(SLastColV0);
  if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) {
    *size += pLastCol->colVal.value.nData;
  }
  if (pLastCol->colVal.value.type == TSDB_DATA_TYPE_DECIMAL) {
    *size += DECIMAL128_BYTES;
  }
  *size += sizeof(uint8_t) + sizeof(uint8_t) + sizeof(uint8_t);  // version + numOfPKs + cacheStatus

  for (int8_t iPk = 0; iPk < pLastCol->rowKey.numOfPKs; iPk++) {
    SValue *pPk = &pLastCol->rowKey.pks[iPk];
    *size += sizeof(SValue);
    if (IS_VAR_DATA_TYPE(pPk->type)) {
      *size += pPk->nData;
    }
  }

  *value = taosMemoryMalloc(*size);
  if (NULL == *value) {
    TAOS_RETURN(terrno);
  }

  // Second pass: fill the buffer, tracking the write position.
  char   *buf = *value;
  int32_t pos = tsdbCacheSerializeV0(buf, pLastCol);

  // version
  ((uint8_t *)(buf + pos))[0] = LAST_COL_VERSION;
  pos++;

  // numOfPKs
  ((uint8_t *)(buf + pos))[0] = pLastCol->rowKey.numOfPKs;
  pos++;

  // pks
  for (int8_t iPk = 0; iPk < pLastCol->rowKey.numOfPKs; iPk++) {
    SValue *pPk = &pLastCol->rowKey.pks[iPk];

    ((SValue *)(buf + pos))[0] = *pPk;
    pos += sizeof(SValue);

    if (IS_VAR_DATA_TYPE(pPk->type)) {
      if (pPk->nData > 0) {
        memcpy(buf + pos, pPk->pData, pPk->nData);
      }
      pos += pPk->nData;
    }
  }

  ((uint8_t *)(buf + pos))[0] = pLastCol->cacheStatus;

  TAOS_RETURN(TSDB_CODE_SUCCESS);
}
494

495
static int32_t tsdbCachePutToRocksdb(STsdb *pTsdb, SLastKey *pLastKey, SLastCol *pLastCol);
496

497
int tsdbCacheFlushDirty(const void *key, size_t klen, void *value, void *ud) {
182,365✔
498
  SLastCol *pLastCol = (SLastCol *)value;
182,365✔
499

500
  if (pLastCol->dirty) {
182,365✔
501
    STsdb *pTsdb = (STsdb *)ud;
108,138✔
502

503
    int32_t code = tsdbCachePutToRocksdb(pTsdb, (SLastKey *)key, pLastCol);
108,138✔
504
    if (code) {
108,553!
505
      tsdbError("tsdb/cache: vgId:%d, flush dirty lru failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code));
×
506
      return code;
×
507
    }
508

509
    pLastCol->dirty = 0;
108,553✔
510

511
    rocksMayWrite(pTsdb, false);
108,553✔
512
  }
513

514
  return 0;
182,426✔
515
}
516

517
/*
 * Decide whether `key` is covered by the delete skyline.
 *
 * `*iSkyline` is a cursor into `pSkyline`; it indexes the upper end of the segment
 * being examined and is decremented as the search moves to earlier segments.
 * It is never moved below 1, and the function returns false immediately when it
 * is 0 or negative on entry.
 *
 * Within the segment [front, back] the key counts as deleted when its version is not
 * newer than the front point's version, or when it sits exactly on the back point's
 * timestamp with a version not newer than the back point's.
 */
static bool tsdbKeyDeleted(TSDBKEY *key, SArray *pSkyline, int64_t *iSkyline) {
  while (*iSkyline > 0) {
    TSDBKEY *pBack = (TSDBKEY *)taosArrayGet(pSkyline, *iSkyline);
    TSDBKEY *pFront = (TSDBKEY *)taosArrayGet(pSkyline, *iSkyline - 1);

    // Key is later than the current segment: nothing here covers it.
    if (key->ts > pBack->ts) {
      return false;
    }

    bool inSegment = (key->ts >= pFront->ts) && (key->ts <= pBack->ts);
    if (inSegment) {
      bool coveredByFront = key->version <= pFront->version;
      bool coveredByBack = (key->ts == pBack->ts) && (key->version <= pBack->version);
      if (coveredByFront || coveredByBack) {
        return true;
      }
    }

    // Not covered by this segment: step to the previous one, if there is one.
    if (*iSkyline <= 1) {
      return false;
    }
    --*iSkyline;
  }

  return false;
}
547

548
// Get next non-deleted row from imem
549
/*
 * Advance `pTbIter` by one row and return that row unless the delete skyline covers it.
 *
 * Returns NULL when the iterator cannot advance or when the row it advanced to is deleted.
 * NOTE(review): a deleted row ends the lookup rather than being skipped; callers that
 * treat NULL as "no more rows" will stop at the first deleted row. Confirm this is intended.
 */
static TSDBROW *tsdbImemGetNextRow(STbDataIter *pTbIter, SArray *pSkyline, int64_t *piSkyline) {
  if (!tsdbTbDataIterNext(pTbIter)) {
    return NULL;
  }

  TSDBROW *pRow = tsdbTbDataIterGet(pTbIter);
  TSDBKEY  key = TSDBROW_KEY(pRow);

  return tsdbKeyDeleted(&key, pSkyline, piSkyline) ? NULL : pRow;
}
563

564
// Get first non-deleted row from imem
565
/*
 * Open `pTbIter` on `pIMem` and return the first row the delete skyline does not cover.
 *
 * Returns NULL when the iterator has no current row. When the first row is deleted the
 * lookup is delegated to tsdbImemGetNextRow(). The `imem` argument is not used here.
 */
static TSDBROW *tsdbImemGetFirstRow(SMemTable *imem, STbData *pIMem, STbDataIter *pTbIter, SArray *pSkyline,
                                    int64_t *piSkyline) {
  tsdbTbDataIterOpen(pIMem, NULL, 1, pTbIter);

  TSDBROW *pRow = tsdbTbDataIterGet(pTbIter);
  if (NULL == pRow) {
    return NULL;
  }

  TSDBKEY key = TSDBROW_KEY(pRow);
  if (!tsdbKeyDeleted(&key, pSkyline, piSkyline)) {
    return pRow;
  }

  // First row is deleted: fall back to the next-row lookup.
  return tsdbImemGetNextRow(pTbIter, pSkyline, piSkyline);
}
585

586
/*
 * Invalidate the schema cached in pTsdb->rCache when it belongs to the given table and
 * `sver` is newer than the cached version.
 *
 * For suid > 0 the match is on the cached suid; for suid == 0 it is on the cached uid.
 * Invalidation resets the cached version and the matched id to -1; the schema object
 * itself is left in place. Nothing happens when no schema is cached.
 */
void tsdbCacheInvalidateSchema(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int32_t sver) {
  SRocksCache *pCached = &pTsdb->rCache;

  if (NULL == pCached->pTSchema) {
    return;
  }
  if (sver <= pCached->sver) {
    return;
  }

  if (suid > 0) {
    if (suid == pCached->suid) {
      pCached->sver = -1;
      pCached->suid = -1;
    }
  } else if (suid == 0) {
    if (uid == pCached->uid) {
      pCached->sver = -1;
      pCached->uid = -1;
    }
  }
}
599

600
/*
 * Make pTsdb->rCache.pTSchema hold the schema for (suid, uid, sver).
 *
 * The cached schema is reused when its version matches and either suid (> 0) or,
 * for suid == 0, uid matches. Otherwise the cached schema is destroyed and reloaded
 * through metaGetTbTSchemaEx().
 *
 * Returns 0 on success or the error from metaGetTbTSchemaEx(). On failure the cache
 * identity is reset to -1 so that a later call with the same arguments cannot hit a
 * cache entry whose schema was already destroyed.
 */
static int32_t tsdbUpdateSkm(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int32_t sver) {
  SRocksCache *pRCache = &pTsdb->rCache;
  if (pRCache->pTSchema && sver == pRCache->sver) {
    if (suid > 0 && suid == pRCache->suid) {
      return 0;
    }
    if (suid == 0 && uid == pRCache->uid) {
      return 0;
    }
  }

  // Drop the stale schema and clear the pointer right away: if the reload fails, the
  // cache must not keep a pointer to freed memory (it is freed again on close and
  // dereferenced on the next cache hit).
  tDestroyTSchema(pRCache->pTSchema);
  pRCache->pTSchema = NULL;

  int32_t code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pRCache->pTSchema);
  if (code != 0) {
    pRCache->suid = -1;
    pRCache->uid = -1;
    pRCache->sver = -1;
    return code;
  }

  pRCache->suid = suid;
  pRCache->uid = uid;
  pRCache->sver = sver;
  return 0;
}
617

618
static int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SArray *updCtxArray);
619

620
/*
 * Collect last_row / last column updates for table (suid, uid) from `imem` and apply
 * them to the cache through tsdbCacheUpdate().
 *
 * Steps:
 *   1. Load the table's in-memory tombstones and, if there are any, build a delete skyline.
 *   2. Take the first non-deleted row: every column yields a LFLAG_LAST_ROW update; columns
 *      holding a value also yield a LFLAG_LAST update, the others are remembered by cid.
 *   3. Walk further rows while remembered cids remain; the first value seen for a
 *      remembered cid yields its LFLAG_LAST update and the cid is forgotten.
 *   4. Hand the collected updates (pTsdb->rCache.ctxArray) to tsdbCacheUpdate().
 *
 * ctxArray is cleared before returning, on success and on failure alike.
 * Returns 0 on success or the first error encountered.
 */
int32_t tsdbLoadFromImem(SMemTable *imem, int64_t suid, int64_t uid) {
  int32_t      code = 0;
  int32_t      lino = 0;
  STsdb       *pTsdb = imem->pTsdb;
  SArray      *ctxArray = pTsdb->rCache.ctxArray;
  SArray      *pDelList = NULL;
  SArray      *pSkyline = NULL;
  int64_t      skyIdx = 0;
  SSHashObj   *pPendingCids = NULL;  // cids whose "last" value has not been found yet
  STSchema    *pSchema = NULL;
  TSDBROW     *pRow = NULL;
  STbDataIter  tbIter = {0};
  STSDBRowIter rowIter = {0};
  STsdbRowKey  firstKey = {0};
  int32_t      sver;
  int32_t      nCol;

  STbData *pTbData = tsdbGetTbDataFromMemTable(imem, suid, uid);

  // load imem tomb data and build skyline
  TAOS_CHECK_GOTO(loadMemTombData(&pDelList, NULL, pTbData, INT64_MAX), &lino, _exit);

  size_t nDel = TARRAY_SIZE(pDelList);
  if (nDel > 0) {
    pSkyline = taosArrayInit(32, sizeof(TSDBKEY));
    if (!pSkyline) {
      TAOS_CHECK_EXIT(terrno);
    }

    TAOS_CHECK_EXIT(tsdbBuildDeleteSkyline(pDelList, 0, (int32_t)(nDel - 1), pSkyline));
    skyIdx = taosArrayGetSize(pSkyline) - 1;
  }

  pRow = tsdbImemGetFirstRow(imem, pTbData, &tbIter, pSkyline, &skyIdx);
  if (!pRow) {
    goto _exit;
  }

  // first row: last_row for every column, last for columns that hold a value
  sver = TSDBROW_SVERSION(pRow);
  TAOS_CHECK_GOTO(tsdbUpdateSkm(pTsdb, suid, uid, sver), &lino, _exit);
  pSchema = pTsdb->rCache.pTSchema;
  nCol = pSchema->numOfCols;

  tsdbRowGetKey(pRow, &firstKey);

  TAOS_CHECK_EXIT(tsdbRowIterOpen(&rowIter, pRow, pSchema));

  int32_t iFirstCol = 0;
  for (SColVal *pColVal = tsdbRowIterNext(&rowIter); pColVal && iFirstCol < nCol;
       pColVal = tsdbRowIterNext(&rowIter), iFirstCol++) {
    SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST_ROW, .tsdbRowKey = firstKey, .colVal = *pColVal};
    if (!taosArrayPush(ctxArray, &updateCtx)) {
      TAOS_CHECK_EXIT(terrno);
    }

    if (COL_VAL_IS_VALUE(pColVal)) {
      updateCtx.lflag = LFLAG_LAST;
      if (!taosArrayPush(ctxArray, &updateCtx)) {
        TAOS_CHECK_EXIT(terrno);
      }
    } else {
      // no value in the first row: remember the cid and look for it in later rows
      if (!pPendingCids) {
        pPendingCids = tSimpleHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT));
        if (pPendingCids == NULL) {
          TAOS_CHECK_EXIT(terrno);
        }
      }

      if (tSimpleHashPut(pPendingCids, &pColVal->cid, sizeof(pColVal->cid), &pColVal->cid, sizeof(pColVal->cid))) {
        TAOS_CHECK_EXIT(terrno);
      }
    }
  }
  tsdbRowClose(&rowIter);

  // continue to get next row to fill null last col values
  for (pRow = tsdbImemGetNextRow(&tbIter, pSkyline, &skyIdx); pRow && tSimpleHashGetSize(pPendingCids) != 0;
       pRow = tsdbImemGetNextRow(&tbIter, pSkyline, &skyIdx)) {
    sver = TSDBROW_SVERSION(pRow);
    TAOS_CHECK_EXIT(tsdbUpdateSkm(pTsdb, suid, uid, sver));
    pSchema = pTsdb->rCache.pTSchema;

    STsdbRowKey rowKey = {0};
    tsdbRowGetKey(pRow, &rowKey);

    TAOS_CHECK_EXIT(tsdbRowIterOpen(&rowIter, pRow, pSchema));

    // nCol still holds the column count taken from the first row's schema
    int32_t iCol = 0;
    for (SColVal *pColVal = tsdbRowIterNext(&rowIter); pColVal && iCol < nCol;
         pColVal = tsdbRowIterNext(&rowIter), iCol++) {
      if (tSimpleHashGet(pPendingCids, &pColVal->cid, sizeof(pColVal->cid)) && COL_VAL_IS_VALUE(pColVal)) {
        SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST, .tsdbRowKey = rowKey, .colVal = *pColVal};
        if (!taosArrayPush(ctxArray, &updateCtx)) {
          TAOS_CHECK_EXIT(terrno);
        }

        TAOS_CHECK_EXIT(tSimpleHashRemove(pPendingCids, &pColVal->cid, sizeof(pColVal->cid)));
      }
    }
    tsdbRowClose(&rowIter);
  }

  TAOS_CHECK_GOTO(tsdbCacheUpdate(pTsdb, suid, uid, ctxArray), &lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(pTsdb->pVnode), __func__, __FILE__, lino, tstrerror(code));

    // an error may have left the row iterator open
    tsdbRowClose(&rowIter);
  }

  taosArrayClear(ctxArray);
  // destroy any allocated resource
  tSimpleHashCleanup(pPendingCids);
  if (pDelList) {
    taosArrayDestroy(pDelList);
  }
  if (pSkyline) {
    taosArrayDestroy(pSkyline);
  }

  TAOS_RETURN(code);
}
749

750
/*
 * Push the contents of pTsdb->imem into the cache by running tsdbLoadFromImem() through
 * tsdbMemTableSaveToCache(). Returns 0 without doing anything when there is no tsdb, no
 * imem, or the imem reports zero rows or zero tables.
 *
 * Logs the row/delete counts on success and the failure location on error.
 */
static int32_t tsdbCacheUpdateFromIMem(STsdb *pTsdb) {
  if (!pTsdb || !pTsdb->imem) {
    return 0;
  }

  int32_t    code = 0;
  int32_t    lino = 0;
  SMemTable *pMem = pTsdb->imem;
  int64_t    nRow = pMem->nRow;
  int64_t    nDel = pMem->nDel;

  if (nRow == 0 || pMem->nTbData == 0) {
    return 0;
  }

  TAOS_CHECK_EXIT(tsdbMemTableSaveToCache(pMem, tsdbLoadFromImem));

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(pTsdb->pVnode), __func__, __FILE__, lino, tstrerror(code));
  } else {
    tsdbInfo("vgId:%d %s done, nRow:%" PRId64 " nDel:%" PRId64, TD_VID(pTsdb->pVnode), __func__, nRow, nDel);
  }

  TAOS_RETURN(code);
}
774

775
/*
 * Persist the cache state for `pTsdb`.
 *
 * When tsUpdateCacheBatch is set, the imem is first folded into the cache; a failure
 * there aborts the commit. Then, under lruMutex, every dirty LRU entry is queued via
 * tsdbCacheFlushDirty(), the pending batch is force-written and RocksDB is flushed.
 *
 * Returns 0 on success, the imem update error, or TSDB_CODE_FAILED when the RocksDB
 * flush reports an error.
 */
int32_t tsdbCacheCommit(STsdb *pTsdb) {
  int32_t code = 0;

  if (tsUpdateCacheBatch) {
    code = tsdbCacheUpdateFromIMem(pTsdb);
    if (code != 0) {
      tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));

      TAOS_RETURN(code);
    }
  }

  char      *flushErr = NULL;
  SLRUCache *pLru = pTsdb->lruCache;

  (void)taosThreadMutexLock(&pTsdb->lruMutex);

  // flush dirty data of lru into rocks
  taosLRUCacheApply(pLru, tsdbCacheFlushDirty, pTsdb);

#ifdef USE_ROCKSDB
  rocksMayWrite(pTsdb, true);
  rocksdb_flush(pTsdb->rCache.db, pTsdb->rCache.flushoptions, &flushErr);
#endif

  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);

#ifdef USE_ROCKSDB
  // the flush error is reported after the lock has been released
  if (flushErr != NULL) {
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, flushErr);
    rocksdb_free(flushErr);
    code = TSDB_CODE_FAILED;
  }
#endif

  TAOS_RETURN(code);
}
814

815
/*
 * Give `pValue` its own copy of its variable-length data.
 *
 * For a variable-length type with nData > 0, pData is replaced by a newly allocated
 * copy of the bytes it pointed to; with nData == 0, pData is set to NULL.
 * Other types are left untouched.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when the allocation fails (pData unchanged).
 */
static int32_t reallocVarDataVal(SValue *pValue) {
  if (!IS_VAR_DATA_TYPE(pValue->type)) {
    TAOS_RETURN(TSDB_CODE_SUCCESS);
  }

  uint8_t *pSrc = pValue->pData;
  uint32_t len = pValue->nData;

  if (len == 0) {
    pValue->pData = NULL;
    TAOS_RETURN(TSDB_CODE_SUCCESS);
  }

  uint8_t *pCopy = taosMemoryMalloc(len);
  if (!pCopy) {
    TAOS_RETURN(terrno);
  }

  (void)memcpy(pCopy, pSrc, len);
  pValue->pData = pCopy;

  TAOS_RETURN(TSDB_CODE_SUCCESS);
}
833

834
/* Column-value wrapper around reallocVarDataVal(): operates on pColVal->value. */
static int32_t reallocVarData(SColVal *pColVal) {
  SValue *pValue = &pColVal->value;
  return reallocVarDataVal(pValue);
}
17,300✔
835

836
// realloc pk data and col data.
837
static int32_t tsdbCacheReallocSLastCol(SLastCol *pCol, size_t *pCharge) {
264,230✔
838
  int32_t code = TSDB_CODE_SUCCESS, lino = 0;
264,230✔
839
  size_t  charge = sizeof(SLastCol);
264,230✔
840

841
  int8_t i = 0;
264,230✔
842
  for (; i < pCol->rowKey.numOfPKs; i++) {
318,020✔
843
    SValue *pValue = &pCol->rowKey.pks[i];
53,805✔
844
    if (IS_VAR_DATA_TYPE(pValue->type)) {
53,805!
845
      TAOS_CHECK_EXIT(reallocVarDataVal(pValue));
17,149!
846
      charge += pValue->nData;
17,134✔
847
    }
848
  }
849

850
  if (IS_VAR_DATA_TYPE(pCol->colVal.value.type)) {
264,215!
851
    TAOS_CHECK_EXIT(reallocVarData(&pCol->colVal));
17,349!
852
    charge += pCol->colVal.value.nData;
17,299✔
853
  }
854

855
  if (pCol->colVal.value.type == TSDB_DATA_TYPE_DECIMAL) {
264,165✔
856
    if (pCol->colVal.value.nData > 0) {
6,144✔
857
      void *p = taosMemoryMalloc(pCol->colVal.value.nData);
3,026!
858
      if (!p) TAOS_CHECK_EXIT(terrno);
3,026!
859
      (void)memcpy(p, pCol->colVal.value.pData, pCol->colVal.value.nData);
3,026✔
860
      pCol->colVal.value.pData = p;
3,026✔
861
    }
862
    charge += pCol->colVal.value.nData;
6,144✔
863
  }
864

865
  if (pCharge) {
264,165✔
866
    *pCharge = charge;
209,405✔
867
  }
868

869
_exit:
54,760✔
870
  if (TSDB_CODE_SUCCESS != code) {
264,165!
871
    for (int8_t j = 0; j < i; j++) {
×
872
      if (IS_VAR_DATA_TYPE(pCol->rowKey.pks[j].type)) {
×
873
        taosMemoryFree(pCol->rowKey.pks[j].pData);
×
874
      }
875
    }
876

877
    (void)memset(pCol, 0, sizeof(SLastCol));
×
878
  }
879

880
  TAOS_RETURN(code);
264,165✔
881
}
882

883
void tsdbCacheFreeSLastColItem(void *pItem) {
61,995✔
884
  SLastCol *pCol = (SLastCol *)pItem;
61,995✔
885
  for (int i = 0; i < pCol->rowKey.numOfPKs; i++) {
95,823✔
886
    if (IS_VAR_DATA_TYPE(pCol->rowKey.pks[i].type)) {
33,812!
887
      taosMemoryFree(pCol->rowKey.pks[i].pData);
10,590!
888
    }
889
  }
890

891
  if ((IS_VAR_DATA_TYPE(pCol->colVal.value.type) || pCol->colVal.value.type == TSDB_DATA_TYPE_DECIMAL) &&
62,011!
892
      pCol->colVal.value.pData) {
10,230✔
893
    taosMemoryFree(pCol->colVal.value.pData);
8,892!
894
  }
895
}
62,015✔
896

897
static void tsdbCacheDeleter(const void *key, size_t klen, void *value, void *ud) {
208,636✔
898
  SLastCol *pLastCol = (SLastCol *)value;
208,636✔
899

900
  if (pLastCol->dirty) {
208,636✔
901
    if (tsdbCacheFlushDirty(key, klen, pLastCol, ud) != 0) {
5,090!
902
      STsdb *pTsdb = (STsdb *)ud;
×
903
      tsdbTrace("tsdb/cache: vgId:%d, flush cache %s failed at line %d.", TD_VID(pTsdb->pVnode), __func__, __LINE__);
×
904
    }
905
  }
906

907
  for (uint8_t i = 0; i < pLastCol->rowKey.numOfPKs; ++i) {
232,444✔
908
    SValue *pValue = &pLastCol->rowKey.pks[i];
23,893✔
909
    if (IS_VAR_DATA_TYPE(pValue->type)) {
23,893!
910
      taosMemoryFree(pValue->pData);
7,790!
911
    }
912
  }
913

914
  if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type) ||
208,551!
915
      pLastCol->colVal.value.type == TSDB_DATA_TYPE_DECIMAL /* && pLastCol->colVal.value.nData > 0*/) {
199,591✔
916
    taosMemoryFree(pLastCol->colVal.value.pData);
14,504!
917
  }
918

919
  taosMemoryFree(value);
208,667✔
920
}
209,327✔
921

922
static void tsdbCacheOverWriter(const void *key, size_t klen, void *value, void *ud) {
88,168✔
923
  SLastCol *pLastCol = (SLastCol *)value;
88,168✔
924
  pLastCol->dirty = 0;
88,168✔
925
}
88,168✔
926

927
static int32_t tsdbCachePutToLRU(STsdb *pTsdb, SLastKey *pLastKey, SLastCol *pLastCol, int8_t dirty);
928

929
/*
 * Seed the LRU cache with an empty entry for one (uid, cid, lflag) key.
 *
 * The entry carries a NONE column value of type col_type, a row key with
 * ts = TSKEY_MIN and no primary keys, status TSDB_LAST_CACHE_VALID, and is
 * inserted as dirty. An insertion failure is logged and its code returned.
 */
static int32_t tsdbCacheNewTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, int8_t col_type, int8_t lflag) {
  SLastCol placeholder = {.rowKey = {.ts = TSKEY_MIN, .numOfPKs = 0},
                          .colVal = COL_VAL_NONE(cid, col_type),
                          .dirty = 1,
                          .cacheStatus = TSDB_LAST_CACHE_VALID};

  int32_t code = tsdbCachePutToLRU(pTsdb, &(SLastKey){.lflag = lflag, .uid = uid, .cid = cid}, &placeholder, 1);
  if (code != 0) {
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
  }

  TAOS_RETURN(code);
}
946

947
/*
 * Flush every dirty LRU entry and, when built with RocksDB, write out the
 * pending batch and flush the RocksDB instance.
 *
 * This function does not take pTsdb->lruMutex itself; the callers in this file
 * lock it before calling.
 *
 * Returns 0, or TSDB_CODE_FAILED when rocksdb_flush() reports an error.
 */
int32_t tsdbCacheCommitNoLock(STsdb *pTsdb) {
  int32_t code = 0;
  char   *err = NULL;

  taosLRUCacheApply(pTsdb->lruCache, tsdbCacheFlushDirty, pTsdb);

#ifdef USE_ROCKSDB
  rocksMayWrite(pTsdb, true);
  rocksdb_flush(pTsdb->rCache.db, pTsdb->rCache.flushoptions, &err);
  if (err != NULL) {
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err);
    rocksdb_free(err);
    code = TSDB_CODE_FAILED;
  }
#endif

  TAOS_RETURN(code);
}
966

967
/*
 * Multi-get numKeys values from the RocksDB-backed cache.
 *
 * ppKeysList / pKeysListSizes  keys and their lengths (numKeys entries each).
 * pppValuesList                receives a newly allocated array of numKeys value
 *                              pointers; the caller frees each element with
 *                              rocksdb_free() and the array with taosMemoryFree().
 * ppValuesListSizes            receives a newly allocated array of value sizes;
 *                              the caller frees it with taosMemoryFree().
 *
 * Per-key errors reported by rocksdb_multi_get() are logged and released; they
 * do not fail the call. Without USE_ROCKSDB the outputs are left untouched.
 *
 * Returns TSDB_CODE_SUCCESS, or the allocation error code (outputs untouched).
 */
static int32_t tsdbCacheGetValuesFromRocks(STsdb *pTsdb, size_t numKeys, const char *const *ppKeysList,
                                           size_t *pKeysListSizes, char ***pppValuesList, size_t **ppValuesListSizes) {
#ifdef USE_ROCKSDB
  int32_t code = TSDB_CODE_SUCCESS;

  char **valuesList = taosMemoryCalloc(numKeys, sizeof(char *));
  if (!valuesList) return terrno;

  size_t *valuesListSizes = taosMemoryCalloc(numKeys, sizeof(size_t));
  if (!valuesListSizes) {
    code = terrno;  // capture before the cleanup below can disturb it
    taosMemoryFreeClear(valuesList);
    return code;
  }

  char **errs = taosMemoryCalloc(numKeys, sizeof(char *));
  if (!errs) {
    code = terrno;
    taosMemoryFreeClear(valuesList);
    taosMemoryFreeClear(valuesListSizes);
    return code;
  }

  rocksdb_multi_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, numKeys, ppKeysList, pKeysListSizes, valuesList,
                    valuesListSizes, errs);
  for (size_t i = 0; i < numKeys; ++i) {
    if (errs[i] != NULL) {
      // Best effort: report the failed key lookup instead of dropping it silently.
      tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, errs[i]);
      rocksdb_free(errs[i]);
    }
  }
  taosMemoryFreeClear(errs);

  *pppValuesList = valuesList;
  *ppValuesListSizes = valuesListSizes;
#endif
  TAOS_RETURN(TSDB_CODE_SUCCESS);
}
995

996
/*
 * Remove the LFLAG_LAST and LFLAG_LAST_ROW cache entries of column cid of
 * table uid from both RocksDB (via the pending write batch) and the LRU cache.
 *
 * The write batch is not flushed here. The callers in this file run
 * tsdbCacheCommitNoLock() beforehand and rocksMayWrite() afterwards, all while
 * holding pTsdb->lruMutex.
 *
 * hasPrimaryKey is currently unused.
 *
 * Returns 0 on success, an allocation error, the error from the RocksDB
 * lookup, or the deserialization error of a stored entry. On a
 * deserialization error the LRU entries are left untouched.
 */
static int32_t tsdbCacheDropTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, bool hasPrimaryKey) {
  int32_t code = 0;

  // build keys & multi get from rocks
  char **keys_list = taosMemoryCalloc(2, sizeof(char *));
  if (!keys_list) {
    return terrno;
  }
  size_t *keys_list_sizes = taosMemoryCalloc(2, sizeof(size_t));
  if (!keys_list_sizes) {
    code = terrno;  // capture before freeing
    taosMemoryFree(keys_list);
    return code;
  }
  const size_t klen = ROCKS_KEY_LEN;

  // Both keys live in one allocation; it is released through keys_list[0].
  char *keys = taosMemoryCalloc(2, sizeof(SLastKey));
  if (!keys) {
    code = terrno;
    taosMemoryFree(keys_list);
    taosMemoryFree(keys_list_sizes);
    return code;
  }
  ((SLastKey *)keys)[0] = (SLastKey){.lflag = LFLAG_LAST, .uid = uid, .cid = cid};
  ((SLastKey *)keys)[1] = (SLastKey){.lflag = LFLAG_LAST_ROW, .uid = uid, .cid = cid};

  keys_list[0] = keys;
  keys_list[1] = keys + sizeof(SLastKey);
  keys_list_sizes[0] = klen;
  keys_list_sizes[1] = klen;

  char  **values_list = NULL;
  size_t *values_list_sizes = NULL;

  TAOS_CHECK_GOTO(tsdbCacheGetValuesFromRocks(pTsdb, 2, (const char *const *)keys_list, keys_list_sizes, &values_list,
                                              &values_list_sizes),
                  NULL, _exit);

#ifdef USE_ROCKSDB
  {
    rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;

    // Queue a delete for every key that currently has a stored, decodable value.
    for (int i = 0; i < 2; i++) {
      if (values_list[i] == NULL) {
        continue;
      }

      SLastCol *pLastCol = NULL;
      code = tsdbCacheDeserialize(values_list[i], values_list_sizes[i], &pLastCol);
      if (code != TSDB_CODE_SUCCESS) {
        tsdbError("vgId:%d, %s deserialize failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
                  tstrerror(code));
        goto _exit;
      }
      if (NULL != pLastCol) {
        rocksdb_writebatch_delete(wb, keys_list[i], klen);
      }
      taosMemoryFreeClear(pLastCol);
    }
  }
#endif

  // Drop the matching LRU entries, if present.
  for (int i = 0; i < 2; i++) {
    LRUHandle *h = taosLRUCacheLookup(pTsdb->lruCache, keys_list[i], klen);
    if (h) {
      tsdbLRUCacheRelease(pTsdb->lruCache, h, true);
      taosLRUCacheErase(pTsdb->lruCache, keys_list[i], klen);
    }
  }

_exit:
#ifdef USE_ROCKSDB
  // Released here so the fetched values are not leaked on the error paths.
  if (values_list) {
    rocksdb_free(values_list[0]);
    rocksdb_free(values_list[1]);
  }
#endif
  taosMemoryFree(keys_list[0]);

  taosMemoryFree(keys_list);
  taosMemoryFree(keys_list_sizes);
  taosMemoryFree(values_list);
  taosMemoryFree(values_list_sizes);

  TAOS_RETURN(code);
}
1090

1091
/*
 * Create empty last_row and last cache entries for every column of a new table.
 *
 * When suid < 0 the columns are taken from pSchemaRow; otherwise the schema is
 * fetched with metaGetTbTSchemaEx(). Runs under pTsdb->lruMutex.
 *
 * A failure to create an individual entry is only traced. The returned code is
 * the schema lookup error, or otherwise the result of the last entry creation
 * attempted.
 */
int32_t tsdbCacheNewTable(STsdb *pTsdb, tb_uid_t uid, tb_uid_t suid, const SSchemaWrapper *pSchemaRow) {
  int32_t      code = 0;
  const int8_t lflags[2] = {LFLAG_LAST_ROW, LFLAG_LAST};  // creation order per column

  (void)taosThreadMutexLock(&pTsdb->lruMutex);

  if (suid >= 0) {
    STSchema *pTSchema = NULL;
    code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, -1, &pTSchema);
    if (code != TSDB_CODE_SUCCESS) {
      (void)taosThreadMutexUnlock(&pTsdb->lruMutex);

      TAOS_RETURN(code);
    }

    for (int iCol = 0; iCol < pTSchema->numOfCols; ++iCol) {
      int16_t cid = pTSchema->columns[iCol].colId;
      int8_t  col_type = pTSchema->columns[iCol].type;

      for (int k = 0; k < 2; ++k) {
        code = tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, lflags[k]);
        if (code != TSDB_CODE_SUCCESS) {
          tsdbTrace("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
                    tstrerror(code));
        }
      }
    }

    taosMemoryFree(pTSchema);
  } else {
    for (int iCol = 0; iCol < pSchemaRow->nCols; ++iCol) {
      int16_t cid = pSchemaRow->pSchema[iCol].colId;
      int8_t  col_type = pSchemaRow->pSchema[iCol].type;

      for (int k = 0; k < 2; ++k) {
        code = tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, lflags[k]);
        if (code != TSDB_CODE_SUCCESS) {
          tsdbTrace("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
                    tstrerror(code));
        }
      }
    }
  }

  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);

  TAOS_RETURN(code);
}
1144

1145
/*
 * Drop the cached last/last_row entries of every column of table uid.
 *
 * Under pTsdb->lruMutex: commits the cache, then drops each column taken from
 * pSchemaRow when it is given, or from the schema returned by
 * metaGetTbTSchemaEx() otherwise, and finally calls rocksMayWrite(pTsdb, false).
 *
 * Commit and per-column failures are only traced. The returned code is the
 * schema lookup error, or otherwise the result of the last step that assigned
 * it.
 */
int32_t tsdbCacheDropTable(STsdb *pTsdb, tb_uid_t uid, tb_uid_t suid, SSchemaWrapper *pSchemaRow) {
  int32_t code = 0;

  (void)taosThreadMutexLock(&pTsdb->lruMutex);

  code = tsdbCacheCommitNoLock(pTsdb);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbTrace("vgId:%d, %s commit with no lock failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
              tstrerror(code));
  }

  if (pSchemaRow == NULL) {
    STSchema *pTSchema = NULL;
    code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, -1, &pTSchema);
    if (code != TSDB_CODE_SUCCESS) {
      (void)taosThreadMutexUnlock(&pTsdb->lruMutex);

      TAOS_RETURN(code);
    }

    int  numCols = pTSchema->numOfCols;
    // The second column is the one checked for the COL_IS_KEY flag.
    bool hasPk = (numCols >= 2) && ((pTSchema->columns[1].flags & COL_IS_KEY) != 0);

    for (int iCol = 0; iCol < numCols; ++iCol) {
      int16_t cid = pTSchema->columns[iCol].colId;

      code = tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPk);
      if (code != TSDB_CODE_SUCCESS) {
        tsdbTrace("vgId:%d, %s drop table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
                  tstrerror(code));
      }
    }

    taosMemoryFree(pTSchema);
  } else {
    int  numCols = pSchemaRow->nCols;
    bool hasPk = (numCols >= 2) && ((pSchemaRow->pSchema[1].flags & COL_IS_KEY) != 0);

    for (int iCol = 0; iCol < numCols; ++iCol) {
      int16_t cid = pSchemaRow->pSchema[iCol].colId;

      code = tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPk);
      if (code != TSDB_CODE_SUCCESS) {
        tsdbTrace("vgId:%d, %s drop table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
                  tstrerror(code));
      }
    }
  }

  rocksMayWrite(pTsdb, false);

  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);

  TAOS_RETURN(code);
}
1206

1207
/*
 * Drop the cached last/last_row entries of every column for each table in uids.
 *
 * uids  array of tb_uid_t.
 * suid  uid passed (twice) to metaGetTbTSchemaEx() to obtain the column list
 *       used for all tables in uids.
 *
 * Under pTsdb->lruMutex: commits the cache, drops each column of each table and
 * finally calls rocksMayWrite(pTsdb, false). Commit and per-column failures are
 * only traced. The returned code is the schema lookup error, or otherwise the
 * result of the last step that assigned it.
 */
int32_t tsdbCacheDropSubTables(STsdb *pTsdb, SArray *uids, tb_uid_t suid) {
  int32_t code = 0;

  (void)taosThreadMutexLock(&pTsdb->lruMutex);

  code = tsdbCacheCommitNoLock(pTsdb);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbTrace("vgId:%d, %s commit with no lock failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
              tstrerror(code));
  }

  STSchema *pTSchema = NULL;
  code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, suid, -1, &pTSchema);
  if (code != TSDB_CODE_SUCCESS) {
    (void)taosThreadMutexUnlock(&pTsdb->lruMutex);

    TAOS_RETURN(code);
  }

  for (int iTable = 0; iTable < TARRAY_SIZE(uids); ++iTable) {
    int64_t uid = ((tb_uid_t *)TARRAY_DATA(uids))[iTable];

    bool hasPrimayKey = false;
    int  nCols = pTSchema->numOfCols;
    if (nCols >= 2) {
      hasPrimayKey = (pTSchema->columns[1].flags & COL_IS_KEY) ? true : false;
    }

    // Distinct index name: the column loop must not shadow the table loop.
    for (int iCol = 0; iCol < nCols; ++iCol) {
      int16_t cid = pTSchema->columns[iCol].colId;

      code = tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey);
      if (code != TSDB_CODE_SUCCESS) {
        tsdbTrace("vgId:%d, %s drop table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
                  tstrerror(code));
      }
    }
  }

  taosMemoryFree(pTSchema);

  rocksMayWrite(pTsdb, false);

  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);

  TAOS_RETURN(code);
}
1255

1256
/*
 * Create the two empty cache entries (lflag 0, then lflag 1) for a column
 * added to table uid, under pTsdb->lruMutex.
 *
 * NOTE(review): 0 and 1 presumably correspond to LFLAG_LAST_ROW and LFLAG_LAST
 * as used by tsdbCacheNewTable() — confirm against the macro definitions.
 *
 * A failure is only traced; the result of the second creation is returned.
 */
int32_t tsdbCacheNewNTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, int8_t col_type) {
  int32_t code = 0;

  (void)taosThreadMutexLock(&pTsdb->lruMutex);

  for (int8_t lflag = 0; lflag <= 1; ++lflag) {
    code = tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, lflag);
    if (code != TSDB_CODE_SUCCESS) {
      tsdbTrace("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
                tstrerror(code));
    }
  }

  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);

  TAOS_RETURN(code);
}
1276

1277
/*
 * Drop the cached entries of column cid of table uid.
 *
 * Under pTsdb->lruMutex: commits the cache, drops the column and calls
 * rocksMayWrite(pTsdb, false). A commit failure is only traced; the result of
 * the column drop (also traced on failure) is returned.
 */
int32_t tsdbCacheDropNTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, bool hasPrimayKey) {
  (void)taosThreadMutexLock(&pTsdb->lruMutex);

  int32_t code = tsdbCacheCommitNoLock(pTsdb);
  if (TSDB_CODE_SUCCESS != code) {
    tsdbTrace("vgId:%d, %s commit with no lock failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
              tstrerror(code));
  }

  code = tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey);
  if (TSDB_CODE_SUCCESS != code) {
    tsdbTrace("vgId:%d, %s drop table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
              tstrerror(code));
  }

  rocksMayWrite(pTsdb, false);

  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);

  TAOS_RETURN(code);
}
1300

1301
/*
 * Create the two empty cache entries (lflag 0, then lflag 1) of a new column
 * for every table listed in uids (array of tb_uid_t), under pTsdb->lruMutex.
 *
 * NOTE(review): 0 and 1 presumably correspond to LFLAG_LAST_ROW and LFLAG_LAST
 * — confirm against the macro definitions.
 *
 * Failures are only traced; the result of the last creation attempted is
 * returned (0 when uids is empty).
 */
int32_t tsdbCacheNewSTableColumn(STsdb *pTsdb, SArray *uids, int16_t cid, int8_t col_type) {
  int32_t code = 0;

  (void)taosThreadMutexLock(&pTsdb->lruMutex);

  for (int iTable = 0; iTable < TARRAY_SIZE(uids); ++iTable) {
    tb_uid_t uid = ((tb_uid_t *)TARRAY_DATA(uids))[iTable];

    for (int8_t lflag = 0; lflag <= 1; ++lflag) {
      code = tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, lflag);
      if (code != TSDB_CODE_SUCCESS) {
        tsdbTrace("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
                  tstrerror(code));
      }
    }
  }

  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
  TAOS_RETURN(code);
}
1325

1326
/*
 * Drop the cached entries of column cid for every table listed in uids
 * (array of tb_uid_t).
 *
 * Under pTsdb->lruMutex: commits the cache, drops the column table by table
 * and calls rocksMayWrite(pTsdb, false). Failures are only traced; the result
 * of the last step that assigned the code is returned.
 */
int32_t tsdbCacheDropSTableColumn(STsdb *pTsdb, SArray *uids, int16_t cid, bool hasPrimayKey) {
  (void)taosThreadMutexLock(&pTsdb->lruMutex);

  int32_t code = tsdbCacheCommitNoLock(pTsdb);
  if (TSDB_CODE_SUCCESS != code) {
    tsdbTrace("vgId:%d, %s commit with no lock failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
              tstrerror(code));
  }

  for (int iTable = 0; iTable < TARRAY_SIZE(uids); ++iTable) {
    int64_t uid = ((tb_uid_t *)TARRAY_DATA(uids))[iTable];

    code = tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey);
    if (TSDB_CODE_SUCCESS != code) {
      tsdbTrace("vgId:%d, %s drop table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
                tstrerror(code));
    }
  }

  rocksMayWrite(pTsdb, false);

  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);

  TAOS_RETURN(code);
}
1353

1354
typedef struct {
1355
  int      idx;
1356
  SLastKey key;
1357
} SIdxKey;
1358

1359
/*
 * Reset a cached entry to "no value".
 *
 * The row key gets ts = TSKEY_MIN and zero primary keys; var-data payloads with
 * nData > 0 are freed, every other value is passed to valueClearDatum(). The
 * column value becomes NONE (cid and type preserved), the entry is marked
 * dirty and its cache status is set to cacheStatus.
 */
static void tsdbCacheUpdateLastColToNone(SLastCol *pLastCol, ELastCacheStatus cacheStatus) {
  SRowKey *pKey = &pLastCol->rowKey;
  SValue  *pColValue = &pLastCol->colVal.value;

  // update rowkey
  pKey->ts = TSKEY_MIN;
  int8_t iPk = 0;
  while (iPk < pKey->numOfPKs) {
    SValue *pPk = &pKey->pks[iPk];
    bool    ownsVarData = IS_VAR_DATA_TYPE(pPk->type) && pPk->nData > 0;
    if (!ownsVarData) {
      valueClearDatum(pPk, pPk->type);
    } else {
      taosMemoryFreeClear(pPk->pData);
      pPk->nData = 0;
    }
    iPk++;
  }
  pKey->numOfPKs = 0;

  // update colval
  bool colOwnsVarData = IS_VAR_DATA_TYPE(pColValue->type) && pColValue->nData > 0;
  if (!colOwnsVarData) {
    valueClearDatum(pColValue, pColValue->type);
  } else {
    taosMemoryFreeClear(pColValue->pData);
    pColValue->nData = 0;
  }

  pLastCol->colVal = COL_VAL_NONE(pLastCol->colVal.cid, pLastCol->colVal.value.type);
  pLastCol->dirty = 1;
  pLastCol->cacheStatus = cacheStatus;
}
×
1385

1386
/*
 * Serialize pLastCol and queue it under pLastKey in the RocksDB write batch.
 *
 * The put is performed while holding rCache.writeBatchMutex; the serialized
 * buffer is freed afterwards. The batch itself is not written out here.
 * Without USE_ROCKSDB this is a no-op returning 0.
 *
 * Returns 0, or the serialization error (logged).
 */
static int32_t tsdbCachePutToRocksdb(STsdb *pTsdb, SLastKey *pLastKey, SLastCol *pLastCol) {
  int32_t code = 0;
#ifdef USE_ROCKSDB
  char  *buf = NULL;
  size_t bufLen = 0;

  code = tsdbCacheSerialize(pLastCol, &buf, &bufLen);
  if (code == 0) {
    rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;

    (void)taosThreadMutexLock(&pTsdb->rCache.writeBatchMutex);
    rocksdb_writebatch_put(wb, (char *)pLastKey, ROCKS_KEY_LEN, buf, bufLen);
    (void)taosThreadMutexUnlock(&pTsdb->rCache.writeBatchMutex);

    taosMemoryFree(buf);
  } else {
    tsdbError("tsdb/cache/putrocks: vgId:%d, serialize failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code));
  }
#endif
  TAOS_RETURN(code);
}
1407

1408
/*
 * Insert a heap copy of *pLastCol into the LRU cache under pLastKey.
 *
 * The copy gets its own payload buffers (tsdbCacheReallocSLastCol) and its
 * dirty flag set to `dirty`; the computed charge is passed to the cache along
 * with tsdbCacheDeleter / tsdbCacheOverWriter. *pLastCol itself is not modified.
 *
 * Returns 0 on success, terrno on allocation failure, the re-allocation error,
 * or TSDB_CODE_FAILED when the cache reports a status other than OK /
 * OK_OVERWRITTEN.
 */
static int32_t tsdbCachePutToLRU(STsdb *pTsdb, SLastKey *pLastKey, SLastCol *pLastCol, int8_t dirty) {
  int32_t code = 0, lino = 0;
  size_t  charge = 0;

  SLastCol *pEntry = taosMemoryCalloc(1, sizeof(SLastCol));
  if (NULL == pEntry) {
    return terrno;
  }

  *pEntry = *pLastCol;
  pEntry->dirty = dirty;
  TAOS_CHECK_EXIT(tsdbCacheReallocSLastCol(pEntry, &charge));

  LRUStatus status = taosLRUCacheInsert(pTsdb->lruCache, pLastKey, ROCKS_KEY_LEN, pEntry, charge, tsdbCacheDeleter,
                                        tsdbCacheOverWriter, NULL, TAOS_LRU_PRIORITY_LOW, pTsdb);
  bool inserted = (status == TAOS_LRU_STATUS_OK) || (status == TAOS_LRU_STATUS_OK_OVERWRITTEN);
  if (!inserted) {
    tsdbError("vgId:%d, %s failed at line %d status %d.", TD_VID(pTsdb->pVnode), __func__, __LINE__, status);
    code = TSDB_CODE_FAILED;
    // The entry is not freed here; presumably the cache has already disposed of
    // it through the deleter — confirm against taosLRUCacheInsert().
    pEntry = NULL;
  }

_exit:
  if (code != TSDB_CODE_SUCCESS) {
    taosMemoryFree(pEntry);
    tsdbError("tsdb/cache/putlru: vgId:%d, failed at line %d since %s.", TD_VID(pTsdb->pVnode), lino, tstrerror(code));
  }

  TAOS_RETURN(code);
}
1437

1438
/*
 * Apply a batch of last / last_row updates for table uid to the cache.
 *
 * updCtxArray  array of SLastUpdateCtx; NULL or empty is a successful no-op.
 * suid         not used by this function.
 *
 * Phase 1 (LRU): for each update whose key is already in the LRU cache, the
 * entry is replaced (dirty) when the update's row key is newer, or equal with a
 * column value that is not NONE. Entries whose status is
 * TSDB_LAST_CACHE_NO_CACHE are left alone. LFLAG_LAST updates that carry no
 * value are skipped. Keys missing from the LRU cache are collected.
 *
 * Phase 2 (RocksDB): the collected keys are fetched from RocksDB after writing
 * out the pending batch. A stored NO_CACHE entry is only loaded into the LRU
 * cache. Otherwise, when nothing is stored or the update wins the same
 * comparison as above, the new value is queued to RocksDB and inserted into
 * the LRU cache as clean.
 *
 * The whole operation runs under pTsdb->lruMutex. Returns 0 or the first error
 * encountered; every exit path releases the mutex and all temporary buffers.
 */
static int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SArray *updCtxArray) {
  if (!updCtxArray || TARRAY_SIZE(updCtxArray) == 0) {
    TAOS_RETURN(TSDB_CODE_SUCCESS);
  }

  int32_t code = 0, lino = 0;

  int        num_keys = TARRAY_SIZE(updCtxArray);
  SArray    *remainCols = NULL;
  SLRUCache *pCache = pTsdb->lruCache;

  (void)taosThreadMutexLock(&pTsdb->lruMutex);
  for (int i = 0; i < num_keys; ++i) {
    SLastUpdateCtx *updCtx = &((SLastUpdateCtx *)TARRAY_DATA(updCtxArray))[i];
    int8_t          lflag = updCtx->lflag;
    SRowKey        *pRowKey = &updCtx->tsdbRowKey.key;
    SColVal        *pColVal = &updCtx->colVal;

    if (lflag == LFLAG_LAST && !COL_VAL_IS_VALUE(pColVal)) {
      continue;
    }

    SLastKey  *key = &(SLastKey){.lflag = lflag, .uid = uid, .cid = pColVal->cid};
    LRUHandle *h = taosLRUCacheLookup(pCache, key, ROCKS_KEY_LEN);
    if (h) {
      SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h);
      if (pLastCol->cacheStatus != TSDB_LAST_CACHE_NO_CACHE) {
        int32_t cmp_res = tRowKeyCompare(&pLastCol->rowKey, pRowKey);
        if (cmp_res < 0 || (cmp_res == 0 && !COL_VAL_IS_NONE(pColVal))) {
          SLastCol newLastCol = {
              .rowKey = *pRowKey, .colVal = *pColVal, .dirty = 1, .cacheStatus = TSDB_LAST_CACHE_VALID};
          code = tsdbCachePutToLRU(pTsdb, key, &newLastCol, 1);
        }
      }

      // Release the handle before acting on a failed insert.
      tsdbLRUCacheRelease(pCache, h, false);
      TAOS_CHECK_EXIT(code);
    } else {
      if (!remainCols) {
        remainCols = taosArrayInit(num_keys * 2, sizeof(SIdxKey));
        if (!remainCols) {
          TAOS_CHECK_GOTO(terrno, &lino, _exit);
        }
      }
      if (!taosArrayPush(remainCols, &(SIdxKey){i, *key})) {
        TAOS_CHECK_GOTO(terrno, &lino, _exit);
      }
    }
  }

  if (remainCols) {
    num_keys = TARRAY_SIZE(remainCols);
  }
  if (remainCols && num_keys > 0) {
    char  **keys_list = NULL;
    size_t *keys_list_sizes = NULL;
    char  **values_list = NULL;
    size_t *values_list_sizes = NULL;

    // Allocation failures leave through _exit so the mutex and remainCols are released there.
    keys_list = taosMemoryCalloc(num_keys, sizeof(char *));
    if (!keys_list) {
      code = terrno;
      goto _exit;
    }
    keys_list_sizes = taosMemoryCalloc(num_keys, sizeof(size_t));
    if (!keys_list_sizes) {
      code = terrno;  // capture before freeing
      taosMemoryFree(keys_list);
      goto _exit;
    }
    for (int i = 0; i < num_keys; ++i) {
      SIdxKey *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[i];

      keys_list[i] = (char *)&idxKey->key;
      keys_list_sizes[i] = ROCKS_KEY_LEN;
    }

    rocksMayWrite(pTsdb, true);  // flush writebatch cache

    code = tsdbCacheGetValuesFromRocks(pTsdb, num_keys, (const char *const *)keys_list, keys_list_sizes, &values_list,
                                       &values_list_sizes);
    if (code) {
      taosMemoryFree(keys_list);
      taosMemoryFree(keys_list_sizes);
      goto _exit;
    }

    for (int i = 0; i < num_keys; ++i) {
      SIdxKey        *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[i];
      SLastUpdateCtx *updCtx = (SLastUpdateCtx *)taosArrayGet(updCtxArray, idxKey->idx);
      SRowKey        *pRowKey = &updCtx->tsdbRowKey.key;
      SColVal        *pColVal = &updCtx->colVal;

      SLastCol *pLastCol = NULL;
      if (values_list[i] != NULL) {
        code = tsdbCacheDeserialize(values_list[i], values_list_sizes[i], &pLastCol);
        if (code != TSDB_CODE_SUCCESS) {
          tsdbError("vgId:%d, %s deserialize failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
                    tstrerror(code));
          // Leave via the common cleanup below so the key/value buffers are released.
          break;
        }
      }
      SLastCol *pToFree = pLastCol;

      if (pLastCol && pLastCol->cacheStatus == TSDB_LAST_CACHE_NO_CACHE) {
        if ((code = tsdbCachePutToLRU(pTsdb, &idxKey->key, pLastCol, 0)) != TSDB_CODE_SUCCESS) {
          tsdbError("tsdb/cache: vgId:%d, put lru failed at line %d since %s.", TD_VID(pTsdb->pVnode), __LINE__,
                    tstrerror(code));
          taosMemoryFreeClear(pToFree);
          break;
        }

        // cache invalid => skip update
        taosMemoryFreeClear(pToFree);
        continue;
      }

      if (IS_LAST_KEY(idxKey->key) && !COL_VAL_IS_VALUE(pColVal)) {
        taosMemoryFreeClear(pToFree);
        continue;
      }

      int32_t cmp_res = 1;
      if (pLastCol) {
        cmp_res = tRowKeyCompare(&pLastCol->rowKey, pRowKey);
      }

      if (NULL == pLastCol || cmp_res < 0 || (cmp_res == 0 && !COL_VAL_IS_NONE(pColVal))) {
        SLastCol lastColTmp = {
            .rowKey = *pRowKey, .colVal = *pColVal, .dirty = 0, .cacheStatus = TSDB_LAST_CACHE_VALID};
        if ((code = tsdbCachePutToRocksdb(pTsdb, &idxKey->key, &lastColTmp)) != TSDB_CODE_SUCCESS) {
          tsdbError("tsdb/cache: vgId:%d, put rocks failed at line %d since %s.", TD_VID(pTsdb->pVnode), __LINE__,
                    tstrerror(code));
          taosMemoryFreeClear(pToFree);
          break;
        }
        if ((code = tsdbCachePutToLRU(pTsdb, &idxKey->key, &lastColTmp, 0)) != TSDB_CODE_SUCCESS) {
          tsdbError("tsdb/cache: vgId:%d, put lru failed at line %d since %s.", TD_VID(pTsdb->pVnode), __LINE__,
                    tstrerror(code));
          taosMemoryFreeClear(pToFree);
          break;
        }
      }

      taosMemoryFreeClear(pToFree);
    }

    rocksMayWrite(pTsdb, false);

    taosMemoryFree(keys_list);
    taosMemoryFree(keys_list_sizes);
    if (values_list) {
#ifdef USE_ROCKSDB
      for (int i = 0; i < num_keys; ++i) {
        rocksdb_free(values_list[i]);
      }
#endif
      taosMemoryFree(values_list);
    }
    taosMemoryFree(values_list_sizes);
  }

_exit:
  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
  taosArrayDestroy(remainCols);

  if (code) {
    tsdbError("tsdb/cache: vgId:%d, update failed at line %d since %s.", TD_VID(pTsdb->pVnode), __LINE__,
              tstrerror(code));
  }

  TAOS_RETURN(code);
}
1617

1618
int32_t tsdbCacheRowFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int64_t version, int32_t nRow,
×
1619
                                 SRow **aRow) {
1620
  int32_t code = 0, lino = 0;
×
1621

1622
  // 1. prepare last
1623
  TSDBROW      lRow = {.type = TSDBROW_ROW_FMT, .pTSRow = aRow[nRow - 1], .version = version};
×
1624
  STSchema    *pTSchema = NULL;
×
1625
  int32_t      sver = TSDBROW_SVERSION(&lRow);
×
1626
  SSHashObj   *iColHash = NULL;
×
1627
  STSDBRowIter iter = {0};
×
1628

1629
  TAOS_CHECK_GOTO(tsdbUpdateSkm(pTsdb, suid, uid, sver), &lino, _exit);
×
1630
  pTSchema = pTsdb->rCache.pTSchema;
×
1631

1632
  TSDBROW tRow = {.type = TSDBROW_ROW_FMT, .version = version};
×
1633
  int32_t nCol = pTSchema->numOfCols;
×
1634
  SArray *ctxArray = pTsdb->rCache.ctxArray;
×
1635

1636
  // 1. prepare by lrow
1637
  STsdbRowKey tsdbRowKey = {0};
×
1638
  tsdbRowGetKey(&lRow, &tsdbRowKey);
×
1639

1640
  TAOS_CHECK_GOTO(tsdbRowIterOpen(&iter, &lRow, pTSchema), &lino, _exit);
×
1641

1642
  int32_t iCol = 0;
×
1643
  for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal && iCol < nCol; pColVal = tsdbRowIterNext(&iter), iCol++) {
×
1644
    SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST_ROW, .tsdbRowKey = tsdbRowKey, .colVal = *pColVal};
×
1645
    if (!taosArrayPush(ctxArray, &updateCtx)) {
×
1646
      TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
1647
    }
1648

1649
    if (COL_VAL_IS_VALUE(pColVal)) {
×
1650
      updateCtx.lflag = LFLAG_LAST;
×
1651
      if (!taosArrayPush(ctxArray, &updateCtx)) {
×
1652
        TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
1653
      }
1654
    } else {
1655
      if (!iColHash) {
×
1656
        iColHash = tSimpleHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
×
1657
        if (iColHash == NULL) {
×
1658
          TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit);
×
1659
        }
1660
      }
1661

1662
      if (tSimpleHashPut(iColHash, &iCol, sizeof(iCol), NULL, 0)) {
×
1663
        TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit);
×
1664
      }
1665
    }
1666
  }
1667

1668
  // 2. prepare by the other rows
1669
  for (int32_t iRow = nRow - 2; iRow >= 0; --iRow) {
×
1670
    if (tSimpleHashGetSize(iColHash) == 0) {
×
1671
      break;
×
1672
    }
1673

1674
    tRow.pTSRow = aRow[iRow];
×
1675

1676
    STsdbRowKey tsdbRowKey = {0};
×
1677
    tsdbRowGetKey(&tRow, &tsdbRowKey);
×
1678

1679
    void   *pIte = NULL;
×
1680
    int32_t iter = 0;
×
1681
    while ((pIte = tSimpleHashIterate(iColHash, pIte, &iter)) != NULL) {
×
1682
      int32_t iCol = ((int32_t *)pIte)[0];
×
1683
      SColVal colVal = COL_VAL_NONE(0, 0);
×
1684
      tsdbRowGetColVal(&tRow, pTSchema, iCol, &colVal);
×
1685

1686
      if (COL_VAL_IS_VALUE(&colVal)) {
×
1687
        SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST, .tsdbRowKey = tsdbRowKey, .colVal = colVal};
×
1688
        if (!taosArrayPush(ctxArray, &updateCtx)) {
×
1689
          TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
1690
        }
1691
        code = tSimpleHashIterateRemove(iColHash, &iCol, sizeof(iCol), &pIte, &iter);
×
1692
        if (code != TSDB_CODE_SUCCESS) {
×
1693
          tsdbTrace("vgId:%d, %s tSimpleHashIterateRemove failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__,
×
1694
                    __LINE__, tstrerror(code));
1695
        }
1696
      }
1697
    }
1698
  }
1699

1700
  TAOS_CHECK_GOTO(tsdbCacheUpdate(pTsdb, suid, uid, ctxArray), &lino, _exit);
×
1701

1702
_exit:
×
1703
  if (code) {
×
1704
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
×
1705
  }
1706

1707
  tsdbRowClose(&iter);
×
1708
  tSimpleHashCleanup(iColHash);
×
1709
  taosArrayClear(ctxArray);
×
1710

1711
  TAOS_RETURN(code);
×
1712
}
1713

1714
/*
 * Build last / last_row update contexts from a column-format block of one table
 * and pass them to tsdbCacheUpdate().
 *
 * Contexts are pushed in this order: the timestamp of the final row (LFLAG_LAST),
 * then for each column flagged HAS_VALUE its newest row holding a value
 * (LFLAG_LAST), then every column of the final row (LFLAG_LAST_ROW).
 *
 * The schema and the context array are allocated here and released before returning.
 */
int32_t tsdbCacheColFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SBlockData *pBlockData) {
  int32_t      code = 0, lino = 0;
  STSDBRowIter rowIter = {0};
  STSchema    *pSchema = NULL;
  SArray      *pCtxList = NULL;

  TSDBROW lastRow = tsdbRowFromBlockData(pBlockData, pBlockData->nRow - 1);
  int32_t schemaVer = TSDBROW_SVERSION(&lastRow);

  TAOS_CHECK_RETURN(metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, schemaVer, &pSchema));

  pCtxList = taosArrayInit(pBlockData->nColData, sizeof(SLastUpdateCtx));
  if (NULL == pCtxList) {
    TAOS_CHECK_GOTO(terrno, &lino, _exit);
  }

  // 1. prepare last: start with the timestamp column of the final row
  STsdbRowKey lastRowKey = {0};
  tsdbRowGetKey(&lastRow, &lastRowKey);

  {
    SValue tsVal = {.type = TSDB_DATA_TYPE_TIMESTAMP};
    VALUE_SET_TRIVIAL_DATUM(&tsVal, lastRow.pBlockData->aTSKEY[lastRow.iRow]);
    SLastUpdateCtx tsCtx = {
        .lflag = LFLAG_LAST,
        .tsdbRowKey = lastRowKey,
        .colVal = COL_VAL_VALUE(PRIMARYKEY_TIMESTAMP_COL_ID, tsVal)};
    if (!taosArrayPush(pCtxList, &tsCtx)) {
      TAOS_CHECK_GOTO(terrno, &lino, _exit);
    }
  }

  TSDBROW scanRow = tsdbRowFromBlockData(pBlockData, 0);

  for (int32_t iColData = 0; iColData < pBlockData->nColData; ++iColData) {
    SColData *pColData = &pBlockData->aColData[iColData];
    if ((pColData->flag & HAS_VALUE) == HAS_VALUE) {
      // walk the rows backwards and stop at the first one whose bit value is 2
      for (scanRow.iRow = pBlockData->nRow - 1; scanRow.iRow >= 0; --scanRow.iRow) {
        STsdbRowKey scanKey = {0};
        tsdbRowGetKey(&scanRow, &scanKey);

        uint8_t bitValue = tColDataGetBitValue(pColData, scanRow.iRow);
        if (bitValue != 2) {
          continue;
        }

        SColVal colVal = COL_VAL_NONE(pColData->cid, pColData->type);
        TAOS_CHECK_GOTO(tColDataGetValue(pColData, scanRow.iRow, &colVal), &lino, _exit);

        SLastUpdateCtx lastCtx = {.lflag = LFLAG_LAST, .tsdbRowKey = scanKey, .colVal = colVal};
        if (!taosArrayPush(pCtxList, &lastCtx)) {
          TAOS_CHECK_GOTO(terrno, &lino, _exit);
        }
        break;
      }
    }
  }

  // 2. prepare last row: every column of the final row, keyed by the final row's key
  TAOS_CHECK_GOTO(tsdbRowIterOpen(&rowIter, &lastRow, pSchema), &lino, _exit);
  SColVal *pColVal = tsdbRowIterNext(&rowIter);
  while (pColVal) {
    SLastUpdateCtx lastRowCtx = {.lflag = LFLAG_LAST_ROW, .tsdbRowKey = lastRowKey, .colVal = *pColVal};
    if (!taosArrayPush(pCtxList, &lastRowCtx)) {
      TAOS_CHECK_GOTO(terrno, &lino, _exit);
    }
    pColVal = tsdbRowIterNext(&rowIter);
  }

  TAOS_CHECK_GOTO(tsdbCacheUpdate(pTsdb, suid, uid, pCtxList), &lino, _exit);

_exit:
  tsdbRowClose(&rowIter);
  taosMemoryFreeClear(pSchema);
  taosArrayDestroy(pCtxList);

  TAOS_RETURN(code);
}
1790

1791
static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols,
1792
                            int nCols, int16_t *slotIds);
1793

1794
static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols,
1795
                               int nCols, int16_t *slotIds);
1796

1797
static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SArray *remainCols,
1,143✔
1798
                                    SCacheRowsReader *pr, int8_t ltype) {
1799
  int32_t               code = 0, lino = 0;
1,143✔
1800
  // rocksdb_writebatch_t *wb = NULL;
1801
  SArray               *pTmpColArray = NULL;
1,143✔
1802
  bool                  extraTS = false;
1,143✔
1803

1804
  SIdxKey *idxKey = taosArrayGet(remainCols, 0);
1,143✔
1805
  if (idxKey->key.cid != PRIMARYKEY_TIMESTAMP_COL_ID) {
1,144✔
1806
    // ignore 'ts' loaded from cache and load it from tsdb
1807
    // SLastCol *pLastCol = taosArrayGet(pLastArray, 0);
1808
    // tsdbCacheUpdateLastColToNone(pLastCol, TSDB_LAST_CACHE_NO_CACHE);
1809

1810
    SLastKey *key = &(SLastKey){.lflag = ltype, .uid = uid, .cid = PRIMARYKEY_TIMESTAMP_COL_ID};
537✔
1811
    if (!taosArrayInsert(remainCols, 0, &(SIdxKey){0, *key})) {
537!
1812
      TAOS_RETURN(terrno);
×
1813
    }
1814

1815
    extraTS = true;
538✔
1816
  }
1817

1818
  int      num_keys = TARRAY_SIZE(remainCols);
1,145✔
1819
  int16_t *slotIds = taosMemoryMalloc(num_keys * sizeof(int16_t));
1,145!
1820

1821
  int16_t *lastColIds = NULL, *lastSlotIds = NULL, *lastrowColIds = NULL, *lastrowSlotIds = NULL;
1,144✔
1822
  lastColIds = taosMemoryMalloc(num_keys * sizeof(int16_t));
1,144!
1823
  lastSlotIds = taosMemoryMalloc(num_keys * sizeof(int16_t));
1,145!
1824
  lastrowColIds = taosMemoryMalloc(num_keys * sizeof(int16_t));
1,145!
1825
  lastrowSlotIds = taosMemoryMalloc(num_keys * sizeof(int16_t));
1,145!
1826
  SArray *lastTmpColArray = NULL, *lastTmpIndexArray = NULL, *lastrowTmpColArray = NULL, *lastrowTmpIndexArray = NULL;
1,142✔
1827

1828
  int lastIndex = 0;
1,142✔
1829
  int lastrowIndex = 0;
1,142✔
1830

1831
  if (!slotIds || !lastColIds || !lastSlotIds || !lastrowColIds || !lastrowSlotIds) {
1,142!
1832
    TAOS_CHECK_EXIT(terrno);
×
1833
  }
1834

1835
  for (int i = 0; i < num_keys; ++i) {
3,536✔
1836
    SIdxKey *idxKey = taosArrayGet(remainCols, i);
2,388✔
1837
    if (extraTS && !i) {
2,387✔
1838
      slotIds[i] = 0;
537✔
1839
    } else {
1840
      slotIds[i] = pr->pSlotIds[idxKey->idx];
1,850✔
1841
    }
1842

1843
    if (IS_LAST_KEY(idxKey->key)) {
2,387✔
1844
      if (NULL == lastTmpIndexArray) {
1,145✔
1845
        lastTmpIndexArray = taosArrayInit(num_keys, sizeof(int32_t));
547✔
1846
        if (!lastTmpIndexArray) {
547!
1847
          TAOS_CHECK_EXIT(terrno);
×
1848
        }
1849
      }
1850
      if (!taosArrayPush(lastTmpIndexArray, &(i))) {
1,148!
1851
        TAOS_CHECK_EXIT(terrno);
×
1852
      }
1853
      lastColIds[lastIndex] = idxKey->key.cid;
1,148✔
1854
      if (extraTS && !i) {
1,148✔
1855
        lastSlotIds[lastIndex] = 0;
538✔
1856
      } else {
1857
        lastSlotIds[lastIndex] = pr->pSlotIds[idxKey->idx];
610✔
1858
      }
1859
      lastIndex++;
1,148✔
1860
    } else {
1861
      if (NULL == lastrowTmpIndexArray) {
1,242✔
1862
        lastrowTmpIndexArray = taosArrayInit(num_keys, sizeof(int32_t));
597✔
1863
        if (!lastrowTmpIndexArray) {
597!
1864
          TAOS_CHECK_EXIT(terrno);
×
1865
        }
1866
      }
1867
      if (!taosArrayPush(lastrowTmpIndexArray, &(i))) {
1,245!
1868
        TAOS_CHECK_EXIT(terrno);
×
1869
      }
1870
      lastrowColIds[lastrowIndex] = idxKey->key.cid;
1,245✔
1871
      if (extraTS && !i) {
1,245!
1872
        lastrowSlotIds[lastrowIndex] = 0;
×
1873
      } else {
1874
        lastrowSlotIds[lastrowIndex] = pr->pSlotIds[idxKey->idx];
1,245✔
1875
      }
1876
      lastrowIndex++;
1,245✔
1877
    }
1878
  }
1879

1880
  pTmpColArray = taosArrayInit(lastIndex + lastrowIndex, sizeof(SLastCol));
1,148✔
1881
  if (!pTmpColArray) {
1,143!
1882
    TAOS_CHECK_EXIT(terrno);
×
1883
  }
1884

1885
  if (lastTmpIndexArray != NULL) {
1,143✔
1886
    TAOS_CHECK_EXIT(mergeLastCid(uid, pTsdb, &lastTmpColArray, pr, lastColIds, lastIndex, lastSlotIds));
547!
1887
    for (int i = 0; i < taosArrayGetSize(lastTmpColArray); i++) {
1,676✔
1888
      if (!taosArrayInsert(pTmpColArray, *(int32_t *)taosArrayGet(lastTmpIndexArray, i),
1,127!
1889
                           taosArrayGet(lastTmpColArray, i))) {
1,127✔
1890
        TAOS_CHECK_EXIT(terrno);
×
1891
      }
1892
    }
1893
  }
1894

1895
  if (lastrowTmpIndexArray != NULL) {
1,144✔
1896
    TAOS_CHECK_EXIT(mergeLastRowCid(uid, pTsdb, &lastrowTmpColArray, pr, lastrowColIds, lastrowIndex, lastrowSlotIds));
596!
1897
    for (int i = 0; i < taosArrayGetSize(lastrowTmpColArray); i++) {
1,820✔
1898
      if (!taosArrayInsert(pTmpColArray, *(int32_t *)taosArrayGet(lastrowTmpIndexArray, i),
1,223!
1899
                           taosArrayGet(lastrowTmpColArray, i))) {
1,222✔
1900
        TAOS_CHECK_EXIT(terrno);
×
1901
      }
1902
    }
1903
  }
1904

1905
  SLRUCache *pCache = pTsdb->lruCache;
1,145✔
1906
  for (int i = 0; i < num_keys; ++i) {
3,538✔
1907
    SIdxKey  *idxKey = taosArrayGet(remainCols, i);
2,393✔
1908
    SLastCol *pLastCol = NULL;
2,392✔
1909

1910
    if (pTmpColArray && TARRAY_SIZE(pTmpColArray) >= i + 1) {
2,392!
1911
      pLastCol = taosArrayGet(pTmpColArray, i);
2,351✔
1912
    }
1913

1914
    // still null, then make up a none col value
1915
    SLastCol noneCol = {.rowKey.ts = TSKEY_MIN,
2,392✔
1916
                        .colVal = COL_VAL_NONE(idxKey->key.cid, pr->pSchema->columns[slotIds[i]].type),
2,392✔
1917
                        .cacheStatus = TSDB_LAST_CACHE_VALID};
1918
    if (!pLastCol) {
2,392✔
1919
      pLastCol = &noneCol;
40✔
1920
    }
1921

1922
    if (!extraTS || i > 0) {
2,392✔
1923
      taosArraySet(pLastArray, idxKey->idx, pLastCol);
1,854✔
1924
    }
1925
    // taosArrayRemove(remainCols, i);
1926

1927
    if (/*!pTmpColArray*/ lastTmpIndexArray && !lastTmpColArray) {
2,393!
1928
      continue;
×
1929
    }
1930
    if (/*!pTmpColArray*/ lastrowTmpIndexArray && !lastrowTmpColArray) {
2,393!
1931
      continue;
×
1932
    }
1933

1934
    // store result back to rocks cache
1935
    code = tsdbCachePutToRocksdb(pTsdb, &idxKey->key, pLastCol);
2,393✔
1936
    if (code) {
2,389!
1937
      tsdbError("vgId:%d, %s failed at line %d since %s.", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
×
1938
      TAOS_CHECK_EXIT(code);
×
1939
    }
1940

1941
    code = tsdbCachePutToLRU(pTsdb, &idxKey->key, pLastCol, 0);
2,389✔
1942
    if (code) {
2,393!
1943
      tsdbError("vgId:%d, %s failed at line %d since %s.", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
×
1944
      TAOS_CHECK_EXIT(code);
×
1945
    }
1946
  }
1947

1948
  rocksMayWrite(pTsdb, false);
1,145✔
1949

1950
_exit:
1,145✔
1951
  taosArrayDestroy(lastrowTmpIndexArray);
1,145✔
1952
  taosArrayDestroy(lastrowTmpColArray);
1,145✔
1953
  taosArrayDestroy(lastTmpIndexArray);
1,145✔
1954
  taosArrayDestroy(lastTmpColArray);
1,145✔
1955

1956
  taosMemoryFree(lastColIds);
1,145!
1957
  taosMemoryFree(lastSlotIds);
1,145!
1958
  taosMemoryFree(lastrowColIds);
1,145!
1959
  taosMemoryFree(lastrowSlotIds);
1,145!
1960

1961
  taosArrayDestroy(pTmpColArray);
1,145✔
1962

1963
  taosMemoryFree(slotIds);
1,145!
1964

1965
  TAOS_RETURN(code);
1,145✔
1966
}
1967

1968
/*
 * Try to resolve the columns in remainCols from the rocksdb cache.
 *
 * All remaining keys are fetched in one batch. A value that deserializes to a
 * column whose status is not TSDB_LAST_CACHE_NO_CACHE is put into the LRU cache,
 * copied into pLastArray at the key's original index, and removed from
 * remainCols / ignoreFromRocks. Entries flagged in ignoreFromRocks are skipped
 * without looking at the fetched value. Whatever is still left afterwards is
 * loaded through tsdbCacheLoadFromRaw().
 *
 * remainCols and ignoreFromRocks are parallel arrays and shrink together, so
 * both are addressed with the running index j; values_list keeps the original
 * batch order and is addressed with i.
 *
 * @return 0 on success, otherwise the error code of the failing step
 */
static int32_t tsdbCacheLoadFromRocks(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SArray *remainCols,
                                      SArray *ignoreFromRocks, SCacheRowsReader *pr, int8_t ltype) {
  int32_t code = 0, lino = 0;
  int     num_keys = TARRAY_SIZE(remainCols);
  char  **keys_list = taosMemoryMalloc(num_keys * sizeof(char *));
  size_t *keys_list_sizes = taosMemoryMalloc(num_keys * sizeof(size_t));
  char   *key_list = taosMemoryMalloc(num_keys * ROCKS_KEY_LEN);
  if (!keys_list || !keys_list_sizes || !key_list) {
    // release whichever of the three allocations succeeded (key_list used to leak here)
    taosMemoryFree(key_list);
    taosMemoryFree(keys_list);
    taosMemoryFree(keys_list_sizes);
    TAOS_RETURN(terrno);
  }
  char  **values_list = NULL;
  size_t *values_list_sizes = NULL;
  for (int i = 0; i < num_keys; ++i) {
    memcpy(key_list + i * ROCKS_KEY_LEN, &((SIdxKey *)taosArrayGet(remainCols, i))->key, ROCKS_KEY_LEN);
    keys_list[i] = key_list + i * ROCKS_KEY_LEN;
    keys_list_sizes[i] = ROCKS_KEY_LEN;
  }

  rocksMayWrite(pTsdb, true);  // flush writebatch cache

  code = tsdbCacheGetValuesFromRocks(pTsdb, num_keys, (const char *const *)keys_list, keys_list_sizes, &values_list,
                                     &values_list_sizes);
  if (code) {
    taosMemoryFree(key_list);
    taosMemoryFree(keys_list);
    taosMemoryFree(keys_list_sizes);
    TAOS_RETURN(code);
  }

  // i walks the fetched values (fixed order); j walks the shrinking remainCols / ignoreFromRocks
  for (int i = 0, j = 0; i < num_keys && j < TARRAY_SIZE(remainCols); ++i) {
    SLastCol *pLastCol = NULL;
    // ignoreFromRocks loses elements at j together with remainCols, so it must be read at j, not i
    bool      ignore = ((bool *)TARRAY_DATA(ignoreFromRocks))[j];
    if (ignore) {
      ++j;
      continue;
    }

    if (values_list[i] != NULL) {
      code = tsdbCacheDeserialize(values_list[i], values_list_sizes[i], &pLastCol);
      if (code != TSDB_CODE_SUCCESS) {
        tsdbError("vgId:%d, %s deserialize failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
                  tstrerror(code));
        goto _exit;
      }
    }
    SLastCol *pToFree = pLastCol;
    SIdxKey  *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[j];
    if (pLastCol && pLastCol->cacheStatus != TSDB_LAST_CACHE_NO_CACHE) {
      code = tsdbCachePutToLRU(pTsdb, &idxKey->key, pLastCol, 0);
      if (code) {
        tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
        taosMemoryFreeClear(pToFree);
        TAOS_CHECK_EXIT(code);
      }

      // give the caller its own copy before the deserialized column is freed
      SLastCol lastCol = *pLastCol;
      code = tsdbCacheReallocSLastCol(&lastCol, NULL);
      if (TSDB_CODE_SUCCESS != code) {
        taosMemoryFreeClear(pToFree);
        TAOS_CHECK_EXIT(code);
      }

      taosArraySet(pLastArray, idxKey->idx, &lastCol);
      taosArrayRemove(remainCols, j);
      taosArrayRemove(ignoreFromRocks, j);
    } else {
      ++j;
    }

    taosMemoryFreeClear(pToFree);
  }

  if (TARRAY_SIZE(remainCols) > 0) {
    // tsdbTrace("tsdb/cache: vgId: %d, load %" PRId64 " from raw", TD_VID(pTsdb->pVnode), uid);
    code = tsdbCacheLoadFromRaw(pTsdb, uid, pLastArray, remainCols, pr, ltype);
  }

_exit:
  taosMemoryFree(key_list);
  taosMemoryFree(keys_list);
  taosMemoryFree(keys_list_sizes);
  if (values_list) {
#ifdef USE_ROCKSDB
    for (int i = 0; i < num_keys; ++i) {
      rocksdb_free(values_list[i]);
    }
#endif
    taosMemoryFree(values_list);
  }
  taosMemoryFree(values_list_sizes);

  TAOS_RETURN(code);
}
2064

2065
static int32_t tsdbCacheGetBatchFromLru(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr,
20,351✔
2066
                                        int8_t ltype, SArray *keyArray) {
2067
  int32_t    code = 0, lino = 0;
20,351✔
2068
  SArray    *remainCols = NULL;
20,351✔
2069
  SArray    *ignoreFromRocks = NULL;
20,351✔
2070
  SLRUCache *pCache = pTsdb->lruCache;
20,351✔
2071
  SArray    *pCidList = pr->pCidList;
20,351✔
2072
  int        numKeys = TARRAY_SIZE(pCidList);
20,351✔
2073

2074
  for (int i = 0; i < numKeys; ++i) {
55,449✔
2075
    int16_t cid = ((int16_t *)TARRAY_DATA(pCidList))[i];
35,081✔
2076

2077
    SLastKey key = {.lflag = ltype, .uid = uid, .cid = cid};
35,081✔
2078
    // for select last_row, last case
2079
    int32_t funcType = FUNCTION_TYPE_CACHE_LAST;
35,081✔
2080
    if (pr->pFuncTypeList != NULL && taosArrayGetSize(pr->pFuncTypeList) > i) {
35,081✔
2081
      funcType = ((int32_t *)TARRAY_DATA(pr->pFuncTypeList))[i];
2,728✔
2082
    }
2083
    if (((pr->type & CACHESCAN_RETRIEVE_LAST) == CACHESCAN_RETRIEVE_LAST) && FUNCTION_TYPE_CACHE_LAST_ROW == funcType) {
35,079✔
2084
      int8_t tempType = CACHESCAN_RETRIEVE_LAST_ROW | (pr->type ^ CACHESCAN_RETRIEVE_LAST);
1,428✔
2085
      key.lflag = (tempType & CACHESCAN_RETRIEVE_LAST) >> 3;
1,428✔
2086
    }
2087

2088
    if (!taosArrayPush(keyArray, &key)) {
35,076!
2089
      TAOS_CHECK_EXIT(terrno);
×
2090
    }
2091

2092
    LRUHandle *h = taosLRUCacheLookup(pCache, &key, ROCKS_KEY_LEN);
35,076✔
2093
    SLastCol  *pLastCol = h ? (SLastCol *)taosLRUCacheValue(pCache, h) : NULL;
35,101✔
2094
    if (h && pLastCol->cacheStatus != TSDB_LAST_CACHE_NO_CACHE) {
68,245!
2095
      SLastCol lastCol = *pLastCol;
33,170✔
2096
      if (TSDB_CODE_SUCCESS != (code = tsdbCacheReallocSLastCol(&lastCol, NULL))) {
33,170!
2097
        tsdbLRUCacheRelease(pCache, h, false);
×
2098
        TAOS_CHECK_GOTO(code, NULL, _exit);
×
2099
      }
2100

2101
      if (taosArrayPush(pLastArray, &lastCol) == NULL) {
33,146!
2102
        code = terrno;
×
2103
        tsdbLRUCacheRelease(pCache, h, false);
×
2104
        goto _exit;
×
2105
      }
2106
    } else {
2107
      // no cache or cache is invalid
2108
      SLastCol noneCol = {.rowKey.ts = TSKEY_MIN,
1,929✔
2109
                          .colVal = COL_VAL_NONE(cid, pr->pSchema->columns[pr->pSlotIds[i]].type)};
1,929✔
2110

2111
      if (taosArrayPush(pLastArray, &noneCol) == NULL) {
1,930!
2112
        code = terrno;
×
2113
        tsdbLRUCacheRelease(pCache, h, false);
×
2114
        goto _exit;
×
2115
      }
2116

2117
      if (!remainCols) {
1,930✔
2118
        if ((remainCols = taosArrayInit(numKeys, sizeof(SIdxKey))) == NULL) {
1,184!
2119
          code = terrno;
×
2120
          tsdbLRUCacheRelease(pCache, h, false);
×
2121
          goto _exit;
×
2122
        }
2123
      }
2124
      if (!ignoreFromRocks) {
1,928✔
2125
        if ((ignoreFromRocks = taosArrayInit(numKeys, sizeof(bool))) == NULL) {
1,182!
2126
          code = terrno;
×
2127
          tsdbLRUCacheRelease(pCache, h, false);
×
2128
          goto _exit;
×
2129
        }
2130
      }
2131
      if (taosArrayPush(remainCols, &(SIdxKey){i, key}) == NULL) {
3,859!
2132
        code = terrno;
×
2133
        tsdbLRUCacheRelease(pCache, h, false);
×
2134
        goto _exit;
×
2135
      }
2136
      bool ignoreRocks = pLastCol ? (pLastCol->cacheStatus == TSDB_LAST_CACHE_NO_CACHE) : false;
1,930!
2137
      if (taosArrayPush(ignoreFromRocks, &ignoreRocks) == NULL) {
1,930!
2138
        code = terrno;
×
2139
        tsdbLRUCacheRelease(pCache, h, false);
×
2140
        goto _exit;
×
2141
      }
2142
    }
2143

2144
    if (h) {
35,076✔
2145
      tsdbLRUCacheRelease(pCache, h, false);
33,141✔
2146
    }
2147
  }
2148

2149
  if (remainCols && TARRAY_SIZE(remainCols) > 0) {
20,368!
2150
    (void)taosThreadMutexLock(&pTsdb->lruMutex);
1,184✔
2151

2152
    for (int i = 0; i < TARRAY_SIZE(remainCols);) {
3,113✔
2153
      SIdxKey   *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[i];
1,930✔
2154
      LRUHandle *h = taosLRUCacheLookup(pCache, &idxKey->key, ROCKS_KEY_LEN);
1,930✔
2155
      SLastCol  *pLastCol = h ? (SLastCol *)taosLRUCacheValue(pCache, h) : NULL;
1,930✔
2156
      if (h && pLastCol->cacheStatus != TSDB_LAST_CACHE_NO_CACHE) {
1,929!
2157
        SLastCol lastCol = *pLastCol;
×
2158
        code = tsdbCacheReallocSLastCol(&lastCol, NULL);
×
2159
        if (code) {
×
2160
          tsdbLRUCacheRelease(pCache, h, false);
×
2161
          (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
×
2162
          TAOS_RETURN(code);
×
2163
        }
2164

2165
        taosArraySet(pLastArray, idxKey->idx, &lastCol);
×
2166

2167
        taosArrayRemove(remainCols, i);
×
2168
        taosArrayRemove(ignoreFromRocks, i);
×
2169
      } else {
2170
        // no cache or cache is invalid
2171
        ++i;
1,929✔
2172
      }
2173
      if (h) {
1,929✔
2174
        tsdbLRUCacheRelease(pCache, h, false);
1✔
2175
      }
2176
    }
2177

2178
    // tsdbTrace("tsdb/cache: vgId: %d, load %" PRId64 " from rocks", TD_VID(pTsdb->pVnode), uid);
2179
    code = tsdbCacheLoadFromRocks(pTsdb, uid, pLastArray, remainCols, ignoreFromRocks, pr, ltype);
1,183✔
2180

2181
    (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
1,184✔
2182
  }
2183

2184
_exit:
19,184✔
2185
  if (remainCols) {
20,358✔
2186
    taosArrayDestroy(remainCols);
1,184✔
2187
  }
2188
  if (ignoreFromRocks) {
20,358✔
2189
    taosArrayDestroy(ignoreFromRocks);
1,184✔
2190
  }
2191

2192
  TAOS_RETURN(code);
20,358✔
2193
}
2194

2195
/* Progress of a memtable row iterator (see SMemNextRowIter.state). */
typedef enum SMEMNEXTROWSTATES {
  SMEMNEXTROW_ENTER = 0, /* iterator not opened yet; the next fetch opens it */
  SMEMNEXTROW_NEXT = 1,  /* iterator opened; the next fetch advances it */
} SMEMNEXTROWSTATES;
2199

2200
// Cursor over the rows of one table inside a memtable, consumed by getNextRowFromMem().
typedef struct SMemNextRowIter {
2201
  SMEMNEXTROWSTATES state;  // SMEMNEXTROW_ENTER until the first row has been fetched, SMEMNEXTROW_NEXT afterwards
2202
  STbData          *pMem;  // [input] table data to iterate; getNextRowFromMem yields no row when this is NULL
2203
  STbDataIter       iter;  // mem buffer skip list iterator
2204
  int64_t           lastTs;  // NOTE(review): only referenced by commented-out code in getNextRowFromMem; confirm it is still needed
2205
} SMemNextRowIter;
2206

2207
/*
 * _next_row_fn_t implementation for a memtable cursor (SMemNextRowIter).
 *
 * The first call opens the table data iterator and yields its current row;
 * later calls advance the iterator first. *ppRow is set to NULL when there is
 * nothing (more) to return. *pIgnoreEarlierTs is always cleared. isLast, aCols
 * and nCols are not used by this source. The returned code is always 0.
 */
static int32_t getNextRowFromMem(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast, int16_t *aCols,
                                 int nCols) {
  SMemNextRowIter *pCursor = (SMemNextRowIter *)iter;
  int32_t          code = 0;
  TSDBROW         *pFound = NULL;

  *pIgnoreEarlierTs = false;

  if (SMEMNEXTROW_ENTER == pCursor->state) {
    if (pCursor->pMem != NULL) {
      tsdbTbDataIterOpen(pCursor->pMem, NULL, 1, &pCursor->iter);

      pFound = tsdbTbDataIterGet(&pCursor->iter);
      if (pFound) {
        // only switch to the "advance" state once a first row has actually been produced
        pCursor->state = SMEMNEXTROW_NEXT;
      }
    }
  } else if (SMEMNEXTROW_NEXT == pCursor->state) {
    if (tsdbTbDataIterNext(&pCursor->iter)) {
      pFound = tsdbTbDataIterGet(&pCursor->iter);
    }
  }

  *ppRow = pFound;

  TAOS_RETURN(code);
}
2257

2258
// Row source callback: stores the next row in *ppRow (NULL when the source has no more rows)
// and returns an error code. getNextRowFromMem is the implementation visible in this file section.
typedef int32_t (*_next_row_fn_t)(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast, int16_t *aCols,
2259
                                  int nCols);
2260
// Optional cleanup callback for a row source; memRowIterClose invokes it with the source's iter when it is non-NULL.
typedef int32_t (*_next_row_clear_fn_t)(void *iter);
2261

2262
// State of one row source taking part in a merge (see MemNextRowIter.input).
// memRowIterOpen initialises this struct positionally, so the field order must not change.
typedef struct {
2263
  TSDBROW             *pRow;  // row most recently produced by nextRowFn
2264
  bool                 stop;  // true when the source is absent or exhausted
2265
  bool                 next;  // true when nextRowFn must be called before pRow is used again
2266
  bool                 ignoreEarlierTs;  // out-flag written by nextRowFn
2267
  void                *iter;  // opaque cursor handed to nextRowFn / nextRowClearFn
2268
  _next_row_fn_t       nextRowFn;  // fetches the next row of this source
2269
  _next_row_clear_fn_t nextRowClearFn;  // optional cleanup; may be NULL
2270
} TsdbNextRowState;
2271

2272
// Merge iterator over the rows of one table held in the mem and imem memtables.
// Set up by memRowIterOpen and torn down by memRowIterClose.
typedef struct {
2273
  SArray           *pMemDelData;  // filled by loadMemTombData in memRowIterOpen; destroyed in memRowIterClose
2274
  SArray           *pSkyline;  // destroyed in memRowIterClose when non-NULL; not assigned by memRowIterOpen
2275
  int64_t           iSkyline;  // NOTE(review): presumably the current position in pSkyline; confirm against its users
2276
  SBlockIdx         idx;  // {suid, uid} of the table being iterated
2277
  SMemNextRowIter   memState;  // cursor over the mem table data (input[0])
2278
  SMemNextRowIter   imemState;  // cursor over the imem table data (input[1])
2279
  TSDBROW           memRow, imemRow;  // initial targets of input[0].pRow / input[1].pRow
2280
  TsdbNextRowState  input[2];  // [0] = mem source, [1] = imem source
2281
  SCacheRowsReader *pr;  // reader this iterator was opened for
2282
  STsdb            *pTsdb;  // owning tsdb instance
2283
} MemNextRowIter;
2284

2285
static int32_t memRowIterOpen(MemNextRowIter *pIter, tb_uid_t uid, STsdb *pTsdb, STSchema *pTSchema, tb_uid_t suid,
20,355✔
2286
                              STsdbReadSnap *pReadSnap, SCacheRowsReader *pr) {
2287
  int32_t code = 0, lino = 0;
20,355✔
2288

2289
  STbData *pMem = NULL;
20,355✔
2290
  if (pReadSnap->pMem) {
20,355!
2291
    pMem = tsdbGetTbDataFromMemTable(pReadSnap->pMem, suid, uid);
20,357✔
2292
  }
2293

2294
  STbData *pIMem = NULL;
20,361✔
2295
  if (pReadSnap->pIMem) {
20,361✔
2296
    pIMem = tsdbGetTbDataFromMemTable(pReadSnap->pIMem, suid, uid);
15✔
2297
  }
2298

2299
  pIter->pTsdb = pTsdb;
20,361✔
2300

2301
  pIter->pMemDelData = NULL;
20,361✔
2302

2303
  TAOS_CHECK_GOTO(loadMemTombData(&pIter->pMemDelData, pMem, pIMem, pr->info.verRange.maxVer), &lino, _exit);
20,361!
2304

2305
  pIter->idx = (SBlockIdx){.suid = suid, .uid = uid};
20,358✔
2306

2307
  pIter->input[0] = (TsdbNextRowState){&pIter->memRow, true, false, false, &pIter->memState, getNextRowFromMem, NULL};
20,358✔
2308
  pIter->input[1] = (TsdbNextRowState){&pIter->imemRow, true, false, false, &pIter->imemState, getNextRowFromMem, NULL};
20,358✔
2309

2310
  if (pMem) {
20,358✔
2311
    pIter->memState.pMem = pMem;
11,582✔
2312
    pIter->memState.state = SMEMNEXTROW_ENTER;
11,582✔
2313
    pIter->input[0].stop = false;
11,582✔
2314
    pIter->input[0].next = true;
11,582✔
2315
  }
2316

2317
  if (pIMem) {
20,358✔
2318
    pIter->imemState.pMem = pIMem;
15✔
2319
    pIter->imemState.state = SMEMNEXTROW_ENTER;
15✔
2320
    pIter->input[1].stop = false;
15✔
2321
    pIter->input[1].next = true;
15✔
2322
  }
2323

2324
  pIter->pr = pr;
20,358✔
2325

2326
_exit:
20,358✔
2327
  if (code) {
20,358!
2328
    tsdbError("tsdb/cache: %s failed at line %d since %s.", __func__, lino, tstrerror(code));
×
2329
  }
2330

2331
  TAOS_RETURN(code);
20,358✔
2332
}
2333

2334
/*
 * Release what a MemNextRowIter owns: run each input's optional clear callback,
 * then destroy the skyline and tombstone arrays when they exist.
 */
static void memRowIterClose(MemNextRowIter *pIter) {
  TsdbNextRowState *pInput = pIter->input;
  TsdbNextRowState *pEnd = pIter->input + 2;

  for (; pInput < pEnd; ++pInput) {
    if (NULL == pInput->nextRowClearFn) {
      continue;
    }
    (void)pInput->nextRowClearFn(pInput->iter);
  }

  if (pIter->pSkyline != NULL) {
    taosArrayDestroy(pIter->pSkyline);
  }

  if (pIter->pMemDelData != NULL) {
    taosArrayDestroy(pIter->pMemDelData);
  }
}
20,335✔
2349

2350
/*
 * Free callback installed on the reader's table map by getTableLoadInfo():
 * param addresses a stored pointer, which is freed and reset to NULL.
 */
static void freeTableInfoFunc(void *param) {
  void **ppStored = param;
  taosMemoryFreeClear(*ppStored);
}
12,150✔
2354

2355
/*
 * Return the STableLoadInfo registered for uid in the reader's table map,
 * creating the map and a zero-initialised entry on first use.
 *
 * The map stores pointers and owns them through freeTableInfoFunc.
 *
 * @return the entry, or NULL if the map or the entry could not be allocated or registered
 */
static STableLoadInfo *getTableLoadInfo(SCacheRowsReader *pReader, uint64_t uid) {
  if (!pReader->pTableMap) {
    pReader->pTableMap = tSimpleHashInit(pReader->numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
    if (!pReader->pTableMap) {
      return NULL;
    }

    tSimpleHashSetFreeFp(pReader->pTableMap, freeTableInfoFunc);
  }

  STableLoadInfo **ppInfo = tSimpleHashGet(pReader->pTableMap, &uid, sizeof(uid));
  if (ppInfo) {
    return *ppInfo;
  }

  STableLoadInfo *pInfo = taosMemoryCalloc(1, sizeof(STableLoadInfo));
  if (pInfo && tSimpleHashPut(pReader->pTableMap, &uid, sizeof(uint64_t), &pInfo, POINTER_BYTES)) {
    // registration failed, so the map does not own the entry: free it here instead of leaking it
    taosMemoryFree(pInfo);
    return NULL;
  }

  return pInfo;
}
2380

2381
/*
 * Produce the next row that is not covered by a delete, merged from the iterator's two inputs.
 *
 * Every pass (1) advances each input flagged `next` that has not reached `stop`, (2) selects the head row(s)
 * with the largest row key (equal keys keep both inputs as candidates), and (3) tests the candidates against
 * the delete skyline. Deleted candidates have their input flagged to advance again; the first surviving
 * candidate is returned with its input flagged to advance on the following call.
 *
 * The skyline is built on first use from the table's tomb data (fetched through getTableLoadInfo) plus
 * pIter->pMemDelData.
 *
 * Returns NULL when both inputs are exhausted, and also when an internal step fails; failures are only
 * logged, so callers cannot tell the two cases apart.
 */
static TSDBROW *memRowIterGet(MemNextRowIter *pIter, bool isLast, int16_t *aCols, int nCols) {
  int32_t code = 0, lino = 0;

  while (true) {
    // 1. refill the head row of every input that was consumed or rejected on the previous pass
    for (int src = 0; src < 2; ++src) {
      if (pIter->input[src].stop || !pIter->input[src].next) {
        continue;
      }

      TAOS_CHECK_GOTO(pIter->input[src].nextRowFn(pIter->input[src].iter, &pIter->input[src].pRow,
                                                  &pIter->input[src].ignoreEarlierTs, isLast, aCols, nCols),
                      &lino, _exit);

      if (pIter->input[src].pRow == NULL) {
        pIter->input[src].stop = true;
        pIter->input[src].next = false;
      }
    }

    if (pIter->input[0].stop && pIter->input[1].stop) {
      return NULL;
    }

    // 2. gather the head row(s) carrying the largest key
    TSDBROW *cand[2] = {0};
    int      candSrc[2] = {-1, -1};
    int      nCand = 0;
    SRowKey  topKey = {.ts = TSKEY_MIN};

    for (int src = 0; src < 2; ++src) {
      if (pIter->input[src].stop || pIter->input[src].pRow == NULL) {
        continue;
      }

      STsdbRowKey headKey = {0};
      tsdbRowGetKey(pIter->input[src].pRow, &headKey);

      int cmp = tRowKeyCompare(&topKey, &headKey.key);
      if (cmp < 0) {
        // strictly larger key: discard the candidates collected so far
        nCand = 0;
        topKey = headKey.key;
      }
      if (cmp <= 0) {
        candSrc[nCand] = src;
        cand[nCand++] = pIter->input[src].pRow;
      }

      pIter->input[src].next = false;
    }

    // 3. drop candidates hidden by the delete skyline
    TSDBROW *pLive = NULL;
    int      liveSrc = -1;
    for (int k = 0; k < nCand; ++k) {
      TSDBKEY candKey = TSDBROW_KEY(cand[k]);

      if (!pIter->pSkyline) {
        pIter->pSkyline = taosArrayInit(32, sizeof(TSDBKEY));
        TSDB_CHECK_NULL(pIter->pSkyline, code, lino, _exit, terrno);

        uint64_t        uid = pIter->idx.uid;
        STableLoadInfo *pInfo = getTableLoadInfo(pIter->pr, uid);
        TSDB_CHECK_NULL(pInfo, code, lino, _exit, TSDB_CODE_OUT_OF_MEMORY);

        if (pInfo->pTombData == NULL) {
          pInfo->pTombData = taosArrayInit(4, sizeof(SDelData));
          TSDB_CHECK_NULL(pInfo->pTombData, code, lino, _exit, terrno);
        }

        if (!taosArrayAddAll(pInfo->pTombData, pIter->pMemDelData)) {
          TAOS_CHECK_GOTO(terrno, &lino, _exit);
        }

        size_t nTomb = TARRAY_SIZE(pInfo->pTombData);
        if (nTomb > 0) {
          code = tsdbBuildDeleteSkyline(pInfo->pTombData, 0, (int32_t)(nTomb - 1), pIter->pSkyline);
          TAOS_CHECK_GOTO(code, &lino, _exit);
        }
        pIter->iSkyline = taosArrayGetSize(pIter->pSkyline) - 1;
      }

      bool deleted = tsdbKeyDeleted(&candKey, pIter->pSkyline, &pIter->iSkyline);
      if (!deleted && pLive == NULL) {
        pLive = cand[k];
        liveSrc = candSrc[k];
      }

      // a deleted head must be replaced on the next pass; a surviving one stays put for now
      pIter->input[candSrc[k]].next = deleted;
    }

    if (pLive != NULL) {
      pIter->input[liveSrc].next = true;

      return pLive;
    }
  }

_exit:
  if (code) {
    tsdbError("tsdb/cache: %s failed at line %d since %s.", __func__, lino, tstrerror(code));
  }

  return NULL;
}
2481

2482
/*
 * Duplicate a schema (header plus its numOfCols column descriptors) into a fresh heap block.
 *
 * On success *ppDst owns the copy. On allocation failure *ppDst is NULL and terrno is returned.
 */
static int32_t cloneTSchema(STSchema *pSrc, STSchema **ppDst) {
  int32_t nBytes = sizeof(STSchema) + sizeof(STColumn) * pSrc->numOfCols;

  STSchema *pCopy = taosMemoryMalloc(nBytes);
  *ppDst = pCopy;
  if (pCopy == NULL) {
    TAOS_RETURN(terrno);
  }

  memcpy(pCopy, pSrc, nBytes);

  TAOS_RETURN(TSDB_CODE_SUCCESS);
}
2492

2493
/*
 * Make pReader->pCurrSchema describe schema version `sversion`.
 *
 *  - no current schema yet and the reader's base schema has that version: clone the base schema;
 *  - current schema already has that version: nothing to do;
 *  - otherwise: drop the current schema and fetch the requested version from the meta store.
 */
static int32_t updateTSchema(int32_t sversion, SCacheRowsReader *pReader, uint64_t uid) {
  if (pReader->pCurrSchema == NULL) {
    if (pReader->pSchema->version == sversion) {
      TAOS_RETURN(cloneTSchema(pReader->pSchema, &pReader->pCurrSchema));
    }
  } else if (pReader->pCurrSchema->version == sversion) {
    TAOS_RETURN(TSDB_CODE_SUCCESS);
  }

  taosMemoryFreeClear(pReader->pCurrSchema);
  TAOS_RETURN(
      metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->info.suid, uid, sversion, &pReader->pCurrSchema));
}
2506

2507
/*
 * Refresh the entries of pLastArray with rows obtained from a MemNextRowIter opened on table `uid`.
 *
 * pLastArray holds SLastCol items and keyArray the matching SLastKey items at the same index.
 * NOTE(review): the column walk assumes both the row's columns and pLastArray are ordered by ascending
 * column id - confirm against the callers.
 *
 * Pass 1 uses the first row returned by the iterator:
 *   - keys for which IS_LAST_KEY is false take the row's value when the row key is newer, or equal and the
 *     column is not NONE;
 *   - keys for which IS_LAST_KEY is true take the value only when the column holds a real value and the row
 *     key is not older; columns without a value are remembered in a cid -> index hash.
 * Pass 2 walks further rows until every remembered column has found a value or the iterator is exhausted.
 *
 * Returns 0 on success (including "no row available") or the first error code encountered.
 */
static int32_t tsdbCacheGetBatchFromMem(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr,
                                        SArray *keyArray) {
  int32_t        code = 0;
  int32_t        lino = 0;
  STSchema      *pTSchema = pr->pSchema;
  MemNextRowIter iter = {0};
  SSHashObj     *iColHash = NULL;  // cid -> index in pLastArray for "last" columns still missing a value
  STSDBRowIter   rowIter = {0};

  // 1, get from mem, imem filtered with delete info
  TAOS_CHECK_EXIT(memRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->info.suid, pr->pReadSnap, pr));

  TSDBROW *pRow = memRowIterGet(&iter, false, NULL, 0);
  if (!pRow) {
    goto _exit;
  }

  int32_t sversion = TSDBROW_SVERSION(pRow);
  if (sversion != -1) {
    TAOS_CHECK_EXIT(updateTSchema(sversion, pr, uid));

    pTSchema = pr->pCurrSchema;
  }
  int32_t nCol = pTSchema->numOfCols;

  STsdbRowKey rowKey = {0};
  tsdbRowGetKey(pRow, &rowKey);

  TAOS_CHECK_EXIT(tsdbRowIterOpen(&rowIter, pRow, pTSchema));

  // pass 1: merge-walk the row's columns (iCol) against the requested columns (jCol)
  int32_t iCol = 0, jCol = 0, jnCol = TARRAY_SIZE(pLastArray);
  for (SColVal *pColVal = tsdbRowIterNext(&rowIter); pColVal && iCol < nCol && jCol < jnCol;) {
    SLastCol *pTargetCol = &((SLastCol *)TARRAY_DATA(pLastArray))[jCol];
    if (pColVal->cid < pTargetCol->colVal.cid) {
      pColVal = tsdbRowIterNext(&rowIter), ++iCol;

      continue;
    }
    if (pColVal->cid > pTargetCol->colVal.cid) {
      break;
    }

    int32_t cmp_res = tRowKeyCompare(&pTargetCol->rowKey, &rowKey.key);
    if (!IS_LAST_KEY(((SLastKey *)TARRAY_DATA(keyArray))[jCol])) {
      if (cmp_res < 0 || (cmp_res == 0 && !COL_VAL_IS_NONE(pColVal))) {
        SLastCol lastCol = {.rowKey = rowKey.key, .colVal = *pColVal, .dirty = 1, .cacheStatus = TSDB_LAST_CACHE_VALID};
        TAOS_CHECK_EXIT(tsdbCacheReallocSLastCol(&lastCol, NULL));

        tsdbCacheFreeSLastColItem(pTargetCol);
        taosArraySet(pLastArray, jCol, &lastCol);
      }
    } else {
      if (COL_VAL_IS_VALUE(pColVal)) {
        if (cmp_res <= 0) {
          SLastCol lastCol = {
              .rowKey = rowKey.key, .colVal = *pColVal, .dirty = 1, .cacheStatus = TSDB_LAST_CACHE_VALID};
          TAOS_CHECK_EXIT(tsdbCacheReallocSLastCol(&lastCol, NULL));

          tsdbCacheFreeSLastColItem(pTargetCol);
          taosArraySet(pLastArray, jCol, &lastCol);
        }
      } else {
        // this row has no value for the column: remember it so pass 2 can search further rows
        if (!iColHash) {
          iColHash = tSimpleHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT));
          if (iColHash == NULL) {
            TAOS_CHECK_EXIT(terrno);
          }
        }

        if (tSimpleHashPut(iColHash, &pColVal->cid, sizeof(pColVal->cid), &jCol, sizeof(jCol))) {
          TAOS_CHECK_EXIT(terrno);
        }
      }
    }

    ++jCol;

    // only step the row cursor when the next requested column has a larger id than the current one
    if (jCol < jnCol && ((SLastCol *)TARRAY_DATA(pLastArray))[jCol].colVal.cid > pColVal->cid) {
      pColVal = tsdbRowIterNext(&rowIter), ++iCol;
    }
  }
  tsdbRowClose(&rowIter);

  // pass 2: keep reading rows until every pending column has been resolved
  if (iColHash && tSimpleHashGetSize(iColHash) > 0) {
    pRow = memRowIterGet(&iter, false, NULL, 0);
    while (pRow) {
      if (tSimpleHashGetSize(iColHash) == 0) {
        break;
      }

      sversion = TSDBROW_SVERSION(pRow);
      if (sversion != -1) {
        TAOS_CHECK_EXIT(updateTSchema(sversion, pr, uid));

        pTSchema = pr->pCurrSchema;
      }
      nCol = pTSchema->numOfCols;

      STsdbRowKey tsdbRowKey = {0};
      tsdbRowGetKey(pRow, &tsdbRowKey);

      TAOS_CHECK_EXIT(tsdbRowIterOpen(&rowIter, pRow, pTSchema));

      iCol = 0;
      for (SColVal *pColVal = tsdbRowIterNext(&rowIter); pColVal && iCol < nCol;
           pColVal = tsdbRowIterNext(&rowIter), iCol++) {
        int32_t *pjCol = tSimpleHashGet(iColHash, &pColVal->cid, sizeof(pColVal->cid));
        if (pjCol && COL_VAL_IS_VALUE(pColVal)) {
          SLastCol *pTargetCol = &((SLastCol *)TARRAY_DATA(pLastArray))[*pjCol];

          int32_t cmp_res = tRowKeyCompare(&pTargetCol->rowKey, &tsdbRowKey.key);
          if (cmp_res <= 0) {
            SLastCol lastCol = {
                .rowKey = tsdbRowKey.key, .colVal = *pColVal, .dirty = 1, .cacheStatus = TSDB_LAST_CACHE_VALID};
            TAOS_CHECK_EXIT(tsdbCacheReallocSLastCol(&lastCol, NULL));

            tsdbCacheFreeSLastColItem(pTargetCol);
            taosArraySet(pLastArray, *pjCol, &lastCol);
          }

          // resolved (or superseded by the cached value): stop looking for this column
          TAOS_CHECK_EXIT(tSimpleHashRemove(iColHash, &pColVal->cid, sizeof(pColVal->cid)));
        }
      }
      tsdbRowClose(&rowIter);

      pRow = memRowIterGet(&iter, false, NULL, 0);
    }
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(pTsdb->pVnode), __func__, __FILE__, lino, tstrerror(code));

    // an error may have interrupted a row walk; close the row iterator it left open
    tsdbRowClose(&rowIter);
  }

  tSimpleHashCleanup(iColHash);

  memRowIterClose(&iter);

  TAOS_RETURN(code);
}
2652

2653
/*
 * Fill pLastArray for table `uid`: first from the LRU cache (tsdbCacheGetBatchFromLru), then - when
 * tsUpdateCacheBatch is set - refreshed by tsdbCacheGetBatchFromMem.
 *
 * A temporary array of SLastKey is shared between the two steps and released before returning.
 * Returns 0 on success or the failing step's error code.
 */
int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, int8_t ltype) {
  int32_t code = 0, lino = 0;
  SArray *pKeys = taosArrayInit(16, sizeof(SLastKey));

  if (pKeys == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }

  TAOS_CHECK_EXIT(tsdbCacheGetBatchFromLru(pTsdb, uid, pLastArray, pr, ltype, pKeys));

  if (tsUpdateCacheBatch) {
    TAOS_CHECK_EXIT(tsdbCacheGetBatchFromMem(pTsdb, uid, pLastArray, pr, pKeys));
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(pTsdb->pVnode), __func__, __FILE__, lino, tstrerror(code));
  }

  if (pKeys != NULL) {
    taosArrayDestroy(pKeys);
  }

  TAOS_RETURN(code);
}
2679

2680
/*
 * Invalidate cached last / last_row entries of table `uid` whose cached timestamp lies in [sKey, eKey].
 *
 * For every column of the table's schema and every lflag from LFLAG_LAST_ROW to LFLAG_LAST:
 *   - an entry found in the LRU cache and falling in the range is overwritten with a NONE placeholder marked
 *     TSDB_LAST_CACHE_NO_CACHE;
 *   - an entry missing from the LRU is looked up in rocksdb; if present and in range, the same placeholder is
 *     written to rocksdb and to the LRU.
 *
 * The whole operation runs under pTsdb->lruMutex. Returns 0 on success or the first error code.
 */
int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey) {
  int32_t   code = 0, lino = 0;
  STSchema *pTSchema = NULL;
  int       sver = -1;
  int       numKeys = 0;
  SArray   *remainCols = NULL;
  // Declared before the first `goto _exit` so the cleanup code never reads an indeterminate pointer.
  char    **keys_list = NULL;
  size_t   *keys_list_sizes = NULL;
  char    **values_list = NULL;
  size_t   *values_list_sizes = NULL;

  TAOS_CHECK_RETURN(metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pTSchema));

  int numCols = pTSchema->numOfCols;

  (void)taosThreadMutexLock(&pTsdb->lruMutex);

  for (int i = 0; i < numCols; ++i) {
    int16_t cid = pTSchema->columns[i].colId;
    for (int8_t lflag = LFLAG_LAST_ROW; lflag <= LFLAG_LAST; ++lflag) {
      SLastKey   lastKey = {.lflag = lflag, .uid = uid, .cid = cid};
      LRUHandle *h = taosLRUCacheLookup(pTsdb->lruCache, &lastKey, ROCKS_KEY_LEN);
      if (h) {
        SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pTsdb->lruCache, h);
        if (pLastCol->rowKey.ts <= eKey && pLastCol->rowKey.ts >= sKey) {
          SLastCol noneCol = {.rowKey.ts = TSKEY_MIN,
                              .colVal = COL_VAL_NONE(cid, pTSchema->columns[i].type),
                              .dirty = 1,
                              .cacheStatus = TSDB_LAST_CACHE_NO_CACHE};
          code = tsdbCachePutToLRU(pTsdb, &lastKey, &noneCol, 1);
        }
        // release the handle before acting on the put result so it is never leaked
        tsdbLRUCacheRelease(pTsdb->lruCache, h, false);
        TAOS_CHECK_EXIT(code);
      } else {
        // not in the LRU: queue the key for the rocksdb lookup below
        if (!remainCols) {
          remainCols = taosArrayInit(numCols * 2, sizeof(SIdxKey));
          if (!remainCols) {
            code = terrno;
            goto _exit;
          }
        }
        if (!taosArrayPush(remainCols, &(SIdxKey){i, lastKey})) {
          TAOS_CHECK_EXIT(terrno);
        }
      }
    }
  }

  if (remainCols) {
    numKeys = TARRAY_SIZE(remainCols);
  }

  keys_list = taosMemoryCalloc(numKeys, sizeof(char *));
  keys_list_sizes = taosMemoryCalloc(numKeys, sizeof(size_t));
  if (!keys_list || !keys_list_sizes) {
    code = terrno;
    goto _exit;
  }

  for (int i = 0; i < numKeys; ++i) {
    char *key = taosMemoryCalloc(1, sizeof(SLastKey));
    if (!key) {
      code = terrno;
      goto _exit;
    }
    SIdxKey *idxKey = taosArrayGet(remainCols, i);

    ((SLastKey *)key)[0] = idxKey->key;

    keys_list[i] = key;
    keys_list_sizes[i] = ROCKS_KEY_LEN;
  }

  rocksMayWrite(pTsdb, true);  // flush writebatch cache

  TAOS_CHECK_GOTO(tsdbCacheGetValuesFromRocks(pTsdb, numKeys, (const char *const *)keys_list, keys_list_sizes,
                                              &values_list, &values_list_sizes),
                  NULL, _exit);

  for (int i = 0; i < numKeys; ++i) {
    SLastCol *pLastCol = NULL;
    if (values_list[i] != NULL) {
      code = tsdbCacheDeserialize(values_list[i], values_list_sizes[i], &pLastCol);
      if (code != TSDB_CODE_SUCCESS) {
        tsdbError("vgId:%d, %s deserialize failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
                  tstrerror(code));
        goto _exit;
      }
    }
    SIdxKey  *idxKey = taosArrayGet(remainCols, i);
    SLastKey *pLastKey = &idxKey->key;
    if (NULL != pLastCol && (pLastCol->rowKey.ts <= eKey && pLastCol->rowKey.ts >= sKey)) {
      SLastCol noCacheCol = {.rowKey.ts = TSKEY_MIN,
                             .colVal = COL_VAL_NONE(pLastKey->cid, pTSchema->columns[idxKey->idx].type),
                             .dirty = 0,
                             .cacheStatus = TSDB_LAST_CACHE_NO_CACHE};

      if ((code = tsdbCachePutToRocksdb(pTsdb, pLastKey, &noCacheCol)) != TSDB_CODE_SUCCESS) {
        taosMemoryFreeClear(pLastCol);
        tsdbError("tsdb/cache/del: vgId:%d, put to rocks failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code));
        goto _exit;
      }
      if ((code = tsdbCachePutToLRU(pTsdb, pLastKey, &noCacheCol, 0)) != TSDB_CODE_SUCCESS) {
        taosMemoryFreeClear(pLastCol);
        tsdbError("tsdb/cache/del: vgId:%d, put to lru failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code));
        goto _exit;
      }
    }

    if (pLastCol == NULL) {
      // each value is printed under its own label (uid is 64-bit, cid is a small integer)
      tsdbDebug("tsdb/cache/del: vgId:%d, no cache found for uid:%" PRId64 ", cid:%d, lflag:%d.",
                TD_VID(pTsdb->pVnode), pLastKey->uid, pLastKey->cid, pLastKey->lflag);
    }

    taosMemoryFreeClear(pLastCol);
  }

  rocksMayWrite(pTsdb, false);

_exit:
  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);

  if (keys_list) {
    for (int i = 0; i < numKeys; ++i) {
      taosMemoryFree(keys_list[i]);
    }
  }
  taosMemoryFree(keys_list);
  taosMemoryFree(keys_list_sizes);
  if (values_list) {
#if USE_ROCKSDB
    for (int i = 0; i < numKeys; ++i) {
      rocksdb_free(values_list[i]);
    }
#endif
    taosMemoryFree(values_list);
  }
  taosMemoryFree(values_list_sizes);
  taosArrayDestroy(remainCols);
  taosMemoryFree(pTSchema);

  TAOS_RETURN(code);
}
2818

2819
/*
 * Create the caches of a tsdb instance: the last/last_row LRU cache (capacity taken from
 * config.cacheLastSize, in MiB), the shared-storage block and page caches when that feature is compiled in
 * and enabled, and the rocksdb-backed cache. The LRU mutex is initialised last.
 *
 * pTsdb->lruCache is assigned on every path, including failures.
 * Returns 0 on success or the failing step's error code.
 */
int32_t tsdbOpenCache(STsdb *pTsdb) {
  int32_t code = 0, lino = 0;
  size_t  lruBytes = (size_t)pTsdb->pVnode->config.cacheLastSize * 1024 * 1024;

  SLRUCache *pLru = taosLRUCacheInit(lruBytes, 0, .5);
  if (NULL == pLru) {
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err);
  }

#ifdef USE_SHARED_STORAGE
  if (tsSsEnabled) {
    TAOS_CHECK_GOTO(tsdbOpenBCache(pTsdb), &lino, _err);
    TAOS_CHECK_GOTO(tsdbOpenPgCache(pTsdb), &lino, _err);
  }
#endif

  TAOS_CHECK_GOTO(tsdbOpenRocksCache(pTsdb), &lino, _err);

  taosLRUCacheSetStrictCapacity(pLru, false);

  (void)taosThreadMutexInit(&pTsdb->lruMutex, NULL);

_err:
  if (code) {
    tsdbError("tsdb/cache: vgId:%d, open failed at line %d since %s.", TD_VID(pTsdb->pVnode), lino, tstrerror(code));
  }

  pTsdb->lruCache = pLru;

  TAOS_RETURN(code);
}
2850

2851
/*
 * Tear down the caches created by tsdbOpenCache: the LRU cache together with its mutex (only when an LRU
 * cache exists), the shared-storage caches when enabled, and finally the rocksdb-backed cache.
 */
void tsdbCloseCache(STsdb *pTsdb) {
  SLRUCache *pLru = pTsdb->lruCache;

  if (pLru != NULL) {
    taosLRUCacheEraseUnrefEntries(pLru);
    taosLRUCacheCleanup(pLru);
    (void)taosThreadMutexDestroy(&pTsdb->lruMutex);
  }

#ifdef USE_SHARED_STORAGE
  if (tsSsEnabled) {
    tsdbCloseBCache(pTsdb);
    tsdbClosePgCache(pTsdb);
  }
#endif

  tsdbCloseRocksCache(pTsdb);
}
14,596✔
2870

2871
/*
 * Write an 8-byte cache key for table `uid` into `key` and store the key length in `*len`.
 * cacheType 0 (last_row) stores the uid unchanged; any other value (last) stores the uid with its top bit set.
 */
static void getTableCacheKey(tb_uid_t uid, int cacheType, char *key, int *len) {
  uint64_t encoded = (uint64_t)uid;

  if (cacheType != 0) {  // last
    encoded |= 0x8000000000000000;
  }

  *(uint64_t *)key = encoded;
  *len = sizeof(uint64_t);
}
×
2880

2881
/*
 * Return the super-table uid of table `uid`.
 * Yields 0 when the table entry cannot be read or when the table is not a child table.
 */
static tb_uid_t getTableSuidByUid(tb_uid_t uid, STsdb *pTsdb) {
  tb_uid_t    parentUid = 0;
  SMetaReader reader = {0};

  metaReaderDoInit(&reader, pTsdb->pVnode->pMeta, META_READER_LOCK);

  // a negative lookup result means the table does not exist; only child tables carry a suid
  if (metaReaderGetTableEntryByUidCache(&reader, uid) >= 0 && reader.me.type == TSDB_CHILD_TABLE) {
    parentUid = reader.me.ctbEntry.suid;
  }

  metaReaderClear(&reader);

  return parentUid;
}
2903

2904
/*
 * Read the delete records referenced by `pDelIdx` into `aDelData` via tsdbReadDelDatav1 (no version limit:
 * INT64_MAX is passed). A NULL index is a no-op that reports success.
 */
static int32_t getTableDelDataFromDelIdx(SDelFReader *pDelReader, SDelIdx *pDelIdx, SArray *aDelData) {
  if (pDelIdx == NULL) {
    TAOS_RETURN(0);
  }

  TAOS_RETURN(tsdbReadDelDatav1(pDelReader, pDelIdx, aDelData, INT64_MAX));
}
2913

2914
/*
 * Append every delete record linked from pTbData->pHead (following pNext) to `aDelData`.
 * A NULL pTbData contributes nothing. Returns 0 on success or terrno when an append fails.
 */
static int32_t getTableDelDataFromTbData(STbData *pTbData, SArray *aDelData) {
  int32_t   code = 0;
  SDelData *pNode = NULL;

  if (pTbData != NULL) {
    pNode = pTbData->pHead;
  }

  while (pNode != NULL) {
    if (taosArrayPush(aDelData, pNode) == NULL) {
      TAOS_RETURN(terrno);
    }
    pNode = pNode->pNext;
  }

  TAOS_RETURN(code);
}
2926

2927
/*
 * Return the reader's list of table uids, building it on first use: the uids are copied out of
 * pReader->pTableList and sorted with uidComparFunc. The list is stored in pReader->uidList for later calls.
 * Returns NULL when the list cannot be allocated.
 */
static uint64_t *getUidList(SCacheRowsReader *pReader) {
  if (pReader->uidList != NULL) {
    return pReader->uidList;
  }

  int32_t   nTables = pReader->numOfTables;
  uint64_t *uids = taosMemoryMalloc(nTables * sizeof(uint64_t));

  pReader->uidList = uids;
  if (uids == NULL) {
    return NULL;
  }

  for (int32_t t = 0; t < nTables; ++t) {
    uids[t] = pReader->pTableList[t].uid;
  }

  taosSort(uids, nTables, sizeof(uint64_t), uidComparFunc);

  return pReader->uidList;
}
2946

2947
/*
 * Load tomb (delete) records for the reader's tables from a tomb-block index.
 *
 * pTombBlkArray  index of tomb blocks to scan
 * pReader        cache reader; supplies the super-table uid, the table list and the version limit
 * pFileReader    data-file reader when isFile is true, stt-file reader otherwise
 *
 * Blocks whose [minTbid, maxTbid] range cannot contain any of the reader's tables are skipped. Within a block,
 * records are matched against the sorted uid list and every record whose version does not exceed
 * pReader->info.verRange.maxVer is appended to the owning table's pTombData.
 *
 * Returns 0 on success or the first error code. The tomb block read for the current iteration is always
 * destroyed before returning.
 */
static int32_t loadTombFromBlk(const TTombBlkArray *pTombBlkArray, SCacheRowsReader *pReader, void *pFileReader,
                               bool isFile) {
  int32_t   code = 0;
  int32_t   numOfTables = pReader->numOfTables;
  int64_t   suid = pReader->info.suid;
  uint64_t *uidList = getUidList(pReader);

  if (!uidList) {
    TAOS_RETURN(terrno);
  }

  for (int i = 0, j = 0; i < pTombBlkArray->size && j < numOfTables; ++i) {
    STombBlk *pTombBlk = &pTombBlkArray->data[i];
    // block ends before the first table of interest
    if (pTombBlk->maxTbid.suid < suid || (pTombBlk->maxTbid.suid == suid && pTombBlk->maxTbid.uid < uidList[0])) {
      continue;
    }

    // block starts after the last table of interest, so scanning stops here
    if (pTombBlk->minTbid.suid > suid ||
        (pTombBlk->minTbid.suid == suid && pTombBlk->minTbid.uid > uidList[numOfTables - 1])) {
      break;
    }

    STombBlock block = {0};
    code = isFile ? tsdbDataFileReadTombBlock(pFileReader, &pTombBlkArray->data[i], &block)
                  : tsdbSttFileReadTombBlock(pFileReader, &pTombBlkArray->data[i], &block);
    if (code != TSDB_CODE_SUCCESS) {
      TAOS_RETURN(code);
    }

    uint64_t        uid = uidList[j];
    STableLoadInfo *pInfo = getTableLoadInfo(pReader, uid);
    if (!pInfo) {
      tTombBlockDestroy(&block);
      TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
    }

    if (pInfo->pTombData == NULL) {
      pInfo->pTombData = taosArrayInit(4, sizeof(SDelData));
      if (!pInfo->pTombData) {
        code = terrno;  // capture before the cleanup call can touch terrno
        tTombBlockDestroy(&block);
        TAOS_RETURN(code);
      }
    }

    STombRecord record = {0};
    bool        finished = false;
    for (int32_t k = 0; k < TOMB_BLOCK_SIZE(&block); ++k) {
      code = tTombBlockGet(&block, k, &record);
      if (code != TSDB_CODE_SUCCESS) {
        finished = true;
        break;
      }

      if (record.suid < suid) {
        continue;
      }
      if (record.suid > suid) {
        finished = true;
        break;
      }

      // advance the table cursor until it reaches or passes the record's table
      bool newTable = false;
      if (uid < record.uid) {
        while (j < numOfTables && uidList[j] < record.uid) {
          ++j;
          newTable = true;
        }

        if (j >= numOfTables) {
          finished = true;
          break;
        }

        uid = uidList[j];
      }

      if (record.uid < uid) {
        continue;
      }

      if (newTable) {
        pInfo = getTableLoadInfo(pReader, uid);
        if (!pInfo) {
          code = TSDB_CODE_OUT_OF_MEMORY;
          finished = true;
          break;
        }
        if (pInfo->pTombData == NULL) {
          pInfo->pTombData = taosArrayInit(4, sizeof(SDelData));
          if (!pInfo->pTombData) {
            code = terrno;
            finished = true;
            break;
          }
        }
      }

      if (record.version <= pReader->info.verRange.maxVer) {
        SDelData delData = {.version = record.version, .sKey = record.skey, .eKey = record.ekey};
        if (!taosArrayPush(pInfo->pTombData, &delData)) {
          // leave through the common path so the tomb block is destroyed instead of leaked
          code = terrno;
          finished = true;
          break;
        }
      }
    }

    tTombBlockDestroy(&block);

    if (finished) {
      TAOS_RETURN(code);
    }
  }

  TAOS_RETURN(TSDB_CODE_SUCCESS);
}
3060

3061
/*
 * Fetch the tomb-block index of a data file and load the matching tomb records into the reader.
 */
static int32_t loadDataTomb(SCacheRowsReader *pReader, SDataFileReader *pFileReader) {
  const TTombBlkArray *pTombIndex = NULL;

  TAOS_CHECK_RETURN(tsdbDataFileReadTombBlk(pFileReader, &pTombIndex));

  TAOS_RETURN(loadTombFromBlk(pTombIndex, pReader, pFileReader, true));
}
3068

3069
/*
 * Tomb-loading callback for stt files (installed as SMergeTreeConf.loadTombFn by lastIterOpen).
 * `pTsdbReader` is actually the SCacheRowsReader; `pLoadInfo` is not used.
 */
static int32_t loadSttTomb(STsdbReader *pTsdbReader, SSttFileReader *pSttFileReader, SSttBlockLoadInfo *pLoadInfo) {
  const TTombBlkArray *pTombIndex = NULL;
  SCacheRowsReader    *pCacheReader = (SCacheRowsReader *)pTsdbReader;

  TAOS_CHECK_RETURN(tsdbSttFileReadTombBlk(pSttFileReader, &pTombIndex));

  TAOS_RETURN(loadTombFromBlk(pTombIndex, pCacheReader, pSttFileReader, false));
}
3077

3078
typedef struct {
3079
  SMergeTree  mergeTree;
3080
  SMergeTree *pMergeTree;
3081
} SFSLastIter;
3082

3083
/*
 * Open `iter` as a backward merge-tree scan over the stt files of `pFileSet` for table `uid`.
 *
 * The reader's previous stt block iterators are destroyed and a fresh array is allocated for the new tree.
 * The scan covers keys from `lastTs` up to TSKEY_MAX and all versions, projects `aCols`/`nCols`, and loads
 * tomb data through loadSttTomb.
 *
 * Returns 0 on success, terrno when the iterator array cannot be allocated, or the merge tree's error code.
 */
static int32_t lastIterOpen(SFSLastIter *iter, STFileSet *pFileSet, STsdb *pTsdb, STSchema *pTSchema, tb_uid_t suid,
                            tb_uid_t uid, SCacheRowsReader *pr, int64_t lastTs, int16_t *aCols, int nCols) {
  destroySttBlockReader(pr->pLDataIterArray, NULL);

  pr->pLDataIterArray = taosArrayInit(4, POINTER_BYTES);
  if (NULL == pr->pLDataIterArray) {
    return terrno;
  }

  SMergeTreeConf treeConf = {
      .uid = uid,
      .suid = suid,
      .pTsdb = pTsdb,
      .timewindow = (STimeWindow){.skey = lastTs, .ekey = TSKEY_MAX},
      .verRange = (SVersionRange){.minVer = 0, .maxVer = INT64_MAX},
      .strictTimeRange = false,
      .pSchema = pTSchema,
      .pCurrentFileset = pFileSet,
      .backward = 1,
      .pSttFileBlockIterArray = pr->pLDataIterArray,
      .pCols = aCols,
      .numOfCols = nCols,
      .loadTombFn = loadSttTomb,
      .pReader = pr,
      .idstr = pr->idstr,
      .pCurRowKey = &pr->rowKey,
  };

  TAOS_CHECK_RETURN(tMergeTreeOpen2(&iter->mergeTree, &treeConf, NULL));

  // mark the iterator as open
  iter->pMergeTree = &iter->mergeTree;

  TAOS_RETURN(0);
}
3115

3116
/*
 * Close the merge tree held by `*iter` (if it is open) and set the caller's iterator pointer to NULL.
 * Always reports success.
 */
static int32_t lastIterClose(SFSLastIter **iter) {
  int32_t      code = 0;
  SFSLastIter *pLast = *iter;

  if (pLast->pMergeTree != NULL) {
    tMergeTreeClose(pLast->pMergeTree);
    pLast->pMergeTree = NULL;
  }

  *iter = NULL;

  TAOS_RETURN(code);
}
3128

3129
/*
 * Advance the merge tree and hand back its current row through `*ppRow`.
 * `*ppRow` is NULL when the tree has no further row or when advancing fails.
 * Returns the code reported by tMergeTreeNext.
 */
static int32_t lastIterNext(SFSLastIter *iter, TSDBROW **ppRow) {
  bool rowAvailable = false;

  *ppRow = NULL;

  int32_t code = tMergeTreeNext(iter->pMergeTree, &rowAvailable);
  if (code != 0) {
    return code;
  }

  if (rowAvailable) {
    *ppRow = tMergeTreeGetRow(iter->pMergeTree);
  } else {
    *ppRow = NULL;
  }

  TAOS_RETURN(code);
}
3146

3147
/* States of the file-system row iterator driven by getNextRowFromFS. */
typedef enum SFSNEXTROWSTATES {
  SFSNEXTROW_FS,       // initial state: the file-set cursor has not been positioned yet
  SFSNEXTROW_FILESET,  // select the next file set to scan
  SFSNEXTROW_INDEXLIST,
  SFSNEXTROW_BRINBLOCK,
  SFSNEXTROW_BRINRECORD,
  SFSNEXTROW_BLOCKDATA,
  SFSNEXTROW_BLOCKROW,
  SFSNEXTROW_NEXTSTTROW
} SFSNEXTROWSTATES;
3157

3158
struct CacheNextRowIter;
3159

3160
typedef struct SFSNextRowIter {
3161
  SFSNEXTROWSTATES         state;         // [input]
3162
  SBlockIdx               *pBlockIdxExp;  // [input]
3163
  STSchema                *pTSchema;      // [input]
3164
  tb_uid_t                 suid;
3165
  tb_uid_t                 uid;
3166
  int32_t                  iFileSet;
3167
  STFileSet               *pFileSet;
3168
  TFileSetArray           *aDFileSet;
3169
  SArray                  *pIndexList;
3170
  int32_t                  iBrinIndex;
3171
  SBrinBlock               brinBlock;
3172
  SBrinBlock              *pBrinBlock;
3173
  int32_t                  iBrinRecord;
3174
  SBrinRecord              brinRecord;
3175
  SBlockData               blockData;
3176
  SBlockData              *pBlockData;
3177
  int32_t                  nRow;
3178
  int32_t                  iRow;
3179
  TSDBROW                  row;
3180
  int64_t                  lastTs;
3181
  SFSLastIter              lastIter;
3182
  SFSLastIter             *pLastIter;
3183
  int8_t                   lastEmpty;
3184
  TSDBROW                 *pLastRow;
3185
  SRow                    *pTSRow;
3186
  SRowMerger               rowMerger;
3187
  SCacheRowsReader        *pr;
3188
  struct CacheNextRowIter *pRowIter;
3189
} SFSNextRowIter;
3190

3191
static void clearLastFileSet(SFSNextRowIter *state);
3192

3193
static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast, int16_t *aCols,
1,148✔
3194
                                int nCols) {
3195
  int32_t         code = 0, lino = 0;
1,148✔
3196
  SFSNextRowIter *state = (SFSNextRowIter *)iter;
1,148✔
3197
  STsdb          *pTsdb = state->pr->pTsdb;
1,148✔
3198

3199
  if (SFSNEXTROW_FS == state->state) {
1,148✔
3200
    state->iFileSet = TARRAY2_SIZE(state->aDFileSet);
1,143✔
3201

3202
    state->state = SFSNEXTROW_FILESET;
1,143✔
3203
  }
3204

3205
  if (SFSNEXTROW_FILESET == state->state) {
1,148✔
3206
  _next_fileset:
1,143✔
3207
    clearLastFileSet(state);
7,373✔
3208

3209
    if (--state->iFileSet < 0) {
7,370✔
3210
      *ppRow = NULL;
552✔
3211

3212
      TAOS_RETURN(code);
552✔
3213
    } else {
3214
      state->pFileSet = TARRAY2_GET(state->aDFileSet, state->iFileSet);
6,818✔
3215
    }
3216

3217
    STFileObj **pFileObj = state->pFileSet->farr;
6,818✔
3218
    if (pFileObj[0] != NULL || pFileObj[3] != NULL) {
6,818!
3219
      if (state->pFileSet != state->pr->pCurFileSet) {
16!
3220
        SDataFileReaderConfig conf = {.tsdb = pTsdb, .szPage = pTsdb->pVnode->config.tsdbPageSize};
16✔
3221
        const char           *filesName[4] = {0};
16✔
3222
        if (pFileObj[0] != NULL) {
16!
3223
          conf.files[0].file = *pFileObj[0]->f;
16✔
3224
          conf.files[0].exist = true;
16✔
3225
          filesName[0] = pFileObj[0]->fname;
16✔
3226

3227
          conf.files[1].file = *pFileObj[1]->f;
16✔
3228
          conf.files[1].exist = true;
16✔
3229
          filesName[1] = pFileObj[1]->fname;
16✔
3230

3231
          conf.files[2].file = *pFileObj[2]->f;
16✔
3232
          conf.files[2].exist = true;
16✔
3233
          filesName[2] = pFileObj[2]->fname;
16✔
3234
        }
3235

3236
        if (pFileObj[3] != NULL) {
16!
3237
          conf.files[3].exist = true;
16✔
3238
          conf.files[3].file = *pFileObj[3]->f;
16✔
3239
          filesName[3] = pFileObj[3]->fname;
16✔
3240
        }
3241

3242
        TAOS_CHECK_GOTO(tsdbDataFileReaderOpen(filesName, &conf, &state->pr->pFileReader), &lino, _err);
16!
3243

3244
        state->pr->pCurFileSet = state->pFileSet;
16✔
3245

3246
        code = loadDataTomb(state->pr, state->pr->pFileReader);
16✔
3247
        if (code != TSDB_CODE_SUCCESS) {
16!
3248
          tsdbError("vgId:%d, %s load tomb failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
3249
                    tstrerror(code));
3250
          TAOS_CHECK_GOTO(code, &lino, _err);
×
3251
        }
3252

3253
        TAOS_CHECK_GOTO(tsdbDataFileReadBrinBlk(state->pr->pFileReader, &state->pr->pBlkArray), &lino, _err);
16!
3254
      }
3255

3256
      if (!state->pIndexList) {
16!
3257
        state->pIndexList = taosArrayInit(1, sizeof(SBrinBlk));
16✔
3258
        if (!state->pIndexList) {
16!
3259
          TAOS_CHECK_GOTO(terrno, &lino, _err);
×
3260
        }
3261
      } else {
3262
        taosArrayClear(state->pIndexList);
×
3263
      }
3264

3265
      const TBrinBlkArray *pBlkArray = state->pr->pBlkArray;
16✔
3266

3267
      for (int i = TARRAY2_SIZE(pBlkArray) - 1; i >= 0; --i) {
32✔
3268
        SBrinBlk *pBrinBlk = &pBlkArray->data[i];
16✔
3269
        if (state->suid >= pBrinBlk->minTbid.suid && state->suid <= pBrinBlk->maxTbid.suid) {
16!
3270
          if (state->uid >= pBrinBlk->minTbid.uid && state->uid <= pBrinBlk->maxTbid.uid) {
16✔
3271
            if (!taosArrayPush(state->pIndexList, pBrinBlk)) {
4!
3272
              TAOS_CHECK_GOTO(terrno, &lino, _err);
×
3273
            }
3274
          }
3275
        } else if (state->suid > pBrinBlk->maxTbid.suid ||
×
3276
                   (state->suid == pBrinBlk->maxTbid.suid && state->uid > pBrinBlk->maxTbid.uid)) {
×
3277
          break;
3278
        }
3279
      }
3280

3281
      int indexSize = TARRAY_SIZE(state->pIndexList);
16✔
3282
      if (indexSize <= 0) {
16✔
3283
        goto _check_stt_data;
14✔
3284
      }
3285

3286
      state->state = SFSNEXTROW_INDEXLIST;
2✔
3287
      state->iBrinIndex = 1;
2✔
3288
    }
3289

3290
  _check_stt_data:
6,802✔
3291
    if (state->pFileSet != state->pr->pCurFileSet) {
6,818✔
3292
      state->pr->pCurFileSet = state->pFileSet;
6,757✔
3293
    }
3294

3295
    TAOS_CHECK_GOTO(lastIterOpen(&state->lastIter, state->pFileSet, pTsdb, state->pTSchema, state->suid, state->uid,
6,818!
3296
                                 state->pr, state->lastTs, aCols, nCols),
3297
                    &lino, _err);
3298

3299
    TAOS_CHECK_GOTO(lastIterNext(&state->lastIter, &state->pLastRow), &lino, _err);
6,784!
3300

3301
    if (!state->pLastRow) {
6,786✔
3302
      state->lastEmpty = 1;
6,229✔
3303

3304
      if (SFSNEXTROW_INDEXLIST != state->state) {
6,229✔
3305
        clearLastFileSet(state);
6,226✔
3306
        goto _next_fileset;
6,229✔
3307
      }
3308
    } else {
3309
      state->lastEmpty = 0;
557✔
3310

3311
      if (SFSNEXTROW_INDEXLIST != state->state) {
557!
3312
        state->state = SFSNEXTROW_NEXTSTTROW;
560✔
3313

3314
        *ppRow = state->pLastRow;
560✔
3315
        state->pLastRow = NULL;
560✔
3316

3317
        TAOS_RETURN(code);
560✔
3318
      }
3319
    }
3320

3321
    state->pLastIter = &state->lastIter;
×
3322
  }
3323

3324
  if (SFSNEXTROW_NEXTSTTROW == state->state) {
5!
3325
    TAOS_CHECK_GOTO(lastIterNext(&state->lastIter, &state->pLastRow), &lino, _err);
6!
3326

3327
    if (!state->pLastRow) {
6✔
3328
      if (state->pLastIter) {
1!
3329
        code = lastIterClose(&state->pLastIter);
×
3330
        if (code != TSDB_CODE_SUCCESS) {
×
3331
          tsdbError("vgId:%d, %s close last iter failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
3332
                    tstrerror(code));
3333
          TAOS_RETURN(code);
×
3334
        }
3335
      }
3336

3337
      clearLastFileSet(state);
1✔
3338
      state->state = SFSNEXTROW_FILESET;
1✔
3339
      goto _next_fileset;
1✔
3340
    } else {
3341
      *ppRow = state->pLastRow;
5✔
3342
      state->pLastRow = NULL;
5✔
3343

3344
      TAOS_RETURN(code);
5✔
3345
    }
3346
  }
3347

3348
  if (SFSNEXTROW_INDEXLIST == state->state) {
×
3349
    SBrinBlk *pBrinBlk = NULL;
2✔
3350
  _next_brinindex:
2✔
3351
    if (--state->iBrinIndex < 0) {
2!
3352
      if (state->pLastRow) {
×
3353
        state->state = SFSNEXTROW_NEXTSTTROW;
×
3354
        *ppRow = state->pLastRow;
×
3355
        state->pLastRow = NULL;
×
3356
        return code;
×
3357
      }
3358

3359
      clearLastFileSet(state);
×
3360
      goto _next_fileset;
×
3361
    } else {
3362
      pBrinBlk = taosArrayGet(state->pIndexList, state->iBrinIndex);
2✔
3363
    }
3364

3365
    if (!state->pBrinBlock) {
2!
3366
      state->pBrinBlock = &state->brinBlock;
2✔
3367
    } else {
3368
      tBrinBlockClear(&state->brinBlock);
×
3369
    }
3370

3371
    TAOS_CHECK_GOTO(tsdbDataFileReadBrinBlock(state->pr->pFileReader, pBrinBlk, &state->brinBlock), &lino, _err);
2!
3372

3373
    state->iBrinRecord = state->brinBlock.numOfRecords - 1;
2✔
3374
    state->state = SFSNEXTROW_BRINBLOCK;
2✔
3375
  }
3376

3377
  if (SFSNEXTROW_BRINBLOCK == state->state) {
×
3378
  _next_brinrecord:
2✔
3379
    if (state->iBrinRecord < 0) {  // empty brin block, goto _next_brinindex
2!
3380
      tBrinBlockClear(&state->brinBlock);
×
3381
      goto _next_brinindex;
×
3382
    }
3383

3384
    TAOS_CHECK_GOTO(tBrinBlockGet(&state->brinBlock, state->iBrinRecord, &state->brinRecord), &lino, _err);
2!
3385

3386
    SBrinRecord *pRecord = &state->brinRecord;
2✔
3387
    if (pRecord->uid != state->uid) {
2!
3388
      // TODO: goto next brin block early
3389
      --state->iBrinRecord;
×
3390
      goto _next_brinrecord;
×
3391
    }
3392

3393
    state->state = SFSNEXTROW_BRINRECORD;
2✔
3394
  }
3395

3396
  if (SFSNEXTROW_BRINRECORD == state->state) {
×
3397
    SBrinRecord *pRecord = &state->brinRecord;
2✔
3398

3399
    if (!state->pBlockData) {
2!
3400
      state->pBlockData = &state->blockData;
2✔
3401

3402
      TAOS_CHECK_GOTO(tBlockDataCreate(&state->blockData), &lino, _err);
2!
3403
    } else {
3404
      tBlockDataReset(state->pBlockData);
×
3405
    }
3406

3407
    if (aCols[0] == PRIMARYKEY_TIMESTAMP_COL_ID) {
2!
3408
      --nCols;
2✔
3409
      ++aCols;
2✔
3410
    }
3411

3412
    TAOS_CHECK_GOTO(tsdbDataFileReadBlockDataByColumn(state->pr->pFileReader, pRecord, state->pBlockData,
2!
3413
                                                      state->pTSchema, aCols, nCols),
3414
                    &lino, _err);
3415

3416
    state->nRow = state->blockData.nRow;
2✔
3417
    state->iRow = state->nRow - 1;
2✔
3418

3419
    state->state = SFSNEXTROW_BLOCKROW;
2✔
3420
  }
3421

3422
  if (SFSNEXTROW_BLOCKROW == state->state) {
×
3423
    if (state->iRow < 0) {
2!
3424
      --state->iBrinRecord;
×
3425
      goto _next_brinrecord;
×
3426
    }
3427

3428
    state->row = tsdbRowFromBlockData(state->pBlockData, state->iRow);
2✔
3429
    if (!state->pLastIter) {
2!
3430
      *ppRow = &state->row;
×
3431
      --state->iRow;
×
3432
      return code;
2✔
3433
    }
3434

3435
    if (!state->pLastRow) {
2!
3436
      // get next row from fslast and process with fs row, --state->Row if select fs row
3437
      TAOS_CHECK_GOTO(lastIterNext(&state->lastIter, &state->pLastRow), &lino, _err);
2!
3438
    }
3439

3440
    if (!state->pLastRow) {
2!
3441
      if (state->pLastIter) {
2!
3442
        code = lastIterClose(&state->pLastIter);
2✔
3443
        if (code != TSDB_CODE_SUCCESS) {
2!
3444
          tsdbError("vgId:%d, %s close last iter failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
3445
                    tstrerror(code));
3446
          TAOS_RETURN(code);
×
3447
        }
3448
      }
3449

3450
      *ppRow = &state->row;
2✔
3451
      --state->iRow;
2✔
3452
      return code;
2✔
3453
    }
3454

3455
    // process state->pLastRow & state->row
3456
    TSKEY rowTs = TSDBROW_TS(&state->row);
×
3457
    TSKEY lastRowTs = TSDBROW_TS(state->pLastRow);
×
3458
    if (lastRowTs > rowTs) {
×
3459
      *ppRow = state->pLastRow;
×
3460
      state->pLastRow = NULL;
×
3461

3462
      TAOS_RETURN(code);
×
3463
    } else if (lastRowTs < rowTs) {
×
3464
      *ppRow = &state->row;
×
3465
      --state->iRow;
×
3466

3467
      TAOS_RETURN(code);
×
3468
    } else {
3469
      // TODO: merge rows and *ppRow = mergedRow
3470
      SRowMerger *pMerger = &state->rowMerger;
×
3471
      code = tsdbRowMergerInit(pMerger, state->pTSchema);
×
3472
      if (code != TSDB_CODE_SUCCESS) {
×
3473
        tsdbError("vgId:%d, %s init row merger failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
3474
                  tstrerror(code));
3475
        TAOS_RETURN(code);
×
3476
      }
3477

3478
      TAOS_CHECK_GOTO(tsdbRowMergerAdd(pMerger, &state->row, state->pTSchema), &lino, _err);
×
3479
      TAOS_CHECK_GOTO(tsdbRowMergerAdd(pMerger, state->pLastRow, state->pTSchema), &lino, _err);
×
3480

3481
      if (state->pTSRow) {
×
3482
        taosMemoryFree(state->pTSRow);
×
3483
        state->pTSRow = NULL;
×
3484
      }
3485

3486
      TAOS_CHECK_GOTO(tsdbRowMergerGetRow(pMerger, &state->pTSRow), &lino, _err);
×
3487

3488
      state->row = tsdbRowFromTSRow(TSDBROW_VERSION(&state->row), state->pTSRow);
×
3489
      *ppRow = &state->row;
×
3490
      --state->iRow;
×
3491

3492
      tsdbRowMergerClear(pMerger);
×
3493

3494
      TAOS_RETURN(code);
×
3495
    }
3496
  }
3497

3498
_err:
×
3499
  clearLastFileSet(state);
×
3500

3501
  *ppRow = NULL;
×
3502

3503
  if (code) {
×
3504
    tsdbError("tsdb/cache: vgId:%d, %s failed at line %d since %s.", TD_VID(pTsdb->pVnode), __func__, lino,
×
3505
              tstrerror(code));
3506
  }
3507

3508
  TAOS_RETURN(code);
×
3509
}
3510

3511
typedef struct CacheNextRowIter {
3512
  SArray           *pMemDelData;
3513
  SArray           *pSkyline;
3514
  int64_t           iSkyline;
3515
  SBlockIdx         idx;
3516
  SMemNextRowIter   memState;
3517
  SMemNextRowIter   imemState;
3518
  SFSNextRowIter    fsState;
3519
  TSDBROW           memRow, imemRow, fsLastRow, fsRow;
3520
  TsdbNextRowState  input[3];
3521
  SCacheRowsReader *pr;
3522
  STsdb            *pTsdb;
3523
} CacheNextRowIter;
3524

3525
int32_t clearNextRowFromFS(void *iter) {
1,144✔
3526
  int32_t code = 0;
1,144✔
3527

3528
  SFSNextRowIter *state = (SFSNextRowIter *)iter;
1,144✔
3529
  if (!state) {
1,144!
3530
    TAOS_RETURN(code);
×
3531
  }
3532

3533
  if (state->pLastIter) {
1,144!
3534
    code = lastIterClose(&state->pLastIter);
×
3535
    if (code != TSDB_CODE_SUCCESS) {
×
3536
      tsdbError("%s close last iter failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
3537
      TAOS_RETURN(code);
×
3538
    }
3539
  }
3540

3541
  if (state->pBlockData) {
1,144✔
3542
    tBlockDataDestroy(state->pBlockData);
2✔
3543
    state->pBlockData = NULL;
2✔
3544
  }
3545

3546
  if (state->pBrinBlock) {
1,144✔
3547
    tBrinBlockDestroy(state->pBrinBlock);
2✔
3548
    state->pBrinBlock = NULL;
2✔
3549
  }
3550

3551
  if (state->pIndexList) {
1,144✔
3552
    taosArrayDestroy(state->pIndexList);
16✔
3553
    state->pIndexList = NULL;
16✔
3554
  }
3555

3556
  if (state->pTSRow) {
1,144!
3557
    taosMemoryFree(state->pTSRow);
×
3558
    state->pTSRow = NULL;
×
3559
  }
3560

3561
  if (state->pRowIter->pSkyline) {
1,144✔
3562
    taosArrayDestroy(state->pRowIter->pSkyline);
1,132✔
3563
    state->pRowIter->pSkyline = NULL;
1,133✔
3564
  }
3565

3566
  TAOS_RETURN(code);
1,145✔
3567
}
3568

3569
/*
 * Drop the per-file-set resources of a file-set row iterator before it moves
 * on to another file set: the stt iterator, the data block, the data file
 * reader (together with pCurFileSet), the merged row, and the skyline with
 * the tomb data of every table in the reader's table map.
 *
 * A failure to close the stt iterator is only logged. The remaining cleanup
 * still runs, because returning early would leave pr->pFileReader open and
 * the next file set would overwrite that handle.
 */
static void clearLastFileSet(SFSNextRowIter *state) {
  if (state->pLastIter) {
    int code = lastIterClose(&state->pLastIter);
    if (code != TSDB_CODE_SUCCESS) {
      tsdbError("%s close last iter failed at line %d since %s", __func__, __LINE__, tstrerror(code));
    }
  }

  if (state->pBlockData) {
    tBlockDataDestroy(state->pBlockData);
    state->pBlockData = NULL;
  }

  if (state->pr->pFileReader) {
    tsdbDataFileReaderClose(&state->pr->pFileReader);
    state->pr->pFileReader = NULL;

    state->pr->pCurFileSet = NULL;
  }

  if (state->pTSRow) {
    taosMemoryFree(state->pTSRow);
    state->pTSRow = NULL;
  }

  if (state->pRowIter->pSkyline) {
    taosArrayDestroy(state->pRowIter->pSkyline);
    state->pRowIter->pSkyline = NULL;

    // the skyline is rebuilt from tomb data, so that is dropped for every table as well
    void   *pe = NULL;
    int32_t iter = 0;
    while ((pe = tSimpleHashIterate(state->pr->pTableMap, pe, &iter)) != NULL) {
      STableLoadInfo *pInfo = *(STableLoadInfo **)pe;
      taosArrayDestroy(pInfo->pTombData);
      pInfo->pTombData = NULL;
    }
  }
}
3608

3609
static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTsdb, STSchema *pTSchema, tb_uid_t suid,
1,144✔
3610
                               SArray *pLDataIterArray, STsdbReadSnap *pReadSnap, int64_t lastTs,
3611
                               SCacheRowsReader *pr) {
3612
  int32_t code = 0, lino = 0;
1,144✔
3613

3614
  STbData *pMem = NULL;
1,144✔
3615
  if (pReadSnap->pMem) {
1,144!
3616
    pMem = tsdbGetTbDataFromMemTable(pReadSnap->pMem, suid, uid);
1,145✔
3617
  }
3618

3619
  STbData *pIMem = NULL;
1,144✔
3620
  if (pReadSnap->pIMem) {
1,144!
3621
    pIMem = tsdbGetTbDataFromMemTable(pReadSnap->pIMem, suid, uid);
×
3622
  }
3623

3624
  pIter->pTsdb = pTsdb;
1,144✔
3625

3626
  pIter->pMemDelData = NULL;
1,144✔
3627

3628
  TAOS_CHECK_GOTO(loadMemTombData(&pIter->pMemDelData, pMem, pIMem, pr->info.verRange.maxVer), &lino, _err);
1,144!
3629

3630
  pIter->idx = (SBlockIdx){.suid = suid, .uid = uid};
1,145✔
3631

3632
  pIter->fsState.pRowIter = pIter;
1,145✔
3633
  pIter->fsState.state = SFSNEXTROW_FS;
1,145✔
3634
  pIter->fsState.aDFileSet = pReadSnap->pfSetArray;
1,145✔
3635
  pIter->fsState.pBlockIdxExp = &pIter->idx;
1,145✔
3636
  pIter->fsState.pTSchema = pTSchema;
1,145✔
3637
  pIter->fsState.suid = suid;
1,145✔
3638
  pIter->fsState.uid = uid;
1,145✔
3639
  pIter->fsState.lastTs = lastTs;
1,145✔
3640
  pIter->fsState.pr = pr;
1,145✔
3641

3642
  pIter->input[0] = (TsdbNextRowState){&pIter->memRow, true, false, false, &pIter->memState, getNextRowFromMem, NULL};
1,145✔
3643
  pIter->input[1] = (TsdbNextRowState){&pIter->imemRow, true, false, false, &pIter->imemState, getNextRowFromMem, NULL};
1,145✔
3644
  pIter->input[2] =
1,145✔
3645
      (TsdbNextRowState){&pIter->fsRow, false, true, false, &pIter->fsState, getNextRowFromFS, clearNextRowFromFS};
1,145✔
3646

3647
  if (pMem) {
1,145✔
3648
    pIter->memState.pMem = pMem;
571✔
3649
    pIter->memState.state = SMEMNEXTROW_ENTER;
571✔
3650
    pIter->memState.lastTs = lastTs;
571✔
3651
    pIter->input[0].stop = false;
571✔
3652
    pIter->input[0].next = true;
571✔
3653
  }
3654

3655
  if (pIMem) {
1,145!
3656
    pIter->imemState.pMem = pIMem;
×
3657
    pIter->imemState.state = SMEMNEXTROW_ENTER;
×
3658
    pIter->imemState.lastTs = lastTs;
×
3659
    pIter->input[1].stop = false;
×
3660
    pIter->input[1].next = true;
×
3661
  }
3662

3663
  pIter->pr = pr;
1,145✔
3664

3665
_err:
1,145✔
3666
  TAOS_RETURN(code);
1,145✔
3667
}
3668

3669
/*
 * Tear down a CacheNextRowIter: run the clear callback of every row source
 * that has one (its result is ignored), then destroy the skyline and the mem
 * tomb data arrays if they exist.
 */
static void nextRowIterClose(CacheNextRowIter *pIter) {
  const int nInput = (int)(sizeof(pIter->input) / sizeof(pIter->input[0]));

  for (int iInput = 0; iInput < nInput; ++iInput) {
    TsdbNextRowState *pInput = &pIter->input[iInput];
    if (!pInput->nextRowClearFn) {
      continue;
    }
    (void)pInput->nextRowClearFn(pInput->iter);
  }

  if (pIter->pSkyline) {
    taosArrayDestroy(pIter->pSkyline);
  }

  if (pIter->pMemDelData) {
    taosArrayDestroy(pIter->pMemDelData);
  }
}
3684

3685
// iterate next row non deleted backward ts, version (from high to low)
3686
static int32_t nextRowIterGet(CacheNextRowIter *pIter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast,
1,161✔
3687
                              int16_t *aCols, int nCols) {
3688
  int32_t code = 0, lino = 0;
1,161✔
3689

3690
  for (;;) {
2✔
3691
    for (int i = 0; i < 3; ++i) {
4,649✔
3692
      if (pIter->input[i].next && !pIter->input[i].stop) {
3,488!
3693
        TAOS_CHECK_GOTO(pIter->input[i].nextRowFn(pIter->input[i].iter, &pIter->input[i].pRow,
1,734!
3694
                                                  &pIter->input[i].ignoreEarlierTs, isLast, aCols, nCols),
3695
                        &lino, _err);
3696

3697
        if (pIter->input[i].pRow == NULL) {
1,732✔
3698
          pIter->input[i].stop = true;
589✔
3699
          pIter->input[i].next = false;
589✔
3700
        }
3701
      }
3702
    }
3703

3704
    if (pIter->input[0].stop && pIter->input[1].stop && pIter->input[2].stop) {
1,161!
3705
      *ppRow = NULL;
19✔
3706
      *pIgnoreEarlierTs =
19✔
3707
          (pIter->input[0].ignoreEarlierTs || pIter->input[1].ignoreEarlierTs || pIter->input[2].ignoreEarlierTs);
19!
3708

3709
      TAOS_RETURN(code);
1,163✔
3710
    }
3711

3712
    // select maxpoint(s) from mem, imem, fs and last
3713
    TSDBROW *max[4] = {0};
1,142✔
3714
    int      iMax[4] = {-1, -1, -1, -1};
1,142✔
3715
    int      nMax = 0;
1,142✔
3716
    SRowKey  maxKey = {.ts = TSKEY_MIN};
1,142✔
3717

3718
    for (int i = 0; i < 3; ++i) {
4,576✔
3719
      if (!pIter->input[i].stop && pIter->input[i].pRow != NULL) {
3,433!
3720
        STsdbRowKey tsdbRowKey = {0};
1,145✔
3721
        tsdbRowGetKey(pIter->input[i].pRow, &tsdbRowKey);
1,145✔
3722

3723
        // merging & deduplicating on client side
3724
        int c = tRowKeyCompare(&maxKey, &tsdbRowKey.key);
1,146✔
3725
        if (c <= 0) {
1,146!
3726
          if (c < 0) {
1,146!
3727
            nMax = 0;
1,146✔
3728
            maxKey = tsdbRowKey.key;
1,146✔
3729
          }
3730

3731
          iMax[nMax] = i;
1,146✔
3732
          max[nMax++] = pIter->input[i].pRow;
1,146✔
3733
        }
3734
        pIter->input[i].next = false;
1,146✔
3735
      }
3736
    }
3737

3738
    // delete detection
3739
    TSDBROW *merge[4] = {0};
1,143✔
3740
    int      iMerge[4] = {-1, -1, -1, -1};
1,143✔
3741
    int      nMerge = 0;
1,143✔
3742
    for (int i = 0; i < nMax; ++i) {
2,288✔
3743
      TSDBKEY maxKey1 = TSDBROW_KEY(max[i]);
1,145✔
3744

3745
      if (!pIter->pSkyline) {
1,145✔
3746
        pIter->pSkyline = taosArrayInit(32, sizeof(TSDBKEY));
1,134✔
3747
        TSDB_CHECK_NULL(pIter->pSkyline, code, lino, _err, terrno);
1,133!
3748

3749
        uint64_t        uid = pIter->idx.uid;
1,133✔
3750
        STableLoadInfo *pInfo = getTableLoadInfo(pIter->pr, uid);
1,133✔
3751
        TSDB_CHECK_NULL(pInfo, code, lino, _err, TSDB_CODE_OUT_OF_MEMORY);
1,134!
3752

3753
        if (pInfo->pTombData == NULL) {
1,134✔
3754
          pInfo->pTombData = taosArrayInit(4, sizeof(SDelData));
1,130✔
3755
          TSDB_CHECK_NULL(pInfo->pTombData, code, lino, _err, terrno);
1,130!
3756
        }
3757

3758
        if (!taosArrayAddAll(pInfo->pTombData, pIter->pMemDelData)) {
1,134!
3759
          TAOS_CHECK_GOTO(terrno, &lino, _err);
×
3760
        }
3761

3762
        size_t delSize = TARRAY_SIZE(pInfo->pTombData);
1,134✔
3763
        if (delSize > 0) {
1,134✔
3764
          code = tsdbBuildDeleteSkyline(pInfo->pTombData, 0, (int32_t)(delSize - 1), pIter->pSkyline);
4✔
3765
          TAOS_CHECK_GOTO(code, &lino, _err);
4!
3766
        }
3767
        pIter->iSkyline = taosArrayGetSize(pIter->pSkyline) - 1;
1,134✔
3768
      }
3769

3770
      bool deleted = tsdbKeyDeleted(&maxKey1, pIter->pSkyline, &pIter->iSkyline);
1,145✔
3771
      if (!deleted) {
1,145✔
3772
        iMerge[nMerge] = iMax[i];
1,144✔
3773
        merge[nMerge++] = max[i];
1,144✔
3774
      }
3775

3776
      pIter->input[iMax[i]].next = deleted;
1,145✔
3777
    }
3778

3779
    if (nMerge > 0) {
1,143✔
3780
      pIter->input[iMerge[0]].next = true;
1,141✔
3781

3782
      *ppRow = merge[0];
1,141✔
3783

3784
      TAOS_RETURN(code);
1,141✔
3785
    }
3786
  }
3787

3788
_err:
×
3789
  if (code) {
×
3790
    tsdbError("tsdb/cache: %s failed at line %d since %s.", __func__, lino, tstrerror(code));
×
3791
  }
3792

3793
  TAOS_RETURN(code);
×
3794
}
3795

3796
/*
 * Build an array of nCols SLastCol entries, one per requested schema slot,
 * each initialised to a NULL column value with row key timestamp 0.
 *
 * @param pTSchema    schema the slot ids refer to
 * @param ppColArray  [out] receives the new array on success; untouched on failure
 * @param slotIds     schema slot index of each requested column
 * @param nCols       number of entries in slotIds
 * @return TSDB_CODE_SUCCESS, or the terrno reported by the failing array call
 */
static int32_t initLastColArrayPartial(STSchema *pTSchema, SArray **ppColArray, int16_t *slotIds, int nCols) {
  SArray *pColArray = taosArrayInit(nCols, sizeof(SLastCol));
  if (NULL == pColArray) {
    TAOS_RETURN(terrno);
  }

  for (int32_t i = 0; i < nCols; ++i) {
    int16_t  slotId = slotIds[i];
    SLastCol col = {.rowKey.ts = 0,
                    .colVal = COL_VAL_NULL(pTSchema->columns[slotId].colId, pTSchema->columns[slotId].type)};
    if (!taosArrayPush(pColArray, &col)) {
      // capture the error first, then release the partially filled array instead of leaking it
      int32_t code = terrno;
      taosArrayDestroy(pColArray);
      TAOS_RETURN(code);
    }
  }
  *ppColArray = pColArray;

  TAOS_RETURN(TSDB_CODE_SUCCESS);
}
3814

3815
/*
 * Compute the "last" value of each requested column of one table.
 *
 * Rows are visited newest first through a CacheNextRowIter. The first row
 * fills every requested column; while some column still has no value
 * (COL_VAL_IS_VALUE is false), older rows are visited and fill only those
 * columns. Columns that obtained a value are removed from the column id list
 * handed to the iterator.
 *
 * @param uid          table uid
 * @param pTsdb        tsdb instance (used for reading and for logging)
 * @param ppLastArray  [out] array of SLastCol, indexed like slotIds. If no row was
 *                     found it is NULL when the iterator reported ignoreEarlierTs,
 *                     otherwise an emptied array. Set to NULL on error.
 * @param pr           cache rows reader providing schema, snapshot and suid
 * @param aCols        column ids of the requested columns
 * @param nCols        number of requested columns
 * @param slotIds      schema slot index of each requested column
 * @return 0 on success, otherwise an error code
 */
static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols,
                            int nCols, int16_t *slotIds) {
  int32_t   code = 0, lino = 0;
  STSchema *pTSchema = pr->pSchema;  // metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1, 1);
  int16_t   nLastCol = nCols;
  int16_t   noneCol = 0;
  bool      setNoneCol = false;
  bool      hasRow = false;
  bool      ignoreEarlierTs = false;
  SArray   *pColArray = NULL;
  SColVal  *pColVal = &(SColVal){0};

  TAOS_CHECK_RETURN(initLastColArrayPartial(pTSchema, &pColArray, slotIds, nCols));

  SArray *aColArray = taosArrayInit(nCols, sizeof(int16_t));
  if (NULL == aColArray) {
    code = terrno;
    taosArrayDestroy(pColArray);

    TAOS_RETURN(code);
  }

  for (int i = 0; i < nCols; ++i) {
    if (!taosArrayPush(aColArray, &aCols[i])) {
      // release both arrays; the column id list used to leak here
      code = terrno;
      taosArrayDestroy(pColArray);
      taosArrayDestroy(aColArray);

      TAOS_RETURN(code);
    }
  }

  STsdbRowKey lastRowKey = {.key.ts = TSKEY_MAX};

  // inverse iterator
  CacheNextRowIter iter = {0};
  code =
      nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->info.suid, pr->pLDataIterArray, pr->pReadSnap, pr->lastTs, pr);
  TAOS_CHECK_GOTO(code, &lino, _err);

  do {
    TSDBROW *pRow = NULL;
    // an iterator failure goes through _err so that no partially filled array is published
    // together with an error code
    TAOS_CHECK_GOTO(
        nextRowIterGet(&iter, &pRow, &ignoreEarlierTs, true, TARRAY_DATA(aColArray), TARRAY_SIZE(aColArray)), &lino,
        _err);

    if (!pRow) {
      break;
    }

    hasRow = true;

    int32_t sversion = TSDBROW_SVERSION(pRow);
    if (sversion != -1) {
      TAOS_CHECK_GOTO(updateTSchema(sversion, pr, uid), &lino, _err);

      pTSchema = pr->pCurrSchema;
    }
    // int16_t nCol = pTSchema->numOfCols;

    STsdbRowKey rowKey = {0};
    tsdbRowGetKey(pRow, &rowKey);

    if (lastRowKey.key.ts == TSKEY_MAX) {  // first time
      lastRowKey = rowKey;

      for (int16_t iCol = noneCol; iCol < nCols; ++iCol) {
        if (iCol >= nLastCol) {
          break;
        }
        SLastCol *pCol = taosArrayGet(pColArray, iCol);
        if (slotIds[iCol] > pTSchema->numOfCols - 1) {
          // the slot does not exist in this row's schema version
          if (!setNoneCol) {
            noneCol = iCol;
            setNoneCol = true;
          }
          continue;
        }
        if (pCol->colVal.cid != pTSchema->columns[slotIds[iCol]].colId) {
          continue;
        }
        if (slotIds[iCol] == 0) {
          // slot 0 is filled from the row key timestamp instead of the row payload
          STColumn *pTColumn = &pTSchema->columns[0];
          SValue    val = {.type = pTColumn->type};
          VALUE_SET_TRIVIAL_DATUM(&val, rowKey.key.ts);
          *pColVal = COL_VAL_VALUE(pTColumn->colId, val);

          SLastCol colTmp = {.rowKey = rowKey.key, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID};
          TAOS_CHECK_GOTO(tsdbCacheReallocSLastCol(&colTmp, NULL), &lino, _err);

          // store at the entry that was just matched (the original always wrote index 0,
          // which would clobber another column whenever this one is not requested first)
          taosArraySet(pColArray, iCol, &colTmp);
          continue;
        }
        tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal);

        *pCol = (SLastCol){.rowKey = rowKey.key, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID};
        TAOS_CHECK_GOTO(tsdbCacheReallocSLastCol(pCol, NULL), &lino, _err);

        if (!COL_VAL_IS_VALUE(pColVal)) {
          // remember the first column that still needs a value from an older row
          if (!setNoneCol) {
            noneCol = iCol;
            setNoneCol = true;
          }
        } else {
          // the column is resolved: stop asking the iterator for it
          int32_t aColIndex = taosArraySearchIdx(aColArray, &pColVal->cid, compareInt16Val, TD_EQ);
          if (aColIndex >= 0) {
            taosArrayRemove(aColArray, aColIndex);
          }
        }
      }
      if (!setNoneCol) {
        // done, goto return pColArray
        break;
      } else {
        continue;
      }
    }

    // merge into pColArray
    setNoneCol = false;
    for (int16_t iCol = noneCol; iCol < nCols; ++iCol) {
      if (iCol >= nLastCol) {
        break;
      }
      // high version's column value
      if (slotIds[iCol] > pTSchema->numOfCols - 1) {
        continue;
      }

      SLastCol *lastColVal = (SLastCol *)taosArrayGet(pColArray, iCol);
      if (lastColVal->colVal.cid != pTSchema->columns[slotIds[iCol]].colId) {
        continue;
      }
      SColVal *tColVal = &lastColVal->colVal;
      if (COL_VAL_IS_VALUE(tColVal)) continue;

      tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal);
      if (COL_VAL_IS_VALUE(pColVal)) {
        SLastCol lastCol = {.rowKey = rowKey.key, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID};
        TAOS_CHECK_GOTO(tsdbCacheReallocSLastCol(&lastCol, NULL), &lino, _err);

        tsdbCacheFreeSLastColItem(lastColVal);
        taosArraySet(pColArray, iCol, &lastCol);
        int32_t aColIndex = taosArraySearchIdx(aColArray, &lastCol.colVal.cid, compareInt16Val, TD_EQ);
        if (aColIndex >= 0) {
          taosArrayRemove(aColArray, aColIndex);
        }
      } else if (!COL_VAL_IS_VALUE(pColVal) && !setNoneCol) {
        noneCol = iCol;
        setNoneCol = true;
      }
    }
  } while (setNoneCol);

  if (!hasRow) {
    if (ignoreEarlierTs) {
      taosArrayDestroy(pColArray);
      pColArray = NULL;
    } else {
      taosArrayClear(pColArray);
    }
  }
  *ppLastArray = pColArray;

  nextRowIterClose(&iter);
  taosArrayDestroy(aColArray);

  TAOS_RETURN(code);

_err:
  nextRowIterClose(&iter);
  // taosMemoryFreeClear(pTSchema);
  *ppLastArray = NULL;
  taosArrayDestroyEx(pColArray, tsdbCacheFreeSLastColItem);
  taosArrayDestroy(aColArray);

  if (code) {
    tsdbError("tsdb/cache: vgId:%d, %s failed at line %d since %s.", TD_VID(pTsdb->pVnode), __func__, lino,
              tstrerror(code));
  }

  TAOS_RETURN(code);
}
3993

3994
/*
 * Collect the values of the requested columns from the first row produced by
 * the cache row iterator for table `uid`.
 *
 * @param uid          table uid handed to the row iterator and to updateTSchema()
 * @param pTsdb        tsdb instance (also used for error logging)
 * @param ppLastArray  out: array of SLastCol, one entry per requested column.
 *                     Set to NULL on error, and also when no row was found and
 *                     the iterator reported `ignoreEarlierTs`; cleared (empty)
 *                     when no row was found otherwise.
 * @param pr           reader providing the schema, read snapshot and iterator state
 * @param aCols        column ids of the requested columns (nCols entries)
 * @param nCols        number of requested columns
 * @param slotIds      schema slot of each requested column (nCols entries)
 * @return 0 on success, otherwise the error code of the failing step.
 */
static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols,
                               int nCols, int16_t *slotIds) {
  int32_t          code = 0, lino = 0;
  STSchema        *pTSchema = pr->pSchema;
  bool             hasRow = false;
  bool             ignoreEarlierTs = false;
  SArray          *pColArray = NULL;
  SArray          *aColArray = NULL;
  SColVal          colVal = {0};
  SColVal         *pColVal = &colVal;
  TSDBROW         *pRow = NULL;
  CacheNextRowIter iter = {0};

  TAOS_CHECK_RETURN(initLastColArrayPartial(pTSchema, &pColArray, slotIds, nCols));

  aColArray = taosArrayInit(nCols, sizeof(int16_t));
  if (NULL == aColArray) {
    taosArrayDestroy(pColArray);

    TAOS_RETURN(terrno);
  }

  for (int i = 0; i < nCols; ++i) {
    if (!taosArrayPush(aColArray, &aCols[i])) {
      // capture the error first, then release BOTH arrays (aColArray used to leak here)
      code = terrno;
      taosArrayDestroy(aColArray);
      taosArrayDestroy(pColArray);

      TAOS_RETURN(code);
    }
  }

  // inverse iterator
  code =
      nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->info.suid, pr->pLDataIterArray, pr->pReadSnap, pr->lastTs, pr);
  TAOS_CHECK_GOTO(code, &lino, _err);

  // only the first row returned by the iterator is consumed
  code = nextRowIterGet(&iter, &pRow, &ignoreEarlierTs, false, TARRAY_DATA(aColArray), TARRAY_SIZE(aColArray));
  // do not silently drop an iterator failure
  TAOS_CHECK_GOTO(code, &lino, _err);

  if (pRow != NULL) {
    hasRow = true;

    int32_t sversion = TSDBROW_SVERSION(pRow);
    if (sversion != -1) {
      // the row carries its own schema version: switch to the matching schema
      TAOS_CHECK_GOTO(updateTSchema(sversion, pr, uid), &lino, _err);

      pTSchema = pr->pCurrSchema;
    }

    STsdbRowKey rowKey = {0};
    tsdbRowGetKey(pRow, &rowKey);

    for (int iCol = 0; iCol < nCols; ++iCol) {
      SLastCol *pCol = taosArrayGet(pColArray, iCol);
      int16_t   slotId = slotIds[iCol];

      // slot does not exist in the schema this row was written with
      if (slotId > pTSchema->numOfCols - 1) {
        continue;
      }
      // slot exists but holds a different column id
      if (pCol->colVal.cid != pTSchema->columns[slotId].colId) {
        continue;
      }

      if (slotId == 0) {
        // slot 0 is filled from the row key timestamp rather than from the row payload
        STColumn *pTColumn = &pTSchema->columns[0];
        SValue    val = {.type = pTColumn->type};
        VALUE_SET_TRIVIAL_DATUM(&val, rowKey.key.ts);
        *pColVal = COL_VAL_VALUE(pTColumn->colId, val);

        SLastCol colTmp = {.rowKey = rowKey.key, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID};
        TAOS_CHECK_GOTO(tsdbCacheReallocSLastCol(&colTmp, NULL), &lino, _err);

        // NOTE(review): this writes to index 0, not iCol; presumably slot 0 is always
        // requested first - confirm against callers.
        taosArraySet(pColArray, 0, &colTmp);
        continue;
      }

      tsdbRowGetColVal(pRow, pTSchema, slotId, pColVal);

      *pCol = (SLastCol){.rowKey = rowKey.key, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID};
      TAOS_CHECK_GOTO(tsdbCacheReallocSLastCol(pCol, NULL), &lino, _err);

      // this column is resolved: drop it from the pending column-id list
      int32_t aColIndex = taosArraySearchIdx(aColArray, &pColVal->cid, compareInt16Val, TD_EQ);
      if (aColIndex >= 0) {
        taosArrayRemove(aColArray, aColIndex);
      }
    }
  }

  if (!hasRow) {
    if (ignoreEarlierTs) {
      taosArrayDestroy(pColArray);
      pColArray = NULL;
    } else {
      taosArrayClear(pColArray);
    }
  }
  *ppLastArray = pColArray;

  nextRowIterClose(&iter);
  taosArrayDestroy(aColArray);

  TAOS_RETURN(code);

_err:
  nextRowIterClose(&iter);

  *ppLastArray = NULL;
  taosArrayDestroyEx(pColArray, tsdbCacheFreeSLastColItem);
  taosArrayDestroy(aColArray);

  if (code) {
    tsdbError("tsdb/cache: vgId:%d, %s failed at line %d since %s.", TD_VID(pTsdb->pVnode), __func__, lino,
              tstrerror(code));
  }

  TAOS_RETURN(code);
}
4116

4117
/* Release handle `h` on `pCache`, passing eraseIfLastRef = false. */
void tsdbCacheRelease(SLRUCache *pCache, LRUHandle *h) {
  const bool eraseIfLastRef = false;

  tsdbLRUCacheRelease(pCache, h, eraseIfLastRef);
}
×
4118

4119
/* Set the capacity of the vnode's tsdb LRU cache to `capacity`. */
void tsdbCacheSetCapacity(SVnode *pVnode, size_t capacity) {
  STsdb *pTsdb = pVnode->pTsdb;

  taosLRUCacheSetCapacity(pTsdb->lruCache, capacity);
}
165✔
4122

4123
#ifdef BUILD_NO_CALL
4124
/* Return the capacity reported by the vnode's tsdb LRU cache. */
size_t tsdbCacheGetCapacity(SVnode *pVnode) {
  STsdb *pTsdb = pVnode->pTsdb;

  return taosLRUCacheGetCapacity(pTsdb->lruCache);
}
4125
#endif
4126

4127
/*
 * Return the usage reported by the vnode's tsdb LRU cache,
 * or 0 when the vnode has no tsdb attached.
 */
size_t tsdbCacheGetUsage(SVnode *pVnode) {
  if (pVnode->pTsdb == NULL) {
    return 0;
  }

  return taosLRUCacheGetUsage(pVnode->pTsdb->lruCache);
}
4135

4136
/*
 * Return the element count reported by the vnode's tsdb LRU cache,
 * or 0 when the vnode has no tsdb attached.
 */
int32_t tsdbCacheGetElems(SVnode *pVnode) {
  if (pVnode->pTsdb == NULL) {
    return 0;
  }

  return taosLRUCacheGetElems(pVnode->pTsdb->lruCache);
}
4144

4145
#ifdef USE_SHARED_STORAGE
4146
// block cache
4147
/*
 * Serialize (fid, commitID, blkno) into `key` as a fixed-size binary cache key.
 *
 * The key keeps the in-memory layout of the struct below, so its length is
 * unchanged. Because the key is an opaque byte string, every byte must be
 * deterministic: the buffer is zero-filled first and each field is then copied
 * to its offset, so padding bytes between the fields are always zero instead of
 * depending on what the compiler leaves in struct padding.
 *
 * @param fid       file id
 * @param commitID  commit id
 * @param blkno     block (or page) number
 * @param key       out: buffer of at least sizeof(layout) bytes
 * @param len       out: number of key bytes written
 */
static void getBCacheKey(int32_t fid, int64_t commitID, int64_t blkno, char *key, int *len) {
  // used for its layout only; never read as a value
  struct {
    int32_t fid;
    int64_t commitID;
    int64_t blkno;
  } layout;

  const size_t offCommitID = (size_t)((const char *)&layout.commitID - (const char *)&layout);
  const size_t offBlkno = (size_t)((const char *)&layout.blkno - (const char *)&layout);

  memset(key, 0, sizeof(layout));
  memcpy(key, &fid, sizeof(fid));
  memcpy(key + offCommitID, &commitID, sizeof(commitID));
  memcpy(key + offBlkno, &blkno, sizeof(blkno));

  *len = (int)sizeof(layout);
}
×
4161

4162
/*
 * Read the block numbered `pFD->blkno` of `pFD->objName` through
 * tssReadFileFromDefault() into a freshly allocated buffer.
 *
 * @param pFD      file descriptor supplying blkno, szPage and objName
 * @param ppBlock  out: the allocated buffer on success; left untouched on failure.
 *                 The caller owns the buffer and releases it with taosMemoryFree().
 * @return 0 on success, TSDB_CODE_OUT_OF_MEMORY when the buffer cannot be
 *         allocated, otherwise the error returned by the read.
 */
static int32_t tsdbCacheLoadBlockSs(STsdbFD *pFD, uint8_t **ppBlock) {
  int32_t code = 0;

  // widen before multiplying so the product cannot overflow a narrower integer type
  int64_t block_size = (int64_t)tsSsBlockSize * pFD->szPage;
  // blkno is 1-based here: block 1 starts at offset 0
  int64_t block_offset = (pFD->blkno - 1) * block_size;

  char *buf = taosMemoryMalloc(block_size);
  if (buf == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // TODO: pFD->objName is not initialized, but this function is never called.
  code = tssReadFileFromDefault(pFD->objName, block_offset, buf, &block_size);
  if (code != TSDB_CODE_SUCCESS) {
    taosMemoryFree(buf);
    return code;
  }

  *ppBlock = (uint8_t *)buf;
  return code;
}
4185

4186
/* LRU deleter callback: frees the cached buffer passed as `value`. */
static void deleteBCache(const void *key, size_t keyLen, void *value, void *ud) {
  (void)key;
  (void)keyLen;
  (void)ud;

  taosMemoryFree(value);
}
×
4192

4193
/*
 * Look up the block identified by (pFD->fid, pFD->cid, pFD->blkno) in `pCache`,
 * loading it with tsdbCacheLoadBlockSs() and inserting it on a miss.
 *
 * The first lookup runs without the lock; on a miss the lookup is repeated
 * under pTsdb->bMutex before loading.
 *
 * @param pCache  cache to search and populate
 * @param pFD     file descriptor identifying the block
 * @param handle  out: handle of the cached block, or NULL on failure
 * @return 0 on success, otherwise the load error (TSDB_CODE_OUT_OF_MEMORY when
 *         the load reported success but produced no buffer).
 */
int32_t tsdbCacheGetBlockSs(SLRUCache *pCache, STsdbFD *pFD, LRUHandle **handle) {
  int32_t code = 0;
  char    key[128] = {0};
  int     keyLen = 0;

  getBCacheKey(pFD->fid, pFD->cid, pFD->blkno, key, &keyLen);

  LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen);
  if (h != NULL) {
    *handle = h;

    TAOS_RETURN(code);
  }

  STsdb *pTsdb = pFD->pTsdb;
  (void)taosThreadMutexLock(&pTsdb->bMutex);

  // re-check under the lock before loading
  h = taosLRUCacheLookup(pCache, key, keyLen);
  if (h == NULL) {
    uint8_t *pBlock = NULL;

    code = tsdbCacheLoadBlockSs(pFD, &pBlock);
    if (code == TSDB_CODE_SUCCESS && pBlock == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
    }
    if (code != TSDB_CODE_SUCCESS) {
      (void)taosThreadMutexUnlock(&pTsdb->bMutex);

      *handle = NULL;

      TAOS_RETURN(code);
    }

    // widen before multiplying so the product cannot overflow a narrower integer type
    size_t              charge = (size_t)((int64_t)tsSsBlockSize * pFD->szPage);
    _taos_lru_deleter_t deleter = deleteBCache;
    LRUStatus           status =
        taosLRUCacheInsert(pCache, key, keyLen, pBlock, charge, deleter, NULL, &h, TAOS_LRU_PRIORITY_LOW, NULL);
    // An insert failure is deliberately not turned into an error code.
    // NOTE(review): if the insert leaves `h` NULL, this returns success with a NULL
    // handle - confirm that callers check the handle.
    (void)status;
  }

  (void)taosThreadMutexUnlock(&pTsdb->bMutex);

  *handle = h;

  TAOS_RETURN(code);
}
4236

4237
/*
 * Look up page `pgno` of the file described by `pFD` in `pCache`.
 *
 * @param handle  out: result of the cache lookup (written only when shared
 *                storage is enabled)
 * @return TSDB_CODE_OPS_NOT_SUPPORT when tsSsEnabled is false, otherwise 0.
 */
int32_t tsdbCacheGetPageSs(SLRUCache *pCache, STsdbFD *pFD, int64_t pgno, LRUHandle **handle) {
  char pageKey[128] = {0};
  int  pageKeyLen = 0;

  if (!tsSsEnabled) {
    return TSDB_CODE_OPS_NOT_SUPPORT;
  }

  getBCacheKey(pFD->fid, pFD->cid, pgno, pageKey, &pageKeyLen);
  *handle = taosLRUCacheLookup(pCache, pageKey, pageKeyLen);

  return 0;
}
4251

4252
/*
 * Store a private copy of page `pgno` in `pCache` unless it is already cached.
 *
 * Best effort: does nothing when tsSsEnabled is false, and allocation or insert
 * failures are ignored. Lookup, insert and release all run against `pCache`
 * (the lookup and release previously used pFD->pTsdb->pgCache while the insert
 * used `pCache`), serialized by pFD->pTsdb->pgMutex.
 *
 * @param pCache  cache receiving the page
 * @param pFD     file descriptor identifying the file and its page size
 * @param pgno    page number
 * @param pPage   source page data; pFD->szPage bytes are copied
 */
void tsdbCacheSetPageSs(SLRUCache *pCache, STsdbFD *pFD, int64_t pgno, uint8_t *pPage) {
  if (!tsSsEnabled) {
    return;
  }

  char       key[128] = {0};
  int        keyLen = 0;
  LRUHandle *handle = NULL;
  STsdb     *pTsdb = pFD->pTsdb;

  getBCacheKey(pFD->fid, pFD->cid, pgno, key, &keyLen);
  (void)taosThreadMutexLock(&pTsdb->pgMutex);
  handle = taosLRUCacheLookup(pCache, key, keyLen);
  if (!handle) {
    size_t              charge = pFD->szPage;
    _taos_lru_deleter_t deleter = deleteBCache;
    uint8_t            *pPg = taosMemoryMalloc(charge);
    if (!pPg) {
      (void)taosThreadMutexUnlock(&pTsdb->pgMutex);

      return;  // ignore error with ss cache and leave error untouched
    }
    memcpy(pPg, pPage, charge);

    // ignore cache updating if not ok
    // NOTE(review): presumably the cache runs `deleter` on pPg when the insert fails - confirm.
    (void)taosLRUCacheInsert(pCache, key, keyLen, pPg, charge, deleter, NULL, &handle, TAOS_LRU_PRIORITY_LOW, NULL);
  }
  (void)taosThreadMutexUnlock(&pTsdb->pgMutex);

  // a failed insert can leave the handle NULL; there is nothing to release then
  if (handle != NULL) {
    tsdbCacheRelease(pCache, handle);
  }
}
4286
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc