• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4087

17 May 2025 01:40AM UTC coverage: 62.757% (-0.3%) from 63.048%
#4087

push

travis-ci

GitHub
fix: double close wal meta file. (#31059)

156587 of 317922 branches covered (49.25%)

Branch coverage included in aggregate %.

241845 of 316955 relevant lines covered (76.3%)

6570724.62 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.39
/source/dnode/vnode/src/tq/tqStreamTask.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tq.h"
17
#include "vnd.h"
18

19
#define SCAN_WAL_IDLE_DURATION    250  // timer tick in ms; a wal scan is attempted every SCAN_WAL_WAIT_COUNT ticks (500ms)
20
#define SCAN_WAL_WAIT_COUNT       2
21

22
typedef struct SBuildScanWalMsgParam {
23
  int64_t metaId;
24
  SMsgCb  msgCb;
25
} SBuildScanWalMsgParam;
26

27
static int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, int32_t* pNumOfTasks);
28
static int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId);
29
static bool    handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver);
30
static bool    taskReadyForDataFromWal(SStreamTask* pTask);
31
static int32_t doPutDataIntoInputQ(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems, bool* pSucc);
32

33
// extract data blocks(submit/delete) from WAL, and add them into the input queue for all the sources tasks.
34
// Run one WAL scan pass for the stream tasks of this vnode.
//
// The pass is skipped (returning 0) when another scan currently holds the scan sentinel, or when
// the previous scan finished less than 200ms ago. Otherwise the result of doScanWalForAllTasks()
// is returned. The sentinel is always cleared before returning from a pass that acquired it.
int32_t tqScanWal(STQ* pTq) {
  SStreamMeta*  pMeta = pTq->pStreamMeta;
  const int32_t vgId = pMeta->vgId;
  const int64_t beginTs = taosGetTimestampMs();
  int32_t       taskCount = 0;
  int32_t       code = 0;

  // flip the sentinel 0 -> 1; any other previous value means a scan is already running
  if (atomic_val_compare_exchange_32(&pMeta->scanInfo.scanSentinel, 0, 1) != 0) {
    tqDebug("vgId:%d already in wal scan, abort", vgId);
    return code;
  }

  tqDebug("vgId:%d try to scan wal to extract data", vgId);

  // throttle: do not scan again within 200ms of the previous scan
  const bool scannedBefore = (pMeta->scanInfo.lastScanTs > 0);
  if (scannedBefore && ((beginTs - pMeta->scanInfo.lastScanTs) < 200)) {
    tqDebug("vgId:%d scan wal less than 200ms, do nothing", vgId);
    atomic_store_32(&pMeta->scanInfo.scanSentinel, 0);
    return code;
  }

  // walk through all tasks and feed them from the WAL
  code = doScanWalForAllTasks(pMeta, &taskCount);

  pMeta->scanInfo.lastScanTs = taosGetTimestampMs();
  const int64_t elapsed = pMeta->scanInfo.lastScanTs - beginTs;

  if (code == 0) {
    tqDebug("vgId:%d scan wal for stream tasks completed, elapsed time:%" PRId64 "ms", vgId, elapsed);
  } else {
    tqError("vgId:%d failed to scan wal for all tasks, try next time, elapsed time:%" PRId64 "ms code:%s", vgId,
            elapsed, tstrerror(code));
  }

  atomic_store_32(&pMeta->scanInfo.scanSentinel, 0);
  return code;
}
73

74
// Count one timer tick. Returns true, and resets the counter, once SCAN_WAL_WAIT_COUNT ticks have
// accumulated; returns false while still waiting.
static bool waitEnoughDuration(SStreamMeta* pMeta) {
  pMeta->scanInfo.tickCounter += 1;

  if (pMeta->scanInfo.tickCounter < SCAN_WAL_WAIT_COUNT) {
    return false;
  }

  pMeta->scanInfo.tickCounter = 0;
  return true;
}
82

83
static void doStartScanWal(void* param, void* tmrId) {
2,505,367✔
84
  int32_t                vgId = 0;
2,505,367✔
85
  int32_t                code = 0;
2,505,367✔
86
  int32_t                numOfTasks = 0;
2,505,367✔
87
  tmr_h                  pTimer = NULL;
2,505,367✔
88
  SBuildScanWalMsgParam* pParam = (SBuildScanWalMsgParam*)param;
2,505,367✔
89

90
  tqTrace("start to do scan wal in tmr, metaRid:%" PRId64, pParam->metaId);
2,505,367✔
91

92
  SStreamMeta* pMeta = taosAcquireRef(streamMetaRefPool, pParam->metaId);
2,505,367✔
93
  if (pMeta == NULL) {
2,505,367!
94
    tqError("metaRid:%" PRId64 " not valid now, stream meta has been freed", pParam->metaId);
×
95
    taosMemoryFree(pParam);
×
96
    return;
1,259,637✔
97
  }
98

99
  vgId = pMeta->vgId;
2,505,367✔
100
  code = streamTimerGetInstance(&pTimer);
2,505,367✔
101
  if (code) {
2,505,367!
102
    tqFatal("vgId:%d failed to get tmr ctrl during sched scan wal, not scan wal, code:%s", vgId, tstrerror(code));
×
103
    taosMemoryFree(pParam);
×
104
    return;
×
105
  }
106

107
  if (pMeta->closeFlag) {
2,505,367✔
108
    code = taosReleaseRef(streamMetaRefPool, pParam->metaId);
11,609✔
109
    if (code == TSDB_CODE_SUCCESS) {
11,609!
110
      tqInfo("vgId:%d jump out of scan wal timer since closed", vgId);
11,609!
111
    } else {
112
      tqError("vgId:%d failed to release ref for streamMeta, rid:%" PRId64 " code:%s", vgId, pParam->metaId,
×
113
              tstrerror(code));
114
    }
115

116
    taosMemoryFree(pParam);
11,609!
117
    return;
11,609✔
118
  }
119

120
  if (pMeta->role != NODE_ROLE_LEADER) {
2,493,758✔
121
    tqDebug("vgId:%d not leader, role:%d not scan wal anymore", vgId, pMeta->role);
26!
122

123
    code = taosReleaseRef(streamMetaRefPool, pParam->metaId);
26✔
124
    if (code == TSDB_CODE_SUCCESS) {
26!
125
      tqDebug("vgId:%d jump out of scan wal timer since not leader", vgId);
26!
126
    } else {
127
      tqError("vgId:%d failed to release ref for streamMeta, rid:%" PRId64 " code:%s", vgId, pParam->metaId,
×
128
              tstrerror(code));
129
    }
130

131
    taosMemFree(pParam);
26✔
132
    return;
26✔
133
  }
134

135
  if (pMeta->startInfo.startAllTasks) {
2,493,732✔
136
    tqDebug("vgId:%d in restart procedure, not ready to scan wal", vgId);
3,550✔
137
    goto _end;
3,550✔
138
  }
139

140
  if (!waitEnoughDuration(pMeta)) {
2,490,182✔
141
    streamTmrStart(doStartScanWal, SCAN_WAL_IDLE_DURATION, pParam, pTimer, &pMeta->scanInfo.scanTimer, vgId,
1,248,002✔
142
                   "scan-wal");
143
    code = taosReleaseRef(streamMetaRefPool, pParam->metaId);
1,248,002✔
144
    if (code) {
1,248,002!
145
      tqError("vgId:%d failed to release ref for streamMeta, rid:%" PRId64 " code:%s", vgId, pParam->metaId,
×
146
              tstrerror(code));
147
    }
148
    return;
1,248,002✔
149
  }
150

151
  // failed to lock, try 500ms later
152
  code = streamMetaTryRlock(pMeta);
1,242,180✔
153
  if (code == 0) {
1,242,180✔
154
    numOfTasks = taosArrayGetSize(pMeta->pTaskList);
1,241,819✔
155
    streamMetaRUnLock(pMeta);
1,241,819✔
156
  } else {
157
    numOfTasks = 0;
361✔
158
  }
159

160
  if (numOfTasks > 0) {
1,242,180✔
161
    tqDebug("vgId:%d create msg to start wal scan, numOfTasks:%d", vgId, numOfTasks);
552,169✔
162

163
#if 0
164
  //  wait for the vnode is freed, and invalid read may occur.
165
  taosMsleep(10000);
166
#endif
167

168
    code = streamTaskSchedTask(&pParam->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA, false);
552,169✔
169
    if (code) {
552,169!
170
      tqError("vgId:%d failed sched task to scan wal, code:%s", vgId, tstrerror(code));
×
171
    }
172
  }
173

174
_end:
1,242,180✔
175
  streamTmrStart(doStartScanWal, SCAN_WAL_IDLE_DURATION, pParam, pTimer, &pMeta->scanInfo.scanTimer, vgId, "scan-wal");
1,245,730✔
176
  tqTrace("vgId:%d try scan-wal will start in %dms", vgId, SCAN_WAL_IDLE_DURATION*SCAN_WAL_WAIT_COUNT);
1,245,730✔
177

178
  code = taosReleaseRef(streamMetaRefPool, pParam->metaId);
1,245,730✔
179
  if (code) {
1,245,730!
180
    tqError("vgId:%d failed to release ref for streamMeta, rid:%" PRId64 " code:%s", vgId, pParam->metaId,
×
181
            tstrerror(code));
182
  }
183
}
184

185
// Start the periodic "scan-wal" timer for this vnode.
//
// Nothing is started when the vnode is not the leader or when streams are disabled (tsDisableStream).
// On success the allocated SBuildScanWalMsgParam is handed to the timer callback doStartScanWal();
// it is freed here only when the timer instance cannot be obtained.
void tqScanWalAsync(STQ* pTq) {
  SStreamMeta*           pMeta = pTq->pStreamMeta;
  int32_t                code = 0;
  int32_t                vgId = TD_VID(pTq->pVnode);
  tmr_h                  pTimer = NULL;
  SBuildScanWalMsgParam* pParam = NULL;

  // 1. the vnode should be the leader.
  // 2. the stream isn't disabled
  if ((pMeta->role != NODE_ROLE_LEADER) || tsDisableStream) {
    tqInfo("vgId:%d follower node or stream disabled, not scan wal", vgId);
    return;
  }

  pParam = taosMemoryMalloc(sizeof(SBuildScanWalMsgParam));
  if (pParam == NULL) {
    // report the real failure instead of code 0; presumably the allocator records it in terrno,
    // fall back to out-of-memory when it did not
    code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    tqError("vgId:%d failed to start scan wal, stream not executes, code:%s", vgId, tstrerror(code));
    return;
  }

  pParam->metaId = pMeta->rid;
  pParam->msgCb = pTq->pVnode->msgCb;

  code = streamTimerGetInstance(&pTimer);
  if (code) {
    tqFatal("vgId:%d failed to get tmr ctrl during sched scan wal", vgId);
    taosMemoryFree(pParam);
  } else {
    streamTmrStart(doStartScanWal, SCAN_WAL_IDLE_DURATION, pParam, pTimer, &pMeta->scanInfo.scanTimer, vgId,
                   "scan-wal");
  }
}
217

218
// Schedule a STREAM_EXEC_T_STOP_ALL_TASKS request for the vnode that owns pMeta and return the
// result of streamTaskSchedTask().
int32_t tqStopStreamAllTasksAsync(SStreamMeta* pMeta, SMsgCb* pMsgCb) {
  int32_t vgId = pMeta->vgId;
  int32_t code = streamTaskSchedTask(pMsgCb, vgId, 0, 0, STREAM_EXEC_T_STOP_ALL_TASKS, false);
  return code;
}
221

222
// Position the WAL reader of pTask before data is extracted.
//
//  - If the task's nextProcessVer is older than the first valid WAL version, nextProcessVer is
//    forwarded to that first version and the reader seeks there.
//  - Otherwise the reader seeks to nextProcessVer only when its current version is -1.
//  - Finally, a non-zero skip-to version that is newer than nextProcessVer makes the reader jump to it.
//
// Returns TSDB_CODE_SUCCESS, or the error code of the first failing walReaderSeekVer() call.
int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId) {
  int32_t code = TSDB_CODE_SUCCESS;
  int64_t walFirstVer = walReaderGetValidFirstVer(pTask->exec.pWalReader);

  if (pTask->chkInfo.nextProcessVer >= walFirstVer) {
    // we only seek the reader for the first time
    if (walReaderGetCurrentVer(pTask->exec.pWalReader) == -1) {
      code = walReaderSeekVer(pTask->exec.pWalReader, pTask->chkInfo.nextProcessVer);
      if (code != TSDB_CODE_SUCCESS) {  // no data in wal, quit
        return code;
      }

      tqDebug("vgId:%d s-task:%s wal reader initial seek to ver:%" PRId64, vgId, pTask->id.idStr,
              pTask->chkInfo.nextProcessVer);
    }
  } else {
    tqWarn("vgId:%d s-task:%s ver:%" PRId64 " earlier than the first ver of wal range %" PRId64 ", forward to %" PRId64,
           vgId, pTask->id.idStr, pTask->chkInfo.nextProcessVer, walFirstVer, walFirstVer);

    pTask->chkInfo.nextProcessVer = walFirstVer;

    // todo need retry if failed
    code = walReaderSeekVer(pTask->exec.pWalReader, pTask->chkInfo.nextProcessVer);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    tqDebug("vgId:%d s-task:%s wal reader seek to ver:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.nextProcessVer);
  }

  int64_t jumpVer = walReaderGetSkipToVersion(pTask->exec.pWalReader);
  bool    needJump = (jumpVer != 0) && (jumpVer > pTask->chkInfo.nextProcessVer);
  if (needJump) {
    code = walReaderSeekVer(pTask->exec.pWalReader, jumpVer);
    if (code != TSDB_CODE_SUCCESS) {  // no data in wal, quit
      return code;
    }

    tqDebug("vgId:%d s-task:%s wal reader jump to ver:%" PRId64, vgId, pTask->id.idStr, jumpVer);
  }

  return TSDB_CODE_SUCCESS;
}
265

266
// todo handle memory error
267
// Check whether a fill-history task has scanned past the end of its step-2 WAL range.
//
// Returns true only when the task is a fill-history task (info.fillHistory == 1), ver is beyond
// step2Range.maxVer and no trans-state block has been appended yet; in that case a trans-state
// block is put into the input queue (a failure to do so is logged, the return value stays true).
// Returns false in every other case.
bool handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver) {
  const char* id = pTask->id.idStr;
  int64_t     rangeEnd = pTask->step2Range.maxVer;

  if ((pTask->info.fillHistory != 1) || (ver <= rangeEnd)) {
    return false;
  }

  if (pTask->status.appendTranstateBlock) {
    qWarn("s-task:%s fill-history scan WAL, nextProcessVer:%" PRId64 " out of the ver range:%" PRId64 "-%" PRId64
          ", not scan wal",
          id, ver, pTask->step2Range.minVer, rangeEnd);
    return false;
  }

  qWarn("s-task:%s fill-history scan WAL, nextProcessVer:%" PRId64 " out of the maximum ver:%" PRId64
        ", not scan wal anymore, add transfer-state block into inputQ",
        id, ver, rangeEnd);

  double elapsedSec = (taosGetTimestampMs() - pTask->execInfo.step2Start) / 1000.0;
  qDebug("s-task:%s scan-history from WAL stage(step 2) ended, range:%" PRId64 "-%" PRId64 ", elapsed time:%.2fs",
         id, pTask->step2Range.minVer, rangeEnd, elapsedSec);

  if (streamTaskPutTranstateIntoInputQ(pTask) != 0) {
    qError("s-task:%s failed to put trans-state into inputQ", id);
  }

  return true;
}
295

296
// Decide whether pTask should receive data from the current WAL scan.
//
// Returns false when the task is not a source task, its downstream is not ready, it uses the
// force-window-close trigger, it is a recalculate task, its status is not READY, it is a
// fill-history task that already appended its trans-state block, its input queue is full (the
// task is then scheduled for execution instead), or its input queue is blocked.
// Returns true otherwise.
bool taskReadyForDataFromWal(SStreamTask* pTask) {
  SSTaskBasicInfo* pBasic = &pTask->info;

  // non-source or fill-history tasks don't need to response the WAL scan action.
  bool excluded = (pBasic->taskLevel != TASK_LEVEL__SOURCE) || (pTask->status.downstreamReady == 0) ||
                  (pBasic->trigger == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) ||
                  (pBasic->fillHistory == STREAM_RECALCUL_TASK);
  if (excluded) {
    return false;
  }

  // not in ready state, do not handle the data from wal
  SStreamTaskState status = streamTaskGetStatus(pTask);
  if (status.state != TASK_STATUS__READY) {
    tqTrace("s-task:%s not ready for submit block in wal, status:%s", pTask->id.idStr, status.name);
    return false;
  }

  // fill-history task has entered into the last phase, no need to do anything
  if (pTask->status.appendTranstateBlock && (pBasic->fillHistory == STREAM_HISTORY_TASK)) {
    // the maximum version of data in the WAL has reached already, the step2 is done
    tqDebug("s-task:%s fill-history reach the maximum ver:%" PRId64 ", not scan wal anymore", pTask->id.idStr,
            pTask->dataRange.range.maxVer);
    return false;
  }

  // input queue is full: let the task consume what it has instead of scanning more
  if (streamQueueIsFull(pTask->inputq.queue)) {
    tqTrace("s-task:%s input queue is full, launch task without scanning wal", pTask->id.idStr);
    if (streamTrySchedExec(pTask, false) != 0) {
      tqError("s-task:%s failed to start task while inputQ is full", pTask->id.idStr);
    }
    return false;
  }

  // the input queue of downstream task is full, so the output is blocked, stopped for a while
  if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED) {
    tqDebug("s-task:%s inputQ is blocked, do nothing", pTask->id.idStr);
    return false;
  }

  return true;
}
344

345
// Extract items from the WAL reader of pTask (up to maxVer) and append them to its input queue.
//
// pTask:      task whose WAL reader and input queue are used.
// maxVer:     upper version bound passed to extractMsgFromWal().
// numOfItems: in/out counter; the number of items added by this call is added to it.
// pSucc:      set to true when at least one item was added by this call, false otherwise.
//
// Returns the last error code: 0 on the normal exits, the code of extractMsgFromWal() when the
// extraction failed, or TSDB_CODE_OUT_OF_MEMORY when the queue append ran out of memory. A full
// input queue is not reported as an error; the reader is moved back to nextProcessVer instead.
int32_t doPutDataIntoInputQ(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems, bool* pSucc) {
  const char* id = pTask->id.idStr;
  int32_t     numOfNewItems = 0;
  int32_t     code = 0;
  *pSucc = false;

  while (1) {
    // the fill-history task has already appended its trans-state block: stop reading and leave
    // through the common exit so that *pSucc and the returned code are reported consistently
    // (this path used to return the boolean "numOfNewItems > 0" as the error code)
    if ((pTask->info.fillHistory == 1) && pTask->status.appendTranstateBlock) {
      code = TSDB_CODE_SUCCESS;
      break;
    }

    SStreamQueueItem* pItem = NULL;
    code = extractMsgFromWal(pTask->exec.pWalReader, (void**)&pItem, maxVer, id);
    if (code != TSDB_CODE_SUCCESS || pItem == NULL) {  // failed, continue
      int64_t currentVer = walReaderGetCurrentVer(pTask->exec.pWalReader);
      bool    itemInFillhistory = handleFillhistoryScanComplete(pTask, currentVer);
      if (itemInFillhistory) {
        // the trans-state block put into the queue counts as a new item
        numOfNewItems += 1;
      }
      break;
    }

    // pItem is known to be non-NULL here
    code = streamTaskPutDataIntoInputQ(pTask, pItem);
    if (code != TSDB_CODE_SUCCESS) {
      if (code == TSDB_CODE_OUT_OF_MEMORY) {
        tqError("s-task:%s failed to put data into inputQ, since out of memory", id);
      } else {
        tqTrace("s-task:%s append input queue failed, code:inputQ is full, ver:%" PRId64, id,
                pTask->chkInfo.nextProcessVer);
        // rewind the reader so that the rejected item is read again next time
        code = walReaderSeekVer(pTask->exec.pWalReader, pTask->chkInfo.nextProcessVer);
        if (code) {
          tqError("s-task:%s failed to seek ver to:%" PRId64 " in wal", id, pTask->chkInfo.nextProcessVer);
        }

        code = 0;  // reset the error code
      }

      break;
    }

    numOfNewItems += 1;
    int64_t ver = walReaderGetCurrentVer(pTask->exec.pWalReader);
    pTask->chkInfo.nextProcessVer = ver;
    tqDebug("s-task:%s set ver:%" PRId64 " for reader after extract data from WAL", id, ver);

    if (handleFillhistoryScanComplete(pTask, ver)) {
      break;
    }
  }

  *numOfItems += numOfNewItems;
  *pSucc = (numOfNewItems > 0);
  return code;
}
403

404
// Walk through a snapshot of the task list and feed every eligible task from the WAL.
//
// pStreamMeta: stream meta whose pTaskList is scanned.
// pNumOfTasks: optional output, receives the number of tasks in the snapshot.
//
// Tasks that cannot be acquired, are not ready for WAL data, fail to position their WAL reader or
// are no longer in READY state are skipped. A task is scheduled for execution when its input queue
// holds items or new data was added. Returns terrno when the task list cannot be duplicated, the
// error code of streamTrySchedExec() when scheduling fails (the scan stops at that task), and
// TSDB_CODE_SUCCESS otherwise.
int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, int32_t* pNumOfTasks) {
  const int32_t vgId = pStreamMeta->vgId;
  int32_t       result = TSDB_CODE_SUCCESS;

  // clone the task list, to avoid the task update during scan wal files
  streamMetaWLock(pStreamMeta);
  SArray* pSnapshot = taosArrayDup(pStreamMeta->pTaskList, NULL);
  streamMetaWUnLock(pStreamMeta);

  if (pSnapshot == NULL) {
    tqError("vgId:%d failed to create task list dup, code:%s", vgId, tstrerror(terrno));
    return terrno;
  }

  // update the new task number
  int32_t total = taosArrayGetSize(pSnapshot);
  if (pNumOfTasks != NULL) {
    *pNumOfTasks = total;
  }

  tqDebug("vgId:%d start to check wal to extract new submit block for %d tasks", vgId, total);

  for (int32_t idx = 0; idx < total; idx += 1) {
    STaskId* pId = taosArrayGet(pSnapshot, idx);
    if (pId == NULL) {
      continue;
    }

    SStreamTask* pTask = NULL;
    int32_t      code = streamMetaAcquireTask(pStreamMeta, pId->streamId, pId->taskId, &pTask);
    if (code != 0 || pTask == NULL) {
      continue;
    }

    // the task must accept WAL data and its reader must be positioned; the seek is attempted
    // only for tasks that passed the readiness check
    bool usable = taskReadyForDataFromWal(pTask) && (setWalReaderStartOffset(pTask, vgId) == TSDB_CODE_SUCCESS);
    if (!usable) {
      streamMetaReleaseTask(pStreamMeta, pTask);
      continue;
    }

    int32_t queued = streamQueueGetNumOfItems(pTask->inputq.queue);
    int64_t verLimit = INT64_MAX;
    if (pTask->info.fillHistory == 1) {
      verLimit = pTask->step2Range.maxVer;
    }

    streamMutexLock(&pTask->lock);

    // re-check the status while holding the task lock
    SStreamTaskState status = streamTaskGetStatus(pTask);
    if (status.state != TASK_STATUS__READY) {
      tqDebug("s-task:%s not ready for submit block from wal, status:%s", pTask->id.idStr, status.name);
      streamMutexUnlock(&pTask->lock);
      streamMetaReleaseTask(pStreamMeta, pTask);
      continue;
    }

    bool gotData = false;
    code = doPutDataIntoInputQ(pTask, verLimit, &queued, &gotData);
    streamMutexUnlock(&pTask->lock);

    if (gotData || (queued > 0)) {
      code = streamTrySchedExec(pTask, false);
      if (code != TSDB_CODE_SUCCESS) {
        result = code;
      }
    }

    streamMetaReleaseTask(pStreamMeta, pTask);

    // a scheduling failure aborts the scan of the remaining tasks
    if (result != TSDB_CODE_SUCCESS) {
      break;
    }
  }

  taosArrayDestroy(pSnapshot);
  return result;
}
482

483
// Invoke the scan-wal timer callback directly with a freshly allocated, zero-initialized parameter
// that carries only the ref id of the stream meta of pTq.
// NOTE(review): judging by the name this looks like a test/debug hook - confirm with its callers.
void streamMetaFreeTQDuringScanWalError(STQ* pTq) {
  SBuildScanWalMsgParam* p = taosMemoryCalloc(1, sizeof(SBuildScanWalMsgParam));
  if (p == NULL) {
    // without the parameter the callback cannot run; bail out instead of dereferencing NULL
    tqError("vgId:%d failed to alloc scan wal param, code:%s", pTq->pStreamMeta->vgId, tstrerror(terrno));
    return;
  }

  p->metaId = pTq->pStreamMeta->rid;

  // ownership of p passes to doStartScanWal(), which frees it or hands it to the timer
  doStartScanWal(p, NULL);
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc