• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5034

24 Apr 2026 11:25AM UTC coverage: 73.058%. Remained the same
#5034

push

travis-ci

web-flow
merge: from main to 3.0 branch #35224

merge: from main to 3.0 branch[manual-only]

1336 of 1975 new or added lines in 48 files covered. (67.65%)

14149 existing lines in 164 files now uncovered.

275896 of 377640 relevant lines covered (73.06%)

132944440.29 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.93
/source/libs/scheduler/src/schTask.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "query.h"
17
#include "qworker.h"
18
#include "schInt.h"
19
#include "taoserror.h"
20
#include "tglobal.h"
21
#include "tmsg.h"
22
#include "trpc.h"
23
#include "tmisce.h"
24

25
static int32_t schUpdateCurrentEpset(SSchTask *pTask, SSchJob* pJob);
26

27
/*
 * Release every resource owned by a scheduler task.
 *
 * Deregisters the task heartbeat, then destroys the candidate address list,
 * the pending request message, the children/parents arrays, the execNodes
 * hash and the profile.execTime array. The SSchTask structure itself is not
 * freed here.
 *
 * Each released member is reset to NULL so that later NULL checks on the task
 * (e.g. the `NULL == pTask->execNodes` guard in schDropTaskExecNode) treat it
 * as absent instead of following a dangling pointer, and a repeated call does
 * not double-free these members.
 */
void schFreeTask(SSchJob *pJob, SSchTask *pTask) {
  schDeregisterTaskHb(pJob, pTask);

  if (pTask->candidateAddrs) {
    taosArrayDestroy(pTask->candidateAddrs);
    pTask->candidateAddrs = NULL;
  }

  taosMemoryFreeClear(pTask->msg);

  if (pTask->children) {
    taosArrayDestroy(pTask->children);
    pTask->children = NULL;
  }

  if (pTask->parents) {
    taosArrayDestroy(pTask->parents);
    pTask->parents = NULL;
  }

  if (pTask->execNodes) {
    taosHashCleanup(pTask->execNodes);
    pTask->execNodes = NULL;
  }

  taosArrayDestroy(pTask->profile.execTime);
  pTask->profile.execTime = NULL;
}
50

51
/*
 * Compute the retry and execution budget of a task.
 *
 * The redirect delay is fixed at 2000 ms. Data-bind tasks, non-query jobs and
 * jobs whose scheduling policy is not SCH_ALL get enough retries to span
 * tsMaxRetryWaitTime at that delay; all other tasks get one retry per node in
 * pJob->nodeList. In both cases the retry budget is at least
 * SCH_DEFAULT_MAX_RETRY_NUM * 3. The execution budget is the retry budget
 * multiplied by (pLevel->level + 1).
 */
void schInitTaskRetryInfo(SSchJob *pJob, SSchTask *pTask, SSchLevel *pLevel) {
  SSchRedirectCtx *pCtx = &pTask->redirectCtx;
  pCtx->redirectDelayMs = 2000;  // 2s by default

  // 3 is the maximum replica factor in tsdb; scaling by it increases the retry chance
  const int32_t replicaFactor = 3;
  const int32_t floorRetries = SCH_DEFAULT_MAX_RETRY_NUM * replicaFactor;

  bool budgetByWaitTime =
      SCH_IS_DATA_BIND_TASK(pTask) || (!SCH_IS_QUERY_JOB(pJob)) || (schMgmt.cfg.schPolicy != SCH_ALL);

  int32_t budget = 0;
  if (budgetByWaitTime) {
    budget = ceil((tsMaxRetryWaitTime * 1.0) / pCtx->redirectDelayMs);
  } else {
    budget = taosArrayGetSize(pJob->nodeList);
  }

  pTask->maxRetryTimes = TMAX(budget, floorRetries);
  pTask->maxExecTimes = pTask->maxRetryTimes * (pLevel->level + 1);
}
67

68
/*
 * Initialize a newly created task belonging to level pLevel of job pJob.
 *
 * Binds the task to its subplan and level and copies the job's seriesId. It
 * resets the execution/failure bookkeeping (execId = -1, failedExecId = -2,
 * failedSeriesId = 0) and assigns the client id and a fresh task id. It then
 * computes the retry budget via schInitTaskRetryInfo() and allocates the
 * execNodes hash and the profile.execTime array, both pre-sized to
 * maxExecTimes.
 *
 * @return TSDB_CODE_SUCCESS on success, otherwise the allocation error
 *         (terrno, or TSDB_CODE_OUT_OF_MEMORY if terrno was not set).
 *         On failure both containers are released and their pointers reset to
 *         NULL, so a later schFreeTask() on the same task cannot free them a
 *         second time.
 */
int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel) {
  int32_t code = 0;

  pTask->plan = pPlan;
  pTask->level = pLevel;
  pTask->seriesId = pJob->seriesId;
  pTask->execId = -1;
  pTask->failedExecId = -2;
  pTask->failedSeriesId = 0;
  pTask->timeoutUsec = SCH_DEFAULT_TASK_TIMEOUT_USEC;
  pTask->clientId = getClientId();
  pTask->taskId = schGenTaskId();

  schInitTaskRetryInfo(pJob, pTask, pLevel);

  pTask->execNodes =
      taosHashInit(pTask->maxExecTimes, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
  pTask->profile.execTime = taosArrayInit(pTask->maxExecTimes, sizeof(int64_t));
  if (NULL == pTask->execNodes || NULL == pTask->profile.execTime) {
    // Never continue with NULL containers, even if terrno was left at success.
    SCH_ERR_JRET((TSDB_CODE_SUCCESS != terrno) ? terrno : TSDB_CODE_OUT_OF_MEMORY);
  }

  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT);
  pTask->profile.startTs = taosGetTimestampUs();

  SCH_TASK_DLOG("task initialized, max retry(exec):%d(%d), max retry duration:%.2fs", pTask->maxRetryTimes,
                pTask->maxExecTimes, (pTask->redirectCtx.redirectDelayMs * pTask->maxRetryTimes) / 1000.0);

  return TSDB_CODE_SUCCESS;

_return:

  // Release and clear, so the task holds no dangling pointers for later cleanup.
  taosArrayDestroy(pTask->profile.execTime);
  pTask->profile.execTime = NULL;
  taosHashCleanup(pTask->execNodes);
  pTask->execNodes = NULL;

  SCH_RET(code);
}
105

106
/*
 * Remember the node address on which a task succeeded.
 *
 * Locally executed tasks are skipped. Otherwise the task's current node
 * address is copied into pTask->succeedAddr. A failure to render that epset
 * for the debug log is reported and ignored.
 *
 * @return TSDB_CODE_SUCCESS, or the error from schGetTaskCurrentNodeAddr().
 */
int32_t schRecordTaskSucceedNode(SSchJob *pJob, SSchTask *pTask) {
  if (SCH_IS_LOCAL_EXEC_TASK(pJob, pTask)) {
    return TSDB_CODE_SUCCESS;
  }

  SQueryNodeAddr *pCurAddr = NULL;
  SCH_ERR_RET(schGetTaskCurrentNodeAddr(pTask, pJob, &pCurAddr));

  pTask->succeedAddr = *pCurAddr;

  char    epsetStr[256] = {0};
  int32_t convCode = epsetToStr(&pCurAddr->epSet, epsetStr, tListLen(epsetStr));
  if (convCode != TSDB_CODE_SUCCESS) {
    SCH_TASK_ELOG("failed to print epset due to convert to string failed, code:%s, ignore and continue",
                  tstrerror(convCode));
  } else {
    SCH_TASK_DLOG("recode the success addr:%s", epsetStr);
  }

  return TSDB_CODE_SUCCESS;
}
130

131
/*
 * Record the node used by one execution (execId) of a task.
 *
 * Stores {addr, current task handle} in pTask->execNodes under execId.
 *
 * @return TSDB_CODE_SUCCESS, or the error code returned by taosHashPut().
 *         The actual code is propagated rather than always reporting
 *         TSDB_CODE_OUT_OF_MEMORY, matching schPushTaskToExecList().
 */
int32_t schAppendTaskExecNode(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, int32_t execId) {
  SSchNodeInfo nodeInfo = {.addr = *addr, .handle = SCH_GET_TASK_HANDLE(pTask)};

  int32_t code = taosHashPut(pTask->execNodes, &execId, sizeof(execId), &nodeInfo, sizeof(nodeInfo));
  if (TSDB_CODE_SUCCESS != code) {
    SCH_TASK_ELOG("taosHashPut nodeInfo to execNodes failed, code:0x%x", code);
    SCH_ERR_RET(code);
  }

  SCH_TASK_DLOG("task execNode added, execId:%d, handle:%p", execId, nodeInfo.handle);

  return TSDB_CODE_SUCCESS;
}
143

144
/*
 * Remove the node entry recorded for one execution of a task.
 *
 * @param handle  not used by this function.
 * @return TSDB_CODE_SUCCESS when the task has no execNodes map, or when the
 *         entry of the task's current, non-retrying execution was removed;
 *         TSDB_CODE_SCH_IGNORE_ERROR when execId was not in the map, is no
 *         longer the current execId, or the task is waiting for a retry.
 */
int32_t schDropTaskExecNode(SSchJob *pJob, SSchTask *pTask, void *handle, int32_t execId) {
  if (pTask->execNodes == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  bool removed = (0 == taosHashRemove(pTask->execNodes, &execId, sizeof(execId)));
  if (!removed) {
    SCH_TASK_DLOG("execId %d already not in execNodeList", execId);
    SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR);
  }

  SCH_TASK_DLOG("execId %d removed from execNodeList", execId);

  bool stale = (execId != pTask->execId) || pTask->waitRetry;
  if (stale) {  // ignore it
    SCH_TASK_DLOG("execId %d is already not current execId %d, waitRetry %d", execId, pTask->execId, pTask->waitRetry);
    SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR);
  }

  return TSDB_CODE_SUCCESS;
}
163

164
/*
 * Attach a new handle to the execNodes entry of execution execId.
 *
 * An empty execNodes map, or an execId that is no longer recorded, is
 * silently ignored (the latter with a debug log).
 *
 * @return always TSDB_CODE_SUCCESS.
 */
int32_t schUpdateTaskExecNode(SSchJob *pJob, SSchTask *pTask, void *handle, int32_t execId) {
  if (taosHashGetSize(pTask->execNodes) <= 0) {
    return TSDB_CODE_SUCCESS;  // nothing recorded yet
  }

  SSchNodeInfo *pNode = taosHashGet(pTask->execNodes, &execId, sizeof(execId));
  if (pNode != NULL) {
    pNode->handle = handle;
    SCH_TASK_DLOG("handle updated to %p for execId %d", handle, execId);
  } else {  // ignore it
    SCH_TASK_DLOG("handle not updated since execId %d already not exist, current execId %d, waitRetry %d", execId,
                  pTask->execId, pTask->waitRetry);
  }

  return TSDB_CODE_SUCCESS;
}
182

183
/*
 * Update (or drop) the handle associated with one execution of a task.
 *
 * With dropExecNode set, the exec-node entry is removed instead and that
 * result is returned. Otherwise the entry's handle is updated. The task's own
 * handle is then replaced only when seriesId/execId match the task's current
 * values, are newer than the last failed ones, and the task is not waiting
 * for a retry.
 *
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_SCH_IGNORE_ERROR for a stale response,
 *         or the error from the exec-node helpers.
 */
int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, bool dropExecNode, void *handle, uint64_t seriesId,
                            int32_t execId) {
  if (dropExecNode) {
    SCH_RET(schDropTaskExecNode(pJob, pTask, handle, execId));
  }

  SCH_ERR_RET(schUpdateTaskExecNode(pJob, pTask, handle, execId));

  bool seriesStale = (seriesId != pTask->seriesId) || (seriesId <= pTask->failedSeriesId);
  bool execStale = (execId != pTask->execId) || (execId <= pTask->failedExecId);
  if (seriesStale || execStale || pTask->waitRetry) {  // ignore it
    SCH_TASK_DLOG("handle not updated since seriesId:0x%" PRIx64 " or execId:%d is not lastest,"
                  "current seriesId:0x%" PRIx64 " execId %d, failedSeriesId:0x%" PRIx64 " failedExecId:%d, waitRetry %d",
                  seriesId, execId, pTask->seriesId, pTask->execId, pTask->failedSeriesId, pTask->failedExecId,
                  pTask->waitRetry);
    SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR);
  }

  SCH_SET_TASK_HANDLE(pTask, handle);

  return TSDB_CODE_SUCCESS;
}
202

203
/*
 * Handle a failed execution of a task.
 *
 * TSDB_CODE_SCH_IGNORE_ERROR is passed straight back. Otherwise the current
 * execId is recorded as the failed one. Processing stops with
 * TSDB_CODE_SCH_IGNORE_ERROR if the job is stopping, and with
 * TSDB_CODE_SCH_STATUS_ERROR if the task already reached FAIL/SUCC. Otherwise
 * the task is either scheduled for a retry or marked JOB_TASK_STATUS_FAIL.
 *
 * For a failed task in a job that waits for whole levels (SCH_JOB_NEED_WAIT),
 * the failure is counted in the level. TSDB_CODE_SCH_IGNORE_ERROR is returned
 * until every task of the level is done; after that the job's error code
 * (pJob->errCode) is returned.
 *
 * @return TSDB_CODE_SUCCESS when a retry was scheduled; otherwise the error
 *         code the caller should act on.
 */
int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode) {
  bool   needRetry = false;
  int8_t jobStatus = 0;

  if (TSDB_CODE_SCH_IGNORE_ERROR == errCode) {
    return TSDB_CODE_SCH_IGNORE_ERROR;
  }

  pTask->failedExecId = pTask->execId;
  if (schJobNeedToStop(pJob, &jobStatus)) {
    SCH_TASK_DLOG("no more task failure processing cause of job status %s", jobTaskStatusStr(jobStatus));
    SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR);
  }

  int8_t taskStatus = SCH_GET_TASK_STATUS(pTask);
  if (taskStatus == JOB_TASK_STATUS_FAIL || taskStatus == JOB_TASK_STATUS_SUCC) {
    SCH_TASK_ELOG("task already done, status:%s", jobTaskStatusStr(taskStatus));
    SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
  }

  // Timeouts record a wait timestamp; every other failure records the end timestamp.
  if (errCode == TSDB_CODE_SCH_TIMEOUT_ERROR) {
    SCH_LOG_TASK_WAIT_TS(pTask);
  } else {
    SCH_LOG_TASK_END_TS(pTask);
  }

  SCH_TASK_DLOG("taskOnFailure, code:%s", tstrerror(errCode));

  SCH_ERR_RET(schTaskCheckSetRetry(pJob, pTask, errCode, &needRetry));

  if (needRetry) {
    SCH_ERR_RET(schHandleTaskRetry(pJob, pTask));
    return TSDB_CODE_SUCCESS;
  }

  SCH_TASK_ELOG("task failed and no more retry, code:%s", tstrerror(errCode));
  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_FAIL);

  if (SCH_JOB_NEED_WAIT(pJob)) {
    int32_t taskDone = 0;

    SCH_LOCK(SCH_WRITE, &pTask->level->lock);
    pTask->level->taskFailed++;
    taskDone = pTask->level->taskSucceed + pTask->level->taskFailed;
    SCH_UNLOCK(SCH_WRITE, &pTask->level->lock);

    schUpdateJobErrCode(pJob, errCode);

    if (taskDone < pTask->level->taskNum) {
      SCH_TASK_DLOG("need to wait other tasks, doneNum:%d, allNum:%d", taskDone, pTask->level->taskNum);
      SCH_RET(TSDB_CODE_SCH_IGNORE_ERROR);
    }

    SCH_RET(atomic_load_32(&pJob->errCode));
  }

  SCH_RET(errCode);
}
261

262
/*
 * Advance the parent job after one of its sub-jobs succeeded.
 *
 * Increments the parent's finished sub-job counter. If some sub-jobs have not
 * been launched yet, the next one (at subJobExecIdx) is launched. Otherwise,
 * once SCH_SUB_JOBS_EXEC_FINISHED reports completion, the parent job itself
 * is launched.
 *
 * The sub-job count is read with taosArrayGetSize() instead of reaching into
 * the SArray's fields. Both launch paths go through SCH_RET so terrno is set
 * consistently on failure.
 *
 * @return TSDB_CODE_SUCCESS or the error from schLaunchJobImpl().
 */
int32_t schProcessOnSubJobSuccess(SSchJob *pJob) {
  SSchJob *pParent = pJob->parent;
  // atomic_add_fetch_32 yields a 32-bit value; keep the local at the same width.
  int32_t doneNum = atomic_add_fetch_32(&pParent->subJobDoneNum, 1);

  if (pParent->subJobExecIdx < taosArrayGetSize(pParent->subJobs)) {
    SCH_RET(schLaunchJobImpl(taosArrayGetP(pParent->subJobs, pParent->subJobExecIdx++)));
  }

  if (SCH_SUB_JOBS_EXEC_FINISHED(pParent, doneNum)) {
    SCH_RET(schLaunchJobImpl(pParent));
  }

  return TSDB_CODE_SUCCESS;
}
276

277
/*
 * Handle the successful completion of a task.
 *
 * Marks the task PART_SUCC, records the node it succeeded on and launches any
 * tasks queued for flow control. Then:
 *  - Task without parents:
 *    - If the job waits for whole levels (SCH_JOB_NEED_WAIT), the level's
 *      success counter is bumped. Once every task of the level has succeeded
 *      or failed, the job is failed with pJob->errCode (if any task failed)
 *      or switched to PART_SUCC.
 *    - Otherwise the task becomes the job's fetch task. The job is switched
 *      to PART_SUCC, or, for a sub-job, schProcessOnSubJobSuccess() is called.
 *  - Task with parents: the task is registered as a downstream source in each
 *    parent's subplan, and a parent is launched once all of its children are
 *    ready (SCH_TASK_READY_FOR_LAUNCH). When every task of this level has
 *    finished executing, schLaunchJobLowerLevel() is called.
 *
 * @return TSDB_CODE_SUCCESS or the first error encountered.
 */
int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
  int32_t code = 0;

  SCH_TASK_TLOG("taskOnSuccess, status:%s", SCH_GET_TASK_STATUS_STR(pTask));

  SCH_LOG_TASK_END_TS(pTask);

  // Tasks of this level that finished executing, this one included.
  int32_t taskDone = atomic_add_fetch_32(&pTask->level->taskExecDoneNum, 1);

  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_PART_SUCC);

  SCH_ERR_RET(schRecordTaskSucceedNode(pJob, pTask));
  SCH_ERR_RET(schLaunchTasksInFlowCtrlList(pJob, pTask));

  int32_t parentNum = pTask->parents ? (int32_t)taosArrayGetSize(pTask->parents) : 0;
  if (parentNum > 0) {
    int32_t curPhase = SCH_GET_JOB_PHASE(pJob);
    if (curPhase == QUERY_PHASE_EXEC_DATA_QUERY) {
      SCH_SET_JOB_PHASE(pJob, QUERY_PHASE_EXEC_WAITING_CHILDREN);
    }
  }

  if (parentNum == 0) {
    if (SCH_JOB_NEED_WAIT(pJob)) {
      // Succeeded + failed tasks of the level; deliberately distinct from taskDone above.
      int32_t levelDone = 0;

      SCH_LOCK(SCH_WRITE, &pTask->level->lock);
      pTask->level->taskSucceed++;
      levelDone = pTask->level->taskSucceed + pTask->level->taskFailed;
      SCH_UNLOCK(SCH_WRITE, &pTask->level->lock);

      if (levelDone < pTask->level->taskNum) {
        SCH_TASK_DLOG("wait all tasks, done:%d, all:%d", levelDone, pTask->level->taskNum);
        return TSDB_CODE_SUCCESS;
      }

      SCH_TASK_TLOG("taskDone number reach level task number, done:%d, total:%d", levelDone, pTask->level->taskNum);

      if (pTask->level->taskFailed > 0) {
        SCH_RET(schHandleJobFailure(pJob, pJob->errCode));
      }

      SCH_RET(schSwitchJobStatus(pJob, JOB_TASK_STATUS_PART_SUCC, NULL));
    }

    pJob->resNode = pTask->succeedAddr;
    pJob->fetchTask = pTask;

    if (SCH_IS_PARENT_JOB(pJob)) {
      SCH_RET(schSwitchJobStatus(pJob, JOB_TASK_STATUS_PART_SUCC, NULL));
    } else {
      SCH_RET(schProcessOnSubJobSuccess(pJob));
    }
  }

  for (int32_t i = 0; i < parentNum; ++i) {
    // taosArrayGetP returns NULL when the element cannot be read, so this check
    // guards a bad index instead of dereferencing a NULL element pointer.
    SSchTask *pParent = taosArrayGetP(pTask->parents, i);
    if (NULL == pParent) {
      SCH_TASK_ELOG("fail to get task %d pParent, parentNum:%d", i, parentNum);
      SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
    }

    SCH_LOCK(SCH_WRITE, &pParent->planLock);
    SDownstreamSourceNode source = {
        .type = QUERY_NODE_DOWNSTREAM_SOURCE,
        .clientId = pTask->clientId,
        .taskId = pTask->taskId,
        .sId = pTask->seriesId,
        .execId = pTask->execId,
        .addr = pTask->succeedAddr,
        .fetchMsgType = SCH_FETCH_TYPE(pTask),
        .localExec = SCH_IS_LOCAL_EXEC_TASK(pJob, pTask),
    };

    code = qSetSubplanExecutionNode(pParent->plan, pTask->plan->id.groupId, &source);
    if (TSDB_CODE_SUCCESS != code) {
      SCH_TASK_ELOG("qSetSubplanExecutionNode failed, groupId: %d", pTask->plan->id.groupId);
    }

    SCH_UNLOCK(SCH_WRITE, &pParent->planLock);
    SCH_ERR_RET(code);

    int32_t readyNum = atomic_add_fetch_32(&pParent->childReady, 1);

    if (SCH_TASK_READY_FOR_LAUNCH(readyNum, pParent)) {
      // this is a redirect process for parent task, since the original pParent task has already failed before.
      // TODO refactor optimize: update the candidate address
      // set the address from the pTask->succeedAddr, the vnode that successfully executed subquery already
      if (pParent->redirectCtx.inRedirect && (!SCH_IS_DATA_BIND_TASK(pParent))) {
        code = schSwitchTaskCandidateAddr(pJob, pParent);
        if (TSDB_CODE_SUCCESS != code) {
          // Best effort: the parent is still launched with its current candidate address.
          SCH_TASK_ELOG("failed to switch candidate addr of parent task 0x%" PRIx64 ", code:%s, ignore and continue",
                        pParent->taskId, tstrerror(code));
        }

        // if all vnodes are tried, let's switch the epset for each vnode for the next round
        if (pParent->retryTimes > taosArrayGetSize(pParent->candidateAddrs)) {
          SCH_ERR_RET(schUpdateCurrentEpset(pParent, pJob));
        }
      }

      SCH_TASK_DLOG("all %d children task done, start to launch parent task, TID:0x%" PRIx64, readyNum, pParent->taskId);

      pParent->seriesId = pJob->seriesId;
      // Swap so the task-scoped log line below is emitted for the parent task.
      TSWAP(pTask, pParent);
      SCH_TASK_TLOG("task seriesId set to 0x%" PRIx64, pTask->seriesId);
      TSWAP(pTask, pParent);

      SCH_ERR_RET(schDelayLaunchTask(pJob, pParent));
    }
  }

  if (taskDone == pTask->level->taskNum) {
    SCH_ERR_RET(schLaunchJobLowerLevel(pJob, pTask));
  }

  return TSDB_CODE_SUCCESS;
}
390

391
/*
 * Proactively fail a timed-out task so that it is retried elsewhere.
 *
 * Applies only when rescheduling is enabled and the task is not data-bound.
 * The task must also be in JOB_TASK_STATUS_EXEC, have timed out
 * (SCH_TASK_TIMEOUT), not be the job's fetch task, and have more than one
 * candidate address. Such a task is dropped from its exec node, its execNodes
 * map is cleared, and it is processed as a TSDB_CODE_SCH_TIMEOUT_ERROR failure.
 *
 * @return TSDB_CODE_SUCCESS or the error from schProcessOnTaskFailure().
 */
int32_t schRescheduleTask(SSchJob *pJob, SSchTask *pTask) {
  if (!schMgmt.cfg.enableReSchedule || (SCH_IS_DATA_BIND_TASK(pTask))) {
    return TSDB_CODE_SUCCESS;
  }

  bool shouldReschedule = (SCH_TASK_TIMEOUT(pTask)) && (JOB_TASK_STATUS_EXEC == pTask->status) &&
                          (pJob->fetchTask != pTask) && (taosArrayGetSize(pTask->candidateAddrs) > 1);
  if (!shouldReschedule) {
    return TSDB_CODE_SUCCESS;
  }

  SCH_TASK_DLOG("task execId %d will be rescheduled now", pTask->execId);
  schDropTaskOnExecNode(pJob, pTask);
  taosHashClear(pTask->execNodes);

  SCH_RET(schProcessOnTaskFailure(pJob, pTask, TSDB_CODE_SCH_TIMEOUT_ERROR));
}
411

412
/*
 * Mark a task as being redirected and maintain its redirect timing.
 *
 * @param resetRetry  true (a task that had succeeded): startTs is reset to
 *                    now and retryTimes to 0. false: startTs is set to now only
 *                    when it is not set yet (<= 0); otherwise the elapsed time
 *                    since startTs is logged.
 * @return always TSDB_CODE_SUCCESS.
 */
int32_t schChkUpdateRedirectCtx(SSchJob *pJob, SSchTask *pTask, bool resetRetry) {
  SSchRedirectCtx *pRedirect = &pTask->redirectCtx;
  pRedirect->inRedirect = true;

  if (!resetRetry) {
    double elapsed = 0;
    if (pRedirect->startTs > 0) {
      elapsed = (taosGetTimestampMs() - pRedirect->startTs) / 1000.0;
    } else {
      pRedirect->startTs = taosGetTimestampMs();
    }

    SCH_TASK_DLOG("update failed task redirectCtx, current %d/%d redirect retry, delayExec:%.2fs, startTs:%" PRId64
                  " ,elapsed:%.2fs",
                  pTask->retryTimes, pTask->maxRetryTimes, pTask->delayExecMs / 1000.0, pRedirect->startTs, elapsed);
    return TSDB_CODE_SUCCESS;
  }

  pRedirect->startTs = taosGetTimestampMs();  // always reset startTs & retryTimes for succ task
  pTask->retryTimes = 0;

  SCH_TASK_DLOG("reset succ task retryInfo, start %d/%d redirect retry, delayExec:%.2fs, startTs:%" PRId64,
                pTask->retryTimes, pTask->maxRetryTimes, pTask->delayExecMs / 1000.0, pRedirect->startTs);
  return TSDB_CODE_SUCCESS;
}
438

439
/*
 * Tear down the current execution of a task so it can be retried.
 *
 * Flags the task as waiting for a retry and stops any pending delay timer.
 * It then drops the task from its exec node, clears its execNodes map,
 * removes it from the job's exec list (errors ignored), deregisters its
 * heartbeat and frees the pending request message. Finally it resets the
 * per-attempt fields (message length/type, ready-children count and the
 * recorded success address).
 */
void schResetTaskForRetry(SSchJob *pJob, SSchTask *pTask) {
  pTask->waitRetry = true;

  // Cancel a pending delayed launch, if any.
  if (pTask->delayTimer) {
    UNUSED(taosTmrStop(pTask->delayTimer));
  }

  // Release everything tied to the current execution attempt.
  schDropTaskOnExecNode(pJob, pTask);
  taosHashClear(pTask->execNodes);
  (void)schRemoveTaskFromExecList(pJob, pTask);  // ignore error
  schDeregisterTaskHb(pJob, pTask);
  taosMemoryFreeClear(pTask->msg);

  // Per-attempt bookkeeping.
  TAOS_MEMSET(&pTask->succeedAddr, 0, sizeof(pTask->succeedAddr));
  pTask->childReady = 0;
  pTask->lastMsgType = 0;
  pTask->msgLen = 0;
}
457

458
#if 0
459

460
/*
 * [Compiled out by the enclosing `#if 0`.]
 * Redirect a task after a redirect-class error (rspCode).
 *
 * Data-bind tasks: the candidate address is refreshed from the epset carried
 * in pData when present. Otherwise the current candidate ep is either only
 * logged (SYNC_SELF_LEADER_REDIRECT_ERROR) or advanced via SCH_SWITCH_EPSET.
 * The task is then reset to INIT and relaunched after a delay.
 *
 * Other (merge) tasks: the subplan execution nodes are cleared, the candidate
 * address is switched when pData is NULL or empty, and every child task is
 * redirected recursively.
 *
 * Any error is routed to schProcessOnTaskFailure().
 *
 * NOTE(review): this body is stale relative to the live code in this file.
 * schChkUpdateRedirectCtx() is now defined as (pJob, pTask, bool resetRetry)
 * but is called here with four arguments, and schUpdateTaskCandidateAddr() is
 * itself inside `#if 0`. It will not compile if re-enabled as-is.
 */
int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32_t rspCode) {
461
  int32_t code = 0;
462

463
  SCH_TASK_DLOG("task will be redirected now, status:%s, code:%s", SCH_GET_TASK_STATUS_STR(pTask), tstrerror(rspCode));
464

465
  if (!NO_RET_REDIRECT_ERROR(rspCode)) {
466
    SCH_UPDATE_REDIRECT_CODE(pJob, rspCode);
467
  }
468

469
  SCH_ERR_JRET(schChkUpdateRedirectCtx(pJob, pTask, pData ? pData->pEpSet : NULL, rspCode));
470

471
  schResetTaskForRetry(pJob, pTask);
472

473
  if (SCH_IS_DATA_BIND_TASK(pTask)) {
474
    if (pData && pData->pEpSet) {
475
      SCH_ERR_JRET(schUpdateTaskCandidateAddr(pJob, pTask, pData->pEpSet));
476
    } else if (SYNC_SELF_LEADER_REDIRECT_ERROR(rspCode)) {
477
      SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
478
      if (NULL == addr) {
479
        SCH_TASK_ELOG("fail to get the %dth condidateAddr, totalNum:%d", pTask->candidateIdx,
480
                      (int32_t)taosArrayGetSize(pTask->candidateAddrs));
481
        SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
482
      }
483

484
      SEp *pEp = &addr->epSet.eps[addr->epSet.inUse];
485
      SCH_TASK_DLOG("task retry node %d current ep, idx:%d/%d,%s:%d, code:%s", addr->nodeId, addr->epSet.inUse,
486
                    addr->epSet.numOfEps, pEp->fqdn, pEp->port, tstrerror(rspCode));
487
    } else {
488
      SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
489
      if (NULL == addr) {
490
        SCH_TASK_ELOG("fail to get the %dth condidateAddr, totalNum:%d", pTask->candidateIdx,
491
                      (int32_t)taosArrayGetSize(pTask->candidateAddrs));
492
        SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
493
      }
494

495
      SCH_SWITCH_EPSET(addr);
496
      SCH_TASK_DLOG("switch task target node %d epset to %d/%d", addr->nodeId, addr->epSet.inUse, addr->epSet.numOfEps);
497
    }
498

499
    SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT);
500

501
    SCH_ERR_JRET(schDelayLaunchTask(pJob, pTask));
502

503
    return TSDB_CODE_SUCCESS;
504
  }
505

506
  // merge plan
507

508
  pTask->childReady = 0;
509

510
  qClearSubplanExecutionNode(pTask->plan);
511

512
  // Note: current error task and upper level merge task
513
  if ((pData && 0 == pData->len) || NULL == pData) {
514
    SCH_ERR_JRET(schSwitchTaskCandidateAddr(pJob, pTask));
515
  }
516

517
  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT);
518

519
  int32_t childrenNum = taosArrayGetSize(pTask->children);
520
  for (int32_t i = 0; i < childrenNum; ++i) {
521
    SSchTask *pChild = taosArrayGetP(pTask->children, i);
522
    SCH_LOCK_TASK(pChild);
523
    (void)schDoTaskRedirect(pJob, pChild, NULL, rspCode);  // error handled internal
524
    SCH_UNLOCK_TASK(pChild);
525
  }
526

527
  return TSDB_CODE_SUCCESS;
528

529
_return:
530

531
  SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
532
}
533

534
/*
 * [Compiled out by the enclosing `#if 0`.]
 * Roll back level counters of a task and its direct children before the task
 * set is redirected.
 *
 * Decrements taskLaunchedNum of the task's level, and also taskExecDoneNum
 * when the task status is >= JOB_TASK_STATUS_PART_SUCC. It then decrements
 * both counters on each child's level while holding that child's task lock.
 *
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_SCH_INTERNAL_ERROR if a child
 *         pointer cannot be read.
 *
 * NOTE(review): pLevel is reassigned inside the loop, so the closing log line
 * reports the last child's level rather than the task's own level. Also,
 * children's taskExecDoneNum is decremented unconditionally, unlike the
 * task's own.
 */
int32_t schResetTaskSetLevelInfo(SSchJob *pJob, SSchTask *pTask) {
535
  SSchLevel *pLevel = pTask->level;
536

537
  SCH_TASK_DLOG("start to reset level for current task set, execDone:%d, launched:%d",
538
                atomic_load_32(&pLevel->taskExecDoneNum), atomic_load_32(&pLevel->taskLaunchedNum));
539

540
  if (SCH_GET_TASK_STATUS(pTask) >= JOB_TASK_STATUS_PART_SUCC) {
541
    (void)atomic_sub_fetch_32(&pLevel->taskExecDoneNum, 1);
542
  }
543

544
  (void)atomic_sub_fetch_32(&pLevel->taskLaunchedNum, 1);
545

546
  int32_t childrenNum = taosArrayGetSize(pTask->children);
547
  for (int32_t i = 0; i < childrenNum; ++i) {
548
    SSchTask *pChild = taosArrayGetP(pTask->children, i);
549
    if (NULL == pChild) {
550
      SCH_TASK_ELOG("fail to get the %dth child, childrenNum:%d", i, childrenNum);
551
      SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
552
    }
553

554
    SCH_LOCK_TASK(pChild);
555
    pLevel = pChild->level;
556
    (void)atomic_sub_fetch_32(&pLevel->taskExecDoneNum, 1);
557
    (void)atomic_sub_fetch_32(&pLevel->taskLaunchedNum, 1);
558
    SCH_UNLOCK_TASK(pChild);
559
  }
560

561
  SCH_TASK_DLOG("end to reset level for current task set, execDone:%d, launched:%d",
562
                atomic_load_32(&pLevel->taskExecDoneNum), atomic_load_32(&pLevel->taskLaunchedNum));
563

564
  return TSDB_CODE_SUCCESS;
565
}
566

567
/*
 * [Compiled out by the enclosing `#if 0`.]
 * Retry a whole task set (a task and its children) after a redirect error.
 *
 * Steps, in order:
 *  - check/reset the job retry state;
 *  - require an epset in pData for SYNC_OTHER_LEADER_REDIRECT_ERROR codes;
 *  - roll back level counters and reset the job level index;
 *  - bump pJob->seriesId and redirect the task.
 *
 * pData's payload and epset are freed on every path. Failures before the
 * redirect are routed to schProcessOnTaskFailure().
 *
 * NOTE(review): pData is dereferenced without a NULL check, so callers must
 * pass a non-NULL buffer. schChkResetJobRetry() is not visible here; confirm
 * it still exists before re-enabling this block.
 */
int32_t schHandleTaskSetRetry(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32_t rspCode) {
568
  int32_t code = 0;
569

570
  SCH_ERR_JRET(schChkResetJobRetry(pJob, rspCode));
571

572
  if (SYNC_OTHER_LEADER_REDIRECT_ERROR(rspCode)) {
573
    if (NULL == pData->pEpSet) {
574
      SCH_TASK_ELOG("epset updating excepted, error:%s", tstrerror(rspCode));
575
      code = TSDB_CODE_INVALID_MSG;
576
      goto _return;
577
    }
578
  }
579

580
  SCH_TASK_DLOG("start to redirect current task set cause of error: %s", tstrerror(rspCode));
581

582
  SCH_ERR_JRET(schResetTaskSetLevelInfo(pJob, pTask));
583

584
  SCH_RESET_JOB_LEVEL_IDX(pJob);
585
  atomic_add_fetch_64(&pJob->seriesId, 1);
586

587
  code = schDoTaskRedirect(pJob, pTask, pData, rspCode);
588

589
  taosMemoryFreeClear(pData->pData);
590
  taosMemoryFreeClear(pData->pEpSet);
591

592
  SCH_RET(code);
593

594
_return:
595

596
  taosMemoryFreeClear(pData->pData);
597
  taosMemoryFreeClear(pData->pEpSet);
598

599
  SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
600
}
601
#endif
602

603
/*
 * Register a task in the job's execTasks hash, keyed by its taskId.
 *
 * A task that is already present (HASH_NODE_EXIST) is treated as success.
 *
 * @return TSDB_CODE_SUCCESS, or the error code returned by taosHashPut().
 */
int32_t schPushTaskToExecList(SSchJob *pJob, SSchTask *pTask) {
  int32_t code = taosHashPut(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
  if (TSDB_CODE_SUCCESS == code) {
    SCH_TASK_TLOG("task added to execTask list, numOfTasks:%d", taosHashGetSize(pJob->execTasks));
    return TSDB_CODE_SUCCESS;
  }

  if (HASH_NODE_EXIST(code)) {
    SCH_TASK_DLOG("task already in execTask list, code:0x%x", code);
    return TSDB_CODE_SUCCESS;
  }

  SCH_TASK_ELOG("taosHashPut task to execTask list failed, code:0x%x", code);
  SCH_RET(code);
}
619

620
/*
621
int32_t schMoveTaskToSuccList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
622
  if (0 != taosHashRemove(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId))) {
623
    SCH_TASK_WLOG("remove task from execTask list failed, may not exist, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
624
  } else {
625
    SCH_TASK_DLOG("task removed from execTask list, numOfTasks:%d", taosHashGetSize(pJob->execTasks));
626
  }
627

628
  int32_t code = taosHashPut(pJob->succTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
629
  if (0 != code) {
630
    if (HASH_NODE_EXIST(code)) {
631
      *moved = true;
632
      SCH_TASK_ELOG("task already in succTask list, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
633
      SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
634
    }
635

636
    SCH_TASK_ELOG("taosHashPut task to succTask list failed, errno:%d", ERRNO);
637
    SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
638
  }
639

640
  *moved = true;
641

642
  SCH_TASK_DLOG("task moved to succTask list, numOfTasks:%d", taosHashGetSize(pJob->succTasks));
643

644
  return TSDB_CODE_SUCCESS;
645
}
646

647
int32_t schMoveTaskToFailList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
648
  *moved = false;
649

650
  if (0 != taosHashRemove(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId))) {
651
    SCH_TASK_WLOG("remove task from execTask list failed, may not exist, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
652
  }
653

654
  int32_t code = taosHashPut(pJob->failTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
655
  if (0 != code) {
656
    if (HASH_NODE_EXIST(code)) {
657
      *moved = true;
658

659
      SCH_TASK_WLOG("task already in failTask list, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
660
      SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
661
    }
662

663
    SCH_TASK_ELOG("taosHashPut task to failTask list failed, errno:%d", ERRNO);
664
    SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
665
  }
666

667
  *moved = true;
668

669
  SCH_TASK_DLOG("task moved to failTask list, numOfTasks:%d", taosHashGetSize(pJob->failTasks));
670

671
  return TSDB_CODE_SUCCESS;
672
}
673

674
int32_t schMoveTaskToExecList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
675
  if (0 != taosHashRemove(pJob->succTasks, &pTask->taskId, sizeof(pTask->taskId))) {
676
    SCH_TASK_WLOG("remove task from succTask list failed, may not exist, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
677
  }
678

679
  int32_t code = taosHashPut(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
680
  if (0 != code) {
681
    if (HASH_NODE_EXIST(code)) {
682
      *moved = true;
683

684
      SCH_TASK_ELOG("task already in execTask list, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
685
      SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
686
    }
687

688
    SCH_TASK_ELOG("taosHashPut task to execTask list failed, errno:%d", ERRNO);
689
    SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
690
  }
691

692
  *moved = true;
693

694
  SCH_TASK_DLOG("task moved to execTask list, numOfTasks:%d", taosHashGetSize(pJob->execTasks));
695

696
  return TSDB_CODE_SUCCESS;
697
}
698
*/
699

700
/*
 * Decide whether a failed task should be retried; the verdict goes to *needRetry.
 *
 * No retry in any of these cases:
 *  - the job is flagged noMoreRetry;
 *  - schFailedTaskNeedRetry() rejects the failure;
 *  - SCH_TASK_NEED_RETRY(lastMsgType, errCode) is false.
 *
 * A TSDB_CODE_SCH_TIMEOUT_ERROR first grants one extra execution/retry and
 * doubles timeoutUsec, capped at SCH_MAX_TASK_TIMEOUT_USEC.
 *
 * @return always TSDB_CODE_SUCCESS.
 */
int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bool *needRetry) {
  int64_t nowMs = taosGetTimestampMs();
  double  elapsedSec = 0.0;
  if (0 != pTask->redirectCtx.startTs) {
    elapsedSec = (nowMs - pTask->redirectCtx.startTs) / 1000.0;
  }

  // Pessimistic default; set to true only when every check below passes.
  *needRetry = false;

  if (pJob->noMoreRetry) {
    SCH_TASK_DLOG("task no more retry since job no more retry, retryTimes:%d/%d", pTask->retryTimes,
                  pTask->maxRetryTimes);
    return TSDB_CODE_SUCCESS;
  }

  // handle transport time out issue: allow one more attempt and back off the timeout
  if (TSDB_CODE_SCH_TIMEOUT_ERROR == errCode) {
    ++pTask->maxExecTimes;
    ++pTask->maxRetryTimes;
    if (pTask->timeoutUsec < SCH_MAX_TASK_TIMEOUT_USEC) {
      pTask->timeoutUsec *= 2;
      if (pTask->timeoutUsec > SCH_MAX_TASK_TIMEOUT_USEC) {
        pTask->timeoutUsec = SCH_MAX_TASK_TIMEOUT_USEC;
      }
    }
  }

  if (TSDB_CODE_SUCCESS != schFailedTaskNeedRetry(pTask, pJob, errCode)) {
    return TSDB_CODE_SUCCESS;
  }

  if (!SCH_TASK_NEED_RETRY(pTask->lastMsgType, errCode)) {
    SCH_TASK_DLOG("task no more retry cause of errCode, errCode:0x%x - %s", errCode, tstrerror(errCode));
    return TSDB_CODE_SUCCESS;
  }

  *needRetry = true;

  SCH_TASK_DLOG("task need the %d/%d retry, elapsedTime:%.2fs, errCode:0x%x - %s", pTask->execId + 1,
                pTask->maxRetryTimes, elapsedSec, errCode, tstrerror(errCode));
  return TSDB_CODE_SUCCESS;
}
744

745
/*
 * Put a failed task back into INIT state and schedule a delayed relaunch.
 *
 * First:
 *  - decrement the level's launched counter;
 *  - stop a pending delay timer;
 *  - remove the task from the exec list (errors ignored);
 *  - launch queued flow-control tasks when the task is flow-controlled;
 *  - deregister its heartbeat.
 *
 * Then pick the next target:
 *  - data-bind tasks wait redirectDelayMs (startTs is set on the first retry)
 *    and call schUpdateCurrentEpset();
 *  - other tasks move to the next candidate address.
 *
 * @return TSDB_CODE_SUCCESS or the first error from the helpers.
 */
int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask) {
  (void)atomic_sub_fetch_32(&pTask->level->taskLaunchedNum, 1);

  if (pTask->delayTimer) {
    UNUSED(taosTmrStop(pTask->delayTimer));
  }

  (void)schRemoveTaskFromExecList(pJob, pTask);  // ignore error
  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT);

  if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) {
    SCH_ERR_RET(schLaunchTasksInFlowCtrlList(pJob, pTask));
  }

  schDeregisterTaskHb(pJob, pTask);

  if (!SCH_IS_DATA_BIND_TASK(pTask)) {
    SCH_ERR_RET(schSwitchTaskCandidateAddr(pJob, pTask));
  } else {
    SSchRedirectCtx *pCtx = &pTask->redirectCtx;
    pTask->delayExecMs = pCtx->redirectDelayMs;
    if (0 == pCtx->startTs) {
      pCtx->startTs = taosGetTimestampMs();
    }

    SCH_ERR_RET(schUpdateCurrentEpset(pTask, pJob));
  }

  SCH_RET(schDelayLaunchTask(pJob, pTask));
}
775

776
/*
 * Fill pTask->candidateAddrs with the address of every node in pJob->nodeList.
 *
 * @return TSDB_CODE_SUCCESS when at least one address was added;
 *         TSDB_CODE_TSC_NO_EXEC_NODE when the node list is absent or empty;
 *         TSDB_CODE_SCH_INTERNAL_ERROR if a node-list entry cannot be read;
 *         terrno if appending to candidateAddrs fails.
 */
int32_t schSetAddrsFromNodeList(SSchJob *pJob, SSchTask *pTask) {
  int32_t addNum = 0;
  int32_t nodeNum = 0;

  if (pJob->nodeList) {
    nodeNum = taosArrayGetSize(pJob->nodeList);

    for (int32_t i = 0; i < nodeNum; ++i) {
      SQueryNodeLoad *nload = taosArrayGet(pJob->nodeList, i);
      // Check before taking &nload->addr, like the other array lookups in this file.
      if (NULL == nload) {
        SCH_TASK_ELOG("fail to get the %dth node load, nodeNum:%d", i, nodeNum);
        SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
      }

      SQueryNodeAddr *naddr = &nload->addr;

      if (NULL == taosArrayPush(pTask->candidateAddrs, naddr)) {
        SCH_TASK_ELOG("taosArrayPush execNode to candidate addrs failed, addNum:%d, errno:%d", addNum, ERRNO);
        SCH_ERR_RET(terrno);
      }

      SCH_TASK_TLOG("set %dth candidate addr, id %d, inUse:%d/%d, fqdn:%s, port:%d", i, naddr->nodeId,
                    naddr->epSet.inUse, naddr->epSet.numOfEps, SCH_GET_CUR_EP(naddr)->fqdn,
                    SCH_GET_CUR_EP(naddr)->port);

      ++addNum;
    }
  }

  if (addNum <= 0) {
    SCH_TASK_ELOG("no available execNode as candidates, nodeNum:%d", nodeNum);
    SCH_ERR_RET(TSDB_CODE_TSC_NO_EXEC_NODE);
  }

  return TSDB_CODE_SUCCESS;
}
807

808
/*
 * Populate pTask->candidateAddrs once, on first use.
 *
 * A subplan that names its exec node (numOfEps > 0) yields that single
 * candidate. A data-bind task without one is an internal error. Any other
 * task takes every node of the job as a candidate and starts from a random
 * index.
 *
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_SCH_INTERNAL_ERROR, or the error from
 *         array allocation/push or schSetAddrsFromNodeList().
 */
int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) {
  if (NULL != pTask->candidateAddrs) {
    return TSDB_CODE_SUCCESS;  // already populated
  }

  pTask->candidateAddrs = taosArrayInit(SCHEDULE_DEFAULT_MAX_NODE_NUM, sizeof(SQueryNodeAddr));
  if (NULL == pTask->candidateAddrs) {
    SCH_TASK_ELOG("taosArrayInit %d condidate addrs failed", SCHEDULE_DEFAULT_MAX_NODE_NUM);
    SCH_ERR_RET(terrno);
  }

  bool planHasExecNode = pTask->plan->execNode.epSet.numOfEps > 0;
  if (!planHasExecNode) {
    if (SCH_IS_DATA_BIND_TASK(pTask)) {
      SCH_TASK_ELOG("no execNode specifed for data src task, numOfEps:%d", pTask->plan->execNode.epSet.numOfEps);
      SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
    }

    SCH_ERR_RET(schSetAddrsFromNodeList(pJob, pTask));

    pTask->candidateIdx = taosRand() % taosArrayGetSize(pTask->candidateAddrs);
    return TSDB_CODE_SUCCESS;
  }

  if (NULL == taosArrayPush(pTask->candidateAddrs, &pTask->plan->execNode)) {
    SCH_TASK_ELOG("taosArrayPush execNode to candidate addrs failed, errno:%d", ERRNO);
    SCH_ERR_RET(terrno);
  }

  SCH_TASK_TLOG("use execNode in plan as candidate addr, numOfEps:%d", pTask->plan->execNode.epSet.numOfEps);
  return TSDB_CODE_SUCCESS;
}
840

841
#if 0
842
int32_t schUpdateTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet) {
843
  int32_t code = TSDB_CODE_SUCCESS;
844
  if (NULL == pTask->candidateAddrs || 1 != taosArrayGetSize(pTask->candidateAddrs)) {
845
    SCH_TASK_ELOG("not able to update cndidate addr, addr num %d",
846
                  (int32_t)(pTask->candidateAddrs ? taosArrayGetSize(pTask->candidateAddrs) : 0));
847
    SCH_ERR_RET(TSDB_CODE_APP_ERROR);
848
  }
849

850
  SQueryNodeAddr *pAddr = taosArrayGet(pTask->candidateAddrs, 0);
851
  if (NULL == pAddr) {
852
    SCH_TASK_ELOG("fail to get task 0th condidataAddr, totalNum:%d", (int32_t)taosArrayGetSize(pTask->candidateAddrs));
853
    SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
854
  }
855

856
  char *origEpset = NULL;
857
  char *newEpset = NULL;
858

859
  SCH_ERR_RET(schDumpEpSet(&pAddr->epSet, &origEpset));
860
  SCH_ERR_JRET(schDumpEpSet(pEpSet, &newEpset));
861

862
  SCH_TASK_DLOG("update task target node %d epset from %s to %s", pAddr->nodeId, origEpset, newEpset);
863

864
  TAOS_MEMCPY(&pAddr->epSet, pEpSet, sizeof(pAddr->epSet));
865

866
_return:
867

868
  taosMemoryFree(origEpset);
869
  taosMemoryFree(newEpset);
870

871
  return code;
872
}
873
#endif
874

875
/*
 * Move the task to another candidate address (used before re-sending it).
 *
 * With more than one candidate:
 *   - SCH_RANDOM policy: pick a random candidate different from the current one;
 *   - any other policy (SCH_LOAD_SEQ, SCH_ALL, unknown): advance round-robin.
 * With zero or one candidate the index is left unchanged.
 *
 * Returns the result of resolving the (possibly new) current node address.
 */
int32_t schSwitchTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask) {
  int32_t         candidateNum = taosArrayGetSize(pTask->candidateAddrs);
  SQueryNodeAddr *pAddr = NULL;

  if (candidateNum > 1) {
    if (SCH_RANDOM == schMgmt.cfg.schPolicy) {
      int32_t prevIdx = pTask->candidateIdx;
      do {
        pTask->candidateIdx = taosRand() % candidateNum;
      } while (prevIdx == pTask->candidateIdx);
    } else {
      pTask->candidateIdx = (pTask->candidateIdx + 1 >= candidateNum) ? 0 : pTask->candidateIdx + 1;
    }
  }

  int32_t code = schGetTaskCurrentNodeAddr(pTask, pJob, &pAddr);
  if (TSDB_CODE_SUCCESS == code) {
    SCH_TASK_DLOG("switch task exec on candidateIdx:%d/%d, vgId:%d", pTask->candidateIdx, candidateNum, pAddr->nodeId);
  }

  return code;
}
908

909
/*
 * Remove the task from the job's execTasks hash.
 * A failed removal only means the task was already absent; it is logged as a
 * warning and the function still reports success.
 */
int32_t schRemoveTaskFromExecList(SSchJob *pJob, SSchTask *pTask) {
  int32_t removeCode = taosHashRemove(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId));
  if (TSDB_CODE_SUCCESS != removeCode) {
    SCH_TASK_WLOG("task already not in execTask list, code:0x%x", removeCode);
  }

  return TSDB_CODE_SUCCESS;
}
917

918
/*
 * Best-effort: send TDMT_SCH_DROP_TASK to every exec node recorded for the task.
 *
 * Nodes without a handle are skipped. Send errors are ignored so that every
 * node still gets its drop request.
 */
void schDropTaskOnExecNode(SSchJob *pJob, SSchTask *pTask) {
  if (NULL == pTask->execNodes) {
    SCH_TASK_DLOG("no exec address, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
    return;
  }

  int32_t nodeNum = (int32_t)taosHashGetSize(pTask->execNodes);
  if (nodeNum <= 0) {
    SCH_TASK_DLOG("task has no execNodes, no need to drop it, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
    return;
  }

  int32_t nodeIdx = 0;
  for (SSchNodeInfo *pNode = taosHashIterate(pTask->execNodes, NULL); NULL != pNode;
       pNode = taosHashIterate(pTask->execNodes, pNode), ++nodeIdx) {
    if (NULL == pNode->handle) {
      SCH_TASK_DLOG("no need to drop task %dth execNode", nodeIdx);
      continue;
    }

    SCH_SET_TASK_HANDLE(pTask, pNode->handle);
    // The hash key of the node entry is the exec id of that execution.
    void *pExecId = taosHashGetKey(pNode, NULL);
    (void)schBuildAndSendMsg(pJob, pTask, &pNode->addr, TDMT_SCH_DROP_TASK, pExecId);  // ignore error and continue

    SCH_TASK_DLOG("start to drop task's %dth execNode", nodeIdx);
  }

  SCH_TASK_DLOG("task has been dropped on %d exec nodes", nodeNum);
}
949

950
/*
 * Send a TDMT_SCH_TASK_NOTIFY message of the given type to every exec node of the task.
 *
 * Nodes without a handle are skipped. The first send failure aborts the walk
 * and is returned. Before returning early, the in-progress hash iteration is
 * cancelled so that its reference on the current entry is released.
 *
 * Returns TSDB_CODE_SUCCESS when every node with a handle was notified (or there
 * were no exec nodes), otherwise the send error.
 */
int32_t schNotifyTaskOnExecNode(SSchJob *pJob, SSchTask *pTask, ETaskNotifyType type) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t size = (int32_t)taosHashGetSize(pTask->execNodes);
  if (size <= 0) {
    SCH_TASK_DLOG("task no exec address to notify, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
    return TSDB_CODE_SUCCESS;
  }

  int32_t       i = 0;
  SSchNodeInfo *nodeInfo = taosHashIterate(pTask->execNodes, NULL);
  while (nodeInfo) {
    if (nodeInfo->handle) {
      SCH_SET_TASK_HANDLE(pTask, nodeInfo->handle);
      code = schBuildAndSendMsg(pJob, pTask, &nodeInfo->addr, TDMT_SCH_TASK_NOTIFY, &type);
      if (TSDB_CODE_SUCCESS != code) {
        // Leaving the iteration early: release the iterator before returning.
        taosHashCancelIterate(pTask->execNodes, nodeInfo);
        SCH_ERR_RET(code);
      }
      SCH_TASK_DLOG("start to notify %d to task's %dth execNode", type, i);
    } else {
      SCH_TASK_DLOG("no need to notify %d to task %dth execNode", type, i);
    }

    ++i;
    nodeInfo = taosHashIterate(pTask->execNodes, nodeInfo);
  }

  SCH_TASK_DLOG("task has been notified %d on %d exec nodes", type, size);
  return TSDB_CODE_SUCCESS;
}
975

976
/*
 * Handle the task status list carried by a heartbeat response from one node.
 *
 * For every status entry the job/task is resolved via schProcessOnCbBegin
 * (entries that cannot be resolved are skipped).
 *   - execId mismatch: logged, nothing else.
 *   - JOB_TASK_STATUS_INIT on the server: the task is rescheduled.
 *   - JOB_TASK_STATUS_FAIL: currently no handling (TODO).
 * schProcessOnCbEnd is always called for resolved entries, with the
 * reschedule result (0 otherwise).
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId *pEpId, SArray *pStatusList) {
  SSchJob  *pJob = NULL;
  SSchTask *pTask = NULL;
  int32_t   taskNum = (int32_t)taosArrayGetSize(pStatusList);

  qDebug("%d task status in hb rsp from nodeId:%d, fqdn:%s, port:%d", taskNum, pEpId->nodeId, pEpId->ep.fqdn,
         pEpId->ep.port);

  for (int32_t idx = 0; idx < taskNum; ++idx) {
    STaskStatus *pStatus = taosArrayGet(pStatusList, idx);
    if (NULL == pStatus) {
      qError("fail to get the %dth task status in hb rsp, taskNum:%d", idx, taskNum);
      continue;
    }

    qDebug("QID:0x%" PRIx64 ", CID:0x%" PRIx64 ", TID:0x%" PRIx64 ", EID:%d task status in server: %s", pStatus->queryId,
           pStatus->clientId, pStatus->taskId, pStatus->execId, jobTaskStatusStr(pStatus->status));

    if (schProcessOnCbBegin(&pJob, &pTask, pStatus->queryId, pStatus->refId, pStatus->subJobId, pStatus->taskId)) {
      continue;
    }

    int32_t code = 0;
    if (pStatus->execId != pTask->execId) {
      // TODO
      SCH_TASK_DLOG("execId %d mis-match current execId %d", pStatus->execId, pTask->execId);
    } else if (JOB_TASK_STATUS_INIT == pStatus->status) {
      // The server no longer knows the task: launch it again.
      code = schRescheduleTask(pJob, pTask);
    }
    // JOB_TASK_STATUS_FAIL: RECORD AND HANDLE ERROR (not implemented yet)

    schProcessOnCbEnd(pJob, pTask, code);
  }

  return TSDB_CODE_SUCCESS;
}
1022

1023
/*
 * Consume the explain responses produced by a LOCAL query or fetch.
 *
 * For every SExplainLocalRsp, the owning job is acquired by refId, the task is
 * looked up, and the response is passed to schProcessExplainRsp. Processing
 * stops at the first error.
 *
 * Ownership: pExplainRes and every response it holds are always released
 * before returning, on success and on failure. Responses already handed to
 * schProcessExplainRsp have their plan fields cleared first.
 * NOTE(review): this presumably reflects that schProcessExplainRsp takes
 * ownership of subplanInfo; confirm.
 */
int32_t schHandleExplainRes(SArray *pExplainRes) {
  int32_t   code = 0;
  SSchJob  *pJob = NULL;
  SSchTask *pTask = NULL;
  int32_t   resNum = taosArrayGetSize(pExplainRes);

  for (int32_t idx = 0; idx < resNum; ++idx) {
    SExplainLocalRsp *localRsp = taosArrayGet(pExplainRes, idx);
    if (NULL == localRsp) {
      qError("fail to get the %dth LOCAL explain rsp msg, total:%d", idx, resNum);
      continue;
    }

    qDebug("QID:0x%" PRIx64 ", CID:0x%" PRIx64 ", TID:0x%" PRIx64 ", begin to handle LOCAL explain rsp msg",
           localRsp->qId, localRsp->cId, localRsp->tId);

    pJob = NULL;
    (void)schAcquireJob(localRsp->rId, &pJob);
    if (NULL == pJob) {
      qWarn("QID:0x%" PRIx64 ", CID:0x%" PRIx64 ", TID:0x%" PRIx64 " job no exist, may be dropped, refId:0x%" PRIx64,
            localRsp->qId, localRsp->cId, localRsp->tId, localRsp->rId);
      SCH_ERR_JRET(TSDB_CODE_QRY_JOB_NOT_EXIST);
    }

    int8_t status = 0;
    if (schJobNeedToStop(pJob, &status)) {
      SCH_TASK_DLOG("will not do further processing cause of job status %s", jobTaskStatusStr(status));
      (void)schReleaseJob(pJob->refId);
      SCH_ERR_JRET(TSDB_CODE_SCH_IGNORE_ERROR);
    }

    code = schGetTaskInJob(pJob, localRsp->tId, &pTask);
    if (TSDB_CODE_SUCCESS == code) {
      code = schProcessExplainRsp(pJob, pTask, &localRsp->rsp);
    }

    (void)schReleaseJob(pJob->refId);

    qDebug("QID:0x%" PRIx64 ", CID:0x%" PRIx64 ", TID:0x%" PRIx64 ", end to handle LOCAL explain rsp msg, code:%x",
           localRsp->qId, localRsp->cId, localRsp->tId, code);

    SCH_ERR_JRET(code);

    // Handled: detach the plan info so the cleanup loop below does not free it.
    localRsp->rsp.numOfPlans = 0;
    localRsp->rsp.subplanInfo = NULL;
    pTask = NULL;
  }

_return:
  for (int32_t idx = 0; idx < resNum; ++idx) {
    SExplainLocalRsp *localRsp = taosArrayGet(pExplainRes, idx);
    if (NULL == localRsp) {
      qError("in _return fail to get the %dth LOCAL explain rsp msg, total:%d", idx, resNum);
      continue;
    }

    tFreeSExplainRsp(&localRsp->rsp);
  }

  taosArrayDestroy(pExplainRes);

  SCH_RET(code);
}
1092

1093
/*
 * Send a task to a remote execution node.
 *
 * Steps:
 *   1. Serialize the subplan into pTask->msg once per task, under planLock.
 *      When tsQueryPlannerTrace is on, also dump the plan to the debug log
 *      (only a table/payload summary for MODIFY plans).
 *   2. Resolve the candidate addresses.
 *   3. For query jobs, ensure a heartbeat connection exists.
 *   4. Send the plan message.
 */
int32_t schLaunchRemoteTask(SSchJob *pJob, SSchTask *pTask) {
  int32_t   code = 0;
  SSubplan *pPlan = pTask->plan;

  if (NULL == pTask->msg) {  // TODO add more detailed reason for failure
    SCH_LOCK(SCH_WRITE, &pTask->planLock);

    code = qSubPlanToMsg(pPlan, &pTask->msg, &pTask->msgLen);
    if (TSDB_CODE_SUCCESS == code && tsQueryPlannerTrace) {
      if (SUBPLAN_TYPE_MODIFY != pPlan->subplanType) {
        char   *pPlanStr = NULL;
        int32_t planStrLen = 0;
        int32_t traceCode = qSubPlanToString(pPlan, &pPlanStr, &planStrLen);
        if (TSDB_CODE_SUCCESS != traceCode) {
          SCH_TASK_WLOG("plan trace failed, code:%s", tstrerror(traceCode));
        } else {
          SCH_TASK_DLOGL("physical plan len:%d, %s", planStrLen, pPlanStr);
          taosMemoryFree(pPlanStr);
        }
      } else {
        // For MODIFY plans only a summary of the data sink is logged.
        SDataInserterNode *pInserter = (SDataInserterNode *)pPlan->pDataSink;
        SCH_TASK_DLOG("MODIFY plan, tables:%d, payload size:%u", pInserter ? pInserter->numOfTables : 0,
                      pInserter ? pInserter->size : 0);
      }
    }

    SCH_UNLOCK(SCH_WRITE, &pTask->planLock);

    if (TSDB_CODE_SUCCESS != code) {
      SCH_TASK_ELOG("failed to create physical plan, code:%s, msg:%p, len:%d", tstrerror(code), pTask->msg,
                    pTask->msgLen);
      SCH_ERR_RET(code);
    }
  }

  SCH_ERR_RET(schSetTaskCandidateAddrs(pJob, pTask));

  if (SCH_IS_QUERY_JOB(pJob)) {
    SCH_ERR_RET(schEnsureHbConnection(pJob, pTask));
  }

  SCH_RET(schBuildAndSendMsg(pJob, pTask, NULL, pPlan->msgType, NULL));
}
1134

1135
/*
 * Execute a task in-process through the client-side query worker.
 *
 * The shared worker (schMgmt.queryMgmt) is created on first use. If another
 * thread installed one first, the freshly created instance is destroyed. For
 * explain jobs, the explain responses gathered by the worker are handed to
 * schHandleExplainRes, which also frees the array. On success the task is
 * marked successful.
 */
int32_t schLaunchLocalTask(SSchJob *pJob, SSchTask *pTask) {
  // SCH_ERR_JRET(schSetTaskCandidateAddrs(pJob, pTask));
  if (NULL == schMgmt.queryMgmt) {
    void *pNewMgmt = NULL;
    SCH_ERR_RET(qWorkerInit(NODE_TYPE_CLIENT, CLIENT_HANDLE, &pNewMgmt, NULL));
    if (atomic_val_compare_exchange_ptr(&schMgmt.queryMgmt, NULL, pNewMgmt)) {
      qWorkerDestroy(&pNewMgmt);  // lost the race: keep the already installed worker
    }
  }

  int32_t code = 0;
  SArray *explainRes = NULL;
  SQWMsg  qwMsg = {0};

  qwMsg.msg = pTask->plan;
  qwMsg.msgType = pTask->plan->msgType;
  qwMsg.msgInfo.taskType = (JOB_TYPE_HQUERY == pJob->attr.type) ? TASK_TYPE_HQUERY : TASK_TYPE_QUERY;
  qwMsg.msgInfo.explain = SCH_IS_EXPLAIN_JOB(pJob);
  qwMsg.msgInfo.needFetch = SCH_TASK_NEED_FETCH(pTask);
  qwMsg.connInfo.handle = pJob->conn.pTrans;
  qwMsg.pWorkerCb = pJob->pWorkerCb;
  qwMsg.subEndPoints = NULL;  // TODO

  if (SCH_IS_EXPLAIN_JOB(pJob)) {
    explainRes = taosArrayInit(pJob->taskNum, sizeof(SExplainLocalRsp));
    if (NULL == explainRes) {
      SCH_ERR_RET(terrno);
    }
  }

  SCH_ERR_JRET(qWorkerProcessLocalQuery(schMgmt.queryMgmt, pJob->seriesId, pJob->queryId, pTask->clientId, pTask->taskId,
                                        pJob->refId, pTask->execId, &qwMsg, explainRes));

  if (SCH_IS_EXPLAIN_JOB(pJob)) {
    // schHandleExplainRes destroys the array itself, on success and failure.
    SCH_ERR_RET(schHandleExplainRes(explainRes));
    explainRes = NULL;
  }

  SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));

_return:

  taosArrayDestroy(explainRes);

  SCH_RET(code);
}
1180

1181
/*
 * Launch one task, either inline or as the body of a taosAsyncExec work item
 * (see schAsyncLaunchTaskImpl).
 *
 * param: an SSchTaskCtx describing the job (jobRid, optional subJobId), the task
 *        and whether this is an asynchronous launch. It is freed here on every
 *        return path.
 *
 * Each call bumps the task's execId and retryTimes and clears waitRetry. The
 * launch is skipped (without error) when the task's seriesId is older than the
 * job's. On the asynchronous path (job taskNum >= SCH_MIN_AYSNC_EXEC_NUM),
 * failures are turned into task/job failure handling here. Otherwise the error
 * code is returned to the caller.
 */
int32_t schLaunchTaskImpl(void *param) {
  SSchTaskCtx *pCtx = (SSchTaskCtx *)param;
  SSchJob     *pJob = NULL, *pParent = NULL;

  // Hold a reference on the (parent) job for the whole launch; it may already be gone.
  (void)schAcquireJob(pCtx->jobRid, &pParent);
  if (NULL == pParent) {
    qDebug("job refId 0x%" PRIx64 " already not exist", pCtx->jobRid);
    taosMemoryFree(param);
    SCH_RET(TSDB_CODE_SCH_JOB_IS_DROPPING);
  }

  // A non-negative subJobId selects one of the parent's sub jobs.
  if (pCtx->subJobId >= 0) {
    pJob = taosArrayGetP(pParent->subJobs, pCtx->subJobId);
    if (NULL == pJob) {
      qDebug("subJobId %d not found in subJobs, totalSubJobNum:%d", pCtx->subJobId, (int32_t)taosArrayGetSize(pParent->subJobs));
      (void)schReleaseJob(pParent->refId);

      taosMemoryFree(param);
      SCH_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
    }
  } else {
    pJob = pParent;
  }

  SSchTask *pTask = pCtx->pTask;

  // Only the asynchronous path locks the task here.
  // NOTE(review): presumably synchronous callers already hold the task lock; confirm.
  if (pCtx->asyncLaunch) {
    SCH_LOCK_TASK(pTask);
  }

  pTask->execId++;
  pTask->retryTimes++;
  pTask->waitRetry = false;

  int8_t  status = 0;
  int32_t code = 0;

  // The task's seriesId is older than the job's current one: skip this launch; code stays 0.
  if (atomic_load_64(&pTask->seriesId) < atomic_load_64(&pJob->seriesId)) {
    SCH_TASK_DLOG("task seriesId:0x%" PRIx64 " is smaller than job seriesId:0x%" PRIx64 ", skip launch",
                  pTask->seriesId, pJob->seriesId);
    goto _return;
  }

  (void)atomic_add_fetch_32(&pTask->level->taskLaunchedNum, 1);

  SCH_TASK_TLOG("start to launch %s task, execId %d, retry %d",
                SCH_IS_LOCAL_EXEC_TASK(pJob, pTask) ? "LOCAL" : "REMOTE", pTask->execId, pTask->retryTimes);

  SCH_LOG_TASK_START_TS(pTask);

  if (schJobNeedToStop(pJob, &status)) {
    SCH_TASK_DLOG("no need to launch task cause of job status %s", jobTaskStatusStr(status));
    SCH_ERR_JRET(TSDB_CODE_SCH_IGNORE_ERROR);
  }

  // First launch (not yet EXEC): register in the exec list and mark as executing.
  if (SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_EXEC) {
    SCH_ERR_JRET(schPushTaskToExecList(pJob, pTask));
    SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_EXEC);
  }

  if (SCH_IS_LOCAL_EXEC_TASK(pJob, pTask)) {
    SCH_ERR_JRET(schLaunchLocalTask(pJob, pTask));
  } else {
    SCH_ERR_JRET(schLaunchRemoteTask(pJob, pTask));
  }

#if 0
  if (SCH_IS_QUERY_JOB(pJob)) {
    SCH_ERR_JRET(schEnsureHbConnection(pJob, pTask));
  }
#endif

_return:

  // Same threshold that schAsyncLaunchTaskImpl uses to choose the asynchronous path:
  // failures are handled here because no caller waits for the return code.
  if (pJob && pJob->taskNum >= SCH_MIN_AYSNC_EXEC_NUM) {
    if (code) {
      code = schProcessOnTaskFailure(pJob, pTask, code);
    }
    if (code) {
      code = schHandleJobFailure(pJob, code);
    }
  }

  if (pCtx->asyncLaunch) {
    SCH_UNLOCK_TASK(pTask);
  }

  (void)schReleaseJob(pParent->refId);

  taosMemoryFree(param);

  SCH_RET(code);
}
1274

1275
/*
 * Prepare a launch context and run schLaunchTaskImpl for the task.
 *
 * Jobs with at least SCH_MIN_AYSNC_EXEC_NUM tasks launch through taosAsyncExec.
 * Smaller jobs launch inline. schLaunchTaskImpl frees the context once it runs.
 * If taosAsyncExec fails to schedule the work, schLaunchTaskImpl never runs,
 * so the context is freed here to avoid leaking it.
 *
 * Returns TSDB_CODE_APP_IS_STOPPING when mayCreateAsyncWork() refuses new work,
 * an allocation/scheduling error, or the inline launch result.
 */
int32_t schAsyncLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) {
  if (!mayCreateAsyncWork()) {
    SCH_ERR_RET(TSDB_CODE_APP_IS_STOPPING);
  }

  SSchTaskCtx *param = taosMemoryCalloc(1, sizeof(SSchTaskCtx));
  if (NULL == param) {
    SCH_ERR_RET(terrno);
  }

  param->jobRid = pJob->refId;
  param->subJobId = pJob->subJobId;
  param->pTask = pTask;

  if (pJob->taskNum >= SCH_MIN_AYSNC_EXEC_NUM) {
    param->asyncLaunch = true;
    int32_t code = taosAsyncExec(schLaunchTaskImpl, param, NULL);
    if (TSDB_CODE_SUCCESS != code) {
      // The work item was not scheduled, so ownership of param never passed to schLaunchTaskImpl.
      taosMemoryFree(param);
      SCH_ERR_RET(code);
    }
  } else {
    // Inline launch: schLaunchTaskImpl frees param on every path.
    SCH_ERR_RET(schLaunchTaskImpl(param));
  }

  return TSDB_CODE_SUCCESS;
}
1298

1299
// Note: no more error processing, handled in function internal
1300
int32_t schLaunchTask(SSchJob *pJob, SSchTask *pTask) {
1,273,926,453✔
1301
  bool    enough = false;
1,273,926,453✔
1302
  int32_t code = 0;
1,273,994,110✔
1303

1304
  SCH_SET_TASK_HANDLE(pTask, NULL);
1,273,994,110✔
1305

1306
  if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) {
1,274,008,409✔
1307
    SCH_ERR_JRET(schCheckIncTaskFlowQuota(pJob, pTask, &enough));
42,420✔
1308

1309
    if (enough) {
40,400✔
1310
      SCH_ERR_JRET(schAsyncLaunchTaskImpl(pJob, pTask));
25,307✔
1311
    }
1312
  } else {
1313
    SCH_ERR_JRET(schAsyncLaunchTaskImpl(pJob, pTask));
1,274,039,074✔
1314
  }
1315

1316
  return TSDB_CODE_SUCCESS;
1,274,120,916✔
1317

1318
_return:
2,020✔
1319
  SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
2,020✔
1320
}
1321

1322
void schHandleTimerEvent(void *param, void *tmrId) {
168,831✔
1323
  SSchTimerParam *pTimerParam = (SSchTimerParam *)param;
168,831✔
1324
  SSchTask       *pTask = NULL;
168,831✔
1325
  SSchJob        *pJob = NULL;
168,831✔
1326
  int32_t         code = 0;
168,831✔
1327

1328
  qDebug("delayTimer:%" PRIuPTR " is launched", (uintptr_t)tmrId);
168,831✔
1329

1330
  int64_t  rId = pTimerParam->rId;
168,831✔
1331
  uint64_t queryId = pTimerParam->queryId;
168,831✔
1332
  uint64_t taskId = pTimerParam->taskId;
168,831✔
1333
  int32_t subJobId = pTimerParam->subJobId;
168,831✔
1334

1335
  if (schProcessOnCbBegin(&pJob, &pTask, queryId, rId, subJobId, taskId)) {
168,831✔
1336
    return;
2,020✔
1337
  }
1338

1339
  if (0 == atomic_load_8(&pTask->delayLaunchPar.exit)) {
166,811✔
1340
    code = schLaunchTask(pJob, pTask);
166,811✔
1341
  } else {
UNCOV
1342
    SCH_TASK_DLOG("task will not be launched since query job exiting, status: %d", pTask->status);
×
1343
  }
1344

1345
  schProcessOnCbEnd(pJob, pTask, code);
166,811✔
1346
}
1347

1348
/*
 * Launch a task now, or, when pTask->delayExecMs > 0, arm a timer that will
 * launch it after that many milliseconds (via schHandleTimerEvent).
 *
 * In the delayed case the task is pushed to the exec list and marked EXEC
 * immediately. An existing timer is re-armed, and a re-arm failure is only
 * logged. A new timer is started otherwise, and failing to start it returns
 * TSDB_CODE_OUT_OF_MEMORY.
 */
int32_t schDelayLaunchTask(SSchJob *pJob, SSchTask *pTask) {
  if (pTask->delayExecMs <= 0) {
    SCH_RET(schLaunchTask(pJob, pTask));
  }

  // Identifiers that schHandleTimerEvent uses to find the job/task again.
  pTask->delayLaunchPar.rId = pJob->refId;
  pTask->delayLaunchPar.queryId = pJob->queryId;
  pTask->delayLaunchPar.taskId = pTask->taskId;
  pTask->delayLaunchPar.subJobId = pJob->subJobId;

  SCH_ERR_RET(schPushTaskToExecList(pJob, pTask));
  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_EXEC);

  if (NULL != pTask->delayTimer) {
    if (taosTmrReset(schHandleTimerEvent, pTask->delayExecMs, (void *)&pTask->delayLaunchPar, schMgmt.timer,
                     &pTask->delayTimer)) {
      SCH_TASK_ELOG("taosTmrReset delayExec timer failed, handle:%p, tmr:%" PRIuPTR, schMgmt.timer,
                    (uintptr_t)pTask->delayTimer);
    } else {
      SCH_TASK_DLOG("task start in %.2fs later by handler:%p, tmr:%" PRIuPTR, pTask->delayExecMs / 1000.0,
                    schMgmt.timer, (uintptr_t)pTask->delayTimer);
    }

    return TSDB_CODE_SUCCESS;
  }

  pTask->delayTimer = taosTmrStart(schHandleTimerEvent, pTask->delayExecMs, (void *)&pTask->delayLaunchPar, schMgmt.timer);
  if (NULL == pTask->delayTimer) {
    SCH_TASK_ELOG("start delay timer failed, handle:%p", schMgmt.timer);
    SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  SCH_TASK_DLOG("task delayTimer:%" PRIuPTR " is started to launch task after:%.2fs", (uintptr_t)pTask->delayTimer,
                pTask->delayExecMs / 1000.0);
  return TSDB_CODE_SUCCESS;
}
1384

1385
/*
 * Launch (or schedule the delayed launch of) every task in one plan level.
 *
 * Flow control for the level is evaluated first. Each task is then stamped
 * with the job's current seriesId (failedSeriesId set one below it) and handed
 * to schDelayLaunchTask. Processing stops at the first error. A missing task
 * entry is reported as TSDB_CODE_SCH_INTERNAL_ERROR instead of being
 * dereferenced.
 */
int32_t schLaunchLevelTasks(SSchJob *pJob, SSchLevel *level) {
  SCH_ERR_RET(schChkJobNeedFlowCtrl(pJob, level));

  for (int32_t i = 0; i < level->taskNum; ++i) {
    SSchTask *pTask = taosArrayGet(level->subTasks, i);
    if (NULL == pTask) {
      qError("QID:0x%" PRIx64 ", fail to get the %dth task in level, taskNum:%d", pJob->queryId, i, level->taskNum);
      SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
    }

    pTask->failedSeriesId = pJob->seriesId - 1;
    pTask->seriesId = pJob->seriesId;

    SCH_TASK_TLOG("task sId is set:%" PRIx64, pTask->seriesId);
    SCH_ERR_RET(schDelayLaunchTask(pJob, pTask));
  }

  return TSDB_CODE_SUCCESS;
}
1399

1400
/*
 * Drop every task stored in the given hash (values are SSchTask pointers) from
 * its exec nodes, if the job needs dropping at all.
 * Pending delay timers are stopped first, and each task is dropped while its
 * task lock is held.
 */
void schDropTaskInHashList(SSchJob *pJob, SHashObj *list) {
  if (!SCH_JOB_NEED_DROP(pJob)) {
    return;
  }

  for (void *pIter = taosHashIterate(list, NULL); pIter; pIter = taosHashIterate(list, pIter)) {
    SSchTask *pTask = *(SSchTask **)pIter;

    if (pTask->delayTimer) {
      schStopTaskDelayTimer(pJob, pTask, true);
    }

    SCH_LOCK_TASK(pTask);
    schDropTaskOnExecNode(pJob, pTask);
    SCH_UNLOCK_TASK(pTask);
  }
}
1420

1421
/*
 * Send a notification of the given type to every task in the hash list.
 *
 * pCurrTask, when not NULL, is notified first and without taking its lock.
 * NOTE(review): presumably the caller already holds pCurrTask's lock; confirm.
 * It is then skipped during the walk. Every other task is notified under its
 * task lock. The first failure stops the walk, and the hash iterator is
 * cancelled so that its reference on the current entry is released.
 *
 * Returns TSDB_CODE_SUCCESS or the first notification error.
 */
int32_t schNotifyTaskInHashList(SSchJob *pJob, SHashObj *list, ETaskNotifyType type, SSchTask *pCurrTask) {
  int32_t code = TSDB_CODE_SUCCESS;

  if (NULL != pCurrTask) {
    SCH_ERR_RET(schNotifyTaskOnExecNode(pJob, pCurrTask, type));
  }

  void *pIter = taosHashIterate(list, NULL);
  while (pIter) {
    SSchTask *pTask = *(SSchTask **)pIter;
    if (pTask != pCurrTask) {
      SCH_LOCK_TASK(pTask);
      code = schNotifyTaskOnExecNode(pJob, pTask, type);
      SCH_UNLOCK_TASK(pTask);

      if (TSDB_CODE_SUCCESS != code) {
        // Leaving the iteration early: release the iterator before breaking out.
        taosHashCancelIterate(list, pIter);
        break;
      }
    }

    pIter = taosHashIterate(list, pIter);
  }

  SCH_RET(code);
}
1446

1447
/*
 * Send a fetch request for the job's fetch task to the job's result node.
 * The pTask argument is not used; the target is always pJob->fetchTask.
 */
int32_t schExecRemoteFetch(SSchJob *pJob, SSchTask *pTask) {
  SSchTask *pFetchTask = pJob->fetchTask;
  SCH_RET(schBuildAndSendMsg(pJob, pFetchTask, &pJob->resNode, SCH_FETCH_TYPE(pFetchTask), NULL));
}
1450

1451
/*
 * Fetch results of a locally executed task through the in-process query worker.
 *
 * For explain jobs an array collects the explain responses and is handed to
 * schHandleExplainRes, which frees it. The fetched response is then processed
 * by schProcessFetchRsp.
 */
int32_t schExecLocalFetch(SSchJob *pJob, SSchTask *pTask) {
  int32_t code = 0;
  SArray *explainRes = NULL;
  void   *pFetchRsp = NULL;

  if (SCH_IS_EXPLAIN_JOB(pJob)) {
    explainRes = taosArrayInit(pJob->taskNum, sizeof(SExplainLocalRsp));
    if (NULL == explainRes) {
      SCH_ERR_RET(terrno);
    }
  }

  SCH_ERR_JRET(qWorkerProcessLocalFetch(schMgmt.queryMgmt, pJob->seriesId, pJob->queryId, pTask->clientId, pTask->taskId,
                                        pJob->refId, pTask->execId, &pFetchRsp, explainRes));

  if (SCH_IS_EXPLAIN_JOB(pJob)) {
    // schHandleExplainRes always destroys the array, so it must not be freed again below.
    SCH_ERR_RET(schHandleExplainRes(explainRes));
    explainRes = NULL;
  }

  SCH_ERR_RET(schProcessFetchRsp(pJob, pTask, pFetchRsp, TSDB_CODE_SUCCESS));

_return:

  taosArrayDestroy(explainRes);

  SCH_RET(code);
}
1479

1480
// Note: no more error processing, handled in function internal
1481
int32_t schLaunchFetchTask(SSchJob *pJob) {
280,550,488✔
1482
  int32_t code = 0;
280,550,488✔
1483

1484
  void *fetchRes = atomic_load_ptr(&pJob->fetchRes);
280,550,488✔
1485
  if (fetchRes) {
280,551,084✔
1486
    SCH_JOB_DLOG("res already fetched, res:%p", fetchRes);
2,020✔
1487
    return TSDB_CODE_SUCCESS;
2,020✔
1488
  }
1489

1490
  SCH_SET_TASK_STATUS(pJob->fetchTask, JOB_TASK_STATUS_FETCH);
280,549,064✔
1491

1492
  if (SCH_IS_LOCAL_EXEC_TASK(pJob, pJob->fetchTask)) {
280,549,262✔
1493
    SCH_ERR_JRET(schExecLocalFetch(pJob, pJob->fetchTask));
4,849✔
1494
  } else {
1495
    SCH_ERR_JRET(schExecRemoteFetch(pJob, pJob->fetchTask));
280,544,413✔
1496
  }
1497

1498
  return TSDB_CODE_SUCCESS;
280,547,175✔
1499

1500
_return:
2,020✔
1501

1502
  SCH_RET(schProcessOnTaskFailure(pJob, pJob->fetchTask, code));
2,020✔
1503
}
1504

1505
/*
 * Switch the task's current node address to the next ep of its epset and log
 * the resulting epset together with the planned launch delay.
 * Failing to render the epset as text is logged and otherwise ignored.
 */
int32_t schUpdateCurrentEpset(SSchTask *pTask, SSchJob *pJob) {
  SQueryNodeAddr *pAddr = NULL;
  SCH_ERR_RET(schGetTaskCurrentNodeAddr(pTask, pJob, &pAddr));

  // switch to the next ep in the epset
  SCH_SWITCH_EPSET(pAddr);

  char    epsetStr[256] = {0};
  int32_t strCode = epsetToStr(&pAddr->epSet, epsetStr, tListLen(epsetStr));
  if (0 != strCode) {  // print error and continue
    SCH_TASK_ELOG("failed to print vgId:%d epset, code:%s", pAddr->nodeId, tstrerror(strCode));
  }

  // Wait for a while since the vnode leader/follower switch may take from several seconds
  // to several minutes to complete.
  SCH_TASK_DLOG("vgId:%d switch to next ep:%s to start task delay:%.2fs, startTs:%" PRId64, pAddr->nodeId, epsetStr,
                pTask->delayExecMs / 1000.0, pTask->redirectCtx.startTs);

  return TSDB_CODE_SUCCESS;
}
1528

1529
/*
 * Decide whether a failed task may still be retried.
 *
 * The task may not be retried when it has reached maxRetryTimes, or when one
 * more execution would reach maxExecTimes. In either case pJob->noMoreRetry is
 * set and rspCode is returned. Otherwise TSDB_CODE_SUCCESS is returned, meaning
 * a retry is allowed. The elapsed time since redirectCtx.startTs is only used
 * for logging.
 */
int32_t schFailedTaskNeedRetry(SSchTask *pTask, SSchJob *pJob, int32_t rspCode) {
  double elapsedSec = 0.0;
  if (pTask->redirectCtx.startTs > 0) {
    elapsedSec = (taosGetTimestampMs() - pTask->redirectCtx.startTs) / 1000.0;
  }

  // check if the failed task may cause the query job to quit retrying
  if (pTask->retryTimes >= pTask->maxRetryTimes) {
    pJob->noMoreRetry = true;
    SCH_TASK_DLOG("task no more retry since reach max retry times, retryTimes:%d/%d, elapsedTime:%.2fs",
                  pTask->retryTimes, pTask->maxRetryTimes, elapsedSec);
    return rspCode;
  }

  if ((pTask->execId + 1) >= pTask->maxExecTimes) {
    pJob->noMoreRetry = true;
    SCH_TASK_DLOG("task no more retry since reach max exec times, execId:%d/%d, elapsedTime:%.2fs", pTask->execId,
                  pTask->maxExecTimes, elapsedSec);
    return rspCode;
  }

  return TSDB_CODE_SUCCESS;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc