• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3531

19 Nov 2024 10:42AM UTC coverage: 60.213% (-0.006%) from 60.219%
#3531

push

travis-ci

web-flow
Merge pull request #28777 from taosdata/fix/3.0/TD-32366

fix:TD-32366/stmt add geometry datatype check

118529 of 252344 branches covered (46.97%)

Branch coverage included in aggregate %.

7 of 48 new or added lines in 3 files covered. (14.58%)

2282 existing lines in 115 files now uncovered.

199096 of 275161 relevant lines covered (72.36%)

6067577.83 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

35.03
/source/libs/scheduler/src/schJob.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "catalog.h"
17
#include "command.h"
18
#include "query.h"
19
#include "schInt.h"
20
#include "tmsg.h"
21
#include "tref.h"
22
#include "trpc.h"
23

24
void schUpdateJobErrCode(SSchJob *pJob, int32_t errCode) {
2,458,041✔
25
  if (TSDB_CODE_SUCCESS == errCode) {
2,458,041✔
26
    return;
1,684,444✔
27
  }
28

29
  int32_t origCode = atomic_load_32(&pJob->errCode);
773,597✔
30
  if (TSDB_CODE_SUCCESS == origCode) {
773,597✔
31
    if (origCode == atomic_val_compare_exchange_32(&pJob->errCode, origCode, errCode)) {
771,030!
32
      goto _return;
771,030✔
33
    }
34

35
    origCode = atomic_load_32(&pJob->errCode);
×
36
  }
37

38
  if (NEED_CLIENT_HANDLE_ERROR(origCode)) {
2,567!
39
    return;
263✔
40
  }
41

42
  if (NEED_CLIENT_HANDLE_ERROR(errCode)) {
2,304!
43
    atomic_store_32(&pJob->errCode, errCode);
×
44
    goto _return;
×
45
  }
46

47
  return;
2,304✔
48

49
_return:
771,030✔
50

51
  SCH_JOB_DLOG("job errCode updated to %s", tstrerror(errCode));
771,030✔
52
}
53

54
bool schJobDone(SSchJob *pJob) {
11,026,564✔
55
  int8_t status = SCH_GET_JOB_STATUS(pJob);
11,026,564✔
56

57
  return (status == JOB_TASK_STATUS_FAIL || status == JOB_TASK_STATUS_DROP || status == JOB_TASK_STATUS_SUCC);
11,026,913!
58
}
59

60
FORCE_INLINE bool schJobNeedToStop(SSchJob *pJob, int8_t *pStatus) {
3,522,034✔
61
  int8_t status = SCH_GET_JOB_STATUS(pJob);
6,866,591✔
62
  if (pStatus) {
11,026,670!
63
    *pStatus = status;
11,028,133✔
64
  }
65

66
  if (schJobDone(pJob)) {
11,026,670!
67
    return true;
3,024✔
68
  }
69

70
  if (pJob->chkKillFp && (*pJob->chkKillFp)(pJob->chkKillParam)) {
11,024,561!
71
    schUpdateJobErrCode(pJob, TSDB_CODE_TSC_QUERY_KILLED);
×
72
    return true;
2✔
73
  }
74

75
  return false;
11,026,105✔
76
}
77

78
int32_t schUpdateJobStatus(SSchJob *pJob, int8_t newStatus) {
11,412,302✔
79
  int32_t code = 0;
11,412,302✔
80

81
  int8_t oriStatus = 0;
11,412,302✔
82

83
  while (true) {
84
    oriStatus = SCH_GET_JOB_STATUS(pJob);
11,412,302✔
85

86
    if (oriStatus == newStatus) {
11,412,294✔
87
      if (JOB_TASK_STATUS_FETCH == newStatus) {
186,794✔
88
        return code;
186,780✔
89
      }
90

91
      SCH_ERR_JRET(TSDB_CODE_SCH_IGNORE_ERROR);
14!
92
    }
93

94
    switch (oriStatus) {
11,225,500!
95
      case JOB_TASK_STATUS_NULL:
2,454,977✔
96
        if (newStatus != JOB_TASK_STATUS_INIT) {
2,454,977!
97
          SCH_ERR_JRET(TSDB_CODE_APP_ERROR);
×
98
        }
99

100
        break;
2,454,977✔
101
      case JOB_TASK_STATUS_INIT:
2,454,993✔
102
        if (newStatus != JOB_TASK_STATUS_EXEC && newStatus != JOB_TASK_STATUS_DROP) {
2,454,993!
103
          SCH_ERR_JRET(TSDB_CODE_APP_ERROR);
×
104
        }
105

106
        break;
2,454,993✔
107
      case JOB_TASK_STATUS_EXEC:
2,454,994✔
108
        if (newStatus != JOB_TASK_STATUS_PART_SUCC && newStatus != JOB_TASK_STATUS_FAIL &&
2,454,994!
109
            newStatus != JOB_TASK_STATUS_DROP) {
110
          SCH_ERR_JRET(TSDB_CODE_APP_ERROR);
×
111
        }
112

113
        break;
2,454,994✔
114
      case JOB_TASK_STATUS_PART_SUCC:
2,453,290✔
115
        if (newStatus != JOB_TASK_STATUS_FAIL && newStatus != JOB_TASK_STATUS_SUCC &&
2,453,290!
116
            newStatus != JOB_TASK_STATUS_DROP && newStatus != JOB_TASK_STATUS_EXEC &&
702,787!
117
            newStatus != JOB_TASK_STATUS_FETCH) {
118
          SCH_ERR_JRET(TSDB_CODE_APP_ERROR);
×
119
        }
120

121
        break;
2,453,290✔
122
      case JOB_TASK_STATUS_FETCH:
702,784✔
123
        if (newStatus != JOB_TASK_STATUS_FAIL && newStatus != JOB_TASK_STATUS_SUCC &&
702,784!
124
            newStatus != JOB_TASK_STATUS_DROP && newStatus != JOB_TASK_STATUS_EXEC &&
×
125
            newStatus != JOB_TASK_STATUS_FETCH) {
126
          SCH_ERR_JRET(TSDB_CODE_APP_ERROR);
×
127
        }
128

129
        break;
702,784✔
130
      case JOB_TASK_STATUS_SUCC:
704,490✔
131
      case JOB_TASK_STATUS_FAIL:
132
        if (newStatus != JOB_TASK_STATUS_DROP) {
704,490!
UNCOV
133
          SCH_ERR_JRET(TSDB_CODE_APP_ERROR);
×
134
        }
135

136
        break;
704,490✔
137
      case JOB_TASK_STATUS_DROP:
×
138
        SCH_ERR_JRET(TSDB_CODE_QRY_JOB_FREED);
×
139
        break;
×
140

141
      default:
×
142
        SCH_JOB_ELOG("invalid job status:%s", jobTaskStatusStr(oriStatus));
×
143
        SCH_ERR_JRET(TSDB_CODE_APP_ERROR);
×
144
    }
145

146
    if (oriStatus != atomic_val_compare_exchange_8(&pJob->status, oriStatus, newStatus)) {
11,225,500!
147
      continue;
×
148
    }
149

150
    SCH_JOB_DLOG("job status updated from %s to %s", jobTaskStatusStr(oriStatus), jobTaskStatusStr(newStatus));
11,225,550✔
151

152
    break;
11,225,548✔
153
  }
154

155
  return TSDB_CODE_SUCCESS;
11,225,548✔
156

157
_return:
14✔
158

159
  if (TSDB_CODE_SCH_IGNORE_ERROR == code) {
14!
160
    SCH_JOB_DLOG("ignore job status update, from %s to %s", jobTaskStatusStr(oriStatus), jobTaskStatusStr(newStatus));
14✔
161
  } else {
UNCOV
162
    SCH_JOB_ELOG("invalid job status update, from %s to %s", jobTaskStatusStr(oriStatus), jobTaskStatusStr(newStatus));
×
163
  }
164
  SCH_RET(code);
14!
165
}
166

167
int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
2,454,988✔
168
  for (int32_t i = 0; i < pJob->levelNum; ++i) {
5,476,204✔
169
    SSchLevel *pLevel = taosArrayGet(pJob->levels, i);
3,021,219✔
170
    if (NULL == pLevel) {
3,021,223!
171
      SCH_JOB_ELOG("fail to get the %dth level, levelNum: %d", i, pJob->levelNum);
×
UNCOV
172
      SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
×
173
    }
174

175
    for (int32_t m = 0; m < pLevel->taskNum; ++m) {
6,974,042✔
176
      SSchTask *pTask = taosArrayGet(pLevel->subTasks, m);
3,952,826✔
177
      if (NULL == pTask) {
3,952,831!
178
        SCH_JOB_ELOG("fail to get the %dth task in level %d, taskNum: %d", m, pLevel->level, pLevel->taskNum);
×
179
        SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
×
180
      }
181

182
      SSubplan *pPlan = pTask->plan;
3,952,827✔
183
      int32_t   childNum = pPlan->pChildren ? (int32_t)LIST_LENGTH(pPlan->pChildren) : 0;
3,952,827✔
184
      int32_t   parentNum = pPlan->pParents ? (int32_t)LIST_LENGTH(pPlan->pParents) : 0;
3,952,827✔
185

186
      if (childNum > 0) {
3,952,827✔
187
        if (pJob->levelIdx == pLevel->level) {
689,294!
188
          SCH_JOB_ELOG("invalid query plan, lowest level, childNum:%d", childNum);
×
189
          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
×
190
        }
191

192
        pTask->children = taosArrayInit(childNum, POINTER_BYTES);
689,294✔
193
        if (NULL == pTask->children) {
689,294✔
194
          SCH_TASK_ELOG("taosArrayInit %d children failed", childNum);
1!
195
          SCH_ERR_RET(terrno);
1!
196
        }
197
      }
198

199
      for (int32_t n = 0; n < childNum; ++n) {
5,396,477✔
200
        SSubplan *child = (SSubplan *)nodesListGetNode(pPlan->pChildren, n);
1,443,652✔
201
        if (NULL == child) {
1,443,652!
202
          SCH_JOB_ELOG("fail to get the %dth child subplan, childNum: %d", n, childNum);
×
203
          SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
×
204
        }
205

206
        if (QUERY_NODE_PHYSICAL_SUBPLAN != nodeType(child)) {
1,443,652!
207
          SCH_JOB_ELOG("invalid subplan type for the %dth child, level:%d, subplanNodeType:%d", n, i, nodeType(child));
×
208
          SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
×
209
        }
210

211
        SSchTask **childTask = taosHashGet(planToTask, &child, POINTER_BYTES);
1,443,652✔
212
        if (NULL == childTask || NULL == *childTask) {
1,443,651!
213
          SCH_TASK_ELOG("subplan children relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n);
×
214
          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
×
215
        }
216

217
        if (NULL == taosArrayPush(pTask->children, childTask)) {
2,887,302!
218
          SCH_TASK_ELOG("taosArrayPush childTask failed, level:%d, taskIdx:%d, childIdx:%d", i, m, n);
×
219
          SCH_ERR_RET(terrno);
×
220
        }
221

222
        SCH_TASK_DLOG("children info, the %d child TID 0x%" PRIx64, n, (*childTask)->taskId);
1,443,651!
223
      }
224

225
      if (parentNum > 0) {
3,952,825✔
226
        if (0 == pLevel->level) {
1,443,651!
227
          SCH_TASK_ELOG("invalid task info, level:0, parentNum:%d", parentNum);
×
228
          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
×
229
        }
230

231
        pTask->parents = taosArrayInit(parentNum, POINTER_BYTES);
1,443,651✔
232
        if (NULL == pTask->parents) {
1,443,648!
233
          SCH_TASK_ELOG("taosArrayInit %d parents failed", parentNum);
×
234
          SCH_ERR_RET(terrno);
×
235
        }
236
      } else {
237
        if (0 != pLevel->level) {
2,509,174!
238
          SCH_TASK_ELOG("invalid task info, level:%d, parentNum:%d", pLevel->level, parentNum);
×
239
          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
×
240
        }
241
      }
242

243
      for (int32_t n = 0; n < parentNum; ++n) {
5,396,470✔
244
        SSubplan *parent = (SSubplan *)nodesListGetNode(pPlan->pParents, n);
1,443,648✔
245
        if (NULL == parent) {
1,443,648!
246
          SCH_JOB_ELOG("fail to get the %dth parent subplan, parentNum: %d", n, parentNum);
×
247
          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
×
248
        }
249

250
        if (QUERY_NODE_PHYSICAL_SUBPLAN != nodeType(parent)) {
1,443,648!
251
          SCH_JOB_ELOG("invalid subplan type for the %dth parent, level:%d, subplanNodeType:%d", n, i, nodeType(parent));
×
252
          SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
×
253
        }
254

255
        SSchTask **parentTask = taosHashGet(planToTask, &parent, POINTER_BYTES);
1,443,648✔
256
        if (NULL == parentTask || NULL == *parentTask) {
1,443,647!
257
          SCH_TASK_ELOG("subplan parent relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n);
×
258
          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
×
259
        }
260

261
        if (NULL == taosArrayPush(pTask->parents, parentTask)) {
2,887,295!
262
          SCH_TASK_ELOG("taosArrayPush parentTask failed, level:%d, taskIdx:%d, childIdx:%d", i, m, n);
×
263
          SCH_ERR_RET(terrno);
×
264
        }
265

266
        SCH_TASK_DLOG("parents info, the %d parent TID 0x%" PRIx64, n, (*parentTask)->taskId);
1,443,648!
267
      }
268

269
      SCH_TASK_DLOG("level:%d, parentNum:%d, childNum:%d", i, parentNum, childNum);
3,952,822!
270
    }
271
  }
272

273
  SSchLevel *pLevel = taosArrayGet(pJob->levels, 0);
2,454,985✔
274
  if (NULL == pLevel) {
2,454,985!
275
    SCH_JOB_ELOG("fail to get level 0 level, levelNum:%d", (int32_t)taosArrayGetSize(pJob->levels));
×
276
    SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
×
277
  }
278

279
  if (SCH_IS_QUERY_JOB(pJob)) {
2,454,985✔
280
    if (pLevel->taskNum > 1) {
749,893!
281
      SCH_JOB_ELOG("invalid query plan, level:0, taskNum:%d", pLevel->taskNum);
×
282
      SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
×
283
    }
284

285
    SSchTask *pTask = taosArrayGet(pLevel->subTasks, 0);
749,893✔
286
    if (NULL == pLevel) {
749,894!
287
      SCH_JOB_ELOG("fail to get the first task in level 0, taskNum:%d", (int32_t)taosArrayGetSize(pLevel->subTasks));
×
288
      SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
×
289
    }
290

291
    if (SUBPLAN_TYPE_MODIFY != pTask->plan->subplanType || EXPLAIN_MODE_DISABLE != pJob->attr.explainMode) {
749,886✔
292
      pJob->attr.needFetch = true;
749,741✔
293
    }
294
  }
295

296
  return TSDB_CODE_SUCCESS;
2,454,978✔
297
}
298

299
int32_t schAppendJobDataSrc(SSchJob *pJob, SSchTask *pTask) {
3,952,828✔
300
  if (!SCH_IS_DATA_BIND_QRY_TASK(pTask)) {
3,952,828✔
301
    return TSDB_CODE_SUCCESS;
2,393,021✔
302
  }
303

304
  if (NULL == taosArrayPush(pJob->dataSrcTasks, &pTask)) {
3,119,621!
305
    SCH_ERR_RET(terrno);
×
306
  }
307

308
  return TSDB_CODE_SUCCESS;
1,559,814✔
309
}
310

311
int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
2,455,125✔
312
  int32_t code = 0;
2,455,125✔
313
  pJob->queryId = pDag->queryId;
2,455,125✔
314

315
  if (pDag->numOfSubplans <= 0) {
2,455,125!
316
    SCH_JOB_ELOG("invalid subplan num:%d", pDag->numOfSubplans);
×
317
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
×
318
  }
319

320
  pJob->dataSrcTasks = taosArrayInit(SCH_GET_TASK_CAPACITY(pDag->numOfSubplans), POINTER_BYTES);
2,455,125✔
321
  if (NULL == pJob->dataSrcTasks) {
2,455,135✔
322
    SCH_ERR_RET(terrno);
1!
323
  }
324

325
  int32_t levelNum = (int32_t)LIST_LENGTH(pDag->pSubplans);
2,455,134!
326
  if (levelNum <= 0) {
2,455,134!
327
    SCH_JOB_ELOG("invalid level num:%d", levelNum);
×
328
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
×
329
  }
330

331
  SHashObj *planToTask = taosHashInit(
2,455,134✔
332
      SCH_GET_TASK_CAPACITY(pDag->numOfSubplans),
2,455,136✔
333
      taosGetDefaultHashFunction(POINTER_BYTES == sizeof(int64_t) ? TSDB_DATA_TYPE_BIGINT : TSDB_DATA_TYPE_INT), false,
334
      HASH_NO_LOCK);
335
  if (NULL == planToTask) {
2,455,143!
336
    SCH_JOB_ELOG("taosHashInit %d failed", SCHEDULE_DEFAULT_MAX_TASK_NUM);
×
337
    SCH_ERR_JRET(terrno);
×
338
  }
339

340
  pJob->levels = taosArrayInit(levelNum, sizeof(SSchLevel));
2,455,143✔
341
  if (NULL == pJob->levels) {
2,455,137!
342
    SCH_JOB_ELOG("taosArrayInit %d failed", levelNum);
×
343
    SCH_ERR_JRET(terrno);
×
344
  }
345

346
  pJob->levelNum = levelNum;
2,455,137✔
347
  SCH_RESET_JOB_LEVEL_IDX(pJob);
2,455,137✔
348

349
  SSchLevel      level = {0};
2,455,133✔
350
  SNodeListNode *plans = NULL;
2,455,133✔
351
  int32_t        taskNum = 0;
2,455,133✔
352
  int32_t        totalTaskNum = 0;
2,455,133✔
353
  SSchLevel     *pLevel = NULL;
2,455,133✔
354

355
  level.status = JOB_TASK_STATUS_INIT;
2,455,133✔
356

357
  for (int32_t i = 0; i < levelNum; ++i) {
5,476,370✔
358
    if (NULL == taosArrayPush(pJob->levels, &level)) {
6,042,735!
359
      SCH_JOB_ELOG("taosArrayPush level failed, level:%d", i);
×
360
      SCH_ERR_JRET(terrno);
×
361
    }
362

363
    pLevel = taosArrayGet(pJob->levels, i);
3,021,374✔
364
    if (NULL == pLevel) {
3,021,365!
365
      SCH_JOB_ELOG("fail to get the %dth level, levelNum: %d", i, levelNum);
×
366
      SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
×
367
    }
368

369
    pLevel->level = i;
3,021,365✔
370

371
    plans = (SNodeListNode *)nodesListGetNode(pDag->pSubplans, i);
3,021,365✔
372
    if (NULL == plans) {
3,021,371!
373
      SCH_JOB_ELOG("empty level plan, level:%d", i);
×
374
      SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
×
375
    }
376

377
    if (QUERY_NODE_NODE_LIST != nodeType(plans)) {
3,021,371!
378
      SCH_JOB_ELOG("invalid level plan, level:%d, planNodeType:%d", i, nodeType(plans));
×
379
      SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
×
380
    }
381

382
    taskNum = (int32_t)LIST_LENGTH(plans->pNodeList);
3,021,371!
383
    if (taskNum <= 0) {
3,021,371!
384
      SCH_JOB_ELOG("invalid level plan number:%d, level:%d", taskNum, i);
×
385
      SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
×
386
    }
387

388
    totalTaskNum += taskNum;
3,021,371✔
389
    if (totalTaskNum > pDag->numOfSubplans) {
3,021,371!
390
      SCH_JOB_ELOG("current totalTaskNum %d is bigger than numOfSubplans %d, level:%d", totalTaskNum, pDag->numOfSubplans, i);
×
391
      SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
×
392
    }
393
    
394
    pLevel->taskNum = taskNum;
3,021,371✔
395

396
    pLevel->subTasks = taosArrayInit(taskNum, sizeof(SSchTask));
3,021,371✔
397
    if (NULL == pLevel->subTasks) {
3,021,363!
398
      SCH_JOB_ELOG("taosArrayInit %d failed", taskNum);
×
399
      SCH_ERR_JRET(terrno);
×
400
    }
401

402
    for (int32_t n = 0; n < taskNum; ++n) {
6,974,207✔
403
      SSubplan *plan = (SSubplan *)nodesListGetNode(plans->pNodeList, n);
3,952,971✔
404

405
      SCH_ERR_JRET(schValidateSubplan(pJob, plan, pLevel->level, n, taskNum));
3,952,973!
406
      
407
      SCH_SET_JOB_TYPE(pJob, plan->subplanType);
3,952,821✔
408

409
      SSchTask  task = {0};
3,952,821✔
410
      SSchTask *pTask = taosArrayPush(pLevel->subTasks, &task);
3,952,821✔
411
      if (NULL == pTask) {
3,952,837!
412
        SCH_TASK_ELOG("taosArrayPush task to level failed, level:%d, taskIdx:%d", pLevel->level, n);
×
413
        SCH_ERR_JRET(terrno);
×
414
      }
415

416
      SCH_ERR_JRET(schInitTask(pJob, pTask, plan, pLevel));
3,952,837!
417

418
      SCH_ERR_JRET(schAppendJobDataSrc(pJob, pTask));
3,952,841!
419

420
      code = taosHashPut(planToTask, &plan, POINTER_BYTES, &pTask, POINTER_BYTES);
3,952,836✔
421
      if (0 != code) {
3,952,839!
422
        SCH_TASK_ELOG("taosHashPut to planToTaks failed, taskIdx:%d, error:%s", n, tstrerror(code));
×
423
        SCH_ERR_JRET(code);
×
424
      }
425

426
      code = taosHashPut(pJob->taskList, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
3,952,839✔
427
      if (0 != code) {
3,952,842!
428
        SCH_TASK_ELOG("taosHashPut to taskList failed, taskIdx:%d, error:%s", n, tstrerror(code));
×
429
        SCH_ERR_JRET(code);
×
430
      }
431

432
      ++pJob->taskNum;
3,952,842✔
433
    }
434

435
    SCH_JOB_DLOG("level %d initialized, taskNum:%d", i, taskNum);
3,021,236✔
436
  }
437

438
  if (totalTaskNum != pDag->numOfSubplans) {
2,455,009!
439
    SCH_JOB_ELOG("totalTaskNum %d mis-match with numOfSubplans %d", totalTaskNum, pDag->numOfSubplans);
×
440
    SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
×
441
  }
442

443
  SCH_ERR_JRET(schBuildTaskRalation(pJob, planToTask));
2,455,009!
444

445
_return:
2,454,979✔
446

447
  if (planToTask) {
2,455,121✔
448
    taosHashCleanup(planToTask);
2,455,120✔
449
  }
450

451
  SCH_RET(code);
2,455,134!
452
}
453

454
void schDumpJobExecRes(SSchJob *pJob, SExecResult *pRes) {
2,454,995✔
455
  pRes->code = atomic_load_32(&pJob->errCode);
2,454,995✔
456
  pRes->numOfRows = pJob->resNumOfRows;
2,454,999✔
457

458
  SCH_LOCK(SCH_WRITE, &pJob->resLock);
2,454,999!
459
  pRes->res = pJob->execRes.res;
2,454,999✔
460
  pRes->msgType = pJob->execRes.msgType;
2,454,999✔
461
  pRes->numOfBytes = pJob->execRes.numOfBytes;
2,454,999✔
462
  pJob->execRes.res = NULL;
2,454,999✔
463
  SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
2,454,999!
464

465
  SCH_JOB_DLOG("execRes dumped, code: %s", tstrerror(pRes->code));
2,454,998✔
466
}
2,454,999✔
467

468
int32_t schDumpJobFetchRes(SSchJob *pJob, void **pData) {
889,565✔
469
  int32_t code = 0;
889,565✔
470

471
  SCH_LOCK(SCH_WRITE, &pJob->resLock);
889,565!
472

473
  pJob->fetched = true;
889,566✔
474

475
  if (pJob->fetchRes && ((SRetrieveTableRsp *)pJob->fetchRes)->completed) {
889,566!
476
    SCH_ERR_JRET(schSwitchJobStatus(pJob, JOB_TASK_STATUS_SUCC, NULL));
702,780!
477
  }
478

479
  while (true) {
480
    *pData = atomic_load_ptr(&pJob->fetchRes);
889,565✔
481
    if (*pData != atomic_val_compare_exchange_ptr(&pJob->fetchRes, *pData, NULL)) {
889,564!
482
      continue;
×
483
    }
484

485
    break;
889,567✔
486
  }
487

488
  if (NULL == *pData) {
889,567!
489
    SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)taosMemoryCalloc(1, sizeof(SRetrieveTableRsp));
×
490
    if (NULL == rsp) {
×
491
      SCH_JOB_ELOG("malloc SRetrieveTableRsp %d failed, code:%x", (int32_t)sizeof(SRetrieveTableRsp), terrno);
×
492
      SCH_ERR_JRET(terrno);
×
493
    }
494

495
    rsp->completed = 1;
×
496

497
    *pData = rsp;
×
498
    SCH_JOB_DLOG("empty res and set query complete, code:%x", code);
×
499
  }
500

501
  SCH_JOB_DLOG("fetch done, totalRows:%" PRId64, pJob->resNumOfRows);
889,564✔
502

503
_return:
889,565✔
504

505
  SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
889,565!
506

507
  return code;
889,567✔
508
}
509

510
int32_t schNotifyUserExecRes(SSchJob *pJob) {
2,453,721✔
511
  SExecResult *pRes = taosMemoryCalloc(1, sizeof(SExecResult));
2,453,721✔
512
  if (NULL == pRes) {
2,453,726!
513
    qError("malloc execResult %d failed, error: %x", (int32_t)sizeof(SExecResult), terrno);
×
514
    SCH_RET(terrno);
×
515
  }
516

517
  schDumpJobExecRes(pJob, pRes);
2,453,726✔
518

519
  SCH_JOB_DLOG("sch start to invoke exec cb, code: %s", tstrerror(pJob->errCode));
2,453,721✔
520
  (*pJob->userRes.execFp)(pRes, pJob->userRes.cbParam, atomic_load_32(&pJob->errCode));
2,453,721✔
521
  SCH_JOB_DLOG("sch end from exec cb, code: %s", tstrerror(pJob->errCode));
2,453,725✔
522

523
  return TSDB_CODE_SUCCESS;
2,453,725✔
524
}
525

526
int32_t schNotifyUserFetchRes(SSchJob *pJob) {
889,564✔
527
  void *pRes = NULL;
889,564✔
528

529
  int32_t code = schDumpJobFetchRes(pJob, &pRes);
889,564✔
530
  if (TSDB_CODE_SUCCESS != code && TSDB_CODE_SUCCESS == atomic_load_32(&pJob->errCode)) {
889,567!
531
    atomic_store_32(&pJob->errCode, code);
×
532
  }
533

534
  SCH_JOB_DLOG("sch start to invoke fetch cb, code: %s", tstrerror(pJob->errCode));
889,567✔
535
  (*pJob->userRes.fetchFp)(pRes, pJob->userRes.cbParam, atomic_load_32(&pJob->errCode));
889,567✔
536
  SCH_JOB_DLOG("sch end from fetch cb, code: %s", tstrerror(pJob->errCode));
889,566✔
537

538
  return TSDB_CODE_SUCCESS;
889,566✔
539
}
540

541
void schPostJobRes(SSchJob *pJob, SCH_OP_TYPE op) {
5,799,563✔
542
  int32_t code = 0;
5,799,563✔
543
  SCH_LOCK(SCH_WRITE, &pJob->opStatus.lock);
5,799,563!
544

545
  if (SCH_OP_NULL == pJob->opStatus.op) {
5,799,576✔
546
    SCH_JOB_DLOG("job not in any operation, no need to post job res, status:%s", jobTaskStatusStr(pJob->status));
2,455,012✔
547
    goto _return;
2,455,012✔
548
  }
549

550
  if (SCH_OP_NULL != op && pJob->opStatus.op != op) {
3,344,564!
551
    SCH_JOB_ELOG("job in operation %s mis-match with expected %s", schGetOpStr(pJob->opStatus.op), schGetOpStr(op));
×
552
    goto _return;
×
553
  }
554

555
  if (SCH_JOB_IN_SYNC_OP(pJob)) {
3,344,564✔
556
    SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);
1,274!
557
    code = tsem_post(&pJob->rspSem);
1,274✔
558
    if (code) {
1,274!
559
      SCH_JOB_ELOG("tsem_post failed for syncOp, error:%s", tstrerror(code));
×
560
    }
561
  } else if (SCH_JOB_IN_ASYNC_EXEC_OP(pJob)) {
3,343,290!
562
    SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);
2,453,725!
563
    (void)schNotifyUserExecRes(pJob);  // ignore error
2,453,725✔
564
  } else if (SCH_JOB_IN_ASYNC_FETCH_OP(pJob)) {
889,562!
565
    SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);
889,567!
566
    (void)schNotifyUserFetchRes(pJob);  // ignore error
889,566✔
567
  } else {
568
    SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);
×
569
    SCH_JOB_ELOG("job not in any operation, status:%s", jobTaskStatusStr(pJob->status));
×
570
  }
571

572
  return;
3,344,563✔
573

574
_return:
2,455,012✔
575

576
  SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);
2,455,012!
577
}
578

579
int32_t schProcessOnJobFailure(SSchJob *pJob, int32_t errCode) {
2,456,719✔
580
  if (TSDB_CODE_SCH_IGNORE_ERROR == errCode) {
2,456,719✔
581
    schPostJobRes(pJob, 0);
14✔
582
    return TSDB_CODE_SCH_IGNORE_ERROR;
14✔
583
  }
584

585
  schUpdateJobErrCode(pJob, errCode);
2,456,705✔
586

587
  int32_t code = atomic_load_32(&pJob->errCode);
2,456,702✔
588
  if (code) {
2,456,702✔
589
    SCH_JOB_DLOG("job failed with error %s", tstrerror(code));
772,739✔
590
  }
591

592
  schPostJobRes(pJob, 0);
2,456,702✔
593

594
  SCH_RET(TSDB_CODE_SCH_IGNORE_ERROR);
2,456,709!
595
}
596

597
int32_t schHandleJobFailure(SSchJob *pJob, int32_t errCode) {
1,744✔
598
  if (TSDB_CODE_SCH_IGNORE_ERROR == errCode) {
1,744✔
599
    return TSDB_CODE_SCH_IGNORE_ERROR;
21✔
600
  }
601

602
  (void)schSwitchJobStatus(pJob, JOB_TASK_STATUS_FAIL, &errCode);  // ignore error
1,723✔
603

604
  return TSDB_CODE_SCH_IGNORE_ERROR;
1,723✔
605
}
606

607
int32_t schProcessOnJobDropped(SSchJob *pJob, int32_t errCode) { SCH_RET(schProcessOnJobFailure(pJob, errCode)); }
2,454,996!
608

609
int32_t schHandleJobDrop(SSchJob *pJob, int32_t errCode) {
2,454,998✔
610
  if (TSDB_CODE_SCH_IGNORE_ERROR == errCode) {
2,454,998!
611
    return TSDB_CODE_SCH_IGNORE_ERROR;
×
612
  }
613

614
  (void)schSwitchJobStatus(pJob, JOB_TASK_STATUS_DROP, &errCode);  // ignore error
2,454,998✔
615

616
  return TSDB_CODE_SCH_IGNORE_ERROR;
2,454,999✔
617
}
618

619
int32_t schProcessOnJobPartialSuccess(SSchJob *pJob) {
2,453,285✔
620
  if (schChkCurrentOp(pJob, SCH_OP_FETCH, -1)) {
2,453,285!
621
    SCH_ERR_RET(schLaunchFetchTask(pJob));
×
622
  } else {
623
    schPostJobRes(pJob, 0);
2,453,289✔
624
  }
625

626
  return TSDB_CODE_SUCCESS;
2,453,288✔
627
}
628

629
void schProcessOnDataFetched(SSchJob *pJob) { schPostJobRes(pJob, SCH_OP_FETCH); }
770,220✔
630

631
int32_t schProcessOnExplainDone(SSchJob *pJob, SSchTask *pTask, SRetrieveTableRsp *pRsp) {
12,251✔
632
  SCH_TASK_DLOG("got explain rsp, rows:%" PRId64 ", complete:%d", htobe64(pRsp->numOfRows), pRsp->completed);
12,251!
633

634
  atomic_store_64(&pJob->resNumOfRows, htobe64(pRsp->numOfRows));
12,251✔
635
  atomic_store_ptr(&pJob->fetchRes, pRsp);
12,251✔
636

637
  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCC);
12,251✔
638

639
  if (!SCH_IS_INSERT_JOB(pJob)) {
12,251✔
640
    schProcessOnDataFetched(pJob);
12,242✔
641
  }
642

643
  return TSDB_CODE_SUCCESS;
12,251✔
644
}
645

646
int32_t schLaunchJobLowerLevel(SSchJob *pJob, SSchTask *pTask) {
459,616✔
647
  if (!SCH_IS_QUERY_JOB(pJob)) {
459,616!
648
    return TSDB_CODE_SUCCESS;
×
649
  }
650

651
  SSchLevel *pLevel = pTask->level;
459,616✔
652
  int32_t    doneNum = atomic_load_32(&pLevel->taskExecDoneNum);
459,616✔
653
  if (doneNum == pLevel->taskNum) {
459,616!
654
    (void)atomic_sub_fetch_32(&pJob->levelIdx, 1);
459,616✔
655

656
    pLevel = taosArrayGet(pJob->levels, pJob->levelIdx);
459,617✔
657
    if (NULL == pLevel) {
459,617!
658
      SCH_JOB_ELOG("fail to get the %dth level, levelNum:%d", pJob->levelIdx, (int32_t)taosArrayGetSize(pJob->levels));
×
659
      SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
×
660
    }
661

662
    for (int32_t i = 0; i < pLevel->taskNum; ++i) {
1,038,066✔
663
      SSchTask *pTask = taosArrayGet(pLevel->subTasks, i);
578,450✔
664
      if (NULL == pTask) {
578,449!
665
        SCH_JOB_ELOG("fail to get the %dth task in level %d, taskNum:%d", i, pLevel->level, pLevel->taskNum);
×
666
        SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
×
667
      }
668

669
      if (pTask->children && taosArrayGetSize(pTask->children) > 0) {
578,449!
670
        continue;
562,501✔
671
      }
672

673
      if (SCH_TASK_ALREADY_LAUNCHED(pTask)) {
15,948!
674
        continue;
×
675
      }
676

677
      SCH_ERR_RET(schLaunchTask(pJob, pTask));
15,948!
678
    }
679
  }
680

681
  return TSDB_CODE_SUCCESS;
459,616✔
682
}
683

684
int32_t schSaveJobExecRes(SSchJob *pJob, SQueryTableRsp *rsp) {
1,689,117✔
685
  if (rsp->tbVerInfo) {
1,689,117✔
686
    SCH_LOCK(SCH_WRITE, &pJob->resLock);
1,165,478!
687

688
    if (NULL == pJob->execRes.res) {
1,165,541✔
689
      pJob->execRes.res = taosArrayInit(pJob->taskNum, sizeof(STbVerInfo));
610,087✔
690
      if (NULL == pJob->execRes.res) {
610,087!
691
        SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
×
692
        SCH_ERR_RET(terrno);
×
693
      }
694
    }
695

696
    if (NULL == taosArrayAddBatch((SArray *)pJob->execRes.res, taosArrayGet(rsp->tbVerInfo, 0),
1,165,541!
697
                                  taosArrayGetSize(rsp->tbVerInfo))) {
1,165,541✔
698
      SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
×
699
      SCH_ERR_RET(terrno);
×
700
    }
701

702
    taosArrayDestroy(rsp->tbVerInfo);
1,165,541✔
703

704
    pJob->execRes.msgType = TDMT_SCH_QUERY;
1,165,541✔
705

706
    SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
1,165,541!
707
  }
708

709
  return TSDB_CODE_SUCCESS;
1,689,207✔
710
}
711

712
int32_t schGetTaskInJob(SSchJob *pJob, uint64_t taskId, SSchTask **pTask) {
4,157,860✔
713
  schGetTaskFromList(pJob->taskList, taskId, pTask);
4,157,860✔
714
  if (NULL == *pTask) {
4,157,841!
715
    SCH_JOB_ELOG("task not found in job task list, taskId:0x%" PRIx64, taskId);
×
716
    SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
×
717
  }
718

719
  return TSDB_CODE_SUCCESS;
4,157,676✔
720
}
721

722
int32_t schLaunchJob(SSchJob *pJob) {
2,454,980✔
723
  if (EXPLAIN_MODE_STATIC == pJob->attr.explainMode) {
2,454,980✔
724
    SCH_ERR_RET(qExecStaticExplain(pJob->pDag, (SRetrieveTableRsp **)&pJob->fetchRes));
119,336!
725
    SCH_ERR_RET(schSwitchJobStatus(pJob, JOB_TASK_STATUS_PART_SUCC, NULL));
119,336!
726
  } else {
727
    SSchLevel *level = taosArrayGet(pJob->levels, pJob->levelIdx);
2,335,644✔
728
    if (NULL == level) {
2,335,643!
729
      SCH_JOB_ELOG("fail to get the %dth level, levelNum:%d", pJob->levelIdx, (int32_t)taosArrayGetSize(pJob->levels));
×
730
      SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
×
731
    }
732

733
    SCH_ERR_RET(schLaunchLevelTasks(pJob, level));
2,335,643!
734
  }
735

736
  return TSDB_CODE_SUCCESS;
2,454,995✔
737
}
738

739
void schDropJobAllTasks(SSchJob *pJob) {
2,455,140✔
740
  schDropTaskInHashList(pJob, pJob->execTasks);
2,455,140✔
741
  //  schDropTaskInHashList(pJob, pJob->succTasks);
742
  //  schDropTaskInHashList(pJob, pJob->failTasks);
743
}
2,455,140✔
744

745
int32_t schNotifyJobAllTasks(SSchJob *pJob, SSchTask *pTask, ETaskNotifyType type) {
×
746
  SCH_RET(schNotifyTaskInHashList(pJob, pJob->execTasks, type, pTask));
×
747
}
748

749
void schFreeJobImpl(void *job) {
2,455,142✔
750
  if (NULL == job) {
2,455,142!
751
    return;
×
752
  }
753

754
  SSchJob *pJob = job;
2,455,142✔
755
  uint64_t queryId = pJob->queryId;
2,455,142✔
756
  int64_t  refId = pJob->refId;
2,455,142✔
757

758
  qDebug("QID:0x%" PRIx64 " begin to free sch job, refId:0x%" PRIx64 ", pointer:%p", queryId, refId, pJob);
2,455,142✔
759

760
  schDropJobAllTasks(pJob);
2,455,142✔
761

762
  int32_t numOfLevels = taosArrayGetSize(pJob->levels);
2,455,140✔
763
  for (int32_t i = 0; i < numOfLevels; ++i) {
5,476,518✔
764
    SSchLevel *pLevel = taosArrayGet(pJob->levels, i);
3,021,377✔
765
    if (NULL == pLevel) {
3,021,376!
766
      SCH_JOB_ELOG("fail to get the %dth level, levelNum:%d", i, numOfLevels);
×
767
      continue;
×
768
    }
769

770
    int32_t numOfTasks = taosArrayGetSize(pLevel->subTasks);
3,021,376✔
771
    for (int32_t j = 0; j < numOfTasks; ++j) {
6,974,221✔
772
      SSchTask *pTask = taosArrayGet(pLevel->subTasks, j);
3,952,843✔
773
      if (NULL == pLevel) {
3,952,843!
774
        SCH_JOB_ELOG("fail to get the %dth task, taskNum:%d", j, numOfTasks);
×
775
        continue;
×
776
      }
777

778
      schFreeTask(pJob, pTask);
3,952,843✔
779
    }
780

781
    taosArrayDestroy(pLevel->subTasks);
3,021,378✔
782
  }
783

784
  schFreeFlowCtrl(pJob);
2,455,141✔
785

786
  taosHashCleanup(pJob->execTasks);
2,455,139✔
787
  //  taosHashCleanup(pJob->failTasks);
788
  //  taosHashCleanup(pJob->succTasks);
789
  taosHashCleanup(pJob->taskList);
2,455,142✔
790

791
  taosArrayDestroy(pJob->levels);
2,455,142✔
792
  taosArrayDestroy(pJob->nodeList);
2,455,141✔
793
  taosArrayDestroy(pJob->dataSrcTasks);
2,455,142✔
794

795
  qExplainFreeCtx(pJob->explainCtx);
2,455,143✔
796

797
  destroyQueryExecRes(&pJob->execRes);
2,455,141✔
798

799
  qDestroyQueryPlan(pJob->pDag);
2,455,141✔
800
  (void)nodesReleaseAllocatorWeakRef(pJob->allocatorRefId);  // ignore error
2,455,139✔
801

802
  taosMemoryFreeClear(pJob->userRes.execRes);
2,455,141!
803
  taosMemoryFreeClear(pJob->fetchRes);
2,455,141!
804
  taosMemoryFreeClear(pJob->sql);
2,455,141!
805
  int32_t code = tsem_destroy(&pJob->rspSem);
2,455,142✔
806
  if (code) {
2,455,137!
807
    qError("tsem_destroy failed, error:%s", tstrerror(code));
×
808
  }
809
  taosMemoryFree(pJob);
2,455,137✔
810

811
  if (refId > 0) {
2,455,138✔
812
    int32_t jobNum = atomic_sub_fetch_32(&schMgmt.jobNum, 1);
2,454,996✔
813
    if (jobNum == 0) {
2,454,999✔
814
      schCloseJobRef();
1,993,374✔
815
    }
816
  }
817

818
  qDebug("QID:0x%" PRIx64 " sch job freed, refId:0x%" PRIx64 ", pointer:%p", queryId, refId, pJob);
2,455,141✔
819
}
820

821
int32_t schJobFetchRows(SSchJob *pJob) {
889,566✔
822
  int32_t code = 0;
889,566✔
823

824
  if (!(pJob->attr.explainMode == EXPLAIN_MODE_STATIC) && !(SCH_IS_EXPLAIN_JOB(pJob) && SCH_IS_INSERT_JOB(pJob))) {
889,566✔
825
    SCH_ERR_RET(schLaunchFetchTask(pJob));
770,221!
826

827
    if (schChkCurrentOp(pJob, SCH_OP_FETCH, true)) {
770,221!
828
      SCH_JOB_DLOG("sync wait for rsp now, job status:%s", SCH_GET_JOB_STATUS_STR(pJob));
×
829
      code = tsem_wait(&pJob->rspSem);
×
830
      if (code) {
×
831
        qError("tsem_wait for fetch rspSem failed, error:%s", tstrerror(code));
×
832
        SCH_RET(code);
×
833
      }
834

835
      SCH_RET(schDumpJobFetchRes(pJob, pJob->userRes.fetchRes));
×
836
    }
837
  } else {
838
    if (schChkCurrentOp(pJob, SCH_OP_FETCH, true)) {
119,345!
839
      SCH_RET(schDumpJobFetchRes(pJob, pJob->userRes.fetchRes));
×
840
    } else {
841
      schPostJobRes(pJob, SCH_OP_FETCH);
119,345✔
842
    }
843
  }
844

845
  SCH_RET(code);
889,566!
846
}
847

848
int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq) {
2,455,124✔
849
  int32_t  code = 0;
2,455,124✔
850
  int64_t  refId = -1;
2,455,124✔
851
  SSchJob *pJob = taosMemoryCalloc(1, sizeof(SSchJob));
2,455,124✔
852
  if (NULL == pJob) {
2,455,131!
853
    qError("QID:0x%" PRIx64 " calloc %d failed", pReq->pDag->queryId, (int32_t)sizeof(SSchJob));
×
854
    SCH_ERR_JRET(terrno);
×
855
  }
856

857
  pJob->attr.explainMode = pReq->pDag->explainInfo.mode;
2,455,131✔
858
  pJob->attr.localExec = pReq->localReq;
2,455,131✔
859
  pJob->conn = *pReq->pConn;
2,455,131✔
860
  if (pReq->sql) {
2,455,131!
861
    pJob->sql = taosStrdup(pReq->sql);
2,455,138✔
862
    if (NULL == pJob->sql) {
2,455,135!
863
      qError("QID:0x%" PRIx64 " strdup sql %s failed", pReq->pDag->queryId, pReq->sql);
×
864
      SCH_ERR_JRET(terrno);
×
865
    }
866
  }
867
  pJob->pDag = pReq->pDag;
2,455,128✔
868
  if (pReq->allocatorRefId > 0) {
2,455,128✔
869
    pJob->allocatorRefId = nodesMakeAllocatorWeakRef(pReq->allocatorRefId);
845,436✔
870
    if (pJob->allocatorRefId <= 0) {
845,444!
871
      qError("QID:0x%" PRIx64 " nodesMakeAllocatorWeakRef failed", pReq->pDag->queryId);
×
872
      SCH_ERR_JRET(terrno);
×
873
    }
874
  }
875
  pJob->chkKillFp = pReq->chkKillFp;
2,455,136✔
876
  pJob->chkKillParam = pReq->chkKillParam;
2,455,136✔
877
  pJob->userRes.execFp = pReq->execFp;
2,455,136✔
878
  pJob->userRes.cbParam = pReq->cbParam;
2,455,136✔
879
  pJob->source = pReq->source;
2,455,136✔
880
  pJob->pWorkerCb = pReq->pWorkerCb;
2,455,136✔
881

882
  if (pReq->pNodeList == NULL || taosArrayGetSize(pReq->pNodeList) <= 0) {
2,455,136✔
883
    qDebug("QID:0x%" PRIx64 " input exec nodeList is empty", pReq->pDag->queryId);
1,695,365✔
884
  } else {
885
    pJob->nodeList = taosArrayDup(pReq->pNodeList, NULL);
759,770✔
886
    if (NULL == pJob->nodeList) {
759,769!
887
      qError("QID:0x%" PRIx64 " taosArrayDup failed, origNum:%d", pReq->pDag->queryId,
×
888
             (int32_t)taosArrayGetSize(pReq->pNodeList));
889
      SCH_ERR_JRET(terrno);
×
890
    }
891
  }
892

893
  pJob->taskList = taosHashInit(SCH_GET_TASK_CAPACITY(pReq->pDag->numOfSubplans), taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false,
2,455,138✔
894
                                HASH_ENTRY_LOCK);
895
  if (NULL == pJob->taskList) {
2,455,140!
896
    SCH_JOB_ELOG("taosHashInit %d taskList failed", SCH_GET_TASK_CAPACITY(pReq->pDag->numOfSubplans));
×
897
    SCH_ERR_JRET(terrno);
×
898
  }
899

900
  SCH_ERR_JRET(schValidateAndBuildJob(pReq->pDag, pJob));
2,455,140!
901

902
  if (SCH_IS_EXPLAIN_JOB(pJob)) {
2,454,984✔
903
    SCH_ERR_JRET(qExecExplainBegin(pReq->pDag, &pJob->explainCtx, pReq->startTs));
12,259!
904
  }
905

906
  pJob->execTasks = taosHashInit(pReq->pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false,
2,454,984✔
907
                                 HASH_ENTRY_LOCK);
908
  if (NULL == pJob->execTasks) {
2,454,997!
909
    SCH_JOB_ELOG("taosHashInit %d execTasks failed", pReq->pDag->numOfSubplans);
×
910
    SCH_ERR_JRET(terrno);
×
911
  }
912

913
  if (tsem_init(&pJob->rspSem, 0, 0)) {
2,454,997!
914
    SCH_JOB_ELOG("tsem_init failed, errno:%d", errno);
×
915
    SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
×
916
  }
917

918
  pJob->refId = taosAddRef(schMgmt.jobRef, pJob);
2,454,986✔
919
  if (pJob->refId < 0) {
2,454,994!
920
    SCH_JOB_ELOG("taosAddRef job failed, error:%s", tstrerror(terrno));
×
921
    SCH_ERR_JRET(terrno);
×
922
  }
923

924
  (void)atomic_add_fetch_32(&schMgmt.jobNum, 1);
2,454,994✔
925

926
  *pJobId = pJob->refId;
2,454,998✔
927

928
  SCH_JOB_DLOG("job refId:0x%" PRIx64 " created", pJob->refId);
2,454,998✔
929

930
  return TSDB_CODE_SUCCESS;
2,454,999✔
931

932
_return:
142✔
933

934
  if (NULL == pJob) {
142!
935
    qDestroyQueryPlan(pReq->pDag);
×
936
  } else if (pJob->refId <= 0) {
142!
937
    schFreeJobImpl(pJob);
142✔
938
  } else {
939
    code = taosRemoveRef(schMgmt.jobRef, pJob->refId);
×
940
    if (code) {
×
941
      SCH_JOB_DLOG("taosRemoveRef job refId:0x%" PRIx64 " from jobRef, error:%s", pJob->refId, tstrerror(code));
×
942
    }
943
  }
944

945
  SCH_RET(code);
142!
946
}
947

948
int32_t schExecJob(SSchJob *pJob, SSchedulerReq *pReq) {
2,454,983✔
949
  int32_t code = 0;
2,454,983✔
950
  qDebug("QID:0x%" PRIx64 " sch job refId 0x%" PRIx64 " started", pReq->pDag->queryId, pJob->refId);
2,454,983✔
951

952
  SCH_ERR_RET(schLaunchJob(pJob));
2,454,983!
953

954
  if (pReq->syncReq) {
2,454,995✔
955
    SCH_JOB_DLOG("sync wait for rsp now, job status:%s", SCH_GET_JOB_STATUS_STR(pJob));
1,273✔
956
    code = tsem_wait(&pJob->rspSem);
1,273✔
957
    if (code) {
1,274!
958
      qError("QID:0x%" PRIx64 " tsem_wait sync rspSem failed, error:%s", pReq->pDag->queryId, tstrerror(code));
×
959
      SCH_ERR_RET(code);
×
960
    }
961
  }
962

963
  SCH_JOB_DLOG("job exec done, job status:%s, jobId:0x%" PRIx64, SCH_GET_JOB_STATUS_STR(pJob), pJob->refId);
2,454,996✔
964

965
  return TSDB_CODE_SUCCESS;
2,454,994✔
966
}
967

968
void schDirectPostJobRes(SSchedulerReq *pReq, int32_t errCode) {
5,612✔
969
  if (NULL == pReq || pReq->syncReq) {
5,612!
970
    return;
5,470✔
971
  }
972

973
  if (pReq->execFp) {
142!
974
    (*pReq->execFp)(NULL, pReq->cbParam, errCode);
142✔
975
  } else if (pReq->fetchFp) {
×
976
    (*pReq->fetchFp)(NULL, pReq->cbParam, errCode);
×
977
  }
978
}
979

980
int32_t schChkResetJobRetry(SSchJob *pJob, int32_t rspCode) {
485✔
981
  if (pJob->status >= JOB_TASK_STATUS_PART_SUCC) {
485!
UNCOV
982
    SCH_LOCK(SCH_WRITE, &pJob->resLock);
×
UNCOV
983
    if (pJob->fetched) {
×
984
      SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
×
985
      pJob->noMoreRetry = true;
×
986
      SCH_JOB_ELOG("already fetched while got error %s", tstrerror(rspCode));
×
987
      SCH_ERR_RET(rspCode);
×
988
    }
UNCOV
989
    SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
×
990

UNCOV
991
    SCH_ERR_RET(schUpdateJobStatus(pJob, JOB_TASK_STATUS_EXEC));
×
992
  }
993

994
  return TSDB_CODE_SUCCESS;
485✔
995
}
996

997
int32_t schResetJobForRetry(SSchJob *pJob, int32_t rspCode, bool *inRetry) {
×
998
  int8_t origInRetry = atomic_val_compare_exchange_8(&pJob->inRetry, 0, 1);
×
999
  if (0 != origInRetry) {
×
1000
    SCH_JOB_DLOG("job already in retry, origInRetry: %d", pJob->inRetry);
×
1001
    return TSDB_CODE_SCH_IGNORE_ERROR;
×
1002
  }
1003

1004
  *inRetry = true;
×
1005

1006
  SCH_ERR_RET(schChkResetJobRetry(pJob, rspCode));
×
1007

1008
  int32_t code = 0;
×
1009
  int32_t numOfLevels = taosArrayGetSize(pJob->levels);
×
1010
  for (int32_t i = 0; i < numOfLevels; ++i) {
×
1011
    SSchLevel *pLevel = taosArrayGet(pJob->levels, i);
×
1012
    if (NULL == pLevel) {
×
1013
      SCH_JOB_ELOG("fail to get the %dth level, levelNum:%d", i, numOfLevels);
×
1014
      SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
×
1015
    }
1016

1017
    pLevel->taskExecDoneNum = 0;
×
1018
    pLevel->taskLaunchedNum = 0;
×
1019

1020
    int32_t numOfTasks = taosArrayGetSize(pLevel->subTasks);
×
1021
    for (int32_t j = 0; j < numOfTasks; ++j) {
×
1022
      SSchTask *pTask = taosArrayGet(pLevel->subTasks, j);
×
1023
      if (NULL == pTask) {
×
1024
        SCH_JOB_ELOG("fail to get the %dth task in level %d, taskNum:%d", j, i, numOfTasks);
×
1025
        SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
×
1026
      }
1027

1028
      SCH_LOCK_TASK(pTask);
×
1029
      code = schChkUpdateRedirectCtx(pJob, pTask, NULL, rspCode);
×
1030
      if (TSDB_CODE_SUCCESS != code) {
×
1031
        SCH_UNLOCK_TASK(pTask);
×
1032
        SCH_RET(code);
×
1033
      }
1034
      qClearSubplanExecutionNode(pTask->plan);
×
1035
      schResetTaskForRetry(pJob, pTask);
×
1036
      SCH_UNLOCK_TASK(pTask);
×
1037
    }
1038
  }
1039

1040
  SCH_RESET_JOB_LEVEL_IDX(pJob);
×
1041

1042
  return TSDB_CODE_SUCCESS;
×
1043
}
1044

1045
int32_t schHandleJobRetry(SSchJob *pJob, SSchTask *pTask, SDataBuf *pMsg, int32_t rspCode) {
×
1046
  int32_t code = 0;
×
1047
  bool    inRetry = false;
×
1048

1049
  taosMemoryFreeClear(pMsg->pData);
×
1050
  taosMemoryFreeClear(pMsg->pEpSet);
×
1051

1052
  SCH_UNLOCK_TASK(pTask);
×
1053

1054
  SCH_TASK_DLOG("start to redirect all job tasks cause of error: %s", tstrerror(rspCode));
×
1055

1056
  SCH_ERR_JRET(schResetJobForRetry(pJob, rspCode, &inRetry));
×
1057

1058
  SCH_ERR_JRET(schLaunchJob(pJob));
×
1059

1060
  SCH_LOCK_TASK(pTask);
×
1061

1062
  atomic_store_8(&pJob->inRetry, 0);
×
1063

1064
  SCH_RET(code);
×
1065

1066
_return:
×
1067

1068
  SCH_LOCK_TASK(pTask);
×
1069

1070
  code = schProcessOnTaskFailure(pJob, pTask, code);
×
1071

1072
  if (inRetry) {
×
1073
    atomic_store_8(&pJob->inRetry, 0);
×
1074
  }
1075

1076
  SCH_RET(code);
×
1077
}
1078

1079
bool schChkCurrentOp(SSchJob *pJob, int32_t op, int8_t sync) {
3,342,851✔
1080
  bool r = false;
3,342,851✔
1081
  SCH_LOCK(SCH_READ, &pJob->opStatus.lock);
3,342,851!
1082
  if (sync >= 0) {
3,342,857✔
1083
    r = (pJob->opStatus.op == op) && (pJob->opStatus.syncReq == sync);
889,567!
1084
  } else {
1085
    r = (pJob->opStatus.op == op);
2,453,290✔
1086
  }
1087
  SCH_UNLOCK(SCH_READ, &pJob->opStatus.lock);
3,342,857!
1088

1089
  return r;
3,342,855✔
1090
}
1091

1092
void schProcessOnOpEnd(SSchJob *pJob, SCH_OP_TYPE type, SSchedulerReq *pReq, int32_t errCode) {
3,365,195✔
1093
  int32_t op = 0;
3,365,195✔
1094

1095
  switch (type) {
3,365,195!
1096
    case SCH_OP_EXEC:
2,454,997✔
1097
      if (pReq && pReq->syncReq) {
2,454,997✔
1098
        SCH_LOCK(SCH_WRITE, &pJob->opStatus.lock);
1,274!
1099
        op = atomic_val_compare_exchange_32(&pJob->opStatus.op, type, SCH_OP_NULL);
1,274✔
1100
        if (SCH_OP_NULL == op || op != type) {
1,274!
1101
          SCH_JOB_ELOG("job not in %s operation, op:%s, status:%s", schGetOpStr(type), schGetOpStr(op),
×
1102
                       jobTaskStatusStr(pJob->status));
1103
        }
1104
        SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);
1,274!
1105
        schDumpJobExecRes(pJob, pReq->pExecRes);
1,274✔
1106
      }
1107
      break;
2,454,994✔
1108
    case SCH_OP_FETCH:
889,567✔
1109
      if (pReq && pReq->syncReq) {
889,567!
1110
        SCH_LOCK(SCH_WRITE, &pJob->opStatus.lock);
×
1111
        op = atomic_val_compare_exchange_32(&pJob->opStatus.op, type, SCH_OP_NULL);
×
1112
        if (SCH_OP_NULL == op || op != type) {
×
1113
          SCH_JOB_ELOG("job not in %s operation, op:%s, status:%s", schGetOpStr(type), schGetOpStr(op),
×
1114
                       jobTaskStatusStr(pJob->status));
1115
        }
1116
        SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);
×
1117
      }
1118
      break;
889,566✔
1119
    case SCH_OP_GET_STATUS:
20,635✔
1120
      errCode = TSDB_CODE_SUCCESS;
20,635✔
1121
      break;
20,635✔
1122
    default:
×
1123
      break;
×
1124
  }
1125

1126
  if (errCode) {
3,365,191!
1127
    (void)schHandleJobFailure(pJob, errCode);  // handle internal
×
1128
  }
1129

1130
  SCH_JOB_DLOG("job end %s operation with code %s", schGetOpStr(type), tstrerror(errCode));
3,365,191✔
1131
}
3,365,191✔
1132

1133
int32_t schProcessOnOpBegin(SSchJob *pJob, SCH_OP_TYPE type, SSchedulerReq *pReq) {
3,365,183✔
1134
  int32_t code = 0;
3,365,183✔
1135
  int8_t  status = SCH_GET_JOB_STATUS(pJob);
3,365,183✔
1136

1137
  switch (type) {
3,365,191!
1138
    case SCH_OP_EXEC:
2,454,993✔
1139
      SCH_LOCK(SCH_WRITE, &pJob->opStatus.lock);
2,454,993!
1140
      if (SCH_OP_NULL != atomic_val_compare_exchange_32(&pJob->opStatus.op, SCH_OP_NULL, type)) {
2,454,996!
1141
        SCH_JOB_ELOG("job already in %s operation", schGetOpStr(pJob->opStatus.op));
×
1142
        SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);
×
1143
        schDirectPostJobRes(pReq, TSDB_CODE_APP_ERROR);
×
1144
        SCH_ERR_RET(TSDB_CODE_APP_ERROR);
×
1145
      }
1146

1147
      SCH_JOB_DLOG("job start %s operation", schGetOpStr(pJob->opStatus.op));
2,454,998✔
1148

1149
      pJob->opStatus.syncReq = pReq->syncReq;
2,454,991✔
1150
      SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);
2,454,991!
1151
      break;
2,454,995✔
1152
    case SCH_OP_FETCH:
889,567✔
1153
      SCH_LOCK(SCH_WRITE, &pJob->opStatus.lock);
889,567!
1154
      if (SCH_OP_NULL != atomic_val_compare_exchange_32(&pJob->opStatus.op, SCH_OP_NULL, type)) {
889,566!
1155
        SCH_JOB_ELOG("job already in %s operation", schGetOpStr(pJob->opStatus.op));
×
1156
        SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);
×
1157
        schDirectPostJobRes(pReq, TSDB_CODE_APP_ERROR);
×
1158
        SCH_ERR_RET(TSDB_CODE_APP_ERROR);
×
1159
      }
1160

1161
      SCH_JOB_DLOG("job start %s operation", schGetOpStr(pJob->opStatus.op));
889,567✔
1162

1163
      pJob->userRes.fetchRes = pReq->pFetchRes;
889,566✔
1164
      pJob->userRes.fetchFp = pReq->fetchFp;
889,566✔
1165
      pJob->userRes.cbParam = pReq->cbParam;
889,566✔
1166

1167
      pJob->opStatus.syncReq = pReq->syncReq;
889,566✔
1168
      SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);
889,566!
1169

1170
      if (!SCH_JOB_NEED_FETCH(pJob)) {
889,566!
1171
        SCH_JOB_ELOG("no need to fetch data, status:%s", SCH_GET_JOB_STATUS_STR(pJob));
×
1172
        SCH_ERR_RET(TSDB_CODE_APP_ERROR);
×
1173
      }
1174

1175
      if (status != JOB_TASK_STATUS_PART_SUCC && status != JOB_TASK_STATUS_FETCH) {
889,566!
1176
        SCH_JOB_ELOG("job status error for fetch, status:%s", jobTaskStatusStr(status));
×
1177
        SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
×
1178
      }
1179

1180
      break;
889,566✔
1181
    case SCH_OP_GET_STATUS:
20,635✔
1182
      if (pJob->status < JOB_TASK_STATUS_INIT || pJob->levelNum <= 0 || NULL == pJob->levels) {
20,635!
1183
        qDebug("job not initialized or not executable job, refId:0x%" PRIx64, pJob->refId);
8!
1184
        SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
8!
1185
      }
1186
      return TSDB_CODE_SUCCESS;
20,627✔
1187
    default:
×
1188
      SCH_JOB_ELOG("unknown operation type %d", type);
×
1189
      SCH_ERR_RET(TSDB_CODE_APP_ERROR);
×
1190
  }
1191

1192
  if (schJobNeedToStop(pJob, &status)) {
3,344,562!
1193
    SCH_JOB_ELOG("abort op %s cause of job need to stop, status:%s", schGetOpStr(type), jobTaskStatusStr(status));
×
1194
    SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR);
×
1195
  }
1196

1197
  return TSDB_CODE_SUCCESS;
3,344,559✔
1198
}
1199

1200
void schProcessOnCbEnd(SSchJob *pJob, SSchTask *pTask, int32_t errCode) {
4,157,292✔
1201
  if (pTask) {
4,157,292!
1202
    SCH_UNLOCK_TASK(pTask);
4,157,304!
1203
  }
1204

1205
  if (errCode) {
4,157,294✔
1206
    (void)schHandleJobFailure(pJob, errCode);  // ignore error
1,733✔
1207
  }
1208

1209
  if (pJob) {
4,157,294!
1210
    (void)schReleaseJob(pJob->refId);  // ignore error
4,157,310✔
1211
  }
1212
}
4,157,307✔
1213

1214
int32_t schProcessOnCbBegin(SSchJob **job, SSchTask **task, uint64_t qId, int64_t rId, uint64_t tId) {
4,161,207✔
1215
  int32_t code = 0;
4,161,207✔
1216
  int8_t  status = 0;
4,161,207✔
1217

1218
  SSchTask *pTask = NULL;
4,161,207✔
1219
  SSchJob  *pJob = NULL;
4,161,207✔
1220

1221
  (void)schAcquireJob(rId, &pJob);
4,161,207✔
1222
  if (NULL == pJob) {
4,161,344✔
1223
    qWarn("QID:0x%" PRIx64 ",TID:0x%" PRIx64 "job no exist, may be dropped, refId:0x%" PRIx64, qId, tId, rId);
1,007!
1224
    SCH_ERR_RET(TSDB_CODE_QRY_JOB_NOT_EXIST);
1,007!
1225
  }
1226

1227
  if (schJobNeedToStop(pJob, &status)) {
8,320,672✔
1228
    SCH_TASK_DLOG("will not do further processing cause of job status %s", jobTaskStatusStr(status));
3,013!
1229
    SCH_ERR_JRET(TSDB_CODE_SCH_IGNORE_ERROR);
3,013!
1230
  }
1231

1232
  SCH_ERR_JRET(schGetTaskInJob(pJob, tId, &pTask));
4,157,322!
1233

1234
  SCH_LOCK_TASK(pTask);
4,157,082!
1235

1236
  *job = pJob;
4,157,184✔
1237
  *task = pTask;
4,157,184✔
1238

1239
  return TSDB_CODE_SUCCESS;
4,157,184✔
1240

1241
_return:
3,013✔
1242

1243
  if (pTask) {
3,013!
1244
    SCH_UNLOCK_TASK(pTask);
×
1245
  }
1246
  if (pJob) {
3,013!
1247
    (void)schReleaseJob(rId);  // ignore error
3,013✔
1248
  }
1249

1250
  SCH_RET(code);
3,013!
1251
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc