• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3660

15 Mar 2025 09:06AM UTC coverage: 62.039% (-1.3%) from 63.314%
#3660

push

travis-ci

web-flow
feat(stream): support stream processing for virtual tables (#30144)

* enh: add client processing

* enh: add mnode vtables processing

* enh: add mnode vtable processing

* enh: add normal child vtable support

* fix: compile issues

* fix: compile issues

* fix: create stream issues

* fix: multi stream scan issue

* fix: remove debug info

* fix: agg task and task level issues

* fix: correct task output type

* fix: split vtablescan from agg

* fix: memory leak issues

* fix: add limitations

* Update 09-error-code.md

* Update 09-error-code.md

* fix: remove usless case

* feat(stream): extract original table data in source scan task

Implemented functionality in the source task to extract data
corresponding to the virtual table from the original table using WAL.
The extracted data is then sent to the downstream merge task for further
processing.

* feat(stream): multi-way merge using loser tree in virtual merge task

Implemented multi-way merge in the merge task using a loser tree to
combine data from multiple original table into a single virtual table.
The merged virtual table data is then pushed downstream for further
processing.  Introduced memory limit handling during the merge process
with configurable behavior when the memory limit is reached.

* fix(test): remove useless cases

---------

Co-authored-by: dapan1121 <wpan@taosdata.com>
Co-authored-by: Pan Wei <72057773+dapan1121@users.noreply.github.com>

154078 of 317582 branches covered (48.52%)

Branch coverage included in aggregate %.

313 of 2391 new or added lines in 34 files covered. (13.09%)

26134 existing lines in 205 files now uncovered.

240261 of 318051 relevant lines covered (75.54%)

16655189.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

60.05
/source/libs/stream/src/streamTask.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executor.h"
17
#include "osDir.h"
18
#include "osMemory.h"
19
#include "streamInt.h"
20
#include "streamMsg.h"
21
#include "streamsm.h"
22
#include "tmisce.h"
23
#include "tstream.h"
24
#include "ttimer.h"
25
#include "wal.h"
26

27
static void    streamTaskDestroyUpstreamInfo(SUpstreamInfo* pUpstreamInfo);
28
static int32_t streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdated);
29
static int32_t streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdate);
30
static void    streamTaskDestroyActiveChkptInfo(SActiveCheckpointInfo* pInfo);
31

32
static int32_t addToTaskset(SArray* pArray, SStreamTask* pTask) {
14,051✔
33
  int32_t childId = taosArrayGetSize(pArray);
14,051✔
34
  pTask->info.selfChildId = childId;
14,051✔
35
  void* p = taosArrayPush(pArray, &pTask);
14,051✔
36
  return (p == NULL) ? terrno : TSDB_CODE_SUCCESS;
14,051!
37
}
38

39
static int32_t doUpdateTaskEpset(SStreamTask* pTask, int32_t nodeId, SEpSet* pEpSet, bool* pUpdated) {
363✔
40
  int32_t code = 0;
363✔
41
  char    buf[512] = {0};
363✔
42

43
  if (pTask->info.nodeId == nodeId) {  // execution task should be moved away
363✔
44
    bool isEqual = isEpsetEqual(&pTask->info.epSet, pEpSet);
120✔
45
    code = epsetToStr(pEpSet, buf, tListLen(buf));
120✔
46
    if (code) {  // print error and continue
120!
47
      stError("%s failed to convert epset to str, code:%s", pTask->id.idStr, tstrerror(code));
×
48
      return code;
×
49
    }
50

51
    if (!isEqual) {
120✔
52
      (*pUpdated) = true;
60✔
53
      char tmp[512] = {0};
60✔
54
      code = epsetToStr(&pTask->info.epSet, tmp, tListLen(tmp));  // only for log file, ignore errors
60✔
55
      if (code) {                                                 // print error and continue
60!
56
        stError("%s failed to convert epset to str, code:%s", pTask->id.idStr, tstrerror(code));
×
57
        return code;
×
58
      }
59

60
      epsetAssign(&pTask->info.epSet, pEpSet);
60✔
61
      stDebug("s-task:0x%x (vgId:%d) self node epset is updated %s, old:%s", pTask->id.taskId, nodeId, buf, tmp);
60!
62
    } else {
63
      stDebug("s-task:0x%x (vgId:%d) not updated task epset, since epset identical, %s", pTask->id.taskId, nodeId, buf);
60!
64
    }
65
  }
66

67
  // check for the dispatch info and the upstream task info
68
  int32_t level = pTask->info.taskLevel;
363✔
69
  if (level == TASK_LEVEL__SOURCE) {
363✔
70
    code = streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet, pUpdated);
173✔
71
  } else if (level == TASK_LEVEL__AGG) {
190✔
72
    code = streamTaskUpdateUpstreamInfo(pTask, nodeId, pEpSet, pUpdated);
17✔
73
    code = streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet, pUpdated);
17✔
74
  } else {  // TASK_LEVEL__SINK
75
    code = streamTaskUpdateUpstreamInfo(pTask, nodeId, pEpSet, pUpdated);
173✔
76
  }
77

78
  return code;
363✔
79
}
80

81
static void freeItem(void* p) {
×
82
  SStreamContinueExecInfo* pInfo = p;
×
83
  rpcFreeCont(pInfo->msg.pCont);
×
84
}
×
85

86
static void freeUpstreamItem(void* p) {
88,460✔
87
  SStreamUpstreamEpInfo** pInfo = p;
88,460✔
88
  taosMemoryFree(*pInfo);
88,460!
89
}
88,456✔
90

91
static SStreamUpstreamEpInfo* createStreamTaskEpInfo(const SStreamTask* pTask) {
20,188✔
92
  SStreamUpstreamEpInfo* pEpInfo = taosMemoryMalloc(sizeof(SStreamUpstreamEpInfo));
20,188!
93
  if (pEpInfo == NULL) {
20,188!
94
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
95
    return NULL;
×
96
  }
97

98
  pEpInfo->childId = pTask->info.selfChildId;
20,188✔
99
  pEpInfo->epSet = pTask->info.epSet;
20,188✔
100
  pEpInfo->nodeId = pTask->info.nodeId;
20,188✔
101
  pEpInfo->taskId = pTask->id.taskId;
20,188✔
102
  pEpInfo->stage = -1;
20,188✔
103
  pEpInfo->lastMsgId = -1;
20,188✔
104

105
  return pEpInfo;
20,188✔
106
}
107

108
int32_t tNewStreamTask(int64_t streamId, int8_t taskLevel, SEpSet* pEpset, EStreamTaskType type, int32_t trigger,
14,051✔
109
                       int64_t triggerParam, SArray* pTaskList, bool hasFillhistory, int8_t subtableWithoutMd5,
110
                       int8_t hasAggTasks, SStreamTask** p) {
111
  *p = NULL;
14,051✔
112

113
  SStreamTask* pTask = (SStreamTask*)taosMemoryCalloc(1, sizeof(SStreamTask));
14,051!
114
  if (pTask == NULL) {
14,051!
UNCOV
115
    stError("s-task:0x%" PRIx64 " failed malloc new stream task, size:%d, code:%s", streamId,
×
116
            (int32_t)sizeof(SStreamTask), tstrerror(terrno));
UNCOV
117
    return terrno;
×
118
  }
119

120
  pTask->ver = SSTREAM_TASK_VER;
14,051✔
121
  pTask->id.taskId = tGenIdPI32();
14,051✔
122
  pTask->id.streamId = streamId;
14,051✔
123

124
  pTask->info.taskLevel = taskLevel;
14,051✔
125
  pTask->info.fillHistory = type;
14,051✔
126
  pTask->info.trigger = trigger;
14,051✔
127
  pTask->info.hasAggTasks = hasAggTasks;
14,051✔
128
  pTask->info.delaySchedParam = triggerParam;
14,051✔
129
  pTask->subtableWithoutMd5 = subtableWithoutMd5;
14,051✔
130

131
  int32_t code = streamCreateStateMachine(pTask);
14,051✔
132
  if (pTask->status.pSM == NULL || code != TSDB_CODE_SUCCESS) {
14,051!
UNCOV
133
    taosMemoryFreeClear(pTask);
×
UNCOV
134
    return code;
×
135
  }
136

137
  char    buf[128] = {0};
14,051✔
138
  int32_t ret = snprintf(buf, tListLen(buf), "0x%" PRIx64 "-0x%x", pTask->id.streamId, pTask->id.taskId);
14,051✔
139
  if (ret < 0 || ret >= tListLen(buf)) {
14,051!
UNCOV
140
    stError("s-task:0x%x failed to set the taskIdstr, code: out of buffer", pTask->id.taskId);
×
UNCOV
141
    return TSDB_CODE_OUT_OF_BUFFER;
×
142
  }
143

144
  pTask->id.idStr = taosStrdup(buf);
14,051!
145
  if (pTask->id.idStr == NULL) {
14,051!
UNCOV
146
    stError("s-task:0x%x failed to build task id, code: out of memory", pTask->id.taskId);
×
UNCOV
147
    return terrno;
×
148
  }
149

150
  pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
14,051✔
151
  pTask->status.taskStatus =
14,051✔
152
      (pTask->info.fillHistory == STREAM_HISTORY_TASK) ? TASK_STATUS__SCAN_HISTORY : TASK_STATUS__READY;
14,051✔
153
  pTask->inputq.status = TASK_INPUT_STATUS__NORMAL;
14,051✔
154
  pTask->outputq.status = TASK_OUTPUT_STATUS__NORMAL;
14,051✔
155

156
  pTask->taskCheckInfo.pList = taosArrayInit(4, sizeof(SDownstreamStatusInfo));
14,051✔
157

158
  if ((pTask->info.fillHistory == STREAM_HISTORY_TASK) && !hasFillhistory) {
14,051!
UNCOV
159
    stError("s-task:0x%x create task failed, due to inconsistent fill-history flag", pTask->id.taskId);
×
UNCOV
160
    return TSDB_CODE_INVALID_PARA;
×
161
  }
162

163
  epsetAssign(&(pTask->info.mnodeEpset), pEpset);
14,051✔
164

165
  code = addToTaskset(pTaskList, pTask);
14,051✔
166
  *p = pTask;
14,051✔
167

168
  return code;
14,051✔
169
}
170

171
int32_t tDecodeStreamTaskChkInfo(SDecoder* pDecoder, SCheckpointInfo* pChkpInfo) {
651✔
172
  int64_t skip64;
173
  int8_t  skip8;
174
  int32_t skip32;
175
  int16_t skip16;
176
  SEpSet  epSet;
177

178
  if (tStartDecode(pDecoder) < 0) return -1;
651!
179
  if (tDecodeI64(pDecoder, &pChkpInfo->msgVer) < 0) return -1;
1,306✔
180
  // if (ver <= SSTREAM_TASK_INCOMPATIBLE_VER) return -1;
181

182
  if (tDecodeI64(pDecoder, &skip64) < 0) return -1;
605!
183
  if (tDecodeI32(pDecoder, &skip32) < 0) return -1;
605!
184
  if (tDecodeI32(pDecoder, &skip32) < 0) return -1;
605!
185
  if (tDecodeI8(pDecoder, &skip8) < 0) return -1;
603!
186
  if (tDecodeI8(pDecoder, &skip8) < 0) return -1;
603!
187
  if (tDecodeI16(pDecoder, &skip16) < 0) return -1;
603!
188

189
  if (tDecodeI8(pDecoder, &skip8) < 0) return -1;
604!
190
  if (tDecodeI8(pDecoder, &skip8) < 0) return -1;
604!
191

192
  if (tDecodeI32(pDecoder, &skip32) < 0) return -1;
604!
193
  if (tDecodeI32(pDecoder, &skip32) < 0) return -1;
604!
194
  if (tDecodeSEpSet(pDecoder, &epSet) < 0) return -1;
604!
195
  if (tDecodeSEpSet(pDecoder, &epSet) < 0) return -1;
605!
196

197
  if (tDecodeI64(pDecoder, &pChkpInfo->checkpointId) < 0) return -1;
1,210!
198
  if (tDecodeI64(pDecoder, &pChkpInfo->checkpointVer) < 0) return -1;
1,210!
199

200
  tEndDecode(pDecoder);
605✔
201
  return 0;
605✔
202
}
203

204
int32_t tDecodeStreamTaskId(SDecoder* pDecoder, STaskId* pTaskId) {
12✔
205
  int64_t ver;
206
  if (tStartDecode(pDecoder) < 0) return -1;
12!
207
  if (tDecodeI64(pDecoder, &ver) < 0) return -1;
12!
208
  if (ver <= SSTREAM_TASK_INCOMPATIBLE_VER) return -1;
12!
209

210
  if (tDecodeI64(pDecoder, &pTaskId->streamId) < 0) return -1;
24!
211

212
  int32_t taskId = 0;
12✔
213
  if (tDecodeI32(pDecoder, &taskId) < 0) return -1;
12!
214

215
  pTaskId->taskId = taskId;
12✔
216
  tEndDecode(pDecoder);
12✔
217
  return 0;
12✔
218
}
219

220
void tFreeStreamTask(void* pParam) {
63,762✔
221
  char*        p = NULL;
63,762✔
222
  SStreamTask* pTask = pParam;
63,762✔
223
  int32_t      taskId = pTask->id.taskId;
63,762✔
224

225
  STaskExecStatisInfo* pStatis = &pTask->execInfo;
63,762✔
226

227
  ETaskStatus status1 = TASK_STATUS__UNINIT;
63,762✔
228
  if (pTask->status.pSM != NULL) {
63,762✔
229
    streamMutexLock(&pTask->lock);
28,793✔
230
    SStreamTaskState status = streamTaskGetStatus(pTask);
28,793✔
231
    p = status.name;
28,780✔
232
    status1 = status.state;
28,780✔
233
    streamMutexUnlock(&pTask->lock);
28,780✔
234
  }
235

236
  stDebug("start to free s-task:0x%x %p, state:%s, refId:%" PRId64, taskId, pTask, p, pTask->id.refId);
63,766✔
237

238
  SCheckpointInfo* pCkInfo = &pTask->chkInfo;
63,766✔
239
  stDebug("s-task:0x%x task exec summary: create:%" PRId64 ", init:%" PRId64 ", start:%" PRId64
63,766✔
240
          ", updateCount:%d latestUpdate:%" PRId64 ", latestCheckPoint:%" PRId64 ", ver:%" PRId64
241
          " nextProcessVer:%" PRId64 ", checkpointCount:%d",
242
          taskId, pStatis->created, pStatis->checkTs, pStatis->readyTs, pStatis->updateCount, pStatis->latestUpdateTs,
243
          pCkInfo->checkpointId, pCkInfo->checkpointVer, pCkInfo->nextProcessVer, pStatis->checkpoint);
244

245
  if (pTask->schedInfo.pDelayTimer != NULL) {
63,766✔
246
    streamTmrStop(pTask->schedInfo.pDelayTimer);
1,303✔
247
    pTask->schedInfo.pDelayTimer = NULL;
1,303✔
248
  }
249

250
  if (pTask->hTaskInfo.pTimer != NULL) {
63,766✔
251
    streamTmrStop(pTask->hTaskInfo.pTimer);
2,049✔
252
    pTask->hTaskInfo.pTimer = NULL;
2,043✔
253
  }
254

255
  if (pTask->msgInfo.pRetryTmr != NULL) {
63,760✔
256
    streamTmrStop(pTask->msgInfo.pRetryTmr);
5,562✔
257
    pTask->msgInfo.pRetryTmr = NULL;
5,560✔
258
  }
259

260
  if (pTask->inputq.queue) {
63,758✔
261
    streamQueueClose(pTask->inputq.queue, pTask->id.taskId);
14,712✔
262
    pTask->inputq.queue = NULL;
14,712✔
263
  }
264

265
  if (pTask->outputq.queue) {
63,758✔
266
    streamQueueClose(pTask->outputq.queue, pTask->id.taskId);
14,716✔
267
    pTask->outputq.queue = NULL;
14,722✔
268
  }
269

270
  if (pTask->exec.qmsg) {
63,764✔
271
    taosMemoryFree(pTask->exec.qmsg);
33,556!
272
  }
273

274
  if (pTask->exec.pExecutor) {
63,763✔
275
    qDestroyTask(pTask->exec.pExecutor);
7,490✔
276
    pTask->exec.pExecutor = NULL;
7,491✔
277
  }
278

279
  if (pTask->exec.pWalReader != NULL) {
63,764✔
280
    walCloseReader(pTask->exec.pWalReader);
7,395✔
281
    pTask->exec.pWalReader = NULL;
7,395✔
282
  }
283

284
  streamClearChkptReadyMsg(pTask->chkInfo.pActiveInfo);
63,764✔
285

286
  if (pTask->msgInfo.pData != NULL) {
63,775✔
287
    clearBufferedDispatchMsg(pTask);
23✔
288
  }
289

290
  if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) {
63,774✔
291
    tDeleteSchemaWrapper(pTask->outputInfo.tbSink.pSchemaWrapper);
31,588!
292
    taosMemoryFree(pTask->outputInfo.tbSink.pTSchema);
31,599!
293
    tSimpleHashCleanup(pTask->outputInfo.tbSink.pTbInfo);
31,592✔
294
    tDeleteSchemaWrapper(pTask->outputInfo.tbSink.pTagSchema);
31,607✔
295
  } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
32,186✔
296
    taosArrayDestroy(pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos);
27,640✔
297
  } else if (pTask->outputInfo.type == TASK_OUTPUT__VTABLE_MAP) {
4,546!
NEW
298
    taosArrayDestroy(pTask->outputInfo.vtableMapDispatcher.taskInfos);
×
NEW
299
    tSimpleHashCleanup(pTask->outputInfo.vtableMapDispatcher.vtableMap);
×
300
  }
301

302
  streamTaskCleanupCheckInfo(&pTask->taskCheckInfo);
63,788✔
303
  streamFreeTaskState(pTask, pTask->status.removeBackendFiles ? 1 : 0);
63,792✔
304

305
  if (pTask->pNameMap) {
63,781✔
306
    tSimpleHashCleanup(pTask->pNameMap);
2,411✔
307
  }
308

309
  if (pTask->status.pSM != NULL) {
63,781✔
310
    streamMutexDestroy(&pTask->lock);
28,800✔
311
    streamMutexDestroy(&pTask->msgInfo.lock);
28,797✔
312
    streamMutexDestroy(&pTask->taskCheckInfo.checkInfoLock);
28,790✔
313
  }
314

315
  taosArrayDestroy(pTask->pVTables);
63,769✔
316
  pTask->pVTables = NULL;
63,771✔
317

318
  streamDestroyStateMachine(pTask->status.pSM);
63,771✔
319
  pTask->status.pSM = NULL;
63,793✔
320

321
  streamTaskDestroyUpstreamInfo(&pTask->upstreamInfo);
63,793✔
322

323
  taosMemoryFree(pTask->outputInfo.pTokenBucket);
63,793!
324

325
  taosArrayDestroy(pTask->msgInfo.pSendInfo);
63,791✔
326
  pTask->msgInfo.pSendInfo = NULL;
63,794✔
327

328
  taosArrayDestroy(pTask->outputInfo.pNodeEpsetUpdateList);
63,794✔
329
  pTask->outputInfo.pNodeEpsetUpdateList = NULL;
63,789✔
330

331
  if (pTask->id.idStr != NULL) {
63,789✔
332
    taosMemoryFree((void*)pTask->id.idStr);
28,766!
333
  }
334

335
  streamTaskDestroyActiveChkptInfo(pTask->chkInfo.pActiveInfo);
63,791✔
336
  pTask->chkInfo.pActiveInfo = NULL;
63,790✔
337

338
  taosArrayDestroyP(pTask->notifyInfo.pNotifyAddrUrls, NULL);
63,790✔
339
  taosMemoryFreeClear(pTask->notifyInfo.streamName);
63,792!
340
  taosMemoryFreeClear(pTask->notifyInfo.stbFullName);
63,792!
341
  tDeleteSchemaWrapper(pTask->notifyInfo.pSchemaWrapper);
63,792!
342

343
  pTask->notifyEventStat = (STaskNotifyEventStat){0};
63,782✔
344

345
  taosMemoryFree(pTask);
63,782!
346
  stDebug("s-task:0x%x free task completed", taskId);
63,787✔
347
}
63,787✔
348

349
void streamFreeTaskState(SStreamTask* pTask, int8_t remove) {
63,791✔
350
  stDebug("s-task:0x%x start to free task state/backend", pTask->id.taskId);
63,791✔
351
  if (pTask->pState != NULL) {
63,785✔
352
    stDebug("s-task:0x%x start to free task state", pTask->id.taskId);
7,496✔
353
    streamStateClose(pTask->pState, remove);
7,496✔
354

355
    if (remove) taskDbSetClearFileFlag(pTask->pBackend);
7,495✔
356
    taskDbRemoveRef(pTask->pBackend);
7,495✔
357
    pTask->pBackend = NULL;
7,495✔
358
    pTask->pState = NULL;
7,495✔
359

360
  } else {
361
    stDebug("s-task:0x%x task state is NULL, may del backend:%s", pTask->id.taskId,
56,289✔
362
            pTask->backendPath ? pTask->backendPath : "NULL");
363
    if (remove) {
56,289✔
364
      if (pTask->backendPath != NULL) {
3,491!
365
        stDebug("s-task:0x%x task state is NULL, do del backend:%s", pTask->id.taskId, pTask->backendPath);
3,495✔
366
        taosRemoveDir(pTask->backendPath);
3,495✔
367
      }
368
    }
369
  }
370

371
  if (pTask->backendPath != NULL) {
63,775✔
372
    taosMemoryFree(pTask->backendPath);
14,709!
373
    pTask->backendPath = NULL;
14,714✔
374
  }
375
  // clear recal backend
376

377
  if (pTask->pRecalState != NULL) {
63,780✔
378
    stDebug("s-task:0x%x start to free recal task state", pTask->id.taskId);
27!
379
    streamStateClose(pTask->pRecalState, remove);
27✔
380

381
    if (remove) taskDbSetClearFileFlag(pTask->pRecalBackend);
27!
382
    taskDbRemoveRef(pTask->pRecalBackend);
27✔
383
    pTask->pRecalBackend = NULL;
27✔
384
    pTask->pRecalState = NULL;
27✔
385

386
  }  // else {
387
     //  stDebug("s-task:0x%x task state is NULL, may del backend:%s", pTask->id.taskId,
388
     //          pTask->backendPath ? pTask->backendPath : "NULL");
389
     //  if (remove) {
390
     //    if (pTask->backendPath != NULL) {
391
     //      stDebug("s-task:0x%x task state is NULL, do del backend:%s", pTask->id.taskId, pTask->backendPath);
392
     //      taosRemoveDir(pTask->backendPath);
393
     //    }
394
     //  }
395
  //}
396
  // if (pTask->backendPath != NULL) {
397
  //   taosMemoryFree(pTask->backendPath);
398
  //   pTask->backendPath = NULL;
399
  // }
400
}
63,780✔
401

402
static void setInitialVersionInfo(SStreamTask* pTask, int64_t ver) {
14,697✔
403
  SCheckpointInfo* pChkInfo = &pTask->chkInfo;
14,697✔
404
  SDataRange*      pRange = &pTask->dataRange;
14,697✔
405

406
  // only set the version info for stream tasks without fill-history task
407
  if ((pTask->info.fillHistory == 0) && (!HAS_RELATED_FILLHISTORY_TASK(pTask))) {
14,697✔
408
    pChkInfo->checkpointVer = ver - 1;  // only update when generating checkpoint
4,874✔
409
    pChkInfo->processedVer = ver - 1;   // already processed version
4,874✔
410
    pChkInfo->nextProcessVer = ver;     // next processed version
4,874✔
411

412
    pRange->range.maxVer = ver;
4,874✔
413
    pRange->range.minVer = ver;
4,874✔
414
  } else {
415
    // the initial value of processedVer/nextProcessVer/checkpointVer for stream task with related fill-history task
416
    // is set at the mnode.
417
    if (pTask->info.fillHistory == 1) {
9,823✔
418
      pChkInfo->checkpointVer = pRange->range.maxVer;
4,882✔
419
      pChkInfo->processedVer = pRange->range.maxVer;
4,882✔
420
      pChkInfo->nextProcessVer = pRange->range.maxVer + 1;
4,882✔
421
    } else {
422
      pChkInfo->checkpointVer = pRange->range.minVer - 1;
4,941✔
423
      pChkInfo->processedVer = pRange->range.minVer - 1;
4,941✔
424
      pChkInfo->nextProcessVer = pRange->range.minVer;
4,941✔
425

426
      {  // for compatible purpose, remove it later
427
        if (pRange->range.minVer == 0) {
4,941✔
428
          pChkInfo->checkpointVer = 0;
2,493✔
429
          pChkInfo->processedVer = 0;
2,493✔
430
          pChkInfo->nextProcessVer = 1;
2,493✔
431
          stDebug("s-task:%s update the processedVer to 0 from -1 due to compatible purpose", pTask->id.idStr);
2,493✔
432
        }
433
      }
434
    }
435
  }
436
}
14,697✔
437

438
int32_t streamTaskSetBackendPath(SStreamTask* pTask) {
14,708✔
439
  int64_t streamId = 0;
14,708✔
440
  int32_t taskId = 0;
14,708✔
441

442
  if (pTask->info.fillHistory) {
14,708✔
443
    streamId = pTask->streamTaskId.streamId;
4,920✔
444
    taskId = pTask->streamTaskId.taskId;
4,920✔
445
  } else {
446
    streamId = pTask->id.streamId;
9,788✔
447
    taskId = pTask->id.taskId;
9,788✔
448
  }
449

450
  char    id[128] = {0};
14,708✔
451
  int32_t nBytes = snprintf(id, tListLen(id), "0x%" PRIx64 "-0x%x", streamId, taskId);
14,708✔
452
  if (nBytes < 0 || nBytes >= sizeof(id)) {
14,708!
UNCOV
453
    return TSDB_CODE_OUT_OF_BUFFER;
×
454
  }
455

456
  int32_t len = strlen(pTask->pMeta->path);
14,718✔
457
  pTask->backendPath = (char*)taosMemoryMalloc(len + nBytes + 2);
14,718!
458
  if (pTask->backendPath == NULL) {
14,722!
UNCOV
459
    return terrno;
×
460
  }
461

462
  int32_t code = snprintf(pTask->backendPath, len + nBytes + 2, "%s%s%s", pTask->pMeta->path, TD_DIRSEP, id);
14,722✔
463
  if (code < 0 || code >= len + nBytes + 2) {
14,722!
464
    stError("s-task:%s failed to set backend path:%s, code: out of buffer", pTask->id.idStr, pTask->backendPath);
8!
465
    return TSDB_CODE_OUT_OF_BUFFER;
×
466
  } else {
467
    stDebug("s-task:%s set backend path:%s", pTask->id.idStr, pTask->backendPath);
14,714✔
468
    return 0;
14,702✔
469
  }
470
}
471

472
int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, int64_t ver) {
14,700✔
473
  int32_t code = createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId, &pTask->id.idStr);
14,700✔
474
  if (code) {
14,665!
UNCOV
475
    stError("0x%x failed create stream task id str, code:%s", pTask->id.taskId, tstrerror(code));
×
UNCOV
476
    return code;
×
477
  }
478

479
  pTask->id.refId = 0;
14,665✔
480
  pTask->inputq.status = TASK_INPUT_STATUS__NORMAL;
14,665✔
481
  pTask->outputq.status = TASK_OUTPUT_STATUS__NORMAL;
14,665✔
482

483
  int32_t code1 = streamQueueOpen(512 << 10, &pTask->inputq.queue);
14,665✔
484
  int32_t code2 = streamQueueOpen(512 << 10, &pTask->outputq.queue);
14,680✔
485
  if (code1 || code2) {
14,710!
UNCOV
486
    stError("s-task:%s failed to prepare the input/output queue, initialize task failed", pTask->id.idStr);
×
UNCOV
487
    return TSDB_CODE_OUT_OF_MEMORY;
×
488
  }
489

490
  pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
14,710✔
491

492
  code = streamCreateStateMachine(pTask);
14,710✔
493
  if (pTask->status.pSM == NULL || code != TSDB_CODE_SUCCESS) {
14,706!
UNCOV
494
    stError("s-task:%s failed create state-machine for stream task, initialization failed, code:%s", pTask->id.idStr,
×
495
            tstrerror(code));
496
    return code;
×
497
  }
498

499
  pTask->execInfo.created = taosGetTimestampMs();
14,705✔
500
  setInitialVersionInfo(pTask, ver);
14,705✔
501

502
  pTask->pMeta = pMeta;
14,660✔
503
  pTask->pMsgCb = pMsgCb;
14,660✔
504
  pTask->msgInfo.pSendInfo = taosArrayInit(4, sizeof(SDispatchEntry));
14,660✔
505
  if (pTask->msgInfo.pSendInfo == NULL) {
14,690!
UNCOV
506
    stError("s-task:%s failed to create sendInfo struct for stream task, code:Out of memory", pTask->id.idStr);
×
507
    return terrno;
×
508
  }
509

510
  code = taosThreadMutexInit(&pTask->msgInfo.lock, NULL);
14,690✔
511
  if (code) {
14,694!
UNCOV
512
    stError("s-task:0x%x failed to init msgInfo mutex, code:%s", pTask->id.taskId, tstrerror(code));
×
UNCOV
513
    return code;
×
514
  }
515

516
  TdThreadMutexAttr attr = {0};
14,694✔
517
  code = taosThreadMutexAttrInit(&attr);
14,694✔
518
  if (code != 0) {
14,686!
UNCOV
519
    stError("s-task:%s initElapsed mutex attr failed, code:%s", pTask->id.idStr, tstrerror(code));
×
UNCOV
520
    return code;
×
521
  }
522

523
  code = taosThreadMutexAttrSetType(&attr, PTHREAD_MUTEX_RECURSIVE);
14,686✔
524
  if (code != 0) {
14,645!
UNCOV
525
    stError("s-task:%s set mutex attr recursive, code:%s", pTask->id.idStr, tstrerror(code));
×
UNCOV
526
    return code;
×
527
  }
528

529
  code = taosThreadMutexInit(&pTask->lock, &attr);
14,645✔
530
  if (code) {
14,663!
UNCOV
531
    return code;
×
532
  }
533

534
  code = taosThreadMutexAttrDestroy(&attr);
14,663✔
535
  if (code) {
14,651!
UNCOV
536
    return code;
×
537
  }
538

539
  streamTaskOpenAllUpstreamInput(pTask);
14,651✔
540

541
  STaskOutputInfo* pOutputInfo = &pTask->outputInfo;
14,653✔
542
  pOutputInfo->pTokenBucket = taosMemoryCalloc(1, sizeof(STokenBucket));
14,653!
543
  if (pOutputInfo->pTokenBucket == NULL) {
14,669!
UNCOV
544
    stError("s-task:%s failed to prepare the tokenBucket, code:%s", pTask->id.idStr, tstrerror(terrno));
×
UNCOV
545
    return terrno;
×
546
  }
547

548
  // 2MiB per second for sink task
549
  // 50 times sink operator per second
550
  code = streamTaskInitTokenBucket(pOutputInfo->pTokenBucket, 35, 35, tsSinkDataRate, pTask->id.idStr);
14,669✔
551
  if (code) {
14,663!
UNCOV
552
    return code;
×
553
  }
554

555
  pOutputInfo->pNodeEpsetUpdateList = taosArrayInit(4, sizeof(SDownstreamTaskEpset));
14,663✔
556
  if (pOutputInfo->pNodeEpsetUpdateList == NULL) {
14,696!
UNCOV
557
    stError("s-task:%s failed to prepare downstreamUpdateList, code:%s", pTask->id.idStr, tstrerror(terrno));
×
UNCOV
558
    return terrno;
×
559
  }
560

561
  pTask->taskCheckInfo.pList = taosArrayInit(4, sizeof(SDownstreamStatusInfo));
14,696✔
562
  if (pTask->taskCheckInfo.pList == NULL) {
14,699!
UNCOV
563
    stError("s-task:%s failed to prepare taskCheckInfo list, code:%s", pTask->id.idStr, tstrerror(terrno));
×
UNCOV
564
    return terrno;
×
565
  }
566

567
  code = taosThreadMutexInit(&pTask->taskCheckInfo.checkInfoLock, NULL);
14,699✔
568
  if (code) {
14,692!
UNCOV
569
    return code;
×
570
  }
571

572
  if (pTask->chkInfo.pActiveInfo == NULL) {
14,692!
573
    code = streamTaskCreateActiveChkptInfo(&pTask->chkInfo.pActiveInfo);
14,698✔
574
    if (code) {
14,710!
575
      stError("s-task:%s failed to create active checkpoint info, code:%s", pTask->id.idStr, tstrerror(code));
×
UNCOV
576
      return code;
×
577
    }
578
  }
579

580
  return streamTaskSetBackendPath(pTask);
14,704✔
581
}
582

583
int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask) {
122,332✔
584
  if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
122,332✔
585
    return 0;
6,838✔
586
  }
587

588
  int32_t type = pTask->outputInfo.type;
115,494✔
589
  if (type == TASK_OUTPUT__TABLE) {
115,494✔
590
    return 0;
271✔
591
  } else if (type == TASK_OUTPUT__FIXED_DISPATCH) {
115,223✔
592
    return 1;
10,712✔
593
  } else if (type == TASK_OUTPUT__VTABLE_MAP) {
104,511!
NEW
594
    SArray* pTaskInfos = pTask->outputInfo.vtableMapDispatcher.taskInfos;
×
NEW
595
    return taosArrayGetSize(pTaskInfos);
×
596
  } else {
597
    SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
104,511✔
598
    return taosArrayGetSize(vgInfo);
104,511✔
599
  }
600
}
601

602
int32_t streamTaskGetNumOfUpstream(const SStreamTask* pTask) { return taosArrayGetSize(pTask->upstreamInfo.pList); }
20,432✔
603

604
int32_t streamTaskSetUpstreamInfo(SStreamTask* pTask, const SStreamTask* pUpstreamTask) {
20,188✔
605
  SStreamUpstreamEpInfo* pEpInfo = createStreamTaskEpInfo(pUpstreamTask);
20,188✔
606
  if (pEpInfo == NULL) {
20,188!
UNCOV
607
    return terrno;
×
608
  }
609

610
  if (pTask->upstreamInfo.pList == NULL) {
20,188✔
611
    pTask->upstreamInfo.pList = taosArrayInit(4, POINTER_BYTES);
6,986✔
612
  }
613

614
  void* p = taosArrayPush(pTask->upstreamInfo.pList, &pEpInfo);
20,188✔
615
  return (p == NULL) ? terrno : TSDB_CODE_SUCCESS;
20,188!
616
}
617

618
int32_t streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdated) {
190✔
619
  int32_t code = 0;
190✔
620
  char    buf[512] = {0};
190✔
621
  code = epsetToStr(pEpSet, buf, tListLen(buf));  // ignore error since it is only for log file.
190✔
622
  if (code != 0) {                                // print error and continue
190!
UNCOV
623
    stError("%s failed to convert epset to str, code:%s", pTask->id.idStr, tstrerror(code));
×
UNCOV
624
    return code;
×
625
  }
626

627
  int32_t numOfUpstream = taosArrayGetSize(pTask->upstreamInfo.pList);
190✔
628
  for (int32_t i = 0; i < numOfUpstream; ++i) {
326✔
629
    SStreamUpstreamEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i);
243✔
630
    if (pInfo->nodeId == nodeId) {
243✔
631
      bool equal = isEpsetEqual(&pInfo->epSet, pEpSet);
107✔
632
      if (!equal) {
107✔
633
        *pUpdated = true;
24✔
634

635
        char tmp[512] = {0};
24✔
636
        code = epsetToStr(&pInfo->epSet, tmp, tListLen(tmp));
24✔
637
        if (code != 0) {  // print error and continue
24!
UNCOV
638
          stError("%s failed to convert epset to str, code:%s", pTask->id.idStr, tstrerror(code));
×
UNCOV
639
          return code;
×
640
        }
641

642
        epsetAssign(&pInfo->epSet, pEpSet);
24✔
643
        stDebug("s-task:0x%x update the upstreamInfo taskId:0x%x(nodeId:%d) newEpset:%s old:%s", pTask->id.taskId,
24!
644
                pInfo->taskId, nodeId, buf, tmp);
645
      } else {
646
        stDebug("s-task:0x%x not update upstreamInfo, since identical, task:0x%x(nodeId:%d) epset:%s", pTask->id.taskId,
83!
647
                pInfo->taskId, nodeId, buf);
648
      }
649

650
      break;
107✔
651
    }
652
  }
653

654
  return code;
190✔
655
}
656

657
void streamTaskDestroyUpstreamInfo(SUpstreamInfo* pUpstreamInfo) {
63,779✔
658
  if (pUpstreamInfo->pList != NULL) {
63,779✔
659
    taosArrayDestroyEx(pUpstreamInfo->pList, freeUpstreamItem);
56,678✔
660
    pUpstreamInfo->numOfClosed = 0;
56,687✔
661
    pUpstreamInfo->pList = NULL;
56,687✔
662
  }
663
}
63,788✔
664

665
void streamTaskSetFixedDownstreamInfo(SStreamTask* pTask, const SStreamTask* pDownstreamTask) {
862✔
666
  STaskDispatcherFixed* pDispatcher = &pTask->outputInfo.fixedDispatcher;
862✔
667
  pDispatcher->taskId = pDownstreamTask->id.taskId;
862✔
668
  pDispatcher->nodeId = pDownstreamTask->info.nodeId;
862✔
669
  pDispatcher->epSet = pDownstreamTask->info.epSet;
862✔
670

671
  pTask->outputInfo.type = TASK_OUTPUT__FIXED_DISPATCH;
862✔
672
  pTask->msgInfo.msgType = TDMT_STREAM_TASK_DISPATCH;
862✔
673
}
862✔
674

675
int32_t streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdated) {
190✔
676
  char    buf[512] = {0};
190✔
677
  int32_t code = epsetToStr(pEpSet, buf, tListLen(buf));  // ignore the error since only for log files.
190✔
678
  if (code != 0) {                                        // print error and continue
190!
UNCOV
679
    stError("%s failed to convert epset to str, code:%s", pTask->id.idStr, tstrerror(code));
×
UNCOV
680
    return code;
×
681
  }
682

683
  int32_t id = pTask->id.taskId;
190✔
684
  int8_t  type = pTask->outputInfo.type;
190✔
685

686
  if (type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
190✔
687
    SArray* pVgs = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
105✔
688

689
    for (int32_t i = 0; i < taosArrayGetSize(pVgs); i++) {
160✔
690
      SVgroupInfo* pVgInfo = taosArrayGet(pVgs, i);
158✔
691
      if (pVgInfo == NULL) {
158!
UNCOV
692
        continue;
×
693
      }
694

695
      if (pVgInfo->vgId == nodeId) {
158✔
696
        bool isEqual = isEpsetEqual(&pVgInfo->epSet, pEpSet);
103✔
697
        if (!isEqual) {
103✔
698
          *pUpdated = true;
24✔
699

700
          char tmp[512] = {0};
24✔
701
          code = epsetToStr(&pVgInfo->epSet, tmp, tListLen(tmp));
24✔
702
          if (code != 0) {  // print error and continue
24!
703
            stError("%s failed to convert epset to str, code:%s", pTask->id.idStr, tstrerror(code));
×
UNCOV
704
            return code;
×
705
          }
706

707
          epsetAssign(&pVgInfo->epSet, pEpSet);
24✔
708
          stDebug("s-task:0x%x update dispatch info, task:0x%x(nodeId:%d) newEpset:%s old:%s", id, pVgInfo->taskId,
24!
709
                  nodeId, buf, tmp);
710
        } else {
711
          stDebug("s-task:0x%x not update dispatch info, since identical, task:0x%x(nodeId:%d) epset:%s", id,
79!
712
                  pVgInfo->taskId, nodeId, buf);
713
        }
714
        break;
103✔
715
      }
716
    }
717
  } else if (type == TASK_OUTPUT__FIXED_DISPATCH) {
85!
718
    STaskDispatcherFixed* pDispatcher = &pTask->outputInfo.fixedDispatcher;
85✔
719
    if (pDispatcher->nodeId == nodeId) {
85✔
720
      bool equal = isEpsetEqual(&pDispatcher->epSet, pEpSet);
4✔
721
      if (!equal) {
4!
722
        *pUpdated = true;
×
723

UNCOV
724
        char tmp[512] = {0};
×
UNCOV
725
        code = epsetToStr(&pDispatcher->epSet, tmp, tListLen(tmp));
×
UNCOV
726
        if (code != 0) {  // print error and continue
×
UNCOV
727
          stError("%s failed to convert epset to str, code:%s", pTask->id.idStr, tstrerror(code));
×
UNCOV
728
          return code;
×
729
        }
730

UNCOV
731
        epsetAssign(&pDispatcher->epSet, pEpSet);
×
UNCOV
732
        stDebug("s-task:0x%x update dispatch info, task:0x%x(nodeId:%d) newEpset:%s old:%s", id, pDispatcher->taskId,
×
733
                nodeId, buf, tmp);
734
      } else {
735
        stDebug("s-task:0x%x not update dispatch info, since identical, task:0x%x(nodeId:%d) epset:%s", id,
4!
736
                pDispatcher->taskId, nodeId, buf);
737
      }
738
    }
NEW
739
  } else if (type == TASK_OUTPUT__VTABLE_MAP) {
×
NEW
740
    SArray* pTaskInfos = pTask->outputInfo.vtableMapDispatcher.taskInfos;
×
741

NEW
742
    for (int32_t i = 0; i < taosArrayGetSize(pTaskInfos); ++i) {
×
NEW
743
      STaskDispatcherFixed* pAddr = taosArrayGet(pTaskInfos, i);
×
NEW
744
      if (pAddr == NULL) {
×
NEW
745
        continue;
×
746
      }
747

NEW
748
      if (pAddr->nodeId == nodeId) {
×
NEW
749
        bool isEqual = isEpsetEqual(&pAddr->epSet, pEpSet);
×
NEW
750
        if (!isEqual) {
×
NEW
751
          *pUpdated = true;
×
752

NEW
753
          char tmp[512] = {0};
×
NEW
754
          code = epsetToStr(&pAddr->epSet, tmp, tListLen(tmp));
×
NEW
755
          if (code != 0) {  // print error and continue
×
NEW
756
            stError("%s failed to convert epset to str, code:%s", pTask->id.idStr, tstrerror(code));
×
NEW
757
            return code;
×
758
          }
759

NEW
760
          epsetAssign(&pAddr->epSet, pEpSet);
×
NEW
761
          stDebug("s-task:0x%x update dispatch info, task:0x%x(nodeId:%d) newEpset:%s old:%s", id, pAddr->taskId,
×
762
                  nodeId, buf, tmp);
763
        } else {
NEW
764
          stDebug("s-task:0x%x not update dispatch info, since identical, task:0x%x(nodeId:%d) epset:%s", id,
×
765
                  pAddr->taskId, nodeId, buf);
766
        }
NEW
767
        break;
×
768
      }
769
    }
770
  }
771

772
  return code;
190✔
773
}
774

775
int32_t streamTaskStop(SStreamTask* pTask) {
2,985✔
776
  int32_t     vgId = pTask->pMeta->vgId;
2,985✔
777
  int64_t     st = taosGetTimestampMs();
2,985✔
778
  const char* id = pTask->id.idStr;
2,985✔
779

780
  int32_t code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_STOP);
2,985✔
781
  if (code) {
2,985!
UNCOV
782
    stError("failed to handle STOP event, s-task:%s, code:%s", id, tstrerror(code));
×
UNCOV
783
    return code;
×
784
  }
785

786
  if (pTask->info.taskLevel != TASK_LEVEL__SINK && pTask->exec.pExecutor != NULL) {
2,985✔
787
    code = qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS, 5000);
1,569✔
788
    if (code != TSDB_CODE_SUCCESS) {
1,568!
UNCOV
789
      stError("s-task:%s failed to kill task related query handle, code:%s", id, tstrerror(code));
×
790
    }
791
  }
792

793
  while (!streamTaskIsIdle(pTask)) {
2,984!
UNCOV
794
    stDebug("s-task:%s level:%d wait for task to be idle and then close, check again in 100ms", id,
×
795
            pTask->info.taskLevel);
UNCOV
796
    taosMsleep(100);
×
797
  }
798

799
  int64_t el = taosGetTimestampMs() - st;
2,985✔
800
  stDebug("vgId:%d s-task:%s is closed in %" PRId64 " ms", vgId, id, el);
2,985✔
801
  return code;
2,985✔
802
}
803

804
bool streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList) {
173✔
805
  STaskExecStatisInfo* p = &pTask->execInfo;
173✔
806

807
  int32_t numOfNodes = taosArrayGetSize(pNodeList);
173✔
808
  int64_t prevTs = p->latestUpdateTs;
173✔
809

810
  p->latestUpdateTs = taosGetTimestampMs();
173✔
811
  p->updateCount += 1;
173✔
812
  stDebug("s-task:0x%x update task nodeEp epset, updatedNodes:%d, updateCount:%d, prevTs:%" PRId64, pTask->id.taskId,
173!
813
          numOfNodes, p->updateCount, prevTs);
814

815
  bool updated = false;
173✔
816
  for (int32_t i = 0; i < numOfNodes; ++i) {
536✔
817
    SNodeUpdateInfo* pInfo = taosArrayGet(pNodeList, i);
363✔
818
    if (pInfo == NULL) {
363!
UNCOV
819
      continue;
×
820
    }
821

822
    int32_t code = doUpdateTaskEpset(pTask, pInfo->nodeId, &pInfo->newEp, &updated);
363✔
823
    if (code) {
363!
UNCOV
824
      stError("s-task:0x%x failed to update the task nodeEp epset, code:%s", pTask->id.taskId, tstrerror(code));
×
825
    }
826
  }
827

828
  return updated;
173✔
829
}
830

831
void streamTaskResetUpstreamStageInfo(SStreamTask* pTask) {
14,718✔
832
  if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
14,718✔
833
    return;
7,395✔
834
  }
835

836
  int32_t size = taosArrayGetSize(pTask->upstreamInfo.pList);
7,323✔
837
  for (int32_t i = 0; i < size; ++i) {
28,548✔
838
    SStreamUpstreamEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i);
21,226✔
839
    pInfo->stage = -1;
21,227✔
840
  }
841

842
  stDebug("s-task:%s reset all upstream tasks stage info", pTask->id.idStr);
7,322✔
843
}
844

845
void streamTaskOpenAllUpstreamInput(SStreamTask* pTask) {
24,923✔
846
  int32_t num = taosArrayGetSize(pTask->upstreamInfo.pList);
24,923✔
847
  if (num == 0) {
24,926✔
848
    return;
12,458✔
849
  }
850

851
  for (int32_t i = 0; i < num; ++i) {
47,793✔
852
    SStreamUpstreamEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i);
35,360✔
853
    pInfo->dataAllowed = true;
35,325✔
854
  }
855

856
  pTask->upstreamInfo.numOfClosed = 0;
12,433✔
857
  stDebug("s-task:%s opening up inputQ for %d upstream tasks", pTask->id.idStr, num);
12,433✔
858
}
859

860
void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId) {
8,601✔
861
  SStreamUpstreamEpInfo* pInfo = NULL;
8,601✔
862
  streamTaskGetUpstreamTaskEpInfo(pTask, taskId, &pInfo);
8,601✔
863

864
  if ((pInfo != NULL) && pInfo->dataAllowed) {
8,600!
865
    pInfo->dataAllowed = false;
8,600✔
866
    if (pTask->upstreamInfo.numOfClosed < streamTaskGetNumOfUpstream(pTask)) {
8,600!
867
      int32_t t = atomic_add_fetch_32(&pTask->upstreamInfo.numOfClosed, 1);
8,601✔
868
    } else {
UNCOV
869
      stError("s-task:%s not inc closed input, since they have been all closed already", pTask->id.idStr);
×
870
    }
871
  }
872
}
8,600✔
873

874
void streamTaskOpenUpstreamInput(SStreamTask* pTask, int32_t taskId) {
1✔
875
  SStreamUpstreamEpInfo* pInfo = NULL;
1✔
876
  streamTaskGetUpstreamTaskEpInfo(pTask, taskId, &pInfo);
1✔
877

878
  if (pInfo != NULL && (!pInfo->dataAllowed)) {
1!
879
    int32_t t = atomic_sub_fetch_32(&pTask->upstreamInfo.numOfClosed, 1);
1✔
880
    stDebug("s-task:%s open inputQ for upstream:0x%x, remain closed:%d", pTask->id.idStr, taskId, t);
1!
881
    pInfo->dataAllowed = true;
1✔
882
  }
883
}
1✔
884

UNCOV
885
bool streamTaskIsAllUpstreamClosed(SStreamTask* pTask) {
×
UNCOV
886
  return pTask->upstreamInfo.numOfClosed == taosArrayGetSize(pTask->upstreamInfo.pList);
×
887
}
888

889
bool streamTaskSetSchedStatusWait(SStreamTask* pTask) {
102,543✔
890
  bool ret = false;
102,543✔
891

892
  streamMutexLock(&pTask->lock);
102,543✔
893
  if (pTask->status.schedStatus == TASK_SCHED_STATUS__INACTIVE) {
102,545✔
894
    pTask->status.schedStatus = TASK_SCHED_STATUS__WAITING;
68,440✔
895
    ret = true;
68,440✔
896
  }
897

898
  streamMutexUnlock(&pTask->lock);
102,545✔
899
  return ret;
102,547✔
900
}
901

902
int8_t streamTaskSetSchedStatusActive(SStreamTask* pTask) {
67,074✔
903
  streamMutexLock(&pTask->lock);
67,074✔
904
  int8_t status = pTask->status.schedStatus;
67,110✔
905
  if (status == TASK_SCHED_STATUS__WAITING) {
67,110✔
906
    pTask->status.schedStatus = TASK_SCHED_STATUS__ACTIVE;
67,093✔
907
  }
908
  streamMutexUnlock(&pTask->lock);
67,110✔
909

910
  return status;
67,109✔
911
}
912

913
int8_t streamTaskSetSchedStatusInactive(SStreamTask* pTask) {
1,317✔
914
  streamMutexLock(&pTask->lock);
1,317✔
915
  int8_t status = pTask->status.schedStatus;
1,317✔
916
  pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
1,317✔
917
  streamMutexUnlock(&pTask->lock);
1,317✔
918

919
  return status;
1,317✔
920
}
921

922
int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, int32_t resetRelHalt) {
6,982✔
923
  int32_t      code = 0;
6,982✔
924
  SStreamMeta* pMeta = pTask->pMeta;
6,982✔
925
  SStreamTask* pStreamTask = NULL;
6,982✔
926

927
  if (pTask->info.fillHistory == 0) {
6,982!
928
    return code;
6,987✔
929
  }
930

UNCOV
931
  code = streamMetaAcquireTaskUnsafe(pMeta, &pTask->streamTaskId, &pStreamTask);
×
UNCOV
932
  if (code == 0) {
×
UNCOV
933
    stDebug("s-task:%s clear the related stream task:0x%x attr to fill-history task", pTask->id.idStr,
×
934
            (int32_t)pTask->streamTaskId.taskId);
935

UNCOV
936
    streamMutexLock(&(pStreamTask->lock));
×
UNCOV
937
    CLEAR_RELATED_FILLHISTORY_TASK(pStreamTask);
×
938

UNCOV
939
    if (resetRelHalt) {
×
940
      stDebug("s-task:0x%" PRIx64 " set the persistent status attr to be ready, prev:%s, status in sm:%s",
×
941
              pTask->streamTaskId.taskId, streamTaskGetStatusStr(pStreamTask->status.taskStatus),
942
              streamTaskGetStatus(pStreamTask).name);
UNCOV
943
      pStreamTask->status.taskStatus = TASK_STATUS__READY;
×
944
    }
945

UNCOV
946
    code = streamMetaSaveTaskInMeta(pMeta, pStreamTask);
×
UNCOV
947
    streamMutexUnlock(&(pStreamTask->lock));
×
948

UNCOV
949
    streamMetaReleaseTask(pMeta, pStreamTask);
×
950
  }
951

UNCOV
952
  return code;
×
953
}
954

UNCOV
955
int32_t streamBuildAndSendDropTaskMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskId* pTaskId, int64_t resetRelHalt) {
×
UNCOV
956
  SVDropStreamTaskReq* pReq = rpcMallocCont(sizeof(SVDropStreamTaskReq));
×
UNCOV
957
  if (pReq == NULL) {
×
UNCOV
958
    return terrno;
×
959
  }
960

UNCOV
961
  pReq->head.vgId = vgId;
×
UNCOV
962
  pReq->taskId = pTaskId->taskId;
×
UNCOV
963
  pReq->streamId = pTaskId->streamId;
×
UNCOV
964
  pReq->resetRelHalt = resetRelHalt;
×
965

966
  SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_DROP, .pCont = pReq, .contLen = sizeof(SVDropStreamTaskReq)};
×
967
  int32_t code = tmsgPutToQueue(pMsgCb, WRITE_QUEUE, &msg);
×
UNCOV
968
  if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
969
    stError("vgId:%d failed to send drop task:0x%x msg, code:%s", vgId, pTaskId->taskId, tstrerror(code));
×
970
  } else {
UNCOV
971
    stDebug("vgId:%d build and send drop task:0x%x msg", vgId, pTaskId->taskId);
×
972
  }
973

974
  return code;
×
975
}
976

977
int32_t streamSendChkptReportMsg(SStreamTask* pTask, SCheckpointInfo* pCheckpointInfo, int8_t dropRelHTask) {
6,499✔
978
  int32_t                code = 0;
6,499✔
979
  int32_t                tlen = 0;
6,499✔
980
  int32_t                vgId = pTask->pMeta->vgId;
6,499✔
981
  const char*            id = pTask->id.idStr;
6,499✔
982
  SActiveCheckpointInfo* pActive = pCheckpointInfo->pActiveInfo;
6,499✔
983

984
  SCheckpointReport req = {.streamId = pTask->id.streamId,
6,499✔
985
                           .taskId = pTask->id.taskId,
6,499✔
986
                           .nodeId = vgId,
987
                           .dropHTask = dropRelHTask,
988
                           .transId = pActive->transId,
6,499✔
989
                           .checkpointId = pActive->activeId,
6,499✔
990
                           .checkpointVer = pCheckpointInfo->processedVer,
6,499✔
991
                           .checkpointTs = pCheckpointInfo->startTs};
6,499✔
992

993
  tEncodeSize(tEncodeStreamTaskChkptReport, &req, tlen, code);
6,499!
994
  if (code < 0) {
6,499!
UNCOV
995
    stError("s-task:%s vgId:%d encode stream task checkpoint-report failed, code:%s", id, vgId, tstrerror(code));
×
UNCOV
996
    return -1;
×
997
  }
998

999
  void* buf = rpcMallocCont(tlen);
6,499✔
1000
  if (buf == NULL) {
6,499!
UNCOV
1001
    stError("s-task:%s vgId:%d encode stream task checkpoint-report msg failed, code:%s", id, vgId,
×
1002
            tstrerror(TSDB_CODE_OUT_OF_MEMORY));
UNCOV
1003
    return -1;
×
1004
  }
1005

1006
  SEncoder encoder;
1007
  tEncoderInit(&encoder, buf, tlen);
6,499✔
1008
  if ((code = tEncodeStreamTaskChkptReport(&encoder, &req)) < 0) {
6,499!
UNCOV
1009
    rpcFreeCont(buf);
×
UNCOV
1010
    tEncoderClear(&encoder);
×
UNCOV
1011
    stError("s-task:%s vgId:%d encode stream task checkpoint-report msg failed, code:%s", id, vgId, tstrerror(code));
×
UNCOV
1012
    return -1;
×
1013
  }
1014
  tEncoderClear(&encoder);
6,499✔
1015

1016
  SRpcMsg msg = {0};
6,499✔
1017
  initRpcMsg(&msg, TDMT_MND_STREAM_CHKPT_REPORT, buf, tlen);
6,499✔
1018
  stDebug("s-task:%s vgId:%d build and send task checkpoint-report to mnode", id, vgId);
6,499✔
1019

1020
  return tmsgSendReq(&pTask->info.mnodeEpset, &msg);
6,499✔
1021
}
1022

1023
STaskId streamTaskGetTaskId(const SStreamTask* pTask) {
87,880✔
1024
  STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
87,880✔
1025
  return id;
87,880✔
1026
}
1027

1028
void streamTaskInitForLaunchHTask(SHistoryTaskInfo* pInfo) {
2,051✔
1029
  pInfo->waitInterval = LAUNCH_HTASK_INTERVAL;
2,051✔
1030
  pInfo->tickCount = ceil(LAUNCH_HTASK_INTERVAL / WAIT_FOR_MINIMAL_INTERVAL);
2,051✔
1031
  pInfo->retryTimes = 0;
2,051✔
1032
}
2,051✔
1033

1034
void streamTaskSetRetryInfoForLaunch(SHistoryTaskInfo* pInfo) {
2,052✔
1035
  pInfo->waitInterval *= RETRY_LAUNCH_INTERVAL_INC_RATE;
2,052✔
1036
  pInfo->tickCount = ceil(pInfo->waitInterval / WAIT_FOR_MINIMAL_INTERVAL);
2,052✔
1037
  pInfo->retryTimes += 1;
2,052✔
1038
}
2,052✔
1039

1040
void streamTaskStatusInit(STaskStatusEntry* pEntry, const SStreamTask* pTask) {
9,407✔
1041
  pEntry->id.streamId = pTask->id.streamId;
9,407✔
1042
  pEntry->id.taskId = pTask->id.taskId;
9,407✔
1043
  pEntry->stage = -1;
9,407✔
1044
  pEntry->nodeId = pTask->info.nodeId;
9,407✔
1045
  pEntry->status = TASK_STATUS__STOP;
9,407✔
1046
}
9,407✔
1047

1048
void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc) {
70,264✔
1049
  pDst->stage = pSrc->stage;
70,264✔
1050
  pDst->inputQUsed = pSrc->inputQUsed;
70,264✔
1051
  pDst->inputRate = pSrc->inputRate;
70,264✔
1052
  pDst->procsTotal = pSrc->procsTotal;
70,264✔
1053
  pDst->procsThroughput = pSrc->procsThroughput;
70,264✔
1054
  pDst->outputTotal = pSrc->outputTotal;
70,264✔
1055
  pDst->outputThroughput = pSrc->outputThroughput;
70,264✔
1056
  pDst->processedVer = pSrc->processedVer;
70,264✔
1057
  pDst->verRange = pSrc->verRange;
70,264✔
1058
  pDst->sinkQuota = pSrc->sinkQuota;
70,264✔
1059
  pDst->sinkDataSize = pSrc->sinkDataSize;
70,264✔
1060
  pDst->checkpointInfo = pSrc->checkpointInfo;
70,264✔
1061
  pDst->startCheckpointId = pSrc->startCheckpointId;
70,264✔
1062
  pDst->startCheckpointVer = pSrc->startCheckpointVer;
70,264✔
1063
  pDst->status = pSrc->status;
70,264✔
1064

1065
  pDst->startTime = pSrc->startTime;
70,264✔
1066
  pDst->hTaskId = pSrc->hTaskId;
70,264✔
1067
  pDst->notifyEventStat = pSrc->notifyEventStat;
70,264✔
1068
}
70,264✔
1069

1070
STaskStatusEntry streamTaskGetStatusEntry(SStreamTask* pTask) {
71,670✔
1071
  SStreamMeta*         pMeta = pTask->pMeta;
71,670✔
1072
  STaskExecStatisInfo* pExecInfo = &pTask->execInfo;
71,670✔
1073

1074
  STaskStatusEntry entry = {
215,010✔
1075
      .id = streamTaskGetTaskId(pTask),
71,670✔
1076
      .status = streamTaskGetStatus(pTask).state,
71,670✔
1077
      .nodeId = pMeta->vgId,
71,670✔
1078
      .stage = pMeta->stage,
71,670✔
1079

1080
      .inputQUsed = SIZE_IN_MiB(streamQueueGetItemSize(pTask->inputq.queue)),
71,670✔
1081
      .startTime = pExecInfo->readyTs,
71,670✔
1082
      .checkpointInfo.latestId = pTask->chkInfo.checkpointId,
71,670✔
1083
      .checkpointInfo.latestVer = pTask->chkInfo.checkpointVer,
71,670✔
1084
      .checkpointInfo.latestTime = pTask->chkInfo.checkpointTime,
71,670✔
1085
      .checkpointInfo.latestSize = 0,
1086
      .checkpointInfo.remoteBackup = 0,
1087
      .checkpointInfo.consensusChkptId = 0,
1088
      .checkpointInfo.consensusTs = 0,
1089
      .hTaskId = pTask->hTaskInfo.id.taskId,
71,670✔
1090
      .procsTotal = SIZE_IN_MiB(pExecInfo->inputDataSize),
71,670✔
1091
      .outputTotal = SIZE_IN_MiB(pExecInfo->outputDataSize),
71,670✔
1092
      .procsThroughput = SIZE_IN_KiB(pExecInfo->procsThroughput),
71,670✔
1093
      .outputThroughput = SIZE_IN_KiB(pExecInfo->outputThroughput),
71,670✔
1094
      .startCheckpointId = pExecInfo->startCheckpointId,
71,670✔
1095
      .startCheckpointVer = pExecInfo->startCheckpointVer,
71,670✔
1096
      .notifyEventStat = pTask->notifyEventStat,
1097
  };
1098
  return entry;
71,670✔
1099
}
1100

1101
static int32_t taskPauseCallback(SStreamTask* pTask, void* param) {
1,419✔
1102
  SStreamMeta* pMeta = pTask->pMeta;
1,419✔
1103
  int32_t      code = 0;
1,419✔
1104

1105
  int32_t num = atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1);
1,419✔
1106
  stInfo("vgId:%d s-task:%s pause stream task. paused task num:%d", pMeta->vgId, pTask->id.idStr, num);
1,416✔
1107

1108
  // in case of fill-history task, stop the tsdb file scan operation.
1109
  if (pTask->info.fillHistory == 1) {
1,421✔
1110
    void* pExecutor = pTask->exec.pExecutor;
60✔
1111
    code = qKillTask(pExecutor, TSDB_CODE_SUCCESS, 10000);
60✔
1112
  }
1113

1114
  stDebug("vgId:%d s-task:%s set pause flag and pause task", pMeta->vgId, pTask->id.idStr);
1,421✔
1115
  return code;
1,420✔
1116
}
1117

1118
void streamTaskPause(SStreamTask* pTask) {
1,471✔
1119
  int32_t code = streamTaskHandleEventAsync(pTask->status.pSM, TASK_EVENT_PAUSE, taskPauseCallback, NULL);
1,471✔
1120
  if (code) {
1,473!
UNCOV
1121
    stError("s-task:%s failed handle pause event async, code:%s", pTask->id.idStr, tstrerror(code));
×
1122
  }
1123
}
1,473✔
1124

1125
void streamTaskResume(SStreamTask* pTask) {
2,675✔
1126
  SStreamTaskState prevState = streamTaskGetStatus(pTask);
2,675✔
1127

1128
  SStreamMeta* pMeta = pTask->pMeta;
2,676✔
1129
  int32_t      code = streamTaskRestoreStatus(pTask);
2,676✔
1130
  if (code == TSDB_CODE_SUCCESS) {
2,682✔
1131
    char*   pNew = streamTaskGetStatus(pTask).name;
1,401✔
1132
    int32_t num = atomic_sub_fetch_32(&pMeta->numOfPausedTasks, 1);
1,401✔
1133
    stInfo("s-task:%s status:%s resume from %s, paused task(s):%d", pTask->id.idStr, pNew, prevState.name, num);
1,403!
1134
  } else {
1135
    stInfo("s-task:%s status:%s no need to resume, paused task(s):%d", pTask->id.idStr, prevState.name,
1,281✔
1136
           pMeta->numOfPausedTasks);
1137
  }
1138
}
2,687✔
1139

1140
bool streamTaskIsSinkTask(const SStreamTask* pTask) { return pTask->info.taskLevel == TASK_LEVEL__SINK; }
74,910✔
1141

1142
// this task must success
1143
int32_t streamTaskSendCheckpointReq(SStreamTask* pTask) {
4,403✔
1144
  int32_t     code;
1145
  int32_t     tlen = 0;
4,403✔
1146
  int32_t     vgId = pTask->pMeta->vgId;
4,403✔
1147
  const char* id = pTask->id.idStr;
4,403✔
1148

1149
  SStreamTaskCheckpointReq req = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId, .nodeId = vgId};
4,403✔
1150
  tEncodeSize(tEncodeStreamTaskCheckpointReq, &req, tlen, code);
4,403!
1151
  if (code < 0) {
4,404!
UNCOV
1152
    stError("s-task:%s vgId:%d encode stream task req checkpoint failed, code:%s", id, vgId, tstrerror(code));
×
UNCOV
1153
    return TSDB_CODE_INVALID_MSG;
×
1154
  }
1155

1156
  void* buf = rpcMallocCont(tlen);
4,404✔
1157
  if (buf == NULL) {
4,408!
1158
    stError("s-task:%s vgId:%d encode stream task req checkpoint msg failed, code:Out of memory", id, vgId);
×
UNCOV
1159
    return terrno;
×
1160
  }
1161

1162
  SEncoder encoder;
1163
  tEncoderInit(&encoder, buf, tlen);
4,408✔
1164
  if ((code = tEncodeStreamTaskCheckpointReq(&encoder, &req)) < 0) {
4,409!
UNCOV
1165
    rpcFreeCont(buf);
×
UNCOV
1166
    tEncoderClear(&encoder);
×
1167
    stError("s-task:%s vgId:%d encode stream task req checkpoint msg failed, code:%s", id, vgId, tstrerror(code));
×
UNCOV
1168
    return code;
×
1169
  }
1170

1171
  tEncoderClear(&encoder);
4,406✔
1172

1173
  SRpcMsg msg = {0};
4,411✔
1174
  initRpcMsg(&msg, TDMT_MND_STREAM_REQ_CHKPT, buf, tlen);
4,411✔
1175
  stDebug("s-task:%s vgId:%d build and send task checkpoint req", id, vgId);
4,405✔
1176

1177
  return tmsgSendReq(&pTask->info.mnodeEpset, &msg);
4,405✔
1178
}
1179

1180
void streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId, SStreamUpstreamEpInfo** pEpInfo) {
100,605✔
1181
  *pEpInfo = NULL;
100,605✔
1182

1183
  int32_t num = taosArrayGetSize(pTask->upstreamInfo.pList);
100,605✔
1184
  for (int32_t i = 0; i < num; ++i) {
202,142!
1185
    SStreamUpstreamEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i);
202,144✔
1186
    if (pInfo == NULL) {
202,143!
UNCOV
1187
      return;
×
1188
    }
1189

1190
    if (pInfo->taskId == taskId) {
202,143✔
1191
      *pEpInfo = pInfo;
100,605✔
1192
      return;
100,605✔
1193
    }
1194
  }
1195

1196
  stError("s-task:%s failed to find upstream task:0x%x", pTask->id.idStr, taskId);
×
1197
}
1198

UNCOV
1199
SEpSet* streamTaskGetDownstreamEpInfo(SStreamTask* pTask, int32_t taskId) {
×
UNCOV
1200
  if (pTask->info.taskLevel == TASK_OUTPUT__FIXED_DISPATCH) {
×
UNCOV
1201
    if (pTask->outputInfo.fixedDispatcher.taskId == taskId) {
×
1202
      return &pTask->outputInfo.fixedDispatcher.epSet;
×
1203
    }
UNCOV
1204
  } else if (pTask->info.taskLevel == TASK_OUTPUT__SHUFFLE_DISPATCH) {
×
UNCOV
1205
    SArray* pList = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
×
UNCOV
1206
    for (int32_t i = 0; i < taosArrayGetSize(pList); ++i) {
×
UNCOV
1207
      SVgroupInfo* pVgInfo = taosArrayGet(pList, i);
×
UNCOV
1208
      if (pVgInfo == NULL) {
×
UNCOV
1209
        continue;
×
1210
      }
1211

UNCOV
1212
      if (pVgInfo->taskId == taskId) {
×
UNCOV
1213
        return &pVgInfo->epSet;
×
1214
      }
1215
    }
NEW
1216
  } else if (pTask->info.taskLevel == TASK_OUTPUT__VTABLE_MAP) {
×
NEW
1217
    SArray* pTaskInfos = pTask->outputInfo.vtableMapDispatcher.taskInfos;
×
NEW
1218
    for (int32_t i = 0; i < taosArrayGetSize(pTaskInfos); ++i) {
×
NEW
1219
      STaskDispatcherFixed* pAddr = taosArrayGet(pTaskInfos, i);
×
NEW
1220
      if (pAddr == NULL) {
×
NEW
1221
        continue;
×
1222
      }
1223

NEW
1224
      if (pAddr->taskId == taskId) {
×
NEW
1225
        return &pAddr->epSet;
×
1226
      }
1227
    }
1228
  }
1229

UNCOV
1230
  return NULL;
×
1231
}
1232

1233
int32_t createStreamTaskIdStr(int64_t streamId, int32_t taskId, const char** pId) {
14,700✔
1234
  char    buf[128] = {0};
14,700✔
1235
  int32_t code = snprintf(buf, tListLen(buf), "0x%" PRIx64 "-0x%x", streamId, taskId);
14,700✔
1236
  if (code < 0 || code >= tListLen(buf)) {
14,700!
1237
    return TSDB_CODE_OUT_OF_BUFFER;
×
1238
  }
1239

1240
  *pId = taosStrdup(buf);
14,706!
1241

1242
  if (*pId == NULL) {
14,664!
UNCOV
1243
    return terrno;
×
1244
  } else {
1245
    return TSDB_CODE_SUCCESS;
14,664✔
1246
  }
1247
}
1248

1249
static int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq) {
528✔
1250
  int32_t           code;
1251
  SStreamDataBlock* pData;
1252

1253
  code = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SStreamDataBlock), (void**)&pData);
528✔
1254
  if (code) {
528!
UNCOV
1255
    stError("s-task:%s failed to allocated retrieve-block", pTask->id.idStr);
×
UNCOV
1256
    return terrno = code;
×
1257
  }
1258

1259
  pData->type = STREAM_INPUT__DATA_RETRIEVE;
528✔
1260
  pData->srcVgId = 0;
528✔
1261

1262
  code = streamRetrieveReqToData(pReq, pData, pTask->id.idStr);
528✔
1263
  if (code != TSDB_CODE_SUCCESS) {
528!
1264
    stError("s-task:%s failed to convert retrieve-data to block, code:%s", pTask->id.idStr, tstrerror(code));
×
1265
    taosFreeQitem(pData);
×
UNCOV
1266
    return code;
×
1267
  }
1268

1269
  code = streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pData);
528✔
1270
  if (code != TSDB_CODE_SUCCESS) {
528!
UNCOV
1271
    stError("s-task:%s failed to put retrieve-block into inputQ, inputQ is full, discard the retrieve msg",
×
1272
            pTask->id.idStr);
1273
  }
1274

1275
  return code;
528✔
1276
}
1277

1278
int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq) {
528✔
1279
  int32_t code = streamTaskEnqueueRetrieve(pTask, pReq);
528✔
1280
  if (code != 0) {
528!
UNCOV
1281
    return code;
×
1282
  }
1283
  return streamTrySchedExec(pTask, false);
528✔
1284
}
1285

1286
void streamTaskSetRemoveBackendFiles(SStreamTask* pTask) { pTask->status.removeBackendFiles = true; }
6,995✔
1287

UNCOV
1288
void streamTaskGetActiveCheckpointInfo(const SStreamTask* pTask, int32_t* pTransId, int64_t* pCheckpointId) {
×
1289
  if (pTransId != NULL) {
×
UNCOV
1290
    *pTransId = pTask->chkInfo.pActiveInfo->transId;
×
1291
  }
1292

UNCOV
1293
  if (pCheckpointId != NULL) {
×
UNCOV
1294
    *pCheckpointId = pTask->chkInfo.pActiveInfo->activeId;
×
1295
  }
UNCOV
1296
}
×
1297

1298
int32_t streamTaskSetActiveCheckpointInfo(SStreamTask* pTask, int64_t activeCheckpointId) {
28✔
1299
  pTask->chkInfo.pActiveInfo->activeId = activeCheckpointId;
28✔
1300
  return TSDB_CODE_SUCCESS;
28✔
1301
}
1302

UNCOV
1303
void streamTaskSetFailedChkptInfo(SStreamTask* pTask, int32_t transId, int64_t checkpointId) {
×
UNCOV
1304
  pTask->chkInfo.pActiveInfo->transId = transId;
×
UNCOV
1305
  pTask->chkInfo.pActiveInfo->activeId = checkpointId;
×
UNCOV
1306
  pTask->chkInfo.pActiveInfo->failedId = checkpointId;
×
UNCOV
1307
  stDebug("s-task:%s set failed checkpointId:%" PRId64, pTask->id.idStr, checkpointId);
×
UNCOV
1308
}
×
1309

1310
int32_t streamTaskCreateActiveChkptInfo(SActiveCheckpointInfo** pRes) {
14,743✔
1311
  SActiveCheckpointInfo* pInfo = taosMemoryCalloc(1, sizeof(SActiveCheckpointInfo));
14,743!
1312
  if (pInfo == NULL) {
14,736!
UNCOV
1313
    return terrno;
×
1314
  }
1315

1316
  int32_t code = taosThreadMutexInit(&pInfo->lock, NULL);
14,736✔
1317
  if (code != TSDB_CODE_SUCCESS) {
14,747!
UNCOV
1318
    return code;
×
1319
  }
1320

1321
  pInfo->pDispatchTriggerList = taosArrayInit(4, sizeof(STaskTriggerSendInfo));
14,747✔
1322
  pInfo->pReadyMsgList = taosArrayInit(4, sizeof(STaskCheckpointReadyInfo));
14,752✔
1323
  pInfo->pCheckpointReadyRecvList = taosArrayInit(4, sizeof(STaskDownstreamReadyInfo));
14,755✔
1324

1325
  *pRes = pInfo;
14,754✔
1326
  return code;
14,754✔
1327
}
1328

1329
void streamTaskDestroyActiveChkptInfo(SActiveCheckpointInfo* pInfo) {
63,788✔
1330
  if (pInfo == NULL) {
63,788✔
1331
    return;
49,031✔
1332
  }
1333

1334
  streamMutexDestroy(&pInfo->lock);
14,757✔
1335
  taosArrayDestroy(pInfo->pDispatchTriggerList);
14,756✔
1336
  pInfo->pDispatchTriggerList = NULL;
14,762✔
1337
  taosArrayDestroy(pInfo->pReadyMsgList);
14,762✔
1338
  pInfo->pReadyMsgList = NULL;
14,761✔
1339
  taosArrayDestroy(pInfo->pCheckpointReadyRecvList);
14,761✔
1340
  pInfo->pCheckpointReadyRecvList = NULL;
14,760✔
1341

1342
  SStreamTmrInfo* pTriggerTmr = &pInfo->chkptTriggerMsgTmr;
14,760✔
1343
  if (pTriggerTmr->tmrHandle != NULL) {
14,760✔
1344
    streamTmrStop(pTriggerTmr->tmrHandle);
2,235✔
1345
    pTriggerTmr->tmrHandle = NULL;
2,235✔
1346
  }
1347

1348
  SStreamTmrInfo* pReadyTmr = &pInfo->chkptReadyMsgTmr;
14,760✔
1349
  if (pReadyTmr->tmrHandle != NULL) {
14,760✔
1350
    streamTmrStop(pReadyTmr->tmrHandle);
2,197✔
1351
    pReadyTmr->tmrHandle = NULL;
2,197✔
1352
  }
1353

1354
  taosMemoryFree(pInfo);
14,760!
1355
}
1356

1357
// NOTE: clear the checkpoint id, and keep the failed id
1358
// failedId for a task will increase as the checkpoint I.D. increases.
1359
void streamTaskClearActiveInfo(SActiveCheckpointInfo* pInfo) {
5,726✔
1360
  pInfo->activeId = 0;
5,726✔
1361
  pInfo->transId = 0;
5,726✔
1362
  pInfo->allUpstreamTriggerRecv = 0;
5,726✔
1363
  pInfo->dispatchTrigger = false;
5,726✔
1364

1365
  taosArrayClear(pInfo->pDispatchTriggerList);
5,726✔
1366
  taosArrayClear(pInfo->pCheckpointReadyRecvList);
5,702✔
1367
}
5,701✔
1368

1369
const char* streamTaskGetExecType(int32_t type) {
132,708✔
1370
  switch (type) {
132,708!
1371
    case STREAM_EXEC_T_EXTRACT_WAL_DATA:
71,949✔
1372
      return "scan-wal-file";
71,949✔
1373
    case STREAM_EXEC_T_START_ALL_TASKS:
7,703✔
1374
      return "start-all-tasks";
7,703✔
1375
    case STREAM_EXEC_T_START_ONE_TASK:
5,917✔
1376
      return "start-one-task";
5,917✔
1377
    case STREAM_EXEC_T_RESTART_ALL_TASKS:
28✔
1378
      return "restart-all-tasks";
28✔
1379
    case STREAM_EXEC_T_STOP_ALL_TASKS:
4,584✔
1380
      return "stop-all-tasks";
4,584✔
1381
    case STREAM_EXEC_T_RESUME_TASK:
6,276✔
1382
      return "resume-task-from-idle";
6,276✔
1383
    case STREAM_EXEC_T_ADD_FAILED_TASK:
1✔
1384
      return "record-start-failed-task";
1✔
UNCOV
1385
    case STREAM_EXEC_T_STOP_ONE_TASK:
×
UNCOV
1386
      return "stop-one-task";
×
1387
    case 0:
36,361✔
1388
      return "exec-all-tasks";
36,361✔
UNCOV
1389
    default:
×
UNCOV
1390
      return "invalid-exec-type";
×
1391
  }
1392
}
1393

1394
int32_t streamTaskAllocRefId(SStreamTask* pTask, int64_t** pRefId) {
38,813✔
1395
  *pRefId = taosMemoryMalloc(sizeof(int64_t));
38,813!
1396
  if (*pRefId != NULL) {
38,814!
1397
    **pRefId = pTask->id.refId;
38,814✔
1398
    int32_t code = metaRefMgtAdd(pTask->pMeta->vgId, *pRefId);
38,814✔
1399
    if (code != 0) {
38,817!
1400
      stError("s-task:%s failed to add refId:%" PRId64 " into refId-mgmt, code:%s", pTask->id.idStr, pTask->id.refId,
×
1401
              tstrerror(code));
1402
    }
1403
    return code;
38,817✔
1404
  } else {
1405
    stError("s-task:%s failed to alloc new ref id, code:%s", pTask->id.idStr, tstrerror(terrno));
×
1406
    return terrno;
×
1407
  }
1408
}
1409

1410
void streamTaskFreeRefId(int64_t* pRefId) {
36,312✔
1411
  if (pRefId == NULL) {
36,312✔
1412
    return;
2,395✔
1413
  }
1414

1415
  metaRefMgtRemove(pRefId);
33,917✔
1416
}
1417

1418
static int32_t tEncodeStreamNotifyInfo(SEncoder* pEncoder, const SNotifyInfo* info) {
143,798✔
1419
  int32_t code = TSDB_CODE_SUCCESS;
143,798✔
1420
  int32_t lino = 0;
143,798✔
1421

1422
  QUERY_CHECK_NULL(pEncoder, code, lino, _exit, TSDB_CODE_INVALID_PARA);
143,798!
1423
  QUERY_CHECK_NULL(info, code, lino, _exit, TSDB_CODE_INVALID_PARA);
143,798!
1424

1425
  int32_t addrSize = taosArrayGetSize(info->pNotifyAddrUrls);
143,798✔
1426
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, addrSize));
143,799!
1427
  for (int32_t i = 0; i < addrSize; ++i) {
143,799!
UNCOV
1428
    const char* url = taosArrayGetP(info->pNotifyAddrUrls, i);
×
1429
    TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, url));
×
1430
  }
1431
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, info->notifyEventTypes));
287,598!
1432
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, info->notifyErrorHandle));
287,598!
1433
  if (addrSize > 0) {
143,799!
1434
    TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, info->streamName));
×
1435
    TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, info->stbFullName));
×
UNCOV
1436
    TAOS_CHECK_EXIT(tEncodeSSchemaWrapper(pEncoder, info->pSchemaWrapper));
×
1437
  }
1438

1439
_exit:
143,799✔
1440
  if (code != TSDB_CODE_SUCCESS) {
143,799!
1441
    stError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1442
  }
1443
  return code;
143,804✔
1444
}
1445

1446
static int32_t tDecodeStreamNotifyInfo(SDecoder* pDecoder, SNotifyInfo* info) {
49,689✔
1447
  int32_t code = TSDB_CODE_SUCCESS;
49,689✔
1448
  int32_t lino = 0;
49,689✔
1449

1450
  QUERY_CHECK_NULL(pDecoder, code, lino, _exit, TSDB_CODE_INVALID_PARA);
49,689!
1451
  QUERY_CHECK_NULL(info, code, lino, _exit, TSDB_CODE_INVALID_PARA);
49,689!
1452

1453
  int32_t addrSize = 0;
49,689✔
1454
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &addrSize));
49,678!
1455
  info->pNotifyAddrUrls = taosArrayInit(addrSize, POINTER_BYTES);
49,678✔
1456
  QUERY_CHECK_NULL(info->pNotifyAddrUrls, code, lino, _exit, terrno);
49,701✔
1457
  for (int32_t i = 0; i < addrSize; ++i) {
49,700!
UNCOV
1458
    char* url = NULL;
×
UNCOV
1459
    TAOS_CHECK_EXIT(tDecodeCStr(pDecoder, &url));
×
UNCOV
1460
    url = taosStrndup(url, TSDB_STREAM_NOTIFY_URL_LEN);
×
UNCOV
1461
    QUERY_CHECK_NULL(url, code, lino, _exit, terrno);
×
UNCOV
1462
    if (taosArrayPush(info->pNotifyAddrUrls, &url) == NULL) {
×
UNCOV
1463
      taosMemoryFree(url);
×
UNCOV
1464
      TAOS_CHECK_EXIT(terrno);
×
1465
    }
1466
  }
1467
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &info->notifyEventTypes));
99,389!
1468
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &info->notifyErrorHandle));
99,379!
1469
  if (addrSize > 0) {
49,690!
UNCOV
1470
    char* name = NULL;
×
UNCOV
1471
    TAOS_CHECK_EXIT(tDecodeCStr(pDecoder, &name));
×
UNCOV
1472
    info->streamName = taosStrndup(name, TSDB_STREAM_FNAME_LEN + 1);
×
UNCOV
1473
    QUERY_CHECK_NULL(info->streamName, code, lino, _exit, terrno);
×
UNCOV
1474
    TAOS_CHECK_EXIT(tDecodeCStr(pDecoder, &name));
×
UNCOV
1475
    info->stbFullName = taosStrndup(name, TSDB_STREAM_FNAME_LEN + 1);
×
UNCOV
1476
    QUERY_CHECK_NULL(info->stbFullName, code, lino, _exit, terrno);
×
UNCOV
1477
    info->pSchemaWrapper = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
×
UNCOV
1478
    if (info->pSchemaWrapper == NULL) {
×
UNCOV
1479
      TAOS_CHECK_EXIT(terrno);
×
1480
    }
UNCOV
1481
    TAOS_CHECK_EXIT(tDecodeSSchemaWrapper(pDecoder, info->pSchemaWrapper));
×
1482
  }
1483

1484
_exit:
49,690✔
1485
  if (code != TSDB_CODE_SUCCESS) {
49,690!
UNCOV
1486
    stError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1487
  }
1488
  return code;
49,687✔
1489
}
1490

1491
int32_t tEncodeVTablesInfo(SEncoder* pEncoder, SArray* pVTables) {
143,801✔
1492
  int32_t code = TSDB_CODE_SUCCESS;
143,801✔
1493
  int32_t lino = 0;
143,801✔
1494
  SVCTableMergeInfo* pMergeInfo = NULL;
143,801✔
1495
  int32_t mergeNum = taosArrayGetSize(pVTables);
143,801✔
1496
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, mergeNum));
143,807!
1497
  for (int32_t i = 0; i < mergeNum; ++i) {
143,807!
NEW
1498
    pMergeInfo = (SVCTableMergeInfo*)taosArrayGet(pVTables, i);
×
NEW
1499
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMergeInfo->uid));
×
NEW
1500
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMergeInfo->numOfSrcTbls));
×
1501
  }
1502

1503
_exit:
143,807✔
1504
  if (code != TSDB_CODE_SUCCESS) {
143,807!
NEW
1505
    stError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1506
  }
1507
  return code;
143,800✔
1508
}
1509

1510
static int32_t tDecodeVTablesInfo(SDecoder* pDecoder, SArray** pTables) {
49,689✔
1511
  int32_t code = TSDB_CODE_SUCCESS;
49,689✔
1512
  int32_t lino = 0;
49,689✔
1513

1514
  int32_t mergeNum = 0;
49,689✔
1515
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &mergeNum));
49,683!
1516
  if (mergeNum > 0) {
49,683!
NEW
1517
    *pTables = taosArrayInit(mergeNum, sizeof(SVCTableMergeInfo));
×
NEW
1518
    QUERY_CHECK_NULL(*pTables, code, lino, _exit, terrno);
×
1519
  }
1520

1521
  SVCTableMergeInfo mergeInfo;
1522
  for (int32_t i = 0; i < mergeNum; ++i) {
49,680!
NEW
1523
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &mergeInfo.uid));
×
NEW
1524
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &mergeInfo.numOfSrcTbls));
×
NEW
1525
    if (taosArrayPush(*pTables, &mergeInfo) == NULL) {
×
NEW
1526
      TAOS_CHECK_EXIT(terrno);
×
1527
    }
1528
  }
1529

1530
_exit:
49,680✔
1531
  if (code != TSDB_CODE_SUCCESS) {
49,680!
NEW
1532
    stError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1533
  }
1534
  return code;
49,677✔
1535
}
1536

NEW
1537
int32_t tSerializeDispatcherTaskInfo(SEncoder* pEncoder, const SArray* pTaskInfos) {
×
NEW
1538
  int32_t code = TSDB_CODE_SUCCESS;
×
NEW
1539
  int32_t lino = 0;
×
NEW
1540
  int32_t nTasks = taosArrayGetSize(pTaskInfos);
×
NEW
1541
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, nTasks));
×
NEW
1542
  for (int32_t i = 0; i < nTasks; ++i) {
×
NEW
1543
    STaskDispatcherFixed* pAddr = taosArrayGet(pTaskInfos, i);
×
NEW
1544
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pAddr->taskId));
×
NEW
1545
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pAddr->nodeId));
×
NEW
1546
    TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pAddr->epSet));
×
1547
  }
1548

NEW
1549
_exit:
×
NEW
1550
  if (code != TSDB_CODE_SUCCESS) {
×
NEW
1551
    stError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1552
  }
NEW
1553
  return code;
×
1554
}
1555

NEW
1556
int32_t tSerializeDispatcherVtableMap(SEncoder* pEncoder, const SSHashObj* pVtables) {
×
NEW
1557
  int32_t code = TSDB_CODE_SUCCESS;
×
NEW
1558
  int32_t lino = 0;
×
NEW
1559
  int32_t tbNum = tSimpleHashGetSize(pVtables);
×
NEW
1560
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, tbNum));
×
NEW
1561
  int32_t iter = 0;
×
NEW
1562
  void*   p = tSimpleHashIterate(pVtables, NULL, &iter);
×
NEW
1563
  while (p != NULL) {
×
NEW
1564
    int64_t* pUid = tSimpleHashGetKey(p, NULL);
×
NEW
1565
    int32_t* pIdx = p;
×
NEW
1566
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, *pUid));
×
NEW
1567
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, *pIdx));
×
NEW
1568
    p = tSimpleHashIterate(pVtables, p, &iter);
×
1569
  }
1570

NEW
1571
_exit:
×
NEW
1572
  if (code != TSDB_CODE_SUCCESS) {
×
NEW
1573
    stError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1574
  }
NEW
1575
  return code;
×
1576
}
1577

NEW
1578
int32_t tDeserializeDispatcherTaskInfo(SDecoder* pDecoder, SArray** ppTaskInfos) {
×
NEW
1579
  int32_t code = TSDB_CODE_SUCCESS;
×
NEW
1580
  int32_t lino = 0;
×
NEW
1581
  int32_t nTasks = 0;
×
NEW
1582
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nTasks));
×
NEW
1583
  if (nTasks <= 0) {
×
NEW
1584
    return code;
×
1585
  }
1586

NEW
1587
  *ppTaskInfos = taosArrayInit(nTasks, sizeof(STaskDispatcherFixed));
×
NEW
1588
  QUERY_CHECK_NULL(*ppTaskInfos, code, lino, _exit, terrno);
×
1589

1590
  STaskDispatcherFixed addr;
NEW
1591
  for (int32_t i = 0; i < nTasks; ++i) {
×
NEW
1592
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &addr.taskId));
×
NEW
1593
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &addr.nodeId));
×
NEW
1594
    TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &addr.epSet));
×
NEW
1595
    void* px = taosArrayPush(*ppTaskInfos, &addr);
×
NEW
1596
    QUERY_CHECK_NULL(px, code, lino, _exit, terrno);
×
1597
  }
1598

NEW
1599
_exit:
×
NEW
1600
  if (code != TSDB_CODE_SUCCESS) {
×
NEW
1601
    stError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1602
  }
NEW
1603
  return code;
×
1604
}
1605

NEW
1606
int32_t tDeserializeDispatcherVtableMap(SDecoder* pDecoder, SSHashObj** ppVtables) {
×
NEW
1607
  int32_t code = TSDB_CODE_SUCCESS;
×
NEW
1608
  int32_t lino = 0;
×
NEW
1609
  int32_t tbNum = 0;
×
NEW
1610
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &tbNum));
×
NEW
1611
  if (tbNum <= 0) {
×
NEW
1612
    return code;
×
1613
  }
1614

NEW
1615
  *ppVtables = tSimpleHashInit(tbNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
×
NEW
1616
  QUERY_CHECK_NULL(*ppVtables, code, lino, _exit, terrno);
×
1617

NEW
1618
  uint64_t uid = 0;
×
NEW
1619
  int32_t  idx = 0;
×
NEW
1620
  for (int32_t i = 0; i < tbNum; ++i) {
×
NEW
1621
    TAOS_CHECK_EXIT(tDecodeU64(pDecoder, &uid));
×
NEW
1622
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &idx));
×
NEW
1623
    TAOS_CHECK_EXIT(tSimpleHashPut(*ppVtables, &uid, sizeof(uid), &idx, sizeof(idx)));
×
1624
  }
1625

NEW
1626
_exit:
×
1627

NEW
1628
  if (code != TSDB_CODE_SUCCESS) {
×
NEW
1629
    stError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1630
  }
NEW
1631
  return code;
×
1632
}
1633

1634
int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
143,857✔
1635
  int32_t code = 0;
143,857✔
1636
  int32_t lino;
1637

1638
  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
143,857!
1639
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->ver));
287,716!
1640
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->id.streamId));
287,716!
1641
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->id.taskId));
287,716!
1642
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->info.trigger));
287,716!
1643
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->info.taskLevel));
287,716!
1644
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->outputInfo.type));
287,716!
1645
  TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pTask->msgInfo.msgType));
287,716!
1646

1647
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->status.taskStatus));
287,716!
1648
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->status.schedStatus));
287,716!
1649

1650
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->info.selfChildId));
287,716!
1651
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->info.nodeId));
287,716!
1652
  TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pTask->info.epSet));
143,858!
1653
  TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pTask->info.mnodeEpset));
143,861!
1654

1655
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->chkInfo.checkpointId));
287,732!
1656
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->chkInfo.checkpointVer));
287,732!
1657
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->info.fillHistory));
287,732!
1658

1659
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->hTaskInfo.id.streamId));
287,732!
1660
  int32_t taskId = pTask->hTaskInfo.id.taskId;
143,866✔
1661
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, taskId));
143,866!
1662

1663
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->streamTaskId.streamId));
287,732!
1664
  taskId = pTask->streamTaskId.taskId;
143,866✔
1665
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, taskId));
143,866!
1666

1667
  TAOS_CHECK_EXIT(tEncodeU64(pEncoder, pTask->dataRange.range.minVer));
287,732!
1668
  TAOS_CHECK_EXIT(tEncodeU64(pEncoder, pTask->dataRange.range.maxVer));
287,732!
1669
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->dataRange.window.skey));
287,732!
1670
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->dataRange.window.ekey));
287,732!
1671

1672
  int32_t epSz = taosArrayGetSize(pTask->upstreamInfo.pList);
143,866✔
1673
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, epSz));
143,847!
1674
  for (int32_t i = 0; i < epSz; i++) {
341,224✔
1675
    SStreamUpstreamEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i);
197,364✔
1676
    TAOS_CHECK_EXIT(tEncodeStreamEpInfo(pEncoder, pInfo));
197,356!
1677
  }
1678

1679
  if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
143,860✔
1680
    TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pTask->exec.qmsg));
152,482!
1681
  }
1682

1683
  if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) {
143,860✔
1684
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->outputInfo.tbSink.stbUid));
142,736!
1685
    TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pTask->outputInfo.tbSink.stbFullName));
142,736!
1686
    TAOS_CHECK_EXIT(tEncodeSSchemaWrapper(pEncoder, pTask->outputInfo.tbSink.pSchemaWrapper));
142,736!
1687
  } else if (pTask->outputInfo.type == TASK_OUTPUT__SMA) {
72,492!
UNCOV
1688
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->outputInfo.smaSink.smaId));
×
1689
  } else if (pTask->outputInfo.type == TASK_OUTPUT__FETCH) {
72,492!
UNCOV
1690
    TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->outputInfo.fetchSink.reserved));
×
1691
  } else if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
72,492✔
1692
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->outputInfo.fixedDispatcher.taskId));
21,976!
1693
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->outputInfo.fixedDispatcher.nodeId));
21,976!
1694
    TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pTask->outputInfo.fixedDispatcher.epSet));
10,988!
1695
  } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
61,504✔
1696
    TAOS_CHECK_EXIT(tSerializeSUseDbRspImp(pEncoder, &pTask->outputInfo.shuffleDispatcher.dbInfo));
61,449!
1697
    TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pTask->outputInfo.shuffleDispatcher.stbFullName));
122,898!
1698
  } else if (pTask->outputInfo.type == TASK_OUTPUT__VTABLE_MAP) {
55!
NEW
1699
    TAOS_CHECK_EXIT(tSerializeDispatcherTaskInfo(pEncoder, pTask->outputInfo.vtableMapDispatcher.taskInfos));
×
NEW
1700
    TAOS_CHECK_EXIT(tSerializeDispatcherVtableMap(pEncoder, pTask->outputInfo.vtableMapDispatcher.vtableMap));
×
1701
  }
1702
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->info.delaySchedParam));
287,718!
1703
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->subtableWithoutMd5));
287,718!
1704
  TAOS_CHECK_EXIT(tEncodeCStrWithLen(pEncoder, pTask->reserve, sizeof(pTask->reserve) - 1));
287,718!
1705

1706
  if (pTask->ver >= SSTREAM_TASK_ADD_NOTIFY_VER) {
143,859✔
1707
    TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->info.hasAggTasks));
287,636✔
1708
    TAOS_CHECK_EXIT(tEncodeStreamNotifyInfo(pEncoder, &pTask->notifyInfo));
143,796!
1709
    TAOS_CHECK_EXIT(tEncodeVTablesInfo(pEncoder, pTask->pVTables));
143,802!
1710
  }
1711

1712
  tEndEncode(pEncoder);
143,828✔
1713
_exit:
143,829✔
1714
  return code;
143,829✔
1715
}
1716

1717
int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
49,699✔
1718
  int32_t taskId = 0;
49,699✔
1719
  int32_t code = 0;
49,699✔
1720
  int32_t lino;
1721

1722
  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
49,699!
1723
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->ver));
99,414✔
1724
  if (pTask->ver <= SSTREAM_TASK_INCOMPATIBLE_VER || pTask->ver > SSTREAM_TASK_VER) {
49,696!
UNCOV
1725
    TAOS_CHECK_EXIT(TSDB_CODE_INVALID_MSG);
×
1726
  }
1727

1728
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->id.streamId));
99,387!
1729
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->id.taskId));
99,377!
1730
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->info.trigger));
99,371!
1731
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->info.taskLevel));
99,368!
1732
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->outputInfo.type));
99,365!
1733
  TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pTask->msgInfo.msgType));
99,371!
1734

1735
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->status.taskStatus));
99,375!
1736
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->status.schedStatus));
99,372!
1737

1738
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->info.selfChildId));
99,361!
1739
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->info.nodeId));
99,357!
1740
  TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pTask->info.epSet));
49,682!
1741
  TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pTask->info.mnodeEpset));
49,703!
1742

1743
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->chkInfo.checkpointId));
99,406!
1744
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->chkInfo.checkpointVer));
99,404!
1745
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->info.fillHistory));
99,402!
1746

1747
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->hTaskInfo.id.streamId));
99,400!
1748
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &taskId));
49,694!
1749
  pTask->hTaskInfo.id.taskId = taskId;
49,694✔
1750

1751
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->streamTaskId.streamId));
99,391!
1752
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &taskId));
49,697!
1753
  pTask->streamTaskId.taskId = taskId;
49,697✔
1754

1755
  TAOS_CHECK_EXIT(tDecodeU64(pDecoder, (uint64_t*)&pTask->dataRange.range.minVer));
99,394!
1756
  TAOS_CHECK_EXIT(tDecodeU64(pDecoder, (uint64_t*)&pTask->dataRange.range.maxVer));
99,388!
1757
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->dataRange.window.skey));
99,384!
1758
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->dataRange.window.ekey));
99,388!
1759

1760
  int32_t epSz = -1;
49,695✔
1761
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &epSz) < 0);
49,693!
1762

1763
  if ((pTask->upstreamInfo.pList = taosArrayInit(epSz, POINTER_BYTES)) == NULL) {
49,693!
UNCOV
1764
    TAOS_CHECK_EXIT(terrno);
×
1765
  }
1766
  for (int32_t i = 0; i < epSz; i++) {
117,986✔
1767
    SStreamUpstreamEpInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamUpstreamEpInfo));
68,283!
1768
    if (pInfo == NULL) {
68,261!
UNCOV
1769
      TAOS_CHECK_EXIT(terrno);
×
1770
    }
1771
    if ((code = tDecodeStreamEpInfo(pDecoder, pInfo)) < 0) {
68,261!
UNCOV
1772
      taosMemoryFreeClear(pInfo);
×
UNCOV
1773
      goto _exit;
×
1774
    }
1775
    if (taosArrayPush(pTask->upstreamInfo.pList, &pInfo) == NULL) {
136,596!
UNCOV
1776
      TAOS_CHECK_EXIT(terrno);
×
1777
    }
1778
  }
1779

1780
  if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
49,703✔
1781
    TAOS_CHECK_EXIT(tDecodeCStrAlloc(pDecoder, &pTask->exec.qmsg));
52,359!
1782
  }
1783

1784
  if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) {
49,702✔
1785
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->outputInfo.tbSink.stbUid));
49,266!
1786
    TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pTask->outputInfo.tbSink.stbFullName));
24,631!
1787
    pTask->outputInfo.tbSink.pSchemaWrapper = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
24,622!
1788
    if (pTask->outputInfo.tbSink.pSchemaWrapper == NULL) {
24,625!
UNCOV
1789
      TAOS_CHECK_EXIT(terrno);
×
1790
    }
1791
    TAOS_CHECK_EXIT(tDecodeSSchemaWrapper(pDecoder, pTask->outputInfo.tbSink.pSchemaWrapper));
49,216!
1792
  } else if (pTask->outputInfo.type == TASK_OUTPUT__SMA) {
25,067!
UNCOV
1793
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->outputInfo.smaSink.smaId));
×
1794
  } else if (pTask->outputInfo.type == TASK_OUTPUT__FETCH) {
25,067!
UNCOV
1795
    TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->outputInfo.fetchSink.reserved));
×
1796
  } else if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
25,067✔
1797
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->outputInfo.fixedDispatcher.taskId));
7,262!
1798
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->outputInfo.fixedDispatcher.nodeId));
7,262!
1799
    TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pTask->outputInfo.fixedDispatcher.epSet));
3,631!
1800
  } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
21,436✔
1801
    TAOS_CHECK_EXIT(tDeserializeSUseDbRspImp(pDecoder, &pTask->outputInfo.shuffleDispatcher.dbInfo));
21,427!
1802
    TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pTask->outputInfo.shuffleDispatcher.stbFullName));
21,427!
1803
  } else if (pTask->outputInfo.type == TASK_OUTPUT__VTABLE_MAP) {
9!
NEW
1804
    TAOS_CHECK_EXIT(tDeserializeDispatcherTaskInfo(pDecoder, &pTask->outputInfo.vtableMapDispatcher.taskInfos));
×
NEW
1805
    TAOS_CHECK_EXIT(tDeserializeDispatcherVtableMap(pDecoder, &pTask->outputInfo.vtableMapDispatcher.vtableMap));
×
1806
  }
1807
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->info.delaySchedParam));
99,350!
1808
  if (pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER) {
49,692!
1809
    TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->subtableWithoutMd5));
99,387!
1810
  }
1811
  TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pTask->reserve));
49,689!
1812

1813
  if (pTask->ver >= SSTREAM_TASK_ADD_NOTIFY_VER) {
49,688!
1814
    TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->info.hasAggTasks));
99,379!
1815
    TAOS_CHECK_EXIT(tDecodeStreamNotifyInfo(pDecoder, &pTask->notifyInfo));
49,690!
1816
    TAOS_CHECK_EXIT(tDecodeVTablesInfo(pDecoder, &pTask->pVTables));
49,687!
1817
  }
1818

1819
  tEndDecode(pDecoder);
49,671✔
1820

1821
_exit:
49,686✔
1822
  return code;
49,686✔
1823
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc