• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5011

03 Apr 2026 03:59PM UTC coverage: 72.3% (+0.008%) from 72.292%
#5011

push

travis-ci

web-flow
merge: from main to 3.0 branch #35067

4053 of 5985 new or added lines in 68 files covered. (67.72%)

732 existing lines in 143 files now uncovered.

257430 of 356056 relevant lines covered (72.3%)

131834103.52 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.84
/source/libs/scheduler/src/schRemote.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "catalog.h"
17
#include "command.h"
18
#include "query.h"
19
#include "schInt.h"
20
#include "tarray.h"
21
#include "tglobal.h"
22
#include "tmisce.h"
23
#include "tmsg.h"
24
#include "tref.h"
25
#include "trpc.h"
26

27
// clang-format off
28
int32_t schValidateRspMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgType) {
1,407,942,248✔
29
  int32_t lastMsgType = pTask->lastMsgType;
1,407,942,248✔
30
  int32_t taskStatus = SCH_GET_TASK_STATUS(pTask);
1,407,950,921✔
31
  int32_t reqMsgType = (msgType & 1U) ? msgType : (msgType - 1);
1,407,963,130✔
32
  switch (msgType) {
1,407,963,130✔
33
    case TDMT_SCH_LINK_BROKEN:
74,797,399✔
34
    case TDMT_SCH_EXPLAIN_RSP:
35
      return TSDB_CODE_SUCCESS;
74,797,399✔
36
    case TDMT_SCH_FETCH_RSP:
262,647,451✔
37
    case TDMT_SCH_MERGE_FETCH_RSP:
38
      if (lastMsgType != reqMsgType) {
262,647,451✔
39
        SCH_TASK_ELOG("rsp msg type mis-match, last sent msgType:%s, rspType:%s", TMSG_INFO(lastMsgType),
1,960✔
40
                      TMSG_INFO(msgType));
41
        SCH_ERR_RET(TSDB_CODE_QW_MSG_ERROR);
1,960✔
42
      }
43
      if (taskStatus != JOB_TASK_STATUS_FETCH) {
262,645,491✔
44
        SCH_TASK_ELOG("rsp msg conflicted with task status, status:%s, rspType:%s", jobTaskStatusStr(taskStatus),
1,960✔
45
                      TMSG_INFO(msgType));
46
        SCH_ERR_RET(TSDB_CODE_QW_MSG_ERROR);
1,960✔
47
      }
48

49
      return TSDB_CODE_SUCCESS;
262,643,479✔
50
    case TDMT_SCH_MERGE_QUERY_RSP:
1,070,501,165✔
51
    case TDMT_SCH_QUERY_RSP:
52
    case TDMT_VND_CREATE_TABLE_RSP:
53
    case TDMT_VND_DROP_TABLE_RSP:
54
    case TDMT_VND_ALTER_TABLE_RSP:
55
    case TDMT_VND_SUBMIT_RSP:
56
    case TDMT_VND_DELETE_RSP:
57
    case TDMT_VND_COMMIT_RSP:
58
      break;
1,070,501,165✔
59
    default:
17,209✔
60
      SCH_TASK_ELOG("unknown rsp msg, type:%s, status:%s", TMSG_INFO(msgType), jobTaskStatusStr(taskStatus));
17,209✔
61
      SCH_ERR_RET(TSDB_CODE_INVALID_MSG);
17,209✔
62
  }
63

64
  if (lastMsgType != reqMsgType) {
1,070,516,320✔
65
    SCH_TASK_ELOG("rsp msg type mis-match, last sent msgType:%s, rspType:%s", TMSG_INFO(lastMsgType),
1,960✔
66
                  TMSG_INFO(msgType));
67
    SCH_ERR_RET(TSDB_CODE_QW_MSG_ERROR);
1,960✔
68
  }
69

70
  if (taskStatus != JOB_TASK_STATUS_EXEC) {
1,070,514,360✔
71
    SCH_TASK_ELOG("rsp msg conflicted with task status, status:%s, rspType:%s", jobTaskStatusStr(taskStatus),
1,960✔
72
                  TMSG_INFO(msgType));
73
    SCH_ERR_RET(TSDB_CODE_QW_MSG_ERROR);
878✔
74
  }
75

76
  return TSDB_CODE_SUCCESS;
1,070,495,186✔
77
}
78

79
int32_t schProcessFetchRsp(SSchJob *pJob, SSchTask *pTask, char *msg, int32_t rspCode) {
262,651,806✔
80
  SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)msg;
262,651,806✔
81
  int32_t code = 0;
262,651,806✔
82
  
83
  SCH_ERR_JRET(rspCode);
262,651,806✔
84

85
  if (NULL == msg) {
262,649,846✔
86
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
1,960✔
87
  }
88

89

90
  if (SCH_IS_EXPLAIN_JOB(pJob)) {
262,647,886✔
91
    if (rsp->completed) {
2,100,080✔
92
      SRetrieveTableRsp *pRsp = NULL;
1,069,106✔
93
      SCH_ERR_JRET(qExecExplainEnd(SCH_JOB_EXPLAIN_CTX(pJob), &pRsp));
1,069,106✔
94
      if (pRsp) {
1,069,106✔
95
        SCH_ERR_JRET(schProcessOnExplainDone(SCH_PARENT_JOB(pJob), pTask, pRsp));
1,064,583✔
96
      } else {
97
        SCH_ERR_JRET(schNotifyJobAllTasks(SCH_PARENT_JOB(pJob), pTask, TASK_NOTIFY_FINISHED));
4,523✔
98
      }
99
  
100
      taosMemoryFreeClear(msg);
1,069,106✔
101
  
102
      return TSDB_CODE_SUCCESS;
1,069,106✔
103
    }
104
  
105
    SCH_ERR_JRET(schLaunchFetchTask(pJob));
1,030,974✔
106
  
107
    taosMemoryFreeClear(msg);
1,030,974✔
108
  
109
    return TSDB_CODE_SUCCESS;
1,030,974✔
110
  }
111
  
112
  if (pJob->fetchRes) {
260,547,780✔
113
    SCH_TASK_ELOG("got fetch rsp while res already exists, res:%p", pJob->fetchRes);
1,960✔
114
    SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR);
1,960✔
115
  }
116
  
117
  atomic_store_ptr(&pJob->fetchRes, rsp);
260,545,915✔
118
  (void)atomic_add_fetch_64(&pJob->resNumOfRows, htobe64(rsp->numOfRows));
260,545,915✔
119
  
120
  if (rsp->completed) {
260,545,941✔
121
    SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCC);
134,134,610✔
122
  }
123
  
124
  SCH_TASK_DLOG("got fetch rsp, rows:%" PRId64 ", complete:%d", htobe64(rsp->numOfRows), rsp->completed);
260,545,928✔
125

126
  msg = NULL;
260,545,928✔
127
  schProcessOnDataFetched(pJob);
260,545,928✔
128

129
_return:
260,549,861✔
130

131
  taosMemoryFreeClear(msg);
260,549,861✔
132

133
  SCH_RET(code);
260,549,861✔
134
}
135

136
int32_t schProcessExplainRsp(SSchJob *pJob, SSchTask *pTask, SExplainRsp *rsp) {
74,792,731✔
137
  SRetrieveTableRsp *pRsp = NULL;
74,792,731✔
138
  SExplainCtx* pCtx = SCH_JOB_EXPLAIN_CTX(pJob);
74,793,127✔
139
  SCH_ERR_RET(qExplainUpdateExecInfo(pCtx, qExplainGetCurrPlan(pCtx, pJob->subJobId), rsp, pTask->plan->id.groupId, &pRsp));
74,793,523✔
140
  
141
  if (pRsp) {
74,803,002✔
142
    SCH_ERR_RET(schProcessOnExplainDone(SCH_PARENT_JOB(pJob), pTask, pRsp));
4,523✔
143
  }
144

145
  return TSDB_CODE_SUCCESS;
74,803,002✔
146
}
147

148
int32_t schProcessResponseMsg(SSchJob *pJob, SSchTask *pTask, SDataBuf *pMsg, int32_t rspCode) {
1,403,530,679✔
149
  int32_t code = 0;
1,403,530,679✔
150
  int32_t msgSize = pMsg->len;
1,403,530,679✔
151
  int32_t msgType = pMsg->msgType;
1,403,537,132✔
152

153
  switch (msgType) {
1,403,519,043✔
154
    case TDMT_VND_COMMIT_RSP: {
3,864,960✔
155
      SCH_ERR_JRET(rspCode);
3,864,960✔
156
      SCH_ERR_JRET(schProcessOnTaskSuccess(pJob, pTask));
3,864,244✔
157
      break;
3,864,686✔
158
    }
159
    case TDMT_VND_CREATE_TABLE_RSP: {
45,355,283✔
160
      SVCreateTbBatchRsp batchRsp = {0};
45,355,283✔
161
      if (pMsg->pData) {
45,357,845✔
162
        SDecoder coder = {0};
45,351,788✔
163
        tDecoderInit(&coder, pMsg->pData, msgSize);
45,352,598✔
164
        code = tDecodeSVCreateTbBatchRsp(&coder, &batchRsp);
45,348,095✔
165
        if (TSDB_CODE_SUCCESS == code && batchRsp.nRsps > 0) {
45,348,040✔
166
          SCH_LOCK(SCH_WRITE, &pJob->resLock);
45,348,135✔
167
          if (NULL == pJob->execRes.res) {
45,343,382✔
168
            pJob->execRes.res = (void*)taosArrayInit(batchRsp.nRsps, POINTER_BYTES);
43,815,888✔
169
            if (NULL == pJob->execRes.res) {
43,818,467✔
170
              code = terrno;
×
171
              SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
×
172
              
173
              tDecoderClear(&coder);
×
174
              SCH_ERR_JRET(code);
×
175
            }
176
            
177
            pJob->execRes.msgType = TDMT_VND_CREATE_TABLE;
43,815,740✔
178
          }
179

180
          for (int32_t i = 0; i < batchRsp.nRsps; ++i) {
96,353,758✔
181
            SVCreateTbRsp *rsp = batchRsp.pRsps + i;
51,010,600✔
182
            if (rsp->pMeta) {
51,008,141✔
183
              if (NULL == taosArrayPush((SArray*)pJob->execRes.res, &rsp->pMeta)) {
101,973,349✔
184
                code = terrno;
×
185
                SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
×
186
                
187
                tDecoderClear(&coder);
×
188
                SCH_ERR_JRET(code);
×
189
              }
190
            }
191
            
192
            if (TSDB_CODE_SUCCESS != rsp->code) {
51,010,960✔
193
              code = rsp->code;
14,489✔
194
            }
195
          }
196

197
          if (taosArrayGetSize((SArray*)pJob->execRes.res) <= 0) {        
45,343,158✔
198
            taosArrayDestroy((SArray*)pJob->execRes.res);
16,987✔
199
            pJob->execRes.res = NULL;
16,987✔
200
          }
201
          SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
45,350,407✔
202
        }
203
        
204
        tDecoderClear(&coder);
45,347,563✔
205
        SCH_ERR_JRET(code);
45,350,231✔
206
      }
207

208
      SCH_ERR_JRET(rspCode);
45,343,208✔
209
      taosMemoryFreeClear(pMsg->pData);
45,336,272✔
210

211
      SCH_ERR_JRET(schProcessOnTaskSuccess(pJob, pTask));
45,332,541✔
212
      break;
45,335,992✔
213
    }
214
    case TDMT_VND_DROP_TABLE_RSP: {
1,514,238✔
215
      SVDropTbBatchRsp batchRsp = {0};
1,514,238✔
216
      if (pMsg->pData) {
1,514,238✔
217
        SDecoder coder = {0};
1,513,670✔
218
        tDecoderInit(&coder, pMsg->pData, msgSize);
1,513,670✔
219
        code = tDecodeSVDropTbBatchRsp(&coder, &batchRsp);
1,513,670✔
220
        if (TSDB_CODE_SUCCESS == code && batchRsp.nRsps > 0) {
1,513,670✔
221
          for (int32_t i = 0; i < batchRsp.nRsps; ++i) {
3,077,027✔
222
            SVDropTbRsp *rsp = batchRsp.pRsps + i;
1,563,357✔
223
            if (TSDB_CODE_SUCCESS != rsp->code) {
1,563,357✔
224
              code = rsp->code;
×
225
              tDecoderClear(&coder);
×
226
              SCH_ERR_JRET(code);
×
227
            }
228
          }
229
        }
230
        tDecoderClear(&coder);
1,513,670✔
231
        SCH_ERR_JRET(code);
1,513,670✔
232
      }
233

234
      SCH_ERR_JRET(rspCode);
1,514,238✔
235
      taosMemoryFreeClear(pMsg->pData);
1,513,670✔
236

237
      SCH_ERR_JRET(schProcessOnTaskSuccess(pJob, pTask));
1,513,670✔
238
      break;
1,513,670✔
239
    }
240
    case TDMT_VND_ALTER_TABLE_RSP: {
12,727,873✔
241
      SVAlterTbRsp rsp = {0};
12,727,873✔
242
      if (pMsg->pData) {
12,727,873✔
243
        SDecoder coder = {0};
12,725,607✔
244
        tDecoderInit(&coder, pMsg->pData, msgSize);
12,725,607✔
245
        code = tDecodeSVAlterTbRsp(&coder, &rsp);
12,725,607✔
246
        tDecoderClear(&coder);
12,725,607✔
247
        SCH_ERR_JRET(code);
12,725,607✔
248
        if (rsp.code == TSDB_CODE_VND_SAME_TAG) {
12,725,607✔
249
          rsp.code = TSDB_CODE_SUCCESS;
×
250
        }
251
        SCH_ERR_JRET(rsp.code);
12,725,607✔
252

253
        pJob->execRes.res = rsp.pMeta;
12,134,794✔
254
        pJob->execRes.msgType = TDMT_VND_ALTER_TABLE;
12,134,794✔
255
      }
256

257
      SCH_ERR_JRET(rspCode);
12,137,060✔
258

259
      if (NULL == pMsg->pData) {
12,136,754✔
260
        SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
1,960✔
261
      }
262

263
      taosMemoryFreeClear(pMsg->pData);
12,134,794✔
264

265
      SCH_ERR_JRET(schProcessOnTaskSuccess(pJob, pTask));
12,134,794✔
266
      break;
12,133,540✔
267
    }
268
    case TDMT_VND_SUBMIT_RSP: {
492,266,240✔
269
      SCH_ERR_JRET(rspCode);
492,266,240✔
270

271
      if (pMsg->pData) {
492,176,649✔
272
        SDecoder    coder = {0};
492,177,639✔
273
        SSubmitRsp2 *rsp = taosMemoryMalloc(sizeof(*rsp));
492,177,662✔
274
        if (NULL == rsp) {
492,177,605✔
275
          SCH_ERR_JRET(terrno);
1,960✔
276
        }
277
        tDecoderInit(&coder, pMsg->pData, msgSize);
492,177,605✔
278
        code = tDecodeSSubmitRsp2(&coder, rsp);
492,174,222✔
279
        tDecoderClear(&coder);
492,164,438✔
280
        if (code) {
492,163,799✔
281
          SCH_TASK_ELOG("tDecodeSSubmitRsp2 failed, code:%d", code);
1,960✔
282
          tDestroySSubmitRsp2(rsp, TSDB_MSG_FLG_DECODE);
1,960✔
283
          taosMemoryFree(rsp);
1,960✔
284
          SCH_ERR_JRET(code);
1,960✔
285
        }
286

287
        (void)atomic_add_fetch_64(&pJob->resNumOfRows, rsp->affectedRows);
492,161,839✔
288

289
        int32_t createTbRspNum = taosArrayGetSize(rsp->aCreateTbRsp);
492,175,293✔
290
        SCH_TASK_DLOG("submit succeed, affectedRows:%d, createTbRspNum:%d", rsp->affectedRows, createTbRspNum);
492,168,037✔
291

292
        if (rsp->aCreateTbRsp && taosArrayGetSize(rsp->aCreateTbRsp) > 0) {
492,174,323✔
293
          SCH_LOCK(SCH_WRITE, &pJob->resLock);
12,442,545✔
294
          if (pJob->execRes.res) {
12,438,445✔
295
            SSubmitRsp2 *sum = pJob->execRes.res;
83,663✔
296
            sum->affectedRows += rsp->affectedRows;
83,663✔
297
            if (sum->aCreateTbRsp) {
83,663✔
298
              if (NULL == taosArrayAddAll(sum->aCreateTbRsp, rsp->aCreateTbRsp)) {
82,856✔
299
                code = terrno;
×
300
                SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
×
301
                SCH_ERR_JRET(code);
×
302
              }
303
              
304
              taosArrayDestroy(rsp->aCreateTbRsp);
82,856✔
305
            } else {
306
              TSWAP(sum->aCreateTbRsp, rsp->aCreateTbRsp);
807✔
307
            }
308
            taosMemoryFree(rsp);
83,663✔
309
          } else {
310
            pJob->execRes.res = rsp;
12,357,513✔
311
            pJob->execRes.msgType = TDMT_VND_SUBMIT;
12,358,531✔
312
          }
313
          pJob->execRes.numOfBytes += pTask->msgLen;
12,436,395✔
314
          SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
12,442,517✔
315
        } else {
316
          SCH_LOCK(SCH_WRITE, &pJob->resLock);
479,733,175✔
317
          pJob->execRes.numOfBytes += pTask->msgLen;
479,732,923✔
318
          if (NULL == pJob->execRes.res) {
479,735,839✔
319
            TSWAP(pJob->execRes.res, rsp);
464,556,091✔
320
            pJob->execRes.msgType = TDMT_VND_SUBMIT;
464,554,709✔
321
          }
322
          SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
479,736,473✔
323
          tDestroySSubmitRsp2(rsp, TSDB_MSG_FLG_DECODE);
479,733,992✔
324
          taosMemoryFree(rsp);
479,732,946✔
325
        }
326
      }
327

328
      taosMemoryFreeClear(pMsg->pData);
492,180,315✔
329

330
      SCH_ERR_JRET(schProcessOnTaskSuccess(pJob, pTask));
492,172,188✔
331

332
      break;
492,178,212✔
333
    }
334
    case TDMT_VND_DELETE_RSP: {
1,805,142✔
335
      SCH_ERR_JRET(rspCode);
1,805,142✔
336

337
      if (pMsg->pData) {
1,805,142✔
338
        SDecoder    coder = {0};
1,805,142✔
339
        SVDeleteRsp rsp = {0};
1,805,142✔
340
        tDecoderInit(&coder, pMsg->pData, msgSize);
1,805,142✔
341
        if (tDecodeSVDeleteRsp(&coder, &rsp) < 0) {
1,805,142✔
342
          code = terrno;
1,960✔
343
          tDecoderClear(&coder);
1,960✔
344
          SCH_ERR_JRET(code);
1,960✔
345
        }
346
        tDecoderClear(&coder);
1,802,608✔
347

348
        (void)atomic_add_fetch_64(&pJob->resNumOfRows, rsp.affectedRows);
1,803,182✔
349
        SCH_TASK_DLOG("delete succeed, affectedRows:%" PRId64, rsp.affectedRows);
1,803,195✔
350
      }
351

352
      taosMemoryFreeClear(pMsg->pData);
1,803,195✔
353

354
      SCH_ERR_JRET(schProcessOnTaskSuccess(pJob, pTask));
1,803,182✔
355

356
      break;
1,803,195✔
357
    }
358
    case TDMT_SCH_QUERY_RSP:
508,544,068✔
359
    case TDMT_SCH_MERGE_QUERY_RSP: {
360
      SCH_ERR_JRET(rspCode);
508,547,988✔
361
      if (NULL == pMsg->pData) {
496,416,261✔
362
        SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
1,960✔
363
      }
364

365
      if (taosArrayGetSize(pTask->parents) == 0 && SCH_IS_EXPLAIN_JOB(pJob) && SCH_IS_INSERT_JOB(pJob)) {
496,420,612✔
366
        SRetrieveTableRsp *pRsp = NULL;
4,797✔
367
        SCH_ERR_JRET(qExecExplainEnd(SCH_JOB_EXPLAIN_CTX(pJob), &pRsp));
4,797✔
368
        if (pRsp) {
4,797✔
369
          SCH_ERR_JRET(schProcessOnExplainDone(SCH_PARENT_JOB(pJob), pTask, pRsp));
4,797✔
370
        }
371
      }
372

373
      SQueryTableRsp rsp = {0};
496,435,141✔
374
      if (tDeserializeSQueryTableRsp(pMsg->pData, msgSize, &rsp) < 0) {
496,437,661✔
375
        SCH_TASK_ELOG("tDeserializeSQueryTableRsp failed, msgSize:%d", msgSize);
1,960✔
376
        SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_MSG);
615✔
377
      }
378
      
379
      SCH_ERR_JRET(rsp.code);
496,405,496✔
380

381
      SCH_ERR_JRET(schSaveJobExecRes(pJob, &rsp));
496,405,496✔
382

383
      (void)atomic_add_fetch_64(&pJob->resNumOfRows, rsp.affectedRows);
496,448,805✔
384

385
      taosMemoryFreeClear(pMsg->pData);
496,457,916✔
386

387
      SCH_ERR_JRET(schProcessOnTaskSuccess(pJob, pTask));
496,451,651✔
388

389
      break;
496,440,382✔
390
    }
391
    case TDMT_SCH_EXPLAIN_RSP: {
74,793,283✔
392
      SCH_ERR_JRET(rspCode);
74,801,123✔
393
      if (NULL == pMsg->pData) {
74,793,283✔
394
        SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
1,960✔
395
      }
396

397
      if (!SCH_IS_EXPLAIN_JOB(pJob)) {
74,791,719✔
398
        SCH_TASK_ELOG("invalid msg received for none explain query, msg type:%s", TMSG_INFO(msgType));
1,960✔
399
        SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
1,960✔
400
      }
401

402
      if (pJob->fetchRes) {
74,789,759✔
403
        SCH_TASK_ELOG("explain result is already generated, res:%p", pJob->fetchRes);
1,960✔
404
        SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR);
1,960✔
405
      }
406

407
      SExplainRsp rsp = {0};
74,788,195✔
408
      if (tDeserializeSExplainRsp(pMsg->pData, msgSize, &rsp)) {
74,787,403✔
409
        tFreeSExplainRsp(&rsp);
1,960✔
410
        SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
1,960✔
411
      }
412

413
      SCH_ERR_JRET(schProcessExplainRsp(pJob, pTask, &rsp));
74,796,010✔
414

415
      taosMemoryFreeClear(pMsg->pData);
74,802,434✔
416
      break;
74,803,710✔
417
    }
418
    case TDMT_SCH_FETCH_RSP:
262,643,505✔
419
    case TDMT_SCH_MERGE_FETCH_RSP: {
420
      code = schProcessFetchRsp(pJob, pTask, pMsg->pData, rspCode);
262,643,505✔
421
      pMsg->pData = NULL;
262,643,544✔
422
      SCH_ERR_JRET(code);
262,643,557✔
423
      break;
262,643,557✔
424
    }
425
    case TDMT_SCH_DROP_TASK_RSP: {
1,960✔
426
      // NEVER REACH HERE
427
      SCH_TASK_ELOG("invalid status to handle drop task rsp, refId:0x%" PRIx64, pJob->refId);
1,960✔
428
      SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
1,960✔
429
      break;
×
430
    }
431
    case TDMT_SCH_LINK_BROKEN:
1,960✔
432
      SCH_TASK_ELOG("link broken received, error:%x - %s", rspCode, tstrerror(rspCode));
1,960✔
433
      SCH_ERR_JRET(rspCode);
1,960✔
434
      break;
1,960✔
435
    case TDMT_MND_CREATE_TOKEN_RSP:{
×
436
      SCreateTokenRsp batchRsp = {0};
×
437
      code = tDeserializeSCreateTokenResp(pMsg->pData, msgSize, &batchRsp);
×
438
      SCH_ERR_JRET(code);
×
439
      break;
×
440
    }
441
    case TDMT_MND_CREATE_TOTP_SECRET:{
×
442
      SCreateTotpSecretRsp rsp = {0};
×
443
      code = tDeserializeSCreateTotpSecretRsp(pMsg->pData, msgSize, &rsp);
×
444
      SCH_ERR_JRET(code);
×
445
      break;
×
446
    }
447
    default:
1,012✔
448
      SCH_TASK_ELOG("unknown rsp msg, type:%d, status:%s", msgType, SCH_GET_TASK_STATUS_STR(pTask));
1,012✔
449
      SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
266✔
450
  }
451

452
  return TSDB_CODE_SUCCESS;
1,390,713,284✔
453

454
_return:
12,850,584✔
455

456
  taosMemoryFreeClear(pMsg->pData);
12,850,584✔
457

458
  SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
12,853,428✔
459
} 
460

461

462
// Note: no more task error processing, handled in function internal
463
int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, uint64_t seriesId, int32_t execId, SDataBuf *pMsg, int32_t rspCode) {
1,407,920,792✔
464
  int32_t code = 0;
1,407,920,792✔
465
  int32_t msgType = pMsg->msgType;
1,407,920,792✔
466

467
  bool dropExecNode = (msgType == TDMT_SCH_LINK_BROKEN || SCH_NETWORK_ERR(rspCode));
1,407,930,029✔
468
  if (SCH_IS_QUERY_JOB(pJob)) {
1,407,930,029✔
469
    SCH_ERR_JRET(schUpdateTaskHandle(pJob, pTask, dropExecNode, pMsg->handle, seriesId, execId));
850,398,439✔
470
  }
471
  
472
  SCH_ERR_JRET(schValidateRspMsgType(pJob, pTask, msgType));
1,407,945,991✔
473

474
  if (pTask->seriesId < atomic_load_64(&pJob->seriesId)) {
1,407,921,702✔
475
    SCH_TASK_DLOG("task sId %" PRId64 " is smaller than current job sId %" PRId64, pTask->seriesId, pJob->seriesId);
×
476
    SCH_ERR_JRET(TSDB_CODE_SCH_IGNORE_ERROR);
×
477
  }
478

479
  int32_t reqType = IsReq(pMsg) ? pMsg->msgType : (pMsg->msgType - 1);
1,407,913,719✔
480

481
  if (SCH_DATA_BIND_TASK_NEED_RETRY(pJob, pTask,reqType, rspCode)) {
1,407,919,675✔
482
    SCH_RET(schProcessOnTaskFailure(pJob, pTask, rspCode));
125,558✔
483
  } else if (SCH_JOB_NEED_RETRY(pJob, pTask, reqType, rspCode)) {
1,407,814,199✔
484
    SCH_RET(schHandleJobRetry(pJob, pTask, (SDataBuf *)pMsg, rspCode));
4,278,894✔
485
  }
486

487
  pTask->redirectCtx.inRedirect = false;
1,403,559,397✔
488
  SCH_RET(schProcessResponseMsg(pJob, pTask, pMsg, rspCode));
1,403,553,443✔
489

490
_return:
×
491
  taosMemoryFreeClear(pMsg->pData);
×
492
  SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
×
493
} 
494

495
int32_t schHandleCallback(void *param, SDataBuf *pMsg, int32_t rspCode) {
1,411,395,596✔
496
  int32_t                code = 0;
1,411,395,596✔
497
  SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param;
1,411,395,596✔
498
  SSchTask              *pTask = NULL;
1,411,395,596✔
499
  SSchJob               *pJob = NULL;
1,411,399,727✔
500

501
  int64_t qid = pParam->queryId;
1,411,400,887✔
502
  qDebug("QID:0x%" PRIx64 ", handle rsp msg, type:%s, handle:%p, code:%s", qid,TMSG_INFO(pMsg->msgType), pMsg->handle,
1,411,400,303✔
503
         tstrerror(rspCode));
504

505
  SCH_ERR_JRET(schProcessOnCbBegin(&pJob, &pTask, pParam->queryId, pParam->refId, pParam->subJobId, pParam->taskId));
1,411,402,054✔
506
  code = schHandleResponseMsg(pJob, pTask, pParam->seriesId, pParam->execId, pMsg, rspCode);
1,407,870,439✔
507
  pMsg->pData = NULL;
1,407,885,794✔
508

509
  schProcessOnCbEnd(pJob, pTask, code);
1,407,889,943✔
510

511
_return:
1,411,419,638✔
512

513
  taosMemoryFreeClear(pMsg->pData);
1,411,420,751✔
514
  taosMemoryFreeClear(pMsg->pEpSet);
1,411,416,451✔
515

516
  qTrace("QID:0x%" PRIx64 ", end to handle rsp msg, type:%s, handle:%p, code:%s", qid, TMSG_INFO(pMsg->msgType), pMsg->handle,
1,411,414,822✔
517
         tstrerror(rspCode));
518

519
  SCH_RET(code);
1,411,393,036✔
520
}
521

522
int32_t schHandleDropCallback(void *param, SDataBuf *pMsg, int32_t code) {
21,068,197✔
523
  SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param;
21,068,197✔
524
  qDebug("QID:0x%" PRIx64 ", SID:0x%" PRIx64 ", CID:0x%" PRIx64 ", TID:0x%" PRIx64 " SJID:%d drop task rsp received, code:0x%x", 
21,068,197✔
525
         pParam->queryId, pParam->seriesId, pParam->clientId, pParam->taskId, pParam->subJobId, code);
526
  // called if drop task rsp received code
527
  (void)rpcReleaseHandle(pMsg->handle, TAOS_CONN_CLIENT, 0); // ignore error
21,068,197✔
528

529
  if (pMsg->handle == NULL) {
21,067,120✔
530
    qError("sch handle is NULL, may be already released and mem lea");
20,137,638✔
531
  }
532
  if (pMsg) {
21,068,623✔
533
    taosMemoryFree(pMsg->pData);
21,069,257✔
534
    taosMemoryFree(pMsg->pEpSet);
21,069,326✔
535
  }
536
  return TSDB_CODE_SUCCESS;
21,069,326✔
537
}
538

539
int32_t schHandleNotifyCallback(void *param, SDataBuf *pMsg, int32_t code) {
1,960✔
540
  SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param;
1,960✔
541
  qDebug("QID:0x%" PRIx64 ", SID:0x%" PRIx64 ", CID:0x%" PRIx64 ", TID:0x%" PRIx64 " SJID:%d task notify rsp received, code:0x%x", 
1,960✔
542
         pParam->queryId, pParam->seriesId, pParam->clientId, pParam->taskId, pParam->subJobId, code);
543
  if (pMsg) {
1,960✔
544
    taosMemoryFreeClear(pMsg->pData);
1,960✔
545
    taosMemoryFreeClear(pMsg->pEpSet);
1,960✔
546
  }
547
  return TSDB_CODE_SUCCESS;
1,960✔
548
}
549

550

551
int32_t schHandleLinkBrokenCallback(void *param, SDataBuf *pMsg, int32_t code) {
3,920✔
552
  SSchCallbackParamHeader *head = (SSchCallbackParamHeader *)param;
3,920✔
553
  (void)rpcReleaseHandle(pMsg->handle, TAOS_CONN_CLIENT, 0); // ignore error
3,920✔
554

555
  qDebug("handle %p is broken", pMsg->handle);
3,920✔
556

557
  if (head->isHbParam) {
3,920✔
558
    taosMemoryFreeClear(pMsg->pData);
1,960✔
559
    taosMemoryFreeClear(pMsg->pEpSet);
1,960✔
560

561
    SSchHbCallbackParam *hbParam = (SSchHbCallbackParam *)param;
1,960✔
562
    SSchTrans            trans = {.pTrans = hbParam->pTrans, .pHandle = NULL, .pHandleId = 0};
1,960✔
563
    SCH_ERR_RET(schUpdateHbConnection(&hbParam->nodeEpId, &trans));
1,960✔
564

565
    SCH_ERR_RET(schBuildAndSendHbMsg(&hbParam->nodeEpId, NULL));
1,960✔
566
  } else {
567
    SCH_ERR_RET(schHandleCallback(param, pMsg, code));
1,960✔
568
  }
569

570
  return TSDB_CODE_SUCCESS;
1,960✔
571
}
572

573
int32_t schHandleCommitCallback(void *param, SDataBuf *pMsg, int32_t code) {
3,866,118✔
574
  return schHandleCallback(param, pMsg, code);
3,866,118✔
575
}
576

577
int32_t schHandleHbCallback(void *param, SDataBuf *pMsg, int32_t code) {
264,667,870✔
578
  SSchedulerHbRsp        rsp = {0};
264,667,870✔
579
  SSchHbCallbackParam *pParam = (SSchHbCallbackParam *)param;
264,669,927✔
580

581
  if (code) {
264,669,927✔
582
    qError("hb rsp error:%s", tstrerror(code));
2,384,483✔
583
    (void)rpcReleaseHandle(pMsg->handle, TAOS_CONN_CLIENT, 0); // ignore error
2,384,483✔
584
    SCH_ERR_JRET(code);
2,384,483✔
585
  }
586

587
  if (tDeserializeSSchedulerHbRsp(pMsg->pData, pMsg->len, &rsp)) {
262,285,444✔
588
    qError("invalid hb rsp msg, size:%d", pMsg->len);
1,960✔
589
    SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
1,960✔
590
  }
591

592
  SSchTrans trans = {0};
262,262,157✔
593
  trans.pTrans = pParam->pTrans;
262,265,278✔
594
  trans.pHandle = pMsg->handle;
262,268,732✔
595
  trans.pHandleId = pMsg->handleRefId;
262,271,366✔
596

597
  SCH_ERR_JRET(schUpdateHbConnection(&rsp.epId, &trans));
262,271,614✔
598
  SCH_ERR_JRET(schProcessOnTaskStatusRsp(&rsp.epId, rsp.taskStatus));
262,272,185✔
599

600
_return:
264,667,687✔
601

602
  tFreeSSchedulerHbRsp(&rsp);
264,669,294✔
603
  taosMemoryFree(pMsg->pData);
264,668,251✔
604
  taosMemoryFree(pMsg->pEpSet);
264,670,794✔
605
  SCH_RET(code);
264,666,818✔
606
}
607

608
int32_t schMakeCallbackParam(SSchJob *pJob, SSchTask *pTask, int32_t msgType, bool isHb, SSchTrans *trans,
2,147,483,647✔
609
                             void **pParam) {
610
  SQueryNodeAddr *pAddr = NULL;
2,147,483,647✔
611

612
  if (!isHb) {
2,147,483,647✔
613
    SSchTaskCallbackParam *param = taosMemoryCalloc(1, sizeof(SSchTaskCallbackParam));
2,147,483,647✔
614
    if (NULL == param) {
2,147,483,647✔
615
      SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SSchTaskCallbackParam));
×
616
      SCH_ERR_RET(terrno);
×
617
    }
618

619
    param->queryId = pJob->queryId;
2,147,483,647✔
620
    param->seriesId = pTask->seriesId;
2,147,483,647✔
621
    param->refId = pJob->refId;
2,147,483,647✔
622
    param->clientId = SCH_CLIENT_ID(pTask);
2,147,483,647✔
623
    param->taskId = SCH_TASK_ID(pTask);
2,147,483,647✔
624
    param->subJobId = pJob->subJobId;
2,147,483,647✔
625
    param->pTrans = pJob->conn.pTrans;
2,147,483,647✔
626
    param->execId = pTask->execId;
2,147,483,647✔
627
    *pParam = param;
2,147,483,647✔
628

629
    return TSDB_CODE_SUCCESS;
2,147,483,647✔
630
  }
631

632
  if (TDMT_SCH_LINK_BROKEN == msgType) {
483,041,537✔
633
    SSchHbCallbackParam *param = taosMemoryCalloc(1, sizeof(SSchHbCallbackParam));
242,823,358✔
634
    if (NULL == param) {
241,196,027✔
635
      SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SSchHbCallbackParam));
×
636
      SCH_ERR_RET(terrno);
×
637
    }
638

639
    param->head.isHbParam = true;
241,196,027✔
640

641
    int32_t code = schGetTaskCurrentNodeAddr(pTask, pJob, &pAddr);
242,160,104✔
642
    if (code != TSDB_CODE_SUCCESS) {
243,170,810✔
643
      taosMemoryFree(param);
×
644
      SCH_ERR_RET(code);
9,598✔
645
    }
646

647
    param->nodeEpId.nodeId = pAddr->nodeId;
243,180,408✔
648
    SEp *pEp = SCH_GET_CUR_EP(pAddr);
243,102,401✔
649
    tstrncpy(param->nodeEpId.ep.fqdn, pEp->fqdn, sizeof(param->nodeEpId.ep.fqdn));
242,365,526✔
650
    param->nodeEpId.ep.port = pEp->port;
242,193,319✔
651
    param->pTrans = trans->pTrans;
242,875,702✔
652
    *pParam = param;
242,242,073✔
653

654
    return TSDB_CODE_SUCCESS;
242,277,381✔
655
  }
656

657
  // hb msg
658
  SSchHbCallbackParam *param = taosMemoryCalloc(1, sizeof(SSchHbCallbackParam));
240,218,179✔
659
  if (NULL == param) {
238,977,642✔
660
    qError("calloc SSchTaskCallbackParam failed");
×
661
    SCH_ERR_RET(terrno);
×
662
  }
663

664
  param->head.isHbParam = true;
238,977,642✔
665
  param->pTrans = trans->pTrans;
239,269,298✔
666
  *pParam = param;
239,534,124✔
667

668
  return TSDB_CODE_SUCCESS;
239,703,259✔
669
}
670

671
int32_t schGenerateCallBackInfo(SSchJob *pJob, SSchTask *pTask, void *msg, uint32_t msgSize, int32_t msgType,
2,147,483,647✔
672
                                SSchTrans *trans, bool isHb, SMsgSendInfo **pMsgSendInfo) {
673
  int32_t       code = 0;
2,147,483,647✔
674
  SMsgSendInfo *msgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
2,147,483,647✔
675
  if (NULL == msgSendInfo) {
2,147,483,647✔
676
    qError("calloc SMsgSendInfo size %d failed", (int32_t)sizeof(SMsgSendInfo));
×
677
    SCH_ERR_JRET(terrno);
×
678
  }
679

680
  msgSendInfo->paramFreeFp = taosAutoMemoryFree;
2,147,483,647✔
681
  SCH_ERR_JRET(schMakeCallbackParam(pJob, pTask, msgType, isHb, trans, &msgSendInfo->param));
2,147,483,647✔
682

683
  SCH_ERR_JRET(schGetCallbackFp(msgType, &msgSendInfo->fp));
2,147,483,647✔
684

685
  if (pJob) {
2,147,483,647✔
686
    msgSendInfo->requestId = pJob->conn.requestId;
2,147,483,647✔
687
    msgSendInfo->requestObjRefId = pJob->conn.requestObjRefId;
2,147,483,647✔
688
  } else {
689
    SCH_ERR_JRET(taosGetSystemUUIDU64(&msgSendInfo->requestId));
239,792,280✔
690
  }
691

692
  qDebug("ahandle %p alloced, QID:0x%" PRIx64, msgSendInfo, msgSendInfo->requestId);
2,147,483,647✔
693

694
  if (TDMT_SCH_LINK_BROKEN != msgType) {
2,147,483,647✔
695
    msgSendInfo->msgInfo.pData = msg;
2,147,483,647✔
696
    msgSendInfo->msgInfo.len = msgSize;
2,147,483,647✔
697
    msgSendInfo->msgInfo.handle = trans->pHandle;
2,147,483,647✔
698
    msgSendInfo->msgType = msgType;
2,147,483,647✔
699
  }
700

701
  *pMsgSendInfo = msgSendInfo;
2,147,483,647✔
702

703
  return TSDB_CODE_SUCCESS;
2,147,483,647✔
704

705
_return:
×
706

707
  if (msgSendInfo) {
×
708
    destroySendMsgInfo(msgSendInfo);
×
709
  }
710

711
  taosMemoryFree(msg);
×
712

713
  SCH_RET(code);
×
714
}
715

716
int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) {
2,147,483,647✔
717
  switch (msgType) {
2,147,483,647✔
718
    case TDMT_VND_CREATE_TABLE:
1,848,711,281✔
719
    case TDMT_VND_DROP_TABLE:
720
    case TDMT_VND_ALTER_TABLE:
721
    case TDMT_VND_SUBMIT:
722
    case TDMT_SCH_QUERY:
723
    case TDMT_SCH_MERGE_QUERY:
724
    case TDMT_VND_DELETE:
725
    case TDMT_SCH_EXPLAIN:
726
    case TDMT_SCH_FETCH:
727
    case TDMT_SCH_MERGE_FETCH:
728
      *fp = schHandleCallback;
1,848,711,281✔
729
      break;
1,848,775,908✔
730
    case TDMT_SCH_DROP_TASK:
514,825,604✔
731
      *fp = schHandleDropCallback;
514,825,604✔
732
      break;
514,827,732✔
733
    case TDMT_SCH_TASK_NOTIFY:
33,043✔
734
      *fp = schHandleNotifyCallback;
33,043✔
735
      break;
33,043✔
736
    case TDMT_SCH_QUERY_HEARTBEAT:
482,585,385✔
737
      *fp = schHandleHbCallback;
482,585,385✔
738
      break;
482,800,724✔
739
    case TDMT_VND_COMMIT:
3,864,644✔
740
      *fp = schHandleCommitCallback;
3,864,644✔
741
      break;
3,864,751✔
742
    case TDMT_SCH_LINK_BROKEN:
759,074,210✔
743
      *fp = schHandleLinkBrokenCallback;
759,074,210✔
744
      break;
759,027,675✔
745
    default:
155,845✔
746
      qError("unknown msg type for callback, msgType:%d", msgType);
155,845✔
747
      SCH_ERR_RET(TSDB_CODE_APP_ERROR);
244✔
748
  }
749

750
  return TSDB_CODE_SUCCESS;
2,147,483,647✔
751
}
752

753
/*
754
int32_t schMakeHbCallbackParam(SSchJob *pJob, SSchTask *pTask, void **pParam) {
755
  SSchHbCallbackParam *param = taosMemoryCalloc(1, sizeof(SSchHbCallbackParam));
756
  if (NULL == param) {
757
    SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SSchHbCallbackParam));
758
    SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
759
  }
760

761
  param->head.isHbParam = true;
762

763
  SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
764

765
  param->nodeEpId.nodeId = addr->nodeId;
766
  SEp* pEp = SCH_GET_CUR_EP(addr);
767
  tstrncpy(param->nodeEpId.ep.fqdn, pEp->fqdn, sizeof(param->nodeEpId.ep.fqdn));
768
  param->nodeEpId.ep.port = pEp->port;
769
  param->pTrans = pJob->pTrans;
770

771
  *pParam = param;
772

773
  return TSDB_CODE_SUCCESS;
774
}
775
*/
776

777
int32_t schCloneHbRpcCtx(SRpcCtx *pSrc, SRpcCtx *pDst) {
240,333,615✔
778
  int32_t code = 0;
240,333,615✔
779
  TAOS_MEMCPY(pDst, pSrc, sizeof(SRpcCtx));
240,333,615✔
780
  pDst->brokenVal.val = NULL;
240,333,615✔
781
  pDst->args = NULL;
240,256,863✔
782

783
  SCH_ERR_RET(schCloneSMsgSendInfo(pSrc->brokenVal.val, &pDst->brokenVal.val));
240,135,848✔
784

785
  pDst->args = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
240,001,697✔
786
  if (NULL == pDst->args) {
240,551,542✔
787
    qError("taosHashInit %d RpcCtx failed", 1);
×
788
    SCH_ERR_JRET(terrno);
×
789
  }
790

791
  SRpcCtxVal dst = {0};
240,654,204✔
792
  void      *pIter = taosHashIterate(pSrc->args, NULL);
240,346,598✔
793
  while (pIter) {
481,285,230✔
794
    SRpcCtxVal *pVal = (SRpcCtxVal *)pIter;
240,382,262✔
795
    int32_t    *msgType = taosHashGetKey(pIter, NULL);
240,382,262✔
796

797
    dst = *pVal;
240,146,698✔
798
    dst.val = NULL;
240,201,807✔
799

800
    SCH_ERR_JRET(schCloneSMsgSendInfo(pVal->val, &dst.val));
240,201,807✔
801

802
    if (taosHashPut(pDst->args, msgType, sizeof(*msgType), &dst, sizeof(dst))) {
240,385,344✔
803
      qError("taosHashPut msg %d to rpcCtx failed", *msgType);
×
804
      (*pSrc->freeFunc)(dst.val);
×
805
      SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
×
806
    }
807

808
    pIter = taosHashIterate(pSrc->args, pIter);
240,910,299✔
809
  }
810

811
  return TSDB_CODE_SUCCESS;
240,902,968✔
812

813
_return:
×
814

815
  schFreeRpcCtx(pDst);
×
816
  SCH_RET(code);
×
817
}
818

819
int32_t schMakeHbRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) {
242,936,467✔
820
  int32_t              code = 0;
242,936,467✔
821
  SSchHbCallbackParam *param = NULL;
242,936,467✔
822
  SMsgSendInfo        *pMsgSendInfo = NULL;
242,936,467✔
823
  SQueryNodeAddr      *pAddr = NULL;
242,936,467✔
824
  SQueryNodeEpId       epId = {0};
243,117,682✔
825
  int32_t              msgType = TDMT_SCH_QUERY_HEARTBEAT_RSP;
243,253,442✔
826

827
  code = schGetTaskCurrentNodeAddr(pTask, pJob, &pAddr);
242,534,282✔
828
  if (code != TSDB_CODE_SUCCESS) {
242,972,861✔
829
    SCH_ERR_JRET(code);
×
830
  }
831

832
  epId.nodeId = pAddr->nodeId;
242,972,861✔
833
  TAOS_MEMCPY(&epId.ep, SCH_GET_CUR_EP(pAddr), sizeof(SEp));
242,873,927✔
834

835
  pCtx->args = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
242,571,417✔
836
  if (NULL == pCtx->args) {
243,342,786✔
837
    SCH_TASK_ELOG("taosHashInit %d RpcCtx failed", 1);
×
838
    SCH_ERR_RET(terrno);
×
839
  }
840

841
  pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
243,384,765✔
842
  if (NULL == pMsgSendInfo) {
242,239,312✔
843
    SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SMsgSendInfo));
×
844
    SCH_ERR_JRET(terrno);
×
845
  }
846

847
  param = taosMemoryCalloc(1, sizeof(SSchHbCallbackParam));
242,239,312✔
848
  if (NULL == param) {
241,439,863✔
849
    SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SSchHbCallbackParam));
×
850
    SCH_ERR_JRET(terrno);
×
851
  }
852

853
  __async_send_cb_fn_t fp = NULL;
241,439,863✔
854
  SCH_ERR_JRET(schGetCallbackFp(TDMT_SCH_QUERY_HEARTBEAT, &fp));
242,154,018✔
855

856
  param->head.isHbParam = true;
243,014,075✔
857
  param->nodeEpId = epId;
242,994,744✔
858
  param->pTrans = pJob->conn.pTrans;
243,112,336✔
859

860
  pMsgSendInfo->param = param;
242,635,585✔
861
  pMsgSendInfo->paramFreeFp = taosAutoMemoryFree;
242,180,533✔
862
  pMsgSendInfo->fp = fp;
243,129,390✔
863

864
  SRpcCtxVal ctxVal = {.val = pMsgSendInfo, .clone = schCloneSMsgSendInfo};
242,612,790✔
865
  if (taosHashPut(pCtx->args, &msgType, sizeof(msgType), &ctxVal, sizeof(ctxVal))) {
242,841,185✔
866
    SCH_TASK_ELOG("taosHashPut msg %d to rpcCtx failed", msgType);
×
867
    SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
×
868
  }
869

870
  SCH_ERR_JRET(schMakeBrokenLinkVal(pJob, pTask, &pCtx->brokenVal, true));
243,713,673✔
871
  pCtx->freeFunc = schFreeRpcCtxVal;
243,671,769✔
872

873
  return TSDB_CODE_SUCCESS;
243,624,095✔
874

875
_return:
×
876

877
  taosHashCleanup(pCtx->args);
×
878
  taosMemoryFreeClear(param);
×
879
  taosMemoryFreeClear(pMsgSendInfo);
×
880

881
  SCH_RET(code);
×
882
}
883

884
int32_t schMakeBrokenLinkVal(SSchJob *pJob, SSchTask *pTask, SRpcBrokenlinkVal *brokenVal, bool isHb) {
759,028,599✔
885
  int32_t       code = 0;
759,028,599✔
886
  int32_t       msgType = TDMT_SCH_LINK_BROKEN;
759,028,599✔
887
  SSchTrans     trans = {.pTrans = pJob->conn.pTrans};
759,028,599✔
888
  SMsgSendInfo *pMsgSendInfo = NULL;
759,190,271✔
889
  SCH_ERR_JRET(schGenerateCallBackInfo(pJob, pTask, NULL, 0, msgType, &trans, isHb, &pMsgSendInfo));
759,416,839✔
890

891
  brokenVal->msgType = msgType;
760,108,226✔
892
  brokenVal->val = pMsgSendInfo;
760,102,701✔
893
  brokenVal->clone = schCloneSMsgSendInfo;
760,119,734✔
894

895
  return TSDB_CODE_SUCCESS;
760,081,605✔
896

897
_return:
×
898

899
  taosMemoryFreeClear(pMsgSendInfo->param);
×
900
  taosMemoryFreeClear(pMsgSendInfo);
×
901

902
  SCH_RET(code);
×
903
}
904

905
int32_t schMakeQueryRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) {
515,977,915✔
906
  int32_t       code = 0;
515,977,915✔
907
  SMsgSendInfo *pExplainMsgSendInfo = NULL;
515,977,915✔
908

909
  pCtx->args = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
516,084,881✔
910
  if (NULL == pCtx->args) {
516,122,186✔
911
    SCH_TASK_ELOG("taosHashInit %d RpcCtx failed", 1);
×
912
    SCH_ERR_RET(terrno);
×
913
  }
914

915
  SSchTrans trans = {.pTrans = pJob->conn.pTrans, .pHandle = SCH_GET_TASK_HANDLE(pTask)};
516,147,032✔
916
  SCH_ERR_JRET(schGenerateCallBackInfo(pJob, pTask, NULL, 0, TDMT_SCH_EXPLAIN, &trans, false, &pExplainMsgSendInfo));
516,227,350✔
917

918
  int32_t    msgType = TDMT_SCH_EXPLAIN_RSP;
516,340,196✔
919
  SRpcCtxVal ctxVal = {.val = pExplainMsgSendInfo, .clone = schCloneSMsgSendInfo};
516,323,231✔
920
  if (taosHashPut(pCtx->args, &msgType, sizeof(msgType), &ctxVal, sizeof(ctxVal))) {
516,357,052✔
921
    SCH_TASK_ELOG("taosHashPut msg %d to rpcCtx failed", msgType);
×
922
    SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
×
923
  }
924

925
  SCH_ERR_JRET(schMakeBrokenLinkVal(pJob, pTask, &pCtx->brokenVal, false));
516,447,432✔
926
  pCtx->freeFunc = schFreeRpcCtxVal;
516,376,801✔
927

928
  return TSDB_CODE_SUCCESS;
516,353,221✔
929

930
_return:
×
931

932
  taosHashCleanup(pCtx->args);
×
933

934
  if (pExplainMsgSendInfo) {
×
935
    taosMemoryFreeClear(pExplainMsgSendInfo->param);
×
936
    taosMemoryFreeClear(pExplainMsgSendInfo);
×
937
  }
938

939
  SCH_RET(code);
×
940
}
941

942
int32_t schCloneCallbackParam(SSchCallbackParamHeader *pSrc, SSchCallbackParamHeader **pDst) {
579,641,815✔
943
  if (pSrc->isHbParam) {
579,641,815✔
944
    SSchHbCallbackParam *dst = taosMemoryMalloc(sizeof(SSchHbCallbackParam));
504,954,279✔
945
    if (NULL == dst) {
502,483,501✔
946
      qError("malloc SSchHbCallbackParam failed");
×
947
      SCH_ERR_RET(terrno);
×
948
    }
949

950
    TAOS_MEMCPY(dst, pSrc, sizeof(*dst));
502,483,501✔
951
    *pDst = (SSchCallbackParamHeader *)dst;
502,483,501✔
952

953
    return TSDB_CODE_SUCCESS;
505,005,558✔
954
  }
955

956
  SSchTaskCallbackParam *dst = taosMemoryMalloc(sizeof(SSchTaskCallbackParam));
74,850,102✔
957
  if (NULL == dst) {
74,877,689✔
958
    qError("malloc SSchTaskCallbackParam failed");
×
959
    SCH_ERR_RET(terrno);
×
960
  }
961

962
  TAOS_MEMCPY(dst, pSrc, sizeof(*dst));
74,877,689✔
963
  *pDst = (SSchCallbackParamHeader *)dst;
74,877,689✔
964

965
  return TSDB_CODE_SUCCESS;
74,878,877✔
966
}
967

968
int32_t schCloneSMsgSendInfo(void *src, void **dst) {
579,043,135✔
969
  SMsgSendInfo *pSrc = src;
579,043,135✔
970
  int32_t       code = 0;
579,043,135✔
971
  SMsgSendInfo *pDst = taosMemoryCalloc(1, sizeof(*pSrc));
579,043,135✔
972
  if (NULL == pDst) {
577,571,972✔
973
    qError("malloc SMsgSendInfo for rpcCtx failed, len:%d", (int32_t)sizeof(*pSrc));
×
974
    SCH_ERR_RET(terrno);
×
975
  }
976

977
  TAOS_MEMCPY(pDst, pSrc, sizeof(*pSrc));
577,571,972✔
978
  pDst->param = NULL;
577,571,972✔
979

980
  SCH_ERR_JRET(schCloneCallbackParam(pSrc->param, (SSchCallbackParamHeader **)&pDst->param));
579,812,816✔
981
  pDst->paramFreeFp = taosAutoMemoryFree;
579,729,569✔
982

983
  *dst = pDst;
579,567,298✔
984

985
  return TSDB_CODE_SUCCESS;
579,793,779✔
986

987
_return:
×
988

989
  taosMemoryFreeClear(pDst);
×
990
  SCH_RET(code);
×
991
}
992

993
int32_t schUpdateSendTargetInfo(SMsgSendInfo *pMsgSendInfo, SQueryNodeAddr *addr, SSchTask *pTask) {
2,092,095,454✔
994
  if (NULL == pTask || addr->nodeId < MNODE_HANDLE) {
2,092,095,454✔
995
    return TSDB_CODE_SUCCESS;
243,819,555✔
996
  }
997

998
  if (addr->nodeId == MNODE_HANDLE) {
1,848,297,678✔
999
    pMsgSendInfo->target.type = TARGET_TYPE_MNODE;
20,033,859✔
1000
  } else {
1001
    pMsgSendInfo->target.type = TARGET_TYPE_VNODE;
1,828,281,868✔
1002
    pMsgSendInfo->target.vgId = addr->nodeId;
1,828,293,239✔
1003
    pMsgSendInfo->target.dbFName = taosStrdup(pTask->plan->dbFName);
1,828,269,250✔
1004
    if (NULL == pMsgSendInfo->target.dbFName) {
1,828,082,533✔
1005
      return terrno;
×
1006
    }
1007
  }
1008

1009
  return TSDB_CODE_SUCCESS;
1,848,085,770✔
1010
}
1011

1012
int32_t schAsyncSendMsg(SSchJob *pJob, SSchTask *pTask, SSchTrans *trans, SQueryNodeAddr *addr, int32_t msgType,
2,091,404,373✔
1013
                        void *msg, uint32_t msgSize, bool persistHandle, SRpcCtx *ctx) {
1014
  int32_t code = 0;
2,091,404,373✔
1015
  SEpSet *epSet = &addr->epSet;
2,091,404,373✔
1016

1017
  SMsgSendInfo *pMsgSendInfo = NULL;
2,091,664,125✔
1018
  bool          isHb = (TDMT_SCH_QUERY_HEARTBEAT == msgType);
2,091,727,757✔
1019
  SCH_ERR_JRET(schGenerateCallBackInfo(pJob, pTask, msg, msgSize, msgType, trans, isHb, &pMsgSendInfo));
2,091,727,757✔
1020
  SCH_ERR_JRET(schUpdateSendTargetInfo(pMsgSendInfo, addr, pTask));
2,092,279,893✔
1021

1022
  if (isHb && persistHandle && trans->pHandle == 0) {
2,091,997,273✔
1023
    int64_t refId = 0;
240,859,032✔
1024
    code = rpcAllocHandle(&refId); 
240,857,507✔
1025
    if (code != 0) {
240,819,445✔
1026
      SCH_TASK_ELOG("rpcAllocHandle failed, code:%x", code);
×
1027
      SCH_ERR_JRET(code);
10✔
1028
    }
1029
    trans->pHandle = (void *)refId;
240,814,562✔
1030
    pMsgSendInfo->msgInfo.handle =trans->pHandle;
240,823,583✔
1031
  } 
1032

1033
  if (pJob && pTask) {
2,091,949,009✔
1034
    SCH_TASK_DLOG("start to send %s msg to node[%d,%s,%d], pTrans:%p, pHandle:%p", TMSG_INFO(msgType), addr->nodeId,
1,851,179,611✔
1035
           epSet->eps[epSet->inUse].fqdn, epSet->eps[epSet->inUse].port, trans->pTrans, trans->pHandle);
1036
  } else {
1037
    qDebug("start to send %s msg to node[%d,%s,%d], pTrans:%p, pHandle:%p", TMSG_INFO(msgType), addr->nodeId,
240,769,398✔
1038
           epSet->eps[epSet->inUse].fqdn, epSet->eps[epSet->inUse].port, trans->pTrans, trans->pHandle);
1039
  }
1040
  
1041
  if (pTask) {
2,091,877,658✔
1042
    pTask->lastMsgType = msgType;
1,851,100,340✔
1043
  }
1044

1045
  code = asyncSendMsgToServerExt(trans->pTrans, epSet, NULL, pMsgSendInfo, persistHandle, ctx);
2,091,889,861✔
1046
  pMsgSendInfo = NULL;
2,092,195,859✔
1047
  if (code) {
2,092,195,859✔
1048
    SCH_ERR_JRET(code);
265,021✔
1049
  }
1050

1051
  if (pJob) {
2,091,930,838✔
1052
    SCH_TASK_TLOG("req msg sent, type:%d, %s", msgType, TMSG_INFO(msgType));
1,851,131,395✔
1053
  } else {
1054
    qTrace("req msg sent, type:%d, %s", msgType, TMSG_INFO(msgType));
240,799,443✔
1055
  }
1056
  return TSDB_CODE_SUCCESS;
2,091,868,573✔
1057

1058
_return:
275,161✔
1059

1060
  if (pJob) {
265,021✔
1061
    SCH_TASK_ELOG("fail to send msg, type:%d, %s, error:%s", msgType, TMSG_INFO(msgType), tstrerror(code));
260,649✔
1062
  } else {
1063
    qError("fail to send msg, type:%d, %s, error:%s", msgType, TMSG_INFO(msgType), tstrerror(code));
4,372✔
1064
  }
1065

1066
  if (pMsgSendInfo) {
265,021✔
1067
    destroySendMsgInfo(pMsgSendInfo);
×
1068
  }
1069

1070
  SCH_RET(code);
265,021✔
1071
}
1072

1073
int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId, SArray *taskAction) {
241,020,867✔
1074
  SSchedulerHbReq req = {0};
241,020,867✔
1075
  int32_t         code = 0;
241,021,285✔
1076
  SRpcCtx         rpcCtx = {0};
241,021,285✔
1077
  SSchTrans       trans = {0};
241,021,285✔
1078
  int32_t         msgType = TDMT_SCH_QUERY_HEARTBEAT;
241,021,285✔
1079

1080
  req.header.vgId = nodeEpId->nodeId;
241,021,285✔
1081
  req.clientId = schMgmt.clientId;
241,021,285✔
1082
  TAOS_MEMCPY(&req.epId, nodeEpId, sizeof(SQueryNodeEpId));
241,021,285✔
1083

1084
  SCH_LOCK(SCH_READ, &schMgmt.hbLock);
241,021,285✔
1085
  SSchHbTrans *hb = taosHashGet(schMgmt.hbConnections, nodeEpId, sizeof(SQueryNodeEpId));
240,933,343✔
1086
  if (NULL == hb) {
240,451,255✔
1087
    SCH_UNLOCK(SCH_READ, &schMgmt.hbLock);
88,076✔
1088
    qError("hb connection no longer exist, nodeId:%d, fqdn:%s, port:%d", nodeEpId->nodeId, nodeEpId->ep.fqdn,
88,076✔
1089
           nodeEpId->ep.port);
1090
    return TSDB_CODE_SUCCESS;
88,076✔
1091
  }
1092

1093
  SCH_LOCK(SCH_WRITE, &hb->lock);
240,363,179✔
1094
  code = schCloneHbRpcCtx(&hb->rpcCtx, &rpcCtx);
239,882,468✔
1095
  TAOS_MEMCPY(&trans, &hb->trans, sizeof(trans));
240,627,338✔
1096
  if (NULL == hb->trans.pTrans) {
240,487,362✔
1097
    qError("NULL pTrans got from hbConnections for epId:%d", nodeEpId->nodeId);
×
1098
  }
1099
  SCH_UNLOCK(SCH_WRITE, &hb->lock);
240,684,819✔
1100
  SCH_UNLOCK(SCH_READ, &schMgmt.hbLock);
240,331,789✔
1101

1102
  SCH_ERR_RET(code);
239,407,081✔
1103

1104
  int32_t msgSize = tSerializeSSchedulerHbReq(NULL, 0, &req);
239,407,081✔
1105
  if (msgSize < 0) {
240,170,409✔
1106
    qError("tSerializeSSchedulerHbReq hbReq failed, size:%d", msgSize);
×
1107
    SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
×
1108
  }
1109
  void *msg = taosMemoryCalloc(1, msgSize);
240,170,409✔
1110
  if (NULL == msg) {
240,218,923✔
1111
    qError("calloc hb req %d failed", msgSize);
×
1112
    SCH_ERR_JRET(terrno);
×
1113
  }
1114

1115
  if (tSerializeSSchedulerHbReq(msg, msgSize, &req) < 0) {
240,218,923✔
1116
    qError("tSerializeSSchedulerHbReq hbReq failed, size:%d", msgSize);
×
1117
    SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
×
1118
  }
1119

1120
  int64_t        transporterId = 0;
240,184,237✔
1121
  SQueryNodeAddr addr = {.nodeId = nodeEpId->nodeId};
240,184,237✔
1122
  addr.epSet.inUse = 0;
240,304,789✔
1123
  addr.epSet.numOfEps = 1;
240,304,789✔
1124
  TAOS_MEMCPY(&addr.epSet.eps[0], &nodeEpId->ep, sizeof(nodeEpId->ep));
240,304,789✔
1125

1126
  code = schAsyncSendMsg(NULL, NULL, &trans, &addr, msgType, msg, msgSize, true, &rpcCtx);
240,168,473✔
1127
  msg = NULL;
240,878,513✔
1128
  SCH_ERR_JRET(code);
240,878,513✔
1129

1130
  return TSDB_CODE_SUCCESS;
240,874,141✔
1131

1132
_return:
4,372✔
1133

1134
  taosMemoryFreeClear(msg);
4,372✔
1135
  schFreeRpcCtx(&rpcCtx);
4,372✔
1136
  SCH_RET(code);
4,372✔
1137
}
1138

1139
int32_t schBuildSubJobEndpoints(SSchTask *pTask, SArray** ppRes, SSchJob* pTarget) {
516,216,886✔
1140
  *ppRes = NULL;
516,216,886✔
1141
  int32_t code = TSDB_CODE_SUCCESS;
516,260,013✔
1142
  SSchJob* pJob = NULL;
516,260,013✔
1143
  
1144
  if (SCH_IS_PARENT_JOB(pTarget) && !SCH_JOB_GOT_SUB_JOBS(pTarget)) {
516,260,013✔
1145
    pJob = pTarget;
319,214,788✔
1146
    SCH_TASK_DLOG("task query without subEndPoints, pJob:%p", pTarget);
319,214,788✔
1147
    return TSDB_CODE_SUCCESS;
319,234,179✔
1148
  }
1149
  
1150
  int32_t subJobNum = SCH_IS_PARENT_JOB(pTarget) ? pTarget->subJobs->size : pTarget->subJobId;
197,032,205✔
1151
  SSchJob* pParent = SCH_IS_PARENT_JOB(pTarget) ? pTarget : (SSchJob*)pTarget->parent;
196,975,122✔
1152
  SDownstreamSourceNode* pSource = NULL;
196,977,929✔
1153

1154
  if (subJobNum > 0) {
196,971,239✔
1155
    *ppRes = taosArrayInit(subJobNum, POINTER_BYTES);
118,845,260✔
1156
    if (NULL == *ppRes) {
118,893,422✔
1157
      SCH_ERR_RET(terrno);
1✔
1158
    }
1159
  }
1160
  
1161
  for (int32_t i = 0; i < subJobNum; ++i) {
412,113,714✔
1162
    pJob = taosArrayGetP(pParent->subJobs, i);
215,036,488✔
1163
    if (NULL == pJob || NULL == pJob->fetchTask) {
215,026,710✔
UNCOV
1164
      SCH_JOB_ELOG("no fetchTask set in job, pJob:%p, fetchTask:%p", pJob, pJob ? pJob->fetchTask : NULL);
×
UNCOV
1165
      SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
×
1166
    }
1167

1168
    SCH_ERR_JRET(nodesMakeNode(QUERY_NODE_DOWNSTREAM_SOURCE, (SNode**)&pSource));
215,029,481✔
1169
    
1170
    memcpy(&pSource->addr, &pJob->resNode, sizeof(pSource->addr));
214,962,671✔
1171
    pSource->clientId = pJob->fetchTask->clientId;
214,963,382✔
1172
    pSource->taskId = pJob->fetchTask->taskId;
214,951,004✔
1173
    pSource->sId = pJob->fetchTask->seriesId;
214,953,784✔
1174
    pSource->execId = pJob->fetchTask->execId;
214,956,008✔
1175
    pSource->fetchMsgType = SCH_FETCH_TYPE(pJob->fetchTask);
214,957,102✔
1176
    pSource->localExec = pJob->attr.localExec;
214,958,232✔
1177
    if (NULL == taosArrayPush(*ppRes, &pSource)) {
430,009,592✔
1178
      nodesDestroyNode((SNode *)pSource);
×
1179
      SCH_ERR_JRET(terrno);
×
1180
    }
1181
  }
1182

1183
  pJob = pTarget;
197,077,226✔
1184
  SCH_TASK_DLOG("task query with %d subEndPoints", subJobNum);
197,077,226✔
1185

1186
_return:
197,077,226✔
1187

1188
  if (code) {
197,061,152✔
1189
    taosArrayDestroyP(*ppRes, (FDelete)nodesDestroyNode);
×
1190
    *ppRes = NULL;
×
1191
  }
1192
  
1193
  return code;
197,055,628✔
1194
}
1195

1196
int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, int32_t msgType, void* param) {
1,851,056,146✔
1197
  int32_t  msgSize = 0;
1,851,056,146✔
1198
  void    *msg = NULL;
1,851,056,146✔
1199
  int32_t  code = 0;
1,851,056,146✔
1200
  bool     isCandidateAddr = false;
1,851,056,146✔
1201
  bool     persistHandle = false;
1,851,056,146✔
1202
  SRpcCtx  rpcCtx = {0};
1,851,056,146✔
1203

1204
  if (NULL == addr) {
1,851,107,928✔
1205
    code = schGetTaskCurrentNodeAddr(pTask, pJob, &addr);
1,073,596,039✔
1206
    if (code != TSDB_CODE_SUCCESS) {
1,073,817,421✔
1207
      SCH_ERR_JRET(code);
1,960✔
1208
    }
1209
    
1210
    isCandidateAddr = true;
1,073,815,461✔
1211
    SCH_TASK_TLOG("target candidateIdx %d, epInUse %d/%d", pTask->candidateIdx, addr->epSet.inUse,
1,073,815,461✔
1212
                  addr->epSet.numOfEps);
1213
  }
1214

1215
  switch (msgType) {
1,850,870,703✔
1216
    case TDMT_VND_CREATE_TABLE:
555,691,586✔
1217
    case TDMT_VND_DROP_TABLE:
1218
    case TDMT_VND_ALTER_TABLE:
1219
    case TDMT_VND_SUBMIT:
1220
    case TDMT_VND_COMMIT: {
1221
      msgSize = pTask->msgLen;
555,691,586✔
1222
      msg = pTask->msg;
555,707,152✔
1223
      pTask->msg = NULL;
555,695,158✔
1224
      break;
555,708,669✔
1225
    }
1226

1227
    case TDMT_VND_DELETE: {
1,803,195✔
1228
      SVDeleteReq req = {0};
1,803,195✔
1229
      req.header.vgId = addr->nodeId;
1,803,195✔
1230
      req.sId = pTask->seriesId;
1,803,195✔
1231
      req.queryId = pJob->queryId;
1,803,195✔
1232
      req.clientId = pTask->clientId;
1,803,195✔
1233
      req.taskId = pTask->taskId;
1,803,195✔
1234
      req.phyLen = pTask->msgLen;
1,803,195✔
1235
      req.sqlLen = strlen(pJob->sql);
1,803,195✔
1236
      req.sql = (char *)pJob->sql;
1,803,195✔
1237
      req.msg = pTask->msg;
1,803,195✔
1238
      req.source       = pJob->source;
1,803,195✔
1239
      req.secureDelete = pJob->secureDelete;
1,803,195✔
1240
      msgSize = tSerializeSVDeleteReq(NULL, 0, &req);
1,803,195✔
1241
      if (msgSize < 0) {
1,803,195✔
1242
        SCH_TASK_ELOG("tSerializeSVDeleteReq failed, code:%x", terrno);
×
1243
        SCH_ERR_JRET(terrno);
×
1244
      }
1245
      msg = taosMemoryCalloc(1, msgSize);
1,803,195✔
1246
      if (NULL == msg) {
1,803,195✔
1247
        SCH_TASK_ELOG("calloc %d failed", msgSize);
×
1248
        SCH_ERR_JRET(terrno);
×
1249
      }
1250

1251
      msgSize = tSerializeSVDeleteReq(msg, msgSize, &req);
1,803,195✔
1252
      if (msgSize < 0) {
1,803,195✔
1253
        SCH_TASK_ELOG("tSerializeSVDeleteReq second failed, code:%x", terrno);
×
1254
        SCH_ERR_JRET(terrno);
×
1255
      }
1256
      break;
1,803,195✔
1257
    }
1258
    case TDMT_SCH_QUERY:
515,871,122✔
1259
    case TDMT_SCH_MERGE_QUERY: {
1260
      int32_t newPhase = (TDMT_SCH_QUERY == msgType) ? QUERY_PHASE_EXEC_DATA_QUERY : QUERY_PHASE_EXEC_MERGE_QUERY;
515,871,122✔
1261
      SCH_UPDATE_JOB_PHASE_IF_CHANGED(pJob, newPhase);
877,425,448✔
1262

1263
      SCH_ERR_RET(schMakeQueryRpcCtx(pJob, pTask, &rpcCtx));
516,340,113✔
1264

1265
      SSubQueryMsg qMsg;
396,259,621✔
1266
      qMsg.header.vgId = addr->nodeId;
516,361,882✔
1267
      qMsg.header.contLen = 0;
516,372,039✔
1268
      qMsg.sId = pTask->seriesId;
516,372,039✔
1269
      qMsg.queryId = pJob->queryId;
516,373,978✔
1270
      qMsg.clientId = pTask->clientId;
516,383,961✔
1271
      qMsg.taskId = pTask->taskId;
516,345,498✔
1272
      qMsg.refId = pJob->refId;
516,369,320✔
1273
      qMsg.execId = pTask->execId;
516,346,646✔
1274
      qMsg.msgMask = (pTask->plan->showRewrite) ? QUERY_MSG_MASK_SHOW_REWRITE() : 0;
516,354,820✔
1275
      qMsg.msgMask |= (pTask->plan->isView) ? QUERY_MSG_MASK_VIEW() : 0;
516,385,174✔
1276
      qMsg.msgMask |= (pTask->plan->isAudit) ? QUERY_MSG_MASK_AUDIT() : 0;
516,370,895✔
1277
      qMsg.msgMask |= (!SCH_IS_PARENT_JOB(pJob) && SCH_IS_ROOT_TASK(pTask)) ? QUERY_MSG_MASK_SUBQUERY() : 0;
516,354,165✔
1278
      qMsg.subQType = (!SCH_IS_PARENT_JOB(pJob) && SCH_IS_ROOT_TASK(pTask)) ? pJob->pDag->subQType : 0;
516,348,520✔
1279
      qMsg.taskType = (pJob->attr.type == JOB_TYPE_HQUERY)? TASK_TYPE_HQUERY:TASK_TYPE_QUERY;
516,344,700✔
1280
      qMsg.explain = SCH_IS_EXPLAIN_JOB(pJob);
516,372,215✔
1281
      qMsg.needFetch = SCH_TASK_NEED_FETCH(pTask);
516,345,549✔
1282
      qMsg.sqlLen = pJob->sql ? strlen(pJob->sql) : 0;
516,353,704✔
1283
      qMsg.sql = pJob->sql;
516,377,855✔
1284
      qMsg.msgLen = pTask->msgLen;
516,369,535✔
1285
      qMsg.msg = pTask->msg;
516,388,034✔
1286

1287
      if (strcmp(tsLocalFqdn, GET_ACTIVE_EP(&addr->epSet)->fqdn) == 0) {
516,354,098✔
1288
        qMsg.compress = 0;
476,960,633✔
1289
      } else {
1290
        qMsg.compress = 1;
39,373,710✔
1291
      }
1292

1293
      SCH_ERR_JRET(schBuildSubJobEndpoints(pTask, &qMsg.subEndPoints, pJob));
516,334,343✔
1294

1295
      msgSize = tSerializeSSubQueryMsg(NULL, 0, &qMsg);
516,292,630✔
1296
      if (msgSize < 0) {
516,108,372✔
1297
        SCH_TASK_ELOG("tSerializeSSubQueryMsg get size, msgSize:%d", msgSize);
×
1298
        taosArrayDestroyP(qMsg.subEndPoints, (FDelete)nodesDestroyNode);
×
1299
        SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
×
1300
      }
1301
      
1302
      msg = taosMemoryCalloc(1, msgSize);
516,108,372✔
1303
      if (NULL == msg) {
515,943,670✔
1304
        SCH_TASK_ELOG("calloc %d failed", msgSize);
×
1305
        taosArrayDestroyP(qMsg.subEndPoints, (FDelete)nodesDestroyNode);
×
1306
        SCH_ERR_JRET(terrno);
×
1307
      }
1308

1309
      if (tSerializeSSubQueryMsg(msg, msgSize, &qMsg) < 0) {
515,943,670✔
1310
        SCH_TASK_ELOG("tSerializeSSubQueryMsg failed, msgSize:%d", msgSize);
×
1311
        taosArrayDestroyP(qMsg.subEndPoints, (FDelete)nodesDestroyNode);
×
1312
        taosMemoryFree(msg);
×
1313
        SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
×
1314
      }
1315

1316
      taosArrayDestroyP(qMsg.subEndPoints, (FDelete)nodesDestroyNode);
516,004,723✔
1317

1318
      persistHandle = true;
516,001,918✔
1319
      int64_t refId = 0;
516,001,918✔
1320
      code = rpcAllocHandle(&refId);
515,927,705✔
1321
      if (code != 0) {
516,304,143✔
1322
        SCH_TASK_ELOG("rpcAllocHandle failed, code:%x", code);
×
1323
        SCH_ERR_JRET(code);
×
1324
      }
1325

1326
      SCH_SET_TASK_HANDLE(pTask, (void *)refId);
516,148,033✔
1327
      break;
516,137,953✔
1328
    }
1329
    case TDMT_SCH_FETCH:
262,643,527✔
1330
    case TDMT_SCH_MERGE_FETCH: {
1331
      SResFetchReq req = {0};
262,643,527✔
1332
      req.header.vgId = addr->nodeId;
262,643,544✔
1333
      req.sId = pTask->seriesId;
262,643,544✔
1334
      req.queryId = pJob->queryId;
262,643,503✔
1335
      req.clientId = pTask->clientId;
262,643,503✔
1336
      req.taskId = pTask->taskId;
262,643,481✔
1337
      req.execId = pTask->execId;
262,643,479✔
1338
      req.reset = true;
262,643,531✔
1339

1340
      msgSize = tSerializeSResFetchReq(NULL, 0, &req, false, false);
262,643,531✔
1341
      if (msgSize < 0) {
262,643,518✔
1342
        SCH_TASK_ELOG("tSerializeSResFetchReq get size, msgSize:%d", msgSize);
×
1343
        SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
×
1344
      }
1345
      
1346
      msg = taosMemoryCalloc(1, msgSize);
262,643,518✔
1347
      if (NULL == msg) {
262,643,051✔
1348
        SCH_TASK_ELOG("calloc %d failed", msgSize);
×
1349
        SCH_ERR_RET(terrno);
×
1350
      }
1351

1352
      if (tSerializeSResFetchReq(msg, msgSize, &req, false, false) < 0) {
262,643,051✔
1353
        SCH_TASK_ELOG("tSerializeSResFetchReq %d failed", msgSize);
×
UNCOV
1354
        SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
×
1355
      }
1356
      break;
262,643,542✔
1357
    }
1358
    case TDMT_SCH_DROP_TASK: {
514,830,190✔
1359
      STaskDropReq qMsg;
394,751,667✔
1360
      qMsg.header.vgId = addr->nodeId;
514,830,163✔
1361
      qMsg.header.contLen = 0;
514,830,260✔
1362
      qMsg.sId = pTask->seriesId;
514,830,260✔
1363
      qMsg.queryId = pJob->queryId;
514,828,724✔
1364
      qMsg.clientId = pTask->clientId;
514,829,084✔
1365
      qMsg.taskId = pTask->taskId;
514,829,956✔
1366
      qMsg.refId = pJob->refId;
514,829,496✔
1367
      qMsg.execId = *(int32_t*)param;
514,829,749✔
1368

1369
      msgSize = tSerializeSTaskDropReq(NULL, 0, &qMsg);
514,829,548✔
1370
      if (msgSize < 0) {
514,826,793✔
1371
        SCH_TASK_ELOG("tSerializeSTaskDropReq get size, msgSize:%d", msgSize);
×
1372
        SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
×
1373
      }
1374
      
1375
      msg = taosMemoryCalloc(1, msgSize);
514,826,793✔
1376
      if (NULL == msg) {
514,831,141✔
1377
        SCH_TASK_ELOG("calloc %d failed", msgSize);
×
1378
        SCH_ERR_RET(terrno);
×
1379
      }
1380

1381
      if (tSerializeSTaskDropReq(msg, msgSize, &qMsg) < 0) {
514,831,141✔
1382
        SCH_TASK_ELOG("tSerializeSTaskDropReq failed, msgSize:%d", msgSize);
×
1383
        taosMemoryFree(msg);
×
1384
        SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
48✔
1385
      }
1386
      break;
514,826,322✔
1387
    }
1388
/*
1389
    case TDMT_SCH_QUERY_HEARTBEAT: {
1390
      SCH_ERR_RET(schMakeHbRpcCtx(pJob, pTask, &rpcCtx));
1391

1392
      SSchedulerHbReq req = {0};
1393
      req.clientId = schMgmt.clientId;
1394
      req.header.vgId = addr->nodeId;
1395
      req.epId.nodeId = addr->nodeId;
1396
      TAOS_MEMCPY(&req.epId.ep, SCH_GET_CUR_EP(addr), sizeof(SEp));
1397

1398
      msgSize = tSerializeSSchedulerHbReq(NULL, 0, &req);
1399
      if (msgSize < 0) {
1400
        SCH_JOB_ELOG("tSerializeSSchedulerHbReq hbReq failed, size:%d", msgSize);
1401
        SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
1402
      }
1403
      msg = taosMemoryCalloc(1, msgSize);
1404
      if (NULL == msg) {
1405
        SCH_JOB_ELOG("calloc %d failed", msgSize);
1406
        SCH_ERR_RET(terrno);
1407
      }
1408
      if (tSerializeSSchedulerHbReq(msg, msgSize, &req) < 0) {
1409
        SCH_JOB_ELOG("tSerializeSSchedulerHbReq hbReq failed, size:%d", msgSize);
1410
        SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
1411
      }
1412

1413
      persistHandle = true;
1414
      break;
1415
    }
1416
*/    
1417
    case TDMT_SCH_TASK_NOTIFY: {
31,083✔
1418
      ETaskNotifyType* pType = param;
31,083✔
1419
      STaskNotifyReq qMsg = {0};
31,083✔
1420
      qMsg.header.vgId = addr->nodeId;
31,083✔
1421
      qMsg.header.contLen = 0;
31,083✔
1422
      qMsg.sId = pTask->seriesId;
31,083✔
1423
      qMsg.queryId = pJob->queryId;
31,083✔
1424
      qMsg.clientId = pTask->clientId;
31,083✔
1425
      qMsg.taskId = pTask->taskId;
31,083✔
1426
      qMsg.refId = pJob->refId;
31,083✔
1427
      qMsg.execId = pTask->execId;
31,083✔
1428
      qMsg.type = *pType;
31,083✔
1429

1430
      msgSize = tSerializeSTaskNotifyReq(NULL, 0, &qMsg);
31,083✔
1431
      if (msgSize < 0) {
31,083✔
1432
        SCH_TASK_ELOG("tSerializeSTaskNotifyReq get size, msgSize:%d", msgSize);
×
1433
        SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
×
1434
      }
1435
      
1436
      msg = taosMemoryCalloc(1, msgSize);
31,083✔
1437
      if (NULL == msg) {
31,083✔
1438
        SCH_TASK_ELOG("calloc %d failed", msgSize);
×
1439
        SCH_ERR_RET(terrno);
×
1440
      }
1441

1442
      if (tSerializeSTaskNotifyReq(msg, msgSize, &qMsg) < 0) {
31,083✔
1443
        SCH_TASK_ELOG("tSerializeSTaskNotifyReq failed, msgSize:%d", msgSize);
×
1444
        taosMemoryFree(msg);
×
1445
        SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
×
1446
      }
1447
      break;      
31,083✔
1448
    }
1449
    default:
×
1450
      SCH_TASK_ELOG("unknown msg type to send, msgType:%d", msgType);
×
1451
      SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
×
1452
  }
1453

1454
  if ((tsBypassFlag & TSDB_BYPASS_RB_RPC_SEND_SUBMIT) && (TDMT_VND_SUBMIT == msgType)) {
1,851,113,239✔
1455
    taosMemoryFree(msg);
69✔
1456
    SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));
69✔
1457
  } else {
1458
    if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY) {
1,851,113,170✔
1459
      SCH_ERR_JRET(schAppendTaskExecNode(pJob, pTask, addr, pTask->execId));
516,074,872✔
1460
    }
1461

1462
    SSchTrans trans = {.pTrans = pJob->conn.pTrans, .pHandle = SCH_GET_TASK_HANDLE(pTask)};
1,851,334,260✔
1463
    if (msgType == TDMT_SCH_DROP_TASK && pJob->errCode == TSDB_CODE_RPC_TIMEOUT) {
1,851,367,031✔
1464
      trans.pHandle = NULL;
×
1465
      SCH_TASK_WLOG("clear refId before send drop-task msg, code:%s", tstrerror(pJob->errCode));
×
1466
    }
1467

1468
    code = schAsyncSendMsg(pJob, pTask, &trans, addr, msgType, msg, (uint32_t)msgSize, persistHandle, (rpcCtx.args ? &rpcCtx : NULL));
1,851,256,022✔
1469
    msg = NULL;
1,851,235,112✔
1470
    SCH_ERR_JRET(code);
1,851,235,112✔
1471
  }
1472

1473
  return TSDB_CODE_SUCCESS;
1,850,961,143✔
1474

1475
_return:
269,151✔
1476

1477
  pTask->lastMsgType = -1;
262,609✔
1478
  schFreeRpcCtx(&rpcCtx);
262,609✔
1479

1480
  taosMemoryFreeClear(msg);
262,609✔
1481
  SCH_RET(code);
262,609✔
1482
}
1483
// clang-format on
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc