• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3523

06 Nov 2024 02:29AM UTC coverage: 55.861% (-2.4%) from 58.216%
#3523

push

travis-ci

web-flow
Merge pull request #28551 from taosdata/feat/TS-5215-2

test(blob): testing & fixes for blob

106075 of 245834 branches covered (43.15%)

Branch coverage included in aggregate %.

0 of 15 new or added lines in 2 files covered. (0.0%)

17003 existing lines in 254 files now uncovered.

181910 of 269703 relevant lines covered (67.45%)

1527639.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.12
/source/libs/stream/src/streamQueue.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "streamInt.h"
17

18
#define MAX_STREAM_EXEC_BATCH_NUM 32
19
#define MAX_SMOOTH_BURST_RATIO    5  // 5 sec
20

21
// todo refactor:
22
// read data from input queue
23
typedef struct SQueueReader {
24
  SStreamQueue* pQueue;
25
  int32_t       taskLevel;
26
  int32_t       maxBlocks;     // maximum block in one batch
27
  int32_t       waitDuration;  // maximum wait time to format several block into a batch to process, unit: ms
28
} SQueueReader;
29

30
#define streamQueueCurItem(_q) ((_q)->qItem)
31

32
static bool streamTaskExtractAvailableToken(STokenBucket* pBucket, const char* id);
33
static void streamTaskPutbackToken(STokenBucket* pBucket);
34
static void streamTaskConsumeQuota(STokenBucket* pBucket, int32_t bytes);
35

36
static void streamQueueCleanup(SStreamQueue* pQueue) {
10,795✔
37
  SStreamQueueItem* qItem = NULL;
10,795✔
38
  while (1) {
39
    streamQueueNextItem(pQueue, &qItem);
10,840✔
40
    if (qItem == NULL) {
10,862✔
41
      break;
10,817✔
42
    }
43
    streamFreeQitem(qItem);
45✔
44
  }
45
  pQueue->status = STREAM_QUEUE__SUCESS;
10,817✔
46
}
10,817✔
47

48
int32_t streamQueueOpen(int64_t cap, SStreamQueue** pQ) {
10,921✔
49
  *pQ = NULL;
10,921✔
50
  int32_t code = 0;
10,921✔
51

52
  SStreamQueue* pQueue = taosMemoryCalloc(1, sizeof(SStreamQueue));
10,921✔
53
  if (pQueue == NULL) {
10,939!
54
    return terrno;
×
55
  }
56

57
  code = taosOpenQueue(&pQueue->pQueue);
10,939✔
58
  if (code) {
10,918!
59
    taosMemoryFreeClear(pQueue);
×
60
    return code;
×
61
  }
62

63
  code = taosAllocateQall(&pQueue->qall);
10,918✔
64
  if (code) {
10,945!
65
    taosCloseQueue(pQueue->pQueue);
×
66
    taosMemoryFree(pQueue);
×
67
    return code;
×
68
  }
69

70
  pQueue->status = STREAM_QUEUE__SUCESS;
10,945✔
71
  taosSetQueueCapacity(pQueue->pQueue, cap);
10,945✔
72
  taosSetQueueMemoryCapacity(pQueue->pQueue, cap * 1024);
10,932✔
73

74
  *pQ = pQueue;
10,934✔
75
  return code;
10,934✔
76
}
77

78
void streamQueueClose(SStreamQueue* pQueue, int32_t taskId) {
10,795✔
79
  stDebug("s-task:0x%x free the queue:%p, items in queue:%d", taskId, pQueue->pQueue,
10,795✔
80
          taosQueueItemSize(pQueue->pQueue));
81
  streamQueueCleanup(pQueue);
10,795✔
82

83
  taosFreeQall(pQueue->qall);
10,809✔
84
  taosCloseQueue(pQueue->pQueue);
10,818✔
85
  taosMemoryFree(pQueue);
10,814✔
86
}
10,820✔
87

88
void streamQueueNextItem(SStreamQueue* pQueue, SStreamQueueItem** pItem) {
175,093✔
89
  *pItem = NULL;
175,093✔
90
  int8_t flag = atomic_exchange_8(&pQueue->status, STREAM_QUEUE__PROCESSING);
175,093✔
91

92
  if (flag == STREAM_QUEUE__FAILED) {
175,383✔
93
    *pItem = streamQueueCurItem(pQueue);
1,838✔
94
  } else {
95
    pQueue->qItem = NULL;
173,545✔
96
    (void) taosGetQitem(pQueue->qall, &pQueue->qItem);
173,545✔
97
    if (pQueue->qItem == NULL) {
173,430✔
98
      (void) taosReadAllQitems(pQueue->pQueue, pQueue->qall);
90,777✔
99
      (void) taosGetQitem(pQueue->qall, &pQueue->qItem);
90,829✔
100
    }
101

102
    *pItem = streamQueueCurItem(pQueue);
173,450✔
103
  }
104
}
175,288✔
105

106
void streamQueueProcessSuccess(SStreamQueue* queue) {
99,270✔
107
  if (atomic_load_8(&queue->status) != STREAM_QUEUE__PROCESSING) {
99,270!
108
    stError("invalid queue status:%d, expect:%d", atomic_load_8(&queue->status), STREAM_QUEUE__PROCESSING);
×
109
    return;
×
110
  }
111

112
  queue->qItem = NULL;
99,265✔
113
  atomic_store_8(&queue->status, STREAM_QUEUE__SUCESS);
99,265✔
114
}
115

116
void streamQueueProcessFail(SStreamQueue* queue) {
1,837✔
117
  if (atomic_load_8(&queue->status) != STREAM_QUEUE__PROCESSING) {
1,837!
118
    stError("invalid queue status:%d, expect:%d", atomic_load_8(&queue->status), STREAM_QUEUE__PROCESSING);
×
119
    return;
×
120
  }
121
  atomic_store_8(&queue->status, STREAM_QUEUE__FAILED);
1,837✔
122
}
123

124
bool streamQueueIsFull(const SStreamQueue* pQueue) {
211,841✔
125
  int32_t numOfItems = streamQueueGetNumOfItems(pQueue);
211,841✔
126
  if (numOfItems >= STREAM_TASK_QUEUE_CAPACITY) {
212,004!
127
    return true;
×
128
  }
129

130
  return (SIZE_IN_MiB(taosQueueMemorySize(pQueue->pQueue)) >= STREAM_TASK_QUEUE_CAPACITY_IN_SIZE);
212,004✔
131
}
132

133
int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue) {
415,908✔
134
  int32_t numOfItems1 = taosQueueItemSize(pQueue->pQueue);
415,908✔
135
  int32_t numOfItems2 = taosQallItemSize(pQueue->qall);
416,605✔
136

137
  return numOfItems1 + numOfItems2;
416,492✔
138
}
139

140
int32_t streamQueueGetNumOfUnAccessedItems(const SStreamQueue* pQueue) {
14,190✔
141
  int32_t numOfItems1 = taosQueueItemSize(pQueue->pQueue);
14,190✔
142
  int32_t numOfItems2 = taosQallUnAccessedItemSize(pQueue->qall);
14,193✔
143

144
  return numOfItems1 + numOfItems2;
14,191✔
145
}
146

147
int32_t streamQueueGetItemSize(const SStreamQueue* pQueue) {
26,178✔
148
  return taosQueueMemorySize(pQueue->pQueue) + taosQallUnAccessedMemSize(pQueue->qall);
26,178✔
149
}
150

151
int32_t streamQueueItemGetSize(const SStreamQueueItem* pItem) {
99,203✔
152
  STaosQnode* p = (STaosQnode*)((char*)pItem - sizeof(STaosQnode));
99,203✔
153
  return p->dataSize;
99,203✔
154
}
155

156
void streamQueueItemIncSize(const SStreamQueueItem* pItem, int32_t size) {
77,484✔
157
  STaosQnode* p = (STaosQnode*)((char*)pItem - sizeof(STaosQnode));
77,484✔
158
  p->dataSize += size;
77,484✔
159
}
77,484✔
160

161
const char* streamQueueItemGetTypeStr(int32_t type) {
11,944✔
162
  switch (type) {
11,944✔
163
    case STREAM_INPUT__CHECKPOINT:
1,547✔
164
      return "checkpoint";
1,547✔
165
    case STREAM_INPUT__CHECKPOINT_TRIGGER:
4,689✔
166
      return "checkpoint-trigger";
4,689✔
167
    case STREAM_INPUT__TRANS_STATE:
3,325✔
168
      return "trans-state";
3,325✔
169
    default:
2,383✔
170
      return "datablock";
2,383✔
171
  }
172
}
173

174
EExtractDataCode streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks,
54,372✔
175
                                             int32_t* blockSize) {
176
  const char* id = pTask->id.idStr;
54,372✔
177
  int32_t     taskLevel = pTask->info.taskLevel;
54,372✔
178

179
  *pInput = NULL;
54,372✔
180
  *numOfBlocks = 0;
54,372✔
181
  *blockSize = 0;
54,372✔
182

183
  // no available token in bucket for sink task, let's wait for a little bit
184
  if (taskLevel == TASK_LEVEL__SINK && (!streamTaskExtractAvailableToken(pTask->outputInfo.pTokenBucket, id))) {
54,372!
185
    stDebug("s-task:%s no available token in bucket for sink data, wait for 10ms", id);
×
186
    return EXEC_AFTER_IDLE;
×
187
  }
188

189
  while (1) {
97,716✔
190
    if (streamTaskShouldPause(pTask) || streamTaskShouldStop(pTask)) {
152,090!
UNCOV
191
      stDebug("s-task:%s task should pause, extract input blocks:%d", id, *numOfBlocks);
×
192
      return EXEC_CONTINUE;
54,425✔
193
    }
194

195
    SStreamQueueItem* qItem = NULL;
151,996✔
196
    streamQueueNextItem(pTask->inputq.queue, (SStreamQueueItem**)&qItem);
151,996✔
197
    if (qItem == NULL) {
152,074✔
198
      // restore the token to bucket
199
      if (*numOfBlocks > 0) {
42,330✔
200
        *blockSize = streamQueueItemGetSize(*pInput);
18,256✔
201
        if (taskLevel == TASK_LEVEL__SINK) {
18,231✔
202
          streamTaskConsumeQuota(pTask->outputInfo.pTokenBucket, *blockSize);
4,252✔
203
        }
204
      } else {
205
        streamTaskPutbackToken(pTask->outputInfo.pTokenBucket);
24,074✔
206
      }
207

208
      return EXEC_CONTINUE;
42,292✔
209
    }
210

211
    // do not merge blocks for sink node and check point data block
212
    int8_t type = qItem->type;
109,744✔
213
    if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__CHECKPOINT_TRIGGER ||
109,744✔
214
        type == STREAM_INPUT__TRANS_STATE) {
215
      const char* p = streamQueueItemGetTypeStr(type);
9,197✔
216

217
      if (*pInput == NULL) {
9,221✔
218
        stDebug("s-task:%s %s msg extracted, start to process immediately", id, p);
8,636✔
219

220
        // restore the token to bucket in case of checkpoint/trans-state msg
221
        streamTaskPutbackToken(pTask->outputInfo.pTokenBucket);
8,636✔
222
        *blockSize = 0;
8,633✔
223
        *numOfBlocks = 1;
8,633✔
224
        *pInput = qItem;
8,633✔
225
        return EXEC_CONTINUE;
8,633✔
226
      } else {  // previous existed blocks needs to be handle, before handle the checkpoint msg block
227
        stDebug("s-task:%s %s msg extracted, handle previous blocks, numOfBlocks:%d", id, p, *numOfBlocks);
585✔
228
        *blockSize = streamQueueItemGetSize(*pInput);
585✔
229
        if (taskLevel == TASK_LEVEL__SINK) {
585✔
230
          streamTaskConsumeQuota(pTask->outputInfo.pTokenBucket, *blockSize);
191✔
231
        }
232

233
        streamQueueProcessFail(pTask->inputq.queue);
585✔
234
        return EXEC_CONTINUE;
585✔
235
      }
236
    } else {
237
      if (*pInput == NULL) {
100,547✔
238
        *pInput = qItem;
21,720✔
239
      } else { // merge current block failed, let's handle the already merged blocks.
240
        void*   newRet = NULL;
78,827✔
241
        int32_t code = streamQueueMergeQueueItem(*pInput, qItem, (SStreamQueueItem**)&newRet);
78,827✔
242
        if (newRet == NULL) {
78,836✔
243
          if (code != -1) {
1,254!
244
            stError("s-task:%s failed to merge blocks from inputQ, numOfBlocks:%d, code:%s", id, *numOfBlocks,
×
245
                    tstrerror(code));
246
          }
247

248
          *blockSize = streamQueueItemGetSize(*pInput);
1,254✔
249
          if (taskLevel == TASK_LEVEL__SINK) {
1,252!
250
            streamTaskConsumeQuota(pTask->outputInfo.pTokenBucket, *blockSize);
×
251
          }
252

253
          streamQueueProcessFail(pTask->inputq.queue);
1,252✔
254
          return EXEC_CONTINUE;
1,253✔
255
        }
256

257
        *pInput = newRet;
77,582✔
258
      }
259

260
      *numOfBlocks += 1;
99,302✔
261
      streamQueueProcessSuccess(pTask->inputq.queue);
99,302✔
262

263
      if (*numOfBlocks >= MAX_STREAM_EXEC_BATCH_NUM) {
99,379✔
264
        stDebug("s-task:%s batch size limit:%d reached, start to process blocks", id, MAX_STREAM_EXEC_BATCH_NUM);
1,663!
265

266
        *blockSize = streamQueueItemGetSize(*pInput);
1,663✔
267
        if (taskLevel == TASK_LEVEL__SINK) {
1,662!
UNCOV
268
          streamTaskConsumeQuota(pTask->outputInfo.pTokenBucket, *blockSize);
×
269
        }
270

271
        return EXEC_CONTINUE;
1,662✔
272
      }
273
    }
274
  }
275
}
276

277
int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) {
107,942✔
278
  int8_t      type = pItem->type;
107,942✔
279
  STaosQueue* pQueue = pTask->inputq.queue->pQueue;
107,942✔
280
  int32_t     total = streamQueueGetNumOfItems(pTask->inputq.queue) + 1;
107,942✔
281

282
  if (type == STREAM_INPUT__DATA_SUBMIT) {
108,008✔
283
    SStreamDataSubmit* px = (SStreamDataSubmit*)pItem;
91,421✔
284
    if ((pTask->info.taskLevel == TASK_LEVEL__SOURCE) && streamQueueIsFull(pTask->inputq.queue)) {
91,421!
UNCOV
285
      double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
×
UNCOV
286
      stTrace(
×
287
          "s-task:%s inputQ is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) stop to push data",
288
          pTask->id.idStr, STREAM_TASK_QUEUE_CAPACITY, STREAM_TASK_QUEUE_CAPACITY_IN_SIZE, total, size);
UNCOV
289
      streamDataSubmitDestroy(px);
×
UNCOV
290
      return TSDB_CODE_STREAM_INPUTQ_FULL;
×
291
    }
292

293
    int32_t msgLen = px->submit.msgLen;
91,446✔
294
    int64_t ver = px->submit.ver;
91,446✔
295

296
    int32_t code = taosWriteQitem(pQueue, pItem);
91,446✔
297
    if (code != TSDB_CODE_SUCCESS) {
91,432!
298
      streamDataSubmitDestroy(px);
×
299
      return code;
×
300
    }
301

302
    double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
91,432✔
303

304
    // use the local variable to avoid the pItem be freed by other threads, since it has been put into queue already.
305
    stDebug("s-task:%s submit enqueue msgLen:%d ver:%" PRId64 ", total in queue:%d, size:%.2fMiB", pTask->id.idStr,
91,440✔
306
            msgLen, ver, total, size + SIZE_IN_MiB(msgLen));
307
  } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__REF_DATA_BLOCK) {
24,052✔
308
    if (streamQueueIsFull(pTask->inputq.queue)) {
7,488!
309
      double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
×
310

311
      stTrace("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort",
×
312
              pTask->id.idStr, STREAM_TASK_QUEUE_CAPACITY, STREAM_TASK_QUEUE_CAPACITY_IN_SIZE, total, size);
313
      streamFreeQitem(pItem);
×
314
      return TSDB_CODE_STREAM_INPUTQ_FULL;
×
315
    }
316

317
    int32_t code = taosWriteQitem(pQueue, pItem);
7,469✔
318
    if (code != TSDB_CODE_SUCCESS) {
7,465!
319
      streamFreeQitem(pItem);
×
320
      return code;
×
321
    }
322

323
    double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
7,465✔
324
    stDebug("s-task:%s blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr, total, size);
7,467✔
325
  } else if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__CHECKPOINT_TRIGGER ||
9,099✔
326
             type == STREAM_INPUT__TRANS_STATE || type == STREAM_INPUT__DATA_RETRIEVE) {
9,332✔
327
    int32_t code = taosWriteQitem(pQueue, pItem);
8,859✔
328
    if (code != TSDB_CODE_SUCCESS) {
8,877!
329
      streamFreeQitem(pItem);
×
330
      return code;
×
331
    }
332

333
    double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
8,877✔
334
    stDebug("s-task:%s level:%d %s blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr,
8,878✔
335
            pTask->info.taskLevel, streamQueueItemGetTypeStr(type), total, size);
336
  } else if (type == STREAM_INPUT__GET_RES) {
240!
337
    // use the default memory limit, refactor later.
338
    int32_t code = taosWriteQitem(pQueue, pItem);
240✔
339
    if (code != TSDB_CODE_SUCCESS) {
240!
340
      streamFreeQitem(pItem);
×
341
      return code;
×
342
    }
343

344
    double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
240✔
345
    stDebug("s-task:%s data res enqueue, current(blocks:%d, size:%.2fMiB)", pTask->id.idStr, total, size);
240✔
346
  } else {
347
    stError("s-task:%s invalid type:%d to put in inputQ", pTask->id.idStr, type);
×
348
    return TSDB_CODE_INVALID_PARA;
×
349
  }
350

351
  if (type != STREAM_INPUT__GET_RES && type != STREAM_INPUT__CHECKPOINT && type != STREAM_INPUT__CHECKPOINT_TRIGGER &&
108,010✔
352
      (pTask->info.delaySchedParam != 0)) {
101,755✔
353
    (void)atomic_val_compare_exchange_8(&pTask->schedInfo.status, TASK_TRIGGER_STATUS__INACTIVE,
1,670✔
354
                                        TASK_TRIGGER_STATUS__ACTIVE);
355
    stDebug("s-task:%s new data arrived, active the sched-trigger, triggerStatus:%d", pTask->id.idStr,
1,670✔
356
            pTask->schedInfo.status);
357
  }
358

359
  return 0;
107,996✔
360
}
361

362
int32_t streamTaskPutTranstateIntoInputQ(SStreamTask* pTask) {
688✔
363
  int32_t           code = 0;
688✔
364
  SStreamDataBlock* pTranstate = NULL;
688✔
365
  SSDataBlock*      pBlock = NULL;
688✔
366

367
  code = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SSDataBlock), (void**)&pTranstate);
688✔
368
  if (code) {
688!
369
    return code;
×
370
  }
371

372
  pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
688✔
373
  if (pBlock == NULL) {
688!
374
    code = terrno;
×
375
    goto _err;
×
376
  }
377

378
  pTranstate->type = STREAM_INPUT__TRANS_STATE;
688✔
379

380
  pBlock->info.type = STREAM_TRANS_STATE;
688✔
381
  pBlock->info.rows = 1;
688✔
382
  pBlock->info.childId = pTask->info.selfChildId;
688✔
383

384
  pTranstate->blocks = taosArrayInit(4, sizeof(SSDataBlock));  // pBlock;
688✔
385
  if (pTranstate->blocks == NULL) {
688!
386
    code = terrno;
×
387
    goto _err;
×
388
  }
389

390
  void* p = taosArrayPush(pTranstate->blocks, pBlock);
688✔
391
  if (p == NULL) {
688!
392
    code = terrno;
×
393
    goto _err;
×
394
  }
395

396
  taosMemoryFree(pBlock);
688✔
397
  if (streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pTranstate) < 0) {
688!
398
    code = TSDB_CODE_OUT_OF_MEMORY;
×
399
    goto _err;
×
400
  }
401

402
  pTask->status.appendTranstateBlock = true;
688✔
403
  return TSDB_CODE_SUCCESS;
688✔
404

405
_err:
×
406
  taosMemoryFree(pBlock);
×
407
  taosFreeQitem(pTranstate);
×
408
  return code;
×
409
}
410

411
// the result should be put into the outputQ in any cases, the result may be lost otherwise.
412
int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBlock) {
5,516✔
413
  STaosQueue* pQueue = pTask->outputq.queue->pQueue;
5,516✔
414
  int32_t     code = taosWriteQitem(pQueue, pBlock);
5,516✔
415

416
  int32_t total = streamQueueGetNumOfItems(pTask->outputq.queue);
5,516✔
417
  double  size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
5,516✔
418
  if (code != 0) {
5,516!
419
    stError("s-task:%s failed to put res into outputQ, outputQ items:%d, size:%.2fMiB code:%s, result lost",
×
420
            pTask->id.idStr, total + 1, size, tstrerror(code));
421
  } else {
422
    if (streamQueueIsFull(pTask->outputq.queue)) {
5,516!
423
      stWarn(
×
424
          "s-task:%s outputQ is full(outputQ items:%d, size:%.2fMiB), set the output status BLOCKING, wait for 500ms "
425
          "after handle this batch of blocks",
426
          pTask->id.idStr, total, size);
427
    } else {
428
      stDebug("s-task:%s data put into outputQ, outputQ items:%d, size:%.2fMiB", pTask->id.idStr, total, size);
5,516✔
429
    }
430
  }
431

432
  return code;
5,516✔
433
}
434

435
int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, float quotaRate,
5,457✔
436
                                  const char* id) {
437
  if (numCap < 10 || numRate < 10 || pBucket == NULL) {
5,457!
438
    stError("failed to init sink task bucket, cap:%d, rate:%d", numCap, numRate);
×
439
    return TSDB_CODE_INVALID_PARA;
×
440
  }
441

442
  pBucket->numCapacity = numCap;
5,472✔
443
  pBucket->numOfToken = numCap;
5,472✔
444
  pBucket->numRate = numRate;
5,472✔
445

446
  pBucket->quotaRate = quotaRate;
5,472✔
447
  pBucket->quotaCapacity = quotaRate * MAX_SMOOTH_BURST_RATIO;
5,472✔
448
  pBucket->quotaRemain = pBucket->quotaCapacity;
5,472✔
449

450
  pBucket->tokenFillTimestamp = taosGetTimestampMs();
5,472✔
451
  pBucket->quotaFillTimestamp = taosGetTimestampMs();
5,473✔
452
  stDebug("s-task:%s sink quotaRate:%.2fMiB, numRate:%d", id, quotaRate, numRate);
5,473✔
453
  return TSDB_CODE_SUCCESS;
5,472✔
454
}
455

456
static void fillTokenBucket(STokenBucket* pBucket, const char* id) {
16,556✔
457
  int64_t now = taosGetTimestampMs();
16,571✔
458

459
  int64_t deltaToken = now - pBucket->tokenFillTimestamp;
16,571✔
460
  if (pBucket->numOfToken < 0) {
16,571!
461
    return;
×
462
  }
463

464
  int32_t incNum = (deltaToken / 1000.0) * pBucket->numRate;
16,571✔
465
  if (incNum > 0) {
16,571✔
466
    pBucket->numOfToken = TMIN(pBucket->numOfToken + incNum, pBucket->numCapacity);
7,970✔
467
    pBucket->tokenFillTimestamp = now;
7,970✔
468
  }
469

470
  // increase the new available quota as time goes on
471
  int64_t deltaQuota = now - pBucket->quotaFillTimestamp;
16,571✔
472
  double  incSize = (deltaQuota / 1000.0) * pBucket->quotaRate;
16,571✔
473
  if (incSize > 0) {
16,571✔
474
    pBucket->quotaRemain = TMIN(pBucket->quotaRemain + incSize, pBucket->quotaCapacity);
9,502✔
475
    pBucket->quotaFillTimestamp = now;
9,502✔
476
  }
477

478
  if (incNum > 0 || incSize > 0) {
16,571✔
479
    stTrace("token/quota available, token:%d inc:%d, token_TsDelta:%" PRId64
9,484✔
480
            ", quota:%.2fMiB inc:%.3fMiB quotaTs:%" PRId64 " now:%" PRId64 "ms, %s",
481
            pBucket->numOfToken, incNum, deltaToken, pBucket->quotaRemain, incSize, deltaQuota, now, id);
482
  }
483
}
484

485
bool streamTaskExtractAvailableToken(STokenBucket* pBucket, const char* id) {
16,564✔
486
  fillTokenBucket(pBucket, id);
16,564✔
487

488
  if (pBucket->numOfToken > 0) {
16,572!
489
    if (pBucket->quotaRemain > 0) {
16,575!
490
      pBucket->numOfToken -= 1;
16,575✔
491
      return true;
16,575✔
492
    } else {  // no available size quota now
493
      return false;
×
494
    }
495
  } else {
496
    return false;
×
497
  }
498
}
499

500
void streamTaskPutbackToken(STokenBucket* pBucket) {
32,686✔
501
  pBucket->numOfToken = TMIN(pBucket->numOfToken + 1, pBucket->numCapacity);
32,686✔
502
}
32,686✔
503

504
// size in KB
505
void streamTaskConsumeQuota(STokenBucket* pBucket, int32_t bytes) { pBucket->quotaRemain -= SIZE_IN_MiB(bytes); }
4,443✔
506

507
void streamTaskInputFail(SStreamTask* pTask) { atomic_store_8(&pTask->inputq.status, TASK_INPUT_STATUS__FAILED); }
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc