• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3545

02 Dec 2024 06:22AM UTC coverage: 60.839% (-0.04%) from 60.88%
#3545

push

travis-ci

web-flow
Merge pull request #28961 from taosdata/fix/refactor-vnode-management-open-vnode

fix/refactor-vnode-management-open-vnode

120592 of 253473 branches covered (47.58%)

Branch coverage included in aggregate %.

102 of 145 new or added lines in 3 files covered. (70.34%)

477 existing lines in 108 files now uncovered.

201840 of 276506 relevant lines covered (73.0%)

19392204.25 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.4
/source/dnode/vnode/src/tq/tqStreamTask.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tq.h"
17
#include "vnd.h"
18

19
#define MAX_REPEAT_SCAN_THRESHOLD 3
20
#define SCAN_WAL_IDLE_DURATION    100
21

22
typedef struct SBuildScanWalMsgParam {
23
  int64_t metaId;
24
  int32_t numOfTasks;
25
} SBuildScanWalMsgParam;
26

27
static int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle);
28
static int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId);
29
static bool    handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver);
30
static bool    taskReadyForDataFromWal(SStreamTask* pTask);
31
static int32_t doPutDataIntoInputQ(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems, bool* pSucc);
32
static int32_t tqScanWalInFuture(STQ* pTq, int32_t numOfTasks, int32_t idleDuration);
33

34
// extract data blocks(submit/delete) from WAL, and add them into the input queue for all the sources tasks.
35
int32_t tqScanWal(STQ* pTq) {
93,017✔
36
  SStreamMeta* pMeta = pTq->pStreamMeta;
93,017✔
37
  int32_t      vgId = pMeta->vgId;
93,017✔
38
  int64_t      st = taosGetTimestampMs();
93,063✔
39
  int32_t      numOfTasks = 0;
93,063✔
40
  bool         shouldIdle = true;
93,063✔
41

42
  tqDebug("vgId:%d continue to check if data in wal are available, scanCounter:%d", vgId, pMeta->scanInfo.scanCounter);
93,063✔
43

44
  // check all tasks
45
  int32_t code = doScanWalForAllTasks(pMeta, &shouldIdle);
93,063✔
46
  if (code) {
93,041!
UNCOV
47
    tqError("vgId:%d failed to start all tasks, try next time, code:%s", vgId, tstrerror(code));
×
UNCOV
48
    return code;
×
49
  }
50

51
  streamMetaWLock(pMeta);
93,041✔
52
  int32_t times = (--pMeta->scanInfo.scanCounter);
93,063✔
53
  if (times < 0) {
93,063!
54
    tqError("vgId:%d invalid scan counter:%d, reset to 0", vgId, times);
×
55
    times = 0;
×
56
  }
57

58
  numOfTasks = taosArrayGetSize(pMeta->pTaskList);
93,063✔
59
  streamMetaWUnLock(pMeta);
93,056✔
60

61
  int64_t el = (taosGetTimestampMs() - st);
93,055✔
62
  tqDebug("vgId:%d scan wal for stream tasks completed, elapsed time:%" PRId64 " ms", vgId, el);
93,055✔
63

64
  if (times > 0) {
93,055✔
65
    tqDebug("vgId:%d scan wal for stream tasks for %d times in %dms", vgId, times, SCAN_WAL_IDLE_DURATION);
37,457✔
66
    code = tqScanWalInFuture(pTq, numOfTasks, SCAN_WAL_IDLE_DURATION);
37,457✔
67
    if (code) {
37,461!
68
      tqError("vgId:%d sched scan wal in %dms failed, ignore this failure", vgId, SCAN_WAL_IDLE_DURATION);
×
69
    }
70
  }
71

72
  return code;
93,053✔
73
}
74

75
static void doStartScanWal(void* param, void* tmrId) {
37,461✔
76
  int32_t vgId = 0;
37,461✔
77
  STQ*    pTq = NULL;
37,461✔
78
  int32_t code = 0;
37,461✔
79

80
  SBuildScanWalMsgParam* pParam = (SBuildScanWalMsgParam*)param;
37,461✔
81

82
  SStreamMeta* pMeta = taosAcquireRef(streamMetaRefPool, pParam->metaId);
37,461✔
83
  if (pMeta == NULL) {
37,461!
UNCOV
84
    tqError("metaRid:%" PRId64 " not valid now, stream meta has been freed", pParam->metaId);
×
UNCOV
85
    taosMemoryFree(pParam);
×
UNCOV
86
    return;
×
87
  }
88

89
  vgId = pMeta->vgId;
37,461✔
90
  pTq = pMeta->ahandle;
37,461✔
91

92
  tqDebug("vgId:%d create msg to start wal scan, numOfTasks:%d, vnd restored:%d", vgId, pParam->numOfTasks,
37,461✔
93
          pTq->pVnode->restored);
94

95
  code = streamTaskSchedTask(&pTq->pVnode->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA);
37,461✔
96
  if (code) {
37,461✔
97
    tqError("vgId:%d failed sched task to scan wal, code:%s", vgId, tstrerror(code));
205!
98
  }
99

100
  code = taosReleaseRef(streamMetaRefPool, pParam->metaId);
37,461✔
101
  if (code) {
37,461!
102
    tqError("vgId:%d failed to release ref for streamMeta, rid:%" PRId64 " code:%s", vgId, pParam->metaId,
×
103
            tstrerror(code));
104
  }
105

106
  taosMemoryFree(pParam);
37,461✔
107
}
108

109
int32_t tqScanWalInFuture(STQ* pTq, int32_t numOfTasks, int32_t idleDuration) {
37,447✔
110
  SStreamMeta*           pMeta = pTq->pStreamMeta;
37,447✔
111
  int32_t                code = 0;
37,447✔
112
  int32_t                vgId = TD_VID(pTq->pVnode);
37,447✔
113
  tmr_h                  pTimer = NULL;
37,447✔
114
  SBuildScanWalMsgParam* pParam = NULL;
37,447✔
115

116
  pParam = taosMemoryMalloc(sizeof(SBuildScanWalMsgParam));
37,447✔
117
  if (pParam == NULL) {
37,456!
118
    return terrno;
×
119
  }
120

121
  pParam->metaId = pMeta->rid;
37,456✔
122
  pParam->numOfTasks = numOfTasks;
37,456✔
123

124
  code = streamTimerGetInstance(&pTimer);
37,456✔
125
  if (code) {
37,445!
126
    tqError("vgId:%d failed to get tmr ctrl during sched scan wal", vgId);
×
127
    taosMemoryFree(pParam);
×
128
  } else {
129
    streamTmrStart(doStartScanWal, idleDuration, pParam, pTimer, &pMeta->scanInfo.scanTimer, vgId, "scan-wal-fut");
37,445✔
130
  }
131

132
  return code;
37,461✔
133
}
134

135
int32_t tqScanWalAsync(STQ* pTq, bool ckPause) {
532,815✔
136
  int32_t      vgId = TD_VID(pTq->pVnode);
532,815✔
137
  SStreamMeta* pMeta = pTq->pStreamMeta;
532,815✔
138
  bool         alreadyRestored = pTq->pVnode->restored;
532,815✔
139
  int32_t      numOfTasks = 0;
532,815✔
140

141
  // do not launch the stream tasks, if it is a follower or not restored vnode.
142
  if (!(vnodeIsRoleLeader(pTq->pVnode) && alreadyRestored)) {
532,815✔
143
    return TSDB_CODE_SUCCESS;
25,639✔
144
  }
145

146
  streamMetaWLock(pMeta);
507,262✔
147

148
  numOfTasks = taosArrayGetSize(pMeta->pTaskList);
507,264✔
149
  if (numOfTasks == 0) {
507,222✔
150
    tqDebug("vgId:%d no stream tasks existed to run", vgId);
14✔
151
    streamMetaWUnLock(pMeta);
14✔
152
    return 0;
14✔
153
  }
154

155
  if (pMeta->startInfo.startAllTasks) {
507,208✔
156
    tqTrace("vgId:%d in restart procedure, not scan wal", vgId);
10,116✔
157
    streamMetaWUnLock(pMeta);
10,116✔
158
    return 0;
10,116✔
159
  }
160

161
  pMeta->scanInfo.scanCounter += 1;
497,092✔
162
  if (pMeta->scanInfo.scanCounter > MAX_REPEAT_SCAN_THRESHOLD) {
497,092✔
163
    pMeta->scanInfo.scanCounter = MAX_REPEAT_SCAN_THRESHOLD;
402,890✔
164
  }
165

166
  if (pMeta->scanInfo.scanCounter > 1) {
497,092✔
167
    tqDebug("vgId:%d wal read task has been launched, remain scan times:%d", vgId, pMeta->scanInfo.scanCounter);
440,716✔
168
    streamMetaWUnLock(pMeta);
440,721✔
169
    return 0;
440,715✔
170
  }
171

172
  int32_t numOfPauseTasks = pMeta->numOfPausedTasks;
56,376✔
173
  if (ckPause && numOfTasks == numOfPauseTasks) {
56,376✔
174
    tqDebug("vgId:%d ignore all submit, all streams had been paused, reset the walScanCounter", vgId);
133✔
175

176
    // reset the counter value, since we do not launch the scan wal operation.
177
    pMeta->scanInfo.scanCounter = 0;
133✔
178
    streamMetaWUnLock(pMeta);
133✔
179
    return 0;
134✔
180
  }
181

182
  tqDebug("vgId:%d create msg to start wal scan to launch stream tasks, numOfTasks:%d, vnd restored:%d", vgId,
56,243✔
183
          numOfTasks, alreadyRestored);
184

185
  int32_t code = streamTaskSchedTask(&pTq->pVnode->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA);
56,243✔
186
  streamMetaWUnLock(pMeta);
56,272✔
187

188
  return code;
56,281✔
189
}
190

191
int32_t tqStopStreamTasksAsync(STQ* pTq) {
5,020✔
192
  SStreamMeta* pMeta = pTq->pStreamMeta;
5,020✔
193
  int32_t      vgId = pMeta->vgId;
5,020✔
194
  return streamTaskSchedTask(&pTq->pVnode->msgCb, vgId, 0, 0, STREAM_EXEC_T_STOP_ALL_TASKS);
5,020✔
195
}
196

197
int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId) {
115,047✔
198
  // seek the stored version and extract data from WAL
199
  int64_t firstVer = walReaderGetValidFirstVer(pTask->exec.pWalReader);
115,047✔
200
  if (pTask->chkInfo.nextProcessVer < firstVer) {
115,049✔
201
    tqWarn("vgId:%d s-task:%s ver:%" PRId64 " earlier than the first ver of wal range %" PRId64 ", forward to %" PRId64,
14!
202
           vgId, pTask->id.idStr, pTask->chkInfo.nextProcessVer, firstVer, firstVer);
203

204
    pTask->chkInfo.nextProcessVer = firstVer;
14✔
205

206
    // todo need retry if failed
207
    int32_t code = walReaderSeekVer(pTask->exec.pWalReader, pTask->chkInfo.nextProcessVer);
14✔
208
    if (code != TSDB_CODE_SUCCESS) {
14!
209
      return code;
×
210
    }
211

212
    // append the data for the stream
213
    tqDebug("vgId:%d s-task:%s wal reader seek to ver:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.nextProcessVer);
14✔
214
  } else {
215
    int64_t currentVer = walReaderGetCurrentVer(pTask->exec.pWalReader);
115,035✔
216
    if (currentVer == -1) {  // we only seek the read for the first time
114,992✔
217
      int32_t code = walReaderSeekVer(pTask->exec.pWalReader, pTask->chkInfo.nextProcessVer);
5,244✔
218
      if (code != TSDB_CODE_SUCCESS) {  // no data in wal, quit
5,243!
219
        return code;
×
220
      }
221

222
      // append the data for the stream
223
      tqDebug("vgId:%d s-task:%s wal reader initial seek to ver:%" PRId64, vgId, pTask->id.idStr,
5,243✔
224
              pTask->chkInfo.nextProcessVer);
225
    }
226
  }
227

228
  int64_t skipToVer = walReaderGetSkipToVersion(pTask->exec.pWalReader);
115,005✔
229
  if (skipToVer != 0 && skipToVer > pTask->chkInfo.nextProcessVer) {
114,991✔
230
    int32_t code = walReaderSeekVer(pTask->exec.pWalReader, skipToVer);
295✔
231
    if (code != TSDB_CODE_SUCCESS) {  // no data in wal, quit
295!
232
      return code;
×
233
    }
234

235
    tqDebug("vgId:%d s-task:%s wal reader jump to ver:%" PRId64, vgId, pTask->id.idStr, skipToVer);
295✔
236
  }
237

238
  return TSDB_CODE_SUCCESS;
114,959✔
239
}
240

241
// todo handle memory error
242
bool handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver) {
567,599✔
243
  const char* id = pTask->id.idStr;
567,599✔
244
  int64_t     maxVer = pTask->step2Range.maxVer;
567,599✔
245

246
  if ((pTask->info.fillHistory == 1) && ver > maxVer) {
567,599✔
247
    if (!pTask->status.appendTranstateBlock) {
1,378!
248
      qWarn("s-task:%s fill-history scan WAL, nextProcessVer:%" PRId64 " out of the maximum ver:%" PRId64
1,378!
249
            ", not scan wal anymore, add transfer-state block into inputQ",
250
            id, ver, maxVer);
251

252
      double el = (taosGetTimestampMs() - pTask->execInfo.step2Start) / 1000.0;
1,378✔
253
      qDebug("s-task:%s scan-history from WAL stage(step 2) ended, range:%" PRId64 "-%" PRId64 ", elapsed time:%.2fs",
1,378✔
254
             id, pTask->step2Range.minVer, maxVer, el);
255
      int32_t code = streamTaskPutTranstateIntoInputQ(pTask);
1,378✔
256
      if (code) {
1,378!
257
        qError("s-task:%s failed to put trans-state into inputQ", id);
×
258
      }
259

260
      return true;
1,378✔
261
    } else {
262
      qWarn("s-task:%s fill-history scan WAL, nextProcessVer:%" PRId64 " out of the ver range:%" PRId64 "-%" PRId64
×
263
            ", not scan wal",
264
            id, ver, pTask->step2Range.minVer, maxVer);
265
    }
266
  }
267

268
  return false;
566,209✔
269
}
270

271
bool taskReadyForDataFromWal(SStreamTask* pTask) {
381,582✔
272
  // non-source or fill-history tasks don't need to response the WAL scan action.
273
  SSTaskBasicInfo* pInfo = &pTask->info;
381,582✔
274
  if ((pInfo->taskLevel != TASK_LEVEL__SOURCE) || (pTask->status.downstreamReady == 0)) {
381,582✔
275
    return false;
199,298✔
276
  }
277

278
  if (pInfo->taskLevel == TASK_LEVEL__SOURCE && pInfo->trigger == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) {
182,284✔
279
    return false;
6,421✔
280
  }
281

282
  // not in ready state, do not handle the data from wal
283
  SStreamTaskState pState = streamTaskGetStatus(pTask);
175,863✔
284
  if (pState.state != TASK_STATUS__READY) {
175,834✔
285
    tqTrace("s-task:%s not ready for submit block in wal, status:%s", pTask->id.idStr, pState.name);
45,968✔
286
    return false;
45,970✔
287
  }
288

289
  // fill-history task has entered into the last phase, no need to anything
290
  if ((pInfo->fillHistory == 1) && pTask->status.appendTranstateBlock) {
129,866✔
291
    // the maximum version of data in the WAL has reached already, the step2 is done
292
    tqDebug("s-task:%s fill-history reach the maximum ver:%" PRId64 ", not scan wal anymore", pTask->id.idStr,
14,813✔
293
            pTask->dataRange.range.maxVer);
294
    return false;
14,813✔
295
  }
296

297
  // check if input queue is full or not
298
  if (streamQueueIsFull(pTask->inputq.queue)) {
115,053✔
299
    tqTrace("s-task:%s input queue is full, do nothing", pTask->id.idStr);
20!
300
    return false;
×
301
  }
302

303
  // the input queue of downstream task is full, so the output is blocked, stopped for a while
304
  if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED) {
115,077!
305
    tqDebug("s-task:%s inputQ is blocked, do nothing", pTask->id.idStr);
×
306
    return false;
×
307
  }
308

309
  return true;
115,077✔
310
}
311

312
int32_t doPutDataIntoInputQ(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems, bool* pSucc) {
115,009✔
313
  const char* id = pTask->id.idStr;
115,009✔
314
  int32_t     numOfNewItems = 0;
115,009✔
315
  int32_t     code = 0;
115,009✔
316
  *pSucc = false;
115,009✔
317

318
  while (1) {
452,694✔
319
    if ((pTask->info.fillHistory == 1) && pTask->status.appendTranstateBlock) {
567,703!
320
      *numOfItems += numOfNewItems;
×
321
      return numOfNewItems > 0;
×
322
    }
323

324
    SStreamQueueItem* pItem = NULL;
567,703✔
325
    code = extractMsgFromWal(pTask->exec.pWalReader, (void**)&pItem, maxVer, id);
567,703✔
326
    if (code != TSDB_CODE_SUCCESS || pItem == NULL) {  // failed, continue
567,616✔
327
      int64_t currentVer = walReaderGetCurrentVer(pTask->exec.pWalReader);
114,699✔
328
      bool    itemInFillhistory = handleFillhistoryScanComplete(pTask, currentVer);
114,720✔
329
      if (itemInFillhistory) {
114,695✔
330
        numOfNewItems += 1;
1,173✔
331
      }
332
      break;
114,695✔
333
    }
334

335
    if (pItem != NULL) {
452,917!
336
      code = streamTaskPutDataIntoInputQ(pTask, pItem);
452,919✔
337
      if (code == TSDB_CODE_SUCCESS) {
452,919✔
338
        numOfNewItems += 1;
452,917✔
339
        int64_t ver = walReaderGetCurrentVer(pTask->exec.pWalReader);
452,917✔
340
        pTask->chkInfo.nextProcessVer = ver;
452,908✔
341
        tqDebug("s-task:%s set ver:%" PRId64 " for reader after extract data from WAL", id, ver);
452,908✔
342

343
        bool itemInFillhistory = handleFillhistoryScanComplete(pTask, ver);
452,908✔
344
        if (itemInFillhistory) {
452,901✔
345
          break;
205✔
346
        }
347
      } else {
348
        if (code == TSDB_CODE_OUT_OF_MEMORY) {
2!
349
          tqError("s-task:%s failed to put data into inputQ, since out of memory", id);
×
350
        } else {
351
          tqTrace("s-task:%s append input queue failed, code:inputQ is full, ver:%" PRId64, id,
2!
352
                  pTask->chkInfo.nextProcessVer);
353
          code = walReaderSeekVer(pTask->exec.pWalReader, pTask->chkInfo.nextProcessVer);
2✔
354
          if (code) {
2!
355
            tqError("s-task:%s failed to seek ver to:%" PRId64 " in wal", id, pTask->chkInfo.nextProcessVer);
×
356
          }
357

358
          code = 0;  // reset the error code
2✔
359
        }
360

361
        break;
2✔
362
      }
363
    }
364
  }
365

366
  *numOfItems += numOfNewItems;
114,902✔
367
  *pSucc = (numOfNewItems > 0);
114,902✔
368
  return code;
114,902✔
369
}
370

371
int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) {
93,045✔
372
  *pScanIdle = true;
93,045✔
373
  bool    noDataInWal = true;
93,045✔
374
  int32_t vgId = pStreamMeta->vgId;
93,045✔
375

376
  int32_t numOfTasks = taosArrayGetSize(pStreamMeta->pTaskList);
93,045✔
377
  if (numOfTasks == 0) {
93,047✔
378
    return TSDB_CODE_SUCCESS;
926✔
379
  }
380

381
  // clone the task list, to avoid the task update during scan wal files
382
  SArray* pTaskList = NULL;
92,121✔
383
  streamMetaWLock(pStreamMeta);
92,121✔
384
  pTaskList = taosArrayDup(pStreamMeta->pTaskList, NULL);
92,127✔
385
  streamMetaWUnLock(pStreamMeta);
92,127✔
386
  if (pTaskList == NULL) {
92,117!
387
    tqError("vgId:%d failed to create task list dup, code:%s", vgId, tstrerror(terrno));
×
388
    return terrno;
×
389
  }
390

391
  tqDebug("vgId:%d start to check wal to extract new submit block for %d tasks", vgId, numOfTasks);
92,117✔
392

393
  // update the new task number
394
  numOfTasks = taosArrayGetSize(pTaskList);
92,118✔
395

396
  for (int32_t i = 0; i < numOfTasks; ++i) {
474,308✔
397
    STaskId* pTaskId = taosArrayGet(pTaskList, i);
382,109✔
398
    if (pTaskId == NULL) {
382,044!
399
      continue;
267,134✔
400
    }
401

402
    SStreamTask* pTask = NULL;
382,044✔
403
    int32_t      code = streamMetaAcquireTask(pStreamMeta, pTaskId->streamId, pTaskId->taskId, &pTask);
382,044✔
404
    if (pTask == NULL || code != 0) {
382,081!
405
      continue;
439✔
406
    }
407

408
    if (!taskReadyForDataFromWal(pTask)) {
381,642✔
409
      streamMetaReleaseTask(pStreamMeta, pTask);
266,497✔
410
      continue;
266,677✔
411
    }
412

413
    *pScanIdle = false;
115,057✔
414

415
    // seek the stored version and extract data from WAL
416
    code = setWalReaderStartOffset(pTask, vgId);
115,057✔
417
    if (code != TSDB_CODE_SUCCESS) {
114,952!
418
      streamMetaReleaseTask(pStreamMeta, pTask);
×
419
      continue;
×
420
    }
421

422
    int32_t numOfItems = streamQueueGetNumOfItems(pTask->inputq.queue);
114,952✔
423
    int64_t maxVer = (pTask->info.fillHistory == 1) ? pTask->step2Range.maxVer : INT64_MAX;
115,079✔
424

425
    streamMutexLock(&pTask->lock);
115,079✔
426

427
    SStreamTaskState state = streamTaskGetStatus(pTask);
115,053✔
428
    if (state.state != TASK_STATUS__READY) {
115,038✔
429
      tqDebug("s-task:%s not ready for submit block from wal, status:%s", pTask->id.idStr, state.name);
33✔
430
      streamMutexUnlock(&pTask->lock);
33✔
431
      streamMetaReleaseTask(pStreamMeta, pTask);
33✔
432
      continue;
18✔
433
    }
434

435
    bool hasNewData = false;
115,005✔
436
    code = doPutDataIntoInputQ(pTask, maxVer, &numOfItems, &hasNewData);
115,005✔
437
    streamMutexUnlock(&pTask->lock);
114,906✔
438

439
    if ((numOfItems > 0) || hasNewData) {
115,054!
440
      noDataInWal = false;
37,049✔
441
      code = streamTrySchedExec(pTask);
37,049✔
442
      if (code != TSDB_CODE_SUCCESS) {
37,048!
UNCOV
443
        streamMetaReleaseTask(pStreamMeta, pTask);
×
UNCOV
444
        taosArrayDestroy(pTaskList);
×
UNCOV
445
        return code;
×
446
      }
447
    }
448

449
    streamMetaReleaseTask(pStreamMeta, pTask);
115,053✔
450
  }
451

452
  // all wal are checked, and no new data available in wal.
453
  if (noDataInWal) {
92,199✔
454
    *pScanIdle = true;
68,178✔
455
  }
456

457
  taosArrayDestroy(pTaskList);
92,199✔
458
  return TSDB_CODE_SUCCESS;
92,129✔
459
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc