• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3531

19 Nov 2024 10:42AM UTC coverage: 60.213% (-0.006%) from 60.219%
#3531

push

travis-ci

web-flow
Merge pull request #28777 from taosdata/fix/3.0/TD-32366

fix:TD-32366/stmt add geometry datatype check

118529 of 252344 branches covered (46.97%)

Branch coverage included in aggregate %.

7 of 48 new or added lines in 3 files covered. (14.58%)

2282 existing lines in 115 files now uncovered.

199096 of 275161 relevant lines covered (72.36%)

6067577.83 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

68.65
/source/libs/stream/src/streamTask.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executor.h"
17
#include "osDir.h"
18
#include "osMemory.h"
19
#include "streamInt.h"
20
#include "streamsm.h"
21
#include "tmisce.h"
22
#include "tstream.h"
23
#include "ttimer.h"
24
#include "wal.h"
25
#include "streamMsg.h"
26

27
static void streamTaskDestroyUpstreamInfo(SUpstreamInfo* pUpstreamInfo);
28
static int32_t streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdated);
29
static int32_t streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdate);
30
static void streamTaskDestroyActiveChkptInfo(SActiveCheckpointInfo* pInfo);
31

32
static int32_t addToTaskset(SArray* pArray, SStreamTask* pTask) {
7,664✔
33
  int32_t childId = taosArrayGetSize(pArray);
7,664✔
34
  pTask->info.selfChildId = childId;
7,664✔
35
  void* p = taosArrayPush(pArray, &pTask);
7,664✔
36
  return (p == NULL) ? terrno : TSDB_CODE_SUCCESS;
7,664!
37
}
38

39
static int32_t doUpdateTaskEpset(SStreamTask* pTask, int32_t nodeId, SEpSet* pEpSet, bool* pUpdated) {
10✔
40
  int32_t code = 0;
10✔
41
  char    buf[512] = {0};
10✔
42

43
  if (pTask->info.nodeId == nodeId) {  // execution task should be moved away
10✔
44
    bool isEqual = isEpsetEqual(&pTask->info.epSet, pEpSet);
2✔
45
    code = epsetToStr(pEpSet, buf, tListLen(buf));
2✔
46
    if (code) { // print error and continue
2!
47
      stError("%s failed to convert epset to str, code:%s", pTask->id.idStr, tstrerror(code));
×
UNCOV
48
      return code;
×
49
    }
50

51
    if (!isEqual) {
2!
52
      (*pUpdated) = true;
×
53
      char tmp[512] = {0};
×
54
      code = epsetToStr(&pTask->info.epSet, tmp, tListLen(tmp));  // only for log file, ignore errors
×
55
      if (code) { // print error and continue
×
56
        stError("%s failed to convert epset to str, code:%s", pTask->id.idStr, tstrerror(code));
×
UNCOV
57
        return code;
×
58
      }
59

60
      epsetAssign(&pTask->info.epSet, pEpSet);
×
UNCOV
61
      stDebug("s-task:0x%x (vgId:%d) self node epset is updated %s, old:%s", pTask->id.taskId, nodeId, buf, tmp);
×
62
    } else {
63
      stDebug("s-task:0x%x (vgId:%d) not updated task epset, since epset identical, %s", pTask->id.taskId, nodeId, buf);
2!
64
    }
65
  }
66

67
  // check for the dispatch info and the upstream task info
68
  int32_t level = pTask->info.taskLevel;
10✔
69
  if (level == TASK_LEVEL__SOURCE) {
10✔
70
    code = streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet, pUpdated);
4✔
71
  } else if (level == TASK_LEVEL__AGG) {
6✔
72
    code = streamTaskUpdateUpstreamInfo(pTask, nodeId, pEpSet, pUpdated);
2✔
73
    code = streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet, pUpdated);
2✔
74
  } else {  // TASK_LEVEL__SINK
75
    code = streamTaskUpdateUpstreamInfo(pTask, nodeId, pEpSet, pUpdated);
4✔
76
  }
77

78
  return code;
10✔
79
}
80

81
static void freeItem(void* p) {
×
82
  SStreamContinueExecInfo* pInfo = p;
×
83
  rpcFreeCont(pInfo->msg.pCont);
×
UNCOV
84
}
×
85

86
static void freeUpstreamItem(void* p) {
42,857✔
87
  SStreamUpstreamEpInfo** pInfo = p;
42,857✔
88
  taosMemoryFree(*pInfo);
42,857✔
89
}
42,861✔
90

91
static SStreamUpstreamEpInfo* createStreamTaskEpInfo(const SStreamTask* pTask) {
10,850✔
92
  SStreamUpstreamEpInfo* pEpInfo = taosMemoryMalloc(sizeof(SStreamUpstreamEpInfo));
10,850✔
93
  if (pEpInfo == NULL) {
10,850!
94
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
UNCOV
95
    return NULL;
×
96
  }
97

98
  pEpInfo->childId = pTask->info.selfChildId;
10,850✔
99
  pEpInfo->epSet = pTask->info.epSet;
10,850✔
100
  pEpInfo->nodeId = pTask->info.nodeId;
10,850✔
101
  pEpInfo->taskId = pTask->id.taskId;
10,850✔
102
  pEpInfo->stage = -1;
10,850✔
103

104
  return pEpInfo;
10,850✔
105
}
106

107
int32_t tNewStreamTask(int64_t streamId, int8_t taskLevel, SEpSet* pEpset, bool fillHistory, int32_t trigger,
7,664✔
108
                       int64_t triggerParam, SArray* pTaskList, bool hasFillhistory, int8_t subtableWithoutMd5,
109
                       SStreamTask** p) {
110
  *p = NULL;
7,664✔
111

112
  SStreamTask* pTask = (SStreamTask*)taosMemoryCalloc(1, sizeof(SStreamTask));
7,664✔
113
  if (pTask == NULL) {
7,664!
UNCOV
114
    stError("s-task:0x%" PRIx64 " failed malloc new stream task, size:%d, code:%s", streamId,
×
115
            (int32_t)sizeof(SStreamTask), tstrerror(terrno));
UNCOV
116
    return terrno;
×
117
  }
118

119
  pTask->ver = SSTREAM_TASK_VER;
7,664✔
120
  pTask->id.taskId = tGenIdPI32();
7,664✔
121
  pTask->id.streamId = streamId;
7,664✔
122

123
  pTask->info.taskLevel = taskLevel;
7,664✔
124
  pTask->info.fillHistory = fillHistory;
7,664✔
125
  pTask->info.trigger = trigger;
7,664✔
126
  pTask->info.delaySchedParam = triggerParam;
7,664✔
127
  pTask->subtableWithoutMd5 = subtableWithoutMd5;
7,664✔
128

129
  int32_t code = streamCreateStateMachine(pTask);
7,664✔
130
  if (pTask->status.pSM == NULL || code != TSDB_CODE_SUCCESS) {
7,664!
131
    taosMemoryFreeClear(pTask);
×
UNCOV
132
    return code;
×
133
  }
134

135
  char buf[128] = {0};
7,664✔
136
  sprintf(buf, "0x%" PRIx64 "-0x%x", pTask->id.streamId, pTask->id.taskId);
7,664✔
137

138
  pTask->id.idStr = taosStrdup(buf);
7,664✔
139
  if (pTask->id.idStr == NULL) {
7,664!
140
    stError("s-task:0x%x failed to build task id, code: out of memory", pTask->id.taskId);
×
UNCOV
141
    return terrno;
×
142
  }
143

144
  pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
7,664✔
145
  pTask->status.taskStatus = fillHistory ? TASK_STATUS__SCAN_HISTORY : TASK_STATUS__READY;
7,664✔
146
  pTask->inputq.status = TASK_INPUT_STATUS__NORMAL;
7,664✔
147
  pTask->outputq.status = TASK_OUTPUT_STATUS__NORMAL;
7,664✔
148

149
  pTask->taskCheckInfo.pList = taosArrayInit(4, sizeof(SDownstreamStatusInfo));
7,664✔
150
  code = taosThreadMutexInit(&pTask->taskCheckInfo.checkInfoLock, NULL);
7,664✔
151
  if (code) {
7,664!
UNCOV
152
    return code;
×
153
  }
154

155
  if (fillHistory && !hasFillhistory) {
7,664!
156
    stError("s-task:0x%x create task failed, due to inconsistent fill-history flag", pTask->id.taskId);
×
UNCOV
157
    return TSDB_CODE_INVALID_PARA;
×
158
  }
159

160
  epsetAssign(&(pTask->info.mnodeEpset), pEpset);
7,664✔
161

162
  code = addToTaskset(pTaskList, pTask);
7,664✔
163
  *p = pTask;
7,664✔
164

165
  return code;
7,664✔
166
}
167

168
int32_t tDecodeStreamTaskChkInfo(SDecoder* pDecoder, SCheckpointInfo* pChkpInfo) {
139✔
169
  int64_t skip64;
170
  int8_t  skip8;
171
  int32_t skip32;
172
  int16_t skip16;
173
  SEpSet  epSet;
174

175
  if (tStartDecode(pDecoder) < 0) return -1;
139!
176
  if (tDecodeI64(pDecoder, &pChkpInfo->msgVer) < 0) return -1;
278!
177
  // if (ver <= SSTREAM_TASK_INCOMPATIBLE_VER) return -1;
178

179
  if (tDecodeI64(pDecoder, &skip64) < 0) return -1;
139!
180
  if (tDecodeI32(pDecoder, &skip32) < 0) return -1;
139!
181
  if (tDecodeI32(pDecoder, &skip32) < 0) return -1;
139!
182
  if (tDecodeI8(pDecoder, &skip8) < 0) return -1;
139!
183
  if (tDecodeI8(pDecoder, &skip8) < 0) return -1;
139!
184
  if (tDecodeI16(pDecoder, &skip16) < 0) return -1;
139!
185

186
  if (tDecodeI8(pDecoder, &skip8) < 0) return -1;
139!
187
  if (tDecodeI8(pDecoder, &skip8) < 0) return -1;
139!
188

189
  if (tDecodeI32(pDecoder, &skip32) < 0) return -1;
139!
190
  if (tDecodeI32(pDecoder, &skip32) < 0) return -1;
139!
191
  if (tDecodeSEpSet(pDecoder, &epSet) < 0) return -1;
139!
192
  if (tDecodeSEpSet(pDecoder, &epSet) < 0) return -1;
139!
193

194
  if (tDecodeI64(pDecoder, &pChkpInfo->checkpointId) < 0) return -1;
278!
195
  if (tDecodeI64(pDecoder, &pChkpInfo->checkpointVer) < 0) return -1;
278!
196

197
  tEndDecode(pDecoder);
139✔
198
  return 0;
139✔
199
}
200

201
int32_t tDecodeStreamTaskId(SDecoder* pDecoder, STaskId* pTaskId) {
4✔
202
  int64_t ver;
203
  if (tStartDecode(pDecoder) < 0) return -1;
4!
204
  if (tDecodeI64(pDecoder, &ver) < 0) return -1;
4!
205
  if (ver <= SSTREAM_TASK_INCOMPATIBLE_VER) return -1;
4!
206

207
  if (tDecodeI64(pDecoder, &pTaskId->streamId) < 0) return -1;
8!
208

209
  int32_t taskId = 0;
4✔
210
  if (tDecodeI32(pDecoder, &taskId) < 0) return -1;
4!
211

212
  pTaskId->taskId = taskId;
4✔
213
  tEndDecode(pDecoder);
4✔
214
  return 0;
4✔
215
}
216

217
void tFreeStreamTask(void* pParam) {
30,503✔
218
  char*        p = NULL;
30,503✔
219
  SStreamTask* pTask = pParam;
30,503✔
220
  int32_t      taskId = pTask->id.taskId;
30,503✔
221

222
  STaskExecStatisInfo* pStatis = &pTask->execInfo;
30,503✔
223

224
  ETaskStatus status1 = TASK_STATUS__UNINIT;
30,503✔
225
  streamMutexLock(&pTask->lock);
30,503✔
226
  if (pTask->status.pSM != NULL) {
30,503✔
227
    SStreamTaskState status = streamTaskGetStatus(pTask);
15,343✔
228
    p = status.name;
15,343✔
229
    status1 = status.state;
15,343✔
230
  }
231
  streamMutexUnlock(&pTask->lock);
30,503✔
232

233
  stDebug("start to free s-task:0x%x %p, state:%s, refId:%" PRId64, taskId, pTask, p, pTask->id.refId);
30,503✔
234

235
  SCheckpointInfo* pCkInfo = &pTask->chkInfo;
30,503✔
236
  stDebug("s-task:0x%x task exec summary: create:%" PRId64 ", init:%" PRId64 ", start:%" PRId64
30,503✔
237
          ", updateCount:%d latestUpdate:%" PRId64 ", latestCheckPoint:%" PRId64 ", ver:%" PRId64
238
          " nextProcessVer:%" PRId64 ", checkpointCount:%d",
239
          taskId, pStatis->created, pStatis->checkTs, pStatis->readyTs, pStatis->updateCount, pStatis->latestUpdateTs,
240
          pCkInfo->checkpointId, pCkInfo->checkpointVer, pCkInfo->nextProcessVer, pStatis->checkpoint);
241

242
  if (pTask->schedInfo.pDelayTimer != NULL) {
30,503✔
243
    streamTmrStop(pTask->schedInfo.pDelayTimer);
1,008✔
244
    pTask->schedInfo.pDelayTimer = NULL;
1,008✔
245
  }
246

247
  if (pTask->hTaskInfo.pTimer != NULL) {
30,503✔
248
    streamTmrStop(pTask->hTaskInfo.pTimer);
1,067✔
249
    pTask->hTaskInfo.pTimer = NULL;
1,067✔
250
  }
251

252
  if (pTask->msgInfo.pRetryTmr != NULL) {
30,503✔
253
    streamTmrStop(pTask->msgInfo.pRetryTmr);
3,202✔
254
    pTask->msgInfo.pRetryTmr = NULL;
3,202✔
255
  }
256

257
  if (pTask->inputq.queue) {
30,503✔
258
    streamQueueClose(pTask->inputq.queue, pTask->id.taskId);
7,645✔
259
    pTask->inputq.queue = NULL;
7,645✔
260
  }
261

262
  if (pTask->outputq.queue) {
30,503✔
263
    streamQueueClose(pTask->outputq.queue, pTask->id.taskId);
7,645✔
264
    pTask->outputq.queue = NULL;
7,645✔
265
  }
266

267
  if (pTask->exec.qmsg) {
30,503✔
268
    taosMemoryFree(pTask->exec.qmsg);
16,244✔
269
  }
270

271
  if (pTask->exec.pExecutor) {
30,503✔
272
    qDestroyTask(pTask->exec.pExecutor);
4,033✔
273
    pTask->exec.pExecutor = NULL;
4,032✔
274
  }
275

276
  if (pTask->exec.pWalReader != NULL) {
30,502✔
277
    walCloseReader(pTask->exec.pWalReader);
3,861✔
278
    pTask->exec.pWalReader = NULL;
3,862✔
279
  }
280

281
  streamClearChkptReadyMsg(pTask->chkInfo.pActiveInfo);
30,503✔
282

283
  if (pTask->msgInfo.pData != NULL) {
30,503✔
284
    clearBufferedDispatchMsg(pTask);
33✔
285
  }
286

287
  if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) {
30,503✔
288
    tDeleteSchemaWrapper(pTask->outputInfo.tbSink.pSchemaWrapper);
14,703!
289
    taosMemoryFree(pTask->outputInfo.tbSink.pTSchema);
14,703✔
290
    tSimpleHashCleanup(pTask->outputInfo.tbSink.pTbInfo);
14,703✔
291
    tDeleteSchemaWrapper(pTask->outputInfo.tbSink.pTagSchema);
14,703✔
292
  } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
15,800✔
293
    taosArrayDestroy(pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos);
13,008✔
294
  }
295

296
  streamTaskCleanupCheckInfo(&pTask->taskCheckInfo);
30,503✔
297
  streamFreeTaskState(pTask, pTask->status.removeBackendFiles ? 1 : 0);
30,503✔
298

299
  if (pTask->pNameMap) {
30,502✔
300
    tSimpleHashCleanup(pTask->pNameMap);
1,378✔
301
  }
302

303
  streamDestroyStateMachine(pTask->status.pSM);
30,502✔
304
  pTask->status.pSM = NULL;
30,503✔
305

306
  streamTaskDestroyUpstreamInfo(&pTask->upstreamInfo);
30,503✔
307

308
  taosMemoryFree(pTask->outputInfo.pTokenBucket);
30,503✔
309
  streamMutexDestroy(&pTask->lock);
30,503✔
310

311
  taosArrayDestroy(pTask->msgInfo.pSendInfo);
30,503✔
312
  pTask->msgInfo.pSendInfo = NULL;
30,503✔
313
  streamMutexDestroy(&pTask->msgInfo.lock);
30,503✔
314

315
  taosArrayDestroy(pTask->outputInfo.pNodeEpsetUpdateList);
30,503✔
316
  pTask->outputInfo.pNodeEpsetUpdateList = NULL;
30,503✔
317

318
  if (pTask->id.idStr != NULL) {
30,503✔
319
    taosMemoryFree((void*)pTask->id.idStr);
15,309✔
320
  }
321

322
  streamTaskDestroyActiveChkptInfo(pTask->chkInfo.pActiveInfo);
30,503✔
323
  pTask->chkInfo.pActiveInfo = NULL;
30,503✔
324

325
  taosMemoryFree(pTask);
30,503✔
326
  stDebug("s-task:0x%x free task completed", taskId);
30,503✔
327
}
30,503✔
328

329
void streamFreeTaskState(SStreamTask* pTask, int8_t remove) {
30,502✔
330
  stDebug("s-task:0x%x start to free task state/backend", pTask->id.taskId);
30,502✔
331
  if (pTask->pState != NULL) {
30,503✔
332
    stDebug("s-task:0x%x start to free task state", pTask->id.taskId);
4,033✔
333
    streamStateClose(pTask->pState, remove);
4,033✔
334

335
    if (remove) taskDbSetClearFileFlag(pTask->pBackend);
4,033✔
336
    taskDbRemoveRef(pTask->pBackend);
4,033✔
337
    pTask->pBackend = NULL;
4,033✔
338
    pTask->pState = NULL;
4,033✔
339
  } else {
340
    stDebug("s-task:0x%x task state is NULL, may del backend:%s", pTask->id.taskId,
26,470✔
341
            pTask->backendPath ? pTask->backendPath : "NULL");
342
    if (remove) {
26,470✔
343
      if (pTask->backendPath != NULL) {
1,560!
344
        stDebug("s-task:0x%x task state is NULL, do del backend:%s", pTask->id.taskId, pTask->backendPath);
1,560!
345
        taosRemoveDir(pTask->backendPath);
1,560✔
346
      }
347
    }
348
  }
349

350
  if (pTask->backendPath != NULL) {
30,503✔
351
    taosMemoryFree(pTask->backendPath);
7,645✔
352
    pTask->backendPath = NULL;
7,645✔
353
  }
354
}
30,503✔
355

356
static void setInitialVersionInfo(SStreamTask* pTask, int64_t ver) {
7,770✔
357
  SCheckpointInfo* pChkInfo = &pTask->chkInfo;
7,770✔
358
  SDataRange*      pRange = &pTask->dataRange;
7,770✔
359

360
  // only set the version info for stream tasks without fill-history task
361
  if ((pTask->info.fillHistory == 0) && (!HAS_RELATED_FILLHISTORY_TASK(pTask))) {
7,770✔
362
    pChkInfo->checkpointVer = ver - 1;  // only update when generating checkpoint
2,229✔
363
    pChkInfo->processedVer = ver - 1;   // already processed version
2,229✔
364
    pChkInfo->nextProcessVer = ver;     // next processed version
2,229✔
365

366
    pRange->range.maxVer = ver;
2,229✔
367
    pRange->range.minVer = ver;
2,229✔
368
  } else {
369
    // the initial value of processedVer/nextProcessVer/checkpointVer for stream task with related fill-history task
370
    // is set at the mnode.
371
    if (pTask->info.fillHistory == 1) {
5,541✔
372
      pChkInfo->checkpointVer = pRange->range.maxVer;
2,771✔
373
      pChkInfo->processedVer = pRange->range.maxVer;
2,771✔
374
      pChkInfo->nextProcessVer = pRange->range.maxVer + 1;
2,771✔
375
    } else {
376
      pChkInfo->checkpointVer = pRange->range.minVer - 1;
2,770✔
377
      pChkInfo->processedVer = pRange->range.minVer - 1;
2,770✔
378
      pChkInfo->nextProcessVer = pRange->range.minVer;
2,770✔
379

380
      {  // for compatible purpose, remove it later
381
        if (pRange->range.minVer == 0) {
2,770✔
382
          pChkInfo->checkpointVer = 0;
1,395✔
383
          pChkInfo->processedVer = 0;
1,395✔
384
          pChkInfo->nextProcessVer = 1;
1,395✔
385
          stDebug("s-task:%s update the processedVer to 0 from -1 due to compatible purpose", pTask->id.idStr);
1,395✔
386
        }
387
      }
388
    }
389
  }
390
}
7,770✔
391

392
int32_t streamTaskSetBackendPath(SStreamTask* pTask) {
7,770✔
393
  int64_t streamId = 0;
7,770✔
394
  int32_t taskId = 0;
7,770✔
395

396
  if (pTask->info.fillHistory) {
7,770✔
397
    streamId = pTask->streamTaskId.streamId;
2,771✔
398
    taskId = pTask->streamTaskId.taskId;
2,771✔
399
  } else {
400
    streamId = pTask->id.streamId;
4,999✔
401
    taskId = pTask->id.taskId;
4,999✔
402
  }
403

404
  char    id[128] = {0};
7,770✔
405
  int32_t nBytes = sprintf(id, "0x%" PRIx64 "-0x%x", streamId, taskId);
7,770✔
406
  if (nBytes < 0 || nBytes >= sizeof(id)) {
7,770!
UNCOV
407
    return TSDB_CODE_OUT_OF_BUFFER;
×
408
  }
409

410
  int32_t len = strlen(pTask->pMeta->path);
7,770✔
411
  pTask->backendPath = (char*)taosMemoryMalloc(len + nBytes + 2);
7,770✔
412
  if (pTask->backendPath == NULL) {
7,770!
UNCOV
413
    return terrno;
×
414
  }
415

416
  (void)sprintf(pTask->backendPath, "%s%s%s", pTask->pMeta->path, TD_DIRSEP, id);
7,770✔
417
  stDebug("s-task:%s set backend path:%s", pTask->id.idStr, pTask->backendPath);
7,770✔
418

419
  return 0;
7,770✔
420
}
421

422
int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, int64_t ver) {
7,770✔
423
  int32_t code = createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId, &pTask->id.idStr);
7,770✔
424
  if (code) {
7,770!
425
    stError("0x%x failed create stream task id str, code:%s", pTask->id.taskId, tstrerror(code));
×
UNCOV
426
    return code;
×
427
  }
428

429
  pTask->id.refId = 0;
7,770✔
430
  pTask->inputq.status = TASK_INPUT_STATUS__NORMAL;
7,770✔
431
  pTask->outputq.status = TASK_OUTPUT_STATUS__NORMAL;
7,770✔
432

433
  int32_t code1 = streamQueueOpen(512 << 10, &pTask->inputq.queue);
7,770✔
434
  int32_t code2 = streamQueueOpen(512 << 10, &pTask->outputq.queue);
7,770✔
435
  if (code1 || code2) {
7,770!
436
    stError("s-task:%s failed to prepare the input/output queue, initialize task failed", pTask->id.idStr);
×
UNCOV
437
    return TSDB_CODE_OUT_OF_MEMORY;
×
438
  }
439

440
  pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
7,770✔
441

442
  code = streamCreateStateMachine(pTask);
7,770✔
443
  if (pTask->status.pSM == NULL || code != TSDB_CODE_SUCCESS) {
7,770!
UNCOV
444
    stError("s-task:%s failed create state-machine for stream task, initialization failed, code:%s", pTask->id.idStr,
×
445
            tstrerror(code));
UNCOV
446
    return code;
×
447
  }
448

449
  pTask->execInfo.created = taosGetTimestampMs();
7,770✔
450
  setInitialVersionInfo(pTask, ver);
7,770✔
451

452
  pTask->pMeta = pMeta;
7,770✔
453
  pTask->pMsgCb = pMsgCb;
7,770✔
454
  pTask->msgInfo.pSendInfo = taosArrayInit(4, sizeof(SDispatchEntry));
7,770✔
455
  if (pTask->msgInfo.pSendInfo == NULL) {
7,770!
456
    stError("s-task:%s failed to create sendInfo struct for stream task, code:Out of memory", pTask->id.idStr);
×
UNCOV
457
    return terrno;
×
458
  }
459

460
  code = taosThreadMutexInit(&pTask->msgInfo.lock, NULL);
7,770✔
461
  if (code) {
7,770!
462
    stError("s-task:0x%x failed to init msgInfo mutex, code:%s", pTask->id.taskId, tstrerror(code));
×
UNCOV
463
    return code;
×
464
  }
465

466
  TdThreadMutexAttr attr = {0};
7,770✔
467
  code = taosThreadMutexAttrInit(&attr);
7,770✔
468
  if (code != 0) {
7,770!
469
    stError("s-task:%s initElapsed mutex attr failed, code:%s", pTask->id.idStr, tstrerror(code));
×
UNCOV
470
    return code;
×
471
  }
472

473
  code = taosThreadMutexAttrSetType(&attr, PTHREAD_MUTEX_RECURSIVE);
7,770✔
474
  if (code != 0) {
7,770!
475
    stError("s-task:%s set mutex attr recursive, code:%s", pTask->id.idStr, tstrerror(code));
×
UNCOV
476
    return code;
×
477
  }
478

479
  code = taosThreadMutexInit(&pTask->lock, &attr);
7,770✔
480
  if (code) {
7,770!
UNCOV
481
    return code;
×
482
  }
483

484
  code = taosThreadMutexAttrDestroy(&attr);
7,770✔
485
  if (code) {
7,770!
UNCOV
486
    return code;
×
487
  }
488

489
  streamTaskOpenAllUpstreamInput(pTask);
7,770✔
490

491
  STaskOutputInfo* pOutputInfo = &pTask->outputInfo;
7,770✔
492
  pOutputInfo->pTokenBucket = taosMemoryCalloc(1, sizeof(STokenBucket));
7,770✔
493
  if (pOutputInfo->pTokenBucket == NULL) {
7,770!
494
    stError("s-task:%s failed to prepare the tokenBucket, code:%s", pTask->id.idStr, tstrerror(terrno));
×
UNCOV
495
    return terrno;
×
496
  }
497

498
  // 2MiB per second for sink task
499
  // 50 times sink operator per second
500
  code = streamTaskInitTokenBucket(pOutputInfo->pTokenBucket, 35, 35, tsSinkDataRate, pTask->id.idStr);
7,770✔
501
  if (code) {
7,769!
UNCOV
502
    return code;
×
503
  }
504

505
  pOutputInfo->pNodeEpsetUpdateList = taosArrayInit(4, sizeof(SDownstreamTaskEpset));
7,769✔
506
  if (pOutputInfo->pNodeEpsetUpdateList == NULL) {
7,770!
507
    stError("s-task:%s failed to prepare downstreamUpdateList, code:%s", pTask->id.idStr, tstrerror(terrno));
×
UNCOV
508
    return terrno;
×
509
  }
510

511
  pTask->taskCheckInfo.pList = taosArrayInit(4, sizeof(SDownstreamStatusInfo));
7,770✔
512
  if (pTask->taskCheckInfo.pList == NULL) {
7,770!
513
    stError("s-task:%s failed to prepare taskCheckInfo list, code:%s", pTask->id.idStr, tstrerror(terrno));
×
UNCOV
514
    return terrno;
×
515
  }
516

517
  if (pTask->chkInfo.pActiveInfo == NULL) {
7,770!
518
    code = streamTaskCreateActiveChkptInfo(&pTask->chkInfo.pActiveInfo);
7,770✔
519
    if (code) {
7,770!
520
      stError("s-task:%s failed to create active checkpoint info, code:%s", pTask->id.idStr, tstrerror(code));
×
UNCOV
521
      return code;
×
522
    }
523
  }
524

525
  return streamTaskSetBackendPath(pTask);
7,770✔
526
}
527

528
int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask) {
56,783✔
529
  if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
56,783✔
530
    return 0;
3,565✔
531
  }
532

533
  int32_t type = pTask->outputInfo.type;
53,218✔
534
  if (type == TASK_OUTPUT__TABLE) {
53,218✔
535
    return 0;
183✔
536
  } else if (type == TASK_OUTPUT__FIXED_DISPATCH) {
53,035✔
537
    return 1;
8,574✔
538
  } else {
539
    SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
44,461✔
540
    return taosArrayGetSize(vgInfo);
44,461✔
541
  }
542
}
543

544
int32_t streamTaskGetNumOfUpstream(const SStreamTask* pTask) { return taosArrayGetSize(pTask->upstreamInfo.pList); }
9,775✔
545

546
int32_t streamTaskSetUpstreamInfo(SStreamTask* pTask, const SStreamTask* pUpstreamTask) {
10,850✔
547
  SStreamUpstreamEpInfo* pEpInfo = createStreamTaskEpInfo(pUpstreamTask);
10,850✔
548
  if (pEpInfo == NULL) {
10,850!
UNCOV
549
    return terrno;
×
550
  }
551

552
  if (pTask->upstreamInfo.pList == NULL) {
10,850✔
553
    pTask->upstreamInfo.pList = taosArrayInit(4, POINTER_BYTES);
3,783✔
554
  }
555

556
  void* p = taosArrayPush(pTask->upstreamInfo.pList, &pEpInfo);
10,850✔
557
  return (p == NULL) ? terrno : TSDB_CODE_SUCCESS;
10,850!
558
}
559

560
int32_t streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdated) {
6✔
561
  int32_t code = 0;
6✔
562
  char    buf[512] = {0};
6✔
563
  code = epsetToStr(pEpSet, buf, tListLen(buf));  // ignore error since it is only for log file.
6✔
564
  if (code != 0) {  // print error and continue
6!
565
    stError("%s failed to convert epset to str, code:%s", pTask->id.idStr, tstrerror(code));
×
UNCOV
566
    return code;
×
567
  }
568

569
  int32_t numOfUpstream = taosArrayGetSize(pTask->upstreamInfo.pList);
6✔
570
  for (int32_t i = 0; i < numOfUpstream; ++i) {
10✔
571
    SStreamUpstreamEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i);
8✔
572
    if (pInfo->nodeId == nodeId) {
8✔
573
      bool equal = isEpsetEqual(&pInfo->epSet, pEpSet);
4✔
574
      if (!equal) {
4!
UNCOV
575
        *pUpdated = true;
×
576

577
        char tmp[512] = {0};
×
578
        code = epsetToStr(&pInfo->epSet, tmp, tListLen(tmp));
×
579
        if (code != 0) {  // print error and continue
×
580
          stError("%s failed to convert epset to str, code:%s", pTask->id.idStr, tstrerror(code));
×
UNCOV
581
          return code;
×
582
        }
583

584
        epsetAssign(&pInfo->epSet, pEpSet);
×
UNCOV
585
        stDebug("s-task:0x%x update the upstreamInfo taskId:0x%x(nodeId:%d) newEpset:%s old:%s", pTask->id.taskId,
×
586
                pInfo->taskId, nodeId, buf, tmp);
587
      } else {
588
        stDebug("s-task:0x%x not update upstreamInfo, since identical, task:0x%x(nodeId:%d) epset:%s", pTask->id.taskId,
4!
589
                pInfo->taskId, nodeId, buf);
590
      }
591

592
      break;
4✔
593
    }
594
  }
595

596
  return code;
6✔
597
}
598

599
void streamTaskDestroyUpstreamInfo(SUpstreamInfo* pUpstreamInfo) {
30,502✔
600
  if (pUpstreamInfo->pList != NULL) {
30,502✔
601
    taosArrayDestroyEx(pUpstreamInfo->pList, freeUpstreamItem);
26,588✔
602
    pUpstreamInfo->numOfClosed = 0;
26,588✔
603
    pUpstreamInfo->pList = NULL;
26,588✔
604
  }
605
}
30,502✔
606

607
void streamTaskSetFixedDownstreamInfo(SStreamTask* pTask, const SStreamTask* pDownstreamTask) {
718✔
608
  STaskDispatcherFixed* pDispatcher = &pTask->outputInfo.fixedDispatcher;
718✔
609
  pDispatcher->taskId = pDownstreamTask->id.taskId;
718✔
610
  pDispatcher->nodeId = pDownstreamTask->info.nodeId;
718✔
611
  pDispatcher->epSet = pDownstreamTask->info.epSet;
718✔
612

613
  pTask->outputInfo.type = TASK_OUTPUT__FIXED_DISPATCH;
718✔
614
  pTask->msgInfo.msgType = TDMT_STREAM_TASK_DISPATCH;
718✔
615
}
718✔
616

617
int32_t streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdated) {
6✔
618
  char    buf[512] = {0};
6✔
619
  int32_t code = epsetToStr(pEpSet, buf, tListLen(buf));  // ignore the error since only for log files.
6✔
620
  if (code != 0) {                                        // print error and continue
6!
621
    stError("%s failed to convert epset to str, code:%s", pTask->id.idStr, tstrerror(code));
×
UNCOV
622
    return code;
×
623
  }
624

625
  int32_t id = pTask->id.taskId;
6✔
626
  int8_t  type = pTask->outputInfo.type;
6✔
627

628
  if (type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
6✔
629
    SArray* pVgs = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
2✔
630

631
    for (int32_t i = 0; i < taosArrayGetSize(pVgs); i++) {
6✔
632
      SVgroupInfo* pVgInfo = taosArrayGet(pVgs, i);
4✔
633
      if (pVgInfo == NULL) {
4!
UNCOV
634
        continue;
×
635
      }
636

637
      if (pVgInfo->vgId == nodeId) {
4!
638
        bool isEqual = isEpsetEqual(&pVgInfo->epSet, pEpSet);
×
639
        if (!isEqual) {
×
UNCOV
640
          *pUpdated = true;
×
641

642
          char tmp[512] = {0};
×
643
          code = epsetToStr(&pVgInfo->epSet, tmp, tListLen(tmp));
×
644
          if (code != 0) {  // print error and continue
×
645
            stError("%s failed to convert epset to str, code:%s", pTask->id.idStr, tstrerror(code));
×
UNCOV
646
            return code;
×
647
          }
648

649
          epsetAssign(&pVgInfo->epSet, pEpSet);
×
UNCOV
650
          stDebug("s-task:0x%x update dispatch info, task:0x%x(nodeId:%d) newEpset:%s old:%s", id, pVgInfo->taskId,
×
651
                  nodeId, buf, tmp);
652
        } else {
UNCOV
653
          stDebug("s-task:0x%x not update dispatch info, since identical, task:0x%x(nodeId:%d) epset:%s", id,
×
654
                  pVgInfo->taskId, nodeId, buf);
655
        }
UNCOV
656
        break;
×
657
      }
658
    }
659
  } else if (type == TASK_OUTPUT__FIXED_DISPATCH) {
4!
660
    STaskDispatcherFixed* pDispatcher = &pTask->outputInfo.fixedDispatcher;
4✔
661
    if (pDispatcher->nodeId == nodeId) {
4!
662
      bool equal = isEpsetEqual(&pDispatcher->epSet, pEpSet);
4✔
663
      if (!equal) {
4!
UNCOV
664
        *pUpdated = true;
×
665

666
        char tmp[512] = {0};
×
667
        code = epsetToStr(&pDispatcher->epSet, tmp, tListLen(tmp));
×
668
        if (code != 0) {  // print error and continue
×
669
          stError("%s failed to convert epset to str, code:%s", pTask->id.idStr, tstrerror(code));
×
UNCOV
670
          return code;
×
671
        }
672

673
        epsetAssign(&pDispatcher->epSet, pEpSet);
×
UNCOV
674
        stDebug("s-task:0x%x update dispatch info, task:0x%x(nodeId:%d) newEpset:%s old:%s", id, pDispatcher->taskId,
×
675
                nodeId, buf, tmp);
676
      } else {
677
        stDebug("s-task:0x%x not update dispatch info, since identical, task:0x%x(nodeId:%d) epset:%s", id,
4!
678
                pDispatcher->taskId, nodeId, buf);
679
      }
680
    }
681
  }
682

683
  return code;
6✔
684
}
685

686
int32_t streamTaskStop(SStreamTask* pTask) {
1,755✔
687
  int32_t     vgId = pTask->pMeta->vgId;
1,755✔
688
  int64_t     st = taosGetTimestampMs();
1,755✔
689
  const char* id = pTask->id.idStr;
1,755✔
690

691
  int32_t code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_STOP);
1,755✔
692
  if (code) {
1,755!
693
    stError("failed to handle STOP event, s-task:%s, code:%s", id, tstrerror(code));
×
UNCOV
694
    return code;
×
695
  }
696

697
  if (pTask->info.taskLevel != TASK_LEVEL__SINK && pTask->exec.pExecutor != NULL) {
1,755✔
698
    code = qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS);
1,003✔
699
    if (code != TSDB_CODE_SUCCESS) {
1,003!
UNCOV
700
      stError("s-task:%s failed to kill task related query handle, code:%s", id, tstrerror(code));
×
701
    }
702
  }
703

704
  while (!streamTaskIsIdle(pTask)) {
1,755!
UNCOV
705
    stDebug("s-task:%s level:%d wait for task to be idle and then close, check again in 100ms", id,
×
706
            pTask->info.taskLevel);
UNCOV
707
    taosMsleep(100);
×
708
  }
709

710
  int64_t el = taosGetTimestampMs() - st;
1,755✔
711
  stDebug("vgId:%d s-task:%s is closed in %" PRId64 " ms", vgId, id, el);
1,755✔
712
  return code;
1,755✔
713
}
714

715
bool streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList) {
10✔
716
  STaskExecStatisInfo* p = &pTask->execInfo;
10✔
717

718
  int32_t numOfNodes = taosArrayGetSize(pNodeList);
10✔
719
  int64_t prevTs = p->latestUpdateTs;
10✔
720

721
  p->latestUpdateTs = taosGetTimestampMs();
10✔
722
  p->updateCount += 1;
10✔
723
  stDebug("s-task:0x%x update task nodeEp epset, updatedNodes:%d, updateCount:%d, prevTs:%" PRId64, pTask->id.taskId,
10!
724
          numOfNodes, p->updateCount, prevTs);
725

726
  bool updated = false;
10✔
727
  for (int32_t i = 0; i < numOfNodes; ++i) {
20✔
728
    SNodeUpdateInfo* pInfo = taosArrayGet(pNodeList, i);
10✔
729
    if (pInfo == NULL) {
10!
UNCOV
730
      continue;
×
731
    }
732

733
    int32_t code = doUpdateTaskEpset(pTask, pInfo->nodeId, &pInfo->newEp, &updated);
10✔
734
    if (code) {
10!
UNCOV
735
      stError("s-task:0x%x failed to update the task nodeEp epset, code:%s", pTask->id.taskId, tstrerror(code));
×
736
    }
737
  }
738

739
  return updated;
10✔
740
}
741

742
void streamTaskResetUpstreamStageInfo(SStreamTask* pTask) {
7,770✔
743
  if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
7,770✔
744
    return;
3,924✔
745
  }
746

747
  int32_t size = taosArrayGetSize(pTask->upstreamInfo.pList);
3,846✔
748
  for (int32_t i = 0; i < size; ++i) {
14,752✔
749
    SStreamUpstreamEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i);
10,906✔
750
    pInfo->stage = -1;
10,906✔
751
  }
752

753
  stDebug("s-task:%s reset all upstream tasks stage info", pTask->id.idStr);
3,846✔
754
}
755

756
void streamTaskOpenAllUpstreamInput(SStreamTask* pTask) {
12,696✔
757
  int32_t num = taosArrayGetSize(pTask->upstreamInfo.pList);
12,696✔
758
  if (num == 0) {
12,695✔
759
    return;
6,394✔
760
  }
761

762
  for (int32_t i = 0; i < num; ++i) {
25,176✔
763
    SStreamUpstreamEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i);
18,875✔
764
    pInfo->dataAllowed = true;
18,875✔
765
  }
766

767
  pTask->upstreamInfo.numOfClosed = 0;
6,301✔
768
  stDebug("s-task:%s opening up inputQ for %d upstream tasks", pTask->id.idStr, num);
6,301✔
769
}
770

771
void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId) {
4,224✔
772
  SStreamUpstreamEpInfo* pInfo = NULL;
4,224✔
773
  streamTaskGetUpstreamTaskEpInfo(pTask, taskId, &pInfo);
4,224✔
774

775
  if ((pInfo != NULL) && pInfo->dataAllowed) {
4,224!
776
    pInfo->dataAllowed = false;
4,224✔
777
    if (pTask->upstreamInfo.numOfClosed < streamTaskGetNumOfUpstream(pTask)) {
4,224!
778
      int32_t t = atomic_add_fetch_32(&pTask->upstreamInfo.numOfClosed, 1);
4,224✔
779
    } else {
UNCOV
780
      stError("s-task:%s not inc closed input, since they have been all closed already", pTask->id.idStr);
×
781
    }
782
  }
783
}
4,224✔
784

785
void streamTaskOpenUpstreamInput(SStreamTask* pTask, int32_t taskId) {
×
786
  SStreamUpstreamEpInfo* pInfo = NULL;
×
UNCOV
787
  streamTaskGetUpstreamTaskEpInfo(pTask, taskId, &pInfo);
×
788

789
  if (pInfo != NULL && (!pInfo->dataAllowed)) {
×
790
    int32_t t = atomic_sub_fetch_32(&pTask->upstreamInfo.numOfClosed, 1);
×
791
    stDebug("s-task:%s open inputQ for upstream:0x%x, remain closed:%d", pTask->id.idStr, taskId, t);
×
UNCOV
792
    pInfo->dataAllowed = true;
×
793
  }
UNCOV
794
}
×
795

796
bool streamTaskIsAllUpstreamClosed(SStreamTask* pTask) {
×
UNCOV
797
  return pTask->upstreamInfo.numOfClosed == taosArrayGetSize(pTask->upstreamInfo.pList);
×
798
}
799

800
bool streamTaskSetSchedStatusWait(SStreamTask* pTask) {
45,232✔
801
  bool ret = false;
45,232✔
802

803
  streamMutexLock(&pTask->lock);
45,232✔
804
  if (pTask->status.schedStatus == TASK_SCHED_STATUS__INACTIVE) {
45,232✔
805
    pTask->status.schedStatus = TASK_SCHED_STATUS__WAITING;
36,206✔
806
    ret = true;
36,206✔
807
  }
808

809
  streamMutexUnlock(&pTask->lock);
45,232✔
810
  return ret;
45,232✔
811
}
812

813
int8_t streamTaskSetSchedStatusActive(SStreamTask* pTask) {
35,146✔
814
  streamMutexLock(&pTask->lock);
35,146✔
815
  int8_t status = pTask->status.schedStatus;
35,146✔
816
  if (status == TASK_SCHED_STATUS__WAITING) {
35,146!
817
    pTask->status.schedStatus = TASK_SCHED_STATUS__ACTIVE;
35,146✔
818
  }
819
  streamMutexUnlock(&pTask->lock);
35,146✔
820

821
  return status;
35,146✔
822
}
823

824
int8_t streamTaskSetSchedStatusInactive(SStreamTask* pTask) {
1,017✔
825
  streamMutexLock(&pTask->lock);
1,017✔
826
  int8_t status = pTask->status.schedStatus;
1,017✔
827
  pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
1,017✔
828
  streamMutexUnlock(&pTask->lock);
1,017✔
829

830
  return status;
1,017✔
831
}
832

833
int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, int32_t resetRelHalt) {
3,347✔
834
  int32_t      code = 0;
3,347✔
835
  SStreamMeta* pMeta = pTask->pMeta;
3,347✔
836
  SStreamTask* pStreamTask = NULL;
3,347✔
837

838
  if (pTask->info.fillHistory == 0) {
3,347✔
839
    return code;
3,343✔
840
  }
841

842
  code = streamMetaAcquireTaskUnsafe(pMeta, &pTask->streamTaskId, &pStreamTask);
4✔
843
  if (code == 0) {
4!
UNCOV
844
    stDebug("s-task:%s clear the related stream task:0x%x attr to fill-history task", pTask->id.idStr,
×
845
            (int32_t)pTask->streamTaskId.taskId);
846

847
    streamMutexLock(&(pStreamTask->lock));
×
UNCOV
848
    CLEAR_RELATED_FILLHISTORY_TASK(pStreamTask);
×
849

850
    if (resetRelHalt) {
×
UNCOV
851
      stDebug("s-task:0x%" PRIx64 " set the persistent status attr to be ready, prev:%s, status in sm:%s",
×
852
              pTask->streamTaskId.taskId, streamTaskGetStatusStr(pStreamTask->status.taskStatus),
853
              streamTaskGetStatus(pStreamTask).name);
UNCOV
854
      pStreamTask->status.taskStatus = TASK_STATUS__READY;
×
855
    }
856

857
    code = streamMetaSaveTask(pMeta, pStreamTask);
×
UNCOV
858
    streamMutexUnlock(&(pStreamTask->lock));
×
859

UNCOV
860
    streamMetaReleaseTask(pMeta, pStreamTask);
×
861
  }
862

863
  return code;
4✔
864
}
865

866
int32_t streamBuildAndSendDropTaskMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskId* pTaskId, int64_t resetRelHalt) {
4✔
867
  SVDropStreamTaskReq* pReq = rpcMallocCont(sizeof(SVDropStreamTaskReq));
4✔
868
  if (pReq == NULL) {
4!
UNCOV
869
    return terrno;
×
870
  }
871

872
  pReq->head.vgId = vgId;
4✔
873
  pReq->taskId = pTaskId->taskId;
4✔
874
  pReq->streamId = pTaskId->streamId;
4✔
875
  pReq->resetRelHalt = resetRelHalt;
4✔
876

877
  SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_DROP, .pCont = pReq, .contLen = sizeof(SVDropStreamTaskReq)};
4✔
878
  int32_t code = tmsgPutToQueue(pMsgCb, WRITE_QUEUE, &msg);
4✔
879
  if (code != TSDB_CODE_SUCCESS) {
4!
UNCOV
880
    stError("vgId:%d failed to send drop task:0x%x msg, code:%s", vgId, pTaskId->taskId, tstrerror(code));
×
881
  } else {
882
    stDebug("vgId:%d build and send drop task:0x%x msg", vgId, pTaskId->taskId);
4!
883
  }
884

885
  return code;
4✔
886
}
887

888
int32_t streamSendChkptReportMsg(SStreamTask* pTask, SCheckpointInfo* pCheckpointInfo, int8_t dropRelHTask) {
2,645✔
889
  int32_t                code = 0;
2,645✔
890
  int32_t                tlen = 0;
2,645✔
891
  int32_t                vgId = pTask->pMeta->vgId;
2,645✔
892
  const char*            id = pTask->id.idStr;
2,645✔
893
  SActiveCheckpointInfo* pActive = pCheckpointInfo->pActiveInfo;
2,645✔
894

895
  SCheckpointReport req = {.streamId = pTask->id.streamId,
2,645✔
896
                           .taskId = pTask->id.taskId,
2,645✔
897
                           .nodeId = vgId,
898
                           .dropHTask = dropRelHTask,
899
                           .transId = pActive->transId,
2,645✔
900
                           .checkpointId = pActive->activeId,
2,645✔
901
                           .checkpointVer = pCheckpointInfo->processedVer,
2,645✔
902
                           .checkpointTs = pCheckpointInfo->startTs};
2,645✔
903

904
  tEncodeSize(tEncodeStreamTaskChkptReport, &req, tlen, code);
2,645!
905
  if (code < 0) {
2,645!
906
    stError("s-task:%s vgId:%d encode stream task checkpoint-report failed, code:%s", id, vgId, tstrerror(code));
×
UNCOV
907
    return -1;
×
908
  }
909

910
  void* buf = rpcMallocCont(tlen);
2,645✔
911
  if (buf == NULL) {
2,645!
UNCOV
912
    stError("s-task:%s vgId:%d encode stream task checkpoint-report msg failed, code:%s", id, vgId,
×
913
            tstrerror(TSDB_CODE_OUT_OF_MEMORY));
UNCOV
914
    return -1;
×
915
  }
916

917
  SEncoder encoder;
918
  tEncoderInit(&encoder, buf, tlen);
2,645✔
919
  if ((code = tEncodeStreamTaskChkptReport(&encoder, &req)) < 0) {
2,645!
920
    rpcFreeCont(buf);
×
921
    tEncoderClear(&encoder);
×
922
    stError("s-task:%s vgId:%d encode stream task checkpoint-report msg failed, code:%s", id, vgId, tstrerror(code));
×
UNCOV
923
    return -1;
×
924
  }
925
  tEncoderClear(&encoder);
2,645✔
926

927
  SRpcMsg msg = {0};
2,645✔
928
  initRpcMsg(&msg, TDMT_MND_STREAM_CHKPT_REPORT, buf, tlen);
2,645✔
929
  stDebug("s-task:%s vgId:%d build and send task checkpoint-report to mnode", id, vgId);
2,645✔
930

931
  return tmsgSendReq(&pTask->info.mnodeEpset, &msg);
2,645✔
932
}
933

934
STaskId streamTaskGetTaskId(const SStreamTask* pTask) {
22,922✔
935
  STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
22,922✔
936
  return id;
22,922✔
937
}
938

939
void streamTaskInitForLaunchHTask(SHistoryTaskInfo* pInfo) {
1,083✔
940
  pInfo->waitInterval = LAUNCH_HTASK_INTERVAL;
1,083✔
941
  pInfo->tickCount = ceil(LAUNCH_HTASK_INTERVAL / WAIT_FOR_MINIMAL_INTERVAL);
1,083✔
942
  pInfo->retryTimes = 0;
1,083✔
943
}
1,083✔
944

945
void streamTaskSetRetryInfoForLaunch(SHistoryTaskInfo* pInfo) {
1,079✔
946
  pInfo->waitInterval *= RETRY_LAUNCH_INTERVAL_INC_RATE;
1,079✔
947
  pInfo->tickCount = ceil(pInfo->waitInterval / WAIT_FOR_MINIMAL_INTERVAL);
1,079✔
948
  pInfo->retryTimes += 1;
1,079✔
949
}
1,079✔
950

951
void streamTaskStatusInit(STaskStatusEntry* pEntry, const SStreamTask* pTask) {
4,847✔
952
  pEntry->id.streamId = pTask->id.streamId;
4,847✔
953
  pEntry->id.taskId = pTask->id.taskId;
4,847✔
954
  pEntry->stage = -1;
4,847✔
955
  pEntry->nodeId = pTask->info.nodeId;
4,847✔
956
  pEntry->status = TASK_STATUS__STOP;
4,847✔
957
}
4,847✔
958

959
void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc) {
13,591✔
960
  pDst->stage = pSrc->stage;
13,591✔
961
  pDst->inputQUsed = pSrc->inputQUsed;
13,591✔
962
  pDst->inputRate = pSrc->inputRate;
13,591✔
963
  pDst->procsTotal = pSrc->procsTotal;
13,591✔
964
  pDst->procsThroughput = pSrc->procsThroughput;
13,591✔
965
  pDst->outputTotal = pSrc->outputTotal;
13,591✔
966
  pDst->outputThroughput = pSrc->outputThroughput;
13,591✔
967
  pDst->processedVer = pSrc->processedVer;
13,591✔
968
  pDst->verRange = pSrc->verRange;
13,591✔
969
  pDst->sinkQuota = pSrc->sinkQuota;
13,591✔
970
  pDst->sinkDataSize = pSrc->sinkDataSize;
13,591✔
971
  pDst->checkpointInfo = pSrc->checkpointInfo;
13,591✔
972
  pDst->startCheckpointId = pSrc->startCheckpointId;
13,591✔
973
  pDst->startCheckpointVer = pSrc->startCheckpointVer;
13,591✔
974
  pDst->status = pSrc->status;
13,591✔
975

976
  pDst->startTime = pSrc->startTime;
13,591✔
977
  pDst->hTaskId = pSrc->hTaskId;
13,591✔
978
}
13,591✔
979

980
STaskStatusEntry streamTaskGetStatusEntry(SStreamTask* pTask) {
14,142✔
981
  SStreamMeta*         pMeta = pTask->pMeta;
14,142✔
982
  STaskExecStatisInfo* pExecInfo = &pTask->execInfo;
14,142✔
983

984
  STaskStatusEntry entry = {
42,426✔
985
      .id = streamTaskGetTaskId(pTask),
14,142✔
986
      .status = streamTaskGetStatus(pTask).state,
14,142✔
987
      .nodeId = pMeta->vgId,
14,142✔
988
      .stage = pMeta->stage,
14,142✔
989

990
      .inputQUsed = SIZE_IN_MiB(streamQueueGetItemSize(pTask->inputq.queue)),
14,142✔
991
      .startTime = pExecInfo->readyTs,
14,142✔
992
      .checkpointInfo.latestId = pTask->chkInfo.checkpointId,
14,142✔
993
      .checkpointInfo.latestVer = pTask->chkInfo.checkpointVer,
14,142✔
994
      .checkpointInfo.latestTime = pTask->chkInfo.checkpointTime,
14,142✔
995
      .checkpointInfo.latestSize = 0,
996
      .checkpointInfo.remoteBackup = 0,
997
      .checkpointInfo.consensusChkptId = 0,
998
      .checkpointInfo.consensusTs = 0,
999
      .hTaskId = pTask->hTaskInfo.id.taskId,
14,142✔
1000
      .procsTotal = SIZE_IN_MiB(pExecInfo->inputDataSize),
14,142✔
1001
      .outputTotal = SIZE_IN_MiB(pExecInfo->outputDataSize),
14,142✔
1002
      .procsThroughput = SIZE_IN_KiB(pExecInfo->procsThroughput),
14,142✔
1003
      .outputThroughput = SIZE_IN_KiB(pExecInfo->outputThroughput),
14,142✔
1004
      .startCheckpointId = pExecInfo->startCheckpointId,
14,142✔
1005
      .startCheckpointVer = pExecInfo->startCheckpointVer,
14,142✔
1006
  };
1007
  return entry;
14,142✔
1008
}
1009

1010
static int32_t taskPauseCallback(SStreamTask* pTask, void* param) {
709✔
1011
  SStreamMeta* pMeta = pTask->pMeta;
709✔
1012
  int32_t      code = 0;
709✔
1013

1014
  int32_t num = atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1);
709✔
1015
  stInfo("vgId:%d s-task:%s pause stream task. paused task num:%d", pMeta->vgId, pTask->id.idStr, num);
709!
1016

1017
  // in case of fill-history task, stop the tsdb file scan operation.
1018
  if (pTask->info.fillHistory == 1) {
709!
1019
    void* pExecutor = pTask->exec.pExecutor;
×
UNCOV
1020
    code = qKillTask(pExecutor, TSDB_CODE_SUCCESS);
×
1021
  }
1022

1023
  stDebug("vgId:%d s-task:%s set pause flag and pause task", pMeta->vgId, pTask->id.idStr);
709!
1024
  return code;
709✔
1025
}
1026

1027
void streamTaskPause(SStreamTask* pTask) {
709✔
1028
  int32_t code = streamTaskHandleEventAsync(pTask->status.pSM, TASK_EVENT_PAUSE, taskPauseCallback, NULL);
709✔
1029
  if (code) {
709!
UNCOV
1030
    stError("s-task:%s failed handle pause event async, code:%s", pTask->id.idStr, tstrerror(code));
×
1031
  }
1032
}
709✔
1033

1034
void streamTaskResume(SStreamTask* pTask) {
702✔
1035
  SStreamTaskState prevState = streamTaskGetStatus(pTask);
702✔
1036

1037
  SStreamMeta* pMeta = pTask->pMeta;
702✔
1038
  int32_t      code = streamTaskRestoreStatus(pTask);
702✔
1039
  if (code == TSDB_CODE_SUCCESS) {
702!
1040
    char*   pNew = streamTaskGetStatus(pTask).name;
702✔
1041
    int32_t num = atomic_sub_fetch_32(&pMeta->numOfPausedTasks, 1);
702✔
1042
    stInfo("s-task:%s status:%s resume from %s, paused task(s):%d", pTask->id.idStr, pNew, prevState.name, num);
702!
1043
  } else {
UNCOV
1044
    stInfo("s-task:%s status:%s no need to resume, paused task(s):%d", pTask->id.idStr, prevState.name,
×
1045
           pMeta->numOfPausedTasks);
1046
  }
1047
}
702✔
1048

1049
bool streamTaskIsSinkTask(const SStreamTask* pTask) { return pTask->info.taskLevel == TASK_LEVEL__SINK; }
41,511✔
1050

1051
// this task must success
1052
int32_t streamTaskSendCheckpointReq(SStreamTask* pTask) {
2,669✔
1053
  int32_t     code;
1054
  int32_t     tlen = 0;
2,669✔
1055
  int32_t     vgId = pTask->pMeta->vgId;
2,669✔
1056
  const char* id = pTask->id.idStr;
2,669✔
1057

1058
  SStreamTaskCheckpointReq req = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId, .nodeId = vgId};
2,669✔
1059
  tEncodeSize(tEncodeStreamTaskCheckpointReq, &req, tlen, code);
2,669!
1060
  if (code < 0) {
2,669!
1061
    stError("s-task:%s vgId:%d encode stream task req checkpoint failed, code:%s", id, vgId, tstrerror(code));
×
UNCOV
1062
    return TSDB_CODE_INVALID_MSG;
×
1063
  }
1064

1065
  void* buf = rpcMallocCont(tlen);
2,669✔
1066
  if (buf == NULL) {
2,669!
1067
    stError("s-task:%s vgId:%d encode stream task req checkpoint msg failed, code:Out of memory", id, vgId);
×
UNCOV
1068
    return terrno;
×
1069
  }
1070

1071
  SEncoder encoder;
1072
  tEncoderInit(&encoder, buf, tlen);
2,669✔
1073
  if ((code = tEncodeStreamTaskCheckpointReq(&encoder, &req)) < 0) {
2,669!
1074
    rpcFreeCont(buf);
×
1075
    tEncoderClear(&encoder);
×
1076
    stError("s-task:%s vgId:%d encode stream task req checkpoint msg failed, code:%s", id, vgId, tstrerror(code));
×
UNCOV
1077
    return code;
×
1078
  }
1079

1080
  tEncoderClear(&encoder);
2,669✔
1081

1082
  SRpcMsg msg = {0};
2,669✔
1083
  initRpcMsg(&msg, TDMT_MND_STREAM_REQ_CHKPT, buf, tlen);
2,669✔
1084
  stDebug("s-task:%s vgId:%d build and send task checkpoint req", id, vgId);
2,669✔
1085

1086
  return tmsgSendReq(&pTask->info.mnodeEpset, &msg);
2,669✔
1087
}
1088

1089
void streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId, SStreamUpstreamEpInfo** pEpInfo) {
38,778✔
1090
  *pEpInfo = NULL;
38,778✔
1091

1092
  int32_t num = taosArrayGetSize(pTask->upstreamInfo.pList);
38,778✔
1093
  for (int32_t i = 0; i < num; ++i) {
81,143!
1094
    SStreamUpstreamEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i);
81,144✔
1095
    if (pInfo == NULL) {
81,143!
UNCOV
1096
      return;
×
1097
    }
1098

1099
    if (pInfo->taskId == taskId) {
81,143✔
1100
      *pEpInfo = pInfo;
38,778✔
1101
      return;
38,778✔
1102
    }
1103
  }
1104

UNCOV
1105
  stError("s-task:%s failed to find upstream task:0x%x", pTask->id.idStr, taskId);
×
1106
}
1107

1108
SEpSet* streamTaskGetDownstreamEpInfo(SStreamTask* pTask, int32_t taskId) {
×
1109
  if (pTask->info.taskLevel == TASK_OUTPUT__FIXED_DISPATCH) {
×
1110
    if (pTask->outputInfo.fixedDispatcher.taskId == taskId) {
×
UNCOV
1111
      return &pTask->outputInfo.fixedDispatcher.epSet;
×
1112
    }
1113
  } else if (pTask->info.taskLevel == TASK_OUTPUT__SHUFFLE_DISPATCH) {
×
1114
    SArray* pList = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
×
1115
    for (int32_t i = 0; i < taosArrayGetSize(pList); ++i) {
×
1116
      SVgroupInfo* pVgInfo = taosArrayGet(pList, i);
×
1117
      if (pVgInfo == NULL) {
×
UNCOV
1118
        continue;
×
1119
      }
1120

1121
      if (pVgInfo->taskId == taskId) {
×
UNCOV
1122
        return &pVgInfo->epSet;
×
1123
      }
1124
    }
1125
  }
1126

UNCOV
1127
  return NULL;
×
1128
}
1129

1130
int32_t createStreamTaskIdStr(int64_t streamId, int32_t taskId, const char** pId) {
7,770✔
1131
  char buf[128] = {0};
7,770✔
1132
  sprintf(buf, "0x%" PRIx64 "-0x%x", streamId, taskId);
7,770✔
1133
  *pId = taosStrdup(buf);
7,770✔
1134

1135
  if (*pId == NULL) {
7,770!
UNCOV
1136
    return terrno;
×
1137
  } else {
1138
    return TSDB_CODE_SUCCESS;
7,770✔
1139
  }
1140
}
1141

1142
static int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq) {
436✔
1143
  int32_t           code;
1144
  SStreamDataBlock* pData;
1145

1146
  code = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SStreamDataBlock), (void**)&pData);
436✔
1147
  if (code) {
436!
1148
    stError("s-task:%s failed to allocated retrieve-block", pTask->id.idStr);
×
UNCOV
1149
    return terrno = code;
×
1150
  }
1151

1152
  pData->type = STREAM_INPUT__DATA_RETRIEVE;
436✔
1153
  pData->srcVgId = 0;
436✔
1154

1155
  code = streamRetrieveReqToData(pReq, pData, pTask->id.idStr);
436✔
1156
  if (code != TSDB_CODE_SUCCESS) {
436!
1157
    stError("s-task:%s failed to convert retrieve-data to block, code:%s", pTask->id.idStr, tstrerror(code));
×
1158
    taosFreeQitem(pData);
×
UNCOV
1159
    return code;
×
1160
  }
1161

1162
  code = streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pData);
436✔
1163
  if (code != TSDB_CODE_SUCCESS) {
436!
UNCOV
1164
    stError("s-task:%s failed to put retrieve-block into inputQ, inputQ is full, discard the retrieve msg",
×
1165
            pTask->id.idStr);
1166
  }
1167

1168
  return code;
436✔
1169
}
1170

1171
int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq) {
436✔
1172
  int32_t code = streamTaskEnqueueRetrieve(pTask, pReq);
436✔
1173
  if (code != 0) {
436!
UNCOV
1174
    return code;
×
1175
  }
1176
  return streamTrySchedExec(pTask);
436✔
1177
}
1178

1179
void streamTaskSetRemoveBackendFiles(SStreamTask* pTask) { pTask->status.removeBackendFiles = true; }
3,347✔
1180

1181
void streamTaskGetActiveCheckpointInfo(const SStreamTask* pTask, int32_t* pTransId, int64_t* pCheckpointId) {
×
1182
  if (pTransId != NULL) {
×
UNCOV
1183
    *pTransId = pTask->chkInfo.pActiveInfo->transId;
×
1184
  }
1185

1186
  if (pCheckpointId != NULL) {
×
UNCOV
1187
    *pCheckpointId = pTask->chkInfo.pActiveInfo->activeId;
×
1188
  }
UNCOV
1189
}
×
1190

1191
int32_t streamTaskSetActiveCheckpointInfo(SStreamTask* pTask, int64_t activeCheckpointId) {
28✔
1192
  pTask->chkInfo.pActiveInfo->activeId = activeCheckpointId;
28✔
1193
  return TSDB_CODE_SUCCESS;
28✔
1194
}
1195

1196
void streamTaskSetFailedChkptInfo(SStreamTask* pTask, int32_t transId, int64_t checkpointId) {
×
1197
  pTask->chkInfo.pActiveInfo->transId = transId;
×
1198
  pTask->chkInfo.pActiveInfo->activeId = checkpointId;
×
1199
  pTask->chkInfo.pActiveInfo->failedId = checkpointId;
×
1200
  stDebug("s-task:%s set failed checkpointId:%"PRId64, pTask->id.idStr, checkpointId);
×
UNCOV
1201
}
×
1202

1203
int32_t streamTaskCreateActiveChkptInfo(SActiveCheckpointInfo** pRes) {
7,804✔
1204
  SActiveCheckpointInfo* pInfo = taosMemoryCalloc(1, sizeof(SActiveCheckpointInfo));
7,804✔
1205
  if (pInfo == NULL) {
7,804!
UNCOV
1206
    return terrno;
×
1207
  }
1208

1209
  int32_t code = taosThreadMutexInit(&pInfo->lock, NULL);
7,804✔
1210
  if (code != TSDB_CODE_SUCCESS) {
7,804!
UNCOV
1211
    return code;
×
1212
  }
1213

1214
  pInfo->pDispatchTriggerList = taosArrayInit(4, sizeof(STaskTriggerSendInfo));
7,804✔
1215
  pInfo->pReadyMsgList = taosArrayInit(4, sizeof(STaskCheckpointReadyInfo));
7,804✔
1216
  pInfo->pCheckpointReadyRecvList = taosArrayInit(4, sizeof(STaskDownstreamReadyInfo));
7,804✔
1217

1218
  *pRes = pInfo;
7,804✔
1219
  return code;
7,804✔
1220
}
1221

1222
void streamTaskDestroyActiveChkptInfo(SActiveCheckpointInfo* pInfo) {
30,503✔
1223
  if (pInfo == NULL) {
30,503✔
1224
    return;
22,824✔
1225
  }
1226

1227
  streamMutexDestroy(&pInfo->lock);
7,679✔
1228
  taosArrayDestroy(pInfo->pDispatchTriggerList);
7,679✔
1229
  pInfo->pDispatchTriggerList = NULL;
7,679✔
1230
  taosArrayDestroy(pInfo->pReadyMsgList);
7,679✔
1231
  pInfo->pReadyMsgList = NULL;
7,679✔
1232
  taosArrayDestroy(pInfo->pCheckpointReadyRecvList);
7,679✔
1233
  pInfo->pCheckpointReadyRecvList = NULL;
7,679✔
1234

1235
  SStreamTmrInfo* pTriggerTmr = &pInfo->chkptTriggerMsgTmr;
7,679✔
1236
  if (pTriggerTmr->tmrHandle != NULL) {
7,679✔
1237
    streamTmrStop(pTriggerTmr->tmrHandle);
1,293✔
1238
    pTriggerTmr->tmrHandle = NULL;
1,293✔
1239
  }
1240

1241
  SStreamTmrInfo* pReadyTmr = &pInfo->chkptReadyMsgTmr;
7,679✔
1242
  if (pReadyTmr->tmrHandle != NULL) {
7,679✔
1243
    streamTmrStop(pReadyTmr->tmrHandle);
1,287✔
1244
    pReadyTmr->tmrHandle = NULL;
1,287✔
1245
  }
1246

1247
  taosMemoryFree(pInfo);
7,679✔
1248
}
1249

1250
// NOTE: clear the checkpoint id, and keep the failed id
1251
// failedId for a task will increase as the checkpoint I.D. increases.
1252
void streamTaskClearActiveInfo(SActiveCheckpointInfo* pInfo) {
2,246✔
1253
  pInfo->activeId = 0;
2,246✔
1254
  pInfo->transId = 0;
2,246✔
1255
  pInfo->allUpstreamTriggerRecv = 0;
2,246✔
1256
  pInfo->dispatchTrigger = false;
2,246✔
1257

1258
  taosArrayClear(pInfo->pDispatchTriggerList);
2,246✔
1259
  taosArrayClear(pInfo->pCheckpointReadyRecvList);
2,245✔
1260
}
2,245✔
1261

1262
const char* streamTaskGetExecType(int32_t type) {
100,798✔
1263
  switch (type) {
100,798!
1264
    case STREAM_EXEC_T_EXTRACT_WAL_DATA:
42,077✔
1265
      return "scan-wal-file";
42,077✔
1266
    case STREAM_EXEC_T_START_ALL_TASKS:
9,454✔
1267
      return "start-all-tasks";
9,454✔
1268
    case STREAM_EXEC_T_START_ONE_TASK:
4,907✔
1269
      return "start-one-task";
4,907✔
1270
    case STREAM_EXEC_T_RESTART_ALL_TASKS:
3✔
1271
      return "restart-all-tasks";
3✔
1272
    case STREAM_EXEC_T_STOP_ALL_TASKS:
4,758✔
1273
      return "stop-all-tasks";
4,758✔
1274
    case STREAM_EXEC_T_RESUME_TASK:
6,363✔
1275
      return "resume-task-from-idle";
6,363✔
1276
    case STREAM_EXEC_T_ADD_FAILED_TASK:
×
UNCOV
1277
      return "record-start-failed-task";
×
1278
    case 0:
33,326✔
1279
      return "exec-all-tasks";
33,326✔
1280
    default:
×
UNCOV
1281
      return "invalid-exec-type";
×
1282
  }
1283
}
1284

1285
int32_t streamTaskAllocRefId(SStreamTask* pTask, int64_t** pRefId) {
22,176✔
1286
  *pRefId = taosMemoryMalloc(sizeof(int64_t));
22,176✔
1287
  if (*pRefId != NULL) {
22,176!
1288
    **pRefId = pTask->id.refId;
22,176✔
1289
    int32_t code = metaRefMgtAdd(pTask->pMeta->vgId, *pRefId);
22,176✔
1290
    if (code != 0) {
22,176!
UNCOV
1291
      stError("s-task:%s failed to add refId:%" PRId64 " into refId-mgmt, code:%s", pTask->id.idStr, pTask->id.refId,
×
1292
              tstrerror(code));
1293
    }
1294
    return code;
22,176✔
1295
  } else {
1296
    stError("s-task:%s failed to alloc new ref id, code:%s", pTask->id.idStr, tstrerror(terrno));
×
UNCOV
1297
    return terrno;
×
1298
  }
1299
}
1300

1301
void streamTaskFreeRefId(int64_t* pRefId) {
20,120✔
1302
  if (pRefId == NULL) {
20,120✔
1303
    return;
1,448✔
1304
  }
1305

1306
  metaRefMgtRemove(pRefId);
18,672✔
1307
}
1308

1309

1310
int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
68,512✔
1311
  int32_t code = 0;
68,512✔
1312
  int32_t lino;
1313

1314
  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
68,512!
1315
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->ver));
137,028!
1316
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->id.streamId));
137,028!
1317
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->id.taskId));
137,028!
1318
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->info.trigger));
137,028!
1319
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->info.taskLevel));
137,028!
1320
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->outputInfo.type));
137,028!
1321
  TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pTask->msgInfo.msgType));
137,028!
1322

1323
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->status.taskStatus));
137,028!
1324
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->status.schedStatus));
137,028!
1325

1326
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->info.selfChildId));
137,028!
1327
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->info.nodeId));
137,028!
1328
  TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pTask->info.epSet));
68,514!
1329
  TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pTask->info.mnodeEpset));
68,510!
1330

1331
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->chkInfo.checkpointId));
137,022!
1332
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->chkInfo.checkpointVer));
137,022!
1333
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->info.fillHistory));
137,022!
1334

1335
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->hTaskInfo.id.streamId));
137,022!
1336
  int32_t taskId = pTask->hTaskInfo.id.taskId;
68,511✔
1337
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, taskId));
68,511!
1338

1339
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->streamTaskId.streamId));
137,022!
1340
  taskId = pTask->streamTaskId.taskId;
68,511✔
1341
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, taskId));
68,511!
1342

1343
  TAOS_CHECK_EXIT(tEncodeU64(pEncoder, pTask->dataRange.range.minVer));
137,022!
1344
  TAOS_CHECK_EXIT(tEncodeU64(pEncoder, pTask->dataRange.range.maxVer));
137,022!
1345
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->dataRange.window.skey));
137,022!
1346
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->dataRange.window.ekey));
137,022!
1347

1348
  int32_t epSz = taosArrayGetSize(pTask->upstreamInfo.pList);
68,511✔
1349
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, epSz));
68,508!
1350
  for (int32_t i = 0; i < epSz; i++) {
166,034✔
1351
    SStreamUpstreamEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i);
97,527✔
1352
    TAOS_CHECK_EXIT(tEncodeStreamEpInfo(pEncoder, pInfo));
97,526!
1353
  }
1354

1355
  if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
68,507✔
1356
    TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pTask->exec.qmsg));
73,056!
1357
  }
1358

1359
  if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) {
68,507✔
1360
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->outputInfo.tbSink.stbUid));
66,218!
1361
    TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pTask->outputInfo.tbSink.stbFullName));
66,218!
1362
    TAOS_CHECK_EXIT(tEncodeSSchemaWrapper(pEncoder, pTask->outputInfo.tbSink.pSchemaWrapper));
66,218!
1363
  } else if (pTask->outputInfo.type == TASK_OUTPUT__SMA) {
35,398✔
1364
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->outputInfo.smaSink.smaId));
820!
1365
  } else if (pTask->outputInfo.type == TASK_OUTPUT__FETCH) {
34,988!
UNCOV
1366
    TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->outputInfo.fetchSink.reserved));
×
1367
  } else if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
34,988✔
1368
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->outputInfo.fixedDispatcher.taskId));
11,672!
1369
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->outputInfo.fixedDispatcher.nodeId));
11,672!
1370
    TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pTask->outputInfo.fixedDispatcher.epSet));
5,836!
1371
  } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
29,152✔
1372
    TAOS_CHECK_EXIT(tSerializeSUseDbRspImp(pEncoder, &pTask->outputInfo.shuffleDispatcher.dbInfo));
29,104!
1373
    TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pTask->outputInfo.shuffleDispatcher.stbFullName));
58,208!
1374
  }
1375
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->info.delaySchedParam));
137,012!
1376
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->subtableWithoutMd5));
137,012!
1377
  TAOS_CHECK_EXIT(tEncodeCStrWithLen(pEncoder, pTask->reserve, sizeof(pTask->reserve) - 1));
137,012!
1378

1379
  tEndEncode(pEncoder);
68,506✔
1380
_exit:
68,511✔
1381
  return code;
68,511✔
1382
}
1383

1384
int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
22,940✔
1385
  int32_t taskId = 0;
22,940✔
1386
  int32_t code = 0;
22,940✔
1387
  int32_t lino;
1388

1389
  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
22,940!
1390
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->ver));
45,881!
1391
  if (pTask->ver <= SSTREAM_TASK_INCOMPATIBLE_VER || pTask->ver > SSTREAM_TASK_VER) {
22,941!
UNCOV
1392
    TAOS_CHECK_EXIT(TSDB_CODE_INVALID_MSG);
×
1393
  }
1394

1395
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->id.streamId));
45,882!
1396
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->id.taskId));
45,882!
1397
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->info.trigger));
45,882!
1398
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->info.taskLevel));
45,882!
1399
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->outputInfo.type));
45,881!
1400
  TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pTask->msgInfo.msgType));
45,880!
1401

1402
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->status.taskStatus));
45,880!
1403
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->status.schedStatus));
45,880!
1404

1405
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->info.selfChildId));
45,880!
1406
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->info.nodeId));
45,880!
1407
  TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pTask->info.epSet));
22,940!
1408
  TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pTask->info.mnodeEpset));
22,940!
1409

1410
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->chkInfo.checkpointId));
45,880!
1411
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->chkInfo.checkpointVer));
45,880!
1412
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->info.fillHistory));
45,880!
1413

1414
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->hTaskInfo.id.streamId));
45,880!
1415
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &taskId));
22,940!
1416
  pTask->hTaskInfo.id.taskId = taskId;
22,940✔
1417

1418
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->streamTaskId.streamId));
45,880!
1419
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &taskId));
22,940!
1420
  pTask->streamTaskId.taskId = taskId;
22,940✔
1421

1422
  TAOS_CHECK_EXIT(tDecodeU64(pDecoder, (uint64_t*)&pTask->dataRange.range.minVer));
45,880!
1423
  TAOS_CHECK_EXIT(tDecodeU64(pDecoder, (uint64_t*)&pTask->dataRange.range.maxVer));
45,880!
1424
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->dataRange.window.skey));
45,880!
1425
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->dataRange.window.ekey));
45,880!
1426

1427
  int32_t epSz = -1;
22,940✔
1428
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &epSz) < 0);
22,941!
1429

1430
  if ((pTask->upstreamInfo.pList = taosArrayInit(epSz, POINTER_BYTES)) == NULL) {
22,941!
UNCOV
1431
    TAOS_CHECK_EXIT(terrno);
×
1432
  }
1433
  for (int32_t i = 0; i < epSz; i++) {
55,081✔
1434
    SStreamUpstreamEpInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamUpstreamEpInfo));
32,142✔
1435
    if (pInfo == NULL) {
32,138!
UNCOV
1436
      TAOS_CHECK_EXIT(terrno);
×
1437
    }
1438
    if ((code = tDecodeStreamEpInfo(pDecoder, pInfo)) < 0) {
32,138!
UNCOV
1439
      taosMemoryFreeClear(pInfo);
×
UNCOV
1440
      goto _exit;
×
1441
    }
1442
    if (taosArrayPush(pTask->upstreamInfo.pList, &pInfo) == NULL) {
64,283!
UNCOV
1443
      TAOS_CHECK_EXIT(terrno);
×
1444
    }
1445
  }
1446

1447
  if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
22,939✔
1448
    TAOS_CHECK_EXIT(tDecodeCStrAlloc(pDecoder, &pTask->exec.qmsg));
24,348!
1449
  }
1450

1451
  if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) {
22,939✔
1452
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->outputInfo.tbSink.stbUid));
22,184!
1453
    TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pTask->outputInfo.tbSink.stbFullName));
11,092!
1454
    pTask->outputInfo.tbSink.pSchemaWrapper = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
11,091✔
1455
    if (pTask->outputInfo.tbSink.pSchemaWrapper == NULL) {
11,092!
UNCOV
1456
      TAOS_CHECK_EXIT(terrno);
×
1457
    }
1458
    TAOS_CHECK_EXIT(tDecodeSSchemaWrapper(pDecoder, pTask->outputInfo.tbSink.pSchemaWrapper));
22,162!
1459
  } else if (pTask->outputInfo.type == TASK_OUTPUT__SMA) {
11,847✔
1460
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->outputInfo.smaSink.smaId));
286!
1461
  } else if (pTask->outputInfo.type == TASK_OUTPUT__FETCH) {
11,704!
UNCOV
1462
    TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->outputInfo.fetchSink.reserved));
×
1463
  } else if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
11,704✔
1464
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->outputInfo.fixedDispatcher.taskId));
3,674!
1465
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->outputInfo.fixedDispatcher.nodeId));
3,674!
1466
    TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pTask->outputInfo.fixedDispatcher.epSet));
1,837!
1467
  } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
9,867✔
1468
    TAOS_CHECK_EXIT(tDeserializeSUseDbRspImp(pDecoder, &pTask->outputInfo.shuffleDispatcher.dbInfo));
9,849!
1469
    TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pTask->outputInfo.shuffleDispatcher.stbFullName));
9,849!
1470
  }
1471
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->info.delaySchedParam));
45,857!
1472
  if (pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER) {
22,940!
1473
    TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->subtableWithoutMd5));
45,880!
1474
  }
1475
  TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pTask->reserve));
22,940!
1476

1477
  tEndDecode(pDecoder);
22,941✔
1478

1479
_exit:
22,941✔
1480
  return code;
22,941✔
1481
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc