• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4473

08 Jul 2025 09:38AM UTC coverage: 62.922% (+0.7%) from 62.22%
#4473

push

travis-ci

web-flow
Merge pull request #31712 from taosdata/merge/mainto3.0

merge: from main to 3.0 branch

158525 of 321496 branches covered (49.31%)

Branch coverage included in aggregate %.

56 of 60 new or added lines in 13 files covered. (93.33%)

1333 existing lines in 67 files now uncovered.

245526 of 320647 relevant lines covered (76.57%)

17689640.25 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

61.87
/source/dnode/vnode/src/tsdb/tsdbCache.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#include "functionMgt.h"
16
#include "tcs.h"
17
#include "tsdb.h"
18
#include "tsdbDataFileRW.h"
19
#include "tsdbIter.h"
20
#include "tsdbReadUtil.h"
21
#include "vnd.h"
22

23
#define ROCKS_BATCH_SIZE (4096)
24

25
void tsdbLRUCacheRelease(SLRUCache *cache, LRUHandle *handle, bool eraseIfLastRef) {
151,803✔
26
  if (!taosLRUCacheRelease(cache, handle, eraseIfLastRef)) {
151,803✔
27
    tsdbTrace(" release lru cache failed");
48,896✔
28
  }
29
}
152,337✔
30

31
static int32_t tsdbOpenBCache(STsdb *pTsdb) {
15,630✔
32
  int32_t code = 0, lino = 0;
15,630✔
33
#ifdef USE_S3
34
  int32_t    szPage = pTsdb->pVnode->config.tsdbPageSize;
15,630✔
35
  int64_t    szBlock = tsS3BlockSize <= 1024 ? 1024 : tsS3BlockSize;
15,630✔
36
  SLRUCache *pCache = taosLRUCacheInit((int64_t)tsS3BlockCacheSize * szBlock * szPage, 0, .5);
15,630✔
37
  if (pCache == NULL) {
15,686!
38
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err);
×
39
  }
40

41
  taosLRUCacheSetStrictCapacity(pCache, false);
15,686✔
42

43
  (void)taosThreadMutexInit(&pTsdb->bMutex, NULL);
15,685✔
44

45
  pTsdb->bCache = pCache;
15,686✔
46

47
_err:
15,686✔
48
  if (code) {
15,686!
49
    tsdbError("tsdb/bcache: vgId:%d, %s failed at line %d since %s.", TD_VID(pTsdb->pVnode), __func__, lino,
×
50
              tstrerror(code));
51
  }
52
#endif
53
  TAOS_RETURN(code);
15,686✔
54
}
55

56
static void tsdbCloseBCache(STsdb *pTsdb) {
15,687✔
57
#ifdef USE_S3
58
  SLRUCache *pCache = pTsdb->bCache;
15,687✔
59
  if (pCache) {
15,687!
60
    int32_t elems = taosLRUCacheGetElems(pCache);
15,687✔
61
    tsdbTrace("vgId:%d, elems: %d", TD_VID(pTsdb->pVnode), elems);
15,687✔
62
    taosLRUCacheEraseUnrefEntries(pCache);
15,687✔
63
    elems = taosLRUCacheGetElems(pCache);
15,687✔
64
    tsdbTrace("vgId:%d, elems: %d", TD_VID(pTsdb->pVnode), elems);
15,687✔
65

66
    taosLRUCacheCleanup(pCache);
15,687✔
67

68
    (void)taosThreadMutexDestroy(&pTsdb->bMutex);
15,687✔
69
  }
70
#endif
71
}
15,687✔
72

73
static int32_t tsdbOpenPgCache(STsdb *pTsdb) {
15,684✔
74
  int32_t code = 0, lino = 0;
15,684✔
75
#ifdef USE_S3
76
  int32_t szPage = pTsdb->pVnode->config.tsdbPageSize;
15,684✔
77

78
  SLRUCache *pCache = taosLRUCacheInit((int64_t)tsS3PageCacheSize * szPage, 0, .5);
15,684✔
79
  if (pCache == NULL) {
15,686!
80
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err);
×
81
  }
82

83
  taosLRUCacheSetStrictCapacity(pCache, false);
15,686✔
84

85
  (void)taosThreadMutexInit(&pTsdb->pgMutex, NULL);
15,686✔
86

87
  pTsdb->pgCache = pCache;
15,685✔
88

89
_err:
15,685✔
90
  if (code) {
15,685!
91
    tsdbError("tsdb/pgcache: vgId:%d, open failed at line %d since %s.", TD_VID(pTsdb->pVnode), lino, tstrerror(code));
×
92
  }
93
#endif
94
  TAOS_RETURN(code);
15,685✔
95
}
96

97
static void tsdbClosePgCache(STsdb *pTsdb) {
15,687✔
98
#ifdef USE_S3
99
  SLRUCache *pCache = pTsdb->pgCache;
15,687✔
100
  if (pCache) {
15,687!
101
    int32_t elems = taosLRUCacheGetElems(pCache);
15,687✔
102
    tsdbTrace("vgId:%d, elems: %d", TD_VID(pTsdb->pVnode), elems);
15,687✔
103
    taosLRUCacheEraseUnrefEntries(pCache);
15,687✔
104
    elems = taosLRUCacheGetElems(pCache);
15,687✔
105
    tsdbTrace("vgId:%d, elems: %d", TD_VID(pTsdb->pVnode), elems);
15,687✔
106

107
    taosLRUCacheCleanup(pCache);
15,687✔
108

109
    (void)taosThreadMutexDestroy(&pTsdb->bMutex);
15,687✔
110
  }
111
#endif
112
}
15,687✔
113

114
#define ROCKS_KEY_LEN (sizeof(tb_uid_t) + sizeof(int16_t) + sizeof(int8_t))
115

116
enum {
117
  LFLAG_LAST_ROW = 0,
118
  LFLAG_LAST = 1,
119
};
120

121
typedef struct {
122
  tb_uid_t uid;
123
  int16_t  cid;
124
  int8_t   lflag;
125
} SLastKey;
126

127
#define IS_LAST_ROW_KEY(k) (((k).lflag & LFLAG_LAST) == LFLAG_LAST_ROW)
128
#define IS_LAST_KEY(k)     (((k).lflag & LFLAG_LAST) == LFLAG_LAST)
129

130
static void tsdbGetRocksPath(STsdb *pTsdb, char *path) {
15,404✔
131
  SVnode *pVnode = pTsdb->pVnode;
15,404✔
132
  vnodeGetPrimaryDir(pTsdb->path, pVnode->diskPrimary, pVnode->pTfs, path, TSDB_FILENAME_LEN);
15,404✔
133

134
  int32_t offset = strlen(path);
15,641✔
135
  snprintf(path + offset, TSDB_FILENAME_LEN - offset - 1, "%scache.rdb", TD_DIRSEP);
15,641✔
136
}
15,641✔
137

138
static const char *myCmpName(void *state) {
82,993✔
139
  (void)state;
140
  return "myCmp";
82,993✔
141
}
142

143
static void myCmpDestroy(void *state) { (void)state; }
15,678✔
144

145
static int myCmp(void *state, const char *a, size_t alen, const char *b, size_t blen) {
9,096,935✔
146
  (void)state;
147
  (void)alen;
148
  (void)blen;
149
  SLastKey *lhs = (SLastKey *)a;
9,096,935✔
150
  SLastKey *rhs = (SLastKey *)b;
9,096,935✔
151

152
  if (lhs->uid < rhs->uid) {
9,096,935✔
153
    return -1;
5,682,406✔
154
  } else if (lhs->uid > rhs->uid) {
3,414,529✔
155
    return 1;
439,837✔
156
  }
157

158
  if (lhs->cid < rhs->cid) {
2,974,692✔
159
    return -1;
1,937,876✔
160
  } else if (lhs->cid > rhs->cid) {
1,036,816✔
161
    return 1;
363,433✔
162
  }
163

164
  if ((lhs->lflag & LFLAG_LAST) < (rhs->lflag & LFLAG_LAST)) {
673,383✔
165
    return -1;
451,107✔
166
  } else if ((lhs->lflag & LFLAG_LAST) > (rhs->lflag & LFLAG_LAST)) {
222,276!
167
    return 1;
276,705✔
168
  }
169

170
  return 0;
×
171
}
172

173
static int32_t tsdbOpenRocksCache(STsdb *pTsdb) {
15,686✔
174
  int32_t code = 0, lino = 0;
15,686✔
175
#ifdef USE_ROCKSDB
176
  rocksdb_comparator_t *cmp = rocksdb_comparator_create(NULL, myCmpDestroy, myCmp, myCmpName);
15,686✔
177
  if (NULL == cmp) {
15,677!
178
    TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
×
179
  }
180

181
  rocksdb_block_based_table_options_t *tableoptions = rocksdb_block_based_options_create();
15,677✔
182
  pTsdb->rCache.tableoptions = tableoptions;
15,678✔
183

184
  rocksdb_options_t *options = rocksdb_options_create();
15,678✔
185
  if (NULL == options) {
15,634!
186
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err);
×
187
  }
188

189
  rocksdb_options_set_create_if_missing(options, 1);
15,634✔
190
  rocksdb_options_set_comparator(options, cmp);
15,497✔
191
  rocksdb_options_set_block_based_table_factory(options, tableoptions);
15,520✔
192
  rocksdb_options_set_info_log_level(options, 2);  // WARN_LEVEL
15,548✔
193
  // rocksdb_options_set_inplace_update_support(options, 1);
194
  // rocksdb_options_set_allow_concurrent_memtable_write(options, 0);
195

196
  rocksdb_writeoptions_t *writeoptions = rocksdb_writeoptions_create();
15,448✔
197
  if (NULL == writeoptions) {
15,504!
198
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err2);
×
199
  }
200
  rocksdb_writeoptions_disable_WAL(writeoptions, 1);
15,504✔
201

202
  rocksdb_readoptions_t *readoptions = rocksdb_readoptions_create();
15,463✔
203
  if (NULL == readoptions) {
15,426!
204
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err3);
×
205
  }
206

207
  char *err = NULL;
15,426✔
208
  char  cachePath[TSDB_FILENAME_LEN] = {0};
15,426✔
209
  tsdbGetRocksPath(pTsdb, cachePath);
15,426✔
210

211
  rocksdb_t *db = rocksdb_open(options, cachePath, &err);
15,619✔
212
  if (NULL == db) {
15,678!
213
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err);
×
214
    rocksdb_free(err);
×
215

216
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err4);
×
217
  }
218

219
  rocksdb_flushoptions_t *flushoptions = rocksdb_flushoptions_create();
15,678✔
220
  if (NULL == flushoptions) {
15,681!
221
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err5);
×
222
  }
223

224
  rocksdb_writebatch_t *writebatch = rocksdb_writebatch_create();
15,681✔
225

226
  TAOS_CHECK_GOTO(taosThreadMutexInit(&pTsdb->rCache.writeBatchMutex, NULL), &lino, _err6);
15,681!
227

228
  pTsdb->rCache.writebatch = writebatch;
15,672✔
229
  pTsdb->rCache.my_comparator = cmp;
15,672✔
230
  pTsdb->rCache.options = options;
15,672✔
231
  pTsdb->rCache.writeoptions = writeoptions;
15,672✔
232
  pTsdb->rCache.readoptions = readoptions;
15,672✔
233
  pTsdb->rCache.flushoptions = flushoptions;
15,672✔
234
  pTsdb->rCache.db = db;
15,672✔
235
  pTsdb->rCache.sver = -1;
15,672✔
236
  pTsdb->rCache.suid = -1;
15,672✔
237
  pTsdb->rCache.uid = -1;
15,672✔
238
  pTsdb->rCache.pTSchema = NULL;
15,672✔
239
  pTsdb->rCache.ctxArray = taosArrayInit(16, sizeof(SLastUpdateCtx));
15,672✔
240
  if (!pTsdb->rCache.ctxArray) {
15,681!
241
    TAOS_CHECK_GOTO(terrno, &lino, _err7);
×
242
  }
243

244
  TAOS_RETURN(code);
15,681✔
245

246
_err7:
×
247
  (void)taosThreadMutexDestroy(&pTsdb->rCache.writeBatchMutex);
×
248
_err6:
×
249
  rocksdb_writebatch_destroy(writebatch);
×
250
_err5:
×
251
  rocksdb_close(pTsdb->rCache.db);
×
252
_err4:
×
253
  rocksdb_readoptions_destroy(readoptions);
×
254
_err3:
×
255
  rocksdb_writeoptions_destroy(writeoptions);
×
256
_err2:
×
257
  rocksdb_options_destroy(options);
×
258
  rocksdb_block_based_options_destroy(tableoptions);
×
259
_err:
×
260
  rocksdb_comparator_destroy(cmp);
×
261
#endif
262
  TAOS_RETURN(code);
×
263
}
264

265
static void tsdbCloseRocksCache(STsdb *pTsdb) {
15,687✔
266
#ifdef USE_ROCKSDB
267
  rocksdb_close(pTsdb->rCache.db);
15,687✔
268
  (void)taosThreadMutexDestroy(&pTsdb->rCache.writeBatchMutex);
15,684✔
269
  rocksdb_flushoptions_destroy(pTsdb->rCache.flushoptions);
15,681✔
270
  rocksdb_writebatch_destroy(pTsdb->rCache.writebatch);
15,674✔
271
  rocksdb_readoptions_destroy(pTsdb->rCache.readoptions);
15,679✔
272
  rocksdb_writeoptions_destroy(pTsdb->rCache.writeoptions);
15,666✔
273
  rocksdb_options_destroy(pTsdb->rCache.options);
15,666✔
274
  rocksdb_block_based_options_destroy(pTsdb->rCache.tableoptions);
15,682✔
275
  rocksdb_comparator_destroy(pTsdb->rCache.my_comparator);
15,683✔
276
  taosMemoryFree(pTsdb->rCache.pTSchema);
15,671!
277
  taosArrayDestroy(pTsdb->rCache.ctxArray);
15,673✔
278
#endif
279
}
15,684✔
280

281
static void rocksMayWrite(STsdb *pTsdb, bool force) {
259,101✔
282
#ifdef USE_ROCKSDB
283
  rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
259,101✔
284

285
  int count = rocksdb_writebatch_count(wb);
259,101✔
286
  if ((force && count > 0) || count >= ROCKS_BATCH_SIZE) {
259,118✔
287
    char *err = NULL;
2,684✔
288

289
    rocksdb_write(pTsdb->rCache.db, pTsdb->rCache.writeoptions, wb, &err);
2,684✔
290
    if (NULL != err) {
2,680!
291
      tsdbError("vgId:%d, %s failed at line %d, count: %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, count,
×
292
                err);
293
      rocksdb_free(err);
×
294
    }
295

296
    rocksdb_writebatch_clear(wb);
2,680✔
297
  }
298
#endif
299
}
259,114✔
300

301
typedef struct {
302
  TSKEY  ts;
303
  int8_t dirty;
304
  struct {
305
    int16_t cid;
306
    int8_t  type;
307
    int8_t  flag;
308
    union {
309
      int64_t val;
310
      struct {
311
        uint32_t nData;
312
        uint8_t *pData;
313
      };
314
    } value;
315
  } colVal;
316
} SLastColV0;
317

318
static int32_t tsdbCacheDeserializeV0(char const *value, SLastCol *pLastCol) {
1,206✔
319
  SLastColV0 *pLastColV0 = (SLastColV0 *)value;
1,206✔
320

321
  pLastCol->rowKey.ts = pLastColV0->ts;
1,206✔
322
  pLastCol->rowKey.numOfPKs = 0;
1,206✔
323
  pLastCol->dirty = pLastColV0->dirty;
1,206✔
324
  pLastCol->colVal.cid = pLastColV0->colVal.cid;
1,206✔
325
  pLastCol->colVal.flag = pLastColV0->colVal.flag;
1,206✔
326
  pLastCol->colVal.value.type = pLastColV0->colVal.type;
1,206✔
327

328
  pLastCol->cacheStatus = TSDB_LAST_CACHE_VALID;
1,206✔
329

330
  if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) {
1,206!
331
    pLastCol->colVal.value.nData = pLastColV0->colVal.value.nData;
16✔
332
    pLastCol->colVal.value.pData = NULL;
16✔
333
    if (pLastCol->colVal.value.nData > 0) {
16✔
334
      pLastCol->colVal.value.pData = (uint8_t *)(&pLastColV0[1]);
10✔
335
    }
336
    return sizeof(SLastColV0) + pLastColV0->colVal.value.nData;
16✔
337
  } else if (pLastCol->colVal.value.type == TSDB_DATA_TYPE_DECIMAL) {
1,190✔
338
    pLastCol->colVal.value.nData = pLastColV0->colVal.value.nData;
300✔
339
    pLastCol->colVal.value.pData = (uint8_t*)(&pLastColV0[1]);
300✔
340
    return sizeof(SLastColV0) + pLastColV0->colVal.value.nData;
300✔
341
  } else {
342
    pLastCol->colVal.value.val = pLastColV0->colVal.value.val;
890✔
343
    return sizeof(SLastColV0);
890✔
344
  }
345
}
346

347
static int32_t tsdbCacheDeserialize(char const *value, size_t size, SLastCol **ppLastCol) {
1,206✔
348
  if (!value) {
1,206!
349
    return TSDB_CODE_INVALID_PARA;
×
350
  }
351

352
  SLastCol *pLastCol = taosMemoryCalloc(1, sizeof(SLastCol));
1,206!
353
  if (NULL == pLastCol) {
1,206!
354
    return terrno;
×
355
  }
356

357
  int32_t offset = tsdbCacheDeserializeV0(value, pLastCol);
1,206✔
358
  if (offset == size) {
1,206!
359
    // version 0
360
    *ppLastCol = pLastCol;
×
361

362
    TAOS_RETURN(TSDB_CODE_SUCCESS);
×
363
  } else if (offset > size) {
1,206!
364
    taosMemoryFreeClear(pLastCol);
×
365

366
    TAOS_RETURN(TSDB_CODE_INVALID_DATA_FMT);
×
367
  }
368

369
  // version
370
  int8_t version = *(int8_t *)(value + offset);
1,206✔
371
  offset += sizeof(int8_t);
1,206✔
372

373
  // numOfPKs
374
  pLastCol->rowKey.numOfPKs = *(uint8_t *)(value + offset);
1,206✔
375
  offset += sizeof(uint8_t);
1,206✔
376

377
  // pks
378
  for (int32_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) {
1,206!
379
    pLastCol->rowKey.pks[i] = *(SValue *)(value + offset);
×
380
    offset += sizeof(SValue);
×
381

382
    if (IS_VAR_DATA_TYPE(pLastCol->rowKey.pks[i].type)) {
×
383
      pLastCol->rowKey.pks[i].pData = NULL;
×
384
      if (pLastCol->rowKey.pks[i].nData > 0) {
×
385
        pLastCol->rowKey.pks[i].pData = (uint8_t *)value + offset;
×
386
        offset += pLastCol->rowKey.pks[i].nData;
×
387
      }
388
    }
389
  }
390

391
  if (version >= LAST_COL_VERSION_2) {
1,206!
392
    pLastCol->cacheStatus = *(uint8_t *)(value + offset);
1,206✔
393
  }
394

395
  if (offset > size) {
1,206!
396
    taosMemoryFreeClear(pLastCol);
×
397

398
    TAOS_RETURN(TSDB_CODE_INVALID_DATA_FMT);
×
399
  }
400

401
  *ppLastCol = pLastCol;
1,206✔
402

403
  TAOS_RETURN(TSDB_CODE_SUCCESS);
1,206✔
404
}
405

406
/*
407
typedef struct {
408
  SLastColV0 lastColV0;
409
  char       colData[];
410
  int8_t     version;
411
  uint8_t    numOfPKs;
412
  SValue     pks[0];
413
  char       pk0Data[];
414
  SValue     pks[1];
415
  char       pk1Data[];
416
  ...
417
} SLastColDisk;
418
*/
419
static int32_t tsdbCacheSerializeV0(char const *value, SLastCol *pLastCol) {
161,437✔
420
  SLastColV0 *pLastColV0 = (SLastColV0 *)value;
161,437✔
421

422
  pLastColV0->ts = pLastCol->rowKey.ts;
161,437✔
423
  pLastColV0->dirty = pLastCol->dirty;
161,437✔
424
  pLastColV0->colVal.cid = pLastCol->colVal.cid;
161,437✔
425
  pLastColV0->colVal.flag = pLastCol->colVal.flag;
161,437✔
426
  pLastColV0->colVal.type = pLastCol->colVal.value.type;
161,437✔
427
  if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) {
161,437!
428
    pLastColV0->colVal.value.nData = pLastCol->colVal.value.nData;
12,796✔
429
    if (pLastCol->colVal.value.nData > 0) {
12,796✔
430
      memcpy(&pLastColV0[1], pLastCol->colVal.value.pData, pLastCol->colVal.value.nData);
5,571✔
431
    }
432
    return sizeof(SLastColV0) + pLastCol->colVal.value.nData;
12,796✔
433
  } else if (pLastCol->colVal.value.type == TSDB_DATA_TYPE_DECIMAL) {
148,641✔
434
    pLastColV0->colVal.value.nData = pLastCol->colVal.value.nData;
4,501✔
435
    if (pLastCol->colVal.value.nData > 0) {
4,501✔
436
      memcpy(&pLastColV0[1], pLastCol->colVal.value.pData, pLastCol->colVal.value.nData);
2,561✔
437
    }
438
    return sizeof(SLastColV0) + pLastCol->colVal.value.nData;
4,501✔
439
  } else {
440
    pLastColV0->colVal.value.val = pLastCol->colVal.value.val;
144,140✔
441
    return sizeof(SLastColV0);
144,140✔
442
  }
443

444
  return 0;
445
}
446

447
static int32_t tsdbCacheSerialize(SLastCol *pLastCol, char **value, size_t *size) {
161,436✔
448
  *size = sizeof(SLastColV0);
161,436✔
449
  if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) {
161,436!
450
    *size += pLastCol->colVal.value.nData;
12,808✔
451
  }
452
  if (pLastCol->colVal.value.type == TSDB_DATA_TYPE_DECIMAL) {
161,436✔
453
    *size += DECIMAL128_BYTES;
4,500✔
454
  }
455
  *size += sizeof(uint8_t) + sizeof(uint8_t) + sizeof(uint8_t);  // version + numOfPKs + cacheStatus
161,436✔
456

457
  for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) {
179,871✔
458
    *size += sizeof(SValue);
18,435✔
459
    if (IS_VAR_DATA_TYPE(pLastCol->rowKey.pks[i].type)) {
18,435!
460
      *size += pLastCol->rowKey.pks[i].nData;
6,033✔
461
    }
462
  }
463

464
  *value = taosMemoryMalloc(*size);
161,436!
465
  if (NULL == *value) {
161,445!
466
    TAOS_RETURN(terrno);
×
467
  }
468

469
  int32_t offset = tsdbCacheSerializeV0(*value, pLastCol);
161,445✔
470

471
  // version
472
  ((uint8_t *)(*value + offset))[0] = LAST_COL_VERSION;
161,359✔
473
  offset++;
161,359✔
474

475
  // numOfPKs
476
  ((uint8_t *)(*value + offset))[0] = pLastCol->rowKey.numOfPKs;
161,359✔
477
  offset++;
161,359✔
478

479
  // pks
480
  for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) {
179,786✔
481
    ((SValue *)(*value + offset))[0] = pLastCol->rowKey.pks[i];
18,427✔
482
    offset += sizeof(SValue);
18,427✔
483
    if (IS_VAR_DATA_TYPE(pLastCol->rowKey.pks[i].type)) {
18,427!
484
      if (pLastCol->rowKey.pks[i].nData > 0) {
6,033!
485
        memcpy(*value + offset, pLastCol->rowKey.pks[i].pData, pLastCol->rowKey.pks[i].nData);
6,033✔
486
      }
487
      offset += pLastCol->rowKey.pks[i].nData;
6,033✔
488
    }
489
  }
490

491
  ((uint8_t *)(*value + offset))[0] = pLastCol->cacheStatus;
161,359✔
492

493
  TAOS_RETURN(TSDB_CODE_SUCCESS);
161,359✔
494
}
495

496
static int32_t tsdbCachePutToRocksdb(STsdb *pTsdb, SLastKey *pLastKey, SLastCol *pLastCol);
497

498
int tsdbCacheFlushDirty(const void *key, size_t klen, void *value, void *ud) {
471,994✔
499
  SLastCol *pLastCol = (SLastCol *)value;
471,994✔
500

501
  if (pLastCol->dirty) {
471,994✔
502
    STsdb *pTsdb = (STsdb *)ud;
123,392✔
503

504
    int32_t code = tsdbCachePutToRocksdb(pTsdb, (SLastKey *)key, pLastCol);
123,392✔
505
    if (code) {
123,537!
506
      tsdbError("tsdb/cache: vgId:%d, flush dirty lru failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code));
×
507
      return code;
×
508
    }
509

510
    pLastCol->dirty = 0;
123,537✔
511

512
    rocksMayWrite(pTsdb, false);
123,537✔
513
  }
514

515
  return 0;
472,123✔
516
}
517

518
static bool tsdbKeyDeleted(TSDBKEY *key, SArray *pSkyline, int64_t *iSkyline) {
1,559,282✔
519
  bool deleted = false;
1,559,282✔
520
  while (*iSkyline > 0) {
1,559,282✔
521
    TSDBKEY *pItemBack = (TSDBKEY *)taosArrayGet(pSkyline, *iSkyline);
472✔
522
    TSDBKEY *pItemFront = (TSDBKEY *)taosArrayGet(pSkyline, *iSkyline - 1);
472✔
523

524
    if (key->ts > pItemBack->ts) {
472✔
525
      return false;
2✔
526
    } else if (key->ts >= pItemFront->ts && key->ts <= pItemBack->ts) {
470!
527
      if (key->version <= pItemFront->version || (key->ts == pItemBack->ts && key->version <= pItemBack->version)) {
456!
528
        // if (key->version <= pItemFront->version || key->version <= pItemBack->version) {
529
        return true;
456✔
530
      } else {
531
        if (*iSkyline > 1) {
×
532
          --*iSkyline;
×
533
        } else {
534
          return false;
×
535
        }
536
      }
537
    } else {
538
      if (*iSkyline > 1) {
14!
539
        --*iSkyline;
×
540
      } else {
541
        return false;
14✔
542
      }
543
    }
544
  }
545

546
  return deleted;
1,558,810✔
547
}
548

549
// Get next non-deleted row from imem
550
static TSDBROW *tsdbImemGetNextRow(STbDataIter *pTbIter, SArray *pSkyline, int64_t *piSkyline) {
433,466✔
551
  int32_t code = 0;
433,466✔
552

553
  if (tsdbTbDataIterNext(pTbIter)) {
433,466✔
554
    TSDBROW *pMemRow = tsdbTbDataIterGet(pTbIter);
432,362✔
555
    TSDBKEY  rowKey = TSDBROW_KEY(pMemRow);
432,362✔
556
    bool     deleted = tsdbKeyDeleted(&rowKey, pSkyline, piSkyline);
432,362✔
557
    if (!deleted) {
432,282✔
558
      return pMemRow;
432,068✔
559
    }
560
  }
561

562
  return NULL;
1,222✔
563
}
564

565
// Get first non-deleted row from imem
566
static TSDBROW *tsdbImemGetFirstRow(SMemTable *imem, STbData *pIMem, STbDataIter *pTbIter, SArray *pSkyline,
13,727✔
567
                                    int64_t *piSkyline) {
568
  int32_t code = 0;
13,727✔
569

570
  tsdbTbDataIterOpen(pIMem, NULL, 1, pTbIter);
13,727✔
571
  TSDBROW *pMemRow = tsdbTbDataIterGet(pTbIter);
13,707✔
572
  if (pMemRow) {
13,707✔
573
    // if non deleted, return the found row.
574
    TSDBKEY rowKey = TSDBROW_KEY(pMemRow);
13,394✔
575
    bool    deleted = tsdbKeyDeleted(&rowKey, pSkyline, piSkyline);
13,394✔
576
    if (!deleted) {
13,396✔
577
      return pMemRow;
13,159✔
578
    }
579
  } else {
580
    return NULL;
313✔
581
  }
582

583
  // continue to find the non-deleted first row from imem, using get next row
584
  return tsdbImemGetNextRow(pTbIter, pSkyline, piSkyline);
237✔
585
}
586

587
void tsdbCacheInvalidateSchema(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int32_t sver) {
2,532✔
588
  SRocksCache *pRCache = &pTsdb->rCache;
2,532✔
589
  if (!pRCache->pTSchema || sver <= pTsdb->rCache.sver) return;
2,532✔
590

591
  if (suid > 0 && suid == pRCache->suid) {
50!
592
    pRCache->sver = -1;
×
593
    pRCache->suid = -1;
×
594
  }
595
  if (suid == 0 && uid == pRCache->uid) {
50!
596
    pRCache->sver = -1;
5✔
597
    pRCache->uid = -1;
5✔
598
  }
599
}
600

601
static int32_t tsdbUpdateSkm(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int32_t sver) {
433,018✔
602
  SRocksCache *pRCache = &pTsdb->rCache;
433,018✔
603
  if (pRCache->pTSchema && sver == pRCache->sver) {
433,018✔
604
    if (suid > 0 && suid == pRCache->suid) {
432,613✔
605
      return 0;
296,051✔
606
    }
607
    if (suid == 0 && uid == pRCache->uid) {
136,562✔
608
      return 0;
135,970✔
609
    }
610
  }
611

612
  pRCache->suid = suid;
997✔
613
  pRCache->uid = uid;
997✔
614
  pRCache->sver = sver;
997✔
615
  tDestroyTSchema(pRCache->pTSchema);
997!
616
  return metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pRCache->pTSchema);
997✔
617
}
618

619
static int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SArray *updCtxArray);
620

621
int32_t tsdbLoadFromImem(SMemTable *imem, int64_t suid, int64_t uid) {
13,701✔
622
  int32_t     code = 0;
13,701✔
623
  int32_t     lino = 0;
13,701✔
624
  STsdb      *pTsdb = imem->pTsdb;
13,701✔
625
  SArray     *pMemDelData = NULL;
13,701✔
626
  SArray     *pSkyline = NULL;
13,701✔
627
  int64_t     iSkyline = 0;
13,701✔
628
  STbDataIter tbIter = {0};
13,701✔
629
  TSDBROW    *pMemRow = NULL;
13,701✔
630
  STSchema   *pTSchema = NULL;
13,701✔
631
  SSHashObj  *iColHash = NULL;
13,701✔
632
  int32_t     sver;
633
  int32_t     nCol;
634
  SArray     *ctxArray = pTsdb->rCache.ctxArray;
13,701✔
635
  STsdbRowKey tsdbRowKey = {0};
13,701✔
636
  STSDBRowIter iter = {0};
13,701✔
637

638
  STbData *pIMem = tsdbGetTbDataFromMemTable(imem, suid, uid);
13,701✔
639

640
  // load imem tomb data and build skyline
641
  TAOS_CHECK_GOTO(loadMemTombData(&pMemDelData, NULL, pIMem, INT64_MAX), &lino, _exit);
13,757!
642

643
  // tsdbBuildDeleteSkyline
644
  size_t delSize = TARRAY_SIZE(pMemDelData);
13,727✔
645
  if (delSize > 0) {
13,727✔
646
    pSkyline = taosArrayInit(32, sizeof(TSDBKEY));
564✔
647
    if (!pSkyline) {
564!
648
      TAOS_CHECK_EXIT(terrno);
×
649
    }
650

651
    TAOS_CHECK_EXIT(tsdbBuildDeleteSkyline(pMemDelData, 0, (int32_t)(delSize - 1), pSkyline));
564!
652
    iSkyline = taosArrayGetSize(pSkyline) - 1;
564✔
653
  }
654

655
  pMemRow = tsdbImemGetFirstRow(imem, pIMem, &tbIter, pSkyline, &iSkyline);
13,727✔
656
  if (!pMemRow) {
13,722✔
657
    goto _exit;
558✔
658
  }
659

660
  // iter first row to last_row/last col values to ctxArray, and mark last null col ids
661
  sver = TSDBROW_SVERSION(pMemRow);
13,164✔
662
  TAOS_CHECK_GOTO(tsdbUpdateSkm(pTsdb, suid, uid, sver), &lino, _exit);
13,164!
663
  pTSchema = pTsdb->rCache.pTSchema;
13,157✔
664
  nCol = pTSchema->numOfCols;
13,157✔
665

666
  tsdbRowGetKey(pMemRow, &tsdbRowKey);
13,157✔
667

668
  TAOS_CHECK_EXIT(tsdbRowIterOpen(&iter, pMemRow, pTSchema));
13,141!
669

670
  int32_t iCol = 0;
13,135✔
671
  for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal && iCol < nCol; pColVal = tsdbRowIterNext(&iter), iCol++) {
80,507!
672
    SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST_ROW, .tsdbRowKey = tsdbRowKey, .colVal = *pColVal};
67,330✔
673
    if (!taosArrayPush(ctxArray, &updateCtx)) {
67,317!
674
      TAOS_CHECK_EXIT(terrno);
×
675
    }
676

677
    if (COL_VAL_IS_VALUE(pColVal)) {
67,317✔
678
      updateCtx.lflag = LFLAG_LAST;
51,050✔
679
      if (!taosArrayPush(ctxArray, &updateCtx)) {
51,102!
680
        TAOS_CHECK_EXIT(terrno);
×
681
      }
682
    } else {
683
      if (!iColHash) {
16,267✔
684
        iColHash = tSimpleHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT));
1,065✔
685
        if (iColHash == NULL) {
1,065!
686
          TAOS_CHECK_EXIT(terrno);
×
687
        }
688
      }
689

690
      if (tSimpleHashPut(iColHash, &pColVal->cid, sizeof(pColVal->cid), &pColVal->cid, sizeof(pColVal->cid))) {
16,267!
691
        TAOS_CHECK_EXIT(terrno);
×
692
      }
693
    }
694
  }
695
  tsdbRowClose(&iter);
13,153✔
696

697
  // continue to get next row to fill null last col values
698
  pMemRow = tsdbImemGetNextRow(&tbIter, pSkyline, &iSkyline);
13,159✔
699
  while (pMemRow) {
433,023✔
700
    if (tSimpleHashGetSize(iColHash) == 0) {
432,022✔
701
      break;
12,144✔
702
    }
703

704
    sver = TSDBROW_SVERSION(pMemRow);
419,859!
705
    TAOS_CHECK_EXIT(tsdbUpdateSkm(pTsdb, suid, uid, sver));
419,859!
706
    pTSchema = pTsdb->rCache.pTSchema;
419,842✔
707

708
    STsdbRowKey tsdbRowKey = {0};
419,842✔
709
    tsdbRowGetKey(pMemRow, &tsdbRowKey);
419,842✔
710

711
    TAOS_CHECK_EXIT(tsdbRowIterOpen(&iter, pMemRow, pTSchema));
419,850!
712

713
    int32_t iCol = 0;
420,006✔
714
    for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal && iCol < nCol; pColVal = tsdbRowIterNext(&iter), iCol++) {
8,112,049!
715
      if (tSimpleHashGet(iColHash, &pColVal->cid, sizeof(pColVal->cid)) && COL_VAL_IS_VALUE(pColVal)) {
7,696,567✔
716
        SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST, .tsdbRowKey = tsdbRowKey, .colVal = *pColVal};
7,085✔
717
        if (!taosArrayPush(ctxArray, &updateCtx)) {
7,085!
718
          TAOS_CHECK_EXIT(terrno);
×
719
        }
720

721
        TAOS_CHECK_EXIT(tSimpleHashRemove(iColHash, &pColVal->cid, sizeof(pColVal->cid)));
7,085!
722
      }
723
    }
724
    tsdbRowClose(&iter);
417,870✔
725

726
    pMemRow = tsdbImemGetNextRow(&tbIter, pSkyline, &iSkyline);
420,088✔
727
  }
728

729
  TAOS_CHECK_GOTO(tsdbCacheUpdate(pTsdb, suid, uid, ctxArray), &lino, _exit);
13,145!
730

731
_exit:
13,189✔
732
  if (code) {
13,747!
733
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(pTsdb->pVnode), __func__, __FILE__, lino, tstrerror(code));
×
734

735
    tsdbRowClose(&iter);
×
736
  }
737

738
  taosArrayClear(ctxArray);
13,747✔
739
  // destroy any allocated resource
740
  tSimpleHashCleanup(iColHash);
13,744✔
741
  if (pMemDelData) {
13,745!
742
    taosArrayDestroy(pMemDelData);
13,745✔
743
  }
744
  if (pSkyline) {
13,720✔
745
    taosArrayDestroy(pSkyline);
564✔
746
  }
747

748
  TAOS_RETURN(code);
13,720✔
749
}
750

751
static int32_t tsdbCacheUpdateFromIMem(STsdb *pTsdb) {
4,476✔
752
  if (!pTsdb) return 0;
4,476!
753
  if (!pTsdb->imem) return 0;
4,476✔
754

755
  int32_t    code = 0;
868✔
756
  int32_t    lino = 0;
868✔
757
  SMemTable *imem = pTsdb->imem;
868✔
758
  int32_t    nTbData = imem->nTbData;
868✔
759
  int64_t    nRow = imem->nRow;
868✔
760
  int64_t    nDel = imem->nDel;
868✔
761

762
  if (nRow == 0 || nTbData == 0) return 0;
868!
763

764
  TAOS_CHECK_EXIT(tsdbMemTableSaveToCache(imem, tsdbLoadFromImem));
519!
765

766
_exit:
519✔
767
  if (code) {
519!
768
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(pTsdb->pVnode), __func__, __FILE__, lino, tstrerror(code));
×
769
  } else {
770
    tsdbInfo("vgId:%d %s done, nRow:%" PRId64 " nDel:%" PRId64, TD_VID(pTsdb->pVnode), __func__, nRow, nDel);
519!
771
  }
772

773
  TAOS_RETURN(code);
519✔
774
}
775

776
int32_t tsdbCacheCommit(STsdb *pTsdb) {
4,476✔
777
  int32_t code = 0;
4,476✔
778

779
  // 0, tsdbCacheUpdateFromIMem if updateCacheBatch
780
  // flush dirty data of lru into rocks
781
  // 4, and update when writing if !updateCacheBatch
782
  // 5, merge cache & mem if updateCacheBatch
783

784
  if (tsUpdateCacheBatch) {
4,476!
785
    code = tsdbCacheUpdateFromIMem(pTsdb);
4,476✔
786
    if (code) {
4,476!
787
      tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
×
788

789
      TAOS_RETURN(code);
×
790
    }
791
  }
792

793
  char                 *err = NULL;
4,476✔
794
  SLRUCache            *pCache = pTsdb->lruCache;
4,476✔
795
  // rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
796

797
  (void)taosThreadMutexLock(&pTsdb->lruMutex);
4,476✔
798

799
  taosLRUCacheApply(pCache, tsdbCacheFlushDirty, pTsdb);
4,476✔
800

801
#ifdef USE_ROCKSDB
802
  rocksMayWrite(pTsdb, true);
4,476✔
803
  rocksdb_flush(pTsdb->rCache.db, pTsdb->rCache.flushoptions, &err);
4,476✔
804
#endif
805
  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
4,469✔
806
#ifdef USE_ROCKSDB
807
  if (NULL != err) {
4,475!
808
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err);
×
809
    rocksdb_free(err);
×
810
    code = TSDB_CODE_FAILED;
×
811
  }
812
#endif
813
  TAOS_RETURN(code);
4,475✔
814
}
815

816
static int32_t reallocVarDataVal(SValue *pValue) {
35,474✔
817
  if (IS_VAR_DATA_TYPE(pValue->type)) {
35,474!
818
    uint8_t *pVal = pValue->pData;
35,487✔
819
    uint32_t nData = pValue->nData;
35,487✔
820
    if (nData > 0) {
35,487✔
821
      uint8_t *p = taosMemoryMalloc(nData);
23,192!
822
      if (!p) {
23,194!
823
        TAOS_RETURN(terrno);
×
824
      }
825
      pValue->pData = p;
23,194✔
826
      (void)memcpy(pValue->pData, pVal, nData);
23,194✔
827
    } else {
828
      pValue->pData = NULL;
12,295✔
829
    }
830
  }
831

832
  TAOS_RETURN(TSDB_CODE_SUCCESS);
35,476✔
833
}
834

835
static int32_t reallocVarData(SColVal *pColVal) { return reallocVarDataVal(&pColVal->value); }
24,009✔
836

837
// realloc pk data and col data.
838
static int32_t tsdbCacheReallocSLastCol(SLastCol *pCol, size_t *pCharge) {
299,243✔
839
  int32_t code = TSDB_CODE_SUCCESS, lino = 0;
299,243✔
840
  size_t  charge = sizeof(SLastCol);
299,243✔
841

842
  int8_t i = 0;
299,243✔
843
  for (; i < pCol->rowKey.numOfPKs; i++) {
335,093✔
844
    SValue *pValue = &pCol->rowKey.pks[i];
35,857✔
845
    if (IS_VAR_DATA_TYPE(pValue->type)) {
35,857!
846
      TAOS_CHECK_EXIT(reallocVarDataVal(pValue));
11,475!
847
      charge += pValue->nData;
11,468✔
848
    }
849
  }
850

851
  if (IS_VAR_DATA_TYPE(pCol->colVal.value.type)) {
299,236!
852
    TAOS_CHECK_EXIT(reallocVarData(&pCol->colVal));
24,105!
853
    charge += pCol->colVal.value.nData;
24,009✔
854
  }
855

856
  if (pCol->colVal.value.type == TSDB_DATA_TYPE_DECIMAL) {
299,140✔
857
    if (pCol->colVal.value.nData > 0) {
6,126✔
858
      void *p = taosMemoryMalloc(pCol->colVal.value.nData);
3,020!
859
      if (!p) TAOS_CHECK_EXIT(terrno);
3,020!
860
      (void)memcpy(p, pCol->colVal.value.pData, pCol->colVal.value.nData);
3,020✔
861
      pCol->colVal.value.pData = p;
3,020✔
862
    }
863
    charge += pCol->colVal.value.nData;
6,126✔
864
  }
865

866
  if (pCharge) {
299,140✔
867
    *pCharge = charge;
234,667✔
868
  }
869

870
_exit:
64,473✔
871
  if (TSDB_CODE_SUCCESS != code) {
299,140!
872
    for (int8_t j = 0; j < i; j++) {
×
873
      if (IS_VAR_DATA_TYPE(pCol->rowKey.pks[j].type)) {
×
874
        taosMemoryFree(pCol->rowKey.pks[j].pData);
×
875
      }
876
    }
877

878
    (void)memset(pCol, 0, sizeof(SLastCol));
×
879
  }
880

881
  TAOS_RETURN(code);
299,140✔
882
}
883

884
void tsdbCacheFreeSLastColItem(void *pItem) {
73,892✔
885
  SLastCol *pCol = (SLastCol *)pItem;
73,892✔
886
  for (int i = 0; i < pCol->rowKey.numOfPKs; i++) {
94,270✔
887
    if (IS_VAR_DATA_TYPE(pCol->rowKey.pks[i].type)) {
20,377!
888
      taosMemoryFree(pCol->rowKey.pks[i].pData);
6,524!
889
    }
890
  }
891

892
  if ((IS_VAR_DATA_TYPE(pCol->colVal.value.type) || pCol->colVal.value.type == TSDB_DATA_TYPE_DECIMAL) &&
73,893!
893
      pCol->colVal.value.pData) {
12,811✔
894
    taosMemoryFree(pCol->colVal.value.pData);
8,389!
895
  }
896
}
73,894✔
897

898
static void tsdbCacheDeleter(const void *key, size_t klen, void *value, void *ud) {
234,063✔
899
  SLastCol *pLastCol = (SLastCol *)value;
234,063✔
900

901
  if (pLastCol->dirty) {
234,063✔
902
    if (tsdbCacheFlushDirty(key, klen, pLastCol, ud) != 0) {
5,090!
903
      STsdb *pTsdb = (STsdb *)ud;
×
904
      tsdbTrace("tsdb/cache: vgId:%d, flush cache %s failed at line %d.", TD_VID(pTsdb->pVnode), __func__, __LINE__);
×
905
    }
906
  }
907

908
  for (uint8_t i = 0; i < pLastCol->rowKey.numOfPKs; ++i) {
252,317✔
909
    SValue *pValue = &pLastCol->rowKey.pks[i];
18,411✔
910
    if (IS_VAR_DATA_TYPE(pValue->type)) {
18,411!
911
      taosMemoryFree(pValue->pData);
6,025!
912
    }
913
  }
914

915
  if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type) ||
233,906!
916
      pLastCol->colVal.value.type == TSDB_DATA_TYPE_DECIMAL /* && pLastCol->colVal.value.nData > 0*/) {
220,383✔
917
    taosMemoryFree(pLastCol->colVal.value.pData);
19,051!
918
  }
919

920
  taosMemoryFree(value);
233,954✔
921
}
234,638✔
922

923
static void tsdbCacheOverWriter(const void *key, size_t klen, void *value, void *ud) {
102,068✔
924
  SLastCol *pLastCol = (SLastCol *)value;
102,068✔
925
  pLastCol->dirty = 0;
102,068✔
926
}
102,068✔
927

928
static int32_t tsdbCachePutToLRU(STsdb *pTsdb, SLastKey *pLastKey, SLastCol *pLastCol, int8_t dirty);
929

930
static int32_t tsdbCacheNewTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, int8_t col_type, int8_t lflag) {
94,011✔
931
  int32_t code = 0, lino = 0;
94,011✔
932

933
  SLRUCache            *pCache = pTsdb->lruCache;
94,011✔
934
  // rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
935
  SRowKey               emptyRowKey = {.ts = TSKEY_MIN, .numOfPKs = 0};
94,011✔
936
  SLastCol              emptyCol = {
94,011✔
937
                   .rowKey = emptyRowKey, .colVal = COL_VAL_NONE(cid, col_type), .dirty = 1, .cacheStatus = TSDB_LAST_CACHE_VALID};
938

939
  SLastKey *pLastKey = &(SLastKey){.lflag = lflag, .uid = uid, .cid = cid};
94,011✔
940
  code = tsdbCachePutToLRU(pTsdb, pLastKey, &emptyCol, 1);
94,011✔
941
  if (code) {
94,065!
942
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
×
943
  }
944

945
  TAOS_RETURN(code);
94,065✔
946
}
947

948
int32_t tsdbCacheCommitNoLock(STsdb *pTsdb) {
3,149✔
949
  int32_t code = 0;
3,149✔
950
  char   *err = NULL;
3,149✔
951

952
  SLRUCache            *pCache = pTsdb->lruCache;
3,149✔
953
  // rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
954

955
  taosLRUCacheApply(pCache, tsdbCacheFlushDirty, pTsdb);
3,149✔
956
#ifdef USE_ROCKSDB
957
  rocksMayWrite(pTsdb, true);
3,148✔
958
  rocksdb_flush(pTsdb->rCache.db, pTsdb->rCache.flushoptions, &err);
3,147✔
959
  if (NULL != err) {
3,134!
960
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err);
×
961
    rocksdb_free(err);
×
962
    code = TSDB_CODE_FAILED;
×
963
  }
964
#endif
965
  TAOS_RETURN(code);
3,134✔
966
}
967

968
static int32_t tsdbCacheGetValuesFromRocks(STsdb *pTsdb, size_t numKeys, const char *const *ppKeysList,
89,213✔
969
                                           size_t *pKeysListSizes, char ***pppValuesList, size_t **ppValuesListSizes) {
970
#ifdef USE_ROCKSDB
971
  char **valuesList = taosMemoryCalloc(numKeys, sizeof(char *));
89,213!
972
  if (!valuesList) return terrno;
89,228!
973
  size_t *valuesListSizes = taosMemoryCalloc(numKeys, sizeof(size_t));
89,228!
974
  if (!valuesListSizes) {
89,230!
975
    taosMemoryFreeClear(valuesList);
×
976
    return terrno;
×
977
  }
978
  char **errs = taosMemoryCalloc(numKeys, sizeof(char *));
89,230✔
979
  if (!errs) {
89,236!
980
    taosMemoryFreeClear(valuesList);
×
981
    taosMemoryFreeClear(valuesListSizes);
×
982
    return terrno;
×
983
  }
984
  rocksdb_multi_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, numKeys, ppKeysList, pKeysListSizes, valuesList,
89,236✔
985
                    valuesListSizes, errs);
986
  for (size_t i = 0; i < numKeys; ++i) {
749,348✔
987
    rocksdb_free(errs[i]);
660,157✔
988
  }
989
  taosMemoryFreeClear(errs);
89,191!
990

991
  *pppValuesList = valuesList;
89,186✔
992
  *ppValuesListSizes = valuesListSizes;
89,186✔
993
#endif
994
  TAOS_RETURN(TSDB_CODE_SUCCESS);
89,186✔
995
}
996

997
static int32_t tsdbCacheDropTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, bool hasPrimaryKey) {
26,786✔
998
  int32_t code = 0;
26,786✔
999

1000
  // build keys & multi get from rocks
1001
  char **keys_list = taosMemoryCalloc(2, sizeof(char *));
26,786!
1002
  if (!keys_list) {
26,792!
1003
    return terrno;
×
1004
  }
1005
  size_t *keys_list_sizes = taosMemoryCalloc(2, sizeof(size_t));
26,792!
1006
  if (!keys_list_sizes) {
26,813!
1007
    taosMemoryFree(keys_list);
×
1008
    return terrno;
×
1009
  }
1010
  const size_t klen = ROCKS_KEY_LEN;
26,813✔
1011

1012
  char *keys = taosMemoryCalloc(2, sizeof(SLastKey));
26,813!
1013
  if (!keys) {
26,801!
1014
    taosMemoryFree(keys_list);
×
1015
    taosMemoryFree(keys_list_sizes);
×
1016
    return terrno;
×
1017
  }
1018
  ((SLastKey *)keys)[0] = (SLastKey){.lflag = LFLAG_LAST, .uid = uid, .cid = cid};
26,801✔
1019
  ((SLastKey *)keys)[1] = (SLastKey){.lflag = LFLAG_LAST_ROW, .uid = uid, .cid = cid};
26,801✔
1020

1021
  keys_list[0] = keys;
26,801✔
1022
  keys_list[1] = keys + sizeof(SLastKey);
26,801✔
1023
  keys_list_sizes[0] = klen;
26,801✔
1024
  keys_list_sizes[1] = klen;
26,801✔
1025

1026
  char  **values_list = NULL;
26,801✔
1027
  size_t *values_list_sizes = NULL;
26,801✔
1028

1029
  // was written by caller
1030
  // rocksMayWrite(pTsdb, true); // flush writebatch cache
1031

1032
  TAOS_CHECK_GOTO(tsdbCacheGetValuesFromRocks(pTsdb, 2, (const char *const *)keys_list, keys_list_sizes, &values_list,
26,801!
1033
                                              &values_list_sizes),
1034
                  NULL, _exit);
1035
#ifdef USE_ROCKSDB
1036
  rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
26,772✔
1037
#endif
1038
  {
1039
#ifdef USE_ROCKSDB
1040
    SLastCol *pLastCol = NULL;
26,772✔
1041
    if (values_list[0] != NULL) {
26,772✔
1042
      code = tsdbCacheDeserialize(values_list[0], values_list_sizes[0], &pLastCol);
533✔
1043
      if (code != TSDB_CODE_SUCCESS) {
533!
1044
        tsdbError("vgId:%d, %s deserialize failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1045
                  tstrerror(code));
1046
        goto _exit;
×
1047
      }
1048
      if (NULL != pLastCol) {
533!
1049
        rocksdb_writebatch_delete(wb, keys_list[0], klen);
533✔
1050
      }
1051
      taosMemoryFreeClear(pLastCol);
533!
1052
    }
1053

1054
    pLastCol = NULL;
26,772✔
1055
    if (values_list[1] != NULL) {
26,772✔
1056
      code = tsdbCacheDeserialize(values_list[1], values_list_sizes[1], &pLastCol);
533✔
1057
      if (code != TSDB_CODE_SUCCESS) {
533!
1058
        tsdbError("vgId:%d, %s deserialize failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1059
                  tstrerror(code));
1060
        goto _exit;
×
1061
      }
1062
      if (NULL != pLastCol) {
533!
1063
        rocksdb_writebatch_delete(wb, keys_list[1], klen);
533✔
1064
      }
1065
      taosMemoryFreeClear(pLastCol);
533!
1066
    }
1067

1068
    rocksdb_free(values_list[0]);
26,772✔
1069
    rocksdb_free(values_list[1]);
26,786✔
1070
#endif
1071

1072
    for (int i = 0; i < 2; i++) {
80,311✔
1073
      LRUHandle *h = taosLRUCacheLookup(pTsdb->lruCache, keys_list[i], klen);
53,496✔
1074
      if (h) {
53,616✔
1075
        tsdbLRUCacheRelease(pTsdb->lruCache, h, true);
1,066✔
1076
        taosLRUCacheErase(pTsdb->lruCache, keys_list[i], klen);
1,066✔
1077
      }
1078
    }
1079
  }
1080

1081
_exit:
26,815✔
1082
  taosMemoryFree(keys_list[0]);
26,815!
1083

1084
  taosMemoryFree(keys_list);
26,812!
1085
  taosMemoryFree(keys_list_sizes);
26,813!
1086
  taosMemoryFree(values_list);
26,806!
1087
  taosMemoryFree(values_list_sizes);
26,812!
1088

1089
  TAOS_RETURN(code);
26,800✔
1090
}
1091

1092
int32_t tsdbCacheNewTable(STsdb *pTsdb, tb_uid_t uid, tb_uid_t suid, const SSchemaWrapper *pSchemaRow) {
12,542✔
1093
  int32_t code = 0;
12,542✔
1094

1095
  (void)taosThreadMutexLock(&pTsdb->lruMutex);
12,542✔
1096

1097
  if (suid < 0) {
12,551✔
1098
    for (int i = 0; i < pSchemaRow->nCols; ++i) {
714✔
1099
      int16_t cid = pSchemaRow->pSchema[i].colId;
643✔
1100
      int8_t  col_type = pSchemaRow->pSchema[i].type;
643✔
1101

1102
      code = tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, LFLAG_LAST_ROW);
643✔
1103
      if (code != TSDB_CODE_SUCCESS) {
643!
1104
        tsdbTrace("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1105
                  tstrerror(code));
1106
      }
1107
      code = tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, LFLAG_LAST);
643✔
1108
      if (code != TSDB_CODE_SUCCESS) {
643!
1109
        tsdbTrace("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1110
                  tstrerror(code));
1111
      }
1112
    }
1113
  } else {
1114
    STSchema *pTSchema = NULL;
12,480✔
1115
    code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, -1, &pTSchema);
12,480✔
1116
    if (code != TSDB_CODE_SUCCESS) {
12,471!
1117
      (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
×
1118

1119
      TAOS_RETURN(code);
×
1120
    }
1121

1122
    for (int i = 0; i < pTSchema->numOfCols; ++i) {
58,111✔
1123
      int16_t cid = pTSchema->columns[i].colId;
45,628✔
1124
      int8_t  col_type = pTSchema->columns[i].type;
45,628✔
1125

1126
      code = tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, LFLAG_LAST_ROW);
45,628✔
1127
      if (code != TSDB_CODE_SUCCESS) {
45,635!
1128
        tsdbTrace("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1129
                  tstrerror(code));
1130
      }
1131
      code = tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, LFLAG_LAST);
45,635✔
1132
      if (code != TSDB_CODE_SUCCESS) {
45,635!
1133
        tsdbTrace("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1134
                  tstrerror(code));
1135
      }
1136
    }
1137

1138
    taosMemoryFree(pTSchema);
12,483!
1139
  }
1140

1141
  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
12,550✔
1142

1143
  TAOS_RETURN(code);
12,549✔
1144
}
1145

1146
int32_t tsdbCacheDropTable(STsdb *pTsdb, tb_uid_t uid, tb_uid_t suid, SSchemaWrapper *pSchemaRow) {
76✔
1147
  int32_t code = 0;
76✔
1148

1149
  (void)taosThreadMutexLock(&pTsdb->lruMutex);
76✔
1150

1151
  code = tsdbCacheCommitNoLock(pTsdb);
76✔
1152
  if (code != TSDB_CODE_SUCCESS) {
76!
1153
    tsdbTrace("vgId:%d, %s commit with no lock failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1154
              tstrerror(code));
1155
  }
1156

1157
  if (pSchemaRow != NULL) {
76!
1158
    bool hasPrimayKey = false;
×
1159
    int  nCols = pSchemaRow->nCols;
×
1160
    if (nCols >= 2) {
×
1161
      hasPrimayKey = (pSchemaRow->pSchema[1].flags & COL_IS_KEY) ? true : false;
×
1162
    }
1163
    for (int i = 0; i < nCols; ++i) {
×
1164
      int16_t cid = pSchemaRow->pSchema[i].colId;
×
1165
      int8_t  col_type = pSchemaRow->pSchema[i].type;
×
1166

1167
      code = tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey);
×
1168
      if (code != TSDB_CODE_SUCCESS) {
×
1169
        tsdbTrace("vgId:%d, %s drop table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1170
                  tstrerror(code));
1171
      }
1172
    }
1173
  } else {
1174
    STSchema *pTSchema = NULL;
76✔
1175
    code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, -1, &pTSchema);
76✔
1176
    if (code != TSDB_CODE_SUCCESS) {
76!
1177
      (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
×
1178

1179
      TAOS_RETURN(code);
×
1180
    }
1181

1182
    bool hasPrimayKey = false;
76✔
1183
    int  nCols = pTSchema->numOfCols;
76✔
1184
    if (nCols >= 2) {
76!
1185
      hasPrimayKey = (pTSchema->columns[1].flags & COL_IS_KEY) ? true : false;
76✔
1186
    }
1187
    for (int i = 0; i < nCols; ++i) {
229✔
1188
      int16_t cid = pTSchema->columns[i].colId;
153✔
1189
      int8_t  col_type = pTSchema->columns[i].type;
153✔
1190

1191
      code = tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey);
153✔
1192
      if (code != TSDB_CODE_SUCCESS) {
153!
1193
        tsdbTrace("vgId:%d, %s drop table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1194
                  tstrerror(code));
1195
      }
1196
    }
1197

1198
    taosMemoryFree(pTSchema);
76!
1199
  }
1200

1201
  rocksMayWrite(pTsdb, false);
76✔
1202

1203
  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
76✔
1204

1205
  TAOS_RETURN(code);
76✔
1206
}
1207

1208
int32_t tsdbCacheDropSubTables(STsdb *pTsdb, SArray *uids, tb_uid_t suid) {
2,963✔
1209
  int32_t code = 0;
2,963✔
1210

1211
  (void)taosThreadMutexLock(&pTsdb->lruMutex);
2,963✔
1212

1213
  code = tsdbCacheCommitNoLock(pTsdb);
2,981✔
1214
  if (code != TSDB_CODE_SUCCESS) {
2,955!
1215
    tsdbTrace("vgId:%d, %s commit with no lock failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1216
              tstrerror(code));
1217
  }
1218

1219
  STSchema *pTSchema = NULL;
2,955✔
1220
  code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, suid, -1, &pTSchema);
2,955✔
1221
  if (code != TSDB_CODE_SUCCESS) {
2,975✔
1222
    (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
4✔
1223

1224
    TAOS_RETURN(code);
6✔
1225
  }
1226

1227
  for (int i = 0; i < TARRAY_SIZE(uids); ++i) {
6,272✔
1228
    int64_t uid = ((tb_uid_t *)TARRAY_DATA(uids))[i];
3,294✔
1229

1230
    bool hasPrimayKey = false;
3,294✔
1231
    int  nCols = pTSchema->numOfCols;
3,294✔
1232
    if (nCols >= 2) {
3,294!
1233
      hasPrimayKey = (pTSchema->columns[1].flags & COL_IS_KEY) ? true : false;
3,294✔
1234
    }
1235

1236
    for (int i = 0; i < nCols; ++i) {
29,556✔
1237
      int16_t cid = pTSchema->columns[i].colId;
26,255✔
1238
      int8_t  col_type = pTSchema->columns[i].type;
26,255✔
1239

1240
      code = tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey);
26,255✔
1241
      if (code != TSDB_CODE_SUCCESS) {
26,262!
1242
        tsdbTrace("vgId:%d, %s drop table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1243
                  tstrerror(code));
1244
      }
1245
    }
1246
  }
1247

1248
  taosMemoryFree(pTSchema);
2,978!
1249

1250
  rocksMayWrite(pTsdb, false);
2,974✔
1251

1252
  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
2,975✔
1253

1254
  TAOS_RETURN(code);
2,975✔
1255
}
1256

1257
int32_t tsdbCacheNewNTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, int8_t col_type) {
×
1258
  int32_t code = 0;
×
1259

1260
  (void)taosThreadMutexLock(&pTsdb->lruMutex);
×
1261

1262
  code = tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, 0);
×
1263
  if (code != TSDB_CODE_SUCCESS) {
×
1264
    tsdbTrace("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1265
              tstrerror(code));
1266
  }
1267
  code = tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, 1);
×
1268
  if (code != TSDB_CODE_SUCCESS) {
×
1269
    tsdbTrace("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1270
              tstrerror(code));
1271
  }
1272
  // rocksMayWrite(pTsdb, true, false, false);
1273
  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
×
1274

1275
  TAOS_RETURN(code);
×
1276
}
1277

1278
int32_t tsdbCacheDropNTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, bool hasPrimayKey) {
×
1279
  int32_t code = 0;
×
1280

1281
  (void)taosThreadMutexLock(&pTsdb->lruMutex);
×
1282

1283
  code = tsdbCacheCommitNoLock(pTsdb);
×
1284
  if (code != TSDB_CODE_SUCCESS) {
×
1285
    tsdbTrace("vgId:%d, %s commit with no lock failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1286
              tstrerror(code));
1287
  }
1288

1289
  code = tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey);
×
1290
  if (code != TSDB_CODE_SUCCESS) {
×
1291
    tsdbTrace("vgId:%d, %s drop table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1292
              tstrerror(code));
1293
  }
1294

1295
  rocksMayWrite(pTsdb, false);
×
1296

1297
  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
×
1298

1299
  TAOS_RETURN(code);
×
1300
}
1301

1302
int32_t tsdbCacheNewSTableColumn(STsdb *pTsdb, SArray *uids, int16_t cid, int8_t col_type) {
184✔
1303
  int32_t code = 0;
184✔
1304

1305
  (void)taosThreadMutexLock(&pTsdb->lruMutex);
184✔
1306

1307
  for (int i = 0; i < TARRAY_SIZE(uids); ++i) {
944✔
1308
    tb_uid_t uid = ((tb_uid_t *)TARRAY_DATA(uids))[i];
760✔
1309

1310
    code = tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, 0);
760✔
1311
    if (code != TSDB_CODE_SUCCESS) {
760!
1312
      tsdbTrace("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1313
                tstrerror(code));
1314
    }
1315
    code = tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, 1);
760✔
1316
    if (code != TSDB_CODE_SUCCESS) {
760!
1317
      tsdbTrace("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1318
                tstrerror(code));
1319
    }
1320
  }
1321

1322
  // rocksMayWrite(pTsdb, true, false, false);
1323
  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
184✔
1324
  TAOS_RETURN(code);
184✔
1325
}
1326

1327
int32_t tsdbCacheDropSTableColumn(STsdb *pTsdb, SArray *uids, int16_t cid, bool hasPrimayKey) {
92✔
1328
  int32_t code = 0;
92✔
1329

1330
  (void)taosThreadMutexLock(&pTsdb->lruMutex);
92✔
1331

1332
  code = tsdbCacheCommitNoLock(pTsdb);
92✔
1333
  if (code != TSDB_CODE_SUCCESS) {
92!
1334
    tsdbTrace("vgId:%d, %s commit with no lock failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1335
              tstrerror(code));
1336
  }
1337

1338
  for (int i = 0; i < TARRAY_SIZE(uids); ++i) {
472✔
1339
    int64_t uid = ((tb_uid_t *)TARRAY_DATA(uids))[i];
380✔
1340

1341
    code = tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey);
380✔
1342
    if (code != TSDB_CODE_SUCCESS) {
380!
1343
      tsdbTrace("vgId:%d, %s drop table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1344
                tstrerror(code));
1345
    }
1346
  }
1347

1348
  rocksMayWrite(pTsdb, false);
92✔
1349

1350
  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
92✔
1351

1352
  TAOS_RETURN(code);
92✔
1353
}
1354

1355
typedef struct {
1356
  int      idx;
1357
  SLastKey key;
1358
} SIdxKey;
1359

1360
static void tsdbCacheUpdateLastColToNone(SLastCol *pLastCol, ELastCacheStatus cacheStatus) {
×
1361
  // update rowkey
1362
  pLastCol->rowKey.ts = TSKEY_MIN;
×
1363
  for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) {
×
1364
    SValue *pPKValue = &pLastCol->rowKey.pks[i];
×
1365
    if (IS_VAR_DATA_TYPE(pPKValue->type) && pPKValue->nData > 0) {
×
1366
      taosMemoryFreeClear(pPKValue->pData);
×
1367
      pPKValue->nData = 0;
×
1368
    } else {
1369
      valueClearDatum(pPKValue, pPKValue->type);
×
1370
    }
1371
  }
1372
  pLastCol->rowKey.numOfPKs = 0;
×
1373

1374
  // update colval
1375
  if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type) && pLastCol->colVal.value.nData > 0) {
×
1376
    taosMemoryFreeClear(pLastCol->colVal.value.pData);
×
1377
    pLastCol->colVal.value.nData = 0;
×
1378
  } else {
1379
    valueClearDatum(&pLastCol->colVal.value, pLastCol->colVal.value.type);
×
1380
  }
1381

1382
  pLastCol->colVal = COL_VAL_NONE(pLastCol->colVal.cid, pLastCol->colVal.value.type);
×
1383
  pLastCol->dirty = 1;
×
1384
  pLastCol->cacheStatus = cacheStatus;
×
1385
}
×
1386

1387
static int32_t tsdbCachePutToRocksdb(STsdb *pTsdb, SLastKey *pLastKey, SLastCol *pLastCol) {
161,454✔
1388
  int32_t code = 0;
161,454✔
1389
#ifdef USE_ROCKSDB
1390
  char   *rocks_value = NULL;
161,454✔
1391
  size_t  vlen = 0;
161,454✔
1392

1393
  code = tsdbCacheSerialize(pLastCol, &rocks_value, &vlen);
161,454✔
1394
  if (code) {
161,108!
1395
    tsdbError("tsdb/cache/putrocks: vgId:%d, serialize failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code));
×
1396
    TAOS_RETURN(code);
×
1397
  }
1398

1399
  rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
161,108✔
1400
  (void)taosThreadMutexLock(&pTsdb->rCache.writeBatchMutex);
161,108✔
1401
  rocksdb_writebatch_put(wb, (char *)pLastKey, ROCKS_KEY_LEN, rocks_value, vlen);
161,696✔
1402
  (void)taosThreadMutexUnlock(&pTsdb->rCache.writeBatchMutex);
161,834✔
1403

1404
  taosMemoryFree(rocks_value);
161,873!
1405
#endif
1406
  TAOS_RETURN(code);
161,755✔
1407
}
1408

1409
static int32_t tsdbCachePutToLRU(STsdb *pTsdb, SLastKey *pLastKey, SLastCol *pLastCol, int8_t dirty) {
234,206✔
1410
  int32_t code = 0, lino = 0;
234,206✔
1411

1412
  SLastCol *pLRULastCol = taosMemoryCalloc(1, sizeof(SLastCol));
234,206!
1413
  if (!pLRULastCol) {
234,803!
1414
    return terrno;
×
1415
  }
1416

1417
  size_t charge = 0;
234,803✔
1418
  *pLRULastCol = *pLastCol;
234,803✔
1419
  pLRULastCol->dirty = dirty;
234,803✔
1420
  TAOS_CHECK_EXIT(tsdbCacheReallocSLastCol(pLRULastCol, &charge));
234,803!
1421

1422
  LRUStatus status = taosLRUCacheInsert(pTsdb->lruCache, pLastKey, ROCKS_KEY_LEN, pLRULastCol, charge, tsdbCacheDeleter,
234,466✔
1423
                                        tsdbCacheOverWriter, NULL, TAOS_LRU_PRIORITY_LOW, pTsdb);
1424
  if (TAOS_LRU_STATUS_OK != status && TAOS_LRU_STATUS_OK_OVERWRITTEN != status) {
234,234!
1425
    tsdbError("vgId:%d, %s failed at line %d status %d.", TD_VID(pTsdb->pVnode), __func__, __LINE__, status);
×
1426
    code = TSDB_CODE_FAILED;
×
1427
    pLRULastCol = NULL;
×
1428
  }
1429

1430
_exit:
234,234✔
1431
  if (TSDB_CODE_SUCCESS != code) {
234,234!
1432
    taosMemoryFree(pLRULastCol);
×
1433
    tsdbError("tsdb/cache/putlru: vgId:%d, failed at line %d since %s.", TD_VID(pTsdb->pVnode), lino, tstrerror(code));
×
1434
  }
1435

1436
  TAOS_RETURN(code);
234,234✔
1437
}
1438

1439
static int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SArray *updCtxArray) {
13,152✔
1440
  if (!updCtxArray || TARRAY_SIZE(updCtxArray) == 0) {
13,152!
1441
    TAOS_RETURN(TSDB_CODE_SUCCESS);
×
1442
  }
1443

1444
  int32_t code = 0, lino = 0;
13,152✔
1445

1446
  int        num_keys = TARRAY_SIZE(updCtxArray);
13,152✔
1447
  SArray    *remainCols = NULL;
13,152✔
1448
  SLRUCache *pCache = pTsdb->lruCache;
13,152✔
1449

1450
  (void)taosThreadMutexLock(&pTsdb->lruMutex);
13,152✔
1451
  for (int i = 0; i < num_keys; ++i) {
138,529✔
1452
    SLastUpdateCtx *updCtx = &((SLastUpdateCtx *)TARRAY_DATA(updCtxArray))[i];
125,353✔
1453
    int8_t          lflag = updCtx->lflag;
125,353✔
1454
    SRowKey        *pRowKey = &updCtx->tsdbRowKey.key;
125,353✔
1455
    SColVal        *pColVal = &updCtx->colVal;
125,353✔
1456

1457
    if (lflag == LFLAG_LAST && !COL_VAL_IS_VALUE(pColVal)) {
125,353!
1458
      continue;
×
1459
    }
1460

1461
    SLastKey  *key = &(SLastKey){.lflag = lflag, .uid = uid, .cid = pColVal->cid};
125,353✔
1462
    LRUHandle *h = taosLRUCacheLookup(pCache, key, ROCKS_KEY_LEN);
125,353✔
1463
    if (h) {
125,371✔
1464
      SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h);
89,454✔
1465
      if (pLastCol->cacheStatus != TSDB_LAST_CACHE_NO_CACHE) {
89,298✔
1466
        int32_t cmp_res = tRowKeyCompare(&pLastCol->rowKey, pRowKey);
88,934✔
1467
        if (cmp_res < 0 || (cmp_res == 0 && !COL_VAL_IS_NONE(pColVal))) {
88,770!
1468
          SLastCol newLastCol = {
88,690✔
1469
              .rowKey = *pRowKey, .colVal = *pColVal, .dirty = 1, .cacheStatus = TSDB_LAST_CACHE_VALID};
1470
          code = tsdbCachePutToLRU(pTsdb, key, &newLastCol, 1);
88,690✔
1471
        }
1472
      }
1473

1474
      tsdbLRUCacheRelease(pCache, h, false);
89,089✔
1475
      TAOS_CHECK_EXIT(code);
89,449!
1476
    } else {
1477
      if (!remainCols) {
35,917✔
1478
        remainCols = taosArrayInit(num_keys * 2, sizeof(SIdxKey));
931✔
1479
        if (!remainCols) {
931!
1480
          TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
1481
        }
1482
      }
1483
      if (!taosArrayPush(remainCols, &(SIdxKey){i, *key})) {
71,819!
1484
        TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
1485
      }
1486
    }
1487
  }
1488

1489
  if (remainCols) {
13,176✔
1490
    num_keys = TARRAY_SIZE(remainCols);
931✔
1491
  }
1492
  if (remainCols && num_keys > 0) {
13,176!
1493
    char  **keys_list = NULL;
931✔
1494
    size_t *keys_list_sizes = NULL;
931✔
1495
    char  **values_list = NULL;
931✔
1496
    size_t *values_list_sizes = NULL;
931✔
1497
    char  **errs = NULL;
931✔
1498
    keys_list = taosMemoryCalloc(num_keys, sizeof(char *));
931!
1499
    if (!keys_list) {
931!
1500
      (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
×
1501
      return terrno;
×
1502
    }
1503
    keys_list_sizes = taosMemoryCalloc(num_keys, sizeof(size_t));
931!
1504
    if (!keys_list_sizes) {
931!
1505
      taosMemoryFree(keys_list);
×
1506
      (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
×
1507
      return terrno;
×
1508
    }
1509
    for (int i = 0; i < num_keys; ++i) {
36,851✔
1510
      SIdxKey *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[i];
35,920✔
1511

1512
      keys_list[i] = (char *)&idxKey->key;
35,920✔
1513
      keys_list_sizes[i] = ROCKS_KEY_LEN;
35,920✔
1514
    }
1515

1516
    rocksMayWrite(pTsdb, true);  // flush writebatch cache
931✔
1517

1518
    code = tsdbCacheGetValuesFromRocks(pTsdb, num_keys, (const char *const *)keys_list, keys_list_sizes, &values_list,
931✔
1519
                                       &values_list_sizes);
1520
    if (code) {
931!
1521
      taosMemoryFree(keys_list);
×
1522
      taosMemoryFree(keys_list_sizes);
×
1523
      goto _exit;
×
1524
    }
1525

1526
    // rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
1527
    for (int i = 0; i < num_keys; ++i) {
36,815✔
1528
      SIdxKey        *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[i];
35,884✔
1529
      SLastUpdateCtx *updCtx = (SLastUpdateCtx *)taosArrayGet(updCtxArray, idxKey->idx);
35,884✔
1530
      SRowKey        *pRowKey = &updCtx->tsdbRowKey.key;
35,873✔
1531
      SColVal        *pColVal = &updCtx->colVal;
35,873✔
1532

1533
      SLastCol *pLastCol = NULL;
35,873✔
1534
      if (values_list[i] != NULL) {
35,873✔
1535
        code = tsdbCacheDeserialize(values_list[i], values_list_sizes[i], &pLastCol);
65✔
1536
        if (code != TSDB_CODE_SUCCESS) {
65!
1537
          tsdbError("vgId:%d, %s deserialize failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1538
                    tstrerror(code));
1539
          goto _exit;
×
1540
        }
1541
      }
1542
      /*
1543
      if (code) {
1544
        tsdbError("tsdb/cache: vgId:%d, deserialize failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code));
1545
      }
1546
      */
1547
      SLastCol *pToFree = pLastCol;
35,873✔
1548

1549
      if (pLastCol && pLastCol->cacheStatus == TSDB_LAST_CACHE_NO_CACHE) {
35,873!
1550
        if ((code = tsdbCachePutToLRU(pTsdb, &idxKey->key, pLastCol, 0)) != TSDB_CODE_SUCCESS) {
×
1551
          tsdbError("tsdb/cache: vgId:%d, put lru failed at line %d since %s.", TD_VID(pTsdb->pVnode), lino,
×
1552
                    tstrerror(code));
1553
          taosMemoryFreeClear(pToFree);
×
1554
          break;
×
1555
        }
1556

1557
        // cache invalid => skip update
1558
        taosMemoryFreeClear(pToFree);
×
1559
        continue;
×
1560
      }
1561

1562
      if (IS_LAST_KEY(idxKey->key) && !COL_VAL_IS_VALUE(pColVal)) {
35,873!
1563
        taosMemoryFreeClear(pToFree);
×
1564
        continue;
×
1565
      }
1566

1567
      int32_t cmp_res = 1;
35,873✔
1568
      if (pLastCol) {
35,873✔
1569
        cmp_res = tRowKeyCompare(&pLastCol->rowKey, pRowKey);
65✔
1570
      }
1571

1572
      if (NULL == pLastCol || cmp_res < 0 || (cmp_res == 0 && !COL_VAL_IS_NONE(pColVal))) {
35,873!
1573
        SLastCol lastColTmp = {
35,873✔
1574
            .rowKey = *pRowKey, .colVal = *pColVal, .dirty = 0, .cacheStatus = TSDB_LAST_CACHE_VALID};
1575
        if ((code = tsdbCachePutToRocksdb(pTsdb, &idxKey->key, &lastColTmp)) != TSDB_CODE_SUCCESS) {
35,873!
1576
          tsdbError("tsdb/cache: vgId:%d, put rocks failed at line %d since %s.", TD_VID(pTsdb->pVnode), lino,
×
1577
                    tstrerror(code));
1578
          taosMemoryFreeClear(pToFree);
×
1579
          break;
×
1580
        }
1581
        if ((code = tsdbCachePutToLRU(pTsdb, &idxKey->key, &lastColTmp, 0)) != TSDB_CODE_SUCCESS) {
35,883!
1582
          tsdbError("tsdb/cache: vgId:%d, put lru failed at line %d since %s.", TD_VID(pTsdb->pVnode), lino,
×
1583
                    tstrerror(code));
1584
          taosMemoryFreeClear(pToFree);
×
1585
          break;
×
1586
        }
1587
      }
1588

1589
      taosMemoryFreeClear(pToFree);
35,884!
1590
    }
1591

1592
    rocksMayWrite(pTsdb, false);
931✔
1593

1594
    taosMemoryFree(keys_list);
931!
1595
    taosMemoryFree(keys_list_sizes);
931!
1596
    if (values_list) {
931!
1597
#ifdef USE_ROCKSDB
1598
      for (int i = 0; i < num_keys; ++i) {
36,851✔
1599
        rocksdb_free(values_list[i]);
35,920✔
1600
      }
1601
#endif
1602
      taosMemoryFree(values_list);
931!
1603
    }
1604
    taosMemoryFree(values_list_sizes);
931!
1605
  }
1606

1607
_exit:
12,245✔
1608
  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
13,176✔
1609
  taosArrayDestroy(remainCols);
13,198✔
1610

1611
  if (code) {
13,191!
1612
    tsdbError("tsdb/cache: vgId:%d, update failed at line %d since %s.", TD_VID(pTsdb->pVnode), __LINE__,
×
1613
              tstrerror(code));
1614
  }
1615

1616
  TAOS_RETURN(code);
13,191✔
1617
}
1618

1619
int32_t tsdbCacheRowFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int64_t version, int32_t nRow,
×
1620
                                 SRow **aRow) {
1621
  int32_t code = 0, lino = 0;
×
1622

1623
  // 1. prepare last
1624
  TSDBROW      lRow = {.type = TSDBROW_ROW_FMT, .pTSRow = aRow[nRow - 1], .version = version};
×
1625
  STSchema    *pTSchema = NULL;
×
1626
  int32_t      sver = TSDBROW_SVERSION(&lRow);
×
1627
  SSHashObj   *iColHash = NULL;
×
1628
  STSDBRowIter iter = {0};
×
1629

1630
  TAOS_CHECK_GOTO(tsdbUpdateSkm(pTsdb, suid, uid, sver), &lino, _exit);
×
1631
  pTSchema = pTsdb->rCache.pTSchema;
×
1632

1633
  TSDBROW tRow = {.type = TSDBROW_ROW_FMT, .version = version};
×
1634
  int32_t nCol = pTSchema->numOfCols;
×
1635
  SArray *ctxArray = pTsdb->rCache.ctxArray;
×
1636

1637
  // 1. prepare by lrow
1638
  STsdbRowKey tsdbRowKey = {0};
×
1639
  tsdbRowGetKey(&lRow, &tsdbRowKey);
×
1640

1641
  TAOS_CHECK_GOTO(tsdbRowIterOpen(&iter, &lRow, pTSchema), &lino, _exit);
×
1642

1643
  int32_t iCol = 0;
×
1644
  for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal && iCol < nCol; pColVal = tsdbRowIterNext(&iter), iCol++) {
×
1645
    SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST_ROW, .tsdbRowKey = tsdbRowKey, .colVal = *pColVal};
×
1646
    if (!taosArrayPush(ctxArray, &updateCtx)) {
×
1647
      TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
1648
    }
1649

1650
    if (COL_VAL_IS_VALUE(pColVal)) {
×
1651
      updateCtx.lflag = LFLAG_LAST;
×
1652
      if (!taosArrayPush(ctxArray, &updateCtx)) {
×
1653
        TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
1654
      }
1655
    } else {
1656
      if (!iColHash) {
×
1657
        iColHash = tSimpleHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
×
1658
        if (iColHash == NULL) {
×
1659
          TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit);
×
1660
        }
1661
      }
1662

1663
      if (tSimpleHashPut(iColHash, &iCol, sizeof(iCol), NULL, 0)) {
×
1664
        TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit);
×
1665
      }
1666
    }
1667
  }
1668

1669
  // 2. prepare by the other rows
1670
  for (int32_t iRow = nRow - 2; iRow >= 0; --iRow) {
×
1671
    if (tSimpleHashGetSize(iColHash) == 0) {
×
1672
      break;
×
1673
    }
1674

1675
    tRow.pTSRow = aRow[iRow];
×
1676

1677
    STsdbRowKey tsdbRowKey = {0};
×
1678
    tsdbRowGetKey(&tRow, &tsdbRowKey);
×
1679

1680
    void   *pIte = NULL;
×
1681
    int32_t iter = 0;
×
1682
    while ((pIte = tSimpleHashIterate(iColHash, pIte, &iter)) != NULL) {
×
1683
      int32_t iCol = ((int32_t *)pIte)[0];
×
1684
      SColVal colVal = COL_VAL_NONE(0, 0);
×
1685
      tsdbRowGetColVal(&tRow, pTSchema, iCol, &colVal);
×
1686

1687
      if (COL_VAL_IS_VALUE(&colVal)) {
×
1688
        SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST, .tsdbRowKey = tsdbRowKey, .colVal = colVal};
×
1689
        if (!taosArrayPush(ctxArray, &updateCtx)) {
×
1690
          TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
1691
        }
1692
        code = tSimpleHashIterateRemove(iColHash, &iCol, sizeof(iCol), &pIte, &iter);
×
1693
        if (code != TSDB_CODE_SUCCESS) {
×
1694
          tsdbTrace("vgId:%d, %s tSimpleHashIterateRemove failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__,
×
1695
                    __LINE__, tstrerror(code));
1696
        }
1697
      }
1698
    }
1699
  }
1700

1701
  TAOS_CHECK_GOTO(tsdbCacheUpdate(pTsdb, suid, uid, ctxArray), &lino, _exit);
×
1702

1703
_exit:
×
1704
  if (code) {
×
1705
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
×
1706
  }
1707

1708
  tsdbRowClose(&iter);
×
1709
  tSimpleHashCleanup(iColHash);
×
1710
  taosArrayClear(ctxArray);
×
1711

1712
  TAOS_RETURN(code);
×
1713
}
1714

1715
int32_t tsdbCacheColFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SBlockData *pBlockData) {
×
1716
  int32_t      code = 0, lino = 0;
×
1717
  STSDBRowIter iter = {0};
×
1718
  STSchema    *pTSchema = NULL;
×
1719
  SArray      *ctxArray = NULL;
×
1720

1721
  TSDBROW lRow = tsdbRowFromBlockData(pBlockData, pBlockData->nRow - 1);
×
1722
  int32_t sver = TSDBROW_SVERSION(&lRow);
×
1723

1724
  TAOS_CHECK_RETURN(metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pTSchema));
×
1725

1726
  ctxArray = taosArrayInit(pBlockData->nColData, sizeof(SLastUpdateCtx));
×
1727
  if (ctxArray == NULL) {
×
1728
    TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
1729
  }
1730

1731
  // 1. prepare last
1732
  STsdbRowKey tsdbRowKey = {0};
×
1733
  tsdbRowGetKey(&lRow, &tsdbRowKey);
×
1734

1735
  {
1736
    SValue tsVal = {.type = TSDB_DATA_TYPE_TIMESTAMP};
×
1737
    VALUE_SET_TRIVIAL_DATUM(&tsVal, lRow.pBlockData->aTSKEY[lRow.iRow]);
×
1738
    SLastUpdateCtx updateCtx = {
×
1739
        .lflag = LFLAG_LAST,
1740
        .tsdbRowKey = tsdbRowKey,
1741
        .colVal = COL_VAL_VALUE(PRIMARYKEY_TIMESTAMP_COL_ID, tsVal)};
1742
    if (!taosArrayPush(ctxArray, &updateCtx)) {
×
1743
      TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
1744
    }
1745
  }
1746

1747
  TSDBROW tRow = tsdbRowFromBlockData(pBlockData, 0);
×
1748

1749
  for (int32_t iColData = 0; iColData < pBlockData->nColData; ++iColData) {
×
1750
    SColData *pColData = &pBlockData->aColData[iColData];
×
1751
    if ((pColData->flag & HAS_VALUE) != HAS_VALUE) {
×
1752
      continue;
×
1753
    }
1754

1755
    for (tRow.iRow = pBlockData->nRow - 1; tRow.iRow >= 0; --tRow.iRow) {
×
1756
      STsdbRowKey tsdbRowKey = {0};
×
1757
      tsdbRowGetKey(&tRow, &tsdbRowKey);
×
1758

1759
      uint8_t colType = tColDataGetBitValue(pColData, tRow.iRow);
×
1760
      if (colType == 2) {
×
1761
        SColVal colVal = COL_VAL_NONE(pColData->cid, pColData->type);
×
1762
        TAOS_CHECK_GOTO(tColDataGetValue(pColData, tRow.iRow, &colVal), &lino, _exit);
×
1763

1764
        SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST, .tsdbRowKey = tsdbRowKey, .colVal = colVal};
×
1765
        if (!taosArrayPush(ctxArray, &updateCtx)) {
×
1766
          TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
1767
        }
1768
        break;
×
1769
      }
1770
    }
1771
  }
1772

1773
  // 2. prepare last row
1774
  TAOS_CHECK_GOTO(tsdbRowIterOpen(&iter, &lRow, pTSchema), &lino, _exit);
×
1775
  for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal; pColVal = tsdbRowIterNext(&iter)) {
×
1776
    SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST_ROW, .tsdbRowKey = tsdbRowKey, .colVal = *pColVal};
×
1777
    if (!taosArrayPush(ctxArray, &updateCtx)) {
×
1778
      TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
1779
    }
1780
  }
1781

1782
  TAOS_CHECK_GOTO(tsdbCacheUpdate(pTsdb, suid, uid, ctxArray), &lino, _exit);
×
1783

1784
_exit:
×
1785
  tsdbRowClose(&iter);
×
1786
  taosMemoryFreeClear(pTSchema);
×
1787
  taosArrayDestroy(ctxArray);
×
1788

1789
  TAOS_RETURN(code);
×
1790
}
1791

1792
static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols,
1793
                            int nCols, int16_t *slotIds);
1794

1795
static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols,
1796
                               int nCols, int16_t *slotIds);
1797

1798
static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SArray *remainCols,
1,052✔
1799
                                    SCacheRowsReader *pr, int8_t ltype) {
1800
  int32_t               code = 0, lino = 0;
1,052✔
1801
  // rocksdb_writebatch_t *wb = NULL;
1802
  SArray               *pTmpColArray = NULL;
1,052✔
1803
  bool                  extraTS = false;
1,052✔
1804

1805
  SIdxKey *idxKey = taosArrayGet(remainCols, 0);
1,052✔
1806
  if (idxKey->key.cid != PRIMARYKEY_TIMESTAMP_COL_ID) {
1,052✔
1807
    // ignore 'ts' loaded from cache and load it from tsdb
1808
    // SLastCol *pLastCol = taosArrayGet(pLastArray, 0);
1809
    // tsdbCacheUpdateLastColToNone(pLastCol, TSDB_LAST_CACHE_NO_CACHE);
1810

1811
    SLastKey *key = &(SLastKey){.lflag = ltype, .uid = uid, .cid = PRIMARYKEY_TIMESTAMP_COL_ID};
596✔
1812
    if (!taosArrayInsert(remainCols, 0, &(SIdxKey){0, *key})) {
596!
1813
      TAOS_RETURN(terrno);
×
1814
    }
1815

1816
    extraTS = true;
596✔
1817
  }
1818

1819
  int      num_keys = TARRAY_SIZE(remainCols);
1,052✔
1820
  int16_t *slotIds = taosMemoryMalloc(num_keys * sizeof(int16_t));
1,052!
1821

1822
  int16_t *lastColIds = NULL, *lastSlotIds = NULL, *lastrowColIds = NULL, *lastrowSlotIds = NULL;
1,052✔
1823
  lastColIds = taosMemoryMalloc(num_keys * sizeof(int16_t));
1,052!
1824
  lastSlotIds = taosMemoryMalloc(num_keys * sizeof(int16_t));
1,053!
1825
  lastrowColIds = taosMemoryMalloc(num_keys * sizeof(int16_t));
1,053!
1826
  lastrowSlotIds = taosMemoryMalloc(num_keys * sizeof(int16_t));
1,053!
1827
  SArray *lastTmpColArray = NULL, *lastTmpIndexArray = NULL, *lastrowTmpColArray = NULL, *lastrowTmpIndexArray = NULL;
1,054✔
1828

1829
  int lastIndex = 0;
1,054✔
1830
  int lastrowIndex = 0;
1,054✔
1831

1832
  if (!slotIds || !lastColIds || !lastSlotIds || !lastrowColIds || !lastrowSlotIds) {
1,054!
1833
    TAOS_CHECK_EXIT(terrno);
1!
1834
  }
1835

1836
  for (int i = 0; i < num_keys; ++i) {
3,270✔
1837
    SIdxKey *idxKey = taosArrayGet(remainCols, i);
2,216✔
1838
    if (extraTS && !i) {
2,215✔
1839
      slotIds[i] = 0;
596✔
1840
    } else {
1841
      slotIds[i] = pr->pSlotIds[idxKey->idx];
1,619✔
1842
    }
1843

1844
    if (IS_LAST_KEY(idxKey->key)) {
2,215✔
1845
      if (NULL == lastTmpIndexArray) {
1,256✔
1846
        lastTmpIndexArray = taosArrayInit(num_keys, sizeof(int32_t));
598✔
1847
        if (!lastTmpIndexArray) {
598!
1848
          TAOS_CHECK_EXIT(terrno);
×
1849
        }
1850
      }
1851
      if (!taosArrayPush(lastTmpIndexArray, &(i))) {
1,257!
1852
        TAOS_CHECK_EXIT(terrno);
×
1853
      }
1854
      lastColIds[lastIndex] = idxKey->key.cid;
1,257✔
1855
      if (extraTS && !i) {
1,257✔
1856
        lastSlotIds[lastIndex] = 0;
578✔
1857
      } else {
1858
        lastSlotIds[lastIndex] = pr->pSlotIds[idxKey->idx];
679✔
1859
      }
1860
      lastIndex++;
1,257✔
1861
    } else {
1862
      if (NULL == lastrowTmpIndexArray) {
959✔
1863
        lastrowTmpIndexArray = taosArrayInit(num_keys, sizeof(int32_t));
455✔
1864
        if (!lastrowTmpIndexArray) {
453!
1865
          TAOS_CHECK_EXIT(terrno);
×
1866
        }
1867
      }
1868
      if (!taosArrayPush(lastrowTmpIndexArray, &(i))) {
960!
1869
        TAOS_CHECK_EXIT(terrno);
×
1870
      }
1871
      lastrowColIds[lastrowIndex] = idxKey->key.cid;
960✔
1872
      if (extraTS && !i) {
960✔
1873
        lastrowSlotIds[lastrowIndex] = 0;
18✔
1874
      } else {
1875
        lastrowSlotIds[lastrowIndex] = pr->pSlotIds[idxKey->idx];
942✔
1876
      }
1877
      lastrowIndex++;
960✔
1878
    }
1879
  }
1880

1881
  pTmpColArray = taosArrayInit(lastIndex + lastrowIndex, sizeof(SLastCol));
1,054✔
1882
  if (!pTmpColArray) {
1,050!
1883
    TAOS_CHECK_EXIT(terrno);
×
1884
  }
1885

1886
  if (lastTmpIndexArray != NULL) {
1,050✔
1887
    TAOS_CHECK_EXIT(mergeLastCid(uid, pTsdb, &lastTmpColArray, pr, lastColIds, lastIndex, lastSlotIds));
597!
1888
    for (int i = 0; i < taosArrayGetSize(lastTmpColArray); i++) {
1,804✔
1889
      if (!taosArrayInsert(pTmpColArray, *(int32_t *)taosArrayGet(lastTmpIndexArray, i),
1,207!
1890
                           taosArrayGet(lastTmpColArray, i))) {
1,207✔
1891
        TAOS_CHECK_EXIT(terrno);
×
1892
      }
1893
    }
1894
  }
1895

1896
  if (lastrowTmpIndexArray != NULL) {
1,050✔
1897
    TAOS_CHECK_EXIT(mergeLastRowCid(uid, pTsdb, &lastrowTmpColArray, pr, lastrowColIds, lastrowIndex, lastrowSlotIds));
453!
1898
    for (int i = 0; i < taosArrayGetSize(lastrowTmpColArray); i++) {
1,338✔
1899
      if (!taosArrayInsert(pTmpColArray, *(int32_t *)taosArrayGet(lastrowTmpIndexArray, i),
883!
1900
                           taosArrayGet(lastrowTmpColArray, i))) {
881✔
1901
        TAOS_CHECK_EXIT(terrno);
×
1902
      }
1903
    }
1904
  }
1905

1906
  SLRUCache *pCache = pTsdb->lruCache;
1,052✔
1907
  for (int i = 0; i < num_keys; ++i) {
3,270✔
1908
    SIdxKey  *idxKey = taosArrayGet(remainCols, i);
2,217✔
1909
    SLastCol *pLastCol = NULL;
2,217✔
1910

1911
    if (pTmpColArray && TARRAY_SIZE(pTmpColArray) >= i + 1) {
2,217!
1912
      pLastCol = taosArrayGet(pTmpColArray, i);
2,091✔
1913
    }
1914

1915
    // still null, then make up a none col value
1916
    SLastCol noneCol = {.rowKey.ts = TSKEY_MIN,
2,216✔
1917
                        .colVal = COL_VAL_NONE(idxKey->key.cid, pr->pSchema->columns[slotIds[i]].type),
2,216✔
1918
                        .cacheStatus = TSDB_LAST_CACHE_VALID};
1919
    if (!pLastCol) {
2,216✔
1920
      pLastCol = &noneCol;
128✔
1921
    }
1922

1923
    if (!extraTS || i > 0) {
2,216✔
1924
      taosArraySet(pLastArray, idxKey->idx, pLastCol);
1,622✔
1925
    }
1926
    // taosArrayRemove(remainCols, i);
1927

1928
    if (/*!pTmpColArray*/ lastTmpIndexArray && !lastTmpColArray) {
2,216!
1929
      continue;
×
1930
    }
1931
    if (/*!pTmpColArray*/ lastrowTmpIndexArray && !lastrowTmpColArray) {
2,216!
1932
      continue;
×
1933
    }
1934

1935
    // store result back to rocks cache
1936
    code = tsdbCachePutToRocksdb(pTsdb, &idxKey->key, pLastCol);
2,216✔
1937
    if (code) {
2,216!
1938
      tsdbError("vgId:%d, %s failed at line %d since %s.", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
×
1939
      TAOS_CHECK_EXIT(code);
×
1940
    }
1941

1942
    code = tsdbCachePutToLRU(pTsdb, &idxKey->key, pLastCol, 0);
2,216✔
1943
    if (code) {
2,217!
1944
      tsdbError("vgId:%d, %s failed at line %d since %s.", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
×
1945
      TAOS_CHECK_EXIT(code);
1!
1946
    }
1947
  }
1948

1949
  rocksMayWrite(pTsdb, false);
1,053✔
1950

1951
_exit:
1,053✔
1952
  taosArrayDestroy(lastrowTmpIndexArray);
1,053✔
1953
  taosArrayDestroy(lastrowTmpColArray);
1,053✔
1954
  taosArrayDestroy(lastTmpIndexArray);
1,053✔
1955
  taosArrayDestroy(lastTmpColArray);
1,052✔
1956

1957
  taosMemoryFree(lastColIds);
1,053!
1958
  taosMemoryFree(lastSlotIds);
1,053!
1959
  taosMemoryFree(lastrowColIds);
1,053!
1960
  taosMemoryFree(lastrowSlotIds);
1,053!
1961

1962
  taosArrayDestroy(pTmpColArray);
1,053✔
1963

1964
  taosMemoryFree(slotIds);
1,053!
1965

1966
  TAOS_RETURN(code);
1,053✔
1967
}
1968

1969
static int32_t tsdbCacheLoadFromRocks(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SArray *remainCols,
1,093✔
1970
                                      SArray *ignoreFromRocks, SCacheRowsReader *pr, int8_t ltype) {
1971
  int32_t code = 0, lino = 0;
1,093✔
1972
  int     num_keys = TARRAY_SIZE(remainCols);
1,093✔
1973
  char  **keys_list = taosMemoryMalloc(num_keys * sizeof(char *));
1,093!
1974
  size_t *keys_list_sizes = taosMemoryMalloc(num_keys * sizeof(size_t));
1,092!
1975
  char   *key_list = taosMemoryMalloc(num_keys * ROCKS_KEY_LEN);
1,092!
1976
  if (!keys_list || !keys_list_sizes || !key_list) {
1,092!
1977
    taosMemoryFree(keys_list);
×
1978
    taosMemoryFree(keys_list_sizes);
×
1979
    TAOS_RETURN(terrno);
×
1980
  }
1981
  char  **values_list = NULL;
1,092✔
1982
  size_t *values_list_sizes = NULL;
1,092✔
1983
  for (int i = 0; i < num_keys; ++i) {
2,789✔
1984
    memcpy(key_list + i * ROCKS_KEY_LEN, &((SIdxKey *)taosArrayGet(remainCols, i))->key, ROCKS_KEY_LEN);
1,698✔
1985
    keys_list[i] = key_list + i * ROCKS_KEY_LEN;
1,697✔
1986
    keys_list_sizes[i] = ROCKS_KEY_LEN;
1,697✔
1987
  }
1988

1989
  rocksMayWrite(pTsdb, true);  // flush writebatch cache
1,091✔
1990

1991
  code = tsdbCacheGetValuesFromRocks(pTsdb, num_keys, (const char *const *)keys_list, keys_list_sizes, &values_list,
1,087✔
1992
                                     &values_list_sizes);
1993
  if (code) {
1,091!
1994
    taosMemoryFree(key_list);
×
1995
    taosMemoryFree(keys_list);
×
1996
    taosMemoryFree(keys_list_sizes);
×
1997
    TAOS_RETURN(code);
×
1998
  }
1999

2000
  SLRUCache *pCache = pTsdb->lruCache;
1,091✔
2001
  for (int i = 0, j = 0; i < num_keys && j < TARRAY_SIZE(remainCols); ++i) {
2,784!
2002
    SLastCol *pLastCol = NULL;
1,696✔
2003
    bool      ignore = ((bool *)TARRAY_DATA(ignoreFromRocks))[i];
1,696✔
2004
    if (ignore) {
1,696✔
2005
      ++j;
1✔
2006
      continue;
1✔
2007
    }
2008

2009
    if (values_list[i] != NULL) {
1,695✔
2010
      code = tsdbCacheDeserialize(values_list[i], values_list_sizes[i], &pLastCol);
75✔
2011
      if (code != TSDB_CODE_SUCCESS) {
75✔
2012
        tsdbError("vgId:%d, %s deserialize failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
3!
2013
                  tstrerror(code));
2014
        goto _exit;
×
2015
      }
2016
    }
2017
    SLastCol *pToFree = pLastCol;
1,692✔
2018
    SIdxKey  *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[j];
1,692✔
2019
    if (pLastCol && pLastCol->cacheStatus != TSDB_LAST_CACHE_NO_CACHE) {
1,767!
2020
      code = tsdbCachePutToLRU(pTsdb, &idxKey->key, pLastCol, 0);
75✔
2021
      if (code) {
75!
2022
        tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
×
2023
        taosMemoryFreeClear(pToFree);
×
2024
        TAOS_CHECK_EXIT(code);
×
2025
      }
2026

2027
      SLastCol lastCol = *pLastCol;
75✔
2028
      code = tsdbCacheReallocSLastCol(&lastCol, NULL);
75✔
2029
      if (TSDB_CODE_SUCCESS != code) {
75!
2030
        taosMemoryFreeClear(pToFree);
×
2031
        TAOS_CHECK_EXIT(code);
×
2032
      }
2033

2034
      taosArraySet(pLastArray, idxKey->idx, &lastCol);
75✔
2035
      taosArrayRemove(remainCols, j);
75✔
2036
      taosArrayRemove(ignoreFromRocks, j);
75✔
2037
    } else {
2038
      ++j;
1,617✔
2039
    }
2040

2041
    taosMemoryFreeClear(pToFree);
1,692!
2042
  }
2043

2044
  if (TARRAY_SIZE(remainCols) > 0) {
1,088✔
2045
    // tsdbTrace("tsdb/cache: vgId: %d, load %" PRId64 " from raw", TD_VID(pTsdb->pVnode), uid);
2046
    code = tsdbCacheLoadFromRaw(pTsdb, uid, pLastArray, remainCols, pr, ltype);
1,053✔
2047
  }
2048

2049
_exit:
35✔
2050
  taosMemoryFree(key_list);
1,088!
2051
  taosMemoryFree(keys_list);
1,092!
2052
  taosMemoryFree(keys_list_sizes);
1,093!
2053
  if (values_list) {
1,093!
2054
  #ifdef USE_ROCKSDB
2055
    for (int i = 0; i < num_keys; ++i) {
2,791✔
2056
      rocksdb_free(values_list[i]);
1,699✔
2057
    }
2058
  #endif
2059
    taosMemoryFree(values_list);
1,092!
2060
  }
2061
  taosMemoryFree(values_list_sizes);
1,093!
2062

2063
  TAOS_RETURN(code);
1,092✔
2064
}
2065

2066
static int32_t tsdbCacheGetBatchFromLru(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr,
29,702✔
2067
                                        int8_t ltype, SArray *keyArray) {
2068
  int32_t    code = 0, lino = 0;
29,702✔
2069
  SArray    *remainCols = NULL;
29,702✔
2070
  SArray    *ignoreFromRocks = NULL;
29,702✔
2071
  SLRUCache *pCache = pTsdb->lruCache;
29,702✔
2072
  SArray    *pCidList = pr->pCidList;
29,702✔
2073
  int        numKeys = TARRAY_SIZE(pCidList);
29,702✔
2074

2075
  for (int i = 0; i < numKeys; ++i) {
78,982✔
2076
    int16_t cid = ((int16_t *)TARRAY_DATA(pCidList))[i];
49,241✔
2077

2078
    SLastKey key = {.lflag = ltype, .uid = uid, .cid = cid};
49,241✔
2079
    // for select last_row, last case
2080
    int32_t funcType = FUNCTION_TYPE_CACHE_LAST;
49,241✔
2081
    if (pr->pFuncTypeList != NULL && taosArrayGetSize(pr->pFuncTypeList) > i) {
49,241✔
2082
      funcType = ((int32_t *)TARRAY_DATA(pr->pFuncTypeList))[i];
2,710✔
2083
    }
2084
    if (((pr->type & CACHESCAN_RETRIEVE_LAST) == CACHESCAN_RETRIEVE_LAST) && FUNCTION_TYPE_CACHE_LAST_ROW == funcType) {
49,216✔
2085
      int8_t tempType = CACHESCAN_RETRIEVE_LAST_ROW | (pr->type ^ CACHESCAN_RETRIEVE_LAST);
1,419✔
2086
      key.lflag = (tempType & CACHESCAN_RETRIEVE_LAST) >> 3;
1,419✔
2087
    }
2088

2089
    if (!taosArrayPush(keyArray, &key)) {
49,213!
2090
      TAOS_CHECK_EXIT(terrno);
×
2091
    }
2092

2093
    LRUHandle *h = taosLRUCacheLookup(pCache, &key, ROCKS_KEY_LEN);
49,213✔
2094
    SLastCol  *pLastCol = h ? (SLastCol *)taosLRUCacheValue(pCache, h) : NULL;
49,269✔
2095
    if (h && pLastCol->cacheStatus != TSDB_LAST_CACHE_NO_CACHE) {
96,757✔
2096
      SLastCol lastCol = *pLastCol;
47,560✔
2097
      if (TSDB_CODE_SUCCESS != (code = tsdbCacheReallocSLastCol(&lastCol, NULL))) {
47,560!
2098
        tsdbLRUCacheRelease(pCache, h, false);
×
2099
        TAOS_CHECK_GOTO(code, NULL, _exit);
×
2100
      }
2101

2102
      if (taosArrayPush(pLastArray, &lastCol) == NULL) {
47,495!
2103
        code = terrno;
×
2104
        tsdbLRUCacheRelease(pCache, h, false);
×
2105
        goto _exit;
×
2106
      }
2107
    } else {
2108
      // no cache or cache is invalid
2109
      SLastCol noneCol = {.rowKey.ts = TSKEY_MIN,
1,702✔
2110
                          .colVal = COL_VAL_NONE(cid, pr->pSchema->columns[pr->pSlotIds[i]].type)};
1,702✔
2111

2112
      if (taosArrayPush(pLastArray, &noneCol) == NULL) {
1,699!
2113
        code = terrno;
×
2114
        tsdbLRUCacheRelease(pCache, h, false);
×
2115
        goto _exit;
×
2116
      }
2117

2118
      if (!remainCols) {
1,699✔
2119
        if ((remainCols = taosArrayInit(numKeys, sizeof(SIdxKey))) == NULL) {
1,092!
2120
          code = terrno;
×
2121
          tsdbLRUCacheRelease(pCache, h, false);
×
2122
          goto _exit;
×
2123
        }
2124
      }
2125
      if (!ignoreFromRocks) {
1,700✔
2126
        if ((ignoreFromRocks = taosArrayInit(numKeys, sizeof(bool))) == NULL) {
1,093!
2127
          code = terrno;
×
2128
          tsdbLRUCacheRelease(pCache, h, false);
×
2129
          goto _exit;
×
2130
        }
2131
      }
2132
      if (taosArrayPush(remainCols, &(SIdxKey){i, key}) == NULL) {
3,400!
2133
        code = terrno;
×
2134
        tsdbLRUCacheRelease(pCache, h, false);
×
2135
        goto _exit;
×
2136
      }
2137
      bool ignoreRocks = pLastCol ? (pLastCol->cacheStatus == TSDB_LAST_CACHE_NO_CACHE) : false;
1,700!
2138
      if (taosArrayPush(ignoreFromRocks, &ignoreRocks) == NULL) {
1,699!
2139
        code = terrno;
×
2140
        tsdbLRUCacheRelease(pCache, h, false);
×
2141
        goto _exit;
×
2142
      }
2143
    }
2144

2145
    if (h) {
49,194✔
2146
      tsdbLRUCacheRelease(pCache, h, false);
47,480✔
2147
    }
2148
  }
2149

2150
  if (remainCols && TARRAY_SIZE(remainCols) > 0) {
29,741!
2151
    (void)taosThreadMutexLock(&pTsdb->lruMutex);
1,093✔
2152

2153
    for (int i = 0; i < TARRAY_SIZE(remainCols);) {
2,792✔
2154
      SIdxKey   *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[i];
1,699✔
2155
      LRUHandle *h = taosLRUCacheLookup(pCache, &idxKey->key, ROCKS_KEY_LEN);
1,699✔
2156
      SLastCol  *pLastCol = h ? (SLastCol *)taosLRUCacheValue(pCache, h) : NULL;
1,700✔
2157
      if (h && pLastCol->cacheStatus != TSDB_LAST_CACHE_NO_CACHE) {
1,700!
2158
        SLastCol lastCol = *pLastCol;
×
2159
        code = tsdbCacheReallocSLastCol(&lastCol, NULL);
×
2160
        if (code) {
×
2161
          tsdbLRUCacheRelease(pCache, h, false);
×
2162
          (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
×
2163
          TAOS_RETURN(code);
×
2164
        }
2165

2166
        taosArraySet(pLastArray, idxKey->idx, &lastCol);
×
2167

2168
        taosArrayRemove(remainCols, i);
×
2169
        taosArrayRemove(ignoreFromRocks, i);
×
2170
      } else {
2171
        // no cache or cache is invalid
2172
        ++i;
1,700✔
2173
      }
2174
      if (h) {
1,700✔
2175
        tsdbLRUCacheRelease(pCache, h, false);
1✔
2176
      }
2177
    }
2178

2179
    // tsdbTrace("tsdb/cache: vgId: %d, load %" PRId64 " from rocks", TD_VID(pTsdb->pVnode), uid);
2180
    code = tsdbCacheLoadFromRocks(pTsdb, uid, pLastArray, remainCols, ignoreFromRocks, pr, ltype);
1,093✔
2181

2182
    (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
1,090✔
2183
  }
2184

2185
_exit:
28,648✔
2186
  if (remainCols) {
29,714✔
2187
    taosArrayDestroy(remainCols);
1,093✔
2188
  }
2189
  if (ignoreFromRocks) {
29,714✔
2190
    taosArrayDestroy(ignoreFromRocks);
1,092✔
2191
  }
2192

2193
  TAOS_RETURN(code);
29,714✔
2194
}
2195

2196
typedef enum SMEMNEXTROWSTATES {
2197
  SMEMNEXTROW_ENTER,
2198
  SMEMNEXTROW_NEXT,
2199
} SMEMNEXTROWSTATES;
2200

2201
typedef struct SMemNextRowIter {
2202
  SMEMNEXTROWSTATES state;
2203
  STbData          *pMem;  // [input]
2204
  STbDataIter       iter;  // mem buffer skip list iterator
2205
  int64_t           lastTs;
2206
} SMemNextRowIter;
2207

2208
static int32_t getNextRowFromMem(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast, int16_t *aCols,
1,134,564✔
2209
                                 int nCols) {
2210
  SMemNextRowIter *state = (SMemNextRowIter *)iter;
1,134,564✔
2211
  int32_t          code = 0;
1,134,564✔
2212
  *pIgnoreEarlierTs = false;
1,134,564✔
2213
  switch (state->state) {
1,134,564✔
2214
    case SMEMNEXTROW_ENTER: {
9,018✔
2215
      if (state->pMem != NULL) {
9,018!
2216
        /*
2217
        if (state->pMem->maxKey <= state->lastTs) {
2218
          *ppRow = NULL;
2219
          *pIgnoreEarlierTs = true;
2220

2221
          TAOS_RETURN(code);
2222
        }
2223
        */
2224
        tsdbTbDataIterOpen(state->pMem, NULL, 1, &state->iter);
9,019✔
2225

2226
        TSDBROW *pMemRow = tsdbTbDataIterGet(&state->iter);
9,014!
2227
        if (pMemRow) {
9,014!
2228
          *ppRow = pMemRow;
9,016✔
2229
          state->state = SMEMNEXTROW_NEXT;
9,016✔
2230

2231
          TAOS_RETURN(code);
9,016✔
2232
        }
2233
      }
2234

2235
      *ppRow = NULL;
×
2236

2237
      TAOS_RETURN(code);
×
2238
    }
2239
    case SMEMNEXTROW_NEXT:
1,125,467✔
2240
      if (tsdbTbDataIterNext(&state->iter)) {
1,125,467!
2241
        *ppRow = tsdbTbDataIterGet(&state->iter);
1,112,534!
2242

2243
        TAOS_RETURN(code);
1,112,534✔
2244
      } else {
2245
        *ppRow = NULL;
×
2246

2247
        TAOS_RETURN(code);
×
2248
      }
2249
    default:
79✔
2250
      break;
79✔
2251
  }
2252

2253
_err:
79✔
2254
  *ppRow = NULL;
79✔
2255

2256
  TAOS_RETURN(code);
79✔
2257
}
2258

2259
typedef int32_t (*_next_row_fn_t)(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast, int16_t *aCols,
2260
                                  int nCols);
2261
typedef int32_t (*_next_row_clear_fn_t)(void *iter);
2262

2263
typedef struct {
2264
  TSDBROW             *pRow;
2265
  bool                 stop;
2266
  bool                 next;
2267
  bool                 ignoreEarlierTs;
2268
  void                *iter;
2269
  _next_row_fn_t       nextRowFn;
2270
  _next_row_clear_fn_t nextRowClearFn;
2271
} TsdbNextRowState;
2272

2273
typedef struct {
2274
  SArray           *pMemDelData;
2275
  SArray           *pSkyline;
2276
  int64_t           iSkyline;
2277
  SBlockIdx         idx;
2278
  SMemNextRowIter   memState;
2279
  SMemNextRowIter   imemState;
2280
  TSDBROW           memRow, imemRow;
2281
  TsdbNextRowState  input[2];
2282
  SCacheRowsReader *pr;
2283
  STsdb            *pTsdb;
2284
} MemNextRowIter;
2285

2286
static int32_t memRowIterOpen(MemNextRowIter *pIter, tb_uid_t uid, STsdb *pTsdb, STSchema *pTSchema, tb_uid_t suid,
29,713✔
2287
                              STsdbReadSnap *pReadSnap, SCacheRowsReader *pr) {
2288
  int32_t code = 0, lino = 0;
29,713✔
2289

2290
  STbData *pMem = NULL;
29,713✔
2291
  if (pReadSnap->pMem) {
29,713!
2292
    pMem = tsdbGetTbDataFromMemTable(pReadSnap->pMem, suid, uid);
29,722✔
2293
  }
2294

2295
  STbData *pIMem = NULL;
29,728✔
2296
  if (pReadSnap->pIMem) {
29,728✔
2297
    pIMem = tsdbGetTbDataFromMemTable(pReadSnap->pIMem, suid, uid);
15✔
2298
  }
2299

2300
  pIter->pTsdb = pTsdb;
29,728✔
2301

2302
  pIter->pMemDelData = NULL;
29,728✔
2303

2304
  TAOS_CHECK_GOTO(loadMemTombData(&pIter->pMemDelData, pMem, pIMem, pr->info.verRange.maxVer), &lino, _exit);
29,728!
2305

2306
  pIter->idx = (SBlockIdx){.suid = suid, .uid = uid};
29,716✔
2307

2308
  pIter->input[0] = (TsdbNextRowState){&pIter->memRow, true, false, false, &pIter->memState, getNextRowFromMem, NULL};
29,716✔
2309
  pIter->input[1] = (TsdbNextRowState){&pIter->imemRow, true, false, false, &pIter->imemState, getNextRowFromMem, NULL};
29,716✔
2310

2311
  if (pMem) {
29,716✔
2312
    pIter->memState.pMem = pMem;
8,856✔
2313
    pIter->memState.state = SMEMNEXTROW_ENTER;
8,856✔
2314
    pIter->input[0].stop = false;
8,856✔
2315
    pIter->input[0].next = true;
8,856✔
2316
  }
2317

2318
  if (pIMem) {
29,716✔
2319
    pIter->imemState.pMem = pIMem;
15✔
2320
    pIter->imemState.state = SMEMNEXTROW_ENTER;
15✔
2321
    pIter->input[1].stop = false;
15✔
2322
    pIter->input[1].next = true;
15✔
2323
  }
2324

2325
  pIter->pr = pr;
29,716✔
2326

2327
_exit:
29,716✔
2328
  if (code) {
29,716!
2329
    tsdbError("tsdb/cache: %s failed at line %d since %s.", __func__, lino, tstrerror(code));
×
2330
  }
2331

2332
  TAOS_RETURN(code);
29,716✔
2333
}
2334

2335
static void memRowIterClose(MemNextRowIter *pIter) {
29,719✔
2336
  for (int i = 0; i < 2; ++i) {
89,160✔
2337
    if (pIter->input[i].nextRowClearFn) {
59,441!
2338
      (void)pIter->input[i].nextRowClearFn(pIter->input[i].iter);
×
2339
    }
2340
  }
2341

2342
  if (pIter->pSkyline) {
29,719✔
2343
    taosArrayDestroy(pIter->pSkyline);
8,855✔
2344
  }
2345

2346
  if (pIter->pMemDelData) {
29,718!
2347
    taosArrayDestroy(pIter->pMemDelData);
29,721✔
2348
  }
2349
}
29,721✔
2350

2351
static void freeTableInfoFunc(void *param) {
9,705✔
2352
  void **p = (void **)param;
9,705✔
2353
  taosMemoryFreeClear(*p);
9,705!
2354
}
9,705✔
2355

2356
static STableLoadInfo *getTableLoadInfo(SCacheRowsReader *pReader, uint64_t uid) {
9,883✔
2357
  if (!pReader->pTableMap) {
9,883✔
2358
    pReader->pTableMap = tSimpleHashInit(pReader->numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
7,217✔
2359
    if (!pReader->pTableMap) {
7,221!
2360
      return NULL;
×
2361
    }
2362

2363
    tSimpleHashSetFreeFp(pReader->pTableMap, freeTableInfoFunc);
7,221✔
2364
  }
2365

2366
  STableLoadInfo  *pInfo = NULL;
9,887✔
2367
  STableLoadInfo **ppInfo = tSimpleHashGet(pReader->pTableMap, &uid, sizeof(uid));
9,887✔
2368
  if (!ppInfo) {
9,883✔
2369
    pInfo = taosMemoryCalloc(1, sizeof(STableLoadInfo));
9,701!
2370
    if (pInfo) {
9,703!
2371
      if (tSimpleHashPut(pReader->pTableMap, &uid, sizeof(uint64_t), &pInfo, POINTER_BYTES)) {
9,705!
2372
        return NULL;
×
2373
      }
2374
    }
2375

2376
    return pInfo;
9,698✔
2377
  }
2378

2379
  return *ppInfo;
182✔
2380
}
2381

2382
static TSDBROW *memRowIterGet(MemNextRowIter *pIter, bool isLast, int16_t *aCols, int nCols) {
1,156,157✔
2383
  int32_t code = 0, lino = 0;
1,156,157✔
2384

2385
  for (;;) {
×
2386
    for (int i = 0; i < 2; ++i) {
3,428,572✔
2387
      if (pIter->input[i].next && !pIter->input[i].stop) {
2,293,409!
2388
        TAOS_CHECK_GOTO(pIter->input[i].nextRowFn(pIter->input[i].iter, &pIter->input[i].pRow,
1,135,901!
2389
                                                  &pIter->input[i].ignoreEarlierTs, isLast, aCols, nCols),
2390
                        &lino, _exit);
2391

2392
        if (pIter->input[i].pRow == NULL) {
1,115,024✔
2393
          pIter->input[i].stop = true;
240✔
2394
          pIter->input[i].next = false;
240✔
2395
        }
2396
      }
2397
    }
2398

2399
    if (pIter->input[0].stop && pIter->input[1].stop) {
1,135,163✔
2400
      return NULL;
1,142,123✔
2401
    }
2402

2403
    TSDBROW *max[2] = {0};
1,114,044✔
2404
    int      iMax[2] = {-1, -1};
1,114,044✔
2405
    int      nMax = 0;
1,114,044✔
2406
    SRowKey  maxKey = {.ts = TSKEY_MIN};
1,114,044✔
2407

2408
    for (int i = 0; i < 2; ++i) {
3,347,706✔
2409
      if (!pIter->input[i].stop && pIter->input[i].pRow != NULL) {
2,223,446!
2410
        STsdbRowKey tsdbRowKey = {0};
1,109,047✔
2411
        tsdbRowGetKey(pIter->input[i].pRow, &tsdbRowKey);
1,109,047✔
2412

2413
        // merging & deduplicating on client side
2414
        int c = tRowKeyCompare(&maxKey, &tsdbRowKey.key);
1,106,845✔
2415
        if (c <= 0) {
1,119,263✔
2416
          if (c < 0) {
1,119,022✔
2417
            nMax = 0;
1,101,045✔
2418
            maxKey = tsdbRowKey.key;
1,101,045✔
2419
          }
2420

2421
          iMax[nMax] = i;
1,119,022✔
2422
          max[nMax++] = pIter->input[i].pRow;
1,119,022✔
2423
        }
2424
        pIter->input[i].next = false;
1,119,263✔
2425
      }
2426
    }
2427

2428
    TSDBROW *merge[2] = {0};
1,124,260✔
2429
    int      iMerge[2] = {-1, -1};
1,124,260✔
2430
    int      nMerge = 0;
1,124,260✔
2431
    for (int i = 0; i < nMax; ++i) {
2,235,811✔
2432
      TSDBKEY maxKey1 = TSDBROW_KEY(max[i]);
1,114,924!
2433

2434
      if (!pIter->pSkyline) {
1,114,924✔
2435
        pIter->pSkyline = taosArrayInit(32, sizeof(TSDBKEY));
8,847✔
2436
        TSDB_CHECK_NULL(pIter->pSkyline, code, lino, _exit, terrno);
8,854!
2437

2438
        uint64_t        uid = pIter->idx.uid;
8,854✔
2439
        STableLoadInfo *pInfo = getTableLoadInfo(pIter->pr, uid);
8,854✔
2440
        TSDB_CHECK_NULL(pInfo, code, lino, _exit, TSDB_CODE_OUT_OF_MEMORY);
8,852!
2441

2442
        if (pInfo->pTombData == NULL) {
8,852✔
2443
          pInfo->pTombData = taosArrayInit(4, sizeof(SDelData));
8,705✔
2444
          TSDB_CHECK_NULL(pInfo->pTombData, code, lino, _exit, terrno);
8,707!
2445
        }
2446

2447
        if (!taosArrayAddAll(pInfo->pTombData, pIter->pMemDelData)) {
8,854!
2448
          TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
2449
        }
2450

2451
        size_t delSize = TARRAY_SIZE(pInfo->pTombData);
8,857✔
2452
        if (delSize > 0) {
8,857✔
2453
          code = tsdbBuildDeleteSkyline(pInfo->pTombData, 0, (int32_t)(delSize - 1), pIter->pSkyline);
1✔
2454
          TAOS_CHECK_GOTO(code, &lino, _exit);
1!
2455
        }
2456
        pIter->iSkyline = taosArrayGetSize(pIter->pSkyline) - 1;
8,857✔
2457
      }
2458

2459
      bool deleted = tsdbKeyDeleted(&maxKey1, pIter->pSkyline, &pIter->iSkyline);
1,114,933✔
2460
      if (!deleted) {
1,111,551!
2461
        iMerge[nMerge] = iMax[i];
1,112,167✔
2462
        merge[nMerge++] = max[i];
1,112,167✔
2463
      }
2464

2465
      pIter->input[iMax[i]].next = deleted;
1,111,551✔
2466
    }
2467

2468
    if (nMerge > 0) {
1,120,887!
2469
      pIter->input[iMerge[0]].next = true;
1,121,004✔
2470

2471
      return merge[0];
1,121,004✔
2472
    }
2473
  }
2474

2475
_exit:
×
2476
  if (code) {
×
2477
    tsdbError("tsdb/cache: %s failed at line %d since %s.", __func__, lino, tstrerror(code));
×
2478
  }
2479

2480
  return NULL;
×
2481
}
2482

2483
static int32_t cloneTSchema(STSchema *pSrc, STSchema **ppDst) {
6,300✔
2484
  int32_t len = sizeof(STSchema) + sizeof(STColumn) * pSrc->numOfCols;
6,300✔
2485
  *ppDst = taosMemoryMalloc(len);
6,300!
2486
  if (NULL == *ppDst) {
6,300!
2487
    TAOS_RETURN(terrno);
×
2488
  }
2489
  memcpy(*ppDst, pSrc, len);
6,300✔
2490

2491
  TAOS_RETURN(TSDB_CODE_SUCCESS);
6,300✔
2492
}
2493

2494
static int32_t updateTSchema(int32_t sversion, SCacheRowsReader *pReader, uint64_t uid) {
1,102,342✔
2495
  if (NULL == pReader->pCurrSchema && sversion == pReader->pSchema->version) {
1,102,342✔
2496
    TAOS_RETURN(cloneTSchema(pReader->pSchema, &pReader->pCurrSchema));
6,300✔
2497
  }
2498

2499
  if (NULL != pReader->pCurrSchema && sversion == pReader->pCurrSchema->version) {
1,096,042!
2500
    TAOS_RETURN(TSDB_CODE_SUCCESS);
1,095,836✔
2501
  }
2502

2503
  taosMemoryFreeClear(pReader->pCurrSchema);
206!
2504
  TAOS_RETURN(
206✔
2505
      metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->info.suid, uid, sversion, &pReader->pCurrSchema));
2506
}
2507

2508
static int32_t tsdbCacheGetBatchFromMem(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr,
29,693✔
2509
                                        SArray *keyArray) {
2510
  int32_t        code = 0;
29,693✔
2511
  int32_t        lino = 0;
29,693✔
2512
  STSchema      *pTSchema = pr->pSchema;
29,693✔
2513
  SLRUCache     *pCache = pTsdb->lruCache;
29,693✔
2514
  SArray        *pCidList = pr->pCidList;
29,693✔
2515
  int            numKeys = TARRAY_SIZE(pCidList);
29,693✔
2516
  MemNextRowIter iter = {0};
29,693✔
2517
  SSHashObj     *iColHash = NULL;
29,693✔
2518
  STSDBRowIter   rowIter = {0};
29,693✔
2519

2520
  // 1, get from mem, imem filtered with delete info
2521
  TAOS_CHECK_EXIT(memRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->info.suid, pr->pReadSnap, pr));
29,693!
2522

2523
  TSDBROW *pRow = memRowIterGet(&iter, false, NULL, 0);
29,710✔
2524
  if (!pRow) {
29,727✔
2525
    goto _exit;
20,872✔
2526
  }
2527

2528
  int32_t sversion = TSDBROW_SVERSION(pRow);
8,855!
2529
  if (sversion != -1) {
8,855!
2530
    TAOS_CHECK_EXIT(updateTSchema(sversion, pr, uid));
8,857!
2531

2532
    pTSchema = pr->pCurrSchema;
8,850✔
2533
  }
2534
  int32_t nCol = pTSchema->numOfCols;
8,848✔
2535

2536
  STsdbRowKey rowKey = {0};
8,848✔
2537
  tsdbRowGetKey(pRow, &rowKey);
8,848✔
2538

2539
  TAOS_CHECK_EXIT(tsdbRowIterOpen(&rowIter, pRow, pTSchema));
8,852!
2540

2541
  int32_t iCol = 0, jCol = 0, jnCol = TARRAY_SIZE(pLastArray);
8,853✔
2542
  for (SColVal *pColVal = tsdbRowIterNext(&rowIter); pColVal && iCol < nCol && jCol < jnCol;) {
73,947!
2543
    SLastCol *pTargetCol = &((SLastCol *)TARRAY_DATA(pLastArray))[jCol];
65,091✔
2544
    if (pColVal->cid < pTargetCol->colVal.cid) {
65,091✔
2545
      pColVal = tsdbRowIterNext(&rowIter), ++iCol;
50,013✔
2546

2547
      continue;
50,012✔
2548
    }
2549
    if (pColVal->cid > pTargetCol->colVal.cid) {
15,078!
2550
      break;
×
2551
    }
2552

2553
    int32_t cmp_res = tRowKeyCompare(&pTargetCol->rowKey, &rowKey.key);
15,078✔
2554
    if (!IS_LAST_KEY(((SLastKey *)TARRAY_DATA(keyArray))[jCol])) {
15,078✔
2555
      if (cmp_res < 0 || (cmp_res == 0 && !COL_VAL_IS_NONE(pColVal))) {
8,160!
2556
        SLastCol lastCol = {.rowKey = rowKey.key, .colVal = *pColVal, .dirty = 1, .cacheStatus = TSDB_LAST_CACHE_VALID};
8,160✔
2557
        TAOS_CHECK_EXIT(tsdbCacheReallocSLastCol(&lastCol, NULL));
8,160!
2558

2559
        tsdbCacheFreeSLastColItem(pTargetCol);
8,167✔
2560
        taosArraySet(pLastArray, jCol, &lastCol);
8,166✔
2561
      }
2562
    } else {
2563
      if (COL_VAL_IS_VALUE(pColVal)) {
6,918✔
2564
        if (cmp_res <= 0) {
5,902!
2565
          SLastCol lastCol = {
5,903✔
2566
              .rowKey = rowKey.key, .colVal = *pColVal, .dirty = 1, .cacheStatus = TSDB_LAST_CACHE_VALID};
2567
          TAOS_CHECK_EXIT(tsdbCacheReallocSLastCol(&lastCol, NULL));
5,903!
2568

2569
          tsdbCacheFreeSLastColItem(pTargetCol);
5,906✔
2570
          taosArraySet(pLastArray, jCol, &lastCol);
5,904✔
2571
        }
2572
      } else {
2573
        if (!iColHash) {
1,016✔
2574
          iColHash = tSimpleHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT));
443✔
2575
          if (iColHash == NULL) {
443!
2576
            TAOS_CHECK_EXIT(terrno);
×
2577
          }
2578
        }
2579

2580
        if (tSimpleHashPut(iColHash, &pColVal->cid, sizeof(pColVal->cid), &jCol, sizeof(jCol))) {
1,016!
2581
          TAOS_CHECK_EXIT(terrno);
×
2582
        }
2583
      }
2584
    }
2585

2586
    ++jCol;
15,083✔
2587

2588
    if (jCol < jnCol && ((SLastCol *)TARRAY_DATA(pLastArray))[jCol].colVal.cid > pColVal->cid) {
15,083✔
2589
      pColVal = tsdbRowIterNext(&rowIter), ++iCol;
6,232✔
2590
    }
2591
  }
2592
  tsdbRowClose(&rowIter);
8,854✔
2593

2594
  if (iColHash && tSimpleHashGetSize(iColHash) > 0) {
8,855!
2595
    pRow = memRowIterGet(&iter, false, NULL, 0);
443✔
2596
    while (pRow) {
1,103,353✔
2597
      if (tSimpleHashGetSize(iColHash) == 0) {
1,103,113✔
2598
        break;
202✔
2599
      }
2600

2601
      sversion = TSDBROW_SVERSION(pRow);
1,095,277!
2602
      if (sversion != -1) {
1,095,277!
2603
        TAOS_CHECK_EXIT(updateTSchema(sversion, pr, uid));
1,095,286!
2604

2605
        pTSchema = pr->pCurrSchema;
1,081,952✔
2606
      }
2607
      nCol = pTSchema->numOfCols;
1,081,943✔
2608

2609
      STsdbRowKey tsdbRowKey = {0};
1,081,943✔
2610
      tsdbRowGetKey(pRow, &tsdbRowKey);
1,081,943✔
2611

2612
      TAOS_CHECK_EXIT(tsdbRowIterOpen(&rowIter, pRow, pTSchema));
1,078,939!
2613

2614
      iCol = 0;
1,118,771✔
2615
      for (SColVal *pColVal = tsdbRowIterNext(&rowIter); pColVal && iCol < nCol;
9,848,319!
2616
           pColVal = tsdbRowIterNext(&rowIter), iCol++) {
8,089,779✔
2617
        int32_t *pjCol = tSimpleHashGet(iColHash, &pColVal->cid, sizeof(pColVal->cid));
9,075,219✔
2618
        if (pjCol && COL_VAL_IS_VALUE(pColVal)) {
8,089,778✔
2619
          SLastCol *pTargetCol = &((SLastCol *)TARRAY_DATA(pLastArray))[*pjCol];
775✔
2620

2621
          int32_t cmp_res = tRowKeyCompare(&pTargetCol->rowKey, &tsdbRowKey.key);
775✔
2622
          if (cmp_res <= 0) {
775!
2623
            SLastCol lastCol = {
775✔
2624
                .rowKey = tsdbRowKey.key, .colVal = *pColVal, .dirty = 1, .cacheStatus = TSDB_LAST_CACHE_VALID};
2625
            TAOS_CHECK_EXIT(tsdbCacheReallocSLastCol(&lastCol, NULL));
775!
2626

2627
            tsdbCacheFreeSLastColItem(pTargetCol);
776✔
2628
            taosArraySet(pLastArray, *pjCol, &lastCol);
776✔
2629
          }
2630

2631
          TAOS_CHECK_EXIT(tSimpleHashRemove(iColHash, &pColVal->cid, sizeof(pColVal->cid)));
776!
2632
        }
2633
      }
2634
      tsdbRowClose(&rowIter);
720,415✔
2635

2636
      pRow = memRowIterGet(&iter, false, NULL, 0);
1,129,179✔
2637
    }
2638
  }
2639

2640
_exit:
8,652✔
2641
  if (code) {
29,726!
2642
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(pTsdb->pVnode), __func__, __FILE__, lino, tstrerror(code));
×
2643

2644
    tsdbRowClose(&rowIter);
×
2645
  }
2646

2647
  tSimpleHashCleanup(iColHash);
29,726✔
2648

2649
  memRowIterClose(&iter);
29,723✔
2650

2651
  TAOS_RETURN(code);
29,724✔
2652
}
2653

2654
int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, int8_t ltype) {
29,698✔
2655
  int32_t code = 0;
29,698✔
2656
  int32_t lino = 0;
29,698✔
2657

2658
  SArray *keyArray = taosArrayInit(16, sizeof(SLastKey));
29,698✔
2659
  if (!keyArray) {
29,704!
2660
    TAOS_CHECK_EXIT(terrno);
×
2661
  }
2662

2663
  TAOS_CHECK_EXIT(tsdbCacheGetBatchFromLru(pTsdb, uid, pLastArray, pr, ltype, keyArray));
29,704!
2664

2665
  if (tsUpdateCacheBatch) {
29,718!
2666
    TAOS_CHECK_EXIT(tsdbCacheGetBatchFromMem(pTsdb, uid, pLastArray, pr, keyArray));
29,721!
2667
  }
2668

2669
_exit:
29,717✔
2670
  if (code) {
29,717!
2671
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(pTsdb->pVnode), __func__, __FILE__, lino, tstrerror(code));
×
2672
  }
2673

2674
  if (keyArray) {
29,723!
2675
    taosArrayDestroy(keyArray);
29,723✔
2676
  }
2677

2678
  TAOS_RETURN(code);
29,724✔
2679
}
2680

2681
int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey) {
60,403✔
2682
  int32_t   code = 0, lino = 0;
60,403✔
2683
  STSchema *pTSchema = NULL;
60,403✔
2684
  int       sver = -1;
60,403✔
2685
  int       numKeys = 0;
60,403✔
2686
  SArray   *remainCols = NULL;
60,403✔
2687

2688
  TAOS_CHECK_RETURN(metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pTSchema));
60,403!
2689

2690
  int numCols = pTSchema->numOfCols;
60,404✔
2691

2692
  (void)taosThreadMutexLock(&pTsdb->lruMutex);
60,404✔
2693

2694
  for (int i = 0; i < numCols; ++i) {
352,073✔
2695
    int16_t cid = pTSchema->columns[i].colId;
291,661✔
2696
    for (int8_t lflag = LFLAG_LAST_ROW; lflag <= LFLAG_LAST; ++lflag) {
874,969✔
2697
      SLastKey   lastKey = {.lflag = lflag, .uid = uid, .cid = cid};
583,300✔
2698
      LRUHandle *h = taosLRUCacheLookup(pTsdb->lruCache, &lastKey, ROCKS_KEY_LEN);
583,300✔
2699
      if (h) {
583,339✔
2700
        SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pTsdb->lruCache, h);
14,154✔
2701
        if (pLastCol->rowKey.ts <= eKey && pLastCol->rowKey.ts >= sKey) {
14,154!
2702
          SLastCol noneCol = {.rowKey.ts = TSKEY_MIN,
13,292✔
2703
                              .colVal = COL_VAL_NONE(cid, pTSchema->columns[i].type),
13,292✔
2704
                              .dirty = 1,
2705
                              .cacheStatus = TSDB_LAST_CACHE_NO_CACHE};
2706
          code = tsdbCachePutToLRU(pTsdb, &lastKey, &noneCol, 1);
13,292✔
2707
        }
2708
        tsdbLRUCacheRelease(pTsdb->lruCache, h, false);
14,154✔
2709
        TAOS_CHECK_EXIT(code);
14,154!
2710
      } else {
2711
        if (!remainCols) {
569,185✔
2712
          remainCols = taosArrayInit(numCols * 2, sizeof(SIdxKey));
60,380✔
2713
        }
2714
        if (!taosArrayPush(remainCols, &(SIdxKey){i, lastKey})) {
1,138,338!
2715
          TAOS_CHECK_EXIT(terrno);
×
2716
        }
2717
      }
2718
    }
2719
  }
2720

2721
  if (remainCols) {
60,412✔
2722
    numKeys = TARRAY_SIZE(remainCols);
60,380✔
2723
  }
2724

2725
  char  **keys_list = taosMemoryCalloc(numKeys, sizeof(char *));
60,412!
2726
  size_t *keys_list_sizes = taosMemoryCalloc(numKeys, sizeof(size_t));
60,403!
2727
  char  **values_list = NULL;
60,404✔
2728
  size_t *values_list_sizes = NULL;
60,404✔
2729

2730
  if (!keys_list || !keys_list_sizes) {
60,404!
2731
    code = terrno;
×
2732
    goto _exit;
×
2733
  }
2734
  const size_t klen = ROCKS_KEY_LEN;
60,404✔
2735

2736
  for (int i = 0; i < numKeys; ++i) {
629,464✔
2737
    char *key = taosMemoryCalloc(1, sizeof(SLastKey));
569,058!
2738
    if (!key) {
569,187!
2739
      code = terrno;
×
2740
      goto _exit;
×
2741
    }
2742
    SIdxKey *idxKey = taosArrayGet(remainCols, i);
569,187✔
2743

2744
    ((SLastKey *)key)[0] = idxKey->key;
569,060✔
2745

2746
    keys_list[i] = key;
569,060✔
2747
    keys_list_sizes[i] = klen;
569,060✔
2748
  }
2749

2750
  rocksMayWrite(pTsdb, true);  // flush writebatch cache
60,406✔
2751

2752
  TAOS_CHECK_GOTO(tsdbCacheGetValuesFromRocks(pTsdb, numKeys, (const char *const *)keys_list, keys_list_sizes,
60,403!
2753
                                              &values_list, &values_list_sizes),
2754
                  NULL, _exit);
2755

2756
  // rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
2757
  for (int i = 0; i < numKeys; ++i) {
629,483✔
2758
    SLastCol *pLastCol = NULL;
569,082✔
2759
    if (values_list[i] != NULL) {
569,082!
2760
      code = tsdbCacheDeserialize(values_list[i], values_list_sizes[i], &pLastCol);
×
2761
      if (code != TSDB_CODE_SUCCESS) {
×
2762
        tsdbError("vgId:%d, %s deserialize failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
2763
                  tstrerror(code));
2764
        goto _exit;
×
2765
      }
2766
    }
2767
    SIdxKey  *idxKey = taosArrayGet(remainCols, i);
569,082✔
2768
    SLastKey *pLastKey = &idxKey->key;
569,088✔
2769
    if (NULL != pLastCol && (pLastCol->rowKey.ts <= eKey && pLastCol->rowKey.ts >= sKey)) {
569,088!
2770
      SLastCol noCacheCol = {.rowKey.ts = TSKEY_MIN,
×
2771
                             .colVal = COL_VAL_NONE(pLastKey->cid, pTSchema->columns[idxKey->idx].type),
×
2772
                             .dirty = 0,
2773
                             .cacheStatus = TSDB_LAST_CACHE_NO_CACHE};
2774

2775
      if ((code = tsdbCachePutToRocksdb(pTsdb, pLastKey, &noCacheCol)) != TSDB_CODE_SUCCESS) {
×
2776
        taosMemoryFreeClear(pLastCol);
×
2777
        tsdbError("tsdb/cache/del: vgId:%d, put to rocks failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code));
×
2778
        goto _exit;
×
2779
      }
2780
      if ((code = tsdbCachePutToLRU(pTsdb, pLastKey, &noCacheCol, 0)) != TSDB_CODE_SUCCESS) {
×
2781
        taosMemoryFreeClear(pLastCol);
×
2782
        tsdbError("tsdb/cache/del: vgId:%d, put to lru failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code));
×
2783
        goto _exit;
×
2784
      }
2785
    }
2786

2787
    if (pLastCol == NULL) {
569,088✔
2788
      tsdbDebug("tsdb/cache/del: vgId:%d, no cache found for uid:%d ,cid:%" PRId64 ", lflag:%d.", TD_VID(pTsdb->pVnode),
569,087✔
2789
                pLastKey->cid, pLastKey->uid, pLastKey->lflag);
2790
    }
2791

2792
    taosMemoryFreeClear(pLastCol);
569,081!
2793
  }
2794

2795
  rocksMayWrite(pTsdb, false);
60,401✔
2796

2797
_exit:
60,403✔
2798
  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
60,403✔
2799

2800
  for (int i = 0; i < numKeys; ++i) {
629,595✔
2801
    taosMemoryFree(keys_list[i]);
569,189!
2802
  }
2803
  taosMemoryFree(keys_list);
60,406!
2804
  taosMemoryFree(keys_list_sizes);
60,403!
2805
  if (values_list) {
60,404!
2806
 #if USE_ROCKSDB   
2807
    for (int i = 0; i < numKeys; ++i) {
629,583✔
2808
      rocksdb_free(values_list[i]);
569,183✔
2809
    }
2810
#endif
2811
    taosMemoryFree(values_list);
60,400!
2812
  }
2813
  taosMemoryFree(values_list_sizes);
60,404!
2814
  taosArrayDestroy(remainCols);
60,402✔
2815
  taosMemoryFree(pTSchema);
60,404!
2816

2817
  TAOS_RETURN(code);
60,404✔
2818
}
2819

2820
int32_t tsdbOpenCache(STsdb *pTsdb) {
15,652✔
2821
  int32_t code = 0, lino = 0;
15,652✔
2822
  size_t  cfgCapacity = (size_t)pTsdb->pVnode->config.cacheLastSize * 1024 * 1024;
15,652✔
2823

2824
  SLRUCache *pCache = taosLRUCacheInit(cfgCapacity, 0, .5);
15,652✔
2825
  if (pCache == NULL) {
15,685!
2826
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err);
×
2827
  }
2828

2829
  TAOS_CHECK_GOTO(tsdbOpenBCache(pTsdb), &lino, _err);
15,685!
2830

2831
  TAOS_CHECK_GOTO(tsdbOpenPgCache(pTsdb), &lino, _err);
15,684!
2832

2833
  TAOS_CHECK_GOTO(tsdbOpenRocksCache(pTsdb), &lino, _err);
15,685!
2834

2835
  taosLRUCacheSetStrictCapacity(pCache, false);
15,676✔
2836

2837
  (void)taosThreadMutexInit(&pTsdb->lruMutex, NULL);
15,686✔
2838

2839
_err:
15,680✔
2840
  if (code) {
15,680!
2841
    tsdbError("tsdb/cache: vgId:%d, open failed at line %d since %s.", TD_VID(pTsdb->pVnode), lino, tstrerror(code));
×
2842
  }
2843

2844
  pTsdb->lruCache = pCache;
15,680✔
2845

2846
  TAOS_RETURN(code);
15,680✔
2847
}
2848

2849
void tsdbCloseCache(STsdb *pTsdb) {
15,687✔
2850
  SLRUCache *pCache = pTsdb->lruCache;
15,687✔
2851
  if (pCache) {
15,687!
2852
    taosLRUCacheEraseUnrefEntries(pCache);
15,687✔
2853

2854
    taosLRUCacheCleanup(pCache);
15,686✔
2855

2856
    (void)taosThreadMutexDestroy(&pTsdb->lruMutex);
15,687✔
2857
  }
2858

2859
  tsdbCloseBCache(pTsdb);
15,687✔
2860
  tsdbClosePgCache(pTsdb);
15,687✔
2861
  tsdbCloseRocksCache(pTsdb);
15,687✔
2862
}
15,680✔
2863

2864
static void getTableCacheKey(tb_uid_t uid, int cacheType, char *key, int *len) {
×
2865
  if (cacheType == 0) {  // last_row
×
2866
    *(uint64_t *)key = (uint64_t)uid;
×
2867
  } else {  // last
2868
    *(uint64_t *)key = ((uint64_t)uid) | 0x8000000000000000;
×
2869
  }
2870

2871
  *len = sizeof(uint64_t);
×
2872
}
×
2873

2874
static tb_uid_t getTableSuidByUid(tb_uid_t uid, STsdb *pTsdb) {
×
2875
  tb_uid_t suid = 0;
×
2876

2877
  SMetaReader mr = {0};
×
2878
  metaReaderDoInit(&mr, pTsdb->pVnode->pMeta, META_READER_LOCK);
×
2879
  if (metaReaderGetTableEntryByUidCache(&mr, uid) < 0) {
×
2880
    metaReaderClear(&mr);  // table not esist
×
2881
    return 0;
×
2882
  }
2883

2884
  if (mr.me.type == TSDB_CHILD_TABLE) {
×
2885
    suid = mr.me.ctbEntry.suid;
×
2886
  } else if (mr.me.type == TSDB_NORMAL_TABLE) {
×
2887
    suid = 0;
×
2888
  } else {
2889
    suid = 0;
×
2890
  }
2891

2892
  metaReaderClear(&mr);
×
2893

2894
  return suid;
×
2895
}
2896

2897
static int32_t getTableDelDataFromDelIdx(SDelFReader *pDelReader, SDelIdx *pDelIdx, SArray *aDelData) {
×
2898
  int32_t code = 0;
×
2899

2900
  if (pDelIdx) {
×
2901
    code = tsdbReadDelDatav1(pDelReader, pDelIdx, aDelData, INT64_MAX);
×
2902
  }
2903

2904
  TAOS_RETURN(code);
×
2905
}
2906

2907
static int32_t getTableDelDataFromTbData(STbData *pTbData, SArray *aDelData) {
×
2908
  int32_t   code = 0;
×
2909
  SDelData *pDelData = pTbData ? pTbData->pHead : NULL;
×
2910

2911
  for (; pDelData; pDelData = pDelData->pNext) {
×
2912
    if (!taosArrayPush(aDelData, pDelData)) {
×
2913
      TAOS_RETURN(terrno);
×
2914
    }
2915
  }
2916

2917
  TAOS_RETURN(code);
×
2918
}
2919

2920
static uint64_t *getUidList(SCacheRowsReader *pReader) {
10,283✔
2921
  if (!pReader->uidList) {
10,283✔
2922
    int32_t numOfTables = pReader->numOfTables;
453✔
2923

2924
    pReader->uidList = taosMemoryMalloc(numOfTables * sizeof(uint64_t));
453!
2925
    if (!pReader->uidList) {
453!
2926
      return NULL;
×
2927
    }
2928

2929
    for (int32_t i = 0; i < numOfTables; ++i) {
1,340✔
2930
      uint64_t uid = pReader->pTableList[i].uid;
887✔
2931
      pReader->uidList[i] = uid;
887✔
2932
    }
2933

2934
    taosSort(pReader->uidList, numOfTables, sizeof(uint64_t), uidComparFunc);
453✔
2935
  }
2936

2937
  return pReader->uidList;
10,284✔
2938
}
2939

2940
static int32_t loadTombFromBlk(const TTombBlkArray *pTombBlkArray, SCacheRowsReader *pReader, void *pFileReader,
10,284✔
2941
                               bool isFile) {
2942
  int32_t   code = 0;
10,284✔
2943
  int32_t   numOfTables = pReader->numOfTables;
10,284✔
2944
  int64_t   suid = pReader->info.suid;
10,284✔
2945
  uint64_t *uidList = getUidList(pReader);
10,284✔
2946

2947
  if (!uidList) {
10,287!
2948
    TAOS_RETURN(terrno);
×
2949
  }
2950

2951
  for (int i = 0, j = 0; i < pTombBlkArray->size && j < numOfTables; ++i) {
10,309!
2952
    STombBlk *pTombBlk = &pTombBlkArray->data[i];
16✔
2953
    if (pTombBlk->maxTbid.suid < suid || (pTombBlk->maxTbid.suid == suid && pTombBlk->maxTbid.uid < uidList[0])) {
16!
2954
      continue;
×
2955
    }
2956

2957
    if (pTombBlk->minTbid.suid > suid ||
16!
2958
        (pTombBlk->minTbid.suid == suid && pTombBlk->minTbid.uid > uidList[numOfTables - 1])) {
16!
2959
      break;
2960
    }
2961

2962
    STombBlock block = {0};
16✔
2963
    code = isFile ? tsdbDataFileReadTombBlock(pFileReader, &pTombBlkArray->data[i], &block)
16✔
2964
                  : tsdbSttFileReadTombBlock(pFileReader, &pTombBlkArray->data[i], &block);
16!
2965
    if (code != TSDB_CODE_SUCCESS) {
16!
2966
      TAOS_RETURN(code);
×
2967
    }
2968

2969
    uint64_t        uid = uidList[j];
16✔
2970
    STableLoadInfo *pInfo = getTableLoadInfo(pReader, uid);
16✔
2971
    if (!pInfo) {
16!
2972
      tTombBlockDestroy(&block);
×
2973
      TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
×
2974
    }
2975

2976
    if (pInfo->pTombData == NULL) {
16✔
2977
      pInfo->pTombData = taosArrayInit(4, sizeof(SDelData));
2✔
2978
    }
2979

2980
    STombRecord record = {0};
16✔
2981
    bool        finished = false;
16✔
2982
    for (int32_t k = 0; k < TOMB_BLOCK_SIZE(&block); ++k) {
32✔
2983
      code = tTombBlockGet(&block, k, &record);
16✔
2984
      if (code != TSDB_CODE_SUCCESS) {
16!
2985
        finished = true;
×
2986
        break;
×
2987
      }
2988

2989
      if (record.suid < suid) {
16!
2990
        continue;
×
2991
      }
2992
      if (record.suid > suid) {
16!
2993
        finished = true;
×
2994
        break;
×
2995
      }
2996

2997
      bool newTable = false;
16✔
2998
      if (uid < record.uid) {
16!
2999
        while (j < numOfTables && uidList[j] < record.uid) {
96!
3000
          ++j;
80✔
3001
          newTable = true;
80✔
3002
        }
3003

3004
        if (j >= numOfTables) {
16!
3005
          finished = true;
×
3006
          break;
×
3007
        }
3008

3009
        uid = uidList[j];
16✔
3010
      }
3011

3012
      if (record.uid < uid) {
16!
3013
        continue;
×
3014
      }
3015

3016
      if (newTable) {
16!
3017
        pInfo = getTableLoadInfo(pReader, uid);
16✔
3018
        if (!pInfo) {
16!
3019
          code = TSDB_CODE_OUT_OF_MEMORY;
×
3020
          finished = true;
×
3021
          break;
×
3022
        }
3023
        if (pInfo->pTombData == NULL) {
16✔
3024
          pInfo->pTombData = taosArrayInit(4, sizeof(SDelData));
2✔
3025
          if (!pInfo->pTombData) {
2!
3026
            code = terrno;
×
3027
            finished = true;
×
3028
            break;
×
3029
          }
3030
        }
3031
      }
3032

3033
      if (record.version <= pReader->info.verRange.maxVer) {
16!
3034
        /*tsdbError("tomb xx load/cache: vgId:%d fid:%d record %" PRId64 "~%" PRId64 "~%" PRId64 " tomb records",
3035
          TD_VID(pReader->pTsdb->pVnode), pReader->pCurFileSet->fid, record.skey, record.ekey, uid);*/
3036

3037
        SDelData delData = {.version = record.version, .sKey = record.skey, .eKey = record.ekey};
16✔
3038
        if (!taosArrayPush(pInfo->pTombData, &delData)) {
32!
3039
          TAOS_RETURN(terrno);
×
3040
        }
3041
      }
3042
    }
3043

3044
    tTombBlockDestroy(&block);
16✔
3045

3046
    if (finished) {
16!
3047
      TAOS_RETURN(code);
×
3048
    }
3049
  }
3050

3051
  TAOS_RETURN(TSDB_CODE_SUCCESS);
10,293✔
3052
}
3053

3054
static int32_t loadDataTomb(SCacheRowsReader *pReader, SDataFileReader *pFileReader) {
16✔
3055
  const TTombBlkArray *pBlkArray = NULL;
16✔
3056

3057
  TAOS_CHECK_RETURN(tsdbDataFileReadTombBlk(pFileReader, &pBlkArray));
16!
3058

3059
  TAOS_RETURN(loadTombFromBlk(pBlkArray, pReader, pFileReader, true));
16✔
3060
}
3061

3062
static int32_t loadSttTomb(STsdbReader *pTsdbReader, SSttFileReader *pSttFileReader, SSttBlockLoadInfo *pLoadInfo) {
10,273✔
3063
  SCacheRowsReader    *pReader = (SCacheRowsReader *)pTsdbReader;
10,273✔
3064
  const TTombBlkArray *pBlkArray = NULL;
10,273✔
3065

3066
  TAOS_CHECK_RETURN(tsdbSttFileReadTombBlk(pSttFileReader, &pBlkArray));
10,273!
3067

3068
  TAOS_RETURN(loadTombFromBlk(pBlkArray, pReader, pSttFileReader, false));
10,279✔
3069
}
3070

3071
typedef struct {
3072
  SMergeTree  mergeTree;
3073
  SMergeTree *pMergeTree;
3074
} SFSLastIter;
3075

3076
static int32_t lastIterOpen(SFSLastIter *iter, STFileSet *pFileSet, STsdb *pTsdb, STSchema *pTSchema, tb_uid_t suid,
10,279✔
3077
                            tb_uid_t uid, SCacheRowsReader *pr, int64_t lastTs, int16_t *aCols, int nCols) {
3078
  int32_t code = 0;
10,279✔
3079
  destroySttBlockReader(pr->pLDataIterArray, NULL);
10,279✔
3080
  pr->pLDataIterArray = taosArrayInit(4, POINTER_BYTES);
10,347✔
3081
  if (pr->pLDataIterArray == NULL) return terrno;
10,346!
3082

3083
  SMergeTreeConf conf = {
10,346✔
3084
      .uid = uid,
3085
      .suid = suid,
3086
      .pTsdb = pTsdb,
3087
      .timewindow = (STimeWindow){.skey = lastTs, .ekey = TSKEY_MAX},
3088
      .verRange = (SVersionRange){.minVer = 0, .maxVer = INT64_MAX},
3089
      .strictTimeRange = false,
3090
      .pSchema = pTSchema,
3091
      .pCurrentFileset = pFileSet,
3092
      .backward = 1,
3093
      .pSttFileBlockIterArray = pr->pLDataIterArray,
10,346✔
3094
      .pCols = aCols,
3095
      .numOfCols = nCols,
3096
      .loadTombFn = loadSttTomb,
3097
      .pReader = pr,
3098
      .idstr = pr->idstr,
10,346✔
3099
      .pCurRowKey = &pr->rowKey,
10,346✔
3100
  };
3101

3102
  TAOS_CHECK_RETURN(tMergeTreeOpen2(&iter->mergeTree, &conf, NULL));
10,346!
3103

3104
  iter->pMergeTree = &iter->mergeTree;
10,262✔
3105

3106
  TAOS_RETURN(code);
10,262✔
3107
}
3108

3109
static int32_t lastIterClose(SFSLastIter **iter) {
2✔
3110
  int32_t code = 0;
2✔
3111

3112
  if ((*iter)->pMergeTree) {
2!
3113
    tMergeTreeClose((*iter)->pMergeTree);
2✔
3114
    (*iter)->pMergeTree = NULL;
2✔
3115
  }
3116

3117
  *iter = NULL;
2✔
3118

3119
  TAOS_RETURN(code);
2✔
3120
}
3121

3122
static int32_t lastIterNext(SFSLastIter *iter, TSDBROW **ppRow) {
10,260✔
3123
  bool hasVal = false;
10,260✔
3124
  *ppRow = NULL;
10,260✔
3125

3126
  int32_t code = tMergeTreeNext(iter->pMergeTree, &hasVal);
10,260✔
3127
  if (code != 0) {
10,255!
3128
    return code;
×
3129
  }
3130

3131
  if (!hasVal) {
10,255✔
3132
    *ppRow = NULL;
9,415✔
3133
    TAOS_RETURN(code);
9,415✔
3134
  }
3135

3136
  *ppRow = tMergeTreeGetRow(iter->pMergeTree);
840✔
3137
  TAOS_RETURN(code);
840✔
3138
}
3139

3140
typedef enum SFSNEXTROWSTATES {
3141
  SFSNEXTROW_FS,
3142
  SFSNEXTROW_FILESET,
3143
  SFSNEXTROW_INDEXLIST,
3144
  SFSNEXTROW_BRINBLOCK,
3145
  SFSNEXTROW_BRINRECORD,
3146
  SFSNEXTROW_BLOCKDATA,
3147
  SFSNEXTROW_BLOCKROW,
3148
  SFSNEXTROW_NEXTSTTROW
3149
} SFSNEXTROWSTATES;
3150

3151
struct CacheNextRowIter;
3152

3153
typedef struct SFSNextRowIter {
3154
  SFSNEXTROWSTATES         state;         // [input]
3155
  SBlockIdx               *pBlockIdxExp;  // [input]
3156
  STSchema                *pTSchema;      // [input]
3157
  tb_uid_t                 suid;
3158
  tb_uid_t                 uid;
3159
  int32_t                  iFileSet;
3160
  STFileSet               *pFileSet;
3161
  TFileSetArray           *aDFileSet;
3162
  SArray                  *pIndexList;
3163
  int32_t                  iBrinIndex;
3164
  SBrinBlock               brinBlock;
3165
  SBrinBlock              *pBrinBlock;
3166
  int32_t                  iBrinRecord;
3167
  SBrinRecord              brinRecord;
3168
  SBlockData               blockData;
3169
  SBlockData              *pBlockData;
3170
  int32_t                  nRow;
3171
  int32_t                  iRow;
3172
  TSDBROW                  row;
3173
  int64_t                  lastTs;
3174
  SFSLastIter              lastIter;
3175
  SFSLastIter             *pLastIter;
3176
  int8_t                   lastEmpty;
3177
  TSDBROW                 *pLastRow;
3178
  SRow                    *pTSRow;
3179
  SRowMerger               rowMerger;
3180
  SCacheRowsReader        *pr;
3181
  struct CacheNextRowIter *pRowIter;
3182
} SFSNextRowIter;
3183

3184
static void clearLastFileSet(SFSNextRowIter *state);
3185

3186
static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast, int16_t *aCols,
1,059✔
3187
                                int nCols) {
3188
  int32_t         code = 0, lino = 0;
1,059✔
3189
  SFSNextRowIter *state = (SFSNextRowIter *)iter;
1,059✔
3190
  STsdb          *pTsdb = state->pr->pTsdb;
1,059✔
3191

3192
  if (SFSNEXTROW_FS == state->state) {
1,059✔
3193
    state->iFileSet = TARRAY2_SIZE(state->aDFileSet);
1,053✔
3194

3195
    state->state = SFSNEXTROW_FILESET;
1,053✔
3196
  }
3197

3198
  if (SFSNEXTROW_FILESET == state->state) {
1,059✔
3199
  _next_fileset:
1,054✔
3200
    clearLastFileSet(state);
10,451✔
3201

3202
    if (--state->iFileSet < 0) {
10,445✔
3203
      *ppRow = NULL;
167✔
3204

3205
      TAOS_RETURN(code);
167✔
3206
    } else {
3207
      state->pFileSet = TARRAY2_GET(state->aDFileSet, state->iFileSet);
10,278✔
3208
    }
3209

3210
    STFileObj **pFileObj = state->pFileSet->farr;
10,278✔
3211
    if (pFileObj[0] != NULL || pFileObj[3] != NULL) {
10,278!
3212
      if (state->pFileSet != state->pr->pCurFileSet) {
16!
3213
        SDataFileReaderConfig conf = {.tsdb = pTsdb, .szPage = pTsdb->pVnode->config.tsdbPageSize};
16✔
3214
        const char           *filesName[4] = {0};
16✔
3215
        if (pFileObj[0] != NULL) {
16!
3216
          conf.files[0].file = *pFileObj[0]->f;
16✔
3217
          conf.files[0].exist = true;
16✔
3218
          filesName[0] = pFileObj[0]->fname;
16✔
3219

3220
          conf.files[1].file = *pFileObj[1]->f;
16✔
3221
          conf.files[1].exist = true;
16✔
3222
          filesName[1] = pFileObj[1]->fname;
16✔
3223

3224
          conf.files[2].file = *pFileObj[2]->f;
16✔
3225
          conf.files[2].exist = true;
16✔
3226
          filesName[2] = pFileObj[2]->fname;
16✔
3227
        }
3228

3229
        if (pFileObj[3] != NULL) {
16!
3230
          conf.files[3].exist = true;
16✔
3231
          conf.files[3].file = *pFileObj[3]->f;
16✔
3232
          filesName[3] = pFileObj[3]->fname;
16✔
3233
        }
3234

3235
        TAOS_CHECK_GOTO(tsdbDataFileReaderOpen(filesName, &conf, &state->pr->pFileReader), &lino, _err);
16!
3236

3237
        state->pr->pCurFileSet = state->pFileSet;
16✔
3238

3239
        code = loadDataTomb(state->pr, state->pr->pFileReader);
16✔
3240
        if (code != TSDB_CODE_SUCCESS) {
16!
3241
          tsdbError("vgId:%d, %s load tomb failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
3242
                    tstrerror(code));
3243
          TAOS_CHECK_GOTO(code, &lino, _err);
×
3244
        }
3245

3246
        TAOS_CHECK_GOTO(tsdbDataFileReadBrinBlk(state->pr->pFileReader, &state->pr->pBlkArray), &lino, _err);
16!
3247
      }
3248

3249
      if (!state->pIndexList) {
16!
3250
        state->pIndexList = taosArrayInit(1, sizeof(SBrinBlk));
16✔
3251
        if (!state->pIndexList) {
16!
3252
          TAOS_CHECK_GOTO(terrno, &lino, _err);
×
3253
        }
3254
      } else {
3255
        taosArrayClear(state->pIndexList);
×
3256
      }
3257

3258
      const TBrinBlkArray *pBlkArray = state->pr->pBlkArray;
16✔
3259

3260
      for (int i = TARRAY2_SIZE(pBlkArray) - 1; i >= 0; --i) {
32✔
3261
        SBrinBlk *pBrinBlk = &pBlkArray->data[i];
16✔
3262
        if (state->suid >= pBrinBlk->minTbid.suid && state->suid <= pBrinBlk->maxTbid.suid) {
16!
3263
          if (state->uid >= pBrinBlk->minTbid.uid && state->uid <= pBrinBlk->maxTbid.uid) {
16✔
3264
            if (!taosArrayPush(state->pIndexList, pBrinBlk)) {
4!
3265
              TAOS_CHECK_GOTO(terrno, &lino, _err);
×
3266
            }
3267
          }
3268
        } else if (state->suid > pBrinBlk->maxTbid.suid ||
×
3269
                   (state->suid == pBrinBlk->maxTbid.suid && state->uid > pBrinBlk->maxTbid.uid)) {
×
3270
          break;
3271
        }
3272
      }
3273

3274
      int indexSize = TARRAY_SIZE(state->pIndexList);
16✔
3275
      if (indexSize <= 0) {
16✔
3276
        goto _check_stt_data;
14✔
3277
      }
3278

3279
      state->state = SFSNEXTROW_INDEXLIST;
2✔
3280
      state->iBrinIndex = 1;
2✔
3281
    }
3282

3283
  _check_stt_data:
10,262✔
3284
    if (state->pFileSet != state->pr->pCurFileSet) {
10,278✔
3285
      state->pr->pCurFileSet = state->pFileSet;
10,186✔
3286
    }
3287

3288
    TAOS_CHECK_GOTO(lastIterOpen(&state->lastIter, state->pFileSet, pTsdb, state->pTSchema, state->suid, state->uid,
10,278!
3289
                                 state->pr, state->lastTs, aCols, nCols),
3290
                    &lino, _err);
3291

3292
    TAOS_CHECK_GOTO(lastIterNext(&state->lastIter, &state->pLastRow), &lino, _err);
10,255!
3293

3294
    if (!state->pLastRow) {
10,254✔
3295
      state->lastEmpty = 1;
9,406✔
3296

3297
      if (SFSNEXTROW_INDEXLIST != state->state) {
9,406✔
3298
        clearLastFileSet(state);
9,404✔
3299
        goto _next_fileset;
9,396✔
3300
      }
3301
    } else {
3302
      state->lastEmpty = 0;
848✔
3303

3304
      if (SFSNEXTROW_INDEXLIST != state->state) {
848!
3305
        state->state = SFSNEXTROW_NEXTSTTROW;
849✔
3306

3307
        *ppRow = state->pLastRow;
849✔
3308
        state->pLastRow = NULL;
849✔
3309

3310
        TAOS_RETURN(code);
849✔
3311
      }
3312
    }
3313

3314
    state->pLastIter = &state->lastIter;
1✔
3315
  }
3316

3317
  if (SFSNEXTROW_NEXTSTTROW == state->state) {
6!
3318
    TAOS_CHECK_GOTO(lastIterNext(&state->lastIter, &state->pLastRow), &lino, _err);
6!
3319

3320
    if (!state->pLastRow) {
6✔
3321
      if (state->pLastIter) {
1!
3322
        code = lastIterClose(&state->pLastIter);
×
3323
        if (code != TSDB_CODE_SUCCESS) {
×
3324
          tsdbError("vgId:%d, %s close last iter failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
3325
                    tstrerror(code));
3326
          TAOS_RETURN(code);
×
3327
        }
3328
      }
3329

3330
      clearLastFileSet(state);
1✔
3331
      state->state = SFSNEXTROW_FILESET;
1✔
3332
      goto _next_fileset;
1✔
3333
    } else {
3334
      *ppRow = state->pLastRow;
5✔
3335
      state->pLastRow = NULL;
5✔
3336

3337
      TAOS_RETURN(code);
5✔
3338
    }
3339
  }
3340

UNCOV
3341
  if (SFSNEXTROW_INDEXLIST == state->state) {
×
3342
    SBrinBlk *pBrinBlk = NULL;
2✔
3343
  _next_brinindex:
2✔
3344
    if (--state->iBrinIndex < 0) {
2!
3345
      if (state->pLastRow) {
×
3346
        state->state = SFSNEXTROW_NEXTSTTROW;
×
3347
        *ppRow = state->pLastRow;
×
3348
        state->pLastRow = NULL;
×
3349
        return code;
×
3350
      }
3351

3352
      clearLastFileSet(state);
×
3353
      goto _next_fileset;
×
3354
    } else {
3355
      pBrinBlk = taosArrayGet(state->pIndexList, state->iBrinIndex);
2✔
3356
    }
3357

3358
    if (!state->pBrinBlock) {
2!
3359
      state->pBrinBlock = &state->brinBlock;
2✔
3360
    } else {
3361
      tBrinBlockClear(&state->brinBlock);
×
3362
    }
3363

3364
    TAOS_CHECK_GOTO(tsdbDataFileReadBrinBlock(state->pr->pFileReader, pBrinBlk, &state->brinBlock), &lino, _err);
2!
3365

3366
    state->iBrinRecord = state->brinBlock.numOfRecords - 1;
2✔
3367
    state->state = SFSNEXTROW_BRINBLOCK;
2✔
3368
  }
3369

UNCOV
3370
  if (SFSNEXTROW_BRINBLOCK == state->state) {
×
3371
  _next_brinrecord:
2✔
3372
    if (state->iBrinRecord < 0) {  // empty brin block, goto _next_brinindex
2!
3373
      tBrinBlockClear(&state->brinBlock);
×
3374
      goto _next_brinindex;
×
3375
    }
3376

3377
    TAOS_CHECK_GOTO(tBrinBlockGet(&state->brinBlock, state->iBrinRecord, &state->brinRecord), &lino, _err);
2!
3378

3379
    SBrinRecord *pRecord = &state->brinRecord;
2✔
3380
    if (pRecord->uid != state->uid) {
2!
3381
      // TODO: goto next brin block early
3382
      --state->iBrinRecord;
×
3383
      goto _next_brinrecord;
×
3384
    }
3385

3386
    state->state = SFSNEXTROW_BRINRECORD;
2✔
3387
  }
3388

UNCOV
3389
  if (SFSNEXTROW_BRINRECORD == state->state) {
×
3390
    SBrinRecord *pRecord = &state->brinRecord;
2✔
3391

3392
    if (!state->pBlockData) {
2!
3393
      state->pBlockData = &state->blockData;
2✔
3394

3395
      TAOS_CHECK_GOTO(tBlockDataCreate(&state->blockData), &lino, _err);
2!
3396
    } else {
3397
      tBlockDataReset(state->pBlockData);
×
3398
    }
3399

3400
    if (aCols[0] == PRIMARYKEY_TIMESTAMP_COL_ID) {
2!
3401
      --nCols;
2✔
3402
      ++aCols;
2✔
3403
    }
3404

3405
    TAOS_CHECK_GOTO(tsdbDataFileReadBlockDataByColumn(state->pr->pFileReader, pRecord, state->pBlockData,
2!
3406
                                                      state->pTSchema, aCols, nCols),
3407
                    &lino, _err);
3408

3409
    state->nRow = state->blockData.nRow;
2✔
3410
    state->iRow = state->nRow - 1;
2✔
3411

3412
    state->state = SFSNEXTROW_BLOCKROW;
2✔
3413
  }
3414

UNCOV
3415
  if (SFSNEXTROW_BLOCKROW == state->state) {
×
3416
    if (state->iRow < 0) {
2!
3417
      --state->iBrinRecord;
×
3418
      goto _next_brinrecord;
×
3419
    }
3420

3421
    state->row = tsdbRowFromBlockData(state->pBlockData, state->iRow);
2✔
3422
    if (!state->pLastIter) {
2!
3423
      *ppRow = &state->row;
×
3424
      --state->iRow;
×
3425
      return code;
2✔
3426
    }
3427

3428
    if (!state->pLastRow) {
2!
3429
      // get next row from fslast and process with fs row, --state->Row if select fs row
3430
      TAOS_CHECK_GOTO(lastIterNext(&state->lastIter, &state->pLastRow), &lino, _err);
2!
3431
    }
3432

3433
    if (!state->pLastRow) {
2!
3434
      if (state->pLastIter) {
2!
3435
        code = lastIterClose(&state->pLastIter);
2✔
3436
        if (code != TSDB_CODE_SUCCESS) {
2!
3437
          tsdbError("vgId:%d, %s close last iter failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
3438
                    tstrerror(code));
3439
          TAOS_RETURN(code);
×
3440
        }
3441
      }
3442

3443
      *ppRow = &state->row;
2✔
3444
      --state->iRow;
2✔
3445
      return code;
2✔
3446
    }
3447

3448
    // process state->pLastRow & state->row
3449
    TSKEY rowTs = TSDBROW_TS(&state->row);
×
3450
    TSKEY lastRowTs = TSDBROW_TS(state->pLastRow);
×
3451
    if (lastRowTs > rowTs) {
×
3452
      *ppRow = state->pLastRow;
×
3453
      state->pLastRow = NULL;
×
3454

3455
      TAOS_RETURN(code);
×
3456
    } else if (lastRowTs < rowTs) {
×
3457
      *ppRow = &state->row;
×
3458
      --state->iRow;
×
3459

3460
      TAOS_RETURN(code);
×
3461
    } else {
3462
      // TODO: merge rows and *ppRow = mergedRow
3463
      SRowMerger *pMerger = &state->rowMerger;
×
3464
      code = tsdbRowMergerInit(pMerger, state->pTSchema);
×
3465
      if (code != TSDB_CODE_SUCCESS) {
×
3466
        tsdbError("vgId:%d, %s init row merger failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
3467
                  tstrerror(code));
3468
        TAOS_RETURN(code);
×
3469
      }
3470

3471
      TAOS_CHECK_GOTO(tsdbRowMergerAdd(pMerger, &state->row, state->pTSchema), &lino, _err);
×
3472
      TAOS_CHECK_GOTO(tsdbRowMergerAdd(pMerger, state->pLastRow, state->pTSchema), &lino, _err);
×
3473

3474
      if (state->pTSRow) {
×
3475
        taosMemoryFree(state->pTSRow);
×
3476
        state->pTSRow = NULL;
×
3477
      }
3478

3479
      TAOS_CHECK_GOTO(tsdbRowMergerGetRow(pMerger, &state->pTSRow), &lino, _err);
×
3480

3481
      state->row = tsdbRowFromTSRow(TSDBROW_VERSION(&state->row), state->pTSRow);
×
3482
      *ppRow = &state->row;
×
3483
      --state->iRow;
×
3484

3485
      tsdbRowMergerClear(pMerger);
×
3486

3487
      TAOS_RETURN(code);
×
3488
    }
3489
  }
3490

3491
_err:
×
3492
  clearLastFileSet(state);
×
3493

3494
  *ppRow = NULL;
×
3495

3496
  if (code) {
×
3497
    tsdbError("tsdb/cache: vgId:%d, %s failed at line %d since %s.", TD_VID(pTsdb->pVnode), __func__, lino,
×
3498
              tstrerror(code));
3499
  }
3500

3501
  TAOS_RETURN(code);
×
3502
}
3503

3504
typedef struct CacheNextRowIter {
3505
  SArray           *pMemDelData;
3506
  SArray           *pSkyline;
3507
  int64_t           iSkyline;
3508
  SBlockIdx         idx;
3509
  SMemNextRowIter   memState;
3510
  SMemNextRowIter   imemState;
3511
  SFSNextRowIter    fsState;
3512
  TSDBROW           memRow, imemRow, fsLastRow, fsRow;
3513
  TsdbNextRowState  input[3];
3514
  SCacheRowsReader *pr;
3515
  STsdb            *pTsdb;
3516
} CacheNextRowIter;
3517

3518
int32_t clearNextRowFromFS(void *iter) {
1,048✔
3519
  int32_t code = 0;
1,048✔
3520

3521
  SFSNextRowIter *state = (SFSNextRowIter *)iter;
1,048✔
3522
  if (!state) {
1,048!
3523
    TAOS_RETURN(code);
×
3524
  }
3525

3526
  if (state->pLastIter) {
1,048!
3527
    code = lastIterClose(&state->pLastIter);
×
3528
    if (code != TSDB_CODE_SUCCESS) {
×
3529
      tsdbError("%s close last iter failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
3530
      TAOS_RETURN(code);
×
3531
    }
3532
  }
3533

3534
  if (state->pBlockData) {
1,048✔
3535
    tBlockDataDestroy(state->pBlockData);
2✔
3536
    state->pBlockData = NULL;
2✔
3537
  }
3538

3539
  if (state->pBrinBlock) {
1,048✔
3540
    tBrinBlockDestroy(state->pBrinBlock);
2✔
3541
    state->pBrinBlock = NULL;
2✔
3542
  }
3543

3544
  if (state->pIndexList) {
1,048✔
3545
    taosArrayDestroy(state->pIndexList);
16✔
3546
    state->pIndexList = NULL;
16✔
3547
  }
3548

3549
  if (state->pTSRow) {
1,048!
3550
    taosMemoryFree(state->pTSRow);
×
3551
    state->pTSRow = NULL;
×
3552
  }
3553

3554
  if (state->pRowIter->pSkyline) {
1,048✔
3555
    taosArrayDestroy(state->pRowIter->pSkyline);
995✔
3556
    state->pRowIter->pSkyline = NULL;
997✔
3557
  }
3558

3559
  TAOS_RETURN(code);
1,050✔
3560
}
3561

3562
static void clearLastFileSet(SFSNextRowIter *state) {
19,822✔
3563
  if (state->pLastIter) {
19,822!
3564
    int code = lastIterClose(&state->pLastIter);
×
3565
    if (code != TSDB_CODE_SUCCESS) {
×
3566
      tsdbError("%s close last iter failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
3567
      return;
×
3568
    }
3569
  }
3570

3571
  if (state->pBlockData) {
19,822!
3572
    tBlockDataDestroy(state->pBlockData);
×
3573
    state->pBlockData = NULL;
×
3574
  }
3575

3576
  if (state->pr->pFileReader) {
19,822✔
3577
    tsdbDataFileReaderClose(&state->pr->pFileReader);
16✔
3578
    state->pr->pFileReader = NULL;
16✔
3579

3580
    state->pr->pCurFileSet = NULL;
16✔
3581
  }
3582

3583
  if (state->pTSRow) {
19,822!
3584
    taosMemoryFree(state->pTSRow);
×
3585
    state->pTSRow = NULL;
×
3586
  }
3587

3588
  if (state->pRowIter->pSkyline) {
19,822✔
3589
    taosArrayDestroy(state->pRowIter->pSkyline);
1✔
3590
    state->pRowIter->pSkyline = NULL;
1✔
3591

3592
    void   *pe = NULL;
1✔
3593
    int32_t iter = 0;
1✔
3594
    while ((pe = tSimpleHashIterate(state->pr->pTableMap, pe, &iter)) != NULL) {
2✔
3595
      STableLoadInfo *pInfo = *(STableLoadInfo **)pe;
1✔
3596
      taosArrayDestroy(pInfo->pTombData);
1✔
3597
      pInfo->pTombData = NULL;
1✔
3598
    }
3599
  }
3600
}
3601

3602
static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTsdb, STSchema *pTSchema, tb_uid_t suid,
1,053✔
3603
                               SArray *pLDataIterArray, STsdbReadSnap *pReadSnap, int64_t lastTs,
3604
                               SCacheRowsReader *pr) {
3605
  int32_t code = 0, lino = 0;
1,053✔
3606

3607
  STbData *pMem = NULL;
1,053✔
3608
  if (pReadSnap->pMem) {
1,053!
3609
    pMem = tsdbGetTbDataFromMemTable(pReadSnap->pMem, suid, uid);
1,053✔
3610
  }
3611

3612
  STbData *pIMem = NULL;
1,054✔
3613
  if (pReadSnap->pIMem) {
1,054!
3614
    pIMem = tsdbGetTbDataFromMemTable(pReadSnap->pIMem, suid, uid);
×
3615
  }
3616

3617
  pIter->pTsdb = pTsdb;
1,054✔
3618

3619
  pIter->pMemDelData = NULL;
1,054✔
3620

3621
  TAOS_CHECK_GOTO(loadMemTombData(&pIter->pMemDelData, pMem, pIMem, pr->info.verRange.maxVer), &lino, _err);
1,054!
3622

3623
  pIter->idx = (SBlockIdx){.suid = suid, .uid = uid};
1,053✔
3624

3625
  pIter->fsState.pRowIter = pIter;
1,053✔
3626
  pIter->fsState.state = SFSNEXTROW_FS;
1,053✔
3627
  pIter->fsState.aDFileSet = pReadSnap->pfSetArray;
1,053✔
3628
  pIter->fsState.pBlockIdxExp = &pIter->idx;
1,053✔
3629
  pIter->fsState.pTSchema = pTSchema;
1,053✔
3630
  pIter->fsState.suid = suid;
1,053✔
3631
  pIter->fsState.uid = uid;
1,053✔
3632
  pIter->fsState.lastTs = lastTs;
1,053✔
3633
  pIter->fsState.pr = pr;
1,053✔
3634

3635
  pIter->input[0] = (TsdbNextRowState){&pIter->memRow, true, false, false, &pIter->memState, getNextRowFromMem, NULL};
1,053✔
3636
  pIter->input[1] = (TsdbNextRowState){&pIter->imemRow, true, false, false, &pIter->imemState, getNextRowFromMem, NULL};
1,053✔
3637
  pIter->input[2] =
1,053✔
3638
      (TsdbNextRowState){&pIter->fsRow, false, true, false, &pIter->fsState, getNextRowFromFS, clearNextRowFromFS};
1,053✔
3639

3640
  if (pMem) {
1,053✔
3641
    pIter->memState.pMem = pMem;
149✔
3642
    pIter->memState.state = SMEMNEXTROW_ENTER;
149✔
3643
    pIter->memState.lastTs = lastTs;
149✔
3644
    pIter->input[0].stop = false;
149✔
3645
    pIter->input[0].next = true;
149✔
3646
  }
3647

3648
  if (pIMem) {
1,053!
3649
    pIter->imemState.pMem = pIMem;
×
3650
    pIter->imemState.state = SMEMNEXTROW_ENTER;
×
3651
    pIter->imemState.lastTs = lastTs;
×
3652
    pIter->input[1].stop = false;
×
3653
    pIter->input[1].next = true;
×
3654
  }
3655

3656
  pIter->pr = pr;
1,053✔
3657

3658
_err:
1,053✔
3659
  TAOS_RETURN(code);
1,053✔
3660
}
3661

3662
static void nextRowIterClose(CacheNextRowIter *pIter) {
1,048✔
3663
  for (int i = 0; i < 3; ++i) {
4,196✔
3664
    if (pIter->input[i].nextRowClearFn) {
3,146✔
3665
      (void)pIter->input[i].nextRowClearFn(pIter->input[i].iter);
1,050✔
3666
    }
3667
  }
3668

3669
  if (pIter->pSkyline) {
1,050!
3670
    taosArrayDestroy(pIter->pSkyline);
×
3671
  }
3672

3673
  if (pIter->pMemDelData) {
1,050!
3674
    taosArrayDestroy(pIter->pMemDelData);
1,054✔
3675
  }
3676
}
1,048✔
3677

3678
// iterate next row non deleted backward ts, version (from high to low)
3679
static int32_t nextRowIterGet(CacheNextRowIter *pIter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast,
1,071✔
3680
                              int16_t *aCols, int nCols) {
3681
  int32_t code = 0, lino = 0;
1,071✔
3682

3683
  for (;;) {
1✔
3684
    for (int i = 0; i < 3; ++i) {
4,287✔
3685
      if (pIter->input[i].next && !pIter->input[i].stop) {
3,217!
3686
        TAOS_CHECK_GOTO(pIter->input[i].nextRowFn(pIter->input[i].iter, &pIter->input[i].pRow,
1,221!
3687
                                                  &pIter->input[i].ignoreEarlierTs, isLast, aCols, nCols),
3688
                        &lino, _err);
3689

3690
        if (pIter->input[i].pRow == NULL) {
1,219✔
3691
          pIter->input[i].stop = true;
210✔
3692
          pIter->input[i].next = false;
210✔
3693
        }
3694
      }
3695
    }
3696

3697
    if (pIter->input[0].stop && pIter->input[1].stop && pIter->input[2].stop) {
1,070!
3698
      *ppRow = NULL;
64✔
3699
      *pIgnoreEarlierTs =
64✔
3700
          (pIter->input[0].ignoreEarlierTs || pIter->input[1].ignoreEarlierTs || pIter->input[2].ignoreEarlierTs);
64!
3701

3702
      TAOS_RETURN(code);
1,071✔
3703
    }
3704

3705
    // select maxpoint(s) from mem, imem, fs and last
3706
    TSDBROW *max[4] = {0};
1,006✔
3707
    int      iMax[4] = {-1, -1, -1, -1};
1,006✔
3708
    int      nMax = 0;
1,006✔
3709
    SRowKey  maxKey = {.ts = TSKEY_MIN};
1,006✔
3710

3711
    for (int i = 0; i < 3; ++i) {
4,029✔
3712
      if (!pIter->input[i].stop && pIter->input[i].pRow != NULL) {
3,024!
3713
        STsdbRowKey tsdbRowKey = {0};
1,012✔
3714
        tsdbRowGetKey(pIter->input[i].pRow, &tsdbRowKey);
1,012✔
3715

3716
        // merging & deduplicating on client side
3717
        int c = tRowKeyCompare(&maxKey, &tsdbRowKey.key);
1,012✔
3718
        if (c <= 0) {
1,011✔
3719
          if (c < 0) {
1,008!
3720
            nMax = 0;
1,008✔
3721
            maxKey = tsdbRowKey.key;
1,008✔
3722
          }
3723

3724
          iMax[nMax] = i;
1,008✔
3725
          max[nMax++] = pIter->input[i].pRow;
1,008✔
3726
        }
3727
        pIter->input[i].next = false;
1,011✔
3728
      }
3729
    }
3730

3731
    // delete detection
3732
    TSDBROW *merge[4] = {0};
1,005✔
3733
    int      iMerge[4] = {-1, -1, -1, -1};
1,005✔
3734
    int      nMerge = 0;
1,005✔
3735
    for (int i = 0; i < nMax; ++i) {
2,013✔
3736
      TSDBKEY maxKey1 = TSDBROW_KEY(max[i]);
1,008✔
3737

3738
      if (!pIter->pSkyline) {
1,008✔
3739
        pIter->pSkyline = taosArrayInit(32, sizeof(TSDBKEY));
998✔
3740
        TSDB_CHECK_NULL(pIter->pSkyline, code, lino, _err, terrno);
998!
3741

3742
        uint64_t        uid = pIter->idx.uid;
998✔
3743
        STableLoadInfo *pInfo = getTableLoadInfo(pIter->pr, uid);
998✔
3744
        TSDB_CHECK_NULL(pInfo, code, lino, _err, TSDB_CODE_OUT_OF_MEMORY);
996!
3745

3746
        if (pInfo->pTombData == NULL) {
996✔
3747
          pInfo->pTombData = taosArrayInit(4, sizeof(SDelData));
993✔
3748
          TSDB_CHECK_NULL(pInfo->pTombData, code, lino, _err, terrno);
994!
3749
        }
3750

3751
        if (!taosArrayAddAll(pInfo->pTombData, pIter->pMemDelData)) {
997!
3752
          TAOS_CHECK_GOTO(terrno, &lino, _err);
×
3753
        }
3754

3755
        size_t delSize = TARRAY_SIZE(pInfo->pTombData);
998✔
3756
        if (delSize > 0) {
998✔
3757
          code = tsdbBuildDeleteSkyline(pInfo->pTombData, 0, (int32_t)(delSize - 1), pIter->pSkyline);
4✔
3758
          TAOS_CHECK_GOTO(code, &lino, _err);
4!
3759
        }
3760
        pIter->iSkyline = taosArrayGetSize(pIter->pSkyline) - 1;
998✔
3761
      }
3762

3763
      bool deleted = tsdbKeyDeleted(&maxKey1, pIter->pSkyline, &pIter->iSkyline);
1,006✔
3764
      if (!deleted) {
1,008✔
3765
        iMerge[nMerge] = iMax[i];
1,007✔
3766
        merge[nMerge++] = max[i];
1,007✔
3767
      }
3768

3769
      pIter->input[iMax[i]].next = deleted;
1,008✔
3770
    }
3771

3772
    if (nMerge > 0) {
1,005✔
3773
      pIter->input[iMerge[0]].next = true;
1,004✔
3774

3775
      *ppRow = merge[0];
1,004✔
3776

3777
      TAOS_RETURN(code);
1,004✔
3778
    }
3779
  }
3780

3781
_err:
×
3782
  if (code) {
×
3783
    tsdbError("tsdb/cache: %s failed at line %d since %s.", __func__, lino, tstrerror(code));
×
3784
  }
3785

3786
  TAOS_RETURN(code);
×
3787
}
3788

3789
static int32_t initLastColArrayPartial(STSchema *pTSchema, SArray **ppColArray, int16_t *slotIds, int nCols) {
1,050✔
3790
  SArray *pColArray = taosArrayInit(nCols, sizeof(SLastCol));
1,050✔
3791
  if (NULL == pColArray) {
1,051!
UNCOV
3792
    TAOS_RETURN(terrno);
×
3793
  }
3794

3795
  for (int32_t i = 0; i < nCols; ++i) {
3,269✔
3796
    int16_t  slotId = slotIds[i];
2,213✔
3797
    SLastCol col = {.rowKey.ts = 0,
2,213✔
3798
                    .colVal = COL_VAL_NULL(pTSchema->columns[slotId].colId, pTSchema->columns[slotId].type)};
2,213✔
3799
    if (!taosArrayPush(pColArray, &col)) {
2,218!
3800
      TAOS_RETURN(terrno);
×
3801
    }
3802
  }
3803
  *ppColArray = pColArray;
1,056✔
3804

3805
  TAOS_RETURN(TSDB_CODE_SUCCESS);
1,056✔
3806
}
3807

3808
static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols,
592✔
3809
                            int nCols, int16_t *slotIds) {
3810
  int32_t   code = 0, lino = 0;
592✔
3811
  STSchema *pTSchema = pr->pSchema;  // metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1, 1);
592✔
3812
  int16_t   nLastCol = nCols;
592✔
3813
  int16_t   noneCol = 0;
592✔
3814
  bool      setNoneCol = false;
592✔
3815
  bool      hasRow = false;
592✔
3816
  bool      ignoreEarlierTs = false;
592✔
3817
  SArray   *pColArray = NULL;
592✔
3818
  SColVal  *pColVal = &(SColVal){0};
592✔
3819

3820
  TAOS_CHECK_RETURN(initLastColArrayPartial(pTSchema, &pColArray, slotIds, nCols));
592!
3821

3822
  SArray *aColArray = taosArrayInit(nCols, sizeof(int16_t));
598✔
3823
  if (NULL == aColArray) {
598!
3824
    taosArrayDestroy(pColArray);
×
3825

3826
    TAOS_RETURN(terrno);
×
3827
  }
3828

3829
  for (int i = 0; i < nCols; ++i) {
1,856✔
3830
    if (!taosArrayPush(aColArray, &aCols[i])) {
2,516!
3831
      taosArrayDestroy(pColArray);
×
3832

3833
      TAOS_RETURN(terrno);
×
3834
    }
3835
  }
3836

3837
  STsdbRowKey lastRowKey = {.key.ts = TSKEY_MAX};
598✔
3838

3839
  // inverse iterator
3840
  CacheNextRowIter iter = {0};
598✔
3841
  code =
3842
      nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->info.suid, pr->pLDataIterArray, pr->pReadSnap, pr->lastTs, pr);
598✔
3843
  TAOS_CHECK_GOTO(code, &lino, _err);
597!
3844

3845
  do {
3846
    TSDBROW *pRow = NULL;
615✔
3847
    code = nextRowIterGet(&iter, &pRow, &ignoreEarlierTs, true, TARRAY_DATA(aColArray), TARRAY_SIZE(aColArray));
615✔
3848

3849
    if (!pRow) {
613✔
3850
      break;
588✔
3851
    }
3852

3853
    hasRow = true;
586✔
3854

3855
    int32_t sversion = TSDBROW_SVERSION(pRow);
586✔
3856
    if (sversion != -1) {
586✔
3857
      TAOS_CHECK_GOTO(updateTSchema(sversion, pr, uid), &lino, _err);
137!
3858

3859
      pTSchema = pr->pCurrSchema;
137✔
3860
    }
3861
    // int16_t nCol = pTSchema->numOfCols;
3862

3863
    STsdbRowKey rowKey = {0};
586✔
3864
    tsdbRowGetKey(pRow, &rowKey);
586✔
3865

3866
    if (lastRowKey.key.ts == TSKEY_MAX) {  // first time
588✔
3867
      lastRowKey = rowKey;
576✔
3868

3869
      for (int16_t iCol = noneCol; iCol < nCols; ++iCol) {
1,781✔
3870
        if (iCol >= nLastCol) {
1,207!
3871
          break;
×
3872
        }
3873
        SLastCol *pCol = taosArrayGet(pColArray, iCol);
1,207✔
3874
        if (slotIds[iCol] > pTSchema->numOfCols - 1) {
1,206!
3875
          if (!setNoneCol) {
×
3876
            noneCol = iCol;
×
3877
            setNoneCol = true;
×
3878
          }
3879
          continue;
615✔
3880
        }
3881
        if (pCol->colVal.cid != pTSchema->columns[slotIds[iCol]].colId) {
1,206✔
3882
          continue;
40✔
3883
        }
3884
        if (slotIds[iCol] == 0) {
1,166✔
3885
          STColumn *pTColumn = &pTSchema->columns[0];
577✔
3886
          SValue    val = {.type = pTColumn->type};
577✔
3887
          VALUE_SET_TRIVIAL_DATUM(&val, rowKey.key.ts);
577✔
3888
          *pColVal = COL_VAL_VALUE(pTColumn->colId, val);
577✔
3889

3890
          SLastCol colTmp = {.rowKey = rowKey.key, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID};
577✔
3891
          TAOS_CHECK_GOTO(tsdbCacheReallocSLastCol(&colTmp, NULL), &lino, _err);
577!
3892

3893
          taosArraySet(pColArray, 0, &colTmp);
575✔
3894
          continue;
575✔
3895
        }
3896
        tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal);
589✔
3897

3898
        *pCol = (SLastCol){.rowKey = rowKey.key, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID};
590✔
3899
        TAOS_CHECK_GOTO(tsdbCacheReallocSLastCol(pCol, NULL), &lino, _err);
590!
3900

3901
        if (!COL_VAL_IS_VALUE(pColVal)) {
591✔
3902
          if (!setNoneCol) {
26✔
3903
            noneCol = iCol;
15✔
3904
            setNoneCol = true;
15✔
3905
          }
3906
        } else {
3907
          int32_t aColIndex = taosArraySearchIdx(aColArray, &pColVal->cid, compareInt16Val, TD_EQ);
565✔
3908
          if (aColIndex >= 0) {
563!
3909
            taosArrayRemove(aColArray, aColIndex);
563✔
3910
          }
3911
        }
3912
      }
3913
      if (!setNoneCol) {
574✔
3914
        // done, goto return pColArray
3915
        break;
561✔
3916
      } else {
3917
        continue;
13✔
3918
      }
3919
    }
3920

3921
    // merge into pColArray
3922
    setNoneCol = false;
12✔
3923
    for (int16_t iCol = noneCol; iCol < nCols; ++iCol) {
37✔
3924
      if (iCol >= nLastCol) {
25!
3925
        break;
×
3926
      }
3927
      // high version's column value
3928
      if (slotIds[iCol] > pTSchema->numOfCols - 1) {
25!
3929
        continue;
×
3930
      }
3931

3932
      SLastCol *lastColVal = (SLastCol *)taosArrayGet(pColArray, iCol);
25✔
3933
      if (lastColVal->colVal.cid != pTSchema->columns[slotIds[iCol]].colId) {
25!
3934
        continue;
×
3935
      }
3936
      SColVal *tColVal = &lastColVal->colVal;
25✔
3937
      if (COL_VAL_IS_VALUE(tColVal)) continue;
25✔
3938

3939
      tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal);
19✔
3940
      if (COL_VAL_IS_VALUE(pColVal)) {
19✔
3941
        SLastCol lastCol = {.rowKey = rowKey.key, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID};
16✔
3942
        TAOS_CHECK_GOTO(tsdbCacheReallocSLastCol(&lastCol, NULL), &lino, _err);
16!
3943

3944
        tsdbCacheFreeSLastColItem(lastColVal);
16✔
3945
        taosArraySet(pColArray, iCol, &lastCol);
16✔
3946
        int32_t aColIndex = taosArraySearchIdx(aColArray, &lastCol.colVal.cid, compareInt16Val, TD_EQ);
16✔
3947
        if (aColIndex >= 0) {
16!
3948
          taosArrayRemove(aColArray, aColIndex);
16✔
3949
        }
3950
      } else if (!COL_VAL_IS_VALUE(pColVal) && !setNoneCol) {
3!
3951
        noneCol = iCol;
3✔
3952
        setNoneCol = true;
3✔
3953
      }
3954
    }
3955
  } while (setNoneCol);
25✔
3956

3957
  if (!hasRow) {
595✔
3958
    if (ignoreEarlierTs) {
20!
3959
      taosArrayDestroy(pColArray);
×
3960
      pColArray = NULL;
×
3961
    } else {
3962
      taosArrayClear(pColArray);
20✔
3963
    }
3964
  }
3965
  *ppLastArray = pColArray;
595✔
3966

3967
  nextRowIterClose(&iter);
595✔
3968
  taosArrayDestroy(aColArray);
597✔
3969

3970
  TAOS_RETURN(code);
598✔
3971

3972
_err:
×
3973
  nextRowIterClose(&iter);
×
3974
  // taosMemoryFreeClear(pTSchema);
3975
  *ppLastArray = NULL;
×
3976
  taosArrayDestroyEx(pColArray, tsdbCacheFreeSLastColItem);
×
3977
  taosArrayDestroy(aColArray);
×
3978

3979
  if (code) {
×
3980
    tsdbError("tsdb/cache: vgId:%d, %s failed at line %d since %s.", TD_VID(pTsdb->pVnode), __func__, lino,
×
3981
              tstrerror(code));
3982
  }
3983

3984
  TAOS_RETURN(code);
×
3985
}
3986

3987
static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols,
453✔
3988
                               int nCols, int16_t *slotIds) {
3989
  int32_t   code = 0, lino = 0;
453✔
3990
  STSchema *pTSchema = pr->pSchema;  // metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1, 1);
453✔
3991
  int16_t   nLastCol = nCols;
453✔
3992
  int16_t   noneCol = 0;
453✔
3993
  bool      setNoneCol = false;
453✔
3994
  bool      hasRow = false;
453✔
3995
  bool      ignoreEarlierTs = false;
453✔
3996
  SArray   *pColArray = NULL;
453✔
3997
  SColVal  *pColVal = &(SColVal){0};
453✔
3998

3999
  TAOS_CHECK_RETURN(initLastColArrayPartial(pTSchema, &pColArray, slotIds, nCols));
453!
4000

4001
  SArray *aColArray = taosArrayInit(nCols, sizeof(int16_t));
456✔
4002
  if (NULL == aColArray) {
456✔
4003
    taosArrayDestroy(pColArray);
1✔
4004

4005
    TAOS_RETURN(terrno);
×
4006
  }
4007

4008
  for (int i = 0; i < nCols; ++i) {
1,414✔
4009
    if (!taosArrayPush(aColArray, &aCols[i])) {
1,918!
4010
      taosArrayDestroy(pColArray);
×
4011

4012
      TAOS_RETURN(terrno);
×
4013
    }
4014
  }
4015

4016
  // inverse iterator
4017
  CacheNextRowIter iter = {0};
455✔
4018
  code =
4019
      nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->info.suid, pr->pLDataIterArray, pr->pReadSnap, pr->lastTs, pr);
455✔
4020
  TAOS_CHECK_GOTO(code, &lino, _err);
456!
4021

4022
  do {
4023
    TSDBROW *pRow = NULL;
456✔
4024
    code = nextRowIterGet(&iter, &pRow, &ignoreEarlierTs, false, TARRAY_DATA(aColArray), TARRAY_SIZE(aColArray));
456✔
4025

4026
    if (!pRow) {
455✔
4027
      break;
36✔
4028
    }
4029

4030
    hasRow = true;
419✔
4031

4032
    int32_t sversion = TSDBROW_SVERSION(pRow);
419✔
4033
    if (sversion != -1) {
419✔
4034
      TAOS_CHECK_GOTO(updateTSchema(sversion, pr, uid), &lino, _err);
18!
4035

4036
      pTSchema = pr->pCurrSchema;
18✔
4037
    }
4038
    // int16_t nCol = pTSchema->numOfCols;
4039

4040
    STsdbRowKey rowKey = {0};
419✔
4041
    tsdbRowGetKey(pRow, &rowKey);
419✔
4042

4043
    for (int16_t iCol = noneCol; iCol < nCols; ++iCol) {
1,303✔
4044
      if (iCol >= nLastCol) {
883!
4045
        break;
×
4046
      }
4047
      SLastCol *pCol = taosArrayGet(pColArray, iCol);
883✔
4048
      if (slotIds[iCol] > pTSchema->numOfCols - 1) {
883!
4049
        continue;
420✔
4050
      }
4051
      if (pCol->colVal.cid != pTSchema->columns[slotIds[iCol]].colId) {
883!
4052
        continue;
×
4053
      }
4054
      if (slotIds[iCol] == 0) {
883✔
4055
        STColumn *pTColumn = &pTSchema->columns[0];
420✔
4056
        SValue    val = {.type = pTColumn->type};
420✔
4057
        VALUE_SET_TRIVIAL_DATUM(&val, rowKey.key.ts);
420✔
4058
        *pColVal = COL_VAL_VALUE(pTColumn->colId, val);
420✔
4059

4060
        SLastCol colTmp = {.rowKey = rowKey.key, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID};
420✔
4061
        TAOS_CHECK_GOTO(tsdbCacheReallocSLastCol(&colTmp, NULL), &lino, _err);
420!
4062

4063
        taosArraySet(pColArray, 0, &colTmp);
420✔
4064
        continue;
420✔
4065
      }
4066
      tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal);
463✔
4067

4068
      *pCol = (SLastCol){.rowKey = rowKey.key, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID};
463✔
4069
      TAOS_CHECK_GOTO(tsdbCacheReallocSLastCol(pCol, NULL), &lino, _err);
463!
4070

4071
      int32_t aColIndex = taosArraySearchIdx(aColArray, &pColVal->cid, compareInt16Val, TD_EQ);
463✔
4072
      if (aColIndex >= 0) {
463!
4073
        taosArrayRemove(aColArray, aColIndex);
463✔
4074
      }
4075
    }
4076

4077
    break;
420✔
4078
  } while (1);
4079

4080
  if (!hasRow) {
456✔
4081
    if (ignoreEarlierTs) {
35!
4082
      taosArrayDestroy(pColArray);
×
4083
      pColArray = NULL;
×
4084
    } else {
4085
      taosArrayClear(pColArray);
35✔
4086
    }
4087
  }
4088
  *ppLastArray = pColArray;
455✔
4089

4090
  nextRowIterClose(&iter);
455✔
4091
  taosArrayDestroy(aColArray);
455✔
4092

4093
  TAOS_RETURN(code);
456✔
4094

4095
_err:
×
4096
  nextRowIterClose(&iter);
×
4097

4098
  *ppLastArray = NULL;
×
4099
  taosArrayDestroyEx(pColArray, tsdbCacheFreeSLastColItem);
×
4100
  taosArrayDestroy(aColArray);
×
4101

4102
  if (code) {
×
4103
    tsdbError("tsdb/cache: vgId:%d, %s failed at line %d since %s.", TD_VID(pTsdb->pVnode), __func__, lino,
×
4104
              tstrerror(code));
4105
  }
4106

4107
  TAOS_RETURN(code);
×
4108
}
4109

4110
void tsdbCacheRelease(SLRUCache *pCache, LRUHandle *h) { tsdbLRUCacheRelease(pCache, h, false); }
×
4111

4112
void tsdbCacheSetCapacity(SVnode *pVnode, size_t capacity) {
374✔
4113
  taosLRUCacheSetCapacity(pVnode->pTsdb->lruCache, capacity);
374✔
4114
}
374✔
4115

4116
#ifdef BUILD_NO_CALL
4117
size_t tsdbCacheGetCapacity(SVnode *pVnode) { return taosLRUCacheGetCapacity(pVnode->pTsdb->lruCache); }
4118
#endif
4119

4120
size_t tsdbCacheGetUsage(SVnode *pVnode) {
732,280✔
4121
  size_t usage = 0;
732,280✔
4122
  if (pVnode->pTsdb != NULL) {
732,280!
4123
    usage = taosLRUCacheGetUsage(pVnode->pTsdb->lruCache);
732,280✔
4124
  }
4125

4126
  return usage;
732,280✔
4127
}
4128

4129
int32_t tsdbCacheGetElems(SVnode *pVnode) {
732,280✔
4130
  int32_t elems = 0;
732,280✔
4131
  if (pVnode->pTsdb != NULL) {
732,280!
4132
    elems = taosLRUCacheGetElems(pVnode->pTsdb->lruCache);
732,280✔
4133
  }
4134

4135
  return elems;
732,280✔
4136
}
4137

4138
#ifdef USE_S3
4139
// block cache
4140
static void getBCacheKey(int32_t fid, int64_t commitID, int64_t blkno, char *key, int *len) {
×
4141
  struct {
4142
    int32_t fid;
4143
    int64_t commitID;
4144
    int64_t blkno;
4145
  } bKey = {0};
×
4146

4147
  bKey.fid = fid;
×
4148
  bKey.commitID = commitID;
×
4149
  bKey.blkno = blkno;
×
4150

4151
  *len = sizeof(bKey);
×
4152
  memcpy(key, &bKey, *len);
×
4153
}
×
4154

4155
static int32_t tsdbCacheLoadBlockS3(STsdbFD *pFD, uint8_t **ppBlock) {
×
4156
  int32_t code = 0;
×
4157

4158
  int64_t block_offset = (pFD->blkno - 1) * tsS3BlockSize * pFD->szPage;
×
4159

4160
  TAOS_CHECK_RETURN(tcsGetObjectBlock(pFD->objName, block_offset, tsS3BlockSize * pFD->szPage, 0, ppBlock));
×
4161

4162
  tsdbTrace("block:%p load from s3", *ppBlock);
×
4163

4164
_exit:
×
4165
  return code;
×
4166
}
4167

4168
static void deleteBCache(const void *key, size_t keyLen, void *value, void *ud) {
×
4169
  (void)ud;
4170
  uint8_t *pBlock = (uint8_t *)value;
×
4171

4172
  taosMemoryFree(pBlock);
×
4173
}
×
4174

4175
int32_t tsdbCacheGetBlockS3(SLRUCache *pCache, STsdbFD *pFD, LRUHandle **handle) {
×
4176
  int32_t code = 0;
×
4177
  char    key[128] = {0};
×
4178
  int     keyLen = 0;
×
4179

4180
  getBCacheKey(pFD->fid, pFD->cid, pFD->blkno, key, &keyLen);
×
4181
  LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen);
×
4182
  if (!h) {
×
4183
    STsdb *pTsdb = pFD->pTsdb;
×
4184
    (void)taosThreadMutexLock(&pTsdb->bMutex);
×
4185

4186
    h = taosLRUCacheLookup(pCache, key, keyLen);
×
4187
    if (!h) {
×
4188
      uint8_t *pBlock = NULL;
×
4189
      code = tsdbCacheLoadBlockS3(pFD, &pBlock);
×
4190
      //  if table's empty or error, return code of -1
4191
      if (code != TSDB_CODE_SUCCESS || pBlock == NULL) {
×
4192
        (void)taosThreadMutexUnlock(&pTsdb->bMutex);
×
4193

4194
        *handle = NULL;
×
4195
        if (code == TSDB_CODE_SUCCESS && !pBlock) {
×
4196
          code = TSDB_CODE_OUT_OF_MEMORY;
×
4197
        }
4198

4199
        TAOS_RETURN(code);
×
4200
      }
4201

4202
      size_t              charge = tsS3BlockSize * pFD->szPage;
×
4203
      _taos_lru_deleter_t deleter = deleteBCache;
×
4204
      LRUStatus           status =
4205
          taosLRUCacheInsert(pCache, key, keyLen, pBlock, charge, deleter, NULL, &h, TAOS_LRU_PRIORITY_LOW, NULL);
×
4206
      if (status != TAOS_LRU_STATUS_OK) {
4207
        // code = -1;
4208
      }
4209
    }
4210

4211
    (void)taosThreadMutexUnlock(&pTsdb->bMutex);
×
4212
  }
4213

4214
  *handle = h;
×
4215

4216
  TAOS_RETURN(code);
×
4217
}
4218

4219
int32_t tsdbCacheGetPageS3(SLRUCache *pCache, STsdbFD *pFD, int64_t pgno, LRUHandle **handle) {
×
4220
  int32_t code = 0;
×
4221
  char    key[128] = {0};
×
4222
  int     keyLen = 0;
×
4223

4224
  getBCacheKey(pFD->fid, pFD->cid, pgno, key, &keyLen);
×
4225
  *handle = taosLRUCacheLookup(pCache, key, keyLen);
×
4226

4227
  return code;
×
4228
}
4229

4230
void tsdbCacheSetPageS3(SLRUCache *pCache, STsdbFD *pFD, int64_t pgno, uint8_t *pPage) {
×
4231
  char       key[128] = {0};
×
4232
  int        keyLen = 0;
×
4233
  LRUHandle *handle = NULL;
×
4234

4235
  getBCacheKey(pFD->fid, pFD->cid, pgno, key, &keyLen);
×
4236
  (void)taosThreadMutexLock(&pFD->pTsdb->pgMutex);
×
4237
  handle = taosLRUCacheLookup(pFD->pTsdb->pgCache, key, keyLen);
×
4238
  if (!handle) {
×
4239
    size_t              charge = pFD->szPage;
×
4240
    _taos_lru_deleter_t deleter = deleteBCache;
×
4241
    uint8_t            *pPg = taosMemoryMalloc(charge);
×
4242
    if (!pPg) {
×
4243
      (void)taosThreadMutexUnlock(&pFD->pTsdb->pgMutex);
×
4244

4245
      return;  // ignore error with s3 cache and leave error untouched
×
4246
    }
4247
    memcpy(pPg, pPage, charge);
×
4248

4249
    LRUStatus status =
4250
        taosLRUCacheInsert(pCache, key, keyLen, pPg, charge, deleter, NULL, &handle, TAOS_LRU_PRIORITY_LOW, NULL);
×
4251
    if (status != TAOS_LRU_STATUS_OK) {
4252
      // ignore cache updating if not ok
4253
      // code = TSDB_CODE_OUT_OF_MEMORY;
4254
    }
4255
  }
4256
  (void)taosThreadMutexUnlock(&pFD->pTsdb->pgMutex);
×
4257

4258
  tsdbCacheRelease(pFD->pTsdb->pgCache, handle);
×
4259
}
4260
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc