• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In
You are now the owner of this repo.

taosdata / TDengine / #5011

03 Apr 2026 03:59PM UTC coverage: 72.3% (+0.008%) from 72.292%
#5011

push

travis-ci

web-flow
merge: from main to 3.0 branch #35067

4053 of 5985 new or added lines in 68 files covered. (67.72%)

732 existing lines in 143 files now uncovered.

257430 of 356056 relevant lines covered (72.3%)

131834103.52 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.35
/source/libs/scheduler/src/schTask.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "query.h"
17
#include "qworker.h"
18
#include "schInt.h"
19
#include "taoserror.h"
20
#include "tglobal.h"
21
#include "tmsg.h"
22
#include "trpc.h"
23
#include "tmisce.h"
24

25
static int32_t schUpdateCurrentEpset(SSchTask *pTask, SSchJob* pJob);
26

27
// Release every resource owned by a task: heartbeat registration, candidate
// address list, pending message buffer, child/parent links, exec-node table
// and the execution-time profile array. The task struct itself is not freed.
void schFreeTask(SSchJob *pJob, SSchTask *pTask) {
  // Stop heartbeat tracking before tearing the task down.
  schDeregisterTaskHb(pJob, pTask);

  if (NULL != pTask->candidateAddrs) {
    taosArrayDestroy(pTask->candidateAddrs);
  }

  taosMemoryFreeClear(pTask->msg);

  if (NULL != pTask->children) {
    taosArrayDestroy(pTask->children);
  }

  if (NULL != pTask->parents) {
    taosArrayDestroy(pTask->parents);
  }

  if (NULL != pTask->execNodes) {
    taosHashCleanup(pTask->execNodes);
  }

  taosArrayDestroy(pTask->profile.execTime);
}
50

51
// Initialize the retry budget of a task: how many times it may be retried
// (maxRetryTimes) and executed in total (maxExecTimes), plus the default
// redirect retry interval.
void schInitTaskRetryInfo(SSchJob *pJob, SSchTask *pTask, SSchLevel *pLevel) {
  // Default redirect retry interval: 2 seconds.
  pTask->redirectCtx.redirectDelayMs = 2000;

  // 3 is the maximum replica factor in tsdb, so multiply by 3 to increase the retry chance.
  const int32_t replicaFactor = 3;
  int32_t       retryFloor = SCH_DEFAULT_MAX_RETRY_NUM * replicaFactor;

  bool waitTimeBased =
      SCH_IS_DATA_BIND_TASK(pTask) || (!SCH_IS_QUERY_JOB(pJob)) || (SCH_ALL != schMgmt.cfg.schPolicy);
  if (waitTimeBased) {
    // Budget derived from the configured maximum retry wait time.
    int32_t waitBasedRetry = ceil((tsMaxRetryWaitTime * 1.0) / pTask->redirectCtx.redirectDelayMs);
    pTask->maxRetryTimes = TMAX(waitBasedRetry, retryFloor);
  } else {
    // SCH_ALL query job: budget derived from the number of candidate nodes.
    int32_t nodeCnt = taosArrayGetSize(pJob->nodeList);
    pTask->maxRetryTimes = TMAX(nodeCnt, retryFloor);
  }

  // Deeper levels get a proportionally larger total execution budget.
  pTask->maxExecTimes = pTask->maxRetryTimes * (pLevel->level + 1);
}
67

68
// Initialize a scheduler task from its subplan: identity fields, retry budget,
// exec-node hash and execution-time profile array.
// Returns TSDB_CODE_SUCCESS, or an error code with all partially-allocated
// resources released.
int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel) {
  int32_t code = 0;

  pTask->plan = pPlan;
  pTask->level = pLevel;
  pTask->seriesId = pJob->seriesId;
  pTask->execId = -1;        // no execution attempted yet
  pTask->failedExecId = -2;  // below any real execId so the first attempt is never "stale"
  pTask->failedSeriesId = 0;
  pTask->timeoutUsec = SCH_DEFAULT_TASK_TIMEOUT_USEC;
  pTask->clientId = getClientId();
  pTask->taskId = schGenTaskId();

  schInitTaskRetryInfo(pJob, pTask, pLevel);

  pTask->execNodes =
      taosHashInit(pTask->maxExecTimes, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
  pTask->profile.execTime = taosArrayInit(pTask->maxExecTimes, sizeof(int64_t));
  if (NULL == pTask->execNodes || NULL == pTask->profile.execTime) {
    SCH_ERR_JRET(terrno);
  }

  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT);
  pTask->profile.startTs = taosGetTimestampUs();

  SCH_TASK_DLOG("task initialized, max retry(exec):%d(%d), max retry duration:%.2fs", pTask->maxRetryTimes,
                pTask->maxExecTimes, (pTask->redirectCtx.redirectDelayMs * pTask->maxRetryTimes) / 1000.0);

  return TSDB_CODE_SUCCESS;

_return:

  taosArrayDestroy(pTask->profile.execTime);
  taosHashCleanup(pTask->execNodes);

  // Clear the freed pointers so a later schFreeTask on this task cannot
  // double-free them.
  pTask->profile.execTime = NULL;
  pTask->execNodes = NULL;

  SCH_RET(code);
}
105

106
// Record the node address on which the task succeeded into pTask->succeedAddr,
// so parent tasks / fetch can target the right endpoint later.
// Local-exec tasks have no remote address and are skipped.
// A failure to stringify the epset for logging is ignored on purpose.
int32_t schRecordTaskSucceedNode(SSchJob *pJob, SSchTask *pTask) {
  char            buf[256] = {0};
  SQueryNodeAddr *pAddr = NULL;

  if (SCH_IS_LOCAL_EXEC_TASK(pJob, pTask)) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = schGetTaskCurrentNodeAddr(pTask, pJob, &pAddr);
  if (code != TSDB_CODE_SUCCESS) {
    SCH_ERR_RET(code);
  }

  pTask->succeedAddr = *pAddr;
  code = epsetToStr(&pAddr->epSet, buf, tListLen(buf));
  if (code == TSDB_CODE_SUCCESS) {
    // fixed typo: was "recode the success addr"
    SCH_TASK_DLOG("record the success addr:%s", buf);
  } else {
    // Logging-only failure: keep going, the address itself is already recorded.
    SCH_TASK_ELOG("failed to print epset due to convert to string failed, code:%s, ignore and continue",
                  tstrerror(code));
  }

  return TSDB_CODE_SUCCESS;
}
130

131
// Register the node an execution attempt was dispatched to, keyed by execId,
// so responses and drops can be matched back to the right attempt.
int32_t schAppendTaskExecNode(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, int32_t execId) {
  SSchNodeInfo nodeInfo = {.addr = *addr, .handle = SCH_GET_TASK_HANDLE(pTask)};

  // Propagate the real taosHashPut code instead of logging the unrelated ERRNO
  // and masking everything as OOM (consistent with schPushTaskToExecList).
  int32_t code = taosHashPut(pTask->execNodes, &execId, sizeof(execId), &nodeInfo, sizeof(nodeInfo));
  if (0 != code) {
    SCH_TASK_ELOG("taosHashPut nodeInfo to execNodes failed, code:0x%x", code);
    SCH_ERR_RET(code);
  }

  SCH_TASK_DLOG("task execNode added, execId:%d, handle:%p", execId, nodeInfo.handle);

  return TSDB_CODE_SUCCESS;
}
143

144
// Remove an execution attempt's node record. Returns SCH_IGNORE_ERROR when the
// record is already gone or the execId is no longer current — callers treat
// that as "nothing more to do", not as a failure.
int32_t schDropTaskExecNode(SSchJob *pJob, SSchTask *pTask, void *handle, int32_t execId) {
  // Nothing to drop when the exec-node table was never created.
  if (NULL == pTask->execNodes) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t removeRes = taosHashRemove(pTask->execNodes, &execId, sizeof(execId));
  if (0 != removeRes) {
    SCH_TASK_DLOG("execId %d already not in execNodeList", execId);
    SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR);
  }

  SCH_TASK_DLOG("execId %d removed from execNodeList", execId);

  // A stale execId, or a task already parked for retry, is intentionally ignored.
  if (pTask->waitRetry || (execId != pTask->execId)) {
    SCH_TASK_DLOG("execId %d is already not current execId %d, waitRetry %d", execId, pTask->execId, pTask->waitRetry);
    SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR);
  }

  return TSDB_CODE_SUCCESS;
}
163

164
// Refresh the RPC handle stored for a given execId. Missing entries are not an
// error: the attempt may have been dropped or superseded already.
int32_t schUpdateTaskExecNode(SSchJob *pJob, SSchTask *pTask, void *handle, int32_t execId) {
  if (taosHashGetSize(pTask->execNodes) <= 0) {
    return TSDB_CODE_SUCCESS;
  }

  SSchNodeInfo *pNode = taosHashGet(pTask->execNodes, &execId, sizeof(execId));
  if (NULL == pNode) {
    // Attempt record already gone — silently skip the update.
    SCH_TASK_DLOG("handle not updated since execId %d already not exist, current execId %d, waitRetry %d", execId,
                  pTask->execId, pTask->waitRetry);
    return TSDB_CODE_SUCCESS;
  }

  pNode->handle = handle;
  SCH_TASK_DLOG("handle updated to %p for execId %d", handle, execId);

  return TSDB_CODE_SUCCESS;
}
182

183
// Update the task's current RPC handle for a response, or drop the attempt's
// exec-node record when dropExecNode is set. The handle is only adopted when
// both seriesId and execId identify the latest, non-failed, non-retrying
// attempt; anything stale yields SCH_IGNORE_ERROR.
int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, bool dropExecNode, void *handle, uint64_t seriesId, int32_t execId) {
  if (dropExecNode) {
    SCH_RET(schDropTaskExecNode(pJob, pTask, handle, execId));
  }

  SCH_ERR_RET(schUpdateTaskExecNode(pJob, pTask, handle, execId));

  bool staleSeries = (seriesId != pTask->seriesId) || (seriesId <= pTask->failedSeriesId);
  bool staleExec = (execId != pTask->execId) || (execId <= pTask->failedExecId);
  if (staleSeries || staleExec || pTask->waitRetry) {
    SCH_TASK_DLOG("handle not updated since seriesId:0x%" PRIx64 " or execId:%d is not lastest,"
                  "current seriesId:0x%" PRIx64 " execId %d, failedSeriesId:0x%" PRIx64 " failedExecId:%d, waitRetry %d", 
                  seriesId, execId, pTask->seriesId, pTask->execId, pTask->failedSeriesId, pTask->failedExecId, pTask->waitRetry);
    SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR);
  }

  SCH_SET_TASK_HANDLE(pTask, handle);

  return TSDB_CODE_SUCCESS;
}
202

203
// Handle a failed task execution: decide whether to retry; if no retry is
// possible, mark the task FAIL and — for jobs that must wait for every task in
// the level — account the failure and propagate the job error once all level
// tasks have finished.
// Returns SCH_IGNORE_ERROR when the failure is absorbed (job stopping, still
// waiting on siblings), SUCCESS when a retry was scheduled, or the error code.
int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode) {
  bool    needRetry = false;
  int32_t taskDone = 0;
  int8_t  jobStatus = 0;

  // An already-absorbed error needs no further processing.
  if (TSDB_CODE_SCH_IGNORE_ERROR == errCode) {
    return TSDB_CODE_SCH_IGNORE_ERROR;
  }

  // Remember the failed attempt so stale responses for it are rejected later.
  pTask->failedExecId = pTask->execId;
  if (schJobNeedToStop(pJob, &jobStatus)) {
    SCH_TASK_DLOG("no more task failure processing cause of job status %s", jobTaskStatusStr(jobStatus));
    SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR);
  }

  int8_t taskStatus = SCH_GET_TASK_STATUS(pTask);
  if (taskStatus == JOB_TASK_STATUS_FAIL || taskStatus == JOB_TASK_STATUS_SUCC) {
    SCH_TASK_ELOG("task already done, status:%s", jobTaskStatusStr(taskStatus));
    SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
  }

  // Timeouts are logged as wait time, everything else as end time.
  if (errCode == TSDB_CODE_SCH_TIMEOUT_ERROR) {
    SCH_LOG_TASK_WAIT_TS(pTask);
  } else {
    SCH_LOG_TASK_END_TS(pTask);
  }

  SCH_TASK_DLOG("taskOnFailure, code:%s", tstrerror(errCode));

  SCH_ERR_RET(schTaskCheckSetRetry(pJob, pTask, errCode, &needRetry));

  if (!needRetry) {
    SCH_TASK_ELOG("task failed and no more retry, code:%s", tstrerror(errCode));
    SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_FAIL);

    if (SCH_JOB_NEED_WAIT(pJob)) {
      // Count this failure under the level lock; taskDone tells us whether
      // every task in the level has reached a terminal state.
      SCH_LOCK(SCH_WRITE, &pTask->level->lock);
      pTask->level->taskFailed++;
      taskDone = pTask->level->taskSucceed + pTask->level->taskFailed;
      SCH_UNLOCK(SCH_WRITE, &pTask->level->lock);

      schUpdateJobErrCode(pJob, errCode);

      if (taskDone < pTask->level->taskNum) {
        SCH_TASK_DLOG("need to wait other tasks, doneNum:%d, allNum:%d", taskDone, pTask->level->taskNum);
        SCH_RET(TSDB_CODE_SCH_IGNORE_ERROR);
      }

      // Last task in the level: surface the job-wide error code.
      SCH_RET(atomic_load_32(&pJob->errCode));
    }
  } else {
    SCH_ERR_RET(schHandleTaskRetry(pJob, pTask));
    return TSDB_CODE_SUCCESS;
  }

  SCH_RET(errCode);
}
261

262
// Called when a sub-job completes successfully: either kick off the next
// pending sub-job of the parent, or — once every sub-job is done — launch the
// parent job itself.
int32_t schProcessOnSubJobSuccess(SSchJob *pJob) {
  SSchJob* pParent = pJob->parent;
  int64_t doneNum = atomic_add_fetch_32(&pParent->subJobDoneNum, 1);

  // More sub-jobs queued: launch the next one in submission order.
  // NOTE(review): subJobExecIdx is read-then-incremented without atomics here,
  // unlike subJobDoneNum above — presumably callers serialize this path; verify.
  if (pParent->subJobExecIdx < pParent->subJobs->size) {
    SCH_RET(schLaunchJobImpl(taosArrayGetP(pParent->subJobs, pParent->subJobExecIdx++)));
  }
  
  // All sub-jobs finished: the parent job can now run.
  if (SCH_SUB_JOBS_EXEC_FINISHED(pParent, doneNum)) {
    return schLaunchJobImpl(pParent);
  }

  return TSDB_CODE_SUCCESS;
}
276

277
// Handle a successfully completed task execution.
// Leaf-of-DAG (parentNum == 0) tasks drive job-level completion: either the
// level-wait accounting (SCH_JOB_NEED_WAIT) or an immediate transition to
// PART_SUCC / sub-job continuation. Non-leaf tasks feed their result address
// into each parent's subplan; when a parent has heard from all its children it
// is launched.
int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
  bool    moved = false;
  int32_t code = 0;

  SCH_TASK_TLOG("taskOnSuccess, status:%s", SCH_GET_TASK_STATUS_STR(pTask));

  SCH_LOG_TASK_END_TS(pTask);

  // Level-wide done counter; compared against taskNum at the bottom to decide
  // whether the next (lower) level can be launched.
  int32_t taskDone = atomic_add_fetch_32(&pTask->level->taskExecDoneNum, 1);

  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_PART_SUCC);

  SCH_ERR_RET(schRecordTaskSucceedNode(pJob, pTask));
  SCH_ERR_RET(schLaunchTasksInFlowCtrlList(pJob, pTask));

  int32_t parentNum = pTask->parents ? (int32_t)taosArrayGetSize(pTask->parents) : 0;
  if (parentNum > 0) {
    // A child finished while the job is still in the data-query phase:
    // the job is now waiting on children feeding their parents.
    int32_t curPhase = SCH_GET_JOB_PHASE(pJob);
    if (curPhase == QUERY_PHASE_EXEC_DATA_QUERY) {
      SCH_SET_JOB_PHASE(pJob, QUERY_PHASE_EXEC_WAITING_CHILDREN);
    }
  }
  if (parentNum == 0) {
    // NOTE: this inner taskDone intentionally shadows the outer counter; it
    // tracks succeed+failed under the level lock, not the atomic exec counter.
    int32_t taskDone = 0;
    if (SCH_JOB_NEED_WAIT(pJob)) {
      SCH_LOCK(SCH_WRITE, &pTask->level->lock);
      pTask->level->taskSucceed++;
      taskDone = pTask->level->taskSucceed + pTask->level->taskFailed;
      SCH_UNLOCK(SCH_WRITE, &pTask->level->lock);

      if (taskDone < pTask->level->taskNum) {
        SCH_TASK_DLOG("wait all tasks, done:%d, all:%d", taskDone, pTask->level->taskNum);
        return TSDB_CODE_SUCCESS;
      }

      SCH_TASK_TLOG("taskDone number reach level task number, done:%d, total:%d", taskDone, pTask->level->taskNum);

      // Any sibling failure fails the whole job even though this task succeeded.
      if (pTask->level->taskFailed > 0) {
        SCH_RET(schHandleJobFailure(pJob, pJob->errCode));
      }

      SCH_RET(schSwitchJobStatus(pJob, JOB_TASK_STATUS_PART_SUCC, NULL));
    }

    // No wait required: this task's node is where results will be fetched from.
    pJob->resNode = pTask->succeedAddr;
    pJob->fetchTask = pTask;

    if (SCH_IS_PARENT_JOB(pJob)) {
      SCH_RET(schSwitchJobStatus(pJob, JOB_TASK_STATUS_PART_SUCC, NULL));
    } else {
      SCH_RET(schProcessOnSubJobSuccess(pJob));
    }
  }

  // Non-leaf path: publish this task's success address to every parent plan.
  for (int32_t i = 0; i < parentNum; ++i) {
    SSchTask *pParent = *(SSchTask **)taosArrayGet(pTask->parents, i);
    if (NULL == pParent) {
      SCH_TASK_ELOG("fail to get task %d pParent, parentNum:%d", i, parentNum);
      SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
    }

    // planLock protects the parent's subplan while we wire in the source node.
    SCH_LOCK(SCH_WRITE, &pParent->planLock);
    SDownstreamSourceNode source = {
        .type = QUERY_NODE_DOWNSTREAM_SOURCE,
        .clientId = pTask->clientId,
        .taskId = pTask->taskId,
        .sId = pTask->seriesId,
        .execId = pTask->execId,
        .addr = pTask->succeedAddr,
        .fetchMsgType = SCH_FETCH_TYPE(pTask),
        .localExec = SCH_IS_LOCAL_EXEC_TASK(pJob, pTask),
    };

    code = qSetSubplanExecutionNode(pParent->plan, pTask->plan->id.groupId, &source);
    if (TSDB_CODE_SUCCESS != code) {
      SCH_TASK_ELOG("qSetSubplanExecutionNode failed, groupId: %d", pTask->plan->id.groupId);
    }

    // Unlock before propagating the error so the lock is never leaked.
    SCH_UNLOCK(SCH_WRITE, &pParent->planLock);
    SCH_ERR_RET(code);

    int32_t readyNum = atomic_add_fetch_32(&pParent->childReady, 1);

    if (SCH_TASK_READY_FOR_LAUNCH(readyNum, pParent)) {
      // this is a redirect process for parent task, since the original pParent task has already failed before.
      // TODO refactor optimize: update the candidate address
      // set the address from the pTask->succeedAddr, the vnode that successfully executed subquery already
      if (pParent->redirectCtx.inRedirect && (!SCH_IS_DATA_BIND_TASK(pParent))) {
        code = schSwitchTaskCandidateAddr(pJob, pParent);

        // if all vnodes are tried, let's switch the epset for each vnode for the next round
        if (pParent->retryTimes > taosArrayGetSize(pParent->candidateAddrs)) {
          SCH_ERR_RET(schUpdateCurrentEpset(pParent, pJob));
        }
      }

      SCH_TASK_DLOG("all %d children task done, start to launch parent task, TID:0x%" PRIx64, readyNum, pParent->taskId);

      pParent->seriesId = pJob->seriesId;
      // Swap pTask/pParent around the log so SCH_TASK_TLOG (which formats
      // using pTask) reports the PARENT's ids, then swap back.
      TSWAP(pTask, pParent);
      SCH_TASK_TLOG("task seriesId set to 0x%" PRIx64, pTask->seriesId);
      TSWAP(pTask, pParent);

      SCH_ERR_RET(schDelayLaunchTask(pJob, pParent));
    }
  }

  // Whole level finished executing: start the next level down.
  if (taskDone == pTask->level->taskNum) {
    SCH_ERR_RET(schLaunchJobLowerLevel(pJob, pTask));
  }

  return TSDB_CODE_SUCCESS;
}
390

391
// Optionally reschedule a timed-out, still-executing task onto another
// candidate node. Disabled by config, and never applied to data-bind tasks,
// the fetch task, or tasks with a single candidate address.
int32_t schRescheduleTask(SSchJob *pJob, SSchTask *pTask) {
  if (!schMgmt.cfg.enableReSchedule || SCH_IS_DATA_BIND_TASK(pTask)) {
    return TSDB_CODE_SUCCESS;
  }

  bool eligible = SCH_TASK_TIMEOUT(pTask) && JOB_TASK_STATUS_EXEC == pTask->status && pJob->fetchTask != pTask &&
                  taosArrayGetSize(pTask->candidateAddrs) > 1;
  if (eligible) {
    SCH_TASK_DLOG("task execId %d will be rescheduled now", pTask->execId);
    // Drop the in-flight execution, then route through the normal failure
    // path with a timeout code so the retry machinery relaunches it.
    schDropTaskOnExecNode(pJob, pTask);
    taosHashClear(pTask->execNodes);

    SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, TSDB_CODE_SCH_TIMEOUT_ERROR));
  }

  return TSDB_CODE_SUCCESS;
}
411

412
// Mark the task as being redirected and maintain its redirect bookkeeping.
// resetRetry=true (task had succeeded before redirect) restarts the retry
// window; otherwise the elapsed time since the first redirect is tracked.
int32_t schChkUpdateRedirectCtx(SSchJob *pJob, SSchTask *pTask, bool resetRetry) {
  SSchRedirectCtx *pRedirect = &pTask->redirectCtx;

  pRedirect->inRedirect = true;

  if (!resetRetry) {
    // Failed-task path: keep the existing window, just report elapsed time.
    double elapsedSec = 0;
    if (pRedirect->startTs <= 0) {
      pRedirect->startTs = taosGetTimestampMs();
    } else {
      elapsedSec = (taosGetTimestampMs() - pRedirect->startTs) / 1000.0;
    }

    SCH_TASK_DLOG("update failed task redirectCtx, current %d/%d redirect retry, delayExec:%.2fs, startTs:%" PRId64
                  " ,elapsed:%.2fs",
                  pTask->retryTimes, pTask->maxRetryTimes, pTask->delayExecMs / 1000.0, pRedirect->startTs, elapsedSec);

    return TSDB_CODE_SUCCESS;
  }

  // Succeeded-task path: always reset startTs & retryTimes.
  pRedirect->startTs = taosGetTimestampMs();
  pTask->retryTimes = 0;

  SCH_TASK_DLOG("reset succ task retryInfo, start %d/%d redirect retry, delayExec:%.2fs, startTs:%" PRId64,
                pTask->retryTimes, pTask->maxRetryTimes, pTask->delayExecMs / 1000.0, pRedirect->startTs);

  return TSDB_CODE_SUCCESS;
}
438

439
// Return a task to a clean pre-launch state before it is retried: flag it as
// waiting, stop its delay timer, drop the running instance on the exec node,
// and clear all per-attempt state (message, exec list, heartbeat, counters).
void schResetTaskForRetry(SSchJob *pJob, SSchTask *pTask) {
  // Set first so concurrent responses for the old attempt are ignored.
  pTask->waitRetry = true;

  if (NULL != pTask->delayTimer) {
    UNUSED(taosTmrStop(pTask->delayTimer));
  }

  schDropTaskOnExecNode(pJob, pTask);
  taosHashClear(pTask->execNodes);
  (void)schRemoveTaskFromExecList(pJob, pTask);  // ignore error
  schDeregisterTaskHb(pJob, pTask);
  taosMemoryFreeClear(pTask->msg);

  // Wipe per-attempt bookkeeping.
  pTask->msgLen = 0;
  pTask->lastMsgType = 0;
  pTask->childReady = 0;
  TAOS_MEMSET(&pTask->succeedAddr, 0, sizeof(pTask->succeedAddr));
}
457

458
#if 0
459

460
int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32_t rspCode) {
461
  int32_t code = 0;
462

463
  SCH_TASK_DLOG("task will be redirected now, status:%s, code:%s", SCH_GET_TASK_STATUS_STR(pTask), tstrerror(rspCode));
464

465
  if (!NO_RET_REDIRECT_ERROR(rspCode)) {
466
    SCH_UPDATE_REDIRECT_CODE(pJob, rspCode);
467
  }
468

469
  SCH_ERR_JRET(schChkUpdateRedirectCtx(pJob, pTask, pData ? pData->pEpSet : NULL, rspCode));
470

471
  schResetTaskForRetry(pJob, pTask);
472

473
  if (SCH_IS_DATA_BIND_TASK(pTask)) {
474
    if (pData && pData->pEpSet) {
475
      SCH_ERR_JRET(schUpdateTaskCandidateAddr(pJob, pTask, pData->pEpSet));
476
    } else if (SYNC_SELF_LEADER_REDIRECT_ERROR(rspCode)) {
477
      SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
478
      if (NULL == addr) {
479
        SCH_TASK_ELOG("fail to get the %dth condidateAddr, totalNum:%d", pTask->candidateIdx,
480
                      (int32_t)taosArrayGetSize(pTask->candidateAddrs));
481
        SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
482
      }
483

484
      SEp *pEp = &addr->epSet.eps[addr->epSet.inUse];
485
      SCH_TASK_DLOG("task retry node %d current ep, idx:%d/%d,%s:%d, code:%s", addr->nodeId, addr->epSet.inUse,
486
                    addr->epSet.numOfEps, pEp->fqdn, pEp->port, tstrerror(rspCode));
487
    } else {
488
      SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
489
      if (NULL == addr) {
490
        SCH_TASK_ELOG("fail to get the %dth condidateAddr, totalNum:%d", pTask->candidateIdx,
491
                      (int32_t)taosArrayGetSize(pTask->candidateAddrs));
492
        SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
493
      }
494

495
      SCH_SWITCH_EPSET(addr);
496
      SCH_TASK_DLOG("switch task target node %d epset to %d/%d", addr->nodeId, addr->epSet.inUse, addr->epSet.numOfEps);
497
    }
498

499
    SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT);
500

501
    SCH_ERR_JRET(schDelayLaunchTask(pJob, pTask));
502

503
    return TSDB_CODE_SUCCESS;
504
  }
505

506
  // merge plan
507

508
  pTask->childReady = 0;
509

510
  qClearSubplanExecutionNode(pTask->plan);
511

512
  // Note: current error task and upper level merge task
513
  if ((pData && 0 == pData->len) || NULL == pData) {
514
    SCH_ERR_JRET(schSwitchTaskCandidateAddr(pJob, pTask));
515
  }
516

517
  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT);
518

519
  int32_t childrenNum = taosArrayGetSize(pTask->children);
520
  for (int32_t i = 0; i < childrenNum; ++i) {
521
    SSchTask *pChild = taosArrayGetP(pTask->children, i);
522
    SCH_LOCK_TASK(pChild);
523
    (void)schDoTaskRedirect(pJob, pChild, NULL, rspCode);  // error handled internal
524
    SCH_UNLOCK_TASK(pChild);
525
  }
526

527
  return TSDB_CODE_SUCCESS;
528

529
_return:
530

531
  SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
532
}
533

534
int32_t schResetTaskSetLevelInfo(SSchJob *pJob, SSchTask *pTask) {
535
  SSchLevel *pLevel = pTask->level;
536

537
  SCH_TASK_DLOG("start to reset level for current task set, execDone:%d, launched:%d",
538
                atomic_load_32(&pLevel->taskExecDoneNum), atomic_load_32(&pLevel->taskLaunchedNum));
539

540
  if (SCH_GET_TASK_STATUS(pTask) >= JOB_TASK_STATUS_PART_SUCC) {
541
    (void)atomic_sub_fetch_32(&pLevel->taskExecDoneNum, 1);
542
  }
543

544
  (void)atomic_sub_fetch_32(&pLevel->taskLaunchedNum, 1);
545

546
  int32_t childrenNum = taosArrayGetSize(pTask->children);
547
  for (int32_t i = 0; i < childrenNum; ++i) {
548
    SSchTask *pChild = taosArrayGetP(pTask->children, i);
549
    if (NULL == pChild) {
550
      SCH_TASK_ELOG("fail to get the %dth child, childrenNum:%d", i, childrenNum);
551
      SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
552
    }
553

554
    SCH_LOCK_TASK(pChild);
555
    pLevel = pChild->level;
556
    (void)atomic_sub_fetch_32(&pLevel->taskExecDoneNum, 1);
557
    (void)atomic_sub_fetch_32(&pLevel->taskLaunchedNum, 1);
558
    SCH_UNLOCK_TASK(pChild);
559
  }
560

561
  SCH_TASK_DLOG("end to reset level for current task set, execDone:%d, launched:%d",
562
                atomic_load_32(&pLevel->taskExecDoneNum), atomic_load_32(&pLevel->taskLaunchedNum));
563

564
  return TSDB_CODE_SUCCESS;
565
}
566

567
int32_t schHandleTaskSetRetry(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32_t rspCode) {
568
  int32_t code = 0;
569

570
  SCH_ERR_JRET(schChkResetJobRetry(pJob, rspCode));
571

572
  if (SYNC_OTHER_LEADER_REDIRECT_ERROR(rspCode)) {
573
    if (NULL == pData->pEpSet) {
574
      SCH_TASK_ELOG("epset updating excepted, error:%s", tstrerror(rspCode));
575
      code = TSDB_CODE_INVALID_MSG;
576
      goto _return;
577
    }
578
  }
579

580
  SCH_TASK_DLOG("start to redirect current task set cause of error: %s", tstrerror(rspCode));
581

582
  SCH_ERR_JRET(schResetTaskSetLevelInfo(pJob, pTask));
583

584
  SCH_RESET_JOB_LEVEL_IDX(pJob);
585
  atomic_add_fetch_64(&pJob->seriesId, 1);
586

587
  code = schDoTaskRedirect(pJob, pTask, pData, rspCode);
588

589
  taosMemoryFreeClear(pData->pData);
590
  taosMemoryFreeClear(pData->pEpSet);
591

592
  SCH_RET(code);
593

594
_return:
595

596
  taosMemoryFreeClear(pData->pData);
597
  taosMemoryFreeClear(pData->pEpSet);
598

599
  SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
600
}
601
#endif
602

603
// Add the task to the job's executing-task hash, keyed by taskId. A duplicate
// insert is harmless and treated as success.
int32_t schPushTaskToExecList(SSchJob *pJob, SSchTask *pTask) {
  int32_t putRes = taosHashPut(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
  if (0 == putRes) {
    SCH_TASK_TLOG("task added to execTask list, numOfTasks:%d", taosHashGetSize(pJob->execTasks));
    return TSDB_CODE_SUCCESS;
  }

  if (HASH_NODE_EXIST(putRes)) {
    // Already tracked — nothing to do.
    SCH_TASK_DLOG("task already in execTask list, code:0x%x", putRes);
    return TSDB_CODE_SUCCESS;
  }

  SCH_TASK_ELOG("taosHashPut task to execTask list failed, code:0x%x", putRes);
  SCH_ERR_RET(putRes);

  return TSDB_CODE_SUCCESS;
}
619

620
/*
621
int32_t schMoveTaskToSuccList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
622
  if (0 != taosHashRemove(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId))) {
623
    SCH_TASK_WLOG("remove task from execTask list failed, may not exist, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
624
  } else {
625
    SCH_TASK_DLOG("task removed from execTask list, numOfTasks:%d", taosHashGetSize(pJob->execTasks));
626
  }
627

628
  int32_t code = taosHashPut(pJob->succTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
629
  if (0 != code) {
630
    if (HASH_NODE_EXIST(code)) {
631
      *moved = true;
632
      SCH_TASK_ELOG("task already in succTask list, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
633
      SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
634
    }
635

636
    SCH_TASK_ELOG("taosHashPut task to succTask list failed, errno:%d", ERRNO);
637
    SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
638
  }
639

640
  *moved = true;
641

642
  SCH_TASK_DLOG("task moved to succTask list, numOfTasks:%d", taosHashGetSize(pJob->succTasks));
643

644
  return TSDB_CODE_SUCCESS;
645
}
646

647
int32_t schMoveTaskToFailList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
648
  *moved = false;
649

650
  if (0 != taosHashRemove(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId))) {
651
    SCH_TASK_WLOG("remove task from execTask list failed, may not exist, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
652
  }
653

654
  int32_t code = taosHashPut(pJob->failTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
655
  if (0 != code) {
656
    if (HASH_NODE_EXIST(code)) {
657
      *moved = true;
658

659
      SCH_TASK_WLOG("task already in failTask list, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
660
      SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
661
    }
662

663
    SCH_TASK_ELOG("taosHashPut task to failTask list failed, errno:%d", ERRNO);
664
    SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
665
  }
666

667
  *moved = true;
668

669
  SCH_TASK_DLOG("task moved to failTask list, numOfTasks:%d", taosHashGetSize(pJob->failTasks));
670

671
  return TSDB_CODE_SUCCESS;
672
}
673

674
int32_t schMoveTaskToExecList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
675
  if (0 != taosHashRemove(pJob->succTasks, &pTask->taskId, sizeof(pTask->taskId))) {
676
    SCH_TASK_WLOG("remove task from succTask list failed, may not exist, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
677
  }
678

679
  int32_t code = taosHashPut(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
680
  if (0 != code) {
681
    if (HASH_NODE_EXIST(code)) {
682
      *moved = true;
683

684
      SCH_TASK_ELOG("task already in execTask list, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
685
      SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
686
    }
687

688
    SCH_TASK_ELOG("taosHashPut task to execTask list failed, errno:%d", ERRNO);
689
    SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
690
  }
691

692
  *moved = true;
693

694
  SCH_TASK_DLOG("task moved to execTask list, numOfTasks:%d", taosHashGetSize(pJob->execTasks));
695

696
  return TSDB_CODE_SUCCESS;
697
}
698
*/
699

700
// Decide whether a failed task should be retried, writing the verdict to
// *needRetry. Side effect: a transport timeout enlarges the task's retry/exec
// budget and doubles its timeout (capped at SCH_MAX_TASK_TIMEOUT_USEC).
// Always returns TSDB_CODE_SUCCESS; the decision itself is in *needRetry.
int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bool *needRetry) {
  int64_t now = taosGetTimestampMs();
  double  el = 0.0;
  // Elapsed time since redirect started — used only for the final log line.
  if (pTask->redirectCtx.startTs != 0) {
    el = (now - pTask->redirectCtx.startTs) / 1000.0;
  }

  // Job-level kill switch: no task may retry anymore.
  if (pJob->noMoreRetry) {
    *needRetry = false;
    SCH_TASK_DLOG("task no more retry since job no more retry, retryTimes:%d/%d", pTask->retryTimes,
                  pTask->maxRetryTimes);
    return TSDB_CODE_SUCCESS;
  }

  // handle transport time out issue
  if (TSDB_CODE_SCH_TIMEOUT_ERROR == errCode) {
    // Grant one extra attempt and back off the timeout exponentially.
    pTask->maxExecTimes++;
    pTask->maxRetryTimes++;
    if (pTask->timeoutUsec < SCH_MAX_TASK_TIMEOUT_USEC) {
      pTask->timeoutUsec *= 2;
      if (pTask->timeoutUsec > SCH_MAX_TASK_TIMEOUT_USEC) {
        pTask->timeoutUsec = SCH_MAX_TASK_TIMEOUT_USEC;
      }
    }
  }

  // Budget check (retry count / wait time) — non-success means budget spent.
  int32_t code = schFailedTaskNeedRetry(pTask, pJob, errCode);
  if (code != TSDB_CODE_SUCCESS) {
    *needRetry = false;
    return TSDB_CODE_SUCCESS;
  }

  // Error-class check: some message-type/error combinations never retry.
  if (!SCH_TASK_NEED_RETRY(pTask->lastMsgType, errCode)) {
    *needRetry = false;
    SCH_TASK_DLOG("task no more retry cause of errCode, errCode:0x%x - %s", errCode, tstrerror(errCode));
    return TSDB_CODE_SUCCESS;
  }

  *needRetry = true;

  SCH_TASK_DLOG("task need the %d/%d retry, elapsedTime:%.2fs, errCode:0x%x - %s", pTask->execId + 1,
                pTask->maxRetryTimes, el, errCode, tstrerror(errCode));
  return TSDB_CODE_SUCCESS;
}
744

745
// Prepare a failed task for its next attempt and schedule the delayed
// relaunch: roll back the level's launched counter, stop any pending delay
// timer, detach from exec list / flow-control / heartbeat, pick the next
// target address, then hand off to schDelayLaunchTask.
int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask) {
  // This attempt no longer counts as launched for the level.
  (void)atomic_sub_fetch_32(&pTask->level->taskLaunchedNum, 1);

  if (pTask->delayTimer) {
    UNUSED(taosTmrStop(pTask->delayTimer));
  }

  (void)schRemoveTaskFromExecList(pJob, pTask);  // ignore error
  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT);

  // Free this task's flow-control slot so queued siblings can proceed.
  if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) {
    SCH_ERR_RET(schLaunchTasksInFlowCtrlList(pJob, pTask));
  }

  schDeregisterTaskHb(pJob, pTask);

  if (SCH_IS_DATA_BIND_TASK(pTask)) {
    // Data-bind task: stay on the same vnode but refresh its epset, retrying
    // after the redirect delay.
    pTask->delayExecMs = pTask->redirectCtx.redirectDelayMs;
    if (pTask->redirectCtx.startTs == 0) {
      pTask->redirectCtx.startTs = taosGetTimestampMs();
    }

    SCH_ERR_RET(schUpdateCurrentEpset(pTask, pJob));
  } else {
    // Compute task: rotate to the next candidate node.
    SCH_ERR_RET(schSwitchTaskCandidateAddr(pJob, pTask));
  }

  SCH_ERR_RET(schDelayLaunchTask(pJob, pTask));
  return TSDB_CODE_SUCCESS;
}
775

776
// Populate the task's candidate address list from the job's node list.
// Fails with TSDB_CODE_TSC_NO_EXEC_NODE when no node could be added.
int32_t schSetAddrsFromNodeList(SSchJob *pJob, SSchTask *pTask) {
  int32_t addNum = 0;
  int32_t nodeNum = 0;

  if (pJob->nodeList) {
    nodeNum = taosArrayGetSize(pJob->nodeList);

    for (int32_t i = 0; i < nodeNum; ++i) {
      SQueryNodeLoad *nload = taosArrayGet(pJob->nodeList, i);
      // Guard the taosArrayGet result before dereferencing, consistent with
      // the NULL checks applied to array lookups elsewhere in this file.
      if (NULL == nload) {
        SCH_TASK_ELOG("fail to get the %dth node in nodeList, nodeNum:%d", i, nodeNum);
        SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
      }
      SQueryNodeAddr *naddr = &nload->addr;

      if (NULL == taosArrayPush(pTask->candidateAddrs, naddr)) {
        SCH_TASK_ELOG("taosArrayPush execNode to candidate addrs failed, addNum:%d, errno:%d", addNum, ERRNO);
        SCH_ERR_RET(terrno);
      }

      SCH_TASK_TLOG("set %dth candidate addr, id %d, inUse:%d/%d, fqdn:%s, port:%d", i, naddr->nodeId,
                    naddr->epSet.inUse, naddr->epSet.numOfEps, SCH_GET_CUR_EP(naddr)->fqdn,
                    SCH_GET_CUR_EP(naddr)->port);

      ++addNum;
    }
  }

  // No usable node at all — the task cannot be placed anywhere.
  if (addNum <= 0) {
    SCH_TASK_ELOG("no available execNode as candidates, nodeNum:%d", nodeNum);
    SCH_ERR_RET(TSDB_CODE_TSC_NO_EXEC_NODE);
  }

  return TSDB_CODE_SUCCESS;
}
807

808
// Build the task's candidate address list (idempotent). Preference order:
// the execNode pinned in the plan; otherwise, for non-data-bind tasks, the
// job's node list with a random starting index. Data-bind tasks without a
// pinned execNode are an internal error.
int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) {
  // Already populated — nothing to do.
  if (NULL != pTask->candidateAddrs) {
    return TSDB_CODE_SUCCESS;
  }

  pTask->candidateAddrs = taosArrayInit(SCHEDULE_DEFAULT_MAX_NODE_NUM, sizeof(SQueryNodeAddr));
  if (NULL == pTask->candidateAddrs) {
    SCH_TASK_ELOG("taosArrayInit %d condidate addrs failed", SCHEDULE_DEFAULT_MAX_NODE_NUM);
    SCH_ERR_RET(terrno);
  }

  int32_t planEps = pTask->plan->execNode.epSet.numOfEps;
  if (planEps > 0) {
    // The plan names an explicit execution node: it is the only candidate.
    if (NULL == taosArrayPush(pTask->candidateAddrs, &pTask->plan->execNode)) {
      SCH_TASK_ELOG("taosArrayPush execNode to candidate addrs failed, errno:%d", ERRNO);
      SCH_ERR_RET(terrno);
    }

    SCH_TASK_TLOG("use execNode in plan as candidate addr, numOfEps:%d", pTask->plan->execNode.epSet.numOfEps);

    return TSDB_CODE_SUCCESS;
  }

  // A data source task must have been pinned to a vnode by the planner.
  if (SCH_IS_DATA_BIND_TASK(pTask)) {
    SCH_TASK_ELOG("no execNode specifed for data src task, numOfEps:%d", pTask->plan->execNode.epSet.numOfEps);
    SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
  }

  SCH_ERR_RET(schSetAddrsFromNodeList(pJob, pTask));

  // Start from a random candidate to spread load across nodes.
  pTask->candidateIdx = taosRand() % taosArrayGetSize(pTask->candidateAddrs);
  return TSDB_CODE_SUCCESS;
}
840

841
#if 0
842
int32_t schUpdateTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet) {
843
  int32_t code = TSDB_CODE_SUCCESS;
844
  if (NULL == pTask->candidateAddrs || 1 != taosArrayGetSize(pTask->candidateAddrs)) {
845
    SCH_TASK_ELOG("not able to update cndidate addr, addr num %d",
846
                  (int32_t)(pTask->candidateAddrs ? taosArrayGetSize(pTask->candidateAddrs) : 0));
847
    SCH_ERR_RET(TSDB_CODE_APP_ERROR);
848
  }
849

850
  SQueryNodeAddr *pAddr = taosArrayGet(pTask->candidateAddrs, 0);
851
  if (NULL == pAddr) {
852
    SCH_TASK_ELOG("fail to get task 0th condidataAddr, totalNum:%d", (int32_t)taosArrayGetSize(pTask->candidateAddrs));
853
    SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
854
  }
855

856
  char *origEpset = NULL;
857
  char *newEpset = NULL;
858

859
  SCH_ERR_RET(schDumpEpSet(&pAddr->epSet, &origEpset));
860
  SCH_ERR_JRET(schDumpEpSet(pEpSet, &newEpset));
861

862
  SCH_TASK_DLOG("update task target node %d epset from %s to %s", pAddr->nodeId, origEpset, newEpset);
863

864
  TAOS_MEMCPY(&pAddr->epSet, pEpSet, sizeof(pAddr->epSet));
865

866
_return:
867

868
  taosMemoryFree(origEpset);
869
  taosMemoryFree(newEpset);
870

871
  return code;
872
}
873
#endif
874

875
// Advance the task to its next candidate address according to the configured
// scheduling policy (sequential wrap-around by default, or a distinct random
// pick for SCH_RANDOM). With <= 1 candidates there is nothing to switch; the
// current address is still fetched so the result/logging is uniform.
int32_t schSwitchTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask) {
  SQueryNodeAddr *pAddr = NULL;
  int32_t         code = 0;

  int32_t candidateNum = taosArrayGetSize(pTask->candidateAddrs);
  if (candidateNum <= 1) {
    // single (or no) candidate: nothing to rotate
    goto _return;
  }

  switch (schMgmt.cfg.schPolicy) {
    case SCH_LOAD_SEQ:
    case SCH_ALL:
    default:
      // round-robin with wrap-around
      if (++pTask->candidateIdx >= candidateNum) {
        pTask->candidateIdx = 0;
      }
      break;
    case SCH_RANDOM: {
      // pick a random candidate different from the current one;
      // terminates because candidateNum > 1 here
      int32_t lastIdx = pTask->candidateIdx;
      while (lastIdx == pTask->candidateIdx) {
        pTask->candidateIdx = taosRand() % candidateNum;
      }
      break;
    }
  }

_return:
  code = schGetTaskCurrentNodeAddr(pTask, pJob, &pAddr);
  if (code == TSDB_CODE_SUCCESS) {
    SCH_TASK_DLOG("switch task exec on candidateIdx:%d/%d, vgId:%d", pTask->candidateIdx, candidateNum, pAddr->nodeId);
  }
  return code;
}
908

909
// Remove the task from the job's executing-task hash, keyed by taskId.
// Best-effort: the task already being absent only produces a warning,
// so this function always reports success.
int32_t schRemoveTaskFromExecList(SSchJob *pJob, SSchTask *pTask) {
  int32_t ret = taosHashRemove(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId));
  if (0 != ret) {
    SCH_TASK_WLOG("task already not in execTask list, code:0x%x", ret);
  }
  return TSDB_CODE_SUCCESS;
}
917

918
// Send a DROP_TASK message to every exec node this task has run on.
// Best-effort fire-and-forget: send failures are ignored and the iteration
// continues, since the task is being torn down anyway.
void schDropTaskOnExecNode(SSchJob *pJob, SSchTask *pTask) {
  if (NULL == pTask->execNodes) {
    SCH_TASK_DLOG("no exec address, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
    return;
  }

  int32_t size = (int32_t)taosHashGetSize(pTask->execNodes);
  if (size <= 0) {
    SCH_TASK_DLOG("task has no execNodes, no need to drop it, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
    return;
  }

  int32_t       i = 0;
  SSchNodeInfo *nodeInfo = taosHashIterate(pTask->execNodes, NULL);
  while (nodeInfo) {
    if (nodeInfo->handle) {
      SCH_SET_TASK_HANDLE(pTask, nodeInfo->handle);
      // the hash key is the execId; pass it so the node drops the right attempt
      void *pExecId = taosHashGetKey(nodeInfo, NULL);
      (void)schBuildAndSendMsg(pJob, pTask, &nodeInfo->addr, TDMT_SCH_DROP_TASK, pExecId);  // ignore error and continue

      SCH_TASK_DLOG("start to drop task's %dth execNode", i);
    } else {
      // node never produced a handle, so there is nothing to drop there
      SCH_TASK_DLOG("no need to drop task %dth execNode", i);
    }

    ++i;
    nodeInfo = taosHashIterate(pTask->execNodes, nodeInfo);
  }

  SCH_TASK_DLOG("task has been dropped on %d exec nodes", size);
}
949

950
// Send a notify message of the given type to every exec node the task has
// touched. Unlike schDropTaskOnExecNode, a send failure aborts the loop and
// the error is returned to the caller.
int32_t schNotifyTaskOnExecNode(SSchJob *pJob, SSchTask *pTask, ETaskNotifyType type) {
  int32_t size = (int32_t)taosHashGetSize(pTask->execNodes);
  if (size <= 0) {
    SCH_TASK_DLOG("task no exec address to notify, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
    return TSDB_CODE_SUCCESS;
  }

  int32_t       i = 0;
  int32_t       code = TSDB_CODE_SUCCESS;
  SSchNodeInfo *nodeInfo = taosHashIterate(pTask->execNodes, NULL);
  while (nodeInfo) {
    if (nodeInfo->handle) {
      SCH_SET_TASK_HANDLE(pTask, nodeInfo->handle);
      code = schBuildAndSendMsg(pJob, pTask, &nodeInfo->addr, TDMT_SCH_TASK_NOTIFY, &type);
      if (TSDB_CODE_SUCCESS != code) {
        // fix: cancel the iteration before the early return; returning from
        // inside a taosHashIterate loop leaks the iterator's internal lock
        taosHashCancelIterate(pTask->execNodes, nodeInfo);
        SCH_RET(code);
      }
      SCH_TASK_DLOG("start to notify %d to task's %dth execNode", type, i);
    } else {
      // no handle means the node never acked this task; nothing to notify
      SCH_TASK_DLOG("no need to notify %d to task %dth execNode", type, i);
    }

    ++i;
    nodeInfo = taosHashIterate(pTask->execNodes, nodeInfo);
  }

  SCH_TASK_DLOG("task has been notified %d on %d exec nodes", type, size);
  return TSDB_CODE_SUCCESS;
}
975

976
// Handle the per-task status list carried by a heartbeat response from one
// node. For each entry: locate the job/task (schProcessOnCbBegin acquires and
// locks them), skip stale execIds, record failures, and reschedule tasks the
// server still reports as INIT. Per-entry errors never abort the whole batch.
int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId *pEpId, SArray *pStatusList) {
  int32_t   taskNum = (int32_t)taosArrayGetSize(pStatusList);
  SSchTask *pTask = NULL;
  SSchJob  *pJob = NULL;

  qDebug("%d task status in hb rsp from nodeId:%d, fqdn:%s, port:%d", taskNum, pEpId->nodeId, pEpId->ep.fqdn,
         pEpId->ep.port);

  for (int32_t i = 0; i < taskNum; ++i) {
    STaskStatus *pStatus = taosArrayGet(pStatusList, i);
    if (NULL == pStatus) {
      qError("fail to get the %dth task status in hb rsp, taskNum:%d", i, taskNum);
      continue;
    }

    int32_t code = 0;

    qDebug("QID:0x%" PRIx64 ", CID:0x%" PRIx64 ", TID:0x%" PRIx64 ", EID:%d task status in server: %s", pStatus->queryId,
           pStatus->clientId, pStatus->taskId, pStatus->execId, jobTaskStatusStr(pStatus->status));

    // non-zero return: job/task not found or not processable; skip this entry
    if (schProcessOnCbBegin(&pJob, &pTask, pStatus->queryId, pStatus->refId, pStatus->subJobId, pStatus->taskId)) {
      continue;
    }

    if (pStatus->execId != pTask->execId) {
      // TODO
      // stale status from a previous execution attempt; ignore it
      SCH_TASK_DLOG("execId %d mis-match current execId %d", pStatus->execId, pTask->execId);
      schProcessOnCbEnd(pJob, pTask, 0);
      continue;
    }

    if (pStatus->status == JOB_TASK_STATUS_FAIL) {
      // RECORD AND HANDLE ERROR!!!!
      schProcessOnCbEnd(pJob, pTask, 0);
      continue;
    }

    if (pStatus->status == JOB_TASK_STATUS_INIT) {
      // server still sees the task as not started; relaunch it
      code = schRescheduleTask(pJob, pTask);
    }

    // releases the task lock / job ref taken by schProcessOnCbBegin
    schProcessOnCbEnd(pJob, pTask, code);
  }

  return TSDB_CODE_SUCCESS;
}
1022

1023
// Dispatch locally-produced EXPLAIN responses to their owning jobs/tasks.
// Takes ownership of pExplainRes: every rsp payload is freed and the array is
// destroyed on all paths, including errors. Rsp payloads successfully handed
// to schProcessExplainRsp are neutralized (numOfPlans=0, subplanInfo=NULL) so
// the cleanup loop does not double-free them.
int32_t schHandleExplainRes(SArray *pExplainRes) {
  int32_t code = 0;
  int32_t resNum = taosArrayGetSize(pExplainRes);
  if (resNum <= 0) {
    goto _return;
  }

  SSchTask *pTask = NULL;
  SSchJob  *pJob = NULL;

  for (int32_t i = 0; i < resNum; ++i) {
    SExplainLocalRsp *localRsp = taosArrayGet(pExplainRes, i);
    if (NULL == localRsp) {
      qError("fail to get the %dth LOCAL explain rsp msg, total:%d", i, resNum);
      continue;
    }

    qDebug("QID:0x%" PRIx64 ", CID:0x%" PRIx64 ", TID:0x%" PRIx64 ", begin to handle LOCAL explain rsp msg",
           localRsp->qId, localRsp->cId, localRsp->tId);

    pJob = NULL;
    (void)schAcquireJob(localRsp->rId, &pJob);
    if (NULL == pJob) {
      qWarn("QID:0x%" PRIx64 ", CID:0x%" PRIx64 ", TID:0x%" PRIx64 " job no exist, may be dropped, refId:0x%" PRIx64,
            localRsp->qId, localRsp->cId, localRsp->tId, localRsp->rId);
      SCH_ERR_JRET(TSDB_CODE_QRY_JOB_NOT_EXIST);
    }

    int8_t status = 0;
    if (schJobNeedToStop(pJob, &status)) {
      SCH_TASK_DLOG("will not do further processing cause of job status %s", jobTaskStatusStr(status));
      (void)schReleaseJob(pJob->refId);
      SCH_ERR_JRET(TSDB_CODE_SCH_IGNORE_ERROR);
    }

    code = schGetTaskInJob(pJob, localRsp->tId, &pTask);

    if (TSDB_CODE_SUCCESS == code) {
      code = schProcessExplainRsp(pJob, pTask, &localRsp->rsp);
    }

    // job ref released before error check so it is balanced on every path
    (void)schReleaseJob(pJob->refId);

    qDebug("QID:0x%" PRIx64 ", CID:0x%" PRIx64 ", TID:0x%" PRIx64 ", end to handle LOCAL explain rsp msg, code:%x",
           localRsp->qId, localRsp->cId, localRsp->tId, code);

    SCH_ERR_JRET(code);

    // rsp ownership transferred to schProcessExplainRsp; prevent double-free
    localRsp->rsp.numOfPlans = 0;
    localRsp->rsp.subplanInfo = NULL;
    pTask = NULL;
  }

_return:

  // free any rsp payloads not consumed above (no-op for neutralized entries)
  for (int32_t i = 0; i < resNum; ++i) {
    SExplainLocalRsp *localRsp = taosArrayGet(pExplainRes, i);
    if (NULL == localRsp) {
      qError("in _return fail to get the %dth LOCAL explain rsp msg, total:%d", i, resNum);
      continue;
    }

    tFreeSExplainRsp(&localRsp->rsp);
  }

  taosArrayDestroy(pExplainRes);

  SCH_RET(code);
}
1092

1093
// Launch a task on a remote node: lazily serialize the subplan into the task
// msg (done once, under planLock), resolve candidate addresses, make sure a
// heartbeat connection exists for query jobs, then send the plan's msgType
// to the current candidate.
int32_t schLaunchRemoteTask(SSchJob *pJob, SSchTask *pTask) {
  SSubplan *plan = pTask->plan;
  int32_t   code = 0;

  if (NULL == pTask->msg) {  // TODO add more detailed reason for failure
    // serialize under the plan lock; the plan may be shared/inspected elsewhere
    SCH_LOCK(SCH_WRITE, &pTask->planLock);
    code = qSubPlanToMsg(plan, &pTask->msg, &pTask->msgLen);
    SCH_UNLOCK(SCH_WRITE, &pTask->planLock);

    if (TSDB_CODE_SUCCESS != code) {
      SCH_TASK_ELOG("failed to create physical plan, code:%s, msg:%p, len:%d", tstrerror(code), pTask->msg,
                    pTask->msgLen);
      SCH_ERR_RET(code);
    } else if (tsQueryPlannerTrace) {
      // optional plan tracing; the string form is only for the log
      char   *msg = NULL;
      int32_t msgLen = 0;
      SCH_ERR_RET(qSubPlanToString(plan, &msg, &msgLen));
      SCH_TASK_DLOGL("physical plan len:%d, %s", msgLen, msg);
      taosMemoryFree(msg);
    }
  }

  SCH_ERR_RET(schSetTaskCandidateAddrs(pJob, pTask));

  if (SCH_IS_QUERY_JOB(pJob)) {
    SCH_ERR_RET(schEnsureHbConnection(pJob, pTask));
  }

  SCH_RET(schBuildAndSendMsg(pJob, pTask, NULL, plan->msgType, NULL));
}
1123

1124
// Execute a task in-process via the client-side qworker instead of sending it
// to a remote node. Lazily initializes the shared client qworker exactly once
// (CAS guarded), runs the local query, forwards any local EXPLAIN results,
// and marks the task successful.
int32_t schLaunchLocalTask(SSchJob *pJob, SSchTask *pTask) {
  // SCH_ERR_JRET(schSetTaskCandidateAddrs(pJob, pTask));
  if (NULL == schMgmt.queryMgmt) {
    void* p = NULL;
    SCH_ERR_RET(qWorkerInit(NODE_TYPE_CLIENT, CLIENT_HANDLE, &p, NULL));
    // CAS publish; if another thread won the race, discard our instance
    if (atomic_val_compare_exchange_ptr(&schMgmt.queryMgmt, NULL, p)) {
      qWorkerDestroy(&p);
    }
  }

  SArray *explainRes = NULL;
  int32_t code = 0;
  SQWMsg  qwMsg = {0};
  qwMsg.msgInfo.taskType = (pJob->attr.type == JOB_TYPE_HQUERY)? TASK_TYPE_HQUERY:TASK_TYPE_QUERY;
  qwMsg.msgInfo.explain = SCH_IS_EXPLAIN_JOB(pJob);
  qwMsg.msgInfo.needFetch = SCH_TASK_NEED_FETCH(pTask);
  qwMsg.msg = pTask->plan;
  qwMsg.msgType = pTask->plan->msgType;
  qwMsg.connInfo.handle = pJob->conn.pTrans;
  qwMsg.pWorkerCb = pJob->pWorkerCb;
  qwMsg.subEndPoints = NULL; // TODO

  if (SCH_IS_EXPLAIN_JOB(pJob)) {
    explainRes = taosArrayInit(pJob->taskNum, sizeof(SExplainLocalRsp));
    if (NULL == explainRes) {
      SCH_ERR_RET(terrno);
    }
  }

  SCH_ERR_JRET(qWorkerProcessLocalQuery(schMgmt.queryMgmt, pJob->seriesId, pJob->queryId, pTask->clientId, pTask->taskId,
                                        pJob->refId, pTask->execId, &qwMsg, explainRes));

  if (SCH_IS_EXPLAIN_JOB(pJob)) {
    // schHandleExplainRes takes ownership of explainRes (frees it internally)
    SCH_ERR_RET(schHandleExplainRes(explainRes));
    explainRes = NULL;
  }

  SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));

_return:

  taosArrayDestroy(explainRes);

  SCH_RET(code);
}
1169

1170
// Core launch routine, usable directly or as a taosAsyncExec callback.
// param is a heap-allocated SSchTaskCtx; this function owns and frees it on
// every path. Resolves the (sub)job from the ref id, locks the task when run
// asynchronously, bumps exec/retry counters, and dispatches to the local or
// remote launch path. Failure handling inside _return only applies to async
// jobs (taskNum >= SCH_MIN_AYSNC_EXEC_NUM); sync callers handle errors via
// the returned code.
int32_t schLaunchTaskImpl(void *param) {
  SSchTaskCtx *pCtx = (SSchTaskCtx *)param;
  SSchJob     *pJob = NULL, *pParent = NULL;

  (void)schAcquireJob(pCtx->jobRid, &pParent);
  if (NULL == pParent) {
    // job dropped between scheduling and execution of this callback
    qDebug("job refId 0x%" PRIx64 " already not exist", pCtx->jobRid);
    taosMemoryFree(param);
    SCH_RET(TSDB_CODE_SCH_JOB_IS_DROPPING);
  }

  if (pCtx->subJobId >= 0) {
    // target is a sub job of the acquired parent
    pJob = taosArrayGetP(pParent->subJobs, pCtx->subJobId);
    if (NULL == pJob) {
      qDebug("subJobId %d not found in subJobs, totalSubJobNum:%d", pCtx->subJobId, (int32_t)taosArrayGetSize(pParent->subJobs));
      (void)schReleaseJob(pParent->refId);

      taosMemoryFree(param);
      SCH_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
    }
  } else {
    pJob = pParent;
  }

  SSchTask *pTask = pCtx->pTask;

  // async execution runs outside the caller's lock scope, so lock here
  if (pCtx->asyncLaunch) {
    SCH_LOCK_TASK(pTask);
  }

  pTask->execId++;
  pTask->retryTimes++;
  pTask->waitRetry = false;

  int8_t  status = 0;
  int32_t code = 0;

  // stale launch from an earlier job generation; skip silently
  if (atomic_load_64(&pTask->seriesId) < atomic_load_64(&pJob->seriesId)) {
    SCH_TASK_DLOG("task seriesId:0x%" PRIx64 " is smaller than job seriesId:0x%" PRIx64 ", skip launch",
                  pTask->seriesId, pJob->seriesId);
    goto _return;
  }

  (void)atomic_add_fetch_32(&pTask->level->taskLaunchedNum, 1);

  SCH_TASK_TLOG("start to launch %s task, execId %d, retry %d",
                SCH_IS_LOCAL_EXEC_TASK(pJob, pTask) ? "LOCAL" : "REMOTE", pTask->execId, pTask->retryTimes);

  SCH_LOG_TASK_START_TS(pTask);

  if (schJobNeedToStop(pJob, &status)) {
    SCH_TASK_DLOG("no need to launch task cause of job status %s", jobTaskStatusStr(status));
    SCH_ERR_JRET(TSDB_CODE_SCH_IGNORE_ERROR);
  }

  if (SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_EXEC) {
    SCH_ERR_JRET(schPushTaskToExecList(pJob, pTask));
    SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_EXEC);
  }

  if (SCH_IS_LOCAL_EXEC_TASK(pJob, pTask)) {
    SCH_ERR_JRET(schLaunchLocalTask(pJob, pTask));
  } else {
    SCH_ERR_JRET(schLaunchRemoteTask(pJob, pTask));
  }

#if 0
  if (SCH_IS_QUERY_JOB(pJob)) {
    SCH_ERR_JRET(schEnsureHbConnection(pJob, pTask));
  }
#endif

_return:

  // async jobs have no caller waiting on the return code, so process the
  // failure here; each handler clears code when it fully consumes the error
  if (pJob && pJob->taskNum >= SCH_MIN_AYSNC_EXEC_NUM) {
    if (code) {
      code = schProcessOnTaskFailure(pJob, pTask, code);
    }
    if (code) {
      code = schHandleJobFailure(pJob, code);
    }
  }

  if (pCtx->asyncLaunch) {
    SCH_UNLOCK_TASK(pTask);
  }

  (void)schReleaseJob(pParent->refId);

  taosMemoryFree(param);

  SCH_RET(code);
}
1263

1264
// Launch a task either asynchronously (jobs with >= SCH_MIN_AYSNC_EXEC_NUM
// tasks) or synchronously on the calling thread. Allocates the SSchTaskCtx
// handed to schLaunchTaskImpl, which frees it on every path it actually runs.
int32_t schAsyncLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) {
  SSchTaskCtx *param = taosMemoryCalloc(1, sizeof(SSchTaskCtx));
  if (NULL == param) {
    SCH_ERR_RET(terrno);
  }

  param->jobRid = pJob->refId;
  param->subJobId = pJob->subJobId;
  param->pTask = pTask;

  if (pJob->taskNum >= SCH_MIN_AYSNC_EXEC_NUM) {
    param->asyncLaunch = true;
    int32_t code = taosAsyncExec(schLaunchTaskImpl, param, NULL);
    if (TSDB_CODE_SUCCESS != code) {
      // fix: the callback will never run, so the ctx must be freed here
      // to avoid leaking it on the schedule-failure path
      taosMemoryFree(param);
      SCH_ERR_RET(code);
    }
  } else {
    // schLaunchTaskImpl frees param on all of its paths
    SCH_ERR_RET(schLaunchTaskImpl(param));
  }

  return TSDB_CODE_SUCCESS;
}
1283

1284
// Note: no more error processing, handled in function internal
1285
// Note: no more error processing, handled in function internal
// Entry point for launching a single task. Clears any stale handle, applies
// flow control when configured for the job (a task denied quota is simply not
// launched now; it will be launched when quota is released), and reports any
// launch error through schProcessOnTaskFailure.
int32_t schLaunchTask(SSchJob *pJob, SSchTask *pTask) {
  bool    enough = false;
  int32_t code = 0;

  SCH_SET_TASK_HANDLE(pTask, NULL);

  if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) {
    SCH_ERR_JRET(schCheckIncTaskFlowQuota(pJob, pTask, &enough));

    if (enough) {
      SCH_ERR_JRET(schAsyncLaunchTaskImpl(pJob, pTask));
    }
  } else {
    SCH_ERR_JRET(schAsyncLaunchTaskImpl(pJob, pTask));
  }

  return TSDB_CODE_SUCCESS;

_return:
  SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
}
1306

1307
// Delay-timer callback: launches the task whose ids were captured in the
// SSchTimerParam when the timer was armed. The ids are copied to locals
// before any call that could invalidate param. The launch is skipped when
// the job has started exiting (delayLaunchPar.exit set).
void schHandleTimerEvent(void *param, void *tmrId) {
  SSchTimerParam *pTimerParam = (SSchTimerParam *)param;
  SSchTask       *pTask = NULL;
  SSchJob        *pJob = NULL;
  int32_t         code = 0;

  qDebug("delayTimer:%" PRIuPTR " is launched", (uintptr_t)tmrId);

  int64_t  rId = pTimerParam->rId;
  uint64_t queryId = pTimerParam->queryId;
  uint64_t taskId = pTimerParam->taskId;
  int32_t subJobId = pTimerParam->subJobId;

  // non-zero return: job/task gone or not processable; nothing to do
  if (schProcessOnCbBegin(&pJob, &pTask, queryId, rId, subJobId, taskId)) {
    return;
  }

  if (0 == atomic_load_8(&pTask->delayLaunchPar.exit)) {
    code = schLaunchTask(pJob, pTask);
  } else {
    SCH_TASK_DLOG("task will not be launched since query job exiting, status: %d", pTask->status);
  }

  schProcessOnCbEnd(pJob, pTask, code);
}
1332

1333
// Launch a task, honoring its configured launch delay. With delayExecMs <= 0
// the task launches immediately. Otherwise the task is pushed to the exec
// list now (status EXEC) and a timer fires schHandleTimerEvent later; an
// existing timer is reset instead of re-created. A timer-reset failure is
// only logged, not treated as a launch failure.
int32_t schDelayLaunchTask(SSchJob *pJob, SSchTask *pTask) {
  if (pTask->delayExecMs > 0) {
    // snapshot the ids the timer callback needs to re-locate job and task
    pTask->delayLaunchPar.rId = pJob->refId;
    pTask->delayLaunchPar.queryId = pJob->queryId;
    pTask->delayLaunchPar.taskId = pTask->taskId;
    pTask->delayLaunchPar.subJobId = pJob->subJobId;

    SCH_ERR_RET(schPushTaskToExecList(pJob, pTask));
    SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_EXEC);

    if (NULL == pTask->delayTimer) {
      pTask->delayTimer = taosTmrStart(schHandleTimerEvent, pTask->delayExecMs, (void *)&pTask->delayLaunchPar, schMgmt.timer);
      if (NULL == pTask->delayTimer) {
        SCH_TASK_ELOG("start delay timer failed, handle:%p", schMgmt.timer);
        SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
      }

      SCH_TASK_DLOG("task delayTimer:%" PRIuPTR " is started to launch task after:%.2fs", (uintptr_t)pTask->delayTimer,
                    pTask->delayExecMs/1000.0);
      return TSDB_CODE_SUCCESS;
    }

    // timer already exists (relaunch/retry): reset it rather than leak it
    if (taosTmrReset(schHandleTimerEvent, pTask->delayExecMs, (void *)&pTask->delayLaunchPar, schMgmt.timer,
                     &pTask->delayTimer)) {
      SCH_TASK_ELOG("taosTmrReset delayExec timer failed, handle:%p, tmr:%" PRIuPTR, schMgmt.timer,
                    (uintptr_t)pTask->delayTimer);
    } else {
      SCH_TASK_DLOG("task start in %.2fs later by handler:%p, tmr:%" PRIuPTR, pTask->delayExecMs / 1000.0,
                    schMgmt.timer, (uintptr_t)pTask->delayTimer);
    }

    return TSDB_CODE_SUCCESS;
  }

  SCH_RET(schLaunchTask(pJob, pTask));
}
1369

1370
// Launch every task of one plan level: set up flow control for the level if
// needed, stamp each task with the job's current seriesId, then (delay-)launch
// it. The first launch failure aborts the level.
int32_t schLaunchLevelTasks(SSchJob *pJob, SSchLevel *level) {
  SCH_ERR_RET(schChkJobNeedFlowCtrl(pJob, level));

  for (int32_t i = 0; i < level->taskNum; ++i) {
    SSchTask *pTask = taosArrayGet(level->subTasks, i);
    if (NULL == pTask) {
      // fix: guard against an out-of-range/corrupted level before dereference,
      // consistent with the taosArrayGet NULL checks elsewhere in this file
      qError("QID:0x%" PRIx64 ", fail to get the %dth task in level, taskNum:%d", pJob->queryId, i, level->taskNum);
      SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
    }

    // seriesId marks the job generation; stale launches are skipped later
    pTask->failedSeriesId = pJob->seriesId - 1;
    pTask->seriesId = pJob->seriesId;

    SCH_TASK_TLOG("task sId is set:%" PRIx64, pTask->seriesId);
    SCH_ERR_RET(schDelayLaunchTask(pJob, pTask));
  }

  return TSDB_CODE_SUCCESS;
}
1384

1385
// For a job being dropped, walk a task hash list and drop every task on its
// exec nodes, stopping any pending delay timer first. No-op unless the job
// actually needs dropping.
void schDropTaskInHashList(SSchJob *pJob, SHashObj *list) {
  if (!SCH_JOB_NEED_DROP(pJob)) {
    return;
  }

  void *pIter = taosHashIterate(list, NULL);
  while (pIter) {
    SSchTask *pTask = *(SSchTask **)pIter;

    // cancel the pending delayed launch before dropping the task
    if (pTask->delayTimer) {
      schStopTaskDelayTimer(pJob, pTask, true);
    }

    SCH_LOCK_TASK(pTask);
    schDropTaskOnExecNode(pJob, pTask);
    SCH_UNLOCK_TASK(pTask);

    pIter = taosHashIterate(list, pIter);
  }
}
1405

1406
// Notify every task in a hash list on its exec nodes. pCurrTask (if given) is
// notified first without re-locking — the caller already holds its lock — and
// is skipped during the list walk. The first notify failure stops the walk
// and its code is returned.
int32_t schNotifyTaskInHashList(SSchJob *pJob, SHashObj *list, ETaskNotifyType type, SSchTask *pCurrTask) {
  int32_t code = TSDB_CODE_SUCCESS;

  if (NULL != pCurrTask) {
    SCH_ERR_RET(schNotifyTaskOnExecNode(pJob, pCurrTask, type));
  }

  void *pIter = taosHashIterate(list, NULL);
  while (pIter) {
    SSchTask *pTask = *(SSchTask **)pIter;
    if (pTask != pCurrTask) {
      SCH_LOCK_TASK(pTask);
      code = schNotifyTaskOnExecNode(pJob, pTask, type);
      SCH_UNLOCK_TASK(pTask);

      if (TSDB_CODE_SUCCESS != code) {
        // fix: cancel the iteration before leaving the loop early; breaking
        // out of a taosHashIterate walk without cancelling leaks the
        // iterator's internal lock
        taosHashCancelIterate(list, pIter);
        break;
      }
    }

    pIter = taosHashIterate(list, pIter);
  }

  SCH_RET(code);
}
1431

1432
// Issue the fetch request to the job's result node. Uses pJob->fetchTask
// directly; the pTask parameter is kept for signature symmetry with
// schExecLocalFetch.
int32_t schExecRemoteFetch(SSchJob *pJob, SSchTask *pTask) {
  SCH_RET(schBuildAndSendMsg(pJob, pJob->fetchTask, &pJob->resNode, SCH_FETCH_TYPE(pJob->fetchTask), NULL));
}
1435

1436
// Fetch results from a task that executed in-process: run the local fetch on
// the client qworker, forward EXPLAIN results if any, then hand the response
// to the normal fetch-rsp processing path.
int32_t schExecLocalFetch(SSchJob *pJob, SSchTask *pTask) {
  void   *pRsp = NULL;
  int32_t code = 0;
  SArray *explainRes = NULL;

  if (SCH_IS_EXPLAIN_JOB(pJob)) {
    explainRes = taosArrayInit(pJob->taskNum, sizeof(SExplainLocalRsp));
    if (NULL == explainRes) {
      SCH_ERR_RET(terrno);
    }
  }

  SCH_ERR_JRET(qWorkerProcessLocalFetch(schMgmt.queryMgmt, pJob->seriesId, pJob->queryId, pTask->clientId, pTask->taskId,
                                        pJob->refId, pTask->execId, &pRsp, explainRes));

  if (SCH_IS_EXPLAIN_JOB(pJob)) {
    // schHandleExplainRes takes ownership of explainRes (frees it internally)
    SCH_ERR_RET(schHandleExplainRes(explainRes));
    explainRes = NULL;
  }

  SCH_ERR_RET(schProcessFetchRsp(pJob, pTask, pRsp, TSDB_CODE_SUCCESS));

_return:

  taosArrayDestroy(explainRes);

  SCH_RET(code);
}
1464

1465
// Note: no more error processing, handled in function internal
1466
// Note: no more error processing, handled in function internal
// Kick off result fetching for the job. Short-circuits if the result was
// already fetched, moves the fetch task to FETCH status, and dispatches to
// the local or remote fetch path; errors go through schProcessOnTaskFailure.
int32_t schLaunchFetchTask(SSchJob *pJob) {
  int32_t code = 0;

  void *fetchRes = atomic_load_ptr(&pJob->fetchRes);
  if (fetchRes) {
    SCH_JOB_DLOG("res already fetched, res:%p", fetchRes);
    return TSDB_CODE_SUCCESS;
  }

  SCH_SET_TASK_STATUS(pJob->fetchTask, JOB_TASK_STATUS_FETCH);

  if (SCH_IS_LOCAL_EXEC_TASK(pJob, pJob->fetchTask)) {
    SCH_ERR_JRET(schExecLocalFetch(pJob, pJob->fetchTask));
  } else {
    SCH_ERR_JRET(schExecRemoteFetch(pJob, pJob->fetchTask));
  }

  return TSDB_CODE_SUCCESS;

_return:

  SCH_RET(schProcessOnTaskFailure(pJob, pJob->fetchTask, code));
}
1489

1490
// Rotate the current candidate address of a task to the next endpoint in its
// epset (used on redirect/retry). The epset is rendered to a string only for
// the debug log; a render failure is logged and otherwise ignored.
int32_t schUpdateCurrentEpset(SSchTask *pTask, SSchJob *pJob) {
  SQueryNodeAddr *pAddr = NULL;
  char            epBuf[256] = {0};

  SCH_ERR_RET(schGetTaskCurrentNodeAddr(pTask, pJob, &pAddr));

  // switch to the next ep in the epset
  SCH_SWITCH_EPSET(pAddr);

  int32_t strCode = epsetToStr(&pAddr->epSet, epBuf, tListLen(epBuf));
  if (0 != strCode) {  // print error and continue
    SCH_TASK_ELOG("failed to print vgId:%d epset, code:%s", pAddr->nodeId, tstrerror(strCode));
  }

  // Wait for a while since the vnode leader/follower switch may cost from
  // several seconds to several minutes to complete.
  SCH_TASK_DLOG("vgId:%d switch to next ep:%s to start task delay:%.2fs, startTs:%" PRId64, pAddr->nodeId, epBuf,
                pTask->delayExecMs / 1000.0, pTask->redirectCtx.startTs);

  return TSDB_CODE_SUCCESS;
}
1513

1514
// Decide whether a failed task may still be retried. Returns
// TSDB_CODE_SUCCESS when another retry is allowed; otherwise sets
// pJob->noMoreRetry and returns rspCode (the original failure) so the caller
// propagates it. Limits checked: total retry count and total exec attempts.
int32_t schFailedTaskNeedRetry(SSchTask *pTask, SSchJob *pJob, int32_t rspCode) {
  // elapsed time since the redirect cycle started, seconds (0 if never started)
  double el = (pTask->redirectCtx.startTs > 0) ? (taosGetTimestampMs() - pTask->redirectCtx.startTs) / 1000.0 : 0.0;

  // check if the failed task may cause the query job to quit retrying
  if (pTask->retryTimes >= pTask->maxRetryTimes) {
    pJob->noMoreRetry = true;
    SCH_TASK_DLOG("task no more retry since reach max retry times, retryTimes:%d/%d, elapsedTime:%.2fs",
                  pTask->retryTimes, pTask->maxRetryTimes, el);
    return rspCode;
  }

  // execId is incremented on launch, so +1 predicts the next attempt's id
  if ((pTask->execId + 1) >= pTask->maxExecTimes) {
    pJob->noMoreRetry = true;
    SCH_TASK_DLOG("task no more retry since reach max exec times, execId:%d/%d, elapsedTime:%.2fs", pTask->execId,
                  pTask->maxExecTimes, el);
    return rspCode;
  }

  return TSDB_CODE_SUCCESS;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc