• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3565

25 Dec 2024 05:34AM UTC coverage: 51.098% (-11.1%) from 62.21%
#3565

push

travis-ci

web-flow
Merge pull request #29316 from taosdata/enh/3.0/TD-33266

enh(ut):Add wal & config UT.

111558 of 284773 branches covered (39.17%)

Branch coverage included in aggregate %.

1 of 2 new or added lines in 2 files covered. (50.0%)

39015 existing lines in 102 files now uncovered.

177882 of 281666 relevant lines covered (63.15%)

15090998.35 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

4.15
/source/dnode/vnode/src/tsdb/tsdbCache.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#include "functionMgt.h"
16
#include "tcs.h"
17
#include "tsdb.h"
18
#include "tsdbDataFileRW.h"
19
#include "tsdbIter.h"
20
#include "tsdbReadUtil.h"
21
#include "vnd.h"
22

23
#define ROCKS_BATCH_SIZE (4096)
24

UNCOV
25
void tsdbLRUCacheRelease(SLRUCache *cache, LRUHandle *handle, bool eraseIfLastRef) {
×
UNCOV
26
  if (!taosLRUCacheRelease(cache, handle, eraseIfLastRef)) {
×
UNCOV
27
    tsdbTrace(" release lru cache failed");
×
28
  }
UNCOV
29
}
×
30

31
static int32_t tsdbOpenBCache(STsdb *pTsdb) {
18✔
32
  int32_t    code = 0, lino = 0;
18✔
33
  int32_t    szPage = pTsdb->pVnode->config.tsdbPageSize;
18✔
34
  int64_t    szBlock = tsS3BlockSize <= 1024 ? 1024 : tsS3BlockSize;
18✔
35
  SLRUCache *pCache = taosLRUCacheInit((int64_t)tsS3BlockCacheSize * szBlock * szPage, 0, .5);
18✔
36
  if (pCache == NULL) {
18!
37
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err);
×
38
  }
39

40
  taosLRUCacheSetStrictCapacity(pCache, false);
18✔
41

42
  (void)taosThreadMutexInit(&pTsdb->bMutex, NULL);
18✔
43

44
  pTsdb->bCache = pCache;
18✔
45

46
_err:
18✔
47
  if (code) {
18!
48
    tsdbError("tsdb/bcache: vgId:%d, %s failed at line %d since %s.", TD_VID(pTsdb->pVnode), __func__, lino,
×
49
              tstrerror(code));
50
  }
51

52
  TAOS_RETURN(code);
18✔
53
}
54

55
static void tsdbCloseBCache(STsdb *pTsdb) {
18✔
56
  SLRUCache *pCache = pTsdb->bCache;
18✔
57
  if (pCache) {
18!
58
    int32_t elems = taosLRUCacheGetElems(pCache);
18✔
59
    tsdbTrace("vgId:%d, elems: %d", TD_VID(pTsdb->pVnode), elems);
18!
60
    taosLRUCacheEraseUnrefEntries(pCache);
18✔
61
    elems = taosLRUCacheGetElems(pCache);
18✔
62
    tsdbTrace("vgId:%d, elems: %d", TD_VID(pTsdb->pVnode), elems);
18!
63

64
    taosLRUCacheCleanup(pCache);
18✔
65

66
    (void)taosThreadMutexDestroy(&pTsdb->bMutex);
18✔
67
  }
68
}
18✔
69

70
static int32_t tsdbOpenPgCache(STsdb *pTsdb) {
18✔
71
  int32_t code = 0, lino = 0;
18✔
72
  int32_t szPage = pTsdb->pVnode->config.tsdbPageSize;
18✔
73

74
  SLRUCache *pCache = taosLRUCacheInit((int64_t)tsS3PageCacheSize * szPage, 0, .5);
18✔
75
  if (pCache == NULL) {
18!
76
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err);
×
77
  }
78

79
  taosLRUCacheSetStrictCapacity(pCache, false);
18✔
80

81
  (void)taosThreadMutexInit(&pTsdb->pgMutex, NULL);
18✔
82

83
  pTsdb->pgCache = pCache;
18✔
84

85
_err:
18✔
86
  if (code) {
18!
87
    tsdbError("tsdb/pgcache: vgId:%d, open failed at line %d since %s.", TD_VID(pTsdb->pVnode), lino, tstrerror(code));
×
88
  }
89

90
  TAOS_RETURN(code);
18✔
91
}
92

93
static void tsdbClosePgCache(STsdb *pTsdb) {
18✔
94
  SLRUCache *pCache = pTsdb->pgCache;
18✔
95
  if (pCache) {
18!
96
    int32_t elems = taosLRUCacheGetElems(pCache);
18✔
97
    tsdbTrace("vgId:%d, elems: %d", TD_VID(pTsdb->pVnode), elems);
18!
98
    taosLRUCacheEraseUnrefEntries(pCache);
18✔
99
    elems = taosLRUCacheGetElems(pCache);
18✔
100
    tsdbTrace("vgId:%d, elems: %d", TD_VID(pTsdb->pVnode), elems);
18!
101

102
    taosLRUCacheCleanup(pCache);
18✔
103

104
    (void)taosThreadMutexDestroy(&pTsdb->bMutex);
18✔
105
  }
106
}
18✔
107

108
#define ROCKS_KEY_LEN (sizeof(tb_uid_t) + sizeof(int16_t) + sizeof(int8_t))
109

110
enum {
111
  LFLAG_LAST_ROW = 0,
112
  LFLAG_LAST = 1,
113
};
114

115
typedef struct {
116
  tb_uid_t uid;
117
  int16_t  cid;
118
  int8_t   lflag;
119
} SLastKey;
120

121
#define IS_LAST_ROW_KEY(k) (((k).lflag & LFLAG_LAST) == LFLAG_LAST_ROW)
122
#define IS_LAST_KEY(k)     (((k).lflag & LFLAG_LAST) == LFLAG_LAST)
123

124
static void tsdbGetRocksPath(STsdb *pTsdb, char *path) {
18✔
125
  SVnode *pVnode = pTsdb->pVnode;
18✔
126
  vnodeGetPrimaryDir(pTsdb->path, pVnode->diskPrimary, pVnode->pTfs, path, TSDB_FILENAME_LEN);
18✔
127

128
  int32_t offset = strlen(path);
18✔
129
  snprintf(path + offset, TSDB_FILENAME_LEN - offset - 1, "%scache.rdb", TD_DIRSEP);
18✔
130
}
18✔
131

132
static const char *myCmpName(void *state) {
90✔
133
  (void)state;
134
  return "myCmp";
90✔
135
}
136

137
static void myCmpDestroy(void *state) { (void)state; }
18✔
138

UNCOV
139
static int myCmp(void *state, const char *a, size_t alen, const char *b, size_t blen) {
×
140
  (void)state;
141
  (void)alen;
142
  (void)blen;
UNCOV
143
  SLastKey *lhs = (SLastKey *)a;
×
UNCOV
144
  SLastKey *rhs = (SLastKey *)b;
×
145

UNCOV
146
  if (lhs->uid < rhs->uid) {
×
UNCOV
147
    return -1;
×
UNCOV
148
  } else if (lhs->uid > rhs->uid) {
×
UNCOV
149
    return 1;
×
150
  }
151

UNCOV
152
  if (lhs->cid < rhs->cid) {
×
UNCOV
153
    return -1;
×
UNCOV
154
  } else if (lhs->cid > rhs->cid) {
×
UNCOV
155
    return 1;
×
156
  }
157

UNCOV
158
  if ((lhs->lflag & LFLAG_LAST) < (rhs->lflag & LFLAG_LAST)) {
×
UNCOV
159
    return -1;
×
UNCOV
160
  } else if ((lhs->lflag & LFLAG_LAST) > (rhs->lflag & LFLAG_LAST)) {
×
UNCOV
161
    return 1;
×
162
  }
163

UNCOV
164
  return 0;
×
165
}
166

167
static int32_t tsdbOpenRocksCache(STsdb *pTsdb) {
18✔
168
  int32_t code = 0, lino = 0;
18✔
169

170
  rocksdb_comparator_t *cmp = rocksdb_comparator_create(NULL, myCmpDestroy, myCmp, myCmpName);
18✔
171
  if (NULL == cmp) {
18!
172
    TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
×
173
  }
174

175
  rocksdb_block_based_table_options_t *tableoptions = rocksdb_block_based_options_create();
18✔
176
  pTsdb->rCache.tableoptions = tableoptions;
18✔
177

178
  rocksdb_options_t *options = rocksdb_options_create();
18✔
179
  if (NULL == options) {
18!
180
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err);
×
181
  }
182

183
  rocksdb_options_set_create_if_missing(options, 1);
18✔
184
  rocksdb_options_set_comparator(options, cmp);
18✔
185
  rocksdb_options_set_block_based_table_factory(options, tableoptions);
18✔
186
  rocksdb_options_set_info_log_level(options, 2);  // WARN_LEVEL
18✔
187
  // rocksdb_options_set_inplace_update_support(options, 1);
188
  // rocksdb_options_set_allow_concurrent_memtable_write(options, 0);
189

190
  rocksdb_writeoptions_t *writeoptions = rocksdb_writeoptions_create();
18✔
191
  if (NULL == writeoptions) {
18!
192
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err2);
×
193
  }
194
  rocksdb_writeoptions_disable_WAL(writeoptions, 1);
18✔
195

196
  rocksdb_readoptions_t *readoptions = rocksdb_readoptions_create();
18✔
197
  if (NULL == readoptions) {
18!
198
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err3);
×
199
  }
200

201
  char *err = NULL;
18✔
202
  char  cachePath[TSDB_FILENAME_LEN] = {0};
18✔
203
  tsdbGetRocksPath(pTsdb, cachePath);
18✔
204

205
  rocksdb_t *db = rocksdb_open(options, cachePath, &err);
18✔
206
  if (NULL == db) {
18!
207
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err);
×
208
    rocksdb_free(err);
×
209

210
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err4);
×
211
  }
212

213
  rocksdb_flushoptions_t *flushoptions = rocksdb_flushoptions_create();
18✔
214
  if (NULL == flushoptions) {
18!
215
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err5);
×
216
  }
217

218
  rocksdb_writebatch_t *writebatch = rocksdb_writebatch_create();
18✔
219

220
  TAOS_CHECK_GOTO(taosThreadMutexInit(&pTsdb->rCache.writeBatchMutex, NULL), &lino, _err6);
18!
221

222
  pTsdb->rCache.writebatch = writebatch;
18✔
223
  pTsdb->rCache.my_comparator = cmp;
18✔
224
  pTsdb->rCache.options = options;
18✔
225
  pTsdb->rCache.writeoptions = writeoptions;
18✔
226
  pTsdb->rCache.readoptions = readoptions;
18✔
227
  pTsdb->rCache.flushoptions = flushoptions;
18✔
228
  pTsdb->rCache.db = db;
18✔
229
  pTsdb->rCache.sver = -1;
18✔
230
  pTsdb->rCache.suid = -1;
18✔
231
  pTsdb->rCache.uid = -1;
18✔
232
  pTsdb->rCache.pTSchema = NULL;
18✔
233
  pTsdb->rCache.ctxArray = taosArrayInit(16, sizeof(SLastUpdateCtx));
18✔
234
  if (!pTsdb->rCache.ctxArray) {
18!
235
    TAOS_CHECK_GOTO(terrno, &lino, _err7);
×
236
  }
237

238
  TAOS_RETURN(code);
18✔
239

240
_err7:
×
241
  (void)taosThreadMutexDestroy(&pTsdb->rCache.writeBatchMutex);
×
242
_err6:
×
243
  rocksdb_writebatch_destroy(writebatch);
×
244
_err5:
×
245
  rocksdb_close(pTsdb->rCache.db);
×
246
_err4:
×
247
  rocksdb_readoptions_destroy(readoptions);
×
248
_err3:
×
249
  rocksdb_writeoptions_destroy(writeoptions);
×
250
_err2:
×
251
  rocksdb_options_destroy(options);
×
252
  rocksdb_block_based_options_destroy(tableoptions);
×
253
_err:
×
254
  rocksdb_comparator_destroy(cmp);
×
255

256
  TAOS_RETURN(code);
×
257
}
258

259
static void tsdbCloseRocksCache(STsdb *pTsdb) {
18✔
260
  rocksdb_close(pTsdb->rCache.db);
18✔
261
  (void)taosThreadMutexDestroy(&pTsdb->rCache.writeBatchMutex);
18✔
262
  rocksdb_flushoptions_destroy(pTsdb->rCache.flushoptions);
18✔
263
  rocksdb_writebatch_destroy(pTsdb->rCache.writebatch);
18✔
264
  rocksdb_readoptions_destroy(pTsdb->rCache.readoptions);
18✔
265
  rocksdb_writeoptions_destroy(pTsdb->rCache.writeoptions);
18✔
266
  rocksdb_options_destroy(pTsdb->rCache.options);
18✔
267
  rocksdb_block_based_options_destroy(pTsdb->rCache.tableoptions);
18✔
268
  rocksdb_comparator_destroy(pTsdb->rCache.my_comparator);
18✔
269
  taosMemoryFree(pTsdb->rCache.pTSchema);
18!
270
  taosArrayDestroy(pTsdb->rCache.ctxArray);
18✔
271
}
18✔
272

273
static void rocksMayWrite(STsdb *pTsdb, bool force) {
8✔
274
  rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
8✔
275

276
  int count = rocksdb_writebatch_count(wb);
8✔
277
  if ((force && count > 0) || count >= ROCKS_BATCH_SIZE) {
8!
UNCOV
278
    char *err = NULL;
×
279

UNCOV
280
    rocksdb_write(pTsdb->rCache.db, pTsdb->rCache.writeoptions, wb, &err);
×
UNCOV
281
    if (NULL != err) {
×
282
      tsdbError("vgId:%d, %s failed at line %d, count: %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, count,
×
283
                err);
284
      rocksdb_free(err);
×
285
    }
286

UNCOV
287
    rocksdb_writebatch_clear(wb);
×
288
  }
289
}
8✔
290

291
typedef struct {
292
  TSKEY  ts;
293
  int8_t dirty;
294
  struct {
295
    int16_t cid;
296
    int8_t  type;
297
    int8_t  flag;
298
    union {
299
      int64_t val;
300
      struct {
301
        uint32_t nData;
302
        uint8_t *pData;
303
      };
304
    } value;
305
  } colVal;
306
} SLastColV0;
307

UNCOV
308
static int32_t tsdbCacheDeserializeV0(char const *value, SLastCol *pLastCol) {
×
UNCOV
309
  SLastColV0 *pLastColV0 = (SLastColV0 *)value;
×
310

UNCOV
311
  pLastCol->rowKey.ts = pLastColV0->ts;
×
UNCOV
312
  pLastCol->rowKey.numOfPKs = 0;
×
UNCOV
313
  pLastCol->dirty = pLastColV0->dirty;
×
UNCOV
314
  pLastCol->colVal.cid = pLastColV0->colVal.cid;
×
UNCOV
315
  pLastCol->colVal.flag = pLastColV0->colVal.flag;
×
UNCOV
316
  pLastCol->colVal.value.type = pLastColV0->colVal.type;
×
317

UNCOV
318
  pLastCol->cacheStatus = TSDB_LAST_CACHE_VALID;
×
319

UNCOV
320
  if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) {
×
UNCOV
321
    pLastCol->colVal.value.nData = pLastColV0->colVal.value.nData;
×
UNCOV
322
    pLastCol->colVal.value.pData = NULL;
×
UNCOV
323
    if (pLastCol->colVal.value.nData > 0) {
×
UNCOV
324
      pLastCol->colVal.value.pData = (uint8_t *)(&pLastColV0[1]);
×
325
    }
UNCOV
326
    return sizeof(SLastColV0) + pLastColV0->colVal.value.nData;
×
327
  } else {
UNCOV
328
    pLastCol->colVal.value.val = pLastColV0->colVal.value.val;
×
UNCOV
329
    return sizeof(SLastColV0);
×
330
  }
331
}
332

UNCOV
333
static int32_t tsdbCacheDeserialize(char const *value, size_t size, SLastCol **ppLastCol) {
×
UNCOV
334
  if (!value) {
×
335
    return TSDB_CODE_INVALID_PARA;
×
336
  }
337

UNCOV
338
  SLastCol *pLastCol = taosMemoryCalloc(1, sizeof(SLastCol));
×
UNCOV
339
  if (NULL == pLastCol) {
×
340
    return terrno;
×
341
  }
342

UNCOV
343
  int32_t offset = tsdbCacheDeserializeV0(value, pLastCol);
×
UNCOV
344
  if (offset == size) {
×
345
    // version 0
346
    *ppLastCol = pLastCol;
×
347

348
    TAOS_RETURN(TSDB_CODE_SUCCESS);
×
UNCOV
349
  } else if (offset > size) {
×
350
    taosMemoryFreeClear(pLastCol);
×
351

352
    TAOS_RETURN(TSDB_CODE_INVALID_DATA_FMT);
×
353
  }
354

355
  // version
UNCOV
356
  int8_t version = *(int8_t *)(value + offset);
×
UNCOV
357
  offset += sizeof(int8_t);
×
358

359
  // numOfPKs
UNCOV
360
  pLastCol->rowKey.numOfPKs = *(uint8_t *)(value + offset);
×
UNCOV
361
  offset += sizeof(uint8_t);
×
362

363
  // pks
UNCOV
364
  for (int32_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) {
×
365
    pLastCol->rowKey.pks[i] = *(SValue *)(value + offset);
×
366
    offset += sizeof(SValue);
×
367

368
    if (IS_VAR_DATA_TYPE(pLastCol->rowKey.pks[i].type)) {
×
369
      pLastCol->rowKey.pks[i].pData = NULL;
×
370
      if (pLastCol->rowKey.pks[i].nData > 0) {
×
371
        pLastCol->rowKey.pks[i].pData = (uint8_t *)value + offset;
×
372
        offset += pLastCol->rowKey.pks[i].nData;
×
373
      }
374
    }
375
  }
376

UNCOV
377
  if (version >= LAST_COL_VERSION_2) {
×
UNCOV
378
    pLastCol->cacheStatus = *(uint8_t *)(value + offset);
×
379
  }
380

UNCOV
381
  if (offset > size) {
×
382
    taosMemoryFreeClear(pLastCol);
×
383

384
    TAOS_RETURN(TSDB_CODE_INVALID_DATA_FMT);
×
385
  }
386

UNCOV
387
  *ppLastCol = pLastCol;
×
388

UNCOV
389
  TAOS_RETURN(TSDB_CODE_SUCCESS);
×
390
}
391

392
/*
393
typedef struct {
394
  SLastColV0 lastColV0;
395
  char       colData[];
396
  int8_t     version;
397
  uint8_t    numOfPKs;
398
  SValue     pks[0];
399
  char       pk0Data[];
400
  SValue     pks[1];
401
  char       pk1Data[];
402
  ...
403
} SLastColDisk;
404
*/
UNCOV
405
static int32_t tsdbCacheSerializeV0(char const *value, SLastCol *pLastCol) {
×
UNCOV
406
  SLastColV0 *pLastColV0 = (SLastColV0 *)value;
×
407

UNCOV
408
  pLastColV0->ts = pLastCol->rowKey.ts;
×
UNCOV
409
  pLastColV0->dirty = pLastCol->dirty;
×
UNCOV
410
  pLastColV0->colVal.cid = pLastCol->colVal.cid;
×
UNCOV
411
  pLastColV0->colVal.flag = pLastCol->colVal.flag;
×
UNCOV
412
  pLastColV0->colVal.type = pLastCol->colVal.value.type;
×
UNCOV
413
  if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) {
×
UNCOV
414
    pLastColV0->colVal.value.nData = pLastCol->colVal.value.nData;
×
UNCOV
415
    if (pLastCol->colVal.value.nData > 0) {
×
UNCOV
416
      memcpy(&pLastColV0[1], pLastCol->colVal.value.pData, pLastCol->colVal.value.nData);
×
417
    }
UNCOV
418
    return sizeof(SLastColV0) + pLastCol->colVal.value.nData;
×
419
  } else {
UNCOV
420
    pLastColV0->colVal.value.val = pLastCol->colVal.value.val;
×
UNCOV
421
    return sizeof(SLastColV0);
×
422
  }
423

424
  return 0;
425
}
426

UNCOV
427
static int32_t tsdbCacheSerialize(SLastCol *pLastCol, char **value, size_t *size) {
×
UNCOV
428
  *size = sizeof(SLastColV0);
×
UNCOV
429
  if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) {
×
UNCOV
430
    *size += pLastCol->colVal.value.nData;
×
431
  }
UNCOV
432
  *size += sizeof(uint8_t) + sizeof(uint8_t) + sizeof(uint8_t);  // version + numOfPKs + cacheStatus
×
433

UNCOV
434
  for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) {
×
UNCOV
435
    *size += sizeof(SValue);
×
UNCOV
436
    if (IS_VAR_DATA_TYPE(pLastCol->rowKey.pks[i].type)) {
×
UNCOV
437
      *size += pLastCol->rowKey.pks[i].nData;
×
438
    }
439
  }
440

UNCOV
441
  *value = taosMemoryMalloc(*size);
×
UNCOV
442
  if (NULL == *value) {
×
443
    TAOS_RETURN(terrno);
×
444
  }
445

UNCOV
446
  int32_t offset = tsdbCacheSerializeV0(*value, pLastCol);
×
447

448
  // version
UNCOV
449
  ((uint8_t *)(*value + offset))[0] = LAST_COL_VERSION;
×
UNCOV
450
  offset++;
×
451

452
  // numOfPKs
UNCOV
453
  ((uint8_t *)(*value + offset))[0] = pLastCol->rowKey.numOfPKs;
×
UNCOV
454
  offset++;
×
455

456
  // pks
UNCOV
457
  for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) {
×
UNCOV
458
    ((SValue *)(*value + offset))[0] = pLastCol->rowKey.pks[i];
×
UNCOV
459
    offset += sizeof(SValue);
×
UNCOV
460
    if (IS_VAR_DATA_TYPE(pLastCol->rowKey.pks[i].type)) {
×
UNCOV
461
      if (pLastCol->rowKey.pks[i].nData > 0) {
×
UNCOV
462
        memcpy(*value + offset, pLastCol->rowKey.pks[i].pData, pLastCol->rowKey.pks[i].nData);
×
463
      }
UNCOV
464
      offset += pLastCol->rowKey.pks[i].nData;
×
465
    }
466
  }
467

UNCOV
468
  ((uint8_t *)(*value + offset))[0] = pLastCol->cacheStatus;
×
469

UNCOV
470
  TAOS_RETURN(TSDB_CODE_SUCCESS);
×
471
}
472

473
static int32_t tsdbCachePutToRocksdb(STsdb *pTsdb, SLastKey *pLastKey, SLastCol *pLastCol);
474

UNCOV
475
int tsdbCacheFlushDirty(const void *key, size_t klen, void *value, void *ud) {
×
UNCOV
476
  SLastCol *pLastCol = (SLastCol *)value;
×
477

UNCOV
478
  if (pLastCol->dirty) {
×
UNCOV
479
    STsdb *pTsdb = (STsdb *)ud;
×
480

UNCOV
481
    int32_t code = tsdbCachePutToRocksdb(pTsdb, (SLastKey *)key, pLastCol);
×
UNCOV
482
    if (code) {
×
483
      tsdbError("tsdb/cache: vgId:%d, flush dirty lru failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code));
×
484
      return code;
×
485
    }
486

UNCOV
487
    pLastCol->dirty = 0;
×
488

UNCOV
489
    rocksMayWrite(pTsdb, false);
×
490
  }
491

UNCOV
492
  return 0;
×
493
}
494

UNCOV
495
static bool tsdbKeyDeleted(TSDBKEY *key, SArray *pSkyline, int64_t *iSkyline) {
×
UNCOV
496
  bool deleted = false;
×
UNCOV
497
  while (*iSkyline > 0) {
×
UNCOV
498
    TSDBKEY *pItemBack = (TSDBKEY *)taosArrayGet(pSkyline, *iSkyline);
×
UNCOV
499
    TSDBKEY *pItemFront = (TSDBKEY *)taosArrayGet(pSkyline, *iSkyline - 1);
×
500

UNCOV
501
    if (key->ts > pItemBack->ts) {
×
UNCOV
502
      return false;
×
UNCOV
503
    } else if (key->ts >= pItemFront->ts && key->ts <= pItemBack->ts) {
×
UNCOV
504
      if (key->version <= pItemFront->version || (key->ts == pItemBack->ts && key->version <= pItemBack->version)) {
×
505
        // if (key->version <= pItemFront->version || key->version <= pItemBack->version) {
UNCOV
506
        return true;
×
507
      } else {
508
        if (*iSkyline > 1) {
×
509
          --*iSkyline;
×
510
        } else {
511
          return false;
×
512
        }
513
      }
514
    } else {
UNCOV
515
      if (*iSkyline > 1) {
×
516
        --*iSkyline;
×
517
      } else {
UNCOV
518
        return false;
×
519
      }
520
    }
521
  }
522

UNCOV
523
  return deleted;
×
524
}
525

526
// Get next non-deleted row from imem
UNCOV
527
static TSDBROW *tsdbImemGetNextRow(STbDataIter *pTbIter, SArray *pSkyline, int64_t *piSkyline) {
×
UNCOV
528
  int32_t code = 0;
×
529

UNCOV
530
  if (tsdbTbDataIterNext(pTbIter)) {
×
UNCOV
531
    TSDBROW *pMemRow = tsdbTbDataIterGet(pTbIter);
×
UNCOV
532
    TSDBKEY  rowKey = TSDBROW_KEY(pMemRow);
×
UNCOV
533
    bool     deleted = tsdbKeyDeleted(&rowKey, pSkyline, piSkyline);
×
UNCOV
534
    if (!deleted) {
×
UNCOV
535
      return pMemRow;
×
536
    }
537
  }
538

UNCOV
539
  return NULL;
×
540
}
541

542
// Get first non-deleted row from imem
UNCOV
543
static TSDBROW *tsdbImemGetFirstRow(SMemTable *imem, STbData *pIMem, STbDataIter *pTbIter, SArray *pSkyline,
×
544
                                    int64_t *piSkyline) {
UNCOV
545
  int32_t code = 0;
×
546

UNCOV
547
  tsdbTbDataIterOpen(pIMem, NULL, 1, pTbIter);
×
UNCOV
548
  TSDBROW *pMemRow = tsdbTbDataIterGet(pTbIter);
×
UNCOV
549
  if (pMemRow) {
×
550
    // if non deleted, return the found row.
UNCOV
551
    TSDBKEY rowKey = TSDBROW_KEY(pMemRow);
×
UNCOV
552
    bool    deleted = tsdbKeyDeleted(&rowKey, pSkyline, piSkyline);
×
UNCOV
553
    if (!deleted) {
×
UNCOV
554
      return pMemRow;
×
555
    }
556
  } else {
UNCOV
557
    return NULL;
×
558
  }
559

560
  // continue to find the non-deleted first row from imem, using get next row
UNCOV
561
  return tsdbImemGetNextRow(pTbIter, pSkyline, piSkyline);
×
562
}
563

UNCOV
564
void tsdbCacheInvalidateSchema(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int32_t sver) {
×
UNCOV
565
  SRocksCache *pRCache = &pTsdb->rCache;
×
UNCOV
566
  if (!pRCache->pTSchema || sver <= pTsdb->rCache.sver) return;
×
567

568
  if (suid > 0 && suid == pRCache->suid) {
×
569
    pRCache->sver = -1;
×
570
    pRCache->suid = -1;
×
571
  }
572
  if (suid == 0 && uid == pRCache->uid) {
×
573
    pRCache->sver = -1;
×
574
    pRCache->uid = -1;
×
575
  }
576
}
577

UNCOV
578
static int32_t tsdbUpdateSkm(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int32_t sver) {
×
UNCOV
579
  SRocksCache *pRCache = &pTsdb->rCache;
×
UNCOV
580
  if (pRCache->pTSchema && sver == pRCache->sver) {
×
UNCOV
581
    if (suid > 0 && suid == pRCache->suid) {
×
UNCOV
582
      return 0;
×
583
    }
UNCOV
584
    if (suid == 0 && uid == pRCache->uid) {
×
UNCOV
585
      return 0;
×
586
    }
587
  }
588

UNCOV
589
  pRCache->suid = suid;
×
UNCOV
590
  pRCache->uid = uid;
×
UNCOV
591
  pRCache->sver = sver;
×
UNCOV
592
  tDestroyTSchema(pRCache->pTSchema);
×
UNCOV
593
  return metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pRCache->pTSchema);
×
594
}
595

596
static int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SArray *updCtxArray);
597

UNCOV
598
int32_t tsdbLoadFromImem(SMemTable *imem, int64_t suid, int64_t uid) {
×
UNCOV
599
  int32_t     code = 0;
×
UNCOV
600
  int32_t     lino = 0;
×
UNCOV
601
  STsdb      *pTsdb = imem->pTsdb;
×
UNCOV
602
  SArray     *pMemDelData = NULL;
×
UNCOV
603
  SArray     *pSkyline = NULL;
×
UNCOV
604
  int64_t     iSkyline = 0;
×
UNCOV
605
  STbDataIter tbIter = {0};
×
UNCOV
606
  TSDBROW    *pMemRow = NULL;
×
UNCOV
607
  STSchema   *pTSchema = NULL;
×
UNCOV
608
  SSHashObj  *iColHash = NULL;
×
609
  int32_t     sver;
610
  int32_t     nCol;
UNCOV
611
  SArray     *ctxArray = pTsdb->rCache.ctxArray;
×
UNCOV
612
  STsdbRowKey tsdbRowKey = {0};
×
UNCOV
613
  STSDBRowIter iter = {0};
×
614

UNCOV
615
  STbData *pIMem = tsdbGetTbDataFromMemTable(imem, suid, uid);
×
616

617
  // load imem tomb data and build skyline
UNCOV
618
  TAOS_CHECK_GOTO(loadMemTombData(&pMemDelData, NULL, pIMem, INT64_MAX), &lino, _exit);
×
619

620
  // tsdbBuildDeleteSkyline
UNCOV
621
  size_t delSize = TARRAY_SIZE(pMemDelData);
×
UNCOV
622
  if (delSize > 0) {
×
UNCOV
623
    pSkyline = taosArrayInit(32, sizeof(TSDBKEY));
×
624
    if (!pSkyline) {
×
UNCOV
625
      TAOS_CHECK_EXIT(terrno);
×
626
    }
627

UNCOV
628
    TAOS_CHECK_EXIT(tsdbBuildDeleteSkyline(pMemDelData, 0, (int32_t)(delSize - 1), pSkyline));
×
UNCOV
629
    iSkyline = taosArrayGetSize(pSkyline) - 1;
×
630
  }
631

UNCOV
632
  pMemRow = tsdbImemGetFirstRow(imem, pIMem, &tbIter, pSkyline, &iSkyline);
×
UNCOV
633
  if (!pMemRow) {
×
UNCOV
634
    goto _exit;
×
635
  }
636

637
  // iter first row to last_row/last col values to ctxArray, and mark last null col ids
UNCOV
638
  sver = TSDBROW_SVERSION(pMemRow);
×
UNCOV
639
  TAOS_CHECK_GOTO(tsdbUpdateSkm(pTsdb, suid, uid, sver), &lino, _exit);
×
UNCOV
640
  pTSchema = pTsdb->rCache.pTSchema;
×
UNCOV
641
  nCol = pTSchema->numOfCols;
×
642

UNCOV
643
  tsdbRowGetKey(pMemRow, &tsdbRowKey);
×
644

UNCOV
645
  TAOS_CHECK_EXIT(tsdbRowIterOpen(&iter, pMemRow, pTSchema));
×
646

UNCOV
647
  int32_t iCol = 0;
×
UNCOV
648
  for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal && iCol < nCol; pColVal = tsdbRowIterNext(&iter), iCol++) {
×
UNCOV
649
    SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST_ROW, .tsdbRowKey = tsdbRowKey, .colVal = *pColVal};
×
UNCOV
650
    if (!taosArrayPush(ctxArray, &updateCtx)) {
×
651
      TAOS_CHECK_EXIT(terrno);
×
652
    }
653

UNCOV
654
    if (COL_VAL_IS_VALUE(pColVal)) {
×
UNCOV
655
      updateCtx.lflag = LFLAG_LAST;
×
UNCOV
656
      if (!taosArrayPush(ctxArray, &updateCtx)) {
×
657
        TAOS_CHECK_EXIT(terrno);
×
658
      }
659
    } else {
UNCOV
660
      if (!iColHash) {
×
UNCOV
661
        iColHash = tSimpleHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT));
×
UNCOV
662
        if (iColHash == NULL) {
×
663
          TAOS_CHECK_EXIT(terrno);
×
664
        }
665
      }
666

UNCOV
667
      if (tSimpleHashPut(iColHash, &pColVal->cid, sizeof(pColVal->cid), &pColVal->cid, sizeof(pColVal->cid))) {
×
668
        TAOS_CHECK_EXIT(terrno);
×
669
      }
670
    }
671
  }
UNCOV
672
  tsdbRowClose(&iter);
×
673

674
  // continue to get next row to fill null last col values
UNCOV
675
  pMemRow = tsdbImemGetNextRow(&tbIter, pSkyline, &iSkyline);
×
UNCOV
676
  while (pMemRow) {
×
UNCOV
677
    if (tSimpleHashGetSize(iColHash) == 0) {
×
UNCOV
678
      break;
×
679
    }
680

UNCOV
681
    sver = TSDBROW_SVERSION(pMemRow);
×
UNCOV
682
    TAOS_CHECK_EXIT(tsdbUpdateSkm(pTsdb, suid, uid, sver));
×
UNCOV
683
    pTSchema = pTsdb->rCache.pTSchema;
×
684

UNCOV
685
    STsdbRowKey tsdbRowKey = {0};
×
UNCOV
686
    tsdbRowGetKey(pMemRow, &tsdbRowKey);
×
687

UNCOV
688
    TAOS_CHECK_EXIT(tsdbRowIterOpen(&iter, pMemRow, pTSchema));
×
689

UNCOV
690
    int32_t iCol = 0;
×
UNCOV
691
    for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal && iCol < nCol; pColVal = tsdbRowIterNext(&iter), iCol++) {
×
UNCOV
692
      if (tSimpleHashGet(iColHash, &pColVal->cid, sizeof(pColVal->cid)) && COL_VAL_IS_VALUE(pColVal)) {
×
UNCOV
693
        SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST, .tsdbRowKey = tsdbRowKey, .colVal = *pColVal};
×
UNCOV
694
        if (!taosArrayPush(ctxArray, &updateCtx)) {
×
UNCOV
695
          TAOS_CHECK_EXIT(terrno);
×
696
        }
697

UNCOV
698
        TAOS_CHECK_EXIT(tSimpleHashRemove(iColHash, &pColVal->cid, sizeof(pColVal->cid)));
×
699
      }
700
    }
UNCOV
701
    tsdbRowClose(&iter);
×
702

UNCOV
703
    pMemRow = tsdbImemGetNextRow(&tbIter, pSkyline, &iSkyline);
×
704
  }
705

UNCOV
706
  TAOS_CHECK_GOTO(tsdbCacheUpdate(pTsdb, suid, uid, ctxArray), &lino, _exit);
×
707

UNCOV
708
_exit:
×
UNCOV
709
  if (code) {
×
UNCOV
710
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(pTsdb->pVnode), __func__, __FILE__, lino, tstrerror(code));
×
711

UNCOV
712
    tsdbRowClose(&iter);
×
713
  }
714

UNCOV
715
  taosArrayClear(ctxArray);
×
716
  // destroy any allocated resource
UNCOV
717
  tSimpleHashCleanup(iColHash);
×
UNCOV
718
  if (pMemDelData) {
×
UNCOV
719
    taosArrayDestroy(pMemDelData);
×
720
  }
UNCOV
721
  if (pSkyline) {
×
UNCOV
722
    taosArrayDestroy(pSkyline);
×
723
  }
724

UNCOV
725
  TAOS_RETURN(code);
×
726
}
727

UNCOV
728
static int32_t tsdbCacheUpdateFromIMem(STsdb *pTsdb) {
×
UNCOV
729
  if (!pTsdb) return 0;
×
UNCOV
730
  if (!pTsdb->imem) return 0;
×
731

UNCOV
732
  int32_t    code = 0;
×
UNCOV
733
  int32_t    lino = 0;
×
UNCOV
734
  SMemTable *imem = pTsdb->imem;
×
UNCOV
735
  int32_t    nTbData = imem->nTbData;
×
UNCOV
736
  int64_t    nRow = imem->nRow;
×
UNCOV
737
  int64_t    nDel = imem->nDel;
×
738

UNCOV
739
  if (nRow == 0 || nTbData == 0) return 0;
×
740

UNCOV
741
  TAOS_CHECK_EXIT(tsdbMemTableSaveToCache(imem, tsdbLoadFromImem));
×
742

UNCOV
743
_exit:
×
UNCOV
744
  if (code) {
×
UNCOV
745
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(pTsdb->pVnode), __func__, __FILE__, lino, tstrerror(code));
×
746
  } else {
UNCOV
747
    tsdbInfo("vgId:%d %s done, nRow:%" PRId64 " nDel:%" PRId64, TD_VID(pTsdb->pVnode), __func__, nRow, nDel);
×
748
  }
749

UNCOV
750
  TAOS_RETURN(code);
×
751
}
752

UNCOV
753
int32_t tsdbCacheCommit(STsdb *pTsdb) {
×
UNCOV
754
  int32_t code = 0;
×
755

756
  // 0, tsdbCacheUpdateFromIMem if updateCacheBatch
757
  // flush dirty data of lru into rocks
758
  // 4, and update when writing if !updateCacheBatch
759
  // 5, merge cache & mem if updateCacheBatch
760

UNCOV
761
  if (tsUpdateCacheBatch) {
×
UNCOV
762
    code = tsdbCacheUpdateFromIMem(pTsdb);
×
UNCOV
763
    if (code) {
×
UNCOV
764
      tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
×
765

UNCOV
766
      TAOS_RETURN(code);
×
767
    }
768
  }
769

UNCOV
770
  char                 *err = NULL;
×
UNCOV
771
  SLRUCache            *pCache = pTsdb->lruCache;
×
UNCOV
772
  rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
×
773

UNCOV
774
  (void)taosThreadMutexLock(&pTsdb->lruMutex);
×
775

UNCOV
776
  taosLRUCacheApply(pCache, tsdbCacheFlushDirty, pTsdb);
×
777

UNCOV
778
  rocksMayWrite(pTsdb, true);
×
UNCOV
779
  rocksdb_flush(pTsdb->rCache.db, pTsdb->rCache.flushoptions, &err);
×
780

UNCOV
781
  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
×
782

UNCOV
783
  if (NULL != err) {
×
UNCOV
784
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err);
×
785
    rocksdb_free(err);
×
786
    code = TSDB_CODE_FAILED;
×
787
  }
788

UNCOV
789
  TAOS_RETURN(code);
×
790
}
791

UNCOV
792
static int32_t reallocVarDataVal(SValue *pValue) {
×
UNCOV
793
  if (IS_VAR_DATA_TYPE(pValue->type)) {
×
UNCOV
794
    uint8_t *pVal = pValue->pData;
×
UNCOV
795
    uint32_t nData = pValue->nData;
×
UNCOV
796
    if (nData > 0) {
×
UNCOV
797
      uint8_t *p = taosMemoryMalloc(nData);
×
UNCOV
798
      if (!p) {
×
UNCOV
799
        TAOS_RETURN(terrno);
×
800
      }
UNCOV
801
      pValue->pData = p;
×
UNCOV
802
      (void)memcpy(pValue->pData, pVal, nData);
×
803
    } else {
UNCOV
804
      pValue->pData = NULL;
×
805
    }
806
  }
807

UNCOV
808
  TAOS_RETURN(TSDB_CODE_SUCCESS);
×
809
}
810

UNCOV
811
static int32_t reallocVarData(SColVal *pColVal) { return reallocVarDataVal(&pColVal->value); }
×
812

813
// realloc pk data and col data.
UNCOV
814
static int32_t tsdbCacheReallocSLastCol(SLastCol *pCol, size_t *pCharge) {
×
UNCOV
815
  int32_t code = TSDB_CODE_SUCCESS, lino = 0;
×
UNCOV
816
  size_t  charge = sizeof(SLastCol);
×
817

UNCOV
818
  int8_t i = 0;
×
UNCOV
819
  for (; i < pCol->rowKey.numOfPKs; i++) {
×
UNCOV
820
    SValue *pValue = &pCol->rowKey.pks[i];
×
UNCOV
821
    if (IS_VAR_DATA_TYPE(pValue->type)) {
×
UNCOV
822
      TAOS_CHECK_EXIT(reallocVarDataVal(pValue));
×
UNCOV
823
      charge += pValue->nData;
×
824
    }
825
  }
826

UNCOV
827
  if (IS_VAR_DATA_TYPE(pCol->colVal.value.type)) {
×
UNCOV
828
    TAOS_CHECK_EXIT(reallocVarData(&pCol->colVal));
×
UNCOV
829
    charge += pCol->colVal.value.nData;
×
830
  }
831

UNCOV
832
  if (pCharge) {
×
UNCOV
833
    *pCharge = charge;
×
834
  }
835

UNCOV
836
_exit:
×
UNCOV
837
  if (TSDB_CODE_SUCCESS != code) {
×
UNCOV
838
    for (int8_t j = 0; j < i; j++) {
×
839
      if (IS_VAR_DATA_TYPE(pCol->rowKey.pks[j].type)) {
×
840
        taosMemoryFree(pCol->rowKey.pks[j].pData);
×
841
      }
842
    }
843

UNCOV
844
    (void)memset(pCol, 0, sizeof(SLastCol));
×
845
  }
846

UNCOV
847
  TAOS_RETURN(code);
×
848
}
849

UNCOV
850
void tsdbCacheFreeSLastColItem(void *pItem) {
×
UNCOV
851
  SLastCol *pCol = (SLastCol *)pItem;
×
UNCOV
852
  for (int i = 0; i < pCol->rowKey.numOfPKs; i++) {
×
UNCOV
853
    if (IS_VAR_DATA_TYPE(pCol->rowKey.pks[i].type)) {
×
UNCOV
854
      taosMemoryFree(pCol->rowKey.pks[i].pData);
×
855
    }
856
  }
857

UNCOV
858
  if (IS_VAR_DATA_TYPE(pCol->colVal.value.type) && pCol->colVal.value.pData) {
×
UNCOV
859
    taosMemoryFree(pCol->colVal.value.pData);
×
860
  }
UNCOV
861
}
×
862

UNCOV
863
static void tsdbCacheDeleter(const void *key, size_t klen, void *value, void *ud) {
×
UNCOV
864
  SLastCol *pLastCol = (SLastCol *)value;
×
865

UNCOV
866
  if (pLastCol->dirty) {
×
UNCOV
867
    if (tsdbCacheFlushDirty(key, klen, pLastCol, ud) != 0) {
×
UNCOV
868
      STsdb *pTsdb = (STsdb *)ud;
×
869
      tsdbTrace("tsdb/cache: vgId:%d, flush cache %s failed at line %d.", TD_VID(pTsdb->pVnode), __func__, __LINE__);
×
870
    }
871
  }
872

UNCOV
873
  for (uint8_t i = 0; i < pLastCol->rowKey.numOfPKs; ++i) {
×
UNCOV
874
    SValue *pValue = &pLastCol->rowKey.pks[i];
×
UNCOV
875
    if (IS_VAR_DATA_TYPE(pValue->type)) {
×
UNCOV
876
      taosMemoryFree(pValue->pData);
×
877
    }
878
  }
879

UNCOV
880
  if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type) /* && pLastCol->colVal.value.nData > 0*/) {
×
UNCOV
881
    taosMemoryFree(pLastCol->colVal.value.pData);
×
882
  }
883

UNCOV
884
  taosMemoryFree(value);
×
UNCOV
885
}
×
886

UNCOV
887
static void tsdbCacheOverWriter(const void *key, size_t klen, void *value, void *ud) {
×
UNCOV
888
  SLastCol *pLastCol = (SLastCol *)value;
×
UNCOV
889
  pLastCol->dirty = 0;
×
UNCOV
890
}
×
891

892
static int32_t tsdbCachePutToLRU(STsdb *pTsdb, SLastKey *pLastKey, SLastCol *pLastCol, int8_t dirty);
893

UNCOV
894
static int32_t tsdbCacheNewTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, int8_t col_type, int8_t lflag) {
×
UNCOV
895
  int32_t code = 0, lino = 0;
×
896

UNCOV
897
  SLRUCache            *pCache = pTsdb->lruCache;
×
UNCOV
898
  rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
×
UNCOV
899
  SRowKey               emptyRowKey = {.ts = TSKEY_MIN, .numOfPKs = 0};
×
UNCOV
900
  SLastCol              emptyCol = {
×
901
                   .rowKey = emptyRowKey, .colVal = COL_VAL_NONE(cid, col_type), .dirty = 1, .cacheStatus = TSDB_LAST_CACHE_VALID};
902

UNCOV
903
  SLastKey *pLastKey = &(SLastKey){.lflag = lflag, .uid = uid, .cid = cid};
×
UNCOV
904
  code = tsdbCachePutToLRU(pTsdb, pLastKey, &emptyCol, 1);
×
UNCOV
905
  if (code) {
×
UNCOV
906
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
×
907
  }
908

UNCOV
909
  TAOS_RETURN(code);
×
910
}
911

912
int32_t tsdbCacheCommitNoLock(STsdb *pTsdb) {
4✔
913
  int32_t code = 0;
4✔
914
  char   *err = NULL;
4✔
915

916
  SLRUCache            *pCache = pTsdb->lruCache;
4✔
917
  rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
4✔
918

919
  taosLRUCacheApply(pCache, tsdbCacheFlushDirty, pTsdb);
4✔
920

921
  rocksMayWrite(pTsdb, true);
4✔
922
  rocksdb_flush(pTsdb->rCache.db, pTsdb->rCache.flushoptions, &err);
4✔
923

924
  if (NULL != err) {
4!
UNCOV
925
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err);
×
926
    rocksdb_free(err);
×
927
    code = TSDB_CODE_FAILED;
×
928
  }
929

930
  TAOS_RETURN(code);
4✔
931
}
932

UNCOV
933
static int32_t tsdbCacheGetValuesFromRocks(STsdb *pTsdb, size_t numKeys, const char *const *ppKeysList,
×
934
                                           size_t *pKeysListSizes, char ***pppValuesList, size_t **ppValuesListSizes) {
UNCOV
935
  char **valuesList = taosMemoryCalloc(numKeys, sizeof(char *));
×
UNCOV
936
  if (!valuesList) return terrno;
×
UNCOV
937
  size_t *valuesListSizes = taosMemoryCalloc(numKeys, sizeof(size_t));
×
UNCOV
938
  if (!valuesListSizes) {
×
UNCOV
939
    taosMemoryFreeClear(valuesList);
×
940
    return terrno;
×
941
  }
UNCOV
942
  char **errs = taosMemoryCalloc(numKeys, sizeof(char *));
×
UNCOV
943
  if (!errs) {
×
UNCOV
944
    taosMemoryFreeClear(valuesList);
×
945
    taosMemoryFreeClear(valuesListSizes);
×
946
    return terrno;
×
947
  }
UNCOV
948
  rocksdb_multi_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, numKeys, ppKeysList, pKeysListSizes, valuesList,
×
949
                    valuesListSizes, errs);
UNCOV
950
  for (size_t i = 0; i < numKeys; ++i) {
×
UNCOV
951
    rocksdb_free(errs[i]);
×
952
  }
UNCOV
953
  taosMemoryFreeClear(errs);
×
954

UNCOV
955
  *pppValuesList = valuesList;
×
UNCOV
956
  *ppValuesListSizes = valuesListSizes;
×
UNCOV
957
  TAOS_RETURN(TSDB_CODE_SUCCESS);
×
958
}
959

UNCOV
960
static int32_t tsdbCacheDropTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, bool hasPrimaryKey) {
×
UNCOV
961
  int32_t code = 0;
×
962

963
  // build keys & multi get from rocks
UNCOV
964
  char **keys_list = taosMemoryCalloc(2, sizeof(char *));
×
UNCOV
965
  if (!keys_list) {
×
UNCOV
966
    return terrno;
×
967
  }
UNCOV
968
  size_t *keys_list_sizes = taosMemoryCalloc(2, sizeof(size_t));
×
UNCOV
969
  if (!keys_list_sizes) {
×
UNCOV
970
    taosMemoryFree(keys_list);
×
971
    return terrno;
×
972
  }
UNCOV
973
  const size_t klen = ROCKS_KEY_LEN;
×
974

UNCOV
975
  char *keys = taosMemoryCalloc(2, sizeof(SLastKey));
×
UNCOV
976
  if (!keys) {
×
UNCOV
977
    taosMemoryFree(keys_list);
×
978
    taosMemoryFree(keys_list_sizes);
×
979
    return terrno;
×
980
  }
UNCOV
981
  ((SLastKey *)keys)[0] = (SLastKey){.lflag = LFLAG_LAST, .uid = uid, .cid = cid};
×
UNCOV
982
  ((SLastKey *)keys)[1] = (SLastKey){.lflag = LFLAG_LAST_ROW, .uid = uid, .cid = cid};
×
983

UNCOV
984
  keys_list[0] = keys;
×
UNCOV
985
  keys_list[1] = keys + sizeof(SLastKey);
×
UNCOV
986
  keys_list_sizes[0] = klen;
×
UNCOV
987
  keys_list_sizes[1] = klen;
×
988

UNCOV
989
  char  **values_list = NULL;
×
UNCOV
990
  size_t *values_list_sizes = NULL;
×
991

992
  // was written by caller
993
  // rocksMayWrite(pTsdb, true); // flush writebatch cache
994

UNCOV
995
  TAOS_CHECK_GOTO(tsdbCacheGetValuesFromRocks(pTsdb, 2, (const char *const *)keys_list, keys_list_sizes, &values_list,
×
996
                                              &values_list_sizes),
997
                  NULL, _exit);
998

UNCOV
999
  rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
×
1000
  {
UNCOV
1001
    SLastCol *pLastCol = NULL;
×
UNCOV
1002
    if (values_list[0] != NULL) {
×
UNCOV
1003
      code = tsdbCacheDeserialize(values_list[0], values_list_sizes[0], &pLastCol);
×
UNCOV
1004
      if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
1005
        tsdbError("vgId:%d, %s deserialize failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1006
                  tstrerror(code));
UNCOV
1007
        goto _exit;
×
1008
      }
UNCOV
1009
      if (NULL != pLastCol) {
×
UNCOV
1010
        rocksdb_writebatch_delete(wb, keys_list[0], klen);
×
1011
      }
UNCOV
1012
      taosMemoryFreeClear(pLastCol);
×
1013
    }
1014

UNCOV
1015
    pLastCol = NULL;
×
UNCOV
1016
    if (values_list[1] != NULL) {
×
UNCOV
1017
      code = tsdbCacheDeserialize(values_list[1], values_list_sizes[1], &pLastCol);
×
UNCOV
1018
      if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
1019
        tsdbError("vgId:%d, %s deserialize failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1020
                  tstrerror(code));
UNCOV
1021
        goto _exit;
×
1022
      }
UNCOV
1023
      if (NULL != pLastCol) {
×
UNCOV
1024
        rocksdb_writebatch_delete(wb, keys_list[1], klen);
×
1025
      }
UNCOV
1026
      taosMemoryFreeClear(pLastCol);
×
1027
    }
1028

UNCOV
1029
    rocksdb_free(values_list[0]);
×
UNCOV
1030
    rocksdb_free(values_list[1]);
×
1031

UNCOV
1032
    for (int i = 0; i < 2; i++) {
×
UNCOV
1033
      LRUHandle *h = taosLRUCacheLookup(pTsdb->lruCache, keys_list[i], klen);
×
UNCOV
1034
      if (h) {
×
UNCOV
1035
        tsdbLRUCacheRelease(pTsdb->lruCache, h, true);
×
UNCOV
1036
        taosLRUCacheErase(pTsdb->lruCache, keys_list[i], klen);
×
1037
      }
1038
    }
1039
  }
1040

UNCOV
1041
_exit:
×
UNCOV
1042
  taosMemoryFree(keys_list[0]);
×
1043

UNCOV
1044
  taosMemoryFree(keys_list);
×
UNCOV
1045
  taosMemoryFree(keys_list_sizes);
×
UNCOV
1046
  taosMemoryFree(values_list);
×
UNCOV
1047
  taosMemoryFree(values_list_sizes);
×
1048

UNCOV
1049
  TAOS_RETURN(code);
×
1050
}
1051

UNCOV
1052
int32_t tsdbCacheNewTable(STsdb *pTsdb, tb_uid_t uid, tb_uid_t suid, const SSchemaWrapper *pSchemaRow) {
×
UNCOV
1053
  int32_t code = 0;
×
1054

UNCOV
1055
  (void)taosThreadMutexLock(&pTsdb->lruMutex);
×
1056

UNCOV
1057
  if (suid < 0) {
×
UNCOV
1058
    for (int i = 0; i < pSchemaRow->nCols; ++i) {
×
UNCOV
1059
      int16_t cid = pSchemaRow->pSchema[i].colId;
×
UNCOV
1060
      int8_t  col_type = pSchemaRow->pSchema[i].type;
×
1061

UNCOV
1062
      code = tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, LFLAG_LAST_ROW);
×
UNCOV
1063
      if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
1064
        tsdbTrace("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1065
                  tstrerror(code));
1066
      }
UNCOV
1067
      code = tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, LFLAG_LAST);
×
UNCOV
1068
      if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
1069
        tsdbTrace("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1070
                  tstrerror(code));
1071
      }
1072
    }
1073
  } else {
UNCOV
1074
    STSchema *pTSchema = NULL;
×
UNCOV
1075
    code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, -1, &pTSchema);
×
UNCOV
1076
    if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
1077
      (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
×
1078

UNCOV
1079
      TAOS_RETURN(code);
×
1080
    }
1081

UNCOV
1082
    for (int i = 0; i < pTSchema->numOfCols; ++i) {
×
UNCOV
1083
      int16_t cid = pTSchema->columns[i].colId;
×
UNCOV
1084
      int8_t  col_type = pTSchema->columns[i].type;
×
1085

UNCOV
1086
      code = tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, LFLAG_LAST_ROW);
×
UNCOV
1087
      if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
1088
        tsdbTrace("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1089
                  tstrerror(code));
1090
      }
UNCOV
1091
      code = tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, LFLAG_LAST);
×
UNCOV
1092
      if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
1093
        tsdbTrace("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1094
                  tstrerror(code));
1095
      }
1096
    }
1097

UNCOV
1098
    taosMemoryFree(pTSchema);
×
1099
  }
1100

UNCOV
1101
  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
×
1102

UNCOV
1103
  TAOS_RETURN(code);
×
1104
}
1105

UNCOV
1106
int32_t tsdbCacheDropTable(STsdb *pTsdb, tb_uid_t uid, tb_uid_t suid, SSchemaWrapper *pSchemaRow) {
×
UNCOV
1107
  int32_t code = 0;
×
1108

UNCOV
1109
  (void)taosThreadMutexLock(&pTsdb->lruMutex);
×
1110

UNCOV
1111
  code = tsdbCacheCommitNoLock(pTsdb);
×
UNCOV
1112
  if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
1113
    tsdbTrace("vgId:%d, %s commit with no lock failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1114
              tstrerror(code));
1115
  }
1116

UNCOV
1117
  if (pSchemaRow != NULL) {
×
UNCOV
1118
    bool hasPrimayKey = false;
×
1119
    int  nCols = pSchemaRow->nCols;
×
1120
    if (nCols >= 2) {
×
1121
      hasPrimayKey = (pSchemaRow->pSchema[1].flags & COL_IS_KEY) ? true : false;
×
1122
    }
UNCOV
1123
    for (int i = 0; i < nCols; ++i) {
×
1124
      int16_t cid = pSchemaRow->pSchema[i].colId;
×
1125
      int8_t  col_type = pSchemaRow->pSchema[i].type;
×
1126

UNCOV
1127
      code = tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey);
×
1128
      if (code != TSDB_CODE_SUCCESS) {
×
1129
        tsdbTrace("vgId:%d, %s drop table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1130
                  tstrerror(code));
1131
      }
1132
    }
1133
  } else {
UNCOV
1134
    STSchema *pTSchema = NULL;
×
UNCOV
1135
    code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, -1, &pTSchema);
×
UNCOV
1136
    if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
1137
      (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
×
1138

UNCOV
1139
      TAOS_RETURN(code);
×
1140
    }
1141

UNCOV
1142
    bool hasPrimayKey = false;
×
UNCOV
1143
    int  nCols = pTSchema->numOfCols;
×
UNCOV
1144
    if (nCols >= 2) {
×
UNCOV
1145
      hasPrimayKey = (pTSchema->columns[1].flags & COL_IS_KEY) ? true : false;
×
1146
    }
UNCOV
1147
    for (int i = 0; i < nCols; ++i) {
×
UNCOV
1148
      int16_t cid = pTSchema->columns[i].colId;
×
UNCOV
1149
      int8_t  col_type = pTSchema->columns[i].type;
×
1150

UNCOV
1151
      code = tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey);
×
UNCOV
1152
      if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
1153
        tsdbTrace("vgId:%d, %s drop table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1154
                  tstrerror(code));
1155
      }
1156
    }
1157

UNCOV
1158
    taosMemoryFree(pTSchema);
×
1159
  }
1160

UNCOV
1161
  rocksMayWrite(pTsdb, false);
×
1162

UNCOV
1163
  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
×
1164

UNCOV
1165
  TAOS_RETURN(code);
×
1166
}
1167

1168
int32_t tsdbCacheDropSubTables(STsdb *pTsdb, SArray *uids, tb_uid_t suid) {
4✔
1169
  int32_t code = 0;
4✔
1170

1171
  (void)taosThreadMutexLock(&pTsdb->lruMutex);
4✔
1172

1173
  code = tsdbCacheCommitNoLock(pTsdb);
4✔
1174
  if (code != TSDB_CODE_SUCCESS) {
4!
UNCOV
1175
    tsdbTrace("vgId:%d, %s commit with no lock failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1176
              tstrerror(code));
1177
  }
1178

1179
  STSchema *pTSchema = NULL;
4✔
1180
  code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, suid, -1, &pTSchema);
4✔
1181
  if (code != TSDB_CODE_SUCCESS) {
4!
UNCOV
1182
    (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
×
1183

UNCOV
1184
    TAOS_RETURN(code);
×
1185
  }
1186

1187
  for (int i = 0; i < TARRAY_SIZE(uids); ++i) {
4!
UNCOV
1188
    int64_t uid = ((tb_uid_t *)TARRAY_DATA(uids))[i];
×
1189

UNCOV
1190
    bool hasPrimayKey = false;
×
UNCOV
1191
    int  nCols = pTSchema->numOfCols;
×
UNCOV
1192
    if (nCols >= 2) {
×
UNCOV
1193
      hasPrimayKey = (pTSchema->columns[1].flags & COL_IS_KEY) ? true : false;
×
1194
    }
1195

UNCOV
1196
    for (int i = 0; i < nCols; ++i) {
×
UNCOV
1197
      int16_t cid = pTSchema->columns[i].colId;
×
UNCOV
1198
      int8_t  col_type = pTSchema->columns[i].type;
×
1199

UNCOV
1200
      code = tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey);
×
UNCOV
1201
      if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
1202
        tsdbTrace("vgId:%d, %s drop table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1203
                  tstrerror(code));
1204
      }
1205
    }
1206
  }
1207

1208
  taosMemoryFree(pTSchema);
4!
1209

1210
  rocksMayWrite(pTsdb, false);
4✔
1211

1212
  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
4✔
1213

1214
  TAOS_RETURN(code);
4✔
1215
}
1216

UNCOV
1217
int32_t tsdbCacheNewNTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, int8_t col_type) {
×
1218
  int32_t code = 0;
×
1219

UNCOV
1220
  (void)taosThreadMutexLock(&pTsdb->lruMutex);
×
1221

UNCOV
1222
  code = tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, 0);
×
1223
  if (code != TSDB_CODE_SUCCESS) {
×
1224
    tsdbTrace("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1225
              tstrerror(code));
1226
  }
UNCOV
1227
  code = tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, 1);
×
1228
  if (code != TSDB_CODE_SUCCESS) {
×
1229
    tsdbTrace("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1230
              tstrerror(code));
1231
  }
1232
  // rocksMayWrite(pTsdb, true, false, false);
UNCOV
1233
  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
×
1234

UNCOV
1235
  TAOS_RETURN(code);
×
1236
}
1237

UNCOV
1238
int32_t tsdbCacheDropNTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, bool hasPrimayKey) {
×
1239
  int32_t code = 0;
×
1240

UNCOV
1241
  (void)taosThreadMutexLock(&pTsdb->lruMutex);
×
1242

UNCOV
1243
  code = tsdbCacheCommitNoLock(pTsdb);
×
1244
  if (code != TSDB_CODE_SUCCESS) {
×
1245
    tsdbTrace("vgId:%d, %s commit with no lock failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1246
              tstrerror(code));
1247
  }
1248

UNCOV
1249
  code = tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey);
×
1250
  if (code != TSDB_CODE_SUCCESS) {
×
1251
    tsdbTrace("vgId:%d, %s drop table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1252
              tstrerror(code));
1253
  }
1254

UNCOV
1255
  rocksMayWrite(pTsdb, false);
×
1256

UNCOV
1257
  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
×
1258

UNCOV
1259
  TAOS_RETURN(code);
×
1260
}
1261

UNCOV
1262
int32_t tsdbCacheNewSTableColumn(STsdb *pTsdb, SArray *uids, int16_t cid, int8_t col_type) {
×
UNCOV
1263
  int32_t code = 0;
×
1264

UNCOV
1265
  (void)taosThreadMutexLock(&pTsdb->lruMutex);
×
1266

UNCOV
1267
  for (int i = 0; i < TARRAY_SIZE(uids); ++i) {
×
UNCOV
1268
    tb_uid_t uid = ((tb_uid_t *)TARRAY_DATA(uids))[i];
×
1269

UNCOV
1270
    code = tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, 0);
×
UNCOV
1271
    if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
1272
      tsdbTrace("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1273
                tstrerror(code));
1274
    }
UNCOV
1275
    code = tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, 1);
×
UNCOV
1276
    if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
1277
      tsdbTrace("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1278
                tstrerror(code));
1279
    }
1280
  }
1281

1282
  // rocksMayWrite(pTsdb, true, false, false);
UNCOV
1283
  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
×
UNCOV
1284
  TAOS_RETURN(code);
×
1285
}
1286

UNCOV
1287
int32_t tsdbCacheDropSTableColumn(STsdb *pTsdb, SArray *uids, int16_t cid, bool hasPrimayKey) {
×
UNCOV
1288
  int32_t code = 0;
×
1289

UNCOV
1290
  (void)taosThreadMutexLock(&pTsdb->lruMutex);
×
1291

UNCOV
1292
  code = tsdbCacheCommitNoLock(pTsdb);
×
UNCOV
1293
  if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
1294
    tsdbTrace("vgId:%d, %s commit with no lock failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1295
              tstrerror(code));
1296
  }
1297

UNCOV
1298
  for (int i = 0; i < TARRAY_SIZE(uids); ++i) {
×
UNCOV
1299
    int64_t uid = ((tb_uid_t *)TARRAY_DATA(uids))[i];
×
1300

UNCOV
1301
    code = tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey);
×
UNCOV
1302
    if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
1303
      tsdbTrace("vgId:%d, %s drop table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1304
                tstrerror(code));
1305
    }
1306
  }
1307

UNCOV
1308
  rocksMayWrite(pTsdb, false);
×
1309

UNCOV
1310
  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
×
1311

UNCOV
1312
  TAOS_RETURN(code);
×
1313
}
1314

1315
typedef struct {
1316
  int      idx;
1317
  SLastKey key;
1318
} SIdxKey;
1319

UNCOV
1320
static void tsdbCacheUpdateLastColToNone(SLastCol *pLastCol, ELastCacheStatus cacheStatus) {
×
1321
  // update rowkey
UNCOV
1322
  pLastCol->rowKey.ts = TSKEY_MIN;
×
1323
  for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) {
×
1324
    SValue *pPKValue = &pLastCol->rowKey.pks[i];
×
1325
    if (IS_VAR_DATA_TYPE(pPKValue->type) && pPKValue->nData > 0) {
×
1326
      taosMemoryFreeClear(pPKValue->pData);
×
1327
      pPKValue->nData = 0;
×
1328
    } else {
UNCOV
1329
      pPKValue->val = 0;
×
1330
    }
1331
  }
UNCOV
1332
  pLastCol->rowKey.numOfPKs = 0;
×
1333

1334
  // update colval
UNCOV
1335
  if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type) && pLastCol->colVal.value.nData > 0) {
×
1336
    taosMemoryFreeClear(pLastCol->colVal.value.pData);
×
1337
    pLastCol->colVal.value.nData = 0;
×
1338
  } else {
UNCOV
1339
    pLastCol->colVal.value.val = 0;
×
1340
  }
1341

UNCOV
1342
  pLastCol->colVal = COL_VAL_NONE(pLastCol->colVal.cid, pLastCol->colVal.value.type);
×
1343
  pLastCol->dirty = 1;
×
1344
  pLastCol->cacheStatus = cacheStatus;
×
1345
}
×
1346

UNCOV
1347
static int32_t tsdbCachePutToRocksdb(STsdb *pTsdb, SLastKey *pLastKey, SLastCol *pLastCol) {
×
UNCOV
1348
  int32_t code = 0;
×
UNCOV
1349
  char   *rocks_value = NULL;
×
UNCOV
1350
  size_t  vlen = 0;
×
1351

UNCOV
1352
  code = tsdbCacheSerialize(pLastCol, &rocks_value, &vlen);
×
UNCOV
1353
  if (code) {
×
UNCOV
1354
    tsdbError("tsdb/cache/putrocks: vgId:%d, serialize failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code));
×
1355
    TAOS_RETURN(code);
×
1356
  }
1357

UNCOV
1358
  rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
×
UNCOV
1359
  (void)taosThreadMutexLock(&pTsdb->rCache.writeBatchMutex);
×
UNCOV
1360
  rocksdb_writebatch_put(wb, (char *)pLastKey, ROCKS_KEY_LEN, rocks_value, vlen);
×
UNCOV
1361
  (void)taosThreadMutexUnlock(&pTsdb->rCache.writeBatchMutex);
×
1362

UNCOV
1363
  taosMemoryFree(rocks_value);
×
1364

UNCOV
1365
  TAOS_RETURN(code);
×
1366
}
1367

UNCOV
1368
static int32_t tsdbCachePutToLRU(STsdb *pTsdb, SLastKey *pLastKey, SLastCol *pLastCol, int8_t dirty) {
×
UNCOV
1369
  int32_t code = 0, lino = 0;
×
1370

UNCOV
1371
  SLastCol *pLRULastCol = taosMemoryCalloc(1, sizeof(SLastCol));
×
UNCOV
1372
  if (!pLRULastCol) {
×
UNCOV
1373
    return terrno;
×
1374
  }
1375

UNCOV
1376
  size_t charge = 0;
×
UNCOV
1377
  *pLRULastCol = *pLastCol;
×
UNCOV
1378
  pLRULastCol->dirty = dirty;
×
UNCOV
1379
  TAOS_CHECK_EXIT(tsdbCacheReallocSLastCol(pLRULastCol, &charge));
×
1380

UNCOV
1381
  LRUStatus status = taosLRUCacheInsert(pTsdb->lruCache, pLastKey, ROCKS_KEY_LEN, pLRULastCol, charge, tsdbCacheDeleter,
×
1382
                                        tsdbCacheOverWriter, NULL, TAOS_LRU_PRIORITY_LOW, pTsdb);
UNCOV
1383
  if (TAOS_LRU_STATUS_OK != status && TAOS_LRU_STATUS_OK_OVERWRITTEN != status) {
×
UNCOV
1384
    tsdbError("vgId:%d, %s failed at line %d status %d.", TD_VID(pTsdb->pVnode), __func__, __LINE__, status);
×
1385
    code = TSDB_CODE_FAILED;
×
1386
    pLRULastCol = NULL;
×
1387
  }
1388

UNCOV
1389
_exit:
×
UNCOV
1390
  if (TSDB_CODE_SUCCESS != code) {
×
UNCOV
1391
    taosMemoryFree(pLRULastCol);
×
1392
    tsdbError("tsdb/cache/putlru: vgId:%d, failed at line %d since %s.", TD_VID(pTsdb->pVnode), lino, tstrerror(code));
×
1393
  }
1394

UNCOV
1395
  TAOS_RETURN(code);
×
1396
}
1397

UNCOV
1398
static int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SArray *updCtxArray) {
×
UNCOV
1399
  if (!updCtxArray || TARRAY_SIZE(updCtxArray) == 0) {
×
UNCOV
1400
    TAOS_RETURN(TSDB_CODE_SUCCESS);
×
1401
  }
1402

UNCOV
1403
  int32_t code = 0, lino = 0;
×
1404

UNCOV
1405
  int        num_keys = TARRAY_SIZE(updCtxArray);
×
UNCOV
1406
  SArray    *remainCols = NULL;
×
UNCOV
1407
  SLRUCache *pCache = pTsdb->lruCache;
×
1408

UNCOV
1409
  (void)taosThreadMutexLock(&pTsdb->lruMutex);
×
UNCOV
1410
  for (int i = 0; i < num_keys; ++i) {
×
UNCOV
1411
    SLastUpdateCtx *updCtx = &((SLastUpdateCtx *)TARRAY_DATA(updCtxArray))[i];
×
UNCOV
1412
    int8_t          lflag = updCtx->lflag;
×
UNCOV
1413
    SRowKey        *pRowKey = &updCtx->tsdbRowKey.key;
×
UNCOV
1414
    SColVal        *pColVal = &updCtx->colVal;
×
1415

UNCOV
1416
    if (lflag == LFLAG_LAST && !COL_VAL_IS_VALUE(pColVal)) {
×
UNCOV
1417
      continue;
×
1418
    }
1419

UNCOV
1420
    SLastKey  *key = &(SLastKey){.lflag = lflag, .uid = uid, .cid = pColVal->cid};
×
UNCOV
1421
    LRUHandle *h = taosLRUCacheLookup(pCache, key, ROCKS_KEY_LEN);
×
UNCOV
1422
    if (h) {
×
UNCOV
1423
      SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h);
×
UNCOV
1424
      if (pLastCol->cacheStatus != TSDB_LAST_CACHE_NO_CACHE) {
×
UNCOV
1425
        int32_t cmp_res = tRowKeyCompare(&pLastCol->rowKey, pRowKey);
×
UNCOV
1426
        if (cmp_res < 0 || (cmp_res == 0 && !COL_VAL_IS_NONE(pColVal))) {
×
UNCOV
1427
          SLastCol newLastCol = {
×
1428
              .rowKey = *pRowKey, .colVal = *pColVal, .dirty = 1, .cacheStatus = TSDB_LAST_CACHE_VALID};
UNCOV
1429
          code = tsdbCachePutToLRU(pTsdb, key, &newLastCol, 1);
×
1430
        }
1431
      }
1432

UNCOV
1433
      tsdbLRUCacheRelease(pCache, h, false);
×
UNCOV
1434
      TAOS_CHECK_EXIT(code);
×
1435
    } else {
UNCOV
1436
      if (!remainCols) {
×
UNCOV
1437
        remainCols = taosArrayInit(num_keys * 2, sizeof(SIdxKey));
×
UNCOV
1438
        if (!remainCols) {
×
UNCOV
1439
          TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
1440
        }
1441
      }
UNCOV
1442
      if (!taosArrayPush(remainCols, &(SIdxKey){i, *key})) {
×
UNCOV
1443
        TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
1444
      }
1445
    }
1446
  }
1447

UNCOV
1448
  if (remainCols) {
×
UNCOV
1449
    num_keys = TARRAY_SIZE(remainCols);
×
1450
  }
UNCOV
1451
  if (remainCols && num_keys > 0) {
×
UNCOV
1452
    char  **keys_list = NULL;
×
UNCOV
1453
    size_t *keys_list_sizes = NULL;
×
UNCOV
1454
    char  **values_list = NULL;
×
UNCOV
1455
    size_t *values_list_sizes = NULL;
×
UNCOV
1456
    char  **errs = NULL;
×
UNCOV
1457
    keys_list = taosMemoryCalloc(num_keys, sizeof(char *));
×
UNCOV
1458
    if (!keys_list) {
×
UNCOV
1459
      (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
×
1460
      return terrno;
×
1461
    }
UNCOV
1462
    keys_list_sizes = taosMemoryCalloc(num_keys, sizeof(size_t));
×
UNCOV
1463
    if (!keys_list_sizes) {
×
UNCOV
1464
      taosMemoryFree(keys_list);
×
1465
      (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
×
1466
      return terrno;
×
1467
    }
UNCOV
1468
    for (int i = 0; i < num_keys; ++i) {
×
UNCOV
1469
      SIdxKey *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[i];
×
1470

UNCOV
1471
      keys_list[i] = (char *)&idxKey->key;
×
UNCOV
1472
      keys_list_sizes[i] = ROCKS_KEY_LEN;
×
1473
    }
1474

UNCOV
1475
    rocksMayWrite(pTsdb, true);  // flush writebatch cache
×
1476

UNCOV
1477
    code = tsdbCacheGetValuesFromRocks(pTsdb, num_keys, (const char *const *)keys_list, keys_list_sizes, &values_list,
×
1478
                                       &values_list_sizes);
UNCOV
1479
    if (code) {
×
UNCOV
1480
      taosMemoryFree(keys_list);
×
1481
      taosMemoryFree(keys_list_sizes);
×
1482
      goto _exit;
×
1483
    }
1484

UNCOV
1485
    rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
×
UNCOV
1486
    for (int i = 0; i < num_keys; ++i) {
×
UNCOV
1487
      SIdxKey        *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[i];
×
UNCOV
1488
      SLastUpdateCtx *updCtx = (SLastUpdateCtx *)taosArrayGet(updCtxArray, idxKey->idx);
×
UNCOV
1489
      SRowKey        *pRowKey = &updCtx->tsdbRowKey.key;
×
UNCOV
1490
      SColVal        *pColVal = &updCtx->colVal;
×
1491

UNCOV
1492
      SLastCol *pLastCol = NULL;
×
UNCOV
1493
      if (values_list[i] != NULL) {
×
UNCOV
1494
        code = tsdbCacheDeserialize(values_list[i], values_list_sizes[i], &pLastCol);
×
UNCOV
1495
        if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
1496
          tsdbError("vgId:%d, %s deserialize failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1497
                    tstrerror(code));
UNCOV
1498
          goto _exit;
×
1499
        }
1500
      }
1501
      /*
1502
      if (code) {
1503
        tsdbError("tsdb/cache: vgId:%d, deserialize failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code));
1504
      }
1505
      */
UNCOV
1506
      SLastCol *pToFree = pLastCol;
×
1507

UNCOV
1508
      if (pLastCol && pLastCol->cacheStatus == TSDB_LAST_CACHE_NO_CACHE) {
×
UNCOV
1509
        if ((code = tsdbCachePutToLRU(pTsdb, &idxKey->key, pLastCol, 0)) != TSDB_CODE_SUCCESS) {
×
1510
          tsdbError("tsdb/cache: vgId:%d, put lru failed at line %d since %s.", TD_VID(pTsdb->pVnode), lino,
×
1511
                    tstrerror(code));
UNCOV
1512
          taosMemoryFreeClear(pToFree);
×
1513
          break;
×
1514
        }
1515

1516
        // cache invalid => skip update
UNCOV
1517
        taosMemoryFreeClear(pToFree);
×
1518
        continue;
×
1519
      }
1520

UNCOV
1521
      if (IS_LAST_KEY(idxKey->key) && !COL_VAL_IS_VALUE(pColVal)) {
×
UNCOV
1522
        taosMemoryFreeClear(pToFree);
×
1523
        continue;
×
1524
      }
1525

UNCOV
1526
      int32_t cmp_res = 1;
×
UNCOV
1527
      if (pLastCol) {
×
UNCOV
1528
        cmp_res = tRowKeyCompare(&pLastCol->rowKey, pRowKey);
×
1529
      }
1530

UNCOV
1531
      if (NULL == pLastCol || cmp_res < 0 || (cmp_res == 0 && !COL_VAL_IS_NONE(pColVal))) {
×
UNCOV
1532
        SLastCol lastColTmp = {
×
1533
            .rowKey = *pRowKey, .colVal = *pColVal, .dirty = 0, .cacheStatus = TSDB_LAST_CACHE_VALID};
UNCOV
1534
        if ((code = tsdbCachePutToRocksdb(pTsdb, &idxKey->key, &lastColTmp)) != TSDB_CODE_SUCCESS) {
×
UNCOV
1535
          tsdbError("tsdb/cache: vgId:%d, put rocks failed at line %d since %s.", TD_VID(pTsdb->pVnode), lino,
×
1536
                    tstrerror(code));
UNCOV
1537
          taosMemoryFreeClear(pToFree);
×
1538
          break;
×
1539
        }
UNCOV
1540
        if ((code = tsdbCachePutToLRU(pTsdb, &idxKey->key, &lastColTmp, 0)) != TSDB_CODE_SUCCESS) {
×
UNCOV
1541
          tsdbError("tsdb/cache: vgId:%d, put lru failed at line %d since %s.", TD_VID(pTsdb->pVnode), lino,
×
1542
                    tstrerror(code));
UNCOV
1543
          taosMemoryFreeClear(pToFree);
×
1544
          break;
×
1545
        }
1546
      }
1547

UNCOV
1548
      taosMemoryFreeClear(pToFree);
×
1549
    }
1550

UNCOV
1551
    rocksMayWrite(pTsdb, false);
×
1552

UNCOV
1553
    taosMemoryFree(keys_list);
×
UNCOV
1554
    taosMemoryFree(keys_list_sizes);
×
UNCOV
1555
    if (values_list) {
×
UNCOV
1556
      for (int i = 0; i < num_keys; ++i) {
×
UNCOV
1557
        rocksdb_free(values_list[i]);
×
1558
      }
UNCOV
1559
      taosMemoryFree(values_list);
×
1560
    }
UNCOV
1561
    taosMemoryFree(values_list_sizes);
×
1562
  }
1563

UNCOV
1564
_exit:
×
UNCOV
1565
  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
×
UNCOV
1566
  taosArrayDestroy(remainCols);
×
1567

UNCOV
1568
  if (code) {
×
UNCOV
1569
    tsdbError("tsdb/cache: vgId:%d, update failed at line %d since %s.", TD_VID(pTsdb->pVnode), __LINE__,
×
1570
              tstrerror(code));
1571
  }
1572

UNCOV
1573
  TAOS_RETURN(code);
×
1574
}
1575

UNCOV
1576
int32_t tsdbCacheRowFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int64_t version, int32_t nRow,
×
1577
                                 SRow **aRow) {
UNCOV
1578
  int32_t code = 0, lino = 0;
×
1579

1580
  // 1. prepare last
UNCOV
1581
  TSDBROW      lRow = {.type = TSDBROW_ROW_FMT, .pTSRow = aRow[nRow - 1], .version = version};
×
1582
  STSchema    *pTSchema = NULL;
×
1583
  int32_t      sver = TSDBROW_SVERSION(&lRow);
×
1584
  SSHashObj   *iColHash = NULL;
×
1585
  STSDBRowIter iter = {0};
×
1586

UNCOV
1587
  TAOS_CHECK_GOTO(tsdbUpdateSkm(pTsdb, suid, uid, sver), &lino, _exit);
×
1588
  pTSchema = pTsdb->rCache.pTSchema;
×
1589

UNCOV
1590
  TSDBROW tRow = {.type = TSDBROW_ROW_FMT, .version = version};
×
1591
  int32_t nCol = pTSchema->numOfCols;
×
1592
  SArray *ctxArray = pTsdb->rCache.ctxArray;
×
1593

1594
  // 1. prepare by lrow
UNCOV
1595
  STsdbRowKey tsdbRowKey = {0};
×
1596
  tsdbRowGetKey(&lRow, &tsdbRowKey);
×
1597

UNCOV
1598
  TAOS_CHECK_GOTO(tsdbRowIterOpen(&iter, &lRow, pTSchema), &lino, _exit);
×
1599

UNCOV
1600
  int32_t iCol = 0;
×
1601
  for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal && iCol < nCol; pColVal = tsdbRowIterNext(&iter), iCol++) {
×
1602
    SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST_ROW, .tsdbRowKey = tsdbRowKey, .colVal = *pColVal};
×
1603
    if (!taosArrayPush(ctxArray, &updateCtx)) {
×
1604
      TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
1605
    }
1606

UNCOV
1607
    if (COL_VAL_IS_VALUE(pColVal)) {
×
1608
      updateCtx.lflag = LFLAG_LAST;
×
1609
      if (!taosArrayPush(ctxArray, &updateCtx)) {
×
1610
        TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
1611
      }
1612
    } else {
UNCOV
1613
      if (!iColHash) {
×
1614
        iColHash = tSimpleHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
×
1615
        if (iColHash == NULL) {
×
1616
          TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit);
×
1617
        }
1618
      }
1619

UNCOV
1620
      if (tSimpleHashPut(iColHash, &iCol, sizeof(iCol), NULL, 0)) {
×
1621
        TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit);
×
1622
      }
1623
    }
1624
  }
1625

1626
  // 2. prepare by the other rows
UNCOV
1627
  for (int32_t iRow = nRow - 2; iRow >= 0; --iRow) {
×
1628
    if (tSimpleHashGetSize(iColHash) == 0) {
×
1629
      break;
×
1630
    }
1631

UNCOV
1632
    tRow.pTSRow = aRow[iRow];
×
1633

UNCOV
1634
    STsdbRowKey tsdbRowKey = {0};
×
1635
    tsdbRowGetKey(&tRow, &tsdbRowKey);
×
1636

UNCOV
1637
    void   *pIte = NULL;
×
1638
    int32_t iter = 0;
×
1639
    while ((pIte = tSimpleHashIterate(iColHash, pIte, &iter)) != NULL) {
×
1640
      int32_t iCol = ((int32_t *)pIte)[0];
×
1641
      SColVal colVal = COL_VAL_NONE(0, 0);
×
1642
      tsdbRowGetColVal(&tRow, pTSchema, iCol, &colVal);
×
1643

UNCOV
1644
      if (COL_VAL_IS_VALUE(&colVal)) {
×
1645
        SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST, .tsdbRowKey = tsdbRowKey, .colVal = colVal};
×
1646
        if (!taosArrayPush(ctxArray, &updateCtx)) {
×
1647
          TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
1648
        }
UNCOV
1649
        code = tSimpleHashIterateRemove(iColHash, &iCol, sizeof(iCol), &pIte, &iter);
×
1650
        if (code != TSDB_CODE_SUCCESS) {
×
1651
          tsdbTrace("vgId:%d, %s tSimpleHashIterateRemove failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__,
×
1652
                    __LINE__, tstrerror(code));
1653
        }
1654
      }
1655
    }
1656
  }
1657

UNCOV
1658
  TAOS_CHECK_GOTO(tsdbCacheUpdate(pTsdb, suid, uid, ctxArray), &lino, _exit);
×
1659

UNCOV
1660
_exit:
×
1661
  if (code) {
×
1662
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
×
1663
  }
1664

UNCOV
1665
  tsdbRowClose(&iter);
×
1666
  tSimpleHashCleanup(iColHash);
×
1667
  taosArrayClear(ctxArray);
×
1668

UNCOV
1669
  TAOS_RETURN(code);
×
1670
}
1671

UNCOV
1672
int32_t tsdbCacheColFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SBlockData *pBlockData) {
×
1673
  int32_t      code = 0, lino = 0;
×
1674
  STSDBRowIter iter = {0};
×
1675
  STSchema    *pTSchema = NULL;
×
1676
  SArray      *ctxArray = NULL;
×
1677

UNCOV
1678
  TSDBROW lRow = tsdbRowFromBlockData(pBlockData, pBlockData->nRow - 1);
×
1679
  int32_t sver = TSDBROW_SVERSION(&lRow);
×
1680

UNCOV
1681
  TAOS_CHECK_RETURN(metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pTSchema));
×
1682

UNCOV
1683
  ctxArray = taosArrayInit(pBlockData->nColData, sizeof(SLastUpdateCtx));
×
1684
  if (ctxArray == NULL) {
×
1685
    TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
1686
  }
1687

1688
  // 1. prepare last
UNCOV
1689
  STsdbRowKey tsdbRowKey = {0};
×
1690
  tsdbRowGetKey(&lRow, &tsdbRowKey);
×
1691

1692
  {
UNCOV
1693
    SLastUpdateCtx updateCtx = {
×
1694
        .lflag = LFLAG_LAST,
1695
        .tsdbRowKey = tsdbRowKey,
UNCOV
1696
        .colVal = COL_VAL_VALUE(PRIMARYKEY_TIMESTAMP_COL_ID, ((SValue){.type = TSDB_DATA_TYPE_TIMESTAMP,
×
1697
                                                                       .val = lRow.pBlockData->aTSKEY[lRow.iRow]}))};
UNCOV
1698
    if (!taosArrayPush(ctxArray, &updateCtx)) {
×
1699
      TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
1700
    }
1701
  }
1702

UNCOV
1703
  TSDBROW tRow = tsdbRowFromBlockData(pBlockData, 0);
×
1704

UNCOV
1705
  for (int32_t iColData = 0; iColData < pBlockData->nColData; ++iColData) {
×
1706
    SColData *pColData = &pBlockData->aColData[iColData];
×
1707
    if ((pColData->flag & HAS_VALUE) != HAS_VALUE) {
×
1708
      continue;
×
1709
    }
1710

UNCOV
1711
    for (tRow.iRow = pBlockData->nRow - 1; tRow.iRow >= 0; --tRow.iRow) {
×
1712
      STsdbRowKey tsdbRowKey = {0};
×
1713
      tsdbRowGetKey(&tRow, &tsdbRowKey);
×
1714

UNCOV
1715
      uint8_t colType = tColDataGetBitValue(pColData, tRow.iRow);
×
1716
      if (colType == 2) {
×
1717
        SColVal colVal = COL_VAL_NONE(pColData->cid, pColData->type);
×
1718
        tColDataGetValue(pColData, tRow.iRow, &colVal);
×
1719

UNCOV
1720
        SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST, .tsdbRowKey = tsdbRowKey, .colVal = colVal};
×
1721
        if (!taosArrayPush(ctxArray, &updateCtx)) {
×
1722
          TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
1723
        }
UNCOV
1724
        break;
×
1725
      }
1726
    }
1727
  }
1728

1729
  // 2. prepare last row
UNCOV
1730
  TAOS_CHECK_GOTO(tsdbRowIterOpen(&iter, &lRow, pTSchema), &lino, _exit);
×
1731
  for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal; pColVal = tsdbRowIterNext(&iter)) {
×
1732
    SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST_ROW, .tsdbRowKey = tsdbRowKey, .colVal = *pColVal};
×
1733
    if (!taosArrayPush(ctxArray, &updateCtx)) {
×
1734
      TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
1735
    }
1736
  }
1737

UNCOV
1738
  TAOS_CHECK_GOTO(tsdbCacheUpdate(pTsdb, suid, uid, ctxArray), &lino, _exit);
×
1739

UNCOV
1740
_exit:
×
1741
  tsdbRowClose(&iter);
×
1742
  taosMemoryFreeClear(pTSchema);
×
1743
  taosArrayDestroy(ctxArray);
×
1744

UNCOV
1745
  TAOS_RETURN(code);
×
1746
}
1747

1748
static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols,
1749
                            int nCols, int16_t *slotIds);
1750

1751
static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols,
1752
                               int nCols, int16_t *slotIds);
1753

UNCOV
1754
static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SArray *remainCols,
×
1755
                                    SCacheRowsReader *pr, int8_t ltype) {
UNCOV
1756
  int32_t               code = 0, lino = 0;
×
UNCOV
1757
  rocksdb_writebatch_t *wb = NULL;
×
UNCOV
1758
  SArray               *pTmpColArray = NULL;
×
UNCOV
1759
  bool                  extraTS = false;
×
1760

UNCOV
1761
  SIdxKey *idxKey = taosArrayGet(remainCols, 0);
×
UNCOV
1762
  if (idxKey->key.cid != PRIMARYKEY_TIMESTAMP_COL_ID) {
×
1763
    // ignore 'ts' loaded from cache and load it from tsdb
1764
    // SLastCol *pLastCol = taosArrayGet(pLastArray, 0);
1765
    // tsdbCacheUpdateLastColToNone(pLastCol, TSDB_LAST_CACHE_NO_CACHE);
1766

UNCOV
1767
    SLastKey *key = &(SLastKey){.lflag = ltype, .uid = uid, .cid = PRIMARYKEY_TIMESTAMP_COL_ID};
×
UNCOV
1768
    if (!taosArrayInsert(remainCols, 0, &(SIdxKey){0, *key})) {
×
UNCOV
1769
      TAOS_RETURN(terrno);
×
1770
    }
1771

UNCOV
1772
    extraTS = true;
×
1773
  }
1774

UNCOV
1775
  int      num_keys = TARRAY_SIZE(remainCols);
×
UNCOV
1776
  int16_t *slotIds = taosMemoryMalloc(num_keys * sizeof(int16_t));
×
1777

UNCOV
1778
  int16_t *lastColIds = NULL, *lastSlotIds = NULL, *lastrowColIds = NULL, *lastrowSlotIds = NULL;
×
UNCOV
1779
  lastColIds = taosMemoryMalloc(num_keys * sizeof(int16_t));
×
UNCOV
1780
  lastSlotIds = taosMemoryMalloc(num_keys * sizeof(int16_t));
×
UNCOV
1781
  lastrowColIds = taosMemoryMalloc(num_keys * sizeof(int16_t));
×
UNCOV
1782
  lastrowSlotIds = taosMemoryMalloc(num_keys * sizeof(int16_t));
×
UNCOV
1783
  SArray *lastTmpColArray = NULL, *lastTmpIndexArray = NULL, *lastrowTmpColArray = NULL, *lastrowTmpIndexArray = NULL;
×
1784

UNCOV
1785
  int lastIndex = 0;
×
UNCOV
1786
  int lastrowIndex = 0;
×
1787

UNCOV
1788
  if (!slotIds || !lastColIds || !lastSlotIds || !lastrowColIds || !lastrowSlotIds) {
×
UNCOV
1789
    TAOS_CHECK_EXIT(terrno);
×
1790
  }
1791

UNCOV
1792
  for (int i = 0; i < num_keys; ++i) {
×
UNCOV
1793
    SIdxKey *idxKey = taosArrayGet(remainCols, i);
×
UNCOV
1794
    if (extraTS && !i) {
×
UNCOV
1795
      slotIds[i] = 0;
×
1796
    } else {
UNCOV
1797
      slotIds[i] = pr->pSlotIds[idxKey->idx];
×
1798
    }
1799

UNCOV
1800
    if (IS_LAST_KEY(idxKey->key)) {
×
UNCOV
1801
      if (NULL == lastTmpIndexArray) {
×
UNCOV
1802
        lastTmpIndexArray = taosArrayInit(num_keys, sizeof(int32_t));
×
UNCOV
1803
        if (!lastTmpIndexArray) {
×
UNCOV
1804
          TAOS_CHECK_EXIT(terrno);
×
1805
        }
1806
      }
UNCOV
1807
      if (!taosArrayPush(lastTmpIndexArray, &(i))) {
×
UNCOV
1808
        TAOS_CHECK_EXIT(terrno);
×
1809
      }
UNCOV
1810
      lastColIds[lastIndex] = idxKey->key.cid;
×
UNCOV
1811
      if (extraTS && !i) {
×
UNCOV
1812
        lastSlotIds[lastIndex] = 0;
×
1813
      } else {
UNCOV
1814
        lastSlotIds[lastIndex] = pr->pSlotIds[idxKey->idx];
×
1815
      }
UNCOV
1816
      lastIndex++;
×
1817
    } else {
UNCOV
1818
      if (NULL == lastrowTmpIndexArray) {
×
UNCOV
1819
        lastrowTmpIndexArray = taosArrayInit(num_keys, sizeof(int32_t));
×
UNCOV
1820
        if (!lastrowTmpIndexArray) {
×
UNCOV
1821
          TAOS_CHECK_EXIT(terrno);
×
1822
        }
1823
      }
UNCOV
1824
      if (!taosArrayPush(lastrowTmpIndexArray, &(i))) {
×
UNCOV
1825
        TAOS_CHECK_EXIT(terrno);
×
1826
      }
UNCOV
1827
      lastrowColIds[lastrowIndex] = idxKey->key.cid;
×
UNCOV
1828
      if (extraTS && !i) {
×
UNCOV
1829
        lastrowSlotIds[lastrowIndex] = 0;
×
1830
      } else {
UNCOV
1831
        lastrowSlotIds[lastrowIndex] = pr->pSlotIds[idxKey->idx];
×
1832
      }
UNCOV
1833
      lastrowIndex++;
×
1834
    }
1835
  }
1836

UNCOV
1837
  pTmpColArray = taosArrayInit(lastIndex + lastrowIndex, sizeof(SLastCol));
×
UNCOV
1838
  if (!pTmpColArray) {
×
UNCOV
1839
    TAOS_CHECK_EXIT(terrno);
×
1840
  }
1841

UNCOV
1842
  if (lastTmpIndexArray != NULL) {
×
UNCOV
1843
    TAOS_CHECK_EXIT(mergeLastCid(uid, pTsdb, &lastTmpColArray, pr, lastColIds, lastIndex, lastSlotIds));
×
UNCOV
1844
    for (int i = 0; i < taosArrayGetSize(lastTmpColArray); i++) {
×
UNCOV
1845
      if (!taosArrayInsert(pTmpColArray, *(int32_t *)taosArrayGet(lastTmpIndexArray, i),
×
UNCOV
1846
                           taosArrayGet(lastTmpColArray, i))) {
×
UNCOV
1847
        TAOS_CHECK_EXIT(terrno);
×
1848
      }
1849
    }
1850
  }
1851

UNCOV
1852
  if (lastrowTmpIndexArray != NULL) {
×
UNCOV
1853
    TAOS_CHECK_EXIT(mergeLastRowCid(uid, pTsdb, &lastrowTmpColArray, pr, lastrowColIds, lastrowIndex, lastrowSlotIds));
×
UNCOV
1854
    for (int i = 0; i < taosArrayGetSize(lastrowTmpColArray); i++) {
×
UNCOV
1855
      if (!taosArrayInsert(pTmpColArray, *(int32_t *)taosArrayGet(lastrowTmpIndexArray, i),
×
UNCOV
1856
                           taosArrayGet(lastrowTmpColArray, i))) {
×
UNCOV
1857
        TAOS_CHECK_EXIT(terrno);
×
1858
      }
1859
    }
1860
  }
1861

UNCOV
1862
  SLRUCache *pCache = pTsdb->lruCache;
×
UNCOV
1863
  for (int i = 0; i < num_keys; ++i) {
×
UNCOV
1864
    SIdxKey  *idxKey = taosArrayGet(remainCols, i);
×
UNCOV
1865
    SLastCol *pLastCol = NULL;
×
1866

UNCOV
1867
    if (pTmpColArray && TARRAY_SIZE(pTmpColArray) >= i + 1) {
×
UNCOV
1868
      pLastCol = taosArrayGet(pTmpColArray, i);
×
1869
    }
1870

1871
    // still null, then make up a none col value
UNCOV
1872
    SLastCol noneCol = {.rowKey.ts = TSKEY_MIN,
×
UNCOV
1873
                        .colVal = COL_VAL_NONE(idxKey->key.cid, pr->pSchema->columns[slotIds[i]].type),
×
1874
                        .cacheStatus = TSDB_LAST_CACHE_VALID};
UNCOV
1875
    if (!pLastCol) {
×
UNCOV
1876
      pLastCol = &noneCol;
×
1877
    }
1878

UNCOV
1879
    if (!extraTS || i > 0) {
×
UNCOV
1880
      taosArraySet(pLastArray, idxKey->idx, pLastCol);
×
1881
    }
1882
    // taosArrayRemove(remainCols, i);
1883

UNCOV
1884
    if (/*!pTmpColArray*/ lastTmpIndexArray && !lastTmpColArray) {
×
UNCOV
1885
      continue;
×
1886
    }
UNCOV
1887
    if (/*!pTmpColArray*/ lastrowTmpIndexArray && !lastrowTmpColArray) {
×
UNCOV
1888
      continue;
×
1889
    }
1890

1891
    // store result back to rocks cache
UNCOV
1892
    code = tsdbCachePutToRocksdb(pTsdb, &idxKey->key, pLastCol);
×
UNCOV
1893
    if (code) {
×
UNCOV
1894
      tsdbError("vgId:%d, %s failed at line %d since %s.", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
×
1895
      TAOS_CHECK_EXIT(code);
×
1896
    }
1897

UNCOV
1898
    code = tsdbCachePutToLRU(pTsdb, &idxKey->key, pLastCol, 0);
×
UNCOV
1899
    if (code) {
×
UNCOV
1900
      tsdbError("vgId:%d, %s failed at line %d since %s.", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
×
1901
      TAOS_CHECK_EXIT(code);
×
1902
    }
1903
  }
1904

UNCOV
1905
  rocksMayWrite(pTsdb, false);
×
1906

UNCOV
1907
_exit:
×
UNCOV
1908
  taosArrayDestroy(lastrowTmpIndexArray);
×
UNCOV
1909
  taosArrayDestroy(lastrowTmpColArray);
×
UNCOV
1910
  taosArrayDestroy(lastTmpIndexArray);
×
UNCOV
1911
  taosArrayDestroy(lastTmpColArray);
×
1912

UNCOV
1913
  taosMemoryFree(lastColIds);
×
UNCOV
1914
  taosMemoryFree(lastSlotIds);
×
UNCOV
1915
  taosMemoryFree(lastrowColIds);
×
UNCOV
1916
  taosMemoryFree(lastrowSlotIds);
×
1917

UNCOV
1918
  taosArrayDestroy(pTmpColArray);
×
1919

UNCOV
1920
  taosMemoryFree(slotIds);
×
1921

UNCOV
1922
  TAOS_RETURN(code);
×
1923
}
1924

UNCOV
1925
static int32_t tsdbCacheLoadFromRocks(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SArray *remainCols,
×
1926
                                      SArray *ignoreFromRocks, SCacheRowsReader *pr, int8_t ltype) {
UNCOV
1927
  int32_t code = 0, lino = 0;
×
UNCOV
1928
  int     num_keys = TARRAY_SIZE(remainCols);
×
UNCOV
1929
  char  **keys_list = taosMemoryMalloc(num_keys * sizeof(char *));
×
UNCOV
1930
  size_t *keys_list_sizes = taosMemoryMalloc(num_keys * sizeof(size_t));
×
UNCOV
1931
  char   *key_list = taosMemoryMalloc(num_keys * ROCKS_KEY_LEN);
×
UNCOV
1932
  if (!keys_list || !keys_list_sizes || !key_list) {
×
UNCOV
1933
    taosMemoryFree(keys_list);
×
1934
    taosMemoryFree(keys_list_sizes);
×
1935
    TAOS_RETURN(terrno);
×
1936
  }
UNCOV
1937
  char  **values_list = NULL;
×
UNCOV
1938
  size_t *values_list_sizes = NULL;
×
UNCOV
1939
  for (int i = 0; i < num_keys; ++i) {
×
UNCOV
1940
    memcpy(key_list + i * ROCKS_KEY_LEN, &((SIdxKey *)taosArrayGet(remainCols, i))->key, ROCKS_KEY_LEN);
×
UNCOV
1941
    keys_list[i] = key_list + i * ROCKS_KEY_LEN;
×
UNCOV
1942
    keys_list_sizes[i] = ROCKS_KEY_LEN;
×
1943
  }
1944

UNCOV
1945
  rocksMayWrite(pTsdb, true);  // flush writebatch cache
×
1946

UNCOV
1947
  code = tsdbCacheGetValuesFromRocks(pTsdb, num_keys, (const char *const *)keys_list, keys_list_sizes, &values_list,
×
1948
                                     &values_list_sizes);
UNCOV
1949
  if (code) {
×
UNCOV
1950
    taosMemoryFree(key_list);
×
1951
    taosMemoryFree(keys_list);
×
1952
    taosMemoryFree(keys_list_sizes);
×
1953
    TAOS_RETURN(code);
×
1954
  }
1955

UNCOV
1956
  SLRUCache *pCache = pTsdb->lruCache;
×
UNCOV
1957
  for (int i = 0, j = 0; i < num_keys && j < TARRAY_SIZE(remainCols); ++i) {
×
UNCOV
1958
    SLastCol *pLastCol = NULL;
×
UNCOV
1959
    bool      ignore = ((bool *)TARRAY_DATA(ignoreFromRocks))[i];
×
UNCOV
1960
    if (ignore) {
×
UNCOV
1961
      ++j;
×
UNCOV
1962
      continue;
×
1963
    }
1964

UNCOV
1965
    if (values_list[i] != NULL) {
×
UNCOV
1966
      code = tsdbCacheDeserialize(values_list[i], values_list_sizes[i], &pLastCol);
×
UNCOV
1967
      if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
1968
        tsdbError("vgId:%d, %s deserialize failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
1969
                  tstrerror(code));
UNCOV
1970
        goto _exit;
×
1971
      }
1972
    }
UNCOV
1973
    SLastCol *pToFree = pLastCol;
×
UNCOV
1974
    SIdxKey  *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[j];
×
UNCOV
1975
    if (pLastCol && pLastCol->cacheStatus != TSDB_LAST_CACHE_NO_CACHE) {
×
UNCOV
1976
      code = tsdbCachePutToLRU(pTsdb, &idxKey->key, pLastCol, 0);
×
UNCOV
1977
      if (code) {
×
UNCOV
1978
        tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
×
1979
        taosMemoryFreeClear(pToFree);
×
1980
        TAOS_CHECK_EXIT(code);
×
1981
      }
1982

UNCOV
1983
      SLastCol lastCol = *pLastCol;
×
UNCOV
1984
      code = tsdbCacheReallocSLastCol(&lastCol, NULL);
×
UNCOV
1985
      if (TSDB_CODE_SUCCESS != code) {
×
UNCOV
1986
        taosMemoryFreeClear(pToFree);
×
1987
        TAOS_CHECK_EXIT(code);
×
1988
      }
1989

UNCOV
1990
      taosArraySet(pLastArray, idxKey->idx, &lastCol);
×
UNCOV
1991
      taosArrayRemove(remainCols, j);
×
UNCOV
1992
      taosArrayRemove(ignoreFromRocks, j);
×
1993
    } else {
UNCOV
1994
      ++j;
×
1995
    }
1996

UNCOV
1997
    taosMemoryFreeClear(pToFree);
×
1998
  }
1999

UNCOV
2000
  if (TARRAY_SIZE(remainCols) > 0) {
×
2001
    // tsdbTrace("tsdb/cache: vgId: %d, load %" PRId64 " from raw", TD_VID(pTsdb->pVnode), uid);
UNCOV
2002
    code = tsdbCacheLoadFromRaw(pTsdb, uid, pLastArray, remainCols, pr, ltype);
×
2003
  }
2004

UNCOV
2005
_exit:
×
UNCOV
2006
  taosMemoryFree(key_list);
×
UNCOV
2007
  taosMemoryFree(keys_list);
×
UNCOV
2008
  taosMemoryFree(keys_list_sizes);
×
UNCOV
2009
  if (values_list) {
×
UNCOV
2010
    for (int i = 0; i < num_keys; ++i) {
×
UNCOV
2011
      rocksdb_free(values_list[i]);
×
2012
    }
UNCOV
2013
    taosMemoryFree(values_list);
×
2014
  }
UNCOV
2015
  taosMemoryFree(values_list_sizes);
×
2016

UNCOV
2017
  TAOS_RETURN(code);
×
2018
}
2019

UNCOV
2020
static int32_t tsdbCacheGetBatchFromLru(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr,
×
2021
                                        int8_t ltype, SArray *keyArray) {
UNCOV
2022
  int32_t    code = 0, lino = 0;
×
UNCOV
2023
  SArray    *remainCols = NULL;
×
UNCOV
2024
  SArray    *ignoreFromRocks = NULL;
×
UNCOV
2025
  SLRUCache *pCache = pTsdb->lruCache;
×
UNCOV
2026
  SArray    *pCidList = pr->pCidList;
×
UNCOV
2027
  int        numKeys = TARRAY_SIZE(pCidList);
×
2028

UNCOV
2029
  for (int i = 0; i < numKeys; ++i) {
×
UNCOV
2030
    int16_t cid = ((int16_t *)TARRAY_DATA(pCidList))[i];
×
2031

UNCOV
2032
    SLastKey key = {.lflag = ltype, .uid = uid, .cid = cid};
×
2033
    // for select last_row, last case
UNCOV
2034
    int32_t funcType = FUNCTION_TYPE_CACHE_LAST;
×
UNCOV
2035
    if (pr->pFuncTypeList != NULL && taosArrayGetSize(pr->pFuncTypeList) > i) {
×
UNCOV
2036
      funcType = ((int32_t *)TARRAY_DATA(pr->pFuncTypeList))[i];
×
2037
    }
UNCOV
2038
    if (((pr->type & CACHESCAN_RETRIEVE_LAST) == CACHESCAN_RETRIEVE_LAST) && FUNCTION_TYPE_CACHE_LAST_ROW == funcType) {
×
UNCOV
2039
      int8_t tempType = CACHESCAN_RETRIEVE_LAST_ROW | (pr->type ^ CACHESCAN_RETRIEVE_LAST);
×
UNCOV
2040
      key.lflag = (tempType & CACHESCAN_RETRIEVE_LAST) >> 3;
×
2041
    }
2042

UNCOV
2043
    if (!taosArrayPush(keyArray, &key)) {
×
UNCOV
2044
      TAOS_CHECK_EXIT(terrno);
×
2045
    }
2046

UNCOV
2047
    LRUHandle *h = taosLRUCacheLookup(pCache, &key, ROCKS_KEY_LEN);
×
UNCOV
2048
    SLastCol  *pLastCol = h ? (SLastCol *)taosLRUCacheValue(pCache, h) : NULL;
×
UNCOV
2049
    if (h && pLastCol->cacheStatus != TSDB_LAST_CACHE_NO_CACHE) {
×
UNCOV
2050
      SLastCol lastCol = *pLastCol;
×
UNCOV
2051
      if (TSDB_CODE_SUCCESS != (code = tsdbCacheReallocSLastCol(&lastCol, NULL))) {
×
UNCOV
2052
        tsdbLRUCacheRelease(pCache, h, false);
×
2053
        TAOS_CHECK_GOTO(code, NULL, _exit);
×
2054
      }
2055

UNCOV
2056
      if (taosArrayPush(pLastArray, &lastCol) == NULL) {
×
UNCOV
2057
        code = terrno;
×
2058
        tsdbLRUCacheRelease(pCache, h, false);
×
2059
        goto _exit;
×
2060
      }
2061
    } else {
2062
      // no cache or cache is invalid
UNCOV
2063
      SLastCol noneCol = {.rowKey.ts = TSKEY_MIN,
×
UNCOV
2064
                          .colVal = COL_VAL_NONE(cid, pr->pSchema->columns[pr->pSlotIds[i]].type)};
×
2065

UNCOV
2066
      if (taosArrayPush(pLastArray, &noneCol) == NULL) {
×
UNCOV
2067
        code = terrno;
×
2068
        tsdbLRUCacheRelease(pCache, h, false);
×
2069
        goto _exit;
×
2070
      }
2071

UNCOV
2072
      if (!remainCols) {
×
UNCOV
2073
        if ((remainCols = taosArrayInit(numKeys, sizeof(SIdxKey))) == NULL) {
×
UNCOV
2074
          code = terrno;
×
2075
          tsdbLRUCacheRelease(pCache, h, false);
×
2076
          goto _exit;
×
2077
        }
2078
      }
UNCOV
2079
      if (!ignoreFromRocks) {
×
UNCOV
2080
        if ((ignoreFromRocks = taosArrayInit(numKeys, sizeof(bool))) == NULL) {
×
UNCOV
2081
          code = terrno;
×
2082
          tsdbLRUCacheRelease(pCache, h, false);
×
2083
          goto _exit;
×
2084
        }
2085
      }
UNCOV
2086
      if (taosArrayPush(remainCols, &(SIdxKey){i, key}) == NULL) {
×
UNCOV
2087
        code = terrno;
×
2088
        tsdbLRUCacheRelease(pCache, h, false);
×
2089
        goto _exit;
×
2090
      }
UNCOV
2091
      bool ignoreRocks = pLastCol ? (pLastCol->cacheStatus == TSDB_LAST_CACHE_NO_CACHE) : false;
×
UNCOV
2092
      if (taosArrayPush(ignoreFromRocks, &ignoreRocks) == NULL) {
×
UNCOV
2093
        code = terrno;
×
2094
        tsdbLRUCacheRelease(pCache, h, false);
×
2095
        goto _exit;
×
2096
      }
2097
    }
2098

UNCOV
2099
    if (h) {
×
UNCOV
2100
      tsdbLRUCacheRelease(pCache, h, false);
×
2101
    }
2102
  }
2103

UNCOV
2104
  if (remainCols && TARRAY_SIZE(remainCols) > 0) {
×
UNCOV
2105
    (void)taosThreadMutexLock(&pTsdb->lruMutex);
×
2106

UNCOV
2107
    for (int i = 0; i < TARRAY_SIZE(remainCols);) {
×
UNCOV
2108
      SIdxKey   *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[i];
×
UNCOV
2109
      LRUHandle *h = taosLRUCacheLookup(pCache, &idxKey->key, ROCKS_KEY_LEN);
×
UNCOV
2110
      SLastCol  *pLastCol = h ? (SLastCol *)taosLRUCacheValue(pCache, h) : NULL;
×
UNCOV
2111
      if (h && pLastCol->cacheStatus != TSDB_LAST_CACHE_NO_CACHE) {
×
UNCOV
2112
        SLastCol lastCol = *pLastCol;
×
2113
        code = tsdbCacheReallocSLastCol(&lastCol, NULL);
×
2114
        if (code) {
×
2115
          tsdbLRUCacheRelease(pCache, h, false);
×
2116
          (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
×
2117
          TAOS_RETURN(code);
×
2118
        }
2119

UNCOV
2120
        taosArraySet(pLastArray, idxKey->idx, &lastCol);
×
2121

UNCOV
2122
        taosArrayRemove(remainCols, i);
×
2123
        taosArrayRemove(ignoreFromRocks, i);
×
2124
      } else {
2125
        // no cache or cache is invalid
UNCOV
2126
        ++i;
×
2127
      }
UNCOV
2128
      if (h) {
×
UNCOV
2129
        tsdbLRUCacheRelease(pCache, h, false);
×
2130
      }
2131
    }
2132

2133
    // tsdbTrace("tsdb/cache: vgId: %d, load %" PRId64 " from rocks", TD_VID(pTsdb->pVnode), uid);
UNCOV
2134
    code = tsdbCacheLoadFromRocks(pTsdb, uid, pLastArray, remainCols, ignoreFromRocks, pr, ltype);
×
2135

UNCOV
2136
    (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
×
2137
  }
2138

UNCOV
2139
_exit:
×
UNCOV
2140
  if (remainCols) {
×
UNCOV
2141
    taosArrayDestroy(remainCols);
×
2142
  }
UNCOV
2143
  if (ignoreFromRocks) {
×
UNCOV
2144
    taosArrayDestroy(ignoreFromRocks);
×
2145
  }
2146

UNCOV
2147
  TAOS_RETURN(code);
×
2148
}
2149

2150
typedef enum SMEMNEXTROWSTATES {
2151
  SMEMNEXTROW_ENTER,
2152
  SMEMNEXTROW_NEXT,
2153
} SMEMNEXTROWSTATES;
2154

2155
typedef struct SMemNextRowIter {
2156
  SMEMNEXTROWSTATES state;
2157
  STbData          *pMem;  // [input]
2158
  STbDataIter       iter;  // mem buffer skip list iterator
2159
  int64_t           lastTs;
2160
} SMemNextRowIter;
2161

UNCOV
2162
static int32_t getNextRowFromMem(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast, int16_t *aCols,
×
2163
                                 int nCols) {
UNCOV
2164
  SMemNextRowIter *state = (SMemNextRowIter *)iter;
×
UNCOV
2165
  int32_t          code = 0;
×
UNCOV
2166
  *pIgnoreEarlierTs = false;
×
UNCOV
2167
  switch (state->state) {
×
UNCOV
2168
    case SMEMNEXTROW_ENTER: {
×
UNCOV
2169
      if (state->pMem != NULL) {
×
2170
        /*
2171
        if (state->pMem->maxKey <= state->lastTs) {
2172
          *ppRow = NULL;
2173
          *pIgnoreEarlierTs = true;
2174

2175
          TAOS_RETURN(code);
2176
        }
2177
        */
UNCOV
2178
        tsdbTbDataIterOpen(state->pMem, NULL, 1, &state->iter);
×
2179

UNCOV
2180
        TSDBROW *pMemRow = tsdbTbDataIterGet(&state->iter);
×
UNCOV
2181
        if (pMemRow) {
×
UNCOV
2182
          *ppRow = pMemRow;
×
UNCOV
2183
          state->state = SMEMNEXTROW_NEXT;
×
2184

UNCOV
2185
          TAOS_RETURN(code);
×
2186
        }
2187
      }
2188

UNCOV
2189
      *ppRow = NULL;
×
2190

UNCOV
2191
      TAOS_RETURN(code);
×
2192
    }
UNCOV
2193
    case SMEMNEXTROW_NEXT:
×
UNCOV
2194
      if (tsdbTbDataIterNext(&state->iter)) {
×
UNCOV
2195
        *ppRow = tsdbTbDataIterGet(&state->iter);
×
2196

UNCOV
2197
        TAOS_RETURN(code);
×
2198
      } else {
UNCOV
2199
        *ppRow = NULL;
×
2200

UNCOV
2201
        TAOS_RETURN(code);
×
2202
      }
UNCOV
2203
    default:
×
2204
      break;
×
2205
  }
2206

UNCOV
2207
_err:
×
2208
  *ppRow = NULL;
×
2209

UNCOV
2210
  TAOS_RETURN(code);
×
2211
}
2212

2213
typedef int32_t (*_next_row_fn_t)(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast, int16_t *aCols,
2214
                                  int nCols);
2215
typedef int32_t (*_next_row_clear_fn_t)(void *iter);
2216

2217
typedef struct {
2218
  TSDBROW             *pRow;
2219
  bool                 stop;
2220
  bool                 next;
2221
  bool                 ignoreEarlierTs;
2222
  void                *iter;
2223
  _next_row_fn_t       nextRowFn;
2224
  _next_row_clear_fn_t nextRowClearFn;
2225
} TsdbNextRowState;
2226

2227
typedef struct {
2228
  SArray           *pMemDelData;
2229
  SArray           *pSkyline;
2230
  int64_t           iSkyline;
2231
  SBlockIdx         idx;
2232
  SMemNextRowIter   memState;
2233
  SMemNextRowIter   imemState;
2234
  TSDBROW           memRow, imemRow;
2235
  TsdbNextRowState  input[2];
2236
  SCacheRowsReader *pr;
2237
  STsdb            *pTsdb;
2238
} MemNextRowIter;
2239

UNCOV
2240
static int32_t memRowIterOpen(MemNextRowIter *pIter, tb_uid_t uid, STsdb *pTsdb, STSchema *pTSchema, tb_uid_t suid,
×
2241
                              STsdbReadSnap *pReadSnap, SCacheRowsReader *pr) {
UNCOV
2242
  int32_t code = 0, lino = 0;
×
2243

UNCOV
2244
  STbData *pMem = NULL;
×
UNCOV
2245
  if (pReadSnap->pMem) {
×
UNCOV
2246
    pMem = tsdbGetTbDataFromMemTable(pReadSnap->pMem, suid, uid);
×
2247
  }
2248

UNCOV
2249
  STbData *pIMem = NULL;
×
UNCOV
2250
  if (pReadSnap->pIMem) {
×
UNCOV
2251
    pIMem = tsdbGetTbDataFromMemTable(pReadSnap->pIMem, suid, uid);
×
2252
  }
2253

UNCOV
2254
  pIter->pTsdb = pTsdb;
×
2255

UNCOV
2256
  pIter->pMemDelData = NULL;
×
2257

UNCOV
2258
  TAOS_CHECK_GOTO(loadMemTombData(&pIter->pMemDelData, pMem, pIMem, pr->info.verRange.maxVer), &lino, _exit);
×
2259

UNCOV
2260
  pIter->idx = (SBlockIdx){.suid = suid, .uid = uid};
×
2261

UNCOV
2262
  pIter->input[0] = (TsdbNextRowState){&pIter->memRow, true, false, false, &pIter->memState, getNextRowFromMem, NULL};
×
UNCOV
2263
  pIter->input[1] = (TsdbNextRowState){&pIter->imemRow, true, false, false, &pIter->imemState, getNextRowFromMem, NULL};
×
2264

UNCOV
2265
  if (pMem) {
×
UNCOV
2266
    pIter->memState.pMem = pMem;
×
UNCOV
2267
    pIter->memState.state = SMEMNEXTROW_ENTER;
×
UNCOV
2268
    pIter->input[0].stop = false;
×
UNCOV
2269
    pIter->input[0].next = true;
×
2270
  }
2271

UNCOV
2272
  if (pIMem) {
×
UNCOV
2273
    pIter->imemState.pMem = pIMem;
×
2274
    pIter->imemState.state = SMEMNEXTROW_ENTER;
×
2275
    pIter->input[1].stop = false;
×
2276
    pIter->input[1].next = true;
×
2277
  }
2278

UNCOV
2279
  pIter->pr = pr;
×
2280

UNCOV
2281
_exit:
×
UNCOV
2282
  if (code) {
×
UNCOV
2283
    tsdbError("tsdb/cache: %s failed at line %d since %s.", __func__, lino, tstrerror(code));
×
2284
  }
2285

UNCOV
2286
  TAOS_RETURN(code);
×
2287
}
2288

UNCOV
2289
static void memRowIterClose(MemNextRowIter *pIter) {
×
UNCOV
2290
  for (int i = 0; i < 2; ++i) {
×
UNCOV
2291
    if (pIter->input[i].nextRowClearFn) {
×
UNCOV
2292
      (void)pIter->input[i].nextRowClearFn(pIter->input[i].iter);
×
2293
    }
2294
  }
2295

UNCOV
2296
  if (pIter->pSkyline) {
×
UNCOV
2297
    taosArrayDestroy(pIter->pSkyline);
×
2298
  }
2299

UNCOV
2300
  if (pIter->pMemDelData) {
×
UNCOV
2301
    taosArrayDestroy(pIter->pMemDelData);
×
2302
  }
UNCOV
2303
}
×
2304

UNCOV
2305
static void freeTableInfoFunc(void *param) {
×
UNCOV
2306
  void **p = (void **)param;
×
UNCOV
2307
  taosMemoryFreeClear(*p);
×
UNCOV
2308
}
×
2309

UNCOV
2310
static STableLoadInfo *getTableLoadInfo(SCacheRowsReader *pReader, uint64_t uid) {
×
UNCOV
2311
  if (!pReader->pTableMap) {
×
UNCOV
2312
    pReader->pTableMap = tSimpleHashInit(pReader->numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
×
UNCOV
2313
    if (!pReader->pTableMap) {
×
UNCOV
2314
      return NULL;
×
2315
    }
2316

UNCOV
2317
    tSimpleHashSetFreeFp(pReader->pTableMap, freeTableInfoFunc);
×
2318
  }
2319

UNCOV
2320
  STableLoadInfo  *pInfo = NULL;
×
UNCOV
2321
  STableLoadInfo **ppInfo = tSimpleHashGet(pReader->pTableMap, &uid, sizeof(uid));
×
UNCOV
2322
  if (!ppInfo) {
×
UNCOV
2323
    pInfo = taosMemoryCalloc(1, sizeof(STableLoadInfo));
×
UNCOV
2324
    if (pInfo) {
×
UNCOV
2325
      if (tSimpleHashPut(pReader->pTableMap, &uid, sizeof(uint64_t), &pInfo, POINTER_BYTES)) {
×
UNCOV
2326
        return NULL;
×
2327
      }
2328
    }
2329

UNCOV
2330
    return pInfo;
×
2331
  }
2332

UNCOV
2333
  return *ppInfo;
×
2334
}
2335

UNCOV
2336
static TSDBROW *memRowIterGet(MemNextRowIter *pIter, bool isLast, int16_t *aCols, int nCols) {
×
UNCOV
2337
  int32_t code = 0, lino = 0;
×
2338

UNCOV
2339
  for (;;) {
×
2340
    for (int i = 0; i < 2; ++i) {
×
UNCOV
2341
      if (pIter->input[i].next && !pIter->input[i].stop) {
×
UNCOV
2342
        TAOS_CHECK_GOTO(pIter->input[i].nextRowFn(pIter->input[i].iter, &pIter->input[i].pRow,
×
2343
                                                  &pIter->input[i].ignoreEarlierTs, isLast, aCols, nCols),
2344
                        &lino, _exit);
2345

UNCOV
2346
        if (pIter->input[i].pRow == NULL) {
×
UNCOV
2347
          pIter->input[i].stop = true;
×
UNCOV
2348
          pIter->input[i].next = false;
×
2349
        }
2350
      }
2351
    }
2352

UNCOV
2353
    if (pIter->input[0].stop && pIter->input[1].stop) {
×
UNCOV
2354
      return NULL;
×
2355
    }
2356

UNCOV
2357
    TSDBROW *max[2] = {0};
×
UNCOV
2358
    int      iMax[2] = {-1, -1};
×
UNCOV
2359
    int      nMax = 0;
×
UNCOV
2360
    SRowKey  maxKey = {.ts = TSKEY_MIN};
×
2361

UNCOV
2362
    for (int i = 0; i < 2; ++i) {
×
UNCOV
2363
      if (!pIter->input[i].stop && pIter->input[i].pRow != NULL) {
×
UNCOV
2364
        STsdbRowKey tsdbRowKey = {0};
×
UNCOV
2365
        tsdbRowGetKey(pIter->input[i].pRow, &tsdbRowKey);
×
2366

2367
        // merging & deduplicating on client side
UNCOV
2368
        int c = tRowKeyCompare(&maxKey, &tsdbRowKey.key);
×
UNCOV
2369
        if (c <= 0) {
×
UNCOV
2370
          if (c < 0) {
×
UNCOV
2371
            nMax = 0;
×
UNCOV
2372
            maxKey = tsdbRowKey.key;
×
2373
          }
2374

UNCOV
2375
          iMax[nMax] = i;
×
UNCOV
2376
          max[nMax++] = pIter->input[i].pRow;
×
2377
        }
UNCOV
2378
        pIter->input[i].next = false;
×
2379
      }
2380
    }
2381

UNCOV
2382
    TSDBROW *merge[2] = {0};
×
UNCOV
2383
    int      iMerge[2] = {-1, -1};
×
UNCOV
2384
    int      nMerge = 0;
×
UNCOV
2385
    for (int i = 0; i < nMax; ++i) {
×
UNCOV
2386
      TSDBKEY maxKey1 = TSDBROW_KEY(max[i]);
×
2387

UNCOV
2388
      if (!pIter->pSkyline) {
×
UNCOV
2389
        pIter->pSkyline = taosArrayInit(32, sizeof(TSDBKEY));
×
UNCOV
2390
        TSDB_CHECK_NULL(pIter->pSkyline, code, lino, _exit, terrno);
×
2391

UNCOV
2392
        uint64_t        uid = pIter->idx.uid;
×
UNCOV
2393
        STableLoadInfo *pInfo = getTableLoadInfo(pIter->pr, uid);
×
UNCOV
2394
        TSDB_CHECK_NULL(pInfo, code, lino, _exit, TSDB_CODE_OUT_OF_MEMORY);
×
2395

UNCOV
2396
        if (pInfo->pTombData == NULL) {
×
UNCOV
2397
          pInfo->pTombData = taosArrayInit(4, sizeof(SDelData));
×
UNCOV
2398
          TSDB_CHECK_NULL(pInfo->pTombData, code, lino, _exit, terrno);
×
2399
        }
2400

UNCOV
2401
        if (!taosArrayAddAll(pInfo->pTombData, pIter->pMemDelData)) {
×
UNCOV
2402
          TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
2403
        }
2404

UNCOV
2405
        size_t delSize = TARRAY_SIZE(pInfo->pTombData);
×
UNCOV
2406
        if (delSize > 0) {
×
UNCOV
2407
          code = tsdbBuildDeleteSkyline(pInfo->pTombData, 0, (int32_t)(delSize - 1), pIter->pSkyline);
×
UNCOV
2408
          TAOS_CHECK_GOTO(code, &lino, _exit);
×
2409
        }
UNCOV
2410
        pIter->iSkyline = taosArrayGetSize(pIter->pSkyline) - 1;
×
2411
      }
2412

UNCOV
2413
      bool deleted = tsdbKeyDeleted(&maxKey1, pIter->pSkyline, &pIter->iSkyline);
×
UNCOV
2414
      if (!deleted) {
×
UNCOV
2415
        iMerge[nMerge] = iMax[i];
×
UNCOV
2416
        merge[nMerge++] = max[i];
×
2417
      }
2418

UNCOV
2419
      pIter->input[iMax[i]].next = deleted;
×
2420
    }
2421

UNCOV
2422
    if (nMerge > 0) {
×
UNCOV
2423
      pIter->input[iMerge[0]].next = true;
×
2424

UNCOV
2425
      return merge[0];
×
2426
    }
2427
  }
2428

UNCOV
2429
_exit:
×
2430
  if (code) {
×
2431
    tsdbError("tsdb/cache: %s failed at line %d since %s.", __func__, lino, tstrerror(code));
×
2432
  }
2433

UNCOV
2434
  return NULL;
×
2435
}
2436

UNCOV
2437
static int32_t cloneTSchema(STSchema *pSrc, STSchema **ppDst) {
×
UNCOV
2438
  int32_t len = sizeof(STSchema) + sizeof(STColumn) * pSrc->numOfCols;
×
UNCOV
2439
  *ppDst = taosMemoryMalloc(len);
×
UNCOV
2440
  if (NULL == *ppDst) {
×
UNCOV
2441
    TAOS_RETURN(terrno);
×
2442
  }
UNCOV
2443
  memcpy(*ppDst, pSrc, len);
×
2444

UNCOV
2445
  TAOS_RETURN(TSDB_CODE_SUCCESS);
×
2446
}
2447

UNCOV
2448
static int32_t updateTSchema(int32_t sversion, SCacheRowsReader *pReader, uint64_t uid) {
×
UNCOV
2449
  if (NULL == pReader->pCurrSchema && sversion == pReader->pSchema->version) {
×
UNCOV
2450
    TAOS_RETURN(cloneTSchema(pReader->pSchema, &pReader->pCurrSchema));
×
2451
  }
2452

UNCOV
2453
  if (NULL != pReader->pCurrSchema && sversion == pReader->pCurrSchema->version) {
×
UNCOV
2454
    TAOS_RETURN(TSDB_CODE_SUCCESS);
×
2455
  }
2456

UNCOV
2457
  taosMemoryFreeClear(pReader->pCurrSchema);
×
2458
  TAOS_RETURN(
×
2459
      metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->info.suid, uid, sversion, &pReader->pCurrSchema));
2460
}
2461

UNCOV
2462
static int32_t tsdbCacheGetBatchFromMem(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr,
×
2463
                                        SArray *keyArray) {
UNCOV
2464
  int32_t        code = 0;
×
UNCOV
2465
  int32_t        lino = 0;
×
UNCOV
2466
  STSchema      *pTSchema = pr->pSchema;
×
UNCOV
2467
  SLRUCache     *pCache = pTsdb->lruCache;
×
UNCOV
2468
  SArray        *pCidList = pr->pCidList;
×
UNCOV
2469
  int            numKeys = TARRAY_SIZE(pCidList);
×
UNCOV
2470
  MemNextRowIter iter = {0};
×
UNCOV
2471
  SSHashObj     *iColHash = NULL;
×
UNCOV
2472
  STSDBRowIter   rowIter = {0};
×
2473

2474
  // 1, get from mem, imem filtered with delete info
UNCOV
2475
  TAOS_CHECK_EXIT(memRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->info.suid, pr->pReadSnap, pr));
×
2476

UNCOV
2477
  TSDBROW *pRow = memRowIterGet(&iter, false, NULL, 0);
×
UNCOV
2478
  if (!pRow) {
×
UNCOV
2479
    goto _exit;
×
2480
  }
2481

UNCOV
2482
  int32_t sversion = TSDBROW_SVERSION(pRow);
×
UNCOV
2483
  if (sversion != -1) {
×
UNCOV
2484
    TAOS_CHECK_EXIT(updateTSchema(sversion, pr, uid));
×
2485

UNCOV
2486
    pTSchema = pr->pCurrSchema;
×
2487
  }
UNCOV
2488
  int32_t nCol = pTSchema->numOfCols;
×
2489

UNCOV
2490
  STsdbRowKey rowKey = {0};
×
UNCOV
2491
  tsdbRowGetKey(pRow, &rowKey);
×
2492

UNCOV
2493
  TAOS_CHECK_EXIT(tsdbRowIterOpen(&rowIter, pRow, pTSchema));
×
2494

UNCOV
2495
  int32_t iCol = 0, jCol = 0, jnCol = TARRAY_SIZE(pLastArray);
×
UNCOV
2496
  for (SColVal *pColVal = tsdbRowIterNext(&rowIter); pColVal && iCol < nCol && jCol < jnCol;) {
×
UNCOV
2497
    SLastCol *pTargetCol = &((SLastCol *)TARRAY_DATA(pLastArray))[jCol];
×
UNCOV
2498
    if (pColVal->cid < pTargetCol->colVal.cid) {
×
UNCOV
2499
      pColVal = tsdbRowIterNext(&rowIter), ++iCol;
×
2500

UNCOV
2501
      continue;
×
2502
    }
UNCOV
2503
    if (pColVal->cid > pTargetCol->colVal.cid) {
×
UNCOV
2504
      break;
×
2505
    }
2506

UNCOV
2507
    int32_t cmp_res = tRowKeyCompare(&pTargetCol->rowKey, &rowKey.key);
×
UNCOV
2508
    if (!IS_LAST_KEY(((SLastKey *)TARRAY_DATA(keyArray))[jCol])) {
×
UNCOV
2509
      if (cmp_res < 0 || (cmp_res == 0 && !COL_VAL_IS_NONE(pColVal))) {
×
UNCOV
2510
        SLastCol lastCol = {.rowKey = rowKey.key, .colVal = *pColVal, .dirty = 1, .cacheStatus = TSDB_LAST_CACHE_VALID};
×
UNCOV
2511
        TAOS_CHECK_EXIT(tsdbCacheReallocSLastCol(&lastCol, NULL));
×
2512

UNCOV
2513
        tsdbCacheFreeSLastColItem(pTargetCol);
×
UNCOV
2514
        taosArraySet(pLastArray, jCol, &lastCol);
×
2515
      }
2516
    } else {
UNCOV
2517
      if (COL_VAL_IS_VALUE(pColVal)) {
×
UNCOV
2518
        if (cmp_res <= 0) {
×
UNCOV
2519
          SLastCol lastCol = {
×
2520
              .rowKey = rowKey.key, .colVal = *pColVal, .dirty = 1, .cacheStatus = TSDB_LAST_CACHE_VALID};
UNCOV
2521
          TAOS_CHECK_EXIT(tsdbCacheReallocSLastCol(&lastCol, NULL));
×
2522

UNCOV
2523
          tsdbCacheFreeSLastColItem(pTargetCol);
×
UNCOV
2524
          taosArraySet(pLastArray, jCol, &lastCol);
×
2525
        }
2526
      } else {
UNCOV
2527
        if (!iColHash) {
×
UNCOV
2528
          iColHash = tSimpleHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT));
×
UNCOV
2529
          if (iColHash == NULL) {
×
UNCOV
2530
            TAOS_CHECK_EXIT(terrno);
×
2531
          }
2532
        }
2533

UNCOV
2534
        if (tSimpleHashPut(iColHash, &pColVal->cid, sizeof(pColVal->cid), &jCol, sizeof(jCol))) {
×
UNCOV
2535
          TAOS_CHECK_EXIT(terrno);
×
2536
        }
2537
      }
2538
    }
2539

UNCOV
2540
    ++jCol;
×
2541

UNCOV
2542
    if (jCol < jnCol && ((SLastCol *)TARRAY_DATA(pLastArray))[jCol].colVal.cid > pColVal->cid) {
×
UNCOV
2543
      pColVal = tsdbRowIterNext(&rowIter), ++iCol;
×
2544
    }
2545
  }
UNCOV
2546
  tsdbRowClose(&rowIter);
×
2547

UNCOV
2548
  if (iColHash && tSimpleHashGetSize(iColHash) > 0) {
×
UNCOV
2549
    pRow = memRowIterGet(&iter, false, NULL, 0);
×
UNCOV
2550
    while (pRow) {
×
UNCOV
2551
      if (tSimpleHashGetSize(iColHash) == 0) {
×
UNCOV
2552
        break;
×
2553
      }
2554

UNCOV
2555
      sversion = TSDBROW_SVERSION(pRow);
×
UNCOV
2556
      if (sversion != -1) {
×
UNCOV
2557
        TAOS_CHECK_EXIT(updateTSchema(sversion, pr, uid));
×
2558

UNCOV
2559
        pTSchema = pr->pCurrSchema;
×
2560
      }
UNCOV
2561
      nCol = pTSchema->numOfCols;
×
2562

UNCOV
2563
      STsdbRowKey tsdbRowKey = {0};
×
UNCOV
2564
      tsdbRowGetKey(pRow, &tsdbRowKey);
×
2565

UNCOV
2566
      TAOS_CHECK_EXIT(tsdbRowIterOpen(&rowIter, pRow, pTSchema));
×
2567

UNCOV
2568
      iCol = 0;
×
UNCOV
2569
      for (SColVal *pColVal = tsdbRowIterNext(&rowIter); pColVal && iCol < nCol;
×
UNCOV
2570
           pColVal = tsdbRowIterNext(&rowIter), iCol++) {
×
UNCOV
2571
        int32_t *pjCol = tSimpleHashGet(iColHash, &pColVal->cid, sizeof(pColVal->cid));
×
UNCOV
2572
        if (pjCol && COL_VAL_IS_VALUE(pColVal)) {
×
UNCOV
2573
          SLastCol *pTargetCol = &((SLastCol *)TARRAY_DATA(pLastArray))[*pjCol];
×
2574

UNCOV
2575
          int32_t cmp_res = tRowKeyCompare(&pTargetCol->rowKey, &tsdbRowKey.key);
×
UNCOV
2576
          if (cmp_res <= 0) {
×
UNCOV
2577
            SLastCol lastCol = {
×
2578
                .rowKey = tsdbRowKey.key, .colVal = *pColVal, .dirty = 1, .cacheStatus = TSDB_LAST_CACHE_VALID};
UNCOV
2579
            TAOS_CHECK_EXIT(tsdbCacheReallocSLastCol(&lastCol, NULL));
×
2580

UNCOV
2581
            tsdbCacheFreeSLastColItem(pTargetCol);
×
UNCOV
2582
            taosArraySet(pLastArray, *pjCol, &lastCol);
×
2583
          }
2584

UNCOV
2585
          TAOS_CHECK_EXIT(tSimpleHashRemove(iColHash, &pColVal->cid, sizeof(pColVal->cid)));
×
2586
        }
2587
      }
UNCOV
2588
      tsdbRowClose(&rowIter);
×
2589

UNCOV
2590
      pRow = memRowIterGet(&iter, false, NULL, 0);
×
2591
    }
2592
  }
2593

UNCOV
2594
_exit:
×
UNCOV
2595
  if (code) {
×
UNCOV
2596
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(pTsdb->pVnode), __func__, __FILE__, lino, tstrerror(code));
×
2597

2598
    tsdbRowClose(&rowIter);
×
2599
  }
2600

UNCOV
2601
  tSimpleHashCleanup(iColHash);
×
2602

UNCOV
2603
  memRowIterClose(&iter);
×
2604

UNCOV
2605
  TAOS_RETURN(code);
×
2606
}
2607

UNCOV
2608
int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, int8_t ltype) {
×
UNCOV
2609
  int32_t code = 0;
×
UNCOV
2610
  int32_t lino = 0;
×
2611

UNCOV
2612
  SArray *keyArray = taosArrayInit(16, sizeof(SLastKey));
×
UNCOV
2613
  if (!keyArray) {
×
UNCOV
2614
    TAOS_CHECK_EXIT(terrno);
×
2615
  }
2616

UNCOV
2617
  TAOS_CHECK_EXIT(tsdbCacheGetBatchFromLru(pTsdb, uid, pLastArray, pr, ltype, keyArray));
×
2618

UNCOV
2619
  if (tsUpdateCacheBatch) {
×
UNCOV
2620
    TAOS_CHECK_EXIT(tsdbCacheGetBatchFromMem(pTsdb, uid, pLastArray, pr, keyArray));
×
2621
  }
2622

UNCOV
2623
_exit:
×
UNCOV
2624
  if (code) {
×
UNCOV
2625
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(pTsdb->pVnode), __func__, __FILE__, lino, tstrerror(code));
×
2626
  }
2627

UNCOV
2628
  if (keyArray) {
×
UNCOV
2629
    taosArrayDestroy(keyArray);
×
2630
  }
2631

UNCOV
2632
  TAOS_RETURN(code);
×
2633
}
2634

UNCOV
2635
int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey) {
×
UNCOV
2636
  int32_t   code = 0, lino = 0;
×
UNCOV
2637
  STSchema *pTSchema = NULL;
×
UNCOV
2638
  int       sver = -1;
×
UNCOV
2639
  int       numKeys = 0;
×
UNCOV
2640
  SArray   *remainCols = NULL;
×
2641

UNCOV
2642
  TAOS_CHECK_RETURN(metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pTSchema));
×
2643

UNCOV
2644
  int numCols = pTSchema->numOfCols;
×
2645

UNCOV
2646
  (void)taosThreadMutexLock(&pTsdb->lruMutex);
×
2647

UNCOV
2648
  for (int i = 0; i < numCols; ++i) {
×
UNCOV
2649
    int16_t cid = pTSchema->columns[i].colId;
×
UNCOV
2650
    for (int8_t lflag = LFLAG_LAST_ROW; lflag <= LFLAG_LAST; ++lflag) {
×
UNCOV
2651
      SLastKey   lastKey = {.lflag = lflag, .uid = uid, .cid = cid};
×
UNCOV
2652
      LRUHandle *h = taosLRUCacheLookup(pTsdb->lruCache, &lastKey, ROCKS_KEY_LEN);
×
UNCOV
2653
      if (h) {
×
UNCOV
2654
        SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pTsdb->lruCache, h);
×
UNCOV
2655
        if (pLastCol->rowKey.ts <= eKey && pLastCol->rowKey.ts >= sKey) {
×
UNCOV
2656
          SLastCol noneCol = {.rowKey.ts = TSKEY_MIN,
×
UNCOV
2657
                              .colVal = COL_VAL_NONE(cid, pTSchema->columns[i].type),
×
2658
                              .dirty = 1,
2659
                              .cacheStatus = TSDB_LAST_CACHE_NO_CACHE};
UNCOV
2660
          code = tsdbCachePutToLRU(pTsdb, &lastKey, &noneCol, 1);
×
2661
        }
UNCOV
2662
        tsdbLRUCacheRelease(pTsdb->lruCache, h, false);
×
UNCOV
2663
        TAOS_CHECK_EXIT(code);
×
2664
      } else {
UNCOV
2665
        if (!remainCols) {
×
UNCOV
2666
          remainCols = taosArrayInit(numCols * 2, sizeof(SIdxKey));
×
2667
        }
UNCOV
2668
        if (!taosArrayPush(remainCols, &(SIdxKey){i, lastKey})) {
×
UNCOV
2669
          TAOS_CHECK_EXIT(terrno);
×
2670
        }
2671
      }
2672
    }
2673
  }
2674

UNCOV
2675
  if (remainCols) {
×
UNCOV
2676
    numKeys = TARRAY_SIZE(remainCols);
×
2677
  }
2678

UNCOV
2679
  char  **keys_list = taosMemoryCalloc(numKeys, sizeof(char *));
×
UNCOV
2680
  size_t *keys_list_sizes = taosMemoryCalloc(numKeys, sizeof(size_t));
×
UNCOV
2681
  char  **values_list = NULL;
×
UNCOV
2682
  size_t *values_list_sizes = NULL;
×
2683

UNCOV
2684
  if (!keys_list || !keys_list_sizes) {
×
UNCOV
2685
    code = terrno;
×
UNCOV
2686
    goto _exit;
×
2687
  }
2688
  const size_t klen = ROCKS_KEY_LEN;
×
2689

UNCOV
2690
  for (int i = 0; i < numKeys; ++i) {
×
UNCOV
2691
    char *key = taosMemoryCalloc(1, sizeof(SLastKey));
×
UNCOV
2692
    if (!key) {
×
UNCOV
2693
      code = terrno;
×
UNCOV
2694
      goto _exit;
×
2695
    }
2696
    SIdxKey *idxKey = taosArrayGet(remainCols, i);
×
2697

UNCOV
2698
    ((SLastKey *)key)[0] = idxKey->key;
×
2699

UNCOV
2700
    keys_list[i] = key;
×
UNCOV
2701
    keys_list_sizes[i] = klen;
×
2702
  }
2703

UNCOV
2704
  rocksMayWrite(pTsdb, true);  // flush writebatch cache
×
2705

UNCOV
2706
  TAOS_CHECK_GOTO(tsdbCacheGetValuesFromRocks(pTsdb, numKeys, (const char *const *)keys_list, keys_list_sizes,
×
2707
                                              &values_list, &values_list_sizes),
2708
                  NULL, _exit);
2709

UNCOV
2710
  rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
×
UNCOV
2711
  for (int i = 0; i < numKeys; ++i) {
×
UNCOV
2712
    SLastCol *pLastCol = NULL;
×
UNCOV
2713
    if (values_list[i] != NULL) {
×
UNCOV
2714
      code = tsdbCacheDeserialize(values_list[i], values_list_sizes[i], &pLastCol);
×
UNCOV
2715
      if (code != TSDB_CODE_SUCCESS) {
×
2716
        tsdbError("vgId:%d, %s deserialize failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
2717
                  tstrerror(code));
2718
        goto _exit;
×
2719
      }
2720
    }
UNCOV
2721
    SIdxKey  *idxKey = taosArrayGet(remainCols, i);
×
UNCOV
2722
    SLastKey *pLastKey = &idxKey->key;
×
UNCOV
2723
    if (NULL != pLastCol && (pLastCol->rowKey.ts <= eKey && pLastCol->rowKey.ts >= sKey)) {
×
UNCOV
2724
      SLastCol noCacheCol = {.rowKey.ts = TSKEY_MIN,
×
UNCOV
2725
                             .colVal = COL_VAL_NONE(pLastKey->cid, pTSchema->columns[idxKey->idx].type),
×
2726
                             .dirty = 0,
2727
                             .cacheStatus = TSDB_LAST_CACHE_NO_CACHE};
2728

UNCOV
2729
      if ((code = tsdbCachePutToRocksdb(pTsdb, pLastKey, &noCacheCol)) != TSDB_CODE_SUCCESS) {
×
UNCOV
2730
        taosMemoryFreeClear(pLastCol);
×
2731
        tsdbError("tsdb/cache/del: vgId:%d, put to rocks failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code));
×
2732
        goto _exit;
×
2733
      }
2734
      if ((code = tsdbCachePutToLRU(pTsdb, pLastKey, &noCacheCol, 0)) != TSDB_CODE_SUCCESS) {
×
UNCOV
2735
        taosMemoryFreeClear(pLastCol);
×
2736
        tsdbError("tsdb/cache/del: vgId:%d, put to lru failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code));
×
2737
        goto _exit;
×
2738
      }
2739
    }
2740

UNCOV
2741
    if (pLastCol == NULL) {
×
UNCOV
2742
      tsdbDebug("tsdb/cache/del: vgId:%d, no cache found for uid:%d ,cid:%" PRId64 ", lflag:%d.", TD_VID(pTsdb->pVnode),
×
2743
                pLastKey->cid, pLastKey->uid, pLastKey->lflag);
2744
    }
2745

UNCOV
2746
    taosMemoryFreeClear(pLastCol);
×
2747
  }
2748

UNCOV
2749
  rocksMayWrite(pTsdb, false);
×
2750

UNCOV
2751
_exit:
×
UNCOV
2752
  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
×
2753

UNCOV
2754
  for (int i = 0; i < numKeys; ++i) {
×
UNCOV
2755
    taosMemoryFree(keys_list[i]);
×
2756
  }
UNCOV
2757
  taosMemoryFree(keys_list);
×
UNCOV
2758
  taosMemoryFree(keys_list_sizes);
×
UNCOV
2759
  if (values_list) {
×
UNCOV
2760
    for (int i = 0; i < numKeys; ++i) {
×
UNCOV
2761
      rocksdb_free(values_list[i]);
×
2762
    }
UNCOV
2763
    taosMemoryFree(values_list);
×
2764
  }
UNCOV
2765
  taosMemoryFree(values_list_sizes);
×
UNCOV
2766
  taosArrayDestroy(remainCols);
×
UNCOV
2767
  taosMemoryFree(pTSchema);
×
2768

UNCOV
2769
  TAOS_RETURN(code);
×
2770
}
2771

2772
int32_t tsdbOpenCache(STsdb *pTsdb) {
18✔
2773
  int32_t code = 0, lino = 0;
18✔
2774
  size_t  cfgCapacity = (size_t)pTsdb->pVnode->config.cacheLastSize * 1024 * 1024;
18✔
2775

2776
  SLRUCache *pCache = taosLRUCacheInit(cfgCapacity, 0, .5);
18✔
2777
  if (pCache == NULL) {
18!
UNCOV
2778
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err);
×
2779
  }
2780

2781
  TAOS_CHECK_GOTO(tsdbOpenBCache(pTsdb), &lino, _err);
18!
2782

2783
  TAOS_CHECK_GOTO(tsdbOpenPgCache(pTsdb), &lino, _err);
18!
2784

2785
  TAOS_CHECK_GOTO(tsdbOpenRocksCache(pTsdb), &lino, _err);
18!
2786

2787
  taosLRUCacheSetStrictCapacity(pCache, false);
18✔
2788

2789
  (void)taosThreadMutexInit(&pTsdb->lruMutex, NULL);
18✔
2790

2791
_err:
18✔
2792
  if (code) {
18!
UNCOV
2793
    tsdbError("tsdb/cache: vgId:%d, open failed at line %d since %s.", TD_VID(pTsdb->pVnode), lino, tstrerror(code));
×
2794
  }
2795

2796
  pTsdb->lruCache = pCache;
18✔
2797

2798
  TAOS_RETURN(code);
18✔
2799
}
2800

2801
void tsdbCloseCache(STsdb *pTsdb) {
18✔
2802
  SLRUCache *pCache = pTsdb->lruCache;
18✔
2803
  if (pCache) {
18!
2804
    taosLRUCacheEraseUnrefEntries(pCache);
18✔
2805

2806
    taosLRUCacheCleanup(pCache);
18✔
2807

2808
    (void)taosThreadMutexDestroy(&pTsdb->lruMutex);
18✔
2809
  }
2810

2811
  tsdbCloseBCache(pTsdb);
18✔
2812
  tsdbClosePgCache(pTsdb);
18✔
2813
  tsdbCloseRocksCache(pTsdb);
18✔
2814
}
18✔
2815

UNCOV
2816
static void getTableCacheKey(tb_uid_t uid, int cacheType, char *key, int *len) {
×
UNCOV
2817
  if (cacheType == 0) {  // last_row
×
2818
    *(uint64_t *)key = (uint64_t)uid;
×
2819
  } else {  // last
2820
    *(uint64_t *)key = ((uint64_t)uid) | 0x8000000000000000;
×
2821
  }
2822

UNCOV
2823
  *len = sizeof(uint64_t);
×
UNCOV
2824
}
×
2825

2826
static tb_uid_t getTableSuidByUid(tb_uid_t uid, STsdb *pTsdb) {
×
UNCOV
2827
  tb_uid_t suid = 0;
×
2828

2829
  SMetaReader mr = {0};
×
UNCOV
2830
  metaReaderDoInit(&mr, pTsdb->pVnode->pMeta, META_READER_LOCK);
×
2831
  if (metaReaderGetTableEntryByUidCache(&mr, uid) < 0) {
×
2832
    metaReaderClear(&mr);  // table not esist
×
2833
    return 0;
×
2834
  }
2835

UNCOV
2836
  if (mr.me.type == TSDB_CHILD_TABLE) {
×
UNCOV
2837
    suid = mr.me.ctbEntry.suid;
×
2838
  } else if (mr.me.type == TSDB_NORMAL_TABLE) {
×
2839
    suid = 0;
×
2840
  } else {
2841
    suid = 0;
×
2842
  }
2843

UNCOV
2844
  metaReaderClear(&mr);
×
2845

2846
  return suid;
×
2847
}
2848

UNCOV
2849
static int32_t getTableDelDataFromDelIdx(SDelFReader *pDelReader, SDelIdx *pDelIdx, SArray *aDelData) {
×
UNCOV
2850
  int32_t code = 0;
×
2851

2852
  if (pDelIdx) {
×
UNCOV
2853
    code = tsdbReadDelDatav1(pDelReader, pDelIdx, aDelData, INT64_MAX);
×
2854
  }
2855

UNCOV
2856
  TAOS_RETURN(code);
×
2857
}
2858

UNCOV
2859
static int32_t getTableDelDataFromTbData(STbData *pTbData, SArray *aDelData) {
×
UNCOV
2860
  int32_t   code = 0;
×
2861
  SDelData *pDelData = pTbData ? pTbData->pHead : NULL;
×
2862

2863
  for (; pDelData; pDelData = pDelData->pNext) {
×
UNCOV
2864
    if (!taosArrayPush(aDelData, pDelData)) {
×
2865
      TAOS_RETURN(terrno);
×
2866
    }
2867
  }
2868

UNCOV
2869
  TAOS_RETURN(code);
×
2870
}
2871

UNCOV
2872
static uint64_t *getUidList(SCacheRowsReader *pReader) {
×
UNCOV
2873
  if (!pReader->uidList) {
×
UNCOV
2874
    int32_t numOfTables = pReader->numOfTables;
×
2875

UNCOV
2876
    pReader->uidList = taosMemoryMalloc(numOfTables * sizeof(uint64_t));
×
UNCOV
2877
    if (!pReader->uidList) {
×
UNCOV
2878
      return NULL;
×
2879
    }
2880

UNCOV
2881
    for (int32_t i = 0; i < numOfTables; ++i) {
×
UNCOV
2882
      uint64_t uid = pReader->pTableList[i].uid;
×
UNCOV
2883
      pReader->uidList[i] = uid;
×
2884
    }
2885

UNCOV
2886
    taosSort(pReader->uidList, numOfTables, sizeof(uint64_t), uidComparFunc);
×
2887
  }
2888

UNCOV
2889
  return pReader->uidList;
×
2890
}
2891

UNCOV
2892
static int32_t loadTombFromBlk(const TTombBlkArray *pTombBlkArray, SCacheRowsReader *pReader, void *pFileReader,
×
2893
                               bool isFile) {
UNCOV
2894
  int32_t   code = 0;
×
UNCOV
2895
  int32_t   numOfTables = pReader->numOfTables;
×
UNCOV
2896
  int64_t   suid = pReader->info.suid;
×
UNCOV
2897
  uint64_t *uidList = getUidList(pReader);
×
2898

UNCOV
2899
  if (!uidList) {
×
UNCOV
2900
    TAOS_RETURN(terrno);
×
2901
  }
2902

UNCOV
2903
  for (int i = 0, j = 0; i < pTombBlkArray->size && j < numOfTables; ++i) {
×
UNCOV
2904
    STombBlk *pTombBlk = &pTombBlkArray->data[i];
×
UNCOV
2905
    if (pTombBlk->maxTbid.suid < suid || (pTombBlk->maxTbid.suid == suid && pTombBlk->maxTbid.uid < uidList[0])) {
×
UNCOV
2906
      continue;
×
2907
    }
2908

UNCOV
2909
    if (pTombBlk->minTbid.suid > suid ||
×
UNCOV
2910
        (pTombBlk->minTbid.suid == suid && pTombBlk->minTbid.uid > uidList[numOfTables - 1])) {
×
2911
      break;
2912
    }
2913

UNCOV
2914
    STombBlock block = {0};
×
UNCOV
2915
    code = isFile ? tsdbDataFileReadTombBlock(pFileReader, &pTombBlkArray->data[i], &block)
×
UNCOV
2916
                  : tsdbSttFileReadTombBlock(pFileReader, &pTombBlkArray->data[i], &block);
×
UNCOV
2917
    if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
2918
      TAOS_RETURN(code);
×
2919
    }
2920

UNCOV
2921
    uint64_t        uid = uidList[j];
×
UNCOV
2922
    STableLoadInfo *pInfo = getTableLoadInfo(pReader, uid);
×
UNCOV
2923
    if (!pInfo) {
×
UNCOV
2924
      tTombBlockDestroy(&block);
×
UNCOV
2925
      TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
×
2926
    }
2927

UNCOV
2928
    if (pInfo->pTombData == NULL) {
×
UNCOV
2929
      pInfo->pTombData = taosArrayInit(4, sizeof(SDelData));
×
2930
    }
2931

UNCOV
2932
    STombRecord record = {0};
×
UNCOV
2933
    bool        finished = false;
×
UNCOV
2934
    for (int32_t k = 0; k < TOMB_BLOCK_SIZE(&block); ++k) {
×
UNCOV
2935
      code = tTombBlockGet(&block, k, &record);
×
UNCOV
2936
      if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
2937
        finished = true;
×
UNCOV
2938
        break;
×
2939
      }
2940

UNCOV
2941
      if (record.suid < suid) {
×
UNCOV
2942
        continue;
×
2943
      }
2944
      if (record.suid > suid) {
×
UNCOV
2945
        finished = true;
×
UNCOV
2946
        break;
×
2947
      }
2948

UNCOV
2949
      bool newTable = false;
×
UNCOV
2950
      if (uid < record.uid) {
×
UNCOV
2951
        while (j < numOfTables && uidList[j] < record.uid) {
×
UNCOV
2952
          ++j;
×
UNCOV
2953
          newTable = true;
×
2954
        }
2955

UNCOV
2956
        if (j >= numOfTables) {
×
UNCOV
2957
          finished = true;
×
UNCOV
2958
          break;
×
2959
        }
2960

UNCOV
2961
        uid = uidList[j];
×
2962
      }
2963

UNCOV
2964
      if (record.uid < uid) {
×
UNCOV
2965
        continue;
×
2966
      }
2967

UNCOV
2968
      if (newTable) {
×
UNCOV
2969
        pInfo = getTableLoadInfo(pReader, uid);
×
UNCOV
2970
        if (!pInfo) {
×
UNCOV
2971
          code = TSDB_CODE_OUT_OF_MEMORY;
×
UNCOV
2972
          finished = true;
×
2973
          break;
×
2974
        }
2975
        if (pInfo->pTombData == NULL) {
×
UNCOV
2976
          pInfo->pTombData = taosArrayInit(4, sizeof(SDelData));
×
UNCOV
2977
          if (!pInfo->pTombData) {
×
UNCOV
2978
            code = terrno;
×
UNCOV
2979
            finished = true;
×
2980
            break;
×
2981
          }
2982
        }
2983
      }
2984

UNCOV
2985
      if (record.version <= pReader->info.verRange.maxVer) {
×
2986
        /*tsdbError("tomb xx load/cache: vgId:%d fid:%d record %" PRId64 "~%" PRId64 "~%" PRId64 " tomb records",
2987
          TD_VID(pReader->pTsdb->pVnode), pReader->pCurFileSet->fid, record.skey, record.ekey, uid);*/
2988

UNCOV
2989
        SDelData delData = {.version = record.version, .sKey = record.skey, .eKey = record.ekey};
×
UNCOV
2990
        if (!taosArrayPush(pInfo->pTombData, &delData)) {
×
UNCOV
2991
          TAOS_RETURN(terrno);
×
2992
        }
2993
      }
2994
    }
2995

UNCOV
2996
    tTombBlockDestroy(&block);
×
2997

UNCOV
2998
    if (finished) {
×
UNCOV
2999
      TAOS_RETURN(code);
×
3000
    }
3001
  }
3002

UNCOV
3003
  TAOS_RETURN(TSDB_CODE_SUCCESS);
×
3004
}
3005

UNCOV
3006
static int32_t loadDataTomb(SCacheRowsReader *pReader, SDataFileReader *pFileReader) {
×
UNCOV
3007
  const TTombBlkArray *pBlkArray = NULL;
×
3008

UNCOV
3009
  TAOS_CHECK_RETURN(tsdbDataFileReadTombBlk(pFileReader, &pBlkArray));
×
3010

UNCOV
3011
  TAOS_RETURN(loadTombFromBlk(pBlkArray, pReader, pFileReader, true));
×
3012
}
3013

UNCOV
3014
static int32_t loadSttTomb(STsdbReader *pTsdbReader, SSttFileReader *pSttFileReader, SSttBlockLoadInfo *pLoadInfo) {
×
UNCOV
3015
  SCacheRowsReader    *pReader = (SCacheRowsReader *)pTsdbReader;
×
UNCOV
3016
  const TTombBlkArray *pBlkArray = NULL;
×
3017

UNCOV
3018
  TAOS_CHECK_RETURN(tsdbSttFileReadTombBlk(pSttFileReader, &pBlkArray));
×
3019

UNCOV
3020
  TAOS_RETURN(loadTombFromBlk(pBlkArray, pReader, pSttFileReader, false));
×
3021
}
3022

3023
typedef struct {
3024
  SMergeTree  mergeTree;
3025
  SMergeTree *pMergeTree;
3026
} SFSLastIter;
3027

UNCOV
3028
static int32_t lastIterOpen(SFSLastIter *iter, STFileSet *pFileSet, STsdb *pTsdb, STSchema *pTSchema, tb_uid_t suid,
×
3029
                            tb_uid_t uid, SCacheRowsReader *pr, int64_t lastTs, int16_t *aCols, int nCols) {
UNCOV
3030
  int32_t code = 0;
×
UNCOV
3031
  destroySttBlockReader(pr->pLDataIterArray, NULL);
×
UNCOV
3032
  pr->pLDataIterArray = taosArrayInit(4, POINTER_BYTES);
×
UNCOV
3033
  if (pr->pLDataIterArray == NULL) return terrno;
×
3034

UNCOV
3035
  SMergeTreeConf conf = {
×
3036
      .uid = uid,
3037
      .suid = suid,
3038
      .pTsdb = pTsdb,
3039
      .timewindow = (STimeWindow){.skey = lastTs, .ekey = TSKEY_MAX},
3040
      .verRange = (SVersionRange){.minVer = 0, .maxVer = INT64_MAX},
3041
      .strictTimeRange = false,
3042
      .pSchema = pTSchema,
3043
      .pCurrentFileset = pFileSet,
3044
      .backward = 1,
UNCOV
3045
      .pSttFileBlockIterArray = pr->pLDataIterArray,
×
3046
      .pCols = aCols,
3047
      .numOfCols = nCols,
3048
      .loadTombFn = loadSttTomb,
3049
      .pReader = pr,
UNCOV
3050
      .idstr = pr->idstr,
×
UNCOV
3051
      .pCurRowKey = &pr->rowKey,
×
3052
  };
3053

UNCOV
3054
  TAOS_CHECK_RETURN(tMergeTreeOpen2(&iter->mergeTree, &conf, NULL));
×
3055

UNCOV
3056
  iter->pMergeTree = &iter->mergeTree;
×
3057

UNCOV
3058
  TAOS_RETURN(code);
×
3059
}
3060

UNCOV
3061
static int32_t lastIterClose(SFSLastIter **iter) {
×
UNCOV
3062
  int32_t code = 0;
×
3063

UNCOV
3064
  if ((*iter)->pMergeTree) {
×
UNCOV
3065
    tMergeTreeClose((*iter)->pMergeTree);
×
UNCOV
3066
    (*iter)->pMergeTree = NULL;
×
3067
  }
3068

UNCOV
3069
  *iter = NULL;
×
3070

UNCOV
3071
  TAOS_RETURN(code);
×
3072
}
3073

UNCOV
3074
static int32_t lastIterNext(SFSLastIter *iter, TSDBROW **ppRow) {
×
UNCOV
3075
  bool hasVal = false;
×
UNCOV
3076
  *ppRow = NULL;
×
3077

UNCOV
3078
  int32_t code = tMergeTreeNext(iter->pMergeTree, &hasVal);
×
UNCOV
3079
  if (code != 0) {
×
UNCOV
3080
    return code;
×
3081
  }
3082

UNCOV
3083
  if (!hasVal) {
×
UNCOV
3084
    *ppRow = NULL;
×
UNCOV
3085
    TAOS_RETURN(code);
×
3086
  }
3087

UNCOV
3088
  *ppRow = tMergeTreeGetRow(iter->pMergeTree);
×
UNCOV
3089
  TAOS_RETURN(code);
×
3090
}
3091

3092
typedef enum SFSNEXTROWSTATES {
3093
  SFSNEXTROW_FS,
3094
  SFSNEXTROW_FILESET,
3095
  SFSNEXTROW_INDEXLIST,
3096
  SFSNEXTROW_BRINBLOCK,
3097
  SFSNEXTROW_BRINRECORD,
3098
  SFSNEXTROW_BLOCKDATA,
3099
  SFSNEXTROW_BLOCKROW,
3100
  SFSNEXTROW_NEXTSTTROW
3101
} SFSNEXTROWSTATES;
3102

3103
struct CacheNextRowIter;
3104

3105
typedef struct SFSNextRowIter {
3106
  SFSNEXTROWSTATES         state;         // [input]
3107
  SBlockIdx               *pBlockIdxExp;  // [input]
3108
  STSchema                *pTSchema;      // [input]
3109
  tb_uid_t                 suid;
3110
  tb_uid_t                 uid;
3111
  int32_t                  iFileSet;
3112
  STFileSet               *pFileSet;
3113
  TFileSetArray           *aDFileSet;
3114
  SArray                  *pIndexList;
3115
  int32_t                  iBrinIndex;
3116
  SBrinBlock               brinBlock;
3117
  SBrinBlock              *pBrinBlock;
3118
  int32_t                  iBrinRecord;
3119
  SBrinRecord              brinRecord;
3120
  SBlockData               blockData;
3121
  SBlockData              *pBlockData;
3122
  int32_t                  nRow;
3123
  int32_t                  iRow;
3124
  TSDBROW                  row;
3125
  int64_t                  lastTs;
3126
  SFSLastIter              lastIter;
3127
  SFSLastIter             *pLastIter;
3128
  int8_t                   lastEmpty;
3129
  TSDBROW                 *pLastRow;
3130
  SRow                    *pTSRow;
3131
  SRowMerger               rowMerger;
3132
  SCacheRowsReader        *pr;
3133
  struct CacheNextRowIter *pRowIter;
3134
} SFSNextRowIter;
3135

3136
static void clearLastFileSet(SFSNextRowIter *state);
3137

UNCOV
3138
static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast, int16_t *aCols,
×
3139
                                int nCols) {
UNCOV
3140
  int32_t         code = 0, lino = 0;
×
UNCOV
3141
  SFSNextRowIter *state = (SFSNextRowIter *)iter;
×
UNCOV
3142
  STsdb          *pTsdb = state->pr->pTsdb;
×
3143

UNCOV
3144
  if (SFSNEXTROW_FS == state->state) {
×
UNCOV
3145
    state->iFileSet = TARRAY2_SIZE(state->aDFileSet);
×
3146

UNCOV
3147
    state->state = SFSNEXTROW_FILESET;
×
3148
  }
3149

UNCOV
3150
  if (SFSNEXTROW_FILESET == state->state) {
×
UNCOV
3151
  _next_fileset:
×
UNCOV
3152
    clearLastFileSet(state);
×
3153

UNCOV
3154
    if (--state->iFileSet < 0) {
×
UNCOV
3155
      *ppRow = NULL;
×
3156

UNCOV
3157
      TAOS_RETURN(code);
×
3158
    } else {
UNCOV
3159
      state->pFileSet = TARRAY2_GET(state->aDFileSet, state->iFileSet);
×
3160
    }
3161

UNCOV
3162
    STFileObj **pFileObj = state->pFileSet->farr;
×
UNCOV
3163
    if (pFileObj[0] != NULL || pFileObj[3] != NULL) {
×
UNCOV
3164
      if (state->pFileSet != state->pr->pCurFileSet) {
×
UNCOV
3165
        SDataFileReaderConfig conf = {.tsdb = pTsdb, .szPage = pTsdb->pVnode->config.tsdbPageSize};
×
UNCOV
3166
        const char           *filesName[4] = {0};
×
UNCOV
3167
        if (pFileObj[0] != NULL) {
×
UNCOV
3168
          conf.files[0].file = *pFileObj[0]->f;
×
UNCOV
3169
          conf.files[0].exist = true;
×
UNCOV
3170
          filesName[0] = pFileObj[0]->fname;
×
3171

UNCOV
3172
          conf.files[1].file = *pFileObj[1]->f;
×
UNCOV
3173
          conf.files[1].exist = true;
×
UNCOV
3174
          filesName[1] = pFileObj[1]->fname;
×
3175

UNCOV
3176
          conf.files[2].file = *pFileObj[2]->f;
×
UNCOV
3177
          conf.files[2].exist = true;
×
UNCOV
3178
          filesName[2] = pFileObj[2]->fname;
×
3179
        }
3180

UNCOV
3181
        if (pFileObj[3] != NULL) {
×
UNCOV
3182
          conf.files[3].exist = true;
×
UNCOV
3183
          conf.files[3].file = *pFileObj[3]->f;
×
UNCOV
3184
          filesName[3] = pFileObj[3]->fname;
×
3185
        }
3186

UNCOV
3187
        TAOS_CHECK_GOTO(tsdbDataFileReaderOpen(filesName, &conf, &state->pr->pFileReader), &lino, _err);
×
3188

UNCOV
3189
        state->pr->pCurFileSet = state->pFileSet;
×
3190

UNCOV
3191
        code = loadDataTomb(state->pr, state->pr->pFileReader);
×
UNCOV
3192
        if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
3193
          tsdbError("vgId:%d, %s load tomb failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
3194
                    tstrerror(code));
3195
          TAOS_CHECK_GOTO(code, &lino, _err);
×
3196
        }
3197

UNCOV
3198
        TAOS_CHECK_GOTO(tsdbDataFileReadBrinBlk(state->pr->pFileReader, &state->pr->pBlkArray), &lino, _err);
×
3199
      }
3200

UNCOV
3201
      if (!state->pIndexList) {
×
UNCOV
3202
        state->pIndexList = taosArrayInit(1, sizeof(SBrinBlk));
×
UNCOV
3203
        if (!state->pIndexList) {
×
UNCOV
3204
          TAOS_CHECK_GOTO(terrno, &lino, _err);
×
3205
        }
3206
      } else {
UNCOV
3207
        taosArrayClear(state->pIndexList);
×
3208
      }
3209

UNCOV
3210
      const TBrinBlkArray *pBlkArray = state->pr->pBlkArray;
×
3211

UNCOV
3212
      for (int i = TARRAY2_SIZE(pBlkArray) - 1; i >= 0; --i) {
×
UNCOV
3213
        SBrinBlk *pBrinBlk = &pBlkArray->data[i];
×
UNCOV
3214
        if (state->suid >= pBrinBlk->minTbid.suid && state->suid <= pBrinBlk->maxTbid.suid) {
×
UNCOV
3215
          if (state->uid >= pBrinBlk->minTbid.uid && state->uid <= pBrinBlk->maxTbid.uid) {
×
UNCOV
3216
            if (!taosArrayPush(state->pIndexList, pBrinBlk)) {
×
UNCOV
3217
              TAOS_CHECK_GOTO(terrno, &lino, _err);
×
3218
            }
3219
          }
UNCOV
3220
        } else if (state->suid > pBrinBlk->maxTbid.suid ||
×
UNCOV
3221
                   (state->suid == pBrinBlk->maxTbid.suid && state->uid > pBrinBlk->maxTbid.uid)) {
×
3222
          break;
3223
        }
3224
      }
3225

UNCOV
3226
      int indexSize = TARRAY_SIZE(state->pIndexList);
×
UNCOV
3227
      if (indexSize <= 0) {
×
UNCOV
3228
        goto _check_stt_data;
×
3229
      }
3230

UNCOV
3231
      state->state = SFSNEXTROW_INDEXLIST;
×
UNCOV
3232
      state->iBrinIndex = 1;
×
3233
    }
3234

UNCOV
3235
  _check_stt_data:
×
UNCOV
3236
    if (state->pFileSet != state->pr->pCurFileSet) {
×
UNCOV
3237
      state->pr->pCurFileSet = state->pFileSet;
×
3238
    }
3239

UNCOV
3240
    TAOS_CHECK_GOTO(lastIterOpen(&state->lastIter, state->pFileSet, pTsdb, state->pTSchema, state->suid, state->uid,
×
3241
                                 state->pr, state->lastTs, aCols, nCols),
3242
                    &lino, _err);
3243

UNCOV
3244
    TAOS_CHECK_GOTO(lastIterNext(&state->lastIter, &state->pLastRow), &lino, _err);
×
3245

UNCOV
3246
    if (!state->pLastRow) {
×
UNCOV
3247
      state->lastEmpty = 1;
×
3248

UNCOV
3249
      if (SFSNEXTROW_INDEXLIST != state->state) {
×
UNCOV
3250
        clearLastFileSet(state);
×
UNCOV
3251
        goto _next_fileset;
×
3252
      }
3253
    } else {
UNCOV
3254
      state->lastEmpty = 0;
×
3255

UNCOV
3256
      if (SFSNEXTROW_INDEXLIST != state->state) {
×
UNCOV
3257
        state->state = SFSNEXTROW_NEXTSTTROW;
×
3258

UNCOV
3259
        *ppRow = state->pLastRow;
×
UNCOV
3260
        state->pLastRow = NULL;
×
3261

UNCOV
3262
        TAOS_RETURN(code);
×
3263
      }
3264
    }
3265

UNCOV
3266
    state->pLastIter = &state->lastIter;
×
3267
  }
3268

UNCOV
3269
  if (SFSNEXTROW_NEXTSTTROW == state->state) {
×
UNCOV
3270
    TAOS_CHECK_GOTO(lastIterNext(&state->lastIter, &state->pLastRow), &lino, _err);
×
3271

UNCOV
3272
    if (!state->pLastRow) {
×
UNCOV
3273
      if (state->pLastIter) {
×
UNCOV
3274
        code = lastIterClose(&state->pLastIter);
×
UNCOV
3275
        if (code != TSDB_CODE_SUCCESS) {
×
3276
          tsdbError("vgId:%d, %s close last iter failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
3277
                    tstrerror(code));
3278
          TAOS_RETURN(code);
×
3279
        }
3280
      }
3281

UNCOV
3282
      clearLastFileSet(state);
×
UNCOV
3283
      state->state = SFSNEXTROW_FILESET;
×
UNCOV
3284
      goto _next_fileset;
×
3285
    } else {
UNCOV
3286
      *ppRow = state->pLastRow;
×
UNCOV
3287
      state->pLastRow = NULL;
×
3288

UNCOV
3289
      TAOS_RETURN(code);
×
3290
    }
3291
  }
3292

UNCOV
3293
  if (SFSNEXTROW_INDEXLIST == state->state) {
×
UNCOV
3294
    SBrinBlk *pBrinBlk = NULL;
×
UNCOV
3295
  _next_brinindex:
×
UNCOV
3296
    if (--state->iBrinIndex < 0) {
×
UNCOV
3297
      if (state->pLastRow) {
×
UNCOV
3298
        state->state = SFSNEXTROW_NEXTSTTROW;
×
3299
        *ppRow = state->pLastRow;
×
3300
        state->pLastRow = NULL;
×
3301
        return code;
×
3302
      }
3303

UNCOV
3304
      clearLastFileSet(state);
×
UNCOV
3305
      goto _next_fileset;
×
3306
    } else {
3307
      pBrinBlk = taosArrayGet(state->pIndexList, state->iBrinIndex);
×
3308
    }
3309

UNCOV
3310
    if (!state->pBrinBlock) {
×
UNCOV
3311
      state->pBrinBlock = &state->brinBlock;
×
3312
    } else {
UNCOV
3313
      tBrinBlockClear(&state->brinBlock);
×
3314
    }
3315

UNCOV
3316
    TAOS_CHECK_GOTO(tsdbDataFileReadBrinBlock(state->pr->pFileReader, pBrinBlk, &state->brinBlock), &lino, _err);
×
3317

UNCOV
3318
    state->iBrinRecord = state->brinBlock.numOfRecords - 1;
×
UNCOV
3319
    state->state = SFSNEXTROW_BRINBLOCK;
×
3320
  }
3321

UNCOV
3322
  if (SFSNEXTROW_BRINBLOCK == state->state) {
×
UNCOV
3323
  _next_brinrecord:
×
UNCOV
3324
    if (state->iBrinRecord < 0) {  // empty brin block, goto _next_brinindex
×
UNCOV
3325
      tBrinBlockClear(&state->brinBlock);
×
UNCOV
3326
      goto _next_brinindex;
×
3327
    }
3328

UNCOV
3329
    TAOS_CHECK_GOTO(tBrinBlockGet(&state->brinBlock, state->iBrinRecord, &state->brinRecord), &lino, _err);
×
3330

UNCOV
3331
    SBrinRecord *pRecord = &state->brinRecord;
×
UNCOV
3332
    if (pRecord->uid != state->uid) {
×
3333
      // TODO: goto next brin block early
UNCOV
3334
      --state->iBrinRecord;
×
UNCOV
3335
      goto _next_brinrecord;
×
3336
    }
3337

UNCOV
3338
    state->state = SFSNEXTROW_BRINRECORD;
×
3339
  }
3340

UNCOV
3341
  if (SFSNEXTROW_BRINRECORD == state->state) {
×
UNCOV
3342
    SBrinRecord *pRecord = &state->brinRecord;
×
3343

UNCOV
3344
    if (!state->pBlockData) {
×
UNCOV
3345
      state->pBlockData = &state->blockData;
×
3346

UNCOV
3347
      TAOS_CHECK_GOTO(tBlockDataCreate(&state->blockData), &lino, _err);
×
3348
    } else {
UNCOV
3349
      tBlockDataReset(state->pBlockData);
×
3350
    }
3351

UNCOV
3352
    if (aCols[0] == PRIMARYKEY_TIMESTAMP_COL_ID) {
×
UNCOV
3353
      --nCols;
×
UNCOV
3354
      ++aCols;
×
3355
    }
3356

UNCOV
3357
    TAOS_CHECK_GOTO(tsdbDataFileReadBlockDataByColumn(state->pr->pFileReader, pRecord, state->pBlockData,
×
3358
                                                      state->pTSchema, aCols, nCols),
3359
                    &lino, _err);
3360

UNCOV
3361
    state->nRow = state->blockData.nRow;
×
UNCOV
3362
    state->iRow = state->nRow - 1;
×
3363

UNCOV
3364
    state->state = SFSNEXTROW_BLOCKROW;
×
3365
  }
3366

UNCOV
3367
  if (SFSNEXTROW_BLOCKROW == state->state) {
×
UNCOV
3368
    if (state->iRow < 0) {
×
UNCOV
3369
      --state->iBrinRecord;
×
UNCOV
3370
      goto _next_brinrecord;
×
3371
    }
3372

UNCOV
3373
    state->row = tsdbRowFromBlockData(state->pBlockData, state->iRow);
×
UNCOV
3374
    if (!state->pLastIter) {
×
UNCOV
3375
      *ppRow = &state->row;
×
UNCOV
3376
      --state->iRow;
×
3377
      return code;
×
3378
    }
3379

UNCOV
3380
    if (!state->pLastRow) {
×
3381
      // get next row from fslast and process with fs row, --state->Row if select fs row
UNCOV
3382
      TAOS_CHECK_GOTO(lastIterNext(&state->lastIter, &state->pLastRow), &lino, _err);
×
3383
    }
3384

UNCOV
3385
    if (!state->pLastRow) {
×
UNCOV
3386
      if (state->pLastIter) {
×
UNCOV
3387
        code = lastIterClose(&state->pLastIter);
×
UNCOV
3388
        if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
3389
          tsdbError("vgId:%d, %s close last iter failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
3390
                    tstrerror(code));
3391
          TAOS_RETURN(code);
×
3392
        }
3393
      }
3394

UNCOV
3395
      *ppRow = &state->row;
×
UNCOV
3396
      --state->iRow;
×
UNCOV
3397
      return code;
×
3398
    }
3399

3400
    // process state->pLastRow & state->row
UNCOV
3401
    TSKEY rowTs = TSDBROW_TS(&state->row);
×
UNCOV
3402
    TSKEY lastRowTs = TSDBROW_TS(state->pLastRow);
×
3403
    if (lastRowTs > rowTs) {
×
3404
      *ppRow = state->pLastRow;
×
3405
      state->pLastRow = NULL;
×
3406

3407
      TAOS_RETURN(code);
×
UNCOV
3408
    } else if (lastRowTs < rowTs) {
×
3409
      *ppRow = &state->row;
×
3410
      --state->iRow;
×
3411

3412
      TAOS_RETURN(code);
×
3413
    } else {
3414
      // TODO: merge rows and *ppRow = mergedRow
UNCOV
3415
      SRowMerger *pMerger = &state->rowMerger;
×
UNCOV
3416
      code = tsdbRowMergerInit(pMerger, state->pTSchema);
×
3417
      if (code != TSDB_CODE_SUCCESS) {
×
3418
        tsdbError("vgId:%d, %s init row merger failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
×
3419
                  tstrerror(code));
3420
        TAOS_RETURN(code);
×
3421
      }
3422

UNCOV
3423
      TAOS_CHECK_GOTO(tsdbRowMergerAdd(pMerger, &state->row, state->pTSchema), &lino, _err);
×
UNCOV
3424
      TAOS_CHECK_GOTO(tsdbRowMergerAdd(pMerger, state->pLastRow, state->pTSchema), &lino, _err);
×
3425

3426
      if (state->pTSRow) {
×
UNCOV
3427
        taosMemoryFree(state->pTSRow);
×
3428
        state->pTSRow = NULL;
×
3429
      }
3430

UNCOV
3431
      TAOS_CHECK_GOTO(tsdbRowMergerGetRow(pMerger, &state->pTSRow), &lino, _err);
×
3432

3433
      state->row = tsdbRowFromTSRow(TSDBROW_VERSION(&state->row), state->pTSRow);
×
UNCOV
3434
      *ppRow = &state->row;
×
3435
      --state->iRow;
×
3436

3437
      tsdbRowMergerClear(pMerger);
×
3438

3439
      TAOS_RETURN(code);
×
3440
    }
3441
  }
3442

UNCOV
3443
_err:
×
UNCOV
3444
  clearLastFileSet(state);
×
3445

3446
  *ppRow = NULL;
×
3447

3448
  if (code) {
×
UNCOV
3449
    tsdbError("tsdb/cache: vgId:%d, %s failed at line %d since %s.", TD_VID(pTsdb->pVnode), __func__, lino,
×
3450
              tstrerror(code));
3451
  }
3452

UNCOV
3453
  TAOS_RETURN(code);
×
3454
}
3455

3456
typedef struct CacheNextRowIter {
3457
  SArray           *pMemDelData;
3458
  SArray           *pSkyline;
3459
  int64_t           iSkyline;
3460
  SBlockIdx         idx;
3461
  SMemNextRowIter   memState;
3462
  SMemNextRowIter   imemState;
3463
  SFSNextRowIter    fsState;
3464
  TSDBROW           memRow, imemRow, fsLastRow, fsRow;
3465
  TsdbNextRowState  input[3];
3466
  SCacheRowsReader *pr;
3467
  STsdb            *pTsdb;
3468
} CacheNextRowIter;
3469

UNCOV
3470
int32_t clearNextRowFromFS(void *iter) {
×
UNCOV
3471
  int32_t code = 0;
×
3472

UNCOV
3473
  SFSNextRowIter *state = (SFSNextRowIter *)iter;
×
UNCOV
3474
  if (!state) {
×
UNCOV
3475
    TAOS_RETURN(code);
×
3476
  }
3477

UNCOV
3478
  if (state->pLastIter) {
×
UNCOV
3479
    code = lastIterClose(&state->pLastIter);
×
UNCOV
3480
    if (code != TSDB_CODE_SUCCESS) {
×
3481
      tsdbError("%s close last iter failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
3482
      TAOS_RETURN(code);
×
3483
    }
3484
  }
3485

UNCOV
3486
  if (state->pBlockData) {
×
UNCOV
3487
    tBlockDataDestroy(state->pBlockData);
×
UNCOV
3488
    state->pBlockData = NULL;
×
3489
  }
3490

UNCOV
3491
  if (state->pBrinBlock) {
×
UNCOV
3492
    tBrinBlockDestroy(state->pBrinBlock);
×
UNCOV
3493
    state->pBrinBlock = NULL;
×
3494
  }
3495

UNCOV
3496
  if (state->pIndexList) {
×
UNCOV
3497
    taosArrayDestroy(state->pIndexList);
×
UNCOV
3498
    state->pIndexList = NULL;
×
3499
  }
3500

UNCOV
3501
  if (state->pTSRow) {
×
UNCOV
3502
    taosMemoryFree(state->pTSRow);
×
UNCOV
3503
    state->pTSRow = NULL;
×
3504
  }
3505

UNCOV
3506
  if (state->pRowIter->pSkyline) {
×
UNCOV
3507
    taosArrayDestroy(state->pRowIter->pSkyline);
×
UNCOV
3508
    state->pRowIter->pSkyline = NULL;
×
3509
  }
3510

UNCOV
3511
  TAOS_RETURN(code);
×
3512
}
3513

UNCOV
3514
static void clearLastFileSet(SFSNextRowIter *state) {
×
UNCOV
3515
  if (state->pLastIter) {
×
UNCOV
3516
    int code = lastIterClose(&state->pLastIter);
×
UNCOV
3517
    if (code != TSDB_CODE_SUCCESS) {
×
3518
      tsdbError("%s close last iter failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
3519
      return;
×
3520
    }
3521
  }
3522

UNCOV
3523
  if (state->pBlockData) {
×
UNCOV
3524
    tBlockDataDestroy(state->pBlockData);
×
UNCOV
3525
    state->pBlockData = NULL;
×
3526
  }
3527

UNCOV
3528
  if (state->pr->pFileReader) {
×
UNCOV
3529
    tsdbDataFileReaderClose(&state->pr->pFileReader);
×
UNCOV
3530
    state->pr->pFileReader = NULL;
×
3531

UNCOV
3532
    state->pr->pCurFileSet = NULL;
×
3533
  }
3534

UNCOV
3535
  if (state->pTSRow) {
×
UNCOV
3536
    taosMemoryFree(state->pTSRow);
×
UNCOV
3537
    state->pTSRow = NULL;
×
3538
  }
3539

UNCOV
3540
  if (state->pRowIter->pSkyline) {
×
UNCOV
3541
    taosArrayDestroy(state->pRowIter->pSkyline);
×
UNCOV
3542
    state->pRowIter->pSkyline = NULL;
×
3543

UNCOV
3544
    void   *pe = NULL;
×
UNCOV
3545
    int32_t iter = 0;
×
UNCOV
3546
    while ((pe = tSimpleHashIterate(state->pr->pTableMap, pe, &iter)) != NULL) {
×
UNCOV
3547
      STableLoadInfo *pInfo = *(STableLoadInfo **)pe;
×
UNCOV
3548
      taosArrayDestroy(pInfo->pTombData);
×
UNCOV
3549
      pInfo->pTombData = NULL;
×
3550
    }
3551
  }
3552
}
3553

UNCOV
3554
static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTsdb, STSchema *pTSchema, tb_uid_t suid,
×
3555
                               SArray *pLDataIterArray, STsdbReadSnap *pReadSnap, int64_t lastTs,
3556
                               SCacheRowsReader *pr) {
UNCOV
3557
  int32_t code = 0, lino = 0;
×
3558

UNCOV
3559
  STbData *pMem = NULL;
×
UNCOV
3560
  if (pReadSnap->pMem) {
×
UNCOV
3561
    pMem = tsdbGetTbDataFromMemTable(pReadSnap->pMem, suid, uid);
×
3562
  }
3563

UNCOV
3564
  STbData *pIMem = NULL;
×
UNCOV
3565
  if (pReadSnap->pIMem) {
×
UNCOV
3566
    pIMem = tsdbGetTbDataFromMemTable(pReadSnap->pIMem, suid, uid);
×
3567
  }
3568

UNCOV
3569
  pIter->pTsdb = pTsdb;
×
3570

UNCOV
3571
  pIter->pMemDelData = NULL;
×
3572

UNCOV
3573
  TAOS_CHECK_GOTO(loadMemTombData(&pIter->pMemDelData, pMem, pIMem, pr->info.verRange.maxVer), &lino, _err);
×
3574

UNCOV
3575
  pIter->idx = (SBlockIdx){.suid = suid, .uid = uid};
×
3576

UNCOV
3577
  pIter->fsState.pRowIter = pIter;
×
UNCOV
3578
  pIter->fsState.state = SFSNEXTROW_FS;
×
UNCOV
3579
  pIter->fsState.aDFileSet = pReadSnap->pfSetArray;
×
UNCOV
3580
  pIter->fsState.pBlockIdxExp = &pIter->idx;
×
UNCOV
3581
  pIter->fsState.pTSchema = pTSchema;
×
UNCOV
3582
  pIter->fsState.suid = suid;
×
UNCOV
3583
  pIter->fsState.uid = uid;
×
UNCOV
3584
  pIter->fsState.lastTs = lastTs;
×
UNCOV
3585
  pIter->fsState.pr = pr;
×
3586

UNCOV
3587
  pIter->input[0] = (TsdbNextRowState){&pIter->memRow, true, false, false, &pIter->memState, getNextRowFromMem, NULL};
×
UNCOV
3588
  pIter->input[1] = (TsdbNextRowState){&pIter->imemRow, true, false, false, &pIter->imemState, getNextRowFromMem, NULL};
×
UNCOV
3589
  pIter->input[2] =
×
UNCOV
3590
      (TsdbNextRowState){&pIter->fsRow, false, true, false, &pIter->fsState, getNextRowFromFS, clearNextRowFromFS};
×
3591

UNCOV
3592
  if (pMem) {
×
UNCOV
3593
    pIter->memState.pMem = pMem;
×
UNCOV
3594
    pIter->memState.state = SMEMNEXTROW_ENTER;
×
UNCOV
3595
    pIter->memState.lastTs = lastTs;
×
UNCOV
3596
    pIter->input[0].stop = false;
×
UNCOV
3597
    pIter->input[0].next = true;
×
3598
  }
3599

UNCOV
3600
  if (pIMem) {
×
UNCOV
3601
    pIter->imemState.pMem = pIMem;
×
UNCOV
3602
    pIter->imemState.state = SMEMNEXTROW_ENTER;
×
3603
    pIter->imemState.lastTs = lastTs;
×
3604
    pIter->input[1].stop = false;
×
3605
    pIter->input[1].next = true;
×
3606
  }
3607

UNCOV
3608
  pIter->pr = pr;
×
3609

UNCOV
3610
_err:
×
UNCOV
3611
  TAOS_RETURN(code);
×
3612
}
3613

UNCOV
3614
static void nextRowIterClose(CacheNextRowIter *pIter) {
×
UNCOV
3615
  for (int i = 0; i < 3; ++i) {
×
UNCOV
3616
    if (pIter->input[i].nextRowClearFn) {
×
UNCOV
3617
      (void)pIter->input[i].nextRowClearFn(pIter->input[i].iter);
×
3618
    }
3619
  }
3620

UNCOV
3621
  if (pIter->pSkyline) {
×
UNCOV
3622
    taosArrayDestroy(pIter->pSkyline);
×
3623
  }
3624

UNCOV
3625
  if (pIter->pMemDelData) {
×
UNCOV
3626
    taosArrayDestroy(pIter->pMemDelData);
×
3627
  }
UNCOV
3628
}
×
3629

3630
// iterate next row non deleted backward ts, version (from high to low)
UNCOV
3631
static int32_t nextRowIterGet(CacheNextRowIter *pIter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast,
×
3632
                              int16_t *aCols, int nCols) {
UNCOV
3633
  int32_t code = 0, lino = 0;
×
3634

UNCOV
3635
  for (;;) {
×
UNCOV
3636
    for (int i = 0; i < 3; ++i) {
×
UNCOV
3637
      if (pIter->input[i].next && !pIter->input[i].stop) {
×
UNCOV
3638
        TAOS_CHECK_GOTO(pIter->input[i].nextRowFn(pIter->input[i].iter, &pIter->input[i].pRow,
×
3639
                                                  &pIter->input[i].ignoreEarlierTs, isLast, aCols, nCols),
3640
                        &lino, _err);
3641

UNCOV
3642
        if (pIter->input[i].pRow == NULL) {
×
UNCOV
3643
          pIter->input[i].stop = true;
×
UNCOV
3644
          pIter->input[i].next = false;
×
3645
        }
3646
      }
3647
    }
3648

UNCOV
3649
    if (pIter->input[0].stop && pIter->input[1].stop && pIter->input[2].stop) {
×
UNCOV
3650
      *ppRow = NULL;
×
UNCOV
3651
      *pIgnoreEarlierTs =
×
UNCOV
3652
          (pIter->input[0].ignoreEarlierTs || pIter->input[1].ignoreEarlierTs || pIter->input[2].ignoreEarlierTs);
×
3653

UNCOV
3654
      TAOS_RETURN(code);
×
3655
    }
3656

3657
    // select maxpoint(s) from mem, imem, fs and last
UNCOV
3658
    TSDBROW *max[4] = {0};
×
UNCOV
3659
    int      iMax[4] = {-1, -1, -1, -1};
×
UNCOV
3660
    int      nMax = 0;
×
UNCOV
3661
    SRowKey  maxKey = {.ts = TSKEY_MIN};
×
3662

UNCOV
3663
    for (int i = 0; i < 3; ++i) {
×
UNCOV
3664
      if (!pIter->input[i].stop && pIter->input[i].pRow != NULL) {
×
UNCOV
3665
        STsdbRowKey tsdbRowKey = {0};
×
UNCOV
3666
        tsdbRowGetKey(pIter->input[i].pRow, &tsdbRowKey);
×
3667

3668
        // merging & deduplicating on client side
UNCOV
3669
        int c = tRowKeyCompare(&maxKey, &tsdbRowKey.key);
×
UNCOV
3670
        if (c <= 0) {
×
UNCOV
3671
          if (c < 0) {
×
UNCOV
3672
            nMax = 0;
×
UNCOV
3673
            maxKey = tsdbRowKey.key;
×
3674
          }
3675

UNCOV
3676
          iMax[nMax] = i;
×
UNCOV
3677
          max[nMax++] = pIter->input[i].pRow;
×
3678
        }
UNCOV
3679
        pIter->input[i].next = false;
×
3680
      }
3681
    }
3682

3683
    // delete detection
UNCOV
3684
    TSDBROW *merge[4] = {0};
×
UNCOV
3685
    int      iMerge[4] = {-1, -1, -1, -1};
×
UNCOV
3686
    int      nMerge = 0;
×
UNCOV
3687
    for (int i = 0; i < nMax; ++i) {
×
UNCOV
3688
      TSDBKEY maxKey1 = TSDBROW_KEY(max[i]);
×
3689

UNCOV
3690
      if (!pIter->pSkyline) {
×
UNCOV
3691
        pIter->pSkyline = taosArrayInit(32, sizeof(TSDBKEY));
×
UNCOV
3692
        TSDB_CHECK_NULL(pIter->pSkyline, code, lino, _err, terrno);
×
3693

UNCOV
3694
        uint64_t        uid = pIter->idx.uid;
×
UNCOV
3695
        STableLoadInfo *pInfo = getTableLoadInfo(pIter->pr, uid);
×
UNCOV
3696
        TSDB_CHECK_NULL(pInfo, code, lino, _err, TSDB_CODE_OUT_OF_MEMORY);
×
3697

UNCOV
3698
        if (pInfo->pTombData == NULL) {
×
UNCOV
3699
          pInfo->pTombData = taosArrayInit(4, sizeof(SDelData));
×
UNCOV
3700
          TSDB_CHECK_NULL(pInfo->pTombData, code, lino, _err, terrno);
×
3701
        }
3702

UNCOV
3703
        if (!taosArrayAddAll(pInfo->pTombData, pIter->pMemDelData)) {
×
UNCOV
3704
          TAOS_CHECK_GOTO(terrno, &lino, _err);
×
3705
        }
3706

UNCOV
3707
        size_t delSize = TARRAY_SIZE(pInfo->pTombData);
×
UNCOV
3708
        if (delSize > 0) {
×
UNCOV
3709
          code = tsdbBuildDeleteSkyline(pInfo->pTombData, 0, (int32_t)(delSize - 1), pIter->pSkyline);
×
UNCOV
3710
          TAOS_CHECK_GOTO(code, &lino, _err);
×
3711
        }
UNCOV
3712
        pIter->iSkyline = taosArrayGetSize(pIter->pSkyline) - 1;
×
3713
      }
3714

UNCOV
3715
      bool deleted = tsdbKeyDeleted(&maxKey1, pIter->pSkyline, &pIter->iSkyline);
×
UNCOV
3716
      if (!deleted) {
×
UNCOV
3717
        iMerge[nMerge] = iMax[i];
×
UNCOV
3718
        merge[nMerge++] = max[i];
×
3719
      }
3720

UNCOV
3721
      pIter->input[iMax[i]].next = deleted;
×
3722
    }
3723

UNCOV
3724
    if (nMerge > 0) {
×
UNCOV
3725
      pIter->input[iMerge[0]].next = true;
×
3726

UNCOV
3727
      *ppRow = merge[0];
×
3728

UNCOV
3729
      TAOS_RETURN(code);
×
3730
    }
3731
  }
3732

UNCOV
3733
_err:
×
UNCOV
3734
  if (code) {
×
3735
    tsdbError("tsdb/cache: %s failed at line %d since %s.", __func__, lino, tstrerror(code));
×
3736
  }
3737

UNCOV
3738
  TAOS_RETURN(code);
×
3739
}
3740

UNCOV
3741
static int32_t initLastColArrayPartial(STSchema *pTSchema, SArray **ppColArray, int16_t *slotIds, int nCols) {
×
UNCOV
3742
  SArray *pColArray = taosArrayInit(nCols, sizeof(SLastCol));
×
UNCOV
3743
  if (NULL == pColArray) {
×
UNCOV
3744
    TAOS_RETURN(terrno);
×
3745
  }
3746

UNCOV
3747
  for (int32_t i = 0; i < nCols; ++i) {
×
UNCOV
3748
    int16_t  slotId = slotIds[i];
×
UNCOV
3749
    SLastCol col = {.rowKey.ts = 0,
×
UNCOV
3750
                    .colVal = COL_VAL_NULL(pTSchema->columns[slotId].colId, pTSchema->columns[slotId].type)};
×
UNCOV
3751
    if (!taosArrayPush(pColArray, &col)) {
×
UNCOV
3752
      TAOS_RETURN(terrno);
×
3753
    }
3754
  }
UNCOV
3755
  *ppColArray = pColArray;
×
3756

UNCOV
3757
  TAOS_RETURN(TSDB_CODE_SUCCESS);
×
3758
}
3759

UNCOV
3760
static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols,
×
3761
                            int nCols, int16_t *slotIds) {
UNCOV
3762
  int32_t   code = 0, lino = 0;
×
UNCOV
3763
  STSchema *pTSchema = pr->pSchema;  // metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1, 1);
×
UNCOV
3764
  int16_t   nLastCol = nCols;
×
UNCOV
3765
  int16_t   noneCol = 0;
×
UNCOV
3766
  bool      setNoneCol = false;
×
UNCOV
3767
  bool      hasRow = false;
×
UNCOV
3768
  bool      ignoreEarlierTs = false;
×
UNCOV
3769
  SArray   *pColArray = NULL;
×
UNCOV
3770
  SColVal  *pColVal = &(SColVal){0};
×
3771

UNCOV
3772
  TAOS_CHECK_RETURN(initLastColArrayPartial(pTSchema, &pColArray, slotIds, nCols));
×
3773

UNCOV
3774
  SArray *aColArray = taosArrayInit(nCols, sizeof(int16_t));
×
UNCOV
3775
  if (NULL == aColArray) {
×
UNCOV
3776
    taosArrayDestroy(pColArray);
×
3777

3778
    TAOS_RETURN(terrno);
×
3779
  }
3780

UNCOV
3781
  for (int i = 0; i < nCols; ++i) {
×
UNCOV
3782
    if (!taosArrayPush(aColArray, &aCols[i])) {
×
UNCOV
3783
      taosArrayDestroy(pColArray);
×
3784

3785
      TAOS_RETURN(terrno);
×
3786
    }
3787
  }
3788

UNCOV
3789
  STsdbRowKey lastRowKey = {.key.ts = TSKEY_MAX};
×
3790

3791
  // inverse iterator
UNCOV
3792
  CacheNextRowIter iter = {0};
×
3793
  code =
UNCOV
3794
      nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->info.suid, pr->pLDataIterArray, pr->pReadSnap, pr->lastTs, pr);
×
UNCOV
3795
  TAOS_CHECK_GOTO(code, &lino, _err);
×
3796

3797
  do {
UNCOV
3798
    TSDBROW *pRow = NULL;
×
UNCOV
3799
    code = nextRowIterGet(&iter, &pRow, &ignoreEarlierTs, true, TARRAY_DATA(aColArray), TARRAY_SIZE(aColArray));
×
3800

UNCOV
3801
    if (!pRow) {
×
UNCOV
3802
      break;
×
3803
    }
3804

UNCOV
3805
    hasRow = true;
×
3806

UNCOV
3807
    int32_t sversion = TSDBROW_SVERSION(pRow);
×
UNCOV
3808
    if (sversion != -1) {
×
UNCOV
3809
      TAOS_CHECK_GOTO(updateTSchema(sversion, pr, uid), &lino, _err);
×
3810

UNCOV
3811
      pTSchema = pr->pCurrSchema;
×
3812
    }
3813
    // int16_t nCol = pTSchema->numOfCols;
3814

UNCOV
3815
    STsdbRowKey rowKey = {0};
×
UNCOV
3816
    tsdbRowGetKey(pRow, &rowKey);
×
3817

UNCOV
3818
    if (lastRowKey.key.ts == TSKEY_MAX) {  // first time
×
UNCOV
3819
      lastRowKey = rowKey;
×
3820

UNCOV
3821
      for (int16_t iCol = noneCol; iCol < nCols; ++iCol) {
×
UNCOV
3822
        if (iCol >= nLastCol) {
×
UNCOV
3823
          break;
×
3824
        }
3825
        SLastCol *pCol = taosArrayGet(pColArray, iCol);
×
UNCOV
3826
        if (slotIds[iCol] > pTSchema->numOfCols - 1) {
×
UNCOV
3827
          if (!setNoneCol) {
×
UNCOV
3828
            noneCol = iCol;
×
3829
            setNoneCol = true;
×
3830
          }
3831
          continue;
×
3832
        }
UNCOV
3833
        if (pCol->colVal.cid != pTSchema->columns[slotIds[iCol]].colId) {
×
UNCOV
3834
          continue;
×
3835
        }
UNCOV
3836
        if (slotIds[iCol] == 0) {
×
UNCOV
3837
          STColumn *pTColumn = &pTSchema->columns[0];
×
UNCOV
3838
          *pColVal = COL_VAL_VALUE(pTColumn->colId, ((SValue){.type = pTColumn->type, .val = rowKey.key.ts}));
×
3839

UNCOV
3840
          SLastCol colTmp = {.rowKey = rowKey.key, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID};
×
UNCOV
3841
          TAOS_CHECK_GOTO(tsdbCacheReallocSLastCol(&colTmp, NULL), &lino, _err);
×
3842

UNCOV
3843
          taosArraySet(pColArray, 0, &colTmp);
×
UNCOV
3844
          continue;
×
3845
        }
UNCOV
3846
        tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal);
×
3847

UNCOV
3848
        *pCol = (SLastCol){.rowKey = rowKey.key, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID};
×
UNCOV
3849
        TAOS_CHECK_GOTO(tsdbCacheReallocSLastCol(pCol, NULL), &lino, _err);
×
3850

UNCOV
3851
        if (!COL_VAL_IS_VALUE(pColVal)) {
×
UNCOV
3852
          if (!setNoneCol) {
×
UNCOV
3853
            noneCol = iCol;
×
UNCOV
3854
            setNoneCol = true;
×
3855
          }
3856
        } else {
UNCOV
3857
          int32_t aColIndex = taosArraySearchIdx(aColArray, &pColVal->cid, compareInt16Val, TD_EQ);
×
UNCOV
3858
          if (aColIndex >= 0) {
×
UNCOV
3859
            taosArrayRemove(aColArray, aColIndex);
×
3860
          }
3861
        }
3862
      }
UNCOV
3863
      if (!setNoneCol) {
×
3864
        // done, goto return pColArray
UNCOV
3865
        break;
×
3866
      } else {
UNCOV
3867
        continue;
×
3868
      }
3869
    }
3870

3871
    // merge into pColArray
UNCOV
3872
    setNoneCol = false;
×
UNCOV
3873
    for (int16_t iCol = noneCol; iCol < nCols; ++iCol) {
×
UNCOV
3874
      if (iCol >= nLastCol) {
×
UNCOV
3875
        break;
×
3876
      }
3877
      // high version's column value
UNCOV
3878
      if (slotIds[iCol] > pTSchema->numOfCols - 1) {
×
UNCOV
3879
        continue;
×
3880
      }
3881

UNCOV
3882
      SLastCol *lastColVal = (SLastCol *)taosArrayGet(pColArray, iCol);
×
UNCOV
3883
      if (lastColVal->colVal.cid != pTSchema->columns[slotIds[iCol]].colId) {
×
UNCOV
3884
        continue;
×
3885
      }
3886
      SColVal *tColVal = &lastColVal->colVal;
×
UNCOV
3887
      if (COL_VAL_IS_VALUE(tColVal)) continue;
×
3888

UNCOV
3889
      tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal);
×
UNCOV
3890
      if (COL_VAL_IS_VALUE(pColVal)) {
×
UNCOV
3891
        SLastCol lastCol = {.rowKey = rowKey.key, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID};
×
UNCOV
3892
        TAOS_CHECK_GOTO(tsdbCacheReallocSLastCol(&lastCol, NULL), &lino, _err);
×
3893

UNCOV
3894
        tsdbCacheFreeSLastColItem(lastColVal);
×
UNCOV
3895
        taosArraySet(pColArray, iCol, &lastCol);
×
UNCOV
3896
        int32_t aColIndex = taosArraySearchIdx(aColArray, &lastCol.colVal.cid, compareInt16Val, TD_EQ);
×
UNCOV
3897
        if (aColIndex >= 0) {
×
UNCOV
3898
          taosArrayRemove(aColArray, aColIndex);
×
3899
        }
UNCOV
3900
      } else if (!COL_VAL_IS_VALUE(pColVal) && !setNoneCol) {
×
UNCOV
3901
        noneCol = iCol;
×
UNCOV
3902
        setNoneCol = true;
×
3903
      }
3904
    }
UNCOV
3905
  } while (setNoneCol);
×
3906

UNCOV
3907
  if (!hasRow) {
×
UNCOV
3908
    if (ignoreEarlierTs) {
×
UNCOV
3909
      taosArrayDestroy(pColArray);
×
UNCOV
3910
      pColArray = NULL;
×
3911
    } else {
3912
      taosArrayClear(pColArray);
×
3913
    }
3914
  }
UNCOV
3915
  *ppLastArray = pColArray;
×
3916

UNCOV
3917
  nextRowIterClose(&iter);
×
UNCOV
3918
  taosArrayDestroy(aColArray);
×
3919

UNCOV
3920
  TAOS_RETURN(code);
×
3921

UNCOV
3922
_err:
×
UNCOV
3923
  nextRowIterClose(&iter);
×
3924
  // taosMemoryFreeClear(pTSchema);
3925
  *ppLastArray = NULL;
×
UNCOV
3926
  taosArrayDestroyEx(pColArray, tsdbCacheFreeSLastColItem);
×
3927
  taosArrayDestroy(aColArray);
×
3928

3929
  if (code) {
×
UNCOV
3930
    tsdbError("tsdb/cache: vgId:%d, %s failed at line %d since %s.", TD_VID(pTsdb->pVnode), __func__, lino,
×
3931
              tstrerror(code));
3932
  }
3933

UNCOV
3934
  TAOS_RETURN(code);
×
3935
}
3936

UNCOV
3937
static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols,
×
3938
                               int nCols, int16_t *slotIds) {
UNCOV
3939
  int32_t   code = 0, lino = 0;
×
UNCOV
3940
  STSchema *pTSchema = pr->pSchema;  // metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1, 1);
×
UNCOV
3941
  int16_t   nLastCol = nCols;
×
UNCOV
3942
  int16_t   noneCol = 0;
×
UNCOV
3943
  bool      setNoneCol = false;
×
UNCOV
3944
  bool      hasRow = false;
×
UNCOV
3945
  bool      ignoreEarlierTs = false;
×
UNCOV
3946
  SArray   *pColArray = NULL;
×
UNCOV
3947
  SColVal  *pColVal = &(SColVal){0};
×
3948

UNCOV
3949
  TAOS_CHECK_RETURN(initLastColArrayPartial(pTSchema, &pColArray, slotIds, nCols));
×
3950

UNCOV
3951
  SArray *aColArray = taosArrayInit(nCols, sizeof(int16_t));
×
UNCOV
3952
  if (NULL == aColArray) {
×
UNCOV
3953
    taosArrayDestroy(pColArray);
×
3954

3955
    TAOS_RETURN(terrno);
×
3956
  }
3957

UNCOV
3958
  for (int i = 0; i < nCols; ++i) {
×
UNCOV
3959
    if (!taosArrayPush(aColArray, &aCols[i])) {
×
UNCOV
3960
      taosArrayDestroy(pColArray);
×
3961

3962
      TAOS_RETURN(terrno);
×
3963
    }
3964
  }
3965

3966
  // inverse iterator
UNCOV
3967
  CacheNextRowIter iter = {0};
×
3968
  code =
UNCOV
3969
      nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->info.suid, pr->pLDataIterArray, pr->pReadSnap, pr->lastTs, pr);
×
UNCOV
3970
  TAOS_CHECK_GOTO(code, &lino, _err);
×
3971

3972
  do {
UNCOV
3973
    TSDBROW *pRow = NULL;
×
UNCOV
3974
    code = nextRowIterGet(&iter, &pRow, &ignoreEarlierTs, false, TARRAY_DATA(aColArray), TARRAY_SIZE(aColArray));
×
3975

UNCOV
3976
    if (!pRow) {
×
UNCOV
3977
      break;
×
3978
    }
3979

UNCOV
3980
    hasRow = true;
×
3981

UNCOV
3982
    int32_t sversion = TSDBROW_SVERSION(pRow);
×
UNCOV
3983
    if (sversion != -1) {
×
UNCOV
3984
      TAOS_CHECK_GOTO(updateTSchema(sversion, pr, uid), &lino, _err);
×
3985

UNCOV
3986
      pTSchema = pr->pCurrSchema;
×
3987
    }
3988
    // int16_t nCol = pTSchema->numOfCols;
3989

UNCOV
3990
    STsdbRowKey rowKey = {0};
×
UNCOV
3991
    tsdbRowGetKey(pRow, &rowKey);
×
3992

UNCOV
3993
    for (int16_t iCol = noneCol; iCol < nCols; ++iCol) {
×
UNCOV
3994
      if (iCol >= nLastCol) {
×
UNCOV
3995
        break;
×
3996
      }
3997
      SLastCol *pCol = taosArrayGet(pColArray, iCol);
×
UNCOV
3998
      if (slotIds[iCol] > pTSchema->numOfCols - 1) {
×
UNCOV
3999
        continue;
×
4000
      }
UNCOV
4001
      if (pCol->colVal.cid != pTSchema->columns[slotIds[iCol]].colId) {
×
UNCOV
4002
        continue;
×
4003
      }
4004
      if (slotIds[iCol] == 0) {
×
UNCOV
4005
        STColumn *pTColumn = &pTSchema->columns[0];
×
UNCOV
4006
        *pColVal = COL_VAL_VALUE(pTColumn->colId, ((SValue){.type = pTColumn->type, .val = rowKey.key.ts}));
×
4007

UNCOV
4008
        SLastCol colTmp = {.rowKey = rowKey.key, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID};
×
UNCOV
4009
        TAOS_CHECK_GOTO(tsdbCacheReallocSLastCol(&colTmp, NULL), &lino, _err);
×
4010

UNCOV
4011
        taosArraySet(pColArray, 0, &colTmp);
×
UNCOV
4012
        continue;
×
4013
      }
UNCOV
4014
      tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal);
×
4015

UNCOV
4016
      *pCol = (SLastCol){.rowKey = rowKey.key, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID};
×
UNCOV
4017
      TAOS_CHECK_GOTO(tsdbCacheReallocSLastCol(pCol, NULL), &lino, _err);
×
4018

UNCOV
4019
      int32_t aColIndex = taosArraySearchIdx(aColArray, &pColVal->cid, compareInt16Val, TD_EQ);
×
UNCOV
4020
      if (aColIndex >= 0) {
×
UNCOV
4021
        taosArrayRemove(aColArray, aColIndex);
×
4022
      }
4023
    }
4024

UNCOV
4025
    break;
×
4026
  } while (1);
4027

UNCOV
4028
  if (!hasRow) {
×
UNCOV
4029
    if (ignoreEarlierTs) {
×
UNCOV
4030
      taosArrayDestroy(pColArray);
×
UNCOV
4031
      pColArray = NULL;
×
4032
    } else {
4033
      taosArrayClear(pColArray);
×
4034
    }
4035
  }
UNCOV
4036
  *ppLastArray = pColArray;
×
4037

UNCOV
4038
  nextRowIterClose(&iter);
×
UNCOV
4039
  taosArrayDestroy(aColArray);
×
4040

UNCOV
4041
  TAOS_RETURN(code);
×
4042

UNCOV
4043
_err:
×
UNCOV
4044
  nextRowIterClose(&iter);
×
4045

4046
  *ppLastArray = NULL;
×
UNCOV
4047
  taosArrayDestroyEx(pColArray, tsdbCacheFreeSLastColItem);
×
4048
  taosArrayDestroy(aColArray);
×
4049

4050
  if (code) {
×
UNCOV
4051
    tsdbError("tsdb/cache: vgId:%d, %s failed at line %d since %s.", TD_VID(pTsdb->pVnode), __func__, lino,
×
4052
              tstrerror(code));
4053
  }
4054

UNCOV
4055
  TAOS_RETURN(code);
×
4056
}
4057

UNCOV
4058
void tsdbCacheRelease(SLRUCache *pCache, LRUHandle *h) { tsdbLRUCacheRelease(pCache, h, false); }
×
4059

4060
void tsdbCacheSetCapacity(SVnode *pVnode, size_t capacity) {
×
UNCOV
4061
  taosLRUCacheSetCapacity(pVnode->pTsdb->lruCache, capacity);
×
UNCOV
4062
}
×
4063

4064
#ifdef BUILD_NO_CALL
4065
size_t tsdbCacheGetCapacity(SVnode *pVnode) { return taosLRUCacheGetCapacity(pVnode->pTsdb->lruCache); }
4066
#endif
4067

UNCOV
4068
size_t tsdbCacheGetUsage(SVnode *pVnode) {
×
UNCOV
4069
  size_t usage = 0;
×
UNCOV
4070
  if (pVnode->pTsdb != NULL) {
×
UNCOV
4071
    usage = taosLRUCacheGetUsage(pVnode->pTsdb->lruCache);
×
4072
  }
4073

UNCOV
4074
  return usage;
×
4075
}
4076

UNCOV
4077
int32_t tsdbCacheGetElems(SVnode *pVnode) {
×
UNCOV
4078
  int32_t elems = 0;
×
UNCOV
4079
  if (pVnode->pTsdb != NULL) {
×
UNCOV
4080
    elems = taosLRUCacheGetElems(pVnode->pTsdb->lruCache);
×
4081
  }
4082

UNCOV
4083
  return elems;
×
4084
}
4085

4086
// block cache
UNCOV
4087
static void getBCacheKey(int32_t fid, int64_t commitID, int64_t blkno, char *key, int *len) {
×
4088
  struct {
4089
    int32_t fid;
4090
    int64_t commitID;
4091
    int64_t blkno;
UNCOV
4092
  } bKey = {0};
×
4093

4094
  bKey.fid = fid;
×
UNCOV
4095
  bKey.commitID = commitID;
×
4096
  bKey.blkno = blkno;
×
4097

4098
  *len = sizeof(bKey);
×
UNCOV
4099
  memcpy(key, &bKey, *len);
×
4100
}
×
4101

4102
static int32_t tsdbCacheLoadBlockS3(STsdbFD *pFD, uint8_t **ppBlock) {
×
UNCOV
4103
  int32_t code = 0;
×
4104

4105
  int64_t block_offset = (pFD->blkno - 1) * tsS3BlockSize * pFD->szPage;
×
4106

4107
  TAOS_CHECK_RETURN(tcsGetObjectBlock(pFD->objName, block_offset, tsS3BlockSize * pFD->szPage, 0, ppBlock));
×
4108

4109
  tsdbTrace("block:%p load from s3", *ppBlock);
×
4110

4111
_exit:
×
UNCOV
4112
  return code;
×
4113
}
4114

UNCOV
4115
static void deleteBCache(const void *key, size_t keyLen, void *value, void *ud) {
×
4116
  (void)ud;
4117
  uint8_t *pBlock = (uint8_t *)value;
×
4118

4119
  taosMemoryFree(pBlock);
×
UNCOV
4120
}
×
4121

4122
int32_t tsdbCacheGetBlockS3(SLRUCache *pCache, STsdbFD *pFD, LRUHandle **handle) {
×
UNCOV
4123
  int32_t code = 0;
×
4124
  char    key[128] = {0};
×
4125
  int     keyLen = 0;
×
4126

4127
  getBCacheKey(pFD->fid, pFD->cid, pFD->blkno, key, &keyLen);
×
UNCOV
4128
  LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen);
×
4129
  if (!h) {
×
4130
    STsdb *pTsdb = pFD->pTsdb;
×
4131
    (void)taosThreadMutexLock(&pTsdb->bMutex);
×
4132

4133
    h = taosLRUCacheLookup(pCache, key, keyLen);
×
UNCOV
4134
    if (!h) {
×
4135
      uint8_t *pBlock = NULL;
×
4136
      code = tsdbCacheLoadBlockS3(pFD, &pBlock);
×
4137
      //  if table's empty or error, return code of -1
4138
      if (code != TSDB_CODE_SUCCESS || pBlock == NULL) {
×
UNCOV
4139
        (void)taosThreadMutexUnlock(&pTsdb->bMutex);
×
4140

4141
        *handle = NULL;
×
UNCOV
4142
        if (code == TSDB_CODE_SUCCESS && !pBlock) {
×
4143
          code = TSDB_CODE_OUT_OF_MEMORY;
×
4144
        }
4145

UNCOV
4146
        TAOS_RETURN(code);
×
4147
      }
4148

UNCOV
4149
      size_t              charge = tsS3BlockSize * pFD->szPage;
×
UNCOV
4150
      _taos_lru_deleter_t deleter = deleteBCache;
×
4151
      LRUStatus           status =
4152
          taosLRUCacheInsert(pCache, key, keyLen, pBlock, charge, deleter, NULL, &h, TAOS_LRU_PRIORITY_LOW, NULL);
×
4153
      if (status != TAOS_LRU_STATUS_OK) {
4154
        // code = -1;
4155
      }
4156
    }
4157

UNCOV
4158
    (void)taosThreadMutexUnlock(&pTsdb->bMutex);
×
4159
  }
4160

UNCOV
4161
  *handle = h;
×
4162

4163
  TAOS_RETURN(code);
×
4164
}
4165

UNCOV
4166
int32_t tsdbCacheGetPageS3(SLRUCache *pCache, STsdbFD *pFD, int64_t pgno, LRUHandle **handle) {
×
UNCOV
4167
  int32_t code = 0;
×
4168
  char    key[128] = {0};
×
4169
  int     keyLen = 0;
×
4170

4171
  getBCacheKey(pFD->fid, pFD->cid, pgno, key, &keyLen);
×
UNCOV
4172
  *handle = taosLRUCacheLookup(pCache, key, keyLen);
×
4173

4174
  return code;
×
4175
}
4176

UNCOV
4177
void tsdbCacheSetPageS3(SLRUCache *pCache, STsdbFD *pFD, int64_t pgno, uint8_t *pPage) {
×
UNCOV
4178
  char       key[128] = {0};
×
4179
  int        keyLen = 0;
×
4180
  LRUHandle *handle = NULL;
×
4181

4182
  getBCacheKey(pFD->fid, pFD->cid, pgno, key, &keyLen);
×
UNCOV
4183
  (void)taosThreadMutexLock(&pFD->pTsdb->pgMutex);
×
4184
  handle = taosLRUCacheLookup(pFD->pTsdb->pgCache, key, keyLen);
×
4185
  if (!handle) {
×
4186
    size_t              charge = pFD->szPage;
×
4187
    _taos_lru_deleter_t deleter = deleteBCache;
×
4188
    uint8_t            *pPg = taosMemoryMalloc(charge);
×
4189
    if (!pPg) {
×
4190
      return;  // ignore error with s3 cache and leave error untouched
×
4191
    }
4192
    memcpy(pPg, pPage, charge);
×
4193

4194
    LRUStatus status =
UNCOV
4195
        taosLRUCacheInsert(pCache, key, keyLen, pPg, charge, deleter, NULL, &handle, TAOS_LRU_PRIORITY_LOW, NULL);
×
4196
    if (status != TAOS_LRU_STATUS_OK) {
4197
      // ignore cache updating if not ok
4198
      // code = TSDB_CODE_OUT_OF_MEMORY;
4199
    }
4200
  }
UNCOV
4201
  (void)taosThreadMutexUnlock(&pFD->pTsdb->pgMutex);
×
4202

4203
  tsdbCacheRelease(pFD->pTsdb->pgCache, handle);
×
4204
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc