• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3660

15 Mar 2025 09:06AM UTC coverage: 62.039% (-1.3%) from 63.314%
#3660

push

travis-ci

web-flow
feat(stream): support stream processing for virtual tables (#30144)

* enh: add client processing

* enh: add mnode vtables processing

* enh: add mnode vtable processing

* enh: add normal child vtable support

* fix: compile issues

* fix: compile issues

* fix: create stream issues

* fix: multi stream scan issue

* fix: remove debug info

* fix: agg task and task level issues

* fix: correct task output type

* fix: split vtablescan from agg

* fix: memory leak issues

* fix: add limitations

* Update 09-error-code.md

* Update 09-error-code.md

* fix: remove usless case

* feat(stream): extract original table data in source scan task

Implemented functionality in the source task to extract data
corresponding to the virtual table from the original table using WAL.
The extracted data is then sent to the downstream merge task for further
processing.

* feat(stream): multi-way merge using loser tree in virtual merge task

Implemented multi-way merge in the merge task using a loser tree to
combine data from multiple original table into a single virtual table.
The merged virtual table data is then pushed downstream for further
processing.  Introduced memory limit handling during the merge process
with configurable behavior when the memory limit is reached.

* fix(test): remove useless cases

---------

Co-authored-by: dapan1121 <wpan@taosdata.com>
Co-authored-by: Pan Wei <72057773+dapan1121@users.noreply.github.com>

154078 of 317582 branches covered (48.52%)

Branch coverage included in aggregate %.

313 of 2391 new or added lines in 34 files covered. (13.09%)

26134 existing lines in 205 files now uncovered.

240261 of 318051 relevant lines covered (75.54%)

16655189.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.67
/source/dnode/vnode/src/tq/tqStreamTask.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tq.h"
17
#include "vnd.h"
18

19
#define SCAN_WAL_IDLE_DURATION    250  // idle for 500ms to do next wal scan
20
#define SCAN_WAL_WAIT_COUNT       2
21

22
typedef struct SBuildScanWalMsgParam {
23
  int64_t metaId;
24
  SMsgCb  msgCb;
25
} SBuildScanWalMsgParam;
26

27
static int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, int32_t* pNumOfTasks);
28
static int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId);
29
static bool    handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver);
30
static bool    taskReadyForDataFromWal(SStreamTask* pTask);
31
static int32_t doPutDataIntoInputQ(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems, bool* pSucc);
32

33
// extract data blocks(submit/delete) from WAL, and add them into the input queue for all the sources tasks.
34
int32_t tqScanWal(STQ* pTq) {
288,772✔
35
  SStreamMeta* pMeta = pTq->pStreamMeta;
288,772✔
36
  int32_t      vgId = pMeta->vgId;
288,772✔
37
  int64_t      st = taosGetTimestampMs();
289,067✔
38
  int32_t      numOfTasks = 0;
289,067✔
39
  int64_t      el = 0;
289,067✔
40
  int32_t      code = 0;
289,067✔
41

42
  int32_t old = atomic_val_compare_exchange_32(&pMeta->scanInfo.scanSentinel, 0, 1);
289,067✔
43
  if (old == 0) {
289,055!
44
    tqDebug("vgId:%d try to scan wal to extract data", vgId);
289,059✔
45
  } else {
46
    tqDebug("vgId:%d already in wal scan, abort", vgId);
×
47
    return code;
×
48
  }
49

50
  // the scan wal interval less than 200, not scan, actually.
51
  if ((pMeta->scanInfo.lastScanTs > 0) && (st - pMeta->scanInfo.lastScanTs < 200)) {
289,059✔
52
    tqDebug("vgId:%d scan wal less than 200ms, do nothing", vgId);
231✔
53
    atomic_store_32(&pMeta->scanInfo.scanSentinel, 0);
231✔
54
    return code;
231✔
55
  }
56

57
  // check all tasks
58
  code = doScanWalForAllTasks(pMeta, &numOfTasks);
288,828✔
59

60
  pMeta->scanInfo.lastScanTs = taosGetTimestampMs();
288,687✔
61
  el = (pMeta->scanInfo.lastScanTs - st);
288,687✔
62

63
  if (code) {
288,687✔
64
    tqError("vgId:%d failed to scan wal for all tasks, try next time, elapsed time:%" PRId64 "ms code:%s", vgId, el,
1!
65
            tstrerror(code));
66
  } else {
67
    tqDebug("vgId:%d scan wal for stream tasks completed, elapsed time:%" PRId64 "ms", vgId, el);
288,686✔
68
  }
69

70
  atomic_store_32(&pMeta->scanInfo.scanSentinel, 0);
288,688✔
71
  return code;
288,758✔
72
}
73

74
static bool waitEnoughDuration(SStreamMeta* pMeta) {
4,520,689✔
75
  if ((++pMeta->scanInfo.tickCounter) >= SCAN_WAL_WAIT_COUNT) {
4,520,689✔
76
    pMeta->scanInfo.tickCounter = 0;
2,257,988✔
77
    return true;
2,257,988✔
78
  }
79

80
  return false;
2,262,701✔
81
}
82

83
static void doStartScanWal(void* param, void* tmrId) {
4,533,571✔
84
  int32_t                vgId = 0;
4,533,571✔
85
  int32_t                code = 0;
4,533,571✔
86
  int32_t                numOfTasks = 0;
4,533,571✔
87
  tmr_h                  pTimer = NULL;
4,533,571✔
88
  SBuildScanWalMsgParam* pParam = (SBuildScanWalMsgParam*)param;
4,533,571✔
89

90
  tqDebug("start to do scan wal in tmr, metaRid:%" PRId64, pParam->metaId);
4,533,571✔
91

92
  SStreamMeta* pMeta = taosAcquireRef(streamMetaRefPool, pParam->metaId);
4,533,571✔
93
  if (pMeta == NULL) {
4,533,571!
94
    tqError("metaRid:%" PRId64 " not valid now, stream meta has been freed", pParam->metaId);
×
95
    taosMemoryFree(pParam);
×
96
    return;
2,272,691✔
97
  }
98

99
  vgId = pMeta->vgId;
4,533,571✔
100
  code = streamTimerGetInstance(&pTimer);
4,533,571✔
101
  if (code) {
4,533,571!
102
    tqFatal("vgId:%d failed to get tmr ctrl during sched scan wal, not scan wal, code:%s", vgId, tstrerror(code));
×
103
    taosMemoryFree(pParam);
×
104
    return;
×
105
  }
106

107
  if (pMeta->closeFlag) {
4,533,571✔
108
    code = taosReleaseRef(streamMetaRefPool, pParam->metaId);
9,967✔
109
    if (code == TSDB_CODE_SUCCESS) {
9,967!
110
      tqInfo("vgId:%d jump out of scan wal timer since closed", vgId);
9,967!
111
    } else {
112
      tqError("vgId:%d failed to release ref for streamMeta, rid:%" PRId64 " code:%s", vgId, pParam->metaId,
×
113
              tstrerror(code));
114
    }
115

116
    taosMemoryFree(pParam);
9,967!
117
    return;
9,967✔
118
  }
119

120
  if (pMeta->role != NODE_ROLE_LEADER) {
4,523,604✔
121
    tqDebug("vgId:%d not leader, role:%d not scan wal anymore", vgId, pMeta->role);
23!
122

123
    code = taosReleaseRef(streamMetaRefPool, pParam->metaId);
23✔
124
    if (code == TSDB_CODE_SUCCESS) {
23!
125
      tqDebug("vgId:%d jump out of scan wal timer since not leader", vgId);
23!
126
    } else {
127
      tqError("vgId:%d failed to release ref for streamMeta, rid:%" PRId64 " code:%s", vgId, pParam->metaId,
×
128
              tstrerror(code));
129
    }
130

131
    taosMemFree(pParam);
23✔
132
    return;
23✔
133
  }
134

135
  if (pMeta->startInfo.startAllTasks) {
4,523,581✔
136
    tqDebug("vgId:%d in restart procedure, not ready to scan wal", vgId);
2,892✔
137
    goto _end;
2,892✔
138
  }
139

140
  if (!waitEnoughDuration(pMeta)) {
4,520,689✔
141
    streamTmrStart(doStartScanWal, SCAN_WAL_IDLE_DURATION, pParam, pTimer, &pMeta->scanInfo.scanTimer, vgId,
2,262,701✔
142
                   "scan-wal");
143
    code = taosReleaseRef(streamMetaRefPool, pParam->metaId);
2,262,701✔
144
    if (code) {
2,262,701!
145
      tqError("vgId:%d failed to release ref for streamMeta, rid:%" PRId64 " code:%s", vgId, pParam->metaId,
×
146
              tstrerror(code));
147
    }
148
    return;
2,262,701✔
149
  }
150

151
  // failed to lock, try 500ms later
152
  code = streamMetaTryRlock(pMeta);
2,257,988✔
153
  if (code == 0) {
2,257,988✔
154
    numOfTasks = taosArrayGetSize(pMeta->pTaskList);
2,257,686✔
155
    streamMetaRUnLock(pMeta);
2,257,686✔
156
  } else {
157
    numOfTasks = 0;
302✔
158
  }
159

160
  if (numOfTasks > 0) {
2,257,988✔
161
    tqDebug("vgId:%d create msg to start wal scan, numOfTasks:%d", vgId, numOfTasks);
289,102✔
162

163
#if 0
164
  //  wait for the vnode is freed, and invalid read may occur.
165
  taosMsleep(10000);
166
#endif
167

168
    code = streamTaskSchedTask(&pParam->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA, false);
289,102✔
169
    if (code) {
289,102✔
170
      tqError("vgId:%d failed sched task to scan wal, code:%s", vgId, tstrerror(code));
3!
171
    }
172
  }
173

174
_end:
2,257,985✔
175
  streamTmrStart(doStartScanWal, SCAN_WAL_IDLE_DURATION, pParam, pTimer, &pMeta->scanInfo.scanTimer, vgId, "scan-wal");
2,260,880✔
176
  tqDebug("vgId:%d try scan-wal will start in %dms", vgId, SCAN_WAL_IDLE_DURATION*SCAN_WAL_WAIT_COUNT);
2,260,880✔
177

178
  code = taosReleaseRef(streamMetaRefPool, pParam->metaId);
2,260,880✔
179
  if (code) {
2,260,880!
180
    tqError("vgId:%d failed to release ref for streamMeta, rid:%" PRId64 " code:%s", vgId, pParam->metaId,
×
181
            tstrerror(code));
182
  }
183
}
184

185
void tqScanWalAsync(STQ* pTq) {
9,985✔
186
  SStreamMeta*           pMeta = pTq->pStreamMeta;
9,985✔
187
  int32_t                code = 0;
9,985✔
188
  int32_t                vgId = TD_VID(pTq->pVnode);
9,985✔
189
  tmr_h                  pTimer = NULL;
9,985✔
190
  SBuildScanWalMsgParam* pParam = NULL;
9,985✔
191

192
  // 1. the vnode should be the leader.
193
  // 2. the stream isn't disabled
194
  if ((pMeta->role != NODE_ROLE_LEADER) || tsDisableStream) {
9,985!
195
    tqInfo("vgId:%d follower node or stream disabled, not scan wal", vgId);
×
196
    return;
×
197
  }
198

199
  pParam = taosMemoryMalloc(sizeof(SBuildScanWalMsgParam));
9,990!
200
  if (pParam == NULL) {
9,990!
201
    tqError("vgId:%d failed to start scan wal, stream not executes, code:%s", vgId, tstrerror(code));
×
202
    return;
×
203
  }
204

205
  pParam->metaId = pMeta->rid;
9,990✔
206
  pParam->msgCb = pTq->pVnode->msgCb;
9,990✔
207

208
  code = streamTimerGetInstance(&pTimer);
9,990✔
209
  if (code) {
9,988!
210
    tqFatal("vgId:%d failed to get tmr ctrl during sched scan wal", vgId);
×
211
    taosMemoryFree(pParam);
×
212
  } else {
213
    streamTmrStart(doStartScanWal, SCAN_WAL_IDLE_DURATION, pParam, pTimer, &pMeta->scanInfo.scanTimer, vgId,
9,988✔
214
                   "scan-wal");
215
  }
216
}
217

218
int32_t tqStopStreamAllTasksAsync(SStreamMeta* pMeta, SMsgCb* pMsgCb) {
4,594✔
219
  return streamTaskSchedTask(pMsgCb, pMeta->vgId, 0, 0, STREAM_EXEC_T_STOP_ALL_TASKS, false);
4,594✔
220
}
221

222
int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId) {
321,013✔
223
  // seek the stored version and extract data from WAL
224
  int64_t firstVer = walReaderGetValidFirstVer(pTask->exec.pWalReader);
321,013✔
225
  if (pTask->chkInfo.nextProcessVer < firstVer) {
320,962✔
226
    tqWarn("vgId:%d s-task:%s ver:%" PRId64 " earlier than the first ver of wal range %" PRId64 ", forward to %" PRId64,
6!
227
           vgId, pTask->id.idStr, pTask->chkInfo.nextProcessVer, firstVer, firstVer);
228

229
    pTask->chkInfo.nextProcessVer = firstVer;
6✔
230

231
    // todo need retry if failed
232
    int32_t code = walReaderSeekVer(pTask->exec.pWalReader, pTask->chkInfo.nextProcessVer);
6✔
233
    if (code != TSDB_CODE_SUCCESS) {
6!
234
      return code;
×
235
    }
236

237
    // append the data for the stream
238
    tqDebug("vgId:%d s-task:%s wal reader seek to ver:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.nextProcessVer);
6✔
239
  } else {
240
    int64_t currentVer = walReaderGetCurrentVer(pTask->exec.pWalReader);
320,956✔
241
    if (currentVer == -1) {  // we only seek the read for the first time
320,318✔
242
      int32_t code = walReaderSeekVer(pTask->exec.pWalReader, pTask->chkInfo.nextProcessVer);
5,148✔
243
      if (code != TSDB_CODE_SUCCESS) {  // no data in wal, quit
5,136!
244
        return code;
×
245
      }
246

247
      // append the data for the stream
248
      tqDebug("vgId:%d s-task:%s wal reader initial seek to ver:%" PRId64, vgId, pTask->id.idStr,
5,136✔
249
              pTask->chkInfo.nextProcessVer);
250
    }
251
  }
252

253
  int64_t skipToVer = walReaderGetSkipToVersion(pTask->exec.pWalReader);
320,312✔
254
  if (skipToVer != 0 && skipToVer > pTask->chkInfo.nextProcessVer) {
320,360✔
255
    int32_t code = walReaderSeekVer(pTask->exec.pWalReader, skipToVer);
452✔
256
    if (code != TSDB_CODE_SUCCESS) {  // no data in wal, quit
452!
257
      return code;
×
258
    }
259

260
    tqDebug("vgId:%d s-task:%s wal reader jump to ver:%" PRId64, vgId, pTask->id.idStr, skipToVer);
452✔
261
  }
262

263
  return TSDB_CODE_SUCCESS;
320,084✔
264
}
265

266
// todo handle memory error
267
bool handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver) {
792,075✔
268
  const char* id = pTask->id.idStr;
792,075✔
269
  int64_t     maxVer = pTask->step2Range.maxVer;
792,075✔
270

271
  if ((pTask->info.fillHistory == 1) && ver > maxVer) {
792,075✔
272
    if (!pTask->status.appendTranstateBlock) {
1,007!
273
      qWarn("s-task:%s fill-history scan WAL, nextProcessVer:%" PRId64 " out of the maximum ver:%" PRId64
1,007!
274
            ", not scan wal anymore, add transfer-state block into inputQ",
275
            id, ver, maxVer);
276

277
      double el = (taosGetTimestampMs() - pTask->execInfo.step2Start) / 1000.0;
1,007✔
278
      qDebug("s-task:%s scan-history from WAL stage(step 2) ended, range:%" PRId64 "-%" PRId64 ", elapsed time:%.2fs",
1,007✔
279
             id, pTask->step2Range.minVer, maxVer, el);
280
      int32_t code = streamTaskPutTranstateIntoInputQ(pTask);
1,007✔
281
      if (code) {
1,007!
282
        qError("s-task:%s failed to put trans-state into inputQ", id);
×
283
      }
284

285
      return true;
1,007✔
286
    } else {
287
      qWarn("s-task:%s fill-history scan WAL, nextProcessVer:%" PRId64 " out of the ver range:%" PRId64 "-%" PRId64
×
288
            ", not scan wal",
289
            id, ver, pTask->step2Range.minVer, maxVer);
290
    }
291
  }
292

293
  return false;
791,607✔
294
}
295

296
bool taskReadyForDataFromWal(SStreamTask* pTask) {
724,140✔
297
  // non-source or fill-history tasks don't need to response the WAL scan action.
298
  SSTaskBasicInfo* pInfo = &pTask->info;
724,140✔
299
  if ((pInfo->taskLevel != TASK_LEVEL__SOURCE) || (pTask->status.downstreamReady == 0)) {
724,140✔
300
    return false;
338,899✔
301
  }
302

303
  if (pInfo->trigger == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) {
385,241✔
304
    return false;
30,856✔
305
  }
306

307
  if (pInfo->fillHistory == STREAM_RECALCUL_TASK) {
354,385✔
308
    return false;
973✔
309
  }
310

311
  // not in ready state, do not handle the data from wal
312
  SStreamTaskState pState = streamTaskGetStatus(pTask);
353,412✔
313
  if (pState.state != TASK_STATUS__READY) {
353,366✔
314
    tqTrace("s-task:%s not ready for submit block in wal, status:%s", pTask->id.idStr, pState.name);
26,589✔
315
    return false;
26,588✔
316
  }
317

318
  // fill-history task has entered into the last phase, no need to do anything
319
  if ((pInfo->fillHistory == STREAM_HISTORY_TASK) && pTask->status.appendTranstateBlock) {
326,777✔
320
    // the maximum version of data in the WAL has reached already, the step2 is done
321
    tqDebug("s-task:%s fill-history reach the maximum ver:%" PRId64 ", not scan wal anymore", pTask->id.idStr,
5,623✔
322
            pTask->dataRange.range.maxVer);
323
    return false;
5,623✔
324
  }
325

326
  // check whether input queue is full or not
327
  if (streamQueueIsFull(pTask->inputq.queue)) {
321,154✔
328
    tqTrace("s-task:%s input queue is full, launch task without scanning wal", pTask->id.idStr);
110!
329
    int32_t code = streamTrySchedExec(pTask, false);
110✔
330
    if (code) {
11!
UNCOV
331
      tqError("s-task:%s failed to start task while inputQ is full", pTask->id.idStr);
×
332
    }
333
    return false;
11✔
334
  }
335

336
  // the input queue of downstream task is full, so the output is blocked, stopped for a while
337
  if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED) {
321,097!
UNCOV
338
    tqDebug("s-task:%s inputQ is blocked, do nothing", pTask->id.idStr);
×
UNCOV
339
    return false;
×
340
  }
341

342
  return true;
321,097✔
343
}
344

345
int32_t doPutDataIntoInputQ(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems, bool* pSucc) {
320,721✔
346
  const char* id = pTask->id.idStr;
320,721✔
347
  int32_t     numOfNewItems = 0;
320,721✔
348
  int32_t     code = 0;
320,721✔
349
  *pSucc = false;
320,721✔
350

351
  while (1) {
472,581✔
352
    if ((pTask->info.fillHistory == 1) && pTask->status.appendTranstateBlock) {
793,302!
UNCOV
353
      *numOfItems += numOfNewItems;
×
UNCOV
354
      return numOfNewItems > 0;
×
355
    }
356

357
    SStreamQueueItem* pItem = NULL;
793,302✔
358
    code = extractMsgFromWal(pTask->exec.pWalReader, (void**)&pItem, maxVer, id);
793,302✔
359
    if (code != TSDB_CODE_SUCCESS || pItem == NULL) {  // failed, continue
792,061✔
360
      int64_t currentVer = walReaderGetCurrentVer(pTask->exec.pWalReader);
319,129✔
361
      bool    itemInFillhistory = handleFillhistoryScanComplete(pTask, currentVer);
319,173✔
362
      if (itemInFillhistory) {
319,753✔
363
        numOfNewItems += 1;
706✔
364
      }
365
      break;
319,753✔
366
    }
367

368
    if (pItem != NULL) {
472,932✔
369
      code = streamTaskPutDataIntoInputQ(pTask, pItem);
472,930✔
370
      if (code == TSDB_CODE_SUCCESS) {
472,940✔
371
        numOfNewItems += 1;
472,894✔
372
        int64_t ver = walReaderGetCurrentVer(pTask->exec.pWalReader);
472,894✔
373
        pTask->chkInfo.nextProcessVer = ver;
472,892✔
374
        tqDebug("s-task:%s set ver:%" PRId64 " for reader after extract data from WAL", id, ver);
472,892✔
375

376
        bool itemInFillhistory = handleFillhistoryScanComplete(pTask, ver);
472,892✔
377
        if (itemInFillhistory) {
472,880✔
378
          break;
301✔
379
        }
380
      } else {
381
        if (code == TSDB_CODE_OUT_OF_MEMORY) {
46!
UNCOV
382
          tqError("s-task:%s failed to put data into inputQ, since out of memory", id);
×
383
        } else {
384
          tqTrace("s-task:%s append input queue failed, code:inputQ is full, ver:%" PRId64, id,
46!
385
                  pTask->chkInfo.nextProcessVer);
386
          code = walReaderSeekVer(pTask->exec.pWalReader, pTask->chkInfo.nextProcessVer);
46✔
387
          if (code) {
46!
UNCOV
388
            tqError("s-task:%s failed to seek ver to:%" PRId64 " in wal", id, pTask->chkInfo.nextProcessVer);
×
389
          }
390

391
          code = 0;  // reset the error code
46✔
392
        }
393

394
        break;
46✔
395
      }
396
    }
397
  }
398

399
  *numOfItems += numOfNewItems;
320,100✔
400
  *pSucc = (numOfNewItems > 0);
320,100✔
401
  return code;
320,100✔
402
}
403

404
int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, int32_t* pNumOfTasks) {
288,725✔
405
  int32_t vgId = pStreamMeta->vgId;
288,725✔
406
  SArray* pTaskList = NULL;
288,725✔
407
  int32_t numOfTasks = 0;
288,725✔
408

409
  // clone the task list, to avoid the task update during scan wal files
410
  streamMetaWLock(pStreamMeta);
288,725✔
411
  pTaskList = taosArrayDup(pStreamMeta->pTaskList, NULL);
288,673✔
412
  streamMetaWUnLock(pStreamMeta);
288,775✔
413
  if (pTaskList == NULL) {
288,553!
UNCOV
414
    tqError("vgId:%d failed to create task list dup, code:%s", vgId, tstrerror(terrno));
×
UNCOV
415
    return terrno;
×
416
  }
417

418
  // update the new task number
419
  numOfTasks = taosArrayGetSize(pTaskList);
288,553✔
420
  if (pNumOfTasks != NULL) {
288,678!
421
    *pNumOfTasks = numOfTasks;
288,697✔
422
  }
423

424
  tqDebug("vgId:%d start to check wal to extract new submit block for %d tasks", vgId, numOfTasks);
288,678✔
425

426
  for (int32_t i = 0; i < numOfTasks; ++i) {
1,013,748✔
427
    STaskId* pTaskId = taosArrayGet(pTaskList, i);
724,927✔
428
    if (pTaskId == NULL) {
724,535!
429
      continue;
403,987✔
430
    }
431

432
    SStreamTask* pTask = NULL;
724,535✔
433
    int32_t      code = streamMetaAcquireTask(pStreamMeta, pTaskId->streamId, pTaskId->taskId, &pTask);
724,535✔
434
    if (pTask == NULL || code != 0) {
725,136!
435
      continue;
672✔
436
    }
437

438
    if (!taskReadyForDataFromWal(pTask)) {
724,464✔
439
      streamMetaReleaseTask(pStreamMeta, pTask);
403,091✔
440
      continue;
403,310✔
441
    }
442

443
    // seek the stored version and extract data from WAL
444
    code = setWalReaderStartOffset(pTask, vgId);
321,037✔
445
    if (code != TSDB_CODE_SUCCESS) {
320,094!
UNCOV
446
      streamMetaReleaseTask(pStreamMeta, pTask);
×
UNCOV
447
      continue;
×
448
    }
449

450
    int32_t numOfItems = streamQueueGetNumOfItems(pTask->inputq.queue);
320,094✔
451
    int64_t maxVer = (pTask->info.fillHistory == 1) ? pTask->step2Range.maxVer : INT64_MAX;
320,731✔
452

453
    streamMutexLock(&pTask->lock);
320,731✔
454

455
    SStreamTaskState state = streamTaskGetStatus(pTask);
321,071✔
456
    if (state.state != TASK_STATUS__READY) {
320,903✔
457
      tqDebug("s-task:%s not ready for submit block from wal, status:%s", pTask->id.idStr, state.name);
1!
458
      streamMutexUnlock(&pTask->lock);
1✔
459
      streamMetaReleaseTask(pStreamMeta, pTask);
1✔
460
      continue;
5✔
461
    }
462

463
    bool hasNewData = false;
320,902✔
464
    code = doPutDataIntoInputQ(pTask, maxVer, &numOfItems, &hasNewData);
320,902✔
465
    streamMutexUnlock(&pTask->lock);
319,531✔
466

467
    if ((numOfItems > 0) || hasNewData) {
320,977!
468
      code = streamTrySchedExec(pTask, false);
19,082✔
469
      if (code != TSDB_CODE_SUCCESS) {
19,099✔
470
        streamMetaReleaseTask(pStreamMeta, pTask);
1✔
471
        taosArrayDestroy(pTaskList);
1✔
472
        return code;
1✔
473
      }
474
    }
475

476
    streamMetaReleaseTask(pStreamMeta, pTask);
320,993✔
477
  }
478

479
  taosArrayDestroy(pTaskList);
288,821✔
480
  return TSDB_CODE_SUCCESS;
288,672✔
481
}
482

483
void streamMetaFreeTQDuringScanWalError(STQ* pTq) {
×
484
  SBuildScanWalMsgParam* p = taosMemoryCalloc(1, sizeof(SBuildScanWalMsgParam));
×
UNCOV
485
  p->metaId = pTq->pStreamMeta->rid;
×
486

UNCOV
487
  doStartScanWal(p, 0);
×
UNCOV
488
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc