• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3608

12 Feb 2025 05:57AM UTC coverage: 63.066% (+1.4%) from 61.715%
#3608

push

travis-ci

web-flow
Merge pull request #29746 from taosdata/merge/mainto3.02

merge: from main to 3.0 branch

140199 of 286257 branches covered (48.98%)

Branch coverage included in aggregate %.

89 of 161 new or added lines in 18 files covered. (55.28%)

3211 existing lines in 190 files now uncovered.

218998 of 283298 relevant lines covered (77.3%)

5949310.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.93
/source/dnode/vnode/src/tq/tqStreamTask.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tq.h"
17
#include "vnd.h"
18

19
#define MAX_REPEAT_SCAN_THRESHOLD 3
20
#define SCAN_WAL_IDLE_DURATION    500    // idle for 500ms to do next wal scan
21

22
typedef struct SBuildScanWalMsgParam {
23
  int64_t metaId;
24
  int32_t numOfTasks;
25
  int8_t  restored;
26
  SMsgCb  msgCb;
27
} SBuildScanWalMsgParam;
28

29
static int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta);
30
static int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId);
31
static bool    handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver);
32
static bool    taskReadyForDataFromWal(SStreamTask* pTask);
33
static int32_t doPutDataIntoInputQ(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems, bool* pSucc);
34
static int32_t tqScanWalInFuture(STQ* pTq, int32_t numOfTasks, int32_t idleDuration);
35
static int32_t doScanWalAsync(STQ* pTq, bool ckPause);
36

37
// extract data blocks(submit/delete) from WAL, and add them into the input queue for all the sources tasks.
38
int32_t tqScanWal(STQ* pTq) {
32,060✔
39
  SStreamMeta* pMeta = pTq->pStreamMeta;
32,060✔
40
  int32_t      vgId = pMeta->vgId;
32,060✔
41
  int64_t      st = taosGetTimestampMs();
32,063✔
42
  int32_t      numOfTasks = 0;
32,063✔
43

44
  tqDebug("vgId:%d continue to check if data in wal are available, scanCounter:%d", vgId, pMeta->scanInfo.scanCounter);
32,063✔
45

46
  // check all tasks
47
  int32_t code = doScanWalForAllTasks(pMeta);
32,063✔
48
  if (code) {
32,063!
49
    tqError("vgId:%d failed to start all tasks, try next time, code:%s", vgId, tstrerror(code));
×
50
    return code;
×
51
  }
52

53
  streamMetaWLock(pMeta);
32,063✔
54
  int32_t times = (--pMeta->scanInfo.scanCounter);
32,063✔
55
  if (times < 0) {
32,063!
56
    tqError("vgId:%d invalid scan counter:%d, reset to 0", vgId, times);
×
57
    times = 0;
×
58
  }
59

60
  numOfTasks = taosArrayGetSize(pMeta->pTaskList);
32,063✔
61
  streamMetaWUnLock(pMeta);
32,063✔
62

63
  int64_t el = (taosGetTimestampMs() - st);
32,063✔
64
  tqDebug("vgId:%d scan wal for stream tasks completed, elapsed time:%" PRId64 " ms", vgId, el);
32,063✔
65

66
  if (times > 0) {
32,063✔
67
    tqDebug("vgId:%d scan wal for stream tasks for %d times in %dms", vgId, times, SCAN_WAL_IDLE_DURATION);
14,833✔
68
    code = tqScanWalInFuture(pTq, numOfTasks, SCAN_WAL_IDLE_DURATION);
14,833✔
69
    if (code) {
14,833!
70
      tqError("vgId:%d sched scan wal in %dms failed, ignore this failure", vgId, SCAN_WAL_IDLE_DURATION);
×
71
    }
72
  }
73

74
  return code;
32,063✔
75
}
76

77
static void doStartScanWal(void* param, void* tmrId) {
14,833✔
78
  int32_t vgId = 0;
14,833✔
79
  int32_t code = 0;
14,833✔
80

81
  SBuildScanWalMsgParam* pParam = (SBuildScanWalMsgParam*)param;
14,833✔
82

83
  SStreamMeta* pMeta = taosAcquireRef(streamMetaRefPool, pParam->metaId);
14,833✔
84
  if (pMeta == NULL) {
14,833✔
85
    tqError("metaRid:%" PRId64 " not valid now, stream meta has been freed", pParam->metaId);
95!
86
    taosMemoryFree(pParam);
95!
87
    return;
95✔
88
  }
89

90
  if (pMeta->closeFlag) {
14,738✔
91
    code = taosReleaseRef(streamMetaRefPool, pParam->metaId);
14✔
92
    if (code == TSDB_CODE_SUCCESS) {
14!
93
      tqDebug("vgId:%d jump out of scan wal timer since closed", vgId);
14!
94
    } else {
95
      tqError("vgId:%d failed to release ref for streamMeta, rid:%" PRId64 " code:%s", vgId, pParam->metaId,
×
96
              tstrerror(code));
97
    }
98

99
    taosMemoryFree(pParam);
14!
100
    return;
14✔
101
  }
102

103
  vgId = pMeta->vgId;
14,724✔
104

105
  tqDebug("vgId:%d create msg to start wal scan, numOfTasks:%d, vnd restored:%d", vgId, pParam->numOfTasks,
14,724✔
106
          pParam->restored);
107
#if 0
108
  // wait for the vnode is freed, and invalid read may occur.
109
  taosMsleep(10000);
110
#endif
111

112
  code = streamTaskSchedTask(&pParam->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA);
14,724✔
113
  if (code) {
14,724✔
114
    tqError("vgId:%d failed sched task to scan wal, code:%s", vgId, tstrerror(code));
218!
115
  }
116

117
  code = taosReleaseRef(streamMetaRefPool, pParam->metaId);
14,724✔
118
  if (code) {
14,724!
119
    tqError("vgId:%d failed to release ref for streamMeta, rid:%" PRId64 " code:%s", vgId, pParam->metaId,
×
120
            tstrerror(code));
121
  }
122

123
  taosMemoryFree(pParam);
14,724!
124
}
125

126
int32_t tqScanWalInFuture(STQ* pTq, int32_t numOfTasks, int32_t idleDuration) {
14,833✔
127
  SStreamMeta*           pMeta = pTq->pStreamMeta;
14,833✔
128
  int32_t                code = 0;
14,833✔
129
  int32_t                vgId = TD_VID(pTq->pVnode);
14,833✔
130
  tmr_h                  pTimer = NULL;
14,833✔
131
  SBuildScanWalMsgParam* pParam = NULL;
14,833✔
132

133
  pParam = taosMemoryMalloc(sizeof(SBuildScanWalMsgParam));
14,833!
134
  if (pParam == NULL) {
14,833!
135
    return terrno;
×
136
  }
137

138
  pParam->metaId = pMeta->rid;
14,833✔
139
  pParam->numOfTasks = numOfTasks;
14,833✔
140
  pParam->restored = pTq->pVnode->restored;
14,833✔
141
  pParam->msgCb = pTq->pVnode->msgCb;
14,833✔
142

143
  code = streamTimerGetInstance(&pTimer);
14,833✔
144
  if (code) {
14,833!
145
    tqError("vgId:%d failed to get tmr ctrl during sched scan wal", vgId);
×
146
    taosMemoryFree(pParam);
×
147
  } else {
148
    streamTmrStart(doStartScanWal, idleDuration, pParam, pTimer, &pMeta->scanInfo.scanTimer, vgId, "scan-wal-fut");
14,833✔
149
  }
150

151
  return code;
14,833✔
152
}
153

154
int32_t tqScanWalAsync(STQ* pTq, bool ckPause) {
114,381✔
155
  SStreamMeta* pMeta = pTq->pStreamMeta;
114,381✔
156
  bool         alreadyRestored = pTq->pVnode->restored;
114,381✔
157
  int32_t      code = 0;
114,381✔
158

159
  // do not launch the stream tasks, if it is a follower or not restored vnode.
160
  if (!(vnodeIsRoleLeader(pTq->pVnode) && alreadyRestored)) {
114,381✔
161
    return TSDB_CODE_SUCCESS;
10,764✔
162
  }
163

164
  streamMetaWLock(pMeta);
103,624✔
165
  code = doScanWalAsync(pTq, ckPause);
103,621✔
166
  streamMetaWUnLock(pMeta);
103,624✔
167
  return code;
103,623✔
168
}
169

170
int32_t tqStopStreamTasksAsync(STQ* pTq) {
4,574✔
171
  SStreamMeta* pMeta = pTq->pStreamMeta;
4,574✔
172
  int32_t      vgId = pMeta->vgId;
4,574✔
173
  return streamTaskSchedTask(&pTq->pVnode->msgCb, vgId, 0, 0, STREAM_EXEC_T_STOP_ALL_TASKS);
4,574✔
174
}
175

176
int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId) {
38,076✔
177
  // seek the stored version and extract data from WAL
178
  int64_t firstVer = walReaderGetValidFirstVer(pTask->exec.pWalReader);
38,076✔
179
  if (pTask->chkInfo.nextProcessVer < firstVer) {
38,076!
UNCOV
180
    tqWarn("vgId:%d s-task:%s ver:%" PRId64 " earlier than the first ver of wal range %" PRId64 ", forward to %" PRId64,
×
181
           vgId, pTask->id.idStr, pTask->chkInfo.nextProcessVer, firstVer, firstVer);
182

UNCOV
183
    pTask->chkInfo.nextProcessVer = firstVer;
×
184

185
    // todo need retry if failed
UNCOV
186
    int32_t code = walReaderSeekVer(pTask->exec.pWalReader, pTask->chkInfo.nextProcessVer);
×
UNCOV
187
    if (code != TSDB_CODE_SUCCESS) {
×
188
      return code;
×
189
    }
190

191
    // append the data for the stream
UNCOV
192
    tqDebug("vgId:%d s-task:%s wal reader seek to ver:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.nextProcessVer);
×
193
  } else {
194
    int64_t currentVer = walReaderGetCurrentVer(pTask->exec.pWalReader);
38,076✔
195
    if (currentVer == -1) {  // we only seek the read for the first time
38,076✔
196
      int32_t code = walReaderSeekVer(pTask->exec.pWalReader, pTask->chkInfo.nextProcessVer);
3,108✔
197
      if (code != TSDB_CODE_SUCCESS) {  // no data in wal, quit
3,108!
198
        return code;
×
199
      }
200

201
      // append the data for the stream
202
      tqDebug("vgId:%d s-task:%s wal reader initial seek to ver:%" PRId64, vgId, pTask->id.idStr,
3,108✔
203
              pTask->chkInfo.nextProcessVer);
204
    }
205
  }
206

207
  int64_t skipToVer = walReaderGetSkipToVersion(pTask->exec.pWalReader);
38,076✔
208
  if (skipToVer != 0 && skipToVer > pTask->chkInfo.nextProcessVer) {
38,076✔
209
    int32_t code = walReaderSeekVer(pTask->exec.pWalReader, skipToVer);
313✔
210
    if (code != TSDB_CODE_SUCCESS) {  // no data in wal, quit
313!
211
      return code;
×
212
    }
213

214
    tqDebug("vgId:%d s-task:%s wal reader jump to ver:%" PRId64, vgId, pTask->id.idStr, skipToVer);
313!
215
  }
216

217
  return TSDB_CODE_SUCCESS;
38,076✔
218
}
219

220
// todo handle memory error
221
bool handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver) {
95,432✔
222
  const char* id = pTask->id.idStr;
95,432✔
223
  int64_t     maxVer = pTask->step2Range.maxVer;
95,432✔
224

225
  if ((pTask->info.fillHistory == 1) && ver > maxVer) {
95,432✔
226
    if (!pTask->status.appendTranstateBlock) {
566!
227
      qWarn("s-task:%s fill-history scan WAL, nextProcessVer:%" PRId64 " out of the maximum ver:%" PRId64
566!
228
            ", not scan wal anymore, add transfer-state block into inputQ",
229
            id, ver, maxVer);
230

231
      double el = (taosGetTimestampMs() - pTask->execInfo.step2Start) / 1000.0;
566✔
232
      qDebug("s-task:%s scan-history from WAL stage(step 2) ended, range:%" PRId64 "-%" PRId64 ", elapsed time:%.2fs",
566✔
233
             id, pTask->step2Range.minVer, maxVer, el);
234
      int32_t code = streamTaskPutTranstateIntoInputQ(pTask);
566✔
235
      if (code) {
566!
236
        qError("s-task:%s failed to put trans-state into inputQ", id);
×
237
      }
238

239
      return true;
566✔
240
    } else {
241
      qWarn("s-task:%s fill-history scan WAL, nextProcessVer:%" PRId64 " out of the ver range:%" PRId64 "-%" PRId64
×
242
            ", not scan wal",
243
            id, ver, pTask->step2Range.minVer, maxVer);
244
    }
245
  }
246

247
  return false;
94,866✔
248
}
249

250
bool taskReadyForDataFromWal(SStreamTask* pTask) {
137,853✔
251
  // non-source or fill-history tasks don't need to response the WAL scan action.
252
  SSTaskBasicInfo* pInfo = &pTask->info;
137,853✔
253
  if ((pInfo->taskLevel != TASK_LEVEL__SOURCE) || (pTask->status.downstreamReady == 0)) {
137,853✔
254
    return false;
69,903✔
255
  }
256

257
  if (pInfo->taskLevel == TASK_LEVEL__SOURCE && pInfo->trigger == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) {
67,950!
258
    return false;
12,645✔
259
  }
260

261
  // not in ready state, do not handle the data from wal
262
  SStreamTaskState pState = streamTaskGetStatus(pTask);
55,305✔
263
  if (pState.state != TASK_STATUS__READY) {
55,305✔
264
    tqTrace("s-task:%s not ready for submit block in wal, status:%s", pTask->id.idStr, pState.name);
14,607✔
265
    return false;
14,607✔
266
  }
267

268
  // fill-history task has entered into the last phase, no need to anything
269
  if ((pInfo->fillHistory == 1) && pTask->status.appendTranstateBlock) {
40,698✔
270
    // the maximum version of data in the WAL has reached already, the step2 is done
271
    tqDebug("s-task:%s fill-history reach the maximum ver:%" PRId64 ", not scan wal anymore", pTask->id.idStr,
2,622✔
272
            pTask->dataRange.range.maxVer);
273
    return false;
2,622✔
274
  }
275

276
  // check if input queue is full or not
277
  if (streamQueueIsFull(pTask->inputq.queue)) {
38,076!
UNCOV
278
    tqTrace("s-task:%s input queue is full, do nothing", pTask->id.idStr);
×
UNCOV
279
    return false;
×
280
  }
281

282
  // the input queue of downstream task is full, so the output is blocked, stopped for a while
283
  if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED) {
38,076!
284
    tqDebug("s-task:%s inputQ is blocked, do nothing", pTask->id.idStr);
×
285
    return false;
×
286
  }
287

288
  return true;
38,076✔
289
}
290

291
int32_t doPutDataIntoInputQ(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems, bool* pSucc) {
38,071✔
292
  const char* id = pTask->id.idStr;
38,071✔
293
  int32_t     numOfNewItems = 0;
38,071✔
294
  int32_t     code = 0;
38,071✔
295
  *pSucc = false;
38,071✔
296

297
  while (1) {
57,361✔
298
    if ((pTask->info.fillHistory == 1) && pTask->status.appendTranstateBlock) {
95,432!
299
      *numOfItems += numOfNewItems;
×
300
      return numOfNewItems > 0;
×
301
    }
302

303
    SStreamQueueItem* pItem = NULL;
95,432✔
304
    code = extractMsgFromWal(pTask->exec.pWalReader, (void**)&pItem, maxVer, id);
95,432✔
305
    if (code != TSDB_CODE_SUCCESS || pItem == NULL) {  // failed, continue
95,431✔
306
      int64_t currentVer = walReaderGetCurrentVer(pTask->exec.pWalReader);
37,966✔
307
      bool    itemInFillhistory = handleFillhistoryScanComplete(pTask, currentVer);
37,967✔
308
      if (itemInFillhistory) {
37,967✔
309
        numOfNewItems += 1;
462✔
310
      }
311
      break;
37,967✔
312
    }
313

314
    if (pItem != NULL) {
57,465!
315
      code = streamTaskPutDataIntoInputQ(pTask, pItem);
57,465✔
316
      if (code == TSDB_CODE_SUCCESS) {
57,465!
317
        numOfNewItems += 1;
57,465✔
318
        int64_t ver = walReaderGetCurrentVer(pTask->exec.pWalReader);
57,465✔
319
        pTask->chkInfo.nextProcessVer = ver;
57,465✔
320
        tqDebug("s-task:%s set ver:%" PRId64 " for reader after extract data from WAL", id, ver);
57,465✔
321

322
        bool itemInFillhistory = handleFillhistoryScanComplete(pTask, ver);
57,465✔
323
        if (itemInFillhistory) {
57,465✔
324
          break;
104✔
325
        }
326
      } else {
UNCOV
327
        if (code == TSDB_CODE_OUT_OF_MEMORY) {
×
328
          tqError("s-task:%s failed to put data into inputQ, since out of memory", id);
×
329
        } else {
UNCOV
330
          tqTrace("s-task:%s append input queue failed, code:inputQ is full, ver:%" PRId64, id,
×
331
                  pTask->chkInfo.nextProcessVer);
UNCOV
332
          code = walReaderSeekVer(pTask->exec.pWalReader, pTask->chkInfo.nextProcessVer);
×
UNCOV
333
          if (code) {
×
334
            tqError("s-task:%s failed to seek ver to:%" PRId64 " in wal", id, pTask->chkInfo.nextProcessVer);
×
335
          }
336

UNCOV
337
          code = 0;  // reset the error code
×
338
        }
339

UNCOV
340
        break;
×
341
      }
342
    }
343
  }
344

345
  *numOfItems += numOfNewItems;
38,071✔
346
  *pSucc = (numOfNewItems > 0);
38,071✔
347
  return code;
38,071✔
348
}
349

350
int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta) {
32,063✔
351
  int32_t vgId = pStreamMeta->vgId;
32,063✔
352
  SArray* pTaskList = NULL;
32,063✔
353
  int32_t numOfTasks = taosArrayGetSize(pStreamMeta->pTaskList);
32,063✔
354
  if (numOfTasks == 0) {
32,063✔
355
    return TSDB_CODE_SUCCESS;
262✔
356
  }
357

358
  // clone the task list, to avoid the task update during scan wal files
359
  streamMetaWLock(pStreamMeta);
31,801✔
360
  pTaskList = taosArrayDup(pStreamMeta->pTaskList, NULL);
31,801✔
361
  streamMetaWUnLock(pStreamMeta);
31,801✔
362
  if (pTaskList == NULL) {
31,801!
363
    tqError("vgId:%d failed to create task list dup, code:%s", vgId, tstrerror(terrno));
×
364
    return terrno;
×
365
  }
366

367
  tqDebug("vgId:%d start to check wal to extract new submit block for %d tasks", vgId, numOfTasks);
31,801✔
368

369
  // update the new task number
370
  numOfTasks = taosArrayGetSize(pTaskList);
31,801✔
371

372
  for (int32_t i = 0; i < numOfTasks; ++i) {
170,263✔
373
    STaskId* pTaskId = taosArrayGet(pTaskList, i);
138,462✔
374
    if (pTaskId == NULL) {
138,462!
375
      continue;
100,391✔
376
    }
377

378
    SStreamTask* pTask = NULL;
138,462✔
379
    int32_t      code = streamMetaAcquireTask(pStreamMeta, pTaskId->streamId, pTaskId->taskId, &pTask);
138,462✔
380
    if (pTask == NULL || code != 0) {
138,462!
381
      continue;
609✔
382
    }
383

384
    if (!taskReadyForDataFromWal(pTask)) {
137,853✔
385
      streamMetaReleaseTask(pStreamMeta, pTask);
99,777✔
386
      continue;
99,777✔
387
    }
388

389
    // seek the stored version and extract data from WAL
390
    code = setWalReaderStartOffset(pTask, vgId);
38,076✔
391
    if (code != TSDB_CODE_SUCCESS) {
38,076!
392
      streamMetaReleaseTask(pStreamMeta, pTask);
×
393
      continue;
×
394
    }
395

396
    int32_t numOfItems = streamQueueGetNumOfItems(pTask->inputq.queue);
38,076✔
397
    int64_t maxVer = (pTask->info.fillHistory == 1) ? pTask->step2Range.maxVer : INT64_MAX;
38,076✔
398

399
    streamMutexLock(&pTask->lock);
38,076✔
400

401
    SStreamTaskState state = streamTaskGetStatus(pTask);
38,076✔
402
    if (state.state != TASK_STATUS__READY) {
38,076✔
403
      tqDebug("s-task:%s not ready for submit block from wal, status:%s", pTask->id.idStr, state.name);
5!
404
      streamMutexUnlock(&pTask->lock);
5✔
405
      streamMetaReleaseTask(pStreamMeta, pTask);
5✔
406
      continue;
5✔
407
    }
408

409
    bool hasNewData = false;
38,071✔
410
    code = doPutDataIntoInputQ(pTask, maxVer, &numOfItems, &hasNewData);
38,071✔
411
    streamMutexUnlock(&pTask->lock);
38,071✔
412

413
    if ((numOfItems > 0) || hasNewData) {
38,071!
414
      code = streamTrySchedExec(pTask);
12,388✔
415
      if (code != TSDB_CODE_SUCCESS) {
12,388!
416
        streamMetaReleaseTask(pStreamMeta, pTask);
×
417
        taosArrayDestroy(pTaskList);
×
418
        return code;
×
419
      }
420
    }
421

422
    streamMetaReleaseTask(pStreamMeta, pTask);
38,071✔
423
  }
424

425
  taosArrayDestroy(pTaskList);
31,801✔
426
  return TSDB_CODE_SUCCESS;
31,801✔
427
}
428

429
int32_t doScanWalAsync(STQ* pTq, bool ckPause) {
103,620✔
430
  SStreamMeta* pMeta = pTq->pStreamMeta;
103,620✔
431
  bool         alreadyRestored = pTq->pVnode->restored;
103,620✔
432
  int32_t      vgId = pMeta->vgId;
103,620✔
433
  int32_t      numOfTasks = taosArrayGetSize(pMeta->pTaskList);
103,620✔
434

435
  if (numOfTasks == 0) {
103,619!
UNCOV
436
    tqDebug("vgId:%d no stream tasks existed to run", vgId);
×
437
    return 0;
3✔
438
  }
439

440
  if (pMeta->startInfo.startAllTasks) {
103,619✔
441
    tqTrace("vgId:%d in restart procedure, not scan wal", vgId);
7,254✔
442
    return 0;
7,254✔
443
  }
444

445
  pMeta->scanInfo.scanCounter += 1;
96,365✔
446
  if (pMeta->scanInfo.scanCounter > MAX_REPEAT_SCAN_THRESHOLD) {
96,365✔
447
    pMeta->scanInfo.scanCounter = MAX_REPEAT_SCAN_THRESHOLD;
62,995✔
448
  }
449

450
  if (pMeta->scanInfo.scanCounter > 1) {
96,365✔
451
    tqDebug("vgId:%d wal read task has been launched, remain scan times:%d", vgId, pMeta->scanInfo.scanCounter);
78,275✔
452
    return 0;
78,275✔
453
  }
454

455
  int32_t numOfPauseTasks = pMeta->numOfPausedTasks;
18,090✔
456
  if (ckPause && numOfTasks == numOfPauseTasks) {
18,090✔
457
    tqDebug("vgId:%d ignore all submit, all streams had been paused, reset the walScanCounter", vgId);
29!
458

459
    // reset the counter value, since we do not launch the scan wal operation.
460
    pMeta->scanInfo.scanCounter = 0;
29✔
461
    return 0;
29✔
462
  }
463

464
  tqDebug("vgId:%d create msg to start wal scan to launch stream tasks, numOfTasks:%d, vnd restored:%d", vgId,
18,061✔
465
          numOfTasks, alreadyRestored);
466

467
  return streamTaskSchedTask(&pTq->pVnode->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA);
18,062✔
468
}
469

470
void streamMetaFreeTQDuringScanWalError(STQ* pTq) {
×
471
  SBuildScanWalMsgParam* p = taosMemoryCalloc(1, sizeof(SBuildScanWalMsgParam));
×
472
  p->metaId = pTq->pStreamMeta->rid;
×
473
  p->numOfTasks = 0;
×
474

475
  doStartScanWal(p, 0);
×
476
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc