• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3579

12 Jan 2025 03:09AM UTC coverage: 62.976% (-0.2%) from 63.183%
#3579

push

travis-ci

web-flow
Merge pull request #29551 from taosdata/merge/mainto3.0

merge: from main to 3.0 branch

139324 of 284527 branches covered (48.97%)

Branch coverage included in aggregate %.

34 of 50 new or added lines in 4 files covered. (68.0%)

1114 existing lines in 141 files now uncovered.

217258 of 281694 relevant lines covered (77.13%)

9262344.36 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.54
/source/dnode/vnode/src/tq/tqStreamTask.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tq.h"
17
#include "vnd.h"
18

19
// upper bound applied to the pending wal-scan counter kept in the stream meta
#define MAX_REPEAT_SCAN_THRESHOLD 3
// delay (in ms) before the follow-up wal scan is triggered
#define SCAN_WAL_IDLE_DURATION    500
21

22
// Argument block handed to the delayed wal-scan timer callback (doStartScanWal).
// It is heap-allocated by tqScanWalInFuture and released by the callback.
typedef struct SBuildScanWalMsgParam {
  int64_t metaId;      // reference id of the stream meta, resolved through streamMetaRefPool
  int32_t numOfTasks;  // number of tasks at scheduling time, only used for logging
} SBuildScanWalMsgParam;
26

27
static int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta);
28
static int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId);
29
static bool    handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver);
30
static bool    taskReadyForDataFromWal(SStreamTask* pTask);
31
static int32_t doPutDataIntoInputQ(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems, bool* pSucc);
32
static int32_t tqScanWalInFuture(STQ* pTq, int32_t numOfTasks, int32_t idleDuration);
33
static int32_t doScanWalAsync(STQ* pTq, bool ckPause);
34

35
// extract data blocks(submit/delete) from WAL, and add them into the input queue for all the sources tasks.
36
int32_t tqScanWal(STQ* pTq) {
61,933✔
37
  SStreamMeta* pMeta = pTq->pStreamMeta;
61,933✔
38
  int32_t      vgId = pMeta->vgId;
61,933✔
39
  int64_t      st = taosGetTimestampMs();
61,954✔
40
  int32_t      numOfTasks = 0;
61,954✔
41

42
  tqDebug("vgId:%d continue to check if data in wal are available, scanCounter:%d", vgId, pMeta->scanInfo.scanCounter);
61,954✔
43

44
  // check all tasks
45
  int32_t code = doScanWalForAllTasks(pMeta);
61,955✔
46
  if (code) {
61,947!
47
    tqError("vgId:%d failed to start all tasks, try next time, code:%s", vgId, tstrerror(code));
×
48
    return code;
×
49
  }
50

51
  streamMetaWLock(pMeta);
61,947✔
52
  int32_t times = (--pMeta->scanInfo.scanCounter);
61,950✔
53
  if (times < 0) {
61,950!
54
    tqError("vgId:%d invalid scan counter:%d, reset to 0", vgId, times);
×
55
    times = 0;
×
56
  }
57

58
  numOfTasks = taosArrayGetSize(pMeta->pTaskList);
61,950✔
59
  streamMetaWUnLock(pMeta);
61,945✔
60

61
  int64_t el = (taosGetTimestampMs() - st);
61,952✔
62
  tqDebug("vgId:%d scan wal for stream tasks completed, elapsed time:%" PRId64 " ms", vgId, el);
61,952✔
63

64
  if (times > 0) {
61,952✔
65
    tqDebug("vgId:%d scan wal for stream tasks for %d times in %dms", vgId, times, SCAN_WAL_IDLE_DURATION);
22,233✔
66
    code = tqScanWalInFuture(pTq, numOfTasks, SCAN_WAL_IDLE_DURATION);
22,233✔
67
    if (code) {
22,233!
68
      tqError("vgId:%d sched scan wal in %dms failed, ignore this failure", vgId, SCAN_WAL_IDLE_DURATION);
×
69
    }
70
  }
71

72
  return code;
61,950✔
73
}
74

75
static void doStartScanWal(void* param, void* tmrId) {
22,233✔
76
  int32_t vgId = 0;
22,233✔
77
  STQ*    pTq = NULL;
22,233✔
78
  int32_t code = 0;
22,233✔
79

80
  SBuildScanWalMsgParam* pParam = (SBuildScanWalMsgParam*)param;
22,233✔
81

82
  SStreamMeta* pMeta = taosAcquireRef(streamMetaRefPool, pParam->metaId);
22,233✔
83
  if (pMeta == NULL) {
22,233✔
84
    tqError("metaRid:%" PRId64 " not valid now, stream meta has been freed", pParam->metaId);
199!
85
    taosMemoryFree(pParam);
199!
86
    return;
199✔
87
  }
88

89
  vgId = pMeta->vgId;
22,034✔
90
  pTq = pMeta->ahandle;
22,034✔
91

92
  tqDebug("vgId:%d create msg to start wal scan, numOfTasks:%d, vnd restored:%d", vgId, pParam->numOfTasks,
22,034✔
93
          pTq->pVnode->restored);
94

95
  code = streamTaskSchedTask(&pTq->pVnode->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA);
22,034✔
96
  if (code) {
22,034✔
97
    tqError("vgId:%d failed sched task to scan wal, code:%s", vgId, tstrerror(code));
552!
98
  }
99

100
  code = taosReleaseRef(streamMetaRefPool, pParam->metaId);
22,034✔
101
  if (code) {
22,034!
102
    tqError("vgId:%d failed to release ref for streamMeta, rid:%" PRId64 " code:%s", vgId, pParam->metaId,
×
103
            tstrerror(code));
104
  }
105

106
  taosMemoryFree(pParam);
22,034!
107
}
108

109
/**
 * Arm the scan timer of the stream meta so that doStartScanWal() runs after idleDuration.
 *
 * @param pTq           the tq handle
 * @param numOfTasks    number of tasks, forwarded to the callback for logging
 * @param idleDuration  delay handed to streamTmrStart()
 * @return              terrno if the parameter block cannot be allocated, the code of
 *                      streamTimerGetInstance() if it fails, otherwise 0
 */
int32_t tqScanWalInFuture(STQ* pTq, int32_t numOfTasks, int32_t idleDuration) {
  SStreamMeta* pStreamMeta = pTq->pStreamMeta;
  int32_t      vgId = TD_VID(pTq->pVnode);

  SBuildScanWalMsgParam* pMsgParam = taosMemoryMalloc(sizeof(SBuildScanWalMsgParam));
  if (pMsgParam == NULL) {
    return terrno;
  }

  pMsgParam->metaId = pStreamMeta->rid;
  pMsgParam->numOfTasks = numOfTasks;

  tmr_h   hTimer = NULL;
  int32_t code = streamTimerGetInstance(&hTimer);
  if (code) {
    // no timer started, so the callback will never run: release the parameter block here
    tqError("vgId:%d failed to get tmr ctrl during sched scan wal", vgId);
    taosMemoryFree(pMsgParam);
    return code;
  }

  // from here on the parameter block is freed by doStartScanWal()
  streamTmrStart(doStartScanWal, idleDuration, pMsgParam, hTimer, &pStreamMeta->scanInfo.scanTimer, vgId,
                 "scan-wal-fut");
  return code;
}
134

135
/**
 * Request an asynchronous wal scan for the stream tasks of this vnode.
 *
 * Nothing is done (and TSDB_CODE_SUCCESS is returned) unless the vnode is the leader and has been restored.
 * Otherwise doScanWalAsync() is invoked while holding the stream meta write lock.
 *
 * @param pTq      the tq handle
 * @param ckPause  forwarded to doScanWalAsync()
 * @return         TSDB_CODE_SUCCESS when skipped, otherwise the result of doScanWalAsync()
 */
int32_t tqScanWalAsync(STQ* pTq, bool ckPause) {
  SStreamMeta* pStreamMeta = pTq->pStreamMeta;
  bool         vnodeRestored = pTq->pVnode->restored;

  // do not launch the stream tasks, if it is a follower or not restored vnode.
  if (!vnodeIsRoleLeader(pTq->pVnode) || !vnodeRestored) {
    return TSDB_CODE_SUCCESS;
  }

  streamMetaWLock(pStreamMeta);
  int32_t code = doScanWalAsync(pTq, ckPause);
  streamMetaWUnLock(pStreamMeta);

  return code;
}
150

151
/**
 * Schedule a STREAM_EXEC_T_STOP_ALL_TASKS task for the vnode of this tq.
 *
 * @param pTq  the tq handle
 * @return     the result of streamTaskSchedTask()
 */
int32_t tqStopStreamTasksAsync(STQ* pTq) {
  int32_t vgId = pTq->pStreamMeta->vgId;
  return streamTaskSchedTask(&pTq->pVnode->msgCb, vgId, 0, 0, STREAM_EXEC_T_STOP_ALL_TASKS);
}
156

157
/**
 * Position the wal reader of a task before data is extracted from the WAL.
 *
 *  1. If the task's nextProcessVer is older than the first valid version of the reader, nextProcessVer is
 *     moved forward to that version and the reader seeks to it.
 *  2. Otherwise, if the reader reports -1 as its current version, it seeks to nextProcessVer.
 *  3. Finally, if the reader has a non-zero skip-to version beyond nextProcessVer, the reader seeks to it.
 *
 * @param pTask  the stream task that owns the wal reader
 * @param vgId   vnode id, used for logging only
 * @return       TSDB_CODE_SUCCESS, or the code of the first failing walReaderSeekVer() call
 */
int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId) {
  int32_t code = TSDB_CODE_SUCCESS;

  // seek the stored version and extract data from WAL
  int64_t firstVer = walReaderGetValidFirstVer(pTask->exec.pWalReader);

  if (pTask->chkInfo.nextProcessVer < firstVer) {
    tqWarn("vgId:%d s-task:%s ver:%" PRId64 " earlier than the first ver of wal range %" PRId64 ", forward to %" PRId64,
           vgId, pTask->id.idStr, pTask->chkInfo.nextProcessVer, firstVer, firstVer);

    pTask->chkInfo.nextProcessVer = firstVer;

    // todo need retry if failed
    code = walReaderSeekVer(pTask->exec.pWalReader, pTask->chkInfo.nextProcessVer);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    // append the data for the stream
    tqDebug("vgId:%d s-task:%s wal reader seek to ver:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.nextProcessVer);
  } else if (walReaderGetCurrentVer(pTask->exec.pWalReader) == -1) {
    // we only seek the reader for the first time
    code = walReaderSeekVer(pTask->exec.pWalReader, pTask->chkInfo.nextProcessVer);
    if (code != TSDB_CODE_SUCCESS) {  // no data in wal, quit
      return code;
    }

    // append the data for the stream
    tqDebug("vgId:%d s-task:%s wal reader initial seek to ver:%" PRId64, vgId, pTask->id.idStr,
            pTask->chkInfo.nextProcessVer);
  }

  int64_t skipToVer = walReaderGetSkipToVersion(pTask->exec.pWalReader);
  if (skipToVer == 0 || skipToVer <= pTask->chkInfo.nextProcessVer) {
    return TSDB_CODE_SUCCESS;
  }

  code = walReaderSeekVer(pTask->exec.pWalReader, skipToVer);
  if (code != TSDB_CODE_SUCCESS) {  // no data in wal, quit
    return code;
  }

  tqDebug("vgId:%d s-task:%s wal reader jump to ver:%" PRId64, vgId, pTask->id.idStr, skipToVer);
  return TSDB_CODE_SUCCESS;
}
200

201
// todo handle memory error
202
bool handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver) {
196,447✔
203
  const char* id = pTask->id.idStr;
196,447✔
204
  int64_t     maxVer = pTask->step2Range.maxVer;
196,447✔
205

206
  if ((pTask->info.fillHistory == 1) && ver > maxVer) {
196,447✔
207
    if (!pTask->status.appendTranstateBlock) {
782!
208
      qWarn("s-task:%s fill-history scan WAL, nextProcessVer:%" PRId64 " out of the maximum ver:%" PRId64
782!
209
            ", not scan wal anymore, add transfer-state block into inputQ",
210
            id, ver, maxVer);
211

212
      double el = (taosGetTimestampMs() - pTask->execInfo.step2Start) / 1000.0;
782✔
213
      qDebug("s-task:%s scan-history from WAL stage(step 2) ended, range:%" PRId64 "-%" PRId64 ", elapsed time:%.2fs",
782✔
214
             id, pTask->step2Range.minVer, maxVer, el);
215
      int32_t code = streamTaskPutTranstateIntoInputQ(pTask);
782✔
216
      if (code) {
782!
217
        qError("s-task:%s failed to put trans-state into inputQ", id);
×
218
      }
219

220
      return true;
782✔
221
    } else {
222
      qWarn("s-task:%s fill-history scan WAL, nextProcessVer:%" PRId64 " out of the ver range:%" PRId64 "-%" PRId64
×
223
            ", not scan wal",
224
            id, ver, pTask->step2Range.minVer, maxVer);
225
    }
226
  }
227

228
  return false;
195,659✔
229
}
230

231
/**
 * Decide whether a task should be fed with data extracted from the WAL in the current scan.
 *
 * A task qualifies only if all of the following hold:
 *  - it is a source task and its downstreamReady flag is set,
 *  - its trigger is not STREAM_TRIGGER_FORCE_WINDOW_CLOSE,
 *  - its status is TASK_STATUS__READY,
 *  - it is not a fill-history task that already appended the trans-state block,
 *  - its input queue is neither full nor marked TASK_INPUT_STATUS__BLOCKED.
 *
 * @param pTask  the stream task to check
 * @return       true if the task can accept data from the WAL, false otherwise
 */
bool taskReadyForDataFromWal(SStreamTask* pTask) {
  SSTaskBasicInfo* pBasic = &pTask->info;

  // non-source tasks, or tasks whose downstream is not ready, do not respond to the WAL scan action.
  if ((pBasic->taskLevel != TASK_LEVEL__SOURCE) || (pTask->status.downstreamReady == 0)) {
    return false;
  }

  // the task is known to be a source task from here on
  if (pBasic->trigger == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) {
    return false;
  }

  // not in ready state, do not handle the data from wal
  SStreamTaskState status = streamTaskGetStatus(pTask);
  if (status.state != TASK_STATUS__READY) {
    tqTrace("s-task:%s not ready for submit block in wal, status:%s", pTask->id.idStr, status.name);
    return false;
  }

  // fill-history task has entered into the last phase, nothing left to do
  if ((pBasic->fillHistory == 1) && pTask->status.appendTranstateBlock) {
    // the maximum version of data in the WAL has reached already, the step2 is done
    tqDebug("s-task:%s fill-history reach the maximum ver:%" PRId64 ", not scan wal anymore", pTask->id.idStr,
            pTask->dataRange.range.maxVer);
    return false;
  }

  // check if input queue is full or not
  if (streamQueueIsFull(pTask->inputq.queue)) {
    tqTrace("s-task:%s input queue is full, do nothing", pTask->id.idStr);
    return false;
  }

  // the input queue of downstream task is full, so the output is blocked, stopped for a while
  if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED) {
    tqDebug("s-task:%s inputQ is blocked, do nothing", pTask->id.idStr);
    return false;
  }

  return true;
}
271

272
/**
 * Extract items from the WAL of a task, one at a time, and append them to the task's input queue until no
 * more data can be extracted, the queue rejects an item, or the fill-history range is exhausted.
 *
 * @param pTask       the stream task whose wal reader is consumed
 * @param maxVer      upper version bound handed to extractMsgFromWal()
 * @param numOfItems  in/out: incremented by the number of items added during this call
 * @param pSucc       out: true if at least one item was added during this call
 * @return            0 on normal completion (including "inputQ is full"); otherwise the code of the failing
 *                    extractMsgFromWal()/streamTaskPutDataIntoInputQ() call
 */
int32_t doPutDataIntoInputQ(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems, bool* pSucc) {
  const char* id = pTask->id.idStr;
  int32_t     numOfNewItems = 0;
  int32_t     code = 0;
  *pSucc = false;

  while (1) {
    if ((pTask->info.fillHistory == 1) && pTask->status.appendTranstateBlock) {
      // The trans-state block is already in place, stop scanning. Leave through the common epilogue so that
      // *pSucc is set and an error code (not a boolean) is returned. code holds the success value here: it is
      // either untouched or the result of the last successful put.
      break;
    }

    SStreamQueueItem* pItem = NULL;
    code = extractMsgFromWal(pTask->exec.pWalReader, (void**)&pItem, maxVer, id);
    if (code != TSDB_CODE_SUCCESS || pItem == NULL) {  // failed or nothing to read, stop
      int64_t currentVer = walReaderGetCurrentVer(pTask->exec.pWalReader);
      bool    itemInFillhistory = handleFillhistoryScanComplete(pTask, currentVer);
      if (itemInFillhistory) {
        // the trans-state block counts as a new item
        numOfNewItems += 1;
      }
      break;
    }

    code = streamTaskPutDataIntoInputQ(pTask, pItem);
    if (code != TSDB_CODE_SUCCESS) {
      if (code == TSDB_CODE_OUT_OF_MEMORY) {
        tqError("s-task:%s failed to put data into inputQ, since out of memory", id);
      } else {
        tqTrace("s-task:%s append input queue failed, code:inputQ is full, ver:%" PRId64, id,
                pTask->chkInfo.nextProcessVer);

        // rewind the reader to the last accepted version, the rejected item is read again in the next scan
        code = walReaderSeekVer(pTask->exec.pWalReader, pTask->chkInfo.nextProcessVer);
        if (code) {
          tqError("s-task:%s failed to seek ver to:%" PRId64 " in wal", id, pTask->chkInfo.nextProcessVer);
        }

        code = 0;  // reset the error code
      }

      break;
    }

    numOfNewItems += 1;

    int64_t ver = walReaderGetCurrentVer(pTask->exec.pWalReader);
    pTask->chkInfo.nextProcessVer = ver;
    tqDebug("s-task:%s set ver:%" PRId64 " for reader after extract data from WAL", id, ver);

    bool itemInFillhistory = handleFillhistoryScanComplete(pTask, ver);
    if (itemInFillhistory) {
      break;
    }
  }

  *numOfItems += numOfNewItems;
  *pSucc = (numOfNewItems > 0);
  return code;
}
330

331
/**
 * Walk over a snapshot of the task list and, for every task that is ready for WAL data, position its wal
 * reader, move the available WAL items into its input queue and try to schedule its execution.
 *
 * @param pStreamMeta  the stream meta that owns the task list
 * @return             TSDB_CODE_SUCCESS when the list is empty or has been walked completely; the value of
 *                     terrno when the task list cannot be duplicated; the code of streamTrySchedExec() when
 *                     it fails (the remaining tasks are not visited in that case)
 */
int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta) {
  int32_t vgId = pStreamMeta->vgId;
  SArray* pTaskList = NULL;
  int32_t dupCode = TSDB_CODE_SUCCESS;

  // Clone the task list, to avoid the task update during scan wal files. The size check is done under the
  // same lock as the clone, so the list is never inspected while it may be modified.
  streamMetaWLock(pStreamMeta);
  int32_t numOfTasks = taosArrayGetSize(pStreamMeta->pTaskList);
  if (numOfTasks == 0) {
    streamMetaWUnLock(pStreamMeta);
    return TSDB_CODE_SUCCESS;
  }

  pTaskList = taosArrayDup(pStreamMeta->pTaskList, NULL);
  if (pTaskList == NULL) {
    // capture terrno right away, before any other call has a chance to change it
    dupCode = terrno;
  }
  streamMetaWUnLock(pStreamMeta);

  if (pTaskList == NULL) {
    tqError("vgId:%d failed to create task list dup, code:%s", vgId, tstrerror(dupCode));
    return dupCode;
  }

  // use the size of the snapshot for both the log and the iteration
  numOfTasks = taosArrayGetSize(pTaskList);
  tqDebug("vgId:%d start to check wal to extract new submit block for %d tasks", vgId, numOfTasks);

  for (int32_t i = 0; i < numOfTasks; ++i) {
    STaskId* pTaskId = taosArrayGet(pTaskList, i);
    if (pTaskId == NULL) {
      continue;
    }

    SStreamTask* pTask = NULL;
    int32_t      code = streamMetaAcquireTask(pStreamMeta, pTaskId->streamId, pTaskId->taskId, &pTask);
    if (pTask == NULL || code != 0) {
      // the task is not available (anymore), nothing was acquired
      continue;
    }

    if (!taskReadyForDataFromWal(pTask)) {
      streamMetaReleaseTask(pStreamMeta, pTask);
      continue;
    }

    // seek the stored version and extract data from WAL
    code = setWalReaderStartOffset(pTask, vgId);
    if (code != TSDB_CODE_SUCCESS) {
      streamMetaReleaseTask(pStreamMeta, pTask);
      continue;
    }

    int32_t numOfItems = streamQueueGetNumOfItems(pTask->inputq.queue);
    int64_t maxVer = (pTask->info.fillHistory == 1) ? pTask->step2Range.maxVer : INT64_MAX;

    streamMutexLock(&pTask->lock);

    // check the status again, now with the task lock held
    SStreamTaskState state = streamTaskGetStatus(pTask);
    if (state.state != TASK_STATUS__READY) {
      tqDebug("s-task:%s not ready for submit block from wal, status:%s", pTask->id.idStr, state.name);
      streamMutexUnlock(&pTask->lock);
      streamMetaReleaseTask(pStreamMeta, pTask);
      continue;
    }

    bool hasNewData = false;
    // the returned code is not acted upon; numOfItems/hasNewData decide whether the task is scheduled
    code = doPutDataIntoInputQ(pTask, maxVer, &numOfItems, &hasNewData);
    streamMutexUnlock(&pTask->lock);

    if ((numOfItems > 0) || hasNewData) {
      code = streamTrySchedExec(pTask);
      if (code != TSDB_CODE_SUCCESS) {
        streamMetaReleaseTask(pStreamMeta, pTask);
        taosArrayDestroy(pTaskList);
        return code;
      }
    }

    streamMetaReleaseTask(pStreamMeta, pTask);
  }

  taosArrayDestroy(pTaskList);
  return TSDB_CODE_SUCCESS;
}
409

410
/**
 * Register one more wal-scan request in the stream meta and, if no scan is pending yet, schedule a
 * STREAM_EXEC_T_EXTRACT_WAL_DATA task. tqScanWalAsync() calls this with the stream meta write lock held.
 *
 * The scan counter is incremented and capped at MAX_REPEAT_SCAN_THRESHOLD. Only the request that moves the
 * counter to 1 schedules the task; later requests just raise the counter.
 *
 * @param pTq      the tq handle
 * @param ckPause  if true, nothing is scheduled (and the counter is reset to 0) when the number of paused
 *                 tasks equals the number of tasks
 * @return         0 when nothing has to be scheduled, otherwise the result of streamTaskSchedTask()
 */
int32_t doScanWalAsync(STQ* pTq, bool ckPause) {
  SStreamMeta* pStreamMeta = pTq->pStreamMeta;
  bool         vnodeRestored = pTq->pVnode->restored;
  int32_t      vgId = pStreamMeta->vgId;
  int32_t      taskCount = taosArrayGetSize(pStreamMeta->pTaskList);

  if (taskCount == 0) {
    tqDebug("vgId:%d no stream tasks existed to run", vgId);
    return 0;
  }

  if (pStreamMeta->startInfo.startAllTasks) {
    tqTrace("vgId:%d in restart procedure, not scan wal", vgId);
    return 0;
  }

  // count this request, the counter never exceeds the threshold
  pStreamMeta->scanInfo.scanCounter += 1;
  if (pStreamMeta->scanInfo.scanCounter > MAX_REPEAT_SCAN_THRESHOLD) {
    pStreamMeta->scanInfo.scanCounter = MAX_REPEAT_SCAN_THRESHOLD;
  }

  // a scan has been requested before and is still accounted for, do not schedule another one
  if (pStreamMeta->scanInfo.scanCounter > 1) {
    tqDebug("vgId:%d wal read task has been launched, remain scan times:%d", vgId,
            pStreamMeta->scanInfo.scanCounter);
    return 0;
  }

  int32_t pausedCount = pStreamMeta->numOfPausedTasks;
  if (ckPause && taskCount == pausedCount) {
    tqDebug("vgId:%d ignore all submit, all streams had been paused, reset the walScanCounter", vgId);

    // reset the counter value, since we do not launch the scan wal operation.
    pStreamMeta->scanInfo.scanCounter = 0;
    return 0;
  }

  tqDebug("vgId:%d create msg to start wal scan to launch stream tasks, numOfTasks:%d, vnd restored:%d", vgId,
          taskCount, vnodeRestored);

  return streamTaskSchedTask(&pTq->pVnode->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA);
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc