• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3629

04 Mar 2025 01:45PM UTC coverage: 63.692% (-0.1%) from 63.79%
#3629

push

travis-ci

web-flow
Merge pull request #30007 from taosdata/revert-29951-docs/update-exception-handling-strategy

Revert "docs: update exception handling strategy"

149369 of 300378 branches covered (49.73%)

Branch coverage included in aggregate %.

233614 of 300930 relevant lines covered (77.63%)

18792670.99 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.85
/source/dnode/vnode/src/tq/tqStreamTask.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tq.h"
17
#include "vnd.h"
18

19
#define SCAN_WAL_IDLE_DURATION    250  // scan-wal timer tick in ms; a scan is attempted every SCAN_WAL_WAIT_COUNT ticks (2 * 250ms = 500ms)
20
#define SCAN_WAL_WAIT_COUNT       2
21

22
// Heap-allocated argument that is handed to the scan-wal timer callback (doStartScanWal).
typedef struct SBuildScanWalMsgParam {
23
  int64_t metaId;  // ref id of the stream meta (pMeta->rid), resolved through streamMetaRefPool in the callback
24
  SMsgCb  msgCb;   // copy of the vnode message callbacks, passed to streamTaskSchedTask to schedule the wal scan
25
} SBuildScanWalMsgParam;
26

27
static int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, int32_t* pNumOfTasks);
28
static int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId);
29
static bool    handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver);
30
static bool    taskReadyForDataFromWal(SStreamTask* pTask);
31
static int32_t doPutDataIntoInputQ(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems, bool* pSucc);
32

33
// extract data blocks(submit/delete) from WAL, and add them into the input queue for all the sources tasks.
34
// Run one WAL scan pass for every stream task of this vnode.
//
// The pass is guarded by pMeta->scanInfo.scanSentinel: only the caller that flips it from 0 to 1
// proceeds, everybody else returns immediately. A pass that starts less than 200ms after the end of
// the previous one is skipped. The sentinel is always reset to 0 before returning from a claimed pass.
//
// Returns 0 when the pass was skipped or completed, otherwise the code reported by
// doScanWalForAllTasks().
int32_t tqScanWal(STQ* pTq) {
  SStreamMeta* pStreamMeta = pTq->pStreamMeta;
  int32_t      vgId = pStreamMeta->vgId;
  int64_t      startTs = taosGetTimestampMs();
  int32_t      numOfTasks = 0;
  int32_t      code = 0;

  // claim the scan sentinel; a non-zero previous value means another scan is still in progress
  if (atomic_val_compare_exchange_32(&pStreamMeta->scanInfo.scanSentinel, 0, 1) != 0) {
    tqDebug("vgId:%d already in wal scan, abort", vgId);
    return code;
  }

  tqDebug("vgId:%d try to scan wal to extract data", vgId);

  // throttle: do not scan again within 200ms of the previous scan
  int64_t prevScanTs = pStreamMeta->scanInfo.lastScanTs;
  bool    tooSoon = (prevScanTs > 0) && ((startTs - prevScanTs) < 200);
  if (tooSoon) {
    tqDebug("vgId:%d scan wal less than 200ms, do nothing", vgId);
    atomic_store_32(&pStreamMeta->scanInfo.scanSentinel, 0);
    return code;
  }

  // visit all tasks and feed them with the data extracted from WAL
  code = doScanWalForAllTasks(pStreamMeta, &numOfTasks);

  int64_t endTs = taosGetTimestampMs();
  pStreamMeta->scanInfo.lastScanTs = endTs;
  int64_t elapsed = endTs - startTs;

  if (code == 0) {
    tqDebug("vgId:%d scan wal for stream tasks completed, elapsed time:%" PRId64 "ms", vgId, elapsed);
  } else {
    tqError("vgId:%d failed to scan wal for all tasks, try next time, elapsed time:%" PRId64 "ms code:%s", vgId,
            elapsed, tstrerror(code));
  }

  // release the sentinel so that the next scan may start
  atomic_store_32(&pStreamMeta->scanInfo.scanSentinel, 0);
  return code;
}
73

74
// Count one timer tick and tell whether SCAN_WAL_WAIT_COUNT ticks have elapsed.
// When the threshold is reached the tick counter is reset to 0 and true is returned.
static bool waitEnoughDuration(SStreamMeta* pMeta) {
  pMeta->scanInfo.tickCounter += 1;

  if (pMeta->scanInfo.tickCounter < SCAN_WAL_WAIT_COUNT) {
    return false;
  }

  pMeta->scanInfo.tickCounter = 0;
  return true;
}
82

83
static void doStartScanWal(void* param, void* tmrId) {
4,186,399✔
84
  int32_t                vgId = 0;
4,186,399✔
85
  int32_t                code = 0;
4,186,399✔
86
  int32_t                numOfTasks = 0;
4,186,399✔
87
  tmr_h                  pTimer = NULL;
4,186,399✔
88
  SBuildScanWalMsgParam* pParam = (SBuildScanWalMsgParam*)param;
4,186,399✔
89

90
  tqDebug("start to do scan wal in tmr, metaRid:%" PRId64, pParam->metaId);
4,186,399✔
91

92
  SStreamMeta* pMeta = taosAcquireRef(streamMetaRefPool, pParam->metaId);
4,186,399✔
93
  if (pMeta == NULL) {
4,186,399!
94
    tqError("metaRid:%" PRId64 " not valid now, stream meta has been freed", pParam->metaId);
×
95
    taosMemoryFree(pParam);
×
96
    return;
2,100,014✔
97
  }
98

99
  vgId = pMeta->vgId;
4,186,399✔
100
  code = streamTimerGetInstance(&pTimer);
4,186,399✔
101
  if (code) {
4,186,399!
102
    tqFatal("vgId:%d failed to get tmr ctrl during sched scan wal, not scan wal, code:%s", vgId, tstrerror(code));
×
103
    taosMemoryFree(pParam);
×
104
    return;
×
105
  }
106

107
  if (pMeta->closeFlag) {
4,186,399✔
108
    code = taosReleaseRef(streamMetaRefPool, pParam->metaId);
10,340✔
109
    if (code == TSDB_CODE_SUCCESS) {
10,340!
110
      tqInfo("vgId:%d jump out of scan wal timer since closed", vgId);
10,340!
111
    } else {
112
      tqError("vgId:%d failed to release ref for streamMeta, rid:%" PRId64 " code:%s", vgId, pParam->metaId,
×
113
              tstrerror(code));
114
    }
115

116
    taosMemoryFree(pParam);
10,340!
117
    return;
10,340✔
118
  }
119

120
  if (pMeta->role != NODE_ROLE_LEADER) {
4,176,059✔
121
    tqDebug("vgId:%d not leader, role:%d not scan wal anymore", vgId, pMeta->role);
22!
122

123
    code = taosReleaseRef(streamMetaRefPool, pParam->metaId);
22✔
124
    if (code == TSDB_CODE_SUCCESS) {
22!
125
      tqDebug("vgId:%d jump out of scan wal timer since not leader", vgId);
22!
126
    } else {
127
      tqError("vgId:%d failed to release ref for streamMeta, rid:%" PRId64 " code:%s", vgId, pParam->metaId,
×
128
              tstrerror(code));
129
    }
130

131
    taosMemFree(pParam);
22✔
132
    return;
22✔
133
  }
134

135
  if (pMeta->startInfo.startAllTasks) {
4,176,037✔
136
    tqDebug("vgId:%d in restart procedure, not ready to scan wal", vgId);
1,691✔
137
    goto _end;
1,691✔
138
  }
139

140
  if (!waitEnoughDuration(pMeta)) {
4,174,346✔
141
    streamTmrStart(doStartScanWal, SCAN_WAL_IDLE_DURATION, pParam, pTimer, &pMeta->scanInfo.scanTimer, vgId,
2,089,652✔
142
                   "scan-wal");
143
    code = taosReleaseRef(streamMetaRefPool, pParam->metaId);
2,089,652✔
144
    if (code) {
2,089,652!
145
      tqError("vgId:%d failed to release ref for streamMeta, rid:%" PRId64 " code:%s", vgId, pParam->metaId,
×
146
              tstrerror(code));
147
    }
148
    return;
2,089,652✔
149
  }
150

151
  code = streamMetaTryRlock(pMeta);
2,084,694✔
152
  if (code == 0) {
2,084,694✔
153
    numOfTasks = taosArrayGetSize(pMeta->pTaskList);
2,084,374✔
154
    streamMetaRUnLock(pMeta);
2,084,374✔
155
  } else {
156
    numOfTasks = 0;
320✔
157
  }
158

159
  if (numOfTasks == 0) {
2,084,694✔
160
    goto _end;
1,806,762✔
161
  }
162

163
  tqDebug("vgId:%d create msg to start wal scan, numOfTasks:%d", vgId, numOfTasks);
277,932✔
164

165
   #if 0
166
  //  wait for the vnode is freed, and invalid read may occur.
167
  taosMsleep(10000);
168
   #endif
169

170
  code = streamTaskSchedTask(&pParam->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA);
277,932✔
171
  if (code) {
277,932✔
172
    tqError("vgId:%d failed sched task to scan wal, code:%s", vgId, tstrerror(code));
1!
173
  }
174

175
_end:
277,931✔
176
  streamTmrStart(doStartScanWal, SCAN_WAL_IDLE_DURATION, pParam, pTimer, &pMeta->scanInfo.scanTimer, vgId, "scan-wal");
2,086,385✔
177
  tqDebug("vgId:%d scan-wal will start in %dms", vgId, SCAN_WAL_IDLE_DURATION*SCAN_WAL_WAIT_COUNT);
2,086,385✔
178

179
  code = taosReleaseRef(streamMetaRefPool, pParam->metaId);
2,086,385✔
180
  if (code) {
2,086,385!
181
    tqError("vgId:%d failed to release ref for streamMeta, rid:%" PRId64 " code:%s", vgId, pParam->metaId,
×
182
            tstrerror(code));
183
  }
184
}
185

186
// Arm the scan-wal timer for this vnode.
//
// Nothing is started when the node is a follower or when streams are disabled (tsDisableStream).
// Otherwise a SBuildScanWalMsgParam carrying the stream meta ref id and the vnode message callbacks
// is allocated and handed to doStartScanWal() through the timer; from then on the timer callback
// owns the allocation. If the timer instance cannot be obtained the allocation is freed here.
void tqScanWalAsync(STQ* pTq) {
  SStreamMeta*           pMeta = pTq->pStreamMeta;
  int32_t                code = 0;
  int32_t                vgId = TD_VID(pTq->pVnode);
  tmr_h                  pTimer = NULL;
  SBuildScanWalMsgParam* pParam = NULL;

  // 1. the vnode should be the leader.
  // 2. the stream isn't disabled
  if ((pMeta->role == NODE_ROLE_FOLLOWER) || tsDisableStream) {
    tqInfo("vgId:%d follower node or stream disabled, not scan wal", vgId);
    return;
  }

  pParam = taosMemoryMalloc(sizeof(SBuildScanWalMsgParam));
  if (pParam == NULL) {
    // report the real failure reason instead of the still-zero initial value of code
    code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    tqError("vgId:%d failed to start scan wal, stream not executes, code:%s", vgId, tstrerror(code));
    return;
  }

  pParam->metaId = pMeta->rid;
  pParam->msgCb = pTq->pVnode->msgCb;

  code = streamTimerGetInstance(&pTimer);
  if (code) {
    tqFatal("vgId:%d failed to get tmr ctrl during sched scan wal", vgId);
    taosMemoryFree(pParam);
  } else {
    streamTmrStart(doStartScanWal, SCAN_WAL_IDLE_DURATION, pParam, pTimer, &pMeta->scanInfo.scanTimer, vgId,
                   "scan-wal");
  }
}
218

219
// Schedule a STREAM_EXEC_T_STOP_ALL_TASKS request for the vnode of pMeta through pMsgCb and
// return the code reported by streamTaskSchedTask().
int32_t tqStopStreamAllTasksAsync(SStreamMeta* pMeta, SMsgCb* pMsgCb) {
  int32_t vgId = pMeta->vgId;
  int32_t code = streamTaskSchedTask(pMsgCb, vgId, 0, 0, STREAM_EXEC_T_STOP_ALL_TASKS);
  return code;
}
222

223
// Position the WAL reader of pTask before data is extracted from WAL.
//
// - If the task's nextProcessVer is older than the first valid WAL version, nextProcessVer is moved
//   forward to that version and the reader seeks to it.
// - Otherwise the reader seeks to nextProcessVer only when its current version is -1
//   (i.e. it has not been positioned yet).
// - Finally, when the reader carries a non-zero skip-to version beyond nextProcessVer, the reader
//   seeks to that version.
//
// Returns TSDB_CODE_SUCCESS, or the code of the first failed walReaderSeekVer() call.
int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId) {
  int32_t code = TSDB_CODE_SUCCESS;
  int64_t walFirstVer = walReaderGetValidFirstVer(pTask->exec.pWalReader);

  if (pTask->chkInfo.nextProcessVer < walFirstVer) {
    tqWarn("vgId:%d s-task:%s ver:%" PRId64 " earlier than the first ver of wal range %" PRId64 ", forward to %" PRId64,
           vgId, pTask->id.idStr, pTask->chkInfo.nextProcessVer, walFirstVer, walFirstVer);

    pTask->chkInfo.nextProcessVer = walFirstVer;

    // todo need retry if failed
    code = walReaderSeekVer(pTask->exec.pWalReader, pTask->chkInfo.nextProcessVer);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    tqDebug("vgId:%d s-task:%s wal reader seek to ver:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.nextProcessVer);
  } else if (walReaderGetCurrentVer(pTask->exec.pWalReader) == -1) {
    // the reader is sought only once, the first time it is used
    code = walReaderSeekVer(pTask->exec.pWalReader, pTask->chkInfo.nextProcessVer);
    if (code != TSDB_CODE_SUCCESS) {  // no data in wal, quit
      return code;
    }

    tqDebug("vgId:%d s-task:%s wal reader initial seek to ver:%" PRId64, vgId, pTask->id.idStr,
            pTask->chkInfo.nextProcessVer);
  }

  int64_t skipTarget = walReaderGetSkipToVersion(pTask->exec.pWalReader);
  if ((skipTarget == 0) || (skipTarget <= pTask->chkInfo.nextProcessVer)) {
    return TSDB_CODE_SUCCESS;
  }

  code = walReaderSeekVer(pTask->exec.pWalReader, skipTarget);
  if (code != TSDB_CODE_SUCCESS) {  // no data in wal, quit
    return code;
  }

  tqDebug("vgId:%d s-task:%s wal reader jump to ver:%" PRId64, vgId, pTask->id.idStr, skipTarget);
  return TSDB_CODE_SUCCESS;
}
266

267
// todo handle memory error
268
// Check whether a fill-history task has scanned past the upper bound of its step2 WAL range.
//
// When pTask is a fill-history task, ver is greater than step2Range.maxVer and the trans-state block
// has not been appended yet, a trans-state block is put into the input queue and true is returned
// (a failure to enqueue is only logged). In every other case false is returned.
bool handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver) {
  const char* idStr = pTask->id.idStr;
  int64_t     step2MaxVer = pTask->step2Range.maxVer;

  // not a fill-history task, or still inside the step2 range: nothing to do
  if ((pTask->info.fillHistory != 1) || (ver <= step2MaxVer)) {
    return false;
  }

  if (pTask->status.appendTranstateBlock) {
    qWarn("s-task:%s fill-history scan WAL, nextProcessVer:%" PRId64 " out of the ver range:%" PRId64 "-%" PRId64
          ", not scan wal",
          idStr, ver, pTask->step2Range.minVer, step2MaxVer);
    return false;
  }

  qWarn("s-task:%s fill-history scan WAL, nextProcessVer:%" PRId64 " out of the maximum ver:%" PRId64
        ", not scan wal anymore, add transfer-state block into inputQ",
        idStr, ver, step2MaxVer);

  double elapsedSec = (taosGetTimestampMs() - pTask->execInfo.step2Start) / 1000.0;
  qDebug("s-task:%s scan-history from WAL stage(step 2) ended, range:%" PRId64 "-%" PRId64 ", elapsed time:%.2fs",
         idStr, pTask->step2Range.minVer, step2MaxVer, elapsedSec);

  if (streamTaskPutTranstateIntoInputQ(pTask)) {
    qError("s-task:%s failed to put trans-state into inputQ", idStr);
  }

  return true;
}
296

297
// Decide whether pTask should receive data extracted from WAL in the current scan pass.
//
// false is returned when any of the following holds:
//   - the task is not a source task, or its downstream is not ready
//   - the task uses the force-window-close trigger
//   - the task status is not TASK_STATUS__READY
//   - it is a fill-history task that already appended the trans-state block
//   - its input queue is full (in this case the task is additionally scheduled for execution)
//   - its input queue status is TASK_INPUT_STATUS__BLOCKED
// Otherwise true is returned.
bool taskReadyForDataFromWal(SStreamTask* pTask) {
  SSTaskBasicInfo* pBasic = &pTask->info;

  // only source tasks respond to the WAL scan action
  if (pBasic->taskLevel != TASK_LEVEL__SOURCE) {
    return false;
  }

  if (pTask->status.downstreamReady == 0) {
    return false;
  }

  // the task level is known to be source here
  if (pBasic->trigger == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) {
    return false;
  }

  // not in ready state, do not handle the data from wal
  SStreamTaskState status = streamTaskGetStatus(pTask);
  if (status.state != TASK_STATUS__READY) {
    tqTrace("s-task:%s not ready for submit block in wal, status:%s", pTask->id.idStr, status.name);
    return false;
  }

  // fill-history task has entered into the last phase, nothing left to scan
  bool fillHistoryDone = (pBasic->fillHistory == 1) && pTask->status.appendTranstateBlock;
  if (fillHistoryDone) {
    tqDebug("s-task:%s fill-history reach the maximum ver:%" PRId64 ", not scan wal anymore", pTask->id.idStr,
            pTask->dataRange.range.maxVer);
    return false;
  }

  // a full input queue: start the task so the queue gets drained, but do not scan wal for it
  if (streamQueueIsFull(pTask->inputq.queue)) {
    tqTrace("s-task:%s input queue is full, launch task without scanning wal", pTask->id.idStr);
    if (streamTrySchedExec(pTask)) {
      tqError("s-task:%s failed to start task while inputQ is full", pTask->id.idStr);
    }
    return false;
  }

  // the input queue of downstream task is full, so the output is blocked, stopped for a while
  if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED) {
    tqDebug("s-task:%s inputQ is blocked, do nothing", pTask->id.idStr);
    return false;
  }

  return true;
}
341

342
// Extract items from the WAL reader of pTask and append them to the task's input queue until the
// reader yields nothing more, the fill-history range is exhausted, or the queue rejects an item.
//
// pTask      : task whose WAL reader and input queue are used
// maxVer     : upper version bound passed to extractMsgFromWal()
// numOfItems : in/out, incremented by the number of items added during this call
// pSucc      : out, true when at least one item was added during this call
//
// Returns 0 on success, or the error code of extractMsgFromWal() / streamTaskPutDataIntoInputQ().
// A rejected append that is not an out-of-memory failure is not reported as an error: the reader is
// sought back to nextProcessVer and 0 is returned.
int32_t doPutDataIntoInputQ(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems, bool* pSucc) {
  const char* id = pTask->id.idStr;
  int32_t     numOfNewItems = 0;
  int32_t     code = 0;
  *pSucc = false;

  while (1) {
    // The trans-state block has been appended already: stop and leave through the common epilogue,
    // so that a status code (not a boolean) is returned and *pSucc is reported consistently.
    // code is 0 here: it is either untouched or the result of the last successful append.
    if ((pTask->info.fillHistory == 1) && pTask->status.appendTranstateBlock) {
      break;
    }

    SStreamQueueItem* pItem = NULL;
    code = extractMsgFromWal(pTask->exec.pWalReader, (void**)&pItem, maxVer, id);
    if (code != TSDB_CODE_SUCCESS || pItem == NULL) {  // nothing extracted, stop scanning
      int64_t currentVer = walReaderGetCurrentVer(pTask->exec.pWalReader);
      bool    itemInFillhistory = handleFillhistoryScanComplete(pTask, currentVer);
      if (itemInFillhistory) {
        // the trans-state block that was just enqueued counts as a new item
        numOfNewItems += 1;
      }
      break;
    }

    code = streamTaskPutDataIntoInputQ(pTask, pItem);
    if (code != TSDB_CODE_SUCCESS) {
      if (code == TSDB_CODE_OUT_OF_MEMORY) {
        tqError("s-task:%s failed to put data into inputQ, since out of memory", id);
      } else {
        tqTrace("s-task:%s append input queue failed, code:inputQ is full, ver:%" PRId64, id,
                pTask->chkInfo.nextProcessVer);

        // rewind the reader so that the rejected item is extracted again in the next pass
        code = walReaderSeekVer(pTask->exec.pWalReader, pTask->chkInfo.nextProcessVer);
        if (code) {
          tqError("s-task:%s failed to seek ver to:%" PRId64 " in wal", id, pTask->chkInfo.nextProcessVer);
        }

        code = 0;  // reset the error code
      }

      break;
    }

    numOfNewItems += 1;
    int64_t ver = walReaderGetCurrentVer(pTask->exec.pWalReader);
    pTask->chkInfo.nextProcessVer = ver;
    tqDebug("s-task:%s set ver:%" PRId64 " for reader after extract data from WAL", id, ver);

    if (handleFillhistoryScanComplete(pTask, ver)) {
      break;
    }
  }

  *numOfItems += numOfNewItems;
  *pSucc = (numOfNewItems > 0);
  return code;
}
400

401
// Walk over a snapshot of the task list of pStreamMeta and, for every task that is ready for it,
// move data from WAL into the task's input queue and schedule the task for execution.
//
// pNumOfTasks : optional out parameter, receives the number of tasks in the snapshot
//
// Returns TSDB_CODE_SUCCESS when the whole list was visited, terrno when the task list could not be
// duplicated, or the code of streamTrySchedExec() when scheduling a task failed (the remaining
// tasks are not visited in that case).
int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, int32_t* pNumOfTasks) {
  int32_t vgId = pStreamMeta->vgId;

  // work on a copy of the task list, to avoid the task update during scan wal files
  streamMetaWLock(pStreamMeta);
  SArray* pTaskIdList = taosArrayDup(pStreamMeta->pTaskList, NULL);
  streamMetaWUnLock(pStreamMeta);

  if (pTaskIdList == NULL) {
    tqError("vgId:%d failed to create task list dup, code:%s", vgId, tstrerror(terrno));
    return terrno;
  }

  // report the task number of the snapshot
  int32_t total = taosArrayGetSize(pTaskIdList);
  if (pNumOfTasks != NULL) {
    *pNumOfTasks = total;
  }

  tqDebug("vgId:%d start to check wal to extract new submit block for %d tasks", vgId, total);

  for (int32_t idx = 0; idx < total; ++idx) {
    STaskId* pId = taosArrayGet(pTaskIdList, idx);
    if (pId == NULL) {
      continue;
    }

    SStreamTask* pTask = NULL;
    int32_t      code = streamMetaAcquireTask(pStreamMeta, pId->streamId, pId->taskId, &pTask);
    if (code != 0 || pTask == NULL) {
      continue;
    }

    // the task must accept WAL data, and its reader must be positioned successfully
    bool usable = taskReadyForDataFromWal(pTask);
    if (usable) {
      code = setWalReaderStartOffset(pTask, vgId);
      usable = (code == TSDB_CODE_SUCCESS);
    }

    if (!usable) {
      streamMetaReleaseTask(pStreamMeta, pTask);
      continue;
    }

    int32_t queuedItems = streamQueueGetNumOfItems(pTask->inputq.queue);
    int64_t verLimit = INT64_MAX;
    if (pTask->info.fillHistory == 1) {
      verLimit = pTask->step2Range.maxVer;
    }

    streamMutexLock(&pTask->lock);

    // re-check the status under the task lock before touching the input queue
    SStreamTaskState status = streamTaskGetStatus(pTask);
    if (status.state != TASK_STATUS__READY) {
      tqDebug("s-task:%s not ready for submit block from wal, status:%s", pTask->id.idStr, status.name);
      streamMutexUnlock(&pTask->lock);
      streamMetaReleaseTask(pStreamMeta, pTask);
      continue;
    }

    bool gotNewData = false;
    code = doPutDataIntoInputQ(pTask, verLimit, &queuedItems, &gotNewData);
    streamMutexUnlock(&pTask->lock);

    // anything waiting in the queue (old or new) is a reason to run the task
    int32_t schedCode = TSDB_CODE_SUCCESS;
    if ((queuedItems > 0) || gotNewData) {
      schedCode = streamTrySchedExec(pTask);
    }

    streamMetaReleaseTask(pStreamMeta, pTask);

    if (schedCode != TSDB_CODE_SUCCESS) {
      taosArrayDestroy(pTaskIdList);
      return schedCode;
    }
  }

  taosArrayDestroy(pTaskIdList);
  return TSDB_CODE_SUCCESS;
}
479

480
// Invoke the scan-wal timer callback directly for the stream meta of pTq.
//
// A zero-initialised SBuildScanWalMsgParam is allocated and only its metaId is filled in (msgCb
// stays zeroed). Ownership of the allocation passes to doStartScanWal(), which frees it or forwards
// it to the timer. Nothing is done when the allocation fails.
void streamMetaFreeTQDuringScanWalError(STQ* pTq) {
  SBuildScanWalMsgParam* p = taosMemoryCalloc(1, sizeof(SBuildScanWalMsgParam));
  if (p == NULL) {
    // do not dereference a failed allocation
    tqError("vgId:%d failed to alloc scan wal param, code:%s", pTq->pStreamMeta->vgId, tstrerror(terrno));
    return;
  }

  p->metaId = pTq->pStreamMeta->rid;

  doStartScanWal(p, NULL);
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc