• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3661

17 Mar 2025 05:39AM UTC coverage: 62.007% (-0.03%) from 62.039%
#3661

push

travis-ci

web-flow
tests: add tdb ut (#30093)

* fix: compile warnings

* tests: add tdb ut

* test(tdb): fix return code

* test: recover ut

* fix: minor changes

* fix: enable test

* fix: ut errors

---------

Co-authored-by: Minglei Jin <mljin@taosdata.com>

153829 of 317582 branches covered (48.44%)

Branch coverage included in aggregate %.

240310 of 318051 relevant lines covered (75.56%)

19602636.8 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.29
/source/libs/stream/src/streamCheckStatus.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "cos.h"
17
#include "rsync.h"
18
#include "streamBackendRocksdb.h"
19
#include "streamInt.h"
20

21
// Elapsed time (ms) after which a downstream task that has not responded yet is put on the timeout list.
// The expansion is parenthesized so it remains a single operand inside larger expressions
// (e.g. `x / CHECK_NOT_RSP_DURATION`).
#define CHECK_NOT_RSP_DURATION (60 * 1000)  // 60 sec
22

23
static void    processDownstreamReadyRsp(SStreamTask* pTask);
24
static void    rspMonitorFn(void* param, void* tmrId);
25
static void    streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInfo* pOutputInfo, int64_t startTs);
26
static int32_t streamTaskStartCheckDownstream(STaskCheckInfo* pInfo, const char* id);
27
static void    streamTaskCompleteCheckRsp(STaskCheckInfo* pInfo, bool lock, const char* id);
28
static void    streamTaskAddReqInfo(STaskCheckInfo* pInfo, int64_t reqId, int32_t taskId, int32_t vgId, const char* id);
29
static int32_t doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p);
30
static void    handleTimeoutDownstreamTasks(SStreamTask* pTask, SArray* pTimeoutList);
31
static void    handleNotReadyDownstreamTask(SStreamTask* pTask, SArray* pNotReadyList);
32
static int32_t streamTaskUpdateCheckInfo(STaskCheckInfo* pInfo, int32_t taskId, int32_t status, int64_t rspTs,
33
                                         int64_t reqId, int32_t* pNotReady, const char* id);
34
static void setCheckDownstreamReqInfo(SStreamTaskCheckReq* pReq, int64_t reqId, int32_t dstTaskId, int32_t dstNodeId);
35
static void getCheckRspStatus(STaskCheckInfo* pInfo, int64_t el, int32_t* numOfReady, int32_t* numOfFault,
36
                              int32_t* numOfNotRsp, SArray* pTimeoutList, SArray* pNotReadyList, const char* id);
37
static int32_t addDownstreamFailedStatusResultAsync(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int32_t taskId);
38
static void    findCheckRspStatus(STaskCheckInfo* pInfo, int32_t taskId, SDownstreamStatusInfo** pStatusInfo);
39

40
/**
 * Evaluate a check request from an upstream task against the stage recorded for that upstream.
 *
 * @param pTask           task that received the check request
 * @param upstreamTaskId  id of the requesting upstream task
 * @param vgId            vgroup id of the upstream task (only used for logging in this function)
 * @param stage           stage value carried by the request
 * @param oldStage        [out] stage recorded for the upstream before this call may update it
 * @return TSDB_CODE_STREAM_TASK_NOT_EXIST if no upstream record is found, 0 if the request carries stage -1,
 *         otherwise one of TASK_UPSTREAM_NEW_STAGE, TASK_DOWNSTREAM_NOT_READY, TASK_DOWNSTREAM_READY.
 */
int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_t vgId, int64_t stage,
                              int64_t* oldStage) {
  SStreamUpstreamEpInfo* pUpstream = NULL;
  streamTaskGetUpstreamTaskEpInfo(pTask, upstreamTaskId, &pUpstream);
  if (pUpstream == NULL) {
    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  const char* idStr = pTask->id.idStr;
  *oldStage = pUpstream->stage;

  // a request without a valid stage is answered before touching the recorded stage
  if (stage == -1) {
    stDebug("s-task:%s receive check msg from upstream task:0x%x(vgId:%d), invalid stageId:%" PRId64 ", not ready",
            idStr, upstreamTaskId, vgId, stage);
    return 0;
  }

  // no stage recorded yet for this upstream: adopt the one carried by the request
  if (pUpstream->stage == -1) {
    pUpstream->stage = stage;
    stDebug("s-task:%s receive check msg from upstream task:0x%x(vgId:%d) first time, init stage value:%" PRId64,
            idStr, upstreamTaskId, vgId, stage);
  }

  // the upstream reports a newer stage than the recorded one: flag the checkpoint as failed
  if (pUpstream->stage < stage) {
    stError("s-task:%s receive check msg from upstream task:0x%x(vgId:%d), new stage received:%" PRId64
            ", prev:%" PRId64,
            idStr, upstreamTaskId, vgId, stage, pUpstream->stage);
    streamTaskSetCheckpointFailed(pTask);
  }

  if (pUpstream->stage != stage) {
    return TASK_UPSTREAM_NEW_STAGE;
  }

  if (pTask->status.downstreamReady == 1) {
    return TASK_DOWNSTREAM_READY;
  }

  stDebug("s-task:%s vgId:%d leader:%d, downstream not ready", idStr, vgId, (pTask->pMeta->role == NODE_ROLE_LEADER));
  return TASK_DOWNSTREAM_NOT_READY;
}
79

80
// check status
81
/**
 * Send a check request to every downstream task of pTask, according to its output type.
 *
 * For fixed, shuffle and vtable-map dispatch the check-rsp monitor is started first, then one request
 * (with a freshly generated request id) is registered and sent per downstream task. For any other
 * output type the task has no downstream, so it is marked ready directly.
 *
 * A failed send does not abort the remaining sends. Each failure in the multi-target branches is logged
 * when it happens and the most recent failure code is kept, so a later successful send can no longer
 * hide an earlier failure.
 *
 * @param pTask  the task whose downstream tasks are checked
 */
void streamTaskSendCheckMsg(SStreamTask* pTask) {
  SDataRange*  pRange = &pTask->dataRange;
  STimeWindow* pWindow = &pRange->window;
  const char*  idstr = pTask->id.idStr;
  int32_t      code = 0;

  SStreamTaskCheckReq req = {
      .streamId = pTask->id.streamId,
      .upstreamTaskId = pTask->id.taskId,
      .upstreamNodeId = pTask->info.nodeId,
      .childId = pTask->info.selfChildId,
      .stage = pTask->pMeta->stage,
  };

  if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
    streamTaskStartMonitorCheckRsp(pTask);

    STaskDispatcherFixed* pDispatch = &pTask->outputInfo.fixedDispatcher;

    setCheckDownstreamReqInfo(&req, tGenIdPI64(), pDispatch->taskId, pDispatch->nodeId);
    streamTaskAddReqInfo(&pTask->taskCheckInfo, req.reqId, pDispatch->taskId, pDispatch->nodeId, idstr);

    stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " check single downstream task:0x%x(vgId:%d) ver:%" PRId64 "-%" PRId64
            " window:%" PRId64 "-%" PRId64 " QID:0x%" PRIx64,
            idstr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, pRange->range.minVer,
            pRange->range.maxVer, pWindow->skey, pWindow->ekey, req.reqId);

    code = streamSendCheckMsg(pTask, &req, pDispatch->nodeId, &pDispatch->epSet);

  } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
    streamTaskStartMonitorCheckRsp(pTask);

    SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;

    int32_t numOfVgs = taosArrayGetSize(vgInfo);
    stDebug("s-task:%s check %d downstream tasks, ver:%" PRId64 "-%" PRId64 " window:%" PRId64 "-%" PRId64, idstr,
            numOfVgs, pRange->range.minVer, pRange->range.maxVer, pWindow->skey, pWindow->ekey);

    for (int32_t i = 0; i < numOfVgs; i++) {
      SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
      if (pVgInfo == NULL) {
        continue;
      }

      setCheckDownstreamReqInfo(&req, tGenIdPI64(), pVgInfo->taskId, pVgInfo->vgId);
      streamTaskAddReqInfo(&pTask->taskCheckInfo, req.reqId, pVgInfo->taskId, pVgInfo->vgId, idstr);

      stDebug("s-task:%s (vgId:%d) stage:%" PRId64
              " check downstream task:0x%x (vgId:%d) (shuffle), idx:%d, QID:0x%" PRIx64,
              idstr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i, req.reqId);

      // use a per-send result so that a later success does not reset an earlier failure
      int32_t ret = streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
      if (ret != TSDB_CODE_SUCCESS) {
        stError("s-task:%s failed to send check msg to downstream task:0x%x (vgId:%d), code:%s", idstr,
                req.downstreamTaskId, req.downstreamNodeId, tstrerror(ret));
        code = ret;
      }
    }
  } else if (pTask->outputInfo.type == TASK_OUTPUT__VTABLE_MAP) {
    streamTaskStartMonitorCheckRsp(pTask);

    SArray* pTaskInfos = pTask->outputInfo.vtableMapDispatcher.taskInfos;
    int32_t numTasks = taosArrayGetSize(pTaskInfos);
    stDebug("s-task:%s check %d vtable downstream tasks, ver:%" PRId64 "-%" PRId64 " window:%" PRId64 "-%" PRId64,
            idstr, numTasks, pRange->range.minVer, pRange->range.maxVer, pWindow->skey, pWindow->ekey);

    for (int32_t i = 0; i < numTasks; ++i) {
      STaskDispatcherFixed* pAddr = taosArrayGet(pTaskInfos, i);
      if (pAddr == NULL) {
        continue;
      }

      setCheckDownstreamReqInfo(&req, tGenIdPI64(), pAddr->taskId, pAddr->nodeId);
      streamTaskAddReqInfo(&pTask->taskCheckInfo, req.reqId, pAddr->taskId, pAddr->nodeId, idstr);

      stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " check vtable downstream task:0x%x (vgId:%d), QID:0x%" PRIx64,
              idstr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, req.reqId);

      // same rule as the shuffle branch: remember the failure instead of letting the next send overwrite it
      int32_t ret = streamSendCheckMsg(pTask, &req, pAddr->nodeId, &pAddr->epSet);
      if (ret != TSDB_CODE_SUCCESS) {
        stError("s-task:%s failed to send check msg to vtable downstream task:0x%x (vgId:%d), code:%s", idstr,
                req.downstreamTaskId, req.downstreamNodeId, tstrerror(ret));
        code = ret;
      }
    }
  } else {  // for sink task, set it ready directly.
    stDebug("s-task:%s (vgId:%d) set downstream ready, since no downstream", idstr, pTask->info.nodeId);
    streamTaskStopMonitorCheckRsp(&pTask->taskCheckInfo, idstr);
    processDownstreamReadyRsp(pTask);
  }

  if (code) {
    stError("s-task:%s failed to send check msg to downstream, code:%s", idstr, tstrerror(code));
  }
}
14,207✔
170

171
/**
 * Build the response for a check request received from an upstream task.
 *
 * The response echoes the ids of the request. Its status is
 *  - TASK_DOWNSTREAM_NOT_LEADER when this node's role is follower,
 *  - TASK_DOWNSTREAM_NOT_READY when the target task cannot be acquired from pMeta,
 *  - otherwise whatever streamTaskCheckStatus() reports for the target task.
 *
 * @param pMeta  stream meta of the receiving node
 * @param pReq   the check request
 * @param pRsp   [out] response, fully overwritten
 */
void streamTaskProcessCheckMsg(SStreamMeta* pMeta, SStreamTaskCheckReq* pReq, SStreamTaskCheckRsp* pRsp) {
  int32_t taskId = pReq->downstreamTaskId;

  *pRsp = (SStreamTaskCheckRsp){
      .reqId = pReq->reqId,
      .streamId = pReq->streamId,
      .childId = pReq->childId,
      .downstreamNodeId = pReq->downstreamNodeId,
      .downstreamTaskId = pReq->downstreamTaskId,
      .upstreamNodeId = pReq->upstreamNodeId,
      .upstreamTaskId = pReq->upstreamTaskId,
  };

  // only the leader node handles the check request
  if (pMeta->role == NODE_ROLE_FOLLOWER) {
    stError(
        "s-task:0x%x invalid check msg from upstream:0x%x(vgId:%d), vgId:%d is follower, not handle check status msg",
        taskId, pReq->upstreamTaskId, pReq->upstreamNodeId, pMeta->vgId);
    pRsp->status = TASK_DOWNSTREAM_NOT_LEADER;
    return;
  }

  SStreamTask* pTask = NULL;
  int32_t      code = streamMetaAcquireTask(pMeta, pReq->streamId, taskId, &pTask);
  if (pTask == NULL) {
    pRsp->status = TASK_DOWNSTREAM_NOT_READY;
    stDebug("tq recv task check(taskId:0x%" PRIx64 "-0x%x not built yet) req(QID:0x%" PRIx64
            ") from task:0x%x (vgId:%d), rsp check_status %d",
            pReq->streamId, taskId, pRsp->reqId, pRsp->upstreamTaskId, pRsp->upstreamNodeId, pRsp->status);
    return;
  }

  pRsp->status =
      streamTaskCheckStatus(pTask, pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->stage, &pRsp->oldStage);

  SStreamTaskState pState = streamTaskGetStatus(pTask);
  stDebug("s-task:%s status:%s, stage:%" PRId64 " recv task check req(QID:0x%" PRIx64
          ") task:0x%x (vgId:%d), check_status:%d",
          pTask->id.idStr, pState.name, pRsp->oldStage, pRsp->reqId, pRsp->upstreamTaskId, pRsp->upstreamNodeId,
          pRsp->status);

  // release the reference taken by streamMetaAcquireTask()
  streamMetaReleaseTask(pMeta, pTask);
}
22,709✔
211

212
/**
 * Handle the check response sent back by one downstream task.
 *
 * The response is first recorded through streamTaskUpdateCheckInfo(); a response that cannot be
 * recorded (unknown downstream task or stale request id) is dropped and success is still returned.
 * Afterwards:
 *  - ready and no downstream left not-ready: the task is marked ready and the monitor is asked to stop;
 *  - TASK_UPSTREAM_NEW_STAGE / TASK_DOWNSTREAM_NOT_LEADER: the affected node is put on the node-update
 *    list and the task is registered as failed;
 *  - anything else: only logged here. NOTE(review): per the original comments the check-rsp monitor
 *    performs the retry; that code is not part of this function.
 *
 * @param pTask  the upstream task that issued the check request
 * @param pRsp   the received response
 * @return TSDB_CODE_INVALID_MSG when the response is addressed to another task, otherwise success (0).
 */
int32_t streamTaskProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) {
  int64_t         now = taosGetTimestampMs();
  const char*     id = pTask->id.idStr;
  STaskCheckInfo* pInfo = &pTask->taskCheckInfo;
  int32_t         total = streamTaskGetNumOfDownstream(pTask);
  int32_t         left = -1;

  if (streamTaskShouldStop(pTask)) {
    stDebug("s-task:%s should stop, do not do check downstream again", id);
    return TSDB_CODE_SUCCESS;
  }

  if (pTask->id.taskId != pRsp->upstreamTaskId) {
    stError("s-task:%s invalid check downstream rsp, upstream task:0x%x discard", id, pRsp->upstreamTaskId);
    return TSDB_CODE_INVALID_MSG;
  }

  // every response, whatever its status, is recorded before being interpreted
  int32_t code = streamTaskUpdateCheckInfo(pInfo, pRsp->downstreamTaskId, pRsp->status, now, pRsp->reqId, &left, id);
  if (code != TSDB_CODE_SUCCESS) {
    return TSDB_CODE_SUCCESS;  // return success in any cases.
  }

  if (pRsp->status == TASK_DOWNSTREAM_READY && left == 0) {
    // all downstream tasks are ready, set the complete check downstream flag
    processDownstreamReadyRsp(pTask);
    streamTaskStopMonitorCheckRsp(pInfo, id);
  } else if (pRsp->status == TASK_UPSTREAM_NEW_STAGE) {
    stError("s-task:%s vgId:%d self vnode-transfer/leader-change/restart detected, old stage:%" PRId64
            ", current stage:%" PRId64 ", not check wait for downstream task nodeUpdate, and all tasks restart",
            id, pRsp->upstreamNodeId, pRsp->oldStage, pTask->pMeta->stage);
    code = streamTaskAddIntoNodeUpdateList(pTask, pRsp->upstreamNodeId);
    streamMetaAddFailedTaskSelf(pTask, now);
  } else if (pRsp->status == TASK_DOWNSTREAM_NOT_LEADER) {
    stError(
        "s-task:%s downstream taskId:0x%x (vgId:%d) not leader, self dispatch epset needs to be updated, not check "
        "downstream again, nodeUpdate needed",
        id, pRsp->downstreamTaskId, pRsp->downstreamNodeId);
    code = streamTaskAddIntoNodeUpdateList(pTask, pRsp->downstreamNodeId);
    streamMetaAddFailedTaskSelf(pTask, now);
  } else {
    // either ready with other downstream tasks still pending, or not ready yet
    stDebug("s-task:%s (vgId:%d) recv check rsp from task:0x%x (vgId:%d) status:%d, total:%d not ready:%d", id,
            pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status, total, left);
  }

  return 0;
}
271

272
int32_t streamTaskSendCheckRsp(const SStreamMeta* pMeta, int32_t vgId, SStreamTaskCheckRsp* pRsp,
22,709✔
273
                               SRpcHandleInfo* pRpcInfo, int32_t taskId) {
274
  SEncoder encoder;
275
  int32_t  code = 0;
22,709✔
276
  int32_t  len;
277

278
  tEncodeSize(tEncodeStreamTaskCheckRsp, pRsp, len, code);
22,709!
279
  if (code < 0) {
22,709!
280
    stError("vgId:%d failed to encode task check rsp, s-task:0x%x", pMeta->vgId, taskId);
×
281
    return TSDB_CODE_INVALID_MSG;
×
282
  }
283

284
  void* buf = rpcMallocCont(sizeof(SMsgHead) + len);
22,709✔
285
  if (buf == NULL) {
22,709!
286
    stError("s-task:0x%x vgId:%d failed prepare msg, %s at line:%d code:%s", taskId, pMeta->vgId, __func__, __LINE__,
×
287
            tstrerror(code));
288
    return terrno;
×
289
  }
290

291
  ((SMsgHead*)buf)->vgId = htonl(vgId);
22,709✔
292

293
  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
22,709✔
294
  tEncoderInit(&encoder, (uint8_t*)abuf, len);
22,709✔
295
  code = tEncodeStreamTaskCheckRsp(&encoder, pRsp);
22,709✔
296
  tEncoderClear(&encoder);
22,709✔
297

298
  SRpcMsg rspMsg = {.code = 0, .pCont = buf, .contLen = sizeof(SMsgHead) + len, .info = *pRpcInfo};
22,709✔
299
  tmsgSendRsp(&rspMsg);
22,709✔
300

301
  code = TMIN(code, 0);
22,709✔
302
  return code;
22,709✔
303
}
304

305
/**
 * Enter the check-downstream procedure for pTask and start the timer that monitors the responses.
 *
 * Runs entirely under the task's check-info lock. Nothing is started when the task state is
 * TASK_STATUS__DROPPING or when a check procedure is already in progress.
 *
 * NOTE(review): when streamTaskAllocRefId() fails, no timer is started but the in-check flag set by
 * streamTaskStartCheckDownstream() is left as is - confirm that this is intended.
 *
 * @param pTask  the task that is about to check its downstream tasks
 */
void streamTaskStartMonitorCheckRsp(SStreamTask* pTask) {
  STaskCheckInfo* pCheckInfo = &pTask->taskCheckInfo;
  int32_t         vgId = pTask->pMeta->vgId;

  streamMutexLock(&pCheckInfo->checkInfoLock);

  ETaskStatus status = streamTaskGetStatus(pTask).state;
  if (status == TASK_STATUS__DROPPING) {
    // drop procedure already started, not start check downstream now
    stDebug("s-task:%s task not in uninit status, status:%s not start monitor check-rsp", pTask->id.idStr,
            streamTaskGetStatusStr(status));
  } else if (streamTaskStartCheckDownstream(pCheckInfo, pTask->id.idStr) == TSDB_CODE_SUCCESS) {
    streamTaskInitTaskCheckInfo(pCheckInfo, &pTask->outputInfo, taosGetTimestampMs());

    int64_t* pTaskRefId = NULL;
    if (streamTaskAllocRefId(pTask, &pTaskRefId) == 0) {
      streamTmrStart(rspMonitorFn, CHECK_RSP_CHECK_INTERVAL, pTaskRefId, streamTimer, &pCheckInfo->checkRspTmr, vgId,
                     "check-status-monitor");
    }
  }

  streamMutexUnlock(&pCheckInfo->checkInfoLock);
}
337

338
/**
 * Raise the stop flag of the check-rsp monitor.
 *
 * Only the flag is set here (under the check-info lock); the timer itself is not touched.
 *
 * @param pInfo  check info of the task
 * @param id     task id string, used for logging
 */
void streamTaskStopMonitorCheckRsp(STaskCheckInfo* pInfo, const char* id) {
  streamMutexLock(&pInfo->checkInfoLock);
  pInfo->stopCheckProcess = 1;
  streamMutexUnlock(&pInfo->checkInfoLock);

  stDebug("s-task:%s set stop check-rsp monitor flag", id);
}
14,191✔
345

346
/**
 * Release the resources held by a task's check info: the request record list and,
 * if one was created, the check-rsp timer. Both fields are reset to NULL.
 *
 * @param pInfo  check info to clean up
 */
void streamTaskCleanupCheckInfo(STaskCheckInfo* pInfo) {
  taosArrayDestroy(pInfo->pList);
  pInfo->pList = NULL;

  if (pInfo->checkRspTmr == NULL) {
    return;  // no timer was ever started
  }

  streamTmrStop(pInfo->checkRspTmr);
  pInfo->checkRspTmr = NULL;
}
62,582✔
355

356
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
357
/**
 * Finish the check-downstream procedure once every downstream task is ready (or there is none).
 *
 * Steps, all best-effort (failures are logged, never propagated):
 *  1. report the init event (TASK_EVENT_INIT, or TASK_EVENT_INIT_SCANHIST for a history task) as succeeded;
 *  2. record the launch result in the stream meta;
 *  3. if the task status is TASK_STATUS__HALT, send the halt event to the state machine;
 *  4. if the task has a related fill-history task, launch it.
 *
 * @param pTask  the task whose downstream tasks are all ready
 */
void processDownstreamReadyRsp(SStreamTask* pTask) {
  EStreamTaskEvent event;
  if (pTask->info.fillHistory != STREAM_HISTORY_TASK) {
    event = TASK_EVENT_INIT;
  } else {
    event = TASK_EVENT_INIT_SCANHIST;
  }

  int32_t code = streamTaskOnHandleEventSuccess(pTask->status.pSM, event, NULL, NULL);
  if (code) {
    stError("s-task:%s failed to set event succ, code:%s", pTask->id.idStr, tstrerror(code));
  }

  // the timestamps are read only after the event has been handled
  code = streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, pTask->execInfo.checkTs,
                                       pTask->execInfo.readyTs, true);
  if (code) {
    stError("s-task:%s failed to record the downstream task status, code:%s", pTask->id.idStr, tstrerror(code));
  }

  if (pTask->status.taskStatus == TASK_STATUS__HALT) {
    // a halt status is only expected on a non-history task that owns a related fill-history task
    if (!(HAS_RELATED_FILLHISTORY_TASK(pTask) && (pTask->info.fillHistory == 0))) {
      stError("s-task:%s status:halt fillhistory:%d not handle the ready rsp", pTask->id.idStr,
              pTask->info.fillHistory);
    }

    // halt itself for count window stream task until the related fill history task completed.
    stDebug("s-task:%s level:%d initial status is %s from mnode, set it to be halt", pTask->id.idStr,
            pTask->info.taskLevel, streamTaskGetStatusStr(pTask->status.taskStatus));
    code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_HALT);
    if (code != 0) {  // todo: handle error
      stError("s-task:%s failed to handle halt event, code:%s", pTask->id.idStr, tstrerror(code));
    }
  }

  // start the related fill-history task, when current task is ready
  // not invoke in success callback due to the deadlock.
  // todo: let's retry
  if (!HAS_RELATED_FILLHISTORY_TASK(pTask)) {
    return;
  }

  stDebug("s-task:%s try to launch related task", pTask->id.idStr);
  code = streamLaunchFillHistoryTask(pTask);
  if (code) {
    stError("s-task:%s failed to launch related task, code:%s", pTask->id.idStr, tstrerror(code));
  }
}
14,142✔
397

398
/**
 * Put a node id on the task's list of nodes whose epset needs to be updated, unless it is already there.
 *
 * The list is inspected and modified under the task lock.
 *
 * @param pTask   task owning the node-update list
 * @param nodeId  id of the node to record
 * @return 0 if the node is on the list afterwards, terrno if appending the new entry failed.
 */
int32_t streamTaskAddIntoNodeUpdateList(SStreamTask* pTask, int32_t nodeId) {
  int32_t vgId = pTask->pMeta->vgId;
  int32_t code = 0;

  streamMutexLock(&pTask->lock);

  // look for an existing entry; pos == num afterwards means "not found"
  int32_t num = taosArrayGetSize(pTask->outputInfo.pNodeEpsetUpdateList);
  int32_t pos = 0;
  while (pos < num) {
    SDownstreamTaskEpset* pEntry = taosArrayGet(pTask->outputInfo.pNodeEpsetUpdateList, pos);
    if (pEntry != NULL && pEntry->nodeId == nodeId) {
      break;
    }
    pos += 1;
  }

  if (pos >= num) {
    SDownstreamTaskEpset t = {.nodeId = nodeId};

    if (taosArrayPush(pTask->outputInfo.pNodeEpsetUpdateList, &t) != NULL) {
      stInfo("s-task:%s vgId:%d downstream nodeId:%d needs to be updated, total needs updated:%d", pTask->id.idStr,
             vgId, t.nodeId, (num + 1));
    } else {
      code = terrno;
      stError("s-task:%s vgId:%d failed to update epset, code:%s", pTask->id.idStr, vgId, tstrerror(code));
    }
  }

  streamMutexUnlock(&pTask->lock);
  return code;
}
434

435
/**
 * Reset the check info at the start of a check-downstream round.
 *
 * The request record list is emptied, the number of not-ready downstream tasks is derived from the
 * output type (1 for fixed dispatch, the list size for shuffle / vtable-map dispatch, left unchanged
 * for any other type), both start timestamps are set to startTs and the stop flag is cleared.
 *
 * @param pInfo        check info to reset
 * @param pOutputInfo  output info of the task, describes its downstream tasks
 * @param startTs      start time of this round
 */
void streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInfo* pOutputInfo, int64_t startTs) {
  taosArrayClear(pInfo->pList);

  switch (pOutputInfo->type) {
    case TASK_OUTPUT__FIXED_DISPATCH:
      pInfo->notReadyTasks = 1;
      break;
    case TASK_OUTPUT__SHUFFLE_DISPATCH:
      pInfo->notReadyTasks = taosArrayGetSize(pOutputInfo->shuffleDispatcher.dbInfo.pVgroupInfos);
      break;
    case TASK_OUTPUT__VTABLE_MAP:
      pInfo->notReadyTasks = taosArrayGetSize(pOutputInfo->vtableMapDispatcher.taskInfos);
      break;
    default:
      break;
  }

  pInfo->stopCheckProcess = 0;
  pInfo->timeoutStartTs = startTs;
  pInfo->startTs = startTs;
}
7,149✔
450

451
/**
 * Look up the check-request record of a downstream task.
 *
 * The scan stops at the first match. streamTaskAddReqInfo() refuses to add a second record for a
 * task id that is already present, so the first match is the only one. The list size is read once,
 * since the list is not modified during the scan.
 *
 * @param pInfo        check info holding the record list
 * @param taskId       id of the downstream task to look for
 * @param pStatusInfo  [out] set to the matching record, or to NULL when there is none;
 *                     when this pointer itself is NULL the function does nothing
 */
void findCheckRspStatus(STaskCheckInfo* pInfo, int32_t taskId, SDownstreamStatusInfo** pStatusInfo) {
  if (pStatusInfo == NULL) {
    return;
  }

  *pStatusInfo = NULL;

  int32_t num = taosArrayGetSize(pInfo->pList);
  for (int32_t j = 0; j < num; ++j) {
    SDownstreamStatusInfo* p = taosArrayGet(pInfo->pList, j);
    if (p == NULL) {
      continue;
    }

    if (p->taskId == taskId) {
      *pStatusInfo = p;
      break;
    }
  }
}
468

469
/**
 * Record the response of one downstream task in the check info.
 *
 * The record of taskId is looked up under the check-info lock. A response whose request id does not
 * match the recorded one is treated as expired and rejected. When the record switches to
 * TASK_DOWNSTREAM_READY for the first time, the not-ready counter is decremented.
 *
 * @param pInfo      check info of the task
 * @param taskId     id of the responding downstream task
 * @param status     status carried by the response
 * @param rspTs      time at which the response was received
 * @param reqId      request id carried by the response
 * @param pNotReady  [out] number of downstream tasks still not ready; only written on success
 * @param id         task id string, used for logging
 * @return TSDB_CODE_SUCCESS if the response was recorded, TSDB_CODE_FAILED if there is no record for
 *         taskId or the request id does not match.
 */
int32_t streamTaskUpdateCheckInfo(STaskCheckInfo* pInfo, int32_t taskId, int32_t status, int64_t rspTs, int64_t reqId,
                                  int32_t* pNotReady, const char* id) {
  SDownstreamStatusInfo* p = NULL;

  streamMutexLock(&pInfo->checkInfoLock);
  findCheckRspStatus(pInfo, taskId, &p);
  if (p != NULL) {
    if (reqId != p->reqId) {
      // the task id and the QID label are now separated by a blank; they used to run together in the log
      stError("s-task:%s QID:0x%" PRIx64 " expected:0x%" PRIx64
              " expired check-rsp recv from downstream task:0x%x, discarded",
              id, reqId, p->reqId, taskId);
      streamMutexUnlock(&pInfo->checkInfoLock);
      return TSDB_CODE_FAILED;
    }

    // subtract one not-ready-task, since it is ready now
    if ((p->status != TASK_DOWNSTREAM_READY) && (status == TASK_DOWNSTREAM_READY)) {
      *pNotReady = atomic_sub_fetch_32(&pInfo->notReadyTasks, 1);
    } else {
      *pNotReady = pInfo->notReadyTasks;
    }

    p->status = status;
    p->rspTs = rspTs;

    streamMutexUnlock(&pInfo->checkInfoLock);
    return TSDB_CODE_SUCCESS;
  }

  streamMutexUnlock(&pInfo->checkInfoLock);
  // the hex request id is printed with the same 0x prefix as in the other messages of this file
  stError("s-task:%s unexpected check rsp msg, invalid downstream task:0x%x, QID:0x%" PRIx64 " discarded", id, taskId,
          reqId);
  return TSDB_CODE_FAILED;
}
503

504
/**
 * Mark the check-downstream procedure as started.
 *
 * NOTE(review): no lock is taken here; the visible caller holds the check-info lock.
 *
 * @param pInfo  check info of the task
 * @param id     task id string, used for logging
 * @return TSDB_CODE_SUCCESS if the in-check flag was set by this call, TSDB_CODE_FAILED if a check
 *         procedure is already running (its stop flag is cleared in that case).
 */
int32_t streamTaskStartCheckDownstream(STaskCheckInfo* pInfo, const char* id) {
  if (pInfo->inCheckProcess != 0) {
    stError("s-task:%s already in check procedure, checkTs:%" PRId64 ", start monitor check rsp failed", id,
            pInfo->startTs);
    pInfo->stopCheckProcess = 0;  // disable auto stop of check process
    return TSDB_CODE_FAILED;
  }

  pInfo->inCheckProcess = 1;
  stDebug("s-task:%s set the in check-rsp flag", id);
  return TSDB_CODE_SUCCESS;
}
517

518
/**
 * Leave the check-downstream procedure and reset all of its bookkeeping
 * (timestamps, counters, flags and the request record list).
 *
 * @param pInfo  check info of the task
 * @param lock   true if this function has to take the check-info lock itself,
 *               false if the caller already holds it
 * @param id     task id string, used for logging
 */
void streamTaskCompleteCheckRsp(STaskCheckInfo* pInfo, bool lock, const char* id) {
  if (lock) {
    streamMutexLock(&pInfo->checkInfoLock);
  }

  if (!pInfo->inCheckProcess) {
    stDebug("s-task:%s already not in check-rsp procedure", id);
  } else {
    // elapsed time is only meaningful when a start time was recorded
    int64_t el = 0;
    if (pInfo->startTs != 0) {
      el = taosGetTimestampMs() - pInfo->startTs;
    }
    stDebug("s-task:%s clear the in check-rsp flag, set the check-rsp done, elapsed time:%" PRId64 " ms", id, el);

    pInfo->startTs = 0;
    pInfo->timeoutStartTs = 0;

    pInfo->notReadyTasks = 0;
    pInfo->notReadyRetryCount = 0;
    pInfo->timeoutRetryCount = 0;

    pInfo->inCheckProcess = 0;
    pInfo->stopCheckProcess = 0;

    taosArrayClear(pInfo->pList);
  }

  if (lock) {
    streamMutexUnlock(&pInfo->checkInfoLock);
  }
}
6,767✔
545

546
// todo: retry until success
547
/**
 * Register a check request that is about to be sent to a downstream task.
 *
 * A record with status -1 and no response time is appended to the check info list, unless a record
 * for the same task id already exists. The list is accessed under the check-info lock.
 *
 * @param pInfo   check info of the sending task
 * @param reqId   id of the check request
 * @param taskId  id of the downstream task
 * @param vgId    vgroup id of the downstream task
 * @param id      id string of the sending task, used for logging
 */
void streamTaskAddReqInfo(STaskCheckInfo* pInfo, int64_t reqId, int32_t taskId, int32_t vgId, const char* id) {
  SDownstreamStatusInfo info = {.taskId = taskId, .status = -1, .vgId = vgId, .reqId = reqId, .rspTs = 0};
  streamMutexLock(&pInfo->checkInfoLock);

  SDownstreamStatusInfo* p = NULL;
  findCheckRspStatus(pInfo, taskId, &p);
  if (p != NULL) {
    stDebug("s-task:%s check info to task:0x%x already sent", id, taskId);
    streamMutexUnlock(&pInfo->checkInfoLock);
    return;
  }

  void* px = taosArrayPush(pInfo->pList, &info);
  if (px == NULL) {
    // todo: retry
    // Without the record, the response to this request cannot be matched later, so the failure is
    // at least made visible in the log instead of being dropped silently.
    stError("s-task:%s failed to record check req info for downstream task:0x%x (vgId:%d), code:%s", id, taskId,
            vgId, tstrerror(terrno));
  }

  streamMutexUnlock(&pInfo->checkInfoLock);
}
566

567
/**
 * Re-send the check request for a single downstream task.
 *
 * A new request id is generated and stored in the record first, so responses to the previous request
 * no longer match it. The destination is taken from the task's output info: the fixed dispatcher, or
 * the entry of the shuffle / vtable-map list whose task id equals p->taskId. If no entry matches,
 * nothing is sent and 0 is returned.
 *
 * @param pTask  the upstream task sending the request
 * @param p      record of the downstream task to check again; its reqId is overwritten
 * @return result of streamSendCheckMsg() for the request that was sent, 0 if none was sent.
 */
int32_t doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p) {
  const char*      id = pTask->id.idStr;
  STaskOutputInfo* pOutputInfo = &pTask->outputInfo;
  int32_t          code = 0;

  SStreamTaskCheckReq req = {
      .streamId = pTask->id.streamId,
      .upstreamTaskId = pTask->id.taskId,
      .upstreamNodeId = pTask->info.nodeId,
      .childId = pTask->info.selfChildId,
      .stage = pTask->pMeta->stage,
  };

  // update the reqId for the new check msg
  p->reqId = tGenIdPI64();

  switch (pOutputInfo->type) {
    case TASK_OUTPUT__FIXED_DISPATCH: {
      STaskDispatcherFixed* pFixed = &pOutputInfo->fixedDispatcher;
      setCheckDownstreamReqInfo(&req, p->reqId, pFixed->taskId, pFixed->nodeId);

      stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " re-send check downstream task:0x%x(vgId:%d) QID:0x%" PRIx64, id,
              pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, req.reqId);

      code = streamSendCheckMsg(pTask, &req, pFixed->nodeId, &pFixed->epSet);
      break;
    }

    case TASK_OUTPUT__SHUFFLE_DISPATCH: {
      SArray* vgInfo = pOutputInfo->shuffleDispatcher.dbInfo.pVgroupInfos;
      int32_t numOfVgs = taosArrayGetSize(vgInfo);

      for (int32_t i = 0; i < numOfVgs; i++) {
        SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
        if (pVgInfo == NULL || p->taskId != pVgInfo->taskId) {
          continue;
        }

        setCheckDownstreamReqInfo(&req, p->reqId, pVgInfo->taskId, pVgInfo->vgId);

        stDebug("s-task:%s (vgId:%d) stage:%" PRId64
                " re-send check downstream task:0x%x(vgId:%d) (shuffle), idx:%d QID:0x%" PRIx64,
                id, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i, p->reqId);
        code = streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
        break;  // only the matching entry is contacted
      }
      break;
    }

    case TASK_OUTPUT__VTABLE_MAP: {
      SArray* pTaskInfos = pOutputInfo->vtableMapDispatcher.taskInfos;
      int32_t numTasks = taosArrayGetSize(pTaskInfos);

      for (int32_t i = 0; i < numTasks; ++i) {
        STaskDispatcherFixed* pAddr = taosArrayGet(pTaskInfos, i);
        if (pAddr == NULL || p->taskId != pAddr->taskId) {
          continue;
        }

        setCheckDownstreamReqInfo(&req, p->reqId, pAddr->taskId, pAddr->nodeId);

        stDebug("s-task:%s (vgId:%d) stage:%" PRId64
                " re-send check vtable downstream task:0x%x(vgId:%d), QID:0x%" PRIx64,
                id, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, p->reqId);
        code = streamSendCheckMsg(pTask, &req, pAddr->nodeId, &pAddr->epSet);
        break;  // only the matching entry is contacted
      }
      break;
    }

    default:
      break;
  }

  if (code) {
    stError("s-task:%s failed to send check msg to downstream, code:%s", id, tstrerror(code));
  }
  return code;
}
638

639
/**
 * Classify every check-request record of a task by its current state.
 *
 *  - status TASK_DOWNSTREAM_READY                               -> *numOfReady is incremented
 *  - status TASK_UPSTREAM_NEW_STAGE / TASK_DOWNSTREAM_NOT_LEADER -> *numOfFault is incremented
 *  - otherwise, a response has been received (rspTs != 0)        -> task id appended to pNotReadyList
 *  - otherwise, el has reached CHECK_NOT_RSP_DURATION            -> task id appended to pTimeoutList
 *  - otherwise                                                   -> *numOfNotRsp is incremented
 *
 * The counters are only incremented, never reset, by this function.
 *
 * @param pInfo          check info holding the records
 * @param el             elapsed time in ms, compared against CHECK_NOT_RSP_DURATION
 * @param numOfReady     [in/out] counter of ready downstream tasks
 * @param numOfFault     [in/out] counter of downstream tasks that reported new-stage / not-leader
 * @param numOfNotRsp    [in/out] counter of downstream tasks still within the response window
 * @param pTimeoutList   [out] ids of the tasks that did not respond in time
 * @param pNotReadyList  [out] ids of the tasks that responded but are not ready
 * @param id             task id string, used for logging
 */
void getCheckRspStatus(STaskCheckInfo* pInfo, int64_t el, int32_t* numOfReady, int32_t* numOfFault,
                       int32_t* numOfNotRsp, SArray* pTimeoutList, SArray* pNotReadyList, const char* id) {
  for (int32_t i = 0; i < taosArrayGetSize(pInfo->pList); ++i) {
    SDownstreamStatusInfo* pRec = taosArrayGet(pInfo->pList, i);
    if (pRec == NULL) {
      continue;
    }

    if (pRec->status == TASK_DOWNSTREAM_READY) {
      (*numOfReady) += 1;
      continue;
    }

    if (pRec->status == TASK_UPSTREAM_NEW_STAGE || pRec->status == TASK_DOWNSTREAM_NOT_LEADER) {
      stDebug("s-task:%s recv status:NEW_STAGE/NOT_LEADER from downstream, task:0x%x, quit from check downstream", id,
              pRec->taskId);
      (*numOfFault) += 1;
      continue;
    }

    // from here on: TASK_DOWNSTREAM_NOT_READY, or no status recorded yet
    if (pRec->rspTs != 0) {
      // a response arrived, but the downstream task is not ready
      if (taosArrayPush(pNotReadyList, &pRec->taskId) == NULL) {
        stError("s-task:%s failed to record not ready task:0x%x", id, pRec->taskId);
      }
      continue;
    }

    // not response yet
    if (el < CHECK_NOT_RSP_DURATION) {
      (*numOfNotRsp) += 1;  // do nothing and continue waiting for their rsp
      continue;
    }

    // no response within CHECK_NOT_RSP_DURATION
    if (taosArrayPush(pTimeoutList, &pRec->taskId) == NULL) {
      stError("s-task:%s failed to record time out task:0x%x", id, pRec->taskId);
    }
  }
}
2,324✔
672

673
/**
 * Fill the request id and the downstream target of a check request.
 *
 * @param pReq      request to update; the fields reqId, downstreamTaskId and downstreamNodeId are overwritten
 * @param reqId     value stored in pReq->reqId
 * @param dstTaskId value stored in pReq->downstreamTaskId
 * @param dstNodeId value stored in pReq->downstreamNodeId
 */
void setCheckDownstreamReqInfo(SStreamTaskCheckReq* pReq, int64_t reqId, int32_t dstTaskId, int32_t dstNodeId) {
  // destination first, then the id that identifies this request
  pReq->downstreamNodeId = dstNodeId;
  pReq->downstreamTaskId = dstTaskId;
  pReq->reqId = reqId;
}
22,839✔
678

679
/**
 * Handle the downstream tasks that did not answer the check request in time.
 *
 * Resets pInfo->timeoutStartTs to the current time and sends the check message again to every task in
 * pTimeoutList whose record is still in the "no response" state (status == -1 and rspTs == 0). After more than
 * 10 such rounds the retry counter is reset and the vnode of every timed-out downstream task is added to the
 * node update list of pTask.
 *
 * @param pTask        the task that is checking its downstream tasks
 * @param pTimeoutList list of int32_t task ids of the downstream tasks that timed out
 */
void handleTimeoutDownstreamTasks(SStreamTask* pTask, SArray* pTimeoutList) {
  STaskCheckInfo* pInfo = &pTask->taskCheckInfo;
  const char*     id = pTask->id.idStr;
  int32_t         vgId = pTask->pMeta->vgId;
  int32_t         numOfTimeout = taosArrayGetSize(pTimeoutList);

  // start a new timeout period for the requests that are sent again below
  pInfo->timeoutStartTs = taosGetTimestampMs();
  for (int32_t i = 0; i < numOfTimeout; ++i) {
    int32_t* px = taosArrayGet(pTimeoutList, i);
    if (px == NULL) {
      continue;
    }

    int32_t                taskId = *px;
    SDownstreamStatusInfo* p = NULL;
    findCheckRspStatus(pInfo, taskId, &p);
    if (p == NULL) {
      continue;
    }

    // a timed-out record must still be in the "no response" state
    if (p->status != -1 || p->rspTs != 0) {
      stError("s-task:%s invalid rsp record entry, index:%d, status:%d, rspTs:%" PRId64, id, i, p->status, p->rspTs);
      continue;
    }

    // the result is deliberately not acted upon: the remaining tasks are still retried
    (void)doSendCheckMsg(pTask, p);
  }

  pInfo->timeoutRetryCount += 1;

  // timeout more than 600 sec, add into node update list
  if (pInfo->timeoutRetryCount > 10) {
    pInfo->timeoutRetryCount = 0;

    for (int32_t i = 0; i < numOfTimeout; ++i) {
      int32_t* pTaskId = taosArrayGet(pTimeoutList, i);
      if (pTaskId == NULL) {
        continue;
      }

      SDownstreamStatusInfo* p = NULL;
      findCheckRspStatus(pInfo, *pTaskId, &p);
      if (p == NULL) {
        continue;
      }

      int32_t code = streamTaskAddIntoNodeUpdateList(pTask, p->vgId);
      if (code) {
        // do not claim success in the log when the vnode could not be recorded
        stError("s-task:%s vgId:%d failed to add downstream task:0x%x (vgId:%d) into nodeUpdate list, code:%s", id,
                vgId, p->taskId, p->vgId, tstrerror(code));
        continue;
      }

      stDebug("s-task:%s vgId:%d downstream task:0x%x (vgId:%d) timeout more than 600sec, add into nodeUpdate list",
              id, vgId, p->taskId, p->vgId);
    }

    stDebug("s-task:%s vgId:%d %d downstream task(s) all add into nodeUpate list", id, vgId, numOfTimeout);
  } else {
    stDebug("s-task:%s vgId:%d %d downstream task(s) timeout, send check msg again, retry:%d start time:%" PRId64, id,
            vgId, numOfTimeout, pInfo->timeoutRetryCount, pInfo->timeoutStartTs);
  }
}
×
733

734
/**
 * Send the check message again to every downstream task that answered "not ready".
 *
 * For each task id in pNotReadyList the matching record is put back into the "no response" state
 * (rspTs = 0, status = -1) before the check message is sent again. pInfo->notReadyRetryCount is incremented
 * once per call, regardless of how many tasks were retried.
 *
 * @param pTask         the task that is checking its downstream tasks
 * @param pNotReadyList list of int32_t task ids of the downstream tasks that are not ready
 */
void handleNotReadyDownstreamTask(SStreamTask* pTask, SArray* pNotReadyList) {
  STaskCheckInfo* pInfo = &pTask->taskCheckInfo;
  const char*     id = pTask->id.idStr;
  int32_t         vgId = pTask->pMeta->vgId;
  int32_t         numOfNotReady = taosArrayGetSize(pNotReadyList);

  // reset the info, and send the check msg to failure downstream again
  for (int32_t i = 0; i < numOfNotReady; ++i) {
    int32_t* pTaskId = taosArrayGet(pNotReadyList, i);
    if (pTaskId == NULL) {
      continue;
    }

    SDownstreamStatusInfo* p = NULL;
    findCheckRspStatus(pInfo, *pTaskId, &p);
    if (p == NULL) {
      continue;
    }

    p->rspTs = 0;
    p->status = -1;

    // the result is deliberately not acted upon: the remaining tasks are still retried
    (void)doSendCheckMsg(pTask, p);
  }

  pInfo->notReadyRetryCount += 1;
  stDebug("s-task:%s vgId:%d %d downstream task(s) not ready, send check msg again, retry:%d start time:%" PRId64, id,
          vgId, numOfNotReady, pInfo->notReadyRetryCount, pInfo->startTs);
}
1,306✔
760

761
// the action of add status may incur the restart procedure, which should NEVER be executed in the timer thread.
762
// The restart of all tasks requires that all tasks should not have active timer for now. Therefore, the execution
763
// of restart in timer thread will result in a deadlock.
764
int32_t addDownstreamFailedStatusResultAsync(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int32_t taskId) {
×
765
  return streamTaskSchedTask(pMsgCb, vgId, streamId, taskId, STREAM_EXEC_T_ADD_FAILED_TASK, false);
×
766
}
767

768
// Release everything one run of rspMonitorFn() holds: the two task-id lists, the reference on pTask, and the
// ref-id holder "param". The lists and "param" are passed as NULL by callers that have nothing to release for them.
static void doCleanup(SStreamTask* pTask, SArray* pNotReadyList, SArray* pTimeoutList, void* param) {
  // the lists are private to the caller and independent of the task, so they can go first
  taosArrayDestroy(pTimeoutList);
  taosArrayDestroy(pNotReadyList);

  streamMetaReleaseTask(pTask->pMeta, pTask);
  streamTaskFreeRefId(param);
}
9,081✔
775

776
// this function is executed in timer thread
777
void rspMonitorFn(void* param, void* tmrId) {
9,082✔
778
  int32_t         numOfReady = 0;
9,082✔
779
  int32_t         numOfFault = 0;
9,082✔
780
  int32_t         numOfNotRsp = 0;
9,082✔
781
  int32_t         numOfNotReady = 0;
9,082✔
782
  int32_t         numOfTimeout = 0;
9,082✔
783
  int64_t         taskRefId = *(int64_t*)param;
9,082✔
784
  int64_t         now = taosGetTimestampMs();
9,082✔
785
  SArray*         pNotReadyList = NULL;
9,082✔
786
  SArray*         pTimeoutList = NULL;
9,082✔
787
  SStreamMeta*    pMeta = NULL;
9,082✔
788
  STaskCheckInfo* pInfo = NULL;
9,082✔
789
  int32_t         vgId = -1;
9,082✔
790
  int64_t         timeoutDuration = 0;
9,082✔
791
  const char*     id = NULL;
9,082✔
792
  int32_t         total = 0;
9,082✔
793

794
  SStreamTask* pTask = taosAcquireRef(streamTaskRefPool, taskRefId);
9,082✔
795
  if (pTask == NULL) {
9,082✔
796
    stError("invalid task rid:%" PRId64 " failed to acquired stream-task at %s", taskRefId, __func__);
1!
797
    streamTaskFreeRefId(param);
1✔
798
    return;
6,768✔
799
  }
800

801
  pMeta = pTask->pMeta;
9,081✔
802
  pInfo = &pTask->taskCheckInfo;
9,081✔
803
  vgId = pTask->pMeta->vgId;
9,081✔
804
  timeoutDuration = now - pInfo->timeoutStartTs;
9,081✔
805
  id = pTask->id.idStr;
9,081✔
806
  total = (int32_t) taosArrayGetSize(pInfo->pList);
9,081✔
807

808
  stDebug("s-task:%s start to do check-downstream-rsp check in tmr", id);
9,081✔
809

810
  streamMutexLock(&pTask->lock);
9,081✔
811
  SStreamTaskState state = streamTaskGetStatus(pTask);
9,081✔
812
  streamMutexUnlock(&pTask->lock);
9,081✔
813

814
  if (state.state == TASK_STATUS__STOP) {
9,081!
815
    stDebug("s-task:%s status:%s vgId:%d quit from monitor check-rsp tmr", id, state.name, vgId);
×
816
    streamTaskCompleteCheckRsp(pInfo, true, id);
×
817

818
    // not record the failure of the current task if try to close current vnode
819
    // otherwise, the put of message operation may incur invalid read of message queue.
820
    if (!pMeta->closeFlag) {
×
821
      int32_t code = addDownstreamFailedStatusResultAsync(pTask->pMsgCb, vgId, pTask->id.streamId, pTask->id.taskId);
×
822
      if (code) {
×
823
        stError("s-task:%s failed to create async record start failed task, code:%s", id, tstrerror(code));
×
824
      }
825
    }
826

827
    doCleanup(pTask, pNotReadyList, pTimeoutList, param);
×
828
    return;
×
829
  }
830

831
  if (state.state == TASK_STATUS__DROPPING || state.state == TASK_STATUS__READY) {
9,081!
832
    stDebug("s-task:%s status:%s vgId:%d quit from monitor check-rsp tmr", id, state.name, vgId);
4,298✔
833

834
    streamTaskCompleteCheckRsp(pInfo, true, id);
4,298✔
835
    doCleanup(pTask, pNotReadyList, pTimeoutList, param);
4,298✔
836
    return;
4,298✔
837
  }
838

839
  streamMutexLock(&pInfo->checkInfoLock);
4,783✔
840
  if (pInfo->notReadyTasks == 0) {
4,783✔
841
    stDebug("s-task:%s status:%s vgId:%d all downstream ready, quit from monitor rsp tmr", id, state.name, vgId);
2,459✔
842

843
    streamTaskCompleteCheckRsp(pInfo, false, id);
2,459✔
844
    streamMutexUnlock(&pInfo->checkInfoLock);
2,459✔
845
    doCleanup(pTask, pNotReadyList, pTimeoutList, param);
2,459✔
846
    return;
2,459✔
847
  }
848

849
  pNotReadyList = taosArrayInit(4, sizeof(int64_t));
2,324✔
850
  pTimeoutList = taosArrayInit(4, sizeof(int64_t));
2,324✔
851

852
  if (state.state == TASK_STATUS__UNINIT) {
2,324!
853
    getCheckRspStatus(pInfo, timeoutDuration, &numOfReady, &numOfFault, &numOfNotRsp, pTimeoutList, pNotReadyList, id);
2,324✔
854

855
    numOfNotReady = (int32_t)taosArrayGetSize(pNotReadyList);
2,324✔
856
    numOfTimeout = (int32_t)taosArrayGetSize(pTimeoutList);
2,324✔
857

858
    // fault tasks detected, not try anymore
859
    bool jumpOut = false;
2,324✔
860
    if ((numOfReady + numOfFault + numOfNotReady + numOfTimeout + numOfNotRsp) != total) {
2,324!
861
      stError(
×
862
          "s-task:%s vgId:%d internal error in handling the check downstream procedure, rsp number is inconsistent, "
863
          "stop rspMonitor tmr, total:%d, notRsp:%d, notReady:%d, fault:%d, timeout:%d, ready:%d",
864
          id, vgId, total, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady);
865
      jumpOut = true;
×
866
    }
867

868
    if (numOfFault > 0) {
2,324✔
869
      stDebug(
10✔
870
          "s-task:%s status:%s vgId:%d all rsp. quit from monitor rsp tmr, since vnode-transfer/leader-change/restart "
871
          "detected, total:%d, notRsp:%d, notReady:%d, fault:%d, timeout:%d, ready:%d",
872
          id, state.name, vgId, total, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady);
873
      jumpOut = true;
10✔
874
    }
875

876
    if (jumpOut) {
2,324✔
877
      streamTaskCompleteCheckRsp(pInfo, false, id);
10✔
878
      streamMutexUnlock(&pInfo->checkInfoLock);
10✔
879
      doCleanup(pTask, pNotReadyList, pTimeoutList, param);
10✔
880
      return;
10✔
881
    }
882
  } else {  // unexpected status
883
    stError("s-task:%s unexpected task status:%s during waiting for check rsp", id, state.name);
×
884
  }
885

886
  // checking of downstream tasks has been stopped by other threads
887
  if (pInfo->stopCheckProcess == 1) {
2,314!
888
    stDebug(
×
889
        "s-task:%s status:%s vgId:%d stopped by other threads to check downstream process, total:%d, notRsp:%d, "
890
        "notReady:%d, fault:%d, timeout:%d, ready:%d",
891
        id, state.name, vgId, total, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady);
892

893
    streamTaskCompleteCheckRsp(pInfo, false, id);
×
894
    streamMutexUnlock(&pInfo->checkInfoLock);
×
895

896
    int32_t code = addDownstreamFailedStatusResultAsync(pTask->pMsgCb, vgId, pTask->id.streamId, pTask->id.taskId);
×
897
    if (code) {
×
898
      stError("s-task:%s failed to create async record start failed task, code:%s", id, tstrerror(code));
×
899
    }
900

901
    doCleanup(pTask, pNotReadyList, pTimeoutList, param);
×
902
    return;
×
903
  }
904

905
  if (numOfNotReady > 0) {  // check to make sure not in recheck timer
2,314✔
906
    handleNotReadyDownstreamTask(pTask, pNotReadyList);
1,306✔
907
  }
908

909
  if (numOfTimeout > 0) {
2,314!
910
    handleTimeoutDownstreamTasks(pTask, pTimeoutList);
×
911
  }
912

913
  streamTmrStart(rspMonitorFn, CHECK_RSP_CHECK_INTERVAL, param, streamTimer, &pInfo->checkRspTmr, vgId,
2,314✔
914
                 "check-status-monitor");
915
  streamMutexUnlock(&pInfo->checkInfoLock);
2,314✔
916

917
  stDebug(
2,314✔
918
      "s-task:%s vgId:%d continue checking rsp in 300ms, total:%d, notRsp:%d, notReady:%d, fault:%d, timeout:%d, "
919
      "ready:%d",
920
      id, vgId, total, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady);
921
  doCleanup(pTask, pNotReadyList, pTimeoutList, NULL);
2,314✔
922
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc