• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3534

21 Nov 2024 07:36AM UTC coverage: 60.825% (+2.0%) from 58.848%
#3534

push

travis-ci

web-flow
Merge pull request #28810 from taosdata/ehn/add-sync-heartbeat-sent-time-to-log

ehn:add-sync-heartbeat-sent-time-to-log

120023 of 252376 branches covered (47.56%)

Branch coverage included in aggregate %.

43 of 47 new or added lines in 3 files covered. (91.49%)

2254 existing lines in 162 files now uncovered.

200876 of 275203 relevant lines covered (72.99%)

16110754.39 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

68.17
/source/dnode/vnode/src/tsdb/tsdbUtil.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tcompression.h"
17
#include "tdataformat.h"
18
#include "tsdb.h"
19
#include "tsdbDef.h"
20

21
int32_t tsdbGetColCmprAlgFromSet(SHashObj *set, int16_t colId, uint32_t *alg);
22

23
static int32_t tBlockDataCompressKeyPart(SBlockData *bData, SDiskDataHdr *hdr, SBuffer *buffer, SBuffer *assist,
24
                                         SColCompressInfo *pCompressExt);
25

26
// SMapData =======================================================================
27
// Marks the map as empty by zeroing its item count and byte count.
// The aOffset/pData buffers are not touched here.
void tMapDataReset(SMapData *pMapData) {
  pMapData->nData = 0;
  pMapData->nItem = 0;
}
×
31

32
// Releases the two buffers held by the map through tFree and nulls both pointers.
// nItem/nData are left as they are.
void tMapDataClear(SMapData *pMapData) {
  tFree(pMapData->aOffset);
  tFree(pMapData->pData);

  pMapData->aOffset = NULL;
  pMapData->pData = NULL;
}
×
38

39
#ifdef BUILD_NO_CALL
40
// Appends one encoded item to the map.
//
// tPutItemFn(NULL, pItem) is called first to obtain the encoded size, then both
// buffers are grown with tRealloc and the item is encoded at the end of pData.
// nItem/nData are committed only after both allocations succeeded, so a failed
// call leaves the counters describing exactly the data that was present before.
//
// Returns 0 on success, otherwise the non-zero code reported by tRealloc.
int32_t tMapDataPutItem(SMapData *pMapData, void *pItem, int32_t (*tPutItemFn)(uint8_t *, void *)) {
  int32_t code = 0;
  int32_t nItem = pMapData->nItem;
  int32_t offset = pMapData->nData;
  int32_t size = tPutItemFn(NULL, pItem);

  // alloc
  code = tRealloc((uint8_t **)&pMapData->aOffset, sizeof(int32_t) * (nItem + 1));
  if (code) return code;
  code = tRealloc(&pMapData->pData, offset + size);
  if (code) return code;

  // put
  pMapData->aOffset[nItem] = offset;
  tPutItemFn(pMapData->pData + offset, pItem);

  // commit the new counters now that the item is stored
  pMapData->nItem = nItem + 1;
  pMapData->nData = offset + size;

  return code;
}
61

62
// Copies the offsets and payload of pFrom into pTo, growing pTo's buffers with tRealloc.
//
// pTo's counters are updated only after both allocations succeeded, so on failure
// they still describe pTo's previous contents. The copies are skipped for empty
// sources because memcpy must not be called with a NULL pointer even for size 0.
//
// Returns 0 on success, otherwise the non-zero code reported by tRealloc.
int32_t tMapDataCopy(SMapData *pFrom, SMapData *pTo) {
  int32_t code = 0;

  code = tRealloc((uint8_t **)&pTo->aOffset, sizeof(int32_t) * pFrom->nItem);
  if (code) return code;
  code = tRealloc(&pTo->pData, pFrom->nData);
  if (code) return code;

  if (pFrom->nItem > 0) {
    memcpy(pTo->aOffset, pFrom->aOffset, sizeof(int32_t) * pFrom->nItem);
  }
  if (pFrom->nData > 0) {
    memcpy(pTo->pData, pFrom->pData, pFrom->nData);
  }

  pTo->nItem = pFrom->nItem;
  pTo->nData = pFrom->nData;

  return code;
}
77

78
// Binary search over the items of pMapData.
//
// Each probed item is decoded into pItem with tGetItemFn and compared with
// tItemCmprFn(pSearchItem, pItem). On a match (comparison result 0) the function
// returns 0 with the matching item left in pItem; otherwise it returns
// TSDB_CODE_NOT_FOUND and pItem holds the last probed item (if any).
// NOTE(review): correctness relies on the items being ordered consistently with
// tItemCmprFn — confirm against the writers of the map.
int32_t tMapDataSearch(SMapData *pMapData, void *pSearchItem, int32_t (*tGetItemFn)(uint8_t *, void *),
                       int32_t (*tItemCmprFn)(const void *, const void *), void *pItem) {
  int32_t lidx = 0;
  int32_t ridx = pMapData->nItem - 1;

  while (lidx <= ridx) {
    // lidx + (ridx - lidx) / 2 cannot overflow, unlike (lidx + ridx) / 2
    int32_t midx = lidx + (ridx - lidx) / 2;

    tMapDataGetItemByIdx(pMapData, midx, pItem, tGetItemFn);

    int32_t c = tItemCmprFn(pSearchItem, pItem);
    if (c == 0) {
      return 0;
    } else if (c < 0) {
      ridx = midx - 1;
    } else {
      lidx = midx + 1;
    }
  }

  return TSDB_CODE_NOT_FOUND;
}
106
#endif
107

108
// Decodes the idx-th item of the map into pItem by calling tGetItemFn on the
// payload position recorded in aOffset[idx]. idx is not range-checked here.
// The decoder's return value is intentionally discarded.
void tMapDataGetItemByIdx(SMapData *pMapData, int32_t idx, void *pItem, int32_t (*tGetItemFn)(uint8_t *, void *)) {
  (void)tGetItemFn(pMapData->pData + pMapData->aOffset[idx], pItem);
}
×
111

112
#ifdef BUILD_NO_CALL
113
int32_t tMapDataToArray(SMapData *pMapData, int32_t itemSize, int32_t (*tGetItemFn)(uint8_t *, void *),
114
                        SArray **ppArray) {
115
  int32_t code = 0;
116

117
  SArray *pArray = taosArrayInit(pMapData->nItem, itemSize);
118
  if (pArray == NULL) {
119
    code = terrno;
120
    goto _exit;
121
  }
122

123
  for (int32_t i = 0; i < pMapData->nItem; i++) {
124
    tMapDataGetItemByIdx(pMapData, i, taosArrayReserve(pArray, 1), tGetItemFn);
125
  }
126

127
_exit:
128
  *ppArray = pArray;
129
  return code;
130
}
131

132
// Encodes the map as: item count, then (only when the count is non-zero) each
// offset as a delta from the previous offset, the payload size, and the payload.
// When p is NULL the payload copy is skipped and NULL is forwarded to the
// integer encoders; the returned value is the accumulated encoded length.
int32_t tPutMapData(uint8_t *p, SMapData *pMapData) {
  int32_t len = 0;

  len += tPutI32v(p ? p + len : p, pMapData->nItem);
  if (pMapData->nItem == 0) {
    return len;
  }

  int32_t prevOffset = 0;
  for (int32_t i = 0; i < pMapData->nItem; i++) {
    len += tPutI32v(p ? p + len : p, pMapData->aOffset[i] - prevOffset);
    prevOffset = pMapData->aOffset[i];
  }

  len += tPutI32v(p ? p + len : p, pMapData->nData);
  if (p != NULL) {
    memcpy(p + len, pMapData->pData, pMapData->nData);
  }
  len += pMapData->nData;

  return len;
}
152
#endif
153

154
// Decodes a map from p: item count, delta-encoded offsets (turned back into
// absolute offsets), payload size and payload. The map is reset first and its
// buffers are grown with tRealloc.
//
// Returns 0 on success or the non-zero tRealloc code; on success the number of
// consumed bytes is stored in *decodedSize when that pointer is non-NULL.
// NOTE(review): nItem/nData are taken from the input without validation.
int32_t tGetMapData(uint8_t *p, SMapData *pMapData, int32_t *decodedSize) {
  int32_t pos = 0;

  tMapDataReset(pMapData);

  pos += tGetI32v(p + pos, &pMapData->nItem);
  if (pMapData->nItem != 0) {
    int32_t rc = tRealloc((uint8_t **)&pMapData->aOffset, sizeof(int32_t) * pMapData->nItem);
    if (rc != 0) {
      return rc;
    }

    // stored values are deltas; accumulate them into absolute offsets
    int32_t running = 0;
    for (int32_t i = 0; i < pMapData->nItem; i++) {
      pos += tGetI32v(p + pos, &pMapData->aOffset[i]);
      pMapData->aOffset[i] += running;
      running = pMapData->aOffset[i];
    }

    pos += tGetI32v(p + pos, &pMapData->nData);
    rc = tRealloc(&pMapData->pData, pMapData->nData);
    if (rc != 0) {
      return rc;
    }
    memcpy(pMapData->pData, p + pos, pMapData->nData);
    pos += pMapData->nData;
  }

  if (decodedSize != NULL) {
    *decodedSize = pos;
  }

  return 0;
}
190

191
#ifdef BUILD_NO_CALL
192
// TABLEID =======================================================================
193
int32_t tTABLEIDCmprFn(const void *p1, const void *p2) {
194
  TABLEID *pId1 = (TABLEID *)p1;
195
  TABLEID *pId2 = (TABLEID *)p2;
196

197
  if (pId1->suid < pId2->suid) {
198
    return -1;
199
  } else if (pId1->suid > pId2->suid) {
200
    return 1;
201
  }
202

203
  if (pId1->uid < pId2->uid) {
204
    return -1;
205
  } else if (pId1->uid > pId2->uid) {
206
    return 1;
207
  }
208

209
  return 0;
210
}
211

212
// SBlockIdx ======================================================
213
int32_t tPutBlockIdx(uint8_t *p, void *ph) {
214
  int32_t    n = 0;
215
  SBlockIdx *pBlockIdx = (SBlockIdx *)ph;
216

217
  n += tPutI64(p ? p + n : p, pBlockIdx->suid);
218
  n += tPutI64(p ? p + n : p, pBlockIdx->uid);
219
  n += tPutI64v(p ? p + n : p, pBlockIdx->offset);
220
  n += tPutI64v(p ? p + n : p, pBlockIdx->size);
221

222
  return n;
223
}
224
#endif
225

226
int32_t tGetBlockIdx(uint8_t *p, void *ph) {
×
227
  int32_t    n = 0;
×
228
  SBlockIdx *pBlockIdx = (SBlockIdx *)ph;
×
229

230
  n += tGetI64(p + n, &pBlockIdx->suid);
×
231
  n += tGetI64(p + n, &pBlockIdx->uid);
×
232
  n += tGetI64v(p + n, &pBlockIdx->offset);
×
233
  n += tGetI64v(p + n, &pBlockIdx->size);
×
234

235
  return n;
×
236
}
237

238
#ifdef BUILD_NO_CALL
239
int32_t tCmprBlockIdx(void const *lhs, void const *rhs) {
240
  SBlockIdx *lBlockIdx = (SBlockIdx *)lhs;
241
  SBlockIdx *rBlockIdx = (SBlockIdx *)rhs;
242

243
  if (lBlockIdx->suid < rBlockIdx->suid) {
244
    return -1;
245
  } else if (lBlockIdx->suid > rBlockIdx->suid) {
246
    return 1;
247
  }
248

249
  if (lBlockIdx->uid < rBlockIdx->uid) {
250
    return -1;
251
  } else if (lBlockIdx->uid > rBlockIdx->uid) {
252
    return 1;
253
  }
254

255
  return 0;
256
}
257

258
int32_t tCmprBlockL(void const *lhs, void const *rhs) {
259
  SBlockIdx *lBlockIdx = (SBlockIdx *)lhs;
260
  SSttBlk   *rBlockL = (SSttBlk *)rhs;
261

262
  if (lBlockIdx->suid < rBlockL->suid) {
263
    return -1;
264
  } else if (lBlockIdx->suid > rBlockL->suid) {
265
    return 1;
266
  }
267

268
  if (lBlockIdx->uid < rBlockL->minUid) {
269
    return -1;
270
  } else if (lBlockIdx->uid > rBlockL->maxUid) {
271
    return 1;
272
  }
273

274
  return 0;
275
}
276

277
// SDataBlk ======================================================
278
// Re-initializes *pDataBlk: min fields start at their MAX sentinels and max fields
// at their MIN sentinels; every member not named below is zero-initialized.
void tDataBlkReset(SDataBlk *pDataBlk) {
  SDataBlk blank = {
      .minKey = TSDBKEY_MAX,
      .maxKey = TSDBKEY_MIN,
      .minVer = VERSION_MAX,
      .maxVer = VERSION_MIN,
  };

  *pDataBlk = blank;
}
281

282
int32_t tPutDataBlk(uint8_t *p, void *ph) {
283
  int32_t   n = 0;
284
  SDataBlk *pDataBlk = (SDataBlk *)ph;
285

286
  n += tPutI64v(p ? p + n : p, pDataBlk->minKey.version);
287
  n += tPutI64v(p ? p + n : p, pDataBlk->minKey.ts);
288
  n += tPutI64v(p ? p + n : p, pDataBlk->maxKey.version);
289
  n += tPutI64v(p ? p + n : p, pDataBlk->maxKey.ts);
290
  n += tPutI64v(p ? p + n : p, pDataBlk->minVer);
291
  n += tPutI64v(p ? p + n : p, pDataBlk->maxVer);
292
  n += tPutI32v(p ? p + n : p, pDataBlk->nRow);
293
  n += tPutI8(p ? p + n : p, pDataBlk->hasDup);
294
  n += tPutI8(p ? p + n : p, pDataBlk->nSubBlock);
295
  for (int8_t iSubBlock = 0; iSubBlock < pDataBlk->nSubBlock; iSubBlock++) {
296
    n += tPutI64v(p ? p + n : p, pDataBlk->aSubBlock[iSubBlock].offset);
297
    n += tPutI32v(p ? p + n : p, pDataBlk->aSubBlock[iSubBlock].szBlock);
298
    n += tPutI32v(p ? p + n : p, pDataBlk->aSubBlock[iSubBlock].szKey);
299
  }
300
  if (pDataBlk->nSubBlock == 1 && !pDataBlk->hasDup) {
301
    n += tPutI64v(p ? p + n : p, pDataBlk->smaInfo.offset);
302
    n += tPutI32v(p ? p + n : p, pDataBlk->smaInfo.size);
303
  }
304

305
  return n;
306
}
307
#endif
308

309
int32_t tGetDataBlk(uint8_t *p, void *ph) {
×
310
  int32_t   n = 0;
×
311
  SDataBlk *pDataBlk = (SDataBlk *)ph;
×
312

313
  n += tGetI64v(p + n, &pDataBlk->minKey.version);
×
314
  n += tGetI64v(p + n, &pDataBlk->minKey.ts);
×
315
  n += tGetI64v(p + n, &pDataBlk->maxKey.version);
×
316
  n += tGetI64v(p + n, &pDataBlk->maxKey.ts);
×
317
  n += tGetI64v(p + n, &pDataBlk->minVer);
×
318
  n += tGetI64v(p + n, &pDataBlk->maxVer);
×
319
  n += tGetI32v(p + n, &pDataBlk->nRow);
×
320
  n += tGetI8(p + n, &pDataBlk->hasDup);
×
321
  n += tGetI8(p + n, &pDataBlk->nSubBlock);
×
322
  for (int8_t iSubBlock = 0; iSubBlock < pDataBlk->nSubBlock; iSubBlock++) {
×
323
    n += tGetI64v(p + n, &pDataBlk->aSubBlock[iSubBlock].offset);
×
324
    n += tGetI32v(p + n, &pDataBlk->aSubBlock[iSubBlock].szBlock);
×
325
    n += tGetI32v(p + n, &pDataBlk->aSubBlock[iSubBlock].szKey);
×
326
  }
327
  if (pDataBlk->nSubBlock == 1 && !pDataBlk->hasDup) {
×
328
    n += tGetI64v(p + n, &pDataBlk->smaInfo.offset);
×
329
    n += tGetI32v(p + n, &pDataBlk->smaInfo.size);
×
330
  } else {
331
    pDataBlk->smaInfo.offset = 0;
×
332
    pDataBlk->smaInfo.size = 0;
×
333
  }
334

335
  return n;
×
336
}
337

338
#ifdef BUILD_NO_CALL
339
int32_t tDataBlkCmprFn(const void *p1, const void *p2) {
340
  SDataBlk *pBlock1 = (SDataBlk *)p1;
341
  SDataBlk *pBlock2 = (SDataBlk *)p2;
342

343
  if (tsdbKeyCmprFn(&pBlock1->maxKey, &pBlock2->minKey) < 0) {
344
    return -1;
345
  } else if (tsdbKeyCmprFn(&pBlock1->minKey, &pBlock2->maxKey) > 0) {
346
    return 1;
347
  }
348

349
  return 0;
350
}
351

352
// True only when the block has at most one sub-block, no duplicates, and a
// positive SMA size.
bool tDataBlkHasSma(SDataBlk *pDataBlk) {
  return !(pDataBlk->nSubBlock > 1) && !pDataBlk->hasDup && pDataBlk->smaInfo.size > 0;
}
358

359
// SSttBlk ======================================================
360
int32_t tPutSttBlk(uint8_t *p, void *ph) {
361
  int32_t  n = 0;
362
  SSttBlk *pSttBlk = (SSttBlk *)ph;
363

364
  n += tPutI64(p ? p + n : p, pSttBlk->suid);
365
  n += tPutI64(p ? p + n : p, pSttBlk->minUid);
366
  n += tPutI64(p ? p + n : p, pSttBlk->maxUid);
367
  n += tPutI64v(p ? p + n : p, pSttBlk->minKey);
368
  n += tPutI64v(p ? p + n : p, pSttBlk->maxKey);
369
  n += tPutI64v(p ? p + n : p, pSttBlk->minVer);
370
  n += tPutI64v(p ? p + n : p, pSttBlk->maxVer);
371
  n += tPutI32v(p ? p + n : p, pSttBlk->nRow);
372
  n += tPutI64v(p ? p + n : p, pSttBlk->bInfo.offset);
373
  n += tPutI32v(p ? p + n : p, pSttBlk->bInfo.szBlock);
374
  n += tPutI32v(p ? p + n : p, pSttBlk->bInfo.szKey);
375

376
  return n;
377
}
378
#endif
379

380
int32_t tGetSttBlk(uint8_t *p, void *ph) {
×
381
  int32_t  n = 0;
×
382
  SSttBlk *pSttBlk = (SSttBlk *)ph;
×
383

384
  n += tGetI64(p + n, &pSttBlk->suid);
×
385
  n += tGetI64(p + n, &pSttBlk->minUid);
×
386
  n += tGetI64(p + n, &pSttBlk->maxUid);
×
387
  n += tGetI64v(p + n, &pSttBlk->minKey);
×
388
  n += tGetI64v(p + n, &pSttBlk->maxKey);
×
389
  n += tGetI64v(p + n, &pSttBlk->minVer);
×
390
  n += tGetI64v(p + n, &pSttBlk->maxVer);
×
391
  n += tGetI32v(p + n, &pSttBlk->nRow);
×
392
  n += tGetI64v(p + n, &pSttBlk->bInfo.offset);
×
393
  n += tGetI32v(p + n, &pSttBlk->bInfo.szBlock);
×
394
  n += tGetI32v(p + n, &pSttBlk->bInfo.szKey);
×
395

396
  return n;
×
397
}
398

399
// SBlockCol ======================================================
400

401
// First format version that carries a per-column compression algorithm:
// tPutBlockCol/tGetBlockCol use the column's own `alg` when `ver` is at least
// this value and fall back to the caller-supplied default otherwise.
static const int32_t BLOCK_WITH_ALG_VER = 2;
402

403
// Appends the encoded form of *pBlockCol to buffer.
//
// Always written: cid, type, cflag, flag, szOrigin. Unless flag == HAS_NULL, the
// optional sizes follow: szBitmap (flag != HAS_VALUE), szOffset (variable-length
// types), szValue (flag != (HAS_NULL | HAS_NONE)) and finally offset. The record
// ends with a 32-bit compression algorithm: the column's own for
// ver >= BLOCK_WITH_ALG_VER, otherwise defaultCmprAlg.
// NOTE(review): for ver < BLOCK_WITH_ALG_VER this still writes a 32-bit value,
// while tGetBlockCol reads none for such versions — confirm this is intended.
//
// Returns 0 on success or the first non-zero code reported by a buffer writer.
int32_t tPutBlockCol(SBuffer *buffer, const SBlockCol *pBlockCol, int32_t ver, uint32_t defaultCmprAlg) {
  int32_t rc;

  rc = tBufferPutI16v(buffer, pBlockCol->cid);
  if (rc) return rc;
  rc = tBufferPutI8(buffer, pBlockCol->type);
  if (rc) return rc;
  rc = tBufferPutI8(buffer, pBlockCol->cflag);
  if (rc) return rc;
  rc = tBufferPutI8(buffer, pBlockCol->flag);
  if (rc) return rc;
  rc = tBufferPutI32v(buffer, pBlockCol->szOrigin);
  if (rc) return rc;

  if (pBlockCol->flag != HAS_NULL) {
    if (pBlockCol->flag != HAS_VALUE) {
      rc = tBufferPutI32v(buffer, pBlockCol->szBitmap);
      if (rc) return rc;
    }

    if (IS_VAR_DATA_TYPE(pBlockCol->type)) {
      rc = tBufferPutI32v(buffer, pBlockCol->szOffset);
      if (rc) return rc;
    }

    if (pBlockCol->flag != (HAS_NULL | HAS_NONE)) {
      rc = tBufferPutI32v(buffer, pBlockCol->szValue);
      if (rc) return rc;
    }

    rc = tBufferPutI32v(buffer, pBlockCol->offset);
    if (rc) return rc;
  }

  if (ver >= BLOCK_WITH_ALG_VER) {
    rc = tBufferPutU32(buffer, pBlockCol->alg);
  } else {
    rc = tBufferPutU32(buffer, defaultCmprAlg);
  }
  if (rc) return rc;

  return 0;
}
434

435
// Reads one SBlockCol record from br into *pBlockCol.
//
// cid, type, cflag, flag and szOrigin are always read; szBitmap, szOffset,
// szValue and offset are zeroed and then read only when the flag/type
// combination says they are present. For ver >= BLOCK_WITH_ALG_VER the
// compression algorithm is read from the buffer, otherwise it is set to
// defaultCmprAlg without consuming input.
//
// Returns 0 on success or the first non-zero code reported by a buffer reader.
int32_t tGetBlockCol(SBufferReader *br, SBlockCol *pBlockCol, int32_t ver, uint32_t defaultCmprAlg) {
  int32_t rc;

  rc = tBufferGetI16v(br, &pBlockCol->cid);
  if (rc) return rc;
  rc = tBufferGetI8(br, &pBlockCol->type);
  if (rc) return rc;
  rc = tBufferGetI8(br, &pBlockCol->cflag);
  if (rc) return rc;
  rc = tBufferGetI8(br, &pBlockCol->flag);
  if (rc) return rc;
  rc = tBufferGetI32v(br, &pBlockCol->szOrigin);
  if (rc) return rc;

  // optional fields default to 0 when they are absent from the record
  pBlockCol->szBitmap = 0;
  pBlockCol->szOffset = 0;
  pBlockCol->szValue = 0;
  pBlockCol->offset = 0;

  if (pBlockCol->flag != HAS_NULL) {
    if (pBlockCol->flag != HAS_VALUE) {
      rc = tBufferGetI32v(br, &pBlockCol->szBitmap);
      if (rc) return rc;
    }

    if (IS_VAR_DATA_TYPE(pBlockCol->type)) {
      rc = tBufferGetI32v(br, &pBlockCol->szOffset);
      if (rc) return rc;
    }

    if (pBlockCol->flag != (HAS_NULL | HAS_NONE)) {
      rc = tBufferGetI32v(br, &pBlockCol->szValue);
      if (rc) return rc;
    }

    rc = tBufferGetI32v(br, &pBlockCol->offset);
    if (rc) return rc;
  }

  if (ver < BLOCK_WITH_ALG_VER) {
    pBlockCol->alg = defaultCmprAlg;
    return 0;
  }

  rc = tBufferGetU32(br, &pBlockCol->alg);
  if (rc) return rc;

  return 0;
}
473

474
#ifdef BUILD_NO_CALL
475
int32_t tBlockColCmprFn(const void *p1, const void *p2) {
476
  if (((SBlockCol *)p1)->cid < ((SBlockCol *)p2)->cid) {
477
    return -1;
478
  } else if (((SBlockCol *)p1)->cid > ((SBlockCol *)p2)->cid) {
479
    return 1;
480
  }
481

482
  return 0;
483
}
484

485
// SDelIdx ======================================================
486
int32_t tCmprDelIdx(void const *lhs, void const *rhs) {
487
  SDelIdx *lDelIdx = (SDelIdx *)lhs;
488
  SDelIdx *rDelIdx = (SDelIdx *)rhs;
489

490
  if (lDelIdx->suid < rDelIdx->suid) {
491
    return -1;
492
  } else if (lDelIdx->suid > rDelIdx->suid) {
493
    return 1;
494
  }
495

496
  if (lDelIdx->uid < rDelIdx->uid) {
497
    return -1;
498
  } else if (lDelIdx->uid > rDelIdx->uid) {
499
    return 1;
500
  }
501

502
  return 0;
503
}
504

505
int32_t tPutDelIdx(uint8_t *p, void *ph) {
506
  SDelIdx *pDelIdx = (SDelIdx *)ph;
507
  int32_t  n = 0;
508

509
  n += tPutI64(p ? p + n : p, pDelIdx->suid);
510
  n += tPutI64(p ? p + n : p, pDelIdx->uid);
511
  n += tPutI64v(p ? p + n : p, pDelIdx->offset);
512
  n += tPutI64v(p ? p + n : p, pDelIdx->size);
513

514
  return n;
515
}
516
#endif
517

518
int32_t tGetDelIdx(uint8_t *p, void *ph) {
×
519
  SDelIdx *pDelIdx = (SDelIdx *)ph;
×
520
  int32_t  n = 0;
×
521

522
  n += tGetI64(p + n, &pDelIdx->suid);
×
523
  n += tGetI64(p + n, &pDelIdx->uid);
×
524
  n += tGetI64v(p + n, &pDelIdx->offset);
×
525
  n += tGetI64v(p + n, &pDelIdx->size);
×
526

527
  return n;
×
528
}
529

530
#ifdef BUILD_NO_CALL
531
// SDelData ======================================================
532
int32_t tPutDelData(uint8_t *p, void *ph) {
533
  SDelData *pDelData = (SDelData *)ph;
534
  int32_t   n = 0;
535

536
  n += tPutI64v(p ? p + n : p, pDelData->version);
537
  n += tPutI64(p ? p + n : p, pDelData->sKey);
538
  n += tPutI64(p ? p + n : p, pDelData->eKey);
539

540
  return n;
541
}
542
#endif
543

544
int32_t tGetDelData(uint8_t *p, void *ph) {
×
545
  SDelData *pDelData = (SDelData *)ph;
×
546
  int32_t   n = 0;
×
547

548
  n += tGetI64v(p + n, &pDelData->version);
×
549
  n += tGetI64(p + n, &pDelData->sKey);
×
550
  n += tGetI64(p + n, &pDelData->eKey);
×
551

552
  return n;
×
553
}
554

555
// Maps a timestamp to a file id: key / tsTickPerMin[precision] / minutes, with
// negative keys shifted so the result rounds toward negative infinity.
// The 64-bit result is clamped to INT32_MAX for non-negative keys and to
// INT32_MIN for negative keys.
int32_t tsdbKeyFid(TSKEY key, int32_t minutes, int8_t precision) {
  int64_t fid;

  if (key >= 0) {
    fid = key / tsTickPerMin[precision] / minutes;
    return (fid > INT32_MAX) ? INT32_MAX : (int32_t)fid;
  }

  // negative keys: (key + 1) / ... - 1 gives floor instead of truncation
  fid = (key + 1) / tsTickPerMin[precision] / minutes - 1;
  return (fid < INT32_MIN) ? INT32_MIN : (int32_t)fid;
}
565

566
// Computes the inclusive key range covered by file id `fid`:
// *minKey = tsTickPerMin[precision] * fid * minutes, and *maxKey is the last
// tick before the next file's first key.
void tsdbFidKeyRange(int32_t fid, int32_t minutes, int8_t precision, TSKEY *minKey, TSKEY *maxKey) {
  TSKEY first = tsTickPerMin[precision] * fid * minutes;

  *minKey = first;
  *maxKey = first + tsTickPerMin[precision] * minutes - 1;
}
13,129,812✔
570

571
// Classifies a file id against the three keep thresholds of pKeepCfg.
//
// nowSec is converted to the configured precision, shifted back by
// keepTimeOffset hours, and the file ids of (now - keep0/keep1/keep2) are
// computed with tsdbKeyFid. Returns the index (0, 1 or 2) of the first threshold
// whose file id is <= fid, or -1 when fid is below all three.
// An unknown precision is logged and yields 0.
int32_t tsdbFidLevel(int32_t fid, STsdbKeepCfg *pKeepCfg, int64_t nowSec) {
  int32_t aFid[3];
  TSKEY   key;

  switch (pKeepCfg->precision) {
    case TSDB_TIME_PRECISION_MILLI:
      nowSec = nowSec * 1000;
      break;
    case TSDB_TIME_PRECISION_MICRO:
      nowSec = nowSec * 1000000l;
      break;
    case TSDB_TIME_PRECISION_NANO:
      nowSec = nowSec * 1000000000l;
      break;
    default:
      tsdbError("invalid time precision:%d", pKeepCfg->precision);
      return 0;
  }

  nowSec = nowSec - pKeepCfg->keepTimeOffset * tsTickPerHour[pKeepCfg->precision];

  key = nowSec - pKeepCfg->keep0 * tsTickPerMin[pKeepCfg->precision];
  aFid[0] = tsdbKeyFid(key, pKeepCfg->days, pKeepCfg->precision);
  key = nowSec - pKeepCfg->keep1 * tsTickPerMin[pKeepCfg->precision];
  aFid[1] = tsdbKeyFid(key, pKeepCfg->days, pKeepCfg->precision);
  key = nowSec - pKeepCfg->keep2 * tsTickPerMin[pKeepCfg->precision];
  aFid[2] = tsdbKeyFid(key, pKeepCfg->days, pKeepCfg->precision);

  for (int32_t level = 0; level < 3; level++) {
    if (fid >= aFid[level]) {
      return level;
    }
  }

  return -1;
}
605

606
// TSDBROW ======================================================
607
// Fetches the value of schema column iCol from a row into *pColVal.
//
// - TSDBROW_ROW_FMT: delegates to tRowGet; a non-zero result is only logged.
// - TSDBROW_COL_FMT: column 0 is the timestamp taken from aTSKEY[iRow]; other
//   columns are looked up by column id in the block data, and a column that is
//   not present yields a NONE value.
// - any other row type: logged (as tsdbRowIterNext does) and reported as NONE so
//   *pColVal is never left unset.
void tsdbRowGetColVal(TSDBROW *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal) {
  STColumn *pTColumn = &pTSchema->columns[iCol];

  if (pRow->type == TSDBROW_ROW_FMT) {
    int32_t ret = tRowGet(pRow->pTSRow, pTSchema, iCol, pColVal);
    if (ret != 0) {
      tsdbError("failed to get column value, code:%d", ret);
    }
  } else if (pRow->type == TSDBROW_COL_FMT) {
    if (iCol == 0) {
      *pColVal =
          COL_VAL_VALUE(PRIMARYKEY_TIMESTAMP_COL_ID,
                        ((SValue){.type = TSDB_DATA_TYPE_TIMESTAMP, .val = pRow->pBlockData->aTSKEY[pRow->iRow]}));
    } else {
      SColData *pColData = tBlockDataGetColData(pRow->pBlockData, pTColumn->colId);

      if (pColData) {
        tColDataGetValue(pColData, pRow->iRow, pColVal);
      } else {
        *pColVal = COL_VAL_NONE(pTColumn->colId, pTColumn->type);
      }
    }
  } else {
    tsdbError("invalid row type:%d", pRow->type);
    *pColVal = COL_VAL_NONE(pTColumn->colId, pTColumn->type);
  }
}
2,147,483,647✔
632

633
// Fills *key with the row's version and row key.
// Row-format rows carry both directly; for every other row type the version is
// read from the block's aVersion[iRow] and the key from tColRowGetKey.
void tsdbRowGetKey(TSDBROW *row, STsdbRowKey *key) {
  if (row->type != TSDBROW_ROW_FMT) {
    key->version = row->pBlockData->aVersion[row->iRow];
    tColRowGetKey(row->pBlockData, row->iRow, &key->key);
    return;
  }

  key->version = row->version;
  tRowGetKey(row->pTSRow, &key->key);
}
2,147,483,647✔
642

643
// Appends to key->pks the values at row `irow` of the leading block columns that
// are flagged COL_IS_KEY, stopping at the first column without the flag.
// key->numOfPKs is incremented per appended value and is not reset here.
// NOTE(review): no bound check against the capacity of key->pks — presumably the
// schema limits the number of key columns; confirm.
void tColRowGetPrimaryKey(SBlockData *pBlock, int32_t irow, SRowKey *key) {
  for (int32_t iCol = 0; iCol < pBlock->nColData; iCol++) {
    SColData *pColData = &pBlock->aColData[iCol];

    if (!(pColData->cflag & COL_IS_KEY)) {
      break;
    }

    SColVal colVal;
    tColDataGetValue(pColData, irow, &colVal);
    key->pks[key->numOfPKs] = colVal.value;
    key->numOfPKs++;
  }
}
2,064,669,720✔
656

657
int32_t tsdbRowKeyCmpr(const STsdbRowKey *key1, const STsdbRowKey *key2) {
842,059,081✔
658
  int32_t c = tRowKeyCompare(&key1->key, &key2->key);
842,059,081✔
659

660
  if (c) {
842,658,601✔
661
    return c;
803,865,370✔
662
  }
663

664
  if (key1->version < key2->version) {
38,793,231✔
665
    return -1;
24,090,207✔
666
  } else if (key1->version > key2->version) {
14,703,024!
667
    return 1;
18,021,013✔
668
  }
669
  return 0;
×
670
}
671

672
int32_t tsdbRowCompare(const void *p1, const void *p2) {
86,533,053✔
673
  STsdbRowKey key1, key2;
674

675
  tsdbRowGetKey((TSDBROW *)p1, &key1);
86,533,053✔
676
  tsdbRowGetKey((TSDBROW *)p2, &key2);
86,514,192✔
677
  return tsdbRowKeyCmpr(&key1, &key2);
85,951,514✔
678
}
679

680
int32_t tsdbRowCompareWithoutVersion(const void *p1, const void *p2) {
1,006,370,105✔
681
  STsdbRowKey key1, key2;
682

683
  tsdbRowGetKey((TSDBROW *)p1, &key1);
1,006,370,105✔
684
  tsdbRowGetKey((TSDBROW *)p2, &key2);
1,009,073,187✔
685
  return tRowKeyCompare(&key1.key, &key2.key);
1,009,157,785✔
686
}
687

688
// STSDBRowIter ======================================================
689
// Binds the iterator to pRow. Row-format rows open an underlying row iterator
// (its error code is returned as-is); column-format rows just restart the column
// cursor at 0. Any other row type is accepted without further setup.
int32_t tsdbRowIterOpen(STSDBRowIter *pIter, TSDBROW *pRow, STSchema *pTSchema) {
  pIter->pRow = pRow;

  if (pRow->type == TSDBROW_ROW_FMT) {
    int32_t rc = tRowIterOpen(pRow->pTSRow, pTSchema, &pIter->pIter);
    if (rc != 0) {
      return rc;
    }
    return 0;
  }

  if (pRow->type == TSDBROW_COL_FMT) {
    pIter->iColData = 0;
  }

  return 0;
}
700

701
// Closes the underlying row iterator; only row-format rows have one to close.
void tsdbRowClose(STSDBRowIter *pIter) {
  if (pIter->pRow->type != TSDBROW_ROW_FMT) {
    return;
  }

  tRowIterClose(&pIter->pIter);
}
246,056✔
706

707
// Returns the next column value of the bound row, or NULL when exhausted.
//
// Row-format rows delegate to tRowIterNext. Column-format rows yield the
// timestamp (from aTSKEY[iRow]) first and then one value per block column; the
// returned pointer refers to pIter->cv, which is overwritten by the next call.
// An unknown row type is logged and yields NULL.
SColVal *tsdbRowIterNext(STSDBRowIter *pIter) {
  if (pIter->pRow->type == TSDBROW_ROW_FMT) {
    return tRowIterNext(pIter->pIter);
  }

  if (pIter->pRow->type != TSDBROW_COL_FMT) {
    tsdbError("invalid row type:%d", pIter->pRow->type);
    return NULL;
  }

  // position 0 is the primary timestamp column
  if (pIter->iColData == 0) {
    pIter->cv = COL_VAL_VALUE(
        PRIMARYKEY_TIMESTAMP_COL_ID,
        ((SValue){.type = TSDB_DATA_TYPE_TIMESTAMP, .val = pIter->pRow->pBlockData->aTSKEY[pIter->pRow->iRow]}));
    ++pIter->iColData;
    return &pIter->cv;
  }

  if (pIter->iColData > pIter->pRow->pBlockData->nColData) {
    return NULL;
  }

  // positions 1..nColData map to aColData[0..nColData-1]
  tColDataGetValue(&pIter->pRow->pBlockData->aColData[pIter->iColData - 1], pIter->pRow->iRow, &pIter->cv);
  ++pIter->iColData;
  return &pIter->cv;
}
731

732
// SRowMerger ======================================================
733
// Adds one row to the merger.
//
// pTSchema describes pRow; when NULL the merger's own schema is used. Columns
// are matched between the two schemas by column id.
//
// First row (merger array empty): the timestamp and one value per merger-schema
// column are pushed. Columns missing from the row schema become NONE, and
// variable-length values are duplicated into buffers owned by the merger.
//
// Later rows: for each matching column,
//   - a row with a newer version than the last added one overwrites the merged
//     value unless the new value is NONE;
//   - a row with an older version only fills merged values that are still NONE;
//   - a row with the same version is rejected with TSDB_CODE_INVALID_PARA.
//
// Returns 0 on success, otherwise terrno or the tRealloc code of the failing step.
// The buffer duplicated for a variable-length value is released if it cannot be
// pushed, and every taosArrayGet result is checked before use.
int32_t tsdbRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) {
  int32_t   code = 0;
  TSDBKEY   key = TSDBROW_KEY(pRow);
  SColVal  *pColVal = &(SColVal){0};
  STColumn *pTColumn;
  int32_t   iCol, jCol = 1;

  if (NULL == pTSchema) {
    pTSchema = pMerger->pTSchema;
  }

  if (taosArrayGetSize(pMerger->pArray) == 0) {
    // ts
    jCol = 0;
    pTColumn = &pTSchema->columns[jCol++];

    *pColVal = COL_VAL_VALUE(pTColumn->colId, ((SValue){.type = pTColumn->type, .val = key.ts}));
    if (taosArrayPush(pMerger->pArray, pColVal) == NULL) {
      code = terrno;
      return code;
    }

    // other
    for (iCol = 1; jCol < pTSchema->numOfCols && iCol < pMerger->pTSchema->numOfCols; ++iCol) {
      pTColumn = &pMerger->pTSchema->columns[iCol];
      if (pTSchema->columns[jCol].colId < pTColumn->colId) {
        // row schema has a column the merger schema lacks: skip it and retry iCol
        ++jCol;
        --iCol;
        continue;
      } else if (pTSchema->columns[jCol].colId > pTColumn->colId) {
        if (taosArrayPush(pMerger->pArray, &COL_VAL_NONE(pTColumn->colId, pTColumn->type)) == NULL) {
          return terrno;
        }
        continue;
      }

      tsdbRowGetColVal(pRow, pTSchema, jCol++, pColVal);

      bool copied = false;
      if ((!COL_VAL_IS_NONE(pColVal)) && (!COL_VAL_IS_NULL(pColVal)) && IS_VAR_DATA_TYPE(pColVal->value.type)) {
        uint8_t *pVal = pColVal->value.pData;

        // duplicate the payload so the merger owns its own copy
        pColVal->value.pData = NULL;
        code = tRealloc(&pColVal->value.pData, pColVal->value.nData);
        if (code) {
          return code;
        }
        copied = true;

        if (pColVal->value.nData) {
          memcpy(pColVal->value.pData, pVal, pColVal->value.nData);
        }
      }

      if (taosArrayPush(pMerger->pArray, pColVal) == NULL) {
        code = terrno;
        if (copied && pColVal->value.pData != NULL) {
          // the copy never reached the array, so nobody else will free it
          tFree(pColVal->value.pData);
        }
        return code;
      }
    }

    for (; iCol < pMerger->pTSchema->numOfCols; ++iCol) {
      pTColumn = &pMerger->pTSchema->columns[iCol];
      if (taosArrayPush(pMerger->pArray, &COL_VAL_NONE(pTColumn->colId, pTColumn->type)) == NULL) {
        return terrno;
      }
    }

    pMerger->version = key.version;
    return 0;
  } else {
    for (iCol = 1; iCol < pMerger->pTSchema->numOfCols && jCol < pTSchema->numOfCols; ++iCol) {
      pTColumn = &pMerger->pTSchema->columns[iCol];
      if (pTSchema->columns[jCol].colId < pTColumn->colId) {
        ++jCol;
        --iCol;
        continue;
      } else if (pTSchema->columns[jCol].colId > pTColumn->colId) {
        continue;
      }

      tsdbRowGetColVal(pRow, pTSchema, jCol++, pColVal);

      if (key.version > pMerger->version) {
        if (!COL_VAL_IS_NONE(pColVal)) {
          if (IS_VAR_DATA_TYPE(pColVal->value.type)) {
            SColVal *pTColVal = taosArrayGet(pMerger->pArray, iCol);
            if (!pTColVal) return terrno;
            if (!COL_VAL_IS_NULL(pColVal)) {
              code = tRealloc(&pTColVal->value.pData, pColVal->value.nData);
              if (code) return code;

              pTColVal->value.nData = pColVal->value.nData;
              if (pTColVal->value.nData) {
                memcpy(pTColVal->value.pData, pColVal->value.pData, pTColVal->value.nData);
              }
              pTColVal->flag = 0;
            } else {
              tFree(pTColVal->value.pData);
              taosArraySet(pMerger->pArray, iCol, pColVal);
            }
          } else {
            taosArraySet(pMerger->pArray, iCol, pColVal);
          }
        }
      } else if (key.version < pMerger->version) {
        SColVal *tColVal = (SColVal *)taosArrayGet(pMerger->pArray, iCol);
        if (!tColVal) return terrno;
        if (COL_VAL_IS_NONE(tColVal) && !COL_VAL_IS_NONE(pColVal)) {
          if ((!COL_VAL_IS_NULL(pColVal)) && IS_VAR_DATA_TYPE(pColVal->value.type)) {
            code = tRealloc(&tColVal->value.pData, pColVal->value.nData);
            if (code) return code;

            tColVal->value.nData = pColVal->value.nData;
            if (pColVal->value.nData) {
              memcpy(tColVal->value.pData, pColVal->value.pData, pColVal->value.nData);
            }
            tColVal->flag = 0;
          } else {
            taosArraySet(pMerger->pArray, iCol, pColVal);
          }
        }
      } else {
        return TSDB_CODE_INVALID_PARA;
      }
    }

    pMerger->version = key.version;
    return code;
  }
}
858

859
// Binds the merger to pSchema and creates its value array with an initial
// capacity of one SColVal per schema column.
// Returns TSDB_CODE_SUCCESS, or terrno when the array cannot be created.
int32_t tsdbRowMergerInit(SRowMerger *pMerger, STSchema *pSchema) {
  pMerger->pTSchema = pSchema;
  pMerger->pArray = taosArrayInit(pSchema->numOfCols, sizeof(SColVal));

  if (pMerger->pArray != NULL) {
    return TSDB_CODE_SUCCESS;
  }
  return terrno;
}
868

869
/*
 * Release the variable-length buffers held by the merged column values and
 * empty the value array so the merger can be reused.
 *
 * The loop is bounded by the number of values actually stored in pArray (the
 * same bound tsdbRowMergerCleanup uses) rather than by the schema's column
 * count, so an array holding fewer entries than the schema is never indexed
 * out of range and every stored entry is visited. Index 0 is skipped, exactly
 * as in tsdbRowMergerCleanup.
 */
void tsdbRowMergerClear(SRowMerger *pMerger) {
  int32_t numOfVals = taosArrayGetSize(pMerger->pArray);

  for (int32_t iCol = 1; iCol < numOfVals; iCol++) {
    SColVal *pTColVal = taosArrayGet(pMerger->pArray, iCol);
    if (IS_VAR_DATA_TYPE(pTColVal->value.type)) {
      tFree(pTColVal->value.pData);
    }
  }

  taosArrayClear(pMerger->pArray);
}
339,673,397✔
879

880
/*
 * Tear down a row merger: free the data buffer of every stored
 * variable-length value (entries 1..size-1) and destroy the value array.
 */
void tsdbRowMergerCleanup(SRowMerger *pMerger) {
  SArray *pValues = pMerger->pArray;
  int32_t nValues = taosArrayGetSize(pValues);
  int32_t idx = 1;

  while (idx < nValues) {
    SColVal *pVal = taosArrayGet(pValues, idx);
    if (IS_VAR_DATA_TYPE(pVal->value.type)) {
      tFree(pVal->value.pData);
    }
    idx++;
  }

  taosArrayDestroy(pValues);
}
4,959,707✔
891

892
/*
 * Build a row from the merger's current column values and schema.
 * The result of tRowBuild is returned unchanged; *ppRow is set by tRowBuild.
 */
int32_t tsdbRowMergerGetRow(SRowMerger *pMerger, SRow **ppRow) {
  SArray   *pColVals = pMerger->pArray;
  STSchema *pSchema = pMerger->pTSchema;

  return tRowBuild(pColVals, pSchema, ppRow);
}
895

896
// delete skyline ======================================================
897
/*
 * Merge two skylines (arrays of TSDBKEY pointers) into pSkyline.
 *
 * The two inputs are consumed in lock-step by ascending ts. For every key
 * emitted while both inputs still have entries, the key's version is
 * overwritten with the larger of the most recently seen version from each
 * side. When both sides carry the same ts, only the key from pSkyline1 is
 * emitted. Whatever remains on one side afterwards is appended unchanged.
 *
 * NOTE(review): output pointers are written straight into pSkyline's element
 * storage and size is patched at the end; this presumably relies on the
 * caller having reserved enough capacity — confirm against callers.
 */
static void tsdbMergeSkyline(SArray *pSkyline1, SArray *pSkyline2, SArray *pSkyline) {
  const int32_t nLeft = taosArrayGetSize(pSkyline1);
  const int32_t nRight = taosArrayGetSize(pSkyline2);
  int32_t       iLeft = 0;
  int32_t       iRight = 0;
  int64_t       verLeft = 0;
  int64_t       verRight = 0;

  taosArrayClear(pSkyline);
  TSDBKEY **ppOut = TARRAY_GET_ELEM(pSkyline, 0);

  for (; iLeft < nLeft && iRight < nRight; ppOut++) {
    TSDBKEY *pLeft = (TSDBKEY *)taosArrayGetP(pSkyline1, iLeft);
    TSDBKEY *pRight = (TSDBKEY *)taosArrayGetP(pSkyline2, iRight);
    TSDBKEY *pPick = pLeft;

    if (pLeft->ts > pRight->ts) {
      // right side starts earlier: emit it and remember its version
      pPick = pRight;
      verRight = pRight->version;
      iRight++;
    } else {
      // left side starts earlier or at the same ts: emit the left key
      verLeft = pLeft->version;
      iLeft++;
      if (pLeft->ts == pRight->ts) {
        // identical ts: the right key is consumed too but not emitted
        verRight = pRight->version;
        iRight++;
      }
    }

    *ppOut = pPick;
    pPick->version = TMAX(verLeft, verRight);
  }

  for (; iLeft < nLeft; iLeft++) {
    *ppOut++ = (TSDBKEY *)taosArrayGetP(pSkyline1, iLeft);
  }

  for (; iRight < nRight; iRight++) {
    *ppOut++ = (TSDBKEY *)taosArrayGetP(pSkyline2, iRight);
  }

  pSkyline->size = TARRAY_ELEM_IDX(pSkyline, ppOut);
}
1,070,381✔
950

951
/*
 * Recursively build the skyline for delete ranges [sidx, eidx].
 *
 * aSkyline: array of TSDBKEY where range i occupies elements 2*i and 2*i+1.
 * pSkyline: output array of TSDBKEY pointers (into aSkyline); cleared first.
 *
 * A single range contributes its two keys directly. A wider span is split at
 * its midpoint, each half is built into a temporary array, and the halves are
 * combined with tsdbMergeSkyline.
 *
 * Returns 0 on success, terrno on allocation/push failure, or
 * TSDB_CODE_INVALID_PARA when sidx > eidx (that input would otherwise recurse
 * without ever reaching the sidx == eidx base case).
 */
int32_t tsdbBuildDeleteSkylineImpl(SArray *aSkyline, int32_t sidx, int32_t eidx, SArray *pSkyline) {
  int32_t code = 0;

  if (sidx > eidx) {
    return TSDB_CODE_INVALID_PARA;
  }

  taosArrayClear(pSkyline);
  if (sidx == eidx) {
    TSDBKEY *pItem1 = taosArrayGet(aSkyline, sidx * 2);
    TSDBKEY *pItem2 = taosArrayGet(aSkyline, sidx * 2 + 1);
    if (taosArrayPush(pSkyline, &pItem1) == NULL) {
      return terrno;
    }

    if (taosArrayPush(pSkyline, &pItem2) == NULL) {
      return terrno;
    }
  } else {
    int32_t midx = (sidx + eidx) / 2;

    SArray *pSkyline1 = taosArrayInit((midx - sidx + 1) * 2, POINTER_BYTES);
    if (pSkyline1 == NULL) {
      return terrno;
    }

    SArray *pSkyline2 = taosArrayInit((eidx - midx) * 2, POINTER_BYTES);
    if (pSkyline2 == NULL) {
      // capture the error before cleanup so a later call cannot overwrite it
      code = terrno;
      taosArrayDestroy(pSkyline1);
      return code;
    }

    code = tsdbBuildDeleteSkylineImpl(aSkyline, sidx, midx, pSkyline1);
    if (code) goto _clear;

    code = tsdbBuildDeleteSkylineImpl(aSkyline, midx + 1, eidx, pSkyline2);
    if (code) goto _clear;

    tsdbMergeSkyline(pSkyline1, pSkyline2, pSkyline);

  _clear:
    taosArrayDestroy(pSkyline1);
    taosArrayDestroy(pSkyline2);
  }

  return code;
}
997

998
/*
 * Build the delete skyline for entries [sidx, eidx] of aDelData.
 *
 * aDelData: array of SDelData; each entry contributes a start key
 *           {sKey, version} and an end key {eKey, 0}.
 * aSkyline: output array of TSDBKEY (by value); cleared before being filled.
 *
 * The keys are staged in a temporary array that is indexed from 0, so the
 * recursive builder is invoked on [0, dataNum - 1] regardless of sidx; using
 * sidx/eidx there would index past the staged keys whenever sidx != 0.
 * An empty range (eidx < sidx) yields an empty skyline and success.
 *
 * Returns 0 on success or an error code (terrno on allocation/push failure).
 */
int32_t tsdbBuildDeleteSkyline(SArray *aDelData, int32_t sidx, int32_t eidx, SArray *aSkyline) {
  int32_t code = 0;
  int32_t dataNum = eidx - sidx + 1;
  int32_t skylineNum = 0;

  if (dataNum <= 0) {
    taosArrayClear(aSkyline);
    return code;
  }

  SArray *aTmpSkyline = taosArrayInit(dataNum * 2, sizeof(TSDBKEY));
  if (aTmpSkyline == NULL) {
    return terrno;
  }

  SArray *pSkyline = taosArrayInit(dataNum * 2, POINTER_BYTES);
  if (pSkyline == NULL) {
    // capture the error before cleanup so a later call cannot overwrite it
    code = terrno;
    taosArrayDestroy(aTmpSkyline);
    return code;
  }

  taosArrayClear(aSkyline);
  for (int32_t i = sidx; i <= eidx; ++i) {
    SDelData *pDelData = (SDelData *)taosArrayGet(aDelData, i);
    TSDBKEY   sKey = {.ts = pDelData->sKey, .version = pDelData->version};
    TSDBKEY   eKey = {.ts = pDelData->eKey, .version = 0};

    if (taosArrayPush(aTmpSkyline, &sKey) == NULL) {
      code = terrno;
      goto _clear;
    }

    if (taosArrayPush(aTmpSkyline, &eKey) == NULL) {
      code = terrno;
      goto _clear;
    }
  }

  // staged keys live at 0 .. 2*dataNum-1, i.e. ranges 0 .. dataNum-1
  code = tsdbBuildDeleteSkylineImpl(aTmpSkyline, 0, dataNum - 1, pSkyline);
  if (code) goto _clear;

  // copy the selected keys by value; the pointers refer to aTmpSkyline storage
  skylineNum = taosArrayGetSize(pSkyline);
  for (int32_t i = 0; i < skylineNum; ++i) {
    TSDBKEY *p = taosArrayGetP(pSkyline, i);
    if (taosArrayPush(aSkyline, p) == NULL) {
      code = terrno;
      goto _clear;
    }
  }

_clear:
  taosArrayDestroy(aTmpSkyline);
  taosArrayDestroy(pSkyline);

  return code;
}
1045

1046
/*
1047
int32_t tsdbBuildDeleteSkyline2(SArray *aDelData, int32_t sidx, int32_t eidx, SArray *aSkyline) {
1048
  int32_t   code = 0;
1049
  SDelData *pDelData;
1050
  int32_t   midx;
1051

1052
  taosArrayClear(aSkyline);
1053
  if (sidx == eidx) {
1054
    pDelData = (SDelData *)taosArrayGet(aDelData, sidx);
1055
    taosArrayPush(aSkyline, &(TSDBKEY){.ts = pDelData->sKey, .version = pDelData->version});
1056
    taosArrayPush(aSkyline, &(TSDBKEY){.ts = pDelData->eKey, .version = 0});
1057
  } else {
1058
    SArray *aSkyline1 = NULL;
1059
    SArray *aSkyline2 = NULL;
1060

1061
    aSkyline1 = taosArrayInit(0, sizeof(TSDBKEY));
1062
    aSkyline2 = taosArrayInit(0, sizeof(TSDBKEY));
1063
    if (aSkyline1 == NULL || aSkyline2 == NULL) {
1064
      code = TSDB_CODE_OUT_OF_MEMORY;
1065
      goto _clear;
1066
    }
1067
    midx = (sidx + eidx) / 2;
1068

1069
    code = tsdbBuildDeleteSkyline(aDelData, sidx, midx, aSkyline1);
1070
    if (code) goto _clear;
1071

1072
    code = tsdbBuildDeleteSkyline(aDelData, midx + 1, eidx, aSkyline2);
1073
    if (code) goto _clear;
1074

1075
    code = tsdbMergeSkyline(aSkyline1, aSkyline2, aSkyline);
1076

1077
  _clear:
1078
    taosArrayDestroy(aSkyline1);
1079
    taosArrayDestroy(aSkyline2);
1080
  }
1081

1082
  return code;
1083
}
1084
*/
1085

1086
// SBlockData ======================================================
1087
/*
 * Put a block-data object into its empty state: zero ids and counters,
 * NULL key arrays and NULL column array. Nothing is allocated.
 * Always returns 0.
 */
int32_t tBlockDataCreate(SBlockData *pBlockData) {
  // table identity
  pBlockData->suid = 0;
  pBlockData->uid = 0;

  // counters
  pBlockData->nRow = 0;
  pBlockData->nColData = 0;

  // key arrays and column array
  pBlockData->aUid = NULL;
  pBlockData->aVersion = NULL;
  pBlockData->aTSKEY = NULL;
  pBlockData->aColData = NULL;

  return 0;
}
1098

1099
/*
 * Release everything owned by a block-data object: the uid/version/timestamp
 * arrays, every column, and the column array itself.
 *
 * nColData is reset together with aColData so the object is left in a
 * consistent empty state; otherwise a later tBlockDataDestroy/tBlockDataReset
 * on the same object would iterate nColData entries of a NULL array.
 */
void tBlockDataDestroy(SBlockData *pBlockData) {
  tFree(pBlockData->aUid);
  tFree(pBlockData->aVersion);
  tFree(pBlockData->aTSKEY);

  for (int32_t i = 0; i < pBlockData->nColData; i++) {
    tColDataDestroy(&pBlockData->aColData[i]);
  }
  pBlockData->nColData = 0;

  if (pBlockData->aColData) {
    taosMemoryFree(pBlockData->aColData);
    pBlockData->aColData = NULL;
  }
}
19,193,847✔
1113

1114
/*
 * Resize the column array of pBlockData to exactly nColData entries.
 *
 * Shrinking destroys the surplus columns (the array itself is not
 * reallocated). Growing reallocates the array and zero-fills the new tail.
 *
 * The array elements are SColData, so the allocation and the zero-fill are
 * sized from the element type. Sizing them with sizeof(SBlockData) was only
 * correct by accident of the two struct sizes.
 *
 * Returns 0 on success, or terrno if the reallocation fails (in which case
 * the existing array and nColData are left untouched).
 */
static int32_t tBlockDataAdjustColData(SBlockData *pBlockData, int32_t nColData) {
  int32_t code = 0;

  if (pBlockData->nColData > nColData) {
    for (int32_t i = nColData; i < pBlockData->nColData; i++) {
      tColDataDestroy(&pBlockData->aColData[i]);
    }
  } else if (pBlockData->nColData < nColData) {
    SColData *aColData = taosMemoryRealloc(pBlockData->aColData, sizeof(*aColData) * nColData);
    if (aColData == NULL) {
      code = terrno;
      goto _exit;
    }

    pBlockData->aColData = aColData;
    memset(&pBlockData->aColData[pBlockData->nColData], 0, sizeof(*aColData) * (nColData - pBlockData->nColData));
  }
  pBlockData->nColData = nColData;

_exit:
  return code;
}
1136
/*
 * Prepare pBlockData to hold rows of the table identified by pId.
 *
 * pId:      table id; at least one of suid/uid must be non-zero.
 * pTSchema: schema the columns are taken from; columns[0] is never used here.
 * aCid:     optional list of requested column ids. When NULL, every schema
 *           column from index 1 on is initialized; otherwise nCid slots are
 *           reserved and slot i is initialized only if aCid[i] is found in
 *           the schema.
 * nCid:     number of entries in aCid.
 *
 * Returns 0 on success, TSDB_CODE_INVALID_PARA for a zero table id or when
 * the schema is exhausted while searching for a requested column id, or the
 * error from tBlockDataAdjustColData.
 */
int32_t tBlockDataInit(SBlockData *pBlockData, TABLEID *pId, STSchema *pTSchema, int16_t *aCid, int32_t nCid) {
  int32_t code = 0;

  if (!pId->suid && !pId->uid) {
    return TSDB_CODE_INVALID_PARA;
  }

  pBlockData->suid = pId->suid;
  pBlockData->uid = pId->uid;
  pBlockData->nRow = 0;

  if (aCid) {
    code = tBlockDataAdjustColData(pBlockData, nCid);
    if (code) goto _exit;

    int32_t iColumn = 1;
    // bounds-check the first column too: a schema with fewer than two columns
    // has no columns[1] to point at
    STColumn *pTColumn = (iColumn < pTSchema->numOfCols) ? &pTSchema->columns[iColumn] : NULL;
    for (int32_t iCid = 0; iCid < nCid; iCid++) {
      // aCid array (from taos client catalog) may contain columns that do not exist in pTSchema; pTSchema is
      // newer
      if (pTColumn == NULL) {
        continue;
      }

      while (pTColumn->colId < aCid[iCid]) {
        iColumn++;
        if (!(iColumn < pTSchema->numOfCols)) {
          return TSDB_CODE_INVALID_PARA;
        }
        pTColumn = &pTSchema->columns[iColumn];
      }

      if (pTColumn->colId != aCid[iCid]) {
        continue;
      }

      tColDataInit(&pBlockData->aColData[iCid], pTColumn->colId, pTColumn->type, pTColumn->flags);

      iColumn++;
      pTColumn = (iColumn < pTSchema->numOfCols) ? &pTSchema->columns[iColumn] : NULL;
    }
  } else {
    code = tBlockDataAdjustColData(pBlockData, pTSchema->numOfCols - 1);
    if (code) goto _exit;

    for (int32_t iColData = 0; iColData < pBlockData->nColData; iColData++) {
      STColumn *pTColumn = &pTSchema->columns[iColData + 1];
      tColDataInit(&pBlockData->aColData[iColData], pTColumn->colId, pTColumn->type, pTColumn->flags);
    }
  }

_exit:
  return code;
}
1190

1191
/*
 * Return a block-data object to the "no table, no columns" state: every
 * column is destroyed, the column array is freed and set to NULL, and the
 * ids and counters are zeroed. The uid/version/timestamp arrays are not
 * touched here.
 */
void tBlockDataReset(SBlockData *pBlockData) {
  int32_t iCol = 0;

  while (iCol < pBlockData->nColData) {
    tColDataDestroy(&pBlockData->aColData[iCol]);
    iCol++;
  }
  taosMemoryFreeClear(pBlockData->aColData);

  pBlockData->nColData = 0;
  pBlockData->nRow = 0;
  pBlockData->uid = 0;
  pBlockData->suid = 0;
}
12,983,091✔
1201

1202
/*
 * Drop all rows but keep the column layout: nRow becomes 0 and
 * tColDataClear is applied to each of the nColData columns.
 */
void tBlockDataClear(SBlockData *pBlockData) {
  int32_t idx = 0;

  pBlockData->nRow = 0;
  while (idx < pBlockData->nColData) {
    SColData *pColData = tBlockDataGetColDataByIdx(pBlockData, idx);
    tColDataClear(pColData);
    idx++;
  }
}
501,968✔
1208

1209
/*
 * Append one column to pBlockData.
 *
 * cid must be strictly greater than the cid of the current last column, so
 * the column array stays in increasing cid order. The array is grown by one
 * element; the new element is zeroed and then set up with tColDataInit.
 *
 * ppColData: receives the address of the new column.
 *
 * Returns 0 on success, TSDB_CODE_INVALID_PARA when cid is out of order, or
 * terrno when the reallocation fails.
 */
int32_t tBlockDataAddColData(SBlockData *pBlockData, int16_t cid, int8_t type, int8_t cflag, SColData **ppColData) {
  if (pBlockData->nColData != 0) {
    SColData *pLast = &pBlockData->aColData[pBlockData->nColData - 1];
    if (pLast->cid >= cid) {
      return TSDB_CODE_INVALID_PARA;
    }
  }

  SColData *aGrown = taosMemoryRealloc(pBlockData->aColData, sizeof(SColData) * (pBlockData->nColData + 1));
  if (aGrown == NULL) {
    return terrno;
  }
  pBlockData->aColData = aGrown;

  SColData *pNew = &aGrown[pBlockData->nColData];
  pBlockData->nColData++;

  *ppColData = pNew;
  memset(pNew, 0, sizeof(SColData));
  tColDataInit(pNew, cid, type, cflag);

  return 0;
}
1228

1229
/* flag > 0: forward update
1230
 * flag == 0: insert
1231
 * flag < 0: backward update
1232
 */
1233
/*
 * Copy row iRow of pBlockDataFrom into the columns of pBlockData.
 *
 * Each destination column is matched with the source column of the same cid;
 * the source cursor only moves forward, so both column arrays are presumably
 * ordered by increasing cid (tBlockDataAddColData enforces that order).
 *
 *  - match found:  the source value is appended (flag == 0) or applied as an
 *                  update (flag != 0; forward when flag > 0).
 *  - no match:     a NONE value is appended when flag == 0; on update the
 *                  destination column is left as it is.
 *
 * Returns 0, or the first error reported by the append/update call.
 */
static int32_t tBlockDataUpsertBlockRow(SBlockData *pBlockData, SBlockData *pBlockDataFrom, int32_t iRow,
                                        int32_t flag) {
  int32_t code = 0;
  SColVal cv = {0};
  int32_t iFrom = 0;

  for (int32_t iTo = 0; iTo < pBlockData->nColData; iTo++) {
    SColData *pTo = &pBlockData->aColData[iTo];

    // skip source columns the destination does not have
    while (iFrom < pBlockDataFrom->nColData && pBlockDataFrom->aColData[iFrom].cid < pTo->cid) {
      iFrom++;
    }

    SColData *pFrom = (iFrom < pBlockDataFrom->nColData) ? &pBlockDataFrom->aColData[iFrom] : NULL;

    if (pFrom != NULL && pFrom->cid == pTo->cid) {
      tColDataGetValue(pFrom, iRow, &cv);

      if (flag) {
        code = tColDataUpdateValue(pTo, &cv, flag > 0);
      } else {
        code = tColDataAppendValue(pTo, &cv);
      }
      if (code) goto _exit;

      iFrom++;
    } else {
      // source has no such column
      cv = COL_VAL_NONE(pTo->cid, pTo->type);
      if (flag == 0) {
        code = tColDataAppendValue(pTo, &cv);
        if (code) goto _exit;
      }
    }
  }

_exit:
  return code;
}
1268

1269
/*
 * Append one row to pBlockData.
 *
 * The block must have been bound to a table (suid or uid non-zero). When the
 * block's uid is 0, the row's uid is recorded per row in aUid and must be
 * non-zero. Version and timestamp are stored in aVersion/aTSKEY, then the
 * column values are appended from either a row-format or a column-format
 * source row. nRow is incremented only after every step succeeded.
 *
 * Returns 0 on success, TSDB_CODE_INVALID_PARA for an unbound block, a zero
 * uid where one is required, or an unknown row type; otherwise the error of
 * the failing allocation/append call.
 */
int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema, int64_t uid) {
  int32_t code = 0;

  if (pBlockData->suid == 0 && pBlockData->uid == 0) {
    return TSDB_CODE_INVALID_PARA;
  }

  // per-row uid, only kept when the block itself has no uid
  if (pBlockData->uid == 0) {
    if (uid == 0) {
      return TSDB_CODE_INVALID_PARA;
    }

    code = tRealloc((uint8_t **)&pBlockData->aUid, sizeof(int64_t) * (pBlockData->nRow + 1));
    if (code) {
      return code;
    }
    pBlockData->aUid[pBlockData->nRow] = uid;
  }

  // row version
  code = tRealloc((uint8_t **)&pBlockData->aVersion, sizeof(int64_t) * (pBlockData->nRow + 1));
  if (code) {
    return code;
  }
  pBlockData->aVersion[pBlockData->nRow] = TSDBROW_VERSION(pRow);

  // row timestamp
  code = tRealloc((uint8_t **)&pBlockData->aTSKEY, sizeof(TSKEY) * (pBlockData->nRow + 1));
  if (code) {
    return code;
  }
  pBlockData->aTSKEY[pBlockData->nRow] = TSDBROW_TS(pRow);

  // column values, flag 0 == append
  if (pRow->type == TSDBROW_ROW_FMT) {
    code = tRowUpsertColData(pRow->pTSRow, pTSchema, pBlockData->aColData, pBlockData->nColData, 0);
  } else if (pRow->type == TSDBROW_COL_FMT) {
    code = tBlockDataUpsertBlockRow(pBlockData, pRow->pBlockData, pRow->iRow, 0);
  } else {
    return TSDB_CODE_INVALID_PARA;
  }

  if (code == 0) {
    pBlockData->nRow++;
  }
  return code;
}
1308
/*
 * Merge pRow into the last row of pBlockData.
 *
 * The stored version of the last row is raised to the incoming version when
 * the incoming one is newer. Column values are then applied as a forward
 * update (incoming version newer) or a backward update (incoming older).
 *
 * Returns 0 on success, TSDB_CODE_INVALID_PARA when the block has no row to
 * update, when both versions are equal, or when the row type is unknown;
 * otherwise the error of the failing update call.
 */
int32_t tBlockDataUpdateRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema) {
  int32_t code = 0;

  // there must be a last row; aVersion[nRow - 1] is read below
  if (pBlockData->nRow <= 0) {
    return TSDB_CODE_INVALID_PARA;
  }

  // version
  int32_t iLast = pBlockData->nRow - 1;
  int64_t lversion = pBlockData->aVersion[iLast];
  int64_t rversion = TSDBROW_VERSION(pRow);
  if (lversion == rversion) {
    return TSDB_CODE_INVALID_PARA;
  }

  // 1: forward update (incoming row is newer), -1: backward update
  int32_t flag = (rversion > lversion) ? 1 : -1;
  if (flag > 0) {
    pBlockData->aVersion[iLast] = rversion;
  }

  // update column values
  if (pRow->type == TSDBROW_ROW_FMT) {
    code = tRowUpsertColData(pRow->pTSRow, pTSchema, pBlockData->aColData, pBlockData->nColData, flag);
    if (code) goto _exit;
  } else if (pRow->type == TSDBROW_COL_FMT) {
    code = tBlockDataUpsertBlockRow(pBlockData, pRow->pBlockData, pRow->iRow, flag);
    if (code) goto _exit;
  } else {
    code = TSDB_CODE_INVALID_PARA;
    goto _exit;
  }

_exit:
  return code;
}
1337

1338
#ifdef BUILD_NO_CALL
1339
/*
 * Report how many rows the block would hold after upserting pRow: unchanged
 * when pRow has the same timestamp as the current last row, one more
 * otherwise (1 for an empty block). The block is not modified; uid is unused.
 */
int32_t tBlockDataTryUpsertRow(SBlockData *pBlockData, TSDBROW *pRow, int64_t uid) {
  int32_t nRow = pBlockData->nRow;

  if (nRow == 0) {
    return 1;
  }

  return (pBlockData->aTSKEY[nRow - 1] == TSDBROW_TS(pRow)) ? nRow : nRow + 1;
}
1348

1349
/*
 * Insert-or-update: when the block is non-empty and its last row has the same
 * timestamp as pRow, the last row is updated; otherwise pRow is appended.
 * Returns whatever the chosen operation returns.
 */
int32_t tBlockDataUpsertRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema, int64_t uid) {
  int32_t nRow = pBlockData->nRow;

  if (nRow <= 0 || pBlockData->aTSKEY[nRow - 1] != TSDBROW_TS(pRow)) {
    return tBlockDataAppendRow(pBlockData, pRow, pTSchema, uid);
  }

  return tBlockDataUpdateRow(pBlockData, pRow, pTSchema);
}
1356
#endif
1357

1358
/*
 * Binary-search the block's columns for column id cid.
 * Returns the matching column, or NULL when no column has that id.
 * The search relies on the columns being ordered by increasing cid.
 */
SColData *tBlockDataGetColData(SBlockData *pBlockData, int16_t cid) {
  int32_t lo = 0;
  int32_t hi = pBlockData->nColData - 1;

  while (lo <= hi) {
    int32_t   mid = (lo + hi) >> 1;
    SColData *pCand = tBlockDataGetColDataByIdx(pBlockData, mid);

    if (pCand->cid == cid) {
      return pCand;
    }

    if (pCand->cid > cid) {
      hi = mid - 1;
    } else {
      lo = mid + 1;
    }
  }

  return NULL;
}
1378

1379
/* buffers[0]: SDiskDataHdr
1380
 * buffers[1]: key part: uid + version + ts + primary keys
1381
 * buffers[2]: SBlockCol part
1382
 * buffers[3]: regular column part
1383
 */
1384
/*
 * Compress a block into four output buffers:
 *   buffers[0]  SDiskDataHdr
 *   buffers[1]  key part (written by tBlockDataCompressKeyPart)
 *   buffers[2]  one SBlockCol descriptor per stored regular column
 *   buffers[3]  compressed data of the regular columns
 *
 * pCompr is used as a SColCompressInfo; its defaultCmprAlg is refreshed from
 * the per-column algorithm set (lookup for id 1) before anything is written.
 * Columns flagged COL_IS_KEY and columns whose flag is HAS_NONE are skipped.
 * assist is handed to the compression helpers as a scratch buffer.
 *
 * Returns 0 on success or the first error reported by a helper.
 */
int32_t tBlockDataCompress(SBlockData *bData, void *pCompr, SBuffer *buffers, SBuffer *assist) {
  int32_t code = 0;
  int32_t lino = 0;

  SBuffer *hdrBuf = &buffers[0];
  SBuffer *keyBuf = &buffers[1];
  SBuffer *blkColBuf = &buffers[2];
  SBuffer *dataBuf = &buffers[3];

  SColCompressInfo *pInfo = pCompr;
  // the lookup result is deliberately ignored
  code = tsdbGetColCmprAlgFromSet(pInfo->pColCmpr, 1, &pInfo->defaultCmprAlg);
  TAOS_UNUSED(code);

  SDiskDataHdr hdr = {
      .delimiter = TSDB_FILE_DLMT,
      .fmtVer = 2,
      .suid = bData->suid,
      .uid = bData->uid,
      .szUid = 0,     // filled by compress key
      .szVer = 0,     // filled by compress key
      .szKey = 0,     // filled by compress key
      .szBlkCol = 0,  // filled by this func
      .nRow = bData->nRow,
      .cmprAlg = pInfo->defaultCmprAlg,
      .numOfPKs = 0,  // filled by compress key
  };

  // Key part
  tBufferClear(keyBuf);
  code = tBlockDataCompressKeyPart(bData, &hdr, keyBuf, assist, pInfo);
  TSDB_CHECK_CODE(code, lino, _exit);

  // Regular column part
  tBufferClear(blkColBuf);
  tBufferClear(dataBuf);
  for (int i = 0; i < bData->nColData; i++) {
    SColData *colData = tBlockDataGetColDataByIdx(bData, i);

    if ((colData->cflag & COL_IS_KEY) || colData->flag == HAS_NONE) {
      continue;
    }

    SColDataCompressInfo cinfo = {
        .cmprAlg = pInfo->defaultCmprAlg,
    };
    // per-column algorithm; the return value is overwritten right below
    code = tsdbGetColCmprAlgFromSet(pInfo->pColCmpr, colData->cid, &cinfo.cmprAlg);

    // where this column's data starts inside the data buffer
    int32_t offset = dataBuf->size;
    code = tColDataCompress(colData, &cinfo, dataBuf, assist);
    TSDB_CHECK_CODE(code, lino, _exit);

    SBlockCol blockCol = {
        .cid = cinfo.columnId,
        .type = cinfo.dataType,
        .cflag = cinfo.columnFlag,
        .flag = cinfo.flag,
        .szOrigin = cinfo.dataOriginalSize,
        .szBitmap = cinfo.bitmapCompressedSize,
        .szOffset = cinfo.offsetCompressedSize,
        .szValue = cinfo.dataCompressedSize,
        .offset = offset,
        .alg = cinfo.cmprAlg,
    };

    code = tPutBlockCol(blkColBuf, &blockCol, hdr.fmtVer, hdr.cmprAlg);
    TSDB_CHECK_CODE(code, lino, _exit);
  }
  hdr.szBlkCol = blkColBuf->size;

  // SDiskDataHdr part
  tBufferClear(hdrBuf);
  code = tPutDiskDataHdr(hdrBuf, &hdr);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  return code;
}
1460

1461
/*
 * Rebuild a block from its serialized form.
 *
 * The header is read first, blockData is reset and given the header's
 * suid/uid/nRow, then the key part is decoded. The SBlockCol descriptors that
 * follow occupy hdr.szBlkCol bytes: they are walked with a private copy of
 * the reader while br itself is moved past them, so that br supplies the
 * column data each descriptor refers to. A descriptor whose alg is 0 falls
 * back to the header's algorithm.
 *
 * Returns 0 on success or the first error reported by a helper.
 */
int32_t tBlockDataDecompress(SBufferReader *br, SBlockData *blockData, SBuffer *assist) {
  int32_t       code = 0;
  int32_t       lino = 0;
  SDiskDataHdr  hdr = {0};
  SBufferReader blkColReader;
  uint32_t      startOffset;

  // SDiskDataHdr
  code = tGetDiskDataHdr(br, &hdr);
  TSDB_CHECK_CODE(code, lino, _exit);

  tBlockDataReset(blockData);
  blockData->suid = hdr.suid;
  blockData->uid = hdr.uid;
  blockData->nRow = hdr.nRow;

  // Key part
  code = tBlockDataDecompressKeyPart(&hdr, br, blockData, assist);
  TSDB_CHECK_CODE(code, lino, _exit);

  // Column part
  blkColReader = *br;
  br->offset += hdr.szBlkCol;
  startOffset = blkColReader.offset;
  while (blkColReader.offset - startOffset < hdr.szBlkCol) {
    SBlockCol blockCol;

    code = tGetBlockCol(&blkColReader, &blockCol, hdr.fmtVer, hdr.cmprAlg);
    TSDB_CHECK_CODE(code, lino, _exit);

    if (blockCol.alg == 0) {
      blockCol.alg = hdr.cmprAlg;
    }

    code = tBlockDataDecompressColData(&hdr, &blockCol, br, blockData, assist);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  return code;
}
1496

1497
// SDiskDataHdr ==============================
1498
/*
 * Serialize a SDiskDataHdr into buffer.
 *
 * Field order: delimiter, fmtVer, suid, uid, szUid, szVer, szKey, szBlkCol,
 * nRow, then cmprAlg (one byte for fmtVer < 2, 32 bits for fmtVer == 2,
 * omitted for later versions), and for fmtVer >= 1 the primary-key count
 * followed by that many SBlockCol entries.
 *
 * Writing stops at the first failing call, whose error code is returned;
 * 0 means everything was written.
 */
int32_t tPutDiskDataHdr(SBuffer *buffer, const SDiskDataHdr *pHdr) {
  int32_t code = tBufferPutU32(buffer, pHdr->delimiter);

  if (code == 0) code = tBufferPutU32v(buffer, pHdr->fmtVer);
  if (code == 0) code = tBufferPutI64(buffer, pHdr->suid);
  if (code == 0) code = tBufferPutI64(buffer, pHdr->uid);
  if (code == 0) code = tBufferPutI32v(buffer, pHdr->szUid);
  if (code == 0) code = tBufferPutI32v(buffer, pHdr->szVer);
  if (code == 0) code = tBufferPutI32v(buffer, pHdr->szKey);
  if (code == 0) code = tBufferPutI32v(buffer, pHdr->szBlkCol);
  if (code == 0) code = tBufferPutI32v(buffer, pHdr->nRow);
  if (code != 0) {
    return code;
  }

  // compression algorithm: width depends on the format version
  if (pHdr->fmtVer < 2) {
    code = tBufferPutI8(buffer, pHdr->cmprAlg);
  } else if (pHdr->fmtVer == 2) {
    code = tBufferPutU32(buffer, pHdr->cmprAlg);
  } else {
    // more data fmt ver
  }
  if (code != 0) {
    return code;
  }

  // primary-key columns
  if (pHdr->fmtVer >= 1) {
    code = tBufferPutI8(buffer, pHdr->numOfPKs);
    for (int i = 0; code == 0 && i < pHdr->numOfPKs; i++) {
      code = tPutBlockCol(buffer, &pHdr->primaryBlockCols[i], pHdr->fmtVer, pHdr->cmprAlg);
    }
  }

  return code;
}
1526

1527
/*
 * Deserialize a SDiskDataHdr from br; the mirror of tPutDiskDataHdr.
 *
 * cmprAlg is read as one byte for fmtVer < 2 and as 32 bits for fmtVer == 2;
 * later versions read nothing for it here. For fmtVer >= 1 the primary-key
 * count and that many SBlockCol entries follow; otherwise numOfPKs is set
 * to 0.
 *
 * Reading stops at the first failing call, whose error code is returned;
 * 0 means the header was read completely.
 */
int32_t tGetDiskDataHdr(SBufferReader *br, SDiskDataHdr *pHdr) {
  int32_t code = tBufferGetU32(br, &pHdr->delimiter);

  if (code == 0) code = tBufferGetU32v(br, &pHdr->fmtVer);
  if (code == 0) code = tBufferGetI64(br, &pHdr->suid);
  if (code == 0) code = tBufferGetI64(br, &pHdr->uid);
  if (code == 0) code = tBufferGetI32v(br, &pHdr->szUid);
  if (code == 0) code = tBufferGetI32v(br, &pHdr->szVer);
  if (code == 0) code = tBufferGetI32v(br, &pHdr->szKey);
  if (code == 0) code = tBufferGetI32v(br, &pHdr->szBlkCol);
  if (code == 0) code = tBufferGetI32v(br, &pHdr->nRow);
  if (code != 0) {
    return code;
  }

  // compression algorithm: width depends on the format version
  if (pHdr->fmtVer < 2) {
    int8_t cmprAlg = 0;
    code = tBufferGetI8(br, &cmprAlg);
    if (code != 0) {
      return code;
    }
    pHdr->cmprAlg = cmprAlg;
  } else if (pHdr->fmtVer == 2) {
    code = tBufferGetU32(br, &pHdr->cmprAlg);
    if (code != 0) {
      return code;
    }
  } else {
    // more data fmt ver
  }

  // primary-key columns
  if (pHdr->fmtVer < 1) {
    pHdr->numOfPKs = 0;
    return 0;
  }

  code = tBufferGetI8(br, &pHdr->numOfPKs);
  for (int i = 0; code == 0 && i < pHdr->numOfPKs; i++) {
    code = tGetBlockCol(br, &pHdr->primaryBlockCols[i], pHdr->fmtVer, pHdr->cmprAlg);
  }

  return code;
}
1561

1562
// ALGORITHM ==============================
1563
/*
 * Serialize one column aggregate into buffer, in the order
 * colId, numOfNull, sum, max, min.
 * Returns 0 on success or the error of the first failing write.
 */
int32_t tPutColumnDataAgg(SBuffer *buffer, SColumnDataAgg *pColAgg) {
  int32_t code = tBufferPutI16v(buffer, pColAgg->colId);

  if (code == 0) code = tBufferPutI16v(buffer, pColAgg->numOfNull);
  if (code == 0) code = tBufferPutI64(buffer, pColAgg->sum);
  if (code == 0) code = tBufferPutI64(buffer, pColAgg->max);
  if (code == 0) code = tBufferPutI64(buffer, pColAgg->min);

  return code;
}
1574

1575
/*
 * Deserialize one column aggregate from br, in the order
 * colId, numOfNull, sum, max, min.
 * Returns 0 on success or the error of the first failing read.
 */
int32_t tGetColumnDataAgg(SBufferReader *br, SColumnDataAgg *pColAgg) {
  int32_t code = tBufferGetI16v(br, &pColAgg->colId);

  if (code == 0) code = tBufferGetI16v(br, &pColAgg->numOfNull);
  if (code == 0) code = tBufferGetI64(br, &pColAgg->sum);
  if (code == 0) code = tBufferGetI64(br, &pColAgg->max);
  if (code == 0) code = tBufferGetI64(br, &pColAgg->min);

  return code;
}
1586

1587
static int32_t tBlockDataCompressKeyPart(SBlockData *bData, SDiskDataHdr *hdr, SBuffer *buffer, SBuffer *assist,
360,798✔
1588
                                         SColCompressInfo *compressInfo) {
1589
  int32_t       code = 0;
360,798✔
1590
  int32_t       lino = 0;
360,798✔
1591
  SCompressInfo cinfo;
1592

1593
  // uid
1594
  if (bData->uid == 0) {
360,798✔
1595
    cinfo = (SCompressInfo){
231,896✔
1596
        .cmprAlg = hdr->cmprAlg,
231,896✔
1597
        .dataType = TSDB_DATA_TYPE_BIGINT,
1598
        .originalSize = sizeof(int64_t) * bData->nRow,
231,896✔
1599
    };
1600
    code = tCompressDataToBuffer(bData->aUid, &cinfo, buffer, assist);
231,896✔
1601
    TSDB_CHECK_CODE(code, lino, _exit);
231,955!
1602
    hdr->szUid = cinfo.compressedSize;
231,955✔
1603
  }
1604

1605
  // version
1606
  cinfo = (SCompressInfo){
360,857✔
1607
      .cmprAlg = hdr->cmprAlg,
360,857✔
1608
      .dataType = TSDB_DATA_TYPE_BIGINT,
1609
      .originalSize = sizeof(int64_t) * bData->nRow,
360,857✔
1610
  };
1611
  code = tCompressDataToBuffer((uint8_t *)bData->aVersion, &cinfo, buffer, assist);
360,857✔
1612
  TSDB_CHECK_CODE(code, lino, _exit);
360,883!
1613
  hdr->szVer = cinfo.compressedSize;
360,883✔
1614

1615
  // ts
1616
  cinfo = (SCompressInfo){
360,883✔
1617
      .cmprAlg = hdr->cmprAlg,
360,883✔
1618
      .dataType = TSDB_DATA_TYPE_TIMESTAMP,
1619
      .originalSize = sizeof(TSKEY) * bData->nRow,
360,883✔
1620
  };
1621

1622
  code = tCompressDataToBuffer((uint8_t *)bData->aTSKEY, &cinfo, buffer, assist);
360,883✔
1623
  TSDB_CHECK_CODE(code, lino, _exit);
360,860!
1624
  hdr->szKey = cinfo.compressedSize;
360,860✔
1625

1626
  // primary keys
1627
  for (hdr->numOfPKs = 0; hdr->numOfPKs < bData->nColData; hdr->numOfPKs++) {
489,329✔
1628
    if (!(hdr->numOfPKs <= TD_MAX_PK_COLS)) {
489,318!
1629
      return TSDB_CODE_INVALID_PARA;
×
1630
    }
1631

1632
    SBlockCol *blockCol = &hdr->primaryBlockCols[hdr->numOfPKs];
489,318✔
1633
    SColData  *colData = tBlockDataGetColDataByIdx(bData, hdr->numOfPKs);
489,318✔
1634

1635
    if ((colData->cflag & COL_IS_KEY) == 0) {
489,318✔
1636
      break;
360,853✔
1637
    }
1638

1639
    SColDataCompressInfo info = {
128,465✔
1640
        .cmprAlg = hdr->cmprAlg,
128,465✔
1641
    };
1642
    code = tsdbGetColCmprAlgFromSet(compressInfo->pColCmpr, colData->cid, &info.cmprAlg);
128,465✔
1643
    if (code < 0) {
1644
      // do nothing
1645
    } else {
1646
    }
1647

1648
    code = tColDataCompress(colData, &info, buffer, assist);
128,470✔
1649
    TSDB_CHECK_CODE(code, lino, _exit);
128,469!
1650

1651
    *blockCol = (SBlockCol){
128,469✔
1652
        .cid = info.columnId,
128,469✔
1653
        .type = info.dataType,
128,469✔
1654
        .cflag = info.columnFlag,
128,469✔
1655
        .flag = info.flag,
128,469✔
1656
        .szOrigin = info.dataOriginalSize,
128,469✔
1657
        .szBitmap = info.bitmapCompressedSize,
128,469✔
1658
        .szOffset = info.offsetCompressedSize,
128,469✔
1659
        .szValue = info.dataCompressedSize,
128,469✔
1660
        .offset = 0,
1661
        .alg = info.cmprAlg,
128,469✔
1662
    };
1663
  }
1664

1665
_exit:
11✔
1666
  return code;
360,864✔
1667
}
1668

1669
int32_t tBlockDataDecompressColData(const SDiskDataHdr *hdr, const SBlockCol *blockCol, SBufferReader *br,
20,917,694✔
1670
                                    SBlockData *blockData, SBuffer *assist) {
1671
  int32_t code = 0;
20,917,694✔
1672
  int32_t lino = 0;
20,917,694✔
1673

1674
  SColData *colData;
1675

1676
  code = tBlockDataAddColData(blockData, blockCol->cid, blockCol->type, blockCol->cflag, &colData);
20,917,694✔
1677
  TSDB_CHECK_CODE(code, lino, _exit);
20,922,815!
1678

1679
  SColDataCompressInfo info = {
18,147,777✔
1680
      .cmprAlg = blockCol->alg,
20,922,815✔
1681
      .columnFlag = blockCol->cflag,
20,922,815✔
1682
      .flag = blockCol->flag,
20,922,815✔
1683
      .dataType = blockCol->type,
20,922,815✔
1684
      .columnId = blockCol->cid,
20,922,815✔
1685
      .numOfData = hdr->nRow,
20,922,815✔
1686
      .bitmapOriginalSize = 0,
1687
      .bitmapCompressedSize = blockCol->szBitmap,
20,922,815✔
1688
      .offsetOriginalSize = blockCol->szOffset ? sizeof(int32_t) * hdr->nRow : 0,
20,922,815✔
1689
      .offsetCompressedSize = blockCol->szOffset,
20,922,815✔
1690
      .dataOriginalSize = blockCol->szOrigin,
20,922,815✔
1691
      .dataCompressedSize = blockCol->szValue,
20,922,815✔
1692
  };
1693

1694
  switch (blockCol->flag) {
20,922,815✔
1695
    case (HAS_NONE | HAS_NULL | HAS_VALUE):
2✔
1696
      info.bitmapOriginalSize = BIT2_SIZE(hdr->nRow);
2✔
1697
      break;
2✔
1698
    case (HAS_NONE | HAS_NULL):
1,226,526✔
1699
    case (HAS_NONE | HAS_VALUE):
1700
    case (HAS_NULL | HAS_VALUE):
1701
      info.bitmapOriginalSize = BIT1_SIZE(hdr->nRow);
1,226,526✔
1702
      break;
1,226,526✔
1703
  }
1704

1705
  code = tColDataDecompress(BR_PTR(br), &info, colData, assist);
20,922,815✔
1706
  TSDB_CHECK_CODE(code, lino, _exit);
20,917,525!
1707
  br->offset += blockCol->szBitmap + blockCol->szOffset + blockCol->szValue;
20,917,525✔
1708

1709
_exit:
20,917,525✔
1710
  return code;
20,917,525✔
1711
}
1712

1713
int32_t tBlockDataDecompressKeyPart(const SDiskDataHdr *hdr, SBufferReader *br, SBlockData *blockData,
5,234,032✔
1714
                                    SBuffer *assist) {
1715
  int32_t       code = 0;
5,234,032✔
1716
  int32_t       lino = 0;
5,234,032✔
1717
  SCompressInfo cinfo;
1718

1719
  // uid
1720
  if (hdr->szUid > 0) {
5,234,032✔
1721
    cinfo = (SCompressInfo){
4,410,512✔
1722
        .cmprAlg = hdr->cmprAlg,
4,410,512✔
1723
        .dataType = TSDB_DATA_TYPE_BIGINT,
1724
        .compressedSize = hdr->szUid,
4,410,512✔
1725
        .originalSize = sizeof(int64_t) * hdr->nRow,
4,410,512✔
1726
    };
1727

1728
    code = tRealloc((uint8_t **)&blockData->aUid, cinfo.originalSize);
4,410,512✔
1729
    TSDB_CHECK_CODE(code, lino, _exit);
4,412,081!
1730
    code = tDecompressData(BR_PTR(br), &cinfo, blockData->aUid, cinfo.originalSize, assist);
4,412,081✔
1731
    TSDB_CHECK_CODE(code, lino, _exit);
4,412,284!
1732
    br->offset += cinfo.compressedSize;
4,412,284✔
1733
  }
1734

1735
  // version
1736
  cinfo = (SCompressInfo){
5,235,804✔
1737
      .cmprAlg = hdr->cmprAlg,
5,235,804✔
1738
      .dataType = TSDB_DATA_TYPE_BIGINT,
1739
      .compressedSize = hdr->szVer,
5,235,804✔
1740
      .originalSize = sizeof(int64_t) * hdr->nRow,
5,235,804✔
1741
  };
1742
  code = tRealloc((uint8_t **)&blockData->aVersion, cinfo.originalSize);
5,235,804✔
1743
  TSDB_CHECK_CODE(code, lino, _exit);
5,236,506!
1744
  code = tDecompressData(BR_PTR(br), &cinfo, blockData->aVersion, cinfo.originalSize, assist);
5,236,506✔
1745
  TSDB_CHECK_CODE(code, lino, _exit);
5,237,052!
1746
  br->offset += cinfo.compressedSize;
5,237,052✔
1747

1748
  // ts
1749
  cinfo = (SCompressInfo){
5,237,052✔
1750
      .cmprAlg = hdr->cmprAlg,
5,237,052✔
1751
      .dataType = TSDB_DATA_TYPE_TIMESTAMP,
1752
      .compressedSize = hdr->szKey,
5,237,052✔
1753
      .originalSize = sizeof(TSKEY) * hdr->nRow,
5,237,052✔
1754
  };
1755
  code = tRealloc((uint8_t **)&blockData->aTSKEY, cinfo.originalSize);
5,237,052✔
1756
  TSDB_CHECK_CODE(code, lino, _exit);
5,236,955!
1757
  code = tDecompressData(BR_PTR(br), &cinfo, blockData->aTSKEY, cinfo.originalSize, assist);
5,236,955✔
1758
  TSDB_CHECK_CODE(code, lino, _exit);
5,237,134!
1759
  br->offset += cinfo.compressedSize;
5,237,134✔
1760

1761
  // primary keys
1762
  for (int i = 0; i < hdr->numOfPKs; i++) {
5,955,194✔
1763
    const SBlockCol *blockCol = &hdr->primaryBlockCols[i];
718,171✔
1764

1765
    if (!(blockCol->flag == HAS_VALUE)) {
718,171!
1766
      TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
×
1767
    }
1768
    if (!(blockCol->cflag & COL_IS_KEY)) {
718,171!
1769
      TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
×
1770
    }
1771

1772
    code = tBlockDataDecompressColData(hdr, blockCol, br, blockData, assist);
718,171✔
1773
    TSDB_CHECK_CODE(code, lino, _exit);
718,060!
1774
  }
1775

1776
_exit:
5,237,023✔
1777
  return code;
5,237,023✔
1778
}
1779

1780
/*
 * Look up the compression algorithm registered for column `colId` in `set`.
 *
 * set    hash keyed by the int16_t column id; may be NULL
 * colId  column id to look up
 * alg    receives the stored value on success; left untouched otherwise
 *
 * Returns 0 when an entry was found, -1 when `set` is NULL, and
 * TSDB_CODE_NOT_FOUND when the set has no entry for `colId`.
 */
int32_t tsdbGetColCmprAlgFromSet(SHashObj *set, int16_t colId, uint32_t *alg) {
  if (set == NULL) {
    return -1;
  }

  uint32_t *pFound = taosHashGet(set, &colId, sizeof(colId));
  if (pFound != NULL) {
    *alg = *pFound;
    return 0;
  }

  return TSDB_CODE_NOT_FOUND;
}
1791
// Convert a compression algorithm value for timestamp columns.
// As written here the function always returns 0, whatever `alg` is.
uint32_t tsdbCvtTimestampAlg(uint32_t alg) {
×
1792
  // NOTE(review): DEFINE_VAR is a macro defined outside this file; presumably it
  // declares locals derived from `alg` - confirm. Nothing it produces is used below.
  DEFINE_VAR(alg)
×
1793

1794
  return 0;
×
1795
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc