• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3523

06 Nov 2024 02:29AM UTC coverage: 55.861% (-2.4%) from 58.216%
#3523

push

travis-ci

web-flow
Merge pull request #28551 from taosdata/feat/TS-5215-2

test(blob): testing & fixes for blob

106075 of 245834 branches covered (43.15%)

Branch coverage included in aggregate %.

0 of 15 new or added lines in 2 files covered. (0.0%)

17003 existing lines in 254 files now uncovered.

181910 of 269703 relevant lines covered (67.45%)

1527639.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

59.31
/source/libs/stream/src/streamTask.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executor.h"
17
#include "osDir.h"
18
#include "osMemory.h"
19
#include "streamInt.h"
20
#include "streamsm.h"
21
#include "tmisce.h"
22
#include "tstream.h"
23
#include "ttimer.h"
24
#include "wal.h"
25

26
static void streamTaskDestroyUpstreamInfo(SUpstreamInfo* pUpstreamInfo);
27
static int32_t streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdated);
28
static int32_t streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdate);
29
static void streamTaskDestroyActiveChkptInfo(SActiveCheckpointInfo* pInfo);
30

31
/*
 * Append pTask to pArray and record the slot it occupies as the task's child id.
 *
 * The child id is taken from the array size before the push, and is written even
 * if the push subsequently fails.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when taosArrayPush() fails.
 */
static int32_t addToTaskset(SArray* pArray, SStreamTask* pTask) {
  int32_t slot = taosArrayGetSize(pArray);
  pTask->info.selfChildId = slot;

  if (taosArrayPush(pArray, &pTask) == NULL) {
    return terrno;
  }

  return TSDB_CODE_SUCCESS;
}
37

UNCOV
38
/*
 * Apply the new endpoint set of node `nodeId` to this task.
 *
 * Step 1: if the task itself lives on that node, replace the task's own epset.
 * Step 2: depending on the task level, refresh the downstream dispatch info
 *         (source), the upstream info (sink), or both (agg).
 *
 * *pUpdated is set to true whenever a stored epset is actually replaced; it is
 * never reset to false here, so callers can accumulate over several nodes.
 *
 * Returns 0 on success, otherwise the first error encountered. Note that a
 * failure to stringify an epset (needed only for logging) aborts the update.
 */
static int32_t doUpdateTaskEpset(SStreamTask* pTask, int32_t nodeId, SEpSet* pEpSet, bool* pUpdated) {
  int32_t code = 0;
  char    buf[512] = {0};

  if (pTask->info.nodeId == nodeId) {  // execution task should be moved away
    bool isEqual = isEpsetEqual(&pTask->info.epSet, pEpSet);
    code = epsetToStr(pEpSet, buf, tListLen(buf));
    if (code) {  // print error and abort the update
      stError("%s failed to convert epset to str, code:%s", pTask->id.idStr, tstrerror(code));
      return code;
    }

    if (!isEqual) {
      char tmp[512] = {0};
      code = epsetToStr(&pTask->info.epSet, tmp, tListLen(tmp));  // old epset, for the log line below
      if (code) {  // print error and abort the update
        stError("%s failed to convert epset to str, code:%s", pTask->id.idStr, tstrerror(code));
        return code;
      }

      epsetAssign(&pTask->info.epSet, pEpSet);
      (*pUpdated) = true;  // flag the change only once the epset has really been replaced
      stDebug("s-task:0x%x (vgId:%d) self node epset is updated %s, old:%s", pTask->id.taskId, nodeId, buf, tmp);
    } else {
      stDebug("s-task:0x%x (vgId:%d) not updated task epset, since epset identical, %s", pTask->id.taskId, nodeId, buf);
    }
  }

  // check for the dispatch info and the upstream task info
  int32_t level = pTask->info.taskLevel;
  if (level == TASK_LEVEL__SOURCE) {
    code = streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet, pUpdated);
  } else if (level == TASK_LEVEL__AGG) {
    // Both directions are always attempted; report the first failure instead of
    // letting the second call silently overwrite the result of the first one.
    int32_t upCode = streamTaskUpdateUpstreamInfo(pTask, nodeId, pEpSet, pUpdated);
    int32_t downCode = streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet, pUpdated);
    code = (upCode != 0) ? upCode : downCode;
  } else {  // TASK_LEVEL__SINK
    code = streamTaskUpdateUpstreamInfo(pTask, nodeId, pEpSet, pUpdated);
  }

  return code;
}
79

80
static void freeItem(void* p) {
×
81
  SStreamContinueExecInfo* pInfo = p;
×
82
  rpcFreeCont(pInfo->msg.pCont);
×
83
}
×
84

85
static void freeUpstreamItem(void* p) {
30,897✔
86
  SStreamUpstreamEpInfo** pInfo = p;
30,897✔
87
  taosMemoryFree(*pInfo);
30,897✔
88
}
30,906✔
89

90
static SStreamUpstreamEpInfo* createStreamTaskEpInfo(const SStreamTask* pTask) {
7,430✔
91
  SStreamUpstreamEpInfo* pEpInfo = taosMemoryMalloc(sizeof(SStreamUpstreamEpInfo));
7,430✔
92
  if (pEpInfo == NULL) {
7,430!
93
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
94
    return NULL;
×
95
  }
96

97
  pEpInfo->childId = pTask->info.selfChildId;
7,430✔
98
  pEpInfo->epSet = pTask->info.epSet;
7,430✔
99
  pEpInfo->nodeId = pTask->info.nodeId;
7,430✔
100
  pEpInfo->taskId = pTask->id.taskId;
7,430✔
101
  pEpInfo->stage = -1;
7,430✔
102

103
  return pEpInfo;
7,430✔
104
}
105

106
int32_t tNewStreamTask(int64_t streamId, int8_t taskLevel, SEpSet* pEpset, bool fillHistory, int64_t triggerParam,
5,341✔
107
                       SArray* pTaskList, bool hasFillhistory, int8_t subtableWithoutMd5, SStreamTask** p) {
108
  *p = NULL;
5,341✔
109

110
  SStreamTask* pTask = (SStreamTask*)taosMemoryCalloc(1, sizeof(SStreamTask));
5,341✔
111
  if (pTask == NULL) {
5,341!
UNCOV
112
    stError("s-task:0x%" PRIx64 " failed malloc new stream task, size:%d, code:%s", streamId,
×
113
            (int32_t)sizeof(SStreamTask), tstrerror(terrno));
UNCOV
114
    return terrno;
×
115
  }
116

117
  pTask->ver = SSTREAM_TASK_VER;
5,341✔
118
  pTask->id.taskId = tGenIdPI32();
5,341✔
119
  pTask->id.streamId = streamId;
5,341✔
120

121
  pTask->info.taskLevel = taskLevel;
5,341✔
122
  pTask->info.fillHistory = fillHistory;
5,341✔
123
  pTask->info.delaySchedParam = triggerParam;
5,341✔
124
  pTask->subtableWithoutMd5 = subtableWithoutMd5;
5,341✔
125

126
  int32_t code = streamCreateStateMachine(pTask);
5,341✔
127
  if (pTask->status.pSM == NULL || code != TSDB_CODE_SUCCESS) {
5,341!
UNCOV
128
    taosMemoryFreeClear(pTask);
×
UNCOV
129
    return code;
×
130
  }
131

132
  char buf[128] = {0};
5,341✔
133
  sprintf(buf, "0x%" PRIx64 "-0x%x", pTask->id.streamId, pTask->id.taskId);
5,341✔
134

135
  pTask->id.idStr = taosStrdup(buf);
5,341✔
136
  if (pTask->id.idStr == NULL) {
5,341!
UNCOV
137
    stError("s-task:0x%x failed to build task id, code: out of memory", pTask->id.taskId);
×
UNCOV
138
    return terrno;
×
139
  }
140

141
  pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
5,341✔
142
  pTask->status.taskStatus = fillHistory ? TASK_STATUS__SCAN_HISTORY : TASK_STATUS__READY;
5,341✔
143
  pTask->inputq.status = TASK_INPUT_STATUS__NORMAL;
5,341✔
144
  pTask->outputq.status = TASK_OUTPUT_STATUS__NORMAL;
5,341✔
145

146
  pTask->taskCheckInfo.pList = taosArrayInit(4, sizeof(SDownstreamStatusInfo));
5,341✔
147
  code = taosThreadMutexInit(&pTask->taskCheckInfo.checkInfoLock, NULL);
5,341✔
148
  if (code) {
5,341!
UNCOV
149
    return code;
×
150
  }
151

152
  if (fillHistory && !hasFillhistory) {
5,341!
UNCOV
153
    stError("s-task:0x%x create task failed, due to inconsistent fill-history flag", pTask->id.taskId);
×
UNCOV
154
    return TSDB_CODE_INVALID_PARA;
×
155
  }
156

157
  epsetAssign(&(pTask->info.mnodeEpset), pEpset);
5,341✔
158

159
  code = addToTaskset(pTaskList, pTask);
5,341✔
160
  *p = pTask;
5,341✔
161

162
  return code;
5,341✔
163
}
164

165
/*
 * Decode only the checkpoint-related fields (msgVer, checkpointId, checkpointVer)
 * of an encoded stream task. All fields that sit in between are decoded into
 * scratch variables and discarded.
 *
 * No incompatible-version check is performed on msgVer here.
 *
 * Returns 0 on success, -1 as soon as any decode step fails.
 */
int32_t tDecodeStreamTaskChkInfo(SDecoder* pDecoder, SCheckpointInfo* pChkpInfo) {
  // scratch targets for the fields that are skipped
  int64_t ignored64;
  int32_t ignored32;
  int16_t ignored16;
  int8_t  ignored8;
  SEpSet  ignoredEpset;

  if (tStartDecode(pDecoder) < 0 || tDecodeI64(pDecoder, &pChkpInfo->msgVer) < 0) {
    return -1;
  }

  // i64, i32, i32, i8, i8, i16
  if (tDecodeI64(pDecoder, &ignored64) < 0 || tDecodeI32(pDecoder, &ignored32) < 0 ||
      tDecodeI32(pDecoder, &ignored32) < 0 || tDecodeI8(pDecoder, &ignored8) < 0 ||
      tDecodeI8(pDecoder, &ignored8) < 0 || tDecodeI16(pDecoder, &ignored16) < 0) {
    return -1;
  }

  // i8, i8
  if (tDecodeI8(pDecoder, &ignored8) < 0 || tDecodeI8(pDecoder, &ignored8) < 0) {
    return -1;
  }

  // i32, i32, epset, epset
  if (tDecodeI32(pDecoder, &ignored32) < 0 || tDecodeI32(pDecoder, &ignored32) < 0 ||
      tDecodeSEpSet(pDecoder, &ignoredEpset) < 0 || tDecodeSEpSet(pDecoder, &ignoredEpset) < 0) {
    return -1;
  }

  // the fields the caller is actually interested in
  if (tDecodeI64(pDecoder, &pChkpInfo->checkpointId) < 0 || tDecodeI64(pDecoder, &pChkpInfo->checkpointVer) < 0) {
    return -1;
  }

  tEndDecode(pDecoder);
  return 0;
}
197

UNCOV
198
/*
 * Decode the task identity (stream id + task id) from the head of an encoded
 * stream task.
 *
 * Returns 0 on success; -1 when a decode step fails or when the encoded version
 * is not newer than SSTREAM_TASK_INCOMPATIBLE_VER.
 */
int32_t tDecodeStreamTaskId(SDecoder* pDecoder, STaskId* pTaskId) {
  int64_t encodedVer;
  int32_t decodedTaskId = 0;

  if (tStartDecode(pDecoder) < 0 || tDecodeI64(pDecoder, &encodedVer) < 0) {
    return -1;
  }

  if (encodedVer <= SSTREAM_TASK_INCOMPATIBLE_VER) {
    return -1;
  }

  if (tDecodeI64(pDecoder, &pTaskId->streamId) < 0 || tDecodeI32(pDecoder, &decodedTaskId) < 0) {
    return -1;
  }

  // the task id is encoded as 32 bits and stored into the STaskId field
  pTaskId->taskId = decodedTaskId;
  tEndDecode(pDecoder);
  return 0;
}
213

214
void tFreeStreamTask(void* pParam) {
23,345✔
215
  char*        p = NULL;
23,345✔
216
  SStreamTask* pTask = pParam;
23,345✔
217
  int32_t      taskId = pTask->id.taskId;
23,345✔
218

219
  STaskExecStatisInfo* pStatis = &pTask->execInfo;
23,345✔
220

221
  ETaskStatus status1 = TASK_STATUS__UNINIT;
23,345✔
222
  streamMutexLock(&pTask->lock);
23,345✔
223
  if (pTask->status.pSM != NULL) {
23,359✔
224
    SStreamTaskState status = streamTaskGetStatus(pTask);
10,781✔
225
    p = status.name;
10,769✔
226
    status1 = status.state;
10,769✔
227
  }
228
  streamMutexUnlock(&pTask->lock);
23,347✔
229

230
  stDebug("start to free s-task:0x%x %p, state:%s, refId:%" PRId64, taskId, pTask, p, pTask->id.refId);
23,355✔
231

232
  SCheckpointInfo* pCkInfo = &pTask->chkInfo;
23,355✔
233
  stDebug("s-task:0x%x task exec summary: create:%" PRId64 ", init:%" PRId64 ", start:%" PRId64
23,355✔
234
          ", updateCount:%d latestUpdate:%" PRId64 ", latestCheckPoint:%" PRId64 ", ver:%" PRId64
235
          " nextProcessVer:%" PRId64 ", checkpointCount:%d",
236
          taskId, pStatis->created, pStatis->checkTs, pStatis->readyTs, pStatis->updateCount, pStatis->latestUpdateTs,
237
          pCkInfo->checkpointId, pCkInfo->checkpointVer, pCkInfo->nextProcessVer, pStatis->checkpoint);
238

239
  if (pTask->schedInfo.pDelayTimer != NULL) {
23,355✔
240
    streamTmrStop(pTask->schedInfo.pDelayTimer);
134✔
241
    pTask->schedInfo.pDelayTimer = NULL;
134✔
242
  }
243

244
  if (pTask->hTaskInfo.pTimer != NULL) {
23,355✔
245
    streamTmrStop(pTask->hTaskInfo.pTimer);
571✔
246
    pTask->hTaskInfo.pTimer = NULL;
567✔
247
  }
248

249
  if (pTask->msgInfo.pRetryTmr != NULL) {
23,351✔
250
    streamTmrStop(pTask->msgInfo.pRetryTmr);
1,882✔
251
    pTask->msgInfo.pRetryTmr = NULL;
1,882✔
252
  }
253

254
  if (pTask->inputq.queue) {
23,351✔
255
    streamQueueClose(pTask->inputq.queue, pTask->id.taskId);
5,409✔
256
    pTask->inputq.queue = NULL;
5,407✔
257
  }
258

259
  if (pTask->outputq.queue) {
23,349✔
260
    streamQueueClose(pTask->outputq.queue, pTask->id.taskId);
5,407✔
261
    pTask->outputq.queue = NULL;
5,412✔
262
  }
263

264
  if (pTask->exec.qmsg) {
23,354✔
265
    taosMemoryFree(pTask->exec.qmsg);
12,658✔
266
  }
267

268
  if (pTask->exec.pExecutor) {
23,353✔
269
    qDestroyTask(pTask->exec.pExecutor);
2,858✔
270
    pTask->exec.pExecutor = NULL;
2,858✔
271
  }
272

273
  if (pTask->exec.pWalReader != NULL) {
23,353✔
274
    walCloseReader(pTask->exec.pWalReader);
2,762✔
275
    pTask->exec.pWalReader = NULL;
2,763✔
276
  }
277

278
  streamClearChkptReadyMsg(pTask->chkInfo.pActiveInfo);
23,354✔
279

280
  if (pTask->msgInfo.pData != NULL) {
23,359✔
281
    clearBufferedDispatchMsg(pTask);
5✔
282
  }
283

284
  if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) {
23,359✔
285
    tDeleteSchemaWrapper(pTask->outputInfo.tbSink.pSchemaWrapper);
11,604!
286
    taosMemoryFree(pTask->outputInfo.tbSink.pTSchema);
11,603✔
287
    tSimpleHashCleanup(pTask->outputInfo.tbSink.pTbInfo);
11,605✔
288
    tDeleteSchemaWrapper(pTask->outputInfo.tbSink.pTagSchema);
11,609✔
289
  } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
11,755✔
290
    taosArrayDestroy(pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos);
9,791✔
291
  }
292

293
  streamTaskCleanupCheckInfo(&pTask->taskCheckInfo);
23,363✔
294
  streamFreeTaskState(pTask, pTask->status.removeBackendFiles ? 1 : 0);
23,362✔
295

296
  if (pTask->pNameMap) {
23,363✔
297
    tSimpleHashCleanup(pTask->pNameMap);
640✔
298
  }
299

300
  streamDestroyStateMachine(pTask->status.pSM);
23,363✔
301
  pTask->status.pSM = NULL;
23,364✔
302

303
  streamTaskDestroyUpstreamInfo(&pTask->upstreamInfo);
23,364✔
304

305
  taosMemoryFree(pTask->outputInfo.pTokenBucket);
23,359✔
306
  streamMutexDestroy(&pTask->lock);
23,364✔
307

308
  taosArrayDestroy(pTask->msgInfo.pSendInfo);
23,362✔
309
  pTask->msgInfo.pSendInfo = NULL;
23,365✔
310
  streamMutexDestroy(&pTask->msgInfo.lock);
23,365✔
311

312
  taosArrayDestroy(pTask->outputInfo.pNodeEpsetUpdateList);
23,365✔
313
  pTask->outputInfo.pNodeEpsetUpdateList = NULL;
23,365✔
314

315
  if (pTask->id.idStr != NULL) {
23,365✔
316
    taosMemoryFree((void*)pTask->id.idStr);
10,753✔
317
  }
318

319
  streamTaskDestroyActiveChkptInfo(pTask->chkInfo.pActiveInfo);
23,365✔
320
  pTask->chkInfo.pActiveInfo = NULL;
23,364✔
321

322
  taosMemoryFree(pTask);
23,364✔
323
  stDebug("s-task:0x%x free task completed", taskId);
23,365✔
324
}
23,365✔
325

326
/*
 * Release the state/backend resources of a task.
 *
 * With an open state: close it, optionally flag the backend files for removal,
 * and drop the backend reference. Without an open state: when `remove` is set
 * and a backend path is known, delete that directory directly.
 *
 * In both cases the backendPath string is freed and reset afterwards.
 *
 * @param remove  non-zero to also delete the backend files
 */
void streamFreeTaskState(SStreamTask* pTask, int8_t remove) {
  stDebug("s-task:0x%x start to free task state/backend", pTask->id.taskId);

  if (pTask->pState == NULL) {
    stDebug("s-task:0x%x task state is NULL, may del backend:%s", pTask->id.taskId,
            pTask->backendPath ? pTask->backendPath : "NULL");

    if (remove && pTask->backendPath != NULL) {
      stDebug("s-task:0x%x task state is NULL, do del backend:%s", pTask->id.taskId, pTask->backendPath);
      taosRemoveDir(pTask->backendPath);
    }
  } else {
    stDebug("s-task:0x%x start to free task state", pTask->id.taskId);
    streamStateClose(pTask->pState, remove);

    if (remove) {
      taskDbSetClearFileFlag(pTask->pBackend);
    }

    taskDbRemoveRef(pTask->pBackend);
    pTask->pBackend = NULL;
    pTask->pState = NULL;
  }

  if (pTask->backendPath != NULL) {
    taosMemoryFree(pTask->backendPath);
    pTask->backendPath = NULL;
  }
}
23,358✔
352

353
/*
 * Initialise checkpointVer / processedVer / nextProcessVer of a task.
 *
 * - Plain stream task (not fill-history, no related fill-history task): derived
 *   from `ver`, and the data range is collapsed to [ver, ver].
 * - Fill-history task: derived from the upper bound of the existing data range.
 * - Stream task with a related fill-history task: derived from the lower bound
 *   of the existing data range (that range is set at the mnode).
 */
static void setInitialVersionInfo(SStreamTask* pTask, int64_t ver) {
  SCheckpointInfo* pInfo = &pTask->chkInfo;
  SDataRange*      pDataRange = &pTask->dataRange;

  // only set the version info from `ver` for stream tasks without fill-history task
  if ((pTask->info.fillHistory == 0) && (!HAS_RELATED_FILLHISTORY_TASK(pTask))) {
    pInfo->checkpointVer = ver - 1;  // only updated when generating checkpoint
    pInfo->processedVer = ver - 1;   // already processed version
    pInfo->nextProcessVer = ver;     // next version to process

    pDataRange->range.maxVer = ver;
    pDataRange->range.minVer = ver;
    return;
  }

  if (pTask->info.fillHistory == 1) {
    pInfo->checkpointVer = pDataRange->range.maxVer;
    pInfo->processedVer = pDataRange->range.maxVer;
    pInfo->nextProcessVer = pDataRange->range.maxVer + 1;
    return;
  }

  pInfo->checkpointVer = pDataRange->range.minVer - 1;
  pInfo->processedVer = pDataRange->range.minVer - 1;
  pInfo->nextProcessVer = pDataRange->range.minVer;

  // for compatible purpose, remove it later
  if (pDataRange->range.minVer == 0) {
    pInfo->checkpointVer = 0;
    pInfo->processedVer = 0;
    pInfo->nextProcessVer = 1;
    stDebug("s-task:%s update the processedVer to 0 from -1 due to compatible purpose", pTask->id.idStr);
  }
}
5,457✔
388

389
/*
 * Build the backend directory path of a task: "<meta path><TD_DIRSEP>0x<streamId>-0x<taskId>".
 *
 * A fill-history task uses the ids stored in streamTaskId (its related stream
 * task); every other task uses its own ids.
 *
 * On success pTask->backendPath points to a newly allocated string.
 *
 * Returns 0 on success, TSDB_CODE_OUT_OF_BUFFER when the id does not fit the
 * local buffer, or terrno when the allocation fails.
 */
int32_t streamTaskSetBackendPath(SStreamTask* pTask) {
  int64_t streamId = 0;
  int32_t taskId = 0;

  if (pTask->info.fillHistory) {
    streamId = pTask->streamTaskId.streamId;
    taskId = pTask->streamTaskId.taskId;
  } else {
    streamId = pTask->id.streamId;
    taskId = pTask->id.taskId;
  }

  // bounded formatting: the length check below is meaningful only if the write itself cannot overflow
  char    id[128] = {0};
  int32_t nBytes = snprintf(id, sizeof(id), "0x%" PRIx64 "-0x%x", streamId, taskId);
  if (nBytes < 0 || (size_t)nBytes >= sizeof(id)) {
    return TSDB_CODE_OUT_OF_BUFFER;
  }

  // path + separator + id + terminating NUL
  size_t total = strlen(pTask->pMeta->path) + strlen(TD_DIRSEP) + (size_t)nBytes + 1;

  pTask->backendPath = (char*)taosMemoryMalloc(total);
  if (pTask->backendPath == NULL) {
    return terrno;
  }

  (void)snprintf(pTask->backendPath, total, "%s%s%s", pTask->pMeta->path, TD_DIRSEP, id);
  stDebug("s-task:%s set backend path:%s", pTask->id.idStr, pTask->backendPath);

  return 0;
}
418

419
/*
 * Prepare a stream task for running: build its id string, open the input and
 * output queues, create the state machine, set the initial version info,
 * initialise the mutexes, open all upstream inputs, allocate the token bucket
 * and bookkeeping lists, create the active-checkpoint info, and finally set the
 * backend path.
 *
 * @param pTask   task to initialise
 * @param pMeta   stored in pTask->pMeta
 * @param pMsgCb  stored in pTask->pMsgCb
 * @param ver     version passed to setInitialVersionInfo()
 *
 * Returns 0 on success or an error code. On failure the resources acquired so
 * far remain attached to pTask; this function does not release them.
 */
int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, int64_t ver) {
  int32_t code = createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId, &pTask->id.idStr);
  if (code) {
    stError("0x%x failed create stream task id str, code:%s", pTask->id.taskId, tstrerror(code));
    return code;
  }

  pTask->id.refId = 0;
  pTask->inputq.status = TASK_INPUT_STATUS__NORMAL;
  pTask->outputq.status = TASK_OUTPUT_STATUS__NORMAL;

  int32_t code1 = streamQueueOpen(512 << 10, &pTask->inputq.queue);
  int32_t code2 = streamQueueOpen(512 << 10, &pTask->outputq.queue);
  if (code1 || code2) {
    stError("s-task:%s failed to prepare the input/output queue, initialize task failed", pTask->id.idStr);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;

  code = streamCreateStateMachine(pTask);
  if (pTask->status.pSM == NULL || code != TSDB_CODE_SUCCESS) {
    // never report success without a usable state machine
    if (code == TSDB_CODE_SUCCESS) {
      code = TSDB_CODE_OUT_OF_MEMORY;
    }
    stError("s-task:%s failed create state-machine for stream task, initialization failed, code:%s", pTask->id.idStr,
            tstrerror(code));
    return code;
  }

  pTask->execInfo.created = taosGetTimestampMs();
  setInitialVersionInfo(pTask, ver);

  pTask->pMeta = pMeta;
  pTask->pMsgCb = pMsgCb;
  pTask->msgInfo.pSendInfo = taosArrayInit(4, sizeof(SDispatchEntry));
  if (pTask->msgInfo.pSendInfo == NULL) {
    stError("s-task:%s failed to create sendInfo struct for stream task, code:Out of memory", pTask->id.idStr);
    return terrno;
  }

  code = taosThreadMutexInit(&pTask->msgInfo.lock, NULL);
  if (code) {
    stError("s-task:0x%x failed to init msgInfo mutex, code:%s", pTask->id.taskId, tstrerror(code));
    return code;
  }

  // the task lock is recursive
  TdThreadMutexAttr attr = {0};
  code = taosThreadMutexAttrInit(&attr);
  if (code != 0) {
    stError("s-task:%s initElapsed mutex attr failed, code:%s", pTask->id.idStr, tstrerror(code));
    return code;
  }

  code = taosThreadMutexAttrSetType(&attr, PTHREAD_MUTEX_RECURSIVE);
  if (code != 0) {
    stError("s-task:%s set mutex attr recursive, code:%s", pTask->id.idStr, tstrerror(code));
    (void)taosThreadMutexAttrDestroy(&attr);  // do not leak the attr object on the failure path
    return code;
  }

  code = taosThreadMutexInit(&pTask->lock, &attr);
  if (code) {
    (void)taosThreadMutexAttrDestroy(&attr);  // do not leak the attr object on the failure path
    return code;
  }

  code = taosThreadMutexAttrDestroy(&attr);
  if (code) {
    return code;
  }

  streamTaskOpenAllUpstreamInput(pTask);

  STaskOutputInfo* pOutputInfo = &pTask->outputInfo;
  pOutputInfo->pTokenBucket = taosMemoryCalloc(1, sizeof(STokenBucket));
  if (pOutputInfo->pTokenBucket == NULL) {
    stError("s-task:%s failed to prepare the tokenBucket, code:%s", pTask->id.idStr, tstrerror(terrno));
    return terrno;
  }

  // Rate limiting for the sink: data rate comes from tsSinkDataRate.
  // NOTE(review): the previous comment spoke of "2MiB per second" and "50 times
  // sink operator per second", but 35/35 is what is passed here - confirm the
  // intended values against streamTaskInitTokenBucket().
  code = streamTaskInitTokenBucket(pOutputInfo->pTokenBucket, 35, 35, tsSinkDataRate, pTask->id.idStr);
  if (code) {
    return code;
  }

  pOutputInfo->pNodeEpsetUpdateList = taosArrayInit(4, sizeof(SDownstreamTaskEpset));
  if (pOutputInfo->pNodeEpsetUpdateList == NULL) {
    stError("s-task:%s failed to prepare downstreamUpdateList, code:%s", pTask->id.idStr, tstrerror(terrno));
    return terrno;
  }

  pTask->taskCheckInfo.pList = taosArrayInit(4, sizeof(SDownstreamStatusInfo));
  if (pTask->taskCheckInfo.pList == NULL) {
    stError("s-task:%s failed to prepare taskCheckInfo list, code:%s", pTask->id.idStr, tstrerror(terrno));
    return terrno;
  }

  if (pTask->chkInfo.pActiveInfo == NULL) {
    code = streamTaskCreateActiveChkptInfo(&pTask->chkInfo.pActiveInfo);
    if (code) {
      stError("s-task:%s failed to create active checkpoint info, code:%s", pTask->id.idStr, tstrerror(code));
      return code;
    }
  }

  return streamTaskSetBackendPath(pTask);
}
524

525
int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask) {
39,198✔
526
  if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
39,198✔
527
    return 0;
2,510✔
528
  }
529

530
  int32_t type = pTask->outputInfo.type;
36,688✔
531
  if (type == TASK_OUTPUT__TABLE) {
36,688✔
532
    return 0;
150✔
533
  } else if (type == TASK_OUTPUT__FIXED_DISPATCH) {
36,538✔
534
    return 1;
4,352✔
535
  } else {
536
    SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
32,186✔
537
    return taosArrayGetSize(vgInfo);
32,186✔
538
  }
539
}
540

541
int32_t streamTaskGetNumOfUpstream(const SStreamTask* pTask) { return taosArrayGetSize(pTask->upstreamInfo.pList); }
7,188✔
542

543
/*
 * Register pUpstreamTask as an upstream of pTask by appending a new endpoint
 * record to pTask->upstreamInfo.pList (the list is created on first use).
 *
 * Returns TSDB_CODE_SUCCESS, or an error code when the record or the list
 * cannot be allocated or the append fails. On failure the freshly created
 * record is released, so nothing is leaked.
 */
int32_t streamTaskSetUpstreamInfo(SStreamTask* pTask, const SStreamTask* pUpstreamTask) {
  SStreamUpstreamEpInfo* pEpInfo = createStreamTaskEpInfo(pUpstreamTask);
  if (pEpInfo == NULL) {
    return terrno;
  }

  if (pTask->upstreamInfo.pList == NULL) {
    pTask->upstreamInfo.pList = taosArrayInit(4, POINTER_BYTES);
    if (pTask->upstreamInfo.pList == NULL) {
      int32_t code = terrno;  // capture before the free below can touch it
      taosMemoryFree(pEpInfo);
      return code;
    }
  }

  void* p = taosArrayPush(pTask->upstreamInfo.pList, &pEpInfo);
  if (p == NULL) {
    int32_t code = terrno;  // capture before the free below can touch it
    taosMemoryFree(pEpInfo);
    return code;
  }

  return TSDB_CODE_SUCCESS;
}
556

UNCOV
557
/*
 * Replace the stored epset of the first upstream record that belongs to node
 * `nodeId`, if it differs from pEpSet.
 *
 * *pUpdated is set to true only when an epset is actually replaced.
 *
 * Returns 0 on success (including "no matching upstream" and "epset already
 * identical"). A failure to stringify an epset - needed only for the log
 * lines - is returned as an error and aborts the update.
 */
int32_t streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdated) {
  int32_t code = 0;
  char    buf[512] = {0};
  code = epsetToStr(pEpSet, buf, tListLen(buf));  // string form is used by the log lines below
  if (code != 0) {  // print error and abort the update
    stError("%s failed to convert epset to str, code:%s", pTask->id.idStr, tstrerror(code));
    return code;
  }

  int32_t numOfUpstream = taosArrayGetSize(pTask->upstreamInfo.pList);
  for (int32_t i = 0; i < numOfUpstream; ++i) {
    SStreamUpstreamEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i);
    if (pInfo == NULL) {
      continue;
    }

    if (pInfo->nodeId == nodeId) {
      bool equal = isEpsetEqual(&pInfo->epSet, pEpSet);
      if (!equal) {
        char tmp[512] = {0};
        code = epsetToStr(&pInfo->epSet, tmp, tListLen(tmp));
        if (code != 0) {  // print error and abort the update
          stError("%s failed to convert epset to str, code:%s", pTask->id.idStr, tstrerror(code));
          return code;
        }

        epsetAssign(&pInfo->epSet, pEpSet);
        *pUpdated = true;  // flag the change only once the epset has really been replaced
        stDebug("s-task:0x%x update the upstreamInfo taskId:0x%x(nodeId:%d) newEpset:%s old:%s", pTask->id.taskId,
                pInfo->taskId, nodeId, buf, tmp);
      } else {
        stDebug("s-task:0x%x not update upstreamInfo, since identical, task:0x%x(nodeId:%d) epset:%s", pTask->id.taskId,
                pInfo->taskId, nodeId, buf);
      }

      // only the first record of this node is handled
      break;
    }
  }

  return code;
}
595

596
/*
 * Free every record in the upstream list and the list itself, then reset the
 * closed-input counter. Does nothing when no list has been created.
 */
void streamTaskDestroyUpstreamInfo(SUpstreamInfo* pUpstreamInfo) {
  if (pUpstreamInfo->pList == NULL) {
    return;
  }

  taosArrayDestroyEx(pUpstreamInfo->pList, freeUpstreamItem);
  pUpstreamInfo->numOfClosed = 0;
  pUpstreamInfo->pList = NULL;
}
23,358✔
603

604
/*
 * Configure pTask to dispatch to exactly one downstream task: copy the
 * downstream task's id, node id and epset into the fixed dispatcher, and switch
 * the output type and dispatch message type accordingly.
 */
void streamTaskSetFixedDownstreamInfo(SStreamTask* pTask, const SStreamTask* pDownstreamTask) {
  STaskDispatcherFixed* pFixed = &pTask->outputInfo.fixedDispatcher;

  pFixed->taskId = pDownstreamTask->id.taskId;
  pFixed->nodeId = pDownstreamTask->info.nodeId;
  pFixed->epSet = pDownstreamTask->info.epSet;

  pTask->outputInfo.type = TASK_OUTPUT__FIXED_DISPATCH;
  pTask->msgInfo.msgType = TDMT_STREAM_TASK_DISPATCH;
}
503✔
613

UNCOV
614
/*
 * Replace the stored epset of the downstream target that lives on node
 * `nodeId`, if it differs from pEpSet.
 *
 * - shuffle dispatch: the first vgroup entry whose vgId equals nodeId
 * - fixed dispatch:   the single dispatcher, when its nodeId matches
 * - other output types: nothing to update
 *
 * *pUpdated is set to true only when an epset is actually replaced.
 *
 * Returns 0 on success (including "nothing matched" and "epset identical"). A
 * failure to stringify an epset - needed only for the log lines - is returned
 * as an error and aborts the update.
 */
int32_t streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdated) {
  char    buf[512] = {0};
  int32_t code = epsetToStr(pEpSet, buf, tListLen(buf));  // string form is used by the log lines below
  if (code != 0) {                                        // print error and abort the update
    stError("%s failed to convert epset to str, code:%s", pTask->id.idStr, tstrerror(code));
    return code;
  }

  int32_t id = pTask->id.taskId;
  int8_t  type = pTask->outputInfo.type;

  if (type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
    SArray* pVgs = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
    int32_t numOfVgs = taosArrayGetSize(pVgs);  // the list is not resized inside the loop

    for (int32_t i = 0; i < numOfVgs; i++) {
      SVgroupInfo* pVgInfo = taosArrayGet(pVgs, i);
      if (pVgInfo == NULL) {
        continue;
      }

      if (pVgInfo->vgId == nodeId) {
        bool isEqual = isEpsetEqual(&pVgInfo->epSet, pEpSet);
        if (!isEqual) {
          char tmp[512] = {0};
          code = epsetToStr(&pVgInfo->epSet, tmp, tListLen(tmp));
          if (code != 0) {  // print error and abort the update
            stError("%s failed to convert epset to str, code:%s", pTask->id.idStr, tstrerror(code));
            return code;
          }

          epsetAssign(&pVgInfo->epSet, pEpSet);
          *pUpdated = true;  // flag the change only once the epset has really been replaced
          stDebug("s-task:0x%x update dispatch info, task:0x%x(nodeId:%d) newEpset:%s old:%s", id, pVgInfo->taskId,
                  nodeId, buf, tmp);
        } else {
          stDebug("s-task:0x%x not update dispatch info, since identical, task:0x%x(nodeId:%d) epset:%s", id,
                  pVgInfo->taskId, nodeId, buf);
        }
        // only the first entry of this node is handled
        break;
      }
    }
  } else if (type == TASK_OUTPUT__FIXED_DISPATCH) {
    STaskDispatcherFixed* pDispatcher = &pTask->outputInfo.fixedDispatcher;
    if (pDispatcher->nodeId == nodeId) {
      bool equal = isEpsetEqual(&pDispatcher->epSet, pEpSet);
      if (!equal) {
        char tmp[512] = {0};
        code = epsetToStr(&pDispatcher->epSet, tmp, tListLen(tmp));
        if (code != 0) {  // print error and abort the update
          stError("%s failed to convert epset to str, code:%s", pTask->id.idStr, tstrerror(code));
          return code;
        }

        epsetAssign(&pDispatcher->epSet, pEpSet);
        *pUpdated = true;  // flag the change only once the epset has really been replaced
        stDebug("s-task:0x%x update dispatch info, task:0x%x(nodeId:%d) newEpset:%s old:%s", id, pDispatcher->taskId,
                nodeId, buf, tmp);
      } else {
        stDebug("s-task:0x%x not update dispatch info, since identical, task:0x%x(nodeId:%d) epset:%s", id,
                pDispatcher->taskId, nodeId, buf);
      }
    }
  }

  return code;
}
682

683
/*
 * Stop a task: deliver the STOP event to its state machine, kill the running
 * query of a non-sink task that has an executor, then poll every 100ms until
 * the task reports idle.
 *
 * Returns the error of the STOP event immediately when it fails. Otherwise
 * returns the result of qKillTask() (logged, not fatal), or 0 when no executor
 * had to be killed.
 */
int32_t streamTaskStop(SStreamTask* pTask) {
  int32_t     vgId = pTask->pMeta->vgId;
  int64_t     startTs = taosGetTimestampMs();
  const char* idStr = pTask->id.idStr;

  int32_t code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_STOP);
  if (code) {
    stError("failed to handle STOP event, s-task:%s, code:%s", idStr, tstrerror(code));
    return code;
  }

  bool hasExecutor = (pTask->info.taskLevel != TASK_LEVEL__SINK) && (pTask->exec.pExecutor != NULL);
  if (hasExecutor) {
    code = qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS);
    if (code != TSDB_CODE_SUCCESS) {
      stError("s-task:%s failed to kill task related query handle, code:%s", idStr, tstrerror(code));
    }
  }

  // busy-wait (with sleep) until the task has drained
  for (;;) {
    if (streamTaskIsIdle(pTask)) {
      break;
    }

    stDebug("s-task:%s level:%d wait for task to be idle and then close, check again in 100ms", idStr,
            pTask->info.taskLevel);
    taosMsleep(100);
  }

  int64_t elapsed = taosGetTimestampMs() - startTs;
  stDebug("vgId:%d s-task:%s is closed in %" PRId64 " ms", vgId, idStr, elapsed);
  return code;
}
711

UNCOV
712
/*
 * Apply a list of node endpoint changes (SNodeUpdateInfo entries) to the task.
 *
 * The task's update statistics (latestUpdateTs, updateCount) are bumped first.
 * Each entry is then applied via doUpdateTaskEpset(); a failing entry is logged
 * and the remaining entries are still processed.
 *
 * Returns true when at least one stored epset was replaced.
 */
bool streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList) {
  STaskExecStatisInfo* pStatis = &pTask->execInfo;

  int32_t nodeCount = taosArrayGetSize(pNodeList);
  int64_t lastUpdateTs = pStatis->latestUpdateTs;

  pStatis->latestUpdateTs = taosGetTimestampMs();
  pStatis->updateCount += 1;
  stDebug("s-task:0x%x update task nodeEp epset, updatedNodes:%d, updateCount:%d, prevTs:%" PRId64, pTask->id.taskId,
          nodeCount, pStatis->updateCount, lastUpdateTs);

  bool changed = false;
  for (int32_t idx = 0; idx < nodeCount; ++idx) {
    SNodeUpdateInfo* pNode = taosArrayGet(pNodeList, idx);
    if (pNode != NULL) {
      int32_t code = doUpdateTaskEpset(pTask, pNode->nodeId, &pNode->newEp, &changed);
      if (code) {
        stError("s-task:0x%x failed to update the task nodeEp epset, code:%s", pTask->id.taskId, tstrerror(code));
      }
    }
  }

  return changed;
}
738

739
// Set the stage of every upstream endpoint of this task to -1.
// Source-level tasks are left untouched.
void streamTaskResetUpstreamStageInfo(SStreamTask* pTask) {
  if (pTask->info.taskLevel != TASK_LEVEL__SOURCE) {
    int32_t total = taosArrayGetSize(pTask->upstreamInfo.pList);

    int32_t idx = 0;
    while (idx < total) {
      SStreamUpstreamEpInfo* pUpstream = taosArrayGetP(pTask->upstreamInfo.pList, idx);
      pUpstream->stage = -1;
      idx += 1;
    }

    stDebug("s-task:%s reset all upstream tasks stage info", pTask->id.idStr);
  }
}
752

753
// Allow data from every upstream endpoint again and reset the closed counter.
// Nothing happens when the upstream list is empty.
void streamTaskOpenAllUpstreamInput(SStreamTask* pTask) {
  int32_t total = taosArrayGetSize(pTask->upstreamInfo.pList);

  if (total != 0) {
    int32_t idx = 0;
    while (idx < total) {
      SStreamUpstreamEpInfo* pUpstream = taosArrayGetP(pTask->upstreamInfo.pList, idx);
      pUpstream->dataAllowed = true;
      idx += 1;
    }

    pTask->upstreamInfo.numOfClosed = 0;
    stDebug("s-task:%s opening up inputQ for %d upstream tasks", pTask->id.idStr, total);
  }
}
767

768
// Stop accepting data from the upstream task identified by taskId.
// Only an endpoint that is found and currently allows data is affected. The
// closed counter is incremented atomically unless it already equals the
// number of upstream tasks, in which case an error is logged.
void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId) {
  SStreamUpstreamEpInfo* pInfo = NULL;
  streamTaskGetUpstreamTaskEpInfo(pTask, taskId, &pInfo);

  if (pInfo == NULL || !pInfo->dataAllowed) {
    return;
  }

  pInfo->dataAllowed = false;
  if (pTask->upstreamInfo.numOfClosed < streamTaskGetNumOfUpstream(pTask)) {
    // the updated counter value is not needed here
    (void)atomic_add_fetch_32(&pTask->upstreamInfo.numOfClosed, 1);
  } else {
    stError("s-task:%s not inc closed input, since they have been all closed already", pTask->id.idStr);
  }
}
3,031✔
781

UNCOV
782
// Re-open the input for the upstream task identified by taskId.
// Only an endpoint that is found and currently closed is affected: the closed
// counter is decremented atomically and the endpoint is flagged as allowed.
void streamTaskOpenUpstreamInput(SStreamTask* pTask, int32_t taskId) {
  SStreamUpstreamEpInfo* pUpstream = NULL;
  streamTaskGetUpstreamTaskEpInfo(pTask, taskId, &pUpstream);

  if (pUpstream == NULL || pUpstream->dataAllowed) {
    return;
  }

  int32_t remain = atomic_sub_fetch_32(&pTask->upstreamInfo.numOfClosed, 1);
  stDebug("s-task:%s open inputQ for upstream:0x%x, remain closed:%d", pTask->id.idStr, taskId, remain);
  pUpstream->dataAllowed = true;
}
×
792

793
// True when the closed-upstream counter equals the size of the upstream list.
bool streamTaskIsAllUpstreamClosed(SStreamTask* pTask) {
  bool allClosed = (taosArrayGetSize(pTask->upstreamInfo.pList) == pTask->upstreamInfo.numOfClosed);
  return allClosed;
}
796

797
// Under the task lock, move the sched status from INACTIVE to WAITING.
// Returns true only when that transition was performed.
bool streamTaskSetSchedStatusWait(SStreamTask* pTask) {
  streamMutexLock(&pTask->lock);

  bool switched = (pTask->status.schedStatus == TASK_SCHED_STATUS__INACTIVE);
  if (switched) {
    pTask->status.schedStatus = TASK_SCHED_STATUS__WAITING;
  }

  streamMutexUnlock(&pTask->lock);
  return switched;
}
809

810
// Under the task lock, move the sched status from WAITING to ACTIVE.
// Returns the sched status observed before any change.
int8_t streamTaskSetSchedStatusActive(SStreamTask* pTask) {
  streamMutexLock(&pTask->lock);

  int8_t prev = pTask->status.schedStatus;
  if (prev == TASK_SCHED_STATUS__WAITING) {
    pTask->status.schedStatus = TASK_SCHED_STATUS__ACTIVE;
  }

  streamMutexUnlock(&pTask->lock);
  return prev;
}
820

821
// Under the task lock, unconditionally set the sched status to INACTIVE.
// Returns the sched status that was replaced.
int8_t streamTaskSetSchedStatusInactive(SStreamTask* pTask) {
  int8_t prev;

  streamMutexLock(&pTask->lock);
  prev = pTask->status.schedStatus;
  pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
  streamMutexUnlock(&pTask->lock);

  return prev;
}
829

830
// For a fill-history task, clear the fill-history link on the related stream
// task and persist that task's meta. When resetRelHalt is non-zero the related
// task's persistent status is also set to READY.
// Returns 0 immediately for a non fill-history task; otherwise the result of
// acquiring the related task or, once acquired, of saving it.
int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, int32_t resetRelHalt) {
  if (pTask->info.fillHistory == 0) {
    return 0;
  }

  SStreamMeta* pMeta = pTask->pMeta;
  SStreamTask* pRelated = NULL;

  int32_t code = streamMetaAcquireTaskUnsafe(pMeta, &pTask->streamTaskId, &pRelated);
  if (code != 0) {
    return code;
  }

  stDebug("s-task:%s clear the related stream task:0x%x attr to fill-history task", pTask->id.idStr,
          (int32_t)pTask->streamTaskId.taskId);

  // the attribute change and the save are done while holding the related task's lock
  streamMutexLock(&(pRelated->lock));
  CLEAR_RELATED_FILLHISTORY_TASK(pRelated);

  if (resetRelHalt) {
    stDebug("s-task:0x%" PRIx64 " set the persistent status attr to be ready, prev:%s, status in sm:%s",
            pTask->streamTaskId.taskId, streamTaskGetStatusStr(pRelated->status.taskStatus),
            streamTaskGetStatus(pRelated).name);
    pRelated->status.taskStatus = TASK_STATUS__READY;
  }

  code = streamMetaSaveTask(pMeta, pRelated);
  streamMutexUnlock(&(pRelated->lock));

  streamMetaReleaseTask(pMeta, pRelated);
  return code;
}
862

863
// Build a TDMT_STREAM_TASK_DROP request for the given task and put it into the
// write queue through pMsgCb. Returns terrno when the request buffer cannot be
// allocated, otherwise the result of tmsgPutToQueue().
int32_t streamBuildAndSendDropTaskMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskId* pTaskId, int64_t resetRelHalt) {
  SVDropStreamTaskReq* pDropReq = rpcMallocCont(sizeof(SVDropStreamTaskReq));
  if (pDropReq == NULL) {
    return terrno;
  }

  pDropReq->head.vgId = vgId;
  pDropReq->streamId = pTaskId->streamId;
  pDropReq->taskId = pTaskId->taskId;
  pDropReq->resetRelHalt = resetRelHalt;

  SRpcMsg rpcMsg = {0};
  rpcMsg.msgType = TDMT_STREAM_TASK_DROP;
  rpcMsg.pCont = pDropReq;
  rpcMsg.contLen = sizeof(SVDropStreamTaskReq);

  int32_t code = tmsgPutToQueue(pMsgCb, WRITE_QUEUE, &rpcMsg);
  if (code == TSDB_CODE_SUCCESS) {
    stDebug("vgId:%d build and send drop task:0x%x msg", vgId, pTaskId->taskId);
  } else {
    stError("vgId:%d failed to send drop task:0x%x msg, code:%s", vgId, pTaskId->taskId, tstrerror(code));
  }

  return code;
}
884

885
// Encode a checkpoint-report for this task and send it to the mnode.
//
// pCheckpointInfo supplies the processed version, start timestamp and the
// active checkpoint info (transId / activeId); dropRelHTask is copied into the
// report's dropHTask field.
//
// Returns the result of tmsgSendReq() on success. On failure it returns the
// underlying error code (encoder error, or terrno / out-of-memory when the rpc
// buffer cannot be allocated) instead of a bare -1, matching
// streamTaskSendCheckpointReq().
int32_t streamSendChkptReportMsg(SStreamTask* pTask, SCheckpointInfo* pCheckpointInfo, int8_t dropRelHTask) {
  int32_t                code = 0;
  int32_t                tlen = 0;
  int32_t                vgId = pTask->pMeta->vgId;
  const char*            id = pTask->id.idStr;
  SActiveCheckpointInfo* pActive = pCheckpointInfo->pActiveInfo;

  SCheckpointReport req = {.streamId = pTask->id.streamId,
                           .taskId = pTask->id.taskId,
                           .nodeId = vgId,
                           .dropHTask = dropRelHTask,
                           .transId = pActive->transId,
                           .checkpointId = pActive->activeId,
                           .checkpointVer = pCheckpointInfo->processedVer,
                           .checkpointTs = pCheckpointInfo->startTs};

  // first pass: compute the encoded length only
  tEncodeSize(tEncodeStreamTaskChkptReport, &req, tlen, code);
  if (code < 0) {
    stError("s-task:%s vgId:%d encode stream task checkpoint-report failed, code:%s", id, vgId, tstrerror(code));
    return code;
  }

  void* buf = rpcMallocCont(tlen);
  if (buf == NULL) {
    // prefer the error recorded by the allocator; fall back to OOM when none was recorded
    code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    stError("s-task:%s vgId:%d encode stream task checkpoint-report msg failed, code:%s", id, vgId, tstrerror(code));
    return code;
  }

  SEncoder encoder;
  tEncoderInit(&encoder, buf, tlen);
  if ((code = tEncodeStreamTaskChkptReport(&encoder, &req)) < 0) {
    rpcFreeCont(buf);
    tEncoderClear(&encoder);
    stError("s-task:%s vgId:%d encode stream task checkpoint-report msg failed, code:%s", id, vgId, tstrerror(code));
    return code;
  }
  tEncoderClear(&encoder);

  SRpcMsg msg = {0};
  initRpcMsg(&msg, TDMT_MND_STREAM_CHKPT_REPORT, buf, tlen);
  stDebug("s-task:%s vgId:%d build and send task checkpoint-report to mnode", id, vgId);

  return tmsgSendReq(&pTask->info.mnodeEpset, &msg);
}
930

931
STaskId streamTaskGetTaskId(const SStreamTask* pTask) {
32,183✔
932
  STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
32,183✔
933
  return id;
32,183✔
934
}
935

936
// Reset the launch bookkeeping of a history task: zero retries, the default
// wait interval, and the tick count derived from that interval.
void streamTaskInitForLaunchHTask(SHistoryTaskInfo* pInfo) {
  pInfo->retryTimes = 0;
  pInfo->tickCount = ceil(LAUNCH_HTASK_INTERVAL / WAIT_FOR_MINIMAL_INTERVAL);
  pInfo->waitInterval = LAUNCH_HTASK_INTERVAL;
}
571✔
941

942
// Prepare the next launch retry: count the retry, scale the wait interval by
// RETRY_LAUNCH_INTERVAL_INC_RATE and recompute the tick count from it.
void streamTaskSetRetryInfoForLaunch(SHistoryTaskInfo* pInfo) {
  pInfo->retryTimes += 1;

  pInfo->waitInterval *= RETRY_LAUNCH_INTERVAL_INC_RATE;
  pInfo->tickCount = ceil(pInfo->waitInterval / WAIT_FOR_MINIMAL_INTERVAL);
}
565✔
947

948
// Fill the identity fields of a status entry from the task and give it the
// initial values stage = -1 and status = TASK_STATUS__STOP.
void streamTaskStatusInit(STaskStatusEntry* pEntry, const SStreamTask* pTask) {
  // identity
  pEntry->id.taskId = pTask->id.taskId;
  pEntry->id.streamId = pTask->id.streamId;
  pEntry->nodeId = pTask->info.nodeId;

  // initial state
  pEntry->status = TASK_STATUS__STOP;
  pEntry->stage = -1;
}
3,961✔
955

956
// Copy the runtime fields of a status entry from pSrc to pDst.
// The id and nodeId fields of pDst are not written by this function.
void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc) {
  // state
  pDst->status = pSrc->status;
  pDst->stage = pSrc->stage;
  pDst->startTime = pSrc->startTime;
  pDst->hTaskId = pSrc->hTaskId;

  // input / processing / output statistics
  pDst->inputQUsed = pSrc->inputQUsed;
  pDst->inputRate = pSrc->inputRate;
  pDst->procsTotal = pSrc->procsTotal;
  pDst->procsThroughput = pSrc->procsThroughput;
  pDst->outputTotal = pSrc->outputTotal;
  pDst->outputThroughput = pSrc->outputThroughput;
  pDst->sinkQuota = pSrc->sinkQuota;
  pDst->sinkDataSize = pSrc->sinkDataSize;

  // version / checkpoint progress
  pDst->processedVer = pSrc->processedVer;
  pDst->verRange = pSrc->verRange;
  pDst->checkpointInfo = pSrc->checkpointInfo;
  pDst->startCheckpointId = pSrc->startCheckpointId;
  pDst->startCheckpointVer = pSrc->startCheckpointVer;
}
25,932✔
976

977
// Build a status entry snapshot for the task from its meta, exec statistics,
// checkpoint info and input queue. Fields not assigned below stay zero.
STaskStatusEntry streamTaskGetStatusEntry(SStreamTask* pTask) {
  SStreamMeta*         pMeta = pTask->pMeta;
  STaskExecStatisInfo* pStat = &pTask->execInfo;

  STaskStatusEntry snapshot = {0};

  snapshot.id = streamTaskGetTaskId(pTask);
  snapshot.status = streamTaskGetStatus(pTask).state;
  snapshot.nodeId = pMeta->vgId;
  snapshot.stage = pMeta->stage;

  snapshot.inputQUsed = SIZE_IN_MiB(streamQueueGetItemSize(pTask->inputq.queue));
  snapshot.startTime = pStat->readyTs;

  snapshot.checkpointInfo.latestId = pTask->chkInfo.checkpointId;
  snapshot.checkpointInfo.latestVer = pTask->chkInfo.checkpointVer;
  snapshot.checkpointInfo.latestTime = pTask->chkInfo.checkpointTime;
  snapshot.checkpointInfo.latestSize = 0;
  snapshot.checkpointInfo.remoteBackup = 0;
  snapshot.checkpointInfo.consensusChkptId = 0;
  snapshot.checkpointInfo.consensusTs = 0;

  snapshot.hTaskId = pTask->hTaskInfo.id.taskId;
  snapshot.procsTotal = SIZE_IN_MiB(pStat->inputDataSize);
  snapshot.outputTotal = SIZE_IN_MiB(pStat->outputDataSize);
  snapshot.procsThroughput = SIZE_IN_KiB(pStat->procsThroughput);
  snapshot.outputThroughput = SIZE_IN_KiB(pStat->outputThroughput);
  snapshot.startCheckpointId = pStat->startCheckpointId;
  snapshot.startCheckpointVer = pStat->startCheckpointVer;

  return snapshot;
}
1006

1007
// Callback passed to the PAUSE event in streamTaskPause(): bumps the meta's
// paused-task counter and, for a fill-history task, kills the executor task.
// The param argument is not used. Returns 0, or the result of qKillTask() for
// a fill-history task.
static int32_t taskPauseCallback(SStreamTask* pTask, void* param) {
  SStreamMeta* pMeta = pTask->pMeta;
  int32_t      numPaused = atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1);
  int32_t      code = 0;

  stInfo("vgId:%d s-task:%s pause stream task. paused task num:%d", pMeta->vgId, pTask->id.idStr, numPaused);

  // in case of fill-history task, stop the tsdb file scan operation.
  if (pTask->info.fillHistory == 1) {
    code = qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS);
  }

  stDebug("vgId:%d s-task:%s set pause flag and pause task", pMeta->vgId, pTask->id.idStr);
  return code;
}
1023

1024
// Submit an asynchronous PAUSE event to the task's state machine, with
// taskPauseCallback as the callback. A failure to submit is only logged.
void streamTaskPause(SStreamTask* pTask) {
  int32_t code = streamTaskHandleEventAsync(pTask->status.pSM, TASK_EVENT_PAUSE, taskPauseCallback, NULL);
  if (code == 0) {
    return;
  }

  stError("s-task:%s failed handle pause event async, code:%s", pTask->id.idStr, tstrerror(code));
}
127✔
1030

1031
// Try to restore the task's previous status. On success the meta's
// paused-task counter is decremented and the transition is logged; otherwise
// only a "no need to resume" message is logged.
void streamTaskResume(SStreamTask* pTask) {
  SStreamTaskState before = streamTaskGetStatus(pTask);
  SStreamMeta*     pMeta = pTask->pMeta;

  int32_t code = streamTaskRestoreStatus(pTask);
  if (code != TSDB_CODE_SUCCESS) {
    stInfo("s-task:%s status:%s no need to resume, paused task(s):%d", pTask->id.idStr, before.name,
           pMeta->numOfPausedTasks);
    return;
  }

  char*   pCurrent = streamTaskGetStatus(pTask).name;
  int32_t numPaused = atomic_sub_fetch_32(&pMeta->numOfPausedTasks, 1);
  stInfo("s-task:%s status:%s resume from %s, paused task(s):%d", pTask->id.idStr, pCurrent, before.name, numPaused);
}
120✔
1045

1046
// True when the task's level is TASK_LEVEL__SINK.
bool streamTaskIsSinkTask(const SStreamTask* pTask) {
  bool isSink = (TASK_LEVEL__SINK == pTask->info.taskLevel);
  return isSink;
}
28,251✔
1047

1048
// this task must success
1049
// Encode a checkpoint request (streamId, taskId, nodeId) for this task and
// send it to the mnode.
// Returns TSDB_CODE_INVALID_MSG when the size computation fails, terrno when
// the rpc buffer cannot be allocated, the encoder's code when encoding fails,
// and otherwise the result of tmsgSendReq().
int32_t streamTaskSendCheckpointReq(SStreamTask* pTask) {
  const char* idStr = pTask->id.idStr;
  int32_t     nodeId = pTask->pMeta->vgId;
  int32_t     len = 0;
  int32_t     code;

  SStreamTaskCheckpointReq chkptReq = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId, .nodeId = nodeId};

  // first pass: compute the encoded length only
  tEncodeSize(tEncodeStreamTaskCheckpointReq, &chkptReq, len, code);
  if (code < 0) {
    stError("s-task:%s vgId:%d encode stream task req checkpoint failed, code:%s", idStr, nodeId, tstrerror(code));
    return TSDB_CODE_INVALID_MSG;
  }

  void* pCont = rpcMallocCont(len);
  if (pCont == NULL) {
    stError("s-task:%s vgId:%d encode stream task req checkpoint msg failed, code:Out of memory", idStr, nodeId);
    return terrno;
  }

  SEncoder enc;
  tEncoderInit(&enc, pCont, len);
  code = tEncodeStreamTaskCheckpointReq(&enc, &chkptReq);
  if (code < 0) {
    rpcFreeCont(pCont);
    tEncoderClear(&enc);
    stError("s-task:%s vgId:%d encode stream task req checkpoint msg failed, code:%s", idStr, nodeId,
            tstrerror(code));
    return code;
  }

  tEncoderClear(&enc);

  SRpcMsg rpcMsg = {0};
  initRpcMsg(&rpcMsg, TDMT_MND_STREAM_REQ_CHKPT, pCont, len);
  stDebug("s-task:%s vgId:%d build and send task checkpoint req", idStr, nodeId);

  return tmsgSendReq(&pTask->info.mnodeEpset, &rpcMsg);
}
1085

1086
// Look up the upstream endpoint whose taskId matches and store it in *pEpInfo.
// *pEpInfo is NULL when nothing matches. A NULL list entry ends the search
// silently; a completed search without a match logs an error.
void streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId, SStreamUpstreamEpInfo** pEpInfo) {
  *pEpInfo = NULL;

  int32_t total = taosArrayGetSize(pTask->upstreamInfo.pList);
  int32_t idx = 0;

  while (idx < total) {
    SStreamUpstreamEpInfo* pCandidate = taosArrayGetP(pTask->upstreamInfo.pList, idx);
    if (pCandidate == NULL) {
      return;
    }

    if (pCandidate->taskId == taskId) {
      *pEpInfo = pCandidate;
      return;
    }

    idx += 1;
  }

  stError("s-task:%s failed to find upstream task:0x%x", pTask->id.idStr, taskId);
}
1104

UNCOV
1105
// Return the epset of the downstream task identified by taskId, or NULL when
// no downstream entry matches.
//
// The dispatch kind is read from outputInfo.type. The previous code compared
// info.taskLevel (a TASK_LEVEL__* value) against the TASK_OUTPUT__* constants.
// NOTE(review): assumes outputInfo carries the dispatch kind in a `type`
// member, as the fixedDispatcher/shuffleDispatcher members suggest — confirm
// against the struct definition.
SEpSet* streamTaskGetDownstreamEpInfo(SStreamTask* pTask, int32_t taskId) {
  if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
    if (pTask->outputInfo.fixedDispatcher.taskId == taskId) {
      return &pTask->outputInfo.fixedDispatcher.epSet;
    }
  } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
    SArray* pList = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
    int32_t numOfVgroups = taosArrayGetSize(pList);  // loop-invariant, evaluate once

    for (int32_t i = 0; i < numOfVgroups; ++i) {
      SVgroupInfo* pVgInfo = taosArrayGet(pList, i);
      if (pVgInfo == NULL) {
        continue;
      }

      if (pVgInfo->taskId == taskId) {
        return &pVgInfo->epSet;
      }
    }
  }

  return NULL;
}
1126

1127
int32_t createStreamTaskIdStr(int64_t streamId, int32_t taskId, const char** pId) {
5,454✔
1128
  char buf[128] = {0};
5,454✔
1129
  sprintf(buf, "0x%" PRIx64 "-0x%x", streamId, taskId);
5,454✔
1130
  *pId = taosStrdup(buf);
5,454✔
1131

1132
  if (*pId == NULL) {
5,453!
UNCOV
1133
    return terrno;
×
1134
  } else {
1135
    return TSDB_CODE_SUCCESS;
5,455✔
1136
  }
1137
}
1138

1139
// Convert a retrieve request into a STREAM_INPUT__DATA_RETRIEVE data block and
// put it into the task's input queue.
// Returns the first failing step's code (allocation failure also sets terrno),
// or the result of streamTaskPutDataIntoInputQ().
static int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq) {
  const char*       idStr = pTask->id.idStr;
  SStreamDataBlock* pBlock;

  int32_t code = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SStreamDataBlock), (void**)&pBlock);
  if (code) {
    stError("s-task:%s failed to allocated retrieve-block", idStr);
    terrno = code;
    return code;
  }

  pBlock->srcVgId = 0;
  pBlock->type = STREAM_INPUT__DATA_RETRIEVE;

  code = streamRetrieveReqToData(pReq, pBlock, pTask->id.idStr);
  if (code != TSDB_CODE_SUCCESS) {
    stError("s-task:%s failed to convert retrieve-data to block, code:%s", idStr, tstrerror(code));
    taosFreeQitem(pBlock);
    return code;
  }

  code = streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pBlock);
  if (code != TSDB_CODE_SUCCESS) {
    stError("s-task:%s failed to put retrieve-block into inputQ, inputQ is full, discard the retrieve msg",
            idStr);
  }

  return code;
}
1167

1168
// Enqueue the retrieve request and, when that succeeds, try to schedule the
// task for execution. Returns the enqueue error, or the scheduling result.
int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq) {
  int32_t code = streamTaskEnqueueRetrieve(pTask, pReq);
  if (code == 0) {
    code = streamTrySchedExec(pTask);
  }

  return code;
}
1175

1176
// Set the task's removeBackendFiles status flag.
void streamTaskSetRemoveBackendFiles(SStreamTask* pTask) {
  pTask->status.removeBackendFiles = true;
}
3,029✔
1177

UNCOV
1178
void streamTaskGetActiveCheckpointInfo(const SStreamTask* pTask, int32_t* pTransId, int64_t* pCheckpointId) {
×
UNCOV
1179
  if (pTransId != NULL) {
×
1180
    *pTransId = pTask->chkInfo.pActiveInfo->transId;
×
1181
  }
1182

UNCOV
1183
  if (pCheckpointId != NULL) {
×
UNCOV
1184
    *pCheckpointId = pTask->chkInfo.pActiveInfo->activeId;
×
1185
  }
1186
}
×
1187

1188
// Store activeCheckpointId as the active checkpoint id of the task.
// Always returns TSDB_CODE_SUCCESS.
int32_t streamTaskSetActiveCheckpointInfo(SStreamTask* pTask, int64_t activeCheckpointId) {
  SActiveCheckpointInfo* pActive = pTask->chkInfo.pActiveInfo;
  pActive->activeId = activeCheckpointId;

  return TSDB_CODE_SUCCESS;
}
1192

UNCOV
1193
// Record a failed checkpoint: store transId, and set both activeId and
// failedId to checkpointId.
void streamTaskSetFailedChkptInfo(SStreamTask* pTask, int32_t transId, int64_t checkpointId) {
  SActiveCheckpointInfo* pActive = pTask->chkInfo.pActiveInfo;

  pActive->transId = transId;
  pActive->activeId = checkpointId;
  pActive->failedId = checkpointId;

  stDebug("s-task:%s set failed checkpointId:%"PRId64, pTask->id.idStr, checkpointId);
}
×
1199

1200
// Allocate and initialise an SActiveCheckpointInfo: zeroed struct, mutex and
// the three bookkeeping arrays.
//
// On success *pRes receives the new object and TSDB_CODE_SUCCESS is returned.
// On failure *pRes is left untouched, everything allocated so far is released,
// and the error code is returned.
int32_t streamTaskCreateActiveChkptInfo(SActiveCheckpointInfo** pRes) {
  SActiveCheckpointInfo* pInfo = taosMemoryCalloc(1, sizeof(SActiveCheckpointInfo));
  if (pInfo == NULL) {
    return terrno;
  }

  int32_t code = taosThreadMutexInit(&pInfo->lock, NULL);
  if (code != TSDB_CODE_SUCCESS) {
    // the mutex was never initialised, so only the struct itself has to go
    taosMemoryFree(pInfo);
    return code;
  }

  pInfo->pDispatchTriggerList = taosArrayInit(4, sizeof(STaskTriggerSendInfo));
  pInfo->pReadyMsgList = taosArrayInit(4, sizeof(STaskCheckpointReadyInfo));
  pInfo->pCheckpointReadyRecvList = taosArrayInit(4, sizeof(STaskDownstreamReadyInfo));

  if (pInfo->pDispatchTriggerList == NULL || pInfo->pReadyMsgList == NULL ||
      pInfo->pCheckpointReadyRecvList == NULL) {
    // capture the error before cleanup can overwrite terrno
    code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    // releases the mutex, whichever arrays were created, and the struct
    streamTaskDestroyActiveChkptInfo(pInfo);
    return code;
  }

  *pRes = pInfo;
  return code;
}
1218

1219
// Release an SActiveCheckpointInfo: destroy its mutex and the three arrays,
// stop the trigger and ready timers when they have a handle, then free the
// struct. A NULL pInfo is ignored.
void streamTaskDestroyActiveChkptInfo(SActiveCheckpointInfo* pInfo) {
  if (pInfo == NULL) {
    return;
  }

  streamMutexDestroy(&pInfo->lock);

  taosArrayDestroy(pInfo->pDispatchTriggerList);
  pInfo->pDispatchTriggerList = NULL;

  taosArrayDestroy(pInfo->pReadyMsgList);
  pInfo->pReadyMsgList = NULL;

  taosArrayDestroy(pInfo->pCheckpointReadyRecvList);
  pInfo->pCheckpointReadyRecvList = NULL;

  // trigger timer first, then ready timer
  SStreamTmrInfo* timers[2] = {&pInfo->chkptTriggerMsgTmr, &pInfo->chkptReadyMsgTmr};
  for (int32_t k = 0; k < 2; ++k) {
    if (timers[k]->tmrHandle != NULL) {
      streamTmrStop(timers[k]->tmrHandle);
      timers[k]->tmrHandle = NULL;
    }
  }

  taosMemoryFree(pInfo);
}
1246

1247
//NOTE: clear the checkpoint id, and keep the failed id
1248
// Reset the active checkpoint state: ids, trigger flags, and the
// dispatch-trigger and ready-recv lists. failedId is intentionally not reset,
// and pReadyMsgList is not touched here.
void streamTaskClearActiveInfo(SActiveCheckpointInfo* pInfo) {
  pInfo->transId = 0;
  pInfo->activeId = 0;

  pInfo->dispatchTrigger = false;
  pInfo->allUpstreamTriggerRecv = 0;

  taosArrayClear(pInfo->pDispatchTriggerList);
  taosArrayClear(pInfo->pCheckpointReadyRecvList);
}
1258

1259
const char* streamTaskGetExecType(int32_t type) {
12,440✔
1260
  switch (type) {
12,440!
1261
    case STREAM_EXEC_T_EXTRACT_WAL_DATA:
4,848✔
1262
      return "scan-wal-file";
4,848✔
1263
    case STREAM_EXEC_T_START_ALL_TASKS:
2,423✔
1264
      return "start-all-tasks";
2,423✔
1265
    case STREAM_EXEC_T_START_ONE_TASK:
1,096✔
1266
      return "start-one-task";
1,096✔
UNCOV
1267
    case STREAM_EXEC_T_RESTART_ALL_TASKS:
×
UNCOV
1268
      return "restart-all-tasks";
×
1269
    case STREAM_EXEC_T_STOP_ALL_TASKS:
990✔
1270
      return "stop-all-tasks";
990✔
1271
    case STREAM_EXEC_T_RESUME_TASK:
659✔
1272
      return "resume-task-from-idle";
659✔
UNCOV
1273
    case STREAM_EXEC_T_ADD_FAILED_TASK:
×
UNCOV
1274
      return "record-start-failed-task";
×
1275
    case 0:
2,456✔
1276
      return "exec-all-tasks";
2,456✔
UNCOV
1277
    default:
×
UNCOV
1278
      return "invalid-exec-type";
×
1279
  }
1280
}
1281

1282
// Allocate an int64_t holding the task's refId, store its address in *pRefId
// and register it with metaRefMgtAdd().
// Returns terrno when the allocation fails, otherwise the result of
// metaRefMgtAdd() (a failure there is logged).
int32_t streamTaskAllocRefId(SStreamTask* pTask, int64_t** pRefId) {
  *pRefId = taosMemoryMalloc(sizeof(int64_t));
  if (*pRefId == NULL) {
    stError("s-task:%s failed to alloc new ref id, code:%s", pTask->id.idStr, tstrerror(terrno));
    return terrno;
  }

  **pRefId = pTask->id.refId;

  int32_t code = metaRefMgtAdd(pTask->pMeta->vgId, *pRefId);
  if (code != 0) {
    stError("s-task:%s failed to add refId:%" PRId64 " into refId-mgmt, code:%s", pTask->id.idStr, pTask->id.refId,
            tstrerror(code));
  }

  return code;
}
1297

1298
// Hand a refId pointer to metaRefMgtRemove(); a NULL pointer is ignored.
void streamTaskFreeRefId(int64_t* pRefId) {
  if (pRefId != NULL) {
    metaRefMgtRemove(pRefId);
  }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc