• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3653

14 Mar 2025 08:10AM UTC coverage: 22.565% (-41.0%) from 63.596%
#3653

push

travis-ci

web-flow
feat(keep): support keep on super table level. (#30097)

* Feat: support use keep while create super table.

* Test(keep): add test for create super table with keep option.

* Feat(keep): Add tmsg for create keep.

* Feat(keep): support alter table option keep.

* Fix(keep): Add basic test for alter table option.

* Fix(keep): memory leak.

* Feat(keep): add keep to metaEntry&metaCache and fix earliestTs with stb keep.

* Test(keep): add some cases for select with stb keep.

* Fix: fix ci core while alter stb.

* Feat(keep): delete expired data in super table level.

* Feat: remove get stb keep while query.

* Fix : build error.

* Revert "Fix : build error."

This reverts commit 0ed66e4e8.

* Revert "Feat(keep): delete expired data in super table level."

This reverts commit 36330f6b4.

* Fix : build errors.

* Feat : support restart taosd.

* Fix : alter table comment problems.

* Test : add tests for super table keep.

* Fix: change sdb stb reserve size.

* Test: add more tests.

* Feat: Prevent normal tables and sub-tables from setting the keep parameter

* Fix: add more checks to avoid unknown address.

* Docs: Add docs for stable keep.

* Fix: some review changes.

* Fix: review errors.

49248 of 302527 branches covered (16.28%)

Branch coverage included in aggregate %.

53 of 99 new or added lines in 12 files covered. (53.54%)

155872 existing lines in 443 files now uncovered.

87359 of 302857 relevant lines covered (28.84%)

570004.22 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.74
/source/libs/stream/src/streamCheckStatus.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "cos.h"
17
#include "rsync.h"
18
#include "streamBackendRocksdb.h"
19
#include "streamInt.h"
20

21
#define CHECK_NOT_RSP_DURATION 60 * 1000  // 60 sec
22

23
static void    processDownstreamReadyRsp(SStreamTask* pTask);
24
static void    rspMonitorFn(void* param, void* tmrId);
25
static void    streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInfo* pOutputInfo, int64_t startTs);
26
static int32_t streamTaskStartCheckDownstream(STaskCheckInfo* pInfo, const char* id);
27
static void    streamTaskCompleteCheckRsp(STaskCheckInfo* pInfo, bool lock, const char* id);
28
static void    streamTaskAddReqInfo(STaskCheckInfo* pInfo, int64_t reqId, int32_t taskId, int32_t vgId, const char* id);
29
static int32_t doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p);
30
static void    handleTimeoutDownstreamTasks(SStreamTask* pTask, SArray* pTimeoutList);
31
static void    handleNotReadyDownstreamTask(SStreamTask* pTask, SArray* pNotReadyList);
32
static int32_t streamTaskUpdateCheckInfo(STaskCheckInfo* pInfo, int32_t taskId, int32_t status, int64_t rspTs,
33
                                         int64_t reqId, int32_t* pNotReady, const char* id);
34
static void setCheckDownstreamReqInfo(SStreamTaskCheckReq* pReq, int64_t reqId, int32_t dstTaskId, int32_t dstNodeId);
35
static void getCheckRspStatus(STaskCheckInfo* pInfo, int64_t el, int32_t* numOfReady, int32_t* numOfFault,
36
                              int32_t* numOfNotRsp, SArray* pTimeoutList, SArray* pNotReadyList, const char* id);
37
static int32_t addDownstreamFailedStatusResultAsync(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int32_t taskId);
38
static void    findCheckRspStatus(STaskCheckInfo* pInfo, int32_t taskId, SDownstreamStatusInfo** pStatusInfo);
39

UNCOV
40
/*
 * Downstream side of the check handshake: decide how this task answers a check request sent by upstream task
 * `upstreamTaskId` (on `vgId`) that carries the upstream's `stage`.
 *
 * The stage previously recorded for that upstream is written to `*oldStage`. A recorded stage of -1 means "not seen
 * yet" and is replaced by the incoming one. An incoming stage newer than the recorded one marks the current
 * checkpoint as failed.
 *
 * Returns:
 *   TSDB_CODE_STREAM_TASK_NOT_EXIST  upstream task is not registered on this task
 *   0                                incoming stage is -1
 *   TASK_UPSTREAM_NEW_STAGE          recorded and incoming stage differ
 *   TASK_DOWNSTREAM_NOT_READY        this task's own downstream is not ready yet
 *   TASK_DOWNSTREAM_READY            otherwise
 */
int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_t vgId, int64_t stage,
                              int64_t* oldStage) {
  SStreamUpstreamEpInfo* pUpstream = NULL;
  streamTaskGetUpstreamTaskEpInfo(pTask, upstreamTaskId, &pUpstream);
  if (pUpstream == NULL) {
    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  *oldStage = pUpstream->stage;

  const char* taskStr = pTask->id.idStr;

  if (stage == -1) {
    // NOTE(review): the log says "not ready" but 0 is returned; confirm 0 is the intended status code here.
    stDebug("s-task:%s receive check msg from upstream task:0x%x(vgId:%d), invalid stageId:%" PRId64 ", not ready",
            taskStr, upstreamTaskId, vgId, stage);
    return 0;
  }

  bool firstCheck = (pUpstream->stage == -1);
  if (firstCheck) {
    pUpstream->stage = stage;
    stDebug("s-task:%s receive check msg from upstream task:0x%x(vgId:%d) first time, init stage value:%" PRId64,
            taskStr, upstreamTaskId, vgId, stage);
  }

  if (pUpstream->stage < stage) {
    stError("s-task:%s receive check msg from upstream task:0x%x(vgId:%d), new stage received:%" PRId64
            ", prev:%" PRId64,
            taskStr, upstreamTaskId, vgId, stage, pUpstream->stage);
    // the upstream moved to a newer stage: mark the current checkpoint as failed
    streamTaskSetCheckpointFailed(pTask);
  }

  if (pUpstream->stage != stage) {
    return TASK_UPSTREAM_NEW_STAGE;
  }

  if (pTask->status.downstreamReady == 1) {
    return TASK_DOWNSTREAM_READY;
  }

  stDebug("s-task:%s vgId:%d leader:%d, downstream not ready", taskStr, vgId,
          (pTask->pMeta->role == NODE_ROLE_LEADER));
  return TASK_DOWNSTREAM_NOT_READY;
}
79

80
// check status
UNCOV
81
/*
 * Upstream side of the check handshake: send a check request to every downstream task of pTask.
 *
 * - Fixed dispatch: one request goes to the single downstream task.
 * - Shuffle dispatch: one request goes to each vgroup's task.
 * In both cases the check-rsp monitor is started first. Each request gets a fresh QID and is registered in
 * pTask->taskCheckInfo before it is sent.
 * - Any other output type (e.g. a sink task) has no downstream: the task is marked ready directly.
 *
 * Send failures are logged. In the shuffle case every failing vgroup is reported individually; previously only the
 * result of the last send was checked, so earlier failures were lost.
 */
void streamTaskSendCheckMsg(SStreamTask* pTask) {
  SDataRange*  pRange = &pTask->dataRange;
  STimeWindow* pWindow = &pRange->window;
  const char*  idstr = pTask->id.idStr;
  int32_t      code = 0;

  SStreamTaskCheckReq req = {
      .streamId = pTask->id.streamId,
      .upstreamTaskId = pTask->id.taskId,
      .upstreamNodeId = pTask->info.nodeId,
      .childId = pTask->info.selfChildId,
      .stage = pTask->pMeta->stage,
  };

  if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
    streamTaskStartMonitorCheckRsp(pTask);

    STaskDispatcherFixed* pDispatch = &pTask->outputInfo.fixedDispatcher;

    setCheckDownstreamReqInfo(&req, tGenIdPI64(), pDispatch->taskId, pDispatch->nodeId);
    streamTaskAddReqInfo(&pTask->taskCheckInfo, req.reqId, pDispatch->taskId, pDispatch->nodeId, idstr);

    stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " check single downstream task:0x%x(vgId:%d) ver:%" PRId64 "-%" PRId64
            " window:%" PRId64 "-%" PRId64 " QID:0x%" PRIx64,
            idstr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, pRange->range.minVer,
            pRange->range.maxVer, pWindow->skey, pWindow->ekey, req.reqId);

    code = streamSendCheckMsg(pTask, &req, pDispatch->nodeId, &pDispatch->epSet);
    if (code != TSDB_CODE_SUCCESS) {
      stError("s-task:%s failed to send check msg to downstream, code:%s", idstr, tstrerror(code));
    }

  } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
    streamTaskStartMonitorCheckRsp(pTask);

    SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;

    int32_t numOfVgs = taosArrayGetSize(vgInfo);
    stDebug("s-task:%s check %d downstream tasks, ver:%" PRId64 "-%" PRId64 " window:%" PRId64 "-%" PRId64, idstr,
            numOfVgs, pRange->range.minVer, pRange->range.maxVer, pWindow->skey, pWindow->ekey);

    for (int32_t i = 0; i < numOfVgs; i++) {
      SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
      if (pVgInfo == NULL) {
        continue;
      }

      setCheckDownstreamReqInfo(&req, tGenIdPI64(), pVgInfo->taskId, pVgInfo->vgId);
      streamTaskAddReqInfo(&pTask->taskCheckInfo, req.reqId, pVgInfo->taskId, pVgInfo->vgId, idstr);

      stDebug("s-task:%s (vgId:%d) stage:%" PRId64
              " check downstream task:0x%x (vgId:%d) (shuffle), idx:%d, QID:0x%" PRIx64,
              idstr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i, req.reqId);

      code = streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
      if (code != TSDB_CODE_SUCCESS) {
        // report each failure here; a later successful send must not hide it
        stError("s-task:%s failed to send check msg to downstream task:0x%x (vgId:%d), code:%s", idstr,
                pVgInfo->taskId, pVgInfo->vgId, tstrerror(code));
      }
    }
  } else {  // for sink task, set it ready directly.
    stDebug("s-task:%s (vgId:%d) set downstream ready, since no downstream", idstr, pTask->info.nodeId);
    streamTaskStopMonitorCheckRsp(&pTask->taskCheckInfo, idstr);
    processDownstreamReadyRsp(pTask);
  }
}
×
145

UNCOV
146
/*
 * Handle a check request received from an upstream task and fill in the response `*pRsp`.
 *
 * The response echoes the request's identifying fields. Its status is:
 * - TASK_DOWNSTREAM_NOT_LEADER when this node is a follower;
 * - TASK_DOWNSTREAM_NOT_READY when the target task cannot be acquired (not built yet);
 * - otherwise the result of streamTaskCheckStatus for the requesting upstream.
 */
void streamTaskProcessCheckMsg(SStreamMeta* pMeta, SStreamTaskCheckReq* pReq, SStreamTaskCheckRsp* pRsp) {
  int32_t taskId = pReq->downstreamTaskId;

  *pRsp = (SStreamTaskCheckRsp){
      .reqId = pReq->reqId,
      .streamId = pReq->streamId,
      .childId = pReq->childId,
      .downstreamNodeId = pReq->downstreamNodeId,
      .downstreamTaskId = pReq->downstreamTaskId,
      .upstreamNodeId = pReq->upstreamNodeId,
      .upstreamTaskId = pReq->upstreamTaskId,
  };

  // a follower never answers check requests; the upstream has to find the leader
  if (pMeta->role == NODE_ROLE_FOLLOWER) {
    stError(
        "s-task:0x%x invalid check msg from upstream:0x%x(vgId:%d), vgId:%d is follower, not handle check status msg",
        taskId, pReq->upstreamTaskId, pReq->upstreamNodeId, pMeta->vgId);
    pRsp->status = TASK_DOWNSTREAM_NOT_LEADER;
    return;
  }

  SStreamTask* pTask = NULL;
  // only the acquired pointer is inspected below; the returned code is not used
  (void)streamMetaAcquireTask(pMeta, pReq->streamId, taskId, &pTask);

  if (pTask == NULL) {
    pRsp->status = TASK_DOWNSTREAM_NOT_READY;
    stDebug("tq recv task check(taskId:0x%" PRIx64 "-0x%x not built yet) req(QID:0x%" PRIx64
            ") from task:0x%x (vgId:%d), rsp check_status %d",
            pReq->streamId, taskId, pRsp->reqId, pRsp->upstreamTaskId, pRsp->upstreamNodeId, pRsp->status);
    return;
  }

  pRsp->status =
      streamTaskCheckStatus(pTask, pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->stage, &pRsp->oldStage);

  SStreamTaskState curState = streamTaskGetStatus(pTask);
  stDebug("s-task:%s status:%s, stage:%" PRId64 " recv task check req(QID:0x%" PRIx64
          ") task:0x%x (vgId:%d), check_status:%d",
          pTask->id.idStr, curState.name, pRsp->oldStage, pRsp->reqId, pRsp->upstreamTaskId, pRsp->upstreamNodeId,
          pRsp->status);

  streamMetaReleaseTask(pMeta, pTask);
}
×
186

UNCOV
187
/*
 * Upstream side of the check handshake: consume one check-rsp sent back by a downstream task.
 *
 * - If the task should stop, the response is ignored.
 * - A response not addressed to this task is rejected with TSDB_CODE_INVALID_MSG.
 * - A response rejected by streamTaskUpdateCheckInfo (expired QID / unknown downstream, logged there) is discarded,
 *   but success is still returned.
 * - When the last outstanding downstream reports ready, processDownstreamReadyRsp runs and the check-rsp monitor is
 *   asked to stop.
 * - NEW_STAGE / NOT_LEADER put the relevant node on the node-update list and record this task as failed.
 * - Any other not-ready answer is only logged; retrying is left to the check-rsp monitor.
 */
int32_t streamTaskProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) {
  int64_t         now = taosGetTimestampMs();
  const char*     id = pTask->id.idStr;
  STaskCheckInfo* pInfo = &pTask->taskCheckInfo;
  int32_t         total = streamTaskGetNumOfDownstream(pTask);
  int32_t         left = -1;

  if (streamTaskShouldStop(pTask)) {
    stDebug("s-task:%s should stop, do not do check downstream again", id);
    return TSDB_CODE_SUCCESS;
  }

  if (pRsp->upstreamTaskId != pTask->id.taskId) {
    stError("s-task:%s invalid check downstream rsp, upstream task:0x%x discard", id, pRsp->upstreamTaskId);
    return TSDB_CODE_INVALID_MSG;
  }

  int32_t code = streamTaskUpdateCheckInfo(pInfo, pRsp->downstreamTaskId, pRsp->status, now, pRsp->reqId, &left, id);
  if (code != TSDB_CODE_SUCCESS) {
    return TSDB_CODE_SUCCESS;  // stale or unknown rsp is discarded; return success in any cases.
  }

  if ((pRsp->status == TASK_DOWNSTREAM_READY) && (left == 0)) {
    processDownstreamReadyRsp(pTask);  // all downstream tasks are ready, set the complete check downstream flag
    streamTaskStopMonitorCheckRsp(pInfo, id);
  } else if (pRsp->status == TASK_UPSTREAM_NEW_STAGE) {
    stError("s-task:%s vgId:%d self vnode-transfer/leader-change/restart detected, old stage:%" PRId64
            ", current stage:%" PRId64 ", not check wait for downstream task nodeUpdate, and all tasks restart",
            id, pRsp->upstreamNodeId, pRsp->oldStage, pTask->pMeta->stage);
    code = streamTaskAddIntoNodeUpdateList(pTask, pRsp->upstreamNodeId);
    streamMetaAddFailedTaskSelf(pTask, now);
  } else if (pRsp->status == TASK_DOWNSTREAM_NOT_LEADER) {
    stError(
        "s-task:%s downstream taskId:0x%x (vgId:%d) not leader, self dispatch epset needs to be updated, not check "
        "downstream again, nodeUpdate needed",
        id, pRsp->downstreamTaskId, pRsp->downstreamNodeId);
    code = streamTaskAddIntoNodeUpdateList(pTask, pRsp->downstreamNodeId);
    streamMetaAddFailedTaskSelf(pTask, now);
  } else {
    // either READY with other downstream tasks still pending, or NOT_READY (the check-rsp monitor retries later)
    stDebug("s-task:%s (vgId:%d) recv check rsp from task:0x%x (vgId:%d) status:%d, total:%d not ready:%d", id,
            pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status, total, left);
  }

  return 0;
}
246

UNCOV
247
int32_t streamTaskSendCheckRsp(const SStreamMeta* pMeta, int32_t vgId, SStreamTaskCheckRsp* pRsp,
×
248
                               SRpcHandleInfo* pRpcInfo, int32_t taskId) {
249
  SEncoder encoder;
UNCOV
250
  int32_t  code = 0;
×
251
  int32_t  len;
252

UNCOV
253
  tEncodeSize(tEncodeStreamTaskCheckRsp, pRsp, len, code);
×
UNCOV
254
  if (code < 0) {
×
255
    stError("vgId:%d failed to encode task check rsp, s-task:0x%x", pMeta->vgId, taskId);
×
256
    return TSDB_CODE_INVALID_MSG;
×
257
  }
258

UNCOV
259
  void* buf = rpcMallocCont(sizeof(SMsgHead) + len);
×
UNCOV
260
  if (buf == NULL) {
×
261
    stError("s-task:0x%x vgId:%d failed prepare msg, %s at line:%d code:%s", taskId, pMeta->vgId, __func__, __LINE__,
×
262
            tstrerror(code));
263
    return terrno;
×
264
  }
265

UNCOV
266
  ((SMsgHead*)buf)->vgId = htonl(vgId);
×
267

UNCOV
268
  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
×
UNCOV
269
  tEncoderInit(&encoder, (uint8_t*)abuf, len);
×
UNCOV
270
  code = tEncodeStreamTaskCheckRsp(&encoder, pRsp);
×
UNCOV
271
  tEncoderClear(&encoder);
×
272

UNCOV
273
  SRpcMsg rspMsg = {.code = 0, .pCont = buf, .contLen = sizeof(SMsgHead) + len, .info = *pRpcInfo};
×
UNCOV
274
  tmsgSendRsp(&rspMsg);
×
275

UNCOV
276
  code = TMIN(code, 0);
×
UNCOV
277
  return code;
×
278
}
279

UNCOV
280
/*
 * Begin a new check-downstream round for pTask and arm the check-rsp monitor timer (rspMonitorFn).
 *
 * All work happens under pInfo->checkInfoLock. Nothing is started when:
 * - the task is already in DROPPING status, or
 * - streamTaskStartCheckDownstream reports that a round is already in progress.
 * Otherwise the round bookkeeping is reset via streamTaskInitTaskCheckInfo, and the timer is started through
 * streamTmrStart with CHECK_RSP_CHECK_INTERVAL.
 *
 * A failure to allocate the task ref id used as the timer parameter was previously ignored silently. It is now
 * logged, since no monitor timer is started for the round in that case.
 */
void streamTaskStartMonitorCheckRsp(SStreamTask* pTask) {
  int32_t         vgId = pTask->pMeta->vgId;
  STaskCheckInfo* pInfo = &pTask->taskCheckInfo;

  streamMutexLock(&pInfo->checkInfoLock);

  // drop procedure already started, not start check downstream now
  ETaskStatus s = streamTaskGetStatus(pTask).state;
  if (s == TASK_STATUS__DROPPING) {
    stDebug("s-task:%s task not in uninit status, status:%s not start monitor check-rsp", pTask->id.idStr,
            streamTaskGetStatusStr(s));
    streamMutexUnlock(&pInfo->checkInfoLock);
    return;
  }

  int32_t code = streamTaskStartCheckDownstream(pInfo, pTask->id.idStr);
  if (code != TSDB_CODE_SUCCESS) {
    streamMutexUnlock(&pInfo->checkInfoLock);
    return;
  }

  streamTaskInitTaskCheckInfo(pInfo, &pTask->outputInfo, taosGetTimestampMs());

  int64_t* pTaskRefId = NULL;
  code = streamTaskAllocRefId(pTask, &pTaskRefId);
  if (code == 0) {
    streamTmrStart(rspMonitorFn, CHECK_RSP_CHECK_INTERVAL, pTaskRefId, streamTimer, &pInfo->checkRspTmr, vgId,
                   "check-status-monitor");
  } else {
    // NOTE(review): without the timer, timeout/not-ready retries for this round presumably never happen — confirm.
    stError("s-task:%s vgId:%d failed to alloc refId, check-rsp monitor not started, code:%s", pTask->id.idStr, vgId,
            tstrerror(code));
  }

  streamMutexUnlock(&pInfo->checkInfoLock);
}
312

UNCOV
313
/*
 * Ask the check-rsp monitor to stop by raising pInfo->stopCheckProcess under pInfo->checkInfoLock.
 * Only the flag is set here; presumably the monitor callback observes it and ends the round (TODO confirm).
 */
void streamTaskStopMonitorCheckRsp(STaskCheckInfo* pInfo, const char* id) {
  streamMutexLock(&pInfo->checkInfoLock);
  {
    pInfo->stopCheckProcess = 1;
  }
  streamMutexUnlock(&pInfo->checkInfoLock);

  stDebug("s-task:%s set stop check-rsp monitor flag", id);
}
×
320

321
/*
 * Release the resources held by a task's check info:
 * - destroy the request list and clear the pointer;
 * - stop the check-rsp monitor timer, if one was started, and clear its handle.
 */
void streamTaskCleanupCheckInfo(STaskCheckInfo* pInfo) {
  taosArrayDestroy(pInfo->pList);
  pInfo->pList = NULL;

  if (pInfo->checkRspTmr == NULL) {
    return;  // no monitor timer was ever started
  }

  streamTmrStop(pInfo->checkRspTmr);
  pInfo->checkRspTmr = NULL;
}
7✔
330

331
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
UNCOV
332
/*
 * Invoked once every downstream task has reported ready, or directly for tasks with no downstream.
 *
 * In order:
 *   1. report success of TASK_EVENT_INIT (TASK_EVENT_INIT_SCANHIST for fill-history tasks) to the state machine;
 *   2. record the launch result (execInfo.checkTs / readyTs) in the stream meta;
 *   3. if the task status is HALT, feed TASK_EVENT_HALT to the state machine so the task stays halted (per the
 *      original comment: a count-window stream task waits for its related fill-history task);
 *   4. launch the related fill-history task, if there is one.
 * All failures are only logged.
 */
void processDownstreamReadyRsp(SStreamTask* pTask) {
  const char*      idStr = pTask->id.idStr;
  EStreamTaskEvent event = TASK_EVENT_INIT_SCANHIST;
  if (pTask->info.fillHistory == 0) {
    event = TASK_EVENT_INIT;
  }

  int32_t code = streamTaskOnHandleEventSuccess(pTask->status.pSM, event, NULL, NULL);
  if (code != 0) {
    stError("s-task:%s failed to set event succ, code:%s", idStr, tstrerror(code));
  }

  code = streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, pTask->execInfo.checkTs,
                                       pTask->execInfo.readyTs, true);
  if (code != 0) {
    stError("s-task:%s failed to record the downstream task status, code:%s", idStr, tstrerror(code));
  }

  if (pTask->status.taskStatus == TASK_STATUS__HALT) {
    // HALT is only expected for a non-fill-history task that owns a related fill-history task
    bool expectedHalt = HAS_RELATED_FILLHISTORY_TASK(pTask) && (pTask->info.fillHistory == 0);
    if (!expectedHalt) {
      stError("s-task:%s status:halt fillhistory:%d not handle the ready rsp", idStr, pTask->info.fillHistory);
    }

    // halt it self for count window stream task until the related fill history task completed.
    stDebug("s-task:%s level:%d initial status is %s from mnode, set it to be halt", idStr, pTask->info.taskLevel,
            streamTaskGetStatusStr(pTask->status.taskStatus));
    code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_HALT);
    if (code != 0) {  // todo: handle error
      stError("s-task:%s failed to handle halt event, code:%s", idStr, tstrerror(code));
    }
  }

  if (!HAS_RELATED_FILLHISTORY_TASK(pTask)) {
    return;
  }

  // Start the related fill-history task now that this task is ready. This is deliberately not done in the success
  // callback, to avoid a deadlock. todo: let's retry
  stDebug("s-task:%s try to launch related fill-history task", idStr);
  code = streamLaunchFillHistoryTask(pTask);
  if (code != 0) {
    stError("s-task:%s failed to launch history task, code:%s", idStr, tstrerror(code));
  }
}
×
372

UNCOV
373
/*
 * Record that the epset of node `nodeId` must be refreshed for this task.
 *
 * Under pTask->lock, append {nodeId} to outputInfo.pNodeEpsetUpdateList unless an entry for that node is already
 * present. Returns 0, or terrno when the append fails.
 */
int32_t streamTaskAddIntoNodeUpdateList(SStreamTask* pTask, int32_t nodeId) {
  int32_t vgId = pTask->pMeta->vgId;
  int32_t code = 0;

  streamMutexLock(&pTask->lock);

  SArray* pUpdateList = pTask->outputInfo.pNodeEpsetUpdateList;
  int32_t num = taosArrayGetSize(pUpdateList);

  int32_t idx = 0;
  while (idx < num) {
    SDownstreamTaskEpset* pEntry = taosArrayGet(pUpdateList, idx);
    if ((pEntry != NULL) && (pEntry->nodeId == nodeId)) {
      break;  // already scheduled for update
    }
    ++idx;
  }

  if (idx == num) {
    SDownstreamTaskEpset entry = {.nodeId = nodeId};
    if (taosArrayPush(pUpdateList, &entry) == NULL) {
      code = terrno;
      stError("s-task:%s vgId:%d failed to update epset, code:%s", pTask->id.idStr, vgId, tstrerror(code));
    } else {
      stInfo("s-task:%s vgId:%d downstream nodeId:%d needs to be updated, total needs updated:%d", pTask->id.idStr,
             vgId, entry.nodeId, (num + 1));
    }
  }

  streamMutexUnlock(&pTask->lock);
  return code;
}
409

UNCOV
410
/*
 * Reset the bookkeeping for a new check round:
 * - clear the request list;
 * - set the number of downstream tasks still expected to become ready (1 for fixed dispatch, one per vgroup for
 *   shuffle dispatch, left unchanged for other output types);
 * - stamp both the round start time and the timeout reference time with `startTs`;
 * - clear the stop flag.
 */
void streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInfo* pOutputInfo, int64_t startTs) {
  taosArrayClear(pInfo->pList);

  if (pOutputInfo->type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
    pInfo->notReadyTasks = taosArrayGetSize(pOutputInfo->shuffleDispatcher.dbInfo.pVgroupInfos);
  } else if (pOutputInfo->type == TASK_OUTPUT__FIXED_DISPATCH) {
    pInfo->notReadyTasks = 1;
  }

  pInfo->stopCheckProcess = 0;
  pInfo->timeoutStartTs = startTs;
  pInfo->startTs = startTs;
}
×
423

UNCOV
424
/*
 * Look up the request record for downstream task `taskId` in pInfo->pList.
 *
 * Stores the matching record in `*pStatusInfo`, or NULL when there is none; does nothing if `pStatusInfo` is NULL.
 * The caller is expected to hold pInfo->checkInfoLock, as all callers in this file do.
 *
 * Records are unique per taskId, because streamTaskAddReqInfo never adds a second record for the same task. So the
 * scan returns on the first hit instead of walking the whole list (and re-querying its size on every iteration).
 */
void findCheckRspStatus(STaskCheckInfo* pInfo, int32_t taskId, SDownstreamStatusInfo** pStatusInfo) {
  if (pStatusInfo == NULL) {
    return;
  }

  *pStatusInfo = NULL;

  int32_t size = taosArrayGetSize(pInfo->pList);
  for (int32_t j = 0; j < size; ++j) {
    SDownstreamStatusInfo* p = taosArrayGet(pInfo->pList, j);
    if ((p != NULL) && (p->taskId == taskId)) {
      *pStatusInfo = p;
      return;
    }
  }
}
441

UNCOV
442
/*
 * Apply a check-rsp (`status`, received at `rspTs`, for request `reqId`) to the record of downstream task `taskId`.
 *
 * Runs under pInfo->checkInfoLock.
 * - Returns TSDB_CODE_FAILED (and logs) when no record exists for `taskId`, or when `reqId` is not the QID the
 *   record currently expects (an expired response).
 * - On success, stores the new status and response time. `*pNotReady` receives the number of downstream tasks still
 *   not ready; the counter is decremented only on a transition to TASK_DOWNSTREAM_READY.
 *
 * Fixes the log formats: "s-task:%sQID" lacked a separator, and the unknown-task QID was printed without the "0x"
 * prefix used by every other QID log.
 */
int32_t streamTaskUpdateCheckInfo(STaskCheckInfo* pInfo, int32_t taskId, int32_t status, int64_t rspTs, int64_t reqId,
                                  int32_t* pNotReady, const char* id) {
  SDownstreamStatusInfo* p = NULL;

  streamMutexLock(&pInfo->checkInfoLock);
  findCheckRspStatus(pInfo, taskId, &p);

  if (p == NULL) {
    streamMutexUnlock(&pInfo->checkInfoLock);
    stError("s-task:%s unexpected check rsp msg, invalid downstream task:0x%x, QID:0x%" PRIx64 " discarded", id,
            taskId, reqId);
    return TSDB_CODE_FAILED;
  }

  if (reqId != p->reqId) {
    stError("s-task:%s QID:0x%" PRIx64 " expected:0x%" PRIx64
            " expired check-rsp recv from downstream task:0x%x, discarded",
            id, reqId, p->reqId, taskId);
    streamMutexUnlock(&pInfo->checkInfoLock);
    return TSDB_CODE_FAILED;
  }

  // subtract one not-ready-task, since it is ready now
  if ((p->status != TASK_DOWNSTREAM_READY) && (status == TASK_DOWNSTREAM_READY)) {
    *pNotReady = atomic_sub_fetch_32(&pInfo->notReadyTasks, 1);
  } else {
    *pNotReady = pInfo->notReadyTasks;
  }

  p->status = status;
  p->rspTs = rspTs;

  streamMutexUnlock(&pInfo->checkInfoLock);
  return TSDB_CODE_SUCCESS;
}
476

UNCOV
477
/*
 * Try to enter the check-downstream procedure by setting pInfo->inCheckProcess.
 *
 * If a round is already in progress, stopCheckProcess is cleared (disabling the pending auto stop) and
 * TSDB_CODE_FAILED is returned; otherwise the flag is set and TSDB_CODE_SUCCESS is returned.
 * The only caller in this file holds pInfo->checkInfoLock.
 */
int32_t streamTaskStartCheckDownstream(STaskCheckInfo* pInfo, const char* id) {
  if (pInfo->inCheckProcess != 0) {
    stError("s-task:%s already in check procedure, checkTs:%" PRId64 ", start monitor check rsp failed", id,
            pInfo->startTs);
    pInfo->stopCheckProcess = 0;  // disable auto stop of check process
    return TSDB_CODE_FAILED;
  }

  pInfo->inCheckProcess = 1;
  stDebug("s-task:%s set the in check-rsp flag", id);
  return TSDB_CODE_SUCCESS;
}
490

UNCOV
491
/*
 * Finish the current check-downstream round.
 *
 * If a round is in progress, log its elapsed time and reset every per-round field: request list, retry counters,
 * flags, not-ready counter and timestamps. The work runs under pInfo->checkInfoLock when `lock` is true; otherwise
 * the caller presumably already holds that lock (TODO confirm).
 */
void streamTaskCompleteCheckRsp(STaskCheckInfo* pInfo, bool lock, const char* id) {
  if (lock) {
    streamMutexLock(&pInfo->checkInfoLock);
  }

  if (!pInfo->inCheckProcess) {
    stDebug("s-task:%s already not in check-rsp procedure", id);
  } else {
    int64_t el = 0;
    if (pInfo->startTs != 0) {
      el = taosGetTimestampMs() - pInfo->startTs;
    }
    stDebug("s-task:%s clear the in check-rsp flag, set the check-rsp done, elapsed time:%" PRId64 " ms", id, el);

    taosArrayClear(pInfo->pList);

    pInfo->timeoutRetryCount = 0;
    pInfo->notReadyRetryCount = 0;

    pInfo->stopCheckProcess = 0;
    pInfo->inCheckProcess = 0;
    pInfo->notReadyTasks = 0;
    pInfo->timeoutStartTs = 0;
    pInfo->startTs = 0;
  }

  if (lock) {
    streamMutexUnlock(&pInfo->checkInfoLock);
  }
}
×
518

519
// todo: retry until success
UNCOV
520
void streamTaskAddReqInfo(STaskCheckInfo* pInfo, int64_t reqId, int32_t taskId, int32_t vgId, const char* id) {
×
UNCOV
521
  SDownstreamStatusInfo info = {.taskId = taskId, .status = -1, .vgId = vgId, .reqId = reqId, .rspTs = 0};
×
UNCOV
522
  streamMutexLock(&pInfo->checkInfoLock);
×
523

UNCOV
524
  SDownstreamStatusInfo* p = NULL;
×
UNCOV
525
  findCheckRspStatus(pInfo, taskId, &p);
×
UNCOV
526
  if (p != NULL) {
×
UNCOV
527
    stDebug("s-task:%s check info to task:0x%x already sent", id, taskId);
×
UNCOV
528
    streamMutexUnlock(&pInfo->checkInfoLock);
×
529
    return;
×
530
  }
531

UNCOV
532
  void* px = taosArrayPush(pInfo->pList, &info);
×
533
  if (px == NULL) {
534
    // todo: retry
535
  }
536

UNCOV
537
  streamMutexUnlock(&pInfo->checkInfoLock);
×
538
}
539

UNCOV
540
/*
 * Re-send the check request described by record `p` to its downstream task.
 *
 * A fresh QID is generated and stored in `p->reqId`, so any response to the previous request is treated as
 * expired. For fixed dispatch, the request goes to the single downstream task. For shuffle dispatch, it goes to the
 * vgroup whose taskId matches `p->taskId`; if none matches, nothing is sent. Returns the send result, which is also
 * logged on failure.
 */
int32_t doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p) {
  const char*      id = pTask->id.idStr;
  STaskOutputInfo* pOutput = &pTask->outputInfo;
  int32_t          code = 0;

  SStreamTaskCheckReq req = {
      .streamId = pTask->id.streamId,
      .upstreamTaskId = pTask->id.taskId,
      .upstreamNodeId = pTask->info.nodeId,
      .childId = pTask->info.selfChildId,
      .stage = pTask->pMeta->stage,
  };

  // update the reqId for the new check msg
  p->reqId = tGenIdPI64();

  if (pOutput->type == TASK_OUTPUT__FIXED_DISPATCH) {
    STaskDispatcherFixed* pFixed = &pOutput->fixedDispatcher;
    setCheckDownstreamReqInfo(&req, p->reqId, pFixed->taskId, pFixed->nodeId);

    stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " re-send check downstream task:0x%x(vgId:%d) QID:0x%" PRIx64, id,
            pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, req.reqId);

    code = streamSendCheckMsg(pTask, &req, pFixed->nodeId, &pFixed->epSet);
  } else if (pOutput->type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
    SArray* pVgroups = pOutput->shuffleDispatcher.dbInfo.pVgroupInfos;
    int32_t total = taosArrayGetSize(pVgroups);

    for (int32_t idx = 0; idx < total; idx++) {
      SVgroupInfo* pVg = taosArrayGet(pVgroups, idx);
      if ((pVg == NULL) || (pVg->taskId != p->taskId)) {
        continue;
      }

      setCheckDownstreamReqInfo(&req, p->reqId, pVg->taskId, pVg->vgId);

      stDebug("s-task:%s (vgId:%d) stage:%" PRId64
              " re-send check downstream task:0x%x(vgId:%d) (shuffle), idx:%d QID:0x%" PRIx64,
              id, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, idx, p->reqId);
      code = streamSendCheckMsg(pTask, &req, pVg->vgId, &pVg->epSet);
      break;  // only one vgroup hosts this downstream task
    }
  }

  if (code) {
    stError("s-task:%s failed to send check msg to downstream, code:%s", id, tstrerror(code));
  }
  return code;
}
591

UNCOV
592
/*
 * Classify every outstanding request in pInfo->pList, given `el` ms elapsed:
 *   - READY                  -> ++*numOfReady
 *   - NEW_STAGE / NOT_LEADER -> ++*numOfFault
 *   - otherwise, a response was received (rspTs != 0) -> taskId appended to pNotReadyList
 *   - otherwise, no response yet and el >= CHECK_NOT_RSP_DURATION (60 sec) -> taskId appended to pTimeoutList
 *   - otherwise (no response yet, still within that window) -> ++*numOfNotRsp
 * The counters are only incremented, never reset here. A failed list append is only logged.
 */
void getCheckRspStatus(STaskCheckInfo* pInfo, int64_t el, int32_t* numOfReady, int32_t* numOfFault,
                       int32_t* numOfNotRsp, SArray* pTimeoutList, SArray* pNotReadyList, const char* id) {
  int32_t num = taosArrayGetSize(pInfo->pList);

  for (int32_t i = 0; i < num; ++i) {
    SDownstreamStatusInfo* p = taosArrayGet(pInfo->pList, i);
    if (p == NULL) {
      continue;
    }

    if (p->status == TASK_DOWNSTREAM_READY) {
      (*numOfReady) += 1;
      continue;
    }

    if ((p->status == TASK_UPSTREAM_NEW_STAGE) || (p->status == TASK_DOWNSTREAM_NOT_LEADER)) {
      stDebug("s-task:%s recv status:NEW_STAGE/NOT_LEADER from downstream, task:0x%x, quit from check downstream", id,
              p->taskId);
      (*numOfFault) += 1;
      continue;
    }

    // TASK_DOWNSTREAM_NOT_READY, or no response at all yet
    if (p->rspTs != 0) {
      if (taosArrayPush(pNotReadyList, &p->taskId) == NULL) {
        stError("s-task:%s failed to record not ready task:0x%x", id, p->taskId);
      }
    } else if (el < CHECK_NOT_RSP_DURATION) {
      (*numOfNotRsp) += 1;  // do nothing and continue waiting for their rsp
    } else {
      // no response within CHECK_NOT_RSP_DURATION (60 sec)
      if (taosArrayPush(pTimeoutList, &p->taskId) == NULL) {
        stError("s-task:%s failed to record time out task:0x%x", id, p->taskId);
      }
    }
  }
}
×
625

UNCOV
626
/*
 * Point a check request at one downstream target: set its QID and destination task/node.
 * All other fields of *pReq are left exactly as the caller initialized them.
 */
void setCheckDownstreamReqInfo(SStreamTaskCheckReq* pReq, int64_t reqId, int32_t dstTaskId, int32_t dstNodeId) {
  pReq->downstreamNodeId = dstNodeId;
  pReq->downstreamTaskId = dstTaskId;
  pReq->reqId = reqId;
}
×
631

UNCOV
632
/*
 * Re-send the check msg to every downstream task listed in pTimeoutList, i.e. downstream tasks
 * that have not answered within CHECK_NOT_RSP_DURATION. The list holds task ids, read back as int32_t.
 *
 * Side effects on pTask->taskCheckInfo:
 *  - timeoutStartTs is reset to the current time;
 *  - timeoutRetryCount is incremented. Once it exceeds 10 it is reset to 0, and the vnode of every
 *    timed-out downstream task is added into the node update list.
 *
 * Only entries still in the "waiting for rsp" state (status == -1 and rspTs == 0) are re-sent; other
 * entries are logged as invalid and skipped. A failed send or a failed node-update registration is
 * logged. Retrying is left to the next round of the rsp monitor.
 * NOTE(review): rspMonitorFn calls this while holding pInfo->checkInfoLock.
 */
void handleTimeoutDownstreamTasks(SStreamTask* pTask, SArray* pTimeoutList) {
  STaskCheckInfo* pInfo = &pTask->taskCheckInfo;
  const char*     id = pTask->id.idStr;
  int32_t         vgId = pTask->pMeta->vgId;
  int32_t         numOfTimeout = (int32_t)taosArrayGetSize(pTimeoutList);

  pInfo->timeoutStartTs = taosGetTimestampMs();
  for (int32_t i = 0; i < numOfTimeout; ++i) {
    int32_t* px = taosArrayGet(pTimeoutList, i);
    if (px == NULL) {
      continue;
    }

    int32_t                taskId = *px;
    SDownstreamStatusInfo* p = NULL;
    findCheckRspStatus(pInfo, taskId, &p);
    if (p == NULL) {
      continue;
    }

    // a timed-out entry must still be waiting for its first rsp
    if (p->status != -1 || p->rspTs != 0) {
      stError("s-task:%s invalid rsp record entry, index:%d, status:%d, rspTs:%" PRId64, id, i, p->status, p->rspTs);
      continue;
    }

    int32_t code = doSendCheckMsg(pTask, p);
    if (code != 0) {
      stError("s-task:%s vgId:%d failed to resend check msg to timeout downstream task:0x%x, code:%s", id, vgId,
              taskId, tstrerror(code));
    }
  }

  pInfo->timeoutRetryCount += 1;

  // Too many successive timeouts: put the vnodes of the downstream tasks into the node update list.
  // NOTE(review): the log below says "more than 600sec". That holds only if CHECK_NOT_RSP_DURATION
  // is 60 sec; confirm against its definition.
  if (pInfo->timeoutRetryCount > 10) {
    pInfo->timeoutRetryCount = 0;

    for (int32_t i = 0; i < numOfTimeout; ++i) {
      int32_t* pTaskId = taosArrayGet(pTimeoutList, i);
      if (pTaskId == NULL) {
        continue;
      }

      SDownstreamStatusInfo* p = NULL;
      findCheckRspStatus(pInfo, *pTaskId, &p);
      if (p == NULL) {
        continue;
      }

      int32_t code = streamTaskAddIntoNodeUpdateList(pTask, p->vgId);
      if (code != 0) {
        stError("s-task:%s vgId:%d failed to add downstream task:0x%x (vgId:%d) into nodeUpdate list, code:%s", id,
                vgId, p->taskId, p->vgId, tstrerror(code));
      } else {
        stDebug("s-task:%s vgId:%d downstream task:0x%x (vgId:%d) timeout more than 600sec, add into nodeUpdate list",
                id, vgId, p->taskId, p->vgId);
      }
    }

    stDebug("s-task:%s vgId:%d %d downstream task(s) all add into nodeUpate list", id, vgId, numOfTimeout);
  } else {
    stDebug("s-task:%s vgId:%d %d downstream task(s) timeout, send check msg again, retry:%d start time:%" PRId64, id,
            vgId, numOfTimeout, pInfo->timeoutRetryCount, pInfo->timeoutStartTs);
  }
}
×
686

UNCOV
687
/*
 * Re-check every downstream task listed in pNotReadyList, i.e. downstream tasks that answered the
 * check msg but were not ready yet. The list holds task ids, read back as int32_t.
 *
 * Each matching entry is reset to the "waiting for rsp" state (rspTs = 0, status = -1) and the check
 * msg is sent again. pInfo->notReadyRetryCount is then incremented once for the whole batch.
 *
 * A failed send is logged only. The entry has already been reset to rspTs == 0, so later rounds of
 * the rsp monitor should see it as not responded and, eventually, as timed out. This assumes -1 is
 * none of the READY/NEW_STAGE/NOT_LEADER codes — TODO confirm.
 */
void handleNotReadyDownstreamTask(SStreamTask* pTask, SArray* pNotReadyList) {
  STaskCheckInfo* pInfo = &pTask->taskCheckInfo;
  const char*     id = pTask->id.idStr;
  int32_t         vgId = pTask->pMeta->vgId;
  int32_t         numOfNotReady = (int32_t)taosArrayGetSize(pNotReadyList);

  // reset the info, and send the check msg to failure downstream again
  for (int32_t i = 0; i < numOfNotReady; ++i) {
    int32_t* pTaskId = taosArrayGet(pNotReadyList, i);
    if (pTaskId == NULL) {
      continue;
    }

    SDownstreamStatusInfo* p = NULL;
    findCheckRspStatus(pInfo, *pTaskId, &p);
    if (p == NULL) {
      continue;
    }

    p->rspTs = 0;
    p->status = -1;

    int32_t code = doSendCheckMsg(pTask, p);
    if (code != 0) {
      stError("s-task:%s vgId:%d failed to resend check msg to not ready downstream task:0x%x, code:%s", id, vgId,
              *pTaskId, tstrerror(code));
    }
  }

  pInfo->notReadyRetryCount += 1;
  stDebug("s-task:%s vgId:%d %d downstream task(s) not ready, send check msg again, retry:%d start time:%" PRId64, id,
          vgId, numOfNotReady, pInfo->notReadyRetryCount, pInfo->startTs);
}
×
713

714
// the action of add status may incur the restart procedure, which should NEVER be executed in the timer thread.
715
// The restart of all tasks requires that all tasks should not have active timer for now. Therefore, the execution
716
// of restart in timer thread will result in a deadlock.
UNCOV
717
int32_t addDownstreamFailedStatusResultAsync(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int32_t taskId) {
×
UNCOV
718
  return streamTaskSchedTask(pMsgCb, vgId, streamId, taskId, STREAM_EXEC_T_ADD_FAILED_TASK, false);
×
719
}
720

UNCOV
721
/*
 * Common exit path of one rspMonitorFn round. It does three things:
 *  - releases pTask via streamMetaReleaseTask (presumably balancing the taosAcquireRef done in
 *    rspMonitorFn);
 *  - destroys the two temporary id lists;
 *  - hands param to streamTaskFreeRefId.
 * rspMonitorFn passes param == NULL when it has re-armed the timer with the same param.
 */
static void doCleanup(SStreamTask* pTask, SArray* pNotReadyList, SArray* pTimeoutList, void* param) {
  SStreamMeta* pOwnerMeta = pTask->pMeta;
  streamMetaReleaseTask(pOwnerMeta, pTask);

  // both lists only live for a single monitor round
  taosArrayDestroy(pNotReadyList);
  taosArrayDestroy(pTimeoutList);

  streamTaskFreeRefId(param);
}
×
728

729
// this function is executed in timer thread
UNCOV
730
void rspMonitorFn(void* param, void* tmrId) {
×
UNCOV
731
  int32_t         numOfReady = 0;
×
UNCOV
732
  int32_t         numOfFault = 0;
×
UNCOV
733
  int32_t         numOfNotRsp = 0;
×
UNCOV
734
  int32_t         numOfNotReady = 0;
×
UNCOV
735
  int32_t         numOfTimeout = 0;
×
UNCOV
736
  int64_t         taskRefId = *(int64_t*)param;
×
UNCOV
737
  int64_t         now = taosGetTimestampMs();
×
UNCOV
738
  SArray*         pNotReadyList = NULL;
×
UNCOV
739
  SArray*         pTimeoutList = NULL;
×
UNCOV
740
  SStreamMeta*    pMeta = NULL;
×
UNCOV
741
  STaskCheckInfo* pInfo = NULL;
×
UNCOV
742
  int32_t         vgId = -1;
×
UNCOV
743
  int64_t         timeoutDuration = 0;
×
UNCOV
744
  const char*     id = NULL;
×
UNCOV
745
  int32_t         total = 0;
×
746

UNCOV
747
  SStreamTask* pTask = taosAcquireRef(streamTaskRefPool, taskRefId);
×
UNCOV
748
  if (pTask == NULL) {
×
UNCOV
749
    stError("invalid task rid:%" PRId64 " failed to acquired stream-task at %s", taskRefId, __func__);
×
UNCOV
750
    streamTaskFreeRefId(param);
×
751
    return;
×
752
  }
753

UNCOV
754
  pMeta = pTask->pMeta;
×
UNCOV
755
  pInfo = &pTask->taskCheckInfo;
×
UNCOV
756
  vgId = pTask->pMeta->vgId;
×
UNCOV
757
  timeoutDuration = now - pInfo->timeoutStartTs;
×
UNCOV
758
  id = pTask->id.idStr;
×
UNCOV
759
  total = (int32_t) taosArrayGetSize(pInfo->pList);
×
760

UNCOV
761
  stDebug("s-task:%s start to do check-downstream-rsp check in tmr", id);
×
762

UNCOV
763
  streamMutexLock(&pTask->lock);
×
UNCOV
764
  SStreamTaskState state = streamTaskGetStatus(pTask);
×
UNCOV
765
  streamMutexUnlock(&pTask->lock);
×
766

UNCOV
767
  if (state.state == TASK_STATUS__STOP) {
×
UNCOV
768
    stDebug("s-task:%s status:%s vgId:%d quit from monitor check-rsp tmr", id, state.name, vgId);
×
UNCOV
769
    streamTaskCompleteCheckRsp(pInfo, true, id);
×
770

771
    // not record the failure of the current task if try to close current vnode
772
    // otherwise, the put of message operation may incur invalid read of message queue.
UNCOV
773
    if (!pMeta->closeFlag) {
×
UNCOV
774
      int32_t code = addDownstreamFailedStatusResultAsync(pTask->pMsgCb, vgId, pTask->id.streamId, pTask->id.taskId);
×
UNCOV
775
      if (code) {
×
UNCOV
776
        stError("s-task:%s failed to create async record start failed task, code:%s", id, tstrerror(code));
×
777
      }
778
    }
779

UNCOV
780
    doCleanup(pTask, pNotReadyList, pTimeoutList, param);
×
UNCOV
781
    return;
×
782
  }
783

UNCOV
784
  if (state.state == TASK_STATUS__DROPPING || state.state == TASK_STATUS__READY) {
×
UNCOV
785
    stDebug("s-task:%s status:%s vgId:%d quit from monitor check-rsp tmr", id, state.name, vgId);
×
786

UNCOV
787
    streamTaskCompleteCheckRsp(pInfo, true, id);
×
UNCOV
788
    doCleanup(pTask, pNotReadyList, pTimeoutList, param);
×
UNCOV
789
    return;
×
790
  }
791

UNCOV
792
  streamMutexLock(&pInfo->checkInfoLock);
×
UNCOV
793
  if (pInfo->notReadyTasks == 0) {
×
UNCOV
794
    stDebug("s-task:%s status:%s vgId:%d all downstream ready, quit from monitor rsp tmr", id, state.name, vgId);
×
795

UNCOV
796
    streamTaskCompleteCheckRsp(pInfo, false, id);
×
UNCOV
797
    streamMutexUnlock(&pInfo->checkInfoLock);
×
UNCOV
798
    doCleanup(pTask, pNotReadyList, pTimeoutList, param);
×
UNCOV
799
    return;
×
800
  }
801

UNCOV
802
  pNotReadyList = taosArrayInit(4, sizeof(int64_t));
×
UNCOV
803
  pTimeoutList = taosArrayInit(4, sizeof(int64_t));
×
804

UNCOV
805
  if (state.state == TASK_STATUS__UNINIT) {
×
UNCOV
806
    getCheckRspStatus(pInfo, timeoutDuration, &numOfReady, &numOfFault, &numOfNotRsp, pTimeoutList, pNotReadyList, id);
×
807

UNCOV
808
    numOfNotReady = (int32_t)taosArrayGetSize(pNotReadyList);
×
UNCOV
809
    numOfTimeout = (int32_t)taosArrayGetSize(pTimeoutList);
×
810

811
    // fault tasks detected, not try anymore
UNCOV
812
    bool jumpOut = false;
×
UNCOV
813
    if ((numOfReady + numOfFault + numOfNotReady + numOfTimeout + numOfNotRsp) != total) {
×
UNCOV
814
      stError(
×
815
          "s-task:%s vgId:%d internal error in handling the check downstream procedure, rsp number is inconsistent, "
816
          "stop rspMonitor tmr, total:%d, notRsp:%d, notReady:%d, fault:%d, timeout:%d, ready:%d",
817
          id, vgId, total, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady);
UNCOV
818
      jumpOut = true;
×
819
    }
820

UNCOV
821
    if (numOfFault > 0) {
×
UNCOV
822
      stDebug(
×
823
          "s-task:%s status:%s vgId:%d all rsp. quit from monitor rsp tmr, since vnode-transfer/leader-change/restart "
824
          "detected, total:%d, notRsp:%d, notReady:%d, fault:%d, timeout:%d, ready:%d",
825
          id, state.name, vgId, total, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady);
UNCOV
826
      jumpOut = true;
×
827
    }
828

UNCOV
829
    if (jumpOut) {
×
UNCOV
830
      streamTaskCompleteCheckRsp(pInfo, false, id);
×
UNCOV
831
      streamMutexUnlock(&pInfo->checkInfoLock);
×
UNCOV
832
      doCleanup(pTask, pNotReadyList, pTimeoutList, param);
×
UNCOV
833
      return;
×
834
    }
835
  } else {  // unexpected status
UNCOV
836
    stError("s-task:%s unexpected task status:%s during waiting for check rsp", id, state.name);
×
837
  }
838

839
  // checking of downstream tasks has been stopped by other threads
UNCOV
840
  if (pInfo->stopCheckProcess == 1) {
×
UNCOV
841
    stDebug(
×
842
        "s-task:%s status:%s vgId:%d stopped by other threads to check downstream process, total:%d, notRsp:%d, "
843
        "notReady:%d, fault:%d, timeout:%d, ready:%d",
844
        id, state.name, vgId, total, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady);
845

UNCOV
846
    streamTaskCompleteCheckRsp(pInfo, false, id);
×
UNCOV
847
    streamMutexUnlock(&pInfo->checkInfoLock);
×
848

849
    int32_t code = addDownstreamFailedStatusResultAsync(pTask->pMsgCb, vgId, pTask->id.streamId, pTask->id.taskId);
×
UNCOV
850
    if (code) {
×
851
      stError("s-task:%s failed to create async record start failed task, code:%s", id, tstrerror(code));
×
852
    }
853

UNCOV
854
    doCleanup(pTask, pNotReadyList, pTimeoutList, param);
×
UNCOV
855
    return;
×
856
  }
857

UNCOV
858
  if (numOfNotReady > 0) {  // check to make sure not in recheck timer
×
UNCOV
859
    handleNotReadyDownstreamTask(pTask, pNotReadyList);
×
860
  }
861

UNCOV
862
  if (numOfTimeout > 0) {
×
UNCOV
863
    handleTimeoutDownstreamTasks(pTask, pTimeoutList);
×
864
  }
865

UNCOV
866
  streamTmrStart(rspMonitorFn, CHECK_RSP_CHECK_INTERVAL, param, streamTimer, &pInfo->checkRspTmr, vgId,
×
867
                 "check-status-monitor");
UNCOV
868
  streamMutexUnlock(&pInfo->checkInfoLock);
×
869

UNCOV
870
  stDebug(
×
871
      "s-task:%s vgId:%d continue checking rsp in 300ms, total:%d, notRsp:%d, notReady:%d, fault:%d, timeout:%d, "
872
      "ready:%d",
873
      id, vgId, total, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady);
UNCOV
874
  doCleanup(pTask, pNotReadyList, pTimeoutList, NULL);
×
875
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc