• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3631

07 Mar 2025 03:18PM UTC coverage: 60.671% (-3.0%) from 63.629%
#3631

push

travis-ci

web-flow
Merge pull request #30074 from taosdata/ciup30

ci: update ci workflow to fix path issue

141481 of 300084 branches covered (47.15%)

Branch coverage included in aggregate %.

223132 of 300884 relevant lines covered (74.16%)

7878557.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.06
/source/dnode/vnode/src/tq/tqStreamTask.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tq.h"
17
#include "vnd.h"
18

19
#define SCAN_WAL_IDLE_DURATION    250  // idle for 500ms to do next wal scan
20
#define SCAN_WAL_WAIT_COUNT       2
21

22
typedef struct SBuildScanWalMsgParam {
23
  int64_t metaId;
24
  SMsgCb  msgCb;
25
} SBuildScanWalMsgParam;
26

27
static int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, int32_t* pNumOfTasks);
28
static int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId);
29
static bool    handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver);
30
static bool    taskReadyForDataFromWal(SStreamTask* pTask);
31
static int32_t doPutDataIntoInputQ(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems, bool* pSucc);
32

33
// extract data blocks(submit/delete) from WAL, and add them into the input queue for all the sources tasks.
34
int32_t tqScanWal(STQ* pTq) {
72,235✔
35
  SStreamMeta* pMeta = pTq->pStreamMeta;
72,235✔
36
  int32_t      vgId = pMeta->vgId;
72,235✔
37
  int64_t      st = taosGetTimestampMs();
72,245✔
38
  int32_t      numOfTasks = 0;
72,245✔
39
  int64_t      el = 0;
72,245✔
40
  int32_t      code = 0;
72,245✔
41

42
  int32_t old = atomic_val_compare_exchange_32(&pMeta->scanInfo.scanSentinel, 0, 1);
72,245✔
43
  if (old == 0) {
72,244!
44
    tqDebug("vgId:%d try to scan wal to extract data", vgId);
72,246✔
45
  } else {
46
    tqDebug("vgId:%d already in wal scan, abort", vgId);
×
47
    return code;
1✔
48
  }
49

50
  // the scan wal interval less than 200, not scan, actually.
51
  if ((pMeta->scanInfo.lastScanTs > 0) && (st - pMeta->scanInfo.lastScanTs < 200)) {
72,246✔
52
    tqDebug("vgId:%d scan wal less than 200ms, do nothing", vgId);
34✔
53
    atomic_store_32(&pMeta->scanInfo.scanSentinel, 0);
34✔
54
    return code;
34✔
55
  }
56

57
  // check all tasks
58
  code = doScanWalForAllTasks(pMeta, &numOfTasks);
72,212✔
59

60
  pMeta->scanInfo.lastScanTs = taosGetTimestampMs();
72,210✔
61
  el = (pMeta->scanInfo.lastScanTs - st);
72,210✔
62

63
  if (code) {
72,210!
64
    tqError("vgId:%d failed to scan wal for all tasks, try next time, elapsed time:%" PRId64 "ms code:%s", vgId, el,
×
65
            tstrerror(code));
66
  } else {
67
    tqDebug("vgId:%d scan wal for stream tasks completed, elapsed time:%" PRId64 "ms", vgId, el);
72,210✔
68
  }
69

70
  atomic_store_32(&pMeta->scanInfo.scanSentinel, 0);
72,211✔
71
  return code;
72,215✔
72
}
73

74
static bool waitEnoughDuration(SStreamMeta* pMeta) {
686,700✔
75
  if ((++pMeta->scanInfo.tickCounter) >= SCAN_WAL_WAIT_COUNT) {
686,700✔
76
    pMeta->scanInfo.tickCounter = 0;
341,244✔
77
    return true;
341,244✔
78
  }
79

80
  return false;
345,456✔
81
}
82

83
static void doStartScanWal(void* param, void* tmrId) {
697,276✔
84
  int32_t                vgId = 0;
697,276✔
85
  int32_t                code = 0;
697,276✔
86
  int32_t                numOfTasks = 0;
697,276✔
87
  tmr_h                  pTimer = NULL;
697,276✔
88
  SBuildScanWalMsgParam* pParam = (SBuildScanWalMsgParam*)param;
697,276✔
89

90
  tqDebug("start to do scan wal in tmr, metaRid:%" PRId64, pParam->metaId);
697,276✔
91

92
  SStreamMeta* pMeta = taosAcquireRef(streamMetaRefPool, pParam->metaId);
697,276✔
93
  if (pMeta == NULL) {
697,276!
94
    tqError("metaRid:%" PRId64 " not valid now, stream meta has been freed", pParam->metaId);
×
95
    taosMemoryFree(pParam);
×
96
    return;
354,193✔
97
  }
98

99
  vgId = pMeta->vgId;
697,276✔
100
  code = streamTimerGetInstance(&pTimer);
697,276✔
101
  if (code) {
697,276!
102
    tqFatal("vgId:%d failed to get tmr ctrl during sched scan wal, not scan wal, code:%s", vgId, tstrerror(code));
×
103
    taosMemoryFree(pParam);
×
104
    return;
×
105
  }
106

107
  if (pMeta->closeFlag) {
697,276✔
108
    code = taosReleaseRef(streamMetaRefPool, pParam->metaId);
8,722✔
109
    if (code == TSDB_CODE_SUCCESS) {
8,722!
110
      tqInfo("vgId:%d jump out of scan wal timer since closed", vgId);
8,722!
111
    } else {
112
      tqError("vgId:%d failed to release ref for streamMeta, rid:%" PRId64 " code:%s", vgId, pParam->metaId,
×
113
              tstrerror(code));
114
    }
115

116
    taosMemoryFree(pParam);
8,722!
117
    return;
8,722✔
118
  }
119

120
  if (pMeta->role != NODE_ROLE_LEADER) {
688,554✔
121
    tqDebug("vgId:%d not leader, role:%d not scan wal anymore", vgId, pMeta->role);
15!
122

123
    code = taosReleaseRef(streamMetaRefPool, pParam->metaId);
15✔
124
    if (code == TSDB_CODE_SUCCESS) {
15!
125
      tqDebug("vgId:%d jump out of scan wal timer since not leader", vgId);
15!
126
    } else {
127
      tqError("vgId:%d failed to release ref for streamMeta, rid:%" PRId64 " code:%s", vgId, pParam->metaId,
×
128
              tstrerror(code));
129
    }
130

131
    taosMemFree(pParam);
15✔
132
    return;
15✔
133
  }
134

135
  if (pMeta->startInfo.startAllTasks) {
688,539✔
136
    tqDebug("vgId:%d in restart procedure, not ready to scan wal", vgId);
1,839✔
137
    goto _end;
1,839✔
138
  }
139

140
  if (!waitEnoughDuration(pMeta)) {
686,700✔
141
    streamTmrStart(doStartScanWal, SCAN_WAL_IDLE_DURATION, pParam, pTimer, &pMeta->scanInfo.scanTimer, vgId,
345,456✔
142
                   "scan-wal");
143
    code = taosReleaseRef(streamMetaRefPool, pParam->metaId);
345,456✔
144
    if (code) {
345,456!
145
      tqError("vgId:%d failed to release ref for streamMeta, rid:%" PRId64 " code:%s", vgId, pParam->metaId,
×
146
              tstrerror(code));
147
    }
148
    return;
345,456✔
149
  }
150

151
  code = streamMetaTryRlock(pMeta);
341,244✔
152
  if (code == 0) {
341,244✔
153
    numOfTasks = taosArrayGetSize(pMeta->pTaskList);
341,001✔
154
    streamMetaRUnLock(pMeta);
341,001✔
155
  } else {
156
    numOfTasks = 0;
243✔
157
  }
158

159
  if (numOfTasks == 0) {
341,244✔
160
    goto _end;
268,991✔
161
  }
162

163
  tqDebug("vgId:%d create msg to start wal scan, numOfTasks:%d", vgId, numOfTasks);
72,253✔
164

165
   #if 0
166
  //  wait for the vnode is freed, and invalid read may occur.
167
  taosMsleep(10000);
168
   #endif
169

170
  code = streamTaskSchedTask(&pParam->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA);
72,253✔
171
  if (code) {
72,253✔
172
    tqError("vgId:%d failed sched task to scan wal, code:%s", vgId, tstrerror(code));
2!
173
  }
174

175
_end:
72,251✔
176
  streamTmrStart(doStartScanWal, SCAN_WAL_IDLE_DURATION, pParam, pTimer, &pMeta->scanInfo.scanTimer, vgId, "scan-wal");
343,083✔
177
  tqDebug("vgId:%d scan-wal will start in %dms", vgId, SCAN_WAL_IDLE_DURATION*SCAN_WAL_WAIT_COUNT);
343,083✔
178

179
  code = taosReleaseRef(streamMetaRefPool, pParam->metaId);
343,083✔
180
  if (code) {
343,083!
181
    tqError("vgId:%d failed to release ref for streamMeta, rid:%" PRId64 " code:%s", vgId, pParam->metaId,
×
182
            tstrerror(code));
183
  }
184
}
185

186
void tqScanWalAsync(STQ* pTq) {
8,735✔
187
  SStreamMeta*           pMeta = pTq->pStreamMeta;
8,735✔
188
  int32_t                code = 0;
8,735✔
189
  int32_t                vgId = TD_VID(pTq->pVnode);
8,735✔
190
  tmr_h                  pTimer = NULL;
8,735✔
191
  SBuildScanWalMsgParam* pParam = NULL;
8,735✔
192

193
  // 1. the vnode should be the leader.
194
  // 2. the stream isn't disabled
195
  if ((pMeta->role == NODE_ROLE_FOLLOWER) || tsDisableStream) {
8,735!
196
    tqInfo("vgId:%d follower node or stream disabled, not scan wal", vgId);
×
197
    return;
×
198
  }
199

200
  pParam = taosMemoryMalloc(sizeof(SBuildScanWalMsgParam));
8,735!
201
  if (pParam == NULL) {
8,736!
202
    tqError("vgId:%d failed to start scan wal, stream not executes, code:%s", vgId, tstrerror(code));
×
203
    return;
×
204
  }
205

206
  pParam->metaId = pMeta->rid;
8,736✔
207
  pParam->msgCb = pTq->pVnode->msgCb;
8,736✔
208

209
  code = streamTimerGetInstance(&pTimer);
8,736✔
210
  if (code) {
8,736!
211
    tqFatal("vgId:%d failed to get tmr ctrl during sched scan wal", vgId);
×
212
    taosMemoryFree(pParam);
×
213
  } else {
214
    streamTmrStart(doStartScanWal, SCAN_WAL_IDLE_DURATION, pParam, pTimer, &pMeta->scanInfo.scanTimer, vgId,
8,736✔
215
                   "scan-wal");
216
  }
217
}
218

219
int32_t tqStopStreamAllTasksAsync(SStreamMeta* pMeta, SMsgCb* pMsgCb) {
4,668✔
220
  return streamTaskSchedTask(pMsgCb, pMeta->vgId, 0, 0, STREAM_EXEC_T_STOP_ALL_TASKS);
4,668✔
221
}
222

223
int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId) {
101,370✔
224
  // seek the stored version and extract data from WAL
225
  int64_t firstVer = walReaderGetValidFirstVer(pTask->exec.pWalReader);
101,370✔
226
  if (pTask->chkInfo.nextProcessVer < firstVer) {
101,364✔
227
    tqWarn("vgId:%d s-task:%s ver:%" PRId64 " earlier than the first ver of wal range %" PRId64 ", forward to %" PRId64,
2!
228
           vgId, pTask->id.idStr, pTask->chkInfo.nextProcessVer, firstVer, firstVer);
229

230
    pTask->chkInfo.nextProcessVer = firstVer;
2✔
231

232
    // todo need retry if failed
233
    int32_t code = walReaderSeekVer(pTask->exec.pWalReader, pTask->chkInfo.nextProcessVer);
2✔
234
    if (code != TSDB_CODE_SUCCESS) {
2!
235
      return code;
×
236
    }
237

238
    // append the data for the stream
239
    tqDebug("vgId:%d s-task:%s wal reader seek to ver:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.nextProcessVer);
2!
240
  } else {
241
    int64_t currentVer = walReaderGetCurrentVer(pTask->exec.pWalReader);
101,362✔
242
    if (currentVer == -1) {  // we only seek the read for the first time
101,349✔
243
      int32_t code = walReaderSeekVer(pTask->exec.pWalReader, pTask->chkInfo.nextProcessVer);
4,068✔
244
      if (code != TSDB_CODE_SUCCESS) {  // no data in wal, quit
4,067!
245
        return code;
×
246
      }
247

248
      // append the data for the stream
249
      tqDebug("vgId:%d s-task:%s wal reader initial seek to ver:%" PRId64, vgId, pTask->id.idStr,
4,067✔
250
              pTask->chkInfo.nextProcessVer);
251
    }
252
  }
253

254
  int64_t skipToVer = walReaderGetSkipToVersion(pTask->exec.pWalReader);
101,350✔
255
  if (skipToVer != 0 && skipToVer > pTask->chkInfo.nextProcessVer) {
101,345✔
256
    int32_t code = walReaderSeekVer(pTask->exec.pWalReader, skipToVer);
310✔
257
    if (code != TSDB_CODE_SUCCESS) {  // no data in wal, quit
310!
258
      return code;
×
259
    }
260

261
    tqDebug("vgId:%d s-task:%s wal reader jump to ver:%" PRId64, vgId, pTask->id.idStr, skipToVer);
310!
262
  }
263

264
  return TSDB_CODE_SUCCESS;
101,343✔
265
}
266

267
// todo handle memory error
268
bool handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver) {
528,613✔
269
  const char* id = pTask->id.idStr;
528,613✔
270
  int64_t     maxVer = pTask->step2Range.maxVer;
528,613✔
271

272
  if ((pTask->info.fillHistory == 1) && ver > maxVer) {
528,613✔
273
    if (!pTask->status.appendTranstateBlock) {
873!
274
      qWarn("s-task:%s fill-history scan WAL, nextProcessVer:%" PRId64 " out of the maximum ver:%" PRId64
873!
275
            ", not scan wal anymore, add transfer-state block into inputQ",
276
            id, ver, maxVer);
277

278
      double el = (taosGetTimestampMs() - pTask->execInfo.step2Start) / 1000.0;
873✔
279
      qDebug("s-task:%s scan-history from WAL stage(step 2) ended, range:%" PRId64 "-%" PRId64 ", elapsed time:%.2fs",
873✔
280
             id, pTask->step2Range.minVer, maxVer, el);
281
      int32_t code = streamTaskPutTranstateIntoInputQ(pTask);
873✔
282
      if (code) {
873!
283
        qError("s-task:%s failed to put trans-state into inputQ", id);
×
284
      }
285

286
      return true;
873✔
287
    } else {
288
      qWarn("s-task:%s fill-history scan WAL, nextProcessVer:%" PRId64 " out of the ver range:%" PRId64 "-%" PRId64
×
289
            ", not scan wal",
290
            id, ver, pTask->step2Range.minVer, maxVer);
291
    }
292
  }
293

294
  return false;
527,735✔
295
}
296

297
bool taskReadyForDataFromWal(SStreamTask* pTask) {
265,252✔
298
  // non-source or fill-history tasks don't need to response the WAL scan action.
299
  SSTaskBasicInfo* pInfo = &pTask->info;
265,252✔
300
  if ((pInfo->taskLevel != TASK_LEVEL__SOURCE) || (pTask->status.downstreamReady == 0)) {
265,252✔
301
    return false;
129,050✔
302
  }
303

304
  if (pInfo->taskLevel == TASK_LEVEL__SOURCE && pInfo->trigger == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) {
136,202✔
305
    return false;
12,535✔
306
  }
307

308
  // not in ready state, do not handle the data from wal
309
  SStreamTaskState pState = streamTaskGetStatus(pTask);
123,667✔
310
  if (pState.state != TASK_STATUS__READY) {
123,653✔
311
    tqTrace("s-task:%s not ready for submit block in wal, status:%s", pTask->id.idStr, pState.name);
17,476✔
312
    return false;
17,476✔
313
  }
314

315
  // fill-history task has entered into the last phase, no need to anything
316
  if ((pInfo->fillHistory == 1) && pTask->status.appendTranstateBlock) {
106,177✔
317
    // the maximum version of data in the WAL has reached already, the step2 is done
318
    tqDebug("s-task:%s fill-history reach the maximum ver:%" PRId64 ", not scan wal anymore", pTask->id.idStr,
4,812✔
319
            pTask->dataRange.range.maxVer);
320
    return false;
4,812✔
321
  }
322

323
  // check whether input queue is full or not
324
  if (streamQueueIsFull(pTask->inputq.queue)) {
101,365✔
325
    tqTrace("s-task:%s input queue is full, launch task without scanning wal", pTask->id.idStr);
8!
326
    int32_t code = streamTrySchedExec(pTask);
8✔
327
    if (code) {
1!
328
      tqError("s-task:%s failed to start task while inputQ is full", pTask->id.idStr);
×
329
    }
330
    return false;
1✔
331
  }
332

333
  // the input queue of downstream task is full, so the output is blocked, stopped for a while
334
  if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED) {
101,374!
335
    tqDebug("s-task:%s inputQ is blocked, do nothing", pTask->id.idStr);
×
336
    return false;
×
337
  }
338

339
  return true;
101,374✔
340
}
341

342
int32_t doPutDataIntoInputQ(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems, bool* pSucc) {
101,364✔
343
  const char* id = pTask->id.idStr;
101,364✔
344
  int32_t     numOfNewItems = 0;
101,364✔
345
  int32_t     code = 0;
101,364✔
346
  *pSucc = false;
101,364✔
347

348
  while (1) {
427,296✔
349
    if ((pTask->info.fillHistory == 1) && pTask->status.appendTranstateBlock) {
528,660!
350
      *numOfItems += numOfNewItems;
×
351
      return numOfNewItems > 0;
×
352
    }
353

354
    SStreamQueueItem* pItem = NULL;
528,660✔
355
    code = extractMsgFromWal(pTask->exec.pWalReader, (void**)&pItem, maxVer, id);
528,660✔
356
    if (code != TSDB_CODE_SUCCESS || pItem == NULL) {  // failed, continue
528,641✔
357
      int64_t currentVer = walReaderGetCurrentVer(pTask->exec.pWalReader);
101,040✔
358
      bool    itemInFillhistory = handleFillhistoryScanComplete(pTask, currentVer);
101,048✔
359
      if (itemInFillhistory) {
101,044✔
360
        numOfNewItems += 1;
600✔
361
      }
362
      break;
101,044✔
363
    }
364

365
    if (pItem != NULL) {
427,601!
366
      code = streamTaskPutDataIntoInputQ(pTask, pItem);
427,602✔
367
      if (code == TSDB_CODE_SUCCESS) {
427,614✔
368
        numOfNewItems += 1;
427,586✔
369
        int64_t ver = walReaderGetCurrentVer(pTask->exec.pWalReader);
427,586✔
370
        pTask->chkInfo.nextProcessVer = ver;
427,576✔
371
        tqDebug("s-task:%s set ver:%" PRId64 " for reader after extract data from WAL", id, ver);
427,576✔
372

373
        bool itemInFillhistory = handleFillhistoryScanComplete(pTask, ver);
427,575✔
374
        if (itemInFillhistory) {
427,570✔
375
          break;
273✔
376
        }
377
      } else {
378
        if (code == TSDB_CODE_OUT_OF_MEMORY) {
28!
379
          tqError("s-task:%s failed to put data into inputQ, since out of memory", id);
×
380
        } else {
381
          tqTrace("s-task:%s append input queue failed, code:inputQ is full, ver:%" PRId64, id,
28!
382
                  pTask->chkInfo.nextProcessVer);
383
          code = walReaderSeekVer(pTask->exec.pWalReader, pTask->chkInfo.nextProcessVer);
28✔
384
          if (code) {
28!
385
            tqError("s-task:%s failed to seek ver to:%" PRId64 " in wal", id, pTask->chkInfo.nextProcessVer);
×
386
          }
387

388
          code = 0;  // reset the error code
28✔
389
        }
390

391
        break;
28✔
392
      }
393
    }
394
  }
395

396
  *numOfItems += numOfNewItems;
101,345✔
397
  *pSucc = (numOfNewItems > 0);
101,345✔
398
  return code;
101,345✔
399
}
400

401
int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, int32_t* pNumOfTasks) {
72,204✔
402
  int32_t vgId = pStreamMeta->vgId;
72,204✔
403
  SArray* pTaskList = NULL;
72,204✔
404
  int32_t numOfTasks = 0;
72,204✔
405

406
  // clone the task list, to avoid the task update during scan wal files
407
  streamMetaWLock(pStreamMeta);
72,204✔
408
  pTaskList = taosArrayDup(pStreamMeta->pTaskList, NULL);
72,212✔
409
  streamMetaWUnLock(pStreamMeta);
72,212✔
410
  if (pTaskList == NULL) {
72,206!
411
    tqError("vgId:%d failed to create task list dup, code:%s", vgId, tstrerror(terrno));
×
412
    return terrno;
×
413
  }
414

415
  // update the new task number
416
  numOfTasks = taosArrayGetSize(pTaskList);
72,206✔
417
  if (pNumOfTasks != NULL) {
72,212!
418
    *pNumOfTasks = numOfTasks;
72,214✔
419
  }
420

421
  tqDebug("vgId:%d start to check wal to extract new submit block for %d tasks", vgId, numOfTasks);
72,212✔
422

423
  for (int32_t i = 0; i < numOfTasks; ++i) {
338,097✔
424
    STaskId* pTaskId = taosArrayGet(pTaskList, i);
265,869✔
425
    if (pTaskId == NULL) {
265,858!
426
      continue;
164,503✔
427
    }
428

429
    SStreamTask* pTask = NULL;
265,858✔
430
    int32_t      code = streamMetaAcquireTask(pStreamMeta, pTaskId->streamId, pTaskId->taskId, &pTask);
265,858✔
431
    if (pTask == NULL || code != 0) {
265,820!
432
      continue;
552✔
433
    }
434

435
    if (!taskReadyForDataFromWal(pTask)) {
265,268✔
436
      streamMetaReleaseTask(pStreamMeta, pTask);
163,869✔
437
      continue;
163,951✔
438
    }
439

440
    // seek the stored version and extract data from WAL
441
    code = setWalReaderStartOffset(pTask, vgId);
101,371✔
442
    if (code != TSDB_CODE_SUCCESS) {
101,343!
443
      streamMetaReleaseTask(pStreamMeta, pTask);
×
444
      continue;
×
445
    }
446

447
    int32_t numOfItems = streamQueueGetNumOfItems(pTask->inputq.queue);
101,343✔
448
    int64_t maxVer = (pTask->info.fillHistory == 1) ? pTask->step2Range.maxVer : INT64_MAX;
101,369✔
449

450
    streamMutexLock(&pTask->lock);
101,369✔
451

452
    SStreamTaskState state = streamTaskGetStatus(pTask);
101,375✔
453
    if (state.state != TASK_STATUS__READY) {
101,366!
454
      tqDebug("s-task:%s not ready for submit block from wal, status:%s", pTask->id.idStr, state.name);
×
455
      streamMutexUnlock(&pTask->lock);
×
456
      streamMetaReleaseTask(pStreamMeta, pTask);
×
457
      continue;
×
458
    }
459

460
    bool hasNewData = false;
101,366✔
461
    code = doPutDataIntoInputQ(pTask, maxVer, &numOfItems, &hasNewData);
101,366✔
462
    streamMutexUnlock(&pTask->lock);
101,344✔
463

464
    if ((numOfItems > 0) || hasNewData) {
101,386!
465
      code = streamTrySchedExec(pTask);
13,027✔
466
      if (code != TSDB_CODE_SUCCESS) {
13,028!
467
        streamMetaReleaseTask(pStreamMeta, pTask);
×
468
        taosArrayDestroy(pTaskList);
×
469
        return code;
×
470
      }
471
    }
472

473
    streamMetaReleaseTask(pStreamMeta, pTask);
101,387✔
474
  }
475

476
  taosArrayDestroy(pTaskList);
72,228✔
477
  return TSDB_CODE_SUCCESS;
72,215✔
478
}
479

480
void streamMetaFreeTQDuringScanWalError(STQ* pTq) {
×
481
  SBuildScanWalMsgParam* p = taosMemoryCalloc(1, sizeof(SBuildScanWalMsgParam));
×
482
  p->metaId = pTq->pStreamMeta->rid;
×
483

484
  doStartScanWal(p, 0);
×
485
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc