• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5057

17 May 2026 01:15AM UTC coverage: 73.406% (+0.02%) from 73.384%
#5057

push

travis-ci

web-flow
feat (TDgpt): Dynamic Model Synchronization Enhancements (#35344)

* refactor: do some internal refactor.

* fix: fix multiprocess sync issue.

* feat: add dynamic anomaly detection and forecasting services

* fix: log error message for undeploying model in exception handling

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* fix: handle undeploy when model exists only on disk

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/286aafa0-c3ce-4c27-b803-2707571e9dc1

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: guard dynamic registry concurrent access

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/5e4db858-6458-40f4-ac28-d1b1b7f97c18

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: tighten service list locking scope

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/5e4db858-6458-40f4-ac28-d1b1b7f97c18

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: restore prophet support and update tests per review feedback

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/92298ae1-7da6-4d07-b20e-101c7cd0b26b

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: improve test name and move copy inside lock scope

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/92298ae1-7da6-4d07-b20e-101c7cd0b26b

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* Potential fix for pull request finding

Co-au... (continued)

281727 of 383795 relevant lines covered (73.41%)

136101761.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.93
/source/dnode/vnode/src/tsdb/tsdbCache.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#include "functionMgt.h"
16
#include "tsdb.h"
17
#include "tsdbDataFileRW.h"
18
#include "tsdbIter.h"
19
#include "tsdbReadUtil.h"
20
#include "tss.h"
21
#include "vnd.h"
22

23
#define ROCKS_BATCH_SIZE (4096)
24

25
/*
 * Release one reference on an LRU cache handle.
 *
 * The result of taosLRUCacheRelease() is not propagated to the caller; a
 * false result is only reported through the trace log.
 */
void tsdbLRUCacheRelease(SLRUCache *cache, LRUHandle *handle, bool eraseIfLastRef) {
  bool released = taosLRUCacheRelease(cache, handle, eraseIfLastRef);
  if (released) {
    return;
  }

  tsdbTrace(" release lru cache failed");
}
73,939,068✔
30

31
#ifdef USE_SHARED_STORAGE
32

33
/*
 * Create the block LRU cache and its mutex for a tsdb instance.
 *
 * Capacity is tsSsBlockCacheSize * blockSize * pageSize, where blockSize is
 * tsSsBlockSize clamped to a minimum of 1024 and pageSize comes from the
 * vnode configuration. Returns 0 on success or TSDB_CODE_OUT_OF_MEMORY when
 * the cache cannot be created.
 */
static int32_t tsdbOpenBCache(STsdb *pTsdb) {
  int32_t code = 0;
  int32_t lino = 0;

  int64_t blockSize = (tsSsBlockSize > 1024) ? tsSsBlockSize : 1024;
  int32_t pageSize = pTsdb->pVnode->config.tsdbPageSize;
  int64_t capacity = (int64_t)tsSsBlockCacheSize * blockSize * pageSize;

  SLRUCache *pCache = taosLRUCacheInit(capacity, 0, .5);
  if (NULL == pCache) {
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err);
  }

  taosLRUCacheSetStrictCapacity(pCache, false);
  (void)taosThreadMutexInit(&pTsdb->bMutex, NULL);
  pTsdb->bCache = pCache;

_err:
  if (code != 0) {
    tsdbError("tsdb/bcache: vgId:%d, %s failed at line %d since %s.", TD_VID(pTsdb->pVnode), __func__, lino,
              tstrerror(code));
  }

  TAOS_RETURN(code);
}
57

58
/*
 * Tear down the block LRU cache of a tsdb instance, if one was created.
 *
 * Unreferenced entries are erased first (the element count is traced before
 * and after), then the cache is cleaned up and bMutex is destroyed.
 */
static void tsdbCloseBCache(STsdb *pTsdb) {
  SLRUCache *pCache = pTsdb->bCache;
  if (NULL == pCache) {
    return;
  }

  int32_t elems = taosLRUCacheGetElems(pCache);
  tsdbTrace("vgId:%d, elems: %d", TD_VID(pTsdb->pVnode), elems);

  taosLRUCacheEraseUnrefEntries(pCache);

  elems = taosLRUCacheGetElems(pCache);
  tsdbTrace("vgId:%d, elems: %d", TD_VID(pTsdb->pVnode), elems);

  taosLRUCacheCleanup(pCache);
  (void)taosThreadMutexDestroy(&pTsdb->bMutex);
}
24,780✔
72

73
/*
 * Create the page LRU cache and its mutex for a tsdb instance.
 *
 * Capacity is tsSsPageCacheSize * pageSize, with pageSize taken from the
 * vnode configuration. Returns 0 on success or TSDB_CODE_OUT_OF_MEMORY when
 * the cache cannot be created.
 */
static int32_t tsdbOpenPgCache(STsdb *pTsdb) {
  int32_t code = 0;
  int32_t lino = 0;

  int32_t pageSize = pTsdb->pVnode->config.tsdbPageSize;
  int64_t capacity = (int64_t)tsSsPageCacheSize * pageSize;

  SLRUCache *pCache = taosLRUCacheInit(capacity, 0, .5);
  if (NULL == pCache) {
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err);
  }

  taosLRUCacheSetStrictCapacity(pCache, false);
  (void)taosThreadMutexInit(&pTsdb->pgMutex, NULL);
  pTsdb->pgCache = pCache;

_err:
  if (code != 0) {
    tsdbError("tsdb/pgcache: vgId:%d, open failed at line %d since %s.", TD_VID(pTsdb->pVnode), lino, tstrerror(code));
  }

  TAOS_RETURN(code);
}
95

96
/*
 * Tear down the page LRU cache of a tsdb instance, if one was created.
 *
 * Unreferenced entries are erased first (the element count is traced before
 * and after), then the cache is cleaned up and its mutex is destroyed.
 */
static void tsdbClosePgCache(STsdb *pTsdb) {
  SLRUCache *pCache = pTsdb->pgCache;
  if (pCache) {
    int32_t elems = taosLRUCacheGetElems(pCache);
    tsdbTrace("vgId:%d, elems: %d", TD_VID(pTsdb->pVnode), elems);
    taosLRUCacheEraseUnrefEntries(pCache);
    elems = taosLRUCacheGetElems(pCache);
    tsdbTrace("vgId:%d, elems: %d", TD_VID(pTsdb->pVnode), elems);

    taosLRUCacheCleanup(pCache);

    // Destroy the mutex that tsdbOpenPgCache() initialised (pgMutex). The
    // block-cache mutex (bMutex) belongs to tsdbCloseBCache(); destroying it
    // here would destroy it twice and leave pgMutex alive.
    (void)taosThreadMutexDestroy(&pTsdb->pgMutex);
  }
}
24,780✔
110

111
#endif  // USE_SHARED_STORAGE
112

113
#define ROCKS_KEY_LEN (sizeof(tb_uid_t) + sizeof(int16_t) + sizeof(int8_t))
114

115
enum {
116
  LFLAG_LAST_ROW = 0,
117
  LFLAG_LAST = 1,
118
};
119

120
typedef struct {
121
  tb_uid_t uid;
122
  int16_t  cid;
123
  int8_t   lflag;
124
} SLastKey;
125

126
#define IS_LAST_ROW_KEY(k) (((k).lflag & LFLAG_LAST) == LFLAG_LAST_ROW)
127
#define IS_LAST_KEY(k)     (((k).lflag & LFLAG_LAST) == LFLAG_LAST)
128

129
/*
 * Build the RocksDB cache directory path for a tsdb instance.
 *
 * The vnode primary path is written into `path` first, then
 * "<sep><tsdb name><sep>cache.rdb" is appended. All writes are bounded by
 * TSDB_FILENAME_LEN, so `path` must be at least that large.
 */
static void tsdbGetRocksPath(STsdb *pTsdb, char *path) {
  vnodeGetPrimaryPath(pTsdb->pVnode, false, path, TSDB_FILENAME_LEN);

  int32_t used = strlen(path);
  char   *tail = path + used;
  snprintf(tail, TSDB_FILENAME_LEN - used - 1, "%s%s%scache.rdb", TD_DIRSEP, pTsdb->name, TD_DIRSEP);
}
4,869,897✔
136

137
/*
 * Comparator name callback passed to rocksdb_comparator_create().
 * The state argument is ignored; the returned string is a constant literal.
 */
static const char *myCmpName(void *state) {
  static const char *const kName = "myCmp";

  (void)state;
  return kName;
}
141

142
/*
 * Comparator destructor callback passed to rocksdb_comparator_create().
 * The comparator carries no state, so there is nothing to release.
 */
static void myCmpDestroy(void *state) {
  (void)state;
}
4,873,068✔
143

144
static int myCmp(void *state, const char *a, size_t alen, const char *b, size_t blen) {
1,379,249,535✔
145
  (void)state;
146
  (void)alen;
147
  (void)blen;
148
  SLastKey *lhs = (SLastKey *)a;
1,379,249,535✔
149
  SLastKey *rhs = (SLastKey *)b;
1,379,249,535✔
150

151
  if (lhs->uid < rhs->uid) {
1,379,249,535✔
152
    return -1;
823,504,099✔
153
  } else if (lhs->uid > rhs->uid) {
555,888,906✔
154
    return 1;
245,172,126✔
155
  }
156

157
  if (lhs->cid < rhs->cid) {
310,758,796✔
158
    return -1;
112,745,282✔
159
  } else if (lhs->cid > rhs->cid) {
198,026,063✔
160
    return 1;
74,491,404✔
161
  }
162

163
  if ((lhs->lflag & LFLAG_LAST) < (rhs->lflag & LFLAG_LAST)) {
123,539,703✔
164
    return -1;
45,448,929✔
165
  } else if ((lhs->lflag & LFLAG_LAST) > (rhs->lflag & LFLAG_LAST)) {
78,091,151✔
166
    return 1;
74,145,441✔
167
  }
168

169
  return 0;
3,945,710✔
170
}
171

172
/*
 * Open the RocksDB instance that backs the last/last_row cache and set up
 * everything stored in pTsdb->rCache (options objects, write batch, write
 * batch mutex, schema-cache bookkeeping and the update-context array).
 *
 * All resources are created into locals and only published into
 * pTsdb->rCache once every step has succeeded, so a failed open never leaves
 * rCache pointing at destroyed objects. On failure each resource created so
 * far is released in reverse order of creation.
 *
 * Returns 0 on success, otherwise an error code. Failures of the RocksDB
 * create/open calls are reported as TSDB_CODE_OUT_OF_MEMORY. When
 * USE_ROCKSDB is not defined the function does nothing and returns 0.
 */
static int32_t tsdbOpenRocksCache(STsdb *pTsdb) {
  int32_t code = 0, lino = 0;
#ifdef USE_ROCKSDB
  rocksdb_block_based_table_options_t *tableoptions = NULL;
  rocksdb_options_t                   *options = NULL;
  rocksdb_writeoptions_t              *writeoptions = NULL;
  rocksdb_readoptions_t               *readoptions = NULL;
  rocksdb_flushoptions_t              *flushoptions = NULL;
  rocksdb_writebatch_t                *writebatch = NULL;
  rocksdb_t                           *db = NULL;
  SArray                              *ctxArray = NULL;
  char                                *err = NULL;
  char                                 cachePath[TSDB_FILENAME_LEN] = {0};

  rocksdb_comparator_t *cmp = rocksdb_comparator_create(NULL, myCmpDestroy, myCmp, myCmpName);
  if (NULL == cmp) {
    TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
  }

  tableoptions = rocksdb_block_based_options_create();
  if (NULL == tableoptions) {
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err);
  }

  options = rocksdb_options_create();
  if (NULL == options) {
    // tableoptions already exists here and must be released as well.
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err1);
  }

  rocksdb_options_set_create_if_missing(options, 1);
  rocksdb_options_set_comparator(options, cmp);
  rocksdb_options_set_block_based_table_factory(options, tableoptions);
  rocksdb_options_set_info_log_level(options, 2);  // WARN_LEVEL

  writeoptions = rocksdb_writeoptions_create();
  if (NULL == writeoptions) {
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err2);
  }
  rocksdb_writeoptions_disable_WAL(writeoptions, 1);

  readoptions = rocksdb_readoptions_create();
  if (NULL == readoptions) {
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err3);
  }

  tsdbGetRocksPath(pTsdb, cachePath);

  db = rocksdb_open(options, cachePath, &err);
  if (NULL == db) {
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
              err ? err : "unknown error");
    if (err) {
      rocksdb_free(err);
    }

    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err4);
  }

  flushoptions = rocksdb_flushoptions_create();
  if (NULL == flushoptions) {
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err5);
  }

  writebatch = rocksdb_writebatch_create();
  if (NULL == writebatch) {
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err6);
  }

  TAOS_CHECK_GOTO(taosThreadMutexInit(&pTsdb->rCache.writeBatchMutex, NULL), &lino, _err7);

  ctxArray = taosArrayInit(16, sizeof(SLastUpdateCtx));
  if (!ctxArray) {
    TAOS_CHECK_GOTO(terrno, &lino, _err8);
  }

  // Everything succeeded: publish the resources.
  pTsdb->rCache.tableoptions = tableoptions;
  pTsdb->rCache.writebatch = writebatch;
  pTsdb->rCache.my_comparator = cmp;
  pTsdb->rCache.options = options;
  pTsdb->rCache.writeoptions = writeoptions;
  pTsdb->rCache.readoptions = readoptions;
  pTsdb->rCache.flushoptions = flushoptions;
  pTsdb->rCache.db = db;
  pTsdb->rCache.sver = -1;
  pTsdb->rCache.suid = -1;
  pTsdb->rCache.uid = -1;
  pTsdb->rCache.pTSchema = NULL;
  pTsdb->rCache.ctxArray = ctxArray;

  TAOS_RETURN(code);

_err8:
  (void)taosThreadMutexDestroy(&pTsdb->rCache.writeBatchMutex);
_err7:
  rocksdb_writebatch_destroy(writebatch);
_err6:
  rocksdb_flushoptions_destroy(flushoptions);
_err5:
  // Close the handle opened above; pTsdb->rCache.db is not assigned yet.
  rocksdb_close(db);
_err4:
  rocksdb_readoptions_destroy(readoptions);
_err3:
  rocksdb_writeoptions_destroy(writeoptions);
_err2:
  rocksdb_options_destroy(options);
_err1:
  rocksdb_block_based_options_destroy(tableoptions);
_err:
  rocksdb_comparator_destroy(cmp);
#endif
  TAOS_RETURN(code);
}
263

264
/*
 * Close the RocksDB cache instance and release everything held in
 * pTsdb->rCache: the database handle is closed first, then the write-batch
 * mutex, the options/batch objects, the comparator, the cached schema and
 * the update-context array. No-op when USE_ROCKSDB is not defined.
 */
static void tsdbCloseRocksCache(STsdb *pTsdb) {
#ifdef USE_ROCKSDB
  SRocksCache *pRCache = &pTsdb->rCache;

  rocksdb_close(pRCache->db);
  (void)taosThreadMutexDestroy(&pRCache->writeBatchMutex);

  rocksdb_flushoptions_destroy(pRCache->flushoptions);
  rocksdb_writebatch_destroy(pRCache->writebatch);
  rocksdb_readoptions_destroy(pRCache->readoptions);
  rocksdb_writeoptions_destroy(pRCache->writeoptions);
  rocksdb_options_destroy(pRCache->options);
  rocksdb_block_based_options_destroy(pRCache->tableoptions);
  rocksdb_comparator_destroy(pRCache->my_comparator);

  taosMemoryFree(pRCache->pTSchema);
  taosArrayDestroy(pRCache->ctxArray);
#endif
}
4,873,068✔
279

280
/*
 * Write the pending RocksDB write batch when it is due.
 *
 * The batch is written when it holds at least ROCKS_BATCH_SIZE entries, or
 * when `force` is set and it is non-empty. A write error is logged and the
 * batch is cleared in either case. No-op when USE_ROCKSDB is not defined.
 */
static void rocksMayWrite(STsdb *pTsdb, bool force) {
#ifdef USE_ROCKSDB
  rocksdb_writebatch_t *batch = pTsdb->rCache.writebatch;

  int  count = rocksdb_writebatch_count(batch);
  bool due = (force && count > 0) || count >= ROCKS_BATCH_SIZE;
  if (!due) {
    return;
  }

  char *errMsg = NULL;
  rocksdb_write(pTsdb->rCache.db, pTsdb->rCache.writeoptions, batch, &errMsg);
  if (errMsg != NULL) {
    tsdbError("vgId:%d, %s failed at line %d, count: %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, count,
              errMsg);
    rocksdb_free(errMsg);
  }

  rocksdb_writebatch_clear(batch);
#endif
}
66,159,954✔
299

300
typedef struct {
301
  TSKEY  ts;
302
  int8_t dirty;
303
  struct {
304
    int16_t cid;
305
    int8_t  type;
306
    int8_t  flag;
307
    union {
308
      int64_t val;
309
      struct {
310
        uint32_t nData;
311
        uint8_t *pData;
312
      };
313
    } value;
314
  } colVal;
315
} SLastColV0;
316

317
/*
 * Decode the version-0 header at `value` into pLastCol.
 *
 * numOfPKs is set to 0 and cacheStatus to TSDB_LAST_CACHE_VALID. For
 * variable-length and DECIMAL values pData is pointed at the bytes that
 * follow the header inside `value` (no copy is made).
 *
 * Returns the number of bytes the version-0 part occupies. No bounds
 * checking is done here; the caller must validate the result against the
 * buffer size.
 */
static int32_t tsdbCacheDeserializeV0(char const *value, SLastCol *pLastCol) {
  SLastColV0 *pV0 = (SLastColV0 *)value;
  uint8_t    *payload = (uint8_t *)(pV0 + 1);

  pLastCol->rowKey.ts = pV0->ts;
  pLastCol->rowKey.numOfPKs = 0;
  pLastCol->dirty = pV0->dirty;
  pLastCol->colVal.cid = pV0->colVal.cid;
  pLastCol->colVal.flag = pV0->colVal.flag;
  pLastCol->colVal.value.type = pV0->colVal.type;

  pLastCol->cacheStatus = TSDB_LAST_CACHE_VALID;

  if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) {
    pLastCol->colVal.value.nData = pV0->colVal.value.nData;
    // An empty variable-length value carries no payload pointer.
    pLastCol->colVal.value.pData = (pLastCol->colVal.value.nData > 0) ? payload : NULL;
    return sizeof(SLastColV0) + pV0->colVal.value.nData;
  }

  if (pLastCol->colVal.value.type == TSDB_DATA_TYPE_DECIMAL) {
    pLastCol->colVal.value.nData = pV0->colVal.value.nData;
    pLastCol->colVal.value.pData = payload;
    return sizeof(SLastColV0) + pV0->colVal.value.nData;
  }

  pLastCol->colVal.value.val = pV0->colVal.value.val;
  return sizeof(SLastColV0);
}
345

346
/*
 * Decode a serialized cache entry into a newly allocated SLastCol.
 *
 * Layout: version-0 header (+ value payload), then optionally a version
 * byte, a numOfPKs byte, numOfPKs SValue records (each followed by its
 * payload when variable-length), and, for version >= LAST_COL_VERSION_2,
 * one cacheStatus byte. A buffer that ends right after the version-0 part
 * is accepted as a version-0 entry.
 *
 * @param value      serialized bytes; must not be NULL
 * @param size       number of valid bytes at `value`
 * @param ppLastCol  receives the decoded entry on success; must not be NULL
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA for a NULL argument,
 *         terrno when allocation fails, or TSDB_CODE_INVALID_DATA_FMT when
 *         the buffer is shorter than its contents claim.
 *
 * The variable-length pData pointers of the result point INTO `value`; they
 * are not copied, so `value` must stay alive for as long as they are used.
 * The caller owns *ppLastCol.
 */
int32_t tsdbCacheDeserialize(char const *value, size_t size, SLastCol **ppLastCol) {
  if (!value || !ppLastCol) {
    return TSDB_CODE_INVALID_PARA;
  }

  // The version-0 parser reads a full SLastColV0 unconditionally, so reject
  // anything shorter before touching the buffer.
  if (size < sizeof(SLastColV0)) {
    TAOS_RETURN(TSDB_CODE_INVALID_DATA_FMT);
  }

  SLastCol *pLastCol = taosMemoryCalloc(1, sizeof(SLastCol));
  if (NULL == pLastCol) {
    return terrno;
  }

  int8_t  version = 0;
  size_t  offset = 0;
  int32_t v0Len = tsdbCacheDeserializeV0(value, pLastCol);

  // A negative length means the on-disk nData overflowed int32_t.
  if (v0Len < 0 || (size_t)v0Len > size) {
    goto _invalid;
  }
  offset = (size_t)v0Len;

  if (offset == size) {
    // version 0: nothing follows the header/payload
    *ppLastCol = pLastCol;

    TAOS_RETURN(TSDB_CODE_SUCCESS);
  }

  // From here on `offset <= size` always holds, so `size - offset` cannot
  // underflow.

  // version
  if (size - offset < sizeof(int8_t)) {
    goto _invalid;
  }
  version = *(int8_t *)(value + offset);
  offset += sizeof(int8_t);

  // numOfPKs
  if (size - offset < sizeof(uint8_t)) {
    goto _invalid;
  }
  pLastCol->rowKey.numOfPKs = *(uint8_t *)(value + offset);
  offset += sizeof(uint8_t);

  if (pLastCol->rowKey.numOfPKs > TD_MAX_PK_COLS) {
    goto _invalid;
  }

  // pks
  for (int32_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) {
    SValue *pPk = &pLastCol->rowKey.pks[i];

    if (size - offset < sizeof(SValue)) {
      goto _invalid;
    }
    // The record sits at an arbitrary byte offset; copy it instead of
    // dereferencing a possibly misaligned SValue pointer.
    memcpy(pPk, value + offset, sizeof(SValue));
    offset += sizeof(SValue);

    if (IS_VAR_DATA_TYPE(pPk->type)) {
      pPk->pData = NULL;
      if (pPk->nData > 0) {
        // Compare against the remaining bytes so a huge nData cannot wrap.
        if (pPk->nData > size - offset) {
          goto _invalid;
        }
        pPk->pData = (uint8_t *)value + offset;
        offset += pPk->nData;
      }
    }
  }

  if (version >= LAST_COL_VERSION_2) {
    if (size - offset < sizeof(uint8_t)) {
      goto _invalid;
    }
    pLastCol->cacheStatus = *(uint8_t *)(value + offset);
    offset += sizeof(uint8_t);
  }

  *ppLastCol = pLastCol;

  TAOS_RETURN(TSDB_CODE_SUCCESS);

_invalid:
  taosMemoryFreeClear(pLastCol);
  TAOS_RETURN(TSDB_CODE_INVALID_DATA_FMT);
}
433

434
/*
435
typedef struct {
436
  SLastColV0 lastColV0;
437
  char       colData[];
438
  int8_t     version;
439
  uint8_t    numOfPKs;
440
  SValue     pks[0];
441
  char       pk0Data[];
442
  SValue     pks[1];
443
  char       pk1Data[];
444
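  ...
  uint8_t    cacheStatus;  // trailing byte written after the last pk; read back only when version >= LAST_COL_VERSION_2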
445
} SLastColDisk;
446
*/
447
/*
 * Encode the version-0 part of pLastCol into the buffer at `value`.
 *
 * Writes an SLastColV0 header and, for variable-length and DECIMAL values,
 * the nData payload bytes right after it. The buffer is written through
 * despite the const-qualified parameter type; the caller must supply
 * writable storage of sufficient size.
 *
 * Returns the number of bytes written.
 */
static int32_t tsdbCacheSerializeV0(char const *value, SLastCol *pLastCol) {
  SLastColV0 *pV0 = (SLastColV0 *)value;

  pV0->ts = pLastCol->rowKey.ts;
  pV0->dirty = pLastCol->dirty;
  pV0->colVal.cid = pLastCol->colVal.cid;
  pV0->colVal.flag = pLastCol->colVal.flag;
  pV0->colVal.type = pLastCol->colVal.value.type;

  // Variable-length and DECIMAL values are stored the same way: length in
  // the header, bytes after it.
  if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type) || pLastCol->colVal.value.type == TSDB_DATA_TYPE_DECIMAL) {
    pV0->colVal.value.nData = pLastCol->colVal.value.nData;
    if (pLastCol->colVal.value.nData > 0) {
      memcpy(pV0 + 1, pLastCol->colVal.value.pData, pLastCol->colVal.value.nData);
    }
    return sizeof(SLastColV0) + pLastCol->colVal.value.nData;
  }

  pV0->colVal.value.val = pLastCol->colVal.value.val;
  return sizeof(SLastColV0);
}
474

475
/*
 * Serialize pLastCol into a newly allocated buffer.
 *
 * Output layout: version-0 header (+ value payload), a version byte
 * (LAST_COL_VERSION), a numOfPKs byte, each primary-key SValue followed by
 * its payload when variable-length, and a final cacheStatus byte.
 *
 * @param pLastCol  entry to serialize
 * @param value     receives the allocated buffer (NULL if allocation fails)
 * @param size      receives the computed buffer size
 * @return TSDB_CODE_SUCCESS, or terrno when the allocation fails.
 *
 * The buffer comes from taosMemoryMalloc() and is owned by the caller.
 */
static int32_t tsdbCacheSerialize(SLastCol *pLastCol, char **value, size_t *size) {
  // Pass 1: compute the buffer size.
  size_t total = sizeof(SLastColV0);
  if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) {
    total += pLastCol->colVal.value.nData;
  }
  if (pLastCol->colVal.value.type == TSDB_DATA_TYPE_DECIMAL) {
    total += DECIMAL128_BYTES;
  }
  total += sizeof(uint8_t) + sizeof(uint8_t) + sizeof(uint8_t);  // version + numOfPKs + cacheStatus

  for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) {
    total += sizeof(SValue);
    if (IS_VAR_DATA_TYPE(pLastCol->rowKey.pks[i].type)) {
      total += pLastCol->rowKey.pks[i].nData;
    }
  }
  *size = total;

  char *buf = taosMemoryMalloc(total);
  *value = buf;
  if (NULL == buf) {
    TAOS_RETURN(terrno);
  }

  // Pass 2: fill the buffer.
  int32_t offset = tsdbCacheSerializeV0(buf, pLastCol);

  // version
  *(uint8_t *)(buf + offset) = LAST_COL_VERSION;
  offset++;

  // numOfPKs
  *(uint8_t *)(buf + offset) = pLastCol->rowKey.numOfPKs;
  offset++;

  // pks
  for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) {
    *(SValue *)(buf + offset) = pLastCol->rowKey.pks[i];
    offset += sizeof(SValue);

    if (!IS_VAR_DATA_TYPE(pLastCol->rowKey.pks[i].type)) {
      continue;
    }
    if (pLastCol->rowKey.pks[i].nData > 0) {
      memcpy(buf + offset, pLastCol->rowKey.pks[i].pData, pLastCol->rowKey.pks[i].nData);
    }
    offset += pLastCol->rowKey.pks[i].nData;
  }

  // cacheStatus
  *(uint8_t *)(buf + offset) = pLastCol->cacheStatus;

  TAOS_RETURN(TSDB_CODE_SUCCESS);
}
523

524
static int32_t tsdbCachePutToRocksdb(STsdb *pTsdb, SLastKey *pLastKey, SLastCol *pLastCol);
525

526
int tsdbCacheFlushDirty(const void *key, size_t klen, void *value, void *ud) {
67,193,026✔
527
  SLastCol *pLastCol = (SLastCol *)value;
67,193,026✔
528

529
  if (pLastCol->dirty) {
67,193,026✔
530
    STsdb *pTsdb = (STsdb *)ud;
58,609,858✔
531

532
    int32_t code = tsdbCachePutToRocksdb(pTsdb, (SLastKey *)key, pLastCol);
58,609,858✔
533
    if (code) {
58,609,933✔
534
      tsdbError("tsdb/cache: vgId:%d, flush dirty lru failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code));
×
535
      return code;
×
536
    }
537

538
    pLastCol->dirty = 0;
58,609,933✔
539

540
    rocksMayWrite(pTsdb, false);
58,609,933✔
541
  }
542

543
  return 0;
67,199,609✔
544
}
545

546
/*
 * Decide whether `key` is covered by the delete skyline.
 *
 * *iSkyline is a cursor into pSkyline; the pair (cursor - 1, cursor) is
 * examined and the cursor is moved towards the front while the key lies
 * below the current pair. The cursor never drops below 1 inside the loop, so
 * it can be reused for subsequent keys. Returns false when the cursor is
 * <= 0 on entry.
 */
static bool tsdbKeyDeleted(TSDBKEY *key, SArray *pSkyline, int64_t *iSkyline) {
  while (*iSkyline > 0) {
    TSDBKEY *pBack = (TSDBKEY *)taosArrayGet(pSkyline, *iSkyline);
    TSDBKEY *pFront = (TSDBKEY *)taosArrayGet(pSkyline, *iSkyline - 1);

    // Key is newer than the current pair: not deleted.
    if (key->ts > pBack->ts) {
      return false;
    }

    // key->ts <= pBack->ts holds here, so only the front bound is left to
    // test for "inside the pair".
    bool inside = key->ts >= pFront->ts;
    if (inside &&
        (key->version <= pFront->version || (key->ts == pBack->ts && key->version <= pBack->version))) {
      return true;
    }

    // Not deleted by this pair: step to the previous pair, if there is one.
    if (*iSkyline <= 1) {
      return false;
    }
    --*iSkyline;
  }

  return false;
}
576

577
// Get next non-deleted row from imem
578
/*
 * Advance the table-data iterator by one row and return that row.
 *
 * Returns NULL when the iterator cannot advance, or when the row it lands
 * on is covered by the delete skyline (the iterator is advanced only once
 * per call).
 */
static TSDBROW *tsdbImemGetNextRow(STbDataIter *pTbIter, SArray *pSkyline, int64_t *piSkyline) {
  if (!tsdbTbDataIterNext(pTbIter)) {
    return NULL;
  }

  TSDBROW *pRow = tsdbTbDataIterGet(pTbIter);
  TSDBKEY  key = TSDBROW_KEY(pRow);

  return tsdbKeyDeleted(&key, pSkyline, piSkyline) ? NULL : pRow;
}
592

593
// Get first non-deleted row from imem
594
/*
 * Open the iterator on pIMem and return the first usable row.
 *
 * Returns NULL when the iterator yields no row at all. If the first row is
 * covered by the delete skyline, the result of one tsdbImemGetNextRow() step
 * is returned instead. The `imem` parameter is not used.
 */
static TSDBROW *tsdbImemGetFirstRow(SMemTable *imem, STbData *pIMem, STbDataIter *pTbIter, SArray *pSkyline,
                                    int64_t *piSkyline) {
  tsdbTbDataIterOpen(pIMem, NULL, 1, pTbIter);

  TSDBROW *pFirst = tsdbTbDataIterGet(pTbIter);
  if (NULL == pFirst) {
    return NULL;
  }

  TSDBKEY firstKey = TSDBROW_KEY(pFirst);
  if (!tsdbKeyDeleted(&firstKey, pSkyline, piSkyline)) {
    return pFirst;
  }

  // The first row is deleted: fall back to the next row.
  return tsdbImemGetNextRow(pTbIter, pSkyline, piSkyline);
}
614

615
/*
 * Invalidate the schema cached in pTsdb->rCache when a newer schema version
 * arrives for the table it was loaded for.
 *
 * Nothing happens when no schema is cached or when `sver` is not greater
 * than the cached version. Otherwise the cached version and the matching id
 * (suid when suid > 0, uid when suid == 0) are reset to -1. The schema
 * object itself is not freed here.
 */
void tsdbCacheInvalidateSchema(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int32_t sver) {
  SRocksCache *pRCache = &pTsdb->rCache;

  bool hasNewerSchema = pRCache->pTSchema && sver > pRCache->sver;
  if (!hasNewerSchema) {
    return;
  }

  if (suid > 0) {
    if (suid == pRCache->suid) {
      pRCache->sver = -1;
      pRCache->suid = -1;
    }
  } else if (suid == 0) {
    if (uid == pRCache->uid) {
      pRCache->sver = -1;
      pRCache->uid = -1;
    }
  }
}
628

629
/*
 * Make pTsdb->rCache.pTSchema hold the schema of (suid, uid) at version
 * `sver`.
 *
 * The cached schema is reused when its version matches and it belongs to the
 * same super table (suid > 0) or the same table (suid == 0). Otherwise the
 * old schema is destroyed and a new one is loaded from the meta store.
 *
 * Returns 0 on a cache hit, otherwise the result of metaGetTbTSchemaEx().
 */
static int32_t tsdbUpdateSkm(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int32_t sver) {
  SRocksCache *pRCache = &pTsdb->rCache;
  if (pRCache->pTSchema && sver == pRCache->sver) {
    if (suid > 0 && suid == pRCache->suid) {
      return 0;
    }
    if (suid == 0 && uid == pRCache->uid) {
      return 0;
    }
  }

  pRCache->suid = suid;
  pRCache->uid = uid;
  pRCache->sver = sver;
  tDestroyTSchema(pRCache->pTSchema);
  // Clear the pointer before reloading. If the reload fails, a stale pointer
  // would make the next call with the same ids/version report a cache hit on
  // freed memory, and tsdbCloseRocksCache() would free it a second time.
  pRCache->pTSchema = NULL;
  return metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pRCache->pTSchema);
}
646

647
static int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SArray *updCtxArray);
648

649
/*
 * Collect last_row/last update contexts for one table from the memtable and
 * apply them with tsdbCacheUpdate().
 *
 * Steps:
 *  1. Load the table's tomb data and, if there is any, build a delete
 *     skyline from it.
 *  2. Take the first usable row: every column yields a LFLAG_LAST_ROW
 *     context; columns holding a value also yield a LFLAG_LAST context,
 *     the others are remembered by column id.
 *  3. Walk further rows while remembered columns remain, emitting a
 *     LFLAG_LAST context for each remembered column that has a value.
 *
 * The shared pTsdb->rCache.ctxArray is used as scratch space and is cleared
 * before returning. Returns 0 on success (including when there is no usable
 * row) or the first error code encountered.
 */
int32_t tsdbLoadFromImem(SMemTable *imem, int64_t suid, int64_t uid) {
  int32_t      code = 0;
  int32_t      lino = 0;
  STsdb       *pTsdb = imem->pTsdb;
  SArray      *ctxArray = pTsdb->rCache.ctxArray;
  SArray      *pMemDelData = NULL;
  SArray      *pSkyline = NULL;
  SSHashObj   *iColHash = NULL;  // ids of columns still lacking a "last" value
  STSchema    *pTSchema = NULL;
  TSDBROW     *pMemRow = NULL;
  STbDataIter  tbIter = {0};
  STSDBRowIter iter = {0};
  STsdbRowKey  firstRowKey = {0};
  int64_t      iSkyline = 0;
  int32_t      sver;
  int32_t      nCol;
  int32_t      iCol;
  size_t       delSize;

  STbData *pIMem = tsdbGetTbDataFromMemTable(imem, suid, uid);

  // load imem tomb data and build skyline
  TAOS_CHECK_GOTO(loadMemTombData(&pMemDelData, NULL, pIMem, INT64_MAX), &lino, _exit);

  delSize = TARRAY_SIZE(pMemDelData);
  if (delSize > 0) {
    pSkyline = taosArrayInit(32, sizeof(TSDBKEY));
    if (!pSkyline) {
      TAOS_CHECK_EXIT(terrno);
    }

    TAOS_CHECK_EXIT(tsdbBuildDeleteSkyline(pMemDelData, 0, (int32_t)(delSize - 1), pSkyline));
    iSkyline = taosArrayGetSize(pSkyline) - 1;
  }

  pMemRow = tsdbImemGetFirstRow(imem, pIMem, &tbIter, pSkyline, &iSkyline);
  if (!pMemRow) {
    goto _exit;
  }

  // first row: last_row/last values for every column, remember null columns
  sver = TSDBROW_SVERSION(pMemRow);
  TAOS_CHECK_GOTO(tsdbUpdateSkm(pTsdb, suid, uid, sver), &lino, _exit);
  pTSchema = pTsdb->rCache.pTSchema;
  nCol = pTSchema->numOfCols;

  tsdbRowGetKey(pMemRow, &firstRowKey);

  TAOS_CHECK_EXIT(tsdbRowIterOpen(&iter, pMemRow, pTSchema));

  iCol = 0;
  for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal && iCol < nCol; pColVal = tsdbRowIterNext(&iter), iCol++) {
    SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST_ROW, .tsdbRowKey = firstRowKey, .colVal = *pColVal};
    if (!taosArrayPush(ctxArray, &updateCtx)) {
      TAOS_CHECK_EXIT(terrno);
    }

    if (COL_VAL_IS_VALUE(pColVal)) {
      // The same value also serves as this column's "last" value.
      updateCtx.lflag = LFLAG_LAST;
      if (!taosArrayPush(ctxArray, &updateCtx)) {
        TAOS_CHECK_EXIT(terrno);
      }
      continue;
    }

    // No value here: remember the column so later rows can fill it.
    if (!iColHash) {
      iColHash = tSimpleHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT));
      if (iColHash == NULL) {
        TAOS_CHECK_EXIT(terrno);
      }
    }

    if (tSimpleHashPut(iColHash, &pColVal->cid, sizeof(pColVal->cid), &pColVal->cid, sizeof(pColVal->cid))) {
      TAOS_CHECK_EXIT(terrno);
    }
  }
  tsdbRowClose(&iter);

  // continue with following rows to fill the remembered columns
  for (pMemRow = tsdbImemGetNextRow(&tbIter, pSkyline, &iSkyline); pMemRow;
       pMemRow = tsdbImemGetNextRow(&tbIter, pSkyline, &iSkyline)) {
    if (tSimpleHashGetSize(iColHash) == 0) {
      break;
    }

    sver = TSDBROW_SVERSION(pMemRow);
    TAOS_CHECK_EXIT(tsdbUpdateSkm(pTsdb, suid, uid, sver));
    pTSchema = pTsdb->rCache.pTSchema;

    STsdbRowKey rowKey = {0};
    tsdbRowGetKey(pMemRow, &rowKey);

    TAOS_CHECK_EXIT(tsdbRowIterOpen(&iter, pMemRow, pTSchema));

    iCol = 0;
    for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal && iCol < nCol; pColVal = tsdbRowIterNext(&iter), iCol++) {
      if (tSimpleHashGet(iColHash, &pColVal->cid, sizeof(pColVal->cid)) && COL_VAL_IS_VALUE(pColVal)) {
        SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST, .tsdbRowKey = rowKey, .colVal = *pColVal};
        if (!taosArrayPush(ctxArray, &updateCtx)) {
          TAOS_CHECK_EXIT(terrno);
        }

        TAOS_CHECK_EXIT(tSimpleHashRemove(iColHash, &pColVal->cid, sizeof(pColVal->cid)));
      }
    }
    tsdbRowClose(&iter);
  }

  TAOS_CHECK_GOTO(tsdbCacheUpdate(pTsdb, suid, uid, ctxArray), &lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(pTsdb->pVnode), __func__, __FILE__, lino, tstrerror(code));

    tsdbRowClose(&iter);
  }

  // ctxArray is shared through rCache: clear it, do not destroy it
  taosArrayClear(ctxArray);
  tSimpleHashCleanup(iColHash);
  if (pMemDelData) {
    taosArrayDestroy(pMemDelData);
  }
  if (pSkyline) {
    taosArrayDestroy(pSkyline);
  }

  TAOS_RETURN(code);
}
778

779
// Feeds the immutable memtable of pTsdb into the cache through
// tsdbMemTableSaveToCache(), using tsdbLoadFromImem as the per-table callback.
// Returns 0 without doing anything when there is no tsdb, no immutable
// memtable, or the memtable reports zero rows or zero table-data entries.
static int32_t tsdbCacheUpdateFromIMem(STsdb *pTsdb) {
  if (pTsdb == NULL || pTsdb->imem == NULL) {
    return 0;
  }

  SMemTable *pMem = pTsdb->imem;
  int32_t    numTbData = pMem->nTbData;
  int64_t    numRows = pMem->nRow;
  int64_t    numDels = pMem->nDel;  // only reported in the success log below

  if (numTbData == 0 || numRows == 0) {
    return 0;
  }

  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tsdbMemTableSaveToCache(pMem, tsdbLoadFromImem));

_exit:
  if (code == 0) {
    tsdbInfo("vgId:%d %s done, nRow:%" PRId64 " nDel:%" PRId64, TD_VID(pTsdb->pVnode), __func__, numRows, numDels);
  } else {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(pTsdb->pVnode), __func__, __FILE__, lino, tstrerror(code));
  }

  TAOS_RETURN(code);
}
803

804
// Commits the last/last_row cache of pTsdb.
//
// Steps, in order:
//   1. when tsUpdateCacheBatch is set, call tsdbCacheUpdateFromIMem() and bail
//      out on failure;
//   2. under lruMutex, apply tsdbCacheFlushDirty to every LRU entry;
//   3. in RocksDB builds, still under the mutex, call rocksMayWrite(pTsdb, true)
//      and rocksdb_flush().
// A RocksDB flush error is logged after the mutex is released and reported as
// TSDB_CODE_FAILED.
int32_t tsdbCacheCommit(STsdb *pTsdb) {
  int32_t code = 0;

  if (tsUpdateCacheBatch) {
    code = tsdbCacheUpdateFromIMem(pTsdb);
    if (code != 0) {
      tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));

      TAOS_RETURN(code);
    }
  }

  char      *err = NULL;
  SLRUCache *pLru = pTsdb->lruCache;

  (void)taosThreadMutexLock(&pTsdb->lruMutex);

  taosLRUCacheApply(pLru, tsdbCacheFlushDirty, pTsdb);

#ifdef USE_ROCKSDB
  rocksMayWrite(pTsdb, true);
  rocksdb_flush(pTsdb->rCache.db, pTsdb->rCache.flushoptions, &err);
#endif

  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);

#ifdef USE_ROCKSDB
  if (err != NULL) {
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err);
    rocksdb_free(err);
    code = TSDB_CODE_FAILED;
  }
#endif

  TAOS_RETURN(code);
}
843

844
// Replaces the buffer referenced by a variable-length SValue with a freshly
// allocated private copy of the same nData bytes. A zero-length value gets a
// NULL pData. Values of non-variable-length types are left untouched.
// Returns terrno when the allocation fails (pValue is not modified then).
static int32_t reallocVarDataVal(SValue *pValue) {
  if (!IS_VAR_DATA_TYPE(pValue->type)) {
    TAOS_RETURN(TSDB_CODE_SUCCESS);
  }

  uint8_t *pSrc = pValue->pData;
  uint32_t len = pValue->nData;

  if (len == 0) {
    pValue->pData = NULL;
    TAOS_RETURN(TSDB_CODE_SUCCESS);
  }

  uint8_t *pCopy = taosMemoryMalloc(len);
  if (pCopy == NULL) {
    TAOS_RETURN(terrno);
  }

  pValue->pData = pCopy;
  (void)memcpy(pValue->pData, pSrc, len);

  TAOS_RETURN(TSDB_CODE_SUCCESS);
}
862

863
// Convenience wrapper: gives the value embedded in a column value its own
// copy of any variable-length payload (see reallocVarDataVal).
static int32_t reallocVarData(SColVal *pColVal) {
  SValue *pValue = &pColVal->value;
  return reallocVarDataVal(pValue);
}
2,751,765✔
864

865
// realloc pk data and col data.
866
static int32_t tsdbCacheReallocSLastCol(SLastCol *pCol, size_t *pCharge) {
126,162,132✔
867
  int32_t code = TSDB_CODE_SUCCESS, lino = 0;
126,162,132✔
868
  size_t  charge = sizeof(SLastCol);
126,162,132✔
869

870
  int8_t i = 0;
126,162,132✔
871
  for (; i < pCol->rowKey.numOfPKs; i++) {
133,644,900✔
872
    SValue *pValue = &pCol->rowKey.pks[i];
7,482,768✔
873
    if (IS_VAR_DATA_TYPE(pValue->type)) {
7,482,768✔
874
      TAOS_CHECK_EXIT(reallocVarDataVal(pValue));
2,254,922✔
875
      charge += pValue->nData;
2,254,922✔
876
    }
877
  }
878

879
  if (IS_VAR_DATA_TYPE(pCol->colVal.value.type)) {
126,161,005✔
880
    TAOS_CHECK_EXIT(reallocVarData(&pCol->colVal));
2,757,803✔
881
    charge += pCol->colVal.value.nData;
2,751,278✔
882
  }
883

884
  if (pCol->colVal.value.type == TSDB_DATA_TYPE_DECIMAL) {
126,170,697✔
885
    if (pCol->colVal.value.nData > 0) {
937,168✔
886
      void *p = taosMemoryMalloc(pCol->colVal.value.nData);
463,782✔
887
      if (!p) TAOS_CHECK_EXIT(terrno);
463,782✔
888
      (void)memcpy(p, pCol->colVal.value.pData, pCol->colVal.value.nData);
463,782✔
889
      pCol->colVal.value.pData = p;
463,782✔
890
    }else {
891
      pCol->colVal.value.pData = NULL;
473,386✔
892
    }
893
    charge += pCol->colVal.value.nData;
937,168✔
894
  }
895

896
  if (pCharge) {
126,168,730✔
897
    *pCharge = charge;
114,807,301✔
898
  }
899

900
_exit:
11,361,429✔
901
  if (TSDB_CODE_SUCCESS != code) {
126,168,730✔
902
    for (int8_t j = 0; j < i; j++) {
×
903
      if (IS_VAR_DATA_TYPE(pCol->rowKey.pks[j].type)) {
×
904
        taosMemoryFree(pCol->rowKey.pks[j].pData);
×
905
      }
906
    }
907

908
    (void)memset(pCol, 0, sizeof(SLastCol));
×
909
  }
910

911
  TAOS_RETURN(code);
126,168,730✔
912
}
913

914
void tsdbCacheFreeSLastColItem(void *pItem) {
12,370,105✔
915
  SLastCol *pCol = (SLastCol *)pItem;
12,370,105✔
916
  for (int i = 0; i < pCol->rowKey.numOfPKs; i++) {
18,053,562✔
917
    if (IS_VAR_DATA_TYPE(pCol->rowKey.pks[i].type)) {
5,684,184✔
918
      taosMemoryFree(pCol->rowKey.pks[i].pData);
1,663,152✔
919
    }
920
  }
921

922
  if ((IS_VAR_DATA_TYPE(pCol->colVal.value.type) || pCol->colVal.value.type == TSDB_DATA_TYPE_DECIMAL) &&
12,370,930✔
923
      pCol->colVal.value.pData) {
1,730,028✔
924
    taosMemoryFree(pCol->colVal.value.pData);
1,323,387✔
925
  }
926
}
12,370,930✔
927

928
static void tsdbCacheDeleter(const void *key, size_t klen, void *value, void *ud) {
114,803,719✔
929
  SLastCol *pLastCol = (SLastCol *)value;
114,803,719✔
930

931
  if (pLastCol->dirty) {
114,803,719✔
932
    if (tsdbCacheFlushDirty(key, klen, pLastCol, ud) != 0) {
2,334,390✔
933
      STsdb *pTsdb = (STsdb *)ud;
×
934
      tsdbTrace("tsdb/cache: vgId:%d, flush cache %s failed at line %d.", TD_VID(pTsdb->pVnode), __func__, __LINE__);
×
935
    }
936
  }
937

938
  for (uint8_t i = 0; i < pLastCol->rowKey.numOfPKs; ++i) {
117,208,445✔
939
    SValue *pValue = &pLastCol->rowKey.pks[i];
2,422,840✔
940
    if (IS_VAR_DATA_TYPE(pValue->type)) {
2,422,840✔
941
      taosMemoryFree(pValue->pData);
830,170✔
942
    }
943
  }
944

945
  if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type) ||
114,787,119✔
946
      pLastCol->colVal.value.type == TSDB_DATA_TYPE_DECIMAL /* && pLastCol->colVal.value.nData > 0*/) {
113,417,081✔
947
    taosMemoryFree(pLastCol->colVal.value.pData);
2,128,557✔
948
  }
949

950
  taosMemoryFree(value);
114,792,397✔
951
}
114,272,328✔
952

953
static void tsdbCacheOverWriter(const void *key, size_t klen, void *value, void *ud) {
55,044,204✔
954
  SLastCol *pLastCol = (SLastCol *)value;
55,044,204✔
955
  pLastCol->dirty = 0;
55,044,204✔
956
}
55,044,581✔
957

958
static int32_t tsdbCachePutToLRU(STsdb *pTsdb, SLastKey *pLastKey, SLastCol *pLastCol, int8_t dirty);
959

960
// Inserts a placeholder entry for column `cid` of table `uid` into the LRU
// cache: row key timestamp TSKEY_MIN with no primary keys, a NONE column
// value of type col_type, marked dirty and TSDB_LAST_CACHE_VALID.
// `lflag` selects which cache key (last / last_row) the entry is stored under.
static int32_t tsdbCacheNewTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, int8_t col_type, int8_t lflag) {
  SLastCol placeholder = {.rowKey = (SRowKey){.ts = TSKEY_MIN, .numOfPKs = 0},
                          .colVal = COL_VAL_NONE(cid, col_type),
                          .dirty = 1,
                          .cacheStatus = TSDB_LAST_CACHE_VALID};

  SLastKey *pKey = &(SLastKey){.lflag = lflag, .uid = uid, .cid = cid};

  int32_t code = tsdbCachePutToLRU(pTsdb, pKey, &placeholder, 1);
  if (code != 0) {
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
  }

  TAOS_RETURN(code);
}
977

978
// Same flush sequence as tsdbCacheCommit() but without taking lruMutex and
// without the memtable step: applies tsdbCacheFlushDirty to every LRU entry,
// then (RocksDB builds) calls rocksMayWrite(pTsdb, true) and rocksdb_flush().
// The callers visible in this file hold lruMutex around the call.
// Returns TSDB_CODE_FAILED when rocksdb_flush() reports an error.
int32_t tsdbCacheCommitNoLock(STsdb *pTsdb) {
  int32_t    code = 0;
  char      *err = NULL;
  SLRUCache *pLru = pTsdb->lruCache;

  taosLRUCacheApply(pLru, tsdbCacheFlushDirty, pTsdb);

#ifdef USE_ROCKSDB
  rocksMayWrite(pTsdb, true);
  rocksdb_flush(pTsdb->rCache.db, pTsdb->rCache.flushoptions, &err);
  if (err != NULL) {
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err);
    rocksdb_free(err);
    code = TSDB_CODE_FAILED;
  }
#endif

  TAOS_RETURN(code);
}
997

998
// Fetches numKeys values from RocksDB with a single rocksdb_multi_get().
//
// On success (RocksDB builds) *pppValuesList and *ppValuesListSizes receive
// newly allocated arrays of numKeys entries; the caller frees both arrays
// with taosMemoryFree() and each value with rocksdb_free(), as the callers in
// this file do. Per-key error strings returned by RocksDB are released and
// otherwise ignored.
//
// Without USE_ROCKSDB the function returns success and leaves both output
// pointers untouched.
static int32_t tsdbCacheGetValuesFromRocks(STsdb *pTsdb, size_t numKeys, const char *const *ppKeysList,
                                           size_t *pKeysListSizes, char ***pppValuesList, size_t **ppValuesListSizes) {
#ifdef USE_ROCKSDB
  char **ppValues = taosMemoryCalloc(numKeys, sizeof(char *));
  if (NULL == ppValues) {
    return terrno;
  }

  size_t *pValueSizes = taosMemoryCalloc(numKeys, sizeof(size_t));
  if (NULL == pValueSizes) {
    taosMemoryFreeClear(ppValues);
    return terrno;
  }

  char **ppErrs = taosMemoryCalloc(numKeys, sizeof(char *));
  if (NULL == ppErrs) {
    taosMemoryFreeClear(ppValues);
    taosMemoryFreeClear(pValueSizes);
    return terrno;
  }

  rocksdb_multi_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, numKeys, ppKeysList, pKeysListSizes, ppValues,
                    pValueSizes, ppErrs);

  size_t iKey = 0;
  while (iKey < numKeys) {
    rocksdb_free(ppErrs[iKey]);
    ++iKey;
  }
  taosMemoryFreeClear(ppErrs);

  *pppValuesList = ppValues;
  *ppValuesListSizes = pValueSizes;
#endif
  TAOS_RETURN(TSDB_CODE_SUCCESS);
}
1026

1027
// Removes the cached "last" and "last_row" entries of column `cid` of table
// `uid` from both RocksDB (via the pending write batch) and the LRU cache.
//
// The two cache keys are built in one allocation, looked up in RocksDB with
// tsdbCacheGetValuesFromRocks(), and every stored value that deserializes to
// a non-NULL SLastCol is queued for deletion in rCache.writebatch. The
// matching LRU entries are then released and erased.
//
// hasPrimaryKey is currently unused.
//
// Returns 0 on success, terrno on allocation failure, or the error reported
// by the RocksDB lookup / deserialization. The values returned by RocksDB are
// released on every exit path (they used to leak when deserialization
// failed).
static int32_t tsdbCacheDropTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, bool hasPrimaryKey) {
  int32_t code = 0;

  // build keys & multi get from rocks
  char **keys_list = taosMemoryCalloc(2, sizeof(char *));
  if (!keys_list) {
    return terrno;
  }
  size_t *keys_list_sizes = taosMemoryCalloc(2, sizeof(size_t));
  if (!keys_list_sizes) {
    taosMemoryFree(keys_list);
    return terrno;
  }
  const size_t klen = ROCKS_KEY_LEN;

  // Both keys live in one buffer; keys_list[0] owns it.
  char *keys = taosMemoryCalloc(2, sizeof(SLastKey));
  if (!keys) {
    taosMemoryFree(keys_list);
    taosMemoryFree(keys_list_sizes);
    return terrno;
  }
  ((SLastKey *)keys)[0] = (SLastKey){.lflag = LFLAG_LAST, .uid = uid, .cid = cid};
  ((SLastKey *)keys)[1] = (SLastKey){.lflag = LFLAG_LAST_ROW, .uid = uid, .cid = cid};

  keys_list[0] = keys;
  keys_list[1] = keys + sizeof(SLastKey);
  keys_list_sizes[0] = klen;
  keys_list_sizes[1] = klen;

  char  **values_list = NULL;
  size_t *values_list_sizes = NULL;

  // The write batch was already flushed by the caller, so no
  // rocksMayWrite(pTsdb, true) is needed before reading.
  TAOS_CHECK_GOTO(tsdbCacheGetValuesFromRocks(pTsdb, 2, (const char *const *)keys_list, keys_list_sizes, &values_list,
                                              &values_list_sizes),
                  NULL, _exit);

#ifdef USE_ROCKSDB
  rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;

  for (int i = 0; i < 2; i++) {
    if (values_list[i] == NULL) {
      continue;
    }

    SLastCol *pLastCol = NULL;
    code = tsdbCacheDeserialize(values_list[i], values_list_sizes[i], &pLastCol);
    if (code != TSDB_CODE_SUCCESS) {
      tsdbError("vgId:%d, %s deserialize failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
                tstrerror(code));
      goto _exit;
    }
    if (NULL != pLastCol) {
      rocksdb_writebatch_delete(wb, keys_list[i], klen);
    }
    taosMemoryFreeClear(pLastCol);
  }
#endif

  for (int i = 0; i < 2; i++) {
    LRUHandle *h = taosLRUCacheLookup(pTsdb->lruCache, keys_list[i], klen);
    if (h) {
      tsdbLRUCacheRelease(pTsdb->lruCache, h, true);
      taosLRUCacheErase(pTsdb->lruCache, keys_list[i], klen);
    }
  }

_exit:
#ifdef USE_ROCKSDB
  // values_list stays NULL when the lookup itself failed.
  if (values_list != NULL) {
    rocksdb_free(values_list[0]);
    rocksdb_free(values_list[1]);
  }
#endif
  taosMemoryFree(keys_list[0]);

  taosMemoryFree(keys_list);
  taosMemoryFree(keys_list_sizes);
  taosMemoryFree(values_list);
  taosMemoryFree(values_list_sizes);

  TAOS_RETURN(code);
}
1121

1122
// Creates placeholder cache entries (both LFLAG_LAST_ROW and LFLAG_LAST) for
// every column of a newly created table, under lruMutex.
//
// When suid is negative the columns come from pSchemaRow; otherwise the
// schema is fetched with metaGetTbTSchemaEx(). A failure to fetch the schema
// is returned immediately. Failures of individual column insertions are only
// traced; the value returned is the code of the last insertion attempted.
int32_t tsdbCacheNewTable(STsdb *pTsdb, tb_uid_t uid, tb_uid_t suid, const SSchemaWrapper *pSchemaRow) {
  int32_t code = 0;

  (void)taosThreadMutexLock(&pTsdb->lruMutex);

  if (suid >= 0) {
    STSchema *pTSchema = NULL;
    code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, -1, &pTSchema);
    if (TSDB_CODE_SUCCESS != code) {
      (void)taosThreadMutexUnlock(&pTsdb->lruMutex);

      TAOS_RETURN(code);
    }

    for (int iCol = 0; iCol < pTSchema->numOfCols; ++iCol) {
      int16_t colId = pTSchema->columns[iCol].colId;
      int8_t  colType = pTSchema->columns[iCol].type;

      code = tsdbCacheNewTableColumn(pTsdb, uid, colId, colType, LFLAG_LAST_ROW);
      if (TSDB_CODE_SUCCESS != code) {
        tsdbTrace("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
                  tstrerror(code));
      }

      code = tsdbCacheNewTableColumn(pTsdb, uid, colId, colType, LFLAG_LAST);
      if (TSDB_CODE_SUCCESS != code) {
        tsdbTrace("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
                  tstrerror(code));
      }
    }

    taosMemoryFree(pTSchema);
  } else {
    for (int iCol = 0; iCol < pSchemaRow->nCols; ++iCol) {
      int16_t colId = pSchemaRow->pSchema[iCol].colId;
      int8_t  colType = pSchemaRow->pSchema[iCol].type;

      code = tsdbCacheNewTableColumn(pTsdb, uid, colId, colType, LFLAG_LAST_ROW);
      if (TSDB_CODE_SUCCESS != code) {
        tsdbTrace("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
                  tstrerror(code));
      }

      code = tsdbCacheNewTableColumn(pTsdb, uid, colId, colType, LFLAG_LAST);
      if (TSDB_CODE_SUCCESS != code) {
        tsdbTrace("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
                  tstrerror(code));
      }
    }
  }

  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);

  TAOS_RETURN(code);
}
1175

1176
// Drops the cache entries of every column of table `uid`, under lruMutex.
//
// The cache is first flushed with tsdbCacheCommitNoLock() (a failure there is
// only traced). The column list comes from pSchemaRow when it is given,
// otherwise from metaGetTbTSchemaEx(); a failure to fetch the schema is
// returned immediately. Per-column drop failures are traced, and the value
// returned is the code of the last step that assigned it. rocksMayWrite(pTsdb,
// false) is called before the mutex is released.
int32_t tsdbCacheDropTable(STsdb *pTsdb, tb_uid_t uid, tb_uid_t suid, SSchemaWrapper *pSchemaRow) {
  int32_t code = 0;

  (void)taosThreadMutexLock(&pTsdb->lruMutex);

  code = tsdbCacheCommitNoLock(pTsdb);
  if (TSDB_CODE_SUCCESS != code) {
    tsdbTrace("vgId:%d, %s commit with no lock failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
              tstrerror(code));
  }

  if (NULL == pSchemaRow) {
    STSchema *pTSchema = NULL;
    code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, -1, &pTSchema);
    if (TSDB_CODE_SUCCESS != code) {
      (void)taosThreadMutexUnlock(&pTsdb->lruMutex);

      TAOS_RETURN(code);
    }

    int  numCols = pTSchema->numOfCols;
    // The second column carries the primary-key flag, when there is one.
    bool hasPk = (numCols >= 2) && ((pTSchema->columns[1].flags & COL_IS_KEY) != 0);

    for (int iCol = 0; iCol < numCols; ++iCol) {
      int16_t colId = pTSchema->columns[iCol].colId;

      code = tsdbCacheDropTableColumn(pTsdb, uid, colId, hasPk);
      if (TSDB_CODE_SUCCESS != code) {
        tsdbTrace("vgId:%d, %s drop table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
                  tstrerror(code));
      }
    }

    taosMemoryFree(pTSchema);
  } else {
    int  numCols = pSchemaRow->nCols;
    bool hasPk = (numCols >= 2) && ((pSchemaRow->pSchema[1].flags & COL_IS_KEY) != 0);

    for (int iCol = 0; iCol < numCols; ++iCol) {
      int16_t colId = pSchemaRow->pSchema[iCol].colId;

      code = tsdbCacheDropTableColumn(pTsdb, uid, colId, hasPk);
      if (TSDB_CODE_SUCCESS != code) {
        tsdbTrace("vgId:%d, %s drop table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
                  tstrerror(code));
      }
    }
  }

  rocksMayWrite(pTsdb, false);

  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);

  TAOS_RETURN(code);
}
1237

1238
// Drops the cache entries of every column for each table uid in `uids`, using
// the schema fetched for `suid`. Runs under lruMutex.
//
// The cache is flushed first with tsdbCacheCommitNoLock() (failure only
// traced). A failure to fetch the schema is returned immediately. Per-column
// drop failures are traced; the value returned is the code of the last step
// that assigned it. rocksMayWrite(pTsdb, false) is called before unlocking.
int32_t tsdbCacheDropSubTables(STsdb *pTsdb, SArray *uids, tb_uid_t suid) {
  int32_t code = 0;

  (void)taosThreadMutexLock(&pTsdb->lruMutex);

  code = tsdbCacheCommitNoLock(pTsdb);
  if (TSDB_CODE_SUCCESS != code) {
    tsdbTrace("vgId:%d, %s commit with no lock failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
              tstrerror(code));
  }

  STSchema *pTSchema = NULL;
  code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, suid, -1, &pTSchema);
  if (TSDB_CODE_SUCCESS != code) {
    (void)taosThreadMutexUnlock(&pTsdb->lruMutex);

    TAOS_RETURN(code);
  }

  // Schema-derived facts are the same for every sub table.
  int  numCols = pTSchema->numOfCols;
  bool hasPk = (numCols >= 2) && ((pTSchema->columns[1].flags & COL_IS_KEY) != 0);

  for (int iTb = 0; iTb < TARRAY_SIZE(uids); ++iTb) {
    int64_t tbUid = ((tb_uid_t *)TARRAY_DATA(uids))[iTb];

    for (int iCol = 0; iCol < numCols; ++iCol) {
      int16_t colId = pTSchema->columns[iCol].colId;

      code = tsdbCacheDropTableColumn(pTsdb, tbUid, colId, hasPk);
      if (TSDB_CODE_SUCCESS != code) {
        tsdbTrace("vgId:%d, %s drop table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
                  tstrerror(code));
      }
    }
  }

  taosMemoryFree(pTSchema);

  rocksMayWrite(pTsdb, false);

  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);

  TAOS_RETURN(code);
}
1286

1287
// Adds placeholder cache entries for a new column `cid` of table `uid`, one
// with lflag 0 and one with lflag 1, under lruMutex. Failures are traced; the
// value returned is the code of the second insertion.
int32_t tsdbCacheNewNTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, int8_t col_type) {
  int32_t code = 0;

  (void)taosThreadMutexLock(&pTsdb->lruMutex);

  code = tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, 0);
  if (TSDB_CODE_SUCCESS != code) {
    tsdbTrace("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
              tstrerror(code));
  }

  code = tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, 1);
  if (TSDB_CODE_SUCCESS != code) {
    tsdbTrace("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
              tstrerror(code));
  }

  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);

  TAOS_RETURN(code);
}
1307

1308
// Drops the cache entries of column `cid` of table `uid`, under lruMutex:
// flushes the cache with tsdbCacheCommitNoLock(), removes the column with
// tsdbCacheDropTableColumn(), then calls rocksMayWrite(pTsdb, false).
// Failures of either step are traced; the value returned is the code of the
// column drop.
int32_t tsdbCacheDropNTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, bool hasPrimayKey) {
  int32_t code = 0;

  (void)taosThreadMutexLock(&pTsdb->lruMutex);

  code = tsdbCacheCommitNoLock(pTsdb);
  if (TSDB_CODE_SUCCESS != code) {
    tsdbTrace("vgId:%d, %s commit with no lock failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
              tstrerror(code));
  }

  code = tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey);
  if (TSDB_CODE_SUCCESS != code) {
    tsdbTrace("vgId:%d, %s drop table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
              tstrerror(code));
  }

  rocksMayWrite(pTsdb, false);

  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);

  TAOS_RETURN(code);
}
1331

1332
// Adds placeholder cache entries for a new column `cid` to every table uid in
// `uids` (one entry with lflag 0, one with lflag 1 per table), under lruMutex.
// Failures are traced; the value returned is the code of the last insertion
// attempted, or 0 when `uids` is empty.
int32_t tsdbCacheNewSTableColumn(STsdb *pTsdb, SArray *uids, int16_t cid, int8_t col_type) {
  int32_t code = 0;

  (void)taosThreadMutexLock(&pTsdb->lruMutex);

  for (int iTb = 0; iTb < TARRAY_SIZE(uids); ++iTb) {
    tb_uid_t tbUid = ((tb_uid_t *)TARRAY_DATA(uids))[iTb];

    code = tsdbCacheNewTableColumn(pTsdb, tbUid, cid, col_type, 0);
    if (TSDB_CODE_SUCCESS != code) {
      tsdbTrace("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
                tstrerror(code));
    }

    code = tsdbCacheNewTableColumn(pTsdb, tbUid, cid, col_type, 1);
    if (TSDB_CODE_SUCCESS != code) {
      tsdbTrace("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
                tstrerror(code));
    }
  }

  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
  TAOS_RETURN(code);
}
1356

1357
// Drops the cache entries of column `cid` for every table uid in `uids`,
// under lruMutex. The cache is flushed first with tsdbCacheCommitNoLock();
// rocksMayWrite(pTsdb, false) is called once all columns were processed.
// Failures are traced; the value returned is the code of the last step that
// assigned it.
int32_t tsdbCacheDropSTableColumn(STsdb *pTsdb, SArray *uids, int16_t cid, bool hasPrimayKey) {
  int32_t code = 0;

  (void)taosThreadMutexLock(&pTsdb->lruMutex);

  code = tsdbCacheCommitNoLock(pTsdb);
  if (TSDB_CODE_SUCCESS != code) {
    tsdbTrace("vgId:%d, %s commit with no lock failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
              tstrerror(code));
  }

  for (int iTb = 0; iTb < TARRAY_SIZE(uids); ++iTb) {
    int64_t tbUid = ((tb_uid_t *)TARRAY_DATA(uids))[iTb];

    code = tsdbCacheDropTableColumn(pTsdb, tbUid, cid, hasPrimayKey);
    if (TSDB_CODE_SUCCESS != code) {
      tsdbTrace("vgId:%d, %s drop table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
                tstrerror(code));
    }
  }

  rocksMayWrite(pTsdb, false);

  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);

  TAOS_RETURN(code);
}
1384

1385
typedef struct {
1386
  int      idx;
1387
  SLastKey key;
1388
} SIdxKey;
1389

1390
// Resets a cached column to "no value": the row key goes back to TSKEY_MIN
// with zero primary keys, the column value becomes COL_VAL_NONE (same column
// id and type), the entry is marked dirty and given the requested cache
// status. Non-empty variable-length buffers are freed; every other value is
// cleared through valueClearDatum().
static void tsdbCacheUpdateLastColToNone(SLastCol *pLastCol, ELastCacheStatus cacheStatus) {
  // reset the row key
  SRowKey *pKey = &pLastCol->rowKey;
  pKey->ts = TSKEY_MIN;
  for (int8_t iPk = 0; iPk < pKey->numOfPKs; iPk++) {
    SValue *pPk = &pKey->pks[iPk];
    if (!IS_VAR_DATA_TYPE(pPk->type) || pPk->nData == 0) {
      valueClearDatum(pPk, pPk->type);
    } else {
      taosMemoryFreeClear(pPk->pData);
      pPk->nData = 0;
    }
  }
  pKey->numOfPKs = 0;

  // reset the column value
  SValue *pValue = &pLastCol->colVal.value;
  if (!IS_VAR_DATA_TYPE(pValue->type) || pValue->nData == 0) {
    valueClearDatum(pValue, pValue->type);
  } else {
    taosMemoryFreeClear(pValue->pData);
    pValue->nData = 0;
  }

  pLastCol->colVal = COL_VAL_NONE(pLastCol->colVal.cid, pLastCol->colVal.value.type);
  pLastCol->dirty = 1;
  pLastCol->cacheStatus = cacheStatus;
}
×
1416

1417
// Serializes pLastCol and queues it under pLastKey in the RocksDB write batch
// (rCache.writebatch), holding writeBatchMutex around the put. The serialized
// buffer is freed before returning. Returns the serialization error, if any;
// without USE_ROCKSDB the function is a no-op returning 0.
static int32_t tsdbCachePutToRocksdb(STsdb *pTsdb, SLastKey *pLastKey, SLastCol *pLastCol) {
  int32_t code = 0;
#ifdef USE_ROCKSDB
  char  *pSerialized = NULL;
  size_t serializedLen = 0;

  code = tsdbCacheSerialize(pLastCol, &pSerialized, &serializedLen);
  if (code != 0) {
    tsdbError("tsdb/cache/putrocks: vgId:%d, serialize failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code));
    TAOS_RETURN(code);
  }

  rocksdb_writebatch_t *pBatch = pTsdb->rCache.writebatch;

  (void)taosThreadMutexLock(&pTsdb->rCache.writeBatchMutex);
  rocksdb_writebatch_put(pBatch, (char *)pLastKey, ROCKS_KEY_LEN, pSerialized, serializedLen);
  (void)taosThreadMutexUnlock(&pTsdb->rCache.writeBatchMutex);

  taosMemoryFree(pSerialized);
#endif
  TAOS_RETURN(code);
}
1438

1439
// Inserts a heap-allocated deep copy of *pLastCol into the LRU cache under
// pLastKey, with its dirty flag set to `dirty`. The entry is registered with
// tsdbCacheDeleter / tsdbCacheOverWriter and charged with the size computed
// by tsdbCacheReallocSLastCol().
//
// Returns terrno when the entry cannot be allocated, the error from the deep
// copy, or TSDB_CODE_FAILED when the cache rejects the insertion.
static int32_t tsdbCachePutToLRU(STsdb *pTsdb, SLastKey *pLastKey, SLastCol *pLastCol, int8_t dirty) {
  int32_t code = 0, lino = 0;
  size_t  charge = 0;

  SLastCol *pEntry = taosMemoryCalloc(1, sizeof(SLastCol));
  if (NULL == pEntry) {
    return terrno;
  }

  *pEntry = *pLastCol;
  pEntry->dirty = dirty;
  TAOS_CHECK_EXIT(tsdbCacheReallocSLastCol(pEntry, &charge));

  LRUStatus status = taosLRUCacheInsert(pTsdb->lruCache, pLastKey, ROCKS_KEY_LEN, pEntry, charge, tsdbCacheDeleter,
                                        tsdbCacheOverWriter, NULL, TAOS_LRU_PRIORITY_LOW, pTsdb);
  if (!(TAOS_LRU_STATUS_OK == status || TAOS_LRU_STATUS_OK_OVERWRITTEN == status)) {
    tsdbError("vgId:%d, %s failed at line %d status %d.", TD_VID(pTsdb->pVnode), __func__, __LINE__, status);
    code = TSDB_CODE_FAILED;
    // NOTE(review): the entry is not freed below on this path - presumably the
    // cache has already disposed of it through the deleter; confirm.
    pEntry = NULL;
  }

_exit:
  if (code != TSDB_CODE_SUCCESS) {
    taosMemoryFree(pEntry);
    tsdbError("tsdb/cache/putlru: vgId:%d, failed at line %d since %s.", TD_VID(pTsdb->pVnode), lino, tstrerror(code));
  }

  TAOS_RETURN(code);
}
1468

1469
// Applies a batch of last/last_row updates for table `uid` to the cache.
//
// Phase 1 (LRU): every update whose key is present in the LRU cache is
// compared against the cached row key and, when newer (or equal with a
// non-NONE value), written back as a dirty entry. Keys missing from the LRU
// cache are collected in `remainCols`.
//
// Phase 2 (RocksDB): the collected keys are fetched from RocksDB in one
// multi-get. Entries stored with TSDB_LAST_CACHE_NO_CACHE are only loaded
// into the LRU cache; otherwise a newer update is written to both RocksDB and
// the LRU cache as a clean entry.
//
// The whole operation runs under lruMutex. All temporary buffers are released
// at _exit, so every error path (allocation failure, failed lookup, failed
// deserialization) unlocks the mutex and frees what was allocated.
//
// Returns 0 on success (also for a NULL/empty updCtxArray) or the first error
// encountered. `suid` is not used.
static int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SArray *updCtxArray) {
  if (!updCtxArray || TARRAY_SIZE(updCtxArray) == 0) {
    TAOS_RETURN(TSDB_CODE_SUCCESS);
  }

  int32_t code = 0, lino = 0;

  int        num_keys = TARRAY_SIZE(updCtxArray);
  SArray    *remainCols = NULL;
  SLRUCache *pCache = pTsdb->lruCache;

  // Buffers of phase 2; declared here so that _exit can release them no matter
  // where the function bails out.
  char  **keys_list = NULL;
  size_t *keys_list_sizes = NULL;
  char  **values_list = NULL;
  size_t *values_list_sizes = NULL;

  (void)taosThreadMutexLock(&pTsdb->lruMutex);
  for (int i = 0; i < num_keys; ++i) {
    SLastUpdateCtx *updCtx = &((SLastUpdateCtx *)TARRAY_DATA(updCtxArray))[i];
    int8_t          lflag = updCtx->lflag;
    SRowKey        *pRowKey = &updCtx->tsdbRowKey.key;
    SColVal        *pColVal = &updCtx->colVal;

    // "last" only tracks real values
    if (lflag == LFLAG_LAST && !COL_VAL_IS_VALUE(pColVal)) {
      continue;
    }

    SLastKey  *key = &(SLastKey){.lflag = lflag, .uid = uid, .cid = pColVal->cid};
    LRUHandle *h = taosLRUCacheLookup(pCache, key, ROCKS_KEY_LEN);
    if (h) {
      SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h);
      if (pLastCol->cacheStatus != TSDB_LAST_CACHE_NO_CACHE) {
        int32_t cmp_res = tRowKeyCompare(&pLastCol->rowKey, pRowKey);
        if (cmp_res < 0 || (cmp_res == 0 && !COL_VAL_IS_NONE(pColVal))) {
          SLastCol newLastCol = {
              .rowKey = *pRowKey, .colVal = *pColVal, .dirty = 1, .cacheStatus = TSDB_LAST_CACHE_VALID};
          code = tsdbCachePutToLRU(pTsdb, key, &newLastCol, 1);
        }
      }

      // release the handle before acting on the result of the put
      tsdbLRUCacheRelease(pCache, h, false);
      TAOS_CHECK_EXIT(code);
    } else {
      if (!remainCols) {
        remainCols = taosArrayInit(num_keys * 2, sizeof(SIdxKey));
        if (!remainCols) {
          TAOS_CHECK_GOTO(terrno, &lino, _exit);
        }
      }
      if (!taosArrayPush(remainCols, &(SIdxKey){i, *key})) {
        TAOS_CHECK_GOTO(terrno, &lino, _exit);
      }
    }
  }

  if (remainCols) {
    num_keys = TARRAY_SIZE(remainCols);
  }
  if (remainCols && num_keys > 0) {
    keys_list = taosMemoryCalloc(num_keys, sizeof(char *));
    if (!keys_list) {
      code = terrno;
      goto _exit;
    }
    keys_list_sizes = taosMemoryCalloc(num_keys, sizeof(size_t));
    if (!keys_list_sizes) {
      code = terrno;
      goto _exit;
    }
    for (int i = 0; i < num_keys; ++i) {
      SIdxKey *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[i];

      keys_list[i] = (char *)&idxKey->key;
      keys_list_sizes[i] = ROCKS_KEY_LEN;
    }

    rocksMayWrite(pTsdb, true);  // flush writebatch cache

    code = tsdbCacheGetValuesFromRocks(pTsdb, num_keys, (const char *const *)keys_list, keys_list_sizes, &values_list,
                                       &values_list_sizes);
    if (code) {
      goto _exit;
    }

    for (int i = 0; i < num_keys; ++i) {
      SIdxKey        *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[i];
      SLastUpdateCtx *updCtx = (SLastUpdateCtx *)taosArrayGet(updCtxArray, idxKey->idx);
      SRowKey        *pRowKey = &updCtx->tsdbRowKey.key;
      SColVal        *pColVal = &updCtx->colVal;

      SLastCol *pLastCol = NULL;
      // values_list is left NULL by the lookup helper when RocksDB support is
      // compiled out; treat that like "nothing stored".
      if (values_list != NULL && values_list[i] != NULL) {
        code = tsdbCacheDeserialize(values_list[i], values_list_sizes[i], &pLastCol);
        if (code != TSDB_CODE_SUCCESS) {
          tsdbError("vgId:%d, %s deserialize failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
                    tstrerror(code));
          goto _exit;
        }
      }

      SLastCol *pToFree = pLastCol;

      if (pLastCol && pLastCol->cacheStatus == TSDB_LAST_CACHE_NO_CACHE) {
        if ((code = tsdbCachePutToLRU(pTsdb, &idxKey->key, pLastCol, 0)) != TSDB_CODE_SUCCESS) {
          tsdbError("tsdb/cache: vgId:%d, put lru failed at line %d since %s.", TD_VID(pTsdb->pVnode), lino,
                    tstrerror(code));
          taosMemoryFreeClear(pToFree);
          break;
        }

        // cache invalid => skip update
        taosMemoryFreeClear(pToFree);
        continue;
      }

      if (IS_LAST_KEY(idxKey->key) && !COL_VAL_IS_VALUE(pColVal)) {
        taosMemoryFreeClear(pToFree);
        continue;
      }

      // With nothing stored, the update always wins.
      int32_t cmp_res = 1;
      if (pLastCol) {
        cmp_res = tRowKeyCompare(&pLastCol->rowKey, pRowKey);
      }

      if (NULL == pLastCol || cmp_res < 0 || (cmp_res == 0 && !COL_VAL_IS_NONE(pColVal))) {
        SLastCol lastColTmp = {
            .rowKey = *pRowKey, .colVal = *pColVal, .dirty = 0, .cacheStatus = TSDB_LAST_CACHE_VALID};
        if ((code = tsdbCachePutToRocksdb(pTsdb, &idxKey->key, &lastColTmp)) != TSDB_CODE_SUCCESS) {
          tsdbError("tsdb/cache: vgId:%d, put rocks failed at line %d since %s.", TD_VID(pTsdb->pVnode), lino,
                    tstrerror(code));
          taosMemoryFreeClear(pToFree);
          break;
        }
        if ((code = tsdbCachePutToLRU(pTsdb, &idxKey->key, &lastColTmp, 0)) != TSDB_CODE_SUCCESS) {
          tsdbError("tsdb/cache: vgId:%d, put lru failed at line %d since %s.", TD_VID(pTsdb->pVnode), lino,
                    tstrerror(code));
          taosMemoryFreeClear(pToFree);
          break;
        }
      }

      taosMemoryFreeClear(pToFree);
    }

    rocksMayWrite(pTsdb, false);
  }

_exit:
  // keys_list entries point into remainCols, so only the arrays are freed.
  taosMemoryFree(keys_list);
  taosMemoryFree(keys_list_sizes);
  if (values_list) {
#ifdef USE_ROCKSDB
    // values_list is only set after num_keys was narrowed to the remaining keys
    for (int i = 0; i < num_keys; ++i) {
      rocksdb_free(values_list[i]);
    }
#endif
    taosMemoryFree(values_list);
  }
  taosMemoryFree(values_list_sizes);

  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
  taosArrayDestroy(remainCols);

  if (code) {
    tsdbError("tsdb/cache: vgId:%d, update failed at line %d since %s.", TD_VID(pTsdb->pVnode), __LINE__,
              tstrerror(code));
  }

  TAOS_RETURN(code);
}
1648

1649
int32_t tsdbCacheRowFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int64_t version, int32_t nRow,
×
1650
                                 SRow **aRow) {
1651
  int32_t code = 0, lino = 0;
×
1652

1653
  // 1. prepare last
1654
  TSDBROW      lRow = {.type = TSDBROW_ROW_FMT, .pTSRow = aRow[nRow - 1], .version = version};
×
1655
  STSchema    *pTSchema = NULL;
×
1656
  int32_t      sver = TSDBROW_SVERSION(&lRow);
×
1657
  SSHashObj   *iColHash = NULL;
×
1658
  STSDBRowIter iter = {0};
×
1659

1660
  TAOS_CHECK_GOTO(tsdbUpdateSkm(pTsdb, suid, uid, sver), &lino, _exit);
×
1661
  pTSchema = pTsdb->rCache.pTSchema;
×
1662

1663
  TSDBROW tRow = {.type = TSDBROW_ROW_FMT, .version = version};
×
1664
  int32_t nCol = pTSchema->numOfCols;
×
1665
  SArray *ctxArray = pTsdb->rCache.ctxArray;
×
1666

1667
  // 1. prepare by lrow
1668
  STsdbRowKey tsdbRowKey = {0};
×
1669
  tsdbRowGetKey(&lRow, &tsdbRowKey);
×
1670

1671
  TAOS_CHECK_GOTO(tsdbRowIterOpen(&iter, &lRow, pTSchema), &lino, _exit);
×
1672

1673
  int32_t iCol = 0;
×
1674
  for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal && iCol < nCol; pColVal = tsdbRowIterNext(&iter), iCol++) {
×
1675
    SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST_ROW, .tsdbRowKey = tsdbRowKey, .colVal = *pColVal};
×
1676
    if (!taosArrayPush(ctxArray, &updateCtx)) {
×
1677
      TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
1678
    }
1679

1680
    if (COL_VAL_IS_VALUE(pColVal)) {
×
1681
      updateCtx.lflag = LFLAG_LAST;
×
1682
      if (!taosArrayPush(ctxArray, &updateCtx)) {
×
1683
        TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
1684
      }
1685
    } else {
1686
      if (!iColHash) {
×
1687
        iColHash = tSimpleHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
×
1688
        if (iColHash == NULL) {
×
1689
          TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit);
×
1690
        }
1691
      }
1692

1693
      if (tSimpleHashPut(iColHash, &iCol, sizeof(iCol), NULL, 0)) {
×
1694
        TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit);
×
1695
      }
1696
    }
1697
  }
1698

1699
  // 2. prepare by the other rows
1700
  for (int32_t iRow = nRow - 2; iRow >= 0; --iRow) {
×
1701
    if (tSimpleHashGetSize(iColHash) == 0) {
×
1702
      break;
×
1703
    }
1704

1705
    tRow.pTSRow = aRow[iRow];
×
1706

1707
    STsdbRowKey tsdbRowKey = {0};
×
1708
    tsdbRowGetKey(&tRow, &tsdbRowKey);
×
1709

1710
    void   *pIte = NULL;
×
1711
    int32_t iter = 0;
×
1712
    while ((pIte = tSimpleHashIterate(iColHash, pIte, &iter)) != NULL) {
×
1713
      int32_t iCol = ((int32_t *)pIte)[0];
×
1714
      SColVal colVal = COL_VAL_NONE(0, 0);
×
1715
      tsdbRowGetColVal(&tRow, pTSchema, iCol, &colVal);
×
1716

1717
      if (COL_VAL_IS_VALUE(&colVal)) {
×
1718
        SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST, .tsdbRowKey = tsdbRowKey, .colVal = colVal};
×
1719
        if (!taosArrayPush(ctxArray, &updateCtx)) {
×
1720
          TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
1721
        }
1722
        code = tSimpleHashIterateRemove(iColHash, &iCol, sizeof(iCol), &pIte, &iter);
×
1723
        if (code != TSDB_CODE_SUCCESS) {
×
1724
          tsdbTrace("vgId:%d, %s tSimpleHashIterateRemove failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__,
×
1725
                    __LINE__, tstrerror(code));
1726
        }
1727
      }
1728
    }
1729
  }
1730

1731
  TAOS_CHECK_GOTO(tsdbCacheUpdate(pTsdb, suid, uid, ctxArray), &lino, _exit);
×
1732

1733
_exit:
×
1734
  if (code) {
×
1735
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
×
1736
  }
1737

1738
  tsdbRowClose(&iter);
×
1739
  tSimpleHashCleanup(iColHash);
×
1740
  taosArrayClear(ctxArray);
×
1741

1742
  TAOS_RETURN(code);
×
1743
}
1744

1745
/*
 * Build last / last_row cache update contexts from a column-format data block
 * and apply them through tsdbCacheUpdate().
 *
 * "last" contexts: one for the primary timestamp taken from the final row of
 * the block, plus, for every column flagged HAS_VALUE, the newest row (scanning
 * backwards) whose bitmap value is 2.
 * "last_row" contexts: every column of the final row of the block.
 *
 * pTsdb      - tsdb instance; the schema is fetched via pTsdb->pVnode->pMeta.
 * suid, uid  - super table / table uid.
 * pBlockData - block to scan; the code indexes row nRow - 1, so the caller
 *              must pass a block with at least one row.
 *
 * Returns 0 on success, otherwise the error code of the failing step.
 */
int32_t tsdbCacheColFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SBlockData *pBlockData) {
  int32_t      code = 0, lino = 0;
  STSDBRowIter iter = {0};
  STSchema    *pTSchema = NULL;
  SArray      *ctxArray = NULL;

  TSDBROW lRow = tsdbRowFromBlockData(pBlockData, pBlockData->nRow - 1);
  int32_t sver = TSDBROW_SVERSION(&lRow);

  // nothing has been acquired yet, so a plain return is enough on failure
  TAOS_CHECK_RETURN(metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pTSchema));

  ctxArray = taosArrayInit(pBlockData->nColData, sizeof(SLastUpdateCtx));
  if (ctxArray == NULL) {
    TAOS_CHECK_GOTO(terrno, &lino, _exit);
  }

  // 1. prepare last
  STsdbRowKey lastRowKey = {0};
  tsdbRowGetKey(&lRow, &lastRowKey);

  {
    SValue tsVal = {.type = TSDB_DATA_TYPE_TIMESTAMP};
    VALUE_SET_TRIVIAL_DATUM(&tsVal, lRow.pBlockData->aTSKEY[lRow.iRow]);
    SLastUpdateCtx updateCtx = {
        .lflag = LFLAG_LAST, .tsdbRowKey = lastRowKey, .colVal = COL_VAL_VALUE(PRIMARYKEY_TIMESTAMP_COL_ID, tsVal)};
    if (!taosArrayPush(ctxArray, &updateCtx)) {
      TAOS_CHECK_GOTO(terrno, &lino, _exit);
    }
  }

  TSDBROW tRow = tsdbRowFromBlockData(pBlockData, 0);

  for (int32_t iColData = 0; iColData < pBlockData->nColData; ++iColData) {
    SColData *pColData = &pBlockData->aColData[iColData];
    if ((pColData->flag & HAS_VALUE) != HAS_VALUE) {
      continue;
    }

    // scan backwards for the newest row of this column that holds a value
    for (tRow.iRow = pBlockData->nRow - 1; tRow.iRow >= 0; --tRow.iRow) {
      uint8_t colType = tColDataGetBitValue(pColData, tRow.iRow);
      // NOTE(review): 2 presumably encodes "has value" in the column bitmap - confirm against tColDataGetBitValue
      if (colType == 2) {
        // the row key is only needed for the row that is actually recorded
        STsdbRowKey rowKey = {0};
        tsdbRowGetKey(&tRow, &rowKey);

        SColVal colVal = COL_VAL_NONE(pColData->cid, pColData->type);
        TAOS_CHECK_GOTO(tColDataGetValue(pColData, tRow.iRow, &colVal), &lino, _exit);

        SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST, .tsdbRowKey = rowKey, .colVal = colVal};
        if (!taosArrayPush(ctxArray, &updateCtx)) {
          TAOS_CHECK_GOTO(terrno, &lino, _exit);
        }
        break;
      }
    }
  }

  // 2. prepare last row
  TAOS_CHECK_GOTO(tsdbRowIterOpen(&iter, &lRow, pTSchema), &lino, _exit);
  for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal; pColVal = tsdbRowIterNext(&iter)) {
    SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST_ROW, .tsdbRowKey = lastRowKey, .colVal = *pColVal};
    if (!taosArrayPush(ctxArray, &updateCtx)) {
      TAOS_CHECK_GOTO(terrno, &lino, _exit);
    }
  }

  TAOS_CHECK_GOTO(tsdbCacheUpdate(pTsdb, suid, uid, ctxArray), &lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
  }

  tsdbRowClose(&iter);
  taosMemoryFreeClear(pTSchema);
  taosArrayDestroy(ctxArray);

  TAOS_RETURN(code);
}
1819

1820
// Forward declaration; tsdbCacheLoadFromRaw() calls it with the column ids / slot ids of
// the "last" keys and receives the resulting array through ppLastArray.
static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols,
1821
                            int nCols, int16_t *slotIds);
1822

1823
// Forward declaration; same calling shape as mergeLastCid(), used by tsdbCacheLoadFromRaw()
// for the keys that are not "last" keys (the last_row side).
static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols,
1824
                               int nCols, int16_t *slotIds);
1825

1826
static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SArray *remainCols,
115,640✔
1827
                                    SCacheRowsReader *pr, int8_t ltype) {
1828
  int32_t code = 0, lino = 0;
115,640✔
1829
  // rocksdb_writebatch_t *wb = NULL;
1830
  SArray *pTmpColArray = NULL;
115,640✔
1831
  bool    extraTS = false;
115,640✔
1832

1833
  SIdxKey *idxKey = taosArrayGet(remainCols, 0);
115,640✔
1834
  if (idxKey->key.cid != PRIMARYKEY_TIMESTAMP_COL_ID) {
115,640✔
1835
    // ignore 'ts' loaded from cache and load it from tsdb
1836
    // SLastCol *pLastCol = taosArrayGet(pLastArray, 0);
1837
    // tsdbCacheUpdateLastColToNone(pLastCol, TSDB_LAST_CACHE_NO_CACHE);
1838

1839
    SLastKey *key = &(SLastKey){.lflag = ltype, .uid = uid, .cid = PRIMARYKEY_TIMESTAMP_COL_ID};
×
1840
    if (!taosArrayInsert(remainCols, 0, &(SIdxKey){0, *key})) {
×
1841
      TAOS_RETURN(terrno);
×
1842
    }
1843

1844
    extraTS = true;
×
1845
  }
1846

1847
  int      num_keys = TARRAY_SIZE(remainCols);
115,640✔
1848
  int16_t *slotIds = taosMemoryMalloc(num_keys * sizeof(int16_t));
115,640✔
1849

1850
  int16_t *lastColIds = NULL, *lastSlotIds = NULL, *lastrowColIds = NULL, *lastrowSlotIds = NULL;
115,640✔
1851
  lastColIds = taosMemoryMalloc(num_keys * sizeof(int16_t));
115,640✔
1852
  lastSlotIds = taosMemoryMalloc(num_keys * sizeof(int16_t));
115,640✔
1853
  lastrowColIds = taosMemoryMalloc(num_keys * sizeof(int16_t));
115,640✔
1854
  lastrowSlotIds = taosMemoryMalloc(num_keys * sizeof(int16_t));
115,169✔
1855
  SArray *lastTmpColArray = NULL, *lastTmpIndexArray = NULL, *lastrowTmpColArray = NULL, *lastrowTmpIndexArray = NULL;
115,640✔
1856

1857
  int lastIndex = 0;
115,640✔
1858
  int lastrowIndex = 0;
115,640✔
1859

1860
  if (!slotIds || !lastColIds || !lastSlotIds || !lastrowColIds || !lastrowSlotIds) {
115,640✔
1861
    TAOS_CHECK_EXIT(terrno);
×
1862
  }
1863

1864
  for (int i = 0; i < num_keys; ++i) {
389,098✔
1865
    SIdxKey *idxKey = taosArrayGet(remainCols, i);
272,484✔
1866
    if (extraTS && !i) {
272,971✔
1867
      slotIds[i] = 0;
×
1868
    } else {
1869
      slotIds[i] = pr->pSlotIds[idxKey->idx];
272,971✔
1870
    }
1871

1872
    if (IS_LAST_KEY(idxKey->key)) {
272,500✔
1873
      if (NULL == lastTmpIndexArray) {
139,964✔
1874
        lastTmpIndexArray = taosArrayInit(num_keys, sizeof(int32_t));
59,284✔
1875
        if (!lastTmpIndexArray) {
59,284✔
1876
          TAOS_CHECK_EXIT(terrno);
×
1877
        }
1878
      }
1879
      if (!taosArrayPush(lastTmpIndexArray, &(i))) {
139,964✔
1880
        TAOS_CHECK_EXIT(terrno);
×
1881
      }
1882
      lastColIds[lastIndex] = idxKey->key.cid;
139,964✔
1883
      if (extraTS && !i) {
139,964✔
1884
        lastSlotIds[lastIndex] = 0;
×
1885
      } else {
1886
        lastSlotIds[lastIndex] = pr->pSlotIds[idxKey->idx];
139,964✔
1887
      }
1888
      lastIndex++;
139,964✔
1889
    } else {
1890
      if (NULL == lastrowTmpIndexArray) {
132,536✔
1891
        lastrowTmpIndexArray = taosArrayInit(num_keys, sizeof(int32_t));
56,356✔
1892
        if (!lastrowTmpIndexArray) {
56,356✔
1893
          TAOS_CHECK_EXIT(terrno);
×
1894
        }
1895
      }
1896
      if (!taosArrayPush(lastrowTmpIndexArray, &(i))) {
133,494✔
1897
        TAOS_CHECK_EXIT(terrno);
×
1898
      }
1899
      lastrowColIds[lastrowIndex] = idxKey->key.cid;
133,494✔
1900
      if (extraTS && !i) {
133,494✔
1901
        lastrowSlotIds[lastrowIndex] = 0;
×
1902
      } else {
1903
        lastrowSlotIds[lastrowIndex] = pr->pSlotIds[idxKey->idx];
133,494✔
1904
      }
1905
      lastrowIndex++;
133,494✔
1906
    }
1907
  }
1908

1909
  pTmpColArray = taosArrayInit(lastIndex + lastrowIndex, sizeof(SLastCol));
115,640✔
1910
  if (!pTmpColArray) {
115,640✔
1911
    TAOS_CHECK_EXIT(terrno);
×
1912
  }
1913

1914
  if (lastTmpIndexArray != NULL) {
115,640✔
1915
    TAOS_CHECK_EXIT(mergeLastCid(uid, pTsdb, &lastTmpColArray, pr, lastColIds, lastIndex, lastSlotIds));
59,284✔
1916
    for (int i = 0; i < taosArrayGetSize(lastTmpColArray); i++) {
191,352✔
1917
      if (!taosArrayInsert(pTmpColArray, *(int32_t *)taosArrayGet(lastTmpIndexArray, i),
132,068✔
1918
                           taosArrayGet(lastTmpColArray, i))) {
132,068✔
1919
        TAOS_CHECK_EXIT(terrno);
×
1920
      }
1921
    }
1922
  }
1923

1924
  if (lastrowTmpIndexArray != NULL) {
115,640✔
1925
    TAOS_CHECK_EXIT(mergeLastRowCid(uid, pTsdb, &lastrowTmpColArray, pr, lastrowColIds, lastrowIndex, lastrowSlotIds));
56,356✔
1926
    for (int i = 0; i < taosArrayGetSize(lastrowTmpColArray); i++) {
186,002✔
1927
      if (!taosArrayInsert(pTmpColArray, *(int32_t *)taosArrayGet(lastrowTmpIndexArray, i),
129,646✔
1928
                           taosArrayGet(lastrowTmpColArray, i))) {
129,646✔
1929
        TAOS_CHECK_EXIT(terrno);
×
1930
      }
1931
    }
1932
  }
1933

1934
  SLRUCache *pCache = pTsdb->lruCache;
115,640✔
1935
  for (int i = 0; i < num_keys; ++i) {
389,098✔
1936
    SIdxKey  *idxKey = taosArrayGet(remainCols, i);
273,458✔
1937
    SLastCol *pLastCol = NULL;
273,458✔
1938

1939
    if (pTmpColArray && TARRAY_SIZE(pTmpColArray) >= i + 1) {
273,458✔
1940
      pLastCol = taosArrayGet(pTmpColArray, i);
261,714✔
1941
    }
1942

1943
    // still null, then make up a none col value
1944
    SLastCol noneCol = {.rowKey.ts = TSKEY_MIN,
273,458✔
1945
                        .colVal = COL_VAL_NONE(idxKey->key.cid, pr->pSchema->columns[slotIds[i]].type),
273,458✔
1946
                        .cacheStatus = TSDB_LAST_CACHE_VALID};
1947
    if (!pLastCol) {
273,458✔
1948
      pLastCol = &noneCol;
11,744✔
1949
    }
1950

1951
    if (!extraTS || i > 0) {
273,458✔
1952
      tsdbDebug("vgId:%d, %s qid:%s uid:%" PRId64 " from tsdb, col_id:%d col_flag:%d ts:%" PRId64,
273,458✔
1953
                TD_VID(pTsdb->pVnode), __func__, pr && pr->idstr ? pr->idstr : "null", uid, pLastCol->colVal.cid,
1954
                pLastCol->colVal.flag, pLastCol->rowKey.ts);
1955
      taosArraySet(pLastArray, idxKey->idx, pLastCol);
273,458✔
1956
    }
1957

1958
    // taosArrayRemove(remainCols, i);
1959

1960
    if (/*!pTmpColArray*/ lastTmpIndexArray && !lastTmpColArray) {
273,458✔
1961
      continue;
×
1962
    }
1963
    if (/*!pTmpColArray*/ lastrowTmpIndexArray && !lastrowTmpColArray) {
273,458✔
1964
      continue;
×
1965
    }
1966

1967
    // store result back to rocks cache
1968
    code = tsdbCachePutToRocksdb(pTsdb, &idxKey->key, pLastCol);
273,458✔
1969
    if (code) {
273,458✔
1970
      tsdbError("vgId:%d, %s failed at line %d since %s.", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
×
1971
      TAOS_CHECK_EXIT(code);
×
1972
    }
1973

1974
    code = tsdbCachePutToLRU(pTsdb, &idxKey->key, pLastCol, 0);
273,458✔
1975
    if (code) {
273,458✔
1976
      tsdbError("vgId:%d, %s failed at line %d since %s.", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
×
1977
      TAOS_CHECK_EXIT(code);
×
1978
    }
1979

1980
    if (extraTS && i == 0) {
273,458✔
1981
      tsdbCacheFreeSLastColItem(pLastCol);
×
1982
    }
1983
  }
1984

1985
  rocksMayWrite(pTsdb, false);
115,640✔
1986

1987
_exit:
115,640✔
1988
  taosArrayDestroy(lastrowTmpIndexArray);
115,640✔
1989
  taosArrayDestroy(lastrowTmpColArray);
115,640✔
1990
  taosArrayDestroy(lastTmpIndexArray);
115,640✔
1991
  taosArrayDestroy(lastTmpColArray);
115,640✔
1992

1993
  taosMemoryFree(lastColIds);
115,640✔
1994
  taosMemoryFree(lastSlotIds);
115,640✔
1995
  taosMemoryFree(lastrowColIds);
115,640✔
1996
  taosMemoryFree(lastrowSlotIds);
115,640✔
1997

1998
  taosArrayDestroy(pTmpColArray);
115,640✔
1999

2000
  taosMemoryFree(slotIds);
115,640✔
2001

2002
  TAOS_RETURN(code);
115,640✔
2003
}
2004

2005
/*
 * Try to resolve the columns in remainCols from the rocksdb cache; whatever is
 * still unresolved afterwards is handed to tsdbCacheLoadFromRaw().
 *
 * For every key whose stored value deserializes to an entry that is not
 * TSDB_LAST_CACHE_NO_CACHE, the entry is put into the LRU cache, a deep copy is
 * stored into pLastArray at idxKey->idx, and the key is removed from both
 * remainCols and ignoreFromRocks.
 *
 * pTsdb           - tsdb instance.
 * uid             - table uid (logging and the raw-load fallback).
 * pLastArray      - result array, updated in place.
 * remainCols      - SIdxKey entries still to resolve; shrinks as keys are resolved.
 * ignoreFromRocks - bool array parallel to remainCols; true means the rocksdb value
 *                   for that key must be skipped.
 * pr              - cache rows reader (idstr for logging, forwarded to the raw load).
 * ltype           - forwarded to tsdbCacheLoadFromRaw().
 *
 * Returns 0 on success, otherwise the error code of the failing step.
 */
static int32_t tsdbCacheLoadFromRocks(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SArray *remainCols,
                                      SArray *ignoreFromRocks, SCacheRowsReader *pr, int8_t ltype) {
  int32_t code = 0, lino = 0;
  int     num_keys = TARRAY_SIZE(remainCols);
  char  **keys_list = taosMemoryMalloc(num_keys * sizeof(char *));
  size_t *keys_list_sizes = taosMemoryMalloc(num_keys * sizeof(size_t));
  char   *key_list = taosMemoryMalloc(num_keys * ROCKS_KEY_LEN);
  if (!keys_list || !keys_list_sizes || !key_list) {
    // capture the allocator's error first, then release whichever buffers did succeed
    code = terrno;
    taosMemoryFree(key_list);
    taosMemoryFree(keys_list);
    taosMemoryFree(keys_list_sizes);
    TAOS_RETURN(code);
  }
  char  **values_list = NULL;
  size_t *values_list_sizes = NULL;
  for (int i = 0; i < num_keys; ++i) {
    memcpy(key_list + i * ROCKS_KEY_LEN, &((SIdxKey *)taosArrayGet(remainCols, i))->key, ROCKS_KEY_LEN);
    keys_list[i] = key_list + i * ROCKS_KEY_LEN;
    keys_list_sizes[i] = ROCKS_KEY_LEN;
  }

  rocksMayWrite(pTsdb, true);  // flush writebatch cache

  code = tsdbCacheGetValuesFromRocks(pTsdb, num_keys, (const char *const *)keys_list, keys_list_sizes, &values_list,
                                     &values_list_sizes);
  if (code) {
    taosMemoryFree(key_list);
    taosMemoryFree(keys_list);
    taosMemoryFree(keys_list_sizes);
    TAOS_RETURN(code);
  }

  // i walks the fetched values (fixed order); j is the position of the same key in the
  // shrinking remainCols / ignoreFromRocks arrays.
  for (int i = 0, j = 0; i < num_keys && j < TARRAY_SIZE(remainCols); ++i) {
    SLastCol *pLastCol = NULL;
    // ignoreFromRocks is trimmed in lockstep with remainCols, so it is indexed by j
    bool ignore = ((bool *)TARRAY_DATA(ignoreFromRocks))[j];
    if (ignore) {
      ++j;
      continue;
    }

    if (values_list[i] != NULL) {
      code = tsdbCacheDeserialize(values_list[i], values_list_sizes[i], &pLastCol);
      if (code != TSDB_CODE_SUCCESS) {
        tsdbError("vgId:%d, %s deserialize failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
                  tstrerror(code));
        goto _exit;
      }
    }
    SLastCol *pToFree = pLastCol;
    SIdxKey  *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[j];
    if (pLastCol && pLastCol->cacheStatus != TSDB_LAST_CACHE_NO_CACHE) {
      code = tsdbCachePutToLRU(pTsdb, &idxKey->key, pLastCol, 0);
      if (code) {
        tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
        taosMemoryFreeClear(pToFree);
        TAOS_CHECK_EXIT(code);
      }

      // the caller's result gets its own deep copy; the deserialized buffer is freed below
      SLastCol lastCol = *pLastCol;
      code = tsdbCacheReallocSLastCol(&lastCol, NULL);
      if (TSDB_CODE_SUCCESS != code) {
        taosMemoryFreeClear(pToFree);
        TAOS_CHECK_EXIT(code);
      }

      tsdbDebug("vgId:%d, %s qid:%s uid:%" PRId64 " from rocksdb, col_id:%d col_flag:%d ts:%" PRId64,
                TD_VID(pTsdb->pVnode), __func__, pr && pr->idstr ? pr->idstr : "null", uid, lastCol.colVal.cid,
                lastCol.colVal.flag, lastCol.rowKey.ts);

      taosArraySet(pLastArray, idxKey->idx, &lastCol);
      taosArrayRemove(remainCols, j);
      taosArrayRemove(ignoreFromRocks, j);
    } else {
      ++j;
    }

    taosMemoryFreeClear(pToFree);
  }

  if (TARRAY_SIZE(remainCols) > 0) {
    code = tsdbCacheLoadFromRaw(pTsdb, uid, pLastArray, remainCols, pr, ltype);
  }

_exit:
  taosMemoryFree(key_list);
  taosMemoryFree(keys_list);
  taosMemoryFree(keys_list_sizes);
  if (values_list) {
#ifdef USE_ROCKSDB
    for (int i = 0; i < num_keys; ++i) {
      rocksdb_free(values_list[i]);
    }
#endif
    taosMemoryFree(values_list);
  }
  taosMemoryFree(values_list_sizes);

  TAOS_RETURN(code);
}
2105

2106
/*
 * Resolve every column id in pr->pCidList for table uid, appending one SLastCol
 * per column to pLastArray and one SLastKey per column to keyArray.
 *
 * Pass 1 looks each key up in the LRU cache without taking lruMutex. Hits that are
 * not TSDB_LAST_CACHE_NO_CACHE are deep-copied into pLastArray; everything else
 * gets a NONE placeholder and is queued in remainCols.
 * Pass 2 runs only when something is queued: under lruMutex the queued keys are
 * looked up once more, and the rest is delegated to tsdbCacheLoadFromRocks().
 *
 * pTsdb      - tsdb instance (lruCache, lruMutex).
 * uid        - table uid.
 * pLastArray - output, one SLastCol per requested column.
 * pr         - cache rows reader (pCidList, pFuncTypeList, type, pSchema, pSlotIds, idstr).
 * ltype      - default lflag for the keys; overridden per column for the mixed
 *              "select last_row, last" case.
 * keyArray   - output, the SLastKey built for each column.
 *
 * Returns 0 on success, otherwise the error code of the failing step.
 */
static int32_t tsdbCacheGetBatchFromLru(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr,
                                        int8_t ltype, SArray *keyArray) {
  int32_t    code = 0, lino = 0;
  SArray    *remainCols = NULL;
  SArray    *ignoreFromRocks = NULL;
  SLRUCache *pCache = pTsdb->lruCache;
  SArray    *pCidList = pr->pCidList;
  int        numKeys = TARRAY_SIZE(pCidList);

  for (int i = 0; i < numKeys; ++i) {
    int16_t cid = ((int16_t *)TARRAY_DATA(pCidList))[i];

    SLastKey key = {.lflag = ltype, .uid = uid, .cid = cid};
    // for select last_row, last case
    int32_t funcType = FUNCTION_TYPE_CACHE_LAST;
    if (pr->pFuncTypeList != NULL && taosArrayGetSize(pr->pFuncTypeList) > i) {
      funcType = ((int32_t *)TARRAY_DATA(pr->pFuncTypeList))[i];
    }
    if (((pr->type & CACHESCAN_RETRIEVE_LAST) == CACHESCAN_RETRIEVE_LAST) && FUNCTION_TYPE_CACHE_LAST_ROW == funcType) {
      int8_t tempType = CACHESCAN_RETRIEVE_LAST_ROW | (pr->type ^ CACHESCAN_RETRIEVE_LAST);
      key.lflag = (tempType & CACHESCAN_RETRIEVE_LAST) >> 3;
    }

    if (!taosArrayPush(keyArray, &key)) {
      TAOS_CHECK_EXIT(terrno);
    }

    LRUHandle *h = taosLRUCacheLookup(pCache, &key, ROCKS_KEY_LEN);
    SLastCol  *pLastCol = h ? (SLastCol *)taosLRUCacheValue(pCache, h) : NULL;
    if (h && pLastCol->cacheStatus != TSDB_LAST_CACHE_NO_CACHE) {
      // cache hit: hand the caller its own deep copy
      SLastCol lastCol = *pLastCol;
      if (TSDB_CODE_SUCCESS != (code = tsdbCacheReallocSLastCol(&lastCol, NULL))) {
        tsdbLRUCacheRelease(pCache, h, false);
        goto _exit;
      }

      tsdbDebug("vgId:%d, %s qid:%s uid:%" PRId64 " from lru, col_id:%d col_flag:%d ts:%" PRId64, TD_VID(pTsdb->pVnode),
                __func__, pr && pr->idstr ? pr->idstr : "null", uid, lastCol.colVal.cid, lastCol.colVal.flag,
                lastCol.rowKey.ts);

      if (taosArrayPush(pLastArray, &lastCol) == NULL) {
        code = terrno;
        tsdbLRUCacheRelease(pCache, h, false);
        goto _exit;
      }
    } else {
      // no cache or cache is invalid
      SLastCol noneCol = {.rowKey.ts = TSKEY_MIN,
                          .colVal = COL_VAL_NONE(cid, pr->pSchema->columns[pr->pSlotIds[i]].type)};

      if (taosArrayPush(pLastArray, &noneCol) == NULL) {
        code = terrno;
        tsdbLRUCacheRelease(pCache, h, false);
        goto _exit;
      }

      if (!remainCols) {
        if ((remainCols = taosArrayInit(numKeys, sizeof(SIdxKey))) == NULL) {
          code = terrno;
          tsdbLRUCacheRelease(pCache, h, false);
          goto _exit;
        }
      }
      if (!ignoreFromRocks) {
        if ((ignoreFromRocks = taosArrayInit(numKeys, sizeof(bool))) == NULL) {
          code = terrno;
          tsdbLRUCacheRelease(pCache, h, false);
          goto _exit;
        }
      }
      if (taosArrayPush(remainCols, &(SIdxKey){i, key}) == NULL) {
        code = terrno;
        tsdbLRUCacheRelease(pCache, h, false);
        goto _exit;
      }
      // an LRU entry marked NO_CACHE means the rocksdb copy must not be used either
      bool ignoreRocks = pLastCol ? (pLastCol->cacheStatus == TSDB_LAST_CACHE_NO_CACHE) : false;
      if (taosArrayPush(ignoreFromRocks, &ignoreRocks) == NULL) {
        code = terrno;
        tsdbLRUCacheRelease(pCache, h, false);
        goto _exit;
      }
    }

    if (h) {
      tsdbLRUCacheRelease(pCache, h, false);
    }
  }

  if (remainCols && TARRAY_SIZE(remainCols) > 0) {
    (void)taosThreadMutexLock(&pTsdb->lruMutex);

    // second lookup, this time under the mutex, before falling back to rocksdb
    for (int i = 0; i < TARRAY_SIZE(remainCols);) {
      SIdxKey   *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[i];
      LRUHandle *h = taosLRUCacheLookup(pCache, &idxKey->key, ROCKS_KEY_LEN);
      SLastCol  *pLastCol = h ? (SLastCol *)taosLRUCacheValue(pCache, h) : NULL;
      if (h && pLastCol->cacheStatus != TSDB_LAST_CACHE_NO_CACHE) {
        SLastCol lastCol = *pLastCol;
        code = tsdbCacheReallocSLastCol(&lastCol, NULL);
        if (code) {
          tsdbLRUCacheRelease(pCache, h, false);
          (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
          // leave through _exit so remainCols / ignoreFromRocks are released
          goto _exit;
        }

        tsdbDebug("vgId:%d, %s qid:%s uid:%" PRId64 " from lru(2nd lookup), col_id:%d col_flag:%d ts:%" PRId64,
                  TD_VID(pTsdb->pVnode), __func__, pr && pr->idstr ? pr->idstr : "null", uid, lastCol.colVal.cid,
                  lastCol.colVal.flag, lastCol.rowKey.ts);

        taosArraySet(pLastArray, idxKey->idx, &lastCol);

        taosArrayRemove(remainCols, i);
        taosArrayRemove(ignoreFromRocks, i);
      } else {
        // no cache or cache is invalid
        ++i;
      }
      if (h) {
        tsdbLRUCacheRelease(pCache, h, false);
      }
    }

    code = tsdbCacheLoadFromRocks(pTsdb, uid, pLastArray, remainCols, ignoreFromRocks, pr, ltype);

    (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
  }

_exit:
  if (remainCols) {
    taosArrayDestroy(remainCols);
  }
  if (ignoreFromRocks) {
    taosArrayDestroy(ignoreFromRocks);
  }

  TAOS_RETURN(code);
}
2243

2244
// States of the SMemNextRowIter state machine driven by getNextRowFromMem().
typedef enum SMEMNEXTROWSTATES {
2245
  SMEMNEXTROW_ENTER,  // first call: open the table-data iterator and fetch its current row
2246
  SMEMNEXTROW_NEXT,   // later calls: advance the iterator and fetch the next row
2247
} SMEMNEXTROWSTATES;
2248

2249
// Per-memtable cursor consumed by getNextRowFromMem().
typedef struct SMemNextRowIter {
2250
  SMEMNEXTROWSTATES state;  // current state of the ENTER/NEXT state machine
2251
  STbData          *pMem;  // [input]
2252
  STbDataIter       iter;  // mem buffer skip list iterator
2253
  int64_t           lastTs;  // NOTE(review): only referenced by commented-out code in getNextRowFromMem - confirm other users
2254
} SMemNextRowIter;
2255

2256
/*
 * _next_row_fn_t implementation backed by an in-memory table (SMemNextRowIter).
 *
 * On the first call (SMEMNEXTROW_ENTER) the table-data iterator is opened and its
 * current row is returned; the state then switches to SMEMNEXTROW_NEXT, where each
 * call advances the iterator first. *ppRow is set to NULL when there is no (more)
 * data. *pIgnoreEarlierTs is always cleared. isLast, aCols and nCols are unused here.
 *
 * Always returns 0.
 */
static int32_t getNextRowFromMem(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast, int16_t *aCols,
                                 int nCols) {
  SMemNextRowIter *pMemIter = (SMemNextRowIter *)iter;
  TSDBROW         *pNextRow = NULL;
  int32_t          code = 0;

  *pIgnoreEarlierTs = false;

  if (pMemIter->state == SMEMNEXTROW_ENTER) {
    if (pMemIter->pMem != NULL) {
      tsdbTbDataIterOpen(pMemIter->pMem, NULL, 1, &pMemIter->iter);

      pNextRow = tsdbTbDataIterGet(&pMemIter->iter);
      if (pNextRow != NULL) {
        // only leave the ENTER state once a first row has actually been produced
        pMemIter->state = SMEMNEXTROW_NEXT;
      }
    }
  } else if (pMemIter->state == SMEMNEXTROW_NEXT) {
    if (tsdbTbDataIterNext(&pMemIter->iter)) {
      pNextRow = tsdbTbDataIterGet(&pMemIter->iter);
    }
  }

  *ppRow = pNextRow;

  TAOS_RETURN(code);
}
2306

2307
// Callback that yields the next row of a source through *ppRow (NULL when exhausted);
// getNextRowFromMem() is the implementation installed by memRowIterOpen().
typedef int32_t (*_next_row_fn_t)(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast, int16_t *aCols,
2308
                                  int nCols);
2309
// Optional cleanup callback for a source's iterator; memRowIterClose() invokes it when set.
typedef int32_t (*_next_row_clear_fn_t)(void *iter);
2310

2311
// One row source of a merge iterator. memRowIterOpen() fills this struct with a positional
// initializer, so the field order below must not change.
typedef struct {
2312
  TSDBROW             *pRow;  // most recently fetched row, NULL once the source is exhausted
2313
  bool                 stop;  // true when the source has no more rows (or was never opened)
2314
  bool                 next;  // true when the next row should be fetched from this source
2315
  bool                 ignoreEarlierTs;  // output flag written by nextRowFn
2316
  void                *iter;  // opaque iterator passed to nextRowFn / nextRowClearFn
2317
  _next_row_fn_t       nextRowFn;  // fetches the next row
2318
  _next_row_clear_fn_t nextRowClearFn;  // optional cleanup, may be NULL
2319
} TsdbNextRowState;
2320

2321
// Iterator over the rows of one table held in the mem / imem tables of a read snapshot;
// set up by memRowIterOpen() and torn down by memRowIterClose().
typedef struct {
2322
  SArray           *pMemDelData;  // tomb data loaded by loadMemTombData(); destroyed on close
2323
  SArray           *pSkyline;  // destroyed on close when non-NULL; not set by memRowIterOpen()
2324
  int64_t           iSkyline;
2325
  SBlockIdx         idx;  // {suid, uid} of the table being iterated
2326
  SMemNextRowIter   memState;  // cursor over the mem table
2327
  SMemNextRowIter   imemState;  // cursor over the immutable mem table
2328
  TSDBROW           memRow, imemRow;
2329
  TsdbNextRowState  input[2];  // [0] = mem source, [1] = imem source
2330
  SCacheRowsReader *pr;
2331
  STsdb            *pTsdb;
2332
} MemNextRowIter;
2333

2334
static int32_t memRowIterOpen(MemNextRowIter *pIter, tb_uid_t uid, STsdb *pTsdb, STSchema *pTSchema, tb_uid_t suid,
2,472,108✔
2335
                              STsdbReadSnap *pReadSnap, SCacheRowsReader *pr) {
2336
  int32_t code = 0, lino = 0;
2,472,108✔
2337

2338
  STbData *pMem = NULL;
2,472,108✔
2339
  if (pReadSnap->pMem) {
2,472,108✔
2340
    pMem = tsdbGetTbDataFromMemTable(pReadSnap->pMem, suid, uid);
2,472,108✔
2341
  }
2342

2343
  STbData *pIMem = NULL;
2,472,108✔
2344
  if (pReadSnap->pIMem) {
2,472,108✔
2345
    pIMem = tsdbGetTbDataFromMemTable(pReadSnap->pIMem, suid, uid);
6,480✔
2346
  }
2347

2348
  pIter->pTsdb = pTsdb;
2,472,108✔
2349

2350
  pIter->pMemDelData = NULL;
2,472,108✔
2351

2352
  TAOS_CHECK_GOTO(loadMemTombData(&pIter->pMemDelData, pMem, pIMem, pr->info.verRange.maxVer), &lino, _exit);
2,472,108✔
2353

2354
  pIter->idx = (SBlockIdx){.suid = suid, .uid = uid};
2,472,108✔
2355

2356
  pIter->input[0] = (TsdbNextRowState){&pIter->memRow, true, false, false, &pIter->memState, getNextRowFromMem, NULL};
2,472,108✔
2357
  pIter->input[1] = (TsdbNextRowState){&pIter->imemRow, true, false, false, &pIter->imemState, getNextRowFromMem, NULL};
2,471,637✔
2358

2359
  if (pMem) {
2,472,108✔
2360
    pIter->memState.pMem = pMem;
2,193,015✔
2361
    pIter->memState.state = SMEMNEXTROW_ENTER;
2,193,015✔
2362
    pIter->input[0].stop = false;
2,193,015✔
2363
    pIter->input[0].next = true;
2,193,015✔
2364
  }
2365

2366
  if (pIMem) {
2,472,108✔
2367
    pIter->imemState.pMem = pIMem;
6,480✔
2368
    pIter->imemState.state = SMEMNEXTROW_ENTER;
6,480✔
2369
    pIter->input[1].stop = false;
6,480✔
2370
    pIter->input[1].next = true;
6,480✔
2371
  }
2372

2373
  pIter->pr = pr;
2,472,108✔
2374

2375
_exit:
2,472,108✔
2376
  if (code) {
2,472,108✔
2377
    tsdbError("tsdb/cache: %s failed at line %d since %s.", __func__, lino, tstrerror(code));
×
2378
  }
2379

2380
  TAOS_RETURN(code);
2,472,108✔
2381
}
2382

2383
/*
 * Release what a MemNextRowIter owns: run each input's optional clear callback,
 * then destroy the skyline and tomb-data arrays when present.
 */
static void memRowIterClose(MemNextRowIter *pIter) {
  TsdbNextRowState *pInput = &pIter->input[0];
  TsdbNextRowState *pInputEnd = pInput + 2;

  for (; pInput < pInputEnd; ++pInput) {
    if (pInput->nextRowClearFn != NULL) {
      (void)pInput->nextRowClearFn(pInput->iter);
    }
  }

  if (pIter->pSkyline != NULL) {
    taosArrayDestroy(pIter->pSkyline);
  }

  if (pIter->pMemDelData != NULL) {
    taosArrayDestroy(pIter->pMemDelData);
  }
}
2,471,283✔
2398

2399
/*
 * Value destructor registered on the reader's table map by getTableLoadInfo().
 * `param` points at the stored value, which is itself a heap pointer; release
 * that allocation through taosMemoryFreeClear.
 */
static void freeTableInfoFunc(void *param) {
  void **ppStored = param;

  taosMemoryFreeClear(*ppStored);
}
2,196,449✔
2403

2404
/*
 * Look up (creating on first use) the STableLoadInfo associated with `uid`
 * in pReader->pTableMap. The map itself is created lazily and is given
 * freeTableInfoFunc as its value destructor.
 *
 * Returns the entry, or NULL when the map or the entry cannot be allocated
 * or the entry cannot be inserted.
 */
static STableLoadInfo *getTableLoadInfo(SCacheRowsReader *pReader, uint64_t uid) {
  if (!pReader->pTableMap) {
    pReader->pTableMap = tSimpleHashInit(pReader->numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
    if (!pReader->pTableMap) {
      return NULL;
    }

    tSimpleHashSetFreeFp(pReader->pTableMap, freeTableInfoFunc);
  }

  STableLoadInfo **ppInfo = tSimpleHashGet(pReader->pTableMap, &uid, sizeof(uid));
  if (ppInfo) {
    return *ppInfo;
  }

  STableLoadInfo *pInfo = taosMemoryCalloc(1, sizeof(STableLoadInfo));
  if (!pInfo) {
    return NULL;
  }

  if (tSimpleHashPut(pReader->pTableMap, &uid, sizeof(uid), &pInfo, POINTER_BYTES)) {
    // The insert failed, so the map never took the pointer; release it here
    // instead of leaking it.
    taosMemoryFree(pInfo);
    return NULL;
  }

  return pInfo;
}
2429

2430
/*
 * Return the next not-deleted row from the two inputs of `pIter`
 * (input[0] and input[1]), always choosing the row with the largest key.
 *
 * Each round:
 *   1. every input flagged `next` (and not `stop`) is advanced through its
 *      nextRowFn; an input yielding no row is marked stopped;
 *   2. the row(s) with the largest key among the live inputs are collected;
 *   3. candidates covered by the delete skyline are discarded and their
 *      input is flagged to advance again; the skyline is built lazily on the
 *      first candidate from the table's tomb data plus pIter->pMemDelData.
 *
 * Returns the chosen row, or NULL when both inputs are exhausted or an error
 * occurred (errors are logged; the code is not propagated to the caller).
 */
static TSDBROW *memRowIterGet(MemNextRowIter *pIter, bool isLast, int16_t *aCols, int nCols) {
  int32_t code = 0;
  int32_t lino = 0;

  for (;;) {
    // step 1: pull a fresh row from every input that was consumed last round
    for (int src = 0; src < 2; ++src) {
      TsdbNextRowState *pIn = &pIter->input[src];
      if (pIn->stop || !pIn->next) {
        continue;
      }

      TAOS_CHECK_GOTO(pIn->nextRowFn(pIn->iter, &pIn->pRow, &pIn->ignoreEarlierTs, isLast, aCols, nCols), &lino,
                      _exit);

      if (pIn->pRow == NULL) {
        pIn->stop = true;
        pIn->next = false;
      }
    }

    if (pIter->input[0].stop && pIter->input[1].stop) {
      return NULL;
    }

    // step 2: collect the row(s) carrying the largest key
    TSDBROW *topRows[2] = {0};
    int      topSrc[2] = {-1, -1};
    int      nTop = 0;
    SRowKey  topKey = {.ts = TSKEY_MIN};

    for (int src = 0; src < 2; ++src) {
      TsdbNextRowState *pIn = &pIter->input[src];
      if (pIn->stop || pIn->pRow == NULL) {
        continue;
      }

      STsdbRowKey rowKey = {0};
      tsdbRowGetKey(pIn->pRow, &rowKey);

      // merging & deduplicating on client side
      int cmp = tRowKeyCompare(&topKey, &rowKey.key);
      if (cmp < 0) {
        // strictly larger key: restart the candidate list
        nTop = 0;
        topKey = rowKey.key;
      }
      if (cmp <= 0) {
        topSrc[nTop] = src;
        topRows[nTop] = pIn->pRow;
        ++nTop;
      }

      pIn->next = false;
    }

    // step 3: filter the candidates through the delete skyline
    TSDBROW *liveRows[2] = {0};
    int      liveSrc[2] = {-1, -1};
    int      nLive = 0;

    for (int k = 0; k < nTop; ++k) {
      TSDBKEY candKey = TSDBROW_KEY(topRows[k]);

      if (pIter->pSkyline == NULL) {
        pIter->pSkyline = taosArrayInit(32, sizeof(TSDBKEY));
        TSDB_CHECK_NULL(pIter->pSkyline, code, lino, _exit, terrno);

        STableLoadInfo *pInfo = getTableLoadInfo(pIter->pr, pIter->idx.uid);
        TSDB_CHECK_NULL(pInfo, code, lino, _exit, TSDB_CODE_OUT_OF_MEMORY);

        if (pInfo->pTombData == NULL) {
          pInfo->pTombData = taosArrayInit(4, sizeof(SDelData));
          TSDB_CHECK_NULL(pInfo->pTombData, code, lino, _exit, terrno);
        }

        if (!taosArrayAddAll(pInfo->pTombData, pIter->pMemDelData)) {
          TAOS_CHECK_GOTO(terrno, &lino, _exit);
        }

        size_t nTomb = TARRAY_SIZE(pInfo->pTombData);
        if (nTomb > 0) {
          TAOS_CHECK_GOTO(tsdbBuildDeleteSkyline(pInfo->pTombData, 0, (int32_t)(nTomb - 1), pIter->pSkyline), &lino,
                          _exit);
        }
        pIter->iSkyline = taosArrayGetSize(pIter->pSkyline) - 1;
      }

      bool deleted = tsdbKeyDeleted(&candKey, pIter->pSkyline, &pIter->iSkyline);
      if (!deleted) {
        liveSrc[nLive] = topSrc[k];
        liveRows[nLive] = topRows[k];
        ++nLive;
      }

      // a deleted candidate must be replaced on the next round
      pIter->input[topSrc[k]].next = deleted;
    }

    if (nLive > 0) {
      // the returned row is consumed, so its input advances on the next call
      pIter->input[liveSrc[0]].next = true;

      return liveRows[0];
    }
  }

_exit:
  if (code) {
    tsdbError("tsdb/cache: %s failed at line %d since %s.", __func__, lino, tstrerror(code));
  }

  return NULL;
}
2530

2531
/*
 * Duplicate a schema, including its trailing column array, into a freshly
 * allocated buffer stored in *ppDst. On allocation failure *ppDst is NULL
 * and terrno is returned.
 */
static int32_t cloneTSchema(STSchema *pSrc, STSchema **ppDst) {
  int32_t   nBytes = sizeof(STSchema) + sizeof(STColumn) * pSrc->numOfCols;
  STSchema *pCopy = taosMemoryMalloc(nBytes);

  *ppDst = pCopy;
  if (pCopy == NULL) {
    TAOS_RETURN(terrno);
  }

  memcpy(pCopy, pSrc, nBytes);

  TAOS_RETURN(TSDB_CODE_SUCCESS);
}
2541

2542
/*
 * Make pReader->pCurrSchema describe schema version `sversion`.
 *
 *  - no current schema and the reader's base schema has that version:
 *    clone the base schema;
 *  - current schema already has that version: nothing to do;
 *  - otherwise: drop the current schema and fetch the requested version
 *    from the meta store for table `uid`.
 */
static int32_t updateTSchema(int32_t sversion, SCacheRowsReader *pReader, uint64_t uid) {
  STSchema *pCurr = pReader->pCurrSchema;

  if (pCurr == NULL) {
    if (sversion == pReader->pSchema->version) {
      TAOS_RETURN(cloneTSchema(pReader->pSchema, &pReader->pCurrSchema));
    }
  } else if (sversion == pCurr->version) {
    TAOS_RETURN(TSDB_CODE_SUCCESS);
  }

  taosMemoryFreeClear(pReader->pCurrSchema);
  TAOS_RETURN(
      metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->info.suid, uid, sversion, &pReader->pCurrSchema));
}
2555

2556
/*
 * Refresh the entries of `pLastArray` for table `uid` from the in-memory
 * tables (mem / imem) of the reader's snapshot.
 *
 * `pLastArray` holds one SLastCol per requested column and `keyArray` the
 * matching SLastKey at the same index. The newest memtable row is merged
 * column by column (both sides are walked in ascending column-id order):
 *   - entries whose key is not a "last" key take the row's column when the
 *     row is newer, or equally new and the column is not NONE;
 *   - "last" entries take the column only when it carries a value and the
 *     row is not older; columns without a value are remembered in a hash
 *     (column id -> index in pLastArray).
 * Older memtable rows are then scanned until every remembered column has
 * been seen with a value or the rows run out.
 */
static int32_t tsdbCacheGetBatchFromMem(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr,
                                        SArray *keyArray) {
  int32_t        code = 0;
  int32_t        lino = 0;
  STSchema      *pTSchema = pr->pSchema;
  SLRUCache     *pCache = pTsdb->lruCache;
  SArray        *pCidList = pr->pCidList;
  int            numKeys = TARRAY_SIZE(pCidList);
  MemNextRowIter iter = {0};
  SSHashObj     *pendingCols = NULL;
  STSDBRowIter   rowIter = {0};

  // 1, get from mem, imem filtered with delete info
  TAOS_CHECK_EXIT(memRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->info.suid, pr->pReadSnap, pr));

  TSDBROW *pRow = memRowIterGet(&iter, false, NULL, 0);
  if (pRow == NULL) {
    goto _exit;
  }

  int32_t sversion = TSDBROW_SVERSION(pRow);
  if (sversion != -1) {
    TAOS_CHECK_EXIT(updateTSchema(sversion, pr, uid));

    pTSchema = pr->pCurrSchema;
  }
  int32_t nCol = pTSchema->numOfCols;

  STsdbRowKey rowKey = {0};
  tsdbRowGetKey(pRow, &rowKey);

  TAOS_CHECK_EXIT(tsdbRowIterOpen(&rowIter, pRow, pTSchema));

  int32_t  iCol = 0;
  int32_t  iTarget = 0;
  int32_t  nTarget = TARRAY_SIZE(pLastArray);
  SColVal *pColVal = tsdbRowIterNext(&rowIter);

  while (pColVal != NULL && iCol < nCol && iTarget < nTarget) {
    SLastCol *pTargetCol = &((SLastCol *)TARRAY_DATA(pLastArray))[iTarget];

    if (pColVal->cid < pTargetCol->colVal.cid) {
      // row column not requested: skip it
      pColVal = tsdbRowIterNext(&rowIter);
      ++iCol;
      continue;
    }
    if (pColVal->cid > pTargetCol->colVal.cid) {
      break;
    }

    int32_t   cmp_res = tRowKeyCompare(&pTargetCol->rowKey, &rowKey.key);
    SLastKey *aKeys = (SLastKey *)TARRAY_DATA(keyArray);

    if (!IS_LAST_KEY(aKeys[iTarget])) {
      if (cmp_res < 0 || (cmp_res == 0 && !COL_VAL_IS_NONE(pColVal))) {
        SLastCol lastCol = {.rowKey = rowKey.key, .colVal = *pColVal, .dirty = 1, .cacheStatus = TSDB_LAST_CACHE_VALID};
        TAOS_CHECK_EXIT(tsdbCacheReallocSLastCol(&lastCol, NULL));

        tsdbDebug("vgId:%d, %s qid:%s uid:%" PRId64 " from memtable, col_id:%d col_flag:%d ts:%" PRId64,
                  TD_VID(pTsdb->pVnode), __func__, pr && pr->idstr ? pr->idstr : "null", uid, lastCol.colVal.cid,
                  lastCol.colVal.flag, rowKey.key.ts);

        tsdbCacheFreeSLastColItem(pTargetCol);
        taosArraySet(pLastArray, iTarget, &lastCol);
      }
    } else if (COL_VAL_IS_VALUE(pColVal)) {
      if (cmp_res <= 0) {
        SLastCol lastCol = {
            .rowKey = rowKey.key, .colVal = *pColVal, .dirty = 1, .cacheStatus = TSDB_LAST_CACHE_VALID};
        TAOS_CHECK_EXIT(tsdbCacheReallocSLastCol(&lastCol, NULL));

        tsdbDebug("vgId:%d, %s qid:%s uid:%" PRId64
                  " from memtable(last) and memtable(newer), col_id:%d col_flag:%d ts:%" PRId64,
                  TD_VID(pTsdb->pVnode), __func__, pr && pr->idstr ? pr->idstr : "null", uid, lastCol.colVal.cid,
                  lastCol.colVal.flag, rowKey.key.ts);

        tsdbCacheFreeSLastColItem(pTargetCol);
        taosArraySet(pLastArray, iTarget, &lastCol);
      }
    } else {
      // "last" entry without a value in the newest row: resolve it from older rows
      if (pendingCols == NULL) {
        pendingCols = tSimpleHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT));
        if (pendingCols == NULL) {
          TAOS_CHECK_EXIT(terrno);
        }
      }

      if (tSimpleHashPut(pendingCols, &pColVal->cid, sizeof(pColVal->cid), &iTarget, sizeof(iTarget))) {
        TAOS_CHECK_EXIT(terrno);
      }
    }

    ++iTarget;

    // stay on the same row column while the next target still refers to it
    if (iTarget < nTarget && ((SLastCol *)TARRAY_DATA(pLastArray))[iTarget].colVal.cid > pColVal->cid) {
      pColVal = tsdbRowIterNext(&rowIter);
      ++iCol;
    }
  }
  tsdbRowClose(&rowIter);

  if (pendingCols && tSimpleHashGetSize(pendingCols) > 0) {
    for (pRow = memRowIterGet(&iter, false, NULL, 0); pRow != NULL && tSimpleHashGetSize(pendingCols) != 0;
         pRow = memRowIterGet(&iter, false, NULL, 0)) {
      sversion = TSDBROW_SVERSION(pRow);
      if (sversion != -1) {
        TAOS_CHECK_EXIT(updateTSchema(sversion, pr, uid));

        pTSchema = pr->pCurrSchema;
      }
      nCol = pTSchema->numOfCols;

      STsdbRowKey tsdbRowKey = {0};
      tsdbRowGetKey(pRow, &tsdbRowKey);

      TAOS_CHECK_EXIT(tsdbRowIterOpen(&rowIter, pRow, pTSchema));

      iCol = 0;
      for (SColVal *pVal = tsdbRowIterNext(&rowIter); pVal && iCol < nCol; pVal = tsdbRowIterNext(&rowIter), iCol++) {
        int32_t *pSlot = tSimpleHashGet(pendingCols, &pVal->cid, sizeof(pVal->cid));
        if (pSlot == NULL || !COL_VAL_IS_VALUE(pVal)) {
          continue;
        }

        SLastCol *pTargetCol = &((SLastCol *)TARRAY_DATA(pLastArray))[*pSlot];

        int32_t cmp_res = tRowKeyCompare(&pTargetCol->rowKey, &tsdbRowKey.key);
        if (cmp_res <= 0) {
          SLastCol lastCol = {
              .rowKey = tsdbRowKey.key, .colVal = *pVal, .dirty = 1, .cacheStatus = TSDB_LAST_CACHE_VALID};
          TAOS_CHECK_EXIT(tsdbCacheReallocSLastCol(&lastCol, NULL));

          tsdbDebug("vgId:%d, %s qid:%s uid:%" PRId64 " from memtable(hash), col_id:%d col_flag:%d ts:%" PRId64,
                    TD_VID(pTsdb->pVnode), __func__, pr && pr->idstr ? pr->idstr : "null", uid, lastCol.colVal.cid,
                    lastCol.colVal.flag, tsdbRowKey.key.ts);

          tsdbCacheFreeSLastColItem(pTargetCol);
          taosArraySet(pLastArray, *pSlot, &lastCol);
        }

        // a value was found for this column: it no longer needs older rows
        TAOS_CHECK_EXIT(tSimpleHashRemove(pendingCols, &pVal->cid, sizeof(pVal->cid)));
      }
      tsdbRowClose(&rowIter);
    }
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(pTsdb->pVnode), __func__, __FILE__, lino, tstrerror(code));

    tsdbRowClose(&rowIter);
  }

  tSimpleHashCleanup(pendingCols);

  memRowIterClose(&iter);

  TAOS_RETURN(code);
}
2714

2715
/*
 * Fill `pLastArray` for table `uid`: first from the LRU cache
 * (tsdbCacheGetBatchFromLru, which also fills a temporary key array), then,
 * when tsUpdateCacheBatch is set, refresh it from the memtables via
 * tsdbCacheGetBatchFromMem. The temporary key array is released before
 * returning.
 */
int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, int8_t ltype) {
  int32_t code = 0;
  int32_t lino = 0;
  SArray *keyArray = NULL;

  tsdbDebug("vgId:%d, %s start, qid:%s uid:%" PRId64 " ltype:%d", TD_VID(pTsdb->pVnode), __func__,
            pr && pr->idstr ? pr->idstr : "null", uid, ltype);

  keyArray = taosArrayInit(16, sizeof(SLastKey));
  if (keyArray == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }

  TAOS_CHECK_EXIT(tsdbCacheGetBatchFromLru(pTsdb, uid, pLastArray, pr, ltype, keyArray));

  if (tsUpdateCacheBatch) {
    TAOS_CHECK_EXIT(tsdbCacheGetBatchFromMem(pTsdb, uid, pLastArray, pr, keyArray));
  }

_exit:
  if (code != 0) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(pTsdb->pVnode), __func__, __FILE__, lino, tstrerror(code));
  }

  if (keyArray != NULL) {
    taosArrayDestroy(keyArray);
  }

  TAOS_RETURN(code);
}
2744

2745
/*
 * Invalidate cached last / last_row entries of table `uid` whose timestamp
 * lies inside [sKey, eKey].
 *
 * For every column and both key flavours (LFLAG_LAST_ROW .. LFLAG_LAST):
 *   - an entry found in the LRU cache and inside the range is overwritten in
 *     the LRU with a dirty NONE value marked TSDB_LAST_CACHE_NO_CACHE;
 *   - keys not present in the LRU are collected, fetched from rocksdb in one
 *     batch, and those inside the range are overwritten in both rocksdb and
 *     the LRU with a non-dirty NO_CACHE placeholder.
 *
 * The whole operation runs under pTsdb->lruMutex. Returns 0 on success or
 * the first error code encountered.
 */
int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey) {
  int32_t   code = 0, lino = 0;
  STSchema *pTSchema = NULL;
  int       sver = -1;
  int       numKeys = 0;
  SArray   *remainCols = NULL;

  // Initialised here, before any `goto _exit`: the cleanup code below frees
  // and tests these, so they must never be read while indeterminate.
  char  **keys_list = NULL;
  size_t *keys_list_sizes = NULL;
  char  **values_list = NULL;
  size_t *values_list_sizes = NULL;

  TAOS_CHECK_RETURN(metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pTSchema));

  int numCols = pTSchema->numOfCols;

  (void)taosThreadMutexLock(&pTsdb->lruMutex);

  for (int i = 0; i < numCols; ++i) {
    int16_t cid = pTSchema->columns[i].colId;
    for (int8_t lflag = LFLAG_LAST_ROW; lflag <= LFLAG_LAST; ++lflag) {
      SLastKey   lastKey = {.lflag = lflag, .uid = uid, .cid = cid};
      LRUHandle *h = taosLRUCacheLookup(pTsdb->lruCache, &lastKey, ROCKS_KEY_LEN);
      if (h) {
        SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pTsdb->lruCache, h);
        if (pLastCol->rowKey.ts <= eKey && pLastCol->rowKey.ts >= sKey) {
          SLastCol noneCol = {.rowKey.ts = TSKEY_MIN,
                              .colVal = COL_VAL_NONE(cid, pTSchema->columns[i].type),
                              .dirty = 1,
                              .cacheStatus = TSDB_LAST_CACHE_NO_CACHE};
          code = tsdbCachePutToLRU(pTsdb, &lastKey, &noneCol, 1);
        }
        // release the handle before acting on the put result
        tsdbLRUCacheRelease(pTsdb->lruCache, h, false);
        TAOS_CHECK_EXIT(code);
      } else {
        if (!remainCols) {
          remainCols = taosArrayInit(numCols * 2, sizeof(SIdxKey));
          if (!remainCols) {
            code = terrno;
            goto _exit;
          }
        }
        if (!taosArrayPush(remainCols, &(SIdxKey){i, lastKey})) {
          TAOS_CHECK_EXIT(terrno);
        }
      }
    }
  }

  if (remainCols) {
    numKeys = TARRAY_SIZE(remainCols);
  }

  keys_list = taosMemoryCalloc(numKeys, sizeof(char *));
  keys_list_sizes = taosMemoryCalloc(numKeys, sizeof(size_t));

  if (!keys_list || !keys_list_sizes) {
    code = terrno;
    goto _exit;
  }
  const size_t klen = ROCKS_KEY_LEN;

  for (int i = 0; i < numKeys; ++i) {
    char *key = taosMemoryCalloc(1, sizeof(SLastKey));
    if (!key) {
      code = terrno;
      goto _exit;
    }
    SIdxKey *idxKey = taosArrayGet(remainCols, i);

    ((SLastKey *)key)[0] = idxKey->key;

    keys_list[i] = key;
    keys_list_sizes[i] = klen;
  }

  rocksMayWrite(pTsdb, true);  // flush writebatch cache

  TAOS_CHECK_GOTO(tsdbCacheGetValuesFromRocks(pTsdb, numKeys, (const char *const *)keys_list, keys_list_sizes,
                                              &values_list, &values_list_sizes),
                  NULL, _exit);

  // rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
  for (int i = 0; i < numKeys; ++i) {
    SLastCol *pLastCol = NULL;
    if (values_list[i] != NULL) {
      code = tsdbCacheDeserialize(values_list[i], values_list_sizes[i], &pLastCol);
      if (code != TSDB_CODE_SUCCESS) {
        tsdbError("vgId:%d, %s deserialize failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
                  tstrerror(code));
        goto _exit;
      }
    }
    SIdxKey  *idxKey = taosArrayGet(remainCols, i);
    SLastKey *pLastKey = &idxKey->key;
    if (NULL != pLastCol && (pLastCol->rowKey.ts <= eKey && pLastCol->rowKey.ts >= sKey)) {
      SLastCol noCacheCol = {.rowKey.ts = TSKEY_MIN,
                             .colVal = COL_VAL_NONE(pLastKey->cid, pTSchema->columns[idxKey->idx].type),
                             .dirty = 0,
                             .cacheStatus = TSDB_LAST_CACHE_NO_CACHE};

      if ((code = tsdbCachePutToRocksdb(pTsdb, pLastKey, &noCacheCol)) != TSDB_CODE_SUCCESS) {
        taosMemoryFreeClear(pLastCol);
        tsdbError("tsdb/cache/del: vgId:%d, put to rocks failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code));
        goto _exit;
      }
      if ((code = tsdbCachePutToLRU(pTsdb, pLastKey, &noCacheCol, 0)) != TSDB_CODE_SUCCESS) {
        taosMemoryFreeClear(pLastCol);
        tsdbError("tsdb/cache/del: vgId:%d, put to lru failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code));
        goto _exit;
      }
    }

    if (pLastCol == NULL) {
      // arguments follow the labels: uid first, then cid
      tsdbDebug("tsdb/cache/del: vgId:%d, no cache found for uid:%" PRId64 ", cid:%d, lflag:%d.",
                TD_VID(pTsdb->pVnode), pLastKey->uid, pLastKey->cid, pLastKey->lflag);
    }

    taosMemoryFreeClear(pLastCol);
  }

  rocksMayWrite(pTsdb, false);

_exit:
  (void)taosThreadMutexUnlock(&pTsdb->lruMutex);

  if (keys_list) {
    // entries not yet filled are NULL because the array was calloc'ed
    for (int i = 0; i < numKeys; ++i) {
      taosMemoryFree(keys_list[i]);
    }
  }
  taosMemoryFree(keys_list);
  taosMemoryFree(keys_list_sizes);
  if (values_list) {
#if USE_ROCKSDB
    for (int i = 0; i < numKeys; ++i) {
      rocksdb_free(values_list[i]);
    }
#endif
    taosMemoryFree(values_list);
  }
  taosMemoryFree(values_list_sizes);
  taosArrayDestroy(remainCols);
  taosMemoryFree(pTSchema);

  TAOS_RETURN(code);
}
2883

2884
/*
 * Create the caches of a tsdb instance: the last/last_row LRU cache (sized
 * from the vnode's cacheLastSize, in MiB, and cacheLastShardBits), the block
 * and page caches when shared storage is enabled, and the rocksdb-backed
 * cache. On success pTsdb->lruCache and pTsdb->lruMutex are initialised; on
 * failure the LRU cache is destroyed and pTsdb->lruCache is set to NULL.
 */
int32_t tsdbOpenCache(STsdb *pTsdb) {
  int32_t code = 0;
  int32_t lino = 0;
  size_t  capacityBytes = (size_t)pTsdb->pVnode->config.cacheLastSize * 1024 * 1024;
  int32_t shardBits = pTsdb->pVnode->config.cacheLastShardBits;

  // Use configured shard bits, or -1 to auto-calculate based on cache size
  // This enables multi-shard LRU cache for better concurrency
  SLRUCache *pCache = taosLRUCacheInit(capacityBytes, shardBits, .5);
  if (!pCache) {
    TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err);
  }

#ifdef USE_SHARED_STORAGE
  if (tsSsEnabled) {
    TAOS_CHECK_GOTO(tsdbOpenBCache(pTsdb), &lino, _err);
    TAOS_CHECK_GOTO(tsdbOpenPgCache(pTsdb), &lino, _err);
  }
#endif

  TAOS_CHECK_GOTO(tsdbOpenRocksCache(pTsdb), &lino, _err);

  taosLRUCacheSetStrictCapacity(pCache, false);

  (void)taosThreadMutexInit(&pTsdb->lruMutex, NULL);

  pTsdb->lruCache = pCache;

  tsdbInfo("vgId:%d, lruCache opened with capacity:%zu bytes, numShards:%d (configured:%d)",
           TD_VID(pTsdb->pVnode), capacityBytes, taosLRUCacheGetNumShards(pCache), shardBits);

  TAOS_RETURN(0);

_err:
  if (code != 0) {
    tsdbError("tsdb/cache: vgId:%d, open failed at line %d since %s.", TD_VID(pTsdb->pVnode), lino, tstrerror(code));
    if (pCache != NULL) {
      taosLRUCacheCleanup(pCache);
      pCache = NULL;
    }
  }

  pTsdb->lruCache = pCache;
  TAOS_RETURN(code);
}
2928

2929
/*
 * Counterpart of tsdbOpenCache: drop the LRU cache (and its mutex) when one
 * exists, close the shared-storage caches when enabled, and finally close the
 * rocksdb-backed cache.
 */
void tsdbCloseCache(STsdb *pTsdb) {
  SLRUCache *pLru = pTsdb->lruCache;

  if (pLru != NULL) {
    taosLRUCacheEraseUnrefEntries(pLru);
    taosLRUCacheCleanup(pLru);
    (void)taosThreadMutexDestroy(&pTsdb->lruMutex);
  }

#ifdef USE_SHARED_STORAGE
  if (tsSsEnabled) {
    tsdbCloseBCache(pTsdb);
    tsdbClosePgCache(pTsdb);
  }
#endif

  tsdbCloseRocksCache(pTsdb);
}
4,873,068✔
2948

2949
// Rebuild only the last cache (lruCache) with a new shard count.
2950
// Must be called with pTsdb->lruMutex held by the caller.
2951
int32_t tsdbRebuildLastCache(STsdb *pTsdb, int32_t numShardBits) {
1,650✔
2952
  size_t     cfgCapacity = (size_t)pTsdb->pVnode->config.cacheLastSize * 1024 * 1024;
1,650✔
2953
  SLRUCache *pNewCache = taosLRUCacheInit(cfgCapacity, numShardBits, .5);
1,650✔
2954
  if (pNewCache == NULL) {
1,650✔
2955
    TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
×
2956
  }
2957
  taosLRUCacheSetStrictCapacity(pNewCache, false);
1,650✔
2958

2959
  // Swap in the new cache and clean up the old one
2960
  SLRUCache *pOldCache = pTsdb->lruCache;
1,650✔
2961
  pTsdb->lruCache = pNewCache;
1,650✔
2962

2963
  if (pOldCache) {
1,650✔
2964
    int64_t start = taosGetTimestampMs();
1,650✔
2965
    taosLRUCacheEraseUnrefEntries(pOldCache);
1,650✔
2966
    taosLRUCacheCleanup(pOldCache);
1,650✔
2967
    int64_t end = taosGetTimestampMs();
1,650✔
2968
    tsdbInfo("vgId:%d, lruCache erase unref entries and cleanup time:%" PRId64 " ms", TD_VID(pTsdb->pVnode),
1,650✔
2969
             end - start);
2970
  }
2971

2972
  tsdbInfo("vgId:%d, lruCache rebuilt with capacity:%zu bytes, numShards:%d (configured:%d)", TD_VID(pTsdb->pVnode),
1,650✔
2973
           cfgCapacity, taosLRUCacheGetNumShards(pNewCache), numShardBits);
2974

2975
  TAOS_RETURN(0);
1,650✔
2976
}
2977

2978
/*
 * Build the 8-byte cache key of a table.
 *
 * @param uid        table uid
 * @param cacheType  0 selects the last_row key (the uid itself); any other
 *                   value selects the last key (uid with the top bit set)
 * @param key        output buffer, at least sizeof(uint64_t) bytes
 * @param len        receives the key length, sizeof(uint64_t)
 *
 * The key is written with memcpy so `key` does not have to be aligned for
 * uint64_t; the stored bytes are the native representation of the value.
 */
static void getTableCacheKey(tb_uid_t uid, int cacheType, char *key, int *len) {
  uint64_t rawKey = (uint64_t)uid;

  if (cacheType != 0) {  // last
    rawKey |= 0x8000000000000000ULL;
  }

  memcpy(key, &rawKey, sizeof(rawKey));
  *len = sizeof(uint64_t);
}
×
2987

2988
/*
 * Resolve the super-table uid of table `uid` through the meta reader.
 * Returns the suid for a child table, and 0 for every other table type or
 * when the table entry cannot be found.
 */
static tb_uid_t getTableSuidByUid(tb_uid_t uid, STsdb *pTsdb) {
  tb_uid_t    suid = 0;
  SMetaReader mr = {0};

  metaReaderDoInit(&mr, pTsdb->pVnode->pMeta, META_READER_LOCK);

  // a failed lookup means the table does not exist; suid stays 0
  if (metaReaderGetTableEntryByUidCache(&mr, uid) >= 0 && mr.me.type == TSDB_CHILD_TABLE) {
    suid = mr.me.ctbEntry.suid;
  }

  metaReaderClear(&mr);

  return suid;
}
3010

3011
/*
 * Append the delete records referenced by `pDelIdx` to `aDelData` using
 * tsdbReadDelDatav1 with no version limit (INT64_MAX). A NULL index is not
 * an error: nothing is read and 0 is returned.
 */
static int32_t getTableDelDataFromDelIdx(SDelFReader *pDelReader, SDelIdx *pDelIdx, SArray *aDelData) {
  if (pDelIdx == NULL) {
    TAOS_RETURN(0);
  }

  TAOS_RETURN(tsdbReadDelDatav1(pDelReader, pDelIdx, aDelData, INT64_MAX));
}
3020

3021
/*
 * Copy every delete record of the linked list starting at pTbData->pHead
 * into `aDelData`. A NULL table is treated as an empty list. Returns terrno
 * when an append fails, 0 otherwise.
 */
static int32_t getTableDelDataFromTbData(STbData *pTbData, SArray *aDelData) {
  if (pTbData == NULL) {
    TAOS_RETURN(0);
  }

  SDelData *pNode = pTbData->pHead;
  while (pNode != NULL) {
    if (taosArrayPush(aDelData, pNode) == NULL) {
      TAOS_RETURN(terrno);
    }
    pNode = pNode->pNext;
  }

  TAOS_RETURN(0);
}
3033

3034
/*
 * Return the reader's list of table uids, sorted with uidComparFunc.
 * The list is built from pReader->pTableList on first use and cached in
 * pReader->uidList. Returns NULL when the list cannot be allocated.
 */
static uint64_t *getUidList(SCacheRowsReader *pReader) {
  if (pReader->uidList != NULL) {
    return pReader->uidList;
  }

  int32_t   nTables = pReader->numOfTables;
  uint64_t *aUid = taosMemoryMalloc(nTables * sizeof(uint64_t));

  pReader->uidList = aUid;
  if (aUid == NULL) {
    return NULL;
  }

  for (int32_t t = 0; t < nTables; ++t) {
    aUid[t] = pReader->pTableList[t].uid;
  }

  taosSort(aUid, nTables, sizeof(uint64_t), uidComparFunc);

  return aUid;
}
3053

3054
/*
 * Load tombstone (delete) records for the reader's tables from the tomb
 * blocks listed in `pTombBlkArray` and append them to the per-table
 * STableLoadInfo::pTombData arrays.
 *
 * @param pTombBlkArray  tomb block index entries to scan
 * @param pReader        cache reader; supplies suid, the table list and the
 *                       maximum version (info.verRange.maxVer) to accept
 * @param pFileReader    data-file reader when `isFile` is true, stt-file
 *                       reader otherwise
 * @param isFile         selects which read function loads a tomb block
 *
 * Blocks entirely before the reader's (suid, uid) range are skipped and the
 * scan stops at the first block entirely after it. Within a block, records
 * are matched against the sorted uid list with a merge-style walk.
 *
 * Returns 0 on success or the first error. Every loaded block is destroyed
 * before returning, including on error paths.
 */
static int32_t loadTombFromBlk(const TTombBlkArray *pTombBlkArray, SCacheRowsReader *pReader, void *pFileReader,
                               bool isFile) {
  int32_t   code = 0;
  int32_t   numOfTables = pReader->numOfTables;
  int64_t   suid = pReader->info.suid;
  uint64_t *uidList = getUidList(pReader);

  if (!uidList) {
    TAOS_RETURN(terrno);
  }

  for (int i = 0, j = 0; i < pTombBlkArray->size && j < numOfTables; ++i) {
    STombBlk *pTombBlk = &pTombBlkArray->data[i];
    if (pTombBlk->maxTbid.suid < suid || (pTombBlk->maxTbid.suid == suid && pTombBlk->maxTbid.uid < uidList[0])) {
      continue;
    }

    if (pTombBlk->minTbid.suid > suid ||
        (pTombBlk->minTbid.suid == suid && pTombBlk->minTbid.uid > uidList[numOfTables - 1])) {
      break;
    }

    STombBlock block = {0};
    code = isFile ? tsdbDataFileReadTombBlock(pFileReader, &pTombBlkArray->data[i], &block)
                  : tsdbSttFileReadTombBlock(pFileReader, &pTombBlkArray->data[i], &block);
    if (code != TSDB_CODE_SUCCESS) {
      TAOS_RETURN(code);
    }

    uint64_t        uid = uidList[j];
    STableLoadInfo *pInfo = getTableLoadInfo(pReader, uid);
    if (!pInfo) {
      tTombBlockDestroy(&block);
      TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
    }

    if (pInfo->pTombData == NULL) {
      pInfo->pTombData = taosArrayInit(4, sizeof(SDelData));
      if (!pInfo->pTombData) {
        // capture terrno before the cleanup call can disturb it
        code = terrno;
        tTombBlockDestroy(&block);
        TAOS_RETURN(code);
      }
    }

    STombRecord record = {0};
    bool        finished = false;
    for (int32_t k = 0; k < TOMB_BLOCK_SIZE(&block); ++k) {
      code = tTombBlockGet(&block, k, &record);
      if (code != TSDB_CODE_SUCCESS) {
        finished = true;
        break;
      }

      if (record.suid < suid) {
        continue;
      }
      if (record.suid > suid) {
        finished = true;
        break;
      }

      bool newTable = false;
      if (uid < record.uid) {
        // advance the table cursor to the first uid >= record.uid
        while (j < numOfTables && uidList[j] < record.uid) {
          ++j;
          newTable = true;
        }

        if (j >= numOfTables) {
          finished = true;
          break;
        }

        uid = uidList[j];
      }

      if (record.uid < uid) {
        continue;
      }

      if (newTable) {
        pInfo = getTableLoadInfo(pReader, uid);
        if (!pInfo) {
          code = TSDB_CODE_OUT_OF_MEMORY;
          finished = true;
          break;
        }
        if (pInfo->pTombData == NULL) {
          pInfo->pTombData = taosArrayInit(4, sizeof(SDelData));
          if (!pInfo->pTombData) {
            code = terrno;
            finished = true;
            break;
          }
        }
      }

      if (record.version <= pReader->info.verRange.maxVer) {
        /*tsdbError("tomb xx load/cache: vgId:%d fid:%d record %" PRId64 "~%" PRId64 "~%" PRId64 " tomb records",
          TD_VID(pReader->pTsdb->pVnode), pReader->pCurFileSet->fid, record.skey, record.ekey, uid);*/

        SDelData delData = {.version = record.version, .sKey = record.skey, .eKey = record.ekey};
        if (!taosArrayPush(pInfo->pTombData, &delData)) {
          // leave through the common exit so the block is destroyed
          code = terrno;
          finished = true;
          break;
        }
      }
    }

    tTombBlockDestroy(&block);

    if (finished) {
      TAOS_RETURN(code);
    }
  }

  TAOS_RETURN(TSDB_CODE_SUCCESS);
}
3167

3168
/*
 * Load tombstones for the reader's tables from a data file: fetch the file's
 * tomb block index, then hand it to loadTombFromBlk in data-file mode.
 */
static int32_t loadDataTomb(SCacheRowsReader *pReader, SDataFileReader *pFileReader) {
  const TTombBlkArray *pTombBlks = NULL;

  TAOS_CHECK_RETURN(tsdbDataFileReadTombBlk(pFileReader, &pTombBlks));

  TAOS_RETURN(loadTombFromBlk(pTombBlks, pReader, pFileReader, true));
}
3175

3176
/*
 * Tomb loader installed as SMergeTreeConf.loadTombFn (see lastIterOpen()).
 *
 * pTsdbReader is really the SCacheRowsReader that was stored in the merge tree
 * configuration; it is cast back here. pLoadInfo is not used by this function.
 *
 * Reads the tomb block index of the stt file and forwards it to loadTombFromBlk()
 * with the last argument set to false (the data-file variant passes true).
 */
static int32_t loadSttTomb(STsdbReader *pTsdbReader, SSttFileReader *pSttFileReader, SSttBlockLoadInfo *pLoadInfo) {
  SCacheRowsReader    *pCacheReader = (SCacheRowsReader *)pTsdbReader;
  const TTombBlkArray *tombBlks = NULL;

  TAOS_CHECK_RETURN(tsdbSttFileReadTombBlk(pSttFileReader, &tombBlks));

  int32_t code = loadTombFromBlk(tombBlks, pCacheReader, pSttFileReader, false);
  TAOS_RETURN(code);
}
3184

3185
// Iterator over the stt ("last") files of one file set, backed by a merge tree.
// lastIterOpen() opens `mergeTree` and points `pMergeTree` at it; lastIterClose()
// closes the tree through `pMergeTree` and resets the pointer to NULL, so a
// non-NULL `pMergeTree` means "tree is open".
typedef struct {
3186
  SMergeTree  mergeTree;   // embedded merge tree storage
3187
  SMergeTree *pMergeTree;  // &mergeTree while open, NULL otherwise
3188
} SFSLastIter;
3189

3190
/*
 * Open the stt-file iterator for one table on one file set.
 *
 * The reader's previous stt block iterator array is destroyed and replaced by a
 * fresh one, then a backward merge tree is opened over pFileSet for (suid, uid),
 * restricted to timestamps in [lastTs, TSKEY_MAX] and to every version.
 *
 * iter      iterator to initialise; iter->pMergeTree is set only on success
 * pr        cache rows reader owning pLDataIterArray, idstr and rowKey
 * aCols     column ids handed to the merge tree
 * nCols     number of entries in aCols
 *
 * Returns terrno if the iterator array cannot be allocated, otherwise the
 * result of tMergeTreeOpen2().
 */
static int32_t lastIterOpen(SFSLastIter *iter, STFileSet *pFileSet, STsdb *pTsdb, STSchema *pTSchema, tb_uid_t suid,
                            tb_uid_t uid, SCacheRowsReader *pr, int64_t lastTs, int16_t *aCols, int nCols) {
  // drop whatever stt block iterators the previous file set left behind
  destroySttBlockReader(pr->pLDataIterArray, NULL);
  pr->pLDataIterArray = taosArrayInit(4, POINTER_BYTES);
  if (NULL == pr->pLDataIterArray) {
    return terrno;
  }

  SMergeTreeConf treeConf = {0};
  treeConf.uid = uid;
  treeConf.suid = suid;
  treeConf.pTsdb = pTsdb;
  treeConf.timewindow = (STimeWindow){.skey = lastTs, .ekey = TSKEY_MAX};
  treeConf.verRange = (SVersionRange){.minVer = 0, .maxVer = INT64_MAX};
  treeConf.strictTimeRange = false;
  treeConf.cacheStatis = false;
  treeConf.pSchema = pTSchema;
  treeConf.pCurrentFileset = pFileSet;
  treeConf.backward = 1;
  treeConf.pSttFileBlockIterArray = pr->pLDataIterArray;
  treeConf.pCols = aCols;
  treeConf.numOfCols = nCols;
  treeConf.loadTombFn = loadSttTomb;
  treeConf.pReader = pr;
  treeConf.idstr = pr->idstr;
  treeConf.pCurRowKey = &pr->rowKey;

  TAOS_CHECK_RETURN(tMergeTreeOpen2(&iter->mergeTree, &treeConf, NULL));

  // mark the tree as open
  iter->pMergeTree = &iter->mergeTree;

  TAOS_RETURN(0);
}
3223

3224
/*
 * Close an stt-file iterator and clear the caller's handle.
 *
 * *iter must be non-NULL on entry (it is dereferenced unconditionally). The
 * merge tree is closed if it is open, and *iter is set to NULL afterwards.
 * Always returns 0.
 */
static int32_t lastIterClose(SFSLastIter **iter) {
  SFSLastIter *pLast = *iter;

  if (pLast->pMergeTree != NULL) {
    tMergeTreeClose(pLast->pMergeTree);
    pLast->pMergeTree = NULL;
  }

  *iter = NULL;

  TAOS_RETURN(0);
}
3236

3237
/*
 * Advance the stt-file iterator by one row.
 *
 * On success *ppRow is the merge tree's current row, or NULL when the tree is
 * exhausted. On failure *ppRow is NULL and the code from tMergeTreeNext() is
 * returned as-is.
 */
static int32_t lastIterNext(SFSLastIter *iter, TSDBROW **ppRow) {
  bool hasRow = false;

  *ppRow = NULL;

  int32_t code = tMergeTreeNext(iter->pMergeTree, &hasRow);
  if (code != 0) {
    return code;
  }

  if (hasRow) {
    *ppRow = tMergeTreeGetRow(iter->pMergeTree);
  } else {
    *ppRow = NULL;
  }

  TAOS_RETURN(code);
}
3254

3255
// States of the resumable state machine implemented by getNextRowFromFS().
// The current state is kept in SFSNextRowIter.state between calls.
typedef enum SFSNEXTROWSTATES {
3256
  SFSNEXTROW_FS,          // initial: position iFileSet one past the last file set
3257
  SFSNEXTROW_FILESET,     // step to the previous file set, open its readers, collect brin blks
3258
  SFSNEXTROW_INDEXLIST,   // pick the next entry of pIndexList and read its brin block
3259
  SFSNEXTROW_BRINBLOCK,   // walk brin records backward looking for the target uid
3260
  SFSNEXTROW_BRINRECORD,  // load the block data described by the current brin record
3261
  SFSNEXTROW_BLOCKDATA,   // NOTE(review): not referenced by getNextRowFromFS() in this chunk
3262
  SFSNEXTROW_BLOCKROW,    // emit block rows backward, interleaved with stt rows
3263
  SFSNEXTROW_NEXTSTTROW   // file set has only stt rows left to emit
3264
} SFSNEXTROWSTATES;
3265

3266
struct CacheNextRowIter;
3267

3268
// Per-table cursor used by getNextRowFromFS() to walk file sets backward.
//
// Fields filled in by nextRowIterOpen(): state, pBlockIdxExp, pTSchema, suid, uid,
// aDFileSet, lastTs, pr and pRowIter.
//
// Working state maintained by getNextRowFromFS():
//   iFileSet / pFileSet      index into aDFileSet (counts down) and the current file set
//   pIndexList / iBrinIndex  SBrinBlk entries whose tbid range covers (suid, uid), and the cursor into them
//   brinBlock / pBrinBlock   storage for the current brin block; pBrinBlock is non-NULL once used
//   iBrinRecord / brinRecord cursor (counts down) and copy of the current brin record
//   blockData / pBlockData   storage for the current data block; pBlockData is non-NULL once created
//   nRow / iRow              row count of blockData and the cursor into it (counts down)
//   row                      row handed out through *ppRow when a block (or merged) row is returned
//   lastIter / pLastIter     stt-file iterator; pLastIter is non-NULL while block rows are merged with stt rows
//   lastEmpty                1 when the stt iterator had no row right after being opened, else 0
//   pLastRow                 pending row fetched from the stt iterator, NULL once consumed
//   pTSRow / rowMerger       merged row buffer and merger used when a block row and an stt row share a timestamp
typedef struct SFSNextRowIter {
3269
  SFSNEXTROWSTATES         state;         // [input]
3270
  SBlockIdx               *pBlockIdxExp;  // [input]
3271
  STSchema                *pTSchema;      // [input]
3272
  tb_uid_t                 suid;
3273
  tb_uid_t                 uid;
3274
  int32_t                  iFileSet;
3275
  STFileSet               *pFileSet;
3276
  TFileSetArray           *aDFileSet;
3277
  SArray                  *pIndexList;
3278
  int32_t                  iBrinIndex;
3279
  SBrinBlock               brinBlock;
3280
  SBrinBlock              *pBrinBlock;
3281
  int32_t                  iBrinRecord;
3282
  SBrinRecord              brinRecord;
3283
  SBlockData               blockData;
3284
  SBlockData              *pBlockData;
3285
  int32_t                  nRow;
3286
  int32_t                  iRow;
3287
  TSDBROW                  row;
3288
  int64_t                  lastTs;
3289
  SFSLastIter              lastIter;
3290
  SFSLastIter             *pLastIter;
3291
  int8_t                   lastEmpty;
3292
  TSDBROW                 *pLastRow;
3293
  SRow                    *pTSRow;
3294
  SRowMerger               rowMerger;
3295
  SCacheRowsReader        *pr;
3296
  struct CacheNextRowIter *pRowIter;
3297
} SFSNextRowIter;
3298

3299
static void clearLastFileSet(SFSNextRowIter *state);
3300

3301
/*
 * Produce the next row of one table from the on-disk file sets, walking backward.
 *
 * This is a resumable state machine: `iter` is an SFSNextRowIter whose `state`
 * field records where the previous call stopped (see SFSNEXTROWSTATES). File sets
 * are visited from the last index of aDFileSet down to 0. For each file set the
 * stt-file iterator is opened; when the file set also has brin blocks covering
 * (suid, uid), block rows are read backward and interleaved with stt rows by
 * timestamp, merging the two when the timestamps are equal.
 *
 * iter              SFSNextRowIter, passed as void* to match the row-source callback type
 * ppRow             receives the next row, or NULL when no file set is left or on error
 * pIgnoreEarlierTs  not used by this function
 * isLast            not used by this function
 * aCols, nCols      column ids to load; a leading PRIMARYKEY_TIMESTAMP_COL_ID entry is
 *                   skipped (by advancing these parameters) before block data is read
 *
 * Returns 0 on success. On most failures the current file set state is released via
 * clearLastFileSet() and the error is logged.
 */
static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast, int16_t *aCols,
                                int nCols) {
  int32_t           code = 0, lino = 0;
  SFSNextRowIter   *st = (SFSNextRowIter *)iter;
  SCacheRowsReader *pr = st->pr;
  STsdb            *pTsdb = pr->pTsdb;

  if (SFSNEXTROW_FS == st->state) {
    // first call: start one past the last file set; the pre-decrement below selects it
    st->iFileSet = TARRAY2_SIZE(st->aDFileSet);
    st->state = SFSNEXTROW_FILESET;
  }

  if (SFSNEXTROW_FILESET == st->state) {
  _next_fileset:
    clearLastFileSet(st);

    if (--st->iFileSet < 0) {
      // every file set has been visited
      *ppRow = NULL;

      TAOS_RETURN(code);
    }
    st->pFileSet = TARRAY2_GET(st->aDFileSet, st->iFileSet);

    STFileObj **fileObjs = st->pFileSet->farr;
    if (fileObjs[0] != NULL || fileObjs[3] != NULL) {
      if (st->pFileSet != pr->pCurFileSet) {
        SDataFileReaderConfig readerConf = {.tsdb = pTsdb, .szPage = pTsdb->pVnode->config.tsdbPageSize};
        const char           *fileNames[4] = {0};

        if (fileObjs[0] != NULL) {
          // slots 0..2 are configured together; only slot 0 is tested for presence
          for (int k = 0; k < 3; ++k) {
            readerConf.files[k].file = *fileObjs[k]->f;
            readerConf.files[k].exist = true;
            fileNames[k] = fileObjs[k]->fname;
          }
        }

        if (fileObjs[3] != NULL) {
          readerConf.files[3].exist = true;
          readerConf.files[3].file = *fileObjs[3]->f;
          fileNames[3] = fileObjs[3]->fname;
        }

        TAOS_CHECK_GOTO(tsdbDataFileReaderOpen(fileNames, &readerConf, &pr->pFileReader), &lino, _err);

        pr->pCurFileSet = st->pFileSet;

        code = loadDataTomb(pr, pr->pFileReader);
        if (code != TSDB_CODE_SUCCESS) {
          tsdbError("vgId:%d, %s load tomb failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
                    tstrerror(code));
          TAOS_CHECK_GOTO(code, &lino, _err);
        }

        TAOS_CHECK_GOTO(tsdbDataFileReadBrinBlk(pr->pFileReader, &pr->pBlkArray), &lino, _err);
      }

      // (re)build the list of brin blks whose table-id range covers this table
      if (st->pIndexList) {
        taosArrayClear(st->pIndexList);
      } else {
        st->pIndexList = taosArrayInit(1, sizeof(SBrinBlk));
        if (!st->pIndexList) {
          TAOS_CHECK_GOTO(terrno, &lino, _err);
        }
      }

      const TBrinBlkArray *brinBlks = pr->pBlkArray;

      for (int i = TARRAY2_SIZE(brinBlks) - 1; i >= 0; --i) {
        SBrinBlk *blk = &brinBlks->data[i];

        if (st->suid >= blk->minTbid.suid && st->suid <= blk->maxTbid.suid) {
          bool uidCovered = (st->uid >= blk->minTbid.uid && st->uid <= blk->maxTbid.uid);
          if (uidCovered && !taosArrayPush(st->pIndexList, blk)) {
            TAOS_CHECK_GOTO(terrno, &lino, _err);
          }
          continue;
        }

        // scanning backward: once the table sorts after this blk, earlier blks cannot match
        if (st->suid > blk->maxTbid.suid || (st->suid == blk->maxTbid.suid && st->uid > blk->maxTbid.uid)) {
          break;
        }
      }

      int nMatched = TARRAY_SIZE(st->pIndexList);
      if (nMatched <= 0) {
        goto _check_stt_data;
      }

      st->state = SFSNEXTROW_INDEXLIST;
      st->iBrinIndex = 1;
    }

  _check_stt_data:
    pr->pCurFileSet = st->pFileSet;

    TAOS_CHECK_GOTO(lastIterOpen(&st->lastIter, st->pFileSet, pTsdb, st->pTSchema, st->suid, st->uid, pr,
                                 st->lastTs, aCols, nCols),
                    &lino, _err);

    TAOS_CHECK_GOTO(lastIterNext(&st->lastIter, &st->pLastRow), &lino, _err);

    st->lastEmpty = (st->pLastRow == NULL) ? 1 : 0;

    if (SFSNEXTROW_INDEXLIST != st->state) {
      // this file set has no block data for the table: stt rows only
      if (st->lastEmpty) {
        clearLastFileSet(st);
        goto _next_fileset;
      }

      st->state = SFSNEXTROW_NEXTSTTROW;

      *ppRow = st->pLastRow;
      st->pLastRow = NULL;

      TAOS_RETURN(code);
    }

    // block data exists: keep the stt iterator around so both sources can be merged
    st->pLastIter = &st->lastIter;
  }

  if (SFSNEXTROW_NEXTSTTROW == st->state) {
    TAOS_CHECK_GOTO(lastIterNext(&st->lastIter, &st->pLastRow), &lino, _err);

    if (st->pLastRow) {
      *ppRow = st->pLastRow;
      st->pLastRow = NULL;

      TAOS_RETURN(code);
    }

    // stt rows exhausted for this file set
    if (st->pLastIter) {
      code = lastIterClose(&st->pLastIter);
      if (code != TSDB_CODE_SUCCESS) {
        tsdbError("vgId:%d, %s close last iter failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
                  tstrerror(code));
        TAOS_RETURN(code);
      }
    }

    clearLastFileSet(st);
    st->state = SFSNEXTROW_FILESET;
    goto _next_fileset;
  }

  if (SFSNEXTROW_INDEXLIST == st->state) {
    SBrinBlk *curBlk = NULL;
  _next_brinindex:
    if (--st->iBrinIndex < 0) {
      if (st->pLastRow) {
        // block data is finished but an stt row is still pending
        st->state = SFSNEXTROW_NEXTSTTROW;
        *ppRow = st->pLastRow;
        st->pLastRow = NULL;
        return code;
      }

      clearLastFileSet(st);
      goto _next_fileset;
    }
    curBlk = taosArrayGet(st->pIndexList, st->iBrinIndex);

    if (st->pBrinBlock) {
      tBrinBlockClear(&st->brinBlock);
    } else {
      st->pBrinBlock = &st->brinBlock;
    }

    TAOS_CHECK_GOTO(tsdbDataFileReadBrinBlock(pr->pFileReader, curBlk, &st->brinBlock), &lino, _err);

    st->iBrinRecord = st->brinBlock.numOfRecords - 1;
    st->state = SFSNEXTROW_BRINBLOCK;
  }

  if (SFSNEXTROW_BRINBLOCK == st->state) {
  _next_brinrecord:
    if (st->iBrinRecord < 0) {  // empty brin block, goto _next_brinindex
      tBrinBlockClear(&st->brinBlock);
      goto _next_brinindex;
    }

    TAOS_CHECK_GOTO(tBrinBlockGet(&st->brinBlock, st->iBrinRecord, &st->brinRecord), &lino, _err);

    if (st->brinRecord.uid != st->uid) {
      // TODO: goto next brin block early
      --st->iBrinRecord;
      goto _next_brinrecord;
    }

    st->state = SFSNEXTROW_BRINRECORD;
  }

  if (SFSNEXTROW_BRINRECORD == st->state) {
    if (st->pBlockData) {
      tBlockDataReset(st->pBlockData);
    } else {
      st->pBlockData = &st->blockData;

      TAOS_CHECK_GOTO(tBlockDataCreate(&st->blockData), &lino, _err);
    }

    // the timestamp column is not requested explicitly from the block reader
    if (aCols[0] == PRIMARYKEY_TIMESTAMP_COL_ID) {
      --nCols;
      ++aCols;
    }

    TAOS_CHECK_GOTO(tsdbDataFileReadBlockDataByColumn(pr->pFileReader, &st->brinRecord, st->pBlockData,
                                                      st->pTSchema, aCols, nCols),
                    &lino, _err);

    st->nRow = st->blockData.nRow;
    st->iRow = st->nRow - 1;

    st->state = SFSNEXTROW_BLOCKROW;
  }

  if (SFSNEXTROW_BLOCKROW == st->state) {
    if (st->iRow < 0) {
      // block exhausted, move to the previous brin record
      --st->iBrinRecord;
      goto _next_brinrecord;
    }

    st->row = tsdbRowFromBlockData(st->pBlockData, st->iRow);
    if (!st->pLastIter) {
      // stt iterator already closed: block rows only
      *ppRow = &st->row;
      --st->iRow;
      return code;
    }

    if (!st->pLastRow) {
      // get next row from fslast and process with fs row, --state->Row if select fs row
      TAOS_CHECK_GOTO(lastIterNext(&st->lastIter, &st->pLastRow), &lino, _err);
    }

    if (!st->pLastRow) {
      if (st->pLastIter) {
        code = lastIterClose(&st->pLastIter);
        if (code != TSDB_CODE_SUCCESS) {
          tsdbError("vgId:%d, %s close last iter failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
                    tstrerror(code));
          TAOS_RETURN(code);
        }
      }

      *ppRow = &st->row;
      --st->iRow;
      return code;
    }

    // process state->pLastRow & state->row
    TSKEY blockTs = TSDBROW_TS(&st->row);
    TSKEY sttTs = TSDBROW_TS(st->pLastRow);
    if (sttTs > blockTs) {
      *ppRow = st->pLastRow;
      st->pLastRow = NULL;

      TAOS_RETURN(code);
    } else if (sttTs < blockTs) {
      *ppRow = &st->row;
      --st->iRow;

      TAOS_RETURN(code);
    } else {
      // TODO: merge rows and *ppRow = mergedRow
      SRowMerger *merger = &st->rowMerger;
      code = tsdbRowMergerInit(merger, st->pTSchema);
      if (code != TSDB_CODE_SUCCESS) {
        tsdbError("vgId:%d, %s init row merger failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
                  tstrerror(code));
        TAOS_RETURN(code);
      }

      TAOS_CHECK_GOTO(tsdbRowMergerAdd(merger, &st->row, st->pTSchema), &lino, _err);
      TAOS_CHECK_GOTO(tsdbRowMergerAdd(merger, st->pLastRow, st->pTSchema), &lino, _err);

      // release the previously merged row before producing a new one
      if (st->pTSRow) {
        taosMemoryFree(st->pTSRow);
        st->pTSRow = NULL;
      }

      TAOS_CHECK_GOTO(tsdbRowMergerGetRow(merger, &st->pTSRow), &lino, _err);

      st->row = tsdbRowFromTSRow(TSDBROW_VERSION(&st->row), st->pTSRow);
      *ppRow = &st->row;
      --st->iRow;

      tsdbRowMergerClear(merger);

      TAOS_RETURN(code);
    }
  }

_err:
  clearLastFileSet(st);

  *ppRow = NULL;

  if (code) {
    tsdbError("tsdb/cache: vgId:%d, %s failed at line %d since %s.", TD_VID(pTsdb->pVnode), __func__, lino,
              tstrerror(code));
  }

  TAOS_RETURN(code);
}
3618

3619
// Merging iterator over the row sources of one table, set up by nextRowIterOpen()
// and consumed by nextRowIterGet().
//
//   pMemDelData          delete records produced by loadMemTombData() in nextRowIterOpen()
//   pSkyline / iSkyline  delete skyline built lazily in nextRowIterGet() and the cursor into it
//                        (initialised to size - 1)
//   idx                  (suid, uid) of the table being read
//   memState / imemState per-source state for input[0] and input[1]
//   fsState              per-source state for input[2] (file sets)
//   memRow, imemRow, fsRow  initial row slots referenced by input[0..2];
//                        NOTE(review): fsLastRow is not referenced in this chunk
//   input                the three row sources, in order: mem, imem, file system
typedef struct CacheNextRowIter {
3620
  SArray           *pMemDelData;
3621
  SArray           *pSkyline;
3622
  int64_t           iSkyline;
3623
  SBlockIdx         idx;
3624
  SMemNextRowIter   memState;
3625
  SMemNextRowIter   imemState;
3626
  SFSNextRowIter    fsState;
3627
  TSDBROW           memRow, imemRow, fsLastRow, fsRow;
3628
  TsdbNextRowState  input[3];
3629
  SCacheRowsReader *pr;
3630
  STsdb            *pTsdb;
3631
} CacheNextRowIter;
3632

3633
int32_t clearNextRowFromFS(void *iter) {
115,640✔
3634
  int32_t code = 0;
115,640✔
3635

3636
  SFSNextRowIter *state = (SFSNextRowIter *)iter;
115,640✔
3637
  if (!state) {
115,640✔
3638
    TAOS_RETURN(code);
×
3639
  }
3640

3641
  if (state->pLastIter) {
115,640✔
3642
    code = lastIterClose(&state->pLastIter);
×
3643
    if (code != TSDB_CODE_SUCCESS) {
×
3644
      tsdbError("%s close last iter failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
3645
      TAOS_RETURN(code);
×
3646
    }
3647
  }
3648

3649
  if (state->pBlockData) {
115,640✔
3650
    tBlockDataDestroy(state->pBlockData);
974✔
3651
    state->pBlockData = NULL;
974✔
3652
  }
3653

3654
  if (state->pBrinBlock) {
115,640✔
3655
    tBrinBlockDestroy(state->pBrinBlock);
974✔
3656
    state->pBrinBlock = NULL;
974✔
3657
  }
3658

3659
  if (state->pIndexList) {
115,640✔
3660
    taosArrayDestroy(state->pIndexList);
7,792✔
3661
    state->pIndexList = NULL;
7,792✔
3662
  }
3663

3664
  if (state->pTSRow) {
115,640✔
3665
    taosMemoryFree(state->pTSRow);
×
3666
    state->pTSRow = NULL;
×
3667
  }
3668

3669
  if (state->pRowIter->pSkyline) {
115,640✔
3670
    taosArrayDestroy(state->pRowIter->pSkyline);
111,700✔
3671
    state->pRowIter->pSkyline = NULL;
111,700✔
3672
  }
3673

3674
  TAOS_RETURN(code);
115,640✔
3675
}
3676

3677
/*
 * Release the per-file-set resources of an SFSNextRowIter before moving on to
 * another file set (or when bailing out on error).
 *
 * Closes the stt iterator, destroys the block data, closes the data file reader
 * (also resetting the reader's current file set), and frees the merged row
 * buffer. If a delete skyline exists on the parent iterator it is destroyed, and
 * the tomb data of every table in the reader's table map is dropped with it so
 * that it gets reloaded for the next file set.
 *
 * If closing the stt iterator fails, the error is logged and the function returns
 * without releasing anything else.
 */
static void clearLastFileSet(SFSNextRowIter *state) {
  SCacheRowsReader *pReader = state->pr;

  if (NULL != state->pLastIter) {
    int rc = lastIterClose(&state->pLastIter);
    if (rc != TSDB_CODE_SUCCESS) {
      tsdbError("%s close last iter failed at line %d since %s", __func__, __LINE__, tstrerror(rc));
      return;
    }
  }

  if (NULL != state->pBlockData) {
    tBlockDataDestroy(state->pBlockData);
    state->pBlockData = NULL;
  }

  if (NULL != pReader->pFileReader) {
    tsdbDataFileReaderClose(&pReader->pFileReader);
    pReader->pFileReader = NULL;

    pReader->pCurFileSet = NULL;
  }

  if (NULL != state->pTSRow) {
    taosMemoryFree(state->pTSRow);
    state->pTSRow = NULL;
  }

  if (NULL == state->pRowIter->pSkyline) {
    return;
  }

  taosArrayDestroy(state->pRowIter->pSkyline);
  state->pRowIter->pSkyline = NULL;

  // tomb data collected for this file set is no longer valid for the next one
  int32_t cursor = 0;
  void   *entry = NULL;
  while ((entry = tSimpleHashIterate(pReader->pTableMap, entry, &cursor)) != NULL) {
    STableLoadInfo *pLoadInfo = *(STableLoadInfo **)entry;
    taosArrayDestroy(pLoadInfo->pTombData);
    pLoadInfo->pTombData = NULL;
  }
}
3716

3717
static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTsdb, STSchema *pTSchema, tb_uid_t suid,
115,153✔
3718
                               SArray *pLDataIterArray, STsdbReadSnap *pReadSnap, int64_t lastTs,
3719
                               SCacheRowsReader *pr) {
3720
  int32_t code = 0, lino = 0;
115,153✔
3721

3722
  STbData *pMem = NULL;
115,640✔
3723
  if (pReadSnap->pMem) {
115,640✔
3724
    pMem = tsdbGetTbDataFromMemTable(pReadSnap->pMem, suid, uid);
115,169✔
3725
  }
3726

3727
  STbData *pIMem = NULL;
116,111✔
3728
  if (pReadSnap->pIMem) {
116,111✔
3729
    pIMem = tsdbGetTbDataFromMemTable(pReadSnap->pIMem, suid, uid);
×
3730
  }
3731

3732
  pIter->pTsdb = pTsdb;
115,640✔
3733

3734
  pIter->pMemDelData = NULL;
115,640✔
3735

3736
  TAOS_CHECK_GOTO(loadMemTombData(&pIter->pMemDelData, pMem, pIMem, pr->info.verRange.maxVer), &lino, _err);
115,640✔
3737

3738
  pIter->idx = (SBlockIdx){.suid = suid, .uid = uid};
115,640✔
3739

3740
  pIter->fsState.pRowIter = pIter;
115,169✔
3741
  pIter->fsState.state = SFSNEXTROW_FS;
115,640✔
3742
  pIter->fsState.aDFileSet = pReadSnap->pfSetArray;
115,169✔
3743
  pIter->fsState.pBlockIdxExp = &pIter->idx;
115,169✔
3744
  pIter->fsState.pTSchema = pTSchema;
115,153✔
3745
  pIter->fsState.suid = suid;
115,640✔
3746
  pIter->fsState.uid = uid;
115,153✔
3747
  pIter->fsState.lastTs = lastTs;
115,640✔
3748
  pIter->fsState.pr = pr;
115,153✔
3749

3750
  pIter->input[0] = (TsdbNextRowState){&pIter->memRow, true, false, false, &pIter->memState, getNextRowFromMem, NULL};
115,640✔
3751
  pIter->input[1] = (TsdbNextRowState){&pIter->imemRow, true, false, false, &pIter->imemState, getNextRowFromMem, NULL};
115,640✔
3752
  pIter->input[2] =
115,640✔
3753
      (TsdbNextRowState){&pIter->fsRow, false, true, false, &pIter->fsState, getNextRowFromFS, clearNextRowFromFS};
115,169✔
3754

3755
  if (pMem) {
115,169✔
3756
    pIter->memState.pMem = pMem;
107,779✔
3757
    pIter->memState.state = SMEMNEXTROW_ENTER;
108,266✔
3758
    pIter->memState.lastTs = lastTs;
108,266✔
3759
    pIter->input[0].stop = false;
107,779✔
3760
    pIter->input[0].next = true;
107,779✔
3761
  }
3762

3763
  if (pIMem) {
115,169✔
3764
    pIter->imemState.pMem = pIMem;
×
3765
    pIter->imemState.state = SMEMNEXTROW_ENTER;
×
3766
    pIter->imemState.lastTs = lastTs;
×
3767
    pIter->input[1].stop = false;
×
3768
    pIter->input[1].next = true;
×
3769
  }
3770

3771
  pIter->pr = pr;
115,169✔
3772

3773
_err:
115,640✔
3774
  TAOS_RETURN(code);
115,640✔
3775
}
3776

3777
/*
 * Tear down a CacheNextRowIter: run each source's clear callback (if it has one,
 * ignoring its return code), then destroy the skyline and the mem delete-record
 * array if they are still allocated.
 */
static void nextRowIterClose(CacheNextRowIter *pIter) {
  for (int i = 0; i < 3; ++i) {
    TsdbNextRowState *src = &pIter->input[i];
    if (src->nextRowClearFn) {
      (void)src->nextRowClearFn(src->iter);
    }
  }

  if (pIter->pSkyline != NULL) {
    taosArrayDestroy(pIter->pSkyline);
  }

  if (pIter->pMemDelData != NULL) {
    taosArrayDestroy(pIter->pMemDelData);
  }
}
115,640✔
3792

3793
// iterate next row non deleted backward ts, version (from high to low)
3794
/*
 * Return the next non-deleted row across mem, imem and the file sets, walking
 * backward (largest row key first).
 *
 * Each round:
 *   1. every source flagged `next` (and not stopped) is advanced; a source that
 *      yields no row is marked stopped;
 *   2. if all three sources are stopped, *ppRow is NULL and *pIgnoreEarlierTs is
 *      the OR of the sources' ignoreEarlierTs flags;
 *   3. the source(s) holding the largest row key are selected;
 *   4. selected rows covered by the delete skyline are skipped (their source is
 *      flagged to advance); the skyline is built on first use from the table's
 *      tomb data plus pIter->pMemDelData;
 *   5. the first surviving row is returned and its source is flagged to advance on
 *      the next call. If none survive, the loop runs another round.
 *
 * isLast, aCols and nCols are forwarded unchanged to the sources' nextRowFn.
 * Returns 0 on success; on failure the error is logged and *ppRow is left untouched.
 */
static int32_t nextRowIterGet(CacheNextRowIter *pIter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast,
                              int16_t *aCols, int nCols) {
  int32_t code = 0, lino = 0;

  for (;;) {
    // step 1: advance the sources that were consumed or skipped last round
    for (int i = 0; i < 3; ++i) {
      TsdbNextRowState *src = &pIter->input[i];
      if (!src->next || src->stop) {
        continue;
      }

      TAOS_CHECK_GOTO(src->nextRowFn(src->iter, &src->pRow, &src->ignoreEarlierTs, isLast, aCols, nCols), &lino,
                      _err);

      if (NULL == src->pRow) {
        src->stop = true;
        src->next = false;
      }
    }

    // step 2: nothing left anywhere
    if (pIter->input[0].stop && pIter->input[1].stop && pIter->input[2].stop) {
      *ppRow = NULL;
      *pIgnoreEarlierTs =
          (pIter->input[0].ignoreEarlierTs || pIter->input[1].ignoreEarlierTs || pIter->input[2].ignoreEarlierTs);

      TAOS_RETURN(code);
    }

    // select maxpoint(s) from mem, imem, fs and last
    TSDBROW *candidates[4] = {0};
    int      candidateSrc[4] = {-1, -1, -1, -1};
    int      nCandidates = 0;
    SRowKey  largestKey = {.ts = TSKEY_MIN};

    for (int i = 0; i < 3; ++i) {
      TsdbNextRowState *src = &pIter->input[i];
      if (src->stop || NULL == src->pRow) {
        continue;
      }

      STsdbRowKey curKey = {0};
      tsdbRowGetKey(src->pRow, &curKey);

      // merging & deduplicating on client side
      int cmp = tRowKeyCompare(&largestKey, &curKey.key);
      if (cmp <= 0) {
        if (cmp < 0) {
          // strictly larger key: discard the candidates collected so far
          nCandidates = 0;
          largestKey = curKey.key;
        }

        candidateSrc[nCandidates] = i;
        candidates[nCandidates++] = src->pRow;
      }
      src->next = false;
    }

    // delete detection
    TSDBROW *survivors[4] = {0};
    int      survivorSrc[4] = {-1, -1, -1, -1};
    int      nSurvivors = 0;
    for (int i = 0; i < nCandidates; ++i) {
      TSDBKEY candKey = TSDBROW_KEY(candidates[i]);

      if (!pIter->pSkyline) {
        // build the delete skyline once, on first use
        pIter->pSkyline = taosArrayInit(32, sizeof(TSDBKEY));
        TSDB_CHECK_NULL(pIter->pSkyline, code, lino, _err, terrno);

        uint64_t        tbUid = pIter->idx.uid;
        STableLoadInfo *pLoadInfo = getTableLoadInfo(pIter->pr, tbUid);
        TSDB_CHECK_NULL(pLoadInfo, code, lino, _err, TSDB_CODE_OUT_OF_MEMORY);

        if (NULL == pLoadInfo->pTombData) {
          pLoadInfo->pTombData = taosArrayInit(4, sizeof(SDelData));
          TSDB_CHECK_NULL(pLoadInfo->pTombData, code, lino, _err, terrno);
        }

        if (!taosArrayAddAll(pLoadInfo->pTombData, pIter->pMemDelData)) {
          TAOS_CHECK_GOTO(terrno, &lino, _err);
        }

        size_t nDel = TARRAY_SIZE(pLoadInfo->pTombData);
        if (nDel > 0) {
          code = tsdbBuildDeleteSkyline(pLoadInfo->pTombData, 0, (int32_t)(nDel - 1), pIter->pSkyline);
          TAOS_CHECK_GOTO(code, &lino, _err);
        }
        pIter->iSkyline = taosArrayGetSize(pIter->pSkyline) - 1;
      }

      bool isDeleted = tsdbKeyDeleted(&candKey, pIter->pSkyline, &pIter->iSkyline);
      if (!isDeleted) {
        survivorSrc[nSurvivors] = candidateSrc[i];
        survivors[nSurvivors++] = candidates[i];
      }

      // a deleted row must be skipped: advance its source next round
      pIter->input[candidateSrc[i]].next = isDeleted;
    }

    if (nSurvivors > 0) {
      pIter->input[survivorSrc[0]].next = true;

      *ppRow = survivors[0];

      TAOS_RETURN(code);
    }
  }

_err:
  if (code) {
    tsdbError("tsdb/cache: %s failed at line %d since %s.", __func__, lino, tstrerror(code));
  }

  TAOS_RETURN(code);
}
3903

3904
/*
 * Build an array of nCols SLastCol placeholders, one per requested schema slot.
 *
 * Every entry gets rowKey.ts = 0 and a NULL column value carrying the column id
 * and type found at pTSchema->columns[slotIds[i]].
 *
 * pTSchema    schema the slot ids index into
 * ppColArray  receives the new array on success; left untouched on failure
 * slotIds     nCols schema slot indexes
 * nCols       number of columns requested
 *
 * Returns TSDB_CODE_SUCCESS, or terrno if allocation fails. The partially built
 * array is destroyed on failure (previously it was leaked when taosArrayPush()
 * failed); the placeholder entries hold NULL values, so a plain destroy is used.
 */
static int32_t initLastColArrayPartial(STSchema *pTSchema, SArray **ppColArray, int16_t *slotIds, int nCols) {
  SArray *pColArray = taosArrayInit(nCols, sizeof(SLastCol));
  if (NULL == pColArray) {
    TAOS_RETURN(terrno);
  }

  for (int32_t i = 0; i < nCols; ++i) {
    int16_t  slotId = slotIds[i];
    SLastCol col = {.rowKey.ts = 0,
                    .colVal = COL_VAL_NULL(pTSchema->columns[slotId].colId, pTSchema->columns[slotId].type)};
    if (!taosArrayPush(pColArray, &col)) {
      // capture the error before cleanup so the destroy call cannot disturb it
      int32_t code = terrno;
      taosArrayDestroy(pColArray);
      TAOS_RETURN(code);
    }
  }
  *ppColArray = pColArray;

  TAOS_RETURN(TSDB_CODE_SUCCESS);
}
3922

3923
/*
 * Compute the "last" value (latest non-NULL value) of each requested column for
 * table `uid` by scanning its rows backward through mem, imem and the file sets.
 *
 * uid          table to read
 * pTsdb        tsdb instance (also used for the vgId in the error log)
 * ppLastArray  receives an array of SLastCol, one per requested column. When no
 *              row exists it receives an empty array, or NULL if the row sources
 *              reported ignoreEarlierTs. Set to NULL on the _err path.
 * pr           cache rows reader providing schema, snapshot and lastTs
 * aCols        nCols column ids to load; copied into a working array from which
 *              a column is removed once its value has been found
 * nCols        number of requested columns
 * slotIds      nCols schema slot indexes matching aCols
 *
 * The first row fills every column it can. While some column is still without a
 * value (tracked by noneCol/setNoneCol), earlier rows are fetched and only the
 * missing columns are filled in.
 *
 * Fix over the previous version: when copying aCols into the working array
 * failed, the working array itself was leaked; both arrays are now released, and
 * terrno is captured before the cleanup calls on the early-failure paths.
 *
 * NOTE(review): a failure code from nextRowIterGet() is not routed to _err; the
 * loop simply stops and the code is returned together with whatever was collected.
 */
static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols,
                            int nCols, int16_t *slotIds) {
  int32_t   code = 0, lino = 0;
  STSchema *pTSchema = pr->pSchema;  // metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1, 1);
  int16_t   nLastCol = nCols;
  int16_t   noneCol = 0;       // first column index still lacking a value
  bool      setNoneCol = false;  // true while at least one column still lacks a value
  bool      hasRow = false;
  bool      ignoreEarlierTs = false;
  SArray   *pColArray = NULL;
  SColVal  *pColVal = &(SColVal){0};

  TAOS_CHECK_RETURN(initLastColArrayPartial(pTSchema, &pColArray, slotIds, nCols));

  // working copy of the column ids still to be loaded
  SArray *aColArray = taosArrayInit(nCols, sizeof(int16_t));
  if (NULL == aColArray) {
    code = terrno;
    taosArrayDestroy(pColArray);

    TAOS_RETURN(code);
  }

  for (int i = 0; i < nCols; ++i) {
    if (!taosArrayPush(aColArray, &aCols[i])) {
      code = terrno;
      taosArrayDestroy(pColArray);
      taosArrayDestroy(aColArray);

      TAOS_RETURN(code);
    }
  }

  // TSKEY_MAX marks "no row seen yet"
  STsdbRowKey lastRowKey = {.key.ts = TSKEY_MAX};

  // inverse iterator
  CacheNextRowIter iter = {0};
  code =
      nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->info.suid, pr->pLDataIterArray, pr->pReadSnap, pr->lastTs, pr);
  TAOS_CHECK_GOTO(code, &lino, _err);

  do {
    TSDBROW *pRow = NULL;
    code = nextRowIterGet(&iter, &pRow, &ignoreEarlierTs, true, TARRAY_DATA(aColArray), TARRAY_SIZE(aColArray));

    if (!pRow) {
      break;
    }

    hasRow = true;

    // rows may have been written under a different schema version
    int32_t sversion = TSDBROW_SVERSION(pRow);
    if (sversion != -1) {
      TAOS_CHECK_GOTO(updateTSchema(sversion, pr, uid), &lino, _err);

      pTSchema = pr->pCurrSchema;
    }
    // int16_t nCol = pTSchema->numOfCols;

    STsdbRowKey rowKey = {0};
    tsdbRowGetKey(pRow, &rowKey);

    if (lastRowKey.key.ts == TSKEY_MAX) {  // first time
      lastRowKey = rowKey;

      for (int16_t iCol = noneCol; iCol < nCols; ++iCol) {
        if (iCol >= nLastCol) {
          break;
        }
        SLastCol *pCol = taosArrayGet(pColArray, iCol);
        if (slotIds[iCol] > pTSchema->numOfCols - 1) {
          // slot does not exist in this row's schema: column stays unresolved
          if (!setNoneCol) {
            noneCol = iCol;
            setNoneCol = true;
          }
          continue;
        }
        if (pCol->colVal.cid != pTSchema->columns[slotIds[iCol]].colId) {
          continue;
        }
        if (slotIds[iCol] == 0) {
          // slot 0 is filled from the row key's timestamp rather than read from the row
          STColumn *pTColumn = &pTSchema->columns[0];
          SValue    val = {.type = pTColumn->type};
          VALUE_SET_TRIVIAL_DATUM(&val, rowKey.key.ts);
          *pColVal = COL_VAL_VALUE(pTColumn->colId, val);

          SLastCol colTmp = {.rowKey = rowKey.key, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID};
          TAOS_CHECK_GOTO(tsdbCacheReallocSLastCol(&colTmp, NULL), &lino, _err);

          taosArraySet(pColArray, 0, &colTmp);
          continue;
        }
        tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal);

        *pCol = (SLastCol){.rowKey = rowKey.key, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID};
        TAOS_CHECK_GOTO(tsdbCacheReallocSLastCol(pCol, NULL), &lino, _err);

        if (!COL_VAL_IS_VALUE(pColVal)) {
          if (!setNoneCol) {
            noneCol = iCol;
            setNoneCol = true;
          }
        } else {
          // value found: stop requesting this column from the row sources
          int32_t aColIndex = taosArraySearchIdx(aColArray, &pColVal->cid, compareInt16Val, TD_EQ);
          if (aColIndex >= 0) {
            taosArrayRemove(aColArray, aColIndex);
          }
        }
      }
      if (!setNoneCol) {
        // done, goto return pColArray
        break;
      } else {
        continue;
      }
    }

    // merge into pColArray
    setNoneCol = false;
    for (int16_t iCol = noneCol; iCol < nCols; ++iCol) {
      if (iCol >= nLastCol) {
        break;
      }
      // high version's column value
      if (slotIds[iCol] > pTSchema->numOfCols - 1) {
        continue;
      }

      SLastCol *lastColVal = (SLastCol *)taosArrayGet(pColArray, iCol);
      if (lastColVal->colVal.cid != pTSchema->columns[slotIds[iCol]].colId) {
        continue;
      }
      SColVal *tColVal = &lastColVal->colVal;
      if (COL_VAL_IS_VALUE(tColVal)) continue;  // already resolved by a later row

      tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal);
      if (COL_VAL_IS_VALUE(pColVal)) {
        SLastCol lastCol = {.rowKey = rowKey.key, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID};
        TAOS_CHECK_GOTO(tsdbCacheReallocSLastCol(&lastCol, NULL), &lino, _err);

        // replace the placeholder, releasing whatever it owned
        tsdbCacheFreeSLastColItem(lastColVal);
        taosArraySet(pColArray, iCol, &lastCol);
        int32_t aColIndex = taosArraySearchIdx(aColArray, &lastCol.colVal.cid, compareInt16Val, TD_EQ);
        if (aColIndex >= 0) {
          taosArrayRemove(aColArray, aColIndex);
        }
      } else if (!COL_VAL_IS_VALUE(pColVal) && !setNoneCol) {
        noneCol = iCol;
        setNoneCol = true;
      }
    }
  } while (setNoneCol);

  if (!hasRow) {
    if (ignoreEarlierTs) {
      taosArrayDestroy(pColArray);
      pColArray = NULL;
    } else {
      taosArrayClear(pColArray);
    }
  }
  *ppLastArray = pColArray;

  nextRowIterClose(&iter);
  taosArrayDestroy(aColArray);

  TAOS_RETURN(code);

_err:
  nextRowIterClose(&iter);
  // taosMemoryFreeClear(pTSchema);
  *ppLastArray = NULL;
  taosArrayDestroyEx(pColArray, tsdbCacheFreeSLastColItem);
  taosArrayDestroy(aColArray);

  if (code) {
    tsdbError("tsdb/cache: vgId:%d, %s failed at line %d since %s.", TD_VID(pTsdb->pVnode), __func__, lino,
              tstrerror(code));
  }

  TAOS_RETURN(code);
}
4101

4102
// Collect the column values of the first row returned by the cache row
// iterator for table `uid`, restricted to the requested columns.
//
// Parameters:
//   uid         - table uid handed to the iterator and to updateTSchema().
//   pTsdb       - tsdb instance handed to the iterator; also used for logging.
//   ppLastArray - out: array of SLastCol, one entry per requested column.
//                 Set to NULL on failure, or when no row was found and the
//                 iterator reported ignoreEarlierTs. Set to an emptied array
//                 when no row was found otherwise.
//   pr          - reader state (schemas, snapshot, lastTs, ...).
//   aCols       - nCols column ids; copied into a scratch array that is passed
//                 to the iterator.
//   nCols       - number of entries in aCols / slotIds.
//   slotIds     - nCols schema slot indexes, parallel to aCols.
//
// Returns 0 on success, otherwise the first error code encountered.
static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols,
                               int nCols, int16_t *slotIds) {
  int32_t   code = 0, lino = 0;
  STSchema *pTSchema = pr->pSchema;
  bool      hasRow = false;
  bool      ignoreEarlierTs = false;
  SArray   *pColArray = NULL;
  SColVal  *pColVal = &(SColVal){0};
  TSDBROW  *pRow = NULL;

  TAOS_CHECK_RETURN(initLastColArrayPartial(pTSchema, &pColArray, slotIds, nCols));

  SArray *aColArray = taosArrayInit(nCols, sizeof(int16_t));
  if (NULL == aColArray) {
    taosArrayDestroy(pColArray);

    TAOS_RETURN(terrno);
  }

  for (int i = 0; i < nCols; ++i) {
    if (!taosArrayPush(aColArray, &aCols[i])) {
      // Save the error first, then release BOTH arrays (aColArray used to leak here).
      code = terrno;
      taosArrayDestroy(aColArray);
      taosArrayDestroy(pColArray);

      TAOS_RETURN(code);
    }
  }

  // inverse iterator
  CacheNextRowIter iter = {0};
  code =
      nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->info.suid, pr->pLDataIterArray, pr->pReadSnap, pr->lastTs, pr);
  TAOS_CHECK_GOTO(code, &lino, _err);

  // Only a single row is needed; an iterator failure goes through the common
  // error path instead of being returned together with a live array.
  code = nextRowIterGet(&iter, &pRow, &ignoreEarlierTs, false, TARRAY_DATA(aColArray), TARRAY_SIZE(aColArray));
  TAOS_CHECK_GOTO(code, &lino, _err);

  if (pRow != NULL) {
    hasRow = true;

    int32_t sversion = TSDBROW_SVERSION(pRow);
    if (sversion != -1) {
      TAOS_CHECK_GOTO(updateTSchema(sversion, pr, uid), &lino, _err);

      pTSchema = pr->pCurrSchema;
    }

    STsdbRowKey rowKey = {0};
    tsdbRowGetKey(pRow, &rowKey);

    for (int16_t iCol = 0; iCol < nCols; ++iCol) {
      SLastCol *pCol = taosArrayGet(pColArray, iCol);

      // Skip slots that are outside the row's schema or refer to another column id.
      if (slotIds[iCol] > pTSchema->numOfCols - 1) {
        continue;
      }
      if (pCol->colVal.cid != pTSchema->columns[slotIds[iCol]].colId) {
        continue;
      }

      if (slotIds[iCol] == 0) {
        // Slot 0 is filled from the row key timestamp rather than from the row body.
        STColumn *pTColumn = &pTSchema->columns[0];
        SValue    val = {.type = pTColumn->type};
        VALUE_SET_TRIVIAL_DATUM(&val, rowKey.key.ts);
        *pColVal = COL_VAL_VALUE(pTColumn->colId, val);

        SLastCol colTmp = {.rowKey = rowKey.key, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID};
        TAOS_CHECK_GOTO(tsdbCacheReallocSLastCol(&colTmp, NULL), &lino, _err);

        // Store at this column's own position (was hard-coded to index 0).
        taosArraySet(pColArray, iCol, &colTmp);
        continue;
      }

      tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal);

      *pCol = (SLastCol){.rowKey = rowKey.key, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID};
      TAOS_CHECK_GOTO(tsdbCacheReallocSLastCol(pCol, NULL), &lino, _err);

      // This column is resolved; drop its id from the pending list.
      int32_t aColIndex = taosArraySearchIdx(aColArray, &pColVal->cid, compareInt16Val, TD_EQ);
      if (aColIndex >= 0) {
        taosArrayRemove(aColArray, aColIndex);
      }
    }
  }

  if (!hasRow) {
    if (ignoreEarlierTs) {
      taosArrayDestroy(pColArray);
      pColArray = NULL;
    } else {
      taosArrayClear(pColArray);
    }
  }
  *ppLastArray = pColArray;

  nextRowIterClose(&iter);
  taosArrayDestroy(aColArray);

  TAOS_RETURN(code);

_err:
  nextRowIterClose(&iter);

  *ppLastArray = NULL;
  taosArrayDestroyEx(pColArray, tsdbCacheFreeSLastColItem);
  taosArrayDestroy(aColArray);

  if (code) {
    tsdbError("tsdb/cache: vgId:%d, %s failed at line %d since %s.", TD_VID(pTsdb->pVnode), __func__, lino,
              tstrerror(code));
  }

  TAOS_RETURN(code);
}
4224

4225
// Release cache handle `h` of `pCache` by forwarding to tsdbLRUCacheRelease()
// with its third argument fixed to false.
void tsdbCacheRelease(SLRUCache *pCache, LRUHandle *h) {
  tsdbLRUCacheRelease(pCache, h, false);
}
12,923,596✔
4226

4227
// Apply `capacity` to the LRU cache owned by the vnode's tsdb.
// pVnode->pTsdb is dereferenced without a NULL check.
void tsdbCacheSetCapacity(SVnode *pVnode, size_t capacity) {
  STsdb *pTsdb = pVnode->pTsdb;

  taosLRUCacheSetCapacity(pTsdb->lruCache, capacity);
}
15,957✔
4230

4231
#ifdef BUILD_NO_CALL
4232
// Report the capacity of the LRU cache owned by the vnode's tsdb.
// Only compiled when BUILD_NO_CALL is defined.
size_t tsdbCacheGetCapacity(SVnode *pVnode) {
  STsdb *pTsdb = pVnode->pTsdb;

  return taosLRUCacheGetCapacity(pTsdb->lruCache);
}
4233
#endif
4234

4235
// Report the usage of the vnode's tsdb LRU cache as given by
// taosLRUCacheGetUsage(); yields 0 when the vnode has no tsdb.
size_t tsdbCacheGetUsage(SVnode *pVnode) {
  if (pVnode->pTsdb == NULL) {
    return 0;
  }

  return taosLRUCacheGetUsage(pVnode->pTsdb->lruCache);
}
4243

4244
// Report the element count of the vnode's tsdb LRU cache as given by
// taosLRUCacheGetElems(); yields 0 when the vnode has no tsdb.
int32_t tsdbCacheGetElems(SVnode *pVnode) {
  if (pVnode->pTsdb == NULL) {
    return 0;
  }

  return taosLRUCacheGetElems(pVnode->pTsdb->lruCache);
}
4252

4253
#ifdef USE_SHARED_STORAGE
4254
// block cache
4255
// Serialize (fid, commitID, blkno) into `key` as the raw bytes of a small
// struct and store the number of bytes written in `*len`.
//
// `key` must have room for at least sizeof(int32_t) + 2 * sizeof(int64_t)
// bytes plus any alignment padding; callers in this file pass a 128-byte buffer.
static void getBCacheKey(int32_t fid, int64_t commitID, int64_t blkno, char *key, int *len) {
  struct {
    int32_t fid;
    int64_t commitID;
    int64_t blkno;
  } bKey;

  // The struct bytes are used verbatim as the cache key, so padding bytes must
  // be deterministic. An `= {0}` initializer is not guaranteed to clear
  // padding; memset clears every byte.
  memset(&bKey, 0, sizeof(bKey));

  bKey.fid = fid;
  bKey.commitID = commitID;
  bKey.blkno = blkno;

  *len = sizeof(bKey);
  memcpy(key, &bKey, *len);
}
12,928,552✔
4269

4270
// Read the block numbered pFD->blkno (1-based: its offset is
// (blkno - 1) * block size) via tssReadFileFromDefault() into a newly
// allocated buffer.
//
// On success `*ppBlock` receives the buffer, which the caller must release
// with taosMemoryFree(). On failure `*ppBlock` is left untouched and the
// buffer is freed here.
//
// Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY if the buffer cannot be
// allocated, or the error returned by tssReadFileFromDefault().
static int32_t tsdbCacheLoadBlockSs(STsdbFD *pFD, uint8_t **ppBlock) {
  // Widen before multiplying so the product is evaluated in 64 bits even if
  // both operands are 32-bit.
  int64_t block_size = (int64_t)tsSsBlockSize * pFD->szPage;
  int64_t block_offset = (pFD->blkno - 1) * block_size;

  char *buf = taosMemoryMalloc(block_size);
  if (buf == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // TODO: pFD->objName is not initialized, but this function is never called.
  int32_t code = tssReadFileFromDefault(pFD->objName, block_offset, buf, &block_size);
  if (code != TSDB_CODE_SUCCESS) {
    taosMemoryFree(buf);
    return code;
  }

  *ppBlock = (uint8_t *)buf;
  return code;
}
4293

4294
// LRU deleter for block/page cache entries: frees the cached buffer.
// The key, its length and the user data are not used.
static void deleteBCache(const void *key, size_t keyLen, void *value, void *ud) {
  (void)key;
  (void)keyLen;
  (void)ud;

  taosMemoryFree((uint8_t *)value);
}
10,658,704✔
4300

4301
// Look up the block identified by (pFD->fid, pFD->cid, pFD->blkno) in
// `pCache`, loading and inserting it on a miss.
//
// The first lookup is done without holding pTsdb->bMutex; on a miss the mutex
// is taken and the lookup is repeated before the block is loaded.
//
// `*handle` receives the cache handle, or NULL when loading failed.
// Returns 0 on success, the load error, or TSDB_CODE_OUT_OF_MEMORY when the
// loader reported success without producing a block.
int32_t tsdbCacheGetBlockSs(SLRUCache *pCache, STsdbFD *pFD, LRUHandle **handle) {
  int32_t code = 0;
  char    key[128] = {0};
  int     keyLen = 0;

  getBCacheKey(pFD->fid, pFD->cid, pFD->blkno, key, &keyLen);

  LRUHandle *hit = taosLRUCacheLookup(pCache, key, keyLen);
  if (hit == NULL) {
    STsdb *pTsdb = pFD->pTsdb;
    (void)taosThreadMutexLock(&pTsdb->bMutex);

    hit = taosLRUCacheLookup(pCache, key, keyLen);
    if (hit == NULL) {
      uint8_t *pBlock = NULL;
      code = tsdbCacheLoadBlockSs(pFD, &pBlock);

      if (code == TSDB_CODE_SUCCESS && pBlock != NULL) {
        size_t charge = tsSsBlockSize * pFD->szPage;

        // NOTE(review): the insert status is deliberately not turned into an
        // error, so a failed insert returns success — confirm callers cope
        // with a NULL handle in that case.
        (void)taosLRUCacheInsert(pCache, key, keyLen, pBlock, charge, deleteBCache, NULL, &hit, TAOS_LRU_PRIORITY_LOW,
                                 NULL);
      } else if (code == TSDB_CODE_SUCCESS) {
        // Loader succeeded but produced no block: report it as out of memory.
        code = TSDB_CODE_OUT_OF_MEMORY;
      }
    }

    (void)taosThreadMutexUnlock(&pTsdb->bMutex);
  }

  *handle = hit;

  TAOS_RETURN(code);
}
4344

4345
// Look up page `pgno` of file (pFD->fid, pFD->cid) in `pCache`.
//
// `*handle` receives the lookup result (NULL on a miss). It is left untouched
// when shared storage is disabled.
// Returns TSDB_CODE_OPS_NOT_SUPPORT when tsSsEnabled is false, otherwise 0.
int32_t tsdbCacheGetPageSs(SLRUCache *pCache, STsdbFD *pFD, int64_t pgno, LRUHandle **handle) {
  char key[128] = {0};
  int  keyLen = 0;

  if (!tsSsEnabled) {
    return TSDB_CODE_OPS_NOT_SUPPORT;
  }

  getBCacheKey(pFD->fid, pFD->cid, pgno, key, &keyLen);
  *handle = taosLRUCacheLookup(pCache, key, keyLen);

  return 0;
}
4359

4360
// Cache a private copy of page `pgno` of file (pFD->fid, pFD->cid) in
// `pCache`, unless an entry with the same key is already present.
//
// Does nothing when tsSsEnabled is false. Failures (allocation, insert) are
// ignored on purpose: the cache is best-effort and nothing is reported back.
// `pPage` must point to at least pFD->szPage readable bytes; it is copied and
// not retained. Lookup and insert run under pFD->pTsdb->pgMutex.
void tsdbCacheSetPageSs(SLRUCache *pCache, STsdbFD *pFD, int64_t pgno, uint8_t *pPage) {
  if (!tsSsEnabled) {
    return;
  }

  char       key[128] = {0};
  int        keyLen = 0;
  LRUHandle *handle = NULL;

  getBCacheKey(pFD->fid, pFD->cid, pgno, key, &keyLen);
  (void)taosThreadMutexLock(&pFD->pTsdb->pgMutex);

  // Look up, insert and release on the SAME cache. The lookup and release used
  // pFD->pTsdb->pgCache while the insert used pCache, which is only correct
  // when the caller passes that very cache.
  handle = taosLRUCacheLookup(pCache, key, keyLen);
  if (!handle) {
    size_t   charge = pFD->szPage;
    uint8_t *pPg = taosMemoryMalloc(charge);
    if (!pPg) {
      (void)taosThreadMutexUnlock(&pFD->pTsdb->pgMutex);

      return;  // ignore error with ss cache and leave error untouched
    }
    memcpy(pPg, pPage, charge);

    // Insert status is intentionally ignored: a failed cache update is not an error.
    (void)taosLRUCacheInsert(pCache, key, keyLen, pPg, charge, deleteBCache, NULL, &handle, TAOS_LRU_PRIORITY_LOW,
                             NULL);
  }
  (void)taosThreadMutexUnlock(&pFD->pTsdb->pgMutex);

  // Nothing to release if the insert did not produce a handle.
  if (handle != NULL) {
    tsdbCacheRelease(pCache, handle);
  }
}
4394
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc