• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3660

15 Mar 2025 09:06AM UTC coverage: 62.039% (-1.3%) from 63.314%
#3660

push

travis-ci

web-flow
feat(stream): support stream processing for virtual tables (#30144)

* enh: add client processing

* enh: add mnode vtables processing

* enh: add mnode vtable processing

* enh: add normal child vtable support

* fix: compile issues

* fix: compile issues

* fix: create stream issues

* fix: multi stream scan issue

* fix: remove debug info

* fix: agg task and task level issues

* fix: correct task output type

* fix: split vtablescan from agg

* fix: memory leak issues

* fix: add limitations

* Update 09-error-code.md

* Update 09-error-code.md

* fix: remove usless case

* feat(stream): extract original table data in source scan task

Implemented functionality in the source task to extract data
corresponding to the virtual table from the original table using WAL.
The extracted data is then sent to the downstream merge task for further
processing.

* feat(stream): multi-way merge using loser tree in virtual merge task

Implemented multi-way merge in the merge task using a loser tree to
combine data from multiple original table into a single virtual table.
The merged virtual table data is then pushed downstream for further
processing.  Introduced memory limit handling during the merge process
with configurable behavior when the memory limit is reached.

* fix(test): remove useless cases

---------

Co-authored-by: dapan1121 <wpan@taosdata.com>
Co-authored-by: Pan Wei <72057773+dapan1121@users.noreply.github.com>

154078 of 317582 branches covered (48.52%)

Branch coverage included in aggregate %.

313 of 2391 new or added lines in 34 files covered. (13.09%)

26134 existing lines in 205 files now uncovered.

240261 of 318051 relevant lines covered (75.54%)

16655189.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.84
/source/libs/stream/src/streamCheckStatus.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "cos.h"
17
#include "rsync.h"
18
#include "streamBackendRocksdb.h"
19
#include "streamInt.h"
20

21
#define CHECK_NOT_RSP_DURATION 60 * 1000  // 60 sec
22

23
static void    processDownstreamReadyRsp(SStreamTask* pTask);
24
static void    rspMonitorFn(void* param, void* tmrId);
25
static void    streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInfo* pOutputInfo, int64_t startTs);
26
static int32_t streamTaskStartCheckDownstream(STaskCheckInfo* pInfo, const char* id);
27
static void    streamTaskCompleteCheckRsp(STaskCheckInfo* pInfo, bool lock, const char* id);
28
static void    streamTaskAddReqInfo(STaskCheckInfo* pInfo, int64_t reqId, int32_t taskId, int32_t vgId, const char* id);
29
static int32_t doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p);
30
static void    handleTimeoutDownstreamTasks(SStreamTask* pTask, SArray* pTimeoutList);
31
static void    handleNotReadyDownstreamTask(SStreamTask* pTask, SArray* pNotReadyList);
32
static int32_t streamTaskUpdateCheckInfo(STaskCheckInfo* pInfo, int32_t taskId, int32_t status, int64_t rspTs,
33
                                         int64_t reqId, int32_t* pNotReady, const char* id);
34
static void setCheckDownstreamReqInfo(SStreamTaskCheckReq* pReq, int64_t reqId, int32_t dstTaskId, int32_t dstNodeId);
35
static void getCheckRspStatus(STaskCheckInfo* pInfo, int64_t el, int32_t* numOfReady, int32_t* numOfFault,
36
                              int32_t* numOfNotRsp, SArray* pTimeoutList, SArray* pNotReadyList, const char* id);
37
static int32_t addDownstreamFailedStatusResultAsync(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int32_t taskId);
38
static void    findCheckRspStatus(STaskCheckInfo* pInfo, int32_t taskId, SDownstreamStatusInfo** pStatusInfo);
39

40
// Validate an upstream task's stage value against the stage recorded locally for
// that upstream, and report whether this (downstream) task can serve it.
// Returns TASK_DOWNSTREAM_READY / TASK_DOWNSTREAM_NOT_READY, TASK_UPSTREAM_NEW_STAGE
// when the upstream restarted (stage mismatch), 0 when the upstream's stage is
// still uninitialized (-1), or TSDB_CODE_STREAM_TASK_NOT_EXIST when the upstream
// ep-info is unknown. *oldStage receives the previously recorded stage so the
// caller can put it into the check-rsp.
int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_t vgId, int64_t stage,
                              int64_t* oldStage) {
  SStreamUpstreamEpInfo* pInfo = NULL;
  streamTaskGetUpstreamTaskEpInfo(pTask, upstreamTaskId, &pInfo);
  if (pInfo == NULL) {
    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  *oldStage = pInfo->stage;
  const char* id = pTask->id.idStr;
  if (stage == -1) {
    // upstream has not initialized its own stage yet; report not-ready (status 0)
    stDebug("s-task:%s receive check msg from upstream task:0x%x(vgId:%d), invalid stageId:%" PRId64 ", not ready", id,
            upstreamTaskId, vgId, stage);
    return 0;
  }

  if (pInfo->stage == -1) {
    // first check msg from this upstream: adopt its stage as our baseline
    pInfo->stage = stage;
    stDebug("s-task:%s receive check msg from upstream task:0x%x(vgId:%d) first time, init stage value:%" PRId64, id,
            upstreamTaskId, vgId, stage);
  }

  if (pInfo->stage < stage) {
    // upstream stage increased => upstream vnode restarted / leader changed
    stError("s-task:%s receive check msg from upstream task:0x%x(vgId:%d), new stage received:%" PRId64
            ", prev:%" PRId64,
            id, upstreamTaskId, vgId, stage, pInfo->stage);
    // record the checkpoint failure id and sent to mnode
    streamTaskSetCheckpointFailed(pTask);
  }

  if (pInfo->stage != stage) {
    return TASK_UPSTREAM_NEW_STAGE;
  } else if (pTask->status.downstreamReady != 1) {
    stDebug("s-task:%s vgId:%d leader:%d, downstream not ready", id, vgId, (pTask->pMeta->role == NODE_ROLE_LEADER));
    return TASK_DOWNSTREAM_NOT_READY;
  } else {
    return TASK_DOWNSTREAM_READY;
  }
}
79

80
// check status
81
// Send a check-downstream request to every downstream task of pTask, according to
// the output/dispatch type (fixed, shuffle, or vtable-map). For each downstream a
// fresh reqId is generated and recorded in taskCheckInfo so the rsp monitor can
// match responses; the monitor timer is started before the first send. A sink task
// (no downstream) is marked ready immediately.
void streamTaskSendCheckMsg(SStreamTask* pTask) {
  SDataRange*  pRange = &pTask->dataRange;
  STimeWindow* pWindow = &pRange->window;
  const char*  idstr = pTask->id.idStr;
  int32_t      code = 0;

  // common fields shared by all check requests sent below
  SStreamTaskCheckReq req = {
      .streamId = pTask->id.streamId,
      .upstreamTaskId = pTask->id.taskId,
      .upstreamNodeId = pTask->info.nodeId,
      .childId = pTask->info.selfChildId,
      .stage = pTask->pMeta->stage,
  };

  if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
    // single downstream task
    streamTaskStartMonitorCheckRsp(pTask);

    STaskDispatcherFixed* pDispatch = &pTask->outputInfo.fixedDispatcher;

    setCheckDownstreamReqInfo(&req, tGenIdPI64(), pDispatch->taskId, pDispatch->nodeId);
    streamTaskAddReqInfo(&pTask->taskCheckInfo, req.reqId, pDispatch->taskId, pDispatch->nodeId, idstr);

    stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " check single downstream task:0x%x(vgId:%d) ver:%" PRId64 "-%" PRId64
            " window:%" PRId64 "-%" PRId64 " QID:0x%" PRIx64,
            idstr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, pRange->range.minVer,
            pRange->range.maxVer, pWindow->skey, pWindow->ekey, req.reqId);

    code = streamSendCheckMsg(pTask, &req, pTask->outputInfo.fixedDispatcher.nodeId,
                              &pTask->outputInfo.fixedDispatcher.epSet);

  } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
    // one downstream task per vgroup
    streamTaskStartMonitorCheckRsp(pTask);

    SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;

    int32_t numOfVgs = taosArrayGetSize(vgInfo);
    stDebug("s-task:%s check %d downstream tasks, ver:%" PRId64 "-%" PRId64 " window:%" PRId64 "-%" PRId64, idstr,
            numOfVgs, pRange->range.minVer, pRange->range.maxVer, pWindow->skey, pWindow->ekey);

    for (int32_t i = 0; i < numOfVgs; i++) {
      SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
      if (pVgInfo == NULL) {
        continue;
      }

      setCheckDownstreamReqInfo(&req, tGenIdPI64(), pVgInfo->taskId, pVgInfo->vgId);
      streamTaskAddReqInfo(&pTask->taskCheckInfo, req.reqId, pVgInfo->taskId, pVgInfo->vgId, idstr);

      stDebug("s-task:%s (vgId:%d) stage:%" PRId64
              " check downstream task:0x%x (vgId:%d) (shuffle), idx:%d, QID:0x%" PRIx64,
              idstr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i, req.reqId);
      // NOTE: code holds only the result of the last send; earlier failures in
      // this loop are not logged individually (unlike the vtable branch below)
      code = streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
    }
  } else if (pTask->outputInfo.type == TASK_OUTPUT__VTABLE_MAP) {
    // virtual-table map: one fixed dispatcher entry per downstream merge task
    streamTaskStartMonitorCheckRsp(pTask);

    SArray* pTaskInfos = pTask->outputInfo.vtableMapDispatcher.taskInfos;
    int32_t numTasks = taosArrayGetSize(pTaskInfos);
    stDebug("s-task:%s check %d vtable downstream tasks, ver:%" PRId64 "-%" PRId64 " window:%" PRId64 "-%" PRId64,
            idstr, numTasks, pRange->range.minVer, pRange->range.maxVer, pWindow->skey, pWindow->ekey);

    for (int32_t i = 0; i < numTasks; ++i) {
      STaskDispatcherFixed* pAddr = taosArrayGet(pTaskInfos, i);
      if (pAddr == NULL) {
        continue;
      }

      setCheckDownstreamReqInfo(&req, tGenIdPI64(), pAddr->taskId, pAddr->nodeId);
      streamTaskAddReqInfo(&pTask->taskCheckInfo, req.reqId, pAddr->taskId, pAddr->nodeId, idstr);

      stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " check vtable downstream task:0x%x (vgId:%d), QID:0x%" PRIx64,
              idstr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, req.reqId);
      code = streamSendCheckMsg(pTask, &req, pAddr->nodeId, &pAddr->epSet);
      if (code != TSDB_CODE_SUCCESS) {
        stError("s-task:%s failed to send check msg to vtable downstream task:0x%x (vgId:%d), code:%s", idstr,
                req.downstreamTaskId, req.downstreamNodeId, tstrerror(code));
      }
    }
  } else {  // for sink task, set it ready directly.
    stDebug("s-task:%s (vgId:%d) set downstream ready, since no downstream", idstr, pTask->info.nodeId);
    streamTaskStopMonitorCheckRsp(&pTask->taskCheckInfo, idstr);
    processDownstreamReadyRsp(pTask);
  }

  if (code) {
    stError("s-task:%s failed to send check msg to downstream, code:%s", idstr, tstrerror(code));
  }
}
170

171
// Handle an incoming check-downstream request: fill pRsp with the request's
// identifying fields, then compute pRsp->status. Followers reject the request
// (TASK_DOWNSTREAM_NOT_LEADER); a leader looks up the target task and delegates
// to streamTaskCheckStatus, or answers TASK_DOWNSTREAM_NOT_READY when the task
// is not built yet.
void streamTaskProcessCheckMsg(SStreamMeta* pMeta, SStreamTaskCheckReq* pReq, SStreamTaskCheckRsp* pRsp) {
  int32_t taskId = pReq->downstreamTaskId;

  // echo the request's routing fields back in the response
  *pRsp = (SStreamTaskCheckRsp){
      .reqId = pReq->reqId,
      .streamId = pReq->streamId,
      .childId = pReq->childId,
      .downstreamNodeId = pReq->downstreamNodeId,
      .downstreamTaskId = pReq->downstreamTaskId,
      .upstreamNodeId = pReq->upstreamNodeId,
      .upstreamTaskId = pReq->upstreamTaskId,
  };

  // only the leader node handle the check request
  if (pMeta->role == NODE_ROLE_FOLLOWER) {
    stError(
        "s-task:0x%x invalid check msg from upstream:0x%x(vgId:%d), vgId:%d is follower, not handle check status msg",
        taskId, pReq->upstreamTaskId, pReq->upstreamNodeId, pMeta->vgId);
    pRsp->status = TASK_DOWNSTREAM_NOT_LEADER;
  } else {
    SStreamTask* pTask = NULL;
    int32_t      code = streamMetaAcquireTask(pMeta, pReq->streamId, taskId, &pTask);
    if (pTask != NULL) {
      pRsp->status =
          streamTaskCheckStatus(pTask, pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->stage, &pRsp->oldStage);

      SStreamTaskState pState = streamTaskGetStatus(pTask);
      stDebug("s-task:%s status:%s, stage:%" PRId64 " recv task check req(QID:0x%" PRIx64
              ") task:0x%x (vgId:%d), check_status:%d",
              pTask->id.idStr, pState.name, pRsp->oldStage, pRsp->reqId, pRsp->upstreamTaskId, pRsp->upstreamNodeId,
              pRsp->status);
      streamMetaReleaseTask(pMeta, pTask);
    } else {
      // task not created yet on this node: tell upstream to retry later
      pRsp->status = TASK_DOWNSTREAM_NOT_READY;
      stDebug("tq recv task check(taskId:0x%" PRIx64 "-0x%x not built yet) req(QID:0x%" PRIx64
              ") from task:0x%x (vgId:%d), rsp check_status %d",
              pReq->streamId, taskId, pRsp->reqId, pRsp->upstreamTaskId, pRsp->upstreamNodeId, pRsp->status);
    }
  }
}
211

212
// Handle a check-downstream response on the upstream side. Updates the per-
// downstream status entry; when the last downstream becomes ready the task is
// marked ready and the rsp monitor stopped. NEW_STAGE / NOT_LEADER responses
// schedule a node-epset update and record the task as failed; NOT_READY is left
// for the rsp monitor to retry. Always returns success to the message layer
// (invalid/expired rsps are just discarded).
int32_t streamTaskProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) {
  int64_t         now = taosGetTimestampMs();
  const char*     id = pTask->id.idStr;
  STaskCheckInfo* pInfo = &pTask->taskCheckInfo;
  int32_t         total = streamTaskGetNumOfDownstream(pTask);
  int32_t         left = -1;

  if (streamTaskShouldStop(pTask)) {
    stDebug("s-task:%s should stop, do not do check downstream again", id);
    return TSDB_CODE_SUCCESS;
  }

  // the rsp must be addressed to this task
  if (pTask->id.taskId != pRsp->upstreamTaskId) {
    stError("s-task:%s invalid check downstream rsp, upstream task:0x%x discard", id, pRsp->upstreamTaskId);
    return TSDB_CODE_INVALID_MSG;
  }

  if (pRsp->status == TASK_DOWNSTREAM_READY) {
    int32_t code = streamTaskUpdateCheckInfo(pInfo, pRsp->downstreamTaskId, pRsp->status, now, pRsp->reqId, &left, id);
    if (code != TSDB_CODE_SUCCESS) {
      return TSDB_CODE_SUCCESS;
    }

    if (left == 0) {
      processDownstreamReadyRsp(pTask);  // all downstream tasks are ready, set the complete check downstream flag
      streamTaskStopMonitorCheckRsp(pInfo, id);
    } else {
      stDebug("s-task:%s (vgId:%d) recv check rsp from task:0x%x (vgId:%d) status:%d, total:%d not ready:%d", id,
              pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status, total, left);
    }
  } else {  // not ready, wait for 100ms and retry
    int32_t code = streamTaskUpdateCheckInfo(pInfo, pRsp->downstreamTaskId, pRsp->status, now, pRsp->reqId, &left, id);
    if (code != TSDB_CODE_SUCCESS) {
      return TSDB_CODE_SUCCESS;  // return success in any cases.
    }

    if (pRsp->status == TASK_UPSTREAM_NEW_STAGE || pRsp->status == TASK_DOWNSTREAM_NOT_LEADER) {
      if (pRsp->status == TASK_UPSTREAM_NEW_STAGE) {
        // our own stage changed from the downstream's point of view: this vnode
        // restarted or transferred leadership; wait for nodeUpdate + full restart
        stError("s-task:%s vgId:%d self vnode-transfer/leader-change/restart detected, old stage:%" PRId64
                ", current stage:%" PRId64 ", not check wait for downstream task nodeUpdate, and all tasks restart",
                id, pRsp->upstreamNodeId, pRsp->oldStage, pTask->pMeta->stage);
        code = streamTaskAddIntoNodeUpdateList(pTask, pRsp->upstreamNodeId);
      } else {
        // downstream node is a follower now: its epset must be refreshed
        stError(
            "s-task:%s downstream taskId:0x%x (vgId:%d) not leader, self dispatch epset needs to be updated, not check "
            "downstream again, nodeUpdate needed",
            id, pRsp->downstreamTaskId, pRsp->downstreamNodeId);
        code = streamTaskAddIntoNodeUpdateList(pTask, pRsp->downstreamNodeId);
      }

      streamMetaAddFailedTaskSelf(pTask, now);
    } else {  // TASK_DOWNSTREAM_NOT_READY, rsp-check monitor will retry in 300 ms
      stDebug("s-task:%s (vgId:%d) recv check rsp from task:0x%x (vgId:%d) status:%d, total:%d not ready:%d", id,
              pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status, total, left);
    }
  }

  return 0;
}
271

272
int32_t streamTaskSendCheckRsp(const SStreamMeta* pMeta, int32_t vgId, SStreamTaskCheckRsp* pRsp,
23,331✔
273
                               SRpcHandleInfo* pRpcInfo, int32_t taskId) {
274
  SEncoder encoder;
275
  int32_t  code = 0;
23,331✔
276
  int32_t  len;
277

278
  tEncodeSize(tEncodeStreamTaskCheckRsp, pRsp, len, code);
23,331!
279
  if (code < 0) {
23,331!
280
    stError("vgId:%d failed to encode task check rsp, s-task:0x%x", pMeta->vgId, taskId);
×
281
    return TSDB_CODE_INVALID_MSG;
×
282
  }
283

284
  void* buf = rpcMallocCont(sizeof(SMsgHead) + len);
23,331✔
285
  if (buf == NULL) {
23,330!
286
    stError("s-task:0x%x vgId:%d failed prepare msg, %s at line:%d code:%s", taskId, pMeta->vgId, __func__, __LINE__,
×
287
            tstrerror(code));
288
    return terrno;
×
289
  }
290

291
  ((SMsgHead*)buf)->vgId = htonl(vgId);
23,330✔
292

293
  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
23,330✔
294
  tEncoderInit(&encoder, (uint8_t*)abuf, len);
23,330✔
295
  code = tEncodeStreamTaskCheckRsp(&encoder, pRsp);
23,330✔
296
  tEncoderClear(&encoder);
23,331✔
297

298
  SRpcMsg rspMsg = {.code = 0, .pCont = buf, .contLen = sizeof(SMsgHead) + len, .info = *pRpcInfo};
23,331✔
299
  tmsgSendRsp(&rspMsg);
23,331✔
300

301
  code = TMIN(code, 0);
23,331✔
302
  return code;
23,331✔
303
}
304

305
// Start the periodic check-rsp monitor for pTask: mark the task as in-check,
// reset the per-downstream status list, and arm the rspMonitorFn timer. No-ops
// when the task is being dropped or a check round is already in progress.
// All state changes happen under checkInfoLock.
void streamTaskStartMonitorCheckRsp(SStreamTask* pTask) {
  int32_t         vgId = pTask->pMeta->vgId;
  STaskCheckInfo* pInfo = &pTask->taskCheckInfo;

  streamMutexLock(&pInfo->checkInfoLock);

  // drop procedure already started, not start check downstream now
  ETaskStatus s = streamTaskGetStatus(pTask).state;
  if (s == TASK_STATUS__DROPPING) {
    stDebug("s-task:%s task not in uninit status, status:%s not start monitor check-rsp", pTask->id.idStr,
            streamTaskGetStatusStr(s));
    streamMutexUnlock(&pInfo->checkInfoLock);
    return;
  }

  // acquire the in-check flag; fails if a check round is already running
  int32_t code = streamTaskStartCheckDownstream(pInfo, pTask->id.idStr);
  if (code != TSDB_CODE_SUCCESS) {
    streamMutexUnlock(&pInfo->checkInfoLock);
    return;
  }

  streamTaskInitTaskCheckInfo(pInfo, &pTask->outputInfo, taosGetTimestampMs());

  // the timer callback owns a task ref so the task outlives pending ticks
  int64_t* pTaskRefId = NULL;
  code = streamTaskAllocRefId(pTask, &pTaskRefId);
  if (code == 0) {
    streamTmrStart(rspMonitorFn, CHECK_RSP_CHECK_INTERVAL, pTaskRefId, streamTimer, &pInfo->checkRspTmr, vgId,
                   "check-status-monitor");
  }

  streamMutexUnlock(&pInfo->checkInfoLock);
}
337

338
// Request the check-rsp monitor to stop: the flag is read by rspMonitorFn on its
// next tick. The flag write is protected by checkInfoLock; the debug log is
// intentionally emitted outside the lock.
void streamTaskStopMonitorCheckRsp(STaskCheckInfo* pInfo, const char* id) {
  streamMutexLock(&pInfo->checkInfoLock);
  pInfo->stopCheckProcess = 1;
  streamMutexUnlock(&pInfo->checkInfoLock);

  stDebug("s-task:%s set stop check-rsp monitor flag", id);
}
345

346
// Release all resources held by the check-info: the per-downstream status list
// and, when armed, the rsp-monitor timer handle.
void streamTaskCleanupCheckInfo(STaskCheckInfo* pInfo) {
  // drop the downstream status list
  taosArrayDestroy(pInfo->pList);
  pInfo->pList = NULL;

  // nothing else to do when the monitor timer was never started
  if (pInfo->checkRspTmr == NULL) {
    return;
  }

  streamTmrStop(pInfo->checkRspTmr);
  pInfo->checkRspTmr = NULL;
}
355

356
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
357
// All downstream tasks answered ready: drive the task state machine to the
// INIT (or INIT_SCANHIST for history tasks) success event, report the launch
// result to meta, optionally halt the task until its fill-history companion
// finishes, and launch the related fill-history task when one exists.
void processDownstreamReadyRsp(SStreamTask* pTask) {
  // history tasks complete a scan-history init, normal tasks a plain init
  EStreamTaskEvent event = (pTask->info.fillHistory != STREAM_HISTORY_TASK) ? TASK_EVENT_INIT : TASK_EVENT_INIT_SCANHIST;
  int32_t          code = streamTaskOnHandleEventSuccess(pTask->status.pSM, event, NULL, NULL);
  if (code) {
    stError("s-task:%s failed to set event succ, code:%s", pTask->id.idStr, tstrerror(code));
  }

  // record check-start/ready timestamps for the launch report
  int64_t checkTs = pTask->execInfo.checkTs;
  int64_t readyTs = pTask->execInfo.readyTs;
  code = streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, checkTs, readyTs, true);
  if (code) {
    stError("s-task:%s failed to record the downstream task status, code:%s", pTask->id.idStr, tstrerror(code));
  }

  if (pTask->status.taskStatus == TASK_STATUS__HALT) {
    // only a task that has a related fill-history task and is not itself a
    // history task is expected to be in HALT here
    if (!HAS_RELATED_FILLHISTORY_TASK(pTask) || (pTask->info.fillHistory != 0)) {
      stError("s-task:%s status:halt fillhistory:%d not handle the ready rsp", pTask->id.idStr,
              pTask->info.fillHistory);
    }

    // halt itself for count window stream task until the related fill history task completed.
    stDebug("s-task:%s level:%d initial status is %s from mnode, set it to be halt", pTask->id.idStr,
            pTask->info.taskLevel, streamTaskGetStatusStr(pTask->status.taskStatus));
    code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_HALT);
    if (code != 0) {  // todo: handle error
      stError("s-task:%s failed to handle halt event, code:%s", pTask->id.idStr, tstrerror(code));
    }
  }

  // start the related fill-history task, when current task is ready
  // not invoke in success callback due to the deadlock.
  // todo: let's retry
  if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
    stDebug("s-task:%s try to launch related task", pTask->id.idStr);
    code = streamLaunchFillHistoryTask(pTask);
    if (code) {
      stError("s-task:%s failed to launch related task, code:%s", pTask->id.idStr, tstrerror(code));
    }
  }
}
397

398
// Queue nodeId for an epset refresh on this task's output. The list is
// deduplicated: an already-present nodeId is a no-op. Protected by pTask->lock.
// Returns 0, or terrno when appending to the list fails.
int32_t streamTaskAddIntoNodeUpdateList(SStreamTask* pTask, int32_t nodeId) {
  int32_t vgId = pTask->pMeta->vgId;
  int32_t code = 0;
  bool    existed = false;

  streamMutexLock(&pTask->lock);

  // scan for a duplicate entry first
  int32_t num = taosArrayGetSize(pTask->outputInfo.pNodeEpsetUpdateList);
  for (int i = 0; i < num; ++i) {
    SDownstreamTaskEpset* p = taosArrayGet(pTask->outputInfo.pNodeEpsetUpdateList, i);
    if (p == NULL) {
      continue;
    }

    if (p->nodeId == nodeId) {
      existed = true;
      break;
    }
  }

  if (!existed) {
    SDownstreamTaskEpset t = {.nodeId = nodeId};

    void* p = taosArrayPush(pTask->outputInfo.pNodeEpsetUpdateList, &t);
    if (p == NULL) {
      code = terrno;
      stError("s-task:%s vgId:%d failed to update epset, code:%s", pTask->id.idStr, vgId, tstrerror(code));
    } else {
      stInfo("s-task:%s vgId:%d downstream nodeId:%d needs to be updated, total needs updated:%d", pTask->id.idStr,
             vgId, t.nodeId, (num + 1));
    }
  }

  streamMutexUnlock(&pTask->lock);
  return code;
}
434

435
// Reset the check-info for a new check round: clear the per-downstream status
// list, set the number of downstream tasks that still have to answer (derived
// from the output/dispatch type), and stamp the round's start time. Caller
// holds checkInfoLock.
void streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInfo* pOutputInfo, int64_t startTs) {
  taosArrayClear(pInfo->pList);

  // number of downstream tasks that must report ready before the check ends
  switch (pOutputInfo->type) {
    case TASK_OUTPUT__FIXED_DISPATCH:
      pInfo->notReadyTasks = 1;
      break;
    case TASK_OUTPUT__SHUFFLE_DISPATCH:
      pInfo->notReadyTasks = taosArrayGetSize(pOutputInfo->shuffleDispatcher.dbInfo.pVgroupInfos);
      break;
    case TASK_OUTPUT__VTABLE_MAP:
      pInfo->notReadyTasks = taosArrayGetSize(pOutputInfo->vtableMapDispatcher.taskInfos);
      break;
    default:
      // sink task: no downstream, counter left untouched (matches original behavior)
      break;
  }

  pInfo->startTs = startTs;
  pInfo->timeoutStartTs = startTs;
  pInfo->stopCheckProcess = 0;
}
450

451
// Locate the status entry for `taskId` in pInfo->pList; *pStatusInfo is set to
// the matching entry, or NULL when absent. Caller must hold checkInfoLock.
void findCheckRspStatus(STaskCheckInfo* pInfo, int32_t taskId, SDownstreamStatusInfo** pStatusInfo) {
  if (pStatusInfo == NULL) {
    return;
  }

  *pStatusInfo = NULL;
  for (int32_t j = 0; j < taosArrayGetSize(pInfo->pList); ++j) {
    SDownstreamStatusInfo* p = taosArrayGet(pInfo->pList, j);
    if (p == NULL) {
      continue;
    }

    if (p->taskId == taskId) {
      *pStatusInfo = p;
      // taskIds are unique in pList (streamTaskAddReqInfo rejects duplicates),
      // so stop at the first hit instead of scanning the whole list
      break;
    }
  }
}

469
// Record a check-rsp for downstream `taskId`: validate the reqId against the
// pending request, update the entry's status/rspTs, and return the current
// not-ready counter through *pNotReady (decremented when this rsp flips the
// downstream to READY). Returns TSDB_CODE_FAILED for unknown taskIds or expired
// reqIds so the caller can discard the rsp. Protected by checkInfoLock.
int32_t streamTaskUpdateCheckInfo(STaskCheckInfo* pInfo, int32_t taskId, int32_t status, int64_t rspTs, int64_t reqId,
                                  int32_t* pNotReady, const char* id) {
  SDownstreamStatusInfo* p = NULL;

  streamMutexLock(&pInfo->checkInfoLock);
  findCheckRspStatus(pInfo, taskId, &p);
  if (p != NULL) {
    // a mismatched reqId means this rsp belongs to an earlier (re-sent) request
    if (reqId != p->reqId) {
      stError("s-task:%sQID:0x%" PRIx64 " expected:0x%" PRIx64
              " expired check-rsp recv from downstream task:0x%x, discarded",
              id, reqId, p->reqId, taskId);
      streamMutexUnlock(&pInfo->checkInfoLock);
      return TSDB_CODE_FAILED;
    }

    // subtract one not-ready-task, since it is ready now
    if ((p->status != TASK_DOWNSTREAM_READY) && (status == TASK_DOWNSTREAM_READY)) {
      *pNotReady = atomic_sub_fetch_32(&pInfo->notReadyTasks, 1);
    } else {
      *pNotReady = pInfo->notReadyTasks;
    }

    p->status = status;
    p->rspTs = rspTs;

    streamMutexUnlock(&pInfo->checkInfoLock);
    return TSDB_CODE_SUCCESS;
  }

  streamMutexUnlock(&pInfo->checkInfoLock);
  stError("s-task:%s unexpected check rsp msg, invalid downstream task:0x%x, QID:%" PRIx64 " discarded", id, taskId,
          reqId);
  return TSDB_CODE_FAILED;
}
503

504
// Try to acquire the in-check flag for a new check-downstream round. Fails
// (TSDB_CODE_FAILED) when a round is already running; in that case the
// stop-request flag is cleared so the running monitor keeps going. Caller
// holds checkInfoLock.
int32_t streamTaskStartCheckDownstream(STaskCheckInfo* pInfo, const char* id) {
  if (pInfo->inCheckProcess != 0) {
    stError("s-task:%s already in check procedure, checkTs:%" PRId64 ", start monitor check rsp failed", id,
            pInfo->startTs);
    pInfo->stopCheckProcess = 0;  // disable auto stop of check process
    return TSDB_CODE_FAILED;
  }

  pInfo->inCheckProcess = 1;
  stDebug("s-task:%s set the in check-rsp flag", id);
  return TSDB_CODE_SUCCESS;
}
517

518
// Finish a check-downstream round: clear the in-check flag, reset all counters
// and timestamps, and empty the per-downstream status list. `lock` selects
// whether checkInfoLock is taken here (callers already holding it pass false).
void streamTaskCompleteCheckRsp(STaskCheckInfo* pInfo, bool lock, const char* id) {
  if (lock) {
    streamMutexLock(&pInfo->checkInfoLock);
  }

  if (!pInfo->inCheckProcess) {
    stDebug("s-task:%s already not in check-rsp procedure", id);
  } else {
    // elapsed time of the whole round, 0 when the start ts was never stamped
    int64_t elapsed = 0;
    if (pInfo->startTs != 0) {
      elapsed = taosGetTimestampMs() - pInfo->startTs;
    }
    stDebug("s-task:%s clear the in check-rsp flag, set the check-rsp done, elapsed time:%" PRId64 " ms", id, elapsed);

    pInfo->startTs = 0;
    pInfo->timeoutStartTs = 0;
    pInfo->notReadyTasks = 0;
    pInfo->inCheckProcess = 0;
    pInfo->stopCheckProcess = 0;
    pInfo->notReadyRetryCount = 0;
    pInfo->timeoutRetryCount = 0;

    taosArrayClear(pInfo->pList);
  }

  if (lock) {
    streamMutexUnlock(&pInfo->checkInfoLock);
  }
}
545

546
// todo: retry until success
547
// Record a pending check request (reqId -> downstream taskId/vgId) so the rsp
// monitor can match responses and drive retries. A duplicate taskId is ignored:
// a check msg for that downstream is already in flight. Protected by
// checkInfoLock.
// todo: retry until success
void streamTaskAddReqInfo(STaskCheckInfo* pInfo, int64_t reqId, int32_t taskId, int32_t vgId, const char* id) {
  SDownstreamStatusInfo info = {.taskId = taskId, .status = -1, .vgId = vgId, .reqId = reqId, .rspTs = 0};
  streamMutexLock(&pInfo->checkInfoLock);

  SDownstreamStatusInfo* p = NULL;
  findCheckRspStatus(pInfo, taskId, &p);
  if (p != NULL) {
    stDebug("s-task:%s check info to task:0x%x already sent", id, taskId);
    streamMutexUnlock(&pInfo->checkInfoLock);
    return;
  }

  void* px = taosArrayPush(pInfo->pList, &info);
  if (px == NULL) {
    // fix: do not swallow the failure silently — without this entry the rsp for
    // taskId will be discarded as "unexpected"; log so it can be traced.
    // todo: retry the push
    stError("s-task:%s failed to record check req to downstream task:0x%x, code:%s", id, taskId, tstrerror(terrno));
  }

  streamMutexUnlock(&pInfo->checkInfoLock);
}
566

567
// Re-send the check-downstream request for one pending entry `p` (used by the
// rsp monitor on timeout/not-ready). A fresh reqId is generated and stored in
// `p` so the stale rsp of the previous attempt is rejected; the target address
// is looked up in the dispatcher matching p->taskId. Returns the send result.
int32_t doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p) {
  const char* id = pTask->id.idStr;
  int32_t     code = 0;

  SStreamTaskCheckReq req = {
      .streamId = pTask->id.streamId,
      .upstreamTaskId = pTask->id.taskId,
      .upstreamNodeId = pTask->info.nodeId,
      .childId = pTask->info.selfChildId,
      .stage = pTask->pMeta->stage,
  };

  // update the reqId for the new check msg
  p->reqId = tGenIdPI64();

  STaskOutputInfo* pOutputInfo = &pTask->outputInfo;
  if (pOutputInfo->type == TASK_OUTPUT__FIXED_DISPATCH) {
    STaskDispatcherFixed* pDispatch = &pOutputInfo->fixedDispatcher;
    setCheckDownstreamReqInfo(&req, p->reqId, pDispatch->taskId, pDispatch->nodeId);

    stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " re-send check downstream task:0x%x(vgId:%d) QID:0x%" PRIx64, id,
            pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, req.reqId);

    code = streamSendCheckMsg(pTask, &req, pOutputInfo->fixedDispatcher.nodeId, &pOutputInfo->fixedDispatcher.epSet);
  } else if (pOutputInfo->type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
    // find the vgroup whose downstream task matches p->taskId
    SArray* vgInfo = pOutputInfo->shuffleDispatcher.dbInfo.pVgroupInfos;
    int32_t numOfVgs = taosArrayGetSize(vgInfo);

    for (int32_t i = 0; i < numOfVgs; i++) {
      SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
      if (pVgInfo == NULL) {
        continue;
      }

      if (p->taskId == pVgInfo->taskId) {
        setCheckDownstreamReqInfo(&req, p->reqId, pVgInfo->taskId, pVgInfo->vgId);

        stDebug("s-task:%s (vgId:%d) stage:%" PRId64
                " re-send check downstream task:0x%x(vgId:%d) (shuffle), idx:%d QID:0x%" PRIx64,
                id, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i, p->reqId);
        code = streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
        break;
      }
    }
  } else if (pOutputInfo->type == TASK_OUTPUT__VTABLE_MAP) {
    // find the vtable-map entry whose downstream task matches p->taskId
    SArray* pTaskInfos = pTask->outputInfo.vtableMapDispatcher.taskInfos;
    int32_t numTasks = taosArrayGetSize(pTaskInfos);

    for (int32_t i = 0; i < numTasks; ++i) {
      STaskDispatcherFixed* pAddr = taosArrayGet(pTaskInfos, i);
      if (pAddr == NULL) {
        continue;
      }

      if (p->taskId == pAddr->taskId) {
        setCheckDownstreamReqInfo(&req, p->reqId, pAddr->taskId, pAddr->nodeId);

        stDebug("s-task:%s (vgId:%d) stage:%" PRId64
                " re-send check vtable downstream task:0x%x(vgId:%d), QID:0x%" PRIx64,
                id, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, p->reqId);
        code = streamSendCheckMsg(pTask, &req, pAddr->nodeId, &pAddr->epSet);
        break;
      }
    }
  }

  if (code) {
    stError("s-task:%s failed to send check msg to downstream, code:%s", pTask->id.idStr, tstrerror(code));
  }
  return code;
}
638

639
// Classify every downstream-check entry in pInfo->pList into one of five buckets:
// ready, fault (new-stage / not-leader), not-responded-yet, timed-out, or
// not-ready.  el is the elapsed time (ms) since the timeout window started;
// entries that stayed silent for CHECK_NOT_RSP_DURATION are appended to
// pTimeoutList, entries that answered "not ready" are appended to pNotReadyList.
// The three counters are accumulated into, not reset, by this function.
void getCheckRspStatus(STaskCheckInfo* pInfo, int64_t el, int32_t* numOfReady, int32_t* numOfFault,
                       int32_t* numOfNotRsp, SArray* pTimeoutList, SArray* pNotReadyList, const char* id) {
  int32_t num = taosArrayGetSize(pInfo->pList);

  for (int32_t idx = 0; idx < num; ++idx) {
    SDownstreamStatusInfo* pStatus = taosArrayGet(pInfo->pList, idx);
    if (pStatus == NULL) {
      continue;
    }

    if (pStatus->status == TASK_DOWNSTREAM_READY) {
      *numOfReady += 1;
      continue;
    }

    if (pStatus->status == TASK_UPSTREAM_NEW_STAGE || pStatus->status == TASK_DOWNSTREAM_NOT_LEADER) {
      // downstream node was restarted or lost leadership; stop checking it
      stDebug("s-task:%s recv status:NEW_STAGE/NOT_LEADER from downstream, task:0x%x, quit from check downstream", id,
              pStatus->taskId);
      *numOfFault += 1;
      continue;
    }

    // remaining case: TASK_DOWNSTREAM_NOT_READY
    if (pStatus->rspTs != 0) {
      // it did respond, but reported not-ready: schedule a re-check
      if (taosArrayPush(pNotReadyList, &pStatus->taskId) == NULL) {
        stError("s-task:%s failed to record not ready task:0x%x", id, pStatus->taskId);
      }
    } else if (el >= CHECK_NOT_RSP_DURATION) {
      // no response at all for 10 sec: treat as timed out
      if (taosArrayPush(pTimeoutList, &pStatus->taskId) == NULL) {
        stError("s-task:%s failed to record time out task:0x%x", id, pStatus->taskId);
      }
    } else {
      // still inside the waiting window; keep waiting for its rsp
      *numOfNotRsp += 1;
    }
  }
}
672

673
// Fill in the per-target routing fields of a downstream-check request: the
// request id used to correlate the eventual response, plus the destination
// task/node ids.  All other fields of pReq (e.g. stage) are left untouched.
void setCheckDownstreamReqInfo(SStreamTaskCheckReq* pReq, int64_t reqId, int32_t dstTaskId, int32_t dstNodeId) {
  pReq->downstreamNodeId = dstNodeId;
  pReq->downstreamTaskId = dstTaskId;
  pReq->reqId = reqId;
}
678

679
// Handle downstream tasks that never answered the check request (their int32_t
// task ids are listed in pTimeoutList): re-send the check message to each one.
// After more than 10 consecutive timeout rounds (~600 sec) the involved vnodes
// are put on the node-update list instead, so their epsets get refreshed.
// Fix vs. original: the return codes of doSendCheckMsg() and
// streamTaskAddIntoNodeUpdateList() were assigned but never inspected; failures
// are now logged so silent send errors no longer go unnoticed.
void handleTimeoutDownstreamTasks(SStreamTask* pTask, SArray* pTimeoutList) {
  STaskCheckInfo* pInfo = &pTask->taskCheckInfo;
  const char*     id = pTask->id.idStr;
  int32_t         vgId = pTask->pMeta->vgId;
  int32_t         numOfTimeout = taosArrayGetSize(pTimeoutList);
  int32_t         code = 0;

  pInfo->timeoutStartTs = taosGetTimestampMs();
  for (int32_t i = 0; i < numOfTimeout; ++i) {
    int32_t* px = taosArrayGet(pTimeoutList, i);
    if (px == NULL) {
      continue;
    }

    int32_t                taskId = *px;
    SDownstreamStatusInfo* p = NULL;
    findCheckRspStatus(pInfo, taskId, &p);

    if (p != NULL) {
      // a timed-out entry must still be in its initial state (no rsp recorded)
      if (p->status != -1 || p->rspTs != 0) {
        stError("s-task:%s invalid rsp record entry, index:%d, status:%d, rspTs:%" PRId64, id, i, p->status, p->rspTs);
        continue;
      }

      code = doSendCheckMsg(pTask, p);
      if (code) {  // log and continue; remaining timed-out tasks should still be retried
        stError("s-task:%s failed to re-send check msg to timeout downstream task:0x%x, code:%s", id, p->taskId,
                tstrerror(code));
      }
    }
  }

  pInfo->timeoutRetryCount += 1;

  // timeout more than 600 sec, add into node update list
  if (pInfo->timeoutRetryCount > 10) {
    pInfo->timeoutRetryCount = 0;

    for (int32_t i = 0; i < numOfTimeout; ++i) {
      int32_t* pTaskId = taosArrayGet(pTimeoutList, i);
      if (pTaskId == NULL) {
        continue;
      }

      SDownstreamStatusInfo* p = NULL;
      findCheckRspStatus(pInfo, *pTaskId, &p);
      if (p != NULL) {
        code = streamTaskAddIntoNodeUpdateList(pTask, p->vgId);
        if (code) {
          stError("s-task:%s failed to add vgId:%d into nodeUpdate list, code:%s", id, p->vgId, tstrerror(code));
        }
        stDebug("s-task:%s vgId:%d downstream task:0x%x (vgId:%d) timeout more than 600sec, add into nodeUpdate list",
                id, vgId, p->taskId, p->vgId);
      }
    }

    stDebug("s-task:%s vgId:%d %d downstream task(s) all add into nodeUpate list", id, vgId, numOfTimeout);
  } else {
    stDebug("s-task:%s vgId:%d %d downstream task(s) timeout, send check msg again, retry:%d start time:%" PRId64, id,
            vgId, numOfTimeout, pInfo->timeoutRetryCount, pInfo->timeoutStartTs);
  }
}
733

734
// Handle downstream tasks that responded "not ready" (their int32_t task ids
// are listed in pNotReadyList): reset each entry's rsp record back to the
// initial state (rspTs = 0, status = -1) and re-send the check message.
// Fix vs. original: the return code of doSendCheckMsg() was stored in an
// unused local and silently dropped; send failures are now logged.
void handleNotReadyDownstreamTask(SStreamTask* pTask, SArray* pNotReadyList) {
  STaskCheckInfo* pInfo = &pTask->taskCheckInfo;
  const char*     id = pTask->id.idStr;
  int32_t         vgId = pTask->pMeta->vgId;
  int32_t         numOfNotReady = taosArrayGetSize(pNotReadyList);

  // reset the info, and send the check msg to failure downstream again
  for (int32_t i = 0; i < numOfNotReady; ++i) {
    int32_t* pTaskId = taosArrayGet(pNotReadyList, i);
    if (pTaskId == NULL) {
      continue;
    }

    SDownstreamStatusInfo* p = NULL;
    findCheckRspStatus(pInfo, *pTaskId, &p);
    if (p != NULL) {
      // clear the record so the next response overwrites it cleanly
      p->rspTs = 0;
      p->status = -1;

      int32_t code = doSendCheckMsg(pTask, p);
      if (code) {  // log and continue with the remaining not-ready tasks
        stError("s-task:%s failed to re-send check msg to not-ready downstream task:0x%x, code:%s", id, p->taskId,
                tstrerror(code));
      }
    }
  }

  pInfo->notReadyRetryCount += 1;
  stDebug("s-task:%s vgId:%d %d downstream task(s) not ready, send check msg again, retry:%d start time:%" PRId64, id,
          vgId, numOfNotReady, pInfo->notReadyRetryCount, pInfo->startTs);
}
760

761
// the action of add status may incur the restart procedure, which should NEVER be executed in the timer thread.
762
// The restart of all tasks requires that all tasks should not have active timer for now. Therefore, the execution
763
// of restart in timer thread will result in a deadlock.
764
// Schedule recording of the downstream-check failure as an async task event
// rather than doing it inline: adding the failed status may trigger a full
// task restart, which must never execute on the timer thread (deadlock risk,
// since restart requires all task timers to be inactive).
int32_t addDownstreamFailedStatusResultAsync(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int32_t taskId) {
  int32_t code = streamTaskSchedTask(pMsgCb, vgId, streamId, taskId, STREAM_EXEC_T_ADD_FAILED_TASK, false);
  return code;
}
767

768
// Release everything acquired by one round of rspMonitorFn: the two scratch id
// lists, the task reference, and (when param is non-NULL) the heap-allocated
// task ref id.  All four releases are independent of each other.
static void doCleanup(SStreamTask* pTask, SArray* pNotReadyList, SArray* pTimeoutList, void* param) {
  taosArrayDestroy(pTimeoutList);
  taosArrayDestroy(pNotReadyList);

  streamMetaReleaseTask(pTask->pMeta, pTask);
  streamTaskFreeRefId(param);
}
775

776
// this function is executed in timer thread
777
void rspMonitorFn(void* param, void* tmrId) {
9,316✔
778
  int32_t         numOfReady = 0;
9,316✔
779
  int32_t         numOfFault = 0;
9,316✔
780
  int32_t         numOfNotRsp = 0;
9,316✔
781
  int32_t         numOfNotReady = 0;
9,316✔
782
  int32_t         numOfTimeout = 0;
9,316✔
783
  int64_t         taskRefId = *(int64_t*)param;
9,316✔
784
  int64_t         now = taosGetTimestampMs();
9,316✔
785
  SArray*         pNotReadyList = NULL;
9,316✔
786
  SArray*         pTimeoutList = NULL;
9,316✔
787
  SStreamMeta*    pMeta = NULL;
9,316✔
788
  STaskCheckInfo* pInfo = NULL;
9,316✔
789
  int32_t         vgId = -1;
9,316✔
790
  int64_t         timeoutDuration = 0;
9,316✔
791
  const char*     id = NULL;
9,316✔
792
  int32_t         total = 0;
9,316✔
793

794
  SStreamTask* pTask = taosAcquireRef(streamTaskRefPool, taskRefId);
9,316✔
795
  if (pTask == NULL) {
9,316!
UNCOV
796
    stError("invalid task rid:%" PRId64 " failed to acquired stream-task at %s", taskRefId, __func__);
×
UNCOV
797
    streamTaskFreeRefId(param);
×
798
    return;
6,921✔
799
  }
800

801
  pMeta = pTask->pMeta;
9,316✔
802
  pInfo = &pTask->taskCheckInfo;
9,316✔
803
  vgId = pTask->pMeta->vgId;
9,316✔
804
  timeoutDuration = now - pInfo->timeoutStartTs;
9,316✔
805
  id = pTask->id.idStr;
9,316✔
806
  total = (int32_t) taosArrayGetSize(pInfo->pList);
9,316✔
807

808
  stDebug("s-task:%s start to do check-downstream-rsp check in tmr", id);
9,316✔
809

810
  streamMutexLock(&pTask->lock);
9,316✔
811
  SStreamTaskState state = streamTaskGetStatus(pTask);
9,316✔
812
  streamMutexUnlock(&pTask->lock);
9,316✔
813

814
  if (state.state == TASK_STATUS__STOP) {
9,316✔
815
    stDebug("s-task:%s status:%s vgId:%d quit from monitor check-rsp tmr", id, state.name, vgId);
1!
816
    streamTaskCompleteCheckRsp(pInfo, true, id);
1✔
817

818
    // not record the failure of the current task if try to close current vnode
819
    // otherwise, the put of message operation may incur invalid read of message queue.
820
    if (!pMeta->closeFlag) {
1!
821
      int32_t code = addDownstreamFailedStatusResultAsync(pTask->pMsgCb, vgId, pTask->id.streamId, pTask->id.taskId);
1✔
822
      if (code) {
1!
823
        stError("s-task:%s failed to create async record start failed task, code:%s", id, tstrerror(code));
×
824
      }
825
    }
826

827
    doCleanup(pTask, pNotReadyList, pTimeoutList, param);
1✔
828
    return;
1✔
829
  }
830

831
  if (state.state == TASK_STATUS__DROPPING || state.state == TASK_STATUS__READY) {
9,315✔
832
    stDebug("s-task:%s status:%s vgId:%d quit from monitor check-rsp tmr", id, state.name, vgId);
4,306✔
833

834
    streamTaskCompleteCheckRsp(pInfo, true, id);
4,306✔
835
    doCleanup(pTask, pNotReadyList, pTimeoutList, param);
4,306✔
836
    return;
4,306✔
837
  }
838

839
  streamMutexLock(&pInfo->checkInfoLock);
5,009✔
840
  if (pInfo->notReadyTasks == 0) {
5,009✔
841
    stDebug("s-task:%s status:%s vgId:%d all downstream ready, quit from monitor rsp tmr", id, state.name, vgId);
2,589✔
842

843
    streamTaskCompleteCheckRsp(pInfo, false, id);
2,589✔
844
    streamMutexUnlock(&pInfo->checkInfoLock);
2,589✔
845
    doCleanup(pTask, pNotReadyList, pTimeoutList, param);
2,589✔
846
    return;
2,589✔
847
  }
848

849
  pNotReadyList = taosArrayInit(4, sizeof(int64_t));
2,420✔
850
  pTimeoutList = taosArrayInit(4, sizeof(int64_t));
2,420✔
851

852
  if (state.state == TASK_STATUS__UNINIT) {
2,420!
853
    getCheckRspStatus(pInfo, timeoutDuration, &numOfReady, &numOfFault, &numOfNotRsp, pTimeoutList, pNotReadyList, id);
2,420✔
854

855
    numOfNotReady = (int32_t)taosArrayGetSize(pNotReadyList);
2,420✔
856
    numOfTimeout = (int32_t)taosArrayGetSize(pTimeoutList);
2,420✔
857

858
    // fault tasks detected, not try anymore
859
    bool jumpOut = false;
2,420✔
860
    if ((numOfReady + numOfFault + numOfNotReady + numOfTimeout + numOfNotRsp) != total) {
2,420!
861
      stError(
×
862
          "s-task:%s vgId:%d internal error in handling the check downstream procedure, rsp number is inconsistent, "
863
          "stop rspMonitor tmr, total:%d, notRsp:%d, notReady:%d, fault:%d, timeout:%d, ready:%d",
864
          id, vgId, total, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady);
865
      jumpOut = true;
×
866
    }
867

868
    if (numOfFault > 0) {
2,420✔
869
      stDebug(
25✔
870
          "s-task:%s status:%s vgId:%d all rsp. quit from monitor rsp tmr, since vnode-transfer/leader-change/restart "
871
          "detected, total:%d, notRsp:%d, notReady:%d, fault:%d, timeout:%d, ready:%d",
872
          id, state.name, vgId, total, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady);
873
      jumpOut = true;
25✔
874
    }
875

876
    if (jumpOut) {
2,420✔
877
      streamTaskCompleteCheckRsp(pInfo, false, id);
25✔
878
      streamMutexUnlock(&pInfo->checkInfoLock);
25✔
879
      doCleanup(pTask, pNotReadyList, pTimeoutList, param);
25✔
880
      return;
25✔
881
    }
882
  } else {  // unexpected status
883
    stError("s-task:%s unexpected task status:%s during waiting for check rsp", id, state.name);
×
884
  }
885

886
  // checking of downstream tasks has been stopped by other threads
887
  if (pInfo->stopCheckProcess == 1) {
2,395!
888
    stDebug(
×
889
        "s-task:%s status:%s vgId:%d stopped by other threads to check downstream process, total:%d, notRsp:%d, "
890
        "notReady:%d, fault:%d, timeout:%d, ready:%d",
891
        id, state.name, vgId, total, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady);
892

893
    streamTaskCompleteCheckRsp(pInfo, false, id);
×
894
    streamMutexUnlock(&pInfo->checkInfoLock);
×
895

896
    int32_t code = addDownstreamFailedStatusResultAsync(pTask->pMsgCb, vgId, pTask->id.streamId, pTask->id.taskId);
×
897
    if (code) {
×
898
      stError("s-task:%s failed to create async record start failed task, code:%s", id, tstrerror(code));
×
899
    }
900

901
    doCleanup(pTask, pNotReadyList, pTimeoutList, param);
×
902
    return;
×
903
  }
904

905
  if (numOfNotReady > 0) {  // check to make sure not in recheck timer
2,395✔
906
    handleNotReadyDownstreamTask(pTask, pNotReadyList);
1,730✔
907
  }
908

909
  if (numOfTimeout > 0) {
2,395✔
910
    handleTimeoutDownstreamTasks(pTask, pTimeoutList);
1✔
911
  }
912

913
  streamTmrStart(rspMonitorFn, CHECK_RSP_CHECK_INTERVAL, param, streamTimer, &pInfo->checkRspTmr, vgId,
2,395✔
914
                 "check-status-monitor");
915
  streamMutexUnlock(&pInfo->checkInfoLock);
2,395✔
916

917
  stDebug(
2,395✔
918
      "s-task:%s vgId:%d continue checking rsp in 300ms, total:%d, notRsp:%d, notReady:%d, fault:%d, timeout:%d, "
919
      "ready:%d",
920
      id, vgId, total, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady);
921
  doCleanup(pTask, pNotReadyList, pTimeoutList, NULL);
2,395✔
922
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc