• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3656

14 Mar 2025 08:10AM UTC coverage: 62.841% (+3.3%) from 59.532%
#3656

push

travis-ci

web-flow
feat(keep): support keep on super table level. (#30097)

* Feat: support use keep while create super table.

* Test(keep): add test for create super table with keep option.

* Feat(keep): Add tmsg for create keep.

* Feat(keep): support alter table option keep.

* Fix(keep): Add baisc test for alter table option.

* Fix(keep): memory leek.

* Feat(keep): add keep to metaEntry&metaCache and fix earliestTs with stn keep.

* Test(keep): add some cases for select with stb keep.

* Fix: fix ci core while alter stb.

* Feat(keep): delete expired data in super table level.

* Feat: remove get stb keep while query.

* Fix : build error.

* Revert "Fix : build error."

This reverts commit 0ed66e4e8.

* Revert "Feat(keep): delete expired data in super table level."

This reverts commit 36330f6b4.

* Fix : build errors.

* Feat : support restart taosd.

* Fix : alter table comment problems.

* Test : add tests for super table keep.

* Fix: change sdb stb reserve size.

* Test: add more tests.

* Feat: Disable normal tables and sub tables from setting the keep parameter

* Fix: add more checks to avoid unknown address.

* Docs: Add docs for stable keep.

* Fix: some review changes.

* Fix: review errors.

147682 of 302527 branches covered (48.82%)

Branch coverage included in aggregate %.

88 of 99 new or added lines in 12 files covered. (88.89%)

3177 existing lines in 34 files now uncovered.

232747 of 302857 relevant lines covered (76.85%)

5880306.3 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

61.98
/source/libs/stream/src/streamUpdate.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tcompare.h"
17
#include "tdatablock.h"
18
#include "tencode.h"
19
#include "tstreamUpdate.h"
20
#include "ttime.h"
21
#include "tutil.h"
22

23
#define DEFAULT_FALSE_POSITIVE   0.01
24
#define DEFAULT_BUCKET_SIZE      131072
25
#define DEFAULT_MAP_CAPACITY     131072
26
#define DEFAULT_MAP_SIZE         (DEFAULT_MAP_CAPACITY * 100)
27
#define ROWS_PER_MILLISECOND     1
28
#define MAX_NUM_SCALABLE_BF      64
29
#define MIN_NUM_SCALABLE_BF      10
30
#define DEFAULT_PREADD_BUCKET    1
31
#define MAX_INTERVAL             MILLISECOND_PER_MINUTE
32
#define MIN_INTERVAL             (MILLISECOND_PER_SECOND * 10)
33
#define DEFAULT_EXPECTED_ENTRIES 10000
34

35
static int64_t adjustExpEntries(int64_t entries) { return TMIN(DEFAULT_EXPECTED_ENTRIES, entries); }
129,074,490✔
36

37
int compareKeyTs(void* pTs1, void* pTs2, void* pPkVal, __compar_fn_t cmpPkFn) {
9,991,150✔
38
  return compareInt64Val(pTs1, pTs2);
9,991,150✔
39
}
40

41
int compareKeyTsAndPk(void* pValue1, void* pTs, void* pPkVal, __compar_fn_t cmpPkFn) {
24✔
42
  int res = compareInt64Val(pValue1, pTs);
24✔
43
  if (res != 0) {
24✔
44
    return res;
8✔
45
  } else {
46
    void* pk1 = (char*)pValue1 + sizeof(TSKEY);
16✔
47
    return cmpPkFn(pk1, pPkVal);
16✔
48
  }
49
}
50

51
int32_t getKeyBuff(TSKEY ts, int64_t tbUid, void* pVal, int32_t len, char* buff) {
17,037,908✔
52
  *(TSKEY*)buff = ts;
17,037,908✔
53
  memcpy(buff + sizeof(TSKEY), &tbUid, sizeof(int64_t));
17,037,908✔
54
  if (len == 0) {
17,037,908!
55
    return sizeof(TSKEY) + sizeof(int64_t);
17,070,322✔
56
  }
57
  memcpy(buff, pVal, len);
×
58
  return sizeof(TSKEY) + sizeof(int64_t) + len;
×
59
}
60

61
int32_t getValueBuff(TSKEY ts, char* pVal, int32_t len, char* buff) {
9,989,769✔
62
  *(TSKEY*)buff = ts;
9,989,769✔
63
  if (len == 0) {
9,989,769!
64
    return sizeof(TSKEY);
9,993,987✔
65
  }
UNCOV
66
  memcpy(buff + sizeof(TSKEY), pVal, len);
×
UNCOV
67
  return sizeof(TSKEY) + len;
×
68
}
69

70
int32_t windowSBfAdd(SUpdateInfo* pInfo, uint64_t count) {
3,828,778✔
71
  int32_t code = TSDB_CODE_SUCCESS;
3,828,778✔
72
  int32_t lino = 0;
3,828,778✔
73
  if (pInfo->numSBFs < count) {
3,828,778✔
74
    count = pInfo->numSBFs;
3,801,066✔
75
  }
76
  for (uint64_t i = 0; i < count; ++i) {
132,902,766✔
77
    int64_t      rows = adjustExpEntries(pInfo->interval * ROWS_PER_MILLISECOND);
129,074,004✔
78
    SScalableBf* tsSBF = NULL;
129,073,998✔
79
    code = tScalableBfInit(rows, DEFAULT_FALSE_POSITIVE, &tsSBF);
129,073,998✔
80
    QUERY_CHECK_CODE(code, lino, _error);
129,074,047!
81
    void* res = taosArrayPush(pInfo->pTsSBFs, &tsSBF);
129,074,047✔
82
    if (!res) {
129,073,988!
83
      code = terrno;
×
84
      QUERY_CHECK_CODE(code, lino, _error);
×
85
    }
86
  }
87

88
_error:
3,828,762✔
89
  if (code != TSDB_CODE_SUCCESS) {
3,828,762!
90
    uError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
91
  }
92
  return code;
3,828,777✔
93
}
94

95
static void clearItemHelper(void* p) {
128,992,010✔
96
  SScalableBf** pBf = p;
128,992,010✔
97
  tScalableBfDestroy(*pBf);
128,992,010✔
98
}
128,992,009✔
99

100
void windowSBfDelete(SUpdateInfo* pInfo, uint64_t count) {
3,824,020✔
101
  if (count < pInfo->numSBFs) {
3,824,020✔
102
    for (uint64_t i = 0; i < count; ++i) {
46,354✔
103
      SScalableBf* pTsSBFs = taosArrayGetP(pInfo->pTsSBFs, 0);
25,587✔
104
      tScalableBfDestroy(pTsSBFs);
25,587✔
105
      taosArrayRemove(pInfo->pTsSBFs, 0);
25,587✔
106
    }
107
  } else {
108
    taosArrayClearEx(pInfo->pTsSBFs, clearItemHelper);
3,803,253✔
109
  }
110
  pInfo->minTS += pInfo->interval * count;
3,824,020✔
111
}
3,824,020✔
112

113
static int64_t adjustInterval(int64_t interval, int32_t precision) {
5,375✔
114
  int64_t val = interval;
5,375✔
115
  if (precision != TSDB_TIME_PRECISION_MILLI) {
5,375✔
116
    val = convertTimePrecision(interval, precision, TSDB_TIME_PRECISION_MILLI);
118✔
117
  }
118

119
  if (val <= 0 || val > MAX_INTERVAL) {
5,375✔
120
    val = MAX_INTERVAL;
1,756✔
121
  } else if (val < MIN_INTERVAL) {
3,619✔
122
    val = MIN_INTERVAL;
481✔
123
  }
124

125
  if (precision != TSDB_TIME_PRECISION_MILLI) {
5,375✔
126
    val = convertTimePrecision(val, TSDB_TIME_PRECISION_MILLI, precision);
118✔
127
  }
128
  return val;
5,375✔
129
}
130

131
static int64_t adjustWatermark(int64_t adjInterval, int64_t originInt, int64_t watermark) {
5,375✔
132
  if (watermark <= adjInterval) {
5,375✔
133
    watermark = TMAX(originInt / adjInterval, 1) * adjInterval;
5,137✔
134
  }
135

136
  if (watermark > MAX_NUM_SCALABLE_BF * adjInterval) {
5,375✔
137
    watermark = MAX_NUM_SCALABLE_BF * adjInterval;
239✔
138
  }
139
  return watermark;
5,375✔
140
}
141

142
int32_t updateInfoInitP(SInterval* pInterval, int64_t watermark, bool igUp, int8_t pkType, int32_t pkLen,
3,337✔
143
                        SUpdateInfo** ppInfo) {
144
  return updateInfoInit(pInterval->interval, pInterval->precision, watermark, igUp, pkType, pkLen, ppInfo);
3,337✔
145
}
146

147
int32_t updateInfoInit(int64_t interval, int32_t precision, int64_t watermark, bool igUp, int8_t pkType, int32_t pkLen,
5,375✔
148
                       SUpdateInfo** ppInfo) {
149
  int32_t      code = TSDB_CODE_SUCCESS;
5,375✔
150
  int32_t      lino = 0;
5,375✔
151
  SUpdateInfo* pInfo = taosMemoryCalloc(1, sizeof(SUpdateInfo));
5,375!
152
  if (pInfo == NULL) {
5,375!
153
    code = terrno;
×
154
    QUERY_CHECK_CODE(code, lino, _end);
×
155
  }
156
  pInfo->pTsBuckets = NULL;
5,375✔
157
  pInfo->pTsSBFs = NULL;
5,375✔
158
  pInfo->minTS = INT64_MIN;
5,375✔
159
  pInfo->interval = adjustInterval(interval, precision);
5,375✔
160
  pInfo->watermark = adjustWatermark(pInfo->interval, interval, watermark);
5,375✔
161
  pInfo->numSBFs = 0;
5,375✔
162

163
  uint64_t bfSize = 0;
5,375✔
164
  if (!igUp) {
5,375✔
165
    bfSize = (uint64_t)(pInfo->watermark / pInfo->interval);
4,759✔
166
    pInfo->numSBFs = bfSize;
4,759✔
167

168
    pInfo->pTsSBFs = taosArrayInit(bfSize, sizeof(void*));
4,759✔
169
    if (pInfo->pTsSBFs == NULL) {
4,759!
170
      updateInfoDestroy(pInfo);
×
171
      code = terrno;
×
172
      QUERY_CHECK_CODE(code, lino, _end);
×
173
    }
174
    code = windowSBfAdd(pInfo, bfSize);
4,759✔
175
    QUERY_CHECK_CODE(code, lino, _end);
4,759!
176

177
    pInfo->pTsBuckets = taosArrayInit(DEFAULT_BUCKET_SIZE, sizeof(TSKEY));
4,759✔
178
    if (pInfo->pTsBuckets == NULL) {
4,759!
179
      updateInfoDestroy(pInfo);
×
180
      code = terrno;
×
181
      QUERY_CHECK_CODE(code, lino, _end);
×
182
    }
183

184
    TSKEY dumy = INT64_MIN;
4,759✔
185
    for (uint64_t i = 0; i < DEFAULT_BUCKET_SIZE; ++i) {
456,465,110!
186
      void* tmp = taosArrayPush(pInfo->pTsBuckets, &dumy);
458,441,912✔
187
      if (!tmp) {
456,460,351!
188
        code = terrno;
×
189
        QUERY_CHECK_CODE(code, lino, _end);
×
190
      }
191
    }
192
    pInfo->numBuckets = DEFAULT_BUCKET_SIZE;
×
193
    pInfo->pCloseWinSBF = NULL;
×
194
  }
195
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT);
×
196
  pInfo->pMap = taosHashInit(DEFAULT_MAP_CAPACITY, hashFn, true, HASH_NO_LOCK);
5,374✔
197
  if (!pInfo->pMap) {
5,375!
198
    code = terrno;
×
199
    QUERY_CHECK_CODE(code, lino, _end);
×
200
  }
201
  pInfo->maxDataVersion = 0;
5,375✔
202
  pInfo->pkColLen = pkLen;
5,375✔
203
  pInfo->pkColType = pkType;
5,375✔
204
  pInfo->pKeyBuff = taosMemoryCalloc(1, sizeof(TSKEY) + sizeof(int64_t) + pkLen);
5,375!
205
  if (!pInfo->pKeyBuff) {
5,375!
206
    code = terrno;
×
207
    QUERY_CHECK_CODE(code, lino, _end);
×
208
  }
209
  pInfo->pValueBuff = taosMemoryCalloc(1, sizeof(TSKEY) + pkLen);
5,375!
210
  if (!pInfo->pValueBuff) {
5,375!
211
    code = terrno;
×
212
    QUERY_CHECK_CODE(code, lino, _end);
×
213
  }
214
  if (pkLen != 0) {
5,375✔
215
    pInfo->comparePkRowFn = compareKeyTsAndPk;
31✔
216
    pInfo->comparePkCol = getKeyComparFunc(pkType, TSDB_ORDER_ASC);
31✔
217
  } else {
218
    pInfo->comparePkRowFn = compareKeyTs;
5,344✔
219
    pInfo->comparePkCol = NULL;
5,344✔
220
  }
221
  (*ppInfo) = pInfo;
5,375✔
222

223
_end:
5,375✔
224
  if (code != TSDB_CODE_SUCCESS) {
5,375!
225
    uError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
226
  }
227
  return code;
5,375✔
228
}
229

230
static int32_t getSBf(SUpdateInfo* pInfo, TSKEY ts, SScalableBf** ppSBf) {
25,999,157✔
231
  int32_t code = TSDB_CODE_SUCCESS;
25,999,157✔
232
  int32_t lino = 0;
25,999,157✔
233
  if (pInfo->minTS == INT64_MIN) {
25,999,157✔
234
    pInfo->minTS = (TSKEY)(ts / pInfo->interval * pInfo->interval);
1,786✔
235
  }
236
  int64_t index = (int64_t)((ts - pInfo->minTS) / pInfo->interval);
25,999,157✔
237
  if (index < 0) {
25,999,157✔
238
    (*ppSBf) = NULL;
16,936,934✔
239
    goto _end;
16,936,934✔
240
  }
241
  if (index >= pInfo->numSBFs) {
9,062,223✔
242
    uint64_t count = index + 1 - pInfo->numSBFs;
3,824,019✔
243
    windowSBfDelete(pInfo, count);
3,824,019✔
244
    code = windowSBfAdd(pInfo, count);
3,824,018✔
245
    QUERY_CHECK_CODE(code, lino, _end);
3,824,018!
246

247
    index = pInfo->numSBFs - 1;
3,824,018✔
248
  }
249
  SScalableBf* res = taosArrayGetP(pInfo->pTsSBFs, index);
9,062,222✔
250
  if (res == NULL) {
9,089,183!
251
    int64_t rows = adjustExpEntries(pInfo->interval * ROWS_PER_MILLISECOND);
×
252
    code = tScalableBfInit(rows, DEFAULT_FALSE_POSITIVE, &res);
×
253
    QUERY_CHECK_CODE(code, lino, _end);
×
254

255
    void* tmp = taosArrayPush(pInfo->pTsSBFs, &res);
×
256
    if (!tmp) {
×
257
      code = terrno;
×
258
      QUERY_CHECK_CODE(code, lino, _end);
×
259
    }
260
  }
261
  (*ppSBf) = res;
9,089,297✔
262

263
_end:
26,026,231✔
264
  if (code != TSDB_CODE_SUCCESS) {
26,026,231!
265
    uError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
266
  }
267
  return code;
25,997,329✔
268
}
269

270
bool updateInfoIsTableInserted(SUpdateInfo* pInfo, int64_t tbUid) {
22,874✔
271
  void* pVal = taosHashGet(pInfo->pMap, &tbUid, sizeof(int64_t));
22,874✔
272
  if (pVal || taosHashGetSize(pInfo->pMap) >= DEFAULT_MAP_SIZE) return true;
22,883!
273
  return false;
1,908✔
274
}
275

276
int32_t updateInfoFillBlockData(SUpdateInfo* pInfo, SSDataBlock* pBlock, int32_t primaryTsCol, int32_t primaryKeyCol,
5,685✔
277
                                TSKEY* pMaxResTs) {
278
  int32_t code = TSDB_CODE_SUCCESS;
5,685✔
279
  int32_t lino = 0;
5,685✔
280
  if (pBlock == NULL || pBlock->info.rows == 0) {
5,685!
281
    (*pMaxResTs) = INT64_MIN;
×
282
    goto _end;
×
283
  }
284
  TSKEY   maxTs = INT64_MIN;
5,685✔
285
  void*   pPkVal = NULL;
5,685✔
286
  void*   pMaxPkVal = NULL;
5,685✔
287
  int32_t maxLen = 0;
5,685✔
288
  int32_t len = 0;
5,685✔
289
  int64_t tbUid = pBlock->info.id.uid;
5,685✔
290

291
  SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, primaryTsCol);
5,685✔
292
  SColumnInfoData* pPkDataInfo = NULL;
5,685✔
293
  if (primaryKeyCol >= 0) {
5,685!
294
    pPkDataInfo = taosArrayGet(pBlock->pDataBlock, primaryKeyCol);
×
295
  }
296

297
  for (int32_t i = 0; i < pBlock->info.rows; i++) {
16,018,428✔
298
    TSKEY ts = ((TSKEY*)pColDataInfo->pData)[i];
16,012,958✔
299
    if (maxTs < ts) {
16,012,958✔
300
      maxTs = ts;
16,012,956✔
301
      if (primaryKeyCol >= 0) {
16,012,956!
302
        pMaxPkVal = colDataGetData(pPkDataInfo, i);
×
303
        maxLen = colDataGetRowLength(pPkDataInfo, i);
×
304
      }
305
    }
306
    SScalableBf* pSBf = NULL;
16,012,958✔
307
    code = getSBf(pInfo, ts, &pSBf);
16,012,958✔
308
    QUERY_CHECK_CODE(code, lino, _end);
16,012,429!
309

310
    if (pSBf) {
16,012,429✔
311
      if (primaryKeyCol >= 0) {
7,052,704!
312
        pPkVal = colDataGetData(pPkDataInfo, i);
×
313
        len = colDataGetRowLength(pPkDataInfo, i);
×
314
      }
315
      int32_t buffLen = getKeyBuff(ts, tbUid, pPkVal, len, pInfo->pKeyBuff);
7,052,704✔
316
      // we don't care whether the data is updated or not
317
      int32_t winRes = 0;
7,052,323✔
318
      code = tScalableBfPut(pSBf, pInfo->pKeyBuff, buffLen, &winRes);
7,052,323✔
319
      QUERY_CHECK_CODE(code, lino, _end);
7,053,018!
320
    }
321
  }
322
  void* pMaxTs = taosHashGet(pInfo->pMap, &tbUid, sizeof(int64_t));
5,470✔
323
  if (pMaxTs == NULL || pInfo->comparePkRowFn(pMaxTs, &maxTs, pMaxPkVal, pInfo->comparePkCol) == -1) {
5,685!
324
    int32_t valueLen = getValueBuff(maxTs, pMaxPkVal, maxLen, pInfo->pValueBuff);
5,685✔
325
    code = taosHashPut(pInfo->pMap, &tbUid, sizeof(int64_t), pInfo->pValueBuff, valueLen);
5,685✔
326
    QUERY_CHECK_CODE(code, lino, _end);
5,685!
327
  }
328
  (*pMaxResTs) = maxTs;
5,685✔
329

330
_end:
5,685✔
331
  if (code != TSDB_CODE_SUCCESS) {
5,685!
332
    uError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
333
  }
334
  return code;
5,685✔
335
}
336

337
bool updateInfoIsUpdated(SUpdateInfo* pInfo, uint64_t tableId, TSKEY ts, void* pPkVal, int32_t len) {
9,998,868✔
338
  int32_t code = TSDB_CODE_SUCCESS;
9,998,868✔
339
  int32_t lino = 0;
9,998,868✔
340
  int32_t res = TSDB_CODE_FAILED;
9,998,868✔
341
  int32_t buffLen = 0;
9,998,868✔
342

343
  buffLen = getKeyBuff(ts, tableId, pPkVal, len, pInfo->pKeyBuff);
9,998,868✔
344
  void**   pMapMaxTs = taosHashGet(pInfo->pMap, &tableId, sizeof(uint64_t));
10,006,346✔
345
  uint64_t index = ((uint64_t)tableId) % pInfo->numBuckets;
10,007,471✔
346
  TSKEY    maxTs = *(TSKEY*)taosArrayGet(pInfo->pTsBuckets, index);
10,007,471✔
347
  if (maxTs != INT64_MIN && ts < maxTs - pInfo->watermark) {
10,007,018!
348
    // this window has been closed.
349
    if (pInfo->pCloseWinSBF) {
×
350
      code = tScalableBfPut(pInfo->pCloseWinSBF, pInfo->pKeyBuff, buffLen, &res);
×
351
      QUERY_CHECK_CODE(code, lino, _end);
×
352
      if (res == TSDB_CODE_SUCCESS) {
×
353
        return false;
×
354
      } else {
355
        return true;
×
356
      }
357
    }
358
    return true;
×
359
  }
360

361
  SScalableBf* pSBf = NULL;
10,007,018✔
362
  code = getSBf(pInfo, ts, &pSBf);
10,007,018✔
363
  QUERY_CHECK_CODE(code, lino, _end);
9,995,587!
364

365
  int32_t size = taosHashGetSize(pInfo->pMap);
9,995,587✔
366
  if ((!pMapMaxTs && size < DEFAULT_MAP_SIZE) ||
9,983,746!
367
      (pMapMaxTs && pInfo->comparePkRowFn(pMapMaxTs, &ts, pPkVal, pInfo->comparePkCol) == -1)) {
9,995,082✔
368
    int32_t valueLen = getValueBuff(ts, pPkVal, len, pInfo->pValueBuff);
9,970,642✔
369
    code = taosHashPut(pInfo->pMap, &tableId, sizeof(uint64_t), pInfo->pValueBuff, valueLen);
9,985,855✔
370
    QUERY_CHECK_CODE(code, lino, _end);
9,995,406!
371

372
    // pSBf may be a null pointer
373
    if (pSBf) {
9,995,406✔
374
      res = tScalableBfPutNoCheck(pSBf, pInfo->pKeyBuff, buffLen);
2,029,955✔
375
    }
376
    return false;
9,997,565✔
377
  }
378

379
  // pSBf may be a null pointer
380
  if (pSBf) {
7,356✔
381
    code = tScalableBfPut(pSBf, pInfo->pKeyBuff, buffLen, &res);
4,612✔
382
    QUERY_CHECK_CODE(code, lino, _end);
4,618!
383
  }
384

385
  if (!pMapMaxTs && maxTs < ts) {
7,362!
386
    taosArraySet(pInfo->pTsBuckets, index, &ts);
×
387
    return false;
×
388
  }
389

390
  if (ts < pInfo->minTS) {
7,362✔
391
    return true;
3,612✔
392
  } else if (res == TSDB_CODE_SUCCESS) {
3,750✔
393
    return false;
645✔
394
  }
395

396
_end:
3,105✔
397
  if (code != TSDB_CODE_SUCCESS) {
3,105!
398
    uError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
399
  }
400
  // check from tsdb api
401
  return true;
3,105✔
402
}
403

404
void updateInfoDestroy(SUpdateInfo* pInfo) {
9,601✔
405
  if (pInfo == NULL) {
9,601✔
406
    return;
2,726✔
407
  }
408
  taosArrayDestroy(pInfo->pTsBuckets);
6,875✔
409

410
  uint64_t size = taosArrayGetSize(pInfo->pTsSBFs);
6,875✔
411
  for (uint64_t i = 0; i < size; i++) {
88,940✔
412
    SScalableBf* pSBF = taosArrayGetP(pInfo->pTsSBFs, i);
82,065✔
413
    tScalableBfDestroy(pSBF);
82,055✔
414
  }
415

416
  taosArrayDestroy(pInfo->pTsSBFs);
6,875✔
417
  taosMemoryFreeClear(pInfo->pKeyBuff);
6,875!
418
  taosMemoryFreeClear(pInfo->pValueBuff);
6,875!
419
  taosHashCleanup(pInfo->pMap);
6,875✔
420
  updateInfoDestoryColseWinSBF(pInfo);
6,875✔
421
  taosMemoryFree(pInfo);
6,875!
422
}
423

424
void updateInfoAddCloseWindowSBF(SUpdateInfo* pInfo) {
485✔
425
  if (pInfo->pCloseWinSBF) {
485!
426
    return;
×
427
  }
428
  int64_t rows = adjustExpEntries(pInfo->interval * ROWS_PER_MILLISECOND);
485✔
429
  int32_t code = tScalableBfInit(rows, DEFAULT_FALSE_POSITIVE, &pInfo->pCloseWinSBF);
486✔
430
  if (code != TSDB_CODE_SUCCESS) {
486!
431
    pInfo->pCloseWinSBF = NULL;
×
432
    uError("%s failed to add close window SBF since %s", __func__, tstrerror(code));
×
433
  }
434
}
435

436
void updateInfoDestoryColseWinSBF(SUpdateInfo* pInfo) {
29,307✔
437
  if (!pInfo || !pInfo->pCloseWinSBF) {
29,307✔
438
    return;
28,821✔
439
  }
440
  tScalableBfDestroy(pInfo->pCloseWinSBF);
486✔
441
  pInfo->pCloseWinSBF = NULL;
486✔
442
}
443

444
int32_t updateInfoSerialize(SEncoder* pEncoder, const SUpdateInfo* pInfo) {
3,058✔
445
  int32_t code = TSDB_CODE_SUCCESS;
3,058✔
446
  int32_t lino = 0;
3,058✔
447
  if (!pInfo) {
3,058✔
448
    if (tEncodeI32(pEncoder, -1) < 0) {
10!
449
      code = TSDB_CODE_FAILED;
×
450
      QUERY_CHECK_CODE(code, lino, _end);
×
451
    }
452
    uDebug("%s line:%d. it did not have updateinfo", __func__, __LINE__);
10✔
453
    return TSDB_CODE_SUCCESS;
10✔
454
  }
455

456
  int32_t size = taosArrayGetSize(pInfo->pTsBuckets);
3,048✔
457
  if (tEncodeI32(pEncoder, size) < 0) {
3,048!
458
    code = TSDB_CODE_FAILED;
×
459
    QUERY_CHECK_CODE(code, lino, _end);
×
460
  }
461

462
  for (int32_t i = 0; i < size; i++) {
386,403,304✔
463
    TSKEY* pTs = (TSKEY*)taosArrayGet(pInfo->pTsBuckets, i);
386,400,256✔
464
    if (tEncodeI64(pEncoder, *pTs) < 0) {
772,800,512!
465
      code = TSDB_CODE_FAILED;
×
466
      QUERY_CHECK_CODE(code, lino, _end);
×
467
    }
468
  }
469

470
  if (tEncodeU64(pEncoder, pInfo->numBuckets) < 0) {
6,096!
471
    code = TSDB_CODE_FAILED;
×
472
    QUERY_CHECK_CODE(code, lino, _end);
×
473
  }
474

475
  int32_t sBfSize = taosArrayGetSize(pInfo->pTsSBFs);
3,048✔
476
  if (tEncodeI32(pEncoder, sBfSize) < 0) {
3,048!
477
    code = TSDB_CODE_FAILED;
×
478
    QUERY_CHECK_CODE(code, lino, _end);
×
479
  }
480
  for (int32_t i = 0; i < sBfSize; i++) {
53,346✔
481
    SScalableBf* pSBf = taosArrayGetP(pInfo->pTsSBFs, i);
50,298✔
482
    if (tScalableBfEncode(pSBf, pEncoder) < 0) {
50,298!
483
      code = TSDB_CODE_FAILED;
×
484
      QUERY_CHECK_CODE(code, lino, _end);
×
485
    }
486
  }
487

488
  if (tEncodeU64(pEncoder, pInfo->numSBFs) < 0) {
6,096!
489
    code = TSDB_CODE_FAILED;
×
490
    QUERY_CHECK_CODE(code, lino, _end);
×
491
  }
492
  if (tEncodeI64(pEncoder, pInfo->interval) < 0) {
6,096!
493
    code = TSDB_CODE_FAILED;
×
494
    QUERY_CHECK_CODE(code, lino, _end);
×
495
  }
496
  if (tEncodeI64(pEncoder, pInfo->watermark) < 0) {
6,096!
497
    code = TSDB_CODE_FAILED;
×
498
    QUERY_CHECK_CODE(code, lino, _end);
×
499
  }
500
  if (tEncodeI64(pEncoder, pInfo->minTS) < 0) {
6,096!
501
    code = TSDB_CODE_FAILED;
×
502
    QUERY_CHECK_CODE(code, lino, _end);
×
503
  }
504

505
  if (tScalableBfEncode(pInfo->pCloseWinSBF, pEncoder) < 0) {
3,048!
506
    code = TSDB_CODE_FAILED;
×
507
    QUERY_CHECK_CODE(code, lino, _end);
×
508
  }
509

510
  int32_t mapSize = taosHashGetSize(pInfo->pMap);
3,048✔
511
  if (tEncodeI32(pEncoder, mapSize) < 0) {
3,048!
512
    code = TSDB_CODE_FAILED;
×
513
    QUERY_CHECK_CODE(code, lino, _end);
×
514
  }
515
  void*  pIte = NULL;
3,048✔
516
  size_t keyLen = 0;
3,048✔
517
  while ((pIte = taosHashIterate(pInfo->pMap, pIte)) != NULL) {
9,686✔
518
    void* key = taosHashGetKey(pIte, &keyLen);
6,638✔
519
    if (tEncodeU64(pEncoder, *(uint64_t*)key) < 0) {
13,276!
520
      code = TSDB_CODE_FAILED;
×
521
      QUERY_CHECK_CODE(code, lino, _end);
×
522
    }
523
    int32_t valueSize = taosHashGetValueSize(pIte);
6,638✔
524
    if (tEncodeBinary(pEncoder, (const uint8_t*)pIte, valueSize) < 0) {
13,276!
525
      code = TSDB_CODE_FAILED;
×
526
      QUERY_CHECK_CODE(code, lino, _end);
×
527
    }
528
  }
529

530
  if (tEncodeU64(pEncoder, pInfo->maxDataVersion) < 0) {
6,096!
531
    code = TSDB_CODE_FAILED;
×
532
    QUERY_CHECK_CODE(code, lino, _end);
×
533
  }
534

535
  if (tEncodeI32(pEncoder, pInfo->pkColLen) < 0) {
6,096!
536
    code = TSDB_CODE_FAILED;
×
537
    QUERY_CHECK_CODE(code, lino, _end);
×
538
  }
539
  if (tEncodeI8(pEncoder, pInfo->pkColType) < 0) {
6,096!
540
    code = TSDB_CODE_FAILED;
×
541
    QUERY_CHECK_CODE(code, lino, _end);
×
542
  }
543

544
_end:
3,048✔
545
  if (code != TSDB_CODE_SUCCESS) {
3,048!
546
    uError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
547
  }
548
  return code;
3,048✔
549
}
550

551
int32_t updateInfoDeserialize(SDecoder* pDeCoder, SUpdateInfo* pInfo) {
1,501✔
552
  int32_t code = TSDB_CODE_SUCCESS;
1,501✔
553
  int32_t lino = 0;
1,501✔
554
  QUERY_CHECK_NULL(pInfo, code, lino, _error, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
1,501!
555
  
556
  int32_t size = 0;
1,501✔
557
  if (tDecodeI32(pDeCoder, &size) < 0) return -1;
1,501!
558

559
  if (size < 0) {
1,501✔
560
    return -1;
1✔
561
  }
562
  pInfo->pTsBuckets = taosArrayInit(size, sizeof(TSKEY));
1,500✔
563
  QUERY_CHECK_NULL(pInfo->pTsBuckets, code, lino, _error, terrno);
1,500!
564

565
  TSKEY ts = INT64_MIN;
1,500✔
566
  for (int32_t i = 0; i < size; i++) {
190,318,044✔
567
    if (tDecodeI64(pDeCoder, &ts) < 0) return -1;
190,316,544!
568
    void* tmp = taosArrayPush(pInfo->pTsBuckets, &ts);
190,316,544✔
569
    if (!tmp) {
190,316,544!
570
      code = terrno;
×
571
      QUERY_CHECK_CODE(code, lino, _error);
×
572
    }
573
  }
574

575
  if (tDecodeU64(pDeCoder, &pInfo->numBuckets) < 0) return -1;
3,000!
576

577
  int32_t sBfSize = 0;
1,500✔
578
  if (tDecodeI32(pDeCoder, &sBfSize) < 0) return -1;
1,500!
579
  pInfo->pTsSBFs = taosArrayInit(sBfSize, sizeof(void*));
1,500✔
580
  QUERY_CHECK_NULL(pInfo->pTsSBFs, code, lino, _error, terrno);
1,500!
581

582
  for (int32_t i = 0; i < sBfSize; i++) {
26,617✔
583
    SScalableBf* pSBf = NULL;
25,117✔
584
    code = tScalableBfDecode(pDeCoder, &pSBf);
25,117✔
585
    QUERY_CHECK_CODE(code, lino, _error);
25,117!
586

587
    void* tmp = taosArrayPush(pInfo->pTsSBFs, &pSBf);
25,117✔
588
    if (!tmp) {
25,117!
589
      code = terrno;
×
590
      QUERY_CHECK_CODE(code, lino, _error);
×
591
    }
592
  }
593

594
  if (tDecodeU64(pDeCoder, &pInfo->numSBFs) < 0) return -1;
3,000!
595
  if (tDecodeI64(pDeCoder, &pInfo->interval) < 0) return -1;
3,000!
596
  if (tDecodeI64(pDeCoder, &pInfo->watermark) < 0) return -1;
3,000!
597
  if (tDecodeI64(pDeCoder, &pInfo->minTS) < 0) return -1;
3,000!
598

599
  code = tScalableBfDecode(pDeCoder, &pInfo->pCloseWinSBF);
1,500✔
600
  if (code != TSDB_CODE_SUCCESS) {
1,500!
601
    pInfo->pCloseWinSBF = NULL;
×
602
    code = TSDB_CODE_SUCCESS;
×
603
  }
604

605
  int32_t mapSize = 0;
1,500✔
606
  if (tDecodeI32(pDeCoder, &mapSize) < 0) return -1;
1,500!
607
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT);
1,500✔
608
  pInfo->pMap = taosHashInit(mapSize, hashFn, true, HASH_NO_LOCK);
1,500✔
609
  uint64_t uid = 0;
1,500✔
610
  void*    pVal = NULL;
1,500✔
611
  uint32_t  valSize = 0;
1,500✔
612
  for (int32_t i = 0; i < mapSize; i++) {
3,797✔
613
    if (tDecodeU64(pDeCoder, &uid) < 0) return -1;
2,297!
614
    if (tDecodeBinary(pDeCoder, (uint8_t**)&pVal, &valSize) < 0) return -1;
2,297!
615
    code = taosHashPut(pInfo->pMap, &uid, sizeof(uint64_t), pVal, valSize);
2,297✔
616
    QUERY_CHECK_CODE(code, lino, _error);
2,297!
617
  }
618
  QUERY_CHECK_CONDITION((mapSize == taosHashGetSize(pInfo->pMap)), code, lino, _error,
1,500!
619
                        TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
620
  if (tDecodeU64(pDeCoder, &pInfo->maxDataVersion) < 0) return -1;
3,000!
621

622
  if (tDecodeI32(pDeCoder, &pInfo->pkColLen) < 0) return -1;
3,000!
623
  if (tDecodeI8(pDeCoder, &pInfo->pkColType) < 0) return -1;
3,000!
624

625
  pInfo->pKeyBuff = taosMemoryCalloc(1, sizeof(TSKEY) + sizeof(int64_t) + pInfo->pkColLen);
1,500!
626
  QUERY_CHECK_NULL(pInfo->pKeyBuff, code, lino, _error, terrno);
1,500!
627

628
  pInfo->pValueBuff = taosMemoryCalloc(1, sizeof(TSKEY) + pInfo->pkColLen);
1,500!
629
  QUERY_CHECK_NULL(pInfo->pValueBuff, code, lino, _error, terrno);
1,500!
630

631
  if (pInfo->pkColLen != 0) {
1,500!
632
    pInfo->comparePkRowFn = compareKeyTsAndPk;
×
633
    pInfo->comparePkCol = getKeyComparFunc(pInfo->pkColType, TSDB_ORDER_ASC);
×
634
  } else {
635
    pInfo->comparePkRowFn = compareKeyTs;
1,500✔
636
    pInfo->comparePkCol = NULL;
1,500✔
637
  }
638

639
_error:
1,500✔
640
  if (code != TSDB_CODE_SUCCESS) {
1,500!
641
    uError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
642
  }
643
  return code;
1,500✔
644
}
645

646
bool isIncrementalTimeStamp(SUpdateInfo* pInfo, uint64_t tableId, TSKEY ts, void* pPkVal, int32_t len) {
3,374✔
647
  int32_t code = TSDB_CODE_SUCCESS;
3,374✔
648
  int32_t lino = 0;
3,374✔
649
  TSKEY*  pMapMaxTs = taosHashGet(pInfo->pMap, &tableId, sizeof(uint64_t));
3,374✔
650
  bool    res = true;
3,374✔
651
  if (pMapMaxTs && pInfo->comparePkRowFn(pMapMaxTs, &ts, pPkVal, pInfo->comparePkCol) == 1) {
3,374✔
652
    res = false;
267✔
653
  } else {
654
    int32_t valueLen = getValueBuff(ts, pPkVal, len, pInfo->pValueBuff);
3,107✔
655
    code = taosHashPut(pInfo->pMap, &tableId, sizeof(uint64_t), pInfo->pValueBuff, valueLen);
3,107✔
656
    QUERY_CHECK_CODE(code, lino, _error);
3,108!
657
  }
658
  return res;
3,375✔
659

660
_error:
×
661
  if (code != TSDB_CODE_SUCCESS) {
×
662
    uError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
663
  }
664
  return false;
×
665
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc