• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3526

10 Nov 2024 03:50AM UTC coverage: 60.225% (-0.6%) from 60.818%
#3526

push

travis-ci

web-flow
Merge pull request #28709 from taosdata/main

merge: from main to 3.0 branch

117031 of 249004 branches covered (47.0%)

Branch coverage included in aggregate %.

130 of 169 new or added lines in 23 files covered. (76.92%)

4149 existing lines in 176 files now uncovered.

197577 of 273386 relevant lines covered (72.27%)

5840219.36 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.62
/source/libs/stream/src/streamTask.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executor.h"
17
#include "osDir.h"
18
#include "osMemory.h"
19
#include "streamInt.h"
20
#include "streamsm.h"
21
#include "tmisce.h"
22
#include "tstream.h"
23
#include "ttimer.h"
24
#include "wal.h"
25

26
static void streamTaskDestroyUpstreamInfo(SUpstreamInfo* pUpstreamInfo);
27
static int32_t streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdated);
28
static int32_t streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdate);
29
static void streamTaskDestroyActiveChkptInfo(SActiveCheckpointInfo* pInfo);
30

31
static int32_t addToTaskset(SArray* pArray, SStreamTask* pTask) {
8,080✔
32
  int32_t childId = taosArrayGetSize(pArray);
8,080✔
33
  pTask->info.selfChildId = childId;
8,080✔
34
  void* p = taosArrayPush(pArray, &pTask);
8,080✔
35
  return (p == NULL) ? terrno : TSDB_CODE_SUCCESS;
8,080!
36
}
37

38
static int32_t doUpdateTaskEpset(SStreamTask* pTask, int32_t nodeId, SEpSet* pEpSet, bool* pUpdated) {
10✔
39
  int32_t code = 0;
10✔
40
  char    buf[512] = {0};
10✔
41

42
  if (pTask->info.nodeId == nodeId) {  // execution task should be moved away
10✔
43
    bool isEqual = isEpsetEqual(&pTask->info.epSet, pEpSet);
2✔
44
    code = epsetToStr(pEpSet, buf, tListLen(buf));
2✔
45
    if (code) { // print error and continue
2!
46
      stError("%s failed to convert epset to str, code:%s", pTask->id.idStr, tstrerror(code));
×
47
      return code;
×
48
    }
49

50
    if (!isEqual) {
2!
UNCOV
51
      (*pUpdated) = true;
×
UNCOV
52
      char tmp[512] = {0};
×
UNCOV
53
      code = epsetToStr(&pTask->info.epSet, tmp, tListLen(tmp));  // only for log file, ignore errors
×
UNCOV
54
      if (code) { // print error and continue
×
55
        stError("%s failed to convert epset to str, code:%s", pTask->id.idStr, tstrerror(code));
×
56
        return code;
×
57
      }
58

UNCOV
59
      epsetAssign(&pTask->info.epSet, pEpSet);
×
UNCOV
60
      stDebug("s-task:0x%x (vgId:%d) self node epset is updated %s, old:%s", pTask->id.taskId, nodeId, buf, tmp);
×
61
    } else {
62
      stDebug("s-task:0x%x (vgId:%d) not updated task epset, since epset identical, %s", pTask->id.taskId, nodeId, buf);
2!
63
    }
64
  }
65

66
  // check for the dispatch info and the upstream task info
67
  int32_t level = pTask->info.taskLevel;
10✔
68
  if (level == TASK_LEVEL__SOURCE) {
10✔
69
    code = streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet, pUpdated);
4✔
70
  } else if (level == TASK_LEVEL__AGG) {
6✔
71
    code = streamTaskUpdateUpstreamInfo(pTask, nodeId, pEpSet, pUpdated);
2✔
72
    code = streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet, pUpdated);
2✔
73
  } else {  // TASK_LEVEL__SINK
74
    code = streamTaskUpdateUpstreamInfo(pTask, nodeId, pEpSet, pUpdated);
4✔
75
  }
76

77
  return code;
10✔
78
}
79

80
static void freeItem(void* p) {
×
81
  SStreamContinueExecInfo* pInfo = p;
×
82
  rpcFreeCont(pInfo->msg.pCont);
×
83
}
×
84

85
static void freeUpstreamItem(void* p) {
45,892✔
86
  SStreamUpstreamEpInfo** pInfo = p;
45,892✔
87
  taosMemoryFree(*pInfo);
45,892✔
88
}
45,894✔
89

90
static SStreamUpstreamEpInfo* createStreamTaskEpInfo(const SStreamTask* pTask) {
11,634✔
91
  SStreamUpstreamEpInfo* pEpInfo = taosMemoryMalloc(sizeof(SStreamUpstreamEpInfo));
11,634✔
92
  if (pEpInfo == NULL) {
11,634!
93
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
94
    return NULL;
×
95
  }
96

97
  pEpInfo->childId = pTask->info.selfChildId;
11,634✔
98
  pEpInfo->epSet = pTask->info.epSet;
11,634✔
99
  pEpInfo->nodeId = pTask->info.nodeId;
11,634✔
100
  pEpInfo->taskId = pTask->id.taskId;
11,634✔
101
  pEpInfo->stage = -1;
11,634✔
102

103
  return pEpInfo;
11,634✔
104
}
105

106
int32_t tNewStreamTask(int64_t streamId, int8_t taskLevel, SEpSet* pEpset, bool fillHistory, int32_t trigger,
8,080✔
107
                       int64_t triggerParam, SArray* pTaskList, bool hasFillhistory, int8_t subtableWithoutMd5,
108
                       SStreamTask** p) {
109
  *p = NULL;
8,080✔
110

111
  SStreamTask* pTask = (SStreamTask*)taosMemoryCalloc(1, sizeof(SStreamTask));
8,080✔
112
  if (pTask == NULL) {
8,080!
113
    stError("s-task:0x%" PRIx64 " failed malloc new stream task, size:%d, code:%s", streamId,
×
114
            (int32_t)sizeof(SStreamTask), tstrerror(terrno));
115
    return terrno;
×
116
  }
117

118
  pTask->ver = SSTREAM_TASK_VER;
8,080✔
119
  pTask->id.taskId = tGenIdPI32();
8,080✔
120
  pTask->id.streamId = streamId;
8,080✔
121

122
  pTask->info.taskLevel = taskLevel;
8,080✔
123
  pTask->info.fillHistory = fillHistory;
8,080✔
124
  pTask->info.trigger = trigger;
8,080✔
125
  pTask->info.delaySchedParam = triggerParam;
8,080✔
126
  pTask->subtableWithoutMd5 = subtableWithoutMd5;
8,080✔
127

128
  int32_t code = streamCreateStateMachine(pTask);
8,080✔
129
  if (pTask->status.pSM == NULL || code != TSDB_CODE_SUCCESS) {
8,080!
130
    taosMemoryFreeClear(pTask);
×
131
    return code;
×
132
  }
133

134
  char buf[128] = {0};
8,080✔
135
  sprintf(buf, "0x%" PRIx64 "-0x%x", pTask->id.streamId, pTask->id.taskId);
8,080✔
136

137
  pTask->id.idStr = taosStrdup(buf);
8,080✔
138
  if (pTask->id.idStr == NULL) {
8,080!
139
    stError("s-task:0x%x failed to build task id, code: out of memory", pTask->id.taskId);
×
140
    return terrno;
×
141
  }
142

143
  pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
8,080✔
144
  pTask->status.taskStatus = fillHistory ? TASK_STATUS__SCAN_HISTORY : TASK_STATUS__READY;
8,080✔
145
  pTask->inputq.status = TASK_INPUT_STATUS__NORMAL;
8,080✔
146
  pTask->outputq.status = TASK_OUTPUT_STATUS__NORMAL;
8,080✔
147

148
  pTask->taskCheckInfo.pList = taosArrayInit(4, sizeof(SDownstreamStatusInfo));
8,080✔
149
  code = taosThreadMutexInit(&pTask->taskCheckInfo.checkInfoLock, NULL);
8,080✔
150
  if (code) {
8,080!
151
    return code;
×
152
  }
153

154
  if (fillHistory && !hasFillhistory) {
8,080!
155
    stError("s-task:0x%x create task failed, due to inconsistent fill-history flag", pTask->id.taskId);
×
156
    return TSDB_CODE_INVALID_PARA;
×
157
  }
158

159
  epsetAssign(&(pTask->info.mnodeEpset), pEpset);
8,080✔
160

161
  code = addToTaskset(pTaskList, pTask);
8,080✔
162
  *p = pTask;
8,080✔
163

164
  return code;
8,080✔
165
}
166

167
int32_t tDecodeStreamTaskChkInfo(SDecoder* pDecoder, SCheckpointInfo* pChkpInfo) {
90✔
168
  int64_t skip64;
169
  int8_t  skip8;
170
  int32_t skip32;
171
  int16_t skip16;
172
  SEpSet  epSet;
173

174
  if (tStartDecode(pDecoder) < 0) return -1;
90!
175
  if (tDecodeI64(pDecoder, &pChkpInfo->msgVer) < 0) return -1;
180!
176
  // if (ver <= SSTREAM_TASK_INCOMPATIBLE_VER) return -1;
177

178
  if (tDecodeI64(pDecoder, &skip64) < 0) return -1;
90!
179
  if (tDecodeI32(pDecoder, &skip32) < 0) return -1;
90!
180
  if (tDecodeI32(pDecoder, &skip32) < 0) return -1;
90!
181
  if (tDecodeI8(pDecoder, &skip8) < 0) return -1;
90!
182
  if (tDecodeI8(pDecoder, &skip8) < 0) return -1;
90!
183
  if (tDecodeI16(pDecoder, &skip16) < 0) return -1;
90!
184

185
  if (tDecodeI8(pDecoder, &skip8) < 0) return -1;
90!
186
  if (tDecodeI8(pDecoder, &skip8) < 0) return -1;
90!
187

188
  if (tDecodeI32(pDecoder, &skip32) < 0) return -1;
90!
189
  if (tDecodeI32(pDecoder, &skip32) < 0) return -1;
90!
190
  if (tDecodeSEpSet(pDecoder, &epSet) < 0) return -1;
90!
191
  if (tDecodeSEpSet(pDecoder, &epSet) < 0) return -1;
90!
192

193
  if (tDecodeI64(pDecoder, &pChkpInfo->checkpointId) < 0) return -1;
180!
194
  if (tDecodeI64(pDecoder, &pChkpInfo->checkpointVer) < 0) return -1;
180!
195

196
  tEndDecode(pDecoder);
90✔
197
  return 0;
90✔
198
}
199

UNCOV
200
int32_t tDecodeStreamTaskId(SDecoder* pDecoder, STaskId* pTaskId) {
×
201
  int64_t ver;
UNCOV
202
  if (tStartDecode(pDecoder) < 0) return -1;
×
UNCOV
203
  if (tDecodeI64(pDecoder, &ver) < 0) return -1;
×
UNCOV
204
  if (ver <= SSTREAM_TASK_INCOMPATIBLE_VER) return -1;
×
205

UNCOV
206
  if (tDecodeI64(pDecoder, &pTaskId->streamId) < 0) return -1;
×
207

UNCOV
208
  int32_t taskId = 0;
×
UNCOV
209
  if (tDecodeI32(pDecoder, &taskId) < 0) return -1;
×
210

UNCOV
211
  pTaskId->taskId = taskId;
×
UNCOV
212
  tEndDecode(pDecoder);
×
UNCOV
213
  return 0;
×
214
}
215

216
void tFreeStreamTask(void* pParam) {
32,070✔
217
  char*        p = NULL;
32,070✔
218
  SStreamTask* pTask = pParam;
32,070✔
219
  int32_t      taskId = pTask->id.taskId;
32,070✔
220

221
  STaskExecStatisInfo* pStatis = &pTask->execInfo;
32,070✔
222

223
  ETaskStatus status1 = TASK_STATUS__UNINIT;
32,070✔
224
  streamMutexLock(&pTask->lock);
32,070✔
225
  if (pTask->status.pSM != NULL) {
32,070✔
226
    SStreamTaskState status = streamTaskGetStatus(pTask);
16,129✔
227
    p = status.name;
16,129✔
228
    status1 = status.state;
16,129✔
229
  }
230
  streamMutexUnlock(&pTask->lock);
32,070✔
231

232
  stDebug("start to free s-task:0x%x %p, state:%s, refId:%" PRId64, taskId, pTask, p, pTask->id.refId);
32,070!
233

234
  SCheckpointInfo* pCkInfo = &pTask->chkInfo;
32,070✔
235
  stDebug("s-task:0x%x task exec summary: create:%" PRId64 ", init:%" PRId64 ", start:%" PRId64
32,070!
236
          ", updateCount:%d latestUpdate:%" PRId64 ", latestCheckPoint:%" PRId64 ", ver:%" PRId64
237
          " nextProcessVer:%" PRId64 ", checkpointCount:%d",
238
          taskId, pStatis->created, pStatis->checkTs, pStatis->readyTs, pStatis->updateCount, pStatis->latestUpdateTs,
239
          pCkInfo->checkpointId, pCkInfo->checkpointVer, pCkInfo->nextProcessVer, pStatis->checkpoint);
240

241
  if (pTask->schedInfo.pDelayTimer != NULL) {
32,070✔
242
    streamTmrStop(pTask->schedInfo.pDelayTimer);
1,116✔
243
    pTask->schedInfo.pDelayTimer = NULL;
1,116✔
244
  }
245

246
  if (pTask->hTaskInfo.pTimer != NULL) {
32,070✔
247
    streamTmrStop(pTask->hTaskInfo.pTimer);
1,161✔
248
    pTask->hTaskInfo.pTimer = NULL;
1,161✔
249
  }
250

251
  if (pTask->msgInfo.pRetryTmr != NULL) {
32,070✔
252
    streamTmrStop(pTask->msgInfo.pRetryTmr);
3,388✔
253
    pTask->msgInfo.pRetryTmr = NULL;
3,388✔
254
  }
255

256
  if (pTask->inputq.queue) {
32,070✔
257
    streamQueueClose(pTask->inputq.queue, pTask->id.taskId);
8,015✔
258
    pTask->inputq.queue = NULL;
8,015✔
259
  }
260

261
  if (pTask->outputq.queue) {
32,070✔
262
    streamQueueClose(pTask->outputq.queue, pTask->id.taskId);
8,015✔
263
    pTask->outputq.queue = NULL;
8,015✔
264
  }
265

266
  if (pTask->exec.qmsg) {
32,070✔
267
    taosMemoryFree(pTask->exec.qmsg);
17,022✔
268
  }
269

270
  if (pTask->exec.pExecutor) {
32,070✔
271
    qDestroyTask(pTask->exec.pExecutor);
4,226✔
272
    pTask->exec.pExecutor = NULL;
4,226✔
273
  }
274

275
  if (pTask->exec.pWalReader != NULL) {
32,070✔
276
    walCloseReader(pTask->exec.pWalReader);
4,051✔
277
    pTask->exec.pWalReader = NULL;
4,051✔
278
  }
279

280
  streamClearChkptReadyMsg(pTask->chkInfo.pActiveInfo);
32,070✔
281

282
  if (pTask->msgInfo.pData != NULL) {
32,070✔
283
    clearBufferedDispatchMsg(pTask);
29✔
284
  }
285

286
  if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) {
32,070✔
287
    tDeleteSchemaWrapper(pTask->outputInfo.tbSink.pSchemaWrapper);
15,496!
288
    taosMemoryFree(pTask->outputInfo.tbSink.pTSchema);
15,496✔
289
    tSimpleHashCleanup(pTask->outputInfo.tbSink.pTbInfo);
15,496✔
290
    tDeleteSchemaWrapper(pTask->outputInfo.tbSink.pTagSchema);
15,496✔
291
  } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
16,574✔
292
    taosArrayDestroy(pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos);
13,838✔
293
  }
294

295
  streamTaskCleanupCheckInfo(&pTask->taskCheckInfo);
32,070✔
296
  streamFreeTaskState(pTask, pTask->status.removeBackendFiles ? 1 : 0);
32,070✔
297

298
  if (pTask->pNameMap) {
32,068✔
299
    tSimpleHashCleanup(pTask->pNameMap);
1,481✔
300
  }
301

302
  streamDestroyStateMachine(pTask->status.pSM);
32,068✔
303
  pTask->status.pSM = NULL;
32,070✔
304

305
  streamTaskDestroyUpstreamInfo(&pTask->upstreamInfo);
32,070✔
306

307
  taosMemoryFree(pTask->outputInfo.pTokenBucket);
32,070✔
308
  streamMutexDestroy(&pTask->lock);
32,070✔
309

310
  taosArrayDestroy(pTask->msgInfo.pSendInfo);
32,069✔
311
  pTask->msgInfo.pSendInfo = NULL;
32,069✔
312
  streamMutexDestroy(&pTask->msgInfo.lock);
32,069✔
313

314
  taosArrayDestroy(pTask->outputInfo.pNodeEpsetUpdateList);
32,069✔
315
  pTask->outputInfo.pNodeEpsetUpdateList = NULL;
32,070✔
316

317
  if (pTask->id.idStr != NULL) {
32,070✔
318
    taosMemoryFree((void*)pTask->id.idStr);
16,095✔
319
  }
320

321
  streamTaskDestroyActiveChkptInfo(pTask->chkInfo.pActiveInfo);
32,069✔
322
  pTask->chkInfo.pActiveInfo = NULL;
32,070✔
323

324
  taosMemoryFree(pTask);
32,070✔
325
  stDebug("s-task:0x%x free task completed", taskId);
32,070!
326
}
32,070✔
327

328
void streamFreeTaskState(SStreamTask* pTask, int8_t remove) {
32,070✔
329
  stDebug("s-task:0x%x start to free task state/backend", pTask->id.taskId);
32,070!
330
  if (pTask->pState != NULL) {
32,070✔
331
    stDebug("s-task:0x%x start to free task state", pTask->id.taskId);
4,226!
332
    streamStateClose(pTask->pState, remove);
4,226✔
333

334
    if (remove) taskDbSetClearFileFlag(pTask->pBackend);
4,226✔
335
    taskDbRemoveRef(pTask->pBackend);
4,226✔
336
    pTask->pBackend = NULL;
4,226✔
337
    pTask->pState = NULL;
4,226✔
338
  } else {
339
    stDebug("s-task:0x%x task state is NULL, may del backend:%s", pTask->id.taskId,
27,844!
340
            pTask->backendPath ? pTask->backendPath : "NULL");
341
    if (remove) {
27,844✔
342
      if (pTask->backendPath != NULL) {
1,650!
343
        stDebug("s-task:0x%x task state is NULL, do del backend:%s", pTask->id.taskId, pTask->backendPath);
1,650!
344
        taosRemoveDir(pTask->backendPath);
1,650✔
345
      }
346
    }
347
  }
348

349
  if (pTask->backendPath != NULL) {
32,068✔
350
    taosMemoryFree(pTask->backendPath);
8,015✔
351
    pTask->backendPath = NULL;
8,015✔
352
  }
353
}
32,068✔
354

355
static void setInitialVersionInfo(SStreamTask* pTask, int64_t ver) {
8,140✔
356
  SCheckpointInfo* pChkInfo = &pTask->chkInfo;
8,140✔
357
  SDataRange*      pRange = &pTask->dataRange;
8,140✔
358

359
  // only set the version info for stream tasks without fill-history task
360
  if ((pTask->info.fillHistory == 0) && (!HAS_RELATED_FILLHISTORY_TASK(pTask))) {
8,140✔
361
    pChkInfo->checkpointVer = ver - 1;  // only update when generating checkpoint
2,178✔
362
    pChkInfo->processedVer = ver - 1;   // already processed version
2,178✔
363
    pChkInfo->nextProcessVer = ver;     // next processed version
2,178✔
364

365
    pRange->range.maxVer = ver;
2,178✔
366
    pRange->range.minVer = ver;
2,178✔
367
  } else {
368
    // the initial value of processedVer/nextProcessVer/checkpointVer for stream task with related fill-history task
369
    // is set at the mnode.
370
    if (pTask->info.fillHistory == 1) {
5,962✔
371
      pChkInfo->checkpointVer = pRange->range.maxVer;
2,981✔
372
      pChkInfo->processedVer = pRange->range.maxVer;
2,981✔
373
      pChkInfo->nextProcessVer = pRange->range.maxVer + 1;
2,981✔
374
    } else {
375
      pChkInfo->checkpointVer = pRange->range.minVer - 1;
2,981✔
376
      pChkInfo->processedVer = pRange->range.minVer - 1;
2,981✔
377
      pChkInfo->nextProcessVer = pRange->range.minVer;
2,981✔
378

379
      {  // for compatible purpose, remove it later
380
        if (pRange->range.minVer == 0) {
2,981✔
381
          pChkInfo->checkpointVer = 0;
1,500✔
382
          pChkInfo->processedVer = 0;
1,500✔
383
          pChkInfo->nextProcessVer = 1;
1,500✔
384
          stDebug("s-task:%s update the processedVer to 0 from -1 due to compatible purpose", pTask->id.idStr);
1,500!
385
        }
386
      }
387
    }
388
  }
389
}
8,140✔
390

391
int32_t streamTaskSetBackendPath(SStreamTask* pTask) {
8,140✔
392
  int64_t streamId = 0;
8,140✔
393
  int32_t taskId = 0;
8,140✔
394

395
  if (pTask->info.fillHistory) {
8,140✔
396
    streamId = pTask->streamTaskId.streamId;
2,981✔
397
    taskId = pTask->streamTaskId.taskId;
2,981✔
398
  } else {
399
    streamId = pTask->id.streamId;
5,159✔
400
    taskId = pTask->id.taskId;
5,159✔
401
  }
402

403
  char    id[128] = {0};
8,140✔
404
  int32_t nBytes = sprintf(id, "0x%" PRIx64 "-0x%x", streamId, taskId);
8,140✔
405
  if (nBytes < 0 || nBytes >= sizeof(id)) {
8,140!
406
    return TSDB_CODE_OUT_OF_BUFFER;
×
407
  }
408

409
  int32_t len = strlen(pTask->pMeta->path);
8,140✔
410
  pTask->backendPath = (char*)taosMemoryMalloc(len + nBytes + 2);
8,140✔
411
  if (pTask->backendPath == NULL) {
8,140!
412
    return terrno;
×
413
  }
414

415
  (void)sprintf(pTask->backendPath, "%s%s%s", pTask->pMeta->path, TD_DIRSEP, id);
8,140✔
416
  stDebug("s-task:%s set backend path:%s", pTask->id.idStr, pTask->backendPath);
8,140!
417

418
  return 0;
8,140✔
419
}
420

421
int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, int64_t ver) {
8,140✔
422
  int32_t code = createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId, &pTask->id.idStr);
8,140✔
423
  if (code) {
8,140!
424
    stError("0x%x failed create stream task id str, code:%s", pTask->id.taskId, tstrerror(code));
×
425
    return code;
×
426
  }
427

428
  pTask->id.refId = 0;
8,140✔
429
  pTask->inputq.status = TASK_INPUT_STATUS__NORMAL;
8,140✔
430
  pTask->outputq.status = TASK_OUTPUT_STATUS__NORMAL;
8,140✔
431

432
  int32_t code1 = streamQueueOpen(512 << 10, &pTask->inputq.queue);
8,140✔
433
  int32_t code2 = streamQueueOpen(512 << 10, &pTask->outputq.queue);
8,139✔
434
  if (code1 || code2) {
8,140!
435
    stError("s-task:%s failed to prepare the input/output queue, initialize task failed", pTask->id.idStr);
×
436
    return TSDB_CODE_OUT_OF_MEMORY;
×
437
  }
438

439
  pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
8,140✔
440

441
  code = streamCreateStateMachine(pTask);
8,140✔
442
  if (pTask->status.pSM == NULL || code != TSDB_CODE_SUCCESS) {
8,140!
443
    stError("s-task:%s failed create state-machine for stream task, initialization failed, code:%s", pTask->id.idStr,
×
444
            tstrerror(code));
445
    return code;
×
446
  }
447

448
  pTask->execInfo.created = taosGetTimestampMs();
8,140✔
449
  setInitialVersionInfo(pTask, ver);
8,140✔
450

451
  pTask->pMeta = pMeta;
8,140✔
452
  pTask->pMsgCb = pMsgCb;
8,140✔
453
  pTask->msgInfo.pSendInfo = taosArrayInit(4, sizeof(SDispatchEntry));
8,140✔
454
  if (pTask->msgInfo.pSendInfo == NULL) {
8,140!
455
    stError("s-task:%s failed to create sendInfo struct for stream task, code:Out of memory", pTask->id.idStr);
×
456
    return terrno;
×
457
  }
458

459
  code = taosThreadMutexInit(&pTask->msgInfo.lock, NULL);
8,140✔
460
  if (code) {
8,139!
461
    stError("s-task:0x%x failed to init msgInfo mutex, code:%s", pTask->id.taskId, tstrerror(code));
×
462
    return code;
×
463
  }
464

465
  TdThreadMutexAttr attr = {0};
8,139✔
466
  code = taosThreadMutexAttrInit(&attr);
8,139✔
467
  if (code != 0) {
8,139!
468
    stError("s-task:%s initElapsed mutex attr failed, code:%s", pTask->id.idStr, tstrerror(code));
×
469
    return code;
×
470
  }
471

472
  code = taosThreadMutexAttrSetType(&attr, PTHREAD_MUTEX_RECURSIVE);
8,139✔
473
  if (code != 0) {
8,139!
474
    stError("s-task:%s set mutex attr recursive, code:%s", pTask->id.idStr, tstrerror(code));
×
475
    return code;
×
476
  }
477

478
  code = taosThreadMutexInit(&pTask->lock, &attr);
8,139✔
479
  if (code) {
8,139!
480
    return code;
×
481
  }
482

483
  code = taosThreadMutexAttrDestroy(&attr);
8,139✔
484
  if (code) {
8,139!
485
    return code;
×
486
  }
487

488
  streamTaskOpenAllUpstreamInput(pTask);
8,139✔
489

490
  STaskOutputInfo* pOutputInfo = &pTask->outputInfo;
8,140✔
491
  pOutputInfo->pTokenBucket = taosMemoryCalloc(1, sizeof(STokenBucket));
8,140✔
492
  if (pOutputInfo->pTokenBucket == NULL) {
8,140!
493
    stError("s-task:%s failed to prepare the tokenBucket, code:%s", pTask->id.idStr, tstrerror(terrno));
×
494
    return terrno;
×
495
  }
496

497
  // 2MiB per second for sink task
498
  // 50 times sink operator per second
499
  code = streamTaskInitTokenBucket(pOutputInfo->pTokenBucket, 35, 35, tsSinkDataRate, pTask->id.idStr);
8,140✔
500
  if (code) {
8,140!
501
    return code;
×
502
  }
503

504
  pOutputInfo->pNodeEpsetUpdateList = taosArrayInit(4, sizeof(SDownstreamTaskEpset));
8,140✔
505
  if (pOutputInfo->pNodeEpsetUpdateList == NULL) {
8,140!
506
    stError("s-task:%s failed to prepare downstreamUpdateList, code:%s", pTask->id.idStr, tstrerror(terrno));
×
507
    return terrno;
×
508
  }
509

510
  pTask->taskCheckInfo.pList = taosArrayInit(4, sizeof(SDownstreamStatusInfo));
8,140✔
511
  if (pTask->taskCheckInfo.pList == NULL) {
8,140!
512
    stError("s-task:%s failed to prepare taskCheckInfo list, code:%s", pTask->id.idStr, tstrerror(terrno));
×
513
    return terrno;
×
514
  }
515

516
  if (pTask->chkInfo.pActiveInfo == NULL) {
8,140!
517
    code = streamTaskCreateActiveChkptInfo(&pTask->chkInfo.pActiveInfo);
8,140✔
518
    if (code) {
8,140!
519
      stError("s-task:%s failed to create active checkpoint info, code:%s", pTask->id.idStr, tstrerror(code));
×
520
      return code;
×
521
    }
522
  }
523

524
  return streamTaskSetBackendPath(pTask);
8,140✔
525
}
526

527
int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask) {
57,579✔
528
  if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
57,579✔
529
    return 0;
3,767✔
530
  }
531

532
  int32_t type = pTask->outputInfo.type;
53,812✔
533
  if (type == TASK_OUTPUT__TABLE) {
53,812✔
534
    return 0;
184✔
535
  } else if (type == TASK_OUTPUT__FIXED_DISPATCH) {
53,628✔
536
    return 1;
7,162✔
537
  } else {
538
    SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
46,466✔
539
    return taosArrayGetSize(vgInfo);
46,466✔
540
  }
541
}
542

543
int32_t streamTaskGetNumOfUpstream(const SStreamTask* pTask) { return taosArrayGetSize(pTask->upstreamInfo.pList); }
10,637✔
544

545
int32_t streamTaskSetUpstreamInfo(SStreamTask* pTask, const SStreamTask* pUpstreamTask) {
11,634✔
546
  SStreamUpstreamEpInfo* pEpInfo = createStreamTaskEpInfo(pUpstreamTask);
11,634✔
547
  if (pEpInfo == NULL) {
11,634!
548
    return terrno;
×
549
  }
550

551
  if (pTask->upstreamInfo.pList == NULL) {
11,634✔
552
    pTask->upstreamInfo.pList = taosArrayInit(4, POINTER_BYTES);
3,989✔
553
  }
554

555
  void* p = taosArrayPush(pTask->upstreamInfo.pList, &pEpInfo);
11,634✔
556
  return (p == NULL) ? terrno : TSDB_CODE_SUCCESS;
11,634!
557
}
558

559
int32_t streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdated) {
6✔
560
  int32_t code = 0;
6✔
561
  char    buf[512] = {0};
6✔
562
  code = epsetToStr(pEpSet, buf, tListLen(buf));  // ignore error since it is only for log file.
6✔
563
  if (code != 0) {  // print error and continue
6!
564
    stError("%s failed to convert epset to str, code:%s", pTask->id.idStr, tstrerror(code));
×
565
    return code;
×
566
  }
567

568
  int32_t numOfUpstream = taosArrayGetSize(pTask->upstreamInfo.pList);
6✔
569
  for (int32_t i = 0; i < numOfUpstream; ++i) {
10✔
570
    SStreamUpstreamEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i);
8✔
571
    if (pInfo->nodeId == nodeId) {
8✔
572
      bool equal = isEpsetEqual(&pInfo->epSet, pEpSet);
4✔
573
      if (!equal) {
4!
UNCOV
574
        *pUpdated = true;
×
575

UNCOV
576
        char tmp[512] = {0};
×
UNCOV
577
        code = epsetToStr(&pInfo->epSet, tmp, tListLen(tmp));
×
UNCOV
578
        if (code != 0) {  // print error and continue
×
579
          stError("%s failed to convert epset to str, code:%s", pTask->id.idStr, tstrerror(code));
×
580
          return code;
×
581
        }
582

UNCOV
583
        epsetAssign(&pInfo->epSet, pEpSet);
×
UNCOV
584
        stDebug("s-task:0x%x update the upstreamInfo taskId:0x%x(nodeId:%d) newEpset:%s old:%s", pTask->id.taskId,
×
585
                pInfo->taskId, nodeId, buf, tmp);
586
      } else {
587
        stDebug("s-task:0x%x not update upstreamInfo, since identical, task:0x%x(nodeId:%d) epset:%s", pTask->id.taskId,
4!
588
                pInfo->taskId, nodeId, buf);
589
      }
590

591
      break;
4✔
592
    }
593
  }
594

595
  return code;
6✔
596
}
597

598
void streamTaskDestroyUpstreamInfo(SUpstreamInfo* pUpstreamInfo) {
32,067✔
599
  if (pUpstreamInfo->pList != NULL) {
32,067✔
600
    taosArrayDestroyEx(pUpstreamInfo->pList, freeUpstreamItem);
27,943✔
601
    pUpstreamInfo->numOfClosed = 0;
27,945✔
602
    pUpstreamInfo->pList = NULL;
27,945✔
603
  }
604
}
32,069✔
605

606
void streamTaskSetFixedDownstreamInfo(SStreamTask* pTask, const SStreamTask* pDownstreamTask) {
710✔
607
  STaskDispatcherFixed* pDispatcher = &pTask->outputInfo.fixedDispatcher;
710✔
608
  pDispatcher->taskId = pDownstreamTask->id.taskId;
710✔
609
  pDispatcher->nodeId = pDownstreamTask->info.nodeId;
710✔
610
  pDispatcher->epSet = pDownstreamTask->info.epSet;
710✔
611

612
  pTask->outputInfo.type = TASK_OUTPUT__FIXED_DISPATCH;
710✔
613
  pTask->msgInfo.msgType = TDMT_STREAM_TASK_DISPATCH;
710✔
614
}
710✔
615

616
int32_t streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdated) {
6✔
617
  char    buf[512] = {0};
6✔
618
  int32_t code = epsetToStr(pEpSet, buf, tListLen(buf));  // ignore the error since only for log files.
6✔
619
  if (code != 0) {                                        // print error and continue
6!
620
    stError("%s failed to convert epset to str, code:%s", pTask->id.idStr, tstrerror(code));
×
621
    return code;
×
622
  }
623

624
  int32_t id = pTask->id.taskId;
6✔
625
  int8_t  type = pTask->outputInfo.type;
6✔
626

627
  if (type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
6✔
628
    SArray* pVgs = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
2✔
629

630
    for (int32_t i = 0; i < taosArrayGetSize(pVgs); i++) {
6✔
631
      SVgroupInfo* pVgInfo = taosArrayGet(pVgs, i);
4✔
632
      if (pVgInfo == NULL) {
4!
633
        continue;
×
634
      }
635

636
      if (pVgInfo->vgId == nodeId) {
4!
UNCOV
637
        bool isEqual = isEpsetEqual(&pVgInfo->epSet, pEpSet);
×
UNCOV
638
        if (!isEqual) {
×
UNCOV
639
          *pUpdated = true;
×
640

UNCOV
641
          char tmp[512] = {0};
×
UNCOV
642
          code = epsetToStr(&pVgInfo->epSet, tmp, tListLen(tmp));
×
UNCOV
643
          if (code != 0) {  // print error and continue
×
644
            stError("%s failed to convert epset to str, code:%s", pTask->id.idStr, tstrerror(code));
×
645
            return code;
×
646
          }
647

UNCOV
648
          epsetAssign(&pVgInfo->epSet, pEpSet);
×
UNCOV
649
          stDebug("s-task:0x%x update dispatch info, task:0x%x(nodeId:%d) newEpset:%s old:%s", id, pVgInfo->taskId,
×
650
                  nodeId, buf, tmp);
651
        } else {
UNCOV
652
          stDebug("s-task:0x%x not update dispatch info, since identical, task:0x%x(nodeId:%d) epset:%s", id,
×
653
                  pVgInfo->taskId, nodeId, buf);
654
        }
UNCOV
655
        break;
×
656
      }
657
    }
658
  } else if (type == TASK_OUTPUT__FIXED_DISPATCH) {
4!
659
    STaskDispatcherFixed* pDispatcher = &pTask->outputInfo.fixedDispatcher;
4✔
660
    if (pDispatcher->nodeId == nodeId) {
4!
661
      bool equal = isEpsetEqual(&pDispatcher->epSet, pEpSet);
4✔
662
      if (!equal) {
4!
663
        *pUpdated = true;
×
664

665
        char tmp[512] = {0};
×
666
        code = epsetToStr(&pDispatcher->epSet, tmp, tListLen(tmp));
×
667
        if (code != 0) {  // print error and continue
×
668
          stError("%s failed to convert epset to str, code:%s", pTask->id.idStr, tstrerror(code));
×
669
          return code;
×
670
        }
671

672
        epsetAssign(&pDispatcher->epSet, pEpSet);
×
673
        stDebug("s-task:0x%x update dispatch info, task:0x%x(nodeId:%d) newEpset:%s old:%s", id, pDispatcher->taskId,
×
674
                nodeId, buf, tmp);
675
      } else {
676
        stDebug("s-task:0x%x not update dispatch info, since identical, task:0x%x(nodeId:%d) epset:%s", id,
4!
677
                pDispatcher->taskId, nodeId, buf);
678
      }
679
    }
680
  }
681

682
  return code;
6✔
683
}
684

685
int32_t streamTaskStop(SStreamTask* pTask) {
1,733✔
686
  int32_t     vgId = pTask->pMeta->vgId;
1,733✔
687
  int64_t     st = taosGetTimestampMs();
1,733✔
688
  const char* id = pTask->id.idStr;
1,733✔
689

690
  int32_t code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_STOP);
1,733✔
691
  if (code) {
1,733!
692
    stError("failed to handle STOP event, s-task:%s, code:%s", id, tstrerror(code));
×
693
    return code;
×
694
  }
695

696
  if (pTask->info.taskLevel != TASK_LEVEL__SINK && pTask->exec.pExecutor != NULL) {
1,733✔
697
    code = qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS);
1,009✔
698
    if (code != TSDB_CODE_SUCCESS) {
1,009!
699
      stError("s-task:%s failed to kill task related query handle, code:%s", id, tstrerror(code));
×
700
    }
701
  }
702

703
  while (!streamTaskIsIdle(pTask)) {
1,732!
704
    stDebug("s-task:%s level:%d wait for task to be idle and then close, check again in 100ms", id,
×
705
            pTask->info.taskLevel);
706
    taosMsleep(100);
×
707
  }
708

709
  int64_t el = taosGetTimestampMs() - st;
1,733✔
710
  stDebug("vgId:%d s-task:%s is closed in %" PRId64 " ms", vgId, id, el);
1,733!
711
  return code;
1,733✔
712
}
713

714
bool streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList) {
10✔
715
  STaskExecStatisInfo* p = &pTask->execInfo;
10✔
716

717
  int32_t numOfNodes = taosArrayGetSize(pNodeList);
10✔
718
  int64_t prevTs = p->latestUpdateTs;
10✔
719

720
  p->latestUpdateTs = taosGetTimestampMs();
10✔
721
  p->updateCount += 1;
10✔
722
  stDebug("s-task:0x%x update task nodeEp epset, updatedNodes:%d, updateCount:%d, prevTs:%" PRId64, pTask->id.taskId,
10!
723
          numOfNodes, p->updateCount, prevTs);
724

725
  bool updated = false;
10✔
726
  for (int32_t i = 0; i < numOfNodes; ++i) {
20✔
727
    SNodeUpdateInfo* pInfo = taosArrayGet(pNodeList, i);
10✔
728
    if (pInfo == NULL) {
10!
729
      continue;
×
730
    }
731

732
    int32_t code = doUpdateTaskEpset(pTask, pInfo->nodeId, &pInfo->newEp, &updated);
10✔
733
    if (code) {
10!
734
      stError("s-task:0x%x failed to update the task nodeEp epset, code:%s", pTask->id.taskId, tstrerror(code));
×
735
    }
736
  }
737

738
  return updated;
10✔
739
}
740

741
void streamTaskResetUpstreamStageInfo(SStreamTask* pTask) {
8,139✔
742
  if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
8,139✔
743
    return;
4,113✔
744
  }
745

746
  int32_t size = taosArrayGetSize(pTask->upstreamInfo.pList);
4,026✔
747
  for (int32_t i = 0; i < size; ++i) {
15,680✔
748
    SStreamUpstreamEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i);
11,654✔
749
    pInfo->stage = -1;
11,654✔
750
  }
751

752
  stDebug("s-task:%s reset all upstream tasks stage info", pTask->id.idStr);
4,026!
753
}
754

755
void streamTaskOpenAllUpstreamInput(SStreamTask* pTask) {
13,464✔
756
  int32_t num = taosArrayGetSize(pTask->upstreamInfo.pList);
13,464✔
757
  if (num == 0) {
13,465✔
758
    return;
6,785✔
759
  }
760

761
  for (int32_t i = 0; i < num; ++i) {
27,052✔
762
    SStreamUpstreamEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i);
20,372✔
763
    pInfo->dataAllowed = true;
20,372✔
764
  }
765

766
  pTask->upstreamInfo.numOfClosed = 0;
6,680✔
767
  stDebug("s-task:%s opening up inputQ for %d upstream tasks", pTask->id.idStr, num);
6,680!
768
}
769

770
void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId) {
4,607✔
771
  SStreamUpstreamEpInfo* pInfo = NULL;
4,607✔
772
  streamTaskGetUpstreamTaskEpInfo(pTask, taskId, &pInfo);
4,607✔
773

774
  if ((pInfo != NULL) && pInfo->dataAllowed) {
4,607!
775
    pInfo->dataAllowed = false;
4,607✔
776
    if (pTask->upstreamInfo.numOfClosed < streamTaskGetNumOfUpstream(pTask)) {
4,607!
777
      int32_t t = atomic_add_fetch_32(&pTask->upstreamInfo.numOfClosed, 1);
4,607✔
778
    } else {
779
      stError("s-task:%s not inc closed input, since they have been all closed already", pTask->id.idStr);
×
780
    }
781
  }
782
}
4,607✔
783

784
void streamTaskOpenUpstreamInput(SStreamTask* pTask, int32_t taskId) {
×
785
  SStreamUpstreamEpInfo* pInfo = NULL;
×
786
  streamTaskGetUpstreamTaskEpInfo(pTask, taskId, &pInfo);
×
787

788
  if (pInfo != NULL && (!pInfo->dataAllowed)) {
×
789
    int32_t t = atomic_sub_fetch_32(&pTask->upstreamInfo.numOfClosed, 1);
×
790
    stDebug("s-task:%s open inputQ for upstream:0x%x, remain closed:%d", pTask->id.idStr, taskId, t);
×
791
    pInfo->dataAllowed = true;
×
792
  }
793
}
×
794

795
bool streamTaskIsAllUpstreamClosed(SStreamTask* pTask) {
×
796
  return pTask->upstreamInfo.numOfClosed == taosArrayGetSize(pTask->upstreamInfo.pList);
×
797
}
798

799
bool streamTaskSetSchedStatusWait(SStreamTask* pTask) {
45,169✔
800
  bool ret = false;
45,169✔
801

802
  streamMutexLock(&pTask->lock);
45,169✔
803
  if (pTask->status.schedStatus == TASK_SCHED_STATUS__INACTIVE) {
45,169✔
804
    pTask->status.schedStatus = TASK_SCHED_STATUS__WAITING;
35,962✔
805
    ret = true;
35,962✔
806
  }
807

808
  streamMutexUnlock(&pTask->lock);
45,169✔
809
  return ret;
45,169✔
810
}
811

812
int8_t streamTaskSetSchedStatusActive(SStreamTask* pTask) {
34,829✔
813
  streamMutexLock(&pTask->lock);
34,829✔
814
  int8_t status = pTask->status.schedStatus;
34,829✔
815
  if (status == TASK_SCHED_STATUS__WAITING) {
34,829!
816
    pTask->status.schedStatus = TASK_SCHED_STATUS__ACTIVE;
34,829✔
817
  }
818
  streamMutexUnlock(&pTask->lock);
34,829✔
819

820
  return status;
34,829✔
821
}
822

823
int8_t streamTaskSetSchedStatusInactive(SStreamTask* pTask) {
1,102✔
824
  streamMutexLock(&pTask->lock);
1,102✔
825
  int8_t status = pTask->status.schedStatus;
1,102✔
826
  pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
1,102✔
827
  streamMutexUnlock(&pTask->lock);
1,102✔
828

829
  return status;
1,102✔
830
}
831

832
int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, int32_t resetRelHalt) {
3,524✔
833
  int32_t      code = 0;
3,524✔
834
  SStreamMeta* pMeta = pTask->pMeta;
3,524✔
835
  SStreamTask* pStreamTask = NULL;
3,524✔
836

837
  if (pTask->info.fillHistory == 0) {
3,524✔
838
    return code;
3,523✔
839
  }
840

841
  code = streamMetaAcquireTaskUnsafe(pMeta, &pTask->streamTaskId, &pStreamTask);
1✔
842
  if (code == 0) {
1!
843
    stDebug("s-task:%s clear the related stream task:0x%x attr to fill-history task", pTask->id.idStr,
×
844
            (int32_t)pTask->streamTaskId.taskId);
845

846
    streamMutexLock(&(pStreamTask->lock));
×
847
    CLEAR_RELATED_FILLHISTORY_TASK(pStreamTask);
×
848

849
    if (resetRelHalt) {
×
850
      stDebug("s-task:0x%" PRIx64 " set the persistent status attr to be ready, prev:%s, status in sm:%s",
×
851
              pTask->streamTaskId.taskId, streamTaskGetStatusStr(pStreamTask->status.taskStatus),
852
              streamTaskGetStatus(pStreamTask).name);
853
      pStreamTask->status.taskStatus = TASK_STATUS__READY;
×
854
    }
855

856
    code = streamMetaSaveTask(pMeta, pStreamTask);
×
857
    streamMutexUnlock(&(pStreamTask->lock));
×
858

859
    streamMetaReleaseTask(pMeta, pStreamTask);
×
860
  }
861

862
  return code;
1✔
863
}
864

865
int32_t streamBuildAndSendDropTaskMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskId* pTaskId, int64_t resetRelHalt) {
1✔
866
  SVDropStreamTaskReq* pReq = rpcMallocCont(sizeof(SVDropStreamTaskReq));
1✔
867
  if (pReq == NULL) {
1!
868
    return terrno;
×
869
  }
870

871
  pReq->head.vgId = vgId;
1✔
872
  pReq->taskId = pTaskId->taskId;
1✔
873
  pReq->streamId = pTaskId->streamId;
1✔
874
  pReq->resetRelHalt = resetRelHalt;
1✔
875

876
  SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_DROP, .pCont = pReq, .contLen = sizeof(SVDropStreamTaskReq)};
1✔
877
  int32_t code = tmsgPutToQueue(pMsgCb, WRITE_QUEUE, &msg);
1✔
878
  if (code != TSDB_CODE_SUCCESS) {
1!
879
    stError("vgId:%d failed to send drop task:0x%x msg, code:%s", vgId, pTaskId->taskId, tstrerror(code));
×
880
  } else {
881
    stDebug("vgId:%d build and send drop task:0x%x msg", vgId, pTaskId->taskId);
1!
882
  }
883

884
  return code;
1✔
885
}
886

887
int32_t streamSendChkptReportMsg(SStreamTask* pTask, SCheckpointInfo* pCheckpointInfo, int8_t dropRelHTask) {
2,843✔
888
  int32_t                code = 0;
2,843✔
889
  int32_t                tlen = 0;
2,843✔
890
  int32_t                vgId = pTask->pMeta->vgId;
2,843✔
891
  const char*            id = pTask->id.idStr;
2,843✔
892
  SActiveCheckpointInfo* pActive = pCheckpointInfo->pActiveInfo;
2,843✔
893

894
  SCheckpointReport req = {.streamId = pTask->id.streamId,
2,843✔
895
                           .taskId = pTask->id.taskId,
2,843✔
896
                           .nodeId = vgId,
897
                           .dropHTask = dropRelHTask,
898
                           .transId = pActive->transId,
2,843✔
899
                           .checkpointId = pActive->activeId,
2,843✔
900
                           .checkpointVer = pCheckpointInfo->processedVer,
2,843✔
901
                           .checkpointTs = pCheckpointInfo->startTs};
2,843✔
902

903
  tEncodeSize(tEncodeStreamTaskChkptReport, &req, tlen, code);
2,843!
904
  if (code < 0) {
2,843!
905
    stError("s-task:%s vgId:%d encode stream task checkpoint-report failed, code:%s", id, vgId, tstrerror(code));
×
906
    return -1;
×
907
  }
908

909
  void* buf = rpcMallocCont(tlen);
2,843✔
910
  if (buf == NULL) {
2,843!
911
    stError("s-task:%s vgId:%d encode stream task checkpoint-report msg failed, code:%s", id, vgId,
×
912
            tstrerror(TSDB_CODE_OUT_OF_MEMORY));
913
    return -1;
×
914
  }
915

916
  SEncoder encoder;
917
  tEncoderInit(&encoder, buf, tlen);
2,843✔
918
  if ((code = tEncodeStreamTaskChkptReport(&encoder, &req)) < 0) {
2,843!
919
    rpcFreeCont(buf);
×
920
    tEncoderClear(&encoder);
×
921
    stError("s-task:%s vgId:%d encode stream task checkpoint-report msg failed, code:%s", id, vgId, tstrerror(code));
×
922
    return -1;
×
923
  }
924
  tEncoderClear(&encoder);
2,843✔
925

926
  SRpcMsg msg = {0};
2,843✔
927
  initRpcMsg(&msg, TDMT_MND_STREAM_CHKPT_REPORT, buf, tlen);
2,843✔
928
  stDebug("s-task:%s vgId:%d build and send task checkpoint-report to mnode", id, vgId);
2,843!
929

930
  return tmsgSendReq(&pTask->info.mnodeEpset, &msg);
2,843✔
931
}
932

933
STaskId streamTaskGetTaskId(const SStreamTask* pTask) {
24,034✔
934
  STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
24,034✔
935
  return id;
24,034✔
936
}
937

938
void streamTaskInitForLaunchHTask(SHistoryTaskInfo* pInfo) {
1,178✔
939
  pInfo->waitInterval = LAUNCH_HTASK_INTERVAL;
1,178✔
940
  pInfo->tickCount = ceil(LAUNCH_HTASK_INTERVAL / WAIT_FOR_MINIMAL_INTERVAL);
1,178✔
941
  pInfo->retryTimes = 0;
1,178✔
942
}
1,178✔
943

944
void streamTaskSetRetryInfoForLaunch(SHistoryTaskInfo* pInfo) {
1,174✔
945
  pInfo->waitInterval *= RETRY_LAUNCH_INTERVAL_INC_RATE;
1,174✔
946
  pInfo->tickCount = ceil(pInfo->waitInterval / WAIT_FOR_MINIMAL_INTERVAL);
1,174✔
947
  pInfo->retryTimes += 1;
1,174✔
948
}
1,174✔
949

950
void streamTaskStatusInit(STaskStatusEntry* pEntry, const SStreamTask* pTask) {
5,061✔
951
  pEntry->id.streamId = pTask->id.streamId;
5,061✔
952
  pEntry->id.taskId = pTask->id.taskId;
5,061✔
953
  pEntry->stage = -1;
5,061✔
954
  pEntry->nodeId = pTask->info.nodeId;
5,061✔
955
  pEntry->status = TASK_STATUS__STOP;
5,061✔
956
}
5,061✔
957

958
void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc) {
14,223✔
959
  pDst->stage = pSrc->stage;
14,223✔
960
  pDst->inputQUsed = pSrc->inputQUsed;
14,223✔
961
  pDst->inputRate = pSrc->inputRate;
14,223✔
962
  pDst->procsTotal = pSrc->procsTotal;
14,223✔
963
  pDst->procsThroughput = pSrc->procsThroughput;
14,223✔
964
  pDst->outputTotal = pSrc->outputTotal;
14,223✔
965
  pDst->outputThroughput = pSrc->outputThroughput;
14,223✔
966
  pDst->processedVer = pSrc->processedVer;
14,223✔
967
  pDst->verRange = pSrc->verRange;
14,223✔
968
  pDst->sinkQuota = pSrc->sinkQuota;
14,223✔
969
  pDst->sinkDataSize = pSrc->sinkDataSize;
14,223✔
970
  pDst->checkpointInfo = pSrc->checkpointInfo;
14,223✔
971
  pDst->startCheckpointId = pSrc->startCheckpointId;
14,223✔
972
  pDst->startCheckpointVer = pSrc->startCheckpointVer;
14,223✔
973
  pDst->status = pSrc->status;
14,223✔
974

975
  pDst->startTime = pSrc->startTime;
14,223✔
976
  pDst->hTaskId = pSrc->hTaskId;
14,223✔
977
}
14,223✔
978

979
STaskStatusEntry streamTaskGetStatusEntry(SStreamTask* pTask) {
14,753✔
980
  SStreamMeta*         pMeta = pTask->pMeta;
14,753✔
981
  STaskExecStatisInfo* pExecInfo = &pTask->execInfo;
14,753✔
982

983
  STaskStatusEntry entry = {
44,259✔
984
      .id = streamTaskGetTaskId(pTask),
14,753✔
985
      .status = streamTaskGetStatus(pTask).state,
14,753✔
986
      .nodeId = pMeta->vgId,
14,753✔
987
      .stage = pMeta->stage,
14,753✔
988

989
      .inputQUsed = SIZE_IN_MiB(streamQueueGetItemSize(pTask->inputq.queue)),
14,753✔
990
      .startTime = pExecInfo->readyTs,
14,753✔
991
      .checkpointInfo.latestId = pTask->chkInfo.checkpointId,
14,753✔
992
      .checkpointInfo.latestVer = pTask->chkInfo.checkpointVer,
14,753✔
993
      .checkpointInfo.latestTime = pTask->chkInfo.checkpointTime,
14,753✔
994
      .checkpointInfo.latestSize = 0,
995
      .checkpointInfo.remoteBackup = 0,
996
      .checkpointInfo.consensusChkptId = 0,
997
      .checkpointInfo.consensusTs = 0,
998
      .hTaskId = pTask->hTaskInfo.id.taskId,
14,753✔
999
      .procsTotal = SIZE_IN_MiB(pExecInfo->inputDataSize),
14,753✔
1000
      .outputTotal = SIZE_IN_MiB(pExecInfo->outputDataSize),
14,753✔
1001
      .procsThroughput = SIZE_IN_KiB(pExecInfo->procsThroughput),
14,753✔
1002
      .outputThroughput = SIZE_IN_KiB(pExecInfo->outputThroughput),
14,753✔
1003
      .startCheckpointId = pExecInfo->startCheckpointId,
14,753✔
1004
      .startCheckpointVer = pExecInfo->startCheckpointVer,
14,753✔
1005
  };
1006
  return entry;
14,753✔
1007
}
1008

1009
static int32_t taskPauseCallback(SStreamTask* pTask, void* param) {
709✔
1010
  SStreamMeta* pMeta = pTask->pMeta;
709✔
1011
  int32_t      code = 0;
709✔
1012

1013
  int32_t num = atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1);
709✔
1014
  stInfo("vgId:%d s-task:%s pause stream task. paused task num:%d", pMeta->vgId, pTask->id.idStr, num);
709!
1015

1016
  // in case of fill-history task, stop the tsdb file scan operation.
1017
  if (pTask->info.fillHistory == 1) {
709!
UNCOV
1018
    void* pExecutor = pTask->exec.pExecutor;
×
UNCOV
1019
    code = qKillTask(pExecutor, TSDB_CODE_SUCCESS);
×
1020
  }
1021

1022
  stDebug("vgId:%d s-task:%s set pause flag and pause task", pMeta->vgId, pTask->id.idStr);
709!
1023
  return code;
709✔
1024
}
1025

1026
void streamTaskPause(SStreamTask* pTask) {
709✔
1027
  int32_t code = streamTaskHandleEventAsync(pTask->status.pSM, TASK_EVENT_PAUSE, taskPauseCallback, NULL);
709✔
1028
  if (code) {
709!
1029
    stError("s-task:%s failed handle pause event async, code:%s", pTask->id.idStr, tstrerror(code));
×
1030
  }
1031
}
709✔
1032

1033
void streamTaskResume(SStreamTask* pTask) {
702✔
1034
  SStreamTaskState prevState = streamTaskGetStatus(pTask);
702✔
1035

1036
  SStreamMeta* pMeta = pTask->pMeta;
702✔
1037
  int32_t      code = streamTaskRestoreStatus(pTask);
702✔
1038
  if (code == TSDB_CODE_SUCCESS) {
702!
1039
    char*   pNew = streamTaskGetStatus(pTask).name;
702✔
1040
    int32_t num = atomic_sub_fetch_32(&pMeta->numOfPausedTasks, 1);
702✔
1041
    stInfo("s-task:%s status:%s resume from %s, paused task(s):%d", pTask->id.idStr, pNew, prevState.name, num);
702!
1042
  } else {
UNCOV
1043
    stInfo("s-task:%s status:%s no need to resume, paused task(s):%d", pTask->id.idStr, prevState.name,
×
1044
           pMeta->numOfPausedTasks);
1045
  }
1046
}
702✔
1047

1048
bool streamTaskIsSinkTask(const SStreamTask* pTask) { return pTask->info.taskLevel == TASK_LEVEL__SINK; }
40,782✔
1049

1050
// this task must success
1051
int32_t streamTaskSendCheckpointReq(SStreamTask* pTask) {
2,858✔
1052
  int32_t     code;
1053
  int32_t     tlen = 0;
2,858✔
1054
  int32_t     vgId = pTask->pMeta->vgId;
2,858✔
1055
  const char* id = pTask->id.idStr;
2,858✔
1056

1057
  SStreamTaskCheckpointReq req = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId, .nodeId = vgId};
2,858✔
1058
  tEncodeSize(tEncodeStreamTaskCheckpointReq, &req, tlen, code);
2,858!
1059
  if (code < 0) {
2,858!
1060
    stError("s-task:%s vgId:%d encode stream task req checkpoint failed, code:%s", id, vgId, tstrerror(code));
×
1061
    return TSDB_CODE_INVALID_MSG;
×
1062
  }
1063

1064
  void* buf = rpcMallocCont(tlen);
2,858✔
1065
  if (buf == NULL) {
2,858!
1066
    stError("s-task:%s vgId:%d encode stream task req checkpoint msg failed, code:Out of memory", id, vgId);
×
1067
    return terrno;
×
1068
  }
1069

1070
  SEncoder encoder;
1071
  tEncoderInit(&encoder, buf, tlen);
2,858✔
1072
  if ((code = tEncodeStreamTaskCheckpointReq(&encoder, &req)) < 0) {
2,858!
1073
    rpcFreeCont(buf);
×
1074
    tEncoderClear(&encoder);
×
1075
    stError("s-task:%s vgId:%d encode stream task req checkpoint msg failed, code:%s", id, vgId, tstrerror(code));
×
1076
    return code;
×
1077
  }
1078

1079
  tEncoderClear(&encoder);
2,858✔
1080

1081
  SRpcMsg msg = {0};
2,858✔
1082
  initRpcMsg(&msg, TDMT_MND_STREAM_REQ_CHKPT, buf, tlen);
2,858✔
1083
  stDebug("s-task:%s vgId:%d build and send task checkpoint req", id, vgId);
2,858!
1084

1085
  return tmsgSendReq(&pTask->info.mnodeEpset, &msg);
2,858✔
1086
}
1087

1088
void streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId, SStreamUpstreamEpInfo** pEpInfo) {
40,150✔
1089
  *pEpInfo = NULL;
40,150✔
1090

1091
  int32_t num = taosArrayGetSize(pTask->upstreamInfo.pList);
40,150✔
1092
  for (int32_t i = 0; i < num; ++i) {
85,449!
1093
    SStreamUpstreamEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i);
85,449✔
1094
    if (pInfo == NULL) {
85,449!
1095
      return;
×
1096
    }
1097

1098
    if (pInfo->taskId == taskId) {
85,449✔
1099
      *pEpInfo = pInfo;
40,150✔
1100
      return;
40,150✔
1101
    }
1102
  }
1103

UNCOV
1104
  stError("s-task:%s failed to find upstream task:0x%x", pTask->id.idStr, taskId);
×
1105
}
1106

1107
SEpSet* streamTaskGetDownstreamEpInfo(SStreamTask* pTask, int32_t taskId) {
×
1108
  if (pTask->info.taskLevel == TASK_OUTPUT__FIXED_DISPATCH) {
×
1109
    if (pTask->outputInfo.fixedDispatcher.taskId == taskId) {
×
1110
      return &pTask->outputInfo.fixedDispatcher.epSet;
×
1111
    }
1112
  } else if (pTask->info.taskLevel == TASK_OUTPUT__SHUFFLE_DISPATCH) {
×
1113
    SArray* pList = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
×
1114
    for (int32_t i = 0; i < taosArrayGetSize(pList); ++i) {
×
1115
      SVgroupInfo* pVgInfo = taosArrayGet(pList, i);
×
1116
      if (pVgInfo == NULL) {
×
1117
        continue;
×
1118
      }
1119

1120
      if (pVgInfo->taskId == taskId) {
×
1121
        return &pVgInfo->epSet;
×
1122
      }
1123
    }
1124
  }
1125

1126
  return NULL;
×
1127
}
1128

1129
int32_t createStreamTaskIdStr(int64_t streamId, int32_t taskId, const char** pId) {
8,139✔
1130
  char buf[128] = {0};
8,139✔
1131
  sprintf(buf, "0x%" PRIx64 "-0x%x", streamId, taskId);
8,139✔
1132
  *pId = taosStrdup(buf);
8,139✔
1133

1134
  if (*pId == NULL) {
8,140!
1135
    return terrno;
×
1136
  } else {
1137
    return TSDB_CODE_SUCCESS;
8,140✔
1138
  }
1139
}
1140

1141
static int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq) {
432✔
1142
  int32_t           code;
1143
  SStreamDataBlock* pData;
1144

1145
  code = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SStreamDataBlock), (void**)&pData);
432✔
1146
  if (code) {
432!
1147
    stError("s-task:%s failed to allocated retrieve-block", pTask->id.idStr);
×
1148
    return terrno = code;
×
1149
  }
1150

1151
  pData->type = STREAM_INPUT__DATA_RETRIEVE;
432✔
1152
  pData->srcVgId = 0;
432✔
1153

1154
  code = streamRetrieveReqToData(pReq, pData, pTask->id.idStr);
432✔
1155
  if (code != TSDB_CODE_SUCCESS) {
432!
1156
    stError("s-task:%s failed to convert retrieve-data to block, code:%s", pTask->id.idStr, tstrerror(code));
×
1157
    taosFreeQitem(pData);
×
1158
    return code;
×
1159
  }
1160

1161
  code = streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pData);
432✔
1162
  if (code != TSDB_CODE_SUCCESS) {
432!
1163
    stError("s-task:%s failed to put retrieve-block into inputQ, inputQ is full, discard the retrieve msg",
×
1164
            pTask->id.idStr);
1165
  }
1166

1167
  return code;
432✔
1168
}
1169

1170
int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq) {
432✔
1171
  int32_t code = streamTaskEnqueueRetrieve(pTask, pReq);
432✔
1172
  if (code != 0) {
432!
1173
    return code;
×
1174
  }
1175
  return streamTrySchedExec(pTask);
432✔
1176
}
1177

1178
void streamTaskSetRemoveBackendFiles(SStreamTask* pTask) { pTask->status.removeBackendFiles = true; }
3,524✔
1179

1180
void streamTaskGetActiveCheckpointInfo(const SStreamTask* pTask, int32_t* pTransId, int64_t* pCheckpointId) {
×
1181
  if (pTransId != NULL) {
×
1182
    *pTransId = pTask->chkInfo.pActiveInfo->transId;
×
1183
  }
1184

1185
  if (pCheckpointId != NULL) {
×
1186
    *pCheckpointId = pTask->chkInfo.pActiveInfo->activeId;
×
1187
  }
1188
}
×
1189

1190
int32_t streamTaskSetActiveCheckpointInfo(SStreamTask* pTask, int64_t activeCheckpointId) {
28✔
1191
  pTask->chkInfo.pActiveInfo->activeId = activeCheckpointId;
28✔
1192
  return TSDB_CODE_SUCCESS;
28✔
1193
}
1194

1195
void streamTaskSetFailedChkptInfo(SStreamTask* pTask, int32_t transId, int64_t checkpointId) {
×
1196
  pTask->chkInfo.pActiveInfo->transId = transId;
×
1197
  pTask->chkInfo.pActiveInfo->activeId = checkpointId;
×
1198
  pTask->chkInfo.pActiveInfo->failedId = checkpointId;
×
1199
  stDebug("s-task:%s set failed checkpointId:%"PRId64, pTask->id.idStr, checkpointId);
×
1200
}
×
1201

1202
int32_t streamTaskCreateActiveChkptInfo(SActiveCheckpointInfo** pRes) {
8,173✔
1203
  SActiveCheckpointInfo* pInfo = taosMemoryCalloc(1, sizeof(SActiveCheckpointInfo));
8,173✔
1204
  if (pInfo == NULL) {
8,174!
1205
    return terrno;
×
1206
  }
1207

1208
  int32_t code = taosThreadMutexInit(&pInfo->lock, NULL);
8,174✔
1209
  if (code != TSDB_CODE_SUCCESS) {
8,174!
1210
    return code;
×
1211
  }
1212

1213
  pInfo->pDispatchTriggerList = taosArrayInit(4, sizeof(STaskTriggerSendInfo));
8,174✔
1214
  pInfo->pReadyMsgList = taosArrayInit(4, sizeof(STaskCheckpointReadyInfo));
8,174✔
1215
  pInfo->pCheckpointReadyRecvList = taosArrayInit(4, sizeof(STaskDownstreamReadyInfo));
8,174✔
1216

1217
  *pRes = pInfo;
8,174✔
1218
  return code;
8,174✔
1219
}
1220

1221
void streamTaskDestroyActiveChkptInfo(SActiveCheckpointInfo* pInfo) {
32,069✔
1222
  if (pInfo == NULL) {
32,069✔
1223
    return;
24,021✔
1224
  }
1225

1226
  streamMutexDestroy(&pInfo->lock);
8,048✔
1227
  taosArrayDestroy(pInfo->pDispatchTriggerList);
8,049✔
1228
  pInfo->pDispatchTriggerList = NULL;
8,047✔
1229
  taosArrayDestroy(pInfo->pReadyMsgList);
8,047✔
1230
  pInfo->pReadyMsgList = NULL;
8,049✔
1231
  taosArrayDestroy(pInfo->pCheckpointReadyRecvList);
8,049✔
1232
  pInfo->pCheckpointReadyRecvList = NULL;
8,048✔
1233

1234
  SStreamTmrInfo* pTriggerTmr = &pInfo->chkptTriggerMsgTmr;
8,048✔
1235
  if (pTriggerTmr->tmrHandle != NULL) {
8,048✔
1236
    streamTmrStop(pTriggerTmr->tmrHandle);
1,396✔
1237
    pTriggerTmr->tmrHandle = NULL;
1,396✔
1238
  }
1239

1240
  SStreamTmrInfo* pReadyTmr = &pInfo->chkptReadyMsgTmr;
8,048✔
1241
  if (pReadyTmr->tmrHandle != NULL) {
8,048✔
1242
    streamTmrStop(pReadyTmr->tmrHandle);
1,385✔
1243
    pReadyTmr->tmrHandle = NULL;
1,385✔
1244
  }
1245

1246
  taosMemoryFree(pInfo);
8,048✔
1247
}
1248

1249
//NOTE: clear the checkpoint id, and keep the failed id
1250
void streamTaskClearActiveInfo(SActiveCheckpointInfo* pInfo) {
2,461✔
1251
  pInfo->activeId = 0;
2,461✔
1252
  pInfo->transId = 0;
2,461✔
1253
  pInfo->allUpstreamTriggerRecv = 0;
2,461✔
1254
  pInfo->dispatchTrigger = false;
2,461✔
1255
//  pInfo->failedId = 0;
1256

1257
  taosArrayClear(pInfo->pDispatchTriggerList);
2,461✔
1258
  taosArrayClear(pInfo->pCheckpointReadyRecvList);
2,461✔
1259
}
2,461✔
1260

1261
const char* streamTaskGetExecType(int32_t type) {
104,312✔
1262
  switch (type) {
104,312!
1263
    case STREAM_EXEC_T_EXTRACT_WAL_DATA:
44,090✔
1264
      return "scan-wal-file";
44,090✔
1265
    case STREAM_EXEC_T_START_ALL_TASKS:
9,483✔
1266
      return "start-all-tasks";
9,483✔
1267
    case STREAM_EXEC_T_START_ONE_TASK:
5,107✔
1268
      return "start-one-task";
5,107✔
1269
    case STREAM_EXEC_T_RESTART_ALL_TASKS:
3✔
1270
      return "restart-all-tasks";
3✔
1271
    case STREAM_EXEC_T_STOP_ALL_TASKS:
4,871✔
1272
      return "stop-all-tasks";
4,871✔
1273
    case STREAM_EXEC_T_RESUME_TASK:
6,457✔
1274
      return "resume-task-from-idle";
6,457✔
UNCOV
1275
    case STREAM_EXEC_T_ADD_FAILED_TASK:
×
UNCOV
1276
      return "record-start-failed-task";
×
1277
    case 0:
34,358✔
1278
      return "exec-all-tasks";
34,358✔
1279
    default:
×
1280
      return "invalid-exec-type";
×
1281
  }
1282
}
1283

1284
int32_t streamTaskAllocRefId(SStreamTask* pTask, int64_t** pRefId) {
22,060✔
1285
  *pRefId = taosMemoryMalloc(sizeof(int64_t));
22,060✔
1286
  if (*pRefId != NULL) {
22,060!
1287
    **pRefId = pTask->id.refId;
22,060✔
1288
    int32_t code = metaRefMgtAdd(pTask->pMeta->vgId, *pRefId);
22,060✔
1289
    if (code != 0) {
22,060!
1290
      stError("s-task:%s failed to add refId:%" PRId64 " into refId-mgmt, code:%s", pTask->id.idStr, pTask->id.refId,
×
1291
              tstrerror(code));
1292
    }
1293
    return code;
22,060✔
1294
  } else {
1295
    stError("s-task:%s failed to alloc new ref id, code:%s", pTask->id.idStr, tstrerror(terrno));
×
1296
    return terrno;
×
1297
  }
1298
}
1299

1300
void streamTaskFreeRefId(int64_t* pRefId) {
19,871✔
1301
  if (pRefId == NULL) {
19,871✔
1302
    return;
1,474✔
1303
  }
1304

1305
  metaRefMgtRemove(pRefId);
18,397✔
1306
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc