• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4473

08 Jul 2025 09:38AM UTC coverage: 62.922% (+0.7%) from 62.22%
#4473

push

travis-ci

web-flow
Merge pull request #31712 from taosdata/merge/mainto3.0

merge: from main to 3.0 branch

158525 of 321496 branches covered (49.31%)

Branch coverage included in aggregate %.

56 of 60 new or added lines in 13 files covered. (93.33%)

1333 existing lines in 67 files now uncovered.

245526 of 320647 relevant lines covered (76.57%)

17689640.25 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

49.45
/source/libs/scheduler/src/schRemote.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "catalog.h"
17
#include "command.h"
18
#include "query.h"
19
#include "schInt.h"
20
#include "tglobal.h"
21
#include "tmisce.h"
22
#include "tmsg.h"
23
#include "tref.h"
24
#include "trpc.h"
25

26
// clang-format off
27
int32_t schValidateRspMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgType) {
12,775,759✔
28
  int32_t lastMsgType = pTask->lastMsgType;
12,775,759✔
29
  int32_t taskStatus = SCH_GET_TASK_STATUS(pTask);
12,775,759✔
30
  int32_t reqMsgType = (msgType & 1U) ? msgType : (msgType - 1);
12,745,833!
31
  switch (msgType) {
12,745,833!
32
    case TDMT_SCH_LINK_BROKEN:
23,088✔
33
    case TDMT_SCH_EXPLAIN_RSP:
34
      return TSDB_CODE_SUCCESS;
23,088✔
35
    case TDMT_SCH_FETCH_RSP:
849,510✔
36
    case TDMT_SCH_MERGE_FETCH_RSP:
37
      if (lastMsgType != reqMsgType) {
849,510✔
38
        SCH_TASK_ELOG("rsp msg type mis-match, last sent msgType:%s, rspType:%s", TMSG_INFO(lastMsgType),
10!
39
                      TMSG_INFO(msgType));
40
        SCH_ERR_RET(TSDB_CODE_QW_MSG_ERROR);
10!
41
      }
42
      if (taskStatus != JOB_TASK_STATUS_FETCH) {
849,500✔
43
        SCH_TASK_ELOG("rsp msg conflicted with task status, status:%s, rspType:%s", jobTaskStatusStr(taskStatus),
10!
44
                      TMSG_INFO(msgType));
45
        SCH_ERR_RET(TSDB_CODE_QW_MSG_ERROR);
2!
46
      }
47

48
      return TSDB_CODE_SUCCESS;
849,482✔
49
    case TDMT_SCH_MERGE_QUERY_RSP:
11,876,320✔
50
    case TDMT_SCH_QUERY_RSP:
51
    case TDMT_VND_CREATE_TABLE_RSP:
52
    case TDMT_VND_DROP_TABLE_RSP:
53
    case TDMT_VND_ALTER_TABLE_RSP:
54
    case TDMT_VND_SUBMIT_RSP:
55
    case TDMT_VND_DELETE_RSP:
56
    case TDMT_VND_COMMIT_RSP:
57
      break;
11,876,320✔
UNCOV
58
    default:
×
UNCOV
59
      SCH_TASK_ELOG("unknown rsp msg, type:%s, status:%s", TMSG_INFO(msgType), jobTaskStatusStr(taskStatus));
×
UNCOV
60
      SCH_ERR_RET(TSDB_CODE_INVALID_MSG);
×
61
  }
62

63
  if (lastMsgType != reqMsgType) {
11,873,225✔
64
    SCH_TASK_ELOG("rsp msg type mis-match, last sent msgType:%s, rspType:%s", TMSG_INFO(lastMsgType),
10!
65
                  TMSG_INFO(msgType));
66
    SCH_ERR_RET(TSDB_CODE_QW_MSG_ERROR);
10!
67
  }
68

69
  if (taskStatus != JOB_TASK_STATUS_EXEC) {
11,873,215✔
70
    SCH_TASK_ELOG("rsp msg conflicted with task status, status:%s, rspType:%s", jobTaskStatusStr(taskStatus),
10!
71
                  TMSG_INFO(msgType));
72
    SCH_ERR_RET(TSDB_CODE_QW_MSG_ERROR);
×
73
  }
74

75
  return TSDB_CODE_SUCCESS;
11,870,868✔
76
}
77

78
int32_t schProcessFetchRsp(SSchJob *pJob, SSchTask *pTask, char *msg, int32_t rspCode) {
949,111✔
79
  SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)msg;
949,111✔
80
  int32_t code = 0;
949,111✔
81
  
82
  SCH_ERR_JRET(rspCode);
949,111!
83
  
84
  if (NULL == msg) {
949,101✔
85
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
10!
86
  }
87
  
88
  if (SCH_IS_EXPLAIN_JOB(pJob)) {
949,091✔
89
    if (rsp->completed) {
13,600✔
90
      SRetrieveTableRsp *pRsp = NULL;
12,267✔
91
      SCH_ERR_JRET(qExecExplainEnd(pJob->explainCtx, &pRsp));
12,267!
92
      if (pRsp) {
12,267✔
93
        SCH_ERR_JRET(schProcessOnExplainDone(pJob, pTask, pRsp));
12,265!
94
      } else {
95
        SCH_ERR_JRET(schNotifyJobAllTasks(pJob, pTask, TASK_NOTIFY_FINISHED));
2!
96
      }
97
  
98
      taosMemoryFreeClear(msg);
12,267!
99
  
100
      return TSDB_CODE_SUCCESS;
12,267✔
101
    }
102
  
103
    SCH_ERR_JRET(schLaunchFetchTask(pJob));
1,333!
104
  
105
    taosMemoryFreeClear(msg);
1,333!
106
  
107
    return TSDB_CODE_SUCCESS;
1,333✔
108
  }
109
  
110
  if (pJob->fetchRes) {
935,491✔
111
    SCH_TASK_ELOG("got fetch rsp while res already exists, res:%p", pJob->fetchRes);
10!
112
    SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR);
10!
113
  }
114
  
115
  atomic_store_ptr(&pJob->fetchRes, rsp);
935,481✔
116
  (void)atomic_add_fetch_64(&pJob->resNumOfRows, htobe64(rsp->numOfRows));
935,502✔
117
  
118
  if (rsp->completed) {
935,500✔
119
    SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCC);
711,873✔
120
  }
121
  
122
  SCH_TASK_DLOG("got fetch rsp, rows:%" PRId64 ", complete:%d", htobe64(rsp->numOfRows), rsp->completed);
935,498!
123

124
  msg = NULL;
935,497✔
125
  schProcessOnDataFetched(pJob);
935,497✔
126

127
_return:
935,516✔
128

129
  taosMemoryFreeClear(msg);
935,516!
130

131
  SCH_RET(code);
935,516!
132
}
133

134
int32_t schProcessExplainRsp(SSchJob *pJob, SSchTask *pTask, SExplainRsp *rsp) {
23,676✔
135
  SRetrieveTableRsp *pRsp = NULL;
23,676✔
136
  SCH_ERR_RET(qExplainUpdateExecInfo(pJob->explainCtx, rsp, pTask->plan->id.groupId, &pRsp));
23,676!
137
  
138
  if (pRsp) {
23,676✔
139
    SCH_ERR_RET(schProcessOnExplainDone(pJob, pTask, pRsp));
2!
140
  }
141

142
  return TSDB_CODE_SUCCESS;
23,676✔
143
}
144

145
int32_t schProcessResponseMsg(SSchJob *pJob, SSchTask *pTask, SDataBuf *pMsg, int32_t rspCode) {
12,736,684✔
146
  int32_t code = 0;
12,736,684✔
147
  int32_t msgSize = pMsg->len;
12,736,684✔
148
  int32_t msgType = pMsg->msgType;
12,736,684✔
149

150
  pTask->redirectCtx.inRedirect = false;
12,736,684✔
151

152
  switch (msgType) {
12,736,684!
153
    case TDMT_VND_COMMIT_RSP: {
19,651✔
154
      SCH_ERR_JRET(rspCode);
19,651!
155
      SCH_ERR_JRET(schProcessOnTaskSuccess(pJob, pTask));
19,651!
156
      break;
19,654✔
157
    }
158
    case TDMT_VND_CREATE_TABLE_RSP: {
119,591✔
159
      SVCreateTbBatchRsp batchRsp = {0};
119,591✔
160
      if (pMsg->pData) {
119,591✔
161
        SDecoder coder = {0};
119,584✔
162
        tDecoderInit(&coder, pMsg->pData, msgSize);
119,584✔
163
        code = tDecodeSVCreateTbBatchRsp(&coder, &batchRsp);
119,597✔
164
        if (TSDB_CODE_SUCCESS == code && batchRsp.nRsps > 0) {
119,622!
165
          SCH_LOCK(SCH_WRITE, &pJob->resLock);
119,634!
166
          if (NULL == pJob->execRes.res) {
119,652✔
167
            pJob->execRes.res = (void*)taosArrayInit(batchRsp.nRsps, POINTER_BYTES);
110,728✔
168
            if (NULL == pJob->execRes.res) {
110,722✔
169
              code = terrno;
26✔
170
              SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
×
171
              
172
              tDecoderClear(&coder);
×
173
              SCH_ERR_JRET(code);
17!
174
            }
175
            
176
            pJob->execRes.msgType = TDMT_VND_CREATE_TABLE;
110,696✔
177
          }
178

179
          for (int32_t i = 0; i < batchRsp.nRsps; ++i) {
265,468✔
180
            SVCreateTbRsp *rsp = batchRsp.pRsps + i;
145,848✔
181
            if (rsp->pMeta) {
145,848✔
182
              if (NULL == taosArrayPush((SArray*)pJob->execRes.res, &rsp->pMeta)) {
291,504!
183
                code = terrno;
×
184
                SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
×
185
                
186
                tDecoderClear(&coder);
×
187
                SCH_ERR_JRET(code);
×
188
              }
189
            }
190
            
191
            if (TSDB_CODE_SUCCESS != rsp->code) {
145,848✔
192
              code = rsp->code;
17✔
193
            }
194
          }
195

196
          if (taosArrayGetSize((SArray*)pJob->execRes.res) <= 0) {        
119,620✔
197
            taosArrayDestroy((SArray*)pJob->execRes.res);
126✔
198
            pJob->execRes.res = NULL;
126✔
199
          }
200
          SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
119,600!
201
        }
202
        
203
        tDecoderClear(&coder);
119,647✔
204
        SCH_ERR_JRET(code);
119,630!
205
      }
206

207
      SCH_ERR_JRET(rspCode);
119,620!
208
      taosMemoryFreeClear(pMsg->pData);
119,620!
209

210
      SCH_ERR_JRET(schProcessOnTaskSuccess(pJob, pTask));
119,669!
211
      break;
119,623✔
212
    }
213
    case TDMT_VND_DROP_TABLE_RSP: {
15,461✔
214
      SVDropTbBatchRsp batchRsp = {0};
15,461✔
215
      if (pMsg->pData) {
15,461!
216
        SDecoder coder = {0};
15,461✔
217
        tDecoderInit(&coder, pMsg->pData, msgSize);
15,461✔
218
        code = tDecodeSVDropTbBatchRsp(&coder, &batchRsp);
15,461✔
219
        if (TSDB_CODE_SUCCESS == code && batchRsp.nRsps > 0) {
15,461!
220
          for (int32_t i = 0; i < batchRsp.nRsps; ++i) {
31,338✔
221
            SVDropTbRsp *rsp = batchRsp.pRsps + i;
15,877✔
222
            if (TSDB_CODE_SUCCESS != rsp->code) {
15,877!
223
              code = rsp->code;
×
224
              tDecoderClear(&coder);
×
225
              SCH_ERR_JRET(code);
×
226
            }
227
          }
228
        }
229
        tDecoderClear(&coder);
15,461✔
230
        SCH_ERR_JRET(code);
15,461!
231
      }
232

233
      SCH_ERR_JRET(rspCode);
15,461!
234
      taosMemoryFreeClear(pMsg->pData);
15,461!
235

236
      SCH_ERR_JRET(schProcessOnTaskSuccess(pJob, pTask));
15,461!
237
      break;
15,461✔
238
    }
239
    case TDMT_VND_ALTER_TABLE_RSP: {
5,803✔
240
      SVAlterTbRsp rsp = {0};
5,803✔
241
      if (pMsg->pData) {
5,803✔
242
        SDecoder coder = {0};
5,789✔
243
        tDecoderInit(&coder, pMsg->pData, msgSize);
5,789✔
244
        code = tDecodeSVAlterTbRsp(&coder, &rsp);
5,789✔
245
        tDecoderClear(&coder);
5,789✔
246
        SCH_ERR_JRET(code);
6,704!
247
        SCH_ERR_JRET(rsp.code);
5,789!
248

249
        pJob->execRes.res = rsp.pMeta;
4,874✔
250
        pJob->execRes.msgType = TDMT_VND_ALTER_TABLE;
4,874✔
251
      }
252

253
      SCH_ERR_JRET(rspCode);
4,888!
254

255
      if (NULL == pMsg->pData) {
4,884✔
256
        SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
10!
257
      }
258

259
      taosMemoryFreeClear(pMsg->pData);
4,874!
260

261
      SCH_ERR_JRET(schProcessOnTaskSuccess(pJob, pTask));
4,874!
262
      break;
4,874✔
263
    }
264
    case TDMT_VND_SUBMIT_RSP: {
9,709,223✔
265
      SCH_ERR_JRET(rspCode);
9,709,223!
266

267
      if (pMsg->pData) {
9,708,579!
268
        SDecoder    coder = {0};
9,710,871✔
269
        SSubmitRsp2 *rsp = taosMemoryMalloc(sizeof(*rsp));
9,710,871!
270
        if (NULL == rsp) {
9,756,903!
271
          SCH_ERR_JRET(terrno);
10!
272
        }
273
        tDecoderInit(&coder, pMsg->pData, msgSize);
9,756,903✔
274
        code = tDecodeSSubmitRsp2(&coder, rsp);
9,756,973✔
275
        tDecoderClear(&coder);
9,760,112✔
276
        if (code) {
9,779,923✔
277
          SCH_TASK_ELOG("tDecodeSSubmitRsp2 failed, code:%d", code);
10!
278
          tDestroySSubmitRsp2(rsp, TSDB_MSG_FLG_DECODE);
10✔
279
          taosMemoryFree(rsp);
10!
280
          SCH_ERR_JRET(code);
10!
281
        }
282

283
        (void)atomic_add_fetch_64(&pJob->resNumOfRows, rsp->affectedRows);
9,779,913✔
284

285
        int32_t createTbRspNum = taosArrayGetSize(rsp->aCreateTbRsp);
9,787,147✔
286
        SCH_TASK_DLOG("submit succeed, affectedRows:%d, createTbRspNum:%d", rsp->affectedRows, createTbRspNum);
9,776,796!
287

288
        if (rsp->aCreateTbRsp && taosArrayGetSize(rsp->aCreateTbRsp) > 0) {
9,776,799!
289
          SCH_LOCK(SCH_WRITE, &pJob->resLock);
75,727!
290
          if (pJob->execRes.res) {
75,721✔
291
            SSubmitRsp2 *sum = pJob->execRes.res;
198✔
292
            sum->affectedRows += rsp->affectedRows;
198✔
293
            if (sum->aCreateTbRsp) {
198✔
294
              if (NULL == taosArrayAddAll(sum->aCreateTbRsp, rsp->aCreateTbRsp)) {
195!
295
                code = terrno;
×
296
                SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
×
297
                SCH_ERR_JRET(code);
×
298
              }
299
              
300
              taosArrayDestroy(rsp->aCreateTbRsp);
195✔
301
            } else {
302
              TSWAP(sum->aCreateTbRsp, rsp->aCreateTbRsp);
3✔
303
            }
304
            taosMemoryFree(rsp);
198!
305
          } else {
306
            pJob->execRes.res = rsp;
75,523✔
307
            pJob->execRes.msgType = TDMT_VND_SUBMIT;
75,523✔
308
          }
309
          pJob->execRes.numOfBytes += pTask->msgLen;
75,721✔
310
          SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
75,721!
311
        } else {
312
          SCH_LOCK(SCH_WRITE, &pJob->resLock);
9,701,083!
313
          pJob->execRes.numOfBytes += pTask->msgLen;
9,630,959✔
314
          if (NULL == pJob->execRes.res) {
9,630,959✔
315
            TSWAP(pJob->execRes.res, rsp);
9,588,639✔
316
            pJob->execRes.msgType = TDMT_VND_SUBMIT;
9,588,639✔
317
          }
318
          SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
9,630,959!
319
          tDestroySSubmitRsp2(rsp, TSDB_MSG_FLG_DECODE);
9,670,882✔
320
          taosMemoryFree(rsp);
9,667,222!
321
        }
322
      }
323

324
      taosMemoryFreeClear(pMsg->pData);
9,736,554!
325

326
      SCH_ERR_JRET(schProcessOnTaskSuccess(pJob, pTask));
9,764,068!
327

328
      break;
9,769,108✔
329
    }
330
    case TDMT_VND_DELETE_RSP: {
55,335✔
331
      SCH_ERR_JRET(rspCode);
55,335!
332

333
      if (pMsg->pData) {
55,335✔
334
        SDecoder    coder = {0};
55,334✔
335
        SVDeleteRsp rsp = {0};
55,334✔
336
        tDecoderInit(&coder, pMsg->pData, msgSize);
55,334✔
337
        if (tDecodeSVDeleteRsp(&coder, &rsp) < 0) {
55,334✔
338
          code = terrno;
10✔
339
          tDecoderClear(&coder);
10✔
340
          SCH_ERR_JRET(code);
10!
341
        }
342
        tDecoderClear(&coder);
55,328✔
343

344
        (void)atomic_add_fetch_64(&pJob->resNumOfRows, rsp.affectedRows);
55,325✔
345
        SCH_TASK_DLOG("delete succeed, affectedRows:%" PRId64, rsp.affectedRows);
55,328!
346
      }
347

348
      taosMemoryFreeClear(pMsg->pData);
55,329!
349

350
      SCH_ERR_JRET(schProcessOnTaskSuccess(pJob, pTask));
55,328!
351

352
      break;
55,325✔
353
    }
354
    case TDMT_SCH_QUERY_RSP:
1,947,547✔
355
    case TDMT_SCH_MERGE_QUERY_RSP: {
356
      SCH_ERR_JRET(rspCode);
1,947,567!
357
      if (NULL == pMsg->pData) {
1,946,572✔
358
        SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
10!
359
      }
360

361
      if (taosArrayGetSize(pTask->parents) == 0 && SCH_IS_EXPLAIN_JOB(pJob) && SCH_IS_INSERT_JOB(pJob)) {
1,946,562✔
362
        SRetrieveTableRsp *pRsp = NULL;
9✔
363
        SCH_ERR_JRET(qExecExplainEnd(pJob->explainCtx, &pRsp));
9!
364
        if (pRsp) {
9!
365
          SCH_ERR_JRET(schProcessOnExplainDone(pJob, pTask, pRsp));
9!
366
        }
367
      }
368

369
      SQueryTableRsp rsp = {0};
1,946,649✔
370
      if (tDeserializeSQueryTableRsp(pMsg->pData, msgSize, &rsp) < 0) {
1,946,649✔
371
        SCH_TASK_ELOG("tDeserializeSQueryTableRsp failed, msgSize:%d", msgSize);
10!
372
        SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_MSG);
8!
373
      }
374
      
375
      SCH_ERR_JRET(rsp.code);
1,946,639!
376

377
      SCH_ERR_JRET(schSaveJobExecRes(pJob, &rsp));
1,946,639!
378

379
      (void)atomic_add_fetch_64(&pJob->resNumOfRows, rsp.affectedRows);
1,946,719✔
380

381
      taosMemoryFreeClear(pMsg->pData);
1,946,783!
382

383
      SCH_ERR_JRET(schProcessOnTaskSuccess(pJob, pTask));
1,946,763!
384

385
      break;
1,946,778✔
386
    }
387
    case TDMT_SCH_EXPLAIN_RSP: {
23,128✔
388
      SCH_ERR_JRET(rspCode);
23,168!
389
      if (NULL == pMsg->pData) {
23,128✔
390
        SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
10!
391
      }
392

393
      if (!SCH_IS_EXPLAIN_JOB(pJob)) {
23,118✔
394
        SCH_TASK_ELOG("invalid msg received for none explain query, msg type:%s", TMSG_INFO(msgType));
10!
395
        SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
10!
396
      }
397

398
      if (pJob->fetchRes) {
23,108✔
399
        SCH_TASK_ELOG("explain result is already generated, res:%p", pJob->fetchRes);
10!
400
        SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR);
10!
401
      }
402

403
      SExplainRsp rsp = {0};
23,098✔
404
      if (tDeserializeSExplainRsp(pMsg->pData, msgSize, &rsp)) {
23,098✔
405
        tFreeSExplainRsp(&rsp);
10✔
406
        SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
10!
407
      }
408

409
      SCH_ERR_JRET(schProcessExplainRsp(pJob, pTask, &rsp));
23,088!
410

411
      taosMemoryFreeClear(pMsg->pData);
23,088!
412
      break;
23,088✔
413
    }
414
    case TDMT_SCH_FETCH_RSP:
849,476✔
415
    case TDMT_SCH_MERGE_FETCH_RSP: {
416
      code = schProcessFetchRsp(pJob, pTask, pMsg->pData, rspCode);
849,476✔
417
      pMsg->pData = NULL;
849,489✔
418
      SCH_ERR_JRET(code);
849,489!
419
      break;
849,489✔
420
    }
421
    case TDMT_SCH_DROP_TASK_RSP: {
10✔
422
      // NEVER REACH HERE
423
      SCH_TASK_ELOG("invalid status to handle drop task rsp, refId:0x%" PRIx64, pJob->refId);
10!
424
      SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
10!
425
      break;
×
426
    }
427
    case TDMT_SCH_LINK_BROKEN:
10✔
428
      SCH_TASK_ELOG("link broken received, error:%x - %s", rspCode, tstrerror(rspCode));
10!
429
      SCH_ERR_JRET(rspCode);
10!
430
      break;
10✔
431
    default:
×
432
      SCH_TASK_ELOG("unknown rsp msg, type:%d, status:%s", msgType, SCH_GET_TASK_STATUS_STR(pTask));
×
433
      SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
×
434
  }
435

436
  return TSDB_CODE_SUCCESS;
12,802,157✔
437

438
_return:
2,665✔
439

440
  taosMemoryFreeClear(pMsg->pData);
2,665!
441

442
  SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
2,665!
443
} 
444

445

446
// Note: no more task error processing, handled in function internal
447
int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, uint64_t seriousId, int32_t execId, SDataBuf *pMsg, int32_t rspCode) {
12,752,498✔
448
  int32_t code = 0;
12,752,498✔
449
  int32_t msgType = pMsg->msgType;
12,752,498✔
450

451
  bool dropExecNode = (msgType == TDMT_SCH_LINK_BROKEN || SCH_NETWORK_ERR(rspCode));
12,752,498!
452
  if (SCH_IS_QUERY_JOB(pJob)) {
12,752,498✔
453
    SCH_ERR_JRET(schUpdateTaskHandle(pJob, pTask, dropExecNode, pMsg->handle, seriousId, execId));
2,825,856!
454
  }
455
  
456
  SCH_ERR_JRET(schValidateRspMsgType(pJob, pTask, msgType));
12,751,859!
457

458
  if (pTask->seriousId < atomic_load_64(&pJob->seriousId)) {
12,749,465✔
459
    SCH_TASK_DLOG("task sId %" PRId64 " is smaller than current job sId %" PRId64, pTask->seriousId, pJob->seriousId);
2,256!
460
    SCH_ERR_JRET(TSDB_CODE_SCH_IGNORE_ERROR);
1,193!
461
  }
462

463
  int32_t reqType = IsReq(pMsg) ? pMsg->msgType : (pMsg->msgType - 1);
12,737,527!
464
#if 0
465
  if (SCH_JOB_NEED_RETRY(pJob, pTask, reqType, rspCode)) {
466
    SCH_RET(schHandleJobRetry(pJob, pTask, (SDataBuf *)pMsg, rspCode));
467
  } else if (SCH_TASKSET_NEED_RETRY(pJob, pTask, reqType, rspCode)) {
468
    SCH_RET(schHandleTaskSetRetry(pJob, pTask, (SDataBuf *)pMsg, rspCode));
469
  }
470
#else 
471
  if (SCH_JOB_NEED_RETRY(pJob, pTask, reqType, rspCode)) {
12,737,527!
UNCOV
472
    SCH_RET(schHandleJobRetry(pJob, pTask, (SDataBuf *)pMsg, rspCode));
✔
473
  }
474
#endif
475

476
  pTask->redirectCtx.inRedirect = false;
12,743,684✔
477

478
  SCH_RET(schProcessResponseMsg(pJob, pTask, pMsg, rspCode));
12,743,684✔
479

480
_return:
2,828✔
481

482
  taosMemoryFreeClear(pMsg->pData);
2,828!
483

484
  SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
2,828!
485
} 
486
int32_t schHandleCallback(void *param, SDataBuf *pMsg, int32_t rspCode) {
12,822,234✔
487
  int32_t                code = 0;
12,822,234✔
488
  SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param;
12,822,234✔
489
  SSchTask              *pTask = NULL;
12,822,234✔
490
  SSchJob               *pJob = NULL;
12,822,234✔
491

492
  int64_t qid = pParam->queryId;
12,822,234✔
493
  qDebug("QID:0x%" PRIx64 ", handle rsp msg, type:%s, handle:%p, code:%s", qid,TMSG_INFO(pMsg->msgType), pMsg->handle,
12,822,234!
494
         tstrerror(rspCode));
495

496
  SCH_ERR_JRET(schProcessOnCbBegin(&pJob, &pTask, pParam->queryId, pParam->refId, pParam->taskId));
12,822,234✔
497
  code = schHandleResponseMsg(pJob, pTask, pParam->seriousId, pParam->execId, pMsg, rspCode);
12,748,167✔
498
  pMsg->pData = NULL;
12,808,442✔
499

500
  schProcessOnCbEnd(pJob, pTask, code);
12,808,442✔
501

502
_return:
12,818,461✔
503

504
  taosMemoryFreeClear(pMsg->pData);
12,818,461!
505
  taosMemoryFreeClear(pMsg->pEpSet);
12,818,461!
506

507
  qTrace("QID:0x%" PRIx64 ", end to handle rsp msg, type:%s, handle:%p, code:%s", qid, TMSG_INFO(pMsg->msgType), pMsg->handle,
12,818,458!
508
         tstrerror(rspCode));
509

510
  SCH_RET(code);
12,812,323✔
511
}
512

513
int32_t schHandleDropCallback(void *param, SDataBuf *pMsg, int32_t code) {
7,584✔
514
  SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param;
7,584✔
515
  qDebug("QID:0x%" PRIx64 ", SID:0x%" PRIx64 ", CID:0x%" PRIx64 ", TID:0x%" PRIx64 " drop task rsp received, code:0x%x", 
7,584✔
516
         pParam->queryId, pParam->seriousId, pParam->clientId, pParam->taskId, code);
517
  // called if drop task rsp received code
518
  (void)rpcReleaseHandle(pMsg->handle, TAOS_CONN_CLIENT, 0); // ignore error
7,584✔
519

520
  if (pMsg->handle == NULL) {
7,584✔
521
    qError("sch handle is NULL, may be already released and mem lea");
7,260!
522
  }
523
  if (pMsg) {
7,584!
524
    taosMemoryFree(pMsg->pData);
7,584!
525
    taosMemoryFree(pMsg->pEpSet);
7,584!
526
  }
527
  return TSDB_CODE_SUCCESS;
7,584✔
528
}
529

530
int32_t schHandleNotifyCallback(void *param, SDataBuf *pMsg, int32_t code) {
10✔
531
  SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param;
10✔
532
  qDebug("QID:0x%" PRIx64 ", SID:0x%" PRIx64 ", CID:0x%" PRIx64 ", TID:0x%" PRIx64 " task notify rsp received, code:0x%x", 
10!
533
         pParam->queryId, pParam->seriousId, pParam->clientId, pParam->taskId, code);
534
  if (pMsg) {
10!
535
    taosMemoryFreeClear(pMsg->pData);
10!
536
    taosMemoryFreeClear(pMsg->pEpSet);
10!
537
  }
538
  return TSDB_CODE_SUCCESS;
10✔
539
}
540

541

542
int32_t schHandleLinkBrokenCallback(void *param, SDataBuf *pMsg, int32_t code) {
20✔
543
  SSchCallbackParamHeader *head = (SSchCallbackParamHeader *)param;
20✔
544
  (void)rpcReleaseHandle(pMsg->handle, TAOS_CONN_CLIENT, 0); // ignore error
20✔
545

546
  qDebug("handle %p is broken", pMsg->handle);
20!
547

548
  if (head->isHbParam) {
20✔
549
    taosMemoryFreeClear(pMsg->pData);
10!
550
    taosMemoryFreeClear(pMsg->pEpSet);
10!
551

552
    SSchHbCallbackParam *hbParam = (SSchHbCallbackParam *)param;
10✔
553
    SSchTrans            trans = {.pTrans = hbParam->pTrans, .pHandle = NULL, .pHandleId = 0};
10✔
554
    SCH_ERR_RET(schUpdateHbConnection(&hbParam->nodeEpId, &trans));
10!
555

556
    SCH_ERR_RET(schBuildAndSendHbMsg(&hbParam->nodeEpId, NULL));
10!
557
  } else {
558
    SCH_ERR_RET(schHandleCallback(param, pMsg, code));
10!
559
  }
560

561
  return TSDB_CODE_SUCCESS;
10✔
562
}
563

564
int32_t schHandleCommitCallback(void *param, SDataBuf *pMsg, int32_t code) {
19,652✔
565
  return schHandleCallback(param, pMsg, code);
19,652✔
566
}
567

568
int32_t schHandleHbCallback(void *param, SDataBuf *pMsg, int32_t code) {
1,154,668✔
569
  SSchedulerHbRsp        rsp = {0};
1,154,668✔
570
  SSchHbCallbackParam *pParam = (SSchHbCallbackParam *)param;
1,154,668✔
571

572
  if (code) {
1,154,668✔
573
    qError("hb rsp error:%s", tstrerror(code));
1,312!
574
    (void)rpcReleaseHandle(pMsg->handle, TAOS_CONN_CLIENT, 0); // ignore error
1,312✔
575
    SCH_ERR_JRET(code);
1,312!
576
  }
577

578
  if (tDeserializeSSchedulerHbRsp(pMsg->pData, pMsg->len, &rsp)) {
1,153,356✔
579
    qError("invalid hb rsp msg, size:%d", pMsg->len);
10!
580
    SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
10!
581
  }
582

583
  SSchTrans trans = {0};
1,153,282✔
584
  trans.pTrans = pParam->pTrans;
1,153,282✔
585
  trans.pHandle = pMsg->handle;
1,153,282✔
586
  trans.pHandleId = pMsg->handleRefId;
1,153,282✔
587

588
  SCH_ERR_JRET(schUpdateHbConnection(&rsp.epId, &trans));
1,153,282!
589
  SCH_ERR_JRET(schProcessOnTaskStatusRsp(&rsp.epId, rsp.taskStatus));
1,153,395!
590

591
_return:
1,153,401✔
592

593
  tFreeSSchedulerHbRsp(&rsp);
1,154,723✔
594
  taosMemoryFree(pMsg->pData);
1,154,712!
595
  taosMemoryFree(pMsg->pEpSet);
1,154,719!
596
  SCH_RET(code);
1,154,706!
597
}
598

599
int32_t schMakeCallbackParam(SSchJob *pJob, SSchTask *pTask, int32_t msgType, bool isHb, SSchTrans *trans,
20,860,847✔
600
                             void **pParam) {
601
  if (!isHb) {
20,860,847✔
602
    SSchTaskCallbackParam *param = taosMemoryCalloc(1, sizeof(SSchTaskCallbackParam));
18,607,360!
603
    if (NULL == param) {
18,596,696!
604
      SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SSchTaskCallbackParam));
×
605
      SCH_ERR_RET(terrno);
×
606
    }
607

608
    param->queryId = pJob->queryId;
18,596,696✔
609
    param->seriousId = pTask->seriousId;
18,596,696✔
610
    param->refId = pJob->refId;
18,596,696✔
611
    param->clientId = SCH_CLIENT_ID(pTask);
18,596,696!
612
    param->taskId = SCH_TASK_ID(pTask);
18,596,696!
613
    param->pTrans = pJob->conn.pTrans;
18,596,696✔
614
    param->execId = pTask->execId;
18,596,696✔
615
    *pParam = param;
18,596,696✔
616

617
    return TSDB_CODE_SUCCESS;
18,596,696✔
618
  }
619

620
  if (TDMT_SCH_LINK_BROKEN == msgType) {
2,253,487✔
621
    SSchHbCallbackParam *param = taosMemoryCalloc(1, sizeof(SSchHbCallbackParam));
1,177,914!
622
    if (NULL == param) {
1,181,981!
623
      SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SSchHbCallbackParam));
×
624
      SCH_ERR_RET(terrno);
×
625
    }
626

627
    param->head.isHbParam = true;
1,181,981✔
628

629
    SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
1,181,981✔
630
    if (NULL == addr) {
1,180,939!
631
      taosMemoryFree(param);
×
632
      SCH_TASK_ELOG("fail to get the %dth condidateAddr, totalNum: %d", pTask->candidateIdx, (int32_t)taosArrayGetSize(pTask->candidateAddrs));
×
633
      SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
×
634
    }
635
    param->nodeEpId.nodeId = addr->nodeId;
1,180,827✔
636
    SEp *pEp = SCH_GET_CUR_EP(addr);
1,180,827✔
637
    TAOS_STRCPY(param->nodeEpId.ep.fqdn, pEp->fqdn);
1,180,827✔
638
    param->nodeEpId.ep.port = pEp->port;
1,180,827✔
639
    param->pTrans = trans->pTrans;
1,180,827✔
640
    *pParam = param;
1,180,827✔
641

642
    return TSDB_CODE_SUCCESS;
1,180,827✔
643
  }
644

645
  // hb msg
646
  SSchHbCallbackParam *param = taosMemoryCalloc(1, sizeof(SSchHbCallbackParam));
1,075,573!
647
  if (NULL == param) {
1,113,614!
648
    qError("calloc SSchTaskCallbackParam failed");
×
649
    SCH_ERR_RET(terrno);
×
650
  }
651

652
  param->head.isHbParam = true;
1,113,614✔
653
  param->pTrans = trans->pTrans;
1,113,614✔
654
  *pParam = param;
1,113,614✔
655

656
  return TSDB_CODE_SUCCESS;
1,113,614✔
657
}
658

659
int32_t schGenerateCallBackInfo(SSchJob *pJob, SSchTask *pTask, void *msg, uint32_t msgSize, int32_t msgType,
20,845,543✔
660
                                SSchTrans *trans, bool isHb, SMsgSendInfo **pMsgSendInfo) {
661
  int32_t       code = 0;
20,845,543✔
662
  SMsgSendInfo *msgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
20,845,543!
663
  if (NULL == msgSendInfo) {
20,924,954!
664
    qError("calloc SMsgSendInfo size %d failed", (int32_t)sizeof(SMsgSendInfo));
×
665
    SCH_ERR_JRET(terrno);
×
666
  }
667

668
  msgSendInfo->paramFreeFp = taosAutoMemoryFree;
20,924,954✔
669
  SCH_ERR_JRET(schMakeCallbackParam(pJob, pTask, msgType, isHb, trans, &msgSendInfo->param));
20,924,954!
670

671
  SCH_ERR_JRET(schGetCallbackFp(msgType, &msgSendInfo->fp));
20,894,331!
672

673
  if (pJob) {
20,885,509✔
674
    msgSendInfo->requestId = pJob->conn.requestId;
19,751,181✔
675
    msgSendInfo->requestObjRefId = pJob->conn.requestObjRefId;
19,751,181✔
676
  } else {
677
    SCH_ERR_JRET(taosGetSystemUUIDU64(&msgSendInfo->requestId));
1,134,328!
678
  }
679

680
  qDebug("ahandle %p alloced, QID:0x%" PRIx64, msgSendInfo, msgSendInfo->requestId);
20,866,144✔
681

682
  if (TDMT_SCH_LINK_BROKEN != msgType) {
20,895,774✔
683
    msgSendInfo->msgInfo.pData = msg;
17,772,521✔
684
    msgSendInfo->msgInfo.len = msgSize;
17,772,521✔
685
    msgSendInfo->msgInfo.handle = trans->pHandle;
17,772,521✔
686
    msgSendInfo->msgType = msgType;
17,772,521✔
687
  }
688

689
  *pMsgSendInfo = msgSendInfo;
20,895,774✔
690

691
  return TSDB_CODE_SUCCESS;
20,895,774✔
692

693
_return:
×
694

695
  if (msgSendInfo) {
×
696
    destroySendMsgInfo(msgSendInfo);
×
697
  }
698

699
  taosMemoryFree(msg);
×
700

701
  SCH_RET(code);
×
702
}
703

704
int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) {
22,037,973✔
705
  switch (msgType) {
22,037,973!
706
    case TDMT_VND_CREATE_TABLE:
14,661,156✔
707
    case TDMT_VND_DROP_TABLE:
708
    case TDMT_VND_ALTER_TABLE:
709
    case TDMT_VND_SUBMIT:
710
    case TDMT_SCH_QUERY:
711
    case TDMT_SCH_MERGE_QUERY:
712
    case TDMT_VND_DELETE:
713
    case TDMT_SCH_EXPLAIN:
714
    case TDMT_SCH_FETCH:
715
    case TDMT_SCH_MERGE_FETCH:
716
      *fp = schHandleCallback;
14,661,156✔
717
      break;
14,661,156✔
718
    case TDMT_SCH_DROP_TASK:
1,952,374✔
719
      *fp = schHandleDropCallback;
1,952,374✔
720
      break;
1,952,374✔
721
    case TDMT_SCH_TASK_NOTIFY:
36✔
722
      *fp = schHandleNotifyCallback;
36✔
723
      break;
36✔
724
    case TDMT_SCH_QUERY_HEARTBEAT:
2,294,136✔
725
      *fp = schHandleHbCallback;
2,294,136✔
726
      break;
2,294,136✔
727
    case TDMT_VND_COMMIT:
19,652✔
728
      *fp = schHandleCommitCallback;
19,652✔
729
      break;
19,652✔
730
    case TDMT_SCH_LINK_BROKEN:
3,133,185✔
731
      *fp = schHandleLinkBrokenCallback;
3,133,185✔
732
      break;
3,133,185✔
733
    default:
×
734
      qError("unknown msg type for callback, msgType:%d", msgType);
×
735
      SCH_ERR_RET(TSDB_CODE_APP_ERROR);
×
736
  }
737

738
  return TSDB_CODE_SUCCESS;
22,060,518✔
739
}
740

741
/*
742
int32_t schMakeHbCallbackParam(SSchJob *pJob, SSchTask *pTask, void **pParam) {
743
  SSchHbCallbackParam *param = taosMemoryCalloc(1, sizeof(SSchHbCallbackParam));
744
  if (NULL == param) {
745
    SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SSchHbCallbackParam));
746
    SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
747
  }
748

749
  param->head.isHbParam = true;
750

751
  SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
752

753
  param->nodeEpId.nodeId = addr->nodeId;
754
  SEp* pEp = SCH_GET_CUR_EP(addr);
755
  tstrncpy(param->nodeEpId.ep.fqdn, pEp->fqdn, sizeof(param->nodeEpId.ep.fqdn));
756
  param->nodeEpId.ep.port = pEp->port;
757
  param->pTrans = pJob->pTrans;
758

759
  *pParam = param;
760

761
  return TSDB_CODE_SUCCESS;
762
}
763
*/
764

765
int32_t schCloneHbRpcCtx(SRpcCtx *pSrc, SRpcCtx *pDst) {
1,108,573✔
766
  int32_t code = 0;
1,108,573✔
767
  TAOS_MEMCPY(pDst, pSrc, sizeof(SRpcCtx));
1,108,573✔
768
  pDst->brokenVal.val = NULL;
1,108,573✔
769
  pDst->args = NULL;
1,108,573✔
770

771
  SCH_ERR_RET(schCloneSMsgSendInfo(pSrc->brokenVal.val, &pDst->brokenVal.val));
1,108,573!
772

773
  pDst->args = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
1,104,690✔
774
  if (NULL == pDst->args) {
1,111,984!
775
    qError("taosHashInit %d RpcCtx failed", 1);
×
776
    SCH_ERR_JRET(terrno);
×
777
  }
778

779
  SRpcCtxVal dst = {0};
1,111,984✔
780
  void      *pIter = taosHashIterate(pSrc->args, NULL);
1,111,984✔
781
  while (pIter) {
2,233,163✔
782
    SRpcCtxVal *pVal = (SRpcCtxVal *)pIter;
1,115,497✔
783
    int32_t    *msgType = taosHashGetKey(pIter, NULL);
1,115,497✔
784

785
    dst = *pVal;
1,111,756✔
786
    dst.val = NULL;
1,111,756✔
787

788
    SCH_ERR_JRET(schCloneSMsgSendInfo(pVal->val, &dst.val));
1,111,756!
789

790
    if (taosHashPut(pDst->args, msgType, sizeof(*msgType), &dst, sizeof(dst))) {
1,109,394!
791
      qError("taosHashPut msg %d to rpcCtx failed", *msgType);
×
792
      (*pSrc->freeFunc)(dst.val);
×
793
      SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
×
794
    }
795

796
    pIter = taosHashIterate(pSrc->args, pIter);
1,117,477✔
797
  }
798

799
  return TSDB_CODE_SUCCESS;
1,117,666✔
800

801
_return:
×
802

803
  schFreeRpcCtx(pDst);
×
804
  SCH_RET(code);
×
805
}
806

807
int32_t schMakeHbRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) {
1,180,019✔
808
  int32_t              code = 0;
1,180,019✔
809
  SSchHbCallbackParam *param = NULL;
1,180,019✔
810
  SMsgSendInfo        *pMsgSendInfo = NULL;
1,180,019✔
811
  SQueryNodeAddr      *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
1,180,019✔
812
  SQueryNodeEpId       epId = {0};
1,182,286✔
813

814
  epId.nodeId = addr->nodeId;
1,182,286✔
815
  TAOS_MEMCPY(&epId.ep, SCH_GET_CUR_EP(addr), sizeof(SEp));
1,182,286✔
816

817
  pCtx->args = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
1,182,286✔
818
  if (NULL == pCtx->args) {
1,182,093!
819
    SCH_TASK_ELOG("taosHashInit %d RpcCtx failed", 1);
×
820
    SCH_ERR_RET(terrno);
×
821
  }
822

823
  pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
1,182,093!
824
  if (NULL == pMsgSendInfo) {
1,182,882!
825
    SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SMsgSendInfo));
×
826
    SCH_ERR_JRET(terrno);
×
827
  }
828

829
  param = taosMemoryCalloc(1, sizeof(SSchHbCallbackParam));
1,182,882!
830
  if (NULL == param) {
1,182,291!
831
    SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SSchHbCallbackParam));
×
832
    SCH_ERR_JRET(terrno);
×
833
  }
834

835
  int32_t              msgType = TDMT_SCH_QUERY_HEARTBEAT_RSP;
1,182,291✔
836
  __async_send_cb_fn_t fp = NULL;
1,182,291✔
837
  SCH_ERR_JRET(schGetCallbackFp(TDMT_SCH_QUERY_HEARTBEAT, &fp));
1,182,291!
838

839
  param->head.isHbParam = true;
1,179,788✔
840
  param->nodeEpId = epId;
1,179,788✔
841
  param->pTrans = pJob->conn.pTrans;
1,179,788✔
842

843
  pMsgSendInfo->param = param;
1,179,788✔
844
  pMsgSendInfo->paramFreeFp = taosAutoMemoryFree;
1,179,788✔
845
  pMsgSendInfo->fp = fp;
1,179,788✔
846

847
  SRpcCtxVal ctxVal = {.val = pMsgSendInfo, .clone = schCloneSMsgSendInfo};
1,179,788✔
848
  if (taosHashPut(pCtx->args, &msgType, sizeof(msgType), &ctxVal, sizeof(ctxVal))) {
1,179,788!
849
    SCH_TASK_ELOG("taosHashPut msg %d to rpcCtx failed", msgType);
×
850
    SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
×
851
  }
852

853
  SCH_ERR_JRET(schMakeBrokenLinkVal(pJob, pTask, &pCtx->brokenVal, true));
1,183,828!
854
  pCtx->freeFunc = schFreeRpcCtxVal;
1,185,401✔
855

856
  return TSDB_CODE_SUCCESS;
1,185,401✔
857

858
_return:
×
859

860
  taosHashCleanup(pCtx->args);
×
861
  taosMemoryFreeClear(param);
×
862
  taosMemoryFreeClear(pMsgSendInfo);
×
863

864
  SCH_RET(code);
×
865
}
866

867
int32_t schMakeBrokenLinkVal(SSchJob *pJob, SSchTask *pTask, SRpcBrokenlinkVal *brokenVal, bool isHb) {
3,130,969✔
868
  int32_t       code = 0;
3,130,969✔
869
  int32_t       msgType = TDMT_SCH_LINK_BROKEN;
3,130,969✔
870
  SSchTrans     trans = {.pTrans = pJob->conn.pTrans};
3,130,969✔
871
  SMsgSendInfo *pMsgSendInfo = NULL;
3,130,969✔
872
  SCH_ERR_JRET(schGenerateCallBackInfo(pJob, pTask, NULL, 0, msgType, &trans, isHb, &pMsgSendInfo));
3,130,969!
873

874
  brokenVal->msgType = msgType;
3,138,783✔
875
  brokenVal->val = pMsgSendInfo;
3,138,783✔
876
  brokenVal->clone = schCloneSMsgSendInfo;
3,138,783✔
877

878
  return TSDB_CODE_SUCCESS;
3,138,783✔
879

880
_return:
×
881

882
  taosMemoryFreeClear(pMsgSendInfo->param);
×
883
  taosMemoryFreeClear(pMsgSendInfo);
×
884

885
  SCH_RET(code);
×
886
}
887

888
int32_t schMakeQueryRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) {
1,948,126✔
889
  int32_t       code = 0;
1,948,126✔
890
  SMsgSendInfo *pExplainMsgSendInfo = NULL;
1,948,126✔
891

892
  pCtx->args = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
1,948,126✔
893
  if (NULL == pCtx->args) {
1,948,140!
894
    SCH_TASK_ELOG("taosHashInit %d RpcCtx failed", 1);
×
895
    SCH_ERR_RET(terrno);
×
896
  }
897

898
  SSchTrans trans = {.pTrans = pJob->conn.pTrans, .pHandle = SCH_GET_TASK_HANDLE(pTask)};
1,948,140!
899
  SCH_ERR_JRET(schGenerateCallBackInfo(pJob, pTask, NULL, 0, TDMT_SCH_EXPLAIN, &trans, false, &pExplainMsgSendInfo));
1,948,140!
900

901
  int32_t    msgType = TDMT_SCH_EXPLAIN_RSP;
1,952,891✔
902
  SRpcCtxVal ctxVal = {.val = pExplainMsgSendInfo, .clone = schCloneSMsgSendInfo};
1,952,891✔
903
  if (taosHashPut(pCtx->args, &msgType, sizeof(msgType), &ctxVal, sizeof(ctxVal))) {
1,952,891!
904
    SCH_TASK_ELOG("taosHashPut msg %d to rpcCtx failed", msgType);
×
905
    SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
×
906
  }
907

908
  SCH_ERR_JRET(schMakeBrokenLinkVal(pJob, pTask, &pCtx->brokenVal, false));
1,953,531!
909
  pCtx->freeFunc = schFreeRpcCtxVal;
1,953,067✔
910

911
  return TSDB_CODE_SUCCESS;
1,953,067✔
912

913
_return:
×
914

915
  taosHashCleanup(pCtx->args);
×
916

917
  if (pExplainMsgSendInfo) {
×
918
    taosMemoryFreeClear(pExplainMsgSendInfo->param);
×
919
    taosMemoryFreeClear(pExplainMsgSendInfo);
×
920
  }
921

922
  SCH_RET(code);
×
923
}
924

925
int32_t schCloneCallbackParam(SSchCallbackParamHeader *pSrc, SSchCallbackParamHeader **pDst) {
2,284,808✔
926
  if (pSrc->isHbParam) {
2,284,808✔
927
    SSchHbCallbackParam *dst = taosMemoryMalloc(sizeof(SSchHbCallbackParam));
2,264,497!
928
    if (NULL == dst) {
2,247,156!
929
      qError("malloc SSchHbCallbackParam failed");
×
930
      SCH_ERR_RET(terrno);
×
931
    }
932

933
    TAOS_MEMCPY(dst, pSrc, sizeof(*dst));
2,247,156✔
934
    *pDst = (SSchCallbackParamHeader *)dst;
2,247,156✔
935

936
    return TSDB_CODE_SUCCESS;
2,247,156✔
937
  }
938

939
  SSchTaskCallbackParam *dst = taosMemoryMalloc(sizeof(SSchTaskCallbackParam));
20,311!
940
  if (NULL == dst) {
23,086!
941
    qError("malloc SSchTaskCallbackParam failed");
×
942
    SCH_ERR_RET(terrno);
×
943
  }
944

945
  TAOS_MEMCPY(dst, pSrc, sizeof(*dst));
23,086✔
946
  *pDst = (SSchCallbackParamHeader *)dst;
23,086✔
947

948
  return TSDB_CODE_SUCCESS;
23,086✔
949
}
950

951
int32_t schCloneSMsgSendInfo(void *src, void **dst) {
2,278,763✔
952
  SMsgSendInfo *pSrc = src;
2,278,763✔
953
  int32_t       code = 0;
2,278,763✔
954
  SMsgSendInfo *pDst = taosMemoryCalloc(1, sizeof(*pSrc));
2,278,763!
955
  if (NULL == pDst) {
2,284,728!
956
    qError("malloc SMsgSendInfo for rpcCtx failed, len:%d", (int32_t)sizeof(*pSrc));
×
957
    SCH_ERR_RET(terrno);
×
958
  }
959

960
  TAOS_MEMCPY(pDst, pSrc, sizeof(*pSrc));
2,284,728✔
961
  pDst->param = NULL;
2,284,728✔
962

963
  SCH_ERR_JRET(schCloneCallbackParam(pSrc->param, (SSchCallbackParamHeader **)&pDst->param));
2,284,728!
964
  pDst->paramFreeFp = taosAutoMemoryFree;
2,270,206✔
965

966
  *dst = pDst;
2,270,206✔
967

968
  return TSDB_CODE_SUCCESS;
2,270,206✔
969

970
_return:
×
971

972
  taosMemoryFreeClear(pDst);
×
973
  SCH_RET(code);
×
974
}
975

976
int32_t schUpdateSendTargetInfo(SMsgSendInfo *pMsgSendInfo, SQueryNodeAddr *addr, SSchTask *pTask) {
15,772,243✔
977
  if (NULL == pTask || addr->nodeId < MNODE_HANDLE) {
15,772,243✔
978
    return TSDB_CODE_SUCCESS;
1,917,049✔
979
  }
980

981
  if (addr->nodeId == MNODE_HANDLE) {
13,855,194✔
982
    pMsgSendInfo->target.type = TARGET_TYPE_MNODE;
47,735✔
983
  } else {
984
    pMsgSendInfo->target.type = TARGET_TYPE_VNODE;
13,807,459✔
985
    pMsgSendInfo->target.vgId = addr->nodeId;
13,807,459✔
986
    pMsgSendInfo->target.dbFName = taosStrdup(pTask->plan->dbFName);
13,807,459!
987
    if (NULL == pMsgSendInfo->target.dbFName) {
13,796,799!
988
      return terrno;
×
989
    }
990
  }
991

992
  return TSDB_CODE_SUCCESS;
13,844,534✔
993
}
994

995
int32_t schAsyncSendMsg(SSchJob *pJob, SSchTask *pTask, SSchTrans *trans, SQueryNodeAddr *addr, int32_t msgType,
15,773,412✔
996
                        void *msg, uint32_t msgSize, bool persistHandle, SRpcCtx *ctx) {
997
  int32_t code = 0;
15,773,412✔
998
  SEpSet *epSet = &addr->epSet;
15,773,412✔
999

1000
  SMsgSendInfo *pMsgSendInfo = NULL;
15,773,412✔
1001
  bool          isHb = (TDMT_SCH_QUERY_HEARTBEAT == msgType);
15,773,412✔
1002
  SCH_ERR_JRET(schGenerateCallBackInfo(pJob, pTask, msg, msgSize, msgType, trans, isHb, &pMsgSendInfo));
15,773,412!
1003
  SCH_ERR_JRET(schUpdateSendTargetInfo(pMsgSendInfo, addr, pTask));
15,807,018!
1004

1005
  if (isHb && persistHandle && trans->pHandle == 0) {
15,823,527!
1006
    int64_t refId = 0;
1,118,602✔
1007
    code = rpcAllocHandle(&refId); 
1,118,602✔
1008
    if (code != 0) {
1,118,670!
1009
      SCH_TASK_ELOG("rpcAllocHandle failed, code:%x", code);
×
1010
      SCH_ERR_JRET(code);
×
1011
    }
1012
    trans->pHandle = (void *)refId;
1,118,570✔
1013
    pMsgSendInfo->msgInfo.handle =trans->pHandle;
1,118,570✔
1014
  } 
1015

1016
  if (pJob && pTask) {
15,823,495!
1017
    SCH_TASK_DLOG("start to send %s msg to node[%d,%s,%d], pTrans:%p, pHandle:%p", TMSG_INFO(msgType), addr->nodeId,
14,671,660!
1018
           epSet->eps[epSet->inUse].fqdn, epSet->eps[epSet->inUse].port, trans->pTrans, trans->pHandle);
1019
  } else {
1020
    qDebug("start to send %s msg to node[%d,%s,%d], pTrans:%p, pHandle:%p", TMSG_INFO(msgType), addr->nodeId,
1,151,835!
1021
           epSet->eps[epSet->inUse].fqdn, epSet->eps[epSet->inUse].port, trans->pTrans, trans->pHandle);
1022
  }
1023
  
1024
  if (pTask) {
15,835,916✔
1025
    pTask->lastMsgType = msgType;
14,669,961✔
1026
  }
1027

1028
  code = asyncSendMsgToServerExt(trans->pTrans, epSet, NULL, pMsgSendInfo, persistHandle, ctx);
15,835,916✔
1029
  pMsgSendInfo = NULL;
15,874,418✔
1030
  if (code) {
15,874,418✔
1031
    SCH_ERR_JRET(code);
1,129!
1032
  }
1033

1034
  if (pJob) {
15,873,289✔
1035
    SCH_TASK_TLOG("req msg sent, type:%d, %s", msgType, TMSG_INFO(msgType));
14,754,216!
1036
  } else {
1037
    qTrace("req msg sent, type:%d, %s", msgType, TMSG_INFO(msgType));
1,119,073!
1038
  }
1039
  return TSDB_CODE_SUCCESS;
15,868,388✔
1040

1041
_return:
1,129✔
1042

1043
  if (pJob) {
1,129!
1044
    SCH_TASK_ELOG("fail to send msg, type:%d, %s, error:%s", msgType, TMSG_INFO(msgType), tstrerror(code));
1,129!
1045
  } else {
1046
    qError("fail to send msg, type:%d, %s, error:%s", msgType, TMSG_INFO(msgType), tstrerror(code));
×
1047
  }
1048

1049
  if (pMsgSendInfo) {
1,129!
1050
    destroySendMsgInfo(pMsgSendInfo);
×
1051
  }
1052

1053
  SCH_RET(code);
1,129!
1054
}
1055

1056
int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId, SArray *taskAction) {
1,119,061✔
1057
  SSchedulerHbReq req = {0};
1,119,061✔
1058
  int32_t         code = 0;
1,119,061✔
1059
  SRpcCtx         rpcCtx = {0};
1,119,061✔
1060
  SSchTrans       trans = {0};
1,119,061✔
1061
  int32_t         msgType = TDMT_SCH_QUERY_HEARTBEAT;
1,119,061✔
1062

1063
  req.header.vgId = nodeEpId->nodeId;
1,119,061✔
1064
  req.clientId = schMgmt.clientId;
1,119,061✔
1065
  TAOS_MEMCPY(&req.epId, nodeEpId, sizeof(SQueryNodeEpId));
1,119,061✔
1066

1067
  SCH_LOCK(SCH_READ, &schMgmt.hbLock);
1,119,061!
1068
  SSchHbTrans *hb = taosHashGet(schMgmt.hbConnections, nodeEpId, sizeof(SQueryNodeEpId));
1,117,869✔
1069
  if (NULL == hb) {
1,114,965✔
1070
    SCH_UNLOCK(SCH_READ, &schMgmt.hbLock);
47!
1071
    qError("hb connection no longer exist, nodeId:%d, fqdn:%s, port:%d", nodeEpId->nodeId, nodeEpId->ep.fqdn,
47!
1072
           nodeEpId->ep.port);
1073
    return TSDB_CODE_SUCCESS;
47✔
1074
  }
1075

1076
  SCH_LOCK(SCH_WRITE, &hb->lock);
1,114,918!
1077
  code = schCloneHbRpcCtx(&hb->rpcCtx, &rpcCtx);
1,112,285✔
1078
  TAOS_MEMCPY(&trans, &hb->trans, sizeof(trans));
1,116,773✔
1079
  if (NULL == hb->trans.pTrans) {
1,116,773!
1080
    qError("NULL pTrans got from hbConnections for epId:%d", nodeEpId->nodeId);
×
1081
  }
1082
  SCH_UNLOCK(SCH_WRITE, &hb->lock);
1,116,773!
1083
  SCH_UNLOCK(SCH_READ, &schMgmt.hbLock);
1,115,620!
1084

1085
  SCH_ERR_RET(code);
1,114,880!
1086

1087
  int32_t msgSize = tSerializeSSchedulerHbReq(NULL, 0, &req);
1,114,880✔
1088
  if (msgSize < 0) {
1,111,782!
1089
    qError("tSerializeSSchedulerHbReq hbReq failed, size:%d", msgSize);
×
1090
    SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
×
1091
  }
1092
  void *msg = taosMemoryCalloc(1, msgSize);
1,111,782!
1093
  if (NULL == msg) {
1,115,584!
1094
    qError("calloc hb req %d failed", msgSize);
×
1095
    SCH_ERR_JRET(terrno);
×
1096
  }
1097

1098
  if (tSerializeSSchedulerHbReq(msg, msgSize, &req) < 0) {
1,115,584!
1099
    qError("tSerializeSSchedulerHbReq hbReq failed, size:%d", msgSize);
×
1100
    SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
×
1101
  }
1102

1103
  int64_t        transporterId = 0;
1,113,757✔
1104
  SQueryNodeAddr addr = {.nodeId = nodeEpId->nodeId};
1,113,757✔
1105
  addr.epSet.inUse = 0;
1,113,757✔
1106
  addr.epSet.numOfEps = 1;
1,113,757✔
1107
  TAOS_MEMCPY(&addr.epSet.eps[0], &nodeEpId->ep, sizeof(nodeEpId->ep));
1,113,757✔
1108

1109
  code = schAsyncSendMsg(NULL, NULL, &trans, &addr, msgType, msg, msgSize, true, &rpcCtx);
1,113,757✔
1110
  msg = NULL;
1,118,704✔
1111
  SCH_ERR_JRET(code);
1,118,704!
1112

1113
  return TSDB_CODE_SUCCESS;
1,118,704✔
1114

1115
_return:
×
1116

1117
  taosMemoryFreeClear(msg);
×
1118
  schFreeRpcCtx(&rpcCtx);
×
1119
  SCH_RET(code);
×
1120
}
1121

1122
int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, int32_t msgType, void* param) {
14,670,244✔
1123
  int32_t  msgSize = 0;
14,670,244✔
1124
  void    *msg = NULL;
14,670,244✔
1125
  int32_t  code = 0;
14,670,244✔
1126
  bool     isCandidateAddr = false;
14,670,244✔
1127
  bool     persistHandle = false;
14,670,244✔
1128
  SRpcCtx  rpcCtx = {0};
14,670,244✔
1129

1130
  if (NULL == addr) {
14,670,244✔
1131
    addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
11,900,762✔
1132
    if (NULL == addr) {
11,894,442✔
1133
      SCH_TASK_ELOG("fail to get condidateAddr, candidateIdx %d, totalNum: %d", pTask->candidateIdx, (int32_t)taosArrayGetSize(pTask->candidateAddrs));
10!
1134
      SCH_ERR_JRET(terrno);
10!
1135
    }
1136
    
1137
    isCandidateAddr = true;
11,894,432✔
1138
    SCH_TASK_TLOG("target candidateIdx %d, epInUse %d/%d", pTask->candidateIdx, addr->epSet.inUse,
11,894,432!
1139
                  addr->epSet.numOfEps);
1140
  }
1141

1142
  switch (msgType) {
14,669,600!
1143
    case TDMT_VND_CREATE_TABLE:
9,864,098✔
1144
    case TDMT_VND_DROP_TABLE:
1145
    case TDMT_VND_ALTER_TABLE:
1146
    case TDMT_VND_SUBMIT:
1147
    case TDMT_VND_COMMIT: {
1148
      msgSize = pTask->msgLen;
9,864,098✔
1149
      msg = pTask->msg;
9,864,098✔
1150
      pTask->msg = NULL;
9,864,098✔
1151
      break;
9,864,098✔
1152
    }
1153

1154
    case TDMT_VND_DELETE: {
55,318✔
1155
      SVDeleteReq req = {0};
55,318✔
1156
      req.header.vgId = addr->nodeId;
55,318✔
1157
      req.sId = pTask->seriousId;
55,318✔
1158
      req.queryId = pJob->queryId;
55,318✔
1159
      req.clientId = pTask->clientId;
55,318✔
1160
      req.taskId = pTask->taskId;
55,318✔
1161
      req.phyLen = pTask->msgLen;
55,318✔
1162
      req.sqlLen = strlen(pJob->sql);
55,318✔
1163
      req.sql = (char *)pJob->sql;
55,318✔
1164
      req.msg = pTask->msg;
55,318✔
1165
      req.source = pJob->source;
55,318✔
1166
      msgSize = tSerializeSVDeleteReq(NULL, 0, &req);
55,318✔
1167
      if (msgSize < 0) {
55,314!
1168
        SCH_TASK_ELOG("tSerializeSVDeleteReq failed, code:%x", terrno);
×
1169
        SCH_ERR_JRET(terrno);
×
1170
      }
1171
      msg = taosMemoryCalloc(1, msgSize);
55,314!
1172
      if (NULL == msg) {
55,322!
1173
        SCH_TASK_ELOG("calloc %d failed", msgSize);
×
1174
        SCH_ERR_JRET(terrno);
×
1175
      }
1176

1177
      msgSize = tSerializeSVDeleteReq(msg, msgSize, &req);
55,322✔
1178
      if (msgSize < 0) {
55,317✔
1179
        SCH_TASK_ELOG("tSerializeSVDeleteReq second failed, code:%x", terrno);
5!
1180
        SCH_ERR_JRET(terrno);
5!
1181
      }
1182
      break;
55,312✔
1183
    }
1184
    case TDMT_SCH_QUERY:
1,948,295✔
1185
    case TDMT_SCH_MERGE_QUERY: {
1186
      SCH_ERR_RET(schMakeQueryRpcCtx(pJob, pTask, &rpcCtx));
1,948,295!
1187

1188
      SSubQueryMsg qMsg;
1189
      qMsg.header.vgId = addr->nodeId;
1,952,662✔
1190
      qMsg.header.contLen = 0;
1,952,662✔
1191
      qMsg.sId = pTask->seriousId;
1,952,662✔
1192
      qMsg.queryId = pJob->queryId;
1,952,662✔
1193
      qMsg.clientId = pTask->clientId;
1,952,662✔
1194
      qMsg.taskId = pTask->taskId;
1,952,662✔
1195
      qMsg.refId = pJob->refId;
1,952,662✔
1196
      qMsg.execId = pTask->execId;
1,952,662✔
1197
      qMsg.msgMask = (pTask->plan->showRewrite) ? QUERY_MSG_MASK_SHOW_REWRITE() : 0;
1,952,662✔
1198
      qMsg.msgMask |= (pTask->plan->isView) ? QUERY_MSG_MASK_VIEW() : 0;
1,952,662✔
1199
      qMsg.msgMask |= (pTask->plan->isAudit) ? QUERY_MSG_MASK_AUDIT() : 0;
1,952,662✔
1200
      qMsg.taskType = TASK_TYPE_TEMP;
1,952,662✔
1201
      qMsg.explain = SCH_IS_EXPLAIN_JOB(pJob);
1,952,662✔
1202
      qMsg.needFetch = SCH_TASK_NEED_FETCH(pTask);
1,952,662✔
1203
      qMsg.sqlLen = strlen(pJob->sql);
1,952,662✔
1204
      qMsg.sql = pJob->sql;
1,952,662✔
1205
      qMsg.msgLen = pTask->msgLen;
1,952,662✔
1206
      qMsg.msg = pTask->msg;
1,952,662✔
1207

1208
      if (strcmp(tsLocalFqdn, GET_ACTIVE_EP(&addr->epSet)->fqdn) == 0) {
1,952,662✔
1209
        qMsg.compress = 0;
554,332✔
1210
      } else {
1211
        qMsg.compress = 1;
1,398,330✔
1212
      }
1213

1214
      msgSize = tSerializeSSubQueryMsg(NULL, 0, &qMsg);
1,952,662✔
1215
      if (msgSize < 0) {
1,952,088!
1216
        SCH_TASK_ELOG("tSerializeSSubQueryMsg get size, msgSize:%d", msgSize);
×
1217
        SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
×
1218
      }
1219
      
1220
      msg = taosMemoryCalloc(1, msgSize);
1,952,088!
1221
      if (NULL == msg) {
1,951,872!
1222
        SCH_TASK_ELOG("calloc %d failed", msgSize);
×
1223
        SCH_ERR_RET(terrno);
×
1224
      }
1225

1226
      if (tSerializeSSubQueryMsg(msg, msgSize, &qMsg) < 0) {
1,951,872!
1227
        SCH_TASK_ELOG("tSerializeSSubQueryMsg failed, msgSize:%d", msgSize);
×
1228
        taosMemoryFree(msg);
×
1229
        SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
×
1230
      }
1231

1232
      persistHandle = true;
1,952,115✔
1233
      int64_t refId = 0;
1,952,115✔
1234
      code = rpcAllocHandle(&refId);
1,952,115✔
1235
      if (code != 0) {
1,952,454!
1236
        SCH_TASK_ELOG("rpcAllocHandle failed, code:%x", code);
×
1237
        SCH_ERR_JRET(code);
×
1238
      }
1239

1240
      SCH_SET_TASK_HANDLE(pTask, (void *)refId);
1,952,361✔
1241
      break;
1,952,361✔
1242
    }
1243
    case TDMT_SCH_FETCH:
849,483✔
1244
    case TDMT_SCH_MERGE_FETCH: {
1245
      SResFetchReq req = {0};
849,483✔
1246
      req.header.vgId = addr->nodeId;
849,483✔
1247
      req.sId = pTask->seriousId;
849,483✔
1248
      req.queryId = pJob->queryId;
849,483✔
1249
      req.clientId = pTask->clientId;
849,483✔
1250
      req.taskId = pTask->taskId;
849,483✔
1251
      req.execId = pTask->execId;
849,483✔
1252

1253
      msgSize = tSerializeSResFetchReq(NULL, 0, &req);
849,483✔
1254
      if (msgSize < 0) {
849,484!
1255
        SCH_TASK_ELOG("tSerializeSResFetchReq get size, msgSize:%d", msgSize);
×
1256
        SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
×
1257
      }
1258
      
1259
      msg = taosMemoryCalloc(1, msgSize);
849,484!
1260
      if (NULL == msg) {
849,495!
1261
        SCH_TASK_ELOG("calloc %d failed", msgSize);
×
1262
        SCH_ERR_RET(terrno);
×
1263
      }
1264

1265
      if (tSerializeSResFetchReq(msg, msgSize, &req) < 0) {
849,495!
1266
        SCH_TASK_ELOG("tSerializeSResFetchReq %d failed", msgSize);
×
1267
        SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
×
1268
      }
1269
      break;
849,487✔
1270
    }
1271
    case TDMT_SCH_DROP_TASK: {
1,952,380✔
1272
      STaskDropReq qMsg;
1273
      qMsg.header.vgId = addr->nodeId;
1,952,380✔
1274
      qMsg.header.contLen = 0;
1,952,380✔
1275
      qMsg.sId = pTask->seriousId;
1,952,380✔
1276
      qMsg.queryId = pJob->queryId;
1,952,380✔
1277
      qMsg.clientId = pTask->clientId;
1,952,380✔
1278
      qMsg.taskId = pTask->taskId;
1,952,380✔
1279
      qMsg.refId = pJob->refId;
1,952,380✔
1280
      qMsg.execId = *(int32_t*)param;
1,952,380✔
1281

1282
      msgSize = tSerializeSTaskDropReq(NULL, 0, &qMsg);
1,952,380✔
1283
      if (msgSize < 0) {
1,952,375!
1284
        SCH_TASK_ELOG("tSerializeSTaskDropReq get size, msgSize:%d", msgSize);
×
1285
        SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
×
1286
      }
1287
      
1288
      msg = taosMemoryCalloc(1, msgSize);
1,952,375!
1289
      if (NULL == msg) {
1,952,384!
1290
        SCH_TASK_ELOG("calloc %d failed", msgSize);
×
1291
        SCH_ERR_RET(terrno);
×
1292
      }
1293

1294
      if (tSerializeSTaskDropReq(msg, msgSize, &qMsg) < 0) {
1,952,384!
1295
        SCH_TASK_ELOG("tSerializeSTaskDropReq failed, msgSize:%d", msgSize);
×
1296
        taosMemoryFree(msg);
×
1297
        SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
×
1298
      }
1299
      break;
1,952,370✔
1300
    }
1301
/*
1302
    case TDMT_SCH_QUERY_HEARTBEAT: {
1303
      SCH_ERR_RET(schMakeHbRpcCtx(pJob, pTask, &rpcCtx));
1304

1305
      SSchedulerHbReq req = {0};
1306
      req.clientId = schMgmt.clientId;
1307
      req.header.vgId = addr->nodeId;
1308
      req.epId.nodeId = addr->nodeId;
1309
      TAOS_MEMCPY(&req.epId.ep, SCH_GET_CUR_EP(addr), sizeof(SEp));
1310

1311
      msgSize = tSerializeSSchedulerHbReq(NULL, 0, &req);
1312
      if (msgSize < 0) {
1313
        SCH_JOB_ELOG("tSerializeSSchedulerHbReq hbReq failed, size:%d", msgSize);
1314
        SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
1315
      }
1316
      msg = taosMemoryCalloc(1, msgSize);
1317
      if (NULL == msg) {
1318
        SCH_JOB_ELOG("calloc %d failed", msgSize);
1319
        SCH_ERR_RET(terrno);
1320
      }
1321
      if (tSerializeSSchedulerHbReq(msg, msgSize, &req) < 0) {
1322
        SCH_JOB_ELOG("tSerializeSSchedulerHbReq hbReq failed, size:%d", msgSize);
1323
        SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
1324
      }
1325

1326
      persistHandle = true;
1327
      break;
1328
    }
1329
*/    
1330
    case TDMT_SCH_TASK_NOTIFY: {
26✔
1331
      ETaskNotifyType* pType = param;
26✔
1332
      STaskNotifyReq qMsg;
1333
      qMsg.header.vgId = addr->nodeId;
26✔
1334
      qMsg.header.contLen = 0;
26✔
1335
      qMsg.sId = pTask->seriousId;
26✔
1336
      qMsg.queryId = pJob->queryId;
26✔
1337
      qMsg.clientId = pTask->clientId;
26✔
1338
      qMsg.taskId = pTask->taskId;
26✔
1339
      qMsg.refId = pJob->refId;
26✔
1340
      qMsg.execId = pTask->execId;
26✔
1341
      qMsg.type = *pType;
26✔
1342

1343
      msgSize = tSerializeSTaskNotifyReq(NULL, 0, &qMsg);
26✔
1344
      if (msgSize < 0) {
26!
1345
        SCH_TASK_ELOG("tSerializeSTaskNotifyReq get size, msgSize:%d", msgSize);
×
1346
        SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
×
1347
      }
1348
      
1349
      msg = taosMemoryCalloc(1, msgSize);
26!
1350
      if (NULL == msg) {
26!
1351
        SCH_TASK_ELOG("calloc %d failed", msgSize);
×
1352
        SCH_ERR_RET(terrno);
×
1353
      }
1354

1355
      if (tSerializeSTaskNotifyReq(msg, msgSize, &qMsg) < 0) {
26!
1356
        SCH_TASK_ELOG("tSerializeSTaskNotifyReq failed, msgSize:%d", msgSize);
×
1357
        taosMemoryFree(msg);
×
1358
        SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
×
1359
      }
1360
      break;      
26✔
1361
    }
1362
    default:
×
1363
      SCH_TASK_ELOG("unknown msg type to send, msgType:%d", msgType);
×
1364
      SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
×
1365
  }
1366

1367
  if ((tsBypassFlag & TSDB_BYPASS_RB_RPC_SEND_SUBMIT) && (TDMT_VND_SUBMIT == msgType)) {
14,673,654✔
1368
    taosMemoryFree(msg);
1!
1369
    SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));
1!
1370
  } else {
1371
    if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY) {
14,673,653✔
1372
      SCH_ERR_JRET(schAppendTaskExecNode(pJob, pTask, addr, pTask->execId));
1,938,119!
1373
    }
1374

1375
    SSchTrans trans = {.pTrans = pJob->conn.pTrans, .pHandle = SCH_GET_TASK_HANDLE(pTask)};
14,689,861!
1376
    code = schAsyncSendMsg(pJob, pTask, &trans, addr, msgType, msg, (uint32_t)msgSize, persistHandle, (rpcCtx.args ? &rpcCtx : NULL));
14,689,861✔
1377
    msg = NULL;
14,747,624✔
1378
    SCH_ERR_JRET(code);
14,747,624!
1379
  }
1380

1381
  return TSDB_CODE_SUCCESS;
14,746,496✔
1382

1383
_return:
1,139✔
1384

1385
  pTask->lastMsgType = -1;
1,139✔
1386
  schFreeRpcCtx(&rpcCtx);
1,139✔
1387

1388
  taosMemoryFreeClear(msg);
1,139!
1389
  SCH_RET(code);
1,139!
1390
}
1391
// clang-format on
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc