• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3660

15 Mar 2025 09:06AM UTC coverage: 62.039% (-1.3%) from 63.314%
#3660

push

travis-ci

web-flow
feat(stream): support stream processing for virtual tables (#30144)

* enh: add client processing

* enh: add mnode vtables processing

* enh: add mnode vtable processing

* enh: add normal child vtable support

* fix: compile issues

* fix: compile issues

* fix: create stream issues

* fix: multi stream scan issue

* fix: remove debug info

* fix: agg task and task level issues

* fix: correct task output type

* fix: split vtablescan from agg

* fix: memory leak issues

* fix: add limitations

* Update 09-error-code.md

* Update 09-error-code.md

* fix: remove useless case

* feat(stream): extract original table data in source scan task

Implemented functionality in the source task to extract data
corresponding to the virtual table from the original table using WAL.
The extracted data is then sent to the downstream merge task for further
processing.

* feat(stream): multi-way merge using loser tree in virtual merge task

Implemented multi-way merge in the merge task using a loser tree to
combine data from multiple original tables into a single virtual table.
The merged virtual table data is then pushed downstream for further
processing.  Introduced memory limit handling during the merge process
with configurable behavior when the memory limit is reached.

* fix(test): remove useless cases

---------

Co-authored-by: dapan1121 <wpan@taosdata.com>
Co-authored-by: Pan Wei <72057773+dapan1121@users.noreply.github.com>

154078 of 317582 branches covered (48.52%)

Branch coverage included in aggregate %.

313 of 2391 new or added lines in 34 files covered. (13.09%)

26134 existing lines in 205 files now uncovered.

240261 of 318051 relevant lines covered (75.54%)

16655189.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.59
/source/libs/stream/src/streamQueue.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "streamInt.h"
17

18
#define MAX_STREAM_EXEC_BATCH_NUM 32
19
#define MAX_SMOOTH_BURST_RATIO    5  // 5 sec
20

21
// todo refactor:
22
// read data from input queue
23
typedef struct SQueueReader {
24
  SStreamQueue* pQueue;
25
  int32_t       taskLevel;
26
  int32_t       maxBlocks;     // maximum block in one batch
27
  int32_t       waitDuration;  // maximum wait time to format several block into a batch to process, unit: ms
28
} SQueueReader;
29

30
#define streamQueueCurItem(_q) ((_q)->qItem)
31

32
static bool streamTaskExtractAvailableToken(STokenBucket* pBucket, const char* id);
33
static void streamTaskPutbackToken(STokenBucket* pBucket);
34
static void streamTaskConsumeQuota(STokenBucket* pBucket, int32_t bytes);
35
static void streamQueueNextItemInSourceQ(SStreamQueue* pQueue, SStreamQueueItem** pItem, ETaskStatus status, const char* id);
36

37
// Drain the queue: fetch every remaining item through the source-queue reader and free it, then reset the queue
// status to STREAM_QUEUE__SUCESS.
static void streamQueueCleanup(SStreamQueue* pQueue) {
  SStreamQueueItem* pPending = NULL;

  streamQueueNextItemInSourceQ(pQueue, &pPending, TASK_STATUS__READY, "");
  while (pPending != NULL) {
    streamFreeQitem(pPending);
    streamQueueNextItemInSourceQ(pQueue, &pPending, TASK_STATUS__READY, "");
  }

  pQueue->status = STREAM_QUEUE__SUCESS;
}
29,414✔
48

49
// Create a stream queue made of an ordinary item queue, a read-all buffer (qall) and a checkpoint queue.
//
// cap: item capacity of the ordinary queue; its memory capacity is set to cap * 1024.
// pQ:  receives the new queue on success and is set to NULL on failure.
//
// Returns 0 on success, terrno if the queue object cannot be allocated, or the code of the failing
// taosOpenQueue/taosAllocateQall call.
int32_t streamQueueOpen(int64_t cap, SStreamQueue** pQ) {
  *pQ = NULL;

  int32_t code = 0;
  int32_t lino = 0;

  SStreamQueue* pQueue = taosMemoryCalloc(1, sizeof(SStreamQueue));
  if (pQueue == NULL) {
    return terrno;
  }

  code = taosOpenQueue(&pQueue->pQueue);
  TSDB_CHECK_CODE(code, lino, _error);

  code = taosAllocateQall(&pQueue->qall);
  TSDB_CHECK_CODE(code, lino, _error);

  code = taosOpenQueue(&pQueue->pChkptQueue);
  TSDB_CHECK_CODE(code, lino, _error);

  pQueue->status = STREAM_QUEUE__SUCESS;

  taosSetQueueCapacity(pQueue->pQueue, cap);
  taosSetQueueMemoryCapacity(pQueue->pQueue, cap * 1024);

  *pQ = pQueue;
  return code;

_error:
  stError("failed to open stream queue at line:%d, code:%s", lino, tstrerror(code));

  // The queue is only partially built here. streamQueueClose() would drain it first, reading from pChkptQueue,
  // qall and pQueue even though some of them were never created. Nothing has been enqueued yet, so there is
  // nothing to drain: release just the members that exist.
  // NOTE(review): assumes a failed taosOpenQueue/taosAllocateQall leaves its output pointer NULL - confirm.
  if (pQueue->qall != NULL) {
    taosFreeQall(pQueue->qall);
  }
  if (pQueue->pQueue != NULL) {
    taosCloseQueue(pQueue->pQueue);
  }
  if (pQueue->pChkptQueue != NULL) {
    taosCloseQueue(pQueue->pChkptQueue);
  }
  taosMemoryFree(pQueue);
  return code;
}
82

83
// Destroy a stream queue: free all items still pending, release the read-all buffer and both underlying queues,
// then free the queue object itself. taskId is only used for the debug log.
void streamQueueClose(SStreamQueue* pQueue, int32_t taskId) {
  stDebug("s-task:0x%x free the queue:%p, items in queue:%d", taskId, pQueue->pQueue,
          taosQueueItemSize(pQueue->pQueue));

  // pending items have to be freed while the containers still exist
  streamQueueCleanup(pQueue);
  taosFreeQall(pQueue->qall);

  taosCloseQueue(pQueue->pQueue);
  taosCloseQueue(pQueue->pChkptQueue);
  pQueue->pQueue = NULL;
  pQueue->pChkptQueue = NULL;

  taosMemoryFree(pQueue);
}
29,434✔
97

98
// Fetch the next item to process and mark the queue as STREAM_QUEUE__PROCESSING.
// If the previous round was marked failed, the same item is handed out again; otherwise the next item is taken
// from qall, refilling qall from the underlying queue when it is exhausted. *pItem is NULL when nothing is
// available.
void streamQueueNextItem(SStreamQueue* pQueue, SStreamQueueItem** pItem) {
  *pItem = NULL;

  int8_t prevStatus = atomic_exchange_8(&pQueue->status, STREAM_QUEUE__PROCESSING);
  if (prevStatus != STREAM_QUEUE__FAILED) {
    pQueue->qItem = NULL;
    (void)taosGetQitem(pQueue->qall, &pQueue->qItem);

    if (pQueue->qItem == NULL) {
      // qall is used up, pull everything currently in the queue and retry
      (void)taosReadAllQitems(pQueue->pQueue, pQueue->qall);
      (void)taosGetQitem(pQueue->qall, &pQueue->qItem);
    }
  }

  *pItem = streamQueueCurItem(pQueue);
}
155,100✔
116

117
// Fetch the next item for a source task and mark the queue as STREAM_QUEUE__PROCESSING.
// Order of precedence:
//   1. the previous round failed on a checkpoint item / an ordinary item: hand out that same item again;
//   2. an item waiting in the checkpoint queue;
//   3. nothing, when the task is in checkpoint status (the ordinary queue is not read in that status);
//   4. the next ordinary item from qall, refilled from the underlying queue when exhausted.
// status and id are used for the checkpoint-status decision and for logging. *pItem is NULL when no item is
// returned.
void streamQueueNextItemInSourceQ(SStreamQueue* pQueue, SStreamQueueItem** pItem, ETaskStatus status, const char* id) {
  *pItem = NULL;
  int8_t prevStatus = atomic_exchange_8(&pQueue->status, STREAM_QUEUE__PROCESSING);

  switch (prevStatus) {
    case STREAM_QUEUE__CHKPTFAILED:
      *pItem = pQueue->qChkptItem;
      return;
    case STREAM_QUEUE__FAILED:
      *pItem = pQueue->qItem;
      return;
    default:
      break;
  }

  // checkpoint queue goes first
  pQueue->qChkptItem = NULL;
  taosReadQitem(pQueue->pChkptQueue, (void**)&pQueue->qChkptItem);
  if (pQueue->qChkptItem != NULL) {
    stDebug("s-task:%s read data from checkpoint queue, status:%d", id, status);
    *pItem = pQueue->qChkptItem;
    return;
  }

  // if in checkpoint status, not read data from ordinary input q.
  if (status == TASK_STATUS__CK) {
    stDebug("s-task:%s in checkpoint status, not read data in block queue, status:%d", id, status);
    return;
  }

  // let's try the ordinary input q
  pQueue->qItem = NULL;
  (void)taosGetQitem(pQueue->qall, &pQueue->qItem);

  if (pQueue->qItem == NULL) {
    (void)taosReadAllQitems(pQueue->pQueue, pQueue->qall);
    (void)taosGetQitem(pQueue->qall, &pQueue->qItem);
  }

  *pItem = streamQueueCurItem(pQueue);
}
158

159
// Mark the item(s) handed out by the last read as consumed: clear the current ordinary and checkpoint items and
// move the queue from STREAM_QUEUE__PROCESSING to STREAM_QUEUE__SUCESS. Any other status is logged and left
// untouched.
void streamQueueProcessSuccess(SStreamQueue* queue) {
  if (atomic_load_8(&queue->status) == STREAM_QUEUE__PROCESSING) {
    queue->qItem = NULL;
    queue->qChkptItem = NULL;
    atomic_store_8(&queue->status, STREAM_QUEUE__SUCESS);
    return;
  }

  stError("invalid queue status:%d, expect:%d", atomic_load_8(&queue->status), STREAM_QUEUE__PROCESSING);
}
169

170
// Mark the last read as failed so that the next read hands out the same ordinary item again. Only valid while
// the queue is in STREAM_QUEUE__PROCESSING; any other status is logged and left untouched.
void streamQueueProcessFail(SStreamQueue* queue) {
  if (atomic_load_8(&queue->status) == STREAM_QUEUE__PROCESSING) {
    atomic_store_8(&queue->status, STREAM_QUEUE__FAILED);
    return;
  }

  stError("invalid queue status:%d, expect:%d", atomic_load_8(&queue->status), STREAM_QUEUE__PROCESSING);
}
177

UNCOV
178
// Mark the last read of a checkpoint item as failed so that the next source-queue read hands out the same
// checkpoint item again. Only valid while the queue is in STREAM_QUEUE__PROCESSING; any other status is logged
// and left untouched.
void streamQueueGetSourceChkptFailed(SStreamQueue* pQueue) {
  if (atomic_load_8(&pQueue->status) == STREAM_QUEUE__PROCESSING) {
    atomic_store_8(&pQueue->status, STREAM_QUEUE__CHKPTFAILED);
    return;
  }

  stError("invalid queue status:%d, expect:%d", atomic_load_8(&pQueue->status), STREAM_QUEUE__PROCESSING);
}
185

186
bool streamQueueIsFull(const SStreamQueue* pQueue) {
1,022,376✔
187
  int32_t numOfItems = streamQueueGetNumOfItems(pQueue);
1,022,376✔
188
  if (numOfItems >= STREAM_TASK_QUEUE_CAPACITY) {
1,022,382✔
189
    return true;
19✔
190
  }
191

192
  return (SIZE_IN_MiB(taosQueueMemorySize(pQueue->pQueue)) >= STREAM_TASK_QUEUE_CAPACITY_IN_SIZE);
1,022,363✔
193
}
194

195
int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue) {
2,041,375✔
196
  int32_t numOfItems1 = taosQueueItemSize(pQueue->pQueue);
2,041,375✔
197
  int32_t numOfItems2 = taosQallItemSize(pQueue->qall);
2,042,456✔
198

199
  return numOfItems1 + numOfItems2;
2,042,249✔
200
}
201

202
int32_t streamQueueGetNumOfUnAccessedItems(const SStreamQueue* pQueue) {
54,711✔
203
  int32_t numOfItems1 = taosQueueItemSize(pQueue->pQueue);
54,711✔
204
  int32_t numOfItems2 = taosQallUnAccessedItemSize(pQueue->qall);
54,711✔
205

206
  return numOfItems1 + numOfItems2;
54,711✔
207
}
208

209
int32_t streamQueueGetItemSize(const SStreamQueue* pQueue) {
71,670✔
210
  return taosQueueMemorySize(pQueue->pQueue) + taosQallUnAccessedMemSize(pQueue->qall);
71,670✔
211
}
212

213
int32_t streamQueueItemGetSize(const SStreamQueueItem* pItem) {
519,565✔
214
  STaosQnode* p = (STaosQnode*)((char*)pItem - sizeof(STaosQnode));
519,565✔
215
  return p->dataSize;
519,565✔
216
}
217

218
// Add `size` to the dataSize recorded in the STaosQnode header located sizeof(STaosQnode) bytes before the item.
// The item itself is not modified, only the header in front of it.
void streamQueueItemIncSize(const SStreamQueueItem* pItem, int32_t size) {
  const char* itemAddr = (const char*)pItem;
  STaosQnode* pNode = (STaosQnode*)(itemAddr - sizeof(STaosQnode));

  pNode->dataSize += size;
}
464,765✔
222

223
// Human-readable name of a queue item type, for logging. Every type without a dedicated name is reported as
// "datablock".
const char* streamQueueItemGetTypeStr(int32_t type) {
  if (type == STREAM_INPUT__CHECKPOINT) {
    return "checkpoint";
  }
  if (type == STREAM_INPUT__CHECKPOINT_TRIGGER) {
    return "checkpoint-trigger";
  }
  if (type == STREAM_INPUT__TRANS_STATE) {
    return "trans-state";
  }
  if (type == STREAM_INPUT__REF_DATA_BLOCK) {
    return "ref-block";
  }
  if (type == STREAM_INPUT__RECALCULATE) {
    return "recalculate";
  }

  return "datablock";
}
239

240
// Report the size of an extracted batch through *blockSize and, for sink tasks, charge that size against the
// output quota of the token bucket.
static void streamTaskChargeInputBatch(SStreamTask* pTask, const SStreamQueueItem* pBatch, int32_t* blockSize) {
  *blockSize = streamQueueItemGetSize(pBatch);
  if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
    streamTaskConsumeQuota(pTask->outputInfo.pTokenBucket, *blockSize);
  }
}

// Settle the token/quota accounting when extraction ends without a new item: charge the batch collected so far,
// or give the token back when nothing was collected.
static void streamTaskSettleExtraction(SStreamTask* pTask, SStreamQueueItem** pInput, const int32_t* numOfBlocks,
                                       int32_t* blockSize) {
  if (*numOfBlocks > 0) {
    streamTaskChargeInputBatch(pTask, *pInput, blockSize);
  } else {
    streamTaskPutbackToken(pTask->outputInfo.pTokenBucket);
  }
}

// Extract the next batch of input for a task from its input queue.
//
// Ordinary data items are merged into one batch until the queue is empty, MAX_STREAM_EXEC_BATCH_NUM items were
// merged, a merge fails, or a control item (checkpoint, checkpoint-trigger, trans-state, ref-data-block,
// recalculate) shows up. A control item is never merged: it is returned alone if no batch is pending, otherwise
// the pending batch is returned first and the queue is flagged so that the control item is handed out again by
// the next read.
//
// pInput:      receives the extracted item/batch, NULL if nothing was extracted.
// numOfBlocks: receives the number of items in the batch (1 for a control item).
// blockSize:   receives the batch size as reported by streamQueueItemGetSize(), 0 for a control item.
//
// Returns EXEC_AFTER_IDLE when a sink task has no token available, EXEC_CONTINUE otherwise.
EExtractDataCode streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks,
                                             int32_t* blockSize) {
  const char*   id = pTask->id.idStr;
  int32_t       taskLevel = pTask->info.taskLevel;
  SStreamQueue* pQueue = pTask->inputq.queue;

  *pInput = NULL;
  *numOfBlocks = 0;
  *blockSize = 0;

  // no available token in bucket for sink task, let's wait for a little bit
  if (taskLevel == TASK_LEVEL__SINK && (!streamTaskExtractAvailableToken(pTask->outputInfo.pTokenBucket, id))) {
    stDebug("s-task:%s no available token in bucket for sink data, wait for 10ms", id);
    return EXEC_AFTER_IDLE;
  }

  while (1) {
    ETaskStatus status = streamTaskGetStatus(pTask).state;
    if (status == TASK_STATUS__PAUSE || status == TASK_STATUS__STOP) {
      stDebug("s-task:%s task should pause/stop, extract input blocks:%d", id, *numOfBlocks);

      // Same settlement as for an empty queue. Previously this exit skipped it, which left *blockSize at 0 for
      // a pending batch and never returned the token taken above when nothing had been extracted.
      streamTaskSettleExtraction(pTask, pInput, numOfBlocks, blockSize);
      return EXEC_CONTINUE;
    }

    SStreamQueueItem* qItem = NULL;
    if (taskLevel == TASK_LEVEL__SOURCE) {
      streamQueueNextItemInSourceQ(pQueue, &qItem, status, id);
    } else {
      streamQueueNextItem(pQueue, &qItem);
    }

    if (qItem == NULL) {
      // queue is drained: charge what was collected, or restore the token to bucket
      streamTaskSettleExtraction(pTask, pInput, numOfBlocks, blockSize);
      return EXEC_CONTINUE;
    }

    // do not merge blocks for sink node and check point data block
    int8_t type = qItem->type;
    if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__CHECKPOINT_TRIGGER ||
        type == STREAM_INPUT__TRANS_STATE || type == STREAM_INPUT__REF_DATA_BLOCK || type == STREAM_INPUT__RECALCULATE) {
      const char* p = streamQueueItemGetTypeStr(type);

      if (*pInput == NULL) {
        stDebug("s-task:%s %s msg extracted, start to process immediately", id, p);

        // restore the token to bucket in case of checkpoint/trans-state msg
        streamTaskPutbackToken(pTask->outputInfo.pTokenBucket);
        *blockSize = 0;
        *numOfBlocks = 1;
        *pInput = qItem;
        return EXEC_CONTINUE;
      }

      // previous existed blocks needs to be handled, before handle the checkpoint msg block
      stDebug("s-task:%s %s msg extracted, handle previous blocks, numOfBlocks:%d", id, p, *numOfBlocks);
      streamTaskChargeInputBatch(pTask, *pInput, blockSize);

      // flag the queue as failed so that the control item just read is handed out again by the next read
      if ((type == STREAM_INPUT__CHECKPOINT_TRIGGER || type == STREAM_INPUT__CHECKPOINT) &&
          (taskLevel == TASK_LEVEL__SOURCE)) {
        streamQueueGetSourceChkptFailed(pQueue);
      } else {
        streamQueueProcessFail(pQueue);
      }
      return EXEC_CONTINUE;
    }

    // ordinary data item
    if (*pInput == NULL) {
      *pInput = qItem;
    } else {
      void*   newRet = NULL;
      int32_t code = streamQueueMergeQueueItem(*pInput, qItem, (SStreamQueueItem**)&newRet);
      if (newRet == NULL) {
        // merge current block failed, let's handle the already merged blocks.
        if (code != -1) {
          stError("s-task:%s failed to merge blocks from inputQ, numOfBlocks:%d, code:%s", id, *numOfBlocks,
                  tstrerror(code));
        }

        streamTaskChargeInputBatch(pTask, *pInput, blockSize);

        // keep the un-merged item as the current one, it is handed out again by the next read
        streamQueueProcessFail(pQueue);
        return EXEC_CONTINUE;
      }

      *pInput = newRet;
    }

    *numOfBlocks += 1;
    streamQueueProcessSuccess(pQueue);

    if (*numOfBlocks >= MAX_STREAM_EXEC_BATCH_NUM) {
      stDebug("s-task:%s batch size limit:%d reached, start to process blocks", id, MAX_STREAM_EXEC_BATCH_NUM);

      streamTaskChargeInputBatch(pTask, *pInput, blockSize);
      return EXEC_CONTINUE;
    }
  }
}
354

355
// Put an item into the input queue of a task. On every failure handled here the item is released by this
// function, so the caller must not touch it afterwards.
//
// - STREAM_INPUT__DATA_SUBMIT: rejected with TSDB_CODE_STREAM_INPUTQ_FULL when a source task's queue is full.
// - STREAM_INPUT__DATA_BLOCK / STREAM_INPUT__REF_DATA_BLOCK: rejected with TSDB_CODE_STREAM_INPUTQ_FULL when
//   the queue is full.
// - checkpoint / checkpoint-trigger items of a source task go to the dedicated checkpoint queue; the other
//   control items (and checkpoint items of non-source tasks) go to the ordinary queue without a capacity check.
// - STREAM_INPUT__GET_RES: enqueued without a capacity check.
// - any other type: TSDB_CODE_INVALID_PARA.
//   NOTE(review): the item is not released on this path - confirm who owns it.
//
// For data-carrying items the sched-trigger is moved from INACTIVE to MAY_ACTIVE when the task has a non-zero
// delaySchedParam.
//
// Returns 0 on success, otherwise the error code described above or the code returned by taosWriteQitem().
int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) {
  int8_t      type = pItem->type;
  STaosQueue* pQueue = pTask->inputq.queue->pQueue;
  int32_t     level = pTask->info.taskLevel;
  int32_t     total = streamQueueGetNumOfItems(pTask->inputq.queue) + 1;

  if (type == STREAM_INPUT__DATA_SUBMIT) {
    SStreamDataSubmit* px = (SStreamDataSubmit*)pItem;
    if ((pTask->info.taskLevel == TASK_LEVEL__SOURCE) && streamQueueIsFull(pTask->inputq.queue)) {
      double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
      stTrace(
          "s-task:%s inputQ is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) stop to push data",
          pTask->id.idStr, STREAM_TASK_QUEUE_CAPACITY, STREAM_TASK_QUEUE_CAPACITY_IN_SIZE, total, size);
      streamDataSubmitDestroy(px);
      return TSDB_CODE_STREAM_INPUTQ_FULL;
    }

    // use the local variable to avoid the pItem be freed by other threads, since it has been put into queue
    // already once taosWriteQitem() returns.
    int32_t msgLen = px->submit.msgLen;
    int64_t ver = px->submit.ver;

    int32_t code = taosWriteQitem(pQueue, pItem);
    if (code != TSDB_CODE_SUCCESS) {
      streamDataSubmitDestroy(px);
      return code;
    }

    double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));

    stDebug("s-task:%s submit enqueue msgLen:%d ver:%" PRId64 ", total in queue:%d, size:%.2fMiB", pTask->id.idStr,
            msgLen, ver, total, size + SIZE_IN_MiB(msgLen));
  } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__REF_DATA_BLOCK) {
    if (streamQueueIsFull(pTask->inputq.queue)) {
      double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));

      stTrace("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort",
              pTask->id.idStr, STREAM_TASK_QUEUE_CAPACITY, STREAM_TASK_QUEUE_CAPACITY_IN_SIZE, total, size);
      streamFreeQitem(pItem);
      return TSDB_CODE_STREAM_INPUTQ_FULL;
    }

    int32_t code = taosWriteQitem(pQueue, pItem);
    if (code != TSDB_CODE_SUCCESS) {
      streamFreeQitem(pItem);
      return code;
    }

    double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
    stDebug("s-task:%s blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr, total, size);
  } else if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__CHECKPOINT_TRIGGER ||
             type == STREAM_INPUT__TRANS_STATE || type == STREAM_INPUT__DATA_RETRIEVE || type == STREAM_INPUT__RECALCULATE) {

    int32_t code = 0;
    if ((type == STREAM_INPUT__CHECKPOINT_TRIGGER || type == STREAM_INPUT__CHECKPOINT) && (level == TASK_LEVEL__SOURCE)) {
      STaosQueue* pChkptQ = pTask->inputq.queue->pChkptQueue;
      code = taosWriteQitem(pChkptQ, pItem);
      if (code != TSDB_CODE_SUCCESS) {
        // same handling as every other enqueue in this function: a failed write used to be ignored here, so the
        // caller was told the checkpoint item had been queued and the item itself was never released.
        streamFreeQitem(pItem);
        return code;
      }

      double  size = SIZE_IN_MiB(taosQueueMemorySize(pChkptQ));
      int32_t num = taosQueueItemSize(pChkptQ);

      stDebug("s-task:%s level:%d %s checkpoint enqueue ctrl queue, total in queue:%d, size:%.2fMiB, data queue:%d",
              pTask->id.idStr, pTask->info.taskLevel, streamQueueItemGetTypeStr(type), num, size, (total - 1));
    } else {
      code = taosWriteQitem(pQueue, pItem);
      if (code != TSDB_CODE_SUCCESS) {
        streamFreeQitem(pItem);
        return code;
      }

      double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
      stDebug("s-task:%s level:%d %s blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr,
              pTask->info.taskLevel, streamQueueItemGetTypeStr(type), total, size);
    }
  } else if (type == STREAM_INPUT__GET_RES) {
    // use the default memory limit, refactor later.
    int32_t code = taosWriteQitem(pQueue, pItem);
    if (code != TSDB_CODE_SUCCESS) {
      streamFreeQitem(pItem);
      return code;
    }

    double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
    stDebug("s-task:%s data res enqueue, current(blocks:%d, size:%.2fMiB)", pTask->id.idStr, total, size);
  } else {
    stError("s-task:%s invalid type:%d to put in inputQ", pTask->id.idStr, type);
    return TSDB_CODE_INVALID_PARA;
  }

  // new data (not a get-res/checkpoint/recalculate item) may activate the delayed sched-trigger
  if (type != STREAM_INPUT__GET_RES && type != STREAM_INPUT__CHECKPOINT && type != STREAM_INPUT__CHECKPOINT_TRIGGER &&
      type != STREAM_INPUT__RECALCULATE && (pTask->info.delaySchedParam != 0)) {
    (void)atomic_val_compare_exchange_8(&pTask->schedInfo.status, TASK_TRIGGER_STATUS__INACTIVE,
                                        TASK_TRIGGER_STATUS__MAY_ACTIVE);
    stDebug("s-task:%s new data arrived, active the sched-trigger, triggerStatus:%d", pTask->id.idStr,
            pTask->schedInfo.status);
  }

  return 0;
}
453

454
// Build a trans-state item (one STREAM_TRANS_STATE block with a single row, tagged with the task's child id)
// and put it into the input queue of the task. On success pTask->status.appendTranstateBlock is set.
//
// Returns TSDB_CODE_SUCCESS, the allocation error (terrno / taosAllocateQitem code), or the code reported by
// streamTaskPutDataIntoInputQ() when the enqueue fails.
int32_t streamTaskPutTranstateIntoInputQ(SStreamTask* pTask) {
  int32_t           code = 0;
  SStreamDataBlock* pTranstate = NULL;
  SSDataBlock*      pBlock = NULL;

  code = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SSDataBlock), (void**)&pTranstate);
  if (code) {
    return code;
  }

  pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
  if (pBlock == NULL) {
    code = terrno;
    goto _err;
  }

  pTranstate->type = STREAM_INPUT__TRANS_STATE;

  pBlock->info.type = STREAM_TRANS_STATE;
  pBlock->info.rows = 1;
  pBlock->info.childId = pTask->info.selfChildId;

  pTranstate->blocks = taosArrayInit(4, sizeof(SSDataBlock));  // pBlock;
  if (pTranstate->blocks == NULL) {
    code = terrno;
    goto _err;
  }

  void* p = taosArrayPush(pTranstate->blocks, pBlock);
  if (p == NULL) {
    // NOTE(review): pTranstate->blocks itself is not destroyed on this path - confirm whether it leaks.
    code = terrno;
    goto _err;
  }

  // the array stores SSDataBlock elements by value (presumably a copy of *pBlock), the temporary is done.
  // Clear the pointer so no later path can free it a second time.
  taosMemoryFree(pBlock);
  pBlock = NULL;

  code = streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pTranstate);
  if (code != TSDB_CODE_SUCCESS) {
    // streamTaskPutDataIntoInputQ() has already released the trans-state item on its failure path. Jumping to
    // _err here used to free both pBlock and pTranstate a second time.
    return code;
  }

  pTask->status.appendTranstateBlock = true;
  return TSDB_CODE_SUCCESS;

_err:
  taosMemoryFree(pBlock);
  taosFreeQitem(pTranstate);
  return code;
}
502

503
// the result should be put into the outputQ in any cases, the result may be lost otherwise.
504
int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBlock) {
22,921✔
505
  STaosQueue* pQueue = pTask->outputq.queue->pQueue;
22,921✔
506
  int32_t     code = taosWriteQitem(pQueue, pBlock);
22,921✔
507

508
  int32_t total = streamQueueGetNumOfItems(pTask->outputq.queue);
22,925✔
509
  double  size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
22,924✔
510
  if (code != 0) {
22,922!
UNCOV
511
    stError("s-task:%s failed to put res into outputQ, outputQ items:%d, size:%.2fMiB code:%s, result lost",
×
512
            pTask->id.idStr, total + 1, size, tstrerror(code));
513
  } else {
514
    if (streamQueueIsFull(pTask->outputq.queue)) {
22,922!
UNCOV
515
      stWarn(
×
516
          "s-task:%s outputQ is full(outputQ items:%d, size:%.2fMiB), set the output status BLOCKING, wait for 500ms "
517
          "after handle this batch of blocks",
518
          pTask->id.idStr, total, size);
519
    } else {
520
      stDebug("s-task:%s data put into outputQ, outputQ items:%d, size:%.2fMiB", pTask->id.idStr, total, size);
22,925✔
521
    }
522
  }
523

524
  return code;
22,921✔
525
}
526

527
// Initialise a token bucket.
//
// numCap:    token capacity, the bucket starts full; must be >= 10.
// numRate:   token refill rate (tokens per second, see fillTokenBucket); must be >= 10.
// quotaRate: size quota refill rate in MiB per second; the quota capacity is MAX_SMOOTH_BURST_RATIO times this
//            rate and the bucket starts with the full quota.
// id:        task id string, used for logging only.
//
// Returns TSDB_CODE_INVALID_PARA for a NULL bucket or too small numCap/numRate, TSDB_CODE_SUCCESS otherwise.
int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, float quotaRate,
                                  const char* id) {
  bool valid = (pBucket != NULL) && (numCap >= 10) && (numRate >= 10);
  if (!valid) {
    stError("failed to init sink task bucket, cap:%d, rate:%d", numCap, numRate);
    return TSDB_CODE_INVALID_PARA;
  }

  // token part
  pBucket->numCapacity = numCap;
  pBucket->numOfToken = numCap;
  pBucket->numRate = numRate;

  // size quota part
  pBucket->quotaRate = quotaRate;
  pBucket->quotaCapacity = quotaRate * MAX_SMOOTH_BURST_RATIO;
  pBucket->quotaRemain = pBucket->quotaCapacity;

  pBucket->tokenFillTimestamp = taosGetTimestampMs();
  pBucket->quotaFillTimestamp = taosGetTimestampMs();

  stDebug("s-task:%s sink quotaRate:%.2fMiB, numRate:%d", id, quotaRate, numRate);
  return TSDB_CODE_SUCCESS;
}
547

548
// Refill the bucket according to the time elapsed since the last refill.
// Tokens grow by numRate per second up to numCapacity, the size quota grows by quotaRate per second up to
// quotaCapacity. A fill timestamp only advances when its increment is positive, so short intervals accumulate
// instead of being lost to rounding. Nothing is done while numOfToken is negative.
static void fillTokenBucket(STokenBucket* pBucket, const char* id) {
  int64_t now = taosGetTimestampMs();
  if (pBucket->numOfToken < 0) {
    return;
  }

  // tokens
  int64_t deltaToken = now - pBucket->tokenFillTimestamp;
  int32_t incNum = (deltaToken / 1000.0) * pBucket->numRate;
  bool    tokenAdded = (incNum > 0);
  if (tokenAdded) {
    pBucket->numOfToken = TMIN(pBucket->numOfToken + incNum, pBucket->numCapacity);
    pBucket->tokenFillTimestamp = now;
  }

  // increase the new available quota as time goes on
  int64_t deltaQuota = now - pBucket->quotaFillTimestamp;
  double  incSize = (deltaQuota / 1000.0) * pBucket->quotaRate;
  bool    quotaAdded = (incSize > 0);
  if (quotaAdded) {
    pBucket->quotaRemain = TMIN(pBucket->quotaRemain + incSize, pBucket->quotaCapacity);
    pBucket->quotaFillTimestamp = now;
  }

  if (tokenAdded || quotaAdded) {
    stTrace("token/quota available, token:%d inc:%d, token_TsDelta:%" PRId64
            ", quota:%.2fMiB inc:%.3fMiB quotaTs:%" PRId64 " now:%" PRId64 "ms, %s",
            pBucket->numOfToken, incNum, deltaToken, pBucket->quotaRemain, incSize, deltaQuota, now, id);
  }
}
576

577
// Refill the bucket, then try to take one token. A token is only granted while both a token and some size
// quota are available; returns true when a token was taken.
bool streamTaskExtractAvailableToken(STokenBucket* pBucket, const char* id) {
  fillTokenBucket(pBucket, id);

  if (pBucket->numOfToken <= 0) {
    return false;
  }

  if (!(pBucket->quotaRemain > 0)) {  // no available size quota now
    return false;
  }

  pBucket->numOfToken -= 1;
  return true;
}
591

592
// Give one token back to the bucket, never exceeding its capacity.
void streamTaskPutbackToken(STokenBucket* pBucket) {
  if (pBucket->numOfToken + 1 < pBucket->numCapacity) {
    pBucket->numOfToken = pBucket->numOfToken + 1;
  } else {
    pBucket->numOfToken = pBucket->numCapacity;
  }
}
90,343✔
595

596
// size in KB
597
void streamTaskConsumeQuota(STokenBucket* pBucket, int32_t bytes) { pBucket->quotaRemain -= SIZE_IN_MiB(bytes); }
15,811✔
598

UNCOV
599
// Flag the input queue of the task as failed.
void streamTaskInputFail(SStreamTask* pTask) {
  atomic_store_8(&pTask->inputq.status, TASK_INPUT_STATUS__FAILED);
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc