• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4103

17 May 2025 02:18AM UTC coverage: 63.264% (+0.4%) from 62.905%
#4103

push

travis-ci

web-flow
Merge pull request #31110 from taosdata/3.0

merge 3.0

158149 of 318142 branches covered (49.71%)

Branch coverage included in aggregate %.

3 of 5 new or added lines in 1 file covered. (60.0%)

1725 existing lines in 138 files now uncovered.

243642 of 316962 relevant lines covered (76.87%)

16346281.8 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.07
/source/libs/stream/src/streamCheckStatus.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "cos.h"
17
#include "rsync.h"
18
#include "streamBackendRocksdb.h"
19
#include "streamInt.h"
20

21
// Time (in ms) without any check response after which a downstream task is put on the timeout list: 60 sec.
// The expansion is parenthesized so that it stays a single operand inside any surrounding expression.
#define CHECK_NOT_RSP_DURATION (60 * 1000)
22

23
static void    processDownstreamReadyRsp(SStreamTask* pTask, bool lock);
24
static void    rspMonitorFn(void* param, void* tmrId);
25
static void    streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInfo* pOutputInfo, int64_t startTs);
26
static int32_t streamTaskStartCheckDownstream(STaskCheckInfo* pInfo, const char* id);
27
static void    streamTaskCompleteCheckRsp(STaskCheckInfo* pInfo, bool lock, const char* id);
28
static void    streamTaskAddReqInfo(STaskCheckInfo* pInfo, int64_t reqId, int32_t taskId, int32_t vgId, const char* id);
29
static int32_t doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p);
30
static void    handleTimeoutDownstreamTasks(SStreamTask* pTask, SArray* pTimeoutList);
31
static void    handleNotReadyDownstreamTask(SStreamTask* pTask, SArray* pNotReadyList);
32
static int32_t streamTaskUpdateCheckInfo(STaskCheckInfo* pInfo, int32_t taskId, int32_t status, int64_t rspTs,
33
                                         int64_t reqId, int32_t* pNotReady, const char* id);
34
static void setCheckDownstreamReqInfo(SStreamTaskCheckReq* pReq, int64_t reqId, int32_t dstTaskId, int32_t dstNodeId);
35
static void getCheckRspStatus(STaskCheckInfo* pInfo, int64_t el, int32_t* numOfReady, int32_t* numOfFault,
36
                              int32_t* numOfNotRsp, SArray* pTimeoutList, SArray* pNotReadyList, const char* id);
37
static int32_t addDownstreamFailedStatusResultAsync(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int32_t taskId);
38
static void    findCheckRspStatus(STaskCheckInfo* pInfo, int32_t taskId, SDownstreamStatusInfo** pStatusInfo);
39

40
// Evaluate a check request received from an upstream task.
//
//   pTask           task that received the request
//   upstreamTaskId  id of the requesting upstream task
//   vgId            vgroup id of the upstream task; only used in log output
//   stage           stage value carried by the request
//   oldStage        [out] stage stored for this upstream before the call; untouched when the upstream is unknown
//
// Returns TSDB_CODE_STREAM_TASK_NOT_EXIST when no endpoint info is found for the upstream task, 0 when the request
// carries the invalid stage -1, otherwise TASK_UPSTREAM_NEW_STAGE, TASK_DOWNSTREAM_NOT_READY or
// TASK_DOWNSTREAM_READY.
int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_t vgId, int64_t stage,
                              int64_t* oldStage) {
  const char*            idStr = pTask->id.idStr;
  SStreamUpstreamEpInfo* pUpstream = NULL;

  streamTaskGetUpstreamTaskEpInfo(pTask, upstreamTaskId, &pUpstream);
  if (pUpstream == NULL) {
    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  // report the stage known so far, before it is possibly initialized below
  *oldStage = pUpstream->stage;

  if (stage == -1) {
    stDebug("s-task:%s receive check msg from upstream task:0x%x(vgId:%d), invalid stageId:%" PRId64 ", not ready", idStr,
            upstreamTaskId, vgId, stage);
    return 0;
  }

  // first check msg from this upstream: adopt its stage
  if (pUpstream->stage == -1) {
    pUpstream->stage = stage;
    stDebug("s-task:%s receive check msg from upstream task:0x%x(vgId:%d) first time, init stage value:%" PRId64, idStr,
            upstreamTaskId, vgId, stage);
  }

  if (pUpstream->stage < stage) {
    stError("s-task:%s receive check msg from upstream task:0x%x(vgId:%d), new stage received:%" PRId64
            ", prev:%" PRId64,
            idStr, upstreamTaskId, vgId, stage, pUpstream->stage);
    // record the checkpoint failure id and sent to mnode
    streamTaskSetCheckpointFailed(pTask);
  }

  if (pUpstream->stage != stage) {
    return TASK_UPSTREAM_NEW_STAGE;
  }

  if (pTask->status.downstreamReady != 1) {
    stDebug("s-task:%s vgId:%d leader:%d, downstream not ready", idStr, vgId, (pTask->pMeta->role == NODE_ROLE_LEADER));
    return TASK_DOWNSTREAM_NOT_READY;
  }

  return TASK_DOWNSTREAM_READY;
}
79

80
// check status
81
// Send a check-status request to every downstream task of pTask.
//
// For fixed, shuffle and vtable-map dispatch the check-rsp monitor is started first; then, per downstream target,
// a request with a freshly generated reqId is recorded in pTask->taskCheckInfo and sent. Any other output type has
// no downstream: the monitor stop flag is set and the task is handled as ready right away.
//
// Send failures are logged per target and do not stop the remaining sends. The first failure code is kept, so the
// summary error at the end is emitted whenever any send failed, not only when the last one did.
void streamTaskSendCheckMsg(SStreamTask* pTask) {
  SDataRange*  pRange = &pTask->dataRange;
  STimeWindow* pWindow = &pRange->window;
  const char*  idstr = pTask->id.idStr;
  int32_t      code = 0;  // first send failure; 0 while every send succeeded

  // fields shared by all requests; reqId and the downstream ids are filled in per target
  SStreamTaskCheckReq req = {
      .streamId = pTask->id.streamId,
      .upstreamTaskId = pTask->id.taskId,
      .upstreamNodeId = pTask->info.nodeId,
      .childId = pTask->info.selfChildId,
      .stage = pTask->pMeta->stage,
  };

  if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
    streamTaskStartMonitorCheckRsp(pTask);

    STaskDispatcherFixed* pDispatch = &pTask->outputInfo.fixedDispatcher;

    setCheckDownstreamReqInfo(&req, tGenIdPI64(), pDispatch->taskId, pDispatch->nodeId);
    streamTaskAddReqInfo(&pTask->taskCheckInfo, req.reqId, pDispatch->taskId, pDispatch->nodeId, idstr);

    stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " check single downstream task:0x%x(vgId:%d) ver:%" PRId64 "-%" PRId64
            " window:%" PRId64 "-%" PRId64 " QID:0x%" PRIx64,
            idstr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, pRange->range.minVer,
            pRange->range.maxVer, pWindow->skey, pWindow->ekey, req.reqId);

    code = streamSendCheckMsg(pTask, &req, pDispatch->nodeId, &pDispatch->epSet);

  } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
    streamTaskStartMonitorCheckRsp(pTask);

    SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;

    int32_t numOfVgs = taosArrayGetSize(vgInfo);
    stDebug("s-task:%s check %d downstream tasks, ver:%" PRId64 "-%" PRId64 " window:%" PRId64 "-%" PRId64, idstr,
            numOfVgs, pRange->range.minVer, pRange->range.maxVer, pWindow->skey, pWindow->ekey);

    for (int32_t i = 0; i < numOfVgs; i++) {
      SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
      if (pVgInfo == NULL) {
        continue;
      }

      setCheckDownstreamReqInfo(&req, tGenIdPI64(), pVgInfo->taskId, pVgInfo->vgId);
      streamTaskAddReqInfo(&pTask->taskCheckInfo, req.reqId, pVgInfo->taskId, pVgInfo->vgId, idstr);

      stDebug("s-task:%s (vgId:%d) stage:%" PRId64
              " check downstream task:0x%x (vgId:%d) (shuffle), idx:%d, QID:0x%" PRIx64,
              idstr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i, req.reqId);

      int32_t ret = streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
      if (ret != TSDB_CODE_SUCCESS) {
        stError("s-task:%s failed to send check msg to downstream task:0x%x (vgId:%d), code:%s", idstr,
                req.downstreamTaskId, req.downstreamNodeId, tstrerror(ret));
        if (code == TSDB_CODE_SUCCESS) {
          code = ret;  // keep the first failure, a later success must not hide it
        }
      }
    }
  } else if (pTask->outputInfo.type == TASK_OUTPUT__VTABLE_MAP) {
    streamTaskStartMonitorCheckRsp(pTask);

    SArray* pTaskInfos = pTask->outputInfo.vtableMapDispatcher.taskInfos;
    int32_t numTasks = taosArrayGetSize(pTaskInfos);
    stDebug("s-task:%s check %d vtable downstream tasks, ver:%" PRId64 "-%" PRId64 " window:%" PRId64 "-%" PRId64,
            idstr, numTasks, pRange->range.minVer, pRange->range.maxVer, pWindow->skey, pWindow->ekey);

    for (int32_t i = 0; i < numTasks; ++i) {
      STaskDispatcherFixed* pAddr = taosArrayGet(pTaskInfos, i);
      if (pAddr == NULL) {
        continue;
      }

      setCheckDownstreamReqInfo(&req, tGenIdPI64(), pAddr->taskId, pAddr->nodeId);
      streamTaskAddReqInfo(&pTask->taskCheckInfo, req.reqId, pAddr->taskId, pAddr->nodeId, idstr);

      stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " check vtable downstream task:0x%x (vgId:%d), QID:0x%" PRIx64,
              idstr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, req.reqId);

      int32_t ret = streamSendCheckMsg(pTask, &req, pAddr->nodeId, &pAddr->epSet);
      if (ret != TSDB_CODE_SUCCESS) {
        stError("s-task:%s failed to send check msg to vtable downstream task:0x%x (vgId:%d), code:%s", idstr,
                req.downstreamTaskId, req.downstreamNodeId, tstrerror(ret));
        if (code == TSDB_CODE_SUCCESS) {
          code = ret;  // keep the first failure, a later success must not hide it
        }
      }
    }
  } else {  // for sink task, set it ready directly.
    stDebug("s-task:%s (vgId:%d) set downstream ready, since no downstream", idstr, pTask->info.nodeId);
    streamTaskStopMonitorCheckRsp(&pTask->taskCheckInfo, idstr);
    processDownstreamReadyRsp(pTask, false);
  }

  if (code) {
    stError("s-task:%s failed to send check msg to downstream, code:%s", idstr, tstrerror(code));
  }
}
12,074✔
172

173
// Build the response to a check request addressed to a task of this node.
//
//   pMeta  stream meta of the receiving vnode
//   pReq   the received check request
//   pRsp   [out] fully overwritten: ids are copied from pReq, status (and oldStage, when the task exists) is set
//
// status becomes TASK_DOWNSTREAM_NOT_LEADER on a follower, TASK_DOWNSTREAM_NOT_READY when the target task cannot be
// acquired, otherwise whatever streamTaskCheckStatus reports.
void streamTaskProcessCheckMsg(SStreamMeta* pMeta, SStreamTaskCheckReq* pReq, SStreamTaskCheckRsp* pRsp) {
  const int32_t dstTaskId = pReq->downstreamTaskId;

  // copy the identifying fields; every remaining field starts out zeroed
  SStreamTaskCheckRsp rsp = {
      .reqId = pReq->reqId,
      .streamId = pReq->streamId,
      .childId = pReq->childId,
      .downstreamNodeId = pReq->downstreamNodeId,
      .downstreamTaskId = pReq->downstreamTaskId,
      .upstreamNodeId = pReq->upstreamNodeId,
      .upstreamTaskId = pReq->upstreamTaskId,
  };
  *pRsp = rsp;

  // only the leader node handle the check request
  if (pMeta->role == NODE_ROLE_FOLLOWER) {
    stError(
        "s-task:0x%x invalid check msg from upstream:0x%x(vgId:%d), vgId:%d is follower, not handle check status msg",
        dstTaskId, pReq->upstreamTaskId, pReq->upstreamNodeId, pMeta->vgId);
    pRsp->status = TASK_DOWNSTREAM_NOT_LEADER;
    return;
  }

  // the return code is not inspected here; a NULL task is what signals "not available"
  SStreamTask* pDstTask = NULL;
  int32_t      code = streamMetaAcquireTask(pMeta, pReq->streamId, dstTaskId, &pDstTask);

  if (pDstTask == NULL) {
    pRsp->status = TASK_DOWNSTREAM_NOT_READY;
    stDebug("tq recv task check(taskId:0x%" PRIx64 "-0x%x not built yet) req(QID:0x%" PRIx64
            ") from task:0x%x (vgId:%d), rsp check_status %d",
            pReq->streamId, dstTaskId, pRsp->reqId, pRsp->upstreamTaskId, pRsp->upstreamNodeId, pRsp->status);
    return;
  }

  pRsp->status =
      streamTaskCheckStatus(pDstTask, pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->stage, &pRsp->oldStage);

  SStreamTaskState curState = streamTaskGetStatus(pDstTask);
  stDebug("s-task:%s status:%s, stage:%" PRId64 " recv task check req(QID:0x%" PRIx64
          ") task:0x%x (vgId:%d), check_status:%d",
          pDstTask->id.idStr, curState.name, pRsp->oldStage, pRsp->reqId, pRsp->upstreamTaskId, pRsp->upstreamNodeId,
          pRsp->status);
  streamMetaReleaseTask(pMeta, pDstTask);
}
19,088✔
213

214
// Handle the response of one downstream task to a check request sent by pTask.
//
// Returns TSDB_CODE_INVALID_MSG when the response is not addressed to pTask. In every other case, including a
// response that streamTaskUpdateCheckInfo rejects, TSDB_CODE_SUCCESS (0) is returned.
int32_t streamTaskProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) {
  int64_t         now = taosGetTimestampMs();
  const char*     id = pTask->id.idStr;
  STaskCheckInfo* pInfo = &pTask->taskCheckInfo;
  int32_t         total = streamTaskGetNumOfDownstream(pTask);
  int32_t         left = -1;

  if (streamTaskShouldStop(pTask)) {
    stDebug("s-task:%s should stop, do not do check downstream again", id);
    return TSDB_CODE_SUCCESS;
  }

  if (pTask->id.taskId != pRsp->upstreamTaskId) {
    stError("s-task:%s invalid check downstream rsp, upstream task:0x%x discard", id, pRsp->upstreamTaskId);
    return TSDB_CODE_INVALID_MSG;
  }

  // record the rsp, whatever its status is; 'left' receives the number of downstream tasks still not ready
  int32_t code = streamTaskUpdateCheckInfo(pInfo, pRsp->downstreamTaskId, pRsp->status, now, pRsp->reqId, &left, id);
  if (code != TSDB_CODE_SUCCESS) {
    return TSDB_CODE_SUCCESS;  // return success in any cases.
  }

  if (pRsp->status == TASK_DOWNSTREAM_READY) {
    if (left == 0) {
      // all downstream tasks are ready, set the complete check downstream flag
      processDownstreamReadyRsp(pTask, true);
      streamTaskStopMonitorCheckRsp(pInfo, id);
    } else {
      stDebug("s-task:%s (vgId:%d) recv check rsp from task:0x%x (vgId:%d) status:%d, total:%d not ready:%d", id,
              pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status, total, left);
    }
    return 0;
  }

  if (pRsp->status == TASK_UPSTREAM_NEW_STAGE) {
    stError("s-task:%s vgId:%d self vnode-transfer/leader-change/restart detected, old stage:%" PRId64
            ", current stage:%" PRId64 ", not check wait for downstream task nodeUpdate, and all tasks restart",
            id, pRsp->upstreamNodeId, pRsp->oldStage, pTask->pMeta->stage);
    code = streamTaskAddIntoNodeUpdateList(pTask, pRsp->upstreamNodeId);
    streamMetaAddFailedTaskSelf(pTask, now, true);
  } else if (pRsp->status == TASK_DOWNSTREAM_NOT_LEADER) {
    stError(
        "s-task:%s downstream taskId:0x%x (vgId:%d) not leader, self dispatch epset needs to be updated, not check "
        "downstream again, nodeUpdate needed",
        id, pRsp->downstreamTaskId, pRsp->downstreamNodeId);
    code = streamTaskAddIntoNodeUpdateList(pTask, pRsp->downstreamNodeId);
    streamMetaAddFailedTaskSelf(pTask, now, true);
  } else {
    // TASK_DOWNSTREAM_NOT_READY: nothing is re-sent from here, the retry is left to the rsp-check monitor
    stDebug("s-task:%s (vgId:%d) recv check rsp from task:0x%x (vgId:%d) status:%d, total:%d not ready:%d", id,
            pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status, total, left);
  }

  return 0;
}
273

274
int32_t streamTaskSendCheckRsp(const SStreamMeta* pMeta, int32_t vgId, SStreamTaskCheckRsp* pRsp,
19,086✔
275
                               SRpcHandleInfo* pRpcInfo, int32_t taskId) {
276
  SEncoder encoder;
277
  int32_t  code = 0;
19,086✔
278
  int32_t  len;
279

280
  tEncodeSize(tEncodeStreamTaskCheckRsp, pRsp, len, code);
19,086!
281
  if (code < 0) {
19,094!
282
    stError("vgId:%d failed to encode task check rsp, s-task:0x%x", pMeta->vgId, taskId);
×
283
    return TSDB_CODE_INVALID_MSG;
×
284
  }
285

286
  void* buf = rpcMallocCont(sizeof(SMsgHead) + len);
19,094✔
287
  if (buf == NULL) {
19,084!
288
    stError("s-task:0x%x vgId:%d failed prepare msg, %s at line:%d code:%s", taskId, pMeta->vgId, __func__, __LINE__,
×
289
            tstrerror(code));
290
    return terrno;
×
291
  }
292

293
  ((SMsgHead*)buf)->vgId = htonl(vgId);
19,084✔
294

295
  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
19,084✔
296
  tEncoderInit(&encoder, (uint8_t*)abuf, len);
19,084✔
297
  code = tEncodeStreamTaskCheckRsp(&encoder, pRsp);
19,085✔
298
  tEncoderClear(&encoder);
19,094✔
299

300
  SRpcMsg rspMsg = {.code = 0, .pCont = buf, .contLen = sizeof(SMsgHead) + len, .info = *pRpcInfo};
19,093✔
301
  tmsgSendRsp(&rspMsg);
19,093✔
302

303
  code = TMIN(code, 0);
19,104✔
304
  return code;
19,104✔
305
}
306

307
// Enter the check-downstream procedure of pTask and start the timer that monitors the check responses.
//
// Nothing is started when the task is dropping or when a check procedure is already in progress. All work is done
// while holding taskCheckInfo.checkInfoLock.
void streamTaskStartMonitorCheckRsp(SStreamTask* pTask) {
  int32_t         vgId = pTask->pMeta->vgId;
  STaskCheckInfo* pInfo = &pTask->taskCheckInfo;
  const char*     id = pTask->id.idStr;

  streamMutexLock(&pInfo->checkInfoLock);

  // drop procedure already started, not start check downstream now
  ETaskStatus s = streamTaskGetStatus(pTask).state;
  if (s == TASK_STATUS__DROPPING) {
    stDebug("s-task:%s task not in uninit status, status:%s not start monitor check-rsp", id,
            streamTaskGetStatusStr(s));
    streamMutexUnlock(&pInfo->checkInfoLock);
    return;
  }

  int32_t code = streamTaskStartCheckDownstream(pInfo, id);
  if (code != TSDB_CODE_SUCCESS) {
    streamMutexUnlock(&pInfo->checkInfoLock);
    return;
  }

  streamTaskInitTaskCheckInfo(pInfo, &pTask->outputInfo, taosGetTimestampMs());

  int64_t* pTaskRefId = NULL;
  code = streamTaskAllocRefId(pTask, &pTaskRefId);
  if (code == 0) {
    streamTmrStart(rspMonitorFn, CHECK_RSP_CHECK_INTERVAL, pTaskRefId, streamTimer, &pInfo->checkRspTmr, vgId,
                   "check-status-monitor");
  } else {
    // without a ref id the monitor timer is not started; make this visible instead of failing silently
    stError("s-task:%s vgId:%d failed to alloc task refId, check-rsp monitor timer not started, code:%s", id, vgId,
            tstrerror(code));
  }

  streamMutexUnlock(&pInfo->checkInfoLock);
}
339

340
// Raise the stop flag of the check-rsp monitor. Only the flag is written here (under checkInfoLock); the timer
// itself is not touched by this function.
//   id  task id string, used for log output only
void streamTaskStopMonitorCheckRsp(STaskCheckInfo* pInfo, const char* id) {
  streamMutexLock(&pInfo->checkInfoLock);
  {
    pInfo->stopCheckProcess = 1;
  }
  streamMutexUnlock(&pInfo->checkInfoLock);

  stDebug("s-task:%s set stop check-rsp monitor flag", id);
}
12,480✔
347

348
// Release what pInfo owns: the list of per-request records and, if one was created, the check-rsp timer.
// Both members are reset to NULL afterwards.
void streamTaskCleanupCheckInfo(STaskCheckInfo* pInfo) {
  taosArrayDestroy(pInfo->pList);
  pInfo->pList = NULL;

  if (pInfo->checkRspTmr == NULL) {
    return;  // no timer was ever started
  }

  streamTmrStop(pInfo->checkRspTmr);
  pInfo->checkRspTmr = NULL;
}
47,460✔
357

358
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
359
// Run the steps that follow once all downstream tasks of pTask are ready (or there are none):
// report the init event as succeeded, record the launch result, halt the task if its configured status is HALT,
// and launch the related fill-history task if there is one.
//
//   lock  selects the locking variant of streamMetaAddTaskLaunchResult and is forwarded to
//         streamLaunchFillHistoryTask
//
// Every failure is only logged; the remaining steps are still executed.
void processDownstreamReadyRsp(SStreamTask* pTask, bool lock) {
  const char* idStr = pTask->id.idStr;

  EStreamTaskEvent initEvent = TASK_EVENT_INIT_SCANHIST;
  if (pTask->info.fillHistory != STREAM_HISTORY_TASK) {
    initEvent = TASK_EVENT_INIT;
  }

  int32_t code = streamTaskOnHandleEventSuccess(pTask->status.pSM, initEvent, NULL, NULL);
  if (code) {
    stError("s-task:%s failed to set event succ, code:%s", idStr, tstrerror(code));
  }

  // read the timestamps only after the event has been handled
  int64_t checkTs = pTask->execInfo.checkTs;
  int64_t readyTs = pTask->execInfo.readyTs;

  code = lock ? streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, checkTs, readyTs, true)
              : streamMetaAddTaskLaunchResultNoLock(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, checkTs,
                                                    readyTs, true);
  if (code) {
    stError("s-task:%s failed to record the downstream task status, code:%s", idStr, tstrerror(code));
  }

  if (pTask->status.taskStatus == TASK_STATUS__HALT) {
    if (!HAS_RELATED_FILLHISTORY_TASK(pTask) || (pTask->info.fillHistory != 0)) {
      stError("s-task:%s status:halt fillhistory:%d not handle the ready rsp", idStr,
              pTask->info.fillHistory);
    }

    // halt itself for count window stream task until the related fill history task completed.
    stDebug("s-task:%s level:%d initial status is %s from mnode, set it to be halt", idStr,
            pTask->info.taskLevel, streamTaskGetStatusStr(pTask->status.taskStatus));

    code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_HALT);
    if (code != 0) {  // todo: handle error
      stError("s-task:%s failed to handle halt event, code:%s", idStr, tstrerror(code));
    }
  }

  // start the related fill-history task, when current task is ready
  // not invoke in success callback due to the deadlock.
  // todo: let's retry
  if (!HAS_RELATED_FILLHISTORY_TASK(pTask)) {
    return;
  }

  stDebug("s-task:%s try to launch related task", idStr);
  code = streamLaunchFillHistoryTask(pTask, lock);
  if (code) {
    stError("s-task:%s failed to launch related task, code:%s", idStr, tstrerror(code));
  }
}
12,025✔
404

405
// Make sure nodeId is present in pTask->outputInfo.pNodeEpsetUpdateList; it is appended only when missing.
// The list is read and modified while holding pTask->lock.
//
// Returns 0 when the node is already listed or was appended, terrno when appending failed.
int32_t streamTaskAddIntoNodeUpdateList(SStreamTask* pTask, int32_t nodeId) {
  int32_t vgId = pTask->pMeta->vgId;
  int32_t code = 0;

  streamMutexLock(&pTask->lock);

  // look for an existing entry of this node
  bool    found = false;
  int32_t numOfNodes = taosArrayGetSize(pTask->outputInfo.pNodeEpsetUpdateList);
  for (int32_t idx = 0; (idx < numOfNodes) && !found; ++idx) {
    SDownstreamTaskEpset* pEntry = taosArrayGet(pTask->outputInfo.pNodeEpsetUpdateList, idx);
    found = (pEntry != NULL) && (pEntry->nodeId == nodeId);
  }

  if (!found) {
    SDownstreamTaskEpset newEntry = {.nodeId = nodeId};

    if (taosArrayPush(pTask->outputInfo.pNodeEpsetUpdateList, &newEntry) != NULL) {
      stInfo("s-task:%s vgId:%d nodeId:%d needs to be updated, total needs updated:%d", pTask->id.idStr,
             vgId, newEntry.nodeId, (numOfNodes + 1));
    } else {
      code = terrno;
      stError("s-task:%s vgId:%d failed to update epset, code:%s", pTask->id.idStr, vgId, tstrerror(code));
    }
  }

  streamMutexUnlock(&pTask->lock);
  return code;
}
441

442
// Reset pInfo for a new round of downstream checks: empty the request list, set the number of not-ready
// downstream tasks according to the dispatch type, and stamp the start time.
// notReadyTasks is left unchanged for output types other than fixed / shuffle / vtable-map dispatch.
void streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInfo* pOutputInfo, int64_t startTs) {
  taosArrayClear(pInfo->pList);

  switch (pOutputInfo->type) {
    case TASK_OUTPUT__FIXED_DISPATCH:
      pInfo->notReadyTasks = 1;  // exactly one downstream task
      break;
    case TASK_OUTPUT__SHUFFLE_DISPATCH:
      pInfo->notReadyTasks = taosArrayGetSize(pOutputInfo->shuffleDispatcher.dbInfo.pVgroupInfos);
      break;
    case TASK_OUTPUT__VTABLE_MAP:
      pInfo->notReadyTasks = taosArrayGetSize(pOutputInfo->vtableMapDispatcher.taskInfos);
      break;
    default:
      break;
  }

  pInfo->stopCheckProcess = 0;
  pInfo->timeoutStartTs = startTs;
  pInfo->startTs = startTs;
}
6,075✔
457

458
// Look up the request record of a downstream task in pInfo->pList.
//
//   taskId       id of the downstream task to look for
//   pStatusInfo  [out] set to the matching record, or to NULL when there is none; a NULL pStatusInfo makes the
//                call a no-op
//
// The scan stops at the first match: streamTaskAddReqInfo never adds a second record for a task id that is
// already in the list, so there is at most one. No lock is taken here; the visible callers hold checkInfoLock.
void findCheckRspStatus(STaskCheckInfo* pInfo, int32_t taskId, SDownstreamStatusInfo** pStatusInfo) {
  if (pStatusInfo == NULL) {
    return;
  }

  *pStatusInfo = NULL;

  // the list is not modified during the scan, its size is evaluated once
  int32_t num = (int32_t)taosArrayGetSize(pInfo->pList);
  for (int32_t j = 0; j < num; ++j) {
    SDownstreamStatusInfo* p = taosArrayGet(pInfo->pList, j);
    if (p == NULL) {
      continue;
    }

    if (p->taskId == taskId) {
      *pStatusInfo = p;
      break;
    }
  }
}
475

476
// Store the status carried by a check response in the request record of the downstream task.
//
//   taskId     downstream task that sent the response
//   status     status reported by that task
//   rspTs      receive timestamp stored in the record
//   reqId      request id carried by the response; must equal the id stored in the record
//   pNotReady  [out] number of downstream tasks still not ready; only written on success
//   id         task id string, used for log output only
//
// Returns TSDB_CODE_SUCCESS when the record was updated, TSDB_CODE_FAILED when there is no record for taskId or
// when reqId does not match (expired response). The lookup and update are done under checkInfoLock.
int32_t streamTaskUpdateCheckInfo(STaskCheckInfo* pInfo, int32_t taskId, int32_t status, int64_t rspTs, int64_t reqId,
                                  int32_t* pNotReady, const char* id) {
  SDownstreamStatusInfo* pEntry = NULL;

  streamMutexLock(&pInfo->checkInfoLock);
  findCheckRspStatus(pInfo, taskId, &pEntry);

  if (pEntry == NULL) {
    streamMutexUnlock(&pInfo->checkInfoLock);
    stError("s-task:%s unexpected check rsp msg, invalid downstream task:0x%x, QID:0x%" PRIx64 " discarded", id, taskId,
            reqId);
    return TSDB_CODE_FAILED;
  }

  if (pEntry->reqId != reqId) {
    stError("s-task:%sQID:0x%" PRIx64 " expected:0x%" PRIx64
            " expired check-rsp recv from downstream task:0x%x, discarded",
            id, reqId, pEntry->reqId, taskId);
    streamMutexUnlock(&pInfo->checkInfoLock);
    return TSDB_CODE_FAILED;
  }

  // subtract one not-ready-task, since it is ready now
  bool becomesReady = (pEntry->status != TASK_DOWNSTREAM_READY) && (status == TASK_DOWNSTREAM_READY);
  if (becomesReady) {
    *pNotReady = atomic_sub_fetch_32(&pInfo->notReadyTasks, 1);
  } else {
    *pNotReady = pInfo->notReadyTasks;
  }

  pEntry->status = status;
  pEntry->rspTs = rspTs;

  streamMutexUnlock(&pInfo->checkInfoLock);
  return TSDB_CODE_SUCCESS;
}
510

511
// Mark pInfo as being inside the check-downstream procedure.
//
// Returns TSDB_CODE_SUCCESS when the in-check flag was raised by this call, TSDB_CODE_FAILED when a check procedure
// is already running; in that case the stop flag is cleared. No lock is taken here.
int32_t streamTaskStartCheckDownstream(STaskCheckInfo* pInfo, const char* id) {
  if (pInfo->inCheckProcess != 0) {
    stError("s-task:%s already in check procedure, checkTs:%" PRId64 ", start monitor check rsp failed", id,
            pInfo->startTs);
    pInfo->stopCheckProcess = 0;  // disable auto stop of check process
    return TSDB_CODE_FAILED;
  }

  pInfo->inCheckProcess = 1;
  stDebug("s-task:%s set the in check-rsp flag", id);
  return TSDB_CODE_SUCCESS;
}
524

525
// Leave the check-downstream procedure: clear the flags, counters and timestamps of pInfo and empty the request
// list. Only a debug line is written when no check procedure is in progress.
//
//   lock  true: checkInfoLock is acquired and released by this function
//         false: the lock is not touched here
//   id    task id string, used for log output only
void streamTaskCompleteCheckRsp(STaskCheckInfo* pInfo, bool lock, const char* id) {
  if (lock) {
    streamMutexLock(&pInfo->checkInfoLock);
  }

  if (!pInfo->inCheckProcess) {
    stDebug("s-task:%s already not in check-rsp procedure", id);
  } else {
    int64_t elapsed = 0;
    if (pInfo->startTs != 0) {
      elapsed = taosGetTimestampMs() - pInfo->startTs;
    }
    stDebug("s-task:%s clear the in check-rsp flag, set the check-rsp done, elapsed time:%" PRId64 " ms", id, elapsed);

    // timestamps
    pInfo->startTs = 0;
    pInfo->timeoutStartTs = 0;

    // progress flags and counters
    pInfo->notReadyTasks = 0;
    pInfo->inCheckProcess = 0;
    pInfo->stopCheckProcess = 0;

    // retry bookkeeping
    pInfo->notReadyRetryCount = 0;
    pInfo->timeoutRetryCount = 0;

    taosArrayClear(pInfo->pList);
  }

  if (lock) {
    streamMutexUnlock(&pInfo->checkInfoLock);
  }
}
5,869✔
552

553
// todo: retry until success
554
// Record a check request that is about to be sent to a downstream task.
//
//   reqId   id of the request
//   taskId  downstream task the request goes to
//   vgId    vgroup of that downstream task
//   id      task id string of the sender, used for log output only
//
// A new record (status -1, rspTs 0) is appended to pInfo->pList under checkInfoLock. Nothing is added when a
// record for taskId already exists. A failure to append is logged; without the record, the response to this
// request will not be found by streamTaskUpdateCheckInfo and will be discarded there.
void streamTaskAddReqInfo(STaskCheckInfo* pInfo, int64_t reqId, int32_t taskId, int32_t vgId, const char* id) {
  SDownstreamStatusInfo info = {.taskId = taskId, .status = -1, .vgId = vgId, .reqId = reqId, .rspTs = 0};
  streamMutexLock(&pInfo->checkInfoLock);

  SDownstreamStatusInfo* p = NULL;
  findCheckRspStatus(pInfo, taskId, &p);
  if (p != NULL) {
    stDebug("s-task:%s check info to task:0x%x already sent", id, taskId);
    streamMutexUnlock(&pInfo->checkInfoLock);
    return;
  }

  void* px = taosArrayPush(pInfo->pList, &info);
  if (px == NULL) {
    // todo: retry
    stError("s-task:%s failed to record check req info for downstream task:0x%x (vgId:%d), QID:0x%" PRIx64
            ", code:%s",
            id, taskId, vgId, reqId, tstrerror(terrno));
  }

  streamMutexUnlock(&pInfo->checkInfoLock);
}
573

574
// Send the check request for a single downstream task again.
//
//   pTask  the upstream task doing the check
//   p      request record of the downstream task; its reqId is replaced by a newly generated one
//
// The target is pTask's only downstream for fixed dispatch; for shuffle and vtable-map dispatch it is the entry
// of the dispatcher whose taskId equals p->taskId. Nothing is sent if no such entry exists or for any other
// output type.
//
// Returns the result of streamSendCheckMsg, or 0 when nothing was sent.
int32_t doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p) {
  const char*      id = pTask->id.idStr;
  STaskOutputInfo* pOutputInfo = &pTask->outputInfo;
  int32_t          code = 0;

  SStreamTaskCheckReq req = {
      .streamId = pTask->id.streamId,
      .upstreamTaskId = pTask->id.taskId,
      .upstreamNodeId = pTask->info.nodeId,
      .childId = pTask->info.selfChildId,
      .stage = pTask->pMeta->stage,
  };

  // update the reqId for the new check msg
  p->reqId = tGenIdPI64();

  if (pOutputInfo->type == TASK_OUTPUT__FIXED_DISPATCH) {
    STaskDispatcherFixed* pFixed = &pOutputInfo->fixedDispatcher;
    setCheckDownstreamReqInfo(&req, p->reqId, pFixed->taskId, pFixed->nodeId);

    stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " re-send check downstream task:0x%x(vgId:%d) QID:0x%" PRIx64, id,
            pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, req.reqId);

    code = streamSendCheckMsg(pTask, &req, pFixed->nodeId, &pFixed->epSet);
  } else if (pOutputInfo->type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
    SArray* pVgList = pOutputInfo->shuffleDispatcher.dbInfo.pVgroupInfos;
    int32_t numOfVgs = taosArrayGetSize(pVgList);

    for (int32_t i = 0; i < numOfVgs; i++) {
      SVgroupInfo* pVgInfo = taosArrayGet(pVgList, i);
      if (pVgInfo == NULL || p->taskId != pVgInfo->taskId) {
        continue;  // not the vgroup of the task to check
      }

      setCheckDownstreamReqInfo(&req, p->reqId, pVgInfo->taskId, pVgInfo->vgId);

      stDebug("s-task:%s (vgId:%d) stage:%" PRId64
              " re-send check downstream task:0x%x(vgId:%d) (shuffle), idx:%d QID:0x%" PRIx64,
              id, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i, p->reqId);
      code = streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
      break;
    }
  } else if (pOutputInfo->type == TASK_OUTPUT__VTABLE_MAP) {
    SArray* pAddrList = pOutputInfo->vtableMapDispatcher.taskInfos;
    int32_t numTasks = taosArrayGetSize(pAddrList);

    for (int32_t i = 0; i < numTasks; ++i) {
      STaskDispatcherFixed* pAddr = taosArrayGet(pAddrList, i);
      if (pAddr == NULL || p->taskId != pAddr->taskId) {
        continue;  // not the address of the task to check
      }

      setCheckDownstreamReqInfo(&req, p->reqId, pAddr->taskId, pAddr->nodeId);

      stDebug("s-task:%s (vgId:%d) stage:%" PRId64
              " re-send check vtable downstream task:0x%x(vgId:%d), QID:0x%" PRIx64,
              id, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, p->reqId);
      code = streamSendCheckMsg(pTask, &req, pAddr->nodeId, &pAddr->epSet);
      break;
    }
  }

  if (code) {
    stError("s-task:%s failed to send check msg to downstream, code:%s", id, tstrerror(code));
  }
  return code;
}
645

646
/*
 * Classify every downstream status entry kept in pInfo->pList.
 *
 * pInfo          check info whose pList is scanned
 * el             elapsed time (ms) that is compared against CHECK_NOT_RSP_DURATION
 * numOfReady     incremented for each entry in TASK_DOWNSTREAM_READY
 * numOfFault     incremented for each entry in TASK_UPSTREAM_NEW_STAGE / TASK_DOWNSTREAM_NOT_LEADER
 * numOfNotRsp    incremented for each entry without a response that has not timed out yet
 * pTimeoutList   receives the taskId of each entry without a response once el reached the threshold
 * pNotReadyList  receives the taskId of each entry that has responded but is not ready
 * id             task id string, used for logging only
 *
 * The counters are only incremented, never reset; the caller supplies the initial values.
 */
void getCheckRspStatus(STaskCheckInfo* pInfo, int64_t el, int32_t* numOfReady, int32_t* numOfFault,
                       int32_t* numOfNotRsp, SArray* pTimeoutList, SArray* pNotReadyList, const char* id) {
  for (int32_t idx = 0; idx < taosArrayGetSize(pInfo->pList); ++idx) {
    SDownstreamStatusInfo* pStatus = taosArrayGet(pInfo->pList, idx);
    if (pStatus == NULL) {
      continue;
    }

    if (pStatus->status == TASK_DOWNSTREAM_READY) {
      (*numOfReady) += 1;
      continue;
    }

    if (pStatus->status == TASK_UPSTREAM_NEW_STAGE || pStatus->status == TASK_DOWNSTREAM_NOT_LEADER) {
      stDebug("s-task:%s recv status:NEW_STAGE/NOT_LEADER from downstream, task:0x%x, quit from check downstream", id,
              pStatus->taskId);
      (*numOfFault) += 1;
      continue;
    }

    // every other status is handled as "not ready"
    if (pStatus->rspTs != 0) {  // a response has been recorded, but the downstream task is not ready
      if (taosArrayPush(pNotReadyList, &pStatus->taskId) == NULL) {
        stError("s-task:%s failed to record not ready task:0x%x", id, pStatus->taskId);
      }
      continue;
    }

    // no response recorded yet
    if (el < CHECK_NOT_RSP_DURATION) {
      (*numOfNotRsp) += 1;  // still inside the waiting window, keep waiting for the rsp
      continue;
    }

    // waited for at least CHECK_NOT_RSP_DURATION without any response
    if (taosArrayPush(pTimeoutList, &pStatus->taskId) == NULL) {
      stError("s-task:%s failed to record time out task:0x%x", id, pStatus->taskId);
    }
  }
}
1,634✔
679

680
// Store the request id and the downstream task/node ids in a check request.
// Only these three fields of pReq are written here.
void setCheckDownstreamReqInfo(SStreamTaskCheckReq* pReq, int64_t reqId, int32_t dstTaskId, int32_t dstNodeId) {
  pReq->downstreamNodeId = dstNodeId;
  pReq->downstreamTaskId = dstTaskId;
  pReq->reqId = reqId;
}
19,112✔
685

UNCOV
686
/*
 * Re-send the check message to every downstream task listed in pTimeoutList and restart the
 * timeout window (pInfo->timeoutStartTs is set to the current time).
 *
 * Each call increases pInfo->timeoutRetryCount. Once the counter exceeds 10 it is reset and the
 * vgroup of every timed-out downstream task is handed to streamTaskAddIntoNodeUpdateList().
 *
 * pTask         task that owns the check info
 * pTimeoutList  list of int32_t downstream task ids that did not respond in time
 */
void handleTimeoutDownstreamTasks(SStreamTask* pTask, SArray* pTimeoutList) {
  STaskCheckInfo* pInfo = &pTask->taskCheckInfo;
  const char*     id = pTask->id.idStr;
  int32_t         vgId = pTask->pMeta->vgId;
  int32_t         numOfTimeout = taosArrayGetSize(pTimeoutList);

  pInfo->timeoutStartTs = taosGetTimestampMs();
  for (int32_t i = 0; i < numOfTimeout; ++i) {
    int32_t* px = taosArrayGet(pTimeoutList, i);
    if (px == NULL) {
      continue;
    }

    int32_t                taskId = *px;
    SDownstreamStatusInfo* p = NULL;
    findCheckRspStatus(pInfo, taskId, &p);

    if (p != NULL) {
      // a timed-out entry must still be in its "no response" state
      if (p->status != -1 || p->rspTs != 0) {
        stError("s-task:%s invalid rsp record entry, index:%d, status:%d, rspTs:%" PRId64, id, i, p->status, p->rspTs);
        continue;
      }

      int32_t code = doSendCheckMsg(pTask, p);
      if (code) {  // keep going, the remaining downstream tasks still need their check msg
        stError("s-task:%s vgId:%d failed to re-send check msg to timeout downstream task:0x%x, code:%s", id, vgId,
                p->taskId, tstrerror(code));
      }
    }
  }

  pInfo->timeoutRetryCount += 1;

  // too many timeout rounds, add the downstream nodes into node update list
  if (pInfo->timeoutRetryCount > 10) {
    pInfo->timeoutRetryCount = 0;

    for (int32_t i = 0; i < numOfTimeout; ++i) {
      int32_t* pTaskId = taosArrayGet(pTimeoutList, i);
      if (pTaskId == NULL) {
        continue;
      }

      SDownstreamStatusInfo* p = NULL;
      findCheckRspStatus(pInfo, *pTaskId, &p);
      if (p != NULL) {
        int32_t code = streamTaskAddIntoNodeUpdateList(pTask, p->vgId);
        if (code) {
          stError("s-task:%s vgId:%d failed to add downstream task:0x%x (vgId:%d) into nodeUpdate list, code:%s", id,
                  vgId, p->taskId, p->vgId, tstrerror(code));
        } else {
          stDebug("s-task:%s vgId:%d downstream task:0x%x (vgId:%d) timeout more than 600sec, add into nodeUpdate list",
                  id, vgId, p->taskId, p->vgId);
        }
      }
    }

    stDebug("s-task:%s vgId:%d %d downstream task(s) all add into nodeUpate list", id, vgId, numOfTimeout);
  } else {
    stDebug("s-task:%s vgId:%d %d downstream task(s) timeout, send check msg again, retry:%d start time:%" PRId64, id,
            vgId, numOfTimeout, pInfo->timeoutRetryCount, pInfo->timeoutStartTs);
  }
}
×
740

741
/*
 * Reset the response record of every downstream task listed in pNotReadyList and send the check
 * message to it again. pInfo->notReadyRetryCount is increased once per call.
 *
 * pTask          task that owns the check info
 * pNotReadyList  list of int32_t downstream task ids that responded "not ready"
 */
void handleNotReadyDownstreamTask(SStreamTask* pTask, SArray* pNotReadyList) {
  STaskCheckInfo* pInfo = &pTask->taskCheckInfo;
  const char*     id = pTask->id.idStr;
  int32_t         vgId = pTask->pMeta->vgId;
  int32_t         numOfNotReady = taosArrayGetSize(pNotReadyList);

  // reset the info, and send the check msg to failure downstream again
  for (int32_t i = 0; i < numOfNotReady; ++i) {
    int32_t* pTaskId = taosArrayGet(pNotReadyList, i);
    if (pTaskId == NULL) {
      continue;
    }

    SDownstreamStatusInfo* p = NULL;
    findCheckRspStatus(pInfo, *pTaskId, &p);
    if (p != NULL) {
      // back to the "no response yet" state before asking again
      p->rspTs = 0;
      p->status = -1;

      int32_t code = doSendCheckMsg(pTask, p);
      if (code) {  // keep going, the remaining downstream tasks still need their check msg
        stError("s-task:%s vgId:%d failed to re-send check msg to not-ready downstream task:0x%x, code:%s", id, vgId,
                p->taskId, tstrerror(code));
      }
    }
  }

  pInfo->notReadyRetryCount += 1;
  stDebug("s-task:%s vgId:%d %d downstream task(s) not ready, send check msg again, retry:%d start time:%" PRId64, id,
          vgId, numOfNotReady, pInfo->notReadyRetryCount, pInfo->startTs);
}
1,480✔
767

768
// the action of add status may incur the restart procedure, which should NEVER be executed in the timer thread.
769
// The restart of all tasks requires that all tasks should not have active timer for now. Therefore, the execution
770
// of restart in timer thread will result in a deadlock.
771
int32_t addDownstreamFailedStatusResultAsync(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int32_t taskId) {
×
772
  return streamTaskSchedTask(pMsgCb, vgId, streamId, taskId, STREAM_EXEC_T_ADD_FAILED_TASK, false);
×
773
}
774

775
// Common exit path of rspMonitorFn: release the task reference taken from the meta, destroy the two
// temporary task-id lists and pass `param` to streamTaskFreeRefId(). Callers in this file pass lists
// that may still be NULL, and pass NULL as `param` when it must not be freed here.
static void doCleanup(SStreamTask* pTask, SArray* pNotReadyList, SArray* pTimeoutList, void* param) {
  streamMetaReleaseTask(pTask->pMeta, pTask);

  taosArrayDestroy(pTimeoutList);
  taosArrayDestroy(pNotReadyList);

  streamTaskFreeRefId(param);
}
7,498✔
782

783
// this function is executed in timer thread
784
void rspMonitorFn(void* param, void* tmrId) {
7,505✔
785
  int32_t         numOfReady = 0;
7,505✔
786
  int32_t         numOfFault = 0;
7,505✔
787
  int32_t         numOfNotRsp = 0;
7,505✔
788
  int32_t         numOfNotReady = 0;
7,505✔
789
  int32_t         numOfTimeout = 0;
7,505✔
790
  int64_t         taskRefId = *(int64_t*)param;
7,505✔
791
  int64_t         now = taosGetTimestampMs();
7,505✔
792
  SArray*         pNotReadyList = NULL;
7,505✔
793
  SArray*         pTimeoutList = NULL;
7,505✔
794
  SStreamMeta*    pMeta = NULL;
7,505✔
795
  STaskCheckInfo* pInfo = NULL;
7,505✔
796
  int32_t         vgId = -1;
7,505✔
797
  int64_t         timeoutDuration = 0;
7,505✔
798
  const char*     id = NULL;
7,505✔
799
  int32_t         total = 0;
7,505✔
800

801
  SStreamTask* pTask = taosAcquireRef(streamTaskRefPool, taskRefId);
7,505✔
802
  if (pTask == NULL) {
7,505✔
803
    stError("invalid task rid:%" PRId64 " failed to acquired stream-task at %s", taskRefId, __func__);
7!
804
    streamTaskFreeRefId(param);
7✔
805
    return;
5,876✔
806
  }
807

808
  pMeta = pTask->pMeta;
7,498✔
809
  pInfo = &pTask->taskCheckInfo;
7,498✔
810
  vgId = pTask->pMeta->vgId;
7,498✔
811
  timeoutDuration = now - pInfo->timeoutStartTs;
7,498✔
812
  id = pTask->id.idStr;
7,498✔
813
  total = (int32_t) taosArrayGetSize(pInfo->pList);
7,498✔
814

815
  stDebug("s-task:%s start to do check-downstream-rsp check in tmr", id);
7,498✔
816

817
  streamMutexLock(&pTask->lock);
7,498✔
818
  SStreamTaskState state = streamTaskGetStatus(pTask);
7,498✔
819
  streamMutexUnlock(&pTask->lock);
7,498✔
820

821
  if (state.state == TASK_STATUS__STOP) {
7,498!
822
    stDebug("s-task:%s status:%s vgId:%d quit from monitor check-rsp tmr", id, state.name, vgId);
×
823
    streamTaskCompleteCheckRsp(pInfo, true, id);
×
824

825
    // not record the failure of the current task if try to close current vnode
826
    // otherwise, the put of message operation may incur invalid read of message queue.
827
    if (!pMeta->closeFlag) {
×
828
      int32_t code = addDownstreamFailedStatusResultAsync(pTask->pMsgCb, vgId, pTask->id.streamId, pTask->id.taskId);
×
829
      if (code) {
×
830
        stError("s-task:%s failed to create async record start failed task, code:%s", id, tstrerror(code));
×
831
      }
832
    }
833

834
    doCleanup(pTask, pNotReadyList, pTimeoutList, param);
×
835
    return;
×
836
  }
837

838
  if (state.state == TASK_STATUS__DROPPING || state.state == TASK_STATUS__READY) {
7,498!
839
    stDebug("s-task:%s status:%s vgId:%d quit from monitor check-rsp tmr", id, state.name, vgId);
3,915✔
840

841
    streamTaskCompleteCheckRsp(pInfo, true, id);
3,915✔
842
    doCleanup(pTask, pNotReadyList, pTimeoutList, param);
3,915✔
843
    return;
3,915✔
844
  }
845

846
  streamMutexLock(&pInfo->checkInfoLock);
3,583✔
847
  if (pInfo->notReadyTasks == 0) {
3,583✔
848
    stDebug("s-task:%s status:%s vgId:%d all downstream ready, quit from monitor rsp tmr", id, state.name, vgId);
1,949✔
849

850
    streamTaskCompleteCheckRsp(pInfo, false, id);
1,949✔
851
    streamMutexUnlock(&pInfo->checkInfoLock);
1,949✔
852
    doCleanup(pTask, pNotReadyList, pTimeoutList, param);
1,949✔
853
    return;
1,949✔
854
  }
855

856
  pNotReadyList = taosArrayInit(4, sizeof(int64_t));
1,634✔
857
  pTimeoutList = taosArrayInit(4, sizeof(int64_t));
1,634✔
858

859
  if (state.state == TASK_STATUS__UNINIT) {
1,634!
860
    getCheckRspStatus(pInfo, timeoutDuration, &numOfReady, &numOfFault, &numOfNotRsp, pTimeoutList, pNotReadyList, id);
1,634✔
861

862
    numOfNotReady = (int32_t)taosArrayGetSize(pNotReadyList);
1,634✔
863
    numOfTimeout = (int32_t)taosArrayGetSize(pTimeoutList);
1,634✔
864

865
    // fault tasks detected, not try anymore
866
    bool jumpOut = false;
1,634✔
867
    if ((numOfReady + numOfFault + numOfNotReady + numOfTimeout + numOfNotRsp) != total) {
1,634!
868
      stError(
×
869
          "s-task:%s vgId:%d internal error in handling the check downstream procedure, rsp number is inconsistent, "
870
          "stop rspMonitor tmr, total:%d, notRsp:%d, notReady:%d, fault:%d, timeout:%d, ready:%d",
871
          id, vgId, total, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady);
872
      jumpOut = true;
×
873
    }
874

875
    if (numOfFault > 0) {
1,634✔
876
      stDebug(
5!
877
          "s-task:%s status:%s vgId:%d all rsp. quit from monitor rsp tmr, since vnode-transfer/leader-change/restart "
878
          "detected, total:%d, notRsp:%d, notReady:%d, fault:%d, timeout:%d, ready:%d",
879
          id, state.name, vgId, total, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady);
880
      jumpOut = true;
5✔
881
    }
882

883
    if (jumpOut) {
1,634✔
884
      streamTaskCompleteCheckRsp(pInfo, false, id);
5✔
885
      streamMutexUnlock(&pInfo->checkInfoLock);
5✔
886
      doCleanup(pTask, pNotReadyList, pTimeoutList, param);
5✔
887
      return;
5✔
888
    }
889
  } else {  // unexpected status
890
    stError("s-task:%s unexpected task status:%s during waiting for check rsp", id, state.name);
×
891
  }
892

893
  // checking of downstream tasks has been stopped by other threads
894
  if (pInfo->stopCheckProcess == 1) {
1,629!
895
    stDebug(
×
896
        "s-task:%s status:%s vgId:%d stopped by other threads to check downstream process, total:%d, notRsp:%d, "
897
        "notReady:%d, fault:%d, timeout:%d, ready:%d",
898
        id, state.name, vgId, total, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady);
899

900
    streamTaskCompleteCheckRsp(pInfo, false, id);
×
901
    streamMutexUnlock(&pInfo->checkInfoLock);
×
902

903
    int32_t code = addDownstreamFailedStatusResultAsync(pTask->pMsgCb, vgId, pTask->id.streamId, pTask->id.taskId);
×
904
    if (code) {
×
905
      stError("s-task:%s failed to create async record start failed task, code:%s", id, tstrerror(code));
×
906
    }
907

908
    doCleanup(pTask, pNotReadyList, pTimeoutList, param);
×
909
    return;
×
910
  }
911

912
  if (numOfNotReady > 0) {  // check to make sure not in recheck timer
1,629✔
913
    handleNotReadyDownstreamTask(pTask, pNotReadyList);
1,480✔
914
  }
915

916
  if (numOfTimeout > 0) {
1,629!
UNCOV
917
    handleTimeoutDownstreamTasks(pTask, pTimeoutList);
×
918
  }
919

920
  streamTmrStart(rspMonitorFn, CHECK_RSP_CHECK_INTERVAL, param, streamTimer, &pInfo->checkRspTmr, vgId,
1,629✔
921
                 "check-status-monitor");
922
  streamMutexUnlock(&pInfo->checkInfoLock);
1,629✔
923

924
  stDebug(
1,629✔
925
      "s-task:%s vgId:%d continue checking rsp in 300ms, total:%d, notRsp:%d, notReady:%d, fault:%d, timeout:%d, "
926
      "ready:%d",
927
      id, vgId, total, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady);
928
  doCleanup(pTask, pNotReadyList, pTimeoutList, NULL);
1,629✔
929
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc