• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3559

18 Dec 2024 12:59AM UTC coverage: 59.805% (+0.03%) from 59.778%
#3559

push

travis-ci

web-flow
Merge pull request #29187 from taosdata/merge/mainto3.0

merge: main to 3.0 branch

132705 of 287544 branches covered (46.15%)

Branch coverage included in aggregate %.

87 of 95 new or added lines in 19 files covered. (91.58%)

1132 existing lines in 133 files now uncovered.

209591 of 284807 relevant lines covered (73.59%)

8125235.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

38.36
/source/libs/scheduler/src/schTask.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "catalog.h"
17
#include "command.h"
18
#include "query.h"
19
#include "qworker.h"
20
#include "schInt.h"
21
#include "tglobal.h"
22
#include "tmsg.h"
23
#include "tref.h"
24
#include "trpc.h"
25

26
/*
 * Tear down the resources hanging off a task: deregister its heartbeat, then
 * destroy the candidate address array, the msg buffer, the children/parents
 * arrays, the execNodes hash and the exec-time profile array.
 * The SSchTask structure itself is not released here.
 */
void schFreeTask(SSchJob *pJob, SSchTask *pTask) {
  schDeregisterTaskHb(pJob, pTask);

  if (NULL != pTask->candidateAddrs) {
    taosArrayDestroy(pTask->candidateAddrs);
  }

  // taosMemoryFreeClear is applied to the field itself, not a copy
  taosMemoryFreeClear(pTask->msg);

  if (NULL != pTask->children) {
    taosArrayDestroy(pTask->children);
  }

  if (NULL != pTask->parents) {
    taosArrayDestroy(pTask->parents);
  }

  if (NULL != pTask->execNodes) {
    taosHashCleanup(pTask->execNodes);
  }

  // unlike the arrays above, execTime is destroyed without a NULL guard
  taosArrayDestroy(pTask->profile.execTime);
}
11,405,612✔
49

50
/*
 * Compute pTask->maxRetryTimes and pTask->maxExecTimes.
 *
 * Data-bind tasks, non-query jobs and any scheduling policy other than
 * SCH_ALL get SCH_DEFAULT_MAX_RETRY_NUM retries. Otherwise the retry limit is
 * the larger of the job's node list size and that default.
 * maxExecTimes is the retry limit multiplied by (pLevel->level + 1).
 */
void schInitTaskRetryTimes(SSchJob *pJob, SSchTask *pTask, SSchLevel *pLevel) {
  bool useDefault =
      SCH_IS_DATA_BIND_TASK(pTask) || (!SCH_IS_QUERY_JOB(pJob)) || (SCH_ALL != schMgmt.cfg.schPolicy);

  if (!useDefault) {
    int32_t candidateNodes = taosArrayGetSize(pJob->nodeList);
    pTask->maxRetryTimes = TMAX(candidateNodes, SCH_DEFAULT_MAX_RETRY_NUM);
  } else {
    pTask->maxRetryTimes = SCH_DEFAULT_MAX_RETRY_NUM;
  }

  pTask->maxExecTimes = pTask->maxRetryTimes * (pLevel->level + 1);
}
11,364,695✔
60

61
/*
 * Initialize a freshly created task for the given sub-plan and level.
 *
 * Sets the plan/level links, ids and default timeout, derives the retry
 * limits, and allocates the execNodes hash and the exec-time profile array
 * (both sized by maxExecTimes).
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when either allocation fails. On
 * failure both containers are released AND their pointers are reset to NULL,
 * so that a later schFreeTask() on the same task does not destroy them a
 * second time.
 */
int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel) {
  int32_t code = 0;

  pTask->plan = pPlan;
  pTask->level = pLevel;
  pTask->seriousId = pJob->seriousId;
  pTask->execId = -1;
  pTask->failedExecId = -2;
  pTask->failedSeriousId = 0;
  pTask->timeoutUsec = SCH_DEFAULT_TASK_TIMEOUT_USEC;
  pTask->clientId = getClientId();
  pTask->taskId = schGenTaskId();

  schInitTaskRetryTimes(pJob, pTask, pLevel);

  pTask->execNodes =
      taosHashInit(pTask->maxExecTimes, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
  pTask->profile.execTime = taosArrayInit(pTask->maxExecTimes, sizeof(int64_t));
  if (NULL == pTask->execNodes || NULL == pTask->profile.execTime) {
    SCH_ERR_JRET(terrno);
  }

  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT);

  SCH_TASK_DLOG("task initialized, max times %d:%d", pTask->maxRetryTimes, pTask->maxExecTimes);

  return TSDB_CODE_SUCCESS;

_return:

  // Release whichever container was created and clear the pointers: the task
  // keeps living in its owner, and schFreeTask() destroys non-NULL containers.
  taosArrayDestroy(pTask->profile.execTime);
  pTask->profile.execTime = NULL;
  taosHashCleanup(pTask->execNodes);
  pTask->execNodes = NULL;

  SCH_RET(code);
}
96

97
/*
 * Remember the node a task succeeded on.
 *
 * Local-exec tasks are left untouched. For all others the candidate address
 * at pTask->candidateIdx is copied into pTask->succeedAddr.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_SCH_INTERNAL_ERROR when the
 * candidate lookup yields NULL.
 */
int32_t schRecordTaskSucceedNode(SSchJob *pJob, SSchTask *pTask) {
  if (SCH_IS_LOCAL_EXEC_TASK(pJob, pTask)) {
    return TSDB_CODE_SUCCESS;
  }

  SQueryNodeAddr *pCurAddr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
  if (NULL == pCurAddr) {
    SCH_TASK_ELOG("taosArrayGet candidate addr failed, idx:%d, size:%d", pTask->candidateIdx,
                  (int32_t)taosArrayGetSize(pTask->candidateAddrs));
    SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
  }

  pTask->succeedAddr = *pCurAddr;

  return TSDB_CODE_SUCCESS;
}
113

114
/*
 * Record the node used for execution attempt `execId` in pTask->execNodes.
 *
 * The stored entry holds a copy of *addr plus the task's current handle.
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when the hash insert
 * reports a failure.
 */
int32_t schAppendTaskExecNode(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, int32_t execId) {
  SSchNodeInfo entry = {.addr = *addr, .handle = SCH_GET_TASK_HANDLE(pTask)};

  if (0 != taosHashPut(pTask->execNodes, &execId, sizeof(execId), &entry, sizeof(entry))) {
    SCH_TASK_ELOG("taosHashPut nodeInfo to execNodes failed, errno:%d", errno);
    SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  SCH_TASK_DLOG("task execNode added, execId:%d, handle:%p", execId, entry.handle);

  return TSDB_CODE_SUCCESS;
}
126

127
/*
 * Remove the exec-node entry of attempt `execId` from pTask->execNodes.
 *
 * Returns TSDB_CODE_SUCCESS when the task has no execNodes hash, or when the
 * entry was removed and `execId` is still the task's current attempt.
 * Returns TSDB_CODE_SCH_IGNORE_ERROR when the entry was already gone, when
 * `execId` is no longer the current execId, or when the task is waiting for
 * a retry. The `handle` argument is not used by this function.
 */
int32_t schDropTaskExecNode(SSchJob *pJob, SSchTask *pTask, void *handle, int32_t execId) {
  if (NULL == pTask->execNodes) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t rmCode = taosHashRemove(pTask->execNodes, &execId, sizeof(execId));
  if (0 != rmCode) {
    SCH_TASK_DLOG("execId %d already not in execNodeList", execId);
    SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR);
  } else {
    SCH_TASK_DLOG("execId %d removed from execNodeList", execId);
  }

  // a stale attempt, or a task already queued for retry, must be ignored
  if (pTask->execId != execId || pTask->waitRetry) {
    SCH_TASK_DLOG("execId %d is already not current execId %d, waitRetry %d", execId, pTask->execId, pTask->waitRetry);
    SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR);
  }

  return TSDB_CODE_SUCCESS;
}
146

147
/*
 * Store `handle` in the exec-node entry belonging to attempt `execId`.
 *
 * Nothing is changed (and TSDB_CODE_SUCCESS is still returned) when the
 * execNodes hash is empty or holds no entry for `execId`.
 */
int32_t schUpdateTaskExecNode(SSchJob *pJob, SSchTask *pTask, void *handle, int32_t execId) {
  if (taosHashGetSize(pTask->execNodes) <= 0) {
    return TSDB_CODE_SUCCESS;
  }

  SSchNodeInfo *pEntry = taosHashGet(pTask->execNodes, &execId, sizeof(execId));
  if (NULL == pEntry) {  // ignore it
    SCH_TASK_DLOG("handle not updated since execId %d already not exist, current execId %d, waitRetry %d", execId,
                  pTask->execId, pTask->waitRetry);
    return TSDB_CODE_SUCCESS;
  }

  pEntry->handle = handle;

  SCH_TASK_DLOG("handle updated to %p for execId %d", handle, execId);

  return TSDB_CODE_SUCCESS;
}
165

166
/*
 * Update (or drop) the remote handle recorded for one execution attempt.
 *
 * With dropExecNode set, the call is forwarded to schDropTaskExecNode() and
 * its result returned. Otherwise the exec-node entry is updated, and the
 * task-level handle is replaced only when (seriousId, execId) identify the
 * task's current attempt, are newer than the last failed attempt, and the
 * task is not waiting for a retry; in every other case
 * TSDB_CODE_SCH_IGNORE_ERROR is returned.
 */
int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, bool dropExecNode, void *handle, uint64_t seriousId, int32_t execId) {
  if (dropExecNode) {
    SCH_RET(schDropTaskExecNode(pJob, pTask, handle, execId));
  }

  SCH_ERR_RET(schUpdateTaskExecNode(pJob, pTask, handle, execId));

  bool staleSerious = (seriousId != pTask->seriousId || seriousId <= pTask->failedSeriousId);
  bool staleExec = (execId != pTask->execId || execId <= pTask->failedExecId);

  if (staleSerious || staleExec || pTask->waitRetry) {  // ignore it
    SCH_TASK_DLOG("handle not updated since seriousId:0x%" PRIx64 " or execId:%d is not lastest,"
                  "current seriousId:0x%" PRIx64 " execId %d, failedSeriousId:0x%" PRIx64 " failedExecId:%d, waitRetry %d", 
                  seriousId, execId, pTask->seriousId, pTask->execId, pTask->failedSeriousId, pTask->failedExecId, pTask->waitRetry);
    SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR);
  }

  SCH_SET_TASK_HANDLE(pTask, handle);

  return TSDB_CODE_SUCCESS;
}
185

186
/*
 * Handle the failure of a task execution attempt.
 *
 * - TSDB_CODE_SCH_IGNORE_ERROR is passed straight back.
 * - The current execId/seriousId are recorded as the last failed attempt.
 * - If the job is already stopping, the failure is ignored.
 * - If the task is already FAIL/SUCC, TSDB_CODE_SCH_STATUS_ERROR is returned.
 * - If schTaskCheckSetRetry() allows it, the task is re-launched through
 *   schHandleTaskRetry() and TSDB_CODE_SUCCESS is returned.
 * - Otherwise the task is marked failed. For jobs that wait for all tasks,
 *   the level's failure counter is bumped and the job error code returned
 *   once every task of the level is done (TSDB_CODE_SCH_IGNORE_ERROR before
 *   that); for other jobs errCode is returned.
 */
int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode) {
  if (TSDB_CODE_SCH_IGNORE_ERROR == errCode) {
    return TSDB_CODE_SCH_IGNORE_ERROR;
  }

  pTask->failedExecId = pTask->execId;
  pTask->failedSeriousId = pTask->seriousId;

  int8_t jobStatus = 0;
  if (schJobNeedToStop(pJob, &jobStatus)) {
    SCH_TASK_DLOG("no more task failure processing cause of job status %s", jobTaskStatusStr(jobStatus));
    SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR);
  }

  int8_t taskStatus = SCH_GET_TASK_STATUS(pTask);
  if (taskStatus == JOB_TASK_STATUS_FAIL || taskStatus == JOB_TASK_STATUS_SUCC) {
    SCH_TASK_ELOG("task already done, status:%s", jobTaskStatusStr(taskStatus));
    SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
  }

  // a timeout is recorded as a wait timestamp, everything else as task end
  if (errCode == TSDB_CODE_SCH_TIMEOUT_ERROR) {
    SCH_LOG_TASK_WAIT_TS(pTask);
  } else {
    SCH_LOG_TASK_END_TS(pTask);
  }

  bool needRetry = false;

  SCH_TASK_DLOG("taskOnFailure, code:%s", tstrerror(errCode));

  SCH_ERR_RET(schTaskCheckSetRetry(pJob, pTask, errCode, &needRetry));

  if (needRetry) {
    SCH_ERR_RET(schHandleTaskRetry(pJob, pTask));

    return TSDB_CODE_SUCCESS;
  }

  SCH_TASK_ELOG("task failed and no more retry, code:%s", tstrerror(errCode));

  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_FAIL);

  if (SCH_JOB_NEED_WAIT(pJob)) {
    int32_t taskDone = 0;

    // the level counters are updated and sampled under the level lock
    SCH_LOCK(SCH_WRITE, &pTask->level->lock);
    pTask->level->taskFailed++;
    taskDone = pTask->level->taskSucceed + pTask->level->taskFailed;
    SCH_UNLOCK(SCH_WRITE, &pTask->level->lock);

    schUpdateJobErrCode(pJob, errCode);

    if (taskDone < pTask->level->taskNum) {
      SCH_TASK_DLOG("need to wait other tasks, doneNum:%d, allNum:%d", taskDone, pTask->level->taskNum);
      SCH_RET(TSDB_CODE_SCH_IGNORE_ERROR);
    }

    SCH_RET(atomic_load_32(&pJob->errCode));
  }

  SCH_RET(errCode);
}
248

249
/*
 * Handle the successful completion of a task execution.
 *
 * The task is marked PART_SUCC, its succeed node is recorded and tasks held
 * back by flow control are launched.
 *
 * Task without parents:
 *   - job that waits for all tasks: bump the level's success counter; once
 *     all tasks of the level are done either fail the job (if any task
 *     failed) or switch the job to PART_SUCC;
 *   - otherwise: remember this task as the fetch task / result node and
 *     switch the job to PART_SUCC.
 *
 * Task with parents: publish this task as a downstream source into every
 * parent's plan, launch each parent whose children are all ready, and launch
 * the next lower level when this level's executions are all done.
 */
int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
  int32_t code = 0;

  SCH_TASK_DLOG("taskOnSuccess, status:%s", SCH_GET_TASK_STATUS_STR(pTask));

  SCH_LOG_TASK_END_TS(pTask);

  // number of finished executions in this level, including this one
  int32_t taskDone = atomic_add_fetch_32(&pTask->level->taskExecDoneNum, 1);

  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_PART_SUCC);

  SCH_ERR_RET(schRecordTaskSucceedNode(pJob, pTask));

  SCH_ERR_RET(schLaunchTasksInFlowCtrlList(pJob, pTask));

  int32_t parentNum = pTask->parents ? (int32_t)taosArrayGetSize(pTask->parents) : 0;
  if (parentNum == 0) {
    if (SCH_JOB_NEED_WAIT(pJob)) {
      // succeeded + failed tasks of the level; kept distinct from taskDone
      int32_t levelDone = 0;

      SCH_LOCK(SCH_WRITE, &pTask->level->lock);
      pTask->level->taskSucceed++;
      levelDone = pTask->level->taskSucceed + pTask->level->taskFailed;
      SCH_UNLOCK(SCH_WRITE, &pTask->level->lock);

      if (levelDone < pTask->level->taskNum) {
        SCH_TASK_DLOG("wait all tasks, done:%d, all:%d", levelDone, pTask->level->taskNum);
        return TSDB_CODE_SUCCESS;
      } else if (levelDone > pTask->level->taskNum) {
        SCH_TASK_ELOG("taskDone number invalid, done:%d, total:%d", levelDone, pTask->level->taskNum);
      }

      if (pTask->level->taskFailed > 0) {
        SCH_RET(schHandleJobFailure(pJob, pJob->errCode));
      } else {
        SCH_RET(schSwitchJobStatus(pJob, JOB_TASK_STATUS_PART_SUCC, NULL));
      }
    } else {
      pJob->resNode = pTask->succeedAddr;
    }

    pJob->fetchTask = pTask;

    SCH_RET(schSwitchJobStatus(pJob, JOB_TASK_STATUS_PART_SUCC, NULL));
  }

  /*
    if (SCH_IS_DATA_SRC_TASK(task) && job->dataSrcEps.numOfEps < SCH_MAX_CANDIDATE_EP_NUM) {
      tstrncpy(job->dataSrcEps.fqdn[job->dataSrcEps.numOfEps], task->execAddr.fqdn, sizeof(task->execAddr.fqdn));
      job->dataSrcEps.port[job->dataSrcEps.numOfEps] = task->execAddr.port;

      ++job->dataSrcEps.numOfEps;
    }
  */

  for (int32_t i = 0; i < parentNum; ++i) {
    // Check the array slot before dereferencing it, so that a failed lookup
    // reaches the error path below instead of crashing.
    SSchTask **ppParent = taosArrayGet(pTask->parents, i);
    SSchTask  *parent = (NULL != ppParent) ? *ppParent : NULL;
    if (NULL == parent) {
      SCH_TASK_ELOG("fail to get task %d parent, parentNum: %d", i, parentNum);
      SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
    }

    SCH_LOCK(SCH_WRITE, &parent->planLock);
    SDownstreamSourceNode source = {
        .type = QUERY_NODE_DOWNSTREAM_SOURCE,
        .clientId = pTask->clientId,
        .taskId = pTask->taskId,
        .sId = pTask->seriousId,
        .execId = pTask->execId,
        .addr = pTask->succeedAddr,
        .fetchMsgType = SCH_FETCH_TYPE(pTask),
        .localExec = SCH_IS_LOCAL_EXEC_TASK(pJob, pTask),
    };
    code = qSetSubplanExecutionNode(parent->plan, pTask->plan->id.groupId, &source);
    if (TSDB_CODE_SUCCESS != code) {
      SCH_TASK_ELOG("qSetSubplanExecutionNode failed, groupId: %d", pTask->plan->id.groupId);
    }
    SCH_UNLOCK(SCH_WRITE, &parent->planLock);

    // the lock is released before the error is propagated
    SCH_ERR_RET(code);

    int32_t readyNum = atomic_add_fetch_32(&parent->childReady, 1);

    if (SCH_TASK_READY_FOR_LAUNCH(readyNum, parent)) {
      SCH_TASK_DLOG("all %d children task done, start to launch parent task 0x%" PRIx64, readyNum, parent->taskId);

      parent->seriousId = pJob->seriousId;
      // pTask and parent are swapped around the log call so that the message
      // reports the parent's seriousId, then swapped back.
      TSWAP(pTask, parent);
      SCH_TASK_DLOG("task seriousId set to 0x%" PRIx64, pTask->seriousId);
      TSWAP(pTask, parent);

      SCH_ERR_RET(schDelayLaunchTask(pJob, parent));
    }
  }

  if (taskDone == pTask->level->taskNum) {
    SCH_ERR_RET(schLaunchJobLowerLevel(pJob, pTask));
  }

  return TSDB_CODE_SUCCESS;
}
350

351
/*
 * Reschedule a timed-out task onto another candidate node.
 *
 * Does nothing when rescheduling is disabled in the scheduler config or the
 * task is a data-bind task. A task is rescheduled only if it has timed out,
 * is in EXEC status, is not the job's fetch task and has more than one
 * candidate address; it is then dropped on its exec node and run through the
 * failure path with TSDB_CODE_SCH_TIMEOUT_ERROR.
 */
int32_t schRescheduleTask(SSchJob *pJob, SSchTask *pTask) {
  if (!schMgmt.cfg.enableReSchedule || SCH_IS_DATA_BIND_TASK(pTask)) {
    return TSDB_CODE_SUCCESS;
  }

  bool shouldReschedule = SCH_TASK_TIMEOUT(pTask) && JOB_TASK_STATUS_EXEC == pTask->status &&
                          pJob->fetchTask != pTask && taosArrayGetSize(pTask->candidateAddrs) > 1;
  if (!shouldReschedule) {
    return TSDB_CODE_SUCCESS;
  }

  SCH_TASK_DLOG("task execId %d will be rescheduled now", pTask->execId);
  schDropTaskOnExecNode(pJob, pTask);
  taosHashClear(pTask->execNodes);

  SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, TSDB_CODE_SCH_TIMEOUT_ERROR));

  return TSDB_CODE_SUCCESS;
}
371

372
/*
 * Advance the redirect bookkeeping of a task and decide the delay before its
 * next launch.
 *
 * First redirect: the context is started (period tsRedirectPeriod, start
 * timestamp now) and roundTotal is chosen from pEpSet, from the first
 * candidate address, or a default; non data-bind tasks use a round of 1.
 *
 * Later redirects: the counters are bumped. While the round is not complete
 * the task is retried without delay. When a round completes, the elapsed
 * time is checked against tsMaxRetryWaitTime (the job is flagged noMoreRetry
 * and the redirect code returned when exceeded), the period is scaled by
 * tsRedirectFactor and capped at tsRedirectMaxPeriod, and data-bind tasks get
 * a delay of min(remaining time, period).
 */
int32_t schChkUpdateRedirectCtx(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet, int32_t rspCode) {
  SSchRedirectCtx *pCtx = &pTask->redirectCtx;

  if (!pCtx->inRedirect) {
    pCtx->inRedirect = true;
    pCtx->periodMs = tsRedirectPeriod;
    pCtx->startTs = taosGetTimestampMs();

    if (!SCH_IS_DATA_BIND_TASK(pTask)) {
      pCtx->roundTotal = 1;
    } else if (pEpSet) {
      pCtx->roundTotal = pEpSet->numOfEps;
    } else if (pTask->candidateAddrs && taosArrayGetSize(pTask->candidateAddrs) > 0) {
      SQueryNodeAddr *pFirst = taosArrayGet(pTask->candidateAddrs, 0);
      pCtx->roundTotal = pFirst->epSet.numOfEps;
    } else {
      pCtx->roundTotal = SCH_DEFAULT_RETRY_TOTAL_ROUND;
    }

    goto _return;
  }

  pCtx->totalTimes++;
  pCtx->roundTimes++;

  if (SCH_IS_DATA_BIND_TASK(pTask) && pEpSet) {
    pCtx->roundTotal = pEpSet->numOfEps;
  }

  if (pCtx->roundTimes < pCtx->roundTotal) {
    // still inside the current round: retry immediately
    pTask->delayExecMs = 0;
  } else {
    int64_t nowMs = taosGetTimestampMs();
    int64_t elapsedMs = nowMs - pCtx->startTs;
    if (elapsedMs > tsMaxRetryWaitTime) {
      SCH_TASK_DLOG("task no more redirect retry since timeout, now:%" PRId64 ", start:%" PRId64 ", max:%d, total:%d",
                    nowMs, pCtx->startTs, tsMaxRetryWaitTime, pCtx->totalTimes);
      pJob->noMoreRetry = true;
      SCH_ERR_RET(SCH_GET_REDIRECT_CODE(pJob, rspCode));
    }

    pCtx->periodMs *= tsRedirectFactor;
    if (pCtx->periodMs > tsRedirectMaxPeriod) {
      pCtx->periodMs = tsRedirectMaxPeriod;
    }

    if (SCH_IS_DATA_BIND_TASK(pTask)) {
      // never wait beyond what is left of the overall retry budget
      int64_t remainMs = tsMaxRetryWaitTime - elapsedMs;
      if (remainMs < pCtx->periodMs) {
        pTask->delayExecMs = remainMs;
      } else {
        pTask->delayExecMs = pCtx->periodMs;
      }
    }

    pCtx->roundTimes = 0;
  }

_return:

  SCH_TASK_DLOG("task start %d/%d/%d redirect retry, delayExec:%d", pCtx->roundTimes, pCtx->roundTotal,
                pCtx->totalTimes, pTask->delayExecMs);

  return TSDB_CODE_SUCCESS;
}
436

437
/*
 * Put a task back into a clean state ahead of a retry.
 *
 * Flags the task as waiting for retry, stops a pending delay timer, drops the
 * task on its exec node, clears the execNodes hash, removes the task from the
 * job's exec list, deregisters its heartbeat, frees the msg buffer and zeroes
 * the message/child-ready/succeed-address fields.
 */
void schResetTaskForRetry(SSchJob *pJob, SSchTask *pTask) {
  pTask->waitRetry = true;

  if (NULL != pTask->delayTimer) {
    taosTmrStop(pTask->delayTimer);
  }

  schDropTaskOnExecNode(pJob, pTask);
  taosHashClear(pTask->execNodes);
  (void)schRemoveTaskFromExecList(pJob, pTask);  // ignore error
  schDeregisterTaskHb(pJob, pTask);

  taosMemoryFreeClear(pTask->msg);
  pTask->msgLen = 0;
  pTask->lastMsgType = 0;

  pTask->childReady = 0;
  TAOS_MEMSET(&pTask->succeedAddr, 0, sizeof(pTask->succeedAddr));
}
4,588✔
454

455
/*
 * Redirect a task after a redirect-type response and relaunch it.
 *
 * The redirect context is updated first; any failure from there on is routed
 * through schProcessOnTaskFailure(). The task is then reset for retry.
 *
 * Data-bind task: either adopt the epset carried in pData, keep the current
 * endpoint (self-leader redirect errors), or switch to the next endpoint of
 * the current candidate; then re-launch the task with delay.
 *
 * Other (merge) task: clear the plan's execution nodes, switch the candidate
 * address when no response payload is present, and recursively redirect every
 * child task under the child's task lock.
 */
int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32_t rspCode) {
  int32_t code = 0;

  SCH_TASK_DLOG("task will be redirected now, status:%s, code:%s", SCH_GET_TASK_STATUS_STR(pTask), tstrerror(rspCode));

  if (!NO_RET_REDIRECT_ERROR(rspCode)) {
    SCH_UPDATE_REDIRECT_CODE(pJob, rspCode);
  }

  SCH_ERR_JRET(schChkUpdateRedirectCtx(pJob, pTask, pData ? pData->pEpSet : NULL, rspCode));

  schResetTaskForRetry(pJob, pTask);

  if (SCH_IS_DATA_BIND_TASK(pTask)) {
    if (pData && pData->pEpSet) {
      SCH_ERR_JRET(schUpdateTaskCandidateAddr(pJob, pTask, pData->pEpSet));
    } else {
      // both remaining cases operate on the currently selected candidate
      SQueryNodeAddr *pCurAddr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
      if (NULL == pCurAddr) {
        SCH_TASK_ELOG("fail to get the %dth condidateAddr, totalNum:%d", pTask->candidateIdx,
                      (int32_t)taosArrayGetSize(pTask->candidateAddrs));
        SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
      }

      if (SYNC_SELF_LEADER_REDIRECT_ERROR(rspCode)) {
        // keep the endpoint in use, only report it
        SEp *pEp = &pCurAddr->epSet.eps[pCurAddr->epSet.inUse];
        SCH_TASK_DLOG("task retry node %d current ep, idx:%d/%d,%s:%d, code:%s", pCurAddr->nodeId,
                      pCurAddr->epSet.inUse, pCurAddr->epSet.numOfEps, pEp->fqdn, pEp->port, tstrerror(rspCode));
      } else {
        SCH_SWITCH_EPSET(pCurAddr);
        SCH_TASK_DLOG("switch task target node %d epset to %d/%d", pCurAddr->nodeId, pCurAddr->epSet.inUse,
                      pCurAddr->epSet.numOfEps);
      }
    }

    SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT);

    SCH_ERR_JRET(schDelayLaunchTask(pJob, pTask));

    return TSDB_CODE_SUCCESS;
  }

  // merge plan

  pTask->childReady = 0;

  qClearSubplanExecutionNode(pTask->plan);

  // Note: current error task and upper level merge task
  if (NULL == pData || 0 == pData->len) {
    SCH_ERR_JRET(schSwitchTaskCandidateAddr(pJob, pTask));
  }

  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT);

  int32_t childCount = taosArrayGetSize(pTask->children);
  for (int32_t idx = 0; idx < childCount; ++idx) {
    SSchTask *pChild = taosArrayGetP(pTask->children, idx);
    SCH_LOCK_TASK(pChild);
    (void)schDoTaskRedirect(pJob, pChild, NULL, rspCode);  // error handled internal
    SCH_UNLOCK_TASK(pChild);
  }

  return TSDB_CODE_SUCCESS;

_return:

  SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
}
528

529
/*
 * Roll back the level counters for a task and its direct children ahead of a
 * task-set retry.
 *
 * The task's own level loses one launched task, and one exec-done task if the
 * task had reached PART_SUCC or later. Each child's level loses one launched
 * and one exec-done task, adjusted under the child's task lock.
 *
 * Returns TSDB_CODE_SCH_INTERNAL_ERROR if a child slot is NULL.
 */
int32_t schResetTaskSetLevelInfo(SSchJob *pJob, SSchTask *pTask) {
  SSchLevel *pLevel = pTask->level;

  SCH_TASK_DLOG("start to reset level for current task set, execDone:%d, launched:%d",
                atomic_load_32(&pLevel->taskExecDoneNum), atomic_load_32(&pLevel->taskLaunchedNum));

  if (SCH_GET_TASK_STATUS(pTask) >= JOB_TASK_STATUS_PART_SUCC) {
    (void)atomic_sub_fetch_32(&pLevel->taskExecDoneNum, 1);
  }

  (void)atomic_sub_fetch_32(&pLevel->taskLaunchedNum, 1);

  int32_t childCount = taosArrayGetSize(pTask->children);
  for (int32_t idx = 0; idx < childCount; ++idx) {
    SSchTask *pChild = taosArrayGetP(pTask->children, idx);
    if (NULL == pChild) {
      SCH_TASK_ELOG("fail to get the %dth child, childrenNum:%d", idx, childCount);
      SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
    }

    SCH_LOCK_TASK(pChild);
    pLevel = pChild->level;
    (void)atomic_sub_fetch_32(&pLevel->taskExecDoneNum, 1);
    (void)atomic_sub_fetch_32(&pLevel->taskLaunchedNum, 1);
    SCH_UNLOCK_TASK(pChild);
  }

  // NOTE(review): pLevel was re-pointed inside the loop, so with children
  // present this reports the last child's level rather than pTask->level -
  // confirm that is the intended level to log.
  SCH_TASK_DLOG("end to reset level for current task set, execDone:%d, launched:%d",
                atomic_load_32(&pLevel->taskExecDoneNum), atomic_load_32(&pLevel->taskLaunchedNum));

  return TSDB_CODE_SUCCESS;
}
561

562
/*
 * Retry a whole task set (the task and its children) after a redirect-type
 * error.
 *
 * Checks that the job may still retry, requires an epset in pData for
 * "other leader" redirect errors, rolls back the level counters, resets the
 * job level index, bumps the job's seriousId and redirects the task.
 *
 * pData->pData and pData->pEpSet are freed on every path. Failures detected
 * before the redirect are routed through schProcessOnTaskFailure().
 */
int32_t schHandleTaskSetRetry(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32_t rspCode) {
  int32_t code = 0;

  SCH_ERR_JRET(schChkResetJobRetry(pJob, rspCode));

  if (SYNC_OTHER_LEADER_REDIRECT_ERROR(rspCode) && NULL == pData->pEpSet) {
    SCH_TASK_ELOG("epset updating excepted, error:%s", tstrerror(rspCode));
    code = TSDB_CODE_INVALID_MSG;
    goto _return;
  }

  SCH_TASK_DLOG("start to redirect current task set cause of error: %s", tstrerror(rspCode));

  SCH_ERR_JRET(schResetTaskSetLevelInfo(pJob, pTask));

  SCH_RESET_JOB_LEVEL_IDX(pJob);
  atomic_add_fetch_64(&pJob->seriousId, 1);

  // schDoTaskRedirect reports its own failures, so its code is returned as is
  code = schDoTaskRedirect(pJob, pTask, pData, rspCode);

  taosMemoryFreeClear(pData->pData);
  taosMemoryFreeClear(pData->pEpSet);

  SCH_RET(code);

_return:

  taosMemoryFreeClear(pData->pData);
  taosMemoryFreeClear(pData->pEpSet);

  SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
}
596

597
/*
 * Insert the task into the job's execTasks hash, keyed by taskId.
 *
 * A task that is already present is treated as success. Any other insert
 * failure yields TSDB_CODE_OUT_OF_MEMORY.
 */
int32_t schPushTaskToExecList(SSchJob *pJob, SSchTask *pTask) {
  int32_t putCode = taosHashPut(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
  if (0 != putCode) {
    if (HASH_NODE_EXIST(putCode)) {
      SCH_TASK_DLOG("task already in execTask list, code:%x", putCode);
      return TSDB_CODE_SUCCESS;
    }

    SCH_TASK_ELOG("taosHashPut task to execTask list failed, errno:0x%x", errno);
    SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  SCH_TASK_DLOG("task added to execTask list, numOfTasks:%d", taosHashGetSize(pJob->execTasks));

  return TSDB_CODE_SUCCESS;
}
613

614
/*
615
int32_t schMoveTaskToSuccList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
616
  if (0 != taosHashRemove(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId))) {
617
    SCH_TASK_WLOG("remove task from execTask list failed, may not exist, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
618
  } else {
619
    SCH_TASK_DLOG("task removed from execTask list, numOfTasks:%d", taosHashGetSize(pJob->execTasks));
620
  }
621

622
  int32_t code = taosHashPut(pJob->succTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
623
  if (0 != code) {
624
    if (HASH_NODE_EXIST(code)) {
625
      *moved = true;
626
      SCH_TASK_ELOG("task already in succTask list, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
627
      SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
628
    }
629

630
    SCH_TASK_ELOG("taosHashPut task to succTask list failed, errno:%d", errno);
631
    SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
632
  }
633

634
  *moved = true;
635

636
  SCH_TASK_DLOG("task moved to succTask list, numOfTasks:%d", taosHashGetSize(pJob->succTasks));
637

638
  return TSDB_CODE_SUCCESS;
639
}
640

641
int32_t schMoveTaskToFailList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
642
  *moved = false;
643

644
  if (0 != taosHashRemove(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId))) {
645
    SCH_TASK_WLOG("remove task from execTask list failed, may not exist, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
646
  }
647

648
  int32_t code = taosHashPut(pJob->failTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
649
  if (0 != code) {
650
    if (HASH_NODE_EXIST(code)) {
651
      *moved = true;
652

653
      SCH_TASK_WLOG("task already in failTask list, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
654
      SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
655
    }
656

657
    SCH_TASK_ELOG("taosHashPut task to failTask list failed, errno:%d", errno);
658
    SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
659
  }
660

661
  *moved = true;
662

663
  SCH_TASK_DLOG("task moved to failTask list, numOfTasks:%d", taosHashGetSize(pJob->failTasks));
664

665
  return TSDB_CODE_SUCCESS;
666
}
667

668
int32_t schMoveTaskToExecList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
669
  if (0 != taosHashRemove(pJob->succTasks, &pTask->taskId, sizeof(pTask->taskId))) {
670
    SCH_TASK_WLOG("remove task from succTask list failed, may not exist, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
671
  }
672

673
  int32_t code = taosHashPut(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
674
  if (0 != code) {
675
    if (HASH_NODE_EXIST(code)) {
676
      *moved = true;
677

678
      SCH_TASK_ELOG("task already in execTask list, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
679
      SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
680
    }
681

682
    SCH_TASK_ELOG("taosHashPut task to execTask list failed, errno:%d", errno);
683
    SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
684
  }
685

686
  *moved = true;
687

688
  SCH_TASK_DLOG("task moved to execTask list, numOfTasks:%d", taosHashGetSize(pJob->execTasks));
689

690
  return TSDB_CODE_SUCCESS;
691
}
692
*/
693

694
/*
 * Decide whether a failed task may be retried; the verdict is written to
 * *needRetry and the function always returns TSDB_CODE_SUCCESS.
 *
 * No retry when: the job is flagged noMoreRetry, the retry count or the exec
 * count would exceed its limit, or SCH_TASK_NEED_RETRY() rejects the error
 * for the task's last message type.
 *
 * Side effect: on TSDB_CODE_SCH_TIMEOUT_ERROR both limits are raised by one
 * and the task timeout is doubled, capped at SCH_MAX_TASK_TIMEOUT_USEC,
 * before the limits are checked.
 */
int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bool *needRetry) {
  // pessimistic default; only the very last step flips it to true
  *needRetry = false;

  if (pJob->noMoreRetry) {
    SCH_TASK_DLOG("task no more retry since job no more retry, retryTimes:%d/%d", pTask->retryTimes,
                  pTask->maxRetryTimes);
    return TSDB_CODE_SUCCESS;
  }

  if (TSDB_CODE_SCH_TIMEOUT_ERROR == errCode) {
    pTask->maxExecTimes++;
    pTask->maxRetryTimes++;
    if (pTask->timeoutUsec < SCH_MAX_TASK_TIMEOUT_USEC) {
      pTask->timeoutUsec *= 2;
      if (pTask->timeoutUsec > SCH_MAX_TASK_TIMEOUT_USEC) {
        pTask->timeoutUsec = SCH_MAX_TASK_TIMEOUT_USEC;
      }
    }
  }

  if ((pTask->retryTimes + 1) > pTask->maxRetryTimes) {
    SCH_TASK_DLOG("task no more retry since reach max retry times, retryTimes:%d/%d", pTask->retryTimes,
                  pTask->maxRetryTimes);
    return TSDB_CODE_SUCCESS;
  }

  if ((pTask->execId + 1) >= pTask->maxExecTimes) {
    SCH_TASK_DLOG("task no more retry since reach max exec times, execId:%d/%d", pTask->execId, pTask->maxExecTimes);
    return TSDB_CODE_SUCCESS;
  }

  if (!SCH_TASK_NEED_RETRY(pTask->lastMsgType, errCode)) {
    SCH_TASK_DLOG("task no more retry cause of errCode, errCode:%x - %s", errCode, tstrerror(errCode));
    return TSDB_CODE_SUCCESS;
  }

  /*
    if (SCH_IS_DATA_BIND_TASK(pTask)) {
      if ((pTask->execId + 1) >= SCH_TASK_NUM_OF_EPS(&pTask->plan->execNode)) {
        *needRetry = false;
        SCH_TASK_DLOG("task no more retry since all ep tried, execId:%d, epNum:%d", pTask->execId,
                      SCH_TASK_NUM_OF_EPS(&pTask->plan->execNode));
        return TSDB_CODE_SUCCESS;
      }
    } else {
      int32_t candidateNum = taosArrayGetSize(pTask->candidateAddrs);

      if ((pTask->candidateIdx + 1) >= candidateNum && (TSDB_CODE_SCH_TIMEOUT_ERROR != errCode)) {
        *needRetry = false;
        SCH_TASK_DLOG("task no more retry since all candiates tried, candidateIdx:%d, candidateNum:%d",
                      pTask->candidateIdx, candidateNum);
        return TSDB_CODE_SUCCESS;
      }
    }
  */

  *needRetry = true;
  SCH_TASK_DLOG("task need the %dth retry, errCode:%x - %s", pTask->execId + 1, errCode, tstrerror(errCode));

  return TSDB_CODE_SUCCESS;
}
757

758
/*
 * Re-launch a failed task.
 *
 * Decrements the level's launched counter, stops a pending delay timer,
 * removes the task from the exec list and resets it to INIT. Flow-controlled
 * tasks trigger the flow-control list launch. After the heartbeat is
 * deregistered, data-bind tasks switch to the next endpoint of the current
 * candidate while other tasks switch candidate address. Finally the task is
 * launched with delay.
 */
int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask) {
  (void)atomic_sub_fetch_32(&pTask->level->taskLaunchedNum, 1);

  if (NULL != pTask->delayTimer) {
    taosTmrStop(pTask->delayTimer);
  }

  (void)schRemoveTaskFromExecList(pJob, pTask);  // ignore error
  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT);

  if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) {
    SCH_ERR_RET(schLaunchTasksInFlowCtrlList(pJob, pTask));
  }

  schDeregisterTaskHb(pJob, pTask);

  if (!SCH_IS_DATA_BIND_TASK(pTask)) {
    SCH_ERR_RET(schSwitchTaskCandidateAddr(pJob, pTask));
  } else {
    SQueryNodeAddr *pCurAddr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
    if (NULL == pCurAddr) {
      SCH_TASK_ELOG("fail to the %dth condidateAddr, totalNum:%d", pTask->candidateIdx,
                    (int32_t)taosArrayGetSize(pTask->candidateAddrs));
      SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
    }

    SCH_SWITCH_EPSET(pCurAddr);
  }

  SCH_ERR_RET(schDelayLaunchTask(pJob, pTask));

  return TSDB_CODE_SUCCESS;
}
791

792
/**
 * Append the address of every node in the job's node list to the task's
 * candidate address array.
 *
 * @param pJob   job whose nodeList supplies the nodes (a NULL list is treated as empty)
 * @param pTask  task whose candidateAddrs array receives the addresses
 * @return TSDB_CODE_SUCCESS when at least one address was added,
 *         TSDB_CODE_TSC_NO_EXEC_NODE when none was added,
 *         TSDB_CODE_SCH_INTERNAL_ERROR when a list element cannot be read,
 *         or terrno when appending to the array fails
 */
int32_t schSetAddrsFromNodeList(SSchJob *pJob, SSchTask *pTask) {
  int32_t total = 0;
  int32_t added = 0;

  if (pJob->nodeList) {
    total = taosArrayGetSize(pJob->nodeList);
  }

  for (int32_t idx = 0; idx < total; ++idx) {
    SQueryNodeLoad *pLoad = taosArrayGet(pJob->nodeList, idx);
    if (NULL == pLoad) {
      SCH_TASK_ELOG("fail to get the %dth node in nodeList, nodeNum:%d", idx, total);
      SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
    }

    SQueryNodeAddr *pAddr = &pLoad->addr;
    if (NULL == taosArrayPush(pTask->candidateAddrs, pAddr)) {
      SCH_TASK_ELOG("taosArrayPush execNode to candidate addrs failed, addNum:%d, errno:%d", added, errno);
      SCH_ERR_RET(terrno);
    }

    SCH_TASK_TLOG("set %dth candidate addr, id %d, inUse:%d/%d, fqdn:%s, port:%d", idx, pAddr->nodeId,
                  pAddr->epSet.inUse, pAddr->epSet.numOfEps, SCH_GET_CUR_EP(pAddr)->fqdn,
                  SCH_GET_CUR_EP(pAddr)->port);

    ++added;
  }

  if (added <= 0) {
    SCH_TASK_ELOG("no available execNode as candidates, nodeNum:%d", total);
    SCH_ERR_RET(TSDB_CODE_TSC_NO_EXEC_NODE);
  }

  return TSDB_CODE_SUCCESS;
}
828

829
/**
 * Build the task's candidate address list if it does not exist yet.
 *
 * When the plan already names an exec node (numOfEps > 0) that node becomes
 * the only candidate. Otherwise a data-bind task is rejected, and any other
 * task takes its candidates from the job's node list and starts at a randomly
 * chosen index.
 *
 * @param pJob   job owning the task
 * @param pTask  task whose candidateAddrs / candidateIdx are filled in
 * @return TSDB_CODE_SUCCESS (also when the list already exists),
 *         terrno on array allocation/append failure,
 *         TSDB_CODE_SCH_INTERNAL_ERROR for a data-bind task without exec node,
 *         or the error returned by schSetAddrsFromNodeList()
 */
int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) {
  if (NULL != pTask->candidateAddrs) {
    return TSDB_CODE_SUCCESS;
  }

  pTask->candidateAddrs = taosArrayInit(SCHEDULE_DEFAULT_MAX_NODE_NUM, sizeof(SQueryNodeAddr));
  if (NULL == pTask->candidateAddrs) {
    SCH_TASK_ELOG("taosArrayInit %d condidate addrs failed", SCHEDULE_DEFAULT_MAX_NODE_NUM);
    SCH_ERR_RET(terrno);
  }

  SQueryNodeAddr *pPlanNode = &pTask->plan->execNode;

  if (pPlanNode->epSet.numOfEps > 0) {
    // The plan pins the exec node: use it as the single candidate.
    if (NULL == taosArrayPush(pTask->candidateAddrs, pPlanNode)) {
      SCH_TASK_ELOG("taosArrayPush execNode to candidate addrs failed, errno:%d", errno);
      SCH_ERR_RET(terrno);
    }

    SCH_TASK_DLOG("use execNode in plan as candidate addr, numOfEps:%d", pPlanNode->epSet.numOfEps);

    return TSDB_CODE_SUCCESS;
  }

  if (SCH_IS_DATA_BIND_TASK(pTask)) {
    SCH_TASK_ELOG("no execNode specifed for data src task, numOfEps:%d", pPlanNode->epSet.numOfEps);
    SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
  }

  SCH_ERR_RET(schSetAddrsFromNodeList(pJob, pTask));

  // schSetAddrsFromNodeList() fails unless it added at least one address,
  // so the modulus below is never taken by zero.
  pTask->candidateIdx = taosRand() % taosArrayGetSize(pTask->candidateAddrs);

  return TSDB_CODE_SUCCESS;
}
871

872
/**
 * Overwrite the epset of the task's single candidate address.
 *
 * The update is only allowed when the task has exactly one candidate address.
 * Both the old and the new epset are dumped to strings for the debug log; the
 * strings are freed before returning.
 *
 * @param pJob    job owning the task
 * @param pTask   task whose candidate address is updated
 * @param pEpSet  new epset copied into the candidate address
 * @return TSDB_CODE_SUCCESS,
 *         TSDB_CODE_APP_ERROR when the candidate count is not exactly one,
 *         TSDB_CODE_SCH_INTERNAL_ERROR when the candidate cannot be read,
 *         or the error returned by schDumpEpSet()
 */
int32_t schUpdateTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet) {
  int32_t code = TSDB_CODE_SUCCESS;
  char   *oldEpStr = NULL;
  char   *newEpStr = NULL;

  if (NULL == pTask->candidateAddrs || 1 != taosArrayGetSize(pTask->candidateAddrs)) {
    SCH_TASK_ELOG("not able to update cndidate addr, addr num %d",
                  (int32_t)(pTask->candidateAddrs ? taosArrayGetSize(pTask->candidateAddrs) : 0));
    SCH_ERR_RET(TSDB_CODE_APP_ERROR);
  }

  SQueryNodeAddr *pTarget = taosArrayGet(pTask->candidateAddrs, 0);
  if (NULL == pTarget) {
    SCH_TASK_ELOG("fail to get task 0th condidataAddr, totalNum:%d", (int32_t)taosArrayGetSize(pTask->candidateAddrs));
    SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
  }

  // A failure of the first dump returns directly; a failure of the second one
  // goes through the cleanup label, which frees both strings.
  SCH_ERR_RET(schDumpEpSet(&pTarget->epSet, &oldEpStr));
  SCH_ERR_JRET(schDumpEpSet(pEpSet, &newEpStr));

  SCH_TASK_DLOG("update task target node %d epset from %s to %s", pTarget->nodeId, oldEpStr, newEpStr);

  TAOS_MEMCPY(&pTarget->epSet, pEpSet, sizeof(pTarget->epSet));

_return:

  taosMemoryFree(oldEpStr);
  taosMemoryFree(newEpStr);

  return code;
}
903

904
/**
 * Move the task's candidateIdx to a different candidate address.
 *
 * Nothing is changed when there is at most one candidate. Under the
 * SCH_RANDOM policy a random index different from the current one is picked;
 * under every other policy the index advances by one and wraps to 0.
 *
 * @param pJob   job owning the task
 * @param pTask  task whose candidateIdx is updated
 * @return always TSDB_CODE_SUCCESS
 */
int32_t schSwitchTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask) {
  int32_t candidateNum = taosArrayGetSize(pTask->candidateAddrs);

  if (candidateNum > 1) {
    if (SCH_RANDOM == schMgmt.cfg.schPolicy) {
      // Re-draw until the index differs from the one used so far.
      int32_t prevIdx = pTask->candidateIdx;
      do {
        pTask->candidateIdx = taosRand() % candidateNum;
      } while (prevIdx == pTask->candidateIdx);
    } else {
      // SCH_LOAD_SEQ, SCH_ALL and any other policy: round-robin.
      if (++pTask->candidateIdx >= candidateNum) {
        pTask->candidateIdx = 0;
      }
    }
  }

  SCH_TASK_DLOG("switch task candiateIdx to %d/%d", pTask->candidateIdx, candidateNum);

  return TSDB_CODE_SUCCESS;
}
933

934
/**
 * Remove the task from the job's execTasks hash.
 *
 * A failed removal is only logged as a warning.
 *
 * @param pJob   job owning the execTasks hash
 * @param pTask  task to remove (keyed by its taskId)
 * @return always TSDB_CODE_SUCCESS
 */
int32_t schRemoveTaskFromExecList(SSchJob *pJob, SSchTask *pTask) {
  int32_t code = taosHashRemove(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId));

  if (0 != code) {
    SCH_TASK_WLOG("task already not in execTask list, code:%x", code);
  }

  return TSDB_CODE_SUCCESS;
}
942

943
/**
 * Send a TDMT_SCH_DROP_TASK message to every exec node of the task that has a
 * handle.
 *
 * Send failures are ignored so that the remaining nodes are still processed.
 *
 * @param pJob   job owning the task
 * @param pTask  task whose execNodes hash is walked
 */
void schDropTaskOnExecNode(SSchJob *pJob, SSchTask *pTask) {
  if (NULL == pTask->execNodes) {
    SCH_TASK_DLOG("no exec address, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
    return;
  }

  int32_t nodeNum = (int32_t)taosHashGetSize(pTask->execNodes);
  if (nodeNum <= 0) {
    SCH_TASK_DLOG("task has no execNodes, no need to drop it, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
    return;
  }

  int32_t nodeIdx = 0;
  for (SSchNodeInfo *pNode = taosHashIterate(pTask->execNodes, NULL); NULL != pNode;
       pNode = taosHashIterate(pTask->execNodes, pNode), ++nodeIdx) {
    if (NULL == pNode->handle) {
      SCH_TASK_DLOG("no need to drop task %dth execNode", nodeIdx);
      continue;
    }

    SCH_SET_TASK_HANDLE(pTask, pNode->handle);
    // The hash key of the entry is passed along as the message parameter.
    void *pExecId = taosHashGetKey(pNode, NULL);
    (void)schBuildAndSendMsg(pJob, pTask, &pNode->addr, TDMT_SCH_DROP_TASK, pExecId);  // ignore error and continue

    SCH_TASK_DLOG("start to drop task's %dth execNode", nodeIdx);
  }

  SCH_TASK_DLOG("task has been dropped on %d exec nodes", nodeNum);
}
974

975
/**
 * Send a TDMT_SCH_TASK_NOTIFY message of the given type to every exec node of
 * the task that has a handle.
 *
 * Unlike the drop path, the first send failure aborts the walk and is
 * returned. The hash iteration is cancelled before that early return so the
 * iterator does not keep its hold on the current entry.
 *
 * @param pJob   job owning the task
 * @param pTask  task whose execNodes hash is walked
 * @param type   notify type passed to the exec nodes
 * @return TSDB_CODE_SUCCESS (also when there is no exec node),
 *         or the error returned by schBuildAndSendMsg()
 */
int32_t schNotifyTaskOnExecNode(SSchJob *pJob, SSchTask *pTask, ETaskNotifyType type) {
  int32_t size = (int32_t)taosHashGetSize(pTask->execNodes);
  if (size <= 0) {
    SCH_TASK_DLOG("task no exec address to notify, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
    return TSDB_CODE_SUCCESS;
  }

  int32_t       i = 0;
  SSchNodeInfo *nodeInfo = taosHashIterate(pTask->execNodes, NULL);
  while (nodeInfo) {
    if (nodeInfo->handle) {
      SCH_SET_TASK_HANDLE(pTask, nodeInfo->handle);

      int32_t code = schBuildAndSendMsg(pJob, pTask, &nodeInfo->addr, TDMT_SCH_TASK_NOTIFY, &type);
      if (TSDB_CODE_SUCCESS != code) {
        // Leaving the loop before the iterator is exhausted: release it explicitly.
        taosHashCancelIterate(pTask->execNodes, nodeInfo);
        SCH_ERR_RET(code);
      }

      SCH_TASK_DLOG("start to notify %d to task's %dth execNode", type, i);
    } else {
      SCH_TASK_DLOG("no need to notify %d to task %dth execNode", type, i);
    }

    ++i;
    nodeInfo = taosHashIterate(pTask->execNodes, nodeInfo);
  }

  SCH_TASK_DLOG("task has been notified %d on %d exec nodes", type, size);
  return TSDB_CODE_SUCCESS;
}
1000

1001
/**
 * Process the task status list carried by a heartbeat response.
 *
 * Each status whose job/task can be acquired through schProcessOnCbBegin() is
 * checked: a task reported in INIT status with a matching execId is
 * rescheduled; every other case only closes the callback with code 0.
 *
 * @param pEpId        endpoint the response came from (used for logging)
 * @param pStatusList  array of STaskStatus entries
 * @return always TSDB_CODE_SUCCESS
 */
int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId *pEpId, SArray *pStatusList) {
  SSchJob  *pJob = NULL;
  SSchTask *pTask = NULL;
  int32_t   statusNum = (int32_t)taosArrayGetSize(pStatusList);

  qDebug("%d task status in hb rsp from nodeId:%d, fqdn:%s, port:%d", statusNum, pEpId->nodeId, pEpId->ep.fqdn,
         pEpId->ep.port);

  for (int32_t idx = 0; idx < statusNum; ++idx) {
    STaskStatus *pStatus = taosArrayGet(pStatusList, idx);
    if (NULL == pStatus) {
      qError("fail to get the %dth task status in hb rsp, taskNum:%d", idx, statusNum);
      continue;
    }

    qDebug("QID:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d task status in server: %s", pStatus->queryId,
           pStatus->clientId, pStatus->taskId, pStatus->execId, jobTaskStatusStr(pStatus->status));

    if (schProcessOnCbBegin(&pJob, &pTask, pStatus->queryId, pStatus->refId, pStatus->taskId)) {
      continue;
    }

    int32_t code = 0;

    if (pStatus->execId != pTask->execId) {
      // TODO
      SCH_TASK_DLOG("execId %d mis-match current execId %d", pStatus->execId, pTask->execId);
    } else if (JOB_TASK_STATUS_INIT == pStatus->status) {
      code = schRescheduleTask(pJob, pTask);
    }
    // NOTE: a reported JOB_TASK_STATUS_FAIL is not recorded or handled yet;
    // it ends the callback with code 0 like any other status.

    schProcessOnCbEnd(pJob, pTask, code);
  }

  return TSDB_CODE_SUCCESS;
}
1047

1048
/**
 * Feed every local explain response in the array to its task, then release
 * the array.
 *
 * For each entry the job is acquired by refId, the task is looked up in the
 * job, and schProcessExplainRsp() is called. After a successful call the
 * entry's plan count and subplanInfo are cleared, so the cleanup below does
 * not free them again. Processing stops at the first error.
 *
 * The array and any remaining response content are always freed here.
 *
 * @param pExplainRes  array of SExplainLocalRsp; destroyed before returning
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_QRY_JOB_NOT_EXIST when a job cannot be
 *         acquired, TSDB_CODE_SCH_IGNORE_ERROR when the job needs to stop, or
 *         the error from schGetTaskInJob()/schProcessExplainRsp()
 */
int32_t schHandleExplainRes(SArray *pExplainRes) {
  int32_t   code = 0;
  SSchTask *pTask = NULL;
  SSchJob  *pJob = NULL;
  int32_t   resNum = taosArrayGetSize(pExplainRes);

  // With resNum <= 0 the loop body never runs and control falls into the cleanup.
  for (int32_t idx = 0; idx < resNum; ++idx) {
    SExplainLocalRsp *pLocal = taosArrayGet(pExplainRes, idx);
    if (NULL == pLocal) {
      qError("fail to get the %dth LOCAL explain rsp msg, total:%d", idx, resNum);
      continue;
    }

    qDebug("QID:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ", begin to handle LOCAL explain rsp msg",
           pLocal->qId, pLocal->cId, pLocal->tId);

    pJob = NULL;
    (void)schAcquireJob(pLocal->rId, &pJob);
    if (NULL == pJob) {
      qWarn("QID:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 "job no exist, may be dropped, refId:0x%" PRIx64,
            pLocal->qId, pLocal->cId, pLocal->tId, pLocal->rId);
      SCH_ERR_JRET(TSDB_CODE_QRY_JOB_NOT_EXIST);
    }

    int8_t jobStatus = 0;
    if (schJobNeedToStop(pJob, &jobStatus)) {
      SCH_TASK_DLOG("will not do further processing cause of job status %s", jobTaskStatusStr(jobStatus));
      (void)schReleaseJob(pJob->refId);
      SCH_ERR_JRET(TSDB_CODE_SCH_IGNORE_ERROR);
    }

    code = schGetTaskInJob(pJob, pLocal->tId, &pTask);
    if (TSDB_CODE_SUCCESS == code) {
      code = schProcessExplainRsp(pJob, pTask, &pLocal->rsp);
    }

    (void)schReleaseJob(pJob->refId);

    qDebug("QID:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ", end to handle LOCAL explain rsp msg, code:%x",
           pLocal->qId, pLocal->cId, pLocal->tId, code);

    SCH_ERR_JRET(code);

    // Cleared after successful processing so tFreeSExplainRsp() below skips them.
    pLocal->rsp.numOfPlans = 0;
    pLocal->rsp.subplanInfo = NULL;
    pTask = NULL;
  }

_return:

  for (int32_t idx = 0; idx < resNum; ++idx) {
    SExplainLocalRsp *pLocal = taosArrayGet(pExplainRes, idx);
    if (NULL == pLocal) {
      qError("in _return fail to get the %dth LOCAL explain rsp msg, total:%d", idx, resNum);
      continue;
    }

    tFreeSExplainRsp(&pLocal->rsp);
  }

  taosArrayDestroy(pExplainRes);

  SCH_RET(code);
}
1117

1118
/**
 * Launch a task on a remote node.
 *
 * The subplan is serialized into pTask->msg on first use (under the plan
 * write lock), the candidate addresses are set up, a heartbeat connection is
 * ensured for query jobs, and the plan message is sent.
 *
 * @param pJob   job owning the task
 * @param pTask  task to launch
 * @return TSDB_CODE_SUCCESS or the first error code reported by a callee
 */
int32_t schLaunchRemoteTask(SSchJob *pJob, SSchTask *pTask) {
  int32_t   code = 0;
  SSubplan *pPlan = pTask->plan;

  if (NULL == pTask->msg) {  // TODO add more detailed reason for failure
    SCH_LOCK(SCH_WRITE, &pTask->planLock);
    code = qSubPlanToMsg(pPlan, &pTask->msg, &pTask->msgLen);
    SCH_UNLOCK(SCH_WRITE, &pTask->planLock);

    if (TSDB_CODE_SUCCESS != code) {
      SCH_TASK_ELOG("failed to create physical plan, code:%s, msg:%p, len:%d", tstrerror(code), pTask->msg,
                    pTask->msgLen);
      SCH_ERR_RET(code);
    }

    if (tsQueryPlannerTrace) {
      // Trace mode: additionally dump the plan as a string into the log.
      char   *planStr = NULL;
      int32_t planStrLen = 0;
      SCH_ERR_RET(qSubPlanToString(pPlan, &planStr, &planStrLen));
      SCH_TASK_DLOGL("physical plan len:%d, %s", planStrLen, planStr);
      taosMemoryFree(planStr);
    }
  }

  SCH_ERR_RET(schSetTaskCandidateAddrs(pJob, pTask));

  if (SCH_IS_QUERY_JOB(pJob)) {
    SCH_ERR_RET(schEnsureHbConnection(pJob, pTask));
  }

  SCH_RET(schBuildAndSendMsg(pJob, pTask, NULL, pPlan->msgType, NULL));
}
1148

1149
/**
 * Execute a task locally through the client-side query worker.
 *
 * The shared query worker (schMgmt.queryMgmt) is created on first use; when
 * the compare-and-exchange finds another worker already installed, the one
 * created here is destroyed. For explain jobs the local explain responses are
 * collected and handed to schHandleExplainRes().
 *
 * @param pJob   job owning the task
 * @param pTask  task to execute
 * @return TSDB_CODE_SUCCESS or the first error code reported by a callee
 */
int32_t schLaunchLocalTask(SSchJob *pJob, SSchTask *pTask) {
  if (NULL == schMgmt.queryMgmt) {
    void *pWorker = NULL;
    SCH_ERR_RET(qWorkerInit(NODE_TYPE_CLIENT, CLIENT_HANDLE, &pWorker, NULL));
    if (atomic_val_compare_exchange_ptr(&schMgmt.queryMgmt, NULL, pWorker)) {
      // The previous value was not NULL: a worker is already installed.
      qWorkerDestroy(&pWorker);
    }
  }

  int32_t code = 0;
  SArray *explainRes = NULL;
  SQWMsg  localMsg = {0};

  localMsg.msg = pTask->plan;
  localMsg.msgType = pTask->plan->msgType;
  localMsg.msgInfo.taskType = TASK_TYPE_TEMP;
  localMsg.msgInfo.explain = SCH_IS_EXPLAIN_JOB(pJob);
  localMsg.msgInfo.needFetch = SCH_TASK_NEED_FETCH(pTask);
  localMsg.connInfo.handle = pJob->conn.pTrans;
  localMsg.pWorkerCb = pJob->pWorkerCb;

  if (SCH_IS_EXPLAIN_JOB(pJob)) {
    explainRes = taosArrayInit(pJob->taskNum, sizeof(SExplainLocalRsp));
    if (NULL == explainRes) {
      SCH_ERR_RET(terrno);
    }
  }

  SCH_ERR_JRET(qWorkerProcessLocalQuery(schMgmt.queryMgmt, pJob->seriousId, pJob->queryId, pTask->clientId, pTask->taskId,
                                        pJob->refId, pTask->execId, &localMsg, explainRes));

  if (SCH_IS_EXPLAIN_JOB(pJob)) {
    // schHandleExplainRes() destroys the array itself, so the local pointer
    // must not be destroyed again at the cleanup label.
    SCH_ERR_RET(schHandleExplainRes(explainRes));
    explainRes = NULL;
  }

  SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));

_return:

  taosArrayDestroy(explainRes);

  SCH_RET(code);
}
1193

1194
int32_t schLaunchTaskImpl(void *param) {
10,979,244✔
1195
  SSchTaskCtx *pCtx = (SSchTaskCtx *)param;
10,979,244✔
1196
  SSchJob     *pJob = NULL;
10,979,244✔
1197

1198
  (void)schAcquireJob(pCtx->jobRid, &pJob);
10,979,244✔
1199
  if (NULL == pJob) {
11,041,542!
1200
    qDebug("job refId 0x%" PRIx64 " already not exist", pCtx->jobRid);
×
1201
    taosMemoryFree(param);
×
1202
    SCH_RET(TSDB_CODE_SCH_JOB_IS_DROPPING);
×
1203
  }
1204

1205
  SSchTask *pTask = pCtx->pTask;
11,041,542✔
1206

1207
  if (pCtx->asyncLaunch) {
11,041,542✔
1208
    SCH_LOCK_TASK(pTask);
1,203,099!
1209
  }
1210

1211
  pTask->execId++;
11,041,037✔
1212
  pTask->retryTimes++;
11,041,037✔
1213
  pTask->waitRetry = false;
11,041,037✔
1214

1215
  int8_t  status = 0;
11,041,037✔
1216
  int32_t code = 0;
11,041,037✔
1217

1218
  if (atomic_load_64(&pTask->seriousId) < atomic_load_64(&pJob->seriousId)) {
11,041,037!
1219
    SCH_TASK_DLOG("task seriousId:0x%" PRIx64 " is smaller than job seriousId:0x%" PRIx64 ", skip launch",
×
1220
                  pTask->seriousId, pJob->seriousId);
1221
    goto _return;
×
1222
  }
1223

1224
  (void)atomic_add_fetch_32(&pTask->level->taskLaunchedNum, 1);
10,951,268✔
1225

1226
  SCH_TASK_DLOG("start to launch %s task, execId %d, retry %d",
11,053,089!
1227
                SCH_IS_LOCAL_EXEC_TASK(pJob, pTask) ? "LOCAL" : "REMOTE", pTask->execId, pTask->retryTimes);
1228

1229
  SCH_LOG_TASK_START_TS(pTask);
22,021,873!
1230

1231
  if (schJobNeedToStop(pJob, &status)) {
11,011,203!
1232
    SCH_TASK_DLOG("no need to launch task cause of job status %s", jobTaskStatusStr(status));
×
1233
    SCH_ERR_JRET(TSDB_CODE_SCH_IGNORE_ERROR);
×
1234
  }
1235

1236
  if (SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_EXEC) {
11,023,907!
1237
    SCH_ERR_JRET(schPushTaskToExecList(pJob, pTask));
10,956,148!
1238
    SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_EXEC);
11,006,821✔
1239
  }
1240

1241
  if (SCH_IS_LOCAL_EXEC_TASK(pJob, pTask)) {
11,044,506✔
1242
    SCH_ERR_JRET(schLaunchLocalTask(pJob, pTask));
71,272!
1243
  } else {
1244
    SCH_ERR_JRET(schLaunchRemoteTask(pJob, pTask));
10,973,234!
1245
  }
1246

1247
#if 0
1248
  if (SCH_IS_QUERY_JOB(pJob)) {
1249
    SCH_ERR_JRET(schEnsureHbConnection(pJob, pTask));
1250
  }
1251
#endif
1252

1253
_return:
10,971,220✔
1254

1255
  if (pJob->taskNum >= SCH_MIN_AYSNC_EXEC_NUM) {
11,042,492✔
1256
    if (code) {
1,202,465✔
1257
      code = schProcessOnTaskFailure(pJob, pTask, code);
5✔
1258
    }
1259
    if (code) {
1,202,465✔
1260
      code = schHandleJobFailure(pJob, code);
5✔
1261
    }
1262
  }
1263

1264
  if (pCtx->asyncLaunch) {
11,042,492✔
1265
    SCH_UNLOCK_TASK(pTask);
1,202,905!
1266
  }
1267

1268
  (void)schReleaseJob(pJob->refId);
11,042,394✔
1269

1270
  taosMemoryFree(param);
11,042,885!
1271

1272
  SCH_RET(code);
11,047,305!
1273
}
1274

1275
/**
 * Launch a task through schLaunchTaskImpl(), asynchronously when the job has
 * at least SCH_MIN_AYSNC_EXEC_NUM tasks and inline otherwise.
 *
 * A launch context is allocated for the call. schLaunchTaskImpl() frees the
 * context whenever it runs; if the async submission itself fails the callback
 * never runs, so the context is freed here to avoid leaking it.
 *
 * @param pJob   job owning the task
 * @param pTask  task to launch
 * @return TSDB_CODE_SUCCESS, terrno when the context cannot be allocated, or
 *         the error from taosAsyncExec()/schLaunchTaskImpl()
 */
int32_t schAsyncLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) {
  SSchTaskCtx *param = taosMemoryCalloc(1, sizeof(SSchTaskCtx));
  if (NULL == param) {
    SCH_ERR_RET(terrno);
  }

  param->jobRid = pJob->refId;
  param->pTask = pTask;

  if (pJob->taskNum >= SCH_MIN_AYSNC_EXEC_NUM) {
    param->asyncLaunch = true;

    int32_t code = taosAsyncExec(schLaunchTaskImpl, param, NULL);
    if (TSDB_CODE_SUCCESS != code) {
      // The callback was not scheduled, so nobody else will release the context.
      taosMemoryFree(param);
      SCH_ERR_RET(code);
    }
  } else {
    // Inline launch: schLaunchTaskImpl() frees the context itself.
    SCH_ERR_RET(schLaunchTaskImpl(param));
  }

  return TSDB_CODE_SUCCESS;
}
1293

1294
// Note: no more error processing, handled in function internal
1295
int32_t schLaunchTask(SSchJob *pJob, SSchTask *pTask) {
10,955,975✔
1296
  bool    enough = false;
10,955,975✔
1297
  int32_t code = 0;
10,955,975✔
1298

1299
  SCH_SET_TASK_HANDLE(pTask, NULL);
10,955,975✔
1300

1301
  if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) {
10,955,975!
1302
    SCH_ERR_JRET(schCheckIncTaskFlowQuota(pJob, pTask, &enough));
20!
1303

1304
    if (enough) {
20!
1305
      SCH_ERR_JRET(schAsyncLaunchTaskImpl(pJob, pTask));
20!
1306
    }
1307
  } else {
1308
    SCH_ERR_JRET(schAsyncLaunchTaskImpl(pJob, pTask));
10,955,955!
1309
  }
1310

1311
  return TSDB_CODE_SUCCESS;
11,031,497✔
1312

1313
_return:
×
1314

1315
  SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
×
1316
}
1317

1318
void schHandleTimerEvent(void *param, void *tmrId) {
1,872✔
1319
  SSchTimerParam *pTimerParam = (SSchTimerParam *)param;
1,872✔
1320
  SSchTask       *pTask = NULL;
1,872✔
1321
  SSchJob        *pJob = NULL;
1,872✔
1322
  int32_t         code = 0;
1,872✔
1323

1324
  qDebug("delayTimer %" PRIuPTR " is launched", (uintptr_t)tmrId);
1,872!
1325

1326
  int64_t  rId = pTimerParam->rId;
1,872✔
1327
  uint64_t queryId = pTimerParam->queryId;
1,872✔
1328
  uint64_t taskId = pTimerParam->taskId;
1,872✔
1329

1330
  if (schProcessOnCbBegin(&pJob, &pTask, queryId, rId, taskId)) {
1,872!
1331
    return;
×
1332
  }
1333

1334
  if (0 == atomic_load_8(&pTask->delayLaunchPar.exit)) {
1,872!
1335
    code = schLaunchTask(pJob, pTask);
1,872✔
1336
  } else {
1337
    SCH_TASK_DLOG("task will not be launched since query job exiting, status: %d", pTask->status);
×
1338
  }
1339

1340
  schProcessOnCbEnd(pJob, pTask, code);
1,872✔
1341
}
1342

1343
/**
 * Launch a task immediately, or arm a timer when the task has a launch delay.
 *
 * With delayExecMs > 0 the task is pushed to the exec list, marked EXEC, and
 * a timer firing schHandleTimerEvent() is started (first time) or reset
 * (timer already exists). A failing reset is only logged.
 *
 * @param pJob   job owning the task
 * @param pTask  task to launch
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY when the timer cannot be
 *         started, or the error from schPushTaskToExecList()/schLaunchTask()
 */
int32_t schDelayLaunchTask(SSchJob *pJob, SSchTask *pTask) {
  if (pTask->delayExecMs <= 0) {
    SCH_RET(schLaunchTask(pJob, pTask));
  }

  // Ids the timer callback uses to find the job and task again.
  pTask->delayLaunchPar.rId = pJob->refId;
  pTask->delayLaunchPar.queryId = pJob->queryId;
  pTask->delayLaunchPar.taskId = pTask->taskId;

  SCH_ERR_RET(schPushTaskToExecList(pJob, pTask));
  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_EXEC);

  if (NULL != pTask->delayTimer) {
    if (taosTmrReset(schHandleTimerEvent, pTask->delayExecMs, (void *)&pTask->delayLaunchPar, schMgmt.timer, &pTask->delayTimer)) {
      SCH_TASK_ELOG("taosTmrReset delayExec timer failed, handle:%p", schMgmt.timer);
    }

    return TSDB_CODE_SUCCESS;
  }

  pTask->delayTimer = taosTmrStart(schHandleTimerEvent, pTask->delayExecMs, (void *)&pTask->delayLaunchPar, schMgmt.timer);
  if (NULL == pTask->delayTimer) {
    SCH_TASK_ELOG("start delay timer failed, handle:%p", schMgmt.timer);
    SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  SCH_TASK_DLOG("task delayTimer %" PRIuPTR " is started", (uintptr_t)pTask->delayTimer);

  return TSDB_CODE_SUCCESS;
}
1373

1374
/**
 * Launch every task of a level.
 *
 * After the flow-control check for the level, each task inherits the job's
 * current seriousId and is passed to schDelayLaunchTask(). The walk stops at
 * the first error.
 *
 * @param pJob   job owning the level
 * @param level  level whose subTasks are launched
 * @return TSDB_CODE_SUCCESS,
 *         TSDB_CODE_SCH_INTERNAL_ERROR when a task cannot be read from the
 *         level's task array, or the first error reported by a callee
 */
int32_t schLaunchLevelTasks(SSchJob *pJob, SSchLevel *level) {
  SCH_ERR_RET(schChkJobNeedFlowCtrl(pJob, level));

  for (int32_t i = 0; i < level->taskNum; ++i) {
    SSchTask *pTask = taosArrayGet(level->subTasks, i);
    if (NULL == pTask) {
      // taskNum and the array disagree; fail instead of dereferencing NULL.
      qError("fail to get the %dth task in level, taskNum:%d", i, (int32_t)level->taskNum);
      SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
    }

    pTask->seriousId = pJob->seriousId;

    SCH_TASK_DLOG("task seriousId set to 0x%" PRIx64, pTask->seriousId);

    SCH_ERR_RET(schDelayLaunchTask(pJob, pTask));
  }

  return TSDB_CODE_SUCCESS;
}
1388

1389
/**
 * Drop every task stored in the given hash on its exec nodes.
 *
 * Does nothing unless SCH_JOB_NEED_DROP() holds for the job. For each task a
 * pending delay timer is stopped first, then the drop is sent while the task
 * lock is held.
 *
 * @param pJob  job owning the tasks
 * @param list  hash whose values are SSchTask pointers
 */
void schDropTaskInHashList(SSchJob *pJob, SHashObj *list) {
  if (!SCH_JOB_NEED_DROP(pJob)) {
    return;
  }

  for (void *pEntry = taosHashIterate(list, NULL); NULL != pEntry; pEntry = taosHashIterate(list, pEntry)) {
    SSchTask *pTask = *(SSchTask **)pEntry;

    if (pTask->delayTimer) {
      schStopTaskDelayTimer(pJob, pTask, true);
    }

    SCH_LOCK_TASK(pTask);
    schDropTaskOnExecNode(pJob, pTask);
    SCH_UNLOCK_TASK(pTask);
  }
}
1409

1410
/**
 * Notify the current task and then every other task in the hash.
 *
 * The current task is notified first without taking its lock here; every
 * other task is notified while its task lock is held. The walk stops at the
 * first failure, and the hash iteration is cancelled before leaving the loop
 * early so the iterator does not keep its hold on the current entry.
 *
 * @param pJob       job owning the tasks
 * @param list       hash whose values are SSchTask pointers
 * @param type       notify type passed to the exec nodes
 * @param pCurrTask  task notified first and skipped during the walk
 * @return TSDB_CODE_SUCCESS or the first error from schNotifyTaskOnExecNode()
 */
int32_t schNotifyTaskInHashList(SSchJob *pJob, SHashObj *list, ETaskNotifyType type, SSchTask *pCurrTask) {
  int32_t code = TSDB_CODE_SUCCESS;

  SCH_ERR_RET(schNotifyTaskOnExecNode(pJob, pCurrTask, type));

  void *pIter = taosHashIterate(list, NULL);
  while (pIter) {
    SSchTask *pTask = *(SSchTask **)pIter;
    if (pTask != pCurrTask) {
      SCH_LOCK_TASK(pTask);
      code = schNotifyTaskOnExecNode(pJob, pTask, type);
      SCH_UNLOCK_TASK(pTask);

      if (TSDB_CODE_SUCCESS != code) {
        // Stopping before the iterator is exhausted: release it explicitly.
        taosHashCancelIterate(list, pIter);
        break;
      }
    }

    pIter = taosHashIterate(list, pIter);
  }

  SCH_RET(code);
}
1433

1434
/**
 * Send a fetch message for the job's fetch task to the job's result node.
 *
 * @param pJob   job whose fetchTask and resNode are used
 * @param pTask  not used by this function
 * @return result of schBuildAndSendMsg()
 */
int32_t schExecRemoteFetch(SSchJob *pJob, SSchTask *pTask) {
  SSchTask *pFetchTask = pJob->fetchTask;

  SCH_RET(schBuildAndSendMsg(pJob, pFetchTask, &pJob->resNode, SCH_FETCH_TYPE(pFetchTask), NULL));
}
1437

1438
/**
 * Fetch the result of a locally executed task through the query worker.
 *
 * For explain jobs the local explain responses are collected and handed to
 * schHandleExplainRes(). The fetched response is then processed by
 * schProcessFetchRsp().
 *
 * @param pJob   job owning the task
 * @param pTask  task to fetch from
 * @return TSDB_CODE_SUCCESS or the first error code reported by a callee
 */
int32_t schExecLocalFetch(SSchJob *pJob, SSchTask *pTask) {
  int32_t code = 0;
  void   *pFetchRsp = NULL;
  SArray *explainRes = NULL;

  if (SCH_IS_EXPLAIN_JOB(pJob)) {
    explainRes = taosArrayInit(pJob->taskNum, sizeof(SExplainLocalRsp));
    if (NULL == explainRes) {
      SCH_ERR_RET(terrno);
    }
  }

  SCH_ERR_JRET(qWorkerProcessLocalFetch(schMgmt.queryMgmt, pJob->seriousId, pJob->queryId, pTask->clientId, pTask->taskId,
                                        pJob->refId, pTask->execId, &pFetchRsp, explainRes));

  if (SCH_IS_EXPLAIN_JOB(pJob)) {
    // schHandleExplainRes() destroys the array itself, so the local pointer
    // must not be destroyed again at the cleanup label.
    SCH_ERR_RET(schHandleExplainRes(explainRes));
    explainRes = NULL;
  }

  SCH_ERR_RET(schProcessFetchRsp(pJob, pTask, pFetchRsp, TSDB_CODE_SUCCESS));

_return:

  taosArrayDestroy(explainRes);

  SCH_RET(code);
}
1466

1467
// Note: no more error processing, handled in function internal
1468
int32_t schLaunchFetchTask(SSchJob *pJob) {
613,316✔
1469
  int32_t code = 0;
613,316✔
1470

1471
  void *fetchRes = atomic_load_ptr(&pJob->fetchRes);
613,316✔
1472
  if (fetchRes) {
613,317!
1473
    SCH_JOB_DLOG("res already fetched, res:%p", fetchRes);
×
1474
    return TSDB_CODE_SUCCESS;
×
1475
  }
1476

1477
  SCH_SET_TASK_STATUS(pJob->fetchTask, JOB_TASK_STATUS_FETCH);
613,317✔
1478

1479
  if (SCH_IS_LOCAL_EXEC_TASK(pJob, pJob->fetchTask)) {
613,320!
1480
    SCH_ERR_JRET(schExecLocalFetch(pJob, pJob->fetchTask));
73,608!
1481
  } else {
1482
    SCH_ERR_JRET(schExecRemoteFetch(pJob, pJob->fetchTask));
539,712!
1483
  }
1484

1485
  return TSDB_CODE_SUCCESS;
613,317✔
1486

1487
_return:
×
1488

1489
  SCH_RET(schProcessOnTaskFailure(pJob, pJob->fetchTask, code));
×
1490
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc