• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4308

14 Jun 2025 02:06PM UTC coverage: 62.454% (-0.3%) from 62.777%
#4308

push

travis-ci

web-flow
fix: taosdump windows pthread_mutex_unlock crash(3.0) (#31357)

* fix: windows pthread_mutex_unlock crash

* enh: sync from main fix taosdump crash windows

* fix: restore .github action branch to main

153985 of 315105 branches covered (48.87%)

Branch coverage included in aggregate %.

238120 of 312727 relevant lines covered (76.14%)

6462519.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

59.74
/source/libs/scheduler/src/schTask.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "catalog.h"
17
#include "command.h"
18
#include "query.h"
19
#include "qworker.h"
20
#include "schInt.h"
21
#include "tglobal.h"
22
#include "tmsg.h"
23
#include "tref.h"
24
#include "trpc.h"
25

26
/**
 * Release the resources held by a task.
 *
 * Deregisters the task heartbeat, then destroys the candidate address array,
 * the pending message buffer, the children/parents arrays, the exec-node hash
 * and the exec-time profile array. The SSchTask object itself is not freed.
 *
 * @param pJob  job the task belongs to
 * @param pTask task whose members are released
 */
void schFreeTask(SSchJob *pJob, SSchTask *pTask) {
  schDeregisterTaskHb(pJob, pTask);

  if (NULL != pTask->candidateAddrs) {
    taosArrayDestroy(pTask->candidateAddrs);
  }

  taosMemoryFreeClear(pTask->msg);

  if (NULL != pTask->children) {
    taosArrayDestroy(pTask->children);
  }

  if (NULL != pTask->parents) {
    taosArrayDestroy(pTask->parents);
  }

  if (NULL != pTask->execNodes) {
    taosHashCleanup(pTask->execNodes);
  }

  // destroyed without a guard, exactly as before
  taosArrayDestroy(pTask->profile.execTime);
}
4,384,368✔
49

50
/**
 * Compute the retry and execution limits of a task.
 *
 * The retry limit is SCH_DEFAULT_MAX_RETRY_NUM unless the task is not data
 * bound, the job is a query job and the scheduling policy is SCH_ALL; in that
 * case it is the larger of the default and the job's node list size.
 * maxExecTimes is the retry limit multiplied by (level index + 1).
 */
void schInitTaskRetryTimes(SSchJob *pJob, SSchTask *pTask, SSchLevel *pLevel) {
  int32_t retryLimit = SCH_DEFAULT_MAX_RETRY_NUM;

  if (!(SCH_IS_DATA_BIND_TASK(pTask) || (!SCH_IS_QUERY_JOB(pJob)) || (SCH_ALL != schMgmt.cfg.schPolicy))) {
    int32_t nodeNum = taosArrayGetSize(pJob->nodeList);
    retryLimit = TMAX(nodeNum, SCH_DEFAULT_MAX_RETRY_NUM);
  }

  pTask->maxRetryTimes = retryLimit;
  pTask->maxExecTimes = pTask->maxRetryTimes * (pLevel->level + 1);
}
4,383,899✔
60

61
/**
 * Initialize a task for the given job, subplan and level.
 *
 * Fills in the identity fields (seriousId, clientId, taskId), resets the
 * exec/failed ids, computes the retry limits and allocates the exec-node hash
 * and the exec-time profile array, both sized by maxExecTimes.
 *
 * On failure both containers are destroyed AND their pointers are reset to
 * NULL, so that a later schFreeTask() on the same task does not destroy them
 * a second time.
 *
 * @return TSDB_CODE_SUCCESS, or the terrno reported by the failed allocation.
 */
int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel) {
  int32_t code = 0;

  pTask->plan = pPlan;
  pTask->level = pLevel;
  pTask->seriousId = pJob->seriousId;
  pTask->execId = -1;
  pTask->failedExecId = -2;
  pTask->failedSeriousId = 0;
  pTask->timeoutUsec = SCH_DEFAULT_TASK_TIMEOUT_USEC;
  pTask->clientId = getClientId();
  pTask->taskId = schGenTaskId();

  schInitTaskRetryTimes(pJob, pTask, pLevel);

  pTask->execNodes =
      taosHashInit(pTask->maxExecTimes, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
  pTask->profile.execTime = taosArrayInit(pTask->maxExecTimes, sizeof(int64_t));
  if (NULL == pTask->execNodes || NULL == pTask->profile.execTime) {
    SCH_ERR_JRET(terrno);
  }

  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT);

  SCH_TASK_TLOG("task initialized, max times %d:%d", pTask->maxRetryTimes, pTask->maxExecTimes);

  return TSDB_CODE_SUCCESS;

_return:

  // Drop whichever container was allocated and clear the pointers so the
  // task never carries a dangling reference out of this function.
  taosArrayDestroy(pTask->profile.execTime);
  pTask->profile.execTime = NULL;
  taosHashCleanup(pTask->execNodes);
  pTask->execNodes = NULL;

  SCH_RET(code);
}
96

97
/**
 * Remember the node address on which the task succeeded.
 *
 * Locally executed tasks are skipped. Otherwise the candidate address at
 * candidateIdx is copied into pTask->succeedAddr.
 *
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_SCH_INTERNAL_ERROR when the
 *         candidate address cannot be fetched.
 */
int32_t schRecordTaskSucceedNode(SSchJob *pJob, SSchTask *pTask) {
  if (SCH_IS_LOCAL_EXEC_TASK(pJob, pTask)) {
    return TSDB_CODE_SUCCESS;
  }

  SQueryNodeAddr *pSucceed = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
  if (NULL != pSucceed) {
    pTask->succeedAddr = *pSucceed;
    return TSDB_CODE_SUCCESS;
  }

  SCH_TASK_ELOG("taosArrayGet candidate addr failed, idx:%d, size:%d", pTask->candidateIdx,
                (int32_t)taosArrayGetSize(pTask->candidateAddrs));
  SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);

  // only reached if SCH_ERR_RET did not return; the original fell through to the copy here
  pTask->succeedAddr = *pSucceed;

  return TSDB_CODE_SUCCESS;
}
113

114
/**
 * Register the node used for one execution attempt of the task.
 *
 * Stores {addr, current task handle} in pTask->execNodes keyed by execId.
 *
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when the hash insert
 *         fails.
 */
int32_t schAppendTaskExecNode(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, int32_t execId) {
  SSchNodeInfo nodeInfo = {.addr = *addr, .handle = SCH_GET_TASK_HANDLE(pTask)};

  int32_t putRes = taosHashPut(pTask->execNodes, &execId, sizeof(execId), &nodeInfo, sizeof(nodeInfo));
  if (0 != putRes) {
    SCH_TASK_ELOG("taosHashPut nodeInfo to execNodes failed, errno:%d", ERRNO);
    SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  SCH_TASK_DLOG("task execNode added, execId:%d, handle:%p", execId, nodeInfo.handle);

  return TSDB_CODE_SUCCESS;
}
126

127
/**
 * Remove the exec-node entry recorded for execId.
 *
 * @return TSDB_CODE_SUCCESS when there is no exec-node hash, or when the entry
 *         was removed and execId is the task's current execId with no retry
 *         pending; TSDB_CODE_SCH_IGNORE_ERROR when the entry was not present
 *         or the execId is no longer current / the task is waiting for retry.
 */
int32_t schDropTaskExecNode(SSchJob *pJob, SSchTask *pTask, void *handle, int32_t execId) {
  if (NULL == pTask->execNodes) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t rmRes = taosHashRemove(pTask->execNodes, &execId, sizeof(execId));
  if (0 != rmRes) {
    SCH_TASK_DLOG("execId %d already not in execNodeList", execId);
    SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR);
  } else {
    SCH_TASK_DLOG("execId %d removed from execNodeList", execId);
  }

  bool stale = (execId != pTask->execId) || pTask->waitRetry;
  if (stale) {  // ignore it
    SCH_TASK_DLOG("execId %d is already not current execId %d, waitRetry %d", execId, pTask->execId, pTask->waitRetry);
    SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR);
  }

  return TSDB_CODE_SUCCESS;
}
146

147
/**
 * Update the handle stored in the exec-node entry of execId.
 *
 * Nothing is done when the exec-node hash is empty or holds no entry for
 * execId. Always returns TSDB_CODE_SUCCESS.
 */
int32_t schUpdateTaskExecNode(SSchJob *pJob, SSchTask *pTask, void *handle, int32_t execId) {
  if (taosHashGetSize(pTask->execNodes) <= 0) {
    return TSDB_CODE_SUCCESS;
  }

  SSchNodeInfo *pInfo = taosHashGet(pTask->execNodes, &execId, sizeof(execId));
  if (NULL != pInfo) {
    pInfo->handle = handle;
    SCH_TASK_DLOG("handle updated to %p for execId %d", handle, execId);
  } else {  // ignore it
    SCH_TASK_DLOG("handle not updated since execId %d already not exist, current execId %d, waitRetry %d", execId,
                  pTask->execId, pTask->waitRetry);
  }

  return TSDB_CODE_SUCCESS;
}
165

166
/**
 * Apply a handle reported for (seriousId, execId) to the task.
 *
 * With dropExecNode set, the exec-node entry is dropped and that result is
 * returned. Otherwise the exec-node entry is updated first; the task-level
 * handle is only replaced when both ids match the task's current ids, are
 * greater than the last failed ids, and no retry is pending.
 *
 * @return TSDB_CODE_SUCCESS when the task handle was set,
 *         TSDB_CODE_SCH_IGNORE_ERROR when the report is stale.
 */
int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, bool dropExecNode, void *handle, uint64_t seriousId, int32_t execId) {
  if (dropExecNode) {
    SCH_RET(schDropTaskExecNode(pJob, pTask, handle, execId));
  }

  SCH_ERR_RET(schUpdateTaskExecNode(pJob, pTask, handle, execId));

  bool staleSerious = (seriousId != pTask->seriousId || seriousId <= pTask->failedSeriousId);
  bool staleExec = (execId != pTask->execId || execId <= pTask->failedExecId);

  if (staleSerious || staleExec || pTask->waitRetry) {  // ignore it
    SCH_TASK_DLOG("handle not updated since seriousId:0x%" PRIx64 " or execId:%d is not lastest,"
                  "current seriousId:0x%" PRIx64 " execId %d, failedSeriousId:0x%" PRIx64 " failedExecId:%d, waitRetry %d", 
                  seriousId, execId, pTask->seriousId, pTask->execId, pTask->failedSeriousId, pTask->failedExecId, pTask->waitRetry);
    SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR);
  }

  SCH_SET_TASK_HANDLE(pTask, handle);

  return TSDB_CODE_SUCCESS;
}
185

186
/**
 * Handle the failure of one task execution.
 *
 * Steps, in order:
 *  - TSDB_CODE_SCH_IGNORE_ERROR is passed straight back;
 *  - the current execId is recorded as the failed execId;
 *  - if the job needs to stop, the failure is ignored;
 *  - a task already in FAIL/SUCC status is a status error;
 *  - the wait/end timestamp is logged depending on whether this is a timeout;
 *  - schTaskCheckSetRetry() decides whether to retry; a retry is handed to
 *    schHandleTaskRetry();
 *  - otherwise the task is marked failed and, for jobs that wait for all
 *    tasks, the level counters are updated under the level lock.
 *
 * @return TSDB_CODE_SUCCESS when a retry was scheduled,
 *         TSDB_CODE_SCH_IGNORE_ERROR when the failure must not be propagated
 *         yet, otherwise the error code to report for the job.
 */
int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode) {
  if (TSDB_CODE_SCH_IGNORE_ERROR == errCode) {
    return TSDB_CODE_SCH_IGNORE_ERROR;
  }

  pTask->failedExecId = pTask->execId;

  int8_t jobStatus = 0;
  if (schJobNeedToStop(pJob, &jobStatus)) {
    SCH_TASK_DLOG("no more task failure processing cause of job status %s", jobTaskStatusStr(jobStatus));
    SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR);
  }

  int8_t taskStatus = SCH_GET_TASK_STATUS(pTask);
  if (taskStatus == JOB_TASK_STATUS_FAIL || taskStatus == JOB_TASK_STATUS_SUCC) {
    SCH_TASK_ELOG("task already done, status:%s", jobTaskStatusStr(taskStatus));
    SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
  }

  if (errCode == TSDB_CODE_SCH_TIMEOUT_ERROR) {
    SCH_LOG_TASK_WAIT_TS(pTask);
  } else {
    SCH_LOG_TASK_END_TS(pTask);
  }

  bool needRetry = false;

  SCH_TASK_DLOG("taskOnFailure, code:%s", tstrerror(errCode));

  SCH_ERR_RET(schTaskCheckSetRetry(pJob, pTask, errCode, &needRetry));

  if (needRetry) {
    SCH_ERR_RET(schHandleTaskRetry(pJob, pTask));

    return TSDB_CODE_SUCCESS;
  }

  SCH_TASK_ELOG("task failed and no more retry, code:%s", tstrerror(errCode));

  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_FAIL);

  if (SCH_JOB_NEED_WAIT(pJob)) {
    int32_t taskDone = 0;

    // the succeed/failed counters of a level are updated under its lock
    SCH_LOCK(SCH_WRITE, &pTask->level->lock);
    pTask->level->taskFailed++;
    taskDone = pTask->level->taskSucceed + pTask->level->taskFailed;
    SCH_UNLOCK(SCH_WRITE, &pTask->level->lock);

    schUpdateJobErrCode(pJob, errCode);

    if (taskDone < pTask->level->taskNum) {
      SCH_TASK_DLOG("need to wait other tasks, doneNum:%d, allNum:%d", taskDone, pTask->level->taskNum);
      SCH_RET(TSDB_CODE_SCH_IGNORE_ERROR);
    }

    // last task of the level: report the job-level error code
    SCH_RET(atomic_load_32(&pJob->errCode));
  }

  SCH_RET(errCode);
}
247

248
/**
 * Handle the successful completion of one task execution.
 *
 * The task is marked PART_SUCC, its succeed node is recorded and tasks held
 * back by flow control are launched.
 *
 * A task without parents either waits for the remaining tasks of its level
 * (jobs that need to wait) or becomes the job's fetch task; in both cases the
 * job status is switched once appropriate.
 *
 * A task with parents publishes itself as a downstream source to every parent
 * plan (under the parent's plan lock), bumps the parent's ready counter and
 * launches each parent that became ready. When all tasks of the level have
 * finished executing, the lower level of the job is launched.
 *
 * @return TSDB_CODE_SUCCESS or the first error reported by a callee.
 */
int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
  int32_t code = 0;

  SCH_TASK_TLOG("taskOnSuccess, status:%s", SCH_GET_TASK_STATUS_STR(pTask));

  SCH_LOG_TASK_END_TS(pTask);

  int32_t taskDone = atomic_add_fetch_32(&pTask->level->taskExecDoneNum, 1);

  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_PART_SUCC);

  SCH_ERR_RET(schRecordTaskSucceedNode(pJob, pTask));

  SCH_ERR_RET(schLaunchTasksInFlowCtrlList(pJob, pTask));

  int32_t parentNum = pTask->parents ? (int32_t)taosArrayGetSize(pTask->parents) : 0;
  if (parentNum == 0) {
    if (SCH_JOB_NEED_WAIT(pJob)) {
      // separate counter: succeeded + failed tasks of the level, not the
      // exec-done counter fetched above
      int32_t levelDone = 0;

      SCH_LOCK(SCH_WRITE, &pTask->level->lock);
      pTask->level->taskSucceed++;
      levelDone = pTask->level->taskSucceed + pTask->level->taskFailed;
      SCH_UNLOCK(SCH_WRITE, &pTask->level->lock);

      if (levelDone < pTask->level->taskNum) {
        SCH_TASK_DLOG("wait all tasks, done:%d, all:%d", levelDone, pTask->level->taskNum);
        return TSDB_CODE_SUCCESS;
      }
      
      SCH_TASK_TLOG("taskDone number reach level task number, done:%d, total:%d", levelDone, pTask->level->taskNum);

      if (pTask->level->taskFailed > 0) {
        SCH_RET(schHandleJobFailure(pJob, pJob->errCode));
      }

      SCH_RET(schSwitchJobStatus(pJob, JOB_TASK_STATUS_PART_SUCC, NULL));
    }
    
    pJob->resNode = pTask->succeedAddr;

    pJob->fetchTask = pTask;

    SCH_RET(schSwitchJobStatus(pJob, JOB_TASK_STATUS_PART_SUCC, NULL));
  }

  /*
    if (SCH_IS_DATA_SRC_TASK(task) && job->dataSrcEps.numOfEps < SCH_MAX_CANDIDATE_EP_NUM) {
      tstrncpy(job->dataSrcEps.fqdn[job->dataSrcEps.numOfEps], task->execAddr.fqdn, sizeof(task->execAddr.fqdn));
      job->dataSrcEps.port[job->dataSrcEps.numOfEps] = task->execAddr.port;

      ++job->dataSrcEps.numOfEps;
    }
  */

  for (int32_t i = 0; i < parentNum; ++i) {
    // Check the slot pointer before dereferencing it, so a failed lookup is
    // reported as an internal error instead of crashing.
    SSchTask **ppParent = taosArrayGet(pTask->parents, i);
    SSchTask  *parent = (NULL != ppParent) ? *ppParent : NULL;
    if (NULL == parent) {
      SCH_TASK_ELOG("fail to get task %d parent, parentNum: %d", i, parentNum);
      SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
    }

    SCH_LOCK(SCH_WRITE, &parent->planLock);
    SDownstreamSourceNode source = {
        .type = QUERY_NODE_DOWNSTREAM_SOURCE,
        .clientId = pTask->clientId,
        .taskId = pTask->taskId,
        .sId = pTask->seriousId,
        .execId = pTask->execId,
        .addr = pTask->succeedAddr,
        .fetchMsgType = SCH_FETCH_TYPE(pTask),
        .localExec = SCH_IS_LOCAL_EXEC_TASK(pJob, pTask),
    };
    code = qSetSubplanExecutionNode(parent->plan, pTask->plan->id.groupId, &source);
    if (TSDB_CODE_SUCCESS != code) {
      SCH_TASK_ELOG("qSetSubplanExecutionNode failed, groupId: %d", pTask->plan->id.groupId);
    }
    SCH_UNLOCK(SCH_WRITE, &parent->planLock);

    // the error is returned only after the plan lock has been released
    SCH_ERR_RET(code);

    int32_t readyNum = atomic_add_fetch_32(&parent->childReady, 1);

    if (SCH_TASK_READY_FOR_LAUNCH(readyNum, parent)) {
      SCH_TASK_DLOG("all %d children task done, start to launch parent task 0x%" PRIx64, readyNum, parent->taskId);

      parent->seriousId = pJob->seriousId;
      // swap the two pointers around the log call so that the message is
      // written with the parent bound to the name pTask
      TSWAP(pTask, parent);
      SCH_TASK_TLOG("task seriousId set to 0x%" PRIx64, pTask->seriousId);
      TSWAP(pTask, parent);

      SCH_ERR_RET(schDelayLaunchTask(pJob, parent));
    }
  }

  if (taskDone == pTask->level->taskNum) {
    SCH_ERR_RET(schLaunchJobLowerLevel(pJob, pTask));
  }

  return TSDB_CODE_SUCCESS;
}
349

350
/**
 * Reschedule a timed-out task onto another candidate node.
 *
 * Does nothing when rescheduling is disabled or the task is data bound. A
 * task is rescheduled only if it timed out, is in EXEC status, is not the
 * job's fetch task and has more than one candidate address; it is then
 * dropped on its exec node and processed as a timeout failure.
 */
int32_t schRescheduleTask(SSchJob *pJob, SSchTask *pTask) {
  if (!schMgmt.cfg.enableReSchedule) {
    return TSDB_CODE_SUCCESS;
  }

  if (SCH_IS_DATA_BIND_TASK(pTask)) {
    return TSDB_CODE_SUCCESS;
  }

  bool shouldReschedule = SCH_TASK_TIMEOUT(pTask) && JOB_TASK_STATUS_EXEC == pTask->status &&
                          pJob->fetchTask != pTask && taosArrayGetSize(pTask->candidateAddrs) > 1;
  if (!shouldReschedule) {
    return TSDB_CODE_SUCCESS;
  }

  SCH_TASK_DLOG("task execId %d will be rescheduled now", pTask->execId);
  schDropTaskOnExecNode(pJob, pTask);
  taosHashClear(pTask->execNodes);

  SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, TSDB_CODE_SCH_TIMEOUT_ERROR));

  return TSDB_CODE_SUCCESS;
}
370

371
/**
 * Update the redirect bookkeeping of a task and decide its next retry delay.
 *
 * First redirect: the context is initialised (period, start time) and the
 * number of attempts per round is derived from the supplied epset, from the
 * first candidate address, or from defaults.
 *
 * Later redirects: the counters are advanced. When a round is complete the
 * total elapsed time is checked against tsMaxRetryWaitTime (the job is marked
 * noMoreRetry and the redirect code is returned on timeout), the period is
 * scaled by tsRedirectFactor and capped by tsRedirectMaxPeriod, and data-bound
 * tasks get a delay of min(remaining time, period). Inside a round the delay
 * is reset to 0.
 */
int32_t schChkUpdateRedirectCtx(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet, int32_t rspCode) {
  SSchRedirectCtx *pCtx = &pTask->redirectCtx;

  if (!pCtx->inRedirect) {
    pCtx->inRedirect = true;
    pCtx->periodMs = tsRedirectPeriod;
    pCtx->startTs = taosGetTimestampMs();

    if (SCH_IS_DATA_BIND_TASK(pTask)) {
      if (pEpSet) {
        pCtx->roundTotal = pEpSet->numOfEps;
      } else if (pTask->candidateAddrs && taosArrayGetSize(pTask->candidateAddrs) > 0) {
        SQueryNodeAddr *pFirst = taosArrayGet(pTask->candidateAddrs, 0);
        pCtx->roundTotal = pFirst->epSet.numOfEps;
      } else {
        pCtx->roundTotal = SCH_DEFAULT_RETRY_TOTAL_ROUND;
      }
    } else {
      pCtx->roundTotal = 1;
    }
  } else {
    pCtx->totalTimes++;
    pCtx->roundTimes++;

    if (SCH_IS_DATA_BIND_TASK(pTask) && pEpSet) {
      pCtx->roundTotal = pEpSet->numOfEps;
    }

    if (pCtx->roundTimes >= pCtx->roundTotal) {
      // a full round has been tried
      int64_t nowTs = taosGetTimestampMs();
      int64_t elapsed = nowTs - pCtx->startTs;
      if (elapsed > tsMaxRetryWaitTime) {
        SCH_TASK_DLOG("task no more redirect retry since timeout, now:%" PRId64 ", start:%" PRId64 ", max:%d, total:%d",
                      nowTs, pCtx->startTs, tsMaxRetryWaitTime, pCtx->totalTimes);
        pJob->noMoreRetry = true;
        SCH_ERR_RET(SCH_GET_REDIRECT_CODE(pJob, rspCode));
      }

      pCtx->periodMs *= tsRedirectFactor;
      if (pCtx->periodMs > tsRedirectMaxPeriod) {
        pCtx->periodMs = tsRedirectMaxPeriod;
      }

      if (SCH_IS_DATA_BIND_TASK(pTask)) {
        int64_t remaining = tsMaxRetryWaitTime - elapsed;
        if (remaining < pCtx->periodMs) {
          pTask->delayExecMs = remaining;
        } else {
          pTask->delayExecMs = pCtx->periodMs;
        }
      }

      pCtx->roundTimes = 0;
    } else {
      pTask->delayExecMs = 0;
    }
  }

  SCH_TASK_DLOG("task start %d/%d/%d redirect retry, delayExec:%d", pCtx->roundTimes, pCtx->roundTotal,
                pCtx->totalTimes, pTask->delayExecMs);

  return TSDB_CODE_SUCCESS;
}
435

436
/**
 * Put a task back into a state from which it can be launched again.
 *
 * Marks the task as waiting for retry, stops its delay timer if one is set,
 * drops it on the exec node, clears the exec-node hash, removes it from the
 * job's exec list (errors ignored), deregisters its heartbeat and resets the
 * message buffer, last message type, child-ready counter and succeed address.
 */
void schResetTaskForRetry(SSchJob *pJob, SSchTask *pTask) {
  pTask->waitRetry = true;

  if (pTask->delayTimer) {
    UNUSED(taosTmrStop(pTask->delayTimer));
  }

  // detach the task from everything tied to the previous execution
  schDropTaskOnExecNode(pJob, pTask);
  taosHashClear(pTask->execNodes);
  (void)schRemoveTaskFromExecList(pJob, pTask);  // ignore error
  schDeregisterTaskHb(pJob, pTask);

  // reset the per-execution state
  taosMemoryFreeClear(pTask->msg);
  pTask->msgLen = 0;
  pTask->lastMsgType = 0;
  pTask->childReady = 0;
  TAOS_MEMSET(&pTask->succeedAddr, 0, sizeof(pTask->succeedAddr));
}
9,127✔
453

454
#if 0
455

456
/**
 * Redirect a task after a redirect-type response (currently compiled out).
 *
 * Data-bound tasks update or switch their epset and are relaunched with a
 * delay. Other (merge) tasks clear their plan's execution nodes, optionally
 * switch candidate address, and recursively redirect every child task.
 * Any failure is routed through schProcessOnTaskFailure().
 */
int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32_t rspCode) {
  int32_t code = 0;

  SCH_TASK_DLOG("task will be redirected now, status:%s, code:%s", SCH_GET_TASK_STATUS_STR(pTask), tstrerror(rspCode));

  if (!NO_RET_REDIRECT_ERROR(rspCode)) {
    SCH_UPDATE_REDIRECT_CODE(pJob, rspCode);
  }

  SCH_ERR_JRET(schChkUpdateRedirectCtx(pJob, pTask, pData ? pData->pEpSet : NULL, rspCode));

  schResetTaskForRetry(pJob, pTask);

  if (SCH_IS_DATA_BIND_TASK(pTask)) {
    if (pData && pData->pEpSet) {
      SCH_ERR_JRET(schUpdateTaskCandidateAddr(pJob, pTask, pData->pEpSet));
    } else {
      // both remaining cases operate on the current candidate address
      SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
      if (NULL == addr) {
        SCH_TASK_ELOG("fail to get the %dth condidateAddr, totalNum:%d", pTask->candidateIdx,
                      (int32_t)taosArrayGetSize(pTask->candidateAddrs));
        SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
      }

      if (SYNC_SELF_LEADER_REDIRECT_ERROR(rspCode)) {
        SEp *pEp = &addr->epSet.eps[addr->epSet.inUse];
        SCH_TASK_DLOG("task retry node %d current ep, idx:%d/%d,%s:%d, code:%s", addr->nodeId, addr->epSet.inUse,
                      addr->epSet.numOfEps, pEp->fqdn, pEp->port, tstrerror(rspCode));
      } else {
        SCH_SWITCH_EPSET(addr);
        SCH_TASK_DLOG("switch task target node %d epset to %d/%d", addr->nodeId, addr->epSet.inUse, addr->epSet.numOfEps);
      }
    }

    SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT);

    SCH_ERR_JRET(schDelayLaunchTask(pJob, pTask));

    return TSDB_CODE_SUCCESS;
  }

  // merge plan

  pTask->childReady = 0;

  qClearSubplanExecutionNode(pTask->plan);

  // Note: current error task and upper level merge task
  if (NULL == pData || 0 == pData->len) {
    SCH_ERR_JRET(schSwitchTaskCandidateAddr(pJob, pTask));
  }

  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT);

  int32_t childNum = taosArrayGetSize(pTask->children);
  for (int32_t idx = 0; idx < childNum; ++idx) {
    SSchTask *pChild = taosArrayGetP(pTask->children, idx);
    SCH_LOCK_TASK(pChild);
    (void)schDoTaskRedirect(pJob, pChild, NULL, rspCode);  // error handled internal
    SCH_UNLOCK_TASK(pChild);
  }

  return TSDB_CODE_SUCCESS;

_return:

  SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
}
529

530
/**
 * Roll back the level counters for a task and its direct children
 * (currently compiled out).
 *
 * The task's own level loses one launched task, plus one exec-done task if the
 * task had reached PART_SUCC or later. Each child's level loses one exec-done
 * and one launched task, updated while the child is locked.
 */
int32_t schResetTaskSetLevelInfo(SSchJob *pJob, SSchTask *pTask) {
  SSchLevel *pLevel = pTask->level;

  SCH_TASK_DLOG("start to reset level for current task set, execDone:%d, launched:%d",
                atomic_load_32(&pLevel->taskExecDoneNum), atomic_load_32(&pLevel->taskLaunchedNum));

  if (SCH_GET_TASK_STATUS(pTask) >= JOB_TASK_STATUS_PART_SUCC) {
    (void)atomic_sub_fetch_32(&pLevel->taskExecDoneNum, 1);
  }

  (void)atomic_sub_fetch_32(&pLevel->taskLaunchedNum, 1);

  int32_t childNum = taosArrayGetSize(pTask->children);
  int32_t idx = 0;
  while (idx < childNum) {
    SSchTask *pChild = taosArrayGetP(pTask->children, idx);
    if (NULL == pChild) {
      SCH_TASK_ELOG("fail to get the %dth child, childrenNum:%d", idx, childNum);
      SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
    }

    SCH_LOCK_TASK(pChild);
    pLevel = pChild->level;
    (void)atomic_sub_fetch_32(&pLevel->taskExecDoneNum, 1);
    (void)atomic_sub_fetch_32(&pLevel->taskLaunchedNum, 1);
    SCH_UNLOCK_TASK(pChild);

    ++idx;
  }

  // pLevel now refers to the level of the last child visited, if any
  SCH_TASK_DLOG("end to reset level for current task set, execDone:%d, launched:%d",
                atomic_load_32(&pLevel->taskExecDoneNum), atomic_load_32(&pLevel->taskLaunchedNum));

  return TSDB_CODE_SUCCESS;
}
562

563
/**
 * Retry a whole task set after a redirect error (currently compiled out).
 *
 * Checks whether the job may still retry, validates that an epset was
 * delivered for "other leader" redirects, resets level counters and the job
 * level index, bumps the job seriousId and redirects the task. The buffers
 * owned by pData are released on every path.
 */
int32_t schHandleTaskSetRetry(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32_t rspCode) {
  int32_t code = 0;

  SCH_ERR_JRET(schChkResetJobRetry(pJob, rspCode));

  if (SYNC_OTHER_LEADER_REDIRECT_ERROR(rspCode) && NULL == pData->pEpSet) {
    SCH_TASK_ELOG("epset updating excepted, error:%s", tstrerror(rspCode));
    code = TSDB_CODE_INVALID_MSG;
    goto _return;
  }

  SCH_TASK_DLOG("start to redirect current task set cause of error: %s", tstrerror(rspCode));

  SCH_ERR_JRET(schResetTaskSetLevelInfo(pJob, pTask));

  SCH_RESET_JOB_LEVEL_IDX(pJob);
  atomic_add_fetch_64(&pJob->seriousId, 1);

  code = schDoTaskRedirect(pJob, pTask, pData, rspCode);

  // normal path: release the response buffers and hand back the redirect result
  taosMemoryFreeClear(pData->pData);
  taosMemoryFreeClear(pData->pEpSet);

  SCH_RET(code);

_return:

  // failure path: release the response buffers, then process the task failure
  taosMemoryFreeClear(pData->pData);
  taosMemoryFreeClear(pData->pEpSet);

  SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
}
597
#endif
598

599
/**
 * Add the task to the job's executing-task hash, keyed by task id.
 *
 * A task that is already present is not treated as an error.
 *
 * @return TSDB_CODE_SUCCESS, or the code reported by taosHashPut.
 */
int32_t schPushTaskToExecList(SSchJob *pJob, SSchTask *pTask) {
  int32_t code = taosHashPut(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);

  if (0 == code) {
    SCH_TASK_TLOG("task added to execTask list, numOfTasks:%d", taosHashGetSize(pJob->execTasks));
    return TSDB_CODE_SUCCESS;
  }

  if (HASH_NODE_EXIST(code)) {
    SCH_TASK_DLOG("task already in execTask list, code:0x%x", code);
    return TSDB_CODE_SUCCESS;
  }

  SCH_TASK_ELOG("taosHashPut task to execTask list failed, code:0x%x", code);
  SCH_ERR_RET(code);

  // same fall-through as the original if SCH_ERR_RET does not return
  SCH_TASK_TLOG("task added to execTask list, numOfTasks:%d", taosHashGetSize(pJob->execTasks));

  return TSDB_CODE_SUCCESS;
}
615

616
/*
617
int32_t schMoveTaskToSuccList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
618
  if (0 != taosHashRemove(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId))) {
619
    SCH_TASK_WLOG("remove task from execTask list failed, may not exist, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
620
  } else {
621
    SCH_TASK_DLOG("task removed from execTask list, numOfTasks:%d", taosHashGetSize(pJob->execTasks));
622
  }
623

624
  int32_t code = taosHashPut(pJob->succTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
625
  if (0 != code) {
626
    if (HASH_NODE_EXIST(code)) {
627
      *moved = true;
628
      SCH_TASK_ELOG("task already in succTask list, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
629
      SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
630
    }
631

632
    SCH_TASK_ELOG("taosHashPut task to succTask list failed, errno:%d", ERRNO);
633
    SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
634
  }
635

636
  *moved = true;
637

638
  SCH_TASK_DLOG("task moved to succTask list, numOfTasks:%d", taosHashGetSize(pJob->succTasks));
639

640
  return TSDB_CODE_SUCCESS;
641
}
642

643
int32_t schMoveTaskToFailList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
644
  *moved = false;
645

646
  if (0 != taosHashRemove(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId))) {
647
    SCH_TASK_WLOG("remove task from execTask list failed, may not exist, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
648
  }
649

650
  int32_t code = taosHashPut(pJob->failTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
651
  if (0 != code) {
652
    if (HASH_NODE_EXIST(code)) {
653
      *moved = true;
654

655
      SCH_TASK_WLOG("task already in failTask list, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
656
      SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
657
    }
658

659
    SCH_TASK_ELOG("taosHashPut task to failTask list failed, errno:%d", ERRNO);
660
    SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
661
  }
662

663
  *moved = true;
664

665
  SCH_TASK_DLOG("task moved to failTask list, numOfTasks:%d", taosHashGetSize(pJob->failTasks));
666

667
  return TSDB_CODE_SUCCESS;
668
}
669

670
int32_t schMoveTaskToExecList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
671
  if (0 != taosHashRemove(pJob->succTasks, &pTask->taskId, sizeof(pTask->taskId))) {
672
    SCH_TASK_WLOG("remove task from succTask list failed, may not exist, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
673
  }
674

675
  int32_t code = taosHashPut(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
676
  if (0 != code) {
677
    if (HASH_NODE_EXIST(code)) {
678
      *moved = true;
679

680
      SCH_TASK_ELOG("task already in execTask list, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
681
      SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
682
    }
683

684
    SCH_TASK_ELOG("taosHashPut task to execTask list failed, errno:%d", ERRNO);
685
    SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
686
  }
687

688
  *moved = true;
689

690
  SCH_TASK_DLOG("task moved to execTask list, numOfTasks:%d", taosHashGetSize(pJob->execTasks));
691

692
  return TSDB_CODE_SUCCESS;
693
}
694
*/
695

696
/**
 * Decide whether a failed task should be retried.
 *
 * A timeout error first raises the exec/retry limits by one and doubles the
 * task timeout up to SCH_MAX_TASK_TIMEOUT_USEC. No retry is granted when the
 * job has given up retrying, when the retry or exec limit is reached, or when
 * SCH_TASK_NEED_RETRY rejects the (message type, error code) pair.
 *
 * @param needRetry out: set on every path
 * @return always TSDB_CODE_SUCCESS
 */
int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bool *needRetry) {
  // pessimistic default; flipped only once every check has passed
  *needRetry = false;

  if (pJob->noMoreRetry) {
    SCH_TASK_DLOG("task no more retry since job no more retry, retryTimes:%d/%d", pTask->retryTimes,
                  pTask->maxRetryTimes);
    return TSDB_CODE_SUCCESS;
  }

  if (TSDB_CODE_SCH_TIMEOUT_ERROR == errCode) {
    pTask->maxExecTimes++;
    pTask->maxRetryTimes++;
    if (pTask->timeoutUsec < SCH_MAX_TASK_TIMEOUT_USEC) {
      pTask->timeoutUsec *= 2;
      if (pTask->timeoutUsec > SCH_MAX_TASK_TIMEOUT_USEC) {
        pTask->timeoutUsec = SCH_MAX_TASK_TIMEOUT_USEC;
      }
    }
  }

  if ((pTask->retryTimes + 1) > pTask->maxRetryTimes) {
    SCH_TASK_DLOG("task no more retry since reach max retry times, retryTimes:%d/%d", pTask->retryTimes,
                  pTask->maxRetryTimes);
    return TSDB_CODE_SUCCESS;
  }

  if ((pTask->execId + 1) >= pTask->maxExecTimes) {
    SCH_TASK_DLOG("task no more retry since reach max exec times, execId:%d/%d", pTask->execId, pTask->maxExecTimes);
    return TSDB_CODE_SUCCESS;
  }

  if (!SCH_TASK_NEED_RETRY(pTask->lastMsgType, errCode)) {
    SCH_TASK_DLOG("task no more retry cause of errCode, errCode:%x - %s", errCode, tstrerror(errCode));
    return TSDB_CODE_SUCCESS;
  }

  /*
    if (SCH_IS_DATA_BIND_TASK(pTask)) {
      if ((pTask->execId + 1) >= SCH_TASK_NUM_OF_EPS(&pTask->plan->execNode)) {
        *needRetry = false;
        SCH_TASK_DLOG("task no more retry since all ep tried, execId:%d, epNum:%d", pTask->execId,
                      SCH_TASK_NUM_OF_EPS(&pTask->plan->execNode));
        return TSDB_CODE_SUCCESS;
      }
    } else {
      int32_t candidateNum = taosArrayGetSize(pTask->candidateAddrs);

      if ((pTask->candidateIdx + 1) >= candidateNum && (TSDB_CODE_SCH_TIMEOUT_ERROR != errCode)) {
        *needRetry = false;
        SCH_TASK_DLOG("task no more retry since all candiates tried, candidateIdx:%d, candidateNum:%d",
                      pTask->candidateIdx, candidateNum);
        return TSDB_CODE_SUCCESS;
      }
    }
  */

  *needRetry = true;
  SCH_TASK_DLOG("task need the %dth retry, errCode:%x - %s", pTask->execId + 1, errCode, tstrerror(errCode));

  return TSDB_CODE_SUCCESS;
}
759

760
/**
 * Prepare a failed task for another execution and schedule its launch.
 *
 * The level's launched counter is decremented, the delay timer is stopped,
 * the task leaves the exec list and returns to INIT status, flow-controlled
 * tasks are released where applicable and the heartbeat is deregistered.
 * Data-bound tasks switch to the next endpoint of their current candidate;
 * other tasks switch candidate address. Finally the task is (delay-)launched.
 */
int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask) {
  (void)atomic_sub_fetch_32(&pTask->level->taskLaunchedNum, 1);

  if (pTask->delayTimer) {
    UNUSED(taosTmrStop(pTask->delayTimer));
  }

  (void)schRemoveTaskFromExecList(pJob, pTask);  // ignore error
  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT);

  if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) {
    SCH_ERR_RET(schLaunchTasksInFlowCtrlList(pJob, pTask));
  }

  schDeregisterTaskHb(pJob, pTask);

  if (!(SCH_IS_DATA_BIND_TASK(pTask))) {
    SCH_ERR_RET(schSwitchTaskCandidateAddr(pJob, pTask));
  } else {
    SQueryNodeAddr *pCur = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
    if (NULL == pCur) {
      SCH_TASK_ELOG("fail to the %dth condidateAddr, totalNum:%d", pTask->candidateIdx,
                    (int32_t)taosArrayGetSize(pTask->candidateAddrs));
      SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
    }

    SCH_SWITCH_EPSET(pCur);
  }

  SCH_ERR_RET(schDelayLaunchTask(pJob, pTask));

  return TSDB_CODE_SUCCESS;
}
793

794
/**
 * Fill the task's candidate addresses from the job's node list.
 *
 * Every node address of pJob->nodeList is appended to pTask->candidateAddrs.
 *
 * @return TSDB_CODE_SUCCESS when at least one address was added,
 *         terrno when an append fails,
 *         TSDB_CODE_TSC_NO_EXEC_NODE when the node list is missing or empty.
 */
int32_t schSetAddrsFromNodeList(SSchJob *pJob, SSchTask *pTask) {
  int32_t addNum = 0;
  int32_t nodeNum = 0;

  if (NULL != pJob->nodeList) {
    nodeNum = taosArrayGetSize(pJob->nodeList);
  }

  // nodeNum stays 0 when there is no node list, so the loop is skipped
  for (int32_t i = 0; i < nodeNum; ++i) {
    SQueryNodeLoad *pLoad = taosArrayGet(pJob->nodeList, i);
    SQueryNodeAddr *naddr = &pLoad->addr;

    if (NULL == taosArrayPush(pTask->candidateAddrs, naddr)) {
      SCH_TASK_ELOG("taosArrayPush execNode to candidate addrs failed, addNum:%d, errno:%d", addNum, ERRNO);
      SCH_ERR_RET(terrno);
    }

    SCH_TASK_TLOG("set %dth candidate addr, id %d, inUse:%d/%d, fqdn:%s, port:%d", i, naddr->nodeId,
                  naddr->epSet.inUse, naddr->epSet.numOfEps, SCH_GET_CUR_EP(naddr)->fqdn,
                  SCH_GET_CUR_EP(naddr)->port);

    ++addNum;
  }

  if (addNum <= 0) {
    SCH_TASK_ELOG("no available execNode as candidates, nodeNum:%d", nodeNum);
    SCH_ERR_RET(TSDB_CODE_TSC_NO_EXEC_NODE);
  }

  return TSDB_CODE_SUCCESS;
}
825

826
/**
 * Build the candidate address list of a task, once.
 *
 * If the list already exists nothing is done. Otherwise a new array is
 * created; when the plan carries an exec node with at least one endpoint, that
 * node is the only candidate. A data-bound task without such a node is an
 * internal error. All other tasks take their candidates from the job's node
 * list and start at a random index.
 */
int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) {
  if (NULL != pTask->candidateAddrs) {
    return TSDB_CODE_SUCCESS;
  }

  pTask->candidateAddrs = taosArrayInit(SCHEDULE_DEFAULT_MAX_NODE_NUM, sizeof(SQueryNodeAddr));
  if (NULL == pTask->candidateAddrs) {
    SCH_TASK_ELOG("taosArrayInit %d condidate addrs failed", SCHEDULE_DEFAULT_MAX_NODE_NUM);
    SCH_ERR_RET(terrno);
  }

  bool planHasExecNode = (pTask->plan->execNode.epSet.numOfEps > 0);
  if (planHasExecNode) {
    if (NULL == taosArrayPush(pTask->candidateAddrs, &pTask->plan->execNode)) {
      SCH_TASK_ELOG("taosArrayPush execNode to candidate addrs failed, errno:%d", ERRNO);
      SCH_ERR_RET(terrno);
    }

    SCH_TASK_TLOG("use execNode in plan as candidate addr, numOfEps:%d", pTask->plan->execNode.epSet.numOfEps);

    return TSDB_CODE_SUCCESS;
  }

  if (SCH_IS_DATA_BIND_TASK(pTask)) {
    SCH_TASK_ELOG("no execNode specifed for data src task, numOfEps:%d", pTask->plan->execNode.epSet.numOfEps);
    SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
  }

  SCH_ERR_RET(schSetAddrsFromNodeList(pJob, pTask));

  // start from a random candidate
  pTask->candidateIdx = taosRand() % taosArrayGetSize(pTask->candidateAddrs);

  /*
    for (int32_t i = 0; i < job->dataSrcEps.numOfEps && addNum < SCH_MAX_CANDIDATE_EP_NUM; ++i) {
      tstrncpy(epSet->fqdn[epSet->numOfEps], job->dataSrcEps.fqdn[i], sizeof(job->dataSrcEps.fqdn[i]));
      epSet->port[epSet->numOfEps] = job->dataSrcEps.port[i];

      ++epSet->numOfEps;
    }
  */

  return TSDB_CODE_SUCCESS;
}
868

869
#if 0
/*
 * Disabled code: overwrite the epSet of a task's single candidate address with pEpSet.
 * Fails when the task does not have exactly one candidate address.
 */
int32_t schUpdateTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet) {
  int32_t code = TSDB_CODE_SUCCESS;
  char   *oldEpStr = NULL;
  char   *newEpStr = NULL;

  if (NULL == pTask->candidateAddrs || 1 != taosArrayGetSize(pTask->candidateAddrs)) {
    SCH_TASK_ELOG("not able to update cndidate addr, addr num %d",
                  (int32_t)(pTask->candidateAddrs ? taosArrayGetSize(pTask->candidateAddrs) : 0));
    SCH_ERR_RET(TSDB_CODE_APP_ERROR);
  }

  SQueryNodeAddr *pTarget = taosArrayGet(pTask->candidateAddrs, 0);
  if (NULL == pTarget) {
    SCH_TASK_ELOG("fail to get task 0th condidataAddr, totalNum:%d", (int32_t)taosArrayGetSize(pTask->candidateAddrs));
    SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
  }

  // Dump both epsets to strings purely for the debug log below.
  SCH_ERR_RET(schDumpEpSet(&pTarget->epSet, &oldEpStr));
  SCH_ERR_JRET(schDumpEpSet(pEpSet, &newEpStr));

  SCH_TASK_DLOG("update task target node %d epset from %s to %s", pTarget->nodeId, oldEpStr, newEpStr);

  TAOS_MEMCPY(&pTarget->epSet, pEpSet, sizeof(pTarget->epSet));

_return:

  taosMemoryFree(oldEpStr);
  taosMemoryFree(newEpStr);

  return code;
}
#endif
902

903
/*
 * Move pTask->candidateIdx to a different candidate address.
 *
 * With zero or one candidate the index is left untouched. Under SCH_RANDOM a new index
 * different from the current one is drawn at random; every other policy advances to the
 * next index and wraps around to 0.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t schSwitchTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask) {
  int32_t candidateNum = taosArrayGetSize(pTask->candidateAddrs);

  if (candidateNum > 1) {
    if (SCH_RANDOM == schMgmt.cfg.schPolicy) {
      const int32_t prevIdx = pTask->candidateIdx;
      // Re-draw until the index actually changes; candidateNum >= 2 here.
      do {
        pTask->candidateIdx = taosRand() % candidateNum;
      } while (prevIdx == pTask->candidateIdx);
    } else {
      // SCH_LOAD_SEQ, SCH_ALL and any unknown policy: round-robin.
      if (++pTask->candidateIdx >= candidateNum) {
        pTask->candidateIdx = 0;
      }
    }
  }

  SCH_TASK_DLOG("switch task candiateIdx to %d/%d", pTask->candidateIdx, candidateNum);

  return TSDB_CODE_SUCCESS;
}
932

933
/*
 * Remove the task (keyed by taskId) from pJob->execTasks.
 * A failed removal is only logged as a warning; the function always returns
 * TSDB_CODE_SUCCESS.
 */
int32_t schRemoveTaskFromExecList(SSchJob *pJob, SSchTask *pTask) {
  int32_t rmCode = taosHashRemove(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId));

  if (0 != rmCode) {
    SCH_TASK_WLOG("task already not in execTask list, code:%x", rmCode);
  }

  return TSDB_CODE_SUCCESS;
}
941

942
/*
 * Send TDMT_SCH_DROP_TASK to every entry of pTask->execNodes that has a handle.
 * Send failures are deliberately ignored so that the remaining nodes are still processed.
 */
void schDropTaskOnExecNode(SSchJob *pJob, SSchTask *pTask) {
  if (NULL == pTask->execNodes) {
    SCH_TASK_DLOG("no exec address, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
    return;
  }

  int32_t nodeNum = (int32_t)taosHashGetSize(pTask->execNodes);
  if (nodeNum <= 0) {
    SCH_TASK_DLOG("task has no execNodes, no need to drop it, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
    return;
  }

  int32_t idx = 0;
  for (SSchNodeInfo *pNode = taosHashIterate(pTask->execNodes, NULL); pNode;
       pNode = taosHashIterate(pTask->execNodes, pNode), ++idx) {
    if (!pNode->handle) {
      SCH_TASK_DLOG("no need to drop task %dth execNode", idx);
      continue;
    }

    SCH_SET_TASK_HANDLE(pTask, pNode->handle);

    // The hash key of the entry is passed along as the exec id of the drop message.
    void *pExecId = taosHashGetKey(pNode, NULL);
    (void)schBuildAndSendMsg(pJob, pTask, &pNode->addr, TDMT_SCH_DROP_TASK, pExecId);  // ignore error and continue

    SCH_TASK_DLOG("start to drop task's %dth execNode", idx);
  }

  SCH_TASK_DLOG("task has been dropped on %d exec nodes", nodeNum);
}
973

974
/*
 * Send a TDMT_SCH_TASK_NOTIFY message of the given type to every entry of
 * pTask->execNodes that has a handle.
 *
 * Returns TSDB_CODE_SUCCESS when there is nothing to notify or every send succeeded,
 * otherwise the error code of the first failing send. Iteration stops at that point.
 */
int32_t schNotifyTaskOnExecNode(SSchJob *pJob, SSchTask *pTask, ETaskNotifyType type) {
  int32_t size = (int32_t)taosHashGetSize(pTask->execNodes);
  if (size <= 0) {
    SCH_TASK_DLOG("task no exec address to notify, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
    return TSDB_CODE_SUCCESS;
  }

  int32_t       i = 0;
  SSchNodeInfo *nodeInfo = taosHashIterate(pTask->execNodes, NULL);
  while (nodeInfo) {
    if (nodeInfo->handle) {
      SCH_SET_TASK_HANDLE(pTask, nodeInfo->handle);

      int32_t code = schBuildAndSendMsg(pJob, pTask, &nodeInfo->addr, TDMT_SCH_TASK_NOTIFY, &type);
      if (TSDB_CODE_SUCCESS != code) {
        // Leaving the loop before taosHashIterate() returns NULL: release the iterator
        // explicitly instead of returning with it still held.
        taosHashCancelIterate(pTask->execNodes, nodeInfo);
        SCH_ERR_RET(code);
      }

      SCH_TASK_DLOG("start to notify %d to task's %dth execNode", type, i);
    } else {
      SCH_TASK_DLOG("no need to notify %d to task %dth execNode", type, i);
    }

    ++i;
    nodeInfo = taosHashIterate(pTask->execNodes, nodeInfo);
  }

  SCH_TASK_DLOG("task has been notified %d on %d exec nodes", type, size);
  return TSDB_CODE_SUCCESS;
}
999

1000
/*
 * Handle the task status list carried by a heartbeat response from the node pEpId.
 *
 * For each status entry the owning job/task is looked up via schProcessOnCbBegin();
 * entries whose job/task cannot be resolved are skipped. A task reported as
 * JOB_TASK_STATUS_INIT with a matching execId is rescheduled. Entries with a stale
 * execId, and JOB_TASK_STATUS_FAIL entries, are currently only acknowledged.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId *pEpId, SArray *pStatusList) {
  SSchJob  *pJob = NULL;
  SSchTask *pTask = NULL;
  int32_t   taskNum = (int32_t)taosArrayGetSize(pStatusList);

  qDebug("%d task status in hb rsp from nodeId:%d, fqdn:%s, port:%d", taskNum, pEpId->nodeId, pEpId->ep.fqdn,
         pEpId->ep.port);

  for (int32_t idx = 0; idx < taskNum; ++idx) {
    STaskStatus *pStatus = taosArrayGet(pStatusList, idx);
    if (NULL == pStatus) {
      qError("fail to get the %dth task status in hb rsp, taskNum:%d", idx, taskNum);
      continue;
    }

    qDebug("QID:0x%" PRIx64 ", CID:0x%" PRIx64 ", TID:0x%" PRIx64 ", EID:%d task status in server: %s", pStatus->queryId,
           pStatus->clientId, pStatus->taskId, pStatus->execId, jobTaskStatusStr(pStatus->status));

    if (schProcessOnCbBegin(&pJob, &pTask, pStatus->queryId, pStatus->refId, pStatus->taskId)) {
      continue;
    }

    int32_t code = 0;

    if (pStatus->execId != pTask->execId) {
      // TODO
      SCH_TASK_DLOG("execId %d mis-match current execId %d", pStatus->execId, pTask->execId);
    } else if (JOB_TASK_STATUS_INIT == pStatus->status) {
      code = schRescheduleTask(pJob, pTask);
    }
    // JOB_TASK_STATUS_FAIL is not handled yet: RECORD AND HANDLE ERROR!!!!

    schProcessOnCbEnd(pJob, pTask, code);
  }

  return TSDB_CODE_SUCCESS;
}
1046

1047
/*
 * Feed every SExplainLocalRsp in pExplainRes to schProcessExplainRsp() of its job/task.
 *
 * Processing stops at the first failure (job gone, job stopping, task lookup or explain
 * processing error). After a response has been processed successfully its rsp fields are
 * cleared, so the cleanup pass below has nothing left to free for it.
 *
 * pExplainRes is always consumed: the remaining rsp payloads are freed and the array is
 * destroyed before returning, on success and on failure alike.
 */
int32_t schHandleExplainRes(SArray *pExplainRes) {
  int32_t   code = 0;
  SSchTask *pTask = NULL;
  SSchJob  *pJob = NULL;
  int32_t   resNum = taosArrayGetSize(pExplainRes);

  // An empty result list skips the loop and goes straight to the cleanup section.
  for (int32_t idx = 0; idx < resNum; ++idx) {
    SExplainLocalRsp *pLocal = taosArrayGet(pExplainRes, idx);
    if (NULL == pLocal) {
      qError("fail to get the %dth LOCAL explain rsp msg, total:%d", idx, resNum);
      continue;
    }

    qDebug("QID:0x%" PRIx64 ", CID:0x%" PRIx64 ", TID:0x%" PRIx64 ", begin to handle LOCAL explain rsp msg",
           pLocal->qId, pLocal->cId, pLocal->tId);

    pJob = NULL;
    (void)schAcquireJob(pLocal->rId, &pJob);
    if (NULL == pJob) {
      qWarn("QID:0x%" PRIx64 ", CID:0x%" PRIx64 ", TID:0x%" PRIx64 " job no exist, may be dropped, refId:0x%" PRIx64,
            pLocal->qId, pLocal->cId, pLocal->tId, pLocal->rId);
      SCH_ERR_JRET(TSDB_CODE_QRY_JOB_NOT_EXIST);
    }

    int8_t jobStatus = 0;
    if (schJobNeedToStop(pJob, &jobStatus)) {
      SCH_TASK_DLOG("will not do further processing cause of job status %s", jobTaskStatusStr(jobStatus));
      (void)schReleaseJob(pJob->refId);
      SCH_ERR_JRET(TSDB_CODE_SCH_IGNORE_ERROR);
    }

    code = schGetTaskInJob(pJob, pLocal->tId, &pTask);
    if (TSDB_CODE_SUCCESS == code) {
      code = schProcessExplainRsp(pJob, pTask, &pLocal->rsp);
    }

    // The job reference is dropped before the result is evaluated.
    (void)schReleaseJob(pJob->refId);

    qDebug("QID:0x%" PRIx64 ", CID:0x%" PRIx64 ", TID:0x%" PRIx64 ", end to handle LOCAL explain rsp msg, code:%x",
           pLocal->qId, pLocal->cId, pLocal->tId, code);

    SCH_ERR_JRET(code);

    // Processed successfully: detach the payload so the cleanup pass does not free it.
    pLocal->rsp.numOfPlans = 0;
    pLocal->rsp.subplanInfo = NULL;
    pTask = NULL;
  }

_return:

  for (int32_t idx = 0; idx < resNum; ++idx) {
    SExplainLocalRsp *pLocal = taosArrayGet(pExplainRes, idx);
    if (NULL == pLocal) {
      qError("in _return fail to get the %dth LOCAL explain rsp msg, total:%d", idx, resNum);
      continue;
    }

    tFreeSExplainRsp(&pLocal->rsp);
  }

  taosArrayDestroy(pExplainRes);

  SCH_RET(code);
}
1116

1117
/*
 * Launch a task on a remote node.
 *
 * The subplan is serialized into pTask->msg on first use (under planLock), the candidate
 * addresses are resolved, a heartbeat connection is ensured for query jobs, and finally
 * the plan message is built and sent.
 *
 * Returns the error code of the first failing step, or the result of schBuildAndSendMsg().
 */
int32_t schLaunchRemoteTask(SSchJob *pJob, SSchTask *pTask) {
  int32_t   code = 0;
  SSubplan *pSubplan = pTask->plan;

  if (NULL == pTask->msg) {  // TODO add more detailed reason for failure
    SCH_LOCK(SCH_WRITE, &pTask->planLock);
    code = qSubPlanToMsg(pSubplan, &pTask->msg, &pTask->msgLen);
    SCH_UNLOCK(SCH_WRITE, &pTask->planLock);

    if (TSDB_CODE_SUCCESS != code) {
      SCH_TASK_ELOG("failed to create physical plan, code:%s, msg:%p, len:%d", tstrerror(code), pTask->msg,
                    pTask->msgLen);
      SCH_ERR_RET(code);
    }

    if (tsQueryPlannerTrace) {
      // Dump the plan as text for tracing only; the string is released right after logging.
      char   *planStr = NULL;
      int32_t planStrLen = 0;
      SCH_ERR_RET(qSubPlanToString(pSubplan, &planStr, &planStrLen));
      SCH_TASK_DLOGL("physical plan len:%d, %s", planStrLen, planStr);
      taosMemoryFree(planStr);
    }
  }

  SCH_ERR_RET(schSetTaskCandidateAddrs(pJob, pTask));

  if (SCH_IS_QUERY_JOB(pJob)) {
    SCH_ERR_RET(schEnsureHbConnection(pJob, pTask));
  }

  SCH_RET(schBuildAndSendMsg(pJob, pTask, NULL, pSubplan->msgType, NULL));
}
1147

1148
/*
 * Execute a task in-process through the client-side query worker.
 *
 * schMgmt.queryMgmt is created on first use. For explain jobs the local explain
 * responses are collected into an array and handed to schHandleExplainRes(), which
 * destroys the array itself. Finally the task is reported as succeeded.
 */
int32_t schLaunchLocalTask(SSchJob *pJob, SSchTask *pTask) {
  // SCH_ERR_JRET(schSetTaskCandidateAddrs(pJob, pTask));
  if (NULL == schMgmt.queryMgmt) {
    void *pNewMgmt = NULL;
    SCH_ERR_RET(qWorkerInit(NODE_TYPE_CLIENT, CLIENT_HANDLE, &pNewMgmt, NULL));
    // A non-NULL result means the slot was already filled; discard our instance.
    if (atomic_val_compare_exchange_ptr(&schMgmt.queryMgmt, NULL, pNewMgmt)) {
      qWorkerDestroy(&pNewMgmt);
    }
  }

  int32_t code = 0;
  SArray *explainRes = NULL;

  SQWMsg qwMsg = {0};
  qwMsg.msg = pTask->plan;
  qwMsg.msgType = pTask->plan->msgType;
  qwMsg.msgInfo.taskType = TASK_TYPE_TEMP;
  qwMsg.msgInfo.explain = SCH_IS_EXPLAIN_JOB(pJob);
  qwMsg.msgInfo.needFetch = SCH_TASK_NEED_FETCH(pTask);
  qwMsg.connInfo.handle = pJob->conn.pTrans;
  qwMsg.pWorkerCb = pJob->pWorkerCb;

  if (SCH_IS_EXPLAIN_JOB(pJob)) {
    explainRes = taosArrayInit(pJob->taskNum, sizeof(SExplainLocalRsp));
    if (NULL == explainRes) {
      SCH_ERR_RET(terrno);
    }
  }

  SCH_ERR_JRET(qWorkerProcessLocalQuery(schMgmt.queryMgmt, pJob->seriousId, pJob->queryId, pTask->clientId, pTask->taskId,
                                        pJob->refId, pTask->execId, &qwMsg, explainRes));

  if (SCH_IS_EXPLAIN_JOB(pJob)) {
    // schHandleExplainRes() destroys explainRes on every path, so a failure returns
    // directly instead of going through the cleanup label.
    SCH_ERR_RET(schHandleExplainRes(explainRes));
    explainRes = NULL;
  }

  SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));

_return:

  taosArrayDestroy(explainRes);

  SCH_RET(code);
}
1192

1193
int32_t schLaunchTaskImpl(void *param) {
3,965,598✔
1194
  SSchTaskCtx *pCtx = (SSchTaskCtx *)param;
3,965,598✔
1195
  SSchJob     *pJob = NULL;
3,965,598✔
1196

1197
  (void)schAcquireJob(pCtx->jobRid, &pJob);
3,965,598✔
1198
  if (NULL == pJob) {
3,966,314✔
1199
    qDebug("job refId 0x%" PRIx64 " already not exist", pCtx->jobRid);
28!
1200
    taosMemoryFree(param);
28!
1201
    SCH_RET(TSDB_CODE_SCH_JOB_IS_DROPPING);
28!
1202
  }
1203

1204
  SSchTask *pTask = pCtx->pTask;
3,966,286✔
1205

1206
  if (pCtx->asyncLaunch) {
3,966,286✔
1207
    SCH_LOCK_TASK(pTask);
1,496,722!
1208
  }
1209

1210
  pTask->execId++;
3,961,461✔
1211
  pTask->retryTimes++;
3,961,461✔
1212
  pTask->waitRetry = false;
3,961,461✔
1213

1214
  int8_t  status = 0;
3,961,461✔
1215
  int32_t code = 0;
3,961,461✔
1216

1217
  if (atomic_load_64(&pTask->seriousId) < atomic_load_64(&pJob->seriousId)) {
3,961,461✔
1218
    SCH_TASK_DLOG("task seriousId:0x%" PRIx64 " is smaller than job seriousId:0x%" PRIx64 ", skip launch",
165!
1219
                  pTask->seriousId, pJob->seriousId);
1220
    goto _return;
165✔
1221
  }
1222

1223
  (void)atomic_add_fetch_32(&pTask->level->taskLaunchedNum, 1);
3,954,020✔
1224

1225
  SCH_TASK_TLOG("start to launch %s task, execId %d, retry %d",
3,965,576!
1226
                SCH_IS_LOCAL_EXEC_TASK(pJob, pTask) ? "LOCAL" : "REMOTE", pTask->execId, pTask->retryTimes);
1227

1228
  SCH_LOG_TASK_START_TS(pTask);
7,915,193!
1229

1230
  if (schJobNeedToStop(pJob, &status)) {
3,954,607✔
1231
    SCH_TASK_DLOG("no need to launch task cause of job status %s", jobTaskStatusStr(status));
3!
1232
    SCH_ERR_JRET(TSDB_CODE_SCH_IGNORE_ERROR);
3!
1233
  }
1234

1235
  if (SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_EXEC) {
3,966,243✔
1236
    SCH_ERR_JRET(schPushTaskToExecList(pJob, pTask));
3,958,995!
1237
    SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_EXEC);
3,941,620✔
1238
  }
1239

1240
  if (SCH_IS_LOCAL_EXEC_TASK(pJob, pTask)) {
3,965,296✔
1241
    SCH_ERR_JRET(schLaunchLocalTask(pJob, pTask));
74,979!
1242
  } else {
1243
    SCH_ERR_JRET(schLaunchRemoteTask(pJob, pTask));
3,890,317!
1244
  }
1245

1246
#if 0
1247
  if (SCH_IS_QUERY_JOB(pJob)) {
1248
    SCH_ERR_JRET(schEnsureHbConnection(pJob, pTask));
1249
  }
1250
#endif
1251

1252
_return:
3,890,281✔
1253

1254
  if (pJob->taskNum >= SCH_MIN_AYSNC_EXEC_NUM) {
3,965,431✔
1255
    if (code) {
1,496,561✔
1256
      code = schProcessOnTaskFailure(pJob, pTask, code);
10✔
1257
    }
1258
    if (code) {
1,496,561✔
1259
      code = schHandleJobFailure(pJob, code);
10✔
1260
    }
1261
  }
1262

1263
  if (pCtx->asyncLaunch) {
3,965,431✔
1264
    SCH_UNLOCK_TASK(pTask);
1,497,219!
1265
  }
1266

1267
  (void)schReleaseJob(pJob->refId);
3,965,396✔
1268

1269
  taosMemoryFree(param);
3,966,507!
1270

1271
  SCH_RET(code);
3,965,741!
1272
}
1273

1274
/*
 * Allocate a launch context for pTask and run schLaunchTaskImpl() on it.
 *
 * Jobs with at least SCH_MIN_AYSNC_EXEC_NUM tasks are launched through taosAsyncExec();
 * smaller jobs are launched inline on the calling thread.
 *
 * Ownership of the context: schLaunchTaskImpl() frees it on every path it takes. If the
 * async submission itself fails, the callback never runs, so the context is freed here.
 * NOTE(review): this assumes taosAsyncExec() does not free its parameter on failure;
 * confirm against its implementation.
 *
 * Returns TSDB_CODE_SUCCESS, the allocation error, or the launch/submission error.
 */
int32_t schAsyncLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) {
  SSchTaskCtx *param = taosMemoryCalloc(1, sizeof(SSchTaskCtx));
  if (NULL == param) {
    SCH_ERR_RET(terrno);
  }

  param->jobRid = pJob->refId;
  param->pTask = pTask;

  if (pJob->taskNum >= SCH_MIN_AYSNC_EXEC_NUM) {
    param->asyncLaunch = true;

    int32_t code = taosAsyncExec(schLaunchTaskImpl, param, NULL);
    if (TSDB_CODE_SUCCESS != code) {
      // The callback will never run, so nobody else will release the context.
      taosMemoryFree(param);
      SCH_ERR_RET(code);
    }
  } else {
    SCH_ERR_RET(schLaunchTaskImpl(param));
  }

  return TSDB_CODE_SUCCESS;
}
1292

1293
// Note: no more error processing, handled in function internal
1294
int32_t schLaunchTask(SSchJob *pJob, SSchTask *pTask) {
3,966,028✔
1295
  bool    enough = false;
3,966,028✔
1296
  int32_t code = 0;
3,966,028✔
1297

1298
  SCH_SET_TASK_HANDLE(pTask, NULL);
3,966,028✔
1299

1300
  if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) {
3,966,028!
1301
    SCH_ERR_JRET(schCheckIncTaskFlowQuota(pJob, pTask, &enough));
210!
1302

1303
    if (enough) {
200✔
1304
      SCH_ERR_JRET(schAsyncLaunchTaskImpl(pJob, pTask));
122!
1305
    }
1306
  } else {
1307
    SCH_ERR_JRET(schAsyncLaunchTaskImpl(pJob, pTask));
3,965,818!
1308
  }
1309

1310
  return TSDB_CODE_SUCCESS;
3,966,388✔
1311

1312
_return:
20✔
1313

1314
  SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
20!
1315
}
1316

1317
void schHandleTimerEvent(void *param, void *tmrId) {
2,743✔
1318
  SSchTimerParam *pTimerParam = (SSchTimerParam *)param;
2,743✔
1319
  SSchTask       *pTask = NULL;
2,743✔
1320
  SSchJob        *pJob = NULL;
2,743✔
1321
  int32_t         code = 0;
2,743✔
1322

1323
  qDebug("delayTimer %" PRIuPTR " is launched", (uintptr_t)tmrId);
2,743!
1324

1325
  int64_t  rId = pTimerParam->rId;
2,743✔
1326
  uint64_t queryId = pTimerParam->queryId;
2,743✔
1327
  uint64_t taskId = pTimerParam->taskId;
2,743✔
1328

1329
  if (schProcessOnCbBegin(&pJob, &pTask, queryId, rId, taskId)) {
2,743✔
1330
    return;
10✔
1331
  }
1332

1333
  if (0 == atomic_load_8(&pTask->delayLaunchPar.exit)) {
2,733!
1334
    code = schLaunchTask(pJob, pTask);
2,733✔
1335
  } else {
1336
    SCH_TASK_DLOG("task will not be launched since query job exiting, status: %d", pTask->status);
×
1337
  }
1338

1339
  schProcessOnCbEnd(pJob, pTask, code);
2,733✔
1340
}
1341

1342
/*
 * Launch a task now, or after pTask->delayExecMs milliseconds when a delay is configured.
 *
 * For a delayed launch the task is put on the exec list and marked EXEC immediately; the
 * actual launch happens in schHandleTimerEvent(). The first delayed launch starts a timer,
 * later ones reset the existing timer. A failing reset is only logged.
 */
int32_t schDelayLaunchTask(SSchJob *pJob, SSchTask *pTask) {
  if (!(pTask->delayExecMs > 0)) {
    SCH_RET(schLaunchTask(pJob, pTask));
  }

  // Ids the timer callback needs to find this job/task again.
  pTask->delayLaunchPar.rId = pJob->refId;
  pTask->delayLaunchPar.queryId = pJob->queryId;
  pTask->delayLaunchPar.taskId = pTask->taskId;

  SCH_ERR_RET(schPushTaskToExecList(pJob, pTask));
  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_EXEC);

  if (NULL != pTask->delayTimer) {
    if (taosTmrReset(schHandleTimerEvent, pTask->delayExecMs, (void *)&pTask->delayLaunchPar, schMgmt.timer, &pTask->delayTimer)) {
      SCH_TASK_ELOG("taosTmrReset delayExec timer failed, handle:%p", schMgmt.timer);
    }

    return TSDB_CODE_SUCCESS;
  }

  pTask->delayTimer = taosTmrStart(schHandleTimerEvent, pTask->delayExecMs, (void *)&pTask->delayLaunchPar, schMgmt.timer);
  if (NULL == pTask->delayTimer) {
    SCH_TASK_ELOG("start delay timer failed, handle:%p", schMgmt.timer);
    SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  SCH_TASK_DLOG("task delayTimer %" PRIuPTR " is started", (uintptr_t)pTask->delayTimer);

  return TSDB_CODE_SUCCESS;
}
1372

1373
/*
 * Launch every task of one level of the job.
 *
 * Flow-control requirements are checked for the level first. Each task is then stamped
 * with the job's current seriousId (failedSeriousId is set to the previous one) and
 * handed to schDelayLaunchTask().
 *
 * Returns TSDB_CODE_SUCCESS, or the first error encountered. A task slot that cannot be
 * fetched from level->subTasks yields TSDB_CODE_SCH_INTERNAL_ERROR instead of being
 * dereferenced.
 */
int32_t schLaunchLevelTasks(SSchJob *pJob, SSchLevel *level) {
  SCH_ERR_RET(schChkJobNeedFlowCtrl(pJob, level));

  for (int32_t i = 0; i < level->taskNum; ++i) {
    SSchTask *pTask = taosArrayGet(level->subTasks, i);
    if (NULL == pTask) {
      // Same guard as the other taosArrayGet() call sites in this file.
      qError("fail to get the %dth task in level, taskNum:%d", i, level->taskNum);
      SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
    }

    pTask->failedSeriousId = pJob->seriousId - 1;
    pTask->seriousId = pJob->seriousId;

    SCH_TASK_TLOG("task seriousId set to 0x%" PRIx64, pTask->seriousId);

    SCH_ERR_RET(schDelayLaunchTask(pJob, pTask));
  }

  return TSDB_CODE_SUCCESS;
}
1388

1389
/*
 * Drop every task stored in the hash list on its exec nodes.
 *
 * Nothing is done unless SCH_JOB_NEED_DROP(pJob) holds. For each task a pending delay
 * timer is stopped first, then the drop is sent while holding the task lock.
 */
void schDropTaskInHashList(SSchJob *pJob, SHashObj *list) {
  if (!SCH_JOB_NEED_DROP(pJob)) {
    return;
  }

  for (void *pIter = taosHashIterate(list, NULL); pIter; pIter = taosHashIterate(list, pIter)) {
    // The hash values are SSchTask pointers.
    SSchTask *pTask = *(SSchTask **)pIter;

    if (pTask->delayTimer) {
      schStopTaskDelayTimer(pJob, pTask, true);
    }

    SCH_LOCK_TASK(pTask);
    schDropTaskOnExecNode(pJob, pTask);
    SCH_UNLOCK_TASK(pTask);
  }
}
1409

1410
/*
 * Notify pCurrTask and then every other task in the hash list with the given type.
 *
 * pCurrTask is notified first without taking its lock here; every other task is notified
 * while holding its task lock. Notification stops at the first failure and that error
 * code is returned.
 */
int32_t schNotifyTaskInHashList(SSchJob *pJob, SHashObj *list, ETaskNotifyType type, SSchTask *pCurrTask) {
  int32_t code = TSDB_CODE_SUCCESS;

  SCH_ERR_RET(schNotifyTaskOnExecNode(pJob, pCurrTask, type));

  void *pIter = taosHashIterate(list, NULL);
  while (pIter) {
    SSchTask *pTask = *(SSchTask **)pIter;
    if (pTask != pCurrTask) {
      SCH_LOCK_TASK(pTask);
      code = schNotifyTaskOnExecNode(pJob, pTask, type);
      SCH_UNLOCK_TASK(pTask);

      if (TSDB_CODE_SUCCESS != code) {
        // Abandoning the iteration early: release the iterator before leaving the loop.
        taosHashCancelIterate(list, pIter);
        break;
      }
    }

    pIter = taosHashIterate(list, pIter);
  }

  SCH_RET(code);
}
1433

1434
/*
 * Send a fetch message for the job's fetch task to pJob->resNode.
 * The pTask argument is not used; the message is always built from pJob->fetchTask.
 */
int32_t schExecRemoteFetch(SSchJob *pJob, SSchTask *pTask) {
  int32_t fetchCode = schBuildAndSendMsg(pJob, pJob->fetchTask, &pJob->resNode, SCH_FETCH_TYPE(pJob->fetchTask), NULL);

  SCH_RET(fetchCode);
}
1437

1438
/*
 * Fetch the result of a locally executed task through the client-side query worker.
 *
 * For explain jobs the local explain responses are collected and passed to
 * schHandleExplainRes(), which destroys the array itself. The fetched response is then
 * handed to schProcessFetchRsp().
 */
int32_t schExecLocalFetch(SSchJob *pJob, SSchTask *pTask) {
  int32_t code = 0;
  void   *pFetchRsp = NULL;
  SArray *explainRes = NULL;

  if (SCH_IS_EXPLAIN_JOB(pJob)) {
    explainRes = taosArrayInit(pJob->taskNum, sizeof(SExplainLocalRsp));
    if (NULL == explainRes) {
      SCH_ERR_RET(terrno);
    }
  }

  SCH_ERR_JRET(qWorkerProcessLocalFetch(schMgmt.queryMgmt, pJob->seriousId, pJob->queryId, pTask->clientId, pTask->taskId,
                                        pJob->refId, pTask->execId, &pFetchRsp, explainRes));

  if (SCH_IS_EXPLAIN_JOB(pJob)) {
    // The array is consumed by schHandleExplainRes() on every path, so return directly
    // on failure rather than destroying it a second time below.
    SCH_ERR_RET(schHandleExplainRes(explainRes));
    explainRes = NULL;
  }

  SCH_ERR_RET(schProcessFetchRsp(pJob, pTask, pFetchRsp, TSDB_CODE_SUCCESS));

_return:

  taosArrayDestroy(explainRes);

  SCH_RET(code);
}
1466

1467
// Note: no more error processing, handled in function internal
1468
int32_t schLaunchFetchTask(SSchJob *pJob) {
827,260✔
1469
  int32_t code = 0;
827,260✔
1470

1471
  void *fetchRes = atomic_load_ptr(&pJob->fetchRes);
827,260✔
1472
  if (fetchRes) {
827,260✔
1473
    SCH_JOB_DLOG("res already fetched, res:%p", fetchRes);
10!
1474
    return TSDB_CODE_SUCCESS;
10✔
1475
  }
1476

1477
  SCH_SET_TASK_STATUS(pJob->fetchTask, JOB_TASK_STATUS_FETCH);
827,250✔
1478

1479
  if (SCH_IS_LOCAL_EXEC_TASK(pJob, pJob->fetchTask)) {
827,250!
1480
    SCH_ERR_JRET(schExecLocalFetch(pJob, pJob->fetchTask));
83,519!
1481
  } else {
1482
    SCH_ERR_JRET(schExecRemoteFetch(pJob, pJob->fetchTask));
743,731!
1483
  }
1484

1485
  return TSDB_CODE_SUCCESS;
827,240✔
1486

1487
_return:
10✔
1488

1489
  SCH_RET(schProcessOnTaskFailure(pJob, pJob->fetchTask, code));
10!
1490
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc