• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3656

14 Mar 2025 08:10AM UTC coverage: 62.841% (+3.3%) from 59.532%
#3656

push

travis-ci

web-flow
feat(keep): support keep on super table level. (#30097)

* Feat: support use keep while create super table.

* Test(keep): add test for create super table with keep option.

* Feat(keep): Add tmsg for create keep.

* Feat(keep): support alter table option keep.

* Fix(keep): Add basic test for alter table option.

* Fix(keep): memory leak.

* Feat(keep): add keep to metaEntry&metaCache and fix earliestTs with stb keep.

* Test(keep): add some cases for select with stb keep.

* Fix: fix ci core while alter stb.

* Feat(keep): delete expired data in super table level.

* Feat: remove get stb keep while query.

* Fix : build error.

* Revert "Fix : build error."

This reverts commit 0ed66e4e8.

* Revert "Feat(keep): delete expired data in super table level."

This reverts commit 36330f6b4.

* Fix : build errors.

* Feat : support restart taosd.

* Fix : alter table comment problems.

* Test : add tests for super table keep.

* Fix: change sdb stb reserve size.

* Test: add more tests.

* Feat: Disable normal tables and sub tables from setting the keep parameter

* Fix: add more checks to avoid unknown address.

* Docs: Add docs for stable keep.

* Fix: some review changes.

* Fix: review errors.

147682 of 302527 branches covered (48.82%)

Branch coverage included in aggregate %.

88 of 99 new or added lines in 12 files covered. (88.89%)

3177 existing lines in 34 files now uncovered.

232747 of 302857 relevant lines covered (76.85%)

5880306.3 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.98
/source/libs/stream/src/streamTask.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executor.h"
17
#include "osDir.h"
18
#include "osMemory.h"
19
#include "streamInt.h"
20
#include "streamsm.h"
21
#include "tmisce.h"
22
#include "tstream.h"
23
#include "ttimer.h"
24
#include "wal.h"
25
#include "streamMsg.h"
26

27
static void streamTaskDestroyUpstreamInfo(SUpstreamInfo* pUpstreamInfo);
28
static int32_t streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdated);
29
static int32_t streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdate);
30
static void streamTaskDestroyActiveChkptInfo(SActiveCheckpointInfo* pInfo);
31

32
// Append pTask to pArray. The task's selfChildId is set to the number of entries the array held
// before the push. Returns TSDB_CODE_SUCCESS, or terrno when the push fails.
static int32_t addToTaskset(SArray* pArray, SStreamTask* pTask) {
  pTask->info.selfChildId = (int32_t)taosArrayGetSize(pArray);

  if (taosArrayPush(pArray, &pTask) == NULL) {
    return terrno;
  }

  return TSDB_CODE_SUCCESS;
}
38

39
/*
 * Apply the new endpoint set of node `nodeId` to one task.
 *
 * 1. If the task itself runs on `nodeId`, its own epset is replaced when it differs from pEpSet.
 * 2. Depending on the task level, the downstream (dispatch) info and/or the upstream info that
 *    refer to `nodeId` are refreshed as well.
 *
 * *pUpdated is set to true whenever any stored epset was actually changed; it is never reset to
 * false here. Returns 0 on success, otherwise the first error code that was encountered.
 */
static int32_t doUpdateTaskEpset(SStreamTask* pTask, int32_t nodeId, SEpSet* pEpSet, bool* pUpdated) {
  int32_t code = 0;
  char    buf[512] = {0};

  if (pTask->info.nodeId == nodeId) {  // execution task should be moved away
    bool isEqual = isEpsetEqual(&pTask->info.epSet, pEpSet);
    code = epsetToStr(pEpSet, buf, tListLen(buf));
    if (code) {  // the string is only used for logging, but a conversion failure aborts the update
      stError("%s failed to convert epset to str, code:%s", pTask->id.idStr, tstrerror(code));
      return code;
    }

    if (!isEqual) {
      (*pUpdated) = true;
      char tmp[512] = {0};
      code = epsetToStr(&pTask->info.epSet, tmp, tListLen(tmp));  // old epset, only for the log line below
      if (code) {
        stError("%s failed to convert epset to str, code:%s", pTask->id.idStr, tstrerror(code));
        return code;
      }

      epsetAssign(&pTask->info.epSet, pEpSet);
      stDebug("s-task:0x%x (vgId:%d) self node epset is updated %s, old:%s", pTask->id.taskId, nodeId, buf, tmp);
    } else {
      stDebug("s-task:0x%x (vgId:%d) not updated task epset, since epset identical, %s", pTask->id.taskId, nodeId, buf);
    }
  }

  // check for the dispatch info and the upstream task info
  int32_t level = pTask->info.taskLevel;
  if (level == TASK_LEVEL__SOURCE) {
    code = streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet, pUpdated);
  } else if (level == TASK_LEVEL__AGG) {
    // An agg task has both an upstream and a downstream side. Both are attempted, and the first
    // failure is reported instead of being overwritten by the result of the second call.
    int32_t upCode = streamTaskUpdateUpstreamInfo(pTask, nodeId, pEpSet, pUpdated);
    int32_t downCode = streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet, pUpdated);
    code = (upCode != 0) ? upCode : downCode;
  } else {  // TASK_LEVEL__SINK
    code = streamTaskUpdateUpstreamInfo(pTask, nodeId, pEpSet, pUpdated);
  }

  return code;
}
80

81
static void freeItem(void* p) {
×
82
  SStreamContinueExecInfo* pInfo = p;
×
83
  rpcFreeCont(pInfo->msg.pCont);
×
84
}
×
85

86
static void freeUpstreamItem(void* p) {
74,916✔
87
  SStreamUpstreamEpInfo** pInfo = p;
74,916✔
88
  taosMemoryFree(*pInfo);
74,916!
89
}
74,925✔
90

91
static SStreamUpstreamEpInfo* createStreamTaskEpInfo(const SStreamTask* pTask) {
18,533✔
92
  SStreamUpstreamEpInfo* pEpInfo = taosMemoryMalloc(sizeof(SStreamUpstreamEpInfo));
18,533!
93
  if (pEpInfo == NULL) {
18,533!
94
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
95
    return NULL;
×
96
  }
97

98
  pEpInfo->childId = pTask->info.selfChildId;
18,533✔
99
  pEpInfo->epSet = pTask->info.epSet;
18,533✔
100
  pEpInfo->nodeId = pTask->info.nodeId;
18,533✔
101
  pEpInfo->taskId = pTask->id.taskId;
18,533✔
102
  pEpInfo->stage = -1;
18,533✔
103

104
  return pEpInfo;
18,533✔
105
}
106

107
int32_t tNewStreamTask(int64_t streamId, int8_t taskLevel, SEpSet* pEpset, bool fillHistory, int32_t trigger,
12,839✔
108
                       int64_t triggerParam, SArray* pTaskList, bool hasFillhistory, int8_t subtableWithoutMd5,
109
                       SStreamTask** p) {
110
  *p = NULL;
12,839✔
111

112
  SStreamTask* pTask = (SStreamTask*)taosMemoryCalloc(1, sizeof(SStreamTask));
12,839!
113
  if (pTask == NULL) {
12,839!
114
    stError("s-task:0x%" PRIx64 " failed malloc new stream task, size:%d, code:%s", streamId,
×
115
            (int32_t)sizeof(SStreamTask), tstrerror(terrno));
116
    return terrno;
×
117
  }
118

119
  pTask->ver = SSTREAM_TASK_VER;
12,839✔
120
  pTask->id.taskId = tGenIdPI32();
12,839✔
121
  pTask->id.streamId = streamId;
12,839✔
122

123
  pTask->info.taskLevel = taskLevel;
12,839✔
124
  pTask->info.fillHistory = fillHistory;
12,839✔
125
  pTask->info.trigger = trigger;
12,839✔
126
  pTask->info.delaySchedParam = triggerParam;
12,839✔
127
  pTask->subtableWithoutMd5 = subtableWithoutMd5;
12,839✔
128

129
  int32_t code = streamCreateStateMachine(pTask);
12,839✔
130
  if (pTask->status.pSM == NULL || code != TSDB_CODE_SUCCESS) {
12,839!
131
    taosMemoryFreeClear(pTask);
×
132
    return code;
×
133
  }
134

135
  char    buf[128] = {0};
12,839✔
136
  int32_t ret = snprintf(buf, tListLen(buf), "0x%" PRIx64 "-0x%x", pTask->id.streamId, pTask->id.taskId);
12,839✔
137
  if (ret < 0 || ret >= tListLen(buf)) {
12,839!
138
    stError("s-task:0x%x failed to set the taskIdstr, code: out of buffer", pTask->id.taskId);
×
139
    return TSDB_CODE_OUT_OF_BUFFER;
×
140
  }
141

142
  pTask->id.idStr = taosStrdup(buf);
12,839!
143
  if (pTask->id.idStr == NULL) {
12,839!
144
    stError("s-task:0x%x failed to build task id, code: out of memory", pTask->id.taskId);
×
145
    return terrno;
×
146
  }
147

148
  pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
12,839✔
149
  pTask->status.taskStatus = fillHistory ? TASK_STATUS__SCAN_HISTORY : TASK_STATUS__READY;
12,839✔
150
  pTask->inputq.status = TASK_INPUT_STATUS__NORMAL;
12,839✔
151
  pTask->outputq.status = TASK_OUTPUT_STATUS__NORMAL;
12,839✔
152

153
  pTask->taskCheckInfo.pList = taosArrayInit(4, sizeof(SDownstreamStatusInfo));
12,839✔
154

155
  if (fillHistory && !hasFillhistory) {
12,839!
156
    stError("s-task:0x%x create task failed, due to inconsistent fill-history flag", pTask->id.taskId);
×
157
    return TSDB_CODE_INVALID_PARA;
×
158
  }
159

160
  epsetAssign(&(pTask->info.mnodeEpset), pEpset);
12,839✔
161

162
  code = addToTaskset(pTaskList, pTask);
12,839✔
163
  *p = pTask;
12,839✔
164

165
  return code;
12,839✔
166
}
167

168
/*
 * Decode only the checkpoint-related fields of an encoded stream task.
 *
 * Fills pChkpInfo->msgVer, pChkpInfo->checkpointId and pChkpInfo->checkpointVer. Every field that
 * sits between the message version and the checkpoint fields is decoded into a scratch variable
 * and thrown away. No version compatibility check is performed here.
 *
 * Returns 0 on success and -1 as soon as any decode step fails.
 */
int32_t tDecodeStreamTaskChkInfo(SDecoder* pDecoder, SCheckpointInfo* pChkpInfo) {
  // scratch targets for values that are read only to advance the decoder
  int64_t ignoredI64;
  int32_t ignoredI32;
  int16_t ignoredI16;
  int8_t  ignoredI8;
  SEpSet  ignoredEpSet;

  if (tStartDecode(pDecoder) < 0) {
    return -1;
  }

  if (tDecodeI64(pDecoder, &pChkpInfo->msgVer) < 0) {
    return -1;
  }

  // skipped: i64, i32, i32, i8, i8, i16
  if (tDecodeI64(pDecoder, &ignoredI64) < 0 || tDecodeI32(pDecoder, &ignoredI32) < 0 ||
      tDecodeI32(pDecoder, &ignoredI32) < 0 || tDecodeI8(pDecoder, &ignoredI8) < 0 ||
      tDecodeI8(pDecoder, &ignoredI8) < 0 || tDecodeI16(pDecoder, &ignoredI16) < 0) {
    return -1;
  }

  // skipped: i8, i8
  if (tDecodeI8(pDecoder, &ignoredI8) < 0 || tDecodeI8(pDecoder, &ignoredI8) < 0) {
    return -1;
  }

  // skipped: i32, i32, epset, epset
  if (tDecodeI32(pDecoder, &ignoredI32) < 0 || tDecodeI32(pDecoder, &ignoredI32) < 0 ||
      tDecodeSEpSet(pDecoder, &ignoredEpSet) < 0 || tDecodeSEpSet(pDecoder, &ignoredEpSet) < 0) {
    return -1;
  }

  // the fields the caller is actually interested in
  if (tDecodeI64(pDecoder, &pChkpInfo->checkpointId) < 0 || tDecodeI64(pDecoder, &pChkpInfo->checkpointVer) < 0) {
    return -1;
  }

  tEndDecode(pDecoder);
  return 0;
}
200

201
/*
 * Decode the stream id and task id from the head of an encoded stream task.
 *
 * Returns 0 on success. Returns -1 if any decode step fails or if the encoded version is not
 * greater than SSTREAM_TASK_INCOMPATIBLE_VER.
 */
int32_t tDecodeStreamTaskId(SDecoder* pDecoder, STaskId* pTaskId) {
  int64_t encodedVer;
  int32_t decodedTaskId = 0;

  if (tStartDecode(pDecoder) < 0 || tDecodeI64(pDecoder, &encodedVer) < 0) {
    return -1;
  }

  if (encodedVer <= SSTREAM_TASK_INCOMPATIBLE_VER) {
    return -1;
  }

  if (tDecodeI64(pDecoder, &pTaskId->streamId) < 0 || tDecodeI32(pDecoder, &decodedTaskId) < 0) {
    return -1;
  }

  // the task id is stored as a 32-bit value and assigned to the STaskId member afterwards
  pTaskId->taskId = decodedTaskId;

  tEndDecode(pDecoder);
  return 0;
}
216

217
void tFreeStreamTask(void* pParam) {
52,246✔
218
  char*        p = NULL;
52,246✔
219
  SStreamTask* pTask = pParam;
52,246✔
220
  int32_t      taskId = pTask->id.taskId;
52,246✔
221

222
  STaskExecStatisInfo* pStatis = &pTask->execInfo;
52,246✔
223

224
  ETaskStatus status1 = TASK_STATUS__UNINIT;
52,246✔
225
  if (pTask->status.pSM != NULL) {
52,246✔
226
    streamMutexLock(&pTask->lock);
26,134✔
227
    SStreamTaskState status = streamTaskGetStatus(pTask);
26,138✔
228
    p = status.name;
26,130✔
229
    status1 = status.state;
26,130✔
230
    streamMutexUnlock(&pTask->lock);
26,130✔
231
  }
232

233
  stDebug("start to free s-task:0x%x %p, state:%s, refId:%" PRId64, taskId, pTask, p, pTask->id.refId);
52,252✔
234

235
  SCheckpointInfo* pCkInfo = &pTask->chkInfo;
52,252✔
236
  stDebug("s-task:0x%x task exec summary: create:%" PRId64 ", init:%" PRId64 ", start:%" PRId64
52,252✔
237
          ", updateCount:%d latestUpdate:%" PRId64 ", latestCheckPoint:%" PRId64 ", ver:%" PRId64
238
          " nextProcessVer:%" PRId64 ", checkpointCount:%d",
239
          taskId, pStatis->created, pStatis->checkTs, pStatis->readyTs, pStatis->updateCount, pStatis->latestUpdateTs,
240
          pCkInfo->checkpointId, pCkInfo->checkpointVer, pCkInfo->nextProcessVer, pStatis->checkpoint);
241

242
  if (pTask->schedInfo.pDelayTimer != NULL) {
52,252✔
243
    streamTmrStop(pTask->schedInfo.pDelayTimer);
1,296✔
244
    pTask->schedInfo.pDelayTimer = NULL;
1,296✔
245
  }
246

247
  if (pTask->hTaskInfo.pTimer != NULL) {
52,252✔
248
    streamTmrStop(pTask->hTaskInfo.pTimer);
1,769✔
249
    pTask->hTaskInfo.pTimer = NULL;
1,767✔
250
  }
251

252
  if (pTask->msgInfo.pRetryTmr != NULL) {
52,250✔
253
    streamTmrStop(pTask->msgInfo.pRetryTmr);
4,940✔
254
    pTask->msgInfo.pRetryTmr = NULL;
4,941✔
255
  }
256

257
  if (pTask->inputq.queue) {
52,251✔
258
    streamQueueClose(pTask->inputq.queue, pTask->id.taskId);
13,272✔
259
    pTask->inputq.queue = NULL;
13,264✔
260
  }
261

262
  if (pTask->outputq.queue) {
52,243✔
263
    streamQueueClose(pTask->outputq.queue, pTask->id.taskId);
13,270✔
264
    pTask->outputq.queue = NULL;
13,271✔
265
  }
266

267
  if (pTask->exec.qmsg) {
52,244✔
268
    taosMemoryFree(pTask->exec.qmsg);
27,351!
269
  }
270

271
  if (pTask->exec.pExecutor) {
52,244✔
272
    qDestroyTask(pTask->exec.pExecutor);
6,834✔
273
    pTask->exec.pExecutor = NULL;
6,833✔
274
  }
275

276
  if (pTask->exec.pWalReader != NULL) {
52,243✔
277
    walCloseReader(pTask->exec.pWalReader);
6,697✔
278
    pTask->exec.pWalReader = NULL;
6,698✔
279
  }
280

281
  streamClearChkptReadyMsg(pTask->chkInfo.pActiveInfo);
52,244✔
282

283
  if (pTask->msgInfo.pData != NULL) {
52,256✔
284
    clearBufferedDispatchMsg(pTask);
27✔
285
  }
286

287
  if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) {
52,257✔
288
    tDeleteSchemaWrapper(pTask->outputInfo.tbSink.pSchemaWrapper);
25,631!
289
    taosMemoryFree(pTask->outputInfo.tbSink.pTSchema);
25,633!
290
    tSimpleHashCleanup(pTask->outputInfo.tbSink.pTbInfo);
25,630✔
291
    tDeleteSchemaWrapper(pTask->outputInfo.tbSink.pTagSchema);
25,635✔
292
  } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
26,626✔
293
    taosArrayDestroy(pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos);
22,984✔
294
  }
295

296
  streamTaskCleanupCheckInfo(&pTask->taskCheckInfo);
52,259✔
297
  streamFreeTaskState(pTask, pTask->status.removeBackendFiles ? 1 : 0);
52,261✔
298

299
  if (pTask->pNameMap) {
52,256✔
300
    tSimpleHashCleanup(pTask->pNameMap);
2,196✔
301
  }
302

303
  if (pTask->status.pSM != NULL) {
52,256✔
304
    streamMutexDestroy(&pTask->lock);
26,141✔
305
    streamMutexDestroy(&pTask->msgInfo.lock);
26,138✔
306
    streamMutexDestroy(&pTask->taskCheckInfo.checkInfoLock);
26,136✔
307
  }
308

309
  streamDestroyStateMachine(pTask->status.pSM);
52,248✔
310
  pTask->status.pSM = NULL;
52,262✔
311

312
  streamTaskDestroyUpstreamInfo(&pTask->upstreamInfo);
52,262✔
313

314
  taosMemoryFree(pTask->outputInfo.pTokenBucket);
52,261!
315

316
  taosArrayDestroy(pTask->msgInfo.pSendInfo);
52,262✔
317
  pTask->msgInfo.pSendInfo = NULL;
52,261✔
318

319
  taosArrayDestroy(pTask->outputInfo.pNodeEpsetUpdateList);
52,261✔
320
  pTask->outputInfo.pNodeEpsetUpdateList = NULL;
52,260✔
321

322
  if (pTask->id.idStr != NULL) {
52,260✔
323
    taosMemoryFree((void*)pTask->id.idStr);
26,106!
324
  }
325

326
  streamTaskDestroyActiveChkptInfo(pTask->chkInfo.pActiveInfo);
52,262✔
327
  pTask->chkInfo.pActiveInfo = NULL;
52,262✔
328

329
  taosArrayDestroyP(pTask->notifyInfo.pNotifyAddrUrls, NULL);
52,262✔
330
  taosMemoryFreeClear(pTask->notifyInfo.streamName);
52,263!
331
  taosMemoryFreeClear(pTask->notifyInfo.stbFullName);
52,263!
332
  tDeleteSchemaWrapper(pTask->notifyInfo.pSchemaWrapper);
52,263!
333

334
  pTask->notifyEventStat = (STaskNotifyEventStat){0};
52,257✔
335

336
  taosMemoryFree(pTask);
52,257!
337
  stDebug("s-task:0x%x free task completed", taskId);
52,262✔
338
}
52,262✔
339

340
/*
 * Release the state/backend of a task.
 *
 * With an open state: the state is closed (passing `remove` through), the backend is flagged for
 * file clearing when `remove` is non-zero, and the backend reference is dropped.
 * Without an open state: the backend directory is deleted when `remove` is non-zero and a backend
 * path is known.
 * In both cases the backendPath string is freed and reset to NULL at the end.
 */
void streamFreeTaskState(SStreamTask* pTask, int8_t remove) {
  stDebug("s-task:0x%x start to free task state/backend", pTask->id.taskId);

  if (pTask->pState == NULL) {
    stDebug("s-task:0x%x task state is NULL, may del backend:%s", pTask->id.taskId,
            pTask->backendPath ? pTask->backendPath : "NULL");

    if (remove && pTask->backendPath != NULL) {
      stDebug("s-task:0x%x task state is NULL, do del backend:%s", pTask->id.taskId, pTask->backendPath);
      taosRemoveDir(pTask->backendPath);
    }
  } else {
    stDebug("s-task:0x%x start to free task state", pTask->id.taskId);
    streamStateClose(pTask->pState, remove);

    if (remove) {
      taskDbSetClearFileFlag(pTask->pBackend);
    }

    taskDbRemoveRef(pTask->pBackend);
    pTask->pBackend = NULL;
    pTask->pState = NULL;
  }

  if (pTask->backendPath != NULL) {
    taosMemoryFree(pTask->backendPath);
    pTask->backendPath = NULL;
  }
}
52,253✔
366

367
/*
 * Initialise checkpointVer / processedVer / nextProcessVer of a task.
 *
 * - A stream task without a related fill-history task starts from `ver`: the checkpoint and
 *   processed versions become ver - 1, the next version to process becomes ver, and the data
 *   range is collapsed to [ver, ver].
 * - A fill-history task derives the three versions from the upper bound of its data range.
 * - A stream task with a related fill-history task derives them from the lower bound of its data
 *   range; the range itself is set at the mnode.
 */
static void setInitialVersionInfo(SStreamTask* pTask, int64_t ver) {
  SCheckpointInfo* pCkpt = &pTask->chkInfo;
  SDataRange*      pDataRange = &pTask->dataRange;

  // only set the version info for stream tasks without fill-history task
  if ((pTask->info.fillHistory == 0) && (!HAS_RELATED_FILLHISTORY_TASK(pTask))) {
    pCkpt->checkpointVer = ver - 1;  // only update when generating checkpoint
    pCkpt->processedVer = ver - 1;   // already processed version
    pCkpt->nextProcessVer = ver;     // next processed version

    pDataRange->range.maxVer = ver;
    pDataRange->range.minVer = ver;
    return;
  }

  // fill-history task: continue right after the upper bound of its range
  if (pTask->info.fillHistory == 1) {
    pCkpt->checkpointVer = pDataRange->range.maxVer;
    pCkpt->processedVer = pDataRange->range.maxVer;
    pCkpt->nextProcessVer = pDataRange->range.maxVer + 1;
    return;
  }

  // stream task with a related fill-history task
  if (pDataRange->range.minVer == 0) {
    // for compatible purpose, remove it later: a lower bound of 0 must not yield -1
    pCkpt->checkpointVer = 0;
    pCkpt->processedVer = 0;
    pCkpt->nextProcessVer = 1;
    stDebug("s-task:%s update the processedVer to 0 from -1 due to compatible purpose", pTask->id.idStr);
  } else {
    pCkpt->checkpointVer = pDataRange->range.minVer - 1;
    pCkpt->processedVer = pDataRange->range.minVer - 1;
    pCkpt->nextProcessVer = pDataRange->range.minVer;
  }
}
13,254✔
402

403
/*
 * Build the backend path of a task as "<meta path><TD_DIRSEP>0x<streamId>-0x<taskId>" and store
 * it in pTask->backendPath. A fill-history task uses the ids stored in streamTaskId instead of
 * its own ids.
 *
 * The path is assembled in a local buffer and attached to the task only after it was formatted
 * successfully, so a truncated or invalid path can never be left in pTask->backendPath (which
 * this file later passes to taosRemoveDir).
 *
 * Returns 0 on success, TSDB_CODE_OUT_OF_BUFFER if formatting fails or does not fit, or terrno
 * when the allocation fails.
 */
int32_t streamTaskSetBackendPath(SStreamTask* pTask) {
  int64_t streamId = 0;
  int32_t taskId = 0;

  if (pTask->info.fillHistory) {
    streamId = pTask->streamTaskId.streamId;
    taskId = pTask->streamTaskId.taskId;
  } else {
    streamId = pTask->id.streamId;
    taskId = pTask->id.taskId;
  }

  char    id[128] = {0};
  int32_t nBytes = snprintf(id, tListLen(id), "0x%" PRIx64 "-0x%x", streamId, taskId);
  if (nBytes < 0 || nBytes >= (int32_t)sizeof(id)) {
    return TSDB_CODE_OUT_OF_BUFFER;
  }

  int32_t len = strlen(pTask->pMeta->path);
  int32_t cap = len + nBytes + 2;  // same size as before: len + nBytes + 2

  char* path = (char*)taosMemoryMalloc(cap);
  if (path == NULL) {
    return terrno;
  }

  int32_t code = snprintf(path, cap, "%s%s%s", pTask->pMeta->path, TD_DIRSEP, id);
  if (code < 0 || code >= cap) {
    stError("s-task:%s failed to set backend path:%s, code: out of buffer", pTask->id.idStr, path);
    taosMemoryFree(path);
    return TSDB_CODE_OUT_OF_BUFFER;
  }

  pTask->backendPath = path;
  stDebug("s-task:%s set backend path:%s", pTask->id.idStr, pTask->backendPath);
  return 0;
}
436

437
/*
 * Initialise the runtime part of a stream task: id string, input/output queues, state machine,
 * initial version info, locks, token bucket, bookkeeping arrays, active checkpoint info and the
 * backend path.
 *
 * pMeta and pMsgCb are stored in the task, ver is forwarded to setInitialVersionInfo.
 * Returns 0 on success or the error code of the first step that failed. Resources created before
 * a failing step stay attached to the task.
 * NOTE(review): presumably the caller releases them through tFreeStreamTask - confirm.
 */
int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, int64_t ver) {
  int32_t code = createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId, &pTask->id.idStr);
  if (code) {
    stError("0x%x failed create stream task id str, code:%s", pTask->id.taskId, tstrerror(code));
    return code;
  }

  pTask->id.refId = 0;
  pTask->inputq.status = TASK_INPUT_STATUS__NORMAL;
  pTask->outputq.status = TASK_OUTPUT_STATUS__NORMAL;

  int32_t code1 = streamQueueOpen(512 << 10, &pTask->inputq.queue);
  int32_t code2 = streamQueueOpen(512 << 10, &pTask->outputq.queue);
  if (code1 || code2) {
    stError("s-task:%s failed to prepare the input/output queue, initialize task failed", pTask->id.idStr);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;

  code = streamCreateStateMachine(pTask);
  if (pTask->status.pSM == NULL || code != TSDB_CODE_SUCCESS) {
    stError("s-task:%s failed create state-machine for stream task, initialization failed, code:%s", pTask->id.idStr,
            tstrerror(code));
    return code;
  }

  pTask->execInfo.created = taosGetTimestampMs();
  setInitialVersionInfo(pTask, ver);

  pTask->pMeta = pMeta;
  pTask->pMsgCb = pMsgCb;
  pTask->msgInfo.pSendInfo = taosArrayInit(4, sizeof(SDispatchEntry));
  if (pTask->msgInfo.pSendInfo == NULL) {
    stError("s-task:%s failed to create sendInfo struct for stream task, code:Out of memory", pTask->id.idStr);
    return terrno;
  }

  code = taosThreadMutexInit(&pTask->msgInfo.lock, NULL);
  if (code) {
    stError("s-task:0x%x failed to init msgInfo mutex, code:%s", pTask->id.taskId, tstrerror(code));
    return code;
  }

  // the task lock is recursive; the attribute object is only needed while the mutex is created
  // and is destroyed on every path once it has been initialised
  TdThreadMutexAttr attr = {0};
  code = taosThreadMutexAttrInit(&attr);
  if (code != 0) {
    stError("s-task:%s initElapsed mutex attr failed, code:%s", pTask->id.idStr, tstrerror(code));
    return code;
  }

  code = taosThreadMutexAttrSetType(&attr, PTHREAD_MUTEX_RECURSIVE);
  if (code != 0) {
    stError("s-task:%s set mutex attr recursive, code:%s", pTask->id.idStr, tstrerror(code));
    (void)taosThreadMutexAttrDestroy(&attr);
    return code;
  }

  code = taosThreadMutexInit(&pTask->lock, &attr);
  if (code) {
    (void)taosThreadMutexAttrDestroy(&attr);
    return code;
  }

  code = taosThreadMutexAttrDestroy(&attr);
  if (code) {
    return code;
  }

  streamTaskOpenAllUpstreamInput(pTask);

  STaskOutputInfo* pOutputInfo = &pTask->outputInfo;
  pOutputInfo->pTokenBucket = taosMemoryCalloc(1, sizeof(STokenBucket));
  if (pOutputInfo->pTokenBucket == NULL) {
    stError("s-task:%s failed to prepare the tokenBucket, code:%s", pTask->id.idStr, tstrerror(terrno));
    return terrno;
  }

  // token bucket for the output side, initialised with 35, 35 and tsSinkDataRate
  // NOTE(review): the comment previously here read "2MiB per second for sink task, 50 times sink
  // operator per second", which does not match the arguments passed below - confirm the intent.
  code = streamTaskInitTokenBucket(pOutputInfo->pTokenBucket, 35, 35, tsSinkDataRate, pTask->id.idStr);
  if (code) {
    return code;
  }

  pOutputInfo->pNodeEpsetUpdateList = taosArrayInit(4, sizeof(SDownstreamTaskEpset));
  if (pOutputInfo->pNodeEpsetUpdateList == NULL) {
    stError("s-task:%s failed to prepare downstreamUpdateList, code:%s", pTask->id.idStr, tstrerror(terrno));
    return terrno;
  }

  pTask->taskCheckInfo.pList = taosArrayInit(4, sizeof(SDownstreamStatusInfo));
  if (pTask->taskCheckInfo.pList == NULL) {
    stError("s-task:%s failed to prepare taskCheckInfo list, code:%s", pTask->id.idStr, tstrerror(terrno));
    return terrno;
  }

  code = taosThreadMutexInit(&pTask->taskCheckInfo.checkInfoLock, NULL);
  if (code) {
    return code;
  }

  // an already existing active checkpoint info is kept as it is
  if (pTask->chkInfo.pActiveInfo == NULL) {
    code = streamTaskCreateActiveChkptInfo(&pTask->chkInfo.pActiveInfo);
    if (code) {
      stError("s-task:%s failed to create active checkpoint info, code:%s", pTask->id.idStr, tstrerror(code));
      return code;
    }
  }

  return streamTaskSetBackendPath(pTask);
}
547

548
int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask) {
82,190✔
549
  if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
82,190✔
550
    return 0;
6,189✔
551
  }
552

553
  int32_t type = pTask->outputInfo.type;
76,001✔
554
  if (type == TASK_OUTPUT__TABLE) {
76,001✔
555
    return 0;
257✔
556
  } else if (type == TASK_OUTPUT__FIXED_DISPATCH) {
75,744✔
557
    return 1;
7,751✔
558
  } else {
559
    SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
67,993✔
560
    return taosArrayGetSize(vgInfo);
67,993✔
561
  }
562
}
563

564
int32_t streamTaskGetNumOfUpstream(const SStreamTask* pTask) { return taosArrayGetSize(pTask->upstreamInfo.pList); }
13,736✔
565

566
/*
 * Register pUpstreamTask as an upstream of pTask.
 *
 * An endpoint descriptor is created from pUpstreamTask and appended to pTask->upstreamInfo.pList;
 * the list is created on first use. On success the list owns the descriptor. On any failure the
 * descriptor is freed here and an error code is returned.
 */
int32_t streamTaskSetUpstreamInfo(SStreamTask* pTask, const SStreamTask* pUpstreamTask) {
  int32_t code = TSDB_CODE_SUCCESS;

  SStreamUpstreamEpInfo* pEpInfo = createStreamTaskEpInfo(pUpstreamTask);
  if (pEpInfo == NULL) {
    return terrno;
  }

  if (pTask->upstreamInfo.pList == NULL) {
    pTask->upstreamInfo.pList = taosArrayInit(4, POINTER_BYTES);
    if (pTask->upstreamInfo.pList == NULL) {
      code = terrno;  // capture before the free below can touch it
      goto _error;
    }
  }

  if (taosArrayPush(pTask->upstreamInfo.pList, &pEpInfo) == NULL) {
    code = terrno;
    goto _error;
  }

  return TSDB_CODE_SUCCESS;

_error:
  // the descriptor never made it into the list, so nobody else will release it
  taosMemoryFree(pEpInfo);
  return (code != TSDB_CODE_SUCCESS) ? code : TSDB_CODE_OUT_OF_MEMORY;
}
579

580
/*
 * Replace the epset of the first upstream entry that lives on node `nodeId`.
 *
 * *pUpdated is set to true only when the stored epset differs from pEpSet. The string forms of
 * the epsets are used for logging, but a failed conversion still aborts and returns the error.
 * Returns 0 when nothing failed, including when no upstream entry matches nodeId.
 */
int32_t streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdated) {
  char    newEpStr[512] = {0};
  int32_t code = epsetToStr(pEpSet, newEpStr, tListLen(newEpStr));
  if (code != 0) {
    stError("%s failed to convert epset to str, code:%s", pTask->id.idStr, tstrerror(code));
    return code;
  }

  int32_t total = taosArrayGetSize(pTask->upstreamInfo.pList);
  for (int32_t idx = 0; idx < total; ++idx) {
    SStreamUpstreamEpInfo* pUpstream = taosArrayGetP(pTask->upstreamInfo.pList, idx);
    if (pUpstream->nodeId != nodeId) {
      continue;
    }

    // only the first entry on this node is considered
    if (isEpsetEqual(&pUpstream->epSet, pEpSet)) {
      stDebug("s-task:0x%x not update upstreamInfo, since identical, task:0x%x(nodeId:%d) epset:%s", pTask->id.taskId,
              pUpstream->taskId, nodeId, newEpStr);
      break;
    }

    *pUpdated = true;

    char oldEpStr[512] = {0};
    code = epsetToStr(&pUpstream->epSet, oldEpStr, tListLen(oldEpStr));
    if (code != 0) {
      stError("%s failed to convert epset to str, code:%s", pTask->id.idStr, tstrerror(code));
      return code;
    }

    epsetAssign(&pUpstream->epSet, pEpSet);
    stDebug("s-task:0x%x update the upstreamInfo taskId:0x%x(nodeId:%d) newEpset:%s old:%s", pTask->id.taskId,
            pUpstream->taskId, nodeId, newEpStr, oldEpStr);
    break;
  }

  return code;
}
618

619
// Destroy the upstream list together with its entries and reset the closed counter.
// Nothing happens when no list exists.
void streamTaskDestroyUpstreamInfo(SUpstreamInfo* pUpstreamInfo) {
  if (pUpstreamInfo->pList == NULL) {
    return;
  }

  taosArrayDestroyEx(pUpstreamInfo->pList, freeUpstreamItem);
  pUpstreamInfo->numOfClosed = 0;
  pUpstreamInfo->pList = NULL;
}
52,256✔
626

627
// Make pDownstreamTask the single, fixed dispatch target of pTask: its task id, node id and epset
// are copied into the fixed dispatcher, then the output type and dispatch message type are set.
void streamTaskSetFixedDownstreamInfo(SStreamTask* pTask, const SStreamTask* pDownstreamTask) {
  STaskDispatcherFixed* pFixed = &pTask->outputInfo.fixedDispatcher;

  pFixed->epSet = pDownstreamTask->info.epSet;
  pFixed->nodeId = pDownstreamTask->info.nodeId;
  pFixed->taskId = pDownstreamTask->id.taskId;

  pTask->outputInfo.type = TASK_OUTPUT__FIXED_DISPATCH;
  pTask->msgInfo.msgType = TDMT_STREAM_TASK_DISPATCH;
}
842✔
636

637
/*
 * Refresh the epset of the downstream target that lives on node `nodeId`.
 *
 * Shuffle dispatch: the first vgroup whose vgId equals nodeId is examined.
 * Fixed dispatch:   the single dispatcher is examined when its nodeId matches.
 * Other output types are left untouched.
 *
 * *pUpdated is set to true only when a stored epset differs from pEpSet. The string forms are
 * used for logging, but a failed conversion still aborts and returns the error. Returns 0 when
 * nothing failed.
 */
int32_t streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdated) {
  char    newEpStr[512] = {0};
  int32_t code = epsetToStr(pEpSet, newEpStr, tListLen(newEpStr));
  if (code != 0) {
    stError("%s failed to convert epset to str, code:%s", pTask->id.idStr, tstrerror(code));
    return code;
  }

  int32_t selfId = pTask->id.taskId;
  int8_t  outputType = pTask->outputInfo.type;

  if (outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
    SArray* pVgList = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;

    for (int32_t idx = 0; idx < taosArrayGetSize(pVgList); idx++) {
      SVgroupInfo* pVg = taosArrayGet(pVgList, idx);
      if (pVg == NULL || pVg->vgId != nodeId) {
        continue;
      }

      // first vgroup on this node; the loop ends after it has been handled
      if (isEpsetEqual(&pVg->epSet, pEpSet)) {
        stDebug("s-task:0x%x not update dispatch info, since identical, task:0x%x(nodeId:%d) epset:%s", selfId,
                pVg->taskId, nodeId, newEpStr);
        break;
      }

      *pUpdated = true;

      char oldEpStr[512] = {0};
      code = epsetToStr(&pVg->epSet, oldEpStr, tListLen(oldEpStr));
      if (code != 0) {
        stError("%s failed to convert epset to str, code:%s", pTask->id.idStr, tstrerror(code));
        return code;
      }

      epsetAssign(&pVg->epSet, pEpSet);
      stDebug("s-task:0x%x update dispatch info, task:0x%x(nodeId:%d) newEpset:%s old:%s", selfId, pVg->taskId,
              nodeId, newEpStr, oldEpStr);
      break;
    }
  } else if (outputType == TASK_OUTPUT__FIXED_DISPATCH) {
    STaskDispatcherFixed* pFixed = &pTask->outputInfo.fixedDispatcher;

    if (pFixed->nodeId == nodeId) {
      if (isEpsetEqual(&pFixed->epSet, pEpSet)) {
        stDebug("s-task:0x%x not update dispatch info, since identical, task:0x%x(nodeId:%d) epset:%s", selfId,
                pFixed->taskId, nodeId, newEpStr);
      } else {
        *pUpdated = true;

        char oldEpStr[512] = {0};
        code = epsetToStr(&pFixed->epSet, oldEpStr, tListLen(oldEpStr));
        if (code != 0) {
          stError("%s failed to convert epset to str, code:%s", pTask->id.idStr, tstrerror(code));
          return code;
        }

        epsetAssign(&pFixed->epSet, pEpSet);
        stDebug("s-task:0x%x update dispatch info, task:0x%x(nodeId:%d) newEpset:%s old:%s", selfId, pFixed->taskId,
                nodeId, newEpStr, oldEpStr);
      }
    }
  }

  return code;
}
705

706
/*
 * Stop a task: send the STOP event to its state machine, kill the attached executor (non-sink
 * tasks only), then poll every 100ms until the task reports idle.
 *
 * Returns the error of the STOP event immediately when that fails. Otherwise returns the last
 * value of `code`, which is the result of qKillTask when an executor was killed.
 */
int32_t streamTaskStop(SStreamTask* pTask) {
  int32_t     vgId = pTask->pMeta->vgId;
  int64_t     startTs = taosGetTimestampMs();
  const char* idStr = pTask->id.idStr;

  int32_t code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_STOP);
  if (code) {
    stError("failed to handle STOP event, s-task:%s, code:%s", idStr, tstrerror(code));
    return code;
  }

  bool hasExecutor = (pTask->info.taskLevel != TASK_LEVEL__SINK) && (pTask->exec.pExecutor != NULL);
  if (hasExecutor) {
    code = qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS, 5000);
    if (code != TSDB_CODE_SUCCESS) {
      // logged only; the wait below still runs
      stError("s-task:%s failed to kill task related query handle, code:%s", idStr, tstrerror(code));
    }
  }

  while (!streamTaskIsIdle(pTask)) {
    stDebug("s-task:%s level:%d wait for task to be idle and then close, check again in 100ms", idStr,
            pTask->info.taskLevel);
    taosMsleep(100);
  }

  int64_t elapsed = taosGetTimestampMs() - startTs;
  stDebug("vgId:%d s-task:%s is closed in %" PRId64 " ms", vgId, idStr, elapsed);
  return code;
}
734

735
// Apply every node update in pNodeList to this task's epsets via doUpdateTaskEpset.
// Also bumps the task's update counter and refreshes latestUpdateTs.
// Returns the "updated" flag that doUpdateTaskEpset writes; per-node failures are only logged.
bool streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList) {
  STaskExecStatisInfo* pStat = &pTask->execInfo;
  int32_t              total = taosArrayGetSize(pNodeList);

  int64_t lastTs = pStat->latestUpdateTs;
  pStat->latestUpdateTs = taosGetTimestampMs();
  pStat->updateCount += 1;

  stDebug("s-task:0x%x update task nodeEp epset, updatedNodes:%d, updateCount:%d, prevTs:%" PRId64, pTask->id.taskId,
          total, pStat->updateCount, lastTs);

  bool    changed = false;
  int32_t idx = 0;
  while (idx < total) {
    SNodeUpdateInfo* pUpdate = taosArrayGet(pNodeList, idx);
    idx += 1;

    if (pUpdate != NULL) {
      int32_t code = doUpdateTaskEpset(pTask, pUpdate->nodeId, &pUpdate->newEp, &changed);
      if (code != 0) {
        stError("s-task:0x%x failed to update the task nodeEp epset, code:%s", pTask->id.taskId, tstrerror(code));
      }
    }
  }

  return changed;
}
761

762
// Set the recorded stage of every upstream entry to -1. Source-level tasks are left untouched.
void streamTaskResetUpstreamStageInfo(SStreamTask* pTask) {
  if (pTask->info.taskLevel != TASK_LEVEL__SOURCE) {
    int32_t numOfUpstream = taosArrayGetSize(pTask->upstreamInfo.pList);

    int32_t idx = 0;
    while (idx < numOfUpstream) {
      SStreamUpstreamEpInfo* pUpstream = taosArrayGetP(pTask->upstreamInfo.pList, idx);
      pUpstream->stage = -1;
      ++idx;
    }

    stDebug("s-task:%s reset all upstream tasks stage info", pTask->id.idStr);
  }
}
775

776
// Mark every upstream entry as allowed to send data and reset the closed-upstream counter.
// Does nothing when the upstream list is empty.
void streamTaskOpenAllUpstreamInput(SStreamTask* pTask) {
  int32_t numOfUpstream = taosArrayGetSize(pTask->upstreamInfo.pList);

  if (numOfUpstream != 0) {
    int32_t idx = 0;
    while (idx < numOfUpstream) {
      SStreamUpstreamEpInfo* pUpstream = taosArrayGetP(pTask->upstreamInfo.pList, idx);
      pUpstream->dataAllowed = true;
      idx++;
    }

    pTask->upstreamInfo.numOfClosed = 0;
    stDebug("s-task:%s opening up inputQ for %d upstream tasks", pTask->id.idStr, numOfUpstream);
  }
}
790

791
// Stop accepting data from the upstream task identified by taskId.
// If that upstream is known and currently allowed, clear its dataAllowed flag and atomically
// increment numOfClosed, unless the counter has already reached the number of upstream tasks,
// in which case an error is logged instead.
void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId) {
  SStreamUpstreamEpInfo* pUpstream = NULL;
  streamTaskGetUpstreamTaskEpInfo(pTask, taskId, &pUpstream);

  if (pUpstream == NULL || !pUpstream->dataAllowed) {
    return;
  }

  pUpstream->dataAllowed = false;

  if (pTask->upstreamInfo.numOfClosed >= streamTaskGetNumOfUpstream(pTask)) {
    stError("s-task:%s not inc closed input, since they have been all closed already", pTask->id.idStr);
    return;
  }

  // the updated counter value is not needed here
  (void)atomic_add_fetch_32(&pTask->upstreamInfo.numOfClosed, 1);
}
5,938✔
804

805
// Re-open the input from the upstream task identified by taskId.
// Only acts when that upstream is known and currently blocked: numOfClosed is atomically
// decremented first, then the dataAllowed flag is set.
void streamTaskOpenUpstreamInput(SStreamTask* pTask, int32_t taskId) {
  SStreamUpstreamEpInfo* pUpstream = NULL;
  streamTaskGetUpstreamTaskEpInfo(pTask, taskId, &pUpstream);

  if (pUpstream == NULL || pUpstream->dataAllowed) {
    return;
  }

  int32_t remain = atomic_sub_fetch_32(&pTask->upstreamInfo.numOfClosed, 1);
  stDebug("s-task:%s open inputQ for upstream:0x%x, remain closed:%d", pTask->id.idStr, taskId, remain);
  pUpstream->dataAllowed = true;
}
1✔
815

816
// True when the closed-upstream counter equals the number of entries in the upstream list.
bool streamTaskIsAllUpstreamClosed(SStreamTask* pTask) {
  bool allClosed = (pTask->upstreamInfo.numOfClosed == taosArrayGetSize(pTask->upstreamInfo.pList));
  return allClosed;
}
819

820
// Under the task lock, move the schedule status from INACTIVE to WAITING.
// Returns true only if that transition was made by this call.
bool streamTaskSetSchedStatusWait(SStreamTask* pTask) {
  streamMutexLock(&pTask->lock);

  bool switched = (pTask->status.schedStatus == TASK_SCHED_STATUS__INACTIVE);
  if (switched) {
    pTask->status.schedStatus = TASK_SCHED_STATUS__WAITING;
  }

  streamMutexUnlock(&pTask->lock);
  return switched;
}
832

833
// Under the task lock, move the schedule status from WAITING to ACTIVE.
// Returns the schedule status observed before any change.
int8_t streamTaskSetSchedStatusActive(SStreamTask* pTask) {
  int8_t prev;

  streamMutexLock(&pTask->lock);
  prev = pTask->status.schedStatus;
  if (prev == TASK_SCHED_STATUS__WAITING) {
    pTask->status.schedStatus = TASK_SCHED_STATUS__ACTIVE;
  }
  streamMutexUnlock(&pTask->lock);

  return prev;
}
843

844
// Under the task lock, unconditionally set the schedule status to INACTIVE.
// Returns the schedule status that was in place before the call.
int8_t streamTaskSetSchedStatusInactive(SStreamTask* pTask) {
  int8_t prev;

  streamMutexLock(&pTask->lock);
  prev = pTask->status.schedStatus;
  pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
  streamMutexUnlock(&pTask->lock);

  return prev;
}
852

853
// For a fill-history task, detach it from its related stream task and persist that stream task.
// When resetRelHalt is non-zero, the stream task's persistent status is also set to READY.
// Returns 0 immediately if pTask is not a fill-history task; otherwise the acquire error, or the
// result of saving the stream task into the meta.
int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, int32_t resetRelHalt) {
  SStreamMeta* pMeta = pTask->pMeta;
  SStreamTask* pRelated = NULL;
  int32_t      code = 0;

  if (pTask->info.fillHistory == 0) {
    return code;
  }

  code = streamMetaAcquireTaskUnsafe(pMeta, &pTask->streamTaskId, &pRelated);
  if (code != 0) {
    return code;
  }

  stDebug("s-task:%s clear the related stream task:0x%x attr to fill-history task", pTask->id.idStr,
          (int32_t)pTask->streamTaskId.taskId);

  streamMutexLock(&(pRelated->lock));
  CLEAR_RELATED_FILLHISTORY_TASK(pRelated);

  if (resetRelHalt) {
    stDebug("s-task:0x%" PRIx64 " set the persistent status attr to be ready, prev:%s, status in sm:%s",
            pTask->streamTaskId.taskId, streamTaskGetStatusStr(pRelated->status.taskStatus),
            streamTaskGetStatus(pRelated).name);
    pRelated->status.taskStatus = TASK_STATUS__READY;
  }

  // persist while still holding the related task's lock
  code = streamMetaSaveTaskInMeta(pMeta, pRelated);
  streamMutexUnlock(&(pRelated->lock));

  streamMetaReleaseTask(pMeta, pRelated);
  return code;
}
885

886
// Build a drop-task request for (streamId, taskId) and put it on the write queue.
// Returns terrno if the request buffer cannot be allocated, else the tmsgPutToQueue result.
int32_t streamBuildAndSendDropTaskMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskId* pTaskId, int64_t resetRelHalt) {
  SVDropStreamTaskReq* pDropReq = rpcMallocCont(sizeof(SVDropStreamTaskReq));
  if (pDropReq == NULL) {
    return terrno;
  }

  pDropReq->head.vgId = vgId;
  pDropReq->streamId = pTaskId->streamId;
  pDropReq->taskId = pTaskId->taskId;
  pDropReq->resetRelHalt = resetRelHalt;

  SRpcMsg rpcMsg = {0};
  rpcMsg.msgType = TDMT_STREAM_TASK_DROP;
  rpcMsg.pCont = pDropReq;
  rpcMsg.contLen = sizeof(SVDropStreamTaskReq);

  int32_t code = tmsgPutToQueue(pMsgCb, WRITE_QUEUE, &rpcMsg);
  if (code == TSDB_CODE_SUCCESS) {
    stDebug("vgId:%d build and send drop task:0x%x msg", vgId, pTaskId->taskId);
  } else {
    stError("vgId:%d failed to send drop task:0x%x msg, code:%s", vgId, pTaskId->taskId, tstrerror(code));
  }

  return code;
}
907

908
// Encode a checkpoint report for this task and send it to the mnode.
//
// pTask           task whose id, vgId and mnode epset are used
// pCheckpointInfo supplies processedVer/startTs and, via pActiveInfo, the transId and activeId
// dropRelHTask    copied verbatim into the report's dropHTask field
//
// Returns the tmsgSendReq result on the normal path. On failure the real error code is returned
// (the encoder's code, or terrno when the message buffer cannot be allocated) instead of a bare -1,
// matching how streamTaskSendCheckpointReq reports the same failures.
int32_t streamSendChkptReportMsg(SStreamTask* pTask, SCheckpointInfo* pCheckpointInfo, int8_t dropRelHTask) {
  int32_t                code = 0;
  int32_t                tlen = 0;
  int32_t                vgId = pTask->pMeta->vgId;
  const char*            id = pTask->id.idStr;
  SActiveCheckpointInfo* pActive = pCheckpointInfo->pActiveInfo;

  SCheckpointReport req = {.streamId = pTask->id.streamId,
                           .taskId = pTask->id.taskId,
                           .nodeId = vgId,
                           .dropHTask = dropRelHTask,
                           .transId = pActive->transId,
                           .checkpointId = pActive->activeId,
                           .checkpointVer = pCheckpointInfo->processedVer,
                           .checkpointTs = pCheckpointInfo->startTs};

  // first pass: compute the encoded length only
  tEncodeSize(tEncodeStreamTaskChkptReport, &req, tlen, code);
  if (code < 0) {
    stError("s-task:%s vgId:%d encode stream task checkpoint-report failed, code:%s", id, vgId, tstrerror(code));
    return code;
  }

  void* buf = rpcMallocCont(tlen);
  if (buf == NULL) {
    code = terrno;  // capture before logging so the reported and returned codes agree
    stError("s-task:%s vgId:%d encode stream task checkpoint-report msg failed, code:%s", id, vgId, tstrerror(code));
    return code;
  }

  // second pass: encode into the allocated buffer
  SEncoder encoder;
  tEncoderInit(&encoder, buf, tlen);
  if ((code = tEncodeStreamTaskChkptReport(&encoder, &req)) < 0) {
    rpcFreeCont(buf);
    tEncoderClear(&encoder);
    stError("s-task:%s vgId:%d encode stream task checkpoint-report msg failed, code:%s", id, vgId, tstrerror(code));
    return code;
  }
  tEncoderClear(&encoder);

  SRpcMsg msg = {0};
  initRpcMsg(&msg, TDMT_MND_STREAM_CHKPT_REPORT, buf, tlen);
  stDebug("s-task:%s vgId:%d build and send task checkpoint-report to mnode", id, vgId);

  return tmsgSendReq(&pTask->info.mnodeEpset, &msg);
}
953

954
STaskId streamTaskGetTaskId(const SStreamTask* pTask) {
41,891✔
955
  STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
41,891✔
956
  return id;
41,891✔
957
}
958

959
// Reset the launch bookkeeping: zero retries, the base wait interval, and the matching tick count.
void streamTaskInitForLaunchHTask(SHistoryTaskInfo* pInfo) {
  pInfo->retryTimes = 0;
  pInfo->waitInterval = LAUNCH_HTASK_INTERVAL;
  pInfo->tickCount = ceil(LAUNCH_HTASK_INTERVAL / WAIT_FOR_MINIMAL_INTERVAL);
}
1,770✔
964

965
// Record one more launch retry: scale the wait interval by RETRY_LAUNCH_INTERVAL_INC_RATE and
// recompute the tick count from the new interval.
void streamTaskSetRetryInfoForLaunch(SHistoryTaskInfo* pInfo) {
  pInfo->retryTimes += 1;

  pInfo->waitInterval *= RETRY_LAUNCH_INTERVAL_INC_RATE;
  pInfo->tickCount = ceil(pInfo->waitInterval / WAIT_FOR_MINIMAL_INTERVAL);
}
1,764✔
970

971
// Seed a status entry from the task's identity: ids and nodeId are copied from pTask,
// stage is set to -1 and status to TASK_STATUS__STOP. Other fields are not touched.
void streamTaskStatusInit(STaskStatusEntry* pEntry, const SStreamTask* pTask) {
  // identity taken from the task
  pEntry->id.taskId = pTask->id.taskId;
  pEntry->id.streamId = pTask->id.streamId;
  pEntry->nodeId = pTask->info.nodeId;

  // fixed initial values
  pEntry->status = TASK_STATUS__STOP;
  pEntry->stage = -1;
}
8,656✔
978

979
// Copy the runtime fields of one status entry into another.
// Only the fields listed below are copied; pDst->id and pDst->nodeId are not written here.
void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc) {
  // state
  pDst->status = pSrc->status;
  pDst->stage = pSrc->stage;
  pDst->startTime = pSrc->startTime;
  pDst->hTaskId = pSrc->hTaskId;

  // input side
  pDst->inputQUsed = pSrc->inputQUsed;
  pDst->inputRate = pSrc->inputRate;

  // processing / output statistics
  pDst->procsTotal = pSrc->procsTotal;
  pDst->procsThroughput = pSrc->procsThroughput;
  pDst->outputTotal = pSrc->outputTotal;
  pDst->outputThroughput = pSrc->outputThroughput;
  pDst->processedVer = pSrc->processedVer;
  pDst->verRange = pSrc->verRange;

  // sink
  pDst->sinkQuota = pSrc->sinkQuota;
  pDst->sinkDataSize = pSrc->sinkDataSize;

  // checkpoint
  pDst->checkpointInfo = pSrc->checkpointInfo;
  pDst->startCheckpointId = pSrc->startCheckpointId;
  pDst->startCheckpointVer = pSrc->startCheckpointVer;

  // notification statistics
  pDst->notifyEventStat = pSrc->notifyEventStat;
}
25,892✔
1000

1001
// Build a status entry snapshot from the task's current state, meta and execution statistics.
// Every field not assigned below stays zero, including checkpointInfo.latestSize, remoteBackup,
// consensusChkptId and consensusTs.
STaskStatusEntry streamTaskGetStatusEntry(SStreamTask* pTask) {
  SStreamMeta*         pMeta = pTask->pMeta;
  STaskExecStatisInfo* pStat = &pTask->execInfo;

  STaskStatusEntry entry = {0};

  entry.id = streamTaskGetTaskId(pTask);
  entry.status = streamTaskGetStatus(pTask).state;
  entry.nodeId = pMeta->vgId;
  entry.stage = pMeta->stage;

  entry.inputQUsed = SIZE_IN_MiB(streamQueueGetItemSize(pTask->inputq.queue));
  entry.startTime = pStat->readyTs;

  entry.checkpointInfo.latestId = pTask->chkInfo.checkpointId;
  entry.checkpointInfo.latestVer = pTask->chkInfo.checkpointVer;
  entry.checkpointInfo.latestTime = pTask->chkInfo.checkpointTime;

  entry.hTaskId = pTask->hTaskInfo.id.taskId;

  entry.procsTotal = SIZE_IN_MiB(pStat->inputDataSize);
  entry.outputTotal = SIZE_IN_MiB(pStat->outputDataSize);
  entry.procsThroughput = SIZE_IN_KiB(pStat->procsThroughput);
  entry.outputThroughput = SIZE_IN_KiB(pStat->outputThroughput);

  entry.startCheckpointId = pStat->startCheckpointId;
  entry.startCheckpointVer = pStat->startCheckpointVer;
  entry.notifyEventStat = pTask->notifyEventStat;

  return entry;
}
1031

1032
// Callback passed to streamTaskHandleEventAsync for the PAUSE event: bumps the meta's
// paused-task counter and, for fill-history tasks, kills the executor's query. param is unused.
// Returns 0, or the qKillTask result for a fill-history task.
static int32_t taskPauseCallback(SStreamTask* pTask, void* param) {
  SStreamMeta* pMeta = pTask->pMeta;

  int32_t numOfPaused = atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1);
  stInfo("vgId:%d s-task:%s pause stream task. paused task num:%d", pMeta->vgId, pTask->id.idStr, numOfPaused);

  int32_t code = 0;
  if (pTask->info.fillHistory == 1) {
    // in case of fill-history task, stop the tsdb file scan operation.
    code = qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS, 10000);
  }

  stDebug("vgId:%d s-task:%s set pause flag and pause task", pMeta->vgId, pTask->id.idStr);
  return code;
}
1048

1049
// Submit the PAUSE event asynchronously with taskPauseCallback as its callback.
// A failure to submit is logged and not propagated.
void streamTaskPause(SStreamTask* pTask) {
  int32_t code = streamTaskHandleEventAsync(pTask->status.pSM, TASK_EVENT_PAUSE, taskPauseCallback, NULL);
  if (code == 0) {
    return;
  }

  stError("s-task:%s failed handle pause event async, code:%s", pTask->id.idStr, tstrerror(code));
}
910✔
1055

1056
// Try to restore the task's status. On success the meta's paused-task counter is decremented and
// the transition is logged; otherwise only a "no need to resume" message is logged.
void streamTaskResume(SStreamTask* pTask) {
  SStreamTaskState before = streamTaskGetStatus(pTask);
  SStreamMeta*     pMeta = pTask->pMeta;

  int32_t code = streamTaskRestoreStatus(pTask);
  if (code != TSDB_CODE_SUCCESS) {
    stInfo("s-task:%s status:%s no need to resume, paused task(s):%d", pTask->id.idStr, before.name,
           pMeta->numOfPausedTasks);
    return;
  }

  char*   curName = streamTaskGetStatus(pTask).name;
  int32_t remain = atomic_sub_fetch_32(&pMeta->numOfPausedTasks, 1);
  stInfo("s-task:%s status:%s resume from %s, paused task(s):%d", pTask->id.idStr, curName, before.name, remain);
}
1,575✔
1070

1071
// True when the task's level is TASK_LEVEL__SINK.
bool streamTaskIsSinkTask(const SStreamTask* pTask) {
  bool isSink = (pTask->info.taskLevel == TASK_LEVEL__SINK);
  return isSink;
}
50,114✔
1072

1073
// this task must success
1074
int32_t streamTaskSendCheckpointReq(SStreamTask* pTask) {
3,846✔
1075
  int32_t     code;
1076
  int32_t     tlen = 0;
3,846✔
1077
  int32_t     vgId = pTask->pMeta->vgId;
3,846✔
1078
  const char* id = pTask->id.idStr;
3,846✔
1079

1080
  SStreamTaskCheckpointReq req = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId, .nodeId = vgId};
3,846✔
1081
  tEncodeSize(tEncodeStreamTaskCheckpointReq, &req, tlen, code);
3,846!
1082
  if (code < 0) {
3,848!
1083
    stError("s-task:%s vgId:%d encode stream task req checkpoint failed, code:%s", id, vgId, tstrerror(code));
×
1084
    return TSDB_CODE_INVALID_MSG;
×
1085
  }
1086

1087
  void* buf = rpcMallocCont(tlen);
3,848✔
1088
  if (buf == NULL) {
3,846!
1089
    stError("s-task:%s vgId:%d encode stream task req checkpoint msg failed, code:Out of memory", id, vgId);
×
1090
    return terrno;
×
1091
  }
1092

1093
  SEncoder encoder;
1094
  tEncoderInit(&encoder, buf, tlen);
3,846✔
1095
  if ((code = tEncodeStreamTaskCheckpointReq(&encoder, &req)) < 0) {
3,849!
1096
    rpcFreeCont(buf);
×
1097
    tEncoderClear(&encoder);
×
1098
    stError("s-task:%s vgId:%d encode stream task req checkpoint msg failed, code:%s", id, vgId, tstrerror(code));
×
1099
    return code;
×
1100
  }
1101

1102
  tEncoderClear(&encoder);
3,846✔
1103

1104
  SRpcMsg msg = {0};
3,849✔
1105
  initRpcMsg(&msg, TDMT_MND_STREAM_REQ_CHKPT, buf, tlen);
3,849✔
1106
  stDebug("s-task:%s vgId:%d build and send task checkpoint req", id, vgId);
3,846✔
1107

1108
  return tmsgSendReq(&pTask->info.mnodeEpset, &msg);
3,846✔
1109
}
1110

1111
// Look up the upstream entry whose taskId matches and store it in *pEpInfo.
// *pEpInfo is NULL when nothing matches (an error is logged) or when a NULL list entry is hit,
// which ends the search silently.
void streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId, SStreamUpstreamEpInfo** pEpInfo) {
  *pEpInfo = NULL;

  int32_t numOfUpstream = taosArrayGetSize(pTask->upstreamInfo.pList);
  int32_t idx = 0;

  while (idx < numOfUpstream) {
    SStreamUpstreamEpInfo* pCand = taosArrayGetP(pTask->upstreamInfo.pList, idx);
    if (pCand == NULL) {
      return;
    }

    if (pCand->taskId == taskId) {
      *pEpInfo = pCand;
      return;
    }

    ++idx;
  }

  stError("s-task:%s failed to find upstream task:0x%x", pTask->id.idStr, taskId);
}
1129

1130
// Find the epset of the downstream task identified by taskId.
//
// For a fixed dispatcher the single downstream task is checked; for a shuffle dispatcher the
// vgroup list is scanned. Returns a pointer into pTask's output info, or NULL when the task is
// not found or the output type is neither fixed nor shuffle dispatch.
//
// The branch is selected by the task's output type. The TASK_OUTPUT__* constants describe the
// dispatch type, not the task level, so comparing them against info.taskLevel (as this function
// previously did) picks the wrong branch.
SEpSet* streamTaskGetDownstreamEpInfo(SStreamTask* pTask, int32_t taskId) {
  int32_t outputType = pTask->outputInfo.type;

  if (outputType == TASK_OUTPUT__FIXED_DISPATCH) {
    if (pTask->outputInfo.fixedDispatcher.taskId == taskId) {
      return &pTask->outputInfo.fixedDispatcher.epSet;
    }
  } else if (outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
    SArray* pList = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
    int32_t numOfVgroups = taosArrayGetSize(pList);  // size is loop-invariant

    for (int32_t i = 0; i < numOfVgroups; ++i) {
      SVgroupInfo* pVgInfo = taosArrayGet(pList, i);
      if (pVgInfo == NULL) {
        continue;
      }

      if (pVgInfo->taskId == taskId) {
        return &pVgInfo->epSet;
      }
    }
  }

  return NULL;
}
1151

1152
int32_t createStreamTaskIdStr(int64_t streamId, int32_t taskId, const char** pId) {
13,257✔
1153
  char buf[128] = {0};
13,257✔
1154
  int32_t code = snprintf(buf, tListLen(buf),"0x%" PRIx64 "-0x%x", streamId, taskId);
13,257✔
1155
  if (code < 0 || code >= tListLen(buf)) {
13,257!
1156
    return TSDB_CODE_OUT_OF_BUFFER;
×
1157
  }
1158

1159
  *pId = taosStrdup(buf);
13,270!
1160

1161
  if (*pId == NULL) {
13,256!
1162
    return terrno;
×
1163
  } else {
1164
    return TSDB_CODE_SUCCESS;
13,256✔
1165
  }
1166
}
1167

1168
// Wrap a retrieve request in a newly allocated queue item and put it into the task's input queue.
// Returns the allocation error (also stored in terrno), the conversion error (the item is freed
// first), or the result of the enqueue.
static int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq) {
  const char*       idStr = pTask->id.idStr;
  SStreamDataBlock* pBlock = NULL;

  int32_t code = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SStreamDataBlock), (void**)&pBlock);
  if (code != 0) {
    stError("s-task:%s failed to allocated retrieve-block", idStr);
    terrno = code;
    return code;
  }

  pBlock->srcVgId = 0;
  pBlock->type = STREAM_INPUT__DATA_RETRIEVE;

  code = streamRetrieveReqToData(pReq, pBlock, idStr);
  if (code != TSDB_CODE_SUCCESS) {
    stError("s-task:%s failed to convert retrieve-data to block, code:%s", idStr, tstrerror(code));
    taosFreeQitem(pBlock);
    return code;
  }

  code = streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pBlock);
  if (code != TSDB_CODE_SUCCESS) {
    stError("s-task:%s failed to put retrieve-block into inputQ, inputQ is full, discard the retrieve msg",
            idStr);
  }

  return code;
}
1196

1197
// Enqueue the retrieve request and, if that succeeds, try to schedule the task for execution.
// Returns the enqueue error, or the streamTrySchedExec result.
int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq) {
  int32_t code = streamTaskEnqueueRetrieve(pTask, pReq);
  if (code == 0) {
    code = streamTrySchedExec(pTask, false);
  }

  return code;
}
1204

1205
// Set the task's removeBackendFiles status flag.
void streamTaskSetRemoveBackendFiles(SStreamTask* pTask) {
  pTask->status.removeBackendFiles = true;
}
6,394✔
1206

1207
void streamTaskGetActiveCheckpointInfo(const SStreamTask* pTask, int32_t* pTransId, int64_t* pCheckpointId) {
×
1208
  if (pTransId != NULL) {
×
1209
    *pTransId = pTask->chkInfo.pActiveInfo->transId;
×
1210
  }
1211

1212
  if (pCheckpointId != NULL) {
×
1213
    *pCheckpointId = pTask->chkInfo.pActiveInfo->activeId;
×
1214
  }
1215
}
×
1216

1217
// Store activeCheckpointId as the task's active checkpoint id. Always returns TSDB_CODE_SUCCESS.
int32_t streamTaskSetActiveCheckpointInfo(SStreamTask* pTask, int64_t activeCheckpointId) {
  SActiveCheckpointInfo* pActive = pTask->chkInfo.pActiveInfo;
  pActive->activeId = activeCheckpointId;

  return TSDB_CODE_SUCCESS;
}
1221

1222
// Record a failed checkpoint: transId is stored, and checkpointId becomes both the active id
// and the failed id.
void streamTaskSetFailedChkptInfo(SStreamTask* pTask, int32_t transId, int64_t checkpointId) {
  SActiveCheckpointInfo* pActive = pTask->chkInfo.pActiveInfo;

  pActive->transId = transId;
  pActive->activeId = checkpointId;
  pActive->failedId = checkpointId;

  stDebug("s-task:%s set failed checkpointId:%"PRId64, pTask->id.idStr, checkpointId);
}
×
1228

1229
// Allocate and initialise an SActiveCheckpointInfo: a zeroed struct, its mutex, and the three
// bookkeeping arrays (dispatch triggers, ready messages, received checkpoint-ready records).
//
// pRes  receives the new object on success; it is left untouched on failure.
//
// Returns TSDB_CODE_SUCCESS, or an error code: terrno when an allocation fails, or the mutex
// init error. On every failure path whatever was already set up is released, so the caller
// never receives (or leaks) a partially built object.
int32_t streamTaskCreateActiveChkptInfo(SActiveCheckpointInfo** pRes) {
  SActiveCheckpointInfo* pInfo = taosMemoryCalloc(1, sizeof(SActiveCheckpointInfo));
  if (pInfo == NULL) {
    return terrno;
  }

  int32_t code = taosThreadMutexInit(&pInfo->lock, NULL);
  if (code != TSDB_CODE_SUCCESS) {
    // the mutex was never initialised, so only the struct itself has to be released
    taosMemoryFree(pInfo);
    return code;
  }

  pInfo->pDispatchTriggerList = taosArrayInit(4, sizeof(STaskTriggerSendInfo));
  pInfo->pReadyMsgList = taosArrayInit(4, sizeof(STaskCheckpointReadyInfo));
  pInfo->pCheckpointReadyRecvList = taosArrayInit(4, sizeof(STaskDownstreamReadyInfo));

  if (pInfo->pDispatchTriggerList == NULL || pInfo->pReadyMsgList == NULL ||
      pInfo->pCheckpointReadyRecvList == NULL) {
    code = terrno;  // capture before the cleanup calls below can overwrite it
    // the timer handles are still NULL from the calloc, so this releases only the mutex,
    // whichever arrays were created, and the struct
    streamTaskDestroyActiveChkptInfo(pInfo);
    return code;
  }

  *pRes = pInfo;
  return code;
}
1247

1248
// Release an SActiveCheckpointInfo: its mutex, the three arrays, any timer whose handle is
// non-NULL, and finally the struct itself. A NULL pInfo is a no-op.
void streamTaskDestroyActiveChkptInfo(SActiveCheckpointInfo* pInfo) {
  if (pInfo == NULL) {
    return;
  }

  streamMutexDestroy(&pInfo->lock);

  taosArrayDestroy(pInfo->pDispatchTriggerList);
  pInfo->pDispatchTriggerList = NULL;

  taosArrayDestroy(pInfo->pReadyMsgList);
  pInfo->pReadyMsgList = NULL;

  taosArrayDestroy(pInfo->pCheckpointReadyRecvList);
  pInfo->pCheckpointReadyRecvList = NULL;

  // stop the trigger-msg timer first, then the ready-msg timer
  SStreamTmrInfo* timers[2] = {&pInfo->chkptTriggerMsgTmr, &pInfo->chkptReadyMsgTmr};
  for (int32_t i = 0; i < 2; ++i) {
    SStreamTmrInfo* pTmr = timers[i];
    if (pTmr->tmrHandle == NULL) {
      continue;
    }

    streamTmrStop(pTmr->tmrHandle);
    pTmr->tmrHandle = NULL;
  }

  taosMemoryFree(pInfo);
}
1275

1276
// NOTE: clear the checkpoint id, and keep the failed id
1277
// failedId for a task will increase as the checkpoint I.D. increases.
1278
void streamTaskClearActiveInfo(SActiveCheckpointInfo* pInfo) {
3,004✔
1279
  pInfo->activeId = 0;
3,004✔
1280
  pInfo->transId = 0;
3,004✔
1281
  pInfo->allUpstreamTriggerRecv = 0;
3,004✔
1282
  pInfo->dispatchTrigger = false;
3,004✔
1283

1284
  taosArrayClear(pInfo->pDispatchTriggerList);
3,004✔
1285
  taosArrayClear(pInfo->pCheckpointReadyRecvList);
3,003✔
1286
}
3,001✔
1287

1288
const char* streamTaskGetExecType(int32_t type) {
118,863✔
1289
  switch (type) {
118,863!
1290
    case STREAM_EXEC_T_EXTRACT_WAL_DATA:
63,325✔
1291
      return "scan-wal-file";
63,325✔
1292
    case STREAM_EXEC_T_START_ALL_TASKS:
8,181✔
1293
      return "start-all-tasks";
8,181✔
1294
    case STREAM_EXEC_T_START_ONE_TASK:
5,875✔
1295
      return "start-one-task";
5,875✔
1296
    case STREAM_EXEC_T_RESTART_ALL_TASKS:
11✔
1297
      return "restart-all-tasks";
11✔
1298
    case STREAM_EXEC_T_STOP_ALL_TASKS:
4,604✔
1299
      return "stop-all-tasks";
4,604✔
1300
    case STREAM_EXEC_T_RESUME_TASK:
2,677✔
1301
      return "resume-task-from-idle";
2,677✔
1302
    case STREAM_EXEC_T_ADD_FAILED_TASK:
3✔
1303
      return "record-start-failed-task";
3✔
1304
    case STREAM_EXEC_T_STOP_ONE_TASK:
×
1305
      return "stop-one-task";
×
1306
    case 0:
34,290✔
1307
      return "exec-all-tasks";
34,290✔
1308
    default:
×
1309
      return "invalid-exec-type";
×
1310
  }
1311
}
1312

1313
// Allocate an int64_t holding the task's refId and register it via metaRefMgtAdd.
// *pRefId receives the allocation (NULL if allocation fails).
// Returns terrno on allocation failure, otherwise the metaRefMgtAdd result (failure is logged).
int32_t streamTaskAllocRefId(SStreamTask* pTask, int64_t** pRefId) {
  *pRefId = taosMemoryMalloc(sizeof(int64_t));
  if (*pRefId == NULL) {
    stError("s-task:%s failed to alloc new ref id, code:%s", pTask->id.idStr, tstrerror(terrno));
    return terrno;
  }

  **pRefId = pTask->id.refId;

  int32_t code = metaRefMgtAdd(pTask->pMeta->vgId, *pRefId);
  if (code != 0) {
    stError("s-task:%s failed to add refId:%" PRId64 " into refId-mgmt, code:%s", pTask->id.idStr, pTask->id.refId,
            tstrerror(code));
  }

  return code;
}
1328

1329
// Hand a refId pointer to metaRefMgtRemove; a NULL pointer is ignored.
void streamTaskFreeRefId(int64_t* pRefId) {
  if (pRefId != NULL) {
    metaRefMgtRemove(pRefId);
  }
}
1336

1337
// Encode an SNotifyInfo: the URL count, each URL, the event-type and error-handle fields, and,
// only when at least one URL is present, the stream name, stable full name and schema wrapper.
// Returns TSDB_CODE_SUCCESS or the first failing code, which is logged with its line number.
static int32_t tEncodeStreamNotifyInfo(SEncoder* pEncoder, const SNotifyInfo* info) {
  // code, lino and _exit are referenced by name from the check macros
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t numOfUrls = 0;
  int32_t idx = 0;

  QUERY_CHECK_NULL(pEncoder, code, lino, _exit, TSDB_CODE_INVALID_PARA);
  QUERY_CHECK_NULL(info, code, lino, _exit, TSDB_CODE_INVALID_PARA);

  numOfUrls = taosArrayGetSize(info->pNotifyAddrUrls);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, numOfUrls));

  while (idx < numOfUrls) {
    const char* pUrl = taosArrayGetP(info->pNotifyAddrUrls, idx);
    TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pUrl));
    ++idx;
  }

  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, info->notifyEventTypes));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, info->notifyErrorHandle));

  if (numOfUrls > 0) {
    TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, info->streamName));
    TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, info->stbFullName));
    TAOS_CHECK_EXIT(tEncodeSSchemaWrapper(pEncoder, info->pSchemaWrapper));
  }

_exit:
  if (code != TSDB_CODE_SUCCESS) {
    stError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1364

1365
/*
 * Deserialize the notify settings of a stream task (inverse of tEncodeStreamNotifyInfo).
 *
 * Fields are read in this order:
 *   1. number of notify URLs (int32) -- rejected with TSDB_CODE_INVALID_MSG when negative
 *   2. each URL, duplicated with taosStrndup (at most TSDB_STREAM_NOTIFY_URL_LEN bytes)
 *      and pushed into info->pNotifyAddrUrls
 *   3. notifyEventTypes (int32)
 *   4. notifyErrorHandle (int32)
 *   5. only when at least one URL exists: streamName, stbFullName (both duplicated with
 *      taosStrndup) and a freshly allocated schema wrapper
 *
 * @param pDecoder  source decoder; checked for NULL with TSDB_CODE_INVALID_PARA
 * @param info      destination; checked for NULL with TSDB_CODE_INVALID_PARA
 * @return TSDB_CODE_SUCCESS, or the code left by the failing check/decoder/allocation
 *         (a non-success code is also logged together with the failing line).
 *
 * On failure, members that were already allocated stay attached to `info`.
 * NOTE(review): presumably the caller releases them when it destroys the task -- confirm.
 */
static int32_t tDecodeStreamNotifyInfo(SDecoder* pDecoder, SNotifyInfo* info) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  QUERY_CHECK_NULL(pDecoder, code, lino, _exit, TSDB_CODE_INVALID_PARA);
  QUERY_CHECK_NULL(info, code, lino, _exit, TSDB_CODE_INVALID_PARA);

  int32_t addrSize = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &addrSize));
  // The count comes straight from the encoded buffer; never hand a negative
  // value to taosArrayInit() as an element count.
  if (addrSize < 0) {
    TAOS_CHECK_EXIT(TSDB_CODE_INVALID_MSG);
  }

  info->pNotifyAddrUrls = taosArrayInit(addrSize, POINTER_BYTES);
  QUERY_CHECK_NULL(info->pNotifyAddrUrls, code, lino, _exit, terrno);

  for (int32_t i = 0; i < addrSize; ++i) {
    char* url = NULL;
    TAOS_CHECK_EXIT(tDecodeCStr(pDecoder, &url));

    // Keep a private copy of the decoded string in the array.
    url = taosStrndup(url, TSDB_STREAM_NOTIFY_URL_LEN);
    QUERY_CHECK_NULL(url, code, lino, _exit, terrno);

    if (taosArrayPush(info->pNotifyAddrUrls, &url) == NULL) {
      // The array did not take the copy, so release it here.
      taosMemoryFree(url);
      TAOS_CHECK_EXIT(terrno);
    }
  }

  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &info->notifyEventTypes));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &info->notifyErrorHandle));

  // The trailing name/schema section is only present when notify URLs exist.
  if (addrSize > 0) {
    char* name = NULL;

    TAOS_CHECK_EXIT(tDecodeCStr(pDecoder, &name));
    info->streamName = taosStrndup(name, TSDB_STREAM_FNAME_LEN + 1);
    QUERY_CHECK_NULL(info->streamName, code, lino, _exit, terrno);

    TAOS_CHECK_EXIT(tDecodeCStr(pDecoder, &name));
    info->stbFullName = taosStrndup(name, TSDB_STREAM_FNAME_LEN + 1);
    QUERY_CHECK_NULL(info->stbFullName, code, lino, _exit, terrno);

    info->pSchemaWrapper = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
    if (info->pSchemaWrapper == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }
    TAOS_CHECK_EXIT(tDecodeSSchemaWrapper(pDecoder, info->pSchemaWrapper));
  }

_exit:
  if (code != TSDB_CODE_SUCCESS) {
    stError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1409

1410
/*
 * Serialize a stream task into `pEncoder`.
 *
 * The field order below is the wire layout and must stay in step with
 * tDecodeStreamTask(): header ids/levels, status, node/epset info, checkpoint
 * info, related task ids, data range, upstream endpoint list, the query plan
 * (non-sink tasks only), the output-type specific section, the trailing
 * scheduling/reserve fields and, for versions >= SSTREAM_TASK_ADD_NOTIFY_VER,
 * the notify settings.
 *
 * @return 0 on success, otherwise the code of the first failing encoder call
 *         (tEndEncode is skipped in that case).
 */
int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
  int32_t lino = 0;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));

  // identity and basic attributes
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->ver));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->id.streamId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->id.taskId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->info.trigger));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->info.taskLevel));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->outputInfo.type));
  TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pTask->msgInfo.msgType));

  // status
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->status.taskStatus));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->status.schedStatus));

  // placement
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->info.selfChildId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->info.nodeId));
  TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pTask->info.epSet));
  TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pTask->info.mnodeEpset));

  // checkpoint / fill-history
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->chkInfo.checkpointId));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->chkInfo.checkpointVer));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->info.fillHistory));

  // related task ids; the task-id part goes on the wire as int32
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->hTaskInfo.id.streamId));
  int32_t histTaskId = pTask->hTaskInfo.id.taskId;
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, histTaskId));

  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->streamTaskId.streamId));
  int32_t relatedTaskId = pTask->streamTaskId.taskId;
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, relatedTaskId));

  // data range
  TAOS_CHECK_EXIT(tEncodeU64(pEncoder, pTask->dataRange.range.minVer));
  TAOS_CHECK_EXIT(tEncodeU64(pEncoder, pTask->dataRange.range.maxVer));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->dataRange.window.skey));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->dataRange.window.ekey));

  // upstream endpoints: count followed by each entry
  int32_t numUpstream = taosArrayGetSize(pTask->upstreamInfo.pList);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, numUpstream));
  for (int32_t idx = 0; idx < numUpstream; ++idx) {
    SStreamUpstreamEpInfo* pUpstream = taosArrayGetP(pTask->upstreamInfo.pList, idx);
    TAOS_CHECK_EXIT(tEncodeStreamEpInfo(pEncoder, pUpstream));
  }

  // sink tasks carry no query plan
  if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
    TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pTask->exec.qmsg));
  }

  // output-type specific section; unknown types write nothing
  switch (pTask->outputInfo.type) {
    case TASK_OUTPUT__TABLE:
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->outputInfo.tbSink.stbUid));
      TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pTask->outputInfo.tbSink.stbFullName));
      TAOS_CHECK_EXIT(tEncodeSSchemaWrapper(pEncoder, pTask->outputInfo.tbSink.pSchemaWrapper));
      break;
    case TASK_OUTPUT__SMA:
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->outputInfo.smaSink.smaId));
      break;
    case TASK_OUTPUT__FETCH:
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->outputInfo.fetchSink.reserved));
      break;
    case TASK_OUTPUT__FIXED_DISPATCH:
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->outputInfo.fixedDispatcher.taskId));
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->outputInfo.fixedDispatcher.nodeId));
      TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pTask->outputInfo.fixedDispatcher.epSet));
      break;
    case TASK_OUTPUT__SHUFFLE_DISPATCH:
      TAOS_CHECK_EXIT(tSerializeSUseDbRspImp(pEncoder, &pTask->outputInfo.shuffleDispatcher.dbInfo));
      TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pTask->outputInfo.shuffleDispatcher.stbFullName));
      break;
    default:
      break;
  }

  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->info.delaySchedParam));
  // NOTE(review): written unconditionally here, while the decoder only reads it when
  // ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER -- confirm older versions are never encoded.
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->subtableWithoutMd5));
  TAOS_CHECK_EXIT(tEncodeCStrWithLen(pEncoder, pTask->reserve, sizeof(pTask->reserve) - 1));

  // notify settings exist only from SSTREAM_TASK_ADD_NOTIFY_VER onwards
  if (pTask->ver >= SSTREAM_TASK_ADD_NOTIFY_VER) {
    TAOS_CHECK_EXIT(tEncodeStreamNotifyInfo(pEncoder, &pTask->notifyInfo));
  }

  tEndEncode(pEncoder);

_exit:
  return code;
}
1487

1488
/*
 * Deserialize a stream task from `pDecoder` into `pTask` (inverse of tEncodeStreamTask).
 *
 * The version is read first; versions <= SSTREAM_TASK_INCOMPATIBLE_VER or
 * > SSTREAM_TASK_VER are rejected with TSDB_CODE_INVALID_MSG. The remaining
 * fields are read in the exact order they are encoded. Version-gated fields:
 *   - subtableWithoutMd5: only when ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER
 *   - notify settings:    only when ver >= SSTREAM_TASK_ADD_NOTIFY_VER
 *
 * @param pDecoder  source decoder
 * @param pTask     destination task; members allocated here (upstream list,
 *                  qmsg, schema wrapper, notify info) stay attached to it
 * @return 0 on success, otherwise the code of the first failing step
 *         (tEndDecode is skipped in that case).
 *
 * NOTE(review): on failure, partially decoded members remain in `pTask`;
 * presumably the caller frees the task -- confirm.
 */
int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
  int32_t taskId = 0;
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->ver));
  if (pTask->ver <= SSTREAM_TASK_INCOMPATIBLE_VER || pTask->ver > SSTREAM_TASK_VER) {
    TAOS_CHECK_EXIT(TSDB_CODE_INVALID_MSG);
  }

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->id.streamId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->id.taskId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->info.trigger));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->info.taskLevel));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->outputInfo.type));
  TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pTask->msgInfo.msgType));

  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->status.taskStatus));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->status.schedStatus));

  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->info.selfChildId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->info.nodeId));
  TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pTask->info.epSet));
  TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pTask->info.mnodeEpset));

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->chkInfo.checkpointId));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->chkInfo.checkpointVer));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->info.fillHistory));

  // Related task ids travel as int32 on the wire; go through a temporary
  // before storing them in the task's id fields.
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->hTaskInfo.id.streamId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &taskId));
  pTask->hTaskInfo.id.taskId = taskId;

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->streamTaskId.streamId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &taskId));
  pTask->streamTaskId.taskId = taskId;

  TAOS_CHECK_EXIT(tDecodeU64(pDecoder, (uint64_t*)&pTask->dataRange.range.minVer));
  TAOS_CHECK_EXIT(tDecodeU64(pDecoder, (uint64_t*)&pTask->dataRange.range.maxVer));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->dataRange.window.skey));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->dataRange.window.ekey));

  // Upstream endpoint list: count followed by each entry.
  // Pass the decoder's own status to the check: the previous `... < 0` form
  // handed over the 0/1 comparison result, discarding the real error code.
  int32_t epSz = -1;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &epSz));
  // The count comes from the encoded buffer; a negative value is malformed
  // input and must not reach taosArrayInit() as an element count.
  if (epSz < 0) {
    TAOS_CHECK_EXIT(TSDB_CODE_INVALID_MSG);
  }

  if ((pTask->upstreamInfo.pList = taosArrayInit(epSz, POINTER_BYTES)) == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }
  for (int32_t i = 0; i < epSz; i++) {
    SStreamUpstreamEpInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamUpstreamEpInfo));
    if (pInfo == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }
    if ((code = tDecodeStreamEpInfo(pDecoder, pInfo)) < 0) {
      taosMemoryFreeClear(pInfo);
      goto _exit;
    }
    if (taosArrayPush(pTask->upstreamInfo.pList, &pInfo) == NULL) {
      // The list did not take the entry, so release it here instead of
      // leaking it. Capture terrno before freeing.
      int32_t pushCode = terrno;
      taosMemoryFreeClear(pInfo);
      TAOS_CHECK_EXIT(pushCode);
    }
  }

  // Sink tasks carry no query plan.
  if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
    TAOS_CHECK_EXIT(tDecodeCStrAlloc(pDecoder, &pTask->exec.qmsg));
  }

  // Output-type specific section; unknown types read nothing.
  if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) {
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->outputInfo.tbSink.stbUid));
    TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pTask->outputInfo.tbSink.stbFullName));
    pTask->outputInfo.tbSink.pSchemaWrapper = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
    if (pTask->outputInfo.tbSink.pSchemaWrapper == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }
    TAOS_CHECK_EXIT(tDecodeSSchemaWrapper(pDecoder, pTask->outputInfo.tbSink.pSchemaWrapper));
  } else if (pTask->outputInfo.type == TASK_OUTPUT__SMA) {
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->outputInfo.smaSink.smaId));
  } else if (pTask->outputInfo.type == TASK_OUTPUT__FETCH) {
    TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->outputInfo.fetchSink.reserved));
  } else if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->outputInfo.fixedDispatcher.taskId));
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->outputInfo.fixedDispatcher.nodeId));
    TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pTask->outputInfo.fixedDispatcher.epSet));
  } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
    TAOS_CHECK_EXIT(tDeserializeSUseDbRspImp(pDecoder, &pTask->outputInfo.shuffleDispatcher.dbInfo));
    TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pTask->outputInfo.shuffleDispatcher.stbFullName));
  }

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->info.delaySchedParam));
  if (pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER) {
    TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->subtableWithoutMd5));
  }
  TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pTask->reserve));

  if (pTask->ver >= SSTREAM_TASK_ADD_NOTIFY_VER) {
    TAOS_CHECK_EXIT(tDecodeStreamNotifyInfo(pDecoder, &pTask->notifyInfo));
  }

  tEndDecode(pDecoder);

_exit:
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc