• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3819

01 Apr 2025 09:27AM UTC coverage: 34.076% (+0.01%) from 34.065%
#3819

push

travis-ci

happyguoxy
test:alter gcda dir

148544 of 599532 branches covered (24.78%)

Branch coverage included in aggregate %.

222541 of 489451 relevant lines covered (45.47%)

763329.1 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

63.16
/source/libs/stream/src/streamQueue.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "streamInt.h"
17

18
#define MAX_STREAM_EXEC_BATCH_NUM 32
19
#define MAX_SMOOTH_BURST_RATIO    5  // 5 sec
20

21
// todo refactor:
22
// read data from input queue
23
typedef struct SQueueReader {
24
  SStreamQueue* pQueue;
25
  int32_t       taskLevel;
26
  int32_t       maxBlocks;     // maximum block in one batch
27
  int32_t       waitDuration;  // maximum wait time to format several block into a batch to process, unit: ms
28
} SQueueReader;
29

30
#define streamQueueCurItem(_q) ((_q)->qItem)
31

32
static bool streamTaskExtractAvailableToken(STokenBucket* pBucket, const char* id);
33
static void streamTaskPutbackToken(STokenBucket* pBucket);
34
static void streamTaskConsumeQuota(STokenBucket* pBucket, int32_t bytes);
35
static void streamQueueNextItemInSourceQ(SStreamQueue* pQueue, SStreamQueueItem** pItem, ETaskStatus status, const char* id);
36

37
// Drain the queue: keep fetching the next item (source-queue path, READY status, empty id) and
// free it until nothing more is returned, then set the queue status to STREAM_QUEUE__SUCESS.
static void streamQueueCleanup(SStreamQueue* pQueue) {
  for (;;) {
    SStreamQueueItem* pCur = NULL;
    streamQueueNextItemInSourceQ(pQueue, &pCur, TASK_STATUS__READY, "");
    if (pCur == NULL) {
      break;
    }

    streamFreeQitem(pCur);
  }

  pQueue->status = STREAM_QUEUE__SUCESS;
}
48

49
// Allocate and initialise a stream queue.
//   cap: passed to taosSetQueueCapacity as-is and to taosSetQueueMemoryCapacity as cap * 1024
//   pQ:  receives the new queue on success; it is set to NULL first and stays NULL on failure
// Returns terrno when the queue object cannot be allocated, the failing code of
// taosOpenQueue/taosAllocateQall otherwise, or the (successful) code of the last step.
int32_t streamQueueOpen(int64_t cap, SStreamQueue** pQ) {
  int32_t code = 0;
  int32_t lino = 0;

  *pQ = NULL;

  SStreamQueue* pNew = taosMemoryCalloc(1, sizeof(SStreamQueue));
  if (pNew == NULL) {
    return terrno;
  }

  // ordinary data queue
  code = taosOpenQueue(&pNew->pQueue);
  TSDB_CHECK_CODE(code, lino, _error);

  // batch container that items are moved into when they are read from the data queue
  code = taosAllocateQall(&pNew->qall);
  TSDB_CHECK_CODE(code, lino, _error);

  // separate queue for checkpoint items
  code = taosOpenQueue(&pNew->pChkptQueue);
  TSDB_CHECK_CODE(code, lino, _error);

  taosSetQueueCapacity(pNew->pQueue, cap);
  taosSetQueueMemoryCapacity(pNew->pQueue, cap * 1024);
  pNew->status = STREAM_QUEUE__SUCESS;

  *pQ = pNew;
  return code;

_error:
  // NOTE(review): streamQueueClose drains and closes every member; when an earlier step failed some
  // members are still NULL here -- confirm that the queue helpers tolerate NULL arguments.
  streamQueueClose(pNew, 0);
  stError("failed to open stream queue at line:%d, code:%s", lino, tstrerror(code));
  return code;
}
82

83
// Free every pending item, release the qall and both underlying queues, then free the queue object.
// taskId is only used in the debug log line.
void streamQueueClose(SStreamQueue* pQueue, int32_t taskId) {
  stDebug("s-task:0x%x free the queue:%p, items in queue:%d", taskId, pQueue->pQueue,
          taosQueueItemSize(pQueue->pQueue));

  // pending items are released before the containers that hold them
  streamQueueCleanup(pQueue);

  taosFreeQall(pQueue->qall);

  taosCloseQueue(pQueue->pQueue);
  taosCloseQueue(pQueue->pChkptQueue);
  pQueue->pQueue = NULL;
  pQueue->pChkptQueue = NULL;

  taosMemoryFree(pQueue);
}
97

98
// Fetch the next item of the queue into *pItem (NULL when there is none) and switch the queue
// status to STREAM_QUEUE__PROCESSING. If the previous status was STREAM_QUEUE__FAILED, the item
// that is still recorded as current is handed out again instead of reading a new one.
void streamQueueNextItem(SStreamQueue* pQueue, SStreamQueueItem** pItem) {
  *pItem = NULL;

  int8_t prevStatus = atomic_exchange_8(&pQueue->status, STREAM_QUEUE__PROCESSING);
  if (prevStatus != STREAM_QUEUE__FAILED) {
    pQueue->qItem = NULL;
    (void)taosGetQitem(pQueue->qall, &pQueue->qItem);

    // nothing left in the current batch: move everything from the queue into qall and retry
    if (pQueue->qItem == NULL) {
      (void)taosReadAllQitems(pQueue->pQueue, pQueue->qall);
      (void)taosGetQitem(pQueue->qall, &pQueue->qItem);
    }
  }

  *pItem = streamQueueCurItem(pQueue);
}
116

117
// Fetch the next item for a source task into *pItem (NULL when there is none) and switch the queue
// status to STREAM_QUEUE__PROCESSING.
//   - previous status CHKPTFAILED: the recorded checkpoint item is handed out again
//   - previous status FAILED:      the recorded ordinary item is handed out again
//   - otherwise the checkpoint queue is tried first; the ordinary queue is only read when the task
//     is not in TASK_STATUS__CK
// id and status are used for logging; status also gates the read of the ordinary queue.
void streamQueueNextItemInSourceQ(SStreamQueue* pQueue, SStreamQueueItem** pItem, ETaskStatus status, const char* id) {
  *pItem = NULL;

  switch (atomic_exchange_8(&pQueue->status, STREAM_QUEUE__PROCESSING)) {
    case STREAM_QUEUE__CHKPTFAILED:
      *pItem = pQueue->qChkptItem;
      return;
    case STREAM_QUEUE__FAILED:
      *pItem = pQueue->qItem;
      return;
    default:
      break;
  }

  pQueue->qChkptItem = NULL;
  taosReadQitem(pQueue->pChkptQueue, (void**)&pQueue->qChkptItem);
  if (pQueue->qChkptItem != NULL) {
    stDebug("s-task:%s read data from checkpoint queue, status:%d", id, status);
    *pItem = pQueue->qChkptItem;
    return;
  }

  // while the task is in checkpoint status the ordinary input queue is left untouched
  if (status == TASK_STATUS__CK) {
    stDebug("s-task:%s in checkpoint status, not read data in block queue, status:%d", id, status);
    return;
  }

  // ordinary input queue: current batch first, then refill the batch from the queue
  pQueue->qItem = NULL;
  (void)taosGetQitem(pQueue->qall, &pQueue->qItem);
  if (pQueue->qItem == NULL) {
    (void)taosReadAllQitems(pQueue->pQueue, pQueue->qall);
    (void)taosGetQitem(pQueue->qall, &pQueue->qItem);
  }

  *pItem = streamQueueCurItem(pQueue);
}
158

159
// Mark the item currently being processed as done: forget both recorded items and move the status
// from PROCESSING to STREAM_QUEUE__SUCESS. Any other current status is only logged as an error.
void streamQueueProcessSuccess(SStreamQueue* queue) {
  if (atomic_load_8(&queue->status) == STREAM_QUEUE__PROCESSING) {
    queue->qItem = NULL;
    queue->qChkptItem = NULL;
    atomic_store_8(&queue->status, STREAM_QUEUE__SUCESS);
    return;
  }

  stError("invalid queue status:%d, expect:%d", atomic_load_8(&queue->status), STREAM_QUEUE__PROCESSING);
}
169

170
// Move the status from PROCESSING to STREAM_QUEUE__FAILED; the recorded item is kept.
// Any other current status is only logged as an error.
void streamQueueProcessFail(SStreamQueue* queue) {
  if (atomic_load_8(&queue->status) == STREAM_QUEUE__PROCESSING) {
    atomic_store_8(&queue->status, STREAM_QUEUE__FAILED);
    return;
  }

  stError("invalid queue status:%d, expect:%d", atomic_load_8(&queue->status), STREAM_QUEUE__PROCESSING);
}
177

178
// Move the status from PROCESSING to STREAM_QUEUE__CHKPTFAILED; the recorded checkpoint item is kept.
// Any other current status is only logged as an error.
void streamQueueGetSourceChkptFailed(SStreamQueue* pQueue) {
  if (atomic_load_8(&pQueue->status) == STREAM_QUEUE__PROCESSING) {
    atomic_store_8(&pQueue->status, STREAM_QUEUE__CHKPTFAILED);
    return;
  }

  stError("invalid queue status:%d, expect:%d", atomic_load_8(&pQueue->status), STREAM_QUEUE__PROCESSING);
}
185

186
bool streamQueueIsFull(const SStreamQueue* pQueue) {
3,496✔
187
  int32_t numOfItems = streamQueueGetNumOfItems(pQueue);
3,496✔
188
  if (numOfItems >= STREAM_TASK_QUEUE_CAPACITY) {
3,496!
189
    return true;
×
190
  }
191

192
  return (SIZE_IN_MiB(taosQueueMemorySize(pQueue->pQueue)) >= STREAM_TASK_QUEUE_CAPACITY_IN_SIZE);
3,496✔
193
}
194

195
int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue) {
7,056✔
196
  int32_t numOfItems1 = taosQueueItemSize(pQueue->pQueue);
7,056✔
197
  int32_t numOfItems2 = taosQallItemSize(pQueue->qall);
7,056✔
198

199
  return numOfItems1 + numOfItems2;
7,056✔
200
}
201

202
int32_t streamQueueGetNumOfUnAccessedItems(const SStreamQueue* pQueue) {
641✔
203
  int32_t numOfItems1 = taosQueueItemSize(pQueue->pQueue);
641✔
204
  int32_t numOfItems2 = taosQallUnAccessedItemSize(pQueue->qall);
641✔
205

206
  return numOfItems1 + numOfItems2;
641✔
207
}
208

209
int32_t streamQueueGetItemSize(const SStreamQueue* pQueue) {
145✔
210
  return taosQueueMemorySize(pQueue->pQueue) + taosQallUnAccessedMemSize(pQueue->qall);
145✔
211
}
212

213
int32_t streamQueueItemGetSize(const SStreamQueueItem* pItem) {
1,369✔
214
  STaosQnode* p = (STaosQnode*)((char*)pItem - sizeof(STaosQnode));
1,369✔
215
  return p->dataSize;
1,369✔
216
}
217

218
// Add size to the dataSize field of the STaosQnode located sizeof(STaosQnode) bytes before the item.
void streamQueueItemIncSize(const SStreamQueueItem* pItem, int32_t size) {
  char* pHeader = (char*)pItem - sizeof(STaosQnode);
  ((STaosQnode*)pHeader)->dataSize += size;
}
222

223
// Map an input item type to a short name for log output; every type without a dedicated name is
// reported as "datablock".
const char* streamQueueItemGetTypeStr(int32_t type) {
  if (type == STREAM_INPUT__CHECKPOINT) {
    return "checkpoint";
  }
  if (type == STREAM_INPUT__CHECKPOINT_TRIGGER) {
    return "checkpoint-trigger";
  }
  if (type == STREAM_INPUT__TRANS_STATE) {
    return "trans-state";
  }
  if (type == STREAM_INPUT__REF_DATA_BLOCK) {
    return "ref-block";
  }
  if (type == STREAM_INPUT__RECALCULATE) {
    return "recalculate";
  }

  return "datablock";
}
239

240
EExtractDataCode streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks,
1,162✔
241
                                             int32_t* blockSize) {
242
  const char*   id = pTask->id.idStr;
1,162✔
243
  int32_t       taskLevel = pTask->info.taskLevel;
1,162✔
244
  SStreamQueue* pQueue = pTask->inputq.queue;
1,162✔
245

246
  *pInput = NULL;
1,162✔
247
  *numOfBlocks = 0;
1,162✔
248
  *blockSize = 0;
1,162✔
249

250
  // no available token in bucket for sink task, let's wait for a little bit
251
  if (taskLevel == TASK_LEVEL__SINK && (!streamTaskExtractAvailableToken(pTask->outputInfo.pTokenBucket, id))) {
1,162!
252
    stDebug("s-task:%s no available token in bucket for sink data, wait for 10ms", id);
×
253
    return EXEC_AFTER_IDLE;
×
254
  }
255

256
  while (1) {
1,356✔
257
    ETaskStatus status = streamTaskGetStatus(pTask).state;
2,518✔
258
    if (status == TASK_STATUS__PAUSE || status == TASK_STATUS__STOP) {
2,518!
259
      stDebug("s-task:%s task should pause/stop, extract input blocks:%d", id, *numOfBlocks);
×
260
      return EXEC_CONTINUE;
1,162✔
261
    }
262

263
    SStreamQueueItem* qItem = NULL;
2,518✔
264
    if (taskLevel == TASK_LEVEL__SOURCE) {
2,518✔
265
      streamQueueNextItemInSourceQ(pQueue, &qItem, status, id);
1,712✔
266
    } else {
267
      streamQueueNextItem(pQueue, &qItem);
806✔
268
    }
269

270
    if (qItem == NULL) {
2,518✔
271
      // restore the token to bucket
272
      if (*numOfBlocks > 0) {
1,134✔
273
        *blockSize = streamQueueItemGetSize(*pInput);
562✔
274
        if (taskLevel == TASK_LEVEL__SINK) {
562✔
275
          streamTaskConsumeQuota(pTask->outputInfo.pTokenBucket, *blockSize);
134✔
276
        }
277
      } else {
278
        streamTaskPutbackToken(pTask->outputInfo.pTokenBucket);
572✔
279
      }
280

281
      return EXEC_CONTINUE;
1,134✔
282
    }
283

284
    // do not merge blocks for sink node and check point data block
285
    int8_t type = qItem->type;
1,384✔
286
    if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__CHECKPOINT_TRIGGER ||
1,384✔
287
        type == STREAM_INPUT__TRANS_STATE || type == STREAM_INPUT__REF_DATA_BLOCK || type == STREAM_INPUT__RECALCULATE) {
1,369!
288
      const char* p = streamQueueItemGetTypeStr(type);
15✔
289

290
      if (*pInput == NULL) {
15!
291
        stDebug("s-task:%s %s msg extracted, start to process immediately", id, p);
15!
292

293
        // restore the token to bucket in case of checkpoint/trans-state msg
294
        streamTaskPutbackToken(pTask->outputInfo.pTokenBucket);
15✔
295
        *blockSize = 0;
15✔
296
        *numOfBlocks = 1;
15✔
297
        *pInput = qItem;
15✔
298
        return EXEC_CONTINUE;
15✔
299
      } else {  // previous existed blocks needs to be handled, before handle the checkpoint msg block
300
        stDebug("s-task:%s %s msg extracted, handle previous blocks, numOfBlocks:%d", id, p, *numOfBlocks);
×
301
        *blockSize = streamQueueItemGetSize(*pInput);
×
302
        if (taskLevel == TASK_LEVEL__SINK) {
×
303
          streamTaskConsumeQuota(pTask->outputInfo.pTokenBucket, *blockSize);
×
304
        }
305

306
        if ((type == STREAM_INPUT__CHECKPOINT_TRIGGER || type == STREAM_INPUT__CHECKPOINT) &&
×
307
            (taskLevel == TASK_LEVEL__SOURCE)) {
308
          streamQueueGetSourceChkptFailed(pQueue);
×
309
        } else {
310
          streamQueueProcessFail(pQueue);
×
311
        }
312
        return EXEC_CONTINUE;
×
313
      }
314
    } else {
315
      if (*pInput == NULL) {
1,369✔
316
        *pInput = qItem;
575✔
317
      } else { // merge current block failed, let's handle the already merged blocks.
318
        void*   newRet = NULL;
794✔
319
        int32_t code = streamQueueMergeQueueItem(*pInput, qItem, (SStreamQueueItem**)&newRet);
794✔
320
        if (newRet == NULL) {
794!
321
          if (code != -1) {
×
322
            stError("s-task:%s failed to merge blocks from inputQ, numOfBlocks:%d, code:%s", id, *numOfBlocks,
×
323
                    tstrerror(code));
324
          }
325

326
          *blockSize = streamQueueItemGetSize(*pInput);
×
327
          if (taskLevel == TASK_LEVEL__SINK) {
×
328
            streamTaskConsumeQuota(pTask->outputInfo.pTokenBucket, *blockSize);
×
329
          }
330

331
          streamQueueProcessFail(pQueue);
×
332
          return EXEC_CONTINUE;
×
333
        }
334

335
        *pInput = newRet;
794✔
336
      }
337

338
      *numOfBlocks += 1;
1,369✔
339
      streamQueueProcessSuccess(pQueue);
1,369✔
340

341
      if (*numOfBlocks >= MAX_STREAM_EXEC_BATCH_NUM) {
1,369✔
342
        stDebug("s-task:%s batch size limit:%d reached, start to process blocks", id, MAX_STREAM_EXEC_BATCH_NUM);
13!
343

344
        *blockSize = streamQueueItemGetSize(*pInput);
13✔
345
        if (taskLevel == TASK_LEVEL__SINK) {
13!
346
          streamTaskConsumeQuota(pTask->outputInfo.pTokenBucket, *blockSize);
×
347
        }
348

349
        return EXEC_CONTINUE;
13✔
350
      }
351
    }
352
  }
353
}
354

355
// Put one item into the input queue of the task, dispatching on pItem->type.
//   - DATA_SUBMIT:                rejected with TSDB_CODE_STREAM_INPUTQ_FULL when the task is a source
//                                 task and the queue is full
//   - DATA_BLOCK/REF_DATA_BLOCK:  rejected with TSDB_CODE_STREAM_INPUTQ_FULL when the queue is full
//   - CHECKPOINT/CHECKPOINT_TRIGGER on a source task: written to the separate checkpoint queue
//   - other control items and GET_RES: written to the ordinary queue without a capacity check
//   - any other type:             TSDB_CODE_INVALID_PARA, the item is left untouched
// Returns 0 on success. On the "queue full" and "write failed" paths the item is released here
// (streamDataSubmitDestroy for submit items, streamFreeQitem otherwise), so the caller must not
// touch it afterwards.
int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) {
  int8_t      type = pItem->type;
  STaosQueue* pQueue = pTask->inputq.queue->pQueue;
  int32_t     level = pTask->info.taskLevel;
  int32_t     total = streamQueueGetNumOfItems(pTask->inputq.queue) + 1;

  if (type == STREAM_INPUT__DATA_SUBMIT) {
    SStreamDataSubmit* px = (SStreamDataSubmit*)pItem;
    if ((pTask->info.taskLevel == TASK_LEVEL__SOURCE) && streamQueueIsFull(pTask->inputq.queue)) {
      double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
      // STREAM_TASK_QUEUE_CAPACITY is the item-count limit, STREAM_TASK_QUEUE_CAPACITY_IN_SIZE the MiB limit
      stTrace(
          "s-task:%s inputQ is full, capacity(num:%d size:%dMiB), current(blocks:%d, size:%.2fMiB) stop to push data",
          pTask->id.idStr, STREAM_TASK_QUEUE_CAPACITY, STREAM_TASK_QUEUE_CAPACITY_IN_SIZE, total, size);
      streamDataSubmitDestroy(px);
      return TSDB_CODE_STREAM_INPUTQ_FULL;
    }

    int32_t msgLen = px->submit.msgLen;
    int64_t ver = px->submit.ver;

    int32_t code = taosWriteQitem(pQueue, pItem);
    if (code != TSDB_CODE_SUCCESS) {
      streamDataSubmitDestroy(px);
      return code;
    }

    double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));

    // use the local variable to avoid the pItem be freed by other threads, since it has been put into queue already.
    stDebug("s-task:%s submit enqueue msgLen:%d ver:%" PRId64 ", total in queue:%d, size:%.2fMiB", pTask->id.idStr,
            msgLen, ver, total, size + SIZE_IN_MiB(msgLen));
  } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__REF_DATA_BLOCK) {
    if (streamQueueIsFull(pTask->inputq.queue)) {
      double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));

      stTrace("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort",
              pTask->id.idStr, STREAM_TASK_QUEUE_CAPACITY, STREAM_TASK_QUEUE_CAPACITY_IN_SIZE, total, size);
      streamFreeQitem(pItem);
      return TSDB_CODE_STREAM_INPUTQ_FULL;
    }

    int32_t code = taosWriteQitem(pQueue, pItem);
    if (code != TSDB_CODE_SUCCESS) {
      streamFreeQitem(pItem);
      return code;
    }

    double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
    stDebug("s-task:%s blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr, total, size);
  } else if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__CHECKPOINT_TRIGGER ||
             type == STREAM_INPUT__TRANS_STATE || type == STREAM_INPUT__DATA_RETRIEVE || type == STREAM_INPUT__RECALCULATE) {
    int32_t code = 0;
    if ((type == STREAM_INPUT__CHECKPOINT_TRIGGER || type == STREAM_INPUT__CHECKPOINT) && (level == TASK_LEVEL__SOURCE)) {
      STaosQueue* pChkptQ = pTask->inputq.queue->pChkptQueue;
      code = taosWriteQitem(pChkptQ, pItem);
      if (code != TSDB_CODE_SUCCESS) {
        // a failed checkpoint enqueue must not be reported as success; release the item like the
        // other branches do
        streamFreeQitem(pItem);
        return code;
      }

      double  size = SIZE_IN_MiB(taosQueueMemorySize(pChkptQ));
      int32_t num = taosQueueItemSize(pChkptQ);

      stDebug("s-task:%s level:%d %s checkpoint enqueue ctrl queue, total in queue:%d, size:%.2fMiB, data queue:%d",
              pTask->id.idStr, pTask->info.taskLevel, streamQueueItemGetTypeStr(type), num, size, (total - 1));
    } else {
      code = taosWriteQitem(pQueue, pItem);
      if (code != TSDB_CODE_SUCCESS) {
        streamFreeQitem(pItem);
        return code;
      }

      double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
      stDebug("s-task:%s level:%d %s blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr,
              pTask->info.taskLevel, streamQueueItemGetTypeStr(type), total, size);
    }
  } else if (type == STREAM_INPUT__GET_RES) {
    // use the default memory limit, refactor later.
    int32_t code = taosWriteQitem(pQueue, pItem);
    if (code != TSDB_CODE_SUCCESS) {
      streamFreeQitem(pItem);
      return code;
    }

    double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
    stDebug("s-task:%s data res enqueue, current(blocks:%d, size:%.2fMiB)", pTask->id.idStr, total, size);
  } else {
    stError("s-task:%s invalid type:%d to put in inputQ", pTask->id.idStr, type);
    return TSDB_CODE_INVALID_PARA;
  }

  // new data (anything but GET_RES/checkpoint/recalculate items) arms the delayed scheduling
  // trigger when the task has a delaySchedParam configured
  if (type != STREAM_INPUT__GET_RES && type != STREAM_INPUT__CHECKPOINT && type != STREAM_INPUT__CHECKPOINT_TRIGGER &&
      type != STREAM_INPUT__RECALCULATE && (pTask->info.delaySchedParam != 0)) {
    (void)atomic_val_compare_exchange_8(&pTask->schedInfo.status, TASK_TRIGGER_STATUS__INACTIVE,
                                        TASK_TRIGGER_STATUS__MAY_ACTIVE);
    stDebug("s-task:%s new data arrived, active the sched-trigger, triggerStatus:%d", pTask->id.idStr,
            pTask->schedInfo.status);
  }

  return 0;
}
453

454
// Build a trans-state item (one SSDataBlock of type STREAM_TRANS_STATE, rows = 1, childId taken
// from the task) and put it into the input queue of the task.
// Returns TSDB_CODE_SUCCESS and sets pTask->status.appendTranstateBlock on success; otherwise the
// code of the failing allocation step, or TSDB_CODE_OUT_OF_MEMORY when the enqueue fails.
int32_t streamTaskPutTranstateIntoInputQ(SStreamTask* pTask) {
  int32_t           code = 0;
  SStreamDataBlock* pTranstate = NULL;
  SSDataBlock*      pBlock = NULL;

  code = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SSDataBlock), (void**)&pTranstate);
  if (code) {
    return code;
  }

  pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
  if (pBlock == NULL) {
    code = terrno;
    goto _err;
  }

  pTranstate->type = STREAM_INPUT__TRANS_STATE;

  pBlock->info.type = STREAM_TRANS_STATE;
  pBlock->info.rows = 1;
  pBlock->info.childId = pTask->info.selfChildId;

  pTranstate->blocks = taosArrayInit(4, sizeof(SSDataBlock));  // pBlock;
  if (pTranstate->blocks == NULL) {
    code = terrno;
    goto _err;
  }

  void* p = taosArrayPush(pTranstate->blocks, pBlock);
  if (p == NULL) {
    // NOTE(review): pTranstate->blocks is not destroyed on this path -- confirm whether
    // taosFreeQitem releases it, otherwise the array leaks
    code = terrno;
    goto _err;
  }

  // the temporary block is no longer needed once it has been pushed; clear the pointer so that no
  // later cleanup can free it a second time
  taosMemoryFree(pBlock);
  pBlock = NULL;

  if (streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pTranstate) < 0) {
    // streamTaskPutDataIntoInputQ releases a trans-state item itself when the write fails, so
    // freeing pTranstate (or pBlock) again here would be a double free
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pTask->status.appendTranstateBlock = true;
  return TSDB_CODE_SUCCESS;

_err:
  // only reached before the item has been handed to the queue
  taosMemoryFree(pBlock);
  taosFreeQitem(pTranstate);
  return code;
}
502

503
// the result should be put into the outputQ in any cases, the result may be lost otherwise.
504
// Write a result block into the output queue of the task and log the outcome.
// There is no capacity check before the write; a full queue afterwards only produces a warning.
// Returns the code of taosWriteQitem.
int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBlock) {
  STaosQueue* pQueue = pTask->outputq.queue->pQueue;
  int32_t     code = taosWriteQitem(pQueue, pBlock);

  // statistics are sampled after the write attempt and are used for logging only
  int32_t total = streamQueueGetNumOfItems(pTask->outputq.queue);
  double  size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));

  if (code != 0) {
    stError("s-task:%s failed to put res into outputQ, outputQ items:%d, size:%.2fMiB code:%s, result lost",
            pTask->id.idStr, total + 1, size, tstrerror(code));
    return code;
  }

  if (!streamQueueIsFull(pTask->outputq.queue)) {
    stDebug("s-task:%s data put into outputQ, outputQ items:%d, size:%.2fMiB", pTask->id.idStr, total, size);
    return code;
  }

  stWarn(
      "s-task:%s outputQ is full(outputQ items:%d, size:%.2fMiB), set the output status BLOCKING, wait for 500ms "
      "after handle this batch of blocks",
      pTask->id.idStr, total, size);
  return code;
}
526

527
// Initialise a token bucket.
//   numCap:    token capacity; the bucket starts with this many tokens
//   numRate:   token refill rate (fillTokenBucket adds numRate tokens per elapsed second)
//   quotaRate: size quota refill rate; the quota capacity is quotaRate * MAX_SMOOTH_BURST_RATIO and
//              the bucket starts with the full quota
//   id:        used in the debug log line only
// Returns TSDB_CODE_INVALID_PARA when pBucket is NULL or numCap/numRate is below 10.
int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, float quotaRate,
                                  const char* id) {
  bool invalid = (numCap < 10) || (numRate < 10) || (pBucket == NULL);
  if (invalid) {
    stError("failed to init sink task bucket, cap:%d, rate:%d", numCap, numRate);
    return TSDB_CODE_INVALID_PARA;
  }

  // token part
  pBucket->numCapacity = numCap;
  pBucket->numOfToken = numCap;
  pBucket->numRate = numRate;

  // size quota part
  pBucket->quotaRate = quotaRate;
  pBucket->quotaCapacity = quotaRate * MAX_SMOOTH_BURST_RATIO;
  pBucket->quotaRemain = pBucket->quotaCapacity;

  // both refill clocks start now
  pBucket->tokenFillTimestamp = taosGetTimestampMs();
  pBucket->quotaFillTimestamp = taosGetTimestampMs();

  stDebug("s-task:%s sink quotaRate:%.2fMiB, numRate:%d", id, quotaRate, numRate);
  return TSDB_CODE_SUCCESS;
}
547

548
// Refill tokens and size quota according to the time elapsed since the respective last refill.
// Each refill timestamp is only advanced when something was actually added, so elapsed time that
// is too short to yield an increment keeps accumulating. Nothing is refilled while numOfToken is
// negative.
static void fillTokenBucket(STokenBucket* pBucket, const char* id) {
  int64_t now = taosGetTimestampMs();
  int64_t deltaToken = now - pBucket->tokenFillTimestamp;

  if (pBucket->numOfToken < 0) {
    return;
  }

  // tokens: numRate per second, capped at numCapacity
  int32_t incNum = (deltaToken / 1000.0) * pBucket->numRate;
  bool    tokenAdded = (incNum > 0);
  if (tokenAdded) {
    pBucket->numOfToken = TMIN(pBucket->numOfToken + incNum, pBucket->numCapacity);
    pBucket->tokenFillTimestamp = now;
  }

  // quota: quotaRate per second, capped at quotaCapacity
  int64_t deltaQuota = now - pBucket->quotaFillTimestamp;
  double  incSize = (deltaQuota / 1000.0) * pBucket->quotaRate;
  bool    quotaAdded = (incSize > 0);
  if (quotaAdded) {
    pBucket->quotaRemain = TMIN(pBucket->quotaRemain + incSize, pBucket->quotaCapacity);
    pBucket->quotaFillTimestamp = now;
  }

  if (tokenAdded || quotaAdded) {
    stTrace("token/quota available, token:%d inc:%d, token_TsDelta:%" PRId64
            ", quota:%.2fMiB inc:%.3fMiB quotaTs:%" PRId64 " now:%" PRId64 "ms, %s",
            pBucket->numOfToken, incNum, deltaToken, pBucket->quotaRemain, incSize, deltaQuota, now, id);
  }
}
576

577
// Refill the bucket, then take one token if both a token and some size quota are available.
// Returns true when a token was taken, false otherwise (the bucket is then left unchanged apart
// from the refill).
bool streamTaskExtractAvailableToken(STokenBucket* pBucket, const char* id) {
  fillTokenBucket(pBucket, id);

  if (pBucket->numOfToken > 0 && pBucket->quotaRemain > 0) {
    pBucket->numOfToken -= 1;
    return true;
  }

  // either no token left or no available size quota now
  return false;
}
591

592
// Give one token back to the bucket without exceeding numCapacity.
void streamTaskPutbackToken(STokenBucket* pBucket) {
  if (pBucket->numOfToken + 1 < pBucket->numCapacity) {
    pBucket->numOfToken = pBucket->numOfToken + 1;
  } else {
    pBucket->numOfToken = pBucket->numCapacity;
  }
}
595

596
// size in KB
597
void streamTaskConsumeQuota(STokenBucket* pBucket, int32_t bytes) { pBucket->quotaRemain -= SIZE_IN_MiB(bytes); }
134✔
598

599
// Atomically set the input queue status of the task to TASK_INPUT_STATUS__FAILED.
void streamTaskInputFail(SStreamTask* pTask) {
  atomic_store_8(&pTask->inputq.status, TASK_INPUT_STATUS__FAILED);
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc