• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4908

30 Dec 2025 10:52AM UTC coverage: 65.386% (-0.2%) from 65.541%
#4908

push

travis-ci

web-flow
enh: drop multi-stream (#33962)

60 of 106 new or added lines in 4 files covered. (56.6%)

1330 existing lines in 113 files now uncovered.

193461 of 295877 relevant lines covered (65.39%)

115765274.47 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.09
/source/libs/scheduler/src/schTask.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "query.h"
17
#include "qworker.h"
18
#include "schInt.h"
19
#include "taoserror.h"
20
#include "tglobal.h"
21
#include "tmsg.h"
22
#include "trpc.h"
23
#include "tmisce.h"
24

25
static int32_t schUpdateCurrentEpset(SSchTask *pTask, SSchJob* pJob);
26

27
/*
 * Release the resources owned by a task: its heartbeat registration, the
 * candidate address list, the pending request message, the parent/child
 * link arrays, the exec-node hash and the exec-time profile array.
 * The SSchTask object itself is not freed here.
 */
void schFreeTask(SSchJob *pJob, SSchTask *pTask) {
  schDeregisterTaskHb(pJob, pTask);

  if (NULL != pTask->candidateAddrs) {
    taosArrayDestroy(pTask->candidateAddrs);
  }

  taosMemoryFreeClear(pTask->msg);

  // task-graph links
  if (NULL != pTask->children) {
    taosArrayDestroy(pTask->children);
  }
  if (NULL != pTask->parents) {
    taosArrayDestroy(pTask->parents);
  }

  // per-execution bookkeeping
  if (NULL != pTask->execNodes) {
    taosHashCleanup(pTask->execNodes);
  }
  taosArrayDestroy(pTask->profile.execTime);
}
50

51
/*
 * Derive the retry and execution budget of a task.
 *
 * The redirect delay is set to 2000 ms. The retry budget is based on the job's
 * node count when the task is not data-bind, the job is a query job and the
 * scheduling policy is SCH_ALL. Otherwise it is enough retries to span
 * tsMaxRetryWaitTime at that delay. In both cases the budget is never below
 * SCH_DEFAULT_MAX_RETRY_NUM * 3. maxExecTimes is the retry budget scaled by
 * (level + 1).
 */
void schInitTaskRetryInfo(SSchJob *pJob, SSchTask *pTask, SSchLevel *pLevel) {
  // 3 is the maximum replica factor in tsdb; multiplying by it raises the chance a retry reaches a healthy replica
  const int32_t replicaFactor = 3;

  pTask->redirectCtx.redirectDelayMs = 2000;  // 2s by default

  bool    byNodeCount = !SCH_IS_DATA_BIND_TASK(pTask) && SCH_IS_QUERY_JOB(pJob) && (SCH_ALL == schMgmt.cfg.schPolicy);
  int32_t wanted = 0;
  if (byNodeCount) {
    wanted = taosArrayGetSize(pJob->nodeList);
  } else {
    wanted = ceil((tsMaxRetryWaitTime * 1.0) / pTask->redirectCtx.redirectDelayMs);
  }

  pTask->maxRetryTimes = TMAX(wanted, SCH_DEFAULT_MAX_RETRY_NUM * replicaFactor);
  pTask->maxExecTimes = pTask->maxRetryTimes * (pLevel->level + 1);
}
67

68
/*
 * Initialise pTask for subplan pPlan belonging to level pLevel of pJob.
 *
 * Fills in the identity and bookkeeping fields (series id, exec ids, timeout,
 * client id, a freshly generated task id) and derives the retry budget via
 * schInitTaskRetryInfo(). It then allocates the exec-node hash and the
 * exec-time profile array, both sized by maxExecTimes. On success the task is
 * left in JOB_TASK_STATUS_INIT.
 *
 * On allocation failure both containers are released AND reset to NULL, so the
 * task never keeps dangling pointers that a later schFreeTask() (which frees
 * the same fields) could free a second time. A failed allocation is always
 * reported as an error, even if terrno was left at 0.
 */
int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel) {
  int32_t code = 0;

  pTask->plan = pPlan;
  pTask->level = pLevel;
  pTask->seriesId = pJob->seriesId;
  pTask->execId = -1;
  // kept below execId so that no execution counts as failed (execId <= failedExecId) until one actually fails
  pTask->failedExecId = -2;
  pTask->failedSeriesId = 0;
  pTask->timeoutUsec = SCH_DEFAULT_TASK_TIMEOUT_USEC;
  pTask->clientId = getClientId();
  pTask->taskId = schGenTaskId();

  schInitTaskRetryInfo(pJob, pTask, pLevel);

  pTask->execNodes =
      taosHashInit(pTask->maxExecTimes, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
  pTask->profile.execTime = taosArrayInit(pTask->maxExecTimes, sizeof(int64_t));
  if (NULL == pTask->execNodes || NULL == pTask->profile.execTime) {
    // never fall through with a NULL container just because terrno was not set
    code = (TSDB_CODE_SUCCESS != terrno) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    SCH_ERR_JRET(code);
  }

  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT);

  SCH_TASK_DLOG("task initialized, max retry(exec):%d(%d), max retry duration:%.2fs", pTask->maxRetryTimes,
                pTask->maxExecTimes, (pTask->redirectCtx.redirectDelayMs * pTask->maxRetryTimes) / 1000.0);

  return TSDB_CODE_SUCCESS;

_return:

  // release and clear: the task outlives this call and must not keep freed pointers
  taosArrayDestroy(pTask->profile.execTime);
  pTask->profile.execTime = NULL;
  taosHashCleanup(pTask->execNodes);
  pTask->execNodes = NULL;

  SCH_RET(code);
}
104

105
/*
 * Store the node the task's current execution ran on in pTask->succeedAddr.
 *
 * Locally executed tasks are skipped. A failure to render the epset as text is
 * only logged; it never fails the call.
 */
int32_t schRecordTaskSucceedNode(SSchJob *pJob, SSchTask *pTask) {
  if (SCH_IS_LOCAL_EXEC_TASK(pJob, pTask)) {
    return TSDB_CODE_SUCCESS;
  }

  SQueryNodeAddr *pCurAddr = NULL;
  int32_t         code = schGetTaskCurrentNodeAddr(pTask, pJob, &pCurAddr);
  if (TSDB_CODE_SUCCESS != code) {
    SCH_ERR_RET(code);
  }

  pTask->succeedAddr = *pCurAddr;

  char epStr[256] = {0};
  code = epsetToStr(&pCurAddr->epSet, epStr, tListLen(epStr));
  if (TSDB_CODE_SUCCESS != code) {
    SCH_TASK_ELOG("failed to print epset due to convert to string failed, code:%s, ignore and continue",
                  tstrerror(code));
  } else {
    SCH_TASK_DLOG("recode the success addr:%s", epStr);
  }

  return TSDB_CODE_SUCCESS;
}
129

130
/*
 * Record in pTask->execNodes, keyed by execId, the node address used by that
 * execution together with the task's current handle.
 * Any insertion failure is reported as TSDB_CODE_OUT_OF_MEMORY.
 */
int32_t schAppendTaskExecNode(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, int32_t execId) {
  SSchNodeInfo entry = {
      .addr = *addr,
      .handle = SCH_GET_TASK_HANDLE(pTask),
  };

  int32_t putCode = taosHashPut(pTask->execNodes, &execId, sizeof(execId), &entry, sizeof(entry));
  if (0 != putCode) {
    SCH_TASK_ELOG("taosHashPut nodeInfo to execNodes failed, errno:%d", ERRNO);
    SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  SCH_TASK_DLOG("task execNode added, execId:%d, handle:%p", execId, entry.handle);

  return TSDB_CODE_SUCCESS;
}
142

143
/*
 * Remove the execNodes entry for execId.
 *
 * Returns TSDB_CODE_SCH_IGNORE_ERROR when the entry was not present, or when
 * execId is no longer the task's current execution (or the task is waiting to
 * retry). `handle` is currently unused.
 */
int32_t schDropTaskExecNode(SSchJob *pJob, SSchTask *pTask, void *handle, int32_t execId) {
  if (pTask->execNodes == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  bool removed = (0 == taosHashRemove(pTask->execNodes, &execId, sizeof(execId)));
  if (!removed) {
    SCH_TASK_DLOG("execId %d already not in execNodeList", execId);
    SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR);
  }
  SCH_TASK_DLOG("execId %d removed from execNodeList", execId);

  bool stale = (execId != pTask->execId) || pTask->waitRetry;
  if (stale) {
    SCH_TASK_DLOG("execId %d is already not current execId %d, waitRetry %d", execId, pTask->execId, pTask->waitRetry);
    SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR);
  }

  return TSDB_CODE_SUCCESS;
}
162

163
/*
 * Attach `handle` to the execNodes entry of execId, if that entry still exists.
 * This function always returns TSDB_CODE_SUCCESS; a missing entry is only logged.
 */
int32_t schUpdateTaskExecNode(SSchJob *pJob, SSchTask *pTask, void *handle, int32_t execId) {
  if (taosHashGetSize(pTask->execNodes) <= 0) {
    return TSDB_CODE_SUCCESS;
  }

  SSchNodeInfo *pEntry = taosHashGet(pTask->execNodes, &execId, sizeof(execId));
  if (pEntry != NULL) {
    pEntry->handle = handle;
    SCH_TASK_DLOG("handle updated to %p for execId %d", handle, execId);
  } else {
    SCH_TASK_DLOG("handle not updated since execId %d already not exist, current execId %d, waitRetry %d", execId,
                  pTask->execId, pTask->waitRetry);
  }

  return TSDB_CODE_SUCCESS;
}
181

182
/*
 * Update the task's handle for a response belonging to (seriesId, execId).
 *
 * With dropExecNode set, only the execNodes entry is removed and that result
 * is returned. Otherwise the execNodes entry is refreshed, and the task's
 * current handle is replaced only if the response is for the current series
 * and execution, neither of which is marked failed, and the task is not
 * waiting to retry. A stale response yields TSDB_CODE_SCH_IGNORE_ERROR.
 */
int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, bool dropExecNode, void *handle, uint64_t seriesId, int32_t execId) {
  if (dropExecNode) {
    SCH_RET(schDropTaskExecNode(pJob, pTask, handle, execId));
  }

  SCH_ERR_RET(schUpdateTaskExecNode(pJob, pTask, handle, execId));

  bool staleSeries = (seriesId != pTask->seriesId) || (seriesId <= pTask->failedSeriesId);
  bool staleExec = (execId != pTask->execId) || (execId <= pTask->failedExecId);
  if (staleSeries || staleExec || pTask->waitRetry) {
    SCH_TASK_DLOG("handle not updated since seriesId:0x%" PRIx64 " or execId:%d is not lastest,"
                  "current seriesId:0x%" PRIx64 " execId %d, failedSeriesId:0x%" PRIx64 " failedExecId:%d, waitRetry %d", 
                  seriesId, execId, pTask->seriesId, pTask->execId, pTask->failedSeriesId, pTask->failedExecId, pTask->waitRetry);
    SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR);
  }

  SCH_SET_TASK_HANDLE(pTask, handle);

  return TSDB_CODE_SUCCESS;
}
201

202
/*
 * Handle a failure of pTask with errCode.
 *
 * Returns:
 *  - TSDB_CODE_SCH_IGNORE_ERROR when errCode already is that code, when
 *    schJobNeedToStop() reports the job should stop, or when the job waits for
 *    whole levels and other tasks of this level are still outstanding;
 *  - TSDB_CODE_SCH_STATUS_ERROR when the task is already FAIL/SUCC;
 *  - TSDB_CODE_SUCCESS when the task was rescheduled for retry;
 *  - otherwise the job's recorded error code (job waits for whole levels) or
 *    errCode itself, after marking the task FAIL.
 */
int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode) {
  bool   needRetry = false;
  int8_t jobStatus = 0;

  if (TSDB_CODE_SCH_IGNORE_ERROR == errCode) {
    return TSDB_CODE_SCH_IGNORE_ERROR;
  }

  // responses carrying an execId <= failedExecId are discarded by schUpdateTaskHandle()
  pTask->failedExecId = pTask->execId;
  if (schJobNeedToStop(pJob, &jobStatus)) {
    SCH_TASK_DLOG("no more task failure processing cause of job status %s", jobTaskStatusStr(jobStatus));
    SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR);
  }

  int8_t taskStatus = SCH_GET_TASK_STATUS(pTask);
  if (taskStatus == JOB_TASK_STATUS_FAIL || taskStatus == JOB_TASK_STATUS_SUCC) {
    SCH_TASK_ELOG("task already done, status:%s", jobTaskStatusStr(taskStatus));
    SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
  }

  // timeouts record a wait timestamp, every other error an end timestamp
  if (errCode == TSDB_CODE_SCH_TIMEOUT_ERROR) {
    SCH_LOG_TASK_WAIT_TS(pTask);
  } else {
    SCH_LOG_TASK_END_TS(pTask);
  }

  SCH_TASK_DLOG("taskOnFailure, code:%s", tstrerror(errCode));

  SCH_ERR_RET(schTaskCheckSetRetry(pJob, pTask, errCode, &needRetry));

  if (!needRetry) {
    SCH_TASK_ELOG("task failed and no more retry, code:%s", tstrerror(errCode));
    SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_FAIL);

    if (SCH_JOB_NEED_WAIT(pJob)) {
      int32_t taskDone = 0;  // succeeded + failed tasks of this level

      SCH_LOCK(SCH_WRITE, &pTask->level->lock);
      pTask->level->taskFailed++;
      taskDone = pTask->level->taskSucceed + pTask->level->taskFailed;
      SCH_UNLOCK(SCH_WRITE, &pTask->level->lock);

      schUpdateJobErrCode(pJob, errCode);

      if (taskDone < pTask->level->taskNum) {
        SCH_TASK_DLOG("need to wait other tasks, doneNum:%d, allNum:%d", taskDone, pTask->level->taskNum);
        SCH_RET(TSDB_CODE_SCH_IGNORE_ERROR);
      }

      SCH_RET(atomic_load_32(&pJob->errCode));
    }
  } else {
    SCH_ERR_RET(schHandleTaskRetry(pJob, pTask));
    return TSDB_CODE_SUCCESS;
  }

  SCH_RET(errCode);
}
260

261
/*
 * Called when sub-job pJob has succeeded.
 *
 * Bumps the parent's completed-sub-job counter. While sub-jobs remain that
 * have not been launched yet, the next one is launched. Once
 * SCH_SUB_JOBS_EXEC_FINISHED holds for the parent, the parent job itself is
 * launched.
 */
int32_t schProcessOnSubJobSuccess(SSchJob *pJob) {
  SSchJob *pParent = pJob->parent;
  int64_t  doneNum = atomic_add_fetch_32(&pParent->subJobDoneNum, 1);

  bool hasUnlaunched = pParent->subJobExecIdx < pParent->subJobs->size;
  if (hasUnlaunched) {
    void *pNext = taosArrayGetP(pParent->subJobs, pParent->subJobExecIdx++);
    SCH_RET(schLaunchJobImpl(pNext));
  }

  if (SCH_SUB_JOBS_EXEC_FINISHED(pParent, doneNum)) {
    return schLaunchJobImpl(pParent);
  }

  return TSDB_CODE_SUCCESS;
}
275

276
/*
 * Handle successful completion of pTask.
 *
 * First, the task is marked PART_SUCC, the node it succeeded on is recorded,
 * and tasks waiting in the flow-control list are launched. Then:
 *  - Task without parents:
 *    - If the job waits for whole levels, the level's success counter is
 *      bumped. Once every task of the level is done, the job either fails (some
 *      task of the level failed) or switches to PART_SUCC.
 *    - Otherwise the task becomes the job's fetch task, and the job (or, for a
 *      sub-job, its parent) is advanced.
 *  - Task with parents:
 *    - The task is registered as a downstream source in each parent's plan.
 *    - Every parent whose children are all ready is scheduled for launch.
 *    - Once all tasks of this level have finished executing, the lower level
 *      is launched.
 */
int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
  int32_t code = 0;

  SCH_TASK_TLOG("taskOnSuccess, status:%s", SCH_GET_TASK_STATUS_STR(pTask));

  SCH_LOG_TASK_END_TS(pTask);

  // tasks of this level that have finished executing, this one included
  int32_t taskDone = atomic_add_fetch_32(&pTask->level->taskExecDoneNum, 1);

  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_PART_SUCC);

  SCH_ERR_RET(schRecordTaskSucceedNode(pJob, pTask));
  SCH_ERR_RET(schLaunchTasksInFlowCtrlList(pJob, pTask));

  int32_t parentNum = pTask->parents ? (int32_t)taosArrayGetSize(pTask->parents) : 0;
  if (parentNum == 0) {
    if (SCH_JOB_NEED_WAIT(pJob)) {
      int32_t levelDone = 0;  // succeeded + failed tasks of this level

      SCH_LOCK(SCH_WRITE, &pTask->level->lock);
      pTask->level->taskSucceed++;
      levelDone = pTask->level->taskSucceed + pTask->level->taskFailed;
      SCH_UNLOCK(SCH_WRITE, &pTask->level->lock);

      if (levelDone < pTask->level->taskNum) {
        SCH_TASK_DLOG("wait all tasks, done:%d, all:%d", levelDone, pTask->level->taskNum);
        return TSDB_CODE_SUCCESS;
      }

      SCH_TASK_TLOG("taskDone number reach level task number, done:%d, total:%d", levelDone, pTask->level->taskNum);

      if (pTask->level->taskFailed > 0) {
        SCH_RET(schHandleJobFailure(pJob, pJob->errCode));
      }

      SCH_RET(schSwitchJobStatus(pJob, JOB_TASK_STATUS_PART_SUCC, NULL));
    }

    pJob->resNode = pTask->succeedAddr;
    pJob->fetchTask = pTask;

    if (SCH_IS_PARENT_JOB(pJob)) {
      SCH_RET(schSwitchJobStatus(pJob, JOB_TASK_STATUS_PART_SUCC, NULL));
    } else {
      SCH_RET(schProcessOnSubJobSuccess(pJob));
    }
  }

  for (int32_t i = 0; i < parentNum; ++i) {
    // taosArrayGetP() yields NULL for a missing slot, so this check is effective;
    // dereferencing taosArrayGet()'s result first would crash before reaching it.
    SSchTask *pParent = taosArrayGetP(pTask->parents, i);
    if (NULL == pParent) {
      SCH_TASK_ELOG("fail to get task %d pParent, parentNum:%d", i, parentNum);
      SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
    }

    SCH_LOCK(SCH_WRITE, &pParent->planLock);
    SDownstreamSourceNode source = {
        .type = QUERY_NODE_DOWNSTREAM_SOURCE,
        .clientId = pTask->clientId,
        .taskId = pTask->taskId,
        .sId = pTask->seriesId,
        .execId = pTask->execId,
        .addr = pTask->succeedAddr,
        .fetchMsgType = SCH_FETCH_TYPE(pTask),
        .localExec = SCH_IS_LOCAL_EXEC_TASK(pJob, pTask),
    };

    code = qSetSubplanExecutionNode(pParent->plan, pTask->plan->id.groupId, &source);
    if (TSDB_CODE_SUCCESS != code) {
      SCH_TASK_ELOG("qSetSubplanExecutionNode failed, groupId: %d", pTask->plan->id.groupId);
    }

    SCH_UNLOCK(SCH_WRITE, &pParent->planLock);
    SCH_ERR_RET(code);

    int32_t readyNum = atomic_add_fetch_32(&pParent->childReady, 1);

    if (SCH_TASK_READY_FOR_LAUNCH(readyNum, pParent)) {
      // this is a redirect process for parent task, since the original pParent task has already failed before.
      // TODO refactor optimize: update the candidate address
      // set the address from the pTask->succeedAddr, the vnode that successfully executed subquery already
      if (pParent->redirectCtx.inRedirect && (!SCH_IS_DATA_BIND_TASK(pParent))) {
        code = schSwitchTaskCandidateAddr(pJob, pParent);
        if (TSDB_CODE_SUCCESS != code) {
          // best effort, as before: the parent is still launched with its current candidate
          SCH_TASK_ELOG("switch candidate addr for parent task 0x%" PRIx64 " failed, code:%s, ignore and continue",
                        pParent->taskId, tstrerror(code));
        }

        // if all vnodes are tried, let's switch the epset for each vnode for the next round
        if (pParent->retryTimes > taosArrayGetSize(pParent->candidateAddrs)) {
          SCH_ERR_RET(schUpdateCurrentEpset(pParent, pJob));
        }
      }

      SCH_TASK_DLOG("all %d children task done, start to launch parent task, TID:0x%" PRIx64, readyNum, pParent->taskId);

      pParent->seriesId = pJob->seriesId;
      // the SCH_TASK_*LOG macros pick up the local pTask implicitly; swap so this entry is attributed to the parent
      TSWAP(pTask, pParent);
      SCH_TASK_TLOG("task seriesId set to 0x%" PRIx64, pTask->seriesId);
      TSWAP(pTask, pParent);

      SCH_ERR_RET(schDelayLaunchTask(pJob, pParent));
    }
  }

  if (taskDone == pTask->level->taskNum) {
    SCH_ERR_RET(schLaunchJobLowerLevel(pJob, pTask));
  }

  return TSDB_CODE_SUCCESS;
}
383

384
/*
 * Move a timed-out task to another candidate node.
 *
 * Only acts when rescheduling is enabled in the scheduler config, the task is
 * not data-bind, the task has timed out while in EXEC status, it is not the
 * job's fetch task, and it has more than one candidate address. The current
 * execution is dropped, and the task is pushed through the failure path with
 * TSDB_CODE_SCH_TIMEOUT_ERROR.
 */
int32_t schRescheduleTask(SSchJob *pJob, SSchTask *pTask) {
  if (!schMgmt.cfg.enableReSchedule || SCH_IS_DATA_BIND_TASK(pTask)) {
    return TSDB_CODE_SUCCESS;
  }

  bool movable = SCH_TASK_TIMEOUT(pTask) && JOB_TASK_STATUS_EXEC == pTask->status && pJob->fetchTask != pTask &&
                 taosArrayGetSize(pTask->candidateAddrs) > 1;
  if (!movable) {
    return TSDB_CODE_SUCCESS;
  }

  SCH_TASK_DLOG("task execId %d will be rescheduled now", pTask->execId);
  schDropTaskOnExecNode(pJob, pTask);
  taosHashClear(pTask->execNodes);

  SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, TSDB_CODE_SCH_TIMEOUT_ERROR));

  return TSDB_CODE_SUCCESS;
}
404

405
/*
 * Mark the task as being redirected and update its redirect timing.
 *
 * With resetRetry set, the redirect start time is restarted and retryTimes is
 * cleared. Otherwise the start time is set only if it was not set yet (<= 0),
 * and the elapsed time since it is logged. Always returns TSDB_CODE_SUCCESS.
 */
int32_t schChkUpdateRedirectCtx(SSchJob *pJob, SSchTask *pTask, bool resetRetry) {
  SSchRedirectCtx *pRedirect = &pTask->redirectCtx;

  pRedirect->inRedirect = true;

  if (!resetRetry) {
    double elapsedSec = 0;
    if (pRedirect->startTs > 0) {
      elapsedSec = (taosGetTimestampMs() - pRedirect->startTs) / 1000.0;
    } else {
      pRedirect->startTs = taosGetTimestampMs();
    }

    SCH_TASK_DLOG("update failed task redirectCtx, current %d/%d redirect retry, delayExec:%.2fs, startTs:%" PRId64
                  " ,elapsed:%.2fs",
                  pTask->retryTimes, pTask->maxRetryTimes, pTask->delayExecMs / 1000.0, pRedirect->startTs, elapsedSec);
    return TSDB_CODE_SUCCESS;
  }

  // a task that succeeded before starts a fresh redirect round
  pRedirect->startTs = taosGetTimestampMs();
  pTask->retryTimes = 0;

  SCH_TASK_DLOG("reset succ task retryInfo, start %d/%d redirect retry, delayExec:%.2fs, startTs:%" PRId64,
                pTask->retryTimes, pTask->maxRetryTimes, pTask->delayExecMs / 1000.0, pRedirect->startTs);
  return TSDB_CODE_SUCCESS;
}
431

432
/*
 * Tear down the current execution of pTask ahead of a retry.
 *
 * Steps, in order:
 *  1. Flag the task as waiting for retry and stop any pending delay timer.
 *  2. Drop it from its exec node and clear the exec-node map.
 *  3. Remove it from the job's exec list; failure here is ignored.
 *  4. Deregister its heartbeat and free the pending request message.
 *  5. Reset the per-execution fields.
 */
void schResetTaskForRetry(SSchJob *pJob, SSchTask *pTask) {
  pTask->waitRetry = true;

  if (pTask->delayTimer) {
    UNUSED(taosTmrStop(pTask->delayTimer));
  }

  // release what the previous execution left behind
  schDropTaskOnExecNode(pJob, pTask);
  taosHashClear(pTask->execNodes);
  (void)schRemoveTaskFromExecList(pJob, pTask);  // ignore error
  schDeregisterTaskHb(pJob, pTask);
  taosMemoryFreeClear(pTask->msg);

  // per-execution state
  TAOS_MEMSET(&pTask->succeedAddr, 0, sizeof(pTask->succeedAddr));
  pTask->childReady = 0;
  pTask->lastMsgType = 0;
  pTask->msgLen = 0;
}
450

451
#if 0
452

453
int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32_t rspCode) {
454
  int32_t code = 0;
455

456
  SCH_TASK_DLOG("task will be redirected now, status:%s, code:%s", SCH_GET_TASK_STATUS_STR(pTask), tstrerror(rspCode));
457

458
  if (!NO_RET_REDIRECT_ERROR(rspCode)) {
459
    SCH_UPDATE_REDIRECT_CODE(pJob, rspCode);
460
  }
461

462
  SCH_ERR_JRET(schChkUpdateRedirectCtx(pJob, pTask, pData ? pData->pEpSet : NULL, rspCode));
463

464
  schResetTaskForRetry(pJob, pTask);
465

466
  if (SCH_IS_DATA_BIND_TASK(pTask)) {
467
    if (pData && pData->pEpSet) {
468
      SCH_ERR_JRET(schUpdateTaskCandidateAddr(pJob, pTask, pData->pEpSet));
469
    } else if (SYNC_SELF_LEADER_REDIRECT_ERROR(rspCode)) {
470
      SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
471
      if (NULL == addr) {
472
        SCH_TASK_ELOG("fail to get the %dth condidateAddr, totalNum:%d", pTask->candidateIdx,
473
                      (int32_t)taosArrayGetSize(pTask->candidateAddrs));
474
        SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
475
      }
476

477
      SEp *pEp = &addr->epSet.eps[addr->epSet.inUse];
478
      SCH_TASK_DLOG("task retry node %d current ep, idx:%d/%d,%s:%d, code:%s", addr->nodeId, addr->epSet.inUse,
479
                    addr->epSet.numOfEps, pEp->fqdn, pEp->port, tstrerror(rspCode));
480
    } else {
481
      SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
482
      if (NULL == addr) {
483
        SCH_TASK_ELOG("fail to get the %dth condidateAddr, totalNum:%d", pTask->candidateIdx,
484
                      (int32_t)taosArrayGetSize(pTask->candidateAddrs));
485
        SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
486
      }
487

488
      SCH_SWITCH_EPSET(addr);
489
      SCH_TASK_DLOG("switch task target node %d epset to %d/%d", addr->nodeId, addr->epSet.inUse, addr->epSet.numOfEps);
490
    }
491

492
    SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT);
493

494
    SCH_ERR_JRET(schDelayLaunchTask(pJob, pTask));
495

496
    return TSDB_CODE_SUCCESS;
497
  }
498

499
  // merge plan
500

501
  pTask->childReady = 0;
502

503
  qClearSubplanExecutionNode(pTask->plan);
504

505
  // Note: current error task and upper level merge task
506
  if ((pData && 0 == pData->len) || NULL == pData) {
507
    SCH_ERR_JRET(schSwitchTaskCandidateAddr(pJob, pTask));
508
  }
509

510
  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT);
511

512
  int32_t childrenNum = taosArrayGetSize(pTask->children);
513
  for (int32_t i = 0; i < childrenNum; ++i) {
514
    SSchTask *pChild = taosArrayGetP(pTask->children, i);
515
    SCH_LOCK_TASK(pChild);
516
    (void)schDoTaskRedirect(pJob, pChild, NULL, rspCode);  // error handled internal
517
    SCH_UNLOCK_TASK(pChild);
518
  }
519

520
  return TSDB_CODE_SUCCESS;
521

522
_return:
523

524
  SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
525
}
526

527
int32_t schResetTaskSetLevelInfo(SSchJob *pJob, SSchTask *pTask) {
528
  SSchLevel *pLevel = pTask->level;
529

530
  SCH_TASK_DLOG("start to reset level for current task set, execDone:%d, launched:%d",
531
                atomic_load_32(&pLevel->taskExecDoneNum), atomic_load_32(&pLevel->taskLaunchedNum));
532

533
  if (SCH_GET_TASK_STATUS(pTask) >= JOB_TASK_STATUS_PART_SUCC) {
534
    (void)atomic_sub_fetch_32(&pLevel->taskExecDoneNum, 1);
535
  }
536

537
  (void)atomic_sub_fetch_32(&pLevel->taskLaunchedNum, 1);
538

539
  int32_t childrenNum = taosArrayGetSize(pTask->children);
540
  for (int32_t i = 0; i < childrenNum; ++i) {
541
    SSchTask *pChild = taosArrayGetP(pTask->children, i);
542
    if (NULL == pChild) {
543
      SCH_TASK_ELOG("fail to get the %dth child, childrenNum:%d", i, childrenNum);
544
      SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
545
    }
546

547
    SCH_LOCK_TASK(pChild);
548
    pLevel = pChild->level;
549
    (void)atomic_sub_fetch_32(&pLevel->taskExecDoneNum, 1);
550
    (void)atomic_sub_fetch_32(&pLevel->taskLaunchedNum, 1);
551
    SCH_UNLOCK_TASK(pChild);
552
  }
553

554
  SCH_TASK_DLOG("end to reset level for current task set, execDone:%d, launched:%d",
555
                atomic_load_32(&pLevel->taskExecDoneNum), atomic_load_32(&pLevel->taskLaunchedNum));
556

557
  return TSDB_CODE_SUCCESS;
558
}
559

560
int32_t schHandleTaskSetRetry(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32_t rspCode) {
561
  int32_t code = 0;
562

563
  SCH_ERR_JRET(schChkResetJobRetry(pJob, rspCode));
564

565
  if (SYNC_OTHER_LEADER_REDIRECT_ERROR(rspCode)) {
566
    if (NULL == pData->pEpSet) {
567
      SCH_TASK_ELOG("epset updating excepted, error:%s", tstrerror(rspCode));
568
      code = TSDB_CODE_INVALID_MSG;
569
      goto _return;
570
    }
571
  }
572

573
  SCH_TASK_DLOG("start to redirect current task set cause of error: %s", tstrerror(rspCode));
574

575
  SCH_ERR_JRET(schResetTaskSetLevelInfo(pJob, pTask));
576

577
  SCH_RESET_JOB_LEVEL_IDX(pJob);
578
  atomic_add_fetch_64(&pJob->seriesId, 1);
579

580
  code = schDoTaskRedirect(pJob, pTask, pData, rspCode);
581

582
  taosMemoryFreeClear(pData->pData);
583
  taosMemoryFreeClear(pData->pEpSet);
584

585
  SCH_RET(code);
586

587
_return:
588

589
  taosMemoryFreeClear(pData->pData);
590
  taosMemoryFreeClear(pData->pEpSet);
591

592
  SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
593
}
594
#endif
595

596
/*
 * Register pTask in the job's execTasks hash, keyed by task id.
 * A task that is already registered is not an error.
 */
int32_t schPushTaskToExecList(SSchJob *pJob, SSchTask *pTask) {
  int32_t putCode = taosHashPut(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
  if (TSDB_CODE_SUCCESS != putCode) {
    if (HASH_NODE_EXIST(putCode)) {
      SCH_TASK_DLOG("task already in execTask list, code:0x%x", putCode);
      return TSDB_CODE_SUCCESS;
    }

    SCH_TASK_ELOG("taosHashPut task to execTask list failed, code:0x%x", putCode);
    SCH_ERR_RET(putCode);
  }

  SCH_TASK_TLOG("task added to execTask list, numOfTasks:%d", taosHashGetSize(pJob->execTasks));

  return TSDB_CODE_SUCCESS;
}
612

613
/*
614
int32_t schMoveTaskToSuccList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
615
  if (0 != taosHashRemove(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId))) {
616
    SCH_TASK_WLOG("remove task from execTask list failed, may not exist, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
617
  } else {
618
    SCH_TASK_DLOG("task removed from execTask list, numOfTasks:%d", taosHashGetSize(pJob->execTasks));
619
  }
620

621
  int32_t code = taosHashPut(pJob->succTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
622
  if (0 != code) {
623
    if (HASH_NODE_EXIST(code)) {
624
      *moved = true;
625
      SCH_TASK_ELOG("task already in succTask list, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
626
      SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
627
    }
628

629
    SCH_TASK_ELOG("taosHashPut task to succTask list failed, errno:%d", ERRNO);
630
    SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
631
  }
632

633
  *moved = true;
634

635
  SCH_TASK_DLOG("task moved to succTask list, numOfTasks:%d", taosHashGetSize(pJob->succTasks));
636

637
  return TSDB_CODE_SUCCESS;
638
}
639

640
int32_t schMoveTaskToFailList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
641
  *moved = false;
642

643
  if (0 != taosHashRemove(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId))) {
644
    SCH_TASK_WLOG("remove task from execTask list failed, may not exist, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
645
  }
646

647
  int32_t code = taosHashPut(pJob->failTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
648
  if (0 != code) {
649
    if (HASH_NODE_EXIST(code)) {
650
      *moved = true;
651

652
      SCH_TASK_WLOG("task already in failTask list, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
653
      SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
654
    }
655

656
    SCH_TASK_ELOG("taosHashPut task to failTask list failed, errno:%d", ERRNO);
657
    SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
658
  }
659

660
  *moved = true;
661

662
  SCH_TASK_DLOG("task moved to failTask list, numOfTasks:%d", taosHashGetSize(pJob->failTasks));
663

664
  return TSDB_CODE_SUCCESS;
665
}
666

667
int32_t schMoveTaskToExecList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
668
  if (0 != taosHashRemove(pJob->succTasks, &pTask->taskId, sizeof(pTask->taskId))) {
669
    SCH_TASK_WLOG("remove task from succTask list failed, may not exist, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
670
  }
671

672
  int32_t code = taosHashPut(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
673
  if (0 != code) {
674
    if (HASH_NODE_EXIST(code)) {
675
      *moved = true;
676

677
      SCH_TASK_ELOG("task already in execTask list, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
678
      SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
679
    }
680

681
    SCH_TASK_ELOG("taosHashPut task to execTask list failed, errno:%d", ERRNO);
682
    SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
683
  }
684

685
  *moved = true;
686

687
  SCH_TASK_DLOG("task moved to execTask list, numOfTasks:%d", taosHashGetSize(pJob->execTasks));
688

689
  return TSDB_CODE_SUCCESS;
690
}
691
*/
692

693
/*
 * Decide whether the failed task should be retried; the verdict goes to *needRetry.
 *
 * No retry when:
 *  - the job is flagged noMoreRetry;
 *  - schFailedTaskNeedRetry() returns an error;
 *  - SCH_TASK_NEED_RETRY rejects the (lastMsgType, errCode) pair.
 *
 * A transport timeout first grants one extra exec/retry attempt and doubles
 * the task timeout, capped at SCH_MAX_TASK_TIMEOUT_USEC.
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bool *needRetry) {
  int64_t now = taosGetTimestampMs();
  double  el = (0 != pTask->redirectCtx.startTs) ? (now - pTask->redirectCtx.startTs) / 1000.0 : 0.0;

  *needRetry = false;

  if (pJob->noMoreRetry) {
    SCH_TASK_DLOG("task no more retry since job no more retry, retryTimes:%d/%d", pTask->retryTimes,
                  pTask->maxRetryTimes);
    return TSDB_CODE_SUCCESS;
  }

  // handle transport time out issue
  if (TSDB_CODE_SCH_TIMEOUT_ERROR == errCode) {
    ++pTask->maxExecTimes;
    ++pTask->maxRetryTimes;

    if (pTask->timeoutUsec < SCH_MAX_TASK_TIMEOUT_USEC) {
      pTask->timeoutUsec *= 2;
      if (pTask->timeoutUsec > SCH_MAX_TASK_TIMEOUT_USEC) {
        pTask->timeoutUsec = SCH_MAX_TASK_TIMEOUT_USEC;
      }
    }
  }

  if (TSDB_CODE_SUCCESS != schFailedTaskNeedRetry(pTask, pJob, errCode)) {
    return TSDB_CODE_SUCCESS;
  }

  if (!SCH_TASK_NEED_RETRY(pTask->lastMsgType, errCode)) {
    SCH_TASK_DLOG("task no more retry cause of errCode, errCode:0x%x - %s", errCode, tstrerror(errCode));
    return TSDB_CODE_SUCCESS;
  }

  *needRetry = true;

  SCH_TASK_DLOG("task need the %d/%d retry, elapsedTime:%.2fs, errCode:0x%x - %s", pTask->execId + 1,
                pTask->maxRetryTimes, el, errCode, tstrerror(errCode));
  return TSDB_CODE_SUCCESS;
}
737

738
/*
 * Return pTask to the schedulable state and schedule a delayed relaunch.
 *
 * Steps:
 *  1. Undo the launch accounting of the task's level and stop any pending
 *     delay timer.
 *  2. Remove the task from the job's exec list; failure here is ignored.
 *  3. Reset the task to INIT and launch waiting tasks if it is flow
 *     controlled; then deregister its heartbeat.
 *  4. Pick the next target:
 *     - Data-bind tasks use the redirect delay, start the redirect clock if
 *       needed, and call schUpdateCurrentEpset().
 *     - Other tasks switch to the next candidate address.
 */
int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask) {
  (void)atomic_sub_fetch_32(&pTask->level->taskLaunchedNum, 1);

  if (pTask->delayTimer) {
    UNUSED(taosTmrStop(pTask->delayTimer));
  }

  (void)schRemoveTaskFromExecList(pJob, pTask);  // ignore error
  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT);

  if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) {
    SCH_ERR_RET(schLaunchTasksInFlowCtrlList(pJob, pTask));
  }

  schDeregisterTaskHb(pJob, pTask);

  if (!SCH_IS_DATA_BIND_TASK(pTask)) {
    SCH_ERR_RET(schSwitchTaskCandidateAddr(pJob, pTask));
  } else {
    pTask->delayExecMs = pTask->redirectCtx.redirectDelayMs;
    if (0 == pTask->redirectCtx.startTs) {
      pTask->redirectCtx.startTs = taosGetTimestampMs();
    }

    SCH_ERR_RET(schUpdateCurrentEpset(pTask, pJob));
  }

  SCH_ERR_RET(schDelayLaunchTask(pJob, pTask));
  return TSDB_CODE_SUCCESS;
}
768

769
/*
 * Append the address of every node in pJob->nodeList to pTask->candidateAddrs.
 * Fails with TSDB_CODE_TSC_NO_EXEC_NODE when no address was added, including
 * when the job has no node list.
 */
int32_t schSetAddrsFromNodeList(SSchJob *pJob, SSchTask *pTask) {
  int32_t nodeNum = 0;
  int32_t added = 0;

  if (NULL != pJob->nodeList) {
    nodeNum = taosArrayGetSize(pJob->nodeList);
  }

  for (int32_t idx = 0; idx < nodeNum; ++idx) {
    SQueryNodeLoad *pLoad = taosArrayGet(pJob->nodeList, idx);
    SQueryNodeAddr *pNodeAddr = &pLoad->addr;

    if (NULL == taosArrayPush(pTask->candidateAddrs, pNodeAddr)) {
      SCH_TASK_ELOG("taosArrayPush execNode to candidate addrs failed, addNum:%d, errno:%d", added, ERRNO);
      SCH_ERR_RET(terrno);
    }

    SCH_TASK_TLOG("set %dth candidate addr, id %d, inUse:%d/%d, fqdn:%s, port:%d", idx, pNodeAddr->nodeId,
                  pNodeAddr->epSet.inUse, pNodeAddr->epSet.numOfEps, SCH_GET_CUR_EP(pNodeAddr)->fqdn,
                  SCH_GET_CUR_EP(pNodeAddr)->port);

    added++;
  }

  if (added <= 0) {
    SCH_TASK_ELOG("no available execNode as candidates, nodeNum:%d", nodeNum);
    SCH_ERR_RET(TSDB_CODE_TSC_NO_EXEC_NODE);
  }

  return TSDB_CODE_SUCCESS;
}
800

801
/*
 * Populate pTask->candidateAddrs if it has not been populated yet.
 *
 * Candidate sources:
 *  - If the subplan names an exec node (numOfEps > 0), that node is the only
 *    candidate.
 *  - A data-bind task without such a node is an internal error.
 *  - Any other task takes every node of the job's node list and starts from a
 *    random candidate index.
 */
int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) {
  // already populated: keep the existing list
  if (pTask->candidateAddrs != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  pTask->candidateAddrs = taosArrayInit(SCHEDULE_DEFAULT_MAX_NODE_NUM, sizeof(SQueryNodeAddr));
  if (pTask->candidateAddrs == NULL) {
    SCH_TASK_ELOG("taosArrayInit %d condidate addrs failed", SCHEDULE_DEFAULT_MAX_NODE_NUM);
    SCH_ERR_RET(terrno);
  }

  bool planHasNode = pTask->plan->execNode.epSet.numOfEps > 0;
  if (!planHasNode) {
    if (SCH_IS_DATA_BIND_TASK(pTask)) {
      SCH_TASK_ELOG("no execNode specifed for data src task, numOfEps:%d", pTask->plan->execNode.epSet.numOfEps);
      SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
    }

    SCH_ERR_RET(schSetAddrsFromNodeList(pJob, pTask));

    pTask->candidateIdx = taosRand() % taosArrayGetSize(pTask->candidateAddrs);
    return TSDB_CODE_SUCCESS;
  }

  if (NULL == taosArrayPush(pTask->candidateAddrs, &pTask->plan->execNode)) {
    SCH_TASK_ELOG("taosArrayPush execNode to candidate addrs failed, errno:%d", ERRNO);
    SCH_ERR_RET(terrno);
  }

  SCH_TASK_TLOG("use execNode in plan as candidate addr, numOfEps:%d", pTask->plan->execNode.epSet.numOfEps);

  return TSDB_CODE_SUCCESS;
}
833

834
/*
 * Disabled: the function below is compiled out by `#if 0` and is not part of the build.
 * As written, it would overwrite the epset of the task's only candidate address with
 * pEpSet. It fails with TSDB_CODE_APP_ERROR unless exactly one candidate exists, and
 * logs the old and new epsets.
 */
#if 0
int32_t schUpdateTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet) {
  int32_t code = TSDB_CODE_SUCCESS;
  if (NULL == pTask->candidateAddrs || 1 != taosArrayGetSize(pTask->candidateAddrs)) {
    SCH_TASK_ELOG("not able to update cndidate addr, addr num %d",
                  (int32_t)(pTask->candidateAddrs ? taosArrayGetSize(pTask->candidateAddrs) : 0));
    SCH_ERR_RET(TSDB_CODE_APP_ERROR);
  }

  SQueryNodeAddr *pAddr = taosArrayGet(pTask->candidateAddrs, 0);
  if (NULL == pAddr) {
    SCH_TASK_ELOG("fail to get task 0th condidataAddr, totalNum:%d", (int32_t)taosArrayGetSize(pTask->candidateAddrs));
    SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
  }

  char *origEpset = NULL;
  char *newEpset = NULL;

  SCH_ERR_RET(schDumpEpSet(&pAddr->epSet, &origEpset));
  SCH_ERR_JRET(schDumpEpSet(pEpSet, &newEpset));

  SCH_TASK_DLOG("update task target node %d epset from %s to %s", pAddr->nodeId, origEpset, newEpset);

  TAOS_MEMCPY(&pAddr->epSet, pEpSet, sizeof(pAddr->epSet));

_return:

  taosMemoryFree(origEpset);
  taosMemoryFree(newEpset);

  return code;
}
#endif

/*
 * Move pTask to another candidate address according to schMgmt.cfg.schPolicy.
 *
 * With zero or one candidate the index is left as it is. Under SCH_RANDOM a new
 * random index different from the current one is drawn. Every other policy advances
 * to the next candidate and wraps to 0 after the last one.
 * Afterwards the current address is resolved and, on success, logged.
 *
 * Returns the result of schGetTaskCurrentNodeAddr().
 */
int32_t schSwitchTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask) {
  int32_t candidateNum = taosArrayGetSize(pTask->candidateAddrs);

  if (candidateNum > 1) {
    if (SCH_RANDOM == schMgmt.cfg.schPolicy) {
      int32_t prevIdx = pTask->candidateIdx;
      do {
        pTask->candidateIdx = taosRand() % candidateNum;
      } while (pTask->candidateIdx == prevIdx);
    } else {
      // SCH_LOAD_SEQ, SCH_ALL and any unknown policy: round-robin
      ++pTask->candidateIdx;
      if (pTask->candidateIdx >= candidateNum) {
        pTask->candidateIdx = 0;
      }
    }
  }

  SQueryNodeAddr *pAddr = NULL;
  int32_t         code = schGetTaskCurrentNodeAddr(pTask, pJob, &pAddr);
  if (TSDB_CODE_SUCCESS == code) {
    SCH_TASK_DLOG("switch task exec on candidateIdx:%d/%d, vgId:%d", pTask->candidateIdx, candidateNum, pAddr->nodeId);
  }

  return code;
}

/*
 * Remove pTask from pJob->execTasks, using the task id as the key.
 * A task that is not in the list only produces a warning; the call always succeeds.
 */
int32_t schRemoveTaskFromExecList(SSchJob *pJob, SSchTask *pTask) {
  int32_t removeRc = taosHashRemove(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId));

  if (0 != removeRc) {
    SCH_TASK_WLOG("task already not in execTask list, code:0x%x", removeRc);
  }

  return TSDB_CODE_SUCCESS;
}

/*
 * Send TDMT_SCH_DROP_TASK to every exec node of pTask that has a handle.
 *
 * This is best effort: send errors are ignored so that the remaining nodes are still
 * processed. The exec id passed with each message is that node's hash key in
 * pTask->execNodes. Nothing is sent when execNodes is NULL or empty.
 */
void schDropTaskOnExecNode(SSchJob *pJob, SSchTask *pTask) {
  if (NULL == pTask->execNodes) {
    SCH_TASK_DLOG("no exec address, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
    return;
  }

  int32_t nodeNum = (int32_t)taosHashGetSize(pTask->execNodes);
  if (nodeNum <= 0) {
    SCH_TASK_DLOG("task has no execNodes, no need to drop it, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
    return;
  }

  int32_t idx = 0;
  for (SSchNodeInfo *pNode = taosHashIterate(pTask->execNodes, NULL); NULL != pNode;
       pNode = taosHashIterate(pTask->execNodes, pNode), ++idx) {
    if (NULL == pNode->handle) {
      SCH_TASK_DLOG("no need to drop task %dth execNode", idx);
      continue;
    }

    SCH_SET_TASK_HANDLE(pTask, pNode->handle);
    void *pExecId = taosHashGetKey(pNode, NULL);
    (void)schBuildAndSendMsg(pJob, pTask, &pNode->addr, TDMT_SCH_DROP_TASK, pExecId);  // ignore error and continue

    SCH_TASK_DLOG("start to drop task's %dth execNode", idx);
  }

  SCH_TASK_DLOG("task has been dropped on %d exec nodes", nodeNum);
}

/*
 * Send a TDMT_SCH_TASK_NOTIFY message carrying `type` to every exec node of pTask
 * that has a handle.
 *
 * Returns TSDB_CODE_SUCCESS when there is nothing to notify or every send succeeded.
 * The first send error is returned immediately. Before that early return, the
 * taosHashIterate() iteration is cancelled with taosHashCancelIterate(), so the node
 * reference held by the iterator is released instead of leaked.
 */
int32_t schNotifyTaskOnExecNode(SSchJob *pJob, SSchTask *pTask, ETaskNotifyType type) {
  int32_t size = (int32_t)taosHashGetSize(pTask->execNodes);
  if (size <= 0) {
    SCH_TASK_DLOG("task no exec address to notify, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
    return TSDB_CODE_SUCCESS;
  }

  int32_t       i = 0;
  SSchNodeInfo *nodeInfo = taosHashIterate(pTask->execNodes, NULL);
  while (nodeInfo) {
    if (nodeInfo->handle) {
      SCH_SET_TASK_HANDLE(pTask, nodeInfo->handle);

      int32_t code = schBuildAndSendMsg(pJob, pTask, &nodeInfo->addr, TDMT_SCH_TASK_NOTIFY, &type);
      if (TSDB_CODE_SUCCESS != code) {
        // leaving the iteration early: release the iterator's hold on the current node
        taosHashCancelIterate(pTask->execNodes, nodeInfo);
        SCH_ERR_RET(code);
      }

      SCH_TASK_DLOG("start to notify %d to task's %dth execNode", type, i);
    } else {
      SCH_TASK_DLOG("no need to notify %d to task %dth execNode", type, i);
    }

    ++i;
    nodeInfo = taosHashIterate(pTask->execNodes, nodeInfo);
  }

  SCH_TASK_DLOG("task has been notified %d on %d exec nodes", type, size);
  return TSDB_CODE_SUCCESS;
}

/*
 * Handle the task status list carried in a heartbeat response from node pEpId.
 *
 * For each STaskStatus the owning job/task is resolved with schProcessOnCbBegin();
 * entries that cannot be resolved are skipped. The task is rescheduled only when the
 * reported execId matches the task's current execId and the reported status is
 * JOB_TASK_STATUS_INIT. Every resolved entry is closed with schProcessOnCbEnd(),
 * passing the reschedule result (or 0).
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId *pEpId, SArray *pStatusList) {
  SSchJob  *pJob = NULL;
  SSchTask *pTask = NULL;
  int32_t   taskNum = (int32_t)taosArrayGetSize(pStatusList);

  qDebug("%d task status in hb rsp from nodeId:%d, fqdn:%s, port:%d", taskNum, pEpId->nodeId, pEpId->ep.fqdn,
         pEpId->ep.port);

  for (int32_t idx = 0; idx < taskNum; ++idx) {
    STaskStatus *pStatus = taosArrayGet(pStatusList, idx);
    if (NULL == pStatus) {
      qError("fail to get the %dth task status in hb rsp, taskNum:%d", idx, taskNum);
      continue;
    }

    qDebug("QID:0x%" PRIx64 ", CID:0x%" PRIx64 ", TID:0x%" PRIx64 ", EID:%d task status in server: %s", pStatus->queryId,
           pStatus->clientId, pStatus->taskId, pStatus->execId, jobTaskStatusStr(pStatus->status));

    if (schProcessOnCbBegin(&pJob, &pTask, pStatus->queryId, pStatus->refId, pStatus->subJobId, pStatus->taskId)) {
      continue;
    }

    int32_t code = 0;
    if (pStatus->execId != pTask->execId) {
      // TODO: stale status from an older execution; currently only logged
      SCH_TASK_DLOG("execId %d mis-match current execId %d", pStatus->execId, pTask->execId);
    } else if (JOB_TASK_STATUS_INIT == pStatus->status) {
      code = schRescheduleTask(pJob, pTask);
    }
    // JOB_TASK_STATUS_FAIL and all other statuses are not acted on here (error handling still TODO)

    schProcessOnCbEnd(pJob, pTask, code);
  }

  return TSDB_CODE_SUCCESS;
}

/*
 * Dispatch locally produced explain responses (an array of SExplainLocalRsp) to their tasks.
 *
 * For each entry the job is acquired by refId and the task is looked up by id. The
 * response is then passed to schProcessExplainRsp(). After a successful hand-off the
 * entry's numOfPlans/subplanInfo are cleared, so the final cleanup does not free them
 * again. Processing stops at the first error:
 *   - TSDB_CODE_QRY_JOB_NOT_EXIST if the job is gone,
 *   - TSDB_CODE_SCH_IGNORE_ERROR if the job must stop,
 *   - or the lookup/processing error.
 *
 * Ownership: pExplainRes is always consumed. Every remaining response is released with
 * tFreeSExplainRsp() and the array is destroyed on every path.
 */
int32_t schHandleExplainRes(SArray *pExplainRes) {
  int32_t   code = 0;
  SSchJob  *pJob = NULL;
  SSchTask *pTask = NULL;
  int32_t   resNum = taosArrayGetSize(pExplainRes);

  for (int32_t idx = 0; idx < resNum; ++idx) {
    SExplainLocalRsp *pLocal = taosArrayGet(pExplainRes, idx);
    if (NULL == pLocal) {
      qError("fail to get the %dth LOCAL explain rsp msg, total:%d", idx, resNum);
      continue;
    }

    qDebug("QID:0x%" PRIx64 ", CID:0x%" PRIx64 ", TID:0x%" PRIx64 ", begin to handle LOCAL explain rsp msg",
           pLocal->qId, pLocal->cId, pLocal->tId);

    pJob = NULL;
    (void)schAcquireJob(pLocal->rId, &pJob);
    if (NULL == pJob) {
      qWarn("QID:0x%" PRIx64 ", CID:0x%" PRIx64 ", TID:0x%" PRIx64 " job no exist, may be dropped, refId:0x%" PRIx64,
            pLocal->qId, pLocal->cId, pLocal->tId, pLocal->rId);
      SCH_ERR_JRET(TSDB_CODE_QRY_JOB_NOT_EXIST);
    }

    int8_t status = 0;
    if (schJobNeedToStop(pJob, &status)) {
      SCH_TASK_DLOG("will not do further processing cause of job status %s", jobTaskStatusStr(status));
      (void)schReleaseJob(pJob->refId);
      SCH_ERR_JRET(TSDB_CODE_SCH_IGNORE_ERROR);
    }

    code = schGetTaskInJob(pJob, pLocal->tId, &pTask);
    if (TSDB_CODE_SUCCESS == code) {
      code = schProcessExplainRsp(pJob, pTask, &pLocal->rsp);
    }

    (void)schReleaseJob(pJob->refId);

    qDebug("QID:0x%" PRIx64 ", CID:0x%" PRIx64 ", TID:0x%" PRIx64 ", end to handle LOCAL explain rsp msg, code:%x",
           pLocal->qId, pLocal->cId, pLocal->tId, code);

    SCH_ERR_JRET(code);

    // the response was handed off; make sure the cleanup below does not free it again
    pLocal->rsp.numOfPlans = 0;
    pLocal->rsp.subplanInfo = NULL;
    pTask = NULL;
  }

_return:
  for (int32_t idx = 0; idx < resNum; ++idx) {
    SExplainLocalRsp *pLocal = taosArrayGet(pExplainRes, idx);
    if (NULL == pLocal) {
      qError("in _return fail to get the %dth LOCAL explain rsp msg, total:%d", idx, resNum);
      continue;
    }

    tFreeSExplainRsp(&pLocal->rsp);
  }

  taosArrayDestroy(pExplainRes);

  SCH_RET(code);
}

/*
 * Launch pTask on a remote node.
 *
 * If the task has no serialized message yet, the subplan is serialized into
 * pTask->msg/msgLen under the plan write lock. When tsQueryPlannerTrace is set, the
 * textual plan is also logged. The candidate addresses are then resolved. For query
 * jobs a heartbeat connection is ensured. Finally the message of type plan->msgType
 * is built and sent.
 *
 * Returns TSDB_CODE_SUCCESS or the first error encountered.
 */
int32_t schLaunchRemoteTask(SSchJob *pJob, SSchTask *pTask) {
  SSubplan *plan = pTask->plan;

  if (NULL == pTask->msg) {  // TODO add more detailed reason for failure
    SCH_LOCK(SCH_WRITE, &pTask->planLock);
    int32_t code = qSubPlanToMsg(plan, &pTask->msg, &pTask->msgLen);
    SCH_UNLOCK(SCH_WRITE, &pTask->planLock);

    if (TSDB_CODE_SUCCESS != code) {
      SCH_TASK_ELOG("failed to create physical plan, code:%s, msg:%p, len:%d", tstrerror(code), pTask->msg,
                    pTask->msgLen);
      SCH_ERR_RET(code);
    }

    if (tsQueryPlannerTrace) {
      char   *planStr = NULL;
      int32_t planLen = 0;
      SCH_ERR_RET(qSubPlanToString(plan, &planStr, &planLen));
      SCH_TASK_DLOGL("physical plan len:%d, %s", planLen, planStr);
      taosMemoryFree(planStr);
    }
  }

  SCH_ERR_RET(schSetTaskCandidateAddrs(pJob, pTask));

  if (SCH_IS_QUERY_JOB(pJob)) {
    SCH_ERR_RET(schEnsureHbConnection(pJob, pTask));
  }

  SCH_RET(schBuildAndSendMsg(pJob, pTask, NULL, plan->msgType, NULL));
}

/*
 * Execute pTask in-process through the client-side query worker.
 *
 * The shared schMgmt.queryMgmt worker is created lazily. If another thread installed
 * one first (the compare-exchange fails), the freshly created worker is destroyed.
 * For explain jobs, an array collects the local explain responses and is handed to
 * schHandleExplainRes(), which takes ownership. On success the task is marked
 * successful.
 *
 * Returns TSDB_CODE_SUCCESS or the first error encountered.
 */
int32_t schLaunchLocalTask(SSchJob *pJob, SSchTask *pTask) {
  // SCH_ERR_JRET(schSetTaskCandidateAddrs(pJob, pTask));
  if (NULL == schMgmt.queryMgmt) {
    void *pNewMgmt = NULL;
    SCH_ERR_RET(qWorkerInit(NODE_TYPE_CLIENT, CLIENT_HANDLE, &pNewMgmt, NULL));
    if (atomic_val_compare_exchange_ptr(&schMgmt.queryMgmt, NULL, pNewMgmt)) {
      qWorkerDestroy(&pNewMgmt);
    }
  }

  int32_t code = 0;
  SArray *explainRes = NULL;
  SQWMsg  qwMsg = {
       .msgInfo.taskType = (pJob->attr.type == JOB_TYPE_HQUERY) ? TASK_TYPE_HQUERY : TASK_TYPE_QUERY,
       .msgInfo.explain = SCH_IS_EXPLAIN_JOB(pJob),
       .msgInfo.needFetch = SCH_TASK_NEED_FETCH(pTask),
       .msg = pTask->plan,
       .msgType = pTask->plan->msgType,
       .connInfo.handle = pJob->conn.pTrans,
       .pWorkerCb = pJob->pWorkerCb,
       .subEndPoints = NULL,  // TODO
  };

  if (SCH_IS_EXPLAIN_JOB(pJob)) {
    explainRes = taosArrayInit(pJob->taskNum, sizeof(SExplainLocalRsp));
    if (NULL == explainRes) {
      SCH_ERR_RET(terrno);
    }
  }

  SCH_ERR_JRET(qWorkerProcessLocalQuery(schMgmt.queryMgmt, pJob->seriesId, pJob->queryId, pTask->clientId, pTask->taskId,
                                        pJob->refId, pTask->execId, &qwMsg, explainRes));

  if (SCH_IS_EXPLAIN_JOB(pJob)) {
    // schHandleExplainRes() always destroys the array, so drop our reference before checking the result
    code = schHandleExplainRes(explainRes);
    explainRes = NULL;
    SCH_ERR_RET(code);
  }

  SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));

_return:
  taosArrayDestroy(explainRes);

  SCH_RET(code);
}

int32_t schLaunchTaskImpl(void *param) {
852,522,012✔
1164
  SSchTaskCtx *pCtx = (SSchTaskCtx *)param;
852,522,012✔
1165
  SSchJob     *pJob = NULL, *pParent = NULL;
852,522,012✔
1166

1167
  (void)schAcquireJob(pCtx->jobRid, &pParent);
852,539,632✔
1168
  if (NULL == pParent) {
852,587,469✔
1169
    qDebug("job refId 0x%" PRIx64 " already not exist", pCtx->jobRid);
971✔
1170
    taosMemoryFree(param);
971✔
1171
    SCH_RET(TSDB_CODE_SCH_JOB_IS_DROPPING);
439✔
1172
  }
1173

1174
  if (pCtx->subJobId >= 0) {
852,586,498✔
1175
    pJob = taosArrayGetP(pParent->subJobs, pCtx->subJobId);
70,295,832✔
1176
    if (NULL == pJob) {
70,284,921✔
1177
      qDebug("subJobId %d not found in subJobs, totalSubJobNum:%d", pCtx->subJobId, (int32_t)taosArrayGetSize(pParent->subJobs));
×
1178
      (void)schReleaseJob(pParent->refId);
×
1179

1180
      taosMemoryFree(param);
×
1181
      SCH_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
×
1182
    }
1183
  } else {
1184
    pJob = pParent;
782,258,247✔
1185
  }
1186

1187
  SSchTask *pTask = pCtx->pTask;
852,543,168✔
1188

1189
  if (pCtx->asyncLaunch) {
852,564,960✔
1190
    SCH_LOCK_TASK(pTask);
178,005,594✔
1191
  }
1192

1193
  pTask->execId++;
852,497,519✔
1194
  pTask->retryTimes++;
852,461,064✔
1195
  pTask->waitRetry = false;
852,463,276✔
1196

1197
  int8_t  status = 0;
852,448,527✔
1198
  int32_t code = 0;
852,499,744✔
1199

1200
  if (atomic_load_64(&pTask->seriesId) < atomic_load_64(&pJob->seriesId)) {
852,499,744✔
1201
    SCH_TASK_DLOG("task seriesId:0x%" PRIx64 " is smaller than job seriesId:0x%" PRIx64 ", skip launch",
×
1202
                  pTask->seriesId, pJob->seriesId);
1203
    goto _return;
×
1204
  }
1205

1206
  (void)atomic_add_fetch_32(&pTask->level->taskLaunchedNum, 1);
852,456,427✔
1207

1208
  SCH_TASK_TLOG("start to launch %s task, execId %d, retry %d",
852,592,734✔
1209
                SCH_IS_LOCAL_EXEC_TASK(pJob, pTask) ? "LOCAL" : "REMOTE", pTask->execId, pTask->retryTimes);
1210

1211
  SCH_LOG_TASK_START_TS(pTask);
2,147,483,647✔
1212

1213
  if (schJobNeedToStop(pJob, &status)) {
852,487,286✔
1214
    SCH_TASK_DLOG("no need to launch task cause of job status %s", jobTaskStatusStr(status));
1,831✔
1215
    SCH_ERR_JRET(TSDB_CODE_SCH_IGNORE_ERROR);
1,831✔
1216
  }
1217

1218
  if (SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_EXEC) {
852,602,196✔
1219
    SCH_ERR_JRET(schPushTaskToExecList(pJob, pTask));
850,424,685✔
1220
    SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_EXEC);
850,176,644✔
1221
  }
1222

1223
  if (SCH_IS_LOCAL_EXEC_TASK(pJob, pTask)) {
852,543,563✔
1224
    SCH_ERR_JRET(schLaunchLocalTask(pJob, pTask));
2,068✔
1225
  } else {
1226
    SCH_ERR_JRET(schLaunchRemoteTask(pJob, pTask));
852,551,790✔
1227
  }
1228

1229
#if 0
1230
  if (SCH_IS_QUERY_JOB(pJob)) {
1231
    SCH_ERR_JRET(schEnsureHbConnection(pJob, pTask));
1232
  }
1233
#endif
1234

1235
_return:
852,512,640✔
1236

1237
  if (pJob && pJob->taskNum >= SCH_MIN_AYSNC_EXEC_NUM) {
852,518,649✔
1238
    if (code) {
177,990,797✔
1239
      code = schProcessOnTaskFailure(pJob, pTask, code);
1,831✔
1240
    }
1241
    if (code) {
177,990,797✔
1242
      code = schHandleJobFailure(pJob, code);
1,831✔
1243
    }
1244
  }
1245

1246
  if (pCtx->asyncLaunch) {
852,524,591✔
1247
    SCH_UNLOCK_TASK(pTask);
177,997,540✔
1248
  }
1249

1250
  (void)schReleaseJob(pParent->refId);
852,531,713✔
1251

1252
  taosMemoryFree(param);
852,604,411✔
1253

1254
  SCH_RET(code);
852,532,524✔
1255
}
1256

1257
/*
 * Allocate an SSchTaskCtx for pTask and run schLaunchTaskImpl() with it.
 *
 * Jobs with at least SCH_MIN_AYSNC_EXEC_NUM tasks are launched asynchronously through
 * taosAsyncExec(). Smaller jobs are launched inline. schLaunchTaskImpl() frees the
 * context on every path. If taosAsyncExec() fails, schLaunchTaskImpl() never runs, so
 * the context is freed here; the original code leaked it.
 *
 * Returns TSDB_CODE_SUCCESS or an error code.
 */
int32_t schAsyncLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) {
  SSchTaskCtx *param = taosMemoryCalloc(1, sizeof(SSchTaskCtx));
  if (NULL == param) {
    SCH_ERR_RET(terrno);
  }

  param->jobRid = pJob->refId;
  param->subJobId = pJob->subJobId;
  param->pTask = pTask;

  if (pJob->taskNum >= SCH_MIN_AYSNC_EXEC_NUM) {
    param->asyncLaunch = true;

    int32_t code = taosAsyncExec(schLaunchTaskImpl, param, NULL);
    if (TSDB_CODE_SUCCESS != code) {
      // NOTE(review): assumes taosAsyncExec() does not take ownership of param when it fails
      taosMemoryFree(param);
      SCH_ERR_RET(code);
    }
  } else {
    // schLaunchTaskImpl() releases param itself
    SCH_ERR_RET(schLaunchTaskImpl(param));
  }

  return TSDB_CODE_SUCCESS;
}

// Note: no more error processing, handled in function internal
1278
int32_t schLaunchTask(SSchJob *pJob, SSchTask *pTask) {
852,567,712✔
1279
  bool    enough = false;
852,567,712✔
1280
  int32_t code = 0;
852,574,548✔
1281

1282
  SCH_SET_TASK_HANDLE(pTask, NULL);
852,574,548✔
1283

1284
  if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) {
852,576,483✔
1285
    SCH_ERR_JRET(schCheckIncTaskFlowQuota(pJob, pTask, &enough));
×
1286

1287
    if (enough) {
×
1288
      SCH_ERR_JRET(schAsyncLaunchTaskImpl(pJob, pTask));
×
1289
    }
1290
  } else {
1291
    SCH_ERR_JRET(schAsyncLaunchTaskImpl(pJob, pTask));
852,548,166✔
1292
  }
1293

1294
  return TSDB_CODE_SUCCESS;
852,595,283✔
1295

1296
_return:
×
1297
  SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
×
1298
}
1299

1300
void schHandleTimerEvent(void *param, void *tmrId) {
115,892✔
1301
  SSchTimerParam *pTimerParam = (SSchTimerParam *)param;
115,892✔
1302
  SSchTask       *pTask = NULL;
115,892✔
1303
  SSchJob        *pJob = NULL;
115,892✔
1304
  int32_t         code = 0;
115,892✔
1305

1306
  qDebug("delayTimer:%" PRIuPTR " is launched", (uintptr_t)tmrId);
115,892✔
1307

1308
  int64_t  rId = pTimerParam->rId;
115,892✔
1309
  uint64_t queryId = pTimerParam->queryId;
115,892✔
1310
  uint64_t taskId = pTimerParam->taskId;
115,892✔
1311
  int32_t subJobId = pTimerParam->subJobId;
115,892✔
1312

1313
  if (schProcessOnCbBegin(&pJob, &pTask, queryId, rId, subJobId, taskId)) {
115,892✔
UNCOV
1314
    return;
×
1315
  }
1316

1317
  if (0 == atomic_load_8(&pTask->delayLaunchPar.exit)) {
115,892✔
1318
    code = schLaunchTask(pJob, pTask);
115,892✔
1319
  } else {
1320
    SCH_TASK_DLOG("task will not be launched since query job exiting, status: %d", pTask->status);
×
1321
  }
1322

1323
  schProcessOnCbEnd(pJob, pTask, code);
115,892✔
1324
}
1325

1326
/*
 * Launch pTask immediately, or after pTask->delayExecMs milliseconds when that is positive.
 *
 * For a delayed launch, the timer parameters are recorded in pTask->delayLaunchPar and
 * the task is pushed to the exec list and marked EXEC. Then either a new timer is
 * started or the existing pTask->delayTimer is reset. A failed reset is only logged.
 * Failing to start a new timer returns TSDB_CODE_OUT_OF_MEMORY.
 */
int32_t schDelayLaunchTask(SSchJob *pJob, SSchTask *pTask) {
  if (pTask->delayExecMs <= 0) {
    SCH_RET(schLaunchTask(pJob, pTask));
  }

  pTask->delayLaunchPar.rId = pJob->refId;
  pTask->delayLaunchPar.queryId = pJob->queryId;
  pTask->delayLaunchPar.taskId = pTask->taskId;
  pTask->delayLaunchPar.subJobId = pJob->subJobId;

  SCH_ERR_RET(schPushTaskToExecList(pJob, pTask));
  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_EXEC);

  if (pTask->delayTimer) {
    // a timer already exists for this task: re-arm it
    if (taosTmrReset(schHandleTimerEvent, pTask->delayExecMs, (void *)&pTask->delayLaunchPar, schMgmt.timer,
                     &pTask->delayTimer)) {
      SCH_TASK_ELOG("taosTmrReset delayExec timer failed, handle:%p, tmr:%" PRIuPTR, schMgmt.timer,
                    (uintptr_t)pTask->delayTimer);
    } else {
      SCH_TASK_DLOG("task start in %.2fs later by handler:%p, tmr:%" PRIuPTR, pTask->delayExecMs / 1000.0,
                    schMgmt.timer, (uintptr_t)pTask->delayTimer);
    }

    return TSDB_CODE_SUCCESS;
  }

  pTask->delayTimer = taosTmrStart(schHandleTimerEvent, pTask->delayExecMs, (void *)&pTask->delayLaunchPar, schMgmt.timer);
  if (NULL == pTask->delayTimer) {
    SCH_TASK_ELOG("start delay timer failed, handle:%p", schMgmt.timer);
    SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  SCH_TASK_DLOG("task delayTimer:%" PRIuPTR " is started to launch task after:%.2fs", (uintptr_t)pTask->delayTimer,
                pTask->delayExecMs/1000.0);
  return TSDB_CODE_SUCCESS;
}

/*
 * Launch (possibly with delay) every task of `level`.
 *
 * Flow-control needs for the level are checked first. Each task is then stamped with
 * the job's current seriesId; its failedSeriesId is set one below that. The task is
 * then handed to schDelayLaunchTask(). Stops at the first error.
 *
 * A missing array element (taosArrayGet() returning NULL) is reported as
 * TSDB_CODE_SCH_INTERNAL_ERROR instead of being dereferenced.
 */
int32_t schLaunchLevelTasks(SSchJob *pJob, SSchLevel *level) {
  SCH_ERR_RET(schChkJobNeedFlowCtrl(pJob, level));

  for (int32_t i = 0; i < level->taskNum; ++i) {
    SSchTask *pTask = taosArrayGet(level->subTasks, i);
    if (NULL == pTask) {
      SCH_JOB_ELOG("fail to get the %dth task in level, taskNum:%d", i, level->taskNum);
      SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
    }

    pTask->failedSeriesId = pJob->seriesId - 1;
    pTask->seriesId = pJob->seriesId;

    SCH_TASK_TLOG("task sId is set:%" PRIx64, pTask->seriesId);
    SCH_ERR_RET(schDelayLaunchTask(pJob, pTask));
  }

  return TSDB_CODE_SUCCESS;
}

/*
 * Drop every task stored in `list` (a hash of SSchTask pointers) on its exec nodes.
 *
 * Does nothing unless SCH_JOB_NEED_DROP(pJob) holds. For each task, a pending delay
 * timer is stopped first. The task is then dropped on its exec nodes while its lock
 * is held.
 */
void schDropTaskInHashList(SSchJob *pJob, SHashObj *list) {
  if (!SCH_JOB_NEED_DROP(pJob)) {
    return;
  }

  for (void *pIter = taosHashIterate(list, NULL); NULL != pIter; pIter = taosHashIterate(list, pIter)) {
    SSchTask *pTask = *(SSchTask **)pIter;

    if (pTask->delayTimer) {
      schStopTaskDelayTimer(pJob, pTask, true);
    }

    SCH_LOCK_TASK(pTask);
    schDropTaskOnExecNode(pJob, pTask);
    SCH_UNLOCK_TASK(pTask);
  }
}

/*
 * Notify `type` to pCurrTask and then to every other task stored in `list`.
 *
 * pCurrTask is notified without taking its lock here (NOTE(review): presumably the
 * caller already holds it — confirm). Every other task is locked while notified.
 * Stops at the first error and returns it. When stopping early, the taosHashIterate()
 * iteration is cancelled with taosHashCancelIterate(), so the iterator's node
 * reference is not leaked.
 */
int32_t schNotifyTaskInHashList(SSchJob *pJob, SHashObj *list, ETaskNotifyType type, SSchTask *pCurrTask) {
  int32_t code = TSDB_CODE_SUCCESS;

  SCH_ERR_RET(schNotifyTaskOnExecNode(pJob, pCurrTask, type));

  void *pIter = taosHashIterate(list, NULL);
  while (pIter) {
    SSchTask *pTask = *(SSchTask **)pIter;
    if (pTask != pCurrTask) {
      SCH_LOCK_TASK(pTask);
      code = schNotifyTaskOnExecNode(pJob, pTask, type);
      SCH_UNLOCK_TASK(pTask);

      if (TSDB_CODE_SUCCESS != code) {
        // abandoning the iteration: release the iterator's hold on the current node
        taosHashCancelIterate(list, pIter);
        break;
      }
    }

    pIter = taosHashIterate(list, pIter);
  }

  SCH_RET(code);
}

/*
 * Send a fetch request for the job's fetch task to pJob->resNode.
 * The message type comes from SCH_FETCH_TYPE() of the fetch task. The pTask argument
 * is not used; pJob->fetchTask is always the target.
 */
int32_t schExecRemoteFetch(SSchJob *pJob, SSchTask *pTask) {
  SSchTask *pFetchTask = pJob->fetchTask;

  SCH_RET(schBuildAndSendMsg(pJob, pFetchTask, &pJob->resNode, SCH_FETCH_TYPE(pFetchTask), NULL));
}

/*
 * Fetch pTask's results in-process through the client-side query worker.
 *
 * For explain jobs, an array collects the local explain responses and is handed to
 * schHandleExplainRes(), which takes ownership. The fetched response is then
 * processed with schProcessFetchRsp().
 *
 * Returns TSDB_CODE_SUCCESS or the first error encountered.
 */
int32_t schExecLocalFetch(SSchJob *pJob, SSchTask *pTask) {
  int32_t code = 0;
  void   *pRsp = NULL;
  SArray *explainRes = NULL;

  if (SCH_IS_EXPLAIN_JOB(pJob)) {
    explainRes = taosArrayInit(pJob->taskNum, sizeof(SExplainLocalRsp));
    if (NULL == explainRes) {
      SCH_ERR_RET(terrno);
    }
  }

  SCH_ERR_JRET(qWorkerProcessLocalFetch(schMgmt.queryMgmt, pJob->seriesId, pJob->queryId, pTask->clientId, pTask->taskId,
                                        pJob->refId, pTask->execId, &pRsp, explainRes));

  if (SCH_IS_EXPLAIN_JOB(pJob)) {
    // schHandleExplainRes() always destroys the array, so drop our reference before checking the result
    code = schHandleExplainRes(explainRes);
    explainRes = NULL;
    SCH_ERR_RET(code);
  }

  SCH_ERR_RET(schProcessFetchRsp(pJob, pTask, pRsp, TSDB_CODE_SUCCESS));

_return:
  taosArrayDestroy(explainRes);

  SCH_RET(code);
}

// Note: no more error processing, handled in function internal
1457
int32_t schLaunchFetchTask(SSchJob *pJob) {
115,952,558✔
1458
  int32_t code = 0;
115,952,558✔
1459

1460
  void *fetchRes = atomic_load_ptr(&pJob->fetchRes);
115,952,558✔
1461
  if (fetchRes) {
115,952,547✔
1462
    SCH_JOB_DLOG("res already fetched, res:%p", fetchRes);
×
1463
    return TSDB_CODE_SUCCESS;
×
1464
  }
1465

1466
  SCH_SET_TASK_STATUS(pJob->fetchTask, JOB_TASK_STATUS_FETCH);
115,952,547✔
1467

1468
  if (SCH_IS_LOCAL_EXEC_TASK(pJob, pJob->fetchTask)) {
115,952,553✔
1469
    SCH_ERR_JRET(schExecLocalFetch(pJob, pJob->fetchTask));
2,068✔
1470
  } else {
1471
    SCH_ERR_JRET(schExecRemoteFetch(pJob, pJob->fetchTask));
115,950,485✔
1472
  }
1473

1474
  return TSDB_CODE_SUCCESS;
115,952,564✔
1475

1476
_return:
×
1477

1478
  SCH_RET(schProcessOnTaskFailure(pJob, pJob->fetchTask, code));
×
1479
}
1480

1481
/*
 * Switch the task's current candidate address to the next ep of its epset and log it.
 *
 * The log reports pTask->delayExecMs, the delay before the task is started again,
 * because a vnode leader/follower switch may take from several seconds to several
 * minutes to complete. A failure to format the epset is only logged.
 *
 * Returns TSDB_CODE_SUCCESS, or the error from schGetTaskCurrentNodeAddr().
 */
int32_t schUpdateCurrentEpset(SSchTask *pTask, SSchJob *pJob) {
  SQueryNodeAddr *pAddr = NULL;
  SCH_ERR_RET(schGetTaskCurrentNodeAddr(pTask, pJob, &pAddr));

  // move on to the next ep in this node's epset
  SCH_SWITCH_EPSET(pAddr);

  char    epBuf[256] = {0};
  int32_t fmtRc = epsetToStr(&pAddr->epSet, epBuf, tListLen(epBuf));
  if (0 != fmtRc) {  // print error and continue
    SCH_TASK_ELOG("failed to print vgId:%d epset, code:%s", pAddr->nodeId, tstrerror(fmtRc));
  }

  SCH_TASK_DLOG("vgId:%d switch to next ep:%s to start task delay:%.2fs, startTs:%" PRId64, pAddr->nodeId, epBuf,
                pTask->delayExecMs / 1000.0, pTask->redirectCtx.startTs);

  return TSDB_CODE_SUCCESS;
}

/*
 * Decide whether a failed task may still be retried.
 *
 * Returns TSDB_CODE_SUCCESS when another attempt is allowed. When the task has reached
 * maxRetryTimes, or its next execId would reach maxExecTimes, pJob->noMoreRetry is set
 * and rspCode is returned unchanged. The elapsed time since redirectCtx.startTs (0 when
 * unset) is reported in the debug log only.
 */
int32_t schFailedTaskNeedRetry(SSchTask *pTask, SSchJob *pJob, int32_t rspCode) {
  double el = 0.0;
  if (pTask->redirectCtx.startTs > 0) {
    el = (taosGetTimestampMs() - pTask->redirectCtx.startTs) / 1000.0;
  }

  // check if the failed task may cause the query job to quit retrying
  if (pTask->retryTimes >= pTask->maxRetryTimes) {
    pJob->noMoreRetry = true;
    SCH_TASK_DLOG("task no more retry since reach max retry times, retryTimes:%d/%d, elapsedTime:%.2fs",
                  pTask->retryTimes, pTask->maxRetryTimes, el);
    return rspCode;
  }

  if ((pTask->execId + 1) >= pTask->maxExecTimes) {
    pJob->noMoreRetry = true;
    SCH_TASK_DLOG("task no more retry since reach max exec times, execId:%d/%d, elapsedTime:%.2fs", pTask->execId,
                  pTask->maxExecTimes, el);
    return rspCode;
  }

  return TSDB_CODE_SUCCESS;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc