• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3541

26 Nov 2024 03:56AM UTC coverage: 60.776% (-0.07%) from 60.846%
#3541

push

travis-ci

web-flow
Merge pull request #28920 from taosdata/fix/TD-33008-3.0

fix(query)[TD-33008]. fix error handling in tsdbCacheRead

120076 of 252763 branches covered (47.51%)

Branch coverage included in aggregate %.

0 of 2 new or added lines in 1 file covered. (0.0%)

1395 existing lines in 154 files now uncovered.

200995 of 275526 relevant lines covered (72.95%)

19612328.37 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

63.99
/source/dnode/vnode/src/tsdb/tsdbCache.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#include "functionMgt.h"
16
#include "tcs.h"
17
#include "tsdb.h"
18
#include "tsdbDataFileRW.h"
19
#include "tsdbIter.h"
20
#include "tsdbReadUtil.h"
21
#include "vnd.h"
22

23
#define ROCKS_BATCH_SIZE (4096)
24

25
void tsdbLRUCacheRelease(SLRUCache *cache, LRUHandle *handle, bool eraseIfLastRef) {
8,606,316✔
26
  if (!taosLRUCacheRelease(cache, handle, eraseIfLastRef)) {
8,606,316✔
27
    tsdbTrace(" release lru cache failed");
75,231!
28
  }
29
}
8,617,242✔
30

31
static int32_t tsdbOpenBCache(STsdb *pTsdb) {
13,712✔
32
  int32_t    code = 0, lino = 0;
13,712✔
33
  int32_t    szPage = pTsdb->pVnode->config.tsdbPageSize;
13,712✔
34
  int64_t    szBlock = tsS3BlockSize <= 1024 ? 1024 : tsS3BlockSize;
13,712✔
35
  SLRUCache *pCache = taosLRUCacheInit((int64_t)tsS3BlockCacheSize * szBlock * szPage, 0, .5);
13,712✔
36
  if (pCache == NULL) {
13,735!
37
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err);
×
38
  }
39

40
  taosLRUCacheSetStrictCapacity(pCache, false);
13,735✔
41

42
  (void)taosThreadMutexInit(&pTsdb->bMutex, NULL);
13,736✔
43

44
  pTsdb->bCache = pCache;
13,735✔
45

46
_err:
13,735✔
47
  if (code) {
13,735!
48
    tsdbError("tsdb/bcache: vgId:%d, %s failed at line %d since %s.", TD_VID(pTsdb->pVnode), __func__, lino,
×
49
              tstrerror(code));
50
  }
51

52
  TAOS_RETURN(code);
13,735✔
53
}
54

55
static void tsdbCloseBCache(STsdb *pTsdb) {
13,736✔
56
  SLRUCache *pCache = pTsdb->bCache;
13,736✔
57
  if (pCache) {
13,736!
58
    int32_t elems = taosLRUCacheGetElems(pCache);
13,736✔
59
    tsdbTrace("vgId:%d, elems: %d", TD_VID(pTsdb->pVnode), elems);
13,736✔
60
    taosLRUCacheEraseUnrefEntries(pCache);
13,736✔
61
    elems = taosLRUCacheGetElems(pCache);
13,736✔
62
    tsdbTrace("vgId:%d, elems: %d", TD_VID(pTsdb->pVnode), elems);
13,736✔
63

64
    taosLRUCacheCleanup(pCache);
13,736✔
65

66
    (void)taosThreadMutexDestroy(&pTsdb->bMutex);
13,736✔
67
  }
68
}
13,736✔
69

70
static int32_t tsdbOpenPgCache(STsdb *pTsdb) {
13,733✔
71
  int32_t code = 0, lino = 0;
13,733✔
72
  int32_t szPage = pTsdb->pVnode->config.tsdbPageSize;
13,733✔
73

74
  SLRUCache *pCache = taosLRUCacheInit((int64_t)tsS3PageCacheSize * szPage, 0, .5);
13,733✔
75
  if (pCache == NULL) {
13,736!
76
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err);
×
77
  }
78

79
  taosLRUCacheSetStrictCapacity(pCache, false);
13,736✔
80

81
  (void)taosThreadMutexInit(&pTsdb->pgMutex, NULL);
13,736✔
82

83
  pTsdb->pgCache = pCache;
13,736✔
84

85
_err:
13,736✔
86
  if (code) {
13,736!
87
    tsdbError("tsdb/pgcache: vgId:%d, open failed at line %d since %s.", TD_VID(pTsdb->pVnode), lino, tstrerror(code));
×
88
  }
89

90
  TAOS_RETURN(code);
13,736✔
91
}
92

93
static void tsdbClosePgCache(STsdb *pTsdb) {
13,736✔
94
  SLRUCache *pCache = pTsdb->pgCache;
13,736✔
95
  if (pCache) {
13,736!
96
    int32_t elems = taosLRUCacheGetElems(pCache);
13,736✔
97
    tsdbTrace("vgId:%d, elems: %d", TD_VID(pTsdb->pVnode), elems);
13,736✔
98
    taosLRUCacheEraseUnrefEntries(pCache);
13,736✔
99
    elems = taosLRUCacheGetElems(pCache);
13,736✔
100
    tsdbTrace("vgId:%d, elems: %d", TD_VID(pTsdb->pVnode), elems);
13,736✔
101

102
    taosLRUCacheCleanup(pCache);
13,736✔
103

104
    (void)taosThreadMutexDestroy(&pTsdb->bMutex);
13,736✔
105
  }
106
}
13,736✔
107

108
#define ROCKS_KEY_LEN (sizeof(tb_uid_t) + sizeof(int16_t) + sizeof(int8_t))
109

110
enum {
111
  LFLAG_LAST_ROW = 0,
112
  LFLAG_LAST = 1,
113
};
114

115
typedef struct {
116
  tb_uid_t uid;
117
  int16_t  cid;
118
  int8_t   lflag;
119
} SLastKey;
120

121
#define IS_LAST_ROW_KEY(k) (((k).lflag & LFLAG_LAST) == LFLAG_LAST_ROW)
122
#define IS_LAST_KEY(k)     (((k).lflag & LFLAG_LAST) == LFLAG_LAST)
123

124
static void tsdbGetRocksPath(STsdb *pTsdb, char *path) {
13,668✔
125
  SVnode *pVnode = pTsdb->pVnode;
13,668✔
126
  vnodeGetPrimaryDir(pTsdb->path, pVnode->diskPrimary, pVnode->pTfs, path, TSDB_FILENAME_LEN);
13,668✔
127

128
  int32_t offset = strlen(path);
13,692✔
129
  snprintf(path + offset, TSDB_FILENAME_LEN - offset - 1, "%scache.rdb", TD_DIRSEP);
13,692✔
130
}
13,692✔
131

132
static const char *myCmpName(void *state) {
72,313✔
133
  (void)state;
134
  return "myCmp";
72,313✔
135
}
136

137
static void myCmpDestroy(void *state) { (void)state; }
13,733✔
138

139
static int myCmp(void *state, const char *a, size_t alen, const char *b, size_t blen) {
1,540,312✔
140
  (void)state;
141
  (void)alen;
142
  (void)blen;
143
  SLastKey *lhs = (SLastKey *)a;
1,540,312✔
144
  SLastKey *rhs = (SLastKey *)b;
1,540,312✔
145

146
  if (lhs->uid < rhs->uid) {
1,540,312✔
147
    return -1;
570,224✔
148
  } else if (lhs->uid > rhs->uid) {
970,088✔
149
    return 1;
193,135✔
150
  }
151

152
  if (lhs->cid < rhs->cid) {
776,953✔
153
    return -1;
375,552✔
154
  } else if (lhs->cid > rhs->cid) {
401,401✔
155
    return 1;
182,286✔
156
  }
157

158
  if ((lhs->lflag & LFLAG_LAST) < (rhs->lflag & LFLAG_LAST)) {
219,115✔
159
    return -1;
61,021✔
160
  } else if ((lhs->lflag & LFLAG_LAST) > (rhs->lflag & LFLAG_LAST)) {
158,094✔
161
    return 1;
70,403✔
162
  }
163

164
  return 0;
87,691✔
165
}
166

167
static int32_t tsdbOpenRocksCache(STsdb *pTsdb) {
13,734✔
168
  int32_t code = 0, lino = 0;
13,734✔
169

170
  rocksdb_comparator_t *cmp = rocksdb_comparator_create(NULL, myCmpDestroy, myCmp, myCmpName);
13,734✔
171
  if (NULL == cmp) {
13,729!
172
    TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
×
173
  }
174

175
  rocksdb_cache_t *cache = rocksdb_cache_create_lru(5 * 1024 * 1024);
13,729✔
176
  pTsdb->rCache.blockcache = cache;
13,736✔
177

178
  rocksdb_block_based_table_options_t *tableoptions = rocksdb_block_based_options_create();
13,736✔
179
  pTsdb->rCache.tableoptions = tableoptions;
13,736✔
180

181
  rocksdb_options_t *options = rocksdb_options_create();
13,736✔
182
  if (NULL == options) {
13,732!
183
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err);
×
184
  }
185

186
  rocksdb_options_set_create_if_missing(options, 1);
13,732✔
187
  rocksdb_options_set_comparator(options, cmp);
13,734✔
188
  rocksdb_block_based_options_set_block_cache(tableoptions, cache);
13,729✔
189
  rocksdb_options_set_block_based_table_factory(options, tableoptions);
13,734✔
190
  rocksdb_options_set_info_log_level(options, 2);  // WARN_LEVEL
13,736✔
191
  // rocksdb_options_set_inplace_update_support(options, 1);
192
  // rocksdb_options_set_allow_concurrent_memtable_write(options, 0);
193

194
  rocksdb_writeoptions_t *writeoptions = rocksdb_writeoptions_create();
13,725✔
195
  if (NULL == writeoptions) {
13,696!
196
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err2);
×
197
  }
198
  rocksdb_writeoptions_disable_WAL(writeoptions, 1);
13,696✔
199

200
  rocksdb_readoptions_t *readoptions = rocksdb_readoptions_create();
13,732✔
201
  if (NULL == readoptions) {
13,708!
202
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err3);
×
203
  }
204

205
  char *err = NULL;
13,708✔
206
  char  cachePath[TSDB_FILENAME_LEN] = {0};
13,708✔
207
  tsdbGetRocksPath(pTsdb, cachePath);
13,708✔
208

209
  rocksdb_t *db = rocksdb_open(options, cachePath, &err);
13,682✔
210
  if (NULL == db) {
13,735!
211
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err);
×
212
    rocksdb_free(err);
×
213

214
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err4);
×
215
  }
216

217
  rocksdb_flushoptions_t *flushoptions = rocksdb_flushoptions_create();
13,735✔
218
  if (NULL == flushoptions) {
13,735!
219
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err5);
×
220
  }
221

222
  rocksdb_writebatch_t *writebatch = rocksdb_writebatch_create();
13,735✔
223

224
  TAOS_CHECK_GOTO(taosThreadMutexInit(&pTsdb->rCache.writeBatchMutex, NULL), &lino, _err6);
13,736!
225

226
  pTsdb->rCache.writebatch = writebatch;
13,735✔
227
  pTsdb->rCache.my_comparator = cmp;
13,735✔
228
  pTsdb->rCache.options = options;
13,735✔
229
  pTsdb->rCache.writeoptions = writeoptions;
13,735✔
230
  pTsdb->rCache.readoptions = readoptions;
13,735✔
231
  pTsdb->rCache.flushoptions = flushoptions;
13,735✔
232
  pTsdb->rCache.db = db;
13,735✔
233
  pTsdb->rCache.sver = -1;
13,735✔
234
  pTsdb->rCache.suid = -1;
13,735✔
235
  pTsdb->rCache.uid = -1;
13,735✔
236
  pTsdb->rCache.pTSchema = NULL;
13,735✔
237

238
  TAOS_RETURN(code);
13,735✔
239

240
_err6:
×
241
  rocksdb_writebatch_destroy(writebatch);
×
242
_err5:
×
243
  rocksdb_close(pTsdb->rCache.db);
×
244
_err4:
×
245
  rocksdb_readoptions_destroy(readoptions);
×
246
_err3:
×
247
  rocksdb_writeoptions_destroy(writeoptions);
×
248
_err2:
×
249
  rocksdb_options_destroy(options);
×
250
  rocksdb_block_based_options_destroy(tableoptions);
×
251
  rocksdb_cache_destroy(cache);
×
252
_err:
×
253
  rocksdb_comparator_destroy(cmp);
×
254

255
  TAOS_RETURN(code);
×
256
}
257

258
static void tsdbCloseRocksCache(STsdb *pTsdb) {
13,736✔
259
  rocksdb_close(pTsdb->rCache.db);
13,736✔
260
  (void)taosThreadMutexDestroy(&pTsdb->rCache.writeBatchMutex);
13,734✔
261
  rocksdb_flushoptions_destroy(pTsdb->rCache.flushoptions);
13,734✔
262
  rocksdb_writebatch_destroy(pTsdb->rCache.writebatch);
13,736✔
263
  rocksdb_readoptions_destroy(pTsdb->rCache.readoptions);
13,736✔
264
  rocksdb_writeoptions_destroy(pTsdb->rCache.writeoptions);
13,736✔
265
  rocksdb_options_destroy(pTsdb->rCache.options);
13,736✔
266
  rocksdb_block_based_options_destroy(pTsdb->rCache.tableoptions);
13,735✔
267
  rocksdb_cache_destroy(pTsdb->rCache.blockcache);
13,735✔
268
  rocksdb_comparator_destroy(pTsdb->rCache.my_comparator);
13,736✔
269
  taosMemoryFree(pTsdb->rCache.pTSchema);
13,736✔
270
}
13,735✔
271

272
static void rocksMayWrite(STsdb *pTsdb, bool force) {
264,790✔
273
  rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
264,790✔
274

275
  int count = rocksdb_writebatch_count(wb);
264,790✔
276
  if ((force && count > 0) || count >= ROCKS_BATCH_SIZE) {
264,804!
277
    char *err = NULL;
2,027✔
278

279
    rocksdb_write(pTsdb->rCache.db, pTsdb->rCache.writeoptions, wb, &err);
2,027✔
280
    if (NULL != err) {
2,026!
281
      tsdbError("vgId:%d, %s failed at line %d, count: %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, count,
×
282
                err);
283
      rocksdb_free(err);
×
284
    }
285

286
    rocksdb_writebatch_clear(wb);
2,026✔
287
  }
288
}
264,804✔
289

290
typedef struct {
291
  TSKEY  ts;
292
  int8_t dirty;
293
  struct {
294
    int16_t cid;
295
    int8_t  type;
296
    int8_t  flag;
297
    union {
298
      int64_t val;
299
      struct {
300
        uint32_t nData;
301
        uint8_t *pData;
302
      };
303
    } value;
304
  } colVal;
305
} SLastColV0;
306

307
static int32_t tsdbCacheDeserializeV0(char const *value, SLastCol *pLastCol) {
569✔
308
  SLastColV0 *pLastColV0 = (SLastColV0 *)value;
569✔
309

310
  pLastCol->rowKey.ts = pLastColV0->ts;
569✔
311
  pLastCol->rowKey.numOfPKs = 0;
569✔
312
  pLastCol->dirty = pLastColV0->dirty;
569✔
313
  pLastCol->colVal.cid = pLastColV0->colVal.cid;
569✔
314
  pLastCol->colVal.flag = pLastColV0->colVal.flag;
569✔
315
  pLastCol->colVal.value.type = pLastColV0->colVal.type;
569✔
316

317
  pLastCol->cacheStatus = TSDB_LAST_CACHE_VALID;
569✔
318

319
  if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) {
569!
320
    pLastCol->colVal.value.nData = pLastColV0->colVal.value.nData;
19✔
321
    pLastCol->colVal.value.pData = NULL;
19✔
322
    if (pLastCol->colVal.value.nData > 0) {
19✔
323
      pLastCol->colVal.value.pData = (uint8_t *)(&pLastColV0[1]);
10✔
324
    }
325
    return sizeof(SLastColV0) + pLastColV0->colVal.value.nData;
19✔
326
  } else {
327
    pLastCol->colVal.value.val = pLastColV0->colVal.value.val;
550✔
328
    return sizeof(SLastColV0);
550✔
329
  }
330
}
331

332
static int32_t tsdbCacheDeserialize(char const *value, size_t size, SLastCol **ppLastCol) {
569✔
333
  if (!value) {
569!
334
    return TSDB_CODE_INVALID_PARA;
×
335
  }
336

337
  SLastCol *pLastCol = taosMemoryCalloc(1, sizeof(SLastCol));
569✔
338
  if (NULL == pLastCol) {
569!
339
    return terrno;
×
340
  }
341

342
  int32_t offset = tsdbCacheDeserializeV0(value, pLastCol);
569✔
343
  if (offset == size) {
569!
344
    // version 0
345
    *ppLastCol = pLastCol;
×
346

347
    TAOS_RETURN(TSDB_CODE_SUCCESS);
×
348
  } else if (offset > size) {
569!
349
    taosMemoryFreeClear(pLastCol);
×
350

351
    TAOS_RETURN(TSDB_CODE_INVALID_DATA_FMT);
×
352
  }
353

354
  // version
355
  int8_t version = *(int8_t *)(value + offset);
569✔
356
  offset += sizeof(int8_t);
569✔
357

358
  // numOfPKs
359
  pLastCol->rowKey.numOfPKs = *(uint8_t *)(value + offset);
569✔
360
  offset += sizeof(uint8_t);
569✔
361

362
  // pks
363
  for (int32_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) {
569!
364
    pLastCol->rowKey.pks[i] = *(SValue *)(value + offset);
×
365
    offset += sizeof(SValue);
×
366

367
    if (IS_VAR_DATA_TYPE(pLastCol->rowKey.pks[i].type)) {
×
368
      pLastCol->rowKey.pks[i].pData = NULL;
×
369
      if (pLastCol->rowKey.pks[i].nData > 0) {
×
370
        pLastCol->rowKey.pks[i].pData = (uint8_t *)value + offset;
×
371
        offset += pLastCol->rowKey.pks[i].nData;
×
372
      }
373
    }
374
  }
375

376
  if (version >= LAST_COL_VERSION_2) {
569!
377
    pLastCol->cacheStatus = *(uint8_t *)(value + offset);
569✔
378
  }
379

380
  if (offset > size) {
569!
381
    taosMemoryFreeClear(pLastCol);
×
382

383
    TAOS_RETURN(TSDB_CODE_INVALID_DATA_FMT);
×
384
  }
385

386
  *ppLastCol = pLastCol;
569✔
387

388
  TAOS_RETURN(TSDB_CODE_SUCCESS);
569✔
389
}
390

391
/*
392
typedef struct {
393
  SLastColV0 lastColV0;
394
  char       colData[];
395
  int8_t     version;
396
  uint8_t    numOfPKs;
397
  SValue     pks[0];
398
  char       pk0Data[];
399
  SValue     pks[1];
400
  char       pk1Data[];
401
  ...
402
} SLastColDisk;
403
*/
404
static int32_t tsdbCacheSerializeV0(char const *value, SLastCol *pLastCol) {
82,804✔
405
  SLastColV0 *pLastColV0 = (SLastColV0 *)value;
82,804✔
406

407
  pLastColV0->ts = pLastCol->rowKey.ts;
82,804✔
408
  pLastColV0->dirty = pLastCol->dirty;
82,804✔
409
  pLastColV0->colVal.cid = pLastCol->colVal.cid;
82,804✔
410
  pLastColV0->colVal.flag = pLastCol->colVal.flag;
82,804✔
411
  pLastColV0->colVal.type = pLastCol->colVal.value.type;
82,804✔
412
  if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) {
82,804!
413
    pLastColV0->colVal.value.nData = pLastCol->colVal.value.nData;
11,403✔
414
    if (pLastCol->colVal.value.nData > 0) {
11,403✔
415
      memcpy(&pLastColV0[1], pLastCol->colVal.value.pData, pLastCol->colVal.value.nData);
5,677✔
416
    }
417
    return sizeof(SLastColV0) + pLastCol->colVal.value.nData;
11,403✔
418
  } else {
419
    pLastColV0->colVal.value.val = pLastCol->colVal.value.val;
71,401✔
420
    return sizeof(SLastColV0);
71,401✔
421
  }
422

423
  return 0;
424
}
425

426
static int32_t tsdbCacheSerialize(SLastCol *pLastCol, char **value, size_t *size) {
82,799✔
427
  *size = sizeof(SLastColV0);
82,799✔
428
  if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) {
82,799!
429
    *size += pLastCol->colVal.value.nData;
11,406✔
430
  }
431
  *size += sizeof(uint8_t) + sizeof(uint8_t) + sizeof(uint8_t);  // version + numOfPKs + cacheStatus
82,799✔
432

433
  for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) {
84,125✔
434
    *size += sizeof(SValue);
1,326✔
435
    if (IS_VAR_DATA_TYPE(pLastCol->rowKey.pks[i].type)) {
1,326!
436
      *size += pLastCol->rowKey.pks[i].nData;
336✔
437
    }
438
  }
439

440
  *value = taosMemoryMalloc(*size);
82,799✔
441
  if (NULL == *value) {
82,803!
442
    TAOS_RETURN(terrno);
×
443
  }
444

445
  int32_t offset = tsdbCacheSerializeV0(*value, pLastCol);
82,803✔
446

447
  // version
448
  ((uint8_t *)(*value + offset))[0] = LAST_COL_VERSION;
82,803✔
449
  offset++;
82,803✔
450

451
  // numOfPKs
452
  ((uint8_t *)(*value + offset))[0] = pLastCol->rowKey.numOfPKs;
82,803✔
453
  offset++;
82,803✔
454

455
  // pks
456
  for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) {
84,129✔
457
    ((SValue *)(*value + offset))[0] = pLastCol->rowKey.pks[i];
1,326✔
458
    offset += sizeof(SValue);
1,326✔
459
    if (IS_VAR_DATA_TYPE(pLastCol->rowKey.pks[i].type)) {
1,326!
460
      if (pLastCol->rowKey.pks[i].nData > 0) {
336!
461
        memcpy(*value + offset, pLastCol->rowKey.pks[i].pData, pLastCol->rowKey.pks[i].nData);
336✔
462
      }
463
      offset += pLastCol->rowKey.pks[i].nData;
336✔
464
    }
465
  }
466

467
  ((uint8_t *)(*value + offset))[0] = pLastCol->cacheStatus;
82,803✔
468

469
  TAOS_RETURN(TSDB_CODE_SUCCESS);
82,803✔
470
}
471

472
static int32_t tsdbCachePutToRocksdb(STsdb *pTsdb, SLastKey *pLastKey, SLastCol *pLastCol);
473

474
int tsdbCacheFlushDirty(const void *key, size_t klen, void *value, void *ud) {
527,431✔
475
  SLastCol *pLastCol = (SLastCol *)value;
527,431✔
476

477
  if (pLastCol->dirty) {
527,431✔
478
    STsdb *pTsdb = (STsdb *)ud;
59,408✔
479

480
    int32_t code = tsdbCachePutToRocksdb(pTsdb, (SLastKey *)key, pLastCol);
59,408✔
481
    if (code) {
59,404!
482
      tsdbError("tsdb/cache: vgId:%d, flush dirty lru failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code));
×
483
      return code;
×
484
    }
485

486
    pLastCol->dirty = 0;
59,404✔
487

488
    rocksMayWrite(pTsdb, false);
59,404✔
489
  }
490

491
  return 0;
527,480✔
492
}
493

494
int32_t tsdbCacheCommit(STsdb *pTsdb) {
67,958✔
495
  int32_t code = 0;
67,958✔
496
  char   *err = NULL;
67,958✔
497

498
  SLRUCache            *pCache = pTsdb->lruCache;
67,958✔
499
  rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
67,958✔
500

501
  (void)taosThreadMutexLock(&pTsdb->lruMutex);
67,958✔
502

503
  taosLRUCacheApply(pCache, tsdbCacheFlushDirty, pTsdb);
67,961✔
504

505
  rocksMayWrite(pTsdb, true);
67,961✔
506
  rocksdb_flush(pTsdb->rCache.db, pTsdb->rCache.flushoptions, &err);
67,961✔
507

508
  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
67,961✔
509

510
  if (NULL != err) {
67,961!
511
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err);
×
512
    rocksdb_free(err);
×
513
    code = TSDB_CODE_FAILED;
×
514
  }
515

516
  TAOS_RETURN(code);
67,961✔
517
}
518

519
static int32_t reallocVarDataVal(SValue *pValue) {
1,289,612✔
520
  if (IS_VAR_DATA_TYPE(pValue->type)) {
1,289,612!
521
    uint8_t *pVal = pValue->pData;
1,289,758✔
522
    uint32_t nData = pValue->nData;
1,289,758✔
523
    if (nData > 0) {
1,289,758✔
524
      uint8_t *p = taosMemoryMalloc(nData);
860,892✔
525
      if (!p) {
860,827✔
526
        TAOS_RETURN(terrno);
44✔
527
      }
528
      pValue->pData = p;
860,783✔
529
      (void)memcpy(pValue->pData, pVal, nData);
860,783✔
530
    } else {
531
      pValue->pData = NULL;
428,866✔
532
    }
533
  }
534

535
  TAOS_RETURN(TSDB_CODE_SUCCESS);
1,289,503✔
536
}
537

538
static int32_t reallocVarData(SColVal *pColVal) { return reallocVarDataVal(&pColVal->value); }
1,286,207✔
539

540
// realloc pk data and col data.
541
static int32_t tsdbCacheReallocSLastCol(SLastCol *pCol, size_t *pCharge) {
8,643,385✔
542
  int32_t code = TSDB_CODE_SUCCESS, lino = 0;
8,643,385✔
543
  size_t  charge = sizeof(SLastCol);
8,643,385✔
544

545
  int8_t i = 0;
8,643,385✔
546
  for (; i < pCol->rowKey.numOfPKs; i++) {
8,655,131✔
547
    SValue *pValue = &pCol->rowKey.pks[i];
11,745✔
548
    if (IS_VAR_DATA_TYPE(pValue->type)) {
11,745!
549
      TAOS_CHECK_EXIT(reallocVarDataVal(pValue));
3,442!
550
      charge += pValue->nData;
3,443✔
551
    }
552
  }
553

554
  if (IS_VAR_DATA_TYPE(pCol->colVal.value.type)) {
8,643,386!
555
    TAOS_CHECK_EXIT(reallocVarData(&pCol->colVal));
1,285,326!
556
    charge += pCol->colVal.value.nData;
1,285,866✔
557
  }
558

559
  if (pCharge) {
8,643,926✔
560
    *pCharge = charge;
8,585,929✔
561
  }
562

563
_exit:
57,997✔
564
  if (TSDB_CODE_SUCCESS != code) {
8,643,926!
565
    for (int8_t j = 0; j < i; j++) {
×
566
      if (IS_VAR_DATA_TYPE(pCol->rowKey.pks[j].type)) {
×
567
        taosMemoryFree(pCol->rowKey.pks[j].pData);
×
568
      }
569
    }
570

571
    (void)memset(pCol, 0, sizeof(SLastCol));
×
572
  }
573

574
  TAOS_RETURN(code);
8,643,926✔
575
}
576

577
void tsdbCacheFreeSLastColItem(void *pItem) {
71,926✔
578
  SLastCol *pCol = (SLastCol *)pItem;
71,926✔
579
  for (int i = 0; i < pCol->rowKey.numOfPKs; i++) {
84,357✔
580
    if (IS_VAR_DATA_TYPE(pCol->rowKey.pks[i].type)) {
12,430!
581
      taosMemoryFree(pCol->rowKey.pks[i].pData);
3,802✔
582
    }
583
  }
584

585
  if (IS_VAR_DATA_TYPE(pCol->colVal.value.type) && pCol->colVal.value.pData) {
71,927!
586
    taosMemoryFree(pCol->colVal.value.pData);
5,023✔
587
  }
588
}
71,927✔
589

590
static void tsdbCacheDeleter(const void *key, size_t klen, void *value, void *ud) {
8,580,925✔
591
  SLastCol *pLastCol = (SLastCol *)value;
8,580,925✔
592

593
  if (pLastCol->dirty) {
8,580,925✔
594
    if (tsdbCacheFlushDirty(key, klen, pLastCol, ud) != 0) {
6,870!
595
      STsdb *pTsdb = (STsdb *)ud;
×
596
      tsdbTrace("tsdb/cache: vgId:%d, flush cache %s failed at line %d.", TD_VID(pTsdb->pVnode), __func__, __LINE__);
×
597
    }
598
  }
599

600
  for (uint8_t i = 0; i < pLastCol->rowKey.numOfPKs; ++i) {
8,582,421✔
601
    SValue *pValue = &pLastCol->rowKey.pks[i];
1,342✔
602
    if (IS_VAR_DATA_TYPE(pValue->type)) {
1,342!
603
      taosMemoryFree(pValue->pData);
336✔
604
    }
605
  }
606

607
  if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type) /* && pLastCol->colVal.value.nData > 0*/) {
8,581,079!
608
    taosMemoryFree(pLastCol->colVal.value.pData);
1,277,526✔
609
  }
610

611
  taosMemoryFree(value);
8,582,525✔
612
}
8,588,647✔
613

614
static void tsdbCacheOverWriter(const void *key, size_t klen, void *value, void *ud) {
8,541,984✔
615
  SLastCol *pLastCol = (SLastCol *)value;
8,541,984✔
616
  pLastCol->dirty = 0;
8,541,984✔
617
}
8,541,984✔
618

619
static int32_t tsdbCachePutToLRU(STsdb *pTsdb, SLastKey *pLastKey, SLastCol *pLastCol, int8_t dirty);
620

621
static int32_t tsdbCacheNewTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, int8_t col_type, int8_t lflag) {
17,648✔
622
  int32_t code = 0, lino = 0;
17,648✔
623

624
  SLRUCache            *pCache = pTsdb->lruCache;
17,648✔
625
  rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
17,648✔
626
  SRowKey               emptyRowKey = {.ts = TSKEY_MIN, .numOfPKs = 0};
17,648✔
627
  SLastCol              emptyCol = {
17,648✔
628
                   .rowKey = emptyRowKey, .colVal = COL_VAL_NONE(cid, col_type), .dirty = 1, .cacheStatus = TSDB_LAST_CACHE_VALID};
629

630
  SLastKey *pLastKey = &(SLastKey){.lflag = lflag, .uid = uid, .cid = cid};
17,648✔
631
  code = tsdbCachePutToLRU(pTsdb, pLastKey, &emptyCol, 1);
17,648✔
632
  if (code) {
17,675!
633
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
×
634
  }
635

636
  TAOS_RETURN(code);
17,675✔
637
}
638

639
int32_t tsdbCacheCommitNoLock(STsdb *pTsdb) {
3,487✔
640
  int32_t code = 0;
3,487✔
641
  char   *err = NULL;
3,487✔
642

643
  SLRUCache            *pCache = pTsdb->lruCache;
3,487✔
644
  rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
3,487✔
645

646
  taosLRUCacheApply(pCache, tsdbCacheFlushDirty, pTsdb);
3,487✔
647

648
  rocksMayWrite(pTsdb, true);
3,487✔
649
  rocksdb_flush(pTsdb->rCache.db, pTsdb->rCache.flushoptions, &err);
3,487✔
650

651
  if (NULL != err) {
3,487!
652
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err);
×
653
    rocksdb_free(err);
×
654
    code = TSDB_CODE_FAILED;
×
655
  }
656

657
  TAOS_RETURN(code);
3,487✔
658
}
659

660
static int32_t tsdbCacheGetValuesFromRocks(STsdb *pTsdb, size_t numKeys, const char *const *ppKeysList,
178,612✔
661
                                           size_t *pKeysListSizes, char ***pppValuesList, size_t **ppValuesListSizes) {
662
  char **valuesList = taosMemoryCalloc(numKeys, sizeof(char *));
178,612✔
663
  if (!valuesList) return terrno;
178,643!
664
  size_t *valuesListSizes = taosMemoryCalloc(numKeys, sizeof(size_t));
178,643✔
665
  if (!valuesListSizes) {
178,586!
666
    taosMemoryFreeClear(valuesList);
×
667
    return terrno;
×
668
  }
669
  char **errs = taosMemoryCalloc(numKeys, sizeof(char *));
178,586✔
670
  if (!errs) {
178,653!
671
    taosMemoryFreeClear(valuesList);
×
672
    taosMemoryFreeClear(valuesListSizes);
×
673
    return terrno;
×
674
  }
675
  rocksdb_multi_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, numKeys, ppKeysList, pKeysListSizes, valuesList,
178,653✔
676
                    valuesListSizes, errs);
677
  for (size_t i = 0; i < numKeys; ++i) {
1,120,070✔
678
    rocksdb_free(errs[i]);
941,481✔
679
  }
680
  taosMemoryFreeClear(errs);
178,589!
681

682
  *pppValuesList = valuesList;
178,594✔
683
  *ppValuesListSizes = valuesListSizes;
178,594✔
684
  TAOS_RETURN(TSDB_CODE_SUCCESS);
178,594✔
685
}
686

687
static int32_t tsdbCacheDropTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, bool hasPrimaryKey) {
113,320✔
688
  int32_t code = 0;
113,320✔
689

690
  // build keys & multi get from rocks
691
  char **keys_list = taosMemoryCalloc(2, sizeof(char *));
113,320✔
692
  if (!keys_list) {
113,394!
693
    return terrno;
×
694
  }
695
  size_t *keys_list_sizes = taosMemoryCalloc(2, sizeof(size_t));
113,394✔
696
  if (!keys_list_sizes) {
113,359!
697
    taosMemoryFree(keys_list);
×
698
    return terrno;
×
699
  }
700
  const size_t klen = ROCKS_KEY_LEN;
113,359✔
701

702
  char *keys = taosMemoryCalloc(2, sizeof(SLastKey));
113,359✔
703
  if (!keys) {
113,394!
704
    taosMemoryFree(keys_list);
×
705
    taosMemoryFree(keys_list_sizes);
×
706
    return terrno;
×
707
  }
708
  ((SLastKey *)keys)[0] = (SLastKey){.lflag = LFLAG_LAST, .uid = uid, .cid = cid};
113,394✔
709
  ((SLastKey *)keys)[1] = (SLastKey){.lflag = LFLAG_LAST_ROW, .uid = uid, .cid = cid};
113,394✔
710

711
  keys_list[0] = keys;
113,394✔
712
  keys_list[1] = keys + sizeof(SLastKey);
113,394✔
713
  keys_list_sizes[0] = klen;
113,394✔
714
  keys_list_sizes[1] = klen;
113,394✔
715

716
  char  **values_list = NULL;
113,394✔
717
  size_t *values_list_sizes = NULL;
113,394✔
718

719
  // was written by caller
720
  // rocksMayWrite(pTsdb, true); // flush writebatch cache
721

722
  TAOS_CHECK_GOTO(tsdbCacheGetValuesFromRocks(pTsdb, 2, (const char *const *)keys_list, keys_list_sizes, &values_list,
113,394!
723
                                              &values_list_sizes),
724
                  NULL, _exit);
725

726
  rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
113,391✔
727
  {
728
    SLastCol *pLastCol = NULL;
113,391✔
729
    if (values_list[0] != NULL) {
113,391✔
730
      code = tsdbCacheDeserialize(values_list[0], values_list_sizes[0], &pLastCol);
233✔
731
      if (code != TSDB_CODE_SUCCESS) {
233!
732
        tsdbError("vgId:%d, %s deserialize failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
733
                  tstrerror(code));
734
        goto _exit;
×
735
      }
736
      if (NULL != pLastCol) {
233!
737
        rocksdb_writebatch_delete(wb, keys_list[0], klen);
233✔
738
      }
739
      taosMemoryFreeClear(pLastCol);
233!
740
    }
741

742
    pLastCol = NULL;
113,391✔
743
    if (values_list[1] != NULL) {
113,391✔
744
      code = tsdbCacheDeserialize(values_list[1], values_list_sizes[1], &pLastCol);
233✔
745
      if (code != TSDB_CODE_SUCCESS) {
233!
746
        tsdbError("vgId:%d, %s deserialize failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
747
                  tstrerror(code));
748
        goto _exit;
×
749
      }
750
      if (NULL != pLastCol) {
233!
751
        rocksdb_writebatch_delete(wb, keys_list[1], klen);
233✔
752
      }
753
      taosMemoryFreeClear(pLastCol);
233!
754
    }
755

756
    rocksdb_free(values_list[0]);
113,391✔
757
    rocksdb_free(values_list[1]);
113,426✔
758

759
    for (int i = 0; i < 2; i++) {
340,117✔
760
      LRUHandle *h = taosLRUCacheLookup(pTsdb->lruCache, keys_list[i], klen);
226,726✔
761
      if (h) {
226,726✔
762
        tsdbLRUCacheRelease(pTsdb->lruCache, h, true);
466✔
763
        taosLRUCacheErase(pTsdb->lruCache, keys_list[i], klen);
466✔
764
      }
765
    }
766
  }
767

768
_exit:
113,391✔
769
  taosMemoryFree(keys_list[0]);
113,391✔
770

771
  taosMemoryFree(keys_list);
113,415✔
772
  taosMemoryFree(keys_list_sizes);
113,392✔
773
  taosMemoryFree(values_list);
113,386✔
774
  taosMemoryFree(values_list_sizes);
113,415✔
775

776
  TAOS_RETURN(code);
113,384✔
777
}
778

779
int32_t tsdbCacheNewTable(STsdb *pTsdb, tb_uid_t uid, tb_uid_t suid, SSchemaWrapper *pSchemaRow) {
1,004✔
780
  int32_t code = 0;
1,004✔
781

782
  (void)taosThreadMutexLock(&pTsdb->lruMutex);
1,004✔
783

784
  if (suid < 0) {
1,006✔
785
    for (int i = 0; i < pSchemaRow->nCols; ++i) {
419✔
786
      int16_t cid = pSchemaRow->pSchema[i].colId;
377✔
787
      int8_t  col_type = pSchemaRow->pSchema[i].type;
377✔
788

789
      code = tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, LFLAG_LAST_ROW);
377✔
790
      if (code != TSDB_CODE_SUCCESS) {
377!
791
        tsdbTrace("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
792
                  tstrerror(code));
793
      }
794
      code = tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, LFLAG_LAST);
377✔
795
      if (code != TSDB_CODE_SUCCESS) {
377!
796
        tsdbTrace("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
797
                  tstrerror(code));
798
      }
799
    }
800
  } else {
801
    STSchema *pTSchema = NULL;
964✔
802
    code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, -1, &pTSchema);
964✔
803
    if (code != TSDB_CODE_SUCCESS) {
964!
804
      (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
×
805

806
      TAOS_RETURN(code);
×
807
    }
808

809
    for (int i = 0; i < pTSchema->numOfCols; ++i) {
9,265✔
810
      int16_t cid = pTSchema->columns[i].colId;
8,299✔
811
      int8_t  col_type = pTSchema->columns[i].type;
8,299✔
812

813
      code = tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, LFLAG_LAST_ROW);
8,299✔
814
      if (code != TSDB_CODE_SUCCESS) {
8,301!
815
        tsdbTrace("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
816
                  tstrerror(code));
817
      }
818
      code = tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, LFLAG_LAST);
8,301✔
819
      if (code != TSDB_CODE_SUCCESS) {
8,301!
820
        tsdbTrace("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
821
                  tstrerror(code));
822
      }
823
    }
824

825
    taosMemoryFree(pTSchema);
966✔
826
  }
827

828
  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
1,006✔
829

830
  TAOS_RETURN(code);
1,006✔
831
}
832

833
int32_t tsdbCacheDropTable(STsdb *pTsdb, tb_uid_t uid, tb_uid_t suid, SSchemaWrapper *pSchemaRow) {
75✔
834
  int32_t code = 0;
75✔
835

836
  (void)taosThreadMutexLock(&pTsdb->lruMutex);
75✔
837

838
  code = tsdbCacheCommitNoLock(pTsdb);
75✔
839
  if (code != TSDB_CODE_SUCCESS) {
75!
840
    tsdbTrace("vgId:%d, %s commit with no lock failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
841
              tstrerror(code));
842
  }
843

844
  if (pSchemaRow != NULL) {
75!
845
    bool hasPrimayKey = false;
×
846
    int  nCols = pSchemaRow->nCols;
×
847
    if (nCols >= 2) {
×
848
      hasPrimayKey = (pSchemaRow->pSchema[1].flags & COL_IS_KEY) ? true : false;
×
849
    }
850
    for (int i = 0; i < nCols; ++i) {
×
851
      int16_t cid = pSchemaRow->pSchema[i].colId;
×
852
      int8_t  col_type = pSchemaRow->pSchema[i].type;
×
853

854
      code = tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey);
×
855
      if (code != TSDB_CODE_SUCCESS) {
×
856
        tsdbTrace("vgId:%d, %s drop table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
857
                  tstrerror(code));
858
      }
859
    }
860
  } else {
861
    STSchema *pTSchema = NULL;
75✔
862
    code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, -1, &pTSchema);
75✔
863
    if (code != TSDB_CODE_SUCCESS) {
75!
864
      (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
×
865

866
      TAOS_RETURN(code);
×
867
    }
868

869
    bool hasPrimayKey = false;
75✔
870
    int  nCols = pTSchema->numOfCols;
75✔
871
    if (nCols >= 2) {
75!
872
      hasPrimayKey = (pTSchema->columns[1].flags & COL_IS_KEY) ? true : false;
75✔
873
    }
874
    for (int i = 0; i < nCols; ++i) {
225✔
875
      int16_t cid = pTSchema->columns[i].colId;
150✔
876
      int8_t  col_type = pTSchema->columns[i].type;
150✔
877

878
      code = tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey);
150✔
879
      if (code != TSDB_CODE_SUCCESS) {
150!
880
        tsdbTrace("vgId:%d, %s drop table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
881
                  tstrerror(code));
882
      }
883
    }
884

885
    taosMemoryFree(pTSchema);
75✔
886
  }
887

888
  rocksMayWrite(pTsdb, false);
75✔
889

890
  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
75✔
891

892
  TAOS_RETURN(code);
75✔
893
}
894

895
int32_t tsdbCacheDropSubTables(STsdb *pTsdb, SArray *uids, tb_uid_t suid) {
3,366✔
896
  int32_t code = 0;
3,366✔
897

898
  (void)taosThreadMutexLock(&pTsdb->lruMutex);
3,366✔
899

900
  code = tsdbCacheCommitNoLock(pTsdb);
3,377✔
901
  if (code != TSDB_CODE_SUCCESS) {
3,376!
902
    tsdbTrace("vgId:%d, %s commit with no lock failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
903
              tstrerror(code));
904
  }
905

906
  STSchema *pTSchema = NULL;
3,376✔
907
  code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, suid, -1, &pTSchema);
3,376✔
908
  if (code != TSDB_CODE_SUCCESS) {
3,373!
UNCOV
909
    (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
×
910

911
    TAOS_RETURN(code);
×
912
  }
913

914
  for (int i = 0; i < TARRAY_SIZE(uids); ++i) {
7,534✔
915
    int64_t uid = ((tb_uid_t *)TARRAY_DATA(uids))[i];
4,168✔
916

917
    bool hasPrimayKey = false;
4,168✔
918
    int  nCols = pTSchema->numOfCols;
4,168✔
919
    if (nCols >= 2) {
4,168!
920
      hasPrimayKey = (pTSchema->columns[1].flags & COL_IS_KEY) ? true : false;
4,170✔
921
    }
922

923
    for (int i = 0; i < nCols; ++i) {
117,253✔
924
      int16_t cid = pTSchema->columns[i].colId;
113,093✔
925
      int8_t  col_type = pTSchema->columns[i].type;
113,093✔
926

927
      code = tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey);
113,093✔
928
      if (code != TSDB_CODE_SUCCESS) {
113,075!
929
        tsdbTrace("vgId:%d, %s drop table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
930
                  tstrerror(code));
931
      }
932
    }
933
  }
934

935
  taosMemoryFree(pTSchema);
3,366✔
936

937
  rocksMayWrite(pTsdb, false);
3,377✔
938

939
  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
3,377✔
940

941
  TAOS_RETURN(code);
3,377✔
942
}
943

944
int32_t tsdbCacheNewNTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, int8_t col_type) {
3✔
945
  int32_t code = 0;
3✔
946

947
  (void)taosThreadMutexLock(&pTsdb->lruMutex);
3✔
948

949
  code = tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, 0);
3✔
950
  if (code != TSDB_CODE_SUCCESS) {
3!
951
    tsdbTrace("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
952
              tstrerror(code));
953
  }
954
  code = tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, 1);
3✔
955
  if (code != TSDB_CODE_SUCCESS) {
3!
956
    tsdbTrace("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
957
              tstrerror(code));
958
  }
959
  // rocksMayWrite(pTsdb, true, false, false);
960
  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
3✔
961

962
  TAOS_RETURN(code);
3✔
963
}
964

965
int32_t tsdbCacheDropNTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, bool hasPrimayKey) {
3✔
966
  int32_t code = 0;
3✔
967

968
  (void)taosThreadMutexLock(&pTsdb->lruMutex);
3✔
969

970
  code = tsdbCacheCommitNoLock(pTsdb);
3✔
971
  if (code != TSDB_CODE_SUCCESS) {
3!
972
    tsdbTrace("vgId:%d, %s commit with no lock failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
973
              tstrerror(code));
974
  }
975

976
  code = tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey);
3✔
977
  if (code != TSDB_CODE_SUCCESS) {
3!
978
    tsdbTrace("vgId:%d, %s drop table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
979
              tstrerror(code));
980
  }
981

982
  rocksMayWrite(pTsdb, false);
3✔
983

984
  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
3✔
985

986
  TAOS_RETURN(code);
3✔
987
}
988

989
int32_t tsdbCacheNewSTableColumn(STsdb *pTsdb, SArray *uids, int16_t cid, int8_t col_type) {
63✔
990
  int32_t code = 0;
63✔
991

992
  (void)taosThreadMutexLock(&pTsdb->lruMutex);
63✔
993

994
  for (int i = 0; i < TARRAY_SIZE(uids); ++i) {
224✔
995
    tb_uid_t uid = ((tb_uid_t *)TARRAY_DATA(uids))[i];
160✔
996

997
    code = tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, 0);
160✔
998
    if (code != TSDB_CODE_SUCCESS) {
160!
999
      tsdbTrace("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1000
                tstrerror(code));
1001
    }
1002
    code = tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, 1);
160✔
1003
    if (code != TSDB_CODE_SUCCESS) {
160!
1004
      tsdbTrace("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1005
                tstrerror(code));
1006
    }
1007
  }
1008

1009
  // rocksMayWrite(pTsdb, true, false, false);
1010
  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
64✔
1011
  TAOS_RETURN(code);
64✔
1012
}
1013

1014
int32_t tsdbCacheDropSTableColumn(STsdb *pTsdb, SArray *uids, int16_t cid, bool hasPrimayKey) {
32✔
1015
  int32_t code = 0;
32✔
1016

1017
  (void)taosThreadMutexLock(&pTsdb->lruMutex);
32✔
1018

1019
  code = tsdbCacheCommitNoLock(pTsdb);
32✔
1020
  if (code != TSDB_CODE_SUCCESS) {
32!
1021
    tsdbTrace("vgId:%d, %s commit with no lock failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1022
              tstrerror(code));
1023
  }
1024

1025
  for (int i = 0; i < TARRAY_SIZE(uids); ++i) {
112✔
1026
    int64_t uid = ((tb_uid_t *)TARRAY_DATA(uids))[i];
80✔
1027

1028
    code = tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey);
80✔
1029
    if (code != TSDB_CODE_SUCCESS) {
80!
1030
      tsdbTrace("vgId:%d, %s drop table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1031
                tstrerror(code));
1032
    }
1033
  }
1034

1035
  rocksMayWrite(pTsdb, false);
32✔
1036

1037
  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
32✔
1038

1039
  TAOS_RETURN(code);
32✔
1040
}
1041

1042
typedef struct {
1043
  int      idx;
1044
  SLastKey key;
1045
} SIdxKey;
1046

1047
static void tsdbCacheUpdateLastColToNone(SLastCol *pLastCol, ELastCacheStatus cacheStatus) {
238✔
1048
  // update rowkey
1049
  pLastCol->rowKey.ts = TSKEY_MIN;
238✔
1050
  for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) {
238!
1051
    SValue *pPKValue = &pLastCol->rowKey.pks[i];
×
1052
    if (IS_VAR_DATA_TYPE(pPKValue->type) && pPKValue->nData > 0) {
×
1053
      taosMemoryFreeClear(pPKValue->pData);
×
1054
      pPKValue->nData = 0;
×
1055
    } else {
1056
      pPKValue->val = 0;
×
1057
    }
1058
  }
1059
  pLastCol->rowKey.numOfPKs = 0;
238✔
1060

1061
  // update colval
1062
  if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type) && pLastCol->colVal.value.nData > 0) {
238!
1063
    taosMemoryFreeClear(pLastCol->colVal.value.pData);
×
1064
    pLastCol->colVal.value.nData = 0;
×
1065
  } else {
1066
    pLastCol->colVal.value.val = 0;
238✔
1067
  }
1068

1069
  pLastCol->colVal = COL_VAL_NONE(pLastCol->colVal.cid, pLastCol->colVal.value.type);
238✔
1070
  pLastCol->dirty = 1;
238✔
1071
  pLastCol->cacheStatus = cacheStatus;
238✔
1072
}
238✔
1073

1074
static int32_t tsdbCachePutToRocksdb(STsdb *pTsdb, SLastKey *pLastKey, SLastCol *pLastCol) {
82,801✔
1075
  int32_t code = 0;
82,801✔
1076
  char   *rocks_value = NULL;
82,801✔
1077
  size_t  vlen = 0;
82,801✔
1078

1079
  code = tsdbCacheSerialize(pLastCol, &rocks_value, &vlen);
82,801✔
1080
  if (code) {
82,800!
1081
    tsdbError("tsdb/cache/putrocks: vgId:%d, serialize failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code));
×
1082
    TAOS_RETURN(code);
×
1083
  }
1084

1085
  rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
82,800✔
1086
  (void)taosThreadMutexLock(&pTsdb->rCache.writeBatchMutex);
82,800✔
1087
  rocksdb_writebatch_put(wb, (char *)pLastKey, ROCKS_KEY_LEN, rocks_value, vlen);
82,806✔
1088
  (void)taosThreadMutexUnlock(&pTsdb->rCache.writeBatchMutex);
82,812✔
1089

1090
  taosMemoryFree(rocks_value);
82,806✔
1091

1092
  TAOS_RETURN(code);
82,796✔
1093
}
1094

1095
static int32_t tsdbCachePutToLRU(STsdb *pTsdb, SLastKey *pLastKey, SLastCol *pLastCol, int8_t dirty) {
8,583,628✔
1096
  int32_t code = 0, lino = 0;
8,583,628✔
1097

1098
  SLastCol *pLRULastCol = taosMemoryCalloc(1, sizeof(SLastCol));
8,583,628✔
1099
  if (!pLRULastCol) {
8,587,930!
1100
    return terrno;
×
1101
  }
1102

1103
  size_t charge = 0;
8,587,930✔
1104
  *pLRULastCol = *pLastCol;
8,587,930✔
1105
  pLRULastCol->dirty = dirty;
8,587,930✔
1106
  TAOS_CHECK_EXIT(tsdbCacheReallocSLastCol(pLRULastCol, &charge));
8,587,930!
1107

1108
  LRUStatus status = taosLRUCacheInsert(pTsdb->lruCache, pLastKey, ROCKS_KEY_LEN, pLRULastCol, charge, tsdbCacheDeleter,
8,578,448✔
1109
                                        tsdbCacheOverWriter, NULL, TAOS_LRU_PRIORITY_LOW, pTsdb);
1110
  if (TAOS_LRU_STATUS_OK != status && TAOS_LRU_STATUS_OK_OVERWRITTEN != status) {
8,576,345!
1111
    tsdbError("vgId:%d, %s failed at line %d status %d.", TD_VID(pTsdb->pVnode), __func__, __LINE__, status);
×
1112
    code = TSDB_CODE_FAILED;
×
1113
    pLRULastCol = NULL;
×
1114
  }
1115

1116
_exit:
8,576,345✔
1117
  if (TSDB_CODE_SUCCESS != code) {
8,576,345!
1118
    taosMemoryFree(pLRULastCol);
×
1119
    tsdbError("tsdb/cache/putlru: vgId:%d, failed at line %d since %s.", TD_VID(pTsdb->pVnode), lino, tstrerror(code));
×
1120
  }
1121

1122
  TAOS_RETURN(code);
8,576,345✔
1123
}
1124

1125
static int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SArray *updCtxArray) {
230,648✔
1126
  if (!updCtxArray || TARRAY_SIZE(updCtxArray) == 0) {
230,648!
1127
    TAOS_RETURN(TSDB_CODE_SUCCESS);
×
1128
  }
1129

1130
  int32_t code = 0, lino = 0;
230,653✔
1131

1132
  int        num_keys = TARRAY_SIZE(updCtxArray);
230,653✔
1133
  SArray    *remainCols = NULL;
230,653✔
1134
  SLRUCache *pCache = pTsdb->lruCache;
230,653✔
1135

1136
  (void)taosThreadMutexLock(&pTsdb->lruMutex);
230,653✔
1137
  for (int i = 0; i < num_keys; ++i) {
8,791,622✔
1138
    SLastUpdateCtx *updCtx = &((SLastUpdateCtx *)TARRAY_DATA(updCtxArray))[i];
8,560,265✔
1139
    int8_t          lflag = updCtx->lflag;
8,560,265✔
1140
    SRowKey        *pRowKey = &updCtx->tsdbRowKey.key;
8,560,265✔
1141
    SColVal        *pColVal = &updCtx->colVal;
8,560,265✔
1142

1143
    if (lflag == LFLAG_LAST && !COL_VAL_IS_VALUE(pColVal)) {
8,560,265!
1144
      continue;
×
1145
    }
1146

1147
    SLastKey  *key = &(SLastKey){.lflag = lflag, .uid = uid, .cid = pColVal->cid};
8,560,265✔
1148
    LRUHandle *h = taosLRUCacheLookup(pCache, key, ROCKS_KEY_LEN);
8,560,265✔
1149
    if (h) {
8,556,930✔
1150
      SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h);
8,535,476✔
1151
      if (pLastCol->cacheStatus != TSDB_LAST_CACHE_NO_CACHE) {
8,530,168!
1152
        int32_t cmp_res = tRowKeyCompare(&pLastCol->rowKey, pRowKey);
8,530,604✔
1153
        if (cmp_res < 0 || (cmp_res == 0 && !COL_VAL_IS_NONE(pColVal))) {
8,526,373!
1154
          SLastCol newLastCol = {
8,508,090✔
1155
              .rowKey = *pRowKey, .colVal = *pColVal, .dirty = 1, .cacheStatus = TSDB_LAST_CACHE_VALID};
1156
          code = tsdbCachePutToLRU(pTsdb, key, &newLastCol, 1);
8,508,090✔
1157
        }
1158
      }
1159

1160
      tsdbLRUCacheRelease(pCache, h, false);
8,529,977✔
1161
      TAOS_CHECK_EXIT(code);
8,539,290!
1162
    } else {
1163
      if (!remainCols) {
21,454✔
1164
        remainCols = taosArrayInit(num_keys * 2, sizeof(SIdxKey));
545✔
1165
        if (!remainCols) {
545!
1166
          TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
1167
        }
1168
      }
1169
      if (!taosArrayPush(remainCols, &(SIdxKey){i, *key})) {
43,127!
1170
        TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
1171
      }
1172
    }
1173
  }
1174

1175
  if (remainCols) {
231,357✔
1176
    num_keys = TARRAY_SIZE(remainCols);
545✔
1177
  }
1178
  if (remainCols && num_keys > 0) {
231,357!
1179
    char  **keys_list = NULL;
545✔
1180
    size_t *keys_list_sizes = NULL;
545✔
1181
    char  **values_list = NULL;
545✔
1182
    size_t *values_list_sizes = NULL;
545✔
1183
    char  **errs = NULL;
545✔
1184
    keys_list = taosMemoryCalloc(num_keys, sizeof(char *));
545✔
1185
    if (!keys_list) {
545!
1186
      (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
×
1187
      return terrno;
×
1188
    }
1189
    keys_list_sizes = taosMemoryCalloc(num_keys, sizeof(size_t));
545✔
1190
    if (!keys_list_sizes) {
545!
1191
      taosMemoryFree(keys_list);
×
1192
      (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
×
1193
      return terrno;
×
1194
    }
1195
    for (int i = 0; i < num_keys; ++i) {
22,218✔
1196
      SIdxKey *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[i];
21,673✔
1197

1198
      keys_list[i] = (char *)&idxKey->key;
21,673✔
1199
      keys_list_sizes[i] = ROCKS_KEY_LEN;
21,673✔
1200
    }
1201

1202
    rocksMayWrite(pTsdb, true);  // flush writebatch cache
545✔
1203

1204
    code = tsdbCacheGetValuesFromRocks(pTsdb, num_keys, (const char *const *)keys_list, keys_list_sizes, &values_list,
545✔
1205
                                       &values_list_sizes);
1206
    if (code) {
545!
1207
      taosMemoryFree(keys_list);
×
1208
      taosMemoryFree(keys_list_sizes);
×
1209
      goto _exit;
×
1210
    }
1211

1212
    rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
545✔
1213
    for (int i = 0; i < num_keys; ++i) {
22,211✔
1214
      SIdxKey        *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[i];
21,666✔
1215
      SLastUpdateCtx *updCtx = (SLastUpdateCtx *)taosArrayGet(updCtxArray, idxKey->idx);
21,666✔
1216
      SRowKey        *pRowKey = &updCtx->tsdbRowKey.key;
21,666✔
1217
      SColVal        *pColVal = &updCtx->colVal;
21,666✔
1218

1219
      SLastCol *pLastCol = NULL;
21,666✔
1220
      if (values_list[i] != NULL) {
21,666✔
1221
        code = tsdbCacheDeserialize(values_list[i], values_list_sizes[i], &pLastCol);
33✔
1222
        if (code != TSDB_CODE_SUCCESS) {
33!
1223
          tsdbError("vgId:%d, %s deserialize failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1224
                    tstrerror(code));
1225
          goto _exit;
×
1226
        }
1227
      }
1228
      /*
1229
      if (code) {
1230
        tsdbError("tsdb/cache: vgId:%d, deserialize failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code));
1231
      }
1232
      */
1233
      SLastCol *pToFree = pLastCol;
21,666✔
1234

1235
      if (pLastCol && pLastCol->cacheStatus == TSDB_LAST_CACHE_NO_CACHE) {
21,666!
1236
        if ((code = tsdbCachePutToLRU(pTsdb, &idxKey->key, pLastCol, 0)) != TSDB_CODE_SUCCESS) {
×
1237
          tsdbError("tsdb/cache: vgId:%d, put lru failed at line %d since %s.", TD_VID(pTsdb->pVnode), lino,
×
1238
                    tstrerror(code));
1239
          taosMemoryFreeClear(pToFree);
×
1240
          break;
×
1241
        }
1242

1243
        // cache invalid => skip update
1244
        taosMemoryFreeClear(pToFree);
×
1245
        continue;
×
1246
      }
1247

1248
      if (IS_LAST_KEY(idxKey->key) && !COL_VAL_IS_VALUE(pColVal)) {
21,666!
1249
        taosMemoryFreeClear(pToFree);
×
1250
        continue;
×
1251
      }
1252

1253
      int32_t cmp_res = 1;
21,666✔
1254
      if (pLastCol) {
21,666✔
1255
        cmp_res = tRowKeyCompare(&pLastCol->rowKey, pRowKey);
33✔
1256
      }
1257

1258
      if (NULL == pLastCol || cmp_res < 0 || (cmp_res == 0 && !COL_VAL_IS_NONE(pColVal))) {
21,666!
1259
        SLastCol lastColTmp = {
21,643✔
1260
            .rowKey = *pRowKey, .colVal = *pColVal, .dirty = 0, .cacheStatus = TSDB_LAST_CACHE_VALID};
1261
        if ((code = tsdbCachePutToRocksdb(pTsdb, &idxKey->key, &lastColTmp)) != TSDB_CODE_SUCCESS) {
21,643!
1262
          tsdbError("tsdb/cache: vgId:%d, put rocks failed at line %d since %s.", TD_VID(pTsdb->pVnode), lino,
×
1263
                    tstrerror(code));
1264
          taosMemoryFreeClear(pToFree);
×
1265
          break;
×
1266
        }
1267
        if ((code = tsdbCachePutToLRU(pTsdb, &idxKey->key, &lastColTmp, 0)) != TSDB_CODE_SUCCESS) {
21,648!
1268
          tsdbError("tsdb/cache: vgId:%d, put lru failed at line %d since %s.", TD_VID(pTsdb->pVnode), lino,
×
1269
                    tstrerror(code));
1270
          taosMemoryFreeClear(pToFree);
×
1271
          break;
×
1272
        }
1273
      }
1274

1275
      taosMemoryFreeClear(pToFree);
21,666✔
1276
    }
1277

1278
    rocksMayWrite(pTsdb, false);
545✔
1279

1280
    taosMemoryFree(keys_list);
545✔
1281
    taosMemoryFree(keys_list_sizes);
545✔
1282
    if (values_list) {
545!
1283
      for (int i = 0; i < num_keys; ++i) {
22,218✔
1284
        rocksdb_free(values_list[i]);
21,673✔
1285
      }
1286
      taosMemoryFree(values_list);
545✔
1287
    }
1288
    taosMemoryFree(values_list_sizes);
545✔
1289
  }
1290

1291
_exit:
230,812✔
1292
  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
231,357✔
1293
  taosArrayDestroy(remainCols);
230,659✔
1294

1295
  if (code) {
230,657!
1296
    tsdbError("tsdb/cache: vgId:%d, update failed at line %d since %s.", TD_VID(pTsdb->pVnode), __LINE__,
×
1297
              tstrerror(code));
1298
  }
1299

1300
  TAOS_RETURN(code);
230,657✔
1301
}
1302

1303
void tsdbCacheInvalidateSchema(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int32_t sver) {
133✔
1304
  SRocksCache *pRCache = &pTsdb->rCache;
133✔
1305
  if (!pRCache->pTSchema || sver <= pTsdb->rCache.sver) return;
133!
1306

1307
  if (suid > 0 && suid == pRCache->suid) {
134✔
1308
    pRCache->sver = -1;
16✔
1309
    pRCache->suid = -1;
16✔
1310
  }
1311
  if (suid == 0 && uid == pRCache->uid) {
134✔
1312
    pRCache->sver = -1;
3✔
1313
    pRCache->uid = -1;
3✔
1314
  }
1315
}
1316

1317
static int32_t tsdbUpdateSkm(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int32_t sver) {
230,653✔
1318
  SRocksCache *pRCache = &pTsdb->rCache;
230,653✔
1319
  if (pRCache->pTSchema && sver == pRCache->sver) {
230,653!
1320
    if (suid > 0 && suid == pRCache->suid) {
230,242✔
1321
      return 0;
44,912✔
1322
    }
1323
    if (suid == 0 && uid == pRCache->uid) {
185,330✔
1324
      return 0;
29,634✔
1325
    }
1326
  }
1327

1328
  pRCache->suid = suid;
156,107✔
1329
  pRCache->uid = uid;
156,107✔
1330
  pRCache->sver = sver;
156,107✔
1331
  tDestroyTSchema(pRCache->pTSchema);
156,107✔
1332
  return metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, -1, &pRCache->pTSchema);
156,107✔
1333
}
1334

1335
int32_t tsdbCacheRowFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int64_t version, int32_t nRow,
230,641✔
1336
                                 SRow **aRow) {
1337
  int32_t code = 0, lino = 0;
230,641✔
1338

1339
  // 1. prepare last
1340
  TSDBROW    lRow = {.type = TSDBROW_ROW_FMT, .pTSRow = aRow[nRow - 1], .version = version};
230,641✔
1341
  STSchema  *pTSchema = NULL;
230,641✔
1342
  int32_t    sver = TSDBROW_SVERSION(&lRow);
230,641!
1343
  SArray    *ctxArray = NULL;
230,641✔
1344
  SSHashObj *iColHash = NULL;
230,641✔
1345

1346
  TAOS_CHECK_GOTO(tsdbUpdateSkm(pTsdb, suid, uid, sver), &lino, _exit);
230,641!
1347
  pTSchema = pTsdb->rCache.pTSchema;
230,657✔
1348

1349
  TSDBROW tRow = {.type = TSDBROW_ROW_FMT, .version = version};
230,657✔
1350
  int32_t nCol = pTSchema->numOfCols;
230,657✔
1351

1352
  ctxArray = taosArrayInit(nCol * 2, sizeof(SLastUpdateCtx));
230,657✔
1353
  if (ctxArray == NULL) {
230,650!
1354
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit);
×
1355
  }
1356

1357
  // 1. prepare by lrow
1358
  STsdbRowKey tsdbRowKey = {0};
230,650✔
1359
  tsdbRowGetKey(&lRow, &tsdbRowKey);
230,650✔
1360

1361
  STSDBRowIter iter = {0};
230,652✔
1362
  TAOS_CHECK_GOTO(tsdbRowIterOpen(&iter, &lRow, pTSchema), &lino, _exit);
230,652!
1363

1364
  int32_t iCol = 0;
230,660✔
1365
  for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal && iCol < nCol; pColVal = tsdbRowIterNext(&iter), iCol++) {
5,569,539!
1366
    SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST_ROW, .tsdbRowKey = tsdbRowKey, .colVal = *pColVal};
5,337,789✔
1367
    if (!taosArrayPush(ctxArray, &updateCtx)) {
5,338,567!
1368
      tsdbRowClose(&iter);
×
1369
      TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
1370
    }
1371

1372
    if (COL_VAL_IS_VALUE(pColVal)) {
5,339,253✔
1373
      updateCtx.lflag = LFLAG_LAST;
3,235,368✔
1374
      if (!taosArrayPush(ctxArray, &updateCtx)) {
3,235,127!
1375
        tsdbRowClose(&iter);
×
1376
        TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
1377
      }
1378
    } else {
1379
      if (!iColHash) {
2,103,885✔
1380
        iColHash = tSimpleHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
210,528✔
1381
        if (iColHash == NULL) {
210,537!
1382
          tsdbRowClose(&iter);
×
1383
          TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit);
×
1384
        }
1385
      }
1386

1387
      if (tSimpleHashPut(iColHash, &iCol, sizeof(iCol), NULL, 0)) {
2,103,894!
1388
        tsdbRowClose(&iter);
×
1389
        TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit);
×
1390
      }
1391
    }
1392
  }
1393
  tsdbRowClose(&iter);
230,634✔
1394

1395
  // 2. prepare by the other rows
1396
  for (int32_t iRow = nRow - 2; iRow >= 0; --iRow) {
430,578✔
1397
    if (tSimpleHashGetSize(iColHash) == 0) {
200,530✔
1398
      break;
610✔
1399
    }
1400

1401
    tRow.pTSRow = aRow[iRow];
199,920✔
1402

1403
    STsdbRowKey tsdbRowKey = {0};
199,920✔
1404
    tsdbRowGetKey(&tRow, &tsdbRowKey);
199,920✔
1405

1406
    void   *pIte = NULL;
199,920✔
1407
    int32_t iter = 0;
199,920✔
1408
    while ((pIte = tSimpleHashIterate(iColHash, pIte, &iter)) != NULL) {
599,760✔
1409
      int32_t iCol = ((int32_t *)pIte)[0];
399,840✔
1410
      SColVal colVal = COL_VAL_NONE(0, 0);
399,840✔
1411
      tsdbRowGetColVal(&tRow, pTSchema, iCol, &colVal);
399,840✔
1412

1413
      if (COL_VAL_IS_VALUE(&colVal)) {
399,840!
1414
        SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST, .tsdbRowKey = tsdbRowKey, .colVal = colVal};
×
1415
        if (!taosArrayPush(ctxArray, &updateCtx)) {
×
1416
          TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
1417
        }
1418
        code = tSimpleHashIterateRemove(iColHash, &iCol, sizeof(iCol), &pIte, &iter);
×
1419
        if (code != TSDB_CODE_SUCCESS) {
×
1420
          tsdbTrace("vgId:%d, %s tSimpleHashIterateRemove failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__,
×
1421
                    __LINE__, tstrerror(code));
1422
        }
1423
      }
1424
    }
1425
  }
1426

1427
  // 3. do update
1428
  code = tsdbCacheUpdate(pTsdb, suid, uid, ctxArray);
230,658✔
1429
  if (code < TSDB_CODE_SUCCESS) {
230,653!
1430
    tsdbTrace("vgId:%d, %s tsdbCacheUpdate failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1431
              tstrerror(code));
1432
  }
1433

1434
_exit:
230,653✔
1435
  if (code) {
230,654!
1436
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
×
1437
  }
1438

1439
  taosArrayDestroy(ctxArray);
230,654✔
1440
  tSimpleHashCleanup(iColHash);
230,656✔
1441

1442
  TAOS_RETURN(code);
230,657✔
1443
}
1444

1445
int32_t tsdbCacheColFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SBlockData *pBlockData) {
×
1446
  int32_t code = 0, lino = 0;
×
1447

1448
  TSDBROW lRow = tsdbRowFromBlockData(pBlockData, pBlockData->nRow - 1);
×
1449

1450
  STSchema *pTSchema = NULL;
×
1451
  int32_t   sver = TSDBROW_SVERSION(&lRow);
×
1452
  SArray   *ctxArray = NULL;
×
1453

1454
  TAOS_CHECK_RETURN(metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pTSchema));
×
1455

1456
  ctxArray = taosArrayInit(pBlockData->nColData, sizeof(SLastUpdateCtx));
×
1457
  if (ctxArray == NULL) {
×
1458
    TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
1459
  }
1460

1461
  // 1. prepare last
1462
  STsdbRowKey tsdbRowKey = {0};
×
1463
  tsdbRowGetKey(&lRow, &tsdbRowKey);
×
1464

1465
  {
1466
    SLastUpdateCtx updateCtx = {
×
1467
        .lflag = LFLAG_LAST,
1468
        .tsdbRowKey = tsdbRowKey,
1469
        .colVal = COL_VAL_VALUE(PRIMARYKEY_TIMESTAMP_COL_ID, ((SValue){.type = TSDB_DATA_TYPE_TIMESTAMP,
×
1470
                                                                       .val = lRow.pBlockData->aTSKEY[lRow.iRow]}))};
1471
    if (!taosArrayPush(ctxArray, &updateCtx)) {
×
1472
      TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
1473
    }
1474
  }
1475

1476
  TSDBROW tRow = tsdbRowFromBlockData(pBlockData, 0);
×
1477

1478
  for (int32_t iColData = 0; iColData < pBlockData->nColData; ++iColData) {
×
1479
    SColData *pColData = &pBlockData->aColData[iColData];
×
1480
    if ((pColData->flag & HAS_VALUE) != HAS_VALUE) {
×
1481
      continue;
×
1482
    }
1483

1484
    for (tRow.iRow = pBlockData->nRow - 1; tRow.iRow >= 0; --tRow.iRow) {
×
1485
      STsdbRowKey tsdbRowKey = {0};
×
1486
      tsdbRowGetKey(&tRow, &tsdbRowKey);
×
1487

1488
      uint8_t colType = tColDataGetBitValue(pColData, tRow.iRow);
×
1489
      if (colType == 2) {
×
1490
        SColVal colVal = COL_VAL_NONE(pColData->cid, pColData->type);
×
1491
        tColDataGetValue(pColData, tRow.iRow, &colVal);
×
1492

1493
        SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST, .tsdbRowKey = tsdbRowKey, .colVal = colVal};
×
1494
        if (!taosArrayPush(ctxArray, &updateCtx)) {
×
1495
          TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
1496
        }
1497
        break;
×
1498
      }
1499
    }
1500
  }
1501

1502
  // 2. prepare last row
1503
  STSDBRowIter iter = {0};
×
1504
  code = tsdbRowIterOpen(&iter, &lRow, pTSchema);
×
1505
  if (code != TSDB_CODE_SUCCESS) {
×
1506
    tsdbError("vgId:%d, %s tsdbRowIterOpen failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1507
              tstrerror(code));
1508
    TAOS_CHECK_GOTO(code, &lino, _exit);
×
1509
  }
1510
  for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal; pColVal = tsdbRowIterNext(&iter)) {
×
1511
    SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST_ROW, .tsdbRowKey = tsdbRowKey, .colVal = *pColVal};
×
1512
    if (!taosArrayPush(ctxArray, &updateCtx)) {
×
1513
      TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
1514
    }
1515
  }
1516
  tsdbRowClose(&iter);
×
1517

1518
  // 3. do update
1519
  code = tsdbCacheUpdate(pTsdb, suid, uid, ctxArray);
×
1520
  if (code != TSDB_CODE_SUCCESS) {
×
1521
    tsdbTrace("vgId:%d, %s tsdbCacheUpdate failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1522
              tstrerror(code));
1523
  }
1524

1525
_exit:
×
1526
  taosMemoryFreeClear(pTSchema);
×
1527
  taosArrayDestroy(ctxArray);
×
1528

1529
  TAOS_RETURN(code);
×
1530
}
1531

1532
static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols,
1533
                            int nCols, int16_t *slotIds);
1534

1535
static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols,
1536
                               int nCols, int16_t *slotIds);
1537

1538
static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SArray *remainCols,
791✔
1539
                                    SCacheRowsReader *pr, int8_t ltype) {
1540
  int32_t               code = 0, lino = 0;
791✔
1541
  rocksdb_writebatch_t *wb = NULL;
791✔
1542
  SArray               *pTmpColArray = NULL;
791✔
1543

1544
  SIdxKey *idxKey = taosArrayGet(remainCols, 0);
791✔
1545
  if (idxKey->key.cid != PRIMARYKEY_TIMESTAMP_COL_ID) {
791✔
1546
    // ignore 'ts' loaded from cache and load it from tsdb
1547
    SLastCol *pLastCol = taosArrayGet(pLastArray, 0);
238✔
1548
    tsdbCacheUpdateLastColToNone(pLastCol, TSDB_LAST_CACHE_NO_CACHE);
238✔
1549

1550
    SLastKey *key = &(SLastKey){.lflag = ltype, .uid = uid, .cid = PRIMARYKEY_TIMESTAMP_COL_ID};
238✔
1551
    if (!taosArrayInsert(remainCols, 0, &(SIdxKey){0, *key})) {
238!
1552
      TAOS_RETURN(terrno);
×
1553
    }
1554
  }
1555

1556
  int      num_keys = TARRAY_SIZE(remainCols);
791✔
1557
  int16_t *slotIds = taosMemoryMalloc(num_keys * sizeof(int16_t));
791✔
1558

1559
  int16_t *lastColIds = NULL;
790✔
1560
  int16_t *lastSlotIds = NULL;
790✔
1561
  int16_t *lastrowColIds = NULL;
790✔
1562
  int16_t *lastrowSlotIds = NULL;
790✔
1563
  lastColIds = taosMemoryMalloc(num_keys * sizeof(int16_t));
790✔
1564
  lastSlotIds = taosMemoryMalloc(num_keys * sizeof(int16_t));
790✔
1565
  lastrowColIds = taosMemoryMalloc(num_keys * sizeof(int16_t));
790✔
1566
  lastrowSlotIds = taosMemoryMalloc(num_keys * sizeof(int16_t));
790✔
1567
  SArray *lastTmpColArray = NULL;
789✔
1568
  SArray *lastTmpIndexArray = NULL;
789✔
1569
  SArray *lastrowTmpColArray = NULL;
789✔
1570
  SArray *lastrowTmpIndexArray = NULL;
789✔
1571

1572
  int lastIndex = 0;
789✔
1573
  int lastrowIndex = 0;
789✔
1574

1575
  if (!slotIds || !lastColIds || !lastSlotIds || !lastrowColIds || !lastrowSlotIds) {
789!
1576
    TAOS_CHECK_EXIT(terrno);
×
1577
  }
1578

1579
  for (int i = 0; i < num_keys; ++i) {
2,536✔
1580
    SIdxKey *idxKey = taosArrayGet(remainCols, i);
1,743✔
1581
    slotIds[i] = pr->pSlotIds[idxKey->idx];
1,745✔
1582
    if (IS_LAST_KEY(idxKey->key)) {
1,745✔
1583
      if (NULL == lastTmpIndexArray) {
555✔
1584
        lastTmpIndexArray = taosArrayInit(num_keys, sizeof(int32_t));
266✔
1585
        if (!lastTmpIndexArray) {
266!
1586
          TAOS_CHECK_EXIT(terrno);
×
1587
        }
1588
      }
1589
      if (!taosArrayPush(lastTmpIndexArray, &(i))) {
556!
1590
        TAOS_CHECK_EXIT(terrno);
×
1591
      }
1592
      lastColIds[lastIndex] = idxKey->key.cid;
556✔
1593
      lastSlotIds[lastIndex] = pr->pSlotIds[idxKey->idx];
556✔
1594
      lastIndex++;
556✔
1595
    } else {
1596
      if (NULL == lastrowTmpIndexArray) {
1,190✔
1597
        lastrowTmpIndexArray = taosArrayInit(num_keys, sizeof(int32_t));
524✔
1598
        if (!lastrowTmpIndexArray) {
524!
1599
          TAOS_CHECK_EXIT(terrno);
×
1600
        }
1601
      }
1602
      if (!taosArrayPush(lastrowTmpIndexArray, &(i))) {
1,191!
1603
        TAOS_CHECK_EXIT(terrno);
×
1604
      }
1605
      lastrowColIds[lastrowIndex] = idxKey->key.cid;
1,191✔
1606
      lastrowSlotIds[lastrowIndex] = pr->pSlotIds[idxKey->idx];
1,191✔
1607
      lastrowIndex++;
1,191✔
1608
    }
1609
  }
1610

1611
  pTmpColArray = taosArrayInit(lastIndex + lastrowIndex, sizeof(SLastCol));
793✔
1612
  if (!pTmpColArray) {
790!
1613
    TAOS_CHECK_EXIT(terrno);
×
1614
  }
1615

1616
  if (lastTmpIndexArray != NULL) {
790✔
1617
    TAOS_CHECK_EXIT(mergeLastCid(uid, pTsdb, &lastTmpColArray, pr, lastColIds, lastIndex, lastSlotIds));
266!
1618
    for (int i = 0; i < taosArrayGetSize(lastTmpColArray); i++) {
764✔
1619
      if (!taosArrayInsert(pTmpColArray, *(int32_t *)taosArrayGet(lastTmpIndexArray, i),
498!
1620
                           taosArrayGet(lastTmpColArray, i))) {
498✔
1621
        TAOS_CHECK_EXIT(terrno);
×
1622
      }
1623
    }
1624
  }
1625

1626
  if (lastrowTmpIndexArray != NULL) {
790✔
1627
    TAOS_CHECK_EXIT(mergeLastRowCid(uid, pTsdb, &lastrowTmpColArray, pr, lastrowColIds, lastrowIndex, lastrowSlotIds));
524!
1628
    for (int i = 0; i < taosArrayGetSize(lastrowTmpColArray); i++) {
1,596✔
1629
      if (!taosArrayInsert(pTmpColArray, *(int32_t *)taosArrayGet(lastrowTmpIndexArray, i),
1,071!
1630
                           taosArrayGet(lastrowTmpColArray, i))) {
1,073✔
1631
        TAOS_CHECK_EXIT(terrno);
×
1632
      }
1633
    }
1634
  }
1635

1636
  SLRUCache *pCache = pTsdb->lruCache;
788✔
1637
  for (int i = 0; i < num_keys; ++i) {
2,537✔
1638
    SIdxKey  *idxKey = taosArrayGet(remainCols, i);
1,747✔
1639
    SLastCol *pLastCol = NULL;
1,746✔
1640

1641
    if (pTmpColArray && TARRAY_SIZE(pTmpColArray) >= i + 1) {
1,746!
1642
      pLastCol = taosArrayGet(pTmpColArray, i);
1,568✔
1643
    }
1644

1645
    // still null, then make up a none col value
1646
    SLastCol noneCol = {.rowKey.ts = TSKEY_MIN,
1,745✔
1647
                        .colVal = COL_VAL_NONE(idxKey->key.cid, pr->pSchema->columns[slotIds[i]].type),
1,745✔
1648
                        .cacheStatus = TSDB_LAST_CACHE_VALID};
1649
    if (!pLastCol) {
1,745✔
1650
      pLastCol = &noneCol;
178✔
1651
    }
1652

1653
    taosArraySet(pLastArray, idxKey->idx, pLastCol);
1,745✔
1654
    // taosArrayRemove(remainCols, i);
1655

1656
    if (/*!pTmpColArray*/ lastTmpIndexArray && !lastTmpColArray) {
1,747!
1657
      continue;
×
1658
    }
1659
    if (/*!pTmpColArray*/ lastrowTmpIndexArray && !lastrowTmpColArray) {
1,747!
1660
      continue;
×
1661
    }
1662

1663
    // store result back to rocks cache
1664
    code = tsdbCachePutToRocksdb(pTsdb, &idxKey->key, pLastCol);
1,747✔
1665
    if (code) {
1,749!
1666
      tsdbError("vgId:%d, %s failed at line %d since %s.", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
×
1667
      TAOS_CHECK_EXIT(code);
×
1668
    }
1669

1670
    code = tsdbCachePutToLRU(pTsdb, &idxKey->key, pLastCol, 0);
1,749✔
1671
    if (code) {
1,749!
1672
      tsdbError("vgId:%d, %s failed at line %d since %s.", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
×
1673
      TAOS_CHECK_EXIT(code);
×
1674
    }
1675
  }
1676

1677
  rocksMayWrite(pTsdb, false);
790✔
1678

1679
_exit:
791✔
1680
  taosArrayDestroy(lastrowTmpIndexArray);
791✔
1681
  taosArrayDestroy(lastrowTmpColArray);
791✔
1682
  taosArrayDestroy(lastTmpIndexArray);
791✔
1683
  taosArrayDestroy(lastTmpColArray);
791✔
1684

1685
  taosMemoryFree(lastColIds);
791✔
1686
  taosMemoryFree(lastSlotIds);
791✔
1687
  taosMemoryFree(lastrowColIds);
791✔
1688
  taosMemoryFree(lastrowSlotIds);
791✔
1689

1690
  taosArrayDestroy(pTmpColArray);
791✔
1691

1692
  taosMemoryFree(slotIds);
791✔
1693

1694
  TAOS_RETURN(code);
790✔
1695
}
1696

1697
static int32_t tsdbCacheLoadFromRocks(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SArray *remainCols,
828✔
1698
                                      SArray *ignoreFromRocks, SCacheRowsReader *pr, int8_t ltype) {
1699
  int32_t code = 0, lino = 0;
828✔
1700
  int     num_keys = TARRAY_SIZE(remainCols);
828✔
1701
  char  **keys_list = taosMemoryMalloc(num_keys * sizeof(char *));
828✔
1702
  size_t *keys_list_sizes = taosMemoryMalloc(num_keys * sizeof(size_t));
828✔
1703
  char   *key_list = taosMemoryMalloc(num_keys * ROCKS_KEY_LEN);
829✔
1704
  if (!keys_list || !keys_list_sizes || !key_list) {
829!
1705
    taosMemoryFree(keys_list);
×
1706
    taosMemoryFree(keys_list_sizes);
×
1707
    TAOS_RETURN(terrno);
×
1708
  }
1709
  char  **values_list = NULL;
829✔
1710
  size_t *values_list_sizes = NULL;
829✔
1711
  for (int i = 0; i < num_keys; ++i) {
2,409✔
1712
    memcpy(key_list + i * ROCKS_KEY_LEN, &((SIdxKey *)taosArrayGet(remainCols, i))->key, ROCKS_KEY_LEN);
1,580✔
1713
    keys_list[i] = key_list + i * ROCKS_KEY_LEN;
1,580✔
1714
    keys_list_sizes[i] = ROCKS_KEY_LEN;
1,580✔
1715
  }
1716

1717
  rocksMayWrite(pTsdb, true);  // flush writebatch cache
829✔
1718

1719
  code = tsdbCacheGetValuesFromRocks(pTsdb, num_keys, (const char *const *)keys_list, keys_list_sizes, &values_list,
828✔
1720
                                     &values_list_sizes);
1721
  if (code) {
829!
1722
    taosMemoryFree(key_list);
×
1723
    taosMemoryFree(keys_list);
×
1724
    taosMemoryFree(keys_list_sizes);
×
1725
    TAOS_RETURN(code);
×
1726
  }
1727

1728
  SLRUCache *pCache = pTsdb->lruCache;
829✔
1729
  for (int i = 0, j = 0; i < num_keys && j < TARRAY_SIZE(remainCols); ++i) {
2,410!
1730
    SLastCol *pLastCol = NULL;
1,581✔
1731
    bool      ignore = ((bool *)TARRAY_DATA(ignoreFromRocks))[i];
1,581✔
1732
    if (ignore) {
1,581✔
1733
      ++j;
1✔
1734
      continue;
1✔
1735
    }
1736

1737
    if (values_list[i] != NULL) {
1,580✔
1738
      code = tsdbCacheDeserialize(values_list[i], values_list_sizes[i], &pLastCol);
70✔
1739
      if (code != TSDB_CODE_SUCCESS) {
70!
1740
        tsdbError("vgId:%d, %s deserialize failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1741
                  tstrerror(code));
1742
        goto _exit;
×
1743
      }
1744
    }
1745
    SLastCol *pToFree = pLastCol;
1,580✔
1746
    SIdxKey  *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[j];
1,580✔
1747
    if (pLastCol && pLastCol->cacheStatus != TSDB_LAST_CACHE_NO_CACHE) {
1,650!
1748
      code = tsdbCachePutToLRU(pTsdb, &idxKey->key, pLastCol, 0);
70✔
1749
      if (code) {
70!
1750
        tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
×
1751
        taosMemoryFreeClear(pToFree);
×
1752
        TAOS_CHECK_EXIT(code);
×
1753
      }
1754

1755
      SLastCol lastCol = *pLastCol;
70✔
1756
      code = tsdbCacheReallocSLastCol(&lastCol, NULL);
70✔
1757
      if (TSDB_CODE_SUCCESS != code) {
70!
1758
        taosMemoryFreeClear(pToFree);
×
1759
        TAOS_CHECK_EXIT(code);
×
1760
      }
1761

1762
      taosArraySet(pLastArray, idxKey->idx, &lastCol);
70✔
1763
      taosArrayRemove(remainCols, j);
70✔
1764
      taosArrayRemove(ignoreFromRocks, j);
70✔
1765
    } else {
1766
      ++j;
1,510✔
1767
    }
1768

1769
    taosMemoryFreeClear(pToFree);
1,580✔
1770
  }
1771

1772
  if (TARRAY_SIZE(remainCols) > 0) {
829✔
1773
    // tsdbTrace("tsdb/cache: vgId: %d, load %" PRId64 " from raw", TD_VID(pTsdb->pVnode), uid);
1774
    code = tsdbCacheLoadFromRaw(pTsdb, uid, pLastArray, remainCols, pr, ltype);
791✔
1775
  }
1776

1777
_exit:
38✔
1778
  taosMemoryFree(key_list);
828✔
1779
  taosMemoryFree(keys_list);
829✔
1780
  taosMemoryFree(keys_list_sizes);
829✔
1781
  if (values_list) {
829!
1782
    for (int i = 0; i < num_keys; ++i) {
2,410✔
1783
      rocksdb_free(values_list[i]);
1,581✔
1784
    }
1785
    taosMemoryFree(values_list);
829✔
1786
  }
1787
  taosMemoryFree(values_list_sizes);
829✔
1788

1789
  TAOS_RETURN(code);
828✔
1790
}
1791

1792
int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, int8_t ltype) {
20,320✔
1793
  int32_t    code = 0;
20,320✔
1794
  SArray    *remainCols = NULL;
20,320✔
1795
  SArray    *ignoreFromRocks = NULL;
20,320✔
1796
  SLRUCache *pCache = pTsdb->lruCache;
20,320✔
1797
  SArray    *pCidList = pr->pCidList;
20,320✔
1798
  int        numKeys = TARRAY_SIZE(pCidList);
20,320✔
1799

1800
  for (int i = 0; i < numKeys; ++i) {
78,380✔
1801
    int16_t cid = ((int16_t *)TARRAY_DATA(pCidList))[i];
58,041✔
1802

1803
    SLastKey key = {.lflag = ltype, .uid = uid, .cid = cid};
58,041✔
1804
    // for select last_row, last case
1805
    int32_t funcType = FUNCTION_TYPE_CACHE_LAST;
58,041✔
1806
    if (pr->pFuncTypeList != NULL && taosArrayGetSize(pr->pFuncTypeList) > i) {
58,041✔
1807
      funcType = ((int32_t *)TARRAY_DATA(pr->pFuncTypeList))[i];
2,721✔
1808
    }
1809
    if (((pr->type & CACHESCAN_RETRIEVE_LAST) == CACHESCAN_RETRIEVE_LAST) && FUNCTION_TYPE_CACHE_LAST_ROW == funcType) {
57,987✔
1810
      int8_t tempType = CACHESCAN_RETRIEVE_LAST_ROW | (pr->type ^ CACHESCAN_RETRIEVE_LAST);
1,425✔
1811
      key.lflag = (tempType & CACHESCAN_RETRIEVE_LAST) >> 3;
1,425✔
1812
    }
1813

1814
    LRUHandle *h = taosLRUCacheLookup(pCache, &key, ROCKS_KEY_LEN);
57,987✔
1815
    SLastCol  *pLastCol = h ? (SLastCol *)taosLRUCacheValue(pCache, h) : NULL;
58,017✔
1816
    if (h && pLastCol->cacheStatus != TSDB_LAST_CACHE_NO_CACHE) {
114,194!
1817
      SLastCol lastCol = *pLastCol;
56,416✔
1818
      if (TSDB_CODE_SUCCESS != (code = tsdbCacheReallocSLastCol(&lastCol, NULL))) {
56,416!
1819
        tsdbLRUCacheRelease(pCache, h, false);
×
1820
        TAOS_CHECK_GOTO(code, NULL, _exit);
×
1821
      }
1822

1823
      if (taosArrayPush(pLastArray, &lastCol) == NULL) {
56,197!
1824
        code = terrno;
×
1825
        tsdbLRUCacheRelease(pCache, h, false);
×
1826
        goto _exit;
×
1827
      }
1828
    } else {
1829
      // no cache or cache is invalid
1830
      SLastCol noneCol = {.rowKey.ts = TSKEY_MIN,
1,581✔
1831
                          .colVal = COL_VAL_NONE(cid, pr->pSchema->columns[pr->pSlotIds[i]].type)};
1,581✔
1832

1833
      if (taosArrayPush(pLastArray, &noneCol) == NULL) {
1,579!
1834
        code = terrno;
×
1835
        tsdbLRUCacheRelease(pCache, h, false);
×
1836
        goto _exit;
×
1837
      }
1838

1839
      if (!remainCols) {
1,579✔
1840
        if ((remainCols = taosArrayInit(numKeys, sizeof(SIdxKey))) == NULL) {
827!
1841
          code = terrno;
×
1842
          tsdbLRUCacheRelease(pCache, h, false);
×
1843
          goto _exit;
×
1844
        }
1845
      }
1846
      if (!ignoreFromRocks) {
1,580✔
1847
        if ((ignoreFromRocks = taosArrayInit(numKeys, sizeof(bool))) == NULL) {
828!
1848
          code = terrno;
×
1849
          tsdbLRUCacheRelease(pCache, h, false);
×
1850
          goto _exit;
×
1851
        }
1852
      }
1853
      if (taosArrayPush(remainCols, &(SIdxKey){i, key}) == NULL) {
3,160!
1854
        code = terrno;
×
1855
        tsdbLRUCacheRelease(pCache, h, false);
×
1856
        goto _exit;
×
1857
      }
1858
      bool ignoreRocks = pLastCol ? (pLastCol->cacheStatus == TSDB_LAST_CACHE_NO_CACHE) : false;
1,580!
1859
      if (taosArrayPush(ignoreFromRocks, &ignoreRocks) == NULL) {
1,579!
1860
        code = terrno;
×
1861
        tsdbLRUCacheRelease(pCache, h, false);
×
1862
        goto _exit;
×
1863
      }
1864
    }
1865

1866
    if (h) {
57,776✔
1867
      tsdbLRUCacheRelease(pCache, h, false);
56,187✔
1868
    }
1869
  }
1870

1871
  if (remainCols && TARRAY_SIZE(remainCols) > 0) {
20,339!
1872
    (void)taosThreadMutexLock(&pTsdb->lruMutex);
829✔
1873

1874
    for (int i = 0; i < TARRAY_SIZE(remainCols);) {
2,409✔
1875
      SIdxKey   *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[i];
1,581✔
1876
      LRUHandle *h = taosLRUCacheLookup(pCache, &idxKey->key, ROCKS_KEY_LEN);
1,581✔
1877
      SLastCol  *pLastCol = h ? (SLastCol *)taosLRUCacheValue(pCache, h) : NULL;
1,580✔
1878
      if (h && pLastCol->cacheStatus != TSDB_LAST_CACHE_NO_CACHE) {
1,580!
1879
        SLastCol lastCol = *pLastCol;
×
1880
        code = tsdbCacheReallocSLastCol(&lastCol, NULL);
×
1881
        if (code) {
×
1882
          tsdbLRUCacheRelease(pCache, h, false);
×
1883
          (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
×
1884
          TAOS_RETURN(code);
×
1885
        }
1886

1887
        taosArraySet(pLastArray, idxKey->idx, &lastCol);
×
1888

1889
        taosArrayRemove(remainCols, i);
×
1890
        taosArrayRemove(ignoreFromRocks, i);
×
1891
      } else {
1892
        // no cache or cache is invalid
1893
        ++i;
1,580✔
1894
      }
1895
      if (h) {
1,580✔
1896
        tsdbLRUCacheRelease(pCache, h, false);
1✔
1897
      }
1898
    }
1899

1900
    // tsdbTrace("tsdb/cache: vgId: %d, load %" PRId64 " from rocks", TD_VID(pTsdb->pVnode), uid);
1901
    code = tsdbCacheLoadFromRocks(pTsdb, uid, pLastArray, remainCols, ignoreFromRocks, pr, ltype);
828✔
1902

1903
    (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
827✔
1904
  }
1905

1906
_exit:
19,510✔
1907
  if (remainCols) {
20,328✔
1908
    taosArrayDestroy(remainCols);
829✔
1909
  }
1910
  if (ignoreFromRocks) {
20,328✔
1911
    taosArrayDestroy(ignoreFromRocks);
829✔
1912
  }
1913

1914
  TAOS_RETURN(code);
20,328✔
1915
}
1916

1917
int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey) {
63,870✔
1918
  int32_t code = 0, lino = 0;
63,870✔
1919
  // fetch schema
1920
  STSchema *pTSchema = NULL;
63,870✔
1921
  int       sver = -1;
63,870✔
1922

1923
  TAOS_CHECK_RETURN(metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pTSchema));
63,870!
1924

1925
  // build keys & multi get from rocks
1926
  int     numCols = pTSchema->numOfCols;
63,874✔
1927
  int     numKeys = 0;
63,874✔
1928
  SArray *remainCols = NULL;
63,874✔
1929

1930
  code = tsdbCacheCommit(pTsdb);
63,874✔
1931
  if (code != TSDB_CODE_SUCCESS) {
63,871!
1932
    tsdbTrace("vgId:%d, %s commit failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1933
              tstrerror(code));
1934
  }
1935

1936
  (void)taosThreadMutexLock(&pTsdb->lruMutex);
63,871✔
1937

1938
  for (int i = 0; i < numCols; ++i) {
419,920✔
1939
    int16_t cid = pTSchema->columns[i].colId;
356,033✔
1940
    for (int8_t lflag = LFLAG_LAST_ROW; lflag <= LFLAG_LAST; ++lflag) {
1,068,093✔
1941
      SLastKey   lastKey = {.lflag = lflag, .uid = uid, .cid = cid};
712,047✔
1942
      LRUHandle *h = taosLRUCacheLookup(pTsdb->lruCache, &lastKey, ROCKS_KEY_LEN);
712,047✔
1943
      if (h) {
712,063✔
1944
        SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pTsdb->lruCache, h);
20,550✔
1945
        if (pLastCol->rowKey.ts <= eKey && pLastCol->rowKey.ts >= sKey) {
20,550✔
1946
          SLastCol noneCol = {.rowKey.ts = TSKEY_MIN,
20,090✔
1947
                              .colVal = COL_VAL_NONE(cid, pTSchema->columns[i].type),
20,090✔
1948
                              .dirty = 1,
1949
                              .cacheStatus = TSDB_LAST_CACHE_NO_CACHE};
1950
          code = tsdbCachePutToLRU(pTsdb, &lastKey, &noneCol, 1);
20,090✔
1951
        }
1952
        tsdbLRUCacheRelease(pTsdb->lruCache, h, false);
20,550✔
1953
        TAOS_CHECK_EXIT(code);
20,550!
1954
      } else {
1955
        if (!remainCols) {
691,513✔
1956
          remainCols = taosArrayInit(numCols * 2, sizeof(SIdxKey));
63,851✔
1957
        }
1958
        if (!taosArrayPush(remainCols, &(SIdxKey){i, lastKey})) {
1,383,024!
1959
          TAOS_CHECK_EXIT(terrno);
×
1960
        }
1961
      }
1962
    }
1963
  }
1964

1965
  if (remainCols) {
63,887✔
1966
    numKeys = TARRAY_SIZE(remainCols);
63,853✔
1967
  }
1968

1969
  char  **keys_list = taosMemoryCalloc(numKeys, sizeof(char *));
63,887✔
1970
  size_t *keys_list_sizes = taosMemoryCalloc(numKeys, sizeof(size_t));
63,874✔
1971
  char  **values_list = NULL;
63,874✔
1972
  size_t *values_list_sizes = NULL;
63,874✔
1973

1974
  if (!keys_list || !keys_list_sizes) {
63,874!
1975
    code = terrno;
×
1976
    goto _exit;
×
1977
  }
1978
  const size_t klen = ROCKS_KEY_LEN;
63,874✔
1979

1980
  for (int i = 0; i < numKeys; ++i) {
755,371✔
1981
    char *key = taosMemoryCalloc(1, sizeof(SLastKey));
691,494✔
1982
    if (!key) {
691,505!
1983
      code = terrno;
×
1984
      goto _exit;
×
1985
    }
1986
    SIdxKey *idxKey = taosArrayGet(remainCols, i);
691,505✔
1987

1988
    ((SLastKey *)key)[0] = idxKey->key;
691,497✔
1989

1990
    keys_list[i] = key;
691,497✔
1991
    keys_list_sizes[i] = klen;
691,497✔
1992
  }
1993

1994
  rocksMayWrite(pTsdb, true);  // flush writebatch cache
63,877✔
1995

1996
  TAOS_CHECK_GOTO(tsdbCacheGetValuesFromRocks(pTsdb, numKeys, (const char *const *)keys_list, keys_list_sizes,
63,874!
1997
                                              &values_list, &values_list_sizes),
1998
                  NULL, _exit);
1999

2000
  rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
63,873✔
2001
  for (int i = 0; i < numKeys; ++i) {
755,289✔
2002
    SLastCol *pLastCol = NULL;
691,416✔
2003
    if (values_list[i] != NULL) {
691,416!
2004
      code = tsdbCacheDeserialize(values_list[i], values_list_sizes[i], &pLastCol);
×
2005
      if (code != TSDB_CODE_SUCCESS) {
×
2006
        tsdbError("vgId:%d, %s deserialize failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
2007
                  tstrerror(code));
2008
        goto _exit;
×
2009
      }
2010
    }
2011
    SIdxKey  *idxKey = taosArrayGet(remainCols, i);
691,416✔
2012
    SLastKey *pLastKey = &idxKey->key;
691,414✔
2013
    if (NULL != pLastCol && (pLastCol->rowKey.ts <= eKey && pLastCol->rowKey.ts >= sKey)) {
691,414!
2014
      SLastCol noCacheCol = {.rowKey.ts = TSKEY_MIN,
×
2015
                             .colVal = COL_VAL_NONE(pLastKey->cid, pTSchema->columns[idxKey->idx].type),
×
2016
                             .dirty = 0,
2017
                             .cacheStatus = TSDB_LAST_CACHE_NO_CACHE};
2018

2019
      if ((code = tsdbCachePutToRocksdb(pTsdb, pLastKey, &noCacheCol)) != TSDB_CODE_SUCCESS) {
×
2020
        taosMemoryFreeClear(pLastCol);
×
2021
        tsdbError("tsdb/cache/del: vgId:%d, put to rocks failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code));
×
2022
        goto _exit;
×
2023
      }
2024
      if ((code = tsdbCachePutToLRU(pTsdb, pLastKey, &noCacheCol, 0)) != TSDB_CODE_SUCCESS) {
×
2025
        taosMemoryFreeClear(pLastCol);
×
2026
        tsdbError("tsdb/cache/del: vgId:%d, put to lru failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code));
×
2027
        goto _exit;
×
2028
      }
2029
    }
2030

2031
    if (pLastCol == NULL) {
691,414!
2032
      tsdbDebug("tsdb/cache/del: vgId:%d, no cache found for uid:%d ,cid:%" PRId64 ", lflag:%d.", TD_VID(pTsdb->pVnode),
691,414✔
2033
                pLastKey->cid, pLastKey->uid, pLastKey->lflag);
2034
    }
2035

2036
    taosMemoryFreeClear(pLastCol);
691,416!
2037
  }
2038

2039
  rocksMayWrite(pTsdb, false);
63,873✔
2040

2041
_exit:
63,874✔
2042
  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
63,874✔
2043

2044
  for (int i = 0; i < numKeys; ++i) {
755,360✔
2045
    taosMemoryFree(keys_list[i]);
691,485✔
2046
  }
2047
  taosMemoryFree(keys_list);
63,875✔
2048
  taosMemoryFree(keys_list_sizes);
63,874✔
2049
  if (values_list) {
63,874✔
2050
    for (int i = 0; i < numKeys; ++i) {
755,368✔
2051
      rocksdb_free(values_list[i]);
691,496✔
2052
    }
2053
    taosMemoryFree(values_list);
63,872✔
2054
  }
2055
  taosMemoryFree(values_list_sizes);
63,875✔
2056
  taosArrayDestroy(remainCols);
63,873✔
2057
  taosMemoryFree(pTSchema);
63,873✔
2058

2059
  TAOS_RETURN(code);
63,873✔
2060
}
2061

2062
int32_t tsdbOpenCache(STsdb *pTsdb) {
13,709✔
2063
  int32_t code = 0, lino = 0;
13,709✔
2064
  size_t  cfgCapacity = (size_t)pTsdb->pVnode->config.cacheLastSize * 1024 * 1024;
13,709✔
2065

2066
  SLRUCache *pCache = taosLRUCacheInit(cfgCapacity, 0, .5);
13,709✔
2067
  if (pCache == NULL) {
13,735!
2068
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err);
×
2069
  }
2070

2071
  TAOS_CHECK_GOTO(tsdbOpenBCache(pTsdb), &lino, _err);
13,735!
2072

2073
  TAOS_CHECK_GOTO(tsdbOpenPgCache(pTsdb), &lino, _err);
13,736!
2074

2075
  TAOS_CHECK_GOTO(tsdbOpenRocksCache(pTsdb), &lino, _err);
13,736!
2076

2077
  taosLRUCacheSetStrictCapacity(pCache, false);
13,733✔
2078

2079
  (void)taosThreadMutexInit(&pTsdb->lruMutex, NULL);
13,736✔
2080

2081
_err:
13,733✔
2082
  if (code) {
13,733!
2083
    tsdbError("tsdb/cache: vgId:%d, open failed at line %d since %s.", TD_VID(pTsdb->pVnode), lino, tstrerror(code));
×
2084
  }
2085

2086
  pTsdb->lruCache = pCache;
13,733✔
2087

2088
  TAOS_RETURN(code);
13,733✔
2089
}
2090

2091
void tsdbCloseCache(STsdb *pTsdb) {
13,735✔
2092
  SLRUCache *pCache = pTsdb->lruCache;
13,735✔
2093
  if (pCache) {
13,735!
2094
    taosLRUCacheEraseUnrefEntries(pCache);
13,735✔
2095

2096
    taosLRUCacheCleanup(pCache);
13,736✔
2097

2098
    (void)taosThreadMutexDestroy(&pTsdb->lruMutex);
13,736✔
2099
  }
2100

2101
  tsdbCloseBCache(pTsdb);
13,736✔
2102
  tsdbClosePgCache(pTsdb);
13,736✔
2103
  tsdbCloseRocksCache(pTsdb);
13,736✔
2104
}
13,733✔
2105

2106
static void getTableCacheKey(tb_uid_t uid, int cacheType, char *key, int *len) {
×
2107
  if (cacheType == 0) {  // last_row
×
2108
    *(uint64_t *)key = (uint64_t)uid;
×
2109
  } else {  // last
2110
    *(uint64_t *)key = ((uint64_t)uid) | 0x8000000000000000;
×
2111
  }
2112

2113
  *len = sizeof(uint64_t);
×
2114
}
×
2115

2116
static tb_uid_t getTableSuidByUid(tb_uid_t uid, STsdb *pTsdb) {
×
2117
  tb_uid_t suid = 0;
×
2118

2119
  SMetaReader mr = {0};
×
2120
  metaReaderDoInit(&mr, pTsdb->pVnode->pMeta, META_READER_LOCK);
×
2121
  if (metaReaderGetTableEntryByUidCache(&mr, uid) < 0) {
×
2122
    metaReaderClear(&mr);  // table not esist
×
2123
    return 0;
×
2124
  }
2125

2126
  if (mr.me.type == TSDB_CHILD_TABLE) {
×
2127
    suid = mr.me.ctbEntry.suid;
×
2128
  } else if (mr.me.type == TSDB_NORMAL_TABLE) {
×
2129
    suid = 0;
×
2130
  } else {
2131
    suid = 0;
×
2132
  }
2133

2134
  metaReaderClear(&mr);
×
2135

2136
  return suid;
×
2137
}
2138

2139
static int32_t getTableDelDataFromDelIdx(SDelFReader *pDelReader, SDelIdx *pDelIdx, SArray *aDelData) {
×
2140
  int32_t code = 0;
×
2141

2142
  if (pDelIdx) {
×
2143
    code = tsdbReadDelDatav1(pDelReader, pDelIdx, aDelData, INT64_MAX);
×
2144
  }
2145

2146
  TAOS_RETURN(code);
×
2147
}
2148

2149
static int32_t getTableDelDataFromTbData(STbData *pTbData, SArray *aDelData) {
×
2150
  int32_t   code = 0;
×
2151
  SDelData *pDelData = pTbData ? pTbData->pHead : NULL;
×
2152

2153
  for (; pDelData; pDelData = pDelData->pNext) {
×
2154
    if (!taosArrayPush(aDelData, pDelData)) {
×
2155
      TAOS_RETURN(terrno);
×
2156
    }
2157
  }
2158

2159
  TAOS_RETURN(code);
×
2160
}
2161

2162
static void freeTableInfoFunc(void *param) {
757✔
2163
  void **p = (void **)param;
757✔
2164
  taosMemoryFreeClear(*p);
757!
2165
}
757✔
2166

2167
static STableLoadInfo *getTableLoadInfo(SCacheRowsReader *pReader, uint64_t uid) {
788✔
2168
  if (!pReader->pTableMap) {
788✔
2169
    pReader->pTableMap = tSimpleHashInit(pReader->numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
337✔
2170
    if (!pReader->pTableMap) {
339!
2171
      return NULL;
×
2172
    }
2173

2174
    tSimpleHashSetFreeFp(pReader->pTableMap, freeTableInfoFunc);
339✔
2175
  }
2176

2177
  STableLoadInfo  *pInfo = NULL;
790✔
2178
  STableLoadInfo **ppInfo = tSimpleHashGet(pReader->pTableMap, &uid, sizeof(uid));
790✔
2179
  if (!ppInfo) {
792✔
2180
    pInfo = taosMemoryCalloc(1, sizeof(STableLoadInfo));
756✔
2181
    if (pInfo) {
756!
2182
      if (tSimpleHashPut(pReader->pTableMap, &uid, sizeof(uint64_t), &pInfo, POINTER_BYTES)) {
756!
2183
        return NULL;
×
2184
      }
2185
    }
2186

2187
    return pInfo;
756✔
2188
  }
2189

2190
  return *ppInfo;
36✔
2191
}
2192

2193
static uint64_t *getUidList(SCacheRowsReader *pReader) {
5,449✔
2194
  if (!pReader->uidList) {
5,449✔
2195
    int32_t numOfTables = pReader->numOfTables;
225✔
2196

2197
    pReader->uidList = taosMemoryMalloc(numOfTables * sizeof(uint64_t));
225✔
2198
    if (!pReader->uidList) {
224!
2199
      return NULL;
×
2200
    }
2201

2202
    for (int32_t i = 0; i < numOfTables; ++i) {
685✔
2203
      uint64_t uid = pReader->pTableList[i].uid;
461✔
2204
      pReader->uidList[i] = uid;
461✔
2205
    }
2206

2207
    taosSort(pReader->uidList, numOfTables, sizeof(uint64_t), uidComparFunc);
224✔
2208
  }
2209

2210
  return pReader->uidList;
5,455✔
2211
}
2212

2213
static int32_t loadTombFromBlk(const TTombBlkArray *pTombBlkArray, SCacheRowsReader *pReader, void *pFileReader,
5,457✔
2214
                               bool isFile) {
2215
  int32_t   code = 0;
5,457✔
2216
  int32_t   numOfTables = pReader->numOfTables;
5,457✔
2217
  int64_t   suid = pReader->info.suid;
5,457✔
2218
  uint64_t *uidList = getUidList(pReader);
5,457✔
2219

2220
  if (!uidList) {
5,465✔
2221
    TAOS_RETURN(terrno);
1✔
2222
  }
2223

2224
  for (int i = 0, j = 0; i < pTombBlkArray->size && j < numOfTables; ++i) {
5,489!
2225
    STombBlk *pTombBlk = &pTombBlkArray->data[i];
28✔
2226
    if (pTombBlk->maxTbid.suid < suid || (pTombBlk->maxTbid.suid == suid && pTombBlk->maxTbid.uid < uidList[0])) {
28!
2227
      continue;
2✔
2228
    }
2229

2230
    if (pTombBlk->minTbid.suid > suid ||
26!
2231
        (pTombBlk->minTbid.suid == suid && pTombBlk->minTbid.uid > uidList[numOfTables - 1])) {
26!
2232
      break;
2233
    }
2234

2235
    STombBlock block = {0};
23✔
2236
    code = isFile ? tsdbDataFileReadTombBlock(pFileReader, &pTombBlkArray->data[i], &block)
23✔
2237
                  : tsdbSttFileReadTombBlock(pFileReader, &pTombBlkArray->data[i], &block);
23!
2238
    if (code != TSDB_CODE_SUCCESS) {
23!
2239
      TAOS_RETURN(code);
×
2240
    }
2241

2242
    uint64_t        uid = uidList[j];
23✔
2243
    STableLoadInfo *pInfo = getTableLoadInfo(pReader, uid);
23✔
2244
    if (!pInfo) {
23!
2245
      tTombBlockDestroy(&block);
×
2246
      TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
×
2247
    }
2248

2249
    if (pInfo->pTombData == NULL) {
23✔
2250
      pInfo->pTombData = taosArrayInit(4, sizeof(SDelData));
4✔
2251
    }
2252

2253
    STombRecord record = {0};
23✔
2254
    bool        finished = false;
23✔
2255
    for (int32_t k = 0; k < TOMB_BLOCK_SIZE(&block); ++k) {
46✔
2256
      code = tTombBlockGet(&block, k, &record);
23✔
2257
      if (code != TSDB_CODE_SUCCESS) {
23!
2258
        finished = true;
×
2259
        break;
×
2260
      }
2261

2262
      if (record.suid < suid) {
23!
2263
        continue;
×
2264
      }
2265
      if (record.suid > suid) {
23!
2266
        finished = true;
×
2267
        break;
×
2268
      }
2269

2270
      bool newTable = false;
23✔
2271
      if (uid < record.uid) {
23✔
2272
        while (j < numOfTables && uidList[j] < record.uid) {
90!
2273
          ++j;
75✔
2274
          newTable = true;
75✔
2275
        }
2276

2277
        if (j >= numOfTables) {
15!
2278
          finished = true;
×
2279
          break;
×
2280
        }
2281

2282
        uid = uidList[j];
15✔
2283
      }
2284

2285
      if (record.uid < uid) {
23!
2286
        continue;
×
2287
      }
2288

2289
      if (newTable) {
23✔
2290
        pInfo = getTableLoadInfo(pReader, uid);
15✔
2291
        if (!pInfo) {
15!
2292
          code = TSDB_CODE_OUT_OF_MEMORY;
×
2293
          finished = true;
×
2294
          break;
×
2295
        }
2296
        if (pInfo->pTombData == NULL) {
15✔
2297
          pInfo->pTombData = taosArrayInit(4, sizeof(SDelData));
3✔
2298
          if (!pInfo->pTombData) {
3!
2299
            code = terrno;
×
2300
            finished = true;
×
2301
            break;
×
2302
          }
2303
        }
2304
      }
2305

2306
      if (record.version <= pReader->info.verRange.maxVer) {
23!
2307
        /*tsdbError("tomb xx load/cache: vgId:%d fid:%d record %" PRId64 "~%" PRId64 "~%" PRId64 " tomb records",
2308
          TD_VID(pReader->pTsdb->pVnode), pReader->pCurFileSet->fid, record.skey, record.ekey, uid);*/
2309

2310
        SDelData delData = {.version = record.version, .sKey = record.skey, .eKey = record.ekey};
23✔
2311
        if (!taosArrayPush(pInfo->pTombData, &delData)) {
46!
2312
          TAOS_RETURN(terrno);
×
2313
        }
2314
      }
2315
    }
2316

2317
    tTombBlockDestroy(&block);
23✔
2318

2319
    if (finished) {
23!
2320
      TAOS_RETURN(code);
×
2321
    }
2322
  }
2323

2324
  TAOS_RETURN(TSDB_CODE_SUCCESS);
5,464✔
2325
}
2326

2327
static int32_t loadDataTomb(SCacheRowsReader *pReader, SDataFileReader *pFileReader) {
28✔
2328
  const TTombBlkArray *pBlkArray = NULL;
28✔
2329

2330
  TAOS_CHECK_RETURN(tsdbDataFileReadTombBlk(pFileReader, &pBlkArray));
28!
2331

2332
  TAOS_RETURN(loadTombFromBlk(pBlkArray, pReader, pFileReader, true));
28✔
2333
}
2334

2335
static int32_t loadSttTomb(STsdbReader *pTsdbReader, SSttFileReader *pSttFileReader, SSttBlockLoadInfo *pLoadInfo) {
5,444✔
2336
  SCacheRowsReader    *pReader = (SCacheRowsReader *)pTsdbReader;
5,444✔
2337
  const TTombBlkArray *pBlkArray = NULL;
5,444✔
2338

2339
  TAOS_CHECK_RETURN(tsdbSttFileReadTombBlk(pSttFileReader, &pBlkArray));
5,444!
2340

2341
  TAOS_RETURN(loadTombFromBlk(pBlkArray, pReader, pSttFileReader, false));
5,449✔
2342
}
2343

2344
typedef struct {
2345
  SMergeTree  mergeTree;
2346
  SMergeTree *pMergeTree;
2347
} SFSLastIter;
2348

2349
static int32_t lastIterOpen(SFSLastIter *iter, STFileSet *pFileSet, STsdb *pTsdb, STSchema *pTSchema, tb_uid_t suid,
5,490✔
2350
                            tb_uid_t uid, SCacheRowsReader *pr, int64_t lastTs, int16_t *aCols, int nCols) {
2351
  int32_t code = 0;
5,490✔
2352
  destroySttBlockReader(pr->pLDataIterArray, NULL);
5,490✔
2353
  pr->pLDataIterArray = taosArrayInit(4, POINTER_BYTES);
5,520✔
2354
  if (pr->pLDataIterArray == NULL) return terrno;
5,525!
2355

2356
  SMergeTreeConf conf = {
5,525✔
2357
      .uid = uid,
2358
      .suid = suid,
2359
      .pTsdb = pTsdb,
2360
      .timewindow = (STimeWindow){.skey = lastTs, .ekey = TSKEY_MAX},
2361
      .verRange = (SVersionRange){.minVer = 0, .maxVer = INT64_MAX},
2362
      .strictTimeRange = false,
2363
      .pSchema = pTSchema,
2364
      .pCurrentFileset = pFileSet,
2365
      .backward = 1,
2366
      .pSttFileBlockIterArray = pr->pLDataIterArray,
5,525✔
2367
      .pCols = aCols,
2368
      .numOfCols = nCols,
2369
      .loadTombFn = loadSttTomb,
2370
      .pReader = pr,
2371
      .idstr = pr->idstr,
5,525✔
2372
      .pCurRowKey = &pr->rowKey,
5,525✔
2373
  };
2374

2375
  TAOS_CHECK_RETURN(tMergeTreeOpen2(&iter->mergeTree, &conf, NULL));
5,525!
2376

2377
  iter->pMergeTree = &iter->mergeTree;
5,457✔
2378

2379
  TAOS_RETURN(code);
5,457✔
2380
}
2381

2382
static int32_t lastIterClose(SFSLastIter **iter) {
4✔
2383
  int32_t code = 0;
4✔
2384

2385
  if ((*iter)->pMergeTree) {
4!
2386
    tMergeTreeClose((*iter)->pMergeTree);
4✔
2387
    (*iter)->pMergeTree = NULL;
4✔
2388
  }
2389

2390
  *iter = NULL;
4✔
2391

2392
  TAOS_RETURN(code);
4✔
2393
}
2394

2395
static int32_t lastIterNext(SFSLastIter *iter, TSDBROW **ppRow) {
5,455✔
2396
  bool hasVal = false;
5,455✔
2397
  *ppRow = NULL;
5,455✔
2398

2399
  int32_t code = tMergeTreeNext(iter->pMergeTree, &hasVal);
5,455✔
2400
  if (code != 0) {
5,457!
2401
    return code;
×
2402
  }
2403

2404
  if (!hasVal) {
5,457✔
2405
    *ppRow = NULL;
5,072✔
2406
    TAOS_RETURN(code);
5,072✔
2407
  }
2408

2409
  *ppRow = tMergeTreeGetRow(iter->pMergeTree);
385✔
2410
  TAOS_RETURN(code);
385✔
2411
}
2412

2413
typedef enum SFSNEXTROWSTATES {
2414
  SFSNEXTROW_FS,
2415
  SFSNEXTROW_FILESET,
2416
  SFSNEXTROW_INDEXLIST,
2417
  SFSNEXTROW_BRINBLOCK,
2418
  SFSNEXTROW_BRINRECORD,
2419
  SFSNEXTROW_BLOCKDATA,
2420
  SFSNEXTROW_BLOCKROW,
2421
  SFSNEXTROW_NEXTSTTROW
2422
} SFSNEXTROWSTATES;
2423

2424
struct CacheNextRowIter;
2425

2426
typedef struct SFSNextRowIter {
2427
  SFSNEXTROWSTATES         state;         // [input]
2428
  SBlockIdx               *pBlockIdxExp;  // [input]
2429
  STSchema                *pTSchema;      // [input]
2430
  tb_uid_t                 suid;
2431
  tb_uid_t                 uid;
2432
  int32_t                  iFileSet;
2433
  STFileSet               *pFileSet;
2434
  TFileSetArray           *aDFileSet;
2435
  SArray                  *pIndexList;
2436
  int32_t                  iBrinIndex;
2437
  SBrinBlock               brinBlock;
2438
  SBrinBlock              *pBrinBlock;
2439
  int32_t                  iBrinRecord;
2440
  SBrinRecord              brinRecord;
2441
  SBlockData               blockData;
2442
  SBlockData              *pBlockData;
2443
  int32_t                  nRow;
2444
  int32_t                  iRow;
2445
  TSDBROW                  row;
2446
  int64_t                  lastTs;
2447
  SFSLastIter              lastIter;
2448
  SFSLastIter             *pLastIter;
2449
  int8_t                   lastEmpty;
2450
  TSDBROW                 *pLastRow;
2451
  SRow                    *pTSRow;
2452
  SRowMerger               rowMerger;
2453
  SCacheRowsReader        *pr;
2454
  struct CacheNextRowIter *pRowIter;
2455
} SFSNextRowIter;
2456

2457
static void clearLastFileSet(SFSNextRowIter *state);
2458

2459
static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast, int16_t *aCols,
795✔
2460
                                int nCols) {
2461
  int32_t         code = 0, lino = 0;
795✔
2462
  SFSNextRowIter *state = (SFSNextRowIter *)iter;
795✔
2463
  STsdb          *pTsdb = state->pr->pTsdb;
795✔
2464

2465
  if (SFSNEXTROW_FS == state->state) {
795✔
2466
    state->iFileSet = TARRAY2_SIZE(state->aDFileSet);
790✔
2467

2468
    state->state = SFSNEXTROW_FILESET;
790✔
2469
  }
2470

2471
  if (SFSNEXTROW_FILESET == state->state) {
795✔
2472
  _next_fileset:
789✔
2473
    clearLastFileSet(state);
5,848✔
2474

2475
    if (--state->iFileSet < 0) {
5,824✔
2476
      *ppRow = NULL;
334✔
2477

2478
      TAOS_RETURN(code);
334✔
2479
    } else {
2480
      state->pFileSet = TARRAY2_GET(state->aDFileSet, state->iFileSet);
5,490✔
2481
    }
2482

2483
    STFileObj **pFileObj = state->pFileSet->farr;
5,490✔
2484
    if (pFileObj[0] != NULL || pFileObj[3] != NULL) {
5,490!
2485
      if (state->pFileSet != state->pr->pCurFileSet) {
28!
2486
        SDataFileReaderConfig conf = {.tsdb = pTsdb, .szPage = pTsdb->pVnode->config.tsdbPageSize};
28✔
2487
        const char           *filesName[4] = {0};
28✔
2488
        if (pFileObj[0] != NULL) {
28!
2489
          conf.files[0].file = *pFileObj[0]->f;
28✔
2490
          conf.files[0].exist = true;
28✔
2491
          filesName[0] = pFileObj[0]->fname;
28✔
2492

2493
          conf.files[1].file = *pFileObj[1]->f;
28✔
2494
          conf.files[1].exist = true;
28✔
2495
          filesName[1] = pFileObj[1]->fname;
28✔
2496

2497
          conf.files[2].file = *pFileObj[2]->f;
28✔
2498
          conf.files[2].exist = true;
28✔
2499
          filesName[2] = pFileObj[2]->fname;
28✔
2500
        }
2501

2502
        if (pFileObj[3] != NULL) {
28!
2503
          conf.files[3].exist = true;
28✔
2504
          conf.files[3].file = *pFileObj[3]->f;
28✔
2505
          filesName[3] = pFileObj[3]->fname;
28✔
2506
        }
2507

2508
        TAOS_CHECK_GOTO(tsdbDataFileReaderOpen(filesName, &conf, &state->pr->pFileReader), &lino, _err);
28!
2509

2510
        state->pr->pCurFileSet = state->pFileSet;
28✔
2511

2512
        code = loadDataTomb(state->pr, state->pr->pFileReader);
28✔
2513
        if (code != TSDB_CODE_SUCCESS) {
28!
2514
          tsdbError("vgId:%d, %s load tomb failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
2515
                    tstrerror(code));
2516
          TAOS_CHECK_GOTO(code, &lino, _err);
×
2517
        }
2518

2519
        TAOS_CHECK_GOTO(tsdbDataFileReadBrinBlk(state->pr->pFileReader, &state->pr->pBlkArray), &lino, _err);
28!
2520
      }
2521

2522
      if (!state->pIndexList) {
28!
2523
        state->pIndexList = taosArrayInit(1, sizeof(SBrinBlk));
28✔
2524
        if (!state->pIndexList) {
28!
2525
          TAOS_CHECK_GOTO(terrno, &lino, _err);
×
2526
        }
2527
      } else {
2528
        taosArrayClear(state->pIndexList);
×
2529
      }
2530

2531
      const TBrinBlkArray *pBlkArray = state->pr->pBlkArray;
28✔
2532

2533
      for (int i = TARRAY2_SIZE(pBlkArray) - 1; i >= 0; --i) {
56✔
2534
        SBrinBlk *pBrinBlk = &pBlkArray->data[i];
28✔
2535
        if (state->suid >= pBrinBlk->minTbid.suid && state->suid <= pBrinBlk->maxTbid.suid) {
28!
2536
          if (state->uid >= pBrinBlk->minTbid.uid && state->uid <= pBrinBlk->maxTbid.uid) {
28✔
2537
            if (!taosArrayPush(state->pIndexList, pBrinBlk)) {
8!
2538
              TAOS_CHECK_GOTO(terrno, &lino, _err);
×
2539
            }
2540
          }
2541
        } else if (state->suid > pBrinBlk->maxTbid.suid ||
×
2542
                   (state->suid == pBrinBlk->maxTbid.suid && state->uid > pBrinBlk->maxTbid.uid)) {
×
2543
          break;
2544
        }
2545
      }
2546

2547
      int indexSize = TARRAY_SIZE(state->pIndexList);
28✔
2548
      if (indexSize <= 0) {
28✔
2549
        goto _check_stt_data;
24✔
2550
      }
2551

2552
      state->state = SFSNEXTROW_INDEXLIST;
4✔
2553
      state->iBrinIndex = 1;
4✔
2554
    }
2555

2556
  _check_stt_data:
5,462✔
2557
    if (state->pFileSet != state->pr->pCurFileSet) {
5,490✔
2558
      state->pr->pCurFileSet = state->pFileSet;
5,435✔
2559
    }
2560

2561
    TAOS_CHECK_GOTO(lastIterOpen(&state->lastIter, state->pFileSet, pTsdb, state->pTSchema, state->suid, state->uid,
5,490!
2562
                                 state->pr, state->lastTs, aCols, nCols),
2563
                    &lino, _err);
2564

2565
    TAOS_CHECK_GOTO(lastIterNext(&state->lastIter, &state->pLastRow), &lino, _err);
5,452!
2566

2567
    if (!state->pLastRow) {
5,456✔
2568
      state->lastEmpty = 1;
5,064✔
2569

2570
      if (SFSNEXTROW_INDEXLIST != state->state) {
5,064✔
2571
        clearLastFileSet(state);
5,060✔
2572
        goto _next_fileset;
5,058✔
2573
      }
2574
    } else {
2575
      state->lastEmpty = 0;
392✔
2576

2577
      if (SFSNEXTROW_INDEXLIST != state->state) {
392!
2578
        state->state = SFSNEXTROW_NEXTSTTROW;
398✔
2579

2580
        *ppRow = state->pLastRow;
398✔
2581
        state->pLastRow = NULL;
398✔
2582

2583
        TAOS_RETURN(code);
398✔
2584
      }
2585
    }
2586

2587
    state->pLastIter = &state->lastIter;
×
2588
  }
2589

2590
  if (SFSNEXTROW_NEXTSTTROW == state->state) {
4!
2591
    TAOS_CHECK_GOTO(lastIterNext(&state->lastIter, &state->pLastRow), &lino, _err);
6!
2592

2593
    if (!state->pLastRow) {
6✔
2594
      if (state->pLastIter) {
1!
2595
        code = lastIterClose(&state->pLastIter);
×
2596
        if (code != TSDB_CODE_SUCCESS) {
×
2597
          tsdbError("vgId:%d, %s close last iter failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
2598
                    tstrerror(code));
2599
          TAOS_RETURN(code);
×
2600
        }
2601
      }
2602

2603
      clearLastFileSet(state);
1✔
2604
      state->state = SFSNEXTROW_FILESET;
1✔
2605
      goto _next_fileset;
1✔
2606
    } else {
2607
      *ppRow = state->pLastRow;
5✔
2608
      state->pLastRow = NULL;
5✔
2609

2610
      TAOS_RETURN(code);
5✔
2611
    }
2612
  }
2613

2614
  if (SFSNEXTROW_INDEXLIST == state->state) {
×
2615
    SBrinBlk *pBrinBlk = NULL;
4✔
2616
  _next_brinindex:
4✔
2617
    if (--state->iBrinIndex < 0) {
4!
2618
      if (state->pLastRow) {
×
2619
        state->state = SFSNEXTROW_NEXTSTTROW;
×
2620
        *ppRow = state->pLastRow;
×
2621
        state->pLastRow = NULL;
×
2622
        return code;
×
2623
      }
2624

2625
      clearLastFileSet(state);
×
2626
      goto _next_fileset;
×
2627
    } else {
2628
      pBrinBlk = taosArrayGet(state->pIndexList, state->iBrinIndex);
4✔
2629
    }
2630

2631
    if (!state->pBrinBlock) {
4!
2632
      state->pBrinBlock = &state->brinBlock;
4✔
2633
    } else {
2634
      tBrinBlockClear(&state->brinBlock);
×
2635
    }
2636

2637
    TAOS_CHECK_GOTO(tsdbDataFileReadBrinBlock(state->pr->pFileReader, pBrinBlk, &state->brinBlock), &lino, _err);
4!
2638

2639
    state->iBrinRecord = state->brinBlock.numOfRecords - 1;
4✔
2640
    state->state = SFSNEXTROW_BRINBLOCK;
4✔
2641
  }
2642

2643
  if (SFSNEXTROW_BRINBLOCK == state->state) {
×
2644
  _next_brinrecord:
4✔
2645
    if (state->iBrinRecord < 0) {  // empty brin block, goto _next_brinindex
4!
2646
      tBrinBlockClear(&state->brinBlock);
×
2647
      goto _next_brinindex;
×
2648
    }
2649

2650
    TAOS_CHECK_GOTO(tBrinBlockGet(&state->brinBlock, state->iBrinRecord, &state->brinRecord), &lino, _err);
4!
2651

2652
    SBrinRecord *pRecord = &state->brinRecord;
4✔
2653
    if (pRecord->uid != state->uid) {
4!
2654
      // TODO: goto next brin block early
2655
      --state->iBrinRecord;
×
2656
      goto _next_brinrecord;
×
2657
    }
2658

2659
    state->state = SFSNEXTROW_BRINRECORD;
4✔
2660
  }
2661

2662
  if (SFSNEXTROW_BRINRECORD == state->state) {
×
2663
    SBrinRecord *pRecord = &state->brinRecord;
4✔
2664

2665
    if (!state->pBlockData) {
4!
2666
      state->pBlockData = &state->blockData;
4✔
2667

2668
      TAOS_CHECK_GOTO(tBlockDataCreate(&state->blockData), &lino, _err);
4!
2669
    } else {
2670
      tBlockDataReset(state->pBlockData);
×
2671
    }
2672

2673
    if (aCols[0] == PRIMARYKEY_TIMESTAMP_COL_ID) {
4!
2674
      --nCols;
4✔
2675
      ++aCols;
4✔
2676
    }
2677

2678
    TAOS_CHECK_GOTO(tsdbDataFileReadBlockDataByColumn(state->pr->pFileReader, pRecord, state->pBlockData,
4!
2679
                                                      state->pTSchema, aCols, nCols),
2680
                    &lino, _err);
2681

2682
    state->nRow = state->blockData.nRow;
4✔
2683
    state->iRow = state->nRow - 1;
4✔
2684

2685
    state->state = SFSNEXTROW_BLOCKROW;
4✔
2686
  }
2687

2688
  if (SFSNEXTROW_BLOCKROW == state->state) {
×
2689
    if (state->iRow < 0) {
4!
2690
      --state->iBrinRecord;
×
2691
      goto _next_brinrecord;
×
2692
    }
2693

2694
    state->row = tsdbRowFromBlockData(state->pBlockData, state->iRow);
4✔
2695
    if (!state->pLastIter) {
4!
2696
      *ppRow = &state->row;
×
2697
      --state->iRow;
×
2698
      return code;
4✔
2699
    }
2700

2701
    if (!state->pLastRow) {
4!
2702
      // get next row from fslast and process with fs row, --state->Row if select fs row
2703
      TAOS_CHECK_GOTO(lastIterNext(&state->lastIter, &state->pLastRow), &lino, _err);
4!
2704
    }
2705

2706
    if (!state->pLastRow) {
4!
2707
      if (state->pLastIter) {
4!
2708
        code = lastIterClose(&state->pLastIter);
4✔
2709
        if (code != TSDB_CODE_SUCCESS) {
4!
2710
          tsdbError("vgId:%d, %s close last iter failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
2711
                    tstrerror(code));
2712
          TAOS_RETURN(code);
×
2713
        }
2714
      }
2715

2716
      *ppRow = &state->row;
4✔
2717
      --state->iRow;
4✔
2718
      return code;
4✔
2719
    }
2720

2721
    // process state->pLastRow & state->row
2722
    TSKEY rowTs = TSDBROW_TS(&state->row);
×
2723
    TSKEY lastRowTs = TSDBROW_TS(state->pLastRow);
×
2724
    if (lastRowTs > rowTs) {
×
2725
      *ppRow = state->pLastRow;
×
2726
      state->pLastRow = NULL;
×
2727

2728
      TAOS_RETURN(code);
×
2729
    } else if (lastRowTs < rowTs) {
×
2730
      *ppRow = &state->row;
×
2731
      --state->iRow;
×
2732

2733
      TAOS_RETURN(code);
×
2734
    } else {
2735
      // TODO: merge rows and *ppRow = mergedRow
2736
      SRowMerger *pMerger = &state->rowMerger;
×
2737
      code = tsdbRowMergerInit(pMerger, state->pTSchema);
×
2738
      if (code != TSDB_CODE_SUCCESS) {
×
2739
        tsdbError("vgId:%d, %s init row merger failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
2740
                  tstrerror(code));
2741
        TAOS_RETURN(code);
×
2742
      }
2743

2744
      TAOS_CHECK_GOTO(tsdbRowMergerAdd(pMerger, &state->row, state->pTSchema), &lino, _err);
×
2745
      TAOS_CHECK_GOTO(tsdbRowMergerAdd(pMerger, state->pLastRow, state->pTSchema), &lino, _err);
×
2746

2747
      if (state->pTSRow) {
×
2748
        taosMemoryFree(state->pTSRow);
×
2749
        state->pTSRow = NULL;
×
2750
      }
2751

2752
      TAOS_CHECK_GOTO(tsdbRowMergerGetRow(pMerger, &state->pTSRow), &lino, _err);
×
2753

2754
      state->row = tsdbRowFromTSRow(TSDBROW_VERSION(&state->row), state->pTSRow);
×
2755
      *ppRow = &state->row;
×
2756
      --state->iRow;
×
2757

2758
      tsdbRowMergerClear(pMerger);
×
2759

2760
      TAOS_RETURN(code);
×
2761
    }
2762
  }
2763

2764
_err:
×
2765
  clearLastFileSet(state);
×
2766

2767
  *ppRow = NULL;
×
2768

2769
  if (code) {
×
2770
    tsdbError("tsdb/cache: vgId:%d, %s failed at line %d since %s.", TD_VID(pTsdb->pVnode), __func__, lino,
×
2771
              tstrerror(code));
2772
  }
2773

2774
  TAOS_RETURN(code);
×
2775
}
2776

2777
typedef enum SMEMNEXTROWSTATES {
2778
  SMEMNEXTROW_ENTER,
2779
  SMEMNEXTROW_NEXT,
2780
} SMEMNEXTROWSTATES;
2781

2782
typedef struct SMemNextRowIter {
2783
  SMEMNEXTROWSTATES state;
2784
  STbData          *pMem;  // [input]
2785
  STbDataIter       iter;  // mem buffer skip list iterator
2786
  int64_t           lastTs;
2787
} SMemNextRowIter;
2788

2789
static int32_t getNextRowFromMem(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast, int16_t *aCols,
363✔
2790
                                 int nCols) {
2791
  SMemNextRowIter *state = (SMemNextRowIter *)iter;
363✔
2792
  int32_t          code = 0;
363✔
2793
  *pIgnoreEarlierTs = false;
363✔
2794
  switch (state->state) {
363!
2795
    case SMEMNEXTROW_ENTER: {
350✔
2796
      if (state->pMem != NULL) {
350!
2797
        /*
2798
        if (state->pMem->maxKey <= state->lastTs) {
2799
          *ppRow = NULL;
2800
          *pIgnoreEarlierTs = true;
2801

2802
          TAOS_RETURN(code);
2803
        }
2804
        */
2805
        tsdbTbDataIterOpen(state->pMem, NULL, 1, &state->iter);
350✔
2806

2807
        TSDBROW *pMemRow = tsdbTbDataIterGet(&state->iter);
350!
2808
        if (pMemRow) {
350!
2809
          *ppRow = pMemRow;
350✔
2810
          state->state = SMEMNEXTROW_NEXT;
350✔
2811

2812
          TAOS_RETURN(code);
350✔
2813
        }
2814
      }
2815

2816
      *ppRow = NULL;
×
2817

2818
      TAOS_RETURN(code);
×
2819
    }
2820
    case SMEMNEXTROW_NEXT:
13✔
2821
      if (tsdbTbDataIterNext(&state->iter)) {
13✔
2822
        *ppRow = tsdbTbDataIterGet(&state->iter);
6!
2823

2824
        TAOS_RETURN(code);
6✔
2825
      } else {
2826
        *ppRow = NULL;
7✔
2827

2828
        TAOS_RETURN(code);
7✔
2829
      }
2830
    default:
×
2831
      break;
×
2832
  }
2833

2834
_err:
×
2835
  *ppRow = NULL;
×
2836

2837
  TAOS_RETURN(code);
×
2838
}
2839

2840
static bool tsdbKeyDeleted(TSDBKEY *key, SArray *pSkyline, int64_t *iSkyline) {
764✔
2841
  bool deleted = false;
764✔
2842
  while (*iSkyline > 0) {
764✔
2843
    TSDBKEY *pItemBack = (TSDBKEY *)taosArrayGet(pSkyline, *iSkyline);
6✔
2844
    TSDBKEY *pItemFront = (TSDBKEY *)taosArrayGet(pSkyline, *iSkyline - 1);
6✔
2845

2846
    if (key->ts > pItemBack->ts) {
6✔
2847
      return false;
4✔
2848
    } else if (key->ts >= pItemFront->ts && key->ts <= pItemBack->ts) {
2!
2849
      if (key->version <= pItemFront->version || (key->ts == pItemBack->ts && key->version <= pItemBack->version)) {
1!
2850
        // if (key->version <= pItemFront->version || key->version <= pItemBack->version) {
2851
        return true;
1✔
2852
      } else {
2853
        if (*iSkyline > 1) {
×
2854
          --*iSkyline;
×
2855
        } else {
2856
          return false;
×
2857
        }
2858
      }
2859
    } else {
2860
      if (*iSkyline > 1) {
1!
2861
        --*iSkyline;
×
2862
      } else {
2863
        return false;
1✔
2864
      }
2865
    }
2866
  }
2867

2868
  return deleted;
758✔
2869
}
2870

2871
typedef int32_t (*_next_row_fn_t)(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast, int16_t *aCols,
2872
                                  int nCols);
2873
typedef int32_t (*_next_row_clear_fn_t)(void *iter);
2874

2875
typedef struct {
2876
  TSDBROW             *pRow;
2877
  bool                 stop;
2878
  bool                 next;
2879
  bool                 ignoreEarlierTs;
2880
  void                *iter;
2881
  _next_row_fn_t       nextRowFn;
2882
  _next_row_clear_fn_t nextRowClearFn;
2883
} TsdbNextRowState;
2884

2885
typedef struct CacheNextRowIter {
2886
  SArray           *pMemDelData;
2887
  SArray           *pSkyline;
2888
  int64_t           iSkyline;
2889
  SBlockIdx         idx;
2890
  SMemNextRowIter   memState;
2891
  SMemNextRowIter   imemState;
2892
  SFSNextRowIter    fsState;
2893
  TSDBROW           memRow, imemRow, fsLastRow, fsRow;
2894
  TsdbNextRowState  input[3];
2895
  SCacheRowsReader *pr;
2896
  STsdb            *pTsdb;
2897
} CacheNextRowIter;
2898

2899
int32_t clearNextRowFromFS(void *iter) {
787✔
2900
  int32_t code = 0;
787✔
2901

2902
  SFSNextRowIter *state = (SFSNextRowIter *)iter;
787✔
2903
  if (!state) {
787!
2904
    TAOS_RETURN(code);
×
2905
  }
2906

2907
  if (state->pLastIter) {
787!
2908
    code = lastIterClose(&state->pLastIter);
×
2909
    if (code != TSDB_CODE_SUCCESS) {
×
2910
      tsdbError("%s close last iter failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
2911
      TAOS_RETURN(code);
×
2912
    }
2913
  }
2914

2915
  if (state->pBlockData) {
787✔
2916
    tBlockDataDestroy(state->pBlockData);
4✔
2917
    state->pBlockData = NULL;
4✔
2918
  }
2919

2920
  if (state->pBrinBlock) {
787✔
2921
    tBrinBlockDestroy(state->pBrinBlock);
4✔
2922
    state->pBrinBlock = NULL;
4✔
2923
  }
2924

2925
  if (state->pIndexList) {
787✔
2926
    taosArrayDestroy(state->pIndexList);
28✔
2927
    state->pIndexList = NULL;
28✔
2928
  }
2929

2930
  if (state->pTSRow) {
787!
2931
    taosMemoryFree(state->pTSRow);
×
2932
    state->pTSRow = NULL;
×
2933
  }
2934

2935
  if (state->pRowIter->pSkyline) {
787✔
2936
    taosArrayDestroy(state->pRowIter->pSkyline);
752✔
2937
    state->pRowIter->pSkyline = NULL;
753✔
2938
  }
2939

2940
  TAOS_RETURN(code);
788✔
2941
}
2942

2943
static void clearLastFileSet(SFSNextRowIter *state) {
10,854✔
2944
  if (state->pLastIter) {
10,854!
2945
    int code = lastIterClose(&state->pLastIter);
×
2946
    if (code != TSDB_CODE_SUCCESS) {
×
2947
      tsdbError("%s close last iter failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
2948
      return;
×
2949
    }
2950
  }
2951

2952
  if (state->pBlockData) {
10,854!
2953
    tBlockDataDestroy(state->pBlockData);
×
2954
    state->pBlockData = NULL;
×
2955
  }
2956

2957
  if (state->pr->pFileReader) {
10,854✔
2958
    tsdbDataFileReaderClose(&state->pr->pFileReader);
25✔
2959
    state->pr->pFileReader = NULL;
25✔
2960

2961
    state->pr->pCurFileSet = NULL;
25✔
2962
  }
2963

2964
  if (state->pTSRow) {
10,854!
2965
    taosMemoryFree(state->pTSRow);
×
2966
    state->pTSRow = NULL;
×
2967
  }
2968

2969
  if (state->pRowIter->pSkyline) {
10,854✔
2970
    taosArrayDestroy(state->pRowIter->pSkyline);
1✔
2971
    state->pRowIter->pSkyline = NULL;
1✔
2972

2973
    void   *pe = NULL;
1✔
2974
    int32_t iter = 0;
1✔
2975
    while ((pe = tSimpleHashIterate(state->pr->pTableMap, pe, &iter)) != NULL) {
2✔
2976
      STableLoadInfo *pInfo = *(STableLoadInfo **)pe;
1✔
2977
      taosArrayDestroy(pInfo->pTombData);
1✔
2978
      pInfo->pTombData = NULL;
1✔
2979
    }
2980
  }
2981
}
2982

2983
static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTsdb, STSchema *pTSchema, tb_uid_t suid,
789✔
2984
                               SArray *pLDataIterArray, STsdbReadSnap *pReadSnap, int64_t lastTs,
2985
                               SCacheRowsReader *pr) {
2986
  int32_t code = 0, lino = 0;
789✔
2987

2988
  STbData *pMem = NULL;
789✔
2989
  if (pReadSnap->pMem) {
789!
2990
    pMem = tsdbGetTbDataFromMemTable(pReadSnap->pMem, suid, uid);
790✔
2991
  }
2992

2993
  STbData *pIMem = NULL;
790✔
2994
  if (pReadSnap->pIMem) {
790!
2995
    pIMem = tsdbGetTbDataFromMemTable(pReadSnap->pIMem, suid, uid);
×
2996
  }
2997

2998
  pIter->pTsdb = pTsdb;
790✔
2999

3000
  pIter->pMemDelData = NULL;
790✔
3001

3002
  TAOS_CHECK_GOTO(loadMemTombData(&pIter->pMemDelData, pMem, pIMem, pr->info.verRange.maxVer), &lino, _err);
790!
3003

3004
  pIter->idx = (SBlockIdx){.suid = suid, .uid = uid};
791✔
3005

3006
  pIter->fsState.pRowIter = pIter;
791✔
3007
  pIter->fsState.state = SFSNEXTROW_FS;
791✔
3008
  pIter->fsState.aDFileSet = pReadSnap->pfSetArray;
791✔
3009
  pIter->fsState.pBlockIdxExp = &pIter->idx;
791✔
3010
  pIter->fsState.pTSchema = pTSchema;
791✔
3011
  pIter->fsState.suid = suid;
791✔
3012
  pIter->fsState.uid = uid;
791✔
3013
  pIter->fsState.lastTs = lastTs;
791✔
3014
  pIter->fsState.pr = pr;
791✔
3015

3016
  pIter->input[0] = (TsdbNextRowState){&pIter->memRow, true, false, false, &pIter->memState, getNextRowFromMem, NULL};
791✔
3017
  pIter->input[1] = (TsdbNextRowState){&pIter->imemRow, true, false, false, &pIter->imemState, getNextRowFromMem, NULL};
791✔
3018
  pIter->input[2] =
791✔
3019
      (TsdbNextRowState){&pIter->fsRow, false, true, false, &pIter->fsState, getNextRowFromFS, clearNextRowFromFS};
791✔
3020

3021
  if (pMem) {
791✔
3022
    pIter->memState.pMem = pMem;
351✔
3023
    pIter->memState.state = SMEMNEXTROW_ENTER;
351✔
3024
    pIter->memState.lastTs = lastTs;
351✔
3025
    pIter->input[0].stop = false;
351✔
3026
    pIter->input[0].next = true;
351✔
3027
  }
3028

3029
  if (pIMem) {
791!
3030
    pIter->imemState.pMem = pIMem;
×
3031
    pIter->imemState.state = SMEMNEXTROW_ENTER;
×
3032
    pIter->imemState.lastTs = lastTs;
×
3033
    pIter->input[1].stop = false;
×
3034
    pIter->input[1].next = true;
×
3035
  }
3036

3037
  pIter->pr = pr;
791✔
3038

3039
_err:
791✔
3040
  TAOS_RETURN(code);
791✔
3041
}
3042

3043
static void nextRowIterClose(CacheNextRowIter *pIter) {
787✔
3044
  for (int i = 0; i < 3; ++i) {
3,150✔
3045
    if (pIter->input[i].nextRowClearFn) {
2,365✔
3046
      (void)pIter->input[i].nextRowClearFn(pIter->input[i].iter);
789✔
3047
    }
3048
  }
3049

3050
  if (pIter->pSkyline) {
785!
3051
    taosArrayDestroy(pIter->pSkyline);
×
3052
  }
3053

3054
  if (pIter->pMemDelData) {
785!
3055
    taosArrayDestroy(pIter->pMemDelData);
790✔
3056
  }
3057
}
786✔
3058

3059
// iterate next row non deleted backward ts, version (from high to low)
3060
static int32_t nextRowIterGet(CacheNextRowIter *pIter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast,
809✔
3061
                              int16_t *aCols, int nCols) {
3062
  int32_t code = 0, lino = 0;
809✔
3063

3064
  for (;;) {
1✔
3065
    for (int i = 0; i < 3; ++i) {
3,233✔
3066
      if (pIter->input[i].next && !pIter->input[i].stop) {
2,424!
3067
        TAOS_CHECK_GOTO(pIter->input[i].nextRowFn(pIter->input[i].iter, &pIter->input[i].pRow,
1,158!
3068
                                                  &pIter->input[i].ignoreEarlierTs, isLast, aCols, nCols),
3069
                        &lino, _err);
3070

3071
        if (pIter->input[i].pRow == NULL) {
1,157✔
3072
          pIter->input[i].stop = true;
394✔
3073
          pIter->input[i].next = false;
394✔
3074
        }
3075
      }
3076
    }
3077

3078
    if (pIter->input[0].stop && pIter->input[1].stop && pIter->input[2].stop) {
809!
3079
      *ppRow = NULL;
44✔
3080
      *pIgnoreEarlierTs =
44✔
3081
          (pIter->input[0].ignoreEarlierTs || pIter->input[1].ignoreEarlierTs || pIter->input[2].ignoreEarlierTs);
44!
3082

3083
      TAOS_RETURN(code);
806✔
3084
    }
3085

3086
    // select maxpoint(s) from mem, imem, fs and last
3087
    TSDBROW *max[4] = {0};
765✔
3088
    int      iMax[4] = {-1, -1, -1, -1};
765✔
3089
    int      nMax = 0;
765✔
3090
    SRowKey  maxKey = {.ts = TSKEY_MIN};
765✔
3091

3092
    for (int i = 0; i < 3; ++i) {
3,050✔
3093
      if (!pIter->input[i].stop && pIter->input[i].pRow != NULL) {
2,285!
3094
        STsdbRowKey tsdbRowKey = {0};
764✔
3095
        tsdbRowGetKey(pIter->input[i].pRow, &tsdbRowKey);
764✔
3096

3097
        // merging & deduplicating on client side
3098
        int c = tRowKeyCompare(&maxKey, &tsdbRowKey.key);
764✔
3099
        if (c <= 0) {
764✔
3100
          if (c < 0) {
763!
3101
            nMax = 0;
763✔
3102
            maxKey = tsdbRowKey.key;
763✔
3103
          }
3104

3105
          iMax[nMax] = i;
763✔
3106
          max[nMax++] = pIter->input[i].pRow;
763✔
3107
        }
3108
        pIter->input[i].next = false;
764✔
3109
      }
3110
    }
3111

3112
    // delete detection
3113
    TSDBROW *merge[4] = {0};
765✔
3114
    int      iMerge[4] = {-1, -1, -1, -1};
765✔
3115
    int      nMerge = 0;
765✔
3116
    for (int i = 0; i < nMax; ++i) {
1,529✔
3117
      TSDBKEY maxKey1 = TSDBROW_KEY(max[i]);
763✔
3118

3119
      if (!pIter->pSkyline) {
763✔
3120
        pIter->pSkyline = taosArrayInit(32, sizeof(TSDBKEY));
752✔
3121
        TSDB_CHECK_NULL(pIter->pSkyline, code, lino, _err, terrno);
749!
3122

3123
        uint64_t        uid = pIter->idx.uid;
749✔
3124
        STableLoadInfo *pInfo = getTableLoadInfo(pIter->pr, uid);
749✔
3125
        TSDB_CHECK_NULL(pInfo, code, lino, _err, TSDB_CODE_OUT_OF_MEMORY);
753!
3126

3127
        if (pInfo->pTombData == NULL) {
753✔
3128
          pInfo->pTombData = taosArrayInit(4, sizeof(SDelData));
750✔
3129
          TSDB_CHECK_NULL(pInfo->pTombData, code, lino, _err, terrno);
751!
3130
        }
3131

3132
        if (!taosArrayAddAll(pInfo->pTombData, pIter->pMemDelData)) {
754!
3133
          TAOS_CHECK_GOTO(terrno, &lino, _err);
×
3134
        }
3135

3136
        size_t delSize = TARRAY_SIZE(pInfo->pTombData);
754✔
3137
        if (delSize > 0) {
754✔
3138
          code = tsdbBuildDeleteSkyline(pInfo->pTombData, 0, (int32_t)(delSize - 1), pIter->pSkyline);
6✔
3139
          TAOS_CHECK_GOTO(code, &lino, _err);
6!
3140
        }
3141
        pIter->iSkyline = taosArrayGetSize(pIter->pSkyline) - 1;
754✔
3142
      }
3143

3144
      bool deleted = tsdbKeyDeleted(&maxKey1, pIter->pSkyline, &pIter->iSkyline);
765✔
3145
      if (!deleted) {
764✔
3146
        iMerge[nMerge] = iMax[i];
763✔
3147
        merge[nMerge++] = max[i];
763✔
3148
      }
3149

3150
      pIter->input[iMax[i]].next = deleted;
764✔
3151
    }
3152

3153
    if (nMerge > 0) {
766✔
3154
      pIter->input[iMerge[0]].next = true;
765✔
3155

3156
      *ppRow = merge[0];
765✔
3157

3158
      TAOS_RETURN(code);
765✔
3159
    }
3160
  }
3161

3162
_err:
×
3163

3164
  if (code) {
×
3165
    tsdbError("tsdb/cache: %s failed at line %d since %s.", __func__, lino, tstrerror(code));
×
3166
  }
3167

3168
  TAOS_RETURN(code);
×
3169
}
3170

3171
static int32_t initLastColArrayPartial(STSchema *pTSchema, SArray **ppColArray, int16_t *slotIds, int nCols) {
790✔
3172
  SArray *pColArray = taosArrayInit(nCols, sizeof(SLastCol));
790✔
3173
  if (NULL == pColArray) {
790!
3174
    TAOS_RETURN(terrno);
×
3175
  }
3176

3177
  for (int32_t i = 0; i < nCols; ++i) {
2,536✔
3178
    int16_t  slotId = slotIds[i];
1,745✔
3179
    SLastCol col = {.rowKey.ts = 0,
1,745✔
3180
                    .colVal = COL_VAL_NULL(pTSchema->columns[slotId].colId, pTSchema->columns[slotId].type)};
1,745✔
3181
    if (!taosArrayPush(pColArray, &col)) {
1,746!
3182
      TAOS_RETURN(terrno);
×
3183
    }
3184
  }
3185
  *ppColArray = pColArray;
791✔
3186

3187
  TAOS_RETURN(TSDB_CODE_SUCCESS);
791✔
3188
}
3189

3190
static int32_t cloneTSchema(STSchema *pSrc, STSchema **ppDst) {
118✔
3191
  int32_t len = sizeof(STSchema) + sizeof(STColumn) * pSrc->numOfCols;
118✔
3192
  *ppDst = taosMemoryMalloc(len);
118✔
3193
  if (NULL == *ppDst) {
118!
3194
    TAOS_RETURN(terrno);
×
3195
  }
3196
  memcpy(*ppDst, pSrc, len);
118✔
3197

3198
  TAOS_RETURN(TSDB_CODE_SUCCESS);
118✔
3199
}
3200

3201
static int32_t updateTSchema(int32_t sversion, SCacheRowsReader *pReader, uint64_t uid) {
357✔
3202
  if (NULL == pReader->pCurrSchema && sversion == pReader->pSchema->version) {
357✔
3203
    TAOS_RETURN(cloneTSchema(pReader->pSchema, &pReader->pCurrSchema));
118✔
3204
  }
3205

3206
  if (NULL != pReader->pCurrSchema && sversion == pReader->pCurrSchema->version) {
239!
3207
    TAOS_RETURN(TSDB_CODE_SUCCESS);
223✔
3208
  }
3209

3210
  taosMemoryFreeClear(pReader->pCurrSchema);
16!
3211
  TAOS_RETURN(
16✔
3212
      metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->info.suid, uid, sversion, &pReader->pCurrSchema));
3213
}
3214

3215
static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols,
266✔
3216
                            int nCols, int16_t *slotIds) {
3217
  int32_t   code = 0, lino = 0;
266✔
3218
  STSchema *pTSchema = pr->pSchema;  // metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1, 1);
266✔
3219
  int16_t   nLastCol = nCols;
266✔
3220
  int16_t   noneCol = 0;
266✔
3221
  bool      setNoneCol = false;
266✔
3222
  bool      hasRow = false;
266✔
3223
  bool      ignoreEarlierTs = false;
266✔
3224
  SArray   *pColArray = NULL;
266✔
3225
  SColVal  *pColVal = &(SColVal){0};
266✔
3226

3227
  TAOS_CHECK_RETURN(initLastColArrayPartial(pTSchema, &pColArray, slotIds, nCols));
266!
3228

3229
  SArray *aColArray = taosArrayInit(nCols, sizeof(int16_t));
266✔
3230
  if (NULL == aColArray) {
266!
3231
    taosArrayDestroy(pColArray);
×
3232

3233
    TAOS_RETURN(terrno);
×
3234
  }
3235

3236
  for (int i = 0; i < nCols; ++i) {
822✔
3237
    if (!taosArrayPush(aColArray, &aCols[i])) {
1,112!
3238
      taosArrayDestroy(pColArray);
×
3239

3240
      TAOS_RETURN(terrno);
×
3241
    }
3242
  }
3243

3244
  STsdbRowKey lastRowKey = {.key.ts = TSKEY_MAX};
266✔
3245

3246
  // inverse iterator
3247
  CacheNextRowIter iter = {0};
266✔
3248
  code =
3249
      nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->info.suid, pr->pLDataIterArray, pr->pReadSnap, pr->lastTs, pr);
266✔
3250
  TAOS_CHECK_GOTO(code, &lino, _err);
266!
3251

3252
  do {
3253
    TSDBROW *pRow = NULL;
284✔
3254
    code = nextRowIterGet(&iter, &pRow, &ignoreEarlierTs, true, TARRAY_DATA(aColArray), TARRAY_SIZE(aColArray));
284✔
3255

3256
    if (!pRow) {
284✔
3257
      break;
258✔
3258
    }
3259

3260
    hasRow = true;
250✔
3261

3262
    int32_t sversion = TSDBROW_SVERSION(pRow);
250✔
3263
    if (sversion != -1) {
250✔
3264
      TAOS_CHECK_GOTO(updateTSchema(sversion, pr, uid), &lino, _err);
146!
3265

3266
      pTSchema = pr->pCurrSchema;
146✔
3267
    }
3268
    // int16_t nCol = pTSchema->numOfCols;
3269

3270
    STsdbRowKey rowKey = {0};
250✔
3271
    tsdbRowGetKey(pRow, &rowKey);
250✔
3272

3273
    if (lastRowKey.key.ts == TSKEY_MAX) {  // first time
250✔
3274
      lastRowKey = rowKey;
239✔
3275

3276
      for (int16_t iCol = noneCol; iCol < nCols; ++iCol) {
737✔
3277
        if (iCol >= nLastCol) {
498!
3278
          break;
×
3279
        }
3280
        SLastCol *pCol = taosArrayGet(pColArray, iCol);
498✔
3281
        if (slotIds[iCol] > pTSchema->numOfCols - 1) {
498!
3282
          if (!setNoneCol) {
×
3283
            noneCol = iCol;
×
3284
            setNoneCol = true;
×
3285
          }
3286
          continue;
110✔
3287
        }
3288
        if (pCol->colVal.cid != pTSchema->columns[slotIds[iCol]].colId) {
498✔
3289
          continue;
80✔
3290
        }
3291
        if (slotIds[iCol] == 0) {
418✔
3292
          STColumn *pTColumn = &pTSchema->columns[0];
30✔
3293
          *pColVal = COL_VAL_VALUE(pTColumn->colId, ((SValue){.type = pTColumn->type, .val = rowKey.key.ts}));
30✔
3294

3295
          SLastCol colTmp = {.rowKey = rowKey.key, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID};
30✔
3296
          TAOS_CHECK_GOTO(tsdbCacheReallocSLastCol(&colTmp, NULL), &lino, _err);
30!
3297

3298
          taosArraySet(pColArray, 0, &colTmp);
30✔
3299
          continue;
30✔
3300
        }
3301
        tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal);
388✔
3302

3303
        *pCol = (SLastCol){.rowKey = rowKey.key, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID};
388✔
3304
        TAOS_CHECK_GOTO(tsdbCacheReallocSLastCol(pCol, NULL), &lino, _err);
388!
3305

3306
        if (!COL_VAL_IS_VALUE(pColVal)) {
388✔
3307
          if (!setNoneCol) {
26✔
3308
            noneCol = iCol;
15✔
3309
            setNoneCol = true;
15✔
3310
          }
3311
        } else {
3312
          int32_t aColIndex = taosArraySearchIdx(aColArray, &pColVal->cid, compareInt16Val, TD_EQ);
362✔
3313
          if (aColIndex >= 0) {
362✔
3314
            taosArrayRemove(aColArray, aColIndex);
193✔
3315
          }
3316
        }
3317
      }
3318
      if (!setNoneCol) {
239✔
3319
        // done, goto return pColArray
3320
        break;
224✔
3321
      } else {
3322
        continue;
15✔
3323
      }
3324
    }
3325

3326
    // merge into pColArray
3327
    setNoneCol = false;
11✔
3328
    for (int16_t iCol = noneCol; iCol < nCols; ++iCol) {
36✔
3329
      if (iCol >= nLastCol) {
25!
3330
        break;
×
3331
      }
3332
      // high version's column value
3333
      if (slotIds[iCol] > pTSchema->numOfCols - 1) {
25!
3334
        continue;
×
3335
      }
3336

3337
      SLastCol *lastColVal = (SLastCol *)taosArrayGet(pColArray, iCol);
25✔
3338
      if (lastColVal->colVal.cid != pTSchema->columns[slotIds[iCol]].colId) {
25!
3339
        continue;
×
3340
      }
3341
      SColVal *tColVal = &lastColVal->colVal;
25✔
3342
      if (COL_VAL_IS_VALUE(tColVal)) continue;
25✔
3343

3344
      tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal);
19✔
3345
      if (COL_VAL_IS_VALUE(pColVal)) {
19✔
3346
        SLastCol lastCol = {.rowKey = rowKey.key, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID};
16✔
3347
        TAOS_CHECK_GOTO(tsdbCacheReallocSLastCol(&lastCol, NULL), &lino, _err);
16!
3348

3349
        tsdbCacheFreeSLastColItem(lastColVal);
16✔
3350
        taosArraySet(pColArray, iCol, &lastCol);
16✔
3351
        int32_t aColIndex = taosArraySearchIdx(aColArray, &lastCol.colVal.cid, compareInt16Val, TD_EQ);
16✔
3352
        if (aColIndex >= 0) {
16!
3353
          taosArrayRemove(aColArray, aColIndex);
16✔
3354
        }
3355
      } else if (!COL_VAL_IS_VALUE(pColVal) && !setNoneCol) {
3!
3356
        noneCol = iCol;
3✔
3357
        setNoneCol = true;
3✔
3358
      }
3359
    }
3360
  } while (setNoneCol);
26✔
3361

3362
  if (!hasRow) {
266✔
3363
    if (ignoreEarlierTs) {
27!
3364
      taosArrayDestroy(pColArray);
×
3365
      pColArray = NULL;
×
3366
    } else {
3367
      taosArrayClear(pColArray);
27✔
3368
    }
3369
  }
3370
  *ppLastArray = pColArray;
266✔
3371

3372
  nextRowIterClose(&iter);
266✔
3373
  taosArrayDestroy(aColArray);
266✔
3374

3375
  TAOS_RETURN(code);
266✔
3376

3377
_err:
×
3378
  nextRowIterClose(&iter);
×
3379
  // taosMemoryFreeClear(pTSchema);
3380
  *ppLastArray = NULL;
×
3381
  taosArrayDestroyEx(pColArray, tsdbCacheFreeSLastColItem);
×
3382
  taosArrayDestroy(aColArray);
×
3383

3384
  if (code) {
×
3385
    tsdbError("tsdb/cache: vgId:%d, %s failed at line %d since %s.", TD_VID(pTsdb->pVnode), __func__, lino,
×
3386
              tstrerror(code));
3387
  }
3388

3389
  TAOS_RETURN(code);
×
3390
}
3391

3392
static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols,
524✔
3393
                               int nCols, int16_t *slotIds) {
3394
  int32_t   code = 0, lino = 0;
524✔
3395
  STSchema *pTSchema = pr->pSchema;  // metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1, 1);
524✔
3396
  int16_t   nLastCol = nCols;
524✔
3397
  int16_t   noneCol = 0;
524✔
3398
  bool      setNoneCol = false;
524✔
3399
  bool      hasRow = false;
524✔
3400
  bool      ignoreEarlierTs = false;
524✔
3401
  SArray   *pColArray = NULL;
524✔
3402
  SColVal  *pColVal = &(SColVal){0};
524✔
3403

3404
  TAOS_CHECK_RETURN(initLastColArrayPartial(pTSchema, &pColArray, slotIds, nCols));
524!
3405

3406
  SArray *aColArray = taosArrayInit(nCols, sizeof(int16_t));
524✔
3407
  if (NULL == aColArray) {
525!
3408
    taosArrayDestroy(pColArray);
×
3409

3410
    TAOS_RETURN(terrno);
×
3411
  }
3412

3413
  for (int i = 0; i < nCols; ++i) {
1,716✔
3414
    if (!taosArrayPush(aColArray, &aCols[i])) {
2,383!
3415
      taosArrayDestroy(pColArray);
×
3416

3417
      TAOS_RETURN(terrno);
×
3418
    }
3419
  }
3420

3421
  // inverse iterator
3422
  CacheNextRowIter iter = {0};
524✔
3423
  code =
3424
      nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->info.suid, pr->pLDataIterArray, pr->pReadSnap, pr->lastTs, pr);
524✔
3425
  TAOS_CHECK_GOTO(code, &lino, _err);
525!
3426

3427
  do {
3428
    TSDBROW *pRow = NULL;
525✔
3429
    code = nextRowIterGet(&iter, &pRow, &ignoreEarlierTs, false, TARRAY_DATA(aColArray), TARRAY_SIZE(aColArray));
525✔
3430

3431
    if (!pRow) {
523✔
3432
      break;
10✔
3433
    }
3434

3435
    hasRow = true;
513✔
3436

3437
    int32_t sversion = TSDBROW_SVERSION(pRow);
513✔
3438
    if (sversion != -1) {
513✔
3439
      TAOS_CHECK_GOTO(updateTSchema(sversion, pr, uid), &lino, _err);
211!
3440

3441
      pTSchema = pr->pCurrSchema;
210✔
3442
    }
3443
    // int16_t nCol = pTSchema->numOfCols;
3444

3445
    STsdbRowKey rowKey = {0};
512✔
3446
    tsdbRowGetKey(pRow, &rowKey);
512✔
3447

3448
    for (int16_t iCol = noneCol; iCol < nCols; ++iCol) {
1,581✔
3449
      if (iCol >= nLastCol) {
1,071!
3450
        break;
×
3451
      }
3452
      SLastCol *pCol = taosArrayGet(pColArray, iCol);
1,071✔
3453
      if (slotIds[iCol] > pTSchema->numOfCols - 1) {
1,071!
3454
        continue;
514✔
3455
      }
3456
      if (pCol->colVal.cid != pTSchema->columns[slotIds[iCol]].colId) {
1,071!
3457
        continue;
×
3458
      }
3459
      if (slotIds[iCol] == 0) {
1,071✔
3460
        STColumn *pTColumn = &pTSchema->columns[0];
513✔
3461
        *pColVal = COL_VAL_VALUE(pTColumn->colId, ((SValue){.type = pTColumn->type, .val = rowKey.key.ts}));
513✔
3462

3463
        SLastCol colTmp = {.rowKey = rowKey.key, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID};
513✔
3464
        TAOS_CHECK_GOTO(tsdbCacheReallocSLastCol(&colTmp, NULL), &lino, _err);
513!
3465

3466
        taosArraySet(pColArray, 0, &colTmp);
514✔
3467
        continue;
514✔
3468
      }
3469
      tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal);
558✔
3470

3471
      *pCol = (SLastCol){.rowKey = rowKey.key, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID};
558✔
3472
      TAOS_CHECK_GOTO(tsdbCacheReallocSLastCol(pCol, NULL), &lino, _err);
558!
3473

3474
      int32_t aColIndex = taosArraySearchIdx(aColArray, &pColVal->cid, compareInt16Val, TD_EQ);
557✔
3475
      if (aColIndex >= 0) {
557!
3476
        taosArrayRemove(aColArray, aColIndex);
557✔
3477
      }
3478
    }
3479

3480
    break;
510✔
3481
  } while (1);
3482

3483
  if (!hasRow) {
520✔
3484
    if (ignoreEarlierTs) {
10!
3485
      taosArrayDestroy(pColArray);
×
3486
      pColArray = NULL;
×
3487
    } else {
3488
      taosArrayClear(pColArray);
10✔
3489
    }
3490
  }
3491
  *ppLastArray = pColArray;
520✔
3492

3493
  nextRowIterClose(&iter);
520✔
3494
  taosArrayDestroy(aColArray);
525✔
3495

3496
  TAOS_RETURN(code);
525✔
3497

3498
_err:
×
3499
  nextRowIterClose(&iter);
×
3500

3501
  *ppLastArray = NULL;
×
3502
  taosArrayDestroyEx(pColArray, tsdbCacheFreeSLastColItem);
×
3503
  taosArrayDestroy(aColArray);
×
3504

3505
  if (code) {
×
3506
    tsdbError("tsdb/cache: vgId:%d, %s failed at line %d since %s.", TD_VID(pTsdb->pVnode), __func__, lino,
×
3507
              tstrerror(code));
3508
  }
3509

3510
  TAOS_RETURN(code);
×
3511
}
3512

3513
void tsdbCacheRelease(SLRUCache *pCache, LRUHandle *h) { tsdbLRUCacheRelease(pCache, h, false); }
×
3514

3515
void tsdbCacheSetCapacity(SVnode *pVnode, size_t capacity) {
355✔
3516
  taosLRUCacheSetCapacity(pVnode->pTsdb->lruCache, capacity);
355✔
3517
}
356✔
3518

3519
#ifdef BUILD_NO_CALL
3520
size_t tsdbCacheGetCapacity(SVnode *pVnode) { return taosLRUCacheGetCapacity(pVnode->pTsdb->lruCache); }
3521
#endif
3522

3523
size_t tsdbCacheGetUsage(SVnode *pVnode) {
957,395✔
3524
  size_t usage = 0;
957,395✔
3525
  if (pVnode->pTsdb != NULL) {
957,395!
3526
    usage = taosLRUCacheGetUsage(pVnode->pTsdb->lruCache);
957,395✔
3527
  }
3528

3529
  return usage;
957,395✔
3530
}
3531

3532
int32_t tsdbCacheGetElems(SVnode *pVnode) {
957,395✔
3533
  int32_t elems = 0;
957,395✔
3534
  if (pVnode->pTsdb != NULL) {
957,395!
3535
    elems = taosLRUCacheGetElems(pVnode->pTsdb->lruCache);
957,395✔
3536
  }
3537

3538
  return elems;
957,395✔
3539
}
3540

3541
// block cache
3542
static void getBCacheKey(int32_t fid, int64_t commitID, int64_t blkno, char *key, int *len) {
×
3543
  struct {
3544
    int32_t fid;
3545
    int64_t commitID;
3546
    int64_t blkno;
3547
  } bKey = {0};
×
3548

3549
  bKey.fid = fid;
×
3550
  bKey.commitID = commitID;
×
3551
  bKey.blkno = blkno;
×
3552

3553
  *len = sizeof(bKey);
×
3554
  memcpy(key, &bKey, *len);
×
3555
}
×
3556

3557
static int32_t tsdbCacheLoadBlockS3(STsdbFD *pFD, uint8_t **ppBlock) {
×
3558
  int32_t code = 0;
×
3559

3560
  int64_t block_offset = (pFD->blkno - 1) * tsS3BlockSize * pFD->szPage;
×
3561

3562
  TAOS_CHECK_RETURN(tcsGetObjectBlock(pFD->objName, block_offset, tsS3BlockSize * pFD->szPage, 0, ppBlock));
×
3563

3564
  tsdbTrace("block:%p load from s3", *ppBlock);
×
3565

3566
_exit:
×
3567
  return code;
×
3568
}
3569

3570
static void deleteBCache(const void *key, size_t keyLen, void *value, void *ud) {
×
3571
  (void)ud;
3572
  uint8_t *pBlock = (uint8_t *)value;
×
3573

3574
  taosMemoryFree(pBlock);
×
3575
}
×
3576

3577
int32_t tsdbCacheGetBlockS3(SLRUCache *pCache, STsdbFD *pFD, LRUHandle **handle) {
×
3578
  int32_t code = 0;
×
3579
  char    key[128] = {0};
×
3580
  int     keyLen = 0;
×
3581

3582
  getBCacheKey(pFD->fid, pFD->cid, pFD->blkno, key, &keyLen);
×
3583
  LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen);
×
3584
  if (!h) {
×
3585
    STsdb *pTsdb = pFD->pTsdb;
×
3586
    (void)taosThreadMutexLock(&pTsdb->bMutex);
×
3587

3588
    h = taosLRUCacheLookup(pCache, key, keyLen);
×
3589
    if (!h) {
×
3590
      uint8_t *pBlock = NULL;
×
3591
      code = tsdbCacheLoadBlockS3(pFD, &pBlock);
×
3592
      //  if table's empty or error, return code of -1
3593
      if (code != TSDB_CODE_SUCCESS || pBlock == NULL) {
×
3594
        (void)taosThreadMutexUnlock(&pTsdb->bMutex);
×
3595

3596
        *handle = NULL;
×
3597
        if (code == TSDB_CODE_SUCCESS && !pBlock) {
×
3598
          code = TSDB_CODE_OUT_OF_MEMORY;
×
3599
        }
3600

3601
        TAOS_RETURN(code);
×
3602
      }
3603

3604
      size_t              charge = tsS3BlockSize * pFD->szPage;
×
3605
      _taos_lru_deleter_t deleter = deleteBCache;
×
3606
      LRUStatus           status =
3607
          taosLRUCacheInsert(pCache, key, keyLen, pBlock, charge, deleter, NULL, &h, TAOS_LRU_PRIORITY_LOW, NULL);
×
3608
      if (status != TAOS_LRU_STATUS_OK) {
3609
        // code = -1;
3610
      }
3611
    }
3612

3613
    (void)taosThreadMutexUnlock(&pTsdb->bMutex);
×
3614
  }
3615

3616
  *handle = h;
×
3617

3618
  TAOS_RETURN(code);
×
3619
}
3620

3621
int32_t tsdbCacheGetPageS3(SLRUCache *pCache, STsdbFD *pFD, int64_t pgno, LRUHandle **handle) {
×
3622
  int32_t code = 0;
×
3623
  char    key[128] = {0};
×
3624
  int     keyLen = 0;
×
3625

3626
  getBCacheKey(pFD->fid, pFD->cid, pgno, key, &keyLen);
×
3627
  *handle = taosLRUCacheLookup(pCache, key, keyLen);
×
3628

3629
  return code;
×
3630
}
3631

3632
void tsdbCacheSetPageS3(SLRUCache *pCache, STsdbFD *pFD, int64_t pgno, uint8_t *pPage) {
×
3633
  char       key[128] = {0};
×
3634
  int        keyLen = 0;
×
3635
  LRUHandle *handle = NULL;
×
3636

3637
  getBCacheKey(pFD->fid, pFD->cid, pgno, key, &keyLen);
×
3638
  (void)taosThreadMutexLock(&pFD->pTsdb->pgMutex);
×
3639
  handle = taosLRUCacheLookup(pFD->pTsdb->pgCache, key, keyLen);
×
3640
  if (!handle) {
×
3641
    size_t              charge = pFD->szPage;
×
3642
    _taos_lru_deleter_t deleter = deleteBCache;
×
3643
    uint8_t            *pPg = taosMemoryMalloc(charge);
×
3644
    if (!pPg) {
×
3645
      return;  // ignore error with s3 cache and leave error untouched
×
3646
    }
3647
    memcpy(pPg, pPage, charge);
×
3648

3649
    LRUStatus status =
3650
        taosLRUCacheInsert(pCache, key, keyLen, pPg, charge, deleter, NULL, &handle, TAOS_LRU_PRIORITY_LOW, NULL);
×
3651
    if (status != TAOS_LRU_STATUS_OK) {
3652
      // ignore cache updating if not ok
3653
      // code = TSDB_CODE_OUT_OF_MEMORY;
3654
    }
3655
  }
3656
  (void)taosThreadMutexUnlock(&pFD->pTsdb->pgMutex);
×
3657

3658
  tsdbCacheRelease(pFD->pTsdb->pgCache, handle);
×
3659
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc