taosdata / TDengine / #4720

08 Sep 2025 08:43AM UTC coverage: 58.139% (-0.6%) from 58.762%
Build #4720 · push · travis-ci · web-flow

Merge pull request #32881 from taosdata/enh/add-new-windows-ci
fix(ci): update workflow reference to use new Windows CI YAML

133181 of 292179 branches covered (45.58%); branch coverage is included in the aggregate %.
201691 of 283811 relevant lines covered (71.07%)
5442780.71 hits per line
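
The aggregate figure appears to combine line and branch coverage, per the note above: (201691 + 133181) / (283811 + 292179) = 334872 / 575990 ≈ 58.139%, matching the reported total.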

Source File: /source/libs/scheduler/src/schTask.c (43.04% covered)
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "query.h"
17
#include "qworker.h"
18
#include "schInt.h"
19
#include "tglobal.h"
20
#include "tmsg.h"
21
#include "trpc.h"
22
#include "tmisce.h"
23

24
void schFreeTask(SSchJob *pJob, SSchTask *pTask) {
168,362✔
25
  schDeregisterTaskHb(pJob, pTask);
168,362✔
26

27
  if (pTask->candidateAddrs) {
168,364✔
28
    taosArrayDestroy(pTask->candidateAddrs);
164,513✔
29
  }
30

31
  taosMemoryFreeClear(pTask->msg);
168,350!
32

33
  if (pTask->children) {
168,339✔
34
    taosArrayDestroy(pTask->children);
8,461✔
35
  }
36

37
  if (pTask->parents) {
168,339✔
38
    taosArrayDestroy(pTask->parents);
22,789✔
39
  }
40

41
  if (pTask->execNodes) {
168,339!
42
    taosHashCleanup(pTask->execNodes);
168,351✔
43
  }
44

45
  taosArrayDestroy(pTask->profile.execTime);
168,328✔
46
}
168,358✔
47

48
void schInitTaskRetryTimes(SSchJob *pJob, SSchTask *pTask, SSchLevel *pLevel) {
168,403✔
49
  if (SCH_IS_DATA_BIND_TASK(pTask) || (!SCH_IS_QUERY_JOB(pJob)) || (SCH_ALL != schMgmt.cfg.schPolicy)) {
168,403!
50
    pTask->maxRetryTimes = SCH_DEFAULT_MAX_RETRY_NUM * 2;
168,373✔
51
  } else {
52
    int32_t nodeNum = taosArrayGetSize(pJob->nodeList);
30✔
53
    pTask->maxRetryTimes = TMAX(nodeNum, SCH_DEFAULT_MAX_RETRY_NUM);
30✔
54
  }
55

56
  pTask->maxExecTimes = pTask->maxRetryTimes * (pLevel->level + 1);
168,403✔
57
}
168,403✔
58

59
int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel) {
168,355✔
60
  int32_t code = 0;
168,355✔
61

62
  pTask->plan = pPlan;
168,355✔
63
  pTask->level = pLevel;
168,355✔
64
  pTask->seriesId = pJob->seriesId;
168,355✔
65
  pTask->execId = -1;
168,355✔
66
  pTask->failedExecId = -2;
168,355✔
67
  pTask->failedSeriesId = 0;
168,355✔
68
  pTask->timeoutUsec = SCH_DEFAULT_TASK_TIMEOUT_USEC;
168,355✔
69
  pTask->clientId = getClientId();
168,355✔
70
  pTask->taskId = schGenTaskId();
168,378✔
71

72
  schInitTaskRetryTimes(pJob, pTask, pLevel);
168,495✔
73

74
  pTask->execNodes =
168,479✔
75
      taosHashInit(pTask->maxExecTimes, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
168,401✔
76
  pTask->profile.execTime = taosArrayInit(pTask->maxExecTimes, sizeof(int64_t));
168,479✔
77
  if (NULL == pTask->execNodes || NULL == pTask->profile.execTime) {
168,433!
78
    SCH_ERR_JRET(terrno);
×
79
  }
80

81
  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT);
168,443✔
82

83
  SCH_TASK_TLOG("task initialized, max times %d:%d", pTask->maxRetryTimes, pTask->maxExecTimes);
168,493!
84

85
  return TSDB_CODE_SUCCESS;
168,496✔
86

87
_return:
×
88

89
  taosArrayDestroy(pTask->profile.execTime);
×
90
  taosHashCleanup(pTask->execNodes);
×
91

92
  SCH_RET(code);
×
93
}
94

95
int32_t schRecordTaskSucceedNode(SSchJob *pJob, SSchTask *pTask) {
163,613✔
96
  char buf[256] = {0};
163,613✔
97
  if (SCH_IS_LOCAL_EXEC_TASK(pJob, pTask)) {
163,613!
98
    return TSDB_CODE_SUCCESS;
×
99
  }
100

101
  SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
163,613✔
102
  if (NULL == addr) {
163,676✔
103
    SCH_TASK_ELOG("taosArrayGet candidate addr failed, idx:%d, size:%d", pTask->candidateIdx,
30!
104
                  (int32_t)taosArrayGetSize(pTask->candidateAddrs));
105
    SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
30!
106
  }
107

108
  pTask->succeedAddr = *addr;
163,646✔
109
  epsetToStr(&addr->epSet, buf, tListLen(buf));
163,646✔
110

111
  SCH_TASK_DLOG("recode the success addr, %s", buf);
163,614!
112
  return TSDB_CODE_SUCCESS;
163,596✔
113
}
114

115
int32_t schAppendTaskExecNode(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, int32_t execId) {
36,799✔
116
  SSchNodeInfo nodeInfo = {.addr = *addr, .handle = SCH_GET_TASK_HANDLE(pTask)};
36,799!
117

118
  if (taosHashPut(pTask->execNodes, &execId, sizeof(execId), &nodeInfo, sizeof(nodeInfo))) {
36,799!
119
    SCH_TASK_ELOG("taosHashPut nodeInfo to execNodes failed, errno:%d", ERRNO);
×
120
    SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
×
121
  }
122

123
  SCH_TASK_DLOG("task execNode added, execId:%d, handle:%p", execId, nodeInfo.handle);
36,808!
124

125
  return TSDB_CODE_SUCCESS;
36,803✔
126
}
127

128
int32_t schDropTaskExecNode(SSchJob *pJob, SSchTask *pTask, void *handle, int32_t execId) {
90✔
129
  if (NULL == pTask->execNodes) {
90✔
130
    return TSDB_CODE_SUCCESS;
30✔
131
  }
132

133
  if (taosHashRemove(pTask->execNodes, &execId, sizeof(execId))) {
60✔
134
    SCH_TASK_DLOG("execId %d already not in execNodeList", execId);
30!
135
    SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR);
30!
136
  } else {
137
    SCH_TASK_DLOG("execId %d removed from execNodeList", execId);
30!
138
  }
139

140
  if ((execId != pTask->execId) || pTask->waitRetry) {  // ignore it
30!
141
    SCH_TASK_DLOG("execId %d is already not current execId %d, waitRetry %d", execId, pTask->execId, pTask->waitRetry);
30!
142
    SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR);
30!
143
  }
144

145
  return TSDB_CODE_SUCCESS;
×
146
}
147

148
int32_t schUpdateTaskExecNode(SSchJob *pJob, SSchTask *pTask, void *handle, int32_t execId) {
54,458✔
149
  if (taosHashGetSize(pTask->execNodes) <= 0) {
54,458✔
150
    return TSDB_CODE_SUCCESS;
30✔
151
  }
152

153
  SSchNodeInfo *nodeInfo = taosHashGet(pTask->execNodes, &execId, sizeof(execId));
54,429✔
154
  if (NULL == nodeInfo) {  // ignore it
54,432!
155
    SCH_TASK_DLOG("handle not updated since execId %d already not exist, current execId %d, waitRetry %d", execId,
×
156
                  pTask->execId, pTask->waitRetry);
157
    return TSDB_CODE_SUCCESS;
×
158
  }
159

160
  nodeInfo->handle = handle;
54,432✔
161

162
  SCH_TASK_DLOG("handle updated to %p for execId %d", handle, execId);
54,432!
163

164
  return TSDB_CODE_SUCCESS;
54,431✔
165
}
166

167
int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, bool dropExecNode, void *handle, uint64_t seriesId, int32_t execId) {
54,459✔
168
  if (dropExecNode) {
54,459!
169
    SCH_RET(schDropTaskExecNode(pJob, pTask, handle, execId));
×
170
  }
171

172
  SCH_ERR_RET(schUpdateTaskExecNode(pJob, pTask, handle, execId));
54,459!
173

174
  if ((seriesId != pTask->seriesId || seriesId <= pTask->failedSeriesId) || 
54,461!
175
      (execId != pTask->execId || execId <= pTask->failedExecId) || pTask->waitRetry) {  // ignore it
54,402!
176
    SCH_TASK_DLOG("handle not updated since seriesId:0x%" PRIx64 " or execId:%d is not lastest,"
59!
177
                  "current seriesId:0x%" PRIx64 " execId %d, failedSeriesId:0x%" PRIx64 " failedExecId:%d, waitRetry %d", 
178
                  seriesId, execId, pTask->seriesId, pTask->execId, pTask->failedSeriesId, pTask->failedExecId, pTask->waitRetry);
179
    SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR);
58!
180
  }
181

182
  SCH_SET_TASK_HANDLE(pTask, handle);
54,400✔
183

184
  return TSDB_CODE_SUCCESS;
54,400✔
185
}
186

187
int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode) {
1,477✔
188
  bool    needRetry = false;
1,477✔
189
  bool    moved = false;
1,477✔
190
  int32_t taskDone = 0;
1,477✔
191
  int8_t  jobStatus = 0;
1,477✔
192

193
  if (TSDB_CODE_SCH_IGNORE_ERROR == errCode) {
1,477✔
194
    return TSDB_CODE_SCH_IGNORE_ERROR;
60✔
195
  }
196

197
  pTask->failedExecId = pTask->execId;
1,417✔
198
  if (schJobNeedToStop(pJob, &jobStatus)) {
1,417✔
199
    SCH_TASK_DLOG("no more task failure processing cause of job status %s", jobTaskStatusStr(jobStatus));
360!
200
    SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR);
360!
201
  }
202

203
  int8_t taskStatus = SCH_GET_TASK_STATUS(pTask);
1,057✔
204
  if (taskStatus == JOB_TASK_STATUS_FAIL || taskStatus == JOB_TASK_STATUS_SUCC) {
1,057!
205
    SCH_TASK_ELOG("task already done, status:%s", jobTaskStatusStr(taskStatus));
60!
206
    SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
60!
207
  }
208

209
  if (errCode == TSDB_CODE_SCH_TIMEOUT_ERROR) {
997✔
210
    SCH_LOG_TASK_WAIT_TS(pTask);
30!
211
  } else {
212
    SCH_LOG_TASK_END_TS(pTask);
967✔
213
  }
214

215
  SCH_TASK_DLOG("taskOnFailure, code:%s", tstrerror(errCode));
997!
216

217
  SCH_ERR_RET(schTaskCheckSetRetry(pJob, pTask, errCode, &needRetry));
997!
218

219
  if (!needRetry) {
997✔
220
    SCH_TASK_ELOG("task failed and no more retry, code:%s", tstrerror(errCode));
967!
221
    SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_FAIL);
967✔
222

223
    if (SCH_JOB_NEED_WAIT(pJob)) {
967✔
224
      SCH_LOCK(SCH_WRITE, &pTask->level->lock);
929!
225
      pTask->level->taskFailed++;
929✔
226
      taskDone = pTask->level->taskSucceed + pTask->level->taskFailed;
929✔
227
      SCH_UNLOCK(SCH_WRITE, &pTask->level->lock);
929!
228

229
      schUpdateJobErrCode(pJob, errCode);
929✔
230

231
      if (taskDone < pTask->level->taskNum) {
929!
232
        SCH_TASK_DLOG("need to wait other tasks, doneNum:%d, allNum:%d", taskDone, pTask->level->taskNum);
×
233
        SCH_RET(TSDB_CODE_SCH_IGNORE_ERROR);
×
234
      }
235

236
      SCH_RET(atomic_load_32(&pJob->errCode));
929!
237
    }
238
  } else {
239
    SCH_ERR_RET(schHandleTaskRetry(pJob, pTask));
30!
240
    return TSDB_CODE_SUCCESS;
×
241
  }
242

243
  SCH_RET(errCode);
38!
244
}
245

246
int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
163,587✔
247
  bool    moved = false;
163,587✔
248
  int32_t code = 0;
163,587✔
249

250
  SCH_TASK_TLOG("taskOnSuccess, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
163,587!
251

252
  SCH_LOG_TASK_END_TS(pTask);
163,612✔
253

254
  int32_t taskDone = atomic_add_fetch_32(&pTask->level->taskExecDoneNum, 1);
163,653✔
255

256
  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_PART_SUCC);
163,650✔
257

258
  SCH_ERR_RET(schRecordTaskSucceedNode(pJob, pTask));
163,660!
259
  SCH_ERR_RET(schLaunchTasksInFlowCtrlList(pJob, pTask));
163,601!
260

261
  int32_t parentNum = pTask->parents ? (int32_t)taosArrayGetSize(pTask->parents) : 0;
163,598✔
262
  if (parentNum == 0) {
163,598✔
263
    int32_t taskDone = 0;
143,466✔
264
    if (SCH_JOB_NEED_WAIT(pJob)) {
143,466✔
265
      SCH_LOCK(SCH_WRITE, &pTask->level->lock);
126,870!
266
      pTask->level->taskSucceed++;
126,864✔
267
      taskDone = pTask->level->taskSucceed + pTask->level->taskFailed;
126,864✔
268
      SCH_UNLOCK(SCH_WRITE, &pTask->level->lock);
126,864!
269

270
      if (taskDone < pTask->level->taskNum) {
126,835✔
271
        SCH_TASK_DLOG("wait all tasks, done:%d, all:%d", taskDone, pTask->level->taskNum);
7,573!
272
        return TSDB_CODE_SUCCESS;
7,572✔
273
      }
274
      
275
      SCH_TASK_TLOG("taskDone number reach level task number, done:%d, total:%d", taskDone, pTask->level->taskNum);
119,262!
276

277
      if (pTask->level->taskFailed > 0) {
119,250!
278
        SCH_RET(schHandleJobFailure(pJob, pJob->errCode));
×
279
      }
280

281
      SCH_RET(schSwitchJobStatus(pJob, JOB_TASK_STATUS_PART_SUCC, NULL));
119,250!
282
    }
283
    
284
    pJob->resNode = pTask->succeedAddr;
16,596✔
285
    pJob->fetchTask = pTask;
16,596✔
286

287
    SCH_RET(schSwitchJobStatus(pJob, JOB_TASK_STATUS_PART_SUCC, NULL));
16,596!
288
  }
289

290
  for (int32_t i = 0; i < parentNum; ++i) {
40,303✔
291
    SSchTask *pParent = *(SSchTask **)taosArrayGet(pTask->parents, i);
20,170✔
292
    if (NULL == pParent) {
20,170!
293
      SCH_TASK_ELOG("fail to get task %d pParent, parentNum:%d", i, parentNum);
×
294
      SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
×
295
    }
296

297
    SCH_LOCK(SCH_WRITE, &pParent->planLock);
20,170!
298
    SDownstreamSourceNode source = {
60,513✔
299
        .type = QUERY_NODE_DOWNSTREAM_SOURCE,
300
        .clientId = pTask->clientId,
20,171✔
301
        .taskId = pTask->taskId,
20,171✔
302
        .sId = pTask->seriesId,
20,171✔
303
        .execId = pTask->execId,
20,171✔
304
        .addr = pTask->succeedAddr,
20,171✔
305
        .fetchMsgType = SCH_FETCH_TYPE(pTask),
20,171✔
306
        .localExec = SCH_IS_LOCAL_EXEC_TASK(pJob, pTask),
20,171!
307
    };
308

309
    code = qSetSubplanExecutionNode(pParent->plan, pTask->plan->id.groupId, &source);
20,171✔
310
    if (TSDB_CODE_SUCCESS != code) {
20,171!
311
      SCH_TASK_ELOG("qSetSubplanExecutionNode failed, groupId: %d", pTask->plan->id.groupId);
×
312
    }
313

314
    SCH_UNLOCK(SCH_WRITE, &pParent->planLock);
20,171!
315
    SCH_ERR_RET(code);
20,171!
316

317
    int32_t readyNum = atomic_add_fetch_32(&pParent->childReady, 1);
20,171✔
318

319
    if (SCH_TASK_READY_FOR_LAUNCH(readyNum, pParent)) {
20,171✔
320
      // this is a redirect process for parent task, since the original pParent task has already failed before.
321
      // TODO refactor optimize: update the candidate address
322
      // set the address from the pTask->succeedAddr, the vnode that successfully executed subquery already
323
      if (pParent->redirectCtx.inRedirect && (!SCH_IS_DATA_BIND_TASK(pParent))) {
7,204!
324
        schSwitchTaskCandidateAddr(pJob, pParent);
×
325
      }
326

327
      SCH_TASK_DLOG("all %d children task done, start to launch parent task, TID:0x%" PRIx64, readyNum, pParent->taskId);
7,204!
328

329
      pParent->seriesId = pJob->seriesId;
7,204✔
330
      TSWAP(pTask, pParent);
7,204✔
331
      SCH_TASK_TLOG("task seriesId set to 0x%" PRIx64, pTask->seriesId);
7,204!
332
      TSWAP(pTask, pParent);
7,204✔
333

334
      SCH_ERR_RET(schDelayLaunchTask(pJob, pParent));
7,204!
335
    }
336
  }
337

338
  if (taskDone == pTask->level->taskNum) {
20,133✔
339
    SCH_ERR_RET(schLaunchJobLowerLevel(pJob, pTask));
7,204!
340
  }
341

342
  return TSDB_CODE_SUCCESS;
20,133✔
343
}
344

345
int32_t schRescheduleTask(SSchJob *pJob, SSchTask *pTask) {
91✔
346
  if (!schMgmt.cfg.enableReSchedule) {
91✔
347
    return TSDB_CODE_SUCCESS;
1✔
348
  }
349

350
  if (SCH_IS_DATA_BIND_TASK(pTask)) {
90!
351
    return TSDB_CODE_SUCCESS;
60✔
352
  }
353

354
  if (SCH_TASK_TIMEOUT(pTask) && JOB_TASK_STATUS_EXEC == pTask->status && pJob->fetchTask != pTask &&
30!
355
      taosArrayGetSize(pTask->candidateAddrs) > 1) {
×
356
    SCH_TASK_DLOG("task execId %d will be rescheduled now", pTask->execId);
×
357
    schDropTaskOnExecNode(pJob, pTask);
×
358
    taosHashClear(pTask->execNodes);
×
359

360
    SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, TSDB_CODE_SCH_TIMEOUT_ERROR));
×
361
  }
362

363
  return TSDB_CODE_SUCCESS;
30✔
364
}
365

366
int32_t schChkUpdateRedirectCtx(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet, int32_t rspCode) {
60✔
367
  SSchRedirectCtx *pCtx = &pTask->redirectCtx;
60✔
368

369
  if (!pCtx->inRedirect) {
60✔
370
    pCtx->inRedirect = true;
30✔
371
    pCtx->periodMs = tsRedirectPeriod;
30✔
372
    pCtx->startTs = taosGetTimestampMs();
30✔
373

374
    if (SCH_IS_DATA_BIND_TASK(pTask)) {
30!
375
      if (pEpSet) {
30!
376
        pCtx->roundTotal = pEpSet->numOfEps;
30✔
377
      } else if (pTask->candidateAddrs && taosArrayGetSize(pTask->candidateAddrs) > 0) {
×
378
        SQueryNodeAddr *pAddr = taosArrayGet(pTask->candidateAddrs, 0);
×
379
        pCtx->roundTotal = pAddr->epSet.numOfEps;
×
380
      } else {
381
        pCtx->roundTotal = SCH_DEFAULT_RETRY_TOTAL_ROUND;
×
382
      }
383
    } else {
384
      // For a non-data-bind task, we still need to retry at least SCH_DEFAULT_RETRY_TOTAL_ROUND times.
385
      pCtx->roundTotal = SCH_DEFAULT_RETRY_TOTAL_ROUND;
×
386
    }
387

388
    goto _return;
30✔
389
  }
390

391
  pCtx->totalTimes++;
30✔
392
  pCtx->roundTimes++;
30✔
393

394
  if (SCH_IS_DATA_BIND_TASK(pTask) && pEpSet) {
30!
395
    pCtx->roundTotal = pEpSet->numOfEps;
×
396
  }
397

398
  SCH_TASK_DLOG("job ctx roundTotal:%d, roundTimes:%d, totalTimes:%d", pCtx->roundTotal, pCtx->roundTimes, pCtx->totalTimes);
30!
399
  if (pCtx->roundTimes >= pCtx->roundTotal) {
30!
400
    int64_t nowTs = taosGetTimestampMs();
×
401
    int64_t lastTime = nowTs - pCtx->startTs;
×
402

403
    if (lastTime > tsMaxRetryWaitTime) {
×
404
      SCH_TASK_DLOG("task no more redirect retry since timeout, now:%" PRId64 ", start:%" PRId64 " duration:%.2fs, max:%ds, total:%d",
×
405
                    nowTs, pCtx->startTs, lastTime/1000.0, tsMaxRetryWaitTime/1000, pCtx->totalTimes);
406
      pJob->noMoreRetry = true;
×
407
      SCH_ERR_RET(SCH_GET_REDIRECT_CODE(pJob, rspCode));
×
408
    }
409

410
    pCtx->periodMs *= tsRedirectFactor;
×
411
    if (pCtx->periodMs > tsRedirectMaxPeriod) {
×
412
      pCtx->periodMs = tsRedirectMaxPeriod;
×
413
    }
414

415
    if (SCH_IS_DATA_BIND_TASK(pTask)) {    
×
416
      int64_t leftTime = tsMaxRetryWaitTime - lastTime;
×
417
      pTask->delayExecMs = leftTime < pCtx->periodMs ? leftTime : pCtx->periodMs;
×
418
    }
419
    
420
    pCtx->roundTimes = 0;
×
421
    goto _return;
×
422
  }
423

424
  pTask->delayExecMs = 0;
30✔
425

426
_return:
60✔
427

428
  SCH_TASK_DLOG("task start %d/%d/%d redirect retry, delayExec:%.2fs", pCtx->roundTimes, pCtx->roundTotal,
60!
429
                pCtx->totalTimes, pTask->delayExecMs/1000.0);
430
  return TSDB_CODE_SUCCESS;
60✔
431
}
432

433
void schResetTaskForRetry(SSchJob *pJob, SSchTask *pTask) {
×
434
  pTask->waitRetry = true;
×
435

436
  if (pTask->delayTimer) {
×
437
    UNUSED(taosTmrStop(pTask->delayTimer));
×
438
  }
439

440
  schDropTaskOnExecNode(pJob, pTask);
×
441
  taosHashClear(pTask->execNodes);
×
442
  (void)schRemoveTaskFromExecList(pJob, pTask);  // ignore error
×
443
  schDeregisterTaskHb(pJob, pTask);
×
444
  taosMemoryFreeClear(pTask->msg);
×
445

446
  pTask->msgLen = 0;
×
447
  pTask->lastMsgType = 0;
×
448
  pTask->childReady = 0;
×
449
  TAOS_MEMSET(&pTask->succeedAddr, 0, sizeof(pTask->succeedAddr));
×
450
}
×
451

452
#if 0
453

454
int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32_t rspCode) {
455
  int32_t code = 0;
456

457
  SCH_TASK_DLOG("task will be redirected now, status:%s, code:%s", SCH_GET_TASK_STATUS_STR(pTask), tstrerror(rspCode));
458

459
  if (!NO_RET_REDIRECT_ERROR(rspCode)) {
460
    SCH_UPDATE_REDIRECT_CODE(pJob, rspCode);
461
  }
462

463
  SCH_ERR_JRET(schChkUpdateRedirectCtx(pJob, pTask, pData ? pData->pEpSet : NULL, rspCode));
464

465
  schResetTaskForRetry(pJob, pTask);
466

467
  if (SCH_IS_DATA_BIND_TASK(pTask)) {
468
    if (pData && pData->pEpSet) {
469
      SCH_ERR_JRET(schUpdateTaskCandidateAddr(pJob, pTask, pData->pEpSet));
470
    } else if (SYNC_SELF_LEADER_REDIRECT_ERROR(rspCode)) {
471
      SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
472
      if (NULL == addr) {
473
        SCH_TASK_ELOG("fail to get the %dth condidateAddr, totalNum:%d", pTask->candidateIdx,
474
                      (int32_t)taosArrayGetSize(pTask->candidateAddrs));
475
        SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
476
      }
477

478
      SEp *pEp = &addr->epSet.eps[addr->epSet.inUse];
479
      SCH_TASK_DLOG("task retry node %d current ep, idx:%d/%d,%s:%d, code:%s", addr->nodeId, addr->epSet.inUse,
480
                    addr->epSet.numOfEps, pEp->fqdn, pEp->port, tstrerror(rspCode));
481
    } else {
482
      SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
483
      if (NULL == addr) {
484
        SCH_TASK_ELOG("fail to get the %dth condidateAddr, totalNum:%d", pTask->candidateIdx,
485
                      (int32_t)taosArrayGetSize(pTask->candidateAddrs));
486
        SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
487
      }
488

489
      SCH_SWITCH_EPSET(addr);
490
      SCH_TASK_DLOG("switch task target node %d epset to %d/%d", addr->nodeId, addr->epSet.inUse, addr->epSet.numOfEps);
491
    }
492

493
    SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT);
494

495
    SCH_ERR_JRET(schDelayLaunchTask(pJob, pTask));
496

497
    return TSDB_CODE_SUCCESS;
498
  }
499

500
  // merge plan
501

502
  pTask->childReady = 0;
503

504
  qClearSubplanExecutionNode(pTask->plan);
505

506
  // Note: current error task and upper level merge task
507
  if ((pData && 0 == pData->len) || NULL == pData) {
508
    SCH_ERR_JRET(schSwitchTaskCandidateAddr(pJob, pTask));
509
  }
510

511
  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT);
512

513
  int32_t childrenNum = taosArrayGetSize(pTask->children);
514
  for (int32_t i = 0; i < childrenNum; ++i) {
515
    SSchTask *pChild = taosArrayGetP(pTask->children, i);
516
    SCH_LOCK_TASK(pChild);
517
    (void)schDoTaskRedirect(pJob, pChild, NULL, rspCode);  // error handled internal
518
    SCH_UNLOCK_TASK(pChild);
519
  }
520

521
  return TSDB_CODE_SUCCESS;
522

523
_return:
524

525
  SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
526
}
527

528
int32_t schResetTaskSetLevelInfo(SSchJob *pJob, SSchTask *pTask) {
529
  SSchLevel *pLevel = pTask->level;
530

531
  SCH_TASK_DLOG("start to reset level for current task set, execDone:%d, launched:%d",
532
                atomic_load_32(&pLevel->taskExecDoneNum), atomic_load_32(&pLevel->taskLaunchedNum));
533

534
  if (SCH_GET_TASK_STATUS(pTask) >= JOB_TASK_STATUS_PART_SUCC) {
535
    (void)atomic_sub_fetch_32(&pLevel->taskExecDoneNum, 1);
536
  }
537

538
  (void)atomic_sub_fetch_32(&pLevel->taskLaunchedNum, 1);
539

540
  int32_t childrenNum = taosArrayGetSize(pTask->children);
541
  for (int32_t i = 0; i < childrenNum; ++i) {
542
    SSchTask *pChild = taosArrayGetP(pTask->children, i);
543
    if (NULL == pChild) {
544
      SCH_TASK_ELOG("fail to get the %dth child, childrenNum:%d", i, childrenNum);
545
      SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
546
    }
547

548
    SCH_LOCK_TASK(pChild);
549
    pLevel = pChild->level;
550
    (void)atomic_sub_fetch_32(&pLevel->taskExecDoneNum, 1);
551
    (void)atomic_sub_fetch_32(&pLevel->taskLaunchedNum, 1);
552
    SCH_UNLOCK_TASK(pChild);
553
  }
554

555
  SCH_TASK_DLOG("end to reset level for current task set, execDone:%d, launched:%d",
556
                atomic_load_32(&pLevel->taskExecDoneNum), atomic_load_32(&pLevel->taskLaunchedNum));
557

558
  return TSDB_CODE_SUCCESS;
559
}
560

561
int32_t schHandleTaskSetRetry(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32_t rspCode) {
562
  int32_t code = 0;
563

564
  SCH_ERR_JRET(schChkResetJobRetry(pJob, rspCode));
565

566
  if (SYNC_OTHER_LEADER_REDIRECT_ERROR(rspCode)) {
567
    if (NULL == pData->pEpSet) {
568
      SCH_TASK_ELOG("epset updating excepted, error:%s", tstrerror(rspCode));
569
      code = TSDB_CODE_INVALID_MSG;
570
      goto _return;
571
    }
572
  }
573

574
  SCH_TASK_DLOG("start to redirect current task set cause of error: %s", tstrerror(rspCode));
575

576
  SCH_ERR_JRET(schResetTaskSetLevelInfo(pJob, pTask));
577

578
  SCH_RESET_JOB_LEVEL_IDX(pJob);
579
  atomic_add_fetch_64(&pJob->seriesId, 1);
580

581
  code = schDoTaskRedirect(pJob, pTask, pData, rspCode);
582

583
  taosMemoryFreeClear(pData->pData);
584
  taosMemoryFreeClear(pData->pEpSet);
585

586
  SCH_RET(code);
587

588
_return:
589

590
  taosMemoryFreeClear(pData->pData);
591
  taosMemoryFreeClear(pData->pEpSet);
592

593
  SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
594
}
595
#endif
596

597
int32_t schPushTaskToExecList(SSchJob *pJob, SSchTask *pTask) {
164,300✔
598
  int32_t code = taosHashPut(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
164,300✔
599
  if (0 != code) {
164,651✔
600
    if (HASH_NODE_EXIST(code)) {
60✔
601
      SCH_TASK_DLOG("task already in execTask list, code:0x%x", code);
30!
602
      return TSDB_CODE_SUCCESS;
30✔
603
    }
604

605
    SCH_TASK_ELOG("taosHashPut task to execTask list failed, code:0x%x", code);
30!
606
    SCH_ERR_RET(code);
30!
607
  }
608

609
  SCH_TASK_TLOG("task added to execTask list, numOfTasks:%d", taosHashGetSize(pJob->execTasks));
164,591!
610

611
  return TSDB_CODE_SUCCESS;
164,435✔
612
}
613

614
/*
615
int32_t schMoveTaskToSuccList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
616
  if (0 != taosHashRemove(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId))) {
617
    SCH_TASK_WLOG("remove task from execTask list failed, may not exist, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
618
  } else {
619
    SCH_TASK_DLOG("task removed from execTask list, numOfTasks:%d", taosHashGetSize(pJob->execTasks));
620
  }
621

622
  int32_t code = taosHashPut(pJob->succTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
623
  if (0 != code) {
624
    if (HASH_NODE_EXIST(code)) {
625
      *moved = true;
626
      SCH_TASK_ELOG("task already in succTask list, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
627
      SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
628
    }
629

630
    SCH_TASK_ELOG("taosHashPut task to succTask list failed, errno:%d", ERRNO);
631
    SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
632
  }
633

634
  *moved = true;
635

636
  SCH_TASK_DLOG("task moved to succTask list, numOfTasks:%d", taosHashGetSize(pJob->succTasks));
637

638
  return TSDB_CODE_SUCCESS;
639
}
640

641
int32_t schMoveTaskToFailList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
642
  *moved = false;
643

644
  if (0 != taosHashRemove(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId))) {
645
    SCH_TASK_WLOG("remove task from execTask list failed, may not exist, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
646
  }
647

648
  int32_t code = taosHashPut(pJob->failTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
649
  if (0 != code) {
650
    if (HASH_NODE_EXIST(code)) {
651
      *moved = true;
652

653
      SCH_TASK_WLOG("task already in failTask list, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
654
      SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
655
    }
656

657
    SCH_TASK_ELOG("taosHashPut task to failTask list failed, errno:%d", ERRNO);
658
    SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
659
  }
660

661
  *moved = true;
662

663
  SCH_TASK_DLOG("task moved to failTask list, numOfTasks:%d", taosHashGetSize(pJob->failTasks));
664

665
  return TSDB_CODE_SUCCESS;
666
}
667

668
int32_t schMoveTaskToExecList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
669
  if (0 != taosHashRemove(pJob->succTasks, &pTask->taskId, sizeof(pTask->taskId))) {
670
    SCH_TASK_WLOG("remove task from succTask list failed, may not exist, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
671
  }
672

673
  int32_t code = taosHashPut(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
674
  if (0 != code) {
675
    if (HASH_NODE_EXIST(code)) {
676
      *moved = true;
677

678
      SCH_TASK_ELOG("task already in execTask list, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
679
      SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
680
    }
681

682
    SCH_TASK_ELOG("taosHashPut task to execTask list failed, errno:%d", ERRNO);
683
    SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
684
  }
685

686
  *moved = true;
687

688
  SCH_TASK_DLOG("task moved to execTask list, numOfTasks:%d", taosHashGetSize(pJob->execTasks));
689

690
  return TSDB_CODE_SUCCESS;
691
}
692
*/
693

694
int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bool *needRetry) {
1,087✔
695
  if (pJob->noMoreRetry) {
1,087!
696
    *needRetry = false;
×
697
    SCH_TASK_DLOG("task no more retry since job no more retry, retryTimes:%d/%d", pTask->retryTimes,
×
698
                  pTask->maxRetryTimes);
699
    return TSDB_CODE_SUCCESS;
×
700
  }
701

702
  // handle transport time out issue
703
  if (TSDB_CODE_SCH_TIMEOUT_ERROR == errCode) {
1,087✔
704
    pTask->maxExecTimes++;
120✔
705
    pTask->maxRetryTimes++;
120✔
706
    if (pTask->timeoutUsec < SCH_MAX_TASK_TIMEOUT_USEC) {
120✔
707
      pTask->timeoutUsec *= 2;
60✔
708
      if (pTask->timeoutUsec > SCH_MAX_TASK_TIMEOUT_USEC) {
60✔
709
        pTask->timeoutUsec = SCH_MAX_TASK_TIMEOUT_USEC;
30✔
710
      }
711
    }
712
  }
713

714
  if ((pTask->retryTimes + 1) > pTask->maxRetryTimes) {
1,087!
715
    *needRetry = false;
×
716
    SCH_TASK_DLOG("task no more retry since reach max retry times, retryTimes:%d/%d", pTask->retryTimes,
×
717
                  pTask->maxRetryTimes);
718
    return TSDB_CODE_SUCCESS;
×
719
  }
720

721
  if ((pTask->execId + 1) >= pTask->maxExecTimes) {
1,087!
722
    *needRetry = false;
×
723
    SCH_TASK_DLOG("task no more retry since reach max exec times, execId:%d/%d", pTask->execId, pTask->maxExecTimes);
×
724
    return TSDB_CODE_SUCCESS;
×
725
  }
726

727
  if (!SCH_TASK_NEED_RETRY(pTask->lastMsgType, errCode)) {
1,087!
728
    *needRetry = false;
967✔
729
    SCH_TASK_DLOG("task no more retry cause of errCode, errCode:%x - %s", errCode, tstrerror(errCode));
967!
730
    return TSDB_CODE_SUCCESS;
967✔
731
  }
732

733
  *needRetry = true;
120✔
734

735
  SCH_TASK_DLOG("task need the %d/%d retry, errCode:%x - %s", pTask->execId + 1, pTask->maxRetryTimes, errCode, tstrerror(errCode));
120!
736
  return TSDB_CODE_SUCCESS;
120✔
737
}
738

739
int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask) {
30✔
740
  char buf[256] = {0};
30✔
741

742
  (void)atomic_sub_fetch_32(&pTask->level->taskLaunchedNum, 1);
30✔
743

744
  if (pTask->delayTimer) {
30!
745
    UNUSED(taosTmrStop(pTask->delayTimer));
×
746
  }
747

748
  (void)schRemoveTaskFromExecList(pJob, pTask);  // ignore error
30✔
749
  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT);
30✔
750

751
  if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) {
30!
752
    SCH_ERR_RET(schLaunchTasksInFlowCtrlList(pJob, pTask));
×
753
  }
754

755
  schDeregisterTaskHb(pJob, pTask);
30✔
756

757
  if (SCH_IS_DATA_BIND_TASK(pTask)) {
30!
758
    SQueryNodeAddr *pAddr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
×
759
    if (NULL == pAddr) {
×
760
      SCH_TASK_ELOG("fail to the %dth condidateAddr, totalNum:%d", pTask->candidateIdx,
×
761
                    (int32_t)taosArrayGetSize(pTask->candidateAddrs));
762
      SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
×
763
    }
764

765
    // switch to the next ep in the epset
766
    SCH_SWITCH_EPSET(pAddr);
×
767
    int32_t ret = epsetToStr(&pAddr->epSet, buf, tListLen(buf));
×
768
    if (ret != 0) {  // print error and continue
×
769
      SCH_TASK_ELOG("failed to print vgId:%d epset, code:%s", pAddr->nodeId, tstrerror(ret));
×
770
    }
771

772
    // Wait for a while since the vnode leader/follower switch may take from several seconds
773
    // to several minutes to complete.
774
    if (pTask->delayExecMs == 0) {
×
775
      pTask->delayExecMs = 500;  // 0.5s by default
×
776
    } else {
777
      pTask->delayExecMs = TMIN(pTask->delayExecMs * tsRedirectFactor * 1.5, tsMaxRetryWaitTime);
×
778
    }
779

780
    SCH_TASK_DLOG("vgId:%d switch to next ep in %s to start task delay:%.2fs", pAddr->nodeId, buf,
×
781
                  pTask->delayExecMs / 1000.0);
782
  } else {
783
    SCH_ERR_RET(schSwitchTaskCandidateAddr(pJob, pTask));
30!
784
  }
785

786
  SCH_ERR_RET(schDelayLaunchTask(pJob, pTask));
30!
787
  return TSDB_CODE_SUCCESS;
×
788
}
789

790
int32_t schSetAddrsFromNodeList(SSchJob *pJob, SSchTask *pTask) {
3,252✔
791
  int32_t addNum = 0;
3,252✔
792
  int32_t nodeNum = 0;
3,252✔
793

794
  if (pJob->nodeList) {
3,252✔
795
    nodeNum = taosArrayGetSize(pJob->nodeList);
3,222✔
796

797
    for (int32_t i = 0; i < nodeNum; ++i) {
11,022✔
798
      SQueryNodeLoad *nload = taosArrayGet(pJob->nodeList, i);
7,800✔
799
      SQueryNodeAddr *naddr = &nload->addr;
7,800✔
800

801
      if (NULL == taosArrayPush(pTask->candidateAddrs, naddr)) {
15,600!
802
        SCH_TASK_ELOG("taosArrayPush execNode to candidate addrs failed, addNum:%d, errno:%d", addNum, ERRNO);
×
803
        SCH_ERR_RET(terrno);
×
804
      }
805

806
      SCH_TASK_TLOG("set %dth candidate addr, id %d, inUse:%d/%d, fqdn:%s, port:%d", i, naddr->nodeId,
7,800!
807
                    naddr->epSet.inUse, naddr->epSet.numOfEps, SCH_GET_CUR_EP(naddr)->fqdn,
808
                    SCH_GET_CUR_EP(naddr)->port);
809

810
      ++addNum;
7,800✔
811
    }
812
  }
813

814
  if (addNum <= 0) {
3,252✔
815
    SCH_TASK_ELOG("no available execNode as candidates, nodeNum:%d", nodeNum);
30!
816
    SCH_ERR_RET(TSDB_CODE_TSC_NO_EXEC_NODE);
30!
817
  }
818

819
  return TSDB_CODE_SUCCESS;
3,222✔
820
}
821

822
int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) {
164,109✔
823
  if (NULL != pTask->candidateAddrs) {
164,109!
824
    return TSDB_CODE_SUCCESS;
×
825
  }
826

827
  pTask->candidateAddrs = taosArrayInit(SCHEDULE_DEFAULT_MAX_NODE_NUM, sizeof(SQueryNodeAddr));
164,109✔
828
  if (NULL == pTask->candidateAddrs) {
164,517!
829
    SCH_TASK_ELOG("taosArrayInit %d condidate addrs failed", SCHEDULE_DEFAULT_MAX_NODE_NUM);
×
830
    SCH_ERR_RET(terrno);
×
831
  }
832

833
  if (pTask->plan->execNode.epSet.numOfEps > 0) {
164,536✔
834
    if (NULL == taosArrayPush(pTask->candidateAddrs, &pTask->plan->execNode)) {
322,590!
835
      SCH_TASK_ELOG("taosArrayPush execNode to candidate addrs failed, errno:%d", ERRNO);
×
836
      SCH_ERR_RET(terrno);
×
837
    }
838

839
    SCH_TASK_TLOG("use execNode in plan as candidate addr, numOfEps:%d", pTask->plan->execNode.epSet.numOfEps);
161,276!
840

841
    return TSDB_CODE_SUCCESS;
161,225✔
842
  }
843

844
  if (SCH_IS_DATA_BIND_TASK(pTask)) {
3,222!
845
    SCH_TASK_ELOG("no execNode specifed for data src task, numOfEps:%d", pTask->plan->execNode.epSet.numOfEps);
×
846
    SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
×
847
  }
848

849
  SCH_ERR_RET(schSetAddrsFromNodeList(pJob, pTask));
3,222!
850

851
  pTask->candidateIdx = taosRand() % taosArrayGetSize(pTask->candidateAddrs);
3,222✔
852

853
  /*
854
    for (int32_t i = 0; i < job->dataSrcEps.numOfEps && addNum < SCH_MAX_CANDIDATE_EP_NUM; ++i) {
855
      tstrncpy(epSet->fqdn[epSet->numOfEps], job->dataSrcEps.fqdn[i], sizeof(job->dataSrcEps.fqdn[i]));
856
      epSet->port[epSet->numOfEps] = job->dataSrcEps.port[i];
857

858
      ++epSet->numOfEps;
859
    }
860
  */
861

862
  return TSDB_CODE_SUCCESS;
3,222✔
863
}
864

865
#if 0
866
int32_t schUpdateTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet) {
867
  int32_t code = TSDB_CODE_SUCCESS;
868
  if (NULL == pTask->candidateAddrs || 1 != taosArrayGetSize(pTask->candidateAddrs)) {
869
    SCH_TASK_ELOG("not able to update cndidate addr, addr num %d",
870
                  (int32_t)(pTask->candidateAddrs ? taosArrayGetSize(pTask->candidateAddrs) : 0));
871
    SCH_ERR_RET(TSDB_CODE_APP_ERROR);
872
  }
873

874
  SQueryNodeAddr *pAddr = taosArrayGet(pTask->candidateAddrs, 0);
875
  if (NULL == pAddr) {
876
    SCH_TASK_ELOG("fail to get task 0th condidataAddr, totalNum:%d", (int32_t)taosArrayGetSize(pTask->candidateAddrs));
877
    SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
878
  }
879

880
  char *origEpset = NULL;
881
  char *newEpset = NULL;
882

883
  SCH_ERR_RET(schDumpEpSet(&pAddr->epSet, &origEpset));
884
  SCH_ERR_JRET(schDumpEpSet(pEpSet, &newEpset));
885

886
  SCH_TASK_DLOG("update task target node %d epset from %s to %s", pAddr->nodeId, origEpset, newEpset);
887

888
  TAOS_MEMCPY(&pAddr->epSet, pEpSet, sizeof(pAddr->epSet));
889

890
_return:
891

892
  taosMemoryFree(origEpset);
893
  taosMemoryFree(newEpset);
894

895
  return code;
896
}
897
#endif
898

899
int32_t schSwitchTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask) {
120✔
900
  int32_t candidateNum = taosArrayGetSize(pTask->candidateAddrs);
120✔
901
  if (candidateNum <= 1) {
120✔
902
    goto _return;
60✔
903
  }
904

905
  switch (schMgmt.cfg.schPolicy) {
60✔
906
    case SCH_LOAD_SEQ:
30✔
907
    case SCH_ALL:
908
    default:
909
      if (++pTask->candidateIdx >= candidateNum) {
30!
910
        pTask->candidateIdx = 0;
30✔
911
      }
912
      break;
30✔
913
    case SCH_RANDOM: {
30✔
914
      int32_t lastIdx = pTask->candidateIdx;
30✔
915
      while (lastIdx == pTask->candidateIdx) {
105✔
916
        pTask->candidateIdx = taosRand() % candidateNum;
75✔
917
      }
918
      break;
30✔
919
    }
920
  }
921

922
_return:
120✔
923
  SCH_TASK_DLOG("switch task exec on candiateIdx:%d/%d", pTask->candidateIdx, candidateNum);
120!
924
  return TSDB_CODE_SUCCESS;
120✔
925
}
926

927
int32_t schRemoveTaskFromExecList(SSchJob *pJob, SSchTask *pTask) {
30✔
928
  int32_t code = taosHashRemove(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId));
30✔
929
  if (code) {
30!
930
    SCH_TASK_WLOG("task already not in execTask list, code:%x", code);
30!
931
  }
932

933
  return TSDB_CODE_SUCCESS;
30✔
934
}
935

936
void schDropTaskOnExecNode(SSchJob *pJob, SSchTask *pTask) {
36,807✔
937
  if (NULL == pTask->execNodes) {
36,807✔
938
    SCH_TASK_DLOG("no exec address, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
30!
939
    return;
30✔
940
  }
941

942
  int32_t size = (int32_t)taosHashGetSize(pTask->execNodes);
36,777✔
943
  if (size <= 0) {
36,755!
944
    SCH_TASK_DLOG("task has no execNodes, no need to drop it, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
×
945
    return;
×
946
  }
947

948
  int32_t       i = 0;
36,755✔
949
  SSchNodeInfo *nodeInfo = taosHashIterate(pTask->execNodes, NULL);
36,755✔
950
  while (nodeInfo) {
73,525✔
951
    if (nodeInfo->handle) {
36,768✔
952
      SCH_SET_TASK_HANDLE(pTask, nodeInfo->handle);
35,958✔
953
      void *pExecId = taosHashGetKey(nodeInfo, NULL);
35,958✔
954
      (void)schBuildAndSendMsg(pJob, pTask, &nodeInfo->addr, TDMT_SCH_DROP_TASK, pExecId);  // ignore error and continue
35,925✔
955

956
      SCH_TASK_DLOG("start to drop task's %dth execNode", i);
35,962!
957
    } else {
958
      SCH_TASK_DLOG("no need to drop task %dth execNode", i);
810!
959
    }
960

961
    ++i;
36,772✔
962
    nodeInfo = taosHashIterate(pTask->execNodes, nodeInfo);
36,772✔
963
  }
964

965
  SCH_TASK_DLOG("task has been dropped on %d exec nodes", size);
36,757!
966
}
967

968
int32_t schNotifyTaskOnExecNode(SSchJob *pJob, SSchTask *pTask, ETaskNotifyType type) {
30✔
969
  int32_t size = (int32_t)taosHashGetSize(pTask->execNodes);
30✔
970
  if (size <= 0) {
30!
971
    SCH_TASK_DLOG("task no exec address to notify, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
30!
972
    return TSDB_CODE_SUCCESS;
30✔
973
  }
974

975
  int32_t       i = 0;
×
976
  SSchNodeInfo *nodeInfo = taosHashIterate(pTask->execNodes, NULL);
×
977
  while (nodeInfo) {
×
978
    if (nodeInfo->handle) {
×
979
      SCH_SET_TASK_HANDLE(pTask, nodeInfo->handle);
×
980
      SCH_ERR_RET(schBuildAndSendMsg(pJob, pTask, &nodeInfo->addr, TDMT_SCH_TASK_NOTIFY, &type));
×
981
      SCH_TASK_DLOG("start to notify %d to task's %dth execNode", type, i);
×
982
    } else {
983
      SCH_TASK_DLOG("no need to notify %d to task %dth execNode", type, i);
×
984
    }
985

986
    ++i;
×
987
    nodeInfo = taosHashIterate(pTask->execNodes, nodeInfo);
×
988
  }
989

990
  SCH_TASK_DLOG("task has been notified %d on %d exec nodes", type, size);
×
991
  return TSDB_CODE_SUCCESS;
×
992
}
993

994
int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId *pEpId, SArray *pStatusList) {
22,132✔
995
  int32_t   taskNum = (int32_t)taosArrayGetSize(pStatusList);
22,132✔
996
  SSchTask *pTask = NULL;
22,131✔
997
  SSchJob  *pJob = NULL;
22,131✔
998

999
  qDebug("%d task status in hb rsp from nodeId:%d, fqdn:%s, port:%d", taskNum, pEpId->nodeId, pEpId->ep.fqdn,
22,131✔
1000
         pEpId->ep.port);
1001

1002
  for (int32_t i = 0; i < taskNum; ++i) {
22,668✔
1003
    STaskStatus *pStatus = taosArrayGet(pStatusList, i);
535✔
1004
    if (NULL == pStatus) {
535!
1005
      qError("fail to get the %dth task status in hb rsp, taskNum:%d", i, taskNum);
×
1006
      continue;
×
1007
    }
1008

1009
    int32_t code = 0;
535✔
1010

1011
    qDebug("QID:0x%" PRIx64 ", CID:0x%" PRIx64 ", TID:0x%" PRIx64 ", EID:%d task status in server: %s", pStatus->queryId,
535✔
1012
           pStatus->clientId, pStatus->taskId, pStatus->execId, jobTaskStatusStr(pStatus->status));
1013

1014
    if (schProcessOnCbBegin(&pJob, &pTask, pStatus->queryId, pStatus->refId, pStatus->taskId)) {
535✔
1015
      continue;
40✔
1016
    }
1017

1018
    if (pStatus->execId != pTask->execId) {
495!
1019
      // TODO
1020
      SCH_TASK_DLOG("execId %d mis-match current execId %d", pStatus->execId, pTask->execId);
×
1021
      schProcessOnCbEnd(pJob, pTask, 0);
×
1022
      continue;
×
1023
    }
1024

1025
    if (pStatus->status == JOB_TASK_STATUS_FAIL) {
495!
1026
      // RECORD AND HANDLE ERROR!!!!
1027
      schProcessOnCbEnd(pJob, pTask, 0);
×
1028
      continue;
×
1029
    }
1030

1031
    if (pStatus->status == JOB_TASK_STATUS_INIT) {
495✔
1032
      code = schRescheduleTask(pJob, pTask);
1✔
1033
    }
1034

1035
    schProcessOnCbEnd(pJob, pTask, code);
495✔
1036
  }
1037

1038
  return TSDB_CODE_SUCCESS;
22,133✔
1039
}
1040

1041
int32_t schHandleExplainRes(SArray *pExplainRes) {
×
1042
  int32_t code = 0;
×
1043
  int32_t resNum = taosArrayGetSize(pExplainRes);
×
1044
  if (resNum <= 0) {
×
1045
    goto _return;
×
1046
  }
1047

1048
  SSchTask *pTask = NULL;
×
1049
  SSchJob  *pJob = NULL;
×
1050

1051
  for (int32_t i = 0; i < resNum; ++i) {
×
1052
    SExplainLocalRsp *localRsp = taosArrayGet(pExplainRes, i);
×
1053
    if (NULL == localRsp) {
×
1054
      qError("fail to get the %dth LOCAL explain rsp msg, total:%d", i, resNum);
×
1055
      continue;
×
1056
    }
1057

1058
    qDebug("QID:0x%" PRIx64 ", CID:0x%" PRIx64 ", TID:0x%" PRIx64 ", begin to handle LOCAL explain rsp msg",
×
1059
           localRsp->qId, localRsp->cId, localRsp->tId);
1060

1061
    pJob = NULL;
×
1062
    (void)schAcquireJob(localRsp->rId, &pJob);
×
1063
    if (NULL == pJob) {
×
1064
      qWarn("QID:0x%" PRIx64 ", CID:0x%" PRIx64 ", TID:0x%" PRIx64 " job no exist, may be dropped, refId:0x%" PRIx64,
×
1065
            localRsp->qId, localRsp->cId, localRsp->tId, localRsp->rId);
1066
      SCH_ERR_JRET(TSDB_CODE_QRY_JOB_NOT_EXIST);
×
1067
    }
1068

1069
    int8_t status = 0;
×
1070
    if (schJobNeedToStop(pJob, &status)) {
×
1071
      SCH_TASK_DLOG("will not do further processing cause of job status %s", jobTaskStatusStr(status));
×
1072
      (void)schReleaseJob(pJob->refId);
×
1073
      SCH_ERR_JRET(TSDB_CODE_SCH_IGNORE_ERROR);
×
1074
    }
1075

1076
    code = schGetTaskInJob(pJob, localRsp->tId, &pTask);
×
1077

1078
    if (TSDB_CODE_SUCCESS == code) {
×
1079
      code = schProcessExplainRsp(pJob, pTask, &localRsp->rsp);
×
1080
    }
1081

1082
    (void)schReleaseJob(pJob->refId);
×
1083

1084
    qDebug("QID:0x%" PRIx64 ", CID:0x%" PRIx64 ", TID:0x%" PRIx64 ", end to handle LOCAL explain rsp msg, code:%x",
×
1085
           localRsp->qId, localRsp->cId, localRsp->tId, code);
1086

1087
    SCH_ERR_JRET(code);
×
1088

1089
    localRsp->rsp.numOfPlans = 0;
×
1090
    localRsp->rsp.subplanInfo = NULL;
×
1091
    pTask = NULL;
×
1092
  }
1093

1094
_return:
×
1095

1096
  for (int32_t i = 0; i < resNum; ++i) {
×
1097
    SExplainLocalRsp *localRsp = taosArrayGet(pExplainRes, i);
×
1098
    if (NULL == localRsp) {
×
1099
      qError("in _return fail to get the %dth LOCAL explain rsp msg, total:%d", i, resNum);
×
1100
      continue;
×
1101
    }
1102

1103
    tFreeSExplainRsp(&localRsp->rsp);
×
1104
  }
1105

1106
  taosArrayDestroy(pExplainRes);
×
1107

1108
  SCH_RET(code);
×
1109
}
1110

1111
int32_t schLaunchRemoteTask(SSchJob *pJob, SSchTask *pTask) {
164,225✔
1112
  SSubplan *plan = pTask->plan;
164,225✔
1113
  int32_t   code = 0;
164,225✔
1114

1115
  if (NULL == pTask->msg) {  // TODO add more detailed reason for failure
164,225!
1116
    SCH_LOCK(SCH_WRITE, &pTask->planLock);
164,386!
1117
    code = qSubPlanToMsg(plan, &pTask->msg, &pTask->msgLen);
164,519✔
1118
    SCH_UNLOCK(SCH_WRITE, &pTask->planLock);
164,427!
1119

1120
    if (TSDB_CODE_SUCCESS != code) {
164,330✔
1121
      SCH_TASK_ELOG("failed to create physical plan, code:%s, msg:%p, len:%d", tstrerror(code), pTask->msg,
30!
1122
                    pTask->msgLen);
1123
      SCH_ERR_RET(code);
30!
1124
    } else if (tsQueryPlannerTrace) {
164,300!
1125
      char   *msg = NULL;
×
1126
      int32_t msgLen = 0;
×
1127
      SCH_ERR_RET(qSubPlanToString(plan, &msg, &msgLen));
×
1128
      SCH_TASK_DLOGL("physical plan len:%d, %s", msgLen, msg);
×
1129
      taosMemoryFree(msg);
×
1130
    }
1131
  }
1132

1133
  SCH_ERR_RET(schSetTaskCandidateAddrs(pJob, pTask));
164,139!
1134

1135
  if (SCH_IS_QUERY_JOB(pJob)) {
164,424✔
1136
    SCH_ERR_RET(schEnsureHbConnection(pJob, pTask));
36,779!
1137
  }
1138

1139
  SCH_RET(schBuildAndSendMsg(pJob, pTask, NULL, plan->msgType, NULL));
164,420!
1140
}
1141

1142
int32_t schLaunchLocalTask(SSchJob *pJob, SSchTask *pTask) {
×
1143
  // SCH_ERR_JRET(schSetTaskCandidateAddrs(pJob, pTask));
1144
  if (NULL == schMgmt.queryMgmt) {
×
1145
    void* p = NULL;
×
1146
    SCH_ERR_RET(qWorkerInit(NODE_TYPE_CLIENT, CLIENT_HANDLE, &p, NULL));
×
1147
    if (atomic_val_compare_exchange_ptr(&schMgmt.queryMgmt, NULL, p)) {
×
1148
      qWorkerDestroy(&p);
×
1149
    }
1150
  }
1151

1152
  SArray *explainRes = NULL;
×
1153
  int32_t code = 0;
×
1154
  SQWMsg  qwMsg = {0};
×
1155
  qwMsg.msgInfo.taskType = TASK_TYPE_TEMP;
×
1156
  qwMsg.msgInfo.explain = SCH_IS_EXPLAIN_JOB(pJob);
×
1157
  qwMsg.msgInfo.needFetch = SCH_TASK_NEED_FETCH(pTask);
×
1158
  qwMsg.msg = pTask->plan;
×
1159
  qwMsg.msgType = pTask->plan->msgType;
×
1160
  qwMsg.connInfo.handle = pJob->conn.pTrans;
×
1161
  qwMsg.pWorkerCb = pJob->pWorkerCb;
×
1162

1163
  if (SCH_IS_EXPLAIN_JOB(pJob)) {
×
1164
    explainRes = taosArrayInit(pJob->taskNum, sizeof(SExplainLocalRsp));
×
1165
    if (NULL == explainRes) {
×
1166
      SCH_ERR_RET(terrno);
×
1167
    }
1168
  }
1169

1170
  SCH_ERR_JRET(qWorkerProcessLocalQuery(schMgmt.queryMgmt, pJob->seriesId, pJob->queryId, pTask->clientId, pTask->taskId,
×
1171
                                        pJob->refId, pTask->execId, &qwMsg, explainRes));
1172

1173
  if (SCH_IS_EXPLAIN_JOB(pJob)) {
×
1174
    SCH_ERR_RET(schHandleExplainRes(explainRes));
×
1175
    explainRes = NULL;
×
1176
  }
1177

1178
  SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));
×
1179

1180
_return:
×
1181

1182
  taosArrayDestroy(explainRes);
×
1183

1184
  SCH_RET(code);
×
1185
}
1186

1187
int32_t schLaunchTaskImpl(void *param) {
164,424✔
1188
  SSchTaskCtx *pCtx = (SSchTaskCtx *)param;
164,424✔
1189
  SSchJob     *pJob = NULL;
164,424✔
1190

1191
  (void)schAcquireJob(pCtx->jobRid, &pJob);
164,424✔
1192
  if (NULL == pJob) {
164,662✔
1193
    qDebug("job refId 0x%" PRIx64 " already not exist", pCtx->jobRid);
60!
1194
    taosMemoryFree(param);
60!
1195
    SCH_RET(TSDB_CODE_SCH_JOB_IS_DROPPING);
60!
1196
  }
1197

1198
  SSchTask *pTask = pCtx->pTask;
164,602✔
1199

1200
  if (pCtx->asyncLaunch) {
164,602✔
1201
    SCH_LOCK_TASK(pTask);
28,184!
1202
  }
1203

1204
  pTask->execId++;
164,501✔
1205
  pTask->retryTimes++;
164,501✔
1206
  pTask->waitRetry = false;
164,501✔
1207

1208
  int8_t  status = 0;
164,501✔
1209
  int32_t code = 0;
164,501✔
1210

1211
  if (atomic_load_64(&pTask->seriesId) < atomic_load_64(&pJob->seriesId)) {
164,501!
1212
    SCH_TASK_DLOG("task seriesId:0x%" PRIx64 " is smaller than job seriesId:0x%" PRIx64 ", skip launch",
×
1213
                  pTask->seriesId, pJob->seriesId);
1214
    goto _return;
×
1215
  }
1216

1217
  (void)atomic_add_fetch_32(&pTask->level->taskLaunchedNum, 1);
164,360✔
1218

1219
  SCH_TASK_TLOG("start to launch %s task, execId %d, retry %d",
164,577!
1220
                SCH_IS_LOCAL_EXEC_TASK(pJob, pTask) ? "LOCAL" : "REMOTE", pTask->execId, pTask->retryTimes);
1221

1222
  SCH_LOG_TASK_START_TS(pTask);
328,947!
1223

1224
  if (schJobNeedToStop(pJob, &status)) {
164,458!
1225
    SCH_TASK_DLOG("no need to launch task cause of job status %s", jobTaskStatusStr(status));
×
1226
    SCH_ERR_JRET(TSDB_CODE_SCH_IGNORE_ERROR);
×
1227
  }
1228

1229
  if (SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_EXEC) {
164,586!
1230
    SCH_ERR_JRET(schPushTaskToExecList(pJob, pTask));
164,549!
1231
    SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_EXEC);
164,388✔
1232
  }
1233

1234
  if (SCH_IS_LOCAL_EXEC_TASK(pJob, pTask)) {
164,523!
1235
    SCH_ERR_JRET(schLaunchLocalTask(pJob, pTask));
×
1236
  } else {
1237
    SCH_ERR_JRET(schLaunchRemoteTask(pJob, pTask));
164,523!
1238
  }
1239

1240
#if 0
1241
  if (SCH_IS_QUERY_JOB(pJob)) {
1242
    SCH_ERR_JRET(schEnsureHbConnection(pJob, pTask));
1243
  }
1244
#endif
1245

1246
_return:
164,469✔
1247

1248
  if (pJob->taskNum >= SCH_MIN_AYSNC_EXEC_NUM) {
164,469✔
1249
    if (code) {
28,170!
1250
      code = schProcessOnTaskFailure(pJob, pTask, code);
×
1251
    }
1252
    if (code) {
28,170!
1253
      code = schHandleJobFailure(pJob, code);
×
1254
    }
1255
  }
1256

1257
  if (pCtx->asyncLaunch) {
164,469✔
1258
    SCH_UNLOCK_TASK(pTask);
28,184!
1259
  }
1260

1261
  (void)schReleaseJob(pJob->refId);
164,456✔
1262

1263
  taosMemoryFree(param);
164,559!
1264

1265
  SCH_RET(code);
164,540!
1266
}
1267

1268
int32_t schAsyncLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) {
164,367✔
1269
  SSchTaskCtx *param = taosMemoryCalloc(1, sizeof(SSchTaskCtx));
164,367!
1270
  if (NULL == param) {
164,457!
1271
    SCH_ERR_RET(terrno);
×
1272
  }
1273

1274
  param->jobRid = pJob->refId;
164,457✔
1275
  param->pTask = pTask;
164,457✔
1276

1277
  if (pJob->taskNum >= SCH_MIN_AYSNC_EXEC_NUM) {
164,457✔
1278
    param->asyncLaunch = true;
28,186✔
1279
    SCH_ERR_RET(taosAsyncExec(schLaunchTaskImpl, param, NULL));
28,186!
1280
  } else {
1281
    SCH_ERR_RET(schLaunchTaskImpl(param));
136,271!
1282
  }
1283

1284
  return TSDB_CODE_SUCCESS;
164,542✔
1285
}
1286

1287
// Note: no more error processing, handled in function internal
1288
int32_t schLaunchTask(SSchJob *pJob, SSchTask *pTask) {
164,409✔
1289
  bool    enough = false;
164,409✔
1290
  int32_t code = 0;
164,409✔
1291

1292
  SCH_SET_TASK_HANDLE(pTask, NULL);
164,409✔
1293

1294
  if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) {
164,409!
1295
    SCH_ERR_JRET(schCheckIncTaskFlowQuota(pJob, pTask, &enough));
630!
1296

1297
    if (enough) {
600✔
1298
      SCH_ERR_JRET(schAsyncLaunchTaskImpl(pJob, pTask));
373!
1299
    }
1300
  } else {
1301
    SCH_ERR_JRET(schAsyncLaunchTaskImpl(pJob, pTask));
163,779!
1302
  }
1303

1304
  return TSDB_CODE_SUCCESS;
164,521✔
1305

1306
_return:
60✔
1307
  SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
60!
1308
}
1309

1310
void schHandleTimerEvent(void *param, void *tmrId) {
30✔
1311
  SSchTimerParam *pTimerParam = (SSchTimerParam *)param;
30✔
1312
  SSchTask       *pTask = NULL;
30✔
1313
  SSchJob        *pJob = NULL;
30✔
1314
  int32_t         code = 0;
30✔
1315

1316
  qDebug("delayTimer:%" PRIuPTR " is launched", (uintptr_t)tmrId);
30!
1317

1318
  int64_t  rId = pTimerParam->rId;
30✔
1319
  uint64_t queryId = pTimerParam->queryId;
30✔
1320
  uint64_t taskId = pTimerParam->taskId;
30✔
1321

1322
  if (schProcessOnCbBegin(&pJob, &pTask, queryId, rId, taskId)) {
30!
1323
    return;
30✔
1324
  }
1325

1326
  if (0 == atomic_load_8(&pTask->delayLaunchPar.exit)) {
×
1327
    code = schLaunchTask(pJob, pTask);
×
1328
  } else {
1329
    SCH_TASK_DLOG("task will not be launched since query job exiting, status: %d", pTask->status);
×
1330
  }
1331

1332
  schProcessOnCbEnd(pJob, pTask, code);
×
1333
}
1334

1335
int32_t schDelayLaunchTask(SSchJob *pJob, SSchTask *pTask) {
164,391✔
1336
  if (pTask->delayExecMs > 0) {
164,391✔
1337
    pTask->delayLaunchPar.rId = pJob->refId;
30✔
1338
    pTask->delayLaunchPar.queryId = pJob->queryId;
30✔
1339
    pTask->delayLaunchPar.taskId = pTask->taskId;
30✔
1340

1341
    SCH_ERR_RET(schPushTaskToExecList(pJob, pTask));
30!
1342
    SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_EXEC);
30✔
1343

1344
    if (NULL == pTask->delayTimer) {
30!
1345
      pTask->delayTimer = taosTmrStart(schHandleTimerEvent, pTask->delayExecMs, (void *)&pTask->delayLaunchPar, schMgmt.timer);
30✔
1346
      if (NULL == pTask->delayTimer) {
30!
1347
        SCH_TASK_ELOG("start delay timer failed, handle:%p", schMgmt.timer);
30!
1348
        SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
30!
1349
      }
1350

1351
      SCH_TASK_DLOG("task delayTimer:%" PRIuPTR " is started to launch task after:%.2fs", (uintptr_t)pTask->delayTimer,
×
1352
                    pTask->delayExecMs/1000.0);
1353
      return TSDB_CODE_SUCCESS;
×
1354
    }
1355

1356
    if (taosTmrReset(schHandleTimerEvent, pTask->delayExecMs, (void *)&pTask->delayLaunchPar, schMgmt.timer,
×
1357
                     &pTask->delayTimer)) {
1358
      SCH_TASK_ELOG("taosTmrReset delayExec timer failed, handle:%p, tmr:%" PRIuPTR, schMgmt.timer,
×
1359
                    (uintptr_t)pTask->delayTimer);
1360
    } else {
1361
      SCH_TASK_DLOG("task start in %.2fs later by handler:%p, tmr:%" PRIuPTR, pTask->delayExecMs / 1000.0,
×
1362
                    schMgmt.timer, (uintptr_t)pTask->delayTimer);
1363
    }
1364

1365
    return TSDB_CODE_SUCCESS;
×
1366
  }
1367

1368
  SCH_RET(schLaunchTask(pJob, pTask));
164,361!
1369
}
1370

1371
int32_t schLaunchLevelTasks(SSchJob *pJob, SSchLevel *level) {
136,540✔
1372
  SCH_ERR_RET(schChkJobNeedFlowCtrl(pJob, level));
136,540!
1373

1374
  for (int32_t i = 0; i < level->taskNum; ++i) {
293,052✔
1375
    SSchTask *pTask = taosArrayGet(level->subTasks, i);
156,190✔
1376
    pTask->failedSeriesId = pJob->seriesId - 1;
156,190✔
1377
    pTask->seriesId = pJob->seriesId;
156,190✔
1378
    
1379
    SCH_TASK_TLOG("task seriesId set to 0x%" PRIx64, pTask->seriesId);
156,190!
1380

1381
    SCH_ERR_RET(schDelayLaunchTask(pJob, pTask));
156,190!
1382
  }
1383

1384
  return TSDB_CODE_SUCCESS;
136,862✔
1385
}
1386

1387
void schDropTaskInHashList(SSchJob *pJob, SHashObj *list) {
138,018✔
1388
  if (!SCH_JOB_NEED_DROP(pJob)) {
138,018✔
1389
    return;
120,184✔
1390
  }
1391

1392
  void *pIter = taosHashIterate(list, NULL);
17,834✔
1393
  while (pIter) {
54,637✔
1394
    SSchTask *pTask = *(SSchTask **)pIter;
36,778✔
1395

1396
    if (pTask->delayTimer) {
36,778!
1397
      schStopTaskDelayTimer(pJob, pTask, true);
×
1398
    }
1399

1400
    SCH_LOCK_TASK(pTask);
36,778!
1401
    schDropTaskOnExecNode(pJob, pTask);
36,770✔
1402
    SCH_UNLOCK_TASK(pTask);
36,761!
1403

1404
    pIter = taosHashIterate(list, pIter);
36,770✔
1405
  }
1406
}
1407

1408
int32_t schNotifyTaskInHashList(SSchJob *pJob, SHashObj *list, ETaskNotifyType type, SSchTask *pCurrTask) {
×
1409
  int32_t code = TSDB_CODE_SUCCESS;
×
1410

1411
  SCH_ERR_RET(schNotifyTaskOnExecNode(pJob, pCurrTask, type));
×
1412

1413
  void *pIter = taosHashIterate(list, NULL);
×
1414
  while (pIter) {
×
1415
    SSchTask *pTask = *(SSchTask **)pIter;
×
1416
    if (pTask != pCurrTask) {
×
1417
      SCH_LOCK_TASK(pTask);
×
1418
      code = schNotifyTaskOnExecNode(pJob, pTask, type);
×
1419
      SCH_UNLOCK_TASK(pTask);
×
1420

1421
      if (TSDB_CODE_SUCCESS != code) {
×
1422
        break;
×
1423
      }
1424
    }
1425

1426
    pIter = taosHashIterate(list, pIter);
×
1427
  }
1428

1429
  SCH_RET(code);
×
1430
}
1431

1432
int32_t schExecRemoteFetch(SSchJob *pJob, SSchTask *pTask) {
17,626✔
1433
  SCH_RET(schBuildAndSendMsg(pJob, pJob->fetchTask, &pJob->resNode, SCH_FETCH_TYPE(pJob->fetchTask), NULL));
17,626!
1434
}
1435

1436
int32_t schExecLocalFetch(SSchJob *pJob, SSchTask *pTask) {
30✔
1437
  void   *pRsp = NULL;
30✔
1438
  int32_t code = 0;
30✔
1439
  SArray *explainRes = NULL;
30✔
1440

1441
  if (SCH_IS_EXPLAIN_JOB(pJob)) {
30!
1442
    explainRes = taosArrayInit(pJob->taskNum, sizeof(SExplainLocalRsp));
30✔
1443
    if (NULL == explainRes) {
30!
1444
      SCH_ERR_RET(terrno);
×
1445
    }
1446
  }
1447

1448
  SCH_ERR_JRET(qWorkerProcessLocalFetch(schMgmt.queryMgmt, pJob->seriesId, pJob->queryId, pTask->clientId, pTask->taskId,
30!
1449
                                        pJob->refId, pTask->execId, &pRsp, explainRes));
1450

1451
  if (SCH_IS_EXPLAIN_JOB(pJob)) {
×
1452
    SCH_ERR_RET(schHandleExplainRes(explainRes));
×
1453
    explainRes = NULL;
×
1454
  }
1455

1456
  SCH_ERR_RET(schProcessFetchRsp(pJob, pTask, pRsp, TSDB_CODE_SUCCESS));
×
1457

1458
_return:
×
1459

1460
  taosArrayDestroy(explainRes);
30✔
1461

1462
  SCH_RET(code);
30!
1463
}
1464

1465
// Note: no more error processing, handled in function internal
1466
int32_t schLaunchFetchTask(SSchJob *pJob) {
17,686✔
1467
  int32_t code = 0;
17,686✔
1468

1469
  void *fetchRes = atomic_load_ptr(&pJob->fetchRes);
17,686✔
1470
  if (fetchRes) {
17,686✔
1471
    SCH_JOB_DLOG("res already fetched, res:%p", fetchRes);
30!
1472
    return TSDB_CODE_SUCCESS;
30✔
1473
  }
1474

1475
  SCH_SET_TASK_STATUS(pJob->fetchTask, JOB_TASK_STATUS_FETCH);
17,656✔
1476

1477
  if (SCH_IS_LOCAL_EXEC_TASK(pJob, pJob->fetchTask)) {
17,656!
1478
    SCH_ERR_JRET(schExecLocalFetch(pJob, pJob->fetchTask));
30!
1479
  } else {
1480
    SCH_ERR_JRET(schExecRemoteFetch(pJob, pJob->fetchTask));
17,626!
1481
  }
1482

1483
  return TSDB_CODE_SUCCESS;
17,626✔
1484

1485
_return:
30✔
1486

1487
  SCH_RET(schProcessOnTaskFailure(pJob, pJob->fetchTask, code));
30!
1488
}