• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3653

14 Mar 2025 08:10AM UTC coverage: 22.565% (-41.0%) from 63.596%
#3653

push

travis-ci

web-flow
feat(keep): support keep on super table level. (#30097)

* Feat: support use keep while create super table.

* Test(keep): add test for create super table with keep option.

* Feat(keep): Add tmsg for create keep.

* Feat(keep): support alter table option keep.

* Fix(keep): Add baisc test for alter table option.

* Fix(keep): memory leek.

* Feat(keep): add keep to metaEntry&metaCache and fix earliestTs with stn keep.

* Test(keep): add some cases for select with stb keep.

* Fix: fix ci core while alter stb.

* Feat(keep): delete expired data in super table level.

* Feat: remove get stb keep while query.

* Fix : build error.

* Revert "Fix : build error."

This reverts commit 0ed66e4e8.

* Revert "Feat(keep): delete expired data in super table level."

This reverts commit 36330f6b4.

* Fix : build errors.

* Feat : support restart taosd.

* Fix : alter table comment problems.

* Test : add tests for super table keep.

* Fix: change sdb stb reserve size.

* Test: add more tests.

* Feat: Disable normal tables and sub tables from setting the keep parameter

* Fix: add more checks to avoid unknown address.

* Docs: Add docs for stable keep.

* Fix: some review changes.

* Fix: review errors.

49248 of 302527 branches covered (16.28%)

Branch coverage included in aggregate %.

53 of 99 new or added lines in 12 files covered. (53.54%)

155872 existing lines in 443 files now uncovered.

87359 of 302857 relevant lines covered (28.84%)

570004.22 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

4.39
/source/dnode/vnode/src/tsdb/tsdbCache.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#include "functionMgt.h"
16
#include "tcs.h"
17
#include "tsdb.h"
18
#include "tsdbDataFileRW.h"
19
#include "tsdbIter.h"
20
#include "tsdbReadUtil.h"
21
#include "vnd.h"
22

23
#define ROCKS_BATCH_SIZE (4096)
24

UNCOV
25
void tsdbLRUCacheRelease(SLRUCache *cache, LRUHandle *handle, bool eraseIfLastRef) {
×
UNCOV
26
  if (!taosLRUCacheRelease(cache, handle, eraseIfLastRef)) {
×
UNCOV
27
    tsdbTrace(" release lru cache failed");
×
28
  }
UNCOV
29
}
×
30

31
static int32_t tsdbOpenBCache(STsdb *pTsdb) {
20✔
32
  int32_t code = 0, lino = 0;
20✔
33
#ifdef USE_S3
34
  int32_t    szPage = pTsdb->pVnode->config.tsdbPageSize;
20✔
35
  int64_t    szBlock = tsS3BlockSize <= 1024 ? 1024 : tsS3BlockSize;
20✔
36
  SLRUCache *pCache = taosLRUCacheInit((int64_t)tsS3BlockCacheSize * szBlock * szPage, 0, .5);
20✔
37
  if (pCache == NULL) {
20!
UNCOV
38
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err);
×
39
  }
40

41
  taosLRUCacheSetStrictCapacity(pCache, false);
20✔
42

43
  (void)taosThreadMutexInit(&pTsdb->bMutex, NULL);
20✔
44

45
  pTsdb->bCache = pCache;
20✔
46

47
_err:
20✔
48
  if (code) {
20!
UNCOV
49
    tsdbError("tsdb/bcache: vgId:%d, %s failed at line %d since %s.", TD_VID(pTsdb->pVnode), __func__, lino,
×
50
              tstrerror(code));
51
  }
52
#endif
53
  TAOS_RETURN(code);
20✔
54
}
55

56
static void tsdbCloseBCache(STsdb *pTsdb) {
20✔
57
#ifdef USE_S3
58
  SLRUCache *pCache = pTsdb->bCache;
20✔
59
  if (pCache) {
20!
60
    int32_t elems = taosLRUCacheGetElems(pCache);
20✔
61
    tsdbTrace("vgId:%d, elems: %d", TD_VID(pTsdb->pVnode), elems);
20!
62
    taosLRUCacheEraseUnrefEntries(pCache);
20✔
63
    elems = taosLRUCacheGetElems(pCache);
20✔
64
    tsdbTrace("vgId:%d, elems: %d", TD_VID(pTsdb->pVnode), elems);
20!
65

66
    taosLRUCacheCleanup(pCache);
20✔
67

68
    (void)taosThreadMutexDestroy(&pTsdb->bMutex);
20✔
69
  }
70
#endif
71
}
20✔
72

73
static int32_t tsdbOpenPgCache(STsdb *pTsdb) {
20✔
74
  int32_t code = 0, lino = 0;
20✔
75
#ifdef USE_S3
76
  int32_t szPage = pTsdb->pVnode->config.tsdbPageSize;
20✔
77

78
  SLRUCache *pCache = taosLRUCacheInit((int64_t)tsS3PageCacheSize * szPage, 0, .5);
20✔
79
  if (pCache == NULL) {
20!
UNCOV
80
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err);
×
81
  }
82

83
  taosLRUCacheSetStrictCapacity(pCache, false);
20✔
84

85
  (void)taosThreadMutexInit(&pTsdb->pgMutex, NULL);
20✔
86

87
  pTsdb->pgCache = pCache;
20✔
88

89
_err:
20✔
90
  if (code) {
20!
UNCOV
91
    tsdbError("tsdb/pgcache: vgId:%d, open failed at line %d since %s.", TD_VID(pTsdb->pVnode), lino, tstrerror(code));
×
92
  }
93
#endif
94
  TAOS_RETURN(code);
20✔
95
}
96

97
static void tsdbClosePgCache(STsdb *pTsdb) {
20✔
98
#ifdef USE_S3
99
  SLRUCache *pCache = pTsdb->pgCache;
20✔
100
  if (pCache) {
20!
101
    int32_t elems = taosLRUCacheGetElems(pCache);
20✔
102
    tsdbTrace("vgId:%d, elems: %d", TD_VID(pTsdb->pVnode), elems);
20!
103
    taosLRUCacheEraseUnrefEntries(pCache);
20✔
104
    elems = taosLRUCacheGetElems(pCache);
20✔
105
    tsdbTrace("vgId:%d, elems: %d", TD_VID(pTsdb->pVnode), elems);
20!
106

107
    taosLRUCacheCleanup(pCache);
20✔
108

109
    (void)taosThreadMutexDestroy(&pTsdb->bMutex);
20✔
110
  }
111
#endif
112
}
20✔
113

114
#define ROCKS_KEY_LEN (sizeof(tb_uid_t) + sizeof(int16_t) + sizeof(int8_t))
115

116
enum {
117
  LFLAG_LAST_ROW = 0,
118
  LFLAG_LAST = 1,
119
};
120

121
typedef struct {
122
  tb_uid_t uid;
123
  int16_t  cid;
124
  int8_t   lflag;
125
} SLastKey;
126

127
#define IS_LAST_ROW_KEY(k) (((k).lflag & LFLAG_LAST) == LFLAG_LAST_ROW)
128
#define IS_LAST_KEY(k)     (((k).lflag & LFLAG_LAST) == LFLAG_LAST)
129

130
static void tsdbGetRocksPath(STsdb *pTsdb, char *path) {
20✔
131
  SVnode *pVnode = pTsdb->pVnode;
20✔
132
  vnodeGetPrimaryDir(pTsdb->path, pVnode->diskPrimary, pVnode->pTfs, path, TSDB_FILENAME_LEN);
20✔
133

134
  int32_t offset = strlen(path);
20✔
135
  snprintf(path + offset, TSDB_FILENAME_LEN - offset - 1, "%scache.rdb", TD_DIRSEP);
20✔
136
}
20✔
137

138
static const char *myCmpName(void *state) {
100✔
139
  (void)state;
140
  return "myCmp";
100✔
141
}
142

143
static void myCmpDestroy(void *state) { (void)state; }
20✔
144

UNCOV
145
static int myCmp(void *state, const char *a, size_t alen, const char *b, size_t blen) {
×
146
  (void)state;
147
  (void)alen;
148
  (void)blen;
UNCOV
149
  SLastKey *lhs = (SLastKey *)a;
×
UNCOV
150
  SLastKey *rhs = (SLastKey *)b;
×
151

UNCOV
152
  if (lhs->uid < rhs->uid) {
×
UNCOV
153
    return -1;
×
UNCOV
154
  } else if (lhs->uid > rhs->uid) {
×
UNCOV
155
    return 1;
×
156
  }
157

UNCOV
158
  if (lhs->cid < rhs->cid) {
×
UNCOV
159
    return -1;
×
UNCOV
160
  } else if (lhs->cid > rhs->cid) {
×
UNCOV
161
    return 1;
×
162
  }
163

UNCOV
164
  if ((lhs->lflag & LFLAG_LAST) < (rhs->lflag & LFLAG_LAST)) {
×
UNCOV
165
    return -1;
×
UNCOV
166
  } else if ((lhs->lflag & LFLAG_LAST) > (rhs->lflag & LFLAG_LAST)) {
×
UNCOV
167
    return 1;
×
168
  }
169

UNCOV
170
  return 0;
×
171
}
172

173
static int32_t tsdbOpenRocksCache(STsdb *pTsdb) {
20✔
174
  int32_t code = 0, lino = 0;
20✔
175
#ifdef USE_ROCKSDB
176
  rocksdb_comparator_t *cmp = rocksdb_comparator_create(NULL, myCmpDestroy, myCmp, myCmpName);
20✔
177
  if (NULL == cmp) {
20!
UNCOV
178
    TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
×
179
  }
180

181
  rocksdb_block_based_table_options_t *tableoptions = rocksdb_block_based_options_create();
20✔
182
  pTsdb->rCache.tableoptions = tableoptions;
20✔
183

184
  rocksdb_options_t *options = rocksdb_options_create();
20✔
185
  if (NULL == options) {
20!
UNCOV
186
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err);
×
187
  }
188

189
  rocksdb_options_set_create_if_missing(options, 1);
20✔
190
  rocksdb_options_set_comparator(options, cmp);
20✔
191
  rocksdb_options_set_block_based_table_factory(options, tableoptions);
20✔
192
  rocksdb_options_set_info_log_level(options, 2);  // WARN_LEVEL
20✔
193
  // rocksdb_options_set_inplace_update_support(options, 1);
194
  // rocksdb_options_set_allow_concurrent_memtable_write(options, 0);
195

196
  rocksdb_writeoptions_t *writeoptions = rocksdb_writeoptions_create();
20✔
197
  if (NULL == writeoptions) {
20!
198
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err2);
×
199
  }
200
  rocksdb_writeoptions_disable_WAL(writeoptions, 1);
20✔
201

202
  rocksdb_readoptions_t *readoptions = rocksdb_readoptions_create();
20✔
203
  if (NULL == readoptions) {
20!
UNCOV
204
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err3);
×
205
  }
206

207
  char *err = NULL;
20✔
208
  char  cachePath[TSDB_FILENAME_LEN] = {0};
20✔
209
  tsdbGetRocksPath(pTsdb, cachePath);
20✔
210

211
  rocksdb_t *db = rocksdb_open(options, cachePath, &err);
19✔
212
  if (NULL == db) {
20!
UNCOV
213
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err);
×
UNCOV
214
    rocksdb_free(err);
×
215

UNCOV
216
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err4);
×
217
  }
218

219
  rocksdb_flushoptions_t *flushoptions = rocksdb_flushoptions_create();
20✔
220
  if (NULL == flushoptions) {
20!
UNCOV
221
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err5);
×
222
  }
223

224
  rocksdb_writebatch_t *writebatch = rocksdb_writebatch_create();
20✔
225

226
  TAOS_CHECK_GOTO(taosThreadMutexInit(&pTsdb->rCache.writeBatchMutex, NULL), &lino, _err6);
20!
227

228
  pTsdb->rCache.writebatch = writebatch;
20✔
229
  pTsdb->rCache.my_comparator = cmp;
20✔
230
  pTsdb->rCache.options = options;
20✔
231
  pTsdb->rCache.writeoptions = writeoptions;
20✔
232
  pTsdb->rCache.readoptions = readoptions;
20✔
233
  pTsdb->rCache.flushoptions = flushoptions;
20✔
234
  pTsdb->rCache.db = db;
20✔
235
  pTsdb->rCache.sver = -1;
20✔
236
  pTsdb->rCache.suid = -1;
20✔
237
  pTsdb->rCache.uid = -1;
20✔
238
  pTsdb->rCache.pTSchema = NULL;
20✔
239
  pTsdb->rCache.ctxArray = taosArrayInit(16, sizeof(SLastUpdateCtx));
20✔
240
  if (!pTsdb->rCache.ctxArray) {
20!
241
    TAOS_CHECK_GOTO(terrno, &lino, _err7);
×
242
  }
243

244
  TAOS_RETURN(code);
20✔
245

246
_err7:
×
247
  (void)taosThreadMutexDestroy(&pTsdb->rCache.writeBatchMutex);
×
248
_err6:
×
249
  rocksdb_writebatch_destroy(writebatch);
×
250
_err5:
×
251
  rocksdb_close(pTsdb->rCache.db);
×
252
_err4:
×
253
  rocksdb_readoptions_destroy(readoptions);
×
254
_err3:
×
UNCOV
255
  rocksdb_writeoptions_destroy(writeoptions);
×
256
_err2:
×
UNCOV
257
  rocksdb_options_destroy(options);
×
UNCOV
258
  rocksdb_block_based_options_destroy(tableoptions);
×
UNCOV
259
_err:
×
UNCOV
260
  rocksdb_comparator_destroy(cmp);
×
261
#endif
UNCOV
262
  TAOS_RETURN(code);
×
263
}
264

265
static void tsdbCloseRocksCache(STsdb *pTsdb) {
20✔
266
#ifdef USE_ROCKSDB
267
  rocksdb_close(pTsdb->rCache.db);
20✔
268
  (void)taosThreadMutexDestroy(&pTsdb->rCache.writeBatchMutex);
20✔
269
  rocksdb_flushoptions_destroy(pTsdb->rCache.flushoptions);
20✔
270
  rocksdb_writebatch_destroy(pTsdb->rCache.writebatch);
20✔
271
  rocksdb_readoptions_destroy(pTsdb->rCache.readoptions);
20✔
272
  rocksdb_writeoptions_destroy(pTsdb->rCache.writeoptions);
20✔
273
  rocksdb_options_destroy(pTsdb->rCache.options);
20✔
274
  rocksdb_block_based_options_destroy(pTsdb->rCache.tableoptions);
20✔
275
  rocksdb_comparator_destroy(pTsdb->rCache.my_comparator);
20✔
276
  taosMemoryFree(pTsdb->rCache.pTSchema);
20!
277
  taosArrayDestroy(pTsdb->rCache.ctxArray);
20✔
278
#endif
279
}
20✔
280

281
static void rocksMayWrite(STsdb *pTsdb, bool force) {
8✔
282
#ifdef USE_ROCKSDB
283
  rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
8✔
284

285
  int count = rocksdb_writebatch_count(wb);
8✔
286
  if ((force && count > 0) || count >= ROCKS_BATCH_SIZE) {
8!
UNCOV
287
    char *err = NULL;
×
288

UNCOV
289
    rocksdb_write(pTsdb->rCache.db, pTsdb->rCache.writeoptions, wb, &err);
×
UNCOV
290
    if (NULL != err) {
×
UNCOV
291
      tsdbError("vgId:%d, %s failed at line %d, count: %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, count,
×
292
                err);
UNCOV
293
      rocksdb_free(err);
×
294
    }
295

UNCOV
296
    rocksdb_writebatch_clear(wb);
×
297
  }
298
#endif
299
}
8✔
300

301
typedef struct {
302
  TSKEY  ts;
303
  int8_t dirty;
304
  struct {
305
    int16_t cid;
306
    int8_t  type;
307
    int8_t  flag;
308
    union {
309
      int64_t val;
310
      struct {
311
        uint32_t nData;
312
        uint8_t *pData;
313
      };
314
    } value;
315
  } colVal;
316
} SLastColV0;
317

UNCOV
318
static int32_t tsdbCacheDeserializeV0(char const *value, SLastCol *pLastCol) {
×
UNCOV
319
  SLastColV0 *pLastColV0 = (SLastColV0 *)value;
×
320

UNCOV
321
  pLastCol->rowKey.ts = pLastColV0->ts;
×
UNCOV
322
  pLastCol->rowKey.numOfPKs = 0;
×
UNCOV
323
  pLastCol->dirty = pLastColV0->dirty;
×
UNCOV
324
  pLastCol->colVal.cid = pLastColV0->colVal.cid;
×
UNCOV
325
  pLastCol->colVal.flag = pLastColV0->colVal.flag;
×
UNCOV
326
  pLastCol->colVal.value.type = pLastColV0->colVal.type;
×
327

UNCOV
328
  pLastCol->cacheStatus = TSDB_LAST_CACHE_VALID;
×
329

UNCOV
330
  if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) {
×
UNCOV
331
    pLastCol->colVal.value.nData = pLastColV0->colVal.value.nData;
×
UNCOV
332
    pLastCol->colVal.value.pData = NULL;
×
UNCOV
333
    if (pLastCol->colVal.value.nData > 0) {
×
UNCOV
334
      pLastCol->colVal.value.pData = (uint8_t *)(&pLastColV0[1]);
×
335
    }
UNCOV
336
    return sizeof(SLastColV0) + pLastColV0->colVal.value.nData;
×
337
  } else {
UNCOV
338
    pLastCol->colVal.value.val = pLastColV0->colVal.value.val;
×
UNCOV
339
    return sizeof(SLastColV0);
×
340
  }
341
}
342

UNCOV
343
static int32_t tsdbCacheDeserialize(char const *value, size_t size, SLastCol **ppLastCol) {
×
UNCOV
344
  if (!value) {
×
UNCOV
345
    return TSDB_CODE_INVALID_PARA;
×
346
  }
347

348
  SLastCol *pLastCol = taosMemoryCalloc(1, sizeof(SLastCol));
×
UNCOV
349
  if (NULL == pLastCol) {
×
350
    return terrno;
×
351
  }
352

UNCOV
353
  int32_t offset = tsdbCacheDeserializeV0(value, pLastCol);
×
UNCOV
354
  if (offset == size) {
×
355
    // version 0
UNCOV
356
    *ppLastCol = pLastCol;
×
357

UNCOV
358
    TAOS_RETURN(TSDB_CODE_SUCCESS);
×
UNCOV
359
  } else if (offset > size) {
×
UNCOV
360
    taosMemoryFreeClear(pLastCol);
×
361

UNCOV
362
    TAOS_RETURN(TSDB_CODE_INVALID_DATA_FMT);
×
363
  }
364

365
  // version
366
  int8_t version = *(int8_t *)(value + offset);
×
UNCOV
367
  offset += sizeof(int8_t);
×
368

369
  // numOfPKs
370
  pLastCol->rowKey.numOfPKs = *(uint8_t *)(value + offset);
×
371
  offset += sizeof(uint8_t);
×
372

373
  // pks
UNCOV
374
  for (int32_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) {
×
UNCOV
375
    pLastCol->rowKey.pks[i] = *(SValue *)(value + offset);
×
UNCOV
376
    offset += sizeof(SValue);
×
377

UNCOV
378
    if (IS_VAR_DATA_TYPE(pLastCol->rowKey.pks[i].type)) {
×
UNCOV
379
      pLastCol->rowKey.pks[i].pData = NULL;
×
UNCOV
380
      if (pLastCol->rowKey.pks[i].nData > 0) {
×
UNCOV
381
        pLastCol->rowKey.pks[i].pData = (uint8_t *)value + offset;
×
382
        offset += pLastCol->rowKey.pks[i].nData;
×
383
      }
384
    }
385
  }
386

UNCOV
387
  if (version >= LAST_COL_VERSION_2) {
×
UNCOV
388
    pLastCol->cacheStatus = *(uint8_t *)(value + offset);
×
389
  }
390

UNCOV
391
  if (offset > size) {
×
UNCOV
392
    taosMemoryFreeClear(pLastCol);
×
393

UNCOV
394
    TAOS_RETURN(TSDB_CODE_INVALID_DATA_FMT);
×
395
  }
396

UNCOV
397
  *ppLastCol = pLastCol;
×
398

UNCOV
399
  TAOS_RETURN(TSDB_CODE_SUCCESS);
×
400
}
401

402
/*
403
typedef struct {
404
  SLastColV0 lastColV0;
405
  char       colData[];
406
  int8_t     version;
407
  uint8_t    numOfPKs;
408
  SValue     pks[0];
409
  char       pk0Data[];
410
  SValue     pks[1];
411
  char       pk1Data[];
412
  ...
413
} SLastColDisk;
414
*/
UNCOV
415
static int32_t tsdbCacheSerializeV0(char const *value, SLastCol *pLastCol) {
×
UNCOV
416
  SLastColV0 *pLastColV0 = (SLastColV0 *)value;
×
417

UNCOV
418
  pLastColV0->ts = pLastCol->rowKey.ts;
×
UNCOV
419
  pLastColV0->dirty = pLastCol->dirty;
×
UNCOV
420
  pLastColV0->colVal.cid = pLastCol->colVal.cid;
×
UNCOV
421
  pLastColV0->colVal.flag = pLastCol->colVal.flag;
×
UNCOV
422
  pLastColV0->colVal.type = pLastCol->colVal.value.type;
×
UNCOV
423
  if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) {
×
UNCOV
424
    pLastColV0->colVal.value.nData = pLastCol->colVal.value.nData;
×
UNCOV
425
    if (pLastCol->colVal.value.nData > 0) {
×
UNCOV
426
      memcpy(&pLastColV0[1], pLastCol->colVal.value.pData, pLastCol->colVal.value.nData);
×
427
    }
UNCOV
428
    return sizeof(SLastColV0) + pLastCol->colVal.value.nData;
×
429
  } else {
UNCOV
430
    pLastColV0->colVal.value.val = pLastCol->colVal.value.val;
×
UNCOV
431
    return sizeof(SLastColV0);
×
432
  }
433

434
  return 0;
435
}
436

UNCOV
437
static int32_t tsdbCacheSerialize(SLastCol *pLastCol, char **value, size_t *size) {
×
UNCOV
438
  *size = sizeof(SLastColV0);
×
UNCOV
439
  if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) {
×
UNCOV
440
    *size += pLastCol->colVal.value.nData;
×
441
  }
UNCOV
442
  *size += sizeof(uint8_t) + sizeof(uint8_t) + sizeof(uint8_t);  // version + numOfPKs + cacheStatus
×
443

UNCOV
444
  for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) {
×
UNCOV
445
    *size += sizeof(SValue);
×
UNCOV
446
    if (IS_VAR_DATA_TYPE(pLastCol->rowKey.pks[i].type)) {
×
UNCOV
447
      *size += pLastCol->rowKey.pks[i].nData;
×
448
    }
449
  }
450

UNCOV
451
  *value = taosMemoryMalloc(*size);
×
UNCOV
452
  if (NULL == *value) {
×
UNCOV
453
    TAOS_RETURN(terrno);
×
454
  }
455

UNCOV
456
  int32_t offset = tsdbCacheSerializeV0(*value, pLastCol);
×
457

458
  // version
UNCOV
459
  ((uint8_t *)(*value + offset))[0] = LAST_COL_VERSION;
×
UNCOV
460
  offset++;
×
461

462
  // numOfPKs
UNCOV
463
  ((uint8_t *)(*value + offset))[0] = pLastCol->rowKey.numOfPKs;
×
UNCOV
464
  offset++;
×
465

466
  // pks
UNCOV
467
  for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) {
×
UNCOV
468
    ((SValue *)(*value + offset))[0] = pLastCol->rowKey.pks[i];
×
UNCOV
469
    offset += sizeof(SValue);
×
UNCOV
470
    if (IS_VAR_DATA_TYPE(pLastCol->rowKey.pks[i].type)) {
×
UNCOV
471
      if (pLastCol->rowKey.pks[i].nData > 0) {
×
UNCOV
472
        memcpy(*value + offset, pLastCol->rowKey.pks[i].pData, pLastCol->rowKey.pks[i].nData);
×
473
      }
UNCOV
474
      offset += pLastCol->rowKey.pks[i].nData;
×
475
    }
476
  }
477

UNCOV
478
  ((uint8_t *)(*value + offset))[0] = pLastCol->cacheStatus;
×
479

UNCOV
480
  TAOS_RETURN(TSDB_CODE_SUCCESS);
×
481
}
482

483
static int32_t tsdbCachePutToRocksdb(STsdb *pTsdb, SLastKey *pLastKey, SLastCol *pLastCol);
484

UNCOV
485
int tsdbCacheFlushDirty(const void *key, size_t klen, void *value, void *ud) {
×
UNCOV
486
  SLastCol *pLastCol = (SLastCol *)value;
×
487

UNCOV
488
  if (pLastCol->dirty) {
×
UNCOV
489
    STsdb *pTsdb = (STsdb *)ud;
×
490

UNCOV
491
    int32_t code = tsdbCachePutToRocksdb(pTsdb, (SLastKey *)key, pLastCol);
×
UNCOV
492
    if (code) {
×
UNCOV
493
      tsdbError("tsdb/cache: vgId:%d, flush dirty lru failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code));
×
UNCOV
494
      return code;
×
495
    }
496

UNCOV
497
    pLastCol->dirty = 0;
×
498

UNCOV
499
    rocksMayWrite(pTsdb, false);
×
500
  }
501

UNCOV
502
  return 0;
×
503
}
504

UNCOV
505
static bool tsdbKeyDeleted(TSDBKEY *key, SArray *pSkyline, int64_t *iSkyline) {
×
UNCOV
506
  bool deleted = false;
×
UNCOV
507
  while (*iSkyline > 0) {
×
508
    TSDBKEY *pItemBack = (TSDBKEY *)taosArrayGet(pSkyline, *iSkyline);
×
509
    TSDBKEY *pItemFront = (TSDBKEY *)taosArrayGet(pSkyline, *iSkyline - 1);
×
510

511
    if (key->ts > pItemBack->ts) {
×
UNCOV
512
      return false;
×
UNCOV
513
    } else if (key->ts >= pItemFront->ts && key->ts <= pItemBack->ts) {
×
UNCOV
514
      if (key->version <= pItemFront->version || (key->ts == pItemBack->ts && key->version <= pItemBack->version)) {
×
515
        // if (key->version <= pItemFront->version || key->version <= pItemBack->version) {
516
        return true;
×
517
      } else {
UNCOV
518
        if (*iSkyline > 1) {
×
UNCOV
519
          --*iSkyline;
×
520
        } else {
UNCOV
521
          return false;
×
522
        }
523
      }
524
    } else {
UNCOV
525
      if (*iSkyline > 1) {
×
UNCOV
526
        --*iSkyline;
×
527
      } else {
UNCOV
528
        return false;
×
529
      }
530
    }
531
  }
532

UNCOV
533
  return deleted;
×
534
}
535

536
// Get next non-deleted row from imem
UNCOV
537
static TSDBROW *tsdbImemGetNextRow(STbDataIter *pTbIter, SArray *pSkyline, int64_t *piSkyline) {
×
UNCOV
538
  int32_t code = 0;
×
539

UNCOV
540
  if (tsdbTbDataIterNext(pTbIter)) {
×
UNCOV
541
    TSDBROW *pMemRow = tsdbTbDataIterGet(pTbIter);
×
UNCOV
542
    TSDBKEY  rowKey = TSDBROW_KEY(pMemRow);
×
UNCOV
543
    bool     deleted = tsdbKeyDeleted(&rowKey, pSkyline, piSkyline);
×
UNCOV
544
    if (!deleted) {
×
UNCOV
545
      return pMemRow;
×
546
    }
547
  }
548

UNCOV
549
  return NULL;
×
550
}
551

552
// Get first non-deleted row from imem
UNCOV
553
static TSDBROW *tsdbImemGetFirstRow(SMemTable *imem, STbData *pIMem, STbDataIter *pTbIter, SArray *pSkyline,
×
554
                                    int64_t *piSkyline) {
UNCOV
555
  int32_t code = 0;
×
556

UNCOV
557
  tsdbTbDataIterOpen(pIMem, NULL, 1, pTbIter);
×
UNCOV
558
  TSDBROW *pMemRow = tsdbTbDataIterGet(pTbIter);
×
UNCOV
559
  if (pMemRow) {
×
560
    // if non deleted, return the found row.
UNCOV
561
    TSDBKEY rowKey = TSDBROW_KEY(pMemRow);
×
UNCOV
562
    bool    deleted = tsdbKeyDeleted(&rowKey, pSkyline, piSkyline);
×
UNCOV
563
    if (!deleted) {
×
UNCOV
564
      return pMemRow;
×
565
    }
566
  } else {
UNCOV
567
    return NULL;
×
568
  }
569

570
  // continue to find the non-deleted first row from imem, using get next row
UNCOV
571
  return tsdbImemGetNextRow(pTbIter, pSkyline, piSkyline);
×
572
}
573

574
void tsdbCacheInvalidateSchema(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int32_t sver) {
×
UNCOV
575
  SRocksCache *pRCache = &pTsdb->rCache;
×
UNCOV
576
  if (!pRCache->pTSchema || sver <= pTsdb->rCache.sver) return;
×
577

UNCOV
578
  if (suid > 0 && suid == pRCache->suid) {
×
UNCOV
579
    pRCache->sver = -1;
×
UNCOV
580
    pRCache->suid = -1;
×
581
  }
UNCOV
582
  if (suid == 0 && uid == pRCache->uid) {
×
UNCOV
583
    pRCache->sver = -1;
×
UNCOV
584
    pRCache->uid = -1;
×
585
  }
586
}
587

UNCOV
588
static int32_t tsdbUpdateSkm(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int32_t sver) {
×
UNCOV
589
  SRocksCache *pRCache = &pTsdb->rCache;
×
UNCOV
590
  if (pRCache->pTSchema && sver == pRCache->sver) {
×
UNCOV
591
    if (suid > 0 && suid == pRCache->suid) {
×
UNCOV
592
      return 0;
×
593
    }
UNCOV
594
    if (suid == 0 && uid == pRCache->uid) {
×
UNCOV
595
      return 0;
×
596
    }
597
  }
598

UNCOV
599
  pRCache->suid = suid;
×
UNCOV
600
  pRCache->uid = uid;
×
UNCOV
601
  pRCache->sver = sver;
×
UNCOV
602
  tDestroyTSchema(pRCache->pTSchema);
×
UNCOV
603
  return metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pRCache->pTSchema);
×
604
}
605

606
static int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SArray *updCtxArray);
607

UNCOV
608
int32_t tsdbLoadFromImem(SMemTable *imem, int64_t suid, int64_t uid) {
×
UNCOV
609
  int32_t     code = 0;
×
UNCOV
610
  int32_t     lino = 0;
×
UNCOV
611
  STsdb      *pTsdb = imem->pTsdb;
×
UNCOV
612
  SArray     *pMemDelData = NULL;
×
UNCOV
613
  SArray     *pSkyline = NULL;
×
UNCOV
614
  int64_t     iSkyline = 0;
×
UNCOV
615
  STbDataIter tbIter = {0};
×
UNCOV
616
  TSDBROW    *pMemRow = NULL;
×
UNCOV
617
  STSchema   *pTSchema = NULL;
×
UNCOV
618
  SSHashObj  *iColHash = NULL;
×
619
  int32_t     sver;
620
  int32_t     nCol;
UNCOV
621
  SArray     *ctxArray = pTsdb->rCache.ctxArray;
×
UNCOV
622
  STsdbRowKey tsdbRowKey = {0};
×
UNCOV
623
  STSDBRowIter iter = {0};
×
624

625
  STbData *pIMem = tsdbGetTbDataFromMemTable(imem, suid, uid);
×
626

627
  // load imem tomb data and build skyline
UNCOV
628
  TAOS_CHECK_GOTO(loadMemTombData(&pMemDelData, NULL, pIMem, INT64_MAX), &lino, _exit);
×
629

630
  // tsdbBuildDeleteSkyline
UNCOV
631
  size_t delSize = TARRAY_SIZE(pMemDelData);
×
UNCOV
632
  if (delSize > 0) {
×
UNCOV
633
    pSkyline = taosArrayInit(32, sizeof(TSDBKEY));
×
UNCOV
634
    if (!pSkyline) {
×
UNCOV
635
      TAOS_CHECK_EXIT(terrno);
×
636
    }
637

UNCOV
638
    TAOS_CHECK_EXIT(tsdbBuildDeleteSkyline(pMemDelData, 0, (int32_t)(delSize - 1), pSkyline));
×
UNCOV
639
    iSkyline = taosArrayGetSize(pSkyline) - 1;
×
640
  }
641

UNCOV
642
  pMemRow = tsdbImemGetFirstRow(imem, pIMem, &tbIter, pSkyline, &iSkyline);
×
UNCOV
643
  if (!pMemRow) {
×
UNCOV
644
    goto _exit;
×
645
  }
646

647
  // iter first row to last_row/last col values to ctxArray, and mark last null col ids
UNCOV
648
  sver = TSDBROW_SVERSION(pMemRow);
×
UNCOV
649
  TAOS_CHECK_GOTO(tsdbUpdateSkm(pTsdb, suid, uid, sver), &lino, _exit);
×
UNCOV
650
  pTSchema = pTsdb->rCache.pTSchema;
×
651
  nCol = pTSchema->numOfCols;
×
652

UNCOV
653
  tsdbRowGetKey(pMemRow, &tsdbRowKey);
×
654

UNCOV
655
  TAOS_CHECK_EXIT(tsdbRowIterOpen(&iter, pMemRow, pTSchema));
×
656

657
  int32_t iCol = 0;
×
UNCOV
658
  for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal && iCol < nCol; pColVal = tsdbRowIterNext(&iter), iCol++) {
×
UNCOV
659
    SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST_ROW, .tsdbRowKey = tsdbRowKey, .colVal = *pColVal};
×
UNCOV
660
    if (!taosArrayPush(ctxArray, &updateCtx)) {
×
UNCOV
661
      TAOS_CHECK_EXIT(terrno);
×
662
    }
663

UNCOV
664
    if (COL_VAL_IS_VALUE(pColVal)) {
×
UNCOV
665
      updateCtx.lflag = LFLAG_LAST;
×
UNCOV
666
      if (!taosArrayPush(ctxArray, &updateCtx)) {
×
UNCOV
667
        TAOS_CHECK_EXIT(terrno);
×
668
      }
669
    } else {
UNCOV
670
      if (!iColHash) {
×
UNCOV
671
        iColHash = tSimpleHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT));
×
UNCOV
672
        if (iColHash == NULL) {
×
UNCOV
673
          TAOS_CHECK_EXIT(terrno);
×
674
        }
675
      }
676

UNCOV
677
      if (tSimpleHashPut(iColHash, &pColVal->cid, sizeof(pColVal->cid), &pColVal->cid, sizeof(pColVal->cid))) {
×
UNCOV
678
        TAOS_CHECK_EXIT(terrno);
×
679
      }
680
    }
681
  }
UNCOV
682
  tsdbRowClose(&iter);
×
683

684
  // continue to get next row to fill null last col values
UNCOV
685
  pMemRow = tsdbImemGetNextRow(&tbIter, pSkyline, &iSkyline);
×
UNCOV
686
  while (pMemRow) {
×
UNCOV
687
    if (tSimpleHashGetSize(iColHash) == 0) {
×
UNCOV
688
      break;
×
689
    }
690

UNCOV
691
    sver = TSDBROW_SVERSION(pMemRow);
×
UNCOV
692
    TAOS_CHECK_EXIT(tsdbUpdateSkm(pTsdb, suid, uid, sver));
×
UNCOV
693
    pTSchema = pTsdb->rCache.pTSchema;
×
694

695
    STsdbRowKey tsdbRowKey = {0};
×
UNCOV
696
    tsdbRowGetKey(pMemRow, &tsdbRowKey);
×
697

UNCOV
698
    TAOS_CHECK_EXIT(tsdbRowIterOpen(&iter, pMemRow, pTSchema));
×
699

UNCOV
700
    int32_t iCol = 0;
×
UNCOV
701
    for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal && iCol < nCol; pColVal = tsdbRowIterNext(&iter), iCol++) {
×
UNCOV
702
      if (tSimpleHashGet(iColHash, &pColVal->cid, sizeof(pColVal->cid)) && COL_VAL_IS_VALUE(pColVal)) {
×
UNCOV
703
        SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST, .tsdbRowKey = tsdbRowKey, .colVal = *pColVal};
×
UNCOV
704
        if (!taosArrayPush(ctxArray, &updateCtx)) {
×
UNCOV
705
          TAOS_CHECK_EXIT(terrno);
×
706
        }
707

UNCOV
708
        TAOS_CHECK_EXIT(tSimpleHashRemove(iColHash, &pColVal->cid, sizeof(pColVal->cid)));
×
709
      }
710
    }
UNCOV
711
    tsdbRowClose(&iter);
×
712

UNCOV
713
    pMemRow = tsdbImemGetNextRow(&tbIter, pSkyline, &iSkyline);
×
714
  }
715

UNCOV
716
  TAOS_CHECK_GOTO(tsdbCacheUpdate(pTsdb, suid, uid, ctxArray), &lino, _exit);
×
717

UNCOV
718
_exit:
×
UNCOV
719
  if (code) {
×
UNCOV
720
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(pTsdb->pVnode), __func__, __FILE__, lino, tstrerror(code));
×
721

UNCOV
722
    tsdbRowClose(&iter);
×
723
  }
724

UNCOV
725
  taosArrayClear(ctxArray);
×
726
  // destroy any allocated resource
UNCOV
727
  tSimpleHashCleanup(iColHash);
×
UNCOV
728
  if (pMemDelData) {
×
UNCOV
729
    taosArrayDestroy(pMemDelData);
×
730
  }
UNCOV
731
  if (pSkyline) {
×
UNCOV
732
    taosArrayDestroy(pSkyline);
×
733
  }
734

UNCOV
735
  TAOS_RETURN(code);
×
736
}
737

UNCOV
738
static int32_t tsdbCacheUpdateFromIMem(STsdb *pTsdb) {
×
UNCOV
739
  if (!pTsdb) return 0;
×
UNCOV
740
  if (!pTsdb->imem) return 0;
×
741

UNCOV
742
  int32_t    code = 0;
×
UNCOV
743
  int32_t    lino = 0;
×
UNCOV
744
  SMemTable *imem = pTsdb->imem;
×
745
  int32_t    nTbData = imem->nTbData;
×
UNCOV
746
  int64_t    nRow = imem->nRow;
×
UNCOV
747
  int64_t    nDel = imem->nDel;
×
748

UNCOV
749
  if (nRow == 0 || nTbData == 0) return 0;
×
750

UNCOV
751
  TAOS_CHECK_EXIT(tsdbMemTableSaveToCache(imem, tsdbLoadFromImem));
×
752

UNCOV
753
_exit:
×
UNCOV
754
  if (code) {
×
UNCOV
755
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(pTsdb->pVnode), __func__, __FILE__, lino, tstrerror(code));
×
756
  } else {
UNCOV
757
    tsdbInfo("vgId:%d %s done, nRow:%" PRId64 " nDel:%" PRId64, TD_VID(pTsdb->pVnode), __func__, nRow, nDel);
×
758
  }
759

UNCOV
760
  TAOS_RETURN(code);
×
761
}
762

UNCOV
763
int32_t tsdbCacheCommit(STsdb *pTsdb) {
×
764
  int32_t code = 0;
×
765

766
  // 0, tsdbCacheUpdateFromIMem if updateCacheBatch
767
  // flush dirty data of lru into rocks
768
  // 4, and update when writing if !updateCacheBatch
769
  // 5, merge cache & mem if updateCacheBatch
770

UNCOV
771
  if (tsUpdateCacheBatch) {
×
UNCOV
772
    code = tsdbCacheUpdateFromIMem(pTsdb);
×
UNCOV
773
    if (code) {
×
UNCOV
774
      tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
×
775

UNCOV
776
      TAOS_RETURN(code);
×
777
    }
778
  }
779

UNCOV
780
  char                 *err = NULL;
×
UNCOV
781
  SLRUCache            *pCache = pTsdb->lruCache;
×
782
  // rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
783

784
  (void)taosThreadMutexLock(&pTsdb->lruMutex);
×
785

786
  taosLRUCacheApply(pCache, tsdbCacheFlushDirty, pTsdb);
×
787

788
#ifdef USE_ROCKSDB
UNCOV
789
  rocksMayWrite(pTsdb, true);
×
UNCOV
790
  rocksdb_flush(pTsdb->rCache.db, pTsdb->rCache.flushoptions, &err);
×
791
#endif
UNCOV
792
  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
×
793
#ifdef USE_ROCKSDB
UNCOV
794
  if (NULL != err) {
×
UNCOV
795
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err);
×
UNCOV
796
    rocksdb_free(err);
×
UNCOV
797
    code = TSDB_CODE_FAILED;
×
798
  }
799
#endif
UNCOV
800
  TAOS_RETURN(code);
×
801
}
802

UNCOV
803
static int32_t reallocVarDataVal(SValue *pValue) {
×
UNCOV
804
  if (IS_VAR_DATA_TYPE(pValue->type)) {
×
UNCOV
805
    uint8_t *pVal = pValue->pData;
×
UNCOV
806
    uint32_t nData = pValue->nData;
×
UNCOV
807
    if (nData > 0) {
×
UNCOV
808
      uint8_t *p = taosMemoryMalloc(nData);
×
UNCOV
809
      if (!p) {
×
UNCOV
810
        TAOS_RETURN(terrno);
×
811
      }
UNCOV
812
      pValue->pData = p;
×
UNCOV
813
      (void)memcpy(pValue->pData, pVal, nData);
×
814
    } else {
UNCOV
815
      pValue->pData = NULL;
×
816
    }
817
  }
818

UNCOV
819
  TAOS_RETURN(TSDB_CODE_SUCCESS);
×
820
}
821

UNCOV
822
static int32_t reallocVarData(SColVal *pColVal) { return reallocVarDataVal(&pColVal->value); }
×
823

824
// realloc pk data and col data.
UNCOV
825
static int32_t tsdbCacheReallocSLastCol(SLastCol *pCol, size_t *pCharge) {
×
UNCOV
826
  int32_t code = TSDB_CODE_SUCCESS, lino = 0;
×
UNCOV
827
  size_t  charge = sizeof(SLastCol);
×
828

UNCOV
829
  int8_t i = 0;
×
UNCOV
830
  for (; i < pCol->rowKey.numOfPKs; i++) {
×
UNCOV
831
    SValue *pValue = &pCol->rowKey.pks[i];
×
UNCOV
832
    if (IS_VAR_DATA_TYPE(pValue->type)) {
×
UNCOV
833
      TAOS_CHECK_EXIT(reallocVarDataVal(pValue));
×
UNCOV
834
      charge += pValue->nData;
×
835
    }
836
  }
837

838
  if (IS_VAR_DATA_TYPE(pCol->colVal.value.type)) {
×
839
    TAOS_CHECK_EXIT(reallocVarData(&pCol->colVal));
×
840
    charge += pCol->colVal.value.nData;
×
841
  }
842

UNCOV
843
  if (pCharge) {
×
844
    *pCharge = charge;
×
845
  }
846

UNCOV
847
_exit:
×
UNCOV
848
  if (TSDB_CODE_SUCCESS != code) {
×
UNCOV
849
    for (int8_t j = 0; j < i; j++) {
×
UNCOV
850
      if (IS_VAR_DATA_TYPE(pCol->rowKey.pks[j].type)) {
×
UNCOV
851
        taosMemoryFree(pCol->rowKey.pks[j].pData);
×
852
      }
853
    }
854

UNCOV
855
    (void)memset(pCol, 0, sizeof(SLastCol));
×
856
  }
857

UNCOV
858
  TAOS_RETURN(code);
×
859
}
860

UNCOV
861
void tsdbCacheFreeSLastColItem(void *pItem) {
×
UNCOV
862
  SLastCol *pCol = (SLastCol *)pItem;
×
UNCOV
863
  for (int i = 0; i < pCol->rowKey.numOfPKs; i++) {
×
UNCOV
864
    if (IS_VAR_DATA_TYPE(pCol->rowKey.pks[i].type)) {
×
UNCOV
865
      taosMemoryFree(pCol->rowKey.pks[i].pData);
×
866
    }
867
  }
868

869
  if (IS_VAR_DATA_TYPE(pCol->colVal.value.type) && pCol->colVal.value.pData) {
×
UNCOV
870
    taosMemoryFree(pCol->colVal.value.pData);
×
871
  }
UNCOV
872
}
×
873

UNCOV
874
static void tsdbCacheDeleter(const void *key, size_t klen, void *value, void *ud) {
×
UNCOV
875
  SLastCol *pLastCol = (SLastCol *)value;
×
876

UNCOV
877
  if (pLastCol->dirty) {
×
UNCOV
878
    if (tsdbCacheFlushDirty(key, klen, pLastCol, ud) != 0) {
×
UNCOV
879
      STsdb *pTsdb = (STsdb *)ud;
×
UNCOV
880
      tsdbTrace("tsdb/cache: vgId:%d, flush cache %s failed at line %d.", TD_VID(pTsdb->pVnode), __func__, __LINE__);
×
881
    }
882
  }
883

UNCOV
884
  for (uint8_t i = 0; i < pLastCol->rowKey.numOfPKs; ++i) {
×
UNCOV
885
    SValue *pValue = &pLastCol->rowKey.pks[i];
×
UNCOV
886
    if (IS_VAR_DATA_TYPE(pValue->type)) {
×
UNCOV
887
      taosMemoryFree(pValue->pData);
×
888
    }
889
  }
890

UNCOV
891
  if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type) /* && pLastCol->colVal.value.nData > 0*/) {
×
UNCOV
892
    taosMemoryFree(pLastCol->colVal.value.pData);
×
893
  }
894

UNCOV
895
  taosMemoryFree(value);
×
UNCOV
896
}
×
897

UNCOV
898
static void tsdbCacheOverWriter(const void *key, size_t klen, void *value, void *ud) {
×
UNCOV
899
  SLastCol *pLastCol = (SLastCol *)value;
×
UNCOV
900
  pLastCol->dirty = 0;
×
UNCOV
901
}
×
902

903
static int32_t tsdbCachePutToLRU(STsdb *pTsdb, SLastKey *pLastKey, SLastCol *pLastCol, int8_t dirty);
904

UNCOV
905
static int32_t tsdbCacheNewTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, int8_t col_type, int8_t lflag) {
×
906
  int32_t code = 0, lino = 0;
×
907

UNCOV
908
  SLRUCache            *pCache = pTsdb->lruCache;
×
909
  // rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
UNCOV
910
  SRowKey               emptyRowKey = {.ts = TSKEY_MIN, .numOfPKs = 0};
×
UNCOV
911
  SLastCol              emptyCol = {
×
912
                   .rowKey = emptyRowKey, .colVal = COL_VAL_NONE(cid, col_type), .dirty = 1, .cacheStatus = TSDB_LAST_CACHE_VALID};
913

UNCOV
914
  SLastKey *pLastKey = &(SLastKey){.lflag = lflag, .uid = uid, .cid = cid};
×
UNCOV
915
  code = tsdbCachePutToLRU(pTsdb, pLastKey, &emptyCol, 1);
×
UNCOV
916
  if (code) {
×
UNCOV
917
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
×
918
  }
919

UNCOV
920
  TAOS_RETURN(code);
×
921
}
922

923
int32_t tsdbCacheCommitNoLock(STsdb *pTsdb) {
4✔
924
  int32_t code = 0;
4✔
925
  char   *err = NULL;
4✔
926

927
  SLRUCache            *pCache = pTsdb->lruCache;
4✔
928
  // rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
929

930
  taosLRUCacheApply(pCache, tsdbCacheFlushDirty, pTsdb);
4✔
931
#ifdef USE_ROCKSDB
932
  rocksMayWrite(pTsdb, true);
4✔
933
  rocksdb_flush(pTsdb->rCache.db, pTsdb->rCache.flushoptions, &err);
4✔
934
  if (NULL != err) {
4!
UNCOV
935
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err);
×
UNCOV
936
    rocksdb_free(err);
×
UNCOV
937
    code = TSDB_CODE_FAILED;
×
938
  }
939
#endif
940
  TAOS_RETURN(code);
4✔
941
}
942

UNCOV
943
static int32_t tsdbCacheGetValuesFromRocks(STsdb *pTsdb, size_t numKeys, const char *const *ppKeysList,
×
944
                                           size_t *pKeysListSizes, char ***pppValuesList, size_t **ppValuesListSizes) {
945
#ifdef USE_ROCKSDB
946
  char **valuesList = taosMemoryCalloc(numKeys, sizeof(char *));
×
UNCOV
947
  if (!valuesList) return terrno;
×
UNCOV
948
  size_t *valuesListSizes = taosMemoryCalloc(numKeys, sizeof(size_t));
×
UNCOV
949
  if (!valuesListSizes) {
×
UNCOV
950
    taosMemoryFreeClear(valuesList);
×
UNCOV
951
    return terrno;
×
952
  }
UNCOV
953
  char **errs = taosMemoryCalloc(numKeys, sizeof(char *));
×
UNCOV
954
  if (!errs) {
×
UNCOV
955
    taosMemoryFreeClear(valuesList);
×
UNCOV
956
    taosMemoryFreeClear(valuesListSizes);
×
UNCOV
957
    return terrno;
×
958
  }
UNCOV
959
  rocksdb_multi_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, numKeys, ppKeysList, pKeysListSizes, valuesList,
×
960
                    valuesListSizes, errs);
UNCOV
961
  for (size_t i = 0; i < numKeys; ++i) {
×
UNCOV
962
    rocksdb_free(errs[i]);
×
963
  }
UNCOV
964
  taosMemoryFreeClear(errs);
×
965

966
  *pppValuesList = valuesList;
×
UNCOV
967
  *ppValuesListSizes = valuesListSizes;
×
968
#endif
UNCOV
969
  TAOS_RETURN(TSDB_CODE_SUCCESS);
×
970
}
971

UNCOV
972
static int32_t tsdbCacheDropTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, bool hasPrimaryKey) {
×
UNCOV
973
  int32_t code = 0;
×
974

975
  // build keys & multi get from rocks
UNCOV
976
  char **keys_list = taosMemoryCalloc(2, sizeof(char *));
×
977
  if (!keys_list) {
×
978
    return terrno;
×
979
  }
UNCOV
980
  size_t *keys_list_sizes = taosMemoryCalloc(2, sizeof(size_t));
×
UNCOV
981
  if (!keys_list_sizes) {
×
UNCOV
982
    taosMemoryFree(keys_list);
×
UNCOV
983
    return terrno;
×
984
  }
UNCOV
985
  const size_t klen = ROCKS_KEY_LEN;
×
986

UNCOV
987
  char *keys = taosMemoryCalloc(2, sizeof(SLastKey));
×
UNCOV
988
  if (!keys) {
×
UNCOV
989
    taosMemoryFree(keys_list);
×
UNCOV
990
    taosMemoryFree(keys_list_sizes);
×
UNCOV
991
    return terrno;
×
992
  }
UNCOV
993
  ((SLastKey *)keys)[0] = (SLastKey){.lflag = LFLAG_LAST, .uid = uid, .cid = cid};
×
UNCOV
994
  ((SLastKey *)keys)[1] = (SLastKey){.lflag = LFLAG_LAST_ROW, .uid = uid, .cid = cid};
×
995

UNCOV
996
  keys_list[0] = keys;
×
UNCOV
997
  keys_list[1] = keys + sizeof(SLastKey);
×
UNCOV
998
  keys_list_sizes[0] = klen;
×
UNCOV
999
  keys_list_sizes[1] = klen;
×
1000

UNCOV
1001
  char  **values_list = NULL;
×
UNCOV
1002
  size_t *values_list_sizes = NULL;
×
1003

1004
  // was written by caller
1005
  // rocksMayWrite(pTsdb, true); // flush writebatch cache
1006

1007
  TAOS_CHECK_GOTO(tsdbCacheGetValuesFromRocks(pTsdb, 2, (const char *const *)keys_list, keys_list_sizes, &values_list,
×
1008
                                              &values_list_sizes),
1009
                  NULL, _exit);
1010
#ifdef USE_ROCKSDB
UNCOV
1011
  rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
×
1012
#endif
1013
  {
1014
#ifdef USE_ROCKSDB
UNCOV
1015
    SLastCol *pLastCol = NULL;
×
UNCOV
1016
    if (values_list[0] != NULL) {
×
UNCOV
1017
      code = tsdbCacheDeserialize(values_list[0], values_list_sizes[0], &pLastCol);
×
UNCOV
1018
      if (code != TSDB_CODE_SUCCESS) {
×
1019
        tsdbError("vgId:%d, %s deserialize failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1020
                  tstrerror(code));
1021
        goto _exit;
×
1022
      }
UNCOV
1023
      if (NULL != pLastCol) {
×
UNCOV
1024
        rocksdb_writebatch_delete(wb, keys_list[0], klen);
×
1025
      }
UNCOV
1026
      taosMemoryFreeClear(pLastCol);
×
1027
    }
1028

UNCOV
1029
    pLastCol = NULL;
×
UNCOV
1030
    if (values_list[1] != NULL) {
×
UNCOV
1031
      code = tsdbCacheDeserialize(values_list[1], values_list_sizes[1], &pLastCol);
×
UNCOV
1032
      if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
1033
        tsdbError("vgId:%d, %s deserialize failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1034
                  tstrerror(code));
UNCOV
1035
        goto _exit;
×
1036
      }
UNCOV
1037
      if (NULL != pLastCol) {
×
UNCOV
1038
        rocksdb_writebatch_delete(wb, keys_list[1], klen);
×
1039
      }
UNCOV
1040
      taosMemoryFreeClear(pLastCol);
×
1041
    }
1042

UNCOV
1043
    rocksdb_free(values_list[0]);
×
UNCOV
1044
    rocksdb_free(values_list[1]);
×
1045
#endif
1046

UNCOV
1047
    for (int i = 0; i < 2; i++) {
×
UNCOV
1048
      LRUHandle *h = taosLRUCacheLookup(pTsdb->lruCache, keys_list[i], klen);
×
UNCOV
1049
      if (h) {
×
UNCOV
1050
        tsdbLRUCacheRelease(pTsdb->lruCache, h, true);
×
UNCOV
1051
        taosLRUCacheErase(pTsdb->lruCache, keys_list[i], klen);
×
1052
      }
1053
    }
1054
  }
1055

UNCOV
1056
_exit:
×
UNCOV
1057
  taosMemoryFree(keys_list[0]);
×
1058

UNCOV
1059
  taosMemoryFree(keys_list);
×
UNCOV
1060
  taosMemoryFree(keys_list_sizes);
×
UNCOV
1061
  taosMemoryFree(values_list);
×
UNCOV
1062
  taosMemoryFree(values_list_sizes);
×
1063

1064
  TAOS_RETURN(code);
×
1065
}
1066

UNCOV
1067
int32_t tsdbCacheNewTable(STsdb *pTsdb, tb_uid_t uid, tb_uid_t suid, const SSchemaWrapper *pSchemaRow) {
×
UNCOV
1068
  int32_t code = 0;
×
1069

UNCOV
1070
  (void)taosThreadMutexLock(&pTsdb->lruMutex);
×
1071

UNCOV
1072
  if (suid < 0) {
×
UNCOV
1073
    for (int i = 0; i < pSchemaRow->nCols; ++i) {
×
UNCOV
1074
      int16_t cid = pSchemaRow->pSchema[i].colId;
×
UNCOV
1075
      int8_t  col_type = pSchemaRow->pSchema[i].type;
×
1076

1077
      code = tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, LFLAG_LAST_ROW);
×
UNCOV
1078
      if (code != TSDB_CODE_SUCCESS) {
×
1079
        tsdbTrace("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1080
                  tstrerror(code));
1081
      }
UNCOV
1082
      code = tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, LFLAG_LAST);
×
UNCOV
1083
      if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
1084
        tsdbTrace("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1085
                  tstrerror(code));
1086
      }
1087
    }
1088
  } else {
UNCOV
1089
    STSchema *pTSchema = NULL;
×
UNCOV
1090
    code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, -1, &pTSchema);
×
UNCOV
1091
    if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
1092
      (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
×
1093

UNCOV
1094
      TAOS_RETURN(code);
×
1095
    }
1096

UNCOV
1097
    for (int i = 0; i < pTSchema->numOfCols; ++i) {
×
UNCOV
1098
      int16_t cid = pTSchema->columns[i].colId;
×
UNCOV
1099
      int8_t  col_type = pTSchema->columns[i].type;
×
1100

UNCOV
1101
      code = tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, LFLAG_LAST_ROW);
×
UNCOV
1102
      if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
1103
        tsdbTrace("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1104
                  tstrerror(code));
1105
      }
UNCOV
1106
      code = tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, LFLAG_LAST);
×
UNCOV
1107
      if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
1108
        tsdbTrace("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1109
                  tstrerror(code));
1110
      }
1111
    }
1112

1113
    taosMemoryFree(pTSchema);
×
1114
  }
1115

UNCOV
1116
  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
×
1117

1118
  TAOS_RETURN(code);
×
1119
}
1120

1121
int32_t tsdbCacheDropTable(STsdb *pTsdb, tb_uid_t uid, tb_uid_t suid, SSchemaWrapper *pSchemaRow) {
×
UNCOV
1122
  int32_t code = 0;
×
1123

1124
  (void)taosThreadMutexLock(&pTsdb->lruMutex);
×
1125

UNCOV
1126
  code = tsdbCacheCommitNoLock(pTsdb);
×
1127
  if (code != TSDB_CODE_SUCCESS) {
×
1128
    tsdbTrace("vgId:%d, %s commit with no lock failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1129
              tstrerror(code));
1130
  }
1131

UNCOV
1132
  if (pSchemaRow != NULL) {
×
UNCOV
1133
    bool hasPrimayKey = false;
×
UNCOV
1134
    int  nCols = pSchemaRow->nCols;
×
UNCOV
1135
    if (nCols >= 2) {
×
UNCOV
1136
      hasPrimayKey = (pSchemaRow->pSchema[1].flags & COL_IS_KEY) ? true : false;
×
1137
    }
UNCOV
1138
    for (int i = 0; i < nCols; ++i) {
×
1139
      int16_t cid = pSchemaRow->pSchema[i].colId;
×
UNCOV
1140
      int8_t  col_type = pSchemaRow->pSchema[i].type;
×
1141

UNCOV
1142
      code = tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey);
×
UNCOV
1143
      if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
1144
        tsdbTrace("vgId:%d, %s drop table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1145
                  tstrerror(code));
1146
      }
1147
    }
1148
  } else {
UNCOV
1149
    STSchema *pTSchema = NULL;
×
UNCOV
1150
    code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, -1, &pTSchema);
×
UNCOV
1151
    if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
1152
      (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
×
1153

UNCOV
1154
      TAOS_RETURN(code);
×
1155
    }
1156

UNCOV
1157
    bool hasPrimayKey = false;
×
UNCOV
1158
    int  nCols = pTSchema->numOfCols;
×
UNCOV
1159
    if (nCols >= 2) {
×
UNCOV
1160
      hasPrimayKey = (pTSchema->columns[1].flags & COL_IS_KEY) ? true : false;
×
1161
    }
UNCOV
1162
    for (int i = 0; i < nCols; ++i) {
×
UNCOV
1163
      int16_t cid = pTSchema->columns[i].colId;
×
UNCOV
1164
      int8_t  col_type = pTSchema->columns[i].type;
×
1165

UNCOV
1166
      code = tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey);
×
UNCOV
1167
      if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
1168
        tsdbTrace("vgId:%d, %s drop table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1169
                  tstrerror(code));
1170
      }
1171
    }
1172

UNCOV
1173
    taosMemoryFree(pTSchema);
×
1174
  }
1175

UNCOV
1176
  rocksMayWrite(pTsdb, false);
×
1177

UNCOV
1178
  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
×
1179

UNCOV
1180
  TAOS_RETURN(code);
×
1181
}
1182

1183
int32_t tsdbCacheDropSubTables(STsdb *pTsdb, SArray *uids, tb_uid_t suid) {
4✔
1184
  int32_t code = 0;
4✔
1185

1186
  (void)taosThreadMutexLock(&pTsdb->lruMutex);
4✔
1187

1188
  code = tsdbCacheCommitNoLock(pTsdb);
4✔
1189
  if (code != TSDB_CODE_SUCCESS) {
4!
UNCOV
1190
    tsdbTrace("vgId:%d, %s commit with no lock failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1191
              tstrerror(code));
1192
  }
1193

1194
  STSchema *pTSchema = NULL;
4✔
1195
  code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, suid, -1, &pTSchema);
4✔
1196
  if (code != TSDB_CODE_SUCCESS) {
4!
UNCOV
1197
    (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
×
1198

UNCOV
1199
    TAOS_RETURN(code);
×
1200
  }
1201

1202
  for (int i = 0; i < TARRAY_SIZE(uids); ++i) {
4!
UNCOV
1203
    int64_t uid = ((tb_uid_t *)TARRAY_DATA(uids))[i];
×
1204

UNCOV
1205
    bool hasPrimayKey = false;
×
UNCOV
1206
    int  nCols = pTSchema->numOfCols;
×
UNCOV
1207
    if (nCols >= 2) {
×
UNCOV
1208
      hasPrimayKey = (pTSchema->columns[1].flags & COL_IS_KEY) ? true : false;
×
1209
    }
1210

UNCOV
1211
    for (int i = 0; i < nCols; ++i) {
×
UNCOV
1212
      int16_t cid = pTSchema->columns[i].colId;
×
UNCOV
1213
      int8_t  col_type = pTSchema->columns[i].type;
×
1214

UNCOV
1215
      code = tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey);
×
UNCOV
1216
      if (code != TSDB_CODE_SUCCESS) {
×
1217
        tsdbTrace("vgId:%d, %s drop table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1218
                  tstrerror(code));
1219
      }
1220
    }
1221
  }
1222

1223
  taosMemoryFree(pTSchema);
4!
1224

1225
  rocksMayWrite(pTsdb, false);
4✔
1226

1227
  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
4✔
1228

1229
  TAOS_RETURN(code);
4✔
1230
}
1231

UNCOV
1232
int32_t tsdbCacheNewNTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, int8_t col_type) {
×
1233
  int32_t code = 0;
×
1234

1235
  (void)taosThreadMutexLock(&pTsdb->lruMutex);
×
1236

UNCOV
1237
  code = tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, 0);
×
1238
  if (code != TSDB_CODE_SUCCESS) {
×
1239
    tsdbTrace("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1240
              tstrerror(code));
1241
  }
UNCOV
1242
  code = tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, 1);
×
1243
  if (code != TSDB_CODE_SUCCESS) {
×
1244
    tsdbTrace("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1245
              tstrerror(code));
1246
  }
1247
  // rocksMayWrite(pTsdb, true, false, false);
UNCOV
1248
  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
×
1249

1250
  TAOS_RETURN(code);
×
1251
}
1252

UNCOV
1253
int32_t tsdbCacheDropNTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, bool hasPrimayKey) {
×
UNCOV
1254
  int32_t code = 0;
×
1255

UNCOV
1256
  (void)taosThreadMutexLock(&pTsdb->lruMutex);
×
1257

UNCOV
1258
  code = tsdbCacheCommitNoLock(pTsdb);
×
1259
  if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
1260
    tsdbTrace("vgId:%d, %s commit with no lock failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1261
              tstrerror(code));
1262
  }
1263

UNCOV
1264
  code = tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey);
×
UNCOV
1265
  if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
1266
    tsdbTrace("vgId:%d, %s drop table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1267
              tstrerror(code));
1268
  }
1269

UNCOV
1270
  rocksMayWrite(pTsdb, false);
×
1271

1272
  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
×
1273

UNCOV
1274
  TAOS_RETURN(code);
×
1275
}
1276

1277
int32_t tsdbCacheNewSTableColumn(STsdb *pTsdb, SArray *uids, int16_t cid, int8_t col_type) {
×
UNCOV
1278
  int32_t code = 0;
×
1279

UNCOV
1280
  (void)taosThreadMutexLock(&pTsdb->lruMutex);
×
1281

UNCOV
1282
  for (int i = 0; i < TARRAY_SIZE(uids); ++i) {
×
UNCOV
1283
    tb_uid_t uid = ((tb_uid_t *)TARRAY_DATA(uids))[i];
×
1284

UNCOV
1285
    code = tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, 0);
×
UNCOV
1286
    if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
1287
      tsdbTrace("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1288
                tstrerror(code));
1289
    }
UNCOV
1290
    code = tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, 1);
×
UNCOV
1291
    if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
1292
      tsdbTrace("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1293
                tstrerror(code));
1294
    }
1295
  }
1296

1297
  // rocksMayWrite(pTsdb, true, false, false);
UNCOV
1298
  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
×
UNCOV
1299
  TAOS_RETURN(code);
×
1300
}
1301

UNCOV
1302
int32_t tsdbCacheDropSTableColumn(STsdb *pTsdb, SArray *uids, int16_t cid, bool hasPrimayKey) {
×
1303
  int32_t code = 0;
×
1304

UNCOV
1305
  (void)taosThreadMutexLock(&pTsdb->lruMutex);
×
1306

UNCOV
1307
  code = tsdbCacheCommitNoLock(pTsdb);
×
UNCOV
1308
  if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
1309
    tsdbTrace("vgId:%d, %s commit with no lock failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1310
              tstrerror(code));
1311
  }
1312

UNCOV
1313
  for (int i = 0; i < TARRAY_SIZE(uids); ++i) {
×
UNCOV
1314
    int64_t uid = ((tb_uid_t *)TARRAY_DATA(uids))[i];
×
1315

UNCOV
1316
    code = tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey);
×
UNCOV
1317
    if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
1318
      tsdbTrace("vgId:%d, %s drop table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1319
                tstrerror(code));
1320
    }
1321
  }
1322

1323
  rocksMayWrite(pTsdb, false);
×
1324

1325
  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
×
1326

1327
  TAOS_RETURN(code);
×
1328
}
1329

1330
typedef struct {
1331
  int      idx;
1332
  SLastKey key;
1333
} SIdxKey;
1334

1335
static void tsdbCacheUpdateLastColToNone(SLastCol *pLastCol, ELastCacheStatus cacheStatus) {
×
1336
  // update rowkey
1337
  pLastCol->rowKey.ts = TSKEY_MIN;
×
UNCOV
1338
  for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) {
×
1339
    SValue *pPKValue = &pLastCol->rowKey.pks[i];
×
UNCOV
1340
    if (IS_VAR_DATA_TYPE(pPKValue->type) && pPKValue->nData > 0) {
×
UNCOV
1341
      taosMemoryFreeClear(pPKValue->pData);
×
1342
      pPKValue->nData = 0;
×
1343
    } else {
1344
      pPKValue->val = 0;
×
1345
    }
1346
  }
UNCOV
1347
  pLastCol->rowKey.numOfPKs = 0;
×
1348

1349
  // update colval
UNCOV
1350
  if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type) && pLastCol->colVal.value.nData > 0) {
×
UNCOV
1351
    taosMemoryFreeClear(pLastCol->colVal.value.pData);
×
UNCOV
1352
    pLastCol->colVal.value.nData = 0;
×
1353
  } else {
1354
    pLastCol->colVal.value.val = 0;
×
1355
  }
1356

UNCOV
1357
  pLastCol->colVal = COL_VAL_NONE(pLastCol->colVal.cid, pLastCol->colVal.value.type);
×
UNCOV
1358
  pLastCol->dirty = 1;
×
UNCOV
1359
  pLastCol->cacheStatus = cacheStatus;
×
UNCOV
1360
}
×
1361

UNCOV
1362
static int32_t tsdbCachePutToRocksdb(STsdb *pTsdb, SLastKey *pLastKey, SLastCol *pLastCol) {
×
UNCOV
1363
  int32_t code = 0;
×
1364
#ifdef USE_ROCKSDB
UNCOV
1365
  char   *rocks_value = NULL;
×
UNCOV
1366
  size_t  vlen = 0;
×
1367

UNCOV
1368
  code = tsdbCacheSerialize(pLastCol, &rocks_value, &vlen);
×
UNCOV
1369
  if (code) {
×
UNCOV
1370
    tsdbError("tsdb/cache/putrocks: vgId:%d, serialize failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code));
×
UNCOV
1371
    TAOS_RETURN(code);
×
1372
  }
1373

UNCOV
1374
  rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
×
UNCOV
1375
  (void)taosThreadMutexLock(&pTsdb->rCache.writeBatchMutex);
×
UNCOV
1376
  rocksdb_writebatch_put(wb, (char *)pLastKey, ROCKS_KEY_LEN, rocks_value, vlen);
×
UNCOV
1377
  (void)taosThreadMutexUnlock(&pTsdb->rCache.writeBatchMutex);
×
1378

UNCOV
1379
  taosMemoryFree(rocks_value);
×
1380
#endif
UNCOV
1381
  TAOS_RETURN(code);
×
1382
}
1383

1384
static int32_t tsdbCachePutToLRU(STsdb *pTsdb, SLastKey *pLastKey, SLastCol *pLastCol, int8_t dirty) {
×
1385
  int32_t code = 0, lino = 0;
×
1386

UNCOV
1387
  SLastCol *pLRULastCol = taosMemoryCalloc(1, sizeof(SLastCol));
×
UNCOV
1388
  if (!pLRULastCol) {
×
UNCOV
1389
    return terrno;
×
1390
  }
1391

1392
  size_t charge = 0;
×
UNCOV
1393
  *pLRULastCol = *pLastCol;
×
UNCOV
1394
  pLRULastCol->dirty = dirty;
×
UNCOV
1395
  TAOS_CHECK_EXIT(tsdbCacheReallocSLastCol(pLRULastCol, &charge));
×
1396

UNCOV
1397
  LRUStatus status = taosLRUCacheInsert(pTsdb->lruCache, pLastKey, ROCKS_KEY_LEN, pLRULastCol, charge, tsdbCacheDeleter,
×
1398
                                        tsdbCacheOverWriter, NULL, TAOS_LRU_PRIORITY_LOW, pTsdb);
UNCOV
1399
  if (TAOS_LRU_STATUS_OK != status && TAOS_LRU_STATUS_OK_OVERWRITTEN != status) {
×
1400
    tsdbError("vgId:%d, %s failed at line %d status %d.", TD_VID(pTsdb->pVnode), __func__, __LINE__, status);
×
UNCOV
1401
    code = TSDB_CODE_FAILED;
×
UNCOV
1402
    pLRULastCol = NULL;
×
1403
  }
1404

UNCOV
1405
_exit:
×
UNCOV
1406
  if (TSDB_CODE_SUCCESS != code) {
×
UNCOV
1407
    taosMemoryFree(pLRULastCol);
×
UNCOV
1408
    tsdbError("tsdb/cache/putlru: vgId:%d, failed at line %d since %s.", TD_VID(pTsdb->pVnode), lino, tstrerror(code));
×
1409
  }
1410

UNCOV
1411
  TAOS_RETURN(code);
×
1412
}
1413

UNCOV
1414
static int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SArray *updCtxArray) {
×
UNCOV
1415
  if (!updCtxArray || TARRAY_SIZE(updCtxArray) == 0) {
×
UNCOV
1416
    TAOS_RETURN(TSDB_CODE_SUCCESS);
×
1417
  }
1418

UNCOV
1419
  int32_t code = 0, lino = 0;
×
1420

UNCOV
1421
  int        num_keys = TARRAY_SIZE(updCtxArray);
×
UNCOV
1422
  SArray    *remainCols = NULL;
×
UNCOV
1423
  SLRUCache *pCache = pTsdb->lruCache;
×
1424

UNCOV
1425
  (void)taosThreadMutexLock(&pTsdb->lruMutex);
×
UNCOV
1426
  for (int i = 0; i < num_keys; ++i) {
×
UNCOV
1427
    SLastUpdateCtx *updCtx = &((SLastUpdateCtx *)TARRAY_DATA(updCtxArray))[i];
×
UNCOV
1428
    int8_t          lflag = updCtx->lflag;
×
UNCOV
1429
    SRowKey        *pRowKey = &updCtx->tsdbRowKey.key;
×
UNCOV
1430
    SColVal        *pColVal = &updCtx->colVal;
×
1431

UNCOV
1432
    if (lflag == LFLAG_LAST && !COL_VAL_IS_VALUE(pColVal)) {
×
UNCOV
1433
      continue;
×
1434
    }
1435

UNCOV
1436
    SLastKey  *key = &(SLastKey){.lflag = lflag, .uid = uid, .cid = pColVal->cid};
×
UNCOV
1437
    LRUHandle *h = taosLRUCacheLookup(pCache, key, ROCKS_KEY_LEN);
×
UNCOV
1438
    if (h) {
×
1439
      SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h);
×
UNCOV
1440
      if (pLastCol->cacheStatus != TSDB_LAST_CACHE_NO_CACHE) {
×
UNCOV
1441
        int32_t cmp_res = tRowKeyCompare(&pLastCol->rowKey, pRowKey);
×
UNCOV
1442
        if (cmp_res < 0 || (cmp_res == 0 && !COL_VAL_IS_NONE(pColVal))) {
×
1443
          SLastCol newLastCol = {
×
1444
              .rowKey = *pRowKey, .colVal = *pColVal, .dirty = 1, .cacheStatus = TSDB_LAST_CACHE_VALID};
UNCOV
1445
          code = tsdbCachePutToLRU(pTsdb, key, &newLastCol, 1);
×
1446
        }
1447
      }
1448

UNCOV
1449
      tsdbLRUCacheRelease(pCache, h, false);
×
UNCOV
1450
      TAOS_CHECK_EXIT(code);
×
1451
    } else {
UNCOV
1452
      if (!remainCols) {
×
UNCOV
1453
        remainCols = taosArrayInit(num_keys * 2, sizeof(SIdxKey));
×
UNCOV
1454
        if (!remainCols) {
×
UNCOV
1455
          TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
1456
        }
1457
      }
UNCOV
1458
      if (!taosArrayPush(remainCols, &(SIdxKey){i, *key})) {
×
1459
        TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
1460
      }
1461
    }
1462
  }
1463

1464
  if (remainCols) {
×
1465
    num_keys = TARRAY_SIZE(remainCols);
×
1466
  }
UNCOV
1467
  if (remainCols && num_keys > 0) {
×
UNCOV
1468
    char  **keys_list = NULL;
×
UNCOV
1469
    size_t *keys_list_sizes = NULL;
×
UNCOV
1470
    char  **values_list = NULL;
×
UNCOV
1471
    size_t *values_list_sizes = NULL;
×
UNCOV
1472
    char  **errs = NULL;
×
UNCOV
1473
    keys_list = taosMemoryCalloc(num_keys, sizeof(char *));
×
UNCOV
1474
    if (!keys_list) {
×
UNCOV
1475
      (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
×
UNCOV
1476
      return terrno;
×
1477
    }
UNCOV
1478
    keys_list_sizes = taosMemoryCalloc(num_keys, sizeof(size_t));
×
UNCOV
1479
    if (!keys_list_sizes) {
×
1480
      taosMemoryFree(keys_list);
×
1481
      (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
×
1482
      return terrno;
×
1483
    }
UNCOV
1484
    for (int i = 0; i < num_keys; ++i) {
×
UNCOV
1485
      SIdxKey *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[i];
×
1486

UNCOV
1487
      keys_list[i] = (char *)&idxKey->key;
×
UNCOV
1488
      keys_list_sizes[i] = ROCKS_KEY_LEN;
×
1489
    }
1490

UNCOV
1491
    rocksMayWrite(pTsdb, true);  // flush writebatch cache
×
1492

UNCOV
1493
    code = tsdbCacheGetValuesFromRocks(pTsdb, num_keys, (const char *const *)keys_list, keys_list_sizes, &values_list,
×
1494
                                       &values_list_sizes);
UNCOV
1495
    if (code) {
×
1496
      taosMemoryFree(keys_list);
×
UNCOV
1497
      taosMemoryFree(keys_list_sizes);
×
1498
      goto _exit;
×
1499
    }
1500

1501
    // rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
UNCOV
1502
    for (int i = 0; i < num_keys; ++i) {
×
UNCOV
1503
      SIdxKey        *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[i];
×
UNCOV
1504
      SLastUpdateCtx *updCtx = (SLastUpdateCtx *)taosArrayGet(updCtxArray, idxKey->idx);
×
UNCOV
1505
      SRowKey        *pRowKey = &updCtx->tsdbRowKey.key;
×
UNCOV
1506
      SColVal        *pColVal = &updCtx->colVal;
×
1507

UNCOV
1508
      SLastCol *pLastCol = NULL;
×
1509
      if (values_list[i] != NULL) {
×
1510
        code = tsdbCacheDeserialize(values_list[i], values_list_sizes[i], &pLastCol);
×
UNCOV
1511
        if (code != TSDB_CODE_SUCCESS) {
×
1512
          tsdbError("vgId:%d, %s deserialize failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1513
                    tstrerror(code));
UNCOV
1514
          goto _exit;
×
1515
        }
1516
      }
1517
      /*
1518
      if (code) {
1519
        tsdbError("tsdb/cache: vgId:%d, deserialize failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code));
1520
      }
1521
      */
1522
      SLastCol *pToFree = pLastCol;
×
1523

UNCOV
1524
      if (pLastCol && pLastCol->cacheStatus == TSDB_LAST_CACHE_NO_CACHE) {
×
UNCOV
1525
        if ((code = tsdbCachePutToLRU(pTsdb, &idxKey->key, pLastCol, 0)) != TSDB_CODE_SUCCESS) {
×
UNCOV
1526
          tsdbError("tsdb/cache: vgId:%d, put lru failed at line %d since %s.", TD_VID(pTsdb->pVnode), lino,
×
1527
                    tstrerror(code));
UNCOV
1528
          taosMemoryFreeClear(pToFree);
×
UNCOV
1529
          break;
×
1530
        }
1531

1532
        // cache invalid => skip update
UNCOV
1533
        taosMemoryFreeClear(pToFree);
×
UNCOV
1534
        continue;
×
1535
      }
1536

1537
      if (IS_LAST_KEY(idxKey->key) && !COL_VAL_IS_VALUE(pColVal)) {
×
1538
        taosMemoryFreeClear(pToFree);
×
UNCOV
1539
        continue;
×
1540
      }
1541

UNCOV
1542
      int32_t cmp_res = 1;
×
1543
      if (pLastCol) {
×
1544
        cmp_res = tRowKeyCompare(&pLastCol->rowKey, pRowKey);
×
1545
      }
1546

UNCOV
1547
      if (NULL == pLastCol || cmp_res < 0 || (cmp_res == 0 && !COL_VAL_IS_NONE(pColVal))) {
×
UNCOV
1548
        SLastCol lastColTmp = {
×
1549
            .rowKey = *pRowKey, .colVal = *pColVal, .dirty = 0, .cacheStatus = TSDB_LAST_CACHE_VALID};
UNCOV
1550
        if ((code = tsdbCachePutToRocksdb(pTsdb, &idxKey->key, &lastColTmp)) != TSDB_CODE_SUCCESS) {
×
UNCOV
1551
          tsdbError("tsdb/cache: vgId:%d, put rocks failed at line %d since %s.", TD_VID(pTsdb->pVnode), lino,
×
1552
                    tstrerror(code));
UNCOV
1553
          taosMemoryFreeClear(pToFree);
×
UNCOV
1554
          break;
×
1555
        }
UNCOV
1556
        if ((code = tsdbCachePutToLRU(pTsdb, &idxKey->key, &lastColTmp, 0)) != TSDB_CODE_SUCCESS) {
×
UNCOV
1557
          tsdbError("tsdb/cache: vgId:%d, put lru failed at line %d since %s.", TD_VID(pTsdb->pVnode), lino,
×
1558
                    tstrerror(code));
UNCOV
1559
          taosMemoryFreeClear(pToFree);
×
UNCOV
1560
          break;
×
1561
        }
1562
      }
1563

UNCOV
1564
      taosMemoryFreeClear(pToFree);
×
1565
    }
1566

UNCOV
1567
    rocksMayWrite(pTsdb, false);
×
1568

1569
    taosMemoryFree(keys_list);
×
UNCOV
1570
    taosMemoryFree(keys_list_sizes);
×
UNCOV
1571
    if (values_list) {
×
1572
#ifdef USE_ROCKSDB
UNCOV
1573
      for (int i = 0; i < num_keys; ++i) {
×
UNCOV
1574
        rocksdb_free(values_list[i]);
×
1575
      }
1576
#endif
UNCOV
1577
      taosMemoryFree(values_list);
×
1578
    }
UNCOV
1579
    taosMemoryFree(values_list_sizes);
×
1580
  }
1581

1582
_exit:
×
1583
  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
×
1584
  taosArrayDestroy(remainCols);
×
1585

UNCOV
1586
  if (code) {
×
1587
    tsdbError("tsdb/cache: vgId:%d, update failed at line %d since %s.", TD_VID(pTsdb->pVnode), __LINE__,
×
1588
              tstrerror(code));
1589
  }
1590

1591
  TAOS_RETURN(code);
×
1592
}
1593

UNCOV
1594
int32_t tsdbCacheRowFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int64_t version, int32_t nRow,
×
1595
                                 SRow **aRow) {
1596
  int32_t code = 0, lino = 0;
×
1597

1598
  // 1. prepare last
UNCOV
1599
  TSDBROW      lRow = {.type = TSDBROW_ROW_FMT, .pTSRow = aRow[nRow - 1], .version = version};
×
1600
  STSchema    *pTSchema = NULL;
×
1601
  int32_t      sver = TSDBROW_SVERSION(&lRow);
×
1602
  SSHashObj   *iColHash = NULL;
×
1603
  STSDBRowIter iter = {0};
×
1604

UNCOV
1605
  TAOS_CHECK_GOTO(tsdbUpdateSkm(pTsdb, suid, uid, sver), &lino, _exit);
×
UNCOV
1606
  pTSchema = pTsdb->rCache.pTSchema;
×
1607

1608
  TSDBROW tRow = {.type = TSDBROW_ROW_FMT, .version = version};
×
1609
  int32_t nCol = pTSchema->numOfCols;
×
1610
  SArray *ctxArray = pTsdb->rCache.ctxArray;
×
1611

1612
  // 1. prepare by lrow
1613
  STsdbRowKey tsdbRowKey = {0};
×
1614
  tsdbRowGetKey(&lRow, &tsdbRowKey);
×
1615

1616
  TAOS_CHECK_GOTO(tsdbRowIterOpen(&iter, &lRow, pTSchema), &lino, _exit);
×
1617

UNCOV
1618
  int32_t iCol = 0;
×
UNCOV
1619
  for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal && iCol < nCol; pColVal = tsdbRowIterNext(&iter), iCol++) {
×
1620
    SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST_ROW, .tsdbRowKey = tsdbRowKey, .colVal = *pColVal};
×
1621
    if (!taosArrayPush(ctxArray, &updateCtx)) {
×
UNCOV
1622
      TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
1623
    }
1624

UNCOV
1625
    if (COL_VAL_IS_VALUE(pColVal)) {
×
UNCOV
1626
      updateCtx.lflag = LFLAG_LAST;
×
1627
      if (!taosArrayPush(ctxArray, &updateCtx)) {
×
1628
        TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
1629
      }
1630
    } else {
UNCOV
1631
      if (!iColHash) {
×
1632
        iColHash = tSimpleHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
×
UNCOV
1633
        if (iColHash == NULL) {
×
1634
          TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit);
×
1635
        }
1636
      }
1637

1638
      if (tSimpleHashPut(iColHash, &iCol, sizeof(iCol), NULL, 0)) {
×
1639
        TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit);
×
1640
      }
1641
    }
1642
  }
1643

1644
  // 2. prepare by the other rows
1645
  for (int32_t iRow = nRow - 2; iRow >= 0; --iRow) {
×
1646
    if (tSimpleHashGetSize(iColHash) == 0) {
×
1647
      break;
×
1648
    }
1649

1650
    tRow.pTSRow = aRow[iRow];
×
1651

UNCOV
1652
    STsdbRowKey tsdbRowKey = {0};
×
UNCOV
1653
    tsdbRowGetKey(&tRow, &tsdbRowKey);
×
1654

UNCOV
1655
    void   *pIte = NULL;
×
UNCOV
1656
    int32_t iter = 0;
×
UNCOV
1657
    while ((pIte = tSimpleHashIterate(iColHash, pIte, &iter)) != NULL) {
×
1658
      int32_t iCol = ((int32_t *)pIte)[0];
×
UNCOV
1659
      SColVal colVal = COL_VAL_NONE(0, 0);
×
1660
      tsdbRowGetColVal(&tRow, pTSchema, iCol, &colVal);
×
1661

1662
      if (COL_VAL_IS_VALUE(&colVal)) {
×
UNCOV
1663
        SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST, .tsdbRowKey = tsdbRowKey, .colVal = colVal};
×
UNCOV
1664
        if (!taosArrayPush(ctxArray, &updateCtx)) {
×
1665
          TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
1666
        }
1667
        code = tSimpleHashIterateRemove(iColHash, &iCol, sizeof(iCol), &pIte, &iter);
×
UNCOV
1668
        if (code != TSDB_CODE_SUCCESS) {
×
1669
          tsdbTrace("vgId:%d, %s tSimpleHashIterateRemove failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__,
×
1670
                    __LINE__, tstrerror(code));
1671
        }
1672
      }
1673
    }
1674
  }
1675

1676
  TAOS_CHECK_GOTO(tsdbCacheUpdate(pTsdb, suid, uid, ctxArray), &lino, _exit);
×
1677

1678
_exit:
×
1679
  if (code) {
×
UNCOV
1680
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
×
1681
  }
1682

1683
  tsdbRowClose(&iter);
×
1684
  tSimpleHashCleanup(iColHash);
×
1685
  taosArrayClear(ctxArray);
×
1686

UNCOV
1687
  TAOS_RETURN(code);
×
1688
}
1689

1690
int32_t tsdbCacheColFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SBlockData *pBlockData) {
×
UNCOV
1691
  int32_t      code = 0, lino = 0;
×
UNCOV
1692
  STSDBRowIter iter = {0};
×
1693
  STSchema    *pTSchema = NULL;
×
UNCOV
1694
  SArray      *ctxArray = NULL;
×
1695

1696
  TSDBROW lRow = tsdbRowFromBlockData(pBlockData, pBlockData->nRow - 1);
×
UNCOV
1697
  int32_t sver = TSDBROW_SVERSION(&lRow);
×
1698

1699
  TAOS_CHECK_RETURN(metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pTSchema));
×
1700

UNCOV
1701
  ctxArray = taosArrayInit(pBlockData->nColData, sizeof(SLastUpdateCtx));
×
UNCOV
1702
  if (ctxArray == NULL) {
×
1703
    TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
1704
  }
1705

1706
  // 1. prepare last
1707
  STsdbRowKey tsdbRowKey = {0};
×
1708
  tsdbRowGetKey(&lRow, &tsdbRowKey);
×
1709

1710
  {
1711
    SLastUpdateCtx updateCtx = {
×
1712
        .lflag = LFLAG_LAST,
1713
        .tsdbRowKey = tsdbRowKey,
UNCOV
1714
        .colVal = COL_VAL_VALUE(PRIMARYKEY_TIMESTAMP_COL_ID, ((SValue){.type = TSDB_DATA_TYPE_TIMESTAMP,
×
1715
                                                                       .val = lRow.pBlockData->aTSKEY[lRow.iRow]}))};
1716
    if (!taosArrayPush(ctxArray, &updateCtx)) {
×
1717
      TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
1718
    }
1719
  }
1720

1721
  TSDBROW tRow = tsdbRowFromBlockData(pBlockData, 0);
×
1722

UNCOV
1723
  for (int32_t iColData = 0; iColData < pBlockData->nColData; ++iColData) {
×
1724
    SColData *pColData = &pBlockData->aColData[iColData];
×
UNCOV
1725
    if ((pColData->flag & HAS_VALUE) != HAS_VALUE) {
×
UNCOV
1726
      continue;
×
1727
    }
1728

UNCOV
1729
    for (tRow.iRow = pBlockData->nRow - 1; tRow.iRow >= 0; --tRow.iRow) {
×
1730
      STsdbRowKey tsdbRowKey = {0};
×
1731
      tsdbRowGetKey(&tRow, &tsdbRowKey);
×
1732

1733
      uint8_t colType = tColDataGetBitValue(pColData, tRow.iRow);
×
1734
      if (colType == 2) {
×
UNCOV
1735
        SColVal colVal = COL_VAL_NONE(pColData->cid, pColData->type);
×
UNCOV
1736
        TAOS_CHECK_GOTO(tColDataGetValue(pColData, tRow.iRow, &colVal), &lino, _exit);
×
1737

1738
        SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST, .tsdbRowKey = tsdbRowKey, .colVal = colVal};
×
UNCOV
1739
        if (!taosArrayPush(ctxArray, &updateCtx)) {
×
1740
          TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
1741
        }
1742
        break;
×
1743
      }
1744
    }
1745
  }
1746

1747
  // 2. prepare last row
UNCOV
1748
  TAOS_CHECK_GOTO(tsdbRowIterOpen(&iter, &lRow, pTSchema), &lino, _exit);
×
UNCOV
1749
  for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal; pColVal = tsdbRowIterNext(&iter)) {
×
UNCOV
1750
    SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST_ROW, .tsdbRowKey = tsdbRowKey, .colVal = *pColVal};
×
UNCOV
1751
    if (!taosArrayPush(ctxArray, &updateCtx)) {
×
UNCOV
1752
      TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
1753
    }
1754
  }
1755

UNCOV
1756
  TAOS_CHECK_GOTO(tsdbCacheUpdate(pTsdb, suid, uid, ctxArray), &lino, _exit);
×
1757

UNCOV
1758
_exit:
×
UNCOV
1759
  tsdbRowClose(&iter);
×
UNCOV
1760
  taosMemoryFreeClear(pTSchema);
×
UNCOV
1761
  taosArrayDestroy(ctxArray);
×
1762

UNCOV
1763
  TAOS_RETURN(code);
×
1764
}
1765

1766
static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols,
1767
                            int nCols, int16_t *slotIds);
1768

1769
static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols,
1770
                               int nCols, int16_t *slotIds);
1771

UNCOV
1772
static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SArray *remainCols,
×
1773
                                    SCacheRowsReader *pr, int8_t ltype) {
UNCOV
1774
  int32_t               code = 0, lino = 0;
×
1775
  // rocksdb_writebatch_t *wb = NULL;
UNCOV
1776
  SArray               *pTmpColArray = NULL;
×
UNCOV
1777
  bool                  extraTS = false;
×
1778

UNCOV
1779
  SIdxKey *idxKey = taosArrayGet(remainCols, 0);
×
UNCOV
1780
  if (idxKey->key.cid != PRIMARYKEY_TIMESTAMP_COL_ID) {
×
1781
    // ignore 'ts' loaded from cache and load it from tsdb
1782
    // SLastCol *pLastCol = taosArrayGet(pLastArray, 0);
1783
    // tsdbCacheUpdateLastColToNone(pLastCol, TSDB_LAST_CACHE_NO_CACHE);
1784

UNCOV
1785
    SLastKey *key = &(SLastKey){.lflag = ltype, .uid = uid, .cid = PRIMARYKEY_TIMESTAMP_COL_ID};
×
UNCOV
1786
    if (!taosArrayInsert(remainCols, 0, &(SIdxKey){0, *key})) {
×
UNCOV
1787
      TAOS_RETURN(terrno);
×
1788
    }
1789

UNCOV
1790
    extraTS = true;
×
1791
  }
1792

UNCOV
1793
  int      num_keys = TARRAY_SIZE(remainCols);
×
UNCOV
1794
  int16_t *slotIds = taosMemoryMalloc(num_keys * sizeof(int16_t));
×
1795

UNCOV
1796
  int16_t *lastColIds = NULL, *lastSlotIds = NULL, *lastrowColIds = NULL, *lastrowSlotIds = NULL;
×
UNCOV
1797
  lastColIds = taosMemoryMalloc(num_keys * sizeof(int16_t));
×
UNCOV
1798
  lastSlotIds = taosMemoryMalloc(num_keys * sizeof(int16_t));
×
UNCOV
1799
  lastrowColIds = taosMemoryMalloc(num_keys * sizeof(int16_t));
×
UNCOV
1800
  lastrowSlotIds = taosMemoryMalloc(num_keys * sizeof(int16_t));
×
UNCOV
1801
  SArray *lastTmpColArray = NULL, *lastTmpIndexArray = NULL, *lastrowTmpColArray = NULL, *lastrowTmpIndexArray = NULL;
×
1802

UNCOV
1803
  int lastIndex = 0;
×
1804
  int lastrowIndex = 0;
×
1805

UNCOV
1806
  if (!slotIds || !lastColIds || !lastSlotIds || !lastrowColIds || !lastrowSlotIds) {
×
UNCOV
1807
    TAOS_CHECK_EXIT(terrno);
×
1808
  }
1809

UNCOV
1810
  for (int i = 0; i < num_keys; ++i) {
×
UNCOV
1811
    SIdxKey *idxKey = taosArrayGet(remainCols, i);
×
UNCOV
1812
    if (extraTS && !i) {
×
UNCOV
1813
      slotIds[i] = 0;
×
1814
    } else {
UNCOV
1815
      slotIds[i] = pr->pSlotIds[idxKey->idx];
×
1816
    }
1817

UNCOV
1818
    if (IS_LAST_KEY(idxKey->key)) {
×
UNCOV
1819
      if (NULL == lastTmpIndexArray) {
×
UNCOV
1820
        lastTmpIndexArray = taosArrayInit(num_keys, sizeof(int32_t));
×
1821
        if (!lastTmpIndexArray) {
×
UNCOV
1822
          TAOS_CHECK_EXIT(terrno);
×
1823
        }
1824
      }
1825
      if (!taosArrayPush(lastTmpIndexArray, &(i))) {
×
UNCOV
1826
        TAOS_CHECK_EXIT(terrno);
×
1827
      }
UNCOV
1828
      lastColIds[lastIndex] = idxKey->key.cid;
×
UNCOV
1829
      if (extraTS && !i) {
×
UNCOV
1830
        lastSlotIds[lastIndex] = 0;
×
1831
      } else {
UNCOV
1832
        lastSlotIds[lastIndex] = pr->pSlotIds[idxKey->idx];
×
1833
      }
UNCOV
1834
      lastIndex++;
×
1835
    } else {
UNCOV
1836
      if (NULL == lastrowTmpIndexArray) {
×
UNCOV
1837
        lastrowTmpIndexArray = taosArrayInit(num_keys, sizeof(int32_t));
×
UNCOV
1838
        if (!lastrowTmpIndexArray) {
×
1839
          TAOS_CHECK_EXIT(terrno);
×
1840
        }
1841
      }
UNCOV
1842
      if (!taosArrayPush(lastrowTmpIndexArray, &(i))) {
×
UNCOV
1843
        TAOS_CHECK_EXIT(terrno);
×
1844
      }
UNCOV
1845
      lastrowColIds[lastrowIndex] = idxKey->key.cid;
×
UNCOV
1846
      if (extraTS && !i) {
×
1847
        lastrowSlotIds[lastrowIndex] = 0;
×
1848
      } else {
UNCOV
1849
        lastrowSlotIds[lastrowIndex] = pr->pSlotIds[idxKey->idx];
×
1850
      }
UNCOV
1851
      lastrowIndex++;
×
1852
    }
1853
  }
1854

UNCOV
1855
  pTmpColArray = taosArrayInit(lastIndex + lastrowIndex, sizeof(SLastCol));
×
UNCOV
1856
  if (!pTmpColArray) {
×
1857
    TAOS_CHECK_EXIT(terrno);
×
1858
  }
1859

UNCOV
1860
  if (lastTmpIndexArray != NULL) {
×
UNCOV
1861
    TAOS_CHECK_EXIT(mergeLastCid(uid, pTsdb, &lastTmpColArray, pr, lastColIds, lastIndex, lastSlotIds));
×
UNCOV
1862
    for (int i = 0; i < taosArrayGetSize(lastTmpColArray); i++) {
×
UNCOV
1863
      if (!taosArrayInsert(pTmpColArray, *(int32_t *)taosArrayGet(lastTmpIndexArray, i),
×
UNCOV
1864
                           taosArrayGet(lastTmpColArray, i))) {
×
UNCOV
1865
        TAOS_CHECK_EXIT(terrno);
×
1866
      }
1867
    }
1868
  }
1869

UNCOV
1870
  if (lastrowTmpIndexArray != NULL) {
×
UNCOV
1871
    TAOS_CHECK_EXIT(mergeLastRowCid(uid, pTsdb, &lastrowTmpColArray, pr, lastrowColIds, lastrowIndex, lastrowSlotIds));
×
UNCOV
1872
    for (int i = 0; i < taosArrayGetSize(lastrowTmpColArray); i++) {
×
UNCOV
1873
      if (!taosArrayInsert(pTmpColArray, *(int32_t *)taosArrayGet(lastrowTmpIndexArray, i),
×
UNCOV
1874
                           taosArrayGet(lastrowTmpColArray, i))) {
×
UNCOV
1875
        TAOS_CHECK_EXIT(terrno);
×
1876
      }
1877
    }
1878
  }
1879

UNCOV
1880
  SLRUCache *pCache = pTsdb->lruCache;
×
UNCOV
1881
  for (int i = 0; i < num_keys; ++i) {
×
UNCOV
1882
    SIdxKey  *idxKey = taosArrayGet(remainCols, i);
×
UNCOV
1883
    SLastCol *pLastCol = NULL;
×
1884

1885
    if (pTmpColArray && TARRAY_SIZE(pTmpColArray) >= i + 1) {
×
UNCOV
1886
      pLastCol = taosArrayGet(pTmpColArray, i);
×
1887
    }
1888

1889
    // still null, then make up a none col value
UNCOV
1890
    SLastCol noneCol = {.rowKey.ts = TSKEY_MIN,
×
UNCOV
1891
                        .colVal = COL_VAL_NONE(idxKey->key.cid, pr->pSchema->columns[slotIds[i]].type),
×
1892
                        .cacheStatus = TSDB_LAST_CACHE_VALID};
UNCOV
1893
    if (!pLastCol) {
×
1894
      pLastCol = &noneCol;
×
1895
    }
1896

UNCOV
1897
    if (!extraTS || i > 0) {
×
UNCOV
1898
      taosArraySet(pLastArray, idxKey->idx, pLastCol);
×
1899
    }
1900
    // taosArrayRemove(remainCols, i);
1901

UNCOV
1902
    if (/*!pTmpColArray*/ lastTmpIndexArray && !lastTmpColArray) {
×
UNCOV
1903
      continue;
×
1904
    }
UNCOV
1905
    if (/*!pTmpColArray*/ lastrowTmpIndexArray && !lastrowTmpColArray) {
×
UNCOV
1906
      continue;
×
1907
    }
1908

1909
    // store result back to rocks cache
UNCOV
1910
    code = tsdbCachePutToRocksdb(pTsdb, &idxKey->key, pLastCol);
×
UNCOV
1911
    if (code) {
×
UNCOV
1912
      tsdbError("vgId:%d, %s failed at line %d since %s.", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
×
UNCOV
1913
      TAOS_CHECK_EXIT(code);
×
1914
    }
1915

UNCOV
1916
    code = tsdbCachePutToLRU(pTsdb, &idxKey->key, pLastCol, 0);
×
UNCOV
1917
    if (code) {
×
UNCOV
1918
      tsdbError("vgId:%d, %s failed at line %d since %s.", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
×
UNCOV
1919
      TAOS_CHECK_EXIT(code);
×
1920
    }
1921
  }
1922

UNCOV
1923
  rocksMayWrite(pTsdb, false);
×
1924

UNCOV
1925
_exit:
×
UNCOV
1926
  taosArrayDestroy(lastrowTmpIndexArray);
×
UNCOV
1927
  taosArrayDestroy(lastrowTmpColArray);
×
UNCOV
1928
  taosArrayDestroy(lastTmpIndexArray);
×
UNCOV
1929
  taosArrayDestroy(lastTmpColArray);
×
1930

UNCOV
1931
  taosMemoryFree(lastColIds);
×
UNCOV
1932
  taosMemoryFree(lastSlotIds);
×
1933
  taosMemoryFree(lastrowColIds);
×
1934
  taosMemoryFree(lastrowSlotIds);
×
1935

UNCOV
1936
  taosArrayDestroy(pTmpColArray);
×
1937

UNCOV
1938
  taosMemoryFree(slotIds);
×
1939

UNCOV
1940
  TAOS_RETURN(code);
×
1941
}
1942

UNCOV
1943
static int32_t tsdbCacheLoadFromRocks(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SArray *remainCols,
×
1944
                                      SArray *ignoreFromRocks, SCacheRowsReader *pr, int8_t ltype) {
UNCOV
1945
  int32_t code = 0, lino = 0;
×
UNCOV
1946
  int     num_keys = TARRAY_SIZE(remainCols);
×
UNCOV
1947
  char  **keys_list = taosMemoryMalloc(num_keys * sizeof(char *));
×
UNCOV
1948
  size_t *keys_list_sizes = taosMemoryMalloc(num_keys * sizeof(size_t));
×
UNCOV
1949
  char   *key_list = taosMemoryMalloc(num_keys * ROCKS_KEY_LEN);
×
1950
  if (!keys_list || !keys_list_sizes || !key_list) {
×
1951
    taosMemoryFree(keys_list);
×
1952
    taosMemoryFree(keys_list_sizes);
×
1953
    TAOS_RETURN(terrno);
×
1954
  }
UNCOV
1955
  char  **values_list = NULL;
×
UNCOV
1956
  size_t *values_list_sizes = NULL;
×
UNCOV
1957
  for (int i = 0; i < num_keys; ++i) {
×
UNCOV
1958
    memcpy(key_list + i * ROCKS_KEY_LEN, &((SIdxKey *)taosArrayGet(remainCols, i))->key, ROCKS_KEY_LEN);
×
UNCOV
1959
    keys_list[i] = key_list + i * ROCKS_KEY_LEN;
×
UNCOV
1960
    keys_list_sizes[i] = ROCKS_KEY_LEN;
×
1961
  }
1962

UNCOV
1963
  rocksMayWrite(pTsdb, true);  // flush writebatch cache
×
1964

UNCOV
1965
  code = tsdbCacheGetValuesFromRocks(pTsdb, num_keys, (const char *const *)keys_list, keys_list_sizes, &values_list,
×
1966
                                     &values_list_sizes);
UNCOV
1967
  if (code) {
×
1968
    taosMemoryFree(key_list);
×
UNCOV
1969
    taosMemoryFree(keys_list);
×
1970
    taosMemoryFree(keys_list_sizes);
×
UNCOV
1971
    TAOS_RETURN(code);
×
1972
  }
1973

UNCOV
1974
  SLRUCache *pCache = pTsdb->lruCache;
×
UNCOV
1975
  for (int i = 0, j = 0; i < num_keys && j < TARRAY_SIZE(remainCols); ++i) {
×
UNCOV
1976
    SLastCol *pLastCol = NULL;
×
UNCOV
1977
    bool      ignore = ((bool *)TARRAY_DATA(ignoreFromRocks))[i];
×
1978
    if (ignore) {
×
1979
      ++j;
×
1980
      continue;
×
1981
    }
1982

UNCOV
1983
    if (values_list[i] != NULL) {
×
UNCOV
1984
      code = tsdbCacheDeserialize(values_list[i], values_list_sizes[i], &pLastCol);
×
UNCOV
1985
      if (code != TSDB_CODE_SUCCESS) {
×
1986
        tsdbError("vgId:%d, %s deserialize failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1987
                  tstrerror(code));
UNCOV
1988
        goto _exit;
×
1989
      }
1990
    }
UNCOV
1991
    SLastCol *pToFree = pLastCol;
×
UNCOV
1992
    SIdxKey  *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[j];
×
UNCOV
1993
    if (pLastCol && pLastCol->cacheStatus != TSDB_LAST_CACHE_NO_CACHE) {
×
UNCOV
1994
      code = tsdbCachePutToLRU(pTsdb, &idxKey->key, pLastCol, 0);
×
UNCOV
1995
      if (code) {
×
UNCOV
1996
        tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
×
UNCOV
1997
        taosMemoryFreeClear(pToFree);
×
UNCOV
1998
        TAOS_CHECK_EXIT(code);
×
1999
      }
2000

UNCOV
2001
      SLastCol lastCol = *pLastCol;
×
UNCOV
2002
      code = tsdbCacheReallocSLastCol(&lastCol, NULL);
×
UNCOV
2003
      if (TSDB_CODE_SUCCESS != code) {
×
UNCOV
2004
        taosMemoryFreeClear(pToFree);
×
UNCOV
2005
        TAOS_CHECK_EXIT(code);
×
2006
      }
2007

UNCOV
2008
      taosArraySet(pLastArray, idxKey->idx, &lastCol);
×
UNCOV
2009
      taosArrayRemove(remainCols, j);
×
UNCOV
2010
      taosArrayRemove(ignoreFromRocks, j);
×
2011
    } else {
UNCOV
2012
      ++j;
×
2013
    }
2014

UNCOV
2015
    taosMemoryFreeClear(pToFree);
×
2016
  }
2017

UNCOV
2018
  if (TARRAY_SIZE(remainCols) > 0) {
×
2019
    // tsdbTrace("tsdb/cache: vgId: %d, load %" PRId64 " from raw", TD_VID(pTsdb->pVnode), uid);
UNCOV
2020
    code = tsdbCacheLoadFromRaw(pTsdb, uid, pLastArray, remainCols, pr, ltype);
×
2021
  }
2022

UNCOV
2023
_exit:
×
UNCOV
2024
  taosMemoryFree(key_list);
×
UNCOV
2025
  taosMemoryFree(keys_list);
×
UNCOV
2026
  taosMemoryFree(keys_list_sizes);
×
UNCOV
2027
  if (values_list) {
×
2028
  #ifdef USE_ROCKSDB
UNCOV
2029
    for (int i = 0; i < num_keys; ++i) {
×
UNCOV
2030
      rocksdb_free(values_list[i]);
×
2031
    }
2032
  #endif
UNCOV
2033
    taosMemoryFree(values_list);
×
2034
  }
UNCOV
2035
  taosMemoryFree(values_list_sizes);
×
2036

UNCOV
2037
  TAOS_RETURN(code);
×
2038
}
2039

UNCOV
2040
static int32_t tsdbCacheGetBatchFromLru(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr,
×
2041
                                        int8_t ltype, SArray *keyArray) {
UNCOV
2042
  int32_t    code = 0, lino = 0;
×
UNCOV
2043
  SArray    *remainCols = NULL;
×
2044
  SArray    *ignoreFromRocks = NULL;
×
UNCOV
2045
  SLRUCache *pCache = pTsdb->lruCache;
×
UNCOV
2046
  SArray    *pCidList = pr->pCidList;
×
UNCOV
2047
  int        numKeys = TARRAY_SIZE(pCidList);
×
2048

UNCOV
2049
  for (int i = 0; i < numKeys; ++i) {
×
UNCOV
2050
    int16_t cid = ((int16_t *)TARRAY_DATA(pCidList))[i];
×
2051

2052
    SLastKey key = {.lflag = ltype, .uid = uid, .cid = cid};
×
2053
    // for select last_row, last case
UNCOV
2054
    int32_t funcType = FUNCTION_TYPE_CACHE_LAST;
×
UNCOV
2055
    if (pr->pFuncTypeList != NULL && taosArrayGetSize(pr->pFuncTypeList) > i) {
×
UNCOV
2056
      funcType = ((int32_t *)TARRAY_DATA(pr->pFuncTypeList))[i];
×
2057
    }
2058
    if (((pr->type & CACHESCAN_RETRIEVE_LAST) == CACHESCAN_RETRIEVE_LAST) && FUNCTION_TYPE_CACHE_LAST_ROW == funcType) {
×
2059
      int8_t tempType = CACHESCAN_RETRIEVE_LAST_ROW | (pr->type ^ CACHESCAN_RETRIEVE_LAST);
×
UNCOV
2060
      key.lflag = (tempType & CACHESCAN_RETRIEVE_LAST) >> 3;
×
2061
    }
2062

UNCOV
2063
    if (!taosArrayPush(keyArray, &key)) {
×
UNCOV
2064
      TAOS_CHECK_EXIT(terrno);
×
2065
    }
2066

2067
    LRUHandle *h = taosLRUCacheLookup(pCache, &key, ROCKS_KEY_LEN);
×
2068
    SLastCol  *pLastCol = h ? (SLastCol *)taosLRUCacheValue(pCache, h) : NULL;
×
2069
    if (h && pLastCol->cacheStatus != TSDB_LAST_CACHE_NO_CACHE) {
×
UNCOV
2070
      SLastCol lastCol = *pLastCol;
×
UNCOV
2071
      if (TSDB_CODE_SUCCESS != (code = tsdbCacheReallocSLastCol(&lastCol, NULL))) {
×
UNCOV
2072
        tsdbLRUCacheRelease(pCache, h, false);
×
UNCOV
2073
        TAOS_CHECK_GOTO(code, NULL, _exit);
×
2074
      }
2075

2076
      if (taosArrayPush(pLastArray, &lastCol) == NULL) {
×
UNCOV
2077
        code = terrno;
×
UNCOV
2078
        tsdbLRUCacheRelease(pCache, h, false);
×
UNCOV
2079
        goto _exit;
×
2080
      }
2081
    } else {
2082
      // no cache or cache is invalid
2083
      SLastCol noneCol = {.rowKey.ts = TSKEY_MIN,
×
UNCOV
2084
                          .colVal = COL_VAL_NONE(cid, pr->pSchema->columns[pr->pSlotIds[i]].type)};
×
2085

UNCOV
2086
      if (taosArrayPush(pLastArray, &noneCol) == NULL) {
×
2087
        code = terrno;
×
2088
        tsdbLRUCacheRelease(pCache, h, false);
×
2089
        goto _exit;
×
2090
      }
2091

UNCOV
2092
      if (!remainCols) {
×
2093
        if ((remainCols = taosArrayInit(numKeys, sizeof(SIdxKey))) == NULL) {
×
2094
          code = terrno;
×
2095
          tsdbLRUCacheRelease(pCache, h, false);
×
UNCOV
2096
          goto _exit;
×
2097
        }
2098
      }
UNCOV
2099
      if (!ignoreFromRocks) {
×
UNCOV
2100
        if ((ignoreFromRocks = taosArrayInit(numKeys, sizeof(bool))) == NULL) {
×
UNCOV
2101
          code = terrno;
×
UNCOV
2102
          tsdbLRUCacheRelease(pCache, h, false);
×
UNCOV
2103
          goto _exit;
×
2104
        }
2105
      }
UNCOV
2106
      if (taosArrayPush(remainCols, &(SIdxKey){i, key}) == NULL) {
×
UNCOV
2107
        code = terrno;
×
UNCOV
2108
        tsdbLRUCacheRelease(pCache, h, false);
×
UNCOV
2109
        goto _exit;
×
2110
      }
UNCOV
2111
      bool ignoreRocks = pLastCol ? (pLastCol->cacheStatus == TSDB_LAST_CACHE_NO_CACHE) : false;
×
2112
      if (taosArrayPush(ignoreFromRocks, &ignoreRocks) == NULL) {
×
2113
        code = terrno;
×
2114
        tsdbLRUCacheRelease(pCache, h, false);
×
2115
        goto _exit;
×
2116
      }
2117
    }
2118

UNCOV
2119
    if (h) {
×
2120
      tsdbLRUCacheRelease(pCache, h, false);
×
2121
    }
2122
  }
2123

UNCOV
2124
  if (remainCols && TARRAY_SIZE(remainCols) > 0) {
×
UNCOV
2125
    (void)taosThreadMutexLock(&pTsdb->lruMutex);
×
2126

UNCOV
2127
    for (int i = 0; i < TARRAY_SIZE(remainCols);) {
×
UNCOV
2128
      SIdxKey   *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[i];
×
UNCOV
2129
      LRUHandle *h = taosLRUCacheLookup(pCache, &idxKey->key, ROCKS_KEY_LEN);
×
UNCOV
2130
      SLastCol  *pLastCol = h ? (SLastCol *)taosLRUCacheValue(pCache, h) : NULL;
×
UNCOV
2131
      if (h && pLastCol->cacheStatus != TSDB_LAST_CACHE_NO_CACHE) {
×
UNCOV
2132
        SLastCol lastCol = *pLastCol;
×
UNCOV
2133
        code = tsdbCacheReallocSLastCol(&lastCol, NULL);
×
UNCOV
2134
        if (code) {
×
UNCOV
2135
          tsdbLRUCacheRelease(pCache, h, false);
×
UNCOV
2136
          (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
×
UNCOV
2137
          TAOS_RETURN(code);
×
2138
        }
2139

UNCOV
2140
        taosArraySet(pLastArray, idxKey->idx, &lastCol);
×
2141

UNCOV
2142
        taosArrayRemove(remainCols, i);
×
UNCOV
2143
        taosArrayRemove(ignoreFromRocks, i);
×
2144
      } else {
2145
        // no cache or cache is invalid
UNCOV
2146
        ++i;
×
2147
      }
UNCOV
2148
      if (h) {
×
UNCOV
2149
        tsdbLRUCacheRelease(pCache, h, false);
×
2150
      }
2151
    }
2152

2153
    // tsdbTrace("tsdb/cache: vgId: %d, load %" PRId64 " from rocks", TD_VID(pTsdb->pVnode), uid);
UNCOV
2154
    code = tsdbCacheLoadFromRocks(pTsdb, uid, pLastArray, remainCols, ignoreFromRocks, pr, ltype);
×
2155

UNCOV
2156
    (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
×
2157
  }
2158

UNCOV
2159
_exit:
×
UNCOV
2160
  if (remainCols) {
×
UNCOV
2161
    taosArrayDestroy(remainCols);
×
2162
  }
UNCOV
2163
  if (ignoreFromRocks) {
×
UNCOV
2164
    taosArrayDestroy(ignoreFromRocks);
×
2165
  }
2166

UNCOV
2167
  TAOS_RETURN(code);
×
2168
}
2169

2170
typedef enum SMEMNEXTROWSTATES {
2171
  SMEMNEXTROW_ENTER,
2172
  SMEMNEXTROW_NEXT,
2173
} SMEMNEXTROWSTATES;
2174

2175
typedef struct SMemNextRowIter {
2176
  SMEMNEXTROWSTATES state;
2177
  STbData          *pMem;  // [input]
2178
  STbDataIter       iter;  // mem buffer skip list iterator
2179
  int64_t           lastTs;
2180
} SMemNextRowIter;
2181

UNCOV
2182
static int32_t getNextRowFromMem(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast, int16_t *aCols,
×
2183
                                 int nCols) {
UNCOV
2184
  SMemNextRowIter *state = (SMemNextRowIter *)iter;
×
UNCOV
2185
  int32_t          code = 0;
×
UNCOV
2186
  *pIgnoreEarlierTs = false;
×
UNCOV
2187
  switch (state->state) {
×
UNCOV
2188
    case SMEMNEXTROW_ENTER: {
×
2189
      if (state->pMem != NULL) {
×
2190
        /*
2191
        if (state->pMem->maxKey <= state->lastTs) {
2192
          *ppRow = NULL;
2193
          *pIgnoreEarlierTs = true;
2194

2195
          TAOS_RETURN(code);
2196
        }
2197
        */
UNCOV
2198
        tsdbTbDataIterOpen(state->pMem, NULL, 1, &state->iter);
×
2199

UNCOV
2200
        TSDBROW *pMemRow = tsdbTbDataIterGet(&state->iter);
×
2201
        if (pMemRow) {
×
UNCOV
2202
          *ppRow = pMemRow;
×
2203
          state->state = SMEMNEXTROW_NEXT;
×
2204

UNCOV
2205
          TAOS_RETURN(code);
×
2206
        }
2207
      }
2208

UNCOV
2209
      *ppRow = NULL;
×
2210

UNCOV
2211
      TAOS_RETURN(code);
×
2212
    }
UNCOV
2213
    case SMEMNEXTROW_NEXT:
×
UNCOV
2214
      if (tsdbTbDataIterNext(&state->iter)) {
×
UNCOV
2215
        *ppRow = tsdbTbDataIterGet(&state->iter);
×
2216

UNCOV
2217
        TAOS_RETURN(code);
×
2218
      } else {
UNCOV
2219
        *ppRow = NULL;
×
2220

UNCOV
2221
        TAOS_RETURN(code);
×
2222
      }
UNCOV
2223
    default:
×
UNCOV
2224
      break;
×
2225
  }
2226

UNCOV
2227
_err:
×
UNCOV
2228
  *ppRow = NULL;
×
2229

UNCOV
2230
  TAOS_RETURN(code);
×
2231
}
2232

2233
typedef int32_t (*_next_row_fn_t)(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast, int16_t *aCols,
2234
                                  int nCols);
2235
typedef int32_t (*_next_row_clear_fn_t)(void *iter);
2236

2237
typedef struct {
2238
  TSDBROW             *pRow;
2239
  bool                 stop;
2240
  bool                 next;
2241
  bool                 ignoreEarlierTs;
2242
  void                *iter;
2243
  _next_row_fn_t       nextRowFn;
2244
  _next_row_clear_fn_t nextRowClearFn;
2245
} TsdbNextRowState;
2246

2247
typedef struct {
2248
  SArray           *pMemDelData;
2249
  SArray           *pSkyline;
2250
  int64_t           iSkyline;
2251
  SBlockIdx         idx;
2252
  SMemNextRowIter   memState;
2253
  SMemNextRowIter   imemState;
2254
  TSDBROW           memRow, imemRow;
2255
  TsdbNextRowState  input[2];
2256
  SCacheRowsReader *pr;
2257
  STsdb            *pTsdb;
2258
} MemNextRowIter;
2259

UNCOV
2260
static int32_t memRowIterOpen(MemNextRowIter *pIter, tb_uid_t uid, STsdb *pTsdb, STSchema *pTSchema, tb_uid_t suid,
×
2261
                              STsdbReadSnap *pReadSnap, SCacheRowsReader *pr) {
UNCOV
2262
  int32_t code = 0, lino = 0;
×
2263

UNCOV
2264
  STbData *pMem = NULL;
×
UNCOV
2265
  if (pReadSnap->pMem) {
×
UNCOV
2266
    pMem = tsdbGetTbDataFromMemTable(pReadSnap->pMem, suid, uid);
×
2267
  }
2268

UNCOV
2269
  STbData *pIMem = NULL;
×
UNCOV
2270
  if (pReadSnap->pIMem) {
×
UNCOV
2271
    pIMem = tsdbGetTbDataFromMemTable(pReadSnap->pIMem, suid, uid);
×
2272
  }
2273

2274
  pIter->pTsdb = pTsdb;
×
2275

2276
  pIter->pMemDelData = NULL;
×
2277

UNCOV
2278
  TAOS_CHECK_GOTO(loadMemTombData(&pIter->pMemDelData, pMem, pIMem, pr->info.verRange.maxVer), &lino, _exit);
×
2279

UNCOV
2280
  pIter->idx = (SBlockIdx){.suid = suid, .uid = uid};
×
2281

UNCOV
2282
  pIter->input[0] = (TsdbNextRowState){&pIter->memRow, true, false, false, &pIter->memState, getNextRowFromMem, NULL};
×
2283
  pIter->input[1] = (TsdbNextRowState){&pIter->imemRow, true, false, false, &pIter->imemState, getNextRowFromMem, NULL};
×
2284

UNCOV
2285
  if (pMem) {
×
UNCOV
2286
    pIter->memState.pMem = pMem;
×
UNCOV
2287
    pIter->memState.state = SMEMNEXTROW_ENTER;
×
UNCOV
2288
    pIter->input[0].stop = false;
×
UNCOV
2289
    pIter->input[0].next = true;
×
2290
  }
2291

2292
  if (pIMem) {
×
UNCOV
2293
    pIter->imemState.pMem = pIMem;
×
UNCOV
2294
    pIter->imemState.state = SMEMNEXTROW_ENTER;
×
UNCOV
2295
    pIter->input[1].stop = false;
×
UNCOV
2296
    pIter->input[1].next = true;
×
2297
  }
2298

UNCOV
2299
  pIter->pr = pr;
×
2300

UNCOV
2301
_exit:
×
UNCOV
2302
  if (code) {
×
UNCOV
2303
    tsdbError("tsdb/cache: %s failed at line %d since %s.", __func__, lino, tstrerror(code));
×
2304
  }
2305

UNCOV
2306
  TAOS_RETURN(code);
×
2307
}
2308

UNCOV
2309
static void memRowIterClose(MemNextRowIter *pIter) {
×
UNCOV
2310
  for (int i = 0; i < 2; ++i) {
×
UNCOV
2311
    if (pIter->input[i].nextRowClearFn) {
×
UNCOV
2312
      (void)pIter->input[i].nextRowClearFn(pIter->input[i].iter);
×
2313
    }
2314
  }
2315

UNCOV
2316
  if (pIter->pSkyline) {
×
UNCOV
2317
    taosArrayDestroy(pIter->pSkyline);
×
2318
  }
2319

UNCOV
2320
  if (pIter->pMemDelData) {
×
UNCOV
2321
    taosArrayDestroy(pIter->pMemDelData);
×
2322
  }
UNCOV
2323
}
×
2324

UNCOV
2325
static void freeTableInfoFunc(void *param) {
×
2326
  void **p = (void **)param;
×
UNCOV
2327
  taosMemoryFreeClear(*p);
×
UNCOV
2328
}
×
2329

UNCOV
2330
static STableLoadInfo *getTableLoadInfo(SCacheRowsReader *pReader, uint64_t uid) {
×
UNCOV
2331
  if (!pReader->pTableMap) {
×
UNCOV
2332
    pReader->pTableMap = tSimpleHashInit(pReader->numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
×
UNCOV
2333
    if (!pReader->pTableMap) {
×
UNCOV
2334
      return NULL;
×
2335
    }
2336

UNCOV
2337
    tSimpleHashSetFreeFp(pReader->pTableMap, freeTableInfoFunc);
×
2338
  }
2339

UNCOV
2340
  STableLoadInfo  *pInfo = NULL;
×
UNCOV
2341
  STableLoadInfo **ppInfo = tSimpleHashGet(pReader->pTableMap, &uid, sizeof(uid));
×
UNCOV
2342
  if (!ppInfo) {
×
UNCOV
2343
    pInfo = taosMemoryCalloc(1, sizeof(STableLoadInfo));
×
UNCOV
2344
    if (pInfo) {
×
UNCOV
2345
      if (tSimpleHashPut(pReader->pTableMap, &uid, sizeof(uint64_t), &pInfo, POINTER_BYTES)) {
×
UNCOV
2346
        return NULL;
×
2347
      }
2348
    }
2349

UNCOV
2350
    return pInfo;
×
2351
  }
2352

UNCOV
2353
  return *ppInfo;
×
2354
}
2355

UNCOV
2356
static TSDBROW *memRowIterGet(MemNextRowIter *pIter, bool isLast, int16_t *aCols, int nCols) {
×
UNCOV
2357
  int32_t code = 0, lino = 0;
×
2358

UNCOV
2359
  for (;;) {
×
UNCOV
2360
    for (int i = 0; i < 2; ++i) {
×
UNCOV
2361
      if (pIter->input[i].next && !pIter->input[i].stop) {
×
UNCOV
2362
        TAOS_CHECK_GOTO(pIter->input[i].nextRowFn(pIter->input[i].iter, &pIter->input[i].pRow,
×
2363
                                                  &pIter->input[i].ignoreEarlierTs, isLast, aCols, nCols),
2364
                        &lino, _exit);
2365

UNCOV
2366
        if (pIter->input[i].pRow == NULL) {
×
UNCOV
2367
          pIter->input[i].stop = true;
×
UNCOV
2368
          pIter->input[i].next = false;
×
2369
        }
2370
      }
2371
    }
2372

UNCOV
2373
    if (pIter->input[0].stop && pIter->input[1].stop) {
×
UNCOV
2374
      return NULL;
×
2375
    }
2376

UNCOV
2377
    TSDBROW *max[2] = {0};
×
UNCOV
2378
    int      iMax[2] = {-1, -1};
×
UNCOV
2379
    int      nMax = 0;
×
UNCOV
2380
    SRowKey  maxKey = {.ts = TSKEY_MIN};
×
2381

UNCOV
2382
    for (int i = 0; i < 2; ++i) {
×
UNCOV
2383
      if (!pIter->input[i].stop && pIter->input[i].pRow != NULL) {
×
UNCOV
2384
        STsdbRowKey tsdbRowKey = {0};
×
UNCOV
2385
        tsdbRowGetKey(pIter->input[i].pRow, &tsdbRowKey);
×
2386

2387
        // merging & deduplicating on client side
UNCOV
2388
        int c = tRowKeyCompare(&maxKey, &tsdbRowKey.key);
×
UNCOV
2389
        if (c <= 0) {
×
UNCOV
2390
          if (c < 0) {
×
UNCOV
2391
            nMax = 0;
×
UNCOV
2392
            maxKey = tsdbRowKey.key;
×
2393
          }
2394

UNCOV
2395
          iMax[nMax] = i;
×
UNCOV
2396
          max[nMax++] = pIter->input[i].pRow;
×
2397
        }
UNCOV
2398
        pIter->input[i].next = false;
×
2399
      }
2400
    }
2401

2402
    TSDBROW *merge[2] = {0};
×
UNCOV
2403
    int      iMerge[2] = {-1, -1};
×
UNCOV
2404
    int      nMerge = 0;
×
UNCOV
2405
    for (int i = 0; i < nMax; ++i) {
×
UNCOV
2406
      TSDBKEY maxKey1 = TSDBROW_KEY(max[i]);
×
2407

UNCOV
2408
      if (!pIter->pSkyline) {
×
UNCOV
2409
        pIter->pSkyline = taosArrayInit(32, sizeof(TSDBKEY));
×
UNCOV
2410
        TSDB_CHECK_NULL(pIter->pSkyline, code, lino, _exit, terrno);
×
2411

UNCOV
2412
        uint64_t        uid = pIter->idx.uid;
×
UNCOV
2413
        STableLoadInfo *pInfo = getTableLoadInfo(pIter->pr, uid);
×
UNCOV
2414
        TSDB_CHECK_NULL(pInfo, code, lino, _exit, TSDB_CODE_OUT_OF_MEMORY);
×
2415

UNCOV
2416
        if (pInfo->pTombData == NULL) {
×
UNCOV
2417
          pInfo->pTombData = taosArrayInit(4, sizeof(SDelData));
×
UNCOV
2418
          TSDB_CHECK_NULL(pInfo->pTombData, code, lino, _exit, terrno);
×
2419
        }
2420

UNCOV
2421
        if (!taosArrayAddAll(pInfo->pTombData, pIter->pMemDelData)) {
×
UNCOV
2422
          TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
2423
        }
2424

UNCOV
2425
        size_t delSize = TARRAY_SIZE(pInfo->pTombData);
×
UNCOV
2426
        if (delSize > 0) {
×
UNCOV
2427
          code = tsdbBuildDeleteSkyline(pInfo->pTombData, 0, (int32_t)(delSize - 1), pIter->pSkyline);
×
UNCOV
2428
          TAOS_CHECK_GOTO(code, &lino, _exit);
×
2429
        }
2430
        pIter->iSkyline = taosArrayGetSize(pIter->pSkyline) - 1;
×
2431
      }
2432

UNCOV
2433
      bool deleted = tsdbKeyDeleted(&maxKey1, pIter->pSkyline, &pIter->iSkyline);
×
2434
      if (!deleted) {
×
UNCOV
2435
        iMerge[nMerge] = iMax[i];
×
UNCOV
2436
        merge[nMerge++] = max[i];
×
2437
      }
2438

UNCOV
2439
      pIter->input[iMax[i]].next = deleted;
×
2440
    }
2441

UNCOV
2442
    if (nMerge > 0) {
×
UNCOV
2443
      pIter->input[iMerge[0]].next = true;
×
2444

UNCOV
2445
      return merge[0];
×
2446
    }
2447
  }
2448

UNCOV
2449
_exit:
×
UNCOV
2450
  if (code) {
×
UNCOV
2451
    tsdbError("tsdb/cache: %s failed at line %d since %s.", __func__, lino, tstrerror(code));
×
2452
  }
2453

UNCOV
2454
  return NULL;
×
2455
}
2456

2457
static int32_t cloneTSchema(STSchema *pSrc, STSchema **ppDst) {
×
2458
  int32_t len = sizeof(STSchema) + sizeof(STColumn) * pSrc->numOfCols;
×
UNCOV
2459
  *ppDst = taosMemoryMalloc(len);
×
UNCOV
2460
  if (NULL == *ppDst) {
×
UNCOV
2461
    TAOS_RETURN(terrno);
×
2462
  }
UNCOV
2463
  memcpy(*ppDst, pSrc, len);
×
2464

UNCOV
2465
  TAOS_RETURN(TSDB_CODE_SUCCESS);
×
2466
}
2467

UNCOV
2468
static int32_t updateTSchema(int32_t sversion, SCacheRowsReader *pReader, uint64_t uid) {
×
UNCOV
2469
  if (NULL == pReader->pCurrSchema && sversion == pReader->pSchema->version) {
×
UNCOV
2470
    TAOS_RETURN(cloneTSchema(pReader->pSchema, &pReader->pCurrSchema));
×
2471
  }
2472

UNCOV
2473
  if (NULL != pReader->pCurrSchema && sversion == pReader->pCurrSchema->version) {
×
UNCOV
2474
    TAOS_RETURN(TSDB_CODE_SUCCESS);
×
2475
  }
2476

UNCOV
2477
  taosMemoryFreeClear(pReader->pCurrSchema);
×
UNCOV
2478
  TAOS_RETURN(
×
2479
      metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->info.suid, uid, sversion, &pReader->pCurrSchema));
2480
}
2481

UNCOV
2482
static int32_t tsdbCacheGetBatchFromMem(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr,
×
2483
                                        SArray *keyArray) {
UNCOV
2484
  int32_t        code = 0;
×
UNCOV
2485
  int32_t        lino = 0;
×
UNCOV
2486
  STSchema      *pTSchema = pr->pSchema;
×
UNCOV
2487
  SLRUCache     *pCache = pTsdb->lruCache;
×
UNCOV
2488
  SArray        *pCidList = pr->pCidList;
×
UNCOV
2489
  int            numKeys = TARRAY_SIZE(pCidList);
×
UNCOV
2490
  MemNextRowIter iter = {0};
×
UNCOV
2491
  SSHashObj     *iColHash = NULL;
×
UNCOV
2492
  STSDBRowIter   rowIter = {0};
×
2493

2494
  // 1, get from mem, imem filtered with delete info
UNCOV
2495
  TAOS_CHECK_EXIT(memRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->info.suid, pr->pReadSnap, pr));
×
2496

UNCOV
2497
  TSDBROW *pRow = memRowIterGet(&iter, false, NULL, 0);
×
UNCOV
2498
  if (!pRow) {
×
UNCOV
2499
    goto _exit;
×
2500
  }
2501

UNCOV
2502
  int32_t sversion = TSDBROW_SVERSION(pRow);
×
UNCOV
2503
  if (sversion != -1) {
×
2504
    TAOS_CHECK_EXIT(updateTSchema(sversion, pr, uid));
×
2505

UNCOV
2506
    pTSchema = pr->pCurrSchema;
×
2507
  }
UNCOV
2508
  int32_t nCol = pTSchema->numOfCols;
×
2509

UNCOV
2510
  STsdbRowKey rowKey = {0};
×
UNCOV
2511
  tsdbRowGetKey(pRow, &rowKey);
×
2512

UNCOV
2513
  TAOS_CHECK_EXIT(tsdbRowIterOpen(&rowIter, pRow, pTSchema));
×
2514

UNCOV
2515
  int32_t iCol = 0, jCol = 0, jnCol = TARRAY_SIZE(pLastArray);
×
UNCOV
2516
  for (SColVal *pColVal = tsdbRowIterNext(&rowIter); pColVal && iCol < nCol && jCol < jnCol;) {
×
UNCOV
2517
    SLastCol *pTargetCol = &((SLastCol *)TARRAY_DATA(pLastArray))[jCol];
×
UNCOV
2518
    if (pColVal->cid < pTargetCol->colVal.cid) {
×
UNCOV
2519
      pColVal = tsdbRowIterNext(&rowIter), ++iCol;
×
2520

UNCOV
2521
      continue;
×
2522
    }
UNCOV
2523
    if (pColVal->cid > pTargetCol->colVal.cid) {
×
UNCOV
2524
      break;
×
2525
    }
2526

UNCOV
2527
    int32_t cmp_res = tRowKeyCompare(&pTargetCol->rowKey, &rowKey.key);
×
UNCOV
2528
    if (!IS_LAST_KEY(((SLastKey *)TARRAY_DATA(keyArray))[jCol])) {
×
UNCOV
2529
      if (cmp_res < 0 || (cmp_res == 0 && !COL_VAL_IS_NONE(pColVal))) {
×
2530
        SLastCol lastCol = {.rowKey = rowKey.key, .colVal = *pColVal, .dirty = 1, .cacheStatus = TSDB_LAST_CACHE_VALID};
×
UNCOV
2531
        TAOS_CHECK_EXIT(tsdbCacheReallocSLastCol(&lastCol, NULL));
×
2532

UNCOV
2533
        tsdbCacheFreeSLastColItem(pTargetCol);
×
UNCOV
2534
        taosArraySet(pLastArray, jCol, &lastCol);
×
2535
      }
2536
    } else {
UNCOV
2537
      if (COL_VAL_IS_VALUE(pColVal)) {
×
UNCOV
2538
        if (cmp_res <= 0) {
×
UNCOV
2539
          SLastCol lastCol = {
×
2540
              .rowKey = rowKey.key, .colVal = *pColVal, .dirty = 1, .cacheStatus = TSDB_LAST_CACHE_VALID};
UNCOV
2541
          TAOS_CHECK_EXIT(tsdbCacheReallocSLastCol(&lastCol, NULL));
×
2542

UNCOV
2543
          tsdbCacheFreeSLastColItem(pTargetCol);
×
UNCOV
2544
          taosArraySet(pLastArray, jCol, &lastCol);
×
2545
        }
2546
      } else {
UNCOV
2547
        if (!iColHash) {
×
UNCOV
2548
          iColHash = tSimpleHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT));
×
UNCOV
2549
          if (iColHash == NULL) {
×
UNCOV
2550
            TAOS_CHECK_EXIT(terrno);
×
2551
          }
2552
        }
2553

UNCOV
2554
        if (tSimpleHashPut(iColHash, &pColVal->cid, sizeof(pColVal->cid), &jCol, sizeof(jCol))) {
×
UNCOV
2555
          TAOS_CHECK_EXIT(terrno);
×
2556
        }
2557
      }
2558
    }
2559

UNCOV
2560
    ++jCol;
×
2561

UNCOV
2562
    if (jCol < jnCol && ((SLastCol *)TARRAY_DATA(pLastArray))[jCol].colVal.cid > pColVal->cid) {
×
UNCOV
2563
      pColVal = tsdbRowIterNext(&rowIter), ++iCol;
×
2564
    }
2565
  }
UNCOV
2566
  tsdbRowClose(&rowIter);
×
2567

UNCOV
2568
  if (iColHash && tSimpleHashGetSize(iColHash) > 0) {
×
UNCOV
2569
    pRow = memRowIterGet(&iter, false, NULL, 0);
×
UNCOV
2570
    while (pRow) {
×
UNCOV
2571
      if (tSimpleHashGetSize(iColHash) == 0) {
×
UNCOV
2572
        break;
×
2573
      }
2574

UNCOV
2575
      sversion = TSDBROW_SVERSION(pRow);
×
UNCOV
2576
      if (sversion != -1) {
×
UNCOV
2577
        TAOS_CHECK_EXIT(updateTSchema(sversion, pr, uid));
×
2578

UNCOV
2579
        pTSchema = pr->pCurrSchema;
×
2580
      }
UNCOV
2581
      nCol = pTSchema->numOfCols;
×
2582

UNCOV
2583
      STsdbRowKey tsdbRowKey = {0};
×
UNCOV
2584
      tsdbRowGetKey(pRow, &tsdbRowKey);
×
2585

UNCOV
2586
      TAOS_CHECK_EXIT(tsdbRowIterOpen(&rowIter, pRow, pTSchema));
×
2587

UNCOV
2588
      iCol = 0;
×
UNCOV
2589
      for (SColVal *pColVal = tsdbRowIterNext(&rowIter); pColVal && iCol < nCol;
×
UNCOV
2590
           pColVal = tsdbRowIterNext(&rowIter), iCol++) {
×
UNCOV
2591
        int32_t *pjCol = tSimpleHashGet(iColHash, &pColVal->cid, sizeof(pColVal->cid));
×
UNCOV
2592
        if (pjCol && COL_VAL_IS_VALUE(pColVal)) {
×
UNCOV
2593
          SLastCol *pTargetCol = &((SLastCol *)TARRAY_DATA(pLastArray))[*pjCol];
×
2594

UNCOV
2595
          int32_t cmp_res = tRowKeyCompare(&pTargetCol->rowKey, &tsdbRowKey.key);
×
2596
          if (cmp_res <= 0) {
×
UNCOV
2597
            SLastCol lastCol = {
×
2598
                .rowKey = tsdbRowKey.key, .colVal = *pColVal, .dirty = 1, .cacheStatus = TSDB_LAST_CACHE_VALID};
UNCOV
2599
            TAOS_CHECK_EXIT(tsdbCacheReallocSLastCol(&lastCol, NULL));
×
2600

UNCOV
2601
            tsdbCacheFreeSLastColItem(pTargetCol);
×
UNCOV
2602
            taosArraySet(pLastArray, *pjCol, &lastCol);
×
2603
          }
2604

UNCOV
2605
          TAOS_CHECK_EXIT(tSimpleHashRemove(iColHash, &pColVal->cid, sizeof(pColVal->cid)));
×
2606
        }
2607
      }
UNCOV
2608
      tsdbRowClose(&rowIter);
×
2609

UNCOV
2610
      pRow = memRowIterGet(&iter, false, NULL, 0);
×
2611
    }
2612
  }
2613

2614
_exit:
×
UNCOV
2615
  if (code) {
×
UNCOV
2616
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(pTsdb->pVnode), __func__, __FILE__, lino, tstrerror(code));
×
2617

UNCOV
2618
    tsdbRowClose(&rowIter);
×
2619
  }
2620

UNCOV
2621
  tSimpleHashCleanup(iColHash);
×
2622

UNCOV
2623
  memRowIterClose(&iter);
×
2624

2625
  TAOS_RETURN(code);
×
2626
}
2627

UNCOV
2628
int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, int8_t ltype) {
×
UNCOV
2629
  int32_t code = 0;
×
UNCOV
2630
  int32_t lino = 0;
×
2631

UNCOV
2632
  SArray *keyArray = taosArrayInit(16, sizeof(SLastKey));
×
UNCOV
2633
  if (!keyArray) {
×
UNCOV
2634
    TAOS_CHECK_EXIT(terrno);
×
2635
  }
2636

UNCOV
2637
  TAOS_CHECK_EXIT(tsdbCacheGetBatchFromLru(pTsdb, uid, pLastArray, pr, ltype, keyArray));
×
2638

UNCOV
2639
  if (tsUpdateCacheBatch) {
×
UNCOV
2640
    TAOS_CHECK_EXIT(tsdbCacheGetBatchFromMem(pTsdb, uid, pLastArray, pr, keyArray));
×
2641
  }
2642

UNCOV
2643
_exit:
×
UNCOV
2644
  if (code) {
×
UNCOV
2645
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(pTsdb->pVnode), __func__, __FILE__, lino, tstrerror(code));
×
2646
  }
2647

UNCOV
2648
  if (keyArray) {
×
UNCOV
2649
    taosArrayDestroy(keyArray);
×
2650
  }
2651

UNCOV
2652
  TAOS_RETURN(code);
×
2653
}
2654

UNCOV
2655
int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey) {
×
UNCOV
2656
  int32_t   code = 0, lino = 0;
×
UNCOV
2657
  STSchema *pTSchema = NULL;
×
UNCOV
2658
  int       sver = -1;
×
UNCOV
2659
  int       numKeys = 0;
×
UNCOV
2660
  SArray   *remainCols = NULL;
×
2661

UNCOV
2662
  TAOS_CHECK_RETURN(metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pTSchema));
×
2663

UNCOV
2664
  int numCols = pTSchema->numOfCols;
×
2665

UNCOV
2666
  (void)taosThreadMutexLock(&pTsdb->lruMutex);
×
2667

UNCOV
2668
  for (int i = 0; i < numCols; ++i) {
×
2669
    int16_t cid = pTSchema->columns[i].colId;
×
UNCOV
2670
    for (int8_t lflag = LFLAG_LAST_ROW; lflag <= LFLAG_LAST; ++lflag) {
×
UNCOV
2671
      SLastKey   lastKey = {.lflag = lflag, .uid = uid, .cid = cid};
×
UNCOV
2672
      LRUHandle *h = taosLRUCacheLookup(pTsdb->lruCache, &lastKey, ROCKS_KEY_LEN);
×
UNCOV
2673
      if (h) {
×
UNCOV
2674
        SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pTsdb->lruCache, h);
×
UNCOV
2675
        if (pLastCol->rowKey.ts <= eKey && pLastCol->rowKey.ts >= sKey) {
×
UNCOV
2676
          SLastCol noneCol = {.rowKey.ts = TSKEY_MIN,
×
UNCOV
2677
                              .colVal = COL_VAL_NONE(cid, pTSchema->columns[i].type),
×
2678
                              .dirty = 1,
2679
                              .cacheStatus = TSDB_LAST_CACHE_NO_CACHE};
UNCOV
2680
          code = tsdbCachePutToLRU(pTsdb, &lastKey, &noneCol, 1);
×
2681
        }
UNCOV
2682
        tsdbLRUCacheRelease(pTsdb->lruCache, h, false);
×
UNCOV
2683
        TAOS_CHECK_EXIT(code);
×
2684
      } else {
2685
        if (!remainCols) {
×
2686
          remainCols = taosArrayInit(numCols * 2, sizeof(SIdxKey));
×
2687
        }
UNCOV
2688
        if (!taosArrayPush(remainCols, &(SIdxKey){i, lastKey})) {
×
UNCOV
2689
          TAOS_CHECK_EXIT(terrno);
×
2690
        }
2691
      }
2692
    }
2693
  }
2694

UNCOV
2695
  if (remainCols) {
×
UNCOV
2696
    numKeys = TARRAY_SIZE(remainCols);
×
2697
  }
2698

UNCOV
2699
  char  **keys_list = taosMemoryCalloc(numKeys, sizeof(char *));
×
UNCOV
2700
  size_t *keys_list_sizes = taosMemoryCalloc(numKeys, sizeof(size_t));
×
UNCOV
2701
  char  **values_list = NULL;
×
UNCOV
2702
  size_t *values_list_sizes = NULL;
×
2703

UNCOV
2704
  if (!keys_list || !keys_list_sizes) {
×
UNCOV
2705
    code = terrno;
×
UNCOV
2706
    goto _exit;
×
2707
  }
UNCOV
2708
  const size_t klen = ROCKS_KEY_LEN;
×
2709

UNCOV
2710
  for (int i = 0; i < numKeys; ++i) {
×
UNCOV
2711
    char *key = taosMemoryCalloc(1, sizeof(SLastKey));
×
UNCOV
2712
    if (!key) {
×
UNCOV
2713
      code = terrno;
×
2714
      goto _exit;
×
2715
    }
2716
    SIdxKey *idxKey = taosArrayGet(remainCols, i);
×
2717

2718
    ((SLastKey *)key)[0] = idxKey->key;
×
2719

UNCOV
2720
    keys_list[i] = key;
×
UNCOV
2721
    keys_list_sizes[i] = klen;
×
2722
  }
2723

2724
  rocksMayWrite(pTsdb, true);  // flush writebatch cache
×
2725

UNCOV
2726
  TAOS_CHECK_GOTO(tsdbCacheGetValuesFromRocks(pTsdb, numKeys, (const char *const *)keys_list, keys_list_sizes,
×
2727
                                              &values_list, &values_list_sizes),
2728
                  NULL, _exit);
2729

2730
  // rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
2731
  for (int i = 0; i < numKeys; ++i) {
×
2732
    SLastCol *pLastCol = NULL;
×
UNCOV
2733
    if (values_list[i] != NULL) {
×
2734
      code = tsdbCacheDeserialize(values_list[i], values_list_sizes[i], &pLastCol);
×
2735
      if (code != TSDB_CODE_SUCCESS) {
×
2736
        tsdbError("vgId:%d, %s deserialize failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
2737
                  tstrerror(code));
UNCOV
2738
        goto _exit;
×
2739
      }
2740
    }
UNCOV
2741
    SIdxKey  *idxKey = taosArrayGet(remainCols, i);
×
UNCOV
2742
    SLastKey *pLastKey = &idxKey->key;
×
UNCOV
2743
    if (NULL != pLastCol && (pLastCol->rowKey.ts <= eKey && pLastCol->rowKey.ts >= sKey)) {
×
UNCOV
2744
      SLastCol noCacheCol = {.rowKey.ts = TSKEY_MIN,
×
UNCOV
2745
                             .colVal = COL_VAL_NONE(pLastKey->cid, pTSchema->columns[idxKey->idx].type),
×
2746
                             .dirty = 0,
2747
                             .cacheStatus = TSDB_LAST_CACHE_NO_CACHE};
2748

UNCOV
2749
      if ((code = tsdbCachePutToRocksdb(pTsdb, pLastKey, &noCacheCol)) != TSDB_CODE_SUCCESS) {
×
UNCOV
2750
        taosMemoryFreeClear(pLastCol);
×
UNCOV
2751
        tsdbError("tsdb/cache/del: vgId:%d, put to rocks failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code));
×
UNCOV
2752
        goto _exit;
×
2753
      }
UNCOV
2754
      if ((code = tsdbCachePutToLRU(pTsdb, pLastKey, &noCacheCol, 0)) != TSDB_CODE_SUCCESS) {
×
UNCOV
2755
        taosMemoryFreeClear(pLastCol);
×
UNCOV
2756
        tsdbError("tsdb/cache/del: vgId:%d, put to lru failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code));
×
UNCOV
2757
        goto _exit;
×
2758
      }
2759
    }
2760

UNCOV
2761
    if (pLastCol == NULL) {
×
UNCOV
2762
      tsdbDebug("tsdb/cache/del: vgId:%d, no cache found for uid:%d ,cid:%" PRId64 ", lflag:%d.", TD_VID(pTsdb->pVnode),
×
2763
                pLastKey->cid, pLastKey->uid, pLastKey->lflag);
2764
    }
2765

UNCOV
2766
    taosMemoryFreeClear(pLastCol);
×
2767
  }
2768

UNCOV
2769
  rocksMayWrite(pTsdb, false);
×
2770

UNCOV
2771
_exit:
×
UNCOV
2772
  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
×
2773

UNCOV
2774
  for (int i = 0; i < numKeys; ++i) {
×
UNCOV
2775
    taosMemoryFree(keys_list[i]);
×
2776
  }
UNCOV
2777
  taosMemoryFree(keys_list);
×
2778
  taosMemoryFree(keys_list_sizes);
×
UNCOV
2779
  if (values_list) {
×
2780
 #if USE_ROCKSDB   
UNCOV
2781
    for (int i = 0; i < numKeys; ++i) {
×
UNCOV
2782
      rocksdb_free(values_list[i]);
×
2783
    }
2784
#endif
UNCOV
2785
    taosMemoryFree(values_list);
×
2786
  }
UNCOV
2787
  taosMemoryFree(values_list_sizes);
×
UNCOV
2788
  taosArrayDestroy(remainCols);
×
UNCOV
2789
  taosMemoryFree(pTSchema);
×
2790

UNCOV
2791
  TAOS_RETURN(code);
×
2792
}
2793

2794
int32_t tsdbOpenCache(STsdb *pTsdb) {
20✔
2795
  int32_t code = 0, lino = 0;
20✔
2796
  size_t  cfgCapacity = (size_t)pTsdb->pVnode->config.cacheLastSize * 1024 * 1024;
20✔
2797

2798
  SLRUCache *pCache = taosLRUCacheInit(cfgCapacity, 0, .5);
20✔
2799
  if (pCache == NULL) {
20!
UNCOV
2800
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err);
×
2801
  }
2802

2803
  TAOS_CHECK_GOTO(tsdbOpenBCache(pTsdb), &lino, _err);
20!
2804

2805
  TAOS_CHECK_GOTO(tsdbOpenPgCache(pTsdb), &lino, _err);
20!
2806

2807
  TAOS_CHECK_GOTO(tsdbOpenRocksCache(pTsdb), &lino, _err);
20!
2808

2809
  taosLRUCacheSetStrictCapacity(pCache, false);
20✔
2810

2811
  (void)taosThreadMutexInit(&pTsdb->lruMutex, NULL);
20✔
2812

2813
_err:
20✔
2814
  if (code) {
20!
UNCOV
2815
    tsdbError("tsdb/cache: vgId:%d, open failed at line %d since %s.", TD_VID(pTsdb->pVnode), lino, tstrerror(code));
×
2816
  }
2817

2818
  pTsdb->lruCache = pCache;
20✔
2819

2820
  TAOS_RETURN(code);
20✔
2821
}
2822

2823
void tsdbCloseCache(STsdb *pTsdb) {
20✔
2824
  SLRUCache *pCache = pTsdb->lruCache;
20✔
2825
  if (pCache) {
20!
2826
    taosLRUCacheEraseUnrefEntries(pCache);
20✔
2827

2828
    taosLRUCacheCleanup(pCache);
20✔
2829

2830
    (void)taosThreadMutexDestroy(&pTsdb->lruMutex);
20✔
2831
  }
2832

2833
  tsdbCloseBCache(pTsdb);
20✔
2834
  tsdbClosePgCache(pTsdb);
20✔
2835
  tsdbCloseRocksCache(pTsdb);
20✔
2836
}
20✔
2837

2838
static void getTableCacheKey(tb_uid_t uid, int cacheType, char *key, int *len) {
×
2839
  if (cacheType == 0) {  // last_row
×
UNCOV
2840
    *(uint64_t *)key = (uint64_t)uid;
×
2841
  } else {  // last
UNCOV
2842
    *(uint64_t *)key = ((uint64_t)uid) | 0x8000000000000000;
×
2843
  }
2844

UNCOV
2845
  *len = sizeof(uint64_t);
×
2846
}
×
2847

UNCOV
2848
static tb_uid_t getTableSuidByUid(tb_uid_t uid, STsdb *pTsdb) {
×
2849
  tb_uid_t suid = 0;
×
2850

UNCOV
2851
  SMetaReader mr = {0};
×
2852
  metaReaderDoInit(&mr, pTsdb->pVnode->pMeta, META_READER_LOCK);
×
2853
  if (metaReaderGetTableEntryByUidCache(&mr, uid) < 0) {
×
UNCOV
2854
    metaReaderClear(&mr);  // table not esist
×
UNCOV
2855
    return 0;
×
2856
  }
2857

UNCOV
2858
  if (mr.me.type == TSDB_CHILD_TABLE) {
×
2859
    suid = mr.me.ctbEntry.suid;
×
2860
  } else if (mr.me.type == TSDB_NORMAL_TABLE) {
×
2861
    suid = 0;
×
2862
  } else {
2863
    suid = 0;
×
2864
  }
2865

UNCOV
2866
  metaReaderClear(&mr);
×
2867

UNCOV
2868
  return suid;
×
2869
}
2870

UNCOV
2871
static int32_t getTableDelDataFromDelIdx(SDelFReader *pDelReader, SDelIdx *pDelIdx, SArray *aDelData) {
×
UNCOV
2872
  int32_t code = 0;
×
2873

UNCOV
2874
  if (pDelIdx) {
×
UNCOV
2875
    code = tsdbReadDelDatav1(pDelReader, pDelIdx, aDelData, INT64_MAX);
×
2876
  }
2877

2878
  TAOS_RETURN(code);
×
2879
}
2880

UNCOV
2881
static int32_t getTableDelDataFromTbData(STbData *pTbData, SArray *aDelData) {
×
UNCOV
2882
  int32_t   code = 0;
×
UNCOV
2883
  SDelData *pDelData = pTbData ? pTbData->pHead : NULL;
×
2884

UNCOV
2885
  for (; pDelData; pDelData = pDelData->pNext) {
×
UNCOV
2886
    if (!taosArrayPush(aDelData, pDelData)) {
×
UNCOV
2887
      TAOS_RETURN(terrno);
×
2888
    }
2889
  }
2890

UNCOV
2891
  TAOS_RETURN(code);
×
2892
}
2893

UNCOV
2894
static uint64_t *getUidList(SCacheRowsReader *pReader) {
×
UNCOV
2895
  if (!pReader->uidList) {
×
UNCOV
2896
    int32_t numOfTables = pReader->numOfTables;
×
2897

UNCOV
2898
    pReader->uidList = taosMemoryMalloc(numOfTables * sizeof(uint64_t));
×
UNCOV
2899
    if (!pReader->uidList) {
×
2900
      return NULL;
×
2901
    }
2902

UNCOV
2903
    for (int32_t i = 0; i < numOfTables; ++i) {
×
UNCOV
2904
      uint64_t uid = pReader->pTableList[i].uid;
×
UNCOV
2905
      pReader->uidList[i] = uid;
×
2906
    }
2907

UNCOV
2908
    taosSort(pReader->uidList, numOfTables, sizeof(uint64_t), uidComparFunc);
×
2909
  }
2910

UNCOV
2911
  return pReader->uidList;
×
2912
}
2913

UNCOV
2914
static int32_t loadTombFromBlk(const TTombBlkArray *pTombBlkArray, SCacheRowsReader *pReader, void *pFileReader,
×
2915
                               bool isFile) {
UNCOV
2916
  int32_t   code = 0;
×
UNCOV
2917
  int32_t   numOfTables = pReader->numOfTables;
×
2918
  int64_t   suid = pReader->info.suid;
×
UNCOV
2919
  uint64_t *uidList = getUidList(pReader);
×
2920

UNCOV
2921
  if (!uidList) {
×
UNCOV
2922
    TAOS_RETURN(terrno);
×
2923
  }
2924

2925
  for (int i = 0, j = 0; i < pTombBlkArray->size && j < numOfTables; ++i) {
×
UNCOV
2926
    STombBlk *pTombBlk = &pTombBlkArray->data[i];
×
UNCOV
2927
    if (pTombBlk->maxTbid.suid < suid || (pTombBlk->maxTbid.suid == suid && pTombBlk->maxTbid.uid < uidList[0])) {
×
UNCOV
2928
      continue;
×
2929
    }
2930

UNCOV
2931
    if (pTombBlk->minTbid.suid > suid ||
×
UNCOV
2932
        (pTombBlk->minTbid.suid == suid && pTombBlk->minTbid.uid > uidList[numOfTables - 1])) {
×
2933
      break;
2934
    }
2935

UNCOV
2936
    STombBlock block = {0};
×
2937
    code = isFile ? tsdbDataFileReadTombBlock(pFileReader, &pTombBlkArray->data[i], &block)
×
2938
                  : tsdbSttFileReadTombBlock(pFileReader, &pTombBlkArray->data[i], &block);
×
UNCOV
2939
    if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
2940
      TAOS_RETURN(code);
×
2941
    }
2942

UNCOV
2943
    uint64_t        uid = uidList[j];
×
UNCOV
2944
    STableLoadInfo *pInfo = getTableLoadInfo(pReader, uid);
×
2945
    if (!pInfo) {
×
2946
      tTombBlockDestroy(&block);
×
UNCOV
2947
      TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
×
2948
    }
2949

UNCOV
2950
    if (pInfo->pTombData == NULL) {
×
UNCOV
2951
      pInfo->pTombData = taosArrayInit(4, sizeof(SDelData));
×
2952
    }
2953

UNCOV
2954
    STombRecord record = {0};
×
UNCOV
2955
    bool        finished = false;
×
UNCOV
2956
    for (int32_t k = 0; k < TOMB_BLOCK_SIZE(&block); ++k) {
×
2957
      code = tTombBlockGet(&block, k, &record);
×
2958
      if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
2959
        finished = true;
×
UNCOV
2960
        break;
×
2961
      }
2962

UNCOV
2963
      if (record.suid < suid) {
×
UNCOV
2964
        continue;
×
2965
      }
UNCOV
2966
      if (record.suid > suid) {
×
UNCOV
2967
        finished = true;
×
UNCOV
2968
        break;
×
2969
      }
2970

2971
      bool newTable = false;
×
2972
      if (uid < record.uid) {
×
2973
        while (j < numOfTables && uidList[j] < record.uid) {
×
UNCOV
2974
          ++j;
×
UNCOV
2975
          newTable = true;
×
2976
        }
2977

2978
        if (j >= numOfTables) {
×
2979
          finished = true;
×
2980
          break;
×
2981
        }
2982

UNCOV
2983
        uid = uidList[j];
×
2984
      }
2985

UNCOV
2986
      if (record.uid < uid) {
×
UNCOV
2987
        continue;
×
2988
      }
2989

UNCOV
2990
      if (newTable) {
×
2991
        pInfo = getTableLoadInfo(pReader, uid);
×
UNCOV
2992
        if (!pInfo) {
×
UNCOV
2993
          code = TSDB_CODE_OUT_OF_MEMORY;
×
UNCOV
2994
          finished = true;
×
UNCOV
2995
          break;
×
2996
        }
UNCOV
2997
        if (pInfo->pTombData == NULL) {
×
UNCOV
2998
          pInfo->pTombData = taosArrayInit(4, sizeof(SDelData));
×
2999
          if (!pInfo->pTombData) {
×
UNCOV
3000
            code = terrno;
×
UNCOV
3001
            finished = true;
×
UNCOV
3002
            break;
×
3003
          }
3004
        }
3005
      }
3006

UNCOV
3007
      if (record.version <= pReader->info.verRange.maxVer) {
×
3008
        /*tsdbError("tomb xx load/cache: vgId:%d fid:%d record %" PRId64 "~%" PRId64 "~%" PRId64 " tomb records",
3009
          TD_VID(pReader->pTsdb->pVnode), pReader->pCurFileSet->fid, record.skey, record.ekey, uid);*/
3010

UNCOV
3011
        SDelData delData = {.version = record.version, .sKey = record.skey, .eKey = record.ekey};
×
UNCOV
3012
        if (!taosArrayPush(pInfo->pTombData, &delData)) {
×
UNCOV
3013
          TAOS_RETURN(terrno);
×
3014
        }
3015
      }
3016
    }
3017

UNCOV
3018
    tTombBlockDestroy(&block);
×
3019

UNCOV
3020
    if (finished) {
×
UNCOV
3021
      TAOS_RETURN(code);
×
3022
    }
3023
  }
3024

UNCOV
3025
  TAOS_RETURN(TSDB_CODE_SUCCESS);
×
3026
}
3027

UNCOV
3028
static int32_t loadDataTomb(SCacheRowsReader *pReader, SDataFileReader *pFileReader) {
×
UNCOV
3029
  const TTombBlkArray *pBlkArray = NULL;
×
3030

UNCOV
3031
  TAOS_CHECK_RETURN(tsdbDataFileReadTombBlk(pFileReader, &pBlkArray));
×
3032

UNCOV
3033
  TAOS_RETURN(loadTombFromBlk(pBlkArray, pReader, pFileReader, true));
×
3034
}
3035

UNCOV
3036
static int32_t loadSttTomb(STsdbReader *pTsdbReader, SSttFileReader *pSttFileReader, SSttBlockLoadInfo *pLoadInfo) {
×
UNCOV
3037
  SCacheRowsReader    *pReader = (SCacheRowsReader *)pTsdbReader;
×
UNCOV
3038
  const TTombBlkArray *pBlkArray = NULL;
×
3039

UNCOV
3040
  TAOS_CHECK_RETURN(tsdbSttFileReadTombBlk(pSttFileReader, &pBlkArray));
×
3041

UNCOV
3042
  TAOS_RETURN(loadTombFromBlk(pBlkArray, pReader, pSttFileReader, false));
×
3043
}
3044

3045
typedef struct {
3046
  SMergeTree  mergeTree;
3047
  SMergeTree *pMergeTree;
3048
} SFSLastIter;
3049

UNCOV
3050
static int32_t lastIterOpen(SFSLastIter *iter, STFileSet *pFileSet, STsdb *pTsdb, STSchema *pTSchema, tb_uid_t suid,
×
3051
                            tb_uid_t uid, SCacheRowsReader *pr, int64_t lastTs, int16_t *aCols, int nCols) {
UNCOV
3052
  int32_t code = 0;
×
UNCOV
3053
  destroySttBlockReader(pr->pLDataIterArray, NULL);
×
UNCOV
3054
  pr->pLDataIterArray = taosArrayInit(4, POINTER_BYTES);
×
UNCOV
3055
  if (pr->pLDataIterArray == NULL) return terrno;
×
3056

UNCOV
3057
  SMergeTreeConf conf = {
×
3058
      .uid = uid,
3059
      .suid = suid,
3060
      .pTsdb = pTsdb,
3061
      .timewindow = (STimeWindow){.skey = lastTs, .ekey = TSKEY_MAX},
3062
      .verRange = (SVersionRange){.minVer = 0, .maxVer = INT64_MAX},
3063
      .strictTimeRange = false,
3064
      .pSchema = pTSchema,
3065
      .pCurrentFileset = pFileSet,
3066
      .backward = 1,
UNCOV
3067
      .pSttFileBlockIterArray = pr->pLDataIterArray,
×
3068
      .pCols = aCols,
3069
      .numOfCols = nCols,
3070
      .loadTombFn = loadSttTomb,
3071
      .pReader = pr,
UNCOV
3072
      .idstr = pr->idstr,
×
UNCOV
3073
      .pCurRowKey = &pr->rowKey,
×
3074
  };
3075

UNCOV
3076
  TAOS_CHECK_RETURN(tMergeTreeOpen2(&iter->mergeTree, &conf, NULL));
×
3077

UNCOV
3078
  iter->pMergeTree = &iter->mergeTree;
×
3079

3080
  TAOS_RETURN(code);
×
3081
}
3082

UNCOV
3083
static int32_t lastIterClose(SFSLastIter **iter) {
×
UNCOV
3084
  int32_t code = 0;
×
3085

UNCOV
3086
  if ((*iter)->pMergeTree) {
×
UNCOV
3087
    tMergeTreeClose((*iter)->pMergeTree);
×
UNCOV
3088
    (*iter)->pMergeTree = NULL;
×
3089
  }
3090

UNCOV
3091
  *iter = NULL;
×
3092

UNCOV
3093
  TAOS_RETURN(code);
×
3094
}
3095

UNCOV
3096
static int32_t lastIterNext(SFSLastIter *iter, TSDBROW **ppRow) {
×
UNCOV
3097
  bool hasVal = false;
×
UNCOV
3098
  *ppRow = NULL;
×
3099

UNCOV
3100
  int32_t code = tMergeTreeNext(iter->pMergeTree, &hasVal);
×
UNCOV
3101
  if (code != 0) {
×
UNCOV
3102
    return code;
×
3103
  }
3104

UNCOV
3105
  if (!hasVal) {
×
UNCOV
3106
    *ppRow = NULL;
×
UNCOV
3107
    TAOS_RETURN(code);
×
3108
  }
3109

UNCOV
3110
  *ppRow = tMergeTreeGetRow(iter->pMergeTree);
×
UNCOV
3111
  TAOS_RETURN(code);
×
3112
}
3113

3114
typedef enum SFSNEXTROWSTATES {
3115
  SFSNEXTROW_FS,
3116
  SFSNEXTROW_FILESET,
3117
  SFSNEXTROW_INDEXLIST,
3118
  SFSNEXTROW_BRINBLOCK,
3119
  SFSNEXTROW_BRINRECORD,
3120
  SFSNEXTROW_BLOCKDATA,
3121
  SFSNEXTROW_BLOCKROW,
3122
  SFSNEXTROW_NEXTSTTROW
3123
} SFSNEXTROWSTATES;
3124

3125
struct CacheNextRowIter;
3126

3127
typedef struct SFSNextRowIter {
3128
  SFSNEXTROWSTATES         state;         // [input]
3129
  SBlockIdx               *pBlockIdxExp;  // [input]
3130
  STSchema                *pTSchema;      // [input]
3131
  tb_uid_t                 suid;
3132
  tb_uid_t                 uid;
3133
  int32_t                  iFileSet;
3134
  STFileSet               *pFileSet;
3135
  TFileSetArray           *aDFileSet;
3136
  SArray                  *pIndexList;
3137
  int32_t                  iBrinIndex;
3138
  SBrinBlock               brinBlock;
3139
  SBrinBlock              *pBrinBlock;
3140
  int32_t                  iBrinRecord;
3141
  SBrinRecord              brinRecord;
3142
  SBlockData               blockData;
3143
  SBlockData              *pBlockData;
3144
  int32_t                  nRow;
3145
  int32_t                  iRow;
3146
  TSDBROW                  row;
3147
  int64_t                  lastTs;
3148
  SFSLastIter              lastIter;
3149
  SFSLastIter             *pLastIter;
3150
  int8_t                   lastEmpty;
3151
  TSDBROW                 *pLastRow;
3152
  SRow                    *pTSRow;
3153
  SRowMerger               rowMerger;
3154
  SCacheRowsReader        *pr;
3155
  struct CacheNextRowIter *pRowIter;
3156
} SFSNextRowIter;
3157

3158
static void clearLastFileSet(SFSNextRowIter *state);
3159

UNCOV
3160
static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast, int16_t *aCols,
×
3161
                                int nCols) {
UNCOV
3162
  int32_t         code = 0, lino = 0;
×
UNCOV
3163
  SFSNextRowIter *state = (SFSNextRowIter *)iter;
×
UNCOV
3164
  STsdb          *pTsdb = state->pr->pTsdb;
×
3165

UNCOV
3166
  if (SFSNEXTROW_FS == state->state) {
×
UNCOV
3167
    state->iFileSet = TARRAY2_SIZE(state->aDFileSet);
×
3168

UNCOV
3169
    state->state = SFSNEXTROW_FILESET;
×
3170
  }
3171

UNCOV
3172
  if (SFSNEXTROW_FILESET == state->state) {
×
UNCOV
3173
  _next_fileset:
×
UNCOV
3174
    clearLastFileSet(state);
×
3175

UNCOV
3176
    if (--state->iFileSet < 0) {
×
UNCOV
3177
      *ppRow = NULL;
×
3178

UNCOV
3179
      TAOS_RETURN(code);
×
3180
    } else {
UNCOV
3181
      state->pFileSet = TARRAY2_GET(state->aDFileSet, state->iFileSet);
×
3182
    }
3183

UNCOV
3184
    STFileObj **pFileObj = state->pFileSet->farr;
×
UNCOV
3185
    if (pFileObj[0] != NULL || pFileObj[3] != NULL) {
×
UNCOV
3186
      if (state->pFileSet != state->pr->pCurFileSet) {
×
UNCOV
3187
        SDataFileReaderConfig conf = {.tsdb = pTsdb, .szPage = pTsdb->pVnode->config.tsdbPageSize};
×
UNCOV
3188
        const char           *filesName[4] = {0};
×
UNCOV
3189
        if (pFileObj[0] != NULL) {
×
UNCOV
3190
          conf.files[0].file = *pFileObj[0]->f;
×
UNCOV
3191
          conf.files[0].exist = true;
×
UNCOV
3192
          filesName[0] = pFileObj[0]->fname;
×
3193

UNCOV
3194
          conf.files[1].file = *pFileObj[1]->f;
×
3195
          conf.files[1].exist = true;
×
UNCOV
3196
          filesName[1] = pFileObj[1]->fname;
×
3197

UNCOV
3198
          conf.files[2].file = *pFileObj[2]->f;
×
UNCOV
3199
          conf.files[2].exist = true;
×
UNCOV
3200
          filesName[2] = pFileObj[2]->fname;
×
3201
        }
3202

UNCOV
3203
        if (pFileObj[3] != NULL) {
×
3204
          conf.files[3].exist = true;
×
UNCOV
3205
          conf.files[3].file = *pFileObj[3]->f;
×
UNCOV
3206
          filesName[3] = pFileObj[3]->fname;
×
3207
        }
3208

UNCOV
3209
        TAOS_CHECK_GOTO(tsdbDataFileReaderOpen(filesName, &conf, &state->pr->pFileReader), &lino, _err);
×
3210

UNCOV
3211
        state->pr->pCurFileSet = state->pFileSet;
×
3212

UNCOV
3213
        code = loadDataTomb(state->pr, state->pr->pFileReader);
×
UNCOV
3214
        if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
3215
          tsdbError("vgId:%d, %s load tomb failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
3216
                    tstrerror(code));
3217
          TAOS_CHECK_GOTO(code, &lino, _err);
×
3218
        }
3219

3220
        TAOS_CHECK_GOTO(tsdbDataFileReadBrinBlk(state->pr->pFileReader, &state->pr->pBlkArray), &lino, _err);
×
3221
      }
3222

UNCOV
3223
      if (!state->pIndexList) {
×
UNCOV
3224
        state->pIndexList = taosArrayInit(1, sizeof(SBrinBlk));
×
UNCOV
3225
        if (!state->pIndexList) {
×
UNCOV
3226
          TAOS_CHECK_GOTO(terrno, &lino, _err);
×
3227
        }
3228
      } else {
UNCOV
3229
        taosArrayClear(state->pIndexList);
×
3230
      }
3231

UNCOV
3232
      const TBrinBlkArray *pBlkArray = state->pr->pBlkArray;
×
3233

UNCOV
3234
      for (int i = TARRAY2_SIZE(pBlkArray) - 1; i >= 0; --i) {
×
UNCOV
3235
        SBrinBlk *pBrinBlk = &pBlkArray->data[i];
×
UNCOV
3236
        if (state->suid >= pBrinBlk->minTbid.suid && state->suid <= pBrinBlk->maxTbid.suid) {
×
UNCOV
3237
          if (state->uid >= pBrinBlk->minTbid.uid && state->uid <= pBrinBlk->maxTbid.uid) {
×
UNCOV
3238
            if (!taosArrayPush(state->pIndexList, pBrinBlk)) {
×
UNCOV
3239
              TAOS_CHECK_GOTO(terrno, &lino, _err);
×
3240
            }
3241
          }
UNCOV
3242
        } else if (state->suid > pBrinBlk->maxTbid.suid ||
×
UNCOV
3243
                   (state->suid == pBrinBlk->maxTbid.suid && state->uid > pBrinBlk->maxTbid.uid)) {
×
3244
          break;
3245
        }
3246
      }
3247

UNCOV
3248
      int indexSize = TARRAY_SIZE(state->pIndexList);
×
UNCOV
3249
      if (indexSize <= 0) {
×
UNCOV
3250
        goto _check_stt_data;
×
3251
      }
3252

UNCOV
3253
      state->state = SFSNEXTROW_INDEXLIST;
×
UNCOV
3254
      state->iBrinIndex = 1;
×
3255
    }
3256

UNCOV
3257
  _check_stt_data:
×
UNCOV
3258
    if (state->pFileSet != state->pr->pCurFileSet) {
×
UNCOV
3259
      state->pr->pCurFileSet = state->pFileSet;
×
3260
    }
3261

UNCOV
3262
    TAOS_CHECK_GOTO(lastIterOpen(&state->lastIter, state->pFileSet, pTsdb, state->pTSchema, state->suid, state->uid,
×
3263
                                 state->pr, state->lastTs, aCols, nCols),
3264
                    &lino, _err);
3265

UNCOV
3266
    TAOS_CHECK_GOTO(lastIterNext(&state->lastIter, &state->pLastRow), &lino, _err);
×
3267

UNCOV
3268
    if (!state->pLastRow) {
×
UNCOV
3269
      state->lastEmpty = 1;
×
3270

UNCOV
3271
      if (SFSNEXTROW_INDEXLIST != state->state) {
×
UNCOV
3272
        clearLastFileSet(state);
×
UNCOV
3273
        goto _next_fileset;
×
3274
      }
3275
    } else {
3276
      state->lastEmpty = 0;
×
3277

3278
      if (SFSNEXTROW_INDEXLIST != state->state) {
×
UNCOV
3279
        state->state = SFSNEXTROW_NEXTSTTROW;
×
3280

UNCOV
3281
        *ppRow = state->pLastRow;
×
UNCOV
3282
        state->pLastRow = NULL;
×
3283

UNCOV
3284
        TAOS_RETURN(code);
×
3285
      }
3286
    }
3287

UNCOV
3288
    state->pLastIter = &state->lastIter;
×
3289
  }
3290

UNCOV
3291
  if (SFSNEXTROW_NEXTSTTROW == state->state) {
×
UNCOV
3292
    TAOS_CHECK_GOTO(lastIterNext(&state->lastIter, &state->pLastRow), &lino, _err);
×
3293

UNCOV
3294
    if (!state->pLastRow) {
×
UNCOV
3295
      if (state->pLastIter) {
×
UNCOV
3296
        code = lastIterClose(&state->pLastIter);
×
3297
        if (code != TSDB_CODE_SUCCESS) {
×
3298
          tsdbError("vgId:%d, %s close last iter failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
3299
                    tstrerror(code));
3300
          TAOS_RETURN(code);
×
3301
        }
3302
      }
3303

3304
      clearLastFileSet(state);
×
3305
      state->state = SFSNEXTROW_FILESET;
×
UNCOV
3306
      goto _next_fileset;
×
3307
    } else {
UNCOV
3308
      *ppRow = state->pLastRow;
×
UNCOV
3309
      state->pLastRow = NULL;
×
3310

UNCOV
3311
      TAOS_RETURN(code);
×
3312
    }
3313
  }
3314

UNCOV
3315
  if (SFSNEXTROW_INDEXLIST == state->state) {
×
UNCOV
3316
    SBrinBlk *pBrinBlk = NULL;
×
UNCOV
3317
  _next_brinindex:
×
UNCOV
3318
    if (--state->iBrinIndex < 0) {
×
UNCOV
3319
      if (state->pLastRow) {
×
UNCOV
3320
        state->state = SFSNEXTROW_NEXTSTTROW;
×
UNCOV
3321
        *ppRow = state->pLastRow;
×
UNCOV
3322
        state->pLastRow = NULL;
×
UNCOV
3323
        return code;
×
3324
      }
3325

3326
      clearLastFileSet(state);
×
UNCOV
3327
      goto _next_fileset;
×
3328
    } else {
UNCOV
3329
      pBrinBlk = taosArrayGet(state->pIndexList, state->iBrinIndex);
×
3330
    }
3331

UNCOV
3332
    if (!state->pBrinBlock) {
×
UNCOV
3333
      state->pBrinBlock = &state->brinBlock;
×
3334
    } else {
3335
      tBrinBlockClear(&state->brinBlock);
×
3336
    }
3337

UNCOV
3338
    TAOS_CHECK_GOTO(tsdbDataFileReadBrinBlock(state->pr->pFileReader, pBrinBlk, &state->brinBlock), &lino, _err);
×
3339

UNCOV
3340
    state->iBrinRecord = state->brinBlock.numOfRecords - 1;
×
UNCOV
3341
    state->state = SFSNEXTROW_BRINBLOCK;
×
3342
  }
3343

UNCOV
3344
  if (SFSNEXTROW_BRINBLOCK == state->state) {
×
UNCOV
3345
  _next_brinrecord:
×
UNCOV
3346
    if (state->iBrinRecord < 0) {  // empty brin block, goto _next_brinindex
×
UNCOV
3347
      tBrinBlockClear(&state->brinBlock);
×
UNCOV
3348
      goto _next_brinindex;
×
3349
    }
3350

UNCOV
3351
    TAOS_CHECK_GOTO(tBrinBlockGet(&state->brinBlock, state->iBrinRecord, &state->brinRecord), &lino, _err);
×
3352

UNCOV
3353
    SBrinRecord *pRecord = &state->brinRecord;
×
UNCOV
3354
    if (pRecord->uid != state->uid) {
×
3355
      // TODO: goto next brin block early
UNCOV
3356
      --state->iBrinRecord;
×
UNCOV
3357
      goto _next_brinrecord;
×
3358
    }
3359

UNCOV
3360
    state->state = SFSNEXTROW_BRINRECORD;
×
3361
  }
3362

UNCOV
3363
  if (SFSNEXTROW_BRINRECORD == state->state) {
×
UNCOV
3364
    SBrinRecord *pRecord = &state->brinRecord;
×
3365

UNCOV
3366
    if (!state->pBlockData) {
×
UNCOV
3367
      state->pBlockData = &state->blockData;
×
3368

3369
      TAOS_CHECK_GOTO(tBlockDataCreate(&state->blockData), &lino, _err);
×
3370
    } else {
UNCOV
3371
      tBlockDataReset(state->pBlockData);
×
3372
    }
3373

UNCOV
3374
    if (aCols[0] == PRIMARYKEY_TIMESTAMP_COL_ID) {
×
3375
      --nCols;
×
3376
      ++aCols;
×
3377
    }
3378

UNCOV
3379
    TAOS_CHECK_GOTO(tsdbDataFileReadBlockDataByColumn(state->pr->pFileReader, pRecord, state->pBlockData,
×
3380
                                                      state->pTSchema, aCols, nCols),
3381
                    &lino, _err);
3382

UNCOV
3383
    state->nRow = state->blockData.nRow;
×
UNCOV
3384
    state->iRow = state->nRow - 1;
×
3385

UNCOV
3386
    state->state = SFSNEXTROW_BLOCKROW;
×
3387
  }
3388

3389
  if (SFSNEXTROW_BLOCKROW == state->state) {
×
UNCOV
3390
    if (state->iRow < 0) {
×
3391
      --state->iBrinRecord;
×
UNCOV
3392
      goto _next_brinrecord;
×
3393
    }
3394

UNCOV
3395
    state->row = tsdbRowFromBlockData(state->pBlockData, state->iRow);
×
UNCOV
3396
    if (!state->pLastIter) {
×
UNCOV
3397
      *ppRow = &state->row;
×
UNCOV
3398
      --state->iRow;
×
UNCOV
3399
      return code;
×
3400
    }
3401

3402
    if (!state->pLastRow) {
×
3403
      // get next row from fslast and process with fs row, --state->Row if select fs row
3404
      TAOS_CHECK_GOTO(lastIterNext(&state->lastIter, &state->pLastRow), &lino, _err);
×
3405
    }
3406

3407
    if (!state->pLastRow) {
×
3408
      if (state->pLastIter) {
×
3409
        code = lastIterClose(&state->pLastIter);
×
3410
        if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
3411
          tsdbError("vgId:%d, %s close last iter failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
3412
                    tstrerror(code));
UNCOV
3413
          TAOS_RETURN(code);
×
3414
        }
3415
      }
3416

3417
      *ppRow = &state->row;
×
3418
      --state->iRow;
×
UNCOV
3419
      return code;
×
3420
    }
3421

3422
    // process state->pLastRow & state->row
3423
    TSKEY rowTs = TSDBROW_TS(&state->row);
×
3424
    TSKEY lastRowTs = TSDBROW_TS(state->pLastRow);
×
UNCOV
3425
    if (lastRowTs > rowTs) {
×
3426
      *ppRow = state->pLastRow;
×
3427
      state->pLastRow = NULL;
×
3428

UNCOV
3429
      TAOS_RETURN(code);
×
UNCOV
3430
    } else if (lastRowTs < rowTs) {
×
3431
      *ppRow = &state->row;
×
UNCOV
3432
      --state->iRow;
×
3433

3434
      TAOS_RETURN(code);
×
3435
    } else {
3436
      // TODO: merge rows and *ppRow = mergedRow
3437
      SRowMerger *pMerger = &state->rowMerger;
×
UNCOV
3438
      code = tsdbRowMergerInit(pMerger, state->pTSchema);
×
3439
      if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
3440
        tsdbError("vgId:%d, %s init row merger failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
3441
                  tstrerror(code));
UNCOV
3442
        TAOS_RETURN(code);
×
3443
      }
3444

UNCOV
3445
      TAOS_CHECK_GOTO(tsdbRowMergerAdd(pMerger, &state->row, state->pTSchema), &lino, _err);
×
3446
      TAOS_CHECK_GOTO(tsdbRowMergerAdd(pMerger, state->pLastRow, state->pTSchema), &lino, _err);
×
3447

3448
      if (state->pTSRow) {
×
3449
        taosMemoryFree(state->pTSRow);
×
UNCOV
3450
        state->pTSRow = NULL;
×
3451
      }
3452

3453
      TAOS_CHECK_GOTO(tsdbRowMergerGetRow(pMerger, &state->pTSRow), &lino, _err);
×
3454

UNCOV
3455
      state->row = tsdbRowFromTSRow(TSDBROW_VERSION(&state->row), state->pTSRow);
×
UNCOV
3456
      *ppRow = &state->row;
×
UNCOV
3457
      --state->iRow;
×
3458

UNCOV
3459
      tsdbRowMergerClear(pMerger);
×
3460

UNCOV
3461
      TAOS_RETURN(code);
×
3462
    }
3463
  }
3464

UNCOV
3465
_err:
×
UNCOV
3466
  clearLastFileSet(state);
×
3467

UNCOV
3468
  *ppRow = NULL;
×
3469

UNCOV
3470
  if (code) {
×
UNCOV
3471
    tsdbError("tsdb/cache: vgId:%d, %s failed at line %d since %s.", TD_VID(pTsdb->pVnode), __func__, lino,
×
3472
              tstrerror(code));
3473
  }
3474

3475
  TAOS_RETURN(code);
×
3476
}
3477

3478
typedef struct CacheNextRowIter {
3479
  SArray           *pMemDelData;
3480
  SArray           *pSkyline;
3481
  int64_t           iSkyline;
3482
  SBlockIdx         idx;
3483
  SMemNextRowIter   memState;
3484
  SMemNextRowIter   imemState;
3485
  SFSNextRowIter    fsState;
3486
  TSDBROW           memRow, imemRow, fsLastRow, fsRow;
3487
  TsdbNextRowState  input[3];
3488
  SCacheRowsReader *pr;
3489
  STsdb            *pTsdb;
3490
} CacheNextRowIter;
3491

UNCOV
3492
int32_t clearNextRowFromFS(void *iter) {
×
UNCOV
3493
  int32_t code = 0;
×
3494

UNCOV
3495
  SFSNextRowIter *state = (SFSNextRowIter *)iter;
×
UNCOV
3496
  if (!state) {
×
UNCOV
3497
    TAOS_RETURN(code);
×
3498
  }
3499

UNCOV
3500
  if (state->pLastIter) {
×
UNCOV
3501
    code = lastIterClose(&state->pLastIter);
×
3502
    if (code != TSDB_CODE_SUCCESS) {
×
3503
      tsdbError("%s close last iter failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
UNCOV
3504
      TAOS_RETURN(code);
×
3505
    }
3506
  }
3507

UNCOV
3508
  if (state->pBlockData) {
×
UNCOV
3509
    tBlockDataDestroy(state->pBlockData);
×
UNCOV
3510
    state->pBlockData = NULL;
×
3511
  }
3512

UNCOV
3513
  if (state->pBrinBlock) {
×
UNCOV
3514
    tBrinBlockDestroy(state->pBrinBlock);
×
UNCOV
3515
    state->pBrinBlock = NULL;
×
3516
  }
3517

3518
  if (state->pIndexList) {
×
3519
    taosArrayDestroy(state->pIndexList);
×
UNCOV
3520
    state->pIndexList = NULL;
×
3521
  }
3522

UNCOV
3523
  if (state->pTSRow) {
×
3524
    taosMemoryFree(state->pTSRow);
×
3525
    state->pTSRow = NULL;
×
3526
  }
3527

UNCOV
3528
  if (state->pRowIter->pSkyline) {
×
UNCOV
3529
    taosArrayDestroy(state->pRowIter->pSkyline);
×
UNCOV
3530
    state->pRowIter->pSkyline = NULL;
×
3531
  }
3532

UNCOV
3533
  TAOS_RETURN(code);
×
3534
}
3535

3536
static void clearLastFileSet(SFSNextRowIter *state) {
×
3537
  if (state->pLastIter) {
×
UNCOV
3538
    int code = lastIterClose(&state->pLastIter);
×
UNCOV
3539
    if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
3540
      tsdbError("%s close last iter failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
UNCOV
3541
      return;
×
3542
    }
3543
  }
3544

UNCOV
3545
  if (state->pBlockData) {
×
UNCOV
3546
    tBlockDataDestroy(state->pBlockData);
×
UNCOV
3547
    state->pBlockData = NULL;
×
3548
  }
3549

UNCOV
3550
  if (state->pr->pFileReader) {
×
UNCOV
3551
    tsdbDataFileReaderClose(&state->pr->pFileReader);
×
UNCOV
3552
    state->pr->pFileReader = NULL;
×
3553

UNCOV
3554
    state->pr->pCurFileSet = NULL;
×
3555
  }
3556

UNCOV
3557
  if (state->pTSRow) {
×
UNCOV
3558
    taosMemoryFree(state->pTSRow);
×
UNCOV
3559
    state->pTSRow = NULL;
×
3560
  }
3561

UNCOV
3562
  if (state->pRowIter->pSkyline) {
×
UNCOV
3563
    taosArrayDestroy(state->pRowIter->pSkyline);
×
UNCOV
3564
    state->pRowIter->pSkyline = NULL;
×
3565

3566
    void   *pe = NULL;
×
UNCOV
3567
    int32_t iter = 0;
×
UNCOV
3568
    while ((pe = tSimpleHashIterate(state->pr->pTableMap, pe, &iter)) != NULL) {
×
UNCOV
3569
      STableLoadInfo *pInfo = *(STableLoadInfo **)pe;
×
UNCOV
3570
      taosArrayDestroy(pInfo->pTombData);
×
UNCOV
3571
      pInfo->pTombData = NULL;
×
3572
    }
3573
  }
3574
}
3575

UNCOV
3576
static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTsdb, STSchema *pTSchema, tb_uid_t suid,
×
3577
                               SArray *pLDataIterArray, STsdbReadSnap *pReadSnap, int64_t lastTs,
3578
                               SCacheRowsReader *pr) {
UNCOV
3579
  int32_t code = 0, lino = 0;
×
3580

UNCOV
3581
  STbData *pMem = NULL;
×
UNCOV
3582
  if (pReadSnap->pMem) {
×
UNCOV
3583
    pMem = tsdbGetTbDataFromMemTable(pReadSnap->pMem, suid, uid);
×
3584
  }
3585

UNCOV
3586
  STbData *pIMem = NULL;
×
UNCOV
3587
  if (pReadSnap->pIMem) {
×
UNCOV
3588
    pIMem = tsdbGetTbDataFromMemTable(pReadSnap->pIMem, suid, uid);
×
3589
  }
3590

UNCOV
3591
  pIter->pTsdb = pTsdb;
×
3592

UNCOV
3593
  pIter->pMemDelData = NULL;
×
3594

UNCOV
3595
  TAOS_CHECK_GOTO(loadMemTombData(&pIter->pMemDelData, pMem, pIMem, pr->info.verRange.maxVer), &lino, _err);
×
3596

UNCOV
3597
  pIter->idx = (SBlockIdx){.suid = suid, .uid = uid};
×
3598

UNCOV
3599
  pIter->fsState.pRowIter = pIter;
×
UNCOV
3600
  pIter->fsState.state = SFSNEXTROW_FS;
×
3601
  pIter->fsState.aDFileSet = pReadSnap->pfSetArray;
×
3602
  pIter->fsState.pBlockIdxExp = &pIter->idx;
×
3603
  pIter->fsState.pTSchema = pTSchema;
×
3604
  pIter->fsState.suid = suid;
×
3605
  pIter->fsState.uid = uid;
×
UNCOV
3606
  pIter->fsState.lastTs = lastTs;
×
UNCOV
3607
  pIter->fsState.pr = pr;
×
3608

UNCOV
3609
  pIter->input[0] = (TsdbNextRowState){&pIter->memRow, true, false, false, &pIter->memState, getNextRowFromMem, NULL};
×
UNCOV
3610
  pIter->input[1] = (TsdbNextRowState){&pIter->imemRow, true, false, false, &pIter->imemState, getNextRowFromMem, NULL};
×
UNCOV
3611
  pIter->input[2] =
×
UNCOV
3612
      (TsdbNextRowState){&pIter->fsRow, false, true, false, &pIter->fsState, getNextRowFromFS, clearNextRowFromFS};
×
3613

UNCOV
3614
  if (pMem) {
×
UNCOV
3615
    pIter->memState.pMem = pMem;
×
UNCOV
3616
    pIter->memState.state = SMEMNEXTROW_ENTER;
×
UNCOV
3617
    pIter->memState.lastTs = lastTs;
×
UNCOV
3618
    pIter->input[0].stop = false;
×
UNCOV
3619
    pIter->input[0].next = true;
×
3620
  }
3621

3622
  if (pIMem) {
×
UNCOV
3623
    pIter->imemState.pMem = pIMem;
×
UNCOV
3624
    pIter->imemState.state = SMEMNEXTROW_ENTER;
×
UNCOV
3625
    pIter->imemState.lastTs = lastTs;
×
UNCOV
3626
    pIter->input[1].stop = false;
×
UNCOV
3627
    pIter->input[1].next = true;
×
3628
  }
3629

UNCOV
3630
  pIter->pr = pr;
×
3631

UNCOV
3632
_err:
×
UNCOV
3633
  TAOS_RETURN(code);
×
3634
}
3635

UNCOV
3636
static void nextRowIterClose(CacheNextRowIter *pIter) {
×
UNCOV
3637
  for (int i = 0; i < 3; ++i) {
×
UNCOV
3638
    if (pIter->input[i].nextRowClearFn) {
×
UNCOV
3639
      (void)pIter->input[i].nextRowClearFn(pIter->input[i].iter);
×
3640
    }
3641
  }
3642

UNCOV
3643
  if (pIter->pSkyline) {
×
UNCOV
3644
    taosArrayDestroy(pIter->pSkyline);
×
3645
  }
3646

UNCOV
3647
  if (pIter->pMemDelData) {
×
UNCOV
3648
    taosArrayDestroy(pIter->pMemDelData);
×
3649
  }
UNCOV
3650
}
×
3651

3652
// iterate next row non deleted backward ts, version (from high to low)
UNCOV
3653
static int32_t nextRowIterGet(CacheNextRowIter *pIter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast,
×
3654
                              int16_t *aCols, int nCols) {
UNCOV
3655
  int32_t code = 0, lino = 0;
×
3656

UNCOV
3657
  for (;;) {
×
UNCOV
3658
    for (int i = 0; i < 3; ++i) {
×
UNCOV
3659
      if (pIter->input[i].next && !pIter->input[i].stop) {
×
UNCOV
3660
        TAOS_CHECK_GOTO(pIter->input[i].nextRowFn(pIter->input[i].iter, &pIter->input[i].pRow,
×
3661
                                                  &pIter->input[i].ignoreEarlierTs, isLast, aCols, nCols),
3662
                        &lino, _err);
3663

UNCOV
3664
        if (pIter->input[i].pRow == NULL) {
×
UNCOV
3665
          pIter->input[i].stop = true;
×
UNCOV
3666
          pIter->input[i].next = false;
×
3667
        }
3668
      }
3669
    }
3670

UNCOV
3671
    if (pIter->input[0].stop && pIter->input[1].stop && pIter->input[2].stop) {
×
UNCOV
3672
      *ppRow = NULL;
×
UNCOV
3673
      *pIgnoreEarlierTs =
×
UNCOV
3674
          (pIter->input[0].ignoreEarlierTs || pIter->input[1].ignoreEarlierTs || pIter->input[2].ignoreEarlierTs);
×
3675

UNCOV
3676
      TAOS_RETURN(code);
×
3677
    }
3678

3679
    // select maxpoint(s) from mem, imem, fs and last
UNCOV
3680
    TSDBROW *max[4] = {0};
×
UNCOV
3681
    int      iMax[4] = {-1, -1, -1, -1};
×
UNCOV
3682
    int      nMax = 0;
×
UNCOV
3683
    SRowKey  maxKey = {.ts = TSKEY_MIN};
×
3684

UNCOV
3685
    for (int i = 0; i < 3; ++i) {
×
UNCOV
3686
      if (!pIter->input[i].stop && pIter->input[i].pRow != NULL) {
×
UNCOV
3687
        STsdbRowKey tsdbRowKey = {0};
×
UNCOV
3688
        tsdbRowGetKey(pIter->input[i].pRow, &tsdbRowKey);
×
3689

3690
        // merging & deduplicating on client side
UNCOV
3691
        int c = tRowKeyCompare(&maxKey, &tsdbRowKey.key);
×
UNCOV
3692
        if (c <= 0) {
×
UNCOV
3693
          if (c < 0) {
×
UNCOV
3694
            nMax = 0;
×
UNCOV
3695
            maxKey = tsdbRowKey.key;
×
3696
          }
3697

UNCOV
3698
          iMax[nMax] = i;
×
UNCOV
3699
          max[nMax++] = pIter->input[i].pRow;
×
3700
        }
UNCOV
3701
        pIter->input[i].next = false;
×
3702
      }
3703
    }
3704

3705
    // delete detection
UNCOV
3706
    TSDBROW *merge[4] = {0};
×
UNCOV
3707
    int      iMerge[4] = {-1, -1, -1, -1};
×
UNCOV
3708
    int      nMerge = 0;
×
UNCOV
3709
    for (int i = 0; i < nMax; ++i) {
×
UNCOV
3710
      TSDBKEY maxKey1 = TSDBROW_KEY(max[i]);
×
3711

UNCOV
3712
      if (!pIter->pSkyline) {
×
UNCOV
3713
        pIter->pSkyline = taosArrayInit(32, sizeof(TSDBKEY));
×
UNCOV
3714
        TSDB_CHECK_NULL(pIter->pSkyline, code, lino, _err, terrno);
×
3715

UNCOV
3716
        uint64_t        uid = pIter->idx.uid;
×
UNCOV
3717
        STableLoadInfo *pInfo = getTableLoadInfo(pIter->pr, uid);
×
UNCOV
3718
        TSDB_CHECK_NULL(pInfo, code, lino, _err, TSDB_CODE_OUT_OF_MEMORY);
×
3719

UNCOV
3720
        if (pInfo->pTombData == NULL) {
×
UNCOV
3721
          pInfo->pTombData = taosArrayInit(4, sizeof(SDelData));
×
UNCOV
3722
          TSDB_CHECK_NULL(pInfo->pTombData, code, lino, _err, terrno);
×
3723
        }
3724

UNCOV
3725
        if (!taosArrayAddAll(pInfo->pTombData, pIter->pMemDelData)) {
×
UNCOV
3726
          TAOS_CHECK_GOTO(terrno, &lino, _err);
×
3727
        }
3728

UNCOV
3729
        size_t delSize = TARRAY_SIZE(pInfo->pTombData);
×
UNCOV
3730
        if (delSize > 0) {
×
UNCOV
3731
          code = tsdbBuildDeleteSkyline(pInfo->pTombData, 0, (int32_t)(delSize - 1), pIter->pSkyline);
×
UNCOV
3732
          TAOS_CHECK_GOTO(code, &lino, _err);
×
3733
        }
3734
        pIter->iSkyline = taosArrayGetSize(pIter->pSkyline) - 1;
×
3735
      }
3736

UNCOV
3737
      bool deleted = tsdbKeyDeleted(&maxKey1, pIter->pSkyline, &pIter->iSkyline);
×
3738
      if (!deleted) {
×
UNCOV
3739
        iMerge[nMerge] = iMax[i];
×
UNCOV
3740
        merge[nMerge++] = max[i];
×
3741
      }
3742

UNCOV
3743
      pIter->input[iMax[i]].next = deleted;
×
3744
    }
3745

UNCOV
3746
    if (nMerge > 0) {
×
UNCOV
3747
      pIter->input[iMerge[0]].next = true;
×
3748

UNCOV
3749
      *ppRow = merge[0];
×
3750

UNCOV
3751
      TAOS_RETURN(code);
×
3752
    }
3753
  }
3754

UNCOV
3755
_err:
×
UNCOV
3756
  if (code) {
×
UNCOV
3757
    tsdbError("tsdb/cache: %s failed at line %d since %s.", __func__, lino, tstrerror(code));
×
3758
  }
3759

UNCOV
3760
  TAOS_RETURN(code);
×
3761
}
3762

UNCOV
3763
static int32_t initLastColArrayPartial(STSchema *pTSchema, SArray **ppColArray, int16_t *slotIds, int nCols) {
×
UNCOV
3764
  SArray *pColArray = taosArrayInit(nCols, sizeof(SLastCol));
×
UNCOV
3765
  if (NULL == pColArray) {
×
UNCOV
3766
    TAOS_RETURN(terrno);
×
3767
  }
3768

UNCOV
3769
  for (int32_t i = 0; i < nCols; ++i) {
×
UNCOV
3770
    int16_t  slotId = slotIds[i];
×
UNCOV
3771
    SLastCol col = {.rowKey.ts = 0,
×
UNCOV
3772
                    .colVal = COL_VAL_NULL(pTSchema->columns[slotId].colId, pTSchema->columns[slotId].type)};
×
UNCOV
3773
    if (!taosArrayPush(pColArray, &col)) {
×
UNCOV
3774
      TAOS_RETURN(terrno);
×
3775
    }
3776
  }
UNCOV
3777
  *ppColArray = pColArray;
×
3778

UNCOV
3779
  TAOS_RETURN(TSDB_CODE_SUCCESS);
×
3780
}
3781

UNCOV
3782
static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols,
×
3783
                            int nCols, int16_t *slotIds) {
UNCOV
3784
  int32_t   code = 0, lino = 0;
×
3785
  STSchema *pTSchema = pr->pSchema;  // metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1, 1);
×
UNCOV
3786
  int16_t   nLastCol = nCols;
×
UNCOV
3787
  int16_t   noneCol = 0;
×
UNCOV
3788
  bool      setNoneCol = false;
×
UNCOV
3789
  bool      hasRow = false;
×
UNCOV
3790
  bool      ignoreEarlierTs = false;
×
UNCOV
3791
  SArray   *pColArray = NULL;
×
UNCOV
3792
  SColVal  *pColVal = &(SColVal){0};
×
3793

UNCOV
3794
  TAOS_CHECK_RETURN(initLastColArrayPartial(pTSchema, &pColArray, slotIds, nCols));
×
3795

UNCOV
3796
  SArray *aColArray = taosArrayInit(nCols, sizeof(int16_t));
×
UNCOV
3797
  if (NULL == aColArray) {
×
UNCOV
3798
    taosArrayDestroy(pColArray);
×
3799

UNCOV
3800
    TAOS_RETURN(terrno);
×
3801
  }
3802

UNCOV
3803
  for (int i = 0; i < nCols; ++i) {
×
UNCOV
3804
    if (!taosArrayPush(aColArray, &aCols[i])) {
×
UNCOV
3805
      taosArrayDestroy(pColArray);
×
3806

UNCOV
3807
      TAOS_RETURN(terrno);
×
3808
    }
3809
  }
3810

UNCOV
3811
  STsdbRowKey lastRowKey = {.key.ts = TSKEY_MAX};
×
3812

3813
  // inverse iterator
UNCOV
3814
  CacheNextRowIter iter = {0};
×
3815
  code =
UNCOV
3816
      nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->info.suid, pr->pLDataIterArray, pr->pReadSnap, pr->lastTs, pr);
×
UNCOV
3817
  TAOS_CHECK_GOTO(code, &lino, _err);
×
3818

3819
  do {
UNCOV
3820
    TSDBROW *pRow = NULL;
×
UNCOV
3821
    code = nextRowIterGet(&iter, &pRow, &ignoreEarlierTs, true, TARRAY_DATA(aColArray), TARRAY_SIZE(aColArray));
×
3822

3823
    if (!pRow) {
×
UNCOV
3824
      break;
×
3825
    }
3826

3827
    hasRow = true;
×
3828

3829
    int32_t sversion = TSDBROW_SVERSION(pRow);
×
UNCOV
3830
    if (sversion != -1) {
×
UNCOV
3831
      TAOS_CHECK_GOTO(updateTSchema(sversion, pr, uid), &lino, _err);
×
3832

UNCOV
3833
      pTSchema = pr->pCurrSchema;
×
3834
    }
3835
    // int16_t nCol = pTSchema->numOfCols;
3836

UNCOV
3837
    STsdbRowKey rowKey = {0};
×
UNCOV
3838
    tsdbRowGetKey(pRow, &rowKey);
×
3839

UNCOV
3840
    if (lastRowKey.key.ts == TSKEY_MAX) {  // first time
×
UNCOV
3841
      lastRowKey = rowKey;
×
3842

UNCOV
3843
      for (int16_t iCol = noneCol; iCol < nCols; ++iCol) {
×
UNCOV
3844
        if (iCol >= nLastCol) {
×
UNCOV
3845
          break;
×
3846
        }
UNCOV
3847
        SLastCol *pCol = taosArrayGet(pColArray, iCol);
×
UNCOV
3848
        if (slotIds[iCol] > pTSchema->numOfCols - 1) {
×
UNCOV
3849
          if (!setNoneCol) {
×
UNCOV
3850
            noneCol = iCol;
×
UNCOV
3851
            setNoneCol = true;
×
3852
          }
UNCOV
3853
          continue;
×
3854
        }
UNCOV
3855
        if (pCol->colVal.cid != pTSchema->columns[slotIds[iCol]].colId) {
×
UNCOV
3856
          continue;
×
3857
        }
UNCOV
3858
        if (slotIds[iCol] == 0) {
×
UNCOV
3859
          STColumn *pTColumn = &pTSchema->columns[0];
×
UNCOV
3860
          *pColVal = COL_VAL_VALUE(pTColumn->colId, ((SValue){.type = pTColumn->type, .val = rowKey.key.ts}));
×
3861

UNCOV
3862
          SLastCol colTmp = {.rowKey = rowKey.key, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID};
×
UNCOV
3863
          TAOS_CHECK_GOTO(tsdbCacheReallocSLastCol(&colTmp, NULL), &lino, _err);
×
3864

UNCOV
3865
          taosArraySet(pColArray, 0, &colTmp);
×
UNCOV
3866
          continue;
×
3867
        }
UNCOV
3868
        tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal);
×
3869

UNCOV
3870
        *pCol = (SLastCol){.rowKey = rowKey.key, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID};
×
UNCOV
3871
        TAOS_CHECK_GOTO(tsdbCacheReallocSLastCol(pCol, NULL), &lino, _err);
×
3872

UNCOV
3873
        if (!COL_VAL_IS_VALUE(pColVal)) {
×
UNCOV
3874
          if (!setNoneCol) {
×
3875
            noneCol = iCol;
×
UNCOV
3876
            setNoneCol = true;
×
3877
          }
3878
        } else {
3879
          int32_t aColIndex = taosArraySearchIdx(aColArray, &pColVal->cid, compareInt16Val, TD_EQ);
×
UNCOV
3880
          if (aColIndex >= 0) {
×
UNCOV
3881
            taosArrayRemove(aColArray, aColIndex);
×
3882
          }
3883
        }
3884
      }
UNCOV
3885
      if (!setNoneCol) {
×
3886
        // done, goto return pColArray
UNCOV
3887
        break;
×
3888
      } else {
UNCOV
3889
        continue;
×
3890
      }
3891
    }
3892

3893
    // merge into pColArray
UNCOV
3894
    setNoneCol = false;
×
UNCOV
3895
    for (int16_t iCol = noneCol; iCol < nCols; ++iCol) {
×
UNCOV
3896
      if (iCol >= nLastCol) {
×
UNCOV
3897
        break;
×
3898
      }
3899
      // high version's column value
UNCOV
3900
      if (slotIds[iCol] > pTSchema->numOfCols - 1) {
×
UNCOV
3901
        continue;
×
3902
      }
3903

UNCOV
3904
      SLastCol *lastColVal = (SLastCol *)taosArrayGet(pColArray, iCol);
×
UNCOV
3905
      if (lastColVal->colVal.cid != pTSchema->columns[slotIds[iCol]].colId) {
×
UNCOV
3906
        continue;
×
3907
      }
UNCOV
3908
      SColVal *tColVal = &lastColVal->colVal;
×
3909
      if (COL_VAL_IS_VALUE(tColVal)) continue;
×
3910

UNCOV
3911
      tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal);
×
UNCOV
3912
      if (COL_VAL_IS_VALUE(pColVal)) {
×
UNCOV
3913
        SLastCol lastCol = {.rowKey = rowKey.key, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID};
×
UNCOV
3914
        TAOS_CHECK_GOTO(tsdbCacheReallocSLastCol(&lastCol, NULL), &lino, _err);
×
3915

UNCOV
3916
        tsdbCacheFreeSLastColItem(lastColVal);
×
UNCOV
3917
        taosArraySet(pColArray, iCol, &lastCol);
×
UNCOV
3918
        int32_t aColIndex = taosArraySearchIdx(aColArray, &lastCol.colVal.cid, compareInt16Val, TD_EQ);
×
UNCOV
3919
        if (aColIndex >= 0) {
×
UNCOV
3920
          taosArrayRemove(aColArray, aColIndex);
×
3921
        }
3922
      } else if (!COL_VAL_IS_VALUE(pColVal) && !setNoneCol) {
×
3923
        noneCol = iCol;
×
UNCOV
3924
        setNoneCol = true;
×
3925
      }
3926
    }
3927
  } while (setNoneCol);
×
3928

3929
  if (!hasRow) {
×
3930
    if (ignoreEarlierTs) {
×
UNCOV
3931
      taosArrayDestroy(pColArray);
×
UNCOV
3932
      pColArray = NULL;
×
3933
    } else {
3934
      taosArrayClear(pColArray);
×
3935
    }
3936
  }
UNCOV
3937
  *ppLastArray = pColArray;
×
3938

UNCOV
3939
  nextRowIterClose(&iter);
×
UNCOV
3940
  taosArrayDestroy(aColArray);
×
3941

UNCOV
3942
  TAOS_RETURN(code);
×
3943

UNCOV
3944
_err:
×
UNCOV
3945
  nextRowIterClose(&iter);
×
3946
  // taosMemoryFreeClear(pTSchema);
UNCOV
3947
  *ppLastArray = NULL;
×
UNCOV
3948
  taosArrayDestroyEx(pColArray, tsdbCacheFreeSLastColItem);
×
UNCOV
3949
  taosArrayDestroy(aColArray);
×
3950

UNCOV
3951
  if (code) {
×
UNCOV
3952
    tsdbError("tsdb/cache: vgId:%d, %s failed at line %d since %s.", TD_VID(pTsdb->pVnode), __func__, lino,
×
3953
              tstrerror(code));
3954
  }
3955

UNCOV
3956
  TAOS_RETURN(code);
×
3957
}
3958

UNCOV
3959
static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols,
×
3960
                               int nCols, int16_t *slotIds) {
UNCOV
3961
  int32_t   code = 0, lino = 0;
×
3962
  STSchema *pTSchema = pr->pSchema;  // metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1, 1);
×
UNCOV
3963
  int16_t   nLastCol = nCols;
×
UNCOV
3964
  int16_t   noneCol = 0;
×
UNCOV
3965
  bool      setNoneCol = false;
×
UNCOV
3966
  bool      hasRow = false;
×
UNCOV
3967
  bool      ignoreEarlierTs = false;
×
UNCOV
3968
  SArray   *pColArray = NULL;
×
UNCOV
3969
  SColVal  *pColVal = &(SColVal){0};
×
3970

UNCOV
3971
  TAOS_CHECK_RETURN(initLastColArrayPartial(pTSchema, &pColArray, slotIds, nCols));
×
3972

UNCOV
3973
  SArray *aColArray = taosArrayInit(nCols, sizeof(int16_t));
×
UNCOV
3974
  if (NULL == aColArray) {
×
UNCOV
3975
    taosArrayDestroy(pColArray);
×
3976

UNCOV
3977
    TAOS_RETURN(terrno);
×
3978
  }
3979

UNCOV
3980
  for (int i = 0; i < nCols; ++i) {
×
UNCOV
3981
    if (!taosArrayPush(aColArray, &aCols[i])) {
×
UNCOV
3982
      taosArrayDestroy(pColArray);
×
3983

UNCOV
3984
      TAOS_RETURN(terrno);
×
3985
    }
3986
  }
3987

3988
  // inverse iterator
UNCOV
3989
  CacheNextRowIter iter = {0};
×
3990
  code =
UNCOV
3991
      nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->info.suid, pr->pLDataIterArray, pr->pReadSnap, pr->lastTs, pr);
×
UNCOV
3992
  TAOS_CHECK_GOTO(code, &lino, _err);
×
3993

3994
  do {
3995
    TSDBROW *pRow = NULL;
×
UNCOV
3996
    code = nextRowIterGet(&iter, &pRow, &ignoreEarlierTs, false, TARRAY_DATA(aColArray), TARRAY_SIZE(aColArray));
×
3997

UNCOV
3998
    if (!pRow) {
×
UNCOV
3999
      break;
×
4000
    }
4001

4002
    hasRow = true;
×
4003

UNCOV
4004
    int32_t sversion = TSDBROW_SVERSION(pRow);
×
UNCOV
4005
    if (sversion != -1) {
×
UNCOV
4006
      TAOS_CHECK_GOTO(updateTSchema(sversion, pr, uid), &lino, _err);
×
4007

UNCOV
4008
      pTSchema = pr->pCurrSchema;
×
4009
    }
4010
    // int16_t nCol = pTSchema->numOfCols;
4011

UNCOV
4012
    STsdbRowKey rowKey = {0};
×
UNCOV
4013
    tsdbRowGetKey(pRow, &rowKey);
×
4014

UNCOV
4015
    for (int16_t iCol = noneCol; iCol < nCols; ++iCol) {
×
UNCOV
4016
      if (iCol >= nLastCol) {
×
UNCOV
4017
        break;
×
4018
      }
UNCOV
4019
      SLastCol *pCol = taosArrayGet(pColArray, iCol);
×
UNCOV
4020
      if (slotIds[iCol] > pTSchema->numOfCols - 1) {
×
UNCOV
4021
        continue;
×
4022
      }
UNCOV
4023
      if (pCol->colVal.cid != pTSchema->columns[slotIds[iCol]].colId) {
×
UNCOV
4024
        continue;
×
4025
      }
UNCOV
4026
      if (slotIds[iCol] == 0) {
×
UNCOV
4027
        STColumn *pTColumn = &pTSchema->columns[0];
×
UNCOV
4028
        *pColVal = COL_VAL_VALUE(pTColumn->colId, ((SValue){.type = pTColumn->type, .val = rowKey.key.ts}));
×
4029

4030
        SLastCol colTmp = {.rowKey = rowKey.key, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID};
×
4031
        TAOS_CHECK_GOTO(tsdbCacheReallocSLastCol(&colTmp, NULL), &lino, _err);
×
4032

UNCOV
4033
        taosArraySet(pColArray, 0, &colTmp);
×
UNCOV
4034
        continue;
×
4035
      }
UNCOV
4036
      tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal);
×
4037

UNCOV
4038
      *pCol = (SLastCol){.rowKey = rowKey.key, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID};
×
UNCOV
4039
      TAOS_CHECK_GOTO(tsdbCacheReallocSLastCol(pCol, NULL), &lino, _err);
×
4040

UNCOV
4041
      int32_t aColIndex = taosArraySearchIdx(aColArray, &pColVal->cid, compareInt16Val, TD_EQ);
×
UNCOV
4042
      if (aColIndex >= 0) {
×
4043
        taosArrayRemove(aColArray, aColIndex);
×
4044
      }
4045
    }
4046

4047
    break;
×
4048
  } while (1);
4049

4050
  if (!hasRow) {
×
4051
    if (ignoreEarlierTs) {
×
UNCOV
4052
      taosArrayDestroy(pColArray);
×
UNCOV
4053
      pColArray = NULL;
×
4054
    } else {
4055
      taosArrayClear(pColArray);
×
4056
    }
4057
  }
4058
  *ppLastArray = pColArray;
×
4059

UNCOV
4060
  nextRowIterClose(&iter);
×
UNCOV
4061
  taosArrayDestroy(aColArray);
×
4062

UNCOV
4063
  TAOS_RETURN(code);
×
4064

UNCOV
4065
_err:
×
UNCOV
4066
  nextRowIterClose(&iter);
×
4067

UNCOV
4068
  *ppLastArray = NULL;
×
UNCOV
4069
  taosArrayDestroyEx(pColArray, tsdbCacheFreeSLastColItem);
×
UNCOV
4070
  taosArrayDestroy(aColArray);
×
4071

UNCOV
4072
  if (code) {
×
UNCOV
4073
    tsdbError("tsdb/cache: vgId:%d, %s failed at line %d since %s.", TD_VID(pTsdb->pVnode), __func__, lino,
×
4074
              tstrerror(code));
4075
  }
4076

UNCOV
4077
  TAOS_RETURN(code);
×
4078
}
4079

UNCOV
4080
void tsdbCacheRelease(SLRUCache *pCache, LRUHandle *h) { tsdbLRUCacheRelease(pCache, h, false); }
×
4081

UNCOV
4082
void tsdbCacheSetCapacity(SVnode *pVnode, size_t capacity) {
×
UNCOV
4083
  taosLRUCacheSetCapacity(pVnode->pTsdb->lruCache, capacity);
×
UNCOV
4084
}
×
4085

4086
#ifdef BUILD_NO_CALL
4087
size_t tsdbCacheGetCapacity(SVnode *pVnode) { return taosLRUCacheGetCapacity(pVnode->pTsdb->lruCache); }
4088
#endif
4089

4090
size_t tsdbCacheGetUsage(SVnode *pVnode) {
6✔
4091
  size_t usage = 0;
6✔
4092
  if (pVnode->pTsdb != NULL) {
6!
4093
    usage = taosLRUCacheGetUsage(pVnode->pTsdb->lruCache);
6✔
4094
  }
4095

4096
  return usage;
6✔
4097
}
4098

4099
int32_t tsdbCacheGetElems(SVnode *pVnode) {
6✔
4100
  int32_t elems = 0;
6✔
4101
  if (pVnode->pTsdb != NULL) {
6!
4102
    elems = taosLRUCacheGetElems(pVnode->pTsdb->lruCache);
6✔
4103
  }
4104

4105
  return elems;
6✔
4106
}
4107

4108
#ifdef USE_S3
4109
// block cache
UNCOV
4110
static void getBCacheKey(int32_t fid, int64_t commitID, int64_t blkno, char *key, int *len) {
×
4111
  struct {
4112
    int32_t fid;
4113
    int64_t commitID;
4114
    int64_t blkno;
4115
  } bKey = {0};
×
4116

4117
  bKey.fid = fid;
×
UNCOV
4118
  bKey.commitID = commitID;
×
4119
  bKey.blkno = blkno;
×
4120

UNCOV
4121
  *len = sizeof(bKey);
×
4122
  memcpy(key, &bKey, *len);
×
4123
}
×
4124

4125
static int32_t tsdbCacheLoadBlockS3(STsdbFD *pFD, uint8_t **ppBlock) {
×
UNCOV
4126
  int32_t code = 0;
×
4127

4128
  int64_t block_offset = (pFD->blkno - 1) * tsS3BlockSize * pFD->szPage;
×
4129

4130
  TAOS_CHECK_RETURN(tcsGetObjectBlock(pFD->objName, block_offset, tsS3BlockSize * pFD->szPage, 0, ppBlock));
×
4131

UNCOV
4132
  tsdbTrace("block:%p load from s3", *ppBlock);
×
4133

4134
_exit:
×
4135
  return code;
×
4136
}
4137

4138
static void deleteBCache(const void *key, size_t keyLen, void *value, void *ud) {
×
4139
  (void)ud;
UNCOV
4140
  uint8_t *pBlock = (uint8_t *)value;
×
4141

4142
  taosMemoryFree(pBlock);
×
4143
}
×
4144

UNCOV
4145
int32_t tsdbCacheGetBlockS3(SLRUCache *pCache, STsdbFD *pFD, LRUHandle **handle) {
×
4146
  int32_t code = 0;
×
UNCOV
4147
  char    key[128] = {0};
×
UNCOV
4148
  int     keyLen = 0;
×
4149

4150
  getBCacheKey(pFD->fid, pFD->cid, pFD->blkno, key, &keyLen);
×
UNCOV
4151
  LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen);
×
4152
  if (!h) {
×
UNCOV
4153
    STsdb *pTsdb = pFD->pTsdb;
×
UNCOV
4154
    (void)taosThreadMutexLock(&pTsdb->bMutex);
×
4155

UNCOV
4156
    h = taosLRUCacheLookup(pCache, key, keyLen);
×
UNCOV
4157
    if (!h) {
×
4158
      uint8_t *pBlock = NULL;
×
UNCOV
4159
      code = tsdbCacheLoadBlockS3(pFD, &pBlock);
×
4160
      //  if table's empty or error, return code of -1
4161
      if (code != TSDB_CODE_SUCCESS || pBlock == NULL) {
×
UNCOV
4162
        (void)taosThreadMutexUnlock(&pTsdb->bMutex);
×
4163

UNCOV
4164
        *handle = NULL;
×
UNCOV
4165
        if (code == TSDB_CODE_SUCCESS && !pBlock) {
×
4166
          code = TSDB_CODE_OUT_OF_MEMORY;
×
4167
        }
4168

4169
        TAOS_RETURN(code);
×
4170
      }
4171

4172
      size_t              charge = tsS3BlockSize * pFD->szPage;
×
UNCOV
4173
      _taos_lru_deleter_t deleter = deleteBCache;
×
4174
      LRUStatus           status =
UNCOV
4175
          taosLRUCacheInsert(pCache, key, keyLen, pBlock, charge, deleter, NULL, &h, TAOS_LRU_PRIORITY_LOW, NULL);
×
4176
      if (status != TAOS_LRU_STATUS_OK) {
4177
        // code = -1;
4178
      }
4179
    }
4180

UNCOV
4181
    (void)taosThreadMutexUnlock(&pTsdb->bMutex);
×
4182
  }
4183

4184
  *handle = h;
×
4185

4186
  TAOS_RETURN(code);
×
4187
}
4188

4189
int32_t tsdbCacheGetPageS3(SLRUCache *pCache, STsdbFD *pFD, int64_t pgno, LRUHandle **handle) {
×
4190
  int32_t code = 0;
×
UNCOV
4191
  char    key[128] = {0};
×
4192
  int     keyLen = 0;
×
4193

UNCOV
4194
  getBCacheKey(pFD->fid, pFD->cid, pgno, key, &keyLen);
×
4195
  *handle = taosLRUCacheLookup(pCache, key, keyLen);
×
4196

UNCOV
4197
  return code;
×
4198
}
4199

UNCOV
4200
void tsdbCacheSetPageS3(SLRUCache *pCache, STsdbFD *pFD, int64_t pgno, uint8_t *pPage) {
×
4201
  char       key[128] = {0};
×
UNCOV
4202
  int        keyLen = 0;
×
4203
  LRUHandle *handle = NULL;
×
4204

UNCOV
4205
  getBCacheKey(pFD->fid, pFD->cid, pgno, key, &keyLen);
×
UNCOV
4206
  (void)taosThreadMutexLock(&pFD->pTsdb->pgMutex);
×
UNCOV
4207
  handle = taosLRUCacheLookup(pFD->pTsdb->pgCache, key, keyLen);
×
UNCOV
4208
  if (!handle) {
×
UNCOV
4209
    size_t              charge = pFD->szPage;
×
UNCOV
4210
    _taos_lru_deleter_t deleter = deleteBCache;
×
UNCOV
4211
    uint8_t            *pPg = taosMemoryMalloc(charge);
×
UNCOV
4212
    if (!pPg) {
×
UNCOV
4213
      return;  // ignore error with s3 cache and leave error untouched
×
4214
    }
UNCOV
4215
    memcpy(pPg, pPage, charge);
×
4216

4217
    LRUStatus status =
UNCOV
4218
        taosLRUCacheInsert(pCache, key, keyLen, pPg, charge, deleter, NULL, &handle, TAOS_LRU_PRIORITY_LOW, NULL);
×
4219
    if (status != TAOS_LRU_STATUS_OK) {
4220
      // ignore cache updating if not ok
4221
      // code = TSDB_CODE_OUT_OF_MEMORY;
4222
    }
4223
  }
UNCOV
4224
  (void)taosThreadMutexUnlock(&pFD->pTsdb->pgMutex);
×
4225

UNCOV
4226
  tsdbCacheRelease(pFD->pTsdb->pgCache, handle);
×
4227
}
4228
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc