• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4131

20 May 2025 07:22AM UTC coverage: 63.096% (+0.7%) from 62.384%
#4131

push

travis-ci

web-flow
docs(datain): add topic meta options docs in tmq (#31147)

157751 of 318088 branches covered (49.59%)

Branch coverage included in aggregate %.

243052 of 317143 relevant lines covered (76.64%)

18743283.33 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.91
/source/libs/stream/src/streamQueue.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "streamInt.h"
17

18
#define MAX_STREAM_EXEC_BATCH_NUM 32
19
#define MAX_SMOOTH_BURST_RATIO    5  // 5 sec
20

21
// todo refactor:
22
// read data from input queue
23
typedef struct SQueueReader {
24
  SStreamQueue* pQueue;
25
  int32_t       taskLevel;
26
  int32_t       maxBlocks;     // maximum block in one batch
27
  int32_t       waitDuration;  // maximum wait time to format several block into a batch to process, unit: ms
28
} SQueueReader;
29

30
#define streamQueueCurItem(_q) ((_q)->qItem)
31

32
static bool streamTaskExtractAvailableToken(STokenBucket* pBucket, const char* id);
33
static void streamTaskPutbackToken(STokenBucket* pBucket);
34
static void streamTaskConsumeQuota(STokenBucket* pBucket, int32_t bytes);
35
static void streamQueueNextItemInSourceQ(SStreamQueue* pQueue, SStreamQueueItem** pItem, ETaskStatus status, const char* id);
36

37
/**
 * Drain a stream queue: pop every item that is still pending, release each one with
 * streamFreeQitem(), and finally mark the queue as STREAM_QUEUE__SUCESS.
 *
 * streamQueueOpen() passes a partially constructed queue to streamQueueClose() on its
 * error path, so any of the inner containers may still be NULL when this runs. Such a
 * queue has never been handed to a caller and therefore cannot hold items, so the drain
 * loop is skipped for it rather than passing NULL handles to the queue primitives.
 *
 * @param pQueue queue to drain; a NULL pointer is ignored.
 */
static void streamQueueCleanup(SStreamQueue* pQueue) {
  if (pQueue == NULL) {
    return;
  }

  if (pQueue->pQueue != NULL && pQueue->qall != NULL && pQueue->pChkptQueue != NULL) {
    SStreamQueueItem* qItem = NULL;

    for (;;) {
      // TASK_STATUS__READY (i.e. not TASK_STATUS__CK) lets the reader fall through from the
      // checkpoint queue to the ordinary input queue, so both get emptied.
      streamQueueNextItemInSourceQ(pQueue, &qItem, TASK_STATUS__READY, "");
      if (qItem == NULL) {
        break;
      }

      streamFreeQitem(qItem);
    }
  }

  pQueue->status = STREAM_QUEUE__SUCESS;
}
48

49
/**
 * Create a stream queue together with its ordinary queue, its read-all buffer and its
 * checkpoint queue.
 *
 * @param cap  capacity applied to the ordinary queue: used as-is for the item capacity and
 *             multiplied by 1024 for the memory capacity.
 * @param pQ   receives the new queue on success; set to NULL on every failure path.
 * @return 0 on success, terrno when the queue object cannot be allocated, otherwise the
 *         code reported by the failing queue primitive.
 */
int32_t streamQueueOpen(int64_t cap, SStreamQueue** pQ) {
  int32_t       code = 0;
  int32_t       lino = 0;
  SStreamQueue* pNew = NULL;

  *pQ = NULL;

  pNew = taosMemoryCalloc(1, sizeof(SStreamQueue));
  if (pNew == NULL) {
    return terrno;
  }

  // ordinary input queue
  code = taosOpenQueue(&pNew->pQueue);
  TSDB_CHECK_CODE(code, lino, _error);

  // buffer that receives everything read from the ordinary queue in one go
  code = taosAllocateQall(&pNew->qall);
  TSDB_CHECK_CODE(code, lino, _error);

  // separate queue for checkpoint messages
  code = taosOpenQueue(&pNew->pChkptQueue);
  TSDB_CHECK_CODE(code, lino, _error);

  pNew->status = STREAM_QUEUE__SUCESS;

  taosSetQueueCapacity(pNew->pQueue, cap);
  taosSetQueueMemoryCapacity(pNew->pQueue, cap * 1024);

  *pQ = pNew;
  return code;

_error:
  // the queue is only partially built here; streamQueueClose() releases whatever exists
  streamQueueClose(pNew, 0);
  stError("failed to open stream queue at line:%d, code:%s", lino, tstrerror(code));
  return code;
}
82

83
/**
 * Destroy a stream queue: release the items still pending, then the read-all buffer,
 * both inner queues and finally the queue object itself.
 *
 * @param pQueue queue to destroy; the pointer is invalid after the call.
 * @param taskId id of the owning task, used only in the debug log.
 */
void streamQueueClose(SStreamQueue* pQueue, int32_t taskId) {
  stDebug("s-task:0x%x free the queue:%p, items in queue:%d", taskId, pQueue->pQueue,
          taosQueueItemSize(pQueue->pQueue));

  // pending items must be freed while the containers are still alive
  streamQueueCleanup(pQueue);

  taosFreeQall(pQueue->qall);

  // ordinary input queue
  taosCloseQueue(pQueue->pQueue);
  pQueue->pQueue = NULL;

  // checkpoint queue
  taosCloseQueue(pQueue->pChkptQueue);
  pQueue->pChkptQueue = NULL;

  taosMemoryFree(pQueue);
}
97

98
/**
 * Fetch the next item to process and switch the queue to STREAM_QUEUE__PROCESSING.
 *
 * If the previous status was STREAM_QUEUE__FAILED the item kept in pQueue->qItem is handed
 * out again. Otherwise an item is taken from the read-all buffer; when the buffer is empty
 * it is refilled from the ordinary queue first.
 *
 * @param pQueue queue to read from.
 * @param pItem  receives the item, or NULL when nothing is available.
 */
void streamQueueNextItem(SStreamQueue* pQueue, SStreamQueueItem** pItem) {
  *pItem = NULL;

  int8_t prevStatus = atomic_exchange_8(&pQueue->status, STREAM_QUEUE__PROCESSING);

  if (prevStatus != STREAM_QUEUE__FAILED) {
    pQueue->qItem = NULL;
    (void)taosGetQitem(pQueue->qall, &pQueue->qItem);

    if (pQueue->qItem == NULL) {
      // buffer exhausted: move everything from the queue into the buffer and retry
      (void)taosReadAllQitems(pQueue->pQueue, pQueue->qall);
      (void)taosGetQitem(pQueue->qall, &pQueue->qItem);
    }
  }

  // on the failed path qItem still holds the item of the previous attempt
  *pItem = streamQueueCurItem(pQueue);
}
116

117
/**
 * Fetch the next item for a source task and switch the queue to STREAM_QUEUE__PROCESSING.
 *
 * Lookup order:
 *   1. previous status STREAM_QUEUE__CHKPTFAILED: hand out the kept checkpoint item again;
 *   2. previous status STREAM_QUEUE__FAILED: hand out the kept ordinary item again;
 *   3. an item from the checkpoint queue, if there is one;
 *   4. nothing at all when status is TASK_STATUS__CK;
 *   5. an item from the read-all buffer, refilled from the ordinary queue when empty.
 *
 * @param pQueue queue to read from.
 * @param pItem  receives the item, or NULL when nothing is available.
 * @param status current task status, decides whether the ordinary queue may be read.
 * @param id     task id string, used only for logging.
 */
void streamQueueNextItemInSourceQ(SStreamQueue* pQueue, SStreamQueueItem** pItem, ETaskStatus status, const char* id) {
  *pItem = NULL;

  int8_t prevStatus = atomic_exchange_8(&pQueue->status, STREAM_QUEUE__PROCESSING);

  // a previously failed item is retried before anything new is read
  if (prevStatus == STREAM_QUEUE__CHKPTFAILED) {
    *pItem = pQueue->qChkptItem;
    return;
  }

  if (prevStatus == STREAM_QUEUE__FAILED) {
    *pItem = pQueue->qItem;
    return;
  }

  // the checkpoint queue is consulted ahead of the ordinary input queue
  pQueue->qChkptItem = NULL;
  taosReadQitem(pQueue->pChkptQueue, (void**)&pQueue->qChkptItem);
  if (pQueue->qChkptItem != NULL) {
    stDebug("s-task:%s read data from checkpoint queue, status:%d", id, status);
    *pItem = pQueue->qChkptItem;
    return;
  }

  // while a checkpoint is in progress the ordinary input queue is left untouched
  if (status == TASK_STATUS__CK) {
    stDebug("s-task:%s in checkpoint status, not read data in block queue, status:%d", id, status);
    return;
  }

  // ordinary input: try the buffer first, refill it from the queue when it is empty
  pQueue->qItem = NULL;
  (void)taosGetQitem(pQueue->qall, &pQueue->qItem);

  if (pQueue->qItem == NULL) {
    (void)taosReadAllQitems(pQueue->pQueue, pQueue->qall);
    (void)taosGetQitem(pQueue->qall, &pQueue->qItem);
  }

  *pItem = streamQueueCurItem(pQueue);
}
158

159
/**
 * Acknowledge that the item handed out last was processed: forget both kept items and
 * move the queue from STREAM_QUEUE__PROCESSING to STREAM_QUEUE__SUCESS.
 * A queue in any other status is left untouched and an error is logged.
 */
void streamQueueProcessSuccess(SStreamQueue* queue) {
  if (atomic_load_8(&queue->status) == STREAM_QUEUE__PROCESSING) {
    queue->qItem = NULL;
    queue->qChkptItem = NULL;
    atomic_store_8(&queue->status, STREAM_QUEUE__SUCESS);
    return;
  }

  stError("invalid queue status:%d, expect:%d", atomic_load_8(&queue->status), STREAM_QUEUE__PROCESSING);
}
169

170
/**
 * Report that the item handed out last could not be consumed: the queue moves from
 * STREAM_QUEUE__PROCESSING to STREAM_QUEUE__FAILED, which makes the next read return the
 * kept ordinary item again. A queue in any other status is left untouched and an error is
 * logged.
 */
void streamQueueProcessFail(SStreamQueue* queue) {
  if (atomic_load_8(&queue->status) == STREAM_QUEUE__PROCESSING) {
    atomic_store_8(&queue->status, STREAM_QUEUE__FAILED);
    return;
  }

  stError("invalid queue status:%d, expect:%d", atomic_load_8(&queue->status), STREAM_QUEUE__PROCESSING);
}
177

178
/**
 * Report that the checkpoint item handed out last could not be consumed: the queue moves
 * from STREAM_QUEUE__PROCESSING to STREAM_QUEUE__CHKPTFAILED, which makes the next source
 * read return the kept checkpoint item again. A queue in any other status is left untouched
 * and an error is logged.
 */
void streamQueueGetSourceChkptFailed(SStreamQueue* pQueue) {
  if (atomic_load_8(&pQueue->status) == STREAM_QUEUE__PROCESSING) {
    atomic_store_8(&pQueue->status, STREAM_QUEUE__CHKPTFAILED);
    return;
  }

  stError("invalid queue status:%d, expect:%d", atomic_load_8(&pQueue->status), STREAM_QUEUE__PROCESSING);
}
185

186
bool streamQueueIsFull(const SStreamQueue* pQueue) {
524,268✔
187
  int32_t numOfItems = streamQueueGetNumOfItems(pQueue);
524,268✔
188
  if (numOfItems >= STREAM_TASK_QUEUE_CAPACITY) {
525,064!
189
    return true;
×
190
  }
191

192
  return (SIZE_IN_MiB(taosQueueMemorySize(pQueue->pQueue)) >= STREAM_TASK_QUEUE_CAPACITY_IN_SIZE);
525,064✔
193
}
194

195
int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue) {
1,038,324✔
196
  int32_t numOfItems1 = taosQueueItemSize(pQueue->pQueue);
1,038,324✔
197
  int32_t numOfItems2 = taosQallItemSize(pQueue->qall);
1,041,934✔
198

199
  return numOfItems1 + numOfItems2;
1,041,640✔
200
}
201

202
int32_t streamQueueGetNumOfUnAccessedItems(const SStreamQueue* pQueue) {
34,588✔
203
  int32_t numOfItems1 = taosQueueItemSize(pQueue->pQueue);
34,588✔
204
  int32_t numOfItems2 = taosQallUnAccessedItemSize(pQueue->qall);
34,588✔
205

206
  return numOfItems1 + numOfItems2;
34,587✔
207
}
208

209
int32_t streamQueueGetItemSize(const SStreamQueue* pQueue) {
62,290✔
210
  return taosQueueMemorySize(pQueue->pQueue) + taosQallUnAccessedMemSize(pQueue->qall);
62,290✔
211
}
212

213
int32_t streamQueueItemGetSize(const SStreamQueueItem* pItem) {
159,814✔
214
  STaosQnode* p = (STaosQnode*)((char*)pItem - sizeof(STaosQnode));
159,814✔
215
  return p->dataSize;
159,814✔
216
}
217

218
void streamQueueItemIncSize(const SStreamQueueItem* pItem, int32_t size) {
120,014✔
219
  STaosQnode* p = (STaosQnode*)((char*)pItem - sizeof(STaosQnode));
120,014✔
220
  p->dataSize += size;
120,014✔
221
}
120,014✔
222

223
const char* streamQueueItemGetTypeStr(int32_t type) {
51,365✔
224
  switch (type) {
51,365!
225
    case STREAM_INPUT__CHECKPOINT:
3,671✔
226
      return "checkpoint";
3,671✔
227
    case STREAM_INPUT__CHECKPOINT_TRIGGER:
13,624✔
228
      return "checkpoint-trigger";
13,624✔
229
    case STREAM_INPUT__TRANS_STATE:
12,132✔
230
      return "trans-state";
12,132✔
231
    case STREAM_INPUT__REF_DATA_BLOCK:
5,930✔
232
      return "ref-block";
5,930✔
233
    case STREAM_INPUT__RECALCULATE:
×
234
      return "recalculate";
×
235
    default:
16,008✔
236
      return "datablock";
16,008✔
237
  }
238
}
239

240
// Record the size of the finished batch and, for sink tasks only, charge it to the quota.
static void doSealInputBatch(SStreamTask* pTask, int32_t taskLevel, SStreamQueueItem* pBatch, int32_t* blockSize) {
  *blockSize = streamQueueItemGetSize(pBatch);
  if (taskLevel == TASK_LEVEL__SINK) {
    streamTaskConsumeQuota(pTask->outputInfo.pTokenBucket, *blockSize);
  }
}

/**
 * Extract the next batch of input from the task's input queue.
 *
 * Ordinary data items are merged into one batch until the queue runs dry, a merge fails,
 * a control item shows up, or MAX_STREAM_EXEC_BATCH_NUM items have been gathered. Control
 * items (checkpoint, checkpoint-trigger, trans-state, ref-block, recalculate) are never
 * merged: one that arrives first is returned alone, one that arrives behind gathered data
 * is left for the next call by marking the queue as failed.
 *
 * @param pTask       task whose input queue is read.
 * @param pInput      receives the batch (or the single control item); NULL when nothing
 *                    was extracted.
 * @param numOfBlocks receives the number of items in the batch.
 * @param blockSize   receives the recorded size of the batch; 0 for a lone control item.
 * @return EXEC_AFTER_IDLE when a sink task could not obtain a token, EXEC_CONTINUE in every
 *         other case.
 */
EExtractDataCode streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks,
                                             int32_t* blockSize) {
  const char*   id = pTask->id.idStr;
  int32_t       taskLevel = pTask->info.taskLevel;
  SStreamQueue* pQueue = pTask->inputq.queue;

  *pInput = NULL;
  *numOfBlocks = 0;
  *blockSize = 0;

  // a sink task needs a token before it may extract anything
  if (taskLevel == TASK_LEVEL__SINK && (!streamTaskExtractAvailableToken(pTask->outputInfo.pTokenBucket, id))) {
    stDebug("s-task:%s no available token in bucket for sink data, wait for 10ms", id);
    return EXEC_AFTER_IDLE;
  }

  for (;;) {
    ETaskStatus status = streamTaskGetStatus(pTask).state;
    if (status == TASK_STATUS__PAUSE || status == TASK_STATUS__STOP) {
      stDebug("s-task:%s task should pause/stop, extract input blocks:%d", id, *numOfBlocks);
      return EXEC_CONTINUE;
    }

    SStreamQueueItem* qItem = NULL;
    if (taskLevel == TASK_LEVEL__SOURCE) {
      streamQueueNextItemInSourceQ(pQueue, &qItem, status, id);
    } else {
      streamQueueNextItem(pQueue, &qItem);
    }

    // queue drained: hand out what was gathered, or give the token back if there is nothing
    if (qItem == NULL) {
      if (*numOfBlocks > 0) {
        doSealInputBatch(pTask, taskLevel, *pInput, blockSize);
      } else {
        streamTaskPutbackToken(pTask->outputInfo.pTokenBucket);
      }

      return EXEC_CONTINUE;
    }

    int8_t type = qItem->type;
    bool   isChkpt = (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__CHECKPOINT_TRIGGER);
    bool   isCtrl = isChkpt || type == STREAM_INPUT__TRANS_STATE || type == STREAM_INPUT__REF_DATA_BLOCK ||
                  type == STREAM_INPUT__RECALCULATE;

    if (isCtrl) {
      // control items are never merged with other blocks
      const char* p = streamQueueItemGetTypeStr(type);

      if (*pInput == NULL) {
        stDebug("s-task:%s %s msg extracted, start to process immediately", id, p);

        // give the token back: a control item does not count against the token bucket
        streamTaskPutbackToken(pTask->outputInfo.pTokenBucket);
        *blockSize = 0;
        *numOfBlocks = 1;
        *pInput = qItem;
        return EXEC_CONTINUE;
      }

      // data gathered so far goes first; the control item is kept for the next call
      stDebug("s-task:%s %s msg extracted, handle previous blocks, numOfBlocks:%d", id, p, *numOfBlocks);
      doSealInputBatch(pTask, taskLevel, *pInput, blockSize);

      if (isChkpt && (taskLevel == TASK_LEVEL__SOURCE)) {
        streamQueueGetSourceChkptFailed(pQueue);
      } else {
        streamQueueProcessFail(pQueue);
      }

      return EXEC_CONTINUE;
    }

    // ordinary data item: start a batch with it or merge it into the current batch
    if (*pInput == NULL) {
      *pInput = qItem;
    } else {
      void*   merged = NULL;
      int32_t code = streamQueueMergeQueueItem(*pInput, qItem, (SStreamQueueItem**)&merged);

      if (merged == NULL) {
        // merge did not happen: process the batch built so far, keep qItem for the next call
        if (code != -1) {
          stError("s-task:%s failed to merge blocks from inputQ, numOfBlocks:%d, code:%s", id, *numOfBlocks,
                  tstrerror(code));
        }

        doSealInputBatch(pTask, taskLevel, *pInput, blockSize);
        streamQueueProcessFail(pQueue);
        return EXEC_CONTINUE;
      }

      *pInput = merged;
    }

    *numOfBlocks += 1;
    streamQueueProcessSuccess(pQueue);

    if (*numOfBlocks >= MAX_STREAM_EXEC_BATCH_NUM) {
      stDebug("s-task:%s batch size limit:%d reached, start to process blocks", id, MAX_STREAM_EXEC_BATCH_NUM);
      doSealInputBatch(pTask, taskLevel, *pInput, blockSize);
      return EXEC_CONTINUE;
    }
  }
}
354

355
/**
 * Put one item into the input queue of a task.
 *
 * Routing by item type:
 *   - STREAM_INPUT__DATA_SUBMIT: ordinary queue; rejected when a source task's queue is full.
 *   - STREAM_INPUT__DATA_BLOCK / STREAM_INPUT__REF_DATA_BLOCK: ordinary queue; rejected when
 *     the queue is full.
 *   - checkpoint / checkpoint-trigger on a source task: checkpoint queue.
 *   - checkpoint / checkpoint-trigger on other levels, trans-state, data-retrieve and
 *     recalculate: ordinary queue, no capacity check.
 *   - STREAM_INPUT__GET_RES: ordinary queue, no capacity check.
 *
 * Ownership: on success the queue owns pItem. On a full queue or a failed write the item is
 * released here before returning, so the caller must not touch it again. An item of an
 * unknown type is NOT released.
 *
 * @param pTask task that receives the item.
 * @param pItem item to enqueue.
 * @return 0 on success, TSDB_CODE_STREAM_INPUTQ_FULL when the queue is full,
 *         TSDB_CODE_INVALID_PARA for an unknown item type, otherwise the code returned by
 *         taosWriteQitem().
 */
int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) {
  int8_t      type = pItem->type;
  STaosQueue* pQueue = pTask->inputq.queue->pQueue;
  int32_t     level = pTask->info.taskLevel;
  int32_t     total = streamQueueGetNumOfItems(pTask->inputq.queue) + 1;

  if (type == STREAM_INPUT__DATA_SUBMIT) {
    SStreamDataSubmit* px = (SStreamDataSubmit*)pItem;
    if ((pTask->info.taskLevel == TASK_LEVEL__SOURCE) && streamQueueIsFull(pTask->inputq.queue)) {
      double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
      // the labels now match the arguments: item-count limit first, MiB limit second
      stTrace(
          "s-task:%s inputQ is full, capacity(num:%d size:%dMiB), current(blocks:%d, size:%.2fMiB) stop to push data",
          pTask->id.idStr, STREAM_TASK_QUEUE_CAPACITY, STREAM_TASK_QUEUE_CAPACITY_IN_SIZE, total, size);
      streamDataSubmitDestroy(px);
      return TSDB_CODE_STREAM_INPUTQ_FULL;
    }

    // copy what the log needs before the write: once queued, the item may be consumed and
    // freed by another thread
    int32_t msgLen = px->submit.msgLen;
    int64_t ver = px->submit.ver;

    int32_t code = taosWriteQitem(pQueue, pItem);
    if (code != TSDB_CODE_SUCCESS) {
      streamDataSubmitDestroy(px);
      return code;
    }

    double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));

    stDebug("s-task:%s submit enqueue msgLen:%d ver:%" PRId64 ", total in queue:%d, size:%.2fMiB", pTask->id.idStr,
            msgLen, ver, total, size + SIZE_IN_MiB(msgLen));
  } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__REF_DATA_BLOCK) {
    if (streamQueueIsFull(pTask->inputq.queue)) {
      double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));

      stTrace("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort",
              pTask->id.idStr, STREAM_TASK_QUEUE_CAPACITY, STREAM_TASK_QUEUE_CAPACITY_IN_SIZE, total, size);
      streamFreeQitem(pItem);
      return TSDB_CODE_STREAM_INPUTQ_FULL;
    }

    int32_t code = taosWriteQitem(pQueue, pItem);
    if (code != TSDB_CODE_SUCCESS) {
      streamFreeQitem(pItem);
      return code;
    }

    double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
    stDebug("s-task:%s blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr, total, size);
  } else if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__CHECKPOINT_TRIGGER ||
             type == STREAM_INPUT__TRANS_STATE || type == STREAM_INPUT__DATA_RETRIEVE || type == STREAM_INPUT__RECALCULATE) {

    int32_t code = 0;
    if ((type == STREAM_INPUT__CHECKPOINT_TRIGGER || type == STREAM_INPUT__CHECKPOINT) && (level == TASK_LEVEL__SOURCE)) {
      // source tasks keep checkpoint messages in a dedicated queue
      STaosQueue* pChkptQ = pTask->inputq.queue->pChkptQueue;
      code = taosWriteQitem(pChkptQ, pItem);
      if (code != TSDB_CODE_SUCCESS) {
        // same contract as every other branch: a failed write releases the item and
        // reports the error instead of falling through to "return 0"
        streamFreeQitem(pItem);
        return code;
      }

      double  size = SIZE_IN_MiB(taosQueueMemorySize(pChkptQ));
      int32_t num = taosQueueItemSize(pChkptQ);

      stDebug("s-task:%s level:%d %s checkpoint enqueue ctrl queue, total in queue:%d, size:%.2fMiB, data queue:%d",
              pTask->id.idStr, pTask->info.taskLevel, streamQueueItemGetTypeStr(type), num, size, (total - 1));
    } else {
      code = taosWriteQitem(pQueue, pItem);
      if (code != TSDB_CODE_SUCCESS) {
        streamFreeQitem(pItem);
        return code;
      }

      double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
      stDebug("s-task:%s level:%d %s blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr,
              pTask->info.taskLevel, streamQueueItemGetTypeStr(type), total, size);
    }
  } else if (type == STREAM_INPUT__GET_RES) {
    // use the default memory limit, refactor later.
    int32_t code = taosWriteQitem(pQueue, pItem);
    if (code != TSDB_CODE_SUCCESS) {
      streamFreeQitem(pItem);
      return code;
    }

    double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
    stDebug("s-task:%s data res enqueue, current(blocks:%d, size:%.2fMiB)", pTask->id.idStr, total, size);
  } else {
    stError("s-task:%s invalid type:%d to put in inputQ", pTask->id.idStr, type);
    return TSDB_CODE_INVALID_PARA;
  }

  // data-carrying input wakes up the delayed scheduler trigger, when the task uses one
  if (type != STREAM_INPUT__GET_RES && type != STREAM_INPUT__CHECKPOINT && type != STREAM_INPUT__CHECKPOINT_TRIGGER &&
      type != STREAM_INPUT__RECALCULATE && (pTask->info.delaySchedParam != 0)) {
    (void)atomic_val_compare_exchange_8(&pTask->schedInfo.status, TASK_TRIGGER_STATUS__INACTIVE,
                                        TASK_TRIGGER_STATUS__MAY_ACTIVE);
    stDebug("s-task:%s new data arrived, active the sched-trigger, triggerStatus:%d", pTask->id.idStr,
            pTask->schedInfo.status);
  }

  return 0;
}
453

454
/**
 * Build a trans-state message and put it into the input queue of the task.
 *
 * The message is a SStreamDataBlock of type STREAM_INPUT__TRANS_STATE that carries one
 * SSDataBlock (type STREAM_TRANS_STATE, rows 1, childId taken from the task). On success
 * pTask->status.appendTranstateBlock is set.
 *
 * @param pTask task that receives the message.
 * @return TSDB_CODE_SUCCESS on success; the allocation error (terrno or the code returned
 *         by taosAllocateQitem) or the code returned by streamTaskPutDataIntoInputQ()
 *         otherwise.
 */
int32_t streamTaskPutTranstateIntoInputQ(SStreamTask* pTask) {
  int32_t           code = 0;
  SStreamDataBlock* pTranstate = NULL;
  SSDataBlock*      pBlock = NULL;

  code = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SSDataBlock), (void**)&pTranstate);
  if (code) {
    return code;
  }

  pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
  if (pBlock == NULL) {
    code = terrno;
    goto _err;
  }

  pTranstate->type = STREAM_INPUT__TRANS_STATE;

  pBlock->info.type = STREAM_TRANS_STATE;
  pBlock->info.rows = 1;
  pBlock->info.childId = pTask->info.selfChildId;

  pTranstate->blocks = taosArrayInit(4, sizeof(SSDataBlock));  // pBlock;
  if (pTranstate->blocks == NULL) {
    code = terrno;
    goto _err;
  }

  void* p = taosArrayPush(pTranstate->blocks, pBlock);
  if (p == NULL) {
    code = terrno;
    goto _err;
  }

  // the array now holds its own copy of the block; clear the pointer so that no later
  // path can free the temporary a second time
  taosMemoryFree(pBlock);
  pBlock = NULL;

  code = streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pTranstate);
  if (code != TSDB_CODE_SUCCESS) {
    // streamTaskPutDataIntoInputQ() has already released the item on its failure path, so
    // it must not be freed again here; report the real error code to the caller
    return code;
  }

  pTask->status.appendTranstateBlock = true;
  return TSDB_CODE_SUCCESS;

_err:
  // only reached before the item was handed to the input queue
  // NOTE(review): when taosArrayPush() fails, pTranstate->blocks looks like it is not
  // released by taosFreeQitem() - confirm and destroy the array here if so.
  taosMemoryFree(pBlock);
  taosFreeQitem(pTranstate);
  return code;
}
502

503
// the result should be put into the outputQ in any cases, the result may be lost otherwise.
504
int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBlock) {
13,590✔
505
  STaosQueue* pQueue = pTask->outputq.queue->pQueue;
13,590✔
506
  int32_t     code = taosWriteQitem(pQueue, pBlock);
13,590✔
507

508
  int32_t total = streamQueueGetNumOfItems(pTask->outputq.queue);
13,590✔
509
  double  size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
13,590✔
510
  if (code != 0) {
13,590!
511
    stError("s-task:%s failed to put res into outputQ, outputQ items:%d, size:%.2fMiB code:%s, result lost",
×
512
            pTask->id.idStr, total + 1, size, tstrerror(code));
513
  } else {
514
    if (streamQueueIsFull(pTask->outputq.queue)) {
13,590!
515
      stWarn(
×
516
          "s-task:%s outputQ is full(outputQ items:%d, size:%.2fMiB), set the output status BLOCKING, wait for 500ms "
517
          "after handle this batch of blocks",
518
          pTask->id.idStr, total, size);
519
    } else {
520
      stDebug("s-task:%s data put into outputQ, outputQ items:%d, size:%.2fMiB", pTask->id.idStr, total, size);
13,590✔
521
    }
522
  }
523

524
  return code;
13,590✔
525
}
526

527
/**
 * Initialise the token bucket that throttles a sink task.
 *
 * The bucket starts full: numOfToken equals numCap and quotaRemain equals the quota
 * capacity, which is quotaRate * MAX_SMOOTH_BURST_RATIO. Both fill timestamps are set to
 * the current time.
 *
 * @param pBucket   bucket to initialise; must not be NULL.
 * @param numCap    token capacity; must be at least 10.
 * @param numRate   token rate, applied per second when the bucket is refilled; at least 10.
 * @param quotaRate size quota rate, logged in MiB.
 * @param id        task id string, used only for logging.
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_INVALID_PARA when an argument is rejected.
 */
int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, float quotaRate,
                                  const char* id) {
  if (pBucket == NULL || numCap < 10 || numRate < 10) {
    stError("failed to init sink task bucket, cap:%d, rate:%d", numCap, numRate);
    return TSDB_CODE_INVALID_PARA;
  }

  // token side: the bucket starts full
  pBucket->numCapacity = numCap;
  pBucket->numOfToken = numCap;
  pBucket->numRate = numRate;

  // quota side: capacity allows a burst of MAX_SMOOTH_BURST_RATIO times the rate
  pBucket->quotaRate = quotaRate;
  pBucket->quotaCapacity = quotaRate * MAX_SMOOTH_BURST_RATIO;
  pBucket->quotaRemain = pBucket->quotaCapacity;

  pBucket->tokenFillTimestamp = taosGetTimestampMs();
  pBucket->quotaFillTimestamp = taosGetTimestampMs();

  stDebug("s-task:%s sink quotaRate:%.2fMiB, numRate:%d", id, quotaRate, numRate);
  return TSDB_CODE_SUCCESS;
}
547

548
/**
 * Refill the token count and the size quota according to the time elapsed since each was
 * last refilled.
 *
 * Tokens grow by elapsed-seconds * numRate (truncated to an integer) and are capped at
 * numCapacity; quota grows by elapsed-seconds * quotaRate and is capped at quotaCapacity.
 * A fill timestamp only advances when its counter actually grew. Nothing is done when
 * numOfToken is negative.
 *
 * @param pBucket bucket to refill.
 * @param id      task id string, used only for logging.
 */
static void fillTokenBucket(STokenBucket* pBucket, const char* id) {
  int64_t now = taosGetTimestampMs();

  if (pBucket->numOfToken < 0) {
    return;
  }

  // tokens: whole tokens only, the fractional part is truncated
  int64_t tokenElapsedMs = now - pBucket->tokenFillTimestamp;
  int32_t newTokens = (tokenElapsedMs / 1000.0) * pBucket->numRate;
  bool    tokensAdded = (newTokens > 0);
  if (tokensAdded) {
    pBucket->numOfToken = TMIN(pBucket->numOfToken + newTokens, pBucket->numCapacity);
    pBucket->tokenFillTimestamp = now;
  }

  // quota: grows continuously with the elapsed time
  int64_t quotaElapsedMs = now - pBucket->quotaFillTimestamp;
  double  newQuota = (quotaElapsedMs / 1000.0) * pBucket->quotaRate;
  bool    quotaAdded = (newQuota > 0);
  if (quotaAdded) {
    pBucket->quotaRemain = TMIN(pBucket->quotaRemain + newQuota, pBucket->quotaCapacity);
    pBucket->quotaFillTimestamp = now;
  }

  if (tokensAdded || quotaAdded) {
    stTrace("token/quota available, token:%d inc:%d, token_TsDelta:%" PRId64
            ", quota:%.2fMiB inc:%.3fMiB quotaTs:%" PRId64 " now:%" PRId64 "ms, %s",
            pBucket->numOfToken, newTokens, tokenElapsedMs, pBucket->quotaRemain, newQuota, quotaElapsedMs, now, id);
  }
}
576

577
/**
 * Refill the bucket and try to take one token from it.
 *
 * A token is only taken when at least one token is available AND some size quota remains.
 *
 * @param pBucket bucket to take the token from.
 * @param id      task id string, forwarded to the refill step for logging.
 * @return true when a token was taken, false otherwise (the bucket is then left unchanged
 *         apart from the refill).
 */
bool streamTaskExtractAvailableToken(STokenBucket* pBucket, const char* id) {
  fillTokenBucket(pBucket, id);

  bool available = (pBucket->numOfToken > 0) && (pBucket->quotaRemain > 0);
  if (available) {
    pBucket->numOfToken -= 1;
  }

  return available;
}
591

592
/**
 * Return one token to the bucket; the token count never exceeds numCapacity.
 */
void streamTaskPutbackToken(STokenBucket* pBucket) {
  pBucket->numOfToken = TMIN(pBucket->numOfToken + 1, pBucket->numCapacity);
}
595

596
// size in KB
597
void streamTaskConsumeQuota(STokenBucket* pBucket, int32_t bytes) { pBucket->quotaRemain -= SIZE_IN_MiB(bytes); }
11,070✔
598

599
/**
 * Mark the input queue of the task as failed by storing TASK_INPUT_STATUS__FAILED into
 * pTask->inputq.status.
 */
void streamTaskInputFail(SStreamTask* pTask) {
  atomic_store_8(&pTask->inputq.status, TASK_INPUT_STATUS__FAILED);
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc