• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3558

17 Dec 2024 06:05AM UTC coverage: 59.778% (+1.6%) from 58.204%
#3558

push

travis-ci

web-flow
Merge pull request #29179 from taosdata/merge/mainto3.0

merge: form main to 3.0 branch

132787 of 287595 branches covered (46.17%)

Branch coverage included in aggregate %.

104 of 191 new or added lines in 5 files covered. (54.45%)

6085 existing lines in 168 files now uncovered.

209348 of 284746 relevant lines covered (73.52%)

8164844.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

61.29
/source/dnode/vnode/src/tsdb/tsdbCache.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#include "functionMgt.h"
16
#include "tcs.h"
17
#include "tsdb.h"
18
#include "tsdbDataFileRW.h"
19
#include "tsdbIter.h"
20
#include "tsdbReadUtil.h"
21
#include "vnd.h"
22

23
#define ROCKS_BATCH_SIZE (4096)
24

25
/*
 * Release `handle` from `cache` via taosLRUCacheRelease(), forwarding
 * `eraseIfLastRef` unchanged. A trace line is emitted when the underlying
 * call returns false.
 */
void tsdbLRUCacheRelease(SLRUCache *cache, LRUHandle *handle, bool eraseIfLastRef) {
  bool released = taosLRUCacheRelease(cache, handle, eraseIfLastRef);
  if (released) {
    return;
  }

  tsdbTrace(" release lru cache failed");
}
1,549,961✔
30

31
/*
 * Create the block LRU cache for `pTsdb` and initialise pTsdb->bMutex.
 *
 * The capacity is tsS3BlockCacheSize * block size * page size, where the block
 * size is tsS3BlockSize clamped to a minimum of 1024.
 *
 * Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY when the cache cannot be
 * created.
 */
static int32_t tsdbOpenBCache(STsdb *pTsdb) {
  int32_t code = 0;
  int32_t lino = 0;

  int32_t pageSize = pTsdb->pVnode->config.tsdbPageSize;
  int64_t blockSize = tsS3BlockSize;
  if (tsS3BlockSize <= 1024) {
    blockSize = 1024;
  }

  SLRUCache *pLru = taosLRUCacheInit((int64_t)tsS3BlockCacheSize * blockSize * pageSize, 0, .5);
  if (NULL == pLru) {
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err);
  }

  taosLRUCacheSetStrictCapacity(pLru, false);
  (void)taosThreadMutexInit(&pTsdb->bMutex, NULL);
  pTsdb->bCache = pLru;

_err:
  if (code) {
    tsdbError("tsdb/bcache: vgId:%d, %s failed at line %d since %s.", TD_VID(pTsdb->pVnode), __func__, lino,
              tstrerror(code));
  }

  TAOS_RETURN(code);
}
54

55
/*
 * Tear down the block cache of `pTsdb`, if one exists: erase unreferenced
 * entries (tracing the element count before and after), clean up the cache
 * and destroy pTsdb->bMutex.
 */
static void tsdbCloseBCache(STsdb *pTsdb) {
  SLRUCache *pLru = pTsdb->bCache;
  if (NULL == pLru) {
    return;
  }

  int32_t nElems = taosLRUCacheGetElems(pLru);
  tsdbTrace("vgId:%d, elems: %d", TD_VID(pTsdb->pVnode), nElems);

  taosLRUCacheEraseUnrefEntries(pLru);

  nElems = taosLRUCacheGetElems(pLru);
  tsdbTrace("vgId:%d, elems: %d", TD_VID(pTsdb->pVnode), nElems);

  taosLRUCacheCleanup(pLru);

  (void)taosThreadMutexDestroy(&pTsdb->bMutex);
}
10,180✔
69

70
/*
 * Create the page LRU cache for `pTsdb` and initialise pTsdb->pgMutex.
 *
 * The capacity is tsS3PageCacheSize pages of the vnode's tsdbPageSize.
 *
 * Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY when the cache cannot be
 * created.
 */
static int32_t tsdbOpenPgCache(STsdb *pTsdb) {
  int32_t code = 0;
  int32_t lino = 0;

  int32_t pageSize = pTsdb->pVnode->config.tsdbPageSize;

  SLRUCache *pLru = taosLRUCacheInit((int64_t)tsS3PageCacheSize * pageSize, 0, .5);
  if (NULL == pLru) {
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err);
  }

  taosLRUCacheSetStrictCapacity(pLru, false);
  (void)taosThreadMutexInit(&pTsdb->pgMutex, NULL);
  pTsdb->pgCache = pLru;

_err:
  if (code) {
    tsdbError("tsdb/pgcache: vgId:%d, open failed at line %d since %s.", TD_VID(pTsdb->pVnode), lino, tstrerror(code));
  }

  TAOS_RETURN(code);
}
92

93
/*
 * Tear down the page cache of `pTsdb`, if one exists: erase unreferenced
 * entries (tracing the element count before and after), clean up the cache
 * and destroy the mutex that tsdbOpenPgCache() initialised.
 */
static void tsdbClosePgCache(STsdb *pTsdb) {
  SLRUCache *pCache = pTsdb->pgCache;
  if (pCache) {
    int32_t elems = taosLRUCacheGetElems(pCache);
    tsdbTrace("vgId:%d, elems: %d", TD_VID(pTsdb->pVnode), elems);
    taosLRUCacheEraseUnrefEntries(pCache);
    elems = taosLRUCacheGetElems(pCache);
    tsdbTrace("vgId:%d, elems: %d", TD_VID(pTsdb->pVnode), elems);

    taosLRUCacheCleanup(pCache);

    // The page cache owns pgMutex (see tsdbOpenPgCache); bMutex belongs to the
    // block cache and is destroyed by tsdbCloseBCache.
    (void)taosThreadMutexDestroy(&pTsdb->pgMutex);
  }
}
10,180✔
107

108
#define ROCKS_KEY_LEN (sizeof(tb_uid_t) + sizeof(int16_t) + sizeof(int8_t))
109

110
enum {
111
  LFLAG_LAST_ROW = 0,
112
  LFLAG_LAST = 1,
113
};
114

115
typedef struct {
116
  tb_uid_t uid;
117
  int16_t  cid;
118
  int8_t   lflag;
119
} SLastKey;
120

121
#define IS_LAST_ROW_KEY(k) (((k).lflag & LFLAG_LAST) == LFLAG_LAST_ROW)
122
#define IS_LAST_KEY(k)     (((k).lflag & LFLAG_LAST) == LFLAG_LAST)
123

124
/*
 * Write the path of the RocksDB cache directory into `path`: the vnode's
 * primary directory for pTsdb->path followed by "<TD_DIRSEP>cache.rdb".
 * `path` is treated as a buffer of TSDB_FILENAME_LEN bytes.
 */
static void tsdbGetRocksPath(STsdb *pTsdb, char *path) {
  SVnode *vnode = pTsdb->pVnode;
  vnodeGetPrimaryDir(pTsdb->path, vnode->diskPrimary, vnode->pTfs, path, TSDB_FILENAME_LEN);

  // Append the cache directory name after whatever the call above produced.
  int32_t used = strlen(path);
  char   *tail = path + used;
  snprintf(tail, TSDB_FILENAME_LEN - used - 1, "%scache.rdb", TD_DIRSEP);
}
10,160✔
131

132
/* Name callback handed to rocksdb_comparator_create(); `state` is ignored. */
static const char *myCmpName(void *state) {
  static const char *const kName = "myCmp";

  (void)state;
  return kName;
}
136

137
/* Destructor callback handed to rocksdb_comparator_create(); nothing to release. */
static void myCmpDestroy(void *state) {
  (void)state;
}
10,179✔
138

139
static int myCmp(void *state, const char *a, size_t alen, const char *b, size_t blen) {
36,330,143✔
140
  (void)state;
141
  (void)alen;
142
  (void)blen;
143
  SLastKey *lhs = (SLastKey *)a;
36,330,143✔
144
  SLastKey *rhs = (SLastKey *)b;
36,330,143✔
145

146
  if (lhs->uid < rhs->uid) {
36,330,143✔
147
    return -1;
15,965,385✔
148
  } else if (lhs->uid > rhs->uid) {
20,364,758✔
149
    return 1;
5,501,385✔
150
  }
151

152
  if (lhs->cid < rhs->cid) {
14,863,373✔
153
    return -1;
3,034,047✔
154
  } else if (lhs->cid > rhs->cid) {
11,829,326✔
155
    return 1;
3,014,259✔
156
  }
157

158
  if ((lhs->lflag & LFLAG_LAST) < (rhs->lflag & LFLAG_LAST)) {
8,815,067✔
159
    return -1;
1,303,347✔
160
  } else if ((lhs->lflag & LFLAG_LAST) > (rhs->lflag & LFLAG_LAST)) {
7,511,720✔
161
    return 1;
3,351,008✔
162
  }
163

164
  return 0;
4,160,712✔
165
}
166

167
/*
 * Open the RocksDB instance backing the last/last_row cache of `pTsdb` and
 * initialise every member of pTsdb->rCache that this file uses.
 *
 * All RocksDB objects are created into locals first and published into
 * pTsdb->rCache only after every step has succeeded, so a failed open never
 * leaves pointers to destroyed objects behind. On failure everything created
 * so far is released in reverse order of creation.
 *
 * Returns 0 on success; TSDB_CODE_OUT_OF_MEMORY when a RocksDB object or the
 * database cannot be created, the mutex-init error, or terrno when the
 * context array cannot be allocated.
 */
static int32_t tsdbOpenRocksCache(STsdb *pTsdb) {
  int32_t code = 0, lino = 0;

  rocksdb_comparator_t                *cmp = NULL;
  rocksdb_block_based_table_options_t *tableoptions = NULL;
  rocksdb_options_t                   *options = NULL;
  rocksdb_writeoptions_t              *writeoptions = NULL;
  rocksdb_readoptions_t               *readoptions = NULL;
  rocksdb_flushoptions_t              *flushoptions = NULL;
  rocksdb_writebatch_t                *writebatch = NULL;
  rocksdb_t                           *db = NULL;
  SArray                              *ctxArray = NULL;
  bool                                 mutexInited = false;
  char                                *err = NULL;
  char                                 cachePath[TSDB_FILENAME_LEN] = {0};

  cmp = rocksdb_comparator_create(NULL, myCmpDestroy, myCmp, myCmpName);
  if (NULL == cmp) {
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err);
  }

  tableoptions = rocksdb_block_based_options_create();
  if (NULL == tableoptions) {
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err);
  }

  options = rocksdb_options_create();
  if (NULL == options) {
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err);
  }

  rocksdb_options_set_create_if_missing(options, 1);
  rocksdb_options_set_comparator(options, cmp);
  rocksdb_options_set_block_based_table_factory(options, tableoptions);
  rocksdb_options_set_info_log_level(options, 2);  // WARN_LEVEL
  // rocksdb_options_set_inplace_update_support(options, 1);
  // rocksdb_options_set_allow_concurrent_memtable_write(options, 0);

  writeoptions = rocksdb_writeoptions_create();
  if (NULL == writeoptions) {
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err);
  }
  rocksdb_writeoptions_disable_WAL(writeoptions, 1);

  readoptions = rocksdb_readoptions_create();
  if (NULL == readoptions) {
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err);
  }

  tsdbGetRocksPath(pTsdb, cachePath);

  db = rocksdb_open(options, cachePath, &err);
  if (NULL == db) {
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err);
    rocksdb_free(err);

    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err);
  }

  flushoptions = rocksdb_flushoptions_create();
  if (NULL == flushoptions) {
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err);
  }

  writebatch = rocksdb_writebatch_create();
  if (NULL == writebatch) {
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err);
  }

  TAOS_CHECK_GOTO(taosThreadMutexInit(&pTsdb->rCache.writeBatchMutex, NULL), &lino, _err);
  mutexInited = true;

  ctxArray = taosArrayInit(16, sizeof(SLastUpdateCtx));
  if (!ctxArray) {
    TAOS_CHECK_GOTO(terrno, &lino, _err);
  }

  // Everything succeeded: publish the handles.
  pTsdb->rCache.tableoptions = tableoptions;
  pTsdb->rCache.writebatch = writebatch;
  pTsdb->rCache.my_comparator = cmp;
  pTsdb->rCache.options = options;
  pTsdb->rCache.writeoptions = writeoptions;
  pTsdb->rCache.readoptions = readoptions;
  pTsdb->rCache.flushoptions = flushoptions;
  pTsdb->rCache.db = db;
  pTsdb->rCache.sver = -1;
  pTsdb->rCache.suid = -1;
  pTsdb->rCache.uid = -1;
  pTsdb->rCache.pTSchema = NULL;
  pTsdb->rCache.ctxArray = ctxArray;

  TAOS_RETURN(code);

_err:
  tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));

  // Release in reverse order of creation; only what was actually created.
  if (mutexInited) {
    (void)taosThreadMutexDestroy(&pTsdb->rCache.writeBatchMutex);
  }
  if (writebatch) {
    rocksdb_writebatch_destroy(writebatch);
  }
  if (flushoptions) {
    rocksdb_flushoptions_destroy(flushoptions);
  }
  if (db) {
    // Close the handle opened above; pTsdb->rCache.db has not been assigned on this path.
    rocksdb_close(db);
  }
  if (readoptions) {
    rocksdb_readoptions_destroy(readoptions);
  }
  if (writeoptions) {
    rocksdb_writeoptions_destroy(writeoptions);
  }
  if (options) {
    rocksdb_options_destroy(options);
  }
  if (tableoptions) {
    rocksdb_block_based_options_destroy(tableoptions);
  }
  if (cmp) {
    rocksdb_comparator_destroy(cmp);
  }

  TAOS_RETURN(code);
}
258

259
/*
 * Close the RocksDB instance of `pTsdb` and release every object held in
 * pTsdb->rCache: the write-batch mutex, the RocksDB option/batch handles, the
 * comparator, the cached schema and the update-context array.
 */
static void tsdbCloseRocksCache(STsdb *pTsdb) {
  SRocksCache *cache = &pTsdb->rCache;

  // Database first, then the objects it was opened with.
  rocksdb_close(cache->db);
  (void)taosThreadMutexDestroy(&cache->writeBatchMutex);

  rocksdb_flushoptions_destroy(cache->flushoptions);
  rocksdb_writebatch_destroy(cache->writebatch);
  rocksdb_readoptions_destroy(cache->readoptions);
  rocksdb_writeoptions_destroy(cache->writeoptions);
  rocksdb_options_destroy(cache->options);
  rocksdb_block_based_options_destroy(cache->tableoptions);
  rocksdb_comparator_destroy(cache->my_comparator);

  taosMemoryFree(cache->pTSchema);
  taosArrayDestroy(cache->ctxArray);
}
10,180✔
272

273
/*
 * Write the pending RocksDB write batch of `pTsdb` when it holds at least
 * ROCKS_BATCH_SIZE entries, or when `force` is set and it is non-empty.
 * A write error is logged; the batch is cleared after the write either way.
 */
static void rocksMayWrite(STsdb *pTsdb, bool force) {
  rocksdb_writebatch_t *batch = pTsdb->rCache.writebatch;
  int                   pending = rocksdb_writebatch_count(batch);

  bool shouldWrite = (force && pending > 0) || pending >= ROCKS_BATCH_SIZE;
  if (!shouldWrite) {
    return;
  }

  char *errMsg = NULL;
  rocksdb_write(pTsdb->rCache.db, pTsdb->rCache.writeoptions, batch, &errMsg);
  if (errMsg != NULL) {
    tsdbError("vgId:%d, %s failed at line %d, count: %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, pending,
              errMsg);
    rocksdb_free(errMsg);
  }

  rocksdb_writebatch_clear(batch);
}
1,458,043✔
290

291
typedef struct {
292
  TSKEY  ts;
293
  int8_t dirty;
294
  struct {
295
    int16_t cid;
296
    int8_t  type;
297
    int8_t  flag;
298
    union {
299
      int64_t val;
300
      struct {
301
        uint32_t nData;
302
        uint8_t *pData;
303
      };
304
    } value;
305
  } colVal;
306
} SLastColV0;
307

308
/*
 * Fill `pLastCol` from the SLastColV0 header found at `value`.
 *
 * numOfPKs is set to 0 and cacheStatus to TSDB_LAST_CACHE_VALID. For
 * variable-length column types pData is pointed at the bytes that follow the
 * header inside `value` (no copy), or set to NULL when nData is 0.
 *
 * Returns the number of bytes consumed: sizeof(SLastColV0), plus nData for
 * variable-length types.
 */
static int32_t tsdbCacheDeserializeV0(char const *value, SLastCol *pLastCol) {
  SLastColV0 *pV0 = (SLastColV0 *)value;

  pLastCol->cacheStatus = TSDB_LAST_CACHE_VALID;
  pLastCol->dirty = pV0->dirty;

  pLastCol->rowKey.ts = pV0->ts;
  pLastCol->rowKey.numOfPKs = 0;

  pLastCol->colVal.cid = pV0->colVal.cid;
  pLastCol->colVal.flag = pV0->colVal.flag;
  pLastCol->colVal.value.type = pV0->colVal.type;

  if (!IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) {
    pLastCol->colVal.value.val = pV0->colVal.value.val;
    return sizeof(SLastColV0);
  }

  pLastCol->colVal.value.nData = pV0->colVal.value.nData;
  if (pLastCol->colVal.value.nData > 0) {
    // Variable-length payload sits immediately after the fixed header.
    pLastCol->colVal.value.pData = (uint8_t *)(&pV0[1]);
  } else {
    pLastCol->colVal.value.pData = NULL;
  }

  return sizeof(SLastColV0) + pV0->colVal.value.nData;
}
332

333
/*
 * Decode a serialized cache value of `size` bytes into a newly allocated
 * SLastCol returned through `ppLastCol`.
 *
 * Layout: SLastColV0 header (+ column bytes), then optionally
 * version (1 byte), numOfPKs (1 byte), numOfPKs x (SValue + pk bytes) and,
 * for version >= LAST_COL_VERSION_2, one cacheStatus byte. A buffer that ends
 * right after the V0 part is accepted as version 0.
 *
 * Variable-length pointers in the result refer into `value`; nothing is
 * copied. Every read is bounds-checked against `size` before it happens.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA when `value` is NULL,
 * terrno when allocation fails, or TSDB_CODE_INVALID_DATA_FMT when the buffer
 * is too short or declares more primary keys than SLastCol can hold. On any
 * failure `*ppLastCol` is left untouched.
 */
static int32_t tsdbCacheDeserialize(char const *value, size_t size, SLastCol **ppLastCol) {
  if (!value) {
    return TSDB_CODE_INVALID_PARA;
  }

  // The V0 header is read unconditionally below, so it must fit in the buffer.
  if (size < sizeof(SLastColV0)) {
    TAOS_RETURN(TSDB_CODE_INVALID_DATA_FMT);
  }

  SLastCol *pLastCol = taosMemoryCalloc(1, sizeof(SLastCol));
  if (NULL == pLastCol) {
    return terrno;
  }

  int32_t v0Len = tsdbCacheDeserializeV0(value, pLastCol);
  if (v0Len < 0 || (size_t)v0Len > size) {
    goto _invalid;
  }

  size_t offset = (size_t)v0Len;
  if (offset == size) {
    // version 0
    *ppLastCol = pLastCol;

    TAOS_RETURN(TSDB_CODE_SUCCESS);
  }

  // version + numOfPKs
  if (size - offset < sizeof(int8_t) + sizeof(uint8_t)) {
    goto _invalid;
  }

  int8_t version = *(const int8_t *)(value + offset);
  offset += sizeof(int8_t);

  uint8_t numOfPKs = *(const uint8_t *)(value + offset);
  offset += sizeof(uint8_t);

  // Never index past the fixed-size pks array of the row key.
  if (numOfPKs > sizeof(pLastCol->rowKey.pks) / sizeof(pLastCol->rowKey.pks[0])) {
    goto _invalid;
  }
  pLastCol->rowKey.numOfPKs = numOfPKs;

  // pks
  for (uint8_t i = 0; i < numOfPKs; i++) {
    SValue *pPk = &pLastCol->rowKey.pks[i];

    if (size - offset < sizeof(SValue)) {
      goto _invalid;
    }
    // memcpy: `value + offset` is not guaranteed to be aligned for SValue.
    (void)memcpy(pPk, value + offset, sizeof(SValue));
    offset += sizeof(SValue);

    if (IS_VAR_DATA_TYPE(pPk->type)) {
      uint32_t nData = pPk->nData;

      pPk->pData = NULL;
      if (nData > 0) {
        if (size - offset < nData) {
          goto _invalid;
        }
        pPk->pData = (uint8_t *)value + offset;
        offset += nData;
      }
    }
  }

  if (version >= LAST_COL_VERSION_2) {
    if (offset >= size) {
      goto _invalid;
    }
    pLastCol->cacheStatus = *(const uint8_t *)(value + offset);
  }

  *ppLastCol = pLastCol;

  TAOS_RETURN(TSDB_CODE_SUCCESS);

_invalid:
  taosMemoryFreeClear(pLastCol);

  TAOS_RETURN(TSDB_CODE_INVALID_DATA_FMT);
}
391

392
/*
393
typedef struct {
394
  SLastColV0 lastColV0;
395
  char       colData[];
396
  int8_t     version;
397
  uint8_t    numOfPKs;
398
  SValue     pks[0];
399
  char       pk0Data[];
400
  SValue     pks[1];
401
  char       pk1Data[];
402
  ...
403
} SLastColDisk;
404
*/
405
/*
 * Write the SLastColV0 header for `pLastCol` at `value`; for variable-length
 * column types the column bytes are copied directly after the header.
 *
 * The caller must provide a buffer large enough for the returned length.
 *
 * Returns the number of bytes written: sizeof(SLastColV0), plus nData for
 * variable-length types.
 */
static int32_t tsdbCacheSerializeV0(char const *value, SLastCol *pLastCol) {
  SLastColV0 *pV0 = (SLastColV0 *)value;

  pV0->ts = pLastCol->rowKey.ts;
  pV0->dirty = pLastCol->dirty;
  pV0->colVal.cid = pLastCol->colVal.cid;
  pV0->colVal.flag = pLastCol->colVal.flag;
  pV0->colVal.type = pLastCol->colVal.value.type;

  if (!IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) {
    pV0->colVal.value.val = pLastCol->colVal.value.val;
    return sizeof(SLastColV0);
  }

  pV0->colVal.value.nData = pLastCol->colVal.value.nData;
  if (pLastCol->colVal.value.nData > 0) {
    // Payload goes right behind the fixed header.
    memcpy(&pV0[1], pLastCol->colVal.value.pData, pLastCol->colVal.value.nData);
  }
  return sizeof(SLastColV0) + pLastCol->colVal.value.nData;
}
426

427
/*
 * Serialize `pLastCol` into a freshly allocated buffer.
 *
 * Layout: V0 part (see tsdbCacheSerializeV0), version byte
 * (LAST_COL_VERSION), numOfPKs byte, each primary key as an SValue followed
 * by its bytes for variable-length types, and a final cacheStatus byte.
 *
 * `*size` receives the total length and `*value` the buffer, which the caller
 * owns. Returns TSDB_CODE_SUCCESS, or terrno when the allocation fails (in
 * which case `*value` is NULL and `*size` still holds the computed length).
 */
static int32_t tsdbCacheSerialize(SLastCol *pLastCol, char **value, size_t *size) {
  // Pass 1: compute the exact encoded length.
  size_t total = sizeof(SLastColV0);
  if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) {
    total += pLastCol->colVal.value.nData;
  }
  total += sizeof(uint8_t) + sizeof(uint8_t) + sizeof(uint8_t);  // version + numOfPKs + cacheStatus

  for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) {
    total += sizeof(SValue);
    if (IS_VAR_DATA_TYPE(pLastCol->rowKey.pks[i].type)) {
      total += pLastCol->rowKey.pks[i].nData;
    }
  }
  *size = total;

  *value = taosMemoryMalloc(*size);
  if (NULL == *value) {
    TAOS_RETURN(terrno);
  }

  // Pass 2: emit the fields in order, advancing a write cursor.
  char *cursor = *value + tsdbCacheSerializeV0(*value, pLastCol);

  // version
  *(uint8_t *)cursor = LAST_COL_VERSION;
  cursor++;

  // numOfPKs
  *(uint8_t *)cursor = pLastCol->rowKey.numOfPKs;
  cursor++;

  // pks
  for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) {
    SValue *pPk = &pLastCol->rowKey.pks[i];

    *(SValue *)cursor = *pPk;
    cursor += sizeof(SValue);

    if (IS_VAR_DATA_TYPE(pPk->type)) {
      if (pPk->nData > 0) {
        memcpy(cursor, pPk->pData, pPk->nData);
      }
      cursor += pPk->nData;
    }
  }

  // cacheStatus
  *(uint8_t *)cursor = pLastCol->cacheStatus;

  TAOS_RETURN(TSDB_CODE_SUCCESS);
}
472

473
static int32_t tsdbCachePutToRocksdb(STsdb *pTsdb, SLastKey *pLastKey, SLastCol *pLastCol);
474

475
int tsdbCacheFlushDirty(const void *key, size_t klen, void *value, void *ud) {
17,800,208✔
476
  SLastCol *pLastCol = (SLastCol *)value;
17,800,208✔
477

478
  if (pLastCol->dirty) {
17,800,208✔
479
    STsdb *pTsdb = (STsdb *)ud;
1,323,329✔
480

481
    int32_t code = tsdbCachePutToRocksdb(pTsdb, (SLastKey *)key, pLastCol);
1,323,329✔
482
    if (code) {
1,323,357!
483
      tsdbError("tsdb/cache: vgId:%d, flush dirty lru failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code));
×
UNCOV
484
      return code;
×
485
    }
486

487
    pLastCol->dirty = 0;
1,323,357✔
488

489
    rocksMayWrite(pTsdb, false);
1,323,357✔
490
  }
491

492
  return 0;
17,800,897✔
493
}
494

495
/*
 * Decide whether `key` is covered by a delete recorded in `pSkyline`.
 *
 * `*iSkyline` is the index of the current "back" skyline point; the segment
 * examined is [*iSkyline - 1, *iSkyline]. The index is walked towards 1 while
 * the key lies before the current segment or is newer than the delete
 * versions of that segment, and the updated position is left in `*iSkyline`.
 *
 * Returns true when the key falls inside a segment and its version is not
 * newer than the front point's version (or, at the back timestamp exactly,
 * the back point's version). Returns false otherwise, including when
 * `*iSkyline` is not positive.
 */
static bool tsdbKeyDeleted(TSDBKEY *key, SArray *pSkyline, int64_t *iSkyline) {
  while (*iSkyline > 0) {
    TSDBKEY *pBack = (TSDBKEY *)taosArrayGet(pSkyline, *iSkyline);
    TSDBKEY *pFront = (TSDBKEY *)taosArrayGet(pSkyline, *iSkyline - 1);

    // Key is later than the current segment: nothing further back can cover it.
    if (key->ts > pBack->ts) {
      return false;
    }

    // From here on key->ts <= pBack->ts holds.
    if (key->ts >= pFront->ts) {
      bool olderThanFront = key->version <= pFront->version;
      bool olderAtBack = (key->ts == pBack->ts) && (key->version <= pBack->version);
      if (olderThanFront || olderAtBack) {
        // if (key->version <= pItemFront->version || key->version <= pItemBack->version) {
        return true;
      }
    }

    // Either before this segment or newer than its deletes: step back one
    // segment, or give up when already at the first one.
    if (*iSkyline <= 1) {
      return false;
    }
    --*iSkyline;
  }

  return false;
}
525

526
// Get next non-deleted row from imem
527
static TSDBROW *tsdbImemGetNextRow(STbDataIter *pTbIter, SArray *pSkyline, int64_t *piSkyline) {
405,523✔
528
  int32_t code = 0;
405,523✔
529

530
  if (tsdbTbDataIterNext(pTbIter)) {
405,523✔
531
    TSDBROW *pMemRow = tsdbTbDataIterGet(pTbIter);
405,357✔
532
    TSDBKEY  rowKey = TSDBROW_KEY(pMemRow);
405,357✔
533
    bool     deleted = tsdbKeyDeleted(&rowKey, pSkyline, piSkyline);
405,357✔
534
    if (!deleted) {
405,059✔
535
      return pMemRow;
399,133✔
536
    }
537
  }
538

539
  return NULL;
5,972✔
540
}
541

542
// Get first non-deleted row from imem
543
static TSDBROW *tsdbImemGetFirstRow(SMemTable *imem, STbData *pIMem, STbDataIter *pTbIter, SArray *pSkyline,
188,717✔
544
                                    int64_t *piSkyline) {
545
  int32_t code = 0;
188,717✔
546

547
  tsdbTbDataIterOpen(pIMem, NULL, 1, pTbIter);
188,717✔
548
  TSDBROW *pMemRow = tsdbTbDataIterGet(pTbIter);
188,716✔
549
  if (pMemRow) {
188,716✔
550
    // if non deleted, return the found row.
551
    TSDBKEY rowKey = TSDBROW_KEY(pMemRow);
188,547!
552
    bool    deleted = tsdbKeyDeleted(&rowKey, pSkyline, piSkyline);
188,547✔
553
    if (!deleted) {
188,547✔
554
      return pMemRow;
182,245✔
555
    }
556
  } else {
557
    return NULL;
169✔
558
  }
559

560
  // continue to find the non-deleted first row from imem, using get next row
561
  return tsdbImemGetNextRow(pTbIter, pSkyline, piSkyline);
6,302✔
562
}
563

564
/*
 * Invalidate the schema cached in pTsdb->rCache when it belongs to the given
 * table and `sver` is newer than the cached version.
 *
 * With suid > 0 the cached suid is compared; with suid == 0 the cached uid is
 * compared. On a match the cached sver and the matched id are reset to -1.
 * Nothing happens when no schema is cached or `sver` is not newer.
 */
void tsdbCacheInvalidateSchema(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int32_t sver) {
  SRocksCache *cache = &pTsdb->rCache;

  if (!cache->pTSchema || sver <= pTsdb->rCache.sver) {
    return;
  }

  if (suid > 0) {
    if (suid == cache->suid) {
      cache->sver = -1;
      cache->suid = -1;
    }
  } else if (suid == 0) {
    if (uid == cache->uid) {
      cache->sver = -1;
      cache->uid = -1;
    }
  }
}
577

578
/*
 * Make pTsdb->rCache.pTSchema the schema of version `sver` for the table
 * identified by `suid` (when > 0) or `uid` (when suid == 0).
 *
 * When a schema of the same version for the same table is already cached,
 * nothing is done. Otherwise the cached ids/version are overwritten, the old
 * schema is destroyed and a new one is fetched with metaGetTbTSchemaEx().
 *
 * Returns 0 on a cache hit, otherwise the result of metaGetTbTSchemaEx().
 */
static int32_t tsdbUpdateSkm(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int32_t sver) {
  SRocksCache *cache = &pTsdb->rCache;

  bool sameVersion = cache->pTSchema && sver == cache->sver;
  if (sameVersion) {
    bool superTableHit = suid > 0 && suid == cache->suid;
    bool plainTableHit = suid == 0 && uid == cache->uid;
    if (superTableHit || plainTableHit) {
      return 0;
    }
  }

  // Miss: remember the new identity and reload the schema.
  cache->suid = suid;
  cache->uid = uid;
  cache->sver = sver;
  tDestroyTSchema(cache->pTSchema);
  return metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &cache->pTSchema);
}
595

596
static int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SArray *updCtxArray);
597

598
/*
 * Collect last_row/last update contexts for table (`suid`, `uid`) from the
 * immutable memtable `imem` and apply them with tsdbCacheUpdate().
 *
 * Steps:
 *   1. Load the table's tomb data from imem and, when there is any, build a
 *      delete skyline from it.
 *   2. Take the first non-deleted row returned by tsdbImemGetFirstRow(); for
 *      every column push a LFLAG_LAST_ROW context, and a LFLAG_LAST context
 *      too when the column holds a value. Columns without a value are
 *      remembered in a hash of column ids.
 *   3. Keep reading rows with tsdbImemGetNextRow() until the hash is empty,
 *      pushing a LFLAG_LAST context for each remembered column that now has
 *      a value.
 *
 * pTsdb->rCache.ctxArray is used as scratch space and cleared before return.
 * The row iterator is tracked with `iterActive` so that it is closed exactly
 * once on every path, and never touched when it was not opened.
 *
 * Returns 0 on success or the first error code encountered.
 */
int32_t tsdbLoadFromImem(SMemTable *imem, int64_t suid, int64_t uid) {
  int32_t      code = 0;
  int32_t      lino = 0;
  STsdb       *pTsdb = imem->pTsdb;
  SArray      *pMemDelData = NULL;
  SArray      *pSkyline = NULL;
  int64_t      iSkyline = 0;
  STbDataIter  tbIter = {0};
  TSDBROW     *pMemRow = NULL;
  STSchema    *pTSchema = NULL;
  SSHashObj   *iColHash = NULL;
  int32_t      sver;
  int32_t      nCol;
  int32_t      iCol;
  SArray      *ctxArray = pTsdb->rCache.ctxArray;
  STsdbRowKey  tsdbRowKey = {0};
  STSDBRowIter iter = {0};
  bool         iterActive = false;  // true between an open attempt and the matching close

  STbData *pIMem = tsdbGetTbDataFromMemTable(imem, suid, uid);

  // load imem tomb data and build skyline
  TAOS_CHECK_GOTO(loadMemTombData(&pMemDelData, NULL, pIMem, INT64_MAX), &lino, _exit);

  // tsdbBuildDeleteSkyline
  size_t delSize = TARRAY_SIZE(pMemDelData);
  if (delSize > 0) {
    pSkyline = taosArrayInit(32, sizeof(TSDBKEY));
    if (!pSkyline) {
      TAOS_CHECK_EXIT(terrno);
    }

    TAOS_CHECK_EXIT(tsdbBuildDeleteSkyline(pMemDelData, 0, (int32_t)(delSize - 1), pSkyline));
    iSkyline = taosArrayGetSize(pSkyline) - 1;
  }

  pMemRow = tsdbImemGetFirstRow(imem, pIMem, &tbIter, pSkyline, &iSkyline);
  if (!pMemRow) {
    goto _exit;
  }

  // iter first row to last_row/last col values to ctxArray, and mark last null col ids
  sver = TSDBROW_SVERSION(pMemRow);
  TAOS_CHECK_GOTO(tsdbUpdateSkm(pTsdb, suid, uid, sver), &lino, _exit);
  pTSchema = pTsdb->rCache.pTSchema;
  nCol = pTSchema->numOfCols;

  tsdbRowGetKey(pMemRow, &tsdbRowKey);

  iterActive = true;
  TAOS_CHECK_EXIT(tsdbRowIterOpen(&iter, pMemRow, pTSchema));

  iCol = 0;
  for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal && iCol < nCol; pColVal = tsdbRowIterNext(&iter), iCol++) {
    SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST_ROW, .tsdbRowKey = tsdbRowKey, .colVal = *pColVal};
    if (!taosArrayPush(ctxArray, &updateCtx)) {
      TAOS_CHECK_EXIT(terrno);
    }

    if (COL_VAL_IS_VALUE(pColVal)) {
      updateCtx.lflag = LFLAG_LAST;
      if (!taosArrayPush(ctxArray, &updateCtx)) {
        TAOS_CHECK_EXIT(terrno);
      }
    } else {
      if (!iColHash) {
        iColHash = tSimpleHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT));
        if (iColHash == NULL) {
          TAOS_CHECK_EXIT(terrno);
        }
      }

      if (tSimpleHashPut(iColHash, &pColVal->cid, sizeof(pColVal->cid), &pColVal->cid, sizeof(pColVal->cid))) {
        TAOS_CHECK_EXIT(terrno);
      }
    }
  }
  tsdbRowClose(&iter);
  iterActive = false;

  // continue to get next row to fill null last col values
  pMemRow = tsdbImemGetNextRow(&tbIter, pSkyline, &iSkyline);
  while (pMemRow) {
    if (tSimpleHashGetSize(iColHash) == 0) {
      break;
    }

    sver = TSDBROW_SVERSION(pMemRow);
    TAOS_CHECK_EXIT(tsdbUpdateSkm(pTsdb, suid, uid, sver));
    pTSchema = pTsdb->rCache.pTSchema;

    STsdbRowKey rowKey = {0};
    tsdbRowGetKey(pMemRow, &rowKey);

    // Reuse the function-level iterator so that _exit can see and close it.
    iter = (STSDBRowIter){0};
    iterActive = true;
    TAOS_CHECK_EXIT(tsdbRowIterOpen(&iter, pMemRow, pTSchema));

    iCol = 0;
    for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal && iCol < nCol; pColVal = tsdbRowIterNext(&iter), iCol++) {
      if (tSimpleHashGet(iColHash, &pColVal->cid, sizeof(pColVal->cid)) && COL_VAL_IS_VALUE(pColVal)) {
        SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST, .tsdbRowKey = rowKey, .colVal = *pColVal};
        if (!taosArrayPush(ctxArray, &updateCtx)) {
          TAOS_CHECK_EXIT(terrno);
        }

        TAOS_CHECK_EXIT(tSimpleHashRemove(iColHash, &pColVal->cid, sizeof(pColVal->cid)));
      }
    }
    tsdbRowClose(&iter);
    iterActive = false;

    pMemRow = tsdbImemGetNextRow(&tbIter, pSkyline, &iSkyline);
  }

  TAOS_CHECK_GOTO(tsdbCacheUpdate(pTsdb, suid, uid, ctxArray), &lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(pTsdb->pVnode), __func__, __FILE__, lino, tstrerror(code));
  }

  // Only an iterator that is still in use needs closing; this is false on
  // the success path and on failures that happened before any open.
  if (iterActive) {
    tsdbRowClose(&iter);
  }

  taosArrayClear(ctxArray);
  // destroy any allocated resource
  tSimpleHashCleanup(iColHash);
  if (pMemDelData) {
    taosArrayDestroy(pMemDelData);
  }
  if (pSkyline) {
    taosArrayDestroy(pSkyline);
  }

  TAOS_RETURN(code);
}
728

729
/*
 * Feed the immutable memtable of `pTsdb` into the cache by running
 * tsdbLoadFromImem through tsdbMemTableSaveToCache().
 *
 * Returns 0 without doing anything when `pTsdb` is NULL, has no imem, or the
 * imem reports zero rows or zero tables; otherwise the result of
 * tsdbMemTableSaveToCache().
 */
static int32_t tsdbCacheUpdateFromIMem(STsdb *pTsdb) {
  if (!pTsdb || !pTsdb->imem) {
    return 0;
  }

  int32_t    code = 0;
  int32_t    lino = 0;
  SMemTable *pMem = pTsdb->imem;
  int32_t    nTables = pMem->nTbData;
  int64_t    nRow = pMem->nRow;
  int64_t    nDel = pMem->nDel;

  if (nRow == 0 || nTables == 0) {
    return 0;
  }

  TAOS_CHECK_EXIT(tsdbMemTableSaveToCache(pMem, tsdbLoadFromImem));

_exit:
  if (!code) {
    tsdbInfo("vgId:%d %s done, nRow:%" PRId64 " nDel:%" PRId64, TD_VID(pTsdb->pVnode), __func__, nRow, nDel);
  } else {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(pTsdb->pVnode), __func__, __FILE__, lino, tstrerror(code));
  }

  TAOS_RETURN(code);
}
753

754
/*
 * Persist the cache state of `pTsdb`.
 *
 * When tsUpdateCacheBatch is set, the immutable memtable is first merged into
 * the cache; a failure there is logged and returned immediately. Then, under
 * pTsdb->lruMutex, every dirty LRU entry is pushed to the RocksDB write batch
 * (tsdbCacheFlushDirty), the batch is force-written and RocksDB is flushed.
 *
 * Returns 0 on success, the merge error, or TSDB_CODE_FAILED when
 * rocksdb_flush() reports an error.
 */
int32_t tsdbCacheCommit(STsdb *pTsdb) {
  int32_t code = 0;

  // 0, tsdbCacheUpdateFromIMem if updateCacheBatch
  // flush dirty data of lru into rocks
  // 4, and update when writing if !updateCacheBatch
  // 5, merge cache & mem if updateCacheBatch

  if (tsUpdateCacheBatch) {
    code = tsdbCacheUpdateFromIMem(pTsdb);
    if (code) {
      tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));

      TAOS_RETURN(code);
    }
  }

  char      *flushErr = NULL;
  SLRUCache *pLru = pTsdb->lruCache;

  (void)taosThreadMutexLock(&pTsdb->lruMutex);
  taosLRUCacheApply(pLru, tsdbCacheFlushDirty, pTsdb);
  rocksMayWrite(pTsdb, true);
  rocksdb_flush(pTsdb->rCache.db, pTsdb->rCache.flushoptions, &flushErr);
  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);

  if (flushErr != NULL) {
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, flushErr);
    rocksdb_free(flushErr);
    code = TSDB_CODE_FAILED;
  }

  TAOS_RETURN(code);
}
792

793
/*
 * Give `pValue` its own copy of its variable-length data.
 *
 * For variable-length types with nData > 0, pData is replaced by a newly
 * allocated copy of the nData bytes it pointed to; with nData == 0, pData is
 * set to NULL. Other types are left untouched. The previous pData is not
 * freed here.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when the allocation fails (pData is
 * then unchanged).
 */
static int32_t reallocVarDataVal(SValue *pValue) {
  if (!IS_VAR_DATA_TYPE(pValue->type)) {
    TAOS_RETURN(TSDB_CODE_SUCCESS);
  }

  uint8_t *pSrc = pValue->pData;
  uint32_t len = pValue->nData;

  if (len == 0) {
    pValue->pData = NULL;
    TAOS_RETURN(TSDB_CODE_SUCCESS);
  }

  uint8_t *pCopy = taosMemoryMalloc(len);
  if (!pCopy) {
    TAOS_RETURN(terrno);
  }

  pValue->pData = pCopy;
  (void)memcpy(pValue->pData, pSrc, len);

  TAOS_RETURN(TSDB_CODE_SUCCESS);
}
811

812
/* Apply reallocVarDataVal() to the value held by `pColVal`. */
static int32_t reallocVarData(SColVal *pColVal) {
  SValue *pValue = &pColVal->value;
  return reallocVarDataVal(pValue);
}
4,791✔
813

814
// realloc pk data and col data.
815
// Deep-copies every variable-length buffer referenced by pCol (primary keys first, then the
// column value) so that pCol owns its data, and computes the memory charge of the entry:
// sizeof(SLastCol) plus the length of every var-data payload.
//
// pCharge may be NULL when the caller does not need the charge.
//
// On failure the primary-key buffers that were already duplicated are released and the whole
// SLastCol is zeroed; the error code of the failing copy is returned.
static int32_t tsdbCacheReallocSLastCol(SLastCol *pCol, size_t *pCharge) {
  int32_t code = TSDB_CODE_SUCCESS, lino = 0;
  size_t  total = sizeof(SLastCol);
  int8_t  nDone = 0;  // number of primary keys fully processed so far

  while (nDone < pCol->rowKey.numOfPKs) {
    SValue *pPk = &pCol->rowKey.pks[nDone];
    if (IS_VAR_DATA_TYPE(pPk->type)) {
      TAOS_CHECK_EXIT(reallocVarDataVal(pPk));
      total += pPk->nData;
    }
    nDone++;
  }

  if (IS_VAR_DATA_TYPE(pCol->colVal.value.type)) {
    TAOS_CHECK_EXIT(reallocVarData(&pCol->colVal));
    total += pCol->colVal.value.nData;
  }

  if (pCharge != NULL) {
    *pCharge = total;
  }

_exit:
  if (code != TSDB_CODE_SUCCESS) {
    // only the first nDone keys own freshly allocated buffers
    for (int8_t k = 0; k < nDone; k++) {
      SValue *pPk = &pCol->rowKey.pks[k];
      if (IS_VAR_DATA_TYPE(pPk->type)) {
        taosMemoryFree(pPk->pData);
      }
    }

    (void)memset(pCol, 0, sizeof(SLastCol));
  }

  TAOS_RETURN(code);
}
850

851
void tsdbCacheFreeSLastColItem(void *pItem) {
17,075✔
852
  SLastCol *pCol = (SLastCol *)pItem;
17,075✔
853
  for (int i = 0; i < pCol->rowKey.numOfPKs; i++) {
17,151✔
854
    if (IS_VAR_DATA_TYPE(pCol->rowKey.pks[i].type)) {
76!
855
      taosMemoryFree(pCol->rowKey.pks[i].pData);
×
856
    }
857
  }
858

859
  if (IS_VAR_DATA_TYPE(pCol->colVal.value.type) && pCol->colVal.value.pData) {
17,075!
860
    taosMemoryFree(pCol->colVal.value.pData);
713!
861
  }
862
}
17,076✔
863

864
static void tsdbCacheDeleter(const void *key, size_t klen, void *value, void *ud) {
1,368,136✔
865
  SLastCol *pLastCol = (SLastCol *)value;
1,368,136✔
866

867
  if (pLastCol->dirty) {
1,368,136✔
868
    if (tsdbCacheFlushDirty(key, klen, pLastCol, ud) != 0) {
3,050!
UNCOV
869
      STsdb *pTsdb = (STsdb *)ud;
×
UNCOV
870
      tsdbTrace("tsdb/cache: vgId:%d, flush cache %s failed at line %d.", TD_VID(pTsdb->pVnode), __func__, __LINE__);
×
871
    }
872
  }
873

874
  for (uint8_t i = 0; i < pLastCol->rowKey.numOfPKs; ++i) {
2,705,756✔
875
    SValue *pValue = &pLastCol->rowKey.pks[i];
1,337,666✔
876
    if (IS_VAR_DATA_TYPE(pValue->type)) {
1,337,666!
877
      taosMemoryFree(pValue->pData);
9!
878
    }
879
  }
880

881
  if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type) /* && pLastCol->colVal.value.nData > 0*/) {
1,368,090!
882
    taosMemoryFree(pLastCol->colVal.value.pData);
3,430!
883
  }
884

885
  taosMemoryFree(value);
1,368,087!
886
}
1,368,269✔
887

888
static void tsdbCacheOverWriter(const void *key, size_t klen, void *value, void *ud) {
1,318,697✔
889
  SLastCol *pLastCol = (SLastCol *)value;
1,318,697✔
890
  pLastCol->dirty = 0;
1,318,697✔
891
}
1,318,697✔
892

893
static int32_t tsdbCachePutToLRU(STsdb *pTsdb, SLastKey *pLastKey, SLastCol *pLastCol, int8_t dirty);

// Seeds the LRU cache with an empty, dirty placeholder for one (uid, cid, lflag) key:
// row key timestamp TSKEY_MIN with no primary keys, column value NONE, status VALID.
// Returns the result of tsdbCachePutToLRU(); a failure is also logged.
static int32_t tsdbCacheNewTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, int8_t col_type, int8_t lflag) {
  SLastCol placeholder = {.rowKey = {.ts = TSKEY_MIN, .numOfPKs = 0},
                          .colVal = COL_VAL_NONE(cid, col_type),
                          .dirty = 1,
                          .cacheStatus = TSDB_LAST_CACHE_VALID};

  int32_t code = tsdbCachePutToLRU(pTsdb, &(SLastKey){.lflag = lflag, .uid = uid, .cid = cid}, &placeholder, 1);
  if (code != 0) {
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
  }

  TAOS_RETURN(code);
}
912

913
// Flushes the cache to RocksDB without taking pTsdb->lruMutex itself (callers in this file
// hold it around the call):
//   1. applies tsdbCacheFlushDirty to every LRU entry,
//   2. calls rocksMayWrite(pTsdb, true),
//   3. calls rocksdb_flush() on the cache database.
// Returns 0, or TSDB_CODE_FAILED when rocksdb_flush() reports an error (which is logged).
int32_t tsdbCacheCommitNoLock(STsdb *pTsdb) {
  char *flushErr = NULL;

  taosLRUCacheApply(pTsdb->lruCache, tsdbCacheFlushDirty, pTsdb);

  rocksMayWrite(pTsdb, true);
  rocksdb_flush(pTsdb->rCache.db, pTsdb->rCache.flushoptions, &flushErr);

  if (flushErr == NULL) {
    TAOS_RETURN(0);
  }

  tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, flushErr);
  rocksdb_free(flushErr);

  TAOS_RETURN(TSDB_CODE_FAILED);
}
933

934
// Looks up numKeys keys in the cache RocksDB with a single rocksdb_multi_get() call.
//
// On success *pppValuesList and *ppValuesListSizes receive newly allocated arrays of numKeys
// elements; the caller frees both arrays with taosMemoryFree() and every element of the
// value array with rocksdb_free() (as the callers in this file do).
// Per-key error strings returned by RocksDB are released and otherwise ignored.
// Returns terrno when one of the arrays cannot be allocated.
static int32_t tsdbCacheGetValuesFromRocks(STsdb *pTsdb, size_t numKeys, const char *const *ppKeysList,
                                           size_t *pKeysListSizes, char ***pppValuesList, size_t **ppValuesListSizes) {
  char  **vals = NULL;
  size_t *lens = NULL;
  char  **keyErrs = NULL;

  vals = taosMemoryCalloc(numKeys, sizeof(char *));
  if (vals == NULL) {
    return terrno;
  }

  lens = taosMemoryCalloc(numKeys, sizeof(size_t));
  if (lens == NULL) {
    taosMemoryFreeClear(vals);
    return terrno;
  }

  keyErrs = taosMemoryCalloc(numKeys, sizeof(char *));
  if (keyErrs == NULL) {
    taosMemoryFreeClear(vals);
    taosMemoryFreeClear(lens);
    return terrno;
  }

  rocksdb_multi_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, numKeys, ppKeysList, pKeysListSizes, vals, lens,
                    keyErrs);

  size_t k = 0;
  while (k < numKeys) {
    rocksdb_free(keyErrs[k]);
    ++k;
  }
  taosMemoryFreeClear(keyErrs);

  *pppValuesList = vals;
  *ppValuesListSizes = lens;
  TAOS_RETURN(TSDB_CODE_SUCCESS);
}
960

961
// Removes the cached "last" and "last_row" entries of one column of one table.
//
// For both keys (LFLAG_LAST, LFLAG_LAST_ROW) of (uid, cid):
//   - if RocksDB holds a value that deserializes to an entry, a delete is queued on the
//     shared write batch (the caller is expected to flush the batch afterwards);
//   - if the LRU cache holds the key, the entry is released and erased.
//
// hasPrimaryKey is currently unused.
//
// Returns 0 on success, terrno on allocation failure, or the error of the RocksDB lookup /
// deserialization. All temporary buffers, including the value buffers returned by RocksDB,
// are released on every path.
static int32_t tsdbCacheDropTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, bool hasPrimaryKey) {
  int32_t code = 0;

  // build keys & multi get from rocks
  char **keys_list = taosMemoryCalloc(2, sizeof(char *));
  if (!keys_list) {
    return terrno;
  }
  size_t *keys_list_sizes = taosMemoryCalloc(2, sizeof(size_t));
  if (!keys_list_sizes) {
    taosMemoryFree(keys_list);
    return terrno;
  }
  const size_t klen = ROCKS_KEY_LEN;

  char *keys = taosMemoryCalloc(2, sizeof(SLastKey));
  if (!keys) {
    taosMemoryFree(keys_list);
    taosMemoryFree(keys_list_sizes);
    return terrno;
  }
  ((SLastKey *)keys)[0] = (SLastKey){.lflag = LFLAG_LAST, .uid = uid, .cid = cid};
  ((SLastKey *)keys)[1] = (SLastKey){.lflag = LFLAG_LAST_ROW, .uid = uid, .cid = cid};

  keys_list[0] = keys;
  keys_list[1] = keys + sizeof(SLastKey);
  keys_list_sizes[0] = klen;
  keys_list_sizes[1] = klen;

  char                **values_list = NULL;
  size_t               *values_list_sizes = NULL;
  rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;

  // was written by caller
  // rocksMayWrite(pTsdb, true); // flush writebatch cache

  TAOS_CHECK_GOTO(tsdbCacheGetValuesFromRocks(pTsdb, 2, (const char *const *)keys_list, keys_list_sizes, &values_list,
                                              &values_list_sizes),
                  NULL, _exit);

  // queue a delete for every key that is present in rocksdb
  for (int i = 0; i < 2; i++) {
    if (values_list[i] == NULL) {
      continue;
    }

    SLastCol *pLastCol = NULL;
    code = tsdbCacheDeserialize(values_list[i], values_list_sizes[i], &pLastCol);
    if (code != TSDB_CODE_SUCCESS) {
      tsdbError("vgId:%d, %s deserialize failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
                tstrerror(code));
      goto _exit;
    }
    if (NULL != pLastCol) {
      rocksdb_writebatch_delete(wb, keys_list[i], klen);
    }
    taosMemoryFreeClear(pLastCol);
  }

  // drop the in-memory copies as well
  for (int i = 0; i < 2; i++) {
    LRUHandle *h = taosLRUCacheLookup(pTsdb->lruCache, keys_list[i], klen);
    if (h) {
      tsdbLRUCacheRelease(pTsdb->lruCache, h, true);
      taosLRUCacheErase(pTsdb->lruCache, keys_list[i], klen);
    }
  }

_exit:
  // the value buffers come from rocksdb and must be released on the error paths too
  if (values_list) {
    rocksdb_free(values_list[0]);
    rocksdb_free(values_list[1]);
  }

  taosMemoryFree(keys_list[0]);

  taosMemoryFree(keys_list);
  taosMemoryFree(keys_list_sizes);
  taosMemoryFree(values_list);
  taosMemoryFree(values_list_sizes);

  TAOS_RETURN(code);
}
1052

1053
// Creates empty "last_row" and "last" cache placeholders for every column of a new table,
// holding pTsdb->lruMutex for the whole operation.
//
// When suid < 0 the columns are taken from pSchemaRow; otherwise the schema is fetched with
// metaGetTbTSchemaEx(suid, uid) and freed before returning.
//
// A failure to add an individual placeholder is only traced and does not stop the loop, so
// the returned code is that of the schema lookup failure or of the last placeholder insert.
int32_t tsdbCacheNewTable(STsdb *pTsdb, tb_uid_t uid, tb_uid_t suid, const SSchemaWrapper *pSchemaRow) {
  int32_t      code = 0;
  const bool   fromSchemaRow = (suid < 0);
  STSchema    *pTSchema = NULL;
  const int8_t lflags[2] = {LFLAG_LAST_ROW, LFLAG_LAST};  // insertion order per column

  (void)taosThreadMutexLock(&pTsdb->lruMutex);

  if (!fromSchemaRow) {
    code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, -1, &pTSchema);
    if (code != TSDB_CODE_SUCCESS) {
      (void)taosThreadMutexUnlock(&pTsdb->lruMutex);

      TAOS_RETURN(code);
    }
  }

  const int nCols = fromSchemaRow ? pSchemaRow->nCols : pTSchema->numOfCols;
  for (int iCol = 0; iCol < nCols; ++iCol) {
    int16_t cid = fromSchemaRow ? pSchemaRow->pSchema[iCol].colId : pTSchema->columns[iCol].colId;
    int8_t  col_type = fromSchemaRow ? pSchemaRow->pSchema[iCol].type : pTSchema->columns[iCol].type;

    for (int k = 0; k < 2; ++k) {
      code = tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, lflags[k]);
      if (code != TSDB_CODE_SUCCESS) {
        tsdbTrace("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
                  tstrerror(code));
      }
    }
  }

  if (!fromSchemaRow) {
    taosMemoryFree(pTSchema);
  }

  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);

  TAOS_RETURN(code);
}
1106

1107
// Drops every cached column entry of one table while holding pTsdb->lruMutex.
//
// Steps: commit the cache (tsdbCacheCommitNoLock), drop each column with
// tsdbCacheDropTableColumn(), then call rocksMayWrite(pTsdb, false).
// Columns come from pSchemaRow when it is non-NULL; otherwise the schema is fetched with
// metaGetTbTSchemaEx(suid, uid) and freed before returning. If that lookup fails the
// function returns immediately with its error code.
//
// Commit / per-column failures are only traced; the returned code is that of the last
// column drop (or of the earlier step when there are no columns).
int32_t tsdbCacheDropTable(STsdb *pTsdb, tb_uid_t uid, tb_uid_t suid, SSchemaWrapper *pSchemaRow) {
  int32_t    code = 0;
  const bool fromSchemaRow = (pSchemaRow != NULL);
  STSchema  *pTSchema = NULL;

  (void)taosThreadMutexLock(&pTsdb->lruMutex);

  code = tsdbCacheCommitNoLock(pTsdb);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbTrace("vgId:%d, %s commit with no lock failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
              tstrerror(code));
  }

  if (!fromSchemaRow) {
    code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, -1, &pTSchema);
    if (code != TSDB_CODE_SUCCESS) {
      (void)taosThreadMutexUnlock(&pTsdb->lruMutex);

      TAOS_RETURN(code);
    }
  }

  int  nCols = fromSchemaRow ? pSchemaRow->nCols : pTSchema->numOfCols;
  bool hasPrimayKey = false;
  if (nCols >= 2) {
    // the second column carries the COL_IS_KEY flag when the table has a primary key
    if (fromSchemaRow) {
      hasPrimayKey = (pSchemaRow->pSchema[1].flags & COL_IS_KEY) ? true : false;
    } else {
      hasPrimayKey = (pTSchema->columns[1].flags & COL_IS_KEY) ? true : false;
    }
  }

  for (int iCol = 0; iCol < nCols; ++iCol) {
    int16_t cid = fromSchemaRow ? pSchemaRow->pSchema[iCol].colId : pTSchema->columns[iCol].colId;

    code = tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey);
    if (code != TSDB_CODE_SUCCESS) {
      tsdbTrace("vgId:%d, %s drop table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
                tstrerror(code));
    }
  }

  if (!fromSchemaRow) {
    taosMemoryFree(pTSchema);
  }

  rocksMayWrite(pTsdb, false);

  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);

  TAOS_RETURN(code);
}
1168

1169
// Drops the cached column entries of every table listed in uids, using the schema fetched
// with metaGetTbTSchemaEx(suid, suid). pTsdb->lruMutex is held for the whole operation.
//
// The cache is committed first (tsdbCacheCommitNoLock) and rocksMayWrite(pTsdb, false) is
// called after the last column has been handled. If the schema lookup fails the function
// returns its error right away. Commit / per-column failures are only traced.
int32_t tsdbCacheDropSubTables(STsdb *pTsdb, SArray *uids, tb_uid_t suid) {
  int32_t   code = 0;
  STSchema *pTSchema = NULL;

  (void)taosThreadMutexLock(&pTsdb->lruMutex);

  code = tsdbCacheCommitNoLock(pTsdb);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbTrace("vgId:%d, %s commit with no lock failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
              tstrerror(code));
  }

  code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, suid, -1, &pTSchema);
  if (code != TSDB_CODE_SUCCESS) {
    (void)taosThreadMutexUnlock(&pTsdb->lruMutex);

    TAOS_RETURN(code);
  }

  // the schema is shared by all sub tables, so these do not change between uids
  const int nCols = pTSchema->numOfCols;
  bool      hasPrimayKey = false;
  if (nCols >= 2) {
    hasPrimayKey = (pTSchema->columns[1].flags & COL_IS_KEY) ? true : false;
  }

  for (int iTb = 0; iTb < TARRAY_SIZE(uids); ++iTb) {
    int64_t uid = ((tb_uid_t *)TARRAY_DATA(uids))[iTb];

    for (int iCol = 0; iCol < nCols; ++iCol) {
      int16_t cid = pTSchema->columns[iCol].colId;

      code = tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey);
      if (code != TSDB_CODE_SUCCESS) {
        tsdbTrace("vgId:%d, %s drop table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
                  tstrerror(code));
      }
    }
  }

  taosMemoryFree(pTSchema);

  rocksMayWrite(pTsdb, false);

  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);

  TAOS_RETURN(code);
}
1217

UNCOV
1218
// Adds empty cache placeholders for a column newly added to one table, under pTsdb->lruMutex.
// Two placeholders are created, with lflag 0 and then lflag 1
// (NOTE(review): presumably LFLAG_LAST_ROW and LFLAG_LAST as used by tsdbCacheNewTable - confirm).
// Failures are only traced; the code of the second insert is returned.
int32_t tsdbCacheNewNTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, int8_t col_type) {
  int32_t code = 0;

  (void)taosThreadMutexLock(&pTsdb->lruMutex);

  for (int8_t lflag = 0; lflag <= 1; ++lflag) {
    code = tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, lflag);
    if (code != TSDB_CODE_SUCCESS) {
      tsdbTrace("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
                tstrerror(code));
    }
  }

  // rocksMayWrite(pTsdb, true, false, false);
  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);

  TAOS_RETURN(code);
}
1238

1239
// Drops the cached entries of one column of one table, under pTsdb->lruMutex:
// commit the cache, drop the column, then call rocksMayWrite(pTsdb, false).
// A commit failure is only traced; the code of tsdbCacheDropTableColumn() is returned.
int32_t tsdbCacheDropNTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, bool hasPrimayKey) {
  (void)taosThreadMutexLock(&pTsdb->lruMutex);

  int32_t code = tsdbCacheCommitNoLock(pTsdb);
  if (TSDB_CODE_SUCCESS != code) {
    tsdbTrace("vgId:%d, %s commit with no lock failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
              tstrerror(code));
  }

  code = tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey);
  if (TSDB_CODE_SUCCESS != code) {
    tsdbTrace("vgId:%d, %s drop table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
              tstrerror(code));
  }

  rocksMayWrite(pTsdb, false);

  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);

  TAOS_RETURN(code);
}
1262

1263
// Adds empty cache placeholders (lflag 0, then lflag 1) for a new column to every table listed
// in uids, under pTsdb->lruMutex. Failures are only traced; the code of the last insert is
// returned (0 when uids is empty).
int32_t tsdbCacheNewSTableColumn(STsdb *pTsdb, SArray *uids, int16_t cid, int8_t col_type) {
  int32_t code = 0;

  (void)taosThreadMutexLock(&pTsdb->lruMutex);

  for (int iTb = 0; iTb < TARRAY_SIZE(uids); ++iTb) {
    tb_uid_t uid = ((tb_uid_t *)TARRAY_DATA(uids))[iTb];

    for (int8_t lflag = 0; lflag <= 1; ++lflag) {
      code = tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, lflag);
      if (code != TSDB_CODE_SUCCESS) {
        tsdbTrace("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
                  tstrerror(code));
      }
    }
  }

  // rocksMayWrite(pTsdb, true, false, false);
  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
  TAOS_RETURN(code);
}
1287

1288
// Drops the cached entries of one column for every table listed in uids, under
// pTsdb->lruMutex: commit the cache, drop the column per table, then call
// rocksMayWrite(pTsdb, false). Failures are only traced; the code of the last drop is
// returned (or of the commit when uids is empty).
int32_t tsdbCacheDropSTableColumn(STsdb *pTsdb, SArray *uids, int16_t cid, bool hasPrimayKey) {
  (void)taosThreadMutexLock(&pTsdb->lruMutex);

  int32_t code = tsdbCacheCommitNoLock(pTsdb);
  if (TSDB_CODE_SUCCESS != code) {
    tsdbTrace("vgId:%d, %s commit with no lock failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
              tstrerror(code));
  }

  for (int iTb = 0; iTb < TARRAY_SIZE(uids); ++iTb) {
    int64_t uid = ((tb_uid_t *)TARRAY_DATA(uids))[iTb];

    code = tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey);
    if (TSDB_CODE_SUCCESS != code) {
      tsdbTrace("vgId:%d, %s drop table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
                tstrerror(code));
    }
  }

  rocksMayWrite(pTsdb, false);

  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);

  TAOS_RETURN(code);
}
1315

1316
typedef struct {
1317
  int      idx;
1318
  SLastKey key;
1319
} SIdxKey;
1320

UNCOV
1321
// Resets a cached entry to the "no value" state in place:
//   - row key: ts = TSKEY_MIN, primary-key payloads released / zeroed, numOfPKs = 0;
//   - column value: payload released / zeroed, then replaced by COL_VAL_NONE with the same
//     column id and type;
//   - the entry is marked dirty and its cacheStatus is set to the given value.
static void tsdbCacheUpdateLastColToNone(SLastCol *pLastCol, ELastCacheStatus cacheStatus) {
  SRowKey *pKey = &pLastCol->rowKey;
  SValue  *pColValue = &pLastCol->colVal.value;

  // update rowkey
  pKey->ts = TSKEY_MIN;
  for (int8_t iPk = 0; iPk < pKey->numOfPKs; iPk++) {
    SValue *pPk = &pKey->pks[iPk];
    bool    ownsBuffer = IS_VAR_DATA_TYPE(pPk->type) && pPk->nData > 0;
    if (!ownsBuffer) {
      pPk->val = 0;
      continue;
    }
    taosMemoryFreeClear(pPk->pData);
    pPk->nData = 0;
  }
  pKey->numOfPKs = 0;

  // update colval
  bool ownsColBuffer = IS_VAR_DATA_TYPE(pColValue->type) && pColValue->nData > 0;
  if (!ownsColBuffer) {
    pColValue->val = 0;
  } else {
    taosMemoryFreeClear(pColValue->pData);
    pColValue->nData = 0;
  }

  pLastCol->colVal = COL_VAL_NONE(pLastCol->colVal.cid, pLastCol->colVal.value.type);
  pLastCol->dirty = 1;
  pLastCol->cacheStatus = cacheStatus;
}
×
1347

1348
// Serializes pLastCol and queues a put of (pLastKey -> serialized bytes) on the shared RocksDB
// write batch, guarded by rCache.writeBatchMutex. The serialized buffer is freed afterwards.
// Returns the serialization error, if any; otherwise 0.
static int32_t tsdbCachePutToRocksdb(STsdb *pTsdb, SLastKey *pLastKey, SLastCol *pLastCol) {
  char   *payload = NULL;
  size_t  payloadLen = 0;
  int32_t code = tsdbCacheSerialize(pLastCol, &payload, &payloadLen);
  if (code != 0) {
    tsdbError("tsdb/cache/putrocks: vgId:%d, serialize failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code));
    TAOS_RETURN(code);
  }

  rocksdb_writebatch_t *batch = pTsdb->rCache.writebatch;

  (void)taosThreadMutexLock(&pTsdb->rCache.writeBatchMutex);
  rocksdb_writebatch_put(batch, (char *)pLastKey, ROCKS_KEY_LEN, payload, payloadLen);
  (void)taosThreadMutexUnlock(&pTsdb->rCache.writeBatchMutex);

  taosMemoryFree(payload);

  TAOS_RETURN(code);
}
1368

1369
// Inserts a heap-allocated deep copy of *pLastCol into the LRU cache under pLastKey.
//
// The copy gets the requested dirty flag and its own var-data buffers
// (tsdbCacheReallocSLastCol), and is inserted with low priority together with
// tsdbCacheDeleter / tsdbCacheOverWriter.
//
// Returns 0 when the insert status is OK or OK_OVERWRITTEN, terrno when the entry cannot be
// allocated, the error of the deep copy, or TSDB_CODE_FAILED for any other insert status.
static int32_t tsdbCachePutToLRU(STsdb *pTsdb, SLastKey *pLastKey, SLastCol *pLastCol, int8_t dirty) {
  int32_t   code = 0, lino = 0;
  size_t    charge = 0;
  LRUStatus status;

  SLastCol *pEntry = taosMemoryCalloc(1, sizeof(SLastCol));
  if (pEntry == NULL) {
    return terrno;
  }

  *pEntry = *pLastCol;
  pEntry->dirty = dirty;
  TAOS_CHECK_EXIT(tsdbCacheReallocSLastCol(pEntry, &charge));

  status = taosLRUCacheInsert(pTsdb->lruCache, pLastKey, ROCKS_KEY_LEN, pEntry, charge, tsdbCacheDeleter,
                              tsdbCacheOverWriter, NULL, TAOS_LRU_PRIORITY_LOW, pTsdb);
  bool inserted = (TAOS_LRU_STATUS_OK == status) || (TAOS_LRU_STATUS_OK_OVERWRITTEN == status);
  if (!inserted) {
    tsdbError("vgId:%d, %s failed at line %d status %d.", TD_VID(pTsdb->pVnode), __func__, __LINE__, status);
    code = TSDB_CODE_FAILED;
    // the entry must not be freed again below
    pEntry = NULL;
  }

_exit:
  if (code != TSDB_CODE_SUCCESS) {
    taosMemoryFree(pEntry);
    tsdbError("tsdb/cache/putlru: vgId:%d, failed at line %d since %s.", TD_VID(pTsdb->pVnode), lino, tstrerror(code));
  }

  TAOS_RETURN(code);
}
1398

1399
// Applies a batch of last / last_row updates for table uid to the cache, holding
// pTsdb->lruMutex for the whole operation. suid is not used by this function.
//
// Phase 1 (LRU): for every update context
//   - LFLAG_LAST contexts whose column value is not a real value are skipped;
//   - on an LRU hit whose status is not TSDB_LAST_CACHE_NO_CACHE, the entry is replaced by a
//     dirty copy of the update when the cached row key is older, or equal and the new value
//     is not NONE;
//   - on a miss the key is remembered in remainCols.
// Phase 2 (RocksDB): the remembered keys are fetched with one multi-get after flushing the
//   write batch. Entries stored as NO_CACHE are only loaded back into the LRU; otherwise a
//   newer update is written to both RocksDB and the LRU (not dirty).
//
// Every temporary buffer (key/value arrays, RocksDB value buffers, remainCols) is released in
// _exit, so no error path leaks them.
//
// Returns 0 on success or the first error encountered.
static int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SArray *updCtxArray) {
  if (!updCtxArray || TARRAY_SIZE(updCtxArray) == 0) {
    TAOS_RETURN(TSDB_CODE_SUCCESS);
  }

  int32_t code = 0, lino = 0;

  int        num_keys = TARRAY_SIZE(updCtxArray);
  SArray    *remainCols = NULL;
  SLRUCache *pCache = pTsdb->lruCache;

  // buffers of the rocksdb lookup; declared here so that _exit can always release them
  char  **keys_list = NULL;
  size_t *keys_list_sizes = NULL;
  char  **values_list = NULL;
  size_t *values_list_sizes = NULL;
  int     num_values = 0;  // number of valid slots in values_list

  (void)taosThreadMutexLock(&pTsdb->lruMutex);
  for (int i = 0; i < num_keys; ++i) {
    SLastUpdateCtx *updCtx = &((SLastUpdateCtx *)TARRAY_DATA(updCtxArray))[i];
    int8_t          lflag = updCtx->lflag;
    SRowKey        *pRowKey = &updCtx->tsdbRowKey.key;
    SColVal        *pColVal = &updCtx->colVal;

    if (lflag == LFLAG_LAST && !COL_VAL_IS_VALUE(pColVal)) {
      continue;
    }

    SLastKey  *key = &(SLastKey){.lflag = lflag, .uid = uid, .cid = pColVal->cid};
    LRUHandle *h = taosLRUCacheLookup(pCache, key, ROCKS_KEY_LEN);
    if (h) {
      SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h);
      if (pLastCol->cacheStatus != TSDB_LAST_CACHE_NO_CACHE) {
        int32_t cmp_res = tRowKeyCompare(&pLastCol->rowKey, pRowKey);
        if (cmp_res < 0 || (cmp_res == 0 && !COL_VAL_IS_NONE(pColVal))) {
          SLastCol newLastCol = {
              .rowKey = *pRowKey, .colVal = *pColVal, .dirty = 1, .cacheStatus = TSDB_LAST_CACHE_VALID};
          code = tsdbCachePutToLRU(pTsdb, key, &newLastCol, 1);
        }
      }

      // release the handle before acting on a possible error
      tsdbLRUCacheRelease(pCache, h, false);
      TAOS_CHECK_EXIT(code);
    } else {
      if (!remainCols) {
        remainCols = taosArrayInit(num_keys * 2, sizeof(SIdxKey));
        if (!remainCols) {
          TAOS_CHECK_GOTO(terrno, &lino, _exit);
        }
      }
      if (!taosArrayPush(remainCols, &(SIdxKey){i, *key})) {
        TAOS_CHECK_GOTO(terrno, &lino, _exit);
      }
    }
  }

  if (remainCols) {
    num_keys = TARRAY_SIZE(remainCols);
  }
  if (remainCols && num_keys > 0) {
    keys_list = taosMemoryCalloc(num_keys, sizeof(char *));
    if (!keys_list) {
      code = terrno;
      goto _exit;
    }
    keys_list_sizes = taosMemoryCalloc(num_keys, sizeof(size_t));
    if (!keys_list_sizes) {
      code = terrno;
      goto _exit;
    }
    for (int i = 0; i < num_keys; ++i) {
      SIdxKey *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[i];

      // the keys live inside remainCols; keys_list only borrows them
      keys_list[i] = (char *)&idxKey->key;
      keys_list_sizes[i] = ROCKS_KEY_LEN;
    }

    rocksMayWrite(pTsdb, true);  // flush writebatch cache

    code = tsdbCacheGetValuesFromRocks(pTsdb, num_keys, (const char *const *)keys_list, keys_list_sizes, &values_list,
                                       &values_list_sizes);
    if (code) {
      goto _exit;
    }
    num_values = num_keys;

    for (int i = 0; i < num_keys; ++i) {
      SIdxKey        *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[i];
      SLastUpdateCtx *updCtx = (SLastUpdateCtx *)taosArrayGet(updCtxArray, idxKey->idx);
      SRowKey        *pRowKey = &updCtx->tsdbRowKey.key;
      SColVal        *pColVal = &updCtx->colVal;

      SLastCol *pLastCol = NULL;
      if (values_list[i] != NULL) {
        code = tsdbCacheDeserialize(values_list[i], values_list_sizes[i], &pLastCol);
        if (code != TSDB_CODE_SUCCESS) {
          tsdbError("vgId:%d, %s deserialize failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
                    tstrerror(code));
          goto _exit;
        }
      }
      SLastCol *pToFree = pLastCol;

      if (pLastCol && pLastCol->cacheStatus == TSDB_LAST_CACHE_NO_CACHE) {
        if ((code = tsdbCachePutToLRU(pTsdb, &idxKey->key, pLastCol, 0)) != TSDB_CODE_SUCCESS) {
          tsdbError("tsdb/cache: vgId:%d, put lru failed at line %d since %s.", TD_VID(pTsdb->pVnode), lino,
                    tstrerror(code));
          taosMemoryFreeClear(pToFree);
          break;
        }

        // cache invalid => skip update
        taosMemoryFreeClear(pToFree);
        continue;
      }

      if (IS_LAST_KEY(idxKey->key) && !COL_VAL_IS_VALUE(pColVal)) {
        taosMemoryFreeClear(pToFree);
        continue;
      }

      int32_t cmp_res = 1;
      if (pLastCol) {
        cmp_res = tRowKeyCompare(&pLastCol->rowKey, pRowKey);
      }

      if (NULL == pLastCol || cmp_res < 0 || (cmp_res == 0 && !COL_VAL_IS_NONE(pColVal))) {
        SLastCol lastColTmp = {
            .rowKey = *pRowKey, .colVal = *pColVal, .dirty = 0, .cacheStatus = TSDB_LAST_CACHE_VALID};
        if ((code = tsdbCachePutToRocksdb(pTsdb, &idxKey->key, &lastColTmp)) != TSDB_CODE_SUCCESS) {
          tsdbError("tsdb/cache: vgId:%d, put rocks failed at line %d since %s.", TD_VID(pTsdb->pVnode), lino,
                    tstrerror(code));
          taosMemoryFreeClear(pToFree);
          break;
        }
        if ((code = tsdbCachePutToLRU(pTsdb, &idxKey->key, &lastColTmp, 0)) != TSDB_CODE_SUCCESS) {
          tsdbError("tsdb/cache: vgId:%d, put lru failed at line %d since %s.", TD_VID(pTsdb->pVnode), lino,
                    tstrerror(code));
          taosMemoryFreeClear(pToFree);
          break;
        }
      }

      taosMemoryFreeClear(pToFree);
    }

    rocksMayWrite(pTsdb, false);
  }

_exit:
  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);

  // release the lookup buffers on success and on every error path
  if (values_list) {
    for (int i = 0; i < num_values; ++i) {
      rocksdb_free(values_list[i]);
    }
    taosMemoryFree(values_list);
  }
  taosMemoryFree(values_list_sizes);
  taosMemoryFree(keys_list);
  taosMemoryFree(keys_list_sizes);
  taosArrayDestroy(remainCols);

  if (code) {
    tsdbError("tsdb/cache: vgId:%d, update failed at line %d since %s.", TD_VID(pTsdb->pVnode), __LINE__,
              tstrerror(code));
  }

  TAOS_RETURN(code);
}
1576

UNCOV
1577
int32_t tsdbCacheRowFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int64_t version, int32_t nRow,
×
1578
                                 SRow **aRow) {
UNCOV
1579
  int32_t code = 0, lino = 0;
×
1580

1581
  // 1. prepare last
UNCOV
1582
  TSDBROW      lRow = {.type = TSDBROW_ROW_FMT, .pTSRow = aRow[nRow - 1], .version = version};
×
UNCOV
1583
  STSchema    *pTSchema = NULL;
×
UNCOV
1584
  int32_t      sver = TSDBROW_SVERSION(&lRow);
×
UNCOV
1585
  SSHashObj   *iColHash = NULL;
×
1586
  STSDBRowIter iter = {0};
×
1587

UNCOV
1588
  TAOS_CHECK_GOTO(tsdbUpdateSkm(pTsdb, suid, uid, sver), &lino, _exit);
×
UNCOV
1589
  pTSchema = pTsdb->rCache.pTSchema;
×
1590

UNCOV
1591
  TSDBROW tRow = {.type = TSDBROW_ROW_FMT, .version = version};
×
UNCOV
1592
  int32_t nCol = pTSchema->numOfCols;
×
UNCOV
1593
  SArray *ctxArray = pTsdb->rCache.ctxArray;
×
1594

1595
  // 1. prepare by lrow
UNCOV
1596
  STsdbRowKey tsdbRowKey = {0};
×
UNCOV
1597
  tsdbRowGetKey(&lRow, &tsdbRowKey);
×
1598

1599
  TAOS_CHECK_GOTO(tsdbRowIterOpen(&iter, &lRow, pTSchema), &lino, _exit);
×
1600

UNCOV
1601
  int32_t iCol = 0;
×
UNCOV
1602
  for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal && iCol < nCol; pColVal = tsdbRowIterNext(&iter), iCol++) {
×
1603
    SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST_ROW, .tsdbRowKey = tsdbRowKey, .colVal = *pColVal};
×
UNCOV
1604
    if (!taosArrayPush(ctxArray, &updateCtx)) {
×
UNCOV
1605
      TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
1606
    }
1607

UNCOV
1608
    if (COL_VAL_IS_VALUE(pColVal)) {
×
UNCOV
1609
      updateCtx.lflag = LFLAG_LAST;
×
UNCOV
1610
      if (!taosArrayPush(ctxArray, &updateCtx)) {
×
UNCOV
1611
        TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
1612
      }
1613
    } else {
UNCOV
1614
      if (!iColHash) {
×
UNCOV
1615
        iColHash = tSimpleHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
×
UNCOV
1616
        if (iColHash == NULL) {
×
UNCOV
1617
          TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit);
×
1618
        }
1619
      }
1620

1621
      if (tSimpleHashPut(iColHash, &iCol, sizeof(iCol), NULL, 0)) {
×
UNCOV
1622
        TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit);
×
1623
      }
1624
    }
1625
  }
1626

1627
  // 2. prepare by the other rows
UNCOV
1628
  for (int32_t iRow = nRow - 2; iRow >= 0; --iRow) {
×
UNCOV
1629
    if (tSimpleHashGetSize(iColHash) == 0) {
×
UNCOV
1630
      break;
×
1631
    }
1632

UNCOV
1633
    tRow.pTSRow = aRow[iRow];
×
1634

UNCOV
1635
    STsdbRowKey tsdbRowKey = {0};
×
UNCOV
1636
    tsdbRowGetKey(&tRow, &tsdbRowKey);
×
1637

UNCOV
1638
    void   *pIte = NULL;
×
UNCOV
1639
    int32_t iter = 0;
×
UNCOV
1640
    while ((pIte = tSimpleHashIterate(iColHash, pIte, &iter)) != NULL) {
×
UNCOV
1641
      int32_t iCol = ((int32_t *)pIte)[0];
×
UNCOV
1642
      SColVal colVal = COL_VAL_NONE(0, 0);
×
UNCOV
1643
      tsdbRowGetColVal(&tRow, pTSchema, iCol, &colVal);
×
1644

UNCOV
1645
      if (COL_VAL_IS_VALUE(&colVal)) {
×
UNCOV
1646
        SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST, .tsdbRowKey = tsdbRowKey, .colVal = colVal};
×
UNCOV
1647
        if (!taosArrayPush(ctxArray, &updateCtx)) {
×
UNCOV
1648
          TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
1649
        }
UNCOV
1650
        code = tSimpleHashIterateRemove(iColHash, &iCol, sizeof(iCol), &pIte, &iter);
×
UNCOV
1651
        if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
1652
          tsdbTrace("vgId:%d, %s tSimpleHashIterateRemove failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__,
×
1653
                    __LINE__, tstrerror(code));
1654
        }
1655
      }
1656
    }
1657
  }
1658

UNCOV
1659
  TAOS_CHECK_GOTO(tsdbCacheUpdate(pTsdb, suid, uid, ctxArray), &lino, _exit);
×
1660

UNCOV
1661
_exit:
×
UNCOV
1662
  if (code) {
×
UNCOV
1663
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
×
1664
  }
1665

1666
  tsdbRowClose(&iter);
×
1667
  tSimpleHashCleanup(iColHash);
×
UNCOV
1668
  taosArrayClear(ctxArray);
×
1669

UNCOV
1670
  TAOS_RETURN(code);
×
1671
}
1672

1673
/*
 * Refresh the last / last_row cache of table `uid` from a column-format data block.
 *
 * Three groups of entries are collected and handed to tsdbCacheUpdate():
 *   - an LFLAG_LAST entry for the primary timestamp column, taken from the final row;
 *   - for every column flagged HAS_VALUE, an LFLAG_LAST entry from the highest-indexed
 *     row whose cell holds a value;
 *   - an LFLAG_LAST_ROW entry for every column of the final row.
 *
 * The schema fetched through metaGetTbTSchemaEx() and the temporary context array are
 * released before returning.
 *
 * @param pTsdb      tsdb instance
 * @param suid       super table uid
 * @param uid        table uid
 * @param pBlockData block to scan; row pBlockData->nRow - 1 is read unconditionally
 * @return 0 on success, otherwise the error code of the step that failed
 */
int32_t tsdbCacheColFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SBlockData *pBlockData) {
  int32_t      code = 0, lino = 0;
  STSDBRowIter iter = {0};
  STSchema    *pTSchema = NULL;
  SArray      *ctxArray = NULL;

  TSDBROW lRow = tsdbRowFromBlockData(pBlockData, pBlockData->nRow - 1);
  int32_t sver = TSDBROW_SVERSION(&lRow);

  TAOS_CHECK_RETURN(metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pTSchema));

  ctxArray = taosArrayInit(pBlockData->nColData, sizeof(SLastUpdateCtx));
  if (ctxArray == NULL) {
    TAOS_CHECK_GOTO(terrno, &lino, _exit);
  }

  // 1. prepare last
  STsdbRowKey lastRowKey = {0};
  tsdbRowGetKey(&lRow, &lastRowKey);

  {
    // the timestamp column of the final row
    SLastUpdateCtx updateCtx = {
        .lflag = LFLAG_LAST,
        .tsdbRowKey = lastRowKey,
        .colVal = COL_VAL_VALUE(PRIMARYKEY_TIMESTAMP_COL_ID, ((SValue){.type = TSDB_DATA_TYPE_TIMESTAMP,
                                                                       .val = lRow.pBlockData->aTSKEY[lRow.iRow]}))};
    if (!taosArrayPush(ctxArray, &updateCtx)) {
      TAOS_CHECK_GOTO(terrno, &lino, _exit);
    }
  }

  TSDBROW tRow = tsdbRowFromBlockData(pBlockData, 0);

  for (int32_t iColData = 0; iColData < pBlockData->nColData; ++iColData) {
    SColData *pColData = &pBlockData->aColData[iColData];
    if ((pColData->flag & HAS_VALUE) != HAS_VALUE) {
      continue;  // the column holds no value at all in this block
    }

    // walk from the final row backwards and stop at the first cell holding a value
    for (tRow.iRow = pBlockData->nRow - 1; tRow.iRow >= 0; --tRow.iRow) {
      // NOTE(review): 2 is presumably the "has value" bit pattern of tColDataGetBitValue - confirm
      uint8_t colType = tColDataGetBitValue(pColData, tRow.iRow);
      if (colType != 2) {
        continue;
      }

      // the row key is only needed for the row that actually supplies the value
      STsdbRowKey rowKey = {0};
      tsdbRowGetKey(&tRow, &rowKey);

      SColVal colVal = COL_VAL_NONE(pColData->cid, pColData->type);
      tColDataGetValue(pColData, tRow.iRow, &colVal);

      SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST, .tsdbRowKey = rowKey, .colVal = colVal};
      if (!taosArrayPush(ctxArray, &updateCtx)) {
        TAOS_CHECK_GOTO(terrno, &lino, _exit);
      }
      break;
    }
  }

  // 2. prepare last row
  TAOS_CHECK_GOTO(tsdbRowIterOpen(&iter, &lRow, pTSchema), &lino, _exit);
  for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal; pColVal = tsdbRowIterNext(&iter)) {
    SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST_ROW, .tsdbRowKey = lastRowKey, .colVal = *pColVal};
    if (!taosArrayPush(ctxArray, &updateCtx)) {
      TAOS_CHECK_GOTO(terrno, &lino, _exit);
    }
  }

  TAOS_CHECK_GOTO(tsdbCacheUpdate(pTsdb, suid, uid, ctxArray), &lino, _exit);

_exit:
  if (code) {
    // same report as the row-format variant, so failures here are no longer silent
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
  }

  tsdbRowClose(&iter);
  taosMemoryFreeClear(pTSchema);
  taosArrayDestroy(ctxArray);

  TAOS_RETURN(code);
}
1748

1749
static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols,
1750
                            int nCols, int16_t *slotIds);
1751

1752
static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols,
1753
                               int nCols, int16_t *slotIds);
1754

1755
static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SArray *remainCols,
81✔
1756
                                    SCacheRowsReader *pr, int8_t ltype) {
1757
  int32_t               code = 0, lino = 0;
81✔
1758
  rocksdb_writebatch_t *wb = NULL;
81✔
1759
  SArray               *pTmpColArray = NULL;
81✔
1760
  bool                  extraTS = false;
81✔
1761

1762
  SIdxKey *idxKey = taosArrayGet(remainCols, 0);
81✔
1763
  if (idxKey->key.cid != PRIMARYKEY_TIMESTAMP_COL_ID) {
81✔
1764
    // ignore 'ts' loaded from cache and load it from tsdb
1765
    // SLastCol *pLastCol = taosArrayGet(pLastArray, 0);
1766
    // tsdbCacheUpdateLastColToNone(pLastCol, TSDB_LAST_CACHE_NO_CACHE);
1767

1768
    SLastKey *key = &(SLastKey){.lflag = ltype, .uid = uid, .cid = PRIMARYKEY_TIMESTAMP_COL_ID};
54✔
1769
    if (!taosArrayInsert(remainCols, 0, &(SIdxKey){0, *key})) {
54!
UNCOV
1770
      TAOS_RETURN(terrno);
×
1771
    }
1772

1773
    extraTS = true;
54✔
1774
  }
1775

1776
  int      num_keys = TARRAY_SIZE(remainCols);
81✔
1777
  int16_t *slotIds = taosMemoryMalloc(num_keys * sizeof(int16_t));
81!
1778

1779
  int16_t *lastColIds = NULL, *lastSlotIds = NULL, *lastrowColIds = NULL, *lastrowSlotIds = NULL;
81✔
1780
  lastColIds = taosMemoryMalloc(num_keys * sizeof(int16_t));
81!
1781
  lastSlotIds = taosMemoryMalloc(num_keys * sizeof(int16_t));
81!
1782
  lastrowColIds = taosMemoryMalloc(num_keys * sizeof(int16_t));
81!
1783
  lastrowSlotIds = taosMemoryMalloc(num_keys * sizeof(int16_t));
81!
1784
  SArray *lastTmpColArray = NULL, *lastTmpIndexArray = NULL, *lastrowTmpColArray = NULL, *lastrowTmpIndexArray = NULL;
81✔
1785

1786
  int lastIndex = 0;
81✔
1787
  int lastrowIndex = 0;
81✔
1788

1789
  if (!slotIds || !lastColIds || !lastSlotIds || !lastrowColIds || !lastrowSlotIds) {
81!
UNCOV
1790
    TAOS_CHECK_EXIT(terrno);
×
1791
  }
1792

1793
  for (int i = 0; i < num_keys; ++i) {
338✔
1794
    SIdxKey *idxKey = taosArrayGet(remainCols, i);
257✔
1795
    if (extraTS && !i) {
257✔
1796
      slotIds[i] = 0;
54✔
1797
    } else {
1798
      slotIds[i] = pr->pSlotIds[idxKey->idx];
203✔
1799
    }
1800

1801
    if (IS_LAST_KEY(idxKey->key)) {
257✔
1802
      if (NULL == lastTmpIndexArray) {
168✔
1803
        lastTmpIndexArray = taosArrayInit(num_keys, sizeof(int32_t));
60✔
1804
        if (!lastTmpIndexArray) {
60!
UNCOV
1805
          TAOS_CHECK_EXIT(terrno);
×
1806
        }
1807
      }
1808
      if (!taosArrayPush(lastTmpIndexArray, &(i))) {
168!
UNCOV
1809
        TAOS_CHECK_EXIT(terrno);
×
1810
      }
1811
      lastColIds[lastIndex] = idxKey->key.cid;
168✔
1812
      if (extraTS && !i) {
168✔
1813
        lastSlotIds[lastIndex] = 0;
54✔
1814
      } else {
1815
        lastSlotIds[lastIndex] = pr->pSlotIds[idxKey->idx];
114✔
1816
      }
1817
      lastIndex++;
168✔
1818
    } else {
1819
      if (NULL == lastrowTmpIndexArray) {
89✔
1820
        lastrowTmpIndexArray = taosArrayInit(num_keys, sizeof(int32_t));
21✔
1821
        if (!lastrowTmpIndexArray) {
21!
UNCOV
1822
          TAOS_CHECK_EXIT(terrno);
×
1823
        }
1824
      }
1825
      if (!taosArrayPush(lastrowTmpIndexArray, &(i))) {
89!
1826
        TAOS_CHECK_EXIT(terrno);
×
1827
      }
1828
      lastrowColIds[lastrowIndex] = idxKey->key.cid;
89✔
1829
      if (extraTS && !i) {
89!
UNCOV
1830
        lastrowSlotIds[lastrowIndex] = 0;
×
1831
      } else {
1832
        lastrowSlotIds[lastrowIndex] = pr->pSlotIds[idxKey->idx];
89✔
1833
      }
1834
      lastrowIndex++;
89✔
1835
    }
1836
  }
1837

1838
  pTmpColArray = taosArrayInit(lastIndex + lastrowIndex, sizeof(SLastCol));
81✔
1839
  if (!pTmpColArray) {
81!
UNCOV
1840
    TAOS_CHECK_EXIT(terrno);
×
1841
  }
1842

1843
  if (lastTmpIndexArray != NULL) {
81✔
1844
    TAOS_CHECK_EXIT(mergeLastCid(uid, pTsdb, &lastTmpColArray, pr, lastColIds, lastIndex, lastSlotIds));
60!
1845
    for (int i = 0; i < taosArrayGetSize(lastTmpColArray); i++) {
214✔
1846
      if (!taosArrayInsert(pTmpColArray, *(int32_t *)taosArrayGet(lastTmpIndexArray, i),
154!
1847
                           taosArrayGet(lastTmpColArray, i))) {
154✔
1848
        TAOS_CHECK_EXIT(terrno);
×
1849
      }
1850
    }
1851
  }
1852

1853
  if (lastrowTmpIndexArray != NULL) {
81✔
1854
    TAOS_CHECK_EXIT(mergeLastRowCid(uid, pTsdb, &lastrowTmpColArray, pr, lastrowColIds, lastrowIndex, lastrowSlotIds));
21!
1855
    for (int i = 0; i < taosArrayGetSize(lastrowTmpColArray); i++) {
96✔
1856
      if (!taosArrayInsert(pTmpColArray, *(int32_t *)taosArrayGet(lastrowTmpIndexArray, i),
75!
1857
                           taosArrayGet(lastrowTmpColArray, i))) {
75✔
UNCOV
1858
        TAOS_CHECK_EXIT(terrno);
×
1859
      }
1860
    }
1861
  }
1862

1863
  SLRUCache *pCache = pTsdb->lruCache;
81✔
1864
  for (int i = 0; i < num_keys; ++i) {
338✔
1865
    SIdxKey  *idxKey = taosArrayGet(remainCols, i);
257✔
1866
    SLastCol *pLastCol = NULL;
257✔
1867

1868
    if (pTmpColArray && TARRAY_SIZE(pTmpColArray) >= i + 1) {
257!
1869
      pLastCol = taosArrayGet(pTmpColArray, i);
229✔
1870
    }
1871

1872
    // still null, then make up a none col value
1873
    SLastCol noneCol = {.rowKey.ts = TSKEY_MIN,
257✔
1874
                        .colVal = COL_VAL_NONE(idxKey->key.cid, pr->pSchema->columns[slotIds[i]].type),
257✔
1875
                        .cacheStatus = TSDB_LAST_CACHE_VALID};
1876
    if (!pLastCol) {
257✔
1877
      pLastCol = &noneCol;
28✔
1878
    }
1879

1880
    if (!extraTS || i > 0) {
257✔
1881
      taosArraySet(pLastArray, idxKey->idx, pLastCol);
203✔
1882
    }
1883
    // taosArrayRemove(remainCols, i);
1884

1885
    if (/*!pTmpColArray*/ lastTmpIndexArray && !lastTmpColArray) {
257!
UNCOV
1886
      continue;
×
1887
    }
1888
    if (/*!pTmpColArray*/ lastrowTmpIndexArray && !lastrowTmpColArray) {
257!
1889
      continue;
×
1890
    }
1891

1892
    // store result back to rocks cache
1893
    code = tsdbCachePutToRocksdb(pTsdb, &idxKey->key, pLastCol);
257✔
1894
    if (code) {
257!
UNCOV
1895
      tsdbError("vgId:%d, %s failed at line %d since %s.", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
×
UNCOV
1896
      TAOS_CHECK_EXIT(code);
×
1897
    }
1898

1899
    code = tsdbCachePutToLRU(pTsdb, &idxKey->key, pLastCol, 0);
257✔
1900
    if (code) {
257!
UNCOV
1901
      tsdbError("vgId:%d, %s failed at line %d since %s.", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
×
UNCOV
1902
      TAOS_CHECK_EXIT(code);
×
1903
    }
1904
  }
1905

1906
  rocksMayWrite(pTsdb, false);
81✔
1907

1908
_exit:
81✔
1909
  taosArrayDestroy(lastrowTmpIndexArray);
81✔
1910
  taosArrayDestroy(lastrowTmpColArray);
81✔
1911
  taosArrayDestroy(lastTmpIndexArray);
81✔
1912
  taosArrayDestroy(lastTmpColArray);
81✔
1913

1914
  taosMemoryFree(lastColIds);
81!
1915
  taosMemoryFree(lastSlotIds);
81!
1916
  taosMemoryFree(lastrowColIds);
81!
1917
  taosMemoryFree(lastrowSlotIds);
81!
1918

1919
  taosArrayDestroy(pTmpColArray);
81✔
1920

1921
  taosMemoryFree(slotIds);
81!
1922

1923
  TAOS_RETURN(code);
81✔
1924
}
1925

1926
/*
 * Try to resolve the columns listed in `remainCols` from rocksdb; whatever is still
 * unresolved afterwards is passed on to tsdbCacheLoadFromRaw().
 *
 * `remainCols` and `ignoreFromRocks` are parallel arrays. Every entry that is served from
 * rocksdb is put into the LRU cache, copied into `pLastArray` at idxKey->idx and removed
 * from both arrays. Entries whose ignore flag is set, or for which rocksdb has no usable
 * value, stay in `remainCols`.
 *
 * @param pTsdb           tsdb instance
 * @param uid             table uid
 * @param pLastArray      result array, updated in place
 * @param remainCols      SIdxKey entries still to resolve; shrinks as entries are served
 * @param ignoreFromRocks bool per entry of remainCols; true skips the rocksdb value
 * @param pr              cache rows reader, forwarded to tsdbCacheLoadFromRaw()
 * @param ltype           forwarded to tsdbCacheLoadFromRaw()
 * @return 0 on success, otherwise the error code of the step that failed
 */
static int32_t tsdbCacheLoadFromRocks(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SArray *remainCols,
                                      SArray *ignoreFromRocks, SCacheRowsReader *pr, int8_t ltype) {
  int32_t code = 0, lino = 0;
  int     num_keys = TARRAY_SIZE(remainCols);
  char  **keys_list = taosMemoryMalloc(num_keys * sizeof(char *));
  size_t *keys_list_sizes = taosMemoryMalloc(num_keys * sizeof(size_t));
  char   *key_list = taosMemoryMalloc(num_keys * ROCKS_KEY_LEN);
  if (!keys_list || !keys_list_sizes || !key_list) {
    // release whichever of the three allocations did succeed
    taosMemoryFree(key_list);
    taosMemoryFree(keys_list);
    taosMemoryFree(keys_list_sizes);
    TAOS_RETURN(terrno);
  }
  char  **values_list = NULL;
  size_t *values_list_sizes = NULL;

  // key_list is one contiguous buffer; keys_list[i] points at the i-th key inside it
  for (int i = 0; i < num_keys; ++i) {
    memcpy(key_list + i * ROCKS_KEY_LEN, &((SIdxKey *)taosArrayGet(remainCols, i))->key, ROCKS_KEY_LEN);
    keys_list[i] = key_list + i * ROCKS_KEY_LEN;
    keys_list_sizes[i] = ROCKS_KEY_LEN;
  }

  rocksMayWrite(pTsdb, true);  // flush writebatch cache

  code = tsdbCacheGetValuesFromRocks(pTsdb, num_keys, (const char *const *)keys_list, keys_list_sizes, &values_list,
                                     &values_list_sizes);
  if (code) {
    taosMemoryFree(key_list);
    taosMemoryFree(keys_list);
    taosMemoryFree(keys_list_sizes);
    TAOS_RETURN(code);
  }

  // i walks the keys as they were fetched (values_list is indexed by it); j is the current
  // position of the same key inside remainCols / ignoreFromRocks, which shrink whenever an
  // entry has been served.
  for (int i = 0, j = 0; i < num_keys && j < TARRAY_SIZE(remainCols); ++i) {
    SLastCol *pLastCol = NULL;
    // ignoreFromRocks loses its j-th element together with remainCols, so it has to be
    // read at j; reading at i picks up a shifted or stale flag after the first removal
    bool ignore = ((bool *)TARRAY_DATA(ignoreFromRocks))[j];
    if (ignore) {
      ++j;
      continue;
    }

    if (values_list[i] != NULL) {
      code = tsdbCacheDeserialize(values_list[i], values_list_sizes[i], &pLastCol);
      if (code != TSDB_CODE_SUCCESS) {
        tsdbError("vgId:%d, %s deserialize failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
                  tstrerror(code));
        goto _exit;
      }
    }
    SLastCol *pToFree = pLastCol;
    SIdxKey  *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[j];
    if (pLastCol && pLastCol->cacheStatus != TSDB_LAST_CACHE_NO_CACHE) {
      code = tsdbCachePutToLRU(pTsdb, &idxKey->key, pLastCol, 0);
      if (code) {
        tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
        taosMemoryFreeClear(pToFree);
        TAOS_CHECK_EXIT(code);
      }

      // the copy stored in pLastArray must not reference the buffer freed below
      SLastCol lastCol = *pLastCol;
      code = tsdbCacheReallocSLastCol(&lastCol, NULL);
      if (TSDB_CODE_SUCCESS != code) {
        taosMemoryFreeClear(pToFree);
        TAOS_CHECK_EXIT(code);
      }

      taosArraySet(pLastArray, idxKey->idx, &lastCol);
      taosArrayRemove(remainCols, j);
      taosArrayRemove(ignoreFromRocks, j);
    } else {
      ++j;
    }

    taosMemoryFreeClear(pToFree);
  }

  if (TARRAY_SIZE(remainCols) > 0) {
    // tsdbTrace("tsdb/cache: vgId: %d, load %" PRId64 " from raw", TD_VID(pTsdb->pVnode), uid);
    code = tsdbCacheLoadFromRaw(pTsdb, uid, pLastArray, remainCols, pr, ltype);
  }

_exit:
  taosMemoryFree(key_list);
  taosMemoryFree(keys_list);
  taosMemoryFree(keys_list_sizes);
  if (values_list) {
    for (int i = 0; i < num_keys; ++i) {
      rocksdb_free(values_list[i]);
    }
    taosMemoryFree(values_list);
  }
  taosMemoryFree(values_list_sizes);

  TAOS_RETURN(code);
}
2020

2021
/*
 * Fill `pLastArray` with one SLastCol per column id of pr->pCidList for table `uid`.
 *
 * Pass 1 (no lock): every key is looked up in the LRU cache. A hit whose cacheStatus is
 * not TSDB_LAST_CACHE_NO_CACHE is copied into `pLastArray`; otherwise a NONE placeholder
 * is pushed and the key is queued in `remainCols`, together with a flag telling whether
 * rocksdb must be skipped for it.
 * Pass 2 (under pTsdb->lruMutex): the queued keys are looked up once more, and whatever
 * is still missing is loaded through tsdbCacheLoadFromRocks().
 *
 * @param pTsdb      tsdb instance
 * @param uid        table uid
 * @param pLastArray output; receives numKeys entries in the order of pr->pCidList
 * @param pr         cache rows reader supplying pCidList, pFuncTypeList, type, pSlotIds, pSchema
 * @param ltype      lflag of the generated keys unless overridden per column
 * @param keyArray   output; receives the SLastKey built for every column
 * @return 0 on success, otherwise the error code of the step that failed
 */
static int32_t tsdbCacheGetBatchFromLru(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr,
                                        int8_t ltype, SArray *keyArray) {
  int32_t    code = 0, lino = 0;
  SArray    *remainCols = NULL;
  SArray    *ignoreFromRocks = NULL;
  SLRUCache *pCache = pTsdb->lruCache;
  SArray    *pCidList = pr->pCidList;
  int        numKeys = TARRAY_SIZE(pCidList);
  LRUHandle *h = NULL;  // handle currently held; NULL whenever nothing has to be released

  for (int i = 0; i < numKeys; ++i) {
    int16_t cid = ((int16_t *)TARRAY_DATA(pCidList))[i];

    SLastKey key = {.lflag = ltype, .uid = uid, .cid = cid};
    // for select last_row, last case
    int32_t funcType = FUNCTION_TYPE_CACHE_LAST;
    if (pr->pFuncTypeList != NULL && taosArrayGetSize(pr->pFuncTypeList) > i) {
      funcType = ((int32_t *)TARRAY_DATA(pr->pFuncTypeList))[i];
    }
    if (((pr->type & CACHESCAN_RETRIEVE_LAST) == CACHESCAN_RETRIEVE_LAST) && FUNCTION_TYPE_CACHE_LAST_ROW == funcType) {
      int8_t tempType = CACHESCAN_RETRIEVE_LAST_ROW | (pr->type ^ CACHESCAN_RETRIEVE_LAST);
      key.lflag = (tempType & CACHESCAN_RETRIEVE_LAST) >> 3;
    }

    if (!taosArrayPush(keyArray, &key)) {
      TAOS_CHECK_EXIT(terrno);
    }

    h = taosLRUCacheLookup(pCache, &key, ROCKS_KEY_LEN);
    SLastCol *pLastCol = h ? (SLastCol *)taosLRUCacheValue(pCache, h) : NULL;
    if (h && pLastCol->cacheStatus != TSDB_LAST_CACHE_NO_CACHE) {
      // the cached value is copied so that it stays valid after the handle is released
      SLastCol lastCol = *pLastCol;
      TAOS_CHECK_EXIT(tsdbCacheReallocSLastCol(&lastCol, NULL));

      if (taosArrayPush(pLastArray, &lastCol) == NULL) {
        code = terrno;
        goto _exit;
      }
    } else {
      // no cache or cache is invalid
      SLastCol noneCol = {.rowKey.ts = TSKEY_MIN,
                          .colVal = COL_VAL_NONE(cid, pr->pSchema->columns[pr->pSlotIds[i]].type)};

      if (taosArrayPush(pLastArray, &noneCol) == NULL) {
        code = terrno;
        goto _exit;
      }

      if (!remainCols && (remainCols = taosArrayInit(numKeys, sizeof(SIdxKey))) == NULL) {
        code = terrno;
        goto _exit;
      }
      if (!ignoreFromRocks && (ignoreFromRocks = taosArrayInit(numKeys, sizeof(bool))) == NULL) {
        code = terrno;
        goto _exit;
      }
      if (taosArrayPush(remainCols, &(SIdxKey){i, key}) == NULL) {
        code = terrno;
        goto _exit;
      }
      // an entry explicitly marked NO_CACHE must not be served from rocksdb either
      bool ignoreRocks = pLastCol ? (pLastCol->cacheStatus == TSDB_LAST_CACHE_NO_CACHE) : false;
      if (taosArrayPush(ignoreFromRocks, &ignoreRocks) == NULL) {
        code = terrno;
        goto _exit;
      }
    }

    if (h) {
      tsdbLRUCacheRelease(pCache, h, false);
      h = NULL;
    }
  }

  if (remainCols && TARRAY_SIZE(remainCols) > 0) {
    (void)taosThreadMutexLock(&pTsdb->lruMutex);

    // another thread may have filled the cache in the meantime: look again under the lock
    for (int i = 0; i < TARRAY_SIZE(remainCols);) {
      SIdxKey *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[i];
      h = taosLRUCacheLookup(pCache, &idxKey->key, ROCKS_KEY_LEN);
      SLastCol *pLastCol = h ? (SLastCol *)taosLRUCacheValue(pCache, h) : NULL;
      if (h && pLastCol->cacheStatus != TSDB_LAST_CACHE_NO_CACHE) {
        SLastCol lastCol = *pLastCol;
        code = tsdbCacheReallocSLastCol(&lastCol, NULL);
        if (code) {
          tsdbLRUCacheRelease(pCache, h, false);
          h = NULL;
          (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
          // leave through _exit so that remainCols / ignoreFromRocks are destroyed
          goto _exit;
        }

        taosArraySet(pLastArray, idxKey->idx, &lastCol);

        taosArrayRemove(remainCols, i);
        taosArrayRemove(ignoreFromRocks, i);
      } else {
        // no cache or cache is invalid
        ++i;
      }
      if (h) {
        tsdbLRUCacheRelease(pCache, h, false);
        h = NULL;
      }
    }

    // tsdbTrace("tsdb/cache: vgId: %d, load %" PRId64 " from rocks", TD_VID(pTsdb->pVnode), uid);
    code = tsdbCacheLoadFromRocks(pTsdb, uid, pLastArray, remainCols, ignoreFromRocks, pr, ltype);

    (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
  }

_exit:
  // only a handle that was actually obtained is released on the failure paths
  if (h) {
    tsdbLRUCacheRelease(pCache, h, false);
  }
  if (remainCols) {
    taosArrayDestroy(remainCols);
  }
  if (ignoreFromRocks) {
    taosArrayDestroy(ignoreFromRocks);
  }

  TAOS_RETURN(code);
}
2150

2151
/*
 * States of a SMemNextRowIter as driven by getNextRowFromMem():
 *   SMEMNEXTROW_ENTER - the table-data iterator has not been opened yet; the next call
 *                       opens it and returns the first row it yields.
 *   SMEMNEXTROW_NEXT  - the iterator is open; each call advances it by one row.
 */
typedef enum SMEMNEXTROWSTATES {
2152
  SMEMNEXTROW_ENTER,
2153
  SMEMNEXTROW_NEXT,
2154
} SMEMNEXTROWSTATES;
2155

2156
/*
 * Per-source cursor used by getNextRowFromMem() to walk one table's in-memory data.
 * `state` selects between opening the iterator and advancing it, `pMem` is the table data
 * to read (NULL yields no rows) and `iter` is the iterator that is opened on it.
 * NOTE(review): `lastTs` is only referenced by commented-out code in getNextRowFromMem();
 * confirm whether it is still needed.
 */
typedef struct SMemNextRowIter {
2157
  SMEMNEXTROWSTATES state;
2158
  STbData          *pMem;  // [input]
2159
  STbDataIter       iter;  // mem buffer skip list iterator
2160
  int64_t           lastTs;
2161
} SMemNextRowIter;
2162

2163
/*
 * _next_row_fn_t implementation for in-memory table data.
 *
 * In state SMEMNEXTROW_ENTER the iterator is opened on state->pMem (when that is not
 * NULL) and its first row is returned; the state only switches to SMEMNEXTROW_NEXT if a
 * row was found. In state SMEMNEXTROW_NEXT the iterator is advanced and the row it then
 * points at is returned. In every other situation *ppRow is set to NULL.
 *
 * @param iter             a SMemNextRowIter
 * @param ppRow            receives the row, or NULL when there is none
 * @param pIgnoreEarlierTs always set to false
 * @param isLast           unused
 * @param aCols            unused
 * @param nCols            unused
 * @return always 0
 */
static int32_t getNextRowFromMem(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast, int16_t *aCols,
                                 int nCols) {
  SMemNextRowIter *pMemIter = (SMemNextRowIter *)iter;
  TSDBROW         *pRow = NULL;
  int32_t          code = 0;

  *pIgnoreEarlierTs = false;

  if (pMemIter->state == SMEMNEXTROW_ENTER) {
    if (pMemIter->pMem != NULL) {
      tsdbTbDataIterOpen(pMemIter->pMem, NULL, 1, &pMemIter->iter);

      pRow = tsdbTbDataIterGet(&pMemIter->iter);
      if (pRow) {
        // only leave the ENTER state once the iterator has produced a row
        pMemIter->state = SMEMNEXTROW_NEXT;
      }
    }
  } else if (pMemIter->state == SMEMNEXTROW_NEXT) {
    if (tsdbTbDataIterNext(&pMemIter->iter)) {
      pRow = tsdbTbDataIterGet(&pMemIter->iter);
    }
  }

  *ppRow = pRow;

  TAOS_RETURN(code);
}
2213

2214
/*
 * Callbacks stored in TsdbNextRowState.
 * _next_row_fn_t produces the next row of a source through *ppRow (NULL when the source
 * has no further row) and reports through *pIgnoreEarlierTs; it returns 0 on success.
 * _next_row_clear_fn_t, when set, is invoked by memRowIterClose() with the source's `iter`.
 */
typedef int32_t (*_next_row_fn_t)(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast, int16_t *aCols,
2215
                                  int nCols);
2216
typedef int32_t (*_next_row_clear_fn_t)(void *iter);
2217

2218
/*
 * State of one row source that memRowIterGet() merges.
 *   pRow            - row most recently produced by nextRowFn; NULL once the source is drained
 *   stop            - true when the source has no further rows
 *   next            - true when nextRowFn has to be called to fetch a fresh row
 *   ignoreEarlierTs - out-flag handed to nextRowFn
 *   iter            - opaque cursor passed to nextRowFn and nextRowClearFn
 *   nextRowFn       - fetches the next row of the source
 *   nextRowClearFn  - optional cleanup hook, called by memRowIterClose() when not NULL
 */
typedef struct {
2219
  TSDBROW             *pRow;
2220
  bool                 stop;
2221
  bool                 next;
2222
  bool                 ignoreEarlierTs;
2223
  void                *iter;
2224
  _next_row_fn_t       nextRowFn;
2225
  _next_row_clear_fn_t nextRowClearFn;
2226
} TsdbNextRowState;
2227

2228
/*
 * Merging iterator over the two in-memory sources of one table, set up by
 * memRowIterOpen() and torn down by memRowIterClose().
 *   pMemDelData          - filled by loadMemTombData() in memRowIterOpen(); destroyed on close
 *   pSkyline             - destroyed on close when not NULL; memRowIterOpen() does not set it
 *   iSkyline             - NOTE(review): presumably a cursor into pSkyline; not used in the
 *                          functions visible here - confirm
 *   idx                  - {suid, uid} of the table being read
 *   memState / imemState - cursors over the snapshot's pMem / pIMem table data
 *   memRow / imemRow     - row slots that input[0].pRow / input[1].pRow initially point at
 *   input                - [0] is the pMem source, [1] the pIMem source
 *   pr, pTsdb            - the reader and tsdb instance passed to memRowIterOpen()
 */
typedef struct {
2229
  SArray           *pMemDelData;
2230
  SArray           *pSkyline;
2231
  int64_t           iSkyline;
2232
  SBlockIdx         idx;
2233
  SMemNextRowIter   memState;
2234
  SMemNextRowIter   imemState;
2235
  TSDBROW           memRow, imemRow;
2236
  TsdbNextRowState  input[2];
2237
  SCacheRowsReader *pr;
2238
  STsdb            *pTsdb;
2239
} MemNextRowIter;
2240

2241
static int32_t memRowIterOpen(MemNextRowIter *pIter, tb_uid_t uid, STsdb *pTsdb, STSchema *pTSchema, tb_uid_t suid,
4,398✔
2242
                              STsdbReadSnap *pReadSnap, SCacheRowsReader *pr) {
2243
  int32_t code = 0, lino = 0;
4,398✔
2244

2245
  STbData *pMem = NULL;
4,398✔
2246
  if (pReadSnap->pMem) {
4,398!
2247
    pMem = tsdbGetTbDataFromMemTable(pReadSnap->pMem, suid, uid);
4,398✔
2248
  }
2249

2250
  STbData *pIMem = NULL;
4,399✔
2251
  if (pReadSnap->pIMem) {
4,399!
UNCOV
2252
    pIMem = tsdbGetTbDataFromMemTable(pReadSnap->pIMem, suid, uid);
×
2253
  }
2254

2255
  pIter->pTsdb = pTsdb;
4,399✔
2256

2257
  pIter->pMemDelData = NULL;
4,399✔
2258

2259
  TAOS_CHECK_GOTO(loadMemTombData(&pIter->pMemDelData, pMem, pIMem, pr->info.verRange.maxVer), &lino, _exit);
4,399!
2260

2261
  pIter->idx = (SBlockIdx){.suid = suid, .uid = uid};
4,399✔
2262

2263
  pIter->input[0] = (TsdbNextRowState){&pIter->memRow, true, false, false, &pIter->memState, getNextRowFromMem, NULL};
4,399✔
2264
  pIter->input[1] = (TsdbNextRowState){&pIter->imemRow, true, false, false, &pIter->imemState, getNextRowFromMem, NULL};
4,399✔
2265

2266
  if (pMem) {
4,399✔
2267
    pIter->memState.pMem = pMem;
3,440✔
2268
    pIter->memState.state = SMEMNEXTROW_ENTER;
3,440✔
2269
    pIter->input[0].stop = false;
3,440✔
2270
    pIter->input[0].next = true;
3,440✔
2271
  }
2272

2273
  if (pIMem) {
4,399!
UNCOV
2274
    pIter->imemState.pMem = pIMem;
×
UNCOV
2275
    pIter->imemState.state = SMEMNEXTROW_ENTER;
×
UNCOV
2276
    pIter->input[1].stop = false;
×
UNCOV
2277
    pIter->input[1].next = true;
×
2278
  }
2279

2280
  pIter->pr = pr;
4,399✔
2281

2282
_exit:
4,399✔
2283
  if (code) {
4,399!
UNCOV
2284
    tsdbError("tsdb/cache: %s failed at line %d since %s.", __func__, lino, tstrerror(code));
×
2285
  }
2286

2287
  TAOS_RETURN(code);
4,399✔
2288
}
2289

2290
/*
 * Release what a MemNextRowIter owns: run the optional clear hook of both inputs, then
 * destroy the skyline and the tomb-data arrays if they exist.
 *
 * @param pIter iterator previously set up by memRowIterOpen()
 */
static void memRowIterClose(MemNextRowIter *pIter) {
  TsdbNextRowState *pInput = pIter->input;
  TsdbNextRowState *pEnd = pInput + 2;

  for (; pInput != pEnd; ++pInput) {
    _next_row_clear_fn_t clearFn = pInput->nextRowClearFn;
    if (clearFn != NULL) {
      (void)clearFn(pInput->iter);
    }
  }

  if (pIter->pSkyline != NULL) {
    taosArrayDestroy(pIter->pSkyline);
  }

  if (pIter->pMemDelData != NULL) {
    taosArrayDestroy(pIter->pMemDelData);
  }
}
4,394✔
2305

2306
/*
 * Free callback registered on the reader's table map by getTableLoadInfo(). Each map
 * value is a pointer to a heap object; `param` is the address of that stored pointer.
 */
static void freeTableInfoFunc(void *param) {
  void **ppStored = param;
  taosMemoryFreeClear(*ppStored);
}
3,448✔
2310

2311
/*
 * Return the STableLoadInfo of table `uid`, creating it on first use.
 *
 * The reader's table map is created lazily and owns the entries: freeTableInfoFunc() is
 * registered as its free callback. A new entry is zero-initialised.
 *
 * @param pReader cache rows reader holding pTableMap and numOfTables
 * @param uid     table uid used as map key
 * @return the entry, or NULL if the map or the entry could not be allocated or stored
 */
static STableLoadInfo *getTableLoadInfo(SCacheRowsReader *pReader, uint64_t uid) {
  if (!pReader->pTableMap) {
    pReader->pTableMap = tSimpleHashInit(pReader->numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
    if (!pReader->pTableMap) {
      return NULL;
    }

    tSimpleHashSetFreeFp(pReader->pTableMap, freeTableInfoFunc);
  }

  STableLoadInfo **ppInfo = tSimpleHashGet(pReader->pTableMap, &uid, sizeof(uid));
  if (ppInfo) {
    return *ppInfo;
  }

  STableLoadInfo *pInfo = taosMemoryCalloc(1, sizeof(STableLoadInfo));
  if (pInfo == NULL) {
    return NULL;
  }

  if (tSimpleHashPut(pReader->pTableMap, &uid, sizeof(uint64_t), &pInfo, POINTER_BYTES)) {
    // the map did not take the entry, so nobody else would ever free it
    taosMemoryFree(pInfo);
    return NULL;
  }

  return pInfo;
}
2336

2337
/*
 * Return the next visible row from the two merged inputs of `pIter`, or NULL
 * when both inputs are exhausted or an error occurred (errors are logged).
 *
 * Each round:
 *   1. inputs flagged `next` are advanced through their nextRowFn;
 *   2. the row(s) carrying the greatest row key are selected as candidates;
 *   3. candidates covered by the delete skyline are dropped and their input is
 *      flagged to advance; the first surviving candidate is returned.
 * The skyline is built lazily on first use from the table's tomb data plus the
 * iterator's in-memory delete records.
 */
static TSDBROW *memRowIterGet(MemNextRowIter *pIter, bool isLast, int16_t *aCols, int nCols) {
  int32_t code = 0, lino = 0;

  while (true) {
    // step 1: pull a fresh row from every input that was asked to advance
    for (int src = 0; src < 2; ++src) {
      if (!pIter->input[src].next || pIter->input[src].stop) {
        continue;
      }

      TAOS_CHECK_GOTO(pIter->input[src].nextRowFn(pIter->input[src].iter, &pIter->input[src].pRow,
                                                  &pIter->input[src].ignoreEarlierTs, isLast, aCols, nCols),
                      &lino, _exit);

      if (NULL == pIter->input[src].pRow) {
        pIter->input[src].stop = true;
        pIter->input[src].next = false;
      }
    }

    if (pIter->input[0].stop && pIter->input[1].stop) {
      return NULL;
    }

    // step 2: collect the row(s) holding the greatest key
    TSDBROW *cand[2] = {0};
    int      candSrc[2] = {-1, -1};
    int      nCand = 0;
    SRowKey  topKey = {.ts = TSKEY_MIN};

    for (int src = 0; src < 2; ++src) {
      if (pIter->input[src].stop || NULL == pIter->input[src].pRow) {
        continue;
      }

      STsdbRowKey curKey = {0};
      tsdbRowGetKey(pIter->input[src].pRow, &curKey);

      // merging & deduplicating on client side
      int cmp = tRowKeyCompare(&topKey, &curKey.key);
      if (cmp < 0) {
        // strictly greater key: restart the candidate set
        nCand = 0;
        topKey = curKey.key;
      }
      if (cmp <= 0) {
        candSrc[nCand] = src;
        cand[nCand++] = pIter->input[src].pRow;
      }
      pIter->input[src].next = false;
    }

    // step 3: filter candidates through the delete skyline
    TSDBROW *pFirstLive = NULL;
    int      firstLiveSrc = -1;
    for (int k = 0; k < nCand; ++k) {
      TSDBKEY candKey = TSDBROW_KEY(cand[k]);

      if (!pIter->pSkyline) {
        pIter->pSkyline = taosArrayInit(32, sizeof(TSDBKEY));
        TSDB_CHECK_NULL(pIter->pSkyline, code, lino, _exit, terrno);

        uint64_t        uid = pIter->idx.uid;
        STableLoadInfo *pInfo = getTableLoadInfo(pIter->pr, uid);
        TSDB_CHECK_NULL(pInfo, code, lino, _exit, TSDB_CODE_OUT_OF_MEMORY);

        if (pInfo->pTombData == NULL) {
          pInfo->pTombData = taosArrayInit(4, sizeof(SDelData));
          TSDB_CHECK_NULL(pInfo->pTombData, code, lino, _exit, terrno);
        }

        if (!taosArrayAddAll(pInfo->pTombData, pIter->pMemDelData)) {
          TAOS_CHECK_GOTO(terrno, &lino, _exit);
        }

        size_t nTomb = TARRAY_SIZE(pInfo->pTombData);
        if (nTomb > 0) {
          code = tsdbBuildDeleteSkyline(pInfo->pTombData, 0, (int32_t)(nTomb - 1), pIter->pSkyline);
          TAOS_CHECK_GOTO(code, &lino, _exit);
        }
        pIter->iSkyline = taosArrayGetSize(pIter->pSkyline) - 1;
      }

      bool deleted = tsdbKeyDeleted(&candKey, pIter->pSkyline, &pIter->iSkyline);
      if (!deleted && NULL == pFirstLive) {
        pFirstLive = cand[k];
        firstLiveSrc = candSrc[k];
      }

      // a deleted row is skipped by advancing its input on the next round
      pIter->input[candSrc[k]].next = deleted;
    }

    if (NULL != pFirstLive) {
      pIter->input[firstLiveSrc].next = true;

      return pFirstLive;
    }
  }

_exit:
  if (code) {
    tsdbError("tsdb/cache: %s failed at line %d since %s.", __func__, lino, tstrerror(code));
  }

  return NULL;
}
2437

2438
/*
 * Duplicate `pSrc` (header plus its numOfCols column descriptors) into a newly
 * allocated schema stored in `*ppDst`.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when the allocation fails (in which
 * case `*ppDst` is NULL).
 */
static int32_t cloneTSchema(STSchema *pSrc, STSchema **ppDst) {
  int32_t   nBytes = sizeof(STSchema) + sizeof(STColumn) * pSrc->numOfCols;
  STSchema *pCopy = taosMemoryMalloc(nBytes);

  *ppDst = pCopy;
  if (pCopy == NULL) {
    TAOS_RETURN(terrno);
  }

  memcpy(pCopy, pSrc, nBytes);

  TAOS_RETURN(TSDB_CODE_SUCCESS);
}
2448

2449
/*
 * Make `pReader->pCurrSchema` match schema version `sversion`.
 *
 *  - no current schema and the reader's base schema has that version: clone it;
 *  - current schema already has that version: nothing to do;
 *  - otherwise: drop the current schema and fetch the requested version from
 *    the meta store for (suid, uid).
 */
static int32_t updateTSchema(int32_t sversion, SCacheRowsReader *pReader, uint64_t uid) {
  if (pReader->pCurrSchema == NULL) {
    if (pReader->pSchema->version == sversion) {
      TAOS_RETURN(cloneTSchema(pReader->pSchema, &pReader->pCurrSchema));
    }
  } else if (pReader->pCurrSchema->version == sversion) {
    TAOS_RETURN(TSDB_CODE_SUCCESS);
  }

  taosMemoryFreeClear(pReader->pCurrSchema);
  TAOS_RETURN(
      metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->info.suid, uid, sversion, &pReader->pCurrSchema));
}
2462

2463
/*
 * Refresh the last/last_row results in `pLastArray` with rows that are still
 * in mem/imem (filtered by delete info).
 *
 * The newest in-memory row is merged column by column into `pLastArray`
 * (entries are matched by column id; `keyArray[j]` tells whether entry j is a
 * `last` or a `last_row` key). `last` columns for which that row carries no
 * value are remembered in a hash and resolved by scanning further rows until
 * every such column has been seen with a value or the iterator is exhausted.
 *
 * The row-column iterator lives at function scope and is tracked with
 * `rowIterLive`, so the error path never touches an iterator that was jumped
 * over by `goto _exit` and always closes the iterator that is actually open.
 *
 * Returns 0 on success or an error code; failures are logged.
 */
static int32_t tsdbCacheGetBatchFromMem(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr,
                                        SArray *keyArray) {
  int32_t        code = 0;
  int32_t        lino = 0;
  STSchema      *pTSchema = pr->pSchema;
  MemNextRowIter iter = {0};
  SSHashObj     *iColHash = NULL;
  STSDBRowIter   rowIter = {0};
  bool           rowIterLive = false;  // true from just before tsdbRowIterOpen until the matching close

  // 1, get from mem, imem filtered with delete info
  TAOS_CHECK_EXIT(memRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->info.suid, pr->pReadSnap, pr));

  TSDBROW *pRow = memRowIterGet(&iter, false, NULL, 0);
  if (!pRow) {
    goto _exit;
  }

  int32_t sversion = TSDBROW_SVERSION(pRow);
  if (sversion != -1) {
    TAOS_CHECK_EXIT(updateTSchema(sversion, pr, uid));

    pTSchema = pr->pCurrSchema;
  }
  int32_t nCol = pTSchema->numOfCols;

  STsdbRowKey rowKey = {0};
  tsdbRowGetKey(pRow, &rowKey);

  rowIterLive = true;
  TAOS_CHECK_EXIT(tsdbRowIterOpen(&rowIter, pRow, pTSchema));

  int32_t iCol = 0, jCol = 0, jnCol = TARRAY_SIZE(pLastArray);
  for (SColVal *pColVal = tsdbRowIterNext(&rowIter); pColVal && iCol < nCol && jCol < jnCol;) {
    SLastCol *pTargetCol = &((SLastCol *)TARRAY_DATA(pLastArray))[jCol];
    if (pColVal->cid < pTargetCol->colVal.cid) {
      pColVal = tsdbRowIterNext(&rowIter), ++iCol;

      continue;
    }
    if (pColVal->cid > pTargetCol->colVal.cid) {
      break;
    }

    int32_t cmp_res = tRowKeyCompare(&pTargetCol->rowKey, &rowKey.key);
    if (!IS_LAST_KEY(((SLastKey *)TARRAY_DATA(keyArray))[jCol])) {
      if (cmp_res < 0 || (cmp_res == 0 && !COL_VAL_IS_NONE(pColVal))) {
        SLastCol lastCol = {.rowKey = rowKey.key, .colVal = *pColVal, .dirty = 1, .cacheStatus = TSDB_LAST_CACHE_VALID};
        TAOS_CHECK_EXIT(tsdbCacheReallocSLastCol(&lastCol, NULL));

        tsdbCacheFreeSLastColItem(pTargetCol);
        taosArraySet(pLastArray, jCol, &lastCol);
      }
    } else {
      if (COL_VAL_IS_VALUE(pColVal)) {
        if (cmp_res <= 0) {
          SLastCol lastCol = {
              .rowKey = rowKey.key, .colVal = *pColVal, .dirty = 1, .cacheStatus = TSDB_LAST_CACHE_VALID};
          TAOS_CHECK_EXIT(tsdbCacheReallocSLastCol(&lastCol, NULL));

          tsdbCacheFreeSLastColItem(pTargetCol);
          taosArraySet(pLastArray, jCol, &lastCol);
        }
      } else {
        // `last` column without a value in the newest row: resolve it from older rows below
        if (!iColHash) {
          iColHash = tSimpleHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT));
          if (iColHash == NULL) {
            TAOS_CHECK_EXIT(terrno);
          }
        }

        if (tSimpleHashPut(iColHash, &pColVal->cid, sizeof(pColVal->cid), &jCol, sizeof(jCol))) {
          TAOS_CHECK_EXIT(terrno);
        }
      }
    }

    ++jCol;

    if (jCol < jnCol && ((SLastCol *)TARRAY_DATA(pLastArray))[jCol].colVal.cid > pColVal->cid) {
      pColVal = tsdbRowIterNext(&rowIter), ++iCol;
    }
  }
  tsdbRowClose(&rowIter);
  rowIter = (STSDBRowIter){0};
  rowIterLive = false;

  if (iColHash && tSimpleHashGetSize(iColHash) > 0) {
    pRow = memRowIterGet(&iter, false, NULL, 0);
    while (pRow) {
      if (tSimpleHashGetSize(iColHash) == 0) {
        break;
      }

      sversion = TSDBROW_SVERSION(pRow);
      if (sversion != -1) {
        TAOS_CHECK_EXIT(updateTSchema(sversion, pr, uid));

        pTSchema = pr->pCurrSchema;
      }
      nCol = pTSchema->numOfCols;

      STsdbRowKey tsdbRowKey = {0};
      tsdbRowGetKey(pRow, &tsdbRowKey);

      rowIterLive = true;
      TAOS_CHECK_EXIT(tsdbRowIterOpen(&rowIter, pRow, pTSchema));

      iCol = 0;
      for (SColVal *pColVal = tsdbRowIterNext(&rowIter); pColVal && iCol < nCol;
           pColVal = tsdbRowIterNext(&rowIter), iCol++) {
        int32_t *pjCol = tSimpleHashGet(iColHash, &pColVal->cid, sizeof(pColVal->cid));
        if (pjCol && COL_VAL_IS_VALUE(pColVal)) {
          SLastCol *pTargetCol = &((SLastCol *)TARRAY_DATA(pLastArray))[*pjCol];

          int32_t cmp_res = tRowKeyCompare(&pTargetCol->rowKey, &tsdbRowKey.key);
          if (cmp_res <= 0) {
            SLastCol lastCol = {
                .rowKey = tsdbRowKey.key, .colVal = *pColVal, .dirty = 1, .cacheStatus = TSDB_LAST_CACHE_VALID};
            TAOS_CHECK_EXIT(tsdbCacheReallocSLastCol(&lastCol, NULL));

            tsdbCacheFreeSLastColItem(pTargetCol);
            taosArraySet(pLastArray, *pjCol, &lastCol);
          }

          TAOS_CHECK_EXIT(tSimpleHashRemove(iColHash, &pColVal->cid, sizeof(pColVal->cid)));
        }
      }
      tsdbRowClose(&rowIter);
      rowIter = (STSDBRowIter){0};
      rowIterLive = false;

      pRow = memRowIterGet(&iter, false, NULL, 0);
    }
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(pTsdb->pVnode), __func__, __FILE__, lino, tstrerror(code));

    // only close an iterator that tsdbRowIterOpen has actually been called on
    if (rowIterLive) {
      tsdbRowClose(&rowIter);
    }
  }

  tSimpleHashCleanup(iColHash);

  memRowIterClose(&iter);

  TAOS_RETURN(code);
}
2609

2610
/*
 * Fill `pLastArray` with the cached last/last_row values of table `uid`.
 *
 * The values are first fetched through the LRU path, which also fills the
 * temporary key array; when tsUpdateCacheBatch is set they are then refreshed
 * from mem/imem using that key array. The key array is released before
 * returning.
 */
int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, int8_t ltype) {
  int32_t code = 0;
  int32_t lino = 0;
  SArray *pKeys = taosArrayInit(16, sizeof(SLastKey));

  if (NULL == pKeys) {
    TAOS_CHECK_EXIT(terrno);
  }

  TAOS_CHECK_EXIT(tsdbCacheGetBatchFromLru(pTsdb, uid, pLastArray, pr, ltype, pKeys));

  if (tsUpdateCacheBatch) {
    TAOS_CHECK_EXIT(tsdbCacheGetBatchFromMem(pTsdb, uid, pLastArray, pr, pKeys));
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(pTsdb->pVnode), __func__, __FILE__, lino, tstrerror(code));
  }

  if (NULL != pKeys) {
    taosArrayDestroy(pKeys);
  }

  TAOS_RETURN(code);
}
2636

2637
/*
 * Invalidate cached last/last_row entries of table `uid` whose timestamp lies
 * in [sKey, eKey].
 *
 * For every column and both lflag kinds:
 *   - an entry found in the LRU cache with a timestamp in range is replaced by
 *     a dirty NONE/NO_CACHE placeholder;
 *   - keys missing from the LRU cache are looked up in rocksdb, and entries in
 *     range are overwritten there and in the LRU cache with a NO_CACHE
 *     placeholder.
 *
 * A failing tsdbCacheCommit is only logged (best effort). Returns 0 or the
 * first error hit afterwards.
 */
int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey) {
  int32_t code = 0, lino = 0;
  // fetch schema
  STSchema *pTSchema = NULL;
  int       sver = -1;

  TAOS_CHECK_RETURN(metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pTSchema));

  // build keys & multi get from rocks
  int     numCols = pTSchema->numOfCols;
  int     numKeys = 0;
  SArray *remainCols = NULL;

  // Everything released at _exit is declared and initialised here so that no
  // `goto _exit` can jump over an initialisation and leave it indeterminate.
  char  **keys_list = NULL;
  size_t *keys_list_sizes = NULL;
  char  **values_list = NULL;
  size_t *values_list_sizes = NULL;

  code = tsdbCacheCommit(pTsdb);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbTrace("vgId:%d, %s commit failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
              tstrerror(code));
    // best effort: do not let this stale error abort the LRU pass below
    code = TSDB_CODE_SUCCESS;
  }

  (void)taosThreadMutexLock(&pTsdb->lruMutex);

  for (int i = 0; i < numCols; ++i) {
    int16_t cid = pTSchema->columns[i].colId;
    for (int8_t lflag = LFLAG_LAST_ROW; lflag <= LFLAG_LAST; ++lflag) {
      SLastKey   lastKey = {.lflag = lflag, .uid = uid, .cid = cid};
      LRUHandle *h = taosLRUCacheLookup(pTsdb->lruCache, &lastKey, ROCKS_KEY_LEN);
      if (h) {
        SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pTsdb->lruCache, h);
        if (pLastCol->rowKey.ts <= eKey && pLastCol->rowKey.ts >= sKey) {
          SLastCol noneCol = {.rowKey.ts = TSKEY_MIN,
                              .colVal = COL_VAL_NONE(cid, pTSchema->columns[i].type),
                              .dirty = 1,
                              .cacheStatus = TSDB_LAST_CACHE_NO_CACHE};
          code = tsdbCachePutToLRU(pTsdb, &lastKey, &noneCol, 1);
        }
        tsdbLRUCacheRelease(pTsdb->lruCache, h, false);
        TAOS_CHECK_EXIT(code);
      } else {
        if (!remainCols) {
          remainCols = taosArrayInit(numCols * 2, sizeof(SIdxKey));
          if (!remainCols) {
            TAOS_CHECK_EXIT(terrno);
          }
        }
        if (!taosArrayPush(remainCols, &(SIdxKey){i, lastKey})) {
          TAOS_CHECK_EXIT(terrno);
        }
      }
    }
  }

  if (remainCols) {
    numKeys = TARRAY_SIZE(remainCols);
  }

  keys_list = taosMemoryCalloc(numKeys, sizeof(char *));
  keys_list_sizes = taosMemoryCalloc(numKeys, sizeof(size_t));

  if (!keys_list || !keys_list_sizes) {
    code = terrno;
    goto _exit;
  }
  const size_t klen = ROCKS_KEY_LEN;

  for (int i = 0; i < numKeys; ++i) {
    char *key = taosMemoryCalloc(1, sizeof(SLastKey));
    if (!key) {
      code = terrno;
      goto _exit;
    }
    SIdxKey *idxKey = taosArrayGet(remainCols, i);

    ((SLastKey *)key)[0] = idxKey->key;

    keys_list[i] = key;
    keys_list_sizes[i] = klen;
  }

  rocksMayWrite(pTsdb, true);  // flush writebatch cache

  TAOS_CHECK_GOTO(tsdbCacheGetValuesFromRocks(pTsdb, numKeys, (const char *const *)keys_list, keys_list_sizes,
                                              &values_list, &values_list_sizes),
                  NULL, _exit);

  for (int i = 0; i < numKeys; ++i) {
    SLastCol *pLastCol = NULL;
    if (values_list[i] != NULL) {
      code = tsdbCacheDeserialize(values_list[i], values_list_sizes[i], &pLastCol);
      if (code != TSDB_CODE_SUCCESS) {
        tsdbError("vgId:%d, %s deserialize failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
                  tstrerror(code));
        goto _exit;
      }
    }
    SIdxKey  *idxKey = taosArrayGet(remainCols, i);
    SLastKey *pLastKey = &idxKey->key;
    if (NULL != pLastCol && (pLastCol->rowKey.ts <= eKey && pLastCol->rowKey.ts >= sKey)) {
      SLastCol noCacheCol = {.rowKey.ts = TSKEY_MIN,
                             .colVal = COL_VAL_NONE(pLastKey->cid, pTSchema->columns[idxKey->idx].type),
                             .dirty = 0,
                             .cacheStatus = TSDB_LAST_CACHE_NO_CACHE};

      if ((code = tsdbCachePutToRocksdb(pTsdb, pLastKey, &noCacheCol)) != TSDB_CODE_SUCCESS) {
        taosMemoryFreeClear(pLastCol);
        tsdbError("tsdb/cache/del: vgId:%d, put to rocks failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code));
        goto _exit;
      }
      if ((code = tsdbCachePutToLRU(pTsdb, pLastKey, &noCacheCol, 0)) != TSDB_CODE_SUCCESS) {
        taosMemoryFreeClear(pLastCol);
        tsdbError("tsdb/cache/del: vgId:%d, put to lru failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code));
        goto _exit;
      }
    }

    if (pLastCol == NULL) {
      // labels now match their arguments (uid and cid were swapped in the message)
      tsdbDebug("tsdb/cache/del: vgId:%d, no cache found for uid:%" PRId64 " ,cid:%d, lflag:%d.",
                TD_VID(pTsdb->pVnode), pLastKey->uid, pLastKey->cid, pLastKey->lflag);
    }

    taosMemoryFreeClear(pLastCol);
  }

  rocksMayWrite(pTsdb, false);

_exit:
  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);

  if (keys_list) {
    // keys_list may be NULL while numKeys > 0 when its allocation failed
    for (int i = 0; i < numKeys; ++i) {
      taosMemoryFree(keys_list[i]);
    }
  }
  taosMemoryFree(keys_list);
  taosMemoryFree(keys_list_sizes);
  if (values_list) {
    for (int i = 0; i < numKeys; ++i) {
      rocksdb_free(values_list[i]);
    }
    taosMemoryFree(values_list);
  }
  taosMemoryFree(values_list_sizes);
  taosArrayDestroy(remainCols);
  taosMemoryFree(pTSchema);

  TAOS_RETURN(code);
}
2781

2782
/*
 * Create the last/last_row LRU cache of `pTsdb` and open the block, page and
 * rocksdb caches.
 *
 * The LRU cache handle is stored in pTsdb->lruCache on success and on failure
 * alike. Because tsdbCloseCache destroys lruMutex whenever lruCache is
 * non-NULL, the mutex is initialised as soon as the LRU cache exists, before
 * any step that can fail; a partially opened cache can therefore be closed
 * without destroying an uninitialised mutex.
 *
 * Returns 0 on success or the first error code; failures are logged.
 */
int32_t tsdbOpenCache(STsdb *pTsdb) {
  int32_t code = 0, lino = 0;
  size_t  cfgCapacity = (size_t)pTsdb->pVnode->config.cacheLastSize * 1024 * 1024;

  SLRUCache *pCache = taosLRUCacheInit(cfgCapacity, 0, .5);
  if (pCache == NULL) {
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err);
  }

  taosLRUCacheSetStrictCapacity(pCache, false);

  (void)taosThreadMutexInit(&pTsdb->lruMutex, NULL);

  TAOS_CHECK_GOTO(tsdbOpenBCache(pTsdb), &lino, _err);

  TAOS_CHECK_GOTO(tsdbOpenPgCache(pTsdb), &lino, _err);

  TAOS_CHECK_GOTO(tsdbOpenRocksCache(pTsdb), &lino, _err);

_err:
  if (code) {
    tsdbError("tsdb/cache: vgId:%d, open failed at line %d since %s.", TD_VID(pTsdb->pVnode), lino, tstrerror(code));
  }

  pTsdb->lruCache = pCache;

  TAOS_RETURN(code);
}
2810

2811
/*
 * Shut down the caches of `pTsdb`.
 *
 * When an LRU cache exists, its unreferenced entries are erased, the cache is
 * cleaned up and the LRU mutex is destroyed. The block, page and rocksdb
 * caches are then closed unconditionally.
 */
void tsdbCloseCache(STsdb *pTsdb) {
  SLRUCache *pLru = pTsdb->lruCache;

  if (NULL != pLru) {
    taosLRUCacheEraseUnrefEntries(pLru);

    taosLRUCacheCleanup(pLru);

    (void)taosThreadMutexDestroy(&pTsdb->lruMutex);
  }

  tsdbCloseBCache(pTsdb);
  tsdbClosePgCache(pTsdb);
  tsdbCloseRocksCache(pTsdb);
}
10,178✔
2825

UNCOV
2826
/*
 * Build the 8-byte cache key of a table.
 *
 * cacheType 0 (last_row) uses the uid as is; any other value (last) sets the
 * top bit. The key is written with memcpy so that `key` does not have to be
 * aligned for uint64_t and no object is accessed through an incompatible
 * pointer type. `*len` receives the key length.
 */
static void getTableCacheKey(tb_uid_t uid, int cacheType, char *key, int *len) {
  uint64_t rawKey = (uint64_t)uid;

  if (cacheType != 0) {  // last
    rawKey |= 0x8000000000000000ULL;
  }

  memcpy(key, &rawKey, sizeof(rawKey));

  *len = sizeof(uint64_t);
}
×
2835

UNCOV
2836
/*
 * Resolve the super-table uid of table `uid`.
 *
 * Returns the suid recorded in the meta entry when the table is a child
 * table, and 0 in every other case (lookup failure, normal table, any other
 * table type).
 */
static tb_uid_t getTableSuidByUid(tb_uid_t uid, STsdb *pTsdb) {
  tb_uid_t    result = 0;
  SMetaReader reader = {0};

  metaReaderDoInit(&reader, pTsdb->pVnode->pMeta, META_READER_LOCK);

  // a negative lookup result means the table does not exist
  if (metaReaderGetTableEntryByUidCache(&reader, uid) >= 0 && reader.me.type == TSDB_CHILD_TABLE) {
    result = reader.me.ctbEntry.suid;
  }

  metaReaderClear(&reader);

  return result;
}
2858

UNCOV
2859
/*
 * Read the delete records referenced by `pDelIdx` into `aDelData`.
 *
 * A NULL index is not an error: nothing is read and 0 is returned.
 */
static int32_t getTableDelDataFromDelIdx(SDelFReader *pDelReader, SDelIdx *pDelIdx, SArray *aDelData) {
  if (NULL == pDelIdx) {
    TAOS_RETURN(TSDB_CODE_SUCCESS);
  }

  int32_t code = tsdbReadDelDatav1(pDelReader, pDelIdx, aDelData, INT64_MAX);

  TAOS_RETURN(code);
}
2868

UNCOV
2869
/*
 * Append every delete record linked from `pTbData` (its pHead list) to
 * `aDelData`.
 *
 * A NULL `pTbData` yields no records. Returns 0, or terrno when a push fails.
 */
static int32_t getTableDelDataFromTbData(STbData *pTbData, SArray *aDelData) {
  if (NULL == pTbData) {
    TAOS_RETURN(TSDB_CODE_SUCCESS);
  }

  SDelData *pCur = pTbData->pHead;
  while (NULL != pCur) {
    if (NULL == taosArrayPush(aDelData, pCur)) {
      TAOS_RETURN(terrno);
    }
    pCur = pCur->pNext;
  }

  TAOS_RETURN(TSDB_CODE_SUCCESS);
}
2881

2882
/*
 * Return the reader's sorted uid list, building it on first use.
 *
 * The list is filled from pReader->pTableList, sorted with uidComparFunc and
 * kept in pReader->uidList. Returns NULL when the allocation fails.
 */
static uint64_t *getUidList(SCacheRowsReader *pReader) {
  if (pReader->uidList) {
    return pReader->uidList;
  }

  int32_t nTables = pReader->numOfTables;

  pReader->uidList = taosMemoryMalloc(nTables * sizeof(uint64_t));
  if (!pReader->uidList) {
    return NULL;
  }

  for (int32_t t = 0; t < nTables; ++t) {
    pReader->uidList[t] = pReader->pTableList[t].uid;
  }

  taosSort(pReader->uidList, nTables, sizeof(uint64_t), uidComparFunc);

  return pReader->uidList;
}
2901

2902
/*
 * Load the tomb (delete) records that concern the reader's tables from the
 * tomb blocks listed in `pTombBlkArray`.
 *
 * `pFileReader` is a data-file reader when `isFile` is true and an stt-file
 * reader otherwise. Blocks whose table-id range ends before the reader's
 * first uid are skipped, and scanning stops at the first block whose range
 * starts after the last one. Each matching record whose version does not
 * exceed the reader's max version is appended to the tomb data of the table
 * it belongs to.
 *
 * Every exit taken after a block has been read destroys that block.
 * Returns 0 on success or an error code.
 */
static int32_t loadTombFromBlk(const TTombBlkArray *pTombBlkArray, SCacheRowsReader *pReader, void *pFileReader,
                               bool isFile) {
  int32_t   code = 0;
  int32_t   numOfTables = pReader->numOfTables;
  int64_t   suid = pReader->info.suid;
  uint64_t *uidList = getUidList(pReader);

  if (!uidList) {
    TAOS_RETURN(terrno);
  }

  for (int i = 0, j = 0; i < pTombBlkArray->size && j < numOfTables; ++i) {
    STombBlk *pTombBlk = &pTombBlkArray->data[i];
    if (pTombBlk->maxTbid.suid < suid || (pTombBlk->maxTbid.suid == suid && pTombBlk->maxTbid.uid < uidList[0])) {
      continue;
    }

    if (pTombBlk->minTbid.suid > suid ||
        (pTombBlk->minTbid.suid == suid && pTombBlk->minTbid.uid > uidList[numOfTables - 1])) {
      break;
    }

    STombBlock block = {0};
    code = isFile ? tsdbDataFileReadTombBlock(pFileReader, &pTombBlkArray->data[i], &block)
                  : tsdbSttFileReadTombBlock(pFileReader, &pTombBlkArray->data[i], &block);
    if (code != TSDB_CODE_SUCCESS) {
      TAOS_RETURN(code);
    }

    uint64_t        uid = uidList[j];
    STableLoadInfo *pInfo = getTableLoadInfo(pReader, uid);
    if (!pInfo) {
      tTombBlockDestroy(&block);
      TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
    }

    if (pInfo->pTombData == NULL) {
      pInfo->pTombData = taosArrayInit(4, sizeof(SDelData));
      if (!pInfo->pTombData) {
        code = terrno;  // capture before the cleanup call can touch terrno
        tTombBlockDestroy(&block);
        TAOS_RETURN(code);
      }
    }

    STombRecord record = {0};
    bool        finished = false;
    for (int32_t k = 0; k < TOMB_BLOCK_SIZE(&block); ++k) {
      code = tTombBlockGet(&block, k, &record);
      if (code != TSDB_CODE_SUCCESS) {
        finished = true;
        break;
      }

      if (record.suid < suid) {
        continue;
      }
      if (record.suid > suid) {
        finished = true;
        break;
      }

      bool newTable = false;
      if (uid < record.uid) {
        // advance to the first requested table whose uid is not below the record's
        while (j < numOfTables && uidList[j] < record.uid) {
          ++j;
          newTable = true;
        }

        if (j >= numOfTables) {
          finished = true;
          break;
        }

        uid = uidList[j];
      }

      if (record.uid < uid) {
        continue;
      }

      if (newTable) {
        pInfo = getTableLoadInfo(pReader, uid);
        if (!pInfo) {
          code = TSDB_CODE_OUT_OF_MEMORY;
          finished = true;
          break;
        }
        if (pInfo->pTombData == NULL) {
          pInfo->pTombData = taosArrayInit(4, sizeof(SDelData));
          if (!pInfo->pTombData) {
            code = terrno;
            finished = true;
            break;
          }
        }
      }

      if (record.version <= pReader->info.verRange.maxVer) {
        SDelData delData = {.version = record.version, .sKey = record.skey, .eKey = record.ekey};
        if (!taosArrayPush(pInfo->pTombData, &delData)) {
          // leave through the common path so the block is destroyed
          code = terrno;
          finished = true;
          break;
        }
      }
    }

    tTombBlockDestroy(&block);

    if (finished) {
      TAOS_RETURN(code);
    }
  }

  TAOS_RETURN(TSDB_CODE_SUCCESS);
}
3015

3016
/*
 * Load tomb records for the reader's tables from a data file: fetch the
 * file's tomb block index, then process it as file-backed blocks.
 */
static int32_t loadDataTomb(SCacheRowsReader *pReader, SDataFileReader *pFileReader) {
  const TTombBlkArray *pTombBlks = NULL;

  TAOS_CHECK_RETURN(tsdbDataFileReadTombBlk(pFileReader, &pTombBlks));

  TAOS_RETURN(loadTombFromBlk(pTombBlks, pReader, pFileReader, true));
}
3023

3024
/*
 * Tomb loader for stt files (installed as the merge tree's loadTombFn by
 * lastIterOpen).
 *
 * `pTsdbReader` is treated as the SCacheRowsReader; `pLoadInfo` is not used.
 * Fetches the stt file's tomb block index and processes it as stt-backed
 * blocks.
 */
static int32_t loadSttTomb(STsdbReader *pTsdbReader, SSttFileReader *pSttFileReader, SSttBlockLoadInfo *pLoadInfo) {
  const TTombBlkArray *pTombBlks = NULL;
  SCacheRowsReader    *pCacheReader = (SCacheRowsReader *)pTsdbReader;

  TAOS_CHECK_RETURN(tsdbSttFileReadTombBlk(pSttFileReader, &pTombBlks));

  TAOS_RETURN(loadTombFromBlk(pTombBlks, pCacheReader, pSttFileReader, false));
}
3032

3033
typedef struct {
3034
  SMergeTree  mergeTree;
3035
  SMergeTree *pMergeTree;
3036
} SFSLastIter;
3037

3038
/*
 * Open `iter` as a backward merge-tree iterator over the stt files of
 * `pFileSet` for table (suid, uid), covering timestamps from `lastTs` up to
 * TSKEY_MAX and all versions.
 *
 * The reader's previous stt block iterator array is destroyed and replaced by
 * a fresh one, which the merge tree then uses. Returns 0 on success, terrno
 * when the array cannot be allocated, or the merge tree's error code.
 */
static int32_t lastIterOpen(SFSLastIter *iter, STFileSet *pFileSet, STsdb *pTsdb, STSchema *pTSchema, tb_uid_t suid,
                            tb_uid_t uid, SCacheRowsReader *pr, int64_t lastTs, int16_t *aCols, int nCols) {
  destroySttBlockReader(pr->pLDataIterArray, NULL);
  pr->pLDataIterArray = taosArrayInit(4, POINTER_BYTES);
  if (pr->pLDataIterArray == NULL) return terrno;

  SMergeTreeConf conf = {0};
  conf.uid = uid;
  conf.suid = suid;
  conf.pTsdb = pTsdb;
  conf.timewindow = (STimeWindow){.skey = lastTs, .ekey = TSKEY_MAX};
  conf.verRange = (SVersionRange){.minVer = 0, .maxVer = INT64_MAX};
  conf.strictTimeRange = false;
  conf.pSchema = pTSchema;
  conf.pCurrentFileset = pFileSet;
  conf.backward = 1;
  conf.pSttFileBlockIterArray = pr->pLDataIterArray;
  conf.pCols = aCols;
  conf.numOfCols = nCols;
  conf.loadTombFn = loadSttTomb;
  conf.pReader = pr;
  conf.idstr = pr->idstr;
  conf.pCurRowKey = &pr->rowKey;

  TAOS_CHECK_RETURN(tMergeTreeOpen2(&iter->mergeTree, &conf, NULL));

  iter->pMergeTree = &iter->mergeTree;

  TAOS_RETURN(TSDB_CODE_SUCCESS);
}
3070

3071
/*
 * Close the merge tree behind `*iter` (if it is open) and reset the caller's
 * iterator pointer to NULL. Always returns 0.
 */
static int32_t lastIterClose(SFSLastIter **iter) {
  SFSLastIter *pLast = *iter;

  if (NULL != pLast->pMergeTree) {
    tMergeTreeClose(pLast->pMergeTree);
    pLast->pMergeTree = NULL;
  }

  *iter = NULL;

  TAOS_RETURN(TSDB_CODE_SUCCESS);
}
3083

3084
/*
 * Advance the merge tree of `iter` and hand back its current row.
 *
 * `*ppRow` is NULL when the merge tree has no more rows or when advancing
 * fails; in the latter case the merge tree's error code is returned.
 */
static int32_t lastIterNext(SFSLastIter *iter, TSDBROW **ppRow) {
  bool gotRow = false;

  *ppRow = NULL;

  int32_t code = tMergeTreeNext(iter->pMergeTree, &gotRow);
  if (code != 0) {
    return code;
  }

  *ppRow = gotRow ? tMergeTreeGetRow(iter->pMergeTree) : NULL;

  TAOS_RETURN(code);
}
3101

3102
/*
 * States of the file-set row iterator (SFSNextRowIter::state). The iterator
 * starts in SFSNEXTROW_FS and moves on to SFSNEXTROW_FILESET; enumerator
 * values are consecutive starting at 0.
 */
typedef enum SFSNEXTROWSTATES {
  SFSNEXTROW_FS = 0,
  SFSNEXTROW_FILESET,
  SFSNEXTROW_INDEXLIST,
  SFSNEXTROW_BRINBLOCK,
  SFSNEXTROW_BRINRECORD,
  SFSNEXTROW_BLOCKDATA,
  SFSNEXTROW_BLOCKROW,
  SFSNEXTROW_NEXTSTTROW
} SFSNEXTROWSTATES;
3112

3113
struct CacheNextRowIter;
3114

3115
typedef struct SFSNextRowIter {
3116
  SFSNEXTROWSTATES         state;         // [input]
3117
  SBlockIdx               *pBlockIdxExp;  // [input]
3118
  STSchema                *pTSchema;      // [input]
3119
  tb_uid_t                 suid;
3120
  tb_uid_t                 uid;
3121
  int32_t                  iFileSet;
3122
  STFileSet               *pFileSet;
3123
  TFileSetArray           *aDFileSet;
3124
  SArray                  *pIndexList;
3125
  int32_t                  iBrinIndex;
3126
  SBrinBlock               brinBlock;
3127
  SBrinBlock              *pBrinBlock;
3128
  int32_t                  iBrinRecord;
3129
  SBrinRecord              brinRecord;
3130
  SBlockData               blockData;
3131
  SBlockData              *pBlockData;
3132
  int32_t                  nRow;
3133
  int32_t                  iRow;
3134
  TSDBROW                  row;
3135
  int64_t                  lastTs;
3136
  SFSLastIter              lastIter;
3137
  SFSLastIter             *pLastIter;
3138
  int8_t                   lastEmpty;
3139
  TSDBROW                 *pLastRow;
3140
  SRow                    *pTSRow;
3141
  SRowMerger               rowMerger;
3142
  SCacheRowsReader        *pr;
3143
  struct CacheNextRowIter *pRowIter;
3144
} SFSNextRowIter;
3145

3146
static void clearLastFileSet(SFSNextRowIter *state);
3147

3148
static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast, int16_t *aCols,
83✔
3149
                                int nCols) {
3150
  int32_t         code = 0, lino = 0;
83✔
3151
  SFSNextRowIter *state = (SFSNextRowIter *)iter;
83✔
3152
  STsdb          *pTsdb = state->pr->pTsdb;
83✔
3153

3154
  if (SFSNEXTROW_FS == state->state) {
83✔
3155
    state->iFileSet = TARRAY2_SIZE(state->aDFileSet);
79✔
3156

3157
    state->state = SFSNEXTROW_FILESET;
79✔
3158
  }
3159

3160
  if (SFSNEXTROW_FILESET == state->state) {
83✔
3161
  _next_fileset:
79✔
3162
    clearLastFileSet(state);
94✔
3163

3164
    if (--state->iFileSet < 0) {
95✔
3165
      *ppRow = NULL;
72✔
3166

3167
      TAOS_RETURN(code);
72✔
3168
    } else {
3169
      state->pFileSet = TARRAY2_GET(state->aDFileSet, state->iFileSet);
23✔
3170
    }
3171

3172
    STFileObj **pFileObj = state->pFileSet->farr;
23✔
3173
    if (pFileObj[0] != NULL || pFileObj[3] != NULL) {
23!
3174
      if (state->pFileSet != state->pr->pCurFileSet) {
16!
3175
        SDataFileReaderConfig conf = {.tsdb = pTsdb, .szPage = pTsdb->pVnode->config.tsdbPageSize};
16✔
3176
        const char           *filesName[4] = {0};
16✔
3177
        if (pFileObj[0] != NULL) {
16!
3178
          conf.files[0].file = *pFileObj[0]->f;
16✔
3179
          conf.files[0].exist = true;
16✔
3180
          filesName[0] = pFileObj[0]->fname;
16✔
3181

3182
          conf.files[1].file = *pFileObj[1]->f;
16✔
3183
          conf.files[1].exist = true;
16✔
3184
          filesName[1] = pFileObj[1]->fname;
16✔
3185

3186
          conf.files[2].file = *pFileObj[2]->f;
16✔
3187
          conf.files[2].exist = true;
16✔
3188
          filesName[2] = pFileObj[2]->fname;
16✔
3189
        }
3190

3191
        if (pFileObj[3] != NULL) {
16!
3192
          conf.files[3].exist = true;
16✔
3193
          conf.files[3].file = *pFileObj[3]->f;
16✔
3194
          filesName[3] = pFileObj[3]->fname;
16✔
3195
        }
3196

3197
        TAOS_CHECK_GOTO(tsdbDataFileReaderOpen(filesName, &conf, &state->pr->pFileReader), &lino, _err);
16!
3198

3199
        state->pr->pCurFileSet = state->pFileSet;
16✔
3200

3201
        code = loadDataTomb(state->pr, state->pr->pFileReader);
16✔
3202
        if (code != TSDB_CODE_SUCCESS) {
16!
UNCOV
3203
          tsdbError("vgId:%d, %s load tomb failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
3204
                    tstrerror(code));
UNCOV
3205
          TAOS_CHECK_GOTO(code, &lino, _err);
×
3206
        }
3207

3208
        TAOS_CHECK_GOTO(tsdbDataFileReadBrinBlk(state->pr->pFileReader, &state->pr->pBlkArray), &lino, _err);
16!
3209
      }
3210

3211
      if (!state->pIndexList) {
16!
3212
        state->pIndexList = taosArrayInit(1, sizeof(SBrinBlk));
16✔
3213
        if (!state->pIndexList) {
16!
UNCOV
3214
          TAOS_CHECK_GOTO(terrno, &lino, _err);
×
3215
        }
3216
      } else {
UNCOV
3217
        taosArrayClear(state->pIndexList);
×
3218
      }
3219

3220
      const TBrinBlkArray *pBlkArray = state->pr->pBlkArray;
16✔
3221

3222
      for (int i = TARRAY2_SIZE(pBlkArray) - 1; i >= 0; --i) {
32✔
3223
        SBrinBlk *pBrinBlk = &pBlkArray->data[i];
16✔
3224
        if (state->suid >= pBrinBlk->minTbid.suid && state->suid <= pBrinBlk->maxTbid.suid) {
16!
3225
          if (state->uid >= pBrinBlk->minTbid.uid && state->uid <= pBrinBlk->maxTbid.uid) {
16✔
3226
            if (!taosArrayPush(state->pIndexList, pBrinBlk)) {
4!
UNCOV
3227
              TAOS_CHECK_GOTO(terrno, &lino, _err);
×
3228
            }
3229
          }
UNCOV
3230
        } else if (state->suid > pBrinBlk->maxTbid.suid ||
×
3231
                   (state->suid == pBrinBlk->maxTbid.suid && state->uid > pBrinBlk->maxTbid.uid)) {
×
3232
          break;
3233
        }
3234
      }
3235

3236
      int indexSize = TARRAY_SIZE(state->pIndexList);
16✔
3237
      if (indexSize <= 0) {
16✔
3238
        goto _check_stt_data;
14✔
3239
      }
3240

3241
      state->state = SFSNEXTROW_INDEXLIST;
2✔
3242
      state->iBrinIndex = 1;
2✔
3243
    }
3244

3245
  _check_stt_data:
7✔
3246
    if (state->pFileSet != state->pr->pCurFileSet) {
23✔
3247
      state->pr->pCurFileSet = state->pFileSet;
7✔
3248
    }
3249

3250
    TAOS_CHECK_GOTO(lastIterOpen(&state->lastIter, state->pFileSet, pTsdb, state->pTSchema, state->suid, state->uid,
23!
3251
                                 state->pr, state->lastTs, aCols, nCols),
3252
                    &lino, _err);
3253

3254
    TAOS_CHECK_GOTO(lastIterNext(&state->lastIter, &state->pLastRow), &lino, _err);
23!
3255

3256
    if (!state->pLastRow) {
23✔
3257
      state->lastEmpty = 1;
16✔
3258

3259
      if (SFSNEXTROW_INDEXLIST != state->state) {
16✔
3260
        clearLastFileSet(state);
14✔
3261
        goto _next_fileset;
14✔
3262
      }
3263
    } else {
3264
      state->lastEmpty = 0;
7✔
3265

3266
      if (SFSNEXTROW_INDEXLIST != state->state) {
7!
3267
        state->state = SFSNEXTROW_NEXTSTTROW;
7✔
3268

3269
        *ppRow = state->pLastRow;
7✔
3270
        state->pLastRow = NULL;
7✔
3271

3272
        TAOS_RETURN(code);
7✔
3273
      }
3274
    }
3275

3276
    state->pLastIter = &state->lastIter;
2✔
3277
  }
3278

3279
  if (SFSNEXTROW_NEXTSTTROW == state->state) {
6✔
3280
    TAOS_CHECK_GOTO(lastIterNext(&state->lastIter, &state->pLastRow), &lino, _err);
4!
3281

3282
    if (!state->pLastRow) {
4✔
3283
      if (state->pLastIter) {
1!
3284
        code = lastIterClose(&state->pLastIter);
×
UNCOV
3285
        if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
3286
          tsdbError("vgId:%d, %s close last iter failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
3287
                    tstrerror(code));
UNCOV
3288
          TAOS_RETURN(code);
×
3289
        }
3290
      }
3291

3292
      clearLastFileSet(state);
1✔
3293
      state->state = SFSNEXTROW_FILESET;
1✔
3294
      goto _next_fileset;
1✔
3295
    } else {
3296
      *ppRow = state->pLastRow;
3✔
3297
      state->pLastRow = NULL;
3✔
3298

3299
      TAOS_RETURN(code);
3✔
3300
    }
3301
  }
3302

3303
  if (SFSNEXTROW_INDEXLIST == state->state) {
2!
3304
    SBrinBlk *pBrinBlk = NULL;
2✔
3305
  _next_brinindex:
2✔
3306
    if (--state->iBrinIndex < 0) {
2!
UNCOV
3307
      if (state->pLastRow) {
×
UNCOV
3308
        state->state = SFSNEXTROW_NEXTSTTROW;
×
UNCOV
3309
        *ppRow = state->pLastRow;
×
UNCOV
3310
        state->pLastRow = NULL;
×
UNCOV
3311
        return code;
×
3312
      }
3313

UNCOV
3314
      clearLastFileSet(state);
×
UNCOV
3315
      goto _next_fileset;
×
3316
    } else {
3317
      pBrinBlk = taosArrayGet(state->pIndexList, state->iBrinIndex);
2✔
3318
    }
3319

3320
    if (!state->pBrinBlock) {
2!
3321
      state->pBrinBlock = &state->brinBlock;
2✔
3322
    } else {
UNCOV
3323
      tBrinBlockClear(&state->brinBlock);
×
3324
    }
3325

3326
    TAOS_CHECK_GOTO(tsdbDataFileReadBrinBlock(state->pr->pFileReader, pBrinBlk, &state->brinBlock), &lino, _err);
2!
3327

3328
    state->iBrinRecord = state->brinBlock.numOfRecords - 1;
2✔
3329
    state->state = SFSNEXTROW_BRINBLOCK;
2✔
3330
  }
3331

3332
  if (SFSNEXTROW_BRINBLOCK == state->state) {
2!
3333
  _next_brinrecord:
2✔
3334
    if (state->iBrinRecord < 0) {  // empty brin block, goto _next_brinindex
2!
UNCOV
3335
      tBrinBlockClear(&state->brinBlock);
×
UNCOV
3336
      goto _next_brinindex;
×
3337
    }
3338

3339
    TAOS_CHECK_GOTO(tBrinBlockGet(&state->brinBlock, state->iBrinRecord, &state->brinRecord), &lino, _err);
2!
3340

3341
    SBrinRecord *pRecord = &state->brinRecord;
2✔
3342
    if (pRecord->uid != state->uid) {
2!
3343
      // TODO: goto next brin block early
UNCOV
3344
      --state->iBrinRecord;
×
UNCOV
3345
      goto _next_brinrecord;
×
3346
    }
3347

3348
    state->state = SFSNEXTROW_BRINRECORD;
2✔
3349
  }
3350

3351
  if (SFSNEXTROW_BRINRECORD == state->state) {
2!
3352
    SBrinRecord *pRecord = &state->brinRecord;
2✔
3353

3354
    if (!state->pBlockData) {
2!
3355
      state->pBlockData = &state->blockData;
2✔
3356

3357
      TAOS_CHECK_GOTO(tBlockDataCreate(&state->blockData), &lino, _err);
2!
3358
    } else {
UNCOV
3359
      tBlockDataReset(state->pBlockData);
×
3360
    }
3361

3362
    if (aCols[0] == PRIMARYKEY_TIMESTAMP_COL_ID) {
2!
3363
      --nCols;
2✔
3364
      ++aCols;
2✔
3365
    }
3366

3367
    TAOS_CHECK_GOTO(tsdbDataFileReadBlockDataByColumn(state->pr->pFileReader, pRecord, state->pBlockData,
2!
3368
                                                      state->pTSchema, aCols, nCols),
3369
                    &lino, _err);
3370

3371
    state->nRow = state->blockData.nRow;
2✔
3372
    state->iRow = state->nRow - 1;
2✔
3373

3374
    state->state = SFSNEXTROW_BLOCKROW;
2✔
3375
  }
3376

3377
  if (SFSNEXTROW_BLOCKROW == state->state) {
2!
3378
    if (state->iRow < 0) {
2!
UNCOV
3379
      --state->iBrinRecord;
×
3380
      goto _next_brinrecord;
×
3381
    }
3382

3383
    state->row = tsdbRowFromBlockData(state->pBlockData, state->iRow);
2✔
3384
    if (!state->pLastIter) {
2!
3385
      *ppRow = &state->row;
×
UNCOV
3386
      --state->iRow;
×
3387
      return code;
2✔
3388
    }
3389

3390
    if (!state->pLastRow) {
2!
3391
      // get next row from fslast and process with fs row, --state->Row if select fs row
3392
      TAOS_CHECK_GOTO(lastIterNext(&state->lastIter, &state->pLastRow), &lino, _err);
2!
3393
    }
3394

3395
    if (!state->pLastRow) {
2!
3396
      if (state->pLastIter) {
2!
3397
        code = lastIterClose(&state->pLastIter);
2✔
3398
        if (code != TSDB_CODE_SUCCESS) {
2!
UNCOV
3399
          tsdbError("vgId:%d, %s close last iter failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
3400
                    tstrerror(code));
UNCOV
3401
          TAOS_RETURN(code);
×
3402
        }
3403
      }
3404

3405
      *ppRow = &state->row;
2✔
3406
      --state->iRow;
2✔
3407
      return code;
2✔
3408
    }
3409

3410
    // process state->pLastRow & state->row
UNCOV
3411
    TSKEY rowTs = TSDBROW_TS(&state->row);
×
UNCOV
3412
    TSKEY lastRowTs = TSDBROW_TS(state->pLastRow);
×
UNCOV
3413
    if (lastRowTs > rowTs) {
×
UNCOV
3414
      *ppRow = state->pLastRow;
×
3415
      state->pLastRow = NULL;
×
3416

3417
      TAOS_RETURN(code);
×
UNCOV
3418
    } else if (lastRowTs < rowTs) {
×
UNCOV
3419
      *ppRow = &state->row;
×
UNCOV
3420
      --state->iRow;
×
3421

UNCOV
3422
      TAOS_RETURN(code);
×
3423
    } else {
3424
      // TODO: merge rows and *ppRow = mergedRow
UNCOV
3425
      SRowMerger *pMerger = &state->rowMerger;
×
UNCOV
3426
      code = tsdbRowMergerInit(pMerger, state->pTSchema);
×
UNCOV
3427
      if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
3428
        tsdbError("vgId:%d, %s init row merger failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
3429
                  tstrerror(code));
UNCOV
3430
        TAOS_RETURN(code);
×
3431
      }
3432

UNCOV
3433
      TAOS_CHECK_GOTO(tsdbRowMergerAdd(pMerger, &state->row, state->pTSchema), &lino, _err);
×
UNCOV
3434
      TAOS_CHECK_GOTO(tsdbRowMergerAdd(pMerger, state->pLastRow, state->pTSchema), &lino, _err);
×
3435

UNCOV
3436
      if (state->pTSRow) {
×
UNCOV
3437
        taosMemoryFree(state->pTSRow);
×
UNCOV
3438
        state->pTSRow = NULL;
×
3439
      }
3440

UNCOV
3441
      TAOS_CHECK_GOTO(tsdbRowMergerGetRow(pMerger, &state->pTSRow), &lino, _err);
×
3442

UNCOV
3443
      state->row = tsdbRowFromTSRow(TSDBROW_VERSION(&state->row), state->pTSRow);
×
UNCOV
3444
      *ppRow = &state->row;
×
UNCOV
3445
      --state->iRow;
×
3446

UNCOV
3447
      tsdbRowMergerClear(pMerger);
×
3448

UNCOV
3449
      TAOS_RETURN(code);
×
3450
    }
3451
  }
3452

UNCOV
3453
_err:
×
UNCOV
3454
  clearLastFileSet(state);
×
3455

UNCOV
3456
  *ppRow = NULL;
×
3457

UNCOV
3458
  if (code) {
×
UNCOV
3459
    tsdbError("tsdb/cache: vgId:%d, %s failed at line %d since %s.", TD_VID(pTsdb->pVnode), __func__, lino,
×
3460
              tstrerror(code));
3461
  }
3462

UNCOV
3463
  TAOS_RETURN(code);
×
3464
}
3465

3466
/*
 * Merging iterator over the three row sources of one table (mem, imem, file
 * system). It is set up by nextRowIterOpen(), advanced by nextRowIterGet()
 * and released by nextRowIterClose().
 */
typedef struct CacheNextRowIter {
  SArray           *pMemDelData;  // filled by loadMemTombData() in nextRowIterOpen(); destroyed in nextRowIterClose()
  SArray           *pSkyline;     // array of TSDBKEY built lazily in nextRowIterGet() for delete detection
  int64_t           iSkyline;     // cursor handed to tsdbKeyDeleted(); starts at the last skyline entry
  SBlockIdx         idx;          // {suid, uid} of the table being read
  SMemNextRowIter   memState;     // cursor for the mem source
  SMemNextRowIter   imemState;    // cursor for the imem source
  SFSNextRowIter    fsState;      // cursor for the file system source
  TSDBROW           memRow, imemRow, fsLastRow, fsRow;  // input[] is wired to memRow, imemRow and fsRow
  TsdbNextRowState  input[3];     // per-source state: [0] mem, [1] imem, [2] file system
  SCacheRowsReader *pr;           // owning reader
  STsdb            *pTsdb;        // tsdb instance passed to nextRowIterOpen()
} CacheNextRowIter;
3479

3480
int32_t clearNextRowFromFS(void *iter) {
81✔
3481
  int32_t code = 0;
81✔
3482

3483
  SFSNextRowIter *state = (SFSNextRowIter *)iter;
81✔
3484
  if (!state) {
81!
3485
    TAOS_RETURN(code);
×
3486
  }
3487

3488
  if (state->pLastIter) {
81!
UNCOV
3489
    code = lastIterClose(&state->pLastIter);
×
UNCOV
3490
    if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
3491
      tsdbError("%s close last iter failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
UNCOV
3492
      TAOS_RETURN(code);
×
3493
    }
3494
  }
3495

3496
  if (state->pBlockData) {
81✔
3497
    tBlockDataDestroy(state->pBlockData);
2✔
3498
    state->pBlockData = NULL;
2✔
3499
  }
3500

3501
  if (state->pBrinBlock) {
81✔
3502
    tBrinBlockDestroy(state->pBrinBlock);
2✔
3503
    state->pBrinBlock = NULL;
2✔
3504
  }
3505

3506
  if (state->pIndexList) {
81✔
3507
    taosArrayDestroy(state->pIndexList);
16✔
3508
    state->pIndexList = NULL;
16✔
3509
  }
3510

3511
  if (state->pTSRow) {
81!
UNCOV
3512
    taosMemoryFree(state->pTSRow);
×
3513
    state->pTSRow = NULL;
×
3514
  }
3515

3516
  if (state->pRowIter->pSkyline) {
81✔
3517
    taosArrayDestroy(state->pRowIter->pSkyline);
73✔
3518
    state->pRowIter->pSkyline = NULL;
73✔
3519
  }
3520

3521
  TAOS_RETURN(code);
81✔
3522
}
3523

3524
/*
 * Release the per-file-set resources of the cursor before it moves to another
 * file set or bails out: the stt iterator, block data, the reader's data file
 * reader (together with its current-file-set marker), the merged row, and the
 * delete skyline. When a skyline is dropped, the tomb data of every table in
 * the reader's table map is dropped with it.
 *
 * A failure to close the stt iterator is logged, and cleanup continues.
 * Stopping there would leave the file reader open and pCurFileSet stale while
 * getNextRowFromFS() goes on to open the next file set.
 */
static void clearLastFileSet(SFSNextRowIter *state) {
  if (state->pLastIter) {
    int code = lastIterClose(&state->pLastIter);
    if (code != TSDB_CODE_SUCCESS) {
      tsdbError("%s close last iter failed at line %d since %s", __func__, __LINE__, tstrerror(code));
    }
  }

  if (state->pBlockData) {
    tBlockDataDestroy(state->pBlockData);
    state->pBlockData = NULL;
  }

  if (state->pr->pFileReader) {
    tsdbDataFileReaderClose(&state->pr->pFileReader);
    state->pr->pFileReader = NULL;

    state->pr->pCurFileSet = NULL;
  }

  if (state->pTSRow) {
    taosMemoryFree(state->pTSRow);
    state->pTSRow = NULL;
  }

  if (state->pRowIter && state->pRowIter->pSkyline) {
    taosArrayDestroy(state->pRowIter->pSkyline);
    state->pRowIter->pSkyline = NULL;

    // the skyline was built from the tables' tomb data, so drop that as well
    void   *pe = NULL;
    int32_t iter = 0;
    while ((pe = tSimpleHashIterate(state->pr->pTableMap, pe, &iter)) != NULL) {
      STableLoadInfo *pInfo = *(STableLoadInfo **)pe;
      taosArrayDestroy(pInfo->pTombData);
      pInfo->pTombData = NULL;
    }
  }
}
3563

3564
static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTsdb, STSchema *pTSchema, tb_uid_t suid,
81✔
3565
                               SArray *pLDataIterArray, STsdbReadSnap *pReadSnap, int64_t lastTs,
3566
                               SCacheRowsReader *pr) {
3567
  int32_t code = 0, lino = 0;
81✔
3568

3569
  STbData *pMem = NULL;
81✔
3570
  if (pReadSnap->pMem) {
81!
3571
    pMem = tsdbGetTbDataFromMemTable(pReadSnap->pMem, suid, uid);
81✔
3572
  }
3573

3574
  STbData *pIMem = NULL;
80✔
3575
  if (pReadSnap->pIMem) {
80!
UNCOV
3576
    pIMem = tsdbGetTbDataFromMemTable(pReadSnap->pIMem, suid, uid);
×
3577
  }
3578

3579
  pIter->pTsdb = pTsdb;
80✔
3580

3581
  pIter->pMemDelData = NULL;
80✔
3582

3583
  TAOS_CHECK_GOTO(loadMemTombData(&pIter->pMemDelData, pMem, pIMem, pr->info.verRange.maxVer), &lino, _err);
80!
3584

3585
  pIter->idx = (SBlockIdx){.suid = suid, .uid = uid};
81✔
3586

3587
  pIter->fsState.pRowIter = pIter;
81✔
3588
  pIter->fsState.state = SFSNEXTROW_FS;
81✔
3589
  pIter->fsState.aDFileSet = pReadSnap->pfSetArray;
81✔
3590
  pIter->fsState.pBlockIdxExp = &pIter->idx;
81✔
3591
  pIter->fsState.pTSchema = pTSchema;
81✔
3592
  pIter->fsState.suid = suid;
81✔
3593
  pIter->fsState.uid = uid;
81✔
3594
  pIter->fsState.lastTs = lastTs;
81✔
3595
  pIter->fsState.pr = pr;
81✔
3596

3597
  pIter->input[0] = (TsdbNextRowState){&pIter->memRow, true, false, false, &pIter->memState, getNextRowFromMem, NULL};
81✔
3598
  pIter->input[1] = (TsdbNextRowState){&pIter->imemRow, true, false, false, &pIter->imemState, getNextRowFromMem, NULL};
81✔
3599
  pIter->input[2] =
81✔
3600
      (TsdbNextRowState){&pIter->fsRow, false, true, false, &pIter->fsState, getNextRowFromFS, clearNextRowFromFS};
81✔
3601

3602
  if (pMem) {
81✔
3603
    pIter->memState.pMem = pMem;
65✔
3604
    pIter->memState.state = SMEMNEXTROW_ENTER;
65✔
3605
    pIter->memState.lastTs = lastTs;
65✔
3606
    pIter->input[0].stop = false;
65✔
3607
    pIter->input[0].next = true;
65✔
3608
  }
3609

3610
  if (pIMem) {
81!
UNCOV
3611
    pIter->imemState.pMem = pIMem;
×
UNCOV
3612
    pIter->imemState.state = SMEMNEXTROW_ENTER;
×
3613
    pIter->imemState.lastTs = lastTs;
×
UNCOV
3614
    pIter->input[1].stop = false;
×
UNCOV
3615
    pIter->input[1].next = true;
×
3616
  }
3617

3618
  pIter->pr = pr;
81✔
3619

3620
_err:
81✔
3621
  TAOS_RETURN(code);
81✔
3622
}
3623

3624
/*
 * Tear down an iterator set up by nextRowIterOpen(): run the clear callback
 * of every source that has one (its result is ignored), then destroy the
 * skyline and the mem tomb data if they are still present.
 */
static void nextRowIterClose(CacheNextRowIter *pIter) {
  TsdbNextRowState *pInput = pIter->input;
  TsdbNextRowState *pEnd = pInput + sizeof(pIter->input) / sizeof(pIter->input[0]);

  for (; pInput < pEnd; ++pInput) {
    if (pInput->nextRowClearFn == NULL) {
      continue;
    }
    (void)pInput->nextRowClearFn(pInput->iter);
  }

  if (pIter->pSkyline != NULL) {
    taosArrayDestroy(pIter->pSkyline);
  }

  if (pIter->pMemDelData != NULL) {
    taosArrayDestroy(pIter->pMemDelData);
  }
}
81✔
3639

3640
/*
 * Return the next non-deleted row of the table, iterating backward over the
 * row key (from high to low) across the mem, imem and file system sources.
 *
 * pIter            iterator opened with nextRowIterOpen()
 * ppRow            receives the row, or NULL once all three sources are stopped
 * pIgnoreEarlierTs written only when all sources are stopped: the OR of the
 *                  sources' ignoreEarlierTs flags
 * isLast, aCols, nCols  passed through unchanged to each source's nextRowFn
 *
 * Returns 0 on success, otherwise an error code; *ppRow is not written on the
 * error path.
 */
static int32_t nextRowIterGet(CacheNextRowIter *pIter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast,
                              int16_t *aCols, int nCols) {
  int32_t code = 0, lino = 0;

  for (;;) {
    // refill every source that was consumed (next) and is not exhausted (stop)
    for (int i = 0; i < 3; ++i) {
      if (pIter->input[i].next && !pIter->input[i].stop) {
        TAOS_CHECK_GOTO(pIter->input[i].nextRowFn(pIter->input[i].iter, &pIter->input[i].pRow,
                                                  &pIter->input[i].ignoreEarlierTs, isLast, aCols, nCols),
                        &lino, _err);

        if (pIter->input[i].pRow == NULL) {
          pIter->input[i].stop = true;
          pIter->input[i].next = false;
        }
      }
    }

    if (pIter->input[0].stop && pIter->input[1].stop && pIter->input[2].stop) {
      *ppRow = NULL;
      *pIgnoreEarlierTs =
          (pIter->input[0].ignoreEarlierTs || pIter->input[1].ignoreEarlierTs || pIter->input[2].ignoreEarlierTs);

      TAOS_RETURN(code);
    }

    // select maxpoint(s) from mem, imem, fs and last
    TSDBROW *max[4] = {0};
    int      iMax[4] = {-1, -1, -1, -1};
    int      nMax = 0;
    SRowKey  maxKey = {.ts = TSKEY_MIN};

    for (int i = 0; i < 3; ++i) {
      if (!pIter->input[i].stop && pIter->input[i].pRow != NULL) {
        STsdbRowKey tsdbRowKey = {0};
        tsdbRowGetKey(pIter->input[i].pRow, &tsdbRowKey);

        // merging & deduplicating on client side
        // a strictly larger key restarts the candidate list, an equal key is appended to it
        int c = tRowKeyCompare(&maxKey, &tsdbRowKey.key);
        if (c <= 0) {
          if (c < 0) {
            nMax = 0;
            maxKey = tsdbRowKey.key;
          }

          iMax[nMax] = i;
          max[nMax++] = pIter->input[i].pRow;
        }
        // keep the row buffered; the flag is set again below for sources that must advance
        pIter->input[i].next = false;
      }
    }

    // delete detection
    TSDBROW *merge[4] = {0};
    int      iMerge[4] = {-1, -1, -1, -1};
    int      nMerge = 0;
    for (int i = 0; i < nMax; ++i) {
      TSDBKEY maxKey1 = TSDBROW_KEY(max[i]);

      if (!pIter->pSkyline) {
        // build the delete skyline on first use from the table's tomb data plus the mem tomb data
        pIter->pSkyline = taosArrayInit(32, sizeof(TSDBKEY));
        TSDB_CHECK_NULL(pIter->pSkyline, code, lino, _err, terrno);

        uint64_t        uid = pIter->idx.uid;
        STableLoadInfo *pInfo = getTableLoadInfo(pIter->pr, uid);
        TSDB_CHECK_NULL(pInfo, code, lino, _err, TSDB_CODE_OUT_OF_MEMORY);

        if (pInfo->pTombData == NULL) {
          pInfo->pTombData = taosArrayInit(4, sizeof(SDelData));
          TSDB_CHECK_NULL(pInfo->pTombData, code, lino, _err, terrno);
        }

        if (!taosArrayAddAll(pInfo->pTombData, pIter->pMemDelData)) {
          TAOS_CHECK_GOTO(terrno, &lino, _err);
        }

        size_t delSize = TARRAY_SIZE(pInfo->pTombData);
        if (delSize > 0) {
          code = tsdbBuildDeleteSkyline(pInfo->pTombData, 0, (int32_t)(delSize - 1), pIter->pSkyline);
          TAOS_CHECK_GOTO(code, &lino, _err);
        }
        pIter->iSkyline = taosArrayGetSize(pIter->pSkyline) - 1;
      }

      bool deleted = tsdbKeyDeleted(&maxKey1, pIter->pSkyline, &pIter->iSkyline);
      if (!deleted) {
        iMerge[nMerge] = iMax[i];
        merge[nMerge++] = max[i];
      }

      // a deleted row is skipped by advancing its source on the next round
      pIter->input[iMax[i]].next = deleted;
    }

    if (nMerge > 0) {
      // only the first surviving candidate is returned and only its source advances
      pIter->input[iMerge[0]].next = true;

      *ppRow = merge[0];

      TAOS_RETURN(code);
    }
    // every candidate was deleted: loop and fetch the next rows
  }

_err:
  if (code) {
    tsdbError("tsdb/cache: %s failed at line %d since %s.", __func__, lino, tstrerror(code));
  }

  TAOS_RETURN(code);
}
3750

3751
/*
 * Create an array of nCols SLastCol entries, one per requested slot, each
 * holding a NULL column value with the column id and type taken from
 * pTSchema->columns[slotIds[i]] and a row key timestamp of 0.
 *
 * pTSchema    schema the slot ids index into
 * ppColArray  receives the new array on success; left untouched on failure
 * slotIds     nCols schema slot indexes
 * nCols       number of entries to create
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when an allocation fails. On failure
 * the partially built array is destroyed.
 */
static int32_t initLastColArrayPartial(STSchema *pTSchema, SArray **ppColArray, int16_t *slotIds, int nCols) {
  SArray *pColArray = taosArrayInit(nCols, sizeof(SLastCol));
  if (NULL == pColArray) {
    TAOS_RETURN(terrno);
  }

  for (int32_t i = 0; i < nCols; ++i) {
    int16_t  slotId = slotIds[i];
    SLastCol col = {.rowKey.ts = 0,
                    .colVal = COL_VAL_NULL(pTSchema->columns[slotId].colId, pTSchema->columns[slotId].type)};
    if (!taosArrayPush(pColArray, &col)) {
      // capture the error before cleanup, then release the array instead of leaking it
      int32_t code = terrno;
      taosArrayDestroy(pColArray);

      TAOS_RETURN(code);
    }
  }
  *ppColArray = pColArray;

  TAOS_RETURN(TSDB_CODE_SUCCESS);
}
3769

3770
/*
 * Compute, for the requested columns of table uid, the most recent row in
 * which each column holds a value, by walking the table's rows from newest
 * to oldest until every requested column has a value or the rows run out.
 *
 * uid          table to read
 * pTsdb        tsdb instance
 * ppLastArray  receives an array of SLastCol, one entry per requested column.
 *              When the table has no row it receives an empty array, or NULL
 *              if the iterator reported ignoreEarlierTs. On error it is NULL.
 * pr           cache rows reader supplying schema, snapshot and version range
 * aCols        nCols column ids; a working copy is handed to the row iterator
 *              and shrinks as columns are resolved
 * nCols        number of requested columns
 * slotIds      nCols schema slot indexes matching aCols
 *
 * Returns 0 on success, otherwise an error code. Everything allocated here is
 * released on every error path.
 */
static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols,
                            int nCols, int16_t *slotIds) {
  int32_t   code = 0, lino = 0;
  STSchema *pTSchema = pr->pSchema;  // metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1, 1);
  int16_t   nLastCol = nCols;
  int16_t   noneCol = 0;         // first column index that still lacks a value
  bool      setNoneCol = false;  // true while at least one column still lacks a value
  bool      hasRow = false;
  bool      ignoreEarlierTs = false;
  SArray   *pColArray = NULL;
  SColVal  *pColVal = &(SColVal){0};

  TAOS_CHECK_RETURN(initLastColArrayPartial(pTSchema, &pColArray, slotIds, nCols));

  SArray *aColArray = taosArrayInit(nCols, sizeof(int16_t));
  if (NULL == aColArray) {
    code = terrno;  // capture before cleanup
    taosArrayDestroy(pColArray);

    TAOS_RETURN(code);
  }

  for (int i = 0; i < nCols; ++i) {
    if (!taosArrayPush(aColArray, &aCols[i])) {
      code = terrno;  // capture before cleanup
      taosArrayDestroy(pColArray);
      taosArrayDestroy(aColArray);  // release the working copy as well

      TAOS_RETURN(code);
    }
  }

  STsdbRowKey lastRowKey = {.key.ts = TSKEY_MAX};

  // inverse iterator
  CacheNextRowIter iter = {0};
  code =
      nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->info.suid, pr->pLDataIterArray, pr->pReadSnap, pr->lastTs, pr);
  TAOS_CHECK_GOTO(code, &lino, _err);

  do {
    TSDBROW *pRow = NULL;
    code = nextRowIterGet(&iter, &pRow, &ignoreEarlierTs, true, TARRAY_DATA(aColArray), TARRAY_SIZE(aColArray));
    // an iterator error must not fall into the success epilogue with a half-filled result
    TAOS_CHECK_GOTO(code, &lino, _err);

    if (!pRow) {
      break;
    }

    hasRow = true;

    int32_t sversion = TSDBROW_SVERSION(pRow);
    if (sversion != -1) {
      TAOS_CHECK_GOTO(updateTSchema(sversion, pr, uid), &lino, _err);

      pTSchema = pr->pCurrSchema;
    }
    // int16_t nCol = pTSchema->numOfCols;

    STsdbRowKey rowKey = {0};
    tsdbRowGetKey(pRow, &rowKey);

    if (lastRowKey.key.ts == TSKEY_MAX) {  // first time
      lastRowKey = rowKey;

      for (int16_t iCol = noneCol; iCol < nCols; ++iCol) {
        if (iCol >= nLastCol) {
          break;
        }
        SLastCol *pCol = taosArrayGet(pColArray, iCol);
        if (slotIds[iCol] > pTSchema->numOfCols - 1) {
          // slot does not exist in this row's schema version
          if (!setNoneCol) {
            noneCol = iCol;
            setNoneCol = true;
          }
          continue;
        }
        if (pCol->colVal.cid != pTSchema->columns[slotIds[iCol]].colId) {
          continue;
        }
        if (slotIds[iCol] == 0) {
          // slot 0 is filled from the row key timestamp
          STColumn *pTColumn = &pTSchema->columns[0];
          *pColVal = COL_VAL_VALUE(pTColumn->colId, ((SValue){.type = pTColumn->type, .val = rowKey.key.ts}));

          SLastCol colTmp = {.rowKey = rowKey.key, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID};
          TAOS_CHECK_GOTO(tsdbCacheReallocSLastCol(&colTmp, NULL), &lino, _err);

          // NOTE(review): the entry is stored at index 0 rather than iCol; this presumably relies
          // on slot 0 always being requested first - confirm against the callers.
          taosArraySet(pColArray, 0, &colTmp);
          continue;
        }
        tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal);

        *pCol = (SLastCol){.rowKey = rowKey.key, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID};
        TAOS_CHECK_GOTO(tsdbCacheReallocSLastCol(pCol, NULL), &lino, _err);

        if (!COL_VAL_IS_VALUE(pColVal)) {
          if (!setNoneCol) {
            noneCol = iCol;
            setNoneCol = true;
          }
        } else {
          // column resolved: stop asking the iterator for it
          int32_t aColIndex = taosArraySearchIdx(aColArray, &pColVal->cid, compareInt16Val, TD_EQ);
          if (aColIndex >= 0) {
            taosArrayRemove(aColArray, aColIndex);
          }
        }
      }
      if (!setNoneCol) {
        // done, goto return pColArray
        break;
      } else {
        continue;
      }
    }

    // merge into pColArray
    setNoneCol = false;
    for (int16_t iCol = noneCol; iCol < nCols; ++iCol) {
      if (iCol >= nLastCol) {
        break;
      }
      // high version's column value
      if (slotIds[iCol] > pTSchema->numOfCols - 1) {
        continue;
      }

      SLastCol *lastColVal = (SLastCol *)taosArrayGet(pColArray, iCol);
      if (lastColVal->colVal.cid != pTSchema->columns[slotIds[iCol]].colId) {
        continue;
      }
      SColVal *tColVal = &lastColVal->colVal;
      if (COL_VAL_IS_VALUE(tColVal)) continue;

      tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal);
      if (COL_VAL_IS_VALUE(pColVal)) {
        SLastCol lastCol = {.rowKey = rowKey.key, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID};
        TAOS_CHECK_GOTO(tsdbCacheReallocSLastCol(&lastCol, NULL), &lino, _err);

        tsdbCacheFreeSLastColItem(lastColVal);
        taosArraySet(pColArray, iCol, &lastCol);
        int32_t aColIndex = taosArraySearchIdx(aColArray, &lastCol.colVal.cid, compareInt16Val, TD_EQ);
        if (aColIndex >= 0) {
          taosArrayRemove(aColArray, aColIndex);
        }
      } else if (!COL_VAL_IS_VALUE(pColVal) && !setNoneCol) {
        noneCol = iCol;
        setNoneCol = true;
      }
    }
  } while (setNoneCol);

  if (!hasRow) {
    if (ignoreEarlierTs) {
      taosArrayDestroy(pColArray);
      pColArray = NULL;
    } else {
      taosArrayClear(pColArray);
    }
  }
  *ppLastArray = pColArray;

  nextRowIterClose(&iter);
  taosArrayDestroy(aColArray);

  TAOS_RETURN(code);

_err:
  nextRowIterClose(&iter);
  // taosMemoryFreeClear(pTSchema);
  *ppLastArray = NULL;
  taosArrayDestroyEx(pColArray, tsdbCacheFreeSLastColItem);
  taosArrayDestroy(aColArray);

  if (code) {
    tsdbError("tsdb/cache: vgId:%d, %s failed at line %d since %s.", TD_VID(pTsdb->pVnode), __func__, lino,
              tstrerror(code));
  }

  TAOS_RETURN(code);
}
3946

3947
/*
 * Fill an array of SLastCol entries for the requested columns from the first row
 * returned by a CacheNextRowIter opened on table `uid`.
 *
 * uid         - table uid handed to nextRowIterOpen() / updateTSchema().
 * pTsdb       - tsdb instance handed to nextRowIterOpen(); also used for error logging.
 * ppLastArray - out: the result array. Set to NULL on the error path, and also when no
 *               row was produced while the iterator reported ignoreEarlierTs; when no
 *               row was produced otherwise, the array is returned cleared (size 0).
 * pr          - reader supplying the schema (pSchema / pCurrSchema) and iterator inputs.
 * aCols       - nCols int16_t values copied into a scratch array that is passed to
 *               nextRowIterGet(); entries are matched against column ids (cid).
 * nCols       - number of entries in aCols and slotIds.
 * slotIds     - per requested column, the index into pTSchema->columns.
 *
 * Returns 0 on success or the failing callee's error code.
 */
static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols,
                               int nCols, int16_t *slotIds) {
  int32_t   code = 0, lino = 0;
  STSchema *pTSchema = pr->pSchema;  // metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1, 1);
  int16_t   nLastCol = nCols;
  int16_t   noneCol = 0;
  bool      hasRow = false;
  bool      ignoreEarlierTs = false;
  SArray   *pColArray = NULL;
  SColVal  *pColVal = &(SColVal){0};

  TAOS_CHECK_RETURN(initLastColArrayPartial(pTSchema, &pColArray, slotIds, nCols));

  SArray *aColArray = taosArrayInit(nCols, sizeof(int16_t));
  if (NULL == aColArray) {
    taosArrayDestroy(pColArray);

    TAOS_RETURN(terrno);
  }

  for (int i = 0; i < nCols; ++i) {
    if (!taosArrayPush(aColArray, &aCols[i])) {
      // Both arrays are owned here; release the scratch array too so it is not leaked.
      taosArrayDestroy(aColArray);
      taosArrayDestroy(pColArray);

      TAOS_RETURN(terrno);
    }
  }

  // inverse iterator
  CacheNextRowIter iter = {0};
  code =
      nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->info.suid, pr->pLDataIterArray, pr->pReadSnap, pr->lastTs, pr);
  TAOS_CHECK_GOTO(code, &lino, _err);

  // The loop body runs at most once: it always ends in a break.
  do {
    TSDBROW *pRow = NULL;
    // NOTE(review): the returned code is not checked here; it is only returned to the
    // caller if nothing below overwrites it - confirm this is intended.
    code = nextRowIterGet(&iter, &pRow, &ignoreEarlierTs, false, TARRAY_DATA(aColArray), TARRAY_SIZE(aColArray));

    if (!pRow) {
      break;
    }

    hasRow = true;

    // Switch to the row's own schema version when the row carries one.
    int32_t sversion = TSDBROW_SVERSION(pRow);
    if (sversion != -1) {
      TAOS_CHECK_GOTO(updateTSchema(sversion, pr, uid), &lino, _err);

      pTSchema = pr->pCurrSchema;
    }

    STsdbRowKey rowKey = {0};
    tsdbRowGetKey(pRow, &rowKey);

    for (int16_t iCol = noneCol; iCol < nCols; ++iCol) {
      if (iCol >= nLastCol) {
        break;
      }
      SLastCol *pCol = taosArrayGet(pColArray, iCol);
      // Skip slots that do not exist in the schema currently in use.
      if (slotIds[iCol] > pTSchema->numOfCols - 1) {
        continue;
      }
      // Skip slots whose column id no longer matches the prepared entry.
      if (pCol->colVal.cid != pTSchema->columns[slotIds[iCol]].colId) {
        continue;
      }
      if (slotIds[iCol] == 0) {
        // Slot 0 is filled from the row key's ts rather than from the row payload.
        STColumn *pTColumn = &pTSchema->columns[0];
        *pColVal = COL_VAL_VALUE(pTColumn->colId, ((SValue){.type = pTColumn->type, .val = rowKey.key.ts}));

        SLastCol colTmp = {.rowKey = rowKey.key, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID};
        TAOS_CHECK_GOTO(tsdbCacheReallocSLastCol(&colTmp, NULL), &lino, _err);

        // NOTE(review): writes to index 0, not iCol - presumably slot 0 is always the
        // first requested column; verify against callers.
        taosArraySet(pColArray, 0, &colTmp);
        continue;
      }
      tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal);

      *pCol = (SLastCol){.rowKey = rowKey.key, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID};
      TAOS_CHECK_GOTO(tsdbCacheReallocSLastCol(pCol, NULL), &lino, _err);

      // This column is resolved; drop its id from the scratch array.
      int32_t aColIndex = taosArraySearchIdx(aColArray, &pColVal->cid, compareInt16Val, TD_EQ);
      if (aColIndex >= 0) {
        taosArrayRemove(aColArray, aColIndex);
      }
    }

    break;
  } while (1);

  if (!hasRow) {
    if (ignoreEarlierTs) {
      taosArrayDestroy(pColArray);
      pColArray = NULL;
    } else {
      taosArrayClear(pColArray);
    }
  }
  *ppLastArray = pColArray;

  nextRowIterClose(&iter);
  taosArrayDestroy(aColArray);

  TAOS_RETURN(code);

_err:
  nextRowIterClose(&iter);

  *ppLastArray = NULL;
  taosArrayDestroyEx(pColArray, tsdbCacheFreeSLastColItem);
  taosArrayDestroy(aColArray);

  if (code) {
    tsdbError("tsdb/cache: vgId:%d, %s failed at line %d since %s.", TD_VID(pTsdb->pVnode), __func__, lino,
              tstrerror(code));
  }

  TAOS_RETURN(code);
}
4067

UNCOV
4068
/* Release handle `h` on `pCache` through tsdbLRUCacheRelease with eraseIfLastRef = false. */
void tsdbCacheRelease(SLRUCache *pCache, LRUHandle *h) {
  tsdbLRUCacheRelease(pCache, h, false);
}
×
4069

4070
/* Forward `capacity` to the LRU cache hanging off the vnode's tsdb. */
void tsdbCacheSetCapacity(SVnode *pVnode, size_t capacity) {
  STsdb *pTsdb = pVnode->pTsdb;

  taosLRUCacheSetCapacity(pTsdb->lruCache, capacity);
}
2✔
4073

4074
#ifdef BUILD_NO_CALL
4075
/* Report the capacity of the LRU cache hanging off the vnode's tsdb. */
size_t tsdbCacheGetCapacity(SVnode *pVnode) {
  STsdb *pTsdb = pVnode->pTsdb;

  return taosLRUCacheGetCapacity(pTsdb->lruCache);
}
4076
#endif
4077

4078
/* Return the usage reported by the vnode's tsdb LRU cache, or 0 when the vnode has no tsdb. */
size_t tsdbCacheGetUsage(SVnode *pVnode) {
  if (pVnode->pTsdb == NULL) {
    return 0;
  }

  return taosLRUCacheGetUsage(pVnode->pTsdb->lruCache);
}
4086

4087
/* Return the element count reported by the vnode's tsdb LRU cache, or 0 when the vnode has no tsdb. */
int32_t tsdbCacheGetElems(SVnode *pVnode) {
  if (pVnode->pTsdb == NULL) {
    return 0;
  }

  return taosLRUCacheGetElems(pVnode->pTsdb->lruCache);
}
4095

4096
// block cache
UNCOV
4097
/*
 * Serialize (fid, commitID, blkno) into `key` and store the number of bytes written in `*len`.
 *
 * The three fields are copied back to back in native byte order. They are deliberately
 * not copied as one struct: a struct { int32_t; int64_t; int64_t; } has padding after the
 * first member, and padding bytes hold unspecified values once members are stored, so a
 * raw struct copy could put indeterminate bytes into the cache key.
 *
 * key - destination buffer; must hold at least
 *       sizeof(int32_t) + 2 * sizeof(int64_t) bytes.
 * len - out: key length in bytes.
 */
static void getBCacheKey(int32_t fid, int64_t commitID, int64_t blkno, char *key, int *len) {
  char *p = key;

  memcpy(p, &fid, sizeof(fid));
  p += sizeof(fid);

  memcpy(p, &commitID, sizeof(commitID));
  p += sizeof(commitID);

  memcpy(p, &blkno, sizeof(blkno));
  p += sizeof(blkno);

  *len = (int)(p - key);
}
×
4111

UNCOV
4112
/*
 * Fetch the block that pFD->blkno points at via tcsGetObjectBlock().
 *
 * The byte offset is (blkno - 1) * tsS3BlockSize * szPage and the requested length is
 * tsS3BlockSize * szPage. On success the buffer is returned through *ppBlock.
 * Returns 0 on success; a failure from tcsGetObjectBlock() is returned as-is.
 */
static int32_t tsdbCacheLoadBlockS3(STsdbFD *pFD, uint8_t **ppBlock) {
  int32_t code = 0;
  int64_t offset = (pFD->blkno - 1) * tsS3BlockSize * pFD->szPage;

  TAOS_CHECK_RETURN(tcsGetObjectBlock(pFD->objName, offset, tsS3BlockSize * pFD->szPage, 0, ppBlock));

  tsdbTrace("block:%p load from s3", *ppBlock);

  return code;
}
4124

UNCOV
4125
/* LRU deleter callback: frees the cached buffer passed as `value`; key, keyLen and ud are not used. */
static void deleteBCache(const void *key, size_t keyLen, void *value, void *ud) {
  (void)ud;

  uint8_t *pBuf = (uint8_t *)value;
  taosMemoryFree(pBuf);
}
×
4131

UNCOV
4132
/*
 * Look up the block identified by (pFD->fid, pFD->cid, pFD->blkno) in pCache, loading it
 * with tsdbCacheLoadBlockS3() and inserting it on a miss.
 *
 * The first lookup is done without a lock. On a miss, pTsdb->bMutex is taken and the
 * lookup is repeated before loading, so the load + insert happens under the mutex.
 *
 * handle - out: the cache handle, or NULL when loading failed or produced no block.
 * Returns 0 on success, the loader's error code on failure, or
 * TSDB_CODE_OUT_OF_MEMORY when the loader succeeded but returned no block.
 */
int32_t tsdbCacheGetBlockS3(SLRUCache *pCache, STsdbFD *pFD, LRUHandle **handle) {
  int32_t code = 0;
  char    key[128] = {0};
  int     keyLen = 0;

  getBCacheKey(pFD->fid, pFD->cid, pFD->blkno, key, &keyLen);

  LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen);
  if (h != NULL) {
    // Hit on the unlocked lookup: nothing else to do.
    *handle = h;
    TAOS_RETURN(code);
  }

  STsdb *pTsdb = pFD->pTsdb;
  (void)taosThreadMutexLock(&pTsdb->bMutex);

  // Look again now that the mutex is held.
  h = taosLRUCacheLookup(pCache, key, keyLen);
  if (h == NULL) {
    uint8_t *pBlock = NULL;

    code = tsdbCacheLoadBlockS3(pFD, &pBlock);
    //  if table's empty or error, return code of -1
    if (code != TSDB_CODE_SUCCESS || pBlock == NULL) {
      (void)taosThreadMutexUnlock(&pTsdb->bMutex);

      *handle = NULL;
      if (code == TSDB_CODE_SUCCESS) {
        // Loader reported success but gave no buffer.
        code = TSDB_CODE_OUT_OF_MEMORY;
      }

      TAOS_RETURN(code);
    }

    size_t    charge = tsS3BlockSize * pFD->szPage;
    LRUStatus status =
        taosLRUCacheInsert(pCache, key, keyLen, pBlock, charge, deleteBCache, NULL, &h, TAOS_LRU_PRIORITY_LOW, NULL);
    if (status != TAOS_LRU_STATUS_OK) {
      // insert status is deliberately not turned into an error code
    }
  }

  (void)taosThreadMutexUnlock(&pTsdb->bMutex);

  *handle = h;

  TAOS_RETURN(code);
}
4175

UNCOV
4176
/*
 * Look up page `pgno` of the file described by pFD in pCache.
 *
 * handle - out: the result of taosLRUCacheLookup() for the (fid, cid, pgno) key.
 * Always returns 0.
 */
int32_t tsdbCacheGetPageS3(SLRUCache *pCache, STsdbFD *pFD, int64_t pgno, LRUHandle **handle) {
  char key[128] = {0};
  int  keyLen = 0;

  getBCacheKey(pFD->fid, pFD->cid, pgno, key, &keyLen);

  *handle = taosLRUCacheLookup(pCache, key, keyLen);

  return 0;
}
4186

UNCOV
4187
/*
 * Insert a copy of page `pgno` into the page cache unless an entry for the
 * (fid, cid, pgno) key already exists.
 *
 * pCache - cache the new entry is inserted into.
 * pFD    - file descriptor; supplies the key fields, the page size (szPage) and the
 *          tsdb whose pgMutex guards the lookup + insert.
 * pgno   - page number used in the key.
 * pPage  - source buffer; pFD->szPage bytes are copied from it.
 *
 * Failures (allocation or insert) are ignored: the cache is simply left without the page.
 *
 * NOTE(review): the lookup and the final release use pFD->pTsdb->pgCache while the insert
 * uses pCache - presumably callers always pass the same cache; verify.
 */
void tsdbCacheSetPageS3(SLRUCache *pCache, STsdbFD *pFD, int64_t pgno, uint8_t *pPage) {
  char       key[128] = {0};
  int        keyLen = 0;
  LRUHandle *handle = NULL;

  getBCacheKey(pFD->fid, pFD->cid, pgno, key, &keyLen);
  (void)taosThreadMutexLock(&pFD->pTsdb->pgMutex);
  handle = taosLRUCacheLookup(pFD->pTsdb->pgCache, key, keyLen);
  if (!handle) {
    size_t              charge = pFD->szPage;
    _taos_lru_deleter_t deleter = deleteBCache;
    uint8_t            *pPg = taosMemoryMalloc(charge);
    if (!pPg) {
      // Drop the mutex before bailing out; returning with it held would block
      // every later caller that takes pgMutex.
      (void)taosThreadMutexUnlock(&pFD->pTsdb->pgMutex);
      return;  // ignore error with s3 cache and leave error untouched
    }
    memcpy(pPg, pPage, charge);

    LRUStatus status =
        taosLRUCacheInsert(pCache, key, keyLen, pPg, charge, deleter, NULL, &handle, TAOS_LRU_PRIORITY_LOW, NULL);
    if (status != TAOS_LRU_STATUS_OK) {
      // ignore cache updating if not ok
      // code = TSDB_CODE_OUT_OF_MEMORY;
    }
  }
  (void)taosThreadMutexUnlock(&pFD->pTsdb->pgMutex);

  // Release the handle obtained from the lookup or the insert above.
  tsdbCacheRelease(pFD->pTsdb->pgCache, handle);
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc