• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4244

04 Jun 2025 05:45AM UTC coverage: 62.872% (-0.1%) from 62.995%
#4244

push

travis-ci

web-flow
add server port and mqttport in configuration file (#31277)

157271 of 319194 branches covered (49.27%)

Branch coverage included in aggregate %.

243262 of 317868 relevant lines covered (76.53%)

6692004.29 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.84
/source/libs/stream/src/streamQueue.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "streamInt.h"
17

18
#define MAX_STREAM_EXEC_BATCH_NUM 32
19
#define MAX_SMOOTH_BURST_RATIO    5  // 5 sec
20

21
// todo refactor:
22
// read data from input queue
23
typedef struct SQueueReader {
24
  SStreamQueue* pQueue;
25
  int32_t       taskLevel;
26
  int32_t       maxBlocks;     // maximum block in one batch
27
  int32_t       waitDuration;  // maximum wait time to format several block into a batch to process, unit: ms
28
} SQueueReader;
29

30
#define streamQueueCurItem(_q) ((_q)->qItem)
31

32
static bool streamTaskExtractAvailableToken(STokenBucket* pBucket, const char* id);
33
static void streamTaskPutbackToken(STokenBucket* pBucket);
34
static void streamTaskConsumeQuota(STokenBucket* pBucket, int32_t bytes);
35
static void streamQueueNextItemInSourceQ(SStreamQueue* pQueue, SStreamQueueItem** pItem, ETaskStatus status, const char* id);
36

37
static void streamQueueCleanup(SStreamQueue* pQueue) {
13,636✔
38
  SStreamQueueItem* qItem = NULL;
13,636✔
39
  while (1) {
40
    streamQueueNextItemInSourceQ(pQueue, &qItem, TASK_STATUS__READY, "");
13,810✔
41
    if (qItem == NULL) {
13,810✔
42
      break;
13,636✔
43
    }
44
    streamFreeQitem(qItem);
174✔
45
  }
46
  pQueue->status = STREAM_QUEUE__SUCESS;
13,636✔
47
}
13,636✔
48

49
int32_t streamQueueOpen(int64_t cap, SStreamQueue** pQ) {
13,635✔
50
  *pQ = NULL;
13,635✔
51

52
  int32_t code = 0;
13,635✔
53
  int32_t lino = 0;
13,635✔
54

55
  SStreamQueue* pQueue = taosMemoryCalloc(1, sizeof(SStreamQueue));
13,635!
56
  if (pQueue == NULL) {
13,636!
57
    return terrno;
×
58
  }
59

60
  code = taosOpenQueue(&pQueue->pQueue);
13,636✔
61
  TSDB_CHECK_CODE(code, lino, _error);
13,636!
62

63
  code = taosAllocateQall(&pQueue->qall);
13,636✔
64
  TSDB_CHECK_CODE(code, lino, _error);
13,636!
65

66
  code = taosOpenQueue(&pQueue->pChkptQueue);
13,636✔
67
  TSDB_CHECK_CODE(code, lino, _error);
13,636!
68

69
  pQueue->status = STREAM_QUEUE__SUCESS;
13,636✔
70

71
  taosSetQueueCapacity(pQueue->pQueue, cap);
13,636✔
72
  taosSetQueueMemoryCapacity(pQueue->pQueue, cap * 1024);
13,636✔
73

74
  *pQ = pQueue;
13,636✔
75
  return code;
13,636✔
76

77
_error:
×
78
  streamQueueClose(pQueue, 0);
×
79
  stError("failed to open stream queue at line:%d, code:%s", lino, tstrerror(code));
×
80
  return code;
×
81
}
82

83
void streamQueueClose(SStreamQueue* pQueue, int32_t taskId) {
13,636✔
84
  stDebug("s-task:0x%x free the queue:%p, items in queue:%d", taskId, pQueue->pQueue,
13,636!
85
          taosQueueItemSize(pQueue->pQueue));
86
  streamQueueCleanup(pQueue);
13,636✔
87

88
  taosFreeQall(pQueue->qall);
13,636✔
89
  taosCloseQueue(pQueue->pQueue);
13,636✔
90
  pQueue->pQueue = NULL;
13,636✔
91

92
  taosCloseQueue(pQueue->pChkptQueue);
13,636✔
93
  pQueue->pChkptQueue = NULL;
13,636✔
94

95
  taosMemoryFree(pQueue);
13,636!
96
}
13,636✔
97

98
void streamQueueNextItem(SStreamQueue* pQueue, SStreamQueueItem** pItem) {
40,996✔
99
  *pItem = NULL;
40,996✔
100
  int8_t flag = atomic_exchange_8(&pQueue->status, STREAM_QUEUE__PROCESSING);
40,996✔
101

102
  if (flag == STREAM_QUEUE__FAILED) {
40,995✔
103
    *pItem = streamQueueCurItem(pQueue);
133✔
104
  } else {
105
    pQueue->qItem = NULL;
40,862✔
106
    (void) taosGetQitem(pQueue->qall, &pQueue->qItem);
40,862✔
107

108
    if (pQueue->qItem == NULL) {
40,862✔
109
      (void) taosReadAllQitems(pQueue->pQueue, pQueue->qall);
37,808✔
110
      (void) taosGetQitem(pQueue->qall, &pQueue->qItem);
37,809✔
111
    }
112

113
    *pItem = streamQueueCurItem(pQueue);
40,863✔
114
  }
115
}
40,996✔
116

117
void streamQueueNextItemInSourceQ(SStreamQueue* pQueue, SStreamQueueItem** pItem, ETaskStatus status, const char* id) {
101,858✔
118
  *pItem = NULL;
101,858✔
119
  int8_t flag = atomic_exchange_8(&pQueue->status, STREAM_QUEUE__PROCESSING);
101,858✔
120

121
  if (flag == STREAM_QUEUE__CHKPTFAILED) {
101,862!
122
    *pItem = pQueue->qChkptItem;
×
123
    return;
×
124
  }
125

126
  if (flag == STREAM_QUEUE__FAILED) {
101,862✔
127
    *pItem = pQueue->qItem;
1,315✔
128
    return;
1,315✔
129
  }
130

131
  pQueue->qChkptItem = NULL;
100,547✔
132
  taosReadQitem(pQueue->pChkptQueue, (void**)&pQueue->qChkptItem);
100,547✔
133
  if (pQueue->qChkptItem != NULL) {
100,547✔
134
    stDebug("s-task:%s read data from checkpoint queue, status:%d", id, status);
1,672!
135
    *pItem = pQueue->qChkptItem;
1,672✔
136
    return;
1,672✔
137
  }
138

139
  // if in checkpoint status, not read data from ordinary input q.
140
  if (status == TASK_STATUS__CK) {
98,875✔
141
    stDebug("s-task:%s in checkpoint status, not read data in block queue, status:%d", id, status);
800!
142
    return;
800✔
143
  }
144

145
  // let's try the ordinary input q
146
  pQueue->qItem = NULL;
98,075✔
147
  int32_t num = taosGetQitem(pQueue->qall, &pQueue->qItem);
98,075✔
148
  TAOS_UNUSED(num);
149

150
  if (pQueue->qItem == NULL) {
98,075✔
151
    num = taosReadAllQitems(pQueue->pQueue, pQueue->qall);
51,732✔
152
    num = taosGetQitem(pQueue->qall, &pQueue->qItem);
51,732✔
153
    TAOS_UNUSED(num);
154
  }
155

156
  *pItem = streamQueueCurItem(pQueue);
98,075✔
157
}
158

159
void streamQueueProcessSuccess(SStreamQueue* queue) {
64,641✔
160
  if (atomic_load_8(&queue->status) != STREAM_QUEUE__PROCESSING) {
64,641!
161
    stError("invalid queue status:%d, expect:%d", atomic_load_8(&queue->status), STREAM_QUEUE__PROCESSING);
×
162
    return;
×
163
  }
164

165
  queue->qItem = NULL;
64,640✔
166
  queue->qChkptItem = NULL;
64,640✔
167
  atomic_store_8(&queue->status, STREAM_QUEUE__SUCESS);
64,640✔
168
}
169

170
void streamQueueProcessFail(SStreamQueue* queue) {
1,448✔
171
  if (atomic_load_8(&queue->status) != STREAM_QUEUE__PROCESSING) {
1,448!
172
    stError("invalid queue status:%d, expect:%d", atomic_load_8(&queue->status), STREAM_QUEUE__PROCESSING);
×
173
    return;
×
174
  }
175
  atomic_store_8(&queue->status, STREAM_QUEUE__FAILED);
1,448✔
176
}
177

178
void streamQueueGetSourceChkptFailed(SStreamQueue* pQueue) {
×
179
  if (atomic_load_8(&pQueue->status) != STREAM_QUEUE__PROCESSING) {
×
180
    stError("invalid queue status:%d, expect:%d", atomic_load_8(&pQueue->status), STREAM_QUEUE__PROCESSING);
×
181
    return;
×
182
  }
183
  atomic_store_8(&pQueue->status, STREAM_QUEUE__CHKPTFAILED);
×
184
}
185

186
bool streamQueueIsFull(const SStreamQueue* pQueue) {
195,693✔
187
  int32_t numOfItems = streamQueueGetNumOfItems(pQueue);
195,693✔
188
  if (numOfItems >= STREAM_TASK_QUEUE_CAPACITY) {
195,708!
189
    return true;
×
190
  }
191

192
  return (SIZE_IN_MiB(taosQueueMemorySize(pQueue->pQueue)) >= STREAM_TASK_QUEUE_CAPACITY_IN_SIZE);
195,708✔
193
}
194

195
int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue) {
386,106✔
196
  int32_t numOfItems1 = taosQueueItemSize(pQueue->pQueue);
386,106✔
197
  int32_t numOfItems2 = taosQallItemSize(pQueue->qall);
386,186✔
198

199
  return numOfItems1 + numOfItems2;
386,164✔
200
}
201

202
int32_t streamQueueGetNumOfUnAccessedItems(const SStreamQueue* pQueue) {
15,464✔
203
  int32_t numOfItems1 = taosQueueItemSize(pQueue->pQueue);
15,464✔
204
  int32_t numOfItems2 = taosQallUnAccessedItemSize(pQueue->qall);
15,464✔
205

206
  return numOfItems1 + numOfItems2;
15,464✔
207
}
208

209
int32_t streamQueueGetItemSize(const SStreamQueue* pQueue) {
22,092✔
210
  return taosQueueMemorySize(pQueue->pQueue) + taosQallUnAccessedMemSize(pQueue->qall);
22,092✔
211
}
212

213
int32_t streamQueueItemGetSize(const SStreamQueueItem* pItem) {
64,641✔
214
  STaosQnode* p = (STaosQnode*)((char*)pItem - sizeof(STaosQnode));
64,641✔
215
  return p->dataSize;
64,641✔
216
}
217

218
void streamQueueItemIncSize(const SStreamQueueItem* pItem, int32_t size) {
45,064✔
219
  STaosQnode* p = (STaosQnode*)((char*)pItem - sizeof(STaosQnode));
45,064✔
220
  p->dataSize += size;
45,064✔
221
}
45,064✔
222

223
const char* streamQueueItemGetTypeStr(int32_t type) {
34,311✔
224
  switch (type) {
34,311!
225
    case STREAM_INPUT__CHECKPOINT:
1,737✔
226
      return "checkpoint";
1,737✔
227
    case STREAM_INPUT__CHECKPOINT_TRIGGER:
7,389✔
228
      return "checkpoint-trigger";
7,389✔
229
    case STREAM_INPUT__TRANS_STATE:
7,048✔
230
      return "trans-state";
7,048✔
231
    case STREAM_INPUT__REF_DATA_BLOCK:
2,760✔
232
      return "ref-block";
2,760✔
233
    case STREAM_INPUT__RECALCULATE:
×
234
      return "recalculate";
×
235
    default:
15,377✔
236
      return "datablock";
15,377✔
237
  }
238
}
239

240
EExtractDataCode streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks,
50,813✔
241
                                             int32_t* blockSize) {
242
  const char*   id = pTask->id.idStr;
50,813✔
243
  int32_t       taskLevel = pTask->info.taskLevel;
50,813✔
244
  SStreamQueue* pQueue = pTask->inputq.queue;
50,813✔
245

246
  *pInput = NULL;
50,813✔
247
  *numOfBlocks = 0;
50,813✔
248
  *blockSize = 0;
50,813✔
249

250
  // no available token in bucket for sink task, let's wait for a little bit
251
  if (taskLevel == TASK_LEVEL__SINK && (!streamTaskExtractAvailableToken(pTask->outputInfo.pTokenBucket, id))) {
50,813!
252
    stDebug("s-task:%s no available token in bucket for sink data, wait for 10ms", id);
×
253
    return EXEC_AFTER_IDLE;
×
254
  }
255

256
  while (1) {
63,887✔
257
    ETaskStatus status = streamTaskGetStatus(pTask).state;
114,700✔
258
    if (status == TASK_STATUS__PAUSE || status == TASK_STATUS__STOP) {
114,700!
259
      stDebug("s-task:%s task should pause/stop, extract input blocks:%d", id, *numOfBlocks);
×
260
      return EXEC_CONTINUE;
50,815✔
261
    }
262

263
    SStreamQueueItem* qItem = NULL;
114,700✔
264
    if (taskLevel == TASK_LEVEL__SOURCE) {
114,700✔
265
      streamQueueNextItemInSourceQ(pQueue, &qItem, status, id);
88,049✔
266
    } else {
267
      streamQueueNextItem(pQueue, &qItem);
26,651✔
268
    }
269

270
    if (qItem == NULL) {
114,703✔
271
      // restore the token to bucket
272
      if (*numOfBlocks > 0) {
40,233✔
273
        *blockSize = streamQueueItemGetSize(*pInput);
17,373✔
274
        if (taskLevel == TASK_LEVEL__SINK) {
17,372✔
275
          streamTaskConsumeQuota(pTask->outputInfo.pTokenBucket, *blockSize);
4,673✔
276
        }
277
      } else {
278
        streamTaskPutbackToken(pTask->outputInfo.pTokenBucket);
22,860✔
279
      }
280

281
      return EXEC_CONTINUE;
40,233✔
282
    }
283

284
    // do not merge blocks for sink node and check point data block
285
    int8_t type = qItem->type;
74,470✔
286
    if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__CHECKPOINT_TRIGGER ||
74,470✔
287
        type == STREAM_INPUT__TRANS_STATE || type == STREAM_INPUT__REF_DATA_BLOCK || type == STREAM_INPUT__RECALCULATE) {
66,632!
288
      const char* p = streamQueueItemGetTypeStr(type);
9,634✔
289

290
      if (*pInput == NULL) {
9,634✔
291
        stDebug("s-task:%s %s msg extracted, start to process immediately", id, p);
8,379!
292

293
        // restore the token to bucket in case of checkpoint/trans-state msg
294
        streamTaskPutbackToken(pTask->outputInfo.pTokenBucket);
8,379✔
295
        *blockSize = 0;
8,379✔
296
        *numOfBlocks = 1;
8,379✔
297
        *pInput = qItem;
8,379✔
298
        return EXEC_CONTINUE;
8,379✔
299
      } else {  // previous existed blocks needs to be handled, before handle the checkpoint msg block
300
        stDebug("s-task:%s %s msg extracted, handle previous blocks, numOfBlocks:%d", id, p, *numOfBlocks);
1,255!
301
        *blockSize = streamQueueItemGetSize(*pInput);
1,255✔
302
        if (taskLevel == TASK_LEVEL__SINK) {
1,255✔
303
          streamTaskConsumeQuota(pTask->outputInfo.pTokenBucket, *blockSize);
120✔
304
        }
305

306
        if ((type == STREAM_INPUT__CHECKPOINT_TRIGGER || type == STREAM_INPUT__CHECKPOINT) &&
1,255!
307
            (taskLevel == TASK_LEVEL__SOURCE)) {
308
          streamQueueGetSourceChkptFailed(pQueue);
×
309
        } else {
310
          streamQueueProcessFail(pQueue);
1,255✔
311
        }
312
        return EXEC_CONTINUE;
1,255✔
313
      }
314
    } else {
315
      if (*pInput == NULL) {
64,836✔
316
        *pInput = qItem;
19,576✔
317
      } else { // merge current block failed, let's handle the already merged blocks.
318
        void*   newRet = NULL;
45,260✔
319
        int32_t code = streamQueueMergeQueueItem(*pInput, qItem, (SStreamQueueItem**)&newRet);
45,260✔
320
        if (newRet == NULL) {
45,259✔
321
          if (code != -1) {
193!
322
            stError("s-task:%s failed to merge blocks from inputQ, numOfBlocks:%d, code:%s", id, *numOfBlocks,
×
323
                    tstrerror(code));
324
          }
325

326
          *blockSize = streamQueueItemGetSize(*pInput);
193✔
327
          if (taskLevel == TASK_LEVEL__SINK) {
193!
328
            streamTaskConsumeQuota(pTask->outputInfo.pTokenBucket, *blockSize);
×
329
          }
330

331
          streamQueueProcessFail(pQueue);
193✔
332
          return EXEC_CONTINUE;
193✔
333
        }
334

335
        *pInput = newRet;
45,066✔
336
      }
337

338
      *numOfBlocks += 1;
64,642✔
339
      streamQueueProcessSuccess(pQueue);
64,642✔
340

341
      if (*numOfBlocks >= MAX_STREAM_EXEC_BATCH_NUM) {
64,642✔
342
        stDebug("s-task:%s batch size limit:%d reached, start to process blocks", id, MAX_STREAM_EXEC_BATCH_NUM);
755!
343

344
        *blockSize = streamQueueItemGetSize(*pInput);
755✔
345
        if (taskLevel == TASK_LEVEL__SINK) {
755✔
346
          streamTaskConsumeQuota(pTask->outputInfo.pTokenBucket, *blockSize);
3✔
347
        }
348

349
        return EXEC_CONTINUE;
755✔
350
      }
351
    }
352
  }
353
}
354

355
int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) {
73,179✔
356
  int8_t      type = pItem->type;
73,179✔
357
  STaosQueue* pQueue = pTask->inputq.queue->pQueue;
73,179✔
358
  int32_t     level = pTask->info.taskLevel;
73,179✔
359
  int32_t     total = streamQueueGetNumOfItems(pTask->inputq.queue) + 1;
73,179✔
360

361
  if (type == STREAM_INPUT__DATA_SUBMIT) {
73,181✔
362
    SStreamDataSubmit* px = (SStreamDataSubmit*)pItem;
51,899✔
363
    if ((pTask->info.taskLevel == TASK_LEVEL__SOURCE) && streamQueueIsFull(pTask->inputq.queue)) {
51,899!
364
      double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
×
365
      stTrace(
×
366
          "s-task:%s inputQ is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) stop to push data",
367
          pTask->id.idStr, STREAM_TASK_QUEUE_CAPACITY, STREAM_TASK_QUEUE_CAPACITY_IN_SIZE, total, size);
368
      streamDataSubmitDestroy(px);
×
369
      return TSDB_CODE_STREAM_INPUTQ_FULL;
×
370
    }
371

372
    int32_t msgLen = px->submit.msgLen;
51,899✔
373
    int64_t ver = px->submit.ver;
51,899✔
374

375
    int32_t code = taosWriteQitem(pQueue, pItem);
51,899✔
376
    if (code != TSDB_CODE_SUCCESS) {
51,899!
377
      streamDataSubmitDestroy(px);
×
378
      return code;
×
379
    }
380

381
    double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
51,899✔
382

383
    // use the local variable to avoid the pItem be freed by other threads, since it has been put into queue already.
384
    stDebug("s-task:%s submit enqueue msgLen:%d ver:%" PRId64 ", total in queue:%d, size:%.2fMiB", pTask->id.idStr,
51,899!
385
            msgLen, ver, total, size + SIZE_IN_MiB(msgLen));
386
  } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__REF_DATA_BLOCK) {
29,536✔
387
    if (streamQueueIsFull(pTask->inputq.queue)) {
8,254!
388
      double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
×
389

390
      stTrace("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort",
×
391
              pTask->id.idStr, STREAM_TASK_QUEUE_CAPACITY, STREAM_TASK_QUEUE_CAPACITY_IN_SIZE, total, size);
392
      streamFreeQitem(pItem);
×
393
      return TSDB_CODE_STREAM_INPUTQ_FULL;
×
394
    }
395

396
    int32_t code = taosWriteQitem(pQueue, pItem);
8,253✔
397
    if (code != TSDB_CODE_SUCCESS) {
8,253!
398
      streamFreeQitem(pItem);
×
399
      return code;
×
400
    }
401

402
    double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
8,253✔
403
    stDebug("s-task:%s blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr, total, size);
8,254!
404
  } else if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__CHECKPOINT_TRIGGER ||
13,028✔
405
             type == STREAM_INPUT__TRANS_STATE || type == STREAM_INPUT__DATA_RETRIEVE || type == STREAM_INPUT__RECALCULATE) {
13,432!
406

407
    int32_t code = 0;
7,862✔
408
    if ((type == STREAM_INPUT__CHECKPOINT_TRIGGER || type == STREAM_INPUT__CHECKPOINT) && (level == TASK_LEVEL__SOURCE)) {
9,534✔
409
      STaosQueue* pChkptQ = pTask->inputq.queue->pChkptQueue;
1,672✔
410
      code = taosWriteQitem(pChkptQ, pItem);
1,672✔
411

412
      double  size = SIZE_IN_MiB(taosQueueMemorySize(pChkptQ));
1,672✔
413
      int32_t num = taosQueueItemSize(pChkptQ);
1,672✔
414

415
      stDebug("s-task:%s level:%d %s checkpoint enqueue ctrl queue, total in queue:%d, size:%.2fMiB, data queue:%d",
1,672!
416
              pTask->id.idStr, pTask->info.taskLevel, streamQueueItemGetTypeStr(type), num, size, (total - 1));
417
    } else {
418
      code = taosWriteQitem(pQueue, pItem);
6,190✔
419
      if (code != TSDB_CODE_SUCCESS) {
6,190!
420
        streamFreeQitem(pItem);
×
421
        return code;
×
422
      }
423

424
      double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
6,190✔
425
      stDebug("s-task:%s level:%d %s blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr,
6,190!
426
              pTask->info.taskLevel, streamQueueItemGetTypeStr(type), total, size);
427
    }
428
  } else if (type == STREAM_INPUT__GET_RES) {
5,166!
429
    // use the default memory limit, refactor later.
430
    int32_t code = taosWriteQitem(pQueue, pItem);
5,166✔
431
    if (code != TSDB_CODE_SUCCESS) {
5,166!
432
      streamFreeQitem(pItem);
×
433
      return code;
×
434
    }
435

436
    double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
5,166✔
437
    stDebug("s-task:%s data res enqueue, current(blocks:%d, size:%.2fMiB)", pTask->id.idStr, total, size);
5,166!
438
  } else {
439
    stError("s-task:%s invalid type:%d to put in inputQ", pTask->id.idStr, type);
×
440
    return TSDB_CODE_INVALID_PARA;
×
441
  }
442

443
  if (type != STREAM_INPUT__GET_RES && type != STREAM_INPUT__CHECKPOINT && type != STREAM_INPUT__CHECKPOINT_TRIGGER &&
73,181!
444
      type != STREAM_INPUT__RECALCULATE && (pTask->info.delaySchedParam != 0)) {
63,883✔
445
    (void)atomic_val_compare_exchange_8(&pTask->schedInfo.status, TASK_TRIGGER_STATUS__INACTIVE,
6,954✔
446
                                        TASK_TRIGGER_STATUS__MAY_ACTIVE);
447
    stDebug("s-task:%s new data arrived, active the sched-trigger, triggerStatus:%d", pTask->id.idStr,
6,954!
448
            pTask->schedInfo.status);
449
  }
450

451
  return 0;
73,181✔
452
}
453

454
int32_t streamTaskPutTranstateIntoInputQ(SStreamTask* pTask) {
845✔
455
  int32_t           code = 0;
845✔
456
  SStreamDataBlock* pTranstate = NULL;
845✔
457
  SSDataBlock*      pBlock = NULL;
845✔
458

459
  code = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SSDataBlock), (void**)&pTranstate);
845✔
460
  if (code) {
845!
461
    return code;
×
462
  }
463

464
  pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
845!
465
  if (pBlock == NULL) {
845!
466
    code = terrno;
×
467
    goto _err;
×
468
  }
469

470
  pTranstate->type = STREAM_INPUT__TRANS_STATE;
845✔
471

472
  pBlock->info.type = STREAM_TRANS_STATE;
845✔
473
  pBlock->info.rows = 1;
845✔
474
  pBlock->info.childId = pTask->info.selfChildId;
845✔
475

476
  pTranstate->blocks = taosArrayInit(4, sizeof(SSDataBlock));  // pBlock;
845✔
477
  if (pTranstate->blocks == NULL) {
845!
478
    code = terrno;
×
479
    goto _err;
×
480
  }
481

482
  void* p = taosArrayPush(pTranstate->blocks, pBlock);
845✔
483
  if (p == NULL) {
845!
484
    code = terrno;
×
485
    goto _err;
×
486
  }
487

488
  taosMemoryFree(pBlock);
845!
489
  if (streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pTranstate) < 0) {
845!
490
    code = TSDB_CODE_OUT_OF_MEMORY;
×
491
    goto _err;
×
492
  }
493

494
  pTask->status.appendTranstateBlock = true;
845✔
495
  return TSDB_CODE_SUCCESS;
845✔
496

497
_err:
×
498
  taosMemoryFree(pBlock);
×
499
  taosFreeQitem(pTranstate);
×
500
  return code;
×
501
}
502

503
// the result should be put into the outputQ in any cases, the result may be lost otherwise.
504
int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBlock) {
6,473✔
505
  STaosQueue* pQueue = pTask->outputq.queue->pQueue;
6,473✔
506
  int32_t     code = taosWriteQitem(pQueue, pBlock);
6,473✔
507

508
  int32_t total = streamQueueGetNumOfItems(pTask->outputq.queue);
6,473✔
509
  double  size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
6,473✔
510
  if (code != 0) {
6,473!
511
    stError("s-task:%s failed to put res into outputQ, outputQ items:%d, size:%.2fMiB code:%s, result lost",
×
512
            pTask->id.idStr, total + 1, size, tstrerror(code));
513
  } else {
514
    if (streamQueueIsFull(pTask->outputq.queue)) {
6,473!
515
      stWarn(
×
516
          "s-task:%s outputQ is full(outputQ items:%d, size:%.2fMiB), set the output status BLOCKING, wait for 500ms "
517
          "after handle this batch of blocks",
518
          pTask->id.idStr, total, size);
519
    } else {
520
      stDebug("s-task:%s data put into outputQ, outputQ items:%d, size:%.2fMiB", pTask->id.idStr, total, size);
6,473!
521
    }
522
  }
523

524
  return code;
6,473✔
525
}
526

527
int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, float quotaRate,
6,818✔
528
                                  const char* id) {
529
  if (numCap < 10 || numRate < 10 || pBucket == NULL) {
6,818!
530
    stError("failed to init sink task bucket, cap:%d, rate:%d", numCap, numRate);
×
531
    return TSDB_CODE_INVALID_PARA;
×
532
  }
533

534
  pBucket->numCapacity = numCap;
6,818✔
535
  pBucket->numOfToken = numCap;
6,818✔
536
  pBucket->numRate = numRate;
6,818✔
537

538
  pBucket->quotaRate = quotaRate;
6,818✔
539
  pBucket->quotaCapacity = quotaRate * MAX_SMOOTH_BURST_RATIO;
6,818✔
540
  pBucket->quotaRemain = pBucket->quotaCapacity;
6,818✔
541

542
  pBucket->tokenFillTimestamp = taosGetTimestampMs();
6,818✔
543
  pBucket->quotaFillTimestamp = taosGetTimestampMs();
6,818✔
544
  stDebug("s-task:%s sink quotaRate:%.2fMiB, numRate:%d", id, quotaRate, numRate);
6,818!
545
  return TSDB_CODE_SUCCESS;
6,818✔
546
}
547

548
static void fillTokenBucket(STokenBucket* pBucket, const char* id) {
17,008✔
549
  int64_t now = taosGetTimestampMs();
17,008✔
550

551
  int64_t deltaToken = now - pBucket->tokenFillTimestamp;
17,008✔
552
  if (pBucket->numOfToken < 0) {
17,008!
553
    return;
×
554
  }
555

556
  int32_t incNum = (deltaToken / 1000.0) * pBucket->numRate;
17,008✔
557
  if (incNum > 0) {
17,008✔
558
    pBucket->numOfToken = TMIN(pBucket->numOfToken + incNum, pBucket->numCapacity);
8,991✔
559
    pBucket->tokenFillTimestamp = now;
8,991✔
560
  }
561

562
  // increase the new available quota as time goes on
563
  int64_t deltaQuota = now - pBucket->quotaFillTimestamp;
17,008✔
564
  double  incSize = (deltaQuota / 1000.0) * pBucket->quotaRate;
17,008✔
565
  if (incSize > 0) {
17,008✔
566
    pBucket->quotaRemain = TMIN(pBucket->quotaRemain + incSize, pBucket->quotaCapacity);
12,128✔
567
    pBucket->quotaFillTimestamp = now;
12,128✔
568
  }
569

570
  if (incNum > 0 || incSize > 0) {
17,008✔
571
    stTrace("token/quota available, token:%d inc:%d, token_TsDelta:%" PRId64
12,128✔
572
            ", quota:%.2fMiB inc:%.3fMiB quotaTs:%" PRId64 " now:%" PRId64 "ms, %s",
573
            pBucket->numOfToken, incNum, deltaToken, pBucket->quotaRemain, incSize, deltaQuota, now, id);
574
  }
575
}
576

577
bool streamTaskExtractAvailableToken(STokenBucket* pBucket, const char* id) {
17,008✔
578
  fillTokenBucket(pBucket, id);
17,008✔
579

580
  if (pBucket->numOfToken > 0) {
17,008!
581
    if (pBucket->quotaRemain > 0) {
17,008!
582
      pBucket->numOfToken -= 1;
17,008✔
583
      return true;
17,008✔
584
    } else {  // no available size quota now
585
      return false;
×
586
    }
587
  } else {
588
    return false;
×
589
  }
590
}
591

592
void streamTaskPutbackToken(STokenBucket* pBucket) {
31,239✔
593
  pBucket->numOfToken = TMIN(pBucket->numOfToken + 1, pBucket->numCapacity);
31,239✔
594
}
31,239✔
595

596
// size in KB
597
void streamTaskConsumeQuota(STokenBucket* pBucket, int32_t bytes) { pBucket->quotaRemain -= SIZE_IN_MiB(bytes); }
4,796✔
598

599
void streamTaskInputFail(SStreamTask* pTask) { atomic_store_8(&pTask->inputq.status, TASK_INPUT_STATUS__FAILED); }
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc