• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3559

18 Dec 2024 12:59AM UTC coverage: 59.805% (+0.03%) from 59.778%
#3559

push

travis-ci

web-flow
Merge pull request #29187 from taosdata/merge/mainto3.0

merge: main to 3.0 branch

132705 of 287544 branches covered (46.15%)

Branch coverage included in aggregate %.

87 of 95 new or added lines in 19 files covered. (91.58%)

1132 existing lines in 133 files now uncovered.

209591 of 284807 relevant lines covered (73.59%)

8125235.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.14
/source/libs/stream/src/streamCheckStatus.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "cos.h"
17
#include "rsync.h"
18
#include "streamBackendRocksdb.h"
19
#include "streamInt.h"
20

21
// How long (in ms) a downstream task may stay silent before its check request is treated as timed out.
// The value is parenthesized so the macro expands as a single operand inside any expression.
#define CHECK_NOT_RSP_DURATION (10 * 1000)  // 10 sec
22

23
static void    processDownstreamReadyRsp(SStreamTask* pTask);
24
static void    rspMonitorFn(void* param, void* tmrId);
25
static void    streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInfo* pOutputInfo, int64_t startTs);
26
static int32_t streamTaskStartCheckDownstream(STaskCheckInfo* pInfo, const char* id);
27
static void    streamTaskCompleteCheckRsp(STaskCheckInfo* pInfo, bool lock, const char* id);
28
static void    streamTaskAddReqInfo(STaskCheckInfo* pInfo, int64_t reqId, int32_t taskId, int32_t vgId, const char* id);
29
static int32_t doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p);
30
static void    handleTimeoutDownstreamTasks(SStreamTask* pTask, SArray* pTimeoutList);
31
static void    handleNotReadyDownstreamTask(SStreamTask* pTask, SArray* pNotReadyList);
32
static int32_t streamTaskUpdateCheckInfo(STaskCheckInfo* pInfo, int32_t taskId, int32_t status, int64_t rspTs,
33
                                         int64_t reqId, int32_t* pNotReady, const char* id);
34
static void setCheckDownstreamReqInfo(SStreamTaskCheckReq* pReq, int64_t reqId, int32_t dstTaskId, int32_t dstNodeId);
35
static void getCheckRspStatus(STaskCheckInfo* pInfo, int64_t el, int32_t* numOfReady, int32_t* numOfFault,
36
                              int32_t* numOfNotRsp, SArray* pTimeoutList, SArray* pNotReadyList, const char* id);
37
static int32_t addDownstreamFailedStatusResultAsync(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int32_t taskId);
38
static void    findCheckRspStatus(STaskCheckInfo* pInfo, int32_t taskId, SDownstreamStatusInfo** pStatusInfo);
39

40
// Decide how this task answers a check request sent by one of its upstream tasks.
//
// pTask          task that received the check request
// upstreamTaskId id of the upstream task that sent the request
// vgId           vgroup id of the upstream task (used for logging only)
// stage          stage value carried by the request
// oldStage       [out] stage recorded for this upstream before the request is applied; only written when
//                the upstream endpoint info is found
//
// Returns TSDB_CODE_STREAM_TASK_NOT_EXIST when no endpoint info is found for upstreamTaskId, otherwise one of
// TASK_UPSTREAM_NEW_STAGE, TASK_DOWNSTREAM_NOT_READY or TASK_DOWNSTREAM_READY.
int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_t vgId, int64_t stage,
                              int64_t* oldStage) {
  SStreamUpstreamEpInfo* pInfo = NULL;
  streamTaskGetUpstreamTaskEpInfo(pTask, upstreamTaskId, &pInfo);
  if (pInfo == NULL) {
    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  *oldStage = pInfo->stage;
  const char* id = pTask->id.idStr;

  if (stage == -1) {
    stDebug("s-task:%s receive check msg from upstream task:0x%x(vgId:%d), invalid stageId:%" PRId64 ", not ready", id,
            upstreamTaskId, vgId, stage);
    // the log above reports "not ready", so answer with the matching named status instead of a bare 0
    return TASK_DOWNSTREAM_NOT_READY;
  }

  // first request from this upstream: adopt the stage it reports
  if (pInfo->stage == -1) {
    pInfo->stage = stage;
    stDebug("s-task:%s receive check msg from upstream task:0x%x(vgId:%d) first time, init stage value:%" PRId64, id,
            upstreamTaskId, vgId, stage);
  }

  if (pInfo->stage < stage) {
    stError("s-task:%s receive check msg from upstream task:0x%x(vgId:%d), new stage received:%" PRId64
            ", prev:%" PRId64,
            id, upstreamTaskId, vgId, stage, pInfo->stage);
    // record the checkpoint failure id and sent to mnode
    streamTaskSetCheckpointFailed(pTask);
  }

  if (pInfo->stage != stage) {
    return TASK_UPSTREAM_NEW_STAGE;
  }

  if (pTask->status.downstreamReady != 1) {
    stDebug("s-task:%s vgId:%d leader:%d, downstream not ready", id, vgId, (pTask->pMeta->role == NODE_ROLE_LEADER));
    return TASK_DOWNSTREAM_NOT_READY;
  }

  return TASK_DOWNSTREAM_READY;
}
79

80
// check status
81
// Send a check request to every downstream task of pTask.
//
// - fixed dispatch:   one request goes to the single configured downstream task
// - shuffle dispatch: one request goes to the task of every vgroup in the dispatcher's vgroup list
// - otherwise:        there is no downstream, so the task is marked ready right away
//
// For both dispatch types the check-rsp monitor is started first, and each request is recorded in
// pTask->taskCheckInfo before it is sent. Every failed send is logged individually.
void streamTaskSendCheckMsg(SStreamTask* pTask) {
  SDataRange*  pRange = &pTask->dataRange;
  STimeWindow* pWindow = &pRange->window;
  const char*  idstr = pTask->id.idStr;
  int32_t      code = 0;

  SStreamTaskCheckReq req = {
      .streamId = pTask->id.streamId,
      .upstreamTaskId = pTask->id.taskId,
      .upstreamNodeId = pTask->info.nodeId,
      .childId = pTask->info.selfChildId,
      .stage = pTask->pMeta->stage,
  };

  if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
    streamTaskStartMonitorCheckRsp(pTask);

    STaskDispatcherFixed* pDispatch = &pTask->outputInfo.fixedDispatcher;

    setCheckDownstreamReqInfo(&req, tGenIdPI64(), pDispatch->taskId, pDispatch->nodeId);
    streamTaskAddReqInfo(&pTask->taskCheckInfo, req.reqId, pDispatch->taskId, pDispatch->nodeId, idstr);

    stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " check single downstream task:0x%x(vgId:%d) ver:%" PRId64 "-%" PRId64
            " window:%" PRId64 "-%" PRId64 " QID:0x%" PRIx64,
            idstr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, pRange->range.minVer,
            pRange->range.maxVer, pWindow->skey, pWindow->ekey, req.reqId);

    code = streamSendCheckMsg(pTask, &req, pDispatch->nodeId, &pDispatch->epSet);
    if (code) {
      stError("s-task:%s failed to send check msg to downstream task:0x%x(vgId:%d), code:%s", idstr,
              req.downstreamTaskId, req.downstreamNodeId, tstrerror(code));
    }
  } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
    streamTaskStartMonitorCheckRsp(pTask);

    SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;

    int32_t numOfVgs = taosArrayGetSize(vgInfo);
    stDebug("s-task:%s check %d downstream tasks, ver:%" PRId64 "-%" PRId64 " window:%" PRId64 "-%" PRId64, idstr,
            numOfVgs, pRange->range.minVer, pRange->range.maxVer, pWindow->skey, pWindow->ekey);

    for (int32_t i = 0; i < numOfVgs; i++) {
      SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
      if (pVgInfo == NULL) {
        continue;
      }

      setCheckDownstreamReqInfo(&req, tGenIdPI64(), pVgInfo->taskId, pVgInfo->vgId);
      streamTaskAddReqInfo(&pTask->taskCheckInfo, req.reqId, pVgInfo->taskId, pVgInfo->vgId, idstr);

      stDebug("s-task:%s (vgId:%d) stage:%" PRId64
              " check downstream task:0x%x (vgId:%d) (shuffle), idx:%d, QID:0x%" PRIx64,
              idstr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i, req.reqId);

      // report every failed send here; a later successful send must not hide an earlier failure
      code = streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
      if (code) {
        stError("s-task:%s failed to send check msg to downstream task:0x%x(vgId:%d), code:%s", idstr,
                req.downstreamTaskId, req.downstreamNodeId, tstrerror(code));
      }
    }
  } else {  // for sink task, set it ready directly.
    stDebug("s-task:%s (vgId:%d) set downstream ready, since no downstream", idstr, pTask->info.nodeId);
    streamTaskStopMonitorCheckRsp(&pTask->taskCheckInfo, idstr);
    processDownstreamReadyRsp(pTask);
  }
}
13,594✔
145

146
// Build the response (pRsp) for a check request (pReq) received by the node described by pMeta.
// The identifying fields of the request are echoed back, then pRsp->status is filled in:
//   - TASK_DOWNSTREAM_NOT_LEADER when this node is a follower
//   - TASK_DOWNSTREAM_NOT_READY  when the target task cannot be acquired
//   - the result of streamTaskCheckStatus() otherwise (which also fills pRsp->oldStage)
void streamTaskProcessCheckMsg(SStreamMeta* pMeta, SStreamTaskCheckReq* pReq, SStreamTaskCheckRsp* pRsp) {
  const int32_t dstTaskId = pReq->downstreamTaskId;

  *pRsp = (SStreamTaskCheckRsp){
      .reqId = pReq->reqId,
      .streamId = pReq->streamId,
      .childId = pReq->childId,
      .downstreamNodeId = pReq->downstreamNodeId,
      .downstreamTaskId = pReq->downstreamTaskId,
      .upstreamNodeId = pReq->upstreamNodeId,
      .upstreamTaskId = pReq->upstreamTaskId,
  };

  // only the leader node handle the check request
  if (pMeta->role == NODE_ROLE_FOLLOWER) {
    stError(
        "s-task:0x%x invalid check msg from upstream:0x%x(vgId:%d), vgId:%d is follower, not handle check status msg",
        dstTaskId, pReq->upstreamTaskId, pReq->upstreamNodeId, pMeta->vgId);
    pRsp->status = TASK_DOWNSTREAM_NOT_LEADER;
    return;
  }

  // the acquire result is judged by the returned task pointer, not by the return code
  SStreamTask* pTarget = NULL;
  (void)streamMetaAcquireTask(pMeta, pReq->streamId, dstTaskId, &pTarget);

  if (pTarget == NULL) {
    pRsp->status = TASK_DOWNSTREAM_NOT_READY;
    stDebug("tq recv task check(taskId:0x%" PRIx64 "-0x%x not built yet) req(QID:0x%" PRIx64
            ") from task:0x%x (vgId:%d), rsp check_status %d",
            pReq->streamId, dstTaskId, pRsp->reqId, pRsp->upstreamTaskId, pRsp->upstreamNodeId, pRsp->status);
    return;
  }

  pRsp->status =
      streamTaskCheckStatus(pTarget, pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->stage, &pRsp->oldStage);

  SStreamTaskState curState = streamTaskGetStatus(pTarget);
  stDebug("s-task:%s status:%s, stage:%" PRId64 " recv task check req(QID:0x%" PRIx64
          ") task:0x%x (vgId:%d), check_status:%d",
          pTarget->id.idStr, curState.name, pRsp->oldStage, pRsp->reqId, pRsp->upstreamTaskId, pRsp->upstreamNodeId,
          pRsp->status);
  streamMetaReleaseTask(pMeta, pTarget);
}
21,212✔
186

187
// Handle one check response (pRsp) received by the upstream task pTask.
//
// Returns TSDB_CODE_INVALID_MSG when the response is addressed to another upstream task. Every other path
// returns success, including a response that cannot be matched to a recorded request.
int32_t streamTaskProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) {
  int64_t         recvTs = taosGetTimestampMs();
  const char*     id = pTask->id.idStr;
  STaskCheckInfo* pCheckInfo = &pTask->taskCheckInfo;
  int32_t         numOfDownstream = streamTaskGetNumOfDownstream(pTask);
  int32_t         notReady = -1;

  if (streamTaskShouldStop(pTask)) {
    stDebug("s-task:%s should stop, do not do check downstream again", id);
    return TSDB_CODE_SUCCESS;
  }

  if (pTask->id.taskId != pRsp->upstreamTaskId) {
    stError("s-task:%s invalid check downstream rsp, upstream task:0x%x discard", id, pRsp->upstreamTaskId);
    return TSDB_CODE_INVALID_MSG;
  }

  // the bookkeeping update is the same for every status; a response that cannot be recorded is dropped
  int32_t code = streamTaskUpdateCheckInfo(pCheckInfo, pRsp->downstreamTaskId, pRsp->status, recvTs, pRsp->reqId,
                                           &notReady, id);
  if (code != TSDB_CODE_SUCCESS) {
    return TSDB_CODE_SUCCESS;  // return success in any cases.
  }

  const int32_t rspStatus = pRsp->status;

  if (rspStatus == TASK_DOWNSTREAM_READY && notReady == 0) {
    processDownstreamReadyRsp(pTask);  // all downstream tasks are ready, set the complete check downstream flag
    streamTaskStopMonitorCheckRsp(pCheckInfo, id);
  } else if (rspStatus == TASK_UPSTREAM_NEW_STAGE) {
    stError("s-task:%s vgId:%d self vnode-transfer/leader-change/restart detected, old stage:%" PRId64
            ", current stage:%" PRId64 ", not check wait for downstream task nodeUpdate, and all tasks restart",
            id, pRsp->upstreamNodeId, pRsp->oldStage, pTask->pMeta->stage);
    (void)streamTaskAddIntoNodeUpdateList(pTask, pRsp->upstreamNodeId);
    streamMetaAddFailedTaskSelf(pTask, recvTs);
  } else if (rspStatus == TASK_DOWNSTREAM_NOT_LEADER) {
    stError(
        "s-task:%s downstream taskId:0x%x (vgId:%d) not leader, self dispatch epset needs to be updated, not check "
        "downstream again, nodeUpdate needed",
        id, pRsp->downstreamTaskId, pRsp->downstreamNodeId);
    (void)streamTaskAddIntoNodeUpdateList(pTask, pRsp->downstreamNodeId);
    streamMetaAddFailedTaskSelf(pTask, recvTs);
  } else {
    // either ready with other downstream tasks still pending, or not ready yet (the rsp-check monitor retries)
    stDebug("s-task:%s (vgId:%d) recv check rsp from task:0x%x (vgId:%d) status:%d, total:%d not ready:%d", id,
            pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status, numOfDownstream,
            notReady);
  }

  return 0;
}
246

247
int32_t streamTaskSendCheckRsp(const SStreamMeta* pMeta, int32_t vgId, SStreamTaskCheckRsp* pRsp,
21,200✔
248
                               SRpcHandleInfo* pRpcInfo, int32_t taskId) {
249
  SEncoder encoder;
250
  int32_t  code = 0;
21,200✔
251
  int32_t  len;
252

253
  tEncodeSize(tEncodeStreamTaskCheckRsp, pRsp, len, code);
21,200!
254
  if (code < 0) {
21,191!
255
    stError("vgId:%d failed to encode task check rsp, s-task:0x%x", pMeta->vgId, taskId);
×
256
    return TSDB_CODE_INVALID_MSG;
×
257
  }
258

259
  void* buf = rpcMallocCont(sizeof(SMsgHead) + len);
21,191✔
260
  if (buf == NULL) {
21,207!
261
    stError("s-task:0x%x vgId:%d failed prepare msg, %s at line:%d code:%s", taskId, pMeta->vgId, __func__, __LINE__,
×
262
            tstrerror(code));
263
    return terrno;
×
264
  }
265

266
  ((SMsgHead*)buf)->vgId = htonl(vgId);
21,207✔
267

268
  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
21,207✔
269
  tEncoderInit(&encoder, (uint8_t*)abuf, len);
21,207✔
270
  code = tEncodeStreamTaskCheckRsp(&encoder, pRsp);
21,195✔
271
  tEncoderClear(&encoder);
21,208✔
272

273
  SRpcMsg rspMsg = {.code = 0, .pCont = buf, .contLen = sizeof(SMsgHead) + len, .info = *pRpcInfo};
21,218✔
274
  tmsgSendRsp(&rspMsg);
21,218✔
275

276
  code = TMIN(code, 0);
21,219✔
277
  return code;
21,219✔
278
}
279

280
// Enter the check-downstream procedure of pTask and arm the timer that monitors the check responses.
// Nothing is started when the task is being dropped or when a check procedure is already running.
// All work is done while holding taskCheckInfo.checkInfoLock.
void streamTaskStartMonitorCheckRsp(SStreamTask* pTask) {
  STaskCheckInfo* pCheckInfo = &pTask->taskCheckInfo;
  int32_t         nodeVgId = pTask->pMeta->vgId;
  int64_t*        pRefId = NULL;

  streamMutexLock(&pCheckInfo->checkInfoLock);

  // drop procedure already started, not start check downstream now
  ETaskStatus curStatus = streamTaskGetStatus(pTask).state;
  if (curStatus == TASK_STATUS__DROPPING) {
    stDebug("s-task:%s task not in uninit status, status:%s not start monitor check-rsp", pTask->id.idStr,
            streamTaskGetStatusStr(curStatus));
    goto _unlock;
  }

  if (streamTaskStartCheckDownstream(pCheckInfo, pTask->id.idStr) != TSDB_CODE_SUCCESS) {
    goto _unlock;
  }

  streamTaskInitTaskCheckInfo(pCheckInfo, &pTask->outputInfo, taosGetTimestampMs());

  // NOTE(review): when the ref id cannot be allocated no timer is armed, while the in-check flag raised by
  // streamTaskStartCheckDownstream() stays set -- confirm another path clears it.
  if (streamTaskAllocRefId(pTask, &pRefId) == 0) {
    streamTmrStart(rspMonitorFn, CHECK_RSP_CHECK_INTERVAL, pRefId, streamTimer, &pCheckInfo->checkRspTmr, nodeVgId,
                   "check-status-monitor");
  }

_unlock:
  streamMutexUnlock(&pCheckInfo->checkInfoLock);
}
312

313
// Raise the stop flag of the check-rsp monitor. The flag is written under checkInfoLock; id is only used
// for the debug log emitted after the lock has been released.
void streamTaskStopMonitorCheckRsp(STaskCheckInfo* pInfo, const char* id) {
  streamMutexLock(&pInfo->checkInfoLock);
  {
    pInfo->stopCheckProcess = 1;
  }
  streamMutexUnlock(&pInfo->checkInfoLock);

  stDebug("s-task:%s set stop check-rsp monitor flag", id);
}
13,597✔
320

321
// Release everything owned by pInfo: the request list, the check-rsp timer (if one was created) and the lock.
// The list and timer fields are reset to NULL after being released.
void streamTaskCleanupCheckInfo(STaskCheckInfo* pInfo) {
  // request/response bookkeeping list
  taosArrayDestroy(pInfo->pList);
  pInfo->pList = NULL;

  // monitor timer, only present when a check procedure was started
  if (NULL != pInfo->checkRspTmr) {
    streamTmrStop(pInfo->checkRspTmr);
    pInfo->checkRspTmr = NULL;
  }

  streamMutexDestroy(&pInfo->checkInfoLock);
}
55,032✔
332

333
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
334
// Called once all downstream tasks of pTask are ready (or there are none).
// Steps, in order:
//   1. report the init event (plain or scan-history variant) as succeeded to the task state machine
//   2. record the launch result in the meta
//   3. when the task status is HALT, send the halt event
//   4. launch the related fill-history task, if there is one
// Failures are logged; none of them stops the remaining steps.
void processDownstreamReadyRsp(SStreamTask* pTask) {
  const char* id = pTask->id.idStr;

  EStreamTaskEvent initEvent;
  if (pTask->info.fillHistory == 0) {
    initEvent = TASK_EVENT_INIT;
  } else {
    initEvent = TASK_EVENT_INIT_SCANHIST;
  }

  int32_t ret = streamTaskOnHandleEventSuccess(pTask->status.pSM, initEvent, NULL, NULL);
  if (ret) {
    stError("s-task:%s failed to set event succ, code:%s", id, tstrerror(ret));
  }

  // the timestamps are read only after the event has been handled
  ret = streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, pTask->execInfo.checkTs,
                                      pTask->execInfo.readyTs, true);
  if (ret) {
    stError("s-task:%s failed to record the downstream task status, code:%s", id, tstrerror(ret));
  }

  if (pTask->status.taskStatus == TASK_STATUS__HALT) {
    if (!HAS_RELATED_FILLHISTORY_TASK(pTask) || (pTask->info.fillHistory != 0)) {
      stError("s-task:%s status:halt fillhistory:%d not handle the ready rsp", id, pTask->info.fillHistory);
    }

    // halt it self for count window stream task until the related fill history task completed.
    stDebug("s-task:%s level:%d initial status is %s from mnode, set it to be halt", id, pTask->info.taskLevel,
            streamTaskGetStatusStr(pTask->status.taskStatus));

    ret = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_HALT);
    if (ret != 0) {  // todo: handle error
      stError("s-task:%s failed to handle halt event, code:%s", id, tstrerror(ret));
    }
  }

  // start the related fill-history task, when current task is ready
  // not invoke in success callback due to the deadlock.
  // todo: let's retry
  if (!HAS_RELATED_FILLHISTORY_TASK(pTask)) {
    return;
  }

  stDebug("s-task:%s try to launch related fill-history task", id);
  ret = streamLaunchFillHistoryTask(pTask);
  if (ret) {
    stError("s-task:%s failed to launch history task, code:%s", id, tstrerror(ret));
  }
}
13,493✔
374

375
// Record nodeId in pTask->outputInfo.pNodeEpsetUpdateList unless it is already there.
// The list is searched and extended while holding pTask->lock.
// Returns 0 when the node is recorded (now or previously), or terrno when the entry cannot be appended.
int32_t streamTaskAddIntoNodeUpdateList(SStreamTask* pTask, int32_t nodeId) {
  int32_t selfVgId = pTask->pMeta->vgId;
  int32_t code = 0;

  streamMutexLock(&pTask->lock);

  int32_t numOfEntries = taosArrayGetSize(pTask->outputInfo.pNodeEpsetUpdateList);

  // idx stops before numOfEntries only when a matching entry is found
  int32_t idx = 0;
  while (idx < numOfEntries) {
    SDownstreamTaskEpset* pEntry = taosArrayGet(pTask->outputInfo.pNodeEpsetUpdateList, idx);
    if (pEntry != NULL && pEntry->nodeId == nodeId) {
      break;
    }
    idx += 1;
  }

  if (idx >= numOfEntries) {
    SDownstreamTaskEpset newEntry = {.nodeId = nodeId};

    if (taosArrayPush(pTask->outputInfo.pNodeEpsetUpdateList, &newEntry) == NULL) {
      code = terrno;
      stError("s-task:%s vgId:%d failed to update epset, code:%s", pTask->id.idStr, selfVgId, tstrerror(code));
    } else {
      stInfo("s-task:%s vgId:%d downstream nodeId:%d needs to be updated, total needs updated:%d", pTask->id.idStr,
             selfVgId, newEntry.nodeId, (numOfEntries + 1));
    }
  }

  streamMutexUnlock(&pTask->lock);
  return code;
}
411

412
// Reset pInfo for a new check round that starts at startTs.
// The request list is cleared and the number of not-ready tasks is set to 1 for fixed dispatch or to the
// number of vgroups for shuffle dispatch; for any other output type the counter is left untouched.
void streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInfo* pOutputInfo, int64_t startTs) {
  taosArrayClear(pInfo->pList);

  switch (pOutputInfo->type) {
    case TASK_OUTPUT__FIXED_DISPATCH:
      pInfo->notReadyTasks = 1;
      break;
    case TASK_OUTPUT__SHUFFLE_DISPATCH:
      pInfo->notReadyTasks = taosArrayGetSize(pOutputInfo->shuffleDispatcher.dbInfo.pVgroupInfos);
      break;
    default:
      break;
  }

  // both clocks start together; the stop flag is lowered for the new round
  pInfo->timeoutStartTs = startTs;
  pInfo->startTs = startTs;
  pInfo->stopCheckProcess = 0;
}
6,892✔
425

426
// Look up the status record of downstream task taskId in pInfo->pList.
//
// pInfo       check info whose list is searched
// taskId      downstream task id to look for
// pStatusInfo [out] set to the matching record, or to NULL when there is none; a NULL pStatusInfo makes the
//             call a no-op
//
// The search stops at the first match: streamTaskAddReqInfo() refuses to add a second record for the same
// task id, so there is at most one.
void findCheckRspStatus(STaskCheckInfo* pInfo, int32_t taskId, SDownstreamStatusInfo** pStatusInfo) {
  if (pStatusInfo == NULL) {
    return;
  }

  *pStatusInfo = NULL;

  // the list is not modified while it is scanned, so its size is read once
  int32_t num = taosArrayGetSize(pInfo->pList);
  for (int32_t j = 0; j < num; ++j) {
    SDownstreamStatusInfo* p = taosArrayGet(pInfo->pList, j);
    if (p == NULL) {
      continue;
    }

    if (p->taskId == taskId) {
      *pStatusInfo = p;
      return;
    }
  }
}
443

444
// Store a check response in the record of downstream task taskId.
//
// pInfo     check info of the upstream task; the update is done under checkInfoLock
// taskId    downstream task that sent the response
// status    status carried by the response
// rspTs     time the response was received
// reqId     request id carried by the response; it must equal the id recorded for the task
// pNotReady [out] number of downstream tasks still not ready after this update (written on success only)
// id        task id string used in log messages
//
// Returns TSDB_CODE_SUCCESS when the record is updated, TSDB_CODE_FAILED when no record exists for taskId or
// when reqId does not match the recorded request (an expired response).
int32_t streamTaskUpdateCheckInfo(STaskCheckInfo* pInfo, int32_t taskId, int32_t status, int64_t rspTs, int64_t reqId,
                                  int32_t* pNotReady, const char* id) {
  SDownstreamStatusInfo* p = NULL;

  streamMutexLock(&pInfo->checkInfoLock);
  findCheckRspStatus(pInfo, taskId, &p);

  if (p == NULL) {
    streamMutexUnlock(&pInfo->checkInfoLock);
    stError("s-task:%s unexpected check rsp msg, invalid downstream task:0x%x, QID:0x%" PRIx64 ", discarded", id,
            taskId, reqId);
    return TSDB_CODE_FAILED;
  }

  if (reqId != p->reqId) {
    stError("s-task:%s QID:0x%" PRIx64 " expected:0x%" PRIx64
            " expired check-rsp recv from downstream task:0x%x, discarded",
            id, reqId, p->reqId, taskId);
    streamMutexUnlock(&pInfo->checkInfoLock);
    return TSDB_CODE_FAILED;
  }

  // subtract one not-ready-task, since it is ready now
  if ((p->status != TASK_DOWNSTREAM_READY) && (status == TASK_DOWNSTREAM_READY)) {
    *pNotReady = atomic_sub_fetch_32(&pInfo->notReadyTasks, 1);
  } else {
    *pNotReady = pInfo->notReadyTasks;
  }

  p->status = status;
  p->rspTs = rspTs;

  streamMutexUnlock(&pInfo->checkInfoLock);
  return TSDB_CODE_SUCCESS;
}
478

479
// Raise the in-check flag of pInfo.
// Returns TSDB_CODE_FAILED when a check procedure is already running (the stop flag is lowered in that
// case), TSDB_CODE_SUCCESS otherwise. No lock is taken here.
int32_t streamTaskStartCheckDownstream(STaskCheckInfo* pInfo, const char* id) {
  if (pInfo->inCheckProcess != 0) {
    stError("s-task:%s already in check procedure, checkTs:%" PRId64 ", start monitor check rsp failed", id,
            pInfo->startTs);
    pInfo->stopCheckProcess = 0;  // disable auto stop of check process
    return TSDB_CODE_FAILED;
  }

  pInfo->inCheckProcess = 1;
  stDebug("s-task:%s set the in check-rsp flag", id);
  return TSDB_CODE_SUCCESS;
}
492

493
// Leave the check-downstream procedure: every counter, timestamp and flag of pInfo is reset and the request
// list is cleared. Only a debug message is emitted when no procedure is running.
// lock tells whether this function must take checkInfoLock itself; pass false when the caller holds it.
void streamTaskCompleteCheckRsp(STaskCheckInfo* pInfo, bool lock, const char* id) {
  if (lock) {
    streamMutexLock(&pInfo->checkInfoLock);
  }

  if (!pInfo->inCheckProcess) {
    stDebug("s-task:%s already not in check-rsp procedure", id);
  } else {
    int64_t elapsed = 0;
    if (pInfo->startTs != 0) {
      elapsed = taosGetTimestampMs() - pInfo->startTs;
    }
    stDebug("s-task:%s clear the in check-rsp flag, set the check-rsp done, elapsed time:%" PRId64 " ms", id, elapsed);

    // timestamps
    pInfo->startTs = 0;
    pInfo->timeoutStartTs = 0;

    // progress flags and counters
    pInfo->notReadyTasks = 0;
    pInfo->inCheckProcess = 0;
    pInfo->stopCheckProcess = 0;
    pInfo->notReadyRetryCount = 0;
    pInfo->timeoutRetryCount = 0;

    taosArrayClear(pInfo->pList);
  }

  if (lock) {
    streamMutexUnlock(&pInfo->checkInfoLock);
  }
}
6,498✔
520

521
// todo: retry until success
522
// Record that a check request (reqId) has been issued to downstream task taskId in vgroup vgId.
// The new record starts with status -1 and rspTs 0, i.e. "no response yet". Nothing is added when a record
// for taskId already exists. The list is accessed under checkInfoLock.
// A failure to append the record is logged; the caller is not informed.
void streamTaskAddReqInfo(STaskCheckInfo* pInfo, int64_t reqId, int32_t taskId, int32_t vgId, const char* id) {
  SDownstreamStatusInfo info = {.taskId = taskId, .status = -1, .vgId = vgId, .reqId = reqId, .rspTs = 0};
  streamMutexLock(&pInfo->checkInfoLock);

  SDownstreamStatusInfo* p = NULL;
  findCheckRspStatus(pInfo, taskId, &p);
  if (p != NULL) {
    stDebug("s-task:%s check info to task:0x%x already sent", id, taskId);
    streamMutexUnlock(&pInfo->checkInfoLock);
    return;
  }

  void* px = taosArrayPush(pInfo->pList, &info);
  if (px == NULL) {
    // todo: retry
    // without this record the response of the task is rejected as unexpected, so leave a trace
    stError("s-task:%s failed to record check req info for downstream task:0x%x(vgId:%d), code:%s", id, taskId, vgId,
            tstrerror(terrno));
  }

  streamMutexUnlock(&pInfo->checkInfoLock);
}
541

542
// Re-send the check request for the downstream task described by p.
// A fresh request id is generated and stored in p before the message is built. For shuffle dispatch the
// message goes to the vgroup whose task id equals p->taskId; nothing is sent when no vgroup matches.
// Returns the result of streamSendCheckMsg(), or 0 when nothing was sent.
int32_t doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p) {
  const char* id = pTask->id.idStr;
  int32_t     ret = 0;

  SStreamTaskCheckReq req = {
      .streamId = pTask->id.streamId,
      .upstreamTaskId = pTask->id.taskId,
      .upstreamNodeId = pTask->info.nodeId,
      .childId = pTask->info.selfChildId,
      .stage = pTask->pMeta->stage,
  };

  // update the reqId for the new check msg
  p->reqId = tGenIdPI64();

  STaskOutputInfo* pOut = &pTask->outputInfo;

  if (pOut->type == TASK_OUTPUT__FIXED_DISPATCH) {
    STaskDispatcherFixed* pFixed = &pOut->fixedDispatcher;
    setCheckDownstreamReqInfo(&req, p->reqId, pFixed->taskId, pFixed->nodeId);

    stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " re-send check downstream task:0x%x(vgId:%d) QID:0x%" PRIx64, id,
            pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, req.reqId);

    ret = streamSendCheckMsg(pTask, &req, pOut->fixedDispatcher.nodeId, &pOut->fixedDispatcher.epSet);
  } else if (pOut->type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
    SArray* pVgList = pOut->shuffleDispatcher.dbInfo.pVgroupInfos;
    int32_t vgCount = taosArrayGetSize(pVgList);

    for (int32_t idx = 0; idx < vgCount; idx++) {
      SVgroupInfo* pVg = taosArrayGet(pVgList, idx);
      if (pVg == NULL || p->taskId != pVg->taskId) {
        continue;
      }

      // found the vgroup that hosts the target task: send once and stop searching
      setCheckDownstreamReqInfo(&req, p->reqId, pVg->taskId, pVg->vgId);

      stDebug("s-task:%s (vgId:%d) stage:%" PRId64
              " re-send check downstream task:0x%x(vgId:%d) (shuffle), idx:%d QID:0x%" PRIx64,
              id, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, idx, p->reqId);
      ret = streamSendCheckMsg(pTask, &req, pVg->vgId, &pVg->epSet);
      break;
    }
  }

  if (ret) {
    stError("s-task:%s failed to send check msg to downstream, code:%s", pTask->id.idStr, tstrerror(ret));
  }
  return ret;
}
593

594
// Classify every recorded downstream task of pInfo:
//   - ready                      -> *numOfReady is incremented
//   - NEW_STAGE / NOT_LEADER     -> *numOfFault is incremented
//   - no response yet (rspTs==0) -> taskId appended to pTimeoutList once el has reached
//                                   CHECK_NOT_RSP_DURATION, otherwise *numOfNotRsp is incremented
//   - responded but not ready    -> taskId appended to pNotReadyList
// el is the elapsed time the timeout is measured against; the counters are only ever incremented, so the
// caller is expected to initialise them.
void getCheckRspStatus(STaskCheckInfo* pInfo, int64_t el, int32_t* numOfReady, int32_t* numOfFault,
                       int32_t* numOfNotRsp, SArray* pTimeoutList, SArray* pNotReadyList, const char* id) {
  int32_t numOfRecords = taosArrayGetSize(pInfo->pList);

  for (int32_t idx = 0; idx < numOfRecords; ++idx) {
    SDownstreamStatusInfo* pRec = taosArrayGet(pInfo->pList, idx);
    if (pRec == NULL) {
      continue;
    }

    if (pRec->status == TASK_DOWNSTREAM_READY) {
      (*numOfReady) += 1;
      continue;
    }

    if (pRec->status == TASK_UPSTREAM_NEW_STAGE || pRec->status == TASK_DOWNSTREAM_NOT_LEADER) {
      stDebug("s-task:%s recv status:NEW_STAGE/NOT_LEADER from downstream, task:0x%x, quit from check downstream", id,
              pRec->taskId);
      (*numOfFault) += 1;
      continue;
    }

    // TASK_DOWNSTREAM_NOT_READY
    if (pRec->rspTs != 0) {  // responded, but not ready
      if (taosArrayPush(pNotReadyList, &pRec->taskId) == NULL) {
        stError("s-task:%s failed to record not ready task:0x%x", id, pRec->taskId);
      }
      continue;
    }

    // not response yet
    if (el < CHECK_NOT_RSP_DURATION) {
      (*numOfNotRsp) += 1;  // do nothing and continue waiting for their rsp
      continue;
    }

    // not receive info for 10 sec.
    if (taosArrayPush(pTimeoutList, &pRec->taskId) == NULL) {
      stError("s-task:%s failed to record time out task:0x%x", id, pRec->taskId);
    }
  }
}
2,197✔
627

628
// Fill the per-destination fields of a check request: the destination task/node ids and the request id.
void setCheckDownstreamReqInfo(SStreamTaskCheckReq* pReq, int64_t reqId, int32_t dstTaskId, int32_t dstNodeId) {
  pReq->downstreamNodeId = dstNodeId;
  pReq->downstreamTaskId = dstTaskId;
  pReq->reqId = reqId;
}
21,321✔
633

634
// Handle the downstream tasks listed (by task id) in pTimeoutList, i.e. those that never answered.
// The timeout clock is restarted and the check request is re-sent to every listed task whose record is
// still in the "no response" state (status -1, rspTs 0). Once the retry counter exceeds 10 it is reset and
// the vgroup of every listed task is added to the node-update list instead.
void handleTimeoutDownstreamTasks(SStreamTask* pTask, SArray* pTimeoutList) {
  STaskCheckInfo* pCheckInfo = &pTask->taskCheckInfo;
  const char*     id = pTask->id.idStr;
  int32_t         selfVgId = pTask->pMeta->vgId;
  int32_t         timeoutNum = taosArrayGetSize(pTimeoutList);

  pCheckInfo->timeoutStartTs = taosGetTimestampMs();

  for (int32_t i = 0; i < timeoutNum; ++i) {
    int32_t* pId = taosArrayGet(pTimeoutList, i);
    if (pId == NULL) {
      continue;
    }

    SDownstreamStatusInfo* pRec = NULL;
    findCheckRspStatus(pCheckInfo, *pId, &pRec);
    if (pRec == NULL) {
      continue;
    }

    if (pRec->status != -1 || pRec->rspTs != 0) {
      stError("s-task:%s invalid rsp record entry, index:%d, status:%d, rspTs:%" PRId64, id, i, pRec->status,
              pRec->rspTs);
      continue;
    }

    // send failures are logged by doSendCheckMsg itself
    (void)doSendCheckMsg(pTask, pRec);
  }

  pCheckInfo->timeoutRetryCount += 1;

  if (pCheckInfo->timeoutRetryCount <= 10) {
    stDebug("s-task:%s vgId:%d %d downstream task(s) timeout, send check msg again, retry:%d start time:%" PRId64, id,
            selfVgId, timeoutNum, pCheckInfo->timeoutRetryCount, pCheckInfo->timeoutStartTs);
    return;
  }

  // timeout more than 100 sec, add into node update list
  pCheckInfo->timeoutRetryCount = 0;

  for (int32_t i = 0; i < timeoutNum; ++i) {
    int32_t* pId = taosArrayGet(pTimeoutList, i);
    if (pId == NULL) {
      continue;
    }

    SDownstreamStatusInfo* pRec = NULL;
    findCheckRspStatus(pCheckInfo, *pId, &pRec);
    if (pRec == NULL) {
      continue;
    }

    (void)streamTaskAddIntoNodeUpdateList(pTask, pRec->vgId);
    stDebug("s-task:%s vgId:%d downstream task:0x%x (vgId:%d) timeout more than 100sec, add into nodeUpdate list",
            id, selfVgId, pRec->taskId, pRec->vgId);
  }

  stDebug("s-task:%s vgId:%d %d downstream task(s) all add into nodeUpate list", id, selfVgId, timeoutNum);
}
4✔
688

689
// Re-send the check message to every downstream task listed in pNotReadyList.
//
// pTask:         the stream task whose taskCheckInfo holds the per-downstream check records.
// pNotReadyList: array whose elements are read as int32_t downstream task ids.
//
// For each id that has a matching record, the record's rspTs/status are reset to 0/-1 before the
// check message is sent again (NOTE(review): 0/-1 looks like the "no response yet" state, since the
// timeout handler treats any other combination as an invalid record -- confirm). A failure to send
// is logged and does not stop the remaining downstream tasks from being retried. The
// notReadyRetryCount of the check info is increased by one per invocation.
void handleNotReadyDownstreamTask(SStreamTask* pTask, SArray* pNotReadyList) {
  STaskCheckInfo* pInfo = &pTask->taskCheckInfo;
  const char*     id = pTask->id.idStr;
  int32_t         vgId = pTask->pMeta->vgId;
  int32_t         numOfNotReady = (int32_t)taosArrayGetSize(pNotReadyList);

  // reset the info, and send the check msg to failure downstream again
  for (int32_t i = 0; i < numOfNotReady; ++i) {
    int32_t* pTaskId = taosArrayGet(pNotReadyList, i);
    if (pTaskId == NULL) {
      continue;
    }

    SDownstreamStatusInfo* p = NULL;
    findCheckRspStatus(pInfo, *pTaskId, &p);
    if (p == NULL) {  // no check record for this downstream task, nothing to reset
      continue;
    }

    p->rspTs = 0;
    p->status = -1;

    // do not silently drop a send failure; report it and keep going with the other downstream tasks
    int32_t code = doSendCheckMsg(pTask, p);
    if (code) {
      stError("s-task:%s vgId:%d failed to send check msg to downstream task:0x%x (vgId:%d), code:%s", id, vgId,
              p->taskId, p->vgId, tstrerror(code));
    }
  }

  pInfo->notReadyRetryCount += 1;
  stDebug("s-task:%s vgId:%d %d downstream task(s) not ready, send check msg again, retry:%d start time:%" PRId64, id,
          vgId, numOfNotReady, pInfo->notReadyRetryCount, pInfo->startTs);
}
1,117✔
715

716
// the action of add status may incur the restart procedure, which should NEVER be executed in the timer thread.
717
// The restart of all tasks requires that all tasks should not have active timer for now. Therefore, the execution
718
// of restart in timer thread will result in a deadlock.
UNCOV
719
int32_t addDownstreamFailedStatusResultAsync(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int32_t taskId) {
×
UNCOV
720
  return streamTaskSchedTask(pMsgCb, vgId, streamId, taskId, STREAM_EXEC_T_ADD_FAILED_TASK);
×
721
}
722

723
// Common exit path of the rsp monitor: releases the task back to its meta, destroys the two temporary
// id lists and frees the ref-id holder passed as param.
// Callers in this file pass NULL lists on early exits and a NULL param when the holder must stay alive.
static void doCleanup(SStreamTask* pTask, SArray* pNotReadyList, SArray* pTimeoutList, void* param) {
  streamMetaReleaseTask(pTask->pMeta, pTask);

  // the two lists are independent of each other
  taosArrayDestroy(pTimeoutList);
  taosArrayDestroy(pNotReadyList);

  streamTaskFreeRefId(param);
}
8,664✔
730

731
// this function is executed in timer thread
732
void rspMonitorFn(void* param, void* tmrId) {
8,667✔
733
  int32_t         numOfReady = 0;
8,667✔
734
  int32_t         numOfFault = 0;
8,667✔
735
  int32_t         numOfNotRsp = 0;
8,667✔
736
  int32_t         numOfNotReady = 0;
8,667✔
737
  int32_t         numOfTimeout = 0;
8,667✔
738
  int64_t         taskRefId = *(int64_t*)param;
8,667✔
739
  int64_t         now = taosGetTimestampMs();
8,667✔
740
  SArray*         pNotReadyList = NULL;
8,667✔
741
  SArray*         pTimeoutList = NULL;
8,667✔
742
  SStreamMeta*    pMeta = NULL;
8,667✔
743
  STaskCheckInfo* pInfo = NULL;
8,667✔
744
  int32_t         vgId = -1;
8,667✔
745
  int64_t         timeoutDuration = 0;
8,667✔
746
  const char*     id = NULL;
8,667✔
747
  int32_t         total = 0;
8,667✔
748

749
  SStreamTask* pTask = taosAcquireRef(streamTaskRefPool, taskRefId);
8,667✔
750
  if (pTask == NULL) {
8,667✔
751
    stError("invalid task rid:%" PRId64 " failed to acquired stream-task at %s", taskRefId, __func__);
3!
752
    streamTaskFreeRefId(param);
3✔
753
    return;
6,501✔
754
  }
755

756
  pMeta = pTask->pMeta;
8,664✔
757
  pInfo = &pTask->taskCheckInfo;
8,664✔
758
  vgId = pTask->pMeta->vgId;
8,664✔
759
  timeoutDuration = now - pInfo->timeoutStartTs;
8,664✔
760
  id = pTask->id.idStr;
8,664✔
761
  total = (int32_t) taosArrayGetSize(pInfo->pList);
8,664✔
762

763
  stDebug("s-task:%s start to do check-downstream-rsp check in tmr", id);
8,664✔
764

765
  streamMutexLock(&pTask->lock);
8,664✔
766
  SStreamTaskState state = streamTaskGetStatus(pTask);
8,664✔
767
  streamMutexUnlock(&pTask->lock);
8,664✔
768

769
  if (state.state == TASK_STATUS__STOP) {
8,664!
UNCOV
770
    stDebug("s-task:%s status:%s vgId:%d quit from monitor check-rsp tmr", id, state.name, vgId);
×
UNCOV
771
    streamTaskCompleteCheckRsp(pInfo, true, id);
×
772

773
    // not record the failure of the current task if try to close current vnode
774
    // otherwise, the put of message operation may incur invalid read of message queue.
UNCOV
775
    if (!pMeta->closeFlag) {
×
UNCOV
776
      int32_t code = addDownstreamFailedStatusResultAsync(pTask->pMsgCb, vgId, pTask->id.streamId, pTask->id.taskId);
×
UNCOV
777
      if (code) {
×
778
        stError("s-task:%s failed to create async record start failed task, code:%s", id, tstrerror(code));
×
779
      }
780
    }
781

UNCOV
782
    doCleanup(pTask, pNotReadyList, pTimeoutList, param);
×
UNCOV
783
    return;
×
784
  }
785

786
  if (state.state == TASK_STATUS__DROPPING || state.state == TASK_STATUS__READY) {
8,664✔
787
    stDebug("s-task:%s status:%s vgId:%d quit from monitor check-rsp tmr", id, state.name, vgId);
3,601✔
788

789
    streamTaskCompleteCheckRsp(pInfo, true, id);
3,601✔
790
    doCleanup(pTask, pNotReadyList, pTimeoutList, param);
3,601✔
791
    return;
3,601✔
792
  }
793

794
  streamMutexLock(&pInfo->checkInfoLock);
5,063✔
795
  if (pInfo->notReadyTasks == 0) {
5,063✔
796
    stDebug("s-task:%s status:%s vgId:%d all downstream ready, quit from monitor rsp tmr", id, state.name, vgId);
2,866✔
797

798
    streamTaskCompleteCheckRsp(pInfo, false, id);
2,866✔
799
    streamMutexUnlock(&pInfo->checkInfoLock);
2,866✔
800
    doCleanup(pTask, pNotReadyList, pTimeoutList, param);
2,866✔
801
    return;
2,866✔
802
  }
803

804
  pNotReadyList = taosArrayInit(4, sizeof(int64_t));
2,197✔
805
  pTimeoutList = taosArrayInit(4, sizeof(int64_t));
2,197✔
806

807
  if (state.state == TASK_STATUS__UNINIT) {
2,197!
808
    getCheckRspStatus(pInfo, timeoutDuration, &numOfReady, &numOfFault, &numOfNotRsp, pTimeoutList, pNotReadyList, id);
2,197✔
809

810
    numOfNotReady = (int32_t)taosArrayGetSize(pNotReadyList);
2,197✔
811
    numOfTimeout = (int32_t)taosArrayGetSize(pTimeoutList);
2,197✔
812

813
    // fault tasks detected, not try anymore
814
    bool jumpOut = false;
2,197✔
815
    if ((numOfReady + numOfFault + numOfNotReady + numOfTimeout + numOfNotRsp) != total) {
2,197!
816
      stError(
×
817
          "s-task:%s vgId:%d internal error in handling the check downstream procedure, rsp number is inconsistent, "
818
          "stop rspMonitor tmr, total:%d, notRsp:%d, notReady:%d, fault:%d, timeout:%d, ready:%d",
819
          id, vgId, total, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady);
820
      jumpOut = true;
×
821
    }
822

823
    if (numOfFault > 0) {
2,197✔
824
      stDebug(
31!
825
          "s-task:%s status:%s vgId:%d all rsp. quit from monitor rsp tmr, since vnode-transfer/leader-change/restart "
826
          "detected, total:%d, notRsp:%d, notReady:%d, fault:%d, timeout:%d, ready:%d",
827
          id, state.name, vgId, total, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady);
828
      jumpOut = true;
31✔
829
    }
830

831
    if (jumpOut) {
2,197✔
832
      streamTaskCompleteCheckRsp(pInfo, false, id);
31✔
833
      streamMutexUnlock(&pInfo->checkInfoLock);
31✔
834
      doCleanup(pTask, pNotReadyList, pTimeoutList, param);
31✔
835
      return;
31✔
836
    }
837
  } else {  // unexpected status
838
    stError("s-task:%s unexpected task status:%s during waiting for check rsp", id, state.name);
×
839
  }
840

841
  // checking of downstream tasks has been stopped by other threads
842
  if (pInfo->stopCheckProcess == 1) {
2,166!
843
    stDebug(
×
844
        "s-task:%s status:%s vgId:%d stopped by other threads to check downstream process, total:%d, notRsp:%d, "
845
        "notReady:%d, fault:%d, timeout:%d, ready:%d",
846
        id, state.name, vgId, total, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady);
847

848
    streamTaskCompleteCheckRsp(pInfo, false, id);
×
849
    streamMutexUnlock(&pInfo->checkInfoLock);
×
850

851
    int32_t code = addDownstreamFailedStatusResultAsync(pTask->pMsgCb, vgId, pTask->id.streamId, pTask->id.taskId);
×
852
    if (code) {
×
853
      stError("s-task:%s failed to create async record start failed task, code:%s", id, tstrerror(code));
×
854
    }
855

856
    doCleanup(pTask, pNotReadyList, pTimeoutList, param);
×
857
    return;
×
858
  }
859

860
  if (numOfNotReady > 0) {  // check to make sure not in recheck timer
2,166✔
861
    handleNotReadyDownstreamTask(pTask, pNotReadyList);
1,117✔
862
  }
863

864
  if (numOfTimeout > 0) {
2,166✔
865
    handleTimeoutDownstreamTasks(pTask, pTimeoutList);
4✔
866
  }
867

868
  streamTmrStart(rspMonitorFn, CHECK_RSP_CHECK_INTERVAL, param, streamTimer, &pInfo->checkRspTmr, vgId,
2,166✔
869
                 "check-status-monitor");
870
  streamMutexUnlock(&pInfo->checkInfoLock);
2,166✔
871

872
  stDebug(
2,166✔
873
      "s-task:%s vgId:%d continue checking rsp in 300ms, total:%d, notRsp:%d, notReady:%d, fault:%d, timeout:%d, "
874
      "ready:%d",
875
      id, vgId, total, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady);
876
  doCleanup(pTask, pNotReadyList, pTimeoutList, NULL);
2,166✔
877
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc