• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3720

26 Mar 2025 06:20AM UTC coverage: 30.242% (-31.7%) from 61.936%
#3720

push

travis-ci

web-flow
feat(taosBenchmark): supports decimal data type (#30456)

* feat: taosBenchmark supports decimal data type

* build: decimal script not use pytest.sh

* fix: fix typo for decimal script

* test: insertBasic.py debug

71234 of 313946 branches covered (22.69%)

Branch coverage included in aggregate %.

38 of 423 new or added lines in 8 files covered. (8.98%)

120240 existing lines in 447 files now uncovered.

118188 of 312400 relevant lines covered (37.83%)

1450220.33 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.34
/source/libs/stream/src/streamTask.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executor.h"
17
#include "osDir.h"
18
#include "osMemory.h"
19
#include "streamInt.h"
20
#include "streamMsg.h"
21
#include "streamsm.h"
22
#include "tmisce.h"
23
#include "tstream.h"
24
#include "ttimer.h"
25
#include "wal.h"
26

27
static void    streamTaskDestroyUpstreamInfo(SUpstreamInfo* pUpstreamInfo);
28
static int32_t streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdated);
29
static int32_t streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdate);
30
static void    streamTaskDestroyActiveChkptInfo(SActiveCheckpointInfo* pInfo);
31

UNCOV
32
/*
 * Append pTask to pArray and record the slot it occupies as the task's own child id.
 * The child id is the number of elements in the array before the push.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when the push fails.
 */
static int32_t addToTaskset(SArray* pArray, SStreamTask* pTask) {
  // the id is assigned before the push, so it is set even when the push fails
  pTask->info.selfChildId = (int32_t)taosArrayGetSize(pArray);

  if (taosArrayPush(pArray, &pTask) == NULL) {
    return terrno;
  }

  return TSDB_CODE_SUCCESS;
}
38

UNCOV
39
/*
 * Apply a new epset for node `nodeId` to this task.
 *
 * If the task itself lives on `nodeId`, its own epset is replaced (when different). Afterwards the
 * downstream and/or upstream endpoint information is refreshed, depending on the task level:
 *   - source task: downstream only
 *   - agg task:    upstream and downstream
 *   - otherwise (sink task): upstream only
 *
 * pUpdated is set to true whenever an epset actually changes; it is never reset to false here.
 *
 * Returns 0 on success, otherwise the first error code encountered.
 */
static int32_t doUpdateTaskEpset(SStreamTask* pTask, int32_t nodeId, SEpSet* pEpSet, bool* pUpdated) {
  int32_t code = 0;
  char    buf[512] = {0};

  if (pTask->info.nodeId == nodeId) {  // execution task should be moved away
    bool isEqual = isEpsetEqual(&pTask->info.epSet, pEpSet);
    code = epsetToStr(pEpSet, buf, tListLen(buf));
    if (code) {  // the string is only used for logging, but a failure still aborts the update
      stError("%s failed to convert epset to str, code:%s", pTask->id.idStr, tstrerror(code));
      return code;
    }

    if (!isEqual) {
      (*pUpdated) = true;
      char tmp[512] = {0};
      code = epsetToStr(&pTask->info.epSet, tmp, tListLen(tmp));  // old epset, for the log line below
      if (code) {
        stError("%s failed to convert epset to str, code:%s", pTask->id.idStr, tstrerror(code));
        return code;
      }

      epsetAssign(&pTask->info.epSet, pEpSet);
      stDebug("s-task:0x%x (vgId:%d) self node epset is updated %s, old:%s", pTask->id.taskId, nodeId, buf, tmp);
    } else {
      stDebug("s-task:0x%x (vgId:%d) not updated task epset, since epset identical, %s", pTask->id.taskId, nodeId, buf);
    }
  }

  // check for the dispatch info and the upstream task info
  int32_t level = pTask->info.taskLevel;
  if (level == TASK_LEVEL__SOURCE) {
    code = streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet, pUpdated);
  } else if (level == TASK_LEVEL__AGG) {
    // Both directions are still attempted, but an upstream failure is no longer overwritten by the
    // result of the downstream update: the first error wins.
    int32_t upCode = streamTaskUpdateUpstreamInfo(pTask, nodeId, pEpSet, pUpdated);
    int32_t downCode = streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet, pUpdated);
    code = (upCode != 0) ? upCode : downCode;
  } else {  // TASK_LEVEL__SINK
    code = streamTaskUpdateUpstreamInfo(pTask, nodeId, pEpSet, pUpdated);
  }

  return code;
}
80

81
static void freeItem(void* p) {
×
82
  SStreamContinueExecInfo* pInfo = p;
×
83
  rpcFreeCont(pInfo->msg.pCont);
×
84
}
×
85

UNCOV
86
static void freeUpstreamItem(void* p) {
×
UNCOV
87
  SStreamUpstreamEpInfo** pInfo = p;
×
UNCOV
88
  taosMemoryFree(*pInfo);
×
UNCOV
89
}
×
90

UNCOV
91
static SStreamUpstreamEpInfo* createStreamTaskEpInfo(const SStreamTask* pTask) {
×
UNCOV
92
  SStreamUpstreamEpInfo* pEpInfo = taosMemoryMalloc(sizeof(SStreamUpstreamEpInfo));
×
UNCOV
93
  if (pEpInfo == NULL) {
×
94
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
95
    return NULL;
×
96
  }
97

UNCOV
98
  pEpInfo->childId = pTask->info.selfChildId;
×
UNCOV
99
  pEpInfo->epSet = pTask->info.epSet;
×
UNCOV
100
  pEpInfo->nodeId = pTask->info.nodeId;
×
UNCOV
101
  pEpInfo->taskId = pTask->id.taskId;
×
UNCOV
102
  pEpInfo->stage = -1;
×
UNCOV
103
  pEpInfo->lastMsgId = -1;
×
104

UNCOV
105
  return pEpInfo;
×
106
}
107

UNCOV
108
/*
 * Create a new stream task object and append it to pTaskList.
 *
 * streamId/taskLevel/type/trigger/triggerParam/subtableWithoutMd5/hasAggTasks are stored in the new
 * task; pEpset is copied into the task's mnode epset. hasFillhistory must be true when `type` is
 * STREAM_HISTORY_TASK, otherwise TSDB_CODE_INVALID_PARA is returned.
 *
 * On success *p receives the new task and 0 is returned. On every failure that happens before the
 * task is appended to pTaskList, all partially built resources are released, *p stays NULL and an
 * error code is returned.
 *
 * NOTE(review): when appending to pTaskList fails, the task is still handed out through *p together
 * with the error code (unchanged from the previous behavior) — confirm that callers release it.
 */
int32_t tNewStreamTask(int64_t streamId, int8_t taskLevel, SEpSet* pEpset, EStreamTaskType type, int32_t trigger,
                       int64_t triggerParam, SArray* pTaskList, bool hasFillhistory, int8_t subtableWithoutMd5,
                       int8_t hasAggTasks, SStreamTask** p) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t ret = 0;
  char    buf[128] = {0};

  *p = NULL;

  SStreamTask* pTask = (SStreamTask*)taosMemoryCalloc(1, sizeof(SStreamTask));
  if (pTask == NULL) {
    stError("s-task:0x%" PRIx64 " failed malloc new stream task, size:%d, code:%s", streamId,
            (int32_t)sizeof(SStreamTask), tstrerror(terrno));
    return terrno;
  }

  pTask->ver = SSTREAM_TASK_VER;
  pTask->id.taskId = tGenIdPI32();
  pTask->id.streamId = streamId;

  pTask->info.taskLevel = taskLevel;
  pTask->info.fillHistory = type;
  pTask->info.trigger = trigger;
  pTask->info.hasAggTasks = hasAggTasks;
  pTask->info.delaySchedParam = triggerParam;
  pTask->subtableWithoutMd5 = subtableWithoutMd5;

  code = streamCreateStateMachine(pTask);
  if (pTask->status.pSM == NULL || code != TSDB_CODE_SUCCESS) {
    if (code == TSDB_CODE_SUCCESS) {
      // never report success while leaving *p == NULL
      code = (terrno != TSDB_CODE_SUCCESS) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    }
    goto _error;
  }

  ret = snprintf(buf, tListLen(buf), "0x%" PRIx64 "-0x%x", pTask->id.streamId, pTask->id.taskId);
  if (ret < 0 || ret >= tListLen(buf)) {
    stError("s-task:0x%x failed to set the taskIdstr, code: out of buffer", pTask->id.taskId);
    code = TSDB_CODE_OUT_OF_BUFFER;
    goto _error;
  }

  pTask->id.idStr = taosStrdup(buf);
  if (pTask->id.idStr == NULL) {
    stError("s-task:0x%x failed to build task id, code: out of memory", pTask->id.taskId);
    code = terrno;
    goto _error;
  }

  pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
  pTask->status.taskStatus =
      (pTask->info.fillHistory == STREAM_HISTORY_TASK) ? TASK_STATUS__SCAN_HISTORY : TASK_STATUS__READY;
  pTask->inputq.status = TASK_INPUT_STATUS__NORMAL;
  pTask->outputq.status = TASK_OUTPUT_STATUS__NORMAL;

  pTask->taskCheckInfo.pList = taosArrayInit(4, sizeof(SDownstreamStatusInfo));
  if (pTask->taskCheckInfo.pList == NULL) {
    stError("s-task:0x%x failed to prepare taskCheckInfo list, code:%s", pTask->id.taskId, tstrerror(terrno));
    code = terrno;
    goto _error;
  }

  if ((pTask->info.fillHistory == STREAM_HISTORY_TASK) && !hasFillhistory) {
    stError("s-task:0x%x create task failed, due to inconsistent fill-history flag", pTask->id.taskId);
    code = TSDB_CODE_INVALID_PARA;
    goto _error;
  }

  epsetAssign(&(pTask->info.mnodeEpset), pEpset);

  code = addToTaskset(pTaskList, pTask);
  *p = pTask;

  return code;

_error:
  // release everything built so far; the task was never published through *p or pTaskList
  if (pTask->status.pSM != NULL) {
    streamDestroyStateMachine(pTask->status.pSM);
    pTask->status.pSM = NULL;
  }

  if (pTask->taskCheckInfo.pList != NULL) {
    taosArrayDestroy(pTask->taskCheckInfo.pList);
    pTask->taskCheckInfo.pList = NULL;
  }

  if (pTask->id.idStr != NULL) {
    taosMemoryFree((void*)pTask->id.idStr);
  }

  taosMemoryFree(pTask);
  return code;
}
170

UNCOV
171
/*
 * Decode only the checkpoint related fields of an encoded stream task.
 *
 * The message version, checkpoint id and checkpoint version are stored in pChkpInfo; every other
 * field that precedes them in the encoding is decoded into scratch variables and discarded.
 *
 * Returns 0 on success, -1 as soon as any decode step fails (tEndDecode is not called in that case).
 */
int32_t tDecodeStreamTaskChkInfo(SDecoder* pDecoder, SCheckpointInfo* pChkpInfo) {
  // scratch targets for the fields that are skipped
  int64_t i64;
  int32_t i32;
  int16_t i16;
  int8_t  i8;
  SEpSet  ep;

  if (tStartDecode(pDecoder) < 0 || tDecodeI64(pDecoder, &pChkpInfo->msgVer) < 0) {
    return -1;
  }

  // the version is not validated against SSTREAM_TASK_INCOMPATIBLE_VER here

  if (tDecodeI64(pDecoder, &i64) < 0 || tDecodeI32(pDecoder, &i32) < 0 || tDecodeI32(pDecoder, &i32) < 0 ||
      tDecodeI8(pDecoder, &i8) < 0 || tDecodeI8(pDecoder, &i8) < 0 || tDecodeI16(pDecoder, &i16) < 0) {
    return -1;
  }

  if (tDecodeI8(pDecoder, &i8) < 0 || tDecodeI8(pDecoder, &i8) < 0) {
    return -1;
  }

  if (tDecodeI32(pDecoder, &i32) < 0 || tDecodeI32(pDecoder, &i32) < 0 || tDecodeSEpSet(pDecoder, &ep) < 0 ||
      tDecodeSEpSet(pDecoder, &ep) < 0) {
    return -1;
  }

  if (tDecodeI64(pDecoder, &pChkpInfo->checkpointId) < 0 || tDecodeI64(pDecoder, &pChkpInfo->checkpointVer) < 0) {
    return -1;
  }

  tEndDecode(pDecoder);
  return 0;
}
203

UNCOV
204
/*
 * Decode the stream id and task id from the head of an encoded stream task.
 *
 * Returns 0 on success. Returns -1 when a decode step fails or when the encoded version is
 * not greater than SSTREAM_TASK_INCOMPATIBLE_VER (tEndDecode is not called in those cases).
 */
int32_t tDecodeStreamTaskId(SDecoder* pDecoder, STaskId* pTaskId) {
  int64_t msgVer;
  int32_t tid = 0;

  if (tStartDecode(pDecoder) < 0 || tDecodeI64(pDecoder, &msgVer) < 0) {
    return -1;
  }

  if (msgVer <= SSTREAM_TASK_INCOMPATIBLE_VER) {
    return -1;
  }

  if (tDecodeI64(pDecoder, &pTaskId->streamId) < 0 || tDecodeI32(pDecoder, &tid) < 0) {
    return -1;
  }

  // the id is encoded as a 32-bit value and assigned to the STaskId member afterwards
  pTaskId->taskId = tid;

  tEndDecode(pDecoder);
  return 0;
}
219

UNCOV
220
void tFreeStreamTask(void* pParam) {
×
UNCOV
221
  char*        p = NULL;
×
UNCOV
222
  SStreamTask* pTask = pParam;
×
UNCOV
223
  int32_t      taskId = pTask->id.taskId;
×
224

UNCOV
225
  STaskExecStatisInfo* pStatis = &pTask->execInfo;
×
226

UNCOV
227
  ETaskStatus status1 = TASK_STATUS__UNINIT;
×
UNCOV
228
  if (pTask->status.pSM != NULL) {
×
UNCOV
229
    streamMutexLock(&pTask->lock);
×
UNCOV
230
    SStreamTaskState status = streamTaskGetStatus(pTask);
×
UNCOV
231
    p = status.name;
×
UNCOV
232
    status1 = status.state;
×
UNCOV
233
    streamMutexUnlock(&pTask->lock);
×
234
  }
235

UNCOV
236
  stDebug("start to free s-task:0x%x %p, state:%s, refId:%" PRId64, taskId, pTask, p, pTask->id.refId);
×
237

UNCOV
238
  SCheckpointInfo* pCkInfo = &pTask->chkInfo;
×
UNCOV
239
  stDebug("s-task:0x%x task exec summary: create:%" PRId64 ", init:%" PRId64 ", start:%" PRId64
×
240
          ", updateCount:%d latestUpdate:%" PRId64 ", latestCheckPoint:%" PRId64 ", ver:%" PRId64
241
          " nextProcessVer:%" PRId64 ", checkpointCount:%d",
242
          taskId, pStatis->created, pStatis->checkTs, pStatis->readyTs, pStatis->updateCount, pStatis->latestUpdateTs,
243
          pCkInfo->checkpointId, pCkInfo->checkpointVer, pCkInfo->nextProcessVer, pStatis->checkpoint);
244

UNCOV
245
  if (pTask->schedInfo.pDelayTimer != NULL) {
×
UNCOV
246
    streamTmrStop(pTask->schedInfo.pDelayTimer);
×
UNCOV
247
    pTask->schedInfo.pDelayTimer = NULL;
×
248
  }
249

UNCOV
250
  if (pTask->hTaskInfo.pTimer != NULL) {
×
UNCOV
251
    streamTmrStop(pTask->hTaskInfo.pTimer);
×
UNCOV
252
    pTask->hTaskInfo.pTimer = NULL;
×
253
  }
254

UNCOV
255
  if (pTask->msgInfo.pRetryTmr != NULL) {
×
UNCOV
256
    streamTmrStop(pTask->msgInfo.pRetryTmr);
×
UNCOV
257
    pTask->msgInfo.pRetryTmr = NULL;
×
258
  }
259

UNCOV
260
  if (pTask->inputq.queue) {
×
UNCOV
261
    streamQueueClose(pTask->inputq.queue, pTask->id.taskId);
×
UNCOV
262
    pTask->inputq.queue = NULL;
×
263
  }
264

UNCOV
265
  if (pTask->outputq.queue) {
×
UNCOV
266
    streamQueueClose(pTask->outputq.queue, pTask->id.taskId);
×
UNCOV
267
    pTask->outputq.queue = NULL;
×
268
  }
269

UNCOV
270
  if (pTask->exec.qmsg) {
×
UNCOV
271
    taosMemoryFree(pTask->exec.qmsg);
×
272
  }
273

UNCOV
274
  if (pTask->exec.pExecutor) {
×
UNCOV
275
    qDestroyTask(pTask->exec.pExecutor);
×
UNCOV
276
    pTask->exec.pExecutor = NULL;
×
277
  }
278

UNCOV
279
  if (pTask->exec.pWalReader != NULL) {
×
UNCOV
280
    walCloseReader(pTask->exec.pWalReader);
×
UNCOV
281
    pTask->exec.pWalReader = NULL;
×
282
  }
283

UNCOV
284
  streamClearChkptReadyMsg(pTask->chkInfo.pActiveInfo);
×
285

UNCOV
286
  if (pTask->msgInfo.pData != NULL) {
×
UNCOV
287
    clearBufferedDispatchMsg(pTask);
×
288
  }
289

UNCOV
290
  if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) {
×
UNCOV
291
    tDeleteSchemaWrapper(pTask->outputInfo.tbSink.pSchemaWrapper);
×
UNCOV
292
    taosMemoryFree(pTask->outputInfo.tbSink.pTSchema);
×
UNCOV
293
    tSimpleHashCleanup(pTask->outputInfo.tbSink.pTbInfo);
×
UNCOV
294
    tDeleteSchemaWrapper(pTask->outputInfo.tbSink.pTagSchema);
×
UNCOV
295
  } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
×
UNCOV
296
    taosArrayDestroy(pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos);
×
UNCOV
297
  } else if (pTask->outputInfo.type == TASK_OUTPUT__VTABLE_MAP) {
×
298
    taosArrayDestroy(pTask->outputInfo.vtableMapDispatcher.taskInfos);
×
299
    tSimpleHashCleanup(pTask->outputInfo.vtableMapDispatcher.vtableMap);
×
300
  }
301

UNCOV
302
  streamTaskCleanupCheckInfo(&pTask->taskCheckInfo);
×
UNCOV
303
  streamFreeTaskState(pTask, pTask->status.removeBackendFiles ? 1 : 0);
×
304

UNCOV
305
  if (pTask->pNameMap) {
×
UNCOV
306
    tSimpleHashCleanup(pTask->pNameMap);
×
307
  }
308

UNCOV
309
  if (pTask->status.pSM != NULL) {
×
UNCOV
310
    streamMutexDestroy(&pTask->lock);
×
UNCOV
311
    streamMutexDestroy(&pTask->msgInfo.lock);
×
UNCOV
312
    streamMutexDestroy(&pTask->taskCheckInfo.checkInfoLock);
×
313
  }
314

UNCOV
315
  taosArrayDestroy(pTask->pVTables);
×
UNCOV
316
  pTask->pVTables = NULL;
×
317

UNCOV
318
  streamDestroyStateMachine(pTask->status.pSM);
×
UNCOV
319
  pTask->status.pSM = NULL;
×
320

UNCOV
321
  streamTaskDestroyUpstreamInfo(&pTask->upstreamInfo);
×
322

UNCOV
323
  taosMemoryFree(pTask->outputInfo.pTokenBucket);
×
324

UNCOV
325
  taosArrayDestroy(pTask->msgInfo.pSendInfo);
×
UNCOV
326
  pTask->msgInfo.pSendInfo = NULL;
×
327

UNCOV
328
  taosArrayDestroy(pTask->outputInfo.pNodeEpsetUpdateList);
×
UNCOV
329
  pTask->outputInfo.pNodeEpsetUpdateList = NULL;
×
330

UNCOV
331
  if (pTask->id.idStr != NULL) {
×
UNCOV
332
    taosMemoryFree((void*)pTask->id.idStr);
×
333
  }
334

UNCOV
335
  streamTaskDestroyActiveChkptInfo(pTask->chkInfo.pActiveInfo);
×
UNCOV
336
  pTask->chkInfo.pActiveInfo = NULL;
×
337

UNCOV
338
  taosArrayDestroyP(pTask->notifyInfo.pNotifyAddrUrls, NULL);
×
UNCOV
339
  taosMemoryFreeClear(pTask->notifyInfo.streamName);
×
UNCOV
340
  taosMemoryFreeClear(pTask->notifyInfo.stbFullName);
×
UNCOV
341
  tDeleteSchemaWrapper(pTask->notifyInfo.pSchemaWrapper);
×
342

UNCOV
343
  pTask->notifyEventStat = (STaskNotifyEventStat){0};
×
344

UNCOV
345
  taosMemoryFree(pTask);
×
UNCOV
346
  stDebug("s-task:0x%x free task completed", taskId);
×
UNCOV
347
}
×
348

UNCOV
349
/*
 * Release the state backends attached to a task.
 *
 * When the task has an open state, the state is closed and the backend reference is dropped; with
 * `remove` set, the backend is additionally flagged for file removal. When no state is open and
 * `remove` is set, the backend directory (if a path is known) is deleted directly.
 * The backend path string is always freed. The recalculation state/backend, if present, is closed
 * and dereferenced in the same way as the primary one.
 */
void streamFreeTaskState(SStreamTask* pTask, int8_t remove) {
  stDebug("s-task:0x%x start to free task state/backend", pTask->id.taskId);

  if (pTask->pState == NULL) {
    stDebug("s-task:0x%x task state is NULL, may del backend:%s", pTask->id.taskId,
            pTask->backendPath ? pTask->backendPath : "NULL");

    if (remove && pTask->backendPath != NULL) {
      stDebug("s-task:0x%x task state is NULL, do del backend:%s", pTask->id.taskId, pTask->backendPath);
      taosRemoveDir(pTask->backendPath);
    }
  } else {
    stDebug("s-task:0x%x start to free task state", pTask->id.taskId);
    streamStateClose(pTask->pState, remove);

    if (remove) {
      taskDbSetClearFileFlag(pTask->pBackend);
    }

    taskDbRemoveRef(pTask->pBackend);
    pTask->pBackend = NULL;
    pTask->pState = NULL;
  }

  if (pTask->backendPath != NULL) {
    taosMemoryFree(pTask->backendPath);
    pTask->backendPath = NULL;
  }

  // clear recal backend; there is no directory-removal fallback for it when the state is NULL
  if (pTask->pRecalState != NULL) {
    stDebug("s-task:0x%x start to free recal task state", pTask->id.taskId);
    streamStateClose(pTask->pRecalState, remove);

    if (remove) {
      taskDbSetClearFileFlag(pTask->pRecalBackend);
    }

    taskDbRemoveRef(pTask->pRecalBackend);
    pTask->pRecalBackend = NULL;
    pTask->pRecalState = NULL;
  }
}
×
401

UNCOV
402
/*
 * Initialize the checkpoint/processed/next-process versions of a task.
 *
 * - Task with fillHistory == 0 and no related fill-history task: everything is derived from `ver`,
 *   and the data range is set to [ver, ver].
 * - Otherwise the data range already stored in the task is used (it is set at the mnode):
 *     fillHistory == 1 -> start right after range.maxVer
 *     else             -> start at range.minVer, except that minVer == 0 is mapped to
 *                         processedVer 0 / nextProcessVer 1 for compatibility.
 */
static void setInitialVersionInfo(SStreamTask* pTask, int64_t ver) {
  SCheckpointInfo* pInfo = &pTask->chkInfo;
  SDataRange*      pDataRange = &pTask->dataRange;

  // only set the version info from `ver` for stream tasks without fill-history task
  if ((pTask->info.fillHistory == 0) && (!HAS_RELATED_FILLHISTORY_TASK(pTask))) {
    pInfo->checkpointVer = ver - 1;  // only update when generating checkpoint
    pInfo->processedVer = ver - 1;   // already processed version
    pInfo->nextProcessVer = ver;     // next processed version

    pDataRange->range.maxVer = ver;
    pDataRange->range.minVer = ver;
    return;
  }

  if (pTask->info.fillHistory == 1) {
    pInfo->checkpointVer = pDataRange->range.maxVer;
    pInfo->processedVer = pDataRange->range.maxVer;
    pInfo->nextProcessVer = pDataRange->range.maxVer + 1;
    return;
  }

  if (pDataRange->range.minVer == 0) {
    // for compatible purpose, remove it later
    pInfo->checkpointVer = 0;
    pInfo->processedVer = 0;
    pInfo->nextProcessVer = 1;
    stDebug("s-task:%s update the processedVer to 0 from -1 due to compatible purpose", pTask->id.idStr);
  } else {
    pInfo->checkpointVer = pDataRange->range.minVer - 1;
    pInfo->processedVer = pDataRange->range.minVer - 1;
    pInfo->nextProcessVer = pDataRange->range.minVer;
  }
}
×
437

UNCOV
438
/*
 * Build the backend directory path of a task: "<meta path><TD_DIRSEP>0x<streamId>-0x<taskId>".
 *
 * For a fill-history task the ids stored in pTask->streamTaskId are used, otherwise the task's
 * own ids.
 *
 * On success the heap allocated path is stored in pTask->backendPath and 0 is returned.
 * On failure pTask->backendPath is left untouched, so a truncated path can never be picked up
 * (and possibly removed from disk) later; the return value is TSDB_CODE_OUT_OF_BUFFER or terrno.
 */
int32_t streamTaskSetBackendPath(SStreamTask* pTask) {
  int64_t streamId = 0;
  int32_t taskId = 0;

  if (pTask->info.fillHistory) {
    streamId = pTask->streamTaskId.streamId;
    taskId = pTask->streamTaskId.taskId;
  } else {
    streamId = pTask->id.streamId;
    taskId = pTask->id.taskId;
  }

  char    id[128] = {0};
  int32_t nBytes = snprintf(id, tListLen(id), "0x%" PRIx64 "-0x%x", streamId, taskId);
  if (nBytes < 0 || nBytes >= (int32_t)tListLen(id)) {
    return TSDB_CODE_OUT_OF_BUFFER;
  }

  int32_t len = strlen(pTask->pMeta->path);
  int32_t cap = len + nBytes + 2;  // same sizing as before: path + id + 2 bytes for separator and terminator

  // zero-filled, so the buffer is a valid string even if formatting fails
  char* path = (char*)taosMemoryCalloc(1, cap);
  if (path == NULL) {
    return terrno;
  }

  int32_t code = snprintf(path, cap, "%s%s%s", pTask->pMeta->path, TD_DIRSEP, id);
  if (code < 0 || code >= cap) {
    stError("s-task:%s failed to set backend path:%s, code: out of buffer", pTask->id.idStr, path);
    taosMemoryFree(path);
    return TSDB_CODE_OUT_OF_BUFFER;
  }

  pTask->backendPath = path;
  stDebug("s-task:%s set backend path:%s", pTask->id.idStr, pTask->backendPath);
  return 0;
}
471

UNCOV
472
/*
 * Initialize the runtime part of a stream task: id string, input/output queues, state machine,
 * initial version info, send-info list, mutexes, token bucket, epset update list, downstream check
 * list, active checkpoint info and finally the backend path.
 *
 * pMeta and pMsgCb are stored in the task; `ver` is forwarded to setInitialVersionInfo.
 *
 * Returns 0 on success or an error code. Resources that were already attached to pTask when a later
 * step fails are left in place here.
 * NOTE(review): presumably the caller releases them through tFreeStreamTask — confirm.
 */
int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, int64_t ver) {
  int32_t code = createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId, &pTask->id.idStr);
  if (code) {
    stError("0x%x failed create stream task id str, code:%s", pTask->id.taskId, tstrerror(code));
    return code;
  }

  pTask->id.refId = 0;
  pTask->inputq.status = TASK_INPUT_STATUS__NORMAL;
  pTask->outputq.status = TASK_OUTPUT_STATUS__NORMAL;

  int32_t code1 = streamQueueOpen(512 << 10, &pTask->inputq.queue);
  int32_t code2 = streamQueueOpen(512 << 10, &pTask->outputq.queue);
  if (code1 || code2) {
    stError("s-task:%s failed to prepare the input/output queue, initialize task failed", pTask->id.idStr);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;

  code = streamCreateStateMachine(pTask);
  if (pTask->status.pSM == NULL || code != TSDB_CODE_SUCCESS) {
    stError("s-task:%s failed create state-machine for stream task, initialization failed, code:%s", pTask->id.idStr,
            tstrerror(code));
    return code;
  }

  pTask->execInfo.created = taosGetTimestampMs();
  setInitialVersionInfo(pTask, ver);

  pTask->pMeta = pMeta;
  pTask->pMsgCb = pMsgCb;
  pTask->msgInfo.pSendInfo = taosArrayInit(4, sizeof(SDispatchEntry));
  if (pTask->msgInfo.pSendInfo == NULL) {
    stError("s-task:%s failed to create sendInfo struct for stream task, code:Out of memory", pTask->id.idStr);
    return terrno;
  }

  code = taosThreadMutexInit(&pTask->msgInfo.lock, NULL);
  if (code) {
    stError("s-task:0x%x failed to init msgInfo mutex, code:%s", pTask->id.taskId, tstrerror(code));
    return code;
  }

  // the task lock is created as a recursive mutex
  TdThreadMutexAttr attr = {0};
  code = taosThreadMutexAttrInit(&attr);
  if (code != 0) {
    stError("s-task:%s initElapsed mutex attr failed, code:%s", pTask->id.idStr, tstrerror(code));
    return code;
  }

  code = taosThreadMutexAttrSetType(&attr, PTHREAD_MUTEX_RECURSIVE);
  if (code != 0) {
    stError("s-task:%s set mutex attr recursive, code:%s", pTask->id.idStr, tstrerror(code));
    // the attribute object was initialized above and must be released on this path as well
    (void)taosThreadMutexAttrDestroy(&attr);
    return code;
  }

  code = taosThreadMutexInit(&pTask->lock, &attr);
  if (code) {
    (void)taosThreadMutexAttrDestroy(&attr);
    return code;
  }

  code = taosThreadMutexAttrDestroy(&attr);
  if (code) {
    return code;
  }

  streamTaskOpenAllUpstreamInput(pTask);

  STaskOutputInfo* pOutputInfo = &pTask->outputInfo;
  pOutputInfo->pTokenBucket = taosMemoryCalloc(1, sizeof(STokenBucket));
  if (pOutputInfo->pTokenBucket == NULL) {
    stError("s-task:%s failed to prepare the tokenBucket, code:%s", pTask->id.idStr, tstrerror(terrno));
    return terrno;
  }

  // token bucket for the sink: capacity/rate arguments are 35/35, data rate comes from tsSinkDataRate
  code = streamTaskInitTokenBucket(pOutputInfo->pTokenBucket, 35, 35, tsSinkDataRate, pTask->id.idStr);
  if (code) {
    return code;
  }

  pOutputInfo->pNodeEpsetUpdateList = taosArrayInit(4, sizeof(SDownstreamTaskEpset));
  if (pOutputInfo->pNodeEpsetUpdateList == NULL) {
    stError("s-task:%s failed to prepare downstreamUpdateList, code:%s", pTask->id.idStr, tstrerror(terrno));
    return terrno;
  }

  pTask->taskCheckInfo.pList = taosArrayInit(4, sizeof(SDownstreamStatusInfo));
  if (pTask->taskCheckInfo.pList == NULL) {
    stError("s-task:%s failed to prepare taskCheckInfo list, code:%s", pTask->id.idStr, tstrerror(terrno));
    return terrno;
  }

  code = taosThreadMutexInit(&pTask->taskCheckInfo.checkInfoLock, NULL);
  if (code) {
    return code;
  }

  // an already existing active checkpoint info is kept as is
  if (pTask->chkInfo.pActiveInfo == NULL) {
    code = streamTaskCreateActiveChkptInfo(&pTask->chkInfo.pActiveInfo);
    if (code) {
      stError("s-task:%s failed to create active checkpoint info, code:%s", pTask->id.idStr, tstrerror(code));
      return code;
    }
  }

  return streamTaskSetBackendPath(pTask);
}
582

UNCOV
583
int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask) {
×
UNCOV
584
  if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
×
UNCOV
585
    return 0;
×
586
  }
587

UNCOV
588
  int32_t type = pTask->outputInfo.type;
×
UNCOV
589
  if (type == TASK_OUTPUT__TABLE) {
×
UNCOV
590
    return 0;
×
UNCOV
591
  } else if (type == TASK_OUTPUT__FIXED_DISPATCH) {
×
UNCOV
592
    return 1;
×
UNCOV
593
  } else if (type == TASK_OUTPUT__VTABLE_MAP) {
×
594
    SArray* pTaskInfos = pTask->outputInfo.vtableMapDispatcher.taskInfos;
×
595
    return taosArrayGetSize(pTaskInfos);
×
596
  } else {
UNCOV
597
    SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
×
UNCOV
598
    return taosArrayGetSize(vgInfo);
×
599
  }
600
}
601

UNCOV
602
int32_t streamTaskGetNumOfUpstream(const SStreamTask* pTask) { return taosArrayGetSize(pTask->upstreamInfo.pList); }
×
603

UNCOV
604
/*
 * Register pUpstreamTask as an upstream endpoint of pTask.
 *
 * An endpoint descriptor is created from pUpstreamTask and appended to pTask->upstreamInfo.pList;
 * the list is created on first use. On success the list owns the descriptor.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when an allocation or the append fails; in that case the
 * descriptor is freed again and nothing is added to the list.
 */
int32_t streamTaskSetUpstreamInfo(SStreamTask* pTask, const SStreamTask* pUpstreamTask) {
  SStreamUpstreamEpInfo* pEpInfo = createStreamTaskEpInfo(pUpstreamTask);
  if (pEpInfo == NULL) {
    return terrno;
  }

  if (pTask->upstreamInfo.pList == NULL) {
    pTask->upstreamInfo.pList = taosArrayInit(4, POINTER_BYTES);
    if (pTask->upstreamInfo.pList == NULL) {
      int32_t code = terrno;  // capture before the free below can touch terrno
      taosMemoryFree(pEpInfo);
      return code;
    }
  }

  void* p = taosArrayPush(pTask->upstreamInfo.pList, &pEpInfo);
  if (p == NULL) {
    int32_t code = terrno;
    taosMemoryFree(pEpInfo);  // not stored in the list, so it must not leak
    return code;
  }

  return TSDB_CODE_SUCCESS;
}
617

UNCOV
618
/*
 * Replace the epset of the first upstream endpoint that lives on node `nodeId`.
 *
 * *pUpdated is set to true when the stored epset differs from pEpSet and is replaced; it is not
 * touched otherwise. Only the first matching endpoint is handled.
 *
 * Returns 0 on success, or the error from epsetToStr: the strings are only needed for the log
 * lines, but a conversion failure still aborts the update.
 */
int32_t streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdated) {
  char    newStr[512] = {0};
  int32_t code = epsetToStr(pEpSet, newStr, tListLen(newStr));
  if (code != 0) {
    stError("%s failed to convert epset to str, code:%s", pTask->id.idStr, tstrerror(code));
    return code;
  }

  int32_t num = taosArrayGetSize(pTask->upstreamInfo.pList);
  for (int32_t idx = 0; idx < num; ++idx) {
    SStreamUpstreamEpInfo* pUpstream = taosArrayGetP(pTask->upstreamInfo.pList, idx);
    if (pUpstream->nodeId != nodeId) {
      continue;
    }

    if (isEpsetEqual(&pUpstream->epSet, pEpSet)) {
      stDebug("s-task:0x%x not update upstreamInfo, since identical, task:0x%x(nodeId:%d) epset:%s", pTask->id.taskId,
              pUpstream->taskId, nodeId, newStr);
      break;
    }

    *pUpdated = true;

    // render the old epset before it is overwritten, for the log line
    char oldStr[512] = {0};
    code = epsetToStr(&pUpstream->epSet, oldStr, tListLen(oldStr));
    if (code != 0) {
      stError("%s failed to convert epset to str, code:%s", pTask->id.idStr, tstrerror(code));
      return code;
    }

    epsetAssign(&pUpstream->epSet, pEpSet);
    stDebug("s-task:0x%x update the upstreamInfo taskId:0x%x(nodeId:%d) newEpset:%s old:%s", pTask->id.taskId,
            pUpstream->taskId, nodeId, newStr, oldStr);
    break;
  }

  return code;
}
656

UNCOV
657
/*
 * Destroy the upstream endpoint list, freeing every stored endpoint descriptor, and reset the
 * closed counter. Does nothing when there is no list.
 */
void streamTaskDestroyUpstreamInfo(SUpstreamInfo* pUpstreamInfo) {
  if (pUpstreamInfo->pList == NULL) {
    return;
  }

  taosArrayDestroyEx(pUpstreamInfo->pList, freeUpstreamItem);
  pUpstreamInfo->pList = NULL;
  pUpstreamInfo->numOfClosed = 0;
}
×
664

UNCOV
665
// Configure pTask to dispatch its output to exactly one downstream task.
// The downstream task id, node id and epset are copied from pDownstreamTask into the fixed dispatcher,
// and the task's output type / dispatch message type are set accordingly.
void streamTaskSetFixedDownstreamInfo(SStreamTask* pTask, const SStreamTask* pDownstreamTask) {
  pTask->outputInfo.type = TASK_OUTPUT__FIXED_DISPATCH;
  pTask->msgInfo.msgType = TDMT_STREAM_TASK_DISPATCH;

  STaskDispatcherFixed* pFixed = &pTask->outputInfo.fixedDispatcher;
  pFixed->epSet = pDownstreamTask->info.epSet;
  pFixed->nodeId = pDownstreamTask->info.nodeId;
  pFixed->taskId = pDownstreamTask->id.taskId;
}
×
674

UNCOV
675
// Apply a new epset to one downstream dispatch entry and log the outcome.
//   pTask     task owning the dispatch entry (used for log output only)
//   pCur      epset currently stored in the dispatch entry; overwritten when it differs from pEpSet
//   dstTaskId task id of the downstream entry, printed in the log lines
//   nodeId    node id being updated, printed in the log lines
//   pEpSet    the new epset
//   newStr    pEpSet already rendered as a string by the caller
//   pUpdated  set to true when pCur differs from pEpSet; not written otherwise
// Returns 0, or the error reported by epsetToStr() when the old epset cannot be rendered.
static int32_t streamTaskApplyDispatchEpset(SStreamTask* pTask, SEpSet* pCur, int32_t dstTaskId, int32_t nodeId,
                                            const SEpSet* pEpSet, const char* newStr, bool* pUpdated) {
  int32_t id = pTask->id.taskId;

  if (isEpsetEqual(pCur, pEpSet)) {
    stDebug("s-task:0x%x not update dispatch info, since identical, task:0x%x(nodeId:%d) epset:%s", id, dstTaskId,
            nodeId, newStr);
    return 0;
  }

  *pUpdated = true;

  char    old[512] = {0};
  int32_t code = epsetToStr(pCur, old, tListLen(old));
  if (code != 0) {  // the old epset string is only needed for the log line, yet a failure aborts the update
    stError("%s failed to convert epset to str, code:%s", pTask->id.idStr, tstrerror(code));
    return code;
  }

  epsetAssign(pCur, pEpSet);
  stDebug("s-task:0x%x update dispatch info, task:0x%x(nodeId:%d) newEpset:%s old:%s", id, dstTaskId, nodeId, newStr,
          old);
  return code;
}

// Update the epset of the downstream entry that lives on node `nodeId`, for whichever dispatch type
// (shuffle, fixed, or vtable-map) pTask uses. For the list-based dispatch types only the first entry
// matching nodeId is processed.
//   pUpdated  set to true when a matching entry had a different epset; not written otherwise
// Returns 0 on success (including "no matching entry" and "epset identical"), or the epsetToStr() error.
int32_t streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdated) {
  char    buf[512] = {0};
  int32_t code = epsetToStr(pEpSet, buf, tListLen(buf));
  if (code != 0) {  // the string is used for logging only, but a conversion failure is returned to the caller
    stError("%s failed to convert epset to str, code:%s", pTask->id.idStr, tstrerror(code));
    return code;
  }

  int8_t type = pTask->outputInfo.type;

  if (type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
    SArray* pVgs = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;

    for (int32_t i = 0; i < taosArrayGetSize(pVgs); i++) {
      SVgroupInfo* pVgInfo = taosArrayGet(pVgs, i);
      if (pVgInfo == NULL || pVgInfo->vgId != nodeId) {
        continue;
      }

      code = streamTaskApplyDispatchEpset(pTask, &pVgInfo->epSet, pVgInfo->taskId, nodeId, pEpSet, buf, pUpdated);
      break;
    }
  } else if (type == TASK_OUTPUT__FIXED_DISPATCH) {
    STaskDispatcherFixed* pDispatcher = &pTask->outputInfo.fixedDispatcher;
    if (pDispatcher->nodeId == nodeId) {
      code =
          streamTaskApplyDispatchEpset(pTask, &pDispatcher->epSet, pDispatcher->taskId, nodeId, pEpSet, buf, pUpdated);
    }
  } else if (type == TASK_OUTPUT__VTABLE_MAP) {
    SArray* pTaskInfos = pTask->outputInfo.vtableMapDispatcher.taskInfos;

    for (int32_t i = 0; i < taosArrayGetSize(pTaskInfos); ++i) {
      STaskDispatcherFixed* pAddr = taosArrayGet(pTaskInfos, i);
      if (pAddr == NULL || pAddr->nodeId != nodeId) {
        continue;
      }

      code = streamTaskApplyDispatchEpset(pTask, &pAddr->epSet, pAddr->taskId, nodeId, pEpSet, buf, pUpdated);
      break;
    }
  }

  return code;
}
774

UNCOV
775
// Stop pTask: deliver the STOP event to its state machine, kill the executor query (non-sink tasks that
// have an executor), then poll every 100ms until the task reports idle.
// Returns the STOP-event error immediately if that step fails; otherwise returns the last code produced
// (a failed qKillTask is logged, does not interrupt the wait, and its code is what gets returned).
int32_t streamTaskStop(SStreamTask* pTask) {
  int32_t     vgId = pTask->pMeta->vgId;
  int64_t     startTs = taosGetTimestampMs();
  const char* idStr = pTask->id.idStr;

  int32_t code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_STOP);
  if (code != 0) {
    stError("failed to handle STOP event, s-task:%s, code:%s", idStr, tstrerror(code));
    return code;
  }

  bool needKill = (pTask->info.taskLevel != TASK_LEVEL__SINK) && (pTask->exec.pExecutor != NULL);
  if (needKill) {
    code = qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS, 5000);
    if (code != TSDB_CODE_SUCCESS) {
      stError("s-task:%s failed to kill task related query handle, code:%s", idStr, tstrerror(code));
    }
  }

  for (;;) {
    if (streamTaskIsIdle(pTask)) {
      break;
    }

    stDebug("s-task:%s level:%d wait for task to be idle and then close, check again in 100ms", idStr,
            pTask->info.taskLevel);
    taosMsleep(100);
  }

  int64_t elapsed = taosGetTimestampMs() - startTs;
  stDebug("vgId:%d s-task:%s is closed in %" PRId64 " ms", vgId, idStr, elapsed);
  return code;
}
803

UNCOV
804
// Apply every node update in pNodeList to pTask via doUpdateTaskEpset(), and record the update
// timestamp / counter in the task's execution statistics.
// NULL list entries are skipped; per-node failures are logged and do not stop the loop.
// Returns the `updated` flag that was passed to doUpdateTaskEpset() for each node (initially false).
bool streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList) {
  STaskExecStatisInfo* pStat = &pTask->execInfo;

  int32_t total = taosArrayGetSize(pNodeList);
  int64_t lastTs = pStat->latestUpdateTs;

  pStat->latestUpdateTs = taosGetTimestampMs();
  pStat->updateCount += 1;
  stDebug("s-task:0x%x update task nodeEp epset, updatedNodes:%d, updateCount:%d, prevTs:%" PRId64, pTask->id.taskId,
          total, pStat->updateCount, lastTs);

  bool changed = false;
  for (int32_t idx = 0; idx < total; ++idx) {
    SNodeUpdateInfo* pNode = taosArrayGet(pNodeList, idx);
    if (pNode != NULL) {
      int32_t code = doUpdateTaskEpset(pTask, pNode->nodeId, &pNode->newEp, &changed);
      if (code != 0) {
        stError("s-task:0x%x failed to update the task nodeEp epset, code:%s", pTask->id.taskId, tstrerror(code));
      }
    }
  }

  return changed;
}
830

UNCOV
831
// Set the recorded stage of every upstream entry of pTask to -1.
// Source-level tasks are left untouched.
void streamTaskResetUpstreamStageInfo(SStreamTask* pTask) {
  if (pTask->info.taskLevel != TASK_LEVEL__SOURCE) {
    int32_t total = taosArrayGetSize(pTask->upstreamInfo.pList);

    int32_t idx = 0;
    while (idx < total) {
      SStreamUpstreamEpInfo* pUpstream = taosArrayGetP(pTask->upstreamInfo.pList, idx);
      pUpstream->stage = -1;
      ++idx;
    }

    stDebug("s-task:%s reset all upstream tasks stage info", pTask->id.idStr);
  }
}
844

UNCOV
845
// Mark every upstream entry of pTask as allowed to send data and reset the closed-input counter.
// When the upstream list is empty the function returns without touching the counter.
void streamTaskOpenAllUpstreamInput(SStreamTask* pTask) {
  int32_t total = taosArrayGetSize(pTask->upstreamInfo.pList);
  if (total != 0) {
    for (int32_t idx = 0; idx < total; ++idx) {
      SStreamUpstreamEpInfo* pUpstream = taosArrayGetP(pTask->upstreamInfo.pList, idx);
      pUpstream->dataAllowed = true;
    }

    pTask->upstreamInfo.numOfClosed = 0;
    stDebug("s-task:%s opening up inputQ for %d upstream tasks", pTask->id.idStr, total);
  }
}
859

UNCOV
860
// Stop accepting data from the upstream task identified by taskId.
// Nothing happens when the upstream entry is not found or its input is already closed.
// The closed-input counter is incremented only while it is below the number of upstream tasks;
// otherwise an error is logged and the counter is left as is.
void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId) {
  SStreamUpstreamEpInfo* pInfo = NULL;
  streamTaskGetUpstreamTaskEpInfo(pTask, taskId, &pInfo);

  if ((pInfo == NULL) || !pInfo->dataAllowed) {
    return;
  }

  pInfo->dataAllowed = false;
  if (pTask->upstreamInfo.numOfClosed < streamTaskGetNumOfUpstream(pTask)) {
    // the updated counter value is not needed here
    (void)atomic_add_fetch_32(&pTask->upstreamInfo.numOfClosed, 1);
  } else {
    stError("s-task:%s not inc closed input, since they have been all closed already", pTask->id.idStr);
  }
}
×
873

UNCOV
874
// Re-open the input from the upstream task identified by taskId.
// Only acts when the upstream entry exists and its input is currently closed: the closed-input
// counter is decremented and the entry is flagged as allowed to send data again.
void streamTaskOpenUpstreamInput(SStreamTask* pTask, int32_t taskId) {
  SStreamUpstreamEpInfo* pUpstream = NULL;
  streamTaskGetUpstreamTaskEpInfo(pTask, taskId, &pUpstream);

  if (pUpstream == NULL || pUpstream->dataAllowed) {
    return;
  }

  int32_t remain = atomic_sub_fetch_32(&pTask->upstreamInfo.numOfClosed, 1);
  stDebug("s-task:%s open inputQ for upstream:0x%x, remain closed:%d", pTask->id.idStr, taskId, remain);
  pUpstream->dataAllowed = true;
}
×
884

885
// True when the closed-input counter equals the number of entries in the upstream list.
bool streamTaskIsAllUpstreamClosed(SStreamTask* pTask) {
  SUpstreamInfo* pUpstream = &pTask->upstreamInfo;
  return pUpstream->numOfClosed == taosArrayGetSize(pUpstream->pList);
}
888

UNCOV
889
// Under the task lock, move the schedule status from INACTIVE to WAITING.
// Returns true when the transition was made, false when the status was not INACTIVE.
bool streamTaskSetSchedStatusWait(SStreamTask* pTask) {
  streamMutexLock(&pTask->lock);

  bool switched = (pTask->status.schedStatus == TASK_SCHED_STATUS__INACTIVE);
  if (switched) {
    pTask->status.schedStatus = TASK_SCHED_STATUS__WAITING;
  }

  streamMutexUnlock(&pTask->lock);
  return switched;
}
901

UNCOV
902
// Under the task lock, move the schedule status from WAITING to ACTIVE.
// Returns the schedule status observed before any change was made.
int8_t streamTaskSetSchedStatusActive(SStreamTask* pTask) {
  streamMutexLock(&pTask->lock);

  int8_t prev = pTask->status.schedStatus;
  if (prev == TASK_SCHED_STATUS__WAITING) {
    pTask->status.schedStatus = TASK_SCHED_STATUS__ACTIVE;
  }

  streamMutexUnlock(&pTask->lock);
  return prev;
}
912

UNCOV
913
// Under the task lock, unconditionally set the schedule status to INACTIVE.
// Returns the schedule status that was in place before the change.
int8_t streamTaskSetSchedStatusInactive(SStreamTask* pTask) {
  int8_t prev;

  streamMutexLock(&pTask->lock);
  prev = pTask->status.schedStatus;
  pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
  streamMutexUnlock(&pTask->lock);

  return prev;
}
921

UNCOV
922
// For a fill-history task, clear the "related fill-history task" attribute on its stream task and
// persist that stream task in the meta store.
//   resetRelHalt  when non-zero, the stream task's persistent status is also set to READY
// Returns 0 for tasks that are not fill-history tasks; otherwise the error from acquiring the stream
// task, or the result of streamMetaSaveTaskInMeta().
int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, int32_t resetRelHalt) {
  if (pTask->info.fillHistory == 0) {
    return 0;
  }

  SStreamMeta* pMeta = pTask->pMeta;
  SStreamTask* pStreamTask = NULL;

  int32_t code = streamMetaAcquireTaskUnsafe(pMeta, &pTask->streamTaskId, &pStreamTask);
  if (code != 0) {
    return code;
  }

  stDebug("s-task:%s clear the related stream task:0x%x attr to fill-history task", pTask->id.idStr,
          (int32_t)pTask->streamTaskId.taskId);

  // the attribute change and the save are both done while holding the stream task's lock
  streamMutexLock(&(pStreamTask->lock));
  CLEAR_RELATED_FILLHISTORY_TASK(pStreamTask);

  if (resetRelHalt) {
    stDebug("s-task:0x%" PRIx64 " set the persistent status attr to be ready, prev:%s, status in sm:%s",
            pTask->streamTaskId.taskId, streamTaskGetStatusStr(pStreamTask->status.taskStatus),
            streamTaskGetStatus(pStreamTask).name);
    pStreamTask->status.taskStatus = TASK_STATUS__READY;
  }

  code = streamMetaSaveTaskInMeta(pMeta, pStreamTask);
  streamMutexUnlock(&(pStreamTask->lock));

  streamMetaReleaseTask(pMeta, pStreamTask);
  return code;
}
954

955
// Build a drop-task request for the given task id and put it on the write queue through pMsgCb.
// Returns terrno when the request buffer cannot be allocated, otherwise the result of tmsgPutToQueue().
int32_t streamBuildAndSendDropTaskMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskId* pTaskId, int64_t resetRelHalt) {
  SVDropStreamTaskReq* pDropReq = rpcMallocCont(sizeof(SVDropStreamTaskReq));
  if (pDropReq == NULL) {
    return terrno;
  }

  pDropReq->head.vgId = vgId;
  pDropReq->streamId = pTaskId->streamId;
  pDropReq->taskId = pTaskId->taskId;
  pDropReq->resetRelHalt = resetRelHalt;

  SRpcMsg rpcMsg = {0};
  rpcMsg.msgType = TDMT_STREAM_TASK_DROP;
  rpcMsg.pCont = pDropReq;
  rpcMsg.contLen = sizeof(SVDropStreamTaskReq);

  int32_t code = tmsgPutToQueue(pMsgCb, WRITE_QUEUE, &rpcMsg);
  if (code == TSDB_CODE_SUCCESS) {
    stDebug("vgId:%d build and send drop task:0x%x msg", vgId, pTaskId->taskId);
  } else {
    stError("vgId:%d failed to send drop task:0x%x msg, code:%s", vgId, pTaskId->taskId, tstrerror(code));
  }

  return code;
}
976

UNCOV
977
// Encode a checkpoint report for pTask (ids, active transaction/checkpoint id, processed version,
// start timestamp and the dropHTask flag) and send it to the mnode epset stored in the task.
// Returns 0 / the result of tmsgSendReq() on success, or a non-zero error code when the report
// cannot be encoded or the message buffer cannot be allocated.
int32_t streamSendChkptReportMsg(SStreamTask* pTask, SCheckpointInfo* pCheckpointInfo, int8_t dropRelHTask) {
  int32_t                code = 0;
  int32_t                tlen = 0;
  int32_t                vgId = pTask->pMeta->vgId;
  const char*            id = pTask->id.idStr;
  SActiveCheckpointInfo* pActive = pCheckpointInfo->pActiveInfo;

  SCheckpointReport req = {.streamId = pTask->id.streamId,
                           .taskId = pTask->id.taskId,
                           .nodeId = vgId,
                           .dropHTask = dropRelHTask,
                           .transId = pActive->transId,
                           .checkpointId = pActive->activeId,
                           .checkpointVer = pCheckpointInfo->processedVer,
                           .checkpointTs = pCheckpointInfo->startTs};

  // first pass: compute the encoded length only
  tEncodeSize(tEncodeStreamTaskChkptReport, &req, tlen, code);
  if (code < 0) {
    stError("s-task:%s vgId:%d encode stream task checkpoint-report failed, code:%s", id, vgId, tstrerror(code));
    return code;  // propagate the encoder's code instead of a bare -1
  }

  void* buf = rpcMallocCont(tlen);
  if (buf == NULL) {
    // capture terrno before logging; fall back to out-of-memory so a failure is never reported as 0
    code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    stError("s-task:%s vgId:%d encode stream task checkpoint-report msg failed, code:%s", id, vgId,
            tstrerror(TSDB_CODE_OUT_OF_MEMORY));
    return code;
  }

  // second pass: encode into the allocated buffer
  SEncoder encoder;
  tEncoderInit(&encoder, buf, tlen);
  if ((code = tEncodeStreamTaskChkptReport(&encoder, &req)) < 0) {
    rpcFreeCont(buf);
    tEncoderClear(&encoder);
    stError("s-task:%s vgId:%d encode stream task checkpoint-report msg failed, code:%s", id, vgId, tstrerror(code));
    return code;
  }
  tEncoderClear(&encoder);

  SRpcMsg msg = {0};
  initRpcMsg(&msg, TDMT_MND_STREAM_CHKPT_REPORT, buf, tlen);
  stDebug("s-task:%s vgId:%d build and send task checkpoint-report to mnode", id, vgId);

  return tmsgSendReq(&pTask->info.mnodeEpset, &msg);
}
1022

UNCOV
1023
STaskId streamTaskGetTaskId(const SStreamTask* pTask) {
×
UNCOV
1024
  STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
×
UNCOV
1025
  return id;
×
1026
}
1027

UNCOV
1028
// Initialize the launch bookkeeping of a history task: zero retries, the default wait interval,
// and the tick count derived from that interval and WAIT_FOR_MINIMAL_INTERVAL.
void streamTaskInitForLaunchHTask(SHistoryTaskInfo* pInfo) {
  pInfo->retryTimes = 0;
  pInfo->waitInterval = LAUNCH_HTASK_INTERVAL;
  pInfo->tickCount = ceil(LAUNCH_HTASK_INTERVAL / WAIT_FOR_MINIMAL_INTERVAL);
}
×
1033

UNCOV
1034
// Prepare the next launch retry of a history task: count the retry, scale the wait interval by
// RETRY_LAUNCH_INTERVAL_INC_RATE and recompute the tick count from the scaled interval.
void streamTaskSetRetryInfoForLaunch(SHistoryTaskInfo* pInfo) {
  pInfo->retryTimes += 1;

  // tickCount must be derived from the already-scaled interval
  pInfo->waitInterval *= RETRY_LAUNCH_INTERVAL_INC_RATE;
  pInfo->tickCount = ceil(pInfo->waitInterval / WAIT_FOR_MINIMAL_INTERVAL);
}
×
1039

UNCOV
1040
// Fill the identifying fields of a status entry from pTask and give it its initial state:
// stage -1 and status TASK_STATUS__STOP. Other fields of pEntry are not written.
void streamTaskStatusInit(STaskStatusEntry* pEntry, const SStreamTask* pTask) {
  // identity, taken from the task
  pEntry->id.taskId = pTask->id.taskId;
  pEntry->id.streamId = pTask->id.streamId;
  pEntry->nodeId = pTask->info.nodeId;

  // initial state
  pEntry->status = TASK_STATUS__STOP;
  pEntry->stage = -1;
}
×
1047

UNCOV
1048
// Copy the runtime fields of one status entry into another.
// The `id` and `nodeId` fields of pDst are not written by this function.
void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc) {
  // state
  pDst->status = pSrc->status;
  pDst->stage = pSrc->stage;
  pDst->startTime = pSrc->startTime;
  pDst->hTaskId = pSrc->hTaskId;

  // input / processing / output statistics
  pDst->inputQUsed = pSrc->inputQUsed;
  pDst->inputRate = pSrc->inputRate;
  pDst->procsTotal = pSrc->procsTotal;
  pDst->procsThroughput = pSrc->procsThroughput;
  pDst->outputTotal = pSrc->outputTotal;
  pDst->outputThroughput = pSrc->outputThroughput;
  pDst->sinkQuota = pSrc->sinkQuota;
  pDst->sinkDataSize = pSrc->sinkDataSize;

  // versions and checkpoint information
  pDst->processedVer = pSrc->processedVer;
  pDst->verRange = pSrc->verRange;
  pDst->checkpointInfo = pSrc->checkpointInfo;
  pDst->startCheckpointId = pSrc->startCheckpointId;
  pDst->startCheckpointVer = pSrc->startCheckpointVer;

  // notification statistics
  pDst->notifyEventStat = pSrc->notifyEventStat;
}
×
1069

UNCOV
1070
// Produce a status entry describing the current state of pTask: identity, status, node/stage from
// the meta, input queue usage, latest checkpoint id/version/time, and execution statistics.
// Fields that are not assigned below are zero.
STaskStatusEntry streamTaskGetStatusEntry(SStreamTask* pTask) {
  SStreamMeta*         pMeta = pTask->pMeta;
  STaskExecStatisInfo* pStat = &pTask->execInfo;

  STaskStatusEntry entry = {0};

  entry.id = streamTaskGetTaskId(pTask);
  entry.status = streamTaskGetStatus(pTask).state;
  entry.nodeId = pMeta->vgId;
  entry.stage = pMeta->stage;

  entry.inputQUsed = SIZE_IN_MiB(streamQueueGetItemSize(pTask->inputq.queue));
  entry.startTime = pStat->readyTs;

  // only the latest checkpoint id/version/time are filled in; the remaining checkpoint fields stay 0
  entry.checkpointInfo.latestId = pTask->chkInfo.checkpointId;
  entry.checkpointInfo.latestVer = pTask->chkInfo.checkpointVer;
  entry.checkpointInfo.latestTime = pTask->chkInfo.checkpointTime;
  entry.checkpointInfo.latestSize = 0;
  entry.checkpointInfo.remoteBackup = 0;
  entry.checkpointInfo.consensusChkptId = 0;
  entry.checkpointInfo.consensusTs = 0;

  entry.hTaskId = pTask->hTaskInfo.id.taskId;
  entry.procsTotal = SIZE_IN_MiB(pStat->inputDataSize);
  entry.outputTotal = SIZE_IN_MiB(pStat->outputDataSize);
  entry.procsThroughput = SIZE_IN_KiB(pStat->procsThroughput);
  entry.outputThroughput = SIZE_IN_KiB(pStat->outputThroughput);
  entry.startCheckpointId = pStat->startCheckpointId;
  entry.startCheckpointVer = pStat->startCheckpointVer;
  entry.notifyEventStat = pTask->notifyEventStat;

  return entry;
}
1100

UNCOV
1101
// Callback passed to streamTaskHandleEventAsync() for the PAUSE event (see streamTaskPause).
// Increments the meta's paused-task counter and, for a fill-history task, kills the executor query.
// `param` is not used. Returns 0, or the result of qKillTask() for fill-history tasks.
static int32_t taskPauseCallback(SStreamTask* pTask, void* param) {
  SStreamMeta* pMeta = pTask->pMeta;

  int32_t paused = atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1);
  stInfo("vgId:%d s-task:%s pause stream task. paused task num:%d", pMeta->vgId, pTask->id.idStr, paused);

  int32_t code = 0;
  if (pTask->info.fillHistory == 1) {
    // in case of fill-history task, stop the tsdb file scan operation.
    code = qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS, 10000);
  }

  stDebug("vgId:%d s-task:%s set pause flag and pause task", pMeta->vgId, pTask->id.idStr);
  return code;
}
1117

UNCOV
1118
// Submit the PAUSE event to the task's state machine asynchronously, with taskPauseCallback as the
// callback. A failure to submit the event is logged; nothing is returned to the caller.
void streamTaskPause(SStreamTask* pTask) {
  int32_t rc = streamTaskHandleEventAsync(pTask->status.pSM, TASK_EVENT_PAUSE, taskPauseCallback, NULL);
  if (rc != 0) {
    stError("s-task:%s failed handle pause event async, code:%s", pTask->id.idStr, tstrerror(rc));
  }
}
×
1124

UNCOV
1125
// Restore the status of pTask via streamTaskRestoreStatus(). When that succeeds the meta's
// paused-task counter is decremented and the transition is logged; otherwise only a
// "no need to resume" message is logged.
void streamTaskResume(SStreamTask* pTask) {
  SStreamTaskState before = streamTaskGetStatus(pTask);
  SStreamMeta*     pMeta = pTask->pMeta;

  if (streamTaskRestoreStatus(pTask) != TSDB_CODE_SUCCESS) {
    stInfo("s-task:%s status:%s no need to resume, paused task(s):%d", pTask->id.idStr, before.name,
           pMeta->numOfPausedTasks);
    return;
  }

  char*   pCurrent = streamTaskGetStatus(pTask).name;
  int32_t paused = atomic_sub_fetch_32(&pMeta->numOfPausedTasks, 1);
  stInfo("s-task:%s status:%s resume from %s, paused task(s):%d", pTask->id.idStr, pCurrent, before.name, paused);
}
×
1139

UNCOV
1140
// True when the task level of pTask is TASK_LEVEL__SINK.
bool streamTaskIsSinkTask(const SStreamTask* pTask) {
  return (pTask->info.taskLevel == TASK_LEVEL__SINK);
}
×
1141

1142
// this task must succeed
UNCOV
1143
// Encode a checkpoint request carrying the stream id, task id and vgId of pTask, and send it to the
// mnode epset stored in the task.
// Returns TSDB_CODE_INVALID_MSG when the size computation fails, terrno when the buffer cannot be
// allocated, the encoder's code when encoding fails, otherwise the result of tmsgSendReq().
int32_t streamTaskSendCheckpointReq(SStreamTask* pTask) {
  int32_t     code;
  int32_t     tlen = 0;
  const char* idStr = pTask->id.idStr;
  int32_t     vgId = pTask->pMeta->vgId;

  SStreamTaskCheckpointReq req = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId, .nodeId = vgId};

  // first pass: compute the encoded length
  tEncodeSize(tEncodeStreamTaskCheckpointReq, &req, tlen, code);
  if (code < 0) {
    stError("s-task:%s vgId:%d encode stream task req checkpoint failed, code:%s", idStr, vgId, tstrerror(code));
    return TSDB_CODE_INVALID_MSG;
  }

  void* pBuf = rpcMallocCont(tlen);
  if (pBuf == NULL) {
    stError("s-task:%s vgId:%d encode stream task req checkpoint msg failed, code:Out of memory", idStr, vgId);
    return terrno;
  }

  // second pass: encode into the allocated buffer
  SEncoder encoder;
  tEncoderInit(&encoder, pBuf, tlen);
  code = tEncodeStreamTaskCheckpointReq(&encoder, &req);
  if (code < 0) {
    rpcFreeCont(pBuf);
    tEncoderClear(&encoder);
    stError("s-task:%s vgId:%d encode stream task req checkpoint msg failed, code:%s", idStr, vgId, tstrerror(code));
    return code;
  }

  tEncoderClear(&encoder);

  SRpcMsg rpcMsg = {0};
  initRpcMsg(&rpcMsg, TDMT_MND_STREAM_REQ_CHKPT, pBuf, tlen);
  stDebug("s-task:%s vgId:%d build and send task checkpoint req", idStr, vgId);

  return tmsgSendReq(&pTask->info.mnodeEpset, &rpcMsg);
}
1179

UNCOV
1180
// Look up the upstream entry of pTask whose task id equals taskId.
//   pEpInfo  receives the matching entry, or NULL when none is found
// The search stops silently at the first NULL list element; an error is logged only when the whole
// list was scanned without a match.
void streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId, SStreamUpstreamEpInfo** pEpInfo) {
  *pEpInfo = NULL;

  int32_t total = taosArrayGetSize(pTask->upstreamInfo.pList);
  for (int32_t idx = 0; idx < total; ++idx) {
    SStreamUpstreamEpInfo* pCandidate = taosArrayGetP(pTask->upstreamInfo.pList, idx);
    if (pCandidate == NULL) {
      return;
    }

    if (pCandidate->taskId != taskId) {
      continue;
    }

    *pEpInfo = pCandidate;
    return;
  }

  stError("s-task:%s failed to find upstream task:0x%x", pTask->id.idStr, taskId);
}
1198

1199
// Return the epset of the downstream task identified by taskId, searching the dispatcher that
// matches the output type of pTask (fixed, shuffle, or vtable-map).
// Returns NULL when the output type is none of those or no downstream entry has that task id.
SEpSet* streamTaskGetDownstreamEpInfo(SStreamTask* pTask, int32_t taskId) {
  // the TASK_OUTPUT__* constants describe outputInfo.type, not info.taskLevel
  int8_t type = pTask->outputInfo.type;

  if (type == TASK_OUTPUT__FIXED_DISPATCH) {
    if (pTask->outputInfo.fixedDispatcher.taskId == taskId) {
      return &pTask->outputInfo.fixedDispatcher.epSet;
    }
  } else if (type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
    SArray* pList = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
    for (int32_t i = 0; i < taosArrayGetSize(pList); ++i) {
      SVgroupInfo* pVgInfo = taosArrayGet(pList, i);
      if (pVgInfo == NULL) {
        continue;
      }

      if (pVgInfo->taskId == taskId) {
        return &pVgInfo->epSet;
      }
    }
  } else if (type == TASK_OUTPUT__VTABLE_MAP) {
    SArray* pTaskInfos = pTask->outputInfo.vtableMapDispatcher.taskInfos;
    for (int32_t i = 0; i < taosArrayGetSize(pTaskInfos); ++i) {
      STaskDispatcherFixed* pAddr = taosArrayGet(pTaskInfos, i);
      if (pAddr == NULL) {
        continue;
      }

      if (pAddr->taskId == taskId) {
        return &pAddr->epSet;
      }
    }
  }

  return NULL;
}
1232

UNCOV
1233
int32_t createStreamTaskIdStr(int64_t streamId, int32_t taskId, const char** pId) {
×
UNCOV
1234
  char    buf[128] = {0};
×
UNCOV
1235
  int32_t code = snprintf(buf, tListLen(buf), "0x%" PRIx64 "-0x%x", streamId, taskId);
×
UNCOV
1236
  if (code < 0 || code >= tListLen(buf)) {
×
1237
    return TSDB_CODE_OUT_OF_BUFFER;
×
1238
  }
1239

UNCOV
1240
  *pId = taosStrdup(buf);
×
1241

UNCOV
1242
  if (*pId == NULL) {
×
1243
    return terrno;
×
1244
  } else {
UNCOV
1245
    return TSDB_CODE_SUCCESS;
×
1246
  }
1247
}
1248

UNCOV
1249
// Wrap a retrieve request in a STREAM_INPUT__DATA_RETRIEVE data block and put it into the input
// queue of pTask.
// Returns the allocation error (also stored in terrno), the conversion error (the queue item is
// freed in that case), or the result of streamTaskPutDataIntoInputQ().
static int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq) {
  SStreamDataBlock* pBlock;

  int32_t code = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SStreamDataBlock), (void**)&pBlock);
  if (code != 0) {
    stError("s-task:%s failed to allocated retrieve-block", pTask->id.idStr);
    terrno = code;
    return code;
  }

  pBlock->srcVgId = 0;
  pBlock->type = STREAM_INPUT__DATA_RETRIEVE;

  code = streamRetrieveReqToData(pReq, pBlock, pTask->id.idStr);
  if (code != TSDB_CODE_SUCCESS) {
    stError("s-task:%s failed to convert retrieve-data to block, code:%s", pTask->id.idStr, tstrerror(code));
    taosFreeQitem(pBlock);
    return code;
  }

  code = streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pBlock);
  if (code != TSDB_CODE_SUCCESS) {
    stError("s-task:%s failed to put retrieve-block into inputQ, inputQ is full, discard the retrieve msg",
            pTask->id.idStr);
  }

  return code;
}
1277

UNCOV
1278
// Enqueue a retrieve request for pTask and, when that succeeds, try to schedule the task.
// Returns the enqueue error, otherwise the result of streamTrySchedExec().
int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq) {
  int32_t rc = streamTaskEnqueueRetrieve(pTask, pReq);
  if (rc == 0) {
    rc = streamTrySchedExec(pTask, false);
  }

  return rc;
}
1285

UNCOV
1286
// Set the removeBackendFiles flag in the status of pTask.
void streamTaskSetRemoveBackendFiles(SStreamTask* pTask) {
  pTask->status.removeBackendFiles = true;
}
×
1287

1288
void streamTaskGetActiveCheckpointInfo(const SStreamTask* pTask, int32_t* pTransId, int64_t* pCheckpointId) {
×
1289
  if (pTransId != NULL) {
×
1290
    *pTransId = pTask->chkInfo.pActiveInfo->transId;
×
1291
  }
1292

1293
  if (pCheckpointId != NULL) {
×
1294
    *pCheckpointId = pTask->chkInfo.pActiveInfo->activeId;
×
1295
  }
1296
}
×
1297

UNCOV
1298
// Record activeCheckpointId as the active checkpoint id of pTask. Always returns TSDB_CODE_SUCCESS.
int32_t streamTaskSetActiveCheckpointInfo(SStreamTask* pTask, int64_t activeCheckpointId) {
  int32_t code = TSDB_CODE_SUCCESS;
  pTask->chkInfo.pActiveInfo->activeId = activeCheckpointId;
  return code;
}
1302

1303
// Record a failed checkpoint on pTask: checkpointId is stored as both the active id and the failed
// id, and transId as the active transaction id.
void streamTaskSetFailedChkptInfo(SStreamTask* pTask, int32_t transId, int64_t checkpointId) {
  pTask->chkInfo.pActiveInfo->failedId = checkpointId;
  pTask->chkInfo.pActiveInfo->activeId = checkpointId;
  pTask->chkInfo.pActiveInfo->transId = transId;

  stDebug("s-task:%s set failed checkpointId:%" PRId64, pTask->id.idStr, checkpointId);
}
×
1309

UNCOV
1310
// Allocate and initialize an SActiveCheckpointInfo: zeroed storage, an initialized lock and three
// empty lists (dispatch triggers, ready messages, received checkpoint-ready records).
//   pRes  receives the new object on success; it is not written on failure
// Returns 0 on success, or a non-zero error code when the object, the lock or any of the lists
// cannot be created. Everything allocated so far is released before a failure is returned.
int32_t streamTaskCreateActiveChkptInfo(SActiveCheckpointInfo** pRes) {
  SActiveCheckpointInfo* pInfo = taosMemoryCalloc(1, sizeof(SActiveCheckpointInfo));
  if (pInfo == NULL) {
    return terrno;
  }

  int32_t code = taosThreadMutexInit(&pInfo->lock, NULL);
  if (code != TSDB_CODE_SUCCESS) {
    taosMemoryFree(pInfo);  // do not leak the object when the lock cannot be initialized
    return code;
  }

  pInfo->pDispatchTriggerList = taosArrayInit(4, sizeof(STaskTriggerSendInfo));
  pInfo->pReadyMsgList = taosArrayInit(4, sizeof(STaskCheckpointReadyInfo));
  pInfo->pCheckpointReadyRecvList = taosArrayInit(4, sizeof(STaskDownstreamReadyInfo));

  if (pInfo->pDispatchTriggerList == NULL || pInfo->pReadyMsgList == NULL || pInfo->pCheckpointReadyRecvList == NULL) {
    // capture the error before the cleanup calls; never report a failure as 0
    code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;

    if (pInfo->pDispatchTriggerList != NULL) {
      taosArrayDestroy(pInfo->pDispatchTriggerList);
    }
    if (pInfo->pReadyMsgList != NULL) {
      taosArrayDestroy(pInfo->pReadyMsgList);
    }
    if (pInfo->pCheckpointReadyRecvList != NULL) {
      taosArrayDestroy(pInfo->pCheckpointReadyRecvList);
    }

    streamMutexDestroy(&pInfo->lock);
    taosMemoryFree(pInfo);
    return code;
  }

  *pRes = pInfo;
  return code;
}
1328

UNCOV
1329
// Tear down an SActiveCheckpointInfo: destroy its lock and its three lists, stop the
// checkpoint-trigger and checkpoint-ready timers when they have a handle, then free the object.
// A NULL pInfo is ignored.
void streamTaskDestroyActiveChkptInfo(SActiveCheckpointInfo* pInfo) {
  if (pInfo == NULL) {
    return;
  }

  streamMutexDestroy(&pInfo->lock);

  taosArrayDestroy(pInfo->pDispatchTriggerList);
  pInfo->pDispatchTriggerList = NULL;

  taosArrayDestroy(pInfo->pReadyMsgList);
  pInfo->pReadyMsgList = NULL;

  taosArrayDestroy(pInfo->pCheckpointReadyRecvList);
  pInfo->pCheckpointReadyRecvList = NULL;

  // trigger timer first, then the ready timer
  SStreamTmrInfo* timers[2] = {&pInfo->chkptTriggerMsgTmr, &pInfo->chkptReadyMsgTmr};
  for (int32_t idx = 0; idx < 2; ++idx) {
    SStreamTmrInfo* pTmr = timers[idx];
    if (pTmr->tmrHandle != NULL) {
      streamTmrStop(pTmr->tmrHandle);
      pTmr->tmrHandle = NULL;
    }
  }

  taosMemoryFree(pInfo);
}
1356

1357
// NOTE: clear the checkpoint id, and keep the failed id
1358
// failedId for a task will increase as the checkpoint I.D. increases.
UNCOV
1359
// Reset the per-checkpoint state of pInfo: active id, transaction id, the upstream-trigger-received
// and dispatch-trigger flags, and the contents of the dispatch-trigger and checkpoint-ready-received
// lists. failedId and pReadyMsgList are not touched here.
void streamTaskClearActiveInfo(SActiveCheckpointInfo* pInfo) {
  taosArrayClear(pInfo->pDispatchTriggerList);
  taosArrayClear(pInfo->pCheckpointReadyRecvList);

  pInfo->dispatchTrigger = false;
  pInfo->allUpstreamTriggerRecv = 0;
  pInfo->transId = 0;
  pInfo->activeId = 0;
}
×
1368

1369
const char* streamTaskGetExecType(int32_t type) {
993✔
1370
  switch (type) {
993!
UNCOV
1371
    case STREAM_EXEC_T_EXTRACT_WAL_DATA:
×
UNCOV
1372
      return "scan-wal-file";
×
1373
    case STREAM_EXEC_T_START_ALL_TASKS:
188✔
1374
      return "start-all-tasks";
188✔
UNCOV
1375
    case STREAM_EXEC_T_START_ONE_TASK:
×
UNCOV
1376
      return "start-one-task";
×
UNCOV
1377
    case STREAM_EXEC_T_RESTART_ALL_TASKS:
×
UNCOV
1378
      return "restart-all-tasks";
×
1379
    case STREAM_EXEC_T_STOP_ALL_TASKS:
806✔
1380
      return "stop-all-tasks";
806✔
UNCOV
1381
    case STREAM_EXEC_T_RESUME_TASK:
×
UNCOV
1382
      return "resume-task-from-idle";
×
1383
    case STREAM_EXEC_T_ADD_FAILED_TASK:
×
1384
      return "record-start-failed-task";
×
1385
    case STREAM_EXEC_T_STOP_ONE_TASK:
×
1386
      return "stop-one-task";
×
UNCOV
1387
    case 0:
×
UNCOV
1388
      return "exec-all-tasks";
×
1389
    default:
×
1390
      return "invalid-exec-type";
×
1391
  }
1392
}
1393

UNCOV
1394
// Allocate an int64_t holding the ref id of pTask, store its address in *pRefId and register it
// with metaRefMgtAdd() under the task's vgId.
// Returns terrno when the allocation fails (*pRefId is NULL then), otherwise the result of
// metaRefMgtAdd(); a registration failure is logged.
int32_t streamTaskAllocRefId(SStreamTask* pTask, int64_t** pRefId) {
  *pRefId = taosMemoryMalloc(sizeof(int64_t));
  if (*pRefId == NULL) {
    stError("s-task:%s failed to alloc new ref id, code:%s", pTask->id.idStr, tstrerror(terrno));
    return terrno;
  }

  **pRefId = pTask->id.refId;

  int32_t code = metaRefMgtAdd(pTask->pMeta->vgId, *pRefId);
  if (code != 0) {
    stError("s-task:%s failed to add refId:%" PRId64 " into refId-mgmt, code:%s", pTask->id.idStr, pTask->id.refId,
            tstrerror(code));
  }

  return code;
}
1409

UNCOV
1410
/**
 * Hand a ref-id cell back to the meta ref-id manager.
 *
 * @param pRefId  cell to remove; a NULL pointer is ignored
 */
void streamTaskFreeRefId(int64_t* pRefId) {
  if (pRefId != NULL) {
    metaRefMgtRemove(pRefId);
  }
}
1417

UNCOV
1418
/**
 * Serialize the notification settings of a stream task.
 *
 * Layout written: url count, each url as a C string, notifyEventTypes,
 * notifyErrorHandle and -- only when at least one url is present --
 * streamName, stbFullName and the schema wrapper.
 *
 * @param pEncoder  destination encoder, must not be NULL
 * @param info      notify info to encode, must not be NULL
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA for a NULL argument, or
 *         the first encoder error encountered.
 */
static int32_t tEncodeStreamNotifyInfo(SEncoder* pEncoder, const SNotifyInfo* info) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t numOfUrls = 0;

  QUERY_CHECK_NULL(pEncoder, code, lino, _exit, TSDB_CODE_INVALID_PARA);
  QUERY_CHECK_NULL(info, code, lino, _exit, TSDB_CODE_INVALID_PARA);

  numOfUrls = taosArrayGetSize(info->pNotifyAddrUrls);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, numOfUrls));

  for (int32_t idx = 0; idx < numOfUrls; ++idx) {
    const char* pUrl = taosArrayGetP(info->pNotifyAddrUrls, idx);
    TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pUrl));
  }

  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, info->notifyEventTypes));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, info->notifyErrorHandle));

  // the naming/schema section is only emitted when there is somewhere to notify
  if (numOfUrls > 0) {
    TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, info->streamName));
    TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, info->stbFullName));
    TAOS_CHECK_EXIT(tEncodeSSchemaWrapper(pEncoder, info->pSchemaWrapper));
  }

_exit:
  if (code != TSDB_CODE_SUCCESS) {
    stError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1445

UNCOV
1446
/**
 * Deserialize the notification settings of a stream task.
 *
 * Reads the url count and urls (each duplicated onto the heap and stored in
 * info->pNotifyAddrUrls), then notifyEventTypes and notifyErrorHandle. When
 * the url count is positive it also reads streamName, stbFullName and the
 * schema wrapper, allocating each of them.
 *
 * @param pDecoder  source decoder, must not be NULL
 * @param info      [out] notify info to populate, must not be NULL
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA for a NULL argument, or
 *         the first decode/allocation error. Anything already stored in
 *         *info when an error occurs is left in place; this function does
 *         not release it.
 */
static int32_t tDecodeStreamNotifyInfo(SDecoder* pDecoder, SNotifyInfo* info) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t numOfUrls = 0;

  QUERY_CHECK_NULL(pDecoder, code, lino, _exit, TSDB_CODE_INVALID_PARA);
  QUERY_CHECK_NULL(info, code, lino, _exit, TSDB_CODE_INVALID_PARA);

  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &numOfUrls));

  info->pNotifyAddrUrls = taosArrayInit(numOfUrls, POINTER_BYTES);
  QUERY_CHECK_NULL(info->pNotifyAddrUrls, code, lino, _exit, terrno);

  for (int32_t idx = 0; idx < numOfUrls; ++idx) {
    char* pRaw = NULL;
    TAOS_CHECK_EXIT(tDecodeCStr(pDecoder, &pRaw));

    // keep a private, length-bounded copy of the decoded string
    char* pCopy = taosStrndup(pRaw, TSDB_STREAM_NOTIFY_URL_LEN);
    QUERY_CHECK_NULL(pCopy, code, lino, _exit, terrno);

    if (taosArrayPush(info->pNotifyAddrUrls, &pCopy) == NULL) {
      taosMemoryFree(pCopy);
      TAOS_CHECK_EXIT(terrno);
    }
  }

  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &info->notifyEventTypes));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &info->notifyErrorHandle));

  if (numOfUrls > 0) {
    char* pName = NULL;

    TAOS_CHECK_EXIT(tDecodeCStr(pDecoder, &pName));
    info->streamName = taosStrndup(pName, TSDB_STREAM_FNAME_LEN + 1);
    QUERY_CHECK_NULL(info->streamName, code, lino, _exit, terrno);

    TAOS_CHECK_EXIT(tDecodeCStr(pDecoder, &pName));
    info->stbFullName = taosStrndup(pName, TSDB_STREAM_FNAME_LEN + 1);
    QUERY_CHECK_NULL(info->stbFullName, code, lino, _exit, terrno);

    info->pSchemaWrapper = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
    if (info->pSchemaWrapper == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }
    TAOS_CHECK_EXIT(tDecodeSSchemaWrapper(pDecoder, info->pSchemaWrapper));
  }

_exit:
  if (code != TSDB_CODE_SUCCESS) {
    stError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1490

UNCOV
1491
/**
 * Serialize the virtual-table merge list of a stream task.
 *
 * Writes the element count followed by (uid, numOfSrcTbls) for every
 * SVCTableMergeInfo entry in the array.
 *
 * @param pEncoder  destination encoder
 * @param pVTables  array of SVCTableMergeInfo; its size is taken from
 *                  taosArrayGetSize()
 * @return TSDB_CODE_SUCCESS or the first encoder error encountered.
 */
int32_t tEncodeVTablesInfo(SEncoder* pEncoder, SArray* pVTables) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t numOfEntries = taosArrayGetSize(pVTables);

  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, numOfEntries));

  for (int32_t idx = 0; idx < numOfEntries; ++idx) {
    SVCTableMergeInfo* pEntry = (SVCTableMergeInfo*)taosArrayGet(pVTables, idx);
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pEntry->uid));
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pEntry->numOfSrcTbls));
  }

_exit:
  if (code != TSDB_CODE_SUCCESS) {
    stError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1509

UNCOV
1510
/**
 * Deserialize the virtual-table merge list of a stream task.
 *
 * Reads the element count and, when it is positive, allocates *pTables and
 * appends one SVCTableMergeInfo per encoded (uid, numOfSrcTbls) pair. When
 * the count is zero or negative *pTables is left untouched.
 *
 * @param pDecoder  source decoder
 * @param pTables   [out] receives the newly created array
 * @return TSDB_CODE_SUCCESS or the first decode/allocation error. An array
 *         created before the error is left in *pTables; this function does
 *         not release it.
 */
static int32_t tDecodeVTablesInfo(SDecoder* pDecoder, SArray** pTables) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  int32_t mergeNum = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &mergeNum));
  if (mergeNum > 0) {
    *pTables = taosArrayInit(mergeNum, sizeof(SVCTableMergeInfo));
    QUERY_CHECK_NULL(*pTables, code, lino, _exit, terrno);
  }

  // Zero-initialize the scratch element: the whole struct is copied into the
  // array, so bytes not written by the two decodes below must not carry
  // indeterminate stack contents.
  SVCTableMergeInfo mergeInfo = {0};
  for (int32_t i = 0; i < mergeNum; ++i) {
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &mergeInfo.uid));
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &mergeInfo.numOfSrcTbls));
    if (taosArrayPush(*pTables, &mergeInfo) == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }
  }

_exit:
  if (code != TSDB_CODE_SUCCESS) {
    stError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1536

1537
/**
 * Serialize the list of fixed-dispatch targets used by a vtable-map
 * dispatcher.
 *
 * Writes the element count followed by (taskId, nodeId, epSet) for every
 * STaskDispatcherFixed entry.
 *
 * @param pEncoder    destination encoder
 * @param pTaskInfos  array of STaskDispatcherFixed
 * @return TSDB_CODE_SUCCESS or the first encoder error encountered.
 */
int32_t tSerializeDispatcherTaskInfo(SEncoder* pEncoder, const SArray* pTaskInfos) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t numOfTargets = taosArrayGetSize(pTaskInfos);

  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, numOfTargets));

  for (int32_t idx = 0; idx < numOfTargets; ++idx) {
    STaskDispatcherFixed* pTarget = taosArrayGet(pTaskInfos, idx);

    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTarget->taskId));
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTarget->nodeId));
    TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pTarget->epSet));
  }

_exit:
  if (code != TSDB_CODE_SUCCESS) {
    stError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1555

1556
/**
 * Serialize the uid -> index map used by a vtable-map dispatcher.
 *
 * Writes the number of entries followed by one (int64 key, int32 value)
 * pair per hash entry, in the order produced by tSimpleHashIterate().
 *
 * @param pEncoder  destination encoder
 * @param pVtables  simple hash whose keys are read as int64_t and whose
 *                  values are read as int32_t
 * @return TSDB_CODE_SUCCESS or the first encoder error encountered.
 */
int32_t tSerializeDispatcherVtableMap(SEncoder* pEncoder, const SSHashObj* pVtables) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t iter = 0;
  int32_t numOfTables = tSimpleHashGetSize(pVtables);

  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, numOfTables));

  for (void* pEntry = tSimpleHashIterate(pVtables, NULL, &iter); pEntry != NULL;
       pEntry = tSimpleHashIterate(pVtables, pEntry, &iter)) {
    int64_t* pKey = tSimpleHashGetKey(pEntry, NULL);
    int32_t* pVal = pEntry;

    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, *pKey));
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, *pVal));
  }

_exit:
  if (code != TSDB_CODE_SUCCESS) {
    stError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1577

1578
/**
 * Deserialize the list of fixed-dispatch targets used by a vtable-map
 * dispatcher.
 *
 * Reads the element count; when it is zero or negative the function returns
 * immediately and *ppTaskInfos is left untouched. Otherwise *ppTaskInfos is
 * allocated and filled with one STaskDispatcherFixed per encoded
 * (taskId, nodeId, epSet) triple.
 *
 * @param pDecoder     source decoder
 * @param ppTaskInfos  [out] receives the newly created array
 * @return TSDB_CODE_SUCCESS or the first decode/allocation error. An array
 *         created before the error is left in *ppTaskInfos; this function
 *         does not release it.
 */
int32_t tDeserializeDispatcherTaskInfo(SDecoder* pDecoder, SArray** ppTaskInfos) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t nTasks = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nTasks));
  if (nTasks <= 0) {
    return code;
  }

  *ppTaskInfos = taosArrayInit(nTasks, sizeof(STaskDispatcherFixed));
  QUERY_CHECK_NULL(*ppTaskInfos, code, lino, _exit, terrno);

  // Zero-initialize the scratch element: the whole struct (including every
  // byte of the embedded epSet) is copied into the array, so anything the
  // decoders do not overwrite must not be indeterminate stack data.
  STaskDispatcherFixed addr = {0};
  for (int32_t i = 0; i < nTasks; ++i) {
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &addr.taskId));
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &addr.nodeId));
    TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &addr.epSet));
    void* px = taosArrayPush(*ppTaskInfos, &addr);
    QUERY_CHECK_NULL(px, code, lino, _exit, terrno);
  }

_exit:
  if (code != TSDB_CODE_SUCCESS) {
    stError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1605

1606
/**
 * Deserialize the uid -> index map used by a vtable-map dispatcher.
 *
 * Reads the entry count; when it is zero or negative the function returns
 * immediately and *ppVtables is left untouched. Otherwise a simple hash is
 * created and one (uint64 key, int32 value) pair is inserted per entry.
 *
 * @param pDecoder   source decoder
 * @param ppVtables  [out] receives the newly created hash
 * @return TSDB_CODE_SUCCESS or the first decode/allocation/insert error. A
 *         hash created before the error is left in *ppVtables; this function
 *         does not release it.
 */
int32_t tDeserializeDispatcherVtableMap(SDecoder* pDecoder, SSHashObj** ppVtables) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t numOfTables = 0;

  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &numOfTables));
  if (numOfTables <= 0) {
    return code;
  }

  *ppVtables = tSimpleHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
  QUERY_CHECK_NULL(*ppVtables, code, lino, _exit, terrno);

  for (int32_t n = 0; n < numOfTables; ++n) {
    uint64_t key = 0;
    int32_t  val = 0;

    TAOS_CHECK_EXIT(tDecodeU64(pDecoder, &key));
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &val));
    TAOS_CHECK_EXIT(tSimpleHashPut(*ppVtables, &key, sizeof(key), &val, sizeof(val)));
  }

_exit:

  if (code != TSDB_CODE_SUCCESS) {
    stError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1633

UNCOV
1634
/**
 * Serialize a stream task.
 *
 * The field order below is the wire format and is mirrored by
 * tDecodeStreamTask(); do not reorder. The output-type specific section
 * depends on pTask->outputInfo.type, and the trailing notify/vtable section
 * is only written when pTask->ver >= SSTREAM_TASK_ADD_NOTIFY_VER.
 *
 * @param pEncoder  destination encoder
 * @param pTask     task to encode
 * @return 0 on success, otherwise the first encoder error. On error the
 *         function exits without calling tEndEncode().
 */
int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));

  // identity and basic attributes
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->ver));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->id.streamId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->id.taskId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->info.trigger));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->info.taskLevel));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->outputInfo.type));
  TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pTask->msgInfo.msgType));

  // status
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->status.taskStatus));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->status.schedStatus));

  // placement
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->info.selfChildId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->info.nodeId));
  TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pTask->info.epSet));
  TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pTask->info.mnodeEpset));

  // checkpoint and fill-history flag
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->chkInfo.checkpointId));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->chkInfo.checkpointVer));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->info.fillHistory));

  // ids stored in hTaskInfo and streamTaskId
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->hTaskInfo.id.streamId));
  const int32_t hTaskId = pTask->hTaskInfo.id.taskId;
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, hTaskId));

  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->streamTaskId.streamId));
  const int32_t sTaskId = pTask->streamTaskId.taskId;
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, sTaskId));

  // data range
  TAOS_CHECK_EXIT(tEncodeU64(pEncoder, pTask->dataRange.range.minVer));
  TAOS_CHECK_EXIT(tEncodeU64(pEncoder, pTask->dataRange.range.maxVer));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->dataRange.window.skey));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->dataRange.window.ekey));

  // upstream endpoints
  int32_t numOfUpstream = taosArrayGetSize(pTask->upstreamInfo.pList);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, numOfUpstream));
  for (int32_t idx = 0; idx < numOfUpstream; idx++) {
    SStreamUpstreamEpInfo* pEpInfo = taosArrayGetP(pTask->upstreamInfo.pList, idx);
    TAOS_CHECK_EXIT(tEncodeStreamEpInfo(pEncoder, pEpInfo));
  }

  // the serialized plan is written for every level except sink
  if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
    TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pTask->exec.qmsg));
  }

  // output-type specific section
  switch (pTask->outputInfo.type) {
    case TASK_OUTPUT__TABLE:
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->outputInfo.tbSink.stbUid));
      TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pTask->outputInfo.tbSink.stbFullName));
      TAOS_CHECK_EXIT(tEncodeSSchemaWrapper(pEncoder, pTask->outputInfo.tbSink.pSchemaWrapper));
      break;
    case TASK_OUTPUT__SMA:
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->outputInfo.smaSink.smaId));
      break;
    case TASK_OUTPUT__FETCH:
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->outputInfo.fetchSink.reserved));
      break;
    case TASK_OUTPUT__FIXED_DISPATCH:
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->outputInfo.fixedDispatcher.taskId));
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->outputInfo.fixedDispatcher.nodeId));
      TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pTask->outputInfo.fixedDispatcher.epSet));
      break;
    case TASK_OUTPUT__SHUFFLE_DISPATCH:
      TAOS_CHECK_EXIT(tSerializeSUseDbRspImp(pEncoder, &pTask->outputInfo.shuffleDispatcher.dbInfo));
      TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pTask->outputInfo.shuffleDispatcher.stbFullName));
      break;
    case TASK_OUTPUT__VTABLE_MAP:
      TAOS_CHECK_EXIT(tSerializeDispatcherTaskInfo(pEncoder, pTask->outputInfo.vtableMapDispatcher.taskInfos));
      TAOS_CHECK_EXIT(tSerializeDispatcherVtableMap(pEncoder, pTask->outputInfo.vtableMapDispatcher.vtableMap));
      break;
    default:
      // other output types carry no extra payload
      break;
  }

  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->info.delaySchedParam));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->subtableWithoutMd5));
  TAOS_CHECK_EXIT(tEncodeCStrWithLen(pEncoder, pTask->reserve, sizeof(pTask->reserve) - 1));

  // fields that only exist from SSTREAM_TASK_ADD_NOTIFY_VER onwards
  if (pTask->ver >= SSTREAM_TASK_ADD_NOTIFY_VER) {
    TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->info.hasAggTasks));
    TAOS_CHECK_EXIT(tEncodeStreamNotifyInfo(pEncoder, &pTask->notifyInfo));
    TAOS_CHECK_EXIT(tEncodeVTablesInfo(pEncoder, pTask->pVTables));
  }

  tEndEncode(pEncoder);

_exit:
  return code;
}
1716

UNCOV
1717
/**
 * Deserialize a stream task previously written by tEncodeStreamTask().
 *
 * The field order is the wire format and must stay in step with the encoder.
 * Versions at or below SSTREAM_TASK_INCOMPATIBLE_VER, or above
 * SSTREAM_TASK_VER, are rejected with TSDB_CODE_INVALID_MSG. The
 * subtableWithoutMd5 flag is only read when ver >=
 * SSTREAM_TASK_SUBTABLE_CHANGED_VER, and the notify/vtable section only when
 * ver >= SSTREAM_TASK_ADD_NOTIFY_VER.
 *
 * @param pDecoder  source decoder
 * @param pTask     [out] task to populate
 * @return 0 on success, otherwise the first decode/allocation error. On
 *         error the function exits without calling tEndDecode(), and members
 *         of *pTask that were already allocated are left in place; this
 *         function does not release them.
 */
int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
  int32_t taskId = 0;
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->ver));
  if (pTask->ver <= SSTREAM_TASK_INCOMPATIBLE_VER || pTask->ver > SSTREAM_TASK_VER) {
    TAOS_CHECK_EXIT(TSDB_CODE_INVALID_MSG);
  }

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->id.streamId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->id.taskId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->info.trigger));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->info.taskLevel));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->outputInfo.type));
  TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pTask->msgInfo.msgType));

  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->status.taskStatus));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->status.schedStatus));

  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->info.selfChildId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->info.nodeId));
  TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pTask->info.epSet));
  TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pTask->info.mnodeEpset));

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->chkInfo.checkpointId));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->chkInfo.checkpointVer));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->info.fillHistory));

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->hTaskInfo.id.streamId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &taskId));
  pTask->hTaskInfo.id.taskId = taskId;

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->streamTaskId.streamId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &taskId));
  pTask->streamTaskId.taskId = taskId;

  TAOS_CHECK_EXIT(tDecodeU64(pDecoder, (uint64_t*)&pTask->dataRange.range.minVer));
  TAOS_CHECK_EXIT(tDecodeU64(pDecoder, (uint64_t*)&pTask->dataRange.range.maxVer));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->dataRange.window.skey));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->dataRange.window.ekey));

  // Pass the decoder's return code through unchanged. Wrapping it in "< 0"
  // turns it into 0/1, which loses the real error code.
  int32_t epSz = -1;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &epSz));
  if (epSz < 0) {
    // a negative element count can only come from a corrupted/foreign message
    TAOS_CHECK_EXIT(TSDB_CODE_INVALID_MSG);
  }

  if ((pTask->upstreamInfo.pList = taosArrayInit(epSz, POINTER_BYTES)) == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }
  for (int32_t i = 0; i < epSz; i++) {
    SStreamUpstreamEpInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamUpstreamEpInfo));
    if (pInfo == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }
    if ((code = tDecodeStreamEpInfo(pDecoder, pInfo)) < 0) {
      taosMemoryFreeClear(pInfo);
      goto _exit;
    }
    if (taosArrayPush(pTask->upstreamInfo.pList, &pInfo) == NULL) {
      // pInfo is not owned by the list yet, so release it here; capture
      // terrno first in case the free routine touches it
      code = terrno;
      taosMemoryFreeClear(pInfo);
      TAOS_CHECK_EXIT(code);
    }
  }

  if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
    TAOS_CHECK_EXIT(tDecodeCStrAlloc(pDecoder, &pTask->exec.qmsg));
  }

  if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) {
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->outputInfo.tbSink.stbUid));
    TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pTask->outputInfo.tbSink.stbFullName));
    pTask->outputInfo.tbSink.pSchemaWrapper = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
    if (pTask->outputInfo.tbSink.pSchemaWrapper == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }
    TAOS_CHECK_EXIT(tDecodeSSchemaWrapper(pDecoder, pTask->outputInfo.tbSink.pSchemaWrapper));
  } else if (pTask->outputInfo.type == TASK_OUTPUT__SMA) {
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->outputInfo.smaSink.smaId));
  } else if (pTask->outputInfo.type == TASK_OUTPUT__FETCH) {
    TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->outputInfo.fetchSink.reserved));
  } else if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->outputInfo.fixedDispatcher.taskId));
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->outputInfo.fixedDispatcher.nodeId));
    TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pTask->outputInfo.fixedDispatcher.epSet));
  } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
    TAOS_CHECK_EXIT(tDeserializeSUseDbRspImp(pDecoder, &pTask->outputInfo.shuffleDispatcher.dbInfo));
    TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pTask->outputInfo.shuffleDispatcher.stbFullName));
  } else if (pTask->outputInfo.type == TASK_OUTPUT__VTABLE_MAP) {
    TAOS_CHECK_EXIT(tDeserializeDispatcherTaskInfo(pDecoder, &pTask->outputInfo.vtableMapDispatcher.taskInfos));
    TAOS_CHECK_EXIT(tDeserializeDispatcherVtableMap(pDecoder, &pTask->outputInfo.vtableMapDispatcher.vtableMap));
  }
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->info.delaySchedParam));
  if (pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER) {
    TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->subtableWithoutMd5));
  }
  TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pTask->reserve));

  if (pTask->ver >= SSTREAM_TASK_ADD_NOTIFY_VER) {
    TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->info.hasAggTasks));
    TAOS_CHECK_EXIT(tDecodeStreamNotifyInfo(pDecoder, &pTask->notifyInfo));
    TAOS_CHECK_EXIT(tDecodeVTablesInfo(pDecoder, &pTask->pVTables));
  }

  tEndDecode(pDecoder);

_exit:
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc