• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3525

10 Nov 2024 03:50AM UTC coverage: 60.818% (-0.08%) from 60.898%
#3525

push

travis-ci

web-flow
Merge pull request #28709 from taosdata/main

merge: from main to 3.0 branch

118634 of 249004 branches covered (47.64%)

Branch coverage included in aggregate %.

136 of 169 new or added lines in 23 files covered. (80.47%)

542 existing lines in 129 files now uncovered.

199071 of 273386 relevant lines covered (72.82%)

15691647.46 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.45
/source/libs/stream/src/streamCheckStatus.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "cos.h"
17
#include "rsync.h"
18
#include "streamBackendRocksdb.h"
19
#include "streamInt.h"
20

21
#define CHECK_NOT_RSP_DURATION 10 * 1000  // 10 sec
22

23
static void    processDownstreamReadyRsp(SStreamTask* pTask);
24
static int32_t addIntoNodeUpdateList(SStreamTask* pTask, int32_t nodeId);
25
static void    rspMonitorFn(void* param, void* tmrId);
26
static void    streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInfo* pOutputInfo, int64_t startTs);
27
static int32_t streamTaskStartCheckDownstream(STaskCheckInfo* pInfo, const char* id);
28
static void    streamTaskCompleteCheckRsp(STaskCheckInfo* pInfo, bool lock, const char* id);
29
static void    streamTaskAddReqInfo(STaskCheckInfo* pInfo, int64_t reqId, int32_t taskId, int32_t vgId, const char* id);
30
static int32_t doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p);
31
static void    handleTimeoutDownstreamTasks(SStreamTask* pTask, SArray* pTimeoutList);
32
static void    handleNotReadyDownstreamTask(SStreamTask* pTask, SArray* pNotReadyList);
33
static int32_t streamTaskUpdateCheckInfo(STaskCheckInfo* pInfo, int32_t taskId, int32_t status, int64_t rspTs,
34
                                         int64_t reqId, int32_t* pNotReady, const char* id);
35
static void setCheckDownstreamReqInfo(SStreamTaskCheckReq* pReq, int64_t reqId, int32_t dstTaskId, int32_t dstNodeId);
36
static void getCheckRspStatus(STaskCheckInfo* pInfo, int64_t el, int32_t* numOfReady, int32_t* numOfFault,
37
                              int32_t* numOfNotRsp, SArray* pTimeoutList, SArray* pNotReadyList, const char* id);
38
static int32_t addDownstreamFailedStatusResultAsync(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int32_t taskId);
39
static void    findCheckRspStatus(STaskCheckInfo* pInfo, int32_t taskId, SDownstreamStatusInfo** pStatusInfo);
40

41
/*
 * Validate a check request received from an upstream task and compute the status to report back.
 *
 * pTask:          the local task that received the check request.
 * upstreamTaskId: id of the upstream task that sent the request.
 * vgId:           vgId of the upstream task (only used in log messages).
 * stage:          stage carried by the request; -1 is treated as an invalid stage.
 * oldStage:       out-param, receives the stage recorded for this upstream before this call.
 *
 * Returns TSDB_CODE_STREAM_TASK_NOT_EXIST when the upstream task is unknown to pTask; otherwise one of
 * TASK_UPSTREAM_NEW_STAGE, TASK_DOWNSTREAM_NOT_READY or TASK_DOWNSTREAM_READY.
 */
int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_t vgId, int64_t stage,
                              int64_t* oldStage) {
  SStreamUpstreamEpInfo* pInfo = NULL;
  streamTaskGetUpstreamTaskEpInfo(pTask, upstreamTaskId, &pInfo);
  if (pInfo == NULL) {
    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  *oldStage = pInfo->stage;
  const char* id = pTask->id.idStr;
  if (stage == -1) {
    stDebug("s-task:%s receive check msg from upstream task:0x%x(vgId:%d), invalid stageId:%" PRId64 ", not ready", id,
            upstreamTaskId, vgId, stage);
    // Report the status announced by the log above. The previous bare `0` was not a named status and could be
    // taken by the upstream task as a ready answer for a request that carries an invalid stage.
    return TASK_DOWNSTREAM_NOT_READY;
  }

  // first check request from this upstream: adopt its stage as the baseline
  if (pInfo->stage == -1) {
    pInfo->stage = stage;
    stDebug("s-task:%s receive check msg from upstream task:0x%x(vgId:%d) first time, init stage value:%" PRId64, id,
            upstreamTaskId, vgId, stage);
  }

  if (pInfo->stage < stage) {
    stError("s-task:%s receive check msg from upstream task:0x%x(vgId:%d), new stage received:%" PRId64
            ", prev:%" PRId64,
            id, upstreamTaskId, vgId, stage, pInfo->stage);
    // record the checkpoint failure id and sent to mnode
    streamTaskSetCheckpointFailed(pTask);
  }

  if (pInfo->stage != stage) {
    return TASK_UPSTREAM_NEW_STAGE;
  } else if (pTask->status.downstreamReady != 1) {
    stDebug("s-task:%s vgId:%d leader:%d, downstream not ready", id, vgId, (pTask->pMeta->role == NODE_ROLE_LEADER));
    return TASK_DOWNSTREAM_NOT_READY;
  } else {
    return TASK_DOWNSTREAM_READY;
  }
}
80

81
// check status
82
/*
 * Send a check request to every downstream task of pTask. A task whose output type is neither fixed nor shuffle
 * dispatch has no downstream, so it is marked ready directly.
 *
 * For fixed/shuffle dispatch the check-rsp monitor is started first. Then, for each downstream task, a request id
 * is generated, registered with streamTaskAddReqInfo() and the request is sent. For shuffle dispatch a failed send
 * does not stop the remaining downstream tasks from being checked; the first failure is reported at the end.
 */
void streamTaskSendCheckMsg(SStreamTask* pTask) {
  SDataRange*  pRange = &pTask->dataRange;
  STimeWindow* pWindow = &pRange->window;
  const char*  idstr = pTask->id.idStr;
  int32_t      code = 0;

  // fields shared by every request from this task; destination fields are set per downstream task
  SStreamTaskCheckReq req = {
      .streamId = pTask->id.streamId,
      .upstreamTaskId = pTask->id.taskId,
      .upstreamNodeId = pTask->info.nodeId,
      .childId = pTask->info.selfChildId,
      .stage = pTask->pMeta->stage,
  };

  if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
    streamTaskStartMonitorCheckRsp(pTask);

    STaskDispatcherFixed* pDispatch = &pTask->outputInfo.fixedDispatcher;

    setCheckDownstreamReqInfo(&req, tGenIdPI64(), pDispatch->taskId, pDispatch->nodeId);
    streamTaskAddReqInfo(&pTask->taskCheckInfo, req.reqId, pDispatch->taskId, pDispatch->nodeId, idstr);

    stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " check single downstream task:0x%x(vgId:%d) ver:%" PRId64 "-%" PRId64
            " window:%" PRId64 "-%" PRId64 " QID:0x%" PRIx64,
            idstr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, pRange->range.minVer,
            pRange->range.maxVer, pWindow->skey, pWindow->ekey, req.reqId);

    code = streamSendCheckMsg(pTask, &req, pTask->outputInfo.fixedDispatcher.nodeId,
                              &pTask->outputInfo.fixedDispatcher.epSet);

  } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
    streamTaskStartMonitorCheckRsp(pTask);

    SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;

    int32_t numOfVgs = taosArrayGetSize(vgInfo);
    stDebug("s-task:%s check %d downstream tasks, ver:%" PRId64 "-%" PRId64 " window:%" PRId64 "-%" PRId64, idstr,
            numOfVgs, pRange->range.minVer, pRange->range.maxVer, pWindow->skey, pWindow->ekey);

    for (int32_t i = 0; i < numOfVgs; i++) {
      SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
      if (pVgInfo == NULL) {
        continue;
      }

      setCheckDownstreamReqInfo(&req, tGenIdPI64(), pVgInfo->taskId, pVgInfo->vgId);
      streamTaskAddReqInfo(&pTask->taskCheckInfo, req.reqId, pVgInfo->taskId, pVgInfo->vgId, idstr);

      stDebug("s-task:%s (vgId:%d) stage:%" PRId64
              " check downstream task:0x%x (vgId:%d) (shuffle), idx:%d, QID:0x%" PRIx64,
              idstr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i, req.reqId);

      int32_t ret = streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
      if (ret != TSDB_CODE_SUCCESS) {
        stError("s-task:%s failed to send check msg to downstream task:0x%x (vgId:%d), code:%s", idstr,
                pVgInfo->taskId, pVgInfo->vgId, tstrerror(ret));
        // keep the first failure; previously a later successful send overwrote (and hid) it
        if (code == TSDB_CODE_SUCCESS) {
          code = ret;
        }
      }
    }
  } else {  // for sink task, set it ready directly.
    stDebug("s-task:%s (vgId:%d) set downstream ready, since no downstream", idstr, pTask->info.nodeId);
    streamTaskStopMonitorCheckRsp(&pTask->taskCheckInfo, idstr);
    processDownstreamReadyRsp(pTask);
  }

  if (code) {
    stError("s-task:%s failed to send check msg to downstream, code:%s", idstr, tstrerror(code));
  }
}
13,723✔
146

147
/*
 * Build the response to a check request addressed to a task hosted on this node.
 *
 * The routing fields of the request are copied into *pRsp, and pRsp->status is set to:
 *   - TASK_DOWNSTREAM_NOT_LEADER when this node is a follower;
 *   - TASK_DOWNSTREAM_NOT_READY when the target task cannot be acquired from pMeta;
 *   - the result of streamTaskCheckStatus() otherwise (pRsp->oldStage is filled in by that call).
 */
void streamTaskProcessCheckMsg(SStreamMeta* pMeta, SStreamTaskCheckReq* pReq, SStreamTaskCheckRsp* pRsp) {
  int32_t targetTaskId = pReq->downstreamTaskId;

  *pRsp = (SStreamTaskCheckRsp){
      .reqId = pReq->reqId,
      .streamId = pReq->streamId,
      .childId = pReq->childId,
      .downstreamNodeId = pReq->downstreamNodeId,
      .downstreamTaskId = pReq->downstreamTaskId,
      .upstreamNodeId = pReq->upstreamNodeId,
      .upstreamTaskId = pReq->upstreamTaskId,
  };

  // a follower never answers check requests itself
  if (pMeta->role == NODE_ROLE_FOLLOWER) {
    stError(
        "s-task:0x%x invalid check msg from upstream:0x%x(vgId:%d), vgId:%d is follower, not handle check status msg",
        targetTaskId, pReq->upstreamTaskId, pReq->upstreamNodeId, pMeta->vgId);
    pRsp->status = TASK_DOWNSTREAM_NOT_LEADER;
    return;
  }

  SStreamTask* pTask = NULL;
  (void)streamMetaAcquireTask(pMeta, pReq->streamId, targetTaskId, &pTask);  // only pTask is inspected

  if (pTask == NULL) {
    pRsp->status = TASK_DOWNSTREAM_NOT_READY;
    stDebug("tq recv task check(taskId:0x%" PRIx64 "-0x%x not built yet) req(QID:0x%" PRIx64
            ") from task:0x%x (vgId:%d), rsp check_status %d",
            pReq->streamId, targetTaskId, pRsp->reqId, pRsp->upstreamTaskId, pRsp->upstreamNodeId, pRsp->status);
    return;
  }

  pRsp->status =
      streamTaskCheckStatus(pTask, pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->stage, &pRsp->oldStage);

  SStreamTaskState curState = streamTaskGetStatus(pTask);
  stDebug("s-task:%s status:%s, stage:%" PRId64 " recv task check req(QID:0x%" PRIx64
          ") task:0x%x (vgId:%d), check_status:%d",
          pTask->id.idStr, curState.name, pRsp->oldStage, pRsp->reqId, pRsp->upstreamTaskId, pRsp->upstreamNodeId,
          pRsp->status);

  // balance the acquire above
  streamMetaReleaseTask(pMeta, pTask);
}
21,528✔
187

188
/*
 * Handle a check response that a downstream task sent back to pTask.
 *
 * Responses for a task that should stop, or addressed to a different upstream task, are rejected early. Every other
 * response first updates the per-downstream bookkeeping; expired or unknown responses are dropped there. Then:
 *   - READY:                  once no downstream task is left not-ready, the ready procedure runs and the
 *                             check-rsp monitor is told to stop;
 *   - UPSTREAM_NEW_STAGE:     the upstream node is queued for an epset update and the task is added as failed;
 *   - DOWNSTREAM_NOT_LEADER:  the downstream node is queued for an epset update and the task is added as failed;
 *   - anything else:          only logged; retrying is left to the check-rsp monitor.
 *
 * Returns TSDB_CODE_INVALID_MSG for a response addressed to another task, success otherwise.
 */
int32_t streamTaskProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) {
  int64_t         now = taosGetTimestampMs();
  const char*     id = pTask->id.idStr;
  STaskCheckInfo* pInfo = &pTask->taskCheckInfo;
  int32_t         total = streamTaskGetNumOfDownstream(pTask);
  int32_t         left = -1;

  if (streamTaskShouldStop(pTask)) {
    stDebug("s-task:%s should stop, do not do check downstream again", id);
    return TSDB_CODE_SUCCESS;
  }

  if (pRsp->upstreamTaskId != pTask->id.taskId) {
    stError("s-task:%s invalid check downstream rsp, upstream task:0x%x discard", id, pRsp->upstreamTaskId);
    return TSDB_CODE_INVALID_MSG;
  }

  // common to every status: record the response; a stale/unknown response is discarded (success in any case)
  int32_t code = streamTaskUpdateCheckInfo(pInfo, pRsp->downstreamTaskId, pRsp->status, now, pRsp->reqId, &left, id);
  if (code != TSDB_CODE_SUCCESS) {
    return TSDB_CODE_SUCCESS;
  }

  if (pRsp->status == TASK_DOWNSTREAM_READY) {
    if (left != 0) {
      stDebug("s-task:%s (vgId:%d) recv check rsp from task:0x%x (vgId:%d) status:%d, total:%d not ready:%d", id,
              pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status, total, left);
    } else {
      // every downstream task has answered ready
      processDownstreamReadyRsp(pTask);
      streamTaskStopMonitorCheckRsp(pInfo, id);
    }
  } else if (pRsp->status == TASK_UPSTREAM_NEW_STAGE) {
    stError("s-task:%s vgId:%d self vnode-transfer/leader-change/restart detected, old stage:%" PRId64
            ", current stage:%" PRId64 ", not check wait for downstream task nodeUpdate, and all tasks restart",
            id, pRsp->upstreamNodeId, pRsp->oldStage, pTask->pMeta->stage);
    (void)addIntoNodeUpdateList(pTask, pRsp->upstreamNodeId);
    streamMetaAddFailedTaskSelf(pTask, now);
  } else if (pRsp->status == TASK_DOWNSTREAM_NOT_LEADER) {
    stError(
        "s-task:%s downstream taskId:0x%x (vgId:%d) not leader, self dispatch epset needs to be updated, not check "
        "downstream again, nodeUpdate needed",
        id, pRsp->downstreamTaskId, pRsp->downstreamNodeId);
    (void)addIntoNodeUpdateList(pTask, pRsp->downstreamNodeId);
    streamMetaAddFailedTaskSelf(pTask, now);
  } else {
    // TASK_DOWNSTREAM_NOT_READY: the check-rsp monitor resends the request later
    stDebug("s-task:%s (vgId:%d) recv check rsp from task:0x%x (vgId:%d) status:%d, total:%d not ready:%d", id,
            pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status, total, left);
  }

  return 0;
}
247

248
int32_t streamTaskSendCheckRsp(const SStreamMeta* pMeta, int32_t vgId, SStreamTaskCheckRsp* pRsp,
21,509✔
249
                               SRpcHandleInfo* pRpcInfo, int32_t taskId) {
250
  SEncoder encoder;
251
  int32_t  code = 0;
21,509✔
252
  int32_t  len;
253

254
  tEncodeSize(tEncodeStreamTaskCheckRsp, pRsp, len, code);
21,509!
255
  if (code < 0) {
21,509!
256
    stError("vgId:%d failed to encode task check rsp, s-task:0x%x", pMeta->vgId, taskId);
×
257
    return TSDB_CODE_INVALID_MSG;
×
258
  }
259

260
  void* buf = rpcMallocCont(sizeof(SMsgHead) + len);
21,509✔
261
  if (buf == NULL) {
21,548!
262
    stError("s-task:0x%x vgId:%d failed prepare msg, %s at line:%d code:%s", taskId, pMeta->vgId, __func__, __LINE__,
×
263
            tstrerror(code));
264
    return terrno;
×
265
  }
266

267
  ((SMsgHead*)buf)->vgId = htonl(vgId);
21,548✔
268

269
  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
21,548✔
270
  tEncoderInit(&encoder, (uint8_t*)abuf, len);
21,548✔
271
  code = tEncodeStreamTaskCheckRsp(&encoder, pRsp);
21,531✔
272
  tEncoderClear(&encoder);
21,552✔
273

274
  SRpcMsg rspMsg = {.code = 0, .pCont = buf, .contLen = sizeof(SMsgHead) + len, .info = *pRpcInfo};
21,563✔
275
  tmsgSendRsp(&rspMsg);
21,563✔
276

277
  code = TMIN(code, 0);
21,545✔
278
  return code;
21,545✔
279
}
280

281
/*
 * Enter the check-downstream procedure for pTask and start the check-rsp monitor timer.
 *
 * Everything runs under pInfo->checkInfoLock. Nothing is started when the task is already dropping, or when a check
 * procedure is already in progress (see streamTaskStartCheckDownstream). Otherwise the check info is
 * (re)initialized and rspMonitorFn is scheduled with a freshly allocated task ref id.
 */
void streamTaskStartMonitorCheckRsp(SStreamTask* pTask) {
  int32_t         vgId = pTask->pMeta->vgId;
  STaskCheckInfo* pInfo = &pTask->taskCheckInfo;

  streamMutexLock(&pInfo->checkInfoLock);

  // drop procedure already started, not start check downstream now
  ETaskStatus s = streamTaskGetStatus(pTask).state;
  if (s == TASK_STATUS__DROPPING) {
    stDebug("s-task:%s task not in uninit status, status:%s not start monitor check-rsp", pTask->id.idStr,
            streamTaskGetStatusStr(s));
    streamMutexUnlock(&pInfo->checkInfoLock);
    return;
  }

  int32_t code = streamTaskStartCheckDownstream(pInfo, pTask->id.idStr);
  if (code != TSDB_CODE_SUCCESS) {
    streamMutexUnlock(&pInfo->checkInfoLock);
    return;
  }

  streamTaskInitTaskCheckInfo(pInfo, &pTask->outputInfo, taosGetTimestampMs());

  int64_t* pTaskRefId = NULL;
  code = streamTaskAllocRefId(pTask, &pTaskRefId);
  if (code == 0) {
    streamTmrStart(rspMonitorFn, CHECK_RSP_CHECK_INTERVAL, pTaskRefId, streamTimer, &pInfo->checkRspTmr, vgId,
                   "check-status-monitor");
  } else {
    // Previously silent. NOTE(review): no monitor timer is started in this case, and nothing in this function
    // clears the in-check-process flag set above; confirm how this state is recovered.
    stError("s-task:%s vgId:%d failed to alloc task ref id, check-rsp monitor not started, code:%s", pTask->id.idStr,
            vgId, tstrerror(code));
  }

  streamMutexUnlock(&pInfo->checkInfoLock);
}
313

314
/*
 * Ask the check-rsp monitor to stop by raising pInfo->stopCheckProcess under pInfo->checkInfoLock.
 * The monitor timer itself is not touched here.
 */
void streamTaskStopMonitorCheckRsp(STaskCheckInfo* pInfo, const char* id) {
  TdThreadMutex* pLock = &pInfo->checkInfoLock;

  streamMutexLock(pLock);
  pInfo->stopCheckProcess = 1;
  streamMutexUnlock(pLock);

  stDebug("s-task:%s set stop check-rsp monitor flag", id);
}
13,743✔
321

322
/*
 * Release every resource owned by pInfo: the downstream status list, the check-rsp timer (if one was created)
 * and the check-info mutex. The pointers are reset to NULL afterwards.
 */
void streamTaskCleanupCheckInfo(STaskCheckInfo* pInfo) {
  SArray* pStatusList = pInfo->pList;
  pInfo->pList = NULL;
  taosArrayDestroy(pStatusList);

  if (pInfo->checkRspTmr == NULL) {
    streamMutexDestroy(&pInfo->checkInfoLock);
    return;
  }

  streamTmrStop(pInfo->checkRspTmr);
  pInfo->checkRspTmr = NULL;
  streamMutexDestroy(&pInfo->checkInfoLock);
}
59,219✔
333

334
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
335
/*
 * Run once all downstream tasks of pTask are ready (or it has none).
 *
 * Steps, in order:
 *   1. report the init event (TASK_EVENT_INIT, or TASK_EVENT_INIT_SCANHIST for a fill-history task) as successful;
 *   2. record the launch result (checkTs/readyTs) in the stream meta;
 *   3. if the task status is HALT, feed TASK_EVENT_HALT to the state machine;
 *   4. if a related fill-history task exists, launch it.
 * Failures are logged and do not stop the later steps.
 */
void processDownstreamReadyRsp(SStreamTask* pTask) {
  const char* id = pTask->id.idStr;

  EStreamTaskEvent initEvent = TASK_EVENT_INIT;
  if (pTask->info.fillHistory != 0) {
    initEvent = TASK_EVENT_INIT_SCANHIST;
  }

  int32_t ret = streamTaskOnHandleEventSuccess(pTask->status.pSM, initEvent, NULL, NULL);
  if (ret != 0) {
    stError("s-task:%s failed to set event succ, code:%s", id, tstrerror(ret));
  }

  // checkTs/readyTs are read only after the event above has been handled
  ret = streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, pTask->execInfo.checkTs,
                                      pTask->execInfo.readyTs, true);
  if (ret != 0) {
    stError("s-task:%s failed to record the downstream task status, code:%s", id, tstrerror(ret));
  }

  if (pTask->status.taskStatus == TASK_STATUS__HALT) {
    // only a non-fill-history task that owns a related fill-history task is expected to arrive here halted
    if (!(HAS_RELATED_FILLHISTORY_TASK(pTask) && pTask->info.fillHistory == 0)) {
      stError("s-task:%s status:halt fillhistory:%d not handle the ready rsp", id, pTask->info.fillHistory);
    }

    // halt it self for count window stream task until the related fill history task completed.
    stDebug("s-task:%s level:%d initial status is %s from mnode, set it to be halt", id, pTask->info.taskLevel,
            streamTaskGetStatusStr(pTask->status.taskStatus));
    ret = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_HALT);
    if (ret != 0) {  // todo: handle error
      stError("s-task:%s failed to handle halt event, code:%s", id, tstrerror(ret));
    }
  }

  // Launch the related fill-history task now that this task is ready. Not done in the success callback
  // because of a deadlock there. todo: let's retry
  if (!HAS_RELATED_FILLHISTORY_TASK(pTask)) {
    return;
  }

  stDebug("s-task:%s try to launch related fill-history task", id);
  ret = streamLaunchFillHistoryTask(pTask);
  if (ret != 0) {
    stError("s-task:%s failed to launch history task, code:%s", id, tstrerror(ret));
  }
}
13,602✔
375

376
/*
 * Append nodeId to pTask->outputInfo.pNodeEpsetUpdateList unless an entry for it is already present.
 * The list is accessed under pTask->lock.
 *
 * Returns 0 on success (including "already present"), or terrno when the append fails.
 */
int32_t addIntoNodeUpdateList(SStreamTask* pTask, int32_t nodeId) {
  int32_t vgId = pTask->pMeta->vgId;
  int32_t code = 0;

  streamMutexLock(&pTask->lock);

  SArray* pUpdateList = pTask->outputInfo.pNodeEpsetUpdateList;
  int32_t num = taosArrayGetSize(pUpdateList);

  bool existed = false;
  for (int32_t idx = 0; idx < num && !existed; ++idx) {
    SDownstreamTaskEpset* pEntry = taosArrayGet(pUpdateList, idx);
    existed = (pEntry != NULL) && (pEntry->nodeId == nodeId);
  }

  if (!existed) {
    SDownstreamTaskEpset newEntry = {.nodeId = nodeId};

    if (taosArrayPush(pUpdateList, &newEntry) != NULL) {
      stInfo("s-task:%s vgId:%d downstream nodeId:%d needs to be updated, total needs updated:%d", pTask->id.idStr,
             vgId, newEntry.nodeId, (num + 1));
    } else {
      code = terrno;
      stError("s-task:%s vgId:%d failed to update epset, code:%s", pTask->id.idStr, vgId, tstrerror(code));
    }
  }

  streamMutexUnlock(&pTask->lock);
  return code;
}
413

414
/*
 * Reset pInfo for a new check round that starts at startTs.
 *
 * The status list is cleared. The number of not-ready downstream tasks is 1 for fixed dispatch and the number of
 * vgroups for shuffle dispatch; for any other output type it is left unchanged.
 */
void streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInfo* pOutputInfo, int64_t startTs) {
  taosArrayClear(pInfo->pList);

  switch (pOutputInfo->type) {
    case TASK_OUTPUT__FIXED_DISPATCH:
      pInfo->notReadyTasks = 1;
      break;
    case TASK_OUTPUT__SHUFFLE_DISPATCH:
      pInfo->notReadyTasks = taosArrayGetSize(pOutputInfo->shuffleDispatcher.dbInfo.pVgroupInfos);
      break;
    default:
      break;  // notReadyTasks intentionally untouched
  }

  pInfo->stopCheckProcess = 0;
  pInfo->timeoutStartTs = startTs;
  pInfo->startTs = startTs;
}
6,962✔
427

428
/*
 * Look up the status entry of downstream task taskId in pInfo->pList.
 *
 * On return *pStatusInfo points to the matching entry, or is NULL if none matches. If several entries share the same
 * taskId, the one with the highest index is returned. Does nothing if pStatusInfo is NULL.
 * No lock is taken here; callers are responsible for any required locking.
 */
void findCheckRspStatus(STaskCheckInfo* pInfo, int32_t taskId, SDownstreamStatusInfo** pStatusInfo) {
  if (pStatusInfo == NULL) {
    return;
  }

  *pStatusInfo = NULL;

  // scan from the tail so the first hit is the highest-index match
  for (int32_t k = (int32_t)taosArrayGetSize(pInfo->pList) - 1; k >= 0; --k) {
    SDownstreamStatusInfo* pEntry = taosArrayGet(pInfo->pList, k);
    if (pEntry != NULL && pEntry->taskId == taskId) {
      *pStatusInfo = pEntry;
      return;
    }
  }
}
445

446
/*
 * Record a check response (status, rspTs) from downstream task taskId under pInfo->checkInfoLock.
 *
 * pNotReady receives the number of downstream tasks still not ready after this update; the counter is decremented
 * only on a transition from not-ready to TASK_DOWNSTREAM_READY.
 *
 * Returns TSDB_CODE_FAILED, without touching any state, when the response carries a reqId other than the one
 * currently expected for that task, or when taskId is not in the list.
 */
int32_t streamTaskUpdateCheckInfo(STaskCheckInfo* pInfo, int32_t taskId, int32_t status, int64_t rspTs, int64_t reqId,
                                  int32_t* pNotReady, const char* id) {
  SDownstreamStatusInfo* p = NULL;

  streamMutexLock(&pInfo->checkInfoLock);
  findCheckRspStatus(pInfo, taskId, &p);
  if (p != NULL) {
    if (reqId != p->reqId) {
      // the log format used to be "s-task:%sQID", which fused the task id and the QID together
      stError("s-task:%s QID:0x%" PRIx64 " expected:0x%" PRIx64
              " expired check-rsp recv from downstream task:0x%x, discarded",
              id, reqId, p->reqId, taskId);
      streamMutexUnlock(&pInfo->checkInfoLock);
      return TSDB_CODE_FAILED;
    }

    // subtract one not-ready-task, since it is ready now
    if ((p->status != TASK_DOWNSTREAM_READY) && (status == TASK_DOWNSTREAM_READY)) {
      *pNotReady = atomic_sub_fetch_32(&pInfo->notReadyTasks, 1);
    } else {
      *pNotReady = pInfo->notReadyTasks;
    }

    p->status = status;
    p->rspTs = rspTs;

    streamMutexUnlock(&pInfo->checkInfoLock);
    return TSDB_CODE_SUCCESS;
  }

  streamMutexUnlock(&pInfo->checkInfoLock);
  stError("s-task:%s unexpected check rsp msg, invalid downstream task:0x%x, QID:0x%" PRIx64 " discarded", id, taskId,
          reqId);
  return TSDB_CODE_FAILED;
}
480

481
/*
 * Mark pInfo as being in a check procedure.
 *
 * Returns TSDB_CODE_FAILED if a check procedure is already running; in that case stopCheckProcess is cleared so the
 * running procedure is not stopped automatically. Returns TSDB_CODE_SUCCESS otherwise.
 * No lock is taken here; callers are responsible for any required locking.
 */
int32_t streamTaskStartCheckDownstream(STaskCheckInfo* pInfo, const char* id) {
  if (pInfo->inCheckProcess != 0) {
    stError("s-task:%s already in check procedure, checkTs:%" PRId64 ", start monitor check rsp failed", id,
            pInfo->startTs);
    pInfo->stopCheckProcess = 0;  // disable auto stop of check process
    return TSDB_CODE_FAILED;
  }

  pInfo->inCheckProcess = 1;
  stDebug("s-task:%s set the in check-rsp flag", id);
  return TSDB_CODE_SUCCESS;
}
494

495
/*
 * Leave the check procedure: reset timestamps, counters, flags and retry counts, and clear the status list.
 *
 * lock: when true, pInfo->checkInfoLock is taken for the duration of the call; pass false if the caller already
 *       holds it.
 * If no check procedure is in progress, only a debug message is emitted.
 */
void streamTaskCompleteCheckRsp(STaskCheckInfo* pInfo, bool lock, const char* id) {
  if (lock) {
    streamMutexLock(&pInfo->checkInfoLock);
  }

  if (!pInfo->inCheckProcess) {
    stDebug("s-task:%s already not in check-rsp procedure", id);
  } else {
    int64_t el = 0;
    if (pInfo->startTs != 0) {
      el = taosGetTimestampMs() - pInfo->startTs;
    }
    stDebug("s-task:%s clear the in check-rsp flag, set the check-rsp done, elapsed time:%" PRId64 " ms", id, el);

    // timestamps and counters of the finished round
    pInfo->startTs = 0;
    pInfo->timeoutStartTs = 0;
    pInfo->notReadyTasks = 0;

    // procedure flags
    pInfo->inCheckProcess = 0;
    pInfo->stopCheckProcess = 0;

    // retry bookkeeping
    pInfo->notReadyRetryCount = 0;
    pInfo->timeoutRetryCount = 0;

    taosArrayClear(pInfo->pList);
  }

  if (lock) {
    streamMutexUnlock(&pInfo->checkInfoLock);
  }
}
6,554✔
522

523
// todo: retry until success
524
/*
 * Register an outstanding check request (reqId) to downstream task taskId on vgId, under pInfo->checkInfoLock.
 *
 * The entry starts with status -1 and rspTs 0. Nothing is added if an entry for taskId already exists.
 * todo: retry until success
 */
void streamTaskAddReqInfo(STaskCheckInfo* pInfo, int64_t reqId, int32_t taskId, int32_t vgId, const char* id) {
  SDownstreamStatusInfo info = {.taskId = taskId, .status = -1, .vgId = vgId, .reqId = reqId, .rspTs = 0};
  streamMutexLock(&pInfo->checkInfoLock);

  SDownstreamStatusInfo* p = NULL;
  findCheckRspStatus(pInfo, taskId, &p);
  if (p != NULL) {
    stDebug("s-task:%s check info to task:0x%x already sent", id, taskId);
    streamMutexUnlock(&pInfo->checkInfoLock);
    return;
  }

  void* px = taosArrayPush(pInfo->pList, &info);
  if (px == NULL) {
    // todo: retry. Previously this failure was swallowed silently. Without this entry, the response from this
    // downstream task will be discarded by streamTaskUpdateCheckInfo as an unexpected check rsp.
    stError("s-task:%s failed to record check req info to downstream task:0x%x (vgId:%d), QID:0x%" PRIx64 ", code:%s",
            id, taskId, vgId, reqId, tstrerror(terrno));
  }

  streamMutexUnlock(&pInfo->checkInfoLock);
}
543

544
/*
 * Re-send a check request to the downstream task described by p, using a freshly generated reqId stored in
 * p->reqId.
 *
 * For fixed dispatch the request goes to the single dispatcher target. For shuffle dispatch it goes to the vgroup
 * whose taskId matches p->taskId; if no vgroup matches, nothing is sent and an error is logged (previously silent).
 * Returns the result of streamSendCheckMsg(), or 0 when nothing was sent.
 */
int32_t doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p) {
  const char* id = pTask->id.idStr;
  int32_t     code = 0;

  SStreamTaskCheckReq req = {
      .streamId = pTask->id.streamId,
      .upstreamTaskId = pTask->id.taskId,
      .upstreamNodeId = pTask->info.nodeId,
      .childId = pTask->info.selfChildId,
      .stage = pTask->pMeta->stage,
  };

  // update the reqId for the new check msg
  p->reqId = tGenIdPI64();

  STaskOutputInfo* pOutputInfo = &pTask->outputInfo;
  if (pOutputInfo->type == TASK_OUTPUT__FIXED_DISPATCH) {
    STaskDispatcherFixed* pDispatch = &pOutputInfo->fixedDispatcher;
    setCheckDownstreamReqInfo(&req, p->reqId, pDispatch->taskId, pDispatch->nodeId);

    stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " re-send check downstream task:0x%x(vgId:%d) QID:0x%" PRIx64, id,
            pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, req.reqId);

    code = streamSendCheckMsg(pTask, &req, pOutputInfo->fixedDispatcher.nodeId, &pOutputInfo->fixedDispatcher.epSet);
  } else if (pOutputInfo->type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
    SArray* vgInfo = pOutputInfo->shuffleDispatcher.dbInfo.pVgroupInfos;
    int32_t numOfVgs = taosArrayGetSize(vgInfo);
    bool    found = false;

    for (int32_t i = 0; i < numOfVgs; i++) {
      SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
      if (pVgInfo == NULL) {
        continue;
      }

      if (p->taskId == pVgInfo->taskId) {
        found = true;
        setCheckDownstreamReqInfo(&req, p->reqId, pVgInfo->taskId, pVgInfo->vgId);

        stDebug("s-task:%s (vgId:%d) stage:%" PRId64
                " re-send check downstream task:0x%x(vgId:%d) (shuffle), idx:%d QID:0x%" PRIx64,
                id, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i, p->reqId);
        code = streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
        break;
      }
    }

    if (!found) {
      stError("s-task:%s downstream task:0x%x (vgId:%d) not found in shuffle dispatch info, check msg not re-sent, "
              "QID:0x%" PRIx64,
              id, p->taskId, p->vgId, p->reqId);
    }
  }

  if (code) {
    stError("s-task:%s failed to send check msg to downstream, code:%s", pTask->id.idStr, tstrerror(code));
  }
  return code;
}
595

596
/*
 * Classify every entry of pInfo->pList.
 *
 *   - status READY                        -> ++*numOfReady
 *   - status UPSTREAM_NEW_STAGE/NOT_LEADER -> ++*numOfFault
 *   - otherwise, a response was received   -> taskId appended to pNotReadyList
 *   - otherwise, no response and el >= CHECK_NOT_RSP_DURATION -> taskId appended to pTimeoutList
 *   - otherwise (no response yet, still within the window)    -> ++*numOfNotRsp
 *
 * el is the elapsed time compared against CHECK_NOT_RSP_DURATION. Counters are incremented, not reset.
 */
void getCheckRspStatus(STaskCheckInfo* pInfo, int64_t el, int32_t* numOfReady, int32_t* numOfFault,
                       int32_t* numOfNotRsp, SArray* pTimeoutList, SArray* pNotReadyList, const char* id) {
  int32_t numOfEntries = taosArrayGetSize(pInfo->pList);

  for (int32_t idx = 0; idx < numOfEntries; ++idx) {
    SDownstreamStatusInfo* pEntry = taosArrayGet(pInfo->pList, idx);
    if (pEntry == NULL) {
      continue;
    }

    if (pEntry->status == TASK_DOWNSTREAM_READY) {
      *numOfReady += 1;
      continue;
    }

    if (pEntry->status == TASK_UPSTREAM_NEW_STAGE || pEntry->status == TASK_DOWNSTREAM_NOT_LEADER) {
      stDebug("s-task:%s recv status:NEW_STAGE/NOT_LEADER from downstream, task:0x%x, quit from check downstream", id,
              pEntry->taskId);
      *numOfFault += 1;
      continue;
    }

    // TASK_DOWNSTREAM_NOT_READY, or no response yet
    if (pEntry->rspTs != 0) {
      if (taosArrayPush(pNotReadyList, &pEntry->taskId) == NULL) {
        stError("s-task:%s failed to record not ready task:0x%x", id, pEntry->taskId);
      }
      continue;
    }

    if (el < CHECK_NOT_RSP_DURATION) {
      *numOfNotRsp += 1;  // keep waiting for the response
      continue;
    }

    if (taosArrayPush(pTimeoutList, &pEntry->taskId) == NULL) {
      stError("s-task:%s failed to record time out task:0x%x", id, pEntry->taskId);
    }
  }
}
2,197✔
629

630
/*
 * Fill in the per-destination fields of a check request: request id, downstream task id and downstream node id.
 * All other fields of *pReq are left untouched.
 */
void setCheckDownstreamReqInfo(SStreamTaskCheckReq* pReq, int64_t reqId, int32_t dstTaskId, int32_t dstNodeId) {
  pReq->downstreamNodeId = dstNodeId;
  pReq->downstreamTaskId = dstTaskId;
  pReq->reqId = reqId;
}
21,683✔
635

636
/*
 * Re-send check requests to the downstream tasks listed (by taskId) in pTimeoutList, which have not responded.
 *
 * The timeout window is restarted and the retry counter is incremented. Entries that are not in the "no response"
 * state (status -1, rspTs 0) are logged and skipped. Once the counter exceeds 10, it is reset and the vgId of every
 * timed-out downstream task is added to the node update list.
 */
void handleTimeoutDownstreamTasks(SStreamTask* pTask, SArray* pTimeoutList) {
  STaskCheckInfo* pInfo = &pTask->taskCheckInfo;
  const char*     id = pTask->id.idStr;
  int32_t         vgId = pTask->pMeta->vgId;
  int32_t         numOfTimeout = taosArrayGetSize(pTimeoutList);

  pInfo->timeoutStartTs = taosGetTimestampMs();

  for (int32_t idx = 0; idx < numOfTimeout; ++idx) {
    int32_t* pTaskId = taosArrayGet(pTimeoutList, idx);
    if (pTaskId == NULL) {
      continue;
    }

    SDownstreamStatusInfo* pEntry = NULL;
    findCheckRspStatus(pInfo, *pTaskId, &pEntry);
    if (pEntry == NULL) {
      continue;
    }

    if (pEntry->status != -1 || pEntry->rspTs != 0) {
      stError("s-task:%s invalid rsp record entry, index:%d, status:%d, rspTs:%" PRId64, id, idx, pEntry->status,
              pEntry->rspTs);
      continue;
    }

    (void)doSendCheckMsg(pTask, pEntry);
  }

  pInfo->timeoutRetryCount += 1;

  if (pInfo->timeoutRetryCount <= 10) {
    stDebug("s-task:%s vgId:%d %d downstream task(s) timeout, send check msg again, retry:%d start time:%" PRId64, id,
            vgId, numOfTimeout, pInfo->timeoutRetryCount, pInfo->timeoutStartTs);
    return;
  }

  // too many timeout retries: hand the downstream nodes over to the node update procedure
  pInfo->timeoutRetryCount = 0;

  for (int32_t idx = 0; idx < numOfTimeout; ++idx) {
    int32_t* pTaskId = taosArrayGet(pTimeoutList, idx);
    if (pTaskId == NULL) {
      continue;
    }

    SDownstreamStatusInfo* pEntry = NULL;
    findCheckRspStatus(pInfo, *pTaskId, &pEntry);
    if (pEntry == NULL) {
      continue;
    }

    (void)addIntoNodeUpdateList(pTask, pEntry->vgId);
    stDebug("s-task:%s vgId:%d downstream task:0x%x (vgId:%d) timeout more than 100sec, add into nodeUpate list",
            id, vgId, pEntry->taskId, pEntry->vgId);
  }

  stDebug("s-task:%s vgId:%d %d downstream task(s) all add into nodeUpate list", id, vgId, numOfTimeout);
}
2✔
690

691
/*
 * Re-send the check message to every downstream task that answered "not ready".
 *
 * For each task id in pNotReadyList, the matching status entry in pTask->taskCheckInfo is reset to the
 * "waiting for rsp" state (rspTs = 0, status = -1) and a new check message is sent to that downstream task.
 * The not-ready retry counter is incremented once per call, regardless of how many messages were sent.
 *
 * pTask         - upstream task performing the downstream check; its taskCheckInfo is modified.
 * pNotReadyList - task ids (read back as int32_t) of the not-ready downstream tasks; not owned by this function.
 *
 * rspMonitorFn calls this while holding pTask->taskCheckInfo.checkInfoLock.
 */
void handleNotReadyDownstreamTask(SStreamTask* pTask, SArray* pNotReadyList) {
  STaskCheckInfo* pInfo = &pTask->taskCheckInfo;
  const char*     id = pTask->id.idStr;
  int32_t         vgId = pTask->pMeta->vgId;
  int32_t         numOfNotReady = taosArrayGetSize(pNotReadyList);

  // reset the info, and send the check msg to failure downstream again
  for (int32_t i = 0; i < numOfNotReady; ++i) {
    int32_t* pTaskId = taosArrayGet(pNotReadyList, i);
    if (pTaskId == NULL) {
      continue;
    }

    SDownstreamStatusInfo* p = NULL;
    findCheckRspStatus(pInfo, *pTaskId, &p);
    if (p == NULL) {
      continue;
    }

    // status -1 together with rspTs 0 is the "check msg sent, rsp not received yet" state
    p->rspTs = 0;
    p->status = -1;

    int32_t code = doSendCheckMsg(pTask, p);
    if (code) {
      // previously this failure was silently dropped; keep going with the remaining downstream tasks
      stError("s-task:%s failed to send check msg to downstream task:0x%x (vgId:%d), code:%s", id, p->taskId,
              p->vgId, tstrerror(code));
    }
  }

  pInfo->notReadyRetryCount += 1;
  stDebug("s-task:%s vgId:%d %d downstream task(s) not ready, send check msg again, retry:%d start time:%" PRId64, id,
          vgId, numOfNotReady, pInfo->notReadyRetryCount, pInfo->startTs);
}
1,162✔
717

718
// the action of add status may incur the restart procedure, which should NEVER be executed in the timer thread.
719
// The restart of all tasks requires that all tasks should not have active timer for now. Therefore, the execution
720
// of restart in timer thread will result in a dead lock.
721
int32_t addDownstreamFailedStatusResultAsync(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int32_t taskId) {
1✔
722
  return streamTaskSchedTask(pMsgCb, vgId, streamId, taskId, STREAM_EXEC_T_ADD_FAILED_TASK);
1✔
723
}
724

725
// Release the resources held by one invocation of rspMonitorFn, in this order:
//  1. the task reference held by the callback (released through streamMetaReleaseTask);
//  2. the temporary not-ready / timeout id lists, which may still be NULL on early exits;
//  3. the timer param (the task ref id) — passed as NULL when the same param was handed to a re-armed timer.
static void doCleanup(SStreamTask* pTask, SArray* pNotReadyList, SArray* pTimeoutList, void* param) {
  SStreamMeta* pOwnerMeta = pTask->pMeta;
  streamMetaReleaseTask(pOwnerMeta, pTask);

  taosArrayDestroy(pNotReadyList);
  taosArrayDestroy(pTimeoutList);

  streamTaskFreeRefId(param);
}
8,716✔
732

733
// this function is executed in timer thread
734
void rspMonitorFn(void* param, void* tmrId) {
8,717✔
735
  int32_t         numOfReady = 0;
8,717✔
736
  int32_t         numOfFault = 0;
8,717✔
737
  int32_t         numOfNotRsp = 0;
8,717✔
738
  int32_t         numOfNotReady = 0;
8,717✔
739
  int32_t         numOfTimeout = 0;
8,717✔
740
  int64_t         taskRefId = *(int64_t*)param;
8,717✔
741
  int64_t         now = taosGetTimestampMs();
8,717✔
742
  SArray*         pNotReadyList = NULL;
8,717✔
743
  SArray*         pTimeoutList = NULL;
8,717✔
744
  SStreamMeta*    pMeta = NULL;
8,717✔
745
  STaskCheckInfo* pInfo = NULL;
8,717✔
746
  int32_t         vgId = -1;
8,717✔
747
  int64_t         timeoutDuration = 0;
8,717✔
748
  const char*     id = NULL;
8,717✔
749
  int32_t         total = 0;
8,717✔
750

751
  SStreamTask* pTask = taosAcquireRef(streamTaskRefPool, taskRefId);
8,717✔
752
  if (pTask == NULL) {
8,717✔
753
    stError("invalid task rid:%" PRId64 " failed to acquired stream-task at %s", taskRefId, __func__);
1!
754
    streamTaskFreeRefId(param);
1✔
755
    return;
6,555✔
756
  }
757

758
  pMeta = pTask->pMeta;
8,716✔
759
  pInfo = &pTask->taskCheckInfo;
8,716✔
760
  vgId = pTask->pMeta->vgId;
8,716✔
761
  timeoutDuration = now - pInfo->timeoutStartTs;
8,716✔
762
  id = pTask->id.idStr;
8,716✔
763
  total = (int32_t) taosArrayGetSize(pInfo->pList);
8,716✔
764

765
  stDebug("s-task:%s start to do check-downstream-rsp check in tmr", id);
8,716✔
766

767
  streamMutexLock(&pTask->lock);
8,716✔
768
  SStreamTaskState state = streamTaskGetStatus(pTask);
8,716✔
769
  streamMutexUnlock(&pTask->lock);
8,716✔
770

771
  if (state.state == TASK_STATUS__STOP) {
8,716✔
772
    stDebug("s-task:%s status:%s vgId:%d quit from monitor check-rsp tmr", id, state.name, vgId);
1!
773
    streamTaskCompleteCheckRsp(pInfo, true, id);
1✔
774

775
    // not record the failure of the current task if try to close current vnode
776
    // otherwise, the put of message operation may incur invalid read of message queue.
777
    if (!pMeta->closeFlag) {
1!
778
      int32_t code = addDownstreamFailedStatusResultAsync(pTask->pMsgCb, vgId, pTask->id.streamId, pTask->id.taskId);
1✔
779
      if (code) {
1!
780
        stError("s-task:%s failed to create async record start failed task, code:%s", id, tstrerror(code));
×
781
      }
782
    }
783

784
    doCleanup(pTask, pNotReadyList, pTimeoutList, param);
1✔
785
    return;
1✔
786
  }
787

788
  if (state.state == TASK_STATUS__DROPPING || state.state == TASK_STATUS__READY) {
8,715!
789
    stDebug("s-task:%s status:%s vgId:%d quit from monitor check-rsp tmr", id, state.name, vgId);
4,271✔
790

791
    streamTaskCompleteCheckRsp(pInfo, true, id);
4,271✔
792
    doCleanup(pTask, pNotReadyList, pTimeoutList, param);
4,271✔
793
    return;
4,271✔
794
  }
795

796
  streamMutexLock(&pInfo->checkInfoLock);
4,444✔
797
  if (pInfo->notReadyTasks == 0) {
4,444✔
798
    stDebug("s-task:%s status:%s vgId:%d all downstream ready, quit from monitor rsp tmr", id, state.name, vgId);
2,247✔
799

800
    streamTaskCompleteCheckRsp(pInfo, false, id);
2,247✔
801
    streamMutexUnlock(&pInfo->checkInfoLock);
2,247✔
802
    doCleanup(pTask, pNotReadyList, pTimeoutList, param);
2,247✔
803
    return;
2,247✔
804
  }
805

806
  pNotReadyList = taosArrayInit(4, sizeof(int64_t));
2,197✔
807
  pTimeoutList = taosArrayInit(4, sizeof(int64_t));
2,197✔
808

809
  if (state.state == TASK_STATUS__UNINIT) {
2,197!
810
    getCheckRspStatus(pInfo, timeoutDuration, &numOfReady, &numOfFault, &numOfNotRsp, pTimeoutList, pNotReadyList, id);
2,197✔
811

812
    numOfNotReady = (int32_t)taosArrayGetSize(pNotReadyList);
2,197✔
813
    numOfTimeout = (int32_t)taosArrayGetSize(pTimeoutList);
2,197✔
814

815
    // fault tasks detected, not try anymore
816
    bool jumpOut = false;
2,197✔
817
    if ((numOfReady + numOfFault + numOfNotReady + numOfTimeout + numOfNotRsp) != total) {
2,197!
818
      stError(
×
819
          "s-task:%s vgId:%d internal error in handling the check downstream procedure, rsp number is inconsistent, "
820
          "stop rspMonitor tmr, total:%d, notRsp:%d, notReady:%d, fault:%d, timeout:%d, ready:%d",
821
          id, vgId, total, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady);
822
      jumpOut = true;
×
823
    }
824

825
    if (numOfFault > 0) {
2,197✔
826
      stDebug(
35!
827
          "s-task:%s status:%s vgId:%d all rsp. quit from monitor rsp tmr, since vnode-transfer/leader-change/restart "
828
          "detected, total:%d, notRsp:%d, notReady:%d, fault:%d, timeout:%d, ready:%d",
829
          id, state.name, vgId, total, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady);
830
      jumpOut = true;
35✔
831
    }
832

833
    if (jumpOut) {
2,197✔
834
      streamTaskCompleteCheckRsp(pInfo, false, id);
35✔
835
      streamMutexUnlock(&pInfo->checkInfoLock);
35✔
836
      doCleanup(pTask, pNotReadyList, pTimeoutList, param);
35✔
837
      return;
35✔
838
    }
839
  } else {  // unexpected status
840
    stError("s-task:%s unexpected task status:%s during waiting for check rsp", id, state.name);
×
841
  }
842

843
  // checking of downstream tasks has been stopped by other threads
844
  if (pInfo->stopCheckProcess == 1) {
2,162!
845
    stDebug(
×
846
        "s-task:%s status:%s vgId:%d stopped by other threads to check downstream process, total:%d, notRsp:%d, "
847
        "notReady:%d, fault:%d, timeout:%d, ready:%d",
848
        id, state.name, vgId, total, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady);
849

850
    streamTaskCompleteCheckRsp(pInfo, false, id);
×
851
    streamMutexUnlock(&pInfo->checkInfoLock);
×
852

853
    int32_t code = addDownstreamFailedStatusResultAsync(pTask->pMsgCb, vgId, pTask->id.streamId, pTask->id.taskId);
×
854
    if (code) {
×
855
      stError("s-task:%s failed to create async record start failed task, code:%s", id, tstrerror(code));
×
856
    }
857

858
    doCleanup(pTask, pNotReadyList, pTimeoutList, param);
×
859
    return;
×
860
  }
861

862
  if (numOfNotReady > 0) {  // check to make sure not in recheck timer
2,162✔
863
    handleNotReadyDownstreamTask(pTask, pNotReadyList);
1,162✔
864
  }
865

866
  if (numOfTimeout > 0) {
2,162✔
867
    handleTimeoutDownstreamTasks(pTask, pTimeoutList);
2✔
868
  }
869

870
  streamTmrStart(rspMonitorFn, CHECK_RSP_CHECK_INTERVAL, param, streamTimer, &pInfo->checkRspTmr, vgId,
2,162✔
871
                 "check-status-monitor");
872
  streamMutexUnlock(&pInfo->checkInfoLock);
2,162✔
873

874
  stDebug(
2,162✔
875
      "s-task:%s vgId:%d continue checking rsp in 300ms, total:%d, notRsp:%d, notReady:%d, fault:%d, timeout:%d, "
876
      "ready:%d",
877
      id, vgId, total, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady);
878
  doCleanup(pTask, pNotReadyList, pTimeoutList, NULL);
2,162✔
879
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc