• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3630

06 Mar 2025 11:35AM UTC coverage: 63.629% (-0.06%) from 63.692%
#3630

push

travis-ci

web-flow
Merge pull request #30042 from taosdata/doc/internal

docs: format

149060 of 300532 branches covered (49.6%)

Branch coverage included in aggregate %.

233739 of 301077 relevant lines covered (77.63%)

17473135.72 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.76
/source/libs/stream/src/streamQueue.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "streamInt.h"
17

18
#define MAX_STREAM_EXEC_BATCH_NUM 32
19
#define MAX_SMOOTH_BURST_RATIO    5  // 5 sec
20

21
// todo refactor:
22
// read data from input queue
23
typedef struct SQueueReader {
24
  SStreamQueue* pQueue;
25
  int32_t       taskLevel;
26
  int32_t       maxBlocks;     // maximum block in one batch
27
  int32_t       waitDuration;  // maximum wait time to format several block into a batch to process, unit: ms
28
} SQueueReader;
29

30
#define streamQueueCurItem(_q) ((_q)->qItem)
31

32
static bool streamTaskExtractAvailableToken(STokenBucket* pBucket, const char* id);
33
static void streamTaskPutbackToken(STokenBucket* pBucket);
34
static void streamTaskConsumeQuota(STokenBucket* pBucket, int32_t bytes);
35
static void streamQueueNextItemInSourceQ(SStreamQueue* pQueue, SStreamQueueItem** pItem, ETaskStatus status, const char* id);
36

37
static void streamQueueCleanup(SStreamQueue* pQueue) {
28,848✔
38
  SStreamQueueItem* qItem = NULL;
28,848✔
39
  while (1) {
40
    streamQueueNextItemInSourceQ(pQueue, &qItem, TASK_STATUS__READY, "");
29,142✔
41
    if (qItem == NULL) {
29,171✔
42
      break;
28,873✔
43
    }
44
    streamFreeQitem(qItem);
298✔
45
  }
46
  pQueue->status = STREAM_QUEUE__SUCESS;
28,873✔
47
}
28,873✔
48

49
int32_t streamQueueOpen(int64_t cap, SStreamQueue** pQ) {
28,838✔
50
  *pQ = NULL;
28,838✔
51

52
  int32_t code = 0;
28,838✔
53
  int32_t lino = 0;
28,838✔
54

55
  SStreamQueue* pQueue = taosMemoryCalloc(1, sizeof(SStreamQueue));
28,838!
56
  if (pQueue == NULL) {
28,872!
57
    return terrno;
×
58
  }
59

60
  code = taosOpenQueue(&pQueue->pQueue);
28,872✔
61
  TSDB_CHECK_CODE(code, lino, _error);
28,863!
62

63
  code = taosAllocateQall(&pQueue->qall);
28,863✔
64
  TSDB_CHECK_CODE(code, lino, _error);
28,881!
65

66
  code = taosOpenQueue(&pQueue->pChkptQueue);
28,881✔
67
  TSDB_CHECK_CODE(code, lino, _error);
28,872!
68

69
  pQueue->status = STREAM_QUEUE__SUCESS;
28,872✔
70

71
  taosSetQueueCapacity(pQueue->pQueue, cap);
28,872✔
72
  taosSetQueueMemoryCapacity(pQueue->pQueue, cap * 1024);
28,871✔
73

74
  *pQ = pQueue;
28,872✔
75
  return code;
28,872✔
76

77
_error:
×
78
  streamQueueClose(pQueue, 0);
×
79
  stError("failed to open stream queue at line:%d, code:%s", lino, tstrerror(code));
×
80
  return code;
×
81
}
82

83
void streamQueueClose(SStreamQueue* pQueue, int32_t taskId) {
28,850✔
84
  stDebug("s-task:0x%x free the queue:%p, items in queue:%d", taskId, pQueue->pQueue,
28,850✔
85
          taosQueueItemSize(pQueue->pQueue));
86
  streamQueueCleanup(pQueue);
28,850✔
87

88
  taosFreeQall(pQueue->qall);
28,875✔
89
  taosCloseQueue(pQueue->pQueue);
28,868✔
90
  pQueue->pQueue = NULL;
28,875✔
91

92
  taosCloseQueue(pQueue->pChkptQueue);
28,875✔
93
  pQueue->pChkptQueue = NULL;
28,882✔
94

95
  taosMemoryFree(pQueue);
28,882!
96
}
28,882✔
97

98
void streamQueueNextItem(SStreamQueue* pQueue, SStreamQueueItem** pItem) {
153,181✔
99
  *pItem = NULL;
153,181✔
100
  int8_t flag = atomic_exchange_8(&pQueue->status, STREAM_QUEUE__PROCESSING);
153,181✔
101

102
  if (flag == STREAM_QUEUE__FAILED) {
153,235✔
103
    *pItem = streamQueueCurItem(pQueue);
530✔
104
  } else {
105
    pQueue->qItem = NULL;
152,705✔
106
    (void) taosGetQitem(pQueue->qall, &pQueue->qItem);
152,705✔
107

108
    if (pQueue->qItem == NULL) {
152,681✔
109
      (void) taosReadAllQitems(pQueue->pQueue, pQueue->qall);
122,386✔
110
      (void) taosGetQitem(pQueue->qall, &pQueue->qItem);
122,395✔
111
    }
112

113
    *pItem = streamQueueCurItem(pQueue);
152,680✔
114
  }
115
}
153,210✔
116

117
void streamQueueNextItemInSourceQ(SStreamQueue* pQueue, SStreamQueueItem** pItem, ETaskStatus status, const char* id) {
542,188✔
118
  *pItem = NULL;
542,188✔
119
  int8_t flag = atomic_exchange_8(&pQueue->status, STREAM_QUEUE__PROCESSING);
542,188✔
120

121
  if (flag == STREAM_QUEUE__CHKPTFAILED) {
542,266!
122
    *pItem = pQueue->qChkptItem;
×
123
    return;
×
124
  }
125

126
  if (flag == STREAM_QUEUE__FAILED) {
542,266✔
127
    *pItem = pQueue->qItem;
2,609✔
128
    return;
2,609✔
129
  }
130

131
  pQueue->qChkptItem = NULL;
539,657✔
132
  taosReadQitem(pQueue->pChkptQueue, (void**)&pQueue->qChkptItem);
539,657✔
133
  if (pQueue->qChkptItem != NULL) {
539,678✔
134
    stDebug("s-task:%s read data from checkpoint queue, status:%d", id, status);
6,175✔
135
    *pItem = pQueue->qChkptItem;
6,146✔
136
    return;
6,146✔
137
  }
138

139
  // if in checkpoint status, not read data from ordinary input q.
140
  if (status == TASK_STATUS__CK) {
533,503✔
141
    stDebug("s-task:%s in checkpoint status, not read data in block queue, status:%d", id, status);
2,885✔
142
    return;
2,885✔
143
  }
144

145
  // let's try the ordinary input q
146
  pQueue->qItem = NULL;
530,618✔
147
  int32_t num = taosGetQitem(pQueue->qall, &pQueue->qItem);
530,618✔
148

149
  if (pQueue->qItem == NULL) {
530,603✔
150
    num = taosReadAllQitems(pQueue->pQueue, pQueue->qall);
105,713✔
151
    num = taosGetQitem(pQueue->qall, &pQueue->qItem);
105,717✔
152
  }
153

154
  *pItem = streamQueueCurItem(pQueue);
530,610✔
155
}
156

157
void streamQueueProcessSuccess(SStreamQueue* queue) {
488,213✔
158
  if (atomic_load_8(&queue->status) != STREAM_QUEUE__PROCESSING) {
488,213!
159
    stError("invalid queue status:%d, expect:%d", atomic_load_8(&queue->status), STREAM_QUEUE__PROCESSING);
×
160
    return;
×
161
  }
162

163
  queue->qItem = NULL;
488,213✔
164
  queue->qChkptItem = NULL;
488,213✔
165
  atomic_store_8(&queue->status, STREAM_QUEUE__SUCESS);
488,213✔
166
}
167

168
void streamQueueProcessFail(SStreamQueue* queue) {
3,138✔
169
  if (atomic_load_8(&queue->status) != STREAM_QUEUE__PROCESSING) {
3,138!
170
    stError("invalid queue status:%d, expect:%d", atomic_load_8(&queue->status), STREAM_QUEUE__PROCESSING);
×
171
    return;
×
172
  }
173
  atomic_store_8(&queue->status, STREAM_QUEUE__FAILED);
3,138✔
174
}
175

176
void streamQueueGetSourceChkptFailed(SStreamQueue* pQueue) {
×
177
  if (atomic_load_8(&pQueue->status) != STREAM_QUEUE__PROCESSING) {
×
178
    stError("invalid queue status:%d, expect:%d", atomic_load_8(&pQueue->status), STREAM_QUEUE__PROCESSING);
×
179
    return;
×
180
  }
181
  atomic_store_8(&pQueue->status, STREAM_QUEUE__CHKPTFAILED);
×
182
}
183

184
bool streamQueueIsFull(const SStreamQueue* pQueue) {
947,228✔
185
  int32_t numOfItems = streamQueueGetNumOfItems(pQueue);
947,228✔
186
  if (numOfItems >= STREAM_TASK_QUEUE_CAPACITY) {
947,414!
187
    return true;
×
188
  }
189

190
  return (SIZE_IN_MiB(taosQueueMemorySize(pQueue->pQueue)) >= STREAM_TASK_QUEUE_CAPACITY_IN_SIZE);
947,414✔
191
}
192

193
int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue) {
1,890,765✔
194
  int32_t numOfItems1 = taosQueueItemSize(pQueue->pQueue);
1,890,765✔
195
  int32_t numOfItems2 = taosQallItemSize(pQueue->qall);
1,892,228✔
196

197
  return numOfItems1 + numOfItems2;
1,892,099✔
198
}
199

200
int32_t streamQueueGetNumOfUnAccessedItems(const SStreamQueue* pQueue) {
52,747✔
201
  int32_t numOfItems1 = taosQueueItemSize(pQueue->pQueue);
52,747✔
202
  int32_t numOfItems2 = taosQallUnAccessedItemSize(pQueue->qall);
52,751✔
203

204
  return numOfItems1 + numOfItems2;
52,751✔
205
}
206

207
int32_t streamQueueGetItemSize(const SStreamQueue* pQueue) {
62,634✔
208
  return taosQueueMemorySize(pQueue->pQueue) + taosQallUnAccessedMemSize(pQueue->qall);
62,634✔
209
}
210

211
int32_t streamQueueItemGetSize(const SStreamQueueItem* pItem) {
488,209✔
212
  STaosQnode* p = (STaosQnode*)((char*)pItem - sizeof(STaosQnode));
488,209✔
213
  return p->dataSize;
488,209✔
214
}
215

216
void streamQueueItemIncSize(const SStreamQueueItem* pItem, int32_t size) {
434,919✔
217
  STaosQnode* p = (STaosQnode*)((char*)pItem - sizeof(STaosQnode));
434,919✔
218
  p->dataSize += size;
434,919✔
219
}
434,919✔
220

221
const char* streamQueueItemGetTypeStr(int32_t type) {
72,175✔
222
  switch (type) {
72,175✔
223
    case STREAM_INPUT__CHECKPOINT:
4,736✔
224
      return "checkpoint";
4,736✔
225
    case STREAM_INPUT__CHECKPOINT_TRIGGER:
18,812✔
226
      return "checkpoint-trigger";
18,812✔
227
    case STREAM_INPUT__TRANS_STATE:
15,877✔
228
      return "trans-state";
15,877✔
229
    case STREAM_INPUT__REF_DATA_BLOCK:
5,660✔
230
      return "ref-block";
5,660✔
231
    default:
27,090✔
232
      return "datablock";
27,090✔
233
  }
234
}
235

236
EExtractDataCode streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks,
140,974✔
237
                                             int32_t* blockSize) {
238
  const char*   id = pTask->id.idStr;
140,974✔
239
  int32_t       taskLevel = pTask->info.taskLevel;
140,974✔
240
  SStreamQueue* pQueue = pTask->inputq.queue;
140,974✔
241

242
  *pInput = NULL;
140,974✔
243
  *numOfBlocks = 0;
140,974✔
244
  *blockSize = 0;
140,974✔
245

246
  // no available token in bucket for sink task, let's wait for a little bit
247
  if (taskLevel == TASK_LEVEL__SINK && (!streamTaskExtractAvailableToken(pTask->outputInfo.pTokenBucket, id))) {
140,974!
248
    stDebug("s-task:%s no available token in bucket for sink data, wait for 10ms", id);
×
249
    return EXEC_AFTER_IDLE;
×
250
  }
251

252
  while (1) {
476,962✔
253
    ETaskStatus status = streamTaskGetStatus(pTask).state;
617,921✔
254
    if (status == TASK_STATUS__PAUSE || status == TASK_STATUS__STOP) {
617,887✔
255
      stDebug("s-task:%s task should pause/stop, extract input blocks:%d", id, *numOfBlocks);
9!
256
      return EXEC_CONTINUE;
140,983✔
257
    }
258

259
    SStreamQueueItem* qItem = NULL;
617,878✔
260
    if (taskLevel == TASK_LEVEL__SOURCE) {
617,878✔
261
      streamQueueNextItemInSourceQ(pQueue, &qItem, status, id);
513,037✔
262
    } else {
263
      streamQueueNextItem(pQueue, &qItem);
104,841✔
264
    }
265

266
    if (qItem == NULL) {
617,956✔
267
      // restore the token to bucket
268
      if (*numOfBlocks > 0) {
100,627✔
269
        *blockSize = streamQueueItemGetSize(*pInput);
38,889✔
270
        if (taskLevel == TASK_LEVEL__SINK) {
38,882✔
271
          streamTaskConsumeQuota(pTask->outputInfo.pTokenBucket, *blockSize);
15,675✔
272
        }
273
      } else {
274
        streamTaskPutbackToken(pTask->outputInfo.pTokenBucket);
61,738✔
275
      }
276

277
      return EXEC_CONTINUE;
100,606✔
278
    }
279

280
    // do not merge blocks for sink node and check point data block
281
    int8_t type = qItem->type;
517,329✔
282
    if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__CHECKPOINT_TRIGGER ||
517,329✔
283
        type == STREAM_INPUT__TRANS_STATE || type == STREAM_INPUT__REF_DATA_BLOCK) {
492,759✔
284
      const char* p = streamQueueItemGetTypeStr(type);
28,991✔
285

286
      if (*pInput == NULL) {
28,983✔
287
        stDebug("s-task:%s %s msg extracted, start to process immediately", id, p);
25,957✔
288

289
        // restore the token to bucket in case of checkpoint/trans-state msg
290
        streamTaskPutbackToken(pTask->outputInfo.pTokenBucket);
25,957✔
291
        *blockSize = 0;
25,952✔
292
        *numOfBlocks = 1;
25,952✔
293
        *pInput = qItem;
25,952✔
294
        return EXEC_CONTINUE;
25,952✔
295
      } else {  // previous existed blocks needs to be handled, before handle the checkpoint msg block
296
        stDebug("s-task:%s %s msg extracted, handle previous blocks, numOfBlocks:%d", id, p, *numOfBlocks);
3,026✔
297
        *blockSize = streamQueueItemGetSize(*pInput);
3,026✔
298
        if (taskLevel == TASK_LEVEL__SINK) {
3,027✔
299
          streamTaskConsumeQuota(pTask->outputInfo.pTokenBucket, *blockSize);
517✔
300
        }
301

302
        if ((type == STREAM_INPUT__CHECKPOINT_TRIGGER || type == STREAM_INPUT__CHECKPOINT) &&
3,027!
303
            (taskLevel == TASK_LEVEL__SOURCE)) {
304
          streamQueueGetSourceChkptFailed(pQueue);
×
305
        } else {
306
          streamQueueProcessFail(pQueue);
3,027✔
307
        }
308
        return EXEC_CONTINUE;
3,028✔
309
      }
310
    } else {
311
      if (*pInput == NULL) {
488,338✔
312
        *pInput = qItem;
53,289✔
313
      } else { // merge current block failed, let's handle the already merged blocks.
314
        void*   newRet = NULL;
435,049✔
315
        int32_t code = streamQueueMergeQueueItem(*pInput, qItem, (SStreamQueueItem**)&newRet);
435,049✔
316
        if (newRet == NULL) {
435,051✔
317
          if (code != -1) {
112!
318
            stError("s-task:%s failed to merge blocks from inputQ, numOfBlocks:%d, code:%s", id, *numOfBlocks,
×
319
                    tstrerror(code));
320
          }
321

322
          *blockSize = streamQueueItemGetSize(*pInput);
112✔
323
          if (taskLevel == TASK_LEVEL__SINK) {
111!
324
            streamTaskConsumeQuota(pTask->outputInfo.pTokenBucket, *blockSize);
×
325
          }
326

327
          streamQueueProcessFail(pQueue);
111✔
328
          return EXEC_CONTINUE;
111✔
329
        }
330

331
        *pInput = newRet;
434,939✔
332
      }
333

334
      *numOfBlocks += 1;
488,228✔
335
      streamQueueProcessSuccess(pQueue);
488,228✔
336

337
      if (*numOfBlocks >= MAX_STREAM_EXEC_BATCH_NUM) {
488,236✔
338
        stDebug("s-task:%s batch size limit:%d reached, start to process blocks", id, MAX_STREAM_EXEC_BATCH_NUM);
11,274✔
339

340
        *blockSize = streamQueueItemGetSize(*pInput);
11,274✔
341
        if (taskLevel == TASK_LEVEL__SINK) {
11,274✔
342
          streamTaskConsumeQuota(pTask->outputInfo.pTokenBucket, *blockSize);
27✔
343
        }
344

345
        return EXEC_CONTINUE;
11,274✔
346
      }
347
    }
348
  }
349
}
350

351
int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) {
514,490✔
352
  int8_t      type = pItem->type;
514,490✔
353
  STaosQueue* pQueue = pTask->inputq.queue->pQueue;
514,490✔
354
  int32_t     level = pTask->info.taskLevel;
514,490✔
355
  int32_t     total = streamQueueGetNumOfItems(pTask->inputq.queue) + 1;
514,490✔
356

357
  if (type == STREAM_INPUT__DATA_SUBMIT) {
514,508✔
358
    SStreamDataSubmit* px = (SStreamDataSubmit*)pItem;
439,832✔
359
    if ((pTask->info.taskLevel == TASK_LEVEL__SOURCE) && streamQueueIsFull(pTask->inputq.queue)) {
439,832!
360
      double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
16✔
361
      stTrace(
16!
362
          "s-task:%s inputQ is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) stop to push data",
363
          pTask->id.idStr, STREAM_TASK_QUEUE_CAPACITY, STREAM_TASK_QUEUE_CAPACITY_IN_SIZE, total, size);
364
      streamDataSubmitDestroy(px);
16✔
365
      return TSDB_CODE_STREAM_INPUTQ_FULL;
16✔
366
    }
367

368
    int32_t msgLen = px->submit.msgLen;
439,815✔
369
    int64_t ver = px->submit.ver;
439,815✔
370

371
    int32_t code = taosWriteQitem(pQueue, pItem);
439,815✔
372
    if (code != TSDB_CODE_SUCCESS) {
439,818!
373
      streamDataSubmitDestroy(px);
×
374
      return code;
×
375
    }
376

377
    double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
439,818✔
378

379
    // use the local variable to avoid the pItem be freed by other threads, since it has been put into queue already.
380
    stDebug("s-task:%s submit enqueue msgLen:%d ver:%" PRId64 ", total in queue:%d, size:%.2fMiB", pTask->id.idStr,
439,811✔
381
            msgLen, ver, total, size + SIZE_IN_MiB(msgLen));
382
  } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__REF_DATA_BLOCK) {
120,693✔
383
    if (streamQueueIsFull(pTask->inputq.queue)) {
46,016!
384
      double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
×
385

386
      stTrace("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort",
×
387
              pTask->id.idStr, STREAM_TASK_QUEUE_CAPACITY, STREAM_TASK_QUEUE_CAPACITY_IN_SIZE, total, size);
388
      streamFreeQitem(pItem);
×
389
      return TSDB_CODE_STREAM_INPUTQ_FULL;
×
390
    }
391

392
    int32_t code = taosWriteQitem(pQueue, pItem);
46,017✔
393
    if (code != TSDB_CODE_SUCCESS) {
46,017!
394
      streamFreeQitem(pItem);
×
395
      return code;
×
396
    }
397

398
    double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
46,017✔
399
    stDebug("s-task:%s blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr, total, size);
46,017✔
400
  } else if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__CHECKPOINT_TRIGGER ||
28,660✔
401
             type == STREAM_INPUT__TRANS_STATE || type == STREAM_INPUT__DATA_RETRIEVE) {
29,148✔
402

403
    int32_t code = 0;
24,134✔
404
    if ((type == STREAM_INPUT__CHECKPOINT_TRIGGER || type == STREAM_INPUT__CHECKPOINT) && (level == TASK_LEVEL__SOURCE)) {
30,278✔
405
      STaosQueue* pChkptQ = pTask->inputq.queue->pChkptQueue;
6,139✔
406
      code = taosWriteQitem(pChkptQ, pItem);
6,139✔
407

408
      double  size = SIZE_IN_MiB(taosQueueMemorySize(pChkptQ));
6,145✔
409
      int32_t num = taosQueueItemSize(pChkptQ);
6,144✔
410

411
      stDebug("s-task:%s level:%d %s checkpoint enqueue ctrl queue, total in queue:%d, size:%.2fMiB, data queue:%d",
6,145✔
412
              pTask->id.idStr, pTask->info.taskLevel, streamQueueItemGetTypeStr(type), num, size, (total - 1));
413
    } else {
414
      code = taosWriteQitem(pQueue, pItem);
17,995✔
415
      if (code != TSDB_CODE_SUCCESS) {
17,992!
416
        streamFreeQitem(pItem);
×
417
        return code;
×
418
      }
419

420
      double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
17,992✔
421
      stDebug("s-task:%s level:%d %s blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr,
17,992✔
422
              pTask->info.taskLevel, streamQueueItemGetTypeStr(type), total, size);
423
    }
424
  } else if (type == STREAM_INPUT__GET_RES) {
4,526!
425
    // use the default memory limit, refactor later.
426
    int32_t code = taosWriteQitem(pQueue, pItem);
4,526✔
427
    if (code != TSDB_CODE_SUCCESS) {
4,526!
428
      streamFreeQitem(pItem);
×
429
      return code;
×
430
    }
431

432
    double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
4,526✔
433
    stDebug("s-task:%s data res enqueue, current(blocks:%d, size:%.2fMiB)", pTask->id.idStr, total, size);
4,526✔
434
  } else {
435
    stError("s-task:%s invalid type:%d to put in inputQ", pTask->id.idStr, type);
×
436
    return TSDB_CODE_INVALID_PARA;
×
437
  }
438

439
  if (type != STREAM_INPUT__GET_RES && type != STREAM_INPUT__CHECKPOINT && type != STREAM_INPUT__CHECKPOINT_TRIGGER &&
514,486✔
440
      (pTask->info.delaySchedParam != 0)) {
495,328✔
441
    (void)atomic_val_compare_exchange_8(&pTask->schedInfo.status, TASK_TRIGGER_STATUS__INACTIVE,
11,873✔
442
                                        TASK_TRIGGER_STATUS__MAY_ACTIVE);
443
    stDebug("s-task:%s new data arrived, active the sched-trigger, triggerStatus:%d", pTask->id.idStr,
11,873✔
444
            pTask->schedInfo.status);
445
  }
446

447
  return 0;
514,485✔
448
}
449

450
int32_t streamTaskPutTranstateIntoInputQ(SStreamTask* pTask) {
2,209✔
451
  int32_t           code = 0;
2,209✔
452
  SStreamDataBlock* pTranstate = NULL;
2,209✔
453
  SSDataBlock*      pBlock = NULL;
2,209✔
454

455
  code = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SSDataBlock), (void**)&pTranstate);
2,209✔
456
  if (code) {
2,209!
457
    return code;
×
458
  }
459

460
  pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
2,209!
461
  if (pBlock == NULL) {
2,209!
462
    code = terrno;
×
463
    goto _err;
×
464
  }
465

466
  pTranstate->type = STREAM_INPUT__TRANS_STATE;
2,209✔
467

468
  pBlock->info.type = STREAM_TRANS_STATE;
2,209✔
469
  pBlock->info.rows = 1;
2,209✔
470
  pBlock->info.childId = pTask->info.selfChildId;
2,209✔
471

472
  pTranstate->blocks = taosArrayInit(4, sizeof(SSDataBlock));  // pBlock;
2,209✔
473
  if (pTranstate->blocks == NULL) {
2,209!
474
    code = terrno;
×
475
    goto _err;
×
476
  }
477

478
  void* p = taosArrayPush(pTranstate->blocks, pBlock);
2,209✔
479
  if (p == NULL) {
2,209!
480
    code = terrno;
×
481
    goto _err;
×
482
  }
483

484
  taosMemoryFree(pBlock);
2,209!
485
  if (streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pTranstate) < 0) {
2,209!
486
    code = TSDB_CODE_OUT_OF_MEMORY;
×
487
    goto _err;
×
488
  }
489

490
  pTask->status.appendTranstateBlock = true;
2,209✔
491
  return TSDB_CODE_SUCCESS;
2,209✔
492

493
_err:
×
494
  taosMemoryFree(pBlock);
×
495
  taosFreeQitem(pTranstate);
×
496
  return code;
×
497
}
498

499
// the result should be put into the outputQ in any cases, the result may be lost otherwise.
500
int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBlock) {
22,122✔
501
  STaosQueue* pQueue = pTask->outputq.queue->pQueue;
22,122✔
502
  int32_t     code = taosWriteQitem(pQueue, pBlock);
22,122✔
503

504
  int32_t total = streamQueueGetNumOfItems(pTask->outputq.queue);
22,122✔
505
  double  size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
22,121✔
506
  if (code != 0) {
22,121!
507
    stError("s-task:%s failed to put res into outputQ, outputQ items:%d, size:%.2fMiB code:%s, result lost",
×
508
            pTask->id.idStr, total + 1, size, tstrerror(code));
509
  } else {
510
    if (streamQueueIsFull(pTask->outputq.queue)) {
22,121!
511
      stWarn(
×
512
          "s-task:%s outputQ is full(outputQ items:%d, size:%.2fMiB), set the output status BLOCKING, wait for 500ms "
513
          "after handle this batch of blocks",
514
          pTask->id.idStr, total, size);
515
    } else {
516
      stDebug("s-task:%s data put into outputQ, outputQ items:%d, size:%.2fMiB", pTask->id.idStr, total, size);
22,122✔
517
    }
518
  }
519

520
  return code;
22,121✔
521
}
522

523
int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, float quotaRate,
14,422✔
524
                                  const char* id) {
525
  if (numCap < 10 || numRate < 10 || pBucket == NULL) {
14,422!
526
    stError("failed to init sink task bucket, cap:%d, rate:%d", numCap, numRate);
×
527
    return TSDB_CODE_INVALID_PARA;
×
528
  }
529

530
  pBucket->numCapacity = numCap;
14,434✔
531
  pBucket->numOfToken = numCap;
14,434✔
532
  pBucket->numRate = numRate;
14,434✔
533

534
  pBucket->quotaRate = quotaRate;
14,434✔
535
  pBucket->quotaCapacity = quotaRate * MAX_SMOOTH_BURST_RATIO;
14,434✔
536
  pBucket->quotaRemain = pBucket->quotaCapacity;
14,434✔
537

538
  pBucket->tokenFillTimestamp = taosGetTimestampMs();
14,426✔
539
  pBucket->quotaFillTimestamp = taosGetTimestampMs();
14,430✔
540
  stDebug("s-task:%s sink quotaRate:%.2fMiB, numRate:%d", id, quotaRate, numRate);
14,430✔
541
  return TSDB_CODE_SUCCESS;
14,431✔
542
}
543

544
static void fillTokenBucket(STokenBucket* pBucket, const char* id) {
56,332✔
545
  int64_t now = taosGetTimestampMs();
56,332✔
546

547
  int64_t deltaToken = now - pBucket->tokenFillTimestamp;
56,332✔
548
  if (pBucket->numOfToken < 0) {
56,332!
549
    return;
×
550
  }
551

552
  int32_t incNum = (deltaToken / 1000.0) * pBucket->numRate;
56,332✔
553
  if (incNum > 0) {
56,332✔
554
    pBucket->numOfToken = TMIN(pBucket->numOfToken + incNum, pBucket->numCapacity);
29,778✔
555
    pBucket->tokenFillTimestamp = now;
29,778✔
556
  }
557

558
  // increase the new available quota as time goes on
559
  int64_t deltaQuota = now - pBucket->quotaFillTimestamp;
56,332✔
560
  double  incSize = (deltaQuota / 1000.0) * pBucket->quotaRate;
56,332✔
561
  if (incSize > 0) {
56,332✔
562
    pBucket->quotaRemain = TMIN(pBucket->quotaRemain + incSize, pBucket->quotaCapacity);
38,911✔
563
    pBucket->quotaFillTimestamp = now;
38,911✔
564
  }
565

566
  if (incNum > 0 || incSize > 0) {
56,332✔
567
    stTrace("token/quota available, token:%d inc:%d, token_TsDelta:%" PRId64
38,905✔
568
            ", quota:%.2fMiB inc:%.3fMiB quotaTs:%" PRId64 " now:%" PRId64 "ms, %s",
569
            pBucket->numOfToken, incNum, deltaToken, pBucket->quotaRemain, incSize, deltaQuota, now, id);
570
  }
571
}
572

573
bool streamTaskExtractAvailableToken(STokenBucket* pBucket, const char* id) {
56,332✔
574
  fillTokenBucket(pBucket, id);
56,332✔
575

576
  if (pBucket->numOfToken > 0) {
56,321!
577
    if (pBucket->quotaRemain > 0) {
56,325!
578
      pBucket->numOfToken -= 1;
56,325✔
579
      return true;
56,325✔
580
    } else {  // no available size quota now
581
      return false;
×
582
    }
583
  } else {
584
    return false;
×
585
  }
586
}
587

588
void streamTaskPutbackToken(STokenBucket* pBucket) {
87,667✔
589
  pBucket->numOfToken = TMIN(pBucket->numOfToken + 1, pBucket->numCapacity);
87,667✔
590
}
87,667✔
591

592
// size in KB
593
void streamTaskConsumeQuota(STokenBucket* pBucket, int32_t bytes) { pBucket->quotaRemain -= SIZE_IN_MiB(bytes); }
16,219✔
594

595
void streamTaskInputFail(SStreamTask* pTask) { atomic_store_8(&pTask->inputq.status, TASK_INPUT_STATUS__FAILED); }
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc