• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3903

24 Apr 2025 11:36AM UTC coverage: 55.307% (+0.09%) from 55.213%
#3903

push

travis-ci

happyguoxy
Sync branches at 2025-04-24 19:35

175024 of 316459 relevant lines covered (55.31%)

1151858.11 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.55
/source/libs/scheduler/src/schRemote.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "catalog.h"
17
#include "command.h"
18
#include "query.h"
19
#include "schInt.h"
20
#include "tglobal.h"
21
#include "tmisce.h"
22
#include "tmsg.h"
23
#include "tref.h"
24
#include "trpc.h"
25

26
// clang-format off
27
int32_t schValidateRspMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgType) {
174,776✔
28
  int32_t lastMsgType = pTask->lastMsgType;
174,776✔
29
  int32_t taskStatus = SCH_GET_TASK_STATUS(pTask);
174,776✔
30
  int32_t reqMsgType = (msgType & 1U) ? msgType : (msgType - 1);
174,798✔
31
  switch (msgType) {
174,798✔
32
    case TDMT_SCH_LINK_BROKEN:
20✔
33
    case TDMT_SCH_EXPLAIN_RSP:
34
      return TSDB_CODE_SUCCESS;
20✔
35
    case TDMT_SCH_FETCH_RSP:
8,567✔
36
    case TDMT_SCH_MERGE_FETCH_RSP:
37
      if (lastMsgType != reqMsgType) {
8,567✔
38
        SCH_TASK_ELOG("rsp msg type mis-match, last sent msgType:%s, rspType:%s", TMSG_INFO(lastMsgType),
10✔
39
                      TMSG_INFO(msgType));
40
        SCH_ERR_RET(TSDB_CODE_QW_MSG_ERROR);
10✔
41
      }
42
      if (taskStatus != JOB_TASK_STATUS_FETCH) {
8,557✔
43
        SCH_TASK_ELOG("rsp msg conflicted with task status, status:%s, rspType:%s", jobTaskStatusStr(taskStatus),
10✔
44
                      TMSG_INFO(msgType));
45
        SCH_ERR_RET(TSDB_CODE_QW_MSG_ERROR);
9✔
46
      }
47

48
      return TSDB_CODE_SUCCESS;
8,546✔
49
    case TDMT_SCH_MERGE_QUERY_RSP:
166,179✔
50
    case TDMT_SCH_QUERY_RSP:
51
    case TDMT_VND_CREATE_TABLE_RSP:
52
    case TDMT_VND_DROP_TABLE_RSP:
53
    case TDMT_VND_ALTER_TABLE_RSP:
54
    case TDMT_VND_SUBMIT_RSP:
55
    case TDMT_VND_DELETE_RSP:
56
    case TDMT_VND_COMMIT_RSP:
57
      break;
166,179✔
58
    default:
32✔
59
      SCH_TASK_ELOG("unknown rsp msg, type:%s, status:%s", TMSG_INFO(msgType), jobTaskStatusStr(taskStatus));
32✔
60
      SCH_ERR_RET(TSDB_CODE_INVALID_MSG);
32✔
61
  }
62

63
  if (lastMsgType != reqMsgType) {
166,201✔
64
    SCH_TASK_ELOG("rsp msg type mis-match, last sent msgType:%s, rspType:%s", TMSG_INFO(lastMsgType),
10✔
65
                  TMSG_INFO(msgType));
66
    SCH_ERR_RET(TSDB_CODE_QW_MSG_ERROR);
10✔
67
  }
68

69
  if (taskStatus != JOB_TASK_STATUS_EXEC) {
166,191✔
70
    SCH_TASK_ELOG("rsp msg conflicted with task status, status:%s, rspType:%s", jobTaskStatusStr(taskStatus),
10✔
71
                  TMSG_INFO(msgType));
72
    SCH_ERR_RET(TSDB_CODE_QW_MSG_ERROR);
×
73
  }
74

75
  return TSDB_CODE_SUCCESS;
166,165✔
76
}
77

78
int32_t schProcessFetchRsp(SSchJob *pJob, SSchTask *pTask, char *msg, int32_t rspCode) {
8,576✔
79
  SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)msg;
8,576✔
80
  int32_t code = 0;
8,576✔
81
  
82
  SCH_ERR_JRET(rspCode);
8,576✔
83
  
84
  if (NULL == msg) {
8,566✔
85
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
10✔
86
  }
87
  
88
  if (SCH_IS_EXPLAIN_JOB(pJob)) {
8,556✔
89
    if (rsp->completed) {
15✔
90
      SRetrieveTableRsp *pRsp = NULL;
10✔
91
      SCH_ERR_JRET(qExecExplainEnd(pJob->explainCtx, &pRsp));
10✔
92
      if (pRsp) {
10✔
93
        SCH_ERR_JRET(schProcessOnExplainDone(pJob, pTask, pRsp));
10✔
94
      } else {
95
        SCH_ERR_JRET(schNotifyJobAllTasks(pJob, pTask, TASK_NOTIFY_FINISHED));
×
96
      }
97
  
98
      taosMemoryFreeClear(msg);
10✔
99
  
100
      return TSDB_CODE_SUCCESS;
10✔
101
    }
102
  
103
    SCH_ERR_JRET(schLaunchFetchTask(pJob));
5✔
104
  
105
    taosMemoryFreeClear(msg);
5✔
106
  
107
    return TSDB_CODE_SUCCESS;
5✔
108
  }
109
  
110
  if (pJob->fetchRes) {
8,541✔
111
    SCH_TASK_ELOG("got fetch rsp while res already exists, res:%p", pJob->fetchRes);
10✔
112
    SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR);
10✔
113
  }
114
  
115
  atomic_store_ptr(&pJob->fetchRes, rsp);
8,531✔
116
  (void)atomic_add_fetch_64(&pJob->resNumOfRows, htobe64(rsp->numOfRows));
8,532✔
117
  
118
  if (rsp->completed) {
8,531✔
119
    SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCC);
6,378✔
120
  }
121
  
122
  SCH_TASK_DLOG("got fetch rsp, rows:%" PRId64 ", complete:%d", htobe64(rsp->numOfRows), rsp->completed);
8,530✔
123

124
  msg = NULL;
8,530✔
125
  schProcessOnDataFetched(pJob);
8,530✔
126

127
_return:
8,551✔
128

129
  taosMemoryFreeClear(msg);
8,551✔
130

131
  SCH_RET(code);
8,551✔
132
}
133

134
int32_t schProcessExplainRsp(SSchJob *pJob, SSchTask *pTask, SExplainRsp *rsp) {
20✔
135
  SRetrieveTableRsp *pRsp = NULL;
20✔
136
  SCH_ERR_RET(qExplainUpdateExecInfo(pJob->explainCtx, rsp, pTask->plan->id.groupId, &pRsp));
20✔
137
  
138
  if (pRsp) {
20✔
139
    SCH_ERR_RET(schProcessOnExplainDone(pJob, pTask, pRsp));
×
140
  }
141

142
  return TSDB_CODE_SUCCESS;
20✔
143
}
144

145
int32_t schProcessResponseMsg(SSchJob *pJob, SSchTask *pTask, SDataBuf *pMsg, int32_t rspCode) {
174,595✔
146
  int32_t code = 0;
174,595✔
147
  int32_t msgSize = pMsg->len;
174,595✔
148
  int32_t msgType = pMsg->msgType;
174,595✔
149

150
  pTask->redirectCtx.inRedirect = false;
174,595✔
151

152
  switch (msgType) {
174,595✔
153
    case TDMT_VND_COMMIT_RSP: {
284✔
154
      SCH_ERR_JRET(rspCode);
284✔
155
      SCH_ERR_JRET(schProcessOnTaskSuccess(pJob, pTask));
284✔
156
      break;
284✔
157
    }
158
    case TDMT_VND_CREATE_TABLE_RSP: {
34,618✔
159
      SVCreateTbBatchRsp batchRsp = {0};
34,618✔
160
      if (pMsg->pData) {
34,618✔
161
        SDecoder coder = {0};
34,616✔
162
        tDecoderInit(&coder, pMsg->pData, msgSize);
34,616✔
163
        code = tDecodeSVCreateTbBatchRsp(&coder, &batchRsp);
34,620✔
164
        if (TSDB_CODE_SUCCESS == code && batchRsp.nRsps > 0) {
34,642✔
165
          SCH_LOCK(SCH_WRITE, &pJob->resLock);
34,646✔
166
          if (NULL == pJob->execRes.res) {
34,646✔
167
            pJob->execRes.res = (void*)taosArrayInit(batchRsp.nRsps, POINTER_BYTES);
34,014✔
168
            if (NULL == pJob->execRes.res) {
34,013✔
169
              code = terrno;
3✔
170
              SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
×
171
              
172
              tDecoderClear(&coder);
×
173
              SCH_ERR_JRET(code);
×
174
            }
175
            
176
            pJob->execRes.msgType = TDMT_VND_CREATE_TABLE;
34,010✔
177
          }
178

179
          for (int32_t i = 0; i < batchRsp.nRsps; ++i) {
70,942✔
180
            SVCreateTbRsp *rsp = batchRsp.pRsps + i;
36,294✔
181
            if (rsp->pMeta) {
36,294✔
182
              if (NULL == taosArrayPush((SArray*)pJob->execRes.res, &rsp->pMeta)) {
71,942✔
183
                code = terrno;
×
184
                SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
×
185
                
186
                tDecoderClear(&coder);
×
187
                SCH_ERR_JRET(code);
×
188
              }
189
            }
190
            
191
            if (TSDB_CODE_SUCCESS != rsp->code) {
36,300✔
192
              code = rsp->code;
×
193
            }
194
          }
195

196
          if (taosArrayGetSize((SArray*)pJob->execRes.res) <= 0) {        
34,648✔
197
            taosArrayDestroy((SArray*)pJob->execRes.res);
24✔
198
            pJob->execRes.res = NULL;
24✔
199
          }
200
          SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
34,639✔
201
        }
202
        
203
        tDecoderClear(&coder);
34,635✔
204
        SCH_ERR_JRET(code);
34,623✔
205
      }
206

207
      SCH_ERR_JRET(rspCode);
34,625✔
208
      taosMemoryFreeClear(pMsg->pData);
34,625✔
209

210
      SCH_ERR_JRET(schProcessOnTaskSuccess(pJob, pTask));
34,635✔
211
      break;
34,640✔
212
    }
213
    case TDMT_VND_DROP_TABLE_RSP: {
331✔
214
      SVDropTbBatchRsp batchRsp = {0};
331✔
215
      if (pMsg->pData) {
331✔
216
        SDecoder coder = {0};
331✔
217
        tDecoderInit(&coder, pMsg->pData, msgSize);
331✔
218
        code = tDecodeSVDropTbBatchRsp(&coder, &batchRsp);
331✔
219
        if (TSDB_CODE_SUCCESS == code && batchRsp.nRsps > 0) {
331✔
220
          for (int32_t i = 0; i < batchRsp.nRsps; ++i) {
662✔
221
            SVDropTbRsp *rsp = batchRsp.pRsps + i;
331✔
222
            if (TSDB_CODE_SUCCESS != rsp->code) {
331✔
223
              code = rsp->code;
×
224
              tDecoderClear(&coder);
×
225
              SCH_ERR_JRET(code);
×
226
            }
227
          }
228
        }
229
        tDecoderClear(&coder);
331✔
230
        SCH_ERR_JRET(code);
331✔
231
      }
232

233
      SCH_ERR_JRET(rspCode);
331✔
234
      taosMemoryFreeClear(pMsg->pData);
331✔
235

236
      SCH_ERR_JRET(schProcessOnTaskSuccess(pJob, pTask));
331✔
237
      break;
331✔
238
    }
239
    case TDMT_VND_ALTER_TABLE_RSP: {
31✔
240
      SVAlterTbRsp rsp = {0};
31✔
241
      if (pMsg->pData) {
31✔
242
        SDecoder coder = {0};
21✔
243
        tDecoderInit(&coder, pMsg->pData, msgSize);
21✔
244
        code = tDecodeSVAlterTbRsp(&coder, &rsp);
21✔
245
        tDecoderClear(&coder);
21✔
246
        SCH_ERR_JRET(code);
21✔
247
        SCH_ERR_JRET(rsp.code);
21✔
248

249
        pJob->execRes.res = rsp.pMeta;
21✔
250
        pJob->execRes.msgType = TDMT_VND_ALTER_TABLE;
21✔
251
      }
252

253
      SCH_ERR_JRET(rspCode);
31✔
254

255
      if (NULL == pMsg->pData) {
31✔
256
        SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
10✔
257
      }
258

259
      taosMemoryFreeClear(pMsg->pData);
21✔
260

261
      SCH_ERR_JRET(schProcessOnTaskSuccess(pJob, pTask));
21✔
262
      break;
21✔
263
    }
264
    case TDMT_VND_SUBMIT_RSP: {
121,127✔
265
      SCH_ERR_JRET(rspCode);
121,127✔
266

267
      if (pMsg->pData) {
121,126✔
268
        SDecoder    coder = {0};
121,177✔
269
        SSubmitRsp2 *rsp = taosMemoryMalloc(sizeof(*rsp));
121,177✔
270
        if (NULL == rsp) {
121,191✔
271
          SCH_ERR_JRET(terrno);
10✔
272
        }
273
        tDecoderInit(&coder, pMsg->pData, msgSize);
121,191✔
274
        code = tDecodeSSubmitRsp2(&coder, rsp);
121,186✔
275
        tDecoderClear(&coder);
121,173✔
276
        if (code) {
121,185✔
277
          SCH_TASK_ELOG("tDecodeSSubmitRsp2 failed, code:%d", code);
10✔
278
          tDestroySSubmitRsp2(rsp, TSDB_MSG_FLG_DECODE);
10✔
279
          taosMemoryFree(rsp);
10✔
280
          SCH_ERR_JRET(code);
10✔
281
        }
282

283
        (void)atomic_add_fetch_64(&pJob->resNumOfRows, rsp->affectedRows);
121,175✔
284

285
        int32_t createTbRspNum = taosArrayGetSize(rsp->aCreateTbRsp);
121,181✔
286
        SCH_TASK_DLOG("submit succeed, affectedRows:%d, createTbRspNum:%d", rsp->affectedRows, createTbRspNum);
121,160✔
287

288
        if (rsp->aCreateTbRsp && taosArrayGetSize(rsp->aCreateTbRsp) > 0) {
121,160✔
289
          SCH_LOCK(SCH_WRITE, &pJob->resLock);
474✔
290
          if (pJob->execRes.res) {
474✔
291
            SSubmitRsp2 *sum = pJob->execRes.res;
86✔
292
            sum->affectedRows += rsp->affectedRows;
86✔
293
            if (sum->aCreateTbRsp) {
86✔
294
              if (NULL == taosArrayAddAll(sum->aCreateTbRsp, rsp->aCreateTbRsp)) {
86✔
295
                code = terrno;
×
296
                SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
×
297
                SCH_ERR_JRET(code);
×
298
              }
299
              
300
              taosArrayDestroy(rsp->aCreateTbRsp);
86✔
301
            } else {
302
              TSWAP(sum->aCreateTbRsp, rsp->aCreateTbRsp);
×
303
            }
304
            taosMemoryFree(rsp);
86✔
305
          } else {
306
            pJob->execRes.res = rsp;
388✔
307
            pJob->execRes.msgType = TDMT_VND_SUBMIT;
388✔
308
          }
309
          pJob->execRes.numOfBytes += pTask->msgLen;
474✔
310
          SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
474✔
311
        } else {
312
          SCH_LOCK(SCH_WRITE, &pJob->resLock);
120,686✔
313
          pJob->execRes.numOfBytes += pTask->msgLen;
120,665✔
314
          if (NULL == pJob->execRes.res) {
120,665✔
315
            TSWAP(pJob->execRes.res, rsp);
120,346✔
316
            pJob->execRes.msgType = TDMT_VND_SUBMIT;
120,346✔
317
          }
318
          SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
120,665✔
319
          tDestroySSubmitRsp2(rsp, TSDB_MSG_FLG_DECODE);
120,676✔
320
          taosMemoryFree(rsp);
120,673✔
321
        }
322
      }
323

324
      taosMemoryFreeClear(pMsg->pData);
121,098✔
325

326
      SCH_ERR_JRET(schProcessOnTaskSuccess(pJob, pTask));
121,096✔
327

328
      break;
121,184✔
329
    }
330
    case TDMT_VND_DELETE_RSP: {
310✔
331
      SCH_ERR_JRET(rspCode);
310✔
332

333
      if (pMsg->pData) {
310✔
334
        SDecoder    coder = {0};
310✔
335
        SVDeleteRsp rsp = {0};
310✔
336
        tDecoderInit(&coder, pMsg->pData, msgSize);
310✔
337
        if (tDecodeSVDeleteRsp(&coder, &rsp) < 0) {
310✔
338
          code = terrno;
10✔
339
          tDecoderClear(&coder);
10✔
340
          SCH_ERR_JRET(code);
10✔
341
        }
342
        tDecoderClear(&coder);
300✔
343

344
        (void)atomic_add_fetch_64(&pJob->resNumOfRows, rsp.affectedRows);
300✔
345
        SCH_TASK_DLOG("delete succeed, affectedRows:%" PRId64, rsp.affectedRows);
300✔
346
      }
347

348
      taosMemoryFreeClear(pMsg->pData);
300✔
349

350
      SCH_ERR_JRET(schProcessOnTaskSuccess(pJob, pTask));
300✔
351

352
      break;
300✔
353
    }
354
    case TDMT_SCH_QUERY_RSP:
9,218✔
355
    case TDMT_SCH_MERGE_QUERY_RSP: {
356
      SCH_ERR_JRET(rspCode);
9,238✔
357
      if (NULL == pMsg->pData) {
9,053✔
358
        SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
10✔
359
      }
360

361
      if (taosArrayGetSize(pTask->parents) == 0 && SCH_IS_EXPLAIN_JOB(pJob) && SCH_IS_INSERT_JOB(pJob)) {
9,043✔
362
        SRetrieveTableRsp *pRsp = NULL;
×
363
        SCH_ERR_JRET(qExecExplainEnd(pJob->explainCtx, &pRsp));
×
364
        if (pRsp) {
×
365
          SCH_ERR_JRET(schProcessOnExplainDone(pJob, pTask, pRsp));
×
366
        }
367
      }
368

369
      SQueryTableRsp rsp = {0};
9,046✔
370
      if (tDeserializeSQueryTableRsp(pMsg->pData, msgSize, &rsp) < 0) {
9,046✔
371
        SCH_TASK_ELOG("tDeserializeSQueryTableRsp failed, msgSize:%d", msgSize);
10✔
372
        SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_MSG);
8✔
373
      }
374
      
375
      SCH_ERR_JRET(rsp.code);
9,036✔
376

377
      SCH_ERR_JRET(schSaveJobExecRes(pJob, &rsp));
9,036✔
378

379
      (void)atomic_add_fetch_64(&pJob->resNumOfRows, rsp.affectedRows);
9,039✔
380

381
      taosMemoryFreeClear(pMsg->pData);
9,039✔
382

383
      SCH_ERR_JRET(schProcessOnTaskSuccess(pJob, pTask));
9,039✔
384

385
      break;
9,040✔
386
    }
387
    case TDMT_SCH_EXPLAIN_RSP: {
60✔
388
      SCH_ERR_JRET(rspCode);
100✔
389
      if (NULL == pMsg->pData) {
60✔
390
        SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
10✔
391
      }
392

393
      if (!SCH_IS_EXPLAIN_JOB(pJob)) {
50✔
394
        SCH_TASK_ELOG("invalid msg received for none explain query, msg type:%s", TMSG_INFO(msgType));
10✔
395
        SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
10✔
396
      }
397

398
      if (pJob->fetchRes) {
40✔
399
        SCH_TASK_ELOG("explain result is already generated, res:%p", pJob->fetchRes);
10✔
400
        SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR);
10✔
401
      }
402

403
      SExplainRsp rsp = {0};
30✔
404
      if (tDeserializeSExplainRsp(pMsg->pData, msgSize, &rsp)) {
30✔
405
        tFreeSExplainRsp(&rsp);
10✔
406
        SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
10✔
407
      }
408

409
      SCH_ERR_JRET(schProcessExplainRsp(pJob, pTask, &rsp));
20✔
410

411
      taosMemoryFreeClear(pMsg->pData);
20✔
412
      break;
20✔
413
    }
414
    case TDMT_SCH_FETCH_RSP:
8,546✔
415
    case TDMT_SCH_MERGE_FETCH_RSP: {
416
      code = schProcessFetchRsp(pJob, pTask, pMsg->pData, rspCode);
8,546✔
417
      pMsg->pData = NULL;
8,547✔
418
      SCH_ERR_JRET(code);
8,547✔
419
      break;
8,547✔
420
    }
421
    case TDMT_SCH_DROP_TASK_RSP: {
10✔
422
      // NEVER REACH HERE
423
      SCH_TASK_ELOG("invalid status to handle drop task rsp, refId:0x%" PRIx64, pJob->refId);
10✔
424
      SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
10✔
425
      break;
×
426
    }
427
    case TDMT_SCH_LINK_BROKEN:
10✔
428
      SCH_TASK_ELOG("link broken received, error:%x - %s", rspCode, tstrerror(rspCode));
10✔
429
      SCH_ERR_JRET(rspCode);
10✔
430
      break;
10✔
431
    default:
50✔
432
      SCH_TASK_ELOG("unknown rsp msg, type:%d, status:%s", msgType, SCH_GET_TASK_STATUS_STR(pTask));
50✔
433
      SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
1✔
434
  }
435

436
  return TSDB_CODE_SUCCESS;
174,368✔
437

438
_return:
276✔
439

440
  taosMemoryFreeClear(pMsg->pData);
276✔
441

442
  SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
276✔
443
} 
444

445

446
// Note: no more task error processing, handled in function internal
447
int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, uint64_t seriousId, int32_t execId, SDataBuf *pMsg, int32_t rspCode) {
174,840✔
448
  int32_t code = 0;
174,840✔
449
  int32_t msgType = pMsg->msgType;
174,840✔
450

451
  bool dropExecNode = (msgType == TDMT_SCH_LINK_BROKEN || SCH_NETWORK_ERR(rspCode));
174,840✔
452
  if (SCH_IS_QUERY_JOB(pJob)) {
174,840✔
453
    SCH_ERR_JRET(schUpdateTaskHandle(pJob, pTask, dropExecNode, pMsg->handle, seriousId, execId));
18,113✔
454
  }
455
  
456
  SCH_ERR_JRET(schValidateRspMsgType(pJob, pTask, msgType));
174,724✔
457

458
  if (pTask->seriousId < atomic_load_64(&pJob->seriousId)) {
174,738✔
459
    SCH_TASK_DLOG("task sId %" PRId64 " is smaller than current job sId %" PRId64, pTask->seriousId, pJob->seriousId);
15✔
460
    SCH_ERR_JRET(TSDB_CODE_SCH_IGNORE_ERROR);
×
461
  }
462

463
  int32_t reqType = IsReq(pMsg) ? pMsg->msgType : (pMsg->msgType - 1);
174,703✔
464
#if 0
465
  if (SCH_JOB_NEED_RETRY(pJob, pTask, reqType, rspCode)) {
466
    SCH_RET(schHandleJobRetry(pJob, pTask, (SDataBuf *)pMsg, rspCode));
467
  } else if (SCH_TASKSET_NEED_RETRY(pJob, pTask, reqType, rspCode)) {
468
    SCH_RET(schHandleTaskSetRetry(pJob, pTask, (SDataBuf *)pMsg, rspCode));
469
  }
470
#else 
471
  if (SCH_JOB_NEED_RETRY(pJob, pTask, reqType, rspCode)) {
174,703✔
472
    SCH_RET(schHandleJobRetry(pJob, pTask, (SDataBuf *)pMsg, rspCode));
189✔
473
  }
474
#endif
475

476
  pTask->redirectCtx.inRedirect = false;
174,514✔
477

478
  SCH_RET(schProcessResponseMsg(pJob, pTask, pMsg, rspCode));
174,514✔
479

480
_return:
128✔
481

482
  taosMemoryFreeClear(pMsg->pData);
128✔
483

484
  SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
128✔
485
} 
486
int32_t schHandleCallback(void *param, SDataBuf *pMsg, int32_t rspCode) {
174,600✔
487
  int32_t                code = 0;
174,600✔
488
  SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param;
174,600✔
489
  SSchTask              *pTask = NULL;
174,600✔
490
  SSchJob               *pJob = NULL;
174,600✔
491

492
  int64_t qid = pParam->queryId;
174,600✔
493
  qDebug("QID:0x%" PRIx64 ", handle rsp msg, type:%s, handle:%p, code:%s", qid,TMSG_INFO(pMsg->msgType), pMsg->handle,
174,600✔
494
         tstrerror(rspCode));
495

496
  SCH_ERR_JRET(schProcessOnCbBegin(&pJob, &pTask, pParam->queryId, pParam->refId, pParam->taskId));
174,600✔
497
  code = schHandleResponseMsg(pJob, pTask, pParam->seriousId, pParam->execId, pMsg, rspCode);
174,518✔
498
  pMsg->pData = NULL;
174,540✔
499

500
  schProcessOnCbEnd(pJob, pTask, code);
174,540✔
501

502
_return:
174,598✔
503

504
  taosMemoryFreeClear(pMsg->pData);
174,598✔
505
  taosMemoryFreeClear(pMsg->pEpSet);
174,598✔
506

507
  qTrace("QID:0x%" PRIx64 ", end to handle rsp msg, type:%s, handle:%p, code:%s", qid, TMSG_INFO(pMsg->msgType), pMsg->handle,
174,598✔
508
         tstrerror(rspCode));
509

510
  SCH_RET(code);
174,585✔
511
}
512

513
int32_t schHandleDropCallback(void *param, SDataBuf *pMsg, int32_t code) {
432✔
514
  SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param;
432✔
515
  qDebug("QID:0x%" PRIx64 ", SID:0x%" PRIx64 ", CID:0x%" PRIx64 ", TID:0x%" PRIx64 " drop task rsp received, code:0x%x", 
432✔
516
         pParam->queryId, pParam->seriousId, pParam->clientId, pParam->taskId, code);
517
  // called if drop task rsp received code
518
  (void)rpcReleaseHandle(pMsg->handle, TAOS_CONN_CLIENT, 0); // ignore error
432✔
519

520
  if (pMsg->handle == NULL) {
432✔
521
    qError("sch handle is NULL, may be already released and mem lea");
273✔
522
  }
523
  if (pMsg) {
432✔
524
    taosMemoryFree(pMsg->pData);
432✔
525
    taosMemoryFree(pMsg->pEpSet);
432✔
526
  }
527
  return TSDB_CODE_SUCCESS;
432✔
528
}
529

530
int32_t schHandleNotifyCallback(void *param, SDataBuf *pMsg, int32_t code) {
10✔
531
  SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param;
10✔
532
  qDebug("QID:0x%" PRIx64 ", SID:0x%" PRIx64 ", CID:0x%" PRIx64 ", TID:0x%" PRIx64 " task notify rsp received, code:0x%x", 
10✔
533
         pParam->queryId, pParam->seriousId, pParam->clientId, pParam->taskId, code);
534
  if (pMsg) {
10✔
535
    taosMemoryFreeClear(pMsg->pData);
10✔
536
    taosMemoryFreeClear(pMsg->pEpSet);
10✔
537
  }
538
  return TSDB_CODE_SUCCESS;
10✔
539
}
540

541

542
int32_t schHandleLinkBrokenCallback(void *param, SDataBuf *pMsg, int32_t code) {
20✔
543
  SSchCallbackParamHeader *head = (SSchCallbackParamHeader *)param;
20✔
544
  (void)rpcReleaseHandle(pMsg->handle, TAOS_CONN_CLIENT, 0); // ignore error
20✔
545

546
  qDebug("handle %p is broken", pMsg->handle);
20✔
547

548
  if (head->isHbParam) {
20✔
549
    taosMemoryFreeClear(pMsg->pData);
10✔
550
    taosMemoryFreeClear(pMsg->pEpSet);
10✔
551

552
    SSchHbCallbackParam *hbParam = (SSchHbCallbackParam *)param;
10✔
553
    SSchTrans            trans = {.pTrans = hbParam->pTrans, .pHandle = NULL, .pHandleId = 0};
10✔
554
    SCH_ERR_RET(schUpdateHbConnection(&hbParam->nodeEpId, &trans));
10✔
555

556
    SCH_ERR_RET(schBuildAndSendHbMsg(&hbParam->nodeEpId, NULL));
10✔
557
  } else {
558
    SCH_ERR_RET(schHandleCallback(param, pMsg, code));
10✔
559
  }
560

561
  return TSDB_CODE_SUCCESS;
10✔
562
}
563

564
int32_t schHandleCommitCallback(void *param, SDataBuf *pMsg, int32_t code) {
284✔
565
  return schHandleCallback(param, pMsg, code);
284✔
566
}
567

568
int32_t schHandleHbCallback(void *param, SDataBuf *pMsg, int32_t code) {
6,437✔
569
  SSchedulerHbRsp        rsp = {0};
6,437✔
570
  SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param;
6,437✔
571

572
  if (code) {
6,437✔
573
    qError("hb rsp error:%s", tstrerror(code));
203✔
574
    (void)rpcReleaseHandle(pMsg->handle, TAOS_CONN_CLIENT, 0); // ignore error
203✔
575
    SCH_ERR_JRET(code);
203✔
576
  }
577

578
  if (tDeserializeSSchedulerHbRsp(pMsg->pData, pMsg->len, &rsp)) {
6,234✔
579
    qError("invalid hb rsp msg, size:%d", pMsg->len);
10✔
580
    SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
10✔
581
  }
582

583
  SSchTrans trans = {0};
6,224✔
584
  trans.pTrans = pParam->pTrans;
6,224✔
585
  trans.pHandle = pMsg->handle;
6,224✔
586
  trans.pHandleId = pMsg->handleRefId;
6,224✔
587

588
  SCH_ERR_JRET(schUpdateHbConnection(&rsp.epId, &trans));
6,224✔
589
  SCH_ERR_JRET(schProcessOnTaskStatusRsp(&rsp.epId, rsp.taskStatus));
6,222✔
590

591
_return:
6,223✔
592

593
  tFreeSSchedulerHbRsp(&rsp);
6,436✔
594
  taosMemoryFree(pMsg->pData);
6,437✔
595
  taosMemoryFree(pMsg->pEpSet);
6,437✔
596
  SCH_RET(code);
6,436✔
597
}
598

599
int32_t schMakeCallbackParam(SSchJob *pJob, SSchTask *pTask, int32_t msgType, bool isHb, SSchTrans *trans,
214,193✔
600
                             void **pParam) {
601
  if (!isHb) {
214,193✔
602
    SSchTaskCallbackParam *param = taosMemoryCalloc(1, sizeof(SSchTaskCallbackParam));
203,160✔
603
    if (NULL == param) {
203,118✔
604
      SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SSchTaskCallbackParam));
×
605
      SCH_ERR_RET(terrno);
×
606
    }
607

608
    param->queryId = pJob->queryId;
203,118✔
609
    param->seriousId = pTask->seriousId;
203,118✔
610
    param->refId = pJob->refId;
203,118✔
611
    param->clientId = SCH_CLIENT_ID(pTask);
203,118✔
612
    param->taskId = SCH_TASK_ID(pTask);
203,118✔
613
    param->pTrans = pJob->conn.pTrans;
203,118✔
614
    param->execId = pTask->execId;
203,118✔
615
    *pParam = param;
203,118✔
616

617
    return TSDB_CODE_SUCCESS;
203,118✔
618
  }
619

620
  if (TDMT_SCH_LINK_BROKEN == msgType) {
11,033✔
621
    SSchHbCallbackParam *param = taosMemoryCalloc(1, sizeof(SSchHbCallbackParam));
5,527✔
622
    if (NULL == param) {
5,543✔
623
      SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SSchHbCallbackParam));
×
624
      SCH_ERR_RET(terrno);
×
625
    }
626

627
    param->head.isHbParam = true;
5,543✔
628

629
    SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
5,543✔
630
    if (NULL == addr) {
5,520✔
631
      taosMemoryFree(param);
×
632
      SCH_TASK_ELOG("fail to get the %dth condidateAddr, totalNum: %d", pTask->candidateIdx, (int32_t)taosArrayGetSize(pTask->candidateAddrs));
×
633
      SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
6✔
634
    }
635
    param->nodeEpId.nodeId = addr->nodeId;
5,526✔
636
    SEp *pEp = SCH_GET_CUR_EP(addr);
5,526✔
637
    TAOS_STRCPY(param->nodeEpId.ep.fqdn, pEp->fqdn);
5,526✔
638
    param->nodeEpId.ep.port = pEp->port;
5,526✔
639
    param->pTrans = trans->pTrans;
5,526✔
640
    *pParam = param;
5,526✔
641

642
    return TSDB_CODE_SUCCESS;
5,526✔
643
  }
644

645
  // hb msg
646
  SSchTaskCallbackParam *param = taosMemoryCalloc(1, sizeof(SSchTaskCallbackParam));
5,506✔
647
  if (NULL == param) {
5,572✔
648
    qError("calloc SSchTaskCallbackParam failed");
×
649
    SCH_ERR_RET(terrno);
×
650
  }
651

652
  param->pTrans = trans->pTrans;
5,572✔
653
  *pParam = param;
5,572✔
654

655
  return TSDB_CODE_SUCCESS;
5,572✔
656
}
657

658
int32_t schGenerateCallBackInfo(SSchJob *pJob, SSchTask *pTask, void *msg, uint32_t msgSize, int32_t msgType,
214,111✔
659
                                SSchTrans *trans, bool isHb, SMsgSendInfo **pMsgSendInfo) {
660
  int32_t       code = 0;
214,111✔
661
  SMsgSendInfo *msgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
214,111✔
662
  if (NULL == msgSendInfo) {
214,309✔
663
    qError("calloc SMsgSendInfo size %d failed", (int32_t)sizeof(SMsgSendInfo));
×
664
    SCH_ERR_JRET(terrno);
×
665
  }
666

667
  msgSendInfo->paramFreeFp = taosAutoMemoryFree;
214,309✔
668
  SCH_ERR_JRET(schMakeCallbackParam(pJob, pTask, msgType, isHb, trans, &msgSendInfo->param));
214,309✔
669

670
  SCH_ERR_JRET(schGetCallbackFp(msgType, &msgSendInfo->fp));
214,275✔
671

672
  if (pJob) {
214,082✔
673
    msgSendInfo->requestId = pJob->conn.requestId;
208,551✔
674
    msgSendInfo->requestObjRefId = pJob->conn.requestObjRefId;
208,551✔
675
  } else {
676
    SCH_ERR_JRET(taosGetSystemUUIDU64(&msgSendInfo->requestId));
5,531✔
677
  }
678

679
  qDebug("ahandle %p alloced, QID:0x%" PRIx64, msgSendInfo, msgSendInfo->requestId);
214,082✔
680

681
  if (TDMT_SCH_LINK_BROKEN != msgType) {
214,173✔
682
    msgSendInfo->msgInfo.pData = msg;
199,051✔
683
    msgSendInfo->msgInfo.len = msgSize;
199,051✔
684
    msgSendInfo->msgInfo.handle = trans->pHandle;
199,051✔
685
    msgSendInfo->msgType = msgType;
199,051✔
686
  }
687

688
  *pMsgSendInfo = msgSendInfo;
214,173✔
689

690
  return TSDB_CODE_SUCCESS;
214,173✔
691

692
_return:
×
693

694
  if (msgSendInfo) {
×
695
    destroySendMsgInfo(msgSendInfo);
×
696
  }
697

698
  taosMemoryFree(msg);
×
699

700
  SCH_RET(code);
×
701
}
702

703
int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) {
219,726✔
704
  switch (msgType) {
219,726✔
705
    case TDMT_VND_CREATE_TABLE:
183,897✔
706
    case TDMT_VND_DROP_TABLE:
707
    case TDMT_VND_ALTER_TABLE:
708
    case TDMT_VND_SUBMIT:
709
    case TDMT_SCH_QUERY:
710
    case TDMT_SCH_MERGE_QUERY:
711
    case TDMT_VND_DELETE:
712
    case TDMT_SCH_EXPLAIN:
713
    case TDMT_SCH_FETCH:
714
    case TDMT_SCH_MERGE_FETCH:
715
      *fp = schHandleCallback;
183,897✔
716
      break;
183,897✔
717
    case TDMT_SCH_DROP_TASK:
9,266✔
718
      *fp = schHandleDropCallback;
9,266✔
719
      break;
9,266✔
720
    case TDMT_SCH_TASK_NOTIFY:
10✔
721
      *fp = schHandleNotifyCallback;
10✔
722
      break;
10✔
723
    case TDMT_SCH_QUERY_HEARTBEAT:
11,096✔
724
      *fp = schHandleHbCallback;
11,096✔
725
      break;
11,096✔
726
    case TDMT_VND_COMMIT:
282✔
727
      *fp = schHandleCommitCallback;
282✔
728
      break;
282✔
729
    case TDMT_SCH_LINK_BROKEN:
15,099✔
730
      *fp = schHandleLinkBrokenCallback;
15,099✔
731
      break;
15,099✔
732
    default:
76✔
733
      qError("unknown msg type for callback, msgType:%d", msgType);
76✔
734
      SCH_ERR_RET(TSDB_CODE_APP_ERROR);
×
735
  }
736

737
  return TSDB_CODE_SUCCESS;
219,637✔
738
}
739

740
/*
741
int32_t schMakeHbCallbackParam(SSchJob *pJob, SSchTask *pTask, void **pParam) {
742
  SSchHbCallbackParam *param = taosMemoryCalloc(1, sizeof(SSchHbCallbackParam));
743
  if (NULL == param) {
744
    SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SSchHbCallbackParam));
745
    SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
746
  }
747

748
  param->head.isHbParam = true;
749

750
  SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
751

752
  param->nodeEpId.nodeId = addr->nodeId;
753
  SEp* pEp = SCH_GET_CUR_EP(addr);
754
  tstrncpy(param->nodeEpId.ep.fqdn, pEp->fqdn, sizeof(param->nodeEpId.ep.fqdn));
755
  param->nodeEpId.ep.port = pEp->port;
756
  param->pTrans = pJob->pTrans;
757

758
  *pParam = param;
759

760
  return TSDB_CODE_SUCCESS;
761
}
762
*/
763

764
int32_t schCloneHbRpcCtx(SRpcCtx *pSrc, SRpcCtx *pDst) {
5,566✔
765
  int32_t code = 0;
5,566✔
766
  TAOS_MEMCPY(pDst, pSrc, sizeof(SRpcCtx));
5,566✔
767
  pDst->brokenVal.val = NULL;
5,566✔
768
  pDst->args = NULL;
5,566✔
769

770
  SCH_ERR_RET(schCloneSMsgSendInfo(pSrc->brokenVal.val, &pDst->brokenVal.val));
5,566✔
771

772
  pDst->args = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
5,560✔
773
  if (NULL == pDst->args) {
5,572✔
774
    qError("taosHashInit %d RpcCtx failed", 1);
×
775
    SCH_ERR_JRET(terrno);
×
776
  }
777

778
  SRpcCtxVal dst = {0};
5,572✔
779
  void      *pIter = taosHashIterate(pSrc->args, NULL);
5,572✔
780
  while (pIter) {
11,144✔
781
    SRpcCtxVal *pVal = (SRpcCtxVal *)pIter;
5,570✔
782
    int32_t    *msgType = taosHashGetKey(pIter, NULL);
5,570✔
783

784
    dst = *pVal;
5,568✔
785
    dst.val = NULL;
5,568✔
786

787
    SCH_ERR_JRET(schCloneSMsgSendInfo(pVal->val, &dst.val));
5,568✔
788

789
    if (taosHashPut(pDst->args, msgType, sizeof(*msgType), &dst, sizeof(dst))) {
5,573✔
790
      qError("taosHashPut msg %d to rpcCtx failed", *msgType);
×
791
      (*pSrc->freeFunc)(dst.val);
×
792
      SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
×
793
    }
794

795
    pIter = taosHashIterate(pSrc->args, pIter);
5,574✔
796
  }
797

798
  return TSDB_CODE_SUCCESS;
5,574✔
799

800
_return:
×
801

802
  schFreeRpcCtx(pDst);
×
803
  SCH_RET(code);
×
804
}
805

806
int32_t schMakeHbRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) {
5,552✔
807
  int32_t              code = 0;
5,552✔
808
  SSchHbCallbackParam *param = NULL;
5,552✔
809
  SMsgSendInfo        *pMsgSendInfo = NULL;
5,552✔
810
  SQueryNodeAddr      *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
5,552✔
811
  SQueryNodeEpId       epId = {0};
5,571✔
812

813
  epId.nodeId = addr->nodeId;
5,571✔
814
  TAOS_MEMCPY(&epId.ep, SCH_GET_CUR_EP(addr), sizeof(SEp));
5,571✔
815

816
  pCtx->args = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
5,571✔
817
  if (NULL == pCtx->args) {
5,568✔
818
    SCH_TASK_ELOG("taosHashInit %d RpcCtx failed", 1);
×
819
    SCH_ERR_RET(terrno);
×
820
  }
821

822
  pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
5,568✔
823
  if (NULL == pMsgSendInfo) {
5,559✔
824
    SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SMsgSendInfo));
×
825
    SCH_ERR_JRET(terrno);
×
826
  }
827

828
  param = taosMemoryCalloc(1, sizeof(SSchHbCallbackParam));
5,559✔
829
  if (NULL == param) {
5,567✔
830
    SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SSchHbCallbackParam));
×
831
    SCH_ERR_JRET(terrno);
×
832
  }
833

834
  int32_t              msgType = TDMT_SCH_QUERY_HEARTBEAT_RSP;
5,567✔
835
  __async_send_cb_fn_t fp = NULL;
5,567✔
836
  SCH_ERR_JRET(schGetCallbackFp(TDMT_SCH_QUERY_HEARTBEAT, &fp));
5,567✔
837

838
  param->nodeEpId = epId;
5,548✔
839
  param->pTrans = pJob->conn.pTrans;
5,548✔
840

841
  pMsgSendInfo->param = param;
5,548✔
842
  pMsgSendInfo->paramFreeFp = taosAutoMemoryFree;
5,548✔
843
  pMsgSendInfo->fp = fp;
5,548✔
844

845
  SRpcCtxVal ctxVal = {.val = pMsgSendInfo, .clone = schCloneSMsgSendInfo};
5,548✔
846
  if (taosHashPut(pCtx->args, &msgType, sizeof(msgType), &ctxVal, sizeof(ctxVal))) {
5,548✔
847
    SCH_TASK_ELOG("taosHashPut msg %d to rpcCtx failed", msgType);
×
848
    SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
×
849
  }
850

851
  SCH_ERR_JRET(schMakeBrokenLinkVal(pJob, pTask, &pCtx->brokenVal, true));
5,573✔
852
  pCtx->freeFunc = schFreeRpcCtxVal;
5,578✔
853

854
  return TSDB_CODE_SUCCESS;
5,578✔
855

856
_return:
×
857

858
  taosHashCleanup(pCtx->args);
×
859
  taosMemoryFreeClear(param);
×
860
  taosMemoryFreeClear(pMsgSendInfo);
×
861

862
  SCH_RET(code);
×
863
}
864

865
int32_t schMakeBrokenLinkVal(SSchJob *pJob, SSchTask *pTask, SRpcBrokenlinkVal *brokenVal, bool isHb) {
15,122✔
866
  int32_t       code = 0;
15,122✔
867
  int32_t       msgType = TDMT_SCH_LINK_BROKEN;
15,122✔
868
  SSchTrans     trans = {.pTrans = pJob->conn.pTrans};
15,122✔
869
  SMsgSendInfo *pMsgSendInfo = NULL;
15,122✔
870
  SCH_ERR_JRET(schGenerateCallBackInfo(pJob, pTask, NULL, 0, msgType, &trans, isHb, &pMsgSendInfo));
15,122✔
871

872
  brokenVal->msgType = msgType;
15,165✔
873
  brokenVal->val = pMsgSendInfo;
15,165✔
874
  brokenVal->clone = schCloneSMsgSendInfo;
15,165✔
875

876
  return TSDB_CODE_SUCCESS;
15,165✔
877

878
_return:
×
879

880
  taosMemoryFreeClear(pMsgSendInfo->param);
×
881
  taosMemoryFreeClear(pMsgSendInfo);
×
882

883
  SCH_RET(code);
×
884
}
885

886
int32_t schMakeQueryRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) {
9,566✔
887
  int32_t       code = 0;
9,566✔
888
  SMsgSendInfo *pExplainMsgSendInfo = NULL;
9,566✔
889

890
  pCtx->args = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
9,566✔
891
  if (NULL == pCtx->args) {
9,581✔
892
    SCH_TASK_ELOG("taosHashInit %d RpcCtx failed", 1);
×
893
    SCH_ERR_RET(terrno);
×
894
  }
895

896
  SSchTrans trans = {.pTrans = pJob->conn.pTrans, .pHandle = SCH_GET_TASK_HANDLE(pTask)};
9,581✔
897
  SCH_ERR_JRET(schGenerateCallBackInfo(pJob, pTask, NULL, 0, TDMT_SCH_EXPLAIN, &trans, false, &pExplainMsgSendInfo));
9,581✔
898

899
  int32_t    msgType = TDMT_SCH_EXPLAIN_RSP;
9,582✔
900
  SRpcCtxVal ctxVal = {.val = pExplainMsgSendInfo, .clone = schCloneSMsgSendInfo};
9,582✔
901
  if (taosHashPut(pCtx->args, &msgType, sizeof(msgType), &ctxVal, sizeof(ctxVal))) {
9,582✔
902
    SCH_TASK_ELOG("taosHashPut msg %d to rpcCtx failed", msgType);
×
903
    SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
×
904
  }
905

906
  SCH_ERR_JRET(schMakeBrokenLinkVal(pJob, pTask, &pCtx->brokenVal, false));
9,573✔
907
  pCtx->freeFunc = schFreeRpcCtxVal;
9,583✔
908

909
  return TSDB_CODE_SUCCESS;
9,583✔
910

911
_return:
×
912

913
  taosHashCleanup(pCtx->args);
×
914

915
  if (pExplainMsgSendInfo) {
×
916
    taosMemoryFreeClear(pExplainMsgSendInfo->param);
×
917
    taosMemoryFreeClear(pExplainMsgSendInfo);
×
918
  }
919

920
  SCH_RET(code);
×
921
}
922

923
int32_t schCloneCallbackParam(SSchCallbackParamHeader *pSrc, SSchCallbackParamHeader **pDst) {
12,290✔
924
  if (pSrc->isHbParam) {
12,290✔
925
    SSchHbCallbackParam *dst = taosMemoryMalloc(sizeof(SSchHbCallbackParam));
5,564✔
926
    if (NULL == dst) {
5,560✔
927
      qError("malloc SSchHbCallbackParam failed");
×
928
      SCH_ERR_RET(terrno);
×
929
    }
930

931
    TAOS_MEMCPY(dst, pSrc, sizeof(*dst));
5,560✔
932
    *pDst = (SSchCallbackParamHeader *)dst;
5,560✔
933

934
    return TSDB_CODE_SUCCESS;
5,560✔
935
  }
936

937
  SSchTaskCallbackParam *dst = taosMemoryMalloc(sizeof(SSchTaskCallbackParam));
6,726✔
938
  if (NULL == dst) {
6,720✔
939
    qError("malloc SSchTaskCallbackParam failed");
×
940
    SCH_ERR_RET(terrno);
×
941
  }
942

943
  TAOS_MEMCPY(dst, pSrc, sizeof(*dst));
6,720✔
944
  *pDst = (SSchCallbackParamHeader *)dst;
6,720✔
945

946
  return TSDB_CODE_SUCCESS;
6,720✔
947
}
948

949
int32_t schCloneSMsgSendInfo(void *src, void **dst) {
12,289✔
950
  SMsgSendInfo *pSrc = src;
12,289✔
951
  int32_t       code = 0;
12,289✔
952
  SMsgSendInfo *pDst = taosMemoryCalloc(1, sizeof(*pSrc));
12,289✔
953
  if (NULL == pDst) {
12,298✔
954
    qError("malloc SMsgSendInfo for rpcCtx failed, len:%d", (int32_t)sizeof(*pSrc));
×
955
    SCH_ERR_RET(terrno);
×
956
  }
957

958
  TAOS_MEMCPY(pDst, pSrc, sizeof(*pSrc));
12,298✔
959
  pDst->param = NULL;
12,298✔
960

961
  SCH_ERR_JRET(schCloneCallbackParam(pSrc->param, (SSchCallbackParamHeader **)&pDst->param));
12,298✔
962
  pDst->paramFreeFp = taosAutoMemoryFree;
12,281✔
963

964
  *dst = pDst;
12,281✔
965

966
  return TSDB_CODE_SUCCESS;
12,281✔
967

968
_return:
×
969

970
  taosMemoryFreeClear(pDst);
×
971
  SCH_RET(code);
×
972
}
973

974
int32_t schUpdateSendTargetInfo(SMsgSendInfo *pMsgSendInfo, SQueryNodeAddr *addr, SSchTask *pTask) {
189,462✔
975
  if (NULL == pTask || addr->nodeId < MNODE_HANDLE) {
189,462✔
976
    return TSDB_CODE_SUCCESS;
5,585✔
977
  }
978

979
  if (addr->nodeId == MNODE_HANDLE) {
183,877✔
980
    pMsgSendInfo->target.type = TARGET_TYPE_MNODE;
2,249✔
981
  } else {
982
    pMsgSendInfo->target.type = TARGET_TYPE_VNODE;
181,628✔
983
    pMsgSendInfo->target.vgId = addr->nodeId;
181,628✔
984
    pMsgSendInfo->target.dbFName = taosStrdup(pTask->plan->dbFName);
181,628✔
985
    if (NULL == pMsgSendInfo->target.dbFName) {
181,660✔
986
      return terrno;
×
987
    }
988
  }
989

990
  return TSDB_CODE_SUCCESS;
183,909✔
991
}
992

993
int32_t schAsyncSendMsg(SSchJob *pJob, SSchTask *pTask, SSchTrans *trans, SQueryNodeAddr *addr, int32_t msgType,
189,486✔
994
                        void *msg, uint32_t msgSize, bool persistHandle, SRpcCtx *ctx) {
995
  int32_t code = 0;
189,486✔
996
  SEpSet *epSet = &addr->epSet;
189,486✔
997

998
  SMsgSendInfo *pMsgSendInfo = NULL;
189,486✔
999
  bool          isHb = (TDMT_SCH_QUERY_HEARTBEAT == msgType);
189,486✔
1000
  SCH_ERR_JRET(schGenerateCallBackInfo(pJob, pTask, msg, msgSize, msgType, trans, isHb, &pMsgSendInfo));
189,486✔
1001
  SCH_ERR_JRET(schUpdateSendTargetInfo(pMsgSendInfo, addr, pTask));
189,476✔
1002

1003
  if (isHb && persistHandle && trans->pHandle == 0) {
189,667✔
1004
    int64_t refId = 0;
5,570✔
1005
    code = rpcAllocHandle(&refId); 
5,570✔
1006
    if (code != 0) {
5,565✔
1007
      SCH_TASK_ELOG("rpcAllocHandle failed, code:%x", code);
×
1008
      SCH_ERR_JRET(code);
×
1009
    }
1010
    trans->pHandle = (void *)refId;
5,565✔
1011
    pMsgSendInfo->msgInfo.handle =trans->pHandle;
5,565✔
1012
  } 
1013

1014
  if (pJob && pTask) {
189,662✔
1015
    SCH_TASK_DLOG("start to send %s msg to node[%d,%s,%d], pTrans:%p, pHandle:%p", TMSG_INFO(msgType), addr->nodeId,
183,953✔
1016
           epSet->eps[epSet->inUse].fqdn, epSet->eps[epSet->inUse].port, trans->pTrans, trans->pHandle);
1017
  } else {
1018
    qDebug("start to send %s msg to node[%d,%s,%d], pTrans:%p, pHandle:%p", TMSG_INFO(msgType), addr->nodeId,
5,709✔
1019
           epSet->eps[epSet->inUse].fqdn, epSet->eps[epSet->inUse].port, trans->pTrans, trans->pHandle);
1020
  }
1021
  
1022
  if (pTask) {
189,589✔
1023
    pTask->lastMsgType = msgType;
183,916✔
1024
  }
1025

1026
  code = asyncSendMsgToServerExt(trans->pTrans, epSet, NULL, pMsgSendInfo, persistHandle, ctx);
189,589✔
1027
  pMsgSendInfo = NULL;
189,733✔
1028
  if (code) {
189,733✔
1029
    SCH_ERR_JRET(code);
28✔
1030
  }
1031

1032
  if (pJob) {
189,705✔
1033
    SCH_TASK_TLOG("req msg sent, type:%d, %s", msgType, TMSG_INFO(msgType));
184,117✔
1034
  } else {
1035
    qTrace("req msg sent, type:%d, %s", msgType, TMSG_INFO(msgType));
5,588✔
1036
  }
1037
  return TSDB_CODE_SUCCESS;
189,648✔
1038

1039
_return:
28✔
1040

1041
  if (pJob) {
28✔
1042
    SCH_TASK_ELOG("fail to send msg, type:%d, %s, error:%s", msgType, TMSG_INFO(msgType), tstrerror(code));
28✔
1043
  } else {
1044
    qError("fail to send msg, type:%d, %s, error:%s", msgType, TMSG_INFO(msgType), tstrerror(code));
×
1045
  }
1046

1047
  if (pMsgSendInfo) {
28✔
1048
    destroySendMsgInfo(pMsgSendInfo);
×
1049
  }
1050

1051
  SCH_RET(code);
28✔
1052
}
1053

1054
int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId, SArray *taskAction) {
5,590✔
1055
  SSchedulerHbReq req = {0};
5,590✔
1056
  int32_t         code = 0;
5,590✔
1057
  SRpcCtx         rpcCtx = {0};
5,590✔
1058
  SSchTrans       trans = {0};
5,590✔
1059
  int32_t         msgType = TDMT_SCH_QUERY_HEARTBEAT;
5,590✔
1060

1061
  req.header.vgId = nodeEpId->nodeId;
5,590✔
1062
  req.clientId = schMgmt.clientId;
5,590✔
1063
  TAOS_MEMCPY(&req.epId, nodeEpId, sizeof(SQueryNodeEpId));
5,590✔
1064

1065
  SCH_LOCK(SCH_READ, &schMgmt.hbLock);
5,590✔
1066
  SSchHbTrans *hb = taosHashGet(schMgmt.hbConnections, nodeEpId, sizeof(SQueryNodeEpId));
5,592✔
1067
  if (NULL == hb) {
5,592✔
1068
    SCH_UNLOCK(SCH_READ, &schMgmt.hbLock);
20✔
1069
    qError("hb connection no longer exist, nodeId:%d, fqdn:%s, port:%d", nodeEpId->nodeId, nodeEpId->ep.fqdn,
20✔
1070
           nodeEpId->ep.port);
1071
    return TSDB_CODE_SUCCESS;
20✔
1072
  }
1073

1074
  SCH_LOCK(SCH_WRITE, &hb->lock);
5,572✔
1075
  code = schCloneHbRpcCtx(&hb->rpcCtx, &rpcCtx);
5,571✔
1076
  TAOS_MEMCPY(&trans, &hb->trans, sizeof(trans));
5,571✔
1077
  SCH_UNLOCK(SCH_WRITE, &hb->lock);
5,571✔
1078
  SCH_UNLOCK(SCH_READ, &schMgmt.hbLock);
5,569✔
1079

1080
  SCH_ERR_RET(code);
5,569✔
1081

1082
  int32_t msgSize = tSerializeSSchedulerHbReq(NULL, 0, &req);
5,569✔
1083
  if (msgSize < 0) {
5,564✔
1084
    qError("tSerializeSSchedulerHbReq hbReq failed, size:%d", msgSize);
×
1085
    SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
×
1086
  }
1087
  void *msg = taosMemoryCalloc(1, msgSize);
5,564✔
1088
  if (NULL == msg) {
5,570✔
1089
    qError("calloc hb req %d failed", msgSize);
×
1090
    SCH_ERR_JRET(terrno);
×
1091
  }
1092

1093
  if (tSerializeSSchedulerHbReq(msg, msgSize, &req) < 0) {
5,570✔
1094
    qError("tSerializeSSchedulerHbReq hbReq failed, size:%d", msgSize);
×
1095
    SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
×
1096
  }
1097

1098
  int64_t        transporterId = 0;
5,572✔
1099
  SQueryNodeAddr addr = {.nodeId = nodeEpId->nodeId};
5,572✔
1100
  addr.epSet.inUse = 0;
5,572✔
1101
  addr.epSet.numOfEps = 1;
5,572✔
1102
  TAOS_MEMCPY(&addr.epSet.eps[0], &nodeEpId->ep, sizeof(nodeEpId->ep));
5,572✔
1103

1104
  code = schAsyncSendMsg(NULL, NULL, &trans, &addr, msgType, msg, msgSize, true, &rpcCtx);
5,572✔
1105
  msg = NULL;
5,570✔
1106
  SCH_ERR_JRET(code);
5,570✔
1107

1108
  return TSDB_CODE_SUCCESS;
5,570✔
1109

1110
_return:
×
1111

1112
  taosMemoryFreeClear(msg);
×
1113
  schFreeRpcCtx(&rpcCtx);
×
1114
  SCH_RET(code);
×
1115
}
1116

1117
int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, int32_t msgType, void* param) {
183,949✔
1118
  int32_t  msgSize = 0;
183,949✔
1119
  void    *msg = NULL;
183,949✔
1120
  int32_t  code = 0;
183,949✔
1121
  bool     isCandidateAddr = false;
183,949✔
1122
  bool     persistHandle = false;
183,949✔
1123
  SRpcCtx  rpcCtx = {0};
183,949✔
1124

1125
  if (NULL == addr) {
183,949✔
1126
    addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
166,308✔
1127
    if (NULL == addr) {
166,252✔
1128
      SCH_TASK_ELOG("fail to get condidateAddr, candidateIdx %d, totalNum: %d", pTask->candidateIdx, (int32_t)taosArrayGetSize(pTask->candidateAddrs));
10✔
1129
      SCH_ERR_JRET(terrno);
10✔
1130
    }
1131
    
1132
    isCandidateAddr = true;
166,242✔
1133
    SCH_TASK_TLOG("target candidateIdx %d, epInUse %d/%d", pTask->candidateIdx, addr->epSet.inUse,
166,242✔
1134
                  addr->epSet.numOfEps);
1135
  }
1136

1137
  switch (msgType) {
183,908✔
1138
    case TDMT_VND_CREATE_TABLE:
156,234✔
1139
    case TDMT_VND_DROP_TABLE:
1140
    case TDMT_VND_ALTER_TABLE:
1141
    case TDMT_VND_SUBMIT:
1142
    case TDMT_VND_COMMIT: {
1143
      msgSize = pTask->msgLen;
156,234✔
1144
      msg = pTask->msg;
156,234✔
1145
      pTask->msg = NULL;
156,234✔
1146
      break;
156,234✔
1147
    }
1148

1149
    case TDMT_VND_DELETE: {
300✔
1150
      SVDeleteReq req = {0};
300✔
1151
      req.header.vgId = addr->nodeId;
300✔
1152
      req.sId = pTask->seriousId;
300✔
1153
      req.queryId = pJob->queryId;
300✔
1154
      req.clientId = pTask->clientId;
300✔
1155
      req.taskId = pTask->taskId;
300✔
1156
      req.phyLen = pTask->msgLen;
300✔
1157
      req.sqlLen = strlen(pJob->sql);
300✔
1158
      req.sql = (char *)pJob->sql;
300✔
1159
      req.msg = pTask->msg;
300✔
1160
      req.source = pJob->source;
300✔
1161
      msgSize = tSerializeSVDeleteReq(NULL, 0, &req);
300✔
1162
      if (msgSize < 0) {
300✔
1163
        SCH_TASK_ELOG("tSerializeSVDeleteReq failed, code:%x", terrno);
×
1164
        SCH_ERR_JRET(terrno);
×
1165
      }
1166
      msg = taosMemoryCalloc(1, msgSize);
300✔
1167
      if (NULL == msg) {
300✔
1168
        SCH_TASK_ELOG("calloc %d failed", msgSize);
×
1169
        SCH_ERR_JRET(terrno);
×
1170
      }
1171

1172
      msgSize = tSerializeSVDeleteReq(msg, msgSize, &req);
300✔
1173
      if (msgSize < 0) {
300✔
1174
        SCH_TASK_ELOG("tSerializeSVDeleteReq second failed, code:%x", terrno);
×
1175
        SCH_ERR_JRET(terrno);
×
1176
      }
1177
      break;
300✔
1178
    }
1179
    case TDMT_SCH_QUERY:
9,563✔
1180
    case TDMT_SCH_MERGE_QUERY: {
1181
      SCH_ERR_RET(schMakeQueryRpcCtx(pJob, pTask, &rpcCtx));
9,563✔
1182

1183
      SSubQueryMsg qMsg;
1184
      qMsg.header.vgId = addr->nodeId;
9,571✔
1185
      qMsg.header.contLen = 0;
9,571✔
1186
      qMsg.sId = pTask->seriousId;
9,571✔
1187
      qMsg.queryId = pJob->queryId;
9,571✔
1188
      qMsg.clientId = pTask->clientId;
9,571✔
1189
      qMsg.taskId = pTask->taskId;
9,571✔
1190
      qMsg.refId = pJob->refId;
9,571✔
1191
      qMsg.execId = pTask->execId;
9,571✔
1192
      qMsg.msgMask = (pTask->plan->showRewrite) ? QUERY_MSG_MASK_SHOW_REWRITE() : 0;
9,571✔
1193
      qMsg.msgMask |= (pTask->plan->isView) ? QUERY_MSG_MASK_VIEW() : 0;
9,571✔
1194
      qMsg.msgMask |= (pTask->plan->isAudit) ? QUERY_MSG_MASK_AUDIT() : 0;
9,571✔
1195
      qMsg.taskType = TASK_TYPE_TEMP;
9,571✔
1196
      qMsg.explain = SCH_IS_EXPLAIN_JOB(pJob);
9,571✔
1197
      qMsg.needFetch = SCH_TASK_NEED_FETCH(pTask);
9,571✔
1198
      qMsg.sqlLen = strlen(pJob->sql);
9,571✔
1199
      qMsg.sql = pJob->sql;
9,571✔
1200
      qMsg.msgLen = pTask->msgLen;
9,571✔
1201
      qMsg.msg = pTask->msg;
9,571✔
1202

1203
      if (strcmp(tsLocalFqdn, GET_ACTIVE_EP(&addr->epSet)->fqdn) == 0) {
9,571✔
1204
        qMsg.compress = 0;
658✔
1205
      } else {
1206
        qMsg.compress = 1;
8,913✔
1207
      }
1208

1209
      msgSize = tSerializeSSubQueryMsg(NULL, 0, &qMsg);
9,571✔
1210
      if (msgSize < 0) {
9,558✔
1211
        SCH_TASK_ELOG("tSerializeSSubQueryMsg get size, msgSize:%d", msgSize);
×
1212
        SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
×
1213
      }
1214
      
1215
      msg = taosMemoryCalloc(1, msgSize);
9,558✔
1216
      if (NULL == msg) {
9,557✔
1217
        SCH_TASK_ELOG("calloc %d failed", msgSize);
×
1218
        SCH_ERR_RET(terrno);
×
1219
      }
1220

1221
      if (tSerializeSSubQueryMsg(msg, msgSize, &qMsg) < 0) {
9,557✔
1222
        SCH_TASK_ELOG("tSerializeSSubQueryMsg failed, msgSize:%d", msgSize);
×
1223
        taosMemoryFree(msg);
×
1224
        SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
×
1225
      }
1226

1227
      persistHandle = true;
9,562✔
1228
      int64_t refId = 0;
9,562✔
1229
      code = rpcAllocHandle(&refId);
9,562✔
1230
      if (code != 0) {
9,563✔
1231
        SCH_TASK_ELOG("rpcAllocHandle failed, code:%x", code);
×
1232
        SCH_ERR_JRET(code);
3✔
1233
      }
1234

1235
      SCH_SET_TASK_HANDLE(pTask, (void *)refId);
9,566✔
1236
      break;
9,566✔
1237
    }
1238
    case TDMT_SCH_FETCH:
8,544✔
1239
    case TDMT_SCH_MERGE_FETCH: {
1240
      SResFetchReq req = {0};
8,544✔
1241
      req.header.vgId = addr->nodeId;
8,544✔
1242
      req.sId = pTask->seriousId;
8,544✔
1243
      req.queryId = pJob->queryId;
8,544✔
1244
      req.clientId = pTask->clientId;
8,544✔
1245
      req.taskId = pTask->taskId;
8,544✔
1246
      req.execId = pTask->execId;
8,544✔
1247

1248
      msgSize = tSerializeSResFetchReq(NULL, 0, &req);
8,544✔
1249
      if (msgSize < 0) {
8,544✔
1250
        SCH_TASK_ELOG("tSerializeSResFetchReq get size, msgSize:%d", msgSize);
×
1251
        SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
×
1252
      }
1253
      
1254
      msg = taosMemoryCalloc(1, msgSize);
8,544✔
1255
      if (NULL == msg) {
8,546✔
1256
        SCH_TASK_ELOG("calloc %d failed", msgSize);
×
1257
        SCH_ERR_RET(terrno);
×
1258
      }
1259

1260
      if (tSerializeSResFetchReq(msg, msgSize, &req) < 0) {
8,546✔
1261
        SCH_TASK_ELOG("tSerializeSResFetchReq %d failed", msgSize);
×
1262
        SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
×
1263
      }
1264
      break;
8,543✔
1265
    }
1266
    case TDMT_SCH_DROP_TASK: {
9,267✔
1267
      STaskDropReq qMsg;
1268
      qMsg.header.vgId = addr->nodeId;
9,267✔
1269
      qMsg.header.contLen = 0;
9,267✔
1270
      qMsg.sId = pTask->seriousId;
9,267✔
1271
      qMsg.queryId = pJob->queryId;
9,267✔
1272
      qMsg.clientId = pTask->clientId;
9,267✔
1273
      qMsg.taskId = pTask->taskId;
9,267✔
1274
      qMsg.refId = pJob->refId;
9,267✔
1275
      qMsg.execId = *(int32_t*)param;
9,267✔
1276

1277
      msgSize = tSerializeSTaskDropReq(NULL, 0, &qMsg);
9,267✔
1278
      if (msgSize < 0) {
9,266✔
1279
        SCH_TASK_ELOG("tSerializeSTaskDropReq get size, msgSize:%d", msgSize);
×
1280
        SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
×
1281
      }
1282
      
1283
      msg = taosMemoryCalloc(1, msgSize);
9,266✔
1284
      if (NULL == msg) {
9,268✔
1285
        SCH_TASK_ELOG("calloc %d failed", msgSize);
×
1286
        SCH_ERR_RET(terrno);
×
1287
      }
1288

1289
      if (tSerializeSTaskDropReq(msg, msgSize, &qMsg) < 0) {
9,268✔
1290
        SCH_TASK_ELOG("tSerializeSTaskDropReq failed, msgSize:%d", msgSize);
×
1291
        taosMemoryFree(msg);
×
1292
        SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
×
1293
      }
1294
      break;
9,266✔
1295
    }
1296
/*
1297
    case TDMT_SCH_QUERY_HEARTBEAT: {
1298
      SCH_ERR_RET(schMakeHbRpcCtx(pJob, pTask, &rpcCtx));
1299

1300
      SSchedulerHbReq req = {0};
1301
      req.clientId = schMgmt.clientId;
1302
      req.header.vgId = addr->nodeId;
1303
      req.epId.nodeId = addr->nodeId;
1304
      TAOS_MEMCPY(&req.epId.ep, SCH_GET_CUR_EP(addr), sizeof(SEp));
1305

1306
      msgSize = tSerializeSSchedulerHbReq(NULL, 0, &req);
1307
      if (msgSize < 0) {
1308
        SCH_JOB_ELOG("tSerializeSSchedulerHbReq hbReq failed, size:%d", msgSize);
1309
        SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
1310
      }
1311
      msg = taosMemoryCalloc(1, msgSize);
1312
      if (NULL == msg) {
1313
        SCH_JOB_ELOG("calloc %d failed", msgSize);
1314
        SCH_ERR_RET(terrno);
1315
      }
1316
      if (tSerializeSSchedulerHbReq(msg, msgSize, &req) < 0) {
1317
        SCH_JOB_ELOG("tSerializeSSchedulerHbReq hbReq failed, size:%d", msgSize);
1318
        SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
1319
      }
1320

1321
      persistHandle = true;
1322
      break;
1323
    }
1324
*/    
1325
    case TDMT_SCH_TASK_NOTIFY: {
×
1326
      ETaskNotifyType* pType = param;
×
1327
      STaskNotifyReq qMsg;
1328
      qMsg.header.vgId = addr->nodeId;
×
1329
      qMsg.header.contLen = 0;
×
1330
      qMsg.sId = pTask->seriousId;
×
1331
      qMsg.queryId = pJob->queryId;
×
1332
      qMsg.clientId = pTask->clientId;
×
1333
      qMsg.taskId = pTask->taskId;
×
1334
      qMsg.refId = pJob->refId;
×
1335
      qMsg.execId = pTask->execId;
×
1336
      qMsg.type = *pType;
×
1337

1338
      msgSize = tSerializeSTaskNotifyReq(NULL, 0, &qMsg);
×
1339
      if (msgSize < 0) {
×
1340
        SCH_TASK_ELOG("tSerializeSTaskNotifyReq get size, msgSize:%d", msgSize);
×
1341
        SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
×
1342
      }
1343
      
1344
      msg = taosMemoryCalloc(1, msgSize);
×
1345
      if (NULL == msg) {
×
1346
        SCH_TASK_ELOG("calloc %d failed", msgSize);
×
1347
        SCH_ERR_RET(terrno);
×
1348
      }
1349

1350
      if (tSerializeSTaskNotifyReq(msg, msgSize, &qMsg) < 0) {
×
1351
        SCH_TASK_ELOG("tSerializeSTaskNotifyReq failed, msgSize:%d", msgSize);
×
1352
        taosMemoryFree(msg);
×
1353
        SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
×
1354
      }
1355
      break;      
×
1356
    }
1357
    default:
×
1358
      SCH_TASK_ELOG("unknown msg type to send, msgType:%d", msgType);
×
1359
      SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
×
1360
  }
1361

1362
  if ((tsBypassFlag & TSDB_BYPASS_RB_RPC_SEND_SUBMIT) && (TDMT_VND_SUBMIT == msgType)) {
183,909✔
1363
    taosMemoryFree(msg);
×
1364
    SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));
×
1365
  } else {
1366
    if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY) {
183,909✔
1367
      SCH_ERR_JRET(schAppendTaskExecNode(pJob, pTask, addr, pTask->execId));
9,596✔
1368
    }
1369

1370
    SSchTrans trans = {.pTrans = pJob->conn.pTrans, .pHandle = SCH_GET_TASK_HANDLE(pTask)};
183,915✔
1371
    code = schAsyncSendMsg(pJob, pTask, &trans, addr, msgType, msg, (uint32_t)msgSize, persistHandle, (rpcCtx.args ? &rpcCtx : NULL));
183,915✔
1372
    msg = NULL;
184,115✔
1373
    SCH_ERR_JRET(code);
184,115✔
1374
  }
1375

1376
  return TSDB_CODE_SUCCESS;
184,087✔
1377

1378
_return:
38✔
1379

1380
  pTask->lastMsgType = -1;
38✔
1381
  schFreeRpcCtx(&rpcCtx);
38✔
1382

1383
  taosMemoryFreeClear(msg);
38✔
1384
  SCH_RET(code);
38✔
1385
}
1386
// clang-format on
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc