• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3849

18 Apr 2025 06:24AM UTC coverage: 62.513% (-0.1%) from 62.61%
#3849

push

travis-ci

GitHub
feat: taosBenchmark restore RESTFUL (3.0) (#30699)

154684 of 316122 branches covered (48.93%)

Branch coverage included in aggregate %.

240354 of 315808 relevant lines covered (76.11%)

10508512.55 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

59.77
/source/libs/stream/src/streamTask.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executor.h"
17
#include "osDir.h"
18
#include "osMemory.h"
19
#include "streamInt.h"
20
#include "streamMsg.h"
21
#include "streamsm.h"
22
#include "tmisce.h"
23
#include "tstream.h"
24
#include "ttimer.h"
25
#include "wal.h"
26

27
static void    streamTaskDestroyUpstreamInfo(SUpstreamInfo* pUpstreamInfo);
28
static int32_t streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdated);
29
static int32_t streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdate);
30
static void    streamTaskDestroyActiveChkptInfo(SActiveCheckpointInfo* pInfo);
31

32
static int32_t addToTaskset(SArray* pArray, SStreamTask* pTask) {
13,957✔
33
  int32_t childId = taosArrayGetSize(pArray);
13,957✔
34
  pTask->info.selfChildId = childId;
13,957✔
35
  void* p = taosArrayPush(pArray, &pTask);
13,957✔
36
  return (p == NULL) ? terrno : TSDB_CODE_SUCCESS;
13,957!
37
}
38

39
static int32_t doUpdateTaskEpset(SStreamTask* pTask, int32_t nodeId, SEpSet* pEpSet, bool* pUpdated) {
911✔
40
  int32_t code = 0;
911✔
41
  char    buf[512] = {0};
911✔
42

43
  if (pTask->info.nodeId == nodeId) {  // execution task should be moved away
911✔
44
    bool isEqual = isEpsetEqual(&pTask->info.epSet, pEpSet);
370✔
45
    code = epsetToStr(pEpSet, buf, tListLen(buf));
370✔
46
    if (code) {  // print error and continue
370!
47
      stError("%s failed to convert epset to str, code:%s", pTask->id.idStr, tstrerror(code));
×
48
      return code;
×
49
    }
50

51
    if (!isEqual) {
370✔
52
      (*pUpdated) = true;
60✔
53
      char tmp[512] = {0};
60✔
54
      code = epsetToStr(&pTask->info.epSet, tmp, tListLen(tmp));  // only for log file, ignore errors
60✔
55
      if (code) {                                                 // print error and continue
60!
56
        stError("%s failed to convert epset to str, code:%s", pTask->id.idStr, tstrerror(code));
×
57
        return code;
×
58
      }
59

60
      epsetAssign(&pTask->info.epSet, pEpSet);
60✔
61
      stDebug("s-task:0x%x (vgId:%d) self node epset is updated %s, old:%s", pTask->id.taskId, nodeId, buf, tmp);
60!
62
    } else {
63
      stDebug("s-task:0x%x (vgId:%d) not updated task epset, since epset identical, %s", pTask->id.taskId, nodeId, buf);
310!
64
    }
65
  }
66

67
  // check for the dispatch info and the upstream task info
68
  int32_t level = pTask->info.taskLevel;
911✔
69
  if (level == TASK_LEVEL__SOURCE) {
911✔
70
    code = streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet, pUpdated);
446✔
71
  } else if (level == TASK_LEVEL__AGG) {
465✔
72
    code = streamTaskUpdateUpstreamInfo(pTask, nodeId, pEpSet, pUpdated);
19✔
73
    code = streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet, pUpdated);
19✔
74
  } else {  // TASK_LEVEL__SINK
75
    code = streamTaskUpdateUpstreamInfo(pTask, nodeId, pEpSet, pUpdated);
446✔
76
  }
77

78
  return code;
911✔
79
}
80

81
static void freeItem(void* p) {
×
82
  SStreamContinueExecInfo* pInfo = p;
×
83
  rpcFreeCont(pInfo->msg.pCont);
×
84
}
×
85

86
static void freeUpstreamItem(void* p) {
81,346✔
87
  SStreamUpstreamEpInfo** pInfo = p;
81,346✔
88
  taosMemoryFree(*pInfo);
81,346!
89
}
81,350✔
90

91
static SStreamUpstreamEpInfo* createStreamTaskEpInfo(const SStreamTask* pTask) {
20,092✔
92
  SStreamUpstreamEpInfo* pEpInfo = taosMemoryMalloc(sizeof(SStreamUpstreamEpInfo));
20,092!
93
  if (pEpInfo == NULL) {
20,092!
94
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
95
    return NULL;
×
96
  }
97

98
  pEpInfo->childId = pTask->info.selfChildId;
20,092✔
99
  pEpInfo->epSet = pTask->info.epSet;
20,092✔
100
  pEpInfo->nodeId = pTask->info.nodeId;
20,092✔
101
  pEpInfo->taskId = pTask->id.taskId;
20,092✔
102
  pEpInfo->dataAllowed = true;
20,092✔
103
  pEpInfo->stage = -1;
20,092✔
104
  pEpInfo->lastMsgId = -1;
20,092✔
105

106
  return pEpInfo;
20,092✔
107
}
108

109
int32_t tNewStreamTask(int64_t streamId, int8_t taskLevel, SEpSet* pEpset, EStreamTaskType type, int32_t trigger,
13,957✔
110
                       int64_t triggerParam, SArray* pTaskList, bool hasFillhistory, int8_t subtableWithoutMd5,
111
                       int8_t hasAggTasks, SStreamTask** p) {
112
  *p = NULL;
13,957✔
113

114
  SStreamTask* pTask = (SStreamTask*)taosMemoryCalloc(1, sizeof(SStreamTask));
13,957!
115
  if (pTask == NULL) {
13,957!
116
    stError("s-task:0x%" PRIx64 " failed malloc new stream task, size:%d, code:%s", streamId,
×
117
            (int32_t)sizeof(SStreamTask), tstrerror(terrno));
118
    return terrno;
×
119
  }
120

121
  pTask->ver = SSTREAM_TASK_VER;
13,957✔
122
  pTask->id.taskId = tGenIdPI32();
13,957✔
123
  pTask->id.streamId = streamId;
13,957✔
124

125
  pTask->info.taskLevel = taskLevel;
13,957✔
126
  pTask->info.fillHistory = type;
13,957✔
127
  pTask->info.trigger = trigger;
13,957✔
128
  pTask->info.hasAggTasks = hasAggTasks;
13,957✔
129
  pTask->info.delaySchedParam = triggerParam;
13,957✔
130
  pTask->subtableWithoutMd5 = subtableWithoutMd5;
13,957✔
131

132
  int32_t code = streamCreateStateMachine(pTask);
13,957✔
133
  if (pTask->status.pSM == NULL || code != TSDB_CODE_SUCCESS) {
13,957!
134
    taosMemoryFreeClear(pTask);
×
135
    return code;
×
136
  }
137

138
  char    buf[128] = {0};
13,957✔
139
  int32_t ret = snprintf(buf, tListLen(buf), "0x%" PRIx64 "-0x%x", pTask->id.streamId, pTask->id.taskId);
13,957✔
140
  if (ret < 0 || ret >= tListLen(buf)) {
13,957!
141
    stError("s-task:0x%x failed to set the taskIdstr, code: out of buffer", pTask->id.taskId);
×
142
    return TSDB_CODE_OUT_OF_BUFFER;
×
143
  }
144

145
  pTask->id.idStr = taosStrdup(buf);
13,957!
146
  if (pTask->id.idStr == NULL) {
13,957!
147
    stError("s-task:0x%x failed to build task id, code: out of memory", pTask->id.taskId);
×
148
    return terrno;
×
149
  }
150

151
  pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
13,957✔
152
  pTask->status.taskStatus =
13,957✔
153
      (pTask->info.fillHistory == STREAM_HISTORY_TASK) ? TASK_STATUS__SCAN_HISTORY : TASK_STATUS__READY;
13,957✔
154
  pTask->inputq.status = TASK_INPUT_STATUS__NORMAL;
13,957✔
155
  pTask->outputq.status = TASK_OUTPUT_STATUS__NORMAL;
13,957✔
156

157
  pTask->taskCheckInfo.pList = taosArrayInit(4, sizeof(SDownstreamStatusInfo));
13,957✔
158

159
  if ((pTask->info.fillHistory == STREAM_HISTORY_TASK) && !hasFillhistory) {
13,957!
160
    stError("s-task:0x%x create task failed, due to inconsistent fill-history flag", pTask->id.taskId);
×
161
    return TSDB_CODE_INVALID_PARA;
×
162
  }
163

164
  epsetAssign(&(pTask->info.mnodeEpset), pEpset);
13,957✔
165

166
  code = addToTaskset(pTaskList, pTask);
13,957✔
167
  *p = pTask;
13,957✔
168

169
  return code;
13,957✔
170
}
171

172
int32_t tDecodeStreamTaskChkInfo(SDecoder* pDecoder, SCheckpointInfo* pChkpInfo) {
700✔
173
  int64_t skip64;
174
  int8_t  skip8;
175
  int32_t skip32;
176
  int16_t skip16;
177
  SEpSet  epSet;
178

179
  if (tStartDecode(pDecoder) < 0) return -1;
700!
180
  if (tDecodeI64(pDecoder, &pChkpInfo->msgVer) < 0) return -1;
1,403✔
181
  // if (ver <= SSTREAM_TASK_INCOMPATIBLE_VER) return -1;
182

183
  if (tDecodeI64(pDecoder, &skip64) < 0) return -1;
654!
184
  if (tDecodeI32(pDecoder, &skip32) < 0) return -1;
654!
185
  if (tDecodeI32(pDecoder, &skip32) < 0) return -1;
654!
186
  if (tDecodeI8(pDecoder, &skip8) < 0) return -1;
654!
187
  if (tDecodeI8(pDecoder, &skip8) < 0) return -1;
653!
188
  if (tDecodeI16(pDecoder, &skip16) < 0) return -1;
653!
189

190
  if (tDecodeI8(pDecoder, &skip8) < 0) return -1;
652!
191
  if (tDecodeI8(pDecoder, &skip8) < 0) return -1;
653!
192

193
  if (tDecodeI32(pDecoder, &skip32) < 0) return -1;
653!
194
  if (tDecodeI32(pDecoder, &skip32) < 0) return -1;
653!
195
  if (tDecodeSEpSet(pDecoder, &epSet) < 0) return -1;
653!
196
  if (tDecodeSEpSet(pDecoder, &epSet) < 0) return -1;
654!
197

198
  if (tDecodeI64(pDecoder, &pChkpInfo->checkpointId) < 0) return -1;
1,308!
199
  if (tDecodeI64(pDecoder, &pChkpInfo->checkpointVer) < 0) return -1;
1,308!
200

201
  tEndDecode(pDecoder);
654✔
202
  return 0;
654✔
203
}
204

205
int32_t tDecodeStreamTaskId(SDecoder* pDecoder, STaskId* pTaskId) {
12✔
206
  int64_t ver;
207
  if (tStartDecode(pDecoder) < 0) return -1;
12!
208
  if (tDecodeI64(pDecoder, &ver) < 0) return -1;
12!
209
  if (ver <= SSTREAM_TASK_INCOMPATIBLE_VER) return -1;
12!
210

211
  if (tDecodeI64(pDecoder, &pTaskId->streamId) < 0) return -1;
24!
212

213
  int32_t taskId = 0;
12✔
214
  if (tDecodeI32(pDecoder, &taskId) < 0) return -1;
12!
215

216
  pTaskId->taskId = taskId;
12✔
217
  tEndDecode(pDecoder);
12✔
218
  return 0;
12✔
219
}
220

221
void tFreeStreamTask(void* pParam) {
56,051✔
222
  char*        p = NULL;
56,051✔
223
  SStreamTask* pTask = pParam;
56,051✔
224
  int32_t      taskId = pTask->id.taskId;
56,051✔
225

226
  STaskExecStatisInfo* pStatis = &pTask->execInfo;
56,051✔
227

228
  ETaskStatus status1 = TASK_STATUS__UNINIT;
56,051✔
229
  if (pTask->status.pSM != NULL) {
56,051✔
230
    streamMutexLock(&pTask->lock);
28,681✔
231
    SStreamTaskState status = streamTaskGetStatus(pTask);
28,685✔
232
    p = status.name;
28,676✔
233
    status1 = status.state;
28,676✔
234
    streamMutexUnlock(&pTask->lock);
28,676✔
235
  }
236

237
  stDebug("start to free s-task:0x%x %p, state:%s, refId:%" PRId64, taskId, pTask, p, pTask->id.refId);
56,051✔
238

239
  SCheckpointInfo* pCkInfo = &pTask->chkInfo;
56,051✔
240
  stDebug("s-task:0x%x task exec summary: create:%" PRId64 ", init:%" PRId64 ", start:%" PRId64
56,051✔
241
          ", updateCount:%d latestUpdate:%" PRId64 ", latestCheckPoint:%" PRId64 ", ver:%" PRId64
242
          " nextProcessVer:%" PRId64 ", checkpointCount:%d",
243
          taskId, pStatis->created, pStatis->checkTs, pStatis->readyTs, pStatis->updateCount, pStatis->latestUpdateTs,
244
          pCkInfo->checkpointId, pCkInfo->checkpointVer, pCkInfo->nextProcessVer, pStatis->checkpoint);
245

246
  if (pTask->schedInfo.pDelayTimer != NULL) {
56,051✔
247
    streamTmrStop(pTask->schedInfo.pDelayTimer);
1,339✔
248
    pTask->schedInfo.pDelayTimer = NULL;
1,339✔
249
  }
250

251
  if (pTask->hTaskInfo.pTimer != NULL) {
56,051✔
252
    streamTmrStop(pTask->hTaskInfo.pTimer);
2,326✔
253
    pTask->hTaskInfo.pTimer = NULL;
2,325✔
254
  }
255

256
  if (pTask->msgInfo.pRetryTmr != NULL) {
56,050✔
257
    streamTmrStop(pTask->msgInfo.pRetryTmr);
5,450✔
258
    pTask->msgInfo.pRetryTmr = NULL;
5,450✔
259
  }
260

261
  if (pTask->inputq.queue) {
56,050✔
262
    streamQueueClose(pTask->inputq.queue, pTask->id.taskId);
14,699✔
263
    pTask->inputq.queue = NULL;
14,697✔
264
  }
265

266
  if (pTask->outputq.queue) {
56,048✔
267
    streamQueueClose(pTask->outputq.queue, pTask->id.taskId);
14,695✔
268
    pTask->outputq.queue = NULL;
14,697✔
269
  }
270

271
  if (pTask->exec.qmsg) {
56,050✔
272
    taosMemoryFree(pTask->exec.qmsg);
29,239!
273
  }
274

275
  if (pTask->exec.pExecutor) {
56,050✔
276
    qDestroyTask(pTask->exec.pExecutor);
7,453✔
277
    pTask->exec.pExecutor = NULL;
7,453✔
278
  }
279

280
  if (pTask->exec.pWalReader != NULL) {
56,050✔
281
    walCloseReader(pTask->exec.pWalReader);
7,388✔
282
    pTask->exec.pWalReader = NULL;
7,388✔
283
  }
284

285
  streamClearChkptReadyMsg(pTask->chkInfo.pActiveInfo);
56,050✔
286

287
  if (pTask->msgInfo.pData != NULL) {
56,065✔
288
    clearBufferedDispatchMsg(pTask);
23✔
289
  }
290

291
  if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) {
56,063✔
292
    tDeleteSchemaWrapper(pTask->outputInfo.tbSink.pSchemaWrapper);
27,789!
293
    taosMemoryFree(pTask->outputInfo.tbSink.pTSchema);
27,799!
294
    tSimpleHashCleanup(pTask->outputInfo.tbSink.pTbInfo);
27,795✔
295
    tDeleteSchemaWrapper(pTask->outputInfo.tbSink.pTagSchema);
27,800✔
296
  } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
28,274✔
297
    taosArrayDestroy(pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos);
24,890✔
298
  } else if (pTask->outputInfo.type == TASK_OUTPUT__VTABLE_MAP) {
3,384!
299
    taosArrayDestroy(pTask->outputInfo.vtableMapDispatcher.taskInfos);
×
300
    tSimpleHashCleanup(pTask->outputInfo.vtableMapDispatcher.vtableMap);
×
301
  }
302

303
  streamTaskCleanupCheckInfo(&pTask->taskCheckInfo);
56,072✔
304
  streamFreeTaskState(pTask, pTask->status.removeBackendFiles ? 1 : 0);
56,075✔
305

306
  if (pTask->pNameMap) {
56,070✔
307
    tSimpleHashCleanup(pTask->pNameMap);
2,394✔
308
  }
309

310
  if (pTask->status.pSM != NULL) {
56,070✔
311
    streamMutexDestroy(&pTask->lock);
28,687✔
312
    streamMutexDestroy(&pTask->msgInfo.lock);
28,684✔
313
    streamMutexDestroy(&pTask->taskCheckInfo.checkInfoLock);
28,682✔
314
  }
315

316
  taosArrayDestroy(pTask->pVTables);
56,062✔
317
  pTask->pVTables = NULL;
56,066✔
318

319
  streamDestroyStateMachine(pTask->status.pSM);
56,066✔
320
  pTask->status.pSM = NULL;
56,075✔
321

322
  streamTaskDestroyUpstreamInfo(&pTask->upstreamInfo);
56,075✔
323

324
  taosMemoryFree(pTask->outputInfo.pTokenBucket);
56,075!
325

326
  taosArrayDestroy(pTask->msgInfo.pSendInfo);
56,075✔
327
  pTask->msgInfo.pSendInfo = NULL;
56,074✔
328

329
  taosArrayDestroy(pTask->outputInfo.pNodeEpsetUpdateList);
56,074✔
330
  pTask->outputInfo.pNodeEpsetUpdateList = NULL;
56,072✔
331

332
  if (pTask->id.idStr != NULL) {
56,072✔
333
    taosMemoryFree((void*)pTask->id.idStr);
28,651!
334
  }
335

336
  streamTaskDestroyActiveChkptInfo(pTask->chkInfo.pActiveInfo);
56,075✔
337
  pTask->chkInfo.pActiveInfo = NULL;
56,074✔
338

339
  taosArrayDestroyP(pTask->notifyInfo.pNotifyAddrUrls, NULL);
56,074✔
340
  taosMemoryFreeClear(pTask->notifyInfo.streamName);
56,076!
341
  taosMemoryFreeClear(pTask->notifyInfo.stbFullName);
56,076!
342
  tDeleteSchemaWrapper(pTask->notifyInfo.pSchemaWrapper);
56,076!
343

344
  pTask->notifyEventStat = (STaskNotifyEventStat){0};
56,069✔
345

346
  taosMemoryFree(pTask);
56,069!
347
  stDebug("s-task:0x%x free task completed", taskId);
56,073✔
348
}
56,074✔
349

350
void streamFreeTaskState(SStreamTask* pTask, int8_t remove) {
56,083✔
351
  stDebug("s-task:0x%x start to free task state/backend", pTask->id.taskId);
56,083✔
352
  if (pTask->pState != NULL) {
56,084✔
353
    stDebug("s-task:0x%x start to free task state", pTask->id.taskId);
7,462✔
354
    streamStateClose(pTask->pState, remove);
7,462✔
355

356
    if (remove) taskDbSetClearFileFlag(pTask->pBackend);
7,462✔
357
    taskDbRemoveRef(pTask->pBackend);
7,463✔
358
    pTask->pBackend = NULL;
7,463✔
359
    pTask->pState = NULL;
7,463✔
360

361
  } else {
362
    stDebug("s-task:0x%x task state is NULL, may del backend:%s", pTask->id.taskId,
48,622✔
363
            pTask->backendPath ? pTask->backendPath : "NULL");
364
    if (remove) {
48,622✔
365
      if (pTask->backendPath != NULL) {
3,451!
366
        stDebug("s-task:0x%x task state is NULL, do del backend:%s", pTask->id.taskId, pTask->backendPath);
3,453✔
367
        taosRemoveDir(pTask->backendPath);
3,453✔
368
      }
369
    }
370
  }
371

372
  if (pTask->backendPath != NULL) {
56,083✔
373
    taosMemoryFree(pTask->backendPath);
14,698!
374
    pTask->backendPath = NULL;
14,698✔
375
  }
376
  // clear recal backend
377

378
  if (pTask->pRecalState != NULL) {
56,083✔
379
    stDebug("s-task:0x%x start to free recal task state", pTask->id.taskId);
36!
380
    streamStateClose(pTask->pRecalState, remove);
36✔
381

382
    if (remove) taskDbSetClearFileFlag(pTask->pRecalBackend);
36!
383
    taskDbRemoveRef(pTask->pRecalBackend);
36✔
384
    pTask->pRecalBackend = NULL;
36✔
385
    pTask->pRecalState = NULL;
36✔
386

387
  }  // else {
388
     //  stDebug("s-task:0x%x task state is NULL, may del backend:%s", pTask->id.taskId,
389
     //          pTask->backendPath ? pTask->backendPath : "NULL");
390
     //  if (remove) {
391
     //    if (pTask->backendPath != NULL) {
392
     //      stDebug("s-task:0x%x task state is NULL, do del backend:%s", pTask->id.taskId, pTask->backendPath);
393
     //      taosRemoveDir(pTask->backendPath);
394
     //    }
395
     //  }
396
  //}
397
  // if (pTask->backendPath != NULL) {
398
  //   taosMemoryFree(pTask->backendPath);
399
  //   pTask->backendPath = NULL;
400
  // }
401
}
56,083✔
402

403
static void setInitialVersionInfo(SStreamTask* pTask, int64_t ver) {
14,688✔
404
  SCheckpointInfo* pChkInfo = &pTask->chkInfo;
14,688✔
405
  SDataRange*      pRange = &pTask->dataRange;
14,688✔
406

407
  // only set the version info for stream tasks without fill-history task
408
  if ((pTask->info.fillHistory == 0) && (!HAS_RELATED_FILLHISTORY_TASK(pTask))) {
14,688✔
409
    pChkInfo->checkpointVer = ver - 1;  // only update when generating checkpoint
4,937✔
410
    pChkInfo->processedVer = ver - 1;   // already processed version
4,937✔
411
    pChkInfo->nextProcessVer = ver;     // next processed version
4,937✔
412

413
    pRange->range.maxVer = ver;
4,937✔
414
    pRange->range.minVer = ver;
4,937✔
415
  } else {
416
    // the initial value of processedVer/nextProcessVer/checkpointVer for stream task with related fill-history task
417
    // is set at the mnode.
418
    if (pTask->info.fillHistory == 1) {
9,751✔
419
      pChkInfo->checkpointVer = pRange->range.maxVer;
4,831✔
420
      pChkInfo->processedVer = pRange->range.maxVer;
4,831✔
421
      pChkInfo->nextProcessVer = pRange->range.maxVer + 1;
4,831✔
422
    } else {
423
      pChkInfo->checkpointVer = pRange->range.minVer - 1;
4,920✔
424
      pChkInfo->processedVer = pRange->range.minVer - 1;
4,920✔
425
      pChkInfo->nextProcessVer = pRange->range.minVer;
4,920✔
426

427
      {  // for compatible purpose, remove it later
428
        if (pRange->range.minVer == 0) {
4,920✔
429
          pChkInfo->checkpointVer = 0;
2,497✔
430
          pChkInfo->processedVer = 0;
2,497✔
431
          pChkInfo->nextProcessVer = 1;
2,497✔
432
          stDebug("s-task:%s update the processedVer to 0 from -1 due to compatible purpose", pTask->id.idStr);
2,497✔
433
        }
434
      }
435
    }
436
  }
437
}
14,688✔
438

439
int32_t streamTaskSetBackendPath(SStreamTask* pTask) {
14,692✔
440
  int64_t streamId = 0;
14,692✔
441
  int32_t taskId = 0;
14,692✔
442

443
  if (pTask->info.fillHistory) {
14,692✔
444
    streamId = pTask->streamTaskId.streamId;
4,880✔
445
    taskId = pTask->streamTaskId.taskId;
4,880✔
446
  } else {
447
    streamId = pTask->id.streamId;
9,812✔
448
    taskId = pTask->id.taskId;
9,812✔
449
  }
450

451
  char    id[128] = {0};
14,692✔
452
  int32_t nBytes = snprintf(id, tListLen(id), "0x%" PRIx64 "-0x%x", streamId, taskId);
14,692✔
453
  if (nBytes < 0 || nBytes >= sizeof(id)) {
14,692!
454
    return TSDB_CODE_OUT_OF_BUFFER;
×
455
  }
456

457
  int32_t len = strlen(pTask->pMeta->path);
14,699✔
458
  pTask->backendPath = (char*)taosMemoryMalloc(len + nBytes + 2);
14,699!
459
  if (pTask->backendPath == NULL) {
14,700!
460
    return terrno;
×
461
  }
462

463
  int32_t code = snprintf(pTask->backendPath, len + nBytes + 2, "%s%s%s", pTask->pMeta->path, TD_DIRSEP, id);
14,700✔
464
  if (code < 0 || code >= len + nBytes + 2) {
14,700✔
465
    stError("s-task:%s failed to set backend path:%s, code: out of buffer", pTask->id.idStr, pTask->backendPath);
4!
466
    return TSDB_CODE_OUT_OF_BUFFER;
×
467
  } else {
468
    stDebug("s-task:%s set backend path:%s", pTask->id.idStr, pTask->backendPath);
14,696✔
469
    return 0;
14,691✔
470
  }
471
}
472

473
int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, int64_t ver) {
14,686✔
474
  int32_t code = createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId, &pTask->id.idStr);
14,686✔
475
  if (code) {
14,693!
476
    stError("0x%x failed create stream task id str, code:%s", pTask->id.taskId, tstrerror(code));
×
477
    return code;
×
478
  }
479

480
  pTask->id.refId = 0;
14,693✔
481
  pTask->inputq.status = TASK_INPUT_STATUS__NORMAL;
14,693✔
482
  pTask->outputq.status = TASK_OUTPUT_STATUS__NORMAL;
14,693✔
483

484
  int32_t code1 = streamQueueOpen(512 << 10, &pTask->inputq.queue);
14,693✔
485
  int32_t code2 = streamQueueOpen(512 << 10, &pTask->outputq.queue);
14,687✔
486
  if (code1 || code2) {
14,699!
487
    stError("s-task:%s failed to prepare the input/output queue, initialize task failed", pTask->id.idStr);
×
488
    return TSDB_CODE_OUT_OF_MEMORY;
×
489
  }
490

491
  pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
14,699✔
492

493
  code = streamCreateStateMachine(pTask);
14,699✔
494
  if (pTask->status.pSM == NULL || code != TSDB_CODE_SUCCESS) {
14,686!
495
    stError("s-task:%s failed create state-machine for stream task, initialization failed, code:%s", pTask->id.idStr,
×
496
            tstrerror(code));
497
    return code;
×
498
  }
499

500
  pTask->execInfo.created = taosGetTimestampMs();
14,698✔
501
  setInitialVersionInfo(pTask, ver);
14,698✔
502

503
  pTask->pMeta = pMeta;
14,695✔
504
  pTask->pMsgCb = pMsgCb;
14,695✔
505
  pTask->msgInfo.pSendInfo = taosArrayInit(4, sizeof(SDispatchEntry));
14,695✔
506
  if (pTask->msgInfo.pSendInfo == NULL) {
14,696!
507
    stError("s-task:%s failed to create sendInfo struct for stream task, code:Out of memory", pTask->id.idStr);
×
508
    return terrno;
×
509
  }
510

511
  code = taosThreadMutexInit(&pTask->msgInfo.lock, NULL);
14,696✔
512
  if (code) {
14,694!
513
    stError("s-task:0x%x failed to init msgInfo mutex, code:%s", pTask->id.taskId, tstrerror(code));
×
514
    return code;
×
515
  }
516

517
  TdThreadMutexAttr attr = {0};
14,694✔
518
  code = taosThreadMutexAttrInit(&attr);
14,694✔
519
  if (code != 0) {
14,683!
520
    stError("s-task:%s initElapsed mutex attr failed, code:%s", pTask->id.idStr, tstrerror(code));
×
521
    return code;
×
522
  }
523

524
  code = taosThreadMutexAttrSetType(&attr, PTHREAD_MUTEX_RECURSIVE);
14,683✔
525
  if (code != 0) {
14,679!
526
    stError("s-task:%s set mutex attr recursive, code:%s", pTask->id.idStr, tstrerror(code));
×
527
    return code;
×
528
  }
529

530
  code = taosThreadMutexInit(&pTask->lock, &attr);
14,679✔
531
  if (code) {
14,696!
532
    return code;
×
533
  }
534

535
  code = taosThreadMutexAttrDestroy(&attr);
14,696✔
536
  if (code) {
14,674!
537
    return code;
×
538
  }
539

540
  streamTaskOpenAllUpstreamInput(pTask);
14,674✔
541

542
  STaskOutputInfo* pOutputInfo = &pTask->outputInfo;
14,672✔
543
  pOutputInfo->pTokenBucket = taosMemoryCalloc(1, sizeof(STokenBucket));
14,672!
544
  if (pOutputInfo->pTokenBucket == NULL) {
14,693!
545
    stError("s-task:%s failed to prepare the tokenBucket, code:%s", pTask->id.idStr, tstrerror(terrno));
×
546
    return terrno;
×
547
  }
548

549
  // 2MiB per second for sink task
550
  // 50 times sink operator per second
551
  code = streamTaskInitTokenBucket(pOutputInfo->pTokenBucket, 35, 35, tsSinkDataRate, pTask->id.idStr);
14,693✔
552
  if (code) {
14,694!
553
    return code;
×
554
  }
555

556
  pOutputInfo->pNodeEpsetUpdateList = taosArrayInit(4, sizeof(SDownstreamTaskEpset));
14,694✔
557
  if (pOutputInfo->pNodeEpsetUpdateList == NULL) {
14,702!
558
    stError("s-task:%s failed to prepare downstreamUpdateList, code:%s", pTask->id.idStr, tstrerror(terrno));
×
559
    return terrno;
×
560
  }
561

562
  pTask->taskCheckInfo.pList = taosArrayInit(4, sizeof(SDownstreamStatusInfo));
14,702✔
563
  if (pTask->taskCheckInfo.pList == NULL) {
14,692!
564
    stError("s-task:%s failed to prepare taskCheckInfo list, code:%s", pTask->id.idStr, tstrerror(terrno));
×
565
    return terrno;
×
566
  }
567

568
  code = taosThreadMutexInit(&pTask->taskCheckInfo.checkInfoLock, NULL);
14,692✔
569
  if (code) {
14,689!
570
    return code;
×
571
  }
572

573
  if (pTask->chkInfo.pActiveInfo == NULL) {
14,689!
574
    code = streamTaskCreateActiveChkptInfo(&pTask->chkInfo.pActiveInfo);
14,690✔
575
    if (code) {
14,693!
576
      stError("s-task:%s failed to create active checkpoint info, code:%s", pTask->id.idStr, tstrerror(code));
×
577
      return code;
×
578
    }
579
  }
580

581
  return streamTaskSetBackendPath(pTask);
14,692✔
582
}
583

584
int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask) {
112,393✔
585
  if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
112,393✔
586
    return 0;
6,798✔
587
  }
588

589
  int32_t type = pTask->outputInfo.type;
105,595✔
590
  if (type == TASK_OUTPUT__TABLE) {
105,595✔
591
    return 0;
282✔
592
  } else if (type == TASK_OUTPUT__FIXED_DISPATCH) {
105,313✔
593
    return 1;
8,708✔
594
  } else if (type == TASK_OUTPUT__VTABLE_MAP) {
96,605!
595
    SArray* pTaskInfos = pTask->outputInfo.vtableMapDispatcher.taskInfos;
×
596
    return taosArrayGetSize(pTaskInfos);
×
597
  } else {
598
    SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
96,605✔
599
    return taosArrayGetSize(vgInfo);
96,605✔
600
  }
601
}
602

603
int32_t streamTaskGetNumOfUpstream(const SStreamTask* pTask) { return taosArrayGetSize(pTask->upstreamInfo.pList); }
16,102✔
604

605
int32_t streamTaskSetUpstreamInfo(SStreamTask* pTask, const SStreamTask* pUpstreamTask) {
20,092✔
606
  SStreamUpstreamEpInfo* pEpInfo = createStreamTaskEpInfo(pUpstreamTask);
20,092✔
607
  if (pEpInfo == NULL) {
20,092!
608
    return terrno;
×
609
  }
610

611
  if (pTask->upstreamInfo.pList == NULL) {
20,092✔
612
    pTask->upstreamInfo.pList = taosArrayInit(4, POINTER_BYTES);
6,934✔
613
  }
614

615
  void* p = taosArrayPush(pTask->upstreamInfo.pList, &pEpInfo);
20,092✔
616
  return (p == NULL) ? terrno : TSDB_CODE_SUCCESS;
20,092!
617
}
618

619
int32_t streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdated) {
465✔
620
  int32_t code = 0;
465✔
621
  char    buf[512] = {0};
465✔
622
  code = epsetToStr(pEpSet, buf, tListLen(buf));  // ignore error since it is only for log file.
465✔
623
  if (code != 0) {                                // print error and continue
465!
624
    stError("%s failed to convert epset to str, code:%s", pTask->id.idStr, tstrerror(code));
×
625
    return code;
×
626
  }
627

628
  int32_t numOfUpstream = taosArrayGetSize(pTask->upstreamInfo.pList);
465✔
629
  for (int32_t i = 0; i < numOfUpstream; ++i) {
921✔
630
    SStreamUpstreamEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i);
826✔
631
    if (pInfo->nodeId == nodeId) {
826✔
632
      bool equal = isEpsetEqual(&pInfo->epSet, pEpSet);
370✔
633
      if (!equal) {
370✔
634
        *pUpdated = true;
24✔
635

636
        char tmp[512] = {0};
24✔
637
        code = epsetToStr(&pInfo->epSet, tmp, tListLen(tmp));
24✔
638
        if (code != 0) {  // print error and continue
24!
639
          stError("%s failed to convert epset to str, code:%s", pTask->id.idStr, tstrerror(code));
×
640
          return code;
×
641
        }
642

643
        epsetAssign(&pInfo->epSet, pEpSet);
24✔
644
        stDebug("s-task:0x%x update the upstreamInfo taskId:0x%x(nodeId:%d) newEpset:%s old:%s", pTask->id.taskId,
24!
645
                pInfo->taskId, nodeId, buf, tmp);
646
      } else {
647
        stDebug("s-task:0x%x not update upstreamInfo, since identical, task:0x%x(nodeId:%d) epset:%s", pTask->id.taskId,
346!
648
                pInfo->taskId, nodeId, buf);
649
      }
650

651
      break;
370✔
652
    }
653
  }
654

655
  return code;
465✔
656
}
657

658
void streamTaskDestroyUpstreamInfo(SUpstreamInfo* pUpstreamInfo) {
56,068✔
659
  if (pUpstreamInfo->pList != NULL) {
56,068✔
660
    taosArrayDestroyEx(pUpstreamInfo->pList, freeUpstreamItem);
49,008✔
661
    pUpstreamInfo->numOfClosed = 0;
49,011✔
662
    pUpstreamInfo->pList = NULL;
49,011✔
663
  }
664
}
56,071✔
665

666
void streamTaskSetFixedDownstreamInfo(SStreamTask* pTask, const SStreamTask* pDownstreamTask) {
840✔
667
  STaskDispatcherFixed* pDispatcher = &pTask->outputInfo.fixedDispatcher;
840✔
668
  pDispatcher->taskId = pDownstreamTask->id.taskId;
840✔
669
  pDispatcher->nodeId = pDownstreamTask->info.nodeId;
840✔
670
  pDispatcher->epSet = pDownstreamTask->info.epSet;
840✔
671

672
  pTask->outputInfo.type = TASK_OUTPUT__FIXED_DISPATCH;
840✔
673
  pTask->msgInfo.msgType = TDMT_STREAM_TASK_DISPATCH;
840✔
674
}
840✔
675

676
int32_t streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdated) {
465✔
677
  char    buf[512] = {0};
465✔
678
  int32_t code = epsetToStr(pEpSet, buf, tListLen(buf));  // ignore the error since only for log files.
465✔
679
  if (code != 0) {                                        // print error and continue
465!
680
    stError("%s failed to convert epset to str, code:%s", pTask->id.idStr, tstrerror(code));
×
681
    return code;
×
682
  }
683

684
  int32_t id = pTask->id.taskId;
465✔
685
  int8_t  type = pTask->outputInfo.type;
465✔
686

687
  if (type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
465✔
688
    SArray* pVgs = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
368✔
689

690
    for (int32_t i = 0; i < taosArrayGetSize(pVgs); i++) {
731✔
691
      SVgroupInfo* pVgInfo = taosArrayGet(pVgs, i);
729✔
692
      if (pVgInfo == NULL) {
729!
693
        continue;
×
694
      }
695

696
      if (pVgInfo->vgId == nodeId) {
729✔
697
        bool isEqual = isEpsetEqual(&pVgInfo->epSet, pEpSet);
366✔
698
        if (!isEqual) {
366✔
699
          *pUpdated = true;
24✔
700

701
          char tmp[512] = {0};
24✔
702
          code = epsetToStr(&pVgInfo->epSet, tmp, tListLen(tmp));
24✔
703
          if (code != 0) {  // print error and continue
24!
704
            stError("%s failed to convert epset to str, code:%s", pTask->id.idStr, tstrerror(code));
×
705
            return code;
×
706
          }
707

708
          epsetAssign(&pVgInfo->epSet, pEpSet);
24✔
709
          stDebug("s-task:0x%x update dispatch info, task:0x%x(nodeId:%d) newEpset:%s old:%s", id, pVgInfo->taskId,
24!
710
                  nodeId, buf, tmp);
711
        } else {
712
          stDebug("s-task:0x%x not update dispatch info, since identical, task:0x%x(nodeId:%d) epset:%s", id,
342!
713
                  pVgInfo->taskId, nodeId, buf);
714
        }
715
        break;
366✔
716
      }
717
    }
718
  } else if (type == TASK_OUTPUT__FIXED_DISPATCH) {
97!
719
    STaskDispatcherFixed* pDispatcher = &pTask->outputInfo.fixedDispatcher;
97✔
720
    if (pDispatcher->nodeId == nodeId) {
97✔
721
      bool equal = isEpsetEqual(&pDispatcher->epSet, pEpSet);
4✔
722
      if (!equal) {
4!
723
        *pUpdated = true;
×
724

725
        char tmp[512] = {0};
×
726
        code = epsetToStr(&pDispatcher->epSet, tmp, tListLen(tmp));
×
727
        if (code != 0) {  // print error and continue
×
728
          stError("%s failed to convert epset to str, code:%s", pTask->id.idStr, tstrerror(code));
×
729
          return code;
×
730
        }
731

732
        epsetAssign(&pDispatcher->epSet, pEpSet);
×
733
        stDebug("s-task:0x%x update dispatch info, task:0x%x(nodeId:%d) newEpset:%s old:%s", id, pDispatcher->taskId,
×
734
                nodeId, buf, tmp);
735
      } else {
736
        stDebug("s-task:0x%x not update dispatch info, since identical, task:0x%x(nodeId:%d) epset:%s", id,
4!
737
                pDispatcher->taskId, nodeId, buf);
738
      }
739
    }
740
  } else if (type == TASK_OUTPUT__VTABLE_MAP) {
×
741
    SArray* pTaskInfos = pTask->outputInfo.vtableMapDispatcher.taskInfos;
×
742

743
    for (int32_t i = 0; i < taosArrayGetSize(pTaskInfos); ++i) {
×
744
      STaskDispatcherFixed* pAddr = taosArrayGet(pTaskInfos, i);
×
745
      if (pAddr == NULL) {
×
746
        continue;
×
747
      }
748

749
      if (pAddr->nodeId == nodeId) {
×
750
        bool isEqual = isEpsetEqual(&pAddr->epSet, pEpSet);
×
751
        if (!isEqual) {
×
752
          *pUpdated = true;
×
753

754
          char tmp[512] = {0};
×
755
          code = epsetToStr(&pAddr->epSet, tmp, tListLen(tmp));
×
756
          if (code != 0) {  // print error and continue
×
757
            stError("%s failed to convert epset to str, code:%s", pTask->id.idStr, tstrerror(code));
×
758
            return code;
×
759
          }
760

761
          epsetAssign(&pAddr->epSet, pEpSet);
×
762
          stDebug("s-task:0x%x update dispatch info, task:0x%x(nodeId:%d) newEpset:%s old:%s", id, pAddr->taskId,
×
763
                  nodeId, buf, tmp);
764
        } else {
765
          stDebug("s-task:0x%x not update dispatch info, since identical, task:0x%x(nodeId:%d) epset:%s", id,
×
766
                  pAddr->taskId, nodeId, buf);
767
        }
768
        break;
×
769
      }
770
    }
771
  }
772

773
  return code;
465✔
774
}
775

776
int32_t streamTaskStop(SStreamTask* pTask) {
3,254✔
777
  int32_t     vgId = pTask->pMeta->vgId;
3,254✔
778
  int64_t     st = taosGetTimestampMs();
3,254✔
779
  const char* id = pTask->id.idStr;
3,254✔
780

781
  int32_t code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_STOP);
3,254✔
782
  if (code) {
3,254!
783
    stError("failed to handle STOP event, s-task:%s, code:%s", id, tstrerror(code));
×
784
    return code;
×
785
  }
786

787
  if (pTask->info.taskLevel != TASK_LEVEL__SINK && pTask->exec.pExecutor != NULL) {
3,254✔
788
    code = qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS, 5000);
1,561✔
789
    if (code != TSDB_CODE_SUCCESS) {
1,561!
790
      stError("s-task:%s failed to kill task related query handle, code:%s", id, tstrerror(code));
×
791
    }
792
  }
793

794
  while (!streamTaskIsIdle(pTask)) {
3,254!
795
    stDebug("s-task:%s level:%d wait for task to be idle and then close, check again in 100ms", id,
×
796
            pTask->info.taskLevel);
797
    taosMsleep(100);
×
798
  }
799

800
  int64_t el = taosGetTimestampMs() - st;
3,253✔
801
  stDebug("vgId:%d s-task:%s is closed in %" PRId64 " ms", vgId, id, el);
3,253✔
802
  return code;
3,254✔
803
}
804

805
bool streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList) {
453✔
806
  STaskExecStatisInfo* p = &pTask->execInfo;
453✔
807

808
  int32_t numOfNodes = taosArrayGetSize(pNodeList);
453✔
809
  int64_t prevTs = p->latestUpdateTs;
453✔
810

811
  p->latestUpdateTs = taosGetTimestampMs();
453✔
812
  p->updateCount += 1;
453✔
813
  stDebug("s-task:0x%x update task nodeEp epset, updatedNodes:%d, updateCount:%d, prevTs:%" PRId64, pTask->id.taskId,
453!
814
          numOfNodes, p->updateCount, prevTs);
815

816
  bool updated = false;
453✔
817
  for (int32_t i = 0; i < numOfNodes; ++i) {
1,364✔
818
    SNodeUpdateInfo* pInfo = taosArrayGet(pNodeList, i);
911✔
819
    if (pInfo == NULL) {
911!
820
      continue;
×
821
    }
822

823
    int32_t code = doUpdateTaskEpset(pTask, pInfo->nodeId, &pInfo->newEp, &updated);
911✔
824
    if (code) {
911!
825
      stError("s-task:0x%x failed to update the task nodeEp epset, code:%s", pTask->id.taskId, tstrerror(code));
×
826
    }
827
  }
828

829
  return updated;
453✔
830
}
831

832
void streamTaskResetUpstreamStageInfo(SStreamTask* pTask) {
14,696✔
833
  if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
14,696✔
834
    return;
7,386✔
835
  }
836

837
  int32_t size = taosArrayGetSize(pTask->upstreamInfo.pList);
7,310✔
838
  for (int32_t i = 0; i < size; ++i) {
28,546✔
839
    SStreamUpstreamEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i);
21,237✔
840
    pInfo->stage = -1;
21,237✔
841
  }
842

843
  stDebug("s-task:%s reset all upstream tasks stage info", pTask->id.idStr);
7,309✔
844
}
845

846
void streamTaskOpenAllUpstreamInput(SStreamTask* pTask) {
22,714✔
847
  int32_t num = taosArrayGetSize(pTask->upstreamInfo.pList);
22,714✔
848
  if (num == 0) {
22,724✔
849
    return;
11,354✔
850
  }
851

852
  for (int32_t i = 0; i < num; ++i) {
45,239✔
853
    SStreamUpstreamEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i);
33,872✔
854
    pInfo->dataAllowed = true;
33,869✔
855
  }
856

857
  pTask->upstreamInfo.numOfClosed = 0;
11,367✔
858
  stDebug("s-task:%s opening up inputQ for %d upstream tasks", pTask->id.idStr, num);
11,367✔
859
}
860

861
void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId) {
6,917✔
862
  SStreamUpstreamEpInfo* pInfo = NULL;
6,917✔
863
  streamTaskGetUpstreamTaskEpInfo(pTask, taskId, &pInfo);
6,917✔
864

865
  if ((pInfo != NULL) && pInfo->dataAllowed) {
6,928!
866
    pInfo->dataAllowed = false;
6,930✔
867
    if (pTask->upstreamInfo.numOfClosed < streamTaskGetNumOfUpstream(pTask)) {
6,930!
868
      int32_t t = atomic_add_fetch_32(&pTask->upstreamInfo.numOfClosed, 1);
6,917✔
869
    } else {
870
      stError("s-task:%s not inc closed input, since they have been all closed already", pTask->id.idStr);
×
871
    }
872
  }
873
}
6,941✔
874

875
void streamTaskOpenUpstreamInput(SStreamTask* pTask, int32_t taskId) {
1✔
876
  SStreamUpstreamEpInfo* pInfo = NULL;
1✔
877
  streamTaskGetUpstreamTaskEpInfo(pTask, taskId, &pInfo);
1✔
878

879
  if (pInfo != NULL && (!pInfo->dataAllowed)) {
1!
880
    int32_t t = atomic_sub_fetch_32(&pTask->upstreamInfo.numOfClosed, 1);
×
881
    stDebug("s-task:%s open inputQ for upstream:0x%x, remain closed:%d", pTask->id.idStr, taskId, t);
×
882
    pInfo->dataAllowed = true;
×
883
  }
884
}
1✔
885

886
bool streamTaskIsAllUpstreamClosed(SStreamTask* pTask) {
×
887
  return pTask->upstreamInfo.numOfClosed == taosArrayGetSize(pTask->upstreamInfo.pList);
×
888
}
889

890
bool streamTaskSetSchedStatusWait(SStreamTask* pTask) {
88,636✔
891
  bool ret = false;
88,636✔
892

893
  streamMutexLock(&pTask->lock);
88,636✔
894
  if (pTask->status.schedStatus == TASK_SCHED_STATUS__INACTIVE) {
88,675✔
895
    pTask->status.schedStatus = TASK_SCHED_STATUS__WAITING;
52,760✔
896
    ret = true;
52,760✔
897
  }
898

899
  streamMutexUnlock(&pTask->lock);
88,675✔
900
  return ret;
88,672✔
901
}
902

903
int8_t streamTaskSetSchedStatusActive(SStreamTask* pTask) {
51,566✔
904
  streamMutexLock(&pTask->lock);
51,566✔
905
  int8_t status = pTask->status.schedStatus;
51,611✔
906
  if (status == TASK_SCHED_STATUS__WAITING) {
51,611✔
907
    pTask->status.schedStatus = TASK_SCHED_STATUS__ACTIVE;
51,600✔
908
  }
909
  streamMutexUnlock(&pTask->lock);
51,611✔
910

911
  return status;
51,610✔
912
}
913

914
int8_t streamTaskSetSchedStatusInactive(SStreamTask* pTask) {
1,134✔
915
  streamMutexLock(&pTask->lock);
1,134✔
916
  int8_t status = pTask->status.schedStatus;
1,134✔
917
  pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
1,134✔
918
  streamMutexUnlock(&pTask->lock);
1,134✔
919

920
  return status;
1,134✔
921
}
922

923
int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, int32_t resetRelHalt) {
6,936✔
924
  int32_t      code = 0;
6,936✔
925
  SStreamMeta* pMeta = pTask->pMeta;
6,936✔
926
  SStreamTask* pStreamTask = NULL;
6,936✔
927

928
  if (pTask->info.fillHistory == 0) {
6,936!
929
    return code;
6,949✔
930
  }
931

932
  code = streamMetaAcquireTaskUnsafe(pMeta, &pTask->streamTaskId, &pStreamTask);
×
933
  if (code == 0) {
×
934
    stDebug("s-task:%s clear the related stream task:0x%x attr to fill-history task", pTask->id.idStr,
×
935
            (int32_t)pTask->streamTaskId.taskId);
936

937
    streamMutexLock(&(pStreamTask->lock));
×
938
    CLEAR_RELATED_FILLHISTORY_TASK(pStreamTask);
×
939

940
    if (resetRelHalt) {
×
941
      stDebug("s-task:0x%" PRIx64 " set the persistent status attr to be ready, prev:%s, status in sm:%s",
×
942
              pTask->streamTaskId.taskId, streamTaskGetStatusStr(pStreamTask->status.taskStatus),
943
              streamTaskGetStatus(pStreamTask).name);
944
      pStreamTask->status.taskStatus = TASK_STATUS__READY;
×
945
    }
946

947
    code = streamMetaSaveTaskInMeta(pMeta, pStreamTask);
×
948
    streamMutexUnlock(&(pStreamTask->lock));
×
949

950
    streamMetaReleaseTask(pMeta, pStreamTask);
×
951
  }
952

953
  return code;
×
954
}
955

956
int32_t streamBuildAndSendDropTaskMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskId* pTaskId, int64_t resetRelHalt) {
×
957
  SVDropStreamTaskReq* pReq = rpcMallocCont(sizeof(SVDropStreamTaskReq));
×
958
  if (pReq == NULL) {
×
959
    return terrno;
×
960
  }
961

962
  pReq->head.vgId = vgId;
×
963
  pReq->taskId = pTaskId->taskId;
×
964
  pReq->streamId = pTaskId->streamId;
×
965
  pReq->resetRelHalt = resetRelHalt;
×
966

967
  SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_DROP, .pCont = pReq, .contLen = sizeof(SVDropStreamTaskReq)};
×
968
  int32_t code = tmsgPutToQueue(pMsgCb, WRITE_QUEUE, &msg);
×
969
  if (code != TSDB_CODE_SUCCESS) {
×
970
    stError("vgId:%d failed to send drop task:0x%x msg, code:%s", vgId, pTaskId->taskId, tstrerror(code));
×
971
  } else {
972
    stDebug("vgId:%d build and send drop task:0x%x msg", vgId, pTaskId->taskId);
×
973
  }
974

975
  return code;
×
976
}
977

978
int32_t streamSendChkptReportMsg(SStreamTask* pTask, SCheckpointInfo* pCheckpointInfo, int8_t dropRelHTask) {
4,466✔
979
  int32_t                code = 0;
4,466✔
980
  int32_t                tlen = 0;
4,466✔
981
  int32_t                vgId = pTask->pMeta->vgId;
4,466✔
982
  const char*            id = pTask->id.idStr;
4,466✔
983
  SActiveCheckpointInfo* pActive = pCheckpointInfo->pActiveInfo;
4,466✔
984

985
  SCheckpointReport req = {.streamId = pTask->id.streamId,
4,466✔
986
                           .taskId = pTask->id.taskId,
4,466✔
987
                           .nodeId = vgId,
988
                           .dropHTask = dropRelHTask,
989
                           .transId = pActive->transId,
4,466✔
990
                           .checkpointId = pActive->activeId,
4,466✔
991
                           .checkpointVer = pCheckpointInfo->processedVer,
4,466✔
992
                           .checkpointTs = pCheckpointInfo->startTs};
4,466✔
993

994
  tEncodeSize(tEncodeStreamTaskChkptReport, &req, tlen, code);
4,466!
995
  if (code < 0) {
4,466!
996
    stError("s-task:%s vgId:%d encode stream task checkpoint-report failed, code:%s", id, vgId, tstrerror(code));
×
997
    return -1;
×
998
  }
999

1000
  void* buf = rpcMallocCont(tlen);
4,466✔
1001
  if (buf == NULL) {
4,466!
1002
    stError("s-task:%s vgId:%d encode stream task checkpoint-report msg failed, code:%s", id, vgId,
×
1003
            tstrerror(TSDB_CODE_OUT_OF_MEMORY));
1004
    return -1;
×
1005
  }
1006

1007
  SEncoder encoder;
1008
  tEncoderInit(&encoder, buf, tlen);
4,466✔
1009
  if ((code = tEncodeStreamTaskChkptReport(&encoder, &req)) < 0) {
4,466!
1010
    rpcFreeCont(buf);
×
1011
    tEncoderClear(&encoder);
×
1012
    stError("s-task:%s vgId:%d encode stream task checkpoint-report msg failed, code:%s", id, vgId, tstrerror(code));
×
1013
    return -1;
×
1014
  }
1015
  tEncoderClear(&encoder);
4,466✔
1016

1017
  SRpcMsg msg = {0};
4,466✔
1018
  initRpcMsg(&msg, TDMT_MND_STREAM_CHKPT_REPORT, buf, tlen);
4,466✔
1019
  stDebug("s-task:%s vgId:%d build and send task checkpoint-report to mnode", id, vgId);
4,466✔
1020

1021
  return tmsgSendReq(&pTask->info.mnodeEpset, &msg);
4,466✔
1022
}
1023

1024
STaskId streamTaskGetTaskId(const SStreamTask* pTask) {
56,543✔
1025
  STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
56,543✔
1026
  return id;
56,543✔
1027
}
1028

1029
void streamTaskInitForLaunchHTask(SHistoryTaskInfo* pInfo) {
2,326✔
1030
  pInfo->waitInterval = LAUNCH_HTASK_INTERVAL;
2,326✔
1031
  pInfo->tickCount = ceil(LAUNCH_HTASK_INTERVAL / WAIT_FOR_MINIMAL_INTERVAL);
2,326✔
1032
  pInfo->retryTimes = 0;
2,326✔
1033
}
2,326✔
1034

1035
void streamTaskSetRetryInfoForLaunch(SHistoryTaskInfo* pInfo) {
2,308✔
1036
  pInfo->waitInterval *= RETRY_LAUNCH_INTERVAL_INC_RATE;
2,308✔
1037
  pInfo->tickCount = ceil(pInfo->waitInterval / WAIT_FOR_MINIMAL_INTERVAL);
2,308✔
1038
  pInfo->retryTimes += 1;
2,308✔
1039
}
2,308✔
1040

1041
void streamTaskStatusInit(STaskStatusEntry* pEntry, const SStreamTask* pTask) {
9,357✔
1042
  pEntry->id.streamId = pTask->id.streamId;
9,357✔
1043
  pEntry->id.taskId = pTask->id.taskId;
9,357✔
1044
  pEntry->stage = -1;
9,357✔
1045
  pEntry->nodeId = pTask->info.nodeId;
9,357✔
1046
  pEntry->status = TASK_STATUS__STOP;
9,357✔
1047
}
9,357✔
1048

1049
void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc) {
38,768✔
1050
  pDst->stage = pSrc->stage;
38,768✔
1051
  pDst->inputQUsed = pSrc->inputQUsed;
38,768✔
1052
  pDst->inputRate = pSrc->inputRate;
38,768✔
1053
  pDst->procsTotal = pSrc->procsTotal;
38,768✔
1054
  pDst->procsThroughput = pSrc->procsThroughput;
38,768✔
1055
  pDst->outputTotal = pSrc->outputTotal;
38,768✔
1056
  pDst->outputThroughput = pSrc->outputThroughput;
38,768✔
1057
  pDst->processedVer = pSrc->processedVer;
38,768✔
1058
  pDst->verRange = pSrc->verRange;
38,768✔
1059
  pDst->sinkQuota = pSrc->sinkQuota;
38,768✔
1060
  pDst->sinkDataSize = pSrc->sinkDataSize;
38,768✔
1061
  pDst->checkpointInfo = pSrc->checkpointInfo;
38,768✔
1062
  pDst->startCheckpointId = pSrc->startCheckpointId;
38,768✔
1063
  pDst->startCheckpointVer = pSrc->startCheckpointVer;
38,768✔
1064
  pDst->status = pSrc->status;
38,768✔
1065

1066
  pDst->startTime = pSrc->startTime;
38,768✔
1067
  pDst->hTaskId = pSrc->hTaskId;
38,768✔
1068
  pDst->notifyEventStat = pSrc->notifyEventStat;
38,768✔
1069
}
38,768✔
1070

1071
STaskStatusEntry streamTaskGetStatusEntry(SStreamTask* pTask) {
40,159✔
1072
  SStreamMeta*         pMeta = pTask->pMeta;
40,159✔
1073
  STaskExecStatisInfo* pExecInfo = &pTask->execInfo;
40,159✔
1074

1075
  STaskStatusEntry entry = {
120,477✔
1076
      .id = streamTaskGetTaskId(pTask),
40,159✔
1077
      .status = streamTaskGetStatus(pTask).state,
40,159✔
1078
      .nodeId = pMeta->vgId,
40,159✔
1079
      .stage = pMeta->stage,
40,159✔
1080

1081
      .inputQUsed = SIZE_IN_MiB(streamQueueGetItemSize(pTask->inputq.queue)),
40,159✔
1082
      .startTime = pExecInfo->readyTs,
40,159✔
1083
      .checkpointInfo.latestId = pTask->chkInfo.checkpointId,
40,159✔
1084
      .checkpointInfo.latestVer = pTask->chkInfo.checkpointVer,
40,159✔
1085
      .checkpointInfo.latestTime = pTask->chkInfo.checkpointTime,
40,159✔
1086
      .checkpointInfo.latestSize = 0,
1087
      .checkpointInfo.remoteBackup = 0,
1088
      .checkpointInfo.consensusChkptId = 0,
1089
      .checkpointInfo.consensusTs = 0,
1090
      .hTaskId = pTask->hTaskInfo.id.taskId,
40,159✔
1091
      .procsTotal = SIZE_IN_MiB(pExecInfo->inputDataSize),
40,159✔
1092
      .outputTotal = SIZE_IN_MiB(pExecInfo->outputDataSize),
40,159✔
1093
      .procsThroughput = SIZE_IN_KiB(pExecInfo->procsThroughput),
40,159✔
1094
      .outputThroughput = SIZE_IN_KiB(pExecInfo->outputThroughput),
40,159✔
1095
      .startCheckpointId = pExecInfo->startCheckpointId,
40,159✔
1096
      .startCheckpointVer = pExecInfo->startCheckpointVer,
40,159✔
1097
      .notifyEventStat = pTask->notifyEventStat,
1098
  };
1099
  return entry;
40,159✔
1100
}
1101

1102
static int32_t taskPauseCallback(SStreamTask* pTask, void* param) {
739✔
1103
  SStreamMeta* pMeta = pTask->pMeta;
739✔
1104
  int32_t      code = 0;
739✔
1105

1106
  int32_t num = atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1);
739✔
1107
  stInfo("vgId:%d s-task:%s pause stream task. paused task num:%d", pMeta->vgId, pTask->id.idStr, num);
742!
1108

1109
  // in case of fill-history task, stop the tsdb file scan operation.
1110
  if (pTask->info.fillHistory == 1) {
743!
1111
    void* pExecutor = pTask->exec.pExecutor;
×
1112
    code = qKillTask(pExecutor, TSDB_CODE_SUCCESS, 10000);
×
1113
  }
1114

1115
  stDebug("vgId:%d s-task:%s set pause flag and pause task", pMeta->vgId, pTask->id.idStr);
743✔
1116
  return code;
743✔
1117
}
1118

1119
void streamTaskPause(SStreamTask* pTask) {
745✔
1120
  int32_t code = streamTaskHandleEventAsync(pTask->status.pSM, TASK_EVENT_PAUSE, taskPauseCallback, NULL);
745✔
1121
  if (code) {
750!
1122
    stError("s-task:%s failed handle pause event async, code:%s", pTask->id.idStr, tstrerror(code));
×
1123
  }
1124
}
750✔
1125

1126
void streamTaskResume(SStreamTask* pTask) {
1,244✔
1127
  SStreamTaskState prevState = streamTaskGetStatus(pTask);
1,244✔
1128

1129
  SStreamMeta* pMeta = pTask->pMeta;
1,243✔
1130
  int32_t      code = streamTaskRestoreStatus(pTask);
1,243✔
1131
  if (code == TSDB_CODE_SUCCESS) {
1,247✔
1132
    char*   pNew = streamTaskGetStatus(pTask).name;
732✔
1133
    int32_t num = atomic_sub_fetch_32(&pMeta->numOfPausedTasks, 1);
732✔
1134
    stInfo("s-task:%s status:%s resume from %s, paused task(s):%d", pTask->id.idStr, pNew, prevState.name, num);
732!
1135
  } else {
1136
    stInfo("s-task:%s status:%s no need to resume, paused task(s):%d", pTask->id.idStr, prevState.name,
515✔
1137
           pMeta->numOfPausedTasks);
1138
  }
1139
}
1,248✔
1140

1141
bool streamTaskIsSinkTask(const SStreamTask* pTask) { return pTask->info.taskLevel == TASK_LEVEL__SINK; }
56,573✔
1142

1143
// this task must success
1144
int32_t streamTaskSendCheckpointReq(SStreamTask* pTask) {
4,275✔
1145
  int32_t     code;
1146
  int32_t     tlen = 0;
4,275✔
1147
  int32_t     vgId = pTask->pMeta->vgId;
4,275✔
1148
  const char* id = pTask->id.idStr;
4,275✔
1149

1150
  SStreamTaskCheckpointReq req = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId, .nodeId = vgId};
4,275✔
1151
  tEncodeSize(tEncodeStreamTaskCheckpointReq, &req, tlen, code);
4,275!
1152
  if (code < 0) {
4,275!
1153
    stError("s-task:%s vgId:%d encode stream task req checkpoint failed, code:%s", id, vgId, tstrerror(code));
×
1154
    return TSDB_CODE_INVALID_MSG;
×
1155
  }
1156

1157
  void* buf = rpcMallocCont(tlen);
4,275✔
1158
  if (buf == NULL) {
4,272!
1159
    stError("s-task:%s vgId:%d encode stream task req checkpoint msg failed, code:Out of memory", id, vgId);
×
1160
    return terrno;
×
1161
  }
1162

1163
  SEncoder encoder;
1164
  tEncoderInit(&encoder, buf, tlen);
4,272✔
1165
  if ((code = tEncodeStreamTaskCheckpointReq(&encoder, &req)) < 0) {
4,276!
1166
    rpcFreeCont(buf);
×
1167
    tEncoderClear(&encoder);
×
1168
    stError("s-task:%s vgId:%d encode stream task req checkpoint msg failed, code:%s", id, vgId, tstrerror(code));
×
1169
    return code;
×
1170
  }
1171

1172
  tEncoderClear(&encoder);
4,273✔
1173

1174
  SRpcMsg msg = {0};
4,275✔
1175
  initRpcMsg(&msg, TDMT_MND_STREAM_REQ_CHKPT, buf, tlen);
4,275✔
1176
  stDebug("s-task:%s vgId:%d build and send task checkpoint req", id, vgId);
4,270✔
1177

1178
  return tmsgSendReq(&pTask->info.mnodeEpset, &msg);
4,270✔
1179
}
1180

1181
void streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId, SStreamUpstreamEpInfo** pEpInfo) {
93,250✔
1182
  *pEpInfo = NULL;
93,250✔
1183

1184
  int32_t num = taosArrayGetSize(pTask->upstreamInfo.pList);
93,250✔
1185
  for (int32_t i = 0; i < num; ++i) {
190,583✔
1186
    SStreamUpstreamEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i);
190,579✔
1187
    if (pInfo == NULL) {
190,555!
1188
      return;
×
1189
    }
1190

1191
    if (pInfo->taskId == taskId) {
190,555✔
1192
      *pEpInfo = pInfo;
93,242✔
1193
      return;
93,242✔
1194
    }
1195
  }
1196

1197
  stError("s-task:%s failed to find upstream task:0x%x", pTask->id.idStr, taskId);
4!
1198
}
1199

1200
SEpSet* streamTaskGetDownstreamEpInfo(SStreamTask* pTask, int32_t taskId) {
×
1201
  if (pTask->info.taskLevel == TASK_OUTPUT__FIXED_DISPATCH) {
×
1202
    if (pTask->outputInfo.fixedDispatcher.taskId == taskId) {
×
1203
      return &pTask->outputInfo.fixedDispatcher.epSet;
×
1204
    }
1205
  } else if (pTask->info.taskLevel == TASK_OUTPUT__SHUFFLE_DISPATCH) {
×
1206
    SArray* pList = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
×
1207
    for (int32_t i = 0; i < taosArrayGetSize(pList); ++i) {
×
1208
      SVgroupInfo* pVgInfo = taosArrayGet(pList, i);
×
1209
      if (pVgInfo == NULL) {
×
1210
        continue;
×
1211
      }
1212

1213
      if (pVgInfo->taskId == taskId) {
×
1214
        return &pVgInfo->epSet;
×
1215
      }
1216
    }
1217
  } else if (pTask->info.taskLevel == TASK_OUTPUT__VTABLE_MAP) {
×
1218
    SArray* pTaskInfos = pTask->outputInfo.vtableMapDispatcher.taskInfos;
×
1219
    for (int32_t i = 0; i < taosArrayGetSize(pTaskInfos); ++i) {
×
1220
      STaskDispatcherFixed* pAddr = taosArrayGet(pTaskInfos, i);
×
1221
      if (pAddr == NULL) {
×
1222
        continue;
×
1223
      }
1224

1225
      if (pAddr->taskId == taskId) {
×
1226
        return &pAddr->epSet;
×
1227
      }
1228
    }
1229
  }
1230

1231
  return NULL;
×
1232
}
1233

1234
int32_t createStreamTaskIdStr(int64_t streamId, int32_t taskId, const char** pId) {
14,691✔
1235
  char    buf[128] = {0};
14,691✔
1236
  int32_t code = snprintf(buf, tListLen(buf), "0x%" PRIx64 "-0x%x", streamId, taskId);
14,691✔
1237
  if (code < 0 || code >= tListLen(buf)) {
14,691!
1238
    return TSDB_CODE_OUT_OF_BUFFER;
×
1239
  }
1240

1241
  *pId = taosStrdup(buf);
14,701!
1242

1243
  if (*pId == NULL) {
14,690!
1244
    return terrno;
×
1245
  } else {
1246
    return TSDB_CODE_SUCCESS;
14,690✔
1247
  }
1248
}
1249

1250
static int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq) {
525✔
1251
  int32_t           code;
1252
  SStreamDataBlock* pData;
1253

1254
  code = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SStreamDataBlock), (void**)&pData);
525✔
1255
  if (code) {
525!
1256
    stError("s-task:%s failed to allocated retrieve-block", pTask->id.idStr);
×
1257
    return terrno = code;
×
1258
  }
1259

1260
  pData->type = STREAM_INPUT__DATA_RETRIEVE;
525✔
1261
  pData->srcVgId = 0;
525✔
1262

1263
  code = streamRetrieveReqToData(pReq, pData, pTask->id.idStr);
525✔
1264
  if (code != TSDB_CODE_SUCCESS) {
525!
1265
    stError("s-task:%s failed to convert retrieve-data to block, code:%s", pTask->id.idStr, tstrerror(code));
×
1266
    taosFreeQitem(pData);
×
1267
    return code;
×
1268
  }
1269

1270
  code = streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pData);
525✔
1271
  if (code != TSDB_CODE_SUCCESS) {
525!
1272
    stError("s-task:%s failed to put retrieve-block into inputQ, inputQ is full, discard the retrieve msg",
×
1273
            pTask->id.idStr);
1274
  }
1275

1276
  return code;
525✔
1277
}
1278

1279
int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq) {
524✔
1280
  int32_t code = streamTaskEnqueueRetrieve(pTask, pReq);
524✔
1281
  if (code != 0) {
525!
1282
    return code;
×
1283
  }
1284
  return streamTrySchedExec(pTask, false);
525✔
1285
}
1286

1287
void streamTaskSetRemoveBackendFiles(SStreamTask* pTask) { pTask->status.removeBackendFiles = true; }
6,941✔
1288

1289
void streamTaskGetActiveCheckpointInfo(const SStreamTask* pTask, int32_t* pTransId, int64_t* pCheckpointId) {
×
1290
  if (pTransId != NULL) {
×
1291
    *pTransId = pTask->chkInfo.pActiveInfo->transId;
×
1292
  }
1293

1294
  if (pCheckpointId != NULL) {
×
1295
    *pCheckpointId = pTask->chkInfo.pActiveInfo->activeId;
×
1296
  }
1297
}
×
1298

1299
int32_t streamTaskSetActiveCheckpointInfo(SStreamTask* pTask, int64_t activeCheckpointId) {
28✔
1300
  pTask->chkInfo.pActiveInfo->activeId = activeCheckpointId;
28✔
1301
  return TSDB_CODE_SUCCESS;
28✔
1302
}
1303

1304
void streamTaskSetFailedChkptInfo(SStreamTask* pTask, int32_t transId, int64_t checkpointId) {
×
1305
  pTask->chkInfo.pActiveInfo->transId = transId;
×
1306
  pTask->chkInfo.pActiveInfo->activeId = checkpointId;
×
1307
  pTask->chkInfo.pActiveInfo->failedId = checkpointId;
×
1308
  stDebug("s-task:%s set failed checkpointId:%" PRId64, pTask->id.idStr, checkpointId);
×
1309
}
×
1310

1311
int32_t streamTaskCreateActiveChkptInfo(SActiveCheckpointInfo** pRes) {
14,731✔
1312
  SActiveCheckpointInfo* pInfo = taosMemoryCalloc(1, sizeof(SActiveCheckpointInfo));
14,731!
1313
  if (pInfo == NULL) {
14,740!
1314
    return terrno;
×
1315
  }
1316

1317
  int32_t code = taosThreadMutexInit(&pInfo->lock, NULL);
14,740✔
1318
  if (code != TSDB_CODE_SUCCESS) {
14,741!
1319
    return code;
×
1320
  }
1321

1322
  pInfo->pDispatchTriggerList = taosArrayInit(4, sizeof(STaskTriggerSendInfo));
14,741✔
1323
  pInfo->pReadyMsgList = taosArrayInit(4, sizeof(STaskCheckpointReadyInfo));
14,743✔
1324
  pInfo->pCheckpointReadyRecvList = taosArrayInit(4, sizeof(STaskDownstreamReadyInfo));
14,730✔
1325

1326
  *pRes = pInfo;
14,738✔
1327
  return code;
14,738✔
1328
}
1329

1330
void streamTaskDestroyActiveChkptInfo(SActiveCheckpointInfo* pInfo) {
56,068✔
1331
  if (pInfo == NULL) {
56,068✔
1332
    return;
41,334✔
1333
  }
1334

1335
  streamMutexDestroy(&pInfo->lock);
14,734✔
1336
  taosArrayDestroy(pInfo->pDispatchTriggerList);
14,738✔
1337
  pInfo->pDispatchTriggerList = NULL;
14,742✔
1338
  taosArrayDestroy(pInfo->pReadyMsgList);
14,742✔
1339
  pInfo->pReadyMsgList = NULL;
14,742✔
1340
  taosArrayDestroy(pInfo->pCheckpointReadyRecvList);
14,742✔
1341
  pInfo->pCheckpointReadyRecvList = NULL;
14,742✔
1342

1343
  SStreamTmrInfo* pTriggerTmr = &pInfo->chkptTriggerMsgTmr;
14,742✔
1344
  if (pTriggerTmr->tmrHandle != NULL) {
14,742✔
1345
    streamTmrStop(pTriggerTmr->tmrHandle);
2,158✔
1346
    pTriggerTmr->tmrHandle = NULL;
2,157✔
1347
  }
1348

1349
  SStreamTmrInfo* pReadyTmr = &pInfo->chkptReadyMsgTmr;
14,741✔
1350
  if (pReadyTmr->tmrHandle != NULL) {
14,741✔
1351
    streamTmrStop(pReadyTmr->tmrHandle);
2,153✔
1352
    pReadyTmr->tmrHandle = NULL;
2,153✔
1353
  }
1354

1355
  taosMemoryFree(pInfo);
14,741!
1356
}
1357

1358
// NOTE: clear the checkpoint id, and keep the failed id
1359
// failedId for a task will increase as the checkpoint I.D. increases.
1360
void streamTaskClearActiveInfo(SActiveCheckpointInfo* pInfo) {
3,673✔
1361
  pInfo->activeId = 0;
3,673✔
1362
  pInfo->transId = 0;
3,673✔
1363
  pInfo->allUpstreamTriggerRecv = 0;
3,673✔
1364
  pInfo->dispatchTrigger = false;
3,673✔
1365

1366
  taosArrayClear(pInfo->pDispatchTriggerList);
3,673✔
1367
  taosArrayClear(pInfo->pCheckpointReadyRecvList);
3,669✔
1368
}
3,665✔
1369

1370
const char* streamTaskGetExecType(int32_t type) {
145,380✔
1371
  switch (type) {
145,380!
1372
    case STREAM_EXEC_T_EXTRACT_WAL_DATA:
91,123✔
1373
      return "scan-wal-file";
91,123✔
1374
    case STREAM_EXEC_T_START_ALL_TASKS:
7,385✔
1375
      return "start-all-tasks";
7,385✔
1376
    case STREAM_EXEC_T_START_ONE_TASK:
5,927✔
1377
      return "start-one-task";
5,927✔
1378
    case STREAM_EXEC_T_RESTART_ALL_TASKS:
48✔
1379
      return "restart-all-tasks";
48✔
1380
    case STREAM_EXEC_T_STOP_ALL_TASKS:
4,747✔
1381
      return "stop-all-tasks";
4,747✔
1382
    case STREAM_EXEC_T_RESUME_TASK:
4,415✔
1383
      return "resume-task-from-idle";
4,415✔
1384
    case STREAM_EXEC_T_ADD_FAILED_TASK:
1✔
1385
      return "record-start-failed-task";
1✔
1386
    case STREAM_EXEC_T_STOP_ONE_TASK:
×
1387
      return "stop-one-task";
×
1388
    case 0:
31,832✔
1389
      return "exec-all-tasks";
31,832✔
1390
    default:
×
1391
      return "invalid-exec-type";
×
1392
  }
1393
}
1394

1395
int32_t streamTaskAllocRefId(SStreamTask* pTask, int64_t** pRefId) {
32,147✔
1396
  *pRefId = taosMemoryMalloc(sizeof(int64_t));
32,147!
1397
  if (*pRefId != NULL) {
32,152!
1398
    **pRefId = pTask->id.refId;
32,152✔
1399
    int32_t code = metaRefMgtAdd(pTask->pMeta->vgId, *pRefId);
32,152✔
1400
    if (code != 0) {
32,156!
1401
      stError("s-task:%s failed to add refId:%" PRId64 " into refId-mgmt, code:%s", pTask->id.idStr, pTask->id.refId,
×
1402
              tstrerror(code));
1403
    }
1404
    return code;
32,156✔
1405
  } else {
1406
    stError("s-task:%s failed to alloc new ref id, code:%s", pTask->id.idStr, tstrerror(terrno));
×
1407
    return terrno;
×
1408
  }
1409
}
1410

1411
void streamTaskFreeRefId(int64_t* pRefId) {
30,285✔
1412
  if (pRefId == NULL) {
30,285✔
1413
    return;
2,163✔
1414
  }
1415

1416
  metaRefMgtRemove(pRefId);
28,122✔
1417
}
1418

1419
static int32_t tEncodeStreamNotifyInfo(SEncoder* pEncoder, const SNotifyInfo* info) {
122,705✔
1420
  int32_t code = TSDB_CODE_SUCCESS;
122,705✔
1421
  int32_t lino = 0;
122,705✔
1422

1423
  QUERY_CHECK_NULL(pEncoder, code, lino, _exit, TSDB_CODE_INVALID_PARA);
122,705!
1424
  QUERY_CHECK_NULL(info, code, lino, _exit, TSDB_CODE_INVALID_PARA);
122,705!
1425

1426
  int32_t addrSize = taosArrayGetSize(info->pNotifyAddrUrls);
122,705✔
1427
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, addrSize));
122,711!
1428
  for (int32_t i = 0; i < addrSize; ++i) {
122,711!
1429
    const char* url = taosArrayGetP(info->pNotifyAddrUrls, i);
×
1430
    TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, url));
×
1431
  }
1432
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, info->notifyEventTypes));
245,422!
1433
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, info->notifyErrorHandle));
245,422!
1434
  if (addrSize > 0) {
122,711!
1435
    TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, info->streamName));
×
1436
    TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, info->stbFullName));
×
1437
    TAOS_CHECK_EXIT(tEncodeSSchemaWrapper(pEncoder, info->pSchemaWrapper));
×
1438
  }
1439

1440
_exit:
122,711✔
1441
  if (code != TSDB_CODE_SUCCESS) {
122,711!
1442
    stError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1443
  }
1444
  return code;
122,710✔
1445
}
1446

1447
static int32_t tDecodeStreamNotifyInfo(SDecoder* pDecoder, SNotifyInfo* info) {
42,059✔
1448
  int32_t code = TSDB_CODE_SUCCESS;
42,059✔
1449
  int32_t lino = 0;
42,059✔
1450

1451
  QUERY_CHECK_NULL(pDecoder, code, lino, _exit, TSDB_CODE_INVALID_PARA);
42,059!
1452
  QUERY_CHECK_NULL(info, code, lino, _exit, TSDB_CODE_INVALID_PARA);
42,059!
1453

1454
  int32_t addrSize = 0;
42,059✔
1455
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &addrSize));
42,071!
1456
  info->pNotifyAddrUrls = taosArrayInit(addrSize, POINTER_BYTES);
42,071✔
1457
  QUERY_CHECK_NULL(info->pNotifyAddrUrls, code, lino, _exit, terrno);
42,078✔
1458
  for (int32_t i = 0; i < addrSize; ++i) {
42,076!
1459
    char* url = NULL;
×
1460
    TAOS_CHECK_EXIT(tDecodeCStr(pDecoder, &url));
×
1461
    url = taosStrndup(url, TSDB_STREAM_NOTIFY_URL_LEN);
×
1462
    QUERY_CHECK_NULL(url, code, lino, _exit, terrno);
×
1463
    if (taosArrayPush(info->pNotifyAddrUrls, &url) == NULL) {
×
1464
      taosMemoryFree(url);
×
1465
      TAOS_CHECK_EXIT(terrno);
×
1466
    }
1467
  }
1468
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &info->notifyEventTypes));
84,147!
1469
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &info->notifyErrorHandle));
84,147!
1470
  if (addrSize > 0) {
42,076!
1471
    char* name = NULL;
×
1472
    TAOS_CHECK_EXIT(tDecodeCStr(pDecoder, &name));
×
1473
    info->streamName = taosStrndup(name, TSDB_STREAM_FNAME_LEN + 1);
×
1474
    QUERY_CHECK_NULL(info->streamName, code, lino, _exit, terrno);
×
1475
    TAOS_CHECK_EXIT(tDecodeCStr(pDecoder, &name));
×
1476
    info->stbFullName = taosStrndup(name, TSDB_STREAM_FNAME_LEN + 1);
×
1477
    QUERY_CHECK_NULL(info->stbFullName, code, lino, _exit, terrno);
×
1478
    info->pSchemaWrapper = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
×
1479
    if (info->pSchemaWrapper == NULL) {
×
1480
      TAOS_CHECK_EXIT(terrno);
×
1481
    }
1482
    TAOS_CHECK_EXIT(tDecodeSSchemaWrapper(pDecoder, info->pSchemaWrapper));
×
1483
  }
1484

1485
_exit:
42,076✔
1486
  if (code != TSDB_CODE_SUCCESS) {
42,076!
1487
    stError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1488
  }
1489
  return code;
42,073✔
1490
}
1491

1492
int32_t tEncodeVTablesInfo(SEncoder* pEncoder, SArray* pVTables) {
122,704✔
1493
  int32_t code = TSDB_CODE_SUCCESS;
122,704✔
1494
  int32_t lino = 0;
122,704✔
1495
  SVCTableMergeInfo* pMergeInfo = NULL;
122,704✔
1496
  int32_t mergeNum = taosArrayGetSize(pVTables);
122,704✔
1497
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, mergeNum));
122,713!
1498
  for (int32_t i = 0; i < mergeNum; ++i) {
122,713!
1499
    pMergeInfo = (SVCTableMergeInfo*)taosArrayGet(pVTables, i);
×
1500
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMergeInfo->uid));
×
1501
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMergeInfo->numOfSrcTbls));
×
1502
  }
1503

1504
_exit:
122,713✔
1505
  if (code != TSDB_CODE_SUCCESS) {
122,713!
1506
    stError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1507
  }
1508
  return code;
122,711✔
1509
}
1510

1511
static int32_t tDecodeVTablesInfo(SDecoder* pDecoder, SArray** pTables) {
42,065✔
1512
  int32_t code = TSDB_CODE_SUCCESS;
42,065✔
1513
  int32_t lino = 0;
42,065✔
1514

1515
  int32_t mergeNum = 0;
42,065✔
1516
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &mergeNum));
42,070!
1517
  if (mergeNum > 0) {
42,070!
1518
    *pTables = taosArrayInit(mergeNum, sizeof(SVCTableMergeInfo));
×
1519
    QUERY_CHECK_NULL(*pTables, code, lino, _exit, terrno);
×
1520
  }
1521

1522
  SVCTableMergeInfo mergeInfo;
1523
  for (int32_t i = 0; i < mergeNum; ++i) {
42,071!
1524
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &mergeInfo.uid));
×
1525
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &mergeInfo.numOfSrcTbls));
×
1526
    if (taosArrayPush(*pTables, &mergeInfo) == NULL) {
×
1527
      TAOS_CHECK_EXIT(terrno);
×
1528
    }
1529
  }
1530

1531
_exit:
42,071✔
1532
  if (code != TSDB_CODE_SUCCESS) {
42,071!
1533
    stError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1534
  }
1535
  return code;
42,065✔
1536
}
1537

1538
int32_t tSerializeDispatcherTaskInfo(SEncoder* pEncoder, const SArray* pTaskInfos) {
×
1539
  int32_t code = TSDB_CODE_SUCCESS;
×
1540
  int32_t lino = 0;
×
1541
  int32_t nTasks = taosArrayGetSize(pTaskInfos);
×
1542
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, nTasks));
×
1543
  for (int32_t i = 0; i < nTasks; ++i) {
×
1544
    STaskDispatcherFixed* pAddr = taosArrayGet(pTaskInfos, i);
×
1545
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pAddr->taskId));
×
1546
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pAddr->nodeId));
×
1547
    TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pAddr->epSet));
×
1548
  }
1549

1550
_exit:
×
1551
  if (code != TSDB_CODE_SUCCESS) {
×
1552
    stError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1553
  }
1554
  return code;
×
1555
}
1556

1557
int32_t tSerializeDispatcherVtableMap(SEncoder* pEncoder, const SSHashObj* pVtables) {
×
1558
  int32_t code = TSDB_CODE_SUCCESS;
×
1559
  int32_t lino = 0;
×
1560
  int32_t tbNum = tSimpleHashGetSize(pVtables);
×
1561
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, tbNum));
×
1562
  int32_t iter = 0;
×
1563
  void*   p = tSimpleHashIterate(pVtables, NULL, &iter);
×
1564
  while (p != NULL) {
×
1565
    int64_t* pUid = tSimpleHashGetKey(p, NULL);
×
1566
    int32_t* pIdx = p;
×
1567
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, *pUid));
×
1568
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, *pIdx));
×
1569
    p = tSimpleHashIterate(pVtables, p, &iter);
×
1570
  }
1571

1572
_exit:
×
1573
  if (code != TSDB_CODE_SUCCESS) {
×
1574
    stError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1575
  }
1576
  return code;
×
1577
}
1578

1579
int32_t tDeserializeDispatcherTaskInfo(SDecoder* pDecoder, SArray** ppTaskInfos) {
×
1580
  int32_t code = TSDB_CODE_SUCCESS;
×
1581
  int32_t lino = 0;
×
1582
  int32_t nTasks = 0;
×
1583
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nTasks));
×
1584
  if (nTasks <= 0) {
×
1585
    return code;
×
1586
  }
1587

1588
  *ppTaskInfos = taosArrayInit(nTasks, sizeof(STaskDispatcherFixed));
×
1589
  QUERY_CHECK_NULL(*ppTaskInfos, code, lino, _exit, terrno);
×
1590

1591
  STaskDispatcherFixed addr;
1592
  for (int32_t i = 0; i < nTasks; ++i) {
×
1593
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &addr.taskId));
×
1594
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &addr.nodeId));
×
1595
    TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &addr.epSet));
×
1596
    void* px = taosArrayPush(*ppTaskInfos, &addr);
×
1597
    QUERY_CHECK_NULL(px, code, lino, _exit, terrno);
×
1598
  }
1599

1600
_exit:
×
1601
  if (code != TSDB_CODE_SUCCESS) {
×
1602
    stError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1603
  }
1604
  return code;
×
1605
}
1606

1607
int32_t tDeserializeDispatcherVtableMap(SDecoder* pDecoder, SSHashObj** ppVtables) {
×
1608
  int32_t code = TSDB_CODE_SUCCESS;
×
1609
  int32_t lino = 0;
×
1610
  int32_t tbNum = 0;
×
1611
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &tbNum));
×
1612
  if (tbNum <= 0) {
×
1613
    return code;
×
1614
  }
1615

1616
  *ppVtables = tSimpleHashInit(tbNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
×
1617
  QUERY_CHECK_NULL(*ppVtables, code, lino, _exit, terrno);
×
1618

1619
  uint64_t uid = 0;
×
1620
  int32_t  idx = 0;
×
1621
  for (int32_t i = 0; i < tbNum; ++i) {
×
1622
    TAOS_CHECK_EXIT(tDecodeU64(pDecoder, &uid));
×
1623
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &idx));
×
1624
    TAOS_CHECK_EXIT(tSimpleHashPut(*ppVtables, &uid, sizeof(uid), &idx, sizeof(idx)));
×
1625
  }
1626

1627
_exit:
×
1628

1629
  if (code != TSDB_CODE_SUCCESS) {
×
1630
    stError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1631
  }
1632
  return code;
×
1633
}
1634

1635
int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
122,766✔
1636
  int32_t code = 0;
122,766✔
1637
  int32_t lino;
1638

1639
  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
122,766!
1640
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->ver));
245,520!
1641
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->id.streamId));
245,520!
1642
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->id.taskId));
245,520!
1643
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->info.trigger));
245,520!
1644
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->info.taskLevel));
245,520!
1645
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->outputInfo.type));
245,520!
1646
  TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pTask->msgInfo.msgType));
245,520!
1647

1648
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->status.taskStatus));
245,520!
1649
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->status.schedStatus));
245,520!
1650

1651
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->info.selfChildId));
245,520!
1652
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->info.nodeId));
245,520!
1653
  TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pTask->info.epSet));
122,760!
1654
  TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pTask->info.mnodeEpset));
122,763!
1655

1656
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->chkInfo.checkpointId));
245,534!
1657
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->chkInfo.checkpointVer));
245,534!
1658
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->info.fillHistory));
245,534!
1659

1660
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->hTaskInfo.id.streamId));
245,534!
1661
  int32_t taskId = pTask->hTaskInfo.id.taskId;
122,767✔
1662
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, taskId));
122,767!
1663

1664
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->streamTaskId.streamId));
245,534!
1665
  taskId = pTask->streamTaskId.taskId;
122,767✔
1666
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, taskId));
122,767!
1667

1668
  TAOS_CHECK_EXIT(tEncodeU64(pEncoder, pTask->dataRange.range.minVer));
245,534!
1669
  TAOS_CHECK_EXIT(tEncodeU64(pEncoder, pTask->dataRange.range.maxVer));
245,534!
1670
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->dataRange.window.skey));
245,534!
1671
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->dataRange.window.ekey));
245,534!
1672

1673
  int32_t epSz = taosArrayGetSize(pTask->upstreamInfo.pList);
122,767✔
1674
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, epSz));
122,756!
1675
  for (int32_t i = 0; i < epSz; i++) {
301,406✔
1676
    SStreamUpstreamEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i);
178,665✔
1677
    TAOS_CHECK_EXIT(tEncodeStreamEpInfo(pEncoder, pInfo));
178,675!
1678
  }
1679

1680
  if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
122,741✔
1681
    TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pTask->exec.qmsg));
128,426!
1682
  }
1683

1684
  if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) {
122,741✔
1685
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->outputInfo.tbSink.stbUid));
121,856!
1686
    TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pTask->outputInfo.tbSink.stbFullName));
121,856!
1687
    TAOS_CHECK_EXIT(tEncodeSSchemaWrapper(pEncoder, pTask->outputInfo.tbSink.pSchemaWrapper));
121,856!
1688
  } else if (pTask->outputInfo.type == TASK_OUTPUT__SMA) {
61,813!
1689
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->outputInfo.smaSink.smaId));
×
1690
  } else if (pTask->outputInfo.type == TASK_OUTPUT__FETCH) {
61,813!
1691
    TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->outputInfo.fetchSink.reserved));
×
1692
  } else if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
61,813✔
1693
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->outputInfo.fixedDispatcher.taskId));
15,052!
1694
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->outputInfo.fixedDispatcher.nodeId));
15,052!
1695
    TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pTask->outputInfo.fixedDispatcher.epSet));
7,526!
1696
  } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
54,287✔
1697
    TAOS_CHECK_EXIT(tSerializeSUseDbRspImp(pEncoder, &pTask->outputInfo.shuffleDispatcher.dbInfo));
54,246!
1698
    TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pTask->outputInfo.shuffleDispatcher.stbFullName));
108,492!
1699
  } else if (pTask->outputInfo.type == TASK_OUTPUT__VTABLE_MAP) {
41!
1700
    TAOS_CHECK_EXIT(tSerializeDispatcherTaskInfo(pEncoder, pTask->outputInfo.vtableMapDispatcher.taskInfos));
×
1701
    TAOS_CHECK_EXIT(tSerializeDispatcherVtableMap(pEncoder, pTask->outputInfo.vtableMapDispatcher.vtableMap));
×
1702
  }
1703
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->info.delaySchedParam));
245,482!
1704
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->subtableWithoutMd5));
245,482!
1705
  TAOS_CHECK_EXIT(tEncodeCStrWithLen(pEncoder, pTask->reserve, sizeof(pTask->reserve) - 1));
245,482!
1706

1707
  if (pTask->ver >= SSTREAM_TASK_ADD_NOTIFY_VER) {
122,741✔
1708
    TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->info.hasAggTasks));
245,454✔
1709
    TAOS_CHECK_EXIT(tEncodeStreamNotifyInfo(pEncoder, &pTask->notifyInfo));
122,705!
1710
    TAOS_CHECK_EXIT(tEncodeVTablesInfo(pEncoder, pTask->pVTables));
122,711!
1711
  }
1712

1713
  tEndEncode(pEncoder);
122,724✔
1714
_exit:
122,751✔
1715
  return code;
122,751✔
1716
}
1717

1718
int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
42,067✔
1719
  int32_t taskId = 0;
42,067✔
1720
  int32_t code = 0;
42,067✔
1721
  int32_t lino;
1722

1723
  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
42,067!
1724
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->ver));
84,169✔
1725
  if (pTask->ver <= SSTREAM_TASK_INCOMPATIBLE_VER || pTask->ver > SSTREAM_TASK_VER) {
42,075!
1726
    TAOS_CHECK_EXIT(TSDB_CODE_INVALID_MSG);
×
1727
  }
1728

1729
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->id.streamId));
84,144!
1730
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->id.taskId));
84,141!
1731
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->info.trigger));
84,142!
1732
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->info.taskLevel));
84,139!
1733
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->outputInfo.type));
84,134!
1734
  TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pTask->msgInfo.msgType));
84,133!
1735

1736
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->status.taskStatus));
84,137!
1737
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->status.schedStatus));
84,136!
1738

1739
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->info.selfChildId));
84,130!
1740
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->info.nodeId));
84,132!
1741
  TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pTask->info.epSet));
42,069!
1742
  TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pTask->info.mnodeEpset));
42,072!
1743

1744
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->chkInfo.checkpointId));
84,146!
1745
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->chkInfo.checkpointVer));
84,146!
1746
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->info.fillHistory));
84,148!
1747

1748
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->hTaskInfo.id.streamId));
84,147!
1749
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &taskId));
42,070!
1750
  pTask->hTaskInfo.id.taskId = taskId;
42,070✔
1751

1752
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->streamTaskId.streamId));
84,144!
1753
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &taskId));
42,072!
1754
  pTask->streamTaskId.taskId = taskId;
42,072✔
1755

1756
  TAOS_CHECK_EXIT(tDecodeU64(pDecoder, (uint64_t*)&pTask->dataRange.range.minVer));
84,141!
1757
  TAOS_CHECK_EXIT(tDecodeU64(pDecoder, (uint64_t*)&pTask->dataRange.range.maxVer));
84,137!
1758
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->dataRange.window.skey));
84,140!
1759
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->dataRange.window.ekey));
84,144!
1760

1761
  int32_t epSz = -1;
42,072✔
1762
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &epSz) < 0);
42,073!
1763

1764
  if ((pTask->upstreamInfo.pList = taosArrayInit(epSz, POINTER_BYTES)) == NULL) {
42,073!
1765
    TAOS_CHECK_EXIT(terrno);
×
1766
  }
1767
  for (int32_t i = 0; i < epSz; i++) {
103,338✔
1768
    SStreamUpstreamEpInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamUpstreamEpInfo));
61,263!
1769
    if (pInfo == NULL) {
61,255!
1770
      TAOS_CHECK_EXIT(terrno);
×
1771
    }
1772
    if ((code = tDecodeStreamEpInfo(pDecoder, pInfo)) < 0) {
61,255!
1773
      taosMemoryFreeClear(pInfo);
×
1774
      goto _exit;
×
1775
    }
1776
    if (taosArrayPush(pTask->upstreamInfo.pList, &pInfo) == NULL) {
122,536!
1777
      TAOS_CHECK_EXIT(terrno);
×
1778
    }
1779
  }
1780

1781
  if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
42,075✔
1782
    TAOS_CHECK_EXIT(tDecodeCStrAlloc(pDecoder, &pTask->exec.qmsg));
43,816!
1783
  }
1784

1785
  if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) {
42,073✔
1786
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->outputInfo.tbSink.stbUid));
41,740!
1787
    TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pTask->outputInfo.tbSink.stbFullName));
20,870!
1788
    pTask->outputInfo.tbSink.pSchemaWrapper = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
20,871!
1789
    if (pTask->outputInfo.tbSink.pSchemaWrapper == NULL) {
20,869!
1790
      TAOS_CHECK_EXIT(terrno);
×
1791
    }
1792
    TAOS_CHECK_EXIT(tDecodeSSchemaWrapper(pDecoder, pTask->outputInfo.tbSink.pSchemaWrapper));
41,732!
1793
  } else if (pTask->outputInfo.type == TASK_OUTPUT__SMA) {
21,203!
1794
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->outputInfo.smaSink.smaId));
×
1795
  } else if (pTask->outputInfo.type == TASK_OUTPUT__FETCH) {
21,203!
1796
    TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->outputInfo.fetchSink.reserved));
×
1797
  } else if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
21,203✔
1798
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->outputInfo.fixedDispatcher.taskId));
4,980!
1799
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->outputInfo.fixedDispatcher.nodeId));
4,980!
1800
    TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pTask->outputInfo.fixedDispatcher.epSet));
2,490!
1801
  } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
18,713✔
1802
    TAOS_CHECK_EXIT(tDeserializeSUseDbRspImp(pDecoder, &pTask->outputInfo.shuffleDispatcher.dbInfo));
18,709!
1803
    TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pTask->outputInfo.shuffleDispatcher.stbFullName));
18,707!
1804
  } else if (pTask->outputInfo.type == TASK_OUTPUT__VTABLE_MAP) {
4!
1805
    TAOS_CHECK_EXIT(tDeserializeDispatcherTaskInfo(pDecoder, &pTask->outputInfo.vtableMapDispatcher.taskInfos));
×
1806
    TAOS_CHECK_EXIT(tDeserializeDispatcherVtableMap(pDecoder, &pTask->outputInfo.vtableMapDispatcher.vtableMap));
×
1807
  }
1808
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->info.delaySchedParam));
84,129!
1809
  if (pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER) {
42,065!
1810
    TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->subtableWithoutMd5));
84,135!
1811
  }
1812
  TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pTask->reserve));
42,066!
1813

1814
  if (pTask->ver >= SSTREAM_TASK_ADD_NOTIFY_VER) {
42,072!
1815
    TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->info.hasAggTasks));
84,145!
1816
    TAOS_CHECK_EXIT(tDecodeStreamNotifyInfo(pDecoder, &pTask->notifyInfo));
42,073!
1817
    TAOS_CHECK_EXIT(tDecodeVTablesInfo(pDecoder, &pTask->pVTables));
42,075!
1818
  }
1819

1820
  tEndDecode(pDecoder);
42,067✔
1821

1822
_exit:
42,080✔
1823
  return code;
42,080✔
1824
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc