• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3583

17 Jan 2025 07:28AM UTC coverage: 63.876% (+0.07%) from 63.803%
#3583

push

travis-ci

web-flow
Merge pull request #29594 from taosdata/fix/insert-when-2-replicas

fix/insert-when-2-replicas

141608 of 284535 branches covered (49.77%)

Branch coverage included in aggregate %.

0 of 2 new or added lines in 1 file covered. (0.0%)

396 existing lines in 105 files now uncovered.

220075 of 281695 relevant lines covered (78.13%)

19864675.58 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.22
/source/libs/stream/src/streamTask.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executor.h"
17
#include "osDir.h"
18
#include "osMemory.h"
19
#include "streamInt.h"
20
#include "streamsm.h"
21
#include "tmisce.h"
22
#include "tstream.h"
23
#include "ttimer.h"
24
#include "wal.h"
25
#include "streamMsg.h"
26

27
static void streamTaskDestroyUpstreamInfo(SUpstreamInfo* pUpstreamInfo);
28
static int32_t streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdated);
29
static int32_t streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdate);
30
static void streamTaskDestroyActiveChkptInfo(SActiveCheckpointInfo* pInfo);
31

32
static int32_t addToTaskset(SArray* pArray, SStreamTask* pTask) {
14,003✔
33
  int32_t childId = taosArrayGetSize(pArray);
14,003✔
34
  pTask->info.selfChildId = childId;
14,003✔
35
  void* p = taosArrayPush(pArray, &pTask);
14,003✔
36
  return (p == NULL) ? terrno : TSDB_CODE_SUCCESS;
14,003!
37
}
38

39
static int32_t doUpdateTaskEpset(SStreamTask* pTask, int32_t nodeId, SEpSet* pEpSet, bool* pUpdated) {
515✔
40
  int32_t code = 0;
515✔
41
  char    buf[512] = {0};
515✔
42

43
  if (pTask->info.nodeId == nodeId) {  // execution task should be moved away
515✔
44
    bool isEqual = isEpsetEqual(&pTask->info.epSet, pEpSet);
164✔
45
    code = epsetToStr(pEpSet, buf, tListLen(buf));
164✔
46
    if (code) { // print error and continue
164!
47
      stError("%s failed to convert epset to str, code:%s", pTask->id.idStr, tstrerror(code));
×
48
      return code;
×
49
    }
50

51
    if (!isEqual) {
164✔
52
      (*pUpdated) = true;
126✔
53
      char tmp[512] = {0};
126✔
54
      code = epsetToStr(&pTask->info.epSet, tmp, tListLen(tmp));  // only for log file, ignore errors
126✔
55
      if (code) { // print error and continue
126!
56
        stError("%s failed to convert epset to str, code:%s", pTask->id.idStr, tstrerror(code));
×
57
        return code;
×
58
      }
59

60
      epsetAssign(&pTask->info.epSet, pEpSet);
126✔
61
      stDebug("s-task:0x%x (vgId:%d) self node epset is updated %s, old:%s", pTask->id.taskId, nodeId, buf, tmp);
126!
62
    } else {
63
      stDebug("s-task:0x%x (vgId:%d) not updated task epset, since epset identical, %s", pTask->id.taskId, nodeId, buf);
38!
64
    }
65
  }
66

67
  // check for the dispatch info and the upstream task info
68
  int32_t level = pTask->info.taskLevel;
515✔
69
  if (level == TASK_LEVEL__SOURCE) {
515✔
70
    code = streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet, pUpdated);
249✔
71
  } else if (level == TASK_LEVEL__AGG) {
266✔
72
    code = streamTaskUpdateUpstreamInfo(pTask, nodeId, pEpSet, pUpdated);
17✔
73
    code = streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet, pUpdated);
17✔
74
  } else {  // TASK_LEVEL__SINK
75
    code = streamTaskUpdateUpstreamInfo(pTask, nodeId, pEpSet, pUpdated);
249✔
76
  }
77

78
  return code;
515✔
79
}
80

81
static void freeItem(void* p) {
×
82
  SStreamContinueExecInfo* pInfo = p;
×
83
  rpcFreeCont(pInfo->msg.pCont);
×
84
}
×
85

86
static void freeUpstreamItem(void* p) {
87,182✔
87
  SStreamUpstreamEpInfo** pInfo = p;
87,182✔
88
  taosMemoryFree(*pInfo);
87,182!
89
}
87,187✔
90

91
static SStreamUpstreamEpInfo* createStreamTaskEpInfo(const SStreamTask* pTask) {
20,110✔
92
  SStreamUpstreamEpInfo* pEpInfo = taosMemoryMalloc(sizeof(SStreamUpstreamEpInfo));
20,110!
93
  if (pEpInfo == NULL) {
20,110!
94
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
95
    return NULL;
×
96
  }
97

98
  pEpInfo->childId = pTask->info.selfChildId;
20,110✔
99
  pEpInfo->epSet = pTask->info.epSet;
20,110✔
100
  pEpInfo->nodeId = pTask->info.nodeId;
20,110✔
101
  pEpInfo->taskId = pTask->id.taskId;
20,110✔
102
  pEpInfo->stage = -1;
20,110✔
103

104
  return pEpInfo;
20,110✔
105
}
106

107
int32_t tNewStreamTask(int64_t streamId, int8_t taskLevel, SEpSet* pEpset, bool fillHistory, int32_t trigger,
14,003✔
108
                       int64_t triggerParam, SArray* pTaskList, bool hasFillhistory, int8_t subtableWithoutMd5,
109
                       SStreamTask** p) {
110
  *p = NULL;
14,003✔
111

112
  SStreamTask* pTask = (SStreamTask*)taosMemoryCalloc(1, sizeof(SStreamTask));
14,003!
113
  if (pTask == NULL) {
14,003!
114
    stError("s-task:0x%" PRIx64 " failed malloc new stream task, size:%d, code:%s", streamId,
×
115
            (int32_t)sizeof(SStreamTask), tstrerror(terrno));
116
    return terrno;
×
117
  }
118

119
  pTask->ver = SSTREAM_TASK_VER;
14,003✔
120
  pTask->id.taskId = tGenIdPI32();
14,003✔
121
  pTask->id.streamId = streamId;
14,003✔
122

123
  pTask->info.taskLevel = taskLevel;
14,003✔
124
  pTask->info.fillHistory = fillHistory;
14,003✔
125
  pTask->info.trigger = trigger;
14,003✔
126
  pTask->info.delaySchedParam = triggerParam;
14,003✔
127
  pTask->subtableWithoutMd5 = subtableWithoutMd5;
14,003✔
128

129
  int32_t code = streamCreateStateMachine(pTask);
14,003✔
130
  if (pTask->status.pSM == NULL || code != TSDB_CODE_SUCCESS) {
14,003!
131
    taosMemoryFreeClear(pTask);
×
132
    return code;
×
133
  }
134

135
  char    buf[128] = {0};
14,003✔
136
  int32_t ret = snprintf(buf, tListLen(buf), "0x%" PRIx64 "-0x%x", pTask->id.streamId, pTask->id.taskId);
14,003✔
137
  if (ret < 0 || ret >= tListLen(buf)) {
14,003!
138
    stError("s-task:0x%x failed to set the taskIdstr, code: out of buffer", pTask->id.taskId);
×
139
    return TSDB_CODE_OUT_OF_BUFFER;
×
140
  }
141

142
  pTask->id.idStr = taosStrdup(buf);
14,003!
143
  if (pTask->id.idStr == NULL) {
14,003!
144
    stError("s-task:0x%x failed to build task id, code: out of memory", pTask->id.taskId);
×
145
    return terrno;
×
146
  }
147

148
  pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
14,003✔
149
  pTask->status.taskStatus = fillHistory ? TASK_STATUS__SCAN_HISTORY : TASK_STATUS__READY;
14,003✔
150
  pTask->inputq.status = TASK_INPUT_STATUS__NORMAL;
14,003✔
151
  pTask->outputq.status = TASK_OUTPUT_STATUS__NORMAL;
14,003✔
152

153
  pTask->taskCheckInfo.pList = taosArrayInit(4, sizeof(SDownstreamStatusInfo));
14,003✔
154
  code = taosThreadMutexInit(&pTask->taskCheckInfo.checkInfoLock, NULL);
14,003✔
155
  if (code) {
14,003!
156
    return code;
×
157
  }
158

159
  if (fillHistory && !hasFillhistory) {
14,003!
160
    stError("s-task:0x%x create task failed, due to inconsistent fill-history flag", pTask->id.taskId);
×
161
    return TSDB_CODE_INVALID_PARA;
×
162
  }
163

164
  epsetAssign(&(pTask->info.mnodeEpset), pEpset);
14,003✔
165

166
  code = addToTaskset(pTaskList, pTask);
14,003✔
167
  *p = pTask;
14,003✔
168

169
  return code;
14,003✔
170
}
171

172
int32_t tDecodeStreamTaskChkInfo(SDecoder* pDecoder, SCheckpointInfo* pChkpInfo) {
856✔
173
  int64_t skip64;
174
  int8_t  skip8;
175
  int32_t skip32;
176
  int16_t skip16;
177
  SEpSet  epSet;
178

179
  if (tStartDecode(pDecoder) < 0) return -1;
856!
180
  if (tDecodeI64(pDecoder, &pChkpInfo->msgVer) < 0) return -1;
1,712!
181
  // if (ver <= SSTREAM_TASK_INCOMPATIBLE_VER) return -1;
182

183
  if (tDecodeI64(pDecoder, &skip64) < 0) return -1;
856!
184
  if (tDecodeI32(pDecoder, &skip32) < 0) return -1;
856!
185
  if (tDecodeI32(pDecoder, &skip32) < 0) return -1;
856!
186
  if (tDecodeI8(pDecoder, &skip8) < 0) return -1;
856!
187
  if (tDecodeI8(pDecoder, &skip8) < 0) return -1;
856!
188
  if (tDecodeI16(pDecoder, &skip16) < 0) return -1;
856!
189

190
  if (tDecodeI8(pDecoder, &skip8) < 0) return -1;
856!
191
  if (tDecodeI8(pDecoder, &skip8) < 0) return -1;
856!
192

193
  if (tDecodeI32(pDecoder, &skip32) < 0) return -1;
856!
194
  if (tDecodeI32(pDecoder, &skip32) < 0) return -1;
856!
195
  if (tDecodeSEpSet(pDecoder, &epSet) < 0) return -1;
856!
196
  if (tDecodeSEpSet(pDecoder, &epSet) < 0) return -1;
856!
197

198
  if (tDecodeI64(pDecoder, &pChkpInfo->checkpointId) < 0) return -1;
1,712!
199
  if (tDecodeI64(pDecoder, &pChkpInfo->checkpointVer) < 0) return -1;
1,712!
200

201
  tEndDecode(pDecoder);
856✔
202
  return 0;
856✔
203
}
204

205
int32_t tDecodeStreamTaskId(SDecoder* pDecoder, STaskId* pTaskId) {
56✔
206
  int64_t ver;
207
  if (tStartDecode(pDecoder) < 0) return -1;
56!
208
  if (tDecodeI64(pDecoder, &ver) < 0) return -1;
56!
209
  if (ver <= SSTREAM_TASK_INCOMPATIBLE_VER) return -1;
56!
210

211
  if (tDecodeI64(pDecoder, &pTaskId->streamId) < 0) return -1;
112!
212

213
  int32_t taskId = 0;
56✔
214
  if (tDecodeI32(pDecoder, &taskId) < 0) return -1;
56!
215

216
  pTaskId->taskId = taskId;
56✔
217
  tEndDecode(pDecoder);
56✔
218
  return 0;
56✔
219
}
220

221
void tFreeStreamTask(void* pParam) {
63,124✔
222
  char*        p = NULL;
63,124✔
223
  SStreamTask* pTask = pParam;
63,124✔
224
  int32_t      taskId = pTask->id.taskId;
63,124✔
225

226
  STaskExecStatisInfo* pStatis = &pTask->execInfo;
63,124✔
227

228
  ETaskStatus status1 = TASK_STATUS__UNINIT;
63,124✔
229
  streamMutexLock(&pTask->lock);
63,124✔
230
  if (pTask->status.pSM != NULL) {
63,152✔
231
    SStreamTaskState status = streamTaskGetStatus(pTask);
28,795✔
232
    p = status.name;
28,777✔
233
    status1 = status.state;
28,777✔
234
  }
235
  streamMutexUnlock(&pTask->lock);
63,134✔
236

237
  stDebug("start to free s-task:0x%x %p, state:%s, refId:%" PRId64, taskId, pTask, p, pTask->id.refId);
63,142✔
238

239
  SCheckpointInfo* pCkInfo = &pTask->chkInfo;
63,142✔
240
  stDebug("s-task:0x%x task exec summary: create:%" PRId64 ", init:%" PRId64 ", start:%" PRId64
63,142✔
241
          ", updateCount:%d latestUpdate:%" PRId64 ", latestCheckPoint:%" PRId64 ", ver:%" PRId64
242
          " nextProcessVer:%" PRId64 ", checkpointCount:%d",
243
          taskId, pStatis->created, pStatis->checkTs, pStatis->readyTs, pStatis->updateCount, pStatis->latestUpdateTs,
244
          pCkInfo->checkpointId, pCkInfo->checkpointVer, pCkInfo->nextProcessVer, pStatis->checkpoint);
245

246
  if (pTask->schedInfo.pDelayTimer != NULL) {
63,142✔
247
    streamTmrStop(pTask->schedInfo.pDelayTimer);
1,306✔
248
    pTask->schedInfo.pDelayTimer = NULL;
1,306✔
249
  }
250

251
  if (pTask->hTaskInfo.pTimer != NULL) {
63,142✔
252
    streamTmrStop(pTask->hTaskInfo.pTimer);
1,733✔
253
    pTask->hTaskInfo.pTimer = NULL;
1,734✔
254
  }
255

256
  if (pTask->msgInfo.pRetryTmr != NULL) {
63,143✔
257
    streamTmrStop(pTask->msgInfo.pRetryTmr);
5,421✔
258
    pTask->msgInfo.pRetryTmr = NULL;
5,422✔
259
  }
260

261
  if (pTask->inputq.queue) {
63,144✔
262
    streamQueueClose(pTask->inputq.queue, pTask->id.taskId);
14,761✔
263
    pTask->inputq.queue = NULL;
14,762✔
264
  }
265

266
  if (pTask->outputq.queue) {
63,145✔
267
    streamQueueClose(pTask->outputq.queue, pTask->id.taskId);
14,759✔
268
    pTask->outputq.queue = NULL;
14,765✔
269
  }
270

271
  if (pTask->exec.qmsg) {
63,151✔
272
    taosMemoryFree(pTask->exec.qmsg);
33,323!
273
  }
274

275
  if (pTask->exec.pExecutor) {
63,153✔
276
    qDestroyTask(pTask->exec.pExecutor);
7,447✔
277
    pTask->exec.pExecutor = NULL;
7,447✔
278
  }
279

280
  if (pTask->exec.pWalReader != NULL) {
63,153✔
281
    walCloseReader(pTask->exec.pWalReader);
7,421✔
282
    pTask->exec.pWalReader = NULL;
7,421✔
283
  }
284

285
  streamClearChkptReadyMsg(pTask->chkInfo.pActiveInfo);
63,153✔
286

287
  if (pTask->msgInfo.pData != NULL) {
63,154✔
288
    clearBufferedDispatchMsg(pTask);
68✔
289
  }
290

291
  if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) {
63,151✔
292
    tDeleteSchemaWrapper(pTask->outputInfo.tbSink.pSchemaWrapper);
30,972!
293
    taosMemoryFree(pTask->outputInfo.tbSink.pTSchema);
30,976!
294
    tSimpleHashCleanup(pTask->outputInfo.tbSink.pTbInfo);
30,971✔
295
    tDeleteSchemaWrapper(pTask->outputInfo.tbSink.pTagSchema);
30,978✔
296
  } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
32,179✔
297
    taosArrayDestroy(pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos);
27,182✔
298
  }
299

300
  streamTaskCleanupCheckInfo(&pTask->taskCheckInfo);
63,157✔
301
  streamFreeTaskState(pTask, pTask->status.removeBackendFiles ? 1 : 0);
63,155✔
302

303
  if (pTask->pNameMap) {
63,149✔
304
    tSimpleHashCleanup(pTask->pNameMap);
2,343✔
305
  }
306

307
  streamDestroyStateMachine(pTask->status.pSM);
63,149✔
308
  pTask->status.pSM = NULL;
63,159✔
309

310
  streamTaskDestroyUpstreamInfo(&pTask->upstreamInfo);
63,159✔
311

312
  taosMemoryFree(pTask->outputInfo.pTokenBucket);
63,160!
313
  streamMutexDestroy(&pTask->lock);
63,161✔
314

315
  taosArrayDestroy(pTask->msgInfo.pSendInfo);
63,160✔
316
  pTask->msgInfo.pSendInfo = NULL;
63,153✔
317
  streamMutexDestroy(&pTask->msgInfo.lock);
63,153✔
318

319
  taosArrayDestroy(pTask->outputInfo.pNodeEpsetUpdateList);
63,158✔
320
  pTask->outputInfo.pNodeEpsetUpdateList = NULL;
63,159✔
321

322
  if (pTask->id.idStr != NULL) {
63,159✔
323
    taosMemoryFree((void*)pTask->id.idStr);
28,767!
324
  }
325

326
  streamTaskDestroyActiveChkptInfo(pTask->chkInfo.pActiveInfo);
63,160✔
327
  pTask->chkInfo.pActiveInfo = NULL;
63,154✔
328

329
  taosMemoryFree(pTask);
63,154!
330
  stDebug("s-task:0x%x free task completed", taskId);
63,159✔
331
}
63,159✔
332

333
void streamFreeTaskState(SStreamTask* pTask, int8_t remove) {
63,151✔
334
  stDebug("s-task:0x%x start to free task state/backend", pTask->id.taskId);
63,151✔
335
  if (pTask->pState != NULL) {
63,154✔
336
    stDebug("s-task:0x%x start to free task state", pTask->id.taskId);
7,446✔
337
    streamStateClose(pTask->pState, remove);
7,446✔
338

339
    if (remove) taskDbSetClearFileFlag(pTask->pBackend);
7,447✔
340
    taskDbRemoveRef(pTask->pBackend);
7,447✔
341
    pTask->pBackend = NULL;
7,446✔
342
    pTask->pState = NULL;
7,446✔
343
  } else {
344
    stDebug("s-task:0x%x task state is NULL, may del backend:%s", pTask->id.taskId,
55,708✔
345
            pTask->backendPath ? pTask->backendPath : "NULL");
346
    if (remove) {
55,708✔
347
      if (pTask->backendPath != NULL) {
3,428!
348
        stDebug("s-task:0x%x task state is NULL, do del backend:%s", pTask->id.taskId, pTask->backendPath);
3,433✔
349
        taosRemoveDir(pTask->backendPath);
3,433✔
350
      }
351
    }
352
  }
353

354
  if (pTask->backendPath != NULL) {
63,146✔
355
    taosMemoryFree(pTask->backendPath);
14,765!
356
    pTask->backendPath = NULL;
14,768✔
357
  }
358
}
63,149✔
359

360
static void setInitialVersionInfo(SStreamTask* pTask, int64_t ver) {
15,006✔
361
  SCheckpointInfo* pChkInfo = &pTask->chkInfo;
15,006✔
362
  SDataRange*      pRange = &pTask->dataRange;
15,006✔
363

364
  // only set the version info for stream tasks without fill-history task
365
  if ((pTask->info.fillHistory == 0) && (!HAS_RELATED_FILLHISTORY_TASK(pTask))) {
15,006✔
366
    pChkInfo->checkpointVer = ver - 1;  // only update when generating checkpoint
4,740✔
367
    pChkInfo->processedVer = ver - 1;   // already processed version
4,740✔
368
    pChkInfo->nextProcessVer = ver;     // next processed version
4,740✔
369

370
    pRange->range.maxVer = ver;
4,740✔
371
    pRange->range.minVer = ver;
4,740✔
372
  } else {
373
    // the initial value of processedVer/nextProcessVer/checkpointVer for stream task with related fill-history task
374
    // is set at the mnode.
375
    if (pTask->info.fillHistory == 1) {
10,266✔
376
      pChkInfo->checkpointVer = pRange->range.maxVer;
5,193✔
377
      pChkInfo->processedVer = pRange->range.maxVer;
5,193✔
378
      pChkInfo->nextProcessVer = pRange->range.maxVer + 1;
5,193✔
379
    } else {
380
      pChkInfo->checkpointVer = pRange->range.minVer - 1;
5,073✔
381
      pChkInfo->processedVer = pRange->range.minVer - 1;
5,073✔
382
      pChkInfo->nextProcessVer = pRange->range.minVer;
5,073✔
383

384
      {  // for compatible purpose, remove it later
385
        if (pRange->range.minVer == 0) {
5,073✔
386
          pChkInfo->checkpointVer = 0;
2,562✔
387
          pChkInfo->processedVer = 0;
2,562✔
388
          pChkInfo->nextProcessVer = 1;
2,562✔
389
          stDebug("s-task:%s update the processedVer to 0 from -1 due to compatible purpose", pTask->id.idStr);
2,562✔
390
        }
391
      }
392
    }
393
  }
394
}
15,006✔
395

396
int32_t streamTaskSetBackendPath(SStreamTask* pTask) {
15,022✔
397
  int64_t streamId = 0;
15,022✔
398
  int32_t taskId = 0;
15,022✔
399

400
  if (pTask->info.fillHistory) {
15,022✔
401
    streamId = pTask->streamTaskId.streamId;
5,192✔
402
    taskId = pTask->streamTaskId.taskId;
5,192✔
403
  } else {
404
    streamId = pTask->id.streamId;
9,830✔
405
    taskId = pTask->id.taskId;
9,830✔
406
  }
407

408
  char    id[128] = {0};
15,022✔
409
  int32_t nBytes = snprintf(id, tListLen(id), "0x%" PRIx64 "-0x%x", streamId, taskId);
15,022✔
410
  if (nBytes < 0 || nBytes >= sizeof(id)) {
15,022!
411
    return TSDB_CODE_OUT_OF_BUFFER;
×
412
  }
413

414
  int32_t len = strlen(pTask->pMeta->path);
15,022✔
415
  pTask->backendPath = (char*)taosMemoryMalloc(len + nBytes + 2);
15,022!
416
  if (pTask->backendPath == NULL) {
15,019!
417
    return terrno;
×
418
  }
419

420
  int32_t code = snprintf(pTask->backendPath, len + nBytes + 2, "%s%s%s", pTask->pMeta->path, TD_DIRSEP, id);
15,019✔
421
  if (code < 0 || code >= len + nBytes + 2) {
15,019!
422
    stError("s-task:%s failed to set backend path:%s, code: out of buffer", pTask->id.idStr, pTask->backendPath);
2!
423
    return TSDB_CODE_OUT_OF_BUFFER;
×
424
  } else {
425
    stDebug("s-task:%s set backend path:%s", pTask->id.idStr, pTask->backendPath);
15,017✔
426
    return 0;
15,018✔
427
  }
428
}
429

430
int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, int64_t ver) {
15,005✔
431
  int32_t code = createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId, &pTask->id.idStr);
15,005✔
432
  if (code) {
15,003!
433
    stError("0x%x failed create stream task id str, code:%s", pTask->id.taskId, tstrerror(code));
×
434
    return code;
×
435
  }
436

437
  pTask->id.refId = 0;
15,003✔
438
  pTask->inputq.status = TASK_INPUT_STATUS__NORMAL;
15,003✔
439
  pTask->outputq.status = TASK_OUTPUT_STATUS__NORMAL;
15,003✔
440

441
  int32_t code1 = streamQueueOpen(512 << 10, &pTask->inputq.queue);
15,003✔
442
  int32_t code2 = streamQueueOpen(512 << 10, &pTask->outputq.queue);
15,009✔
443
  if (code1 || code2) {
15,022!
444
    stError("s-task:%s failed to prepare the input/output queue, initialize task failed", pTask->id.idStr);
1!
445
    return TSDB_CODE_OUT_OF_MEMORY;
×
446
  }
447

448
  pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
15,021✔
449

450
  code = streamCreateStateMachine(pTask);
15,021✔
451
  if (pTask->status.pSM == NULL || code != TSDB_CODE_SUCCESS) {
15,010!
452
    stError("s-task:%s failed create state-machine for stream task, initialization failed, code:%s", pTask->id.idStr,
×
453
            tstrerror(code));
454
    return code;
×
455
  }
456

457
  pTask->execInfo.created = taosGetTimestampMs();
15,020✔
458
  setInitialVersionInfo(pTask, ver);
15,020✔
459

460
  pTask->pMeta = pMeta;
15,014✔
461
  pTask->pMsgCb = pMsgCb;
15,014✔
462
  pTask->msgInfo.pSendInfo = taosArrayInit(4, sizeof(SDispatchEntry));
15,014✔
463
  if (pTask->msgInfo.pSendInfo == NULL) {
15,017!
464
    stError("s-task:%s failed to create sendInfo struct for stream task, code:Out of memory", pTask->id.idStr);
×
465
    return terrno;
×
466
  }
467

468
  code = taosThreadMutexInit(&pTask->msgInfo.lock, NULL);
15,017✔
469
  if (code) {
15,013!
470
    stError("s-task:0x%x failed to init msgInfo mutex, code:%s", pTask->id.taskId, tstrerror(code));
×
471
    return code;
×
472
  }
473

474
  TdThreadMutexAttr attr = {0};
15,013✔
475
  code = taosThreadMutexAttrInit(&attr);
15,013✔
476
  if (code != 0) {
15,001!
477
    stError("s-task:%s initElapsed mutex attr failed, code:%s", pTask->id.idStr, tstrerror(code));
×
478
    return code;
×
479
  }
480

481
  code = taosThreadMutexAttrSetType(&attr, PTHREAD_MUTEX_RECURSIVE);
15,001✔
482
  if (code != 0) {
14,997!
483
    stError("s-task:%s set mutex attr recursive, code:%s", pTask->id.idStr, tstrerror(code));
×
484
    return code;
×
485
  }
486

487
  code = taosThreadMutexInit(&pTask->lock, &attr);
14,997✔
488
  if (code) {
15,010!
489
    return code;
×
490
  }
491

492
  code = taosThreadMutexAttrDestroy(&attr);
15,010✔
493
  if (code) {
14,995!
494
    return code;
×
495
  }
496

497
  streamTaskOpenAllUpstreamInput(pTask);
14,995✔
498

499
  STaskOutputInfo* pOutputInfo = &pTask->outputInfo;
14,991✔
500
  pOutputInfo->pTokenBucket = taosMemoryCalloc(1, sizeof(STokenBucket));
14,991!
501
  if (pOutputInfo->pTokenBucket == NULL) {
15,016!
502
    stError("s-task:%s failed to prepare the tokenBucket, code:%s", pTask->id.idStr, tstrerror(terrno));
×
503
    return terrno;
×
504
  }
505

506
  // 2MiB per second for sink task
507
  // 50 times sink operator per second
508
  code = streamTaskInitTokenBucket(pOutputInfo->pTokenBucket, 35, 35, tsSinkDataRate, pTask->id.idStr);
15,016✔
509
  if (code) {
15,022!
510
    return code;
×
511
  }
512

513
  pOutputInfo->pNodeEpsetUpdateList = taosArrayInit(4, sizeof(SDownstreamTaskEpset));
15,022✔
514
  if (pOutputInfo->pNodeEpsetUpdateList == NULL) {
15,020!
515
    stError("s-task:%s failed to prepare downstreamUpdateList, code:%s", pTask->id.idStr, tstrerror(terrno));
×
516
    return terrno;
×
517
  }
518

519
  pTask->taskCheckInfo.pList = taosArrayInit(4, sizeof(SDownstreamStatusInfo));
15,020✔
520
  if (pTask->taskCheckInfo.pList == NULL) {
15,016!
521
    stError("s-task:%s failed to prepare taskCheckInfo list, code:%s", pTask->id.idStr, tstrerror(terrno));
×
522
    return terrno;
×
523
  }
524

525
  if (pTask->chkInfo.pActiveInfo == NULL) {
15,016✔
526
    code = streamTaskCreateActiveChkptInfo(&pTask->chkInfo.pActiveInfo);
15,015✔
527
    if (code) {
15,022!
528
      stError("s-task:%s failed to create active checkpoint info, code:%s", pTask->id.idStr, tstrerror(code));
×
529
      return code;
×
530
    }
531
  }
532

533
  return streamTaskSetBackendPath(pTask);
15,023✔
534
}
535

536
int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask) {
115,239✔
537
  if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
115,239✔
538
    return 0;
6,876✔
539
  }
540

541
  int32_t type = pTask->outputInfo.type;
108,363✔
542
  if (type == TASK_OUTPUT__TABLE) {
108,363✔
543
    return 0;
270✔
544
  } else if (type == TASK_OUTPUT__FIXED_DISPATCH) {
108,093✔
545
    return 1;
11,357✔
546
  } else {
547
    SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
96,736✔
548
    return taosArrayGetSize(vgInfo);
96,736✔
549
  }
550
}
551

552
int32_t streamTaskGetNumOfUpstream(const SStreamTask* pTask) { return taosArrayGetSize(pTask->upstreamInfo.pList); }
19,575✔
553

554
int32_t streamTaskSetUpstreamInfo(SStreamTask* pTask, const SStreamTask* pUpstreamTask) {
20,110✔
555
  SStreamUpstreamEpInfo* pEpInfo = createStreamTaskEpInfo(pUpstreamTask);
20,110✔
556
  if (pEpInfo == NULL) {
20,110!
557
    return terrno;
×
558
  }
559

560
  if (pTask->upstreamInfo.pList == NULL) {
20,110✔
561
    pTask->upstreamInfo.pList = taosArrayInit(4, POINTER_BYTES);
6,952✔
562
  }
563

564
  void* p = taosArrayPush(pTask->upstreamInfo.pList, &pEpInfo);
20,110✔
565
  return (p == NULL) ? terrno : TSDB_CODE_SUCCESS;
20,110!
566
}
567

568
int32_t streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdated) {
266✔
569
  int32_t code = 0;
266✔
570
  char    buf[512] = {0};
266✔
571
  code = epsetToStr(pEpSet, buf, tListLen(buf));  // ignore error since it is only for log file.
266✔
572
  if (code != 0) {  // print error and continue
266!
573
    stError("%s failed to convert epset to str, code:%s", pTask->id.idStr, tstrerror(code));
×
574
    return code;
×
575
  }
576

577
  int32_t numOfUpstream = taosArrayGetSize(pTask->upstreamInfo.pList);
266✔
578
  for (int32_t i = 0; i < numOfUpstream; ++i) {
510✔
579
    SStreamUpstreamEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i);
427✔
580
    if (pInfo->nodeId == nodeId) {
427✔
581
      bool equal = isEpsetEqual(&pInfo->epSet, pEpSet);
183✔
582
      if (!equal) {
183✔
583
        *pUpdated = true;
127✔
584

585
        char tmp[512] = {0};
127✔
586
        code = epsetToStr(&pInfo->epSet, tmp, tListLen(tmp));
127✔
587
        if (code != 0) {  // print error and continue
127!
588
          stError("%s failed to convert epset to str, code:%s", pTask->id.idStr, tstrerror(code));
×
589
          return code;
×
590
        }
591

592
        epsetAssign(&pInfo->epSet, pEpSet);
127✔
593
        stDebug("s-task:0x%x update the upstreamInfo taskId:0x%x(nodeId:%d) newEpset:%s old:%s", pTask->id.taskId,
127!
594
                pInfo->taskId, nodeId, buf, tmp);
595
      } else {
596
        stDebug("s-task:0x%x not update upstreamInfo, since identical, task:0x%x(nodeId:%d) epset:%s", pTask->id.taskId,
56!
597
                pInfo->taskId, nodeId, buf);
598
      }
599

600
      break;
183✔
601
    }
602
  }
603

604
  return code;
266✔
605
}
606

607
void streamTaskDestroyUpstreamInfo(SUpstreamInfo* pUpstreamInfo) {
63,150✔
608
  if (pUpstreamInfo->pList != NULL) {
63,150✔
609
    taosArrayDestroyEx(pUpstreamInfo->pList, freeUpstreamItem);
56,071✔
610
    pUpstreamInfo->numOfClosed = 0;
56,076✔
611
    pUpstreamInfo->pList = NULL;
56,076✔
612
  }
613
}
63,155✔
614

615
void streamTaskSetFixedDownstreamInfo(SStreamTask* pTask, const SStreamTask* pDownstreamTask) {
966✔
616
  STaskDispatcherFixed* pDispatcher = &pTask->outputInfo.fixedDispatcher;
966✔
617
  pDispatcher->taskId = pDownstreamTask->id.taskId;
966✔
618
  pDispatcher->nodeId = pDownstreamTask->info.nodeId;
966✔
619
  pDispatcher->epSet = pDownstreamTask->info.epSet;
966✔
620

621
  pTask->outputInfo.type = TASK_OUTPUT__FIXED_DISPATCH;
966✔
622
  pTask->msgInfo.msgType = TDMT_STREAM_TASK_DISPATCH;
966✔
623
}
966✔
624

625
int32_t streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdated) {
266✔
626
  char    buf[512] = {0};
266✔
627
  int32_t code = epsetToStr(pEpSet, buf, tListLen(buf));  // ignore the error since only for log files.
266✔
628
  if (code != 0) {                                        // print error and continue
266!
629
    stError("%s failed to convert epset to str, code:%s", pTask->id.idStr, tstrerror(code));
×
630
    return code;
×
631
  }
632

633
  int32_t id = pTask->id.taskId;
266✔
634
  int8_t  type = pTask->outputInfo.type;
266✔
635

636
  if (type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
266✔
637
    SArray* pVgs = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
181✔
638

639
    for (int32_t i = 0; i < taosArrayGetSize(pVgs); i++) {
344✔
640
      SVgroupInfo* pVgInfo = taosArrayGet(pVgs, i);
342✔
641
      if (pVgInfo == NULL) {
342!
642
        continue;
×
643
      }
644

645
      if (pVgInfo->vgId == nodeId) {
342✔
646
        bool isEqual = isEpsetEqual(&pVgInfo->epSet, pEpSet);
179✔
647
        if (!isEqual) {
179✔
648
          *pUpdated = true;
127✔
649

650
          char tmp[512] = {0};
127✔
651
          code = epsetToStr(&pVgInfo->epSet, tmp, tListLen(tmp));
127✔
652
          if (code != 0) {  // print error and continue
127!
653
            stError("%s failed to convert epset to str, code:%s", pTask->id.idStr, tstrerror(code));
×
654
            return code;
×
655
          }
656

657
          epsetAssign(&pVgInfo->epSet, pEpSet);
127✔
658
          stDebug("s-task:0x%x update dispatch info, task:0x%x(nodeId:%d) newEpset:%s old:%s", id, pVgInfo->taskId,
127!
659
                  nodeId, buf, tmp);
660
        } else {
661
          stDebug("s-task:0x%x not update dispatch info, since identical, task:0x%x(nodeId:%d) epset:%s", id,
52!
662
                  pVgInfo->taskId, nodeId, buf);
663
        }
664
        break;
179✔
665
      }
666
    }
667
  } else if (type == TASK_OUTPUT__FIXED_DISPATCH) {
85!
668
    STaskDispatcherFixed* pDispatcher = &pTask->outputInfo.fixedDispatcher;
85✔
669
    if (pDispatcher->nodeId == nodeId) {
85✔
670
      bool equal = isEpsetEqual(&pDispatcher->epSet, pEpSet);
4✔
671
      if (!equal) {
4!
672
        *pUpdated = true;
×
673

674
        char tmp[512] = {0};
×
675
        code = epsetToStr(&pDispatcher->epSet, tmp, tListLen(tmp));
×
676
        if (code != 0) {  // print error and continue
×
677
          stError("%s failed to convert epset to str, code:%s", pTask->id.idStr, tstrerror(code));
×
678
          return code;
×
679
        }
680

681
        epsetAssign(&pDispatcher->epSet, pEpSet);
×
682
        stDebug("s-task:0x%x update dispatch info, task:0x%x(nodeId:%d) newEpset:%s old:%s", id, pDispatcher->taskId,
×
683
                nodeId, buf, tmp);
684
      } else {
685
        stDebug("s-task:0x%x not update dispatch info, since identical, task:0x%x(nodeId:%d) epset:%s", id,
4!
686
                pDispatcher->taskId, nodeId, buf);
687
      }
688
    }
689
  }
690

691
  return code;
266✔
692
}
693

694
int32_t streamTaskStop(SStreamTask* pTask) {
3,250✔
695
  int32_t     vgId = pTask->pMeta->vgId;
3,250✔
696
  int64_t     st = taosGetTimestampMs();
3,250✔
697
  const char* id = pTask->id.idStr;
3,250✔
698

699
  int32_t code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_STOP);
3,250✔
700
  if (code) {
3,250!
701
    stError("failed to handle STOP event, s-task:%s, code:%s", id, tstrerror(code));
×
702
    return code;
×
703
  }
704

705
  if (pTask->info.taskLevel != TASK_LEVEL__SINK && pTask->exec.pExecutor != NULL) {
3,250✔
706
    code = qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS);
1,622✔
707
    if (code != TSDB_CODE_SUCCESS) {
1,622!
708
      stError("s-task:%s failed to kill task related query handle, code:%s", id, tstrerror(code));
×
709
    }
710
  }
711

712
  while (!streamTaskIsIdle(pTask)) {
3,250!
713
    stDebug("s-task:%s level:%d wait for task to be idle and then close, check again in 100ms", id,
×
714
            pTask->info.taskLevel);
715
    taosMsleep(100);
×
716
  }
717

718
  int64_t el = taosGetTimestampMs() - st;
3,250✔
719
  stDebug("vgId:%d s-task:%s is closed in %" PRId64 " ms", vgId, id, el);
3,250✔
720
  return code;
3,250✔
721
}
722

723
bool streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList) {
273✔
724
  STaskExecStatisInfo* p = &pTask->execInfo;
273✔
725

726
  int32_t numOfNodes = taosArrayGetSize(pNodeList);
273✔
727
  int64_t prevTs = p->latestUpdateTs;
273✔
728

729
  p->latestUpdateTs = taosGetTimestampMs();
273✔
730
  p->updateCount += 1;
273✔
731
  stDebug("s-task:0x%x update task nodeEp epset, updatedNodes:%d, updateCount:%d, prevTs:%" PRId64, pTask->id.taskId,
273!
732
          numOfNodes, p->updateCount, prevTs);
733

734
  bool updated = false;
273✔
735
  for (int32_t i = 0; i < numOfNodes; ++i) {
788✔
736
    SNodeUpdateInfo* pInfo = taosArrayGet(pNodeList, i);
515✔
737
    if (pInfo == NULL) {
515!
738
      continue;
×
739
    }
740

741
    int32_t code = doUpdateTaskEpset(pTask, pInfo->nodeId, &pInfo->newEp, &updated);
515✔
742
    if (code) {
515!
743
      stError("s-task:0x%x failed to update the task nodeEp epset, code:%s", pTask->id.taskId, tstrerror(code));
×
744
    }
745
  }
746

747
  return updated;
273✔
748
}
749

750
void streamTaskResetUpstreamStageInfo(SStreamTask* pTask) {
15,021✔
751
  if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
15,021✔
752
    return;
7,547✔
753
  }
754

755
  int32_t size = taosArrayGetSize(pTask->upstreamInfo.pList);
7,474✔
756
  for (int32_t i = 0; i < size; ++i) {
28,803✔
757
    SStreamUpstreamEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i);
21,328✔
758
    pInfo->stage = -1;
21,329✔
759
  }
760

761
  stDebug("s-task:%s reset all upstream tasks stage info", pTask->id.idStr);
7,475✔
762
}
763

764
void streamTaskOpenAllUpstreamInput(SStreamTask* pTask) {
25,077✔
765
  int32_t num = taosArrayGetSize(pTask->upstreamInfo.pList);
25,077✔
766
  if (num == 0) {
25,088✔
767
    return;
12,576✔
768
  }
769

770
  for (int32_t i = 0; i < num; ++i) {
47,978✔
771
    SStreamUpstreamEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i);
35,476✔
772
    pInfo->dataAllowed = true;
35,466✔
773
  }
774

775
  pTask->upstreamInfo.numOfClosed = 0;
12,502✔
776
  stDebug("s-task:%s opening up inputQ for %d upstream tasks", pTask->id.idStr, num);
12,502✔
777
}
778

779
void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId) {
8,241✔
780
  SStreamUpstreamEpInfo* pInfo = NULL;
8,241✔
781
  streamTaskGetUpstreamTaskEpInfo(pTask, taskId, &pInfo);
8,241✔
782

783
  if ((pInfo != NULL) && pInfo->dataAllowed) {
8,233!
784
    pInfo->dataAllowed = false;
8,232✔
785
    if (pTask->upstreamInfo.numOfClosed < streamTaskGetNumOfUpstream(pTask)) {
8,232!
786
      int32_t t = atomic_add_fetch_32(&pTask->upstreamInfo.numOfClosed, 1);
8,229✔
787
    } else {
788
      stError("s-task:%s not inc closed input, since they have been all closed already", pTask->id.idStr);
×
789
    }
790
  }
791
}
8,260✔
792

793
void streamTaskOpenUpstreamInput(SStreamTask* pTask, int32_t taskId) {
1✔
794
  SStreamUpstreamEpInfo* pInfo = NULL;
1✔
795
  streamTaskGetUpstreamTaskEpInfo(pTask, taskId, &pInfo);
1✔
796

797
  if (pInfo != NULL && (!pInfo->dataAllowed)) {
1!
798
    int32_t t = atomic_sub_fetch_32(&pTask->upstreamInfo.numOfClosed, 1);
1✔
799
    stDebug("s-task:%s open inputQ for upstream:0x%x, remain closed:%d", pTask->id.idStr, taskId, t);
1!
800
    pInfo->dataAllowed = true;
1✔
801
  }
802
}
1✔
803

804
bool streamTaskIsAllUpstreamClosed(SStreamTask* pTask) {
×
805
  return pTask->upstreamInfo.numOfClosed == taosArrayGetSize(pTask->upstreamInfo.pList);
×
806
}
807

808
bool streamTaskSetSchedStatusWait(SStreamTask* pTask) {
101,558✔
809
  bool ret = false;
101,558✔
810

811
  streamMutexLock(&pTask->lock);
101,558✔
812
  if (pTask->status.schedStatus == TASK_SCHED_STATUS__INACTIVE) {
101,574✔
813
    pTask->status.schedStatus = TASK_SCHED_STATUS__WAITING;
70,961✔
814
    ret = true;
70,961✔
815
  }
816

817
  streamMutexUnlock(&pTask->lock);
101,574✔
818
  return ret;
101,586✔
819
}
820

821
int8_t streamTaskSetSchedStatusActive(SStreamTask* pTask) {
69,737✔
822
  streamMutexLock(&pTask->lock);
69,737✔
823
  int8_t status = pTask->status.schedStatus;
69,776✔
824
  if (status == TASK_SCHED_STATUS__WAITING) {
69,776✔
825
    pTask->status.schedStatus = TASK_SCHED_STATUS__ACTIVE;
69,772✔
826
  }
827
  streamMutexUnlock(&pTask->lock);
69,776✔
828

829
  return status;
69,803✔
830
}
831

832
int8_t streamTaskSetSchedStatusInactive(SStreamTask* pTask) {
1,100✔
833
  streamMutexLock(&pTask->lock);
1,100✔
834
  int8_t status = pTask->status.schedStatus;
1,100✔
835
  pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
1,100✔
836
  streamMutexUnlock(&pTask->lock);
1,100✔
837

838
  return status;
1,100✔
839
}
840

841
int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, int32_t resetRelHalt) {
7,011✔
842
  int32_t      code = 0;
7,011✔
843
  SStreamMeta* pMeta = pTask->pMeta;
7,011✔
844
  SStreamTask* pStreamTask = NULL;
7,011✔
845

846
  if (pTask->info.fillHistory == 0) {
7,011!
847
    return code;
7,020✔
848
  }
849

UNCOV
850
  code = streamMetaAcquireTaskUnsafe(pMeta, &pTask->streamTaskId, &pStreamTask);
×
851
  if (code == 0) {
4!
852
    stDebug("s-task:%s clear the related stream task:0x%x attr to fill-history task", pTask->id.idStr,
×
853
            (int32_t)pTask->streamTaskId.taskId);
854

855
    streamMutexLock(&(pStreamTask->lock));
×
856
    CLEAR_RELATED_FILLHISTORY_TASK(pStreamTask);
×
857

858
    if (resetRelHalt) {
×
859
      stDebug("s-task:0x%" PRIx64 " set the persistent status attr to be ready, prev:%s, status in sm:%s",
×
860
              pTask->streamTaskId.taskId, streamTaskGetStatusStr(pStreamTask->status.taskStatus),
861
              streamTaskGetStatus(pStreamTask).name);
862
      pStreamTask->status.taskStatus = TASK_STATUS__READY;
×
863
    }
864

865
    code = streamMetaSaveTask(pMeta, pStreamTask);
×
866
    streamMutexUnlock(&(pStreamTask->lock));
×
867

868
    streamMetaReleaseTask(pMeta, pStreamTask);
×
869
  }
870

871
  return code;
4✔
872
}
873

874
int32_t streamBuildAndSendDropTaskMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskId* pTaskId, int64_t resetRelHalt) {
4✔
875
  SVDropStreamTaskReq* pReq = rpcMallocCont(sizeof(SVDropStreamTaskReq));
4✔
876
  if (pReq == NULL) {
4!
877
    return terrno;
×
878
  }
879

880
  pReq->head.vgId = vgId;
4✔
881
  pReq->taskId = pTaskId->taskId;
4✔
882
  pReq->streamId = pTaskId->streamId;
4✔
883
  pReq->resetRelHalt = resetRelHalt;
4✔
884

885
  SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_DROP, .pCont = pReq, .contLen = sizeof(SVDropStreamTaskReq)};
4✔
886
  int32_t code = tmsgPutToQueue(pMsgCb, WRITE_QUEUE, &msg);
4✔
887
  if (code != TSDB_CODE_SUCCESS) {
4!
888
    stError("vgId:%d failed to send drop task:0x%x msg, code:%s", vgId, pTaskId->taskId, tstrerror(code));
×
889
  } else {
890
    stDebug("vgId:%d build and send drop task:0x%x msg", vgId, pTaskId->taskId);
4!
891
  }
892

893
  return code;
4✔
894
}
895

896
int32_t streamSendChkptReportMsg(SStreamTask* pTask, SCheckpointInfo* pCheckpointInfo, int8_t dropRelHTask) {
6,248✔
897
  int32_t                code = 0;
6,248✔
898
  int32_t                tlen = 0;
6,248✔
899
  int32_t                vgId = pTask->pMeta->vgId;
6,248✔
900
  const char*            id = pTask->id.idStr;
6,248✔
901
  SActiveCheckpointInfo* pActive = pCheckpointInfo->pActiveInfo;
6,248✔
902

903
  SCheckpointReport req = {.streamId = pTask->id.streamId,
6,248✔
904
                           .taskId = pTask->id.taskId,
6,248✔
905
                           .nodeId = vgId,
906
                           .dropHTask = dropRelHTask,
907
                           .transId = pActive->transId,
6,248✔
908
                           .checkpointId = pActive->activeId,
6,248✔
909
                           .checkpointVer = pCheckpointInfo->processedVer,
6,248✔
910
                           .checkpointTs = pCheckpointInfo->startTs};
6,248✔
911

912
  tEncodeSize(tEncodeStreamTaskChkptReport, &req, tlen, code);
6,248!
913
  if (code < 0) {
6,247!
914
    stError("s-task:%s vgId:%d encode stream task checkpoint-report failed, code:%s", id, vgId, tstrerror(code));
×
915
    return -1;
×
916
  }
917

918
  void* buf = rpcMallocCont(tlen);
6,247✔
919
  if (buf == NULL) {
6,247!
920
    stError("s-task:%s vgId:%d encode stream task checkpoint-report msg failed, code:%s", id, vgId,
×
921
            tstrerror(TSDB_CODE_OUT_OF_MEMORY));
922
    return -1;
×
923
  }
924

925
  SEncoder encoder;
926
  tEncoderInit(&encoder, buf, tlen);
6,247✔
927
  if ((code = tEncodeStreamTaskChkptReport(&encoder, &req)) < 0) {
6,248!
928
    rpcFreeCont(buf);
×
929
    tEncoderClear(&encoder);
×
930
    stError("s-task:%s vgId:%d encode stream task checkpoint-report msg failed, code:%s", id, vgId, tstrerror(code));
×
931
    return -1;
×
932
  }
933
  tEncoderClear(&encoder);
6,248✔
934

935
  SRpcMsg msg = {0};
6,246✔
936
  initRpcMsg(&msg, TDMT_MND_STREAM_CHKPT_REPORT, buf, tlen);
6,246✔
937
  stDebug("s-task:%s vgId:%d build and send task checkpoint-report to mnode", id, vgId);
6,244✔
938

939
  return tmsgSendReq(&pTask->info.mnodeEpset, &msg);
6,244✔
940
}
941

942
STaskId streamTaskGetTaskId(const SStreamTask* pTask) {
82,490✔
943
  STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
82,490✔
944
  return id;
82,490✔
945
}
946

947
void streamTaskInitForLaunchHTask(SHistoryTaskInfo* pInfo) {
1,782✔
948
  pInfo->waitInterval = LAUNCH_HTASK_INTERVAL;
1,782✔
949
  pInfo->tickCount = ceil(LAUNCH_HTASK_INTERVAL / WAIT_FOR_MINIMAL_INTERVAL);
1,782✔
950
  pInfo->retryTimes = 0;
1,782✔
951
}
1,782✔
952

953
void streamTaskSetRetryInfoForLaunch(SHistoryTaskInfo* pInfo) {
1,778✔
954
  pInfo->waitInterval *= RETRY_LAUNCH_INTERVAL_INC_RATE;
1,778✔
955
  pInfo->tickCount = ceil(pInfo->waitInterval / WAIT_FOR_MINIMAL_INTERVAL);
1,778✔
956
  pInfo->retryTimes += 1;
1,778✔
957
}
1,778✔
958

959
void streamTaskStatusInit(STaskStatusEntry* pEntry, const SStreamTask* pTask) {
9,253✔
960
  pEntry->id.streamId = pTask->id.streamId;
9,253✔
961
  pEntry->id.taskId = pTask->id.taskId;
9,253✔
962
  pEntry->stage = -1;
9,253✔
963
  pEntry->nodeId = pTask->info.nodeId;
9,253✔
964
  pEntry->status = TASK_STATUS__STOP;
9,253✔
965
}
9,253✔
966

967
void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc) {
65,600✔
968
  pDst->stage = pSrc->stage;
65,600✔
969
  pDst->inputQUsed = pSrc->inputQUsed;
65,600✔
970
  pDst->inputRate = pSrc->inputRate;
65,600✔
971
  pDst->procsTotal = pSrc->procsTotal;
65,600✔
972
  pDst->procsThroughput = pSrc->procsThroughput;
65,600✔
973
  pDst->outputTotal = pSrc->outputTotal;
65,600✔
974
  pDst->outputThroughput = pSrc->outputThroughput;
65,600✔
975
  pDst->processedVer = pSrc->processedVer;
65,600✔
976
  pDst->verRange = pSrc->verRange;
65,600✔
977
  pDst->sinkQuota = pSrc->sinkQuota;
65,600✔
978
  pDst->sinkDataSize = pSrc->sinkDataSize;
65,600✔
979
  pDst->checkpointInfo = pSrc->checkpointInfo;
65,600✔
980
  pDst->startCheckpointId = pSrc->startCheckpointId;
65,600✔
981
  pDst->startCheckpointVer = pSrc->startCheckpointVer;
65,600✔
982
  pDst->status = pSrc->status;
65,600✔
983

984
  pDst->startTime = pSrc->startTime;
65,600✔
985
  pDst->hTaskId = pSrc->hTaskId;
65,600✔
986
}
65,600✔
987

988
STaskStatusEntry streamTaskGetStatusEntry(SStreamTask* pTask) {
66,362✔
989
  SStreamMeta*         pMeta = pTask->pMeta;
66,362✔
990
  STaskExecStatisInfo* pExecInfo = &pTask->execInfo;
66,362✔
991

992
  STaskStatusEntry entry = {
199,086✔
993
      .id = streamTaskGetTaskId(pTask),
66,362✔
994
      .status = streamTaskGetStatus(pTask).state,
66,362✔
995
      .nodeId = pMeta->vgId,
66,362✔
996
      .stage = pMeta->stage,
66,362✔
997

998
      .inputQUsed = SIZE_IN_MiB(streamQueueGetItemSize(pTask->inputq.queue)),
66,362✔
999
      .startTime = pExecInfo->readyTs,
66,362✔
1000
      .checkpointInfo.latestId = pTask->chkInfo.checkpointId,
66,362✔
1001
      .checkpointInfo.latestVer = pTask->chkInfo.checkpointVer,
66,362✔
1002
      .checkpointInfo.latestTime = pTask->chkInfo.checkpointTime,
66,362✔
1003
      .checkpointInfo.latestSize = 0,
1004
      .checkpointInfo.remoteBackup = 0,
1005
      .checkpointInfo.consensusChkptId = 0,
1006
      .checkpointInfo.consensusTs = 0,
1007
      .hTaskId = pTask->hTaskInfo.id.taskId,
66,362✔
1008
      .procsTotal = SIZE_IN_MiB(pExecInfo->inputDataSize),
66,362✔
1009
      .outputTotal = SIZE_IN_MiB(pExecInfo->outputDataSize),
66,362✔
1010
      .procsThroughput = SIZE_IN_KiB(pExecInfo->procsThroughput),
66,362✔
1011
      .outputThroughput = SIZE_IN_KiB(pExecInfo->outputThroughput),
66,362✔
1012
      .startCheckpointId = pExecInfo->startCheckpointId,
66,362✔
1013
      .startCheckpointVer = pExecInfo->startCheckpointVer,
66,362✔
1014
  };
1015
  return entry;
66,362✔
1016
}
1017

1018
static int32_t taskPauseCallback(SStreamTask* pTask, void* param) {
1,412✔
1019
  SStreamMeta* pMeta = pTask->pMeta;
1,412✔
1020
  int32_t      code = 0;
1,412✔
1021

1022
  int32_t num = atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1);
1,412✔
1023
  stInfo("vgId:%d s-task:%s pause stream task. paused task num:%d", pMeta->vgId, pTask->id.idStr, num);
1,414!
1024

1025
  // in case of fill-history task, stop the tsdb file scan operation.
1026
  if (pTask->info.fillHistory == 1) {
1,414✔
1027
    void* pExecutor = pTask->exec.pExecutor;
72✔
1028
    code = qKillTask(pExecutor, TSDB_CODE_SUCCESS);
72✔
1029
  }
1030

1031
  stDebug("vgId:%d s-task:%s set pause flag and pause task", pMeta->vgId, pTask->id.idStr);
1,414✔
1032
  return code;
1,414✔
1033
}
1034

1035
void streamTaskPause(SStreamTask* pTask) {
1,484✔
1036
  int32_t code = streamTaskHandleEventAsync(pTask->status.pSM, TASK_EVENT_PAUSE, taskPauseCallback, NULL);
1,484✔
1037
  if (code) {
1,486!
1038
    stError("s-task:%s failed handle pause event async, code:%s", pTask->id.idStr, tstrerror(code));
×
1039
  }
1040
}
1,486✔
1041

1042
void streamTaskResume(SStreamTask* pTask) {
2,684✔
1043
  SStreamTaskState prevState = streamTaskGetStatus(pTask);
2,684✔
1044

1045
  SStreamMeta* pMeta = pTask->pMeta;
2,688✔
1046
  int32_t      code = streamTaskRestoreStatus(pTask);
2,688✔
1047
  if (code == TSDB_CODE_SUCCESS) {
2,690✔
1048
    char*   pNew = streamTaskGetStatus(pTask).name;
1,398✔
1049
    int32_t num = atomic_sub_fetch_32(&pMeta->numOfPausedTasks, 1);
1,401✔
1050
    stInfo("s-task:%s status:%s resume from %s, paused task(s):%d", pTask->id.idStr, pNew, prevState.name, num);
1,400!
1051
  } else {
1052
    stInfo("s-task:%s status:%s no need to resume, paused task(s):%d", pTask->id.idStr, prevState.name,
1,292!
1053
           pMeta->numOfPausedTasks);
1054
  }
1055
}
2,696✔
1056

1057
bool streamTaskIsSinkTask(const SStreamTask* pTask) { return pTask->info.taskLevel == TASK_LEVEL__SINK; }
78,363✔
1058

1059
// this task must success
1060
int32_t streamTaskSendCheckpointReq(SStreamTask* pTask) {
4,456✔
1061
  int32_t     code;
1062
  int32_t     tlen = 0;
4,456✔
1063
  int32_t     vgId = pTask->pMeta->vgId;
4,456✔
1064
  const char* id = pTask->id.idStr;
4,456✔
1065

1066
  SStreamTaskCheckpointReq req = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId, .nodeId = vgId};
4,456✔
1067
  tEncodeSize(tEncodeStreamTaskCheckpointReq, &req, tlen, code);
4,456!
1068
  if (code < 0) {
4,455!
1069
    stError("s-task:%s vgId:%d encode stream task req checkpoint failed, code:%s", id, vgId, tstrerror(code));
×
1070
    return TSDB_CODE_INVALID_MSG;
×
1071
  }
1072

1073
  void* buf = rpcMallocCont(tlen);
4,455✔
1074
  if (buf == NULL) {
4,460!
1075
    stError("s-task:%s vgId:%d encode stream task req checkpoint msg failed, code:Out of memory", id, vgId);
×
1076
    return terrno;
×
1077
  }
1078

1079
  SEncoder encoder;
1080
  tEncoderInit(&encoder, buf, tlen);
4,460✔
1081
  if ((code = tEncodeStreamTaskCheckpointReq(&encoder, &req)) < 0) {
4,461!
1082
    rpcFreeCont(buf);
×
1083
    tEncoderClear(&encoder);
×
1084
    stError("s-task:%s vgId:%d encode stream task req checkpoint msg failed, code:%s", id, vgId, tstrerror(code));
×
1085
    return code;
×
1086
  }
1087

1088
  tEncoderClear(&encoder);
4,459✔
1089

1090
  SRpcMsg msg = {0};
4,458✔
1091
  initRpcMsg(&msg, TDMT_MND_STREAM_REQ_CHKPT, buf, tlen);
4,458✔
1092
  stDebug("s-task:%s vgId:%d build and send task checkpoint req", id, vgId);
4,460✔
1093

1094
  return tmsgSendReq(&pTask->info.mnodeEpset, &msg);
4,460✔
1095
}
1096

1097
void streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId, SStreamUpstreamEpInfo** pEpInfo) {
89,665✔
1098
  *pEpInfo = NULL;
89,665✔
1099

1100
  int32_t num = taosArrayGetSize(pTask->upstreamInfo.pList);
89,665✔
1101
  for (int32_t i = 0; i < num; ++i) {
177,925✔
1102
    SStreamUpstreamEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i);
177,898✔
1103
    if (pInfo == NULL) {
177,829!
1104
      return;
×
1105
    }
1106

1107
    if (pInfo->taskId == taskId) {
177,829✔
1108
      *pEpInfo = pInfo;
89,594✔
1109
      return;
89,594✔
1110
    }
1111
  }
1112

1113
  stError("s-task:%s failed to find upstream task:0x%x", pTask->id.idStr, taskId);
27!
1114
}
1115

1116
SEpSet* streamTaskGetDownstreamEpInfo(SStreamTask* pTask, int32_t taskId) {
×
1117
  if (pTask->info.taskLevel == TASK_OUTPUT__FIXED_DISPATCH) {
×
1118
    if (pTask->outputInfo.fixedDispatcher.taskId == taskId) {
×
1119
      return &pTask->outputInfo.fixedDispatcher.epSet;
×
1120
    }
1121
  } else if (pTask->info.taskLevel == TASK_OUTPUT__SHUFFLE_DISPATCH) {
×
1122
    SArray* pList = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
×
1123
    for (int32_t i = 0; i < taosArrayGetSize(pList); ++i) {
×
1124
      SVgroupInfo* pVgInfo = taosArrayGet(pList, i);
×
1125
      if (pVgInfo == NULL) {
×
1126
        continue;
×
1127
      }
1128

1129
      if (pVgInfo->taskId == taskId) {
×
1130
        return &pVgInfo->epSet;
×
1131
      }
1132
    }
1133
  }
1134

1135
  return NULL;
×
1136
}
1137

1138
int32_t createStreamTaskIdStr(int64_t streamId, int32_t taskId, const char** pId) {
15,006✔
1139
  char buf[128] = {0};
15,006✔
1140
  int32_t code = snprintf(buf, tListLen(buf),"0x%" PRIx64 "-0x%x", streamId, taskId);
15,006✔
1141
  if (code < 0 || code >= tListLen(buf)) {
15,006!
1142
    return TSDB_CODE_OUT_OF_BUFFER;
×
1143
  }
1144

1145
  *pId = taosStrdup(buf);
15,022!
1146

1147
  if (*pId == NULL) {
14,999!
1148
    return terrno;
×
1149
  } else {
1150
    return TSDB_CODE_SUCCESS;
14,999✔
1151
  }
1152
}
1153

1154
static int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq) {
559✔
1155
  int32_t           code;
1156
  SStreamDataBlock* pData;
1157

1158
  code = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SStreamDataBlock), (void**)&pData);
559✔
1159
  if (code) {
559!
1160
    stError("s-task:%s failed to allocated retrieve-block", pTask->id.idStr);
×
1161
    return terrno = code;
×
1162
  }
1163

1164
  pData->type = STREAM_INPUT__DATA_RETRIEVE;
559✔
1165
  pData->srcVgId = 0;
559✔
1166

1167
  code = streamRetrieveReqToData(pReq, pData, pTask->id.idStr);
559✔
1168
  if (code != TSDB_CODE_SUCCESS) {
558!
1169
    stError("s-task:%s failed to convert retrieve-data to block, code:%s", pTask->id.idStr, tstrerror(code));
×
1170
    taosFreeQitem(pData);
×
1171
    return code;
×
1172
  }
1173

1174
  code = streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pData);
558✔
1175
  if (code != TSDB_CODE_SUCCESS) {
558!
1176
    stError("s-task:%s failed to put retrieve-block into inputQ, inputQ is full, discard the retrieve msg",
×
1177
            pTask->id.idStr);
1178
  }
1179

1180
  return code;
558✔
1181
}
1182

1183
int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq) {
559✔
1184
  int32_t code = streamTaskEnqueueRetrieve(pTask, pReq);
559✔
1185
  if (code != 0) {
558!
1186
    return code;
×
1187
  }
1188
  return streamTrySchedExec(pTask);
558✔
1189
}
1190

1191
void streamTaskSetRemoveBackendFiles(SStreamTask* pTask) { pTask->status.removeBackendFiles = true; }
7,019✔
1192

1193
void streamTaskGetActiveCheckpointInfo(const SStreamTask* pTask, int32_t* pTransId, int64_t* pCheckpointId) {
×
1194
  if (pTransId != NULL) {
×
1195
    *pTransId = pTask->chkInfo.pActiveInfo->transId;
×
1196
  }
1197

1198
  if (pCheckpointId != NULL) {
×
1199
    *pCheckpointId = pTask->chkInfo.pActiveInfo->activeId;
×
1200
  }
1201
}
×
1202

1203
int32_t streamTaskSetActiveCheckpointInfo(SStreamTask* pTask, int64_t activeCheckpointId) {
28✔
1204
  pTask->chkInfo.pActiveInfo->activeId = activeCheckpointId;
28✔
1205
  return TSDB_CODE_SUCCESS;
28✔
1206
}
1207

1208
void streamTaskSetFailedChkptInfo(SStreamTask* pTask, int32_t transId, int64_t checkpointId) {
×
1209
  pTask->chkInfo.pActiveInfo->transId = transId;
×
1210
  pTask->chkInfo.pActiveInfo->activeId = checkpointId;
×
1211
  pTask->chkInfo.pActiveInfo->failedId = checkpointId;
×
1212
  stDebug("s-task:%s set failed checkpointId:%"PRId64, pTask->id.idStr, checkpointId);
×
1213
}
×
1214

1215
int32_t streamTaskCreateActiveChkptInfo(SActiveCheckpointInfo** pRes) {
15,048✔
1216
  SActiveCheckpointInfo* pInfo = taosMemoryCalloc(1, sizeof(SActiveCheckpointInfo));
15,048!
1217
  if (pInfo == NULL) {
15,059!
1218
    return terrno;
×
1219
  }
1220

1221
  int32_t code = taosThreadMutexInit(&pInfo->lock, NULL);
15,059✔
1222
  if (code != TSDB_CODE_SUCCESS) {
15,061!
1223
    return code;
×
1224
  }
1225

1226
  pInfo->pDispatchTriggerList = taosArrayInit(4, sizeof(STaskTriggerSendInfo));
15,061✔
1227
  pInfo->pReadyMsgList = taosArrayInit(4, sizeof(STaskCheckpointReadyInfo));
15,064✔
1228
  pInfo->pCheckpointReadyRecvList = taosArrayInit(4, sizeof(STaskDownstreamReadyInfo));
15,062✔
1229

1230
  *pRes = pInfo;
15,066✔
1231
  return code;
15,066✔
1232
}
1233

1234
void streamTaskDestroyActiveChkptInfo(SActiveCheckpointInfo* pInfo) {
63,152✔
1235
  if (pInfo == NULL) {
63,152✔
1236
    return;
48,351✔
1237
  }
1238

1239
  streamMutexDestroy(&pInfo->lock);
14,801✔
1240
  taosArrayDestroy(pInfo->pDispatchTriggerList);
14,807✔
1241
  pInfo->pDispatchTriggerList = NULL;
14,808✔
1242
  taosArrayDestroy(pInfo->pReadyMsgList);
14,808✔
1243
  pInfo->pReadyMsgList = NULL;
14,807✔
1244
  taosArrayDestroy(pInfo->pCheckpointReadyRecvList);
14,807✔
1245
  pInfo->pCheckpointReadyRecvList = NULL;
14,809✔
1246

1247
  SStreamTmrInfo* pTriggerTmr = &pInfo->chkptTriggerMsgTmr;
14,809✔
1248
  if (pTriggerTmr->tmrHandle != NULL) {
14,809✔
1249
    streamTmrStop(pTriggerTmr->tmrHandle);
2,094✔
1250
    pTriggerTmr->tmrHandle = NULL;
2,094✔
1251
  }
1252

1253
  SStreamTmrInfo* pReadyTmr = &pInfo->chkptReadyMsgTmr;
14,809✔
1254
  if (pReadyTmr->tmrHandle != NULL) {
14,809✔
1255
    streamTmrStop(pReadyTmr->tmrHandle);
2,074✔
1256
    pReadyTmr->tmrHandle = NULL;
2,074✔
1257
  }
1258

1259
  taosMemoryFree(pInfo);
14,809!
1260
}
1261

1262
// NOTE: clear the checkpoint id, and keep the failed id
1263
// failedId for a task will increase as the checkpoint I.D. increases.
1264
void streamTaskClearActiveInfo(SActiveCheckpointInfo* pInfo) {
5,535✔
1265
  pInfo->activeId = 0;
5,535✔
1266
  pInfo->transId = 0;
5,535✔
1267
  pInfo->allUpstreamTriggerRecv = 0;
5,535✔
1268
  pInfo->dispatchTrigger = false;
5,535✔
1269

1270
  taosArrayClear(pInfo->pDispatchTriggerList);
5,535✔
1271
  taosArrayClear(pInfo->pCheckpointReadyRecvList);
5,520✔
1272
}
5,529✔
1273

1274
const char* streamTaskGetExecType(int32_t type) {
98,732✔
1275
  switch (type) {
98,732!
1276
    case STREAM_EXEC_T_EXTRACT_WAL_DATA:
35,371✔
1277
      return "scan-wal-file";
35,371✔
1278
    case STREAM_EXEC_T_START_ALL_TASKS:
8,395✔
1279
      return "start-all-tasks";
8,395✔
1280
    case STREAM_EXEC_T_START_ONE_TASK:
5,793✔
1281
      return "start-one-task";
5,793✔
1282
    case STREAM_EXEC_T_RESTART_ALL_TASKS:
47✔
1283
      return "restart-all-tasks";
47✔
1284
    case STREAM_EXEC_T_STOP_ALL_TASKS:
4,719✔
1285
      return "stop-all-tasks";
4,719✔
1286
    case STREAM_EXEC_T_RESUME_TASK:
6,611✔
1287
      return "resume-task-from-idle";
6,611✔
1288
    case STREAM_EXEC_T_ADD_FAILED_TASK:
2✔
1289
      return "record-start-failed-task";
2✔
1290
    case 0:
37,858✔
1291
      return "exec-all-tasks";
37,858✔
1292
    default:
×
1293
      return "invalid-exec-type";
×
1294
  }
1295
}
1296

1297
int32_t streamTaskAllocRefId(SStreamTask* pTask, int64_t** pRefId) {
40,906✔
1298
  *pRefId = taosMemoryMalloc(sizeof(int64_t));
40,906!
1299
  if (*pRefId != NULL) {
40,902!
1300
    **pRefId = pTask->id.refId;
40,902✔
1301
    int32_t code = metaRefMgtAdd(pTask->pMeta->vgId, *pRefId);
40,902✔
1302
    if (code != 0) {
40,921!
1303
      stError("s-task:%s failed to add refId:%" PRId64 " into refId-mgmt, code:%s", pTask->id.idStr, pTask->id.refId,
×
1304
              tstrerror(code));
1305
    }
1306
    return code;
40,921✔
1307
  } else {
1308
    stError("s-task:%s failed to alloc new ref id, code:%s", pTask->id.idStr, tstrerror(terrno));
×
1309
    return terrno;
×
1310
  }
1311
}
1312

1313
void streamTaskFreeRefId(int64_t* pRefId) {
38,651✔
1314
  if (pRefId == NULL) {
38,651✔
1315
    return;
2,980✔
1316
  }
1317

1318
  metaRefMgtRemove(pRefId);
35,671✔
1319
}
1320

1321

1322
int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
141,542✔
1323
  int32_t code = 0;
141,542✔
1324
  int32_t lino;
1325

1326
  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
141,542!
1327
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->ver));
283,082!
1328
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->id.streamId));
283,082!
1329
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->id.taskId));
283,082!
1330
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->info.trigger));
283,082!
1331
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->info.taskLevel));
283,082!
1332
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->outputInfo.type));
283,082!
1333
  TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pTask->msgInfo.msgType));
283,082!
1334

1335
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->status.taskStatus));
283,082!
1336
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->status.schedStatus));
283,082!
1337

1338
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->info.selfChildId));
283,082!
1339
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->info.nodeId));
283,082!
1340
  TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pTask->info.epSet));
141,541!
1341
  TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pTask->info.mnodeEpset));
141,539!
1342

1343
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->chkInfo.checkpointId));
283,082!
1344
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->chkInfo.checkpointVer));
283,082!
1345
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->info.fillHistory));
283,082!
1346

1347
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->hTaskInfo.id.streamId));
283,082!
1348
  int32_t taskId = pTask->hTaskInfo.id.taskId;
141,541✔
1349
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, taskId));
141,541!
1350

1351
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->streamTaskId.streamId));
283,082!
1352
  taskId = pTask->streamTaskId.taskId;
141,541✔
1353
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, taskId));
141,541!
1354

1355
  TAOS_CHECK_EXIT(tEncodeU64(pEncoder, pTask->dataRange.range.minVer));
283,082!
1356
  TAOS_CHECK_EXIT(tEncodeU64(pEncoder, pTask->dataRange.range.maxVer));
283,082!
1357
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->dataRange.window.skey));
283,082!
1358
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->dataRange.window.ekey));
283,082!
1359

1360
  int32_t epSz = taosArrayGetSize(pTask->upstreamInfo.pList);
141,541✔
1361
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, epSz));
141,536!
1362
  for (int32_t i = 0; i < epSz; i++) {
335,773✔
1363
    SStreamUpstreamEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i);
194,244✔
1364
    TAOS_CHECK_EXIT(tEncodeStreamEpInfo(pEncoder, pInfo));
194,209!
1365
  }
1366

1367
  if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
141,529✔
1368
    TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pTask->exec.qmsg));
150,390!
1369
  }
1370

1371
  if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) {
141,529✔
1372
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->outputInfo.tbSink.stbUid));
139,144!
1373
    TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pTask->outputInfo.tbSink.stbFullName));
139,144!
1374
    TAOS_CHECK_EXIT(tEncodeSSchemaWrapper(pEncoder, pTask->outputInfo.tbSink.pSchemaWrapper));
139,144!
1375
  } else if (pTask->outputInfo.type == TASK_OUTPUT__SMA) {
71,957✔
1376
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->outputInfo.smaSink.smaId));
780!
1377
  } else if (pTask->outputInfo.type == TASK_OUTPUT__FETCH) {
71,567!
1378
    TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->outputInfo.fetchSink.reserved));
×
1379
  } else if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
71,567✔
1380
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->outputInfo.fixedDispatcher.taskId));
22,748!
1381
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->outputInfo.fixedDispatcher.nodeId));
22,748!
1382
    TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pTask->outputInfo.fixedDispatcher.epSet));
11,374!
1383
  } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
60,193✔
1384
    TAOS_CHECK_EXIT(tSerializeSUseDbRspImp(pEncoder, &pTask->outputInfo.shuffleDispatcher.dbInfo));
60,151!
1385
    TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pTask->outputInfo.shuffleDispatcher.stbFullName));
120,304!
1386
  }
1387
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->info.delaySchedParam));
283,060!
1388
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->subtableWithoutMd5));
283,060!
1389
  TAOS_CHECK_EXIT(tEncodeCStrWithLen(pEncoder, pTask->reserve, sizeof(pTask->reserve) - 1));
283,060!
1390

1391
  tEndEncode(pEncoder);
141,530✔
1392
_exit:
141,539✔
1393
  return code;
141,539✔
1394
}
1395

1396
int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
49,374✔
1397
  int32_t taskId = 0;
49,374✔
1398
  int32_t code = 0;
49,374✔
1399
  int32_t lino;
1400

1401
  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
49,374!
1402
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->ver));
98,760!
1403
  if (pTask->ver <= SSTREAM_TASK_INCOMPATIBLE_VER || pTask->ver > SSTREAM_TASK_VER) {
49,379!
1404
    TAOS_CHECK_EXIT(TSDB_CODE_INVALID_MSG);
5!
1405
  }
1406

1407
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->id.streamId));
98,757!
1408
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->id.taskId));
98,754!
1409
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->info.trigger));
98,749!
1410
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->info.taskLevel));
98,750!
1411
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->outputInfo.type));
98,754!
1412
  TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pTask->msgInfo.msgType));
98,751!
1413

1414
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->status.taskStatus));
98,743!
1415
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->status.schedStatus));
98,744!
1416

1417
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->info.selfChildId));
98,749!
1418
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->info.nodeId));
98,748!
1419
  TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pTask->info.epSet));
49,374!
1420
  TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pTask->info.mnodeEpset));
49,378!
1421

1422
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->chkInfo.checkpointId));
98,758!
1423
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->chkInfo.checkpointVer));
98,759!
1424
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->info.fillHistory));
98,751!
1425

1426
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->hTaskInfo.id.streamId));
98,746!
1427
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &taskId));
49,376!
1428
  pTask->hTaskInfo.id.taskId = taskId;
49,376✔
1429

1430
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->streamTaskId.streamId));
98,751!
1431
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &taskId));
49,373!
1432
  pTask->streamTaskId.taskId = taskId;
49,373✔
1433

1434
  TAOS_CHECK_EXIT(tDecodeU64(pDecoder, (uint64_t*)&pTask->dataRange.range.minVer));
98,750!
1435
  TAOS_CHECK_EXIT(tDecodeU64(pDecoder, (uint64_t*)&pTask->dataRange.range.maxVer));
98,751!
1436
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->dataRange.window.skey));
98,747!
1437
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->dataRange.window.ekey));
98,743!
1438

1439
  int32_t epSz = -1;
49,370✔
1440
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &epSz) < 0);
49,375!
1441

1442
  if ((pTask->upstreamInfo.pList = taosArrayInit(epSz, POINTER_BYTES)) == NULL) {
49,375!
1443
    TAOS_CHECK_EXIT(terrno);
×
1444
  }
1445
  for (int32_t i = 0; i < epSz; i++) {
116,772✔
1446
    SStreamUpstreamEpInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamUpstreamEpInfo));
67,394!
1447
    if (pInfo == NULL) {
67,386!
1448
      TAOS_CHECK_EXIT(terrno);
×
1449
    }
1450
    if ((code = tDecodeStreamEpInfo(pDecoder, pInfo)) < 0) {
67,386!
1451
      taosMemoryFreeClear(pInfo);
×
1452
      goto _exit;
×
1453
    }
1454
    if (taosArrayPush(pTask->upstreamInfo.pList, &pInfo) == NULL) {
134,784!
1455
      TAOS_CHECK_EXIT(terrno);
×
1456
    }
1457
  }
1458

1459
  if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
49,378✔
1460
    TAOS_CHECK_EXIT(tDecodeCStrAlloc(pDecoder, &pTask->exec.qmsg));
52,136!
1461
  }
1462

1463
  if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) {
49,380✔
1464
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->outputInfo.tbSink.stbUid));
48,521!
1465
    TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pTask->outputInfo.tbSink.stbFullName));
24,261!
1466
    pTask->outputInfo.tbSink.pSchemaWrapper = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
24,259!
1467
    if (pTask->outputInfo.tbSink.pSchemaWrapper == NULL) {
24,257!
1468
      TAOS_CHECK_EXIT(terrno);
×
1469
    }
1470
    TAOS_CHECK_EXIT(tDecodeSSchemaWrapper(pDecoder, pTask->outputInfo.tbSink.pSchemaWrapper));
48,502!
1471
  } else if (pTask->outputInfo.type == TASK_OUTPUT__SMA) {
25,120✔
1472
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->outputInfo.smaSink.smaId));
274!
1473
  } else if (pTask->outputInfo.type == TASK_OUTPUT__FETCH) {
24,983!
1474
    TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->outputInfo.fetchSink.reserved));
×
1475
  } else if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
24,983✔
1476
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->outputInfo.fixedDispatcher.taskId));
7,594!
1477
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->outputInfo.fixedDispatcher.nodeId));
7,594!
1478
    TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pTask->outputInfo.fixedDispatcher.epSet));
3,797!
1479
  } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
21,186✔
1480
    TAOS_CHECK_EXIT(tDeserializeSUseDbRspImp(pDecoder, &pTask->outputInfo.shuffleDispatcher.dbInfo));
21,165!
1481
    TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pTask->outputInfo.shuffleDispatcher.stbFullName));
21,165!
1482
  }
1483
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->info.delaySchedParam));
98,737!
1484
  if (pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER) {
49,374!
1485
    TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->subtableWithoutMd5));
98,751!
1486
  }
1487
  TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pTask->reserve));
49,373!
1488

1489
  tEndDecode(pDecoder);
49,377✔
1490

1491
_exit:
49,377✔
1492
  return code;
49,377✔
1493
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc