• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3621

22 Feb 2025 11:44AM UTC coverage: 2.037% (-61.5%) from 63.573%
#3621

push

travis-ci

web-flow
Merge pull request #29874 from taosdata/merge/mainto3.0

merge: from main to 3.0 branch

4357 of 287032 branches covered (1.52%)

Branch coverage included in aggregate %.

0 of 174 new or added lines in 18 files covered. (0.0%)

213359 existing lines in 469 files now uncovered.

7260 of 283369 relevant lines covered (2.56%)

23737.72 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/source/libs/stream/src/streamQueue.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "streamInt.h"
17

18
#define MAX_STREAM_EXEC_BATCH_NUM 32
19
#define MAX_SMOOTH_BURST_RATIO    5  // 5 sec
20

21
// todo refactor:
22
// read data from input queue
23
typedef struct SQueueReader {
24
  SStreamQueue* pQueue;
25
  int32_t       taskLevel;
26
  int32_t       maxBlocks;     // maximum block in one batch
27
  int32_t       waitDuration;  // maximum wait time to format several block into a batch to process, unit: ms
28
} SQueueReader;
29

30
#define streamQueueCurItem(_q) ((_q)->qItem)
31

32
static bool streamTaskExtractAvailableToken(STokenBucket* pBucket, const char* id);
33
static void streamTaskPutbackToken(STokenBucket* pBucket);
34
static void streamTaskConsumeQuota(STokenBucket* pBucket, int32_t bytes);
35

UNCOV
36
static void streamQueueCleanup(SStreamQueue* pQueue) {
×
UNCOV
37
  SStreamQueueItem* qItem = NULL;
×
38
  while (1) {
UNCOV
39
    streamQueueNextItem(pQueue, &qItem);
×
UNCOV
40
    if (qItem == NULL) {
×
UNCOV
41
      break;
×
42
    }
UNCOV
43
    streamFreeQitem(qItem);
×
44
  }
UNCOV
45
  pQueue->status = STREAM_QUEUE__SUCESS;
×
UNCOV
46
}
×
47

UNCOV
48
int32_t streamQueueOpen(int64_t cap, SStreamQueue** pQ) {
×
UNCOV
49
  *pQ = NULL;
×
UNCOV
50
  int32_t code = 0;
×
51

UNCOV
52
  SStreamQueue* pQueue = taosMemoryCalloc(1, sizeof(SStreamQueue));
×
UNCOV
53
  if (pQueue == NULL) {
×
54
    return terrno;
×
55
  }
56

UNCOV
57
  code = taosOpenQueue(&pQueue->pQueue);
×
UNCOV
58
  if (code) {
×
59
    taosMemoryFreeClear(pQueue);
×
60
    return code;
×
61
  }
62

UNCOV
63
  code = taosAllocateQall(&pQueue->qall);
×
UNCOV
64
  if (code) {
×
65
    taosCloseQueue(pQueue->pQueue);
×
66
    taosMemoryFree(pQueue);
×
67
    return code;
×
68
  }
69

UNCOV
70
  pQueue->status = STREAM_QUEUE__SUCESS;
×
UNCOV
71
  taosSetQueueCapacity(pQueue->pQueue, cap);
×
UNCOV
72
  taosSetQueueMemoryCapacity(pQueue->pQueue, cap * 1024);
×
73

UNCOV
74
  *pQ = pQueue;
×
UNCOV
75
  return code;
×
76
}
77

UNCOV
78
void streamQueueClose(SStreamQueue* pQueue, int32_t taskId) {
×
UNCOV
79
  stDebug("s-task:0x%x free the queue:%p, items in queue:%d", taskId, pQueue->pQueue,
×
80
          taosQueueItemSize(pQueue->pQueue));
UNCOV
81
  streamQueueCleanup(pQueue);
×
82

UNCOV
83
  taosFreeQall(pQueue->qall);
×
UNCOV
84
  taosCloseQueue(pQueue->pQueue);
×
UNCOV
85
  taosMemoryFree(pQueue);
×
UNCOV
86
}
×
87

UNCOV
88
void streamQueueNextItem(SStreamQueue* pQueue, SStreamQueueItem** pItem) {
×
UNCOV
89
  *pItem = NULL;
×
UNCOV
90
  int8_t flag = atomic_exchange_8(&pQueue->status, STREAM_QUEUE__PROCESSING);
×
91

UNCOV
92
  if (flag == STREAM_QUEUE__FAILED) {
×
UNCOV
93
    *pItem = streamQueueCurItem(pQueue);
×
94
  } else {
UNCOV
95
    pQueue->qItem = NULL;
×
UNCOV
96
    (void) taosGetQitem(pQueue->qall, &pQueue->qItem);
×
UNCOV
97
    if (pQueue->qItem == NULL) {
×
UNCOV
98
      (void) taosReadAllQitems(pQueue->pQueue, pQueue->qall);
×
UNCOV
99
      (void) taosGetQitem(pQueue->qall, &pQueue->qItem);
×
100
    }
101

UNCOV
102
    *pItem = streamQueueCurItem(pQueue);
×
103
  }
UNCOV
104
}
×
105

UNCOV
106
void streamQueueProcessSuccess(SStreamQueue* queue) {
×
UNCOV
107
  if (atomic_load_8(&queue->status) != STREAM_QUEUE__PROCESSING) {
×
108
    stError("invalid queue status:%d, expect:%d", atomic_load_8(&queue->status), STREAM_QUEUE__PROCESSING);
×
109
    return;
×
110
  }
111

UNCOV
112
  queue->qItem = NULL;
×
UNCOV
113
  atomic_store_8(&queue->status, STREAM_QUEUE__SUCESS);
×
114
}
115

UNCOV
116
void streamQueueProcessFail(SStreamQueue* queue) {
×
UNCOV
117
  if (atomic_load_8(&queue->status) != STREAM_QUEUE__PROCESSING) {
×
118
    stError("invalid queue status:%d, expect:%d", atomic_load_8(&queue->status), STREAM_QUEUE__PROCESSING);
×
119
    return;
×
120
  }
UNCOV
121
  atomic_store_8(&queue->status, STREAM_QUEUE__FAILED);
×
122
}
123

UNCOV
124
bool streamQueueIsFull(const SStreamQueue* pQueue) {
×
UNCOV
125
  int32_t numOfItems = streamQueueGetNumOfItems(pQueue);
×
UNCOV
126
  if (numOfItems >= STREAM_TASK_QUEUE_CAPACITY) {
×
127
    return true;
×
128
  }
129

UNCOV
130
  return (SIZE_IN_MiB(taosQueueMemorySize(pQueue->pQueue)) >= STREAM_TASK_QUEUE_CAPACITY_IN_SIZE);
×
131
}
132

UNCOV
133
int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue) {
×
UNCOV
134
  int32_t numOfItems1 = taosQueueItemSize(pQueue->pQueue);
×
UNCOV
135
  int32_t numOfItems2 = taosQallItemSize(pQueue->qall);
×
136

UNCOV
137
  return numOfItems1 + numOfItems2;
×
138
}
139

UNCOV
140
int32_t streamQueueGetNumOfUnAccessedItems(const SStreamQueue* pQueue) {
×
UNCOV
141
  int32_t numOfItems1 = taosQueueItemSize(pQueue->pQueue);
×
UNCOV
142
  int32_t numOfItems2 = taosQallUnAccessedItemSize(pQueue->qall);
×
143

UNCOV
144
  return numOfItems1 + numOfItems2;
×
145
}
146

UNCOV
147
int32_t streamQueueGetItemSize(const SStreamQueue* pQueue) {
×
UNCOV
148
  return taosQueueMemorySize(pQueue->pQueue) + taosQallUnAccessedMemSize(pQueue->qall);
×
149
}
150

UNCOV
151
int32_t streamQueueItemGetSize(const SStreamQueueItem* pItem) {
×
UNCOV
152
  STaosQnode* p = (STaosQnode*)((char*)pItem - sizeof(STaosQnode));
×
UNCOV
153
  return p->dataSize;
×
154
}
155

UNCOV
156
void streamQueueItemIncSize(const SStreamQueueItem* pItem, int32_t size) {
×
UNCOV
157
  STaosQnode* p = (STaosQnode*)((char*)pItem - sizeof(STaosQnode));
×
UNCOV
158
  p->dataSize += size;
×
UNCOV
159
}
×
160

UNCOV
161
const char* streamQueueItemGetTypeStr(int32_t type) {
×
UNCOV
162
  switch (type) {
×
UNCOV
163
    case STREAM_INPUT__CHECKPOINT:
×
UNCOV
164
      return "checkpoint";
×
UNCOV
165
    case STREAM_INPUT__CHECKPOINT_TRIGGER:
×
UNCOV
166
      return "checkpoint-trigger";
×
UNCOV
167
    case STREAM_INPUT__TRANS_STATE:
×
UNCOV
168
      return "trans-state";
×
UNCOV
169
    case STREAM_INPUT__REF_DATA_BLOCK:
×
UNCOV
170
      return "ref-block";
×
UNCOV
171
    default:
×
UNCOV
172
      return "datablock";
×
173
  }
174
}
175

UNCOV
176
EExtractDataCode streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks,
×
177
                                             int32_t* blockSize) {
UNCOV
178
  const char* id = pTask->id.idStr;
×
UNCOV
179
  int32_t     taskLevel = pTask->info.taskLevel;
×
180

UNCOV
181
  *pInput = NULL;
×
UNCOV
182
  *numOfBlocks = 0;
×
UNCOV
183
  *blockSize = 0;
×
184

185
  // no available token in bucket for sink task, let's wait for a little bit
UNCOV
186
  if (taskLevel == TASK_LEVEL__SINK && (!streamTaskExtractAvailableToken(pTask->outputInfo.pTokenBucket, id))) {
×
187
    stDebug("s-task:%s no available token in bucket for sink data, wait for 10ms", id);
×
188
    return EXEC_AFTER_IDLE;
×
189
  }
190

UNCOV
191
  while (1) {
×
UNCOV
192
    if (streamTaskShouldPause(pTask) || streamTaskShouldStop(pTask)) {
×
193
      stDebug("s-task:%s task should pause, extract input blocks:%d", id, *numOfBlocks);
×
UNCOV
194
      return EXEC_CONTINUE;
×
195
    }
196

UNCOV
197
    SStreamQueueItem* qItem = NULL;
×
UNCOV
198
    streamQueueNextItem(pTask->inputq.queue, (SStreamQueueItem**)&qItem);
×
UNCOV
199
    if (qItem == NULL) {
×
200
      // restore the token to bucket
UNCOV
201
      if (*numOfBlocks > 0) {
×
UNCOV
202
        *blockSize = streamQueueItemGetSize(*pInput);
×
UNCOV
203
        if (taskLevel == TASK_LEVEL__SINK) {
×
UNCOV
204
          streamTaskConsumeQuota(pTask->outputInfo.pTokenBucket, *blockSize);
×
205
        }
206
      } else {
UNCOV
207
        streamTaskPutbackToken(pTask->outputInfo.pTokenBucket);
×
208
      }
209

UNCOV
210
      return EXEC_CONTINUE;
×
211
    }
212

213
    // do not merge blocks for sink node and check point data block
UNCOV
214
    int8_t type = qItem->type;
×
UNCOV
215
    if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__CHECKPOINT_TRIGGER ||
×
UNCOV
216
        type == STREAM_INPUT__TRANS_STATE || type == STREAM_INPUT__REF_DATA_BLOCK) {
×
UNCOV
217
      const char* p = streamQueueItemGetTypeStr(type);
×
218

UNCOV
219
      if (*pInput == NULL) {
×
UNCOV
220
        stDebug("s-task:%s %s msg extracted, start to process immediately", id, p);
×
221

222
        // restore the token to bucket in case of checkpoint/trans-state msg
UNCOV
223
        streamTaskPutbackToken(pTask->outputInfo.pTokenBucket);
×
UNCOV
224
        *blockSize = 0;
×
UNCOV
225
        *numOfBlocks = 1;
×
UNCOV
226
        *pInput = qItem;
×
UNCOV
227
        return EXEC_CONTINUE;
×
228
      } else {  // previous existed blocks needs to be handle, before handle the checkpoint msg block
UNCOV
229
        stDebug("s-task:%s %s msg extracted, handle previous blocks, numOfBlocks:%d", id, p, *numOfBlocks);
×
UNCOV
230
        *blockSize = streamQueueItemGetSize(*pInput);
×
UNCOV
231
        if (taskLevel == TASK_LEVEL__SINK) {
×
UNCOV
232
          streamTaskConsumeQuota(pTask->outputInfo.pTokenBucket, *blockSize);
×
233
        }
234

UNCOV
235
        streamQueueProcessFail(pTask->inputq.queue);
×
UNCOV
236
        return EXEC_CONTINUE;
×
237
      }
238
    } else {
UNCOV
239
      if (*pInput == NULL) {
×
UNCOV
240
        *pInput = qItem;
×
241
      } else { // merge current block failed, let's handle the already merged blocks.
UNCOV
242
        void*   newRet = NULL;
×
UNCOV
243
        int32_t code = streamQueueMergeQueueItem(*pInput, qItem, (SStreamQueueItem**)&newRet);
×
UNCOV
244
        if (newRet == NULL) {
×
UNCOV
245
          if (code != -1) {
×
246
            stError("s-task:%s failed to merge blocks from inputQ, numOfBlocks:%d, code:%s", id, *numOfBlocks,
×
247
                    tstrerror(code));
248
          }
249

UNCOV
250
          *blockSize = streamQueueItemGetSize(*pInput);
×
UNCOV
251
          if (taskLevel == TASK_LEVEL__SINK) {
×
252
            streamTaskConsumeQuota(pTask->outputInfo.pTokenBucket, *blockSize);
×
253
          }
254

UNCOV
255
          streamQueueProcessFail(pTask->inputq.queue);
×
UNCOV
256
          return EXEC_CONTINUE;
×
257
        }
258

UNCOV
259
        *pInput = newRet;
×
260
      }
261

UNCOV
262
      *numOfBlocks += 1;
×
UNCOV
263
      streamQueueProcessSuccess(pTask->inputq.queue);
×
264

UNCOV
265
      if (*numOfBlocks >= MAX_STREAM_EXEC_BATCH_NUM) {
×
UNCOV
266
        stDebug("s-task:%s batch size limit:%d reached, start to process blocks", id, MAX_STREAM_EXEC_BATCH_NUM);
×
267

UNCOV
268
        *blockSize = streamQueueItemGetSize(*pInput);
×
UNCOV
269
        if (taskLevel == TASK_LEVEL__SINK) {
×
UNCOV
270
          streamTaskConsumeQuota(pTask->outputInfo.pTokenBucket, *blockSize);
×
271
        }
272

UNCOV
273
        return EXEC_CONTINUE;
×
274
      }
275
    }
276
  }
277
}
278

UNCOV
279
int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) {
×
UNCOV
280
  int8_t      type = pItem->type;
×
UNCOV
281
  STaosQueue* pQueue = pTask->inputq.queue->pQueue;
×
UNCOV
282
  int32_t     total = streamQueueGetNumOfItems(pTask->inputq.queue) + 1;
×
283

UNCOV
284
  if (type == STREAM_INPUT__DATA_SUBMIT) {
×
UNCOV
285
    SStreamDataSubmit* px = (SStreamDataSubmit*)pItem;
×
UNCOV
286
    if ((pTask->info.taskLevel == TASK_LEVEL__SOURCE) && streamQueueIsFull(pTask->inputq.queue)) {
×
UNCOV
287
      double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
×
UNCOV
288
      stTrace(
×
289
          "s-task:%s inputQ is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) stop to push data",
290
          pTask->id.idStr, STREAM_TASK_QUEUE_CAPACITY, STREAM_TASK_QUEUE_CAPACITY_IN_SIZE, total, size);
UNCOV
291
      streamDataSubmitDestroy(px);
×
UNCOV
292
      return TSDB_CODE_STREAM_INPUTQ_FULL;
×
293
    }
294

UNCOV
295
    int32_t msgLen = px->submit.msgLen;
×
UNCOV
296
    int64_t ver = px->submit.ver;
×
297

UNCOV
298
    int32_t code = taosWriteQitem(pQueue, pItem);
×
UNCOV
299
    if (code != TSDB_CODE_SUCCESS) {
×
300
      streamDataSubmitDestroy(px);
×
301
      return code;
×
302
    }
303

UNCOV
304
    double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
×
305

306
    // use the local variable to avoid the pItem be freed by other threads, since it has been put into queue already.
UNCOV
307
    stDebug("s-task:%s submit enqueue msgLen:%d ver:%" PRId64 ", total in queue:%d, size:%.2fMiB", pTask->id.idStr,
×
308
            msgLen, ver, total, size + SIZE_IN_MiB(msgLen));
UNCOV
309
  } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__REF_DATA_BLOCK) {
×
UNCOV
310
    if (streamQueueIsFull(pTask->inputq.queue)) {
×
311
      double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
×
312

313
      stTrace("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort",
×
314
              pTask->id.idStr, STREAM_TASK_QUEUE_CAPACITY, STREAM_TASK_QUEUE_CAPACITY_IN_SIZE, total, size);
315
      streamFreeQitem(pItem);
×
316
      return TSDB_CODE_STREAM_INPUTQ_FULL;
×
317
    }
318

UNCOV
319
    int32_t code = taosWriteQitem(pQueue, pItem);
×
UNCOV
320
    if (code != TSDB_CODE_SUCCESS) {
×
321
      streamFreeQitem(pItem);
×
322
      return code;
×
323
    }
324

UNCOV
325
    double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
×
UNCOV
326
    stDebug("s-task:%s blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr, total, size);
×
UNCOV
327
  } else if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__CHECKPOINT_TRIGGER ||
×
UNCOV
328
             type == STREAM_INPUT__TRANS_STATE || type == STREAM_INPUT__DATA_RETRIEVE) {
×
UNCOV
329
    int32_t code = taosWriteQitem(pQueue, pItem);
×
UNCOV
330
    if (code != TSDB_CODE_SUCCESS) {
×
331
      streamFreeQitem(pItem);
×
332
      return code;
×
333
    }
334

UNCOV
335
    double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
×
UNCOV
336
    stDebug("s-task:%s level:%d %s blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr,
×
337
            pTask->info.taskLevel, streamQueueItemGetTypeStr(type), total, size);
UNCOV
338
  } else if (type == STREAM_INPUT__GET_RES) {
×
339
    // use the default memory limit, refactor later.
UNCOV
340
    int32_t code = taosWriteQitem(pQueue, pItem);
×
UNCOV
341
    if (code != TSDB_CODE_SUCCESS) {
×
342
      streamFreeQitem(pItem);
×
343
      return code;
×
344
    }
345

UNCOV
346
    double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
×
UNCOV
347
    stDebug("s-task:%s data res enqueue, current(blocks:%d, size:%.2fMiB)", pTask->id.idStr, total, size);
×
348
  } else {
349
    stError("s-task:%s invalid type:%d to put in inputQ", pTask->id.idStr, type);
×
350
    return TSDB_CODE_INVALID_PARA;
×
351
  }
352

UNCOV
353
  if (type != STREAM_INPUT__GET_RES && type != STREAM_INPUT__CHECKPOINT && type != STREAM_INPUT__CHECKPOINT_TRIGGER &&
×
UNCOV
354
      (pTask->info.delaySchedParam != 0)) {
×
UNCOV
355
    (void)atomic_val_compare_exchange_8(&pTask->schedInfo.status, TASK_TRIGGER_STATUS__INACTIVE,
×
356
                                        TASK_TRIGGER_STATUS__MAY_ACTIVE);
UNCOV
357
    stDebug("s-task:%s new data arrived, active the sched-trigger, triggerStatus:%d", pTask->id.idStr,
×
358
            pTask->schedInfo.status);
359
  }
360

UNCOV
361
  return 0;
×
362
}
363

UNCOV
364
int32_t streamTaskPutTranstateIntoInputQ(SStreamTask* pTask) {
×
UNCOV
365
  int32_t           code = 0;
×
UNCOV
366
  SStreamDataBlock* pTranstate = NULL;
×
UNCOV
367
  SSDataBlock*      pBlock = NULL;
×
368

UNCOV
369
  code = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SSDataBlock), (void**)&pTranstate);
×
UNCOV
370
  if (code) {
×
371
    return code;
×
372
  }
373

UNCOV
374
  pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
×
UNCOV
375
  if (pBlock == NULL) {
×
376
    code = terrno;
×
377
    goto _err;
×
378
  }
379

UNCOV
380
  pTranstate->type = STREAM_INPUT__TRANS_STATE;
×
381

UNCOV
382
  pBlock->info.type = STREAM_TRANS_STATE;
×
UNCOV
383
  pBlock->info.rows = 1;
×
UNCOV
384
  pBlock->info.childId = pTask->info.selfChildId;
×
385

UNCOV
386
  pTranstate->blocks = taosArrayInit(4, sizeof(SSDataBlock));  // pBlock;
×
UNCOV
387
  if (pTranstate->blocks == NULL) {
×
388
    code = terrno;
×
389
    goto _err;
×
390
  }
391

UNCOV
392
  void* p = taosArrayPush(pTranstate->blocks, pBlock);
×
UNCOV
393
  if (p == NULL) {
×
394
    code = terrno;
×
395
    goto _err;
×
396
  }
397

UNCOV
398
  taosMemoryFree(pBlock);
×
UNCOV
399
  if (streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pTranstate) < 0) {
×
400
    code = TSDB_CODE_OUT_OF_MEMORY;
×
401
    goto _err;
×
402
  }
403

UNCOV
404
  pTask->status.appendTranstateBlock = true;
×
UNCOV
405
  return TSDB_CODE_SUCCESS;
×
406

407
_err:
×
408
  taosMemoryFree(pBlock);
×
409
  taosFreeQitem(pTranstate);
×
410
  return code;
×
411
}
412

413
// the result should be put into the outputQ in any cases, the result may be lost otherwise.
UNCOV
414
int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBlock) {
×
UNCOV
415
  STaosQueue* pQueue = pTask->outputq.queue->pQueue;
×
UNCOV
416
  int32_t     code = taosWriteQitem(pQueue, pBlock);
×
417

UNCOV
418
  int32_t total = streamQueueGetNumOfItems(pTask->outputq.queue);
×
UNCOV
419
  double  size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
×
UNCOV
420
  if (code != 0) {
×
421
    stError("s-task:%s failed to put res into outputQ, outputQ items:%d, size:%.2fMiB code:%s, result lost",
×
422
            pTask->id.idStr, total + 1, size, tstrerror(code));
423
  } else {
UNCOV
424
    if (streamQueueIsFull(pTask->outputq.queue)) {
×
425
      stWarn(
×
426
          "s-task:%s outputQ is full(outputQ items:%d, size:%.2fMiB), set the output status BLOCKING, wait for 500ms "
427
          "after handle this batch of blocks",
428
          pTask->id.idStr, total, size);
429
    } else {
UNCOV
430
      stDebug("s-task:%s data put into outputQ, outputQ items:%d, size:%.2fMiB", pTask->id.idStr, total, size);
×
431
    }
432
  }
433

UNCOV
434
  return code;
×
435
}
436

UNCOV
437
int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, float quotaRate,
×
438
                                  const char* id) {
UNCOV
439
  if (numCap < 10 || numRate < 10 || pBucket == NULL) {
×
440
    stError("failed to init sink task bucket, cap:%d, rate:%d", numCap, numRate);
×
441
    return TSDB_CODE_INVALID_PARA;
×
442
  }
443

UNCOV
444
  pBucket->numCapacity = numCap;
×
UNCOV
445
  pBucket->numOfToken = numCap;
×
UNCOV
446
  pBucket->numRate = numRate;
×
447

UNCOV
448
  pBucket->quotaRate = quotaRate;
×
UNCOV
449
  pBucket->quotaCapacity = quotaRate * MAX_SMOOTH_BURST_RATIO;
×
UNCOV
450
  pBucket->quotaRemain = pBucket->quotaCapacity;
×
451

UNCOV
452
  pBucket->tokenFillTimestamp = taosGetTimestampMs();
×
UNCOV
453
  pBucket->quotaFillTimestamp = taosGetTimestampMs();
×
UNCOV
454
  stDebug("s-task:%s sink quotaRate:%.2fMiB, numRate:%d", id, quotaRate, numRate);
×
UNCOV
455
  return TSDB_CODE_SUCCESS;
×
456
}
457

UNCOV
458
static void fillTokenBucket(STokenBucket* pBucket, const char* id) {
×
UNCOV
459
  int64_t now = taosGetTimestampMs();
×
460

UNCOV
461
  int64_t deltaToken = now - pBucket->tokenFillTimestamp;
×
UNCOV
462
  if (pBucket->numOfToken < 0) {
×
463
    return;
×
464
  }
465

UNCOV
466
  int32_t incNum = (deltaToken / 1000.0) * pBucket->numRate;
×
UNCOV
467
  if (incNum > 0) {
×
UNCOV
468
    pBucket->numOfToken = TMIN(pBucket->numOfToken + incNum, pBucket->numCapacity);
×
UNCOV
469
    pBucket->tokenFillTimestamp = now;
×
470
  }
471

472
  // increase the new available quota as time goes on
UNCOV
473
  int64_t deltaQuota = now - pBucket->quotaFillTimestamp;
×
UNCOV
474
  double  incSize = (deltaQuota / 1000.0) * pBucket->quotaRate;
×
UNCOV
475
  if (incSize > 0) {
×
UNCOV
476
    pBucket->quotaRemain = TMIN(pBucket->quotaRemain + incSize, pBucket->quotaCapacity);
×
UNCOV
477
    pBucket->quotaFillTimestamp = now;
×
478
  }
479

UNCOV
480
  if (incNum > 0 || incSize > 0) {
×
UNCOV
481
    stTrace("token/quota available, token:%d inc:%d, token_TsDelta:%" PRId64
×
482
            ", quota:%.2fMiB inc:%.3fMiB quotaTs:%" PRId64 " now:%" PRId64 "ms, %s",
483
            pBucket->numOfToken, incNum, deltaToken, pBucket->quotaRemain, incSize, deltaQuota, now, id);
484
  }
485
}
486

UNCOV
487
bool streamTaskExtractAvailableToken(STokenBucket* pBucket, const char* id) {
×
UNCOV
488
  fillTokenBucket(pBucket, id);
×
489

UNCOV
490
  if (pBucket->numOfToken > 0) {
×
UNCOV
491
    if (pBucket->quotaRemain > 0) {
×
UNCOV
492
      pBucket->numOfToken -= 1;
×
UNCOV
493
      return true;
×
494
    } else {  // no available size quota now
495
      return false;
×
496
    }
497
  } else {
498
    return false;
×
499
  }
500
}
501

UNCOV
502
void streamTaskPutbackToken(STokenBucket* pBucket) {
×
UNCOV
503
  pBucket->numOfToken = TMIN(pBucket->numOfToken + 1, pBucket->numCapacity);
×
UNCOV
504
}
×
505

506
// size in KB
UNCOV
507
void streamTaskConsumeQuota(STokenBucket* pBucket, int32_t bytes) { pBucket->quotaRemain -= SIZE_IN_MiB(bytes); }
×
508

509
void streamTaskInputFail(SStreamTask* pTask) { atomic_store_8(&pTask->inputq.status, TASK_INPUT_STATUS__FAILED); }
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc