• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3660

15 Mar 2025 09:06AM UTC coverage: 62.039% (-1.3%) from 63.314%
#3660

push

travis-ci

web-flow
feat(stream): support stream processing for virtual tables (#30144)

* enh: add client processing

* enh: add mnode vtables processing

* enh: add mnode vtable processing

* enh: add normal child vtable support

* fix: compile issues

* fix: compile issues

* fix: create stream issues

* fix: multi stream scan issue

* fix: remove debug info

* fix: agg task and task level issues

* fix: correct task output type

* fix: split vtablescan from agg

* fix: memory leak issues

* fix: add limitations

* Update 09-error-code.md

* Update 09-error-code.md

* fix: remove useless case

* feat(stream): extract original table data in source scan task

Implemented functionality in the source task to extract data
corresponding to the virtual table from the original table using WAL.
The extracted data is then sent to the downstream merge task for further
processing.

* feat(stream): multi-way merge using loser tree in virtual merge task

Implemented multi-way merge in the merge task using a loser tree to
combine data from multiple original tables into a single virtual table.
The merged virtual table data is then pushed downstream for further
processing.  Introduced memory limit handling during the merge process
with configurable behavior when the memory limit is reached.

* fix(test): remove useless cases

---------

Co-authored-by: dapan1121 <wpan@taosdata.com>
Co-authored-by: Pan Wei <72057773+dapan1121@users.noreply.github.com>

154078 of 317582 branches covered (48.52%)

Branch coverage included in aggregate %.

313 of 2391 new or added lines in 34 files covered. (13.09%)

26134 existing lines in 205 files now uncovered.

240261 of 318051 relevant lines covered (75.54%)

16655189.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

61.63
/source/dnode/vnode/src/tsdb/tsdbCache.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#include "functionMgt.h"
16
#include "tcs.h"
17
#include "tsdb.h"
18
#include "tsdbDataFileRW.h"
19
#include "tsdbIter.h"
20
#include "tsdbReadUtil.h"
21
#include "vnd.h"
22

23
#define ROCKS_BATCH_SIZE (4096)
24

25
/*
 * Release one reference on an LRU cache handle.
 *
 * The result of taosLRUCacheRelease() is not propagated to the caller; a
 * false result is only reported through the trace log.
 */
void tsdbLRUCacheRelease(SLRUCache *cache, LRUHandle *handle, bool eraseIfLastRef) {
  bool released = taosLRUCacheRelease(cache, handle, eraseIfLastRef);
  if (released) {
    return;
  }

  tsdbTrace(" release lru cache failed");
}
54,607✔
30

31
/*
 * Create the block LRU cache (pTsdb->bCache) and its mutex (pTsdb->bMutex).
 *
 * Only does work when USE_S3 is defined; otherwise returns 0 immediately.
 * Capacity is tsS3BlockCacheSize * max(tsS3BlockSize, 1024) * tsdbPageSize.
 * Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY if the cache cannot be created.
 */
static int32_t tsdbOpenBCache(STsdb *pTsdb) {
  int32_t code = 0, lino = 0;
#ifdef USE_S3
  int32_t szPage = pTsdb->pVnode->config.tsdbPageSize;
  int64_t szBlock = (tsS3BlockSize > 1024) ? tsS3BlockSize : 1024;  // never below 1024
  int64_t capacity = (int64_t)tsS3BlockCacheSize * szBlock * szPage;

  SLRUCache *pCache = taosLRUCacheInit(capacity, 0, .5);
  if (NULL == pCache) {
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err);
  }

  taosLRUCacheSetStrictCapacity(pCache, false);
  (void)taosThreadMutexInit(&pTsdb->bMutex, NULL);
  pTsdb->bCache = pCache;

_err:
  if (code != 0) {
    tsdbError("tsdb/bcache: vgId:%d, %s failed at line %d since %s.", TD_VID(pTsdb->pVnode), __func__, lino,
              tstrerror(code));
  }
#endif
  TAOS_RETURN(code);
}
55

56
/*
 * Tear down the block LRU cache created by tsdbOpenBCache().
 *
 * Does nothing when USE_S3 is undefined or when pTsdb->bCache is NULL.
 * Unreferenced entries are erased first, the element count is traced before
 * and after, then the cache and pTsdb->bMutex are destroyed.
 */
static void tsdbCloseBCache(STsdb *pTsdb) {
#ifdef USE_S3
  SLRUCache *pCache = pTsdb->bCache;
  if (NULL == pCache) {
    return;
  }

  int32_t elems = taosLRUCacheGetElems(pCache);
  tsdbTrace("vgId:%d, elems: %d", TD_VID(pTsdb->pVnode), elems);

  taosLRUCacheEraseUnrefEntries(pCache);

  elems = taosLRUCacheGetElems(pCache);
  tsdbTrace("vgId:%d, elems: %d", TD_VID(pTsdb->pVnode), elems);

  taosLRUCacheCleanup(pCache);
  (void)taosThreadMutexDestroy(&pTsdb->bMutex);
#endif
}
11,939✔
72

73
/*
 * Create the page LRU cache (pTsdb->pgCache) and its mutex (pTsdb->pgMutex).
 *
 * Only does work when USE_S3 is defined; otherwise returns 0 immediately.
 * Capacity is tsS3PageCacheSize * tsdbPageSize.
 * Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY if the cache cannot be created.
 */
static int32_t tsdbOpenPgCache(STsdb *pTsdb) {
  int32_t code = 0, lino = 0;
#ifdef USE_S3
  int32_t szPage = pTsdb->pVnode->config.tsdbPageSize;
  int64_t capacity = (int64_t)tsS3PageCacheSize * szPage;

  SLRUCache *pCache = taosLRUCacheInit(capacity, 0, .5);
  if (NULL == pCache) {
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err);
  }

  taosLRUCacheSetStrictCapacity(pCache, false);
  (void)taosThreadMutexInit(&pTsdb->pgMutex, NULL);
  pTsdb->pgCache = pCache;

_err:
  if (code != 0) {
    tsdbError("tsdb/pgcache: vgId:%d, open failed at line %d since %s.", TD_VID(pTsdb->pVnode), lino, tstrerror(code));
  }
#endif
  TAOS_RETURN(code);
}
96

97
/*
 * Tear down the page LRU cache created by tsdbOpenPgCache().
 *
 * Does nothing when USE_S3 is undefined or when pTsdb->pgCache is NULL.
 * Unreferenced entries are erased first, the element count is traced before
 * and after, then the cache and its mutex are destroyed.
 */
static void tsdbClosePgCache(STsdb *pTsdb) {
#ifdef USE_S3
  SLRUCache *pCache = pTsdb->pgCache;
  if (pCache) {
    int32_t elems = taosLRUCacheGetElems(pCache);
    tsdbTrace("vgId:%d, elems: %d", TD_VID(pTsdb->pVnode), elems);
    taosLRUCacheEraseUnrefEntries(pCache);
    elems = taosLRUCacheGetElems(pCache);
    tsdbTrace("vgId:%d, elems: %d", TD_VID(pTsdb->pVnode), elems);

    taosLRUCacheCleanup(pCache);

    // The page cache owns pgMutex (initialised in tsdbOpenPgCache); bMutex
    // belongs to the block cache and is destroyed by tsdbCloseBCache.
    (void)taosThreadMutexDestroy(&pTsdb->pgMutex);
  }
#endif
}
11,941✔
113

114
#define ROCKS_KEY_LEN (sizeof(tb_uid_t) + sizeof(int16_t) + sizeof(int8_t))
115

116
enum {
117
  LFLAG_LAST_ROW = 0,
118
  LFLAG_LAST = 1,
119
};
120

121
typedef struct {
122
  tb_uid_t uid;
123
  int16_t  cid;
124
  int8_t   lflag;
125
} SLastKey;
126

127
#define IS_LAST_ROW_KEY(k) (((k).lflag & LFLAG_LAST) == LFLAG_LAST_ROW)
128
#define IS_LAST_KEY(k)     (((k).lflag & LFLAG_LAST) == LFLAG_LAST)
129

130
/*
 * Build the path of the RocksDB cache directory into `path`.
 *
 * `path` is treated as a buffer of TSDB_FILENAME_LEN bytes: the primary
 * directory is written first by vnodeGetPrimaryDir(), then
 * "<TD_DIRSEP>cache.rdb" is appended.
 */
static void tsdbGetRocksPath(STsdb *pTsdb, char *path) {
  SVnode *pVnode = pTsdb->pVnode;

  vnodeGetPrimaryDir(pTsdb->path, pVnode->diskPrimary, pVnode->pTfs, path, TSDB_FILENAME_LEN);

  int32_t used = strlen(path);
  char   *tail = path + used;
  snprintf(tail, TSDB_FILENAME_LEN - used - 1, "%scache.rdb", TD_DIRSEP);
}
11,917✔
137

138
/* Comparator name callback handed to rocksdb_comparator_create(); `state` is unused. */
static const char *myCmpName(void *state) {
  static const char *const kName = "myCmp";
  (void)state;
  return kName;
}
142

143
/* Comparator destructor callback; the comparator carries no state, so nothing is freed. */
static void myCmpDestroy(void *state) {
  (void)state;
}
11,937✔
144

145
static int myCmp(void *state, const char *a, size_t alen, const char *b, size_t blen) {
1,029,179✔
146
  (void)state;
147
  (void)alen;
148
  (void)blen;
149
  SLastKey *lhs = (SLastKey *)a;
1,029,179✔
150
  SLastKey *rhs = (SLastKey *)b;
1,029,179✔
151

152
  if (lhs->uid < rhs->uid) {
1,029,179✔
153
    return -1;
381,375✔
154
  } else if (lhs->uid > rhs->uid) {
647,804✔
155
    return 1;
113,302✔
156
  }
157

158
  if (lhs->cid < rhs->cid) {
534,502✔
159
    return -1;
234,904✔
160
  } else if (lhs->cid > rhs->cid) {
299,598✔
161
    return 1;
142,597✔
162
  }
163

164
  if ((lhs->lflag & LFLAG_LAST) < (rhs->lflag & LFLAG_LAST)) {
157,001✔
165
    return -1;
48,329✔
166
  } else if ((lhs->lflag & LFLAG_LAST) > (rhs->lflag & LFLAG_LAST)) {
108,672✔
167
    return 1;
86,181✔
168
  }
169

170
  return 0;
22,491✔
171
}
172

173
/*
 * Open the RocksDB instance backing the last/last_row cache and populate
 * pTsdb->rCache.
 *
 * Only does work when USE_ROCKSDB is defined; otherwise returns 0.
 * Every resource is kept in a local until all fallible steps have succeeded,
 * so on failure pTsdb->rCache is left untouched and everything acquired so
 * far is released in reverse order through the _errN ladder.
 *
 * Returns 0 on success; TSDB_CODE_OUT_OF_MEMORY when a RocksDB object cannot
 * be created or the database cannot be opened; otherwise the code reported by
 * taosThreadMutexInit() or terrno from taosArrayInit().
 */
static int32_t tsdbOpenRocksCache(STsdb *pTsdb) {
  int32_t code = 0, lino = 0;
#ifdef USE_ROCKSDB
  rocksdb_comparator_t *cmp = rocksdb_comparator_create(NULL, myCmpDestroy, myCmp, myCmpName);
  if (NULL == cmp) {
    TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
  }

  rocksdb_block_based_table_options_t *tableoptions = rocksdb_block_based_options_create();
  if (NULL == tableoptions) {
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err);
  }

  rocksdb_options_t *options = rocksdb_options_create();
  if (NULL == options) {
    // tableoptions already exists at this point and must be released too
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err1);
  }

  rocksdb_options_set_create_if_missing(options, 1);
  rocksdb_options_set_comparator(options, cmp);
  rocksdb_options_set_block_based_table_factory(options, tableoptions);
  rocksdb_options_set_info_log_level(options, 2);  // WARN_LEVEL
  // rocksdb_options_set_inplace_update_support(options, 1);
  // rocksdb_options_set_allow_concurrent_memtable_write(options, 0);

  rocksdb_writeoptions_t *writeoptions = rocksdb_writeoptions_create();
  if (NULL == writeoptions) {
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err2);
  }
  rocksdb_writeoptions_disable_WAL(writeoptions, 1);

  rocksdb_readoptions_t *readoptions = rocksdb_readoptions_create();
  if (NULL == readoptions) {
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err3);
  }

  char *err = NULL;
  char  cachePath[TSDB_FILENAME_LEN] = {0};
  tsdbGetRocksPath(pTsdb, cachePath);

  rocksdb_t *db = rocksdb_open(options, cachePath, &err);
  if (NULL == db) {
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err);
    rocksdb_free(err);

    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err4);
  }

  rocksdb_flushoptions_t *flushoptions = rocksdb_flushoptions_create();
  if (NULL == flushoptions) {
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err5);
  }

  rocksdb_writebatch_t *writebatch = rocksdb_writebatch_create();
  if (NULL == writebatch) {
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err6);
  }

  TAOS_CHECK_GOTO(taosThreadMutexInit(&pTsdb->rCache.writeBatchMutex, NULL), &lino, _err7);

  SArray *ctxArray = taosArrayInit(16, sizeof(SLastUpdateCtx));
  if (!ctxArray) {
    TAOS_CHECK_GOTO(terrno, &lino, _err8);
  }

  // All fallible steps succeeded: publish the handles.
  pTsdb->rCache.tableoptions = tableoptions;
  pTsdb->rCache.writebatch = writebatch;
  pTsdb->rCache.my_comparator = cmp;
  pTsdb->rCache.options = options;
  pTsdb->rCache.writeoptions = writeoptions;
  pTsdb->rCache.readoptions = readoptions;
  pTsdb->rCache.flushoptions = flushoptions;
  pTsdb->rCache.db = db;
  pTsdb->rCache.sver = -1;
  pTsdb->rCache.suid = -1;
  pTsdb->rCache.uid = -1;
  pTsdb->rCache.pTSchema = NULL;
  pTsdb->rCache.ctxArray = ctxArray;

  TAOS_RETURN(code);

_err8:
  (void)taosThreadMutexDestroy(&pTsdb->rCache.writeBatchMutex);
_err7:
  rocksdb_writebatch_destroy(writebatch);
_err6:
  rocksdb_flushoptions_destroy(flushoptions);
_err5:
  // close the handle opened above; pTsdb->rCache.db has not been assigned yet
  rocksdb_close(db);
_err4:
  rocksdb_readoptions_destroy(readoptions);
_err3:
  rocksdb_writeoptions_destroy(writeoptions);
_err2:
  rocksdb_options_destroy(options);
_err1:
  rocksdb_block_based_options_destroy(tableoptions);
_err:
  rocksdb_comparator_destroy(cmp);
#endif
  TAOS_RETURN(code);
}
264

265
/*
 * Release everything stored in pTsdb->rCache by tsdbOpenRocksCache():
 * the database handle, the write-batch mutex, the RocksDB option objects,
 * the write batch, the comparator, the cached schema and the context array.
 * No-op when USE_ROCKSDB is undefined.
 */
static void tsdbCloseRocksCache(STsdb *pTsdb) {
#ifdef USE_ROCKSDB
  SRocksCache *pRCache = &pTsdb->rCache;

  rocksdb_close(pRCache->db);
  (void)taosThreadMutexDestroy(&pRCache->writeBatchMutex);

  rocksdb_flushoptions_destroy(pRCache->flushoptions);
  rocksdb_writebatch_destroy(pRCache->writebatch);
  rocksdb_readoptions_destroy(pRCache->readoptions);
  rocksdb_writeoptions_destroy(pRCache->writeoptions);
  rocksdb_options_destroy(pRCache->options);
  rocksdb_block_based_options_destroy(pRCache->tableoptions);
  rocksdb_comparator_destroy(pRCache->my_comparator);

  taosMemoryFree(pRCache->pTSchema);
  taosArrayDestroy(pRCache->ctxArray);
#endif
}
11,939✔
280

281
/*
 * Write the pending RocksDB write batch when it is due.
 *
 * The batch is written when it holds at least ROCKS_BATCH_SIZE entries, or
 * when `force` is set and it holds at least one entry. A write error is
 * logged and otherwise ignored; the batch is cleared in either case.
 * No-op when USE_ROCKSDB is undefined.
 */
static void rocksMayWrite(STsdb *pTsdb, bool force) {
#ifdef USE_ROCKSDB
  rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
  int                   count = rocksdb_writebatch_count(wb);

  bool due = (count >= ROCKS_BATCH_SIZE) || (force && count > 0);
  if (!due) {
    return;
  }

  char *err = NULL;
  rocksdb_write(pTsdb->rCache.db, pTsdb->rCache.writeoptions, wb, &err);
  if (err != NULL) {
    tsdbError("vgId:%d, %s failed at line %d, count: %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, count,
              err);
    rocksdb_free(err);
  }

  rocksdb_writebatch_clear(wb);
#endif
}
194,917✔
300

301
typedef struct {
302
  TSKEY  ts;
303
  int8_t dirty;
304
  struct {
305
    int16_t cid;
306
    int8_t  type;
307
    int8_t  flag;
308
    union {
309
      int64_t val;
310
      struct {
311
        uint32_t nData;
312
        uint8_t *pData;
313
      };
314
    } value;
315
  } colVal;
316
} SLastColV0;
317

318
/*
 * Decode the SLastColV0 header at `value` into `pLastCol`.
 *
 * For variable-length and decimal types pData is pointed into the `value`
 * buffer (no copy), so `value` must outlive the use of pLastCol. For
 * variable-length types with nData == 0, pData is set to NULL.
 * cacheStatus is set to TSDB_LAST_CACHE_VALID and numOfPKs to 0.
 *
 * Returns the number of bytes consumed: the header size plus nData when a
 * payload follows. The caller is responsible for bounds checking.
 */
static int32_t tsdbCacheDeserializeV0(char const *value, SLastCol *pLastCol) {
  SLastColV0 *pDisk = (SLastColV0 *)value;
  uint8_t    *payload = (uint8_t *)(&pDisk[1]);

  pLastCol->rowKey.ts = pDisk->ts;
  pLastCol->rowKey.numOfPKs = 0;
  pLastCol->dirty = pDisk->dirty;
  pLastCol->colVal.cid = pDisk->colVal.cid;
  pLastCol->colVal.flag = pDisk->colVal.flag;
  pLastCol->colVal.value.type = pDisk->colVal.type;

  pLastCol->cacheStatus = TSDB_LAST_CACHE_VALID;

  if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) {
    pLastCol->colVal.value.nData = pDisk->colVal.value.nData;
    pLastCol->colVal.value.pData = (pLastCol->colVal.value.nData > 0) ? payload : NULL;
    return sizeof(SLastColV0) + pDisk->colVal.value.nData;
  }

  if (pLastCol->colVal.value.type == TSDB_DATA_TYPE_DECIMAL) {
    pLastCol->colVal.value.nData = pDisk->colVal.value.nData;
    pLastCol->colVal.value.pData = payload;
    return sizeof(SLastColV0) + pDisk->colVal.value.nData;
  }

  pLastCol->colVal.value.val = pDisk->colVal.value.val;
  return sizeof(SLastColV0);
}
346

347
/*
 * Decode a serialized last-column record of `size` bytes into a newly
 * allocated SLastCol, returned through *ppLastCol.
 *
 * Layout: SLastColV0 header [+ value payload], then optionally
 *   int8_t version, uint8_t numOfPKs, numOfPKs x (SValue [+ pk payload]),
 *   and, for version >= LAST_COL_VERSION_2, one cacheStatus byte.
 * A record that ends right after the V0 part is accepted as version 0.
 *
 * Pointers inside the result (value/pk pData) reference the `value` buffer;
 * nothing is copied. Every read is bounds-checked against `size` first.
 *
 * Returns TSDB_CODE_SUCCESS; TSDB_CODE_INVALID_PARA if `value` is NULL;
 * terrno if allocation fails; TSDB_CODE_INVALID_DATA_FMT if the record is
 * truncated or declares more primary keys than rowKey.pks can hold.
 * On any failure *ppLastCol is not written.
 */
static int32_t tsdbCacheDeserialize(char const *value, size_t size, SLastCol **ppLastCol) {
  if (!value) {
    return TSDB_CODE_INVALID_PARA;
  }

  // tsdbCacheDeserializeV0 reads a full header unconditionally
  if (size < sizeof(SLastColV0)) {
    TAOS_RETURN(TSDB_CODE_INVALID_DATA_FMT);
  }

  SLastCol *pLastCol = taosMemoryCalloc(1, sizeof(SLastCol));
  if (NULL == pLastCol) {
    return terrno;
  }

  // a negative (overflowed) result converts to a huge size_t and is rejected below
  size_t offset = (size_t)tsdbCacheDeserializeV0(value, pLastCol);
  if (offset == size) {
    // version 0
    *ppLastCol = pLastCol;

    TAOS_RETURN(TSDB_CODE_SUCCESS);
  } else if (offset > size) {
    goto _invalid;
  }

  // version + numOfPKs
  if (size - offset < sizeof(int8_t) + sizeof(uint8_t)) {
    goto _invalid;
  }

  int8_t version = *(int8_t *)(value + offset);
  offset += sizeof(int8_t);

  uint8_t numOfPKs = *(uint8_t *)(value + offset);
  offset += sizeof(uint8_t);

  // rowKey.pks is an in-struct array; never index past it
  if (numOfPKs > sizeof(pLastCol->rowKey.pks) / sizeof(pLastCol->rowKey.pks[0])) {
    goto _invalid;
  }
  pLastCol->rowKey.numOfPKs = numOfPKs;

  // pks
  for (int32_t i = 0; i < numOfPKs; i++) {
    SValue *pPk = &pLastCol->rowKey.pks[i];

    if (size - offset < sizeof(SValue)) {
      goto _invalid;
    }
    // the record is a packed byte stream, so the SValue may be unaligned
    (void)memcpy(pPk, value + offset, sizeof(SValue));
    offset += sizeof(SValue);

    if (IS_VAR_DATA_TYPE(pPk->type)) {
      pPk->pData = NULL;
      if (pPk->nData > 0) {
        if (size - offset < pPk->nData) {
          goto _invalid;
        }
        pPk->pData = (uint8_t *)value + offset;
        offset += pPk->nData;
      }
    }
  }

  if (version >= LAST_COL_VERSION_2) {
    if (offset >= size) {
      goto _invalid;
    }
    pLastCol->cacheStatus = *(uint8_t *)(value + offset);
  }

  *ppLastCol = pLastCol;

  TAOS_RETURN(TSDB_CODE_SUCCESS);

_invalid:
  taosMemoryFreeClear(pLastCol);

  TAOS_RETURN(TSDB_CODE_INVALID_DATA_FMT);
}
405

406
/*
407
typedef struct {
408
  SLastColV0 lastColV0;
409
  char       colData[];
410
  int8_t     version;
411
  uint8_t    numOfPKs;
412
  SValue     pks[0];
413
  char       pk0Data[];
414
  SValue     pks[1];
415
  char       pk1Data[];
416
  ...
417
} SLastColDisk;
418
*/
419
/*
 * Encode the V0 part of `pLastCol` into the buffer at `value`.
 *
 * Writes an SLastColV0 header; for variable-length and decimal types the
 * nData payload bytes are copied directly after the header. The buffer is
 * written through despite the const-qualified parameter type, and must be
 * large enough for header plus payload.
 *
 * Returns the number of bytes written.
 */
static int32_t tsdbCacheSerializeV0(char const *value, SLastCol *pLastCol) {
  SLastColV0 *pDisk = (SLastColV0 *)value;

  pDisk->ts = pLastCol->rowKey.ts;
  pDisk->dirty = pLastCol->dirty;
  pDisk->colVal.cid = pLastCol->colVal.cid;
  pDisk->colVal.flag = pLastCol->colVal.flag;
  pDisk->colVal.type = pLastCol->colVal.value.type;

  bool hasPayload =
      IS_VAR_DATA_TYPE(pLastCol->colVal.value.type) || pLastCol->colVal.value.type == TSDB_DATA_TYPE_DECIMAL;
  if (!hasPayload) {
    pDisk->colVal.value.val = pLastCol->colVal.value.val;
    return sizeof(SLastColV0);
  }

  pDisk->colVal.value.nData = pLastCol->colVal.value.nData;
  if (pLastCol->colVal.value.nData > 0) {
    memcpy(&pDisk[1], pLastCol->colVal.value.pData, pLastCol->colVal.value.nData);
  }
  return sizeof(SLastColV0) + pLastCol->colVal.value.nData;
}
446

447
/*
 * Serialize `pLastCol` into a newly allocated buffer.
 *
 * Layout written: V0 part (see tsdbCacheSerializeV0), one version byte
 * (LAST_COL_VERSION), one numOfPKs byte, numOfPKs x (SValue [+ pk payload]),
 * one cacheStatus byte.
 *
 * On success *value owns the buffer (the caller frees it) and *size is its
 * length. For decimal values at least DECIMAL128_BYTES of payload space are
 * reserved, and more when nData is larger, so the nData-byte copy done by
 * tsdbCacheSerializeV0 cannot overrun the buffer. The buffer is
 * zero-initialised so any reserved-but-unused tail has defined content.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno if allocation fails.
 */
static int32_t tsdbCacheSerialize(SLastCol *pLastCol, char **value, size_t *size) {
  *size = sizeof(SLastColV0);
  if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) {
    *size += pLastCol->colVal.value.nData;
  }
  if (pLastCol->colVal.value.type == TSDB_DATA_TYPE_DECIMAL) {
    size_t nDecimal = pLastCol->colVal.value.nData;
    *size += (nDecimal > DECIMAL128_BYTES) ? nDecimal : DECIMAL128_BYTES;
  }
  *size += sizeof(uint8_t) + sizeof(uint8_t) + sizeof(uint8_t);  // version + numOfPKs + cacheStatus

  for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) {
    *size += sizeof(SValue);
    if (IS_VAR_DATA_TYPE(pLastCol->rowKey.pks[i].type)) {
      *size += pLastCol->rowKey.pks[i].nData;
    }
  }

  *value = taosMemoryCalloc(1, *size);
  if (NULL == *value) {
    TAOS_RETURN(terrno);
  }

  int32_t offset = tsdbCacheSerializeV0(*value, pLastCol);

  // version
  ((uint8_t *)(*value + offset))[0] = LAST_COL_VERSION;
  offset++;

  // numOfPKs
  ((uint8_t *)(*value + offset))[0] = pLastCol->rowKey.numOfPKs;
  offset++;

  // pks
  for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) {
    // the destination is a packed byte stream and may be unaligned for SValue
    (void)memcpy(*value + offset, &pLastCol->rowKey.pks[i], sizeof(SValue));
    offset += sizeof(SValue);
    if (IS_VAR_DATA_TYPE(pLastCol->rowKey.pks[i].type)) {
      if (pLastCol->rowKey.pks[i].nData > 0) {
        memcpy(*value + offset, pLastCol->rowKey.pks[i].pData, pLastCol->rowKey.pks[i].nData);
      }
      offset += pLastCol->rowKey.pks[i].nData;
    }
  }

  ((uint8_t *)(*value + offset))[0] = pLastCol->cacheStatus;

  TAOS_RETURN(TSDB_CODE_SUCCESS);
}
495

496
static int32_t tsdbCachePutToRocksdb(STsdb *pTsdb, SLastKey *pLastKey, SLastCol *pLastCol);
497

498
int tsdbCacheFlushDirty(const void *key, size_t klen, void *value, void *ud) {
317,890✔
499
  SLastCol *pLastCol = (SLastCol *)value;
317,890✔
500

501
  if (pLastCol->dirty) {
317,890✔
502
    STsdb *pTsdb = (STsdb *)ud;
50,584✔
503

504
    int32_t code = tsdbCachePutToRocksdb(pTsdb, (SLastKey *)key, pLastCol);
50,584✔
505
    if (code) {
50,617!
UNCOV
506
      tsdbError("tsdb/cache: vgId:%d, flush dirty lru failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code));
×
UNCOV
507
      return code;
×
508
    }
509

510
    pLastCol->dirty = 0;
50,617✔
511

512
    rocksMayWrite(pTsdb, false);
50,617✔
513
  }
514

515
  return 0;
317,935✔
516
}
517

518
/*
 * Decide whether `key` is covered by a delete, using the skyline array.
 *
 * *iSkyline is a cursor into pSkyline that is walked downwards; the pair
 * (pSkyline[*iSkyline - 1], pSkyline[*iSkyline]) is examined on each step.
 * The cursor is updated in place, so successive calls continue from where
 * the previous one stopped. The cursor never drops below 1 here.
 *
 * Returns true when key->ts lies inside the examined pair and its version is
 * <= the front item's version, or its ts equals the back item's ts and its
 * version is <= the back item's version. Returns false otherwise, including
 * when *iSkyline <= 0 on entry.
 */
static bool tsdbKeyDeleted(TSDBKEY *key, SArray *pSkyline, int64_t *iSkyline) {
  while (*iSkyline > 0) {
    TSDBKEY *pBack = (TSDBKEY *)taosArrayGet(pSkyline, *iSkyline);
    TSDBKEY *pFront = (TSDBKEY *)taosArrayGet(pSkyline, *iSkyline - 1);

    if (key->ts > pBack->ts) {
      return false;
    }

    // key->ts <= pBack->ts holds from here on
    if (key->ts >= pFront->ts) {
      bool coveredByFront = key->version <= pFront->version;
      bool coveredByBack = (key->ts == pBack->ts) && (key->version <= pBack->version);
      // if (key->version <= pItemFront->version || key->version <= pItemBack->version) {
      if (coveredByFront || coveredByBack) {
        return true;
      }
    }

    // not decided by this pair: step to the previous pair if there is one
    if (*iSkyline <= 1) {
      return false;
    }
    --*iSkyline;
  }

  return false;
}
548

549
// Get next non-deleted row from imem
550
static TSDBROW *tsdbImemGetNextRow(STbDataIter *pTbIter, SArray *pSkyline, int64_t *piSkyline) {
374,658✔
551
  int32_t code = 0;
374,658✔
552

553
  if (tsdbTbDataIterNext(pTbIter)) {
374,658✔
554
    TSDBROW *pMemRow = tsdbTbDataIterGet(pTbIter);
373,949✔
555
    TSDBKEY  rowKey = TSDBROW_KEY(pMemRow);
373,949✔
556
    bool     deleted = tsdbKeyDeleted(&rowKey, pSkyline, piSkyline);
373,949✔
557
    if (!deleted) {
373,710✔
558
      return pMemRow;
373,529✔
559
    }
560
  }
561

562
  return NULL;
600✔
563
}
564

565
// Get first non-deleted row from imem
566
static TSDBROW *tsdbImemGetFirstRow(SMemTable *imem, STbData *pIMem, STbDataIter *pTbIter, SArray *pSkyline,
2,829✔
567
                                    int64_t *piSkyline) {
568
  int32_t code = 0;
2,829✔
569

570
  tsdbTbDataIterOpen(pIMem, NULL, 1, pTbIter);
2,829✔
571
  TSDBROW *pMemRow = tsdbTbDataIterGet(pTbIter);
2,829✔
572
  if (pMemRow) {
2,829✔
573
    // if non deleted, return the found row.
574
    TSDBKEY rowKey = TSDBROW_KEY(pMemRow);
2,551✔
575
    bool    deleted = tsdbKeyDeleted(&rowKey, pSkyline, piSkyline);
2,551✔
576
    if (!deleted) {
2,551✔
577
      return pMemRow;
2,345✔
578
    }
579
  } else {
580
    return NULL;
278✔
581
  }
582

583
  // continue to find the non-deleted first row from imem, using get next row
584
  return tsdbImemGetNextRow(pTbIter, pSkyline, piSkyline);
206✔
585
}
586

587
/*
 * Invalidate the schema cached in pTsdb->rCache when a newer schema version
 * exists for the table it was loaded for.
 *
 * Nothing happens when no schema is cached or when `sver` is not greater
 * than the cached version. Otherwise, if suid > 0 and matches the cached
 * suid, or suid == 0 and uid matches the cached uid, the cached version and
 * the matching id are reset to -1. The schema object itself is not freed.
 */
void tsdbCacheInvalidateSchema(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int32_t sver) {
  SRocksCache *pRCache = &pTsdb->rCache;

  if (pRCache->pTSchema == NULL) return;
  if (sver <= pRCache->sver) return;

  if (suid > 0) {
    if (suid == pRCache->suid) {
      pRCache->sver = -1;
      pRCache->suid = -1;
    }
  } else if (suid == 0) {
    if (uid == pRCache->uid) {
      pRCache->sver = -1;
      pRCache->uid = -1;
    }
  }
}
600

601
/*
 * Make pTsdb->rCache.pTSchema the schema of (suid, uid) at version `sver`.
 *
 * The cached schema is reused when its version equals `sver` and either
 * suid > 0 matches the cached suid, or suid == 0 and uid matches the cached
 * uid. Otherwise the cached schema is destroyed and reloaded through
 * metaGetTbTSchemaEx().
 *
 * Returns 0 on a cache hit, otherwise the result of metaGetTbTSchemaEx().
 */
static int32_t tsdbUpdateSkm(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int32_t sver) {
  SRocksCache *pRCache = &pTsdb->rCache;
  if (pRCache->pTSchema && sver == pRCache->sver) {
    if (suid > 0 && suid == pRCache->suid) {
      return 0;
    }
    if (suid == 0 && uid == pRCache->uid) {
      return 0;
    }
  }

  pRCache->suid = suid;
  pRCache->uid = uid;
  pRCache->sver = sver;
  tDestroyTSchema(pRCache->pTSchema);
  // Do not keep a pointer to the destroyed schema: if the reload below fails
  // without writing the out-parameter, a stale pointer would be treated as a
  // valid cache entry by the next call and freed again on close.
  pRCache->pTSchema = NULL;
  return metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pRCache->pTSchema);
}
618

619
static int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SArray *updCtxArray);
620

621
/*
 * Feed the last/last_row cache from the rows of table (suid, uid) held in
 * `imem`.
 *
 * Steps:
 *   1. Load the table's in-memory tombstones and, if there are any, build a
 *      delete skyline from them.
 *   2. Take the first non-deleted row as returned by tsdbImemGetFirstRow().
 *      Every column yields a LFLAG_LAST_ROW update; columns holding a value
 *      also yield a LFLAG_LAST update, the others are remembered by column id
 *      in iColHash.
 *   3. Keep fetching rows with tsdbImemGetNextRow() while iColHash is
 *      non-empty, emitting a LFLAG_LAST update for each remembered column
 *      that has a value and removing it from the hash.
 *   4. Hand the collected updates (pTsdb->rCache.ctxArray) to
 *      tsdbCacheUpdate().
 *
 * ctxArray is cleared before returning on every path.
 * Returns 0 on success (including "no usable row"), otherwise the first
 * error code encountered.
 */
int32_t tsdbLoadFromImem(SMemTable *imem, int64_t suid, int64_t uid) {
  int32_t      code = 0;
  int32_t      lino = 0;
  STsdb       *pTsdb = imem->pTsdb;
  SArray      *pMemDelData = NULL;
  SArray      *pSkyline = NULL;
  int64_t      iSkyline = 0;
  STbDataIter  tbIter = {0};
  TSDBROW     *pMemRow = NULL;
  STSchema    *pTSchema = NULL;
  SSHashObj   *iColHash = NULL;
  int32_t      sver;
  int32_t      nCol;
  SArray      *ctxArray = pTsdb->rCache.ctxArray;
  STsdbRowKey  tsdbRowKey = {0};
  STSDBRowIter iter = {0};

  STbData *pIMem = tsdbGetTbDataFromMemTable(imem, suid, uid);

  // load imem tomb data and build skyline
  TAOS_CHECK_GOTO(loadMemTombData(&pMemDelData, NULL, pIMem, INT64_MAX), &lino, _exit);

  // tsdbBuildDeleteSkyline
  size_t delSize = TARRAY_SIZE(pMemDelData);
  if (delSize > 0) {
    pSkyline = taosArrayInit(32, sizeof(TSDBKEY));
    if (!pSkyline) {
      TAOS_CHECK_EXIT(terrno);
    }

    TAOS_CHECK_EXIT(tsdbBuildDeleteSkyline(pMemDelData, 0, (int32_t)(delSize - 1), pSkyline));
    iSkyline = taosArrayGetSize(pSkyline) - 1;
  }

  pMemRow = tsdbImemGetFirstRow(imem, pIMem, &tbIter, pSkyline, &iSkyline);
  if (!pMemRow) {
    goto _exit;
  }

  // iter first row to last_row/last col values to ctxArray, and mark last null col ids
  sver = TSDBROW_SVERSION(pMemRow);
  TAOS_CHECK_GOTO(tsdbUpdateSkm(pTsdb, suid, uid, sver), &lino, _exit);
  pTSchema = pTsdb->rCache.pTSchema;
  nCol = pTSchema->numOfCols;

  tsdbRowGetKey(pMemRow, &tsdbRowKey);

  TAOS_CHECK_EXIT(tsdbRowIterOpen(&iter, pMemRow, pTSchema));

  int32_t iCol = 0;
  for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal && iCol < nCol; pColVal = tsdbRowIterNext(&iter), iCol++) {
    SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST_ROW, .tsdbRowKey = tsdbRowKey, .colVal = *pColVal};
    if (!taosArrayPush(ctxArray, &updateCtx)) {
      TAOS_CHECK_EXIT(terrno);
    }

    if (COL_VAL_IS_VALUE(pColVal)) {
      updateCtx.lflag = LFLAG_LAST;
      if (!taosArrayPush(ctxArray, &updateCtx)) {
        TAOS_CHECK_EXIT(terrno);
      }
    } else {
      // no value in the first row: remember the column so later rows can supply its "last" value
      if (!iColHash) {
        iColHash = tSimpleHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT));
        if (iColHash == NULL) {
          TAOS_CHECK_EXIT(terrno);
        }
      }

      if (tSimpleHashPut(iColHash, &pColVal->cid, sizeof(pColVal->cid), &pColVal->cid, sizeof(pColVal->cid))) {
        TAOS_CHECK_EXIT(terrno);
      }
    }
  }
  tsdbRowClose(&iter);

  // continue to get next row to fill null last col values
  pMemRow = tsdbImemGetNextRow(&tbIter, pSkyline, &iSkyline);
  while (pMemRow) {
    if (tSimpleHashGetSize(iColHash) == 0) {
      break;
    }

    sver = TSDBROW_SVERSION(pMemRow);
    TAOS_CHECK_EXIT(tsdbUpdateSkm(pTsdb, suid, uid, sver));
    pTSchema = pTsdb->rCache.pTSchema;
    // the row may use a different schema version than the first row, so the
    // column bound has to follow the schema the iterator is opened with
    nCol = pTSchema->numOfCols;

    STsdbRowKey rowKey = {0};
    tsdbRowGetKey(pMemRow, &rowKey);

    TAOS_CHECK_EXIT(tsdbRowIterOpen(&iter, pMemRow, pTSchema));

    int32_t jCol = 0;
    for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal && jCol < nCol; pColVal = tsdbRowIterNext(&iter), jCol++) {
      if (tSimpleHashGet(iColHash, &pColVal->cid, sizeof(pColVal->cid)) && COL_VAL_IS_VALUE(pColVal)) {
        SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST, .tsdbRowKey = rowKey, .colVal = *pColVal};
        if (!taosArrayPush(ctxArray, &updateCtx)) {
          TAOS_CHECK_EXIT(terrno);
        }

        TAOS_CHECK_EXIT(tSimpleHashRemove(iColHash, &pColVal->cid, sizeof(pColVal->cid)));
      }
    }
    tsdbRowClose(&iter);

    pMemRow = tsdbImemGetNextRow(&tbIter, pSkyline, &iSkyline);
  }

  TAOS_CHECK_GOTO(tsdbCacheUpdate(pTsdb, suid, uid, ctxArray), &lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(pTsdb->pVnode), __func__, __FILE__, lino, tstrerror(code));

    // an error may have left the row iterator open
    tsdbRowClose(&iter);
  }

  taosArrayClear(ctxArray);
  // destroy any allocated resource
  tSimpleHashCleanup(iColHash);
  if (pMemDelData) {
    taosArrayDestroy(pMemDelData);
  }
  if (pSkyline) {
    taosArrayDestroy(pSkyline);
  }

  TAOS_RETURN(code);
}
750

751
/*
 * Push the contents of the immutable memtable (pTsdb->imem) into the cache by
 * running tsdbLoadFromImem through tsdbMemTableSaveToCache().
 *
 * Returns 0 without doing anything when pTsdb or its imem is NULL, or when
 * imem reports zero rows or zero tables. Otherwise returns the result of
 * tsdbMemTableSaveToCache() and logs the outcome.
 */
static int32_t tsdbCacheUpdateFromIMem(STsdb *pTsdb) {
  if (!pTsdb || !pTsdb->imem) return 0;

  int32_t    code = 0;
  int32_t    lino = 0;
  SMemTable *imem = pTsdb->imem;
  int32_t    nTbData = imem->nTbData;
  int64_t    nRow = imem->nRow;
  int64_t    nDel = imem->nDel;

  if (nTbData == 0 || nRow == 0) return 0;

  TAOS_CHECK_EXIT(tsdbMemTableSaveToCache(imem, tsdbLoadFromImem));

_exit:
  if (code == 0) {
    tsdbInfo("vgId:%d %s done, nRow:%" PRId64 " nDel:%" PRId64, TD_VID(pTsdb->pVnode), __func__, nRow, nDel);
  } else {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(pTsdb->pVnode), __func__, __FILE__, lino, tstrerror(code));
  }

  TAOS_RETURN(code);
}
775

776
/*
 * Commit the cache.
 *
 * When tsUpdateCacheBatch is set, the cache is first refreshed from the
 * immutable memtable; a failure there is logged and returned immediately.
 * Then, under pTsdb->lruMutex, every LRU entry is passed to
 * tsdbCacheFlushDirty(); with USE_ROCKSDB the pending write batch is forced
 * out and RocksDB is flushed.
 *
 * Returns 0 on success, the code from tsdbCacheUpdateFromIMem(), or
 * TSDB_CODE_FAILED when rocksdb_flush() reports an error.
 */
int32_t tsdbCacheCommit(STsdb *pTsdb) {
  int32_t code = 0;

  if (tsUpdateCacheBatch) {
    code = tsdbCacheUpdateFromIMem(pTsdb);
    if (code) {
      tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));

      TAOS_RETURN(code);
    }
  }

  char      *flushErr = NULL;
  SLRUCache *pLru = pTsdb->lruCache;

  (void)taosThreadMutexLock(&pTsdb->lruMutex);

  taosLRUCacheApply(pLru, tsdbCacheFlushDirty, pTsdb);

#ifdef USE_ROCKSDB
  rocksMayWrite(pTsdb, true);
  rocksdb_flush(pTsdb->rCache.db, pTsdb->rCache.flushoptions, &flushErr);
#endif
  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);

#ifdef USE_ROCKSDB
  // reported after the mutex has been released
  if (flushErr != NULL) {
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, flushErr);
    rocksdb_free(flushErr);
    code = TSDB_CODE_FAILED;
  }
#endif
  TAOS_RETURN(code);
}
815

816
/*
 * Give a variable-length SValue its own heap copy of the payload.
 *
 * Values whose type is not variable-length are left untouched. For
 * variable-length values, `pData` is replaced with a freshly allocated copy
 * of `nData` bytes, or with NULL when `nData` is zero.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when the allocation fails (in which
 * case `pValue` is not modified).
 */
static int32_t reallocVarDataVal(SValue *pValue) {
  if (!IS_VAR_DATA_TYPE(pValue->type)) {
    TAOS_RETURN(TSDB_CODE_SUCCESS);
  }

  uint8_t *pSrc = pValue->pData;
  uint32_t nBytes = pValue->nData;

  if (nBytes > 0) {
    uint8_t *pDup = taosMemoryMalloc(nBytes);
    if (pDup == NULL) {
      TAOS_RETURN(terrno);
    }
    pValue->pData = pDup;
    (void)memcpy(pValue->pData, pSrc, nBytes);
  } else {
    pValue->pData = NULL;
  }

  TAOS_RETURN(TSDB_CODE_SUCCESS);
}
834

835
/* Convenience wrapper: deep-copy the value payload of a column value. */
static int32_t reallocVarData(SColVal *pColVal) {
  SValue *pValue = &pColVal->value;
  return reallocVarDataVal(pValue);
}
10,566✔
836

837
// realloc pk data and col data.
838
static int32_t tsdbCacheReallocSLastCol(SLastCol *pCol, size_t *pCharge) {
102,571✔
839
  int32_t code = TSDB_CODE_SUCCESS, lino = 0;
102,571✔
840
  size_t  charge = sizeof(SLastCol);
102,571✔
841

842
  int8_t i = 0;
102,571✔
843
  for (; i < pCol->rowKey.numOfPKs; i++) {
102,667✔
844
    SValue *pValue = &pCol->rowKey.pks[i];
96✔
845
    if (IS_VAR_DATA_TYPE(pValue->type)) {
96!
846
      TAOS_CHECK_EXIT(reallocVarDataVal(pValue));
4!
847
      charge += pValue->nData;
4✔
848
    }
849
  }
850

851
  if (IS_VAR_DATA_TYPE(pCol->colVal.value.type)) {
102,571✔
852
    TAOS_CHECK_EXIT(reallocVarData(&pCol->colVal));
10,580!
853
    charge += pCol->colVal.value.nData;
10,564✔
854
  }
855

856
  if (pCol->colVal.value.type == TSDB_DATA_TYPE_DECIMAL) {
102,555✔
857
    if (pCol->colVal.value.nData > 0) {
3,312✔
858
      void *p = taosMemoryMalloc(pCol->colVal.value.nData);
1,592!
859
      if (!p) TAOS_CHECK_EXIT(terrno);
1,592!
860
      (void)memcpy(p, pCol->colVal.value.pData, pCol->colVal.value.nData);
1,592✔
861
      pCol->colVal.value.pData = p;
1,592✔
862
    }
863
    charge += pCol->colVal.value.nData;
3,312✔
864
  }
865

866
  if (pCharge) {
102,555✔
867
    *pCharge = charge;
73,671✔
868
  }
869

870
_exit:
28,884✔
871
  if (TSDB_CODE_SUCCESS != code) {
102,555!
UNCOV
872
    for (int8_t j = 0; j < i; j++) {
×
UNCOV
873
      if (IS_VAR_DATA_TYPE(pCol->rowKey.pks[j].type)) {
×
UNCOV
874
        taosMemoryFree(pCol->rowKey.pks[j].pData);
×
875
      }
876
    }
877

UNCOV
878
    (void)memset(pCol, 0, sizeof(SLastCol));
×
879
  }
880

881
  TAOS_RETURN(code);
102,555✔
882
}
883

884
void tsdbCacheFreeSLastColItem(void *pItem) {
31,968✔
885
  SLastCol *pCol = (SLastCol *)pItem;
31,968✔
886
  for (int i = 0; i < pCol->rowKey.numOfPKs; i++) {
32,044✔
887
    if (IS_VAR_DATA_TYPE(pCol->rowKey.pks[i].type)) {
76!
UNCOV
888
      taosMemoryFree(pCol->rowKey.pks[i].pData);
×
889
    }
890
  }
891

892
  if ((IS_VAR_DATA_TYPE(pCol->colVal.value.type) || pCol->colVal.value.type == TSDB_DATA_TYPE_DECIMAL) &&
31,968!
893
      pCol->colVal.value.pData) {
3,883✔
894
    taosMemoryFree(pCol->colVal.value.pData);
2,164!
895
  }
896
}
31,968✔
897

898
static void tsdbCacheDeleter(const void *key, size_t klen, void *value, void *ud) {
73,693✔
899
  SLastCol *pLastCol = (SLastCol *)value;
73,693✔
900

901
  if (pLastCol->dirty) {
73,693✔
902
    if (tsdbCacheFlushDirty(key, klen, pLastCol, ud) != 0) {
3,050!
UNCOV
903
      STsdb *pTsdb = (STsdb *)ud;
×
UNCOV
904
      tsdbTrace("tsdb/cache: vgId:%d, flush cache %s failed at line %d.", TD_VID(pTsdb->pVnode), __func__, __LINE__);
×
905
    }
906
  }
907

908
  for (uint8_t i = 0; i < pLastCol->rowKey.numOfPKs; ++i) {
73,710✔
909
    SValue *pValue = &pLastCol->rowKey.pks[i];
20✔
910
    if (IS_VAR_DATA_TYPE(pValue->type)) {
20!
911
      taosMemoryFree(pValue->pData);
4!
912
    }
913
  }
914

915
  if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type) ||
73,690!
916
      pLastCol->colVal.value.type == TSDB_DATA_TYPE_DECIMAL /* && pLastCol->colVal.value.nData > 0*/) {
66,357✔
917
    taosMemoryFree(pLastCol->colVal.value.pData);
10,473!
918
  }
919

920
  taosMemoryFree(value);
73,692✔
921
}
73,704✔
922

923
static void tsdbCacheOverWriter(const void *key, size_t klen, void *value, void *ud) {
33,693✔
924
  SLastCol *pLastCol = (SLastCol *)value;
33,693✔
925
  pLastCol->dirty = 0;
33,693✔
926
}
33,693✔
927

928
static int32_t tsdbCachePutToLRU(STsdb *pTsdb, SLastKey *pLastKey, SLastCol *pLastCol, int8_t dirty);
929

930
/*
 * Insert a dirty placeholder entry for column `cid` of table `uid` into the
 * LRU cache under the given `lflag`.
 *
 * The placeholder has row key timestamp TSKEY_MIN, no primary keys, a NONE
 * column value of type `col_type`, and cache status TSDB_LAST_CACHE_VALID.
 *
 * Returns the code from tsdbCachePutToLRU(); failures are logged.
 */
static int32_t tsdbCacheNewTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, int8_t col_type, int8_t lflag) {
  SRowKey  noneKey = {.ts = TSKEY_MIN, .numOfPKs = 0};
  SLastCol noneCol = {
      .rowKey = noneKey, .colVal = COL_VAL_NONE(cid, col_type), .dirty = 1, .cacheStatus = TSDB_LAST_CACHE_VALID};

  SLastKey *pLastKey = &(SLastKey){.lflag = lflag, .uid = uid, .cid = cid};

  int32_t code = tsdbCachePutToLRU(pTsdb, pLastKey, &noneCol, 1);
  if (code != 0) {
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
  }

  TAOS_RETURN(code);
}
947

948
/*
 * Flush every dirty LRU entry and (USE_ROCKSDB only) write the pending batch
 * and flush rocksdb. Unlike tsdbCacheCommit(), this function does not take
 * lruMutex itself.
 *
 * Returns 0, or TSDB_CODE_FAILED when the rocksdb flush reports an error.
 */
int32_t tsdbCacheCommitNoLock(STsdb *pTsdb) {
  int32_t code = 0;
  char   *flushErr = NULL;

  taosLRUCacheApply(pTsdb->lruCache, tsdbCacheFlushDirty, pTsdb);

#ifdef USE_ROCKSDB
  rocksMayWrite(pTsdb, true);
  rocksdb_flush(pTsdb->rCache.db, pTsdb->rCache.flushoptions, &flushErr);

  if (flushErr != NULL) {
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, flushErr);
    rocksdb_free(flushErr);
    code = TSDB_CODE_FAILED;
  }
#endif

  TAOS_RETURN(code);
}
967

968
/*
 * Fetch the values of `numKeys` keys from rocksdb in one multi-get.
 *
 * pTsdb             - owning tsdb; supplies the rocksdb handle and read options.
 * numKeys           - number of entries in ppKeysList / pKeysListSizes.
 * ppKeysList        - key buffers.
 * pKeysListSizes    - length of each key.
 * pppValuesList     - out: array of `numKeys` value buffers. The caller
 *                     releases each entry with rocksdb_free() and the array
 *                     itself with taosMemoryFree().
 * ppValuesListSizes - out: array of `numKeys` value lengths, released with
 *                     taosMemoryFree().
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when an allocation fails (the output
 * parameters are left untouched in that case). A per-key read error does not
 * fail the call; it is logged here and the key's value entry is whatever
 * rocksdb_multi_get() left in it.
 *
 * Without USE_ROCKSDB the function does nothing and the output parameters are
 * not written.
 */
static int32_t tsdbCacheGetValuesFromRocks(STsdb *pTsdb, size_t numKeys, const char *const *ppKeysList,
                                           size_t *pKeysListSizes, char ***pppValuesList, size_t **ppValuesListSizes) {
#ifdef USE_ROCKSDB
  char **valuesList = taosMemoryCalloc(numKeys, sizeof(char *));
  if (!valuesList) return terrno;

  size_t *valuesListSizes = taosMemoryCalloc(numKeys, sizeof(size_t));
  if (!valuesListSizes) {
    int32_t allocCode = terrno;  // capture before the cleanup calls below
    taosMemoryFreeClear(valuesList);
    return allocCode;
  }

  char **errs = taosMemoryCalloc(numKeys, sizeof(char *));
  if (!errs) {
    int32_t allocCode = terrno;  // capture before the cleanup calls below
    taosMemoryFreeClear(valuesList);
    taosMemoryFreeClear(valuesListSizes);
    return allocCode;
  }

  rocksdb_multi_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, numKeys, ppKeysList, pKeysListSizes, valuesList,
                    valuesListSizes, errs);

  for (size_t i = 0; i < numKeys; ++i) {
    if (errs[i] != NULL) {
      // Report the read error instead of dropping it silently.
      tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, errs[i]);
      rocksdb_free(errs[i]);
    }
  }
  taosMemoryFreeClear(errs);

  *pppValuesList = valuesList;
  *ppValuesListSizes = valuesListSizes;
#endif
  TAOS_RETURN(TSDB_CODE_SUCCESS);
}
996

997
/*
 * Remove the cached "last" and "last_row" entries of column `cid` of table
 * `uid`, both from rocksdb (via the pending write batch) and from the LRU
 * cache.
 *
 * hasPrimaryKey is currently unused.
 *
 * The caller is expected to have flushed the write batch beforehand; this
 * function only queues the deletes and does not write the batch itself.
 *
 * Returns 0 on success, terrno on allocation failure, or the error reported
 * by tsdbCacheGetValuesFromRocks() / tsdbCacheDeserialize(). On a deserialize
 * failure the LRU entries are left in place.
 */
static int32_t tsdbCacheDropTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, bool hasPrimaryKey) {
  int32_t code = 0;

  // build keys & multi get from rocks
  char **keys_list = taosMemoryCalloc(2, sizeof(char *));
  if (!keys_list) {
    return terrno;
  }
  size_t *keys_list_sizes = taosMemoryCalloc(2, sizeof(size_t));
  if (!keys_list_sizes) {
    taosMemoryFree(keys_list);
    return terrno;
  }
  const size_t klen = ROCKS_KEY_LEN;

  // Both keys live in one allocation; keys_list[0] is its base address and is
  // what gets freed at _exit.
  char *keys = taosMemoryCalloc(2, sizeof(SLastKey));
  if (!keys) {
    taosMemoryFree(keys_list);
    taosMemoryFree(keys_list_sizes);
    return terrno;
  }
  ((SLastKey *)keys)[0] = (SLastKey){.lflag = LFLAG_LAST, .uid = uid, .cid = cid};
  ((SLastKey *)keys)[1] = (SLastKey){.lflag = LFLAG_LAST_ROW, .uid = uid, .cid = cid};

  keys_list[0] = keys;
  keys_list[1] = keys + sizeof(SLastKey);
  keys_list_sizes[0] = klen;
  keys_list_sizes[1] = klen;

  char  **values_list = NULL;
  size_t *values_list_sizes = NULL;

  TAOS_CHECK_GOTO(tsdbCacheGetValuesFromRocks(pTsdb, 2, (const char *const *)keys_list, keys_list_sizes, &values_list,
                                              &values_list_sizes),
                  NULL, _exit);

#ifdef USE_ROCKSDB
  {
    rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;

    // Queue a delete for every key that has a stored, decodable value.
    for (int i = 0; i < 2; i++) {
      if (values_list[i] == NULL) {
        continue;
      }

      SLastCol *pLastCol = NULL;
      code = tsdbCacheDeserialize(values_list[i], values_list_sizes[i], &pLastCol);
      if (code != TSDB_CODE_SUCCESS) {
        tsdbError("vgId:%d, %s deserialize failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
                  tstrerror(code));
        goto _exit;
      }
      if (NULL != pLastCol) {
        rocksdb_writebatch_delete(wb, keys_list[i], klen);
      }
      taosMemoryFreeClear(pLastCol);
    }
  }
#endif

  for (int i = 0; i < 2; i++) {
    LRUHandle *h = taosLRUCacheLookup(pTsdb->lruCache, keys_list[i], klen);
    if (h) {
      tsdbLRUCacheRelease(pTsdb->lruCache, h, true);
      taosLRUCacheErase(pTsdb->lruCache, keys_list[i], klen);
    }
  }

_exit:
#ifdef USE_ROCKSDB
  // Released here so that the deserialize-failure path does not leak the
  // fetched values.
  if (values_list) {
    rocksdb_free(values_list[0]);
    rocksdb_free(values_list[1]);
  }
#endif
  taosMemoryFree(keys_list[0]);

  taosMemoryFree(keys_list);
  taosMemoryFree(keys_list_sizes);
  taosMemoryFree(values_list);
  taosMemoryFree(values_list_sizes);

  TAOS_RETURN(code);
}
1091

1092
/*
 * Seed the LRU cache with placeholder "last_row" and "last" entries for every
 * column of a newly created table.
 *
 * When `suid` is negative the columns are taken from `pSchemaRow`; otherwise
 * the schema is looked up with metaGetTbTSchemaEx(). The whole operation runs
 * under lruMutex.
 *
 * Returns the schema lookup error, or otherwise the code of the LAST
 * tsdbCacheNewTableColumn() call only; earlier per-column failures are
 * traced and then overwritten.
 */
int32_t tsdbCacheNewTable(STsdb *pTsdb, tb_uid_t uid, tb_uid_t suid, const SSchemaWrapper *pSchemaRow) {
  const int8_t lflags[2] = {LFLAG_LAST_ROW, LFLAG_LAST};
  const bool   useSchemaRow = (suid < 0);
  int32_t      code = 0;
  STSchema    *pTSchema = NULL;

  (void)taosThreadMutexLock(&pTsdb->lruMutex);

  if (!useSchemaRow) {
    code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, -1, &pTSchema);
    if (code != TSDB_CODE_SUCCESS) {
      (void)taosThreadMutexUnlock(&pTsdb->lruMutex);

      TAOS_RETURN(code);
    }
  }

  const int nCols = useSchemaRow ? pSchemaRow->nCols : pTSchema->numOfCols;
  for (int iCol = 0; iCol < nCols; ++iCol) {
    int16_t cid;
    int8_t  col_type;
    if (useSchemaRow) {
      cid = pSchemaRow->pSchema[iCol].colId;
      col_type = pSchemaRow->pSchema[iCol].type;
    } else {
      cid = pTSchema->columns[iCol].colId;
      col_type = pTSchema->columns[iCol].type;
    }

    // last_row first, then last.
    for (int k = 0; k < 2; ++k) {
      code = tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, lflags[k]);
      if (code != TSDB_CODE_SUCCESS) {
        tsdbTrace("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
                  tstrerror(code));
      }
    }
  }

  if (!useSchemaRow) {
    taosMemoryFree(pTSchema);
  }

  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);

  TAOS_RETURN(code);
}
1145

1146
/*
 * Drop every cached column entry of table `uid`.
 *
 * Under lruMutex: commit the cache (tsdbCacheCommitNoLock), drop each column
 * with tsdbCacheDropTableColumn(), then call rocksMayWrite(pTsdb, false).
 * Columns come from `pSchemaRow` when it is non-NULL, otherwise from the
 * schema returned by metaGetTbTSchemaEx().
 *
 * Returns the schema lookup error, or otherwise the code of the LAST
 * tsdbCacheDropTableColumn() call only; the commit failure and earlier
 * per-column failures are traced and then overwritten.
 */
int32_t tsdbCacheDropTable(STsdb *pTsdb, tb_uid_t uid, tb_uid_t suid, SSchemaWrapper *pSchemaRow) {
  const bool useSchemaRow = (pSchemaRow != NULL);
  int32_t    code = 0;
  STSchema  *pTSchema = NULL;

  (void)taosThreadMutexLock(&pTsdb->lruMutex);

  code = tsdbCacheCommitNoLock(pTsdb);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbTrace("vgId:%d, %s commit with no lock failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
              tstrerror(code));
  }

  if (!useSchemaRow) {
    code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, -1, &pTSchema);
    if (code != TSDB_CODE_SUCCESS) {
      (void)taosThreadMutexUnlock(&pTsdb->lruMutex);

      TAOS_RETURN(code);
    }
  }

  int nCols = useSchemaRow ? pSchemaRow->nCols : pTSchema->numOfCols;

  // The primary-key marker is read from the second column, if there is one.
  bool hasPrimayKey = false;
  if (nCols >= 2) {
    if (useSchemaRow) {
      hasPrimayKey = (pSchemaRow->pSchema[1].flags & COL_IS_KEY) != 0;
    } else {
      hasPrimayKey = (pTSchema->columns[1].flags & COL_IS_KEY) != 0;
    }
  }

  for (int iCol = 0; iCol < nCols; ++iCol) {
    int16_t cid = useSchemaRow ? pSchemaRow->pSchema[iCol].colId : pTSchema->columns[iCol].colId;

    code = tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey);
    if (code != TSDB_CODE_SUCCESS) {
      tsdbTrace("vgId:%d, %s drop table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
                tstrerror(code));
    }
  }

  if (!useSchemaRow) {
    taosMemoryFree(pTSchema);
  }

  rocksMayWrite(pTsdb, false);

  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);

  TAOS_RETURN(code);
}
1207

1208
/*
 * Drop every cached column entry of each table listed in `uids`, using the
 * schema that metaGetTbTSchemaEx() returns for `suid`.
 *
 * Under lruMutex: commit the cache, drop each (table, column) pair with
 * tsdbCacheDropTableColumn(), then call rocksMayWrite(pTsdb, false).
 *
 * Returns the schema lookup error, or otherwise the code of the LAST
 * tsdbCacheDropTableColumn() call only; other failures are traced and then
 * overwritten.
 */
int32_t tsdbCacheDropSubTables(STsdb *pTsdb, SArray *uids, tb_uid_t suid) {
  int32_t   code = 0;
  STSchema *pTSchema = NULL;

  (void)taosThreadMutexLock(&pTsdb->lruMutex);

  code = tsdbCacheCommitNoLock(pTsdb);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbTrace("vgId:%d, %s commit with no lock failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
              tstrerror(code));
  }

  code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, suid, -1, &pTSchema);
  if (code != TSDB_CODE_SUCCESS) {
    (void)taosThreadMutexUnlock(&pTsdb->lruMutex);

    TAOS_RETURN(code);
  }

  for (int iTb = 0; iTb < TARRAY_SIZE(uids); ++iTb) {
    int64_t uid = ((tb_uid_t *)TARRAY_DATA(uids))[iTb];
    int     nCols = pTSchema->numOfCols;

    // The primary-key marker is read from the second column, if there is one.
    bool hasPrimayKey = false;
    if (nCols >= 2) {
      hasPrimayKey = (pTSchema->columns[1].flags & COL_IS_KEY) != 0;
    }

    for (int iCol = 0; iCol < nCols; ++iCol) {
      int16_t cid = pTSchema->columns[iCol].colId;

      code = tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey);
      if (code != TSDB_CODE_SUCCESS) {
        tsdbTrace("vgId:%d, %s drop table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
                  tstrerror(code));
      }
    }
  }

  taosMemoryFree(pTSchema);

  rocksMayWrite(pTsdb, false);

  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);

  TAOS_RETURN(code);
}
1256

UNCOV
1257
/*
 * Add placeholder cache entries for one new column of table `uid`: one with
 * lflag 0 and one with lflag 1, in that order, under lruMutex.
 *
 * Returns the code of the second tsdbCacheNewTableColumn() call; a failure of
 * the first call is traced and then overwritten.
 */
int32_t tsdbCacheNewNTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, int8_t col_type) {
  int32_t code = 0;

  (void)taosThreadMutexLock(&pTsdb->lruMutex);

  for (int8_t lflag = 0; lflag <= 1; ++lflag) {
    code = tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, lflag);
    if (code != TSDB_CODE_SUCCESS) {
      tsdbTrace("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
                tstrerror(code));
    }
  }

  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);

  TAOS_RETURN(code);
}
1277

UNCOV
1278
/*
 * Drop the cached entries of one column of table `uid`.
 *
 * Under lruMutex: commit the cache, drop the column with
 * tsdbCacheDropTableColumn(), then call rocksMayWrite(pTsdb, false).
 *
 * Returns the code of tsdbCacheDropTableColumn(); a commit failure is traced
 * and then overwritten.
 */
int32_t tsdbCacheDropNTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, bool hasPrimayKey) {
  int32_t code;

  (void)taosThreadMutexLock(&pTsdb->lruMutex);

  code = tsdbCacheCommitNoLock(pTsdb);
  if (TSDB_CODE_SUCCESS != code) {
    tsdbTrace("vgId:%d, %s commit with no lock failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
              tstrerror(code));
  }

  code = tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey);
  if (TSDB_CODE_SUCCESS != code) {
    tsdbTrace("vgId:%d, %s drop table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
              tstrerror(code));
  }

  rocksMayWrite(pTsdb, false);

  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);

  TAOS_RETURN(code);
}
1301

1302
/*
 * Add placeholder cache entries for one new column to every table listed in
 * `uids`: for each table, one entry with lflag 0 and then one with lflag 1.
 * Runs under lruMutex.
 *
 * Returns the code of the LAST tsdbCacheNewTableColumn() call only (0 when
 * `uids` is empty); earlier failures are traced and then overwritten.
 */
int32_t tsdbCacheNewSTableColumn(STsdb *pTsdb, SArray *uids, int16_t cid, int8_t col_type) {
  int32_t code = 0;

  (void)taosThreadMutexLock(&pTsdb->lruMutex);

  for (int iTb = 0; iTb < TARRAY_SIZE(uids); ++iTb) {
    tb_uid_t uid = ((tb_uid_t *)TARRAY_DATA(uids))[iTb];

    for (int8_t lflag = 0; lflag <= 1; ++lflag) {
      code = tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, lflag);
      if (code != TSDB_CODE_SUCCESS) {
        tsdbTrace("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
                  tstrerror(code));
      }
    }
  }

  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
  TAOS_RETURN(code);
}
1326

1327
/*
 * Drop the cached entries of column `cid` for every table listed in `uids`.
 *
 * Under lruMutex: commit the cache, drop the column for each table with
 * tsdbCacheDropTableColumn(), then call rocksMayWrite(pTsdb, false).
 *
 * Returns the code of the LAST tsdbCacheDropTableColumn() call, or the commit
 * code when `uids` is empty; other failures are traced and then overwritten.
 */
int32_t tsdbCacheDropSTableColumn(STsdb *pTsdb, SArray *uids, int16_t cid, bool hasPrimayKey) {
  int32_t code;

  (void)taosThreadMutexLock(&pTsdb->lruMutex);

  code = tsdbCacheCommitNoLock(pTsdb);
  if (TSDB_CODE_SUCCESS != code) {
    tsdbTrace("vgId:%d, %s commit with no lock failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
              tstrerror(code));
  }

  for (int iTb = 0; iTb < TARRAY_SIZE(uids); ++iTb) {
    int64_t uid = ((tb_uid_t *)TARRAY_DATA(uids))[iTb];

    code = tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey);
    if (TSDB_CODE_SUCCESS != code) {
      tsdbTrace("vgId:%d, %s drop table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
                tstrerror(code));
    }
  }

  rocksMayWrite(pTsdb, false);

  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);

  TAOS_RETURN(code);
}
1354

1355
typedef struct {
1356
  int      idx;
1357
  SLastKey key;
1358
} SIdxKey;
1359

1360
/*
 * Reset a cached entry to "no value".
 *
 * The row key gets timestamp TSKEY_MIN and zero primary keys, the column
 * value becomes NONE (same column id and type), the entry is marked dirty and
 * its cache status is set to `cacheStatus`. Non-empty variable-length
 * payloads are freed; every other value is passed to valueClearDatum().
 */
static void tsdbCacheUpdateLastColToNone(SLastCol *pLastCol, ELastCacheStatus cacheStatus) {
  SRowKey *pKey = &pLastCol->rowKey;

  // row key
  pKey->ts = TSKEY_MIN;
  for (int8_t iPk = 0; iPk < pKey->numOfPKs; iPk++) {
    SValue *pPk = &pKey->pks[iPk];
    if (IS_VAR_DATA_TYPE(pPk->type) && pPk->nData > 0) {
      taosMemoryFreeClear(pPk->pData);
      pPk->nData = 0;
      continue;
    }
    valueClearDatum(pPk, pPk->type);
  }
  pKey->numOfPKs = 0;

  // column value
  SValue *pVal = &pLastCol->colVal.value;
  if (IS_VAR_DATA_TYPE(pVal->type) && pVal->nData > 0) {
    taosMemoryFreeClear(pVal->pData);
    pVal->nData = 0;
  } else {
    valueClearDatum(pVal, pVal->type);
  }

  pLastCol->colVal = COL_VAL_NONE(pLastCol->colVal.cid, pLastCol->colVal.value.type);
  pLastCol->dirty = 1;
  pLastCol->cacheStatus = cacheStatus;
}
×
1386

1387
/*
 * Serialize `pLastCol` and queue it in the rocksdb write batch under
 * `pLastKey`. The batch is modified while holding rCache.writeBatchMutex.
 *
 * Returns 0, or the serialization error. Without USE_ROCKSDB this is a no-op
 * that returns 0.
 */
static int32_t tsdbCachePutToRocksdb(STsdb *pTsdb, SLastKey *pLastKey, SLastCol *pLastCol) {
  int32_t code = 0;
#ifdef USE_ROCKSDB
  char  *pBuf = NULL;
  size_t nBuf = 0;

  code = tsdbCacheSerialize(pLastCol, &pBuf, &nBuf);
  if (code != 0) {
    tsdbError("tsdb/cache/putrocks: vgId:%d, serialize failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code));
    TAOS_RETURN(code);
  }

  rocksdb_writebatch_t *pBatch = pTsdb->rCache.writebatch;

  (void)taosThreadMutexLock(&pTsdb->rCache.writeBatchMutex);
  rocksdb_writebatch_put(pBatch, (char *)pLastKey, ROCKS_KEY_LEN, pBuf, nBuf);
  (void)taosThreadMutexUnlock(&pTsdb->rCache.writeBatchMutex);

  taosMemoryFree(pBuf);
#endif
  TAOS_RETURN(code);
}
1408

1409
/*
 * Insert a private copy of `pLastCol` into the LRU cache under `pLastKey`.
 *
 * The copy is heap-allocated, gets its `dirty` flag from the argument, and
 * has its payloads deep-copied by tsdbCacheReallocSLastCol(). It is inserted
 * with tsdbCacheDeleter / tsdbCacheOverWriter as callbacks and low priority.
 *
 * Returns 0, terrno on allocation failure, the deep-copy error, or
 * TSDB_CODE_FAILED when the insert status is neither OK nor OK_OVERWRITTEN.
 */
static int32_t tsdbCachePutToLRU(STsdb *pTsdb, SLastKey *pLastKey, SLastCol *pLastCol, int8_t dirty) {
  int32_t code = 0, lino = 0;
  size_t  charge = 0;

  SLastCol *pEntry = taosMemoryCalloc(1, sizeof(SLastCol));
  if (pEntry == NULL) {
    return terrno;
  }

  *pEntry = *pLastCol;
  pEntry->dirty = dirty;
  TAOS_CHECK_EXIT(tsdbCacheReallocSLastCol(pEntry, &charge));

  LRUStatus status = taosLRUCacheInsert(pTsdb->lruCache, pLastKey, ROCKS_KEY_LEN, pEntry, charge, tsdbCacheDeleter,
                                        tsdbCacheOverWriter, NULL, TAOS_LRU_PRIORITY_LOW, pTsdb);
  if (status != TAOS_LRU_STATUS_OK && status != TAOS_LRU_STATUS_OK_OVERWRITTEN) {
    tsdbError("vgId:%d, %s failed at line %d status %d.", TD_VID(pTsdb->pVnode), __func__, __LINE__, status);
    code = TSDB_CODE_FAILED;
    // Forget the entry so the cleanup below does not free it.
    // NOTE(review): presumably the cache already disposed of it through the
    // deleter on a failed insert - confirm against taosLRUCacheInsert.
    pEntry = NULL;
  }

_exit:
  if (code != TSDB_CODE_SUCCESS) {
    taosMemoryFree(pEntry);
    tsdbError("tsdb/cache/putlru: vgId:%d, failed at line %d since %s.", TD_VID(pTsdb->pVnode), lino, tstrerror(code));
  }

  TAOS_RETURN(code);
}
1438

1439
/*
 * Apply a batch of last/last_row updates for table `uid` to the cache.
 *
 * suid        - unused here.
 * updCtxArray - array of SLastUpdateCtx; NULL or empty is a successful no-op.
 *
 * The whole operation runs under lruMutex, in two phases:
 *   1. For every update whose key is already in the LRU cache, replace the
 *      cached entry when the new row key is greater, or equal with a column
 *      value that is not NONE. Entries whose status is
 *      TSDB_LAST_CACHE_NO_CACHE are left alone. Keys that miss the LRU cache
 *      are collected in `remainCols`.
 *   2. The collected keys are fetched from rocksdb in one multi-get and, under
 *      the same comparison rule, written to both rocksdb and the LRU cache.
 *
 * "last" updates whose column value is not a real value are skipped in both
 * phases.
 *
 * Returns 0 on success or the first error encountered. All temporary buffers
 * are released at `_exit` on every path.
 */
static int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SArray *updCtxArray) {
  if (!updCtxArray || TARRAY_SIZE(updCtxArray) == 0) {
    TAOS_RETURN(TSDB_CODE_SUCCESS);
  }

  int32_t code = 0, lino = 0;

  int        num_keys = TARRAY_SIZE(updCtxArray);
  SArray    *remainCols = NULL;
  SLRUCache *pCache = pTsdb->lruCache;

  // Phase-2 buffers. Declared here so that `_exit` can release them no matter
  // where a failure occurs.
  char  **keys_list = NULL;
  size_t *keys_list_sizes = NULL;
  char  **values_list = NULL;
  size_t *values_list_sizes = NULL;

  (void)taosThreadMutexLock(&pTsdb->lruMutex);
  for (int i = 0; i < num_keys; ++i) {
    SLastUpdateCtx *updCtx = &((SLastUpdateCtx *)TARRAY_DATA(updCtxArray))[i];
    int8_t          lflag = updCtx->lflag;
    SRowKey        *pRowKey = &updCtx->tsdbRowKey.key;
    SColVal        *pColVal = &updCtx->colVal;

    if (lflag == LFLAG_LAST && !COL_VAL_IS_VALUE(pColVal)) {
      continue;
    }

    SLastKey  *key = &(SLastKey){.lflag = lflag, .uid = uid, .cid = pColVal->cid};
    LRUHandle *h = taosLRUCacheLookup(pCache, key, ROCKS_KEY_LEN);
    if (h) {
      SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h);
      if (pLastCol->cacheStatus != TSDB_LAST_CACHE_NO_CACHE) {
        int32_t cmp_res = tRowKeyCompare(&pLastCol->rowKey, pRowKey);
        if (cmp_res < 0 || (cmp_res == 0 && !COL_VAL_IS_NONE(pColVal))) {
          SLastCol newLastCol = {
              .rowKey = *pRowKey, .colVal = *pColVal, .dirty = 1, .cacheStatus = TSDB_LAST_CACHE_VALID};
          code = tsdbCachePutToLRU(pTsdb, key, &newLastCol, 1);
        }
      }

      // Release the handle before acting on a put failure.
      tsdbLRUCacheRelease(pCache, h, false);
      TAOS_CHECK_EXIT(code);
    } else {
      if (!remainCols) {
        remainCols = taosArrayInit(num_keys * 2, sizeof(SIdxKey));
        if (!remainCols) {
          TAOS_CHECK_GOTO(terrno, &lino, _exit);
        }
      }
      if (!taosArrayPush(remainCols, &(SIdxKey){i, *key})) {
        TAOS_CHECK_GOTO(terrno, &lino, _exit);
      }
    }
  }

  if (remainCols) {
    num_keys = TARRAY_SIZE(remainCols);
  }
  if (remainCols && num_keys > 0) {
    keys_list = taosMemoryCalloc(num_keys, sizeof(char *));
    if (!keys_list) {
      code = terrno;
      lino = __LINE__;
      goto _exit;
    }
    keys_list_sizes = taosMemoryCalloc(num_keys, sizeof(size_t));
    if (!keys_list_sizes) {
      code = terrno;
      lino = __LINE__;
      goto _exit;
    }
    // The key pointers refer into remainCols, which outlives this block.
    for (int i = 0; i < num_keys; ++i) {
      SIdxKey *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[i];

      keys_list[i] = (char *)&idxKey->key;
      keys_list_sizes[i] = ROCKS_KEY_LEN;
    }

    rocksMayWrite(pTsdb, true);  // flush writebatch cache

    code = tsdbCacheGetValuesFromRocks(pTsdb, num_keys, (const char *const *)keys_list, keys_list_sizes, &values_list,
                                       &values_list_sizes);
    if (code) {
      goto _exit;
    }

    for (int i = 0; i < num_keys; ++i) {
      SIdxKey        *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[i];
      SLastUpdateCtx *updCtx = (SLastUpdateCtx *)taosArrayGet(updCtxArray, idxKey->idx);
      SRowKey        *pRowKey = &updCtx->tsdbRowKey.key;
      SColVal        *pColVal = &updCtx->colVal;

      SLastCol *pLastCol = NULL;
      if (values_list[i] != NULL) {
        code = tsdbCacheDeserialize(values_list[i], values_list_sizes[i], &pLastCol);
        if (code != TSDB_CODE_SUCCESS) {
          tsdbError("vgId:%d, %s deserialize failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
                    tstrerror(code));
          goto _exit;
        }
      }
      SLastCol *pToFree = pLastCol;

      if (pLastCol && pLastCol->cacheStatus == TSDB_LAST_CACHE_NO_CACHE) {
        if ((code = tsdbCachePutToLRU(pTsdb, &idxKey->key, pLastCol, 0)) != TSDB_CODE_SUCCESS) {
          tsdbError("tsdb/cache: vgId:%d, put lru failed at line %d since %s.", TD_VID(pTsdb->pVnode), lino,
                    tstrerror(code));
          taosMemoryFreeClear(pToFree);
          break;
        }

        // cache invalid => skip update
        taosMemoryFreeClear(pToFree);
        continue;
      }

      if (IS_LAST_KEY(idxKey->key) && !COL_VAL_IS_VALUE(pColVal)) {
        taosMemoryFreeClear(pToFree);
        continue;
      }

      int32_t cmp_res = 1;
      if (pLastCol) {
        cmp_res = tRowKeyCompare(&pLastCol->rowKey, pRowKey);
      }

      if (NULL == pLastCol || cmp_res < 0 || (cmp_res == 0 && !COL_VAL_IS_NONE(pColVal))) {
        SLastCol lastColTmp = {
            .rowKey = *pRowKey, .colVal = *pColVal, .dirty = 0, .cacheStatus = TSDB_LAST_CACHE_VALID};
        if ((code = tsdbCachePutToRocksdb(pTsdb, &idxKey->key, &lastColTmp)) != TSDB_CODE_SUCCESS) {
          tsdbError("tsdb/cache: vgId:%d, put rocks failed at line %d since %s.", TD_VID(pTsdb->pVnode), lino,
                    tstrerror(code));
          taosMemoryFreeClear(pToFree);
          break;
        }
        if ((code = tsdbCachePutToLRU(pTsdb, &idxKey->key, &lastColTmp, 0)) != TSDB_CODE_SUCCESS) {
          tsdbError("tsdb/cache: vgId:%d, put lru failed at line %d since %s.", TD_VID(pTsdb->pVnode), lino,
                    tstrerror(code));
          taosMemoryFreeClear(pToFree);
          break;
        }
      }

      taosMemoryFreeClear(pToFree);
    }

    rocksMayWrite(pTsdb, false);
  }

_exit:
  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);

  // values_list is only non-NULL after num_keys was set to the number of
  // collected keys, so num_keys is the right bound here.
  if (values_list) {
#ifdef USE_ROCKSDB
    for (int i = 0; i < num_keys; ++i) {
      rocksdb_free(values_list[i]);
    }
#endif
    taosMemoryFree(values_list);
  }
  taosMemoryFree(values_list_sizes);
  taosMemoryFree(keys_list);
  taosMemoryFree(keys_list_sizes);
  taosArrayDestroy(remainCols);

  if (code) {
    tsdbError("tsdb/cache: vgId:%d, update failed at line %d since %s.", TD_VID(pTsdb->pVnode), __LINE__,
              tstrerror(code));
  }

  TAOS_RETURN(code);
}
1618

1619
int32_t tsdbCacheRowFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int64_t version, int32_t nRow,
×
1620
                                 SRow **aRow) {
1621
  int32_t code = 0, lino = 0;
×
1622

1623
  // 1. prepare last
UNCOV
1624
  TSDBROW      lRow = {.type = TSDBROW_ROW_FMT, .pTSRow = aRow[nRow - 1], .version = version};
×
1625
  STSchema    *pTSchema = NULL;
×
1626
  int32_t      sver = TSDBROW_SVERSION(&lRow);
×
1627
  SSHashObj   *iColHash = NULL;
×
1628
  STSDBRowIter iter = {0};
×
1629

UNCOV
1630
  TAOS_CHECK_GOTO(tsdbUpdateSkm(pTsdb, suid, uid, sver), &lino, _exit);
×
1631
  pTSchema = pTsdb->rCache.pTSchema;
×
1632

1633
  TSDBROW tRow = {.type = TSDBROW_ROW_FMT, .version = version};
×
1634
  int32_t nCol = pTSchema->numOfCols;
×
UNCOV
1635
  SArray *ctxArray = pTsdb->rCache.ctxArray;
×
1636

1637
  // 1. prepare by lrow
1638
  STsdbRowKey tsdbRowKey = {0};
×
1639
  tsdbRowGetKey(&lRow, &tsdbRowKey);
×
1640

UNCOV
1641
  TAOS_CHECK_GOTO(tsdbRowIterOpen(&iter, &lRow, pTSchema), &lino, _exit);
×
1642

UNCOV
1643
  int32_t iCol = 0;
×
UNCOV
1644
  for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal && iCol < nCol; pColVal = tsdbRowIterNext(&iter), iCol++) {
×
1645
    SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST_ROW, .tsdbRowKey = tsdbRowKey, .colVal = *pColVal};
×
1646
    if (!taosArrayPush(ctxArray, &updateCtx)) {
×
1647
      TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
1648
    }
1649

1650
    if (COL_VAL_IS_VALUE(pColVal)) {
×
UNCOV
1651
      updateCtx.lflag = LFLAG_LAST;
×
1652
      if (!taosArrayPush(ctxArray, &updateCtx)) {
×
1653
        TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
1654
      }
1655
    } else {
1656
      if (!iColHash) {
×
1657
        iColHash = tSimpleHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
×
1658
        if (iColHash == NULL) {
×
1659
          TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit);
×
1660
        }
1661
      }
1662

1663
      if (tSimpleHashPut(iColHash, &iCol, sizeof(iCol), NULL, 0)) {
×
1664
        TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit);
×
1665
      }
1666
    }
1667
  }
1668

1669
  // 2. prepare by the other rows
UNCOV
1670
  for (int32_t iRow = nRow - 2; iRow >= 0; --iRow) {
×
UNCOV
1671
    if (tSimpleHashGetSize(iColHash) == 0) {
×
UNCOV
1672
      break;
×
1673
    }
1674

UNCOV
1675
    tRow.pTSRow = aRow[iRow];
×
1676

UNCOV
1677
    STsdbRowKey tsdbRowKey = {0};
×
1678
    tsdbRowGetKey(&tRow, &tsdbRowKey);
×
1679

1680
    void   *pIte = NULL;
×
UNCOV
1681
    int32_t iter = 0;
×
UNCOV
1682
    while ((pIte = tSimpleHashIterate(iColHash, pIte, &iter)) != NULL) {
×
1683
      int32_t iCol = ((int32_t *)pIte)[0];
×
1684
      SColVal colVal = COL_VAL_NONE(0, 0);
×
1685
      tsdbRowGetColVal(&tRow, pTSchema, iCol, &colVal);
×
1686

1687
      if (COL_VAL_IS_VALUE(&colVal)) {
×
UNCOV
1688
        SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST, .tsdbRowKey = tsdbRowKey, .colVal = colVal};
×
UNCOV
1689
        if (!taosArrayPush(ctxArray, &updateCtx)) {
×
1690
          TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
1691
        }
1692
        code = tSimpleHashIterateRemove(iColHash, &iCol, sizeof(iCol), &pIte, &iter);
×
1693
        if (code != TSDB_CODE_SUCCESS) {
×
1694
          tsdbTrace("vgId:%d, %s tSimpleHashIterateRemove failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__,
×
1695
                    __LINE__, tstrerror(code));
1696
        }
1697
      }
1698
    }
1699
  }
1700

1701
  TAOS_CHECK_GOTO(tsdbCacheUpdate(pTsdb, suid, uid, ctxArray), &lino, _exit);
×
1702

1703
_exit:
×
UNCOV
1704
  if (code) {
×
UNCOV
1705
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
×
1706
  }
1707

1708
  tsdbRowClose(&iter);
×
UNCOV
1709
  tSimpleHashCleanup(iColHash);
×
UNCOV
1710
  taosArrayClear(ctxArray);
×
1711

UNCOV
1712
  TAOS_RETURN(code);
×
1713
}
1714

UNCOV
1715
/**
 * Update the last / last_row cache of one table from a column-format data block.
 *
 * Builds a list of SLastUpdateCtx entries and passes it to tsdbCacheUpdate():
 *   - an LFLAG_LAST entry for the timestamp column taken from the last row of the block,
 *   - for each column flagged HAS_VALUE, an LFLAG_LAST entry from the highest row index whose bit value is 2,
 *   - an LFLAG_LAST_ROW entry for every column of the last row of the block.
 *
 * @param pTsdb      tsdb instance
 * @param suid       super table uid
 * @param uid        table uid
 * @param pBlockData column-format block; row (nRow - 1) is read unconditionally
 * @return 0 on success, otherwise the error code of the failing step
 */
int32_t tsdbCacheColFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SBlockData *pBlockData) {
  int32_t      code = 0, lino = 0;
  STSDBRowIter rowIter = {0};
  STSchema    *pSchema = NULL;
  SArray      *pCtxList = NULL;

  TSDBROW tailRow = tsdbRowFromBlockData(pBlockData, pBlockData->nRow - 1);
  int32_t schemaVer = TSDBROW_SVERSION(&tailRow);

  // nothing has been allocated yet, so a plain return is enough on failure
  TAOS_CHECK_RETURN(metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, schemaVer, &pSchema));

  pCtxList = taosArrayInit(pBlockData->nColData, sizeof(SLastUpdateCtx));
  if (pCtxList == NULL) {
    TAOS_CHECK_GOTO(terrno, &lino, _exit);
  }

  // 1. "last" entries
  STsdbRowKey tailKey = {0};
  tsdbRowGetKey(&tailRow, &tailKey);

  {
    // the timestamp column always has a value in the last row
    SValue tsVal = {.type = TSDB_DATA_TYPE_TIMESTAMP};
    VALUE_SET_TRIVIAL_DATUM(&tsVal, tailRow.pBlockData->aTSKEY[tailRow.iRow]);
    SLastUpdateCtx tsCtx = {
        .lflag = LFLAG_LAST,
        .tsdbRowKey = tailKey,
        .colVal = COL_VAL_VALUE(PRIMARYKEY_TIMESTAMP_COL_ID, tsVal)};
    if (!taosArrayPush(pCtxList, &tsCtx)) {
      TAOS_CHECK_GOTO(terrno, &lino, _exit);
    }
  }

  TSDBROW scanRow = tsdbRowFromBlockData(pBlockData, 0);

  for (int32_t iColData = 0; iColData < pBlockData->nColData; ++iColData) {
    SColData *pColData = &pBlockData->aColData[iColData];
    if ((pColData->flag & HAS_VALUE) == HAS_VALUE) {
      // walk the rows from the end of the block towards the start and stop at the first hit
      scanRow.iRow = pBlockData->nRow - 1;
      while (scanRow.iRow >= 0) {
        STsdbRowKey scanKey = {0};
        tsdbRowGetKey(&scanRow, &scanKey);

        uint8_t bitValue = tColDataGetBitValue(pColData, scanRow.iRow);
        if (bitValue != 2) {
          --scanRow.iRow;
          continue;
        }

        SColVal found = COL_VAL_NONE(pColData->cid, pColData->type);
        TAOS_CHECK_GOTO(tColDataGetValue(pColData, scanRow.iRow, &found), &lino, _exit);

        SLastUpdateCtx lastCtx = {.lflag = LFLAG_LAST, .tsdbRowKey = scanKey, .colVal = found};
        if (!taosArrayPush(pCtxList, &lastCtx)) {
          TAOS_CHECK_GOTO(terrno, &lino, _exit);
        }
        break;
      }
    }
  }

  // 2. "last row" entries: one per column of the last row of the block
  TAOS_CHECK_GOTO(tsdbRowIterOpen(&rowIter, &tailRow, pSchema), &lino, _exit);
  SColVal *pColVal = tsdbRowIterNext(&rowIter);
  while (pColVal) {
    SLastUpdateCtx rowCtx = {.lflag = LFLAG_LAST_ROW, .tsdbRowKey = tailKey, .colVal = *pColVal};
    if (!taosArrayPush(pCtxList, &rowCtx)) {
      TAOS_CHECK_GOTO(terrno, &lino, _exit);
    }
    pColVal = tsdbRowIterNext(&rowIter);
  }

  TAOS_CHECK_GOTO(tsdbCacheUpdate(pTsdb, suid, uid, pCtxList), &lino, _exit);

_exit:
  tsdbRowClose(&rowIter);
  taosMemoryFreeClear(pSchema);
  taosArrayDestroy(pCtxList);

  TAOS_RETURN(code);
}
1791

1792
// Forward declarations of the two raw-data loaders called by tsdbCacheLoadFromRaw(): mergeLastCid() is called for
// the keys matching IS_LAST_KEY(), mergeLastRowCid() for the remaining keys. Both hand their result back through
// ppLastArray; the caller destroys that array.
static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols,
1793
                            int nCols, int16_t *slotIds);
1794

1795
static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols,
1796
                               int nCols, int16_t *slotIds);
1797

1798
static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SArray *remainCols,
117✔
1799
                                    SCacheRowsReader *pr, int8_t ltype) {
1800
  int32_t               code = 0, lino = 0;
117✔
1801
  // rocksdb_writebatch_t *wb = NULL;
1802
  SArray               *pTmpColArray = NULL;
117✔
1803
  bool                  extraTS = false;
117✔
1804

1805
  SIdxKey *idxKey = taosArrayGet(remainCols, 0);
117✔
1806
  if (idxKey->key.cid != PRIMARYKEY_TIMESTAMP_COL_ID) {
118✔
1807
    // ignore 'ts' loaded from cache and load it from tsdb
1808
    // SLastCol *pLastCol = taosArrayGet(pLastArray, 0);
1809
    // tsdbCacheUpdateLastColToNone(pLastCol, TSDB_LAST_CACHE_NO_CACHE);
1810

1811
    SLastKey *key = &(SLastKey){.lflag = ltype, .uid = uid, .cid = PRIMARYKEY_TIMESTAMP_COL_ID};
80✔
1812
    if (!taosArrayInsert(remainCols, 0, &(SIdxKey){0, *key})) {
80!
UNCOV
1813
      TAOS_RETURN(terrno);
×
1814
    }
1815

1816
    extraTS = true;
79✔
1817
  }
1818

1819
  int      num_keys = TARRAY_SIZE(remainCols);
117✔
1820
  int16_t *slotIds = taosMemoryMalloc(num_keys * sizeof(int16_t));
117!
1821

1822
  int16_t *lastColIds = NULL, *lastSlotIds = NULL, *lastrowColIds = NULL, *lastrowSlotIds = NULL;
119✔
1823
  lastColIds = taosMemoryMalloc(num_keys * sizeof(int16_t));
119!
1824
  lastSlotIds = taosMemoryMalloc(num_keys * sizeof(int16_t));
118!
1825
  lastrowColIds = taosMemoryMalloc(num_keys * sizeof(int16_t));
119!
1826
  lastrowSlotIds = taosMemoryMalloc(num_keys * sizeof(int16_t));
119!
1827
  SArray *lastTmpColArray = NULL, *lastTmpIndexArray = NULL, *lastrowTmpColArray = NULL, *lastrowTmpIndexArray = NULL;
119✔
1828

1829
  int lastIndex = 0;
119✔
1830
  int lastrowIndex = 0;
119✔
1831

1832
  if (!slotIds || !lastColIds || !lastSlotIds || !lastrowColIds || !lastrowSlotIds) {
119!
UNCOV
1833
    TAOS_CHECK_EXIT(terrno);
×
1834
  }
1835

1836
  for (int i = 0; i < num_keys; ++i) {
466✔
1837
    SIdxKey *idxKey = taosArrayGet(remainCols, i);
346✔
1838
    if (extraTS && !i) {
347✔
1839
      slotIds[i] = 0;
80✔
1840
    } else {
1841
      slotIds[i] = pr->pSlotIds[idxKey->idx];
267✔
1842
    }
1843

1844
    if (IS_LAST_KEY(idxKey->key)) {
347✔
1845
      if (NULL == lastTmpIndexArray) {
191✔
1846
        lastTmpIndexArray = taosArrayInit(num_keys, sizeof(int32_t));
72✔
1847
        if (!lastTmpIndexArray) {
72!
UNCOV
1848
          TAOS_CHECK_EXIT(terrno);
×
1849
        }
1850
      }
1851
      if (!taosArrayPush(lastTmpIndexArray, &(i))) {
191!
UNCOV
1852
        TAOS_CHECK_EXIT(terrno);
×
1853
      }
1854
      lastColIds[lastIndex] = idxKey->key.cid;
191✔
1855
      if (extraTS && !i) {
191✔
1856
        lastSlotIds[lastIndex] = 0;
66✔
1857
      } else {
1858
        lastSlotIds[lastIndex] = pr->pSlotIds[idxKey->idx];
125✔
1859
      }
1860
      lastIndex++;
191✔
1861
    } else {
1862
      if (NULL == lastrowTmpIndexArray) {
156✔
1863
        lastrowTmpIndexArray = taosArrayInit(num_keys, sizeof(int32_t));
47✔
1864
        if (!lastrowTmpIndexArray) {
47!
1865
          TAOS_CHECK_EXIT(terrno);
×
1866
        }
1867
      }
1868
      if (!taosArrayPush(lastrowTmpIndexArray, &(i))) {
156!
UNCOV
1869
        TAOS_CHECK_EXIT(terrno);
×
1870
      }
1871
      lastrowColIds[lastrowIndex] = idxKey->key.cid;
156✔
1872
      if (extraTS && !i) {
156✔
1873
        lastrowSlotIds[lastrowIndex] = 0;
14✔
1874
      } else {
1875
        lastrowSlotIds[lastrowIndex] = pr->pSlotIds[idxKey->idx];
142✔
1876
      }
1877
      lastrowIndex++;
156✔
1878
    }
1879
  }
1880

1881
  pTmpColArray = taosArrayInit(lastIndex + lastrowIndex, sizeof(SLastCol));
120✔
1882
  if (!pTmpColArray) {
119!
UNCOV
1883
    TAOS_CHECK_EXIT(terrno);
×
1884
  }
1885

1886
  if (lastTmpIndexArray != NULL) {
119✔
1887
    TAOS_CHECK_EXIT(mergeLastCid(uid, pTsdb, &lastTmpColArray, pr, lastColIds, lastIndex, lastSlotIds));
72!
1888
    for (int i = 0; i < taosArrayGetSize(lastTmpColArray); i++) {
234✔
1889
      if (!taosArrayInsert(pTmpColArray, *(int32_t *)taosArrayGet(lastTmpIndexArray, i),
162!
1890
                           taosArrayGet(lastTmpColArray, i))) {
162✔
UNCOV
1891
        TAOS_CHECK_EXIT(terrno);
×
1892
      }
1893
    }
1894
  }
1895

1896
  if (lastrowTmpIndexArray != NULL) {
119✔
1897
    TAOS_CHECK_EXIT(mergeLastRowCid(uid, pTsdb, &lastrowTmpColArray, pr, lastrowColIds, lastrowIndex, lastrowSlotIds));
47!
1898
    for (int i = 0; i < taosArrayGetSize(lastrowTmpColArray); i++) {
154✔
1899
      if (!taosArrayInsert(pTmpColArray, *(int32_t *)taosArrayGet(lastrowTmpIndexArray, i),
107!
1900
                           taosArrayGet(lastrowTmpColArray, i))) {
107✔
UNCOV
1901
        TAOS_CHECK_EXIT(terrno);
×
1902
      }
1903
    }
1904
  }
1905

1906
  SLRUCache *pCache = pTsdb->lruCache;
119✔
1907
  for (int i = 0; i < num_keys; ++i) {
467✔
1908
    SIdxKey  *idxKey = taosArrayGet(remainCols, i);
348✔
1909
    SLastCol *pLastCol = NULL;
348✔
1910

1911
    if (pTmpColArray && TARRAY_SIZE(pTmpColArray) >= i + 1) {
348!
1912
      pLastCol = taosArrayGet(pTmpColArray, i);
269✔
1913
    }
1914

1915
    // still null, then make up a none col value
1916
    SLastCol noneCol = {.rowKey.ts = TSKEY_MIN,
348✔
1917
                        .colVal = COL_VAL_NONE(idxKey->key.cid, pr->pSchema->columns[slotIds[i]].type),
348✔
1918
                        .cacheStatus = TSDB_LAST_CACHE_VALID};
1919
    if (!pLastCol) {
348✔
1920
      pLastCol = &noneCol;
79✔
1921
    }
1922

1923
    if (!extraTS || i > 0) {
348✔
1924
      taosArraySet(pLastArray, idxKey->idx, pLastCol);
268✔
1925
    }
1926
    // taosArrayRemove(remainCols, i);
1927

1928
    if (/*!pTmpColArray*/ lastTmpIndexArray && !lastTmpColArray) {
347!
UNCOV
1929
      continue;
×
1930
    }
1931
    if (/*!pTmpColArray*/ lastrowTmpIndexArray && !lastrowTmpColArray) {
347!
UNCOV
1932
      continue;
×
1933
    }
1934

1935
    // store result back to rocks cache
1936
    code = tsdbCachePutToRocksdb(pTsdb, &idxKey->key, pLastCol);
347✔
1937
    if (code) {
348!
UNCOV
1938
      tsdbError("vgId:%d, %s failed at line %d since %s.", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
×
UNCOV
1939
      TAOS_CHECK_EXIT(code);
×
1940
    }
1941

1942
    code = tsdbCachePutToLRU(pTsdb, &idxKey->key, pLastCol, 0);
348✔
1943
    if (code) {
348!
UNCOV
1944
      tsdbError("vgId:%d, %s failed at line %d since %s.", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
×
UNCOV
1945
      TAOS_CHECK_EXIT(code);
×
1946
    }
1947
  }
1948

1949
  rocksMayWrite(pTsdb, false);
119✔
1950

1951
_exit:
119✔
1952
  taosArrayDestroy(lastrowTmpIndexArray);
119✔
1953
  taosArrayDestroy(lastrowTmpColArray);
119✔
1954
  taosArrayDestroy(lastTmpIndexArray);
119✔
1955
  taosArrayDestroy(lastTmpColArray);
119✔
1956

1957
  taosMemoryFree(lastColIds);
118!
1958
  taosMemoryFree(lastSlotIds);
119!
1959
  taosMemoryFree(lastrowColIds);
119!
1960
  taosMemoryFree(lastrowSlotIds);
119!
1961

1962
  taosArrayDestroy(pTmpColArray);
119✔
1963

1964
  taosMemoryFree(slotIds);
119!
1965

1966
  TAOS_RETURN(code);
119✔
1967
}
1968

1969
/**
 * Try to satisfy the keys in remainCols from the rocksdb cache, then fall back to tsdbCacheLoadFromRaw() for
 * whatever is still missing.
 *
 * remainCols and ignoreFromRocks are parallel arrays. A key that is found in rocksdb with a status other than
 * TSDB_LAST_CACHE_NO_CACHE is put into the LRU cache, copied into pLastArray at idxKey->idx, and removed from
 * both arrays. Keys flagged in ignoreFromRocks are skipped and left for the raw loader.
 *
 * The only caller in this file (tsdbCacheGetBatchFromLru) holds pTsdb->lruMutex around this call.
 *
 * @param pTsdb           tsdb instance
 * @param uid             table uid
 * @param pLastArray      result array, updated in place
 * @param remainCols      SIdxKey array of keys not yet resolved; shrinks as keys are resolved
 * @param ignoreFromRocks bool array parallel to remainCols; true means "do not use the rocksdb value"
 * @param pr              cache rows reader, forwarded to the raw loader
 * @param ltype           lflag forwarded to the raw loader
 * @return 0 on success, otherwise the error code of the failing step
 */
static int32_t tsdbCacheLoadFromRocks(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SArray *remainCols,
                                      SArray *ignoreFromRocks, SCacheRowsReader *pr, int8_t ltype) {
  int32_t code = 0, lino = 0;
  int     num_keys = TARRAY_SIZE(remainCols);
  char  **keys_list = taosMemoryMalloc(num_keys * sizeof(char *));
  size_t *keys_list_sizes = taosMemoryMalloc(num_keys * sizeof(size_t));
  char   *key_list = taosMemoryMalloc(num_keys * ROCKS_KEY_LEN);
  if (!keys_list || !keys_list_sizes || !key_list) {
    // any subset of the three may have been allocated; release all of them
    taosMemoryFree(key_list);
    taosMemoryFree(keys_list);
    taosMemoryFree(keys_list_sizes);
    TAOS_RETURN(terrno);
  }
  char  **values_list = NULL;
  size_t *values_list_sizes = NULL;
  for (int i = 0; i < num_keys; ++i) {
    memcpy(key_list + i * ROCKS_KEY_LEN, &((SIdxKey *)taosArrayGet(remainCols, i))->key, ROCKS_KEY_LEN);
    keys_list[i] = key_list + i * ROCKS_KEY_LEN;
    keys_list_sizes[i] = ROCKS_KEY_LEN;
  }

  rocksMayWrite(pTsdb, true);  // flush writebatch cache

  code = tsdbCacheGetValuesFromRocks(pTsdb, num_keys, (const char *const *)keys_list, keys_list_sizes, &values_list,
                                     &values_list_sizes);
  if (code) {
    taosMemoryFree(key_list);
    taosMemoryFree(keys_list);
    taosMemoryFree(keys_list_sizes);
    TAOS_RETURN(code);
  }

  // i walks the fetched values (fixed order); j walks remainCols / ignoreFromRocks, which shrink whenever a key is
  // resolved. Entry j of both arrays always belongs to value i.
  for (int i = 0, j = 0; i < num_keys && j < TARRAY_SIZE(remainCols); ++i) {
    SLastCol *pLastCol = NULL;
    bool      ignore = ((bool *)TARRAY_DATA(ignoreFromRocks))[j];
    if (ignore) {
      ++j;
      continue;
    }

    if (values_list[i] != NULL) {
      code = tsdbCacheDeserialize(values_list[i], values_list_sizes[i], &pLastCol);
      if (code != TSDB_CODE_SUCCESS) {
        tsdbError("vgId:%d, %s deserialize failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
                  tstrerror(code));
        goto _exit;
      }
    }
    SLastCol *pToFree = pLastCol;
    SIdxKey  *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[j];
    if (pLastCol && pLastCol->cacheStatus != TSDB_LAST_CACHE_NO_CACHE) {
      code = tsdbCachePutToLRU(pTsdb, &idxKey->key, pLastCol, 0);
      if (code) {
        tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
        taosMemoryFreeClear(pToFree);
        TAOS_CHECK_EXIT(code);
      }

      // hand the caller a private copy made by tsdbCacheReallocSLastCol(); the deserialized one is freed below
      SLastCol lastCol = *pLastCol;
      code = tsdbCacheReallocSLastCol(&lastCol, NULL);
      if (TSDB_CODE_SUCCESS != code) {
        taosMemoryFreeClear(pToFree);
        TAOS_CHECK_EXIT(code);
      }

      taosArraySet(pLastArray, idxKey->idx, &lastCol);
      taosArrayRemove(remainCols, j);
      taosArrayRemove(ignoreFromRocks, j);
    } else {
      ++j;
    }

    taosMemoryFreeClear(pToFree);
  }

  if (TARRAY_SIZE(remainCols) > 0) {
    code = tsdbCacheLoadFromRaw(pTsdb, uid, pLastArray, remainCols, pr, ltype);
  }

_exit:
  taosMemoryFree(key_list);
  taosMemoryFree(keys_list);
  taosMemoryFree(keys_list_sizes);
  if (values_list) {
#ifdef USE_ROCKSDB
    for (int i = 0; i < num_keys; ++i) {
      rocksdb_free(values_list[i]);
    }
#endif
    taosMemoryFree(values_list);
  }
  taosMemoryFree(values_list_sizes);

  TAOS_RETURN(code);
}
2065

2066
static int32_t tsdbCacheGetBatchFromLru(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr,
9,509✔
2067
                                        int8_t ltype, SArray *keyArray) {
2068
  int32_t    code = 0, lino = 0;
9,509✔
2069
  SArray    *remainCols = NULL;
9,509✔
2070
  SArray    *ignoreFromRocks = NULL;
9,509✔
2071
  SLRUCache *pCache = pTsdb->lruCache;
9,509✔
2072
  SArray    *pCidList = pr->pCidList;
9,509✔
2073
  int        numKeys = TARRAY_SIZE(pCidList);
9,509✔
2074

2075
  for (int i = 0; i < numKeys; ++i) {
29,174✔
2076
    int16_t cid = ((int16_t *)TARRAY_DATA(pCidList))[i];
19,660✔
2077

2078
    SLastKey key = {.lflag = ltype, .uid = uid, .cid = cid};
19,660✔
2079
    // for select last_row, last case
2080
    int32_t funcType = FUNCTION_TYPE_CACHE_LAST;
19,660✔
2081
    if (pr->pFuncTypeList != NULL && taosArrayGetSize(pr->pFuncTypeList) > i) {
19,660✔
2082
      funcType = ((int32_t *)TARRAY_DATA(pr->pFuncTypeList))[i];
1,354✔
2083
    }
2084
    if (((pr->type & CACHESCAN_RETRIEVE_LAST) == CACHESCAN_RETRIEVE_LAST) && FUNCTION_TYPE_CACHE_LAST_ROW == funcType) {
19,656✔
2085
      int8_t tempType = CACHESCAN_RETRIEVE_LAST_ROW | (pr->type ^ CACHESCAN_RETRIEVE_LAST);
709✔
2086
      key.lflag = (tempType & CACHESCAN_RETRIEVE_LAST) >> 3;
709✔
2087
    }
2088

2089
    if (!taosArrayPush(keyArray, &key)) {
19,642!
UNCOV
2090
      TAOS_CHECK_EXIT(terrno);
×
2091
    }
2092

2093
    LRUHandle *h = taosLRUCacheLookup(pCache, &key, ROCKS_KEY_LEN);
19,642✔
2094
    SLastCol  *pLastCol = h ? (SLastCol *)taosLRUCacheValue(pCache, h) : NULL;
19,667✔
2095
    if (h && pLastCol->cacheStatus != TSDB_LAST_CACHE_NO_CACHE) {
38,971!
2096
      SLastCol lastCol = *pLastCol;
19,327✔
2097
      if (TSDB_CODE_SUCCESS != (code = tsdbCacheReallocSLastCol(&lastCol, NULL))) {
19,327!
UNCOV
2098
        tsdbLRUCacheRelease(pCache, h, false);
×
UNCOV
2099
        TAOS_CHECK_GOTO(code, NULL, _exit);
×
2100
      }
2101

2102
      if (taosArrayPush(pLastArray, &lastCol) == NULL) {
19,301!
2103
        code = terrno;
×
UNCOV
2104
        tsdbLRUCacheRelease(pCache, h, false);
×
UNCOV
2105
        goto _exit;
×
2106
      }
2107
    } else {
2108
      // no cache or cache is invalid
2109
      SLastCol noneCol = {.rowKey.ts = TSKEY_MIN,
343✔
2110
                          .colVal = COL_VAL_NONE(cid, pr->pSchema->columns[pr->pSlotIds[i]].type)};
343✔
2111

2112
      if (taosArrayPush(pLastArray, &noneCol) == NULL) {
343!
2113
        code = terrno;
×
2114
        tsdbLRUCacheRelease(pCache, h, false);
×
2115
        goto _exit;
×
2116
      }
2117

2118
      if (!remainCols) {
343✔
2119
        if ((remainCols = taosArrayInit(numKeys, sizeof(SIdxKey))) == NULL) {
158!
UNCOV
2120
          code = terrno;
×
UNCOV
2121
          tsdbLRUCacheRelease(pCache, h, false);
×
UNCOV
2122
          goto _exit;
×
2123
        }
2124
      }
2125
      if (!ignoreFromRocks) {
342✔
2126
        if ((ignoreFromRocks = taosArrayInit(numKeys, sizeof(bool))) == NULL) {
157!
UNCOV
2127
          code = terrno;
×
UNCOV
2128
          tsdbLRUCacheRelease(pCache, h, false);
×
UNCOV
2129
          goto _exit;
×
2130
        }
2131
      }
2132
      if (taosArrayPush(remainCols, &(SIdxKey){i, key}) == NULL) {
685!
2133
        code = terrno;
×
2134
        tsdbLRUCacheRelease(pCache, h, false);
×
2135
        goto _exit;
×
2136
      }
2137
      bool ignoreRocks = pLastCol ? (pLastCol->cacheStatus == TSDB_LAST_CACHE_NO_CACHE) : false;
343!
2138
      if (taosArrayPush(ignoreFromRocks, &ignoreRocks) == NULL) {
343!
UNCOV
2139
        code = terrno;
×
2140
        tsdbLRUCacheRelease(pCache, h, false);
×
UNCOV
2141
        goto _exit;
×
2142
      }
2143
    }
2144

2145
    if (h) {
19,644✔
2146
      tsdbLRUCacheRelease(pCache, h, false);
19,300✔
2147
    }
2148
  }
2149

2150
  if (remainCols && TARRAY_SIZE(remainCols) > 0) {
9,514!
2151
    (void)taosThreadMutexLock(&pTsdb->lruMutex);
158✔
2152

2153
    for (int i = 0; i < TARRAY_SIZE(remainCols);) {
501✔
2154
      SIdxKey   *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[i];
343✔
2155
      LRUHandle *h = taosLRUCacheLookup(pCache, &idxKey->key, ROCKS_KEY_LEN);
343✔
2156
      SLastCol  *pLastCol = h ? (SLastCol *)taosLRUCacheValue(pCache, h) : NULL;
343✔
2157
      if (h && pLastCol->cacheStatus != TSDB_LAST_CACHE_NO_CACHE) {
343!
UNCOV
2158
        SLastCol lastCol = *pLastCol;
×
UNCOV
2159
        code = tsdbCacheReallocSLastCol(&lastCol, NULL);
×
UNCOV
2160
        if (code) {
×
UNCOV
2161
          tsdbLRUCacheRelease(pCache, h, false);
×
UNCOV
2162
          (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
×
UNCOV
2163
          TAOS_RETURN(code);
×
2164
        }
2165

UNCOV
2166
        taosArraySet(pLastArray, idxKey->idx, &lastCol);
×
2167

UNCOV
2168
        taosArrayRemove(remainCols, i);
×
UNCOV
2169
        taosArrayRemove(ignoreFromRocks, i);
×
2170
      } else {
2171
        // no cache or cache is invalid
2172
        ++i;
343✔
2173
      }
2174
      if (h) {
343✔
2175
        tsdbLRUCacheRelease(pCache, h, false);
1✔
2176
      }
2177
    }
2178

2179
    // tsdbTrace("tsdb/cache: vgId: %d, load %" PRId64 " from rocks", TD_VID(pTsdb->pVnode), uid);
2180
    code = tsdbCacheLoadFromRocks(pTsdb, uid, pLastArray, remainCols, ignoreFromRocks, pr, ltype);
158✔
2181

2182
    (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
158✔
2183
  }
2184

2185
_exit:
9,356✔
2186
  if (remainCols) {
9,500✔
2187
    taosArrayDestroy(remainCols);
158✔
2188
  }
2189
  if (ignoreFromRocks) {
9,500✔
2190
    taosArrayDestroy(ignoreFromRocks);
158✔
2191
  }
2192

2193
  TAOS_RETURN(code);
9,500✔
2194
}
2195

2196
// States of the row source implemented by getNextRowFromMem().
typedef enum SMEMNEXTROWSTATES {
2197
  SMEMNEXTROW_ENTER,  // iterator not opened yet; the next call opens it and returns the first row
2198
  SMEMNEXTROW_NEXT,  // iterator is open; the next call advances it by one row
2199
} SMEMNEXTROWSTATES;
2200

2201
// Per-source iteration state used by getNextRowFromMem() (one instance per mem table / immutable mem table).
typedef struct SMemNextRowIter {
2202
  SMEMNEXTROWSTATES state;  // current position in the ENTER -> NEXT state machine
2203
  STbData          *pMem;  // [input] table data to iterate; NULL yields no rows
2204
  STbDataIter       iter;  // mem buffer skip list iterator
2205
  int64_t           lastTs;  // not read by getNextRowFromMem()
2206
} SMemNextRowIter;
2207

2208
/**
 * Row source over one in-memory table (matches _next_row_fn_t).
 *
 * On the first call (SMEMNEXTROW_ENTER) the table iterator is opened and its current row is returned; the state
 * only advances to SMEMNEXTROW_NEXT when such a row exists. Later calls step the iterator and return the row it
 * lands on. *ppRow is set to NULL whenever no row is available.
 *
 * @param iter             SMemNextRowIter holding the state
 * @param ppRow            output row, NULL when exhausted
 * @param pIgnoreEarlierTs output, always set to false by this source
 * @param isLast           unused by this source
 * @param aCols            unused by this source
 * @param nCols            unused by this source
 * @return always 0
 */
static int32_t getNextRowFromMem(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast, int16_t *aCols,
                                 int nCols) {
  SMemNextRowIter *pMemIter = (SMemNextRowIter *)iter;
  TSDBROW         *pFound = NULL;
  int32_t          code = 0;

  *pIgnoreEarlierTs = false;

  if (pMemIter->state == SMEMNEXTROW_ENTER) {
    if (pMemIter->pMem != NULL) {
      tsdbTbDataIterOpen(pMemIter->pMem, NULL, 1, &pMemIter->iter);

      pFound = tsdbTbDataIterGet(&pMemIter->iter);
      if (pFound) {
        pMemIter->state = SMEMNEXTROW_NEXT;
      }
    }
  } else if (pMemIter->state == SMEMNEXTROW_NEXT) {
    if (tsdbTbDataIterNext(&pMemIter->iter)) {
      pFound = tsdbTbDataIterGet(&pMemIter->iter);
    }
  }

  // any other state value falls through with pFound == NULL
  *ppRow = pFound;

  TAOS_RETURN(code);
}
2258

2259
// Row-source callback: stores the next row in *ppRow (NULL when the source is exhausted) and returns an error
// code. getNextRowFromMem() is the implementation visible in this part of the file.
typedef int32_t (*_next_row_fn_t)(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast, int16_t *aCols,
2260
                                  int nCols);
2261
// Optional cleanup callback for a row source; memRowIterClose() invokes it on the source's iter when non-NULL.
typedef int32_t (*_next_row_clear_fn_t)(void *iter);
2262

2263
// One input of the row merge: a row source plus the bookkeeping used by memRowIterGet().
typedef struct {
2264
  TSDBROW             *pRow;  // row most recently produced by nextRowFn; NULL once the source is exhausted
2265
  bool                 stop;  // true when this source has no (more) rows
2266
  bool                 next;  // true when a fresh row must be fetched from the source before merging
2267
  bool                 ignoreEarlierTs;  // output flag filled in by nextRowFn
2268
  void                *iter;  // opaque state handed to nextRowFn / nextRowClearFn
2269
  _next_row_fn_t       nextRowFn;  // fetches the next row
2270
  _next_row_clear_fn_t nextRowClearFn;  // optional cleanup hook, may be NULL
2271
} TsdbNextRowState;
2272

2273
// Merging iterator over the mem table and the immutable mem table of one table; set up by memRowIterOpen() and
// released by memRowIterClose().
typedef struct {
2274
  SArray           *pMemDelData;  // filled by loadMemTombData() in memRowIterOpen(); destroyed on close
2275
  SArray           *pSkyline;  // destroyed on close when non-NULL; built outside memRowIterOpen()
2276
  int64_t           iSkyline;  // presumably the cursor into pSkyline - TODO confirm
2277
  SBlockIdx         idx;  // {suid, uid} of the table being read
2278
  SMemNextRowIter   memState;  // source state for the mem table (input[0])
2279
  SMemNextRowIter   imemState;  // source state for the immutable mem table (input[1])
2280
  TSDBROW           memRow, imemRow;  // initial pRow targets of input[0] / input[1]
2281
  TsdbNextRowState  input[2];  // [0] = mem table, [1] = immutable mem table
2282
  SCacheRowsReader *pr;  // owning reader
2283
  STsdb            *pTsdb;
2284
} MemNextRowIter;
2285

2286
static int32_t memRowIterOpen(MemNextRowIter *pIter, tb_uid_t uid, STsdb *pTsdb, STSchema *pTSchema, tb_uid_t suid,
9,501✔
2287
                              STsdbReadSnap *pReadSnap, SCacheRowsReader *pr) {
2288
  int32_t code = 0, lino = 0;
9,501✔
2289

2290
  STbData *pMem = NULL;
9,501✔
2291
  if (pReadSnap->pMem) {
9,501!
2292
    pMem = tsdbGetTbDataFromMemTable(pReadSnap->pMem, suid, uid);
9,502✔
2293
  }
2294

2295
  STbData *pIMem = NULL;
9,513✔
2296
  if (pReadSnap->pIMem) {
9,513✔
2297
    pIMem = tsdbGetTbDataFromMemTable(pReadSnap->pIMem, suid, uid);
8✔
2298
  }
2299

2300
  pIter->pTsdb = pTsdb;
9,513✔
2301

2302
  pIter->pMemDelData = NULL;
9,513✔
2303

2304
  TAOS_CHECK_GOTO(loadMemTombData(&pIter->pMemDelData, pMem, pIMem, pr->info.verRange.maxVer), &lino, _exit);
9,513!
2305

2306
  pIter->idx = (SBlockIdx){.suid = suid, .uid = uid};
9,512✔
2307

2308
  pIter->input[0] = (TsdbNextRowState){&pIter->memRow, true, false, false, &pIter->memState, getNextRowFromMem, NULL};
9,512✔
2309
  pIter->input[1] = (TsdbNextRowState){&pIter->imemRow, true, false, false, &pIter->imemState, getNextRowFromMem, NULL};
9,512✔
2310

2311
  if (pMem) {
9,512✔
2312
    pIter->memState.pMem = pMem;
4,889✔
2313
    pIter->memState.state = SMEMNEXTROW_ENTER;
4,889✔
2314
    pIter->input[0].stop = false;
4,889✔
2315
    pIter->input[0].next = true;
4,889✔
2316
  }
2317

2318
  if (pIMem) {
9,512✔
2319
    pIter->imemState.pMem = pIMem;
8✔
2320
    pIter->imemState.state = SMEMNEXTROW_ENTER;
8✔
2321
    pIter->input[1].stop = false;
8✔
2322
    pIter->input[1].next = true;
8✔
2323
  }
2324

2325
  pIter->pr = pr;
9,512✔
2326

2327
_exit:
9,512✔
2328
  if (code) {
9,512!
UNCOV
2329
    tsdbError("tsdb/cache: %s failed at line %d since %s.", __func__, lino, tstrerror(code));
×
2330
  }
2331

2332
  TAOS_RETURN(code);
9,512✔
2333
}
2334

2335
/**
 * Release what a MemNextRowIter owns: run each input's optional clear callback, then destroy the skyline and
 * delete-data arrays when they are set.
 */
static void memRowIterClose(MemNextRowIter *pIter) {
  TsdbNextRowState *pEnd = pIter->input + 2;
  for (TsdbNextRowState *pInput = pIter->input; pInput < pEnd; ++pInput) {
    if (pInput->nextRowClearFn) {
      (void)pInput->nextRowClearFn(pInput->iter);
    }
  }

  if (pIter->pSkyline) {
    taosArrayDestroy(pIter->pSkyline);
  }

  if (pIter->pMemDelData) {
    taosArrayDestroy(pIter->pMemDelData);
  }
}
9,511✔
2350

2351
/**
 * Free callback installed on the reader's table map by getTableLoadInfo(): param points at the stored pointer
 * value, which is freed and reset through taosMemoryFreeClear().
 */
static void freeTableInfoFunc(void *param) {
  void **ppStored = (void **)param;
  taosMemoryFreeClear(*ppStored);
}
4,896✔
2355

2356
/**
 * Return the STableLoadInfo of a table, creating the reader's table map and a zeroed entry on first use.
 *
 * Entries are owned by pReader->pTableMap, which frees them through freeTableInfoFunc().
 *
 * @param pReader cache rows reader owning the map
 * @param uid     table uid used as the map key
 * @return the entry, or NULL when the map or the entry could not be allocated or stored
 */
static STableLoadInfo *getTableLoadInfo(SCacheRowsReader *pReader, uint64_t uid) {
  if (!pReader->pTableMap) {
    pReader->pTableMap = tSimpleHashInit(pReader->numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
    if (!pReader->pTableMap) {
      return NULL;
    }

    tSimpleHashSetFreeFp(pReader->pTableMap, freeTableInfoFunc);
  }

  STableLoadInfo **ppInfo = tSimpleHashGet(pReader->pTableMap, &uid, sizeof(uid));
  if (ppInfo) {
    return *ppInfo;
  }

  STableLoadInfo *pInfo = taosMemoryCalloc(1, sizeof(STableLoadInfo));
  if (!pInfo) {
    return NULL;
  }

  if (tSimpleHashPut(pReader->pTableMap, &uid, sizeof(uid), &pInfo, POINTER_BYTES)) {
    // the map did not take the entry, so it is still ours to free
    taosMemoryFree(pInfo);
    return NULL;
  }

  return pInfo;
}
2381

2382
// Return the next row from the two inputs of `pIter` that is not covered by
// the delete skyline, preferring the row with the greatest key.
//
// isLast, aCols and nCols are forwarded unchanged to each input's nextRowFn.
//
// Returns NULL when both inputs are exhausted, and also when any step fails
// (the failure is logged here; the error code is not handed to the caller).
static TSDBROW *memRowIterGet(MemNextRowIter *pIter, bool isLast, int16_t *aCols, int nCols) {
543,987✔
2383
  int32_t code = 0, lino = 0;
543,987✔
2384

2385
  // Loop until a non-deleted row is found or both inputs stop.
  for (;;) {
163✔
2386
    // Step 1: advance every input that is flagged `next` and has not stopped.
    for (int i = 0; i < 2; ++i) {
1,616,361✔
2387
      if (pIter->input[i].next && !pIter->input[i].stop) {
1,081,489!
2388
        TAOS_CHECK_GOTO(pIter->input[i].nextRowFn(pIter->input[i].iter, &pIter->input[i].pRow,
540,284!
2389
                                                  &pIter->input[i].ignoreEarlierTs, isLast, aCols, nCols),
2390
                        &lino, _exit);
2391

2392
        // A NULL row marks the input as finished for good.
        if (pIter->input[i].pRow == NULL) {
531,006✔
2393
          pIter->input[i].stop = true;
145✔
2394
          pIter->input[i].next = false;
145✔
2395
        }
2396
      }
2397
    }
2398

2399
    if (pIter->input[0].stop && pIter->input[1].stop) {
534,872!
2400
      return NULL;
535,030✔
2401
    }
2402

2403
    // Step 2: collect the pending row(s) holding the greatest key; on a tie
    // both inputs are recorded (nMax == 2).
    TSDBROW *max[2] = {0};
530,097✔
2404
    int      iMax[2] = {-1, -1};
530,097✔
2405
    int      nMax = 0;
530,097✔
2406
    SRowKey  maxKey = {.ts = TSKEY_MIN};
530,097✔
2407

2408
    for (int i = 0; i < 2; ++i) {
1,597,838✔
2409
      if (!pIter->input[i].stop && pIter->input[i].pRow != NULL) {
1,066,292!
2410
        STsdbRowKey tsdbRowKey = {0};
532,402✔
2411
        tsdbRowGetKey(pIter->input[i].pRow, &tsdbRowKey);
532,402✔
2412

2413
        // merging & deduplicating on client side
2414
        int c = tRowKeyCompare(&maxKey, &tsdbRowKey.key);
532,065✔
2415
        if (c <= 0) {
533,851✔
2416
          // Strictly greater key: restart the candidate list with this row.
          if (c < 0) {
533,132✔
2417
            nMax = 0;
530,964✔
2418
            maxKey = tsdbRowKey.key;
530,964✔
2419
          }
2420

2421
          iMax[nMax] = i;
533,132✔
2422
          max[nMax++] = pIter->input[i].pRow;
533,132✔
2423
        }
2424
        // Every examined input keeps its current row until re-flagged below.
        pIter->input[i].next = false;
533,851✔
2425
      }
2426
    }
2427

2428
    // Step 3: drop candidates covered by the delete skyline.
    TSDBROW *merge[2] = {0};
531,546✔
2429
    int      iMerge[2] = {-1, -1};
531,546✔
2430
    int      nMerge = 0;
531,546✔
2431
    for (int i = 0; i < nMax; ++i) {
1,064,945✔
2432
      TSDBKEY maxKey1 = TSDBROW_KEY(max[i]);
534,527!
2433

2434
      // The skyline is built lazily, on the first candidate row: the in-memory
      // delete records (pMemDelData) are appended to the table's pTombData and
      // the skyline is computed over the combined list.
      if (!pIter->pSkyline) {
534,527✔
2435
        pIter->pSkyline = taosArrayInit(32, sizeof(TSDBKEY));
4,884✔
2436
        TSDB_CHECK_NULL(pIter->pSkyline, code, lino, _exit, terrno);
4,886!
2437

2438
        uint64_t        uid = pIter->idx.uid;
4,886✔
2439
        STableLoadInfo *pInfo = getTableLoadInfo(pIter->pr, uid);
4,886✔
2440
        TSDB_CHECK_NULL(pInfo, code, lino, _exit, TSDB_CODE_OUT_OF_MEMORY);
4,888!
2441

2442
        if (pInfo->pTombData == NULL) {
4,888✔
2443
          pInfo->pTombData = taosArrayInit(4, sizeof(SDelData));
4,812✔
2444
          TSDB_CHECK_NULL(pInfo->pTombData, code, lino, _exit, terrno);
4,812!
2445
        }
2446

2447
        if (!taosArrayAddAll(pInfo->pTombData, pIter->pMemDelData)) {
4,888!
UNCOV
2448
          TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
2449
        }
2450

2451
        size_t delSize = TARRAY_SIZE(pInfo->pTombData);
4,887✔
2452
        if (delSize > 0) {
4,887✔
2453
          code = tsdbBuildDeleteSkyline(pInfo->pTombData, 0, (int32_t)(delSize - 1), pIter->pSkyline);
1✔
2454
          TAOS_CHECK_GOTO(code, &lino, _exit);
1!
2455
        }
        // Start the skyline cursor at the last element.
2456
        pIter->iSkyline = taosArrayGetSize(pIter->pSkyline) - 1;
4,887✔
2457
      }
2458

2459
      bool deleted = tsdbKeyDeleted(&maxKey1, pIter->pSkyline, &pIter->iSkyline);
534,528✔
2460
      if (!deleted) {
533,399!
2461
        iMerge[nMerge] = iMax[i];
534,917✔
2462
        merge[nMerge++] = max[i];
534,917✔
2463
      }
2464

2465
      // A deleted row is skipped by advancing its input on the next pass.
      pIter->input[iMax[i]].next = deleted;
533,399✔
2466
    }
2467

2468
    // Step 4: hand out the first surviving candidate and advance only its
    // input; a second surviving candidate stays pending for a later call.
    if (nMerge > 0) {
530,418✔
2469
      pIter->input[iMerge[0]].next = true;
530,255✔
2470

2471
      return merge[0];
530,255✔
2472
    }
2473
  }
2474

UNCOV
2475
_exit:
×
UNCOV
2476
  if (code) {
×
UNCOV
2477
    tsdbError("tsdb/cache: %s failed at line %d since %s.", __func__, lino, tstrerror(code));
×
2478
  }
2479

UNCOV
2480
  return NULL;
×
2481
}
2482

2483
// Duplicate `pSrc` (header plus its numOfCols column descriptors) into a newly
// allocated buffer stored in *ppDst. On allocation failure *ppDst is NULL and
// terrno is returned.
static int32_t cloneTSchema(STSchema *pSrc, STSchema **ppDst) {
  int32_t   nBytes = sizeof(STSchema) + sizeof(STColumn) * pSrc->numOfCols;
  STSchema *pCopy = taosMemoryMalloc(nBytes);

  *ppDst = pCopy;
  if (pCopy == NULL) {
    TAOS_RETURN(terrno);
  }

  memcpy(pCopy, pSrc, nBytes);

  TAOS_RETURN(TSDB_CODE_SUCCESS);
}
2493

2494
// Make pReader->pCurrSchema describe schema version `sversion`:
//   - no current schema and the reader's base schema matches -> clone the base;
//   - current schema already matches                        -> nothing to do;
//   - otherwise drop the current schema and fetch the requested version from
//     the meta store.
static int32_t updateTSchema(int32_t sversion, SCacheRowsReader *pReader, uint64_t uid) {
  STSchema *pCurrent = pReader->pCurrSchema;

  if (pCurrent == NULL) {
    if (sversion == pReader->pSchema->version) {
      TAOS_RETURN(cloneTSchema(pReader->pSchema, &pReader->pCurrSchema));
    }
  } else if (sversion == pCurrent->version) {
    TAOS_RETURN(TSDB_CODE_SUCCESS);
  }

  taosMemoryFreeClear(pReader->pCurrSchema);
  TAOS_RETURN(
      metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->info.suid, uid, sversion, &pReader->pCurrSchema));
}
2507

2508
// Overlay rows read through a MemNextRowIter onto the per-column results in
// `pLastArray`.
//
//   pLastArray  SLastCol entries to refresh; walked in parallel with the row's
//               columns by column id.
//   keyArray    SLastKey entries indexed like pLastArray; IS_LAST_KEY() selects
//               the update rule for each slot.
//
// The first row returned by the iterator is applied to every matching column.
// Slots selected by IS_LAST_KEY() whose value in that row is not a real value
// are remembered in `iColHash` (cid -> index into pLastArray) and resolved from
// the following rows, each slot being removed from the hash once a row
// provides a value for it.
//
// Returns 0 on success or the first error code encountered.
static int32_t tsdbCacheGetBatchFromMem(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr,
                                        SArray *keyArray) {
  int32_t        code = 0;
  int32_t        lino = 0;
  STSchema      *pTSchema = pr->pSchema;
  MemNextRowIter iter = {0};
  SSHashObj     *iColHash = NULL;
  STSDBRowIter   rowIter = {0};

  // 1, get from mem, imem filtered with delete info
  TAOS_CHECK_EXIT(memRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->info.suid, pr->pReadSnap, pr));

  TSDBROW *pRow = memRowIterGet(&iter, false, NULL, 0);
  if (!pRow) {
    goto _exit;
  }

  int32_t sversion = TSDBROW_SVERSION(pRow);
  if (sversion != -1) {
    TAOS_CHECK_EXIT(updateTSchema(sversion, pr, uid));

    pTSchema = pr->pCurrSchema;
  }
  int32_t nCol = pTSchema->numOfCols;

  STsdbRowKey rowKey = {0};
  tsdbRowGetKey(pRow, &rowKey);

  TAOS_CHECK_EXIT(tsdbRowIterOpen(&rowIter, pRow, pTSchema));

  // Merge-walk the row's columns (iCol) against the requested slots (jCol).
  int32_t iCol = 0, jCol = 0, jnCol = TARRAY_SIZE(pLastArray);
  for (SColVal *pColVal = tsdbRowIterNext(&rowIter); pColVal && iCol < nCol && jCol < jnCol;) {
    SLastCol *pTargetCol = &((SLastCol *)TARRAY_DATA(pLastArray))[jCol];
    if (pColVal->cid < pTargetCol->colVal.cid) {
      pColVal = tsdbRowIterNext(&rowIter), ++iCol;

      continue;
    }
    if (pColVal->cid > pTargetCol->colVal.cid) {
      break;
    }

    int32_t cmp_res = tRowKeyCompare(&pTargetCol->rowKey, &rowKey.key);
    if (!IS_LAST_KEY(((SLastKey *)TARRAY_DATA(keyArray))[jCol])) {
      // Non-"last" slot: a newer row always wins; an equally keyed row wins
      // unless its column value is NONE.
      if (cmp_res < 0 || (cmp_res == 0 && !COL_VAL_IS_NONE(pColVal))) {
        SLastCol lastCol = {.rowKey = rowKey.key, .colVal = *pColVal, .dirty = 1, .cacheStatus = TSDB_LAST_CACHE_VALID};
        TAOS_CHECK_EXIT(tsdbCacheReallocSLastCol(&lastCol, NULL));

        tsdbCacheFreeSLastColItem(pTargetCol);
        taosArraySet(pLastArray, jCol, &lastCol);
      }
    } else {
      if (COL_VAL_IS_VALUE(pColVal)) {
        if (cmp_res <= 0) {
          SLastCol lastCol = {
              .rowKey = rowKey.key, .colVal = *pColVal, .dirty = 1, .cacheStatus = TSDB_LAST_CACHE_VALID};
          TAOS_CHECK_EXIT(tsdbCacheReallocSLastCol(&lastCol, NULL));

          tsdbCacheFreeSLastColItem(pTargetCol);
          taosArraySet(pLastArray, jCol, &lastCol);
        }
      } else {
        // No value in this row: remember the slot and resolve it from the
        // following rows.
        if (!iColHash) {
          iColHash = tSimpleHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT));
          TSDB_CHECK_NULL(iColHash, code, lino, _exit, terrno);
        }

        // Propagate the put's own return code; relying on terrno could let a
        // failure pass unnoticed when terrno was not set.
        TAOS_CHECK_EXIT(tSimpleHashPut(iColHash, &pColVal->cid, sizeof(pColVal->cid), &jCol, sizeof(jCol)));
      }
    }

    ++jCol;

    // Several slots may target the same column id; only move to the next row
    // column once the next slot asks for a larger cid.
    if (jCol < jnCol && ((SLastCol *)TARRAY_DATA(pLastArray))[jCol].colVal.cid > pColVal->cid) {
      pColVal = tsdbRowIterNext(&rowIter), ++iCol;
    }
  }
  tsdbRowClose(&rowIter);

  if (iColHash && tSimpleHashGetSize(iColHash) > 0) {
    pRow = memRowIterGet(&iter, false, NULL, 0);
    while (pRow) {
      if (tSimpleHashGetSize(iColHash) == 0) {
        break;
      }

      sversion = TSDBROW_SVERSION(pRow);
      if (sversion != -1) {
        TAOS_CHECK_EXIT(updateTSchema(sversion, pr, uid));

        pTSchema = pr->pCurrSchema;
      }
      nCol = pTSchema->numOfCols;

      STsdbRowKey tsdbRowKey = {0};
      tsdbRowGetKey(pRow, &tsdbRowKey);

      TAOS_CHECK_EXIT(tsdbRowIterOpen(&rowIter, pRow, pTSchema));

      iCol = 0;
      for (SColVal *pColVal = tsdbRowIterNext(&rowIter); pColVal && iCol < nCol;
           pColVal = tsdbRowIterNext(&rowIter), iCol++) {
        int32_t *pjCol = tSimpleHashGet(iColHash, &pColVal->cid, sizeof(pColVal->cid));
        if (pjCol && COL_VAL_IS_VALUE(pColVal)) {
          SLastCol *pTargetCol = &((SLastCol *)TARRAY_DATA(pLastArray))[*pjCol];

          int32_t cmp_res = tRowKeyCompare(&pTargetCol->rowKey, &tsdbRowKey.key);
          if (cmp_res <= 0) {
            SLastCol lastCol = {
                .rowKey = tsdbRowKey.key, .colVal = *pColVal, .dirty = 1, .cacheStatus = TSDB_LAST_CACHE_VALID};
            TAOS_CHECK_EXIT(tsdbCacheReallocSLastCol(&lastCol, NULL));

            tsdbCacheFreeSLastColItem(pTargetCol);
            taosArraySet(pLastArray, *pjCol, &lastCol);
          }

          // The slot is settled by the first row that carries a value.
          TAOS_CHECK_EXIT(tSimpleHashRemove(iColHash, &pColVal->cid, sizeof(pColVal->cid)));
        }
      }
      tsdbRowClose(&rowIter);

      pRow = memRowIterGet(&iter, false, NULL, 0);
    }
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(pTsdb->pVnode), __func__, __FILE__, lino, tstrerror(code));

    tsdbRowClose(&rowIter);
  }

  tSimpleHashCleanup(iColHash);

  memRowIterClose(&iter);

  TAOS_RETURN(code);
}
2653

2654
// Fill `pLastArray` for table `uid`: first from the LRU cache (which also
// fills a scratch array of the looked-up keys), then — only when
// tsUpdateCacheBatch is set — from the in-memory rows.
//
// Returns 0 on success or the first error code encountered; the scratch key
// array is always released.
int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, int8_t ltype) {
  int32_t code = 0;
  int32_t lino = 0;
  SArray *keyArray = taosArrayInit(16, sizeof(SLastKey));

  if (NULL == keyArray) {
    TAOS_CHECK_EXIT(terrno);
  }

  TAOS_CHECK_EXIT(tsdbCacheGetBatchFromLru(pTsdb, uid, pLastArray, pr, ltype, keyArray));

  if (!tsUpdateCacheBatch) {
    goto _exit;
  }

  TAOS_CHECK_EXIT(tsdbCacheGetBatchFromMem(pTsdb, uid, pLastArray, pr, keyArray));

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(pTsdb->pVnode), __func__, __FILE__, lino, tstrerror(code));
  }

  if (NULL != keyArray) {
    taosArrayDestroy(keyArray);
  }

  TAOS_RETURN(code);
}
2680

2681
// Invalidate the cached last / last_row entries of table `uid` whose row
// timestamp falls inside [sKey, eKey].
//
// For every column and both lflag values:
//   - an entry present in the LRU cache and inside the range is replaced by a
//     NONE value marked TSDB_LAST_CACHE_NO_CACHE;
//   - keys missing from the LRU cache are looked up in RocksDB in one batch,
//     and entries inside the range are overwritten in RocksDB and in the LRU.
//
// The whole operation runs under pTsdb->lruMutex.
// Returns 0 on success or the first error code encountered.
int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey) {
  int32_t   code = 0, lino = 0;
  STSchema *pTSchema = NULL;
  int       sver = -1;
  int       numKeys = 0;
  SArray   *remainCols = NULL;
  // Declared and NULL-initialised ahead of the first `goto _exit`, so the
  // cleanup code never reads an indeterminate pointer.
  char    **keys_list = NULL;
  size_t   *keys_list_sizes = NULL;
  char    **values_list = NULL;
  size_t   *values_list_sizes = NULL;

  TAOS_CHECK_RETURN(metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pTSchema));

  int numCols = pTSchema->numOfCols;

  (void)taosThreadMutexLock(&pTsdb->lruMutex);

  for (int i = 0; i < numCols; ++i) {
    int16_t cid = pTSchema->columns[i].colId;
    for (int8_t lflag = LFLAG_LAST_ROW; lflag <= LFLAG_LAST; ++lflag) {
      SLastKey   lastKey = {.lflag = lflag, .uid = uid, .cid = cid};
      LRUHandle *h = taosLRUCacheLookup(pTsdb->lruCache, &lastKey, ROCKS_KEY_LEN);
      if (h) {
        SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pTsdb->lruCache, h);
        if (pLastCol->rowKey.ts <= eKey && pLastCol->rowKey.ts >= sKey) {
          SLastCol noneCol = {.rowKey.ts = TSKEY_MIN,
                              .colVal = COL_VAL_NONE(cid, pTSchema->columns[i].type),
                              .dirty = 1,
                              .cacheStatus = TSDB_LAST_CACHE_NO_CACHE};
          code = tsdbCachePutToLRU(pTsdb, &lastKey, &noneCol, 1);
        }
        // Release the handle before acting on the put result.
        tsdbLRUCacheRelease(pTsdb->lruCache, h, false);
        TAOS_CHECK_EXIT(code);
      } else {
        // Not in the LRU cache: queue the key for the RocksDB batch lookup.
        if (!remainCols) {
          remainCols = taosArrayInit(numCols * 2, sizeof(SIdxKey));
          TSDB_CHECK_NULL(remainCols, code, lino, _exit, terrno);
        }
        if (!taosArrayPush(remainCols, &(SIdxKey){i, lastKey})) {
          TAOS_CHECK_EXIT(terrno);
        }
      }
    }
  }

  if (remainCols) {
    numKeys = TARRAY_SIZE(remainCols);
  }

  keys_list = taosMemoryCalloc(numKeys, sizeof(char *));
  keys_list_sizes = taosMemoryCalloc(numKeys, sizeof(size_t));

  if (!keys_list || !keys_list_sizes) {
    code = terrno;
    goto _exit;
  }
  const size_t klen = ROCKS_KEY_LEN;

  for (int i = 0; i < numKeys; ++i) {
    char *key = taosMemoryCalloc(1, sizeof(SLastKey));
    if (!key) {
      code = terrno;
      goto _exit;
    }
    SIdxKey *idxKey = taosArrayGet(remainCols, i);

    ((SLastKey *)key)[0] = idxKey->key;

    keys_list[i] = key;
    keys_list_sizes[i] = klen;
  }

  rocksMayWrite(pTsdb, true);  // flush writebatch cache

  TAOS_CHECK_GOTO(tsdbCacheGetValuesFromRocks(pTsdb, numKeys, (const char *const *)keys_list, keys_list_sizes,
                                              &values_list, &values_list_sizes),
                  NULL, _exit);

  for (int i = 0; i < numKeys; ++i) {
    SLastCol *pLastCol = NULL;
    if (values_list[i] != NULL) {
      code = tsdbCacheDeserialize(values_list[i], values_list_sizes[i], &pLastCol);
      if (code != TSDB_CODE_SUCCESS) {
        tsdbError("vgId:%d, %s deserialize failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
                  tstrerror(code));
        goto _exit;
      }
    }
    SIdxKey  *idxKey = taosArrayGet(remainCols, i);
    SLastKey *pLastKey = &idxKey->key;
    if (NULL != pLastCol && (pLastCol->rowKey.ts <= eKey && pLastCol->rowKey.ts >= sKey)) {
      SLastCol noCacheCol = {.rowKey.ts = TSKEY_MIN,
                             .colVal = COL_VAL_NONE(pLastKey->cid, pTSchema->columns[idxKey->idx].type),
                             .dirty = 0,
                             .cacheStatus = TSDB_LAST_CACHE_NO_CACHE};

      if ((code = tsdbCachePutToRocksdb(pTsdb, pLastKey, &noCacheCol)) != TSDB_CODE_SUCCESS) {
        taosMemoryFreeClear(pLastCol);
        tsdbError("tsdb/cache/del: vgId:%d, put to rocks failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code));
        goto _exit;
      }
      if ((code = tsdbCachePutToLRU(pTsdb, pLastKey, &noCacheCol, 0)) != TSDB_CODE_SUCCESS) {
        taosMemoryFreeClear(pLastCol);
        tsdbError("tsdb/cache/del: vgId:%d, put to lru failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code));
        goto _exit;
      }
    }

    if (pLastCol == NULL) {
      // Labels and arguments are paired: uid with uid, cid with cid.
      tsdbDebug("tsdb/cache/del: vgId:%d, no cache found for uid:%" PRId64 ", cid:%d, lflag:%d.",
                TD_VID(pTsdb->pVnode), pLastKey->uid, pLastKey->cid, pLastKey->lflag);
    }

    taosMemoryFreeClear(pLastCol);
  }

  rocksMayWrite(pTsdb, false);

_exit:
  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);

  // keys_list is NULL when its allocation failed or was never reached.
  if (keys_list) {
    for (int i = 0; i < numKeys; ++i) {
      taosMemoryFree(keys_list[i]);
    }
  }
  taosMemoryFree(keys_list);
  taosMemoryFree(keys_list_sizes);
  if (values_list) {
#if USE_ROCKSDB
    for (int i = 0; i < numKeys; ++i) {
      rocksdb_free(values_list[i]);
    }
#endif
    taosMemoryFree(values_list);
  }
  taosMemoryFree(values_list_sizes);
  taosArrayDestroy(remainCols);
  taosMemoryFree(pTSchema);

  TAOS_RETURN(code);
}
2819

2820
// Create the last/last_row LRU cache sized from the vnode's cacheLastSize (MiB)
// and open the block cache, page cache and RocksDB cache.
//
// On success pTsdb->lruCache holds the new cache and pTsdb->lruMutex is
// initialised. On failure the LRU cache is released and pTsdb->lruCache is
// set to NULL, so tsdbCloseCache() never destroys a mutex that was not
// initialised.
//
// Returns 0 on success or the error code of the step that failed.
int32_t tsdbOpenCache(STsdb *pTsdb) {
  int32_t code = 0, lino = 0;
  size_t  cfgCapacity = (size_t)pTsdb->pVnode->config.cacheLastSize * 1024 * 1024;

  SLRUCache *pCache = taosLRUCacheInit(cfgCapacity, 0, .5);
  if (pCache == NULL) {
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err);
  }

  TAOS_CHECK_GOTO(tsdbOpenBCache(pTsdb), &lino, _err);

  TAOS_CHECK_GOTO(tsdbOpenPgCache(pTsdb), &lino, _err);

  TAOS_CHECK_GOTO(tsdbOpenRocksCache(pTsdb), &lino, _err);

  taosLRUCacheSetStrictCapacity(pCache, false);

  (void)taosThreadMutexInit(&pTsdb->lruMutex, NULL);

_err:
  if (code) {
    tsdbError("tsdb/cache: vgId:%d, open failed at line %d since %s.", TD_VID(pTsdb->pVnode), lino, tstrerror(code));

    // Do not publish a cache whose mutex was never initialised.
    if (pCache) {
      taosLRUCacheCleanup(pCache);
      pCache = NULL;
    }
  }

  pTsdb->lruCache = pCache;

  TAOS_RETURN(code);
}
2848

2849
// Tear down what tsdbOpenCache() set up: the LRU cache with its mutex (only
// when a cache is present), then the block, page and RocksDB caches.
void tsdbCloseCache(STsdb *pTsdb) {
  if (pTsdb->lruCache != NULL) {
    SLRUCache *pLru = pTsdb->lruCache;

    taosLRUCacheEraseUnrefEntries(pLru);
    taosLRUCacheCleanup(pLru);

    (void)taosThreadMutexDestroy(&pTsdb->lruMutex);
  }

  tsdbCloseBCache(pTsdb);
  tsdbClosePgCache(pTsdb);
  tsdbCloseRocksCache(pTsdb);
}
11,939✔
2863

UNCOV
2864
// Write the 8-byte cache key for table `uid` into `key` and its size into
// `*len`.
//
//   cacheType == 0 (last_row): the key is the uid itself;
//   otherwise (last):          the key is the uid with the top bit set.
//
// `key` must have room for sizeof(uint64_t) bytes. The value is stored with
// memcpy so that `key` needs no particular alignment.
static void getTableCacheKey(tb_uid_t uid, int cacheType, char *key, int *len) {
  uint64_t encoded = (uint64_t)uid;

  if (cacheType != 0) {  // last
    encoded |= 0x8000000000000000ULL;
  }

  memcpy(key, &encoded, sizeof(encoded));
  *len = (int)sizeof(encoded);
}
×
2873

2874
// Resolve the super-table uid of table `uid` through the meta reader.
// Returns the suid of a child table; 0 when the table is not found or is any
// other kind of table.
static tb_uid_t getTableSuidByUid(tb_uid_t uid, STsdb *pTsdb) {
  tb_uid_t    result = 0;
  SMetaReader reader = {0};

  metaReaderDoInit(&reader, pTsdb->pVnode->pMeta, META_READER_LOCK);

  if (metaReaderGetTableEntryByUidCache(&reader, uid) >= 0 && reader.me.type == TSDB_CHILD_TABLE) {
    result = reader.me.ctbEntry.suid;
  }

  metaReaderClear(&reader);

  return result;
}
2896

UNCOV
2897
// Read the delete records referenced by `pDelIdx` into `aDelData`.
// A NULL index is not an error: nothing is read and 0 is returned.
static int32_t getTableDelDataFromDelIdx(SDelFReader *pDelReader, SDelIdx *pDelIdx, SArray *aDelData) {
  if (NULL == pDelIdx) {
    TAOS_RETURN(0);
  }

  int32_t rc = tsdbReadDelDatav1(pDelReader, pDelIdx, aDelData, INT64_MAX);

  TAOS_RETURN(rc);
}
2906

UNCOV
2907
// Append every delete record linked from pTbData->pHead to `aDelData`.
// A NULL pTbData yields no records. Returns terrno if a push fails.
static int32_t getTableDelDataFromTbData(STbData *pTbData, SArray *aDelData) {
  SDelData *pNode = NULL;

  if (pTbData != NULL) {
    pNode = pTbData->pHead;
  }

  while (pNode != NULL) {
    if (taosArrayPush(aDelData, pNode) == NULL) {
      TAOS_RETURN(terrno);
    }
    pNode = pNode->pNext;
  }

  TAOS_RETURN(0);
}
2919

2920
// Return the reader's sorted uid list, building and caching it on first use
// from pReader->pTableList. Returns NULL when the list cannot be allocated.
static uint64_t *getUidList(SCacheRowsReader *pReader) {
  if (pReader->uidList != NULL) {
    return pReader->uidList;
  }

  int32_t nTables = pReader->numOfTables;

  pReader->uidList = taosMemoryMalloc(nTables * sizeof(uint64_t));
  if (pReader->uidList == NULL) {
    return NULL;
  }

  for (int32_t idx = 0; idx < nTables; ++idx) {
    pReader->uidList[idx] = pReader->pTableList[idx].uid;
  }

  taosSort(pReader->uidList, nTables, sizeof(uint64_t), uidComparFunc);

  return pReader->uidList;
}
2939

2940
// Load tomb (delete) records for the reader's tables from the tomb blocks in
// `pTombBlkArray` and append them to each table's pTombData.
//
//   pFileReader  a data-file reader when `isFile` is true, otherwise an stt-file
//                reader; it is only passed on to the matching read function.
//
// Blocks lying entirely before the reader's (suid, first uid) are skipped and
// scanning stops at the first block lying entirely after (suid, last uid).
// Inside a block, records are matched against the sorted uid list with a
// forward-only cursor `j`; only records whose version does not exceed the
// reader's maxVer are kept.
//
// Returns 0 on success or the first error code encountered. The tomb block read
// for the current iteration is released on every path after a successful read.
static int32_t loadTombFromBlk(const TTombBlkArray *pTombBlkArray, SCacheRowsReader *pReader, void *pFileReader,
                               bool isFile) {
  int32_t   code = 0;
  int32_t   numOfTables = pReader->numOfTables;
  int64_t   suid = pReader->info.suid;
  uint64_t *uidList = getUidList(pReader);

  if (!uidList) {
    TAOS_RETURN(terrno);
  }

  for (int i = 0, j = 0; i < pTombBlkArray->size && j < numOfTables; ++i) {
    STombBlk *pTombBlk = &pTombBlkArray->data[i];
    if (pTombBlk->maxTbid.suid < suid || (pTombBlk->maxTbid.suid == suid && pTombBlk->maxTbid.uid < uidList[0])) {
      continue;
    }

    if (pTombBlk->minTbid.suid > suid ||
        (pTombBlk->minTbid.suid == suid && pTombBlk->minTbid.uid > uidList[numOfTables - 1])) {
      break;
    }

    STombBlock block = {0};
    code = isFile ? tsdbDataFileReadTombBlock(pFileReader, pTombBlk, &block)
                  : tsdbSttFileReadTombBlock(pFileReader, pTombBlk, &block);
    if (code != TSDB_CODE_SUCCESS) {
      TAOS_RETURN(code);
    }

    uint64_t        uid = uidList[j];
    STableLoadInfo *pInfo = getTableLoadInfo(pReader, uid);
    if (!pInfo) {
      tTombBlockDestroy(&block);
      TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
    }

    if (pInfo->pTombData == NULL) {
      pInfo->pTombData = taosArrayInit(4, sizeof(SDelData));
      if (!pInfo->pTombData) {
        // Capture terrno before the cleanup call can overwrite it.
        code = terrno;
        tTombBlockDestroy(&block);
        TAOS_RETURN(code);
      }
    }

    STombRecord record = {0};
    bool        finished = false;
    for (int32_t k = 0; k < TOMB_BLOCK_SIZE(&block); ++k) {
      code = tTombBlockGet(&block, k, &record);
      if (code != TSDB_CODE_SUCCESS) {
        finished = true;
        break;
      }

      if (record.suid < suid) {
        continue;
      }
      if (record.suid > suid) {
        finished = true;
        break;
      }

      // Advance the uid cursor until it reaches or passes the record's uid.
      bool newTable = false;
      if (uid < record.uid) {
        while (j < numOfTables && uidList[j] < record.uid) {
          ++j;
          newTable = true;
        }

        if (j >= numOfTables) {
          finished = true;
          break;
        }

        uid = uidList[j];
      }

      if (record.uid < uid) {
        continue;
      }

      if (newTable) {
        pInfo = getTableLoadInfo(pReader, uid);
        if (!pInfo) {
          code = TSDB_CODE_OUT_OF_MEMORY;
          finished = true;
          break;
        }
        if (pInfo->pTombData == NULL) {
          pInfo->pTombData = taosArrayInit(4, sizeof(SDelData));
          if (!pInfo->pTombData) {
            code = terrno;
            finished = true;
            break;
          }
        }
      }

      if (record.version <= pReader->info.verRange.maxVer) {
        SDelData delData = {.version = record.version, .sKey = record.skey, .eKey = record.ekey};
        if (!taosArrayPush(pInfo->pTombData, &delData)) {
          // Leave through the common path so the tomb block is released.
          code = terrno;
          finished = true;
          break;
        }
      }
    }

    tTombBlockDestroy(&block);

    if (finished) {
      TAOS_RETURN(code);
    }
  }

  TAOS_RETURN(TSDB_CODE_SUCCESS);
}
3053

3054
// Fetch the tomb-block index of a data file and load the matching tomb records
// into the reader's per-table delete lists.
static int32_t loadDataTomb(SCacheRowsReader *pReader, SDataFileReader *pFileReader) {
  const TTombBlkArray *pTombBlks = NULL;

  TAOS_CHECK_RETURN(tsdbDataFileReadTombBlk(pFileReader, &pTombBlks));

  TAOS_RETURN(loadTombFromBlk(pTombBlks, pReader, pFileReader, true));
}
3061

3062
// Tomb loader handed to the merge tree (see lastIterOpen): fetch the
// tomb-block index of an stt file and load the matching records.
// `pTsdbReader` is really the SCacheRowsReader; `pLoadInfo` is not used.
static int32_t loadSttTomb(STsdbReader *pTsdbReader, SSttFileReader *pSttFileReader, SSttBlockLoadInfo *pLoadInfo) {
  const TTombBlkArray *pTombBlks = NULL;

  TAOS_CHECK_RETURN(tsdbSttFileReadTombBlk(pSttFileReader, &pTombBlks));

  TAOS_RETURN(loadTombFromBlk(pTombBlks, (SCacheRowsReader *)pTsdbReader, pSttFileReader, false));
}
3070

3071
// Iterator over stt-file rows backed by a merge tree.
// lastIterOpen() opens `mergeTree` and points `pMergeTree` at it;
// lastIterClose() closes the tree and resets `pMergeTree` to NULL, so a
// non-NULL `pMergeTree` marks an open iterator.
typedef struct {
3072
  SMergeTree  mergeTree;
3073
  SMergeTree *pMergeTree;
3074
} SFSLastIter;
3075

3076
// Open `iter` over the stt files of `pFileSet` for table (suid, uid).
//
// The reader's stt block-iterator array is rebuilt from scratch, then a merge
// tree is opened that scans backwards over the time window [lastTs, TSKEY_MAX]
// with an unrestricted version range, loading tomb data through loadSttTomb.
//
// Returns 0 on success, terrno if the iterator array cannot be allocated, or
// the error reported by tMergeTreeOpen2.
static int32_t lastIterOpen(SFSLastIter *iter, STFileSet *pFileSet, STsdb *pTsdb, STSchema *pTSchema, tb_uid_t suid,
                            tb_uid_t uid, SCacheRowsReader *pr, int64_t lastTs, int16_t *aCols, int nCols) {
  destroySttBlockReader(pr->pLDataIterArray, NULL);
  pr->pLDataIterArray = taosArrayInit(4, POINTER_BYTES);
  if (NULL == pr->pLDataIterArray) {
    return terrno;
  }

  SMergeTreeConf conf = {0};
  conf.uid = uid;
  conf.suid = suid;
  conf.pTsdb = pTsdb;
  conf.timewindow.skey = lastTs;
  conf.timewindow.ekey = TSKEY_MAX;
  conf.verRange.minVer = 0;
  conf.verRange.maxVer = INT64_MAX;
  conf.strictTimeRange = false;
  conf.pSchema = pTSchema;
  conf.pCurrentFileset = pFileSet;
  conf.backward = 1;
  conf.pSttFileBlockIterArray = pr->pLDataIterArray;
  conf.pCols = aCols;
  conf.numOfCols = nCols;
  conf.loadTombFn = loadSttTomb;
  conf.pReader = pr;
  conf.idstr = pr->idstr;
  conf.pCurRowKey = &pr->rowKey;

  TAOS_CHECK_RETURN(tMergeTreeOpen2(&iter->mergeTree, &conf, NULL));

  iter->pMergeTree = &iter->mergeTree;

  TAOS_RETURN(0);
}
3108

3109
// Close the merge tree of `*iter` (if it is open) and reset the caller's
// iterator pointer to NULL. Always returns 0.
static int32_t lastIterClose(SFSLastIter **iter) {
  SFSLastIter *pLast = *iter;

  if (pLast->pMergeTree != NULL) {
    tMergeTreeClose(pLast->pMergeTree);
    pLast->pMergeTree = NULL;
  }

  *iter = NULL;

  TAOS_RETURN(0);
}
3121

3122
// Advance the merge tree and hand back its current row through `*ppRow`.
// `*ppRow` is NULL when the tree is exhausted or when advancing fails; in the
// failure case the error code is returned directly.
static int32_t lastIterNext(SFSLastIter *iter, TSDBROW **ppRow) {
  bool found = false;

  *ppRow = NULL;

  int32_t code = tMergeTreeNext(iter->pMergeTree, &found);
  if (code != 0) {
    return code;
  }

  if (found) {
    *ppRow = tMergeTreeGetRow(iter->pMergeTree);
  }

  TAOS_RETURN(code);
}
3139

3140
// States of the getNextRowFromFS() state machine (stored in
// SFSNextRowIter.state).
//   SFSNEXTROW_FS       entry state: the file-set cursor is positioned past the
//                       last file set and the state moves to SFSNEXTROW_FILESET.
//   SFSNEXTROW_FILESET  step back to the previous file set; the iteration ends
//                       when none is left.
// NOTE(review): the handling of the remaining states lies outside this
// excerpt; their names suggest progressively finer positions (index list,
// brin block/record, block data/row, stt row) — confirm against the full
// function.
typedef enum SFSNEXTROWSTATES {
3141
  SFSNEXTROW_FS,
3142
  SFSNEXTROW_FILESET,
3143
  SFSNEXTROW_INDEXLIST,
3144
  SFSNEXTROW_BRINBLOCK,
3145
  SFSNEXTROW_BRINRECORD,
3146
  SFSNEXTROW_BLOCKDATA,
3147
  SFSNEXTROW_BLOCKROW,
3148
  SFSNEXTROW_NEXTSTTROW
3149
} SFSNEXTROWSTATES;
3150

3151
struct CacheNextRowIter;
3152

3153
// Iteration state used by getNextRowFromFS(), which receives it as an opaque
// `void *iter` and walks the file sets in `aDFileSet` from the last one
// backwards.
// NOTE(review): only the fields commented below are exercised in the visible
// part of getNextRowFromFS(); the roles of the others should be confirmed
// against the rest of the file.
typedef struct SFSNextRowIter {
3154
  SFSNEXTROWSTATES         state;         // [input]
3155
  SBlockIdx               *pBlockIdxExp;  // [input]
3156
  STSchema                *pTSchema;      // [input]
3157
  tb_uid_t                 suid;
3158
  tb_uid_t                 uid;
3159
  // Index of the current file set; starts at the array size and is
  // pre-decremented before each use.
  int32_t                  iFileSet;
3160
  // File set at aDFileSet[iFileSet].
  STFileSet               *pFileSet;
3161
  TFileSetArray           *aDFileSet;
3162
  SArray                  *pIndexList;
3163
  int32_t                  iBrinIndex;
3164
  SBrinBlock               brinBlock;
3165
  SBrinBlock              *pBrinBlock;
3166
  int32_t                  iBrinRecord;
3167
  SBrinRecord              brinRecord;
3168
  SBlockData               blockData;
3169
  SBlockData              *pBlockData;
3170
  int32_t                  nRow;
3171
  int32_t                  iRow;
3172
  TSDBROW                  row;
3173
  int64_t                  lastTs;
3174
  SFSLastIter              lastIter;
3175
  SFSLastIter             *pLastIter;
3176
  int8_t                   lastEmpty;
3177
  TSDBROW                 *pLastRow;
3178
  SRow                    *pTSRow;
3179
  SRowMerger               rowMerger;
3180
  // Owning cache reader; supplies pTsdb and pCurFileSet.
  SCacheRowsReader        *pr;
3181
  struct CacheNextRowIter *pRowIter;
3182
} SFSNextRowIter;
3183

3184
static void clearLastFileSet(SFSNextRowIter *state);
3185

3186
/*
 * Returns the next row of table (state->suid, state->uid) stored in the file
 * sets of state->aDFileSet. File sets are visited from the highest index to
 * the lowest, and rows of a data block from the last row to the first.
 *
 * The function is resumable: progress lives in the SFSNextRowIter passed as
 * `iter`, and each call continues from state->state.
 *
 * iter             - SFSNextRowIter cursor
 * ppRow            - receives the next row, or NULL once every file set has been consumed
 * pIgnoreEarlierTs - not referenced by this function
 * isLast           - not referenced by this function
 * aCols, nCols     - column ids forwarded to lastIterOpen() and to the block data reader
 *
 * Returns 0 on success or an error code. Failures that reach `_err` release
 * the current file set state and set *ppRow to NULL.
 */
static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast, int16_t *aCols,
                                int nCols) {
  int32_t         code = 0, lino = 0;
  SFSNextRowIter *state = (SFSNextRowIter *)iter;
  STsdb          *pTsdb = state->pr->pTsdb;

  // first call: start one past the last file set, the FILESET state pre-decrements
  if (SFSNEXTROW_FS == state->state) {
    state->iFileSet = TARRAY2_SIZE(state->aDFileSet);

    state->state = SFSNEXTROW_FILESET;
  }

  if (SFSNEXTROW_FILESET == state->state) {
  _next_fileset:
    // drop whatever is still held for the previous file set
    clearLastFileSet(state);

    if (--state->iFileSet < 0) {
      // no file set left: end of the file system source
      *ppRow = NULL;

      TAOS_RETURN(code);
    } else {
      state->pFileSet = TARRAY2_GET(state->aDFileSet, state->iFileSet);
    }

    STFileObj **pFileObj = state->pFileSet->farr;
    if (pFileObj[0] != NULL || pFileObj[3] != NULL) {
      if (state->pFileSet != state->pr->pCurFileSet) {
        // the reader is not open on this file set yet: describe the existing files and open it
        SDataFileReaderConfig conf = {.tsdb = pTsdb, .szPage = pTsdb->pVnode->config.tsdbPageSize};
        const char           *filesName[4] = {0};
        if (pFileObj[0] != NULL) {
          // slots 1 and 2 are dereferenced without a check whenever slot 0 exists
          conf.files[0].file = *pFileObj[0]->f;
          conf.files[0].exist = true;
          filesName[0] = pFileObj[0]->fname;

          conf.files[1].file = *pFileObj[1]->f;
          conf.files[1].exist = true;
          filesName[1] = pFileObj[1]->fname;

          conf.files[2].file = *pFileObj[2]->f;
          conf.files[2].exist = true;
          filesName[2] = pFileObj[2]->fname;
        }

        if (pFileObj[3] != NULL) {
          conf.files[3].exist = true;
          conf.files[3].file = *pFileObj[3]->f;
          filesName[3] = pFileObj[3]->fname;
        }

        TAOS_CHECK_GOTO(tsdbDataFileReaderOpen(filesName, &conf, &state->pr->pFileReader), &lino, _err);

        state->pr->pCurFileSet = state->pFileSet;

        code = loadDataTomb(state->pr, state->pr->pFileReader);
        if (code != TSDB_CODE_SUCCESS) {
          tsdbError("vgId:%d, %s load tomb failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
                    tstrerror(code));
          TAOS_CHECK_GOTO(code, &lino, _err);
        }

        TAOS_CHECK_GOTO(tsdbDataFileReadBrinBlk(state->pr->pFileReader, &state->pr->pBlkArray), &lino, _err);
      }

      // (re)build the list of brin blocks whose table id range covers this table
      if (!state->pIndexList) {
        state->pIndexList = taosArrayInit(1, sizeof(SBrinBlk));
        if (!state->pIndexList) {
          TAOS_CHECK_GOTO(terrno, &lino, _err);
        }
      } else {
        taosArrayClear(state->pIndexList);
      }

      const TBrinBlkArray *pBlkArray = state->pr->pBlkArray;

      // scan from the last brin block backward, so pIndexList[0] is the highest matching block
      for (int i = TARRAY2_SIZE(pBlkArray) - 1; i >= 0; --i) {
        SBrinBlk *pBrinBlk = &pBlkArray->data[i];
        if (state->suid >= pBrinBlk->minTbid.suid && state->suid <= pBrinBlk->maxTbid.suid) {
          if (state->uid >= pBrinBlk->minTbid.uid && state->uid <= pBrinBlk->maxTbid.uid) {
            if (!taosArrayPush(state->pIndexList, pBrinBlk)) {
              TAOS_CHECK_GOTO(terrno, &lino, _err);
            }
          }
        } else if (state->suid > pBrinBlk->maxTbid.suid ||
                   (state->suid == pBrinBlk->maxTbid.suid && state->uid > pBrinBlk->maxTbid.uid)) {
          // the table sorts after this block's range: stop scanning lower blocks
          break;
        }
      }

      int indexSize = TARRAY_SIZE(state->pIndexList);
      if (indexSize <= 0) {
        // no data block for this table in the file set, only stt data can contribute
        goto _check_stt_data;
      }

      state->state = SFSNEXTROW_INDEXLIST;
      // NOTE(review): the INDEXLIST state pre-decrements this cursor, so only pIndexList[0] is
      // ever read even when indexSize > 1 - confirm that skipping the other entries is intended
      state->iBrinIndex = 1;
    }

  _check_stt_data:
    if (state->pFileSet != state->pr->pCurFileSet) {
      state->pr->pCurFileSet = state->pFileSet;
    }

    TAOS_CHECK_GOTO(lastIterOpen(&state->lastIter, state->pFileSet, pTsdb, state->pTSchema, state->suid, state->uid,
                                 state->pr, state->lastTs, aCols, nCols),
                    &lino, _err);

    // prefetch the first stt row of this file set
    TAOS_CHECK_GOTO(lastIterNext(&state->lastIter, &state->pLastRow), &lino, _err);

    if (!state->pLastRow) {
      state->lastEmpty = 1;

      if (SFSNEXTROW_INDEXLIST != state->state) {
        // neither data blocks nor stt rows here: move on to the next file set
        clearLastFileSet(state);
        goto _next_fileset;
      }
    } else {
      state->lastEmpty = 0;

      if (SFSNEXTROW_INDEXLIST != state->state) {
        // stt rows only: return the prefetched row and keep serving from the stt iterator
        state->state = SFSNEXTROW_NEXTSTTROW;

        *ppRow = state->pLastRow;
        state->pLastRow = NULL;

        TAOS_RETURN(code);
      }
    }

    // reached only in the INDEXLIST state: block rows will be merged with stt rows
    state->pLastIter = &state->lastIter;
  }

  if (SFSNEXTROW_NEXTSTTROW == state->state) {
    TAOS_CHECK_GOTO(lastIterNext(&state->lastIter, &state->pLastRow), &lino, _err);

    if (!state->pLastRow) {
      // stt iterator exhausted: close it and continue with the next file set
      if (state->pLastIter) {
        code = lastIterClose(&state->pLastIter);
        if (code != TSDB_CODE_SUCCESS) {
          tsdbError("vgId:%d, %s close last iter failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
                    tstrerror(code));
          TAOS_RETURN(code);
        }
      }

      clearLastFileSet(state);
      state->state = SFSNEXTROW_FILESET;
      goto _next_fileset;
    } else {
      *ppRow = state->pLastRow;
      state->pLastRow = NULL;

      TAOS_RETURN(code);
    }
  }

  if (SFSNEXTROW_INDEXLIST == state->state) {
    SBrinBlk *pBrinBlk = NULL;
  _next_brinindex:
    if (--state->iBrinIndex < 0) {
      // brin blocks exhausted: a pending stt row is returned first, then stt rows only
      if (state->pLastRow) {
        state->state = SFSNEXTROW_NEXTSTTROW;
        *ppRow = state->pLastRow;
        state->pLastRow = NULL;
        return code;
      }

      clearLastFileSet(state);
      goto _next_fileset;
    } else {
      pBrinBlk = taosArrayGet(state->pIndexList, state->iBrinIndex);
    }

    if (!state->pBrinBlock) {
      state->pBrinBlock = &state->brinBlock;
    } else {
      tBrinBlockClear(&state->brinBlock);
    }

    TAOS_CHECK_GOTO(tsdbDataFileReadBrinBlock(state->pr->pFileReader, pBrinBlk, &state->brinBlock), &lino, _err);

    // records are consumed from the last one backward
    state->iBrinRecord = state->brinBlock.numOfRecords - 1;
    state->state = SFSNEXTROW_BRINBLOCK;
  }

  if (SFSNEXTROW_BRINBLOCK == state->state) {
  _next_brinrecord:
    if (state->iBrinRecord < 0) {  // empty brin block, goto _next_brinindex
      tBrinBlockClear(&state->brinBlock);
      goto _next_brinindex;
    }

    TAOS_CHECK_GOTO(tBrinBlockGet(&state->brinBlock, state->iBrinRecord, &state->brinRecord), &lino, _err);

    SBrinRecord *pRecord = &state->brinRecord;
    if (pRecord->uid != state->uid) {
      // record of another table: skip it
      // TODO: goto next brin block early
      --state->iBrinRecord;
      goto _next_brinrecord;
    }

    state->state = SFSNEXTROW_BRINRECORD;
  }

  if (SFSNEXTROW_BRINRECORD == state->state) {
    SBrinRecord *pRecord = &state->brinRecord;

    if (!state->pBlockData) {
      state->pBlockData = &state->blockData;

      TAOS_CHECK_GOTO(tBlockDataCreate(&state->blockData), &lino, _err);
    } else {
      tBlockDataReset(state->pBlockData);
    }

    // a leading primary-key timestamp column id is not passed to the block reader;
    // only the local copies of the parameters are adjusted
    if (aCols[0] == PRIMARYKEY_TIMESTAMP_COL_ID) {
      --nCols;
      ++aCols;
    }

    TAOS_CHECK_GOTO(tsdbDataFileReadBlockDataByColumn(state->pr->pFileReader, pRecord, state->pBlockData,
                                                      state->pTSchema, aCols, nCols),
                    &lino, _err);

    // rows of the block are handed out from the last one backward
    state->nRow = state->blockData.nRow;
    state->iRow = state->nRow - 1;

    state->state = SFSNEXTROW_BLOCKROW;
  }

  if (SFSNEXTROW_BLOCKROW == state->state) {
    if (state->iRow < 0) {
      // block consumed: continue with the previous brin record
      --state->iBrinRecord;
      goto _next_brinrecord;
    }

    state->row = tsdbRowFromBlockData(state->pBlockData, state->iRow);
    if (!state->pLastIter) {
      // no stt iterator to merge with: the block row is the answer
      *ppRow = &state->row;
      --state->iRow;
      return code;
    }

    if (!state->pLastRow) {
      // get next row from fslast and process with fs row, --state->Row if select fs row
      TAOS_CHECK_GOTO(lastIterNext(&state->lastIter, &state->pLastRow), &lino, _err);
    }

    if (!state->pLastRow) {
      // stt iterator exhausted: close it, from now on only block rows are returned
      if (state->pLastIter) {
        code = lastIterClose(&state->pLastIter);
        if (code != TSDB_CODE_SUCCESS) {
          tsdbError("vgId:%d, %s close last iter failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
                    tstrerror(code));
          TAOS_RETURN(code);
        }
      }

      *ppRow = &state->row;
      --state->iRow;
      return code;
    }

    // process state->pLastRow & state->row: the row with the larger timestamp goes first
    TSKEY rowTs = TSDBROW_TS(&state->row);
    TSKEY lastRowTs = TSDBROW_TS(state->pLastRow);
    if (lastRowTs > rowTs) {
      // stt row is newer; the block row stays current for the next call
      *ppRow = state->pLastRow;
      state->pLastRow = NULL;

      TAOS_RETURN(code);
    } else if (lastRowTs < rowTs) {
      // block row is newer; the stt row stays pending in pLastRow
      *ppRow = &state->row;
      --state->iRow;

      TAOS_RETURN(code);
    } else {
      // same timestamp: merge the block row and the stt row and return the merged row
      SRowMerger *pMerger = &state->rowMerger;
      code = tsdbRowMergerInit(pMerger, state->pTSchema);
      if (code != TSDB_CODE_SUCCESS) {
        tsdbError("vgId:%d, %s init row merger failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
                  tstrerror(code));
        TAOS_RETURN(code);
      }

      TAOS_CHECK_GOTO(tsdbRowMergerAdd(pMerger, &state->row, state->pTSchema), &lino, _err);
      TAOS_CHECK_GOTO(tsdbRowMergerAdd(pMerger, state->pLastRow, state->pTSchema), &lino, _err);

      // release the merged row of a previous call before producing a new one
      if (state->pTSRow) {
        taosMemoryFree(state->pTSRow);
        state->pTSRow = NULL;
      }

      TAOS_CHECK_GOTO(tsdbRowMergerGetRow(pMerger, &state->pTSRow), &lino, _err);

      state->row = tsdbRowFromTSRow(TSDBROW_VERSION(&state->row), state->pTSRow);
      *ppRow = &state->row;
      --state->iRow;

      tsdbRowMergerClear(pMerger);

      TAOS_RETURN(code);
    }
  }

_err:
  clearLastFileSet(state);

  *ppRow = NULL;

  if (code) {
    tsdbError("tsdb/cache: vgId:%d, %s failed at line %d since %s.", TD_VID(pTsdb->pVnode), __func__, lino,
              tstrerror(code));
  }

  TAOS_RETURN(code);
}
3503

3504
/*
 * Backward row iterator over the three row sources of one table: mem table,
 * immutable mem table and file system. Set up by nextRowIterOpen(), advanced
 * by nextRowIterGet() and released by nextRowIterClose().
 */
typedef struct CacheNextRowIter {
  SArray           *pMemDelData;  // filled by loadMemTombData() in nextRowIterOpen()
  SArray           *pSkyline;     // TSDBKEY delete skyline, built lazily by nextRowIterGet()
  int64_t           iSkyline;     // skyline cursor, starts at the last skyline entry
  SBlockIdx         idx;          // {suid, uid} of the table being iterated
  SMemNextRowIter   memState;     // cursor of input[0]
  SMemNextRowIter   imemState;    // cursor of input[1]
  SFSNextRowIter    fsState;      // cursor of input[2]
  TSDBROW           memRow, imemRow, fsLastRow, fsRow;  // fsLastRow is not referenced by nextRowIterOpen()
  TsdbNextRowState  input[3];     // row sources: [0] mem, [1] imem, [2] file system
  SCacheRowsReader *pr;           // owning reader
  STsdb            *pTsdb;
} CacheNextRowIter;
3517

3518
int32_t clearNextRowFromFS(void *iter) {
119✔
3519
  int32_t code = 0;
119✔
3520

3521
  SFSNextRowIter *state = (SFSNextRowIter *)iter;
119✔
3522
  if (!state) {
119!
UNCOV
3523
    TAOS_RETURN(code);
×
3524
  }
3525

3526
  if (state->pLastIter) {
119!
UNCOV
3527
    code = lastIterClose(&state->pLastIter);
×
UNCOV
3528
    if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
3529
      tsdbError("%s close last iter failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
UNCOV
3530
      TAOS_RETURN(code);
×
3531
    }
3532
  }
3533

3534
  if (state->pBlockData) {
119✔
3535
    tBlockDataDestroy(state->pBlockData);
2✔
3536
    state->pBlockData = NULL;
2✔
3537
  }
3538

3539
  if (state->pBrinBlock) {
119✔
3540
    tBrinBlockDestroy(state->pBrinBlock);
2✔
3541
    state->pBrinBlock = NULL;
2✔
3542
  }
3543

3544
  if (state->pIndexList) {
119✔
3545
    taosArrayDestroy(state->pIndexList);
16✔
3546
    state->pIndexList = NULL;
16✔
3547
  }
3548

3549
  if (state->pTSRow) {
119!
UNCOV
3550
    taosMemoryFree(state->pTSRow);
×
UNCOV
3551
    state->pTSRow = NULL;
×
3552
  }
3553

3554
  if (state->pRowIter->pSkyline) {
119✔
3555
    taosArrayDestroy(state->pRowIter->pSkyline);
83✔
3556
    state->pRowIter->pSkyline = NULL;
83✔
3557
  }
3558

3559
  TAOS_RETURN(code);
119✔
3560
}
3561

3562
/*
 * Releases the resources bound to the file set the cursor is leaving: the stt
 * iterator, the block data, the data file reader (together with the reader's
 * current file set marker), the merged row and the delete skyline. When a
 * skyline existed, the tomb data cached for every table in pr->pTableMap is
 * dropped as well.
 *
 * A failure to close the stt iterator is logged and the cleanup continues.
 * Returning early would leave the file reader open and pCurFileSet stale, so
 * the reader would be overwritten (and leaked) when the next file set is opened.
 */
static void clearLastFileSet(SFSNextRowIter *state) {
  if (state->pLastIter) {
    int code = lastIterClose(&state->pLastIter);
    if (code != TSDB_CODE_SUCCESS) {
      tsdbError("%s close last iter failed at line %d since %s", __func__, __LINE__, tstrerror(code));
    }
  }

  if (state->pBlockData) {
    tBlockDataDestroy(state->pBlockData);
    state->pBlockData = NULL;
  }

  if (state->pr->pFileReader) {
    tsdbDataFileReaderClose(&state->pr->pFileReader);
    state->pr->pFileReader = NULL;

    state->pr->pCurFileSet = NULL;
  }

  if (state->pTSRow) {
    taosMemoryFree(state->pTSRow);
    state->pTSRow = NULL;
  }

  if (state->pRowIter->pSkyline) {
    taosArrayDestroy(state->pRowIter->pSkyline);
    state->pRowIter->pSkyline = NULL;

    // the skyline was derived from the tomb data, so drop the cached tomb data with it
    void   *pe = NULL;
    int32_t iter = 0;
    while ((pe = tSimpleHashIterate(state->pr->pTableMap, pe, &iter)) != NULL) {
      STableLoadInfo *pInfo = *(STableLoadInfo **)pe;
      taosArrayDestroy(pInfo->pTombData);
      pInfo->pTombData = NULL;
    }
  }
}
3601

3602
static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTsdb, STSchema *pTSchema, tb_uid_t suid,
119✔
3603
                               SArray *pLDataIterArray, STsdbReadSnap *pReadSnap, int64_t lastTs,
3604
                               SCacheRowsReader *pr) {
3605
  int32_t code = 0, lino = 0;
119✔
3606

3607
  STbData *pMem = NULL;
119✔
3608
  if (pReadSnap->pMem) {
119✔
3609
    pMem = tsdbGetTbDataFromMemTable(pReadSnap->pMem, suid, uid);
118✔
3610
  }
3611

3612
  STbData *pIMem = NULL;
120✔
3613
  if (pReadSnap->pIMem) {
120!
UNCOV
3614
    pIMem = tsdbGetTbDataFromMemTable(pReadSnap->pIMem, suid, uid);
×
3615
  }
3616

3617
  pIter->pTsdb = pTsdb;
120✔
3618

3619
  pIter->pMemDelData = NULL;
120✔
3620

3621
  TAOS_CHECK_GOTO(loadMemTombData(&pIter->pMemDelData, pMem, pIMem, pr->info.verRange.maxVer), &lino, _err);
120!
3622

3623
  pIter->idx = (SBlockIdx){.suid = suid, .uid = uid};
118✔
3624

3625
  pIter->fsState.pRowIter = pIter;
118✔
3626
  pIter->fsState.state = SFSNEXTROW_FS;
118✔
3627
  pIter->fsState.aDFileSet = pReadSnap->pfSetArray;
118✔
3628
  pIter->fsState.pBlockIdxExp = &pIter->idx;
118✔
3629
  pIter->fsState.pTSchema = pTSchema;
118✔
3630
  pIter->fsState.suid = suid;
118✔
3631
  pIter->fsState.uid = uid;
118✔
3632
  pIter->fsState.lastTs = lastTs;
118✔
3633
  pIter->fsState.pr = pr;
118✔
3634

3635
  pIter->input[0] = (TsdbNextRowState){&pIter->memRow, true, false, false, &pIter->memState, getNextRowFromMem, NULL};
118✔
3636
  pIter->input[1] = (TsdbNextRowState){&pIter->imemRow, true, false, false, &pIter->imemState, getNextRowFromMem, NULL};
118✔
3637
  pIter->input[2] =
118✔
3638
      (TsdbNextRowState){&pIter->fsRow, false, true, false, &pIter->fsState, getNextRowFromFS, clearNextRowFromFS};
118✔
3639

3640
  if (pMem) {
118✔
3641
    pIter->memState.pMem = pMem;
75✔
3642
    pIter->memState.state = SMEMNEXTROW_ENTER;
75✔
3643
    pIter->memState.lastTs = lastTs;
75✔
3644
    pIter->input[0].stop = false;
75✔
3645
    pIter->input[0].next = true;
75✔
3646
  }
3647

3648
  if (pIMem) {
118!
UNCOV
3649
    pIter->imemState.pMem = pIMem;
×
UNCOV
3650
    pIter->imemState.state = SMEMNEXTROW_ENTER;
×
UNCOV
3651
    pIter->imemState.lastTs = lastTs;
×
UNCOV
3652
    pIter->input[1].stop = false;
×
UNCOV
3653
    pIter->input[1].next = true;
×
3654
  }
3655

3656
  pIter->pr = pr;
118✔
3657

3658
_err:
118✔
3659
  TAOS_RETURN(code);
118✔
3660
}
3661

3662
/*
 * Tears down an iterator set up by nextRowIterOpen(): runs the clear callback
 * of every row source that registered one (its result is ignored), then
 * destroys the delete skyline and the in-memory tomb data.
 */
static void nextRowIterClose(CacheNextRowIter *pIter) {
  TsdbNextRowState *pInput = pIter->input;
  for (int i = 0; i < 3; ++i, ++pInput) {
    if (!pInput->nextRowClearFn) {
      continue;
    }
    (void)pInput->nextRowClearFn(pInput->iter);
  }

  if (pIter->pSkyline != NULL) {
    taosArrayDestroy(pIter->pSkyline);
  }

  if (pIter->pMemDelData != NULL) {
    taosArrayDestroy(pIter->pMemDelData);
  }
}
119✔
3677

3678
// iterate next row non deleted backward ts, version (from high to low)
3679
static int32_t nextRowIterGet(CacheNextRowIter *pIter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast,
135✔
3680
                              int16_t *aCols, int nCols) {
3681
  int32_t code = 0, lino = 0;
135✔
3682

3683
  for (;;) {
1✔
3684
    for (int i = 0; i < 3; ++i) {
544✔
3685
      if (pIter->input[i].next && !pIter->input[i].stop) {
407!
3686
        TAOS_CHECK_GOTO(pIter->input[i].nextRowFn(pIter->input[i].iter, &pIter->input[i].pRow,
210!
3687
                                                  &pIter->input[i].ignoreEarlierTs, isLast, aCols, nCols),
3688
                        &lino, _err);
3689

3690
        if (pIter->input[i].pRow == NULL) {
211✔
3691
          pIter->input[i].stop = true;
115✔
3692
          pIter->input[i].next = false;
115✔
3693
        }
3694
      }
3695
    }
3696

3697
    if (pIter->input[0].stop && pIter->input[1].stop && pIter->input[2].stop) {
137!
3698
      *ppRow = NULL;
43✔
3699
      *pIgnoreEarlierTs =
43✔
3700
          (pIter->input[0].ignoreEarlierTs || pIter->input[1].ignoreEarlierTs || pIter->input[2].ignoreEarlierTs);
43!
3701

3702
      TAOS_RETURN(code);
135✔
3703
    }
3704

3705
    // select maxpoint(s) from mem, imem, fs and last
3706
    TSDBROW *max[4] = {0};
94✔
3707
    int      iMax[4] = {-1, -1, -1, -1};
94✔
3708
    int      nMax = 0;
94✔
3709
    SRowKey  maxKey = {.ts = TSKEY_MIN};
94✔
3710

3711
    for (int i = 0; i < 3; ++i) {
373✔
3712
      if (!pIter->input[i].stop && pIter->input[i].pRow != NULL) {
279!
3713
        STsdbRowKey tsdbRowKey = {0};
97✔
3714
        tsdbRowGetKey(pIter->input[i].pRow, &tsdbRowKey);
97✔
3715

3716
        // merging & deduplicating on client side
3717
        int c = tRowKeyCompare(&maxKey, &tsdbRowKey.key);
97✔
3718
        if (c <= 0) {
97✔
3719
          if (c < 0) {
94!
3720
            nMax = 0;
94✔
3721
            maxKey = tsdbRowKey.key;
94✔
3722
          }
3723

3724
          iMax[nMax] = i;
94✔
3725
          max[nMax++] = pIter->input[i].pRow;
94✔
3726
        }
3727
        pIter->input[i].next = false;
97✔
3728
      }
3729
    }
3730

3731
    // delete detection
3732
    TSDBROW *merge[4] = {0};
94✔
3733
    int      iMerge[4] = {-1, -1, -1, -1};
94✔
3734
    int      nMerge = 0;
94✔
3735
    for (int i = 0; i < nMax; ++i) {
187✔
3736
      TSDBKEY maxKey1 = TSDBROW_KEY(max[i]);
93✔
3737

3738
      if (!pIter->pSkyline) {
93✔
3739
        pIter->pSkyline = taosArrayInit(32, sizeof(TSDBKEY));
84✔
3740
        TSDB_CHECK_NULL(pIter->pSkyline, code, lino, _err, terrno);
84!
3741

3742
        uint64_t        uid = pIter->idx.uid;
84✔
3743
        STableLoadInfo *pInfo = getTableLoadInfo(pIter->pr, uid);
84✔
3744
        TSDB_CHECK_NULL(pInfo, code, lino, _err, TSDB_CODE_OUT_OF_MEMORY);
84!
3745

3746
        if (pInfo->pTombData == NULL) {
84✔
3747
          pInfo->pTombData = taosArrayInit(4, sizeof(SDelData));
80✔
3748
          TSDB_CHECK_NULL(pInfo->pTombData, code, lino, _err, terrno);
80!
3749
        }
3750

3751
        if (!taosArrayAddAll(pInfo->pTombData, pIter->pMemDelData)) {
84!
UNCOV
3752
          TAOS_CHECK_GOTO(terrno, &lino, _err);
×
3753
        }
3754

3755
        size_t delSize = TARRAY_SIZE(pInfo->pTombData);
84✔
3756
        if (delSize > 0) {
84✔
3757
          code = tsdbBuildDeleteSkyline(pInfo->pTombData, 0, (int32_t)(delSize - 1), pIter->pSkyline);
4✔
3758
          TAOS_CHECK_GOTO(code, &lino, _err);
4!
3759
        }
3760
        pIter->iSkyline = taosArrayGetSize(pIter->pSkyline) - 1;
84✔
3761
      }
3762

3763
      bool deleted = tsdbKeyDeleted(&maxKey1, pIter->pSkyline, &pIter->iSkyline);
93✔
3764
      if (!deleted) {
93✔
3765
        iMerge[nMerge] = iMax[i];
92✔
3766
        merge[nMerge++] = max[i];
92✔
3767
      }
3768

3769
      pIter->input[iMax[i]].next = deleted;
93✔
3770
    }
3771

3772
    if (nMerge > 0) {
94✔
3773
      pIter->input[iMerge[0]].next = true;
93✔
3774

3775
      *ppRow = merge[0];
93✔
3776

3777
      TAOS_RETURN(code);
93✔
3778
    }
3779
  }
3780

UNCOV
3781
_err:
×
UNCOV
3782
  if (code) {
×
UNCOV
3783
    tsdbError("tsdb/cache: %s failed at line %d since %s.", __func__, lino, tstrerror(code));
×
3784
  }
3785

UNCOV
3786
  TAOS_RETURN(code);
×
3787
}
3788

3789
/*
 * Creates an array of `nCols` SLastCol entries, one per requested slot, each
 * initialised to a NULL column value carrying the column id and type found at
 * pTSchema->columns[slotIds[i]].
 *
 * pTSchema   - schema the slot ids refer to
 * ppColArray - receives the new array on success; left untouched on failure
 * slotIds    - schema slot index of every requested column
 * nCols      - number of entries in slotIds
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when an allocation fails. The
 * partially filled array is destroyed on failure instead of being leaked.
 */
static int32_t initLastColArrayPartial(STSchema *pTSchema, SArray **ppColArray, int16_t *slotIds, int nCols) {
  SArray *pColArray = taosArrayInit(nCols, sizeof(SLastCol));
  if (NULL == pColArray) {
    TAOS_RETURN(terrno);
  }

  for (int32_t i = 0; i < nCols; ++i) {
    int16_t  slotId = slotIds[i];
    SLastCol col = {.rowKey.ts = 0,
                    .colVal = COL_VAL_NULL(pTSchema->columns[slotId].colId, pTSchema->columns[slotId].type)};
    if (!taosArrayPush(pColArray, &col)) {
      // capture the error before the cleanup call gets a chance to change terrno
      int32_t code = terrno;
      taosArrayDestroy(pColArray);
      TAOS_RETURN(code);
    }
  }
  *ppColArray = pColArray;

  TAOS_RETURN(TSDB_CODE_SUCCESS);
}
3807

3808
/*
 * Collects, for every requested column, the most recent non-NONE/NULL value
 * of table `uid` by scanning its rows backward (mem, imem and file system).
 *
 * The first row fills every requested column. While some column still has no
 * value, older rows are read and only the columns without a value are
 * filled. Column ids that have received a value are removed from the working
 * id list passed to the row iterator.
 *
 * uid         - table to scan
 * pTsdb       - owning tsdb
 * ppLastArray - receives the SLastCol array; set to NULL on failure, and also
 *               when no row exists and the iterator reported ignoreEarlierTs;
 *               an emptied array is returned when no row exists otherwise
 * pr          - cache rows reader (schema, read snapshot, lastTs)
 * aCols       - column ids of the requested columns
 * nCols       - number of requested columns
 * slotIds     - schema slot index of every requested column
 *
 * Returns 0 on success or an error code. Every failure, including one
 * reported by nextRowIterGet(), releases both working arrays.
 */
static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols,
                            int nCols, int16_t *slotIds) {
  int32_t   code = 0, lino = 0;
  STSchema *pTSchema = pr->pSchema;
  int16_t   nLastCol = nCols;
  int16_t   noneCol = 0;          // first column index that still has no value
  bool      setNoneCol = false;   // true while at least one column still has no value
  bool      hasRow = false;
  bool      ignoreEarlierTs = false;
  SArray   *pColArray = NULL;
  SColVal  *pColVal = &(SColVal){0};

  TAOS_CHECK_RETURN(initLastColArrayPartial(pTSchema, &pColArray, slotIds, nCols));

  // working copy of the column ids; ids are removed once their value is known
  SArray *aColArray = taosArrayInit(nCols, sizeof(int16_t));
  if (NULL == aColArray) {
    code = terrno;
    taosArrayDestroy(pColArray);

    TAOS_RETURN(code);
  }

  for (int i = 0; i < nCols; ++i) {
    if (!taosArrayPush(aColArray, &aCols[i])) {
      code = terrno;
      taosArrayDestroy(pColArray);
      taosArrayDestroy(aColArray);

      TAOS_RETURN(code);
    }
  }

  // TSKEY_MAX marks "no row seen yet"
  STsdbRowKey lastRowKey = {.key.ts = TSKEY_MAX};

  // inverse iterator
  CacheNextRowIter iter = {0};
  code =
      nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->info.suid, pr->pLDataIterArray, pr->pReadSnap, pr->lastTs, pr);
  TAOS_CHECK_GOTO(code, &lino, _err);

  do {
    TSDBROW *pRow = NULL;
    code = nextRowIterGet(&iter, &pRow, &ignoreEarlierTs, true, TARRAY_DATA(aColArray), TARRAY_SIZE(aColArray));
    // an iterator failure must not be reported together with a half-built result
    TAOS_CHECK_GOTO(code, &lino, _err);

    if (!pRow) {
      break;
    }

    hasRow = true;

    int32_t sversion = TSDBROW_SVERSION(pRow);
    if (sversion != -1) {
      // decode the row with the schema version it was written with
      TAOS_CHECK_GOTO(updateTSchema(sversion, pr, uid), &lino, _err);

      pTSchema = pr->pCurrSchema;
    }

    STsdbRowKey rowKey = {0};
    tsdbRowGetKey(pRow, &rowKey);

    if (lastRowKey.key.ts == TSKEY_MAX) {  // first time
      lastRowKey = rowKey;

      for (int16_t iCol = noneCol; iCol < nCols; ++iCol) {
        if (iCol >= nLastCol) {
          break;
        }
        SLastCol *pCol = taosArrayGet(pColArray, iCol);
        if (slotIds[iCol] > pTSchema->numOfCols - 1) {
          // slot does not exist in this row's schema version
          if (!setNoneCol) {
            noneCol = iCol;
            setNoneCol = true;
          }
          continue;
        }
        if (pCol->colVal.cid != pTSchema->columns[slotIds[iCol]].colId) {
          continue;
        }
        if (slotIds[iCol] == 0) {
          // slot 0 is served from the row key timestamp instead of the row payload
          STColumn *pTColumn = &pTSchema->columns[0];
          SValue    val = {.type = pTColumn->type};
          VALUE_SET_TRIVIAL_DATUM(&val, rowKey.key.ts);
          *pColVal = COL_VAL_VALUE(pTColumn->colId, val);

          SLastCol colTmp = {.rowKey = rowKey.key, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID};
          TAOS_CHECK_GOTO(tsdbCacheReallocSLastCol(&colTmp, NULL), &lino, _err);

          taosArraySet(pColArray, 0, &colTmp);
          continue;
        }
        tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal);

        *pCol = (SLastCol){.rowKey = rowKey.key, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID};
        TAOS_CHECK_GOTO(tsdbCacheReallocSLastCol(pCol, NULL), &lino, _err);

        if (!COL_VAL_IS_VALUE(pColVal)) {
          if (!setNoneCol) {
            noneCol = iCol;
            setNoneCol = true;
          }
        } else {
          // value found: this column no longer has to be read from older rows
          int32_t aColIndex = taosArraySearchIdx(aColArray, &pColVal->cid, compareInt16Val, TD_EQ);
          if (aColIndex >= 0) {
            taosArrayRemove(aColArray, aColIndex);
          }
        }
      }
      if (!setNoneCol) {
        // done, goto return pColArray
        break;
      } else {
        continue;
      }
    }

    // merge into pColArray
    setNoneCol = false;
    for (int16_t iCol = noneCol; iCol < nCols; ++iCol) {
      if (iCol >= nLastCol) {
        break;
      }
      // high version's column value
      if (slotIds[iCol] > pTSchema->numOfCols - 1) {
        continue;
      }

      SLastCol *lastColVal = (SLastCol *)taosArrayGet(pColArray, iCol);
      if (lastColVal->colVal.cid != pTSchema->columns[slotIds[iCol]].colId) {
        continue;
      }
      SColVal *tColVal = &lastColVal->colVal;
      if (COL_VAL_IS_VALUE(tColVal)) continue;

      tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal);
      if (COL_VAL_IS_VALUE(pColVal)) {
        SLastCol lastCol = {.rowKey = rowKey.key, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID};
        TAOS_CHECK_GOTO(tsdbCacheReallocSLastCol(&lastCol, NULL), &lino, _err);

        // replace the placeholder entry with the value found in this older row
        tsdbCacheFreeSLastColItem(lastColVal);
        taosArraySet(pColArray, iCol, &lastCol);
        int32_t aColIndex = taosArraySearchIdx(aColArray, &lastCol.colVal.cid, compareInt16Val, TD_EQ);
        if (aColIndex >= 0) {
          taosArrayRemove(aColArray, aColIndex);
        }
      } else if (!COL_VAL_IS_VALUE(pColVal) && !setNoneCol) {
        noneCol = iCol;
        setNoneCol = true;
      }
    }
  } while (setNoneCol);

  if (!hasRow) {
    if (ignoreEarlierTs) {
      taosArrayDestroy(pColArray);
      pColArray = NULL;
    } else {
      taosArrayClear(pColArray);
    }
  }
  *ppLastArray = pColArray;

  nextRowIterClose(&iter);
  taosArrayDestroy(aColArray);

  TAOS_RETURN(code);

_err:
  nextRowIterClose(&iter);
  *ppLastArray = NULL;
  taosArrayDestroyEx(pColArray, tsdbCacheFreeSLastColItem);
  taosArrayDestroy(aColArray);

  if (code) {
    tsdbError("tsdb/cache: vgId:%d, %s failed at line %d since %s.", TD_VID(pTsdb->pVnode), __func__, lino,
              tstrerror(code));
  }

  TAOS_RETURN(code);
}
3986

3987
// Collect the column values of the first row yielded by the cache row iterator for table `uid`,
// restricted to the requested columns.
//
// Parameters:
//   uid         - table uid handed to nextRowIterOpen()/updateTSchema().
//   pTsdb       - tsdb instance the iterator reads from (also used for error logging).
//   ppLastArray - out: array of SLastCol, one entry per requested column. Set to NULL on the
//                 `_err` path, and also when no row was found while `ignoreEarlierTs` was set
//                 by the iterator; set to an emptied array when no row was found otherwise.
//                 Not written when one of the early allocation failures below returns.
//   pr          - reader state; supplies the schema, suid, read snapshot and lastTs.
//   aCols       - `nCols` column ids, copied into a scratch array passed to nextRowIterGet().
//   nCols       - number of requested columns.
//   slotIds     - `nCols` schema slot indexes, parallel to the entries of the result array.
//
// Returns 0 on success, otherwise the failing call's error code.
static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols,
                               int nCols, int16_t *slotIds) {
  int32_t   code = 0, lino = 0;
  STSchema *pTSchema = pr->pSchema;
  int16_t   nLastCol = nCols;
  bool      hasRow = false;
  bool      ignoreEarlierTs = false;
  SArray   *pColArray = NULL;
  SColVal  *pColVal = &(SColVal){0};

  TAOS_CHECK_RETURN(initLastColArrayPartial(pTSchema, &pColArray, slotIds, nCols));

  // Scratch list of column ids that still need a value; entries are removed as columns are filled.
  SArray *aColArray = taosArrayInit(nCols, sizeof(int16_t));
  if (NULL == aColArray) {
    code = terrno;
    taosArrayDestroy(pColArray);

    TAOS_RETURN(code);
  }

  for (int i = 0; i < nCols; ++i) {
    if (!taosArrayPush(aColArray, &aCols[i])) {
      // Capture the error first, then release BOTH arrays (the scratch array used to leak here).
      code = terrno;
      taosArrayDestroy(aColArray);
      taosArrayDestroy(pColArray);

      TAOS_RETURN(code);
    }
  }

  // inverse iterator
  CacheNextRowIter iter = {0};
  code =
      nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->info.suid, pr->pLDataIterArray, pr->pReadSnap, pr->lastTs, pr);
  TAOS_CHECK_GOTO(code, &lino, _err);

  // Single pass: only the first row returned by the iterator is consumed.
  do {
    TSDBROW *pRow = NULL;
    // NOTE(review): the result is stored in `code` but not checked before `pRow` is inspected;
    // with no row it is returned as-is below - confirm whether a failure should jump to `_err`.
    code = nextRowIterGet(&iter, &pRow, &ignoreEarlierTs, false, TARRAY_DATA(aColArray), TARRAY_SIZE(aColArray));

    if (!pRow) {
      break;
    }

    hasRow = true;

    // Switch to the schema version the row was written with, when the row carries one.
    int32_t sversion = TSDBROW_SVERSION(pRow);
    if (sversion != -1) {
      TAOS_CHECK_GOTO(updateTSchema(sversion, pr, uid), &lino, _err);

      pTSchema = pr->pCurrSchema;
    }

    STsdbRowKey rowKey = {0};
    tsdbRowGetKey(pRow, &rowKey);

    for (int16_t iCol = 0; iCol < nCols; ++iCol) {
      if (iCol >= nLastCol) {
        break;
      }
      SLastCol *pCol = taosArrayGet(pColArray, iCol);
      // Skip slots that do not exist in, or do not match, the row's schema version.
      if (slotIds[iCol] > pTSchema->numOfCols - 1) {
        continue;
      }
      if (pCol->colVal.cid != pTSchema->columns[slotIds[iCol]].colId) {
        continue;
      }
      if (slotIds[iCol] == 0) {
        // Slot 0 is filled from the row key's timestamp rather than from the row payload.
        STColumn *pTColumn = &pTSchema->columns[0];
        SValue    val = {.type = pTColumn->type};
        VALUE_SET_TRIVIAL_DATUM(&val, rowKey.key.ts);
        *pColVal = COL_VAL_VALUE(pTColumn->colId, val);

        SLastCol colTmp = {.rowKey = rowKey.key, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID};
        TAOS_CHECK_GOTO(tsdbCacheReallocSLastCol(&colTmp, NULL), &lino, _err);

        // NOTE(review): stored at index 0, not iCol - presumably slot 0 is always requested first; confirm.
        taosArraySet(pColArray, 0, &colTmp);
        continue;
      }
      tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal);

      *pCol = (SLastCol){.rowKey = rowKey.key, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID};
      TAOS_CHECK_GOTO(tsdbCacheReallocSLastCol(pCol, NULL), &lino, _err);

      // This column now has a value; drop it from the pending list.
      int32_t aColIndex = taosArraySearchIdx(aColArray, &pColVal->cid, compareInt16Val, TD_EQ);
      if (aColIndex >= 0) {
        taosArrayRemove(aColArray, aColIndex);
      }
    }

    break;
  } while (1);

  if (!hasRow) {
    if (ignoreEarlierTs) {
      taosArrayDestroy(pColArray);
      pColArray = NULL;
    } else {
      taosArrayClear(pColArray);
    }
  }
  *ppLastArray = pColArray;

  nextRowIterClose(&iter);
  taosArrayDestroy(aColArray);

  TAOS_RETURN(code);

_err:
  nextRowIterClose(&iter);

  *ppLastArray = NULL;
  taosArrayDestroyEx(pColArray, tsdbCacheFreeSLastColItem);
  taosArrayDestroy(aColArray);

  if (code) {
    tsdbError("tsdb/cache: vgId:%d, %s failed at line %d since %s.", TD_VID(pTsdb->pVnode), __func__, lino,
              tstrerror(code));
  }

  TAOS_RETURN(code);
}
4109

4110
// Release handle `h` on `pCache` by forwarding to tsdbLRUCacheRelease() with `false` as its last argument.
void tsdbCacheRelease(SLRUCache *pCache, LRUHandle *h) {
  tsdbLRUCacheRelease(pCache, h, false);
}
×
4111

4112
// Apply `capacity` to the LRU cache owned by the vnode's tsdb.
// The vnode's tsdb pointer is dereferenced without a NULL check.
void tsdbCacheSetCapacity(SVnode *pVnode, size_t capacity) {
  STsdb *pTsdb = pVnode->pTsdb;

  taosLRUCacheSetCapacity(pTsdb->lruCache, capacity);
}
224✔
4115

4116
#ifdef BUILD_NO_CALL
4117
// Report the capacity of the LRU cache owned by the vnode's tsdb (compiled only with BUILD_NO_CALL).
size_t tsdbCacheGetCapacity(SVnode *pVnode) {
  return taosLRUCacheGetCapacity(pVnode->pTsdb->lruCache);
}
4118
#endif
4119

4120
// Return the usage reported by the tsdb LRU cache, or 0 when the vnode has no tsdb attached.
size_t tsdbCacheGetUsage(SVnode *pVnode) {
  if (pVnode->pTsdb == NULL) {
    return 0;
  }

  return taosLRUCacheGetUsage(pVnode->pTsdb->lruCache);
}
4128

4129
// Return the element count reported by the tsdb LRU cache, or 0 when the vnode has no tsdb attached.
int32_t tsdbCacheGetElems(SVnode *pVnode) {
  if (pVnode->pTsdb == NULL) {
    return 0;
  }

  return taosLRUCacheGetElems(pVnode->pTsdb->lruCache);
}
4137

4138
#ifdef USE_S3
4139
// block cache
4140
// Serialize (fid, commitID, blkno) into `key` and store the key length in `*len`.
//
// The key keeps the in-memory layout of a struct {int32_t; int64_t; int64_t}, so its length
// and field offsets are unchanged. Because the key is compared as raw bytes, every byte must
// be deterministic: the destination is zeroed first and each field is copied to its offset,
// so padding bytes between fields are always 0 instead of depending on how the compiler
// initializes struct padding.
//
// `key` must have room for at least sizeof(the struct) bytes (callers in this file pass 128).
static void getBCacheKey(int32_t fid, int64_t commitID, int64_t blkno, char *key, int *len) {
  // Used only to obtain the size and field offsets; its contents are never read.
  struct {
    int32_t fid;
    int64_t commitID;
    int64_t blkno;
  } layout;
  const char *base = (const char *)&layout;

  memset(key, 0, sizeof(layout));
  memcpy(key + ((const char *)&layout.fid - base), &fid, sizeof(fid));
  memcpy(key + ((const char *)&layout.commitID - base), &commitID, sizeof(commitID));
  memcpy(key + ((const char *)&layout.blkno - base), &blkno, sizeof(blkno));

  *len = (int)sizeof(layout);
}
×
4154

UNCOV
4155
// Fetch the cache block that `pFD->blkno` refers to from object storage.
//
// The byte offset is (blkno - 1) * tsS3BlockSize * szPage and the requested length is
// tsS3BlockSize * szPage. On success the buffer produced by tcsGetObjectBlock() is stored
// in `*ppBlock`; on failure the macro returns that call's error code directly.
static int32_t tsdbCacheLoadBlockS3(STsdbFD *pFD, uint8_t **ppBlock) {
  int32_t code = 0;
  int64_t offset = (pFD->blkno - 1) * tsS3BlockSize * pFD->szPage;

  TAOS_CHECK_RETURN(tcsGetObjectBlock(pFD->objName, offset, tsS3BlockSize * pFD->szPage, 0, ppBlock));

  tsdbTrace("block:%p load from s3", *ppBlock);

  return code;
}
4167

UNCOV
4168
// LRU deleter for block/page entries: frees the cached buffer. Key, key length and user data are unused.
static void deleteBCache(const void *key, size_t keyLen, void *value, void *ud) {
  (void)ud;

  uint8_t *pBuf = (uint8_t *)value;
  taosMemoryFree(pBuf);
}
×
4174

4175
// Look up the block identified by (pFD->fid, pFD->cid, pFD->blkno) in `pCache`, loading it
// through tsdbCacheLoadBlockS3() and inserting it on a miss.
//
// The first lookup runs without the lock. On a miss, `bMutex` is taken and the lookup is
// repeated before loading, so the load+insert happens under the mutex.
//
// `*handle` receives the cache handle, or NULL when loading failed. Returns 0 on success,
// the loader's error code on failure, or TSDB_CODE_OUT_OF_MEMORY when the loader reported
// success but produced no buffer. The status of the cache insert is intentionally ignored.
int32_t tsdbCacheGetBlockS3(SLRUCache *pCache, STsdbFD *pFD, LRUHandle **handle) {
  int32_t code = 0;
  char    cacheKey[128] = {0};
  int     cacheKeyLen = 0;

  getBCacheKey(pFD->fid, pFD->cid, pFD->blkno, cacheKey, &cacheKeyLen);

  LRUHandle *pEntry = taosLRUCacheLookup(pCache, cacheKey, cacheKeyLen);
  if (pEntry != NULL) {
    // Hit on the unlocked lookup.
    *handle = pEntry;

    TAOS_RETURN(code);
  }

  STsdb *pOwner = pFD->pTsdb;
  (void)taosThreadMutexLock(&pOwner->bMutex);

  // Look again now that the mutex is held.
  pEntry = taosLRUCacheLookup(pCache, cacheKey, cacheKeyLen);
  if (pEntry == NULL) {
    uint8_t *pData = NULL;

    code = tsdbCacheLoadBlockS3(pFD, &pData);
    if (code != TSDB_CODE_SUCCESS || pData == NULL) {
      (void)taosThreadMutexUnlock(&pOwner->bMutex);

      *handle = NULL;
      if (code == TSDB_CODE_SUCCESS) {
        // The loader succeeded but returned no buffer.
        code = TSDB_CODE_OUT_OF_MEMORY;
      }

      TAOS_RETURN(code);
    }

    size_t blockBytes = tsS3BlockSize * pFD->szPage;
    (void)taosLRUCacheInsert(pCache, cacheKey, cacheKeyLen, pData, blockBytes, deleteBCache, NULL, &pEntry,
                             TAOS_LRU_PRIORITY_LOW, NULL);
  }

  (void)taosThreadMutexUnlock(&pOwner->bMutex);

  *handle = pEntry;

  TAOS_RETURN(code);
}
4218

UNCOV
4219
// Look up page `pgno` of the file (pFD->fid, pFD->cid) in `pCache`.
// `*handle` receives the lookup result (NULL on a miss). Always returns 0.
int32_t tsdbCacheGetPageS3(SLRUCache *pCache, STsdbFD *pFD, int64_t pgno, LRUHandle **handle) {
  char pageKey[128] = {0};
  int  pageKeyLen = 0;

  getBCacheKey(pFD->fid, pFD->cid, pgno, pageKey, &pageKeyLen);

  LRUHandle *pFound = taosLRUCacheLookup(pCache, pageKey, pageKeyLen);
  *handle = pFound;

  return 0;
}
4229

UNCOV
4230
// Cache a private copy of page `pgno` of the file (pFD->fid, pFD->cid).
//
// Under `pgMutex`, the page is looked up in pFD->pTsdb->pgCache; if absent, `pFD->szPage`
// bytes are copied from `pPage` into a new buffer that is inserted into `pCache`. The handle
// obtained from the lookup or the insert is released before returning.
//
// Caching is best effort: an allocation failure or a failed insert is not reported.
// NOTE(review): the lookup/release use pFD->pTsdb->pgCache while the insert uses `pCache` -
// presumably callers pass the same cache; confirm.
void tsdbCacheSetPageS3(SLRUCache *pCache, STsdbFD *pFD, int64_t pgno, uint8_t *pPage) {
  char       key[128] = {0};
  int        keyLen = 0;
  LRUHandle *handle = NULL;

  getBCacheKey(pFD->fid, pFD->cid, pgno, key, &keyLen);
  (void)taosThreadMutexLock(&pFD->pTsdb->pgMutex);
  handle = taosLRUCacheLookup(pFD->pTsdb->pgCache, key, keyLen);
  if (!handle) {
    size_t              charge = pFD->szPage;
    _taos_lru_deleter_t deleter = deleteBCache;
    uint8_t            *pPg = taosMemoryMalloc(charge);
    if (!pPg) {
      // Ignore the error with the s3 cache, but never leave with the mutex held:
      // returning while locked would block every later caller on pgMutex.
      (void)taosThreadMutexUnlock(&pFD->pTsdb->pgMutex);
      return;
    }
    memcpy(pPg, pPage, charge);

    // A failed insert is ignored on purpose (cache update is best effort).
    (void)taosLRUCacheInsert(pCache, key, keyLen, pPg, charge, deleter, NULL, &handle, TAOS_LRU_PRIORITY_LOW, NULL);
  }
  (void)taosThreadMutexUnlock(&pFD->pTsdb->pgMutex);

  tsdbCacheRelease(pFD->pTsdb->pgCache, handle);
}
4258
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc