• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3608

12 Feb 2025 05:57AM UTC coverage: 63.066% (+1.4%) from 61.715%
#3608

push

travis-ci

web-flow
Merge pull request #29746 from taosdata/merge/mainto3.02

merge: from main to 3.0 branch

140199 of 286257 branches covered (48.98%)

Branch coverage included in aggregate %.

89 of 161 new or added lines in 18 files covered. (55.28%)

3211 existing lines in 190 files now uncovered.

218998 of 283298 relevant lines covered (77.3%)

5949310.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.2
/source/dnode/vnode/src/tsdb/tsdbUtil.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tcompression.h"
17
#include "tdataformat.h"
18
#include "tsdb.h"
19
#include "tsdbDef.h"
20

21
int32_t tsdbGetColCmprAlgFromSet(SHashObj *set, int16_t colId, uint32_t *alg);
22

23
static int32_t tBlockDataCompressKeyPart(SBlockData *bData, SDiskDataHdr *hdr, SBuffer *buffer, SBuffer *assist,
24
                                         SColCompressInfo *pCompressExt);
25

26
// SMapData =======================================================================
27
void tMapDataReset(SMapData *pMapData) {
×
28
  pMapData->nItem = 0;
×
29
  pMapData->nData = 0;
×
30
}
×
31

32
void tMapDataClear(SMapData *pMapData) {
×
33
  tFree(pMapData->aOffset);
×
34
  tFree(pMapData->pData);
×
35
  pMapData->pData = NULL;
×
36
  pMapData->aOffset = NULL;
×
37
}
×
38

39
#ifdef BUILD_NO_CALL
40
int32_t tMapDataPutItem(SMapData *pMapData, void *pItem, int32_t (*tPutItemFn)(uint8_t *, void *)) {
41
  int32_t code = 0;
42
  int32_t offset = pMapData->nData;
43
  int32_t nItem = pMapData->nItem;
44

45
  pMapData->nItem++;
46
  pMapData->nData += tPutItemFn(NULL, pItem);
47

48
  // alloc
49
  code = tRealloc((uint8_t **)&pMapData->aOffset, sizeof(int32_t) * pMapData->nItem);
50
  if (code) goto _exit;
51
  code = tRealloc(&pMapData->pData, pMapData->nData);
52
  if (code) goto _exit;
53

54
  // put
55
  pMapData->aOffset[nItem] = offset;
56
  tPutItemFn(pMapData->pData + offset, pItem);
57

58
_exit:
59
  return code;
60
}
61

62
int32_t tMapDataCopy(SMapData *pFrom, SMapData *pTo) {
63
  int32_t code = 0;
64

65
  pTo->nItem = pFrom->nItem;
66
  pTo->nData = pFrom->nData;
67
  code = tRealloc((uint8_t **)&pTo->aOffset, sizeof(int32_t) * pFrom->nItem);
68
  if (code) goto _exit;
69
  code = tRealloc(&pTo->pData, pFrom->nData);
70
  if (code) goto _exit;
71
  memcpy(pTo->aOffset, pFrom->aOffset, sizeof(int32_t) * pFrom->nItem);
72
  memcpy(pTo->pData, pFrom->pData, pFrom->nData);
73

74
_exit:
75
  return code;
76
}
77

78
int32_t tMapDataSearch(SMapData *pMapData, void *pSearchItem, int32_t (*tGetItemFn)(uint8_t *, void *),
79
                       int32_t (*tItemCmprFn)(const void *, const void *), void *pItem) {
80
  int32_t code = 0;
81
  int32_t lidx = 0;
82
  int32_t ridx = pMapData->nItem - 1;
83
  int32_t midx;
84
  int32_t c;
85

86
  while (lidx <= ridx) {
87
    midx = (lidx + ridx) / 2;
88

89
    tMapDataGetItemByIdx(pMapData, midx, pItem, tGetItemFn);
90

91
    c = tItemCmprFn(pSearchItem, pItem);
92
    if (c == 0) {
93
      goto _exit;
94
    } else if (c < 0) {
95
      ridx = midx - 1;
96
    } else {
97
      lidx = midx + 1;
98
    }
99
  }
100

101
  code = TSDB_CODE_NOT_FOUND;
102

103
_exit:
104
  return code;
105
}
106
#endif
107

108
void tMapDataGetItemByIdx(SMapData *pMapData, int32_t idx, void *pItem, int32_t (*tGetItemFn)(uint8_t *, void *)) {
×
109
  int32_t r = tGetItemFn(pMapData->pData + pMapData->aOffset[idx], pItem);
×
110
}
×
111

112
#ifdef BUILD_NO_CALL
113
int32_t tMapDataToArray(SMapData *pMapData, int32_t itemSize, int32_t (*tGetItemFn)(uint8_t *, void *),
114
                        SArray **ppArray) {
115
  int32_t code = 0;
116

117
  SArray *pArray = taosArrayInit(pMapData->nItem, itemSize);
118
  if (pArray == NULL) {
119
    code = terrno;
120
    goto _exit;
121
  }
122

123
  for (int32_t i = 0; i < pMapData->nItem; i++) {
124
    tMapDataGetItemByIdx(pMapData, i, taosArrayReserve(pArray, 1), tGetItemFn);
125
  }
126

127
_exit:
128
  *ppArray = pArray;
129
  return code;
130
}
131

132
int32_t tPutMapData(uint8_t *p, SMapData *pMapData) {
133
  int32_t n = 0;
134

135
  n += tPutI32v(p ? p + n : p, pMapData->nItem);
136
  if (pMapData->nItem) {
137
    int32_t lOffset = 0;
138
    for (int32_t iItem = 0; iItem < pMapData->nItem; iItem++) {
139
      n += tPutI32v(p ? p + n : p, pMapData->aOffset[iItem] - lOffset);
140
      lOffset = pMapData->aOffset[iItem];
141
    }
142

143
    n += tPutI32v(p ? p + n : p, pMapData->nData);
144
    if (p) {
145
      memcpy(p + n, pMapData->pData, pMapData->nData);
146
    }
147
    n += pMapData->nData;
148
  }
149

150
  return n;
151
}
152
#endif
153

154
int32_t tGetMapData(uint8_t *p, SMapData *pMapData, int32_t *decodedSize) {
×
155
  int32_t n = 0;
×
156
  int32_t code;
157
  int32_t offset;
158

159
  tMapDataReset(pMapData);
×
160

161
  n += tGetI32v(p + n, &pMapData->nItem);
×
162
  if (pMapData->nItem) {
×
163
    code = tRealloc((uint8_t **)&pMapData->aOffset, sizeof(int32_t) * pMapData->nItem);
×
164
    if (code) {
×
165
      return code;
×
166
    }
167

168
    int32_t lOffset = 0;
×
169
    for (int32_t iItem = 0; iItem < pMapData->nItem; iItem++) {
×
170
      n += tGetI32v(p + n, &pMapData->aOffset[iItem]);
×
171
      pMapData->aOffset[iItem] += lOffset;
×
172
      lOffset = pMapData->aOffset[iItem];
×
173
    }
174

175
    n += tGetI32v(p + n, &pMapData->nData);
×
176
    code = tRealloc(&pMapData->pData, pMapData->nData);
×
177
    if (code) {
×
178
      return code;
×
179
    }
180
    memcpy(pMapData->pData, p + n, pMapData->nData);
×
181
    n += pMapData->nData;
×
182
  }
183

184
  if (decodedSize) {
×
185
    *decodedSize = n;
×
186
  }
187

188
  return 0;
×
189
}
190

191
#ifdef BUILD_NO_CALL
192
// TABLEID =======================================================================
193
int32_t tTABLEIDCmprFn(const void *p1, const void *p2) {
194
  TABLEID *pId1 = (TABLEID *)p1;
195
  TABLEID *pId2 = (TABLEID *)p2;
196

197
  if (pId1->suid < pId2->suid) {
198
    return -1;
199
  } else if (pId1->suid > pId2->suid) {
200
    return 1;
201
  }
202

203
  if (pId1->uid < pId2->uid) {
204
    return -1;
205
  } else if (pId1->uid > pId2->uid) {
206
    return 1;
207
  }
208

209
  return 0;
210
}
211

212
// SBlockIdx ======================================================
213
int32_t tPutBlockIdx(uint8_t *p, void *ph) {
214
  int32_t    n = 0;
215
  SBlockIdx *pBlockIdx = (SBlockIdx *)ph;
216

217
  n += tPutI64(p ? p + n : p, pBlockIdx->suid);
218
  n += tPutI64(p ? p + n : p, pBlockIdx->uid);
219
  n += tPutI64v(p ? p + n : p, pBlockIdx->offset);
220
  n += tPutI64v(p ? p + n : p, pBlockIdx->size);
221

222
  return n;
223
}
224
#endif
225

226
int32_t tGetBlockIdx(uint8_t *p, void *ph) {
×
227
  int32_t    n = 0;
×
228
  SBlockIdx *pBlockIdx = (SBlockIdx *)ph;
×
229

230
  n += tGetI64(p + n, &pBlockIdx->suid);
×
231
  n += tGetI64(p + n, &pBlockIdx->uid);
×
232
  n += tGetI64v(p + n, &pBlockIdx->offset);
×
233
  n += tGetI64v(p + n, &pBlockIdx->size);
×
234

235
  return n;
×
236
}
237

238
#ifdef BUILD_NO_CALL
239
int32_t tCmprBlockIdx(void const *lhs, void const *rhs) {
240
  SBlockIdx *lBlockIdx = (SBlockIdx *)lhs;
241
  SBlockIdx *rBlockIdx = (SBlockIdx *)rhs;
242

243
  if (lBlockIdx->suid < rBlockIdx->suid) {
244
    return -1;
245
  } else if (lBlockIdx->suid > rBlockIdx->suid) {
246
    return 1;
247
  }
248

249
  if (lBlockIdx->uid < rBlockIdx->uid) {
250
    return -1;
251
  } else if (lBlockIdx->uid > rBlockIdx->uid) {
252
    return 1;
253
  }
254

255
  return 0;
256
}
257

258
int32_t tCmprBlockL(void const *lhs, void const *rhs) {
259
  SBlockIdx *lBlockIdx = (SBlockIdx *)lhs;
260
  SSttBlk   *rBlockL = (SSttBlk *)rhs;
261

262
  if (lBlockIdx->suid < rBlockL->suid) {
263
    return -1;
264
  } else if (lBlockIdx->suid > rBlockL->suid) {
265
    return 1;
266
  }
267

268
  if (lBlockIdx->uid < rBlockL->minUid) {
269
    return -1;
270
  } else if (lBlockIdx->uid > rBlockL->maxUid) {
271
    return 1;
272
  }
273

274
  return 0;
275
}
276

277
// SDataBlk ======================================================
278
void tDataBlkReset(SDataBlk *pDataBlk) {
279
  *pDataBlk = (SDataBlk){.minKey = TSDBKEY_MAX, .maxKey = TSDBKEY_MIN, .minVer = VERSION_MAX, .maxVer = VERSION_MIN};
280
}
281

282
int32_t tPutDataBlk(uint8_t *p, void *ph) {
283
  int32_t   n = 0;
284
  SDataBlk *pDataBlk = (SDataBlk *)ph;
285

286
  n += tPutI64v(p ? p + n : p, pDataBlk->minKey.version);
287
  n += tPutI64v(p ? p + n : p, pDataBlk->minKey.ts);
288
  n += tPutI64v(p ? p + n : p, pDataBlk->maxKey.version);
289
  n += tPutI64v(p ? p + n : p, pDataBlk->maxKey.ts);
290
  n += tPutI64v(p ? p + n : p, pDataBlk->minVer);
291
  n += tPutI64v(p ? p + n : p, pDataBlk->maxVer);
292
  n += tPutI32v(p ? p + n : p, pDataBlk->nRow);
293
  n += tPutI8(p ? p + n : p, pDataBlk->hasDup);
294
  n += tPutI8(p ? p + n : p, pDataBlk->nSubBlock);
295
  for (int8_t iSubBlock = 0; iSubBlock < pDataBlk->nSubBlock; iSubBlock++) {
296
    n += tPutI64v(p ? p + n : p, pDataBlk->aSubBlock[iSubBlock].offset);
297
    n += tPutI32v(p ? p + n : p, pDataBlk->aSubBlock[iSubBlock].szBlock);
298
    n += tPutI32v(p ? p + n : p, pDataBlk->aSubBlock[iSubBlock].szKey);
299
  }
300
  if (pDataBlk->nSubBlock == 1 && !pDataBlk->hasDup) {
301
    n += tPutI64v(p ? p + n : p, pDataBlk->smaInfo.offset);
302
    n += tPutI32v(p ? p + n : p, pDataBlk->smaInfo.size);
303
  }
304

305
  return n;
306
}
307
#endif
308

309
int32_t tGetDataBlk(uint8_t *p, void *ph) {
×
310
  int32_t   n = 0;
×
311
  SDataBlk *pDataBlk = (SDataBlk *)ph;
×
312

313
  n += tGetI64v(p + n, &pDataBlk->minKey.version);
×
314
  n += tGetI64v(p + n, &pDataBlk->minKey.ts);
×
315
  n += tGetI64v(p + n, &pDataBlk->maxKey.version);
×
316
  n += tGetI64v(p + n, &pDataBlk->maxKey.ts);
×
317
  n += tGetI64v(p + n, &pDataBlk->minVer);
×
318
  n += tGetI64v(p + n, &pDataBlk->maxVer);
×
319
  n += tGetI32v(p + n, &pDataBlk->nRow);
×
320
  n += tGetI8(p + n, &pDataBlk->hasDup);
×
321
  n += tGetI8(p + n, &pDataBlk->nSubBlock);
×
322
  for (int8_t iSubBlock = 0; iSubBlock < pDataBlk->nSubBlock; iSubBlock++) {
×
323
    n += tGetI64v(p + n, &pDataBlk->aSubBlock[iSubBlock].offset);
×
324
    n += tGetI32v(p + n, &pDataBlk->aSubBlock[iSubBlock].szBlock);
×
325
    n += tGetI32v(p + n, &pDataBlk->aSubBlock[iSubBlock].szKey);
×
326
  }
327
  if (pDataBlk->nSubBlock == 1 && !pDataBlk->hasDup) {
×
328
    n += tGetI64v(p + n, &pDataBlk->smaInfo.offset);
×
329
    n += tGetI32v(p + n, &pDataBlk->smaInfo.size);
×
330
  } else {
331
    pDataBlk->smaInfo.offset = 0;
×
332
    pDataBlk->smaInfo.size = 0;
×
333
  }
334

335
  return n;
×
336
}
337

338
#ifdef BUILD_NO_CALL
339
int32_t tDataBlkCmprFn(const void *p1, const void *p2) {
340
  SDataBlk *pBlock1 = (SDataBlk *)p1;
341
  SDataBlk *pBlock2 = (SDataBlk *)p2;
342

343
  if (tsdbKeyCmprFn(&pBlock1->maxKey, &pBlock2->minKey) < 0) {
344
    return -1;
345
  } else if (tsdbKeyCmprFn(&pBlock1->minKey, &pBlock2->maxKey) > 0) {
346
    return 1;
347
  }
348

349
  return 0;
350
}
351

352
bool tDataBlkHasSma(SDataBlk *pDataBlk) {
353
  if (pDataBlk->nSubBlock > 1) return false;
354
  if (pDataBlk->hasDup) return false;
355

356
  return pDataBlk->smaInfo.size > 0;
357
}
358

359
// SSttBlk ======================================================
360
int32_t tPutSttBlk(uint8_t *p, void *ph) {
361
  int32_t  n = 0;
362
  SSttBlk *pSttBlk = (SSttBlk *)ph;
363

364
  n += tPutI64(p ? p + n : p, pSttBlk->suid);
365
  n += tPutI64(p ? p + n : p, pSttBlk->minUid);
366
  n += tPutI64(p ? p + n : p, pSttBlk->maxUid);
367
  n += tPutI64v(p ? p + n : p, pSttBlk->minKey);
368
  n += tPutI64v(p ? p + n : p, pSttBlk->maxKey);
369
  n += tPutI64v(p ? p + n : p, pSttBlk->minVer);
370
  n += tPutI64v(p ? p + n : p, pSttBlk->maxVer);
371
  n += tPutI32v(p ? p + n : p, pSttBlk->nRow);
372
  n += tPutI64v(p ? p + n : p, pSttBlk->bInfo.offset);
373
  n += tPutI32v(p ? p + n : p, pSttBlk->bInfo.szBlock);
374
  n += tPutI32v(p ? p + n : p, pSttBlk->bInfo.szKey);
375

376
  return n;
377
}
378
#endif
379

380
int32_t tGetSttBlk(uint8_t *p, void *ph) {
×
381
  int32_t  n = 0;
×
382
  SSttBlk *pSttBlk = (SSttBlk *)ph;
×
383

384
  n += tGetI64(p + n, &pSttBlk->suid);
×
385
  n += tGetI64(p + n, &pSttBlk->minUid);
×
386
  n += tGetI64(p + n, &pSttBlk->maxUid);
×
387
  n += tGetI64v(p + n, &pSttBlk->minKey);
×
388
  n += tGetI64v(p + n, &pSttBlk->maxKey);
×
389
  n += tGetI64v(p + n, &pSttBlk->minVer);
×
390
  n += tGetI64v(p + n, &pSttBlk->maxVer);
×
391
  n += tGetI32v(p + n, &pSttBlk->nRow);
×
392
  n += tGetI64v(p + n, &pSttBlk->bInfo.offset);
×
393
  n += tGetI32v(p + n, &pSttBlk->bInfo.szBlock);
×
394
  n += tGetI32v(p + n, &pSttBlk->bInfo.szKey);
×
395

396
  return n;
×
397
}
398

399
// SBlockCol ======================================================
400

401
static const int32_t BLOCK_WITH_ALG_VER = 2;
402

403
int32_t tPutBlockCol(SBuffer *buffer, const SBlockCol *pBlockCol, int32_t ver, uint32_t defaultCmprAlg) {
1,433,964✔
404
  int32_t code;
405

406
  if ((code = tBufferPutI16v(buffer, pBlockCol->cid))) return code;
2,867,954!
407
  if ((code = tBufferPutI8(buffer, pBlockCol->type))) return code;
2,867,980!
408
  if ((code = tBufferPutI8(buffer, pBlockCol->cflag))) return code;
2,867,980!
409
  if ((code = tBufferPutI8(buffer, pBlockCol->flag))) return code;
2,867,980!
410
  if ((code = tBufferPutI32v(buffer, pBlockCol->szOrigin))) return code;
2,867,980!
411

412
  if (pBlockCol->flag != HAS_NULL) {
1,433,990✔
413
    if (pBlockCol->flag != HAS_VALUE) {
1,295,128✔
414
      if ((code = tBufferPutI32v(buffer, pBlockCol->szBitmap))) return code;
294,268!
415
    }
416

417
    if (IS_VAR_DATA_TYPE(pBlockCol->type)) {
1,295,128!
418
      if ((code = tBufferPutI32v(buffer, pBlockCol->szOffset))) return code;
601,780!
419
    }
420

421
    if (pBlockCol->flag != (HAS_NULL | HAS_NONE)) {
1,295,128✔
422
      if ((code = tBufferPutI32v(buffer, pBlockCol->szValue))) return code;
2,589,084!
423
    }
424

425
    if ((code = tBufferPutI32v(buffer, pBlockCol->offset))) return code;
2,590,256!
426
  }
427
  if (ver >= BLOCK_WITH_ALG_VER) {
1,433,990!
428
    if ((code = tBufferPutU32(buffer, pBlockCol->alg))) return code;
2,869,026!
429
  } else {
430
    if ((code = tBufferPutU32(buffer, defaultCmprAlg))) return code;
×
431
  }
432
  return 0;
1,433,990✔
433
}
434

435
int32_t tGetBlockCol(SBufferReader *br, SBlockCol *pBlockCol, int32_t ver, uint32_t defaultCmprAlg) {
7,960,041✔
436
  int32_t code;
437

438
  if ((code = tBufferGetI16v(br, &pBlockCol->cid))) return code;
7,960,041!
439
  if ((code = tBufferGetI8(br, &pBlockCol->type))) return code;
7,960,721!
440
  if ((code = tBufferGetI8(br, &pBlockCol->cflag))) return code;
7,961,134!
441
  if ((code = tBufferGetI8(br, &pBlockCol->flag))) return code;
7,958,567!
442
  if ((code = tBufferGetI32v(br, &pBlockCol->szOrigin))) return code;
7,956,710!
443

444
  pBlockCol->szBitmap = 0;
7,957,148✔
445
  pBlockCol->szOffset = 0;
7,957,148✔
446
  pBlockCol->szValue = 0;
7,957,148✔
447
  pBlockCol->offset = 0;
7,957,148✔
448

449
  if (pBlockCol->flag != HAS_NULL) {
7,957,148✔
450
    if (pBlockCol->flag != HAS_VALUE) {
6,832,990✔
451
      if ((code = tBufferGetI32v(br, &pBlockCol->szBitmap))) return code;
691,695!
452
    }
453

454
    if (IS_VAR_DATA_TYPE(pBlockCol->type)) {
6,832,969!
455
      if ((code = tBufferGetI32v(br, &pBlockCol->szOffset))) return code;
1,066,181!
456
    }
457

458
    if (pBlockCol->flag != (HAS_NULL | HAS_NONE)) {
6,835,632✔
459
      if ((code = tBufferGetI32v(br, &pBlockCol->szValue))) return code;
6,831,982!
460
    }
461

462
    if ((code = tBufferGetI32v(br, &pBlockCol->offset))) return code;
6,836,922!
463
  }
464

465
  if (ver >= BLOCK_WITH_ALG_VER) {
7,956,012!
466
    if ((code = tBufferGetU32(br, &pBlockCol->alg))) return code;
7,957,357!
467
  } else {
468
    pBlockCol->alg = defaultCmprAlg;
×
469
  }
470

471
  return 0;
7,955,031✔
472
}
473

474
#ifdef BUILD_NO_CALL
475
int32_t tBlockColCmprFn(const void *p1, const void *p2) {
476
  if (((SBlockCol *)p1)->cid < ((SBlockCol *)p2)->cid) {
477
    return -1;
478
  } else if (((SBlockCol *)p1)->cid > ((SBlockCol *)p2)->cid) {
479
    return 1;
480
  }
481

482
  return 0;
483
}
484

485
// SDelIdx ======================================================
486
int32_t tCmprDelIdx(void const *lhs, void const *rhs) {
487
  SDelIdx *lDelIdx = (SDelIdx *)lhs;
488
  SDelIdx *rDelIdx = (SDelIdx *)rhs;
489

490
  if (lDelIdx->suid < rDelIdx->suid) {
491
    return -1;
492
  } else if (lDelIdx->suid > rDelIdx->suid) {
493
    return 1;
494
  }
495

496
  if (lDelIdx->uid < rDelIdx->uid) {
497
    return -1;
498
  } else if (lDelIdx->uid > rDelIdx->uid) {
499
    return 1;
500
  }
501

502
  return 0;
503
}
504

505
int32_t tPutDelIdx(uint8_t *p, void *ph) {
506
  SDelIdx *pDelIdx = (SDelIdx *)ph;
507
  int32_t  n = 0;
508

509
  n += tPutI64(p ? p + n : p, pDelIdx->suid);
510
  n += tPutI64(p ? p + n : p, pDelIdx->uid);
511
  n += tPutI64v(p ? p + n : p, pDelIdx->offset);
512
  n += tPutI64v(p ? p + n : p, pDelIdx->size);
513

514
  return n;
515
}
516
#endif
517

518
int32_t tGetDelIdx(uint8_t *p, void *ph) {
×
519
  SDelIdx *pDelIdx = (SDelIdx *)ph;
×
520
  int32_t  n = 0;
×
521

522
  n += tGetI64(p + n, &pDelIdx->suid);
×
523
  n += tGetI64(p + n, &pDelIdx->uid);
×
524
  n += tGetI64v(p + n, &pDelIdx->offset);
×
525
  n += tGetI64v(p + n, &pDelIdx->size);
×
526

527
  return n;
×
528
}
529

530
#ifdef BUILD_NO_CALL
531
// SDelData ======================================================
532
int32_t tPutDelData(uint8_t *p, void *ph) {
533
  SDelData *pDelData = (SDelData *)ph;
534
  int32_t   n = 0;
535

536
  n += tPutI64v(p ? p + n : p, pDelData->version);
537
  n += tPutI64(p ? p + n : p, pDelData->sKey);
538
  n += tPutI64(p ? p + n : p, pDelData->eKey);
539

540
  return n;
541
}
542
#endif
543

544
int32_t tGetDelData(uint8_t *p, void *ph) {
×
545
  SDelData *pDelData = (SDelData *)ph;
×
546
  int32_t   n = 0;
×
547

548
  n += tGetI64v(p + n, &pDelData->version);
×
549
  n += tGetI64(p + n, &pDelData->sKey);
×
550
  n += tGetI64(p + n, &pDelData->eKey);
×
551

552
  return n;
×
553
}
554

555
int32_t tsdbKeyFid(TSKEY key, int32_t minutes, int8_t precision) {
785,124✔
556
  int64_t fid;
557
  if (key < 0) {
785,124✔
558
    fid = ((key + 1) / tsTickPerMin[precision] / minutes - 1);
4,802✔
559
    return (fid < INT32_MIN) ? INT32_MIN : (int32_t)fid;
4,802✔
560
  } else {
561
    fid = ((key / tsTickPerMin[precision] / minutes));
780,322✔
562
    return (fid > INT32_MAX) ? INT32_MAX : (int32_t)fid;
780,322!
563
  }
564
}
565

566
void tsdbFidKeyRange(int32_t fid, int32_t minutes, int8_t precision, TSKEY *minKey, TSKEY *maxKey) {
2,207,645✔
567
  *minKey = tsTickPerMin[precision] * fid * minutes;
2,207,645✔
568
  *maxKey = *minKey + tsTickPerMin[precision] * minutes - 1;
2,207,645✔
569
}
2,207,645✔
570

571
int32_t tsdbFidLevel(int32_t fid, STsdbKeepCfg *pKeepCfg, int64_t nowSec) {
135,706✔
572
  int32_t aFid[3];
573
  TSKEY   key;
574

575
  if (pKeepCfg->precision == TSDB_TIME_PRECISION_MILLI) {
135,706✔
576
    nowSec = nowSec * 1000;
135,588✔
577
  } else if (pKeepCfg->precision == TSDB_TIME_PRECISION_MICRO) {
118✔
578
    nowSec = nowSec * 1000000l;
62✔
579
  } else if (pKeepCfg->precision == TSDB_TIME_PRECISION_NANO) {
56!
580
    nowSec = nowSec * 1000000000l;
82✔
581
  } else {
582
    tsdbError("invalid time precision:%d", pKeepCfg->precision);
×
583
    return 0;
×
584
  }
585

586
  nowSec = nowSec - pKeepCfg->keepTimeOffset * tsTickPerHour[pKeepCfg->precision];
135,732✔
587

588
  key = nowSec - pKeepCfg->keep0 * tsTickPerMin[pKeepCfg->precision];
135,732✔
589
  aFid[0] = tsdbKeyFid(key, pKeepCfg->days, pKeepCfg->precision);
135,732✔
590
  key = nowSec - pKeepCfg->keep1 * tsTickPerMin[pKeepCfg->precision];
135,758✔
591
  aFid[1] = tsdbKeyFid(key, pKeepCfg->days, pKeepCfg->precision);
135,758✔
592
  key = nowSec - pKeepCfg->keep2 * tsTickPerMin[pKeepCfg->precision];
135,766✔
593
  aFid[2] = tsdbKeyFid(key, pKeepCfg->days, pKeepCfg->precision);
135,766✔
594

595
  if (fid >= aFid[0]) {
135,767✔
596
    return 0;
135,597✔
597
  } else if (fid >= aFid[1]) {
170✔
598
    return 1;
56✔
599
  } else if (fid >= aFid[2]) {
114✔
600
    return 2;
110✔
601
  } else {
602
    return -1;
4✔
603
  }
604
}
605

606
// TSDBROW ======================================================
607
void tsdbRowGetColVal(TSDBROW *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal) {
257,119,553✔
608
  STColumn *pTColumn = &pTSchema->columns[iCol];
257,119,553✔
609
  SValue    value;
610

611
  if (pRow->type == TSDBROW_ROW_FMT) {
257,119,553✔
612
    int32_t ret = tRowGet(pRow->pTSRow, pTSchema, iCol, pColVal);
38,555,441✔
613
    if (ret != 0) {
38,314,349!
614
      tsdbError("failed to get column value, code:%d", ret);
×
615
    }
616
  } else if (pRow->type == TSDBROW_COL_FMT) {
218,564,112!
617
    if (iCol == 0) {
222,541,755!
618
      *pColVal =
×
619
          COL_VAL_VALUE(PRIMARYKEY_TIMESTAMP_COL_ID,
×
620
                        ((SValue){.type = TSDB_DATA_TYPE_TIMESTAMP, .val = pRow->pBlockData->aTSKEY[pRow->iRow]}));
621
    } else {
622
      SColData *pColData = tBlockDataGetColData(pRow->pBlockData, pTColumn->colId);
222,541,755✔
623

624
      if (pColData) {
221,529,102✔
625
        if (tColDataGetValue(pColData, pRow->iRow, pColVal) != 0) {
18,996,870!
626
          tsdbError("failed to tColDataGetValue");
×
627
        }
628
      } else {
629
        *pColVal = COL_VAL_NONE(pTColumn->colId, pTColumn->type);
202,532,232✔
630
      }
631
    }
632
  }
633
}
257,116,008✔
634

635
void tsdbRowGetKey(TSDBROW *row, STsdbRowKey *key) {
589,964,590✔
636
  if (row->type == TSDBROW_ROW_FMT) {
589,964,590✔
637
    key->version = row->version;
355,132,052✔
638
    tRowGetKey(row->pTSRow, &key->key);
355,132,052✔
639
  } else {
640
    key->version = row->pBlockData->aVersion[row->iRow];
234,832,538✔
641
    tColRowGetKey(row->pBlockData, row->iRow, &key->key);
234,832,538!
642
  }
643
}
589,772,403✔
644

645
void tColRowGetPrimaryKey(SBlockData *pBlock, int32_t irow, SRowKey *key) {
357,237,650✔
646
  for (int32_t i = 0; i < pBlock->nColData; i++) {
357,438,126✔
647
    SColData *pColData = &pBlock->aColData[i];
282,570,136✔
648
    if (pColData->cflag & COL_IS_KEY) {
282,570,136✔
649
      SColVal cv;
650
      if (tColDataGetValue(pColData, irow, &cv) != 0) {
200,476!
651
        break;
×
652
      }
653
      key->pks[key->numOfPKs] = cv.value;
200,476✔
654
      key->numOfPKs++;
200,476✔
655
    } else {
656
      break;
282,369,660✔
657
    }
658
  }
659
}
357,237,650✔
660

661
int32_t tsdbRowKeyCmpr(const STsdbRowKey *key1, const STsdbRowKey *key2) {
56,919,913✔
662
  int32_t c = tRowKeyCompare(&key1->key, &key2->key);
56,919,913✔
663

664
  if (c) {
56,926,301✔
665
    return c;
54,595,805✔
666
  }
667

668
  if (key1->version < key2->version) {
2,330,496✔
669
    return -1;
990,956✔
670
  } else if (key1->version > key2->version) {
1,339,540!
671
    return 1;
1,397,206✔
672
  }
673
  return 0;
×
674
}
675

676
int32_t tsdbRowCompare(const void *p1, const void *p2) {
6,675,635✔
677
  STsdbRowKey key1, key2;
678

679
  tsdbRowGetKey((TSDBROW *)p1, &key1);
6,675,635✔
680
  tsdbRowGetKey((TSDBROW *)p2, &key2);
6,675,321✔
681
  return tsdbRowKeyCmpr(&key1, &key2);
6,656,637✔
682
}
683

684
int32_t tsdbRowCompareWithoutVersion(const void *p1, const void *p2) {
151,942,298✔
685
  STsdbRowKey key1, key2;
686

687
  tsdbRowGetKey((TSDBROW *)p1, &key1);
151,942,298✔
688
  tsdbRowGetKey((TSDBROW *)p2, &key2);
151,715,234✔
689
  return tRowKeyCompare(&key1.key, &key2.key);
152,196,284✔
690
}
691

692
// STSDBRowIter ======================================================
693
int32_t tsdbRowIterOpen(STSDBRowIter *pIter, TSDBROW *pRow, STSchema *pTSchema) {
1,260,715✔
694
  pIter->pRow = pRow;
1,260,715✔
695
  if (pRow->type == TSDBROW_ROW_FMT) {
1,260,715!
696
    int32_t code = tRowIterOpen(pRow->pTSRow, pTSchema, &pIter->pIter);
1,261,225✔
697
    if (code) return code;
1,330,597!
698
  } else if (pRow->type == TSDBROW_COL_FMT) {
×
699
    pIter->iColData = 0;
×
700
  }
701

702
  return 0;
1,330,087✔
703
}
704

705
void tsdbRowClose(STSDBRowIter *pIter) {
1,219,791✔
706
  if (pIter->pRow && pIter->pRow->type == TSDBROW_ROW_FMT) {
1,219,791!
707
    tRowIterClose(&pIter->pIter);
1,220,872✔
708
  }
709
  pIter->pRow = NULL;
1,329,486✔
710
  pIter->pIter = NULL;
1,329,486✔
711
}
1,329,486✔
712

713
SColVal *tsdbRowIterNext(STSDBRowIter *pIter) {
10,506,182✔
714
  if (pIter->pRow->type == TSDBROW_ROW_FMT) {
10,506,182!
715
    return tRowIterNext(pIter->pIter);
10,534,911✔
716
  } else if (pIter->pRow->type == TSDBROW_COL_FMT) {
×
717
    if (pIter->iColData == 0) {
×
718
      pIter->cv = COL_VAL_VALUE(
×
719
          PRIMARYKEY_TIMESTAMP_COL_ID,
720
          ((SValue){.type = TSDB_DATA_TYPE_TIMESTAMP, .val = pIter->pRow->pBlockData->aTSKEY[pIter->pRow->iRow]}));
721
      ++pIter->iColData;
×
722
      return &pIter->cv;
×
723
    }
724

725
    if (pIter->iColData <= pIter->pRow->pBlockData->nColData) {
×
726
      if (tColDataGetValue(&pIter->pRow->pBlockData->aColData[pIter->iColData - 1], pIter->pRow->iRow, &pIter->cv) !=
×
727
          0) {
UNCOV
728
        return NULL;
×
729
      }
730
      ++pIter->iColData;
×
UNCOV
731
      return &pIter->cv;
×
732
    } else {
UNCOV
733
      return NULL;
×
734
    }
735
  } else {
736
    tsdbError("invalid row type:%d", pIter->pRow->type);
×
UNCOV
737
    return NULL;
×
738
  }
739
}
740

741
// SRowMerger ======================================================
742
int32_t tsdbRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) {
47,771,896✔
743
  int32_t   code = 0;
47,771,896✔
744
  TSDBKEY   key = TSDBROW_KEY(pRow);
47,771,896✔
745
  SColVal  *pColVal = &(SColVal){0};
47,771,896✔
746
  STColumn *pTColumn;
747
  int32_t   iCol, jCol = 1;
47,771,896✔
748

749
  if (NULL == pTSchema) {
47,771,896✔
750
    pTSchema = pMerger->pTSchema;
43,268,958✔
751
  }
752

753
  if (taosArrayGetSize(pMerger->pArray) == 0) {
47,771,896✔
754
    // ts
755
    jCol = 0;
41,417,867✔
756
    pTColumn = &pTSchema->columns[jCol++];
41,417,867✔
757

758
    *pColVal = COL_VAL_VALUE(pTColumn->colId, ((SValue){.type = pTColumn->type, .val = key.ts}));
41,417,867✔
759
    if (taosArrayPush(pMerger->pArray, pColVal) == NULL) {
83,204,152!
760
      code = terrno;
×
UNCOV
761
      return code;
×
762
    }
763

764
    // other
765
    for (iCol = 1; jCol < pTSchema->numOfCols && iCol < pMerger->pTSchema->numOfCols; ++iCol) {
232,817,033✔
766
      pTColumn = &pMerger->pTSchema->columns[iCol];
189,906,948✔
767
      if (pTSchema->columns[jCol].colId < pTColumn->colId) {
189,906,948!
768
        ++jCol;
×
769
        --iCol;
×
UNCOV
770
        continue;
×
771
      } else if (pTSchema->columns[jCol].colId > pTColumn->colId) {
189,906,948!
772
        if (taosArrayPush(pMerger->pArray, &COL_VAL_NONE(pTColumn->colId, pTColumn->type)) == NULL) {
×
UNCOV
773
          return terrno;
×
774
        }
UNCOV
775
        continue;
×
776
      }
777

778
      tsdbRowGetColVal(pRow, pTSchema, jCol++, pColVal);
189,906,948✔
779
      if ((!COL_VAL_IS_NONE(pColVal)) && (!COL_VAL_IS_NULL(pColVal)) && IS_VAR_DATA_TYPE(pColVal->value.type)) {
188,752,592!
780
        uint8_t *pVal = pColVal->value.pData;
6,294,924✔
781

782
        pColVal->value.pData = NULL;
6,294,924✔
783
        code = tRealloc(&pColVal->value.pData, pColVal->value.nData);
6,294,924!
784
        if (code) {
6,282,125!
UNCOV
785
          return code;
×
786
        }
787

788
        if (pColVal->value.nData) {
6,282,125✔
789
          memcpy(pColVal->value.pData, pVal, pColVal->value.nData);
6,218,327✔
790
        }
791
      }
792

793
      if (taosArrayPush(pMerger->pArray, pColVal) == NULL) {
379,770,541!
UNCOV
794
        return terrno;
×
795
      }
796
    }
797

798
    for (; iCol < pMerger->pTSchema->numOfCols; ++iCol) {
42,910,089✔
799
      pTColumn = &pMerger->pTSchema->columns[iCol];
4✔
800
      if (taosArrayPush(pMerger->pArray, &COL_VAL_NONE(pTColumn->colId, pTColumn->type)) == NULL) {
8!
UNCOV
801
        return terrno;
×
802
      }
803
    }
804

805
    pMerger->version = key.version;
42,910,085✔
806
    return 0;
42,910,085✔
807
  } else {
808
    for (iCol = 1; iCol < pMerger->pTSchema->numOfCols && jCol < pTSchema->numOfCols; ++iCol) {
78,577,145✔
809
      pTColumn = &pMerger->pTSchema->columns[iCol];
72,222,897✔
810
      if (pTSchema->columns[jCol].colId < pTColumn->colId) {
72,222,897!
811
        ++jCol;
×
812
        --iCol;
×
UNCOV
813
        continue;
×
814
      } else if (pTSchema->columns[jCol].colId > pTColumn->colId) {
72,222,897!
UNCOV
815
        continue;
×
816
      }
817

818
      tsdbRowGetColVal(pRow, pTSchema, jCol++, pColVal);
72,222,897✔
819

820
      if (key.version > pMerger->version) {
72,126,043✔
821
        if (!COL_VAL_IS_NONE(pColVal)) {
72,073,142✔
822
          if (IS_VAR_DATA_TYPE(pColVal->value.type)) {
30,746,045!
823
            SColVal *pTColVal = taosArrayGet(pMerger->pArray, iCol);
6,248,719✔
824
            if (!pTColVal) return terrno;
6,432,765!
825
            if (!COL_VAL_IS_NULL(pColVal)) {
6,435,010✔
826
              code = tRealloc(&pTColVal->value.pData, pColVal->value.nData);
6,230,377✔
827
              if (code) return code;
6,238,733!
828

829
              pTColVal->value.nData = pColVal->value.nData;
6,238,733✔
830
              if (pTColVal->value.nData) {
6,238,733✔
831
                memcpy(pTColVal->value.pData, pColVal->value.pData, pTColVal->value.nData);
6,205,764✔
832
              }
833
              pTColVal->flag = 0;
6,238,733✔
834
            } else {
835
              tFree(pTColVal->value.pData);
204,633!
836
              taosArraySet(pMerger->pArray, iCol, pColVal);
204,662✔
837
            }
838
          } else {
839
            taosArraySet(pMerger->pArray, iCol, pColVal);
18,063,561✔
840
          }
841
        }
842
      } else if (key.version < pMerger->version) {
52,901!
843
        SColVal *tColVal = (SColVal *)taosArrayGet(pMerger->pArray, iCol);
52,901✔
844
        if (COL_VAL_IS_NONE(tColVal) && !COL_VAL_IS_NONE(pColVal)) {
52,901!
845
          if ((!COL_VAL_IS_NULL(pColVal)) && IS_VAR_DATA_TYPE(pColVal->value.type)) {
×
846
            code = tRealloc(&tColVal->value.pData, pColVal->value.nData);
×
UNCOV
847
            if (code) return code;
×
848

849
            tColVal->value.nData = pColVal->value.nData;
×
850
            if (pColVal->value.nData) {
×
UNCOV
851
              memcpy(tColVal->value.pData, pColVal->value.pData, pColVal->value.nData);
×
852
            }
UNCOV
853
            tColVal->flag = 0;
×
854
          } else {
UNCOV
855
            taosArraySet(pMerger->pArray, iCol, pColVal);
×
856
          }
857
        }
858
      } else {
UNCOV
859
        return TSDB_CODE_INVALID_PARA;
×
860
      }
861
    }
862

863
    pMerger->version = key.version;
6,354,248✔
864
    return code;
6,354,248✔
865
  }
866
}
867

868
int32_t tsdbRowMergerInit(SRowMerger *pMerger, STSchema *pSchema) {
1,098,001✔
869
  pMerger->pTSchema = pSchema;
1,098,001✔
870
  pMerger->pArray = taosArrayInit(pSchema->numOfCols, sizeof(SColVal));
1,098,001✔
871
  if (pMerger->pArray == NULL) {
1,099,584!
UNCOV
872
    return terrno;
×
873
  } else {
874
    return TSDB_CODE_SUCCESS;
1,099,668✔
875
  }
876
}
877

878
void tsdbRowMergerClear(SRowMerger *pMerger) {
42,258,930✔
879
  for (int32_t iCol = 1; iCol < pMerger->pTSchema->numOfCols; iCol++) {
227,724,968✔
880
    SColVal *pTColVal = taosArrayGet(pMerger->pArray, iCol);
185,613,005✔
881
    if (IS_VAR_DATA_TYPE(pTColVal->value.type)) {
185,432,196!
882
      tFree(pTColVal->value.pData);
73,671,697!
883
    }
884
  }
885

886
  taosArrayClear(pMerger->pArray);
42,111,963✔
887
}
41,066,463✔
888

889
void tsdbRowMergerCleanup(SRowMerger *pMerger) {
1,139,695✔
890
  int32_t numOfCols = taosArrayGetSize(pMerger->pArray);
1,139,695✔
891
  for (int32_t iCol = 1; iCol < numOfCols; iCol++) {
1,139,916!
892
    SColVal *pTColVal = taosArrayGet(pMerger->pArray, iCol);
×
893
    if (IS_VAR_DATA_TYPE(pTColVal->value.type)) {
×
UNCOV
894
      tFree(pTColVal->value.pData);
×
895
    }
896
  }
897

898
  taosArrayDestroy(pMerger->pArray);
1,139,916✔
899
}
1,139,965✔
900

901
int32_t tsdbRowMergerGetRow(SRowMerger *pMerger, SRow **ppRow) {
41,211,136✔
902
  return tRowBuild(pMerger->pArray, pMerger->pTSchema, ppRow);
41,211,136✔
903
}
904

905
// delete skyline ======================================================
906
static void tsdbMergeSkyline(SArray *pSkyline1, SArray *pSkyline2, SArray *pSkyline) {
259,599✔
907
  int32_t  i1 = 0;
259,599✔
908
  int32_t  n1 = taosArrayGetSize(pSkyline1);
259,599✔
909
  int32_t  i2 = 0;
259,599✔
910
  int32_t  n2 = taosArrayGetSize(pSkyline2);
259,599✔
911
  TSDBKEY *pKey1;
912
  TSDBKEY *pKey2;
913
  int64_t  version1 = 0;
259,599✔
914
  int64_t  version2 = 0;
259,599✔
915

916
  taosArrayClear(pSkyline);
259,599✔
917
  TSDBKEY **pItem = TARRAY_GET_ELEM(pSkyline, 0);
259,599✔
918

919
  while (i1 < n1 && i2 < n2) {
1,301,243✔
920
    pKey1 = (TSDBKEY *)taosArrayGetP(pSkyline1, i1);
1,041,644✔
921
    pKey2 = (TSDBKEY *)taosArrayGetP(pSkyline2, i2);
1,041,644✔
922

923
    if (pKey1->ts < pKey2->ts) {
1,041,644✔
924
      version1 = pKey1->version;
399,639✔
925
      *pItem = pKey1;
399,639✔
926
      i1++;
399,639✔
927
    } else if (pKey1->ts > pKey2->ts) {
642,005✔
928
      version2 = pKey2->version;
230,512✔
929
      *pItem = pKey2;
230,512✔
930
      i2++;
230,512✔
931
    } else {
932
      version1 = pKey1->version;
411,493✔
933
      version2 = pKey2->version;
411,493✔
934
      *pItem = pKey1;
411,493✔
935
      i1++;
411,493✔
936
      i2++;
411,493✔
937
    }
938

939
    (*pItem)->version = TMAX(version1, version2);
1,041,644✔
940
    pItem++;
1,041,644✔
941
  }
942

943
  while (i1 < n1) {
339,812✔
944
    pKey1 = (TSDBKEY *)taosArrayGetP(pSkyline1, i1);
80,213✔
945
    *pItem = pKey1;
80,213✔
946
    pItem++;
80,213✔
947
    i1++;
80,213✔
948
  }
949

950
  while (i2 < n2) {
413,430✔
951
    pKey2 = (TSDBKEY *)taosArrayGetP(pSkyline2, i2);
153,831✔
952
    *pItem = pKey2;
153,831✔
953
    pItem++;
153,831✔
954
    i2++;
153,831✔
955
  }
956

957
  pSkyline->size = TARRAY_ELEM_IDX(pSkyline, pItem);
259,599✔
958
}
259,599✔
959

960
int32_t tsdbBuildDeleteSkylineImpl(SArray *aSkyline, int32_t sidx, int32_t eidx, SArray *pSkyline) {
543,160✔
961
  int32_t   code = 0;
543,160✔
962
  SDelData *pDelData;
963
  int32_t   midx;
964

965
  taosArrayClear(pSkyline);
543,160✔
966
  if (sidx == eidx) {
543,160✔
967
    TSDBKEY *pItem1 = taosArrayGet(aSkyline, sidx * 2);
283,561✔
968
    TSDBKEY *pItem2 = taosArrayGet(aSkyline, sidx * 2 + 1);
283,561✔
969
    if (taosArrayPush(pSkyline, &pItem1) == NULL) {
283,561!
UNCOV
970
      return terrno;
×
971
    }
972

973
    if (taosArrayPush(pSkyline, &pItem2) == NULL) {
283,561!
UNCOV
974
      return terrno;
×
975
    }
976
  } else {
977
    SArray *pSkyline1 = NULL;
259,599✔
978
    SArray *pSkyline2 = NULL;
259,599✔
979
    midx = (sidx + eidx) / 2;
259,599✔
980

981
    pSkyline1 = taosArrayInit((midx - sidx + 1) * 2, POINTER_BYTES);
259,599✔
982
    if (pSkyline1 == NULL) {
259,599!
UNCOV
983
      return terrno;
×
984
    }
985
    pSkyline2 = taosArrayInit((eidx - midx) * 2, POINTER_BYTES);
259,599✔
986
    if (pSkyline2 == NULL) {
259,599!
987
      taosArrayDestroy(pSkyline1);
×
UNCOV
988
      return terrno;
×
989
    }
990

991
    code = tsdbBuildDeleteSkylineImpl(aSkyline, sidx, midx, pSkyline1);
259,599✔
992
    if (code) goto _clear;
259,599!
993

994
    code = tsdbBuildDeleteSkylineImpl(aSkyline, midx + 1, eidx, pSkyline2);
259,599✔
995
    if (code) goto _clear;
259,599!
996

997
    tsdbMergeSkyline(pSkyline1, pSkyline2, pSkyline);
259,599✔
998

999
  _clear:
259,599✔
1000
    taosArrayDestroy(pSkyline1);
259,599✔
1001
    taosArrayDestroy(pSkyline2);
259,599✔
1002
  }
1003

1004
  return code;
543,160✔
1005
}
1006

1007
int32_t tsdbBuildDeleteSkyline(SArray *aDelData, int32_t sidx, int32_t eidx, SArray *aSkyline) {
23,962✔
1008
  SDelData *pDelData;
1009
  int32_t   code = 0;
23,962✔
1010
  int32_t   dataNum = eidx - sidx + 1;
23,962✔
1011
  SArray   *aTmpSkyline = taosArrayInit(dataNum * 2, sizeof(TSDBKEY));
23,962✔
1012
  if (aTmpSkyline == NULL) {
23,962!
UNCOV
1013
    return terrno;
×
1014
  }
1015

1016
  SArray *pSkyline = taosArrayInit(dataNum * 2, POINTER_BYTES);
23,962✔
1017
  if (pSkyline == NULL) {
23,962!
1018
    taosArrayDestroy(aTmpSkyline);
×
UNCOV
1019
    return terrno;
×
1020
  }
1021

1022
  taosArrayClear(aSkyline);
23,962✔
1023
  for (int32_t i = sidx; i <= eidx; ++i) {
307,523✔
1024
    pDelData = (SDelData *)taosArrayGet(aDelData, i);
283,561✔
1025
    if (taosArrayPush(aTmpSkyline, &(TSDBKEY){.ts = pDelData->sKey, .version = pDelData->version}) == NULL) {
567,122!
1026
      code = terrno;
×
UNCOV
1027
      goto _clear;
×
1028
    }
1029

1030
    if (taosArrayPush(aTmpSkyline, &(TSDBKEY){.ts = pDelData->eKey, .version = 0}) == NULL) {
567,122!
1031
      code = terrno;
×
UNCOV
1032
      goto _clear;
×
1033
    }
1034
  }
1035

1036
  code = tsdbBuildDeleteSkylineImpl(aTmpSkyline, sidx, eidx, pSkyline);
23,962✔
1037
  if (code) goto _clear;
23,962!
1038

1039
  int32_t skylineNum = taosArrayGetSize(pSkyline);
23,962✔
1040
  for (int32_t i = 0; i < skylineNum; ++i) {
179,591✔
1041
    TSDBKEY *p = taosArrayGetP(pSkyline, i);
155,629✔
1042
    if (taosArrayPush(aSkyline, p) == NULL) {
155,629!
1043
      code = terrno;
×
UNCOV
1044
      goto _clear;
×
1045
    }
1046
  }
1047

1048
_clear:
23,962✔
1049
  taosArrayDestroy(aTmpSkyline);
23,962✔
1050
  taosArrayDestroy(pSkyline);
23,962✔
1051

1052
  return code;
23,962✔
1053
}
1054

1055
/*
1056
int32_t tsdbBuildDeleteSkyline2(SArray *aDelData, int32_t sidx, int32_t eidx, SArray *aSkyline) {
1057
  int32_t   code = 0;
1058
  SDelData *pDelData;
1059
  int32_t   midx;
1060

1061
  taosArrayClear(aSkyline);
1062
  if (sidx == eidx) {
1063
    pDelData = (SDelData *)taosArrayGet(aDelData, sidx);
1064
    taosArrayPush(aSkyline, &(TSDBKEY){.ts = pDelData->sKey, .version = pDelData->version});
1065
    taosArrayPush(aSkyline, &(TSDBKEY){.ts = pDelData->eKey, .version = 0});
1066
  } else {
1067
    SArray *aSkyline1 = NULL;
1068
    SArray *aSkyline2 = NULL;
1069

1070
    aSkyline1 = taosArrayInit(0, sizeof(TSDBKEY));
1071
    aSkyline2 = taosArrayInit(0, sizeof(TSDBKEY));
1072
    if (aSkyline1 == NULL || aSkyline2 == NULL) {
1073
      code = TSDB_CODE_OUT_OF_MEMORY;
1074
      goto _clear;
1075
    }
1076
    midx = (sidx + eidx) / 2;
1077

1078
    code = tsdbBuildDeleteSkyline(aDelData, sidx, midx, aSkyline1);
1079
    if (code) goto _clear;
1080

1081
    code = tsdbBuildDeleteSkyline(aDelData, midx + 1, eidx, aSkyline2);
1082
    if (code) goto _clear;
1083

1084
    code = tsdbMergeSkyline(aSkyline1, aSkyline2, aSkyline);
1085

1086
  _clear:
1087
    taosArrayDestroy(aSkyline1);
1088
    taosArrayDestroy(aSkyline2);
1089
  }
1090

1091
  return code;
1092
}
1093
*/
1094

1095
// SBlockData ======================================================
1096
int32_t tBlockDataCreate(SBlockData *pBlockData) {
4,342,201✔
1097
  pBlockData->suid = 0;
4,342,201✔
1098
  pBlockData->uid = 0;
4,342,201✔
1099
  pBlockData->nRow = 0;
4,342,201✔
1100
  pBlockData->aUid = NULL;
4,342,201✔
1101
  pBlockData->aVersion = NULL;
4,342,201✔
1102
  pBlockData->aTSKEY = NULL;
4,342,201✔
1103
  pBlockData->nColData = 0;
4,342,201✔
1104
  pBlockData->aColData = NULL;
4,342,201✔
1105
  return 0;
4,342,201✔
1106
}
1107

1108
void tBlockDataDestroy(SBlockData *pBlockData) {
4,755,510✔
1109
  tFree(pBlockData->aUid);
4,755,510!
1110
  tFree(pBlockData->aVersion);
4,755,608!
1111
  tFree(pBlockData->aTSKEY);
4,755,614✔
1112

1113
  for (int32_t i = 0; i < pBlockData->nColData; i++) {
8,284,568✔
1114
    tColDataDestroy(&pBlockData->aColData[i]);
3,528,978✔
1115
  }
1116

1117
  if (pBlockData->aColData) {
4,755,590✔
1118
    taosMemoryFree(pBlockData->aColData);
901,430!
1119
    pBlockData->aColData = NULL;
901,438✔
1120
  }
1121
}
4,755,598✔
1122

1123
static int32_t tBlockDataAdjustColData(SBlockData *pBlockData, int32_t nColData) {
176,810✔
1124
  int32_t code = 0;
176,810✔
1125

1126
  if (pBlockData->nColData > nColData) {
176,810✔
1127
    for (int32_t i = nColData; i < pBlockData->nColData; i++) {
1,707✔
1128
      tColDataDestroy(&pBlockData->aColData[i]);
1,345✔
1129
    }
1130
  } else if (pBlockData->nColData < nColData) {
176,448✔
1131
    SColData *aColData = taosMemoryRealloc(pBlockData->aColData, sizeof(SBlockData) * nColData);
163,996!
1132
    if (aColData == NULL) {
164,040!
1133
      code = terrno;
×
UNCOV
1134
      goto _exit;
×
1135
    }
1136

1137
    pBlockData->aColData = aColData;
164,040✔
1138
    memset(&pBlockData->aColData[pBlockData->nColData], 0, sizeof(SBlockData) * (nColData - pBlockData->nColData));
164,040✔
1139
  }
1140
  pBlockData->nColData = nColData;
176,854✔
1141

1142
_exit:
176,854✔
1143
  return code;
176,854✔
1144
}
1145
int32_t tBlockDataInit(SBlockData *pBlockData, TABLEID *pId, STSchema *pTSchema, int16_t *aCid, int32_t nCid) {
176,827✔
1146
  int32_t code = 0;
176,827✔
1147

1148
  if (!pId->suid && !pId->uid) {
176,827!
UNCOV
1149
    return TSDB_CODE_INVALID_PARA;
×
1150
  }
1151

1152
  pBlockData->suid = pId->suid;
176,827✔
1153
  pBlockData->uid = pId->uid;
176,827✔
1154
  pBlockData->nRow = 0;
176,827✔
1155

1156
  if (aCid) {
176,827!
1157
    code = tBlockDataAdjustColData(pBlockData, nCid);
×
UNCOV
1158
    if (code) goto _exit;
×
1159

1160
    int32_t   iColumn = 1;
×
1161
    STColumn *pTColumn = &pTSchema->columns[iColumn];
×
UNCOV
1162
    for (int32_t iCid = 0; iCid < nCid; iCid++) {
×
1163
      // aCid array (from taos client catalog) contains columns that does not exist in the pTSchema. the pTSchema is
1164
      // newer
1165
      if (pTColumn == NULL) {
×
UNCOV
1166
        continue;
×
1167
      }
1168

1169
      while (pTColumn->colId < aCid[iCid]) {
×
1170
        iColumn++;
×
1171
        if (!(iColumn < pTSchema->numOfCols)) {
×
UNCOV
1172
          return TSDB_CODE_INVALID_PARA;
×
1173
        }
UNCOV
1174
        pTColumn = &pTSchema->columns[iColumn];
×
1175
      }
1176

1177
      if (pTColumn->colId != aCid[iCid]) {
×
UNCOV
1178
        continue;
×
1179
      }
1180

UNCOV
1181
      tColDataInit(&pBlockData->aColData[iCid], pTColumn->colId, pTColumn->type, pTColumn->flags);
×
1182

1183
      iColumn++;
×
UNCOV
1184
      pTColumn = (iColumn < pTSchema->numOfCols) ? &pTSchema->columns[iColumn] : NULL;
×
1185
    }
1186
  } else {
1187
    code = tBlockDataAdjustColData(pBlockData, pTSchema->numOfCols - 1);
176,827✔
1188
    if (code) goto _exit;
176,822!
1189

1190
    for (int32_t iColData = 0; iColData < pBlockData->nColData; iColData++) {
1,774,446✔
1191
      STColumn *pTColumn = &pTSchema->columns[iColData + 1];
1,597,634✔
1192
      tColDataInit(&pBlockData->aColData[iColData], pTColumn->colId, pTColumn->type, pTColumn->flags);
1,597,634✔
1193
    }
1194
  }
1195

1196
_exit:
176,812✔
1197
  return code;
176,812✔
1198
}
1199

1200
void tBlockDataReset(SBlockData *pBlockData) {
3,440,418✔
1201
  pBlockData->suid = 0;
3,440,418✔
1202
  pBlockData->uid = 0;
3,440,418✔
1203
  pBlockData->nRow = 0;
3,440,418✔
1204
  for (int32_t i = 0; i < pBlockData->nColData; i++) {
4,694,882✔
1205
    tColDataDestroy(&pBlockData->aColData[i]);
1,254,421✔
1206
  }
1207
  pBlockData->nColData = 0;
3,440,461✔
1208
  taosMemoryFreeClear(pBlockData->aColData);
3,440,461✔
1209
}
3,440,488✔
1210

1211
void tBlockDataClear(SBlockData *pBlockData) {
294,339✔
1212
  pBlockData->nRow = 0;
294,339✔
1213
  for (int32_t iColData = 0; iColData < pBlockData->nColData; iColData++) {
2,172,233✔
1214
    tColDataClear(tBlockDataGetColDataByIdx(pBlockData, iColData));
1,877,848✔
1215
  }
1216
}
294,385✔
1217

1218
int32_t tBlockDataAddColData(SBlockData *pBlockData, int16_t cid, int8_t type, int8_t cflag, SColData **ppColData) {
3,289,290✔
1219
  if (pBlockData->nColData != 0 && pBlockData->aColData[pBlockData->nColData - 1].cid >= cid) {
3,289,290!
UNCOV
1220
    return TSDB_CODE_INVALID_PARA;
×
1221
  }
1222

1223
  SColData *newColData = taosMemoryRealloc(pBlockData->aColData, sizeof(SColData) * (pBlockData->nColData + 1));
3,289,290!
1224
  if (newColData == NULL) {
3,290,686!
UNCOV
1225
    return terrno;
×
1226
  }
1227

1228
  pBlockData->aColData = newColData;
3,290,686✔
1229
  pBlockData->nColData++;
3,290,686✔
1230

1231
  *ppColData = &pBlockData->aColData[pBlockData->nColData - 1];
3,290,686✔
1232
  memset(*ppColData, 0, sizeof(SColData));
3,290,686✔
1233
  tColDataInit(*ppColData, cid, type, cflag);
3,290,686✔
1234

1235
  return 0;
3,290,338✔
1236
}
1237

1238
/* flag > 0: forward update
1239
 * flag == 0: insert
1240
 * flag < 0: backward update
1241
 */
1242
static int32_t tBlockDataUpsertBlockRow(SBlockData *pBlockData, SBlockData *pBlockDataFrom, int32_t iRow,
35,634,021✔
1243
                                        int32_t flag) {
1244
  int32_t code = 0;
35,634,021✔
1245

1246
  SColVal   cv = {0};
35,634,021✔
1247
  int32_t   iColDataFrom = 0;
35,634,021✔
1248
  SColData *pColDataFrom = (iColDataFrom < pBlockDataFrom->nColData) ? &pBlockDataFrom->aColData[iColDataFrom] : NULL;
35,634,021✔
1249

1250
  for (int32_t iColDataTo = 0; iColDataTo < pBlockData->nColData; iColDataTo++) {
204,536,403✔
1251
    SColData *pColDataTo = &pBlockData->aColData[iColDataTo];
168,273,175✔
1252

1253
    while (pColDataFrom && pColDataFrom->cid < pColDataTo->cid) {
168,273,189✔
1254
      pColDataFrom = (++iColDataFrom < pBlockDataFrom->nColData) ? &pBlockDataFrom->aColData[iColDataFrom] : NULL;
14!
1255
    }
1256

1257
    if (pColDataFrom == NULL || pColDataFrom->cid > pColDataTo->cid) {
168,936,127✔
1258
      cv = COL_VAL_NONE(pColDataTo->cid, pColDataTo->type);
662,966✔
1259
      if (flag == 0 && (code = tColDataAppendValue(pColDataTo, &cv))) goto _exit;
662,966!
1260
    } else {
1261
      code = tColDataGetValue(pColDataFrom, iRow, &cv);
167,610,209✔
1262
      if (code) goto _exit;
172,003,642!
1263

1264
      if (flag) {
172,003,642✔
1265
        code = tColDataUpdateValue(pColDataTo, &cv, flag > 0);
1,276,685✔
1266
      } else {
1267
        code = tColDataAppendValue(pColDataTo, &cv);
170,726,957✔
1268
      }
1269
      if (code) goto _exit;
168,239,430!
1270

1271
      pColDataFrom = (++iColDataFrom < pBlockDataFrom->nColData) ? &pBlockDataFrom->aColData[iColDataFrom] : NULL;
168,239,430✔
1272
    }
1273
  }
1274

1275
_exit:
36,263,228✔
1276
  return code;
36,263,228✔
1277
}
1278

1279
int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema, int64_t uid) {
111,868,541✔
1280
  int32_t code = 0;
111,868,541✔
1281

1282
  if (!(pBlockData->suid || pBlockData->uid)) {
111,868,541!
UNCOV
1283
    return TSDB_CODE_INVALID_PARA;
×
1284
  }
1285

1286
  // uid
1287
  if (pBlockData->uid == 0) {
111,868,541✔
1288
    if (!uid) {
59,589,437!
UNCOV
1289
      return TSDB_CODE_INVALID_PARA;
×
1290
    }
1291
    code = tRealloc((uint8_t **)&pBlockData->aUid, sizeof(int64_t) * (pBlockData->nRow + 1));
59,589,437!
1292
    if (code) goto _exit;
60,020,303!
1293
    pBlockData->aUid[pBlockData->nRow] = uid;
60,020,303✔
1294
  }
1295
  // version
1296
  code = tRealloc((uint8_t **)&pBlockData->aVersion, sizeof(int64_t) * (pBlockData->nRow + 1));
112,299,407✔
1297
  if (code) goto _exit;
112,408,476!
1298
  pBlockData->aVersion[pBlockData->nRow] = TSDBROW_VERSION(pRow);
112,408,476✔
1299
  // timestamp
1300
  code = tRealloc((uint8_t **)&pBlockData->aTSKEY, sizeof(TSKEY) * (pBlockData->nRow + 1));
112,408,476✔
1301
  if (code) goto _exit;
112,649,848!
1302
  pBlockData->aTSKEY[pBlockData->nRow] = TSDBROW_TS(pRow);
112,649,848✔
1303

1304
  if (pRow->type == TSDBROW_ROW_FMT) {
112,649,848✔
1305
    code = tRowUpsertColData(pRow->pTSRow, pTSchema, pBlockData->aColData, pBlockData->nColData, 0 /* append */);
77,816,711✔
1306
    if (code) goto _exit;
78,030,568!
1307
  } else if (pRow->type == TSDBROW_COL_FMT) {
34,833,137!
1308
    code = tBlockDataUpsertBlockRow(pBlockData, pRow->pBlockData, pRow->iRow, 0 /* append */);
35,433,878✔
1309
    if (code) goto _exit;
35,593,001!
1310
  } else {
UNCOV
1311
    return TSDB_CODE_INVALID_PARA;
×
1312
  }
1313
  pBlockData->nRow++;
113,623,569✔
1314

1315
_exit:
113,623,569✔
1316
  return code;
113,623,569✔
1317
}
1318
int32_t tBlockDataUpdateRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema) {
375,277✔
1319
  int32_t code = 0;
375,277✔
1320

1321
  // version
1322
  int64_t lversion = pBlockData->aVersion[pBlockData->nRow - 1];
375,277✔
1323
  int64_t rversion = TSDBROW_VERSION(pRow);
375,277✔
1324
  if (lversion == rversion) {
375,277!
UNCOV
1325
    return TSDB_CODE_INVALID_PARA;
×
1326
  }
1327
  if (rversion > lversion) {
375,277!
1328
    pBlockData->aVersion[pBlockData->nRow - 1] = rversion;
375,330✔
1329
  }
1330

1331
  // update other rows
1332
  if (pRow->type == TSDBROW_ROW_FMT) {
375,277✔
1333
    code = tRowUpsertColData(pRow->pTSRow, pTSchema, pBlockData->aColData, pBlockData->nColData,
182,819!
1334
                             (rversion > lversion) ? 1 : -1 /* update */);
1335
    if (code) goto _exit;
185,419!
1336
  } else if (pRow->type == TSDBROW_COL_FMT) {
192,458!
1337
    code = tBlockDataUpsertBlockRow(pBlockData, pRow->pBlockData, pRow->iRow, (rversion > lversion) ? 1 : -1);
192,647!
1338
    if (code) goto _exit;
194,200!
1339
  } else {
1340
    code = TSDB_CODE_INVALID_PARA;
×
UNCOV
1341
    goto _exit;
×
1342
  }
1343

1344
_exit:
379,430✔
1345
  return code;
379,430✔
1346
}
1347

1348
#ifdef BUILD_NO_CALL
1349
int32_t tBlockDataTryUpsertRow(SBlockData *pBlockData, TSDBROW *pRow, int64_t uid) {
1350
  if (pBlockData->nRow == 0) {
1351
    return 1;
1352
  } else if (pBlockData->aTSKEY[pBlockData->nRow - 1] == TSDBROW_TS(pRow)) {
1353
    return pBlockData->nRow;
1354
  } else {
1355
    return pBlockData->nRow + 1;
1356
  }
1357
}
1358

1359
int32_t tBlockDataUpsertRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema, int64_t uid) {
1360
  if (pBlockData->nRow > 0 && pBlockData->aTSKEY[pBlockData->nRow - 1] == TSDBROW_TS(pRow)) {
1361
    return tBlockDataUpdateRow(pBlockData, pRow, pTSchema);
1362
  } else {
1363
    return tBlockDataAppendRow(pBlockData, pRow, pTSchema, uid);
1364
  }
1365
}
1366
#endif
1367

1368
SColData *tBlockDataGetColData(SBlockData *pBlockData, int16_t cid) {
226,846,552✔
1369
  int32_t lidx = 0;
226,846,552✔
1370
  int32_t ridx = pBlockData->nColData - 1;
226,846,552✔
1371

1372
  while (lidx <= ridx) {
307,545,961✔
1373
    int32_t   midx = (lidx + ridx) >> 1;
100,716,917✔
1374
    SColData *pColData = tBlockDataGetColDataByIdx(pBlockData, midx);
100,716,917✔
1375
    int32_t   c = (pColData->cid == cid) ? 0 : ((pColData->cid > cid) ? 1 : -1);
100,716,917✔
1376

1377
    if (c == 0) {
100,716,917✔
1378
      return pColData;
20,017,508✔
1379
    } else if (c < 0) {
80,699,409✔
1380
      lidx = midx + 1;
59,308,124✔
1381
    } else {
1382
      ridx = midx - 1;
21,391,285✔
1383
    }
1384
  }
1385

1386
  return NULL;
206,829,044✔
1387
}
1388

1389
/* buffers[0]: SDiskDataHdr
1390
 * buffers[1]: key part: uid + version + ts + primary keys
1391
 * buffers[2]: SBlockCol part
1392
 * buffers[3]: regular column part
1393
 */
1394
int32_t tBlockDataCompress(SBlockData *bData, void *pCompr, SBuffer *buffers, SBuffer *assist) {
173,134✔
1395
  int32_t code = 0;
173,134✔
1396
  int32_t lino = 0;
173,134✔
1397

1398
  SColCompressInfo *pInfo = pCompr;
173,134✔
1399
  code = tsdbGetColCmprAlgFromSet(pInfo->pColCmpr, 1, &pInfo->defaultCmprAlg);
173,134✔
1400
  TAOS_UNUSED(code);
1401

1402
  SDiskDataHdr hdr = {
173,148✔
1403
      .delimiter = TSDB_FILE_DLMT,
1404
      .fmtVer = 2,
1405
      .suid = bData->suid,
173,148✔
1406
      .uid = bData->uid,
173,148✔
1407
      .szUid = 0,     // filled by compress key
1408
      .szVer = 0,     // filled by compress key
1409
      .szKey = 0,     // filled by compress key
1410
      .szBlkCol = 0,  // filled by this func
1411
      .nRow = bData->nRow,
173,148✔
1412
      .cmprAlg = pInfo->defaultCmprAlg,
173,148✔
1413
      .numOfPKs = 0,  // filled by compress key
1414
  };
1415
  // Key part
1416

1417
  tBufferClear(&buffers[1]);
173,148✔
1418
  code = tBlockDataCompressKeyPart(bData, &hdr, &buffers[1], assist, (SColCompressInfo *)pInfo);
173,148✔
1419
  TSDB_CHECK_CODE(code, lino, _exit);
173,217!
1420

1421
  // Regulart column part
1422
  tBufferClear(&buffers[2]);
173,217✔
1423
  tBufferClear(&buffers[3]);
173,217✔
1424
  for (int i = 0; i < bData->nColData; i++) {
1,677,541✔
1425
    SColData *colData = tBlockDataGetColDataByIdx(bData, i);
1,504,556✔
1426

1427
    if (colData->cflag & COL_IS_KEY) {
1,504,556✔
1428
      continue;
69,964✔
1429
    }
1430
    if (colData->flag == HAS_NONE) {
1,504,503✔
1431
      continue;
69,911✔
1432
    }
1433

1434
    SColDataCompressInfo cinfo = {
1,434,592✔
1435
        .cmprAlg = pInfo->defaultCmprAlg,
1,434,592✔
1436
    };
1437
    code = tsdbGetColCmprAlgFromSet(pInfo->pColCmpr, colData->cid, &cinfo.cmprAlg);
1,434,592✔
1438
    if (code < 0) {
1439
      //
1440
    }
1441

1442
    int32_t offset = buffers[3].size;
1,433,857✔
1443
    code = tColDataCompress(colData, &cinfo, &buffers[3], assist);
1,433,857✔
1444
    TSDB_CHECK_CODE(code, lino, _exit);
1,433,921!
1445

1446
    SBlockCol blockCol = (SBlockCol){.cid = cinfo.columnId,
1,433,921✔
1447
                                     .type = cinfo.dataType,
1,433,921✔
1448
                                     .cflag = cinfo.columnFlag,
1,433,921✔
1449
                                     .flag = cinfo.flag,
1,433,921✔
1450
                                     .szOrigin = cinfo.dataOriginalSize,
1,433,921✔
1451
                                     .szBitmap = cinfo.bitmapCompressedSize,
1,433,921✔
1452
                                     .szOffset = cinfo.offsetCompressedSize,
1,433,921✔
1453
                                     .szValue = cinfo.dataCompressedSize,
1,433,921✔
1454
                                     .offset = offset,
1455
                                     .alg = cinfo.cmprAlg};
1,433,921✔
1456

1457
    code = tPutBlockCol(&buffers[2], &blockCol, hdr.fmtVer, hdr.cmprAlg);
1,433,921✔
1458
    TSDB_CHECK_CODE(code, lino, _exit);
1,434,360!
1459
  }
1460
  hdr.szBlkCol = buffers[2].size;
172,985✔
1461

1462
  // SDiskDataHdr part
1463
  tBufferClear(&buffers[0]);
1464
  code = tPutDiskDataHdr(&buffers[0], &hdr);
172,985✔
1465
  TSDB_CHECK_CODE(code, lino, _exit);
173,206!
1466

1467
_exit:
173,206✔
1468
  return code;
173,206✔
1469
}
1470

1471
int32_t tBlockDataDecompress(SBufferReader *br, SBlockData *blockData, SBuffer *assist) {
17,829✔
1472
  int32_t       code = 0;
17,829✔
1473
  int32_t       lino = 0;
17,829✔
1474
  SDiskDataHdr  hdr = {0};
17,829✔
1475
  SCompressInfo cinfo;
1476

1477
  // SDiskDataHdr
1478
  code = tGetDiskDataHdr(br, &hdr);
17,829✔
1479
  TSDB_CHECK_CODE(code, lino, _exit);
17,833!
1480

1481
  tBlockDataReset(blockData);
17,833✔
1482
  blockData->suid = hdr.suid;
17,832✔
1483
  blockData->uid = hdr.uid;
17,832✔
1484
  blockData->nRow = hdr.nRow;
17,832✔
1485

1486
  // Key part
1487
  code = tBlockDataDecompressKeyPart(&hdr, br, blockData, assist);
17,832✔
1488
  TSDB_CHECK_CODE(code, lino, _exit);
17,832!
1489

1490
  // Column part
1491
  SBufferReader br2 = *br;
17,832✔
1492
  br->offset += hdr.szBlkCol;
17,832✔
1493
  for (uint32_t startOffset = br2.offset; br2.offset - startOffset < hdr.szBlkCol;) {
111,701✔
1494
    SBlockCol blockCol;
1495

1496
    code = tGetBlockCol(&br2, &blockCol, hdr.fmtVer, hdr.cmprAlg);
93,863✔
1497
    TSDB_CHECK_CODE(code, lino, _exit);
93,834!
1498
    if (blockCol.alg == 0) blockCol.alg = hdr.cmprAlg;
93,834!
1499
    code = tBlockDataDecompressColData(&hdr, &blockCol, br, blockData, assist);
93,834✔
1500
    TSDB_CHECK_CODE(code, lino, _exit);
93,869!
1501
  }
1502

1503
_exit:
17,838✔
1504
  return code;
17,838✔
1505
}
1506

1507
// SDiskDataHdr ==============================
1508
int32_t tPutDiskDataHdr(SBuffer *buffer, const SDiskDataHdr *pHdr) {
173,232✔
1509
  int32_t code;
1510

1511
  if ((code = tBufferPutU32(buffer, pHdr->delimiter))) return code;
346,478!
1512
  if ((code = tBufferPutU32v(buffer, pHdr->fmtVer))) return code;
346,492!
1513
  if ((code = tBufferPutI64(buffer, pHdr->suid))) return code;
346,492!
1514
  if ((code = tBufferPutI64(buffer, pHdr->uid))) return code;
346,492!
1515
  if ((code = tBufferPutI32v(buffer, pHdr->szUid))) return code;
346,492!
1516
  if ((code = tBufferPutI32v(buffer, pHdr->szVer))) return code;
346,492!
1517
  if ((code = tBufferPutI32v(buffer, pHdr->szKey))) return code;
346,492!
1518
  if ((code = tBufferPutI32v(buffer, pHdr->szBlkCol))) return code;
346,492!
1519
  if ((code = tBufferPutI32v(buffer, pHdr->nRow))) return code;
346,492!
1520
  if (pHdr->fmtVer < 2) {
173,246!
UNCOV
1521
    if ((code = tBufferPutI8(buffer, pHdr->cmprAlg))) return code;
×
1522
  } else if (pHdr->fmtVer == 2) {
173,246✔
1523
    if ((code = tBufferPutU32(buffer, pHdr->cmprAlg))) return code;
346,410!
1524
  } else {
1525
    // more data fmt ver
1526
  }
1527
  if (pHdr->fmtVer >= 1) {
173,246✔
1528
    if ((code = tBufferPutI8(buffer, pHdr->numOfPKs))) return code;
346,372!
1529
    for (int i = 0; i < pHdr->numOfPKs; i++) {
173,239✔
1530
      if ((code = tPutBlockCol(buffer, &pHdr->primaryBlockCols[i], pHdr->fmtVer, pHdr->cmprAlg))) return code;
53!
1531
    }
1532
  }
1533

1534
  return 0;
173,246✔
1535
}
1536

1537
int32_t tGetDiskDataHdr(SBufferReader *br, SDiskDataHdr *pHdr) {
1,406,246✔
1538
  int32_t code;
1539

1540
  if ((code = tBufferGetU32(br, &pHdr->delimiter))) return code;
1,406,246!
1541
  if ((code = tBufferGetU32v(br, &pHdr->fmtVer))) return code;
1,406,659!
1542
  if ((code = tBufferGetI64(br, &pHdr->suid))) return code;
1,406,966!
1543
  if ((code = tBufferGetI64(br, &pHdr->uid))) return code;
1,406,173!
1544
  if ((code = tBufferGetI32v(br, &pHdr->szUid))) return code;
1,406,800!
1545
  if ((code = tBufferGetI32v(br, &pHdr->szVer))) return code;
1,407,084!
1546
  if ((code = tBufferGetI32v(br, &pHdr->szKey))) return code;
1,407,122!
1547
  if ((code = tBufferGetI32v(br, &pHdr->szBlkCol))) return code;
1,406,815!
1548
  if ((code = tBufferGetI32v(br, &pHdr->nRow))) return code;
1,407,201!
1549
  if (pHdr->fmtVer < 2) {
1,407,122!
1550
    int8_t cmprAlg = 0;
×
1551
    if ((code = tBufferGetI8(br, &cmprAlg))) return code;
×
UNCOV
1552
    pHdr->cmprAlg = cmprAlg;
×
1553
  } else if (pHdr->fmtVer == 2) {
1,407,122✔
1554
    if ((code = tBufferGetU32(br, &pHdr->cmprAlg))) return code;
1,406,462!
1555
  } else {
1556
    // more data fmt ver
1557
  }
1558
  if (pHdr->fmtVer >= 1) {
1,407,504✔
1559
    if ((code = tBufferGetI8(br, &pHdr->numOfPKs))) return code;
1,406,910!
1560
    for (int i = 0; i < pHdr->numOfPKs; i++) {
1,406,355✔
1561
      if ((code = tGetBlockCol(br, &pHdr->primaryBlockCols[i], pHdr->fmtVer, pHdr->cmprAlg))) {
53!
UNCOV
1562
        return code;
×
1563
      }
1564
    }
1565
  } else {
1566
    pHdr->numOfPKs = 0;
594✔
1567
  }
1568

1569
  return 0;
1,406,896✔
1570
}
1571

1572
// ALGORITHM ==============================
1573
int32_t tPutColumnDataAgg(SBuffer *buffer, SColumnDataAgg *pColAgg) {
71,309✔
1574
  int32_t code;
1575

1576
  if ((code = tBufferPutI16v(buffer, pColAgg->colId))) return code;
142,618!
1577
  if ((code = tBufferPutI16v(buffer, pColAgg->numOfNull))) return code;
142,618!
1578
  if ((code = tBufferPutI64(buffer, pColAgg->sum))) return code;
142,618!
1579
  if ((code = tBufferPutI64(buffer, pColAgg->max))) return code;
142,618!
1580
  if ((code = tBufferPutI64(buffer, pColAgg->min))) return code;
142,618!
1581

1582
  return 0;
71,309✔
1583
}
1584

1585
int32_t tGetColumnDataAgg(SBufferReader *br, SColumnDataAgg *pColAgg) {
95,763✔
1586
  int32_t code;
1587

1588
  if ((code = tBufferGetI16v(br, &pColAgg->colId))) return code;
95,763!
1589
  if ((code = tBufferGetI16v(br, &pColAgg->numOfNull))) return code;
95,761!
1590
  if ((code = tBufferGetI64(br, &pColAgg->sum))) return code;
95,762!
1591
  if ((code = tBufferGetI64(br, &pColAgg->max))) return code;
95,762!
1592
  if ((code = tBufferGetI64(br, &pColAgg->min))) return code;
95,762!
1593

1594
  return 0;
95,759✔
1595
}
1596

1597
static int32_t tBlockDataCompressKeyPart(SBlockData *bData, SDiskDataHdr *hdr, SBuffer *buffer, SBuffer *assist,
173,137✔
1598
                                         SColCompressInfo *compressInfo) {
1599
  int32_t       code = 0;
173,137✔
1600
  int32_t       lino = 0;
173,137✔
1601
  SCompressInfo cinfo;
1602

1603
  // uid
1604
  if (bData->uid == 0) {
173,137✔
1605
    cinfo = (SCompressInfo){
146,240✔
1606
        .cmprAlg = hdr->cmprAlg,
146,240✔
1607
        .dataType = TSDB_DATA_TYPE_BIGINT,
1608
        .originalSize = sizeof(int64_t) * bData->nRow,
146,240✔
1609
    };
1610
    code = tCompressDataToBuffer(bData->aUid, &cinfo, buffer, assist);
146,240✔
1611
    TSDB_CHECK_CODE(code, lino, _exit);
146,283!
1612
    hdr->szUid = cinfo.compressedSize;
146,283✔
1613
  }
1614

1615
  // version
1616
  cinfo = (SCompressInfo){
173,180✔
1617
      .cmprAlg = hdr->cmprAlg,
173,180✔
1618
      .dataType = TSDB_DATA_TYPE_BIGINT,
1619
      .originalSize = sizeof(int64_t) * bData->nRow,
173,180✔
1620
  };
1621
  code = tCompressDataToBuffer((uint8_t *)bData->aVersion, &cinfo, buffer, assist);
173,180✔
1622
  TSDB_CHECK_CODE(code, lino, _exit);
173,226!
1623
  hdr->szVer = cinfo.compressedSize;
173,226✔
1624

1625
  // ts
1626
  cinfo = (SCompressInfo){
173,226✔
1627
      .cmprAlg = hdr->cmprAlg,
173,226✔
1628
      .dataType = TSDB_DATA_TYPE_TIMESTAMP,
1629
      .originalSize = sizeof(TSKEY) * bData->nRow,
173,226✔
1630
  };
1631

1632
  code = tCompressDataToBuffer((uint8_t *)bData->aTSKEY, &cinfo, buffer, assist);
173,226✔
1633
  TSDB_CHECK_CODE(code, lino, _exit);
173,217!
1634
  hdr->szKey = cinfo.compressedSize;
173,217✔
1635

1636
  // primary keys
1637
  for (hdr->numOfPKs = 0; hdr->numOfPKs < bData->nColData; hdr->numOfPKs++) {
173,270✔
1638
    if (!(hdr->numOfPKs <= TD_MAX_PK_COLS)) {
173,268!
UNCOV
1639
      return TSDB_CODE_INVALID_PARA;
×
1640
    }
1641

1642
    SBlockCol *blockCol = &hdr->primaryBlockCols[hdr->numOfPKs];
173,268✔
1643
    SColData  *colData = tBlockDataGetColDataByIdx(bData, hdr->numOfPKs);
173,268✔
1644

1645
    if ((colData->cflag & COL_IS_KEY) == 0) {
173,268✔
1646
      break;
173,215✔
1647
    }
1648

1649
    SColDataCompressInfo info = {
53✔
1650
        .cmprAlg = hdr->cmprAlg,
53✔
1651
    };
1652
    code = tsdbGetColCmprAlgFromSet(compressInfo->pColCmpr, colData->cid, &info.cmprAlg);
53✔
1653
    if (code < 0) {
1654
      // do nothing
1655
    } else {
1656
    }
1657

1658
    code = tColDataCompress(colData, &info, buffer, assist);
53✔
1659
    TSDB_CHECK_CODE(code, lino, _exit);
53!
1660

1661
    *blockCol = (SBlockCol){
53✔
1662
        .cid = info.columnId,
53✔
1663
        .type = info.dataType,
53✔
1664
        .cflag = info.columnFlag,
53✔
1665
        .flag = info.flag,
53✔
1666
        .szOrigin = info.dataOriginalSize,
53✔
1667
        .szBitmap = info.bitmapCompressedSize,
53✔
1668
        .szOffset = info.offsetCompressedSize,
53✔
1669
        .szValue = info.dataCompressedSize,
53✔
1670
        .offset = 0,
1671
        .alg = info.cmprAlg,
53✔
1672
    };
1673
  }
1674

1675
_exit:
2✔
1676
  return code;
173,217✔
1677
}
1678

1679
int32_t tBlockDataDecompressColData(const SDiskDataHdr *hdr, const SBlockCol *blockCol, SBufferReader *br,
3,289,569✔
1680
                                    SBlockData *blockData, SBuffer *assist) {
1681
  int32_t code = 0;
3,289,569✔
1682
  int32_t lino = 0;
3,289,569✔
1683

1684
  SColData *colData;
1685

1686
  code = tBlockDataAddColData(blockData, blockCol->cid, blockCol->type, blockCol->cflag, &colData);
3,289,569✔
1687
  TSDB_CHECK_CODE(code, lino, _exit);
3,290,328!
1688

1689
  SColDataCompressInfo info = {
2,605,498✔
1690
      .cmprAlg = blockCol->alg,
3,290,328✔
1691
      .columnFlag = blockCol->cflag,
3,290,328✔
1692
      .flag = blockCol->flag,
3,290,328✔
1693
      .dataType = blockCol->type,
3,290,328✔
1694
      .columnId = blockCol->cid,
3,290,328✔
1695
      .numOfData = hdr->nRow,
3,290,328✔
1696
      .bitmapOriginalSize = 0,
1697
      .bitmapCompressedSize = blockCol->szBitmap,
3,290,328✔
1698
      .offsetOriginalSize = blockCol->szOffset ? sizeof(int32_t) * hdr->nRow : 0,
3,290,328✔
1699
      .offsetCompressedSize = blockCol->szOffset,
3,290,328✔
1700
      .dataOriginalSize = blockCol->szOrigin,
3,290,328✔
1701
      .dataCompressedSize = blockCol->szValue,
3,290,328✔
1702
  };
1703

1704
  switch (blockCol->flag) {
3,290,328✔
1705
    case (HAS_NONE | HAS_NULL | HAS_VALUE):
2✔
1706
      info.bitmapOriginalSize = BIT2_SIZE(hdr->nRow);
2✔
1707
      break;
2✔
1708
    case (HAS_NONE | HAS_NULL):
284,793✔
1709
    case (HAS_NONE | HAS_VALUE):
1710
    case (HAS_NULL | HAS_VALUE):
1711
      info.bitmapOriginalSize = BIT1_SIZE(hdr->nRow);
284,793✔
1712
      break;
284,793✔
1713
  }
1714

1715
  code = tColDataDecompress(BR_PTR(br), &info, colData, assist);
3,290,328✔
1716
  TSDB_CHECK_CODE(code, lino, _exit);
3,289,139!
1717
  br->offset += blockCol->szBitmap + blockCol->szOffset + blockCol->szValue;
3,289,139✔
1718

1719
_exit:
3,289,139✔
1720
  return code;
3,289,139✔
1721
}
1722

1723
int32_t tBlockDataDecompressKeyPart(const SDiskDataHdr *hdr, SBufferReader *br, SBlockData *blockData,
1,405,094✔
1724
                                    SBuffer *assist) {
1725
  int32_t       code = 0;
1,405,094✔
1726
  int32_t       lino = 0;
1,405,094✔
1727
  SCompressInfo cinfo;
1728

1729
  // uid
1730
  if (hdr->szUid > 0) {
1,405,094✔
1731
    cinfo = (SCompressInfo){
1,021,277✔
1732
        .cmprAlg = hdr->cmprAlg,
1,021,277✔
1733
        .dataType = TSDB_DATA_TYPE_BIGINT,
1734
        .compressedSize = hdr->szUid,
1,021,277✔
1735
        .originalSize = sizeof(int64_t) * hdr->nRow,
1,021,277✔
1736
    };
1737

1738
    code = tRealloc((uint8_t **)&blockData->aUid, cinfo.originalSize);
1,021,277✔
1739
    TSDB_CHECK_CODE(code, lino, _exit);
1,022,417!
1740
    code = tDecompressData(BR_PTR(br), &cinfo, blockData->aUid, cinfo.originalSize, assist);
1,022,417✔
1741
    TSDB_CHECK_CODE(code, lino, _exit);
1,021,429!
1742
    br->offset += cinfo.compressedSize;
1,021,429✔
1743
  }
1744

1745
  // version
1746
  cinfo = (SCompressInfo){
1,405,246✔
1747
      .cmprAlg = hdr->cmprAlg,
1,405,246✔
1748
      .dataType = TSDB_DATA_TYPE_BIGINT,
1749
      .compressedSize = hdr->szVer,
1,405,246✔
1750
      .originalSize = sizeof(int64_t) * hdr->nRow,
1,405,246✔
1751
  };
1752
  code = tRealloc((uint8_t **)&blockData->aVersion, cinfo.originalSize);
1,405,246✔
1753
  TSDB_CHECK_CODE(code, lino, _exit);
1,407,005!
1754
  code = tDecompressData(BR_PTR(br), &cinfo, blockData->aVersion, cinfo.originalSize, assist);
1,407,005✔
1755
  TSDB_CHECK_CODE(code, lino, _exit);
1,407,626!
1756
  br->offset += cinfo.compressedSize;
1,407,626✔
1757

1758
  // ts
1759
  cinfo = (SCompressInfo){
1,407,626✔
1760
      .cmprAlg = hdr->cmprAlg,
1,407,626✔
1761
      .dataType = TSDB_DATA_TYPE_TIMESTAMP,
1762
      .compressedSize = hdr->szKey,
1,407,626✔
1763
      .originalSize = sizeof(TSKEY) * hdr->nRow,
1,407,626✔
1764
  };
1765
  code = tRealloc((uint8_t **)&blockData->aTSKEY, cinfo.originalSize);
1,407,626✔
1766
  TSDB_CHECK_CODE(code, lino, _exit);
1,407,530!
1767
  code = tDecompressData(BR_PTR(br), &cinfo, blockData->aTSKEY, cinfo.originalSize, assist);
1,407,530✔
1768
  TSDB_CHECK_CODE(code, lino, _exit);
1,407,514!
1769
  br->offset += cinfo.compressedSize;
1,407,514✔
1770

1771
  // primary keys
1772
  for (int i = 0; i < hdr->numOfPKs; i++) {
1,407,567✔
1773
    const SBlockCol *blockCol = &hdr->primaryBlockCols[i];
53✔
1774

1775
    if (!(blockCol->flag == HAS_VALUE)) {
53!
UNCOV
1776
      TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
×
1777
    }
1778
    if (!(blockCol->cflag & COL_IS_KEY)) {
53!
UNCOV
1779
      TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
×
1780
    }
1781

1782
    code = tBlockDataDecompressColData(hdr, blockCol, br, blockData, assist);
53✔
1783
    TSDB_CHECK_CODE(code, lino, _exit);
53!
1784
  }
1785

1786
_exit:
1,407,514✔
1787
  return code;
1,407,514✔
1788
}
1789

1790
int32_t tsdbGetColCmprAlgFromSet(SHashObj *set, int16_t colId, uint32_t *alg) {
1,607,403✔
1791
  if (set == NULL) return -1;
1,607,403!
1792

1793
  uint32_t *ret = taosHashGet(set, &colId, sizeof(colId));
1,607,403✔
1794
  if (ret == NULL) {
1,606,637!
UNCOV
1795
    return TSDB_CODE_NOT_FOUND;
×
1796
  }
1797

1798
  *alg = *ret;
1,606,637✔
1799
  return 0;
1,606,637✔
1800
}
1801
uint32_t tsdbCvtTimestampAlg(uint32_t alg) {
×
UNCOV
1802
  DEFINE_VAR(alg)
×
1803

UNCOV
1804
  return 0;
×
1805
}
1806

1807
int32_t tsdbAllocateDisk(STsdb *tsdb, const char *label, int32_t expLevel, SDiskID *diskId) {
142,145✔
1808
  int32_t code = 0;
142,145✔
1809
  int32_t lino = 0;
142,145✔
1810
  SDiskID did = {0};
142,145✔
1811
  STfs   *tfs = tsdb->pVnode->pTfs;
142,145✔
1812

1813
  code = tfsAllocDisk(tfs, expLevel, label, &did);
142,145✔
1814
  if (code) {
142,226!
UNCOV
1815
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(tsdb->pVnode), __func__, __FILE__, __LINE__,
×
1816
              tstrerror(code));
UNCOV
1817
    return code;
×
1818
  }
1819

1820
  if (tfsMkdirRecurAt(tfs, tsdb->path, did) != 0) {
142,226!
UNCOV
1821
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(tsdb->pVnode), __func__, __FILE__, __LINE__,
×
1822
              tstrerror(code));
1823
  }
1824

1825
  if (diskId) {
142,167✔
1826
    *diskId = did;
142,142✔
1827
  }
1828
  return code;
142,167✔
1829
}
1830

1831
int32_t tsdbAllocateDiskAtLevel(STsdb *tsdb, int32_t level, const char *label, SDiskID *diskId) {
30✔
1832
  int32_t code = 0;
30✔
1833
  SDiskID did = {0};
30✔
1834
  STfs   *tfs = tsdb->pVnode->pTfs;
30✔
1835

1836
  code = tfsAllocDiskAtLevel(tfs, level, label, &did);
30✔
1837
  if (code) {
30!
UNCOV
1838
    return code;
×
1839
  }
1840

1841
  if (tfsMkdirRecurAt(tfs, tsdb->path, did) != 0) {
30!
UNCOV
1842
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(tsdb->pVnode), __func__, __FILE__, __LINE__,
×
1843
              tstrerror(code));
1844
  }
1845

1846
  if (diskId) {
30!
1847
    *diskId = did;
30✔
1848
  }
1849
  return 0;
30✔
1850
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc