• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3629

04 Mar 2025 01:45PM UTC coverage: 63.692% (-0.1%) from 63.79%
#3629

push

travis-ci

web-flow
Merge pull request #30007 from taosdata/revert-29951-docs/update-exception-handling-strategy

Revert "docs: update exception handling strategy"

149369 of 300378 branches covered (49.73%)

Branch coverage included in aggregate %.

233614 of 300930 relevant lines covered (77.63%)

18792670.99 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.31
/source/libs/stream/src/streamQueue.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "streamInt.h"
17

18
#define MAX_STREAM_EXEC_BATCH_NUM 32
19
#define MAX_SMOOTH_BURST_RATIO    5  // 5 sec
20

21
// todo refactor:
22
// read data from input queue
23
typedef struct SQueueReader {
24
  SStreamQueue* pQueue;
25
  int32_t       taskLevel;
26
  int32_t       maxBlocks;     // maximum block in one batch
27
  int32_t       waitDuration;  // maximum wait time to format several block into a batch to process, unit: ms
28
} SQueueReader;
29

30
#define streamQueueCurItem(_q) ((_q)->qItem)
31

32
static bool streamTaskExtractAvailableToken(STokenBucket* pBucket, const char* id);
33
static void streamTaskPutbackToken(STokenBucket* pBucket);
34
static void streamTaskConsumeQuota(STokenBucket* pBucket, int32_t bytes);
35
static void streamQueueNextItemInSourceQ(SStreamQueue* pQueue, SStreamQueueItem** pItem, ETaskStatus status, const char* id);
36

37
static void streamQueueCleanup(SStreamQueue* pQueue) {
28,673✔
38
  SStreamQueueItem* qItem = NULL;
28,673✔
39
  while (1) {
40
    streamQueueNextItemInSourceQ(pQueue, &qItem, TASK_STATUS__READY, "");
29,640✔
41
    if (qItem == NULL) {
29,642✔
42
      break;
28,678✔
43
    }
44
    streamFreeQitem(qItem);
964✔
45
  }
46
  pQueue->status = STREAM_QUEUE__SUCESS;
28,678✔
47
}
28,678✔
48

49
int32_t streamQueueOpen(int64_t cap, SStreamQueue** pQ) {
28,617✔
50
  *pQ = NULL;
28,617✔
51

52
  int32_t code = 0;
28,617✔
53
  int32_t lino = 0;
28,617✔
54

55
  SStreamQueue* pQueue = taosMemoryCalloc(1, sizeof(SStreamQueue));
28,617!
56
  if (pQueue == NULL) {
28,680!
57
    return terrno;
×
58
  }
59

60
  code = taosOpenQueue(&pQueue->pQueue);
28,680✔
61
  TSDB_CHECK_CODE(code, lino, _error);
28,652!
62

63
  code = taosAllocateQall(&pQueue->qall);
28,652✔
64
  TSDB_CHECK_CODE(code, lino, _error);
28,681!
65

66
  code = taosOpenQueue(&pQueue->pChkptQueue);
28,681✔
67
  TSDB_CHECK_CODE(code, lino, _error);
28,661!
68

69
  pQueue->status = STREAM_QUEUE__SUCESS;
28,661✔
70

71
  taosSetQueueCapacity(pQueue->pQueue, cap);
28,661✔
72
  taosSetQueueMemoryCapacity(pQueue->pQueue, cap * 1024);
28,643✔
73

74
  *pQ = pQueue;
28,621✔
75
  return code;
28,621✔
76

77
_error:
×
78
  streamQueueClose(pQueue, 0);
×
79
  stError("failed to open stream queue at line:%d, code:%s", lino, tstrerror(code));
×
80
  return code;
×
81
}
82

83
void streamQueueClose(SStreamQueue* pQueue, int32_t taskId) {
28,676✔
84
  stDebug("s-task:0x%x free the queue:%p, items in queue:%d", taskId, pQueue->pQueue,
28,676✔
85
          taosQueueItemSize(pQueue->pQueue));
86
  streamQueueCleanup(pQueue);
28,676✔
87

88
  taosFreeQall(pQueue->qall);
28,676✔
89
  taosCloseQueue(pQueue->pQueue);
28,674✔
90
  pQueue->pQueue = NULL;
28,685✔
91

92
  taosCloseQueue(pQueue->pChkptQueue);
28,685✔
93
  pQueue->pChkptQueue = NULL;
28,691✔
94

95
  taosMemoryFree(pQueue);
28,691!
96
}
28,692✔
97

98
void streamQueueNextItem(SStreamQueue* pQueue, SStreamQueueItem** pItem) {
145,594✔
99
  *pItem = NULL;
145,594✔
100
  int8_t flag = atomic_exchange_8(&pQueue->status, STREAM_QUEUE__PROCESSING);
145,594✔
101

102
  if (flag == STREAM_QUEUE__FAILED) {
145,628✔
103
    *pItem = streamQueueCurItem(pQueue);
559✔
104
  } else {
105
    pQueue->qItem = NULL;
145,069✔
106
    (void) taosGetQitem(pQueue->qall, &pQueue->qItem);
145,069✔
107

108
    if (pQueue->qItem == NULL) {
145,059✔
109
      (void) taosReadAllQitems(pQueue->pQueue, pQueue->qall);
109,585✔
110
      (void) taosGetQitem(pQueue->qall, &pQueue->qItem);
109,589✔
111
    }
112

113
    *pItem = streamQueueCurItem(pQueue);
145,057✔
114
  }
115
}
145,616✔
116

117
void streamQueueNextItemInSourceQ(SStreamQueue* pQueue, SStreamQueueItem** pItem, ETaskStatus status, const char* id) {
668,215✔
118
  *pItem = NULL;
668,215✔
119
  int8_t flag = atomic_exchange_8(&pQueue->status, STREAM_QUEUE__PROCESSING);
668,215✔
120

121
  if (flag == STREAM_QUEUE__CHKPTFAILED) {
668,222!
122
    *pItem = pQueue->qChkptItem;
×
123
    return;
×
124
  }
125

126
  if (flag == STREAM_QUEUE__FAILED) {
668,222✔
127
    *pItem = pQueue->qItem;
2,713✔
128
    return;
2,713✔
129
  }
130

131
  pQueue->qChkptItem = NULL;
665,509✔
132
  taosReadQitem(pQueue->pChkptQueue, (void**)&pQueue->qChkptItem);
665,509✔
133
  if (pQueue->qChkptItem != NULL) {
665,656✔
134
    stDebug("s-task:%s read data from checkpoint queue, status:%d", id, status);
6,544✔
135
    *pItem = pQueue->qChkptItem;
6,483✔
136
    return;
6,483✔
137
  }
138

139
  // if in checkpoint status, not read data from ordinary input q.
140
  if (status == TASK_STATUS__CK) {
659,112✔
141
    stDebug("s-task:%s in checkpoint status, not read data in block queue, status:%d", id, status);
3,007✔
142
    return;
3,007✔
143
  }
144

145
  // let's try the ordinary input q
146
  pQueue->qItem = NULL;
656,105✔
147
  int32_t num = taosGetQitem(pQueue->qall, &pQueue->qItem);
656,105✔
148

149
  if (pQueue->qItem == NULL) {
656,027✔
150
    num = taosReadAllQitems(pQueue->pQueue, pQueue->qall);
105,480✔
151
    num = taosGetQitem(pQueue->qall, &pQueue->qItem);
105,527✔
152
  }
153

154
  *pItem = streamQueueCurItem(pQueue);
656,067✔
155
}
156

157
void streamQueueProcessSuccess(SStreamQueue* queue) {
613,291✔
158
  if (atomic_load_8(&queue->status) != STREAM_QUEUE__PROCESSING) {
613,291!
159
    stError("invalid queue status:%d, expect:%d", atomic_load_8(&queue->status), STREAM_QUEUE__PROCESSING);
×
160
    return;
×
161
  }
162

163
  queue->qItem = NULL;
613,274✔
164
  queue->qChkptItem = NULL;
613,274✔
165
  atomic_store_8(&queue->status, STREAM_QUEUE__SUCESS);
613,274✔
166
}
167

168
void streamQueueProcessFail(SStreamQueue* queue) {
3,271✔
169
  if (atomic_load_8(&queue->status) != STREAM_QUEUE__PROCESSING) {
3,271!
170
    stError("invalid queue status:%d, expect:%d", atomic_load_8(&queue->status), STREAM_QUEUE__PROCESSING);
×
171
    return;
×
172
  }
173
  atomic_store_8(&queue->status, STREAM_QUEUE__FAILED);
3,272✔
174
}
175

176
void streamQueueGetSourceChkptFailed(SStreamQueue* pQueue) {
×
177
  if (atomic_load_8(&pQueue->status) != STREAM_QUEUE__PROCESSING) {
×
178
    stError("invalid queue status:%d, expect:%d", atomic_load_8(&pQueue->status), STREAM_QUEUE__PROCESSING);
×
179
    return;
×
180
  }
181
  atomic_store_8(&pQueue->status, STREAM_QUEUE__CHKPTFAILED);
×
182
}
183

184
bool streamQueueIsFull(const SStreamQueue* pQueue) {
1,090,905✔
185
  int32_t numOfItems = streamQueueGetNumOfItems(pQueue);
1,090,905✔
186
  if (numOfItems >= STREAM_TASK_QUEUE_CAPACITY) {
1,091,021✔
187
    return true;
45✔
188
  }
189

190
  return (SIZE_IN_MiB(taosQueueMemorySize(pQueue->pQueue)) >= STREAM_TASK_QUEUE_CAPACITY_IN_SIZE);
1,090,976✔
191
}
192

193
int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue) {
2,177,905✔
194
  int32_t numOfItems1 = taosQueueItemSize(pQueue->pQueue);
2,177,905✔
195
  int32_t numOfItems2 = taosQallItemSize(pQueue->qall);
2,179,557✔
196

197
  return numOfItems1 + numOfItems2;
2,179,259✔
198
}
199

200
int32_t streamQueueGetNumOfUnAccessedItems(const SStreamQueue* pQueue) {
53,363✔
201
  int32_t numOfItems1 = taosQueueItemSize(pQueue->pQueue);
53,363✔
202
  int32_t numOfItems2 = taosQallUnAccessedItemSize(pQueue->qall);
53,365✔
203

204
  return numOfItems1 + numOfItems2;
53,364✔
205
}
206

207
int32_t streamQueueGetItemSize(const SStreamQueue* pQueue) {
67,895✔
208
  return taosQueueMemorySize(pQueue->pQueue) + taosQallUnAccessedMemSize(pQueue->qall);
67,895✔
209
}
210

211
int32_t streamQueueItemGetSize(const SStreamQueueItem* pItem) {
613,257✔
212
  STaosQnode* p = (STaosQnode*)((char*)pItem - sizeof(STaosQnode));
613,257✔
213
  return p->dataSize;
613,257✔
214
}
215

216
void streamQueueItemIncSize(const SStreamQueueItem* pItem, int32_t size) {
559,722✔
217
  STaosQnode* p = (STaosQnode*)((char*)pItem - sizeof(STaosQnode));
559,722✔
218
  p->dataSize += size;
559,722✔
219
}
559,722✔
220

221
const char* streamQueueItemGetTypeStr(int32_t type) {
77,197✔
222
  switch (type) {
77,197✔
223
    case STREAM_INPUT__CHECKPOINT:
4,968✔
224
      return "checkpoint";
4,968✔
225
    case STREAM_INPUT__CHECKPOINT_TRIGGER:
19,317✔
226
      return "checkpoint-trigger";
19,317✔
227
    case STREAM_INPUT__TRANS_STATE:
15,838✔
228
      return "trans-state";
15,838✔
229
    case STREAM_INPUT__REF_DATA_BLOCK:
6,114✔
230
      return "ref-block";
6,114✔
231
    default:
30,960✔
232
      return "datablock";
30,960✔
233
  }
234
}
235

236
EExtractDataCode streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks,
139,017✔
237
                                             int32_t* blockSize) {
238
  const char*   id = pTask->id.idStr;
139,017✔
239
  int32_t       taskLevel = pTask->info.taskLevel;
139,017✔
240
  SStreamQueue* pQueue = pTask->inputq.queue;
139,017✔
241

242
  *pInput = NULL;
139,017✔
243
  *numOfBlocks = 0;
139,017✔
244
  *blockSize = 0;
139,017✔
245

246
  // no available token in bucket for sink task, let's wait for a little bit
247
  if (taskLevel == TASK_LEVEL__SINK && (!streamTaskExtractAvailableToken(pTask->outputInfo.pTokenBucket, id))) {
139,017✔
248
    stDebug("s-task:%s no available token in bucket for sink data, wait for 10ms", id);
265!
249
    return EXEC_AFTER_IDLE;
265✔
250
  }
251

252
  while (1) {
597,376✔
253
    ETaskStatus status = streamTaskGetStatus(pTask).state;
736,117✔
254
    if (status == TASK_STATUS__PAUSE || status == TASK_STATUS__STOP) {
736,057!
255
      stDebug("s-task:%s task should pause/stop, extract input blocks:%d", id, *numOfBlocks);
18!
256
      return EXEC_CONTINUE;
138,767✔
257
    }
258

259
    SStreamQueueItem* qItem = NULL;
736,039✔
260
    if (taskLevel == TASK_LEVEL__SOURCE) {
736,039✔
261
      streamQueueNextItemInSourceQ(pQueue, &qItem, status, id);
638,565✔
262
    } else {
263
      streamQueueNextItem(pQueue, &qItem);
97,474✔
264
    }
265

266
    if (qItem == NULL) {
736,164✔
267
      // restore the token to bucket
268
      if (*numOfBlocks > 0) {
92,846✔
269
        *blockSize = streamQueueItemGetSize(*pInput);
34,350✔
270
        if (taskLevel == TASK_LEVEL__SINK) {
34,344✔
271
          streamTaskConsumeQuota(pTask->outputInfo.pTokenBucket, *blockSize);
11,536✔
272
        }
273
      } else {
274
        streamTaskPutbackToken(pTask->outputInfo.pTokenBucket);
58,496✔
275
      }
276

277
      return EXEC_CONTINUE;
92,819✔
278
    }
279

280
    // do not merge blocks for sink node and check point data block
281
    int8_t type = qItem->type;
643,318✔
282
    if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__CHECKPOINT_TRIGGER ||
643,318✔
283
        type == STREAM_INPUT__TRANS_STATE || type == STREAM_INPUT__REF_DATA_BLOCK) {
618,133✔
284
      const char* p = streamQueueItemGetTypeStr(type);
29,932✔
285

286
      if (*pInput == NULL) {
29,898✔
287
        stDebug("s-task:%s %s msg extracted, start to process immediately", id, p);
26,744✔
288

289
        // restore the token to bucket in case of checkpoint/trans-state msg
290
        streamTaskPutbackToken(pTask->outputInfo.pTokenBucket);
26,744✔
291
        *blockSize = 0;
26,736✔
292
        *numOfBlocks = 1;
26,736✔
293
        *pInput = qItem;
26,736✔
294
        return EXEC_CONTINUE;
26,736✔
295
      } else {  // previous existed blocks needs to be handled, before handle the checkpoint msg block
296
        stDebug("s-task:%s %s msg extracted, handle previous blocks, numOfBlocks:%d", id, p, *numOfBlocks);
3,154✔
297
        *blockSize = streamQueueItemGetSize(*pInput);
3,154✔
298
        if (taskLevel == TASK_LEVEL__SINK) {
3,155✔
299
          streamTaskConsumeQuota(pTask->outputInfo.pTokenBucket, *blockSize);
548✔
300
        }
301

302
        if ((type == STREAM_INPUT__CHECKPOINT_TRIGGER || type == STREAM_INPUT__CHECKPOINT) &&
3,155!
303
            (taskLevel == TASK_LEVEL__SOURCE)) {
304
          streamQueueGetSourceChkptFailed(pQueue);
×
305
        } else {
306
          streamQueueProcessFail(pQueue);
3,155✔
307
        }
308
        return EXEC_CONTINUE;
3,155✔
309
      }
310
    } else {
311
      if (*pInput == NULL) {
613,386✔
312
        *pInput = qItem;
53,527✔
313
      } else { // merge current block failed, let's handle the already merged blocks.
314
        void*   newRet = NULL;
559,859✔
315
        int32_t code = streamQueueMergeQueueItem(*pInput, qItem, (SStreamQueueItem**)&newRet);
559,859✔
316
        if (newRet == NULL) {
559,890✔
317
          if (code != -1) {
117!
318
            stError("s-task:%s failed to merge blocks from inputQ, numOfBlocks:%d, code:%s", id, *numOfBlocks,
×
319
                    tstrerror(code));
320
          }
321

322
          *blockSize = streamQueueItemGetSize(*pInput);
117✔
323
          if (taskLevel == TASK_LEVEL__SINK) {
117!
324
            streamTaskConsumeQuota(pTask->outputInfo.pTokenBucket, *blockSize);
×
325
          }
326

327
          streamQueueProcessFail(pQueue);
117✔
328
          return EXEC_CONTINUE;
117✔
329
        }
330

331
        *pInput = newRet;
559,773✔
332
      }
333

334
      *numOfBlocks += 1;
613,300✔
335
      streamQueueProcessSuccess(pQueue);
613,300✔
336

337
      if (*numOfBlocks >= MAX_STREAM_EXEC_BATCH_NUM) {
613,303✔
338
        stDebug("s-task:%s batch size limit:%d reached, start to process blocks", id, MAX_STREAM_EXEC_BATCH_NUM);
15,927✔
339

340
        *blockSize = streamQueueItemGetSize(*pInput);
15,927✔
341
        if (taskLevel == TASK_LEVEL__SINK) {
15,919✔
342
          streamTaskConsumeQuota(pTask->outputInfo.pTokenBucket, *blockSize);
406✔
343
        }
344

345
        return EXEC_CONTINUE;
15,919✔
346
      }
347
    }
348
  }
349
}
350

351
int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) {
641,169✔
352
  int8_t      type = pItem->type;
641,169✔
353
  STaosQueue* pQueue = pTask->inputq.queue->pQueue;
641,169✔
354
  int32_t     level = pTask->info.taskLevel;
641,169✔
355
  int32_t     total = streamQueueGetNumOfItems(pTask->inputq.queue) + 1;
641,169✔
356

357
  if (type == STREAM_INPUT__DATA_SUBMIT) {
641,185✔
358
    SStreamDataSubmit* px = (SStreamDataSubmit*)pItem;
565,095✔
359
    if ((pTask->info.taskLevel == TASK_LEVEL__SOURCE) && streamQueueIsFull(pTask->inputq.queue)) {
565,095✔
360
      double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
154✔
361
      stTrace(
154!
362
          "s-task:%s inputQ is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) stop to push data",
363
          pTask->id.idStr, STREAM_TASK_QUEUE_CAPACITY, STREAM_TASK_QUEUE_CAPACITY_IN_SIZE, total, size);
364
      streamDataSubmitDestroy(px);
154✔
365
      return TSDB_CODE_STREAM_INPUTQ_FULL;
154✔
366
    }
367

368
    int32_t msgLen = px->submit.msgLen;
564,942✔
369
    int64_t ver = px->submit.ver;
564,942✔
370

371
    int32_t code = taosWriteQitem(pQueue, pItem);
564,942✔
372
    if (code != TSDB_CODE_SUCCESS) {
564,943!
373
      streamDataSubmitDestroy(px);
×
374
      return code;
×
375
    }
376

377
    double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
564,943✔
378

379
    // use the local variable to avoid the pItem be freed by other threads, since it has been put into queue already.
380
    stDebug("s-task:%s submit enqueue msgLen:%d ver:%" PRId64 ", total in queue:%d, size:%.2fMiB", pTask->id.idStr,
564,946✔
381
            msgLen, ver, total, size + SIZE_IN_MiB(msgLen));
382
  } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__REF_DATA_BLOCK) {
122,801✔
383
    if (streamQueueIsFull(pTask->inputq.queue)) {
46,710!
384
      double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
×
385

386
      stTrace("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort",
×
387
              pTask->id.idStr, STREAM_TASK_QUEUE_CAPACITY, STREAM_TASK_QUEUE_CAPACITY_IN_SIZE, total, size);
388
      streamFreeQitem(pItem);
×
389
      return TSDB_CODE_STREAM_INPUTQ_FULL;
×
390
    }
391

392
    int32_t code = taosWriteQitem(pQueue, pItem);
46,711✔
393
    if (code != TSDB_CODE_SUCCESS) {
46,711!
394
      streamFreeQitem(pItem);
×
395
      return code;
×
396
    }
397

398
    double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
46,711✔
399
    stDebug("s-task:%s blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr, total, size);
46,711✔
400
  } else if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__CHECKPOINT_TRIGGER ||
29,380✔
401
             type == STREAM_INPUT__TRANS_STATE || type == STREAM_INPUT__DATA_RETRIEVE) {
29,884✔
402

403
    int32_t code = 0;
24,778✔
404
    if ((type == STREAM_INPUT__CHECKPOINT_TRIGGER || type == STREAM_INPUT__CHECKPOINT) && (level == TASK_LEVEL__SOURCE)) {
31,259✔
405
      STaosQueue* pChkptQ = pTask->inputq.queue->pChkptQueue;
6,481✔
406
      code = taosWriteQitem(pChkptQ, pItem);
6,481✔
407

408
      double  size = SIZE_IN_MiB(taosQueueMemorySize(pChkptQ));
6,482✔
409
      int32_t num = taosQueueItemSize(pChkptQ);
6,482✔
410

411
      stDebug("s-task:%s level:%d %s checkpoint enqueue ctrl queue, total in queue:%d, size:%.2fMiB, data queue:%d",
6,481✔
412
              pTask->id.idStr, pTask->info.taskLevel, streamQueueItemGetTypeStr(type), num, size, (total - 1));
413
    } else {
414
      code = taosWriteQitem(pQueue, pItem);
18,297✔
415
      if (code != TSDB_CODE_SUCCESS) {
18,299!
416
        streamFreeQitem(pItem);
×
417
        return code;
×
418
      }
419

420
      double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
18,299✔
421
      stDebug("s-task:%s level:%d %s blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr,
18,300✔
422
              pTask->info.taskLevel, streamQueueItemGetTypeStr(type), total, size);
423
    }
424
  } else if (type == STREAM_INPUT__GET_RES) {
4,602!
425
    // use the default memory limit, refactor later.
426
    int32_t code = taosWriteQitem(pQueue, pItem);
4,602✔
427
    if (code != TSDB_CODE_SUCCESS) {
4,602!
428
      streamFreeQitem(pItem);
×
429
      return code;
×
430
    }
431

432
    double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
4,602✔
433
    stDebug("s-task:%s data res enqueue, current(blocks:%d, size:%.2fMiB)", pTask->id.idStr, total, size);
4,602✔
434
  } else {
435
    stError("s-task:%s invalid type:%d to put in inputQ", pTask->id.idStr, type);
×
436
    return TSDB_CODE_INVALID_PARA;
×
437
  }
438

439
  if (type != STREAM_INPUT__GET_RES && type != STREAM_INPUT__CHECKPOINT && type != STREAM_INPUT__CHECKPOINT_TRIGGER &&
641,038✔
440
      (pTask->info.delaySchedParam != 0)) {
621,178✔
441
    (void)atomic_val_compare_exchange_8(&pTask->schedInfo.status, TASK_TRIGGER_STATUS__INACTIVE,
12,045✔
442
                                        TASK_TRIGGER_STATUS__MAY_ACTIVE);
443
    stDebug("s-task:%s new data arrived, active the sched-trigger, triggerStatus:%d", pTask->id.idStr,
12,046✔
444
            pTask->schedInfo.status);
445
  }
446

447
  return 0;
641,021✔
448
}
449

450
int32_t streamTaskPutTranstateIntoInputQ(SStreamTask* pTask) {
2,213✔
451
  int32_t           code = 0;
2,213✔
452
  SStreamDataBlock* pTranstate = NULL;
2,213✔
453
  SSDataBlock*      pBlock = NULL;
2,213✔
454

455
  code = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SSDataBlock), (void**)&pTranstate);
2,213✔
456
  if (code) {
2,213!
457
    return code;
×
458
  }
459

460
  pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
2,213!
461
  if (pBlock == NULL) {
2,212!
462
    code = terrno;
×
463
    goto _err;
×
464
  }
465

466
  pTranstate->type = STREAM_INPUT__TRANS_STATE;
2,212✔
467

468
  pBlock->info.type = STREAM_TRANS_STATE;
2,212✔
469
  pBlock->info.rows = 1;
2,212✔
470
  pBlock->info.childId = pTask->info.selfChildId;
2,212✔
471

472
  pTranstate->blocks = taosArrayInit(4, sizeof(SSDataBlock));  // pBlock;
2,212✔
473
  if (pTranstate->blocks == NULL) {
2,212!
474
    code = terrno;
×
475
    goto _err;
×
476
  }
477

478
  void* p = taosArrayPush(pTranstate->blocks, pBlock);
2,212✔
479
  if (p == NULL) {
2,211!
480
    code = terrno;
×
481
    goto _err;
×
482
  }
483

484
  taosMemoryFree(pBlock);
2,211!
485
  if (streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pTranstate) < 0) {
2,213!
486
    code = TSDB_CODE_OUT_OF_MEMORY;
×
487
    goto _err;
×
488
  }
489

490
  pTask->status.appendTranstateBlock = true;
2,211✔
491
  return TSDB_CODE_SUCCESS;
2,211✔
492

493
_err:
×
494
  taosMemoryFree(pBlock);
×
495
  taosFreeQitem(pTranstate);
×
496
  return code;
×
497
}
498

499
// the result should be put into the outputQ in any cases, the result may be lost otherwise.
500
int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBlock) {
22,273✔
501
  STaosQueue* pQueue = pTask->outputq.queue->pQueue;
22,273✔
502
  int32_t     code = taosWriteQitem(pQueue, pBlock);
22,273✔
503

504
  int32_t total = streamQueueGetNumOfItems(pTask->outputq.queue);
22,273✔
505
  double  size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
22,273✔
506
  if (code != 0) {
22,273!
507
    stError("s-task:%s failed to put res into outputQ, outputQ items:%d, size:%.2fMiB code:%s, result lost",
×
508
            pTask->id.idStr, total + 1, size, tstrerror(code));
509
  } else {
510
    if (streamQueueIsFull(pTask->outputq.queue)) {
22,273!
511
      stWarn(
×
512
          "s-task:%s outputQ is full(outputQ items:%d, size:%.2fMiB), set the output status BLOCKING, wait for 500ms "
513
          "after handle this batch of blocks",
514
          pTask->id.idStr, total, size);
515
    } else {
516
      stDebug("s-task:%s data put into outputQ, outputQ items:%d, size:%.2fMiB", pTask->id.idStr, total, size);
22,273✔
517
    }
518
  }
519

520
  return code;
22,273✔
521
}
522

523
int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, float quotaRate,
14,329✔
524
                                  const char* id) {
525
  if (numCap < 10 || numRate < 10 || pBucket == NULL) {
14,329!
526
    stError("failed to init sink task bucket, cap:%d, rate:%d", numCap, numRate);
×
527
    return TSDB_CODE_INVALID_PARA;
×
528
  }
529

530
  pBucket->numCapacity = numCap;
14,334✔
531
  pBucket->numOfToken = numCap;
14,334✔
532
  pBucket->numRate = numRate;
14,334✔
533

534
  pBucket->quotaRate = quotaRate;
14,334✔
535
  pBucket->quotaCapacity = quotaRate * MAX_SMOOTH_BURST_RATIO;
14,334✔
536
  pBucket->quotaRemain = pBucket->quotaCapacity;
14,334✔
537

538
  pBucket->tokenFillTimestamp = taosGetTimestampMs();
14,321✔
539
  pBucket->quotaFillTimestamp = taosGetTimestampMs();
14,319✔
540
  stDebug("s-task:%s sink quotaRate:%.2fMiB, numRate:%d", id, quotaRate, numRate);
14,319✔
541
  return TSDB_CODE_SUCCESS;
14,317✔
542
}
543

544
static void fillTokenBucket(STokenBucket* pBucket, const char* id) {
48,918✔
545
  int64_t now = taosGetTimestampMs();
48,911✔
546

547
  int64_t deltaToken = now - pBucket->tokenFillTimestamp;
48,911✔
548
  if (pBucket->numOfToken < 0) {
48,911!
549
    return;
×
550
  }
551

552
  int32_t incNum = (deltaToken / 1000.0) * pBucket->numRate;
48,911✔
553
  if (incNum > 0) {
48,911✔
554
    pBucket->numOfToken = TMIN(pBucket->numOfToken + incNum, pBucket->numCapacity);
26,101✔
555
    pBucket->tokenFillTimestamp = now;
26,101✔
556
  }
557

558
  // increase the new available quota as time goes on
559
  int64_t deltaQuota = now - pBucket->quotaFillTimestamp;
48,911✔
560
  double  incSize = (deltaQuota / 1000.0) * pBucket->quotaRate;
48,911✔
561
  if (incSize > 0) {
48,911✔
562
    pBucket->quotaRemain = TMIN(pBucket->quotaRemain + incSize, pBucket->quotaCapacity);
32,247✔
563
    pBucket->quotaFillTimestamp = now;
32,247✔
564
  }
565

566
  if (incNum > 0 || incSize > 0) {
48,911✔
567
    stTrace("token/quota available, token:%d inc:%d, token_TsDelta:%" PRId64
32,230✔
568
            ", quota:%.2fMiB inc:%.3fMiB quotaTs:%" PRId64 " now:%" PRId64 "ms, %s",
569
            pBucket->numOfToken, incNum, deltaToken, pBucket->quotaRemain, incSize, deltaQuota, now, id);
570
  }
571
}
572

573
bool streamTaskExtractAvailableToken(STokenBucket* pBucket, const char* id) {
48,917✔
574
  fillTokenBucket(pBucket, id);
48,917✔
575

576
  if (pBucket->numOfToken > 0) {
48,914✔
577
    if (pBucket->quotaRemain > 0) {
48,649!
578
      pBucket->numOfToken -= 1;
48,649✔
579
      return true;
48,649✔
580
    } else {  // no available size quota now
581
      return false;
×
582
    }
583
  } else {
584
    return false;
265✔
585
  }
586
}
587

588
void streamTaskPutbackToken(STokenBucket* pBucket) {
85,199✔
589
  pBucket->numOfToken = TMIN(pBucket->numOfToken + 1, pBucket->numCapacity);
85,199✔
590
}
85,199✔
591

592
// size in KB
593
void streamTaskConsumeQuota(STokenBucket* pBucket, int32_t bytes) { pBucket->quotaRemain -= SIZE_IN_MiB(bytes); }
12,489✔
594

595
void streamTaskInputFail(SStreamTask* pTask) { atomic_store_8(&pTask->inputq.status, TASK_INPUT_STATUS__FAILED); }
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc