• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3660

15 Mar 2025 09:06AM UTC coverage: 62.039% (-1.3%) from 63.314%
#3660

push

travis-ci

web-flow
feat(stream): support stream processing for virtual tables (#30144)

* enh: add client processing

* enh: add mnode vtables processing

* enh: add mnode vtable processing

* enh: add normal child vtable support

* fix: compile issues

* fix: compile issues

* fix: create stream issues

* fix: multi stream scan issue

* fix: remove debug info

* fix: agg task and task level issues

* fix: correct task output type

* fix: split vtablescan from agg

* fix: memory leak issues

* fix: add limitations

* Update 09-error-code.md

* Update 09-error-code.md

* fix: remove useless case

* feat(stream): extract original table data in source scan task

Implemented functionality in the source task to extract data
corresponding to the virtual table from the original table using WAL.
The extracted data is then sent to the downstream merge task for further
processing.

* feat(stream): multi-way merge using loser tree in virtual merge task

Implemented multi-way merge in the merge task using a loser tree to
combine data from multiple original tables into a single virtual table.
The merged virtual table data is then pushed downstream for further
processing.  Introduced memory limit handling during the merge process
with configurable behavior when the memory limit is reached.

* fix(test): remove useless cases

---------

Co-authored-by: dapan1121 <wpan@taosdata.com>
Co-authored-by: Pan Wei <72057773+dapan1121@users.noreply.github.com>

154078 of 317582 branches covered (48.52%)

Branch coverage included in aggregate %.

313 of 2391 new or added lines in 34 files covered. (13.09%)

26134 existing lines in 205 files now uncovered.

240261 of 318051 relevant lines covered (75.54%)

16655189.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.22
/source/dnode/vnode/src/tsdb/tsdbUtil.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tcompression.h"
17
#include "tdataformat.h"
18
#include "tsdb.h"
19
#include "tsdbDef.h"
20

21
int32_t tsdbGetColCmprAlgFromSet(SHashObj *set, int16_t colId, uint32_t *alg);
22

23
static int32_t tBlockDataCompressKeyPart(SBlockData *bData, SDiskDataHdr *hdr, SBuffer *buffer, SBuffer *assist,
24
                                         SColCompressInfo *pCompressExt);
25

26
// SMapData =======================================================================
27
// Mark the map as empty by zeroing its item and byte counters.
// The aOffset/pData buffers themselves are not touched here.
void tMapDataReset(SMapData *pMapData) {
  pMapData->nData = 0;
  pMapData->nItem = 0;
}
×
31

32
// Release both buffers owned by the map and leave the pointers NULL.
void tMapDataClear(SMapData *pMapData) {
  tFree(pMapData->pData);
  pMapData->pData = NULL;

  tFree(pMapData->aOffset);
  pMapData->aOffset = NULL;
}
×
38

39
#ifdef BUILD_NO_CALL
40
// Append one item to the map.
//
// tPutItemFn(NULL, pItem) is called first to obtain the encoded size, then both
// buffers are grown and the item is encoded at the end of pData.
//
// Returns 0 on success or the code reported by tRealloc. The counters nItem and
// nData are committed only after both reallocations succeed, so a failed call
// leaves the map describing exactly the items it held before.
int32_t tMapDataPutItem(SMapData *pMapData, void *pItem, int32_t (*tPutItemFn)(uint8_t *, void *)) {
  int32_t code = 0;
  int32_t iItem = pMapData->nItem;   // slot of the new item
  int32_t offset = pMapData->nData;  // where its bytes start in pData
  int32_t nItem = iItem + 1;
  int32_t nData = offset + tPutItemFn(NULL, pItem);

  // alloc
  code = tRealloc((uint8_t **)&pMapData->aOffset, sizeof(int32_t) * nItem);
  if (code) {
    return code;
  }
  code = tRealloc(&pMapData->pData, nData);
  if (code) {
    return code;
  }

  // put
  pMapData->aOffset[iItem] = offset;
  tPutItemFn(pMapData->pData + offset, pItem);

  // commit
  pMapData->nItem = nItem;
  pMapData->nData = nData;
  return 0;
}
61

62
// Copy the offsets and payload of pFrom into pTo, growing pTo's buffers as needed.
//
// Returns 0 on success or the code reported by tRealloc. pTo's counters are
// updated only after both buffers were grown, so a failed copy does not leave
// pTo claiming items it has no storage for.
int32_t tMapDataCopy(SMapData *pFrom, SMapData *pTo) {
  int32_t code;

  code = tRealloc((uint8_t **)&pTo->aOffset, sizeof(int32_t) * pFrom->nItem);
  if (code) {
    return code;
  }
  code = tRealloc(&pTo->pData, pFrom->nData);
  if (code) {
    return code;
  }

  // Skip empty copies: the buffers may still be NULL when there is nothing to copy.
  if (pFrom->nItem > 0) {
    memcpy(pTo->aOffset, pFrom->aOffset, sizeof(int32_t) * pFrom->nItem);
  }
  if (pFrom->nData > 0) {
    memcpy(pTo->pData, pFrom->pData, pFrom->nData);
  }

  pTo->nItem = pFrom->nItem;
  pTo->nData = pFrom->nData;
  return 0;
}
77

78
// Binary-search the map for pSearchItem.
//
// Each probed item is decoded into pItem with tGetItemFn and compared with
// tItemCmprFn(pSearchItem, pItem); the items must therefore be ordered
// consistently with tItemCmprFn. On a match pItem holds the decoded item.
//
// Returns 0 when found, TSDB_CODE_NOT_FOUND otherwise.
int32_t tMapDataSearch(SMapData *pMapData, void *pSearchItem, int32_t (*tGetItemFn)(uint8_t *, void *),
                       int32_t (*tItemCmprFn)(const void *, const void *), void *pItem) {
  int32_t lidx = 0;
  int32_t ridx = pMapData->nItem - 1;

  while (lidx <= ridx) {
    // lidx + (ridx - lidx) / 2 cannot overflow, unlike (lidx + ridx) / 2.
    int32_t midx = lidx + (ridx - lidx) / 2;

    tMapDataGetItemByIdx(pMapData, midx, pItem, tGetItemFn);

    int32_t c = tItemCmprFn(pSearchItem, pItem);
    if (c == 0) {
      return 0;
    } else if (c < 0) {
      ridx = midx - 1;
    } else {
      lidx = midx + 1;
    }
  }

  return TSDB_CODE_NOT_FOUND;
}
106
#endif
107

108
// Decode the idx-th item of the map into pItem.
// The decoder's return value is intentionally ignored; idx is not range-checked here.
void tMapDataGetItemByIdx(SMapData *pMapData, int32_t idx, void *pItem, int32_t (*tGetItemFn)(uint8_t *, void *)) {
  uint8_t *pEncoded = pMapData->pData + pMapData->aOffset[idx];
  (void)tGetItemFn(pEncoded, pItem);
}
×
111

112
#ifdef BUILD_NO_CALL
113
int32_t tMapDataToArray(SMapData *pMapData, int32_t itemSize, int32_t (*tGetItemFn)(uint8_t *, void *),
114
                        SArray **ppArray) {
115
  int32_t code = 0;
116

117
  SArray *pArray = taosArrayInit(pMapData->nItem, itemSize);
118
  if (pArray == NULL) {
119
    code = terrno;
120
    goto _exit;
121
  }
122

123
  for (int32_t i = 0; i < pMapData->nItem; i++) {
124
    tMapDataGetItemByIdx(pMapData, i, taosArrayReserve(pArray, 1), tGetItemFn);
125
  }
126

127
_exit:
128
  *ppArray = pArray;
129
  return code;
130
}
131

132
// Encode the map into p, or only compute the encoded size when p is NULL.
//
// Layout written: nItem, then (when nItem is non-zero) each offset as a delta
// from the previous one, nData, and the nData payload bytes.
// Returns the total encoded size.
int32_t tPutMapData(uint8_t *p, SMapData *pMapData) {
  int32_t len = tPutI32v(p, pMapData->nItem);

  if (pMapData->nItem == 0) {
    return len;
  }

  int32_t prevOffset = 0;
  for (int32_t i = 0; i < pMapData->nItem; i++) {
    int32_t curOffset = pMapData->aOffset[i];
    len += tPutI32v(p ? p + len : NULL, curOffset - prevOffset);
    prevOffset = curOffset;
  }

  len += tPutI32v(p ? p + len : NULL, pMapData->nData);
  if (p != NULL) {
    memcpy(p + len, pMapData->pData, pMapData->nData);
  }
  len += pMapData->nData;

  return len;
}
152
#endif
153

154
// Decode a map from p into pMapData (the inverse of the layout written by tPutMapData).
//
// The map is reset first. Offsets are stored as deltas and are turned back into
// absolute offsets while reading. When decodedSize is non-NULL it receives the
// number of bytes consumed from p.
// Returns 0 on success or the code reported by tRealloc.
int32_t tGetMapData(uint8_t *p, SMapData *pMapData, int32_t *decodedSize) {
  uint8_t *cur = p;

  tMapDataReset(pMapData);

  cur += tGetI32v(cur, &pMapData->nItem);
  if (pMapData->nItem != 0) {
    int32_t code = tRealloc((uint8_t **)&pMapData->aOffset, sizeof(int32_t) * pMapData->nItem);
    if (code != 0) {
      return code;
    }

    int32_t running = 0;
    for (int32_t i = 0; i < pMapData->nItem; i++) {
      cur += tGetI32v(cur, &pMapData->aOffset[i]);
      pMapData->aOffset[i] += running;
      running = pMapData->aOffset[i];
    }

    cur += tGetI32v(cur, &pMapData->nData);
    code = tRealloc(&pMapData->pData, pMapData->nData);
    if (code != 0) {
      return code;
    }
    memcpy(pMapData->pData, cur, pMapData->nData);
    cur += pMapData->nData;
  }

  if (decodedSize != NULL) {
    *decodedSize = (int32_t)(cur - p);
  }

  return 0;
}
190

191
#ifdef BUILD_NO_CALL
192
// TABLEID =======================================================================
193
int32_t tTABLEIDCmprFn(const void *p1, const void *p2) {
194
  TABLEID *pId1 = (TABLEID *)p1;
195
  TABLEID *pId2 = (TABLEID *)p2;
196

197
  if (pId1->suid < pId2->suid) {
198
    return -1;
199
  } else if (pId1->suid > pId2->suid) {
200
    return 1;
201
  }
202

203
  if (pId1->uid < pId2->uid) {
204
    return -1;
205
  } else if (pId1->uid > pId2->uid) {
206
    return 1;
207
  }
208

209
  return 0;
210
}
211

212
// SBlockIdx ======================================================
213
int32_t tPutBlockIdx(uint8_t *p, void *ph) {
214
  int32_t    n = 0;
215
  SBlockIdx *pBlockIdx = (SBlockIdx *)ph;
216

217
  n += tPutI64(p ? p + n : p, pBlockIdx->suid);
218
  n += tPutI64(p ? p + n : p, pBlockIdx->uid);
219
  n += tPutI64v(p ? p + n : p, pBlockIdx->offset);
220
  n += tPutI64v(p ? p + n : p, pBlockIdx->size);
221

222
  return n;
223
}
224
#endif
225

226
int32_t tGetBlockIdx(uint8_t *p, void *ph) {
×
227
  int32_t    n = 0;
×
228
  SBlockIdx *pBlockIdx = (SBlockIdx *)ph;
×
229

230
  n += tGetI64(p + n, &pBlockIdx->suid);
×
231
  n += tGetI64(p + n, &pBlockIdx->uid);
×
232
  n += tGetI64v(p + n, &pBlockIdx->offset);
×
233
  n += tGetI64v(p + n, &pBlockIdx->size);
×
234

235
  return n;
×
236
}
237

238
#ifdef BUILD_NO_CALL
239
int32_t tCmprBlockIdx(void const *lhs, void const *rhs) {
240
  SBlockIdx *lBlockIdx = (SBlockIdx *)lhs;
241
  SBlockIdx *rBlockIdx = (SBlockIdx *)rhs;
242

243
  if (lBlockIdx->suid < rBlockIdx->suid) {
244
    return -1;
245
  } else if (lBlockIdx->suid > rBlockIdx->suid) {
246
    return 1;
247
  }
248

249
  if (lBlockIdx->uid < rBlockIdx->uid) {
250
    return -1;
251
  } else if (lBlockIdx->uid > rBlockIdx->uid) {
252
    return 1;
253
  }
254

255
  return 0;
256
}
257

258
int32_t tCmprBlockL(void const *lhs, void const *rhs) {
259
  SBlockIdx *lBlockIdx = (SBlockIdx *)lhs;
260
  SSttBlk   *rBlockL = (SSttBlk *)rhs;
261

262
  if (lBlockIdx->suid < rBlockL->suid) {
263
    return -1;
264
  } else if (lBlockIdx->suid > rBlockL->suid) {
265
    return 1;
266
  }
267

268
  if (lBlockIdx->uid < rBlockL->minUid) {
269
    return -1;
270
  } else if (lBlockIdx->uid > rBlockL->maxUid) {
271
    return 1;
272
  }
273

274
  return 0;
275
}
276

277
// SDataBlk ======================================================
278
// Reset a data block descriptor: all fields zeroed except the key/version bounds,
// which are set to inverted extremes (min = MAX, max = MIN).
void tDataBlkReset(SDataBlk *pDataBlk) {
  SDataBlk blank = {.minKey = TSDBKEY_MAX, .maxKey = TSDBKEY_MIN, .minVer = VERSION_MAX, .maxVer = VERSION_MIN};
  *pDataBlk = blank;
}
281

282
int32_t tPutDataBlk(uint8_t *p, void *ph) {
283
  int32_t   n = 0;
284
  SDataBlk *pDataBlk = (SDataBlk *)ph;
285

286
  n += tPutI64v(p ? p + n : p, pDataBlk->minKey.version);
287
  n += tPutI64v(p ? p + n : p, pDataBlk->minKey.ts);
288
  n += tPutI64v(p ? p + n : p, pDataBlk->maxKey.version);
289
  n += tPutI64v(p ? p + n : p, pDataBlk->maxKey.ts);
290
  n += tPutI64v(p ? p + n : p, pDataBlk->minVer);
291
  n += tPutI64v(p ? p + n : p, pDataBlk->maxVer);
292
  n += tPutI32v(p ? p + n : p, pDataBlk->nRow);
293
  n += tPutI8(p ? p + n : p, pDataBlk->hasDup);
294
  n += tPutI8(p ? p + n : p, pDataBlk->nSubBlock);
295
  for (int8_t iSubBlock = 0; iSubBlock < pDataBlk->nSubBlock; iSubBlock++) {
296
    n += tPutI64v(p ? p + n : p, pDataBlk->aSubBlock[iSubBlock].offset);
297
    n += tPutI32v(p ? p + n : p, pDataBlk->aSubBlock[iSubBlock].szBlock);
298
    n += tPutI32v(p ? p + n : p, pDataBlk->aSubBlock[iSubBlock].szKey);
299
  }
300
  if (pDataBlk->nSubBlock == 1 && !pDataBlk->hasDup) {
301
    n += tPutI64v(p ? p + n : p, pDataBlk->smaInfo.offset);
302
    n += tPutI32v(p ? p + n : p, pDataBlk->smaInfo.size);
303
  }
304

305
  return n;
306
}
307
#endif
308

309
int32_t tGetDataBlk(uint8_t *p, void *ph) {
×
310
  int32_t   n = 0;
×
311
  SDataBlk *pDataBlk = (SDataBlk *)ph;
×
312

313
  n += tGetI64v(p + n, &pDataBlk->minKey.version);
×
314
  n += tGetI64v(p + n, &pDataBlk->minKey.ts);
×
315
  n += tGetI64v(p + n, &pDataBlk->maxKey.version);
×
316
  n += tGetI64v(p + n, &pDataBlk->maxKey.ts);
×
317
  n += tGetI64v(p + n, &pDataBlk->minVer);
×
318
  n += tGetI64v(p + n, &pDataBlk->maxVer);
×
319
  n += tGetI32v(p + n, &pDataBlk->nRow);
×
320
  n += tGetI8(p + n, &pDataBlk->hasDup);
×
321
  n += tGetI8(p + n, &pDataBlk->nSubBlock);
×
322
  for (int8_t iSubBlock = 0; iSubBlock < pDataBlk->nSubBlock; iSubBlock++) {
×
323
    n += tGetI64v(p + n, &pDataBlk->aSubBlock[iSubBlock].offset);
×
324
    n += tGetI32v(p + n, &pDataBlk->aSubBlock[iSubBlock].szBlock);
×
325
    n += tGetI32v(p + n, &pDataBlk->aSubBlock[iSubBlock].szKey);
×
326
  }
327
  if (pDataBlk->nSubBlock == 1 && !pDataBlk->hasDup) {
×
328
    n += tGetI64v(p + n, &pDataBlk->smaInfo.offset);
×
329
    n += tGetI32v(p + n, &pDataBlk->smaInfo.size);
×
330
  } else {
331
    pDataBlk->smaInfo.offset = 0;
×
332
    pDataBlk->smaInfo.size = 0;
×
333
  }
334

335
  return n;
×
336
}
337

338
#ifdef BUILD_NO_CALL
339
int32_t tDataBlkCmprFn(const void *p1, const void *p2) {
340
  SDataBlk *pBlock1 = (SDataBlk *)p1;
341
  SDataBlk *pBlock2 = (SDataBlk *)p2;
342

343
  if (tsdbKeyCmprFn(&pBlock1->maxKey, &pBlock2->minKey) < 0) {
344
    return -1;
345
  } else if (tsdbKeyCmprFn(&pBlock1->minKey, &pBlock2->maxKey) > 0) {
346
    return 1;
347
  }
348

349
  return 0;
350
}
351

352
// A data block carries SMA data only when it has at most one sub-block, holds no
// duplicates, and its recorded SMA size is positive.
bool tDataBlkHasSma(SDataBlk *pDataBlk) {
  return pDataBlk->nSubBlock <= 1 && !pDataBlk->hasDup && pDataBlk->smaInfo.size > 0;
}
358

359
// SSttBlk ======================================================
360
int32_t tPutSttBlk(uint8_t *p, void *ph) {
361
  int32_t  n = 0;
362
  SSttBlk *pSttBlk = (SSttBlk *)ph;
363

364
  n += tPutI64(p ? p + n : p, pSttBlk->suid);
365
  n += tPutI64(p ? p + n : p, pSttBlk->minUid);
366
  n += tPutI64(p ? p + n : p, pSttBlk->maxUid);
367
  n += tPutI64v(p ? p + n : p, pSttBlk->minKey);
368
  n += tPutI64v(p ? p + n : p, pSttBlk->maxKey);
369
  n += tPutI64v(p ? p + n : p, pSttBlk->minVer);
370
  n += tPutI64v(p ? p + n : p, pSttBlk->maxVer);
371
  n += tPutI32v(p ? p + n : p, pSttBlk->nRow);
372
  n += tPutI64v(p ? p + n : p, pSttBlk->bInfo.offset);
373
  n += tPutI32v(p ? p + n : p, pSttBlk->bInfo.szBlock);
374
  n += tPutI32v(p ? p + n : p, pSttBlk->bInfo.szKey);
375

376
  return n;
377
}
378
#endif
379

380
int32_t tGetSttBlk(uint8_t *p, void *ph) {
×
381
  int32_t  n = 0;
×
382
  SSttBlk *pSttBlk = (SSttBlk *)ph;
×
383

384
  n += tGetI64(p + n, &pSttBlk->suid);
×
385
  n += tGetI64(p + n, &pSttBlk->minUid);
×
386
  n += tGetI64(p + n, &pSttBlk->maxUid);
×
387
  n += tGetI64v(p + n, &pSttBlk->minKey);
×
388
  n += tGetI64v(p + n, &pSttBlk->maxKey);
×
389
  n += tGetI64v(p + n, &pSttBlk->minVer);
×
390
  n += tGetI64v(p + n, &pSttBlk->maxVer);
×
391
  n += tGetI32v(p + n, &pSttBlk->nRow);
×
392
  n += tGetI64v(p + n, &pSttBlk->bInfo.offset);
×
393
  n += tGetI32v(p + n, &pSttBlk->bInfo.szBlock);
×
394
  n += tGetI32v(p + n, &pSttBlk->bInfo.szKey);
×
395

396
  return n;
×
397
}
398

399
// SBlockCol ======================================================
400

401
// Lowest `ver` for which tPutBlockCol/tGetBlockCol write/read the per-column `alg` field;
// for smaller versions tGetBlockCol falls back to the caller-supplied default algorithm.
static const int32_t BLOCK_WITH_ALG_VER = 2;
402

403
// Append the encoded form of one SBlockCol to `buffer`.
//
// cid, type, cflag, flag and szOrigin are always written. The size/offset fields
// follow only when flag != HAS_NULL, and each of szBitmap / szOffset / szValue is
// written only under the condition checked below. A u32 algorithm value is
// written last: pBlockCol->alg for ver >= BLOCK_WITH_ALG_VER, otherwise defaultCmprAlg.
//
// NOTE(review): for ver < BLOCK_WITH_ALG_VER a u32 is still appended here, while
// tGetBlockCol reads no u32 for such versions - confirm this is intended.
//
// Returns 0 on success or the first non-zero code from a tBufferPut* call.
int32_t tPutBlockCol(SBuffer *buffer, const SBlockCol *pBlockCol, int32_t ver, uint32_t defaultCmprAlg) {
  int32_t code;

  code = tBufferPutI16v(buffer, pBlockCol->cid);
  if (code != 0) return code;
  code = tBufferPutI8(buffer, pBlockCol->type);
  if (code != 0) return code;
  code = tBufferPutI8(buffer, pBlockCol->cflag);
  if (code != 0) return code;
  code = tBufferPutI8(buffer, pBlockCol->flag);
  if (code != 0) return code;
  code = tBufferPutI32v(buffer, pBlockCol->szOrigin);
  if (code != 0) return code;

  if (pBlockCol->flag != HAS_NULL) {
    if (pBlockCol->flag != HAS_VALUE) {
      code = tBufferPutI32v(buffer, pBlockCol->szBitmap);
      if (code != 0) return code;
    }

    if (IS_VAR_DATA_TYPE(pBlockCol->type)) {
      code = tBufferPutI32v(buffer, pBlockCol->szOffset);
      if (code != 0) return code;
    }

    if (pBlockCol->flag != (HAS_NULL | HAS_NONE)) {
      code = tBufferPutI32v(buffer, pBlockCol->szValue);
      if (code != 0) return code;
    }

    code = tBufferPutI32v(buffer, pBlockCol->offset);
    if (code != 0) return code;
  }

  uint32_t alg = (ver >= BLOCK_WITH_ALG_VER) ? pBlockCol->alg : defaultCmprAlg;
  return tBufferPutU32(buffer, alg);
}
434

435
// Read one SBlockCol from `br`.
//
// cid, type, cflag, flag and szOrigin are always read. szBitmap, szOffset, szValue
// and offset are zeroed first and then read only under the same conditions
// tPutBlockCol uses to write them. The algorithm is read from the buffer for
// ver >= BLOCK_WITH_ALG_VER; otherwise it is set to defaultCmprAlg.
//
// Returns 0 on success or the first non-zero code from a tBufferGet* call.
int32_t tGetBlockCol(SBufferReader *br, SBlockCol *pBlockCol, int32_t ver, uint32_t defaultCmprAlg) {
  int32_t code;

  code = tBufferGetI16v(br, &pBlockCol->cid);
  if (code != 0) return code;
  code = tBufferGetI8(br, &pBlockCol->type);
  if (code != 0) return code;
  code = tBufferGetI8(br, &pBlockCol->cflag);
  if (code != 0) return code;
  code = tBufferGetI8(br, &pBlockCol->flag);
  if (code != 0) return code;
  code = tBufferGetI32v(br, &pBlockCol->szOrigin);
  if (code != 0) return code;

  // Defaults for the fields that are not present in every encoding.
  pBlockCol->szBitmap = 0;
  pBlockCol->szOffset = 0;
  pBlockCol->szValue = 0;
  pBlockCol->offset = 0;

  if (pBlockCol->flag != HAS_NULL) {
    if (pBlockCol->flag != HAS_VALUE) {
      code = tBufferGetI32v(br, &pBlockCol->szBitmap);
      if (code != 0) return code;
    }

    if (IS_VAR_DATA_TYPE(pBlockCol->type)) {
      code = tBufferGetI32v(br, &pBlockCol->szOffset);
      if (code != 0) return code;
    }

    if (pBlockCol->flag != (HAS_NULL | HAS_NONE)) {
      code = tBufferGetI32v(br, &pBlockCol->szValue);
      if (code != 0) return code;
    }

    code = tBufferGetI32v(br, &pBlockCol->offset);
    if (code != 0) return code;
  }

  if (ver < BLOCK_WITH_ALG_VER) {
    pBlockCol->alg = defaultCmprAlg;
    return 0;
  }

  return tBufferGetU32(br, &pBlockCol->alg);
}
473

474
#ifdef BUILD_NO_CALL
475
int32_t tBlockColCmprFn(const void *p1, const void *p2) {
476
  if (((SBlockCol *)p1)->cid < ((SBlockCol *)p2)->cid) {
477
    return -1;
478
  } else if (((SBlockCol *)p1)->cid > ((SBlockCol *)p2)->cid) {
479
    return 1;
480
  }
481

482
  return 0;
483
}
484

485
// SDelIdx ======================================================
486
int32_t tCmprDelIdx(void const *lhs, void const *rhs) {
487
  SDelIdx *lDelIdx = (SDelIdx *)lhs;
488
  SDelIdx *rDelIdx = (SDelIdx *)rhs;
489

490
  if (lDelIdx->suid < rDelIdx->suid) {
491
    return -1;
492
  } else if (lDelIdx->suid > rDelIdx->suid) {
493
    return 1;
494
  }
495

496
  if (lDelIdx->uid < rDelIdx->uid) {
497
    return -1;
498
  } else if (lDelIdx->uid > rDelIdx->uid) {
499
    return 1;
500
  }
501

502
  return 0;
503
}
504

505
int32_t tPutDelIdx(uint8_t *p, void *ph) {
506
  SDelIdx *pDelIdx = (SDelIdx *)ph;
507
  int32_t  n = 0;
508

509
  n += tPutI64(p ? p + n : p, pDelIdx->suid);
510
  n += tPutI64(p ? p + n : p, pDelIdx->uid);
511
  n += tPutI64v(p ? p + n : p, pDelIdx->offset);
512
  n += tPutI64v(p ? p + n : p, pDelIdx->size);
513

514
  return n;
515
}
516
#endif
517

518
int32_t tGetDelIdx(uint8_t *p, void *ph) {
×
519
  SDelIdx *pDelIdx = (SDelIdx *)ph;
×
520
  int32_t  n = 0;
×
521

522
  n += tGetI64(p + n, &pDelIdx->suid);
×
523
  n += tGetI64(p + n, &pDelIdx->uid);
×
524
  n += tGetI64v(p + n, &pDelIdx->offset);
×
525
  n += tGetI64v(p + n, &pDelIdx->size);
×
526

527
  return n;
×
528
}
529

530
#ifdef BUILD_NO_CALL
531
// SDelData ======================================================
532
int32_t tPutDelData(uint8_t *p, void *ph) {
533
  SDelData *pDelData = (SDelData *)ph;
534
  int32_t   n = 0;
535

536
  n += tPutI64v(p ? p + n : p, pDelData->version);
537
  n += tPutI64(p ? p + n : p, pDelData->sKey);
538
  n += tPutI64(p ? p + n : p, pDelData->eKey);
539

540
  return n;
541
}
542
#endif
543

544
int32_t tGetDelData(uint8_t *p, void *ph) {
×
545
  SDelData *pDelData = (SDelData *)ph;
×
546
  int32_t   n = 0;
×
547

548
  n += tGetI64v(p + n, &pDelData->version);
×
549
  n += tGetI64(p + n, &pDelData->sKey);
×
550
  n += tGetI64(p + n, &pDelData->eKey);
×
551

552
  return n;
×
553
}
554

555
// Map a timestamp to its fid: key / tsTickPerMin[precision] / minutes.
//
// Negative keys use (key + 1) / ... - 1 so that, for positive divisors, the
// result rounds toward negative infinity instead of toward zero.
// The result is clamped to INT32_MAX (non-negative keys) or INT32_MIN (negative keys).
int32_t tsdbKeyFid(TSKEY key, int32_t minutes, int8_t precision) {
  if (key >= 0) {
    int64_t fid = key / tsTickPerMin[precision] / minutes;
    return (fid > INT32_MAX) ? INT32_MAX : (int32_t)fid;
  }

  int64_t fid = (key + 1) / tsTickPerMin[precision] / minutes - 1;
  return (fid < INT32_MIN) ? INT32_MIN : (int32_t)fid;
}
565

566
// Compute the inclusive key range [*minKey, *maxKey] covered by `fid`:
// it starts at tsTickPerMin[precision] * fid * minutes and spans
// tsTickPerMin[precision] * minutes ticks.
void tsdbFidKeyRange(int32_t fid, int32_t minutes, int8_t precision, TSKEY *minKey, TSKEY *maxKey) {
  TSKEY first = tsTickPerMin[precision] * fid * minutes;

  *minKey = first;
  *maxKey = first + tsTickPerMin[precision] * minutes - 1;
}
17,294,386✔
570

571
// Classify `fid` against the three keep thresholds of pKeepCfg.
//
// `nowSec` (seconds) is scaled to the configured precision and shifted back by
// keepTimeOffset hours; the fids of (now - keep0), (now - keep1) and (now - keep2)
// are then computed with tsdbKeyFid.
// Returns 0, 1 or 2 for the first threshold that `fid` reaches, -1 when it is
// below all three, and 0 (after logging) for an unknown precision.
int32_t tsdbFidLevel(int32_t fid, STsdbKeepCfg *pKeepCfg, int64_t nowSec) {
  int64_t ticksPerSec;

  if (pKeepCfg->precision == TSDB_TIME_PRECISION_MILLI) {
    ticksPerSec = 1000;
  } else if (pKeepCfg->precision == TSDB_TIME_PRECISION_MICRO) {
    ticksPerSec = 1000000l;
  } else if (pKeepCfg->precision == TSDB_TIME_PRECISION_NANO) {
    ticksPerSec = 1000000000l;
  } else {
    tsdbError("invalid time precision:%d", pKeepCfg->precision);
    return 0;
  }

  int64_t now = nowSec * ticksPerSec;
  now = now - pKeepCfg->keepTimeOffset * tsTickPerHour[pKeepCfg->precision];

  int32_t aFid[3];
  TSKEY   key;

  key = now - pKeepCfg->keep0 * tsTickPerMin[pKeepCfg->precision];
  aFid[0] = tsdbKeyFid(key, pKeepCfg->days, pKeepCfg->precision);
  key = now - pKeepCfg->keep1 * tsTickPerMin[pKeepCfg->precision];
  aFid[1] = tsdbKeyFid(key, pKeepCfg->days, pKeepCfg->precision);
  key = now - pKeepCfg->keep2 * tsTickPerMin[pKeepCfg->precision];
  aFid[2] = tsdbKeyFid(key, pKeepCfg->days, pKeepCfg->precision);

  for (int32_t level = 0; level < 3; level++) {
    if (fid >= aFid[level]) {
      return level;
    }
  }
  return -1;
}
605

606
// TSDBROW ======================================================
607
// Fetch column iCol of pRow into *pColVal.
//
// - Row format: delegates to tRowGet and logs a failure.
// - Column format: column 0 is the timestamp taken from aTSKEY; other columns are
//   looked up by colId in the block data and yield a NONE value when absent.
// - Any other row type leaves *pColVal untouched.
// Errors are only logged; nothing is returned to the caller.
void tsdbRowGetColVal(TSDBROW *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal) {
  if (pRow->type == TSDBROW_ROW_FMT) {
    int32_t ret = tRowGet(pRow->pTSRow, pTSchema, iCol, pColVal);
    if (ret != 0) {
      tsdbError("failed to get column value, code:%d", ret);
    }
    return;
  }

  if (pRow->type != TSDBROW_COL_FMT) {
    return;
  }

  if (iCol == 0) {
    SValue val = {.type = TSDB_DATA_TYPE_TIMESTAMP};
    VALUE_SET_TRIVIAL_DATUM(&val, pRow->pBlockData->aTSKEY[pRow->iRow]);
    *pColVal = COL_VAL_VALUE(PRIMARYKEY_TIMESTAMP_COL_ID, val);
    return;
  }

  STColumn *pTColumn = &pTSchema->columns[iCol];
  SColData *pColData = tBlockDataGetColData(pRow->pBlockData, pTColumn->colId);
  if (pColData == NULL) {
    *pColVal = COL_VAL_NONE(pTColumn->colId, pTColumn->type);
    return;
  }

  if (tColDataGetValue(pColData, pRow->iRow, pColVal) != 0) {
    tsdbError("failed to tColDataGetValue");
  }
}
2,147,483,647✔
634

635
// Fill `key` (version + row key) from `row`.
// Row-format rows carry their own version; every other type is read from the
// block data at index iRow.
void tsdbRowGetKey(TSDBROW *row, STsdbRowKey *key) {
  if (row->type != TSDBROW_ROW_FMT) {
    key->version = row->pBlockData->aVersion[row->iRow];
    tColRowGetKey(row->pBlockData, row->iRow, &key->key);
    return;
  }

  key->version = row->version;
  tRowGetKey(row->pTSRow, &key->key);
}
2,147,483,647✔
644

645
// Append the primary-key column values of row `irow` to key->pks.
//
// Columns are scanned from the front and the scan stops at the first column
// without COL_IS_KEY, or when a value cannot be read. key->numOfPKs is
// incremented per appended value; it is not reset here.
void tColRowGetPrimaryKey(SBlockData *pBlock, int32_t irow, SRowKey *key) {
  for (int32_t i = 0; i < pBlock->nColData; i++) {
    SColData *pColData = &pBlock->aColData[i];
    if (!(pColData->cflag & COL_IS_KEY)) {
      break;
    }

    SColVal cv;
    if (tColDataGetValue(pColData, irow, &cv) != 0) {
      break;
    }

    key->pks[key->numOfPKs] = cv.value;
    key->numOfPKs++;
  }
}
2,147,483,647✔
660

661
int32_t tsdbRowKeyCmpr(const STsdbRowKey *key1, const STsdbRowKey *key2) {
1,391,595,886✔
662
  int32_t c = tRowKeyCompare(&key1->key, &key2->key);
1,391,595,886✔
663

664
  if (c) {
1,391,657,234✔
665
    return c;
1,352,866,569✔
666
  }
667

668
  if (key1->version < key2->version) {
38,790,665✔
669
    return -1;
33,532,810✔
670
  } else if (key1->version > key2->version) {
5,257,855!
671
    return 1;
5,518,650✔
672
  }
673
  return 0;
×
674
}
675

676
int32_t tsdbRowCompare(const void *p1, const void *p2) {
158,612,131✔
677
  STsdbRowKey key1, key2;
678

679
  tsdbRowGetKey((TSDBROW *)p1, &key1);
158,612,131✔
680
  tsdbRowGetKey((TSDBROW *)p2, &key2);
158,612,066✔
681
  return tsdbRowKeyCmpr(&key1, &key2);
158,578,099✔
682
}
683

684
int32_t tsdbRowCompareWithoutVersion(const void *p1, const void *p2) {
2,097,938,501✔
685
  STsdbRowKey key1, key2;
686

687
  tsdbRowGetKey((TSDBROW *)p1, &key1);
2,097,938,501✔
688
  tsdbRowGetKey((TSDBROW *)p2, &key2);
2,098,326,552✔
689
  return tRowKeyCompare(&key1.key, &key2.key);
2,097,469,673✔
690
}
691

692
// STSDBRowIter ======================================================
693
// Prepare pIter to walk the columns of pRow.
// Row-format rows open a tRowIter (its code is returned); column-format rows
// just start at column index 0. Other row types return 0 without further setup.
int32_t tsdbRowIterOpen(STSDBRowIter *pIter, TSDBROW *pRow, STSchema *pTSchema) {
  pIter->pRow = pRow;

  if (pRow->type == TSDBROW_ROW_FMT) {
    return tRowIterOpen(pRow->pTSRow, pTSchema, &pIter->pIter);
  }

  if (pRow->type == TSDBROW_COL_FMT) {
    pIter->iColData = 0;
  }
  return 0;
}
704

705
// Close the iterator: release the underlying tRowIter for row-format rows and
// clear both pointers.
void tsdbRowClose(STSDBRowIter *pIter) {
  TSDBROW *pRow = pIter->pRow;

  if (pRow != NULL && pRow->type == TSDBROW_ROW_FMT) {
    tRowIterClose(&pIter->pIter);
  }

  pIter->pIter = NULL;
  pIter->pRow = NULL;
}
919,494✔
712

713
// Return the next column value of the iterated row, or NULL when exhausted.
//
// - Row format: forwards to tRowIterNext.
// - Column format: position 0 yields the timestamp from aTSKEY; positions
//   1..nColData yield aColData[position - 1]. The returned pointer refers to
//   pIter->cv, which is overwritten by the next call. NULL is also returned when
//   a value cannot be read.
// - Any other row type is logged and yields NULL.
SColVal *tsdbRowIterNext(STSDBRowIter *pIter) {
  TSDBROW *pRow = pIter->pRow;

  if (pRow->type == TSDBROW_ROW_FMT) {
    return tRowIterNext(pIter->pIter);
  }

  if (pRow->type != TSDBROW_COL_FMT) {
    tsdbError("invalid row type:%d", pRow->type);
    return NULL;
  }

  if (pIter->iColData == 0) {
    SValue val = {.type = TSDB_DATA_TYPE_TIMESTAMP};
    VALUE_SET_TRIVIAL_DATUM(&val, pRow->pBlockData->aTSKEY[pRow->iRow]);
    pIter->cv = COL_VAL_VALUE(PRIMARYKEY_TIMESTAMP_COL_ID, val);
    ++pIter->iColData;
    return &pIter->cv;
  }

  if (pIter->iColData > pRow->pBlockData->nColData) {
    return NULL;
  }

  if (tColDataGetValue(&pRow->pBlockData->aColData[pIter->iColData - 1], pRow->iRow, &pIter->cv) != 0) {
    return NULL;
  }
  ++pIter->iColData;
  return &pIter->cv;
}
740

741
// SRowMerger ======================================================
742
// Fold one row into the merger's per-column value array (pMerger->pArray).
//
// pTSchema describes pRow; when NULL the merger's own schema is used. Columns of
// the two schemas are matched by colId while walking both in order.
//
// - First row (array empty): the timestamp and every merger-schema column are
//   pushed. Columns missing from the row's schema are pushed as NONE. Non-NULL
//   var-length / DECIMAL values are duplicated into a buffer owned by the array.
// - Later rows: a row with a higher version than the merger overwrites every
//   column it provides (is not NONE); a row with a lower version only fills
//   columns that are still NONE in the merger.
//
// Returns 0 on success, terrno or tRealloc's code on failure, and
// TSDB_CODE_INVALID_PARA when a later row with the merger's current version
// reaches a matching column.
int32_t tsdbRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) {
  int32_t   code = 0;
  TSDBKEY   key = TSDBROW_KEY(pRow);
  SColVal  *pColVal = &(SColVal){0};
  STColumn *pTColumn;
  int32_t   iCol, jCol = 1;

  if (NULL == pTSchema) {
    pTSchema = pMerger->pTSchema;
  }

  if (taosArrayGetSize(pMerger->pArray) == 0) {
    // ts
    jCol = 0;
    pTColumn = &pTSchema->columns[jCol++];
    SValue val = {.type = pTColumn->type};
    VALUE_SET_TRIVIAL_DATUM(&val, key.ts);
    *pColVal = COL_VAL_VALUE(pTColumn->colId, val);
    if (taosArrayPush(pMerger->pArray, pColVal) == NULL) {
      code = terrno;
      return code;
    }

    // other
    for (iCol = 1; jCol < pTSchema->numOfCols && iCol < pMerger->pTSchema->numOfCols; ++iCol) {
      pTColumn = &pMerger->pTSchema->columns[iCol];
      if (pTSchema->columns[jCol].colId < pTColumn->colId) {
        // Row column unknown to the merger schema: skip it and retry this merger column.
        ++jCol;
        --iCol;
        continue;
      } else if (pTSchema->columns[jCol].colId > pTColumn->colId) {
        // Merger column absent from the row schema.
        if (taosArrayPush(pMerger->pArray, &COL_VAL_NONE(pTColumn->colId, pTColumn->type)) == NULL) {
          return terrno;
        }
        continue;
      }

      tsdbRowGetColVal(pRow, pTSchema, jCol++, pColVal);
      bool usepData = IS_VAR_DATA_TYPE(pColVal->value.type) || pColVal->value.type == TSDB_DATA_TYPE_DECIMAL;
      bool ownsCopy = false;  // true once value.pData points at our own duplicate
      if ((!COL_VAL_IS_NONE(pColVal)) && (!COL_VAL_IS_NULL(pColVal)) && usepData) {
        uint8_t *pVal = pColVal->value.pData;

        pColVal->value.pData = NULL;
        code = tRealloc(&pColVal->value.pData, pColVal->value.nData);
        if (code) {
          return code;
        }
        ownsCopy = true;

        if (pColVal->value.nData) {
          memcpy(pColVal->value.pData, pVal, pColVal->value.nData);
        }
      }

      if (taosArrayPush(pMerger->pArray, pColVal) == NULL) {
        code = terrno;
        if (ownsCopy) {
          // The array did not take the value, so the duplicate would otherwise leak.
          tFree(pColVal->value.pData);
        }
        return code;
      }
    }

    // Remaining merger columns have no counterpart in the row schema.
    for (; iCol < pMerger->pTSchema->numOfCols; ++iCol) {
      pTColumn = &pMerger->pTSchema->columns[iCol];
      if (taosArrayPush(pMerger->pArray, &COL_VAL_NONE(pTColumn->colId, pTColumn->type)) == NULL) {
        return terrno;
      }
    }

    pMerger->version = key.version;
    return 0;
  } else {
    for (iCol = 1; iCol < pMerger->pTSchema->numOfCols && jCol < pTSchema->numOfCols; ++iCol) {
      pTColumn = &pMerger->pTSchema->columns[iCol];
      if (pTSchema->columns[jCol].colId < pTColumn->colId) {
        ++jCol;
        --iCol;
        continue;
      } else if (pTSchema->columns[jCol].colId > pTColumn->colId) {
        continue;
      }

      tsdbRowGetColVal(pRow, pTSchema, jCol++, pColVal);

      if (key.version > pMerger->version) {
        if (!COL_VAL_IS_NONE(pColVal)) {
          if (IS_VAR_DATA_TYPE(pColVal->value.type) || pColVal->value.type == TSDB_DATA_TYPE_DECIMAL) {
            SColVal *pTColVal = taosArrayGet(pMerger->pArray, iCol);
            if (!pTColVal) return terrno;
            if (!COL_VAL_IS_NULL(pColVal)) {
              code = tRealloc(&pTColVal->value.pData, pColVal->value.nData);
              if (code) return code;

              pTColVal->value.nData = pColVal->value.nData;
              if (pTColVal->value.nData) {
                memcpy(pTColVal->value.pData, pColVal->value.pData, pTColVal->value.nData);
              }
              pTColVal->flag = 0;
            } else {
              tFree(pTColVal->value.pData);
              taosArraySet(pMerger->pArray, iCol, pColVal);
            }
          } else {
            taosArraySet(pMerger->pArray, iCol, pColVal);
          }
        }
      } else if (key.version < pMerger->version) {
        SColVal *tColVal = (SColVal *)taosArrayGet(pMerger->pArray, iCol);
        // Same guard as the newer-version branch: the slot may be missing.
        if (!tColVal) return terrno;
        if (COL_VAL_IS_NONE(tColVal) && !COL_VAL_IS_NONE(pColVal)) {
          bool usepData = IS_VAR_DATA_TYPE(pColVal->value.type) || pColVal->value.type == TSDB_DATA_TYPE_DECIMAL;
          if ((!COL_VAL_IS_NULL(pColVal)) && usepData) {
            code = tRealloc(&tColVal->value.pData, pColVal->value.nData);
            if (code) return code;

            tColVal->value.nData = pColVal->value.nData;
            if (pColVal->value.nData) {
              memcpy(tColVal->value.pData, pColVal->value.pData, pColVal->value.nData);
            }
            tColVal->flag = 0;
          } else {
            taosArraySet(pMerger->pArray, iCol, pColVal);
          }
        }
      } else {
        return TSDB_CODE_INVALID_PARA;
      }
    }

    pMerger->version = key.version;
    return code;
  }
}
870

871
// Bind the merger to pSchema and create its value array with room for one
// SColVal per schema column.
// Returns TSDB_CODE_SUCCESS, or terrno when the array cannot be created.
int32_t tsdbRowMergerInit(SRowMerger *pMerger, STSchema *pSchema) {
  pMerger->pTSchema = pSchema;
  pMerger->pArray = taosArrayInit(pSchema->numOfCols, sizeof(SColVal));

  return (pMerger->pArray != NULL) ? TSDB_CODE_SUCCESS : terrno;
}
880

881
/*
 * Free the value buffers owned by the merged column values and empty the
 * merger's array. The array itself is kept (see tsdbRowMergerCleanup for the
 * variant that destroys it).
 *
 * Only variable-length and DECIMAL values carry a pData buffer, so only those
 * are freed. Slot 0 is skipped, exactly as tsdbRowMergerCleanup does.
 *
 * The loop is bounded by the number of values actually stored in the array
 * rather than by the schema's column count: every stored value is about to be
 * dropped by taosArrayClear(), and an array holding fewer values than the
 * schema has columns must not be indexed past its end.
 */
void tsdbRowMergerClear(SRowMerger *pMerger) {
  int32_t nColVal = taosArrayGetSize(pMerger->pArray);

  for (int32_t iCol = 1; iCol < nColVal; iCol++) {
    SColVal *pTColVal = taosArrayGet(pMerger->pArray, iCol);
    if (pTColVal == NULL) {
      continue;
    }

    if (IS_VAR_DATA_TYPE(pTColVal->value.type) || pTColVal->value.type == TSDB_DATA_TYPE_DECIMAL) {
      tFree(pTColVal->value.pData);
    }
  }

  taosArrayClear(pMerger->pArray);
}
276,054,041✔
891

892
/*
 * Tear down a row merger: free the pData buffer of every stored
 * variable-length or DECIMAL column value (slot 0 is skipped), then destroy
 * the value array itself.
 */
void tsdbRowMergerCleanup(SRowMerger *pMerger) {
  int32_t nStored = taosArrayGetSize(pMerger->pArray);
  int32_t idx = 1;

  while (idx < nStored) {
    SColVal *pCv = taosArrayGet(pMerger->pArray, idx);
    bool     hasBuffer = IS_VAR_DATA_TYPE(pCv->value.type) || pCv->value.type == TSDB_DATA_TYPE_DECIMAL;

    if (hasBuffer) {
      tFree(pCv->value.pData);
    }
    idx++;
  }

  taosArrayDestroy(pMerger->pArray);
}
4,686,958✔
903

904
/*
 * Build a row from the merger's current column values.
 *
 * Forwards the merger's value array and schema to tRowBuild() and returns its
 * result code unchanged; the built row is delivered through ppRow.
 */
int32_t tsdbRowMergerGetRow(SRowMerger *pMerger, SRow **ppRow) {
  int32_t code = tRowBuild(pMerger->pArray, pMerger->pTSchema, ppRow);
  return code;
}
907

908
// delete skyline ======================================================
909
/*
 * Merge two skylines (arrays of TSDBKEY pointers, walked in ascending ts
 * order) into pSkyline.
 *
 * While both inputs still have keys, the key with the smaller ts is emitted;
 * on equal ts the left key is emitted and both inputs advance. Each key emitted
 * in that phase has its version overwritten IN PLACE with the larger of the
 * most recent versions seen on either side. Once one input is exhausted the
 * remainder of the other is copied through untouched.
 *
 * The output slots are written directly through TARRAY_GET_ELEM without
 * growing the array, so pSkyline must already have room for every key of both
 * inputs; its size is fixed up at the end.
 */
static void tsdbMergeSkyline(SArray *pSkyline1, SArray *pSkyline2, SArray *pSkyline) {
  int32_t nLeft = taosArrayGetSize(pSkyline1);
  int32_t nRight = taosArrayGetSize(pSkyline2);
  int32_t iLeft = 0;
  int32_t iRight = 0;
  int64_t leftVer = 0;   // version of the last key consumed from pSkyline1
  int64_t rightVer = 0;  // version of the last key consumed from pSkyline2

  taosArrayClear(pSkyline);
  TSDBKEY **ppOut = TARRAY_GET_ELEM(pSkyline, 0);

  for (; iLeft < nLeft && iRight < nRight; ppOut++) {
    TSDBKEY *pLeft = (TSDBKEY *)taosArrayGetP(pSkyline1, iLeft);
    TSDBKEY *pRight = (TSDBKEY *)taosArrayGetP(pSkyline2, iRight);

    if (pLeft->ts > pRight->ts) {
      rightVer = pRight->version;
      *ppOut = pRight;
      iRight++;
    } else {
      // left key is earlier, or both share the same ts (left wins the slot)
      if (pLeft->ts == pRight->ts) {
        rightVer = pRight->version;
        iRight++;
      }
      leftVer = pLeft->version;
      *ppOut = pLeft;
      iLeft++;
    }

    (*ppOut)->version = TMAX(leftVer, rightVer);
  }

  for (; iLeft < nLeft; iLeft++) {
    *ppOut++ = (TSDBKEY *)taosArrayGetP(pSkyline1, iLeft);
  }

  for (; iRight < nRight; iRight++) {
    *ppOut++ = (TSDBKEY *)taosArrayGetP(pSkyline2, iRight);
  }

  pSkyline->size = TARRAY_ELEM_IDX(pSkyline, ppOut);
}
862,724✔
962

963
/*
 * Recursively build the skyline for delete ranges [sidx, eidx] by divide and
 * conquer.
 *
 * aSkyline  flat array of TSDBKEY values, two consecutive entries per delete
 *           range: entry 2*i and entry 2*i + 1 belong to range i
 * sidx      first range index (inclusive), must be >= 0
 * eidx      last range index (inclusive), must be >= sidx
 * pSkyline  output array of TSDBKEY pointers into aSkyline; cleared first
 *
 * A single range contributes its two keys directly. A wider range is split in
 * half, each half is built into a temporary array, and the two halves are
 * combined with tsdbMergeSkyline().
 *
 * Returns 0 on success, TSDB_CODE_INVALID_PARA for an empty/inverted range or
 * a range whose keys cannot be fetched from aSkyline, or terrno when an array
 * allocation or push fails.
 */
int32_t tsdbBuildDeleteSkylineImpl(SArray *aSkyline, int32_t sidx, int32_t eidx, SArray *pSkyline) {
  int32_t code = 0;

  // An inverted range never reaches the sidx == eidx base case and would
  // recurse until the stack is exhausted.
  if (sidx < 0 || sidx > eidx) {
    return TSDB_CODE_INVALID_PARA;
  }

  taosArrayClear(pSkyline);

  if (sidx == eidx) {
    TSDBKEY *pItem1 = taosArrayGet(aSkyline, sidx * 2);
    TSDBKEY *pItem2 = taosArrayGet(aSkyline, sidx * 2 + 1);
    if (pItem1 == NULL || pItem2 == NULL) {
      // the merge step dereferences every stored pointer
      return TSDB_CODE_INVALID_PARA;
    }

    if (taosArrayPush(pSkyline, &pItem1) == NULL) {
      return terrno;
    }
    if (taosArrayPush(pSkyline, &pItem2) == NULL) {
      return terrno;
    }
    return code;
  }

  // sidx < eidx here, so this equals (sidx + eidx) / 2 without the overflow risk
  int32_t midx = sidx + (eidx - sidx) / 2;

  SArray *pSkyline1 = taosArrayInit((midx - sidx + 1) * 2, POINTER_BYTES);
  if (pSkyline1 == NULL) {
    return terrno;
  }

  SArray *pSkyline2 = taosArrayInit((eidx - midx) * 2, POINTER_BYTES);
  if (pSkyline2 == NULL) {
    code = terrno;  // capture before the cleanup call below can touch it
    taosArrayDestroy(pSkyline1);
    return code;
  }

  code = tsdbBuildDeleteSkylineImpl(aSkyline, sidx, midx, pSkyline1);
  if (code) goto _clear;

  code = tsdbBuildDeleteSkylineImpl(aSkyline, midx + 1, eidx, pSkyline2);
  if (code) goto _clear;

  tsdbMergeSkyline(pSkyline1, pSkyline2, pSkyline);

_clear:
  taosArrayDestroy(pSkyline1);
  taosArrayDestroy(pSkyline2);
  return code;
}
1009

1010
/*
 * Build the delete skyline for aDelData[sidx..eidx] into aSkyline.
 *
 * aDelData  array of SDelData
 * sidx      first delete record to use (inclusive), must be >= 0
 * eidx      last delete record to use (inclusive), must be >= sidx
 * aSkyline  output array of TSDBKEY values; cleared before being filled
 *
 * Each delete record is expanded into two keys in a temporary array:
 * {sKey, version} and {eKey, 0}. tsdbBuildDeleteSkylineImpl() then produces an
 * array of pointers into that temporary array, and the pointed-to keys are
 * copied by value into aSkyline.
 *
 * Returns 0 on success, TSDB_CODE_INVALID_PARA for an empty/inverted range or
 * a record that cannot be fetched, or terrno when an allocation or push fails.
 */
int32_t tsdbBuildDeleteSkyline(SArray *aDelData, int32_t sidx, int32_t eidx, SArray *aSkyline) {
  int32_t code = 0;

  if (sidx < 0 || eidx < sidx) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t dataNum = eidx - sidx + 1;
  int32_t skylineNum = 0;

  SArray *aTmpSkyline = taosArrayInit(dataNum * 2, sizeof(TSDBKEY));
  if (aTmpSkyline == NULL) {
    return terrno;
  }

  SArray *pSkyline = taosArrayInit(dataNum * 2, POINTER_BYTES);
  if (pSkyline == NULL) {
    code = terrno;  // capture before the cleanup call below can touch it
    taosArrayDestroy(aTmpSkyline);
    return code;
  }

  taosArrayClear(aSkyline);
  for (int32_t i = sidx; i <= eidx; ++i) {
    SDelData *pDelData = (SDelData *)taosArrayGet(aDelData, i);
    if (pDelData == NULL) {
      code = TSDB_CODE_INVALID_PARA;
      goto _clear;
    }

    if (taosArrayPush(aTmpSkyline, &(TSDBKEY){.ts = pDelData->sKey, .version = pDelData->version}) == NULL) {
      code = terrno;
      goto _clear;
    }

    if (taosArrayPush(aTmpSkyline, &(TSDBKEY){.ts = pDelData->eKey, .version = 0}) == NULL) {
      code = terrno;
      goto _clear;
    }
  }

  // aTmpSkyline is filled from slot 0, so the ranges it holds are numbered
  // 0 .. dataNum - 1 regardless of where sidx starts in aDelData.
  code = tsdbBuildDeleteSkylineImpl(aTmpSkyline, 0, dataNum - 1, pSkyline);
  if (code) goto _clear;

  skylineNum = taosArrayGetSize(pSkyline);
  for (int32_t i = 0; i < skylineNum; ++i) {
    TSDBKEY *p = taosArrayGetP(pSkyline, i);
    if (taosArrayPush(aSkyline, p) == NULL) {
      code = terrno;
      goto _clear;
    }
  }

_clear:
  taosArrayDestroy(aTmpSkyline);
  taosArrayDestroy(pSkyline);

  return code;
}
1057

1058
/*
1059
int32_t tsdbBuildDeleteSkyline2(SArray *aDelData, int32_t sidx, int32_t eidx, SArray *aSkyline) {
1060
  int32_t   code = 0;
1061
  SDelData *pDelData;
1062
  int32_t   midx;
1063

1064
  taosArrayClear(aSkyline);
1065
  if (sidx == eidx) {
1066
    pDelData = (SDelData *)taosArrayGet(aDelData, sidx);
1067
    taosArrayPush(aSkyline, &(TSDBKEY){.ts = pDelData->sKey, .version = pDelData->version});
1068
    taosArrayPush(aSkyline, &(TSDBKEY){.ts = pDelData->eKey, .version = 0});
1069
  } else {
1070
    SArray *aSkyline1 = NULL;
1071
    SArray *aSkyline2 = NULL;
1072

1073
    aSkyline1 = taosArrayInit(0, sizeof(TSDBKEY));
1074
    aSkyline2 = taosArrayInit(0, sizeof(TSDBKEY));
1075
    if (aSkyline1 == NULL || aSkyline2 == NULL) {
1076
      code = TSDB_CODE_OUT_OF_MEMORY;
1077
      goto _clear;
1078
    }
1079
    midx = (sidx + eidx) / 2;
1080

1081
    code = tsdbBuildDeleteSkyline(aDelData, sidx, midx, aSkyline1);
1082
    if (code) goto _clear;
1083

1084
    code = tsdbBuildDeleteSkyline(aDelData, midx + 1, eidx, aSkyline2);
1085
    if (code) goto _clear;
1086

1087
    code = tsdbMergeSkyline(aSkyline1, aSkyline2, aSkyline);
1088

1089
  _clear:
1090
    taosArrayDestroy(aSkyline1);
1091
    taosArrayDestroy(aSkyline2);
1092
  }
1093

1094
  return code;
1095
}
1096
*/
1097

1098
// SBlockData ======================================================
1099
/*
 * Put a block-data object into its empty state: zero ids, zero rows, no row
 * arrays and no column data. Nothing is allocated or freed here.
 *
 * Always returns 0.
 */
int32_t tBlockDataCreate(SBlockData *pBlockData) {
  // table identity
  pBlockData->suid = 0;
  pBlockData->uid = 0;

  // per-row key arrays
  pBlockData->nRow = 0;
  pBlockData->aUid = NULL;
  pBlockData->aVersion = NULL;
  pBlockData->aTSKEY = NULL;

  // column data
  pBlockData->nColData = 0;
  pBlockData->aColData = NULL;

  return 0;
}
1110

1111
/*
 * Release everything a block-data object owns: the uid/version/timestamp row
 * arrays, every column-data entry, and the column-data array itself.
 *
 * nColData is reset together with aColData (as tBlockDataReset does) so the
 * object never advertises columns while its array pointer is NULL; a repeated
 * destroy, reset or clear then has nothing to walk.
 */
void tBlockDataDestroy(SBlockData *pBlockData) {
  tFree(pBlockData->aUid);
  tFree(pBlockData->aVersion);
  tFree(pBlockData->aTSKEY);

  for (int32_t i = 0; i < pBlockData->nColData; i++) {
    tColDataDestroy(&pBlockData->aColData[i]);
  }
  pBlockData->nColData = 0;

  if (pBlockData->aColData) {
    taosMemoryFree(pBlockData->aColData);
    pBlockData->aColData = NULL;
  }
}
16,027,330✔
1125

1126
/*
 * Resize the block's column-data array to exactly nColData entries.
 *
 * Shrinking destroys the surplus entries in place (the allocation itself is
 * not reduced). Growing reallocates the array and zero-fills the newly added
 * entries. Entries that survive the resize are left untouched.
 *
 * The array holds SColData elements, so all sizes are computed from
 * sizeof(SColData), matching tBlockDataAddColData().
 *
 * Returns 0 on success, TSDB_CODE_INVALID_PARA for a negative count, or terrno
 * when reallocation fails (the block is left unchanged in that case).
 */
static int32_t tBlockDataAdjustColData(SBlockData *pBlockData, int32_t nColData) {
  if (nColData < 0) {
    return TSDB_CODE_INVALID_PARA;
  }

  if (pBlockData->nColData > nColData) {
    for (int32_t i = nColData; i < pBlockData->nColData; i++) {
      tColDataDestroy(&pBlockData->aColData[i]);
    }
  } else if (pBlockData->nColData < nColData) {
    SColData *aColData = taosMemoryRealloc(pBlockData->aColData, sizeof(SColData) * nColData);
    if (aColData == NULL) {
      return terrno;
    }

    pBlockData->aColData = aColData;
    memset(&aColData[pBlockData->nColData], 0, sizeof(SColData) * (nColData - pBlockData->nColData));
  }

  pBlockData->nColData = nColData;
  return 0;
}
1148
/*
 * Prepare a block-data object for a table and set up its column layout.
 *
 * pBlockData  block to initialize; suid/uid are taken from pId, nRow is reset
 * pId         table id; at least one of suid/uid must be non-zero
 * pTSchema    table schema; column 0 is skipped when laying out column data
 * aCid        optional list of column ids to keep, walked in step with the
 *             schema (both are consumed in ascending colId order); NULL means
 *             every schema column except column 0
 * nCid        number of entries in aCid
 *
 * With aCid, the block gets nCid column slots. Slot i is initialized only when
 * aCid[i] is found in the schema; ids missing from the schema leave their slot
 * as tBlockDataAdjustColData() left it.
 *
 * Returns 0 on success, TSDB_CODE_INVALID_PARA when pId carries no id or when
 * the schema is exhausted while searching for a requested column id, or the
 * error from tBlockDataAdjustColData().
 */
int32_t tBlockDataInit(SBlockData *pBlockData, TABLEID *pId, STSchema *pTSchema, int16_t *aCid, int32_t nCid) {
  int32_t code = 0;

  if (!pId->suid && !pId->uid) {
    return TSDB_CODE_INVALID_PARA;
  }

  pBlockData->suid = pId->suid;
  pBlockData->uid = pId->uid;
  pBlockData->nRow = 0;

  if (aCid) {
    code = tBlockDataAdjustColData(pBlockData, nCid);
    if (code) goto _exit;

    int32_t iColumn = 1;
    // A schema with only column 0 has nothing to match against; start with
    // NULL instead of pointing past the end of the column array.
    STColumn *pTColumn = (iColumn < pTSchema->numOfCols) ? &pTSchema->columns[iColumn] : NULL;
    for (int32_t iCid = 0; iCid < nCid; iCid++) {
      // The aCid array (from the taos client catalog) may contain columns that
      // do not exist in pTSchema, because pTSchema is newer.
      if (pTColumn == NULL) {
        continue;
      }

      while (pTColumn->colId < aCid[iCid]) {
        iColumn++;
        if (!(iColumn < pTSchema->numOfCols)) {
          return TSDB_CODE_INVALID_PARA;
        }
        pTColumn = &pTSchema->columns[iColumn];
      }

      if (pTColumn->colId != aCid[iCid]) {
        continue;
      }

      tColDataInit(&pBlockData->aColData[iCid], pTColumn->colId, pTColumn->type, pTColumn->flags);

      iColumn++;
      pTColumn = (iColumn < pTSchema->numOfCols) ? &pTSchema->columns[iColumn] : NULL;
    }
  } else {
    code = tBlockDataAdjustColData(pBlockData, pTSchema->numOfCols - 1);
    if (code) goto _exit;

    for (int32_t iColData = 0; iColData < pBlockData->nColData; iColData++) {
      STColumn *pTColumn = &pTSchema->columns[iColData + 1];
      tColDataInit(&pBlockData->aColData[iColData], pTColumn->colId, pTColumn->type, pTColumn->flags);
    }
  }

_exit:
  return code;
}
1202

1203
/*
 * Return a block-data object to a column-less, row-less state: ids and row
 * count are zeroed, every column-data entry is destroyed, and the column-data
 * array is freed and set to NULL. The uid/version/timestamp row arrays are
 * not touched here.
 */
void tBlockDataReset(SBlockData *pBlockData) {
  pBlockData->suid = 0;
  pBlockData->uid = 0;
  pBlockData->nRow = 0;

  int32_t iCol = 0;
  while (iCol < pBlockData->nColData) {
    tColDataDestroy(&pBlockData->aColData[iCol]);
    iCol++;
  }

  pBlockData->nColData = 0;
  taosMemoryFreeClear(pBlockData->aColData);
}
13,875,457✔
1213

1214
/*
 * Drop all rows from the block while keeping its column layout: nRow becomes
 * zero and tColDataClear() is applied to every column-data entry.
 */
void tBlockDataClear(SBlockData *pBlockData) {
  pBlockData->nRow = 0;

  int32_t idx = 0;
  while (idx < pBlockData->nColData) {
    SColData *pCol = tBlockDataGetColDataByIdx(pBlockData, idx);
    tColDataClear(pCol);
    idx++;
  }
}
1,197,678✔
1220

1221
/*
 * Append one column-data entry to the block.
 *
 * cid must be strictly greater than the cid of the current last entry, which
 * keeps the array in ascending cid order. The array is grown by one element,
 * the new element is zeroed and then set up with tColDataInit().
 *
 * ppColData receives a pointer to the new entry inside the (possibly moved)
 * array.
 *
 * Returns 0 on success, TSDB_CODE_INVALID_PARA when cid would break the
 * ordering, or terrno when the array cannot be grown.
 */
int32_t tBlockDataAddColData(SBlockData *pBlockData, int16_t cid, int8_t type, int8_t cflag, SColData **ppColData) {
  int32_t nOld = pBlockData->nColData;

  if (nOld != 0) {
    if (pBlockData->aColData[nOld - 1].cid >= cid) {
      return TSDB_CODE_INVALID_PARA;
    }
  }

  SColData *aGrown = taosMemoryRealloc(pBlockData->aColData, sizeof(SColData) * (nOld + 1));
  if (aGrown == NULL) {
    return terrno;
  }

  pBlockData->aColData = aGrown;
  pBlockData->nColData = nOld + 1;

  SColData *pAdded = &aGrown[nOld];
  *ppColData = pAdded;
  memset(pAdded, 0, sizeof(SColData));
  tColDataInit(pAdded, cid, type, cflag);

  return 0;
}
1240

1241
/* flag > 0: forward update
1242
 * flag == 0: insert
1243
 * flag < 0: backward update
1244
 */
1245
static int32_t tBlockDataUpsertBlockRow(SBlockData *pBlockData, SBlockData *pBlockDataFrom, int32_t iRow,
830,028,367✔
1246
                                        int32_t flag) {
1247
  int32_t code = 0;
830,028,367✔
1248

1249
  SColVal   cv = {0};
830,028,367✔
1250
  int32_t   iColDataFrom = 0;
830,028,367✔
1251
  SColData *pColDataFrom = (iColDataFrom < pBlockDataFrom->nColData) ? &pBlockDataFrom->aColData[iColDataFrom] : NULL;
830,028,367!
1252

1253
  for (int32_t iColDataTo = 0; iColDataTo < pBlockData->nColData; iColDataTo++) {
2,147,483,647✔
1254
    SColData *pColDataTo = &pBlockData->aColData[iColDataTo];
2,147,483,647✔
1255

1256
    while (pColDataFrom && pColDataFrom->cid < pColDataTo->cid) {
2,147,483,647✔
1257
      pColDataFrom = (++iColDataFrom < pBlockDataFrom->nColData) ? &pBlockDataFrom->aColData[iColDataFrom] : NULL;
50,037✔
1258
    }
1259

1260
    if (pColDataFrom == NULL || pColDataFrom->cid > pColDataTo->cid) {
2,147,483,647!
1261
      cv = COL_VAL_NONE(pColDataTo->cid, pColDataTo->type);
3,541,106✔
1262
      if (flag == 0 && (code = tColDataAppendValue(pColDataTo, &cv))) goto _exit;
3,541,106!
1263
    } else {
1264
      code = tColDataGetValue(pColDataFrom, iRow, &cv);
2,147,483,647✔
1265
      if (code) goto _exit;
2,147,483,647!
1266

1267
      if (flag) {
2,147,483,647✔
1268
        code = tColDataUpdateValue(pColDataTo, &cv, flag > 0);
95,409,478✔
1269
      } else {
1270
        code = tColDataAppendValue(pColDataTo, &cv);
2,147,483,647✔
1271
      }
1272
      if (code) goto _exit;
2,147,483,647!
1273

1274
      pColDataFrom = (++iColDataFrom < pBlockDataFrom->nColData) ? &pBlockDataFrom->aColData[iColDataFrom] : NULL;
2,147,483,647✔
1275
    }
1276
  }
1277

1278
_exit:
829,000,650✔
1279
  return code;
829,000,650✔
1280
}
1281

1282
/*
 * Append one row to the block.
 *
 * pBlockData  destination; must have a non-zero suid or uid
 * pRow        row to append, in either row format or column format
 * pTSchema    schema handed to tRowUpsertColData() for row-format input
 * uid         table uid of the row; required (non-zero) when the block's own
 *             uid is 0, in which case it is recorded in aUid
 *
 * The uid (when applicable), version and timestamp arrays are each grown to
 * hold one more entry and filled at index nRow, then the column values are
 * appended. nRow is incremented only after every step succeeded.
 *
 * Returns 0 on success, TSDB_CODE_INVALID_PARA for a block without ids, a
 * missing uid or an unknown row type, or the error from a failing helper.
 */
int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema, int64_t uid) {
  int32_t code = 0;
  int32_t iNew = pBlockData->nRow;  // slot the new row will occupy

  if (pBlockData->suid == 0 && pBlockData->uid == 0) {
    return TSDB_CODE_INVALID_PARA;
  }

  // uid: only tracked per row when the block is not bound to a single table
  if (pBlockData->uid == 0) {
    if (uid == 0) {
      return TSDB_CODE_INVALID_PARA;
    }
    code = tRealloc((uint8_t **)&pBlockData->aUid, sizeof(int64_t) * (iNew + 1));
    if (code) return code;
    pBlockData->aUid[iNew] = uid;
  }

  // version
  code = tRealloc((uint8_t **)&pBlockData->aVersion, sizeof(int64_t) * (iNew + 1));
  if (code) return code;
  pBlockData->aVersion[iNew] = TSDBROW_VERSION(pRow);

  // timestamp
  code = tRealloc((uint8_t **)&pBlockData->aTSKEY, sizeof(TSKEY) * (iNew + 1));
  if (code) return code;
  pBlockData->aTSKEY[iNew] = TSDBROW_TS(pRow);

  // column values
  if (pRow->type == TSDBROW_ROW_FMT) {
    code = tRowUpsertColData(pRow->pTSRow, pTSchema, pBlockData->aColData, pBlockData->nColData, 0 /* append */);
  } else if (pRow->type == TSDBROW_COL_FMT) {
    code = tBlockDataUpsertBlockRow(pBlockData, pRow->pBlockData, pRow->iRow, 0 /* append */);
  } else {
    code = TSDB_CODE_INVALID_PARA;
  }

  if (code == 0) {
    pBlockData->nRow++;
  }
  return code;
}
1321
/*
 * Merge pRow into the LAST row of the block.
 *
 * The stored version of the last row is compared with the version of pRow:
 *   - equal versions are rejected;
 *   - a newer pRow raises the stored version and its values are applied as a
 *     forward update (flag 1);
 *   - an older pRow leaves the stored version alone and its values are
 *     applied as a backward update (flag -1).
 *
 * Returns 0 on success, TSDB_CODE_INVALID_PARA when the block has no row to
 * update, when the versions are equal or when the row type is unknown, or the
 * error from the column helper.
 */
int32_t tBlockDataUpdateRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema) {
  int32_t code = 0;

  // there must be a last row; otherwise aVersion[nRow - 1] is out of range
  if (pBlockData->nRow <= 0) {
    return TSDB_CODE_INVALID_PARA;
  }

  // version
  int64_t lversion = pBlockData->aVersion[pBlockData->nRow - 1];
  int64_t rversion = TSDBROW_VERSION(pRow);
  if (lversion == rversion) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t flag = (rversion > lversion) ? 1 : -1;
  if (flag > 0) {
    pBlockData->aVersion[pBlockData->nRow - 1] = rversion;
  }

  // column values
  if (pRow->type == TSDBROW_ROW_FMT) {
    code = tRowUpsertColData(pRow->pTSRow, pTSchema, pBlockData->aColData, pBlockData->nColData, flag /* update */);
  } else if (pRow->type == TSDBROW_COL_FMT) {
    code = tBlockDataUpsertBlockRow(pBlockData, pRow->pBlockData, pRow->iRow, flag);
  } else {
    code = TSDB_CODE_INVALID_PARA;
  }

  return code;
}
1350

1351
#ifdef BUILD_NO_CALL
1352
/*
 * Report how many rows the block would hold after upserting pRow, without
 * changing anything: 1 for an empty block, the current count when pRow has the
 * same timestamp as the last row, and the current count plus one otherwise.
 * The uid parameter is not used.
 */
int32_t tBlockDataTryUpsertRow(SBlockData *pBlockData, TSDBROW *pRow, int64_t uid) {
  int32_t nRow = pBlockData->nRow;

  if (nRow == 0) {
    return 1;
  }

  bool sameTsAsLast = (pBlockData->aTSKEY[nRow - 1] == TSDBROW_TS(pRow));
  return sameTsAsLast ? nRow : nRow + 1;
}
1361

1362
/*
 * Insert-or-update: when the block is non-empty and its last row has the same
 * timestamp as pRow, merge pRow into that row; otherwise append pRow as a new
 * row. Returns the result of the chosen operation.
 */
int32_t tBlockDataUpsertRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema, int64_t uid) {
  bool hitsLastRow = (pBlockData->nRow > 0) && (pBlockData->aTSKEY[pBlockData->nRow - 1] == TSDBROW_TS(pRow));

  if (!hitsLastRow) {
    return tBlockDataAppendRow(pBlockData, pRow, pTSchema, uid);
  }
  return tBlockDataUpdateRow(pBlockData, pRow, pTSchema);
}
1369
#endif
1370

1371
/*
 * Look up the column-data entry with the given cid by binary search over the
 * block's column array (closed interval, midpoint rounded down).
 *
 * Returns the matching entry, or NULL when the search interval empties
 * without a match.
 */
SColData *tBlockDataGetColData(SBlockData *pBlockData, int16_t cid) {
  int32_t lo = 0;
  int32_t hi = pBlockData->nColData - 1;

  while (lo <= hi) {
    int32_t   mid = (lo + hi) >> 1;
    SColData *pProbe = tBlockDataGetColDataByIdx(pBlockData, mid);

    if (pProbe->cid == cid) {
      return pProbe;
    }

    if (pProbe->cid > cid) {
      hi = mid - 1;
    } else {
      lo = mid + 1;
    }
  }

  return NULL;
}
1391

1392
/* buffers[0]: SDiskDataHdr
1393
 * buffers[1]: key part: uid + version + ts + primary keys
1394
 * buffers[2]: SBlockCol part
1395
 * buffers[3]: regular column part
1396
 */
1397
int32_t tBlockDataCompress(SBlockData *bData, void *pCompr, SBuffer *buffers, SBuffer *assist) {
949,579✔
1398
  int32_t code = 0;
949,579✔
1399
  int32_t lino = 0;
949,579✔
1400

1401
  SColCompressInfo *pInfo = pCompr;
949,579✔
1402
  code = tsdbGetColCmprAlgFromSet(pInfo->pColCmpr, 1, &pInfo->defaultCmprAlg);
949,579✔
1403
  TAOS_UNUSED(code);
1404

1405
  SDiskDataHdr hdr = {
949,634✔
1406
      .delimiter = TSDB_FILE_DLMT,
1407
      .fmtVer = 2,
1408
      .suid = bData->suid,
949,634✔
1409
      .uid = bData->uid,
949,634✔
1410
      .szUid = 0,     // filled by compress key
1411
      .szVer = 0,     // filled by compress key
1412
      .szKey = 0,     // filled by compress key
1413
      .szBlkCol = 0,  // filled by this func
1414
      .nRow = bData->nRow,
949,634✔
1415
      .cmprAlg = pInfo->defaultCmprAlg,
949,634✔
1416
      .numOfPKs = 0,  // filled by compress key
1417
  };
1418
  // Key part
1419

1420
  tBufferClear(&buffers[1]);
949,634✔
1421
  code = tBlockDataCompressKeyPart(bData, &hdr, &buffers[1], assist, (SColCompressInfo *)pInfo);
949,634✔
1422
  TSDB_CHECK_CODE(code, lino, _exit);
949,646!
1423

1424
  // Regulart column part
1425
  tBufferClear(&buffers[2]);
949,646✔
1426
  tBufferClear(&buffers[3]);
949,646✔
1427
  for (int i = 0; i < bData->nColData; i++) {
7,224,161✔
1428
    SColData *colData = tBlockDataGetColDataByIdx(bData, i);
6,274,628✔
1429

1430
    if (colData->cflag & COL_IS_KEY) {
6,274,628✔
1431
      continue;
298,406✔
1432
    }
1433
    if (colData->flag == HAS_NONE) {
6,116,493✔
1434
      continue;
140,271✔
1435
    }
1436

1437
    SColDataCompressInfo cinfo = {
5,976,222✔
1438
        .cmprAlg = pInfo->defaultCmprAlg,
5,976,222✔
1439
    };
1440
    code = tsdbGetColCmprAlgFromSet(pInfo->pColCmpr, colData->cid, &cinfo.cmprAlg);
5,976,222✔
1441
    if (code < 0) {
1442
      //
1443
    }
1444

1445
    int32_t offset = buffers[3].size;
5,975,284✔
1446
    code = tColDataCompress(colData, &cinfo, &buffers[3], assist);
5,975,284✔
1447
    TSDB_CHECK_CODE(code, lino, _exit);
5,975,437!
1448

1449
    SBlockCol blockCol = (SBlockCol){.cid = cinfo.columnId,
5,975,437✔
1450
                                     .type = cinfo.dataType,
5,975,437✔
1451
                                     .cflag = cinfo.columnFlag,
5,975,437✔
1452
                                     .flag = cinfo.flag,
5,975,437✔
1453
                                     .szOrigin = cinfo.dataOriginalSize,
5,975,437✔
1454
                                     .szBitmap = cinfo.bitmapCompressedSize,
5,975,437✔
1455
                                     .szOffset = cinfo.offsetCompressedSize,
5,975,437✔
1456
                                     .szValue = cinfo.dataCompressedSize,
5,975,437✔
1457
                                     .offset = offset,
1458
                                     .alg = cinfo.cmprAlg};
5,975,437✔
1459

1460
    code = tPutBlockCol(&buffers[2], &blockCol, hdr.fmtVer, hdr.cmprAlg);
5,975,437✔
1461
    TSDB_CHECK_CODE(code, lino, _exit);
5,976,109!
1462
  }
1463
  hdr.szBlkCol = buffers[2].size;
949,533✔
1464

1465
  // SDiskDataHdr part
1466
  tBufferClear(&buffers[0]);
1467
  code = tPutDiskDataHdr(&buffers[0], &hdr);
949,533✔
1468
  TSDB_CHECK_CODE(code, lino, _exit);
949,630!
1469

1470
_exit:
949,630✔
1471
  return code;
949,630✔
1472
}
1473

1474
/*
 * Decode a compressed block from br into blockData.
 *
 * Steps: read the SDiskDataHdr, reset blockData and copy suid/uid/nRow from
 * the header, decompress the key part, then walk the SBlockCol descriptors and
 * decompress one column per descriptor.
 *
 * The descriptors are read through a private copy of the reader (br2) while
 * br itself is advanced past the whole descriptor area (hdr.szBlkCol bytes)
 * before the walk and is the reader handed to the column decompressor.
 *
 * A descriptor whose alg is 0 falls back to the header's cmprAlg.
 *
 * Returns 0 on success or the first error from a helper.
 */
int32_t tBlockDataDecompress(SBufferReader *br, SBlockData *blockData, SBuffer *assist) {
  int32_t      code = 0;
  int32_t      lino = 0;
  SDiskDataHdr hdr = {0};

  // SDiskDataHdr
  code = tGetDiskDataHdr(br, &hdr);
  TSDB_CHECK_CODE(code, lino, _exit);

  tBlockDataReset(blockData);
  blockData->suid = hdr.suid;
  blockData->uid = hdr.uid;
  blockData->nRow = hdr.nRow;

  // Key part
  code = tBlockDataDecompressKeyPart(&hdr, br, blockData, assist);
  TSDB_CHECK_CODE(code, lino, _exit);

  // Column part
  SBufferReader br2 = *br;
  br->offset += hdr.szBlkCol;

  uint32_t startOffset = br2.offset;
  while (br2.offset - startOffset < hdr.szBlkCol) {
    SBlockCol blockCol;

    code = tGetBlockCol(&br2, &blockCol, hdr.fmtVer, hdr.cmprAlg);
    TSDB_CHECK_CODE(code, lino, _exit);

    if (blockCol.alg == 0) {
      blockCol.alg = hdr.cmprAlg;
    }

    code = tBlockDataDecompressColData(&hdr, &blockCol, br, blockData, assist);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  return code;
}
1509

1510
// SDiskDataHdr ==============================
1511
/*
 * Serialize a disk-data header into buffer.
 *
 * Field order: delimiter, fmtVer, suid, uid, szUid, szVer, szKey, szBlkCol,
 * nRow, then cmprAlg (one signed byte for fmtVer < 2, a 32-bit value for
 * fmtVer == 2, nothing for later versions), then for fmtVer >= 1 the
 * primary-key count followed by one SBlockCol per primary key.
 *
 * Returns 0 on success or the first error from a buffer writer.
 */
int32_t tPutDiskDataHdr(SBuffer *buffer, const SDiskDataHdr *pHdr) {
  int32_t code = 0;

  // fixed leading fields; || stops at the first writer that fails
  if ((code = tBufferPutU32(buffer, pHdr->delimiter)) ||  //
      (code = tBufferPutU32v(buffer, pHdr->fmtVer)) ||    //
      (code = tBufferPutI64(buffer, pHdr->suid)) ||       //
      (code = tBufferPutI64(buffer, pHdr->uid)) ||        //
      (code = tBufferPutI32v(buffer, pHdr->szUid)) ||     //
      (code = tBufferPutI32v(buffer, pHdr->szVer)) ||     //
      (code = tBufferPutI32v(buffer, pHdr->szKey)) ||     //
      (code = tBufferPutI32v(buffer, pHdr->szBlkCol)) ||  //
      (code = tBufferPutI32v(buffer, pHdr->nRow))) {
    return code;
  }

  // compression algorithm, width depends on the format version
  if (pHdr->fmtVer < 2) {
    code = tBufferPutI8(buffer, pHdr->cmprAlg);
  } else if (pHdr->fmtVer == 2) {
    code = tBufferPutU32(buffer, pHdr->cmprAlg);
  } else {
    // more data fmt ver
  }
  if (code) return code;

  // primary-key descriptors exist from format version 1 on
  if (pHdr->fmtVer >= 1) {
    code = tBufferPutI8(buffer, pHdr->numOfPKs);
    if (code) return code;

    for (int iPk = 0; iPk < pHdr->numOfPKs; iPk++) {
      code = tPutBlockCol(buffer, &pHdr->primaryBlockCols[iPk], pHdr->fmtVer, pHdr->cmprAlg);
      if (code) return code;
    }
  }

  return 0;
}
1539

1540
/*
 * Deserialize a disk-data header from br into pHdr.
 *
 * Field order mirrors tPutDiskDataHdr(): delimiter, fmtVer, suid, uid, szUid,
 * szVer, szKey, szBlkCol, nRow, then cmprAlg (one signed byte for fmtVer < 2,
 * a 32-bit value for fmtVer == 2, not read for later versions). For
 * fmtVer >= 1 the primary-key count and that many SBlockCol descriptors
 * follow; for fmtVer 0 numOfPKs is set to 0.
 *
 * Returns 0 on success or the first error from a buffer reader.
 */
int32_t tGetDiskDataHdr(SBufferReader *br, SDiskDataHdr *pHdr) {
  int32_t code = 0;

  // fixed leading fields; || stops at the first reader that fails
  if ((code = tBufferGetU32(br, &pHdr->delimiter)) ||  //
      (code = tBufferGetU32v(br, &pHdr->fmtVer)) ||    //
      (code = tBufferGetI64(br, &pHdr->suid)) ||       //
      (code = tBufferGetI64(br, &pHdr->uid)) ||        //
      (code = tBufferGetI32v(br, &pHdr->szUid)) ||     //
      (code = tBufferGetI32v(br, &pHdr->szVer)) ||     //
      (code = tBufferGetI32v(br, &pHdr->szKey)) ||     //
      (code = tBufferGetI32v(br, &pHdr->szBlkCol)) ||  //
      (code = tBufferGetI32v(br, &pHdr->nRow))) {
    return code;
  }

  // compression algorithm, width depends on the format version
  if (pHdr->fmtVer < 2) {
    int8_t cmprAlg = 0;
    code = tBufferGetI8(br, &cmprAlg);
    if (code) return code;
    pHdr->cmprAlg = cmprAlg;
  } else if (pHdr->fmtVer == 2) {
    code = tBufferGetU32(br, &pHdr->cmprAlg);
    if (code) return code;
  } else {
    // more data fmt ver
  }

  // primary-key descriptors exist from format version 1 on
  if (pHdr->fmtVer < 1) {
    pHdr->numOfPKs = 0;
    return 0;
  }

  code = tBufferGetI8(br, &pHdr->numOfPKs);
  if (code) return code;

  for (int iPk = 0; iPk < pHdr->numOfPKs; iPk++) {
    code = tGetBlockCol(br, &pHdr->primaryBlockCols[iPk], pHdr->fmtVer, pHdr->cmprAlg);
    if (code) return code;
  }

  return 0;
}
1574

1575
// ALGORITHM ==============================
1576
/*
 * Serialize one column aggregate into buffer.
 *
 * colId and numOfNull are always written first. When colId carries
 * DECIMAL_AGG_FLAG the 128-bit sum, max and min follow as two 64-bit words
 * each, then the overflow byte; otherwise the 64-bit sum, max and min follow.
 *
 * Returns 0 on success or the first error from a buffer writer.
 */
int32_t tPutColumnDataAgg(SBuffer *buffer, SColumnDataAgg *pColAgg) {
  int32_t code = 0;

  // common prefix of both layouts
  if ((code = tBufferPutI32v(buffer, pColAgg->colId)) ||  //
      (code = tBufferPutI16v(buffer, pColAgg->numOfNull))) {
    return code;
  }

  if (!(pColAgg->colId & DECIMAL_AGG_FLAG)) {
    if ((code = tBufferPutI64(buffer, pColAgg->sum)) ||  //
        (code = tBufferPutI64(buffer, pColAgg->max)) ||  //
        (code = tBufferPutI64(buffer, pColAgg->min))) {
      return code;
    }
    return 0;
  }

  if ((code = tBufferPutU64(buffer, pColAgg->decimal128Sum[0])) ||  //
      (code = tBufferPutU64(buffer, pColAgg->decimal128Sum[1])) ||  //
      (code = tBufferPutU64(buffer, pColAgg->decimal128Max[0])) ||  //
      (code = tBufferPutU64(buffer, pColAgg->decimal128Max[1])) ||  //
      (code = tBufferPutU64(buffer, pColAgg->decimal128Min[0])) ||  //
      (code = tBufferPutU64(buffer, pColAgg->decimal128Min[1])) ||  //
      (code = tBufferPutU8(buffer, pColAgg->overflow))) {
    return code;
  }

  return 0;
}
1599

1600
/*
 * Deserialize one column aggregate from br into pColAgg.
 *
 * colId and numOfNull are read first. When the stored colId carries
 * DECIMAL_AGG_FLAG, colId is masked down to its low 16 bits and the 128-bit
 * sum, max and min (two 64-bit words each) plus the overflow byte are read;
 * otherwise the 64-bit sum, max and min are read.
 *
 * Returns 0 on success or the first error from a buffer reader.
 */
int32_t tGetColumnDataAgg(SBufferReader *br, SColumnDataAgg *pColAgg) {
  int32_t code = 0;

  // common prefix of both layouts
  if ((code = tBufferGetI32v(br, &pColAgg->colId)) ||  //
      (code = tBufferGetI16v(br, &pColAgg->numOfNull))) {
    return code;
  }

  if (!(pColAgg->colId & DECIMAL_AGG_FLAG)) {
    if ((code = tBufferGetI64(br, &pColAgg->sum)) ||  //
        (code = tBufferGetI64(br, &pColAgg->max)) ||  //
        (code = tBufferGetI64(br, &pColAgg->min))) {
      return code;
    }
    return 0;
  }

  // strip the marker so callers see the plain column id
  pColAgg->colId &= 0xFFFF;

  if ((code = tBufferGetU64(br, &pColAgg->decimal128Sum[0])) ||  //
      (code = tBufferGetU64(br, &pColAgg->decimal128Sum[1])) ||  //
      (code = tBufferGetU64(br, &pColAgg->decimal128Max[0])) ||  //
      (code = tBufferGetU64(br, &pColAgg->decimal128Max[1])) ||  //
      (code = tBufferGetU64(br, &pColAgg->decimal128Min[0])) ||  //
      (code = tBufferGetU64(br, &pColAgg->decimal128Min[1])) ||  //
      (code = tBufferGetU8(br, &pColAgg->overflow))) {
    return code;
  }

  return 0;
}
1622

1623
static int32_t tBlockDataCompressKeyPart(SBlockData *bData, SDiskDataHdr *hdr, SBuffer *buffer, SBuffer *assist,
949,608✔
1624
                                         SColCompressInfo *compressInfo) {
1625
  int32_t       code = 0;
949,608✔
1626
  int32_t       lino = 0;
949,608✔
1627
  SCompressInfo cinfo;
1628

1629
  // uid
1630
  if (bData->uid == 0) {
949,608✔
1631
    cinfo = (SCompressInfo){
695,508✔
1632
        .cmprAlg = hdr->cmprAlg,
695,508✔
1633
        .dataType = TSDB_DATA_TYPE_BIGINT,
1634
        .originalSize = sizeof(int64_t) * bData->nRow,
695,508✔
1635
    };
1636
    code = tCompressDataToBuffer(bData->aUid, &cinfo, buffer, assist);
695,508✔
1637
    TSDB_CHECK_CODE(code, lino, _exit);
695,537!
1638
    hdr->szUid = cinfo.compressedSize;
695,537✔
1639
  }
1640

1641
  // version
1642
  cinfo = (SCompressInfo){
949,637✔
1643
      .cmprAlg = hdr->cmprAlg,
949,637✔
1644
      .dataType = TSDB_DATA_TYPE_BIGINT,
1645
      .originalSize = sizeof(int64_t) * bData->nRow,
949,637✔
1646
  };
1647
  code = tCompressDataToBuffer((uint8_t *)bData->aVersion, &cinfo, buffer, assist);
949,637✔
1648
  TSDB_CHECK_CODE(code, lino, _exit);
949,684!
1649
  hdr->szVer = cinfo.compressedSize;
949,684✔
1650

1651
  // ts
1652
  cinfo = (SCompressInfo){
949,684✔
1653
      .cmprAlg = hdr->cmprAlg,
949,684✔
1654
      .dataType = TSDB_DATA_TYPE_TIMESTAMP,
1655
      .originalSize = sizeof(TSKEY) * bData->nRow,
949,684✔
1656
  };
1657

1658
  code = tCompressDataToBuffer((uint8_t *)bData->aTSKEY, &cinfo, buffer, assist);
949,684✔
1659
  TSDB_CHECK_CODE(code, lino, _exit);
949,638!
1660
  hdr->szKey = cinfo.compressedSize;
949,638✔
1661

1662
  // primary keys
1663
  for (hdr->numOfPKs = 0; hdr->numOfPKs < bData->nColData; hdr->numOfPKs++) {
1,107,773✔
1664
    if (!(hdr->numOfPKs <= TD_MAX_PK_COLS)) {
1,107,763!
UNCOV
1665
      return TSDB_CODE_INVALID_PARA;
×
1666
    }
1667

1668
    SBlockCol *blockCol = &hdr->primaryBlockCols[hdr->numOfPKs];
1,107,763✔
1669
    SColData  *colData = tBlockDataGetColDataByIdx(bData, hdr->numOfPKs);
1,107,763✔
1670

1671
    if ((colData->cflag & COL_IS_KEY) == 0) {
1,107,763✔
1672
      break;
949,639✔
1673
    }
1674

1675
    SColDataCompressInfo info = {
158,124✔
1676
        .cmprAlg = hdr->cmprAlg,
158,124✔
1677
    };
1678
    code = tsdbGetColCmprAlgFromSet(compressInfo->pColCmpr, colData->cid, &info.cmprAlg);
158,124✔
1679
    if (code < 0) {
1680
      // do nothing
1681
    } else {
1682
    }
1683

1684
    code = tColDataCompress(colData, &info, buffer, assist);
158,135✔
1685
    TSDB_CHECK_CODE(code, lino, _exit);
158,135!
1686

1687
    *blockCol = (SBlockCol){
158,135✔
1688
        .cid = info.columnId,
158,135✔
1689
        .type = info.dataType,
158,135✔
1690
        .cflag = info.columnFlag,
158,135✔
1691
        .flag = info.flag,
158,135✔
1692
        .szOrigin = info.dataOriginalSize,
158,135✔
1693
        .szBitmap = info.bitmapCompressedSize,
158,135✔
1694
        .szOffset = info.offsetCompressedSize,
158,135✔
1695
        .szValue = info.dataCompressedSize,
158,135✔
1696
        .offset = 0,
1697
        .alg = info.cmprAlg,
158,135✔
1698
    };
1699
  }
1700

1701
_exit:
10✔
1702
  return code;
949,649✔
1703
}
1704

1705
int32_t tBlockDataDecompressColData(const SDiskDataHdr *hdr, const SBlockCol *blockCol, SBufferReader *br,
14,899,782✔
1706
                                    SBlockData *blockData, SBuffer *assist) {
1707
  int32_t code = 0;
14,899,782✔
1708
  int32_t lino = 0;
14,899,782✔
1709

1710
  SColData *colData;
1711

1712
  code = tBlockDataAddColData(blockData, blockCol->cid, blockCol->type, blockCol->cflag, &colData);
14,899,782✔
1713
  TSDB_CHECK_CODE(code, lino, _exit);
14,900,913!
1714

1715
  SColDataCompressInfo info = {
12,782,018✔
1716
      .cmprAlg = blockCol->alg,
14,900,913✔
1717
      .columnFlag = blockCol->cflag,
14,900,913✔
1718
      .flag = blockCol->flag,
14,900,913✔
1719
      .dataType = blockCol->type,
14,900,913✔
1720
      .columnId = blockCol->cid,
14,900,913✔
1721
      .numOfData = hdr->nRow,
14,900,913✔
1722
      .bitmapOriginalSize = 0,
1723
      .bitmapCompressedSize = blockCol->szBitmap,
14,900,913✔
1724
      .offsetOriginalSize = blockCol->szOffset ? sizeof(int32_t) * hdr->nRow : 0,
14,900,913✔
1725
      .offsetCompressedSize = blockCol->szOffset,
14,900,913✔
1726
      .dataOriginalSize = blockCol->szOrigin,
14,900,913✔
1727
      .dataCompressedSize = blockCol->szValue,
14,900,913✔
1728
  };
1729

1730
  switch (blockCol->flag) {
14,900,913✔
1731
    case (HAS_NONE | HAS_NULL | HAS_VALUE):
2✔
1732
      info.bitmapOriginalSize = BIT2_SIZE(hdr->nRow);
2✔
1733
      break;
2✔
1734
    case (HAS_NONE | HAS_NULL):
503,304✔
1735
    case (HAS_NONE | HAS_VALUE):
1736
    case (HAS_NULL | HAS_VALUE):
1737
      info.bitmapOriginalSize = BIT1_SIZE(hdr->nRow);
503,304✔
1738
      break;
503,304✔
1739
  }
1740

1741
  code = tColDataDecompress(BR_PTR(br), &info, colData, assist);
14,900,913✔
1742
  TSDB_CHECK_CODE(code, lino, _exit);
14,895,964!
1743
  br->offset += blockCol->szBitmap + blockCol->szOffset + blockCol->szValue;
14,895,964✔
1744

1745
_exit:
14,895,964✔
1746
  return code;
14,895,964✔
1747
}
1748

1749
int32_t tBlockDataDecompressKeyPart(const SDiskDataHdr *hdr, SBufferReader *br, SBlockData *blockData,
6,223,099✔
1750
                                    SBuffer *assist) {
1751
  int32_t       code = 0;
6,223,099✔
1752
  int32_t       lino = 0;
6,223,099✔
1753
  SCompressInfo cinfo;
1754

1755
  // uid
1756
  if (hdr->szUid > 0) {
6,223,099✔
1757
    cinfo = (SCompressInfo){
3,757,126✔
1758
        .cmprAlg = hdr->cmprAlg,
3,757,126✔
1759
        .dataType = TSDB_DATA_TYPE_BIGINT,
1760
        .compressedSize = hdr->szUid,
3,757,126✔
1761
        .originalSize = sizeof(int64_t) * hdr->nRow,
3,757,126✔
1762
    };
1763

1764
    code = tRealloc((uint8_t **)&blockData->aUid, cinfo.originalSize);
3,757,126✔
1765
    TSDB_CHECK_CODE(code, lino, _exit);
3,758,138!
1766
    code = tDecompressData(BR_PTR(br), &cinfo, blockData->aUid, cinfo.originalSize, assist);
3,758,138✔
1767
    TSDB_CHECK_CODE(code, lino, _exit);
3,757,222!
1768
    br->offset += cinfo.compressedSize;
3,757,222✔
1769
  }
1770

1771
  // version
1772
  cinfo = (SCompressInfo){
6,223,195✔
1773
      .cmprAlg = hdr->cmprAlg,
6,223,195✔
1774
      .dataType = TSDB_DATA_TYPE_BIGINT,
1775
      .compressedSize = hdr->szVer,
6,223,195✔
1776
      .originalSize = sizeof(int64_t) * hdr->nRow,
6,223,195✔
1777
  };
1778
  code = tRealloc((uint8_t **)&blockData->aVersion, cinfo.originalSize);
6,223,195✔
1779
  TSDB_CHECK_CODE(code, lino, _exit);
6,225,103!
1780
  code = tDecompressData(BR_PTR(br), &cinfo, blockData->aVersion, cinfo.originalSize, assist);
6,225,103✔
1781
  TSDB_CHECK_CODE(code, lino, _exit);
6,225,573!
1782
  br->offset += cinfo.compressedSize;
6,225,573✔
1783

1784
  // ts
1785
  cinfo = (SCompressInfo){
6,225,573✔
1786
      .cmprAlg = hdr->cmprAlg,
6,225,573✔
1787
      .dataType = TSDB_DATA_TYPE_TIMESTAMP,
1788
      .compressedSize = hdr->szKey,
6,225,573✔
1789
      .originalSize = sizeof(TSKEY) * hdr->nRow,
6,225,573✔
1790
  };
1791
  code = tRealloc((uint8_t **)&blockData->aTSKEY, cinfo.originalSize);
6,225,573✔
1792
  TSDB_CHECK_CODE(code, lino, _exit);
6,225,842!
1793
  code = tDecompressData(BR_PTR(br), &cinfo, blockData->aTSKEY, cinfo.originalSize, assist);
6,225,842✔
1794
  TSDB_CHECK_CODE(code, lino, _exit);
6,225,735!
1795
  br->offset += cinfo.compressedSize;
6,225,735✔
1796

1797
  // primary keys
1798
  for (int i = 0; i < hdr->numOfPKs; i++) {
6,495,602✔
1799
    const SBlockCol *blockCol = &hdr->primaryBlockCols[i];
269,868✔
1800

1801
    if (!(blockCol->flag == HAS_VALUE)) {
269,868!
1802
      TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
×
1803
    }
1804
    if (!(blockCol->cflag & COL_IS_KEY)) {
269,868!
UNCOV
1805
      TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
×
1806
    }
1807

1808
    code = tBlockDataDecompressColData(hdr, blockCol, br, blockData, assist);
269,868✔
1809
    TSDB_CHECK_CODE(code, lino, _exit);
269,867!
1810
  }
1811

1812
_exit:
6,225,734✔
1813
  return code;
6,225,734✔
1814
}
1815

1816
// Look up the compression algorithm stored for column `colId` in `set`.
//
// Returns 0 and stores the value in `*alg` when an entry exists, -1 when `set`
// is NULL, and TSDB_CODE_NOT_FOUND when the column has no entry. `*alg` is
// left untouched on both failure paths.
int32_t tsdbGetColCmprAlgFromSet(SHashObj *set, int16_t colId, uint32_t *alg) {
  if (!set) {
    return -1;
  }

  const uint32_t *found = taosHashGet(set, &colId, sizeof(colId));
  if (found != NULL) {
    *alg = *found;
    return 0;
  }

  return TSDB_CODE_NOT_FOUND;
}
UNCOV
1827
uint32_t tsdbCvtTimestampAlg(uint32_t alg) {
×
UNCOV
1828
  DEFINE_VAR(alg)
×
1829

UNCOV
1830
  return 0;
×
1831
}
1832

1833
// Allocate a disk for `tsdb` through tfsAllocDisk and make sure the tsdb
// directory exists on it.
//
// tsdb     owning tsdb; its vnode's tfs handle and `path` are used
// label    passed through to tfsAllocDisk
// expLevel passed through to tfsAllocDisk
// diskId   optional output; receives the allocated disk id when non-NULL
//
// Returns the tfsAllocDisk error code when allocation fails (nothing is written
// to `diskId` in that case). Directory creation is best-effort: a failure is
// logged with the error returned by tfsMkdirRecurAt, but the function still
// reports success and hands back the allocated disk id.
int32_t tsdbAllocateDisk(STsdb *tsdb, const char *label, int32_t expLevel, SDiskID *diskId) {
  SDiskID did = {0};
  STfs   *tfs = tsdb->pVnode->pTfs;

  int32_t code = tfsAllocDisk(tfs, expLevel, label, &did);
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(tsdb->pVnode), __func__, __FILE__, __LINE__,
              tstrerror(code));
    return code;
  }

  // Keep the mkdir result in its own variable so the log reports the real
  // failure reason instead of the (successful) allocation code.
  int32_t mkdirCode = tfsMkdirRecurAt(tfs, tsdb->path, did);
  if (mkdirCode != 0) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(tsdb->pVnode), __func__, __FILE__, __LINE__,
              tstrerror(mkdirCode));
  }

  if (diskId) {
    *diskId = did;
  }
  return code;
}
1856

1857
// Allocate a disk for `tsdb` at a specific level through tfsAllocDiskAtLevel
// and make sure the tsdb directory exists on it.
//
// tsdb   owning tsdb; its vnode's tfs handle and `path` are used
// level  passed through to tfsAllocDiskAtLevel
// label  passed through to tfsAllocDiskAtLevel
// diskId optional output; receives the allocated disk id when non-NULL
//
// Returns the tfsAllocDiskAtLevel error code when allocation fails (nothing is
// written to `diskId` in that case), otherwise 0. Directory creation is
// best-effort: a failure is logged with the error returned by tfsMkdirRecurAt
// and does not change the return value.
int32_t tsdbAllocateDiskAtLevel(STsdb *tsdb, int32_t level, const char *label, SDiskID *diskId) {
  SDiskID did = {0};
  STfs   *tfs = tsdb->pVnode->pTfs;

  int32_t code = tfsAllocDiskAtLevel(tfs, level, label, &did);
  if (code) {
    return code;
  }

  // Keep the mkdir result in its own variable so the log reports the real
  // failure reason instead of the (successful) allocation code.
  int32_t mkdirCode = tfsMkdirRecurAt(tfs, tsdb->path, did);
  if (mkdirCode != 0) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(tsdb->pVnode), __func__, __FILE__, __LINE__,
              tstrerror(mkdirCode));
  }

  if (diskId) {
    *diskId = did;
  }
  return 0;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc