• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3531

19 Nov 2024 10:42AM UTC coverage: 60.213% (-0.006%) from 60.219%
#3531

push

travis-ci

web-flow
Merge pull request #28777 from taosdata/fix/3.0/TD-32366

fix:TD-32366/stmt add geometry datatype check

118529 of 252344 branches covered (46.97%)

Branch coverage included in aggregate %.

7 of 48 new or added lines in 3 files covered. (14.58%)

2282 existing lines in 115 files now uncovered.

199096 of 275161 relevant lines covered (72.36%)

6067577.83 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

41.98
/source/libs/scheduler/src/schTask.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "catalog.h"
17
#include "command.h"
18
#include "query.h"
19
#include "qworker.h"
20
#include "schInt.h"
21
#include "tglobal.h"
22
#include "tmsg.h"
23
#include "tref.h"
24
#include "trpc.h"
25

26
void schFreeTask(SSchJob *pJob, SSchTask *pTask) {
3,952,840✔
27
  schDeregisterTaskHb(pJob, pTask);
3,952,840✔
28

29
  if (pTask->candidateAddrs) {
3,952,841✔
30
    taosArrayDestroy(pTask->candidateAddrs);
3,449,099✔
31
  }
32

33
  taosMemoryFreeClear(pTask->msg);
3,952,842✔
34

35
  if (pTask->children) {
3,952,844✔
36
    taosArrayDestroy(pTask->children);
689,294✔
37
  }
38

39
  if (pTask->parents) {
3,952,844✔
40
    taosArrayDestroy(pTask->parents);
1,443,652✔
41
  }
42

43
  if (pTask->execNodes) {
3,952,844✔
44
    taosHashCleanup(pTask->execNodes);
3,952,842✔
45
  }
46

47
  taosArrayDestroy(pTask->profile.execTime);
3,952,843✔
48
}
3,952,843✔
49

50
void schInitTaskRetryTimes(SSchJob *pJob, SSchTask *pTask, SSchLevel *pLevel) {
3,952,831✔
51
  if (SCH_IS_DATA_BIND_TASK(pTask) || (!SCH_IS_QUERY_JOB(pJob)) || (SCH_ALL != schMgmt.cfg.schPolicy)) {
3,952,831!
52
    pTask->maxRetryTimes = SCH_DEFAULT_MAX_RETRY_NUM;
3,917,343✔
53
  } else {
54
    int32_t nodeNum = taosArrayGetSize(pJob->nodeList);
35,488✔
55
    pTask->maxRetryTimes = TMAX(nodeNum, SCH_DEFAULT_MAX_RETRY_NUM);
35,488✔
56
  }
57

58
  pTask->maxExecTimes = pTask->maxRetryTimes * (pLevel->level + 1);
3,952,831✔
59
}
3,952,831✔
60

61
int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel) {
3,952,820✔
62
  int32_t code = 0;
3,952,820✔
63

64
  pTask->plan = pPlan;
3,952,820✔
65
  pTask->level = pLevel;
3,952,820✔
66
  pTask->execId = -1;
3,952,820✔
67
  pTask->failedExecId = -2;
3,952,820✔
68
  pTask->timeoutUsec = SCH_DEFAULT_TASK_TIMEOUT_USEC;
3,952,820✔
69
  pTask->clientId = getClientId();
3,952,820✔
70
  pTask->taskId = schGenTaskId();
3,952,823✔
71

72
  schInitTaskRetryTimes(pJob, pTask, pLevel);
3,952,843✔
73

74
  pTask->execNodes =
3,952,829✔
75
      taosHashInit(pTask->maxExecTimes, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
3,952,834✔
76
  pTask->profile.execTime = taosArrayInit(pTask->maxExecTimes, sizeof(int64_t));
3,952,829✔
77
  if (NULL == pTask->execNodes || NULL == pTask->profile.execTime) {
3,952,840!
78
    SCH_ERR_JRET(terrno);
×
79
  }
80

81
  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT);
3,952,840✔
82

83
  SCH_TASK_DLOG("task initialized, max times %d:%d", pTask->maxRetryTimes, pTask->maxExecTimes);
3,952,843!
84

85
  return TSDB_CODE_SUCCESS;
3,952,841✔
86

87
_return:
×
88

89
  taosArrayDestroy(pTask->profile.execTime);
×
90
  taosHashCleanup(pTask->execNodes);
×
91

92
  SCH_RET(code);
×
93
}
94

95
int32_t schRecordTaskSucceedNode(SSchJob *pJob, SSchTask *pTask) {
3,519,515✔
96
  if (SCH_IS_LOCAL_EXEC_TASK(pJob, pTask)) {
3,519,515✔
97
    return TSDB_CODE_SUCCESS;
72,352✔
98
  }
99

100
  SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
3,447,163✔
101
  if (NULL == addr) {
3,447,205!
102
    SCH_TASK_ELOG("taosArrayGet candidate addr failed, idx:%d, size:%d", pTask->candidateIdx,
×
103
                  (int32_t)taosArrayGetSize(pTask->candidateAddrs));
104
    SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
×
105
  }
106

107
  pTask->succeedAddr = *addr;
3,447,135✔
108

109
  return TSDB_CODE_SUCCESS;
3,447,135✔
110
}
111

112
int32_t schAppendTaskExecNode(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, int32_t execId) {
1,689,353✔
113
  SSchNodeInfo nodeInfo = {.addr = *addr, .handle = SCH_GET_TASK_HANDLE(pTask)};
1,689,353!
114

115
  if (taosHashPut(pTask->execNodes, &execId, sizeof(execId), &nodeInfo, sizeof(nodeInfo))) {
1,689,353!
116
    SCH_TASK_ELOG("taosHashPut nodeInfo to execNodes failed, errno:%d", errno);
×
117
    SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
×
118
  }
119

120
  SCH_TASK_DLOG("task execNode added, execId:%d, handle:%p", execId, nodeInfo.handle);
1,690,160!
121

122
  return TSDB_CODE_SUCCESS;
1,689,993✔
123
}
124

125
int32_t schDropTaskExecNode(SSchJob *pJob, SSchTask *pTask, void *handle, int32_t execId) {
93✔
126
  if (NULL == pTask->execNodes) {
93!
127
    return TSDB_CODE_SUCCESS;
×
128
  }
129

130
  if (taosHashRemove(pTask->execNodes, &execId, sizeof(execId))) {
93!
131
    SCH_TASK_DLOG("execId %d already not in execNodeList", execId);
×
132
    SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR);
×
133
  } else {
134
    SCH_TASK_DLOG("execId %d removed from execNodeList", execId);
93!
135
  }
136

137
  if ((execId != pTask->execId) || pTask->waitRetry) {  // ignore it
93!
138
    SCH_TASK_DLOG("execId %d is already not current execId %d, waitRetry %d", execId, pTask->execId, pTask->waitRetry);
×
139
    SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR);
×
140
  }
141

142
  return TSDB_CODE_SUCCESS;
93✔
143
}
144

145
int32_t schUpdateTaskExecNode(SSchJob *pJob, SSchTask *pTask, void *handle, int32_t execId) {
2,388,119✔
146
  if (taosHashGetSize(pTask->execNodes) <= 0) {
2,388,119✔
147
    return TSDB_CODE_SUCCESS;
1✔
148
  }
149

150
  SSchNodeInfo *nodeInfo = taosHashGet(pTask->execNodes, &execId, sizeof(execId));
2,388,304✔
151
  if (NULL == nodeInfo) {  // ignore it
2,388,319!
152
    SCH_TASK_DLOG("handle not updated since execId %d already not exist, current execId %d, waitRetry %d", execId,
×
153
                  pTask->execId, pTask->waitRetry);
154
    return TSDB_CODE_SUCCESS;
×
155
  }
156

157
  nodeInfo->handle = handle;
2,388,319✔
158

159
  SCH_TASK_DLOG("handle updated to %p for execId %d", handle, execId);
2,388,319!
160

161
  return TSDB_CODE_SUCCESS;
2,388,296✔
162
}
163

164
int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, bool dropExecNode, void *handle, int32_t execId) {
2,388,246✔
165
  if (dropExecNode) {
2,388,246✔
166
    SCH_RET(schDropTaskExecNode(pJob, pTask, handle, execId));
93!
167
  }
168

169
  SCH_ERR_RET(schUpdateTaskExecNode(pJob, pTask, handle, execId));
2,388,153!
170

171
  if ((execId != pTask->execId || execId <= pTask->failedExecId) || pTask->waitRetry) {  // ignore it
2,388,303!
172
    SCH_TASK_DLOG("handle not updated since execId %d is already not current execId %d, waitRetry %d", execId,
×
173
                  pTask->execId, pTask->waitRetry);
174
    SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR);
×
175
  }
176

177
  SCH_SET_TASK_HANDLE(pTask, handle);
2,388,297✔
178

179
  return TSDB_CODE_SUCCESS;
2,388,297✔
180
}
181

182
int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode) {
1,742✔
183
  if (TSDB_CODE_SCH_IGNORE_ERROR == errCode) {
1,742✔
184
    return TSDB_CODE_SCH_IGNORE_ERROR;
3✔
185
  }
186

187
  pTask->failedExecId = pTask->execId;
1,739✔
188

189
  int8_t jobStatus = 0;
1,739✔
190
  if (schJobNeedToStop(pJob, &jobStatus)) {
1,739✔
191
    SCH_TASK_DLOG("no more task failure processing cause of job status %s", jobTaskStatusStr(jobStatus));
12!
192
    SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR);
12!
193
  }
194

195
  int8_t taskStatus = SCH_GET_TASK_STATUS(pTask);
1,727✔
196
  if (taskStatus == JOB_TASK_STATUS_FAIL || taskStatus == JOB_TASK_STATUS_SUCC) {
1,727!
197
    SCH_TASK_ELOG("task already done, status:%s", jobTaskStatusStr(taskStatus));
×
198
    SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
×
199
  }
200

201
  if (errCode == TSDB_CODE_SCH_TIMEOUT_ERROR) {
1,727!
202
    SCH_LOG_TASK_WAIT_TS(pTask);
×
203
  } else {
204
    SCH_LOG_TASK_END_TS(pTask);
1,727!
205
  }
206

207
  bool    needRetry = false;
1,727✔
208
  bool    moved = false;
1,727✔
209
  int32_t taskDone = 0;
1,727✔
210

211
  SCH_TASK_DLOG("taskOnFailure, code:%s", tstrerror(errCode));
1,727!
212

213
  SCH_ERR_RET(schTaskCheckSetRetry(pJob, pTask, errCode, &needRetry));
1,727!
214

215
  if (!needRetry) {
1,727!
216
    SCH_TASK_ELOG("task failed and no more retry, code:%s", tstrerror(errCode));
1,727!
217

218
    SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_FAIL);
1,727✔
219

220
    if (SCH_JOB_NEED_WAIT(pJob)) {
1,727✔
221
      SCH_LOCK(SCH_WRITE, &pTask->level->lock);
1,336!
222
      pTask->level->taskFailed++;
1,336✔
223
      taskDone = pTask->level->taskSucceed + pTask->level->taskFailed;
1,336✔
224
      SCH_UNLOCK(SCH_WRITE, &pTask->level->lock);
1,336!
225

226
      schUpdateJobErrCode(pJob, errCode);
1,336✔
227

228
      if (taskDone < pTask->level->taskNum) {
1,336✔
229
        SCH_TASK_DLOG("need to wait other tasks, doneNum:%d, allNum:%d", taskDone, pTask->level->taskNum);
6!
230
        SCH_RET(TSDB_CODE_SCH_IGNORE_ERROR);
6!
231
      }
232

233
      SCH_RET(atomic_load_32(&pJob->errCode));
1,330!
234
    }
235
  } else {
236
    SCH_ERR_RET(schHandleTaskRetry(pJob, pTask));
×
237

238
    return TSDB_CODE_SUCCESS;
×
239
  }
240

241
  SCH_RET(errCode);
391!
242
}
243

244
int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
3,519,514✔
245
  bool    moved = false;
3,519,514✔
246
  int32_t code = 0;
3,519,514✔
247

248
  SCH_TASK_DLOG("taskOnSuccess, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
3,519,514!
249

250
  SCH_LOG_TASK_END_TS(pTask);
3,519,523!
251

252
  int32_t taskDone = atomic_add_fetch_32(&pTask->level->taskExecDoneNum, 1);
3,519,532✔
253

254
  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_PART_SUCC);
3,519,564✔
255

256
  SCH_ERR_RET(schRecordTaskSucceedNode(pJob, pTask));
3,519,565!
257

258
  SCH_ERR_RET(schLaunchTasksInFlowCtrlList(pJob, pTask));
3,519,485!
259

260
  int32_t parentNum = pTask->parents ? (int32_t)taosArrayGetSize(pTask->parents) : 0;
3,519,534✔
261
  if (parentNum == 0) {
3,519,531✔
262
    int32_t taskDone = 0;
2,388,139✔
263
    if (SCH_JOB_NEED_WAIT(pJob)) {
2,388,139✔
264
      SCH_LOCK(SCH_WRITE, &pTask->level->lock);
1,757,952!
265
      pTask->level->taskSucceed++;
1,757,952✔
266
      taskDone = pTask->level->taskSucceed + pTask->level->taskFailed;
1,757,952✔
267
      SCH_UNLOCK(SCH_WRITE, &pTask->level->lock);
1,757,952!
268

269
      if (taskDone < pTask->level->taskNum) {
1,757,951✔
270
        SCH_TASK_DLOG("wait all tasks, done:%d, all:%d", taskDone, pTask->level->taskNum);
54,185!
271
        return TSDB_CODE_SUCCESS;
54,185✔
272
      } else if (taskDone > pTask->level->taskNum) {
1,703,766!
273
        SCH_TASK_ELOG("taskDone number invalid, done:%d, total:%d", taskDone, pTask->level->taskNum);
×
274
      }
275

276
      if (pTask->level->taskFailed > 0) {
1,703,766✔
277
        SCH_RET(schHandleJobFailure(pJob, pJob->errCode));
2!
278
      } else {
279
        SCH_RET(schSwitchJobStatus(pJob, JOB_TASK_STATUS_PART_SUCC, NULL));
1,703,764!
280
      }
281
    } else {
282
      pJob->resNode = pTask->succeedAddr;
630,187✔
283
    }
284

285
    pJob->fetchTask = pTask;
630,187✔
286

287
    SCH_RET(schSwitchJobStatus(pJob, JOB_TASK_STATUS_PART_SUCC, NULL));
630,187!
288
  }
289

290
  /*
291
    if (SCH_IS_DATA_SRC_TASK(task) && job->dataSrcEps.numOfEps < SCH_MAX_CANDIDATE_EP_NUM) {
292
      tstrncpy(job->dataSrcEps.fqdn[job->dataSrcEps.numOfEps], task->execAddr.fqdn, sizeof(task->execAddr.fqdn));
293
      job->dataSrcEps.port[job->dataSrcEps.numOfEps] = task->execAddr.port;
294

295
      ++job->dataSrcEps.numOfEps;
296
    }
297
  */
298

299
  for (int32_t i = 0; i < parentNum; ++i) {
2,262,828✔
300
    SSchTask *parent = *(SSchTask **)taosArrayGet(pTask->parents, i);
1,131,360✔
301
    if (NULL == parent) {
1,131,366!
302
      SCH_TASK_ELOG("fail to get task %d parent, parentNum: %d", i, parentNum);
×
303
      SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
×
304
    }
305

306
    SCH_LOCK(SCH_WRITE, &parent->planLock);
1,131,366!
307
    SDownstreamSourceNode source = {
3,394,311✔
308
        .type = QUERY_NODE_DOWNSTREAM_SOURCE,
309
        .clientId = pTask->clientId,
1,131,437✔
310
        .taskId = pTask->taskId,
1,131,437✔
311
        .schedId = schMgmt.sId,
1,131,437✔
312
        .execId = pTask->execId,
1,131,437✔
313
        .addr = pTask->succeedAddr,
314
        .fetchMsgType = SCH_FETCH_TYPE(pTask),
1,131,437✔
315
        .localExec = SCH_IS_LOCAL_EXEC_TASK(pJob, pTask),
1,131,437!
316
    };
317
    code = qSetSubplanExecutionNode(parent->plan, pTask->plan->id.groupId, &source);
1,131,437✔
318
    if (TSDB_CODE_SUCCESS != code) {
1,131,431!
319
      SCH_TASK_ELOG("qSetSubplanExecutionNode failed, groupId: %d", pTask->plan->id.groupId);
×
320
    }
321
    SCH_UNLOCK(SCH_WRITE, &parent->planLock);
1,131,431!
322

323
    SCH_ERR_RET(code);
1,131,436!
324

325
    int32_t readyNum = atomic_add_fetch_32(&parent->childReady, 1);
1,131,436✔
326

327
    if (SCH_TASK_READY_FOR_LAUNCH(readyNum, parent)) {
1,131,444✔
328
      SCH_TASK_DLOG("all %d children task done, start to launch parent task 0x%" PRIx64, readyNum, parent->taskId);
562,511!
329
      SCH_ERR_RET(schLaunchTask(pJob, parent));
562,511!
330
    }
331
  }
332

333
  if (taskDone == pTask->level->taskNum) {
1,131,468✔
334
    SCH_ERR_RET(schLaunchJobLowerLevel(pJob, pTask));
459,617!
335
  }
336

337
  return TSDB_CODE_SUCCESS;
1,131,467✔
338
}
339

340
int32_t schRescheduleTask(SSchJob *pJob, SSchTask *pTask) {
25✔
341
  if (!schMgmt.cfg.enableReSchedule) {
25!
342
    return TSDB_CODE_SUCCESS;
25✔
343
  }
344

345
  if (SCH_IS_DATA_BIND_TASK(pTask)) {
×
346
    return TSDB_CODE_SUCCESS;
×
347
  }
348

349
  if (SCH_TASK_TIMEOUT(pTask) && JOB_TASK_STATUS_EXEC == pTask->status && pJob->fetchTask != pTask &&
×
350
      taosArrayGetSize(pTask->candidateAddrs) > 1) {
×
351
    SCH_TASK_DLOG("task execId %d will be rescheduled now", pTask->execId);
×
352
    schDropTaskOnExecNode(pJob, pTask);
×
353
    taosHashClear(pTask->execNodes);
×
354

355
    SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, TSDB_CODE_SCH_TIMEOUT_ERROR));
×
356
  }
357

358
  return TSDB_CODE_SUCCESS;
×
359
}
360

361
int32_t schChkUpdateRedirectCtx(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet, int32_t rspCode) {
485✔
362
  SSchRedirectCtx *pCtx = &pTask->redirectCtx;
485✔
363
  if (!pCtx->inRedirect) {
485✔
364
    pCtx->inRedirect = true;
55✔
365
    pCtx->periodMs = tsRedirectPeriod;
55✔
366
    pCtx->startTs = taosGetTimestampMs();
55✔
367

368
    if (SCH_IS_DATA_BIND_TASK(pTask)) {
55!
369
      if (pEpSet) {
55✔
370
        pCtx->roundTotal = pEpSet->numOfEps;
41✔
371
      } else if (pTask->candidateAddrs && taosArrayGetSize(pTask->candidateAddrs) > 0) {
14!
372
        SQueryNodeAddr *pAddr = taosArrayGet(pTask->candidateAddrs, 0);
14✔
373
        pCtx->roundTotal = pAddr->epSet.numOfEps;
14✔
374
      } else {
375
        pCtx->roundTotal = SCH_DEFAULT_RETRY_TOTAL_ROUND;
×
376
      }
377
    } else {
378
      pCtx->roundTotal = 1;
×
379
    }
380

381
    goto _return;
55✔
382
  }
383

384
  pCtx->totalTimes++;
430✔
385
  pCtx->roundTimes++;
430✔
386

387
  if (SCH_IS_DATA_BIND_TASK(pTask) && pEpSet) {
430!
388
    pCtx->roundTotal = pEpSet->numOfEps;
336✔
389
  }
390

391
  if (pCtx->roundTimes >= pCtx->roundTotal) {
430✔
392
    int64_t nowTs = taosGetTimestampMs();
194✔
393
    int64_t lastTime = nowTs - pCtx->startTs;
194✔
394
    if (lastTime > tsMaxRetryWaitTime) {
194✔
395
      SCH_TASK_DLOG("task no more redirect retry since timeout, now:%" PRId64 ", start:%" PRId64 ", max:%d, total:%d",
3!
396
                    nowTs, pCtx->startTs, tsMaxRetryWaitTime, pCtx->totalTimes);
397
      pJob->noMoreRetry = true;
3✔
398
      SCH_ERR_RET(SCH_GET_REDIRECT_CODE(pJob, rspCode));
3!
399
    }
400

401
    pCtx->periodMs *= tsRedirectFactor;
191✔
402
    if (pCtx->periodMs > tsRedirectMaxPeriod) {
191✔
403
      pCtx->periodMs = tsRedirectMaxPeriod;
69✔
404
    }
405

406
    int64_t leftTime = tsMaxRetryWaitTime - lastTime;
191✔
407
    pTask->delayExecMs = leftTime < pCtx->periodMs ? leftTime : pCtx->periodMs;
191✔
408

409
    pCtx->roundTimes = 0;
191✔
410

411
    goto _return;
191✔
412
  }
413

414
  pTask->delayExecMs = 0;
236✔
415

416
_return:
482✔
417

418
  SCH_TASK_DLOG("task start %d/%d/%d redirect retry, delayExec:%d", pCtx->roundTimes, pCtx->roundTotal,
482!
419
                pCtx->totalTimes, pTask->delayExecMs);
420

421
  return TSDB_CODE_SUCCESS;
482✔
422
}
423

424
void schResetTaskForRetry(SSchJob *pJob, SSchTask *pTask) {
482✔
425
  pTask->waitRetry = true;
482✔
426

427
  schDropTaskOnExecNode(pJob, pTask);
482✔
428
  if (pTask->delayTimer) {
482✔
429
    if (!taosTmrStopA(&pTask->delayTimer)) {
174!
430
      SCH_TASK_WLOG("stop task delayTimer failed, may stopped, status:%d", pTask->status);
174!
431
    }
432
  }
433
  taosHashClear(pTask->execNodes);
482✔
434
  (void)schRemoveTaskFromExecList(pJob, pTask);  // ignore error
482✔
435
  schDeregisterTaskHb(pJob, pTask);
482✔
436
  taosMemoryFreeClear(pTask->msg);
482!
437
  pTask->msgLen = 0;
482✔
438
  pTask->lastMsgType = 0;
482✔
439
  pTask->childReady = 0;
482✔
440
  TAOS_MEMSET(&pTask->succeedAddr, 0, sizeof(pTask->succeedAddr));
482✔
441
}
482✔
442

443
int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32_t rspCode) {
485✔
444
  int32_t code = 0;
485✔
445

446
  SCH_TASK_DLOG("task will be redirected now, status:%s, code:%s", SCH_GET_TASK_STATUS_STR(pTask), tstrerror(rspCode));
485!
447

448
  if (!NO_RET_REDIRECT_ERROR(rspCode)) {
485!
449
    SCH_UPDATE_REDIRECT_CODE(pJob, rspCode);
392✔
450
  }
451

452
  SCH_ERR_JRET(schChkUpdateRedirectCtx(pJob, pTask, pData ? pData->pEpSet : NULL, rspCode));
485!
453

454
  schResetTaskForRetry(pJob, pTask);
482✔
455

456
  if (SCH_IS_DATA_BIND_TASK(pTask)) {
482!
457
    if (pData && pData->pEpSet) {
482!
458
      SCH_ERR_JRET(schUpdateTaskCandidateAddr(pJob, pTask, pData->pEpSet));
377!
459
    } else if (SYNC_SELF_LEADER_REDIRECT_ERROR(rspCode)) {
105!
460
      SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
×
461
      if (NULL == addr) {
×
462
        SCH_TASK_ELOG("fail to get the %dth condidateAddr, totalNum:%d", pTask->candidateIdx,
×
463
                      (int32_t)taosArrayGetSize(pTask->candidateAddrs));
464
        SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
×
465
      }
466

467
      SEp *pEp = &addr->epSet.eps[addr->epSet.inUse];
×
468
      SCH_TASK_DLOG("task retry node %d current ep, idx:%d/%d,%s:%d, code:%s", addr->nodeId, addr->epSet.inUse,
×
469
                    addr->epSet.numOfEps, pEp->fqdn, pEp->port, tstrerror(rspCode));
470
    } else {
471
      SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
105✔
472
      if (NULL == addr) {
105!
473
        SCH_TASK_ELOG("fail to get the %dth condidateAddr, totalNum:%d", pTask->candidateIdx,
×
474
                      (int32_t)taosArrayGetSize(pTask->candidateAddrs));
475
        SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
×
476
      }
477

478
      SCH_SWITCH_EPSET(addr);
105✔
479
      SCH_TASK_DLOG("switch task target node %d epset to %d/%d", addr->nodeId, addr->epSet.inUse, addr->epSet.numOfEps);
105!
480
    }
481

482
    SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT);
482✔
483

484
    SCH_ERR_JRET(schDelayLaunchTask(pJob, pTask));
482!
485

486
    return TSDB_CODE_SUCCESS;
482✔
487
  }
488

489
  // merge plan
490

491
  pTask->childReady = 0;
×
492

493
  qClearSubplanExecutionNode(pTask->plan);
×
494

495
  // Note: current error task and upper level merge task
496
  if ((pData && 0 == pData->len) || NULL == pData) {
×
497
    SCH_ERR_JRET(schSwitchTaskCandidateAddr(pJob, pTask));
×
498
  }
499

500
  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT);
×
501

502
  int32_t childrenNum = taosArrayGetSize(pTask->children);
×
503
  for (int32_t i = 0; i < childrenNum; ++i) {
×
504
    SSchTask *pChild = taosArrayGetP(pTask->children, i);
×
505
    SCH_LOCK_TASK(pChild);
×
506
    (void)schDoTaskRedirect(pJob, pChild, NULL, rspCode);  // error handled internal
×
507
    SCH_UNLOCK_TASK(pChild);
×
508
  }
509

510
  return TSDB_CODE_SUCCESS;
×
511

512
_return:
3✔
513

514
  SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
3!
515
}
516

517
int32_t schResetTaskSetLevelInfo(SSchJob *pJob, SSchTask *pTask) {
485✔
518
  SSchLevel *pLevel = pTask->level;
485✔
519

520
  SCH_TASK_DLOG("start to reset level for current task set, execDone:%d, launched:%d",
485!
521
                atomic_load_32(&pLevel->taskExecDoneNum), atomic_load_32(&pLevel->taskLaunchedNum));
522

523
  if (SCH_GET_TASK_STATUS(pTask) >= JOB_TASK_STATUS_PART_SUCC) {
485!
524
    (void)atomic_sub_fetch_32(&pLevel->taskExecDoneNum, 1);
×
525
  }
526

527
  (void)atomic_sub_fetch_32(&pLevel->taskLaunchedNum, 1);
485✔
528

529
  int32_t childrenNum = taosArrayGetSize(pTask->children);
485✔
530
  for (int32_t i = 0; i < childrenNum; ++i) {
485!
531
    SSchTask *pChild = taosArrayGetP(pTask->children, i);
×
532
    if (NULL == pChild) {
×
533
      SCH_TASK_ELOG("fail to get the %dth child, childrenNum:%d", i, childrenNum);
×
534
      SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
×
535
    }
536

537
    SCH_LOCK_TASK(pChild);
×
538
    pLevel = pChild->level;
×
539
    (void)atomic_sub_fetch_32(&pLevel->taskExecDoneNum, 1);
×
540
    (void)atomic_sub_fetch_32(&pLevel->taskLaunchedNum, 1);
×
541
    SCH_UNLOCK_TASK(pChild);
×
542
  }
543

544
  SCH_TASK_DLOG("end to reset level for current task set, execDone:%d, launched:%d",
485!
545
                atomic_load_32(&pLevel->taskExecDoneNum), atomic_load_32(&pLevel->taskLaunchedNum));
546

547
  return TSDB_CODE_SUCCESS;
485✔
548
}
549

550
int32_t schHandleTaskSetRetry(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32_t rspCode) {
485✔
551
  int32_t code = 0;
485✔
552

553
  SCH_ERR_JRET(schChkResetJobRetry(pJob, rspCode));
485!
554

555
  if (SYNC_OTHER_LEADER_REDIRECT_ERROR(rspCode)) {
485!
556
    if (NULL == pData->pEpSet) {
×
557
      SCH_TASK_ELOG("epset updating excepted, error:%s", tstrerror(rspCode));
×
558
      code = TSDB_CODE_INVALID_MSG;
×
559
      goto _return;
×
560
    }
561
  }
562

563
  SCH_TASK_DLOG("start to redirect current task set cause of error: %s", tstrerror(rspCode));
485!
564

565
  SCH_ERR_JRET(schResetTaskSetLevelInfo(pJob, pTask));
485!
566

567
  SCH_RESET_JOB_LEVEL_IDX(pJob);
485!
568

569
  code = schDoTaskRedirect(pJob, pTask, pData, rspCode);
485✔
570

571
  taosMemoryFreeClear(pData->pData);
485!
572
  taosMemoryFreeClear(pData->pEpSet);
485✔
573

574
  SCH_RET(code);
485!
575

UNCOV
576
_return:
×
577

UNCOV
578
  taosMemoryFreeClear(pData->pData);
×
UNCOV
579
  taosMemoryFreeClear(pData->pEpSet);
×
580

UNCOV
581
  SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
×
582
}
583

584
int32_t schPushTaskToExecList(SSchJob *pJob, SSchTask *pTask) {
3,520,931✔
585
  int32_t code = taosHashPut(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
3,520,931✔
586
  if (0 != code) {
3,521,515!
587
    if (HASH_NODE_EXIST(code)) {
×
588
      SCH_TASK_ELOG("task already in execTask list, code:%x", code);
×
589
      SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
×
590
    }
591

592
    SCH_TASK_ELOG("taosHashPut task to execTask list failed, errno:%d", errno);
×
593
    SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
×
594
  }
595

596
  SCH_TASK_DLOG("task added to execTask list, numOfTasks:%d", taosHashGetSize(pJob->execTasks));
3,521,515!
597

598
  return TSDB_CODE_SUCCESS;
3,521,503✔
599
}
600

601
/*
602
int32_t schMoveTaskToSuccList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
603
  if (0 != taosHashRemove(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId))) {
604
    SCH_TASK_WLOG("remove task from execTask list failed, may not exist, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
605
  } else {
606
    SCH_TASK_DLOG("task removed from execTask list, numOfTasks:%d", taosHashGetSize(pJob->execTasks));
607
  }
608

609
  int32_t code = taosHashPut(pJob->succTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
610
  if (0 != code) {
611
    if (HASH_NODE_EXIST(code)) {
612
      *moved = true;
613
      SCH_TASK_ELOG("task already in succTask list, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
614
      SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
615
    }
616

617
    SCH_TASK_ELOG("taosHashPut task to succTask list failed, errno:%d", errno);
618
    SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
619
  }
620

621
  *moved = true;
622

623
  SCH_TASK_DLOG("task moved to succTask list, numOfTasks:%d", taosHashGetSize(pJob->succTasks));
624

625
  return TSDB_CODE_SUCCESS;
626
}
627

628
int32_t schMoveTaskToFailList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
629
  *moved = false;
630

631
  if (0 != taosHashRemove(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId))) {
632
    SCH_TASK_WLOG("remove task from execTask list failed, may not exist, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
633
  }
634

635
  int32_t code = taosHashPut(pJob->failTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
636
  if (0 != code) {
637
    if (HASH_NODE_EXIST(code)) {
638
      *moved = true;
639

640
      SCH_TASK_WLOG("task already in failTask list, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
641
      SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
642
    }
643

644
    SCH_TASK_ELOG("taosHashPut task to failTask list failed, errno:%d", errno);
645
    SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
646
  }
647

648
  *moved = true;
649

650
  SCH_TASK_DLOG("task moved to failTask list, numOfTasks:%d", taosHashGetSize(pJob->failTasks));
651

652
  return TSDB_CODE_SUCCESS;
653
}
654

655
int32_t schMoveTaskToExecList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
656
  if (0 != taosHashRemove(pJob->succTasks, &pTask->taskId, sizeof(pTask->taskId))) {
657
    SCH_TASK_WLOG("remove task from succTask list failed, may not exist, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
658
  }
659

660
  int32_t code = taosHashPut(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
661
  if (0 != code) {
662
    if (HASH_NODE_EXIST(code)) {
663
      *moved = true;
664

665
      SCH_TASK_ELOG("task already in execTask list, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
666
      SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
667
    }
668

669
    SCH_TASK_ELOG("taosHashPut task to execTask list failed, errno:%d", errno);
670
    SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
671
  }
672

673
  *moved = true;
674

675
  SCH_TASK_DLOG("task moved to execTask list, numOfTasks:%d", taosHashGetSize(pJob->execTasks));
676

677
  return TSDB_CODE_SUCCESS;
678
}
679
*/
680

681
int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bool *needRetry) {
1,727✔
682
  if (pJob->noMoreRetry) {
1,727✔
683
    *needRetry = false;
3✔
684
    SCH_TASK_DLOG("task no more retry since job no more retry, retryTimes:%d/%d", pTask->retryTimes,
3!
685
                  pTask->maxRetryTimes);
686
    return TSDB_CODE_SUCCESS;
3✔
687
  }
688

689
  if (TSDB_CODE_SCH_TIMEOUT_ERROR == errCode) {
1,724!
690
    pTask->maxExecTimes++;
×
691
    pTask->maxRetryTimes++;
×
692
    if (pTask->timeoutUsec < SCH_MAX_TASK_TIMEOUT_USEC) {
×
693
      pTask->timeoutUsec *= 2;
×
694
      if (pTask->timeoutUsec > SCH_MAX_TASK_TIMEOUT_USEC) {
×
695
        pTask->timeoutUsec = SCH_MAX_TASK_TIMEOUT_USEC;
×
696
      }
697
    }
698
  }
699

700
  if ((pTask->retryTimes + 1) > pTask->maxRetryTimes) {
1,724✔
701
    *needRetry = false;
3✔
702
    SCH_TASK_DLOG("task no more retry since reach max retry times, retryTimes:%d/%d", pTask->retryTimes,
3!
703
                  pTask->maxRetryTimes);
704
    return TSDB_CODE_SUCCESS;
3✔
705
  }
706

707
  if ((pTask->execId + 1) >= pTask->maxExecTimes) {
1,721!
708
    *needRetry = false;
×
709
    SCH_TASK_DLOG("task no more retry since reach max exec times, execId:%d/%d", pTask->execId, pTask->maxExecTimes);
×
710
    return TSDB_CODE_SUCCESS;
×
711
  }
712

713
  if (!SCH_TASK_NEED_RETRY(pTask->lastMsgType, errCode)) {
1,721!
714
    *needRetry = false;
1,721✔
715
    SCH_TASK_DLOG("task no more retry cause of errCode, errCode:%x - %s", errCode, tstrerror(errCode));
1,721!
716
    return TSDB_CODE_SUCCESS;
1,721✔
717
  }
718

719
  /*
720
    if (SCH_IS_DATA_BIND_TASK(pTask)) {
721
      if ((pTask->execId + 1) >= SCH_TASK_NUM_OF_EPS(&pTask->plan->execNode)) {
722
        *needRetry = false;
723
        SCH_TASK_DLOG("task no more retry since all ep tried, execId:%d, epNum:%d", pTask->execId,
724
                      SCH_TASK_NUM_OF_EPS(&pTask->plan->execNode));
725
        return TSDB_CODE_SUCCESS;
726
      }
727
    } else {
728
      int32_t candidateNum = taosArrayGetSize(pTask->candidateAddrs);
729

730
      if ((pTask->candidateIdx + 1) >= candidateNum && (TSDB_CODE_SCH_TIMEOUT_ERROR != errCode)) {
731
        *needRetry = false;
732
        SCH_TASK_DLOG("task no more retry since all candiates tried, candidateIdx:%d, candidateNum:%d",
733
                      pTask->candidateIdx, candidateNum);
734
        return TSDB_CODE_SUCCESS;
735
      }
736
    }
737
  */
738

739
  *needRetry = true;
×
740
  SCH_TASK_DLOG("task need the %dth retry, errCode:%x - %s", pTask->execId + 1, errCode, tstrerror(errCode));
×
741

742
  return TSDB_CODE_SUCCESS;
×
743
}
744

745
int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask) {
×
746
  (void)atomic_sub_fetch_32(&pTask->level->taskLaunchedNum, 1);
×
747

748
  (void)schRemoveTaskFromExecList(pJob, pTask);  // ignore error
×
749
  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT);
×
750

751
  if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) {
×
752
    SCH_ERR_RET(schLaunchTasksInFlowCtrlList(pJob, pTask));
×
753
  }
754

755
  schDeregisterTaskHb(pJob, pTask);
×
756

757
  if (SCH_IS_DATA_BIND_TASK(pTask)) {
×
758
    SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
×
759
    if (NULL == addr) {
×
760
      SCH_TASK_ELOG("fail to the %dth condidateAddr, totalNum:%d", pTask->candidateIdx,
×
761
                    (int32_t)taosArrayGetSize(pTask->candidateAddrs));
762
      SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
×
763
    }
764

765
    SCH_SWITCH_EPSET(addr);
×
766
  } else {
767
    SCH_ERR_RET(schSwitchTaskCandidateAddr(pJob, pTask));
×
768
  }
769

770
  SCH_ERR_RET(schLaunchTask(pJob, pTask));
×
771

772
  return TSDB_CODE_SUCCESS;
×
773
}
774

775
int32_t schSetAddrsFromNodeList(SSchJob *pJob, SSchTask *pTask) {
443,768✔
776
  int32_t addNum = 0;
443,768✔
777
  int32_t nodeNum = 0;
443,768✔
778

779
  if (pJob->nodeList) {
443,768✔
780
    nodeNum = taosArrayGetSize(pJob->nodeList);
443,765✔
781

782
    for (int32_t i = 0; i < nodeNum; ++i) {
1,051,739✔
783
      SQueryNodeLoad *nload = taosArrayGet(pJob->nodeList, i);
607,983✔
784
      if (NULL == nload) {
607,982!
785
        SCH_TASK_ELOG("fail to get the %dth node in nodeList, nodeNum:%d", i, nodeNum);
×
786
        SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
×
787
      }
788

789
      SQueryNodeAddr *naddr = &nload->addr;
607,982✔
790

791
      if (NULL == taosArrayPush(pTask->candidateAddrs, naddr)) {
1,215,961!
792
        SCH_TASK_ELOG("taosArrayPush execNode to candidate addrs failed, addNum:%d, errno:%d", addNum, errno);
×
793
        SCH_ERR_RET(terrno);
×
794
      }
795

796
      SCH_TASK_TLOG("set %dth candidate addr, id %d, inUse:%d/%d, fqdn:%s, port:%d", i, naddr->nodeId,
607,979!
797
                    naddr->epSet.inUse, naddr->epSet.numOfEps, SCH_GET_CUR_EP(naddr)->fqdn,
798
                    SCH_GET_CUR_EP(naddr)->port);
799

800
      ++addNum;
607,977✔
801
    }
802
  }
803

804
  if (addNum <= 0) {
443,759✔
805
    SCH_TASK_ELOG("no available execNode as candidates, nodeNum:%d", nodeNum);
2!
806
    SCH_ERR_RET(TSDB_CODE_TSC_NO_EXEC_NODE);
5!
807
  }
808

809
  return TSDB_CODE_SUCCESS;
443,760✔
810
}
811

812
int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) {
3,445,472✔
813
  if (NULL != pTask->candidateAddrs) {
3,445,472✔
814
    return TSDB_CODE_SUCCESS;
481✔
815
  }
816

817
  pTask->candidateAddrs = taosArrayInit(SCHEDULE_DEFAULT_MAX_NODE_NUM, sizeof(SQueryNodeAddr));
3,444,991✔
818
  if (NULL == pTask->candidateAddrs) {
3,446,925!
819
    SCH_TASK_ELOG("taosArrayInit %d condidate addrs failed", SCHEDULE_DEFAULT_MAX_NODE_NUM);
×
820
    SCH_ERR_RET(terrno);
×
821
  }
822

823
  if (pTask->plan->execNode.epSet.numOfEps > 0) {
3,447,042✔
824
    if (NULL == taosArrayPush(pTask->candidateAddrs, &pTask->plan->execNode)) {
6,006,995!
825
      SCH_TASK_ELOG("taosArrayPush execNode to candidate addrs failed, errno:%d", errno);
×
826
      SCH_ERR_RET(terrno);
×
827
    }
828

829
    SCH_TASK_DLOG("use execNode in plan as candidate addr, numOfEps:%d", pTask->plan->execNode.epSet.numOfEps);
3,003,716!
830

831
    return TSDB_CODE_SUCCESS;
3,004,703✔
832
  }
833

834
  if (SCH_IS_DATA_BIND_TASK(pTask)) {
443,763!
835
    SCH_TASK_ELOG("no execNode specifed for data src task, numOfEps:%d", pTask->plan->execNode.epSet.numOfEps);
1!
836
    SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
1!
837
  }
838

839
  SCH_ERR_RET(schSetAddrsFromNodeList(pJob, pTask));
443,763!
840

841
  pTask->candidateIdx = taosRand() % taosArrayGetSize(pTask->candidateAddrs);
443,760✔
842

843
  /*
844
    for (int32_t i = 0; i < job->dataSrcEps.numOfEps && addNum < SCH_MAX_CANDIDATE_EP_NUM; ++i) {
845
      tstrncpy(epSet->fqdn[epSet->numOfEps], job->dataSrcEps.fqdn[i], sizeof(job->dataSrcEps.fqdn[i]));
846
      epSet->port[epSet->numOfEps] = job->dataSrcEps.port[i];
847

848
      ++epSet->numOfEps;
849
    }
850
  */
851

852
  return TSDB_CODE_SUCCESS;
443,785✔
853
}
854

855
int32_t schUpdateTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet) {
377✔
856
  int32_t code = TSDB_CODE_SUCCESS;
377✔
857
  if (NULL == pTask->candidateAddrs || 1 != taosArrayGetSize(pTask->candidateAddrs)) {
377!
858
    SCH_TASK_ELOG("not able to update cndidate addr, addr num %d",
×
859
                  (int32_t)(pTask->candidateAddrs ? taosArrayGetSize(pTask->candidateAddrs) : 0));
860
    SCH_ERR_RET(TSDB_CODE_APP_ERROR);
×
861
  }
862

863
  SQueryNodeAddr *pAddr = taosArrayGet(pTask->candidateAddrs, 0);
377✔
864
  if (NULL == pAddr) {
377!
865
    SCH_TASK_ELOG("fail to get task 0th condidataAddr, totalNum:%d", (int32_t)taosArrayGetSize(pTask->candidateAddrs));
×
866
    SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
×
867
  }
868

869
  char *origEpset = NULL;
377✔
870
  char *newEpset = NULL;
377✔
871

872
  SCH_ERR_RET(schDumpEpSet(&pAddr->epSet, &origEpset));
377!
873
  SCH_ERR_JRET(schDumpEpSet(pEpSet, &newEpset));
377!
874

875
  SCH_TASK_DLOG("update task target node %d epset from %s to %s", pAddr->nodeId, origEpset, newEpset);
377!
876

877
  TAOS_MEMCPY(&pAddr->epSet, pEpSet, sizeof(pAddr->epSet));
377✔
878

879
_return:
377✔
880

881
  taosMemoryFree(origEpset);
377✔
882
  taosMemoryFree(newEpset);
377✔
883

884
  return code;
377✔
885
}
886

887
int32_t schSwitchTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask) {
×
888
  int32_t candidateNum = taosArrayGetSize(pTask->candidateAddrs);
×
889
  if (candidateNum <= 1) {
×
890
    goto _return;
×
891
  }
892

893
  switch (schMgmt.cfg.schPolicy) {
×
894
    case SCH_LOAD_SEQ:
×
895
    case SCH_ALL:
896
    default:
897
      if (++pTask->candidateIdx >= candidateNum) {
×
898
        pTask->candidateIdx = 0;
×
899
      }
900
      break;
×
901
    case SCH_RANDOM: {
×
902
      int32_t lastIdx = pTask->candidateIdx;
×
903
      while (lastIdx == pTask->candidateIdx) {
×
904
        pTask->candidateIdx = taosRand() % candidateNum;
×
905
      }
906
      break;
×
907
    }
908
  }
909

910
_return:
×
911

912
  SCH_TASK_DLOG("switch task candiateIdx to %d/%d", pTask->candidateIdx, candidateNum);
×
913

914
  return TSDB_CODE_SUCCESS;
×
915
}
916

917
int32_t schRemoveTaskFromExecList(SSchJob *pJob, SSchTask *pTask) {
482✔
918
  int32_t code = taosHashRemove(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId));
482✔
919
  if (code) {
482!
920
    SCH_TASK_WLOG("task already not in execTask list, code:%x", code);
×
921
  }
922

923
  return TSDB_CODE_SUCCESS;
482✔
924
}
925

926
void schDropTaskOnExecNode(SSchJob *pJob, SSchTask *pTask) {
1,762,646✔
927
  if (NULL == pTask->execNodes) {
1,762,646!
928
    SCH_TASK_DLOG("no exec address, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
×
929
    return;
×
930
  }
931

932
  int32_t size = (int32_t)taosHashGetSize(pTask->execNodes);
1,762,646✔
933
  if (size <= 0) {
1,762,648✔
934
    SCH_TASK_DLOG("task has no execNodes, no need to drop it, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
72,453!
935
    return;
72,453✔
936
  }
937

938
  int32_t       i = 0;
1,690,195✔
939
  SSchNodeInfo *nodeInfo = taosHashIterate(pTask->execNodes, NULL);
1,690,195✔
940
  while (nodeInfo) {
3,380,389✔
941
    if (nodeInfo->handle) {
1,690,195!
942
      SCH_SET_TASK_HANDLE(pTask, nodeInfo->handle);
1,690,195✔
943
      void *pExecId = taosHashGetKey(nodeInfo, NULL);
1,690,195✔
944
      (void)schBuildAndSendMsg(pJob, pTask, &nodeInfo->addr, TDMT_SCH_DROP_TASK, pExecId);  // ignore error and continue
1,690,194✔
945

946
      SCH_TASK_DLOG("start to drop task's %dth execNode", i);
1,690,193!
947
    } else {
948
      SCH_TASK_DLOG("no need to drop task %dth execNode", i);
×
949
    }
950

951
    ++i;
1,690,195✔
952
    nodeInfo = taosHashIterate(pTask->execNodes, nodeInfo);
1,690,195✔
953
  }
954

955
  SCH_TASK_DLOG("task has been dropped on %d exec nodes", size);
1,690,194!
956
}
957

958
int32_t schNotifyTaskOnExecNode(SSchJob *pJob, SSchTask *pTask, ETaskNotifyType type) {
×
959
  int32_t size = (int32_t)taosHashGetSize(pTask->execNodes);
×
960
  if (size <= 0) {
×
961
    SCH_TASK_DLOG("task no exec address to notify, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
×
962
    return TSDB_CODE_SUCCESS;
×
963
  }
964

965
  int32_t       i = 0;
×
966
  SSchNodeInfo *nodeInfo = taosHashIterate(pTask->execNodes, NULL);
×
967
  while (nodeInfo) {
×
968
    if (nodeInfo->handle) {
×
969
      SCH_SET_TASK_HANDLE(pTask, nodeInfo->handle);
×
970
      SCH_ERR_RET(schBuildAndSendMsg(pJob, pTask, &nodeInfo->addr, TDMT_SCH_TASK_NOTIFY, &type));
×
971
      SCH_TASK_DLOG("start to notify %d to task's %dth execNode", type, i);
×
972
    } else {
973
      SCH_TASK_DLOG("no need to notify %d to task %dth execNode", type, i);
×
974
    }
975

976
    ++i;
×
977
    nodeInfo = taosHashIterate(pTask->execNodes, nodeInfo);
×
978
  }
979

980
  SCH_TASK_DLOG("task has been notified %d on %d exec nodes", type, size);
×
981
  return TSDB_CODE_SUCCESS;
×
982
}
983

984
int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId *pEpId, SArray *pStatusList) {
958,157✔
985
  int32_t   taskNum = (int32_t)taosArrayGetSize(pStatusList);
958,157✔
986
  SSchTask *pTask = NULL;
958,213✔
987
  SSchJob  *pJob = NULL;
958,213✔
988

989
  qDebug("%d task status in hb rsp from nodeId:%d, fqdn:%s, port:%d", taskNum, pEpId->nodeId, pEpId->ep.fqdn,
958,213✔
990
         pEpId->ep.port);
991

992
  for (int32_t i = 0; i < taskNum; ++i) {
971,459✔
993
    STaskStatus *pStatus = taosArrayGet(pStatusList, i);
13,223✔
994
    if (NULL == pStatus) {
13,223!
995
      qError("fail to get the %dth task status in hb rsp, taskNum:%d", i, taskNum);
×
996
      continue;
×
997
    }
998

999
    int32_t code = 0;
13,223✔
1000

1001
    qDebug("QID:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d task status in server: %s", pStatus->queryId,
13,223✔
1002
           pStatus->clientId, pStatus->taskId, pStatus->execId, jobTaskStatusStr(pStatus->status));
1003

1004
    if (schProcessOnCbBegin(&pJob, &pTask, pStatus->queryId, pStatus->refId, pStatus->taskId)) {
13,223✔
1005
      continue;
3,888✔
1006
    }
1007

1008
    if (pStatus->execId != pTask->execId) {
9,334!
1009
      // TODO
1010
      SCH_TASK_DLOG("execId %d mis-match current execId %d", pStatus->execId, pTask->execId);
×
1011
      schProcessOnCbEnd(pJob, pTask, 0);
×
1012
      continue;
×
1013
    }
1014

1015
    if (pStatus->status == JOB_TASK_STATUS_FAIL) {
9,334!
1016
      // RECORD AND HANDLE ERROR!!!!
1017
      schProcessOnCbEnd(pJob, pTask, 0);
×
1018
      continue;
×
1019
    }
1020

1021
    if (pStatus->status == JOB_TASK_STATUS_INIT) {
9,334✔
1022
      code = schRescheduleTask(pJob, pTask);
25✔
1023
    }
1024

1025
    schProcessOnCbEnd(pJob, pTask, code);
9,334✔
1026
  }
1027

1028
  return TSDB_CODE_SUCCESS;
958,236✔
1029
}
1030

1031
int32_t schHandleExplainRes(SArray *pExplainRes) {
1,764✔
1032
  int32_t code = 0;
1,764✔
1033
  int32_t resNum = taosArrayGetSize(pExplainRes);
1,764✔
1034
  if (resNum <= 0) {
1,764✔
1035
    goto _return;
1,176✔
1036
  }
1037

1038
  SSchTask *pTask = NULL;
588✔
1039
  SSchJob  *pJob = NULL;
588✔
1040

1041
  for (int32_t i = 0; i < resNum; ++i) {
1,176✔
1042
    SExplainLocalRsp *localRsp = taosArrayGet(pExplainRes, i);
588✔
1043
    if (NULL == localRsp) {
588!
1044
      qError("fail to get the %dth LOCAL explain rsp msg, total:%d", i, resNum);
×
1045
      continue;
×
1046
    }
1047

1048
    qDebug("QID:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ", begin to handle LOCAL explain rsp msg",
588!
1049
           localRsp->qId, localRsp->cId, localRsp->tId);
1050

1051
    pJob = NULL;
588✔
1052
    (void)schAcquireJob(localRsp->rId, &pJob);
588✔
1053
    if (NULL == pJob) {
588!
1054
      qWarn("QID:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 "job no exist, may be dropped, refId:0x%" PRIx64,
×
1055
            localRsp->qId, localRsp->cId, localRsp->tId, localRsp->rId);
1056
      SCH_ERR_JRET(TSDB_CODE_QRY_JOB_NOT_EXIST);
×
1057
    }
1058

1059
    int8_t status = 0;
588✔
1060
    if (schJobNeedToStop(pJob, &status)) {
588!
1061
      SCH_TASK_DLOG("will not do further processing cause of job status %s", jobTaskStatusStr(status));
×
1062
      (void)schReleaseJob(pJob->refId);
×
1063
      SCH_ERR_JRET(TSDB_CODE_SCH_IGNORE_ERROR);
×
1064
    }
1065

1066
    code = schGetTaskInJob(pJob, localRsp->tId, &pTask);
588✔
1067

1068
    if (TSDB_CODE_SUCCESS == code) {
588!
1069
      code = schProcessExplainRsp(pJob, pTask, &localRsp->rsp);
588✔
1070
    }
1071

1072
    (void)schReleaseJob(pJob->refId);
588✔
1073

1074
    qDebug("QID:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ", end to handle LOCAL explain rsp msg, code:%x",
588!
1075
           localRsp->qId, localRsp->cId, localRsp->tId, code);
1076

1077
    SCH_ERR_JRET(code);
588!
1078

1079
    localRsp->rsp.numOfPlans = 0;
588✔
1080
    localRsp->rsp.subplanInfo = NULL;
588✔
1081
    pTask = NULL;
588✔
1082
  }
1083

1084
_return:
588✔
1085

1086
  for (int32_t i = 0; i < resNum; ++i) {
2,352✔
1087
    SExplainLocalRsp *localRsp = taosArrayGet(pExplainRes, i);
588✔
1088
    if (NULL == localRsp) {
588!
1089
      qError("in _return fail to get the %dth LOCAL explain rsp msg, total:%d", i, resNum);
×
1090
      continue;
×
1091
    }
1092

1093
    tFreeSExplainRsp(&localRsp->rsp);
588✔
1094
  }
1095

1096
  taosArrayDestroy(pExplainRes);
1,764✔
1097

1098
  SCH_RET(code);
1,764!
1099
}
1100

1101
int32_t schLaunchRemoteTask(SSchJob *pJob, SSchTask *pTask) {
3,447,296✔
1102
  SSubplan *plan = pTask->plan;
3,447,296✔
1103
  int32_t   code = 0;
3,447,296✔
1104

1105
  if (NULL == pTask->msg) {  // TODO add more detailed reason for failure
3,447,296!
1106
    code = qSubPlanToMsg(plan, &pTask->msg, &pTask->msgLen);
3,447,740✔
1107
    if (TSDB_CODE_SUCCESS != code) {
3,444,733!
1108
      SCH_TASK_ELOG("failed to create physical plan, code:%s, msg:%p, len:%d", tstrerror(code), pTask->msg,
×
1109
                    pTask->msgLen);
1110
      SCH_ERR_RET(code);
×
1111
    } else if (tsQueryPlannerTrace) {
3,444,733!
1112
      char   *msg = NULL;
×
1113
      int32_t msgLen = 0;
×
1114
      SCH_ERR_RET(qSubPlanToString(plan, &msg, &msgLen));
×
1115
      SCH_TASK_DLOGL("physical plan len:%d, %s", msgLen, msg);
×
1116
      taosMemoryFree(msg);
×
1117
    }
1118
  }
1119

1120
  SCH_ERR_RET(schSetTaskCandidateAddrs(pJob, pTask));
3,444,289!
1121

1122
  if (SCH_IS_QUERY_JOB(pJob)) {
3,448,863✔
1123
    SCH_ERR_RET(schEnsureHbConnection(pJob, pTask));
1,689,592!
1124
  }
1125

1126
  SCH_RET(schBuildAndSendMsg(pJob, pTask, NULL, plan->msgType, NULL));
3,448,706!
1127
}
1128

1129
int32_t schLaunchLocalTask(SSchJob *pJob, SSchTask *pTask) {
72,358✔
1130
  // SCH_ERR_JRET(schSetTaskCandidateAddrs(pJob, pTask));
1131
  if (NULL == schMgmt.queryMgmt) {
72,358✔
1132
    SCH_ERR_RET(qWorkerInit(NODE_TYPE_CLIENT, CLIENT_HANDLE, (void **)&schMgmt.queryMgmt, NULL));
129!
1133
  }
1134

1135
  SArray *explainRes = NULL;
72,358✔
1136
  int32_t code = 0;
72,358✔
1137
  SQWMsg  qwMsg = {0};
72,358✔
1138
  qwMsg.msgInfo.taskType = TASK_TYPE_TEMP;
72,358✔
1139
  qwMsg.msgInfo.explain = SCH_IS_EXPLAIN_JOB(pJob);
72,358✔
1140
  qwMsg.msgInfo.needFetch = SCH_TASK_NEED_FETCH(pTask);
72,358✔
1141
  qwMsg.msg = pTask->plan;
72,358✔
1142
  qwMsg.msgType = pTask->plan->msgType;
72,358✔
1143
  qwMsg.connInfo.handle = pJob->conn.pTrans;
72,358✔
1144
  qwMsg.pWorkerCb = pJob->pWorkerCb;
72,358✔
1145

1146
  if (SCH_IS_EXPLAIN_JOB(pJob)) {
72,358✔
1147
    explainRes = taosArrayInit(pJob->taskNum, sizeof(SExplainLocalRsp));
588✔
1148
    if (NULL == explainRes) {
588!
1149
      SCH_ERR_RET(terrno);
×
1150
    }
1151
  }
1152

1153
  SCH_ERR_JRET(qWorkerProcessLocalQuery(schMgmt.queryMgmt, schMgmt.sId, pJob->queryId, pTask->clientId, pTask->taskId,
72,358!
1154
                                        pJob->refId, pTask->execId, &qwMsg, explainRes));
1155

1156
  if (SCH_IS_EXPLAIN_JOB(pJob)) {
72,352✔
1157
    SCH_ERR_RET(schHandleExplainRes(explainRes));
588!
1158
    explainRes = NULL;
588✔
1159
  }
1160

1161
  SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));
72,352!
1162

1163
_return:
72,352✔
1164

1165
  taosArrayDestroy(explainRes);
72,358✔
1166

1167
  SCH_RET(code);
72,358!
1168
}
1169

1170
int32_t schLaunchTaskImpl(void *param) {
3,521,577✔
1171
  SSchTaskCtx *pCtx = (SSchTaskCtx *)param;
3,521,577✔
1172
  SSchJob     *pJob = NULL;
3,521,577✔
1173

1174
  (void)schAcquireJob(pCtx->jobRid, &pJob);
3,521,577✔
1175
  if (NULL == pJob) {
3,521,927!
1176
    qDebug("job refId 0x%" PRIx64 " already not exist", pCtx->jobRid);
×
1177
    taosMemoryFree(param);
×
1178
    SCH_RET(TSDB_CODE_SCH_JOB_IS_DROPPING);
×
1179
  }
1180

1181
  SSchTask *pTask = pCtx->pTask;
3,521,927✔
1182

1183
  if (pCtx->asyncLaunch) {
3,521,927✔
1184
    SCH_LOCK_TASK(pTask);
1,363,873!
1185
  }
1186

1187
  int8_t  status = 0;
3,521,469✔
1188
  int32_t code = 0;
3,521,469✔
1189

1190
  (void)atomic_add_fetch_32(&pTask->level->taskLaunchedNum, 1);
3,521,469✔
1191
  pTask->execId++;
3,521,851✔
1192
  pTask->retryTimes++;
3,521,851✔
1193
  pTask->waitRetry = false;
3,521,851✔
1194

1195
  SCH_TASK_DLOG("start to launch %s task, execId %d, retry %d",
3,521,851!
1196
                SCH_IS_LOCAL_EXEC_TASK(pJob, pTask) ? "LOCAL" : "REMOTE", pTask->execId, pTask->retryTimes);
1197

1198
  SCH_LOG_TASK_START_TS(pTask);
7,041,927!
1199

1200
  if (schJobNeedToStop(pJob, &status)) {
3,520,250✔
1201
    SCH_TASK_DLOG("no need to launch task cause of job status %s", jobTaskStatusStr(status));
1!
1202
    SCH_ERR_JRET(TSDB_CODE_SCH_IGNORE_ERROR);
1!
1203
  }
1204

1205
  if (SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_EXEC) {
3,521,903!
1206
    SCH_ERR_JRET(schPushTaskToExecList(pJob, pTask));
3,521,879!
1207
    SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_EXEC);
3,521,459✔
1208
  }
1209

1210
  if (SCH_IS_LOCAL_EXEC_TASK(pJob, pTask)) {
3,521,316✔
1211
    SCH_ERR_JRET(schLaunchLocalTask(pJob, pTask));
72,358!
1212
  } else {
1213
    SCH_ERR_JRET(schLaunchRemoteTask(pJob, pTask));
3,448,958!
1214
  }
1215

1216
#if 0
1217
  if (SCH_IS_QUERY_JOB(pJob)) {
1218
    SCH_ERR_JRET(schEnsureHbConnection(pJob, pTask));
1219
  }
1220
#endif
1221

1222
_return:
3,448,707✔
1223

1224
  if (pJob->taskNum >= SCH_MIN_AYSNC_EXEC_NUM) {
3,521,068✔
1225
    if (code) {
1,363,343✔
1226
      code = schProcessOnTaskFailure(pJob, pTask, code);
9✔
1227
    }
1228
    if (code) {
1,363,343✔
1229
      code = schHandleJobFailure(pJob, code);
9✔
1230
    }
1231
  }
1232

1233
  if (pCtx->asyncLaunch) {
3,521,068✔
1234
    SCH_UNLOCK_TASK(pTask);
1,363,604!
1235
  }
1236

1237
  (void)schReleaseJob(pJob->refId);
3,521,088✔
1238

1239
  taosMemoryFree(param);
3,521,916✔
1240

1241
  SCH_RET(code);
3,521,588!
1242
}
1243

1244
int32_t schAsyncLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) {
3,521,897✔
1245
  SSchTaskCtx *param = taosMemoryCalloc(1, sizeof(SSchTaskCtx));
3,521,897✔
1246
  if (NULL == param) {
3,521,937✔
1247
    SCH_ERR_RET(terrno);
4!
1248
  }
1249

1250
  param->jobRid = pJob->refId;
3,521,933✔
1251
  param->pTask = pTask;
3,521,933✔
1252

1253
  if (pJob->taskNum >= SCH_MIN_AYSNC_EXEC_NUM) {
3,521,933✔
1254
    param->asyncLaunch = true;
1,363,895✔
1255
    SCH_ERR_RET(taosAsyncExec(schLaunchTaskImpl, param, NULL));
1,363,895!
1256
  } else {
1257
    SCH_ERR_RET(schLaunchTaskImpl(param));
2,158,038!
1258
  }
1259

1260
  return TSDB_CODE_SUCCESS;
3,521,929✔
1261
}
1262

1263
// Note: no more error processing, handled in function internal
1264
int32_t schLaunchTask(SSchJob *pJob, SSchTask *pTask) {
3,521,902✔
1265
  bool    enough = false;
3,521,902✔
1266
  int32_t code = 0;
3,521,902✔
1267

1268
  SCH_SET_TASK_HANDLE(pTask, NULL);
3,521,902✔
1269

1270
  if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) {
3,521,902!
1271
    SCH_ERR_JRET(schCheckIncTaskFlowQuota(pJob, pTask, &enough));
×
1272

1273
    if (enough) {
×
1274
      SCH_ERR_JRET(schAsyncLaunchTaskImpl(pJob, pTask));
×
1275
    }
1276
  } else {
1277
    SCH_ERR_JRET(schAsyncLaunchTaskImpl(pJob, pTask));
3,521,902!
1278
  }
1279

1280
  return TSDB_CODE_SUCCESS;
3,521,928✔
1281

1282
_return:
×
1283

1284
  SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
×
1285
}
1286

1287
void schHandleTimerEvent(void *param, void *tmrId) {
191✔
1288
  SSchTimerParam *pTimerParam = (SSchTimerParam *)param;
191✔
1289
  SSchTask       *pTask = NULL;
191✔
1290
  SSchJob        *pJob = NULL;
191✔
1291
  int32_t         code = 0;
191✔
1292

1293
  int64_t  rId = pTimerParam->rId;
191✔
1294
  uint64_t queryId = pTimerParam->queryId;
191✔
1295
  uint64_t taskId = pTimerParam->taskId;
191✔
1296
  taosMemoryFree(pTimerParam);
191✔
1297

1298
  if (schProcessOnCbBegin(&pJob, &pTask, queryId, rId, taskId)) {
191✔
1299
    return;
1✔
1300
  }
1301

1302
  code = schLaunchTask(pJob, pTask);
190✔
1303

1304
  schProcessOnCbEnd(pJob, pTask, code);
190✔
1305
}
1306

1307
int32_t schDelayLaunchTask(SSchJob *pJob, SSchTask *pTask) {
482✔
1308
  if (pTask->delayExecMs > 0) {
482✔
1309
    SSchTimerParam *param = taosMemoryMalloc(sizeof(SSchTimerParam));
191✔
1310
    if (NULL == param) {
191!
1311
      SCH_TASK_ELOG("taosMemoryMalloc %d failed", (int)sizeof(SSchTimerParam));
×
1312
      SCH_ERR_RET(terrno);
×
1313
    }
1314

1315
    param->rId = pJob->refId;
191✔
1316
    param->queryId = pJob->queryId;
191✔
1317
    param->taskId = pTask->taskId;
191✔
1318

1319
    if (NULL == pTask->delayTimer) {
191!
1320
      pTask->delayTimer = taosTmrStart(schHandleTimerEvent, pTask->delayExecMs, (void *)param, schMgmt.timer);
191✔
1321
      if (NULL == pTask->delayTimer) {
191!
1322
        SCH_TASK_ELOG("start delay timer failed, handle:%p", schMgmt.timer);
×
1323
        SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
×
1324
      }
1325

1326
      return TSDB_CODE_SUCCESS;
191✔
1327
    }
1328

1329
    if (taosTmrReset(schHandleTimerEvent, pTask->delayExecMs, (void *)param, schMgmt.timer, &pTask->delayTimer)) {
×
1330
      SCH_TASK_ELOG("taosTmrReset delayExec timer failed, handle:%p", schMgmt.timer);
×
1331
    }
1332

1333
    return TSDB_CODE_SUCCESS;
×
1334
  }
1335

1336
  SCH_RET(schLaunchTask(pJob, pTask));
291!
1337
}
1338

1339
int32_t schLaunchLevelTasks(SSchJob *pJob, SSchLevel *level) {
2,335,640✔
1340
  SCH_ERR_RET(schChkJobNeedFlowCtrl(pJob, level));
2,335,640!
1341

1342
  for (int32_t i = 0; i < level->taskNum; ++i) {
5,278,648✔
1343
    SSchTask *pTask = taosArrayGet(level->subTasks, i);
2,942,978✔
1344

1345
    SCH_ERR_RET(schLaunchTask(pJob, pTask));
2,942,982!
1346
  }
1347

1348
  return TSDB_CODE_SUCCESS;
2,335,670✔
1349
}
1350

1351
void schDropTaskInHashList(SSchJob *pJob, SHashObj *list) {
2,455,140✔
1352
  if (!SCH_JOB_NEED_DROP(pJob)) {
2,455,140✔
1353
    return;
1,705,241✔
1354
  }
1355

1356
  void *pIter = taosHashIterate(list, NULL);
749,899✔
1357
  while (pIter) {
2,512,065✔
1358
    SSchTask *pTask = *(SSchTask **)pIter;
1,762,165✔
1359

1360
    SCH_LOCK_TASK(pTask);
1,762,165!
1361
    if (pTask->delayTimer) {
1,762,166✔
1362
      if (!taosTmrStopA(&pTask->delayTimer)) {
16!
1363
        SCH_TASK_WLOG("stop delayTimer failed, status:%d", pTask->status);
16!
1364
      }
1365
    }
1366
    schDropTaskOnExecNode(pJob, pTask);
1,762,166✔
1367
    SCH_UNLOCK_TASK(pTask);
1,762,165!
1368

1369
    pIter = taosHashIterate(list, pIter);
1,762,164✔
1370
  }
1371
}
1372

1373
int32_t schNotifyTaskInHashList(SSchJob *pJob, SHashObj *list, ETaskNotifyType type, SSchTask *pCurrTask) {
×
1374
  int32_t code = TSDB_CODE_SUCCESS;
×
1375

1376
  SCH_ERR_RET(schNotifyTaskOnExecNode(pJob, pCurrTask, type));
×
1377

1378
  void *pIter = taosHashIterate(list, NULL);
×
1379
  while (pIter) {
×
1380
    SSchTask *pTask = *(SSchTask **)pIter;
×
1381
    if (pTask != pCurrTask) {
×
1382
      SCH_LOCK_TASK(pTask);
×
1383
      code = schNotifyTaskOnExecNode(pJob, pTask, type);
×
1384
      SCH_UNLOCK_TASK(pTask);
×
1385

1386
      if (TSDB_CODE_SUCCESS != code) {
×
1387
        break;
×
1388
      }
1389
    }
1390

1391
    pIter = taosHashIterate(list, pIter);
×
1392
  }
1393

1394
  SCH_RET(code);
×
1395
}
1396

1397
int32_t schExecRemoteFetch(SSchJob *pJob, SSchTask *pTask) {
675,953✔
1398
  SCH_RET(schBuildAndSendMsg(pJob, pJob->fetchTask, &pJob->resNode, SCH_FETCH_TYPE(pJob->fetchTask), NULL));
675,953!
1399
}
1400

1401
int32_t schExecLocalFetch(SSchJob *pJob, SSchTask *pTask) {
96,737✔
1402
  void   *pRsp = NULL;
96,737✔
1403
  int32_t code = 0;
96,737✔
1404
  SArray *explainRes = NULL;
96,737✔
1405

1406
  if (SCH_IS_EXPLAIN_JOB(pJob)) {
96,737✔
1407
    explainRes = taosArrayInit(pJob->taskNum, sizeof(SExplainLocalRsp));
1,176✔
1408
    if (NULL == explainRes) {
1,176!
1409
      SCH_ERR_RET(terrno);
×
1410
    }
1411
  }
1412

1413
  SCH_ERR_JRET(qWorkerProcessLocalFetch(schMgmt.queryMgmt, schMgmt.sId, pJob->queryId, pTask->clientId, pTask->taskId,
96,737!
1414
                                        pJob->refId, pTask->execId, &pRsp, explainRes));
1415

1416
  if (SCH_IS_EXPLAIN_JOB(pJob)) {
96,737✔
1417
    SCH_ERR_RET(schHandleExplainRes(explainRes));
1,176!
1418
    explainRes = NULL;
1,176✔
1419
  }
1420

1421
  SCH_ERR_RET(schProcessFetchRsp(pJob, pTask, pRsp, TSDB_CODE_SUCCESS));
96,737!
1422

1423
_return:
96,737✔
1424

1425
  taosArrayDestroy(explainRes);
96,737✔
1426

1427
  SCH_RET(code);
96,737!
1428
}
1429

1430
// Note: no more error processing, handled in function internal
1431
int32_t schLaunchFetchTask(SSchJob *pJob) {
772,689✔
1432
  int32_t code = 0;
772,689✔
1433

1434
  void *fetchRes = atomic_load_ptr(&pJob->fetchRes);
772,689✔
1435
  if (fetchRes) {
772,688!
1436
    SCH_JOB_DLOG("res already fetched, res:%p", fetchRes);
×
1437
    return TSDB_CODE_SUCCESS;
×
1438
  }
1439

1440
  SCH_SET_TASK_STATUS(pJob->fetchTask, JOB_TASK_STATUS_FETCH);
772,688✔
1441

1442
  if (SCH_IS_LOCAL_EXEC_TASK(pJob, pJob->fetchTask)) {
772,690!
1443
    SCH_ERR_JRET(schExecLocalFetch(pJob, pJob->fetchTask));
96,737!
1444
  } else {
1445
    SCH_ERR_JRET(schExecRemoteFetch(pJob, pJob->fetchTask));
675,953!
1446
  }
1447

1448
  return TSDB_CODE_SUCCESS;
772,689✔
1449

1450
_return:
×
1451

1452
  SCH_RET(schProcessOnTaskFailure(pJob, pJob->fetchTask, code));
×
1453
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc