• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4380

25 Jun 2025 06:58AM UTC coverage: 62.307% (-0.09%) from 62.393%
#4380

push

travis-ci

web-flow
feat(mqtt): mqtt subscription (#30127)

* feat(mqtt): Initial commit for mqtt

* chore(xnode/mnd): xnode message handlers for mnode

* chore(mnd/xnode): mnode part for xnode

* chore(xnode/translater): fix show commands

* fix(ast/creater): fix xnode create option

* fix(xnode/ci): fix ci & doc's error codes

* chore(xnode/sql): make create/drop/show work properly

* fix(xnode/sql): commit new files

* fix(xnode/sql): commit cmake files

* fix: fix testing cases

* fix(xnode/tsc): fix tokens

* fix(ast/anode): fix anode update decl.

* fix(xnode/error): fix xnode error codes

* fix: xnode make/destroy

* chore: xnode with option & dnode id

* chore: use taosmqtt for xnode

* chore: new error code for xnode launching

* chore(xnode): new error code

* chore: header for _xnode_mgmt_mqtt

* chore: source for _xnode_mgmt_mqtt

* chore: remove test directory from cmake

* chore: remove taosmqtt for ci to compile

* chore: remove taosudf header from xnode

* chore: new window macro

* chore: remove xnode mgmt mqtt for windows compilation

* Revert "chore: remove xnode mgmt mqtt for windows compilation"

This reverts commit 197e1640c.

* chore: cleanup code

* chore: xnode mgmt comment windows part out

* chore: mgmt/mqtt, move uv head toppest

* xnode/mnode: create xnode once per dnode

* fix(xnode/systable/test): fix column count

* xnode/sdb: renumber sdb type for xnode to make start/stop order correct

* xnode/mqtt: new param mqttPort

* fix SXnode's struct type

* transfer dnode id to mqtt subscription

* tmqtt: remove uv_a linking

* tmqtt/tools: sources for tools

* tools: fix windows compilation

* tools/producer: fix windows sleep param

* tools/producer: fix uninited var rc

* make tools only for linux

* test/mnodes: wail 1 or 2 seconds for offline to be leader

* update topic producer tool for geometry data type testing

* format tool sql statements

* show xnodes' ep

* make shell auto complete xnodes

* use usleep... (continued)

156642 of 320746 branches covered (48.84%)

Branch coverage included in aggregate %.

61 of 1020 new or added lines in 21 files covered. (5.98%)

1736 existing lines in 172 files now uncovered.

242538 of 319922 relevant lines covered (75.81%)

6277604.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.84
/source/libs/stream/src/streamQueue.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "streamInt.h"
17

18
#define MAX_STREAM_EXEC_BATCH_NUM 32
19
#define MAX_SMOOTH_BURST_RATIO    5  // 5 sec
20

21
// todo refactor:
22
// read data from input queue
23
typedef struct SQueueReader {
24
  SStreamQueue* pQueue;
25
  int32_t       taskLevel;
26
  int32_t       maxBlocks;     // maximum block in one batch
27
  int32_t       waitDuration;  // maximum wait time to format several block into a batch to process, unit: ms
28
} SQueueReader;
29

30
#define streamQueueCurItem(_q) ((_q)->qItem)
31

32
static bool streamTaskExtractAvailableToken(STokenBucket* pBucket, const char* id);
33
static void streamTaskPutbackToken(STokenBucket* pBucket);
34
static void streamTaskConsumeQuota(STokenBucket* pBucket, int32_t bytes);
35
static void streamQueueNextItemInSourceQ(SStreamQueue* pQueue, SStreamQueueItem** pItem, ETaskStatus status, const char* id);
36

37
static void streamQueueCleanup(SStreamQueue* pQueue) {
13,542✔
38
  SStreamQueueItem* qItem = NULL;
13,542✔
39
  while (1) {
40
    streamQueueNextItemInSourceQ(pQueue, &qItem, TASK_STATUS__READY, "");
13,872✔
41
    if (qItem == NULL) {
13,872✔
42
      break;
13,542✔
43
    }
44
    streamFreeQitem(qItem);
330✔
45
  }
46
  pQueue->status = STREAM_QUEUE__SUCESS;
13,542✔
47
}
13,542✔
48

49
int32_t streamQueueOpen(int64_t cap, SStreamQueue** pQ) {
13,542✔
50
  *pQ = NULL;
13,542✔
51

52
  int32_t code = 0;
13,542✔
53
  int32_t lino = 0;
13,542✔
54

55
  SStreamQueue* pQueue = taosMemoryCalloc(1, sizeof(SStreamQueue));
13,542!
56
  if (pQueue == NULL) {
13,542!
57
    return terrno;
×
58
  }
59

60
  code = taosOpenQueue(&pQueue->pQueue);
13,542✔
61
  TSDB_CHECK_CODE(code, lino, _error);
13,542!
62

63
  code = taosAllocateQall(&pQueue->qall);
13,542✔
64
  TSDB_CHECK_CODE(code, lino, _error);
13,542!
65

66
  code = taosOpenQueue(&pQueue->pChkptQueue);
13,542✔
67
  TSDB_CHECK_CODE(code, lino, _error);
13,542!
68

69
  pQueue->status = STREAM_QUEUE__SUCESS;
13,542✔
70

71
  taosSetQueueCapacity(pQueue->pQueue, cap);
13,542✔
72
  taosSetQueueMemoryCapacity(pQueue->pQueue, cap * 1024);
13,542✔
73

74
  *pQ = pQueue;
13,542✔
75
  return code;
13,542✔
76

77
_error:
×
78
  streamQueueClose(pQueue, 0);
×
79
  stError("failed to open stream queue at line:%d, code:%s", lino, tstrerror(code));
×
80
  return code;
×
81
}
82

83
void streamQueueClose(SStreamQueue* pQueue, int32_t taskId) {
13,541✔
84
  stDebug("s-task:0x%x free the queue:%p, items in queue:%d", taskId, pQueue->pQueue,
13,541!
85
          taosQueueItemSize(pQueue->pQueue));
86
  streamQueueCleanup(pQueue);
13,542✔
87

88
  taosFreeQall(pQueue->qall);
13,542✔
89
  taosCloseQueue(pQueue->pQueue);
13,542✔
90
  pQueue->pQueue = NULL;
13,541✔
91

92
  taosCloseQueue(pQueue->pChkptQueue);
13,541✔
93
  pQueue->pChkptQueue = NULL;
13,542✔
94

95
  taosMemoryFree(pQueue);
13,542!
96
}
13,542✔
97

98
void streamQueueNextItem(SStreamQueue* pQueue, SStreamQueueItem** pItem) {
40,970✔
99
  *pItem = NULL;
40,970✔
100
  int8_t flag = atomic_exchange_8(&pQueue->status, STREAM_QUEUE__PROCESSING);
40,970✔
101

102
  if (flag == STREAM_QUEUE__FAILED) {
40,970✔
103
    *pItem = streamQueueCurItem(pQueue);
137✔
104
  } else {
105
    pQueue->qItem = NULL;
40,833✔
106
    (void) taosGetQitem(pQueue->qall, &pQueue->qItem);
40,833✔
107

108
    if (pQueue->qItem == NULL) {
40,832✔
109
      (void) taosReadAllQitems(pQueue->pQueue, pQueue->qall);
37,743✔
110
      (void) taosGetQitem(pQueue->qall, &pQueue->qItem);
37,744✔
111
    }
112

113
    *pItem = streamQueueCurItem(pQueue);
40,834✔
114
  }
115
}
40,971✔
116

117
void streamQueueNextItemInSourceQ(SStreamQueue* pQueue, SStreamQueueItem** pItem, ETaskStatus status, const char* id) {
104,963✔
118
  *pItem = NULL;
104,963✔
119
  int8_t flag = atomic_exchange_8(&pQueue->status, STREAM_QUEUE__PROCESSING);
104,963✔
120

121
  if (flag == STREAM_QUEUE__CHKPTFAILED) {
104,964!
122
    *pItem = pQueue->qChkptItem;
×
123
    return;
×
124
  }
125

126
  if (flag == STREAM_QUEUE__FAILED) {
104,964✔
127
    *pItem = pQueue->qItem;
1,352✔
128
    return;
1,352✔
129
  }
130

131
  pQueue->qChkptItem = NULL;
103,612✔
132
  taosReadQitem(pQueue->pChkptQueue, (void**)&pQueue->qChkptItem);
103,612✔
133
  if (pQueue->qChkptItem != NULL) {
103,612✔
134
    stDebug("s-task:%s read data from checkpoint queue, status:%d", id, status);
1,730!
135
    *pItem = pQueue->qChkptItem;
1,730✔
136
    return;
1,730✔
137
  }
138

139
  // if in checkpoint status, not read data from ordinary input q.
140
  if (status == TASK_STATUS__CK) {
101,882✔
141
    stDebug("s-task:%s in checkpoint status, not read data in block queue, status:%d", id, status);
830!
142
    return;
830✔
143
  }
144

145
  // let's try the ordinary input q
146
  pQueue->qItem = NULL;
101,052✔
147
  int32_t num = taosGetQitem(pQueue->qall, &pQueue->qItem);
101,052✔
148
  TAOS_UNUSED(num);
149

150
  if (pQueue->qItem == NULL) {
101,052✔
151
    num = taosReadAllQitems(pQueue->pQueue, pQueue->qall);
52,000✔
152
    num = taosGetQitem(pQueue->qall, &pQueue->qItem);
52,000✔
153
    TAOS_UNUSED(num);
154
  }
155

156
  *pItem = streamQueueCurItem(pQueue);
101,052✔
157
}
158

159
void streamQueueProcessSuccess(SStreamQueue* queue) {
67,182✔
160
  if (atomic_load_8(&queue->status) != STREAM_QUEUE__PROCESSING) {
67,182!
161
    stError("invalid queue status:%d, expect:%d", atomic_load_8(&queue->status), STREAM_QUEUE__PROCESSING);
×
162
    return;
×
163
  }
164

165
  queue->qItem = NULL;
67,182✔
166
  queue->qChkptItem = NULL;
67,182✔
167
  atomic_store_8(&queue->status, STREAM_QUEUE__SUCESS);
67,182✔
168
}
169

170
void streamQueueProcessFail(SStreamQueue* queue) {
1,489✔
171
  if (atomic_load_8(&queue->status) != STREAM_QUEUE__PROCESSING) {
1,489!
172
    stError("invalid queue status:%d, expect:%d", atomic_load_8(&queue->status), STREAM_QUEUE__PROCESSING);
×
173
    return;
×
174
  }
175
  atomic_store_8(&queue->status, STREAM_QUEUE__FAILED);
1,489✔
176
}
177

178
void streamQueueGetSourceChkptFailed(SStreamQueue* pQueue) {
×
179
  if (atomic_load_8(&pQueue->status) != STREAM_QUEUE__PROCESSING) {
×
180
    stError("invalid queue status:%d, expect:%d", atomic_load_8(&pQueue->status), STREAM_QUEUE__PROCESSING);
×
181
    return;
×
182
  }
183
  atomic_store_8(&pQueue->status, STREAM_QUEUE__CHKPTFAILED);
×
184
}
185

186
bool streamQueueIsFull(const SStreamQueue* pQueue) {
199,756✔
187
  int32_t numOfItems = streamQueueGetNumOfItems(pQueue);
199,756✔
188
  if (numOfItems >= STREAM_TASK_QUEUE_CAPACITY) {
199,753!
UNCOV
189
    return true;
×
190
  }
191

192
  return (SIZE_IN_MiB(taosQueueMemorySize(pQueue->pQueue)) >= STREAM_TASK_QUEUE_CAPACITY_IN_SIZE);
199,753✔
193
}
194

195
int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue) {
394,285✔
196
  int32_t numOfItems1 = taosQueueItemSize(pQueue->pQueue);
394,285✔
197
  int32_t numOfItems2 = taosQallItemSize(pQueue->qall);
394,319✔
198

199
  return numOfItems1 + numOfItems2;
394,317✔
200
}
201

202
int32_t streamQueueGetNumOfUnAccessedItems(const SStreamQueue* pQueue) {
15,599✔
203
  int32_t numOfItems1 = taosQueueItemSize(pQueue->pQueue);
15,599✔
204
  int32_t numOfItems2 = taosQallUnAccessedItemSize(pQueue->qall);
15,599✔
205

206
  return numOfItems1 + numOfItems2;
15,599✔
207
}
208

209
int32_t streamQueueGetItemSize(const SStreamQueue* pQueue) {
22,277✔
210
  return taosQueueMemorySize(pQueue->pQueue) + taosQallUnAccessedMemSize(pQueue->qall);
22,277✔
211
}
212

213
int32_t streamQueueItemGetSize(const SStreamQueueItem* pItem) {
67,183✔
214
  STaosQnode* p = (STaosQnode*)((char*)pItem - sizeof(STaosQnode));
67,183✔
215
  return p->dataSize;
67,183✔
216
}
217

218
void streamQueueItemIncSize(const SStreamQueueItem* pItem, int32_t size) {
47,489✔
219
  STaosQnode* p = (STaosQnode*)((char*)pItem - sizeof(STaosQnode));
47,489✔
220
  p->dataSize += size;
47,489✔
221
}
47,489✔
222

223
const char* streamQueueItemGetTypeStr(int32_t type) {
34,962✔
224
  switch (type) {
34,962!
225
    case STREAM_INPUT__CHECKPOINT:
1,802✔
226
      return "checkpoint";
1,802✔
227
    case STREAM_INPUT__CHECKPOINT_TRIGGER:
7,623✔
228
      return "checkpoint-trigger";
7,623✔
229
    case STREAM_INPUT__TRANS_STATE:
7,140✔
230
      return "trans-state";
7,140✔
231
    case STREAM_INPUT__REF_DATA_BLOCK:
2,762✔
232
      return "ref-block";
2,762✔
233
    case STREAM_INPUT__RECALCULATE:
×
234
      return "recalculate";
×
235
    default:
15,635✔
236
      return "datablock";
15,635✔
237
  }
238
}
239

240
EExtractDataCode streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks,
51,243✔
241
                                             int32_t* blockSize) {
242
  const char*   id = pTask->id.idStr;
51,243✔
243
  int32_t       taskLevel = pTask->info.taskLevel;
51,243✔
244
  SStreamQueue* pQueue = pTask->inputq.queue;
51,243✔
245

246
  *pInput = NULL;
51,243✔
247
  *numOfBlocks = 0;
51,243✔
248
  *blockSize = 0;
51,243✔
249

250
  // no available token in bucket for sink task, let's wait for a little bit
251
  if (taskLevel == TASK_LEVEL__SINK && (!streamTaskExtractAvailableToken(pTask->outputInfo.pTokenBucket, id))) {
51,243!
252
    stDebug("s-task:%s no available token in bucket for sink data, wait for 10ms", id);
×
253
    return EXEC_AFTER_IDLE;
×
254
  }
255

256
  while (1) {
66,371✔
257
    ETaskStatus status = streamTaskGetStatus(pTask).state;
117,614✔
258
    if (status == TASK_STATUS__PAUSE || status == TASK_STATUS__STOP) {
117,614!
259
      stDebug("s-task:%s task should pause/stop, extract input blocks:%d", id, *numOfBlocks);
×
260
      return EXEC_CONTINUE;
51,244✔
261
    }
262

263
    SStreamQueueItem* qItem = NULL;
117,614✔
264
    if (taskLevel == TASK_LEVEL__SOURCE) {
117,614✔
265
      streamQueueNextItemInSourceQ(pQueue, &qItem, status, id);
91,092✔
266
    } else {
267
      streamQueueNextItem(pQueue, &qItem);
26,522✔
268
    }
269

270
    if (qItem == NULL) {
117,615✔
271
      // restore the token to bucket
272
      if (*numOfBlocks > 0) {
40,385✔
273
        *blockSize = streamQueueItemGetSize(*pInput);
17,393✔
274
        if (taskLevel == TASK_LEVEL__SINK) {
17,393✔
275
          streamTaskConsumeQuota(pTask->outputInfo.pTokenBucket, *blockSize);
4,569✔
276
        }
277
      } else {
278
        streamTaskPutbackToken(pTask->outputInfo.pTokenBucket);
22,992✔
279
      }
280

281
      return EXEC_CONTINUE;
40,385✔
282
    }
283

284
    // do not merge blocks for sink node and check point data block
285
    int8_t type = qItem->type;
77,230✔
286
    if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__CHECKPOINT_TRIGGER ||
77,230✔
287
        type == STREAM_INPUT__TRANS_STATE || type == STREAM_INPUT__REF_DATA_BLOCK || type == STREAM_INPUT__RECALCULATE) {
69,195!
288
      const char* p = streamQueueItemGetTypeStr(type);
9,833✔
289

290
      if (*pInput == NULL) {
9,833✔
291
        stDebug("s-task:%s %s msg extracted, start to process immediately", id, p);
8,558!
292

293
        // restore the token to bucket in case of checkpoint/trans-state msg
294
        streamTaskPutbackToken(pTask->outputInfo.pTokenBucket);
8,558✔
295
        *blockSize = 0;
8,558✔
296
        *numOfBlocks = 1;
8,558✔
297
        *pInput = qItem;
8,558✔
298
        return EXEC_CONTINUE;
8,558✔
299
      } else {  // previous existed blocks needs to be handled, before handle the checkpoint msg block
300
        stDebug("s-task:%s %s msg extracted, handle previous blocks, numOfBlocks:%d", id, p, *numOfBlocks);
1,275!
301
        *blockSize = streamQueueItemGetSize(*pInput);
1,275✔
302
        if (taskLevel == TASK_LEVEL__SINK) {
1,275✔
303
          streamTaskConsumeQuota(pTask->outputInfo.pTokenBucket, *blockSize);
118✔
304
        }
305

306
        if ((type == STREAM_INPUT__CHECKPOINT_TRIGGER || type == STREAM_INPUT__CHECKPOINT) &&
1,275!
307
            (taskLevel == TASK_LEVEL__SOURCE)) {
308
          streamQueueGetSourceChkptFailed(pQueue);
×
309
        } else {
310
          streamQueueProcessFail(pQueue);
1,275✔
311
        }
312
        return EXEC_CONTINUE;
1,275✔
313
      }
314
    } else {
315
      if (*pInput == NULL) {
67,397✔
316
        *pInput = qItem;
19,693✔
317
      } else { // merge current block failed, let's handle the already merged blocks.
318
        void*   newRet = NULL;
47,704✔
319
        int32_t code = streamQueueMergeQueueItem(*pInput, qItem, (SStreamQueueItem**)&newRet);
47,704✔
320
        if (newRet == NULL) {
47,703✔
321
          if (code != -1) {
214!
322
            stError("s-task:%s failed to merge blocks from inputQ, numOfBlocks:%d, code:%s", id, *numOfBlocks,
×
323
                    tstrerror(code));
324
          }
325

326
          *blockSize = streamQueueItemGetSize(*pInput);
214✔
327
          if (taskLevel == TASK_LEVEL__SINK) {
214!
328
            streamTaskConsumeQuota(pTask->outputInfo.pTokenBucket, *blockSize);
×
329
          }
330

331
          streamQueueProcessFail(pQueue);
214✔
332
          return EXEC_CONTINUE;
214✔
333
        }
334

335
        *pInput = newRet;
47,489✔
336
      }
337

338
      *numOfBlocks += 1;
67,182✔
339
      streamQueueProcessSuccess(pQueue);
67,182✔
340

341
      if (*numOfBlocks >= MAX_STREAM_EXEC_BATCH_NUM) {
67,183✔
342
        stDebug("s-task:%s batch size limit:%d reached, start to process blocks", id, MAX_STREAM_EXEC_BATCH_NUM);
812!
343

344
        *blockSize = streamQueueItemGetSize(*pInput);
812✔
345
        if (taskLevel == TASK_LEVEL__SINK) {
812✔
346
          streamTaskConsumeQuota(pTask->outputInfo.pTokenBucket, *blockSize);
4✔
347
        }
348

349
        return EXEC_CONTINUE;
812✔
350
      }
351
    }
352
  }
353
}
354

355
int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) {
76,058✔
356
  int8_t      type = pItem->type;
76,058✔
357
  STaosQueue* pQueue = pTask->inputq.queue->pQueue;
76,058✔
358
  int32_t     level = pTask->info.taskLevel;
76,058✔
359
  int32_t     total = streamQueueGetNumOfItems(pTask->inputq.queue) + 1;
76,058✔
360

361
  if (type == STREAM_INPUT__DATA_SUBMIT) {
76,058✔
362
    SStreamDataSubmit* px = (SStreamDataSubmit*)pItem;
54,637✔
363
    if ((pTask->info.taskLevel == TASK_LEVEL__SOURCE) && streamQueueIsFull(pTask->inputq.queue)) {
54,637!
UNCOV
364
      double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
×
UNCOV
365
      stTrace(
×
366
          "s-task:%s inputQ is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) stop to push data",
367
          pTask->id.idStr, STREAM_TASK_QUEUE_CAPACITY, STREAM_TASK_QUEUE_CAPACITY_IN_SIZE, total, size);
UNCOV
368
      streamDataSubmitDestroy(px);
×
UNCOV
369
      return TSDB_CODE_STREAM_INPUTQ_FULL;
×
370
    }
371

372
    int32_t msgLen = px->submit.msgLen;
54,637✔
373
    int64_t ver = px->submit.ver;
54,637✔
374

375
    int32_t code = taosWriteQitem(pQueue, pItem);
54,637✔
376
    if (code != TSDB_CODE_SUCCESS) {
54,637!
377
      streamDataSubmitDestroy(px);
×
378
      return code;
×
379
    }
380

381
    double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
54,637✔
382

383
    // use the local variable to avoid the pItem be freed by other threads, since it has been put into queue already.
384
    stDebug("s-task:%s submit enqueue msgLen:%d ver:%" PRId64 ", total in queue:%d, size:%.2fMiB", pTask->id.idStr,
54,636!
385
            msgLen, ver, total, size + SIZE_IN_MiB(msgLen));
386
  } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__REF_DATA_BLOCK) {
29,569✔
387
    if (streamQueueIsFull(pTask->inputq.queue)) {
8,148!
388
      double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
×
389

390
      stTrace("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort",
×
391
              pTask->id.idStr, STREAM_TASK_QUEUE_CAPACITY, STREAM_TASK_QUEUE_CAPACITY_IN_SIZE, total, size);
392
      streamFreeQitem(pItem);
×
393
      return TSDB_CODE_STREAM_INPUTQ_FULL;
×
394
    }
395

396
    int32_t code = taosWriteQitem(pQueue, pItem);
8,148✔
397
    if (code != TSDB_CODE_SUCCESS) {
8,148!
398
      streamFreeQitem(pItem);
×
399
      return code;
×
400
    }
401

402
    double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
8,148✔
403
    stDebug("s-task:%s blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr, total, size);
8,148!
404
  } else if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__CHECKPOINT_TRIGGER ||
13,273✔
405
             type == STREAM_INPUT__TRANS_STATE || type == STREAM_INPUT__DATA_RETRIEVE || type == STREAM_INPUT__RECALCULATE) {
13,691!
406

407
    int32_t code = 0;
8,040✔
408
    if ((type == STREAM_INPUT__CHECKPOINT_TRIGGER || type == STREAM_INPUT__CHECKPOINT) && (level == TASK_LEVEL__SOURCE)) {
9,770✔
409
      STaosQueue* pChkptQ = pTask->inputq.queue->pChkptQueue;
1,730✔
410
      code = taosWriteQitem(pChkptQ, pItem);
1,730✔
411

412
      double  size = SIZE_IN_MiB(taosQueueMemorySize(pChkptQ));
1,730✔
413
      int32_t num = taosQueueItemSize(pChkptQ);
1,730✔
414

415
      stDebug("s-task:%s level:%d %s checkpoint enqueue ctrl queue, total in queue:%d, size:%.2fMiB, data queue:%d",
1,730!
416
              pTask->id.idStr, pTask->info.taskLevel, streamQueueItemGetTypeStr(type), num, size, (total - 1));
417
    } else {
418
      code = taosWriteQitem(pQueue, pItem);
6,310✔
419
      if (code != TSDB_CODE_SUCCESS) {
6,310!
420
        streamFreeQitem(pItem);
×
421
        return code;
×
422
      }
423

424
      double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
6,310✔
425
      stDebug("s-task:%s level:%d %s blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr,
6,310!
426
              pTask->info.taskLevel, streamQueueItemGetTypeStr(type), total, size);
427
    }
428
  } else if (type == STREAM_INPUT__GET_RES) {
5,233!
429
    // use the default memory limit, refactor later.
430
    int32_t code = taosWriteQitem(pQueue, pItem);
5,233✔
431
    if (code != TSDB_CODE_SUCCESS) {
5,233!
432
      streamFreeQitem(pItem);
×
433
      return code;
×
434
    }
435

436
    double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
5,233✔
437
    stDebug("s-task:%s data res enqueue, current(blocks:%d, size:%.2fMiB)", pTask->id.idStr, total, size);
5,233!
438
  } else {
439
    stError("s-task:%s invalid type:%d to put in inputQ", pTask->id.idStr, type);
×
440
    return TSDB_CODE_INVALID_PARA;
×
441
  }
442

443
  if (type != STREAM_INPUT__GET_RES && type != STREAM_INPUT__CHECKPOINT && type != STREAM_INPUT__CHECKPOINT_TRIGGER &&
76,058!
444
      type != STREAM_INPUT__RECALCULATE && (pTask->info.delaySchedParam != 0)) {
66,563✔
445
    (void)atomic_val_compare_exchange_8(&pTask->schedInfo.status, TASK_TRIGGER_STATUS__INACTIVE,
6,990✔
446
                                        TASK_TRIGGER_STATUS__MAY_ACTIVE);
447
    stDebug("s-task:%s new data arrived, active the sched-trigger, triggerStatus:%d", pTask->id.idStr,
6,990!
448
            pTask->schedInfo.status);
449
  }
450

451
  return 0;
76,058✔
452
}
453

454
int32_t streamTaskPutTranstateIntoInputQ(SStreamTask* pTask) {
857✔
455
  int32_t           code = 0;
857✔
456
  SStreamDataBlock* pTranstate = NULL;
857✔
457
  SSDataBlock*      pBlock = NULL;
857✔
458

459
  code = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SSDataBlock), (void**)&pTranstate);
857✔
460
  if (code) {
857!
461
    return code;
×
462
  }
463

464
  pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
857!
465
  if (pBlock == NULL) {
857!
466
    code = terrno;
×
467
    goto _err;
×
468
  }
469

470
  pTranstate->type = STREAM_INPUT__TRANS_STATE;
857✔
471

472
  pBlock->info.type = STREAM_TRANS_STATE;
857✔
473
  pBlock->info.rows = 1;
857✔
474
  pBlock->info.childId = pTask->info.selfChildId;
857✔
475

476
  pTranstate->blocks = taosArrayInit(4, sizeof(SSDataBlock));  // pBlock;
857✔
477
  if (pTranstate->blocks == NULL) {
857!
478
    code = terrno;
×
479
    goto _err;
×
480
  }
481

482
  void* p = taosArrayPush(pTranstate->blocks, pBlock);
857✔
483
  if (p == NULL) {
857!
484
    code = terrno;
×
485
    goto _err;
×
486
  }
487

488
  taosMemoryFree(pBlock);
857!
489
  if (streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pTranstate) < 0) {
857!
490
    code = TSDB_CODE_OUT_OF_MEMORY;
×
491
    goto _err;
×
492
  }
493

494
  pTask->status.appendTranstateBlock = true;
857✔
495
  return TSDB_CODE_SUCCESS;
857✔
496

497
_err:
×
498
  taosMemoryFree(pBlock);
×
499
  taosFreeQitem(pTranstate);
×
500
  return code;
×
501
}
502

503
// the result should be put into the outputQ in any cases, the result may be lost otherwise.
504
int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBlock) {
6,503✔
505
  STaosQueue* pQueue = pTask->outputq.queue->pQueue;
6,503✔
506
  int32_t     code = taosWriteQitem(pQueue, pBlock);
6,503✔
507

508
  int32_t total = streamQueueGetNumOfItems(pTask->outputq.queue);
6,503✔
509
  double  size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
6,503✔
510
  if (code != 0) {
6,503!
511
    stError("s-task:%s failed to put res into outputQ, outputQ items:%d, size:%.2fMiB code:%s, result lost",
×
512
            pTask->id.idStr, total + 1, size, tstrerror(code));
513
  } else {
514
    if (streamQueueIsFull(pTask->outputq.queue)) {
6,503!
515
      stWarn(
×
516
          "s-task:%s outputQ is full(outputQ items:%d, size:%.2fMiB), set the output status BLOCKING, wait for 500ms "
517
          "after handle this batch of blocks",
518
          pTask->id.idStr, total, size);
519
    } else {
520
      stDebug("s-task:%s data put into outputQ, outputQ items:%d, size:%.2fMiB", pTask->id.idStr, total, size);
6,503!
521
    }
522
  }
523

524
  return code;
6,503✔
525
}
526

527
int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, float quotaRate,
6,771✔
528
                                  const char* id) {
529
  if (numCap < 10 || numRate < 10 || pBucket == NULL) {
6,771!
530
    stError("failed to init sink task bucket, cap:%d, rate:%d", numCap, numRate);
×
531
    return TSDB_CODE_INVALID_PARA;
×
532
  }
533

534
  pBucket->numCapacity = numCap;
6,771✔
535
  pBucket->numOfToken = numCap;
6,771✔
536
  pBucket->numRate = numRate;
6,771✔
537

538
  pBucket->quotaRate = quotaRate;
6,771✔
539
  pBucket->quotaCapacity = quotaRate * MAX_SMOOTH_BURST_RATIO;
6,771✔
540
  pBucket->quotaRemain = pBucket->quotaCapacity;
6,771✔
541

542
  pBucket->tokenFillTimestamp = taosGetTimestampMs();
6,771✔
543
  pBucket->quotaFillTimestamp = taosGetTimestampMs();
6,771✔
544
  stDebug("s-task:%s sink quotaRate:%.2fMiB, numRate:%d", id, quotaRate, numRate);
6,771!
545
  return TSDB_CODE_SUCCESS;
6,771✔
546
}
547

548
static void fillTokenBucket(STokenBucket* pBucket, const char* id) {
16,998✔
549
  int64_t now = taosGetTimestampMs();
16,998✔
550

551
  int64_t deltaToken = now - pBucket->tokenFillTimestamp;
16,998✔
552
  if (pBucket->numOfToken < 0) {
16,998!
553
    return;
×
554
  }
555

556
  int32_t incNum = (deltaToken / 1000.0) * pBucket->numRate;
16,998✔
557
  if (incNum > 0) {
16,998✔
558
    pBucket->numOfToken = TMIN(pBucket->numOfToken + incNum, pBucket->numCapacity);
8,971✔
559
    pBucket->tokenFillTimestamp = now;
8,971✔
560
  }
561

562
  // increase the new available quota as time goes on
563
  int64_t deltaQuota = now - pBucket->quotaFillTimestamp;
16,998✔
564
  double  incSize = (deltaQuota / 1000.0) * pBucket->quotaRate;
16,998✔
565
  if (incSize > 0) {
16,998✔
566
    pBucket->quotaRemain = TMIN(pBucket->quotaRemain + incSize, pBucket->quotaCapacity);
12,130✔
567
    pBucket->quotaFillTimestamp = now;
12,130✔
568
  }
569

570
  if (incNum > 0 || incSize > 0) {
16,998✔
571
    stTrace("token/quota available, token:%d inc:%d, token_TsDelta:%" PRId64
12,130✔
572
            ", quota:%.2fMiB inc:%.3fMiB quotaTs:%" PRId64 " now:%" PRId64 "ms, %s",
573
            pBucket->numOfToken, incNum, deltaToken, pBucket->quotaRemain, incSize, deltaQuota, now, id);
574
  }
575
}
576

577
bool streamTaskExtractAvailableToken(STokenBucket* pBucket, const char* id) {
16,998✔
578
  fillTokenBucket(pBucket, id);
16,998✔
579

580
  if (pBucket->numOfToken > 0) {
16,998!
581
    if (pBucket->quotaRemain > 0) {
16,998!
582
      pBucket->numOfToken -= 1;
16,998✔
583
      return true;
16,998✔
584
    } else {  // no available size quota now
585
      return false;
×
586
    }
587
  } else {
588
    return false;
×
589
  }
590
}
591

592
void streamTaskPutbackToken(STokenBucket* pBucket) {
31,550✔
593
  pBucket->numOfToken = TMIN(pBucket->numOfToken + 1, pBucket->numCapacity);
31,550✔
594
}
31,550✔
595

596
// size in KB
597
void streamTaskConsumeQuota(STokenBucket* pBucket, int32_t bytes) { pBucket->quotaRemain -= SIZE_IN_MiB(bytes); }
4,691✔
598

599
void streamTaskInputFail(SStreamTask* pTask) { atomic_store_8(&pTask->inputq.status, TASK_INPUT_STATUS__FAILED); }
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc