• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3633

11 Mar 2025 12:59PM UTC coverage: 0.0% (-60.7%) from 60.719%
#3633

push

travis-ci

web-flow
Merge pull request #30118 from taosdata/wl30

udpate ci workflow

0 of 280412 branches covered (0.0%)

Branch coverage included in aggregate %.

0 of 275582 relevant lines covered (0.0%)

0.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/source/libs/stream/src/streamQueue.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "streamInt.h"
17

18
#define MAX_STREAM_EXEC_BATCH_NUM 32
19
#define MAX_SMOOTH_BURST_RATIO    5  // 5 sec
20

21
// todo refactor:
22
// read data from input queue
23
typedef struct SQueueReader {
24
  SStreamQueue* pQueue;
25
  int32_t       taskLevel;
26
  int32_t       maxBlocks;     // maximum block in one batch
27
  int32_t       waitDuration;  // maximum wait time to format several block into a batch to process, unit: ms
28
} SQueueReader;
29

30
#define streamQueueCurItem(_q) ((_q)->qItem)
31

32
static bool streamTaskExtractAvailableToken(STokenBucket* pBucket, const char* id);
33
static void streamTaskPutbackToken(STokenBucket* pBucket);
34
static void streamTaskConsumeQuota(STokenBucket* pBucket, int32_t bytes);
35
static void streamQueueNextItemInSourceQ(SStreamQueue* pQueue, SStreamQueueItem** pItem, ETaskStatus status, const char* id);
36

37
static void streamQueueCleanup(SStreamQueue* pQueue) {
×
38
  SStreamQueueItem* qItem = NULL;
×
39
  while (1) {
40
    streamQueueNextItemInSourceQ(pQueue, &qItem, TASK_STATUS__READY, "");
×
41
    if (qItem == NULL) {
×
42
      break;
×
43
    }
44
    streamFreeQitem(qItem);
×
45
  }
46
  pQueue->status = STREAM_QUEUE__SUCESS;
×
47
}
×
48

49
int32_t streamQueueOpen(int64_t cap, SStreamQueue** pQ) {
×
50
  *pQ = NULL;
×
51

52
  int32_t code = 0;
×
53
  int32_t lino = 0;
×
54

55
  SStreamQueue* pQueue = taosMemoryCalloc(1, sizeof(SStreamQueue));
×
56
  if (pQueue == NULL) {
×
57
    return terrno;
×
58
  }
59

60
  code = taosOpenQueue(&pQueue->pQueue);
×
61
  TSDB_CHECK_CODE(code, lino, _error);
×
62

63
  code = taosAllocateQall(&pQueue->qall);
×
64
  TSDB_CHECK_CODE(code, lino, _error);
×
65

66
  code = taosOpenQueue(&pQueue->pChkptQueue);
×
67
  TSDB_CHECK_CODE(code, lino, _error);
×
68

69
  pQueue->status = STREAM_QUEUE__SUCESS;
×
70

71
  taosSetQueueCapacity(pQueue->pQueue, cap);
×
72
  taosSetQueueMemoryCapacity(pQueue->pQueue, cap * 1024);
×
73

74
  *pQ = pQueue;
×
75
  return code;
×
76

77
_error:
×
78
  streamQueueClose(pQueue, 0);
×
79
  stError("failed to open stream queue at line:%d, code:%s", lino, tstrerror(code));
×
80
  return code;
×
81
}
82

83
void streamQueueClose(SStreamQueue* pQueue, int32_t taskId) {
×
84
  stDebug("s-task:0x%x free the queue:%p, items in queue:%d", taskId, pQueue->pQueue,
×
85
          taosQueueItemSize(pQueue->pQueue));
86
  streamQueueCleanup(pQueue);
×
87

88
  taosFreeQall(pQueue->qall);
×
89
  taosCloseQueue(pQueue->pQueue);
×
90
  pQueue->pQueue = NULL;
×
91

92
  taosCloseQueue(pQueue->pChkptQueue);
×
93
  pQueue->pChkptQueue = NULL;
×
94

95
  taosMemoryFree(pQueue);
×
96
}
×
97

98
void streamQueueNextItem(SStreamQueue* pQueue, SStreamQueueItem** pItem) {
×
99
  *pItem = NULL;
×
100
  int8_t flag = atomic_exchange_8(&pQueue->status, STREAM_QUEUE__PROCESSING);
×
101

102
  if (flag == STREAM_QUEUE__FAILED) {
×
103
    *pItem = streamQueueCurItem(pQueue);
×
104
  } else {
105
    pQueue->qItem = NULL;
×
106
    (void) taosGetQitem(pQueue->qall, &pQueue->qItem);
×
107

108
    if (pQueue->qItem == NULL) {
×
109
      (void) taosReadAllQitems(pQueue->pQueue, pQueue->qall);
×
110
      (void) taosGetQitem(pQueue->qall, &pQueue->qItem);
×
111
    }
112

113
    *pItem = streamQueueCurItem(pQueue);
×
114
  }
115
}
×
116

117
void streamQueueNextItemInSourceQ(SStreamQueue* pQueue, SStreamQueueItem** pItem, ETaskStatus status, const char* id) {
×
118
  *pItem = NULL;
×
119
  int8_t flag = atomic_exchange_8(&pQueue->status, STREAM_QUEUE__PROCESSING);
×
120

121
  if (flag == STREAM_QUEUE__CHKPTFAILED) {
×
122
    *pItem = pQueue->qChkptItem;
×
123
    return;
×
124
  }
125

126
  if (flag == STREAM_QUEUE__FAILED) {
×
127
    *pItem = pQueue->qItem;
×
128
    return;
×
129
  }
130

131
  pQueue->qChkptItem = NULL;
×
132
  taosReadQitem(pQueue->pChkptQueue, (void**)&pQueue->qChkptItem);
×
133
  if (pQueue->qChkptItem != NULL) {
×
134
    stDebug("s-task:%s read data from checkpoint queue, status:%d", id, status);
×
135
    *pItem = pQueue->qChkptItem;
×
136
    return;
×
137
  }
138

139
  // if in checkpoint status, not read data from ordinary input q.
140
  if (status == TASK_STATUS__CK) {
×
141
    stDebug("s-task:%s in checkpoint status, not read data in block queue, status:%d", id, status);
×
142
    return;
×
143
  }
144

145
  // let's try the ordinary input q
146
  pQueue->qItem = NULL;
×
147
  int32_t code = taosGetQitem(pQueue->qall, &pQueue->qItem);
×
148
  if (code) {
×
149
    stError("s-task:%s failed to extract data from inputQ, code:%s", id, tstrerror(code));
×
150
  }
151

152
  if (pQueue->qItem == NULL) {
×
153
    code = taosReadAllQitems(pQueue->pQueue, pQueue->qall);
×
154
    if (code) {
×
155
      stError("s-task:%s failed to read qitem into qall, code:%s", id, tstrerror(code));
×
156
    }
157
    code = taosGetQitem(pQueue->qall, &pQueue->qItem);
×
158
    if (code) {
×
159
      stError("s-task:%s failed to extract data from inputQ(qall), code:%s", id, tstrerror(code));
×
160
    }
161
  }
162

163
  *pItem = streamQueueCurItem(pQueue);
×
164
}
165

166
void streamQueueProcessSuccess(SStreamQueue* queue) {
×
167
  if (atomic_load_8(&queue->status) != STREAM_QUEUE__PROCESSING) {
×
168
    stError("invalid queue status:%d, expect:%d", atomic_load_8(&queue->status), STREAM_QUEUE__PROCESSING);
×
169
    return;
×
170
  }
171

172
  queue->qItem = NULL;
×
173
  queue->qChkptItem = NULL;
×
174
  atomic_store_8(&queue->status, STREAM_QUEUE__SUCESS);
×
175
}
176

177
void streamQueueProcessFail(SStreamQueue* queue) {
×
178
  if (atomic_load_8(&queue->status) != STREAM_QUEUE__PROCESSING) {
×
179
    stError("invalid queue status:%d, expect:%d", atomic_load_8(&queue->status), STREAM_QUEUE__PROCESSING);
×
180
    return;
×
181
  }
182
  atomic_store_8(&queue->status, STREAM_QUEUE__FAILED);
×
183
}
184

185
void streamQueueGetSourceChkptFailed(SStreamQueue* pQueue) {
×
186
  if (atomic_load_8(&pQueue->status) != STREAM_QUEUE__PROCESSING) {
×
187
    stError("invalid queue status:%d, expect:%d", atomic_load_8(&pQueue->status), STREAM_QUEUE__PROCESSING);
×
188
    return;
×
189
  }
190
  atomic_store_8(&pQueue->status, STREAM_QUEUE__CHKPTFAILED);
×
191
}
192

193
bool streamQueueIsFull(const SStreamQueue* pQueue) {
×
194
  int32_t numOfItems = streamQueueGetNumOfItems(pQueue);
×
195
  if (numOfItems >= STREAM_TASK_QUEUE_CAPACITY) {
×
196
    return true;
×
197
  }
198

199
  return (SIZE_IN_MiB(taosQueueMemorySize(pQueue->pQueue)) >= STREAM_TASK_QUEUE_CAPACITY_IN_SIZE);
×
200
}
201

202
int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue) {
×
203
  int32_t numOfItems1 = taosQueueItemSize(pQueue->pQueue);
×
204
  int32_t numOfItems2 = taosQallItemSize(pQueue->qall);
×
205

206
  return numOfItems1 + numOfItems2;
×
207
}
208

209
int32_t streamQueueGetNumOfUnAccessedItems(const SStreamQueue* pQueue) {
×
210
  int32_t numOfItems1 = taosQueueItemSize(pQueue->pQueue);
×
211
  int32_t numOfItems2 = taosQallUnAccessedItemSize(pQueue->qall);
×
212

213
  return numOfItems1 + numOfItems2;
×
214
}
215

216
int32_t streamQueueGetItemSize(const SStreamQueue* pQueue) {
×
217
  return taosQueueMemorySize(pQueue->pQueue) + taosQallUnAccessedMemSize(pQueue->qall);
×
218
}
219

220
int32_t streamQueueItemGetSize(const SStreamQueueItem* pItem) {
×
221
  STaosQnode* p = (STaosQnode*)((char*)pItem - sizeof(STaosQnode));
×
222
  return p->dataSize;
×
223
}
224

225
void streamQueueItemIncSize(const SStreamQueueItem* pItem, int32_t size) {
×
226
  STaosQnode* p = (STaosQnode*)((char*)pItem - sizeof(STaosQnode));
×
227
  p->dataSize += size;
×
228
}
×
229

230
const char* streamQueueItemGetTypeStr(int32_t type) {
×
231
  switch (type) {
×
232
    case STREAM_INPUT__CHECKPOINT:
×
233
      return "checkpoint";
×
234
    case STREAM_INPUT__CHECKPOINT_TRIGGER:
×
235
      return "checkpoint-trigger";
×
236
    case STREAM_INPUT__TRANS_STATE:
×
237
      return "trans-state";
×
238
    case STREAM_INPUT__REF_DATA_BLOCK:
×
239
      return "ref-block";
×
240
    default:
×
241
      return "datablock";
×
242
  }
243
}
244

245
EExtractDataCode streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks,
×
246
                                             int32_t* blockSize) {
247
  const char*   id = pTask->id.idStr;
×
248
  int32_t       taskLevel = pTask->info.taskLevel;
×
249
  SStreamQueue* pQueue = pTask->inputq.queue;
×
250

251
  *pInput = NULL;
×
252
  *numOfBlocks = 0;
×
253
  *blockSize = 0;
×
254

255
  // no available token in bucket for sink task, let's wait for a little bit
256
  if (taskLevel == TASK_LEVEL__SINK && (!streamTaskExtractAvailableToken(pTask->outputInfo.pTokenBucket, id))) {
×
257
    stDebug("s-task:%s no available token in bucket for sink data, wait for 10ms", id);
×
258
    return EXEC_AFTER_IDLE;
×
259
  }
260

261
  while (1) {
×
262
    ETaskStatus status = streamTaskGetStatus(pTask).state;
×
263
    if (status == TASK_STATUS__PAUSE || status == TASK_STATUS__STOP) {
×
264
      stDebug("s-task:%s task should pause/stop, extract input blocks:%d", id, *numOfBlocks);
×
265
      return EXEC_CONTINUE;
×
266
    }
267

268
    SStreamQueueItem* qItem = NULL;
×
269
    if (taskLevel == TASK_LEVEL__SOURCE) {
×
270
      streamQueueNextItemInSourceQ(pQueue, &qItem, status, id);
×
271
    } else {
272
      streamQueueNextItem(pQueue, &qItem);
×
273
    }
274

275
    if (qItem == NULL) {
×
276
      // restore the token to bucket
277
      if (*numOfBlocks > 0) {
×
278
        *blockSize = streamQueueItemGetSize(*pInput);
×
279
        if (taskLevel == TASK_LEVEL__SINK) {
×
280
          streamTaskConsumeQuota(pTask->outputInfo.pTokenBucket, *blockSize);
×
281
        }
282
      } else {
283
        streamTaskPutbackToken(pTask->outputInfo.pTokenBucket);
×
284
      }
285

286
      return EXEC_CONTINUE;
×
287
    }
288

289
    // do not merge blocks for sink node and check point data block
290
    int8_t type = qItem->type;
×
291
    if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__CHECKPOINT_TRIGGER ||
×
292
        type == STREAM_INPUT__TRANS_STATE || type == STREAM_INPUT__REF_DATA_BLOCK) {
×
293
      const char* p = streamQueueItemGetTypeStr(type);
×
294

295
      if (*pInput == NULL) {
×
296
        stDebug("s-task:%s %s msg extracted, start to process immediately", id, p);
×
297

298
        // restore the token to bucket in case of checkpoint/trans-state msg
299
        streamTaskPutbackToken(pTask->outputInfo.pTokenBucket);
×
300
        *blockSize = 0;
×
301
        *numOfBlocks = 1;
×
302
        *pInput = qItem;
×
303
        return EXEC_CONTINUE;
×
304
      } else {  // previous existed blocks needs to be handled, before handle the checkpoint msg block
305
        stDebug("s-task:%s %s msg extracted, handle previous blocks, numOfBlocks:%d", id, p, *numOfBlocks);
×
306
        *blockSize = streamQueueItemGetSize(*pInput);
×
307
        if (taskLevel == TASK_LEVEL__SINK) {
×
308
          streamTaskConsumeQuota(pTask->outputInfo.pTokenBucket, *blockSize);
×
309
        }
310

311
        if ((type == STREAM_INPUT__CHECKPOINT_TRIGGER || type == STREAM_INPUT__CHECKPOINT) &&
×
312
            (taskLevel == TASK_LEVEL__SOURCE)) {
313
          streamQueueGetSourceChkptFailed(pQueue);
×
314
        } else {
315
          streamQueueProcessFail(pQueue);
×
316
        }
317
        return EXEC_CONTINUE;
×
318
      }
319
    } else {
320
      if (*pInput == NULL) {
×
321
        *pInput = qItem;
×
322
      } else { // merge current block failed, let's handle the already merged blocks.
323
        void*   newRet = NULL;
×
324
        int32_t code = streamQueueMergeQueueItem(*pInput, qItem, (SStreamQueueItem**)&newRet);
×
325
        if (newRet == NULL) {
×
326
          if (code != -1) {
×
327
            stError("s-task:%s failed to merge blocks from inputQ, numOfBlocks:%d, code:%s", id, *numOfBlocks,
×
328
                    tstrerror(code));
329
          }
330

331
          *blockSize = streamQueueItemGetSize(*pInput);
×
332
          if (taskLevel == TASK_LEVEL__SINK) {
×
333
            streamTaskConsumeQuota(pTask->outputInfo.pTokenBucket, *blockSize);
×
334
          }
335

336
          streamQueueProcessFail(pQueue);
×
337
          return EXEC_CONTINUE;
×
338
        }
339

340
        *pInput = newRet;
×
341
      }
342

343
      *numOfBlocks += 1;
×
344
      streamQueueProcessSuccess(pQueue);
×
345

346
      if (*numOfBlocks >= MAX_STREAM_EXEC_BATCH_NUM) {
×
347
        stDebug("s-task:%s batch size limit:%d reached, start to process blocks", id, MAX_STREAM_EXEC_BATCH_NUM);
×
348

349
        *blockSize = streamQueueItemGetSize(*pInput);
×
350
        if (taskLevel == TASK_LEVEL__SINK) {
×
351
          streamTaskConsumeQuota(pTask->outputInfo.pTokenBucket, *blockSize);
×
352
        }
353

354
        return EXEC_CONTINUE;
×
355
      }
356
    }
357
  }
358
}
359

360
int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) {
×
361
  int8_t      type = pItem->type;
×
362
  STaosQueue* pQueue = pTask->inputq.queue->pQueue;
×
363
  int32_t     level = pTask->info.taskLevel;
×
364
  int32_t     total = streamQueueGetNumOfItems(pTask->inputq.queue) + 1;
×
365

366
  if (type == STREAM_INPUT__DATA_SUBMIT) {
×
367
    SStreamDataSubmit* px = (SStreamDataSubmit*)pItem;
×
368
    if ((pTask->info.taskLevel == TASK_LEVEL__SOURCE) && streamQueueIsFull(pTask->inputq.queue)) {
×
369
      double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
×
370
      stTrace(
×
371
          "s-task:%s inputQ is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) stop to push data",
372
          pTask->id.idStr, STREAM_TASK_QUEUE_CAPACITY, STREAM_TASK_QUEUE_CAPACITY_IN_SIZE, total, size);
373
      streamDataSubmitDestroy(px);
×
374
      return TSDB_CODE_STREAM_INPUTQ_FULL;
×
375
    }
376

377
    int32_t msgLen = px->submit.msgLen;
×
378
    int64_t ver = px->submit.ver;
×
379

380
    int32_t code = taosWriteQitem(pQueue, pItem);
×
381
    if (code != TSDB_CODE_SUCCESS) {
×
382
      streamDataSubmitDestroy(px);
×
383
      return code;
×
384
    }
385

386
    double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
×
387

388
    // use the local variable to avoid the pItem be freed by other threads, since it has been put into queue already.
389
    stDebug("s-task:%s submit enqueue msgLen:%d ver:%" PRId64 ", total in queue:%d, size:%.2fMiB", pTask->id.idStr,
×
390
            msgLen, ver, total, size + SIZE_IN_MiB(msgLen));
391
  } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__REF_DATA_BLOCK) {
×
392
    if (streamQueueIsFull(pTask->inputq.queue)) {
×
393
      double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
×
394

395
      stTrace("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort",
×
396
              pTask->id.idStr, STREAM_TASK_QUEUE_CAPACITY, STREAM_TASK_QUEUE_CAPACITY_IN_SIZE, total, size);
397
      streamFreeQitem(pItem);
×
398
      return TSDB_CODE_STREAM_INPUTQ_FULL;
×
399
    }
400

401
    int32_t code = taosWriteQitem(pQueue, pItem);
×
402
    if (code != TSDB_CODE_SUCCESS) {
×
403
      streamFreeQitem(pItem);
×
404
      return code;
×
405
    }
406

407
    double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
×
408
    stDebug("s-task:%s blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr, total, size);
×
409
  } else if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__CHECKPOINT_TRIGGER ||
×
410
             type == STREAM_INPUT__TRANS_STATE || type == STREAM_INPUT__DATA_RETRIEVE) {
×
411

412
    int32_t code = 0;
×
413
    if ((type == STREAM_INPUT__CHECKPOINT_TRIGGER || type == STREAM_INPUT__CHECKPOINT) && (level == TASK_LEVEL__SOURCE)) {
×
414
      STaosQueue* pChkptQ = pTask->inputq.queue->pChkptQueue;
×
415
      code = taosWriteQitem(pChkptQ, pItem);
×
416

417
      double  size = SIZE_IN_MiB(taosQueueMemorySize(pChkptQ));
×
418
      int32_t num = taosQueueItemSize(pChkptQ);
×
419

420
      stDebug("s-task:%s level:%d %s checkpoint enqueue ctrl queue, total in queue:%d, size:%.2fMiB, data queue:%d",
×
421
              pTask->id.idStr, pTask->info.taskLevel, streamQueueItemGetTypeStr(type), num, size, (total - 1));
422
    } else {
423
      code = taosWriteQitem(pQueue, pItem);
×
424
      if (code != TSDB_CODE_SUCCESS) {
×
425
        streamFreeQitem(pItem);
×
426
        return code;
×
427
      }
428

429
      double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
×
430
      stDebug("s-task:%s level:%d %s blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr,
×
431
              pTask->info.taskLevel, streamQueueItemGetTypeStr(type), total, size);
432
    }
433
  } else if (type == STREAM_INPUT__GET_RES) {
×
434
    // use the default memory limit, refactor later.
435
    int32_t code = taosWriteQitem(pQueue, pItem);
×
436
    if (code != TSDB_CODE_SUCCESS) {
×
437
      streamFreeQitem(pItem);
×
438
      return code;
×
439
    }
440

441
    double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
×
442
    stDebug("s-task:%s data res enqueue, current(blocks:%d, size:%.2fMiB)", pTask->id.idStr, total, size);
×
443
  } else {
444
    stError("s-task:%s invalid type:%d to put in inputQ", pTask->id.idStr, type);
×
445
    return TSDB_CODE_INVALID_PARA;
×
446
  }
447

448
  if (type != STREAM_INPUT__GET_RES && type != STREAM_INPUT__CHECKPOINT && type != STREAM_INPUT__CHECKPOINT_TRIGGER &&
×
449
      (pTask->info.delaySchedParam != 0)) {
×
450
    (void)atomic_val_compare_exchange_8(&pTask->schedInfo.status, TASK_TRIGGER_STATUS__INACTIVE,
×
451
                                        TASK_TRIGGER_STATUS__MAY_ACTIVE);
452
    stDebug("s-task:%s new data arrived, active the sched-trigger, triggerStatus:%d", pTask->id.idStr,
×
453
            pTask->schedInfo.status);
454
  }
455

456
  return 0;
×
457
}
458

459
int32_t streamTaskPutTranstateIntoInputQ(SStreamTask* pTask) {
×
460
  int32_t           code = 0;
×
461
  SStreamDataBlock* pTranstate = NULL;
×
462
  SSDataBlock*      pBlock = NULL;
×
463

464
  code = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SSDataBlock), (void**)&pTranstate);
×
465
  if (code) {
×
466
    return code;
×
467
  }
468

469
  pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
×
470
  if (pBlock == NULL) {
×
471
    code = terrno;
×
472
    goto _err;
×
473
  }
474

475
  pTranstate->type = STREAM_INPUT__TRANS_STATE;
×
476

477
  pBlock->info.type = STREAM_TRANS_STATE;
×
478
  pBlock->info.rows = 1;
×
479
  pBlock->info.childId = pTask->info.selfChildId;
×
480

481
  pTranstate->blocks = taosArrayInit(4, sizeof(SSDataBlock));  // pBlock;
×
482
  if (pTranstate->blocks == NULL) {
×
483
    code = terrno;
×
484
    goto _err;
×
485
  }
486

487
  void* p = taosArrayPush(pTranstate->blocks, pBlock);
×
488
  if (p == NULL) {
×
489
    code = terrno;
×
490
    goto _err;
×
491
  }
492

493
  taosMemoryFree(pBlock);
×
494
  if (streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pTranstate) < 0) {
×
495
    code = TSDB_CODE_OUT_OF_MEMORY;
×
496
    goto _err;
×
497
  }
498

499
  pTask->status.appendTranstateBlock = true;
×
500
  return TSDB_CODE_SUCCESS;
×
501

502
_err:
×
503
  taosMemoryFree(pBlock);
×
504
  taosFreeQitem(pTranstate);
×
505
  return code;
×
506
}
507

508
// the result should be put into the outputQ in any cases, the result may be lost otherwise.
509
int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBlock) {
×
510
  STaosQueue* pQueue = pTask->outputq.queue->pQueue;
×
511
  int32_t     code = taosWriteQitem(pQueue, pBlock);
×
512

513
  int32_t total = streamQueueGetNumOfItems(pTask->outputq.queue);
×
514
  double  size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
×
515
  if (code != 0) {
×
516
    stError("s-task:%s failed to put res into outputQ, outputQ items:%d, size:%.2fMiB code:%s, result lost",
×
517
            pTask->id.idStr, total + 1, size, tstrerror(code));
518
  } else {
519
    if (streamQueueIsFull(pTask->outputq.queue)) {
×
520
      stWarn(
×
521
          "s-task:%s outputQ is full(outputQ items:%d, size:%.2fMiB), set the output status BLOCKING, wait for 500ms "
522
          "after handle this batch of blocks",
523
          pTask->id.idStr, total, size);
524
    } else {
525
      stDebug("s-task:%s data put into outputQ, outputQ items:%d, size:%.2fMiB", pTask->id.idStr, total, size);
×
526
    }
527
  }
528

529
  return code;
×
530
}
531

532
int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, float quotaRate,
×
533
                                  const char* id) {
534
  if (numCap < 10 || numRate < 10 || pBucket == NULL) {
×
535
    stError("failed to init sink task bucket, cap:%d, rate:%d", numCap, numRate);
×
536
    return TSDB_CODE_INVALID_PARA;
×
537
  }
538

539
  pBucket->numCapacity = numCap;
×
540
  pBucket->numOfToken = numCap;
×
541
  pBucket->numRate = numRate;
×
542

543
  pBucket->quotaRate = quotaRate;
×
544
  pBucket->quotaCapacity = quotaRate * MAX_SMOOTH_BURST_RATIO;
×
545
  pBucket->quotaRemain = pBucket->quotaCapacity;
×
546

547
  pBucket->tokenFillTimestamp = taosGetTimestampMs();
×
548
  pBucket->quotaFillTimestamp = taosGetTimestampMs();
×
549
  stDebug("s-task:%s sink quotaRate:%.2fMiB, numRate:%d", id, quotaRate, numRate);
×
550
  return TSDB_CODE_SUCCESS;
×
551
}
552

553
static void fillTokenBucket(STokenBucket* pBucket, const char* id) {
×
554
  int64_t now = taosGetTimestampMs();
×
555

556
  int64_t deltaToken = now - pBucket->tokenFillTimestamp;
×
557
  if (pBucket->numOfToken < 0) {
×
558
    return;
×
559
  }
560

561
  int32_t incNum = (deltaToken / 1000.0) * pBucket->numRate;
×
562
  if (incNum > 0) {
×
563
    pBucket->numOfToken = TMIN(pBucket->numOfToken + incNum, pBucket->numCapacity);
×
564
    pBucket->tokenFillTimestamp = now;
×
565
  }
566

567
  // increase the new available quota as time goes on
568
  int64_t deltaQuota = now - pBucket->quotaFillTimestamp;
×
569
  double  incSize = (deltaQuota / 1000.0) * pBucket->quotaRate;
×
570
  if (incSize > 0) {
×
571
    pBucket->quotaRemain = TMIN(pBucket->quotaRemain + incSize, pBucket->quotaCapacity);
×
572
    pBucket->quotaFillTimestamp = now;
×
573
  }
574

575
  if (incNum > 0 || incSize > 0) {
×
576
    stTrace("token/quota available, token:%d inc:%d, token_TsDelta:%" PRId64
×
577
            ", quota:%.2fMiB inc:%.3fMiB quotaTs:%" PRId64 " now:%" PRId64 "ms, %s",
578
            pBucket->numOfToken, incNum, deltaToken, pBucket->quotaRemain, incSize, deltaQuota, now, id);
579
  }
580
}
581

582
bool streamTaskExtractAvailableToken(STokenBucket* pBucket, const char* id) {
×
583
  fillTokenBucket(pBucket, id);
×
584

585
  if (pBucket->numOfToken > 0) {
×
586
    if (pBucket->quotaRemain > 0) {
×
587
      pBucket->numOfToken -= 1;
×
588
      return true;
×
589
    } else {  // no available size quota now
590
      return false;
×
591
    }
592
  } else {
593
    return false;
×
594
  }
595
}
596

597
void streamTaskPutbackToken(STokenBucket* pBucket) {
×
598
  pBucket->numOfToken = TMIN(pBucket->numOfToken + 1, pBucket->numCapacity);
×
599
}
×
600

601
// size in KB
602
void streamTaskConsumeQuota(STokenBucket* pBucket, int32_t bytes) { pBucket->quotaRemain -= SIZE_IN_MiB(bytes); }
×
603

604
void streamTaskInputFail(SStreamTask* pTask) { atomic_store_8(&pTask->inputq.status, TASK_INPUT_STATUS__FAILED); }
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc