• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3632

08 Mar 2025 06:17AM UTC coverage: 60.719% (+0.05%) from 60.671%
#3632

push

travis-ci

web-flow
Merge pull request #29999 from taosdata/enh/TS-5089

feat: taosBenchmark supports exporting to CSV files

141890 of 300701 branches covered (47.19%)

Branch coverage included in aggregate %.

599 of 766 new or added lines in 3 files covered. (78.2%)

1025 existing lines in 124 files now uncovered.

223757 of 301490 relevant lines covered (74.22%)

17284906.68 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.44
/source/dnode/vnode/src/tq/tqStreamTask.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tq.h"
17
#include "vnd.h"
18

19
#define SCAN_WAL_IDLE_DURATION    250  // scan-wal timer period in ms; a scan is attempted every SCAN_WAL_WAIT_COUNT ticks (2 * 250ms = 500ms)
20
#define SCAN_WAL_WAIT_COUNT       2  // number of timer ticks that must elapse before the next wal scan is attempted
21

22
typedef struct SBuildScanWalMsgParam {
23
  int64_t metaId;
24
  SMsgCb  msgCb;
25
} SBuildScanWalMsgParam;
26

27
static int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, int32_t* pNumOfTasks);
28
static int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId);
29
static bool    handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver);
30
static bool    taskReadyForDataFromWal(SStreamTask* pTask);
31
static int32_t doPutDataIntoInputQ(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems, bool* pSucc);
32

33
// extract data blocks(submit/delete) from WAL, and add them into the input queue for all the sources tasks.
34
int32_t tqScanWal(STQ* pTq) {
263,519✔
35
  SStreamMeta* pMeta = pTq->pStreamMeta;
263,519✔
36
  int32_t      vgId = pMeta->vgId;
263,519✔
37
  int64_t      st = taosGetTimestampMs();
263,695✔
38
  int32_t      numOfTasks = 0;
263,695✔
39
  int64_t      el = 0;
263,695✔
40
  int32_t      code = 0;
263,695✔
41

42
  int32_t old = atomic_val_compare_exchange_32(&pMeta->scanInfo.scanSentinel, 0, 1);
263,695✔
43
  if (old == 0) {
263,748!
44
    tqDebug("vgId:%d try to scan wal to extract data", vgId);
263,770✔
45
  } else {
46
    tqDebug("vgId:%d already in wal scan, abort", vgId);
×
UNCOV
47
    return code;
×
48
  }
49

50
  // the scan wal interval less than 200, not scan, actually.
51
  if ((pMeta->scanInfo.lastScanTs > 0) && (st - pMeta->scanInfo.lastScanTs < 200)) {
263,770✔
52
    tqDebug("vgId:%d scan wal less than 200ms, do nothing", vgId);
247✔
53
    atomic_store_32(&pMeta->scanInfo.scanSentinel, 0);
247✔
54
    return code;
247✔
55
  }
56

57
  // check all tasks
58
  code = doScanWalForAllTasks(pMeta, &numOfTasks);
263,523✔
59

60
  pMeta->scanInfo.lastScanTs = taosGetTimestampMs();
262,943✔
61
  el = (pMeta->scanInfo.lastScanTs - st);
262,943✔
62

63
  if (code) {
262,943✔
64
    tqError("vgId:%d failed to scan wal for all tasks, try next time, elapsed time:%" PRId64 "ms code:%s", vgId, el,
1!
65
            tstrerror(code));
66
  } else {
67
    tqDebug("vgId:%d scan wal for stream tasks completed, elapsed time:%" PRId64 "ms", vgId, el);
262,942✔
68
  }
69

70
  atomic_store_32(&pMeta->scanInfo.scanSentinel, 0);
262,943✔
71
  return code;
263,490✔
72
}
73

74
// Count one timer tick; returns true (and resets the counter) once SCAN_WAL_WAIT_COUNT ticks have accumulated.
static bool waitEnoughDuration(SStreamMeta* pMeta) {
  pMeta->scanInfo.tickCounter += 1;

  if (pMeta->scanInfo.tickCounter < SCAN_WAL_WAIT_COUNT) {
    return false;
  }

  pMeta->scanInfo.tickCounter = 0;
  return true;
}
82

83
static void doStartScanWal(void* param, void* tmrId) {
3,912,617✔
84
  int32_t                vgId = 0;
3,912,617✔
85
  int32_t                code = 0;
3,912,617✔
86
  int32_t                numOfTasks = 0;
3,912,617✔
87
  tmr_h                  pTimer = NULL;
3,912,617✔
88
  SBuildScanWalMsgParam* pParam = (SBuildScanWalMsgParam*)param;
3,912,617✔
89

90
  tqDebug("start to do scan wal in tmr, metaRid:%" PRId64, pParam->metaId);
3,912,617✔
91

92
  SStreamMeta* pMeta = taosAcquireRef(streamMetaRefPool, pParam->metaId);
3,912,617✔
93
  if (pMeta == NULL) {
3,912,617!
94
    tqError("metaRid:%" PRId64 " not valid now, stream meta has been freed", pParam->metaId);
×
95
    taosMemoryFree(pParam);
×
96
    return;
1,961,849✔
97
  }
98

99
  vgId = pMeta->vgId;
3,912,617✔
100
  code = streamTimerGetInstance(&pTimer);
3,912,617✔
101
  if (code) {
3,912,617!
102
    tqFatal("vgId:%d failed to get tmr ctrl during sched scan wal, not scan wal, code:%s", vgId, tstrerror(code));
×
103
    taosMemoryFree(pParam);
×
104
    return;
×
105
  }
106

107
  if (pMeta->closeFlag) {
3,912,617✔
108
    code = taosReleaseRef(streamMetaRefPool, pParam->metaId);
9,375✔
109
    if (code == TSDB_CODE_SUCCESS) {
9,375!
110
      tqInfo("vgId:%d jump out of scan wal timer since closed", vgId);
9,375!
111
    } else {
112
      tqError("vgId:%d failed to release ref for streamMeta, rid:%" PRId64 " code:%s", vgId, pParam->metaId,
×
113
              tstrerror(code));
114
    }
115

116
    taosMemoryFree(pParam);
9,375!
117
    return;
9,375✔
118
  }
119

120
  if (pMeta->role != NODE_ROLE_LEADER) {
3,903,242✔
121
    tqDebug("vgId:%d not leader, role:%d not scan wal anymore", vgId, pMeta->role);
27!
122

123
    code = taosReleaseRef(streamMetaRefPool, pParam->metaId);
27✔
124
    if (code == TSDB_CODE_SUCCESS) {
27!
125
      tqDebug("vgId:%d jump out of scan wal timer since not leader", vgId);
27!
126
    } else {
127
      tqError("vgId:%d failed to release ref for streamMeta, rid:%" PRId64 " code:%s", vgId, pParam->metaId,
×
128
              tstrerror(code));
129
    }
130

131
    taosMemFree(pParam);
27✔
132
    return;
27✔
133
  }
134

135
  if (pMeta->startInfo.startAllTasks) {
3,903,215✔
136
    tqDebug("vgId:%d in restart procedure, not ready to scan wal", vgId);
2,842✔
137
    goto _end;
2,842✔
138
  }
139

140
  if (!waitEnoughDuration(pMeta)) {
3,900,373✔
141
    streamTmrStart(doStartScanWal, SCAN_WAL_IDLE_DURATION, pParam, pTimer, &pMeta->scanInfo.scanTimer, vgId,
1,952,447✔
142
                   "scan-wal");
143
    code = taosReleaseRef(streamMetaRefPool, pParam->metaId);
1,952,447✔
144
    if (code) {
1,952,447!
145
      tqError("vgId:%d failed to release ref for streamMeta, rid:%" PRId64 " code:%s", vgId, pParam->metaId,
×
146
              tstrerror(code));
147
    }
148
    return;
1,952,447✔
149
  }
150

151
  code = streamMetaTryRlock(pMeta);
1,947,926✔
152
  if (code == 0) {
1,947,926✔
153
    numOfTasks = taosArrayGetSize(pMeta->pTaskList);
1,947,661✔
154
    streamMetaRUnLock(pMeta);
1,947,661✔
155
  } else {
156
    numOfTasks = 0;
265✔
157
  }
158

159
  if (numOfTasks == 0) {
1,947,926✔
160
    goto _end;
1,684,027✔
161
  }
162

163
  tqDebug("vgId:%d create msg to start wal scan, numOfTasks:%d", vgId, numOfTasks);
263,899✔
164

165
   #if 0
166
  //  wait for the vnode is freed, and invalid read may occur.
167
  taosMsleep(10000);
168
   #endif
169

170
  code = streamTaskSchedTask(&pParam->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA);
263,899✔
171
  if (code) {
263,899✔
172
    tqError("vgId:%d failed sched task to scan wal, code:%s", vgId, tstrerror(code));
1!
173
  }
174

175
_end:
263,898✔
176
  streamTmrStart(doStartScanWal, SCAN_WAL_IDLE_DURATION, pParam, pTimer, &pMeta->scanInfo.scanTimer, vgId, "scan-wal");
1,950,768✔
177
  tqDebug("vgId:%d scan-wal will start in %dms", vgId, SCAN_WAL_IDLE_DURATION*SCAN_WAL_WAIT_COUNT);
1,950,768✔
178

179
  code = taosReleaseRef(streamMetaRefPool, pParam->metaId);
1,950,768✔
180
  if (code) {
1,950,768!
181
    tqError("vgId:%d failed to release ref for streamMeta, rid:%" PRId64 " code:%s", vgId, pParam->metaId,
×
182
            tstrerror(code));
183
  }
184
}
185

186
// Start the periodic scan-wal timer for this vnode.
//
// Nothing is started when the node role is NODE_ROLE_FOLLOWER or when tsDisableStream is set. Otherwise a
// SBuildScanWalMsgParam is allocated, filled with the stream meta ref id and the vnode message callbacks, and
// handed to the timer callback doStartScanWal(), which takes over the buffer. If the timer instance cannot be
// obtained the buffer is freed here.
void tqScanWalAsync(STQ* pTq) {
  SStreamMeta*           pMeta = pTq->pStreamMeta;
  int32_t                code = 0;
  int32_t                vgId = TD_VID(pTq->pVnode);
  tmr_h                  pTimer = NULL;
  SBuildScanWalMsgParam* pParam = NULL;

  // 1. the vnode should be the leader.
  // 2. the stream isn't disabled
  if ((pMeta->role == NODE_ROLE_FOLLOWER) || tsDisableStream) {
    tqInfo("vgId:%d follower node or stream disabled, not scan wal", vgId);
    return;
  }

  pParam = taosMemoryMalloc(sizeof(SBuildScanWalMsgParam));
  if (pParam == NULL) {
    // report the real failure reason instead of the still-zero initial value of code
    code = TSDB_CODE_OUT_OF_MEMORY;
    tqError("vgId:%d failed to start scan wal, stream not executes, code:%s", vgId, tstrerror(code));
    return;
  }

  pParam->metaId = pMeta->rid;
  pParam->msgCb = pTq->pVnode->msgCb;

  code = streamTimerGetInstance(&pTimer);
  if (code) {
    tqFatal("vgId:%d failed to get tmr ctrl during sched scan wal", vgId);
    taosMemoryFree(pParam);
  } else {
    streamTmrStart(doStartScanWal, SCAN_WAL_IDLE_DURATION, pParam, pTimer, &pMeta->scanInfo.scanTimer, vgId,
                   "scan-wal");
  }
}
218

219
// Schedule a STREAM_EXEC_T_STOP_ALL_TASKS request for the vnode owning pMeta.
// Returns whatever streamTaskSchedTask() reports.
int32_t tqStopStreamAllTasksAsync(SStreamMeta* pMeta, SMsgCb* pMsgCb) {
  const int32_t vgId = pMeta->vgId;
  int32_t       code = streamTaskSchedTask(pMsgCb, vgId, 0, 0, STREAM_EXEC_T_STOP_ALL_TASKS);
  return code;
}
222

223
// Position the wal reader of pTask before data is extracted from the WAL.
//
//  - nextProcessVer older than the first valid wal version: forward nextProcessVer to that version and seek.
//  - otherwise, seek to nextProcessVer only if the reader has no current version yet (-1).
//  - finally, jump to the reader's skip-to version when it is set and ahead of nextProcessVer.
//
// Returns TSDB_CODE_SUCCESS, or the code of the first failing walReaderSeekVer().
int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId) {
  const char* idStr = pTask->id.idStr;
  int32_t     code = TSDB_CODE_SUCCESS;

  // seek the stored version and extract data from WAL
  int64_t firstVer = walReaderGetValidFirstVer(pTask->exec.pWalReader);

  if (pTask->chkInfo.nextProcessVer < firstVer) {
    tqWarn("vgId:%d s-task:%s ver:%" PRId64 " earlier than the first ver of wal range %" PRId64 ", forward to %" PRId64,
           vgId, idStr, pTask->chkInfo.nextProcessVer, firstVer, firstVer);

    pTask->chkInfo.nextProcessVer = firstVer;

    // todo need retry if failed
    code = walReaderSeekVer(pTask->exec.pWalReader, pTask->chkInfo.nextProcessVer);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    // append the data for the stream
    tqDebug("vgId:%d s-task:%s wal reader seek to ver:%" PRId64, vgId, idStr, pTask->chkInfo.nextProcessVer);
  } else if (walReaderGetCurrentVer(pTask->exec.pWalReader) == -1) {
    // we only seek the reader for the first time
    code = walReaderSeekVer(pTask->exec.pWalReader, pTask->chkInfo.nextProcessVer);
    if (code != TSDB_CODE_SUCCESS) {  // no data in wal, quit
      return code;
    }

    // append the data for the stream
    tqDebug("vgId:%d s-task:%s wal reader initial seek to ver:%" PRId64, vgId, idStr,
            pTask->chkInfo.nextProcessVer);
  }

  int64_t skipToVer = walReaderGetSkipToVersion(pTask->exec.pWalReader);
  if (skipToVer != 0 && skipToVer > pTask->chkInfo.nextProcessVer) {
    code = walReaderSeekVer(pTask->exec.pWalReader, skipToVer);
    if (code != TSDB_CODE_SUCCESS) {  // no data in wal, quit
      return code;
    }

    tqDebug("vgId:%d s-task:%s wal reader jump to ver:%" PRId64, vgId, idStr, skipToVer);
  }

  return TSDB_CODE_SUCCESS;
}
266

267
// todo handle memory error
268
bool handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver) {
743,918✔
269
  const char* id = pTask->id.idStr;
743,918✔
270
  int64_t     maxVer = pTask->step2Range.maxVer;
743,918✔
271

272
  if ((pTask->info.fillHistory == 1) && ver > maxVer) {
743,918✔
273
    if (!pTask->status.appendTranstateBlock) {
1,031!
274
      qWarn("s-task:%s fill-history scan WAL, nextProcessVer:%" PRId64 " out of the maximum ver:%" PRId64
1,031!
275
            ", not scan wal anymore, add transfer-state block into inputQ",
276
            id, ver, maxVer);
277

278
      double el = (taosGetTimestampMs() - pTask->execInfo.step2Start) / 1000.0;
1,031✔
279
      qDebug("s-task:%s scan-history from WAL stage(step 2) ended, range:%" PRId64 "-%" PRId64 ", elapsed time:%.2fs",
1,031✔
280
             id, pTask->step2Range.minVer, maxVer, el);
281
      int32_t code = streamTaskPutTranstateIntoInputQ(pTask);
1,031✔
282
      if (code) {
1,031!
283
        qError("s-task:%s failed to put trans-state into inputQ", id);
×
284
      }
285

286
      return true;
1,031✔
287
    } else {
288
      qWarn("s-task:%s fill-history scan WAL, nextProcessVer:%" PRId64 " out of the ver range:%" PRId64 "-%" PRId64
×
289
            ", not scan wal",
290
            id, ver, pTask->step2Range.minVer, maxVer);
291
    }
292
  }
293

294
  return false;
742,901✔
295
}
296

297
// Decide whether pTask should be fed with data from the WAL in the current scan round.
// Returns true only when all of the following hold:
//  - the task is a source task and its downstream is ready
//  - its trigger is not STREAM_TRIGGER_FORCE_WINDOW_CLOSE
//  - the task status is TASK_STATUS__READY
//  - it is not a fill-history task that already appended the trans-state block
//  - the input queue is neither full nor marked as blocked
// When the input queue is full, an execution of the task is requested before returning false.
bool taskReadyForDataFromWal(SStreamTask* pTask) {
  SSTaskBasicInfo* pBasic = &pTask->info;
  const char*      id = pTask->id.idStr;

  // non-source tasks, or tasks whose downstream is not ready, don't respond to the WAL scan action.
  if ((pBasic->taskLevel != TASK_LEVEL__SOURCE) || (pTask->status.downstreamReady == 0)) {
    return false;
  }

  // the task level is known to be TASK_LEVEL__SOURCE from here on
  if (pBasic->trigger == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) {
    return false;
  }

  // not in ready state, do not handle the data from wal
  SStreamTaskState status = streamTaskGetStatus(pTask);
  if (status.state != TASK_STATUS__READY) {
    tqTrace("s-task:%s not ready for submit block in wal, status:%s", id, status.name);
    return false;
  }

  // fill-history task has entered into the last phase, nothing left to do
  if ((pBasic->fillHistory == 1) && pTask->status.appendTranstateBlock) {
    // the maximum version of data in the WAL has reached already, the step2 is done
    tqDebug("s-task:%s fill-history reach the maximum ver:%" PRId64 ", not scan wal anymore", id,
            pTask->dataRange.range.maxVer);
    return false;
  }

  // check whether input queue is full or not
  if (streamQueueIsFull(pTask->inputq.queue)) {
    tqTrace("s-task:%s input queue is full, launch task without scanning wal", id);
    if (streamTrySchedExec(pTask) != 0) {
      tqError("s-task:%s failed to start task while inputQ is full", id);
    }
    return false;
  }

  // the input queue of downstream task is full, so the output is blocked, stopped for a while
  if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED) {
    tqDebug("s-task:%s inputQ is blocked, do nothing", id);
    return false;
  }

  return true;
}
341

342
// Extract items from the task's wal reader (up to maxVer) and append them to the task's input queue.
//
// pTask:      the task whose wal reader and input queue are used.
// maxVer:     upper version bound passed to extractMsgFromWal().
// numOfItems: in/out counter; the number of items added by this call is added to it.
// pSucc:      set to true when at least one item was added by this call, false otherwise.
//
// The loop stops when
//  - the task is a fill-history task that already appended its trans-state block,
//  - nothing (more) could be extracted from the WAL,
//  - the fill-history range was exceeded and the trans-state block was queued, or
//  - the item could not be appended to the input queue.
//
// Returns the code of the last extract/append step. When the append failed for a reason other than
// TSDB_CODE_OUT_OF_MEMORY, the reader is moved back to nextProcessVer and the code is reset to 0.
int32_t doPutDataIntoInputQ(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems, bool* pSucc) {
  const char* id = pTask->id.idStr;
  int32_t     numOfNewItems = 0;
  int32_t     code = 0;
  *pSucc = false;

  while (1) {
    if ((pTask->info.fillHistory == 1) && pTask->status.appendTranstateBlock) {
      // nothing more to read for this task; leave through the common epilogue so that the counters and
      // *pSucc are reported consistently and a real status code (not a boolean) is returned
      code = TSDB_CODE_SUCCESS;
      break;
    }

    SStreamQueueItem* pItem = NULL;
    code = extractMsgFromWal(pTask->exec.pWalReader, (void**)&pItem, maxVer, id);
    if (code != TSDB_CODE_SUCCESS || pItem == NULL) {  // failed, continue
      int64_t currentVer = walReaderGetCurrentVer(pTask->exec.pWalReader);
      bool    itemInFillhistory = handleFillhistoryScanComplete(pTask, currentVer);
      if (itemInFillhistory) {
        // the trans-state block counts as a new item
        numOfNewItems += 1;
      }
      break;
    }

    // pItem is known to be non-NULL here
    code = streamTaskPutDataIntoInputQ(pTask, pItem);
    if (code == TSDB_CODE_SUCCESS) {
      numOfNewItems += 1;
      int64_t ver = walReaderGetCurrentVer(pTask->exec.pWalReader);
      pTask->chkInfo.nextProcessVer = ver;
      tqDebug("s-task:%s set ver:%" PRId64 " for reader after extract data from WAL", id, ver);

      bool itemInFillhistory = handleFillhistoryScanComplete(pTask, ver);
      if (itemInFillhistory) {
        break;
      }
    } else {
      if (code == TSDB_CODE_OUT_OF_MEMORY) {
        tqError("s-task:%s failed to put data into inputQ, since out of memory", id);
      } else {
        tqTrace("s-task:%s append input queue failed, code:inputQ is full, ver:%" PRId64, id,
                pTask->chkInfo.nextProcessVer);
        // rewind the reader so the rejected item is read again in the next round
        code = walReaderSeekVer(pTask->exec.pWalReader, pTask->chkInfo.nextProcessVer);
        if (code) {
          tqError("s-task:%s failed to seek ver to:%" PRId64 " in wal", id, pTask->chkInfo.nextProcessVer);
        }

        code = 0;  // reset the error code
      }

      break;
    }
  }

  *numOfItems += numOfNewItems;
  *pSucc = (numOfNewItems > 0);
  return code;
}
400

401
// Feed the input queues of all tasks registered in pStreamMeta with data from the WAL.
//
// pStreamMeta: stream meta whose task list is scanned.
// pNumOfTasks: optional out parameter, receives the number of tasks in the copied task list.
//
// The task list is duplicated under the meta write lock and the copy is iterated, so updates of the list
// during the scan do not affect the iteration. Every task that is acquired is released again before the next
// one is handled.
//
// Returns TSDB_CODE_SUCCESS, terrno when the task list could not be duplicated, or the code of a failed
// streamTrySchedExec(), in which case the remaining tasks are not scanned.
int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, int32_t* pNumOfTasks) {
  const int32_t vgId = pStreamMeta->vgId;
  int32_t       ret = TSDB_CODE_SUCCESS;

  // clone the task list, to avoid the task update during scan wal files
  streamMetaWLock(pStreamMeta);
  SArray* pSnapshot = taosArrayDup(pStreamMeta->pTaskList, NULL);
  streamMetaWUnLock(pStreamMeta);

  if (pSnapshot == NULL) {
    tqError("vgId:%d failed to create task list dup, code:%s", vgId, tstrerror(terrno));
    return terrno;
  }

  // update the new task number
  int32_t total = taosArrayGetSize(pSnapshot);
  if (pNumOfTasks != NULL) {
    *pNumOfTasks = total;
  }

  tqDebug("vgId:%d start to check wal to extract new submit block for %d tasks", vgId, total);

  for (int32_t idx = 0; idx < total; ++idx) {
    STaskId* pId = taosArrayGet(pSnapshot, idx);
    if (pId == NULL) {
      continue;
    }

    SStreamTask* pTask = NULL;
    int32_t      code = streamMetaAcquireTask(pStreamMeta, pId->streamId, pId->taskId, &pTask);
    if (pTask == NULL || code != 0) {
      continue;
    }

    // the task must accept wal data, and its reader must be positioned (seek the stored version) first
    if (taskReadyForDataFromWal(pTask) && (setWalReaderStartOffset(pTask, vgId) == TSDB_CODE_SUCCESS)) {
      int32_t numOfItems = streamQueueGetNumOfItems(pTask->inputq.queue);
      int64_t maxVer = (pTask->info.fillHistory == 1) ? pTask->step2Range.maxVer : INT64_MAX;

      streamMutexLock(&pTask->lock);

      // re-check the status while holding the task lock
      SStreamTaskState state = streamTaskGetStatus(pTask);
      if (state.state != TASK_STATUS__READY) {
        tqDebug("s-task:%s not ready for submit block from wal, status:%s", pTask->id.idStr, state.name);
        streamMutexUnlock(&pTask->lock);
      } else {
        bool hasNewData = false;
        // the returned code is not evaluated here; only the counters decide whether the task is executed
        code = doPutDataIntoInputQ(pTask, maxVer, &numOfItems, &hasNewData);
        streamMutexUnlock(&pTask->lock);

        if ((numOfItems > 0) || hasNewData) {
          ret = streamTrySchedExec(pTask);
        }
      }
    }

    streamMetaReleaseTask(pStreamMeta, pTask);

    // scheduling the task failed: give up on the remaining tasks
    if (ret != TSDB_CODE_SUCCESS) {
      break;
    }
  }

  taosArrayDestroy(pSnapshot);
  return ret;
}
479

480
// Run the scan-wal timer callback once, directly, for the stream meta of pTq.
// A zero-initialized SBuildScanWalMsgParam carrying only the meta ref id is allocated and handed to
// doStartScanWal(), which takes over the buffer. Nothing is done when the allocation fails.
void streamMetaFreeTQDuringScanWalError(STQ* pTq) {
  SBuildScanWalMsgParam* p = taosMemoryCalloc(1, sizeof(SBuildScanWalMsgParam));
  if (p == NULL) {
    // doStartScanWal() dereferences its parameter unconditionally, so never pass NULL to it
    tqError("vgId:%d failed to alloc param to start scan wal, code:%s", pTq->pStreamMeta->vgId,
            tstrerror(TSDB_CODE_OUT_OF_MEMORY));
    return;
  }

  p->metaId = pTq->pStreamMeta->rid;

  doStartScanWal(p, 0);
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc