• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4917

07 Jan 2026 03:52PM UTC coverage: 65.42% (+0.02%) from 65.402%
#4917

push

travis-ci

web-flow
merge: from main to 3.0 branch #34204

31 of 34 new or added lines in 2 files covered. (91.18%)

819 existing lines in 129 files now uncovered.

202679 of 309814 relevant lines covered (65.42%)

116724351.99 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.81
/source/libs/scheduler/src/schTask.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "query.h"
17
#include "qworker.h"
18
#include "schInt.h"
19
#include "taoserror.h"
20
#include "tglobal.h"
21
#include "tmsg.h"
22
#include "trpc.h"
23
#include "tmisce.h"
24

25
static int32_t schUpdateCurrentEpset(SSchTask *pTask, SSchJob* pJob);
26

27
/*
 * Release every resource owned by a scheduler task.
 *
 * Deregisters the task heartbeat and destroys the candidate address list, the
 * message buffer, the children/parents arrays, the execId -> node map and the
 * execution-time profile array.
 *
 * Every released pointer is reset to NULL. The task struct itself outlives
 * this call, so a repeated free or a later NULL check (for example in
 * schSetTaskCandidateAddrs) then sees NULL instead of a dangling pointer.
 */
void schFreeTask(SSchJob *pJob, SSchTask *pTask) {
  schDeregisterTaskHb(pJob, pTask);

  if (pTask->candidateAddrs) {
    taosArrayDestroy(pTask->candidateAddrs);
    pTask->candidateAddrs = NULL;
  }

  taosMemoryFreeClear(pTask->msg);

  if (pTask->children) {
    taosArrayDestroy(pTask->children);
    pTask->children = NULL;
  }

  if (pTask->parents) {
    taosArrayDestroy(pTask->parents);
    pTask->parents = NULL;
  }

  if (pTask->execNodes) {
    taosHashCleanup(pTask->execNodes);
    pTask->execNodes = NULL;
  }

  taosArrayDestroy(pTask->profile.execTime);
  pTask->profile.execTime = NULL;
}
50

51
/*
 * Initialise the retry limits of a task.
 *
 * The redirect delay is fixed at 2000 ms.
 *
 * Setting maxRetryTimes:
 *  - Only a non data-bind task of a query job under the SCH_ALL policy sizes
 *    it by the number of nodes in pJob->nodeList.
 *  - Every other task gets ceil(tsMaxRetryWaitTime / redirectDelayMs).
 *  - Either way the value is never below SCH_DEFAULT_MAX_RETRY_NUM * 3.
 *
 * maxExecTimes is maxRetryTimes scaled by (pLevel->level + 1).
 */
void schInitTaskRetryInfo(SSchJob *pJob, SSchTask *pTask, SSchLevel *pLevel) {
  pTask->redirectCtx.redirectDelayMs = 2000;  // 2s by default

  // tsdb replicas are at most 3, so the retry floor is scaled by 3 to give
  // every replica a chance.
  const int32_t replicaFactor = 3;
  const int32_t minRetryTimes = SCH_DEFAULT_MAX_RETRY_NUM * replicaFactor;

  bool sizeByNodeList = !(SCH_IS_DATA_BIND_TASK(pTask)) && (SCH_IS_QUERY_JOB(pJob)) &&
                        (SCH_ALL == schMgmt.cfg.schPolicy);

  int32_t wantedRetry = 0;
  if (sizeByNodeList) {
    wantedRetry = taosArrayGetSize(pJob->nodeList);
  } else {
    wantedRetry = ceil((tsMaxRetryWaitTime * 1.0) / pTask->redirectCtx.redirectDelayMs);
  }

  pTask->maxRetryTimes = TMAX(wantedRetry, minRetryTimes);
  pTask->maxExecTimes = pTask->maxRetryTimes * (pLevel->level + 1);
}
67

68
/*
 * Initialise a freshly created task for pPlan at pLevel.
 *
 * Sets the following:
 *  - identity (clientId, taskId, seriesId) and the initial execution markers
 *    (execId -1, failedExecId -2, failedSeriesId 0);
 *  - the default timeout and the retry limits;
 *  - the execId -> node map and the execution-time profile array, both sized
 *    by maxExecTimes.
 *
 * On allocation failure both containers are released and reset to NULL, so
 * the task is never left holding dangling pointers that a later free would
 * release a second time.
 */
int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel) {
  int32_t code = 0;

  pTask->plan = pPlan;
  pTask->level = pLevel;
  pTask->seriesId = pJob->seriesId;
  pTask->execId = -1;
  pTask->failedExecId = -2;
  pTask->failedSeriesId = 0;
  pTask->timeoutUsec = SCH_DEFAULT_TASK_TIMEOUT_USEC;
  pTask->clientId = getClientId();
  pTask->taskId = schGenTaskId();

  schInitTaskRetryInfo(pJob, pTask, pLevel);

  pTask->execNodes =
      taosHashInit(pTask->maxExecTimes, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
  pTask->profile.execTime = taosArrayInit(pTask->maxExecTimes, sizeof(int64_t));
  if (NULL == pTask->execNodes || NULL == pTask->profile.execTime) {
    // terrno is expected to carry the allocation error. Fall back to OOM so a
    // zero terrno cannot make SCH_ERR_JRET fall through and report success
    // for a half-initialised task.
    code = (TSDB_CODE_SUCCESS != terrno) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    SCH_ERR_JRET(code);
  }

  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT);

  SCH_TASK_DLOG("task initialized, max retry(exec):%d(%d), max retry duration:%.2fs", pTask->maxRetryTimes,
                pTask->maxExecTimes, (pTask->redirectCtx.redirectDelayMs * pTask->maxRetryTimes) / 1000.0);

  return TSDB_CODE_SUCCESS;

_return:

  taosArrayDestroy(pTask->profile.execTime);
  pTask->profile.execTime = NULL;
  taosHashCleanup(pTask->execNodes);
  pTask->execNodes = NULL;

  SCH_RET(code);
}
104

105
/*
 * Store the task's current node address in pTask->succeedAddr after a
 * successful execution.
 *
 * Locally executed tasks are skipped. An error from
 * schGetTaskCurrentNodeAddr is returned to the caller. Failing to format the
 * epset for the debug log is only reported; it does not fail the call.
 */
int32_t schRecordTaskSucceedNode(SSchJob *pJob, SSchTask *pTask) {
  if (SCH_IS_LOCAL_EXEC_TASK(pJob, pTask)) {
    return TSDB_CODE_SUCCESS;
  }

  SQueryNodeAddr *pCurAddr = NULL;
  SCH_ERR_RET(schGetTaskCurrentNodeAddr(pTask, pJob, &pCurAddr));

  pTask->succeedAddr = *pCurAddr;

  char    epText[256] = {0};
  int32_t fmtCode = epsetToStr(&pCurAddr->epSet, epText, tListLen(epText));
  if (TSDB_CODE_SUCCESS != fmtCode) {
    SCH_TASK_ELOG("failed to print epset due to convert to string failed, code:%s, ignore and continue",
                  tstrerror(fmtCode));
  } else {
    SCH_TASK_DLOG("recode the success addr:%s", epText);
  }

  return TSDB_CODE_SUCCESS;
}
129

130
/*
 * Record in pTask->execNodes, keyed by execId, the node address and the
 * task's current RPC handle for this execution attempt.
 *
 * Returns TSDB_CODE_OUT_OF_MEMORY if the hash insert fails.
 */
int32_t schAppendTaskExecNode(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, int32_t execId) {
  SSchNodeInfo info = {.addr = *addr, .handle = SCH_GET_TASK_HANDLE(pTask)};

  int32_t putRes = taosHashPut(pTask->execNodes, &execId, sizeof(execId), &info, sizeof(info));
  if (0 != putRes) {
    SCH_TASK_ELOG("taosHashPut nodeInfo to execNodes failed, errno:%d", ERRNO);
    SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  SCH_TASK_DLOG("task execNode added, execId:%d, handle:%p", execId, info.handle);
  return TSDB_CODE_SUCCESS;
}
142

143
/*
 * Remove the execNodes entry for execId.
 *
 * Returns TSDB_CODE_SCH_IGNORE_ERROR in either of these cases:
 *  - the entry was already gone;
 *  - execId is not the task's current execution, or the task is waiting for
 *    a retry.
 *
 * A task without an execNodes map is treated as success. `handle` is not used.
 */
int32_t schDropTaskExecNode(SSchJob *pJob, SSchTask *pTask, void *handle, int32_t execId) {
  if (pTask->execNodes == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  bool removed = (0 == taosHashRemove(pTask->execNodes, &execId, sizeof(execId)));
  if (!removed) {
    SCH_TASK_DLOG("execId %d already not in execNodeList", execId);
    SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR);
  }
  SCH_TASK_DLOG("execId %d removed from execNodeList", execId);

  bool isLiveExec = (execId == pTask->execId) && !pTask->waitRetry;
  if (!isLiveExec) {  // ignore it
    SCH_TASK_DLOG("execId %d is already not current execId %d, waitRetry %d", execId, pTask->execId, pTask->waitRetry);
    SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR);
  }

  return TSDB_CODE_SUCCESS;
}
162

163
/*
 * Attach `handle` to the execNodes entry recorded for execId.
 *
 * This always returns success. When the map is empty, or holds no entry for
 * execId, the handle is simply not stored.
 */
int32_t schUpdateTaskExecNode(SSchJob *pJob, SSchTask *pTask, void *handle, int32_t execId) {
  bool hasExecNodes = taosHashGetSize(pTask->execNodes) > 0;
  if (!hasExecNodes) {
    return TSDB_CODE_SUCCESS;
  }

  SSchNodeInfo *pInfo = taosHashGet(pTask->execNodes, &execId, sizeof(execId));
  if (pInfo != NULL) {
    pInfo->handle = handle;
    SCH_TASK_DLOG("handle updated to %p for execId %d", handle, execId);
  } else {  // ignore it
    SCH_TASK_DLOG("handle not updated since execId %d already not exist, current execId %d, waitRetry %d", execId,
                  pTask->execId, pTask->waitRetry);
  }

  return TSDB_CODE_SUCCESS;
}
181

182
/*
 * Update the RPC handle of a task execution.
 *
 * Behaviour:
 *  - With dropExecNode set, only the execNodes entry for execId is removed.
 *  - Otherwise the entry's handle is updated.
 *  - The task's own handle is replaced only when (seriesId, execId) is the
 *    current, not-yet-failed execution and the task is not waiting for a
 *    retry; stale callers get TSDB_CODE_SCH_IGNORE_ERROR.
 */
int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, bool dropExecNode, void *handle, uint64_t seriesId,
                            int32_t execId) {
  if (dropExecNode) {
    SCH_RET(schDropTaskExecNode(pJob, pTask, handle, execId));
  }

  SCH_ERR_RET(schUpdateTaskExecNode(pJob, pTask, handle, execId));

  bool staleSeries = (seriesId != pTask->seriesId) || (seriesId <= pTask->failedSeriesId);
  bool staleExec = (execId != pTask->execId) || (execId <= pTask->failedExecId);
  if (staleSeries || staleExec || pTask->waitRetry) {  // ignore it
    SCH_TASK_DLOG("handle not updated since seriesId:0x%" PRIx64 " or execId:%d is not lastest,"
                  "current seriesId:0x%" PRIx64 " execId %d, failedSeriesId:0x%" PRIx64 " failedExecId:%d, waitRetry %d",
                  seriesId, execId, pTask->seriesId, pTask->execId, pTask->failedSeriesId, pTask->failedExecId,
                  pTask->waitRetry);
    SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR);
  }

  SCH_SET_TASK_HANDLE(pTask, handle);
  return TSDB_CODE_SUCCESS;
}
201

202
/*
 * Handle the failure of a task execution with error errCode.
 *
 * Returns:
 *  - TSDB_CODE_SCH_IGNORE_ERROR if errCode is itself the ignore code, or if
 *    the job is already stopping;
 *  - TSDB_CODE_SCH_STATUS_ERROR if the task had already finished (FAIL/SUCC);
 *  - TSDB_CODE_SUCCESS once a retry has been scheduled;
 *  - for jobs that wait on a whole level (SCH_JOB_NEED_WAIT), either
 *    TSDB_CODE_SCH_IGNORE_ERROR while other tasks of the level are still
 *    outstanding, or the job's accumulated errCode once the level is done;
 *  - errCode otherwise.
 */
int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode) {
  bool    needRetry = false;
  int32_t taskDone = 0;
  int8_t  jobStatus = 0;

  if (TSDB_CODE_SCH_IGNORE_ERROR == errCode) {
    return TSDB_CODE_SCH_IGNORE_ERROR;
  }

  // Remember which execution failed. schUpdateTaskHandle ignores handles whose
  // execId is not newer than failedExecId.
  pTask->failedExecId = pTask->execId;
  if (schJobNeedToStop(pJob, &jobStatus)) {
    SCH_TASK_DLOG("no more task failure processing cause of job status %s", jobTaskStatusStr(jobStatus));
    SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR);
  }

  int8_t taskStatus = SCH_GET_TASK_STATUS(pTask);
  if (taskStatus == JOB_TASK_STATUS_FAIL || taskStatus == JOB_TASK_STATUS_SUCC) {
    SCH_TASK_ELOG("task already done, status:%s", jobTaskStatusStr(taskStatus));
    SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
  }

  if (errCode == TSDB_CODE_SCH_TIMEOUT_ERROR) {
    SCH_LOG_TASK_WAIT_TS(pTask);
  } else {
    SCH_LOG_TASK_END_TS(pTask);
  }

  SCH_TASK_DLOG("taskOnFailure, code:%s", tstrerror(errCode));

  SCH_ERR_RET(schTaskCheckSetRetry(pJob, pTask, errCode, &needRetry));

  if (!needRetry) {
    SCH_TASK_ELOG("task failed and no more retry, code:%s", tstrerror(errCode));
    SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_FAIL);

    if (SCH_JOB_NEED_WAIT(pJob)) {
      // Count this failure towards the level; only the last finisher reports.
      SCH_LOCK(SCH_WRITE, &pTask->level->lock);
      pTask->level->taskFailed++;
      taskDone = pTask->level->taskSucceed + pTask->level->taskFailed;
      SCH_UNLOCK(SCH_WRITE, &pTask->level->lock);

      schUpdateJobErrCode(pJob, errCode);

      if (taskDone < pTask->level->taskNum) {
        SCH_TASK_DLOG("need to wait other tasks, doneNum:%d, allNum:%d", taskDone, pTask->level->taskNum);
        SCH_RET(TSDB_CODE_SCH_IGNORE_ERROR);
      }

      SCH_RET(atomic_load_32(&pJob->errCode));
    }
  } else {
    SCH_ERR_RET(schHandleTaskRetry(pJob, pTask));
    return TSDB_CODE_SUCCESS;
  }

  SCH_RET(errCode);
}
260

261
/*
 * Called when a sub job has finished successfully.
 *
 * Bumps the parent's done counter, then:
 *  - launches the next not-yet-started sub job of the parent, if any;
 *  - otherwise launches the parent job itself once
 *    SCH_SUB_JOBS_EXEC_FINISHED reports all sub jobs done.
 */
int32_t schProcessOnSubJobSuccess(SSchJob *pJob) {
  SSchJob *pParent = pJob->parent;
  int64_t  finishedNum = atomic_add_fetch_32(&pParent->subJobDoneNum, 1);

  bool hasPendingSubJob = pParent->subJobExecIdx < pParent->subJobs->size;
  if (hasPendingSubJob) {
    SSchJob *pNextSubJob = taosArrayGetP(pParent->subJobs, pParent->subJobExecIdx);
    pParent->subJobExecIdx++;
    SCH_RET(schLaunchJobImpl(pNextSubJob));
  }

  if (SCH_SUB_JOBS_EXEC_FINISHED(pParent, finishedNum)) {
    return schLaunchJobImpl(pParent);
  }

  return TSDB_CODE_SUCCESS;
}
275

276
/*
 * Handle the successful completion of a task execution.
 *
 * The task is marked PART_SUCC, its success node is recorded, and tasks queued
 * behind it in the flow-control list are launched. Then:
 *  - Task without parents:
 *    - If the job waits on the whole level (SCH_JOB_NEED_WAIT), the job is
 *      failed or switched to PART_SUCC once every task of the level is done.
 *    - Otherwise the task becomes the job's fetch task, and the job (or, for a
 *      sub job, its parent) is advanced.
 *  - Task with parents: the task's result location is registered in each
 *    parent's plan, and every parent whose children are now all ready is
 *    launched. If this task was the last of its level to finish
 *    (taskExecDoneNum), the lower level is launched.
 */
int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
  int32_t code = 0;

  SCH_TASK_TLOG("taskOnSuccess, status:%s", SCH_GET_TASK_STATUS_STR(pTask));

  SCH_LOG_TASK_END_TS(pTask);

  int32_t taskDone = atomic_add_fetch_32(&pTask->level->taskExecDoneNum, 1);

  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_PART_SUCC);

  SCH_ERR_RET(schRecordTaskSucceedNode(pJob, pTask));
  SCH_ERR_RET(schLaunchTasksInFlowCtrlList(pJob, pTask));

  int32_t parentNum = pTask->parents ? (int32_t)taosArrayGetSize(pTask->parents) : 0;
  if (parentNum == 0) {
    if (SCH_JOB_NEED_WAIT(pJob)) {
      int32_t levelDone = 0;

      SCH_LOCK(SCH_WRITE, &pTask->level->lock);
      pTask->level->taskSucceed++;
      levelDone = pTask->level->taskSucceed + pTask->level->taskFailed;
      SCH_UNLOCK(SCH_WRITE, &pTask->level->lock);

      if (levelDone < pTask->level->taskNum) {
        SCH_TASK_DLOG("wait all tasks, done:%d, all:%d", levelDone, pTask->level->taskNum);
        return TSDB_CODE_SUCCESS;
      }

      SCH_TASK_TLOG("taskDone number reach level task number, done:%d, total:%d", levelDone, pTask->level->taskNum);

      if (pTask->level->taskFailed > 0) {
        // errCode is updated concurrently by failing tasks; read it atomically
        // like schProcessOnTaskFailure does.
        SCH_RET(schHandleJobFailure(pJob, atomic_load_32(&pJob->errCode)));
      }

      SCH_RET(schSwitchJobStatus(pJob, JOB_TASK_STATUS_PART_SUCC, NULL));
    }

    pJob->resNode = pTask->succeedAddr;
    pJob->fetchTask = pTask;

    if (SCH_IS_PARENT_JOB(pJob)) {
      SCH_RET(schSwitchJobStatus(pJob, JOB_TASK_STATUS_PART_SUCC, NULL));
    } else {
      SCH_RET(schProcessOnSubJobSuccess(pJob));
    }
  }

  for (int32_t i = 0; i < parentNum; ++i) {
    // taosArrayGetP returns NULL on a bad index instead of letting us
    // dereference a NULL slot pointer before the check below.
    SSchTask *pParent = taosArrayGetP(pTask->parents, i);
    if (NULL == pParent) {
      SCH_TASK_ELOG("fail to get task %d pParent, parentNum:%d", i, parentNum);
      SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
    }

    SCH_LOCK(SCH_WRITE, &pParent->planLock);
    SDownstreamSourceNode source = {
        .type = QUERY_NODE_DOWNSTREAM_SOURCE,
        .clientId = pTask->clientId,
        .taskId = pTask->taskId,
        .sId = pTask->seriesId,
        .execId = pTask->execId,
        .addr = pTask->succeedAddr,
        .fetchMsgType = SCH_FETCH_TYPE(pTask),
        .localExec = SCH_IS_LOCAL_EXEC_TASK(pJob, pTask),
    };

    code = qSetSubplanExecutionNode(pParent->plan, pTask->plan->id.groupId, &source);
    if (TSDB_CODE_SUCCESS != code) {
      SCH_TASK_ELOG("qSetSubplanExecutionNode failed, groupId: %d", pTask->plan->id.groupId);
    }

    SCH_UNLOCK(SCH_WRITE, &pParent->planLock);
    SCH_ERR_RET(code);

    int32_t readyNum = atomic_add_fetch_32(&pParent->childReady, 1);

    if (SCH_TASK_READY_FOR_LAUNCH(readyNum, pParent)) {
      // this is a redirect process for parent task, since the original pParent task has already failed before.
      // TODO refactor optimize: update the candidate address
      // set the address from the pTask->succeedAddr, the vnode that successfully executed subquery already
      if (pParent->redirectCtx.inRedirect && (!SCH_IS_DATA_BIND_TASK(pParent))) {
        // The switch result used to be stored in `code` and silently dropped;
        // propagate it like schHandleTaskRetry does.
        SCH_ERR_RET(schSwitchTaskCandidateAddr(pJob, pParent));

        // if all vnodes are tried, let's switch the epset for each vnode for the next round
        if (pParent->retryTimes > taosArrayGetSize(pParent->candidateAddrs)) {
          SCH_ERR_RET(schUpdateCurrentEpset(pParent, pJob));
        }
      }

      SCH_TASK_DLOG("all %d children task done, start to launch parent task, TID:0x%" PRIx64, readyNum, pParent->taskId);

      pParent->seriesId = pJob->seriesId;
      // The task log macros report on `pTask`; swap so the line is attributed to the parent.
      TSWAP(pTask, pParent);
      SCH_TASK_TLOG("task seriesId set to 0x%" PRIx64, pTask->seriesId);
      TSWAP(pTask, pParent);

      SCH_ERR_RET(schDelayLaunchTask(pJob, pParent));
    }
  }

  if (taskDone == pTask->level->taskNum) {
    SCH_ERR_RET(schLaunchJobLowerLevel(pJob, pTask));
  }

  return TSDB_CODE_SUCCESS;
}
383

384
/*
 * Reschedule a timed-out, still-executing task onto another candidate node.
 *
 * This is a no-op when any of the following holds:
 *  - rescheduling is disabled (schMgmt.cfg.enableReSchedule);
 *  - the task is data-bind;
 *  - the task is the job's fetch task;
 *  - the task has at most one candidate address.
 *
 * Otherwise the current execution is dropped, the exec-node map is cleared
 * and the task goes through the normal failure path with
 * TSDB_CODE_SCH_TIMEOUT_ERROR.
 */
int32_t schRescheduleTask(SSchJob *pJob, SSchTask *pTask) {
  if (!schMgmt.cfg.enableReSchedule) {
    return TSDB_CODE_SUCCESS;
  }

  if (SCH_IS_DATA_BIND_TASK(pTask)) {
    return TSDB_CODE_SUCCESS;
  }

  // Read the status through SCH_GET_TASK_STATUS, as the rest of this file
  // does, instead of touching pTask->status directly.
  if (SCH_TASK_TIMEOUT(pTask) && JOB_TASK_STATUS_EXEC == SCH_GET_TASK_STATUS(pTask) && pJob->fetchTask != pTask &&
      taosArrayGetSize(pTask->candidateAddrs) > 1) {
    SCH_TASK_DLOG("task execId %d will be rescheduled now", pTask->execId);
    schDropTaskOnExecNode(pJob, pTask);
    taosHashClear(pTask->execNodes);

    SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, TSDB_CODE_SCH_TIMEOUT_ERROR));
  }

  return TSDB_CODE_SUCCESS;
}
404

405
/*
 * Put the task into redirect mode and maintain its redirect clock.
 *
 * With resetRetry, the clock restarts now and retryTimes is reset to 0.
 * Without it, the clock is started only if it was not running (startTs <= 0);
 * otherwise the elapsed seconds are computed for the debug log.
 *
 * This always returns success. `pJob` is not used.
 */
int32_t schChkUpdateRedirectCtx(SSchJob *pJob, SSchTask *pTask, bool resetRetry) {
  SSchRedirectCtx *pRedirect = &pTask->redirectCtx;

  pRedirect->inRedirect = true;

  if (!resetRetry) {
    double elapsedSec = 0;
    if (pRedirect->startTs > 0) {
      elapsedSec = (taosGetTimestampMs() - pRedirect->startTs) / 1000.0;
    } else {
      pRedirect->startTs = taosGetTimestampMs();
    }

    SCH_TASK_DLOG("update failed task redirectCtx, current %d/%d redirect retry, delayExec:%.2fs, startTs:%" PRId64
                  " ,elapsed:%.2fs",
                  pTask->retryTimes, pTask->maxRetryTimes, pTask->delayExecMs / 1000.0, pRedirect->startTs,
                  elapsedSec);
    return TSDB_CODE_SUCCESS;
  }

  // Succeeded tasks always restart both the clock and the retry counter.
  pRedirect->startTs = taosGetTimestampMs();
  pTask->retryTimes = 0;

  SCH_TASK_DLOG("reset succ task retryInfo, start %d/%d redirect retry, delayExec:%.2fs, startTs:%" PRId64,
                pTask->retryTimes, pTask->maxRetryTimes, pTask->delayExecMs / 1000.0, pRedirect->startTs);
  return TSDB_CODE_SUCCESS;
}
431

432
/*
 * Detach a task from its current execution so that it can be retried.
 *
 * Steps:
 *  - flag the task as waiting for a retry and stop any pending delay timer;
 *  - drop the execution on its node and forget all recorded exec nodes;
 *  - remove the task from the job's exec list (errors ignored) and
 *    deregister its heartbeat;
 *  - free the message and clear the per-attempt fields: message type,
 *    children-ready count and success address.
 */
void schResetTaskForRetry(SSchJob *pJob, SSchTask *pTask) {
  pTask->waitRetry = true;

  if (pTask->delayTimer) {
    UNUSED(taosTmrStop(pTask->delayTimer));
  }

  // Tear down everything tied to the previous execution attempt.
  schDropTaskOnExecNode(pJob, pTask);
  taosHashClear(pTask->execNodes);
  (void)schRemoveTaskFromExecList(pJob, pTask);  // ignore error
  schDeregisterTaskHb(pJob, pTask);
  taosMemoryFreeClear(pTask->msg);

  // Per-attempt bookkeeping starts from scratch.
  pTask->childReady = 0;
  pTask->lastMsgType = 0;
  pTask->msgLen = 0;
  TAOS_MEMSET(&pTask->succeedAddr, 0, sizeof(pTask->succeedAddr));
}
450

451
/*
 * Disabled legacy redirect / task-set retry code (schDoTaskRedirect,
 * schResetTaskSetLevelInfo, schHandleTaskSetRetry), compiled out by #if 0 and
 * kept for reference only.
 *
 * NOTE(review): this code is out of sync with the live API in this file.
 * schDoTaskRedirect calls schChkUpdateRedirectCtx with four arguments
 * (pJob, pTask, epset, rspCode), while the active definition takes
 * (pJob, pTask, bool resetRetry). It also calls schUpdateTaskCandidateAddr,
 * which is itself compiled out. Update these calls before re-enabling.
 */
#if 0
452

453
// Redirect a task after error rspCode: reset it for retry, then either re-target
// a data-bind task (new epset / same node / next ep) or, for a merge task,
// switch candidate and recursively redirect all children.
int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32_t rspCode) {
454
  int32_t code = 0;
455

456
  SCH_TASK_DLOG("task will be redirected now, status:%s, code:%s", SCH_GET_TASK_STATUS_STR(pTask), tstrerror(rspCode));
457

458
  if (!NO_RET_REDIRECT_ERROR(rspCode)) {
459
    SCH_UPDATE_REDIRECT_CODE(pJob, rspCode);
460
  }
461

462
  SCH_ERR_JRET(schChkUpdateRedirectCtx(pJob, pTask, pData ? pData->pEpSet : NULL, rspCode));
463

464
  schResetTaskForRetry(pJob, pTask);
465

466
  if (SCH_IS_DATA_BIND_TASK(pTask)) {
467
    if (pData && pData->pEpSet) {
468
      SCH_ERR_JRET(schUpdateTaskCandidateAddr(pJob, pTask, pData->pEpSet));
469
    } else if (SYNC_SELF_LEADER_REDIRECT_ERROR(rspCode)) {
470
      SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
471
      if (NULL == addr) {
472
        SCH_TASK_ELOG("fail to get the %dth condidateAddr, totalNum:%d", pTask->candidateIdx,
473
                      (int32_t)taosArrayGetSize(pTask->candidateAddrs));
474
        SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
475
      }
476

477
      SEp *pEp = &addr->epSet.eps[addr->epSet.inUse];
478
      SCH_TASK_DLOG("task retry node %d current ep, idx:%d/%d,%s:%d, code:%s", addr->nodeId, addr->epSet.inUse,
479
                    addr->epSet.numOfEps, pEp->fqdn, pEp->port, tstrerror(rspCode));
480
    } else {
481
      SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
482
      if (NULL == addr) {
483
        SCH_TASK_ELOG("fail to get the %dth condidateAddr, totalNum:%d", pTask->candidateIdx,
484
                      (int32_t)taosArrayGetSize(pTask->candidateAddrs));
485
        SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
486
      }
487

488
      SCH_SWITCH_EPSET(addr);
489
      SCH_TASK_DLOG("switch task target node %d epset to %d/%d", addr->nodeId, addr->epSet.inUse, addr->epSet.numOfEps);
490
    }
491

492
    SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT);
493

494
    SCH_ERR_JRET(schDelayLaunchTask(pJob, pTask));
495

496
    return TSDB_CODE_SUCCESS;
497
  }
498

499
  // merge plan
500

501
  pTask->childReady = 0;
502

503
  qClearSubplanExecutionNode(pTask->plan);
504

505
  // Note: current error task and upper level merge task
506
  if ((pData && 0 == pData->len) || NULL == pData) {
507
    SCH_ERR_JRET(schSwitchTaskCandidateAddr(pJob, pTask));
508
  }
509

510
  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT);
511

512
  int32_t childrenNum = taosArrayGetSize(pTask->children);
513
  for (int32_t i = 0; i < childrenNum; ++i) {
514
    SSchTask *pChild = taosArrayGetP(pTask->children, i);
515
    SCH_LOCK_TASK(pChild);
516
    (void)schDoTaskRedirect(pJob, pChild, NULL, rspCode);  // error handled internal
517
    SCH_UNLOCK_TASK(pChild);
518
  }
519

520
  return TSDB_CODE_SUCCESS;
521

522
_return:
523

524
  SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
525
}
526

527
// Roll back the level counters (taskExecDoneNum / taskLaunchedNum) for pTask and its children.
// NOTE(review): pLevel is reassigned to each child's level inside the loop, so the closing
// "end to reset level" log reports the last child's level counters, not pTask's own level.
int32_t schResetTaskSetLevelInfo(SSchJob *pJob, SSchTask *pTask) {
528
  SSchLevel *pLevel = pTask->level;
529

530
  SCH_TASK_DLOG("start to reset level for current task set, execDone:%d, launched:%d",
531
                atomic_load_32(&pLevel->taskExecDoneNum), atomic_load_32(&pLevel->taskLaunchedNum));
532

533
  if (SCH_GET_TASK_STATUS(pTask) >= JOB_TASK_STATUS_PART_SUCC) {
534
    (void)atomic_sub_fetch_32(&pLevel->taskExecDoneNum, 1);
535
  }
536

537
  (void)atomic_sub_fetch_32(&pLevel->taskLaunchedNum, 1);
538

539
  int32_t childrenNum = taosArrayGetSize(pTask->children);
540
  for (int32_t i = 0; i < childrenNum; ++i) {
541
    SSchTask *pChild = taosArrayGetP(pTask->children, i);
542
    if (NULL == pChild) {
543
      SCH_TASK_ELOG("fail to get the %dth child, childrenNum:%d", i, childrenNum);
544
      SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
545
    }
546

547
    SCH_LOCK_TASK(pChild);
548
    pLevel = pChild->level;
549
    (void)atomic_sub_fetch_32(&pLevel->taskExecDoneNum, 1);
550
    (void)atomic_sub_fetch_32(&pLevel->taskLaunchedNum, 1);
551
    SCH_UNLOCK_TASK(pChild);
552
  }
553

554
  SCH_TASK_DLOG("end to reset level for current task set, execDone:%d, launched:%d",
555
                atomic_load_32(&pLevel->taskExecDoneNum), atomic_load_32(&pLevel->taskLaunchedNum));
556

557
  return TSDB_CODE_SUCCESS;
558
}
559

560
// Retry a whole task set: reset level info, bump the job seriesId and redirect pTask.
// Always frees pData->pData and pData->pEpSet.
// NOTE(review): pData is dereferenced unconditionally (pData->pEpSet, pData->pData),
// so callers must pass a non-NULL pData.
int32_t schHandleTaskSetRetry(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32_t rspCode) {
561
  int32_t code = 0;
562

563
  SCH_ERR_JRET(schChkResetJobRetry(pJob, rspCode));
564

565
  if (SYNC_OTHER_LEADER_REDIRECT_ERROR(rspCode)) {
566
    if (NULL == pData->pEpSet) {
567
      SCH_TASK_ELOG("epset updating excepted, error:%s", tstrerror(rspCode));
568
      code = TSDB_CODE_INVALID_MSG;
569
      goto _return;
570
    }
571
  }
572

573
  SCH_TASK_DLOG("start to redirect current task set cause of error: %s", tstrerror(rspCode));
574

575
  SCH_ERR_JRET(schResetTaskSetLevelInfo(pJob, pTask));
576

577
  SCH_RESET_JOB_LEVEL_IDX(pJob);
578
  atomic_add_fetch_64(&pJob->seriesId, 1);
579

580
  code = schDoTaskRedirect(pJob, pTask, pData, rspCode);
581

582
  taosMemoryFreeClear(pData->pData);
583
  taosMemoryFreeClear(pData->pEpSet);
584

585
  SCH_RET(code);
586

587
_return:
588

589
  taosMemoryFreeClear(pData->pData);
590
  taosMemoryFreeClear(pData->pEpSet);
591

592
  SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
593
}
594
#endif
595

596
/*
 * Register the task in the job's execTasks hash, keyed by taskId.
 *
 * A task that is already present is treated as success. Any other insert
 * error is logged and returned.
 */
int32_t schPushTaskToExecList(SSchJob *pJob, SSchTask *pTask) {
  int32_t code = taosHashPut(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
  if (TSDB_CODE_SUCCESS == code) {
    SCH_TASK_TLOG("task added to execTask list, numOfTasks:%d", taosHashGetSize(pJob->execTasks));
    return TSDB_CODE_SUCCESS;
  }

  if (HASH_NODE_EXIST(code)) {
    SCH_TASK_DLOG("task already in execTask list, code:0x%x", code);
    return TSDB_CODE_SUCCESS;
  }

  SCH_TASK_ELOG("taosHashPut task to execTask list failed, code:0x%x", code);
  SCH_ERR_RET(code);
}
612

613
/*
614
int32_t schMoveTaskToSuccList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
615
  if (0 != taosHashRemove(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId))) {
616
    SCH_TASK_WLOG("remove task from execTask list failed, may not exist, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
617
  } else {
618
    SCH_TASK_DLOG("task removed from execTask list, numOfTasks:%d", taosHashGetSize(pJob->execTasks));
619
  }
620

621
  int32_t code = taosHashPut(pJob->succTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
622
  if (0 != code) {
623
    if (HASH_NODE_EXIST(code)) {
624
      *moved = true;
625
      SCH_TASK_ELOG("task already in succTask list, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
626
      SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
627
    }
628

629
    SCH_TASK_ELOG("taosHashPut task to succTask list failed, errno:%d", ERRNO);
630
    SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
631
  }
632

633
  *moved = true;
634

635
  SCH_TASK_DLOG("task moved to succTask list, numOfTasks:%d", taosHashGetSize(pJob->succTasks));
636

637
  return TSDB_CODE_SUCCESS;
638
}
639

640
int32_t schMoveTaskToFailList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
641
  *moved = false;
642

643
  if (0 != taosHashRemove(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId))) {
644
    SCH_TASK_WLOG("remove task from execTask list failed, may not exist, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
645
  }
646

647
  int32_t code = taosHashPut(pJob->failTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
648
  if (0 != code) {
649
    if (HASH_NODE_EXIST(code)) {
650
      *moved = true;
651

652
      SCH_TASK_WLOG("task already in failTask list, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
653
      SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
654
    }
655

656
    SCH_TASK_ELOG("taosHashPut task to failTask list failed, errno:%d", ERRNO);
657
    SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
658
  }
659

660
  *moved = true;
661

662
  SCH_TASK_DLOG("task moved to failTask list, numOfTasks:%d", taosHashGetSize(pJob->failTasks));
663

664
  return TSDB_CODE_SUCCESS;
665
}
666

667
int32_t schMoveTaskToExecList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
668
  if (0 != taosHashRemove(pJob->succTasks, &pTask->taskId, sizeof(pTask->taskId))) {
669
    SCH_TASK_WLOG("remove task from succTask list failed, may not exist, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
670
  }
671

672
  int32_t code = taosHashPut(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
673
  if (0 != code) {
674
    if (HASH_NODE_EXIST(code)) {
675
      *moved = true;
676

677
      SCH_TASK_ELOG("task already in execTask list, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
678
      SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
679
    }
680

681
    SCH_TASK_ELOG("taosHashPut task to execTask list failed, errno:%d", ERRNO);
682
    SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
683
  }
684

685
  *moved = true;
686

687
  SCH_TASK_DLOG("task moved to execTask list, numOfTasks:%d", taosHashGetSize(pJob->execTasks));
688

689
  return TSDB_CODE_SUCCESS;
690
}
691
*/
692

693
/*
 * Decide whether a failed task should be retried, and store the answer in
 * *needRetry.
 *
 * No retry happens when any of these holds:
 *  - the job forbids further retries (pJob->noMoreRetry);
 *  - schFailedTaskNeedRetry rejects the task;
 *  - SCH_TASK_NEED_RETRY rejects errCode for the last message type.
 *
 * On a transport timeout (TSDB_CODE_SCH_TIMEOUT_ERROR), before deciding:
 *  - maxExecTimes and maxRetryTimes are each raised by one;
 *  - timeoutUsec is doubled, capped at SCH_MAX_TASK_TIMEOUT_USEC.
 *
 * This always returns TSDB_CODE_SUCCESS.
 */
int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bool *needRetry) {
  int64_t nowMs = taosGetTimestampMs();
  double  elapsedSec = (0 != pTask->redirectCtx.startTs) ? (nowMs - pTask->redirectCtx.startTs) / 1000.0 : 0.0;

  *needRetry = false;

  if (pJob->noMoreRetry) {
    SCH_TASK_DLOG("task no more retry since job no more retry, retryTimes:%d/%d", pTask->retryTimes,
                  pTask->maxRetryTimes);
    return TSDB_CODE_SUCCESS;
  }

  // handle transport time out issue
  if (TSDB_CODE_SCH_TIMEOUT_ERROR == errCode) {
    ++pTask->maxExecTimes;
    ++pTask->maxRetryTimes;
    if (pTask->timeoutUsec < SCH_MAX_TASK_TIMEOUT_USEC) {
      pTask->timeoutUsec *= 2;
      if (pTask->timeoutUsec > SCH_MAX_TASK_TIMEOUT_USEC) {
        pTask->timeoutUsec = SCH_MAX_TASK_TIMEOUT_USEC;
      }
    }
  }

  if (TSDB_CODE_SUCCESS != schFailedTaskNeedRetry(pTask, pJob, errCode)) {
    return TSDB_CODE_SUCCESS;
  }

  if (!SCH_TASK_NEED_RETRY(pTask->lastMsgType, errCode)) {
    SCH_TASK_DLOG("task no more retry cause of errCode, errCode:0x%x - %s", errCode, tstrerror(errCode));
    return TSDB_CODE_SUCCESS;
  }

  *needRetry = true;
  SCH_TASK_DLOG("task need the %d/%d retry, elapsedTime:%.2fs, errCode:0x%x - %s", pTask->execId + 1,
                pTask->maxRetryTimes, elapsedSec, errCode, tstrerror(errCode));
  return TSDB_CODE_SUCCESS;
}
737

738
/*
 * Schedule another execution attempt for a failed task.
 *
 * Steps, in order:
 *  - undo the level's launched count and stop any pending delay timer;
 *  - remove the task from the exec list and set it back to INIT;
 *  - launch tasks from the flow-control list if the task needs flow control;
 *  - deregister the task heartbeat;
 *  - retarget the task: a non data-bind task switches to another candidate
 *    address, while a data-bind task gets delayExecMs = redirectDelayMs,
 *    starts its redirect clock if not yet running, and refreshes its current
 *    epset;
 *  - delay-launch the task.
 */
int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask) {
  (void)atomic_sub_fetch_32(&pTask->level->taskLaunchedNum, 1);

  if (pTask->delayTimer) {
    UNUSED(taosTmrStop(pTask->delayTimer));
  }

  (void)schRemoveTaskFromExecList(pJob, pTask);  // ignore error
  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT);

  if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) {
    SCH_ERR_RET(schLaunchTasksInFlowCtrlList(pJob, pTask));
  }

  schDeregisterTaskHb(pJob, pTask);

  if (!(SCH_IS_DATA_BIND_TASK(pTask))) {
    SCH_ERR_RET(schSwitchTaskCandidateAddr(pJob, pTask));
  } else {
    SSchRedirectCtx *pRedirect = &pTask->redirectCtx;

    pTask->delayExecMs = pRedirect->redirectDelayMs;
    if (0 == pRedirect->startTs) {
      pRedirect->startTs = taosGetTimestampMs();
    }

    SCH_ERR_RET(schUpdateCurrentEpset(pTask, pJob));
  }

  SCH_ERR_RET(schDelayLaunchTask(pJob, pTask));
  return TSDB_CODE_SUCCESS;
}
768

769
/*
 * Append the address of every node in pJob->nodeList to the task's candidate
 * addresses.
 *
 * Fails with the push error (terrno) if an append fails, or with
 * TSDB_CODE_TSC_NO_EXEC_NODE if no address was added (a missing or empty
 * node list).
 */
int32_t schSetAddrsFromNodeList(SSchJob *pJob, SSchTask *pTask) {
  int32_t nodeNum = (NULL != pJob->nodeList) ? (int32_t)taosArrayGetSize(pJob->nodeList) : 0;
  int32_t addNum = 0;

  for (int32_t idx = 0; idx < nodeNum; ++idx) {
    SQueryNodeLoad *pLoad = taosArrayGet(pJob->nodeList, idx);
    SQueryNodeAddr *pNodeAddr = &pLoad->addr;

    if (NULL == taosArrayPush(pTask->candidateAddrs, pNodeAddr)) {
      SCH_TASK_ELOG("taosArrayPush execNode to candidate addrs failed, addNum:%d, errno:%d", addNum, ERRNO);
      SCH_ERR_RET(terrno);
    }

    SCH_TASK_TLOG("set %dth candidate addr, id %d, inUse:%d/%d, fqdn:%s, port:%d", idx, pNodeAddr->nodeId,
                  pNodeAddr->epSet.inUse, pNodeAddr->epSet.numOfEps, SCH_GET_CUR_EP(pNodeAddr)->fqdn,
                  SCH_GET_CUR_EP(pNodeAddr)->port);

    addNum++;
  }

  if (addNum > 0) {
    return TSDB_CODE_SUCCESS;
  }

  SCH_TASK_ELOG("no available execNode as candidates, nodeNum:%d", nodeNum);
  SCH_ERR_RET(TSDB_CODE_TSC_NO_EXEC_NODE);
}
800

801
/*
 * Build the task's candidate address list. A task that already has one is
 * left unchanged.
 *
 * Cases:
 *  - The plan names an exec node (numOfEps > 0): that node is the only
 *    candidate.
 *  - Otherwise, a data-bind task fails with TSDB_CODE_SCH_INTERNAL_ERROR.
 *  - Any other task takes every node from the job's node list and starts at
 *    a randomly chosen candidate index.
 */
int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) {
  if (pTask->candidateAddrs != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  pTask->candidateAddrs = taosArrayInit(SCHEDULE_DEFAULT_MAX_NODE_NUM, sizeof(SQueryNodeAddr));
  if (pTask->candidateAddrs == NULL) {
    SCH_TASK_ELOG("taosArrayInit %d condidate addrs failed", SCHEDULE_DEFAULT_MAX_NODE_NUM);
    SCH_ERR_RET(terrno);
  }

  if (pTask->plan->execNode.epSet.numOfEps <= 0) {
    if (SCH_IS_DATA_BIND_TASK(pTask)) {
      SCH_TASK_ELOG("no execNode specifed for data src task, numOfEps:%d", pTask->plan->execNode.epSet.numOfEps);
      SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
    }

    SCH_ERR_RET(schSetAddrsFromNodeList(pJob, pTask));

    pTask->candidateIdx = taosRand() % taosArrayGetSize(pTask->candidateAddrs);
    return TSDB_CODE_SUCCESS;
  }

  if (NULL == taosArrayPush(pTask->candidateAddrs, &pTask->plan->execNode)) {
    SCH_TASK_ELOG("taosArrayPush execNode to candidate addrs failed, errno:%d", ERRNO);
    SCH_ERR_RET(terrno);
  }

  SCH_TASK_TLOG("use execNode in plan as candidate addr, numOfEps:%d", pTask->plan->execNode.epSet.numOfEps);
  return TSDB_CODE_SUCCESS;
}
833

834
#if 0
/*
 * Compiled out. Replaces the epset of the task's single candidate address
 * with *pEpSet, logging the old and new epsets. Refuses to act unless there
 * is exactly one candidate.
 */
int32_t schUpdateTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet) {
  int32_t code = TSDB_CODE_SUCCESS;
  char   *oldEpStr = NULL;
  char   *newEpStr = NULL;

  bool hasSingleCandidate = (NULL != pTask->candidateAddrs) && (1 == taosArrayGetSize(pTask->candidateAddrs));
  if (!hasSingleCandidate) {
    SCH_TASK_ELOG("not able to update cndidate addr, addr num %d",
                  (int32_t)(pTask->candidateAddrs ? taosArrayGetSize(pTask->candidateAddrs) : 0));
    SCH_ERR_RET(TSDB_CODE_APP_ERROR);
  }

  SQueryNodeAddr *pAddr = taosArrayGet(pTask->candidateAddrs, 0);
  if (NULL == pAddr) {
    SCH_TASK_ELOG("fail to get task 0th condidataAddr, totalNum:%d", (int32_t)taosArrayGetSize(pTask->candidateAddrs));
    SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
  }

  SCH_ERR_RET(schDumpEpSet(&pAddr->epSet, &oldEpStr));
  SCH_ERR_JRET(schDumpEpSet(pEpSet, &newEpStr));

  SCH_TASK_DLOG("update task target node %d epset from %s to %s", pAddr->nodeId, oldEpStr, newEpStr);

  TAOS_MEMCPY(&pAddr->epSet, pEpSet, sizeof(pAddr->epSet));

_return:
  taosMemoryFree(oldEpStr);
  taosMemoryFree(newEpStr);

  return code;
}
#endif
867

868
/*
 * Move pTask->candidateIdx to another candidate address, then log the node now selected.
 *
 * - With zero or one candidate the index is left unchanged.
 * - Under SCH_RANDOM a different index is drawn at random.
 * - Every other policy (SCH_LOAD_SEQ, SCH_ALL, anything else) advances round-robin.
 *
 * Returns the result of schGetTaskCurrentNodeAddr for the (possibly new) index.
 */
int32_t schSwitchTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask) {
  int32_t candidateNum = taosArrayGetSize(pTask->candidateAddrs);

  if (candidateNum > 1) {
    if (SCH_RANDOM == schMgmt.cfg.schPolicy) {
      int32_t prevIdx = pTask->candidateIdx;
      do {
        pTask->candidateIdx = taosRand() % candidateNum;
      } while (pTask->candidateIdx == prevIdx);
    } else {
      pTask->candidateIdx++;
      if (pTask->candidateIdx >= candidateNum) {
        pTask->candidateIdx = 0;
      }
    }
  }

  SQueryNodeAddr *pAddr = NULL;
  int32_t         code = schGetTaskCurrentNodeAddr(pTask, pJob, &pAddr);
  if (TSDB_CODE_SUCCESS == code) {
    SCH_TASK_DLOG("switch task exec on candidateIdx:%d/%d, vgId:%d", pTask->candidateIdx, candidateNum, pAddr->nodeId);
  }

  return code;
}
901

902
/*
 * Remove the task (keyed by taskId) from pJob->execTasks.
 * A failed removal is only logged as a warning; the function always succeeds.
 */
int32_t schRemoveTaskFromExecList(SSchJob *pJob, SSchTask *pTask) {
  int32_t removeRc = taosHashRemove(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId));

  if (TSDB_CODE_SUCCESS != removeRc) {
    SCH_TASK_WLOG("task already not in execTask list, code:0x%x", removeRc);
  }

  return TSDB_CODE_SUCCESS;
}
910

911
/*
 * Send TDMT_SCH_DROP_TASK to every exec node of the task that has a handle.
 * This is best effort: send failures are ignored and the remaining nodes are
 * still processed. Nodes without a handle are skipped, with a log line.
 */
void schDropTaskOnExecNode(SSchJob *pJob, SSchTask *pTask) {
  if (NULL == pTask->execNodes) {
    SCH_TASK_DLOG("no exec address, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
    return;
  }

  int32_t size = (int32_t)taosHashGetSize(pTask->execNodes);
  if (size <= 0) {
    SCH_TASK_DLOG("task has no execNodes, no need to drop it, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
    return;
  }

  int32_t nodeIdx = 0;
  for (SSchNodeInfo *nodeInfo = taosHashIterate(pTask->execNodes, NULL); NULL != nodeInfo;
       nodeInfo = taosHashIterate(pTask->execNodes, nodeInfo)) {
    if (!nodeInfo->handle) {
      SCH_TASK_DLOG("no need to drop task %dth execNode", nodeIdx);
    } else {
      SCH_SET_TASK_HANDLE(pTask, nodeInfo->handle);

      // The hash key of an exec-node entry is passed as the execId argument.
      void *pExecId = taosHashGetKey(nodeInfo, NULL);
      (void)schBuildAndSendMsg(pJob, pTask, &nodeInfo->addr, TDMT_SCH_DROP_TASK, pExecId);  // ignore error and continue

      SCH_TASK_DLOG("start to drop task's %dth execNode", nodeIdx);
    }

    ++nodeIdx;
  }

  SCH_TASK_DLOG("task has been dropped on %d exec nodes", size);
}
942

943
/*
 * Send a TDMT_SCH_TASK_NOTIFY message carrying `type` to every exec node of
 * the task that has a handle. Nodes without a handle are skipped, with a log line.
 *
 * Returns TSDB_CODE_SUCCESS, or the first send error. On error the hash
 * iteration is cancelled before returning, so that an early exit does not leave
 * the iterator state for the current entry unreleased.
 */
int32_t schNotifyTaskOnExecNode(SSchJob *pJob, SSchTask *pTask, ETaskNotifyType type) {
  int32_t size = (int32_t)taosHashGetSize(pTask->execNodes);
  if (size <= 0) {
    SCH_TASK_DLOG("task no exec address to notify, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
    return TSDB_CODE_SUCCESS;
  }

  int32_t       i = 0;
  SSchNodeInfo *nodeInfo = taosHashIterate(pTask->execNodes, NULL);
  while (nodeInfo) {
    if (nodeInfo->handle) {
      SCH_SET_TASK_HANDLE(pTask, nodeInfo->handle);

      int32_t sendCode = schBuildAndSendMsg(pJob, pTask, &nodeInfo->addr, TDMT_SCH_TASK_NOTIFY, &type);
      if (TSDB_CODE_SUCCESS != sendCode) {
        // Leaving the iteration early: release the iterator before returning.
        taosHashCancelIterate(pTask->execNodes, nodeInfo);
        SCH_ERR_RET(sendCode);
      }

      SCH_TASK_DLOG("start to notify %d to task's %dth execNode", type, i);
    } else {
      SCH_TASK_DLOG("no need to notify %d to task %dth execNode", type, i);
    }

    ++i;
    nodeInfo = taosHashIterate(pTask->execNodes, nodeInfo);
  }

  SCH_TASK_DLOG("task has been notified %d on %d exec nodes", type, size);
  return TSDB_CODE_SUCCESS;
}
968

969
/*
 * Handle the task status list carried in a heartbeat response from node pEpId.
 *
 * For each entry whose job/task can still be resolved (schProcessOnCbBegin):
 * - an execId different from the task's current execId is only logged;
 * - a JOB_TASK_STATUS_INIT status triggers schRescheduleTask;
 * - every other status, including FAIL, is not acted on yet.
 * Each resolved entry is closed with schProcessOnCbEnd. Always returns success.
 */
int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId *pEpId, SArray *pStatusList) {
  SSchJob  *pJob = NULL;
  SSchTask *pTask = NULL;
  int32_t   taskNum = (int32_t)taosArrayGetSize(pStatusList);

  qDebug("%d task status in hb rsp from nodeId:%d, fqdn:%s, port:%d", taskNum, pEpId->nodeId, pEpId->ep.fqdn,
         pEpId->ep.port);

  for (int32_t idx = 0; idx < taskNum; ++idx) {
    STaskStatus *pStatus = taosArrayGet(pStatusList, idx);
    if (NULL == pStatus) {
      qError("fail to get the %dth task status in hb rsp, taskNum:%d", idx, taskNum);
      continue;
    }

    qDebug("QID:0x%" PRIx64 ", CID:0x%" PRIx64 ", TID:0x%" PRIx64 ", EID:%d task status in server: %s", pStatus->queryId,
           pStatus->clientId, pStatus->taskId, pStatus->execId, jobTaskStatusStr(pStatus->status));

    if (schProcessOnCbBegin(&pJob, &pTask, pStatus->queryId, pStatus->refId, pStatus->subJobId, pStatus->taskId)) {
      continue;
    }

    int32_t code = TSDB_CODE_SUCCESS;

    if (pStatus->execId != pTask->execId) {
      // TODO: a mismatched execId is only logged for now.
      SCH_TASK_DLOG("execId %d mis-match current execId %d", pStatus->execId, pTask->execId);
    } else if (JOB_TASK_STATUS_INIT == pStatus->status) {
      code = schRescheduleTask(pJob, pTask);
    }
    // TODO: JOB_TASK_STATUS_FAIL should be recorded and handled; it currently ends with success.

    schProcessOnCbEnd(pJob, pTask, code);
  }

  return TSDB_CODE_SUCCESS;
}
1015

1016
int32_t schHandleExplainRes(SArray *pExplainRes) {
×
1017
  int32_t code = 0;
×
1018
  int32_t resNum = taosArrayGetSize(pExplainRes);
×
1019
  if (resNum <= 0) {
×
1020
    goto _return;
×
1021
  }
1022

1023
  SSchTask *pTask = NULL;
×
1024
  SSchJob  *pJob = NULL;
×
1025

1026
  for (int32_t i = 0; i < resNum; ++i) {
×
1027
    SExplainLocalRsp *localRsp = taosArrayGet(pExplainRes, i);
×
1028
    if (NULL == localRsp) {
×
1029
      qError("fail to get the %dth LOCAL explain rsp msg, total:%d", i, resNum);
×
1030
      continue;
×
1031
    }
1032

1033
    qDebug("QID:0x%" PRIx64 ", CID:0x%" PRIx64 ", TID:0x%" PRIx64 ", begin to handle LOCAL explain rsp msg",
×
1034
           localRsp->qId, localRsp->cId, localRsp->tId);
1035

1036
    pJob = NULL;
×
1037
    (void)schAcquireJob(localRsp->rId, &pJob);
×
1038
    if (NULL == pJob) {
×
1039
      qWarn("QID:0x%" PRIx64 ", CID:0x%" PRIx64 ", TID:0x%" PRIx64 " job no exist, may be dropped, refId:0x%" PRIx64,
×
1040
            localRsp->qId, localRsp->cId, localRsp->tId, localRsp->rId);
1041
      SCH_ERR_JRET(TSDB_CODE_QRY_JOB_NOT_EXIST);
×
1042
    }
1043

1044
    int8_t status = 0;
×
1045
    if (schJobNeedToStop(pJob, &status)) {
×
1046
      SCH_TASK_DLOG("will not do further processing cause of job status %s", jobTaskStatusStr(status));
×
1047
      (void)schReleaseJob(pJob->refId);
×
1048
      SCH_ERR_JRET(TSDB_CODE_SCH_IGNORE_ERROR);
×
1049
    }
1050

1051
    code = schGetTaskInJob(pJob, localRsp->tId, &pTask);
×
1052

1053
    if (TSDB_CODE_SUCCESS == code) {
×
1054
      code = schProcessExplainRsp(pJob, pTask, &localRsp->rsp);
×
1055
    }
1056

1057
    (void)schReleaseJob(pJob->refId);
×
1058

1059
    qDebug("QID:0x%" PRIx64 ", CID:0x%" PRIx64 ", TID:0x%" PRIx64 ", end to handle LOCAL explain rsp msg, code:%x",
×
1060
           localRsp->qId, localRsp->cId, localRsp->tId, code);
1061

1062
    SCH_ERR_JRET(code);
×
1063

1064
    localRsp->rsp.numOfPlans = 0;
×
1065
    localRsp->rsp.subplanInfo = NULL;
×
1066
    pTask = NULL;
×
1067
  }
1068

1069
_return:
×
1070

1071
  for (int32_t i = 0; i < resNum; ++i) {
×
1072
    SExplainLocalRsp *localRsp = taosArrayGet(pExplainRes, i);
×
1073
    if (NULL == localRsp) {
×
1074
      qError("in _return fail to get the %dth LOCAL explain rsp msg, total:%d", i, resNum);
×
1075
      continue;
×
1076
    }
1077

1078
    tFreeSExplainRsp(&localRsp->rsp);
×
1079
  }
1080

1081
  taosArrayDestroy(pExplainRes);
×
1082

1083
  SCH_RET(code);
×
1084
}
1085

1086
/*
 * Launch the task on a remote node.
 *
 * 1. Serialize the subplan into pTask->msg if that has not been done yet.
 *    This is done under planLock. When tsQueryPlannerTrace is on, the textual
 *    plan dump is produced under the same lock, so it reads the same plan state.
 * 2. Resolve the candidate addresses.
 * 3. For query jobs, ensure the heartbeat connection.
 * 4. Send the plan message.
 *
 * A failure to produce the trace-only plan dump is logged and does not abort
 * the launch; only real serialization/launch errors are returned.
 */
int32_t schLaunchRemoteTask(SSchJob *pJob, SSchTask *pTask) {
  SSubplan *plan = pTask->plan;
  int32_t   code = 0;

  if (NULL == pTask->msg) {  // TODO add more detailed reason for failure
    bool    tracePlan = tsQueryPlannerTrace;
    char   *planStr = NULL;
    int32_t planStrLen = 0;
    int32_t traceCode = TSDB_CODE_SUCCESS;

    SCH_LOCK(SCH_WRITE, &pTask->planLock);
    code = qSubPlanToMsg(plan, &pTask->msg, &pTask->msgLen);
    if (TSDB_CODE_SUCCESS == code && tracePlan) {
      traceCode = qSubPlanToString(plan, &planStr, &planStrLen);
    }
    SCH_UNLOCK(SCH_WRITE, &pTask->planLock);

    if (TSDB_CODE_SUCCESS != code) {
      SCH_TASK_ELOG("failed to create physical plan, code:%s, msg:%p, len:%d", tstrerror(code), pTask->msg,
                    pTask->msgLen);
      taosMemoryFree(planStr);
      SCH_ERR_RET(code);
    }

    if (tracePlan) {
      if (TSDB_CODE_SUCCESS == traceCode) {
        SCH_TASK_DLOGL("physical plan len:%d, %s", planStrLen, planStr);
      } else {
        // Trace output is diagnostic only; do not fail the query because of it.
        SCH_TASK_WLOG("failed to dump physical plan for trace, code:%s", tstrerror(traceCode));
      }
      taosMemoryFree(planStr);
    }
  }

  SCH_ERR_RET(schSetTaskCandidateAddrs(pJob, pTask));

  if (SCH_IS_QUERY_JOB(pJob)) {
    SCH_ERR_RET(schEnsureHbConnection(pJob, pTask));
  }

  SCH_RET(schBuildAndSendMsg(pJob, pTask, NULL, plan->msgType, NULL));
}
1116

1117
/*
 * Execute the task in-process through the client-side query worker instead of
 * sending it to a remote node.
 *
 * The client query worker (schMgmt.queryMgmt) is created lazily. If another
 * thread installs one first, the instance created here is destroyed.
 * For EXPLAIN jobs, an explain-response array is passed to the worker and then
 * handed to schHandleExplainRes, which destroys it.
 * On success, schProcessOnTaskSuccess is invoked for the task.
 */
int32_t schLaunchLocalTask(SSchJob *pJob, SSchTask *pTask) {
  if (NULL == schMgmt.queryMgmt) {
    void *pNewMgmt = NULL;
    SCH_ERR_RET(qWorkerInit(NODE_TYPE_CLIENT, CLIENT_HANDLE, &pNewMgmt, NULL));
    if (NULL != atomic_val_compare_exchange_ptr(&schMgmt.queryMgmt, NULL, pNewMgmt)) {
      // Lost the race: another thread already installed a worker, drop ours.
      qWorkerDestroy(&pNewMgmt);
    }
  }

  SQWMsg qwMsg = {
      .msgInfo.taskType = (pJob->attr.type == JOB_TYPE_HQUERY) ? TASK_TYPE_HQUERY : TASK_TYPE_QUERY,
      .msgInfo.explain = SCH_IS_EXPLAIN_JOB(pJob),
      .msgInfo.needFetch = SCH_TASK_NEED_FETCH(pTask),
      .msg = pTask->plan,
      .msgType = pTask->plan->msgType,
      .connInfo.handle = pJob->conn.pTrans,
      .pWorkerCb = pJob->pWorkerCb,
      .subEndPoints = NULL,  // TODO
  };

  SArray *explainRes = NULL;
  if (SCH_IS_EXPLAIN_JOB(pJob)) {
    explainRes = taosArrayInit(pJob->taskNum, sizeof(SExplainLocalRsp));
    if (NULL == explainRes) {
      SCH_ERR_RET(terrno);
    }
  }

  int32_t code = qWorkerProcessLocalQuery(schMgmt.queryMgmt, pJob->seriesId, pJob->queryId, pTask->clientId,
                                          pTask->taskId, pJob->refId, pTask->execId, &qwMsg, explainRes);
  if (TSDB_CODE_SUCCESS != code) {
    taosArrayDestroy(explainRes);
    SCH_RET(code);
  }

  if (SCH_IS_EXPLAIN_JOB(pJob)) {
    // schHandleExplainRes destroys the array on every path.
    SCH_ERR_RET(schHandleExplainRes(explainRes));
    explainRes = NULL;
  }

  SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));

  taosArrayDestroy(explainRes);
  return TSDB_CODE_SUCCESS;
}
1162

1163
int32_t schLaunchTaskImpl(void *param) {
945,215,650✔
1164
  SSchTaskCtx *pCtx = (SSchTaskCtx *)param;
945,215,650✔
1165
  SSchJob     *pJob = NULL, *pParent = NULL;
945,215,650✔
1166

1167
  (void)schAcquireJob(pCtx->jobRid, &pParent);
945,233,455✔
1168
  if (NULL == pParent) {
945,260,268✔
1169
    qDebug("job refId 0x%" PRIx64 " already not exist", pCtx->jobRid);
2,337✔
1170
    taosMemoryFree(param);
2,337✔
1171
    SCH_RET(TSDB_CODE_SCH_JOB_IS_DROPPING);
1,962✔
1172
  }
1173

1174
  if (pCtx->subJobId >= 0) {
945,257,931✔
1175
    pJob = taosArrayGetP(pParent->subJobs, pCtx->subJobId);
113,992,295✔
1176
    if (NULL == pJob) {
113,956,782✔
1177
      qDebug("subJobId %d not found in subJobs, totalSubJobNum:%d", pCtx->subJobId, (int32_t)taosArrayGetSize(pParent->subJobs));
×
1178
      (void)schReleaseJob(pParent->refId);
×
1179

1180
      taosMemoryFree(param);
×
1181
      SCH_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
×
1182
    }
1183
  } else {
1184
    pJob = pParent;
831,247,673✔
1185
  }
1186

1187
  SSchTask *pTask = pCtx->pTask;
945,204,455✔
1188

1189
  if (pCtx->asyncLaunch) {
945,227,399✔
1190
    SCH_LOCK_TASK(pTask);
232,878,484✔
1191
  }
1192

1193
  pTask->execId++;
945,159,895✔
1194
  pTask->retryTimes++;
945,199,121✔
1195
  pTask->waitRetry = false;
945,156,647✔
1196

1197
  int8_t  status = 0;
945,157,590✔
1198
  int32_t code = 0;
945,163,831✔
1199

1200
  if (atomic_load_64(&pTask->seriesId) < atomic_load_64(&pJob->seriesId)) {
945,163,831✔
1201
    SCH_TASK_DLOG("task seriesId:0x%" PRIx64 " is smaller than job seriesId:0x%" PRIx64 ", skip launch",
×
1202
                  pTask->seriesId, pJob->seriesId);
1203
    goto _return;
×
1204
  }
1205

1206
  (void)atomic_add_fetch_32(&pTask->level->taskLaunchedNum, 1);
945,100,010✔
1207

1208
  SCH_TASK_TLOG("start to launch %s task, execId %d, retry %d",
945,250,196✔
1209
                SCH_IS_LOCAL_EXEC_TASK(pJob, pTask) ? "LOCAL" : "REMOTE", pTask->execId, pTask->retryTimes);
1210

1211
  SCH_LOG_TASK_START_TS(pTask);
2,147,483,647✔
1212

1213
  if (schJobNeedToStop(pJob, &status)) {
945,126,297✔
1214
    SCH_TASK_DLOG("no need to launch task cause of job status %s", jobTaskStatusStr(status));
9,113✔
1215
    SCH_ERR_JRET(TSDB_CODE_SCH_IGNORE_ERROR);
9,113✔
1216
  }
1217

1218
  if (SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_EXEC) {
945,247,569✔
1219
    SCH_ERR_JRET(schPushTaskToExecList(pJob, pTask));
942,332,685✔
1220
    SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_EXEC);
942,029,270✔
1221
  }
1222

1223
  if (SCH_IS_LOCAL_EXEC_TASK(pJob, pTask)) {
945,188,564✔
1224
    SCH_ERR_JRET(schLaunchLocalTask(pJob, pTask));
3,300✔
1225
  } else {
1226
    SCH_ERR_JRET(schLaunchRemoteTask(pJob, pTask));
945,189,275✔
1227
  }
1228

1229
#if 0
1230
  if (SCH_IS_QUERY_JOB(pJob)) {
1231
    SCH_ERR_JRET(schEnsureHbConnection(pJob, pTask));
1232
  }
1233
#endif
1234

1235
_return:
945,070,519✔
1236

1237
  if (pJob && pJob->taskNum >= SCH_MIN_AYSNC_EXEC_NUM) {
945,082,627✔
1238
    if (code) {
232,833,271✔
1239
      code = schProcessOnTaskFailure(pJob, pTask, code);
9,113✔
1240
    }
1241
    if (code) {
232,833,271✔
1242
      code = schHandleJobFailure(pJob, code);
9,113✔
1243
    }
1244
  }
1245

1246
  if (pCtx->asyncLaunch) {
945,106,801✔
1247
    SCH_UNLOCK_TASK(pTask);
232,874,139✔
1248
  }
1249

1250
  (void)schReleaseJob(pParent->refId);
945,042,451✔
1251

1252
  taosMemoryFree(param);
945,269,229✔
1253

1254
  SCH_RET(code);
945,148,997✔
1255
}
1256

1257
/*
 * Allocate an SSchTaskCtx for the task and run schLaunchTaskImpl with it.
 * - Jobs with taskNum >= SCH_MIN_AYSNC_EXEC_NUM launch asynchronously via taosAsyncExec.
 * - Smaller jobs launch inline.
 *
 * schLaunchTaskImpl frees the context on every path once it runs. If
 * taosAsyncExec reports failure, the callback is assumed not to have been
 * queued, so the context is freed here instead of leaking.
 * NOTE(review): this relies on taosAsyncExec not enqueuing the job when it
 * returns an error — confirm against its implementation.
 */
int32_t schAsyncLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) {
  SSchTaskCtx *param = taosMemoryCalloc(1, sizeof(SSchTaskCtx));
  if (NULL == param) {
    SCH_ERR_RET(terrno);
  }

  param->jobRid = pJob->refId;
  param->subJobId = pJob->subJobId;
  param->pTask = pTask;

  if (pJob->taskNum >= SCH_MIN_AYSNC_EXEC_NUM) {
    param->asyncLaunch = true;

    int32_t code = taosAsyncExec(schLaunchTaskImpl, param, NULL);
    if (TSDB_CODE_SUCCESS != code) {
      // The callback never ran, so nobody else will free the context.
      taosMemoryFree(param);
      SCH_ERR_RET(code);
    }
  } else {
    SCH_ERR_RET(schLaunchTaskImpl(param));
  }

  return TSDB_CODE_SUCCESS;
}
1276

1277
// Note: no more error processing, handled in function internal
1278
int32_t schLaunchTask(SSchJob *pJob, SSchTask *pTask) {
945,239,042✔
1279
  bool    enough = false;
945,239,042✔
1280
  int32_t code = 0;
945,255,715✔
1281

1282
  SCH_SET_TASK_HANDLE(pTask, NULL);
945,255,715✔
1283

1284
  if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) {
945,252,111✔
1285
    SCH_ERR_JRET(schCheckIncTaskFlowQuota(pJob, pTask, &enough));
×
1286

1287
    if (enough) {
×
1288
      SCH_ERR_JRET(schAsyncLaunchTaskImpl(pJob, pTask));
×
1289
    }
1290
  } else {
1291
    SCH_ERR_JRET(schAsyncLaunchTaskImpl(pJob, pTask));
945,246,374✔
1292
  }
1293

1294
  return TSDB_CODE_SUCCESS;
945,266,305✔
1295

1296
_return:
×
1297
  SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
×
1298
}
1299

1300
void schHandleTimerEvent(void *param, void *tmrId) {
119,908✔
1301
  SSchTimerParam *pTimerParam = (SSchTimerParam *)param;
119,908✔
1302
  SSchTask       *pTask = NULL;
119,908✔
1303
  SSchJob        *pJob = NULL;
119,908✔
1304
  int32_t         code = 0;
119,908✔
1305

1306
  qDebug("delayTimer:%" PRIuPTR " is launched", (uintptr_t)tmrId);
119,908✔
1307

1308
  int64_t  rId = pTimerParam->rId;
119,908✔
1309
  uint64_t queryId = pTimerParam->queryId;
119,908✔
1310
  uint64_t taskId = pTimerParam->taskId;
119,908✔
1311
  int32_t subJobId = pTimerParam->subJobId;
119,908✔
1312

1313
  if (schProcessOnCbBegin(&pJob, &pTask, queryId, rId, subJobId, taskId)) {
119,908✔
1314
    return;
111✔
1315
  }
1316

1317
  if (0 == atomic_load_8(&pTask->delayLaunchPar.exit)) {
119,797✔
1318
    code = schLaunchTask(pJob, pTask);
119,797✔
1319
  } else {
1320
    SCH_TASK_DLOG("task will not be launched since query job exiting, status: %d", pTask->status);
×
1321
  }
1322

1323
  schProcessOnCbEnd(pJob, pTask, code);
119,797✔
1324
}
1325

1326
/*
 * Launch the task now, or after pTask->delayExecMs milliseconds when that is positive.
 *
 * In the delayed case:
 * - the timer parameters are filled in;
 * - the task is pushed to the exec list and marked EXEC;
 * - the delay timer is started (first time) or reset.
 * A failure to start the timer returns TSDB_CODE_OUT_OF_MEMORY. A taosTmrReset
 * result that signals failure is only logged.
 */
int32_t schDelayLaunchTask(SSchJob *pJob, SSchTask *pTask) {
  if (pTask->delayExecMs <= 0) {
    SCH_RET(schLaunchTask(pJob, pTask));
  }

  pTask->delayLaunchPar.rId = pJob->refId;
  pTask->delayLaunchPar.queryId = pJob->queryId;
  pTask->delayLaunchPar.taskId = pTask->taskId;
  pTask->delayLaunchPar.subJobId = pJob->subJobId;

  SCH_ERR_RET(schPushTaskToExecList(pJob, pTask));
  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_EXEC);

  if (NULL == pTask->delayTimer) {
    pTask->delayTimer = taosTmrStart(schHandleTimerEvent, pTask->delayExecMs, (void *)&pTask->delayLaunchPar, schMgmt.timer);
    if (NULL == pTask->delayTimer) {
      SCH_TASK_ELOG("start delay timer failed, handle:%p", schMgmt.timer);
      SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }

    SCH_TASK_DLOG("task delayTimer:%" PRIuPTR " is started to launch task after:%.2fs", (uintptr_t)pTask->delayTimer,
                  pTask->delayExecMs/1000.0);
  } else if (taosTmrReset(schHandleTimerEvent, pTask->delayExecMs, (void *)&pTask->delayLaunchPar, schMgmt.timer,
                          &pTask->delayTimer)) {
    SCH_TASK_ELOG("taosTmrReset delayExec timer failed, handle:%p, tmr:%" PRIuPTR, schMgmt.timer,
                  (uintptr_t)pTask->delayTimer);
  } else {
    SCH_TASK_DLOG("task start in %.2fs later by handler:%p, tmr:%" PRIuPTR, pTask->delayExecMs / 1000.0,
                  schMgmt.timer, (uintptr_t)pTask->delayTimer);
  }

  return TSDB_CODE_SUCCESS;
}
1362

1363
/*
 * Launch every task of one plan level, each possibly after a delay (see schDelayLaunchTask).
 *
 * Steps:
 * - Evaluate whether the job needs flow control for this level.
 * - Stamp each task with the job's current seriesId (and failedSeriesId one below it).
 * - Hand each task to schDelayLaunchTask.
 *
 * Returns the first error encountered; later tasks are not launched. A missing
 * task entry (taskNum and subTasks disagreeing) is reported as an internal error
 * instead of being dereferenced.
 */
int32_t schLaunchLevelTasks(SSchJob *pJob, SSchLevel *level) {
  SCH_ERR_RET(schChkJobNeedFlowCtrl(pJob, level));

  for (int32_t i = 0; i < level->taskNum; ++i) {
    SSchTask *pTask = taosArrayGet(level->subTasks, i);
    if (NULL == pTask) {
      SCH_JOB_ELOG("fail to get the %dth task in level, taskNum:%d", i, level->taskNum);
      SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
    }

    pTask->failedSeriesId = pJob->seriesId - 1;
    pTask->seriesId = pJob->seriesId;

    SCH_TASK_TLOG("task sId is set:%" PRIx64, pTask->seriesId);
    SCH_ERR_RET(schDelayLaunchTask(pJob, pTask));
  }

  return TSDB_CODE_SUCCESS;
}
1377

1378
/*
 * For jobs where SCH_JOB_NEED_DROP holds, drop every task stored in `list`
 * (a hash of SSchTask pointers) from its exec nodes. Steps per task:
 * - stop its delay timer, if it has one;
 * - under the task lock, call schDropTaskOnExecNode.
 */
void schDropTaskInHashList(SSchJob *pJob, SHashObj *list) {
  if (!SCH_JOB_NEED_DROP(pJob)) {
    return;
  }

  for (void *pIter = taosHashIterate(list, NULL); NULL != pIter; pIter = taosHashIterate(list, pIter)) {
    SSchTask *pTask = *(SSchTask **)pIter;

    if (pTask->delayTimer) {
      schStopTaskDelayTimer(pJob, pTask, true);
    }

    SCH_LOCK_TASK(pTask);
    schDropTaskOnExecNode(pJob, pTask);
    SCH_UNLOCK_TASK(pTask);
  }
}
1398

1399
/*
 * Send a `type` notification to pCurrTask's exec nodes first, then to the exec
 * nodes of every other task stored in `list` (a hash of SSchTask pointers).
 * Each of those other tasks is notified under its task lock.
 *
 * Stops at the first failure and returns that error. When stopping early, the
 * hash iteration is cancelled so the iterator state is released.
 */
int32_t schNotifyTaskInHashList(SSchJob *pJob, SHashObj *list, ETaskNotifyType type, SSchTask *pCurrTask) {
  int32_t code = TSDB_CODE_SUCCESS;

  SCH_ERR_RET(schNotifyTaskOnExecNode(pJob, pCurrTask, type));

  void *pIter = taosHashIterate(list, NULL);
  while (pIter) {
    SSchTask *pTask = *(SSchTask **)pIter;
    if (pTask != pCurrTask) {
      SCH_LOCK_TASK(pTask);
      code = schNotifyTaskOnExecNode(pJob, pTask, type);
      SCH_UNLOCK_TASK(pTask);

      if (TSDB_CODE_SUCCESS != code) {
        // Abandoning the iteration early: release the iterator.
        taosHashCancelIterate(list, pIter);
        break;
      }
    }

    pIter = taosHashIterate(list, pIter);
  }

  SCH_RET(code);
}
1422

1423
/*
 * Send a fetch request for the job's fetch task to pJob->resNode.
 * NOTE(review): the pTask argument is not used; pJob->fetchTask is always the target.
 */
int32_t schExecRemoteFetch(SSchJob *pJob, SSchTask *pTask) {
  SSchTask *pFetchTask = pJob->fetchTask;

  SCH_RET(schBuildAndSendMsg(pJob, pFetchTask, &pJob->resNode, SCH_FETCH_TYPE(pFetchTask), NULL));
}
1426

1427
/*
 * Fetch the task's results in-process through the client-side query worker,
 * then pass the response to schProcessFetchRsp.
 * For EXPLAIN jobs, an explain-response array is passed to the worker and then
 * handed to schHandleExplainRes, which destroys it.
 */
int32_t schExecLocalFetch(SSchJob *pJob, SSchTask *pTask) {
  void   *pFetchRsp = NULL;
  SArray *explainRes = NULL;

  if (SCH_IS_EXPLAIN_JOB(pJob)) {
    explainRes = taosArrayInit(pJob->taskNum, sizeof(SExplainLocalRsp));
    if (NULL == explainRes) {
      SCH_ERR_RET(terrno);
    }
  }

  int32_t code = qWorkerProcessLocalFetch(schMgmt.queryMgmt, pJob->seriesId, pJob->queryId, pTask->clientId,
                                          pTask->taskId, pJob->refId, pTask->execId, &pFetchRsp, explainRes);
  if (TSDB_CODE_SUCCESS != code) {
    taosArrayDestroy(explainRes);
    SCH_RET(code);
  }

  if (SCH_IS_EXPLAIN_JOB(pJob)) {
    // schHandleExplainRes destroys the array on every path.
    SCH_ERR_RET(schHandleExplainRes(explainRes));
    explainRes = NULL;
  }

  SCH_ERR_RET(schProcessFetchRsp(pJob, pTask, pFetchRsp, TSDB_CODE_SUCCESS));

  taosArrayDestroy(explainRes);
  return TSDB_CODE_SUCCESS;
}
1455

1456
// Note: no more error processing, handled in function internal
1457
int32_t schLaunchFetchTask(SSchJob *pJob) {
116,410,982✔
1458
  int32_t code = 0;
116,410,982✔
1459

1460
  void *fetchRes = atomic_load_ptr(&pJob->fetchRes);
116,410,982✔
1461
  if (fetchRes) {
116,410,978✔
1462
    SCH_JOB_DLOG("res already fetched, res:%p", fetchRes);
×
1463
    return TSDB_CODE_SUCCESS;
×
1464
  }
1465

1466
  SCH_SET_TASK_STATUS(pJob->fetchTask, JOB_TASK_STATUS_FETCH);
116,410,978✔
1467

1468
  if (SCH_IS_LOCAL_EXEC_TASK(pJob, pJob->fetchTask)) {
116,411,014✔
1469
    SCH_ERR_JRET(schExecLocalFetch(pJob, pJob->fetchTask));
3,300✔
1470
  } else {
1471
    SCH_ERR_JRET(schExecRemoteFetch(pJob, pJob->fetchTask));
116,407,714✔
1472
  }
1473

1474
  return TSDB_CODE_SUCCESS;
116,410,978✔
1475

1476
_return:
×
1477

1478
  SCH_RET(schProcessOnTaskFailure(pJob, pJob->fetchTask, code));
×
1479
}
1480

1481
/*
 * Advance the task's current node address to the next ep in its epset
 * (SCH_SWITCH_EPSET), then log the resulting epset together with the task's
 * delay and redirect start time. Failure to format the epset is only logged.
 */
int32_t schUpdateCurrentEpset(SSchTask *pTask, SSchJob *pJob) {
  SQueryNodeAddr *pAddr = NULL;

  SCH_ERR_RET(schGetTaskCurrentNodeAddr(pTask, pJob, &pAddr));

  // switch to the next ep in the epset
  SCH_SWITCH_EPSET(pAddr);

  char    epStr[256] = {0};
  int32_t fmtRc = epsetToStr(&pAddr->epSet, epStr, tListLen(epStr));
  if (0 != fmtRc) {  // print error and continue
    SCH_TASK_ELOG("failed to print vgId:%d epset, code:%s", pAddr->nodeId, tstrerror(fmtRc));
  }

  // A vnode leader/follower switch may take from several seconds to several
  // minutes, hence the delayed relaunch reported below.
  SCH_TASK_DLOG("vgId:%d switch to next ep:%s to start task delay:%.2fs, startTs:%" PRId64, pAddr->nodeId, epStr,
                pTask->delayExecMs / 1000.0, pTask->redirectCtx.startTs);

  return TSDB_CODE_SUCCESS;
}
1504

1505
/*
 * Decide whether a failed task may still be retried.
 *
 * If the task has reached maxRetryTimes, or its next execId would reach
 * maxExecTimes, it sets pJob->noMoreRetry and returns rspCode. Otherwise it
 * returns TSDB_CODE_SUCCESS, meaning a retry is allowed. The elapsed time since
 * redirectCtx.startTs is only used for logging.
 */
int32_t schFailedTaskNeedRetry(SSchTask *pTask, SSchJob *pJob, int32_t rspCode) {
  double elapsedSec = 0.0;
  if (pTask->redirectCtx.startTs > 0) {
    elapsedSec = (taosGetTimestampMs() - pTask->redirectCtx.startTs) / 1000.0;
  }

  // check if the failed task may cause the query job to quit retrying
  if (pTask->retryTimes >= pTask->maxRetryTimes) {
    pJob->noMoreRetry = true;
    SCH_TASK_DLOG("task no more retry since reach max retry times, retryTimes:%d/%d, elapsedTime:%.2fs",
                  pTask->retryTimes, pTask->maxRetryTimes, elapsedSec);
    return rspCode;
  }

  bool execBudgetExhausted = (pTask->execId + 1) >= pTask->maxExecTimes;
  if (execBudgetExhausted) {
    pJob->noMoreRetry = true;
    SCH_TASK_DLOG("task no more retry since reach max exec times, execId:%d/%d, elapsedTime:%.2fs", pTask->execId,
                  pTask->maxExecTimes, elapsedSec);
    return rspCode;
  }

  return TSDB_CODE_SUCCESS;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc