• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5011

03 Apr 2026 03:59PM UTC coverage: 72.3% (+0.008%) from 72.292%
#5011

push

travis-ci

web-flow
merge: from main to 3.0 branch #35067

4053 of 5985 new or added lines in 68 files covered. (67.72%)

732 existing lines in 143 files now uncovered.

257430 of 356056 relevant lines covered (72.3%)

131834103.52 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.57
/source/libs/scheduler/src/schJob.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "catalog.h"
17
#include "command.h"
18
#include "plannodes.h"
19
#include "query.h"
20
#include "schInt.h"
21
#include "tmsg.h"
22
#include "tref.h"
23
#include "trpc.h"
24

25
/*
 * Record errCode on the job when it should replace the currently stored code.
 *
 * Nothing happens for TSDB_CODE_SUCCESS. Otherwise errCode is installed when
 * the job has no error yet, or when the stored error does not satisfy
 * NEED_CLIENT_HANDLE_ERROR() while errCode does. Each time the stored value is
 * changed and the job is a sub-query job, the same update is applied to the
 * parent job.
 */
void schUpdateJobErrCode(SSchJob *pJob, int32_t errCode) {
  if (TSDB_CODE_SUCCESS == errCode) {
    return;
  }

  bool    installed = false;
  int32_t curCode = atomic_load_32(&pJob->errCode);

  if (TSDB_CODE_SUCCESS == curCode) {
    // no error recorded yet: try to be the one that sets it
    if (curCode == atomic_val_compare_exchange_32(&pJob->errCode, curCode, errCode)) {
      installed = true;
    } else {
      // the exchange did not go through, re-read the stored code
      curCode = atomic_load_32(&pJob->errCode);
    }
  }

  if (!installed) {
    if (NEED_CLIENT_HANDLE_ERROR(curCode)) {
      return;
    }

    if (!NEED_CLIENT_HANDLE_ERROR(errCode)) {
      return;
    }

    atomic_store_32(&pJob->errCode, errCode);
  }

  if (SCH_IS_SUBQ_JOB(pJob)) {
    schUpdateJobErrCode((SSchJob *)pJob->parent, errCode);
  }

  SCH_JOB_DLOG("job errCode updated to %s", tstrerror(errCode));
}
58

59
/*
 * Tell whether the job has reached a FAIL, DROP or SUCC status.
 * For a sub-query job that is not in one of those states itself, the answer
 * is taken from its parent job.
 */
bool schJobDone(SSchJob *pJob) {
  switch (SCH_GET_JOB_STATUS(pJob)) {
    case JOB_TASK_STATUS_FAIL:
    case JOB_TASK_STATUS_DROP:
    case JOB_TASK_STATUS_SUCC:
      return true;
    default:
      break;
  }

  return SCH_IS_SUBQ_JOB(pJob) ? schJobDone((SSchJob *)pJob->parent) : false;
}
72

73
/*
 * Decide whether work on the job should stop.
 *
 * The job status read on entry is written to *pStatus when pStatus is given.
 * Returns true when schJobDone() reports the job as done, or when the job's
 * kill-check callback (chkKillFp) is set and returns true; in the latter case
 * TSDB_CODE_TSC_QUERY_KILLED is recorded through schUpdateJobErrCode().
 */
FORCE_INLINE bool schJobNeedToStop(SSchJob *pJob, int8_t *pStatus) {
  int8_t curStatus = SCH_GET_JOB_STATUS(pJob);
  if (pStatus) {
    *pStatus = curStatus;
  }

  if (schJobDone(pJob)) {
    return true;
  }

  if (!pJob->chkKillFp || !(*pJob->chkKillFp)(pJob->chkKillParam)) {
    return false;
  }

  schUpdateJobErrCode(pJob, TSDB_CODE_TSC_QUERY_KILLED);
  return true;
}
90

91
/*
 * Move the job to newStatus if the transition from its current status is
 * permitted, retrying when the status changes underneath the compare-exchange.
 *
 * Permitted transitions:
 *   NULL              -> INIT
 *   INIT              -> EXEC, DROP
 *   EXEC              -> PART_SUCC, FAIL, DROP
 *   PART_SUCC, FETCH  -> FAIL, SUCC, DROP, EXEC, FETCH
 *   SUCC, FAIL        -> DROP
 *
 * Asking for the status the job already has returns success for FETCH and
 * TSDB_CODE_SCH_IGNORE_ERROR otherwise. A job already in DROP yields
 * TSDB_CODE_QRY_JOB_FREED; any other rejected transition yields
 * TSDB_CODE_APP_ERROR.
 */
int32_t schUpdateJobStatus(SSchJob *pJob, int8_t newStatus) {
  int32_t code = 0;
  int8_t  oriStatus = 0;

  while (true) {
    oriStatus = SCH_GET_JOB_STATUS(pJob);

    if (oriStatus == newStatus) {
      if (JOB_TASK_STATUS_FETCH == newStatus) {
        return code;
      }

      SCH_ERR_JRET(TSDB_CODE_SCH_IGNORE_ERROR);
    }

    bool allowed = false;

    switch (oriStatus) {
      case JOB_TASK_STATUS_NULL:
        allowed = (JOB_TASK_STATUS_INIT == newStatus);
        break;
      case JOB_TASK_STATUS_INIT:
        allowed = (JOB_TASK_STATUS_EXEC == newStatus || JOB_TASK_STATUS_DROP == newStatus);
        break;
      case JOB_TASK_STATUS_EXEC:
        allowed = (JOB_TASK_STATUS_PART_SUCC == newStatus || JOB_TASK_STATUS_FAIL == newStatus ||
                   JOB_TASK_STATUS_DROP == newStatus);
        break;
      case JOB_TASK_STATUS_PART_SUCC:
      case JOB_TASK_STATUS_FETCH:
        allowed = (JOB_TASK_STATUS_FAIL == newStatus || JOB_TASK_STATUS_SUCC == newStatus ||
                   JOB_TASK_STATUS_DROP == newStatus || JOB_TASK_STATUS_EXEC == newStatus ||
                   JOB_TASK_STATUS_FETCH == newStatus);
        break;
      case JOB_TASK_STATUS_SUCC:
      case JOB_TASK_STATUS_FAIL:
        allowed = (JOB_TASK_STATUS_DROP == newStatus);
        break;
      case JOB_TASK_STATUS_DROP:
        SCH_ERR_JRET(TSDB_CODE_QRY_JOB_FREED);
        break;

      default:
        SCH_JOB_ELOG("invalid job status:%s", jobTaskStatusStr(oriStatus));
        SCH_ERR_JRET(TSDB_CODE_APP_ERROR);
    }

    if (!allowed) {
      SCH_ERR_JRET(TSDB_CODE_APP_ERROR);
    }

    // only leave the loop once the exchange saw the status validated above
    if (oriStatus == atomic_val_compare_exchange_8(&pJob->status, oriStatus, newStatus)) {
      SCH_JOB_DLOG("job status updated from %s to %s", jobTaskStatusStr(oriStatus), jobTaskStatusStr(newStatus));
      break;
    }
  }

  return TSDB_CODE_SUCCESS;

_return:

  if (TSDB_CODE_SCH_IGNORE_ERROR != code) {
    SCH_JOB_ELOG("invalid job status update, from %s to %s", jobTaskStatusStr(oriStatus), jobTaskStatusStr(newStatus));
  } else {
    SCH_JOB_DLOG("ignore job status update, from %s to %s", jobTaskStatusStr(oriStatus), jobTaskStatusStr(newStatus));
  }
  SCH_RET(code);
}
179

180
/*
 * Fill in the children/parents arrays of every task in the job.
 *
 * For each task, the child and parent subplans listed in its plan are looked
 * up in planToTask (keyed by subplan pointer) and the matching task pointers
 * are appended to pTask->children / pTask->parents. While doing so the plan
 * shape is validated: a task on the level equal to pJob->levelIdx must have no
 * children, a task on level 0 must have no parents, and a task on any other
 * level must have at least one parent.
 *
 * Finally, for a query job, level 0 must hold at most one task; needFetch is
 * set unless that task is a MODIFY subplan running with explain disabled.
 *
 * Returns TSDB_CODE_SUCCESS, or an error code (internal error, invalid input,
 * or terrno from a failed array operation).
 */
int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
  for (int32_t i = 0; i < pJob->levelNum; ++i) {
    SSchLevel *pLevel = taosArrayGet(pJob->levels, i);
    if (NULL == pLevel) {
      SCH_JOB_ELOG("fail to get the %dth level, levelNum: %d", i, pJob->levelNum);
      SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
    }

    for (int32_t m = 0; m < pLevel->taskNum; ++m) {
      SSchTask *pTask = taosArrayGet(pLevel->subTasks, m);
      if (NULL == pTask) {
        SCH_JOB_ELOG("fail to get the %dth task in level %d, taskNum: %d", m, pLevel->level, pLevel->taskNum);
        SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
      }

      SSubplan *pPlan = pTask->plan;
      int32_t   childNum = pPlan->pChildren ? (int32_t)LIST_LENGTH(pPlan->pChildren) : 0;
      int32_t   parentNum = pPlan->pParents ? (int32_t)LIST_LENGTH(pPlan->pParents) : 0;

      if (childNum > 0) {
        if (pJob->levelIdx == pLevel->level) {
          SCH_JOB_ELOG("invalid query plan, lowest level, childNum:%d", childNum);
          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
        }

        pTask->children = taosArrayInit(childNum, POINTER_BYTES);
        if (NULL == pTask->children) {
          SCH_TASK_ELOG("taosArrayInit %d children failed", childNum);
          SCH_ERR_RET(terrno);
        }
      }

      // resolve each child subplan to its task and record it
      int32_t n = 0;
      SNode  *childNode = NULL;
      FOREACH(childNode, pPlan->pChildren) {
        SSubplan *child = (SSubplan *)childNode;
        if (NULL == child) {
          SCH_JOB_ELOG("fail to get the %dth child subplan, childNum: %d", n, childNum);
          SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
        }

        if (QUERY_NODE_PHYSICAL_SUBPLAN != nodeType(child)) {
          SCH_JOB_ELOG("invalid subplan type for the %dth child, level:%d, subplanNodeType:%d", n, i, nodeType(child));
          SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
        }

        SSchTask **childTask = taosHashGet(planToTask, &child, POINTER_BYTES);
        if (NULL == childTask || NULL == *childTask) {
          SCH_TASK_ELOG("subplan children relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n);
          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
        }

        if (NULL == taosArrayPush(pTask->children, childTask)) {
          SCH_TASK_ELOG("taosArrayPush childTask failed, level:%d, taskIdx:%d, childIdx:%d", i, m, n);
          SCH_ERR_RET(terrno);
        }

        SCH_TASK_DLOG("children info, the %d child TID 0x%" PRIx64, n, (*childTask)->taskId);
        ++n;
      }

      if (parentNum > 0) {
        if (0 == pLevel->level) {
          SCH_TASK_ELOG("invalid task info, level:0, parentNum:%d", parentNum);
          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
        }

        pTask->parents = taosArrayInit(parentNum, POINTER_BYTES);
        if (NULL == pTask->parents) {
          SCH_TASK_ELOG("taosArrayInit %d parents failed", parentNum);
          SCH_ERR_RET(terrno);
        }
      } else {
        if (0 != pLevel->level) {
          SCH_TASK_ELOG("invalid task info, level:%d, parentNum:%d", pLevel->level, parentNum);
          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
        }
      }

      // resolve each parent subplan to its task and record it
      n = 0;
      SNode *parentNode = NULL;
      FOREACH(parentNode, pPlan->pParents) {
        SSubplan *parent = (SSubplan *)parentNode;
        if (NULL == parent) {
          SCH_JOB_ELOG("fail to get the %dth parent subplan, parentNum: %d", n, parentNum);
          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
        }

        if (QUERY_NODE_PHYSICAL_SUBPLAN != nodeType(parent)) {
          SCH_JOB_ELOG("invalid subplan type for the %dth parent, level:%d, subplanNodeType:%d", n, i, nodeType(parent));
          SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
        }

        SSchTask **parentTask = taosHashGet(planToTask, &parent, POINTER_BYTES);
        if (NULL == parentTask || NULL == *parentTask) {
          // n counts parents here, so the log labels it parentIdx
          SCH_TASK_ELOG("subplan parent relationship error, level:%d, taskIdx:%d, parentIdx:%d", i, m, n);
          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
        }

        if (NULL == taosArrayPush(pTask->parents, parentTask)) {
          SCH_TASK_ELOG("taosArrayPush parentTask failed, level:%d, taskIdx:%d, parentIdx:%d", i, m, n);
          SCH_ERR_RET(terrno);
        }

        SCH_TASK_DLOG("parents info, the %d parent TID 0x%" PRIx64, n, (*parentTask)->taskId);
        ++n;
      }

      SCH_TASK_DLOG("level:%d, parentNum:%d, childNum:%d", i, parentNum, childNum);
    }
  }

  SSchLevel *pLevel = taosArrayGet(pJob->levels, 0);
  if (NULL == pLevel) {
    SCH_JOB_ELOG("fail to get level 0 level, levelNum:%d", (int32_t)taosArrayGetSize(pJob->levels));
    SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
  }

  if (SCH_IS_QUERY_JOB(pJob)) {
    if (pLevel->taskNum > 1) {
      SCH_JOB_ELOG("invalid query plan, level:0, taskNum:%d", pLevel->taskNum);
      SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
    }

    SSchTask *pTask = taosArrayGet(pLevel->subTasks, 0);
    // check the task that was just fetched (not pLevel, which is already known
    // to be non-NULL) before pTask->plan is dereferenced below
    if (NULL == pTask) {
      SCH_JOB_ELOG("fail to get the first task in level 0, taskNum:%d", (int32_t)taosArrayGetSize(pLevel->subTasks));
      SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
    }

    if (SUBPLAN_TYPE_MODIFY != pTask->plan->subplanType || EXPLAIN_MODE_DISABLE != pJob->attr.explainMode) {
      pJob->attr.needFetch = true;
    }
  }

  return TSDB_CODE_SUCCESS;
}
317

318
/*
 * Append pTask to pJob->dataSrcTasks when SCH_IS_DATA_BIND_QRY_TASK() holds
 * for it; other tasks are left out.
 * Returns TSDB_CODE_SUCCESS, or terrno when the array push fails.
 */
int32_t schAppendJobDataSrc(SSchJob *pJob, SSchTask *pTask) {
  if (SCH_IS_DATA_BIND_QRY_TASK(pTask)) {
    if (NULL == taosArrayPush(pJob->dataSrcTasks, &pTask)) {
      SCH_ERR_RET(terrno);
    }
  }

  return TSDB_CODE_SUCCESS;
}
329

330
/*
 * Validate the query plan in pDag and build the job's level/task structures.
 *
 * One SSchLevel is appended to pJob->levels per entry of pDag->pSubplans, and
 * one SSchTask per subplan of that level. Each task is validated, initialised,
 * offered to schAppendJobDataSrc(), and registered in pJob->taskList (and in
 * the parent job's taskList when pJob->parent is set). A temporary
 * subplan-pointer -> task-pointer hash is kept for schBuildTaskRalation() and
 * released before returning.
 *
 * Returns TSDB_CODE_SUCCESS or the first error encountered.
 */
int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
  int32_t code = 0;

  if (pDag->numOfSubplans <= 0) {
    SCH_JOB_ELOG("invalid subplan num:%d", pDag->numOfSubplans);
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  pJob->dataSrcTasks = taosArrayInit(SCH_GET_TASK_CAPACITY(pDag->numOfSubplans), POINTER_BYTES);
  if (NULL == pJob->dataSrcTasks) {
    SCH_ERR_RET(terrno);
  }

  int32_t levelNum = (int32_t)LIST_LENGTH(pDag->pSubplans);
  if (levelNum <= 0) {
    SCH_JOB_ELOG("invalid level num:%d", levelNum);
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  // keys of planToTask are subplan pointers, so hash them as pointer-sized integers
  SHashObj *planToTask = taosHashInit(
      SCH_GET_TASK_CAPACITY(pDag->numOfSubplans),
      taosGetDefaultHashFunction(POINTER_BYTES == sizeof(int64_t) ? TSDB_DATA_TYPE_BIGINT : TSDB_DATA_TYPE_INT), false,
      HASH_NO_LOCK);
  if (NULL == planToTask) {
    SCH_JOB_ELOG("taosHashInit %d failed", SCHEDULE_DEFAULT_MAX_TASK_NUM);
    SCH_ERR_JRET(terrno);
  }

  pJob->levels = taosArrayInit(levelNum, sizeof(SSchLevel));
  if (NULL == pJob->levels) {
    SCH_JOB_ELOG("taosArrayInit %d failed", levelNum);
    SCH_ERR_JRET(terrno);
  }

  pJob->levelNum = levelNum;
  SCH_RESET_JOB_LEVEL_IDX(pJob);

  qDebug("QID:0x%" PRIx64 ", job seriesId set to SID:%" PRId64 ", levelIdx:%d", pJob->queryId, pJob->seriesId,
         pJob->levelIdx);

  // template pushed for every level; per-level fields are filled in afterwards
  SSchLevel levelInit = {0};
  levelInit.status = JOB_TASK_STATUS_INIT;

  int32_t totalTaskNum = 0;
  int32_t levelIdx = 0;
  SNode  *levelNode = NULL;

  FOREACH(levelNode, pDag->pSubplans) {
    if (NULL == taosArrayPush(pJob->levels, &levelInit)) {
      SCH_JOB_ELOG("taosArrayPush level failed, level:%d", levelIdx);
      SCH_ERR_JRET(terrno);
    }

    SSchLevel *pLevel = taosArrayGet(pJob->levels, levelIdx);
    if (NULL == pLevel) {
      SCH_JOB_ELOG("fail to get the %dth level, levelNum: %d", levelIdx, levelNum);
      SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
    }

    pLevel->level = levelIdx;

    SNodeListNode *pLevelPlans = (SNodeListNode *)levelNode;
    if (NULL == pLevelPlans) {
      SCH_JOB_ELOG("empty level plan, level:%d", levelIdx);
      SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
    }

    if (QUERY_NODE_NODE_LIST != nodeType(pLevelPlans)) {
      SCH_JOB_ELOG("invalid level plan, level:%d, planNodeType:%d", levelIdx, nodeType(pLevelPlans));
      SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
    }

    int32_t levelTaskNum = (int32_t)LIST_LENGTH(pLevelPlans->pNodeList);
    if (levelTaskNum <= 0) {
      SCH_JOB_ELOG("invalid level plan number:%d, level:%d", levelTaskNum, levelIdx);
      SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
    }

    totalTaskNum += levelTaskNum;
    if (totalTaskNum > pDag->numOfSubplans) {
      SCH_JOB_ELOG("current totalTaskNum %d is bigger than numOfSubplans %d, level:%d", totalTaskNum, pDag->numOfSubplans, levelIdx);
      SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
    }

    pLevel->taskNum = levelTaskNum;

    pLevel->subTasks = taosArrayInit(levelTaskNum, sizeof(SSchTask));
    if (NULL == pLevel->subTasks) {
      SCH_JOB_ELOG("taosArrayInit %d failed", levelTaskNum);
      SCH_ERR_JRET(terrno);
    }

    int32_t taskIdx = 0;
    SNode  *planNode = NULL;
    FOREACH(planNode, pLevelPlans->pNodeList) {
      SSubplan *plan = (SSubplan *)planNode;

      SCH_ERR_JRET(schValidateSubplan(pJob, plan, pLevel->level, taskIdx, levelTaskNum));
      schSetJobType(pJob, plan->subplanType);

      // push a zeroed task and initialise it in place inside the level's array
      SSchTask  emptyTask = {0};
      SSchTask *pTask = taosArrayPush(pLevel->subTasks, &emptyTask);
      if (NULL == pTask) {
        SCH_TASK_ELOG("taosArrayPush task to level failed, level:%d, taskIdx:%d", pLevel->level, taskIdx);
        SCH_ERR_JRET(terrno);
      }

      SCH_ERR_JRET(schInitTask(pJob, pTask, plan, pLevel));

      SCH_ERR_JRET(schAppendJobDataSrc(pJob, pTask));

      code = taosHashPut(planToTask, &plan, POINTER_BYTES, &pTask, POINTER_BYTES);
      if (0 != code) {
        SCH_TASK_ELOG("taosHashPut to planToTaks failed, taskIdx:%d, error:%s", taskIdx, tstrerror(code));
        SCH_ERR_JRET(code);
      }

      code = taosHashPut(pJob->taskList, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
      if (0 != code) {
        SCH_TASK_ELOG("taosHashPut to taskList failed, taskIdx:%d, error:%s", taskIdx, tstrerror(code));
        SCH_ERR_JRET(code);
      }

      if (NULL != pJob->parent) {
        code = taosHashPut(((SSchJob *)pJob->parent)->taskList, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
        if (0 != code) {
          SCH_TASK_ELOG("taosHashPut to parent taskList failed, taskIdx:%d, error:%s", taskIdx, tstrerror(code));
          SCH_ERR_JRET(code);
        }
      }

      ++pJob->taskNum;
      ++taskIdx;
    }

    SCH_JOB_DLOG("level %d initialized, taskNum:%d", levelIdx, levelTaskNum);
    ++levelIdx;
  }

  if (!SCH_JOB_GOT_SUB_JOBS(pJob) && totalTaskNum != pDag->numOfSubplans) {
    SCH_JOB_ELOG("totalTaskNum %d mis-match with numOfSubplans %d", totalTaskNum, pDag->numOfSubplans);
    SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  SCH_ERR_JRET(schBuildTaskRalation(pJob, planToTask));

_return:

  if (planToTask) {
    taosHashCleanup(planToTask);
  }

  SCH_RET(code);
}
487

488
/*
 * Merge the exec results of all sub jobs of pParent into pRes.
 *
 * Each sub job is handled under its own resLock. If pRes already holds a
 * result, the sub job's result array is appended to it; otherwise the sub
 * job's result is swapped into pRes and its msgType is copied. NULL entries in
 * pParent->subJobs and sub jobs without a result are skipped.
 *
 * The function returns nothing, so a failed append can only be reported in
 * the log.
 */
void schDumpSubJobsExecRes(SSchJob *pParent, SExecResult *pRes) {
  int32_t subNum = taosArrayGetSize(pParent->subJobs);
  for (int32_t i = 0; i < subNum; ++i) {
    SSchJob *pJob = taosArrayGetP(pParent->subJobs, i);
    if (NULL == pJob) {
      continue;
    }

    SCH_LOCK(SCH_WRITE, &pJob->resLock);
    if (pJob->execRes.res) {
      if (pRes->res) {
        // NOTE(review): assumes taosArrayAddAll() returns NULL and sets terrno on failure - confirm against tarray.h
        if (NULL == taosArrayAddAll((SArray *)pRes->res, (SArray *)pJob->execRes.res)) {
          SCH_JOB_ELOG("copy sub job %p execRes %p to output failed, error:%s", pJob, pJob->execRes.res,
                       tstrerror(terrno));
        } else {
          SCH_JOB_DLOG("copy sub job %p execRes %p to output", pJob, pJob->execRes.res);
        }
      } else {
        TSWAP(pRes->res, pJob->execRes.res);
        pRes->msgType = pJob->execRes.msgType;
        SCH_JOB_DLOG("move sub job %p execRes %p to output", pJob, pRes->res);
      }
    }
    SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
  }
}
76,777,940✔
510

511
/*
 * Move the job's exec result into pRes.
 *
 * The error code and row count are copied first. Then, under pJob->resLock,
 * the result pointer, msgType and numOfBytes are handed over and the job's
 * own result pointer is cleared. When the job has sub jobs their results are
 * merged in as well, still under the same lock.
 */
void schDumpJobExecRes(SSchJob *pJob, SExecResult *pRes) {
  pRes->code = atomic_load_32(&pJob->errCode);
  pRes->numOfRows = pJob->resNumOfRows;

  SCH_LOCK(SCH_WRITE, &pJob->resLock);

  // hand the result over to the caller; the job no longer references it
  pRes->res = pJob->execRes.res;
  pRes->msgType = pJob->execRes.msgType;
  pRes->numOfBytes = pJob->execRes.numOfBytes;
  pJob->execRes.res = NULL;

  SCH_JOB_DLOG("move job %p execRes %p to output", pJob, pRes->res);

  if (SCH_JOB_GOT_SUB_JOBS(pJob)) {
    schDumpSubJobsExecRes(pJob, pRes);
  }

  SCH_UNLOCK(SCH_WRITE, &pJob->resLock);

  SCH_JOB_DLOG("exec result dumped, code:%s", tstrerror(pRes->code));
}
732,136,804✔
530

531
/*
 * Take the pending fetch response out of the job and return it via *pData.
 *
 * Runs under pJob->resLock. The job is marked as fetched; if the pending
 * response is flagged completed, the job phase is updated and the job is
 * switched to SUCC. The response pointer is then detached from the job with a
 * compare-exchange loop. When there is no pending response, a zeroed
 * SRetrieveTableRsp with completed = 1 is allocated and returned instead.
 *
 * Returns 0 on success, or the error from the status switch / allocation.
 */
int32_t schDumpJobFetchRes(SSchJob *pJob, void **pData) {
  int32_t code = 0;

  SCH_LOCK(SCH_WRITE, &pJob->resLock);

  pJob->fetched = true;

  if (pJob->fetchRes && ((SRetrieveTableRsp *)pJob->fetchRes)->completed) {
    SCH_UPDATE_JOB_PHASE_IF_CHANGED(pJob, QUERY_PHASE_DONE);
    SCH_ERR_JRET(schSwitchJobStatus(pJob, JOB_TASK_STATUS_SUCC, NULL));
  }

  // detach fetchRes from the job: retry until the value read is the one swapped out
  do {
    *pData = atomic_load_ptr(&pJob->fetchRes);
  } while (*pData != atomic_val_compare_exchange_ptr(&pJob->fetchRes, *pData, NULL));

  if (NULL == *pData) {
    SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)taosMemoryCalloc(1, sizeof(SRetrieveTableRsp));
    if (NULL == rsp) {
      SCH_JOB_ELOG("malloc SRetrieveTableRsp %d failed, code:%x", (int32_t)sizeof(SRetrieveTableRsp), terrno);
      SCH_ERR_JRET(terrno);
    }

    rsp->completed = 1;
    *pData = rsp;

    SCH_JOB_DLOG("empty res and set query complete, code:%x", code);
  }

  SCH_JOB_DLOG("fetch done, totalRows:%" PRId64, pJob->resNumOfRows);

_return:

  SCH_UNLOCK(SCH_WRITE, &pJob->resLock);

  return code;
}
573

574
/*
 * Deliver the job's exec result to the user's exec callback.
 *
 * A fresh SExecResult is allocated, filled by schDumpJobExecRes() and passed
 * to pJob->userRes.execFp together with cbParam and the job's error code.
 * Returns terrno when the allocation fails, TSDB_CODE_SUCCESS otherwise.
 */
int32_t schNotifyUserExecRes(SSchJob *pJob) {
  SExecResult *pRes = taosMemoryCalloc(1, sizeof(SExecResult));
  if (NULL != pRes) {
    schDumpJobExecRes(pJob, pRes);

    SCH_JOB_TLOG("sch start to invoke exec cb, code:%s", tstrerror(pJob->errCode));
    (*pJob->userRes.execFp)(pRes, pJob->userRes.cbParam, atomic_load_32(&pJob->errCode));
    SCH_JOB_TLOG("sch end from exec cb, code:%s", tstrerror(pJob->errCode));

    return TSDB_CODE_SUCCESS;
  }

  qError("malloc execResult %d failed, error: %x", (int32_t)sizeof(SExecResult), terrno);
  SCH_RET(terrno);
}
589

590
/*
 * Deliver the job's fetch result to the user's fetch callback.
 *
 * The result is obtained through schDumpJobFetchRes(); if that fails while
 * the job has no error recorded, the failure becomes the job's error code.
 * The job phase is moved to QUERY_PHASE_FETCH_RETURNED before
 * pJob->userRes.fetchFp is invoked. Always returns TSDB_CODE_SUCCESS.
 */
int32_t schNotifyUserFetchRes(SSchJob *pJob) {
  void   *pFetched = NULL;
  int32_t dumpCode = schDumpJobFetchRes(pJob, &pFetched);

  if (TSDB_CODE_SUCCESS != dumpCode) {
    if (TSDB_CODE_SUCCESS == atomic_load_32(&pJob->errCode)) {
      atomic_store_32(&pJob->errCode, dumpCode);
    }
  }

  // Fetch response received, transition to RETURNED state
  SCH_UPDATE_JOB_PHASE_IF_CHANGED(pJob, QUERY_PHASE_FETCH_RETURNED);

  SCH_JOB_DLOG("sch start to invoke fetch cb, code:%s", tstrerror(pJob->errCode));
  (*pJob->userRes.fetchFp)(pFetched, pJob->userRes.cbParam, atomic_load_32(&pJob->errCode));
  SCH_JOB_DLOG("sch end from fetch cb, code:%s", tstrerror(pJob->errCode));

  return TSDB_CODE_SUCCESS;
}
606

607
/*
 * Hand the job's result to whoever is waiting on the current operation.
 *
 * Only parent jobs are handled. Under opStatus.lock the current operation is
 * checked: nothing is posted when no operation is in progress, or when op is
 * not SCH_OP_NULL and differs from the operation in progress. Otherwise the
 * lock is released and, depending on the kind of operation, rspSem is posted
 * (sync), the exec callback is notified (async exec) or the fetch callback is
 * notified (async fetch).
 */
void schPostJobRes(SSchJob *pJob, SCH_OP_TYPE op) {
  if (!SCH_IS_PARENT_JOB(pJob)) {
    return;
  }

  SCH_LOCK(SCH_WRITE, &pJob->opStatus.lock);

  if (SCH_OP_NULL == pJob->opStatus.op) {
    SCH_JOB_TLOG("job not in any operation, no need to post job res, status:%s", jobTaskStatusStr(pJob->status));
    SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);
    return;
  }

  if (SCH_OP_NULL != op && pJob->opStatus.op != op) {
    SCH_JOB_ELOG("job in operation %s mis-match with expected %s", schGetOpStr(pJob->opStatus.op), schGetOpStr(op));
    SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);
    return;
  }

  // every branch drops the lock before it signals or calls back
  if (SCH_JOB_IN_SYNC_OP(pJob)) {
    SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);
    int32_t code = tsem_post(&pJob->rspSem);
    if (code) {
      SCH_JOB_ELOG("tsem_post failed for syncOp, error:%s", tstrerror(code));
    }
    return;
  }

  if (SCH_JOB_IN_ASYNC_EXEC_OP(pJob)) {
    SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);
    (void)schNotifyUserExecRes(pJob);  // ignore error
    return;
  }

  if (SCH_JOB_IN_ASYNC_FETCH_OP(pJob)) {
    SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);
    (void)schNotifyUserFetchRes(pJob);  // ignore error
    return;
  }

  SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);
  SCH_JOB_ELOG("job not in any operation, status:%s", jobTaskStatusStr(pJob->status));
}
648

649
/*
 * React to a job failure: record errCode on the job (unless it is
 * TSDB_CODE_SCH_IGNORE_ERROR) and post the job result.
 * Always returns TSDB_CODE_SCH_IGNORE_ERROR.
 */
int32_t schProcessOnJobFailure(SSchJob *pJob, int32_t errCode) {
  if (TSDB_CODE_SCH_IGNORE_ERROR != errCode) {
    schUpdateJobErrCode(pJob, errCode);

    // log the code actually stored on the job, which may differ from errCode
    int32_t jobCode = atomic_load_32(&pJob->errCode);
    if (jobCode) {
      SCH_JOB_DLOG("job failed with error %s", tstrerror(jobCode));
    }

    schPostJobRes(pJob, 0);

    SCH_RET(TSDB_CODE_SCH_IGNORE_ERROR);
  }

  schPostJobRes(pJob, 0);
  return TSDB_CODE_SCH_IGNORE_ERROR;
}
666

667
/*
 * Switch the job to FAIL with errCode, and do the same for the parent job
 * when this is a sub-query job. Nothing is done when errCode is
 * TSDB_CODE_SCH_IGNORE_ERROR. Always returns TSDB_CODE_SCH_IGNORE_ERROR.
 */
int32_t schHandleJobFailure(SSchJob *pJob, int32_t errCode) {
  if (TSDB_CODE_SCH_IGNORE_ERROR != errCode) {
    (void)schSwitchJobStatus(pJob, JOB_TASK_STATUS_FAIL, &errCode);  // ignore error

    if (SCH_IS_SUBQ_JOB(pJob)) {
      SCH_JOB_DLOG("set parent job FAILED since the %dth subJob failed, error:%s", pJob->subJobId, tstrerror(errCode));
      (void)schSwitchJobStatus((SSchJob *)pJob->parent, JOB_TASK_STATUS_FAIL, &errCode);  // ignore error
    }
  }

  return TSDB_CODE_SCH_IGNORE_ERROR;
}
680

681
/*
 * A dropped job is processed exactly like a failed one: the call is forwarded
 * to schProcessOnJobFailure() and its result is returned.
 */
int32_t schProcessOnJobDropped(SSchJob *pJob, int32_t errCode) {
  SCH_RET(schProcessOnJobFailure(pJob, errCode));
}
732,130,928✔
682

683
/*
 * Switch the job to DROP with errCode unless errCode is
 * TSDB_CODE_SCH_IGNORE_ERROR. Always returns TSDB_CODE_SCH_IGNORE_ERROR.
 */
int32_t schHandleJobDrop(SSchJob *pJob, int32_t errCode) {
  if (TSDB_CODE_SCH_IGNORE_ERROR != errCode) {
    (void)schSwitchJobStatus(pJob, JOB_TASK_STATUS_DROP, &errCode);  // ignore error
  }

  return TSDB_CODE_SCH_IGNORE_ERROR;
}
692

693
/*
 * Continue after the job reached partial success: launch the fetch task when
 * schChkCurrentOp() reports SCH_OP_FETCH as the current operation, otherwise
 * post the job result.
 * Returns TSDB_CODE_SUCCESS, or the error from schLaunchFetchTask().
 */
int32_t schProcessOnJobPartialSuccess(SSchJob *pJob) {
  if (!schChkCurrentOp(pJob, SCH_OP_FETCH, -1)) {
    schPostJobRes(pJob, 0);
    return TSDB_CODE_SUCCESS;
  }

  SCH_ERR_RET(schLaunchFetchTask(pJob));

  return TSDB_CODE_SUCCESS;
}
702

703
// Post the job result for the fetch operation.
void schProcessOnDataFetched(SSchJob *pJob) {
  schPostJobRes(pJob, SCH_OP_FETCH);
}
261,594,903✔
704

705
/**
 * Record an explain response on the job and mark the task as succeeded.
 *
 * The row count is converted with htobe64() before it is stored, and pRsp
 * itself is stored as the job's fetch result. Non-insert jobs are then
 * notified that data has been fetched.
 *
 * @return always TSDB_CODE_SUCCESS
 */
int32_t schProcessOnExplainDone(SSchJob *pJob, SSchTask *pTask, SRetrieveTableRsp *pRsp) {
  int64_t rowNum = htobe64(pRsp->numOfRows);

  SCH_TASK_DLOG("got explain rsp, rows:%" PRId64 ", complete:%d", rowNum, pRsp->completed);

  atomic_store_64(&pJob->resNumOfRows, rowNum);
  atomic_store_ptr(&pJob->fetchRes, pRsp);

  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCC);

  if (SCH_IS_INSERT_JOB(pJob)) {
    return TSDB_CODE_SUCCESS;
  }

  schProcessOnDataFetched(pJob);

  return TSDB_CODE_SUCCESS;
}
719

720
/**
 * After pTask finished, move a query job to the next lower level once every
 * task of pTask's level has finished executing, and launch the tasks of that
 * level that have no children and are not launched yet.
 *
 * @param pJob  job being executed; non-query jobs are a no-op
 * @param pTask task that just finished
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_SCH_INTERNAL_ERROR when a level/task
 *         cannot be fetched, or the error from schDelayLaunchTask()
 */
int32_t schLaunchJobLowerLevel(SSchJob *pJob, SSchTask *pTask) {
  if (!SCH_IS_QUERY_JOB(pJob)) {
    return TSDB_CODE_SUCCESS;
  }

  SSchLevel *pLevel = pTask->level;
  int32_t    doneNum = atomic_load_32(&pLevel->taskExecDoneNum);
  if (doneNum != pLevel->taskNum) {
    // the current level still has running tasks
    return TSDB_CODE_SUCCESS;
  }

  // Use the value produced by the atomic decrement itself instead of reading
  // pJob->levelIdx again, so the level looked up is the one this call moved to.
  int32_t levelIdx = atomic_sub_fetch_32(&pJob->levelIdx, 1);

  pLevel = taosArrayGet(pJob->levels, levelIdx);
  if (NULL == pLevel) {
    SCH_JOB_ELOG("fail to get the %dth level, levelNum:%d", levelIdx, (int32_t)taosArrayGetSize(pJob->levels));
    SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
  }

  for (int32_t i = 0; i < pLevel->taskNum; ++i) {
    // distinct name: the original shadowed the pTask parameter here
    SSchTask *pNext = taosArrayGet(pLevel->subTasks, i);
    if (NULL == pNext) {
      SCH_JOB_ELOG("fail to get the %dth task in level %d, taskNum:%d", i, pLevel->level, pLevel->taskNum);
      SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
    }

    // tasks with children are not launched from here
    if (pNext->children && taosArrayGetSize(pNext->children) > 0) {
      continue;
    }

    if (SCH_TASK_ALREADY_LAUNCHED(pNext)) {
      continue;
    }

    SCH_ERR_RET(schDelayLaunchTask(pJob, pNext));
  }

  return TSDB_CODE_SUCCESS;
}
757

758
/**
 * Append the table version info carried by a query response to the job's
 * execution result, under the job's result write lock.
 *
 * On success rsp->tbVerInfo is destroyed here (the pointer in rsp is left
 * as is). Nothing is done when the response carries no table version info.
 *
 * @return TSDB_CODE_SUCCESS or terrno from the failed array operation
 */
int32_t schSaveJobExecRes(SSchJob *pJob, SQueryTableRsp *rsp) {
  if (NULL == rsp->tbVerInfo) {
    return TSDB_CODE_SUCCESS;
  }

  SCH_LOCK(SCH_WRITE, &pJob->resLock);

  // lazily create the result array on first use
  if (NULL == pJob->execRes.res) {
    pJob->execRes.res = taosArrayInit(pJob->taskNum, sizeof(STbVerInfo));
    if (NULL == pJob->execRes.res) {
      SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
      SCH_ERR_RET(terrno);
    }
    SCH_JOB_DLOG("init job %p execRes %p", pJob, pJob->execRes.res);
  }

  pJob->execRes.msgType = TDMT_SCH_QUERY;

  SArray *pDst = (SArray *)pJob->execRes.res;
  void   *pAdded = taosArrayAddBatch(pDst, taosArrayGet(rsp->tbVerInfo, 0), taosArrayGetSize(rsp->tbVerInfo));
  if (NULL == pAdded) {
    SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
    SCH_ERR_RET(terrno);
  }

  taosArrayDestroy(rsp->tbVerInfo);

  SCH_UNLOCK(SCH_WRITE, &pJob->resLock);

  return TSDB_CODE_SUCCESS;
}
786

787
/**
 * Look up a task by id in the job's task list.
 *
 * @param pTask out: the task found, NULL when the lookup fails
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_SCH_INTERNAL_ERROR when not found
 */
int32_t schGetTaskInJob(SSchJob *pJob, uint64_t taskId, SSchTask **pTask) {
  schGetTaskFromList(pJob->taskList, taskId, pTask);
  if (NULL != *pTask) {
    return TSDB_CODE_SUCCESS;
  }

  SCH_JOB_ELOG("task not found in job task list, taskId:0x%" PRIx64, taskId);
  SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);

  return TSDB_CODE_SUCCESS;
}
796

797
/**
 * Launch the tasks of the job's current level (pJob->levelIdx).
 *
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_SCH_INTERNAL_ERROR when the level
 *         cannot be fetched, or the error from schLaunchLevelTasks()
 */
int32_t schLaunchJobImpl(SSchJob *pJob) {
  SSchLevel *pCurLevel = taosArrayGet(pJob->levels, pJob->levelIdx);
  if (NULL == pCurLevel) {
    SCH_JOB_ELOG("fail to get the %dth level, levelNum:%d", pJob->levelIdx, (int32_t)taosArrayGetSize(pJob->levels));
    SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
  }

  SCH_UPDATE_JOB_PHASE_IF_CHANGED(pJob, QUERY_PHASE_SCHEDULE_NODE_SELECTION);

  // Note: the RESOURCE_ALLOC phase is not set here because tasks may still be
  // waiting in the flow control queue and not actually launched yet. The EXEC
  // phase is set when the query messages are really sent in schBuildAndSendMsg.
  SCH_ERR_RET(schLaunchLevelTasks(pJob, pCurLevel));

  return TSDB_CODE_SUCCESS;
}
814

815
/**
 * Start executing a job.
 *
 *  - static explain: run the explain locally and switch to PART_SUCC;
 *  - parent job with sub jobs not all launched yet: launch the next sub job
 *    and advance pJob->subJobExecIdx;
 *  - otherwise: launch the job itself.
 */
int32_t schLaunchJob(SSchJob *pJob) {
  if (EXPLAIN_MODE_STATIC == pJob->attr.explainMode) {
    SCH_ERR_RET(qExecStaticExplain(pJob->pDag, (SRetrieveTableRsp **)&pJob->fetchRes));
    SCH_ERR_RET(schSwitchJobStatus(pJob, JOB_TASK_STATUS_PART_SUCC, NULL));

    return TSDB_CODE_SUCCESS;
  }

  if (SCH_IS_PARENT_JOB(pJob) && SCH_JOB_GOT_SUB_JOBS(pJob) && pJob->subJobExecIdx < taosArrayGetSize(pJob->subJobs)) {
    // pick the next pending sub job and move the cursor forward
    SSchJob *pSubJob = taosArrayGetP(pJob->subJobs, pJob->subJobExecIdx++);
    SCH_ERR_RET(schLaunchJobImpl(pSubJob));

    return TSDB_CODE_SUCCESS;
  }

  return schLaunchJobImpl(pJob);
}
831

832
// Drop every task held in the job's execTasks hash.
// (Dropping of the succTasks / failTasks lists is disabled.)
void schDropJobAllTasks(SSchJob *pJob) {
  schDropTaskInHashList(pJob, pJob->execTasks);
}
893,740,912✔
837

838
/**
 * Send a notification of the given type to the job's executing tasks and to
 * the executing tasks of every sub job.
 *
 * @param pTask forwarded to schNotifyTaskInHashList() for the job's own
 *              tasks; sub jobs are notified with NULL
 * @return TSDB_CODE_SUCCESS or the first error from schNotifyTaskInHashList()
 */
int32_t schNotifyJobAllTasks(SSchJob *pJob, SSchTask *pTask, ETaskNotifyType type) {
  SCH_ERR_RET(schNotifyTaskInHashList(pJob, pJob->execTasks, type, pTask));

  if (SCH_JOB_GOT_SUB_JOBS(pJob)) {
    for (int32_t idx = 0; idx < pJob->subJobs->size; ++idx) {
      SSchJob *pSubJob = taosArrayGetP(pJob->subJobs, idx);
      if (NULL != pSubJob) {
        SCH_ERR_RET(schNotifyTaskInHashList(pSubJob, pSubJob->execTasks, type, NULL));
      }
    }
  }

  return TSDB_CODE_SUCCESS;
}
858

859
void schFreeJobImpl(void *job) {
893,741,141✔
860
  if (NULL == job) {
893,741,141✔
861
    return;
×
862
  }
863

864
  SSchJob *pJob = job;
893,741,141✔
865
  uint64_t queryId = pJob->queryId;
893,741,141✔
866
  int64_t  refId = pJob->refId;
893,749,186✔
867
  bool     isParentJob = SCH_IS_PARENT_JOB(pJob);
893,747,015✔
868

869
  qDebug("QID:0x%" PRIx64 ", begin to free sch job, jobId:0x%" PRIx64 ", pointer:%p, isParent:%d", queryId, refId, pJob, isParentJob);
893,748,556✔
870

871
  schDropJobAllTasks(pJob);
893,748,545✔
872

873
  if (SCH_JOB_GOT_SUB_JOBS(pJob)) {
893,732,663✔
874
    taosArrayDestroyP(pJob->subJobs, schFreeJobImpl);
76,772,508✔
875
  }
876

877
  int32_t numOfLevels = taosArrayGetSize(pJob->levels);
893,742,934✔
878
  for (int32_t i = 0; i < numOfLevels; ++i) {
1,918,833,260✔
879
    SSchLevel *pLevel = taosArrayGet(pJob->levels, i);
1,025,089,248✔
880
    if (NULL == pLevel) {
1,025,084,197✔
881
      SCH_JOB_ELOG("fail to get the %dth level, levelNum:%d", i, numOfLevels);
×
882
      continue;
×
883
    }
884

885
    int32_t numOfTasks = taosArrayGetSize(pLevel->subTasks);
1,025,084,197✔
886
    for (int32_t j = 0; j < numOfTasks; ++j) {
2,147,483,647✔
887
      SSchTask *pTask = taosArrayGet(pLevel->subTasks, j);
1,279,873,549✔
888
      if (NULL == pLevel) {
1,279,867,512✔
889
        SCH_JOB_ELOG("fail to get the %dth task, taskNum:%d", j, numOfTasks);
×
890
        continue;
×
891
      }
892

893
      schFreeTask(pJob, pTask);
1,279,867,512✔
894
    }
895

896
    taosArrayDestroy(pLevel->subTasks);
1,025,078,331✔
897
  }
898

899
  schFreeFlowCtrl(pJob);
893,744,012✔
900

901
  taosHashCleanup(pJob->execTasks);
893,741,628✔
902
  //  taosHashCleanup(pJob->failTasks);
903
  //  taosHashCleanup(pJob->succTasks);
904
  taosHashCleanup(pJob->taskList);
893,737,611✔
905

906
  taosArrayDestroy(pJob->levels);
893,734,771✔
907
  taosArrayDestroy(pJob->dataSrcTasks);
893,731,971✔
908

909
  qExplainFreeCtx(pJob->explainCtx);
893,740,920✔
910

911
  destroyQueryExecRes(&pJob->execRes);
893,734,810✔
912

913
  if (isParentJob) {
893,744,072✔
914
    taosArrayDestroy(pJob->nodeList);
732,199,011✔
915
    qDestroyQueryPlan(pJob->pDag);
732,194,802✔
916
    qDebug("QID:0x%" PRIx64 " pDag: %p destroyed", queryId, pJob->pDag);
732,182,777✔
917
    (void)nodesReleaseAllocatorWeakRef(pJob->allocatorRefId);  // ignore error
732,183,693✔
918
  }
919
  
920
  taosMemoryFreeClear(pJob->userRes.execRes);
893,750,106✔
921
  taosMemoryFreeClear(pJob->fetchRes);
893,751,909✔
922
  taosMemoryFreeClear(pJob->sql);
893,746,710✔
923
  int32_t code = tsem_destroy(&pJob->rspSem);
893,724,319✔
924
  if (code) {
893,736,418✔
925
    qError("tsem_destroy failed, error:%s", tstrerror(code));
×
926
  }
927
  taosMemoryFree(pJob);
893,736,418✔
928

929
  if (refId > 0 && isParentJob) {
893,693,450✔
930
    int32_t jobNum = atomic_sub_fetch_32(&schMgmt.jobNum, 1);
732,150,115✔
931
    if (jobNum == 0) {
732,200,669✔
932
      schCloseJobRef();
606,721,835✔
933
    }
934
  }
935

936
  qDebug("QID:0x%" PRIx64 ", sch job freed, jobId:0x%" PRIx64 ", pointer:%p, isParent:%d", queryId, refId, pJob, isParentJob);
893,744,004✔
937
}
938

939
/**
 * Serve a fetch request on the job.
 *
 * Static-explain jobs and explain-insert jobs are answered without launching
 * a fetch task: a sync request dumps the stored result, an async one posts
 * the fetch result. Every other job launches a fetch task; a sync request
 * then waits on rspSem and dumps the fetched result.
 */
int32_t schJobFetchRows(SSchJob *pJob) {
  int32_t code = 0;

  if ((pJob->attr.explainMode == EXPLAIN_MODE_STATIC) || (SCH_IS_EXPLAIN_JOB(pJob) && SCH_IS_INSERT_JOB(pJob))) {
    // no fetch task is launched for these jobs
    if (schChkCurrentOp(pJob, SCH_OP_FETCH, true)) {
      SCH_RET(schDumpJobFetchRes(pJob, pJob->userRes.fetchRes));
    }

    schPostJobRes(pJob, SCH_OP_FETCH);
    SCH_RET(code);
  }

  SCH_ERR_RET(schLaunchFetchTask(pJob));

  if (!schChkCurrentOp(pJob, SCH_OP_FETCH, true)) {
    // async fetch: the result is delivered through the callback
    SCH_RET(code);
  }

  SCH_JOB_DLOG("sync wait for rsp now, job status:%s", SCH_GET_JOB_STATUS_STR(pJob));
  code = tsem_wait(&pJob->rspSem);
  if (code) {
    qError("tsem_wait for fetch rspSem failed, error:%s", tstrerror(code));
    SCH_RET(code);
  }

  SCH_RET(schDumpJobFetchRes(pJob, pJob->userRes.fetchRes));
}
965

966
/**
 * Create the sub job with index subJobId for the sub plan pDag of pParent.
 *
 * The sub job copies queryId, seriesId, refId, connection, callbacks and
 * other request attributes from the parent and shares the parent's nodeList
 * pointer. On failure the partially built job is released through
 * schFreeJobImpl(); if even the job allocation failed, pDag is destroyed.
 *
 * @param ppRes out: the created sub job (only set on success)
 * @param pReq  scheduler request (not read by this function)
 * @return TSDB_CODE_SUCCESS or the error that aborted the initialization
 */
int32_t schInitSubJob(SSchJob* pParent, SQueryPlan* pDag, int32_t subJobId, SSchJob** ppRes, SSchedulerReq *pReq) {
  int32_t  code = 0;
  SSchJob *pJob = taosMemoryCalloc(1, sizeof(SSchJob));
  if (NULL == pJob) {
    qError("QID:0x%" PRIx64 ", calloc %d failed", pDag->queryId, (int32_t)sizeof(SSchJob));
    SCH_ERR_JRET(terrno);
  }

  if (pDag->subSql) {
    pJob->sql = taosStrdup(pDag->subSql);
    if (NULL == pJob->sql) {
      qError("QID:0x%" PRIx64 ", strdup subSql %s failed", pDag->queryId, pDag->subSql);
      SCH_ERR_JRET(terrno);
    }
  }

  // identity of the sub job
  pJob->parent = pParent;
  pJob->subJobId = subJobId;
  pJob->pDag = pDag;

  // TODO: copy the request SQL (pReq->sql) into the sub job

  // everything else is taken over from the parent job
  pJob->queryId = pParent->queryId;
  pJob->seriesId = pParent->seriesId;
  pJob->refId = pParent->refId;
  pJob->attr.explainMode = pParent->attr.explainMode;
  pJob->attr.localExec = false;
  pJob->conn = pParent->conn;
  pJob->chkKillFp = pParent->chkKillFp;
  pJob->chkKillParam = pParent->chkKillParam;
  pJob->userRes.execFp = pParent->userRes.execFp;
  pJob->userRes.cbParam = pParent->userRes.cbParam;
  pJob->source = pParent->source;
  pJob->secureDelete = pParent->secureDelete;
  pJob->pWorkerCb = pParent->pWorkerCb;
  pJob->nodeList = pParent->nodeList;

  qDebug("QID:0x%" PRIx64 " subJob %d init with pTrans:%p, pJob:%p, pDag:%p, subQType:%d",
         pParent->queryId, subJobId, pJob->conn.pTrans, pJob, pJob->pDag, pJob->pDag->subQType);

  pJob->taskList = taosHashInit(SCH_GET_TASK_CAPACITY(pDag->numOfSubplans),
                                taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
  if (NULL == pJob->taskList) {
    SCH_JOB_ELOG("taosHashInit %d taskList failed", SCH_GET_TASK_CAPACITY(pDag->numOfSubplans));
    SCH_ERR_JRET(terrno);
  }

  SCH_ERR_JRET(schValidateAndBuildJob(pDag, pJob));

  pJob->execTasks = taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false,
                                 HASH_ENTRY_LOCK);
  if (NULL == pJob->execTasks) {
    SCH_JOB_ELOG("taosHashInit %d execTasks failed", pDag->numOfSubplans);
    SCH_ERR_JRET(terrno);
  }

  if (tsem_init(&pJob->rspSem, 0, 0)) {
    SCH_JOB_ELOG("tsem_init failed, errno:%d", ERRNO);
    SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  SCH_JOB_TLOG("jobId:0x%" PRIx64 "-%d, job created", pJob->refId, subJobId);

  *ppRes = pJob;

  return TSDB_CODE_SUCCESS;

_return:

  if (NULL != pJob) {
    schFreeJobImpl(pJob);
  } else {
    qDestroyQueryPlan(pDag);
  }

  SCH_RET(code);
}
1051

1052
/**
 * Create a (parent) scheduler job from a scheduler request, register it in
 * schMgmt.jobRef and, when the plan has children, create one sub job per
 * child plan.
 *
 * @param pJobId out: refId of the created job (only set on success)
 * @param pReq   scheduler request; pReq->pDag and pReq->pConn are dereferenced
 * @return TSDB_CODE_SUCCESS or the error that aborted the initialization
 *
 * Failure handling: if the job could not even be allocated the plan is
 * destroyed; a job without a positive refId is freed directly; a registered
 * job is released with taosRemoveRef().
 */
int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq) {
  int32_t  code = 0;
  int32_t  i = 0;
  SSchJob *pJob = taosMemoryCalloc(1, sizeof(SSchJob));
  if (NULL == pJob) {
    qError("QID:0x%" PRIx64 ", calloc %d failed", pReq->pDag->queryId, (int32_t)sizeof(SSchJob));
    SCH_ERR_JRET(terrno);
  }

  pJob->attr.explainMode = pReq->pDag->explainInfo.mode;
  pJob->attr.localExec = pReq->localReq;
  pJob->conn = *pReq->pConn;

  if (pReq->sql) {
    pJob->sql = taosStrdup(pReq->sql);
    if (NULL == pJob->sql) {
      qError("QID:0x%" PRIx64 ", strdup sql %s failed", pReq->pDag->queryId, pReq->sql);
      SCH_ERR_JRET(terrno);
    }
  }
  pJob->pDag = pReq->pDag;
  if (pReq->allocatorRefId > 0) {
    pJob->allocatorRefId = nodesMakeAllocatorWeakRef(pReq->allocatorRefId);
    if (pJob->allocatorRefId <= 0) {
      qError("QID:0x%" PRIx64 ", nodesMakeAllocatorWeakRef failed", pReq->pDag->queryId);
      SCH_ERR_JRET(terrno);
    }
  }
  pJob->chkKillFp = pReq->chkKillFp;
  pJob->chkKillParam = pReq->chkKillParam;
  pJob->userRes.execFp = pReq->execFp;
  pJob->userRes.cbParam = pReq->cbParam;
  pJob->source       = pReq->source;
  pJob->secureDelete = pReq->secureDelete;
  pJob->pWorkerCb    = pReq->pWorkerCb;
  pJob->subJobId = -1;  // -1 marks a job that is not a sub job
  pJob->queryId = pReq->pDag->queryId;
  (void)atomic_add_fetch_64(&pJob->seriesId, 1);

  if (pReq->pNodeList == NULL || taosArrayGetSize(pReq->pNodeList) <= 0) {
    qTrace("QID:0x%" PRIx64 ", input exec nodeList is empty", pReq->pDag->queryId);
  } else {
    pJob->nodeList = taosArrayDup(pReq->pNodeList, NULL);
    if (NULL == pJob->nodeList) {
      qError("QID:0x%" PRIx64 ", taosArrayDup failed, origNum:%d", pReq->pDag->queryId,
             (int32_t)taosArrayGetSize(pReq->pNodeList));
      SCH_ERR_JRET(terrno);
    }
  }

  pJob->taskList = taosHashInit(SCH_GET_TASK_CAPACITY(pReq->pDag->numOfSubplans), taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false,
                                HASH_ENTRY_LOCK);
  if (NULL == pJob->taskList) {
    SCH_JOB_ELOG("taosHashInit %d taskList failed", SCH_GET_TASK_CAPACITY(pReq->pDag->numOfSubplans));
    SCH_ERR_JRET(terrno);
  }

  pJob->refId = taosAddRef(schMgmt.jobRef, pJob);
  if (pJob->refId < 0) {
    SCH_JOB_ELOG("taosAddRef job failed, error:%s", tstrerror(terrno));
    SCH_ERR_JRET(terrno);
  }

  // Count the job as soon as it is registered. schFreeJobImpl() decrements
  // schMgmt.jobNum for every parent job with a positive refId, so counting
  // only at the very end (as before) left the counter unbalanced whenever a
  // later step failed and the registered job was released on the error path.
  // NOTE(review): assumes taosRemoveRef() ends up in schFreeJobImpl() - confirm.
  (void)atomic_add_fetch_32(&schMgmt.jobNum, 1);

  if (pReq->pDag->pChildren && pReq->pDag->pChildren->length > 0) {
    if (pReq->localReq) {
      SCH_JOB_ELOG("local policy not supported for query with subJobs, subJobNum:%d", pReq->pDag->pChildren->length);
      SCH_ERR_JRET(TSDB_CODE_PAR_INVALID_SCALAR_SUBQ_POLICY);
    }

    pJob->subJobs = taosArrayInit_s(POINTER_BYTES, pReq->pDag->pChildren->length);
    if (NULL == pJob->subJobs) {
      SCH_JOB_ELOG("taosArrayInit %d subJobs failed", pReq->pDag->pChildren->length);
      SCH_ERR_JRET(terrno);
    }

    // one sub job per child plan, stored at the child's position
    SNode* pNode = NULL;
    SSchJob* pSubJob = NULL;
    FOREACH(pNode, pReq->pDag->pChildren) {
      SCH_ERR_JRET(schInitSubJob(pJob, (SQueryPlan*)pNode, i, &pSubJob, pReq));
      taosArraySet(pJob->subJobs, i, &pSubJob);
      i++;
    }
  }

  qDebug("QID:0x%" PRIx64 " subJob %d init with pTrans:%p, pJob:%p, subJobNum:%d, pDag:%p",
    pJob->queryId, pJob->subJobId, pJob->conn.pTrans, pJob, (int32_t)taosArrayGetSize(pJob->subJobs), pJob->pDag);

  SCH_ERR_JRET(schValidateAndBuildJob(pReq->pDag, pJob));

  if (SCH_IS_EXPLAIN_JOB(pJob)) {
    SCH_ERR_JRET(qExecExplainBegin(pReq->pDag, &pJob->explainCtx, pReq->startTs));
  }

  pJob->execTasks = taosHashInit(pReq->pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false,
                                 HASH_ENTRY_LOCK);
  if (NULL == pJob->execTasks) {
    SCH_JOB_ELOG("taosHashInit %d execTasks failed", pReq->pDag->numOfSubplans);
    SCH_ERR_JRET(terrno);
  }

  if (tsem_init(&pJob->rspSem, 0, 0)) {
    SCH_JOB_ELOG("tsem_init failed, errno:%d", ERRNO);
    SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  *pJobId = pJob->refId;

  SCH_JOB_TLOG("jobId:0x%" PRIx64 ", job created", pJob->refId);

  return TSDB_CODE_SUCCESS;

_return:

  if (NULL == pJob) {
    qDestroyQueryPlan(pReq->pDag);
  } else if (pJob->refId <= 0) {
    // never registered: free it directly
    schFreeJobImpl(pJob);
  } else {
    // registered: give the reference back
    int32_t tmpCode = taosRemoveRef(schMgmt.jobRef, pJob->refId);
    if (tmpCode) {
      SCH_JOB_DLOG("jobId:0x%" PRIx64 ", taosRemoveRef job from jobRef, error:%s", pJob->refId, tstrerror(tmpCode));
    }
  }

  SCH_RET(code);
}
1181

1182
/**
 * Launch the job and, for a synchronous request, block on rspSem until the
 * response is posted.
 *
 * @return TSDB_CODE_SUCCESS, the error from schLaunchJob(), or the error
 *         from tsem_wait()
 */
int32_t schExecJob(SSchJob *pJob, SSchedulerReq *pReq) {
  qTrace("QID:0x%" PRIx64 ", jobId:0x%" PRIx64 ", sch job started", pReq->pDag->queryId, pJob->refId);

  SCH_ERR_RET(schLaunchJob(pJob));

  if (pReq->syncReq) {
    SCH_JOB_DLOG("sync wait for rsp now, job status:%s", SCH_GET_JOB_STATUS_STR(pJob));

    int32_t waitCode = tsem_wait(&pJob->rspSem);
    if (waitCode) {
      qError("QID:0x%" PRIx64 ", tsem_wait sync rspSem failed, error:%s", pReq->pDag->queryId, tstrerror(waitCode));
      SCH_ERR_RET(waitCode);
    }
  }

  SCH_JOB_DLOG("job exec done, job status:%s, jobId:0x%" PRIx64, SCH_GET_JOB_STATUS_STR(pJob), pJob->refId);

  return TSDB_CODE_SUCCESS;
}
1201

1202
/**
 * Report errCode straight to the callback of an asynchronous request,
 * passing NULL as the result. The exec callback is preferred; the fetch
 * callback is used only when no exec callback is set. A NULL or synchronous
 * request is ignored.
 */
void schDirectPostJobRes(SSchedulerReq *pReq, int32_t errCode) {
  bool asyncReq = (NULL != pReq) && !pReq->syncReq;
  if (!asyncReq) {
    return;
  }

  if (pReq->execFp) {
    (*pReq->execFp)(NULL, pReq->cbParam, errCode);
    return;
  }

  if (pReq->fetchFp) {
    (*pReq->fetchFp)(NULL, pReq->cbParam, errCode);
  }
}
1213

1214
/**
 * Check whether a job that already reached PART_SUCC (or a later status) can
 * still be retried. If its result was already fetched, further retries are
 * disabled and rspCode is returned; otherwise the job status is moved back
 * to EXEC. Jobs in an earlier status need no action.
 */
int32_t schChkResetJobRetry(SSchJob *pJob, int32_t rspCode) {
  if (pJob->status < JOB_TASK_STATUS_PART_SUCC) {
    return TSDB_CODE_SUCCESS;
  }

  SCH_LOCK(SCH_WRITE, &pJob->resLock);
  if (pJob->fetched) {
    SCH_UNLOCK(SCH_WRITE, &pJob->resLock);

    // data was already handed out, re-running the job is not allowed any more
    pJob->noMoreRetry = true;
    SCH_JOB_ELOG("already fetched while got error %s", tstrerror(rspCode));
    SCH_ERR_RET(rspCode);
  }
  SCH_UNLOCK(SCH_WRITE, &pJob->resLock);

  SCH_ERR_RET(schUpdateJobStatus(pJob, JOB_TASK_STATUS_EXEC));

  return TSDB_CODE_SUCCESS;
}
1230

1231
/**
 * Prepare the whole job for a retry triggered by pTask.
 *
 * Claims the job's inRetry flag (spinning while another retry holds it),
 * bumps the job seriesId, resets the counters of every level and resets
 * every task (redirect context, task state, execution node of its plan),
 * then resets the job level index.
 *
 * @param pTask   task whose failure triggered the retry
 * @param rspCode error code that triggered the retry
 * @param inRetry out: set to true once this call owns pJob->inRetry and
 *                expects the caller to clear it; left untouched otherwise
 * @return TSDB_CODE_SUCCESS; TSDB_CODE_SCH_IGNORE_ERROR when pTask belongs
 *         to an older series than the job; or the error of a failed step
 */
int32_t schResetJobForRetry(SSchJob *pJob, SSchTask *pTask, int32_t rspCode, bool *inRetry) {
  int32_t code = 0;
  int32_t numOfLevels = taosArrayGetSize(pJob->levels);

  while (true) {
    if (pTask->seriesId < atomic_load_64(&pJob->seriesId)) {
      SCH_TASK_DLOG("task sId %" PRId64 " is smaller than current job sId %" PRId64, pTask->seriesId, pJob->seriesId);
      return TSDB_CODE_SCH_IGNORE_ERROR;
    }

    int8_t origInRetry = atomic_val_compare_exchange_8(&pJob->inRetry, 0, 1);
    if (0 != origInRetry) {
      SCH_JOB_DLOG("job already in retry, origInRetry: %d", pJob->inRetry);
      taosUsleep(1);
      continue;
    }

    // re-check now that the flag is ours: another retry may have finished in between
    if (pTask->seriesId < atomic_load_64(&pJob->seriesId)) {
      SCH_TASK_DLOG("task sId:%" PRId64 " is smaller than current job sId:%" PRId64, pTask->seriesId, pJob->seriesId);
      // *inRetry has not been set yet, so the caller will not clear the flag
      // for us; give it back here, otherwise later retries would spin forever
      atomic_store_8(&pJob->inRetry, 0);
      return TSDB_CODE_SCH_IGNORE_ERROR;
    }

    break;
  }

  // from here on the caller is responsible for clearing pJob->inRetry
  *inRetry = true;
  SCH_ERR_RET(schChkResetJobRetry(pJob, rspCode));

  (void)atomic_add_fetch_64(&pJob->seriesId, 1);

  for (int32_t i = 0; i < numOfLevels; ++i) {
    SSchLevel *pLevel = taosArrayGet(pJob->levels, i);
    if (NULL == pLevel) {
      SCH_JOB_ELOG("fail to get the %dth level, levelNum:%d", i, numOfLevels);
      SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
    }

    pLevel->taskExecDoneNum = 0;
    pLevel->taskLaunchedNum = 0;

    int32_t numOfTasks = taosArrayGetSize(pLevel->subTasks);
    for (int32_t j = 0; j < numOfTasks; ++j) {
      SSchTask *p = taosArrayGet(pLevel->subTasks, j);
      if (NULL == p) {
        SCH_JOB_ELOG("fail to get the %dth task in level %d, taskNum:%d", j, i, numOfTasks);
        SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
      }

      SCH_LOCK_TASK(p);

      // the last argument is true for every task except the one that triggered the retry
      code = schChkUpdateRedirectCtx(pJob, p, (p->taskId != pTask->taskId));
      if (TSDB_CODE_SUCCESS != code) {
        SCH_UNLOCK_TASK(p);
        SCH_RET(code);
      }

      schResetTaskForRetry(pJob, p);

      SCH_LOCK(SCH_WRITE, &p->planLock);
      qClearSubplanExecutionNode(p->plan);
      SCH_UNLOCK(SCH_WRITE, &p->planLock);

      SCH_UNLOCK_TASK(p);
    }
  }

  SCH_RESET_JOB_LEVEL_IDX(pJob);

  SCH_JOB_DLOG("update job sId:%" PRId64 ", levelIdx:%d", pJob->seriesId, pJob->levelIdx);

  return TSDB_CODE_SUCCESS;
}
1303

1304
/**
 * Restart all tasks of the job after pTask failed with rspCode.
 *
 * The response buffers in pMsg are freed first. The task lock is released
 * while the job is reset and relaunched and is taken again before returning
 * on both the success and the failure path. On failure the task failure is
 * processed; pJob->inRetry is cleared whenever this call had set it.
 */
int32_t schHandleJobRetry(SSchJob *pJob, SSchTask *pTask, SDataBuf *pMsg, int32_t rspCode) {
  int32_t code = 0;
  bool    retryFlagSet = false;

  // the response payload is not needed for a restart
  taosMemoryFreeClear(pMsg->pData);
  taosMemoryFreeClear(pMsg->pEpSet);

  SCH_UNLOCK_TASK(pTask);
  SCH_TASK_DLOG("start to restart all tasks by error:%s from TID:0x%" PRIx64, tstrerror(rspCode), pTask->taskId);

  SCH_ERR_JRET(schFailedTaskNeedRetry(pTask, pJob, rspCode));
  SCH_ERR_JRET(schResetJobForRetry(pJob, pTask, rspCode, &retryFlagSet));
  SCH_ERR_JRET(schLaunchJob(pJob));

  // relaunched: take the task lock back and end the retry window
  SCH_LOCK_TASK(pTask);
  atomic_store_8(&pJob->inRetry, 0);

  SCH_RET(code);

_return:
  SCH_LOCK_TASK(pTask);
  code = schProcessOnTaskFailure(pJob, pTask, code);

  if (retryFlagSet) {
    atomic_store_8(&pJob->inRetry, 0);
  }

  SCH_RET(code);
}
1334

1335
/**
 * Tell whether the job's current operation is op, read under the operation
 * status read lock.
 *
 * @param sync when >= 0 the sync flag of the operation must match as well;
 *             a negative value compares the operation type only
 */
bool schChkCurrentOp(SSchJob *pJob, int32_t op, int8_t sync) {
  bool matched = false;

  SCH_LOCK(SCH_READ, &pJob->opStatus.lock);
  matched = (pJob->opStatus.op == op);
  if (matched && sync >= 0) {
    matched = (pJob->opStatus.syncReq == sync);
  }
  SCH_UNLOCK(SCH_READ, &pJob->opStatus.lock);

  return matched;
}
1347

1348
/**
 * Finish an operation on the job.
 *
 * For a synchronous EXEC or FETCH request the current operation is reset to
 * SCH_OP_NULL under the operation status write lock (a mismatch is logged),
 * and EXEC additionally dumps the execution result into the request.
 * GET_STATUS clears errCode. A non-zero errCode is finally handed to
 * schHandleJobFailure().
 */
void schProcessOnOpEnd(SSchJob *pJob, SCH_OP_TYPE type, SSchedulerReq *pReq, int32_t errCode) {
  bool isExec = (SCH_OP_EXEC == type);

  if (isExec || SCH_OP_FETCH == type) {
    if (pReq && pReq->syncReq) {
      SCH_LOCK(SCH_WRITE, &pJob->opStatus.lock);
      int32_t prevOp = atomic_val_compare_exchange_32(&pJob->opStatus.op, type, SCH_OP_NULL);
      if (SCH_OP_NULL == prevOp || prevOp != type) {
        SCH_JOB_ELOG("job not in %s operation, op:%s, status:%s", schGetOpStr(type), schGetOpStr(prevOp),
                     jobTaskStatusStr(pJob->status));
      }
      SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);

      if (isExec) {
        schDumpJobExecRes(pJob, pReq->pExecRes);
      }
    }
  } else if (SCH_OP_GET_STATUS == type) {
    errCode = TSDB_CODE_SUCCESS;
  }

  if (errCode) {
    (void)schHandleJobFailure(pJob, errCode);  // handle internal
  }

  SCH_JOB_TLOG("job end %s operation with code:%s", schGetOpStr(type), tstrerror(errCode));
}
1,020,247,233✔
1388

1389
/**
 * Begin an operation on the job.
 *
 *  - GET_STATUS: only checks that the job is initialized and executable.
 *  - EXEC / FETCH: claims the job's operation slot (SCH_OP_NULL -> type)
 *    under the operation status write lock and records the request's sync
 *    flag; FETCH also stores the fetch result pointer and callbacks and
 *    verifies the job can be fetched from. If the slot is already taken, the
 *    error is posted directly to the request callback.
 *  - any other type is rejected.
 *
 * @return TSDB_CODE_SUCCESS; TSDB_CODE_APP_ERROR / TSDB_CODE_SCH_STATUS_ERROR
 *         on a rejected operation; TSDB_CODE_SCH_IGNORE_ERROR when the job
 *         needs to stop
 */
int32_t schProcessOnOpBegin(SSchJob *pJob, SCH_OP_TYPE type, SSchedulerReq *pReq) {
  // sampled before the operation slot is claimed
  int8_t status = SCH_GET_JOB_STATUS(pJob);

  if (SCH_OP_GET_STATUS == type) {
    if (pJob->status < JOB_TASK_STATUS_INIT || pJob->levelNum <= 0 || NULL == pJob->levels) {
      qDebug("jobId:0x%" PRIx64 ", job not initialized or not executable job", pJob->refId);
      SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
    }
    return TSDB_CODE_SUCCESS;
  }

  bool isFetch = (SCH_OP_FETCH == type);
  if (!isFetch && SCH_OP_EXEC != type) {
    SCH_JOB_ELOG("unknown operation type %d", type);
    SCH_ERR_RET(TSDB_CODE_APP_ERROR);
  }

  SCH_LOCK(SCH_WRITE, &pJob->opStatus.lock);
  if (SCH_OP_NULL != atomic_val_compare_exchange_32(&pJob->opStatus.op, SCH_OP_NULL, type)) {
    SCH_JOB_ELOG("job already in %s operation", schGetOpStr(pJob->opStatus.op));
    SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);
    schDirectPostJobRes(pReq, TSDB_CODE_APP_ERROR);
    SCH_ERR_RET(TSDB_CODE_APP_ERROR);
  }

  SCH_JOB_DLOG("job start %s operation", schGetOpStr(pJob->opStatus.op));

  if (isFetch) {
    pJob->userRes.fetchRes = pReq->pFetchRes;
    pJob->userRes.fetchFp = pReq->fetchFp;
    pJob->userRes.cbParam = pReq->cbParam;
  }

  pJob->opStatus.syncReq = pReq->syncReq;
  SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);

  if (isFetch) {
    if (!SCH_JOB_NEED_FETCH(pJob)) {
      SCH_JOB_ELOG("no need to fetch data, status:%s", SCH_GET_JOB_STATUS_STR(pJob));
      SCH_ERR_RET(TSDB_CODE_APP_ERROR);
    }

    if (status != JOB_TASK_STATUS_PART_SUCC && status != JOB_TASK_STATUS_FETCH) {
      SCH_JOB_ELOG("job status error for fetch, status:%s", jobTaskStatusStr(status));
      SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
    }
  }

  if (schJobNeedToStop(pJob, &status)) {
    SCH_JOB_ELOG("abort op %s cause of job need to stop, status:%s", schGetOpStr(type), jobTaskStatusStr(status));
    SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR);
  }

  return TSDB_CODE_SUCCESS;
}
1455

1456
/**
 * Finish the processing of a callback: unlock the task, report a failure to the job (if any)
 * and release the job reference.
 *
 * Mirrors schProcessOnCbBegin(), which hands out a locked task and an acquired job.
 *
 * @param pJob     job the callback was processed for; may be NULL, in which case only the
 *                 task is unlocked
 * @param pTask    task locked for the callback; may be NULL
 * @param errCode  result of the callback processing; any non-zero value is passed to
 *                 schHandleJobFailure()
 */
void schProcessOnCbEnd(SSchJob *pJob, SSchTask *pTask, int32_t errCode) {
  if (pTask) {
    SCH_UNLOCK_TASK(pTask);
  }

  // Nothing job related can be done without a job. Checking this first keeps a NULL job
  // away from the failure handler, which used to be invoked before the NULL check.
  if (NULL == pJob) {
    return;
  }

  if (errCode) {
    (void)schHandleJobFailure(pJob, errCode);  // ignore error
  }

  (void)schReleaseJob(pJob->refId);  // ignore error
}
1,475,005,737✔
1469

1470
/**
 * Prepare the job/task context needed to process a callback.
 *
 * Acquires the job referenced by rId, optionally switches to the sub job stored at index
 * sjId, looks up task tId in that job and locks it.
 *
 * On success the acquired job reference and the task lock are still held and are handed to
 * the caller through *job and *task. On failure *job and *task are left untouched and the
 * job reference acquired here is released again before returning.
 *
 * @param job   [out] the job, or the sub job when sjId >= 0
 * @param task  [out] the locked task
 * @param qId   query id, only used for logging here
 * @param rId   reference id used to acquire and release the job
 * @param sjId  index into pJob->subJobs; a negative value means no sub job lookup is done
 * @param tId   id of the task to look up in the job
 * @return TSDB_CODE_SUCCESS on success; TSDB_CODE_QRY_JOB_NOT_EXIST when the job or the sub
 *         job cannot be found; TSDB_CODE_SCH_IGNORE_ERROR when the job or the sub job needs
 *         to stop; otherwise the code produced by the task lookup
 */
int32_t schProcessOnCbBegin(SSchJob **job, SSchTask **task, uint64_t qId, int64_t rId, int32_t sjId, uint64_t tId) {
  int32_t code = 0;
  int8_t  status = 0;

  SSchTask *pTask = NULL;
  SSchJob  *pJob = NULL;

  (void)schAcquireJob(rId, &pJob);
  if (NULL == pJob) {
    qWarn("QID:0x%" PRIx64 ", TID:0x%" PRIx64 " job doesn't exist, may be dropped, jobId:0x%" PRIx64, qId, tId, rId);
    SCH_ERR_RET(TSDB_CODE_QRY_JOB_NOT_EXIST);
  }

  if (schJobNeedToStop(pJob, &status)) {
    SCH_TASK_DLOG("will not do further processing cause of job status %s", jobTaskStatusStr(status));
    SCH_ERR_JRET(TSDB_CODE_SCH_IGNORE_ERROR);
  }

  if (sjId >= 0) {
    // Look the sub job up into a temporary: pJob must keep pointing at the acquired job
    // until the sub job is known to exist, otherwise the cleanup below sees a NULL pJob and
    // skips schReleaseJob(rId), leaking the reference taken above.
    SSchJob *pSubJob = taosArrayGetP(pJob->subJobs, sjId);
    if (NULL == pSubJob) {
      qWarn("QID:0x%" PRIx64 ", TID:0x%" PRIx64 " SJID:%d sub job doesn't exist, jobId:0x%" PRIx64, qId, tId, sjId, rId);
      SCH_ERR_JRET(TSDB_CODE_QRY_JOB_NOT_EXIST);
    }

    pJob = pSubJob;

    if (schJobNeedToStop(pJob, &status)) {
      SCH_TASK_DLOG("will not do further processing cause of sub job status %s", jobTaskStatusStr(status));
      SCH_ERR_JRET(TSDB_CODE_SCH_IGNORE_ERROR);
    }
  }

  SCH_ERR_JRET(schGetTaskInJob(pJob, tId, &pTask));

  SCH_LOCK_TASK(pTask);

  *job = pJob;
  *task = pTask;

  return TSDB_CODE_SUCCESS;

_return:

  // NOTE(review): the task is only locked right before the success return above, so this
  // unlock looks unreachable unless schGetTaskInJob() can fail with pTask set - confirm.
  if (pTask) {
    SCH_UNLOCK_TASK(pTask);
  }
  // pJob is the acquired job or one of its sub jobs here; either way the reference was
  // taken through rId, so it is released through rId.
  if (pJob) {
    (void)schReleaseJob(rId);  // ignore error
  }

  SCH_RET(code);
}
1521

1522
/**
 * Mark the job with the job-type flag that corresponds to a sub plan type.
 *
 * The flag is OR-ed into pJob->attr.type, so flags set by earlier calls are kept.
 *
 * @param pJob  job whose attr.type is updated
 * @param type  sub plan type: SUBPLAN_TYPE_MODIFY sets JOB_TYPE_INSERT,
 *              SUBPLAN_TYPE_HSYSSCAN sets JOB_TYPE_HQUERY, any other type sets JOB_TYPE_QUERY
 */
void schSetJobType(SSchJob *pJob, ESubplanType type) {
  switch (type) {
    case SUBPLAN_TYPE_MODIFY:
      pJob->attr.type |= JOB_TYPE_INSERT;
      break;
    case SUBPLAN_TYPE_HSYSSCAN:
      pJob->attr.type |= JOB_TYPE_HQUERY;
      break;
    default:
      // every remaining sub plan type is treated as a plain query
      pJob->attr.type |= JOB_TYPE_QUERY;
      break;
  }
}
1,279,812,293✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc