• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4106

19 May 2025 07:15AM UTC coverage: 62.857% (-0.2%) from 63.042%
#4106

push

travis-ci

GitHub
Merge pull request #31115 from taosdata/merge/mainto3.0

156749 of 318088 branches covered (49.28%)

Branch coverage included in aggregate %.

242535 of 317143 relevant lines covered (76.47%)

18746393.97 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.08
/source/libs/stream/src/streamQueue.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "streamInt.h"
17

18
#define MAX_STREAM_EXEC_BATCH_NUM 32
19
#define MAX_SMOOTH_BURST_RATIO    5  // 5 sec
20

21
// todo refactor:
22
// read data from input queue
23
typedef struct SQueueReader {
24
  SStreamQueue* pQueue;
25
  int32_t       taskLevel;
26
  int32_t       maxBlocks;     // maximum block in one batch
27
  int32_t       waitDuration;  // maximum wait time to format several block into a batch to process, unit: ms
28
} SQueueReader;
29

30
#define streamQueueCurItem(_q) ((_q)->qItem)
31

32
static bool streamTaskExtractAvailableToken(STokenBucket* pBucket, const char* id);
33
static void streamTaskPutbackToken(STokenBucket* pBucket);
34
static void streamTaskConsumeQuota(STokenBucket* pBucket, int32_t bytes);
35
static void streamQueueNextItemInSourceQ(SStreamQueue* pQueue, SStreamQueueItem** pItem, ETaskStatus status, const char* id);
36

37
// Pull every item that the source-queue reader still yields, free it, and finally mark the queue as
// STREAM_QUEUE__SUCESS. The reader is driven with TASK_STATUS__READY so that the ordinary data queue is
// read as well as the checkpoint queue.
static void streamQueueCleanup(SStreamQueue* pQueue) {
  for (;;) {
    SStreamQueueItem* pRemain = NULL;
    streamQueueNextItemInSourceQ(pQueue, &pRemain, TASK_STATUS__READY, "");
    if (pRemain == NULL) {
      break;  // nothing left to release
    }

    streamFreeQitem(pRemain);
  }

  pQueue->status = STREAM_QUEUE__SUCESS;
}
24,948✔
48

49
/*
 * Allocate a stream queue together with its data queue, its qall and its checkpoint queue.
 *
 * @param cap  item capacity of the data queue; the memory capacity is set to cap * 1024
 * @param pQ   receives the new queue on success, NULL on failure
 * @return 0 on success, terrno if the queue object cannot be allocated, otherwise the failing call's code
 */
int32_t streamQueueOpen(int64_t cap, SStreamQueue** pQ) {
  int32_t code = 0;
  int32_t lino = 0;

  *pQ = NULL;

  SStreamQueue* pNew = taosMemoryCalloc(1, sizeof(SStreamQueue));
  if (pNew == NULL) {
    return terrno;
  }

  code = taosOpenQueue(&pNew->pQueue);
  TSDB_CHECK_CODE(code, lino, _error);

  code = taosAllocateQall(&pNew->qall);
  TSDB_CHECK_CODE(code, lino, _error);

  code = taosOpenQueue(&pNew->pChkptQueue);
  TSDB_CHECK_CODE(code, lino, _error);

  pNew->status = STREAM_QUEUE__SUCESS;

  taosSetQueueCapacity(pNew->pQueue, cap);
  taosSetQueueMemoryCapacity(pNew->pQueue, cap * 1024);

  *pQ = pNew;
  return code;

_error:
  // NOTE(review): the queue may be only partially built here; this relies on streamQueueClose and the
  // queue helpers it calls tolerating NULL members -- confirm.
  streamQueueClose(pNew, 0);
  stError("failed to open stream queue at line:%d, code:%s", lino, tstrerror(code));
  return code;
}
82

83
/*
 * Release a stream queue: free the items still queued, then the qall, the data queue, the checkpoint
 * queue and finally the queue object itself.
 *
 * @param pQueue  queue to destroy; it is dereferenced unconditionally
 * @param taskId  only used in the debug log line
 */
void streamQueueClose(SStreamQueue* pQueue, int32_t taskId) {
  stDebug("s-task:0x%x free the queue:%p, items in queue:%d", taskId, pQueue->pQueue,
          taosQueueItemSize(pQueue->pQueue));

  // items must be released while the underlying containers still exist
  streamQueueCleanup(pQueue);

  taosFreeQall(pQueue->qall);

  STaosQueue* pDataQ = pQueue->pQueue;
  pQueue->pQueue = NULL;
  taosCloseQueue(pDataQ);

  STaosQueue* pChkptQ = pQueue->pChkptQueue;
  pQueue->pChkptQueue = NULL;
  taosCloseQueue(pChkptQ);

  taosMemoryFree(pQueue);
}
24,950✔
97

98
/*
 * Fetch the next item from the ordinary input queue and mark the queue as PROCESSING.
 *
 * If the previous status was STREAM_QUEUE__FAILED, the current item is handed out again instead of
 * reading a new one. Otherwise the next item is taken from the qall, which is refilled from the queue
 * once when it yields nothing.
 *
 * @param pQueue  queue to read from
 * @param pItem   receives the item, or NULL when nothing is available
 */
void streamQueueNextItem(SStreamQueue* pQueue, SStreamQueueItem** pItem) {
  *pItem = NULL;

  int8_t prevStatus = atomic_exchange_8(&pQueue->status, STREAM_QUEUE__PROCESSING);
  if (prevStatus == STREAM_QUEUE__FAILED) {
    *pItem = streamQueueCurItem(pQueue);
    return;
  }

  pQueue->qItem = NULL;
  (void)taosGetQitem(pQueue->qall, &pQueue->qItem);

  if (pQueue->qItem == NULL) {
    // the qall yielded nothing: move everything pending in the queue into it and retry once
    (void)taosReadAllQitems(pQueue->pQueue, pQueue->qall);
    (void)taosGetQitem(pQueue->qall, &pQueue->qItem);
  }

  *pItem = streamQueueCurItem(pQueue);
}
102,361✔
116

117
/*
 * Fetch the next item for a source task and mark the queue as PROCESSING.
 *
 * Order of precedence:
 *   1. previous status CHKPTFAILED -> hand out the current checkpoint item again
 *   2. previous status FAILED      -> hand out the current ordinary item again
 *   3. an item read from the checkpoint queue
 *   4. unless the task status is TASK_STATUS__CK, an item from the ordinary queue
 *
 * @param pQueue  queue to read from
 * @param pItem   receives the item, or NULL when nothing is available
 * @param status  current task status; TASK_STATUS__CK blocks reads from the ordinary queue
 * @param id      task id string, used for logging only
 */
void streamQueueNextItemInSourceQ(SStreamQueue* pQueue, SStreamQueueItem** pItem, ETaskStatus status, const char* id) {
  *pItem = NULL;

  switch (atomic_exchange_8(&pQueue->status, STREAM_QUEUE__PROCESSING)) {
    case STREAM_QUEUE__CHKPTFAILED:
      *pItem = pQueue->qChkptItem;
      return;
    case STREAM_QUEUE__FAILED:
      *pItem = pQueue->qItem;
      return;
    default:
      break;
  }

  // the checkpoint queue is always tried first
  pQueue->qChkptItem = NULL;
  taosReadQitem(pQueue->pChkptQueue, (void**)&pQueue->qChkptItem);
  if (pQueue->qChkptItem != NULL) {
    stDebug("s-task:%s read data from checkpoint queue, status:%d", id, status);
    *pItem = pQueue->qChkptItem;
    return;
  }

  // if in checkpoint status, not read data from ordinary input q.
  if (status == TASK_STATUS__CK) {
    stDebug("s-task:%s in checkpoint status, not read data in block queue, status:%d", id, status);
    return;
  }

  // let's try the ordinary input q
  pQueue->qItem = NULL;
  (void)taosGetQitem(pQueue->qall, &pQueue->qItem);

  if (pQueue->qItem == NULL) {
    // refill the qall from the queue and retry once
    (void)taosReadAllQitems(pQueue->pQueue, pQueue->qall);
    (void)taosGetQitem(pQueue->qall, &pQueue->qItem);
  }

  *pItem = streamQueueCurItem(pQueue);
}
158

159
// Acknowledge the item currently being processed: clear both current-item slots and move the queue from
// PROCESSING to STREAM_QUEUE__SUCESS. Logs an error and changes nothing if the queue is not PROCESSING.
void streamQueueProcessSuccess(SStreamQueue* queue) {
  if (atomic_load_8(&queue->status) == STREAM_QUEUE__PROCESSING) {
    queue->qItem = NULL;
    queue->qChkptItem = NULL;
    atomic_store_8(&queue->status, STREAM_QUEUE__SUCESS);
    return;
  }

  stError("invalid queue status:%d, expect:%d", atomic_load_8(&queue->status), STREAM_QUEUE__PROCESSING);
}
169

170
// Move the queue from PROCESSING to STREAM_QUEUE__FAILED so that the next read hands out the current
// ordinary item again. Logs an error and changes nothing if the queue is not PROCESSING.
void streamQueueProcessFail(SStreamQueue* queue) {
  if (atomic_load_8(&queue->status) == STREAM_QUEUE__PROCESSING) {
    atomic_store_8(&queue->status, STREAM_QUEUE__FAILED);
    return;
  }

  stError("invalid queue status:%d, expect:%d", atomic_load_8(&queue->status), STREAM_QUEUE__PROCESSING);
}
177

178
// Move the queue from PROCESSING to STREAM_QUEUE__CHKPTFAILED so that the next source-queue read hands out
// the current checkpoint item again. Logs an error and changes nothing if the queue is not PROCESSING.
void streamQueueGetSourceChkptFailed(SStreamQueue* pQueue) {
  if (atomic_load_8(&pQueue->status) == STREAM_QUEUE__PROCESSING) {
    atomic_store_8(&pQueue->status, STREAM_QUEUE__CHKPTFAILED);
    return;
  }

  stError("invalid queue status:%d, expect:%d", atomic_load_8(&pQueue->status), STREAM_QUEUE__PROCESSING);
}
185

186
bool streamQueueIsFull(const SStreamQueue* pQueue) {
1,051,070✔
187
  int32_t numOfItems = streamQueueGetNumOfItems(pQueue);
1,051,070✔
188
  if (numOfItems >= STREAM_TASK_QUEUE_CAPACITY) {
1,053,289✔
189
    return true;
6✔
190
  }
191

192
  return (SIZE_IN_MiB(taosQueueMemorySize(pQueue->pQueue)) >= STREAM_TASK_QUEUE_CAPACITY_IN_SIZE);
1,053,283✔
193
}
194

195
int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue) {
2,100,696✔
196
  int32_t numOfItems1 = taosQueueItemSize(pQueue->pQueue);
2,100,696✔
197
  int32_t numOfItems2 = taosQallItemSize(pQueue->qall);
2,106,749✔
198

199
  return numOfItems1 + numOfItems2;
2,104,969✔
200
}
201

202
int32_t streamQueueGetNumOfUnAccessedItems(const SStreamQueue* pQueue) {
38,659✔
203
  int32_t numOfItems1 = taosQueueItemSize(pQueue->pQueue);
38,659✔
204
  int32_t numOfItems2 = taosQallUnAccessedItemSize(pQueue->qall);
38,660✔
205

206
  return numOfItems1 + numOfItems2;
38,659✔
207
}
208

209
int32_t streamQueueGetItemSize(const SStreamQueue* pQueue) {
156,288✔
210
  return taosQueueMemorySize(pQueue->pQueue) + taosQallUnAccessedMemSize(pQueue->qall);
156,288✔
211
}
212

213
int32_t streamQueueItemGetSize(const SStreamQueueItem* pItem) {
221,445✔
214
  STaosQnode* p = (STaosQnode*)((char*)pItem - sizeof(STaosQnode));
221,445✔
215
  return p->dataSize;
221,445✔
216
}
217

218
void streamQueueItemIncSize(const SStreamQueueItem* pItem, int32_t size) {
181,966✔
219
  STaosQnode* p = (STaosQnode*)((char*)pItem - sizeof(STaosQnode));
181,966✔
220
  p->dataSize += size;
181,966✔
221
}
181,966✔
222

223
const char* streamQueueItemGetTypeStr(int32_t type) {
59,648✔
224
  switch (type) {
59,648!
225
    case STREAM_INPUT__CHECKPOINT:
6,584✔
226
      return "checkpoint";
6,584✔
227
    case STREAM_INPUT__CHECKPOINT_TRIGGER:
19,193✔
228
      return "checkpoint-trigger";
19,193✔
229
    case STREAM_INPUT__TRANS_STATE:
10,554✔
230
      return "trans-state";
10,554✔
231
    case STREAM_INPUT__REF_DATA_BLOCK:
4,938✔
232
      return "ref-block";
4,938✔
233
    case STREAM_INPUT__RECALCULATE:
×
234
      return "recalculate";
×
235
    default:
18,379✔
236
      return "datablock";
18,379✔
237
  }
238
}
239

240
// Record the size of the collected batch in *blockSize and, for sink tasks, charge it to the token bucket.
static void streamTaskSettleBatchSize(SStreamTask* pTask, int32_t taskLevel, const SStreamQueueItem* pBatch,
                                      int32_t* blockSize) {
  *blockSize = streamQueueItemGetSize(pBatch);
  if (taskLevel == TASK_LEVEL__SINK) {
    streamTaskConsumeQuota(pTask->outputInfo.pTokenBucket, *blockSize);
  }
}

/*
 * Extract the next unit of work from the task's input queue.
 *
 * Ordinary items are merged into one batch of at most MAX_STREAM_EXEC_BATCH_NUM blocks. Checkpoint,
 * checkpoint-trigger, trans-state, ref-data-block and recalculate items are never merged: such an item
 * is returned alone when no batch is pending, otherwise the pending batch is returned first and the queue
 * is flagged so that the item is handed out again by the next read.
 *
 * @param pTask        task whose input queue is read
 * @param pInput       receives the extracted item or merged batch, NULL if nothing was extracted
 * @param numOfBlocks  receives the number of blocks extracted
 * @param blockSize    receives the size of the batch as reported by streamQueueItemGetSize; 0 for a
 *                     control item returned alone
 * @return EXEC_AFTER_IDLE when a sink task has no token available, EXEC_CONTINUE otherwise
 */
EExtractDataCode streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks,
                                             int32_t* blockSize) {
  const char*   id = pTask->id.idStr;
  int32_t       taskLevel = pTask->info.taskLevel;
  SStreamQueue* pQueue = pTask->inputq.queue;

  *pInput = NULL;
  *numOfBlocks = 0;
  *blockSize = 0;

  // no available token in bucket for sink task, let's wait for a little bit
  if (taskLevel == TASK_LEVEL__SINK && (!streamTaskExtractAvailableToken(pTask->outputInfo.pTokenBucket, id))) {
    stDebug("s-task:%s no available token in bucket for sink data, wait for 10ms", id);
    return EXEC_AFTER_IDLE;
  }

  for (;;) {
    ETaskStatus status = streamTaskGetStatus(pTask).state;
    if (status == TASK_STATUS__PAUSE || status == TASK_STATUS__STOP) {
      // NOTE(review): *blockSize is left untouched on this path even if blocks were already collected.
      stDebug("s-task:%s task should pause/stop, extract input blocks:%d", id, *numOfBlocks);
      return EXEC_CONTINUE;
    }

    SStreamQueueItem* qItem = NULL;
    if (taskLevel == TASK_LEVEL__SOURCE) {
      streamQueueNextItemInSourceQ(pQueue, &qItem, status, id);
    } else {
      streamQueueNextItem(pQueue, &qItem);
    }

    if (qItem == NULL) {
      // the queue is drained: either finish the pending batch or restore the token to bucket
      if (*numOfBlocks > 0) {
        streamTaskSettleBatchSize(pTask, taskLevel, *pInput, blockSize);
      } else {
        streamTaskPutbackToken(pTask->outputInfo.pTokenBucket);
      }

      return EXEC_CONTINUE;
    }

    // do not merge blocks for sink node and check point data block
    int8_t     type = qItem->type;
    const bool isCtrlMsg = (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__CHECKPOINT_TRIGGER ||
                            type == STREAM_INPUT__TRANS_STATE || type == STREAM_INPUT__REF_DATA_BLOCK ||
                            type == STREAM_INPUT__RECALCULATE);

    if (isCtrlMsg) {
      const char* typeStr = streamQueueItemGetTypeStr(type);

      if (*pInput == NULL) {
        stDebug("s-task:%s %s msg extracted, start to process immediately", id, typeStr);

        // restore the token to bucket in case of checkpoint/trans-state msg
        streamTaskPutbackToken(pTask->outputInfo.pTokenBucket);
        *blockSize = 0;
        *numOfBlocks = 1;
        *pInput = qItem;
        return EXEC_CONTINUE;
      }

      // previous existed blocks needs to be handled, before handle the checkpoint msg block
      stDebug("s-task:%s %s msg extracted, handle previous blocks, numOfBlocks:%d", id, typeStr, *numOfBlocks);
      streamTaskSettleBatchSize(pTask, taskLevel, *pInput, blockSize);

      // flag the queue so that this control item is handed out again by the next read
      const bool isChkptMsg = (type == STREAM_INPUT__CHECKPOINT_TRIGGER || type == STREAM_INPUT__CHECKPOINT);
      if (isChkptMsg && (taskLevel == TASK_LEVEL__SOURCE)) {
        streamQueueGetSourceChkptFailed(pQueue);
      } else {
        streamQueueProcessFail(pQueue);
      }
      return EXEC_CONTINUE;
    }

    if (*pInput == NULL) {
      *pInput = qItem;
    } else {
      void*   pMerged = NULL;
      int32_t code = streamQueueMergeQueueItem(*pInput, qItem, (SStreamQueueItem**)&pMerged);
      if (pMerged == NULL) {
        // merge current block failed, let's handle the already merged blocks.
        if (code != -1) {
          stError("s-task:%s failed to merge blocks from inputQ, numOfBlocks:%d, code:%s", id, *numOfBlocks,
                  tstrerror(code));
        }

        streamTaskSettleBatchSize(pTask, taskLevel, *pInput, blockSize);
        streamQueueProcessFail(pQueue);
        return EXEC_CONTINUE;
      }

      *pInput = pMerged;
    }

    *numOfBlocks += 1;
    streamQueueProcessSuccess(pQueue);

    if (*numOfBlocks >= MAX_STREAM_EXEC_BATCH_NUM) {
      stDebug("s-task:%s batch size limit:%d reached, start to process blocks", id, MAX_STREAM_EXEC_BATCH_NUM);

      streamTaskSettleBatchSize(pTask, taskLevel, *pInput, blockSize);
      return EXEC_CONTINUE;
    }
  }
}
354

355
/*
 * Put one item into the task's input queue.
 *
 * Checkpoint and checkpoint-trigger items of a source task go to the dedicated checkpoint queue; every
 * other accepted item goes to the ordinary data queue. Submit items (source tasks only) and data-block /
 * ref-data-block items are rejected while the queue is full.
 *
 * Ownership: when the queue is full or writing to a queue fails, the item is released here before
 * returning, so the caller must not free it again.
 * NOTE(review): the invalid-type path returns without releasing the item -- confirm whether that is intended.
 *
 * @param pTask  destination task
 * @param pItem  item to enqueue
 * @return 0 on success, TSDB_CODE_STREAM_INPUTQ_FULL when the queue is full, TSDB_CODE_INVALID_PARA for an
 *         unknown item type, otherwise the code returned by taosWriteQitem
 */
int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) {
  int8_t      type = pItem->type;
  STaosQueue* pQueue = pTask->inputq.queue->pQueue;
  int32_t     level = pTask->info.taskLevel;
  int32_t     total = streamQueueGetNumOfItems(pTask->inputq.queue) + 1;

  if (type == STREAM_INPUT__DATA_SUBMIT) {
    SStreamDataSubmit* px = (SStreamDataSubmit*)pItem;
    if ((pTask->info.taskLevel == TASK_LEVEL__SOURCE) && streamQueueIsFull(pTask->inputq.queue)) {
      double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
      stTrace(
          "s-task:%s inputQ is full, capacity(num:%d size:%dMiB), current(blocks:%d, size:%.2fMiB) stop to push data",
          pTask->id.idStr, STREAM_TASK_QUEUE_CAPACITY, STREAM_TASK_QUEUE_CAPACITY_IN_SIZE, total, size);
      streamDataSubmitDestroy(px);
      return TSDB_CODE_STREAM_INPUTQ_FULL;
    }

    // copy what the log line needs before the item is handed over to the queue
    int32_t msgLen = px->submit.msgLen;
    int64_t ver = px->submit.ver;

    int32_t code = taosWriteQitem(pQueue, pItem);
    if (code != TSDB_CODE_SUCCESS) {
      streamDataSubmitDestroy(px);
      return code;
    }

    double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));

    // use the local variable to avoid the pItem be freed by other threads, since it has been put into queue already.
    stDebug("s-task:%s submit enqueue msgLen:%d ver:%" PRId64 ", total in queue:%d, size:%.2fMiB", pTask->id.idStr,
            msgLen, ver, total, size + SIZE_IN_MiB(msgLen));
  } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__REF_DATA_BLOCK) {
    if (streamQueueIsFull(pTask->inputq.queue)) {
      double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));

      stTrace("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort",
              pTask->id.idStr, STREAM_TASK_QUEUE_CAPACITY, STREAM_TASK_QUEUE_CAPACITY_IN_SIZE, total, size);
      streamFreeQitem(pItem);
      return TSDB_CODE_STREAM_INPUTQ_FULL;
    }

    int32_t code = taosWriteQitem(pQueue, pItem);
    if (code != TSDB_CODE_SUCCESS) {
      streamFreeQitem(pItem);
      return code;
    }

    double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
    stDebug("s-task:%s blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr, total, size);
  } else if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__CHECKPOINT_TRIGGER ||
             type == STREAM_INPUT__TRANS_STATE || type == STREAM_INPUT__DATA_RETRIEVE || type == STREAM_INPUT__RECALCULATE) {

    int32_t code = 0;
    if ((type == STREAM_INPUT__CHECKPOINT_TRIGGER || type == STREAM_INPUT__CHECKPOINT) && (level == TASK_LEVEL__SOURCE)) {
      STaosQueue* pChkptQ = pTask->inputq.queue->pChkptQueue;
      code = taosWriteQitem(pChkptQ, pItem);
      if (code != TSDB_CODE_SUCCESS) {
        // same contract as every other branch: a failed write releases the item and reports the error
        streamFreeQitem(pItem);
        return code;
      }

      double  size = SIZE_IN_MiB(taosQueueMemorySize(pChkptQ));
      int32_t num = taosQueueItemSize(pChkptQ);

      stDebug("s-task:%s level:%d %s checkpoint enqueue ctrl queue, total in queue:%d, size:%.2fMiB, data queue:%d",
              pTask->id.idStr, pTask->info.taskLevel, streamQueueItemGetTypeStr(type), num, size, (total - 1));
    } else {
      code = taosWriteQitem(pQueue, pItem);
      if (code != TSDB_CODE_SUCCESS) {
        streamFreeQitem(pItem);
        return code;
      }

      double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
      stDebug("s-task:%s level:%d %s blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr,
              pTask->info.taskLevel, streamQueueItemGetTypeStr(type), total, size);
    }
  } else if (type == STREAM_INPUT__GET_RES) {
    // use the default memory limit, refactor later.
    int32_t code = taosWriteQitem(pQueue, pItem);
    if (code != TSDB_CODE_SUCCESS) {
      streamFreeQitem(pItem);
      return code;
    }

    double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
    stDebug("s-task:%s data res enqueue, current(blocks:%d, size:%.2fMiB)", pTask->id.idStr, total, size);
  } else {
    stError("s-task:%s invalid type:%d to put in inputQ", pTask->id.idStr, type);
    return TSDB_CODE_INVALID_PARA;
  }

  // new data (anything but get-res, checkpoint, checkpoint-trigger and recalculate items) arms the
  // scheduling trigger of a task that has a non-zero delaySchedParam
  if (type != STREAM_INPUT__GET_RES && type != STREAM_INPUT__CHECKPOINT && type != STREAM_INPUT__CHECKPOINT_TRIGGER &&
      type != STREAM_INPUT__RECALCULATE && (pTask->info.delaySchedParam != 0)) {
    (void)atomic_val_compare_exchange_8(&pTask->schedInfo.status, TASK_TRIGGER_STATUS__INACTIVE,
                                        TASK_TRIGGER_STATUS__MAY_ACTIVE);
    stDebug("s-task:%s new data arrived, active the sched-trigger, triggerStatus:%d", pTask->id.idStr,
            pTask->schedInfo.status);
  }

  return 0;
}
453

454
/*
 * Build a trans-state item (a single STREAM_TRANS_STATE block carrying this task's child id) and put it
 * into the task's own input queue. On success pTask->status.appendTranstateBlock is set.
 *
 * @param pTask  task that receives the trans-state item
 * @return TSDB_CODE_SUCCESS, or the error code of the step that failed
 */
int32_t streamTaskPutTranstateIntoInputQ(SStreamTask* pTask) {
  int32_t           code = 0;
  SStreamDataBlock* pTranstate = NULL;
  SSDataBlock*      pBlock = NULL;

  code = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SSDataBlock), (void**)&pTranstate);
  if (code) {
    return code;
  }

  // make the cleanup path independent of how the allocator initializes the item
  pTranstate->blocks = NULL;

  pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
  if (pBlock == NULL) {
    code = terrno;
    goto _err;
  }

  pTranstate->type = STREAM_INPUT__TRANS_STATE;

  pBlock->info.type = STREAM_TRANS_STATE;
  pBlock->info.rows = 1;
  pBlock->info.childId = pTask->info.selfChildId;

  pTranstate->blocks = taosArrayInit(4, sizeof(SSDataBlock));  // pBlock;
  if (pTranstate->blocks == NULL) {
    code = terrno;
    goto _err;
  }

  void* p = taosArrayPush(pTranstate->blocks, pBlock);
  if (p == NULL) {
    code = terrno;
    goto _err;
  }

  // the temporary block is not needed once it has been pushed; clear the pointer so that no later path
  // can free it a second time
  taosMemoryFree(pBlock);
  pBlock = NULL;

  code = streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pTranstate);
  if (code != TSDB_CODE_SUCCESS) {
    // streamTaskPutDataIntoInputQ has already released the item on failure; freeing it here again
    // would be a double free
    return code;
  }

  pTask->status.appendTranstateBlock = true;
  return TSDB_CODE_SUCCESS;

_err:
  taosMemoryFree(pBlock);
  if (pTranstate->blocks != NULL) {
    // the item was never enqueued, so its block array must be released here
    taosArrayDestroy(pTranstate->blocks);
    pTranstate->blocks = NULL;
  }
  taosFreeQitem(pTranstate);
  return code;
}
502

503
// the result should be put into the outputQ in any cases, the result may be lost otherwise.
504
int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBlock) {
13,513✔
505
  STaosQueue* pQueue = pTask->outputq.queue->pQueue;
13,513✔
506
  int32_t     code = taosWriteQitem(pQueue, pBlock);
13,513✔
507

508
  int32_t total = streamQueueGetNumOfItems(pTask->outputq.queue);
13,515✔
509
  double  size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
13,514✔
510
  if (code != 0) {
13,515!
511
    stError("s-task:%s failed to put res into outputQ, outputQ items:%d, size:%.2fMiB code:%s, result lost",
×
512
            pTask->id.idStr, total + 1, size, tstrerror(code));
513
  } else {
514
    if (streamQueueIsFull(pTask->outputq.queue)) {
13,515!
515
      stWarn(
×
516
          "s-task:%s outputQ is full(outputQ items:%d, size:%.2fMiB), set the output status BLOCKING, wait for 500ms "
517
          "after handle this batch of blocks",
518
          pTask->id.idStr, total, size);
519
    } else {
520
      stDebug("s-task:%s data put into outputQ, outputQ items:%d, size:%.2fMiB", pTask->id.idStr, total, size);
13,515✔
521
    }
522
  }
523

524
  return code;
13,512✔
525
}
526

527
/*
 * Initialize a token bucket: the bucket starts with numCap tokens and a full quota, and both fill
 * timestamps are set to the current time.
 *
 * @param pBucket    bucket to initialize, must not be NULL
 * @param numCap     token capacity, at least 10
 * @param numRate    token refill rate, at least 10
 * @param quotaRate  quota refill rate; the quota capacity is quotaRate * MAX_SMOOTH_BURST_RATIO
 * @param id         task id string, used for logging only
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_INVALID_PARA when an argument is rejected
 */
int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, float quotaRate,
                                  const char* id) {
  const bool invalid = (pBucket == NULL) || (numCap < 10) || (numRate < 10);
  if (invalid) {
    stError("failed to init sink task bucket, cap:%d, rate:%d", numCap, numRate);
    return TSDB_CODE_INVALID_PARA;
  }

  // token part
  pBucket->numCapacity = numCap;
  pBucket->numOfToken = numCap;
  pBucket->numRate = numRate;

  // quota part
  pBucket->quotaRate = quotaRate;
  pBucket->quotaCapacity = quotaRate * MAX_SMOOTH_BURST_RATIO;
  pBucket->quotaRemain = pBucket->quotaCapacity;

  pBucket->tokenFillTimestamp = taosGetTimestampMs();
  pBucket->quotaFillTimestamp = taosGetTimestampMs();

  stDebug("s-task:%s sink quotaRate:%.2fMiB, numRate:%d", id, quotaRate, numRate);
  return TSDB_CODE_SUCCESS;
}
547

548
// Refill the bucket according to the time elapsed since the last refill. Tokens grow by
// (elapsed seconds * numRate) and quota by (elapsed seconds * quotaRate), each capped at its capacity.
// A fill timestamp only advances when the corresponding increment is positive. Nothing is refilled
// while numOfToken is negative.
static void fillTokenBucket(STokenBucket* pBucket, const char* id) {
  int64_t now = taosGetTimestampMs();

  int64_t tokenElapsedMs = now - pBucket->tokenFillTimestamp;
  if (pBucket->numOfToken < 0) {
    return;
  }

  int32_t incNum = (tokenElapsedMs / 1000.0) * pBucket->numRate;
  if (incNum > 0) {
    pBucket->numOfToken = TMIN(pBucket->numOfToken + incNum, pBucket->numCapacity);
    pBucket->tokenFillTimestamp = now;
  }

  // increase the new available quota as time goes on
  int64_t quotaElapsedMs = now - pBucket->quotaFillTimestamp;
  double  incSize = (quotaElapsedMs / 1000.0) * pBucket->quotaRate;
  if (incSize > 0) {
    pBucket->quotaRemain = TMIN(pBucket->quotaRemain + incSize, pBucket->quotaCapacity);
    pBucket->quotaFillTimestamp = now;
  }

  if (incNum > 0 || incSize > 0) {
    stTrace("token/quota available, token:%d inc:%d, token_TsDelta:%" PRId64
            ", quota:%.2fMiB inc:%.3fMiB quotaTs:%" PRId64 " now:%" PRId64 "ms, %s",
            pBucket->numOfToken, incNum, tokenElapsedMs, pBucket->quotaRemain, incSize, quotaElapsedMs, now, id);
  }
}
576

577
// Refill the bucket, then take one token if both a token and some quota are available.
// Returns true when a token was taken, false otherwise (the bucket is then left as refilled).
bool streamTaskExtractAvailableToken(STokenBucket* pBucket, const char* id) {
  fillTokenBucket(pBucket, id);

  const bool hasToken = pBucket->numOfToken > 0;
  const bool hasQuota = pBucket->quotaRemain > 0;  // no available size quota otherwise
  if (!(hasToken && hasQuota)) {
    return false;
  }

  pBucket->numOfToken -= 1;
  return true;
}
591

592
// Return one token to the bucket without exceeding its capacity.
void streamTaskPutbackToken(STokenBucket* pBucket) {
  if (pBucket->numOfToken + 1 < pBucket->numCapacity) {
    pBucket->numOfToken += 1;
  } else {
    pBucket->numOfToken = pBucket->numCapacity;
  }
}
98,578✔
595

596
// size in KB
597
void streamTaskConsumeQuota(STokenBucket* pBucket, int32_t bytes) { pBucket->quotaRemain -= SIZE_IN_MiB(bytes); }
10,539✔
598

599
// Atomically flag the task's input queue status as TASK_INPUT_STATUS__FAILED.
void streamTaskInputFail(SStreamTask* pTask) {
  atomic_store_8(&pTask->inputq.status, TASK_INPUT_STATUS__FAILED);
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc