• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3625

26 Feb 2025 10:19AM UTC coverage: 63.633% (+0.1%) from 63.485%
#3625

push

travis-ci

web-flow
Merge pull request #29914 from taosdata/feat/TS-5613-3.0

feat:[TS-5613]support bool in cast

148738 of 299799 branches covered (49.61%)

Branch coverage included in aggregate %.

233124 of 300297 relevant lines covered (77.63%)

17654074.26 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

68.27
/source/libs/stream/src/streamTask.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executor.h"
17
#include "osDir.h"
18
#include "osMemory.h"
19
#include "streamInt.h"
20
#include "streamsm.h"
21
#include "tmisce.h"
22
#include "tstream.h"
23
#include "ttimer.h"
24
#include "wal.h"
25
#include "streamMsg.h"
26

27
static void streamTaskDestroyUpstreamInfo(SUpstreamInfo* pUpstreamInfo);
28
static int32_t streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdated);
29
static int32_t streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdate);
30
static void streamTaskDestroyActiveChkptInfo(SActiveCheckpointInfo* pInfo);
31

32
static int32_t addToTaskset(SArray* pArray, SStreamTask* pTask) {
14,047✔
33
  int32_t childId = taosArrayGetSize(pArray);
14,047✔
34
  pTask->info.selfChildId = childId;
14,047✔
35
  void* p = taosArrayPush(pArray, &pTask);
14,047✔
36
  return (p == NULL) ? terrno : TSDB_CODE_SUCCESS;
14,047!
37
}
38

39
static int32_t doUpdateTaskEpset(SStreamTask* pTask, int32_t nodeId, SEpSet* pEpSet, bool* pUpdated) {
217✔
40
  int32_t code = 0;
217✔
41
  char    buf[512] = {0};
217✔
42

43
  if (pTask->info.nodeId == nodeId) {  // execution task should be moved away
217✔
44
    bool isEqual = isEpsetEqual(&pTask->info.epSet, pEpSet);
66✔
45
    code = epsetToStr(pEpSet, buf, tListLen(buf));
66✔
46
    if (code) { // print error and continue
66!
47
      stError("%s failed to convert epset to str, code:%s", pTask->id.idStr, tstrerror(code));
×
48
      return code;
×
49
    }
50

51
    if (!isEqual) {
66✔
52
      (*pUpdated) = true;
60✔
53
      char tmp[512] = {0};
60✔
54
      code = epsetToStr(&pTask->info.epSet, tmp, tListLen(tmp));  // only for log file, ignore errors
60✔
55
      if (code) { // print error and continue
60!
56
        stError("%s failed to convert epset to str, code:%s", pTask->id.idStr, tstrerror(code));
×
57
        return code;
×
58
      }
59

60
      epsetAssign(&pTask->info.epSet, pEpSet);
60✔
61
      stDebug("s-task:0x%x (vgId:%d) self node epset is updated %s, old:%s", pTask->id.taskId, nodeId, buf, tmp);
60!
62
    } else {
63
      stDebug("s-task:0x%x (vgId:%d) not updated task epset, since epset identical, %s", pTask->id.taskId, nodeId, buf);
6!
64
    }
65
  }
66

67
  // check for the dispatch info and the upstream task info
68
  int32_t level = pTask->info.taskLevel;
217✔
69
  if (level == TASK_LEVEL__SOURCE) {
217✔
70
    code = streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet, pUpdated);
100✔
71
  } else if (level == TASK_LEVEL__AGG) {
117✔
72
    code = streamTaskUpdateUpstreamInfo(pTask, nodeId, pEpSet, pUpdated);
17✔
73
    code = streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet, pUpdated);
17✔
74
  } else {  // TASK_LEVEL__SINK
75
    code = streamTaskUpdateUpstreamInfo(pTask, nodeId, pEpSet, pUpdated);
100✔
76
  }
77

78
  return code;
217✔
79
}
80

81
static void freeItem(void* p) {
×
82
  SStreamContinueExecInfo* pInfo = p;
×
83
  rpcFreeCont(pInfo->msg.pCont);
×
84
}
×
85

86
static void freeUpstreamItem(void* p) {
87,112✔
87
  SStreamUpstreamEpInfo** pInfo = p;
87,112✔
88
  taosMemoryFree(*pInfo);
87,112!
89
}
87,119✔
90

91
static SStreamUpstreamEpInfo* createStreamTaskEpInfo(const SStreamTask* pTask) {
20,160✔
92
  SStreamUpstreamEpInfo* pEpInfo = taosMemoryMalloc(sizeof(SStreamUpstreamEpInfo));
20,160!
93
  if (pEpInfo == NULL) {
20,160!
94
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
95
    return NULL;
×
96
  }
97

98
  pEpInfo->childId = pTask->info.selfChildId;
20,160✔
99
  pEpInfo->epSet = pTask->info.epSet;
20,160✔
100
  pEpInfo->nodeId = pTask->info.nodeId;
20,160✔
101
  pEpInfo->taskId = pTask->id.taskId;
20,160✔
102
  pEpInfo->stage = -1;
20,160✔
103

104
  return pEpInfo;
20,160✔
105
}
106

107
int32_t tNewStreamTask(int64_t streamId, int8_t taskLevel, SEpSet* pEpset, bool fillHistory, int32_t trigger,
14,047✔
108
                       int64_t triggerParam, SArray* pTaskList, bool hasFillhistory, int8_t subtableWithoutMd5,
109
                       SStreamTask** p) {
110
  *p = NULL;
14,047✔
111

112
  SStreamTask* pTask = (SStreamTask*)taosMemoryCalloc(1, sizeof(SStreamTask));
14,047!
113
  if (pTask == NULL) {
14,047!
114
    stError("s-task:0x%" PRIx64 " failed malloc new stream task, size:%d, code:%s", streamId,
×
115
            (int32_t)sizeof(SStreamTask), tstrerror(terrno));
116
    return terrno;
×
117
  }
118

119
  pTask->ver = SSTREAM_TASK_VER;
14,047✔
120
  pTask->id.taskId = tGenIdPI32();
14,047✔
121
  pTask->id.streamId = streamId;
14,047✔
122

123
  pTask->info.taskLevel = taskLevel;
14,047✔
124
  pTask->info.fillHistory = fillHistory;
14,047✔
125
  pTask->info.trigger = trigger;
14,047✔
126
  pTask->info.delaySchedParam = triggerParam;
14,047✔
127
  pTask->subtableWithoutMd5 = subtableWithoutMd5;
14,047✔
128

129
  int32_t code = streamCreateStateMachine(pTask);
14,047✔
130
  if (pTask->status.pSM == NULL || code != TSDB_CODE_SUCCESS) {
14,047!
131
    taosMemoryFreeClear(pTask);
×
132
    return code;
×
133
  }
134

135
  char    buf[128] = {0};
14,047✔
136
  int32_t ret = snprintf(buf, tListLen(buf), "0x%" PRIx64 "-0x%x", pTask->id.streamId, pTask->id.taskId);
14,047✔
137
  if (ret < 0 || ret >= tListLen(buf)) {
14,047!
138
    stError("s-task:0x%x failed to set the taskIdstr, code: out of buffer", pTask->id.taskId);
×
139
    return TSDB_CODE_OUT_OF_BUFFER;
×
140
  }
141

142
  pTask->id.idStr = taosStrdup(buf);
14,047!
143
  if (pTask->id.idStr == NULL) {
14,047!
144
    stError("s-task:0x%x failed to build task id, code: out of memory", pTask->id.taskId);
×
145
    return terrno;
×
146
  }
147

148
  pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
14,047✔
149
  pTask->status.taskStatus = fillHistory ? TASK_STATUS__SCAN_HISTORY : TASK_STATUS__READY;
14,047✔
150
  pTask->inputq.status = TASK_INPUT_STATUS__NORMAL;
14,047✔
151
  pTask->outputq.status = TASK_OUTPUT_STATUS__NORMAL;
14,047✔
152

153
  pTask->taskCheckInfo.pList = taosArrayInit(4, sizeof(SDownstreamStatusInfo));
14,047✔
154
  code = taosThreadMutexInit(&pTask->taskCheckInfo.checkInfoLock, NULL);
14,047✔
155
  if (code) {
14,047!
156
    return code;
×
157
  }
158

159
  if (fillHistory && !hasFillhistory) {
14,047!
160
    stError("s-task:0x%x create task failed, due to inconsistent fill-history flag", pTask->id.taskId);
×
161
    return TSDB_CODE_INVALID_PARA;
×
162
  }
163

164
  epsetAssign(&(pTask->info.mnodeEpset), pEpset);
14,047✔
165

166
  code = addToTaskset(pTaskList, pTask);
14,047✔
167
  *p = pTask;
14,047✔
168

169
  return code;
14,047✔
170
}
171

172
int32_t tDecodeStreamTaskChkInfo(SDecoder* pDecoder, SCheckpointInfo* pChkpInfo) {
639✔
173
  int64_t skip64;
174
  int8_t  skip8;
175
  int32_t skip32;
176
  int16_t skip16;
177
  SEpSet  epSet;
178

179
  if (tStartDecode(pDecoder) < 0) return -1;
639!
180
  if (tDecodeI64(pDecoder, &pChkpInfo->msgVer) < 0) return -1;
1,278✔
181
  // if (ver <= SSTREAM_TASK_INCOMPATIBLE_VER) return -1;
182

183
  if (tDecodeI64(pDecoder, &skip64) < 0) return -1;
591!
184
  if (tDecodeI32(pDecoder, &skip32) < 0) return -1;
591!
185
  if (tDecodeI32(pDecoder, &skip32) < 0) return -1;
591!
186
  if (tDecodeI8(pDecoder, &skip8) < 0) return -1;
591!
187
  if (tDecodeI8(pDecoder, &skip8) < 0) return -1;
591!
188
  if (tDecodeI16(pDecoder, &skip16) < 0) return -1;
591!
189

190
  if (tDecodeI8(pDecoder, &skip8) < 0) return -1;
591!
191
  if (tDecodeI8(pDecoder, &skip8) < 0) return -1;
591!
192

193
  if (tDecodeI32(pDecoder, &skip32) < 0) return -1;
591!
194
  if (tDecodeI32(pDecoder, &skip32) < 0) return -1;
591!
195
  if (tDecodeSEpSet(pDecoder, &epSet) < 0) return -1;
591!
196
  if (tDecodeSEpSet(pDecoder, &epSet) < 0) return -1;
591!
197

198
  if (tDecodeI64(pDecoder, &pChkpInfo->checkpointId) < 0) return -1;
1,182!
199
  if (tDecodeI64(pDecoder, &pChkpInfo->checkpointVer) < 0) return -1;
1,182!
200

201
  tEndDecode(pDecoder);
591✔
202
  return 0;
591✔
203
}
204

205
int32_t tDecodeStreamTaskId(SDecoder* pDecoder, STaskId* pTaskId) {
12✔
206
  int64_t ver;
207
  if (tStartDecode(pDecoder) < 0) return -1;
12!
208
  if (tDecodeI64(pDecoder, &ver) < 0) return -1;
12!
209
  if (ver <= SSTREAM_TASK_INCOMPATIBLE_VER) return -1;
12!
210

211
  if (tDecodeI64(pDecoder, &pTaskId->streamId) < 0) return -1;
24!
212

213
  int32_t taskId = 0;
12✔
214
  if (tDecodeI32(pDecoder, &taskId) < 0) return -1;
12!
215

216
  pTaskId->taskId = taskId;
12✔
217
  tEndDecode(pDecoder);
12✔
218
  return 0;
12✔
219
}
220

221
void tFreeStreamTask(void* pParam) {
63,019✔
222
  char*        p = NULL;
63,019✔
223
  SStreamTask* pTask = pParam;
63,019✔
224
  int32_t      taskId = pTask->id.taskId;
63,019✔
225

226
  STaskExecStatisInfo* pStatis = &pTask->execInfo;
63,019✔
227

228
  ETaskStatus status1 = TASK_STATUS__UNINIT;
63,019✔
229
  streamMutexLock(&pTask->lock);
63,019✔
230
  if (pTask->status.pSM != NULL) {
63,049✔
231
    SStreamTaskState status = streamTaskGetStatus(pTask);
28,820✔
232
    p = status.name;
28,809✔
233
    status1 = status.state;
28,809✔
234
  }
235
  streamMutexUnlock(&pTask->lock);
63,038✔
236

237
  stDebug("start to free s-task:0x%x %p, state:%s, refId:%" PRId64, taskId, pTask, p, pTask->id.refId);
63,038✔
238

239
  SCheckpointInfo* pCkInfo = &pTask->chkInfo;
63,038✔
240
  stDebug("s-task:0x%x task exec summary: create:%" PRId64 ", init:%" PRId64 ", start:%" PRId64
63,038✔
241
          ", updateCount:%d latestUpdate:%" PRId64 ", latestCheckPoint:%" PRId64 ", ver:%" PRId64
242
          " nextProcessVer:%" PRId64 ", checkpointCount:%d",
243
          taskId, pStatis->created, pStatis->checkTs, pStatis->readyTs, pStatis->updateCount, pStatis->latestUpdateTs,
244
          pCkInfo->checkpointId, pCkInfo->checkpointVer, pCkInfo->nextProcessVer, pStatis->checkpoint);
245

246
  if (pTask->schedInfo.pDelayTimer != NULL) {
63,038✔
247
    streamTmrStop(pTask->schedInfo.pDelayTimer);
1,303✔
248
    pTask->schedInfo.pDelayTimer = NULL;
1,303✔
249
  }
250

251
  if (pTask->hTaskInfo.pTimer != NULL) {
63,038✔
252
    streamTmrStop(pTask->hTaskInfo.pTimer);
2,035✔
253
    pTask->hTaskInfo.pTimer = NULL;
2,035✔
254
  }
255

256
  if (pTask->msgInfo.pRetryTmr != NULL) {
63,038✔
257
    streamTmrStop(pTask->msgInfo.pRetryTmr);
5,528✔
258
    pTask->msgInfo.pRetryTmr = NULL;
5,527✔
259
  }
260

261
  if (pTask->inputq.queue) {
63,037✔
262
    streamQueueClose(pTask->inputq.queue, pTask->id.taskId);
14,730✔
263
    pTask->inputq.queue = NULL;
14,741✔
264
  }
265

266
  if (pTask->outputq.queue) {
63,048✔
267
    streamQueueClose(pTask->outputq.queue, pTask->id.taskId);
14,737✔
268
    pTask->outputq.queue = NULL;
14,741✔
269
  }
270

271
  if (pTask->exec.qmsg) {
63,052✔
272
    taosMemoryFree(pTask->exec.qmsg);
33,207!
273
  }
274

275
  if (pTask->exec.pExecutor) {
63,052✔
276
    qDestroyTask(pTask->exec.pExecutor);
7,477✔
277
    pTask->exec.pExecutor = NULL;
7,476✔
278
  }
279

280
  if (pTask->exec.pWalReader != NULL) {
63,051✔
281
    walCloseReader(pTask->exec.pWalReader);
7,413✔
282
    pTask->exec.pWalReader = NULL;
7,413✔
283
  }
284

285
  streamClearChkptReadyMsg(pTask->chkInfo.pActiveInfo);
63,051✔
286

287
  if (pTask->msgInfo.pData != NULL) {
63,045✔
288
    clearBufferedDispatchMsg(pTask);
31✔
289
  }
290

291
  if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) {
63,045✔
292
    tDeleteSchemaWrapper(pTask->outputInfo.tbSink.pSchemaWrapper);
30,950✔
293
    taosMemoryFree(pTask->outputInfo.tbSink.pTSchema);
30,952!
294
    tSimpleHashCleanup(pTask->outputInfo.tbSink.pTbInfo);
30,951✔
295
    tDeleteSchemaWrapper(pTask->outputInfo.tbSink.pTagSchema);
30,953✔
296
  } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
32,095✔
297
    taosArrayDestroy(pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos);
27,206✔
298
  }
299

300
  streamTaskCleanupCheckInfo(&pTask->taskCheckInfo);
63,044✔
301
  streamFreeTaskState(pTask, pTask->status.removeBackendFiles ? 1 : 0);
63,046✔
302

303
  if (pTask->pNameMap) {
63,042✔
304
    tSimpleHashCleanup(pTask->pNameMap);
2,388✔
305
  }
306

307
  streamDestroyStateMachine(pTask->status.pSM);
63,042✔
308
  pTask->status.pSM = NULL;
63,044✔
309

310
  streamTaskDestroyUpstreamInfo(&pTask->upstreamInfo);
63,044✔
311

312
  taosMemoryFree(pTask->outputInfo.pTokenBucket);
63,049!
313
  streamMutexDestroy(&pTask->lock);
63,048✔
314

315
  taosArrayDestroy(pTask->msgInfo.pSendInfo);
63,048✔
316
  pTask->msgInfo.pSendInfo = NULL;
63,047✔
317
  streamMutexDestroy(&pTask->msgInfo.lock);
63,047✔
318

319
  taosArrayDestroy(pTask->outputInfo.pNodeEpsetUpdateList);
63,046✔
320
  pTask->outputInfo.pNodeEpsetUpdateList = NULL;
63,046✔
321

322
  if (pTask->id.idStr != NULL) {
63,046✔
323
    taosMemoryFree((void*)pTask->id.idStr);
28,782!
324
  }
325

326
  streamTaskDestroyActiveChkptInfo(pTask->chkInfo.pActiveInfo);
63,050✔
327
  pTask->chkInfo.pActiveInfo = NULL;
63,048✔
328

329
  taosArrayDestroyP(pTask->notifyInfo.pNotifyAddrUrls, NULL);
63,048✔
330
  taosMemoryFreeClear(pTask->notifyInfo.streamName);
63,046!
331
  taosMemoryFreeClear(pTask->notifyInfo.stbFullName);
63,046!
332
  tDeleteSchemaWrapper(pTask->notifyInfo.pSchemaWrapper);
63,046!
333

334
  pTask->notifyEventStat = (STaskNotifyEventStat){0};
63,042✔
335

336
  taosMemoryFree(pTask);
63,042!
337
  stDebug("s-task:0x%x free task completed", taskId);
63,045✔
338
}
63,045✔
339

340
void streamFreeTaskState(SStreamTask* pTask, int8_t remove) {
63,041✔
341
  stDebug("s-task:0x%x start to free task state/backend", pTask->id.taskId);
63,041✔
342
  if (pTask->pState != NULL) {
63,042✔
343
    stDebug("s-task:0x%x start to free task state", pTask->id.taskId);
7,476✔
344
    streamStateClose(pTask->pState, remove);
7,476✔
345

346
    if (remove) taskDbSetClearFileFlag(pTask->pBackend);
7,477✔
347
    taskDbRemoveRef(pTask->pBackend);
7,477✔
348
    pTask->pBackend = NULL;
7,477✔
349
    pTask->pState = NULL;
7,477✔
350
  } else {
351
    stDebug("s-task:0x%x task state is NULL, may del backend:%s", pTask->id.taskId,
55,566✔
352
            pTask->backendPath ? pTask->backendPath : "NULL");
353
    if (remove) {
55,566✔
354
      if (pTask->backendPath != NULL) {
3,470!
355
        stDebug("s-task:0x%x task state is NULL, do del backend:%s", pTask->id.taskId, pTask->backendPath);
3,473✔
356
        taosRemoveDir(pTask->backendPath);
3,473✔
357
      }
358
    }
359
  }
360

361
  if (pTask->backendPath != NULL) {
63,039✔
362
    taosMemoryFree(pTask->backendPath);
14,736!
363
    pTask->backendPath = NULL;
14,739✔
364
  }
365
}
63,042✔
366

367
static void setInitialVersionInfo(SStreamTask* pTask, int64_t ver) {
14,725✔
368
  SCheckpointInfo* pChkInfo = &pTask->chkInfo;
14,725✔
369
  SDataRange*      pRange = &pTask->dataRange;
14,725✔
370

371
  // only set the version info for stream tasks without fill-history task
372
  if ((pTask->info.fillHistory == 0) && (!HAS_RELATED_FILLHISTORY_TASK(pTask))) {
14,725✔
373
    pChkInfo->checkpointVer = ver - 1;  // only update when generating checkpoint
4,795✔
374
    pChkInfo->processedVer = ver - 1;   // already processed version
4,795✔
375
    pChkInfo->nextProcessVer = ver;     // next processed version
4,795✔
376

377
    pRange->range.maxVer = ver;
4,795✔
378
    pRange->range.minVer = ver;
4,795✔
379
  } else {
380
    // the initial value of processedVer/nextProcessVer/checkpointVer for stream task with related fill-history task
381
    // is set at the mnode.
382
    if (pTask->info.fillHistory == 1) {
9,930✔
383
      pChkInfo->checkpointVer = pRange->range.maxVer;
5,027✔
384
      pChkInfo->processedVer = pRange->range.maxVer;
5,027✔
385
      pChkInfo->nextProcessVer = pRange->range.maxVer + 1;
5,027✔
386
    } else {
387
      pChkInfo->checkpointVer = pRange->range.minVer - 1;
4,903✔
388
      pChkInfo->processedVer = pRange->range.minVer - 1;
4,903✔
389
      pChkInfo->nextProcessVer = pRange->range.minVer;
4,903✔
390

391
      {  // for compatible purpose, remove it later
392
        if (pRange->range.minVer == 0) {
4,903✔
393
          pChkInfo->checkpointVer = 0;
2,474✔
394
          pChkInfo->processedVer = 0;
2,474✔
395
          pChkInfo->nextProcessVer = 1;
2,474✔
396
          stDebug("s-task:%s update the processedVer to 0 from -1 due to compatible purpose", pTask->id.idStr);
2,474✔
397
        }
398
      }
399
    }
400
  }
401
}
14,725✔
402

403
int32_t streamTaskSetBackendPath(SStreamTask* pTask) {
14,740✔
404
  int64_t streamId = 0;
14,740✔
405
  int32_t taskId = 0;
14,740✔
406

407
  if (pTask->info.fillHistory) {
14,740✔
408
    streamId = pTask->streamTaskId.streamId;
5,026✔
409
    taskId = pTask->streamTaskId.taskId;
5,026✔
410
  } else {
411
    streamId = pTask->id.streamId;
9,714✔
412
    taskId = pTask->id.taskId;
9,714✔
413
  }
414

415
  char    id[128] = {0};
14,740✔
416
  int32_t nBytes = snprintf(id, tListLen(id), "0x%" PRIx64 "-0x%x", streamId, taskId);
14,740✔
417
  if (nBytes < 0 || nBytes >= sizeof(id)) {
14,740!
418
    return TSDB_CODE_OUT_OF_BUFFER;
×
419
  }
420

421
  int32_t len = strlen(pTask->pMeta->path);
14,741✔
422
  pTask->backendPath = (char*)taosMemoryMalloc(len + nBytes + 2);
14,741!
423
  if (pTask->backendPath == NULL) {
14,740!
424
    return terrno;
×
425
  }
426

427
  int32_t code = snprintf(pTask->backendPath, len + nBytes + 2, "%s%s%s", pTask->pMeta->path, TD_DIRSEP, id);
14,740✔
428
  if (code < 0 || code >= len + nBytes + 2) {
14,740!
429
    stError("s-task:%s failed to set backend path:%s, code: out of buffer", pTask->id.idStr, pTask->backendPath);
2!
430
    return TSDB_CODE_OUT_OF_BUFFER;
×
431
  } else {
432
    stDebug("s-task:%s set backend path:%s", pTask->id.idStr, pTask->backendPath);
14,738✔
433
    return 0;
14,741✔
434
  }
435
}
436

437
int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, int64_t ver) {
14,729✔
438
  int32_t code = createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId, &pTask->id.idStr);
14,729✔
439
  if (code) {
14,731!
440
    stError("0x%x failed create stream task id str, code:%s", pTask->id.taskId, tstrerror(code));
×
441
    return code;
×
442
  }
443

444
  pTask->id.refId = 0;
14,731✔
445
  pTask->inputq.status = TASK_INPUT_STATUS__NORMAL;
14,731✔
446
  pTask->outputq.status = TASK_OUTPUT_STATUS__NORMAL;
14,731✔
447

448
  int32_t code1 = streamQueueOpen(512 << 10, &pTask->inputq.queue);
14,731✔
449
  int32_t code2 = streamQueueOpen(512 << 10, &pTask->outputq.queue);
14,724✔
450
  if (code1 || code2) {
14,742!
451
    stError("s-task:%s failed to prepare the input/output queue, initialize task failed", pTask->id.idStr);
×
452
    return TSDB_CODE_OUT_OF_MEMORY;
×
453
  }
454

455
  pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
14,742✔
456

457
  code = streamCreateStateMachine(pTask);
14,742✔
458
  if (pTask->status.pSM == NULL || code != TSDB_CODE_SUCCESS) {
14,729!
459
    stError("s-task:%s failed create state-machine for stream task, initialization failed, code:%s", pTask->id.idStr,
×
460
            tstrerror(code));
461
    return code;
×
462
  }
463

464
  pTask->execInfo.created = taosGetTimestampMs();
14,738✔
465
  setInitialVersionInfo(pTask, ver);
14,738✔
466

467
  pTask->pMeta = pMeta;
14,732✔
468
  pTask->pMsgCb = pMsgCb;
14,732✔
469
  pTask->msgInfo.pSendInfo = taosArrayInit(4, sizeof(SDispatchEntry));
14,732✔
470
  if (pTask->msgInfo.pSendInfo == NULL) {
14,738!
471
    stError("s-task:%s failed to create sendInfo struct for stream task, code:Out of memory", pTask->id.idStr);
×
472
    return terrno;
×
473
  }
474

475
  code = taosThreadMutexInit(&pTask->msgInfo.lock, NULL);
14,738✔
476
  if (code) {
14,734!
477
    stError("s-task:0x%x failed to init msgInfo mutex, code:%s", pTask->id.taskId, tstrerror(code));
×
478
    return code;
×
479
  }
480

481
  TdThreadMutexAttr attr = {0};
14,734✔
482
  code = taosThreadMutexAttrInit(&attr);
14,734✔
483
  if (code != 0) {
14,725!
484
    stError("s-task:%s initElapsed mutex attr failed, code:%s", pTask->id.idStr, tstrerror(code));
×
485
    return code;
×
486
  }
487

488
  code = taosThreadMutexAttrSetType(&attr, PTHREAD_MUTEX_RECURSIVE);
14,725✔
489
  if (code != 0) {
14,722!
490
    stError("s-task:%s set mutex attr recursive, code:%s", pTask->id.idStr, tstrerror(code));
×
491
    return code;
×
492
  }
493

494
  code = taosThreadMutexInit(&pTask->lock, &attr);
14,722✔
495
  if (code) {
14,730!
496
    return code;
×
497
  }
498

499
  code = taosThreadMutexAttrDestroy(&attr);
14,730✔
500
  if (code) {
14,718!
501
    return code;
×
502
  }
503

504
  streamTaskOpenAllUpstreamInput(pTask);
14,718✔
505

506
  STaskOutputInfo* pOutputInfo = &pTask->outputInfo;
14,714✔
507
  pOutputInfo->pTokenBucket = taosMemoryCalloc(1, sizeof(STokenBucket));
14,714!
508
  if (pOutputInfo->pTokenBucket == NULL) {
14,733!
509
    stError("s-task:%s failed to prepare the tokenBucket, code:%s", pTask->id.idStr, tstrerror(terrno));
×
510
    return terrno;
×
511
  }
512

513
  // 2MiB per second for sink task
514
  // 50 times sink operator per second
515
  code = streamTaskInitTokenBucket(pOutputInfo->pTokenBucket, 35, 35, tsSinkDataRate, pTask->id.idStr);
14,733✔
516
  if (code) {
14,733!
517
    return code;
×
518
  }
519

520
  pOutputInfo->pNodeEpsetUpdateList = taosArrayInit(4, sizeof(SDownstreamTaskEpset));
14,733✔
521
  if (pOutputInfo->pNodeEpsetUpdateList == NULL) {
14,741!
522
    stError("s-task:%s failed to prepare downstreamUpdateList, code:%s", pTask->id.idStr, tstrerror(terrno));
×
523
    return terrno;
×
524
  }
525

526
  pTask->taskCheckInfo.pList = taosArrayInit(4, sizeof(SDownstreamStatusInfo));
14,741✔
527
  if (pTask->taskCheckInfo.pList == NULL) {
14,737!
528
    stError("s-task:%s failed to prepare taskCheckInfo list, code:%s", pTask->id.idStr, tstrerror(terrno));
×
529
    return terrno;
×
530
  }
531

532
  if (pTask->chkInfo.pActiveInfo == NULL) {
14,737✔
533
    code = streamTaskCreateActiveChkptInfo(&pTask->chkInfo.pActiveInfo);
14,735✔
534
    if (code) {
14,740!
535
      stError("s-task:%s failed to create active checkpoint info, code:%s", pTask->id.idStr, tstrerror(code));
×
536
      return code;
×
537
    }
538
  }
539

540
  return streamTaskSetBackendPath(pTask);
14,742✔
541
}
542

543
int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask) {
122,315✔
544
  if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
122,315✔
545
    return 0;
6,788✔
546
  }
547

548
  int32_t type = pTask->outputInfo.type;
115,527✔
549
  if (type == TASK_OUTPUT__TABLE) {
115,527✔
550
    return 0;
267✔
551
  } else if (type == TASK_OUTPUT__FIXED_DISPATCH) {
115,260✔
552
    return 1;
11,354✔
553
  } else {
554
    SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
103,906✔
555
    return taosArrayGetSize(vgInfo);
103,906✔
556
  }
557
}
558

559
int32_t streamTaskGetNumOfUpstream(const SStreamTask* pTask) { return taosArrayGetSize(pTask->upstreamInfo.pList); }
19,609✔
560

561
int32_t streamTaskSetUpstreamInfo(SStreamTask* pTask, const SStreamTask* pUpstreamTask) {
20,160✔
562
  SStreamUpstreamEpInfo* pEpInfo = createStreamTaskEpInfo(pUpstreamTask);
20,160✔
563
  if (pEpInfo == NULL) {
20,160!
564
    return terrno;
×
565
  }
566

567
  if (pTask->upstreamInfo.pList == NULL) {
20,160✔
568
    pTask->upstreamInfo.pList = taosArrayInit(4, POINTER_BYTES);
6,976✔
569
  }
570

571
  void* p = taosArrayPush(pTask->upstreamInfo.pList, &pEpInfo);
20,160✔
572
  return (p == NULL) ? terrno : TSDB_CODE_SUCCESS;
20,160!
573
}
574

575
int32_t streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdated) {
117✔
576
  int32_t code = 0;
117✔
577
  char    buf[512] = {0};
117✔
578
  code = epsetToStr(pEpSet, buf, tListLen(buf));  // ignore error since it is only for log file.
117✔
579
  if (code != 0) {  // print error and continue
117!
580
    stError("%s failed to convert epset to str, code:%s", pTask->id.idStr, tstrerror(code));
×
581
    return code;
×
582
  }
583

584
  int32_t numOfUpstream = taosArrayGetSize(pTask->upstreamInfo.pList);
117✔
585
  for (int32_t i = 0; i < numOfUpstream; ++i) {
223✔
586
    SStreamUpstreamEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i);
140✔
587
    if (pInfo->nodeId == nodeId) {
140✔
588
      bool equal = isEpsetEqual(&pInfo->epSet, pEpSet);
34✔
589
      if (!equal) {
34✔
590
        *pUpdated = true;
24✔
591

592
        char tmp[512] = {0};
24✔
593
        code = epsetToStr(&pInfo->epSet, tmp, tListLen(tmp));
24✔
594
        if (code != 0) {  // print error and continue
24!
595
          stError("%s failed to convert epset to str, code:%s", pTask->id.idStr, tstrerror(code));
×
596
          return code;
×
597
        }
598

599
        epsetAssign(&pInfo->epSet, pEpSet);
24✔
600
        stDebug("s-task:0x%x update the upstreamInfo taskId:0x%x(nodeId:%d) newEpset:%s old:%s", pTask->id.taskId,
24!
601
                pInfo->taskId, nodeId, buf, tmp);
602
      } else {
603
        stDebug("s-task:0x%x not update upstreamInfo, since identical, task:0x%x(nodeId:%d) epset:%s", pTask->id.taskId,
10!
604
                pInfo->taskId, nodeId, buf);
605
      }
606

607
      break;
34✔
608
    }
609
  }
610

611
  return code;
117✔
612
}
613

614
void streamTaskDestroyUpstreamInfo(SUpstreamInfo* pUpstreamInfo) {
63,036✔
615
  if (pUpstreamInfo->pList != NULL) {
63,036✔
616
    taosArrayDestroyEx(pUpstreamInfo->pList, freeUpstreamItem);
55,930✔
617
    pUpstreamInfo->numOfClosed = 0;
55,937✔
618
    pUpstreamInfo->pList = NULL;
55,937✔
619
  }
620
}
63,043✔
621

622
void streamTaskSetFixedDownstreamInfo(SStreamTask* pTask, const SStreamTask* pDownstreamTask) {
950✔
623
  STaskDispatcherFixed* pDispatcher = &pTask->outputInfo.fixedDispatcher;
950✔
624
  pDispatcher->taskId = pDownstreamTask->id.taskId;
950✔
625
  pDispatcher->nodeId = pDownstreamTask->info.nodeId;
950✔
626
  pDispatcher->epSet = pDownstreamTask->info.epSet;
950✔
627

628
  pTask->outputInfo.type = TASK_OUTPUT__FIXED_DISPATCH;
950✔
629
  pTask->msgInfo.msgType = TDMT_STREAM_TASK_DISPATCH;
950✔
630
}
950✔
631

632
int32_t streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdated) {
117✔
633
  char    buf[512] = {0};
117✔
634
  int32_t code = epsetToStr(pEpSet, buf, tListLen(buf));  // ignore the error since only for log files.
117✔
635
  if (code != 0) {                                        // print error and continue
117!
636
    stError("%s failed to convert epset to str, code:%s", pTask->id.idStr, tstrerror(code));
×
637
    return code;
×
638
  }
639

640
  int32_t id = pTask->id.taskId;
117✔
641
  int8_t  type = pTask->outputInfo.type;
117✔
642

643
  if (type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
117✔
644
    SArray* pVgs = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
32✔
645

646
    for (int32_t i = 0; i < taosArrayGetSize(pVgs); i++) {
57✔
647
      SVgroupInfo* pVgInfo = taosArrayGet(pVgs, i);
55✔
648
      if (pVgInfo == NULL) {
55!
649
        continue;
×
650
      }
651

652
      if (pVgInfo->vgId == nodeId) {
55✔
653
        bool isEqual = isEpsetEqual(&pVgInfo->epSet, pEpSet);
30✔
654
        if (!isEqual) {
30✔
655
          *pUpdated = true;
24✔
656

657
          char tmp[512] = {0};
24✔
658
          code = epsetToStr(&pVgInfo->epSet, tmp, tListLen(tmp));
24✔
659
          if (code != 0) {  // print error and continue
24!
660
            stError("%s failed to convert epset to str, code:%s", pTask->id.idStr, tstrerror(code));
×
661
            return code;
×
662
          }
663

664
          epsetAssign(&pVgInfo->epSet, pEpSet);
24✔
665
          stDebug("s-task:0x%x update dispatch info, task:0x%x(nodeId:%d) newEpset:%s old:%s", id, pVgInfo->taskId,
24!
666
                  nodeId, buf, tmp);
667
        } else {
668
          stDebug("s-task:0x%x not update dispatch info, since identical, task:0x%x(nodeId:%d) epset:%s", id,
6!
669
                  pVgInfo->taskId, nodeId, buf);
670
        }
671
        break;
30✔
672
      }
673
    }
674
  } else if (type == TASK_OUTPUT__FIXED_DISPATCH) {
85!
675
    STaskDispatcherFixed* pDispatcher = &pTask->outputInfo.fixedDispatcher;
85✔
676
    if (pDispatcher->nodeId == nodeId) {
85✔
677
      bool equal = isEpsetEqual(&pDispatcher->epSet, pEpSet);
4✔
678
      if (!equal) {
4!
679
        *pUpdated = true;
×
680

681
        char tmp[512] = {0};
×
682
        code = epsetToStr(&pDispatcher->epSet, tmp, tListLen(tmp));
×
683
        if (code != 0) {  // print error and continue
×
684
          stError("%s failed to convert epset to str, code:%s", pTask->id.idStr, tstrerror(code));
×
685
          return code;
×
686
        }
687

688
        epsetAssign(&pDispatcher->epSet, pEpSet);
×
689
        stDebug("s-task:0x%x update dispatch info, task:0x%x(nodeId:%d) newEpset:%s old:%s", id, pDispatcher->taskId,
×
690
                nodeId, buf, tmp);
691
      } else {
692
        stDebug("s-task:0x%x not update dispatch info, since identical, task:0x%x(nodeId:%d) epset:%s", id,
4!
693
                pDispatcher->taskId, nodeId, buf);
694
      }
695
    }
696
  }
697

698
  return code;
117✔
699
}
700

701
int32_t streamTaskStop(SStreamTask* pTask) {
2,909✔
702
  int32_t     vgId = pTask->pMeta->vgId;
2,909✔
703
  int64_t     st = taosGetTimestampMs();
2,909✔
704
  const char* id = pTask->id.idStr;
2,909✔
705

706
  int32_t code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_STOP);
2,909✔
707
  if (code) {
2,909!
708
    stError("failed to handle STOP event, s-task:%s, code:%s", id, tstrerror(code));
×
709
    return code;
×
710
  }
711

712
  if (pTask->info.taskLevel != TASK_LEVEL__SINK && pTask->exec.pExecutor != NULL) {
2,909✔
713
    code = qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS);
1,525✔
714
    if (code != TSDB_CODE_SUCCESS) {
1,525!
715
      stError("s-task:%s failed to kill task related query handle, code:%s", id, tstrerror(code));
×
716
    }
717
  }
718

719
  while (!streamTaskIsIdle(pTask)) {
2,909!
720
    stDebug("s-task:%s level:%d wait for task to be idle and then close, check again in 100ms", id,
×
721
            pTask->info.taskLevel);
722
    taosMsleep(100);
×
723
  }
724

725
  int64_t el = taosGetTimestampMs() - st;
2,909✔
726
  stDebug("vgId:%d s-task:%s is closed in %" PRId64 " ms", vgId, id, el);
2,909✔
727
  return code;
2,909✔
728
}
729

730
bool streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList) {
99✔
731
  STaskExecStatisInfo* p = &pTask->execInfo;
99✔
732

733
  int32_t numOfNodes = taosArrayGetSize(pNodeList);
99✔
734
  int64_t prevTs = p->latestUpdateTs;
99✔
735

736
  p->latestUpdateTs = taosGetTimestampMs();
99✔
737
  p->updateCount += 1;
99✔
738
  stDebug("s-task:0x%x update task nodeEp epset, updatedNodes:%d, updateCount:%d, prevTs:%" PRId64, pTask->id.taskId,
99!
739
          numOfNodes, p->updateCount, prevTs);
740

741
  bool updated = false;
99✔
742
  for (int32_t i = 0; i < numOfNodes; ++i) {
316✔
743
    SNodeUpdateInfo* pInfo = taosArrayGet(pNodeList, i);
217✔
744
    if (pInfo == NULL) {
217!
745
      continue;
×
746
    }
747

748
    int32_t code = doUpdateTaskEpset(pTask, pInfo->nodeId, &pInfo->newEp, &updated);
217✔
749
    if (code) {
217!
750
      stError("s-task:0x%x failed to update the task nodeEp epset, code:%s", pTask->id.taskId, tstrerror(code));
×
751
    }
752
  }
753

754
  return updated;
99✔
755
}
756

757
void streamTaskResetUpstreamStageInfo(SStreamTask* pTask) {
14,738✔
758
  if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
14,738✔
759
    return;
7,413✔
760
  }
761

762
  int32_t size = taosArrayGetSize(pTask->upstreamInfo.pList);
7,325✔
763
  for (int32_t i = 0; i < size; ++i) {
28,225✔
764
    SStreamUpstreamEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i);
20,897✔
765
    pInfo->stage = -1;
20,899✔
766
  }
767

768
  stDebug("s-task:%s reset all upstream tasks stage info", pTask->id.idStr);
7,328✔
769
}
770

771
void streamTaskOpenAllUpstreamInput(SStreamTask* pTask) {
24,639✔
772
  int32_t num = taosArrayGetSize(pTask->upstreamInfo.pList);
24,639✔
773
  if (num == 0) {
24,648✔
774
    return;
12,343✔
775
  }
776

777
  for (int32_t i = 0; i < num; ++i) {
47,265✔
778
    SStreamUpstreamEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i);
34,967✔
779
    pInfo->dataAllowed = true;
34,960✔
780
  }
781

782
  pTask->upstreamInfo.numOfClosed = 0;
12,298✔
783
  stDebug("s-task:%s opening up inputQ for %d upstream tasks", pTask->id.idStr, num);
12,298✔
784
}
785

786
void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId) {
8,291✔
787
  SStreamUpstreamEpInfo* pInfo = NULL;
8,291✔
788
  streamTaskGetUpstreamTaskEpInfo(pTask, taskId, &pInfo);
8,291✔
789

790
  if ((pInfo != NULL) && pInfo->dataAllowed) {
8,287!
791
    pInfo->dataAllowed = false;
8,287✔
792
    if (pTask->upstreamInfo.numOfClosed < streamTaskGetNumOfUpstream(pTask)) {
8,287!
793
      int32_t t = atomic_add_fetch_32(&pTask->upstreamInfo.numOfClosed, 1);
8,287✔
794
    } else {
795
      stError("s-task:%s not inc closed input, since they have been all closed already", pTask->id.idStr);
×
796
    }
797
  }
798
}
8,292✔
799

800
void streamTaskOpenUpstreamInput(SStreamTask* pTask, int32_t taskId) {
1✔
801
  SStreamUpstreamEpInfo* pInfo = NULL;
1✔
802
  streamTaskGetUpstreamTaskEpInfo(pTask, taskId, &pInfo);
1✔
803

804
  if (pInfo != NULL && (!pInfo->dataAllowed)) {
1!
805
    int32_t t = atomic_sub_fetch_32(&pTask->upstreamInfo.numOfClosed, 1);
1✔
806
    stDebug("s-task:%s open inputQ for upstream:0x%x, remain closed:%d", pTask->id.idStr, taskId, t);
1!
807
    pInfo->dataAllowed = true;
1✔
808
  }
809
}
1✔
810

811
bool streamTaskIsAllUpstreamClosed(SStreamTask* pTask) {
×
812
  return pTask->upstreamInfo.numOfClosed == taosArrayGetSize(pTask->upstreamInfo.pList);
×
813
}
814

815
bool streamTaskSetSchedStatusWait(SStreamTask* pTask) {
109,189✔
816
  bool ret = false;
109,189✔
817

818
  streamMutexLock(&pTask->lock);
109,189✔
819
  if (pTask->status.schedStatus == TASK_SCHED_STATUS__INACTIVE) {
109,196✔
820
    pTask->status.schedStatus = TASK_SCHED_STATUS__WAITING;
71,288✔
821
    ret = true;
71,288✔
822
  }
823

824
  streamMutexUnlock(&pTask->lock);
109,196✔
825
  return ret;
109,195✔
826
}
827

828
int8_t streamTaskSetSchedStatusActive(SStreamTask* pTask) {
70,155✔
829
  streamMutexLock(&pTask->lock);
70,155✔
830
  int8_t status = pTask->status.schedStatus;
70,217✔
831
  if (status == TASK_SCHED_STATUS__WAITING) {
70,217✔
832
    pTask->status.schedStatus = TASK_SCHED_STATUS__ACTIVE;
70,187✔
833
  }
834
  streamMutexUnlock(&pTask->lock);
70,217✔
835

836
  return status;
70,221✔
837
}
838

839
int8_t streamTaskSetSchedStatusInactive(SStreamTask* pTask) {
1,040✔
840
  streamMutexLock(&pTask->lock);
1,040✔
841
  int8_t status = pTask->status.schedStatus;
1,040✔
842
  pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
1,040✔
843
  streamMutexUnlock(&pTask->lock);
1,040✔
844

845
  return status;
1,040✔
846
}
847

848
int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, int32_t resetRelHalt) {
7,017✔
849
  int32_t      code = 0;
7,017✔
850
  SStreamMeta* pMeta = pTask->pMeta;
7,017✔
851
  SStreamTask* pStreamTask = NULL;
7,017✔
852

853
  if (pTask->info.fillHistory == 0) {
7,017!
854
    return code;
7,019✔
855
  }
856

857
  code = streamMetaAcquireTaskUnsafe(pMeta, &pTask->streamTaskId, &pStreamTask);
×
858
  if (code == 0) {
7!
859
    stDebug("s-task:%s clear the related stream task:0x%x attr to fill-history task", pTask->id.idStr,
×
860
            (int32_t)pTask->streamTaskId.taskId);
861

862
    streamMutexLock(&(pStreamTask->lock));
×
863
    CLEAR_RELATED_FILLHISTORY_TASK(pStreamTask);
×
864

865
    if (resetRelHalt) {
×
866
      stDebug("s-task:0x%" PRIx64 " set the persistent status attr to be ready, prev:%s, status in sm:%s",
×
867
              pTask->streamTaskId.taskId, streamTaskGetStatusStr(pStreamTask->status.taskStatus),
868
              streamTaskGetStatus(pStreamTask).name);
869
      pStreamTask->status.taskStatus = TASK_STATUS__READY;
×
870
    }
871

872
    code = streamMetaSaveTask(pMeta, pStreamTask);
×
873
    streamMutexUnlock(&(pStreamTask->lock));
×
874

875
    streamMetaReleaseTask(pMeta, pStreamTask);
×
876
  }
877

878
  return code;
7✔
879
}
880

881
int32_t streamBuildAndSendDropTaskMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskId* pTaskId, int64_t resetRelHalt) {
7✔
882
  SVDropStreamTaskReq* pReq = rpcMallocCont(sizeof(SVDropStreamTaskReq));
7✔
883
  if (pReq == NULL) {
7!
884
    return terrno;
×
885
  }
886

887
  pReq->head.vgId = vgId;
7✔
888
  pReq->taskId = pTaskId->taskId;
7✔
889
  pReq->streamId = pTaskId->streamId;
7✔
890
  pReq->resetRelHalt = resetRelHalt;
7✔
891

892
  SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_DROP, .pCont = pReq, .contLen = sizeof(SVDropStreamTaskReq)};
7✔
893
  int32_t code = tmsgPutToQueue(pMsgCb, WRITE_QUEUE, &msg);
7✔
894
  if (code != TSDB_CODE_SUCCESS) {
7!
895
    stError("vgId:%d failed to send drop task:0x%x msg, code:%s", vgId, pTaskId->taskId, tstrerror(code));
×
896
  } else {
897
    stDebug("vgId:%d build and send drop task:0x%x msg", vgId, pTaskId->taskId);
7!
898
  }
899

900
  return code;
7✔
901
}
902

903
int32_t streamSendChkptReportMsg(SStreamTask* pTask, SCheckpointInfo* pCheckpointInfo, int8_t dropRelHTask) {
6,167✔
904
  int32_t                code = 0;
6,167✔
905
  int32_t                tlen = 0;
6,167✔
906
  int32_t                vgId = pTask->pMeta->vgId;
6,167✔
907
  const char*            id = pTask->id.idStr;
6,167✔
908
  SActiveCheckpointInfo* pActive = pCheckpointInfo->pActiveInfo;
6,167✔
909

910
  SCheckpointReport req = {.streamId = pTask->id.streamId,
6,167✔
911
                           .taskId = pTask->id.taskId,
6,167✔
912
                           .nodeId = vgId,
913
                           .dropHTask = dropRelHTask,
914
                           .transId = pActive->transId,
6,167✔
915
                           .checkpointId = pActive->activeId,
6,167✔
916
                           .checkpointVer = pCheckpointInfo->processedVer,
6,167✔
917
                           .checkpointTs = pCheckpointInfo->startTs};
6,167✔
918

919
  tEncodeSize(tEncodeStreamTaskChkptReport, &req, tlen, code);
6,167!
920
  if (code < 0) {
6,166!
921
    stError("s-task:%s vgId:%d encode stream task checkpoint-report failed, code:%s", id, vgId, tstrerror(code));
×
922
    return -1;
×
923
  }
924

925
  void* buf = rpcMallocCont(tlen);
6,166✔
926
  if (buf == NULL) {
6,165!
927
    stError("s-task:%s vgId:%d encode stream task checkpoint-report msg failed, code:%s", id, vgId,
×
928
            tstrerror(TSDB_CODE_OUT_OF_MEMORY));
929
    return -1;
×
930
  }
931

932
  SEncoder encoder;
933
  tEncoderInit(&encoder, buf, tlen);
6,165✔
934
  if ((code = tEncodeStreamTaskChkptReport(&encoder, &req)) < 0) {
6,165!
935
    rpcFreeCont(buf);
×
936
    tEncoderClear(&encoder);
×
937
    stError("s-task:%s vgId:%d encode stream task checkpoint-report msg failed, code:%s", id, vgId, tstrerror(code));
×
938
    return -1;
×
939
  }
940
  tEncoderClear(&encoder);
6,165✔
941

942
  SRpcMsg msg = {0};
6,168✔
943
  initRpcMsg(&msg, TDMT_MND_STREAM_CHKPT_REPORT, buf, tlen);
6,168✔
944
  stDebug("s-task:%s vgId:%d build and send task checkpoint-report to mnode", id, vgId);
6,165✔
945

946
  return tmsgSendReq(&pTask->info.mnodeEpset, &msg);
6,165✔
947
}
948

949
STaskId streamTaskGetTaskId(const SStreamTask* pTask) {
79,964✔
950
  STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
79,964✔
951
  return id;
79,964✔
952
}
953

954
void streamTaskInitForLaunchHTask(SHistoryTaskInfo* pInfo) {
2,036✔
955
  pInfo->waitInterval = LAUNCH_HTASK_INTERVAL;
2,036✔
956
  pInfo->tickCount = ceil(LAUNCH_HTASK_INTERVAL / WAIT_FOR_MINIMAL_INTERVAL);
2,036✔
957
  pInfo->retryTimes = 0;
2,036✔
958
}
2,036✔
959

960
void streamTaskSetRetryInfoForLaunch(SHistoryTaskInfo* pInfo) {
2,030✔
961
  pInfo->waitInterval *= RETRY_LAUNCH_INTERVAL_INC_RATE;
2,030✔
962
  pInfo->tickCount = ceil(pInfo->waitInterval / WAIT_FOR_MINIMAL_INTERVAL);
2,030✔
963
  pInfo->retryTimes += 1;
2,030✔
964
}
2,030✔
965

966
void streamTaskStatusInit(STaskStatusEntry* pEntry, const SStreamTask* pTask) {
9,317✔
967
  pEntry->id.streamId = pTask->id.streamId;
9,317✔
968
  pEntry->id.taskId = pTask->id.taskId;
9,317✔
969
  pEntry->stage = -1;
9,317✔
970
  pEntry->nodeId = pTask->info.nodeId;
9,317✔
971
  pEntry->status = TASK_STATUS__STOP;
9,317✔
972
}
9,317✔
973

974
void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc) {
62,146✔
975
  pDst->stage = pSrc->stage;
62,146✔
976
  pDst->inputQUsed = pSrc->inputQUsed;
62,146✔
977
  pDst->inputRate = pSrc->inputRate;
62,146✔
978
  pDst->procsTotal = pSrc->procsTotal;
62,146✔
979
  pDst->procsThroughput = pSrc->procsThroughput;
62,146✔
980
  pDst->outputTotal = pSrc->outputTotal;
62,146✔
981
  pDst->outputThroughput = pSrc->outputThroughput;
62,146✔
982
  pDst->processedVer = pSrc->processedVer;
62,146✔
983
  pDst->verRange = pSrc->verRange;
62,146✔
984
  pDst->sinkQuota = pSrc->sinkQuota;
62,146✔
985
  pDst->sinkDataSize = pSrc->sinkDataSize;
62,146✔
986
  pDst->checkpointInfo = pSrc->checkpointInfo;
62,146✔
987
  pDst->startCheckpointId = pSrc->startCheckpointId;
62,146✔
988
  pDst->startCheckpointVer = pSrc->startCheckpointVer;
62,146✔
989
  pDst->status = pSrc->status;
62,146✔
990

991
  pDst->startTime = pSrc->startTime;
62,146✔
992
  pDst->hTaskId = pSrc->hTaskId;
62,146✔
993
  pDst->notifyEventStat = pSrc->notifyEventStat;
62,146✔
994
}
62,146✔
995

996
STaskStatusEntry streamTaskGetStatusEntry(SStreamTask* pTask) {
63,478✔
997
  SStreamMeta*         pMeta = pTask->pMeta;
63,478✔
998
  STaskExecStatisInfo* pExecInfo = &pTask->execInfo;
63,478✔
999

1000
  STaskStatusEntry entry = {
190,434✔
1001
      .id = streamTaskGetTaskId(pTask),
63,478✔
1002
      .status = streamTaskGetStatus(pTask).state,
63,478✔
1003
      .nodeId = pMeta->vgId,
63,478✔
1004
      .stage = pMeta->stage,
63,478✔
1005

1006
      .inputQUsed = SIZE_IN_MiB(streamQueueGetItemSize(pTask->inputq.queue)),
63,478✔
1007
      .startTime = pExecInfo->readyTs,
63,478✔
1008
      .checkpointInfo.latestId = pTask->chkInfo.checkpointId,
63,478✔
1009
      .checkpointInfo.latestVer = pTask->chkInfo.checkpointVer,
63,478✔
1010
      .checkpointInfo.latestTime = pTask->chkInfo.checkpointTime,
63,478✔
1011
      .checkpointInfo.latestSize = 0,
1012
      .checkpointInfo.remoteBackup = 0,
1013
      .checkpointInfo.consensusChkptId = 0,
1014
      .checkpointInfo.consensusTs = 0,
1015
      .hTaskId = pTask->hTaskInfo.id.taskId,
63,478✔
1016
      .procsTotal = SIZE_IN_MiB(pExecInfo->inputDataSize),
63,478✔
1017
      .outputTotal = SIZE_IN_MiB(pExecInfo->outputDataSize),
63,478✔
1018
      .procsThroughput = SIZE_IN_KiB(pExecInfo->procsThroughput),
63,478✔
1019
      .outputThroughput = SIZE_IN_KiB(pExecInfo->outputThroughput),
63,478✔
1020
      .startCheckpointId = pExecInfo->startCheckpointId,
63,478✔
1021
      .startCheckpointVer = pExecInfo->startCheckpointVer,
63,478✔
1022
      .notifyEventStat = pTask->notifyEventStat,
1023
  };
1024
  return entry;
63,478✔
1025
}
1026

1027
static int32_t taskPauseCallback(SStreamTask* pTask, void* param) {
1,412✔
1028
  SStreamMeta* pMeta = pTask->pMeta;
1,412✔
1029
  int32_t      code = 0;
1,412✔
1030

1031
  int32_t num = atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1);
1,412✔
1032
  stInfo("vgId:%d s-task:%s pause stream task. paused task num:%d", pMeta->vgId, pTask->id.idStr, num);
1,414!
1033

1034
  // in case of fill-history task, stop the tsdb file scan operation.
1035
  if (pTask->info.fillHistory == 1) {
1,414✔
1036
    void* pExecutor = pTask->exec.pExecutor;
72✔
1037
    code = qKillTask(pExecutor, TSDB_CODE_SUCCESS);
72✔
1038
  }
1039

1040
  stDebug("vgId:%d s-task:%s set pause flag and pause task", pMeta->vgId, pTask->id.idStr);
1,414✔
1041
  return code;
1,414✔
1042
}
1043

1044
void streamTaskPause(SStreamTask* pTask) {
1,485✔
1045
  int32_t code = streamTaskHandleEventAsync(pTask->status.pSM, TASK_EVENT_PAUSE, taskPauseCallback, NULL);
1,485✔
1046
  if (code) {
1,486!
1047
    stError("s-task:%s failed handle pause event async, code:%s", pTask->id.idStr, tstrerror(code));
×
1048
  }
1049
}
1,486✔
1050

1051
void streamTaskResume(SStreamTask* pTask) {
2,702✔
1052
  SStreamTaskState prevState = streamTaskGetStatus(pTask);
2,702✔
1053

1054
  SStreamMeta* pMeta = pTask->pMeta;
2,701✔
1055
  int32_t      code = streamTaskRestoreStatus(pTask);
2,701✔
1056
  if (code == TSDB_CODE_SUCCESS) {
2,701✔
1057
    char*   pNew = streamTaskGetStatus(pTask).name;
1,402✔
1058
    int32_t num = atomic_sub_fetch_32(&pMeta->numOfPausedTasks, 1);
1,401✔
1059
    stInfo("s-task:%s status:%s resume from %s, paused task(s):%d", pTask->id.idStr, pNew, prevState.name, num);
1,403!
1060
  } else {
1061
    stInfo("s-task:%s status:%s no need to resume, paused task(s):%d", pTask->id.idStr, prevState.name,
1,299!
1062
           pMeta->numOfPausedTasks);
1063
  }
1064
}
2,703✔
1065

1066
bool streamTaskIsSinkTask(const SStreamTask* pTask) { return pTask->info.taskLevel == TASK_LEVEL__SINK; }
79,223✔
1067

1068
// this task must success
1069
int32_t streamTaskSendCheckpointReq(SStreamTask* pTask) {
4,417✔
1070
  int32_t     code;
1071
  int32_t     tlen = 0;
4,417✔
1072
  int32_t     vgId = pTask->pMeta->vgId;
4,417✔
1073
  const char* id = pTask->id.idStr;
4,417✔
1074

1075
  SStreamTaskCheckpointReq req = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId, .nodeId = vgId};
4,417✔
1076
  tEncodeSize(tEncodeStreamTaskCheckpointReq, &req, tlen, code);
4,417!
1077
  if (code < 0) {
4,416!
1078
    stError("s-task:%s vgId:%d encode stream task req checkpoint failed, code:%s", id, vgId, tstrerror(code));
×
1079
    return TSDB_CODE_INVALID_MSG;
×
1080
  }
1081

1082
  void* buf = rpcMallocCont(tlen);
4,416✔
1083
  if (buf == NULL) {
4,424!
1084
    stError("s-task:%s vgId:%d encode stream task req checkpoint msg failed, code:Out of memory", id, vgId);
×
1085
    return terrno;
×
1086
  }
1087

1088
  SEncoder encoder;
1089
  tEncoderInit(&encoder, buf, tlen);
4,424✔
1090
  if ((code = tEncodeStreamTaskCheckpointReq(&encoder, &req)) < 0) {
4,425!
1091
    rpcFreeCont(buf);
×
1092
    tEncoderClear(&encoder);
×
1093
    stError("s-task:%s vgId:%d encode stream task req checkpoint msg failed, code:%s", id, vgId, tstrerror(code));
×
1094
    return code;
×
1095
  }
1096

1097
  tEncoderClear(&encoder);
4,419✔
1098

1099
  SRpcMsg msg = {0};
4,422✔
1100
  initRpcMsg(&msg, TDMT_MND_STREAM_REQ_CHKPT, buf, tlen);
4,422✔
1101
  stDebug("s-task:%s vgId:%d build and send task checkpoint req", id, vgId);
4,421✔
1102

1103
  return tmsgSendReq(&pTask->info.mnodeEpset, &msg);
4,421✔
1104
}
1105

1106
void streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId, SStreamUpstreamEpInfo** pEpInfo) {
99,745✔
1107
  *pEpInfo = NULL;
99,745✔
1108

1109
  int32_t num = taosArrayGetSize(pTask->upstreamInfo.pList);
99,745✔
1110
  for (int32_t i = 0; i < num; ++i) {
199,862✔
1111
    SStreamUpstreamEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i);
199,856✔
1112
    if (pInfo == NULL) {
199,844!
1113
      return;
×
1114
    }
1115

1116
    if (pInfo->taskId == taskId) {
199,844✔
1117
      *pEpInfo = pInfo;
99,736✔
1118
      return;
99,736✔
1119
    }
1120
  }
1121

1122
  stError("s-task:%s failed to find upstream task:0x%x", pTask->id.idStr, taskId);
6!
1123
}
1124

1125
SEpSet* streamTaskGetDownstreamEpInfo(SStreamTask* pTask, int32_t taskId) {
×
1126
  if (pTask->info.taskLevel == TASK_OUTPUT__FIXED_DISPATCH) {
×
1127
    if (pTask->outputInfo.fixedDispatcher.taskId == taskId) {
×
1128
      return &pTask->outputInfo.fixedDispatcher.epSet;
×
1129
    }
1130
  } else if (pTask->info.taskLevel == TASK_OUTPUT__SHUFFLE_DISPATCH) {
×
1131
    SArray* pList = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
×
1132
    for (int32_t i = 0; i < taosArrayGetSize(pList); ++i) {
×
1133
      SVgroupInfo* pVgInfo = taosArrayGet(pList, i);
×
1134
      if (pVgInfo == NULL) {
×
1135
        continue;
×
1136
      }
1137

1138
      if (pVgInfo->taskId == taskId) {
×
1139
        return &pVgInfo->epSet;
×
1140
      }
1141
    }
1142
  }
1143

1144
  return NULL;
×
1145
}
1146

1147
int32_t createStreamTaskIdStr(int64_t streamId, int32_t taskId, const char** pId) {
14,729✔
1148
  char buf[128] = {0};
14,729✔
1149
  int32_t code = snprintf(buf, tListLen(buf),"0x%" PRIx64 "-0x%x", streamId, taskId);
14,729✔
1150
  if (code < 0 || code >= tListLen(buf)) {
14,729!
1151
    return TSDB_CODE_OUT_OF_BUFFER;
×
1152
  }
1153

1154
  *pId = taosStrdup(buf);
14,737!
1155

1156
  if (*pId == NULL) {
14,730!
1157
    return terrno;
×
1158
  } else {
1159
    return TSDB_CODE_SUCCESS;
14,730✔
1160
  }
1161
}
1162

1163
static int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq) {
543✔
1164
  int32_t           code;
1165
  SStreamDataBlock* pData;
1166

1167
  code = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SStreamDataBlock), (void**)&pData);
543✔
1168
  if (code) {
544!
1169
    stError("s-task:%s failed to allocated retrieve-block", pTask->id.idStr);
×
1170
    return terrno = code;
×
1171
  }
1172

1173
  pData->type = STREAM_INPUT__DATA_RETRIEVE;
544✔
1174
  pData->srcVgId = 0;
544✔
1175

1176
  code = streamRetrieveReqToData(pReq, pData, pTask->id.idStr);
544✔
1177
  if (code != TSDB_CODE_SUCCESS) {
544!
1178
    stError("s-task:%s failed to convert retrieve-data to block, code:%s", pTask->id.idStr, tstrerror(code));
×
1179
    taosFreeQitem(pData);
×
1180
    return code;
×
1181
  }
1182

1183
  code = streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pData);
544✔
1184
  if (code != TSDB_CODE_SUCCESS) {
544!
1185
    stError("s-task:%s failed to put retrieve-block into inputQ, inputQ is full, discard the retrieve msg",
×
1186
            pTask->id.idStr);
1187
  }
1188

1189
  return code;
544✔
1190
}
1191

1192
int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq) {
544✔
1193
  int32_t code = streamTaskEnqueueRetrieve(pTask, pReq);
544✔
1194
  if (code != 0) {
544!
1195
    return code;
×
1196
  }
1197
  return streamTrySchedExec(pTask);
544✔
1198
}
1199

1200
void streamTaskSetRemoveBackendFiles(SStreamTask* pTask) { pTask->status.removeBackendFiles = true; }
7,023✔
1201

1202
void streamTaskGetActiveCheckpointInfo(const SStreamTask* pTask, int32_t* pTransId, int64_t* pCheckpointId) {
×
1203
  if (pTransId != NULL) {
×
1204
    *pTransId = pTask->chkInfo.pActiveInfo->transId;
×
1205
  }
1206

1207
  if (pCheckpointId != NULL) {
×
1208
    *pCheckpointId = pTask->chkInfo.pActiveInfo->activeId;
×
1209
  }
1210
}
×
1211

1212
int32_t streamTaskSetActiveCheckpointInfo(SStreamTask* pTask, int64_t activeCheckpointId) {
28✔
1213
  pTask->chkInfo.pActiveInfo->activeId = activeCheckpointId;
28✔
1214
  return TSDB_CODE_SUCCESS;
28✔
1215
}
1216

1217
void streamTaskSetFailedChkptInfo(SStreamTask* pTask, int32_t transId, int64_t checkpointId) {
×
1218
  pTask->chkInfo.pActiveInfo->transId = transId;
×
1219
  pTask->chkInfo.pActiveInfo->activeId = checkpointId;
×
1220
  pTask->chkInfo.pActiveInfo->failedId = checkpointId;
×
1221
  stDebug("s-task:%s set failed checkpointId:%"PRId64, pTask->id.idStr, checkpointId);
×
1222
}
×
1223

1224
int32_t streamTaskCreateActiveChkptInfo(SActiveCheckpointInfo** pRes) {
14,776✔
1225
  SActiveCheckpointInfo* pInfo = taosMemoryCalloc(1, sizeof(SActiveCheckpointInfo));
14,776!
1226
  if (pInfo == NULL) {
14,778!
1227
    return terrno;
×
1228
  }
1229

1230
  int32_t code = taosThreadMutexInit(&pInfo->lock, NULL);
14,778✔
1231
  if (code != TSDB_CODE_SUCCESS) {
14,783!
1232
    return code;
×
1233
  }
1234

1235
  pInfo->pDispatchTriggerList = taosArrayInit(4, sizeof(STaskTriggerSendInfo));
14,783✔
1236
  pInfo->pReadyMsgList = taosArrayInit(4, sizeof(STaskCheckpointReadyInfo));
14,782✔
1237
  pInfo->pCheckpointReadyRecvList = taosArrayInit(4, sizeof(STaskDownstreamReadyInfo));
14,778✔
1238

1239
  *pRes = pInfo;
14,784✔
1240
  return code;
14,784✔
1241
}
1242

1243
void streamTaskDestroyActiveChkptInfo(SActiveCheckpointInfo* pInfo) {
63,041✔
1244
  if (pInfo == NULL) {
63,041✔
1245
    return;
48,266✔
1246
  }
1247

1248
  streamMutexDestroy(&pInfo->lock);
14,775✔
1249
  taosArrayDestroy(pInfo->pDispatchTriggerList);
14,782✔
1250
  pInfo->pDispatchTriggerList = NULL;
14,782✔
1251
  taosArrayDestroy(pInfo->pReadyMsgList);
14,782✔
1252
  pInfo->pReadyMsgList = NULL;
14,780✔
1253
  taosArrayDestroy(pInfo->pCheckpointReadyRecvList);
14,780✔
1254
  pInfo->pCheckpointReadyRecvList = NULL;
14,782✔
1255

1256
  SStreamTmrInfo* pTriggerTmr = &pInfo->chkptTriggerMsgTmr;
14,782✔
1257
  if (pTriggerTmr->tmrHandle != NULL) {
14,782✔
1258
    streamTmrStop(pTriggerTmr->tmrHandle);
2,194✔
1259
    pTriggerTmr->tmrHandle = NULL;
2,195✔
1260
  }
1261

1262
  SStreamTmrInfo* pReadyTmr = &pInfo->chkptReadyMsgTmr;
14,783✔
1263
  if (pReadyTmr->tmrHandle != NULL) {
14,783✔
1264
    streamTmrStop(pReadyTmr->tmrHandle);
2,156✔
1265
    pReadyTmr->tmrHandle = NULL;
2,156✔
1266
  }
1267

1268
  taosMemoryFree(pInfo);
14,783!
1269
}
1270

1271
// NOTE: clear the checkpoint id, and keep the failed id
1272
// failedId for a task will increase as the checkpoint I.D. increases.
1273
void streamTaskClearActiveInfo(SActiveCheckpointInfo* pInfo) {
5,418✔
1274
  pInfo->activeId = 0;
5,418✔
1275
  pInfo->transId = 0;
5,418✔
1276
  pInfo->allUpstreamTriggerRecv = 0;
5,418✔
1277
  pInfo->dispatchTrigger = false;
5,418✔
1278

1279
  taosArrayClear(pInfo->pDispatchTriggerList);
5,418✔
1280
  taosArrayClear(pInfo->pCheckpointReadyRecvList);
5,414✔
1281
}
5,409✔
1282

1283
const char* streamTaskGetExecType(int32_t type) {
99,331✔
1284
  switch (type) {
99,331!
1285
    case STREAM_EXEC_T_EXTRACT_WAL_DATA:
34,440✔
1286
      return "scan-wal-file";
34,440✔
1287
    case STREAM_EXEC_T_START_ALL_TASKS:
7,731✔
1288
      return "start-all-tasks";
7,731✔
1289
    case STREAM_EXEC_T_START_ONE_TASK:
5,857✔
1290
      return "start-one-task";
5,857✔
1291
    case STREAM_EXEC_T_RESTART_ALL_TASKS:
23✔
1292
      return "restart-all-tasks";
23✔
1293
    case STREAM_EXEC_T_STOP_ALL_TASKS:
4,608✔
1294
      return "stop-all-tasks";
4,608✔
1295
    case STREAM_EXEC_T_RESUME_TASK:
7,355✔
1296
      return "resume-task-from-idle";
7,355✔
1297
    case STREAM_EXEC_T_ADD_FAILED_TASK:
×
1298
      return "record-start-failed-task";
×
1299
    case 0:
39,405✔
1300
      return "exec-all-tasks";
39,405✔
1301
    default:
×
1302
      return "invalid-exec-type";
×
1303
  }
1304
}
1305

1306
int32_t streamTaskAllocRefId(SStreamTask* pTask, int64_t** pRefId) {
40,126✔
1307
  *pRefId = taosMemoryMalloc(sizeof(int64_t));
40,126!
1308
  if (*pRefId != NULL) {
40,134!
1309
    **pRefId = pTask->id.refId;
40,134✔
1310
    int32_t code = metaRefMgtAdd(pTask->pMeta->vgId, *pRefId);
40,134✔
1311
    if (code != 0) {
40,145!
1312
      stError("s-task:%s failed to add refId:%" PRId64 " into refId-mgmt, code:%s", pTask->id.idStr, pTask->id.refId,
×
1313
              tstrerror(code));
1314
    }
1315
    return code;
40,145✔
1316
  } else {
1317
    stError("s-task:%s failed to alloc new ref id, code:%s", pTask->id.idStr, tstrerror(terrno));
×
1318
    return terrno;
×
1319
  }
1320
}
1321

1322
void streamTaskFreeRefId(int64_t* pRefId) {
37,754✔
1323
  if (pRefId == NULL) {
37,754✔
1324
    return;
2,720✔
1325
  }
1326

1327
  metaRefMgtRemove(pRefId);
35,034✔
1328
}
1329

1330
static int32_t tEncodeStreamNotifyInfo(SEncoder* pEncoder, const SNotifyInfo* info) {
141,002✔
1331
  int32_t code = TSDB_CODE_SUCCESS;
141,002✔
1332
  int32_t lino = 0;
141,002✔
1333

1334
  QUERY_CHECK_NULL(pEncoder, code, lino, _exit, TSDB_CODE_INVALID_PARA);
141,002!
1335
  QUERY_CHECK_NULL(info, code, lino, _exit, TSDB_CODE_INVALID_PARA);
141,002!
1336

1337
  int32_t addrSize = taosArrayGetSize(info->pNotifyAddrUrls);
141,002✔
1338
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, addrSize));
141,012✔
1339
  for (int32_t i = 0; i < addrSize; ++i) {
140,990!
1340
    const char* url = taosArrayGetP(info->pNotifyAddrUrls, i);
×
1341
    TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, url));
×
1342
  }
1343
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, info->notifyEventTypes));
281,980!
1344
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, info->notifyErrorHandle));
281,980!
1345
  if (addrSize > 0) {
140,990!
1346
    TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, info->streamName));
×
1347
    TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, info->stbFullName));
×
1348
    TAOS_CHECK_EXIT(tEncodeSSchemaWrapper(pEncoder, info->pSchemaWrapper));
×
1349
  }
1350

1351
_exit:
140,990✔
1352
  if (code != TSDB_CODE_SUCCESS) {
141,012✔
1353
    stError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
22!
1354
  }
1355
  return code;
141,022✔
1356
}
1357

1358
static int32_t tDecodeStreamNotifyInfo(SDecoder* pDecoder, SNotifyInfo* info) {
48,949✔
1359
  int32_t code = TSDB_CODE_SUCCESS;
48,949✔
1360
  int32_t lino = 0;
48,949✔
1361

1362
  QUERY_CHECK_NULL(pDecoder, code, lino, _exit, TSDB_CODE_INVALID_PARA);
48,949!
1363
  QUERY_CHECK_NULL(info, code, lino, _exit, TSDB_CODE_INVALID_PARA);
48,949!
1364

1365
  int32_t addrSize = 0;
48,949✔
1366
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &addrSize));
48,959!
1367
  info->pNotifyAddrUrls = taosArrayInit(addrSize, POINTER_BYTES);
48,959✔
1368
  QUERY_CHECK_NULL(info->pNotifyAddrUrls, code, lino, _exit, terrno);
48,960!
1369
  for (int32_t i = 0; i < addrSize; ++i) {
48,960!
1370
    char *url = NULL;
×
1371
    TAOS_CHECK_EXIT(tDecodeCStr(pDecoder, &url));
×
1372
    url = taosStrndup(url, TSDB_STREAM_NOTIFY_URL_LEN);
×
1373
    QUERY_CHECK_NULL(url, code, lino, _exit, terrno);
×
1374
    if (taosArrayPush(info->pNotifyAddrUrls, &url) == NULL) {
×
1375
      taosMemoryFree(url);
×
1376
      TAOS_CHECK_EXIT(terrno);
×
1377
    }
1378
  }
1379
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &info->notifyEventTypes));
97,911!
1380
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &info->notifyErrorHandle));
97,903!
1381
  if (addrSize > 0) {
48,952!
1382
    char* name = NULL;
×
1383
    TAOS_CHECK_EXIT(tDecodeCStr(pDecoder, &name));
×
1384
    info->streamName = taosStrndup(name, TSDB_STREAM_FNAME_LEN + 1);
×
1385
    QUERY_CHECK_NULL(info->streamName, code, lino, _exit, terrno);
×
1386
    TAOS_CHECK_EXIT(tDecodeCStr(pDecoder, &name));
×
1387
    info->stbFullName = taosStrndup(name, TSDB_STREAM_FNAME_LEN + 1);
×
1388
    QUERY_CHECK_NULL(info->stbFullName, code, lino, _exit, terrno);
×
1389
    info->pSchemaWrapper = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
×
1390
    if (info->pSchemaWrapper == NULL) {
×
1391
      TAOS_CHECK_EXIT(terrno);
×
1392
    }
1393
    TAOS_CHECK_EXIT(tDecodeSSchemaWrapper(pDecoder, info->pSchemaWrapper));
×
1394
  }
1395

1396
_exit:
48,952✔
1397
  if (code != TSDB_CODE_SUCCESS) {
48,952!
1398
    stError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1399
  }
1400
  return code;
48,952✔
1401
}
1402

1403
int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
141,026✔
1404
  int32_t code = 0;
141,026✔
1405
  int32_t lino;
1406

1407
  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
141,026!
1408
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->ver));
282,072!
1409
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->id.streamId));
282,072!
1410
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->id.taskId));
282,072!
1411
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->info.trigger));
282,072!
1412
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->info.taskLevel));
282,072!
1413
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->outputInfo.type));
282,072!
1414
  TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pTask->msgInfo.msgType));
282,072!
1415

1416
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->status.taskStatus));
282,072!
1417
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->status.schedStatus));
282,072!
1418

1419
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->info.selfChildId));
282,072!
1420
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->info.nodeId));
282,072!
1421
  TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pTask->info.epSet));
141,036!
1422
  TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pTask->info.mnodeEpset));
141,038!
1423

1424
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->chkInfo.checkpointId));
282,082!
1425
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->chkInfo.checkpointVer));
282,082!
1426
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->info.fillHistory));
282,082!
1427

1428
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->hTaskInfo.id.streamId));
282,082!
1429
  int32_t taskId = pTask->hTaskInfo.id.taskId;
141,041✔
1430
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, taskId));
141,041!
1431

1432
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->streamTaskId.streamId));
282,082!
1433
  taskId = pTask->streamTaskId.taskId;
141,041✔
1434
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, taskId));
141,041!
1435

1436
  TAOS_CHECK_EXIT(tEncodeU64(pEncoder, pTask->dataRange.range.minVer));
282,082!
1437
  TAOS_CHECK_EXIT(tEncodeU64(pEncoder, pTask->dataRange.range.maxVer));
282,082!
1438
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->dataRange.window.skey));
282,082!
1439
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->dataRange.window.ekey));
282,082!
1440

1441
  int32_t epSz = taosArrayGetSize(pTask->upstreamInfo.pList);
141,041✔
1442
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, epSz));
141,039!
1443
  for (int32_t i = 0; i < epSz; i++) {
335,053✔
1444
    SStreamUpstreamEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i);
194,031✔
1445
    TAOS_CHECK_EXIT(tEncodeStreamEpInfo(pEncoder, pInfo));
194,015!
1446
  }
1447

1448
  if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
141,022✔
1449
    TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pTask->exec.qmsg));
149,574!
1450
  }
1451

1452
  if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) {
141,022✔
1453
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->outputInfo.tbSink.stbUid));
138,784!
1454
    TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pTask->outputInfo.tbSink.stbFullName));
138,784!
1455
    TAOS_CHECK_EXIT(tEncodeSSchemaWrapper(pEncoder, pTask->outputInfo.tbSink.pSchemaWrapper));
138,784!
1456
  } else if (pTask->outputInfo.type == TASK_OUTPUT__SMA) {
71,630✔
1457
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->outputInfo.smaSink.smaId));
700!
1458
  } else if (pTask->outputInfo.type == TASK_OUTPUT__FETCH) {
71,280!
1459
    TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->outputInfo.fetchSink.reserved));
×
1460
  } else if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
71,280✔
1461
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->outputInfo.fixedDispatcher.taskId));
22,070!
1462
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->outputInfo.fixedDispatcher.nodeId));
22,070!
1463
    TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pTask->outputInfo.fixedDispatcher.epSet));
11,035!
1464
  } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
60,245✔
1465
    TAOS_CHECK_EXIT(tSerializeSUseDbRspImp(pEncoder, &pTask->outputInfo.shuffleDispatcher.dbInfo));
60,211!
1466
    TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pTask->outputInfo.shuffleDispatcher.stbFullName));
120,420!
1467
  }
1468
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->info.delaySchedParam));
282,038!
1469
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->subtableWithoutMd5));
282,038!
1470
  TAOS_CHECK_EXIT(tEncodeCStrWithLen(pEncoder, pTask->reserve, sizeof(pTask->reserve) - 1));
282,038!
1471

1472
  if (pTask->ver >= SSTREAM_TASK_ADD_NOTIFY_VER) {
141,019!
1473
    TAOS_CHECK_EXIT(tEncodeStreamNotifyInfo(pEncoder, &pTask->notifyInfo));
141,020✔
1474
  }
1475

1476
  tEndEncode(pEncoder);
140,995✔
1477
_exit:
141,043✔
1478
  return code;
141,043✔
1479
}
1480

1481
int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
48,963✔
1482
  int32_t taskId = 0;
48,963✔
1483
  int32_t code = 0;
48,963✔
1484
  int32_t lino;
1485

1486
  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
48,963!
1487
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->ver));
97,939✔
1488
  if (pTask->ver <= SSTREAM_TASK_INCOMPATIBLE_VER || pTask->ver > SSTREAM_TASK_VER) {
48,961!
1489
    TAOS_CHECK_EXIT(TSDB_CODE_INVALID_MSG);
×
1490
  }
1491

1492
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->id.streamId));
97,914!
1493
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->id.taskId));
97,908!
1494
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->info.trigger));
97,911!
1495
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->info.taskLevel));
97,912!
1496
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->outputInfo.type));
97,908!
1497
  TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pTask->msgInfo.msgType));
97,909!
1498

1499
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->status.taskStatus));
97,913!
1500
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->status.schedStatus));
97,912!
1501

1502
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->info.selfChildId));
97,908!
1503
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->info.nodeId));
97,907!
1504
  TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pTask->info.epSet));
48,955!
1505
  TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pTask->info.mnodeEpset));
48,954!
1506

1507
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->chkInfo.checkpointId));
97,910!
1508
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->chkInfo.checkpointVer));
97,910!
1509
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->info.fillHistory));
97,909!
1510

1511
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->hTaskInfo.id.streamId));
97,906!
1512
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &taskId));
48,946!
1513
  pTask->hTaskInfo.id.taskId = taskId;
48,946✔
1514

1515
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->streamTaskId.streamId));
97,896!
1516
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &taskId));
48,950!
1517
  pTask->streamTaskId.taskId = taskId;
48,950✔
1518

1519
  TAOS_CHECK_EXIT(tDecodeU64(pDecoder, (uint64_t*)&pTask->dataRange.range.minVer));
97,900!
1520
  TAOS_CHECK_EXIT(tDecodeU64(pDecoder, (uint64_t*)&pTask->dataRange.range.maxVer));
97,901!
1521
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->dataRange.window.skey));
97,901!
1522
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->dataRange.window.ekey));
97,901!
1523

1524
  int32_t epSz = -1;
48,951✔
1525
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &epSz) < 0);
48,951!
1526

1527
  if ((pTask->upstreamInfo.pList = taosArrayInit(epSz, POINTER_BYTES)) == NULL) {
48,951!
1528
    TAOS_CHECK_EXIT(terrno);
×
1529
  }
1530
  for (int32_t i = 0; i < epSz; i++) {
115,905✔
1531
    SStreamUpstreamEpInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamUpstreamEpInfo));
66,948!
1532
    if (pInfo == NULL) {
66,950!
1533
      TAOS_CHECK_EXIT(terrno);
×
1534
    }
1535
    if ((code = tDecodeStreamEpInfo(pDecoder, pInfo)) < 0) {
66,950!
1536
      taosMemoryFreeClear(pInfo);
×
1537
      goto _exit;
×
1538
    }
1539
    if (taosArrayPush(pTask->upstreamInfo.pList, &pInfo) == NULL) {
133,898!
1540
      TAOS_CHECK_EXIT(terrno);
×
1541
    }
1542
  }
1543

1544
  if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
48,957✔
1545
    TAOS_CHECK_EXIT(tDecodeCStrAlloc(pDecoder, &pTask->exec.qmsg));
51,606!
1546
  }
1547

1548
  if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) {
48,957✔
1549
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->outputInfo.tbSink.stbUid));
48,150!
1550
    TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pTask->outputInfo.tbSink.stbFullName));
24,076!
1551
    pTask->outputInfo.tbSink.pSchemaWrapper = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
24,070!
1552
    if (pTask->outputInfo.tbSink.pSchemaWrapper == NULL) {
24,074!
1553
      TAOS_CHECK_EXIT(terrno);
×
1554
    }
1555
    TAOS_CHECK_EXIT(tDecodeSSchemaWrapper(pDecoder, pTask->outputInfo.tbSink.pSchemaWrapper));
48,135!
1556
  } else if (pTask->outputInfo.type == TASK_OUTPUT__SMA) {
24,883✔
1557
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->outputInfo.smaSink.smaId));
252!
1558
  } else if (pTask->outputInfo.type == TASK_OUTPUT__FETCH) {
24,757!
1559
    TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->outputInfo.fetchSink.reserved));
×
1560
  } else if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
24,757✔
1561
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->outputInfo.fixedDispatcher.taskId));
7,416!
1562
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->outputInfo.fixedDispatcher.nodeId));
7,416!
1563
    TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pTask->outputInfo.fixedDispatcher.epSet));
3,708!
1564
  } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
21,049✔
1565
    TAOS_CHECK_EXIT(tDeserializeSUseDbRspImp(pDecoder, &pTask->outputInfo.shuffleDispatcher.dbInfo));
21,046!
1566
    TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pTask->outputInfo.shuffleDispatcher.stbFullName));
21,045!
1567
  }
1568
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->info.delaySchedParam));
97,897!
1569
  if (pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER) {
48,953!
1570
    TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->subtableWithoutMd5));
97,907!
1571
  }
1572
  TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pTask->reserve));
48,952!
1573

1574
  if (pTask->ver >= SSTREAM_TASK_ADD_NOTIFY_VER) {
48,955!
1575
    TAOS_CHECK_EXIT(tDecodeStreamNotifyInfo(pDecoder, &pTask->notifyInfo));
48,959!
1576
  }
1577

1578
  tEndDecode(pDecoder);
48,951✔
1579

1580
_exit:
48,960✔
1581
  return code;
48,960✔
1582
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc