• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4473

08 Jul 2025 09:38AM UTC coverage: 62.922% (+0.7%) from 62.22%
#4473

push

travis-ci

web-flow
Merge pull request #31712 from taosdata/merge/mainto3.0

merge: from main to 3.0 branch

158525 of 321496 branches covered (49.31%)

Branch coverage included in aggregate %.

56 of 60 new or added lines in 13 files covered. (93.33%)

1333 existing lines in 67 files now uncovered.

245526 of 320647 relevant lines covered (76.57%)

17689640.25 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

50.67
/source/libs/catalog/src/ctgRemote.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "ctgRemote.h"
17
#include "catalogInt.h"
18
#include "query.h"
19
#include "systable.h"
20
#include "tname.h"
21
#include "tref.h"
22
#include "trpc.h"
23

24
typedef void* (*MallocType)(int64_t);
25

26
int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBuf* pMsg, int32_t rspCode) {
206,648✔
27
  int32_t       code = 0;
206,648✔
28
  SCatalog*     pCtg = pJob->pCtg;
206,648✔
29
  int32_t       taskNum = taosArrayGetSize(cbParam->taskId);
206,648✔
30
  SDataBuf      taskMsg = *pMsg;
206,652✔
31
  int32_t       msgNum = 0;
206,652✔
32
  SBatchRsp     batchRsp = {0};
206,652✔
33
  SBatchRspMsg  rsp = {0};
206,652✔
34
  SBatchRspMsg* pRsp = NULL;
206,652✔
35

36
  if (TSDB_CODE_SUCCESS == rspCode && pMsg->pData && (pMsg->len > 0)) {
206,652!
37
    code = tDeserializeSBatchRsp(pMsg->pData, pMsg->len, &batchRsp);
206,093✔
38
    if (code < 0) {
206,089!
39
      ctgError("tDeserializeSBatchRsp failed, msgLen:%d", pMsg->len);
×
40
      CTG_ERR_RET(code);
×
41
    }
42

43
    msgNum = taosArrayGetSize(batchRsp.pRsps);
206,089✔
44
  }
45

46
  if (taskNum != msgNum && 0 != msgNum) {
206,642!
47
    ctgError("taskNum %d mis-match msgNum %d", taskNum, msgNum);
×
48
    msgNum = 0;
×
49
  }
50

51
  ctgDebug("QID:0x%" PRIx64 ", catalog got batch:%d rsp:%s", pJob->queryId, cbParam->batchId,
206,642!
52
           TMSG_INFO(cbParam->reqType + 1));
53

54
  SHashObj* pBatchs = taosHashInit(taskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
206,645✔
55
  if (NULL == pBatchs) {
206,652!
56
    ctgError("taosHashInit %d batch failed", taskNum);
×
57
    CTG_ERR_JRET(terrno);
×
58
  }
59

60
  for (int32_t i = 0; i < taskNum; ++i) {
620,350✔
61
    int32_t* taskId = taosArrayGet(cbParam->taskId, i);
413,699✔
62
    if (NULL == taskId) {
413,701!
63
      ctgError("taosArrayGet %d taskId failed, total:%d", i, (int32_t)taosArrayGetSize(cbParam->taskId));
×
64
      CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
×
65
    }
66

67
    int32_t* msgIdx = taosArrayGet(cbParam->msgIdx, i);
413,701✔
68
    if (NULL == msgIdx) {
413,701!
69
      ctgError("taosArrayGet %d msgIdx failed, total:%d", i, (int32_t)taosArrayGetSize(cbParam->msgIdx));
×
70
      CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
×
71
    }
72

73
    SCtgTask* pTask = taosArrayGet(pJob->pTasks, *taskId);
413,701✔
74
    if (NULL == pTask) {
413,695!
75
      ctgError("taosArrayGet %d SCtgTask failed, total:%d", *taskId, (int32_t)taosArrayGetSize(pJob->pTasks));
×
76
      CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
1!
77
    }
78

79
    if (msgNum > 0) {
413,696✔
80
      pRsp = taosArrayGet(batchRsp.pRsps, i);
413,066✔
81

82
      if (pRsp->msgIdx != *msgIdx) {
413,063✔
83
        ctgError("rsp msgIdx %d mis-match msgIdx %d", pRsp->msgIdx, *msgIdx);
1!
84

85
        pRsp = &rsp;
×
86
        pRsp->msgIdx = *msgIdx;
×
87
        pRsp->reqType = -1;
×
88
        pRsp->rspCode = 0;
×
89
        taskMsg.msgType = -1;
×
90
        taskMsg.pData = NULL;
×
91
        taskMsg.len = 0;
×
92
      } else {
93
        taskMsg.msgType = pRsp->reqType;
413,062✔
94
        taskMsg.pData = pRsp->msg;
413,062✔
95
        taskMsg.len = pRsp->msgLen;
413,062✔
96
      }
97
    } else {
98
      pRsp = &rsp;
630✔
99
      pRsp->msgIdx = *msgIdx;
630✔
100
      pRsp->reqType = -1;
630✔
101
      pRsp->rspCode = 0;
630✔
102
      taskMsg.msgType = -1;
630✔
103
      taskMsg.pData = NULL;
630✔
104
      taskMsg.len = 0;
630✔
105
    }
106

107
    SCtgTaskReq tReq;
108
    tReq.pTask = pTask;
413,692✔
109
    tReq.msgIdx = pRsp->msgIdx;
413,692✔
110
    SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, tReq.msgIdx);
413,692✔
111
    if (NULL == pMsgCtx) {
413,699!
112
      ctgError("task:%d, get SCtgMsgCtx failed, taskType:%d", tReq.msgIdx, pTask->type);
×
113
      CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
×
114
    }
115

116
    pMsgCtx->pBatchs = pBatchs;
413,699✔
117

118
    ctgDebug("QID:0x%" PRIx64 ", catalog task:%d handle rsp:%s, idx:%d pBatchs:%p", pJob->queryId, pTask->taskId,
413,699!
119
             TMSG_INFO(taskMsg.msgType + 1), pRsp->msgIdx, pBatchs);
120

121
    (void)(*gCtgAsyncFps[pTask->type].handleRspFp)(
413,700✔
122
        &tReq, pRsp->reqType, &taskMsg, (pRsp->rspCode ? pRsp->rspCode : rspCode));  // error handled internal
413,700✔
123
  }
124

125
  CTG_ERR_JRET(ctgLaunchBatchs(pJob->pCtg, pJob, pBatchs));
206,651!
126

127
_return:
206,648✔
128

129
  taosArrayDestroyEx(batchRsp.pRsps, tFreeSBatchRspMsg);
206,648✔
130

131
  ctgFreeBatchs(pBatchs);
206,651✔
132
  CTG_RET(code);
206,651!
133
}
134

135
int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize, int32_t rspCode, char* target) {
439,730✔
136
  int32_t code = 0;
439,730✔
137

138
  switch (reqType) {
439,730✔
139
    case TDMT_MND_QNODE_LIST: {
12,948✔
140
      if (TSDB_CODE_SUCCESS != rspCode) {
12,948!
141
        qError("error rsp for qnode list, error:%s", tstrerror(rspCode));
×
142
        CTG_ERR_RET(rspCode);
×
143
      }
144

145
      code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
12,948✔
146
      if (code) {
12,948!
147
        qError("process qnode list rsp failed, error:%s", tstrerror(rspCode));
×
148
        CTG_ERR_RET(code);
×
149
      }
150

151
      qDebug("got qnode list from mnode, listNum:%d", (int32_t)taosArrayGetSize(out));
12,948✔
152
      break;
12,948✔
153
    }
154
    case TDMT_MND_DNODE_LIST: {
368✔
155
      if (TSDB_CODE_SUCCESS != rspCode) {
368!
156
        qError("error rsp for dnode list, error:%s", tstrerror(rspCode));
×
157
        CTG_ERR_RET(rspCode);
×
158
      }
159

160
      code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
368✔
161
      if (code) {
368!
162
        qError("process dnode list rsp failed, error:%s", tstrerror(rspCode));
×
163
        CTG_ERR_RET(code);
×
164
      }
165

166
      qDebug("got dnode list from mnode, listNum:%d", (int32_t)taosArrayGetSize(*(SArray**)out));
368!
167
      break;
368✔
168
    }
169
    case TDMT_MND_USE_DB: {
150,287✔
170
      if (TSDB_CODE_SUCCESS != rspCode) {
150,287✔
171
        qError("db:%s, error rsp for use db, error:%s", target, tstrerror(rspCode));
766!
172
        CTG_ERR_RET(rspCode);
766!
173
      }
174

175
      code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
149,521✔
176
      if (code) {
149,525!
177
        qError("db:%s, process use db rsp failed, error:%s", target, tstrerror(code));
×
178
        CTG_ERR_RET(code);
×
179
      }
180

181
      qDebug("db:%s, got db vgInfo from mnode", target);
149,525✔
182
      break;
149,524✔
183
    }
184
    case TDMT_MND_GET_DB_CFG: {
6,542✔
185
      if (TSDB_CODE_SUCCESS != rspCode) {
6,542✔
186
        qError("db:%s, error rsp for get db cfg, error:%s", target, tstrerror(rspCode));
844!
187
        CTG_ERR_RET(rspCode);
844!
188
      }
189

190
      code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
5,698✔
191
      if (code) {
5,698!
192
        qError("db:%s, process get db cfg rsp failed, error:%s", target, tstrerror(code));
×
193
        CTG_ERR_RET(code);
×
194
      }
195

196
      qDebug("db:%s, got db cfg from mnode", target);
5,698✔
197
      break;
5,698✔
198
    }
199
    case TDMT_MND_GET_INDEX: {
1✔
200
      if (TSDB_CODE_SUCCESS != rspCode) {
1!
201
        qError("index:%s, error rsp for get index, error:%s", target, tstrerror(rspCode));
1!
202
        CTG_ERR_RET(rspCode);
1!
203
      }
204

205
      code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
×
206
      if (code) {
×
207
        qError("index:%s, process get index rsp failed, error:%s", target, tstrerror(code));
×
208
        CTG_ERR_RET(code);
×
209
      }
210

211
      qDebug("index:%s, got index from mnode", target);
×
212
      break;
×
213
    }
214
    case TDMT_MND_GET_TABLE_INDEX: {
1,379✔
215
      if (TSDB_CODE_SUCCESS != rspCode) {
1,379✔
216
        qError("tb:%s, error rsp for get table index, error:%s", target, tstrerror(rspCode));
1,378!
217
        CTG_ERR_RET(rspCode);
1,378!
218
      }
219

220
      code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
1✔
221
      if (code) {
1!
222
        qError("tb:%s, process get table index rsp failed, error:%s", target, tstrerror(code));
×
223
        CTG_ERR_RET(code);
×
224
      }
225

226
      qDebug("tb:%s, got table index from mnode", target);
1!
227
      break;
1✔
228
    }
229
    case TDMT_MND_RETRIEVE_FUNC: {
97✔
230
      if (TSDB_CODE_SUCCESS != rspCode) {
97✔
231
        qError("func:%s, error rsp for get udf, error:%s", target, tstrerror(rspCode));
10!
232
        CTG_ERR_RET(rspCode);
10!
233
      }
234

235
      code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
87✔
236
      if (code) {
87!
237
        qError("func:%s, Process get udf rsp failed, error:%s", target, tstrerror(code));
×
238
        CTG_ERR_RET(code);
×
239
      }
240

241
      qDebug("func:%s, got udf from mnode", target);
87!
242
      break;
87✔
243
    }
244
    case TDMT_MND_GET_USER_AUTH: {
22,384✔
245
      if (TSDB_CODE_SUCCESS != rspCode) {
22,384✔
246
        qError("user:%s, error rsp for get user auth, error:%s", target, tstrerror(rspCode));
3!
247
        CTG_ERR_RET(rspCode);
3!
248
      }
249

250
      code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
22,381✔
251
      if (code) {
22,378!
252
        qError("user:%s, process get user auth rsp failed, error:%s", target, tstrerror(code));
×
253
        CTG_ERR_RET(code);
×
254
      }
255

256
      qDebug("user:%s, got user auth from mnode", target);
22,378✔
257
      break;
22,378✔
258
    }
259
    case TDMT_MND_TABLE_META: {
26,036✔
260
      if (TSDB_CODE_SUCCESS != rspCode) {
26,036✔
261
        if (CTG_TABLE_NOT_EXIST(rspCode)) {
2,538✔
262
          SET_META_TYPE_NULL(((STableMetaOutput*)out)->metaType);
2,498✔
263
          qDebug("tb:%s, stablemeta not exist in mnode", target);
2,498✔
264
          return TSDB_CODE_SUCCESS;
2,500✔
265
        }
266

267
        qError("tb:%s, error rsp for stablemeta from mnode, error:%s", target, tstrerror(rspCode));
40!
268
        CTG_ERR_RET(rspCode);
40!
269
      }
270

271
      code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
23,498✔
272
      if (code) {
23,498!
273
        qError("tb:%s, process mnode stablemeta rsp failed, error:%s", target, tstrerror(code));
×
274
        CTG_ERR_RET(code);
×
275
      }
276

277
      qDebug("tb:%s, got table meta from mnode", target);
23,498✔
278
      break;
23,498✔
279
    }
280
    case TDMT_VND_TABLE_META: {
154,162✔
281
      if (TSDB_CODE_SUCCESS != rspCode) {
154,162✔
282
        if (CTG_TABLE_NOT_EXIST(rspCode)) {
57,734!
283
          SET_META_TYPE_NULL(((STableMetaOutput*)out)->metaType);
57,734✔
284
          qDebug("tb:%s, tablemeta not exist in vnode", target);
57,734✔
285
          return TSDB_CODE_SUCCESS;
57,734✔
286
        }
287

288
        qError("tb:%s, error rsp for table meta from vnode, code:%s", target, tstrerror(rspCode));
×
289
        CTG_ERR_RET(rspCode);
×
290
      }
291

292
      code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
96,428✔
293
      if (code) {
96,426!
294
        qError("tb:%s, process vnode tablemeta rsp failed, code:%s", target, tstrerror(code));
×
295
        CTG_ERR_RET(code);
×
296
      }
297

298
      qDebug("tb:%s, got table meta from vnode", target);
96,426✔
299
      break;
96,425✔
300
    }
301
    case TDMT_VND_TABLE_NAME: {
1,979✔
302
      if (TSDB_CODE_SUCCESS != rspCode) {
1,979✔
303
        if (CTG_TABLE_NOT_EXIST(rspCode)) {
756!
304
          SET_META_TYPE_NULL(((STableMetaOutput*)out)->metaType);
756✔
305
          qDebug("tb:%s, tablemeta not exist in vnode", target);
756!
306
          return TSDB_CODE_SUCCESS;
756✔
307
        }
308

309
        qError("tb:%s, error rsp for table meta from vnode, code:%s", target, tstrerror(rspCode));
×
310
        CTG_ERR_RET(rspCode);
×
311
      }
312

313
      code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
1,223✔
314
      if (code) {
1,223!
315
        qError("tb:%s, process vnode tablemeta rsp failed, code:%s", target, tstrerror(code));
×
316
        CTG_ERR_RET(code);
×
317
      }
318

319
      qDebug("tb:%s, got table meta from vnode", target);
1,223!
320
      break;
1,223✔
321
    }
322
    case TDMT_VND_TABLE_CFG: {
189✔
323
      if (TSDB_CODE_SUCCESS != rspCode) {
189!
324
        qError("tb:%s, error rsp for table cfg from vnode, code:%s,", target, tstrerror(rspCode));
×
325
        CTG_ERR_RET(rspCode);
×
326
      }
327

328
      code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
189✔
329
      if (code) {
189!
330
        qError("tb:%s, process vnode tb cfg rsp failed, code:%s", target, tstrerror(code));
×
331
        CTG_ERR_RET(code);
×
332
      }
333

334
      qDebug("tb:%s, got table cfg from vnode", target);
189✔
335
      break;
189✔
336
    }
337
    case TDMT_MND_TABLE_CFG: {
171✔
338
      if (TSDB_CODE_SUCCESS != rspCode) {
171!
339
        qError("tb:%s, error rsp for stb cfg from mnode, error:%s", target, tstrerror(rspCode));
×
340
        CTG_ERR_RET(rspCode);
×
341
      }
342

343
      code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
171✔
344
      if (code) {
171!
345
        qError("tb:%s, Process mnode stb cfg rsp failed, error:%s", target, tstrerror(code));
×
346
        CTG_ERR_RET(code);
×
347
      }
348

349
      qDebug("tb:%s, got stb cfg from mnode", target);
171✔
350
      break;
171✔
351
    }
352
    case TDMT_MND_SERVER_VERSION: {
1✔
353
      if (TSDB_CODE_SUCCESS != rspCode) {
1!
354
        qError("error rsp for svr ver from mnode, error:%s", tstrerror(rspCode));
×
355
        CTG_ERR_RET(rspCode);
×
356
      }
357

358
      code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
1✔
359
      if (code) {
1!
360
        qError("process svr ver rsp failed, error:%s", tstrerror(code));
×
361
        CTG_ERR_RET(code);
×
362
      }
363

364
      qDebug("got svr ver from mnode");
1!
365
      break;
1✔
366
    }
367
    case TDMT_MND_VIEW_META: {
45,969✔
368
      if (TSDB_CODE_SUCCESS != rspCode) {
45,969✔
369
        if (TSDB_CODE_MND_VIEW_NOT_EXIST == rspCode) {
45,653!
370
          qDebug("no success rsp for get view-meta, error:%s, viewFName:%s", tstrerror(rspCode), target);
45,653✔
371
        } else {
372
          qError("error rsp for get view-meta, error:%s, viewFName:%s", tstrerror(rspCode), target);
×
373
        }
374
        CTG_ERR_RET(rspCode);
45,652!
375
      }
376

377
      code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
316✔
378
      if (code) {
317!
379
        qError("view:%s, process get view-meta rsp failed, error:%s", target, tstrerror(code));
×
380
        CTG_ERR_RET(code);
×
381
      }
382

383
      qDebug("view:%s, got view-meta from mnode", target);
317✔
384
      break;
317✔
385
    }
386
    case TDMT_MND_GET_TSMA:
1,177✔
387
    case TDMT_MND_GET_TABLE_TSMA: {
388
      if (TSDB_CODE_SUCCESS != rspCode) {
1,177✔
389
        if (TSDB_CODE_MND_SMA_NOT_EXIST != rspCode) {
149!
390
          qError("tb:%s, error rsp for get table tsma, error:%s", target, tstrerror(rspCode));
×
391
        }
392
        CTG_ERR_RET(rspCode);
149!
393
      }
394

395
      code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
1,028✔
396
      if (code) {
1,028!
397
        qError("tb:%s, process get table tsma rsp failed, error:%s", target, tstrerror(code));
×
398
        CTG_ERR_RET(code);
×
399
      }
400

401
      qDebug("tb:%s, got table tsma from mnode", target);
1,028!
402
      break;
1,028✔
403
    }
404
    case TDMT_VND_GET_STREAM_PROGRESS: {
5,464✔
405
      if (TSDB_CODE_SUCCESS != rspCode) {
5,464!
406
        CTG_ERR_RET(rspCode);
×
407
      }
408
      code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
5,464✔
409
      if (code) {
5,464!
410
        qError("tb:%s, process get stream progress rsp failed, error:%s", target, tstrerror(code));
×
411
        CTG_ERR_RET(code);
×
412
      }
413
      break;
5,464✔
414
    }
415
    case TDMT_VND_VSUBTABLES_META: {
8✔
416
      if (TSDB_CODE_SUCCESS != rspCode) {
8!
417
        CTG_ERR_RET(rspCode);
×
418
      }
419
      code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
8✔
420
      if (code) {
8!
421
        qError("Process get vnode virtual subtables rsp failed, err: %s, tbFName: %s", tstrerror(code), target);
×
422
        CTG_ERR_RET(code);
×
423
      }
424
      break;
8✔
425
    }
426
    case TDMT_VND_VSTB_REF_DBS: {
9,940✔
427
      if (TSDB_CODE_SUCCESS != rspCode) {
9,940✔
428
        CTG_ERR_RET(rspCode);
1!
429
      }
430
      code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
9,939✔
431
      if (code) {
9,938!
432
        qError("Process get vnode virtual subtable ref dbs rsp failed, err: %s, tbFName: %s", tstrerror(code), target);
×
433
        CTG_ERR_RET(code);
×
434
      }
435
      break;
9,938✔
436
    }
437
    default:
628✔
438
      if (TSDB_CODE_SUCCESS != rspCode) {
628!
439
        qError("get error rsp, error:%s", tstrerror(rspCode));
630!
440
        CTG_ERR_RET(rspCode);
630!
441
      }
442

UNCOV
443
      qError("invalid req type %s", TMSG_INFO(reqType));
×
444
      return TSDB_CODE_APP_ERROR;
×
445
  }
446

447
  return TSDB_CODE_SUCCESS;
329,266✔
448
}
449

450
int32_t ctgHandleMsgCallback(void* param, SDataBuf* pMsg, int32_t rspCode) {
206,652✔
451
  SCtgTaskCallbackParam* cbParam = (SCtgTaskCallbackParam*)param;
206,652✔
452
  int32_t                code = 0;
206,652✔
453
  SCtgJob*               pJob = NULL;
206,652✔
454

455
  CTG_API_JENTER();
206,652!
456

457
  pJob = taosAcquireRef(gCtgMgmt.jobPool, cbParam->refId);
206,651✔
458
  if (NULL == pJob) {
206,655!
459
    qDebug("catalog job refId 0x%" PRIx64 " already dropped", cbParam->refId);
×
460
    goto _return;
×
461
  }
462

463
  SCatalog* pCtg = pJob->pCtg;
206,655✔
464

465
  if (TDMT_VND_BATCH_META == cbParam->reqType || TDMT_MND_BATCH_META == cbParam->reqType) {
206,655!
466
    CTG_ERR_JRET(ctgHandleBatchRsp(pJob, cbParam, pMsg, rspCode));
206,655!
467
  } else {
468
    int32_t* taskId = taosArrayGet(cbParam->taskId, 0);
×
469
    if (NULL == taskId) {
×
470
      ctgError("taosArrayGet %d taskId failed, total:%d", 0, (int32_t)taosArrayGetSize(cbParam->taskId));
×
471
      CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
×
472
    }
473

474
    SCtgTask* pTask = taosArrayGet(pJob->pTasks, *taskId);
×
475
    if (NULL == pTask) {
×
476
      ctgError("taosArrayGet %d SCtgTask failed, total:%d", *taskId, (int32_t)taosArrayGetSize(pJob->pTasks));
×
477
      CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
×
478
    }
479

480
    qDebug("QID:0x%" PRIx64 ", catalog task:%d handle rsp:%s", pJob->queryId, pTask->taskId,
×
481
           TMSG_INFO(cbParam->reqType + 1));
482

483
#if CTG_BATCH_FETCH
484
    SHashObj* pBatchs =
485
        taosHashInit(CTG_DEFAULT_BATCH_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
×
486
    if (NULL == pBatchs) {
×
487
      ctgError("taosHashInit %d batch failed", CTG_DEFAULT_BATCH_NUM);
×
488
      CTG_ERR_JRET(terrno);
×
489
    }
490

491
    SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1);
×
492
    if (NULL == pMsgCtx) {
×
493
      ctgError("task:%d, get SCtgMsgCtx failed, taskType:%d", -1, pTask->type);
×
494
      CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
×
495
    }
496

497
    pMsgCtx->pBatchs = pBatchs;
×
498
#endif
499

500
    SCtgTaskReq tReq;
501
    tReq.pTask = pTask;
×
502
    tReq.msgIdx = -1;
×
503

504
    CTG_ERR_JRET((*gCtgAsyncFps[pTask->type].handleRspFp)(&tReq, cbParam->reqType, pMsg, rspCode));
×
505

506
#if CTG_BATCH_FETCH
507
    CTG_ERR_JRET(ctgLaunchBatchs(pJob->pCtg, pJob, pBatchs));
×
508
#endif
509
  }
510

511
_return:
206,651✔
512

513
  taosMemoryFree(pMsg->pData);
206,651!
514
  taosMemoryFree(pMsg->pEpSet);
206,654!
515

516
  if (pJob) {
206,653!
517
    int32_t code2 = taosReleaseRef(gCtgMgmt.jobPool, cbParam->refId);
206,654✔
518
    if (code2) {
206,654!
519
      qError("release catalog job refId:%" PRId64 " failed, error:%s", cbParam->refId, tstrerror(code2));
×
520
    }
521
  }
522

523
  CTG_API_LEAVE(code);
206,653!
524
}
525

526
int32_t ctgMakeMsgSendInfo(SCtgJob* pJob, SArray* pTaskId, int32_t batchId, SArray* pMsgIdx, int32_t msgType,
206,629✔
527
                           SMsgSendInfo** pMsgSendInfo) {
528
  int32_t       code = 0;
206,629✔
529
  SMsgSendInfo* msgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
206,629!
530
  if (NULL == msgSendInfo) {
206,645!
531
    qError("calloc %d failed", (int32_t)sizeof(SMsgSendInfo));
×
532
    CTG_ERR_JRET(terrno);
×
533
  }
534

535
  SCtgTaskCallbackParam* param = taosMemoryCalloc(1, sizeof(SCtgTaskCallbackParam));
206,645!
536
  if (NULL == param) {
206,640!
537
    qError("calloc %d failed", (int32_t)sizeof(SCtgTaskCallbackParam));
×
538
    taosMemoryFree(msgSendInfo);
×
539
    CTG_ERR_JRET(terrno);
×
540
  }
541

542
  param->reqType = msgType;
206,640✔
543
  param->queryId = pJob->queryId;
206,640✔
544
  param->refId = pJob->refId;
206,640✔
545
  param->taskId = pTaskId;
206,640✔
546
  param->batchId = batchId;
206,640✔
547
  param->msgIdx = pMsgIdx;
206,640✔
548

549
  msgSendInfo->param = param;
206,640✔
550
  msgSendInfo->paramFreeFp = ctgFreeMsgSendParam;
206,640✔
551
  msgSendInfo->fp = ctgHandleMsgCallback;
206,640✔
552

553
  *pMsgSendInfo = msgSendInfo;
206,640✔
554

555
  return TSDB_CODE_SUCCESS;
206,640✔
556

557
_return:
×
558

559
  taosArrayDestroy(pTaskId);
×
560
  destroySendMsgInfo(msgSendInfo);
×
561

562
  CTG_RET(code);
×
563
}
564

565
int32_t ctgAsyncSendMsg(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgJob* pJob, SArray* pTaskId, int32_t batchId,
206,631✔
566
                        SArray* pMsgIdx, char* dbFName, int32_t vgId, int32_t msgType, void* msg, uint32_t msgSize) {
567
  int32_t       code = 0;
206,631✔
568
  SMsgSendInfo* pMsgSendInfo = NULL;
206,631✔
569
  CTG_ERR_JRET(ctgMakeMsgSendInfo(pJob, pTaskId, batchId, pMsgIdx, msgType, &pMsgSendInfo));
206,631!
570

571
  CTG_ERR_JRET(ctgUpdateSendTargetInfo(pMsgSendInfo, msgType, dbFName, vgId));
206,640!
572

573
  pMsgSendInfo->requestId = pConn->requestId;
206,647✔
574
  pMsgSendInfo->requestObjRefId = pConn->requestObjRefId;
206,647✔
575
  pMsgSendInfo->msgInfo.pData = msg;
206,647✔
576
  pMsgSendInfo->msgInfo.len = msgSize;
206,647✔
577
  pMsgSendInfo->msgInfo.handle = NULL;
206,647✔
578
  pMsgSendInfo->msgType = msgType;
206,647✔
579

580
  code = asyncSendMsgToServer(pConn->pTrans, &pConn->mgmtEps, NULL, pMsgSendInfo);
206,647✔
581
  pMsgSendInfo = NULL;
206,654✔
582
  if (code) {
206,654!
583
    ctgError("asyncSendMsgToSever failed, error: %s", tstrerror(code));
×
584
    CTG_ERR_JRET(code);
×
585
  }
586

587
  ctgDebug("QID:0x%" PRIx64 ", catalog req msg sent, type:%s", pJob->queryId, TMSG_INFO(msgType));
206,654!
588
  return TSDB_CODE_SUCCESS;
206,651✔
589

590
_return:
×
591

592
  if (pMsgSendInfo) {
×
593
    destroySendMsgInfo(pMsgSendInfo);
×
594
  }
595

596
  CTG_RET(code);
×
597
}
598

599
int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo* pConn, SCtgTaskReq* tReq, int32_t msgType,
413,691✔
600
                    void* msg, uint32_t msgSize) {
601
  int32_t     code = 0;
413,691✔
602
  SCtgTask*   pTask = tReq->pTask;
413,691✔
603
  SCtgJob*    pJob = pTask->pJob;
413,691✔
604
  SCtgBatch   newBatch = {0};
413,691✔
605
  SBatchMsg   req = {0};
413,691✔
606
  SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, tReq->msgIdx);
413,691✔
607
  if (NULL == pMsgCtx) {
413,689!
608
    ctgError("task:%d, get SCtgMsgCtx failed, taskType:%d", tReq->msgIdx, pTask->type);
×
609
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
×
610
  }
611

612
  SHashObj*  pBatchs = pMsgCtx->pBatchs;
413,689✔
613
  SCtgBatch* pBatch = taosHashGet(pBatchs, &vgId, sizeof(vgId));
413,689✔
614
  if (NULL == pBatch) {
413,688✔
615
    newBatch.pMsgs = taosArrayInit(pJob->subTaskNum, sizeof(SBatchMsg));
206,637✔
616
    newBatch.pTaskIds = taosArrayInit(pJob->subTaskNum, sizeof(int32_t));
206,648✔
617
    newBatch.pMsgIdxs = taosArrayInit(pJob->subTaskNum, sizeof(int32_t));
206,653✔
618
    if (NULL == newBatch.pMsgs || NULL == newBatch.pTaskIds || NULL == newBatch.pMsgIdxs) {
206,651!
619
      taosArrayDestroy(newBatch.pMsgs);
×
620
      taosArrayDestroy(newBatch.pTaskIds);
×
621
      taosArrayDestroy(newBatch.pMsgIdxs);
×
622
      CTG_ERR_JRET(terrno);
×
623
    }
624

625
    newBatch.conn = *pConn;
206,651✔
626

627
    req.msgIdx = tReq->msgIdx;
206,651✔
628
    req.msgType = msgType;
206,651✔
629
    req.msgLen = msgSize;
206,651✔
630
    req.msg = msg;
206,651✔
631
    if (NULL == taosArrayPush(newBatch.pMsgs, &req)) {
413,298!
632
      CTG_ERR_JRET(terrno);
×
633
    }
634
    msg = NULL;
206,647✔
635
    if (NULL == taosArrayPush(newBatch.pTaskIds, &pTask->taskId)) {
413,294!
636
      CTG_ERR_JRET(terrno);
×
637
    }
638
    if (NULL == taosArrayPush(newBatch.pMsgIdxs, &req.msgIdx)) {
413,289!
639
      CTG_ERR_JRET(terrno);
×
640
    }
641

642
    if (vgId > 0) {
206,652✔
643
      SName* pName = NULL;
130,972✔
644
      if (TDMT_VND_TABLE_CFG == msgType) {
130,972✔
645
        SCtgTbCfgCtx* ctx = (SCtgTbCfgCtx*)pTask->taskCtx;
188✔
646
        pName = ctx->pName;
188✔
647
      } else if (TDMT_VND_TABLE_META == msgType || TDMT_VND_TABLE_NAME == msgType ||
130,784✔
648
                 TDMT_VND_VSUBTABLES_META == msgType || TDMT_VND_VSTB_REF_DBS == msgType) {
13,522✔
649
        if (CTG_TASK_GET_TB_META_BATCH == pTask->type) {
127,188✔
650
          SCtgTbMetasCtx* ctx = (SCtgTbMetasCtx*)pTask->taskCtx;
116,258✔
651
          SCtgFetch*      fetch = taosArrayGet(ctx->pFetchs, tReq->msgIdx);
116,258✔
652
          CTG_ERR_JRET(ctgGetFetchName(ctx->pNames, fetch, &pName));
116,257!
653
        } else if (CTG_TASK_GET_TB_TSMA == pTask->type) {
10,930!
654
          SCtgTbTSMACtx* pCtx = pTask->taskCtx;
×
655
          SCtgTSMAFetch* pFetch = taosArrayGet(pCtx->pFetches, tReq->msgIdx);
×
656
          STablesReq*    pTbReq = taosArrayGet(pCtx->pNames, pFetch->dbIdx);
×
657
          pName = taosArrayGet(pTbReq->pTables, pFetch->tbIdx);
×
658
          if (NULL == pName) {
×
659
            ctgError("fail to get %d SName, totalTables:%d", pFetch->tbIdx, (int32_t)taosArrayGetSize(pTbReq->pTables));
×
660
            CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
×
661
          }
662
        } else if (CTG_TASK_GET_TB_NAME == pTask->type) {
10,930✔
663
          SCtgTbNamesCtx* ctx = (SCtgTbNamesCtx*)pTask->taskCtx;
659✔
664
          SCtgFetch*      fetch = taosArrayGet(ctx->pFetchs, tReq->msgIdx);
659✔
665
          CTG_ERR_JRET(ctgGetFetchName(ctx->pNames, fetch, &pName));
659!
666
        } else {
667
          SCtgTbMetaCtx* ctx = (SCtgTbMetaCtx*)pTask->taskCtx;
10,271✔
668
          pName = ctx->pName;
10,271✔
669
        }
670
      } else if (TDMT_VND_GET_STREAM_PROGRESS == msgType) {
3,596!
671
        SCtgTbTSMACtx* pCtx = pTask->taskCtx;
3,596✔
672
        SCtgTSMAFetch* pFetch = taosArrayGet(pCtx->pFetches, tReq->msgIdx);
3,596✔
673
        if (NULL == pFetch) {
3,596!
674
          ctgError("fail to get %d SCtgTSMAFetch, totalFetchs:%d", tReq->msgIdx,
×
675
                   (int32_t)taosArrayGetSize(pCtx->pFetches));
676
          CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
×
677
        }
678
        STablesReq* pTbReq = taosArrayGet(pCtx->pNames, pFetch->dbIdx);
3,596✔
679
        if (NULL == pTbReq) {
3,596!
680
          ctgError("fail to get %d STablesReq, totalTables:%d", pFetch->dbIdx, (int32_t)taosArrayGetSize(pCtx->pNames));
×
681
          CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
×
682
        }
683
        pName = taosArrayGet(pTbReq->pTables, pFetch->tbIdx);
3,596✔
684
        if (NULL == pName) {
3,596!
685
          ctgError("fail to get %d SName, totalTables:%d", pFetch->tbIdx, (int32_t)taosArrayGetSize(pTbReq->pTables));
×
686
          CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
×
687
        }
688
      } else {
689
        ctgError("invalid vnode msgType %d", msgType);
×
690
        CTG_ERR_JRET(TSDB_CODE_APP_ERROR);
×
691
      }
692

693
      (void)tNameGetFullDbName(pName, newBatch.dbFName);
130,965✔
694
    }
695

696
    newBatch.msgType = (vgId > 0) ? TDMT_VND_BATCH_META : TDMT_MND_BATCH_META;
206,644✔
697
    newBatch.batchId = atomic_add_fetch_32(&pJob->batchId, 1);
206,644✔
698

699
    if (0 != taosHashPut(pBatchs, &vgId, sizeof(vgId), &newBatch, sizeof(newBatch))) {
206,654!
700
      CTG_ERR_JRET(terrno);
×
701
    }
702

703
    qDebug("QID:0x%" PRIx64 ", job:0x%" PRIx64 ", catalog task:%d, %s req added to batch:%d, target vgId:%d",
206,655!
704
           pTask->pJob->queryId, pTask->pJob->refId, pTask->taskId, TMSG_INFO(msgType), newBatch.batchId, vgId);
705

706
    return TSDB_CODE_SUCCESS;
206,651✔
707
  }
708

709
  req.msgIdx = tReq->msgIdx;
207,051✔
710
  req.msgType = msgType;
207,051✔
711
  req.msgLen = msgSize;
207,051✔
712
  req.msg = msg;
207,051✔
713
  if (NULL == taosArrayPush(pBatch->pMsgs, &req)) {
414,102!
714
    CTG_ERR_JRET(terrno);
×
715
  }
716
  msg = NULL;
207,051✔
717
  if (NULL == taosArrayPush(pBatch->pTaskIds, &pTask->taskId)) {
414,102!
718
    CTG_ERR_JRET(terrno);
×
719
  }
720
  if (NULL == taosArrayPush(pBatch->pMsgIdxs, &req.msgIdx)) {
414,100!
721
    CTG_ERR_JRET(terrno);
×
722
  }
723

724
  if (vgId > 0) {
207,049✔
725
    SName* pName = NULL;
38,238✔
726
    if (TDMT_VND_TABLE_CFG == msgType) {
38,238!
727
      SCtgTbCfgCtx* ctx = (SCtgTbCfgCtx*)pTask->taskCtx;
×
728
      pName = ctx->pName;
×
729
    } else if (TDMT_VND_TABLE_META == msgType || TDMT_VND_TABLE_NAME == msgType ||
38,238✔
730
               TDMT_VND_VSUBTABLES_META == msgType || TDMT_VND_VSTB_REF_DBS == msgType) {
1,882✔
731
      if (CTG_TASK_GET_TB_META_BATCH == pTask->type) {
36,370✔
732
        SCtgTbMetasCtx* ctx = (SCtgTbMetasCtx*)pTask->taskCtx;
178✔
733
        SCtgFetch*      fetch = taosArrayGet(ctx->pFetchs, tReq->msgIdx);
178✔
734
        CTG_ERR_JRET(ctgGetFetchName(ctx->pNames, fetch, &pName));
178!
735
      } else if (CTG_TASK_GET_TB_TSMA == pTask->type) {
36,192✔
736
        SCtgTbTSMACtx* pCtx = pTask->taskCtx;
405✔
737
        SCtgTSMAFetch* pFetch = taosArrayGet(pCtx->pFetches, tReq->msgIdx);
405✔
738
        STablesReq*    pTbReq = taosArrayGet(pCtx->pNames, pFetch->dbIdx);
405✔
739
        pName = taosArrayGet(pTbReq->pTables, pFetch->tbIdx);
405✔
740
        if (NULL == pName) {
405!
741
          ctgError("fail to get %d SName, totalTables:%d", pFetch->tbIdx, (int32_t)taosArrayGetSize(pTbReq->pTables));
×
742
          CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
1!
743
        }
744
      } else if (CTG_TASK_GET_TB_NAME == pTask->type) {
35,787✔
745
        SCtgTbMetasCtx* ctx = (SCtgTbMetasCtx*)pTask->taskCtx;
1,320✔
746
        SCtgFetch*      fetch = taosArrayGet(ctx->pFetchs, tReq->msgIdx);
1,320✔
747
        CTG_ERR_JRET(ctgGetFetchName(ctx->pNames, fetch, &pName));
1,320!
748
      } else {
749
        SCtgTbMetaCtx* ctx = (SCtgTbMetaCtx*)pTask->taskCtx;
34,467✔
750
        pName = ctx->pName;
34,467✔
751
      }
752
    } else if (TDMT_VND_GET_STREAM_PROGRESS == msgType) {
1,868!
753
      SCtgTbTSMACtx* pCtx = pTask->taskCtx;
1,868✔
754
      SCtgTSMAFetch* pFetch = taosArrayGet(pCtx->pFetches, tReq->msgIdx);
1,868✔
755
      if (NULL == pFetch) {
1,868!
756
        ctgError("fail to get %d SCtgTSMAFetch, totalFetchs:%d", tReq->msgIdx,
×
757
                 (int32_t)taosArrayGetSize(pCtx->pFetches));
758
        CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
×
759
      }
760
      STablesReq* pTbReq = taosArrayGet(pCtx->pNames, pFetch->dbIdx);
1,868✔
761
      if (NULL == pTbReq) {
1,868!
762
        ctgError("fail to get %d STablesReq, totalTables:%d", pFetch->dbIdx, (int32_t)taosArrayGetSize(pCtx->pNames));
×
763
        CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
×
764
      }
765
      pName = taosArrayGet(pTbReq->pTables, pFetch->tbIdx);
1,868✔
766
      if (NULL == pName) {
1,868!
767
        ctgError("fail to get %d SName, totalTables:%d", pFetch->tbIdx, (int32_t)taosArrayGetSize(pTbReq->pTables));
×
768
        CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
×
769
      }
770
    } else {
771
      ctgError("invalid vnode msgType %d", msgType);
×
772
      CTG_ERR_JRET(TSDB_CODE_APP_ERROR);
×
773
    }
774

775
    (void)tNameGetFullDbName(pName, pBatch->dbFName);
38,239✔
776
  }
777

778
  qDebug("QID:0x%" PRIx64 ", job:0x%" PRIx64 ", catalog task:%d, %s req added to batch:%d, target vgId:%d",
207,050!
779
         pTask->pJob->queryId, pTask->pJob->refId, pTask->taskId, TMSG_INFO(msgType), pBatch->batchId, vgId);
780

781
  return TSDB_CODE_SUCCESS;
207,051✔
782

783
_return:
×
784

785
  ctgFreeBatch(&newBatch);
×
786
  taosMemoryFree(msg);
×
787

788
  return code;
×
789
}
790

791
int32_t ctgBuildBatchReqMsg(SCtgBatch* pBatch, int32_t vgId, void** msg, int32_t* pSize) {
206,638✔
792
  int32_t num = taosArrayGetSize(pBatch->pMsgs);
206,638✔
793
  if (num >= CTG_MAX_REQ_IN_BATCH) {
206,641!
794
    qError("too many msgs %d in one batch request", num);
×
795
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
×
796
  }
797

798
  SBatchReq batchReq = {0};
206,641✔
799

800
  batchReq.header.vgId = vgId;
206,641✔
801
  batchReq.pMsgs = pBatch->pMsgs;
206,641✔
802

803
  int32_t msgSize = tSerializeSBatchReq(NULL, 0, &batchReq);
206,641✔
804
  if (msgSize < 0) {
206,635!
805
    qError("tSerializeSBatchReq failed");
×
806
    CTG_ERR_RET(msgSize);
×
807
  }
808

809
  *msg = taosMemoryCalloc(1, msgSize);
206,635!
810
  if (NULL == (*msg)) {
206,649!
811
    qError("calloc batchReq msg failed, size:%d", msgSize);
×
812
    CTG_ERR_RET(terrno);
×
813
  }
814
  msgSize = tSerializeSBatchReq(*msg, msgSize, &batchReq);
206,649✔
815
  if (msgSize < 0) {
206,642!
816
    qError("tSerializeSBatchReq failed");
×
817
    CTG_ERR_RET(msgSize);
×
818
  }
819

820
  *pSize = msgSize;
206,642✔
821

822
  qTrace("batch:%d, batch req to vgId:%d msg built with %d meta reqs", pBatch->batchId, vgId, num);
206,642✔
823

824
  return TSDB_CODE_SUCCESS;
206,638✔
825
}
826

827
int32_t ctgLaunchBatchs(SCatalog* pCtg, SCtgJob* pJob, SHashObj* pBatchs) {
1,497,518✔
828
  int32_t code = 0;
1,497,518✔
829
  void*   msg = NULL;
1,497,518✔
830
  void*   p = taosHashIterate(pBatchs, NULL);
1,497,518✔
831
  while (NULL != p) {
1,704,348✔
832
    size_t     len = 0;
206,644✔
833
    int32_t*   vgId = taosHashGetKey(p, &len);
206,644✔
834
    SCtgBatch* pBatch = (SCtgBatch*)p;
206,642✔
835
    int32_t    msgSize = 0;
206,642✔
836

837
    qDebug("QID:0x%" PRIx64 ", job:0x%" PRIx64 ", catalog start to launch batch:%d", pJob->queryId, pJob->refId,
206,642✔
838
           pBatch->batchId);
839

840
    CTG_ERR_JRET(ctgBuildBatchReqMsg(pBatch, *vgId, &msg, &msgSize));
206,642!
841
    code = ctgAsyncSendMsg(pCtg, &pBatch->conn, pJob, pBatch->pTaskIds, pBatch->batchId, pBatch->pMsgIdxs,
206,639✔
842
                           pBatch->dbFName, *vgId, pBatch->msgType, msg, msgSize);
206,639✔
843
    pBatch->pTaskIds = NULL;
206,650✔
844
    CTG_ERR_JRET(code);
206,650!
845

846
    p = taosHashIterate(pBatchs, p);
206,650✔
847
  }
848

849
  return TSDB_CODE_SUCCESS;
1,497,704✔
850

851
_return:
×
852

853
  if (p) {
×
854
    taosHashCancelIterate(pBatchs, p);
×
855
  }
856
  taosMemoryFree(msg);
×
857

858
  CTG_RET(code);
×
859
}
860

861
int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SArray* out, SCtgTask* pTask) {
12,948✔
862
  char*   msg = NULL;
12,948✔
863
  int32_t msgLen = 0;
12,948✔
864
  int32_t reqType = TDMT_MND_QNODE_LIST;
12,948✔
865
  void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemMalloc : (MallocType)rpcMallocCont;
12,948✔
866

867
  ctgDebug("try to get qnode list from mnode, mgmtEpInUse:%d", pConn->mgmtEps.inUse);
12,948✔
868

869
  int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](NULL, &msg, 0, &msgLen, mallocFp);
12,948✔
870
  if (code) {
12,948!
871
    ctgError("Build qnode list msg failed, error:%s", tstrerror(code));
×
872
    CTG_ERR_RET(code);
×
873
  }
874

875
  if (pTask) {
12,948✔
876
    void* pOut = taosArrayInit(4, sizeof(SQueryNodeLoad));
12,947✔
877
    if (NULL == pOut) {
12,947!
878
      CTG_ERR_RET(terrno);
×
879
    }
880

881
    CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask, -1), reqType, pOut, NULL));
12,947!
882

883
#if CTG_BATCH_FETCH
884
    SCtgTaskReq tReq;
885
    tReq.pTask = pTask;
12,947✔
886
    tReq.msgIdx = -1;
12,947✔
887
    CTG_RET(ctgAddBatch(pCtg, 0, pConn, &tReq, reqType, msg, msgLen));
12,947!
888
#else
889
    SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
890
    if (NULL == pTaskId) {
891
      CTG_ERR_RET(terrno);
892
    }
893
    if (NULL == taosArrayPush(pTaskId, &pTask->taskId)) {
894
      taosArrayDestroy(pTaskId);
895
      CTG_ERR_RET(terrno);
896
    }
897

898
    CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, NULL, 0, reqType, msg, msgLen));
899
#endif
900
  }
901

902
  SRpcMsg rpcMsg = {
1✔
903
      .msgType = reqType,
904
      .pCont = msg,
905
      .contLen = msgLen,
906
  };
907

908
  SRpcMsg rpcRsp = {0};
1✔
909
  CTG_ERR_RET(rpcSendRecv(pConn->pTrans, &pConn->mgmtEps, &rpcMsg, &rpcRsp));
1!
910

911
  CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, NULL));
1!
912

913
  rpcFreeCont(rpcRsp.pCont);
1✔
914

915
  return TSDB_CODE_SUCCESS;
1✔
916
}
917

918
int32_t ctgGetDnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SArray** out, SCtgTask* pTask) {
368✔
919
  char*   msg = NULL;
368✔
920
  int32_t msgLen = 0;
368✔
921
  int32_t reqType = TDMT_MND_DNODE_LIST;
368✔
922
  void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemMalloc : (MallocType)rpcMallocCont;
368✔
923

924
  ctgDebug("try to get dnode list from mnode, mgmtEpInUse:%d", pConn->mgmtEps.inUse);
368!
925

926
  int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](NULL, &msg, 0, &msgLen, mallocFp);
368✔
927
  if (code) {
368!
928
    ctgError("Build dnode list msg failed, error:%s", tstrerror(code));
×
929
    CTG_ERR_RET(code);
×
930
  }
931

932
  if (pTask) {
368✔
933
    CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask, -1), reqType, NULL, NULL));
367!
934

935
#if CTG_BATCH_FETCH
936
    SCtgTaskReq tReq;
937
    tReq.pTask = pTask;
367✔
938
    tReq.msgIdx = -1;
367✔
939
    CTG_RET(ctgAddBatch(pCtg, 0, pConn, &tReq, reqType, msg, msgLen));
367!
940
#else
941
    SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
942
    if (NULL == pTaskId) {
943
      CTG_ERR_RET(terrno);
944
    }
945
    if (NULL == taosArrayPush(pTaskId, &pTask->taskId)) {
946
      taosArrayDestroy(pTaskId);
947
      CTG_ERR_RET(terrno);
948
    }
949

950
    CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, NULL, 0, reqType, msg, msgLen));
951
#endif
952
  }
953

954
  SRpcMsg rpcMsg = {
1✔
955
      .msgType = reqType,
956
      .pCont = msg,
957
      .contLen = msgLen,
958
  };
959

960
  SRpcMsg rpcRsp = {0};
1✔
961
  CTG_ERR_RET(rpcSendRecv(pConn->pTrans, &pConn->mgmtEps, &rpcMsg, &rpcRsp));
1!
962

963
  CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, NULL));
1!
964

965
  rpcFreeCont(rpcRsp.pCont);
1✔
966

967
  return TSDB_CODE_SUCCESS;
1✔
968
}
969

970
int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SBuildUseDBInput* input, SUseDbOutput* out,
150,313✔
971
                                SCtgTaskReq* tReq) {
972
  char*     msg = NULL;
150,313✔
973
  int32_t   msgLen = 0;
150,313✔
974
  int32_t   reqType = TDMT_MND_USE_DB;
150,313✔
975
  SCtgTask* pTask = tReq ? tReq->pTask : NULL;
150,313✔
976
  void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemMalloc : (MallocType)rpcMallocCont;
150,313✔
977

978
  ctgDebug("db:%s, try to get db vgInfo from mnode", input->db);
150,313✔
979

980
  int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](input, &msg, 0, &msgLen, mallocFp);
150,313✔
981
  if (code) {
150,313!
982
    ctgError("db:%s, build use db msg failed, code:%s", input->db, tstrerror(code));
×
983
    CTG_ERR_RET(code);
×
984
  }
985

986
  if (pTask) {
150,313✔
987
    void* pOut = taosMemoryCalloc(1, sizeof(SUseDbOutput));
129,812!
988
    if (NULL == pOut) {
129,812!
989
      CTG_ERR_RET(terrno);
×
990
    }
991

992
    CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask, tReq->msgIdx), reqType, pOut, input->db));
129,812!
993

994
#if CTG_BATCH_FETCH
995
    CTG_RET(ctgAddBatch(pCtg, 0, pConn, tReq, reqType, msg, msgLen));
129,811!
996
#else
997
    SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
998
    if (NULL == pTaskId) {
999
      CTG_ERR_RET(terrno);
1000
    }
1001
    if (NULL == taosArrayPush(pTaskId, &pTask->taskId)) {
1002
      taosArrayDestroy(pTaskId);
1003
      CTG_ERR_RET(terrno);
1004
    }
1005

1006
    CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, NULL, 0, reqType, msg, msgLen));
1007
#endif
1008
  }
1009

1010
  SRpcMsg rpcMsg = {
20,501✔
1011
      .msgType = reqType,
1012
      .pCont = msg,
1013
      .contLen = msgLen,
1014
  };
1015

1016
  SRpcMsg rpcRsp = {0};
20,501✔
1017
  CTG_ERR_RET(rpcSendRecv(pConn->pTrans, &pConn->mgmtEps, &rpcMsg, &rpcRsp));
20,501!
1018

1019
  CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, input->db));
20,501!
1020

1021
  rpcFreeCont(rpcRsp.pCont);
20,501✔
1022

1023
  return TSDB_CODE_SUCCESS;
20,501✔
1024
}
1025

1026
int32_t ctgGetDBCfgFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const char* dbFName, SDbCfgInfo* out,
6,542✔
1027
                             SCtgTask* pTask) {
1028
  char*   msg = NULL;
6,542✔
1029
  int32_t msgLen = 0;
6,542✔
1030
  int32_t reqType = TDMT_MND_GET_DB_CFG;
6,542✔
1031
  void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemMalloc : (MallocType)rpcMallocCont;
6,542✔
1032

1033
  ctgDebug("db:%s, try to get db cfg from mnode", dbFName);
6,542✔
1034

1035
  int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void*)dbFName, &msg, 0, &msgLen, mallocFp);
6,542✔
1036
  if (code) {
6,542!
1037
    ctgError("db:%s, build get db cfg msg failed, code:%s", dbFName, tstrerror(code));
×
1038
    CTG_ERR_RET(code);
×
1039
  }
1040

1041
  if (pTask) {
6,542✔
1042
    void* pOut = taosMemoryCalloc(1, sizeof(SDbCfgInfo));
6,541!
1043
    if (NULL == pOut) {
6,541!
1044
      CTG_ERR_RET(terrno);
×
1045
    }
1046

1047
    CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask, -1), reqType, pOut, (char*)dbFName));
6,541!
1048

1049
#if CTG_BATCH_FETCH
1050
    SCtgTaskReq tReq;
1051
    tReq.pTask = pTask;
6,541✔
1052
    tReq.msgIdx = -1;
6,541✔
1053
    CTG_RET(ctgAddBatch(pCtg, 0, pConn, &tReq, reqType, msg, msgLen));
6,541!
1054
#else
1055
    SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
1056
    if (NULL == pTaskId) {
1057
      CTG_ERR_RET(terrno);
1058
    }
1059
    if (NULL == taosArrayPush(pTaskId, &pTask->taskId)) {
1060
      taosArrayDestroy(pTaskId);
1061
      CTG_ERR_RET(terrno);
1062
    }
1063

1064
    CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, NULL, 0, reqType, msg, msgLen));
1065
#endif
1066
  }
1067

1068
  SRpcMsg rpcMsg = {
1✔
1069
      .msgType = TDMT_MND_GET_DB_CFG,
1070
      .pCont = msg,
1071
      .contLen = msgLen,
1072
  };
1073

1074
  SRpcMsg rpcRsp = {0};
1✔
1075
  CTG_ERR_RET(rpcSendRecv(pConn->pTrans, &pConn->mgmtEps, &rpcMsg, &rpcRsp));
1!
1076

1077
  CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, (char*)dbFName));
1!
1078

1079
  rpcFreeCont(rpcRsp.pCont);
1✔
1080

1081
  return TSDB_CODE_SUCCESS;
1✔
1082
}
1083

1084
int32_t ctgGetIndexInfoFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const char* indexName, SIndexInfo* out,
1✔
1085
                                 SCtgTask* pTask) {
1086
  char*   msg = NULL;
1✔
1087
  int32_t msgLen = 0;
1✔
1088
  int32_t reqType = TDMT_MND_GET_INDEX;
1✔
1089
  void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemMalloc : (MallocType)rpcMallocCont;
1!
1090

1091
  ctgDebug("index:%s, try to get index from mnode", indexName);
1!
1092

1093
  int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void*)indexName, &msg, 0, &msgLen, mallocFp);
1✔
1094
  if (code) {
1!
1095
    ctgError("index:%s, build get index msg failed, code:%s", indexName, tstrerror(code));
×
1096
    CTG_ERR_RET(code);
×
1097
  }
1098

1099
  if (pTask) {
1!
1100
    void* pOut = taosMemoryCalloc(1, sizeof(SIndexInfo));
×
1101
    if (NULL == pOut) {
×
1102
      CTG_ERR_RET(terrno);
×
1103
    }
1104

1105
    CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask, -1), reqType, pOut, (char*)indexName));
×
1106

1107
#if CTG_BATCH_FETCH
1108
    SCtgTaskReq tReq;
1109
    tReq.pTask = pTask;
×
1110
    tReq.msgIdx = -1;
×
1111
    CTG_RET(ctgAddBatch(pCtg, 0, pConn, &tReq, reqType, msg, msgLen));
×
1112
#else
1113
    SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
1114
    if (NULL == pTaskId) {
1115
      CTG_ERR_RET(terrno);
1116
    }
1117
    if (NULL == taosArrayPush(pTaskId, &pTask->taskId)) {
1118
      taosArrayDestroy(pTaskId);
1119
      CTG_ERR_RET(terrno);
1120
    }
1121

1122
    CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, NULL, 0, reqType, msg, msgLen));
1123
#endif
1124
  }
1125

1126
  SRpcMsg rpcMsg = {
1✔
1127
      .msgType = reqType,
1128
      .pCont = msg,
1129
      .contLen = msgLen,
1130
  };
1131

1132
  SRpcMsg rpcRsp = {0};
1✔
1133
  CTG_ERR_RET(rpcSendRecv(pConn->pTrans, &pConn->mgmtEps, &rpcMsg, &rpcRsp));
1!
1134

1135
  CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, (char*)indexName));
1!
1136

1137
  rpcFreeCont(rpcRsp.pCont);
×
1138

1139
  return TSDB_CODE_SUCCESS;
×
1140
}
1141

1142
int32_t ctgGetTbIndexFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SName* name, STableIndex* out,
1,379✔
1143
                               SCtgTask* pTask) {
1144
  char*   msg = NULL;
1,379✔
1145
  int32_t msgLen = 0;
1,379✔
1146
  int32_t reqType = TDMT_MND_GET_TABLE_INDEX;
1,379✔
1147
  void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemMalloc : (MallocType)rpcMallocCont;
1,379✔
1148
  char tbFName[TSDB_TABLE_FNAME_LEN];
1149

1150
  ctgDebug("tb:%s, try to get tb index from mnode", tbFName);
1,379!
1151

1152
  int32_t code = tNameExtractFullName(name, tbFName);
1,379✔
1153
  if (code) {
1,379!
1154
    ctgError("tb:%s, tNameExtractFullName failed, code:%s, type:%d, dbName:%s", name->tname, tstrerror(code),
×
1155
             name->type, name->dbname);
1156
    CTG_ERR_RET(code);
×
1157
  }
1158

1159
  code = queryBuildMsg[TMSG_INDEX(reqType)]((void*)tbFName, &msg, 0, &msgLen, mallocFp);
1,379✔
1160
  if (code) {
1,379!
1161
    ctgError("tb:%s, build get index msg failed, code:%s", tbFName, tstrerror(code));
×
1162
    CTG_ERR_RET(code);
×
1163
  }
1164

1165
  if (pTask) {
1,379✔
1166
    void* pOut = taosMemoryCalloc(1, sizeof(STableIndex));
1,378!
1167
    if (NULL == pOut) {
1,378!
1168
      CTG_ERR_RET(terrno);
×
1169
    }
1170

1171
    CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask, -1), reqType, pOut, (char*)tbFName));
1,378!
1172

1173
#if CTG_BATCH_FETCH
1174
    SCtgTaskReq tReq;
1175
    tReq.pTask = pTask;
1,378✔
1176
    tReq.msgIdx = -1;
1,378✔
1177
    CTG_RET(ctgAddBatch(pCtg, 0, pConn, &tReq, reqType, msg, msgLen));
1,378!
1178
#else
1179
    SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
1180
    if (NULL == pTaskId) {
1181
      CTG_ERR_RET(terrno);
1182
    }
1183
    if (NULL == taosArrayPush(pTaskId, &pTask->taskId)) {
1184
      taosArrayDestroy(pTaskId);
1185
      CTG_ERR_RET(terrno);
1186
    }
1187

1188
    CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, NULL, 0, reqType, msg, msgLen));
1189
#endif
1190
  }
1191

1192
  SRpcMsg rpcMsg = {
1✔
1193
      .msgType = reqType,
1194
      .pCont = msg,
1195
      .contLen = msgLen,
1196
  };
1197

1198
  SRpcMsg rpcRsp = {0};
1✔
1199
  CTG_ERR_RET(rpcSendRecv(pConn->pTrans, &pConn->mgmtEps, &rpcMsg, &rpcRsp));
1!
1200

1201
  CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, (char*)tbFName));
1!
1202

1203
  rpcFreeCont(rpcRsp.pCont);
1✔
1204

1205
  return TSDB_CODE_SUCCESS;
1✔
1206
}
1207

1208
int32_t ctgGetUdfInfoFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const char* funcName, SFuncInfo* out,
97✔
1209
                               SCtgTask* pTask) {
1210
  char*   msg = NULL;
97✔
1211
  int32_t msgLen = 0;
97✔
1212
  int32_t reqType = TDMT_MND_RETRIEVE_FUNC;
97✔
1213
  void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemMalloc : (MallocType)rpcMallocCont;
97✔
1214

1215
  ctgDebug("func:%s, try to get udf info from mnode", funcName);
97!
1216

1217
  int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void*)funcName, &msg, 0, &msgLen, mallocFp);
97✔
1218
  if (code) {
97!
1219
    ctgError("func:%s, build get udf msg failed, code:%s", funcName, tstrerror(code));
×
1220
    CTG_ERR_RET(code);
×
1221
  }
1222

1223
  if (pTask) {
97✔
1224
    void* pOut = taosMemoryCalloc(1, sizeof(SFuncInfo));
96!
1225
    if (NULL == pOut) {
96!
1226
      CTG_ERR_RET(terrno);
×
1227
    }
1228

1229
    CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask, -1), reqType, pOut, (char*)funcName));
96!
1230

1231
#if CTG_BATCH_FETCH
1232
    SCtgTaskReq tReq;
1233
    tReq.pTask = pTask;
96✔
1234
    tReq.msgIdx = -1;
96✔
1235
    CTG_RET(ctgAddBatch(pCtg, 0, pConn, &tReq, reqType, msg, msgLen));
96!
1236
#else
1237
    SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
1238
    if (NULL == pTaskId) {
1239
      CTG_ERR_RET(terrno);
1240
    }
1241
    if (NULL == taosArrayPush(pTaskId, &pTask->taskId)) {
1242
      taosArrayDestroy(pTaskId);
1243
      CTG_ERR_RET(terrno);
1244
    }
1245

1246
    CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, NULL, 0, reqType, msg, msgLen));
1247
#endif
1248
  }
1249

1250
  SRpcMsg rpcMsg = {
1✔
1251
      .msgType = reqType,
1252
      .pCont = msg,
1253
      .contLen = msgLen,
1254
  };
1255

1256
  SRpcMsg rpcRsp = {0};
1✔
1257
  CTG_ERR_RET(rpcSendRecv(pConn->pTrans, &pConn->mgmtEps, &rpcMsg, &rpcRsp));
1!
1258

1259
  CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, (char*)funcName));
1!
1260

1261
  rpcFreeCont(rpcRsp.pCont);
1✔
1262

1263
  return TSDB_CODE_SUCCESS;
1✔
1264
}
1265

1266
int32_t ctgGetUserDbAuthFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const char* user, SGetUserAuthRsp* out,
22,394✔
1267
                                  SCtgTask* pTask) {
1268
  char*   msg = NULL;
22,394✔
1269
  int32_t msgLen = 0;
22,394✔
1270
  int32_t reqType = TDMT_MND_GET_USER_AUTH;
22,394✔
1271
  void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemMalloc : (MallocType)rpcMallocCont;
22,394✔
1272

1273
  ctgDebug("user:%s, try to get user auth from mnode", user);
22,394✔
1274

1275
  int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void*)user, &msg, 0, &msgLen, mallocFp);
22,394✔
1276
  if (code) {
22,394!
1277
    ctgError("user:%s, build get user auth msg failed, code:%s", user, tstrerror(code));
×
1278
    CTG_ERR_RET(code);
×
1279
  }
1280

1281
  if (pTask) {
22,394✔
1282
    void* pOut = taosMemoryCalloc(1, sizeof(SGetUserAuthRsp));
22,323!
1283
    if (NULL == pOut) {
22,324!
1284
      CTG_ERR_RET(terrno);
×
1285
    }
1286

1287
    CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask, -1), reqType, pOut, (char*)user));
22,324!
1288

1289
#if CTG_BATCH_FETCH
1290
    SCtgTaskReq tReq;
1291
    tReq.pTask = pTask;
22,324✔
1292
    tReq.msgIdx = -1;
22,324✔
1293
    CTG_RET(ctgAddBatch(pCtg, 0, pConn, &tReq, reqType, msg, msgLen));
22,324!
1294
#else
1295
    SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
1296
    if (NULL == pTaskId) {
1297
      CTG_ERR_RET(terrno);
1298
    }
1299
    if (NULL == taosArrayPush(pTaskId, &pTask->taskId)) {
1300
      taosArrayDestroy(pTaskId);
1301
      CTG_ERR_RET(terrno);
1302
    }
1303

1304
    CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, NULL, 0, reqType, msg, msgLen));
1305
#endif
1306
  }
1307

1308
  SRpcMsg rpcMsg = {
71✔
1309
      .msgType = reqType,
1310
      .pCont = msg,
1311
      .contLen = msgLen,
1312
  };
1313

1314
  SRpcMsg rpcRsp = {0};
71✔
1315
  CTG_ERR_RET(rpcSendRecv(pConn->pTrans, &pConn->mgmtEps, &rpcMsg, &rpcRsp));
71!
1316

1317
  CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, (char*)user));
71!
1318

1319
  rpcFreeCont(rpcRsp.pCont);
68✔
1320

1321
  return TSDB_CODE_SUCCESS;
70✔
1322
}
1323

1324
int32_t ctgGetTbMetaFromMnodeImpl(SCatalog* pCtg, SRequestConnInfo* pConn, const char* dbFName, const char* tbName,
26,053✔
1325
                                  STableMetaOutput* out, SCtgTaskReq* tReq) {
1326
  SCtgTask*        pTask = tReq ? tReq->pTask : NULL;
26,053✔
1327
  SBuildTableInput bInput = {.vgId = 0, .dbFName = dbFName, .tbName = tbName};
26,053✔
1328
  char*            msg = NULL;
26,053✔
1329
  SEpSet*          pVnodeEpSet = NULL;
26,053✔
1330
  int32_t          msgLen = 0;
26,053✔
1331
  int32_t          reqType = TDMT_MND_TABLE_META;
26,053✔
1332
  char             tbFName[TSDB_TABLE_FNAME_LEN];
1333
  (void)snprintf(tbFName, sizeof(tbFName), "%s.%s", dbFName, tbName);
26,053✔
1334
  void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemMalloc : (MallocType)rpcMallocCont;
26,053✔
1335

1336
  ctgDebug("tb:%s, try to get table meta from mnode", tbFName);
26,053✔
1337

1338
  int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](&bInput, &msg, 0, &msgLen, mallocFp);
26,053✔
1339
  if (code) {
26,050!
1340
    ctgError("tb:%s, build mnode stablemeta msg failed, code:%s", tbFName, tstrerror(code));
×
1341
    CTG_ERR_RET(code);
2!
1342
  }
1343

1344
  if (pTask) {
26,052✔
1345
    void* pOut = taosMemoryCalloc(1, sizeof(STableMetaOutput));
23,695!
1346
    if (NULL == pOut) {
23,694!
1347
      CTG_ERR_RET(terrno);
×
1348
    }
1349

1350
    CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask, tReq->msgIdx), reqType, pOut, tbFName));
23,694!
1351

1352
#if CTG_BATCH_FETCH
1353
    CTG_RET(ctgAddBatch(pCtg, 0, pConn, tReq, reqType, msg, msgLen));
23,695!
1354
#else
1355
    SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
1356
    if (NULL == pTaskId) {
1357
      CTG_ERR_RET(terrno);
1358
    }
1359
    if (NULL == taosArrayPush(pTaskId, &pTask->taskId)) {
1360
      taosArrayDestroy(pTaskId);
1361
      CTG_ERR_RET(terrno);
1362
    }
1363

1364
    CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, NULL, 0, reqType, msg, msgLen));
1365
#endif
1366
  }
1367

1368
  SRpcMsg rpcMsg = {
2,357✔
1369
      .msgType = reqType,
1370
      .pCont = msg,
1371
      .contLen = msgLen,
1372
  };
1373

1374
  SRpcMsg rpcRsp = {0};
2,357✔
1375
  CTG_ERR_RET(rpcSendRecv(pConn->pTrans, &pConn->mgmtEps, &rpcMsg, &rpcRsp));
2,357!
1376

1377
  CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, tbFName));
2,360!
1378

1379
  rpcFreeCont(rpcRsp.pCont);
2,361✔
1380

1381
  return TSDB_CODE_SUCCESS;
2,359✔
1382
}
1383

1384
int32_t ctgGetTbMetaFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, STableMetaOutput* out,
17,487✔
1385
                              SCtgTaskReq* tReq) {
1386
  char dbFName[TSDB_DB_FNAME_LEN];
1387
  (void)tNameGetFullDbName(pTableName, dbFName);
17,487✔
1388

1389
  return ctgGetTbMetaFromMnodeImpl(pCtg, pConn, dbFName, (char*)pTableName->tname, out, tReq);
17,510✔
1390
}
1391

1392
int32_t ctgGetTbMetaFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, SVgroupInfo* vgroupInfo,
156,681✔
1393
                              STableMetaOutput* out, SCtgTaskReq* tReq) {
1394
  SCtgTask* pTask = tReq ? tReq->pTask : NULL;
156,681✔
1395
  uint8_t   autoCreateCtb = tReq ? tReq->autoCreateCtb : 0;
156,681✔
1396
  char      dbFName[TSDB_DB_FNAME_LEN];
1397
  (void)tNameGetFullDbName(pTableName, dbFName);
156,681✔
1398
  int32_t reqType = (pTask && pTask->type == CTG_TASK_GET_TB_NAME ? TDMT_VND_TABLE_NAME : TDMT_VND_TABLE_META);
156,709✔
1399
  char    tbFName[TSDB_TABLE_FNAME_LEN];
1400
  (void)snprintf(tbFName, sizeof(tbFName), "%s.%s", dbFName, pTableName->tname);
156,709✔
1401
  void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemMalloc : (MallocType)rpcMallocCont;
156,709✔
1402

1403
  SEp* pEp = &vgroupInfo->epSet.eps[vgroupInfo->epSet.inUse];
156,709✔
1404
  ctgDebug("tb:%s, try to get table meta from vnode, vgId:%d, ep num:%d, ep:%s:%u", tbFName, vgroupInfo->vgId,
156,709✔
1405
           vgroupInfo->epSet.numOfEps, pEp->fqdn, pEp->port);
1406

1407
  SBuildTableInput bInput = {.vgId = vgroupInfo->vgId,
313,413✔
1408
                             .option = reqType == TDMT_VND_TABLE_NAME ? REQ_OPT_TBUID : REQ_OPT_TBNAME,
156,709✔
1409
                             .autoCreateCtb = autoCreateCtb,
1410
                             .dbFName = dbFName,
1411
                             .tbName = (char*)tNameGetTableName(pTableName)};
156,709✔
1412
  char*            msg = NULL;
156,704✔
1413
  int32_t          msgLen = 0;
156,704✔
1414

1415
  int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](&bInput, &msg, 0, &msgLen, mallocFp);
156,704✔
1416
  if (code) {
156,704!
1417
    ctgError("tb:%s, build vnode tablemeta msg failed, code:%s", tbFName, tstrerror(code));
×
1418
    CTG_ERR_RET(code);
×
1419
  }
1420

1421
  if (pTask) {
156,698✔
1422
    void* pOut = taosMemoryCalloc(1, sizeof(STableMetaOutput));
153,609!
1423
    if (NULL == pOut) {
153,608!
1424
      CTG_ERR_RET(terrno);
×
1425
    }
1426

1427
    SRequestConnInfo vConn = {.pTrans = pConn->pTrans,
153,608✔
1428
                              .requestId = pConn->requestId,
153,608✔
1429
                              .requestObjRefId = pConn->requestObjRefId,
153,608✔
1430
                              .mgmtEps = vgroupInfo->epSet};
1431

1432
    CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask, tReq->msgIdx), reqType, pOut, tbFName));
153,608!
1433

1434
#if CTG_BATCH_FETCH
1435
    CTG_RET(ctgAddBatch(pCtg, vgroupInfo->vgId, &vConn, tReq, reqType, msg, msgLen));
153,607!
1436
#else
1437
    SCtgTbMetaCtx* ctx = (SCtgTbMetaCtx*)pTask->taskCtx;
1438
    char           dbFName[TSDB_DB_FNAME_LEN];
1439
    (void)tNameGetFullDbName(ctx->pName, dbFName);
1440
    SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
1441
    if (NULL == pTaskId) {
1442
      CTG_ERR_RET(terrno);
1443
    }
1444
    if (NULL == taosArrayPush(pTaskId, &pTask->taskId)) {
1445
      taosArrayDestroy(pTaskId);
1446
      CTG_ERR_RET(terrno);
1447
    }
1448

1449
    CTG_RET(ctgAsyncSendMsg(pCtg, &vConn, pTask->pJob, pTaskId, -1, NULL, dbFName, ctx->vgId, reqType, msg, msgLen));
1450
#endif
1451
  }
1452

1453
  SRpcMsg rpcMsg = {
3,089✔
1454
      .msgType = reqType,
1455
      .pCont = msg,
1456
      .contLen = msgLen,
1457
  };
1458

1459
  SRpcMsg rpcRsp = {0};
3,089✔
1460
  CTG_ERR_RET(rpcSendRecv(pConn->pTrans, &vgroupInfo->epSet, &rpcMsg, &rpcRsp));
3,089!
1461

1462
  CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, tbFName));
3,093!
1463

1464
  rpcFreeCont(rpcRsp.pCont);
3,093✔
1465

1466
  return TSDB_CODE_SUCCESS;
3,092✔
1467
}
1468

1469
int32_t ctgGetTableCfgFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName,
189✔
1470
                                SVgroupInfo* vgroupInfo, STableCfg** out, SCtgTask* pTask) {
1471
  char*   msg = NULL;
189✔
1472
  int32_t msgLen = 0;
189✔
1473
  int32_t reqType = TDMT_VND_TABLE_CFG;
189✔
1474
  char    tbFName[TSDB_TABLE_FNAME_LEN];
1475
  void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemMalloc : (MallocType)rpcMallocCont;
189✔
1476
  char dbFName[TSDB_DB_FNAME_LEN];
1477
  (void)tNameGetFullDbName(pTableName, dbFName);
189✔
1478
  SBuildTableInput bInput = {.vgId = vgroupInfo->vgId, .dbFName = dbFName, .tbName = (char*)pTableName->tname};
189✔
1479

1480
  int32_t code = tNameExtractFullName(pTableName, tbFName);
189✔
1481
  if (code) {
189!
1482
    ctgError("tb:%s, tNameExtractFullName failed, code:%s, type:%d, dbName:%s", pTableName->tname, tstrerror(code),
×
1483
             pTableName->type, pTableName->dbname);
1484
    CTG_ERR_RET(code);
×
1485
  }
1486

1487
  SEp* pEp = &vgroupInfo->epSet.eps[vgroupInfo->epSet.inUse];
189✔
1488
  ctgDebug("tb:%s, try to get table cfg from vnode, vgId:%d, ep num:%d, ep %s:%d", tbFName, vgroupInfo->vgId,
189✔
1489
           vgroupInfo->epSet.numOfEps, pEp->fqdn, pEp->port);
1490

1491
  code = queryBuildMsg[TMSG_INDEX(reqType)](&bInput, &msg, 0, &msgLen, mallocFp);
189✔
1492
  if (code) {
189!
1493
    ctgError("tb:%s, build get tb cfg msg failed, code:%s", tbFName, tstrerror(code));
×
1494
    CTG_ERR_RET(code);
×
1495
  }
1496

1497
  if (pTask) {
189✔
1498
    CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask, -1), reqType, NULL, (char*)tbFName));
188!
1499

1500
    SRequestConnInfo vConn = {.pTrans = pConn->pTrans,
188✔
1501
                              .requestId = pConn->requestId,
188✔
1502
                              .requestObjRefId = pConn->requestObjRefId,
188✔
1503
                              .mgmtEps = vgroupInfo->epSet};
1504
#if CTG_BATCH_FETCH
1505
    SCtgTaskReq tReq;
1506
    tReq.pTask = pTask;
188✔
1507
    tReq.msgIdx = -1;
188✔
1508
    CTG_RET(ctgAddBatch(pCtg, vgroupInfo->vgId, &vConn, &tReq, reqType, msg, msgLen));
188!
1509
#else
1510
    SCtgTbCfgCtx* ctx = (SCtgTbCfgCtx*)pTask->taskCtx;
1511
    char          dbFName[TSDB_DB_FNAME_LEN];
1512
    (void)tNameGetFullDbName(ctx->pName, dbFName);
1513
    SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
1514
    if (NULL == pTaskId) {
1515
      CTG_ERR_RET(terrno);
1516
    }
1517
    if (NULL == taosArrayPush(pTaskId, &pTask->taskId)) {
1518
      taosArrayDestroy(pTaskId);
1519
      CTG_ERR_RET(terrno);
1520
    }
1521

1522
    CTG_RET(ctgAsyncSendMsg(pCtg, &vConn, pTask->pJob, pTaskId, -1, NULL, dbFName, ctx->pVgInfo->vgId, reqType, msg,
1523
                            msgLen));
1524
#endif
1525
  }
1526

1527
  SRpcMsg rpcMsg = {
1✔
1528
      .msgType = reqType,
1529
      .pCont = msg,
1530
      .contLen = msgLen,
1531
  };
1532

1533
  SRpcMsg rpcRsp = {0};
1✔
1534
  CTG_ERR_RET(rpcSendRecv(pConn->pTrans, &vgroupInfo->epSet, &rpcMsg, &rpcRsp));
1!
1535

1536
  CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, (char*)tbFName));
1!
1537

1538
  rpcFreeCont(rpcRsp.pCont);
1✔
1539

1540
  return TSDB_CODE_SUCCESS;
1✔
1541
}
1542

1543
int32_t ctgGetTableCfgFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, STableCfg** out,
171✔
1544
                                SCtgTask* pTask) {
1545
  char*   msg = NULL;
171✔
1546
  int32_t msgLen = 0;
171✔
1547
  int32_t reqType = TDMT_MND_TABLE_CFG;
171✔
1548
  char    tbFName[TSDB_TABLE_FNAME_LEN];
1549
  void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemMalloc : (MallocType)rpcMallocCont;
171!
1550
  char dbFName[TSDB_DB_FNAME_LEN];
1551
  (void)tNameGetFullDbName(pTableName, dbFName);
171✔
1552
  SBuildTableInput bInput = {.vgId = 0, .dbFName = dbFName, .tbName = (char*)pTableName->tname};
171✔
1553

1554
  int32_t code = tNameExtractFullName(pTableName, tbFName);
171✔
1555
  if (code) {
171!
1556
    ctgError("tb:%s, tNameExtractFullName failed, code:%s, type:%d, dbName:%s", pTableName->tname, tstrerror(code),
×
1557
             pTableName->type, pTableName->dbname);
1558
    CTG_ERR_RET(code);
×
1559
  }
1560

1561
  ctgDebug("tb:%s, try to get table cfg from mnode", tbFName);
171✔
1562

1563
  code = queryBuildMsg[TMSG_INDEX(reqType)](&bInput, &msg, 0, &msgLen, mallocFp);
171✔
1564
  if (code) {
171!
1565
    ctgError("tb:%s, build get tb cfg msg failed, code:%s", tbFName, tstrerror(code));
×
1566
    CTG_ERR_RET(code);
×
1567
  }
1568

1569
  if (pTask) {
171!
1570
    CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask, -1), reqType, NULL, (char*)tbFName));
171!
1571

1572
#if CTG_BATCH_FETCH
1573
    SCtgTaskReq tReq;
1574
    tReq.pTask = pTask;
171✔
1575
    tReq.msgIdx = -1;
171✔
1576
    CTG_RET(ctgAddBatch(pCtg, 0, pConn, &tReq, reqType, msg, msgLen));
171!
1577
#else
1578
    SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
1579
    if (NULL == pTaskId) {
1580
      CTG_ERR_RET(terrno);
1581
    }
1582
    if (NULL == taosArrayPush(pTaskId, &pTask->taskId)) {
1583
      taosArrayDestroy(pTaskId);
1584
      CTG_ERR_RET(terrno);
1585
    }
1586

1587
    CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, NULL, 0, reqType, msg, msgLen));
1588
#endif
1589
  }
1590

1591
  SRpcMsg rpcMsg = {
×
1592
      .msgType = reqType,
1593
      .pCont = msg,
1594
      .contLen = msgLen,
1595
  };
1596

1597
  SRpcMsg rpcRsp = {0};
×
1598
  CTG_ERR_RET(rpcSendRecv(pConn->pTrans, &pConn->mgmtEps, &rpcMsg, &rpcRsp));
×
1599

1600
  CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, (char*)tbFName));
×
1601

1602
  rpcFreeCont(rpcRsp.pCont);
×
1603

1604
  return TSDB_CODE_SUCCESS;
×
1605
}
1606

1607
int32_t ctgGetSvrVerFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, char** out, SCtgTask* pTask) {
1✔
1608
  char*   msg = NULL;
1✔
1609
  int32_t msgLen = 0;
1✔
1610
  int32_t reqType = TDMT_MND_SERVER_VERSION;
1✔
1611
  void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemMalloc : (MallocType)rpcMallocCont;
1!
1612

1613
  qDebug("try to get svr ver from mnode");
1!
1614

1615
  int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](NULL, &msg, 0, &msgLen, mallocFp);
1✔
1616
  if (code) {
1!
1617
    ctgError("build get svr ver msg failed, code:%s", tstrerror(code));
×
1618
    CTG_ERR_RET(code);
×
1619
  }
1620

1621
  if (pTask) {
1!
1622
    CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask, -1), reqType, NULL, NULL));
×
1623

1624
#if CTG_BATCH_FETCH
1625
    SCtgTaskReq tReq;
1626
    tReq.pTask = pTask;
×
1627
    tReq.msgIdx = -1;
×
1628
    CTG_RET(ctgAddBatch(pCtg, 0, pConn, &tReq, reqType, msg, msgLen));
×
1629
#else
1630
    SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
1631
    if (NULL == pTaskId) {
1632
      CTG_ERR_RET(terrno);
1633
    }
1634
    if (NULL == taosArrayPush(pTaskId, &pTask->taskId)) {
1635
      taosArrayDestroy(pTaskId);
1636
      CTG_ERR_RET(terrno);
1637
    }
1638

1639
    CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, NULL, 0, reqType, msg, msgLen));
1640
#endif
1641
  }
1642

1643
  SRpcMsg rpcMsg = {
1✔
1644
      .msgType = reqType,
1645
      .pCont = msg,
1646
      .contLen = msgLen,
1647
  };
1648

1649
  SRpcMsg rpcRsp = {0};
1✔
1650
  CTG_ERR_RET(rpcSendRecv(pConn->pTrans, &pConn->mgmtEps, &rpcMsg, &rpcRsp));
1!
1651

1652
  CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, NULL));
1!
1653

1654
  rpcFreeCont(rpcRsp.pCont);
1✔
1655

1656
  return TSDB_CODE_SUCCESS;
1✔
1657
}
1658

1659
int32_t ctgGetViewInfoFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SName* pName, SViewMetaOutput* out,
45,972✔
1660
                                SCtgTaskReq* tReq) {
1661
  char*     msg = NULL;
45,972✔
1662
  int32_t   msgLen = 0;
45,972✔
1663
  int32_t   reqType = TDMT_MND_VIEW_META;
45,972✔
1664
  SCtgTask* pTask = tReq ? tReq->pTask : NULL;
45,972!
1665
  void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemMalloc : (MallocType)rpcMallocCont;
45,972!
1666
  char    fullName[TSDB_TABLE_FNAME_LEN];
1667
  int32_t code = tNameExtractFullName(pName, fullName);
45,972✔
1668
  if (code) {
45,980!
1669
    ctgError("view:%s, tNameExtractFullName failed, code:%s, type:%d, dbName:%s", pName->tname, tstrerror(code), pName->type,
×
1670
             pName->dbname);
1671
    CTG_ERR_RET(code);
×
1672
  }
1673

1674
  ctgDebug("view:%s, try to get view info from mnode", fullName);
45,980✔
1675

1676
  code = queryBuildMsg[TMSG_INDEX(reqType)](fullName, &msg, 0, &msgLen, mallocFp);
45,980✔
1677
  if (code) {
45,977!
1678
    ctgError("view:%s, build view-meta msg failed, code:%s", fullName, tstrerror(code));
×
1679
    CTG_ERR_RET(code);
×
1680
  }
1681

1682
  if (pTask) {
45,977!
1683
    void* pOut = taosMemoryCalloc(1, POINTER_BYTES);
45,977!
1684
    if (NULL == pOut) {
45,980!
1685
      CTG_ERR_RET(terrno);
×
1686
    }
1687

1688
    CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask, tReq->msgIdx), reqType, pOut, fullName));
45,980!
1689

1690
#if CTG_BATCH_FETCH
1691
    CTG_RET(ctgAddBatch(pCtg, 0, pConn, tReq, reqType, msg, msgLen));
45,980!
1692
#else
1693
    SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
1694
    if (NULL == pTaskId) {
1695
      CTG_ERR_RET(terrno);
1696
    }
1697
    if (NULL == taosArrayPush(pTaskId, &pTask->taskId)) {
1698
      taosArrayDestroy(pTaskId);
1699
      CTG_ERR_RET(terrno);
1700
    }
1701

1702
    CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, NULL, 0, reqType, msg, msgLen));
1703
#endif
1704
  }
1705

1706
  SRpcMsg rpcMsg = {
×
1707
      .msgType = reqType,
1708
      .pCont = msg,
1709
      .contLen = msgLen,
1710
  };
1711

1712
  SRpcMsg rpcRsp = {0};
×
1713
  CTG_ERR_RET(rpcSendRecv(pConn->pTrans, &pConn->mgmtEps, &rpcMsg, &rpcRsp));
×
1714

1715
  CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, fullName));
×
1716

1717
  rpcFreeCont(rpcRsp.pCont);
×
1718

1719
  return TSDB_CODE_SUCCESS;
×
1720
}
1721

1722
int32_t ctgGetTbTSMAFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* name, STableTSMAInfoRsp* out,
1,177✔
1723
                              SCtgTaskReq* tReq, int32_t reqType) {
1724
  char*     msg = NULL;
1,177✔
1725
  int32_t   msgLen = 0;
1,177✔
1726
  SCtgTask* pTask = tReq ? tReq->pTask : NULL;
1,177!
1727
  void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemMalloc : (MallocType)rpcMallocCont;
1,177!
1728
  char    tbFName[TSDB_TABLE_FNAME_LEN];
1729
  int32_t code = tNameExtractFullName(name, tbFName);
1,177✔
1730
  if (code) {
1,177!
1731
    ctgError("tb:%s, tNameExtractFullName failed, code:%s, type:%d, dbName:%s", name->tname, tstrerror(code),
×
1732
             name->type, name->dbname);
1733
    CTG_ERR_RET(code);
×
1734
  }
1735

1736
  ctgDebug("tb:%s, try to get tb index from mnode", tbFName);
1,177!
1737

1738
  code = queryBuildMsg[TMSG_INDEX(reqType)]((void*)tbFName, &msg, 0, &msgLen, mallocFp);
1,177✔
1739
  if (code) {
1,177!
1740
    ctgError("tb:%s, build get index msg failed, code:%s", tbFName, tstrerror(code));
×
1741
    CTG_ERR_RET(code);
×
1742
  }
1743

1744
  if (pTask) {
1,177!
1745
    void* pOut = taosMemoryCalloc(1, sizeof(STableTSMAInfoRsp));
1,177!
1746
    if (NULL == pOut) {
1,177!
1747
      CTG_ERR_RET(terrno);
×
1748
    }
1749

1750
    CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask, tReq->msgIdx), reqType, pOut, (char*)tbFName));
1,177!
1751

1752
#if CTG_BATCH_FETCH
1753
    CTG_RET(ctgAddBatch(pCtg, 0, pConn, tReq, reqType, msg, msgLen));
1,177!
1754
#else
1755
    SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
1756
    if (NULL == pTaskId) {
1757
      CTG_ERR_RET(terrno);
1758
    }
1759
    if (NULL == taosArrayPush(pTaskId, &pTask->taskId)) {
1760
      taosArrayDestroy(pTaskId);
1761
      CTG_ERR_RET(terrno);
1762
    }
1763

1764
    CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, NULL, 0, reqType, msg, msgLen));
1765
#endif
1766
  }
1767

1768
  SRpcMsg rpcMsg = {
×
1769
      .msgType = reqType,
1770
      .pCont = msg,
1771
      .contLen = msgLen,
1772
  };
1773

1774
  SRpcMsg rpcRsp = {0};
×
1775
  CTG_ERR_RET(rpcSendRecv(pConn->pTrans, &pConn->mgmtEps, &rpcMsg, &rpcRsp));
×
1776

1777
  CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, (char*)tbFName));
×
1778

1779
  rpcFreeCont(rpcRsp.pCont);
×
1780

1781
  return TSDB_CODE_SUCCESS;
×
1782
}
1783

1784
int32_t ctgGetStreamProgressFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTbName,
5,464✔
1785
                                      SVgroupInfo* vgroupInfo, SStreamProgressRsp* out, SCtgTaskReq* tReq,
1786
                                      void* bInput) {
1787
  char*   msg = NULL;
5,464✔
1788
  int32_t msgLen = 0;
5,464✔
1789
  int32_t reqType = TDMT_VND_GET_STREAM_PROGRESS;
5,464✔
1790
  char    tbFName[TSDB_TABLE_FNAME_LEN];
1791
  int32_t code = tNameExtractFullName(pTbName, tbFName);
5,464✔
1792
  if (code) {
5,464!
1793
    ctgError("tb:%s, tNameExtractFullName failed, code:%s, type:%d, dbName:%s", pTbName->tname, tstrerror(code),
×
1794
             pTbName->type, pTbName->dbname);
1795
    CTG_ERR_RET(code);
×
1796
  }
1797

1798
  SCtgTask* pTask = tReq ? tReq->pTask : NULL;
5,464!
1799
  void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemMalloc : (MallocType)rpcMallocCont;
5,464!
1800

1801
  SEp* pEp = &vgroupInfo->epSet.eps[vgroupInfo->epSet.inUse];
5,464✔
1802
  ctgDebug("tb:%s try to get stream progress from vnode, vgId:%d, ep num:%d, ep %s:%d", tbFName, vgroupInfo->vgId,
5,464!
1803
           vgroupInfo->epSet.numOfEps, pEp->fqdn, pEp->port);
1804

1805
  code = queryBuildMsg[TMSG_INDEX(reqType)](bInput, &msg, 0, &msgLen, mallocFp);
5,464✔
1806
  if (code) {
5,464!
1807
    ctgError("tb:%s, build get stream progress failed, code:%s", tbFName, tstrerror(code));
×
1808
    CTG_ERR_RET(code);
×
1809
  }
1810

1811
  if (pTask) {
5,464!
1812
    SStreamProgressRsp* pOut = taosMemoryCalloc(1, sizeof(SStreamProgressRsp));
5,464!
1813
    if (!pOut) {
5,464!
1814
      CTG_ERR_RET(terrno);
×
1815
    }
1816
    CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask, tReq->msgIdx), reqType, pOut, (char*)tbFName));
5,464!
1817

1818
    SRequestConnInfo vConn = {.pTrans = pConn->pTrans,
5,464✔
1819
                              .requestId = pConn->requestId,
5,464✔
1820
                              .requestObjRefId = pConn->requestObjRefId,
5,464✔
1821
                              .mgmtEps = vgroupInfo->epSet};
1822
#if CTG_BATCH_FETCH
1823
    CTG_RET(ctgAddBatch(pCtg, vgroupInfo->vgId, &vConn, tReq, reqType, msg, msgLen));
5,464!
1824
#else
1825
    char dbFName[TSDB_DB_FNAME_LEN];
1826
    (void)tNameGetFullDbName(pTbName, dbFName);
1827
    SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
1828
    if (NULL == pTaskId) {
1829
      CTG_ERR_RET(terrno);
1830
    }
1831
    if (NULL == taosArrayPush(pTaskId, &pTask->taskId)) {
1832
      taosArrayDestroy(pTaskId);
1833
      CTG_ERR_RET(terrno);
1834
    }
1835

1836
    CTG_RET(
1837
        ctgAsyncSendMsg(pCtg, &vConn, pTask->pJob, pTaskId, -1, NULL, dbFName, vgroupInfo->vgId, reqType, msg, msgLen));
1838
#endif
1839
  }
1840

1841
  SRpcMsg rpcMsg = {
×
1842
      .msgType = reqType,
1843
      .pCont = msg,
1844
      .contLen = msgLen,
1845
  };
1846

1847
  SRpcMsg rpcRsp = {0};
×
1848
  CTG_ERR_RET(rpcSendRecv(pConn->pTrans, &vgroupInfo->epSet, &rpcMsg, &rpcRsp));
×
1849

1850
  CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, (char*)tbFName));
×
1851

1852
  rpcFreeCont(rpcRsp.pCont);
×
1853

1854
  return TSDB_CODE_SUCCESS;
×
1855
}
1856

1857
int32_t ctgGetVSubTablesFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn, int64_t suid, SVgroupInfo* vgroupInfo, SCtgTaskReq* tReq) {
8✔
1858
  SCtgTask* pTask = tReq ? tReq->pTask : NULL;
8!
1859
  void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemMalloc : (MallocType)rpcMallocCont;
8!
1860
  int32_t reqType = TDMT_VND_VSUBTABLES_META;
8✔
1861
  SEp* pEp = &vgroupInfo->epSet.eps[vgroupInfo->epSet.inUse];
8✔
1862
  ctgDebug("try to get vsubtables meta from vnode, vgId:%d, ep num:%d, ep %s:%d, suid:%" PRIu64, vgroupInfo->vgId,
8!
1863
           vgroupInfo->epSet.numOfEps, pEp->fqdn, pEp->port, suid);
1864

1865
  char*            msg = NULL;
8✔
1866
  int32_t          msgLen = 0;
8✔
1867

1868
  int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](&suid, &msg, 0, &msgLen, mallocFp);
8✔
1869
  if (code) {
8!
1870
    ctgError("Build vnode vsubtables meta msg failed, code:%x, suid:%" PRIu64, code, suid);
×
1871
    CTG_ERR_RET(code);
×
1872
  }
1873

1874
  SRequestConnInfo vConn = {.pTrans = pConn->pTrans,
8✔
1875
                            .requestId = pConn->requestId,
8✔
1876
                            .requestObjRefId = pConn->requestObjRefId,
8✔
1877
                            .mgmtEps = vgroupInfo->epSet};
1878

1879
  return ctgAddBatch(pCtg, vgroupInfo->vgId, &vConn, tReq, reqType, msg, msgLen);
8✔
1880
}
1881

1882
int32_t ctgGetVStbRefDbsFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn, int64_t suid, SVgroupInfo* vgroupInfo, SCtgTaskReq* tReq) {
9,940✔
1883
  SCtgTask* pTask = tReq ? tReq->pTask : NULL;
9,940!
1884
  void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemMalloc : (MallocType)rpcMallocCont;
9,940!
1885
  int32_t reqType = TDMT_VND_VSTB_REF_DBS;
9,940✔
1886
  SEp* pEp = &vgroupInfo->epSet.eps[vgroupInfo->epSet.inUse];
9,940✔
1887
  ctgDebug("try to get vstb's ref dbs from vnode, vgId:%d, ep num:%d, ep %s:%d, suid:%" PRIu64, vgroupInfo->vgId,
9,940!
1888
           vgroupInfo->epSet.numOfEps, pEp->fqdn, pEp->port, suid);
1889

1890
  char*            msg = NULL;
9,940✔
1891
  int32_t          msgLen = 0;
9,940✔
1892

1893
  int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](&suid, &msg, 0, &msgLen, mallocFp);
9,940✔
1894
  if (code) {
9,940!
1895
    ctgError("Build vnode vsubtables meta msg failed, code:%x, suid:%" PRIu64, code, suid);
×
1896
    CTG_ERR_RET(code);
×
1897
  }
1898

1899
  SRequestConnInfo vConn = {.pTrans = pConn->pTrans,
9,940✔
1900
                            .requestId = pConn->requestId,
9,940✔
1901
                            .requestObjRefId = pConn->requestObjRefId,
9,940✔
1902
                            .mgmtEps = vgroupInfo->epSet};
1903

1904
  return ctgAddBatch(pCtg, vgroupInfo->vgId, &vConn, tReq, reqType, msg, msgLen);
9,940✔
1905
}
1906

1907

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc