• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4720

08 Sep 2025 08:43AM UTC coverage: 58.139% (-0.6%) from 58.762%
#4720

push

travis-ci

web-flow
Merge pull request #32881 from taosdata/enh/add-new-windows-ci

fix(ci): update workflow reference to use new Windows CI YAML

133181 of 292179 branches covered (45.58%)

Branch coverage included in aggregate %.

201691 of 283811 relevant lines covered (71.07%)

5442780.71 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

32.91
/source/libs/scheduler/src/schJob.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "catalog.h"
17
#include "command.h"
18
#include "query.h"
19
#include "schInt.h"
20
#include "tmsg.h"
21
#include "tref.h"
22
#include "trpc.h"
23

24
/*
 * Record errCode as the job's error code using these precedence rules:
 *   - TSDB_CODE_SUCCESS is never recorded.
 *   - If the job has no error yet, errCode is installed with a compare-and-swap.
 *   - If the job already holds a NEED_CLIENT_HANDLE_ERROR code, that code is kept.
 *   - Otherwise the existing code is overwritten only when errCode itself is a
 *     NEED_CLIENT_HANDLE_ERROR code.
 * A debug line is emitted only when the stored code was actually changed.
 */
void schUpdateJobErrCode(SSchJob *pJob, int32_t errCode) {
  if (TSDB_CODE_SUCCESS == errCode) {
    return;
  }

  int32_t curCode = atomic_load_32(&pJob->errCode);
  bool    installed = false;

  if (TSDB_CODE_SUCCESS == curCode) {
    installed = (curCode == atomic_val_compare_exchange_32(&pJob->errCode, curCode, errCode));
    if (!installed) {
      // Another thread stored a code first; re-read it before applying precedence.
      curCode = atomic_load_32(&pJob->errCode);
    }
  }

  if (!installed) {
    if ((NEED_CLIENT_HANDLE_ERROR(curCode)) || !(NEED_CLIENT_HANDLE_ERROR(errCode))) {
      return;
    }

    atomic_store_32(&pJob->errCode, errCode);
  }

  SCH_JOB_DLOG("job errCode updated to %s", tstrerror(errCode));
}
53

54
/*
 * Report whether the job has reached a terminal status (FAIL, DROP or SUCC).
 */
bool schJobDone(SSchJob *pJob) {
  switch (SCH_GET_JOB_STATUS(pJob)) {
    case JOB_TASK_STATUS_FAIL:
    case JOB_TASK_STATUS_DROP:
    case JOB_TASK_STATUS_SUCC:
      return true;
    default:
      return false;
  }
}
59

60
/*
 * Decide whether work on the job should stop.
 *
 * pStatus, when non-NULL, receives the job status sampled on entry.
 * Returns true if the job is already in a terminal status, or if the optional
 * kill-check callback reports the query was killed (in which case the job's
 * error code is set to TSDB_CODE_TSC_QUERY_KILLED). Returns false otherwise.
 */
FORCE_INLINE bool schJobNeedToStop(SSchJob *pJob, int8_t *pStatus) {
  int8_t curStatus = SCH_GET_JOB_STATUS(pJob);
  if (NULL != pStatus) {
    *pStatus = curStatus;
  }

  if (schJobDone(pJob)) {
    return true;
  }

  bool killed = (NULL != pJob->chkKillFp) && (*pJob->chkKillFp)(pJob->chkKillParam);
  if (!killed) {
    return false;
  }

  schUpdateJobErrCode(pJob, TSDB_CODE_TSC_QUERY_KILLED);
  return true;
}
77

78
/*
 * Move the job to newStatus, validating the transition against the job state
 * machine and installing it with a compare-and-swap (retried if the status
 * changes concurrently).
 *
 * Re-entering the current status is a no-op success only for FETCH; for any
 * other status it yields TSDB_CODE_SCH_IGNORE_ERROR. A DROP job yields
 * TSDB_CODE_QRY_JOB_FREED; any other disallowed transition or unknown status
 * yields TSDB_CODE_APP_ERROR.
 */
int32_t schUpdateJobStatus(SSchJob *pJob, int8_t newStatus) {
  int32_t code = 0;
  int8_t  oriStatus = 0;

  for (;;) {
    oriStatus = SCH_GET_JOB_STATUS(pJob);

    if (newStatus == oriStatus) {
      if (JOB_TASK_STATUS_FETCH != newStatus) {
        SCH_ERR_JRET(TSDB_CODE_SCH_IGNORE_ERROR);
      }
      return code;
    }

    bool allowed = false;
    switch (oriStatus) {
      case JOB_TASK_STATUS_NULL:
        allowed = (JOB_TASK_STATUS_INIT == newStatus);
        break;
      case JOB_TASK_STATUS_INIT:
        allowed = (JOB_TASK_STATUS_EXEC == newStatus || JOB_TASK_STATUS_DROP == newStatus);
        break;
      case JOB_TASK_STATUS_EXEC:
        allowed = (JOB_TASK_STATUS_PART_SUCC == newStatus || JOB_TASK_STATUS_FAIL == newStatus ||
                   JOB_TASK_STATUS_DROP == newStatus);
        break;
      case JOB_TASK_STATUS_PART_SUCC:
      case JOB_TASK_STATUS_FETCH:
        // PART_SUCC and FETCH share the same set of legal successors.
        allowed = (JOB_TASK_STATUS_FAIL == newStatus || JOB_TASK_STATUS_SUCC == newStatus ||
                   JOB_TASK_STATUS_DROP == newStatus || JOB_TASK_STATUS_EXEC == newStatus ||
                   JOB_TASK_STATUS_FETCH == newStatus);
        break;
      case JOB_TASK_STATUS_SUCC:
      case JOB_TASK_STATUS_FAIL:
        allowed = (JOB_TASK_STATUS_DROP == newStatus);
        break;
      case JOB_TASK_STATUS_DROP:
        SCH_ERR_JRET(TSDB_CODE_QRY_JOB_FREED);
        break;
      default:
        SCH_JOB_ELOG("invalid job status:%s", jobTaskStatusStr(oriStatus));
        SCH_ERR_JRET(TSDB_CODE_APP_ERROR);
    }

    if (!allowed) {
      SCH_ERR_JRET(TSDB_CODE_APP_ERROR);
    }

    if (oriStatus == atomic_val_compare_exchange_8(&pJob->status, oriStatus, newStatus)) {
      SCH_JOB_DLOG("job status updated from %s to %s", jobTaskStatusStr(oriStatus), jobTaskStatusStr(newStatus));
      return TSDB_CODE_SUCCESS;
    }

    // The status changed under us; re-validate against the new value.
  }

_return:

  if (TSDB_CODE_SCH_IGNORE_ERROR == code) {
    SCH_JOB_DLOG("ignore job status update, from %s to %s", jobTaskStatusStr(oriStatus), jobTaskStatusStr(newStatus));
  } else {
    SCH_JOB_ELOG("invalid job status update, from %s to %s", jobTaskStatusStr(oriStatus), jobTaskStatusStr(newStatus));
  }
  SCH_RET(code);
}
166

167
/*
 * Link every task in the job to its child and parent tasks.
 *
 * planToTask maps a subplan pointer (key of POINTER_BYTES) to a pointer to the
 * task built for it. For each task, pTask->children / pTask->parents arrays are
 * allocated and filled from the subplan's pChildren / pParents lists.
 *
 * Structural checks performed:
 *   - a task on the job's starting level (pJob->levelIdx) must have no children;
 *   - level-0 tasks must have no parents, and all other tasks must have parents;
 *   - for query jobs, level 0 must contain exactly one task.
 * For query jobs, needFetch is set unless the root is a MODIFY subplan run
 * with explain disabled.
 *
 * Returns TSDB_CODE_SUCCESS, or an error code on invalid plans / allocation failure.
 */
int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
  for (int32_t i = 0; i < pJob->levelNum; ++i) {
    SSchLevel *pLevel = taosArrayGet(pJob->levels, i);
    if (NULL == pLevel) {
      SCH_JOB_ELOG("fail to get the %dth level, levelNum: %d", i, pJob->levelNum);
      SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
    }

    for (int32_t m = 0; m < pLevel->taskNum; ++m) {
      SSchTask *pTask = taosArrayGet(pLevel->subTasks, m);
      if (NULL == pTask) {
        SCH_JOB_ELOG("fail to get the %dth task in level %d, taskNum: %d", m, pLevel->level, pLevel->taskNum);
        SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
      }

      SSubplan *pPlan = pTask->plan;
      int32_t   childNum = pPlan->pChildren ? (int32_t)LIST_LENGTH(pPlan->pChildren) : 0;
      int32_t   parentNum = pPlan->pParents ? (int32_t)LIST_LENGTH(pPlan->pParents) : 0;

      if (childNum > 0) {
        // The level the job starts executing from must be a leaf level.
        if (pJob->levelIdx == pLevel->level) {
          SCH_JOB_ELOG("invalid query plan, lowest level, childNum:%d", childNum);
          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
        }

        pTask->children = taosArrayInit(childNum, POINTER_BYTES);
        if (NULL == pTask->children) {
          SCH_TASK_ELOG("taosArrayInit %d children failed", childNum);
          SCH_ERR_RET(terrno);
        }
      }

      for (int32_t n = 0; n < childNum; ++n) {
        SSubplan *child = (SSubplan *)nodesListGetNode(pPlan->pChildren, n);
        if (NULL == child) {
          SCH_JOB_ELOG("fail to get the %dth child subplan, childNum: %d", n, childNum);
          SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
        }

        if (QUERY_NODE_PHYSICAL_SUBPLAN != nodeType(child)) {
          SCH_JOB_ELOG("invalid subplan type for the %dth child, level:%d, subplanNodeType:%d", n, i, nodeType(child));
          SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
        }

        SSchTask **childTask = taosHashGet(planToTask, &child, POINTER_BYTES);
        if (NULL == childTask || NULL == *childTask) {
          SCH_TASK_ELOG("subplan children relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n);
          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
        }

        if (NULL == taosArrayPush(pTask->children, childTask)) {
          SCH_TASK_ELOG("taosArrayPush childTask failed, level:%d, taskIdx:%d, childIdx:%d", i, m, n);
          SCH_ERR_RET(terrno);
        }

        SCH_TASK_DLOG("children info, the %d child TID 0x%" PRIx64, n, (*childTask)->taskId);
      }

      if (parentNum > 0) {
        if (0 == pLevel->level) {
          SCH_TASK_ELOG("invalid task info, level:0, parentNum:%d", parentNum);
          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
        }

        pTask->parents = taosArrayInit(parentNum, POINTER_BYTES);
        if (NULL == pTask->parents) {
          SCH_TASK_ELOG("taosArrayInit %d parents failed", parentNum);
          SCH_ERR_RET(terrno);
        }
      } else {
        // Only the root level may have parentless tasks.
        if (0 != pLevel->level) {
          SCH_TASK_ELOG("invalid task info, level:%d, parentNum:%d", pLevel->level, parentNum);
          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
        }
      }

      for (int32_t n = 0; n < parentNum; ++n) {
        SSubplan *parent = (SSubplan *)nodesListGetNode(pPlan->pParents, n);
        if (NULL == parent) {
          SCH_JOB_ELOG("fail to get the %dth parent subplan, parentNum: %d", n, parentNum);
          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
        }

        if (QUERY_NODE_PHYSICAL_SUBPLAN != nodeType(parent)) {
          SCH_JOB_ELOG("invalid subplan type for the %dth parent, level:%d, subplanNodeType:%d", n, i, nodeType(parent));
          SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
        }

        SSchTask **parentTask = taosHashGet(planToTask, &parent, POINTER_BYTES);
        if (NULL == parentTask || NULL == *parentTask) {
          SCH_TASK_ELOG("subplan parent relationship error, level:%d, taskIdx:%d, parentIdx:%d", i, m, n);
          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
        }

        if (NULL == taosArrayPush(pTask->parents, parentTask)) {
          SCH_TASK_ELOG("taosArrayPush parentTask failed, level:%d, taskIdx:%d, parentIdx:%d", i, m, n);
          SCH_ERR_RET(terrno);
        }

        SCH_TASK_DLOG("parents info, the %d parent TID 0x%" PRIx64, n, (*parentTask)->taskId);
      }

      SCH_TASK_TLOG("level:%d, parentNum:%d, childNum:%d", i, parentNum, childNum);
    }
  }

  SSchLevel *pLevel = taosArrayGet(pJob->levels, 0);
  if (NULL == pLevel) {
    SCH_JOB_ELOG("fail to get level 0 level, levelNum:%d", (int32_t)taosArrayGetSize(pJob->levels));
    SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
  }

  if (SCH_IS_QUERY_JOB(pJob)) {
    if (pLevel->taskNum > 1) {
      SCH_JOB_ELOG("invalid query plan, level:0, taskNum:%d", pLevel->taskNum);
      SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
    }

    SSchTask *pTask = taosArrayGet(pLevel->subTasks, 0);
    // Check the task itself (not the level) before dereferencing pTask->plan below.
    if (NULL == pTask) {
      SCH_JOB_ELOG("fail to get the first task in level 0, taskNum:%d", (int32_t)taosArrayGetSize(pLevel->subTasks));
      SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
    }

    if (SUBPLAN_TYPE_MODIFY != pTask->plan->subplanType || EXPLAIN_MODE_DISABLE != pJob->attr.explainMode) {
      pJob->attr.needFetch = true;
    }
  }

  return TSDB_CODE_SUCCESS;
}
298

299
/*
 * If pTask is a data-bound query task, append it to pJob->dataSrcTasks.
 * Other tasks are ignored. Returns terrno if the append fails.
 */
int32_t schAppendJobDataSrc(SSchJob *pJob, SSchTask *pTask) {
  if (SCH_IS_DATA_BIND_QRY_TASK(pTask)) {
    void *pSlot = taosArrayPush(pJob->dataSrcTasks, &pTask);
    if (NULL == pSlot) {
      SCH_ERR_RET(terrno);
    }
  }

  return TSDB_CODE_SUCCESS;
}
310

311
/*
 * Validate the query DAG and build the job's level/task structures from it.
 *
 * For each level in pDag->pSubplans, an SSchLevel is appended to pJob->levels
 * and one SSchTask per subplan is created, initialized, registered in
 * pJob->taskList (keyed by taskId) and, when data-bound, in pJob->dataSrcTasks.
 * The total task count must equal pDag->numOfSubplans. Parent/child links are
 * then resolved by schBuildTaskRalation() through a temporary subplan->task
 * hash that is always released before returning.
 *
 * Returns TSDB_CODE_SUCCESS or an error code (invalid input or allocation failure).
 */
int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
  int32_t code = 0;
  pJob->queryId = pDag->queryId;

  if (pDag->numOfSubplans <= 0) {
    SCH_JOB_ELOG("invalid subplan num:%d", pDag->numOfSubplans);
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  int32_t taskCapacity = (int32_t)SCH_GET_TASK_CAPACITY(pDag->numOfSubplans);

  pJob->dataSrcTasks = taosArrayInit(taskCapacity, POINTER_BYTES);
  if (NULL == pJob->dataSrcTasks) {
    SCH_ERR_RET(terrno);
  }

  int32_t levelNum = (int32_t)LIST_LENGTH(pDag->pSubplans);
  if (levelNum <= 0) {
    SCH_JOB_ELOG("invalid level num:%d", levelNum);
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  // Maps SSubplan* -> SSchTask*; only needed while building the task relations.
  SHashObj *planToTask = taosHashInit(
      taskCapacity,
      taosGetDefaultHashFunction(POINTER_BYTES == sizeof(int64_t) ? TSDB_DATA_TYPE_BIGINT : TSDB_DATA_TYPE_INT), false,
      HASH_NO_LOCK);
  if (NULL == planToTask) {
    // Report the capacity actually requested, not an unrelated default.
    SCH_JOB_ELOG("taosHashInit %d failed", taskCapacity);
    SCH_ERR_JRET(terrno);
  }

  pJob->levels = taosArrayInit(levelNum, sizeof(SSchLevel));
  if (NULL == pJob->levels) {
    SCH_JOB_ELOG("taosArrayInit %d failed", levelNum);
    SCH_ERR_JRET(terrno);
  }

  pJob->levelNum = levelNum;
  SCH_RESET_JOB_LEVEL_IDX(pJob);

  // Use the value produced by the atomic increment rather than re-reading the field.
  int64_t seriesId = atomic_add_fetch_64(&pJob->seriesId, 1);
  qDebug("QID:0x%" PRIx64 ", job seriesId set to SID:%" PRId64 ", levelIdx:%d", pJob->queryId, seriesId,
         pJob->levelIdx);

  SSchLevel      level = {0};
  SNodeListNode *plans = NULL;
  int32_t        taskNum = 0;
  int32_t        totalTaskNum = 0;
  SSchLevel     *pLevel = NULL;

  level.status = JOB_TASK_STATUS_INIT;

  for (int32_t i = 0; i < levelNum; ++i) {
    if (NULL == taosArrayPush(pJob->levels, &level)) {
      SCH_JOB_ELOG("taosArrayPush level failed, level:%d", i);
      SCH_ERR_JRET(terrno);
    }

    pLevel = taosArrayGet(pJob->levels, i);
    if (NULL == pLevel) {
      SCH_JOB_ELOG("fail to get the %dth level, levelNum: %d", i, levelNum);
      SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
    }

    pLevel->level = i;

    plans = (SNodeListNode *)nodesListGetNode(pDag->pSubplans, i);
    if (NULL == plans) {
      SCH_JOB_ELOG("empty level plan, level:%d", i);
      SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
    }

    if (QUERY_NODE_NODE_LIST != nodeType(plans)) {
      SCH_JOB_ELOG("invalid level plan, level:%d, planNodeType:%d", i, nodeType(plans));
      SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
    }

    taskNum = (int32_t)LIST_LENGTH(plans->pNodeList);
    if (taskNum <= 0) {
      SCH_JOB_ELOG("invalid level plan number:%d, level:%d", taskNum, i);
      SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
    }

    totalTaskNum += taskNum;
    if (totalTaskNum > pDag->numOfSubplans) {
      SCH_JOB_ELOG("current totalTaskNum %d is bigger than numOfSubplans %d, level:%d", totalTaskNum,
                   pDag->numOfSubplans, i);
      SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
    }

    pLevel->taskNum = taskNum;

    // Capacity is exactly taskNum, so pushes below never reallocate and the
    // task pointers stored in the hash tables stay valid.
    pLevel->subTasks = taosArrayInit(taskNum, sizeof(SSchTask));
    if (NULL == pLevel->subTasks) {
      SCH_JOB_ELOG("taosArrayInit %d failed", taskNum);
      SCH_ERR_JRET(terrno);
    }

    for (int32_t n = 0; n < taskNum; ++n) {
      SSubplan *plan = (SSubplan *)nodesListGetNode(plans->pNodeList, n);

      SCH_ERR_JRET(schValidateSubplan(pJob, plan, pLevel->level, n, taskNum));

      SCH_SET_JOB_TYPE(pJob, plan->subplanType);

      SSchTask  task = {0};
      SSchTask *pTask = taosArrayPush(pLevel->subTasks, &task);
      if (NULL == pTask) {
        // pTask is NULL here, so log with the job-level macro.
        SCH_JOB_ELOG("taosArrayPush task to level failed, level:%d, taskIdx:%d", pLevel->level, n);
        SCH_ERR_JRET(terrno);
      }

      SCH_ERR_JRET(schInitTask(pJob, pTask, plan, pLevel));

      SCH_ERR_JRET(schAppendJobDataSrc(pJob, pTask));

      code = taosHashPut(planToTask, &plan, POINTER_BYTES, &pTask, POINTER_BYTES);
      if (0 != code) {
        SCH_TASK_ELOG("taosHashPut to planToTask failed, taskIdx:%d, error:%s", n, tstrerror(code));
        SCH_ERR_JRET(code);
      }

      code = taosHashPut(pJob->taskList, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
      if (0 != code) {
        SCH_TASK_ELOG("taosHashPut to taskList failed, taskIdx:%d, error:%s", n, tstrerror(code));
        SCH_ERR_JRET(code);
      }

      ++pJob->taskNum;
    }

    SCH_JOB_TLOG("level %d initialized, taskNum:%d", i, taskNum);
  }

  if (totalTaskNum != pDag->numOfSubplans) {
    SCH_JOB_ELOG("totalTaskNum %d mis-match with numOfSubplans %d", totalTaskNum, pDag->numOfSubplans);
    SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  SCH_ERR_JRET(schBuildTaskRalation(pJob, planToTask));

_return:

  if (planToTask) {
    taosHashCleanup(planToTask);
  }

  SCH_RET(code);
}
457

458
/*
 * Fill pRes with the job's current error code, row count and accumulated
 * execution result. Ownership of the result payload moves to pRes: the job's
 * own execRes.res pointer is cleared under resLock.
 */
void schDumpJobExecRes(SSchJob *pJob, SExecResult *pRes) {
  pRes->code = atomic_load_32(&pJob->errCode);
  pRes->numOfRows = pJob->resNumOfRows;

  SCH_LOCK(SCH_WRITE, &pJob->resLock);
  SExecResult *pSrc = &pJob->execRes;
  pRes->res = pSrc->res;
  pRes->msgType = pSrc->msgType;
  pRes->numOfBytes = pSrc->numOfBytes;
  pSrc->res = NULL;  // the payload now belongs to pRes
  SCH_UNLOCK(SCH_WRITE, &pJob->resLock);

  SCH_JOB_DLOG("exec result dumped, code:%s", tstrerror(pRes->code));
}
471

472
/*
 * Take the pending fetch result out of the job and return it via *pData.
 *
 * Under resLock: marks the job as fetched, switches the job to SUCC if the
 * pending response is flagged completed, then atomically detaches fetchRes.
 * If nothing was pending, *pData receives a freshly allocated empty
 * SRetrieveTableRsp with completed = 1.
 * Returns TSDB_CODE_SUCCESS or the error from the status switch / allocation.
 */
int32_t schDumpJobFetchRes(SSchJob *pJob, void **pData) {
  int32_t code = 0;

  SCH_LOCK(SCH_WRITE, &pJob->resLock);

  pJob->fetched = true;

  if (NULL != pJob->fetchRes && ((SRetrieveTableRsp *)pJob->fetchRes)->completed) {
    SCH_ERR_JRET(schSwitchJobStatus(pJob, JOB_TASK_STATUS_SUCC, NULL));
  }

  // Detach fetchRes atomically, retrying if it changes between load and swap.
  void *pTaken = NULL;
  do {
    pTaken = atomic_load_ptr(&pJob->fetchRes);
  } while (pTaken != atomic_val_compare_exchange_ptr(&pJob->fetchRes, pTaken, NULL));
  *pData = pTaken;

  if (NULL == pTaken) {
    SRetrieveTableRsp *pEmptyRsp = (SRetrieveTableRsp *)taosMemoryCalloc(1, sizeof(SRetrieveTableRsp));
    if (NULL == pEmptyRsp) {
      SCH_JOB_ELOG("malloc SRetrieveTableRsp %d failed, code:%x", (int32_t)sizeof(SRetrieveTableRsp), terrno);
      SCH_ERR_JRET(terrno);
    }

    pEmptyRsp->completed = 1;
    *pData = pEmptyRsp;
    SCH_JOB_DLOG("empty res and set query complete, code:%x", code);
  }

  SCH_JOB_DLOG("fetch done, totalRows:%" PRId64, pJob->resNumOfRows);

_return:

  SCH_UNLOCK(SCH_WRITE, &pJob->resLock);

  return code;
}
513

514
/*
 * Deliver the job's execution result to the user's async exec callback.
 * A new SExecResult is allocated, filled by schDumpJobExecRes(), and passed to
 * userRes.execFp together with the job's current error code.
 * Returns terrno if the result container cannot be allocated (the callback
 * is not invoked in that case), TSDB_CODE_SUCCESS otherwise.
 */
int32_t schNotifyUserExecRes(SSchJob *pJob) {
  SExecResult *pExecRes = taosMemoryCalloc(1, sizeof(SExecResult));
  if (NULL == pExecRes) {
    qError("malloc execResult %d failed, error: %x", (int32_t)sizeof(SExecResult), terrno);
    SCH_RET(terrno);
  }

  schDumpJobExecRes(pJob, pExecRes);

  SCH_JOB_TLOG("sch start to invoke exec cb, code:%s", tstrerror(pJob->errCode));
  int32_t cbCode = atomic_load_32(&pJob->errCode);
  (*pJob->userRes.execFp)(pExecRes, pJob->userRes.cbParam, cbCode);
  SCH_JOB_TLOG("sch end from exec cb, code:%s", tstrerror(pJob->errCode));

  return TSDB_CODE_SUCCESS;
}
529

530
/*
 * Deliver the pending fetch result to the user's async fetch callback.
 *
 * If dumping the fetch result fails, that error becomes the job's error code
 * unless the job already has one. The callback always runs, receiving the
 * (possibly NULL) result and the job's current error code.
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t schNotifyUserFetchRes(SSchJob *pJob) {
  void *pRes = NULL;

  int32_t code = schDumpJobFetchRes(pJob, &pRes);
  if (TSDB_CODE_SUCCESS != code) {
    // Install the error only if none is recorded yet. A single CAS avoids the
    // load-then-store race that could overwrite a concurrently set error.
    (void)atomic_val_compare_exchange_32(&pJob->errCode, TSDB_CODE_SUCCESS, code);
  }

  SCH_JOB_DLOG("sch start to invoke fetch cb, code:%s", tstrerror(pJob->errCode));
  (*pJob->userRes.fetchFp)(pRes, pJob->userRes.cbParam, atomic_load_32(&pJob->errCode));
  SCH_JOB_DLOG("sch end from fetch cb, code:%s", tstrerror(pJob->errCode));

  return TSDB_CODE_SUCCESS;
}
544

545
/*
 * Wake whoever is waiting on the job's current operation.
 *
 * With opStatus.lock held, the current op is checked. If there is none, or it
 * does not match a non-NULL op argument, only a log line is written. Otherwise
 * the lock is released and, depending on the op kind, the sync waiter's
 * semaphore is posted or the async exec/fetch user callback is invoked.
 */
void schPostJobRes(SSchJob *pJob, SCH_OP_TYPE op) {
  int32_t code = 0;
  SCH_LOCK(SCH_WRITE, &pJob->opStatus.lock);

  if (SCH_OP_NULL == pJob->opStatus.op) {
    SCH_JOB_TLOG("job not in any operation, no need to post job res, status:%s", jobTaskStatusStr(pJob->status));
    SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);
    return;
  }

  if (SCH_OP_NULL != op && op != pJob->opStatus.op) {
    SCH_JOB_ELOG("job in operation %s mis-match with expected %s", schGetOpStr(pJob->opStatus.op), schGetOpStr(op));
    SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);
    return;
  }

  if (SCH_JOB_IN_SYNC_OP(pJob)) {
    SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);
    code = tsem_post(&pJob->rspSem);
    if (TSDB_CODE_SUCCESS != code) {
      SCH_JOB_ELOG("tsem_post failed for syncOp, error:%s", tstrerror(code));
    }
    return;
  }

  if (SCH_JOB_IN_ASYNC_EXEC_OP(pJob)) {
    SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);
    (void)schNotifyUserExecRes(pJob);  // best effort, error intentionally ignored
    return;
  }

  if (SCH_JOB_IN_ASYNC_FETCH_OP(pJob)) {
    SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);
    (void)schNotifyUserFetchRes(pJob);  // best effort, error intentionally ignored
    return;
  }

  SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);
  SCH_JOB_ELOG("job not in any operation, status:%s", jobTaskStatusStr(pJob->status));
}
582

583
/*
 * Handle a job failure: record errCode (unless it is the IGNORE sentinel) and
 * post the job result to any waiter. Always returns TSDB_CODE_SCH_IGNORE_ERROR.
 */
int32_t schProcessOnJobFailure(SSchJob *pJob, int32_t errCode) {
  if (TSDB_CODE_SCH_IGNORE_ERROR != errCode) {
    schUpdateJobErrCode(pJob, errCode);

    int32_t jobCode = atomic_load_32(&pJob->errCode);
    if (TSDB_CODE_SUCCESS != jobCode) {
      SCH_JOB_DLOG("job failed with error %s", tstrerror(jobCode));
    }

    schPostJobRes(pJob, 0);

    SCH_RET(TSDB_CODE_SCH_IGNORE_ERROR);
  }

  schPostJobRes(pJob, 0);
  return TSDB_CODE_SCH_IGNORE_ERROR;
}
600

601
/*
 * Switch the job to FAIL with errCode, unless errCode is the IGNORE sentinel.
 * Always returns TSDB_CODE_SCH_IGNORE_ERROR.
 */
int32_t schHandleJobFailure(SSchJob *pJob, int32_t errCode) {
  if (TSDB_CODE_SCH_IGNORE_ERROR != errCode) {
    // Best effort: the result of the status switch is intentionally discarded.
    (void)schSwitchJobStatus(pJob, JOB_TASK_STATUS_FAIL, &errCode);
  }

  return TSDB_CODE_SCH_IGNORE_ERROR;
}
610

611
/*
 * Handle a dropped job the same way as a failed one.
 */
int32_t schProcessOnJobDropped(SSchJob *pJob, int32_t errCode) {
  int32_t code = schProcessOnJobFailure(pJob, errCode);
  SCH_RET(code);
}
612

613
/*
 * Switch the job to DROP with errCode, unless errCode is the IGNORE sentinel.
 * Always returns TSDB_CODE_SCH_IGNORE_ERROR.
 */
int32_t schHandleJobDrop(SSchJob *pJob, int32_t errCode) {
  if (TSDB_CODE_SCH_IGNORE_ERROR != errCode) {
    // Best effort: the result of the status switch is intentionally discarded.
    (void)schSwitchJobStatus(pJob, JOB_TASK_STATUS_DROP, &errCode);
  }

  return TSDB_CODE_SCH_IGNORE_ERROR;
}
622

623
/*
 * On partial success, launch the fetch task if a fetch is in progress;
 * otherwise post the job result to the waiter.
 */
int32_t schProcessOnJobPartialSuccess(SSchJob *pJob) {
  if (!schChkCurrentOp(pJob, SCH_OP_FETCH, -1)) {
    schPostJobRes(pJob, 0);
    return TSDB_CODE_SUCCESS;
  }

  SCH_ERR_RET(schLaunchFetchTask(pJob));
  return TSDB_CODE_SUCCESS;
}
632

633
/*
 * Notify the waiter of a FETCH operation that data is available.
 */
void schProcessOnDataFetched(SSchJob *pJob) {
  schPostJobRes(pJob, SCH_OP_FETCH);
}
634

635
/*
 * Store an explain response as the job's fetch result.
 *
 * The row count is decoded from big-endian and stored in resNumOfRows, pRsp
 * becomes pJob->fetchRes, and pTask is marked SUCC. For non-insert jobs the
 * fetch waiter is notified.
 */
int32_t schProcessOnExplainDone(SSchJob *pJob, SSchTask *pTask, SRetrieveTableRsp *pRsp) {
  int64_t rowNum = (int64_t)htobe64(pRsp->numOfRows);
  SCH_TASK_DLOG("got explain rsp, rows:%" PRId64 ", complete:%d", rowNum, pRsp->completed);

  atomic_store_64(&pJob->resNumOfRows, rowNum);
  atomic_store_ptr(&pJob->fetchRes, pRsp);

  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCC);

  if (SCH_IS_INSERT_JOB(pJob)) {
    return TSDB_CODE_SUCCESS;
  }

  schProcessOnDataFetched(pJob);
  return TSDB_CODE_SUCCESS;
}
}
649

650
/*
 * After pTask finishes, advance a query job to the next level toward the root
 * once every task in pTask's level is done.
 *
 * pJob->levelIdx is decremented, and every task on the new level that has no
 * children and has not been launched yet is scheduled with
 * schDelayLaunchTask(). Non-query jobs are left untouched.
 *
 * NOTE(review): if two completions of the same level both observe
 * doneNum == taskNum, levelIdx would be decremented twice. Confirm that
 * callers serialize this.
 */
int32_t schLaunchJobLowerLevel(SSchJob *pJob, SSchTask *pTask) {
  if (!SCH_IS_QUERY_JOB(pJob)) {
    return TSDB_CODE_SUCCESS;
  }

  SSchLevel *pLevel = pTask->level;
  int32_t    doneNum = atomic_load_32(&pLevel->taskExecDoneNum);
  if (doneNum != pLevel->taskNum) {
    return TSDB_CODE_SUCCESS;
  }

  // Use the value produced by the atomic decrement instead of re-reading the field.
  int32_t levelIdx = atomic_sub_fetch_32(&pJob->levelIdx, 1);

  pLevel = taosArrayGet(pJob->levels, levelIdx);
  if (NULL == pLevel) {
    SCH_JOB_ELOG("fail to get the %dth level, levelNum:%d", levelIdx, (int32_t)taosArrayGetSize(pJob->levels));
    SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
  }

  for (int32_t i = 0; i < pLevel->taskNum; ++i) {
    // Distinct name so the pTask parameter is not shadowed.
    SSchTask *pLowerTask = taosArrayGet(pLevel->subTasks, i);
    if (NULL == pLowerTask) {
      SCH_JOB_ELOG("fail to get the %dth task in level %d, taskNum:%d", i, pLevel->level, pLevel->taskNum);
      SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
    }

    // Tasks with children are launched when their children complete.
    if (pLowerTask->children && taosArrayGetSize(pLowerTask->children) > 0) {
      continue;
    }

    if (SCH_TASK_ALREADY_LAUNCHED(pLowerTask)) {
      continue;
    }

    SCH_ERR_RET(schDelayLaunchTask(pJob, pLowerTask));
  }

  return TSDB_CODE_SUCCESS;
}
687

688
/*
 * Merge the table-version info carried by a query response into the job's
 * execution result.
 *
 * If rsp->tbVerInfo is non-NULL, its STbVerInfo entries are appended (under
 * resLock) to pJob->execRes.res, which is created on first use. The array is
 * then destroyed and rsp->tbVerInfo is reset to NULL, so the response holds
 * no dangling pointer. execRes.msgType is set to TDMT_SCH_QUERY.
 * Returns terrno on allocation/append failure; rsp->tbVerInfo is left intact
 * in that case.
 */
int32_t schSaveJobExecRes(SSchJob *pJob, SQueryTableRsp *rsp) {
  if (NULL == rsp->tbVerInfo) {
    return TSDB_CODE_SUCCESS;
  }

  SCH_LOCK(SCH_WRITE, &pJob->resLock);

  if (NULL == pJob->execRes.res) {
    pJob->execRes.res = taosArrayInit(pJob->taskNum, sizeof(STbVerInfo));
    if (NULL == pJob->execRes.res) {
      SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
      SCH_ERR_RET(terrno);
    }
  }

  // Skip empty arrays: taosArrayGet(.., 0) would yield NULL as the batch source.
  size_t verNum = taosArrayGetSize(rsp->tbVerInfo);
  if (verNum > 0 &&
      NULL == taosArrayAddBatch((SArray *)pJob->execRes.res, taosArrayGet(rsp->tbVerInfo, 0), (int32_t)verNum)) {
    SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
    SCH_ERR_RET(terrno);
  }

  taosArrayDestroy(rsp->tbVerInfo);
  rsp->tbVerInfo = NULL;  // ownership consumed; prevent a later double free

  pJob->execRes.msgType = TDMT_SCH_QUERY;

  SCH_UNLOCK(SCH_WRITE, &pJob->resLock);

  return TSDB_CODE_SUCCESS;
}
715

716
/*
 * Look up the task with the given taskId in the job's task list.
 * On success *pTask points at it. If it is not found, an error is logged and
 * TSDB_CODE_SCH_INTERNAL_ERROR is returned.
 */
int32_t schGetTaskInJob(SSchJob *pJob, uint64_t taskId, SSchTask **pTask) {
  schGetTaskFromList(pJob->taskList, taskId, pTask);

  bool found = (NULL != *pTask);
  if (!found) {
    SCH_JOB_ELOG("task not found in job task list, taskId:0x%" PRIx64, taskId);
    SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
  }

  return TSDB_CODE_SUCCESS;
}
725

726
/*
 * Start executing the job.
 *
 * For static explain, the explain output is produced locally into fetchRes
 * and the job moves straight to PART_SUCC. Otherwise the tasks of the current
 * level (pJob->levelIdx) are launched.
 */
int32_t schLaunchJob(SSchJob *pJob) {
  if (EXPLAIN_MODE_STATIC != pJob->attr.explainMode) {
    SSchLevel *pCurLevel = taosArrayGet(pJob->levels, pJob->levelIdx);
    if (NULL == pCurLevel) {
      SCH_JOB_ELOG("fail to get the %dth level, levelNum:%d", pJob->levelIdx, (int32_t)taosArrayGetSize(pJob->levels));
      SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
    }

    SCH_ERR_RET(schLaunchLevelTasks(pJob, pCurLevel));
    return TSDB_CODE_SUCCESS;
  }

  SCH_ERR_RET(qExecStaticExplain(pJob->pDag, (SRetrieveTableRsp **)&pJob->fetchRes));
  SCH_ERR_RET(schSwitchJobStatus(pJob, JOB_TASK_STATUS_PART_SUCC, NULL));

  return TSDB_CODE_SUCCESS;
}
742

743
/*
 * Drop every task tracked in the job's execTasks hash.
 * (Succeeded/failed task lists are not maintained separately and need no dropping.)
 */
void schDropJobAllTasks(SSchJob *pJob) {
  SHashObj *pExecTasks = pJob->execTasks;
  schDropTaskInHashList(pJob, pExecTasks);
}
748

749
/*
 * Send a notification of the given type to every task in the job's execTasks
 * hash; pTask is forwarded to schNotifyTaskInHashList() unchanged.
 */
int32_t schNotifyJobAllTasks(SSchJob *pJob, SSchTask *pTask, ETaskNotifyType type) {
  int32_t code = schNotifyTaskInHashList(pJob, pJob->execTasks, type, pTask);
  SCH_RET(code);
}
752

753
void schFreeJobImpl(void *job) {
138,024✔
754
  if (NULL == job) {
138,024!
755
    return;
×
756
  }
757

758
  SSchJob *pJob = job;
138,024✔
759
  uint64_t queryId = pJob->queryId;
138,024✔
760
  int64_t  refId = pJob->refId;
138,024✔
761

762
  qTrace("QID:0x%" PRIx64 ", begin to free sch job, jobId:0x%" PRIx64 ", pointer:%p", queryId, refId, pJob);
138,024✔
763

764
  schDropJobAllTasks(pJob);
138,024✔
765

766
  int32_t numOfLevels = taosArrayGetSize(pJob->levels);
138,023✔
767
  for (int32_t i = 0; i < numOfLevels; ++i) {
284,483✔
768
    SSchLevel *pLevel = taosArrayGet(pJob->levels, i);
146,492✔
769
    if (NULL == pLevel) {
146,469!
770
      SCH_JOB_ELOG("fail to get the %dth level, levelNum:%d", i, numOfLevels);
×
771
      continue;
×
772
    }
773

774
    int32_t numOfTasks = taosArrayGetSize(pLevel->subTasks);
146,469✔
775
    for (int32_t j = 0; j < numOfTasks; ++j) {
314,874✔
776
      SSchTask *pTask = taosArrayGet(pLevel->subTasks, j);
168,412✔
777
      if (NULL == pLevel) {
168,409!
778
        SCH_JOB_ELOG("fail to get the %dth task, taskNum:%d", j, numOfTasks);
×
779
        continue;
×
780
      }
781

782
      schFreeTask(pJob, pTask);
168,409✔
783
    }
784

785
    taosArrayDestroy(pLevel->subTasks);
146,462✔
786
  }
787

788
  schFreeFlowCtrl(pJob);
137,991✔
789

790
  taosHashCleanup(pJob->execTasks);
138,001✔
791
  //  taosHashCleanup(pJob->failTasks);
792
  //  taosHashCleanup(pJob->succTasks);
793
  taosHashCleanup(pJob->taskList);
137,998✔
794

795
  taosArrayDestroy(pJob->levels);
138,012✔
796
  taosArrayDestroy(pJob->nodeList);
137,998✔
797
  taosArrayDestroy(pJob->dataSrcTasks);
138,007✔
798

799
  qExplainFreeCtx(pJob->explainCtx);
138,016✔
800

801
  destroyQueryExecRes(&pJob->execRes);
138,027✔
802

803
  qDestroyQueryPlan(pJob->pDag);
138,012✔
804
  (void)nodesReleaseAllocatorWeakRef(pJob->allocatorRefId);  // ignore error
137,947✔
805

806
  taosMemoryFreeClear(pJob->userRes.execRes);
138,020!
807
  taosMemoryFreeClear(pJob->fetchRes);
138,020!
808
  taosMemoryFreeClear(pJob->sql);
138,020!
809
  int32_t code = tsem_destroy(&pJob->rspSem);
138,017✔
810
  if (code) {
137,946!
811
    qError("tsem_destroy failed, error:%s", tstrerror(code));
×
812
  }
813
  taosMemoryFree(pJob);
137,946!
814

815
  if (refId > 0) {
137,968!
816
    int32_t jobNum = atomic_sub_fetch_32(&schMgmt.jobNum, 1);
137,972✔
817
    if (jobNum == 0) {
138,054✔
818
      schCloseJobRef();
43,427✔
819
    }
820
  }
821

822
  qTrace("QID:0x%" PRIx64 ", sch job freed, jobId:0x%" PRIx64 ", pointer:%p", queryId, refId, pJob);
138,050✔
823
}
824

825
/*
 * Drive one fetch operation of a job.
 *
 * Static-EXPLAIN jobs and EXPLAIN-of-insert jobs launch no fetch task: the
 * result is dumped directly when the current op is a synchronous FETCH,
 * otherwise it is posted via schPostJobRes(). All other jobs launch a fetch
 * task; a synchronous fetch then waits on rspSem and dumps the result into
 * pJob->userRes.fetchRes.
 */
int32_t schJobFetchRows(SSchJob *pJob) {
  bool noFetchTask =
      (pJob->attr.explainMode == EXPLAIN_MODE_STATIC) || (SCH_IS_EXPLAIN_JOB(pJob) && SCH_IS_INSERT_JOB(pJob));

  if (noFetchTask) {
    if (!schChkCurrentOp(pJob, SCH_OP_FETCH, true)) {
      schPostJobRes(pJob, SCH_OP_FETCH);
      return TSDB_CODE_SUCCESS;
    }
    SCH_RET(schDumpJobFetchRes(pJob, pJob->userRes.fetchRes));
  }

  SCH_ERR_RET(schLaunchFetchTask(pJob));

  // Asynchronous fetches are completed by the response path; nothing to wait for here.
  if (!schChkCurrentOp(pJob, SCH_OP_FETCH, true)) {
    return TSDB_CODE_SUCCESS;
  }

  SCH_JOB_DLOG("sync wait for rsp now, job status:%s", SCH_GET_JOB_STATUS_STR(pJob));
  int32_t waitCode = tsem_wait(&pJob->rspSem);
  if (waitCode) {
    qError("tsem_wait for fetch rspSem failed, error:%s", tstrerror(waitCode));
    SCH_RET(waitCode);
  }

  SCH_RET(schDumpJobFetchRes(pJob, pJob->userRes.fetchRes));
}
851

852
/*
 * Create a scheduler job for pReq and register it in schMgmt.jobRef.
 *
 * On success *pJobId receives the job's ref id and schMgmt.jobNum is
 * incremented. On failure the query plan pReq->pDag is released together with
 * the partially built job (the NULL-job path destroys it directly), so the
 * plan is owned by the scheduler from the moment this function is called.
 */
int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq) {
  int32_t  code = 0;
  SSchJob *pJob = taosMemoryCalloc(1, sizeof(SSchJob));
  if (NULL == pJob) {
    qError("QID:0x%" PRIx64 ", calloc %d failed", pReq->pDag->queryId, (int32_t)sizeof(SSchJob));
    SCH_ERR_JRET(terrno);
  }

  // Take ownership of the plan immediately so every failure path below frees it
  // through schFreeJobImpl(); assigning it later leaked the plan when e.g. the
  // SQL strdup failed.
  pJob->pDag = pReq->pDag;

  pJob->attr.explainMode = pReq->pDag->explainInfo.mode;
  pJob->attr.localExec = pReq->localReq;
  pJob->conn = *pReq->pConn;
  qInfo("QID:0x%" PRIx64 " init with pTrans:%p", pReq->pDag->queryId, pJob->conn.pTrans);

  if (pReq->sql) {
    pJob->sql = taosStrdup(pReq->sql);
    if (NULL == pJob->sql) {
      qError("QID:0x%" PRIx64 ", strdup sql %s failed", pReq->pDag->queryId, pReq->sql);
      SCH_ERR_JRET(terrno);
    }
  }

  if (pReq->allocatorRefId > 0) {
    pJob->allocatorRefId = nodesMakeAllocatorWeakRef(pReq->allocatorRefId);
    if (pJob->allocatorRefId <= 0) {
      qError("QID:0x%" PRIx64 ", nodesMakeAllocatorWeakRef failed", pReq->pDag->queryId);
      SCH_ERR_JRET(terrno);
    }
  }
  pJob->chkKillFp = pReq->chkKillFp;
  pJob->chkKillParam = pReq->chkKillParam;
  pJob->userRes.execFp = pReq->execFp;
  pJob->userRes.cbParam = pReq->cbParam;
  pJob->source = pReq->source;
  pJob->pWorkerCb = pReq->pWorkerCb;

  if (pReq->pNodeList == NULL || taosArrayGetSize(pReq->pNodeList) <= 0) {
    qTrace("QID:0x%" PRIx64 ", input exec nodeList is empty", pReq->pDag->queryId);
  } else {
    pJob->nodeList = taosArrayDup(pReq->pNodeList, NULL);
    if (NULL == pJob->nodeList) {
      qError("QID:0x%" PRIx64 ", taosArrayDup failed, origNum:%d", pReq->pDag->queryId,
             (int32_t)taosArrayGetSize(pReq->pNodeList));
      SCH_ERR_JRET(terrno);
    }
  }

  pJob->taskList = taosHashInit(SCH_GET_TASK_CAPACITY(pReq->pDag->numOfSubplans),
                                taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
  if (NULL == pJob->taskList) {
    SCH_JOB_ELOG("taosHashInit %d taskList failed", SCH_GET_TASK_CAPACITY(pReq->pDag->numOfSubplans));
    SCH_ERR_JRET(terrno);
  }

  SCH_ERR_JRET(schValidateAndBuildJob(pReq->pDag, pJob));

  if (SCH_IS_EXPLAIN_JOB(pJob)) {
    SCH_ERR_JRET(qExecExplainBegin(pReq->pDag, &pJob->explainCtx, pReq->startTs));
  }

  pJob->execTasks = taosHashInit(pReq->pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false,
                                 HASH_ENTRY_LOCK);
  if (NULL == pJob->execTasks) {
    SCH_JOB_ELOG("taosHashInit %d execTasks failed", pReq->pDag->numOfSubplans);
    SCH_ERR_JRET(terrno);
  }

  if (tsem_init(&pJob->rspSem, 0, 0)) {
    SCH_JOB_ELOG("tsem_init failed, errno:%d", ERRNO);
    SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  pJob->refId = taosAddRef(schMgmt.jobRef, pJob);
  if (pJob->refId < 0) {
    SCH_JOB_ELOG("taosAddRef job failed, error:%s", tstrerror(terrno));
    SCH_ERR_JRET(terrno);
  }

  (void)atomic_add_fetch_32(&schMgmt.jobNum, 1);

  *pJobId = pJob->refId;

  SCH_JOB_TLOG("jobId:0x%" PRIx64 ", job created", pJob->refId);

  return TSDB_CODE_SUCCESS;

_return:

  if (NULL == pJob) {
    qDestroyQueryPlan(pReq->pDag);
  } else if (pJob->refId <= 0) {
    // Not registered in jobRef: free directly (this also destroys pJob->pDag).
    schFreeJobImpl(pJob);
  } else {
    // Keep the original failure code; the result of removing the ref must not
    // turn a failed init into a success return.
    int32_t rmCode = taosRemoveRef(schMgmt.jobRef, pJob->refId);
    if (rmCode) {
      SCH_JOB_DLOG("jobId:0x%" PRIx64 ", taosRemoveRef job from jobRef, error:%s", pJob->refId, tstrerror(rmCode));
    }
  }

  SCH_RET(code);
}
953

954
/*
 * Launch a freshly initialized job.
 *
 * For synchronous requests (pReq->syncReq) the caller blocks on pJob->rspSem
 * after launching; presumably the semaphore is posted when the job result is
 * ready (TODO confirm against the response path). Launch or wait failures are
 * returned to the caller.
 */
int32_t schExecJob(SSchJob *pJob, SSchedulerReq *pReq) {
  qTrace("QID:0x%" PRIx64 ", jobId:0x%" PRIx64 ", sch job started", pReq->pDag->queryId, pJob->refId);

  SCH_ERR_RET(schLaunchJob(pJob));

  if (!pReq->syncReq) {
    SCH_JOB_DLOG("job exec done, job status:%s, jobId:0x%" PRIx64, SCH_GET_JOB_STATUS_STR(pJob), pJob->refId);
    return TSDB_CODE_SUCCESS;
  }

  SCH_JOB_DLOG("sync wait for rsp now, job status:%s", SCH_GET_JOB_STATUS_STR(pJob));
  int32_t waitCode = tsem_wait(&pJob->rspSem);
  if (waitCode) {
    qError("QID:0x%" PRIx64 ", tsem_wait sync rspSem failed, error:%s", pReq->pDag->queryId, tstrerror(waitCode));
    SCH_ERR_RET(waitCode);
  }

  SCH_JOB_DLOG("job exec done, job status:%s, jobId:0x%" PRIx64, SCH_GET_JOB_STATUS_STR(pJob), pJob->refId);
  return TSDB_CODE_SUCCESS;
}
973

974
/*
 * Report errCode straight to the user callback of an asynchronous request.
 *
 * Nothing happens for a NULL request or a synchronous one. Otherwise execFp is
 * invoked if set, else fetchFp if set, each with a NULL result and pReq->cbParam.
 */
void schDirectPostJobRes(SSchedulerReq *pReq, int32_t errCode) {
  bool isAsync = (NULL != pReq) && !pReq->syncReq;
  if (!isAsync) {
    return;
  }

  if (NULL != pReq->execFp) {
    (*pReq->execFp)(NULL, pReq->cbParam, errCode);
    return;
  }

  if (NULL != pReq->fetchFp) {
    (*pReq->fetchFp)(NULL, pReq->cbParam, errCode);
  }
}
985

986
/*
 * Check whether a job that already reached PART_SUCC (or later) may be retried.
 *
 * If the user has already fetched results, the job is marked noMoreRetry and
 * rspCode is returned. Otherwise the job status is moved back to EXEC. Jobs
 * below PART_SUCC need no reset.
 *
 * resLock is released exactly once on every path. Previously, with a
 * TSDB_CODE_SUCCESS rspCode, the "fetched" branch fell through after its own
 * unlock and unlocked the lock a second time.
 */
int32_t schChkResetJobRetry(SSchJob *pJob, int32_t rspCode) {
  if (pJob->status < JOB_TASK_STATUS_PART_SUCC) {
    return TSDB_CODE_SUCCESS;
  }

  SCH_LOCK(SCH_WRITE, &pJob->resLock);
  bool alreadyFetched = pJob->fetched;
  SCH_UNLOCK(SCH_WRITE, &pJob->resLock);

  if (alreadyFetched) {
    pJob->noMoreRetry = true;
    SCH_JOB_ELOG("already fetched while got error %s", tstrerror(rspCode));
    SCH_ERR_RET(rspCode);
  }

  SCH_ERR_RET(schUpdateJobStatus(pJob, JOB_TASK_STATUS_EXEC));

  return TSDB_CODE_SUCCESS;
}
1002

1003
/*
 * Reset every task of pJob so the whole job can be relaunched after pTask failed.
 *
 * pJob->inRetry serializes retries: it is acquired with a CAS (spinning with
 * taosUsleep(1) while another retry holds it). Failures reported by a task
 * from an older series (task seriesId < job seriesId) are ignored with
 * TSDB_CODE_SCH_IGNORE_ERROR.
 *
 * Once the flag is held, *inRetry is set to true and the caller is responsible
 * for clearing pJob->inRetry. On success the job seriesId is bumped, every task
 * is reset (redirect context, retry state, subplan execution nodes) and the
 * job level index is reset.
 */
int32_t schResetJobForRetry(SSchJob *pJob, SSchTask *pTask, int32_t rspCode, bool *inRetry) {
  int32_t code = 0;
  int32_t numOfLevels = taosArrayGetSize(pJob->levels);

  while (true) {
    if (pTask->seriesId < atomic_load_64(&pJob->seriesId)) {
      SCH_TASK_DLOG("task sId %" PRId64 " is smaller than current job sId %" PRId64, pTask->seriesId, pJob->seriesId);
      return TSDB_CODE_SCH_IGNORE_ERROR;
    }

    int8_t origInRetry = atomic_val_compare_exchange_8(&pJob->inRetry, 0, 1);
    if (0 != origInRetry) {
      SCH_JOB_DLOG("job already in retry, origInRetry: %d", pJob->inRetry);
      taosUsleep(1);
      continue;
    }

    if (pTask->seriesId < atomic_load_64(&pJob->seriesId)) {
      SCH_TASK_DLOG("task sId:%" PRId64 " is smaller than current job sId:%" PRId64, pTask->seriesId, pJob->seriesId);
      // We won the CAS above but *inRetry is still false, so the caller will not
      // clear the flag. Release it here; otherwise later retries for tasks of the
      // current series would spin forever in the CAS loop.
      atomic_store_8(&pJob->inRetry, 0);
      return TSDB_CODE_SCH_IGNORE_ERROR;
    }

    break;
  }

  *inRetry = true;
  SCH_ERR_RET(schChkResetJobRetry(pJob, rspCode));

  (void)atomic_add_fetch_64(&pJob->seriesId, 1);

  for (int32_t i = 0; i < numOfLevels; ++i) {
    SSchLevel *pLevel = taosArrayGet(pJob->levels, i);
    if (NULL == pLevel) {
      SCH_JOB_ELOG("fail to get the %dth level, levelNum:%d", i, numOfLevels);
      SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
    }

    pLevel->taskExecDoneNum = 0;
    pLevel->taskLaunchedNum = 0;

    int32_t numOfTasks = taosArrayGetSize(pLevel->subTasks);
    for (int32_t j = 0; j < numOfTasks; ++j) {
      SSchTask *p = taosArrayGet(pLevel->subTasks, j);
      if (NULL == p) {
        SCH_JOB_ELOG("fail to get the %dth task in level %d, taskNum:%d", j, i, numOfTasks);
        SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
      }

      SCH_LOCK_TASK(p);

      code = schChkUpdateRedirectCtx(pJob, p, NULL, rspCode);
      if (TSDB_CODE_SUCCESS != code) {
        SCH_UNLOCK_TASK(p);
        SCH_RET(code);
      }

      schResetTaskForRetry(pJob, p);

      SCH_LOCK(SCH_WRITE, &p->planLock);
      qClearSubplanExecutionNode(p->plan);
      SCH_UNLOCK(SCH_WRITE, &p->planLock);

      SCH_UNLOCK_TASK(p);
    }
  }

  SCH_RESET_JOB_LEVEL_IDX(pJob);
  SCH_JOB_DLOG("update job sId to %" PRId64 ", levelIdx:%d", pJob->seriesId, pJob->levelIdx);

  return TSDB_CODE_SUCCESS;
}
1074

1075
/*
 * Retry the whole job after pTask received rspCode.
 *
 * Expects pTask to be locked on entry (it is unlocked first) and returns with
 * it locked again on every path. The response buffers in pMsg are freed.
 * On failure the error is routed through schProcessOnTaskFailure(). The job's
 * inRetry flag is cleared whenever schResetJobForRetry() reported that this
 * call acquired it.
 */
int32_t schHandleJobRetry(SSchJob *pJob, SSchTask *pTask, SDataBuf *pMsg, int32_t rspCode) {
  bool    inRetry = false;
  int32_t code = 0;

  // The response that triggered the retry is not used further.
  taosMemoryFreeClear(pMsg->pData);
  taosMemoryFreeClear(pMsg->pEpSet);

  // Drop the caller's task lock while the job is reset; schResetJobForRetry()
  // locks each task of the job in turn (presumably including pTask itself).
  SCH_UNLOCK_TASK(pTask);
  SCH_TASK_DLOG("start to redirect all job tasks cause of error:%s from task TID:0x%" PRIx64, tstrerror(rspCode),
                pTask->taskId);

  SCH_ERR_JRET(schResetJobForRetry(pJob, pTask, rspCode, &inRetry));
  SCH_ERR_JRET(schLaunchJob(pJob));

  // Success: re-take the caller's lock and release the job-level retry flag.
  SCH_LOCK_TASK(pTask);
  atomic_store_8(&pJob->inRetry, 0);

  SCH_RET(code);

_return:
  // Failure: re-take the caller's lock and report the error on pTask.
  SCH_LOCK_TASK(pTask);
  code = schProcessOnTaskFailure(pJob, pTask, code);

  if (inRetry) {
    atomic_store_8(&pJob->inRetry, 0);
  }
  SCH_RET(code);
}
1103

1104
/*
 * Under a read lock on pJob->opStatus.lock, test whether the job's current
 * operation is op. A non-negative sync additionally requires
 * opStatus.syncReq == sync; a negative sync ignores the sync flag.
 */
bool schChkCurrentOp(SSchJob *pJob, int32_t op, int8_t sync) {
  SCH_LOCK(SCH_READ, &pJob->opStatus.lock);

  bool matched = (pJob->opStatus.op == op);
  if (matched && sync >= 0) {
    matched = (pJob->opStatus.syncReq == sync);
  }

  SCH_UNLOCK(SCH_READ, &pJob->opStatus.lock);

  return matched;
}
1116

1117
/*
 * Finish a user-level operation on a job.
 *
 * For synchronous EXEC/FETCH requests the job's operation slot is reset from
 * type back to SCH_OP_NULL (an error is logged if the slot held something
 * else). A synchronous EXEC also copies the execution result into
 * pReq->pExecRes. GET_STATUS never reports failure. A remaining non-zero
 * errCode is passed to schHandleJobFailure().
 */
void schProcessOnOpEnd(SSchJob *pJob, SCH_OP_TYPE type, SSchedulerReq *pReq, int32_t errCode) {
  if (SCH_OP_GET_STATUS == type) {
    errCode = TSDB_CODE_SUCCESS;
  } else if (SCH_OP_EXEC == type || SCH_OP_FETCH == type) {
    bool isSync = (NULL != pReq) && pReq->syncReq;
    if (isSync) {
      SCH_LOCK(SCH_WRITE, &pJob->opStatus.lock);
      int32_t prevOp = atomic_val_compare_exchange_32(&pJob->opStatus.op, type, SCH_OP_NULL);
      if (SCH_OP_NULL == prevOp || prevOp != type) {
        SCH_JOB_ELOG("job not in %s operation, op:%s, status:%s", schGetOpStr(type), schGetOpStr(prevOp),
                     jobTaskStatusStr(pJob->status));
      }
      SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);

      if (SCH_OP_EXEC == type) {
        schDumpJobExecRes(pJob, pReq->pExecRes);
      }
    }
  }

  if (errCode) {
    (void)schHandleJobFailure(pJob, errCode);  // handle internal
  }

  SCH_JOB_TLOG("job end %s operation with code:%s", schGetOpStr(type), tstrerror(errCode));
}
159,636✔
1157

1158
/*
 * Begin a user-level operation on a job.
 *
 * GET_STATUS only validates that the job is initialized and has levels.
 * EXEC and FETCH claim the job's single operation slot via CAS from
 * SCH_OP_NULL. If the slot is busy, the request is failed through
 * schDirectPostJobRes() and TSDB_CODE_APP_ERROR is returned. FETCH also
 * records the user's result buffer/callback and checks that the job is in a
 * fetchable status. Finally, an operation on a job that needs to stop returns
 * TSDB_CODE_SCH_IGNORE_ERROR.
 */
int32_t schProcessOnOpBegin(SSchJob *pJob, SCH_OP_TYPE type, SSchedulerReq *pReq) {
  int8_t status = SCH_GET_JOB_STATUS(pJob);

  if (SCH_OP_GET_STATUS == type) {
    bool notExecutable = pJob->status < JOB_TASK_STATUS_INIT || pJob->levelNum <= 0 || NULL == pJob->levels;
    if (notExecutable) {
      qDebug("jobId:0x%" PRIx64 ", job not initialized or not executable job", pJob->refId);
      SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
    }
    return TSDB_CODE_SUCCESS;
  }

  if (SCH_OP_EXEC != type && SCH_OP_FETCH != type) {
    SCH_JOB_ELOG("unknown operation type %d", type);
    SCH_ERR_RET(TSDB_CODE_APP_ERROR);
  }

  bool isFetch = (SCH_OP_FETCH == type);

  SCH_LOCK(SCH_WRITE, &pJob->opStatus.lock);
  if (SCH_OP_NULL != atomic_val_compare_exchange_32(&pJob->opStatus.op, SCH_OP_NULL, type)) {
    SCH_JOB_ELOG("job already in %s operation", schGetOpStr(pJob->opStatus.op));
    SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);
    schDirectPostJobRes(pReq, TSDB_CODE_APP_ERROR);
    SCH_ERR_RET(TSDB_CODE_APP_ERROR);
  }

  SCH_JOB_DLOG("job start %s operation", schGetOpStr(pJob->opStatus.op));

  if (isFetch) {
    pJob->userRes.fetchRes = pReq->pFetchRes;
    pJob->userRes.fetchFp = pReq->fetchFp;
    pJob->userRes.cbParam = pReq->cbParam;
  }

  pJob->opStatus.syncReq = pReq->syncReq;
  SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);

  if (isFetch) {
    if (!SCH_JOB_NEED_FETCH(pJob)) {
      SCH_JOB_ELOG("no need to fetch data, status:%s", SCH_GET_JOB_STATUS_STR(pJob));
      SCH_ERR_RET(TSDB_CODE_APP_ERROR);
    }

    // status was sampled on entry, before the operation slot was claimed.
    if (status != JOB_TASK_STATUS_PART_SUCC && status != JOB_TASK_STATUS_FETCH) {
      SCH_JOB_ELOG("job status error for fetch, status:%s", jobTaskStatusStr(status));
      SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
    }
  }

  if (schJobNeedToStop(pJob, &status)) {
    SCH_JOB_ELOG("abort op %s cause of job need to stop, status:%s", schGetOpStr(type), jobTaskStatusStr(status));
    SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR);
  }

  return TSDB_CODE_SUCCESS;
}
1224

1225
/*
 * Counterpart of schProcessOnCbBegin(): unlock the task (if any), report a
 * non-zero errCode through schHandleJobFailure(), and release the job
 * reference (if any). Errors from the failure handler and the release are
 * deliberately ignored.
 */
void schProcessOnCbEnd(SSchJob *pJob, SSchTask *pTask, int32_t errCode) {
  if (NULL != pTask) {
    SCH_UNLOCK_TASK(pTask);
  }

  if (TSDB_CODE_SUCCESS != errCode) {
    (void)schHandleJobFailure(pJob, errCode);  // ignore error
  }

  if (NULL != pJob) {
    (void)schReleaseJob(pJob->refId);  // ignore error
  }
}
181,715✔
1238

1239
/*
 * Prepare to handle an RPC callback for task tId of job rId.
 *
 * Acquires the job reference, rejects jobs that need to stop (IGNORE_ERROR),
 * looks the task up and locks it. On success *job and *task are set and the
 * caller must later call schProcessOnCbEnd(). On failure any acquired lock and
 * job reference are released before returning the error.
 */
int32_t schProcessOnCbBegin(SSchJob **job, SSchTask **task, uint64_t qId, int64_t rId, uint64_t tId) {
  int32_t code = 0;
  int8_t  status = 0;

  SSchTask *pTask = NULL;
  SSchJob  *pJob = NULL;

  (void)schAcquireJob(rId, &pJob);
  if (NULL == pJob) {
    // Separator added after the TID value; it was previously glued to "job no exist".
    qWarn("QID:0x%" PRIx64 ", TID:0x%" PRIx64 ", job no exist, may be dropped, jobId:0x%" PRIx64, qId, tId, rId);
    SCH_ERR_RET(TSDB_CODE_QRY_JOB_NOT_EXIST);
  }

  if (schJobNeedToStop(pJob, &status)) {
    SCH_TASK_DLOG("will not do further processing cause of job status %s", jobTaskStatusStr(status));
    SCH_ERR_JRET(TSDB_CODE_SCH_IGNORE_ERROR);
  }

  SCH_ERR_JRET(schGetTaskInJob(pJob, tId, &pTask));

  SCH_LOCK_TASK(pTask);

  *job = pJob;
  *task = pTask;

  return TSDB_CODE_SUCCESS;

_return:

  if (pTask) {
    SCH_UNLOCK_TASK(pTask);
  }
  if (pJob) {
    (void)schReleaseJob(rId);  // ignore error
  }

  SCH_RET(code);
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc