• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3594

24 Jan 2025 08:57AM UTC coverage: 63.086% (-0.2%) from 63.239%
#3594

push

travis-ci

web-flow
Merge pull request #29638 from taosdata/docs/TS-5846-3.0

enh: TDengine modify taosBenchmark new query rule cases and add doc

140232 of 285630 branches covered (49.1%)

Branch coverage included in aggregate %.

218398 of 282844 relevant lines covered (77.22%)

18911829.37 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

68.84
/source/libs/stream/src/streamTask.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executor.h"
17
#include "osDir.h"
18
#include "osMemory.h"
19
#include "streamInt.h"
20
#include "streamsm.h"
21
#include "tmisce.h"
22
#include "tstream.h"
23
#include "ttimer.h"
24
#include "wal.h"
25
#include "streamMsg.h"
26

27
static void streamTaskDestroyUpstreamInfo(SUpstreamInfo* pUpstreamInfo);
28
static int32_t streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdated);
29
static int32_t streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdate);
30
static void streamTaskDestroyActiveChkptInfo(SActiveCheckpointInfo* pInfo);
31

32
static int32_t addToTaskset(SArray* pArray, SStreamTask* pTask) {
13,883✔
33
  int32_t childId = taosArrayGetSize(pArray);
13,883✔
34
  pTask->info.selfChildId = childId;
13,883✔
35
  void* p = taosArrayPush(pArray, &pTask);
13,883✔
36
  return (p == NULL) ? terrno : TSDB_CODE_SUCCESS;
13,883!
37
}
38

39
static int32_t doUpdateTaskEpset(SStreamTask* pTask, int32_t nodeId, SEpSet* pEpSet, bool* pUpdated) {
587✔
40
  int32_t code = 0;
587✔
41
  char    buf[512] = {0};
587✔
42

43
  if (pTask->info.nodeId == nodeId) {  // execution task should be moved away
587✔
44
    bool isEqual = isEpsetEqual(&pTask->info.epSet, pEpSet);
186✔
45
    code = epsetToStr(pEpSet, buf, tListLen(buf));
186✔
46
    if (code) { // print error and continue
186!
47
      stError("%s failed to convert epset to str, code:%s", pTask->id.idStr, tstrerror(code));
×
48
      return code;
×
49
    }
50

51
    if (!isEqual) {
186✔
52
      (*pUpdated) = true;
134✔
53
      char tmp[512] = {0};
134✔
54
      code = epsetToStr(&pTask->info.epSet, tmp, tListLen(tmp));  // only for log file, ignore errors
134✔
55
      if (code) { // print error and continue
134!
56
        stError("%s failed to convert epset to str, code:%s", pTask->id.idStr, tstrerror(code));
×
57
        return code;
×
58
      }
59

60
      epsetAssign(&pTask->info.epSet, pEpSet);
134✔
61
      stDebug("s-task:0x%x (vgId:%d) self node epset is updated %s, old:%s", pTask->id.taskId, nodeId, buf, tmp);
134✔
62
    } else {
63
      stDebug("s-task:0x%x (vgId:%d) not updated task epset, since epset identical, %s", pTask->id.taskId, nodeId, buf);
52!
64
    }
65
  }
66

67
  // check for the dispatch info and the upstream task info
68
  int32_t level = pTask->info.taskLevel;
587✔
69
  if (level == TASK_LEVEL__SOURCE) {
587✔
70
    code = streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet, pUpdated);
282✔
71
  } else if (level == TASK_LEVEL__AGG) {
305✔
72
    code = streamTaskUpdateUpstreamInfo(pTask, nodeId, pEpSet, pUpdated);
23✔
73
    code = streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet, pUpdated);
23✔
74
  } else {  // TASK_LEVEL__SINK
75
    code = streamTaskUpdateUpstreamInfo(pTask, nodeId, pEpSet, pUpdated);
282✔
76
  }
77

78
  return code;
587✔
79
}
80

81
static void freeItem(void* p) {
×
82
  SStreamContinueExecInfo* pInfo = p;
×
83
  rpcFreeCont(pInfo->msg.pCont);
×
84
}
×
85

86
static void freeUpstreamItem(void* p) {
86,179✔
87
  SStreamUpstreamEpInfo** pInfo = p;
86,179✔
88
  taosMemoryFree(*pInfo);
86,179!
89
}
86,184✔
90

91
static SStreamUpstreamEpInfo* createStreamTaskEpInfo(const SStreamTask* pTask) {
19,932✔
92
  SStreamUpstreamEpInfo* pEpInfo = taosMemoryMalloc(sizeof(SStreamUpstreamEpInfo));
19,932!
93
  if (pEpInfo == NULL) {
19,932!
94
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
95
    return NULL;
×
96
  }
97

98
  pEpInfo->childId = pTask->info.selfChildId;
19,932✔
99
  pEpInfo->epSet = pTask->info.epSet;
19,932✔
100
  pEpInfo->nodeId = pTask->info.nodeId;
19,932✔
101
  pEpInfo->taskId = pTask->id.taskId;
19,932✔
102
  pEpInfo->stage = -1;
19,932✔
103

104
  return pEpInfo;
19,932✔
105
}
106

107
int32_t tNewStreamTask(int64_t streamId, int8_t taskLevel, SEpSet* pEpset, bool fillHistory, int32_t trigger,
13,883✔
108
                       int64_t triggerParam, SArray* pTaskList, bool hasFillhistory, int8_t subtableWithoutMd5,
109
                       SStreamTask** p) {
110
  *p = NULL;
13,883✔
111

112
  SStreamTask* pTask = (SStreamTask*)taosMemoryCalloc(1, sizeof(SStreamTask));
13,883!
113
  if (pTask == NULL) {
13,883!
114
    stError("s-task:0x%" PRIx64 " failed malloc new stream task, size:%d, code:%s", streamId,
×
115
            (int32_t)sizeof(SStreamTask), tstrerror(terrno));
116
    return terrno;
×
117
  }
118

119
  pTask->ver = SSTREAM_TASK_VER;
13,883✔
120
  pTask->id.taskId = tGenIdPI32();
13,883✔
121
  pTask->id.streamId = streamId;
13,883✔
122

123
  pTask->info.taskLevel = taskLevel;
13,883✔
124
  pTask->info.fillHistory = fillHistory;
13,883✔
125
  pTask->info.trigger = trigger;
13,883✔
126
  pTask->info.delaySchedParam = triggerParam;
13,883✔
127
  pTask->subtableWithoutMd5 = subtableWithoutMd5;
13,883✔
128

129
  int32_t code = streamCreateStateMachine(pTask);
13,883✔
130
  if (pTask->status.pSM == NULL || code != TSDB_CODE_SUCCESS) {
13,883!
131
    taosMemoryFreeClear(pTask);
×
132
    return code;
×
133
  }
134

135
  char    buf[128] = {0};
13,883✔
136
  int32_t ret = snprintf(buf, tListLen(buf), "0x%" PRIx64 "-0x%x", pTask->id.streamId, pTask->id.taskId);
13,883✔
137
  if (ret < 0 || ret >= tListLen(buf)) {
13,883!
138
    stError("s-task:0x%x failed to set the taskIdstr, code: out of buffer", pTask->id.taskId);
×
139
    return TSDB_CODE_OUT_OF_BUFFER;
×
140
  }
141

142
  pTask->id.idStr = taosStrdup(buf);
13,883!
143
  if (pTask->id.idStr == NULL) {
13,883!
144
    stError("s-task:0x%x failed to build task id, code: out of memory", pTask->id.taskId);
×
145
    return terrno;
×
146
  }
147

148
  pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
13,883✔
149
  pTask->status.taskStatus = fillHistory ? TASK_STATUS__SCAN_HISTORY : TASK_STATUS__READY;
13,883✔
150
  pTask->inputq.status = TASK_INPUT_STATUS__NORMAL;
13,883✔
151
  pTask->outputq.status = TASK_OUTPUT_STATUS__NORMAL;
13,883✔
152

153
  pTask->taskCheckInfo.pList = taosArrayInit(4, sizeof(SDownstreamStatusInfo));
13,883✔
154
  code = taosThreadMutexInit(&pTask->taskCheckInfo.checkInfoLock, NULL);
13,883✔
155
  if (code) {
13,883!
156
    return code;
×
157
  }
158

159
  if (fillHistory && !hasFillhistory) {
13,883!
160
    stError("s-task:0x%x create task failed, due to inconsistent fill-history flag", pTask->id.taskId);
×
161
    return TSDB_CODE_INVALID_PARA;
×
162
  }
163

164
  epsetAssign(&(pTask->info.mnodeEpset), pEpset);
13,883✔
165

166
  code = addToTaskset(pTaskList, pTask);
13,883✔
167
  *p = pTask;
13,883✔
168

169
  return code;
13,883✔
170
}
171

172
int32_t tDecodeStreamTaskChkInfo(SDecoder* pDecoder, SCheckpointInfo* pChkpInfo) {
855✔
173
  int64_t skip64;
174
  int8_t  skip8;
175
  int32_t skip32;
176
  int16_t skip16;
177
  SEpSet  epSet;
178

179
  if (tStartDecode(pDecoder) < 0) return -1;
855!
180
  if (tDecodeI64(pDecoder, &pChkpInfo->msgVer) < 0) return -1;
1,710✔
181
  // if (ver <= SSTREAM_TASK_INCOMPATIBLE_VER) return -1;
182

183
  if (tDecodeI64(pDecoder, &skip64) < 0) return -1;
807!
184
  if (tDecodeI32(pDecoder, &skip32) < 0) return -1;
807!
185
  if (tDecodeI32(pDecoder, &skip32) < 0) return -1;
807!
186
  if (tDecodeI8(pDecoder, &skip8) < 0) return -1;
807!
187
  if (tDecodeI8(pDecoder, &skip8) < 0) return -1;
807!
188
  if (tDecodeI16(pDecoder, &skip16) < 0) return -1;
807!
189

190
  if (tDecodeI8(pDecoder, &skip8) < 0) return -1;
807!
191
  if (tDecodeI8(pDecoder, &skip8) < 0) return -1;
807!
192

193
  if (tDecodeI32(pDecoder, &skip32) < 0) return -1;
807!
194
  if (tDecodeI32(pDecoder, &skip32) < 0) return -1;
807!
195
  if (tDecodeSEpSet(pDecoder, &epSet) < 0) return -1;
807!
196
  if (tDecodeSEpSet(pDecoder, &epSet) < 0) return -1;
807!
197

198
  if (tDecodeI64(pDecoder, &pChkpInfo->checkpointId) < 0) return -1;
1,614!
199
  if (tDecodeI64(pDecoder, &pChkpInfo->checkpointVer) < 0) return -1;
1,614!
200

201
  tEndDecode(pDecoder);
807✔
202
  return 0;
807✔
203
}
204

205
int32_t tDecodeStreamTaskId(SDecoder* pDecoder, STaskId* pTaskId) {
56✔
206
  int64_t ver;
207
  if (tStartDecode(pDecoder) < 0) return -1;
56!
208
  if (tDecodeI64(pDecoder, &ver) < 0) return -1;
56!
209
  if (ver <= SSTREAM_TASK_INCOMPATIBLE_VER) return -1;
56!
210

211
  if (tDecodeI64(pDecoder, &pTaskId->streamId) < 0) return -1;
112!
212

213
  int32_t taskId = 0;
56✔
214
  if (tDecodeI32(pDecoder, &taskId) < 0) return -1;
56!
215

216
  pTaskId->taskId = taskId;
56✔
217
  tEndDecode(pDecoder);
56✔
218
  return 0;
56✔
219
}
220

221
void tFreeStreamTask(void* pParam) {
62,323✔
222
  char*        p = NULL;
62,323✔
223
  SStreamTask* pTask = pParam;
62,323✔
224
  int32_t      taskId = pTask->id.taskId;
62,323✔
225

226
  STaskExecStatisInfo* pStatis = &pTask->execInfo;
62,323✔
227

228
  ETaskStatus status1 = TASK_STATUS__UNINIT;
62,323✔
229
  streamMutexLock(&pTask->lock);
62,323✔
230
  if (pTask->status.pSM != NULL) {
62,347✔
231
    SStreamTaskState status = streamTaskGetStatus(pTask);
28,563✔
232
    p = status.name;
28,548✔
233
    status1 = status.state;
28,548✔
234
  }
235
  streamMutexUnlock(&pTask->lock);
62,332✔
236

237
  stDebug("start to free s-task:0x%x %p, state:%s, refId:%" PRId64, taskId, pTask, p, pTask->id.refId);
62,340✔
238

239
  SCheckpointInfo* pCkInfo = &pTask->chkInfo;
62,340✔
240
  stDebug("s-task:0x%x task exec summary: create:%" PRId64 ", init:%" PRId64 ", start:%" PRId64
62,340✔
241
          ", updateCount:%d latestUpdate:%" PRId64 ", latestCheckPoint:%" PRId64 ", ver:%" PRId64
242
          " nextProcessVer:%" PRId64 ", checkpointCount:%d",
243
          taskId, pStatis->created, pStatis->checkTs, pStatis->readyTs, pStatis->updateCount, pStatis->latestUpdateTs,
244
          pCkInfo->checkpointId, pCkInfo->checkpointVer, pCkInfo->nextProcessVer, pStatis->checkpoint);
245

246
  if (pTask->schedInfo.pDelayTimer != NULL) {
62,340✔
247
    streamTmrStop(pTask->schedInfo.pDelayTimer);
1,287✔
248
    pTask->schedInfo.pDelayTimer = NULL;
1,287✔
249
  }
250

251
  if (pTask->hTaskInfo.pTimer != NULL) {
62,340✔
252
    streamTmrStop(pTask->hTaskInfo.pTimer);
1,731✔
253
    pTask->hTaskInfo.pTimer = NULL;
1,732✔
254
  }
255

256
  if (pTask->msgInfo.pRetryTmr != NULL) {
62,341✔
257
    streamTmrStop(pTask->msgInfo.pRetryTmr);
5,429✔
258
    pTask->msgInfo.pRetryTmr = NULL;
5,429✔
259
  }
260

261
  if (pTask->inputq.queue) {
62,341✔
262
    streamQueueClose(pTask->inputq.queue, pTask->id.taskId);
14,647✔
263
    pTask->inputq.queue = NULL;
14,643✔
264
  }
265

266
  if (pTask->outputq.queue) {
62,337✔
267
    streamQueueClose(pTask->outputq.queue, pTask->id.taskId);
14,644✔
268
    pTask->outputq.queue = NULL;
14,650✔
269
  }
270

271
  if (pTask->exec.qmsg) {
62,343✔
272
    taosMemoryFree(pTask->exec.qmsg);
32,853!
273
  }
274

275
  if (pTask->exec.pExecutor) {
62,343✔
276
    qDestroyTask(pTask->exec.pExecutor);
7,383✔
277
    pTask->exec.pExecutor = NULL;
7,382✔
278
  }
279

280
  if (pTask->exec.pWalReader != NULL) {
62,342✔
281
    walCloseReader(pTask->exec.pWalReader);
7,370✔
282
    pTask->exec.pWalReader = NULL;
7,371✔
283
  }
284

285
  streamClearChkptReadyMsg(pTask->chkInfo.pActiveInfo);
62,343✔
286

287
  if (pTask->msgInfo.pData != NULL) {
62,344✔
288
    clearBufferedDispatchMsg(pTask);
37✔
289
  }
290

291
  if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) {
62,345✔
292
    tDeleteSchemaWrapper(pTask->outputInfo.tbSink.pSchemaWrapper);
30,608!
293
    taosMemoryFree(pTask->outputInfo.tbSink.pTSchema);
30,611!
294
    tSimpleHashCleanup(pTask->outputInfo.tbSink.pTbInfo);
30,611✔
295
    tDeleteSchemaWrapper(pTask->outputInfo.tbSink.pTagSchema);
30,608✔
296
  } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
31,737✔
297
    taosArrayDestroy(pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos);
26,893✔
298
  }
299

300
  streamTaskCleanupCheckInfo(&pTask->taskCheckInfo);
62,345✔
301
  streamFreeTaskState(pTask, pTask->status.removeBackendFiles ? 1 : 0);
62,345✔
302

303
  if (pTask->pNameMap) {
62,336✔
304
    tSimpleHashCleanup(pTask->pNameMap);
2,338✔
305
  }
306

307
  streamDestroyStateMachine(pTask->status.pSM);
62,336✔
308
  pTask->status.pSM = NULL;
62,345✔
309

310
  streamTaskDestroyUpstreamInfo(&pTask->upstreamInfo);
62,345✔
311

312
  taosMemoryFree(pTask->outputInfo.pTokenBucket);
62,350!
313
  streamMutexDestroy(&pTask->lock);
62,350✔
314

315
  taosArrayDestroy(pTask->msgInfo.pSendInfo);
62,346✔
316
  pTask->msgInfo.pSendInfo = NULL;
62,346✔
317
  streamMutexDestroy(&pTask->msgInfo.lock);
62,346✔
318

319
  taosArrayDestroy(pTask->outputInfo.pNodeEpsetUpdateList);
62,348✔
320
  pTask->outputInfo.pNodeEpsetUpdateList = NULL;
62,347✔
321

322
  if (pTask->id.idStr != NULL) {
62,347✔
323
    taosMemoryFree((void*)pTask->id.idStr);
28,529!
324
  }
325

326
  streamTaskDestroyActiveChkptInfo(pTask->chkInfo.pActiveInfo);
62,347✔
327
  pTask->chkInfo.pActiveInfo = NULL;
62,347✔
328

329
  taosArrayDestroyP(pTask->notifyInfo.pNotifyAddrUrls, NULL);
62,347✔
330
  taosMemoryFreeClear(pTask->notifyInfo.streamName);
62,350!
331
  taosMemoryFreeClear(pTask->notifyInfo.stbFullName);
62,350!
332
  tDeleteSchemaWrapper(pTask->notifyInfo.pSchemaWrapper);
62,350!
333

334
  taosMemoryFree(pTask);
62,344!
335
  stDebug("s-task:0x%x free task completed", taskId);
62,350✔
336
}
62,349✔
337

338
void streamFreeTaskState(SStreamTask* pTask, int8_t remove) {
62,338✔
339
  stDebug("s-task:0x%x start to free task state/backend", pTask->id.taskId);
62,338✔
340
  if (pTask->pState != NULL) {
62,342✔
341
    stDebug("s-task:0x%x start to free task state", pTask->id.taskId);
7,382✔
342
    streamStateClose(pTask->pState, remove);
7,382✔
343

344
    if (remove) taskDbSetClearFileFlag(pTask->pBackend);
7,383✔
345
    taskDbRemoveRef(pTask->pBackend);
7,383✔
346
    pTask->pBackend = NULL;
7,382✔
347
    pTask->pState = NULL;
7,382✔
348
  } else {
349
    stDebug("s-task:0x%x task state is NULL, may del backend:%s", pTask->id.taskId,
54,960✔
350
            pTask->backendPath ? pTask->backendPath : "NULL");
351
    if (remove) {
54,960✔
352
      if (pTask->backendPath != NULL) {
3,413!
353
        stDebug("s-task:0x%x task state is NULL, do del backend:%s", pTask->id.taskId, pTask->backendPath);
3,414✔
354
        taosRemoveDir(pTask->backendPath);
3,414✔
355
      }
356
    }
357
  }
358

359
  if (pTask->backendPath != NULL) {
62,330✔
360
    taosMemoryFree(pTask->backendPath);
14,644!
361
    pTask->backendPath = NULL;
14,646✔
362
  }
363
}
62,332✔
364

365
static void setInitialVersionInfo(SStreamTask* pTask, int64_t ver) {
14,888✔
366
  SCheckpointInfo* pChkInfo = &pTask->chkInfo;
14,888✔
367
  SDataRange*      pRange = &pTask->dataRange;
14,888✔
368

369
  // only set the version info for stream tasks without fill-history task
370
  if ((pTask->info.fillHistory == 0) && (!HAS_RELATED_FILLHISTORY_TASK(pTask))) {
14,888✔
371
    pChkInfo->checkpointVer = ver - 1;  // only update when generating checkpoint
4,716✔
372
    pChkInfo->processedVer = ver - 1;   // already processed version
4,716✔
373
    pChkInfo->nextProcessVer = ver;     // next processed version
4,716✔
374

375
    pRange->range.maxVer = ver;
4,716✔
376
    pRange->range.minVer = ver;
4,716✔
377
  } else {
378
    // the initial value of processedVer/nextProcessVer/checkpointVer for stream task with related fill-history task
379
    // is set at the mnode.
380
    if (pTask->info.fillHistory == 1) {
10,172✔
381
      pChkInfo->checkpointVer = pRange->range.maxVer;
5,150✔
382
      pChkInfo->processedVer = pRange->range.maxVer;
5,150✔
383
      pChkInfo->nextProcessVer = pRange->range.maxVer + 1;
5,150✔
384
    } else {
385
      pChkInfo->checkpointVer = pRange->range.minVer - 1;
5,022✔
386
      pChkInfo->processedVer = pRange->range.minVer - 1;
5,022✔
387
      pChkInfo->nextProcessVer = pRange->range.minVer;
5,022✔
388

389
      {  // for compatible purpose, remove it later
390
        if (pRange->range.minVer == 0) {
5,022✔
391
          pChkInfo->checkpointVer = 0;
2,537✔
392
          pChkInfo->processedVer = 0;
2,537✔
393
          pChkInfo->nextProcessVer = 1;
2,537✔
394
          stDebug("s-task:%s update the processedVer to 0 from -1 due to compatible purpose", pTask->id.idStr);
2,537✔
395
        }
396
      }
397
    }
398
  }
399
}
14,888✔
400

401
int32_t streamTaskSetBackendPath(SStreamTask* pTask) {
14,901✔
402
  int64_t streamId = 0;
14,901✔
403
  int32_t taskId = 0;
14,901✔
404

405
  if (pTask->info.fillHistory) {
14,901✔
406
    streamId = pTask->streamTaskId.streamId;
5,149✔
407
    taskId = pTask->streamTaskId.taskId;
5,149✔
408
  } else {
409
    streamId = pTask->id.streamId;
9,752✔
410
    taskId = pTask->id.taskId;
9,752✔
411
  }
412

413
  char    id[128] = {0};
14,901✔
414
  int32_t nBytes = snprintf(id, tListLen(id), "0x%" PRIx64 "-0x%x", streamId, taskId);
14,901✔
415
  if (nBytes < 0 || nBytes >= sizeof(id)) {
14,901!
416
    return TSDB_CODE_OUT_OF_BUFFER;
×
417
  }
418

419
  int32_t len = strlen(pTask->pMeta->path);
14,906✔
420
  pTask->backendPath = (char*)taosMemoryMalloc(len + nBytes + 2);
14,906!
421
  if (pTask->backendPath == NULL) {
14,906!
422
    return terrno;
×
423
  }
424

425
  int32_t code = snprintf(pTask->backendPath, len + nBytes + 2, "%s%s%s", pTask->pMeta->path, TD_DIRSEP, id);
14,906✔
426
  if (code < 0 || code >= len + nBytes + 2) {
14,906✔
427
    stError("s-task:%s failed to set backend path:%s, code: out of buffer", pTask->id.idStr, pTask->backendPath);
3!
428
    return TSDB_CODE_OUT_OF_BUFFER;
×
429
  } else {
430
    stDebug("s-task:%s set backend path:%s", pTask->id.idStr, pTask->backendPath);
14,903✔
431
    return 0;
14,900✔
432
  }
433
}
434

435
int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, int64_t ver) {
14,885✔
436
  int32_t code = createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId, &pTask->id.idStr);
14,885✔
437
  if (code) {
14,894!
438
    stError("0x%x failed create stream task id str, code:%s", pTask->id.taskId, tstrerror(code));
×
439
    return code;
×
440
  }
441

442
  pTask->id.refId = 0;
14,894✔
443
  pTask->inputq.status = TASK_INPUT_STATUS__NORMAL;
14,894✔
444
  pTask->outputq.status = TASK_OUTPUT_STATUS__NORMAL;
14,894✔
445

446
  int32_t code1 = streamQueueOpen(512 << 10, &pTask->inputq.queue);
14,894✔
447
  int32_t code2 = streamQueueOpen(512 << 10, &pTask->outputq.queue);
14,889✔
448
  if (code1 || code2) {
14,904!
449
    stError("s-task:%s failed to prepare the input/output queue, initialize task failed", pTask->id.idStr);
2!
450
    return TSDB_CODE_OUT_OF_MEMORY;
×
451
  }
452

453
  pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
14,902✔
454

455
  code = streamCreateStateMachine(pTask);
14,902✔
456
  if (pTask->status.pSM == NULL || code != TSDB_CODE_SUCCESS) {
14,892!
457
    stError("s-task:%s failed create state-machine for stream task, initialization failed, code:%s", pTask->id.idStr,
×
458
            tstrerror(code));
459
    return code;
×
460
  }
461

462
  pTask->execInfo.created = taosGetTimestampMs();
14,903✔
463
  setInitialVersionInfo(pTask, ver);
14,903✔
464

465
  pTask->pMeta = pMeta;
14,897✔
466
  pTask->pMsgCb = pMsgCb;
14,897✔
467
  pTask->msgInfo.pSendInfo = taosArrayInit(4, sizeof(SDispatchEntry));
14,897✔
468
  if (pTask->msgInfo.pSendInfo == NULL) {
14,896!
469
    stError("s-task:%s failed to create sendInfo struct for stream task, code:Out of memory", pTask->id.idStr);
×
470
    return terrno;
×
471
  }
472

473
  code = taosThreadMutexInit(&pTask->msgInfo.lock, NULL);
14,896✔
474
  if (code) {
14,895!
475
    stError("s-task:0x%x failed to init msgInfo mutex, code:%s", pTask->id.taskId, tstrerror(code));
×
476
    return code;
×
477
  }
478

479
  TdThreadMutexAttr attr = {0};
14,895✔
480
  code = taosThreadMutexAttrInit(&attr);
14,895✔
481
  if (code != 0) {
14,883!
482
    stError("s-task:%s initElapsed mutex attr failed, code:%s", pTask->id.idStr, tstrerror(code));
×
483
    return code;
×
484
  }
485

486
  code = taosThreadMutexAttrSetType(&attr, PTHREAD_MUTEX_RECURSIVE);
14,883✔
487
  if (code != 0) {
14,879!
488
    stError("s-task:%s set mutex attr recursive, code:%s", pTask->id.idStr, tstrerror(code));
×
489
    return code;
×
490
  }
491

492
  code = taosThreadMutexInit(&pTask->lock, &attr);
14,879✔
493
  if (code) {
14,897!
494
    return code;
×
495
  }
496

497
  code = taosThreadMutexAttrDestroy(&attr);
14,897✔
498
  if (code) {
14,879!
499
    return code;
×
500
  }
501

502
  streamTaskOpenAllUpstreamInput(pTask);
14,879✔
503

504
  STaskOutputInfo* pOutputInfo = &pTask->outputInfo;
14,877✔
505
  pOutputInfo->pTokenBucket = taosMemoryCalloc(1, sizeof(STokenBucket));
14,877!
506
  if (pOutputInfo->pTokenBucket == NULL) {
14,903!
507
    stError("s-task:%s failed to prepare the tokenBucket, code:%s", pTask->id.idStr, tstrerror(terrno));
×
508
    return terrno;
×
509
  }
510

511
  // 2MiB per second for sink task
512
  // 50 times sink operator per second
513
  code = streamTaskInitTokenBucket(pOutputInfo->pTokenBucket, 35, 35, tsSinkDataRate, pTask->id.idStr);
14,903✔
514
  if (code) {
14,904!
515
    return code;
×
516
  }
517

518
  pOutputInfo->pNodeEpsetUpdateList = taosArrayInit(4, sizeof(SDownstreamTaskEpset));
14,904✔
519
  if (pOutputInfo->pNodeEpsetUpdateList == NULL) {
14,904!
520
    stError("s-task:%s failed to prepare downstreamUpdateList, code:%s", pTask->id.idStr, tstrerror(terrno));
×
521
    return terrno;
×
522
  }
523

524
  pTask->taskCheckInfo.pList = taosArrayInit(4, sizeof(SDownstreamStatusInfo));
14,904✔
525
  if (pTask->taskCheckInfo.pList == NULL) {
14,897!
526
    stError("s-task:%s failed to prepare taskCheckInfo list, code:%s", pTask->id.idStr, tstrerror(terrno));
×
527
    return terrno;
×
528
  }
529

530
  if (pTask->chkInfo.pActiveInfo == NULL) {
14,897✔
531
    code = streamTaskCreateActiveChkptInfo(&pTask->chkInfo.pActiveInfo);
14,893✔
532
    if (code) {
14,901!
533
      stError("s-task:%s failed to create active checkpoint info, code:%s", pTask->id.idStr, tstrerror(code));
×
534
      return code;
×
535
    }
536
  }
537

538
  return streamTaskSetBackendPath(pTask);
14,905✔
539
}
540

541
int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask) {
115,408✔
542
  if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
115,408✔
543
    return 0;
6,819✔
544
  }
545

546
  int32_t type = pTask->outputInfo.type;
108,589✔
547
  if (type == TASK_OUTPUT__TABLE) {
108,589✔
548
    return 0;
271✔
549
  } else if (type == TASK_OUTPUT__FIXED_DISPATCH) {
108,318✔
550
    return 1;
10,957✔
551
  } else {
552
    SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
97,361✔
553
    return taosArrayGetSize(vgInfo);
97,361✔
554
  }
555
}
556

557
int32_t streamTaskGetNumOfUpstream(const SStreamTask* pTask) { return taosArrayGetSize(pTask->upstreamInfo.pList); }
19,288✔
558

559
int32_t streamTaskSetUpstreamInfo(SStreamTask* pTask, const SStreamTask* pUpstreamTask) {
19,932✔
560
  SStreamUpstreamEpInfo* pEpInfo = createStreamTaskEpInfo(pUpstreamTask);
19,932✔
561
  if (pEpInfo == NULL) {
19,932!
562
    return terrno;
×
563
  }
564

565
  if (pTask->upstreamInfo.pList == NULL) {
19,932✔
566
    pTask->upstreamInfo.pList = taosArrayInit(4, POINTER_BYTES);
6,890✔
567
  }
568

569
  void* p = taosArrayPush(pTask->upstreamInfo.pList, &pEpInfo);
19,932✔
570
  return (p == NULL) ? terrno : TSDB_CODE_SUCCESS;
19,932!
571
}
572

573
int32_t streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdated) {
305✔
574
  int32_t code = 0;
305✔
575
  char    buf[512] = {0};
305✔
576
  code = epsetToStr(pEpSet, buf, tListLen(buf));  // ignore error since it is only for log file.
305✔
577
  if (code != 0) {  // print error and continue
305!
578
    stError("%s failed to convert epset to str, code:%s", pTask->id.idStr, tstrerror(code));
×
579
    return code;
×
580
  }
581

582
  int32_t numOfUpstream = taosArrayGetSize(pTask->upstreamInfo.pList);
305✔
583
  for (int32_t i = 0; i < numOfUpstream; ++i) {
597✔
584
    SStreamUpstreamEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i);
494✔
585
    if (pInfo->nodeId == nodeId) {
494✔
586
      bool equal = isEpsetEqual(&pInfo->epSet, pEpSet);
202✔
587
      if (!equal) {
202✔
588
        *pUpdated = true;
143✔
589

590
        char tmp[512] = {0};
143✔
591
        code = epsetToStr(&pInfo->epSet, tmp, tListLen(tmp));
143✔
592
        if (code != 0) {  // print error and continue
143!
593
          stError("%s failed to convert epset to str, code:%s", pTask->id.idStr, tstrerror(code));
×
594
          return code;
×
595
        }
596

597
        epsetAssign(&pInfo->epSet, pEpSet);
143✔
598
        stDebug("s-task:0x%x update the upstreamInfo taskId:0x%x(nodeId:%d) newEpset:%s old:%s", pTask->id.taskId,
143✔
599
                pInfo->taskId, nodeId, buf, tmp);
600
      } else {
601
        stDebug("s-task:0x%x not update upstreamInfo, since identical, task:0x%x(nodeId:%d) epset:%s", pTask->id.taskId,
59!
602
                pInfo->taskId, nodeId, buf);
603
      }
604

605
      break;
202✔
606
    }
607
  }
608

609
  return code;
305✔
610
}
611

612
void streamTaskDestroyUpstreamInfo(SUpstreamInfo* pUpstreamInfo) {
62,332✔
613
  if (pUpstreamInfo->pList != NULL) {
62,332✔
614
    taosArrayDestroyEx(pUpstreamInfo->pList, freeUpstreamItem);
55,305✔
615
    pUpstreamInfo->numOfClosed = 0;
55,316✔
616
    pUpstreamInfo->pList = NULL;
55,316✔
617
  }
618
}
62,343✔
619

620
void streamTaskSetFixedDownstreamInfo(SStreamTask* pTask, const SStreamTask* pDownstreamTask) {
940✔
621
  STaskDispatcherFixed* pDispatcher = &pTask->outputInfo.fixedDispatcher;
940✔
622
  pDispatcher->taskId = pDownstreamTask->id.taskId;
940✔
623
  pDispatcher->nodeId = pDownstreamTask->info.nodeId;
940✔
624
  pDispatcher->epSet = pDownstreamTask->info.epSet;
940✔
625

626
  pTask->outputInfo.type = TASK_OUTPUT__FIXED_DISPATCH;
940✔
627
  pTask->msgInfo.msgType = TDMT_STREAM_TASK_DISPATCH;
940✔
628
}
940✔
629

630
int32_t streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdated) {
305✔
631
  char    buf[512] = {0};
305✔
632
  int32_t code = epsetToStr(pEpSet, buf, tListLen(buf));  // ignore the error since only for log files.
305✔
633
  if (code != 0) {                                        // print error and continue
305!
634
    stError("%s failed to convert epset to str, code:%s", pTask->id.idStr, tstrerror(code));
×
635
    return code;
×
636
  }
637

638
  int32_t id = pTask->id.taskId;
305✔
639
  int8_t  type = pTask->outputInfo.type;
305✔
640

641
  if (type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
305✔
642
    SArray* pVgs = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
200✔
643

644
    for (int32_t i = 0; i < taosArrayGetSize(pVgs); i++) {
391✔
645
      SVgroupInfo* pVgInfo = taosArrayGet(pVgs, i);
389✔
646
      if (pVgInfo == NULL) {
389!
647
        continue;
×
648
      }
649

650
      if (pVgInfo->vgId == nodeId) {
389✔
651
        bool isEqual = isEpsetEqual(&pVgInfo->epSet, pEpSet);
198✔
652
        if (!isEqual) {
198✔
653
          *pUpdated = true;
143✔
654

655
          char tmp[512] = {0};
143✔
656
          code = epsetToStr(&pVgInfo->epSet, tmp, tListLen(tmp));
143✔
657
          if (code != 0) {  // print error and continue
143!
658
            stError("%s failed to convert epset to str, code:%s", pTask->id.idStr, tstrerror(code));
×
659
            return code;
×
660
          }
661

662
          epsetAssign(&pVgInfo->epSet, pEpSet);
143✔
663
          stDebug("s-task:0x%x update dispatch info, task:0x%x(nodeId:%d) newEpset:%s old:%s", id, pVgInfo->taskId,
143✔
664
                  nodeId, buf, tmp);
665
        } else {
666
          stDebug("s-task:0x%x not update dispatch info, since identical, task:0x%x(nodeId:%d) epset:%s", id,
55!
667
                  pVgInfo->taskId, nodeId, buf);
668
        }
669
        break;
198✔
670
      }
671
    }
672
  } else if (type == TASK_OUTPUT__FIXED_DISPATCH) {
105!
673
    STaskDispatcherFixed* pDispatcher = &pTask->outputInfo.fixedDispatcher;
105✔
674
    if (pDispatcher->nodeId == nodeId) {
105✔
675
      bool equal = isEpsetEqual(&pDispatcher->epSet, pEpSet);
4✔
676
      if (!equal) {
4!
677
        *pUpdated = true;
×
678

679
        char tmp[512] = {0};
×
680
        code = epsetToStr(&pDispatcher->epSet, tmp, tListLen(tmp));
×
681
        if (code != 0) {  // print error and continue
×
682
          stError("%s failed to convert epset to str, code:%s", pTask->id.idStr, tstrerror(code));
×
683
          return code;
×
684
        }
685

686
        epsetAssign(&pDispatcher->epSet, pEpSet);
×
687
        stDebug("s-task:0x%x update dispatch info, task:0x%x(nodeId:%d) newEpset:%s old:%s", id, pDispatcher->taskId,
×
688
                nodeId, buf, tmp);
689
      } else {
690
        stDebug("s-task:0x%x not update dispatch info, since identical, task:0x%x(nodeId:%d) epset:%s", id,
4!
691
                pDispatcher->taskId, nodeId, buf);
692
      }
693
    }
694
  }
695

696
  return code;
305✔
697
}
698

699
int32_t streamTaskStop(SStreamTask* pTask) {
3,226✔
700
  int32_t     vgId = pTask->pMeta->vgId;
3,226✔
701
  int64_t     st = taosGetTimestampMs();
3,226✔
702
  const char* id = pTask->id.idStr;
3,226✔
703

704
  int32_t code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_STOP);
3,226✔
705
  if (code) {
3,226!
706
    stError("failed to handle STOP event, s-task:%s, code:%s", id, tstrerror(code));
×
707
    return code;
×
708
  }
709

710
  if (pTask->info.taskLevel != TASK_LEVEL__SINK && pTask->exec.pExecutor != NULL) {
3,226✔
711
    code = qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS);
1,591✔
712
    if (code != TSDB_CODE_SUCCESS) {
1,591!
713
      stError("s-task:%s failed to kill task related query handle, code:%s", id, tstrerror(code));
×
714
    }
715
  }
716

717
  while (!streamTaskIsIdle(pTask)) {
3,225!
718
    stDebug("s-task:%s level:%d wait for task to be idle and then close, check again in 100ms", id,
×
719
            pTask->info.taskLevel);
720
    taosMsleep(100);
×
721
  }
722

723
  int64_t el = taosGetTimestampMs() - st;
3,226✔
724
  stDebug("vgId:%d s-task:%s is closed in %" PRId64 " ms", vgId, id, el);
3,226✔
725
  return code;
3,226✔
726
}
727

728
bool streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList) {
307✔
729
  STaskExecStatisInfo* p = &pTask->execInfo;
307✔
730

731
  int32_t numOfNodes = taosArrayGetSize(pNodeList);
307✔
732
  int64_t prevTs = p->latestUpdateTs;
307✔
733

734
  p->latestUpdateTs = taosGetTimestampMs();
307✔
735
  p->updateCount += 1;
307✔
736
  stDebug("s-task:0x%x update task nodeEp epset, updatedNodes:%d, updateCount:%d, prevTs:%" PRId64, pTask->id.taskId,
307✔
737
          numOfNodes, p->updateCount, prevTs);
738

739
  bool updated = false;
307✔
740
  for (int32_t i = 0; i < numOfNodes; ++i) {
894✔
741
    SNodeUpdateInfo* pInfo = taosArrayGet(pNodeList, i);
587✔
742
    if (pInfo == NULL) {
587!
743
      continue;
×
744
    }
745

746
    int32_t code = doUpdateTaskEpset(pTask, pInfo->nodeId, &pInfo->newEp, &updated);
587✔
747
    if (code) {
587!
748
      stError("s-task:0x%x failed to update the task nodeEp epset, code:%s", pTask->id.taskId, tstrerror(code));
×
749
    }
750
  }
751

752
  return updated;
307✔
753
}
754

755
void streamTaskResetUpstreamStageInfo(SStreamTask* pTask) {
14,899✔
756
  if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
14,899✔
757
    return;
7,497✔
758
  }
759

760
  int32_t size = taosArrayGetSize(pTask->upstreamInfo.pList);
7,402✔
761
  for (int32_t i = 0; i < size; ++i) {
28,545✔
762
    SStreamUpstreamEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i);
21,141✔
763
    pInfo->stage = -1;
21,144✔
764
  }
765

766
  stDebug("s-task:%s reset all upstream tasks stage info", pTask->id.idStr);
7,404✔
767
}
768

769
void streamTaskOpenAllUpstreamInput(SStreamTask* pTask) {
24,715✔
770
  int32_t num = taosArrayGetSize(pTask->upstreamInfo.pList);
24,715✔
771
  if (num == 0) {
24,716✔
772
    return;
12,363✔
773
  }
774

775
  for (int32_t i = 0; i < num; ++i) {
47,494✔
776
    SStreamUpstreamEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i);
35,134✔
777
    pInfo->dataAllowed = true;
35,141✔
778
  }
779

780
  pTask->upstreamInfo.numOfClosed = 0;
12,360✔
781
  stDebug("s-task:%s opening up inputQ for %d upstream tasks", pTask->id.idStr, num);
12,360✔
782
}
783

784
void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId) {
8,125✔
785
  SStreamUpstreamEpInfo* pInfo = NULL;
8,125✔
786
  streamTaskGetUpstreamTaskEpInfo(pTask, taskId, &pInfo);
8,125✔
787

788
  if ((pInfo != NULL) && pInfo->dataAllowed) {
8,119!
789
    pInfo->dataAllowed = false;
8,119✔
790
    if (pTask->upstreamInfo.numOfClosed < streamTaskGetNumOfUpstream(pTask)) {
8,119!
791
      int32_t t = atomic_add_fetch_32(&pTask->upstreamInfo.numOfClosed, 1);
8,122✔
792
    } else {
793
      stError("s-task:%s not inc closed input, since they have been all closed already", pTask->id.idStr);
×
794
    }
795
  }
796
}
8,151✔
797

798
void streamTaskOpenUpstreamInput(SStreamTask* pTask, int32_t taskId) {
1✔
799
  SStreamUpstreamEpInfo* pInfo = NULL;
1✔
800
  streamTaskGetUpstreamTaskEpInfo(pTask, taskId, &pInfo);
1✔
801

802
  if (pInfo != NULL && (!pInfo->dataAllowed)) {
1!
803
    int32_t t = atomic_sub_fetch_32(&pTask->upstreamInfo.numOfClosed, 1);
1✔
804
    stDebug("s-task:%s open inputQ for upstream:0x%x, remain closed:%d", pTask->id.idStr, taskId, t);
1!
805
    pInfo->dataAllowed = true;
1✔
806
  }
807
}
1✔
808

809
bool streamTaskIsAllUpstreamClosed(SStreamTask* pTask) {
×
810
  return pTask->upstreamInfo.numOfClosed == taosArrayGetSize(pTask->upstreamInfo.pList);
×
811
}
812

813
bool streamTaskSetSchedStatusWait(SStreamTask* pTask) {
100,256✔
814
  bool ret = false;
100,256✔
815

816
  streamMutexLock(&pTask->lock);
100,256✔
817
  if (pTask->status.schedStatus == TASK_SCHED_STATUS__INACTIVE) {
100,276✔
818
    pTask->status.schedStatus = TASK_SCHED_STATUS__WAITING;
68,066✔
819
    ret = true;
68,066✔
820
  }
821

822
  streamMutexUnlock(&pTask->lock);
100,276✔
823
  return ret;
100,276✔
824
}
825

826
int8_t streamTaskSetSchedStatusActive(SStreamTask* pTask) {
66,843✔
827
  streamMutexLock(&pTask->lock);
66,843✔
828
  int8_t status = pTask->status.schedStatus;
66,880✔
829
  if (status == TASK_SCHED_STATUS__WAITING) {
66,880✔
830
    pTask->status.schedStatus = TASK_SCHED_STATUS__ACTIVE;
66,861✔
831
  }
832
  streamMutexUnlock(&pTask->lock);
66,880✔
833

834
  return status;
66,888✔
835
}
836

837
int8_t streamTaskSetSchedStatusInactive(SStreamTask* pTask) {
1,089✔
838
  streamMutexLock(&pTask->lock);
1,089✔
839
  int8_t status = pTask->status.schedStatus;
1,089✔
840
  pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
1,089✔
841
  streamMutexUnlock(&pTask->lock);
1,089✔
842

843
  return status;
1,089✔
844
}
845

846
int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, int32_t resetRelHalt) {
6,972✔
847
  int32_t      code = 0;
6,972✔
848
  SStreamMeta* pMeta = pTask->pMeta;
6,972✔
849
  SStreamTask* pStreamTask = NULL;
6,972✔
850

851
  if (pTask->info.fillHistory == 0) {
6,972!
852
    return code;
6,974✔
853
  }
854

855
  code = streamMetaAcquireTaskUnsafe(pMeta, &pTask->streamTaskId, &pStreamTask);
×
856
  if (code == 0) {
6!
857
    stDebug("s-task:%s clear the related stream task:0x%x attr to fill-history task", pTask->id.idStr,
×
858
            (int32_t)pTask->streamTaskId.taskId);
859

860
    streamMutexLock(&(pStreamTask->lock));
×
861
    CLEAR_RELATED_FILLHISTORY_TASK(pStreamTask);
×
862

863
    if (resetRelHalt) {
×
864
      stDebug("s-task:0x%" PRIx64 " set the persistent status attr to be ready, prev:%s, status in sm:%s",
×
865
              pTask->streamTaskId.taskId, streamTaskGetStatusStr(pStreamTask->status.taskStatus),
866
              streamTaskGetStatus(pStreamTask).name);
867
      pStreamTask->status.taskStatus = TASK_STATUS__READY;
×
868
    }
869

870
    code = streamMetaSaveTask(pMeta, pStreamTask);
×
871
    streamMutexUnlock(&(pStreamTask->lock));
×
872

873
    streamMetaReleaseTask(pMeta, pStreamTask);
×
874
  }
875

876
  return code;
6✔
877
}
878

879
int32_t streamBuildAndSendDropTaskMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskId* pTaskId, int64_t resetRelHalt) {
6✔
880
  SVDropStreamTaskReq* pReq = rpcMallocCont(sizeof(SVDropStreamTaskReq));
6✔
881
  if (pReq == NULL) {
6!
882
    return terrno;
×
883
  }
884

885
  pReq->head.vgId = vgId;
6✔
886
  pReq->taskId = pTaskId->taskId;
6✔
887
  pReq->streamId = pTaskId->streamId;
6✔
888
  pReq->resetRelHalt = resetRelHalt;
6✔
889

890
  SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_DROP, .pCont = pReq, .contLen = sizeof(SVDropStreamTaskReq)};
6✔
891
  int32_t code = tmsgPutToQueue(pMsgCb, WRITE_QUEUE, &msg);
6✔
892
  if (code != TSDB_CODE_SUCCESS) {
6!
893
    stError("vgId:%d failed to send drop task:0x%x msg, code:%s", vgId, pTaskId->taskId, tstrerror(code));
×
894
  } else {
895
    stDebug("vgId:%d build and send drop task:0x%x msg", vgId, pTaskId->taskId);
6!
896
  }
897

898
  return code;
6✔
899
}
900

901
int32_t streamSendChkptReportMsg(SStreamTask* pTask, SCheckpointInfo* pCheckpointInfo, int8_t dropRelHTask) {
6,108✔
902
  int32_t                code = 0;
6,108✔
903
  int32_t                tlen = 0;
6,108✔
904
  int32_t                vgId = pTask->pMeta->vgId;
6,108✔
905
  const char*            id = pTask->id.idStr;
6,108✔
906
  SActiveCheckpointInfo* pActive = pCheckpointInfo->pActiveInfo;
6,108✔
907

908
  SCheckpointReport req = {.streamId = pTask->id.streamId,
6,108✔
909
                           .taskId = pTask->id.taskId,
6,108✔
910
                           .nodeId = vgId,
911
                           .dropHTask = dropRelHTask,
912
                           .transId = pActive->transId,
6,108✔
913
                           .checkpointId = pActive->activeId,
6,108✔
914
                           .checkpointVer = pCheckpointInfo->processedVer,
6,108✔
915
                           .checkpointTs = pCheckpointInfo->startTs};
6,108✔
916

917
  tEncodeSize(tEncodeStreamTaskChkptReport, &req, tlen, code);
6,108!
918
  if (code < 0) {
6,107!
919
    stError("s-task:%s vgId:%d encode stream task checkpoint-report failed, code:%s", id, vgId, tstrerror(code));
×
920
    return -1;
×
921
  }
922

923
  void* buf = rpcMallocCont(tlen);
6,107✔
924
  if (buf == NULL) {
6,108!
925
    stError("s-task:%s vgId:%d encode stream task checkpoint-report msg failed, code:%s", id, vgId,
×
926
            tstrerror(TSDB_CODE_OUT_OF_MEMORY));
927
    return -1;
×
928
  }
929

930
  SEncoder encoder;
931
  tEncoderInit(&encoder, buf, tlen);
6,108✔
932
  if ((code = tEncodeStreamTaskChkptReport(&encoder, &req)) < 0) {
6,107!
933
    rpcFreeCont(buf);
×
934
    tEncoderClear(&encoder);
×
935
    stError("s-task:%s vgId:%d encode stream task checkpoint-report msg failed, code:%s", id, vgId, tstrerror(code));
×
936
    return -1;
×
937
  }
938
  tEncoderClear(&encoder);
6,108✔
939

940
  SRpcMsg msg = {0};
6,109✔
941
  initRpcMsg(&msg, TDMT_MND_STREAM_CHKPT_REPORT, buf, tlen);
6,109✔
942
  stDebug("s-task:%s vgId:%d build and send task checkpoint-report to mnode", id, vgId);
6,109✔
943

944
  return tmsgSendReq(&pTask->info.mnodeEpset, &msg);
6,109✔
945
}
946

947
STaskId streamTaskGetTaskId(const SStreamTask* pTask) {
78,020✔
948
  STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
78,020✔
949
  return id;
78,020✔
950
}
951

952
void streamTaskInitForLaunchHTask(SHistoryTaskInfo* pInfo) {
1,782✔
953
  pInfo->waitInterval = LAUNCH_HTASK_INTERVAL;
1,782✔
954
  pInfo->tickCount = ceil(LAUNCH_HTASK_INTERVAL / WAIT_FOR_MINIMAL_INTERVAL);
1,782✔
955
  pInfo->retryTimes = 0;
1,782✔
956
}
1,782✔
957

958
void streamTaskSetRetryInfoForLaunch(SHistoryTaskInfo* pInfo) {
1,776✔
959
  pInfo->waitInterval *= RETRY_LAUNCH_INTERVAL_INC_RATE;
1,776✔
960
  pInfo->tickCount = ceil(pInfo->waitInterval / WAIT_FOR_MINIMAL_INTERVAL);
1,776✔
961
  pInfo->retryTimes += 1;
1,776✔
962
}
1,776✔
963

964
void streamTaskStatusInit(STaskStatusEntry* pEntry, const SStreamTask* pTask) {
9,189✔
965
  pEntry->id.streamId = pTask->id.streamId;
9,189✔
966
  pEntry->id.taskId = pTask->id.taskId;
9,189✔
967
  pEntry->stage = -1;
9,189✔
968
  pEntry->nodeId = pTask->info.nodeId;
9,189✔
969
  pEntry->status = TASK_STATUS__STOP;
9,189✔
970
}
9,189✔
971

972
void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc) {
61,271✔
973
  pDst->stage = pSrc->stage;
61,271✔
974
  pDst->inputQUsed = pSrc->inputQUsed;
61,271✔
975
  pDst->inputRate = pSrc->inputRate;
61,271✔
976
  pDst->procsTotal = pSrc->procsTotal;
61,271✔
977
  pDst->procsThroughput = pSrc->procsThroughput;
61,271✔
978
  pDst->outputTotal = pSrc->outputTotal;
61,271✔
979
  pDst->outputThroughput = pSrc->outputThroughput;
61,271✔
980
  pDst->processedVer = pSrc->processedVer;
61,271✔
981
  pDst->verRange = pSrc->verRange;
61,271✔
982
  pDst->sinkQuota = pSrc->sinkQuota;
61,271✔
983
  pDst->sinkDataSize = pSrc->sinkDataSize;
61,271✔
984
  pDst->checkpointInfo = pSrc->checkpointInfo;
61,271✔
985
  pDst->startCheckpointId = pSrc->startCheckpointId;
61,271✔
986
  pDst->startCheckpointVer = pSrc->startCheckpointVer;
61,271✔
987
  pDst->status = pSrc->status;
61,271✔
988

989
  pDst->startTime = pSrc->startTime;
61,271✔
990
  pDst->hTaskId = pSrc->hTaskId;
61,271✔
991
}
61,271✔
992

993
STaskStatusEntry streamTaskGetStatusEntry(SStreamTask* pTask) {
62,010✔
994
  SStreamMeta*         pMeta = pTask->pMeta;
62,010✔
995
  STaskExecStatisInfo* pExecInfo = &pTask->execInfo;
62,010✔
996

997
  STaskStatusEntry entry = {
186,030✔
998
      .id = streamTaskGetTaskId(pTask),
62,010✔
999
      .status = streamTaskGetStatus(pTask).state,
62,010✔
1000
      .nodeId = pMeta->vgId,
62,010✔
1001
      .stage = pMeta->stage,
62,010✔
1002

1003
      .inputQUsed = SIZE_IN_MiB(streamQueueGetItemSize(pTask->inputq.queue)),
62,010✔
1004
      .startTime = pExecInfo->readyTs,
62,010✔
1005
      .checkpointInfo.latestId = pTask->chkInfo.checkpointId,
62,010✔
1006
      .checkpointInfo.latestVer = pTask->chkInfo.checkpointVer,
62,010✔
1007
      .checkpointInfo.latestTime = pTask->chkInfo.checkpointTime,
62,010✔
1008
      .checkpointInfo.latestSize = 0,
1009
      .checkpointInfo.remoteBackup = 0,
1010
      .checkpointInfo.consensusChkptId = 0,
1011
      .checkpointInfo.consensusTs = 0,
1012
      .hTaskId = pTask->hTaskInfo.id.taskId,
62,010✔
1013
      .procsTotal = SIZE_IN_MiB(pExecInfo->inputDataSize),
62,010✔
1014
      .outputTotal = SIZE_IN_MiB(pExecInfo->outputDataSize),
62,010✔
1015
      .procsThroughput = SIZE_IN_KiB(pExecInfo->procsThroughput),
62,010✔
1016
      .outputThroughput = SIZE_IN_KiB(pExecInfo->outputThroughput),
62,010✔
1017
      .startCheckpointId = pExecInfo->startCheckpointId,
62,010✔
1018
      .startCheckpointVer = pExecInfo->startCheckpointVer,
62,010✔
1019
  };
1020
  return entry;
62,010✔
1021
}
1022

1023
static int32_t taskPauseCallback(SStreamTask* pTask, void* param) {
1,413✔
1024
  SStreamMeta* pMeta = pTask->pMeta;
1,413✔
1025
  int32_t      code = 0;
1,413✔
1026

1027
  int32_t num = atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1);
1,413✔
1028
  stInfo("vgId:%d s-task:%s pause stream task. paused task num:%d", pMeta->vgId, pTask->id.idStr, num);
1,413!
1029

1030
  // in case of fill-history task, stop the tsdb file scan operation.
1031
  if (pTask->info.fillHistory == 1) {
1,413✔
1032
    void* pExecutor = pTask->exec.pExecutor;
54✔
1033
    code = qKillTask(pExecutor, TSDB_CODE_SUCCESS);
54✔
1034
  }
1035

1036
  stDebug("vgId:%d s-task:%s set pause flag and pause task", pMeta->vgId, pTask->id.idStr);
1,413✔
1037
  return code;
1,413✔
1038
}
1039

1040
void streamTaskPause(SStreamTask* pTask) {
1,468✔
1041
  int32_t code = streamTaskHandleEventAsync(pTask->status.pSM, TASK_EVENT_PAUSE, taskPauseCallback, NULL);
1,468✔
1042
  if (code) {
1,468!
1043
    stError("s-task:%s failed handle pause event async, code:%s", pTask->id.idStr, tstrerror(code));
×
1044
  }
1045
}
1,468✔
1046

1047
void streamTaskResume(SStreamTask* pTask) {
2,654✔
1048
  SStreamTaskState prevState = streamTaskGetStatus(pTask);
2,654✔
1049

1050
  SStreamMeta* pMeta = pTask->pMeta;
2,654✔
1051
  int32_t      code = streamTaskRestoreStatus(pTask);
2,654✔
1052
  if (code == TSDB_CODE_SUCCESS) {
2,655✔
1053
    char*   pNew = streamTaskGetStatus(pTask).name;
1,396✔
1054
    int32_t num = atomic_sub_fetch_32(&pMeta->numOfPausedTasks, 1);
1,396✔
1055
    stInfo("s-task:%s status:%s resume from %s, paused task(s):%d", pTask->id.idStr, pNew, prevState.name, num);
1,396!
1056
  } else {
1057
    stInfo("s-task:%s status:%s no need to resume, paused task(s):%d", pTask->id.idStr, prevState.name,
1,259!
1058
           pMeta->numOfPausedTasks);
1059
  }
1060
}
2,655✔
1061

1062
bool streamTaskIsSinkTask(const SStreamTask* pTask) { return pTask->info.taskLevel == TASK_LEVEL__SINK; }
75,231✔
1063

1064
// this task must success
1065
int32_t streamTaskSendCheckpointReq(SStreamTask* pTask) {
4,461✔
1066
  int32_t     code;
1067
  int32_t     tlen = 0;
4,461✔
1068
  int32_t     vgId = pTask->pMeta->vgId;
4,461✔
1069
  const char* id = pTask->id.idStr;
4,461✔
1070

1071
  SStreamTaskCheckpointReq req = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId, .nodeId = vgId};
4,461✔
1072
  tEncodeSize(tEncodeStreamTaskCheckpointReq, &req, tlen, code);
4,461!
1073
  if (code < 0) {
4,459!
1074
    stError("s-task:%s vgId:%d encode stream task req checkpoint failed, code:%s", id, vgId, tstrerror(code));
×
1075
    return TSDB_CODE_INVALID_MSG;
×
1076
  }
1077

1078
  void* buf = rpcMallocCont(tlen);
4,459✔
1079
  if (buf == NULL) {
4,464!
1080
    stError("s-task:%s vgId:%d encode stream task req checkpoint msg failed, code:Out of memory", id, vgId);
×
1081
    return terrno;
×
1082
  }
1083

1084
  SEncoder encoder;
1085
  tEncoderInit(&encoder, buf, tlen);
4,464✔
1086
  if ((code = tEncodeStreamTaskCheckpointReq(&encoder, &req)) < 0) {
4,463!
1087
    rpcFreeCont(buf);
×
1088
    tEncoderClear(&encoder);
×
1089
    stError("s-task:%s vgId:%d encode stream task req checkpoint msg failed, code:%s", id, vgId, tstrerror(code));
×
1090
    return code;
×
1091
  }
1092

1093
  tEncoderClear(&encoder);
4,460✔
1094

1095
  SRpcMsg msg = {0};
4,462✔
1096
  initRpcMsg(&msg, TDMT_MND_STREAM_REQ_CHKPT, buf, tlen);
4,462✔
1097
  stDebug("s-task:%s vgId:%d build and send task checkpoint req", id, vgId);
4,461✔
1098

1099
  return tmsgSendReq(&pTask->info.mnodeEpset, &msg);
4,461✔
1100
}
1101

1102
void streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId, SStreamUpstreamEpInfo** pEpInfo) {
89,777✔
1103
  *pEpInfo = NULL;
89,777✔
1104

1105
  int32_t num = taosArrayGetSize(pTask->upstreamInfo.pList);
89,777✔
1106
  for (int32_t i = 0; i < num; ++i) {
179,399✔
1107
    SStreamUpstreamEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i);
179,372✔
1108
    if (pInfo == NULL) {
179,299!
1109
      return;
×
1110
    }
1111

1112
    if (pInfo->taskId == taskId) {
179,299✔
1113
      *pEpInfo = pInfo;
89,713✔
1114
      return;
89,713✔
1115
    }
1116
  }
1117

1118
  stError("s-task:%s failed to find upstream task:0x%x", pTask->id.idStr, taskId);
27!
1119
}
1120

1121
SEpSet* streamTaskGetDownstreamEpInfo(SStreamTask* pTask, int32_t taskId) {
×
1122
  if (pTask->info.taskLevel == TASK_OUTPUT__FIXED_DISPATCH) {
×
1123
    if (pTask->outputInfo.fixedDispatcher.taskId == taskId) {
×
1124
      return &pTask->outputInfo.fixedDispatcher.epSet;
×
1125
    }
1126
  } else if (pTask->info.taskLevel == TASK_OUTPUT__SHUFFLE_DISPATCH) {
×
1127
    SArray* pList = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
×
1128
    for (int32_t i = 0; i < taosArrayGetSize(pList); ++i) {
×
1129
      SVgroupInfo* pVgInfo = taosArrayGet(pList, i);
×
1130
      if (pVgInfo == NULL) {
×
1131
        continue;
×
1132
      }
1133

1134
      if (pVgInfo->taskId == taskId) {
×
1135
        return &pVgInfo->epSet;
×
1136
      }
1137
    }
1138
  }
1139

1140
  return NULL;
×
1141
}
1142

1143
int32_t createStreamTaskIdStr(int64_t streamId, int32_t taskId, const char** pId) {
14,884✔
1144
  char buf[128] = {0};
14,884✔
1145
  int32_t code = snprintf(buf, tListLen(buf),"0x%" PRIx64 "-0x%x", streamId, taskId);
14,884✔
1146
  if (code < 0 || code >= tListLen(buf)) {
14,884!
1147
    return TSDB_CODE_OUT_OF_BUFFER;
×
1148
  }
1149

1150
  *pId = taosStrdup(buf);
14,901!
1151

1152
  if (*pId == NULL) {
14,885!
1153
    return terrno;
×
1154
  } else {
1155
    return TSDB_CODE_SUCCESS;
14,885✔
1156
  }
1157
}
1158

1159
static int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq) {
561✔
1160
  int32_t           code;
1161
  SStreamDataBlock* pData;
1162

1163
  code = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SStreamDataBlock), (void**)&pData);
561✔
1164
  if (code) {
562!
1165
    stError("s-task:%s failed to allocated retrieve-block", pTask->id.idStr);
×
1166
    return terrno = code;
×
1167
  }
1168

1169
  pData->type = STREAM_INPUT__DATA_RETRIEVE;
562✔
1170
  pData->srcVgId = 0;
562✔
1171

1172
  code = streamRetrieveReqToData(pReq, pData, pTask->id.idStr);
562✔
1173
  if (code != TSDB_CODE_SUCCESS) {
561!
1174
    stError("s-task:%s failed to convert retrieve-data to block, code:%s", pTask->id.idStr, tstrerror(code));
×
1175
    taosFreeQitem(pData);
×
1176
    return code;
×
1177
  }
1178

1179
  code = streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pData);
561✔
1180
  if (code != TSDB_CODE_SUCCESS) {
562!
1181
    stError("s-task:%s failed to put retrieve-block into inputQ, inputQ is full, discard the retrieve msg",
×
1182
            pTask->id.idStr);
1183
  }
1184

1185
  return code;
562✔
1186
}
1187

1188
int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq) {
561✔
1189
  int32_t code = streamTaskEnqueueRetrieve(pTask, pReq);
561✔
1190
  if (code != 0) {
562!
1191
    return code;
×
1192
  }
1193
  return streamTrySchedExec(pTask);
562✔
1194
}
1195

1196
void streamTaskSetRemoveBackendFiles(SStreamTask* pTask) { pTask->status.removeBackendFiles = true; }
6,975✔
1197

1198
void streamTaskGetActiveCheckpointInfo(const SStreamTask* pTask, int32_t* pTransId, int64_t* pCheckpointId) {
×
1199
  if (pTransId != NULL) {
×
1200
    *pTransId = pTask->chkInfo.pActiveInfo->transId;
×
1201
  }
1202

1203
  if (pCheckpointId != NULL) {
×
1204
    *pCheckpointId = pTask->chkInfo.pActiveInfo->activeId;
×
1205
  }
1206
}
×
1207

1208
int32_t streamTaskSetActiveCheckpointInfo(SStreamTask* pTask, int64_t activeCheckpointId) {
28✔
1209
  pTask->chkInfo.pActiveInfo->activeId = activeCheckpointId;
28✔
1210
  return TSDB_CODE_SUCCESS;
28✔
1211
}
1212

1213
void streamTaskSetFailedChkptInfo(SStreamTask* pTask, int32_t transId, int64_t checkpointId) {
×
1214
  pTask->chkInfo.pActiveInfo->transId = transId;
×
1215
  pTask->chkInfo.pActiveInfo->activeId = checkpointId;
×
1216
  pTask->chkInfo.pActiveInfo->failedId = checkpointId;
×
1217
  stDebug("s-task:%s set failed checkpointId:%"PRId64, pTask->id.idStr, checkpointId);
×
1218
}
×
1219

1220
int32_t streamTaskCreateActiveChkptInfo(SActiveCheckpointInfo** pRes) {
14,931✔
1221
  SActiveCheckpointInfo* pInfo = taosMemoryCalloc(1, sizeof(SActiveCheckpointInfo));
14,931!
1222
  if (pInfo == NULL) {
14,946!
1223
    return terrno;
×
1224
  }
1225

1226
  int32_t code = taosThreadMutexInit(&pInfo->lock, NULL);
14,946✔
1227
  if (code != TSDB_CODE_SUCCESS) {
14,946!
1228
    return code;
×
1229
  }
1230

1231
  pInfo->pDispatchTriggerList = taosArrayInit(4, sizeof(STaskTriggerSendInfo));
14,946✔
1232
  pInfo->pReadyMsgList = taosArrayInit(4, sizeof(STaskCheckpointReadyInfo));
14,946✔
1233
  pInfo->pCheckpointReadyRecvList = taosArrayInit(4, sizeof(STaskDownstreamReadyInfo));
14,943✔
1234

1235
  *pRes = pInfo;
14,945✔
1236
  return code;
14,945✔
1237
}
1238

1239
void streamTaskDestroyActiveChkptInfo(SActiveCheckpointInfo* pInfo) {
62,337✔
1240
  if (pInfo == NULL) {
62,337✔
1241
    return;
47,658✔
1242
  }
1243

1244
  streamMutexDestroy(&pInfo->lock);
14,679✔
1245
  taosArrayDestroy(pInfo->pDispatchTriggerList);
14,688✔
1246
  pInfo->pDispatchTriggerList = NULL;
14,691✔
1247
  taosArrayDestroy(pInfo->pReadyMsgList);
14,691✔
1248
  pInfo->pReadyMsgList = NULL;
14,686✔
1249
  taosArrayDestroy(pInfo->pCheckpointReadyRecvList);
14,686✔
1250
  pInfo->pCheckpointReadyRecvList = NULL;
14,692✔
1251

1252
  SStreamTmrInfo* pTriggerTmr = &pInfo->chkptTriggerMsgTmr;
14,692✔
1253
  if (pTriggerTmr->tmrHandle != NULL) {
14,692✔
1254
    streamTmrStop(pTriggerTmr->tmrHandle);
2,136✔
1255
    pTriggerTmr->tmrHandle = NULL;
2,136✔
1256
  }
1257

1258
  SStreamTmrInfo* pReadyTmr = &pInfo->chkptReadyMsgTmr;
14,692✔
1259
  if (pReadyTmr->tmrHandle != NULL) {
14,692✔
1260
    streamTmrStop(pReadyTmr->tmrHandle);
2,118✔
1261
    pReadyTmr->tmrHandle = NULL;
2,118✔
1262
  }
1263

1264
  taosMemoryFree(pInfo);
14,692!
1265
}
1266

1267
// NOTE: clear the checkpoint id, and keep the failed id
1268
// failedId for a task will increase as the checkpoint I.D. increases.
1269
void streamTaskClearActiveInfo(SActiveCheckpointInfo* pInfo) {
5,281✔
1270
  pInfo->activeId = 0;
5,281✔
1271
  pInfo->transId = 0;
5,281✔
1272
  pInfo->allUpstreamTriggerRecv = 0;
5,281✔
1273
  pInfo->dispatchTrigger = false;
5,281✔
1274

1275
  taosArrayClear(pInfo->pDispatchTriggerList);
5,281✔
1276
  taosArrayClear(pInfo->pCheckpointReadyRecvList);
5,276✔
1277
}
5,274✔
1278

1279
const char* streamTaskGetExecType(int32_t type) {
98,130✔
1280
  switch (type) {
98,130!
1281
    case STREAM_EXEC_T_EXTRACT_WAL_DATA:
36,738✔
1282
      return "scan-wal-file";
36,738✔
1283
    case STREAM_EXEC_T_START_ALL_TASKS:
7,484✔
1284
      return "start-all-tasks";
7,484✔
1285
    case STREAM_EXEC_T_START_ONE_TASK:
5,724✔
1286
      return "start-one-task";
5,724✔
1287
    case STREAM_EXEC_T_RESTART_ALL_TASKS:
54✔
1288
      return "restart-all-tasks";
54✔
1289
    case STREAM_EXEC_T_STOP_ALL_TASKS:
4,648✔
1290
      return "stop-all-tasks";
4,648✔
1291
    case STREAM_EXEC_T_RESUME_TASK:
6,584✔
1292
      return "resume-task-from-idle";
6,584✔
1293
    case STREAM_EXEC_T_ADD_FAILED_TASK:
3✔
1294
      return "record-start-failed-task";
3✔
1295
    case 0:
36,960✔
1296
      return "exec-all-tasks";
36,960✔
1297
    default:
×
1298
      return "invalid-exec-type";
×
1299
  }
1300
}
1301

1302
int32_t streamTaskAllocRefId(SStreamTask* pTask, int64_t** pRefId) {
39,805✔
1303
  *pRefId = taosMemoryMalloc(sizeof(int64_t));
39,805!
1304
  if (*pRefId != NULL) {
39,801!
1305
    **pRefId = pTask->id.refId;
39,801✔
1306
    int32_t code = metaRefMgtAdd(pTask->pMeta->vgId, *pRefId);
39,801✔
1307
    if (code != 0) {
39,815!
1308
      stError("s-task:%s failed to add refId:%" PRId64 " into refId-mgmt, code:%s", pTask->id.idStr, pTask->id.refId,
×
1309
              tstrerror(code));
1310
    }
1311
    return code;
39,815✔
1312
  } else {
1313
    stError("s-task:%s failed to alloc new ref id, code:%s", pTask->id.idStr, tstrerror(terrno));
×
1314
    return terrno;
×
1315
  }
1316
}
1317

1318
void streamTaskFreeRefId(int64_t* pRefId) {
37,615✔
1319
  if (pRefId == NULL) {
37,615✔
1320
    return;
2,959✔
1321
  }
1322

1323
  metaRefMgtRemove(pRefId);
34,656✔
1324
}
1325

1326
static int32_t tEncodeStreamNotifyInfo(SEncoder* pEncoder, const SNotifyInfo* info) {
139,501✔
1327
  int32_t code = TSDB_CODE_SUCCESS;
139,501✔
1328
  int32_t lino = 0;
139,501✔
1329

1330
  QUERY_CHECK_NULL(pEncoder, code, lino, _exit, TSDB_CODE_INVALID_PARA);
139,501!
1331
  QUERY_CHECK_NULL(info, code, lino, _exit, TSDB_CODE_INVALID_PARA);
139,501!
1332

1333
  int32_t addrSize = taosArrayGetSize(info->pNotifyAddrUrls);
139,501✔
1334
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, addrSize));
139,504✔
1335
  for (int32_t i = 0; i < addrSize; ++i) {
139,482!
1336
    const char* url = taosArrayGetP(info->pNotifyAddrUrls, i);
×
1337
    TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, url));
×
1338
  }
1339
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, info->notifyEventTypes));
278,964!
1340
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, info->notifyErrorHandle));
278,964!
1341
  if (addrSize > 0) {
139,482!
1342
    TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, info->streamName));
×
1343
    TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, info->stbFullName));
×
1344
    TAOS_CHECK_EXIT(tEncodeSSchemaWrapper(pEncoder, info->pSchemaWrapper));
×
1345
  }
1346

1347
_exit:
139,482✔
1348
  if (code != TSDB_CODE_SUCCESS) {
139,504✔
1349
    stError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
22!
1350
  }
1351
  return code;
139,511✔
1352
}
1353

1354
static int32_t tDecodeStreamNotifyInfo(SDecoder* pDecoder, SNotifyInfo* info) {
48,665✔
1355
  int32_t code = TSDB_CODE_SUCCESS;
48,665✔
1356
  int32_t lino = 0;
48,665✔
1357

1358
  QUERY_CHECK_NULL(pDecoder, code, lino, _exit, TSDB_CODE_INVALID_PARA);
48,665!
1359
  QUERY_CHECK_NULL(info, code, lino, _exit, TSDB_CODE_INVALID_PARA);
48,665!
1360

1361
  int32_t addrSize = 0;
48,665✔
1362
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &addrSize));
48,677!
1363
  info->pNotifyAddrUrls = taosArrayInit(addrSize, POINTER_BYTES);
48,677✔
1364
  QUERY_CHECK_NULL(info->pNotifyAddrUrls, code, lino, _exit, terrno);
48,682✔
1365
  for (int32_t i = 0; i < addrSize; ++i) {
48,681!
1366
    char *url = NULL;
×
1367
    TAOS_CHECK_EXIT(tDecodeCStr(pDecoder, &url));
×
1368
    url = taosStrndup(url, TSDB_STREAM_NOTIFY_URL_LEN);
×
1369
    QUERY_CHECK_NULL(url, code, lino, _exit, terrno);
×
1370
    if (taosArrayPush(info->pNotifyAddrUrls, &url) == NULL) {
×
1371
      taosMemoryFree(url);
×
1372
      TAOS_CHECK_EXIT(terrno);
×
1373
    }
1374
  }
1375
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &info->notifyEventTypes));
97,354!
1376
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &info->notifyErrorHandle));
97,354!
1377
  if (addrSize > 0) {
48,681!
1378
    char* name = NULL;
×
1379
    TAOS_CHECK_EXIT(tDecodeCStr(pDecoder, &name));
×
1380
    info->streamName = taosStrndup(name, TSDB_STREAM_FNAME_LEN + 1);
×
1381
    QUERY_CHECK_NULL(info->streamName, code, lino, _exit, terrno);
×
1382
    TAOS_CHECK_EXIT(tDecodeCStr(pDecoder, &name));
×
1383
    info->stbFullName = taosStrndup(name, TSDB_STREAM_FNAME_LEN + 1);
×
1384
    QUERY_CHECK_NULL(info->stbFullName, code, lino, _exit, terrno);
×
1385
    info->pSchemaWrapper = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
×
1386
    if (info->pSchemaWrapper == NULL) {
×
1387
      TAOS_CHECK_EXIT(terrno);
×
1388
    }
1389
    TAOS_CHECK_EXIT(tDecodeSSchemaWrapper(pDecoder, info->pSchemaWrapper));
×
1390
  }
1391

1392
_exit:
48,681✔
1393
  if (code != TSDB_CODE_SUCCESS) {
48,681!
1394
    stError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1395
  }
1396
  return code;
48,672✔
1397
}
1398

1399
int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
139,527✔
1400
  int32_t code = 0;
139,527✔
1401
  int32_t lino;
1402

1403
  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
139,527!
1404
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->ver));
279,062!
1405
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->id.streamId));
279,062!
1406
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->id.taskId));
279,062!
1407
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->info.trigger));
279,062!
1408
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->info.taskLevel));
279,062!
1409
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->outputInfo.type));
279,062!
1410
  TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pTask->msgInfo.msgType));
279,062!
1411

1412
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->status.taskStatus));
279,062!
1413
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->status.schedStatus));
279,062!
1414

1415
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->info.selfChildId));
279,062!
1416
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->info.nodeId));
279,062!
1417
  TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pTask->info.epSet));
139,531!
1418
  TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pTask->info.mnodeEpset));
139,538!
1419

1420
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->chkInfo.checkpointId));
279,072!
1421
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->chkInfo.checkpointVer));
279,072!
1422
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->info.fillHistory));
279,072!
1423

1424
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->hTaskInfo.id.streamId));
279,072!
1425
  int32_t taskId = pTask->hTaskInfo.id.taskId;
139,536✔
1426
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, taskId));
139,536!
1427

1428
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->streamTaskId.streamId));
279,072!
1429
  taskId = pTask->streamTaskId.taskId;
139,536✔
1430
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, taskId));
139,536!
1431

1432
  TAOS_CHECK_EXIT(tEncodeU64(pEncoder, pTask->dataRange.range.minVer));
279,072!
1433
  TAOS_CHECK_EXIT(tEncodeU64(pEncoder, pTask->dataRange.range.maxVer));
279,072!
1434
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->dataRange.window.skey));
279,072!
1435
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->dataRange.window.ekey));
279,072!
1436

1437
  int32_t epSz = taosArrayGetSize(pTask->upstreamInfo.pList);
139,536✔
1438
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, epSz));
139,535!
1439
  for (int32_t i = 0; i < epSz; i++) {
331,590✔
1440
    SStreamUpstreamEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i);
192,064✔
1441
    TAOS_CHECK_EXIT(tEncodeStreamEpInfo(pEncoder, pInfo));
192,042!
1442
  }
1443

1444
  if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
139,526✔
1445
    TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pTask->exec.qmsg));
148,052!
1446
  }
1447

1448
  if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) {
139,526✔
1449
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->outputInfo.tbSink.stbUid));
137,310!
1450
    TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pTask->outputInfo.tbSink.stbFullName));
137,310!
1451
    TAOS_CHECK_EXIT(tEncodeSSchemaWrapper(pEncoder, pTask->outputInfo.tbSink.pSchemaWrapper));
137,310!
1452
  } else if (pTask->outputInfo.type == TASK_OUTPUT__SMA) {
70,871✔
1453
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->outputInfo.smaSink.smaId));
704!
1454
  } else if (pTask->outputInfo.type == TASK_OUTPUT__FETCH) {
70,519!
1455
    TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->outputInfo.fetchSink.reserved));
×
1456
  } else if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
70,519✔
1457
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->outputInfo.fixedDispatcher.taskId));
21,948!
1458
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->outputInfo.fixedDispatcher.nodeId));
21,948!
1459
    TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pTask->outputInfo.fixedDispatcher.epSet));
10,974!
1460
  } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
59,545✔
1461
    TAOS_CHECK_EXIT(tSerializeSUseDbRspImp(pEncoder, &pTask->outputInfo.shuffleDispatcher.dbInfo));
59,503!
1462
    TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pTask->outputInfo.shuffleDispatcher.stbFullName));
119,006!
1463
  }
1464
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->info.delaySchedParam));
279,052!
1465
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->subtableWithoutMd5));
279,052!
1466
  TAOS_CHECK_EXIT(tEncodeCStrWithLen(pEncoder, pTask->reserve, sizeof(pTask->reserve) - 1));
279,052!
1467

1468
  if (pTask->ver >= SSTREAM_TASK_ADD_NOTIFY_VER) {
139,526✔
1469
    TAOS_CHECK_EXIT(tEncodeStreamNotifyInfo(pEncoder, &pTask->notifyInfo));
139,515✔
1470
  }
1471

1472
  tEndEncode(pEncoder);
139,501✔
1473
_exit:
139,531✔
1474
  return code;
139,531✔
1475
}
1476

1477
int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
48,685✔
1478
  int32_t taskId = 0;
48,685✔
1479
  int32_t code = 0;
48,685✔
1480
  int32_t lino;
1481

1482
  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
48,685!
1483
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->ver));
97,378✔
1484
  if (pTask->ver <= SSTREAM_TASK_INCOMPATIBLE_VER || pTask->ver > SSTREAM_TASK_VER) {
48,679!
1485
    TAOS_CHECK_EXIT(TSDB_CODE_INVALID_MSG);
×
1486
  }
1487

1488
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->id.streamId));
97,348!
1489
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->id.taskId));
97,346!
1490
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->info.trigger));
97,351!
1491
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->info.taskLevel));
97,349!
1492
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->outputInfo.type));
97,350!
1493
  TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pTask->msgInfo.msgType));
97,351!
1494

1495
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->status.taskStatus));
97,352!
1496
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->status.schedStatus));
97,349!
1497

1498
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->info.selfChildId));
97,337!
1499
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->info.nodeId));
97,339!
1500
  TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pTask->info.epSet));
48,675!
1501
  TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pTask->info.mnodeEpset));
48,681!
1502

1503
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->chkInfo.checkpointId));
97,357!
1504
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->chkInfo.checkpointVer));
97,356!
1505
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->info.fillHistory));
97,354!
1506

1507
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->hTaskInfo.id.streamId));
97,351!
1508
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &taskId));
48,663!
1509
  pTask->hTaskInfo.id.taskId = taskId;
48,663✔
1510

1511
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->streamTaskId.streamId));
97,332!
1512
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &taskId));
48,673!
1513
  pTask->streamTaskId.taskId = taskId;
48,673✔
1514

1515
  TAOS_CHECK_EXIT(tDecodeU64(pDecoder, (uint64_t*)&pTask->dataRange.range.minVer));
97,348!
1516
  TAOS_CHECK_EXIT(tDecodeU64(pDecoder, (uint64_t*)&pTask->dataRange.range.maxVer));
97,343!
1517
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->dataRange.window.skey));
97,339!
1518
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->dataRange.window.ekey));
97,343!
1519

1520
  int32_t epSz = -1;
48,672✔
1521
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &epSz) < 0);
48,673!
1522

1523
  if ((pTask->upstreamInfo.pList = taosArrayInit(epSz, POINTER_BYTES)) == NULL) {
48,673!
1524
    TAOS_CHECK_EXIT(terrno);
×
1525
  }
1526
  for (int32_t i = 0; i < epSz; i++) {
115,233✔
1527
    SStreamUpstreamEpInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamUpstreamEpInfo));
66,554!
1528
    if (pInfo == NULL) {
66,550!
1529
      TAOS_CHECK_EXIT(terrno);
×
1530
    }
1531
    if ((code = tDecodeStreamEpInfo(pDecoder, pInfo)) < 0) {
66,550!
1532
      taosMemoryFreeClear(pInfo);
×
1533
      goto _exit;
×
1534
    }
1535
    if (taosArrayPush(pTask->upstreamInfo.pList, &pInfo) == NULL) {
133,123!
1536
      TAOS_CHECK_EXIT(terrno);
×
1537
    }
1538
  }
1539

1540
  if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
48,679✔
1541
    TAOS_CHECK_EXIT(tDecodeCStrAlloc(pDecoder, &pTask->exec.qmsg));
51,324!
1542
  }
1543

1544
  if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) {
48,679✔
1545
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->outputInfo.tbSink.stbUid));
47,872!
1546
    TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pTask->outputInfo.tbSink.stbFullName));
23,937!
1547
    pTask->outputInfo.tbSink.pSchemaWrapper = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
23,936!
1548
    if (pTask->outputInfo.tbSink.pSchemaWrapper == NULL) {
23,939!
1549
      TAOS_CHECK_EXIT(terrno);
×
1550
    }
1551
    TAOS_CHECK_EXIT(tDecodeSSchemaWrapper(pDecoder, pTask->outputInfo.tbSink.pSchemaWrapper));
47,919!
1552
  } else if (pTask->outputInfo.type == TASK_OUTPUT__SMA) {
24,744✔
1553
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->outputInfo.smaSink.smaId));
254!
1554
  } else if (pTask->outputInfo.type == TASK_OUTPUT__FETCH) {
24,617!
1555
    TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->outputInfo.fetchSink.reserved));
×
1556
  } else if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
24,617✔
1557
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->outputInfo.fixedDispatcher.taskId));
7,388!
1558
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->outputInfo.fixedDispatcher.nodeId));
7,388!
1559
    TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pTask->outputInfo.fixedDispatcher.epSet));
3,694!
1560
  } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
20,923✔
1561
    TAOS_CHECK_EXIT(tDeserializeSUseDbRspImp(pDecoder, &pTask->outputInfo.shuffleDispatcher.dbInfo));
20,914!
1562
    TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pTask->outputInfo.shuffleDispatcher.stbFullName));
20,915!
1563
  }
1564
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->info.delaySchedParam));
97,395!
1565
  if (pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER) {
48,671✔
1566
    TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->subtableWithoutMd5));
97,341!
1567
  }
1568
  TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pTask->reserve));
48,676!
1569

1570
  if (pTask->ver >= SSTREAM_TASK_ADD_NOTIFY_VER) {
48,673!
1571
    TAOS_CHECK_EXIT(tDecodeStreamNotifyInfo(pDecoder, &pTask->notifyInfo));
48,676!
1572
  }
1573

1574
  tEndDecode(pDecoder);
48,669✔
1575

1576
_exit:
48,683✔
1577
  return code;
48,683✔
1578
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc