• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3658

14 Mar 2025 08:10AM UTC coverage: 63.25% (+0.4%) from 62.877%
#3658

push

travis-ci

web-flow
feat(keep): support keep on super table level. (#30097)

* Feat: support use keep while create super table.

* Test(keep): add test for create super table with keep option.

* Feat(keep): Add tmsg for create keep.

* Feat(keep): support alter table option keep.

* Fix(keep): Add basic test for alter table option.

* Fix(keep): memory leak.

* Feat(keep): add keep to metaEntry&metaCache and fix earliestTs with stb keep.

* Test(keep): add some cases for select with stb keep.

* Fix: fix ci core while alter stb.

* Feat(keep): delete expired data in super table level.

* Feat: remove get stb keep while query.

* Fix : build error.

* Revert "Fix : build error."

This reverts commit 0ed66e4e8.

* Revert "Feat(keep): delete expired data in super table level."

This reverts commit 36330f6b4.

* Fix : build errors.

* Feat : support restart taosd.

* Fix : alter table comment problems.

* Test : add tests for super table keep.

* Fix: change sdb stb reserve size.

* Test: add more tests.

* Feat: Disable normal tables and sub tables from setting the keep parameter

* Fix: add more checks to avoid unknown address.

* Docs: Add docs for stable keep.

* Fix: some review changes.

* Fix: review errors.

148878 of 302527 branches covered (49.21%)

Branch coverage included in aggregate %.

88 of 99 new or added lines in 12 files covered. (88.89%)

3290 existing lines in 68 files now uncovered.

234027 of 302857 relevant lines covered (77.27%)

17847433.29 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.59
/source/libs/stream/src/streamQueue.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "streamInt.h"
17

18
#define MAX_STREAM_EXEC_BATCH_NUM 32
19
#define MAX_SMOOTH_BURST_RATIO    5  // 5 sec
20

21
// todo refactor:
22
// read data from input queue
23
// Settings for reading batches of blocks from a stream input queue.
// NOTE(review): no user of this type is visible in this part of the file — confirm it is still needed.
typedef struct SQueueReader {
24
  SStreamQueue* pQueue;
25
  int32_t       taskLevel;
26
  int32_t       maxBlocks;     // maximum number of blocks in one batch
27
  int32_t       waitDuration;  // maximum time to wait while gathering several blocks into one batch, unit: ms
28
} SQueueReader;
29

30
#define streamQueueCurItem(_q) ((_q)->qItem)
31

32
static bool streamTaskExtractAvailableToken(STokenBucket* pBucket, const char* id);
33
static void streamTaskPutbackToken(STokenBucket* pBucket);
34
static void streamTaskConsumeQuota(STokenBucket* pBucket, int32_t bytes);
35
static void streamQueueNextItemInSourceQ(SStreamQueue* pQueue, SStreamQueueItem** pItem, ETaskStatus status, const char* id);
36

37
// Pull every remaining item out through the source-queue reader, free each one,
// and finally set the queue status to STREAM_QUEUE__SUCESS.
static void streamQueueCleanup(SStreamQueue* pQueue) {
  for (;;) {
    SStreamQueueItem* pPending = NULL;

    streamQueueNextItemInSourceQ(pQueue, &pPending, TASK_STATUS__READY, "");
    if (pPending == NULL) {
      break;  // nothing left to release
    }

    streamFreeQitem(pPending);
  }

  pQueue->status = STREAM_QUEUE__SUCESS;
}
29,383✔
48

49
// Create a stream queue together with its input queue, its qall batch and its checkpoint queue.
//   cap: passed to taosSetQueueCapacity(); cap * 1024 is passed to taosSetQueueMemoryCapacity().
//   pQ:  set to NULL on entry and to the new queue on success.
// Returns 0 on success, terrno when the wrapper cannot be allocated, or the code that made
// TSDB_CHECK_CODE jump to _error.
// NOTE(review): the _error path hands a partially built queue to streamQueueClose(), which drains
// it before closing — confirm the queue helpers tolerate members that are still NULL.
int32_t streamQueueOpen(int64_t cap, SStreamQueue** pQ) {
  int32_t       code = 0;
  int32_t       lino = 0;
  SStreamQueue* pNew = NULL;

  *pQ = NULL;

  pNew = taosMemoryCalloc(1, sizeof(SStreamQueue));
  if (pNew == NULL) {
    return terrno;
  }

  code = taosOpenQueue(&pNew->pQueue);
  TSDB_CHECK_CODE(code, lino, _error);

  code = taosAllocateQall(&pNew->qall);
  TSDB_CHECK_CODE(code, lino, _error);

  code = taosOpenQueue(&pNew->pChkptQueue);
  TSDB_CHECK_CODE(code, lino, _error);

  pNew->status = STREAM_QUEUE__SUCESS;

  // only the ordinary input queue gets an item/memory limit
  taosSetQueueCapacity(pNew->pQueue, cap);
  taosSetQueueMemoryCapacity(pNew->pQueue, cap * 1024);

  *pQ = pNew;
  return code;

_error:
  streamQueueClose(pNew, 0);
  stError("failed to open stream queue at line:%d, code:%s", lino, tstrerror(code));
  return code;
}
82

83
// Release a stream queue: drain and free the pending items, then free the qall,
// close the input and checkpoint queues, and free the wrapper itself.
//   taskId: only used in the debug log line.
void streamQueueClose(SStreamQueue* pQueue, int32_t taskId) {
  stDebug("s-task:0x%x free the queue:%p, items in queue:%d", taskId, pQueue->pQueue,
          taosQueueItemSize(pQueue->pQueue));

  // items must be released while the underlying queues are still open
  streamQueueCleanup(pQueue);

  taosFreeQall(pQueue->qall);

  taosCloseQueue(pQueue->pQueue);
  taosCloseQueue(pQueue->pChkptQueue);
  pQueue->pQueue = NULL;
  pQueue->pChkptQueue = NULL;

  taosMemoryFree(pQueue);
}
29,399✔
97

98
// Hand out the next item of the input queue and mark the queue as STREAM_QUEUE__PROCESSING.
// If the previous status was STREAM_QUEUE__FAILED the item that is still stored in
// pQueue->qItem is returned again; otherwise a new item is taken from the qall, refilling the
// qall from the input queue when it is exhausted. *pItem is NULL when nothing is available.
void streamQueueNextItem(SStreamQueue* pQueue, SStreamQueueItem** pItem) {
  *pItem = NULL;

  int8_t prevStatus = atomic_exchange_8(&pQueue->status, STREAM_QUEUE__PROCESSING);
  if (prevStatus != STREAM_QUEUE__FAILED) {
    pQueue->qItem = NULL;
    (void)taosGetQitem(pQueue->qall, &pQueue->qItem);

    if (pQueue->qItem == NULL) {
      // current batch is used up, fetch a new batch from the input queue and retry
      (void)taosReadAllQitems(pQueue->pQueue, pQueue->qall);
      (void)taosGetQitem(pQueue->qall, &pQueue->qItem);
    }
  }

  *pItem = pQueue->qItem;
}
156,129✔
116

117
// Hand out the next item for a source task and mark the queue as STREAM_QUEUE__PROCESSING.
//
// Order of precedence:
//   1. previous status STREAM_QUEUE__CHKPTFAILED -> return the stored checkpoint item again;
//   2. previous status STREAM_QUEUE__FAILED      -> return the stored ordinary item again;
//   3. an item waiting in the checkpoint queue   -> return it;
//   4. task status TASK_STATUS__CK               -> return nothing (ordinary queue is not read);
//   5. otherwise the next item of the ordinary input queue (via the qall batch).
//
//   status: current task status, checked for TASK_STATUS__CK and printed in the debug logs.
//   id:     task id string, used for logging only.
// *pItem is NULL when no item is handed out.
void streamQueueNextItemInSourceQ(SStreamQueue* pQueue, SStreamQueueItem** pItem, ETaskStatus status, const char* id) {
  *pItem = NULL;
  int8_t flag = atomic_exchange_8(&pQueue->status, STREAM_QUEUE__PROCESSING);

  if (flag == STREAM_QUEUE__CHKPTFAILED) {
    *pItem = pQueue->qChkptItem;
    return;
  }

  if (flag == STREAM_QUEUE__FAILED) {
    *pItem = pQueue->qItem;
    return;
  }

  pQueue->qChkptItem = NULL;
  taosReadQitem(pQueue->pChkptQueue, (void**)&pQueue->qChkptItem);
  if (pQueue->qChkptItem != NULL) {
    stDebug("s-task:%s read data from checkpoint queue, status:%d", id, status);
    *pItem = pQueue->qChkptItem;
    return;
  }

  // if in checkpoint status, not read data from ordinary input q.
  if (status == TASK_STATUS__CK) {
    stDebug("s-task:%s in checkpoint status, not read data in block queue, status:%d", id, status);
    return;
  }

  // let's try the ordinary input q
  // The values returned by taosGetQitem()/taosReadAllQitems() are discarded here, exactly as
  // streamQueueNextItem() does: a non-zero value is returned whenever an item was obtained, so
  // reporting it through stError() produced an error line for every successful fetch.
  // NOTE(review): confirm against tqueue.h that these calls return an item count, not an error code.
  pQueue->qItem = NULL;
  (void)taosGetQitem(pQueue->qall, &pQueue->qItem);

  if (pQueue->qItem == NULL) {
    (void)taosReadAllQitems(pQueue->pQueue, pQueue->qall);
    (void)taosGetQitem(pQueue->qall, &pQueue->qItem);
  }

  *pItem = streamQueueCurItem(pQueue);
}
165

166
// Acknowledge that the item handed out last was processed: forget both stored items and move
// the queue from STREAM_QUEUE__PROCESSING to STREAM_QUEUE__SUCESS.
// Any other current status is logged as an error and left untouched.
void streamQueueProcessSuccess(SStreamQueue* queue) {
  if (atomic_load_8(&queue->status) == STREAM_QUEUE__PROCESSING) {
    queue->qItem = NULL;
    queue->qChkptItem = NULL;
    atomic_store_8(&queue->status, STREAM_QUEUE__SUCESS);
  } else {
    stError("invalid queue status:%d, expect:%d", atomic_load_8(&queue->status), STREAM_QUEUE__PROCESSING);
  }
}
176

177
// Mark the item handed out last as not consumed: move the queue from STREAM_QUEUE__PROCESSING
// to STREAM_QUEUE__FAILED so that the next read returns the stored item again.
// Any other current status is logged as an error and left untouched.
void streamQueueProcessFail(SStreamQueue* queue) {
  if (atomic_load_8(&queue->status) == STREAM_QUEUE__PROCESSING) {
    atomic_store_8(&queue->status, STREAM_QUEUE__FAILED);
  } else {
    stError("invalid queue status:%d, expect:%d", atomic_load_8(&queue->status), STREAM_QUEUE__PROCESSING);
  }
}
184

185
// Mark the checkpoint item handed out last as not consumed: move the queue from
// STREAM_QUEUE__PROCESSING to STREAM_QUEUE__CHKPTFAILED so that the next source-queue read
// returns the stored checkpoint item again.
// Any other current status is logged as an error and left untouched.
void streamQueueGetSourceChkptFailed(SStreamQueue* pQueue) {
  if (atomic_load_8(&pQueue->status) == STREAM_QUEUE__PROCESSING) {
    atomic_store_8(&pQueue->status, STREAM_QUEUE__CHKPTFAILED);
  } else {
    stError("invalid queue status:%d, expect:%d", atomic_load_8(&pQueue->status), STREAM_QUEUE__PROCESSING);
  }
}
192

193
bool streamQueueIsFull(const SStreamQueue* pQueue) {
984,466✔
194
  int32_t numOfItems = streamQueueGetNumOfItems(pQueue);
984,466✔
195
  if (numOfItems >= STREAM_TASK_QUEUE_CAPACITY) {
984,693✔
196
    return true;
9✔
197
  }
198

199
  return (SIZE_IN_MiB(taosQueueMemorySize(pQueue->pQueue)) >= STREAM_TASK_QUEUE_CAPACITY_IN_SIZE);
984,684✔
200
}
201

202
int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue) {
1,962,338✔
203
  int32_t numOfItems1 = taosQueueItemSize(pQueue->pQueue);
1,962,338✔
204
  int32_t numOfItems2 = taosQallItemSize(pQueue->qall);
1,965,278✔
205

206
  return numOfItems1 + numOfItems2;
1,964,772✔
207
}
208

209
int32_t streamQueueGetNumOfUnAccessedItems(const SStreamQueue* pQueue) {
53,461✔
210
  int32_t numOfItems1 = taosQueueItemSize(pQueue->pQueue);
53,461✔
211
  int32_t numOfItems2 = taosQallUnAccessedItemSize(pQueue->qall);
53,461✔
212

213
  return numOfItems1 + numOfItems2;
53,461✔
214
}
215

216
int32_t streamQueueGetItemSize(const SStreamQueue* pQueue) {
62,544✔
217
  return taosQueueMemorySize(pQueue->pQueue) + taosQallUnAccessedMemSize(pQueue->qall);
62,544✔
218
}
219

220
int32_t streamQueueItemGetSize(const SStreamQueueItem* pItem) {
520,624✔
221
  STaosQnode* p = (STaosQnode*)((char*)pItem - sizeof(STaosQnode));
520,624✔
222
  return p->dataSize;
520,624✔
223
}
224

225
void streamQueueItemIncSize(const SStreamQueueItem* pItem, int32_t size) {
464,720✔
226
  STaosQnode* p = (STaosQnode*)((char*)pItem - sizeof(STaosQnode));
464,720✔
227
  p->dataSize += size;
464,720✔
228
}
464,720✔
229

230
const char* streamQueueItemGetTypeStr(int32_t type) {
74,286✔
231
  switch (type) {
74,286✔
232
    case STREAM_INPUT__CHECKPOINT:
4,747✔
233
      return "checkpoint";
4,747✔
234
    case STREAM_INPUT__CHECKPOINT_TRIGGER:
18,849✔
235
      return "checkpoint-trigger";
18,849✔
236
    case STREAM_INPUT__TRANS_STATE:
16,012✔
237
      return "trans-state";
16,012✔
238
    case STREAM_INPUT__REF_DATA_BLOCK:
6,159✔
239
      return "ref-block";
6,159✔
240
    default:
28,519✔
241
      return "datablock";
28,519✔
242
  }
243
}
244

245
// Store the size of the accumulated batch in *blockSize and, for a sink task, charge that size
// against the quota of the task's token bucket.
static void streamTaskSettleBatchSize(SStreamTask* pTask, int32_t taskLevel, SStreamQueueItem* pBatch,
                                      int32_t* blockSize) {
  *blockSize = streamQueueItemGetSize(pBatch);
  if (taskLevel == TASK_LEVEL__SINK) {
    streamTaskConsumeQuota(pTask->outputInfo.pTokenBucket, *blockSize);
  }
}

// Extract the next batch of input for a task.
//
// Ordinary items are merged one by one into *pInput until the queue runs dry, a merge does not
// produce a result, a control message (checkpoint, checkpoint-trigger, trans-state, ref-block)
// shows up, or MAX_STREAM_EXEC_BATCH_NUM items have been collected. A control message is
// returned on its own when nothing has been collected yet; otherwise it is left for the next call
// by marking the queue as failed.
//
//   pInput:      receives the extracted item/batch, NULL when nothing was extracted.
//   numOfBlocks: receives the number of items that went into the batch.
//   blockSize:   receives the size of the batch as reported by streamQueueItemGetSize().
// Returns EXEC_AFTER_IDLE when a sink task has no token available, EXEC_CONTINUE otherwise.
EExtractDataCode streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks,
                                             int32_t* blockSize) {
  const char*   id = pTask->id.idStr;
  int32_t       taskLevel = pTask->info.taskLevel;
  SStreamQueue* pQueue = pTask->inputq.queue;

  *pInput = NULL;
  *numOfBlocks = 0;
  *blockSize = 0;

  // a sink task needs a token before it is allowed to extract anything
  if (taskLevel == TASK_LEVEL__SINK && !streamTaskExtractAvailableToken(pTask->outputInfo.pTokenBucket, id)) {
    stDebug("s-task:%s no available token in bucket for sink data, wait for 10ms", id);
    return EXEC_AFTER_IDLE;
  }

  for (;;) {
    ETaskStatus status = streamTaskGetStatus(pTask).state;
    if (status == TASK_STATUS__PAUSE || status == TASK_STATUS__STOP) {
      stDebug("s-task:%s task should pause/stop, extract input blocks:%d", id, *numOfBlocks);
      return EXEC_CONTINUE;
    }

    SStreamQueueItem* pNext = NULL;
    if (taskLevel == TASK_LEVEL__SOURCE) {
      streamQueueNextItemInSourceQ(pQueue, &pNext, status, id);
    } else {
      streamQueueNextItem(pQueue, &pNext);
    }

    // the queue is drained: report what has been collected so far
    if (pNext == NULL) {
      if (*numOfBlocks > 0) {
        streamTaskSettleBatchSize(pTask, taskLevel, *pInput, blockSize);
      } else {
        // nothing was extracted, give the token back to the bucket
        streamTaskPutbackToken(pTask->outputInfo.pTokenBucket);
      }
      return EXEC_CONTINUE;
    }

    int8_t type = pNext->type;
    bool   isChkptMsg = (type == STREAM_INPUT__CHECKPOINT_TRIGGER || type == STREAM_INPUT__CHECKPOINT);
    bool   isCtrlMsg = (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__CHECKPOINT_TRIGGER ||
                      type == STREAM_INPUT__TRANS_STATE || type == STREAM_INPUT__REF_DATA_BLOCK);

    // control messages are never merged with other blocks
    if (isCtrlMsg) {
      const char* typeStr = streamQueueItemGetTypeStr(type);

      if (*pInput == NULL) {
        stDebug("s-task:%s %s msg extracted, start to process immediately", id, typeStr);

        // a control message does not use up a token, give it back to the bucket
        streamTaskPutbackToken(pTask->outputInfo.pTokenBucket);
        *blockSize = 0;
        *numOfBlocks = 1;
        *pInput = pNext;
        return EXEC_CONTINUE;
      }

      // blocks collected earlier are handled first; the control message stays stored in the
      // queue and is handed out again by the next read
      stDebug("s-task:%s %s msg extracted, handle previous blocks, numOfBlocks:%d", id, typeStr, *numOfBlocks);
      streamTaskSettleBatchSize(pTask, taskLevel, *pInput, blockSize);

      if (isChkptMsg && (taskLevel == TASK_LEVEL__SOURCE)) {
        streamQueueGetSourceChkptFailed(pQueue);
      } else {
        streamQueueProcessFail(pQueue);
      }
      return EXEC_CONTINUE;
    }

    // ordinary data: start a new batch or merge into the existing one
    if (*pInput == NULL) {
      *pInput = pNext;
    } else {
      SStreamQueueItem* pMerged = NULL;
      int32_t           code = streamQueueMergeQueueItem(*pInput, pNext, &pMerged);
      if (pMerged == NULL) {
        // no merged result: hand back the batch built so far and keep the current item in the queue
        if (code != -1) {
          stError("s-task:%s failed to merge blocks from inputQ, numOfBlocks:%d, code:%s", id, *numOfBlocks,
                  tstrerror(code));
        }

        streamTaskSettleBatchSize(pTask, taskLevel, *pInput, blockSize);
        streamQueueProcessFail(pQueue);
        return EXEC_CONTINUE;
      }

      *pInput = pMerged;
    }

    *numOfBlocks += 1;
    streamQueueProcessSuccess(pQueue);

    if (*numOfBlocks >= MAX_STREAM_EXEC_BATCH_NUM) {
      stDebug("s-task:%s batch size limit:%d reached, start to process blocks", id, MAX_STREAM_EXEC_BATCH_NUM);
      streamTaskSettleBatchSize(pTask, taskLevel, *pInput, blockSize);
      return EXEC_CONTINUE;
    }
  }
}
359

360
// Put one item into the input queue of a task.
//
// Routing by item type:
//   - STREAM_INPUT__DATA_SUBMIT: ordinary queue; rejected with TSDB_CODE_STREAM_INPUTQ_FULL when
//     the task is a source task and the queue is full.
//   - STREAM_INPUT__DATA_BLOCK / STREAM_INPUT__REF_DATA_BLOCK: ordinary queue; rejected with
//     TSDB_CODE_STREAM_INPUTQ_FULL when the queue is full.
//   - checkpoint / checkpoint-trigger on a source task: checkpoint queue.
//   - checkpoint / checkpoint-trigger on other tasks, trans-state, data-retrieve: ordinary queue.
//   - STREAM_INPUT__GET_RES: ordinary queue.
//   - anything else: TSDB_CODE_INVALID_PARA (the item is not touched in that case).
//
// On a rejected or failed enqueue the item is passed to streamDataSubmitDestroy() (submit items)
// or streamFreeQitem() (all other items) before the error code is returned, so the caller must
// not use the item afterwards.
//
// For every type except get-res, checkpoint and checkpoint-trigger, a non-zero delaySchedParam
// makes the scheduling trigger switch from INACTIVE to MAY_ACTIVE.
//
// Returns 0 on success, otherwise the error code described above or the code of taosWriteQitem().
int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) {
  int8_t      type = pItem->type;
  STaosQueue* pQueue = pTask->inputq.queue->pQueue;
  int32_t     level = pTask->info.taskLevel;
  int32_t     total = streamQueueGetNumOfItems(pTask->inputq.queue) + 1;

  if (type == STREAM_INPUT__DATA_SUBMIT) {
    SStreamDataSubmit* px = (SStreamDataSubmit*)pItem;
    if ((pTask->info.taskLevel == TASK_LEVEL__SOURCE) && streamQueueIsFull(pTask->inputq.queue)) {
      double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
      // STREAM_TASK_QUEUE_CAPACITY is the item limit and STREAM_TASK_QUEUE_CAPACITY_IN_SIZE the MiB
      // limit (see streamQueueIsFull), the labels are printed accordingly
      stTrace(
          "s-task:%s inputQ is full, capacity(num:%d size:%dMiB), current(blocks:%d, size:%.2fMiB) stop to push data",
          pTask->id.idStr, STREAM_TASK_QUEUE_CAPACITY, STREAM_TASK_QUEUE_CAPACITY_IN_SIZE, total, size);
      streamDataSubmitDestroy(px);
      return TSDB_CODE_STREAM_INPUTQ_FULL;
    }

    int32_t msgLen = px->submit.msgLen;
    int64_t ver = px->submit.ver;

    int32_t code = taosWriteQitem(pQueue, pItem);
    if (code != TSDB_CODE_SUCCESS) {
      streamDataSubmitDestroy(px);
      return code;
    }

    double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));

    // use the local variable to avoid the pItem be freed by other threads, since it has been put into queue already.
    stDebug("s-task:%s submit enqueue msgLen:%d ver:%" PRId64 ", total in queue:%d, size:%.2fMiB", pTask->id.idStr,
            msgLen, ver, total, size + SIZE_IN_MiB(msgLen));
  } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__REF_DATA_BLOCK) {
    if (streamQueueIsFull(pTask->inputq.queue)) {
      double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));

      stTrace("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort",
              pTask->id.idStr, STREAM_TASK_QUEUE_CAPACITY, STREAM_TASK_QUEUE_CAPACITY_IN_SIZE, total, size);
      streamFreeQitem(pItem);
      return TSDB_CODE_STREAM_INPUTQ_FULL;
    }

    int32_t code = taosWriteQitem(pQueue, pItem);
    if (code != TSDB_CODE_SUCCESS) {
      streamFreeQitem(pItem);
      return code;
    }

    double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
    stDebug("s-task:%s blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr, total, size);
  } else if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__CHECKPOINT_TRIGGER ||
             type == STREAM_INPUT__TRANS_STATE || type == STREAM_INPUT__DATA_RETRIEVE) {
    int32_t code = 0;
    if ((type == STREAM_INPUT__CHECKPOINT_TRIGGER || type == STREAM_INPUT__CHECKPOINT) && (level == TASK_LEVEL__SOURCE)) {
      STaosQueue* pChkptQ = pTask->inputq.queue->pChkptQueue;
      code = taosWriteQitem(pChkptQ, pItem);
      if (code != TSDB_CODE_SUCCESS) {
        // same handling as the other branches: a failed enqueue must not be reported as success,
        // and the item that never reached the queue is released here
        streamFreeQitem(pItem);
        return code;
      }

      double  size = SIZE_IN_MiB(taosQueueMemorySize(pChkptQ));
      int32_t num = taosQueueItemSize(pChkptQ);

      stDebug("s-task:%s level:%d %s checkpoint enqueue ctrl queue, total in queue:%d, size:%.2fMiB, data queue:%d",
              pTask->id.idStr, pTask->info.taskLevel, streamQueueItemGetTypeStr(type), num, size, (total - 1));
    } else {
      code = taosWriteQitem(pQueue, pItem);
      if (code != TSDB_CODE_SUCCESS) {
        streamFreeQitem(pItem);
        return code;
      }

      double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
      stDebug("s-task:%s level:%d %s blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr,
              pTask->info.taskLevel, streamQueueItemGetTypeStr(type), total, size);
    }
  } else if (type == STREAM_INPUT__GET_RES) {
    // use the default memory limit, refactor later.
    int32_t code = taosWriteQitem(pQueue, pItem);
    if (code != TSDB_CODE_SUCCESS) {
      streamFreeQitem(pItem);
      return code;
    }

    double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
    stDebug("s-task:%s data res enqueue, current(blocks:%d, size:%.2fMiB)", pTask->id.idStr, total, size);
  } else {
    stError("s-task:%s invalid type:%d to put in inputQ", pTask->id.idStr, type);
    return TSDB_CODE_INVALID_PARA;
  }

  // new data has arrived: arm the scheduling trigger of tasks that run with a delay
  if (type != STREAM_INPUT__GET_RES && type != STREAM_INPUT__CHECKPOINT && type != STREAM_INPUT__CHECKPOINT_TRIGGER &&
      (pTask->info.delaySchedParam != 0)) {
    (void)atomic_val_compare_exchange_8(&pTask->schedInfo.status, TASK_TRIGGER_STATUS__INACTIVE,
                                        TASK_TRIGGER_STATUS__MAY_ACTIVE);
    stDebug("s-task:%s new data arrived, active the sched-trigger, triggerStatus:%d", pTask->id.idStr,
            pTask->schedInfo.status);
  }

  return 0;
}
458

459
// Build a trans-state message (a queue item of type STREAM_INPUT__TRANS_STATE that carries one
// block of type STREAM_TRANS_STATE with a single row and the task's selfChildId) and put it
// into the input queue of the task. On success pTask->status.appendTranstateBlock is set.
//
// Returns TSDB_CODE_SUCCESS on success, the allocation error (taosAllocateQitem code or terrno)
// when the message cannot be built, or TSDB_CODE_OUT_OF_MEMORY when the enqueue fails.
int32_t streamTaskPutTranstateIntoInputQ(SStreamTask* pTask) {
  int32_t           code = 0;
  SStreamDataBlock* pTranstate = NULL;
  SSDataBlock*      pBlock = NULL;

  code = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SSDataBlock), (void**)&pTranstate);
  if (code) {
    return code;
  }

  pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
  if (pBlock == NULL) {
    code = terrno;
    goto _err;
  }

  pTranstate->type = STREAM_INPUT__TRANS_STATE;

  pBlock->info.type = STREAM_TRANS_STATE;
  pBlock->info.rows = 1;
  pBlock->info.childId = pTask->info.selfChildId;

  pTranstate->blocks = taosArrayInit(4, sizeof(SSDataBlock));  // pBlock;
  if (pTranstate->blocks == NULL) {
    code = terrno;
    goto _err;
  }

  // NOTE(review): if the push below fails, the array stored in pTranstate->blocks is not released
  // by the _err path (only the queue item itself is) — confirm and release it there if needed.
  void* p = taosArrayPush(pTranstate->blocks, pBlock);
  if (p == NULL) {
    code = terrno;
    goto _err;
  }

  // the block has been copied into the array, the temporary copy is no longer needed;
  // clear the pointer so that no later path can free it a second time
  taosMemoryFree(pBlock);
  pBlock = NULL;

  if (streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pTranstate) < 0) {
    // streamTaskPutDataIntoInputQ() already released the item on its failure path, so it must
    // not be freed again here
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pTask->status.appendTranstateBlock = true;
  return TSDB_CODE_SUCCESS;

_err:
  taosMemoryFree(pBlock);
  taosFreeQitem(pTranstate);
  return code;
}
507

508
// the result should be put into the outputQ in any cases, the result may be lost otherwise.
509
int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBlock) {
22,499✔
510
  STaosQueue* pQueue = pTask->outputq.queue->pQueue;
22,499✔
511
  int32_t     code = taosWriteQitem(pQueue, pBlock);
22,499✔
512

513
  int32_t total = streamQueueGetNumOfItems(pTask->outputq.queue);
22,499✔
514
  double  size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
22,499✔
515
  if (code != 0) {
22,499!
516
    stError("s-task:%s failed to put res into outputQ, outputQ items:%d, size:%.2fMiB code:%s, result lost",
×
517
            pTask->id.idStr, total + 1, size, tstrerror(code));
518
  } else {
519
    if (streamQueueIsFull(pTask->outputq.queue)) {
22,499!
520
      stWarn(
×
521
          "s-task:%s outputQ is full(outputQ items:%d, size:%.2fMiB), set the output status BLOCKING, wait for 500ms "
522
          "after handle this batch of blocks",
523
          pTask->id.idStr, total, size);
524
    } else {
525
      stDebug("s-task:%s data put into outputQ, outputQ items:%d, size:%.2fMiB", pTask->id.idStr, total, size);
22,499✔
526
    }
527
  }
528

529
  return code;
22,499✔
530
}
531

532
// Initialize the token bucket of a sink task.
//   numCap:    token capacity, also the initial number of tokens; must be at least 10.
//   numRate:   token refill rate (tokens per second, see fillTokenBucket); must be at least 10.
//   quotaRate: size quota refill rate; the quota capacity is quotaRate * MAX_SMOOTH_BURST_RATIO
//              and the bucket starts with the full quota.
//   id:        task id string, used for logging only.
// Returns TSDB_CODE_INVALID_PARA for a NULL bucket or a too small capacity/rate,
// TSDB_CODE_SUCCESS otherwise.
int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, float quotaRate,
                                  const char* id) {
  bool invalid = (pBucket == NULL) || (numCap < 10) || (numRate < 10);
  if (invalid) {
    stError("failed to init sink task bucket, cap:%d, rate:%d", numCap, numRate);
    return TSDB_CODE_INVALID_PARA;
  }

  // token part
  pBucket->numCapacity = numCap;
  pBucket->numOfToken = numCap;
  pBucket->numRate = numRate;

  // size quota part
  pBucket->quotaRate = quotaRate;
  pBucket->quotaCapacity = quotaRate * MAX_SMOOTH_BURST_RATIO;
  pBucket->quotaRemain = pBucket->quotaCapacity;

  // both refill clocks start now
  pBucket->tokenFillTimestamp = taosGetTimestampMs();
  pBucket->quotaFillTimestamp = taosGetTimestampMs();

  stDebug("s-task:%s sink quotaRate:%.2fMiB, numRate:%d", id, quotaRate, numRate);
  return TSDB_CODE_SUCCESS;
}
552

553
// Refill the bucket according to the time that passed since the last refill.
// Tokens grow by numRate per second and the size quota by quotaRate per second, each capped at
// its capacity. A refill timestamp only advances when the matching increment is positive, so
// short intervals keep accumulating until they yield an increment.
// Nothing is refilled while the number of tokens is negative.
//   id: task id string, used for logging only.
static void fillTokenBucket(STokenBucket* pBucket, const char* id) {
  int64_t now = taosGetTimestampMs();
  int64_t tokenElapsedMs = now - pBucket->tokenFillTimestamp;

  if (pBucket->numOfToken < 0) {
    return;
  }

  // the fractional part of the token increment is dropped by the conversion to int32_t
  int32_t newTokens = (tokenElapsedMs / 1000.0) * pBucket->numRate;
  bool    tokensAdded = (newTokens > 0);
  if (tokensAdded) {
    pBucket->numOfToken = TMIN(pBucket->numOfToken + newTokens, pBucket->numCapacity);
    pBucket->tokenFillTimestamp = now;
  }

  // increase the new available quota as time goes on
  int64_t quotaElapsedMs = now - pBucket->quotaFillTimestamp;
  double  newQuota = (quotaElapsedMs / 1000.0) * pBucket->quotaRate;
  bool    quotaAdded = (newQuota > 0);
  if (quotaAdded) {
    pBucket->quotaRemain = TMIN(pBucket->quotaRemain + newQuota, pBucket->quotaCapacity);
    pBucket->quotaFillTimestamp = now;
  }

  if (tokensAdded || quotaAdded) {
    stTrace("token/quota available, token:%d inc:%d, token_TsDelta:%" PRId64
            ", quota:%.2fMiB inc:%.3fMiB quotaTs:%" PRId64 " now:%" PRId64 "ms, %s",
            pBucket->numOfToken, newTokens, tokenElapsedMs, pBucket->quotaRemain, newQuota, quotaElapsedMs, now, id);
  }
}
581

582
// Refill the bucket, then try to take one token from it.
// A token is only taken when at least one token is available and there is size quota left.
// Returns true when a token was taken, false otherwise (the bucket is not changed in that case
// apart from the refill).
bool streamTaskExtractAvailableToken(STokenBucket* pBucket, const char* id) {
  fillTokenBucket(pBucket, id);

  if (!(pBucket->numOfToken > 0)) {
    return false;  // out of tokens
  }

  if (!(pBucket->quotaRemain > 0)) {
    return false;  // no available size quota now
  }

  pBucket->numOfToken -= 1;
  return true;
}
596

597
// Give one token back to the bucket, never exceeding the token capacity.
void streamTaskPutbackToken(STokenBucket* pBucket) {
  int32_t refunded = pBucket->numOfToken + 1;
  pBucket->numOfToken = TMIN(refunded, pBucket->numCapacity);
}
89,414✔
600

601
// size in KB
602
void streamTaskConsumeQuota(STokenBucket* pBucket, int32_t bytes) { pBucket->quotaRemain -= SIZE_IN_MiB(bytes); }
17,325✔
603

604
// Set the input queue status of the task to TASK_INPUT_STATUS__FAILED.
void streamTaskInputFail(SStreamTask* pTask) {
  atomic_store_8(&pTask->inputq.status, TASK_INPUT_STATUS__FAILED);
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc