• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4192

30 May 2025 03:55AM UTC coverage: 63.023% (-0.2%) from 63.267%
#4192

push

travis-ci

web-flow
fix:defined col bind in interlace mode (#31246)

157832 of 318864 branches covered (49.5%)

Branch coverage included in aggregate %.

1 of 3 new or added lines in 1 file covered. (33.33%)

2934 existing lines in 172 files now uncovered.

243367 of 317732 relevant lines covered (76.6%)

17346426.62 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

68.01
/source/dnode/vnode/src/tq/tqStreamTask.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tq.h"
17
#include "vnd.h"
18

19
#define SCAN_WAL_IDLE_DURATION    250  // idle for 500ms to do next wal scan
20
#define SCAN_WAL_WAIT_COUNT       2
21

22
typedef struct SBuildScanWalMsgParam {
23
  int64_t metaId;
24
  SMsgCb  msgCb;
25
} SBuildScanWalMsgParam;
26

27
static int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, int32_t* pNumOfTasks);
28
static int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId);
29
static bool    handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver);
30
static bool    taskReadyForDataFromWal(SStreamTask* pTask);
31
static int32_t doPutDataIntoInputQ(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems, bool* pSucc);
32

33
// extract data blocks(submit/delete) from WAL, and add them into the input queue for all the sources tasks.
34
int32_t tqScanWal(STQ* pTq) {
202,755✔
35
  SStreamMeta* pMeta = pTq->pStreamMeta;
202,755✔
36
  int32_t      vgId = pMeta->vgId;
202,755✔
37
  int64_t      st = taosGetTimestampMs();
203,034✔
38
  int32_t      numOfTasks = 0;
203,034✔
39
  int64_t      el = 0;
203,034✔
40
  int32_t      code = 0;
203,034✔
41

42
  int32_t old = atomic_val_compare_exchange_32(&pMeta->scanInfo.scanSentinel, 0, 1);
203,034✔
43
  if (old == 0) {
203,028!
44
    tqDebug("vgId:%d try to scan wal to extract data", vgId);
203,064✔
45
  } else {
46
    tqDebug("vgId:%d already in wal scan, abort", vgId);
×
47
    return code;
×
48
  }
49

50
  // the scan wal interval less than 200, not scan, actually.
51
  if ((pMeta->scanInfo.lastScanTs > 0) && (st - pMeta->scanInfo.lastScanTs < 200)) {
203,064✔
52
    tqDebug("vgId:%d scan wal less than 200ms, do nothing", vgId);
11✔
53
    atomic_store_32(&pMeta->scanInfo.scanSentinel, 0);
11✔
54
    return code;
11✔
55
  }
56

57
  // check all tasks
58
  code = doScanWalForAllTasks(pMeta, &numOfTasks);
203,053✔
59

60
  pMeta->scanInfo.lastScanTs = taosGetTimestampMs();
202,697✔
61
  el = (pMeta->scanInfo.lastScanTs - st);
202,697✔
62

63
  if (code) {
202,697!
64
    tqError("vgId:%d failed to scan wal for all tasks, try next time, elapsed time:%" PRId64 "ms code:%s", vgId, el,
×
65
            tstrerror(code));
66
  } else {
67
    tqDebug("vgId:%d scan wal for stream tasks completed, elapsed time:%" PRId64 "ms", vgId, el);
202,697✔
68
  }
69

70
  atomic_store_32(&pMeta->scanInfo.scanSentinel, 0);
202,698✔
71
  return code;
203,113✔
72
}
73

74
static bool waitEnoughDuration(SStreamMeta* pMeta) {
2,729,379✔
75
  if ((++pMeta->scanInfo.tickCounter) >= SCAN_WAL_WAIT_COUNT) {
2,729,379✔
76
    pMeta->scanInfo.tickCounter = 0;
1,361,722✔
77
    return true;
1,361,722✔
78
  }
79

80
  return false;
1,367,657✔
81
}
82

83
static void doStartScanWal(void* param, void* tmrId) {
2,754,258✔
84
  int32_t                vgId = 0;
2,754,258✔
85
  int32_t                code = 0;
2,754,258✔
86
  int32_t                numOfTasks = 0;
2,754,258✔
87
  tmr_h                  pTimer = NULL;
2,754,258✔
88
  int32_t                numOfItems = 0;
2,754,258✔
89
  STQ*                   pTq = NULL;
2,754,258✔
90
  SBuildScanWalMsgParam* pParam = (SBuildScanWalMsgParam*)param;
2,754,258✔
91

92
  tqTrace("start to do scan wal in tmr, metaRid:%" PRId64, pParam->metaId);
2,754,258✔
93

94
  SStreamMeta* pMeta = taosAcquireRef(streamMetaRefPool, pParam->metaId);
2,754,258✔
95
  if (pMeta == NULL) {
2,754,258!
96
    tqError("metaRid:%" PRId64 " not valid now, stream meta has been freed", pParam->metaId);
×
97
    taosMemoryFree(pParam);
×
98
    return;
1,379,756✔
99
  }
100

101
  pTq = pMeta->ahandle;
2,754,258✔
102
  vgId = pMeta->vgId;
2,754,258✔
103
  code = streamTimerGetInstance(&pTimer);
2,754,258✔
104
  if (code) {
2,754,258!
105
    tqFatal("vgId:%d failed to get tmr ctrl during sched scan wal, not scan wal, code:%s", vgId, tstrerror(code));
×
106
    taosMemoryFree(pParam);
×
107
    return;
×
108
  }
109

110
  if (pMeta->closeFlag) {
2,754,258✔
111
    code = taosReleaseRef(streamMetaRefPool, pParam->metaId);
12,058✔
112
    if (code == TSDB_CODE_SUCCESS) {
12,058!
113
      tqInfo("vgId:%d jump out of scan wal timer since closed", vgId);
12,058!
114
    } else {
115
      tqError("vgId:%d failed to release ref for streamMeta, rid:%" PRId64 " code:%s", vgId, pParam->metaId,
×
116
              tstrerror(code));
117
    }
118

119
    taosMemoryFree(pParam);
12,058!
120
    return;
12,058✔
121
  }
122

123
  if (pMeta->role != NODE_ROLE_LEADER) {
2,742,200✔
124
    tqDebug("vgId:%d not leader, role:%d not scan wal anymore", vgId, pMeta->role);
41!
125

126
    code = taosReleaseRef(streamMetaRefPool, pParam->metaId);
41✔
127
    if (code == TSDB_CODE_SUCCESS) {
41!
128
      tqDebug("vgId:%d jump out of scan wal timer since not leader", vgId);
41!
129
    } else {
130
      tqError("vgId:%d failed to release ref for streamMeta, rid:%" PRId64 " code:%s", vgId, pParam->metaId,
×
131
              tstrerror(code));
132
    }
133

134
    taosMemFree(pParam);
41✔
135
    return;
41✔
136
  }
137

138
  if (pMeta->startInfo.startAllTasks) {
2,742,159✔
139
    tqDebug("vgId:%d in restart procedure, not ready to scan wal", vgId);
12,780✔
140
    goto _end;
12,780✔
141
  }
142

143
  numOfItems = tmsgGetQueueSize(&pTq->pVnode->msgCb, pMeta->vgId, STREAM_QUEUE);
2,729,379✔
144
  bool tooMany = (numOfItems > tsThresholdItemsInStreamQueue);
2,729,379✔
145

146
  if (!waitEnoughDuration(pMeta) || tooMany) {
2,729,379!
147
    if (tooMany) {
1,367,657!
148
      tqDebug("vgId:%d %d items (threshold: %d) in stream_queue, not scan wal now", vgId, numOfItems,
×
149
              tsThresholdItemsInStreamQueue);
150
    }
151

152
    streamTmrStart(doStartScanWal, SCAN_WAL_IDLE_DURATION, pParam, pTimer, &pMeta->scanInfo.scanTimer, vgId,
1,367,657✔
153
                   "scan-wal");
154
    code = taosReleaseRef(streamMetaRefPool, pParam->metaId);
1,367,657✔
155
    if (code) {
1,367,657!
156
      tqError("vgId:%d failed to release ref for streamMeta, rid:%" PRId64 " code:%s", vgId, pParam->metaId,
×
157
              tstrerror(code));
158
    }
159
    return;
1,367,657✔
160
  }
161

162
  // failed to lock, try 500ms later
163
  code = streamMetaTryRlock(pMeta);
1,361,722✔
164
  if (code == 0) {
1,361,722✔
165
    numOfTasks = taosArrayGetSize(pMeta->pTaskList);
1,361,352✔
166
    streamMetaRUnLock(pMeta);
1,361,352✔
167
  } else {
168
    numOfTasks = 0;
370✔
169
  }
170

171
  if (numOfTasks > 0) {
1,361,722✔
172
    tqDebug("vgId:%d create msg to start wal scan, numOfTasks:%d", vgId, numOfTasks);
203,138✔
173

174
#if 0
175
  //  wait for the vnode is freed, and invalid read may occur.
176
  taosMsleep(10000);
177
#endif
178

179
    code = streamTaskSchedTask(&pParam->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA, false);
203,138✔
180
    if (code) {
203,138✔
181
      tqError("vgId:%d failed sched task to scan wal, code:%s", vgId, tstrerror(code));
3!
182
    }
183
  }
184

185
_end:
1,361,719✔
186
  streamTmrStart(doStartScanWal, SCAN_WAL_IDLE_DURATION, pParam, pTimer, &pMeta->scanInfo.scanTimer, vgId, "scan-wal");
1,374,502✔
187
  tqTrace("vgId:%d try scan-wal will start in %dms", vgId, SCAN_WAL_IDLE_DURATION*SCAN_WAL_WAIT_COUNT);
1,374,502✔
188

189
  code = taosReleaseRef(streamMetaRefPool, pParam->metaId);
1,374,502✔
190
  if (code) {
1,374,502!
191
    tqError("vgId:%d failed to release ref for streamMeta, rid:%" PRId64 " code:%s", vgId, pParam->metaId,
×
192
            tstrerror(code));
193
  }
194
}
195

196
void tqScanWalAsync(STQ* pTq) {
12,095✔
197
  SStreamMeta*           pMeta = pTq->pStreamMeta;
12,095✔
198
  int32_t                code = 0;
12,095✔
199
  int32_t                vgId = TD_VID(pTq->pVnode);
12,095✔
200
  tmr_h                  pTimer = NULL;
12,095✔
201
  SBuildScanWalMsgParam* pParam = NULL;
12,095✔
202

203
  // 1. the vnode should be the leader.
204
  // 2. the stream isn't disabled
205
  if ((pMeta->role != NODE_ROLE_LEADER) || tsDisableStream) {
12,095!
206
    tqInfo("vgId:%d follower node or stream disabled, not scan wal", vgId);
×
207
    return;
×
208
  }
209

210
  pParam = taosMemoryMalloc(sizeof(SBuildScanWalMsgParam));
12,099!
211
  if (pParam == NULL) {
12,099!
212
    tqError("vgId:%d failed to start scan wal, stream not executes, code:%s", vgId, tstrerror(code));
×
213
    return;
×
214
  }
215

216
  pParam->metaId = pMeta->rid;
12,099✔
217
  pParam->msgCb = pTq->pVnode->msgCb;
12,099✔
218

219
  code = streamTimerGetInstance(&pTimer);
12,099✔
220
  if (code) {
12,097!
221
    tqFatal("vgId:%d failed to get tmr ctrl during sched scan wal", vgId);
×
222
    taosMemoryFree(pParam);
×
223
  } else {
224
    streamTmrStart(doStartScanWal, SCAN_WAL_IDLE_DURATION, pParam, pTimer, &pMeta->scanInfo.scanTimer, vgId,
12,097✔
225
                   "scan-wal");
226
  }
227
}
228

229
int32_t tqStopStreamAllTasksAsync(SStreamMeta* pMeta, SMsgCb* pMsgCb) {
5,788✔
230
  return streamTaskSchedTask(pMsgCb, pMeta->vgId, 0, 0, STREAM_EXEC_T_STOP_ALL_TASKS, false);
5,788✔
231
}
232

233
int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId) {
216,864✔
234
  // seek the stored version and extract data from WAL
235
  int64_t firstVer = walReaderGetValidFirstVer(pTask->exec.pWalReader);
216,864✔
236
  if (pTask->chkInfo.nextProcessVer < firstVer) {
217,204!
UNCOV
237
    tqWarn("vgId:%d s-task:%s ver:%" PRId64 " earlier than the first ver of wal range %" PRId64 ", forward to %" PRId64,
×
238
           vgId, pTask->id.idStr, pTask->chkInfo.nextProcessVer, firstVer, firstVer);
239

UNCOV
240
    pTask->chkInfo.nextProcessVer = firstVer;
×
241

242
    // todo need retry if failed
UNCOV
243
    int32_t code = walReaderSeekVer(pTask->exec.pWalReader, pTask->chkInfo.nextProcessVer);
×
UNCOV
244
    if (code != TSDB_CODE_SUCCESS) {
×
245
      return code;
×
246
    }
247

248
    // append the data for the stream
UNCOV
249
    tqDebug("vgId:%d s-task:%s wal reader seek to ver:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.nextProcessVer);
×
250
  } else {
251
    int64_t currentVer = walReaderGetCurrentVer(pTask->exec.pWalReader);
217,204✔
252
    if (currentVer == -1) {  // we only seek the read for the first time
216,685✔
253
      int32_t code = walReaderSeekVer(pTask->exec.pWalReader, pTask->chkInfo.nextProcessVer);
4,284✔
254
      if (code != TSDB_CODE_SUCCESS) {  // no data in wal, quit
4,270!
255
        return code;
×
256
      }
257

258
      // append the data for the stream
259
      tqDebug("vgId:%d s-task:%s wal reader initial seek to ver:%" PRId64, vgId, pTask->id.idStr,
4,270✔
260
              pTask->chkInfo.nextProcessVer);
261
    }
262
  }
263

264
  int64_t skipToVer = walReaderGetSkipToVersion(pTask->exec.pWalReader);
216,672✔
265
  if (skipToVer != 0 && skipToVer > pTask->chkInfo.nextProcessVer) {
216,793✔
266
    int32_t code = walReaderSeekVer(pTask->exec.pWalReader, skipToVer);
133✔
267
    if (code != TSDB_CODE_SUCCESS) {  // no data in wal, quit
133!
268
      return code;
×
269
    }
270

271
    tqDebug("vgId:%d s-task:%s wal reader jump to ver:%" PRId64, vgId, pTask->id.idStr, skipToVer);
133✔
272
  }
273

274
  return TSDB_CODE_SUCCESS;
216,650✔
275
}
276

277
// todo handle memory error
278
bool handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver) {
347,319✔
279
  const char* id = pTask->id.idStr;
347,319✔
280
  int64_t     maxVer = pTask->step2Range.maxVer;
347,319✔
281

282
  if ((pTask->info.fillHistory == 1) && ver > maxVer) {
347,319✔
283
    if (!pTask->status.appendTranstateBlock) {
714!
284
      qWarn("s-task:%s fill-history scan WAL, nextProcessVer:%" PRId64 " out of the maximum ver:%" PRId64
714!
285
            ", not scan wal anymore, add transfer-state block into inputQ",
286
            id, ver, maxVer);
287

288
      double el = (taosGetTimestampMs() - pTask->execInfo.step2Start) / 1000.0;
714✔
289
      qDebug("s-task:%s scan-history from WAL stage(step 2) ended, range:%" PRId64 "-%" PRId64 ", elapsed time:%.2fs",
714✔
290
             id, pTask->step2Range.minVer, maxVer, el);
291
      int32_t code = streamTaskPutTranstateIntoInputQ(pTask);
714✔
292
      if (code) {
714!
293
        qError("s-task:%s failed to put trans-state into inputQ", id);
×
294
      }
295

296
      return true;
714✔
297
    } else {
298
      qWarn("s-task:%s fill-history scan WAL, nextProcessVer:%" PRId64 " out of the ver range:%" PRId64 "-%" PRId64
×
299
            ", not scan wal",
300
            id, ver, pTask->step2Range.minVer, maxVer);
301
    }
302
  }
303

304
  return false;
346,631✔
305
}
306

307
bool taskReadyForDataFromWal(SStreamTask* pTask) {
572,334✔
308
  // non-source or fill-history tasks don't need to response the WAL scan action.
309
  SSTaskBasicInfo* pInfo = &pTask->info;
572,334✔
310
  if ((pInfo->taskLevel != TASK_LEVEL__SOURCE) || (pTask->status.downstreamReady == 0)) {
572,334✔
311
    return false;
271,348✔
312
  }
313

314
  if (pInfo->trigger == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) {
300,986✔
315
    return false;
36,950✔
316
  }
317

318
  if (pInfo->fillHistory == STREAM_RECALCUL_TASK) {
264,036!
319
    return false;
×
320
  }
321

322
  // not in ready state, do not handle the data from wal
323
  SStreamTaskState pState = streamTaskGetStatus(pTask);
264,036✔
324
  if (pState.state != TASK_STATUS__READY) {
263,963✔
325
    tqTrace("s-task:%s not ready for submit block in wal, status:%s", pTask->id.idStr, pState.name);
39,731✔
326
    return false;
39,726✔
327
  }
328

329
  // fill-history task has entered into the last phase, no need to do anything
330
  if ((pInfo->fillHistory == STREAM_HISTORY_TASK) && pTask->status.appendTranstateBlock) {
224,232✔
331
    // the maximum version of data in the WAL has reached already, the step2 is done
332
    tqDebug("s-task:%s fill-history reach the maximum ver:%" PRId64 ", not scan wal anymore", pTask->id.idStr,
6,798✔
333
            pTask->dataRange.range.maxVer);
334
    return false;
6,798✔
335
  }
336

337
  // check whether input queue is full or not
338
  if (streamQueueIsFull(pTask->inputq.queue)) {
217,434✔
339
    tqTrace("s-task:%s input queue is full, launch task without scanning wal", pTask->id.idStr);
332!
340
    int32_t code = streamTrySchedExec(pTask, false);
332✔
UNCOV
341
    if (code) {
×
342
      tqError("s-task:%s failed to start task while inputQ is full", pTask->id.idStr);
×
343
    }
UNCOV
344
    return false;
×
345
  }
346

347
  // the input queue of downstream task is full, so the output is blocked, stopped for a while
348
  if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED) {
217,216!
349
    tqDebug("s-task:%s inputQ is blocked, do nothing", pTask->id.idStr);
×
350
    return false;
×
351
  }
352

353
  return true;
217,216✔
354
}
355

356
int32_t doPutDataIntoInputQ(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems, bool* pSucc) {
216,878✔
357
  const char* id = pTask->id.idStr;
216,878✔
358
  int32_t     numOfNewItems = 0;
216,878✔
359
  int32_t     code = 0;
216,878✔
360
  *pSucc = false;
216,878✔
361

362
  while (1) {
130,869✔
363
    if ((pTask->info.fillHistory == 1) && pTask->status.appendTranstateBlock) {
347,747!
364
      *numOfItems += numOfNewItems;
×
365
      return numOfNewItems > 0;
×
366
    }
367

368
    SStreamQueueItem* pItem = NULL;
347,747✔
369
    code = extractMsgFromWal(pTask->exec.pWalReader, (void**)&pItem, maxVer, id);
347,747✔
370
    if (code != TSDB_CODE_SUCCESS || pItem == NULL) {  // failed, continue
347,355✔
371
      int64_t currentVer = walReaderGetCurrentVer(pTask->exec.pWalReader);
216,258✔
372
      bool    itemInFillhistory = handleFillhistoryScanComplete(pTask, currentVer);
216,349✔
373
      if (itemInFillhistory) {
216,239✔
374
        numOfNewItems += 1;
501✔
375
      }
376
      break;
216,239✔
377
    }
378

379
    if (pItem != NULL) {
131,097!
380
      code = streamTaskPutDataIntoInputQ(pTask, pItem);
131,100✔
381
      if (code == TSDB_CODE_SUCCESS) {
131,112✔
382
        numOfNewItems += 1;
131,110✔
383
        int64_t ver = walReaderGetCurrentVer(pTask->exec.pWalReader);
131,110✔
384
        pTask->chkInfo.nextProcessVer = ver;
131,094✔
385
        tqDebug("s-task:%s set ver:%" PRId64 " for reader after extract data from WAL", id, ver);
131,094✔
386

387
        bool itemInFillhistory = handleFillhistoryScanComplete(pTask, ver);
131,094✔
388
        if (itemInFillhistory) {
131,085✔
389
          break;
213✔
390
        }
391
      } else {
392
        if (code == TSDB_CODE_OUT_OF_MEMORY) {
2!
393
          tqError("s-task:%s failed to put data into inputQ, since out of memory", id);
×
394
        } else {
395
          tqTrace("s-task:%s append input queue failed, code:inputQ is full, ver:%" PRId64, id,
2!
396
                  pTask->chkInfo.nextProcessVer);
397
          code = walReaderSeekVer(pTask->exec.pWalReader, pTask->chkInfo.nextProcessVer);
2✔
398
          if (code) {
2!
399
            tqError("s-task:%s failed to seek ver to:%" PRId64 " in wal", id, pTask->chkInfo.nextProcessVer);
×
400
          }
401

402
          code = 0;  // reset the error code
2✔
403
        }
404

405
        break;
2✔
406
      }
407
    }
408
  }
409

410
  *numOfItems += numOfNewItems;
216,454✔
411
  *pSucc = (numOfNewItems > 0);
216,454✔
412
  return code;
216,454✔
413
}
414

415
int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, int32_t* pNumOfTasks) {
202,790✔
416
  int32_t vgId = pStreamMeta->vgId;
202,790✔
417
  SArray* pTaskList = NULL;
202,790✔
418
  int32_t numOfTasks = 0;
202,790✔
419

420
  // clone the task list, to avoid the task update during scan wal files
421
  streamMetaWLock(pStreamMeta);
202,790✔
422
  pTaskList = taosArrayDup(pStreamMeta->pTaskList, NULL);
202,960✔
423
  streamMetaWUnLock(pStreamMeta);
203,022✔
424
  if (pTaskList == NULL) {
202,779!
425
    tqError("vgId:%d failed to create task list dup, code:%s", vgId, tstrerror(terrno));
×
426
    return terrno;
×
427
  }
428

429
  // update the new task number
430
  numOfTasks = taosArrayGetSize(pTaskList);
202,779✔
431
  if (pNumOfTasks != NULL) {
202,687!
432
    *pNumOfTasks = numOfTasks;
202,751✔
433
  }
434

435
  tqDebug("vgId:%d start to check wal to extract new submit block for %d tasks", vgId, numOfTasks);
202,687✔
436

437
  for (int32_t i = 0; i < numOfTasks; ++i) {
782,479✔
438
    STaskId* pTaskId = taosArrayGet(pTaskList, i);
579,171✔
439
    if (pTaskId == NULL) {
578,224!
440
      continue;
362,283✔
441
    }
442

443
    SStreamTask* pTask = NULL;
578,224✔
444
    int32_t      code = streamMetaAcquireTask(pStreamMeta, pTaskId->streamId, pTaskId->taskId, &pTask);
578,224✔
445
    if (pTask == NULL || code != 0) {
579,767!
446
      continue;
6,766✔
447
    }
448

449
    if (!taskReadyForDataFromWal(pTask)) {
573,001✔
450
      streamMetaReleaseTask(pStreamMeta, pTask);
355,040✔
451
      continue;
355,477✔
452
    }
453

454
    // seek the stored version and extract data from WAL
455
    code = setWalReaderStartOffset(pTask, vgId);
217,221✔
456
    if (code != TSDB_CODE_SUCCESS) {
216,734!
457
      streamMetaReleaseTask(pStreamMeta, pTask);
×
458
      continue;
×
459
    }
460

461
    int32_t numOfItems = streamQueueGetNumOfItems(pTask->inputq.queue);
216,734✔
462
    int64_t maxVer = (pTask->info.fillHistory == 1) ? pTask->step2Range.maxVer : INT64_MAX;
217,489✔
463

464
    streamMutexLock(&pTask->lock);
217,489✔
465

466
    SStreamTaskState state = streamTaskGetStatus(pTask);
217,458✔
467
    if (state.state != TASK_STATUS__READY) {
217,076!
468
      tqDebug("s-task:%s not ready for submit block from wal, status:%s", pTask->id.idStr, state.name);
×
469
      streamMutexUnlock(&pTask->lock);
×
470
      streamMetaReleaseTask(pStreamMeta, pTask);
×
471
      continue;
40✔
472
    }
473

474
    bool hasNewData = false;
217,076✔
475
    code = doPutDataIntoInputQ(pTask, maxVer, &numOfItems, &hasNewData);
217,076✔
476
    streamMutexUnlock(&pTask->lock);
216,379✔
477

478
    TAOS_UNUSED(code);
479

480
    if ((numOfItems > 0) || hasNewData) {
217,361!
481
      code = streamTrySchedExec(pTask, false);
15,888✔
482
      if (code != TSDB_CODE_SUCCESS) {
15,892!
483
        streamMetaReleaseTask(pStreamMeta, pTask);
×
484
        taosArrayDestroy(pTaskList);
×
485
        return code;
×
486
      }
487
    }
488

489
    streamMetaReleaseTask(pStreamMeta, pTask);
217,365✔
490
  }
491

492
  taosArrayDestroy(pTaskList);
203,308✔
493
  return TSDB_CODE_SUCCESS;
202,906✔
494
}
495

496
void streamMetaFreeTQDuringScanWalError(STQ* pTq) {
×
497
  SBuildScanWalMsgParam* p = taosMemoryCalloc(1, sizeof(SBuildScanWalMsgParam));
×
498
  p->metaId = pTq->pStreamMeta->rid;
×
499

500
  doStartScanWal(p, 0);
×
501
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc