• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3627

02 Mar 2025 11:16PM UTC coverage: 63.596% (-0.2%) from 63.764%
#3627

push

travis-ci

GitHub
Merge pull request #29973 from taosdata/doc/internal

148665 of 299855 branches covered (49.58%)

Branch coverage included in aggregate %.

233076 of 300407 relevant lines covered (77.59%)

17543856.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

58.21
/source/dnode/vnode/src/vnd/vnodeQuery.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tsdb.h"
17
#include "vnd.h"
18

19
#define VNODE_GET_LOAD_RESET_VALS(pVar, oVal, vType, tags)                                                    \
20
  do {                                                                                                        \
21
    int##vType##_t newVal = atomic_sub_fetch_##vType(&(pVar), (oVal));                                        \
22
    if (newVal < 0) {                                                                                         \
23
      vWarn("vgId:%d, %s, abnormal val:%" PRIi64 ", old val:%" PRIi64, TD_VID(pVnode), tags, newVal, (oVal)); \
24
    }                                                                                                         \
25
  } while (0)
26

27
int vnodeQueryOpen(SVnode *pVnode) {
11,677✔
28
  return qWorkerInit(NODE_TYPE_VNODE, TD_VID(pVnode), (void **)&pVnode->pQuery, &pVnode->msgCb);
11,677✔
29
}
30

31
void vnodeQueryPreClose(SVnode *pVnode) { qWorkerStopAllTasks((void *)pVnode->pQuery); }
11,667✔
32

33
void vnodeQueryClose(SVnode *pVnode) { qWorkerDestroy((void **)&pVnode->pQuery); }
11,646✔
34

35
int32_t fillTableColCmpr(SMetaReader *reader, SSchemaExt *pExt, int32_t numOfCol) {
1,235,455✔
36
  int8_t tblType = reader->me.type;
1,235,455✔
37
  if (useCompress(tblType)) {
1,235,455!
38
    SColCmprWrapper *p = &(reader->me.colCmpr);
1,235,537✔
39
    if (numOfCol != p->nCols) {
1,235,537!
40
      vError("fillTableColCmpr table type:%d, col num:%d, col cmpr num:%d mismatch", tblType, numOfCol, p->nCols);
×
41
      return TSDB_CODE_APP_ERROR;
×
42
    }
43
    for (int i = 0; i < p->nCols; i++) {
31,334,501✔
44
      SColCmpr *pCmpr = &p->pColCmpr[i];
30,098,964✔
45
      pExt[i].colId = pCmpr->id;
30,098,964✔
46
      pExt[i].compress = pCmpr->alg;
30,098,964✔
47
    }
48
  }
49
  return 0;
1,235,516✔
50
}
51

52
void vnodePrintTableMeta(STableMetaRsp *pMeta) {
1,235,668✔
53
  if (!(qDebugFlag & DEBUG_DEBUG)) {
1,235,668✔
54
    return;
1,231,039✔
55
  }
56

57
  qDebug("tbName:%s", pMeta->tbName);
4,629!
58
  qDebug("stbName:%s", pMeta->stbName);
4,629!
59
  qDebug("dbFName:%s", pMeta->dbFName);
4,629!
60
  qDebug("dbId:%" PRId64, pMeta->dbId);
4,629!
61
  qDebug("numOfTags:%d", pMeta->numOfTags);
4,629!
62
  qDebug("numOfColumns:%d", pMeta->numOfColumns);
4,629!
63
  qDebug("precision:%d", pMeta->precision);
4,629!
64
  qDebug("tableType:%d", pMeta->tableType);
4,629!
65
  qDebug("sversion:%d", pMeta->sversion);
4,629!
66
  qDebug("tversion:%d", pMeta->tversion);
4,629!
67
  qDebug("suid:%" PRIu64, pMeta->suid);
4,629!
68
  qDebug("tuid:%" PRIu64, pMeta->tuid);
4,629!
69
  qDebug("vgId:%d", pMeta->vgId);
4,629!
70
  qDebug("sysInfo:%d", pMeta->sysInfo);
4,629!
71
  if (pMeta->pSchemas) {
4,629!
72
    for (int32_t i = 0; i < (pMeta->numOfColumns + pMeta->numOfTags); ++i) {
201,481✔
73
      SSchema *pSchema = pMeta->pSchemas + i;
196,818✔
74
      qDebug("%d col/tag: type:%d, flags:%d, colId:%d, bytes:%d, name:%s", i, pSchema->type, pSchema->flags,
196,818!
75
             pSchema->colId, pSchema->bytes, pSchema->name);
76
    }
77
  }
78
}
79

80
int32_t vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, bool direct) {
1,244,315✔
81
  STableInfoReq  infoReq = {0};
1,244,315✔
82
  STableMetaRsp  metaRsp = {0};
1,244,315✔
83
  SMetaReader    mer1 = {0};
1,244,315✔
84
  SMetaReader    mer2 = {0};
1,244,315✔
85
  char           tableFName[TSDB_TABLE_FNAME_LEN];
86
  bool           reqTbUid = false;
1,244,315✔
87
  SRpcMsg        rpcMsg = {0};
1,244,315✔
88
  int32_t        code = 0;
1,244,315✔
89
  int32_t        rspLen = 0;
1,244,315✔
90
  void          *pRsp = NULL;
1,244,315✔
91
  SSchemaWrapper schema = {0};
1,244,315✔
92
  SSchemaWrapper schemaTag = {0};
1,244,315✔
93

94
  // decode req
95
  if (tDeserializeSTableInfoReq(pMsg->pCont, pMsg->contLen, &infoReq) != 0) {
1,244,315!
96
    code = terrno;
×
97
    goto _exit4;
×
98
  }
99

100
  if (infoReq.option == REQ_OPT_TBUID) reqTbUid = true;
1,245,637✔
101
  metaRsp.dbId = pVnode->config.dbId;
1,245,637✔
102
  tstrncpy(metaRsp.tbName, infoReq.tbName, TSDB_TABLE_NAME_LEN);
1,245,637✔
103
  (void)memcpy(metaRsp.dbFName, infoReq.dbFName, sizeof(metaRsp.dbFName));
1,245,637✔
104

105
  if (!reqTbUid) {
1,245,637✔
106
    (void)tsnprintf(tableFName, TSDB_TABLE_FNAME_LEN, "%s.%s", infoReq.dbFName, infoReq.tbName);
1,244,100✔
107
    code = vnodeValidateTableHash(pVnode, tableFName);
1,245,076✔
108
    if (code) {
1,244,964!
109
      goto _exit4;
×
110
    }
111
  }
112

113
  // query meta
114
  metaReaderDoInit(&mer1, pVnode->pMeta, META_READER_LOCK);
1,246,501✔
115
  if (reqTbUid) {
1,245,510✔
116
    errno = 0;
534✔
117
    uint64_t tbUid = taosStr2UInt64(infoReq.tbName, NULL, 10);
534✔
118
    if (errno == ERANGE || tbUid == 0) {
533!
119
      code = TSDB_CODE_TDB_TABLE_NOT_EXIST;
×
120
      goto _exit3;
130✔
121
    }
122
    SMetaReader mr3 = {0};
534✔
123
    metaReaderDoInit(&mr3, ((SVnode *)pVnode)->pMeta, META_READER_NOLOCK);
534✔
124
    if ((code = metaReaderGetTableEntryByUid(&mr3, tbUid)) < 0) {
534✔
125
      metaReaderClear(&mr3);
131✔
126
      TAOS_CHECK_GOTO(code, NULL, _exit3);
131!
127
    }
128
    tstrncpy(metaRsp.tbName, mr3.me.name, TSDB_TABLE_NAME_LEN);
401✔
129
    metaReaderClear(&mr3);
401✔
130
    TAOS_CHECK_GOTO(metaGetTableEntryByName(&mer1, metaRsp.tbName), NULL, _exit3);
402!
131
  } else if (metaGetTableEntryByName(&mer1, infoReq.tbName) < 0) {
1,244,976✔
132
    code = terrno;
9,543✔
133
    goto _exit3;
9,693✔
134
  }
135

136
  metaRsp.tableType = mer1.me.type;
1,236,070✔
137
  metaRsp.vgId = TD_VID(pVnode);
1,236,070✔
138
  metaRsp.tuid = mer1.me.uid;
1,236,070✔
139

140
  if (mer1.me.type == TSDB_SUPER_TABLE) {
1,236,070✔
141
    tstrncpy(metaRsp.stbName, mer1.me.name, TSDB_TABLE_NAME_LEN);
650,646✔
142
    schema = mer1.me.stbEntry.schemaRow;
650,646✔
143
    schemaTag = mer1.me.stbEntry.schemaTag;
650,646✔
144
    metaRsp.suid = mer1.me.uid;
650,646✔
145
  } else if (mer1.me.type == TSDB_CHILD_TABLE) {
585,424✔
146
    metaReaderDoInit(&mer2, pVnode->pMeta, META_READER_NOLOCK);
562,408✔
147
    if (metaReaderGetTableEntryByUid(&mer2, mer1.me.ctbEntry.suid) < 0) goto _exit2;
562,405!
148

149
    tstrncpy(metaRsp.stbName, mer2.me.name, TSDB_TABLE_NAME_LEN);
562,410✔
150
    metaRsp.suid = mer2.me.uid;
562,410✔
151
    schema = mer2.me.stbEntry.schemaRow;
562,410✔
152
    schemaTag = mer2.me.stbEntry.schemaTag;
562,410✔
153
  } else if (mer1.me.type == TSDB_NORMAL_TABLE) {
23,016!
154
    schema = mer1.me.ntbEntry.schemaRow;
23,016✔
155
  } else {
156
    vError("vnodeGetTableMeta get invalid table type:%d", mer1.me.type);
×
157
    goto _exit3;
×
158
  }
159

160
  metaRsp.numOfTags = schemaTag.nCols;
1,236,072✔
161
  metaRsp.numOfColumns = schema.nCols;
1,236,072✔
162
  metaRsp.precision = pVnode->config.tsdbCfg.precision;
1,236,072✔
163
  metaRsp.sversion = schema.version;
1,236,072✔
164
  metaRsp.tversion = schemaTag.version;
1,236,072✔
165
  metaRsp.pSchemas = (SSchema *)taosMemoryMalloc(sizeof(SSchema) * (metaRsp.numOfColumns + metaRsp.numOfTags));
1,236,072!
166
  metaRsp.pSchemaExt = (SSchemaExt *)taosMemoryCalloc(metaRsp.numOfColumns, sizeof(SSchemaExt));
1,235,913!
167
  if (NULL == metaRsp.pSchemas || NULL == metaRsp.pSchemaExt) {
1,235,570!
168
    code = terrno;
×
169
    goto _exit;
×
170
  }
171
  (void)memcpy(metaRsp.pSchemas, schema.pSchema, sizeof(SSchema) * schema.nCols);
1,235,624✔
172
  if (schemaTag.nCols) {
1,235,624✔
173
    (void)memcpy(metaRsp.pSchemas + schema.nCols, schemaTag.pSchema, sizeof(SSchema) * schemaTag.nCols);
1,212,270✔
174
  }
175
  if (metaRsp.pSchemaExt) {
1,235,624✔
176
    SMetaReader *pReader = mer1.me.type == TSDB_CHILD_TABLE ? &mer2 : &mer1;
1,235,298✔
177
    code = fillTableColCmpr(pReader, metaRsp.pSchemaExt, metaRsp.numOfColumns);
1,235,298✔
178
    if (code < 0) {
1,235,684!
179
      goto _exit;
×
180
    }
181
  } else {
182
    code = TSDB_CODE_OUT_OF_MEMORY;
326✔
183
    goto _exit;
326✔
184
  }
185

186
  vnodePrintTableMeta(&metaRsp);
1,235,684✔
187

188
  // encode and send response
189
  rspLen = tSerializeSTableMetaRsp(NULL, 0, &metaRsp);
1,235,703✔
190
  if (rspLen < 0) {
1,235,560!
191
    code = terrno;
×
192
    goto _exit;
×
193
  }
194

195
  if (direct) {
1,235,560✔
196
    pRsp = rpcMallocCont(rspLen);
230✔
197
  } else {
198
    pRsp = taosMemoryCalloc(1, rspLen);
1,235,330!
199
  }
200

201
  if (pRsp == NULL) {
1,235,105!
202
    code = terrno;
×
203
    goto _exit;
×
204
  }
205

206
  rspLen = tSerializeSTableMetaRsp(pRsp, rspLen, &metaRsp);
1,235,105✔
207
  if (rspLen < 0) {
1,235,856!
208
    code = terrno;
×
209
    goto _exit;
×
210
  }
211

212
_exit:
1,235,856✔
213
  taosMemoryFree(metaRsp.pSchemas);
1,236,182!
214
  taosMemoryFree(metaRsp.pSchemaExt);
1,235,831!
215
_exit2:
1,235,823✔
216
  metaReaderClear(&mer2);
1,235,823✔
217
_exit3:
1,245,453✔
218
  metaReaderClear(&mer1);
1,245,453✔
219
_exit4:
1,245,635✔
220
  rpcMsg.info = pMsg->info;
1,245,635✔
221
  rpcMsg.pCont = pRsp;
1,245,635✔
222
  rpcMsg.contLen = rspLen;
1,245,635✔
223
  rpcMsg.code = code;
1,245,635✔
224
  rpcMsg.msgType = pMsg->msgType;
1,245,635✔
225

226
  if (code) {
1,245,635✔
227
    qError("get table %s meta with %" PRIu8 " failed cause of %s", infoReq.tbName, infoReq.option, tstrerror(code));
9,825!
228
  }
229

230
  if (direct) {
1,245,362✔
231
    tmsgSendRsp(&rpcMsg);
1,786✔
232
  } else {
233
    *pMsg = rpcMsg;
1,243,576✔
234
  }
235

236
  return code;
1,245,258✔
237
}
238

239
int32_t vnodeGetTableCfg(SVnode *pVnode, SRpcMsg *pMsg, bool direct) {
86✔
240
  STableCfgReq   cfgReq = {0};
86✔
241
  STableCfgRsp   cfgRsp = {0};
86✔
242
  SMetaReader    mer1 = {0};
86✔
243
  SMetaReader    mer2 = {0};
86✔
244
  char           tableFName[TSDB_TABLE_FNAME_LEN];
245
  SRpcMsg        rpcMsg = {0};
86✔
246
  int32_t        code = 0;
86✔
247
  int32_t        rspLen = 0;
86✔
248
  void          *pRsp = NULL;
86✔
249
  SSchemaWrapper schema = {0};
86✔
250
  SSchemaWrapper schemaTag = {0};
86✔
251

252
  // decode req
253
  if (tDeserializeSTableCfgReq(pMsg->pCont, pMsg->contLen, &cfgReq) != 0) {
86!
254
    code = terrno;
×
255
    goto _exit;
×
256
  }
257

258
  tstrncpy(cfgRsp.tbName, cfgReq.tbName, TSDB_TABLE_NAME_LEN);
86✔
259
  (void)memcpy(cfgRsp.dbFName, cfgReq.dbFName, sizeof(cfgRsp.dbFName));
86✔
260

261
  (void)tsnprintf(tableFName, TSDB_TABLE_FNAME_LEN, "%s.%s", cfgReq.dbFName, cfgReq.tbName);
86✔
262
  code = vnodeValidateTableHash(pVnode, tableFName);
86✔
263
  if (code) {
86!
264
    goto _exit;
×
265
  }
266

267
  // query meta
268
  metaReaderDoInit(&mer1, pVnode->pMeta, META_READER_LOCK);
86✔
269

270
  if (metaGetTableEntryByName(&mer1, cfgReq.tbName) < 0) {
86!
271
    code = terrno;
×
272
    goto _exit;
×
273
  }
274

275
  cfgRsp.tableType = mer1.me.type;
86✔
276

277
  if (mer1.me.type == TSDB_SUPER_TABLE) {
86!
278
    code = TSDB_CODE_VND_HASH_MISMATCH;
×
279
    goto _exit;
×
280
  } else if (mer1.me.type == TSDB_CHILD_TABLE) {
86✔
281
    metaReaderDoInit(&mer2, pVnode->pMeta, META_READER_NOLOCK);
69✔
282
    if (metaReaderGetTableEntryByUid(&mer2, mer1.me.ctbEntry.suid) < 0) goto _exit;
69!
283

284
    tstrncpy(cfgRsp.stbName, mer2.me.name, TSDB_TABLE_NAME_LEN);
69✔
285
    schema = mer2.me.stbEntry.schemaRow;
69✔
286
    schemaTag = mer2.me.stbEntry.schemaTag;
69✔
287
    cfgRsp.ttl = mer1.me.ctbEntry.ttlDays;
69✔
288
    cfgRsp.commentLen = mer1.me.ctbEntry.commentLen;
69✔
289
    if (mer1.me.ctbEntry.commentLen > 0) {
69!
290
      cfgRsp.pComment = taosStrdup(mer1.me.ctbEntry.comment);
×
291
      if (NULL == cfgRsp.pComment) {
×
292
        code = terrno;
×
293
        goto _exit;
×
294
      }
295
    }
296
    STag *pTag = (STag *)mer1.me.ctbEntry.pTags;
69✔
297
    cfgRsp.tagsLen = pTag->len;
69✔
298
    cfgRsp.pTags = taosMemoryMalloc(cfgRsp.tagsLen);
69!
299
    if (NULL == cfgRsp.pTags) {
69!
300
      code = terrno;
×
301
      goto _exit;
×
302
    }
303
    (void)memcpy(cfgRsp.pTags, pTag, cfgRsp.tagsLen);
69✔
304
  } else if (mer1.me.type == TSDB_NORMAL_TABLE) {
17!
305
    schema = mer1.me.ntbEntry.schemaRow;
17✔
306
    cfgRsp.ttl = mer1.me.ntbEntry.ttlDays;
17✔
307
    cfgRsp.commentLen = mer1.me.ntbEntry.commentLen;
17✔
308
    if (mer1.me.ntbEntry.commentLen > 0) {
17!
309
      cfgRsp.pComment = taosStrdup(mer1.me.ntbEntry.comment);
×
310
      if (NULL == cfgRsp.pComment) {
×
311
        code = terrno;
×
312
        goto _exit;
×
313
      }
314
    }
315
  } else {
316
    vError("vnodeGetTableCfg get invalid table type:%d", mer1.me.type);
×
317
    code = TSDB_CODE_APP_ERROR;
×
318
    goto _exit;
×
319
  }
320

321
  cfgRsp.numOfTags = schemaTag.nCols;
86✔
322
  cfgRsp.numOfColumns = schema.nCols;
86✔
323
  cfgRsp.pSchemas = (SSchema *)taosMemoryMalloc(sizeof(SSchema) * (cfgRsp.numOfColumns + cfgRsp.numOfTags));
86!
324
  cfgRsp.pSchemaExt = (SSchemaExt *)taosMemoryMalloc(cfgRsp.numOfColumns * sizeof(SSchemaExt));
86!
325

326
  if (NULL == cfgRsp.pSchemas || NULL == cfgRsp.pSchemaExt) {
86!
327
    code = terrno;
×
328
    goto _exit;
×
329
  }
330
  (void)memcpy(cfgRsp.pSchemas, schema.pSchema, sizeof(SSchema) * schema.nCols);
86✔
331
  if (schemaTag.nCols) {
86✔
332
    (void)memcpy(cfgRsp.pSchemas + schema.nCols, schemaTag.pSchema, sizeof(SSchema) * schemaTag.nCols);
69✔
333
  }
334

335
  // if (useCompress(cfgRsp.tableType)) {
336

337
  SMetaReader     *pReader = mer1.me.type == TSDB_CHILD_TABLE ? &mer2 : &mer1;
86✔
338
  SColCmprWrapper *pColCmpr = &pReader->me.colCmpr;
86✔
339

340
  for (int32_t i = 0; i < cfgRsp.numOfColumns; i++) {
743✔
341
    SColCmpr   *pCmpr = &pColCmpr->pColCmpr[i];
657✔
342
    SSchemaExt *pSchExt = cfgRsp.pSchemaExt + i;
657✔
343
    pSchExt->colId = pCmpr->id;
657✔
344
    pSchExt->compress = pCmpr->alg;
657✔
345
  }
346
  //}
347

348
  // encode and send response
349
  rspLen = tSerializeSTableCfgRsp(NULL, 0, &cfgRsp);
86✔
350
  if (rspLen < 0) {
86!
351
    code = terrno;
×
352
    goto _exit;
×
353
  }
354

355
  if (direct) {
86!
356
    pRsp = rpcMallocCont(rspLen);
×
357
  } else {
358
    pRsp = taosMemoryCalloc(1, rspLen);
86!
359
  }
360

361
  if (pRsp == NULL) {
86!
362
    code = terrno;
×
363
    goto _exit;
×
364
  }
365

366
  rspLen = tSerializeSTableCfgRsp(pRsp, rspLen, &cfgRsp);
86✔
367
  if (rspLen < 0) {
86!
368
    code = terrno;
×
369
    goto _exit;
×
370
  }
371

372
_exit:
86✔
373
  rpcMsg.info = pMsg->info;
86✔
374
  rpcMsg.pCont = pRsp;
86✔
375
  rpcMsg.contLen = rspLen;
86✔
376
  rpcMsg.code = code;
86✔
377
  rpcMsg.msgType = pMsg->msgType;
86✔
378

379
  if (code) {
86!
380
    qError("get table %s cfg failed cause of %s", cfgReq.tbName, tstrerror(code));
×
381
  }
382

383
  if (direct) {
86!
384
    tmsgSendRsp(&rpcMsg);
×
385
  } else {
386
    *pMsg = rpcMsg;
86✔
387
  }
388

389
  tFreeSTableCfgRsp(&cfgRsp);
86✔
390
  metaReaderClear(&mer2);
86✔
391
  metaReaderClear(&mer1);
86✔
392
  return code;
86✔
393
}
394

395
static FORCE_INLINE void vnodeFreeSBatchRspMsg(void *p) {
396
  if (NULL == p) {
397
    return;
398
  }
399

400
  SBatchRspMsg *pRsp = (SBatchRspMsg *)p;
401
  rpcFreeCont(pRsp->msg);
402
}
403

404
int32_t vnodeGetBatchMeta(SVnode *pVnode, SRpcMsg *pMsg) {
1,075,518✔
405
  int32_t      code = 0;
1,075,518✔
406
  int32_t      rspSize = 0;
1,075,518✔
407
  SBatchReq    batchReq = {0};
1,075,518✔
408
  SBatchMsg   *req = NULL;
1,075,518✔
409
  SBatchRspMsg rsp = {0};
1,075,518✔
410
  SBatchRsp    batchRsp = {0};
1,075,518✔
411
  SRpcMsg      reqMsg = *pMsg;
1,075,518✔
412
  SRpcMsg      rspMsg = {0};
1,075,518✔
413
  void        *pRsp = NULL;
1,075,518✔
414

415
  if (tDeserializeSBatchReq(pMsg->pCont, pMsg->contLen, &batchReq)) {
1,075,518!
416
    code = terrno;
×
417
    qError("tDeserializeSBatchReq failed");
×
418
    goto _exit;
×
419
  }
420

421
  int32_t msgNum = taosArrayGetSize(batchReq.pMsgs);
1,076,092✔
422
  if (msgNum >= MAX_META_MSG_IN_BATCH) {
1,075,454!
423
    code = TSDB_CODE_INVALID_MSG;
×
424
    qError("too many msgs %d in vnode batch meta req", msgNum);
×
425
    goto _exit;
×
426
  }
427

428
  batchRsp.pRsps = taosArrayInit(msgNum, sizeof(SBatchRspMsg));
1,075,454✔
429
  if (NULL == batchRsp.pRsps) {
1,076,095✔
430
    code = terrno;
2✔
431
    qError("taosArrayInit %d SBatchRspMsg failed", msgNum);
×
432
    goto _exit;
×
433
  }
434

435
  for (int32_t i = 0; i < msgNum; ++i) {
2,324,895✔
436
    req = taosArrayGet(batchReq.pMsgs, i);
1,249,388✔
437
    if (req == NULL) {
1,248,800✔
438
      code = terrno;
84✔
439
      goto _exit;
×
440
    }
441

442
    reqMsg.msgType = req->msgType;
1,248,716✔
443
    reqMsg.pCont = req->msg;
1,248,716✔
444
    reqMsg.contLen = req->msgLen;
1,248,716✔
445

446
    switch (req->msgType) {
1,248,716!
447
      case TDMT_VND_TABLE_META:
1,242,205✔
448
        // error code has been set into reqMsg, no need to handle it here.
449
        if (TSDB_CODE_SUCCESS != vnodeGetTableMeta(pVnode, &reqMsg, false)) {
1,242,205✔
450
          qWarn("vnodeGetBatchMeta failed, msgType:%d", req->msgType);
8,137!
451
        }
452
        break;
1,242,837✔
453
      case TDMT_VND_TABLE_NAME:
534✔
454
        // error code has been set into reqMsg, no need to handle it here.
455
        if (TSDB_CODE_SUCCESS != vnodeGetTableMeta(pVnode, &reqMsg, false)) {
534✔
456
          qWarn("vnodeGetBatchName failed, msgType:%d", req->msgType);
132!
457
        }
458
        break;
533✔
459
      case TDMT_VND_TABLE_CFG:
86✔
460
        // error code has been set into reqMsg, no need to handle it here.
461
        if (TSDB_CODE_SUCCESS != vnodeGetTableCfg(pVnode, &reqMsg, false)) {
86!
462
          qWarn("vnodeGetBatchMeta failed, msgType:%d", req->msgType);
×
463
        }
464
        break;
86✔
465
      case TDMT_VND_GET_STREAM_PROGRESS:
5,891✔
466
        // error code has been set into reqMsg, no need to handle it here.
467
        if (TSDB_CODE_SUCCESS != vnodeGetStreamProgress(pVnode, &reqMsg, false)) {
5,891!
468
          qWarn("vnodeGetBatchMeta failed, msgType:%d", req->msgType);
×
469
        }
470
        break;
5,892✔
471
      default:
×
472
        qError("invalid req msgType %d", req->msgType);
×
473
        reqMsg.code = TSDB_CODE_INVALID_MSG;
×
474
        reqMsg.pCont = NULL;
×
475
        reqMsg.contLen = 0;
×
476
        break;
×
477
    }
478

479
    rsp.msgIdx = req->msgIdx;
1,249,348✔
480
    rsp.reqType = reqMsg.msgType;
1,249,348✔
481
    rsp.msgLen = reqMsg.contLen;
1,249,348✔
482
    rsp.rspCode = reqMsg.code;
1,249,348✔
483
    rsp.msg = reqMsg.pCont;
1,249,348✔
484

485
    if (NULL == taosArrayPush(batchRsp.pRsps, &rsp)) {
2,498,150!
486
      qError("taosArrayPush failed");
×
487
      code = terrno;
×
488
      goto _exit;
×
489
    }
490
  }
491

492
  rspSize = tSerializeSBatchRsp(NULL, 0, &batchRsp);
1,075,507✔
493
  if (rspSize < 0) {
1,075,771!
494
    qError("tSerializeSBatchRsp failed");
×
495
    code = terrno;
×
496
    goto _exit;
×
497
  }
498
  pRsp = rpcMallocCont(rspSize);
1,075,771✔
499
  if (pRsp == NULL) {
1,075,692!
500
    qError("rpcMallocCont %d failed", rspSize);
×
501
    code = terrno;
×
502
    goto _exit;
×
503
  }
504
  if (tSerializeSBatchRsp(pRsp, rspSize, &batchRsp) < 0) {
1,075,692!
505
    qError("tSerializeSBatchRsp %d failed", rspSize);
×
506
    code = terrno;
×
507
    goto _exit;
×
508
  }
509

510
_exit:
1,076,308✔
511

512
  rspMsg.info = pMsg->info;
1,076,308✔
513
  rspMsg.pCont = pRsp;
1,076,308✔
514
  rspMsg.contLen = rspSize;
1,076,308✔
515
  rspMsg.code = code;
1,076,308✔
516
  rspMsg.msgType = pMsg->msgType;
1,076,308✔
517

518
  if (code) {
1,076,308!
519
    qError("vnd get batch meta failed cause of %s", tstrerror(code));
×
520
  }
521

522
  taosArrayDestroyEx(batchReq.pMsgs, tFreeSBatchReqMsg);
1,076,308✔
523
  taosArrayDestroyEx(batchRsp.pRsps, tFreeSBatchRspMsg);
1,075,870✔
524

525
  tmsgSendRsp(&rspMsg);
1,076,095✔
526

527
  return code;
1,075,899✔
528
}
529

530
#define VNODE_DO_META_QUERY(pVnode, cmd)                 \
531
  do {                                                   \
532
    (void)taosThreadRwlockRdlock(&(pVnode)->metaRWLock); \
533
    cmd;                                                 \
534
    (void)taosThreadRwlockUnlock(&(pVnode)->metaRWLock); \
535
  } while (0)
536

537
int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) {
1,047,811✔
538
  SSyncState state = syncGetState(pVnode->sync);
1,047,811✔
539
  pLoad->syncAppliedIndex = pVnode->state.applied;
1,047,811✔
540
  syncGetCommitIndex(pVnode->sync, &pLoad->syncCommitIndex);
1,047,811✔
541

542
  pLoad->vgId = TD_VID(pVnode);
1,047,811✔
543
  pLoad->syncState = state.state;
1,047,811✔
544
  pLoad->syncRestore = state.restored;
1,047,811✔
545
  pLoad->syncTerm = state.term;
1,047,811✔
546
  pLoad->roleTimeMs = state.roleTimeMs;
1,047,811✔
547
  pLoad->startTimeMs = state.startTimeMs;
1,047,811✔
548
  pLoad->syncCanRead = state.canRead;
1,047,811✔
549
  pLoad->learnerProgress = state.progress;
1,047,811✔
550
  pLoad->cacheUsage = tsdbCacheGetUsage(pVnode);
1,047,811✔
551
  pLoad->numOfCachedTables = tsdbCacheGetElems(pVnode);
1,047,811✔
552
  VNODE_DO_META_QUERY(pVnode, pLoad->numOfTables = metaGetTbNum(pVnode->pMeta));
1,047,811✔
553
  VNODE_DO_META_QUERY(pVnode, pLoad->numOfTimeSeries = metaGetTimeSeriesNum(pVnode->pMeta, 1));
1,047,811✔
554
  pLoad->totalStorage = (int64_t)3 * 1073741824;
1,047,811✔
555
  pLoad->compStorage = (int64_t)2 * 1073741824;
1,047,811✔
556
  pLoad->pointsWritten = 100;
1,047,811✔
557
  pLoad->numOfSelectReqs = 1;
1,047,811✔
558
  pLoad->numOfInsertReqs = atomic_load_64(&pVnode->statis.nInsert);
1,047,811✔
559
  pLoad->numOfInsertSuccessReqs = atomic_load_64(&pVnode->statis.nInsertSuccess);
1,047,811✔
560
  pLoad->numOfBatchInsertReqs = atomic_load_64(&pVnode->statis.nBatchInsert);
1,047,811✔
561
  pLoad->numOfBatchInsertSuccessReqs = atomic_load_64(&pVnode->statis.nBatchInsertSuccess);
1,047,811✔
562
  return 0;
1,047,811✔
563
}
564

565
int32_t vnodeGetLoadLite(SVnode *pVnode, SVnodeLoadLite *pLoad) {
×
566
  SSyncState syncState = syncGetState(pVnode->sync);
×
567
  if (syncState.state == TAOS_SYNC_STATE_LEADER || syncState.state == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
×
568
    pLoad->vgId = TD_VID(pVnode);
×
569
    pLoad->nTimeSeries = metaGetTimeSeriesNum(pVnode->pMeta, 1);
×
570
    return 0;
×
571
  }
572
  return -1;
×
573
}
574
/**
575
 * @brief Reset the statistics value by monitor interval
576
 *
577
 * @param pVnode
578
 * @param pLoad
579
 */
580
void vnodeResetLoad(SVnode *pVnode, SVnodeLoad *pLoad) {
32✔
581
  VNODE_GET_LOAD_RESET_VALS(pVnode->statis.nInsert, pLoad->numOfInsertReqs, 64, "nInsert");
32!
582
  VNODE_GET_LOAD_RESET_VALS(pVnode->statis.nInsertSuccess, pLoad->numOfInsertSuccessReqs, 64, "nInsertSuccess");
32!
583
  VNODE_GET_LOAD_RESET_VALS(pVnode->statis.nBatchInsert, pLoad->numOfBatchInsertReqs, 64, "nBatchInsert");
32!
584
  VNODE_GET_LOAD_RESET_VALS(pVnode->statis.nBatchInsertSuccess, pLoad->numOfBatchInsertSuccessReqs, 64,
32!
585
                            "nBatchInsertSuccess");
586
}
32✔
587

588
void vnodeGetInfo(void *pVnode, const char **dbname, int32_t *vgId, int64_t *numOfTables, int64_t *numOfNormalTables) {
953,306✔
589
  SVnode    *pVnodeObj = pVnode;
953,306✔
590
  SVnodeCfg *pConf = &pVnodeObj->config;
953,306✔
591

592
  if (dbname) {
953,306!
593
    *dbname = pConf->dbname;
954,106✔
594
  }
595

596
  if (vgId) {
953,306✔
597
    *vgId = TD_VID(pVnodeObj);
953,145✔
598
  }
599

600
  if (numOfTables) {
953,306!
601
    *numOfTables = pConf->vndStats.numOfNTables + pConf->vndStats.numOfCTables;
×
602
  }
603

604
  if (numOfNormalTables) {
953,306!
605
    *numOfNormalTables = pConf->vndStats.numOfNTables;
×
606
  }
607
}
953,306✔
608

609
int32_t vnodeGetTableList(void *pVnode, int8_t type, SArray *pList) {
×
610
  if (type == TSDB_SUPER_TABLE) {
×
611
    return vnodeGetStbIdList(pVnode, 0, pList);
×
612
  } else {
613
    return TSDB_CODE_INVALID_PARA;
×
614
  }
615
}
616

617
int32_t vnodeGetAllTableList(SVnode *pVnode, uint64_t uid, SArray *list) {
×
618
  int32_t      code = TSDB_CODE_SUCCESS;
×
619
  SMCtbCursor *pCur = metaOpenCtbCursor(pVnode, uid, 1);
×
620
  if (NULL == pCur) {
×
621
    qError("vnode get all table list failed");
×
622
    return terrno;
×
623
  }
624

625
  while (1) {
×
626
    tb_uid_t id = metaCtbCursorNext(pCur);
×
627
    if (id == 0) {
×
628
      break;
×
629
    }
630

631
    STableKeyInfo info = {uid = id};
×
632
    if (NULL == taosArrayPush(list, &info)) {
×
633
      qError("taosArrayPush failed");
×
634
      code = terrno;
×
635
      goto _exit;
×
636
    }
637
  }
638
_exit:
×
639
  metaCloseCtbCursor(pCur);
×
640
  return code;
×
641
}
642

643
int32_t vnodeGetCtbIdListByFilter(SVnode *pVnode, int64_t suid, SArray *list, bool (*filter)(void *arg), void *arg) {
×
644
  return 0;
×
645
}
646

647
int32_t vnodeGetCtbIdList(void *pVnode, int64_t suid, SArray *list) {
2,726,054✔
648
  int32_t      code = TSDB_CODE_SUCCESS;
2,726,054✔
649
  SVnode      *pVnodeObj = pVnode;
2,726,054✔
650
  SMCtbCursor *pCur = metaOpenCtbCursor(pVnodeObj, suid, 1);
2,726,054✔
651
  if (NULL == pCur) {
2,730,439!
652
    qError("vnode get all table list failed");
×
653
    return terrno;
×
654
  }
655

656
  while (1) {
8,899,680✔
657
    tb_uid_t id = metaCtbCursorNext(pCur);
11,630,119✔
658
    if (id == 0) {
11,628,342✔
659
      break;
2,734,343✔
660
    }
661

662
    if (NULL == taosArrayPush(list, &id)) {
8,899,680!
663
      qError("taosArrayPush failed");
×
664
      code = terrno;
×
665
      goto _exit;
×
666
    }
667
  }
668

669
_exit:
2,734,343✔
670
  metaCloseCtbCursor(pCur);
2,734,343✔
671
  return code;
2,734,600✔
672
}
673

674
int32_t vnodeGetStbIdList(SVnode *pVnode, int64_t suid, SArray *list) {
152,320✔
675
  int32_t      code = TSDB_CODE_SUCCESS;
152,320✔
676
  SMStbCursor *pCur = metaOpenStbCursor(pVnode->pMeta, suid);
152,320✔
677
  if (!pCur) {
152,321!
678
    return TSDB_CODE_OUT_OF_MEMORY;
×
679
  }
680

681
  while (1) {
120,917✔
682
    tb_uid_t id = metaStbCursorNext(pCur);
273,238✔
683
    if (id == 0) {
273,245✔
684
      break;
152,321✔
685
    }
686

687
    if (NULL == taosArrayPush(list, &id)) {
120,917!
688
      qError("taosArrayPush failed");
×
689
      code = terrno;
×
690
      goto _exit;
×
691
    }
692
  }
693

694
_exit:
152,321✔
695
  metaCloseStbCursor(pCur);
152,321✔
696
  return code;
152,323✔
697
}
698

699
int32_t vnodeGetStbIdListByFilter(SVnode *pVnode, int64_t suid, SArray *list, bool (*filter)(void *arg, void *arg1),
×
700
                                  void *arg) {
701
  int32_t      code = TSDB_CODE_SUCCESS;
×
702
  SMStbCursor *pCur = metaOpenStbCursor(pVnode->pMeta, suid);
×
703
  if (!pCur) {
×
704
    return terrno;
×
705
  }
706

707
  while (1) {
×
708
    tb_uid_t id = metaStbCursorNext(pCur);
×
709
    if (id == 0) {
×
710
      break;
×
711
    }
712

713
    if ((*filter) && (*filter)(arg, &id)) {
×
714
      continue;
×
715
    }
716

717
    if (NULL == taosArrayPush(list, &id)) {
×
718
      qError("taosArrayPush failed");
×
719
      code = terrno;
×
720
      goto _exit;
×
721
    }
722
  }
723

724
_exit:
×
725
  metaCloseStbCursor(pCur);
×
726
  return code;
×
727
}
728

729
int32_t vnodeGetCtbNum(SVnode *pVnode, int64_t suid, int64_t *num) {
21,421✔
730
  SMCtbCursor *pCur = metaOpenCtbCursor(pVnode, suid, 0);
21,421✔
731
  if (!pCur) {
21,421!
732
    return terrno;
×
733
  }
734

735
  *num = 0;
21,421✔
736
  while (1) {
12,758✔
737
    tb_uid_t id = metaCtbCursorNext(pCur);
34,179✔
738
    if (id == 0) {
34,181✔
739
      break;
21,423✔
740
    }
741

742
    ++(*num);
12,758✔
743
  }
744

745
  metaCloseCtbCursor(pCur);
21,423✔
746
  return TSDB_CODE_SUCCESS;
21,423✔
747
}
748

749
int32_t vnodeGetStbColumnNum(SVnode *pVnode, tb_uid_t suid, int *num) {
21,422✔
750
  SSchemaWrapper *pSW = metaGetTableSchema(pVnode->pMeta, suid, -1, 0);
21,422✔
751
  if (pSW) {
21,422!
752
    *num = pSW->nCols;
21,422!
753
    tDeleteSchemaWrapper(pSW);
754
  } else {
755
    *num = 2;
×
756
  }
757

758
  return TSDB_CODE_SUCCESS;
21,424✔
759
}
760

761
#ifdef TD_ENTERPRISE
762
const char *tkLogStb[] = {"cluster_info",
763
                          "data_dir",
764
                          "dnodes_info",
765
                          "d_info",
766
                          "grants_info",
767
                          "keeper_monitor",
768
                          "logs",
769
                          "log_dir",
770
                          "log_summary",
771
                          "m_info",
772
                          "taosadapter_restful_http_request_fail",
773
                          "taosadapter_restful_http_request_in_flight",
774
                          "taosadapter_restful_http_request_summary_milliseconds",
775
                          "taosadapter_restful_http_request_total",
776
                          "taosadapter_system_cpu_percent",
777
                          "taosadapter_system_mem_percent",
778
                          "temp_dir",
779
                          "vgroups_info",
780
                          "vnodes_role"};
781
const char *tkAuditStb[] = {"operations"};
782
const int   tkLogStbNum = ARRAY_SIZE(tkLogStb);
783
const int   tkAuditStbNum = ARRAY_SIZE(tkAuditStb);
784

785
// exclude stbs of taoskeeper log
786
static int32_t vnodeGetTimeSeriesBlackList(SVnode *pVnode, int32_t *tbSize) {
152,304✔
787
  int32_t      code = TSDB_CODE_SUCCESS;
152,304✔
788
  int32_t      tbNum = 0;
152,304✔
789
  const char **pTbArr = NULL;
152,304✔
790
  const char  *dbName = NULL;
152,304✔
791
  *tbSize = 0;
152,304✔
792

793
  if (!(dbName = strchr(pVnode->config.dbname, '.'))) return 0;
152,304!
794
  if (0 == strncmp(++dbName, "log", TSDB_DB_NAME_LEN)) {
152,304!
795
    tbNum = tkLogStbNum;
×
796
    pTbArr = (const char **)&tkLogStb;
×
797
  } else if (0 == strncmp(dbName, "audit", TSDB_DB_NAME_LEN)) {
152,304!
798
    tbNum = tkAuditStbNum;
×
799
    pTbArr = (const char **)&tkAuditStb;
×
800
  }
801
  if (tbNum && pTbArr) {
152,304!
802
    *tbSize = metaSizeOfTbFilterCache(pVnode->pMeta, 0);
×
803
    if (*tbSize < tbNum) {
×
804
      for (int32_t i = 0; i < tbNum; ++i) {
×
805
        tb_uid_t suid = metaGetTableEntryUidByName(pVnode->pMeta, pTbArr[i]);
×
806
        if (suid != 0) {
×
807
          code = metaPutTbToFilterCache(pVnode->pMeta, &suid, 0);
×
808
          if (TSDB_CODE_SUCCESS != code) {
×
809
            return code;
×
810
          }
811
        }
812
      }
813
      *tbSize = metaSizeOfTbFilterCache(pVnode->pMeta, 0);
×
814
    }
815
  }
816

817
  return code;
152,304✔
818
}
819
#endif
820

821
static bool vnodeTimeSeriesFilter(void *arg1, void *arg2) {
×
822
  SVnode *pVnode = (SVnode *)arg1;
×
823

824
  if (metaTbInFilterCache(pVnode->pMeta, arg2, 0)) {
×
825
    return true;
×
826
  }
827
  return false;
×
828
}
829

830
int32_t vnodeGetTimeSeriesNum(SVnode *pVnode, int64_t *num) {
152,303✔
831
  SArray *suidList = NULL;
152,303✔
832

833
  if (!(suidList = taosArrayInit(1, sizeof(tb_uid_t)))) {
152,303!
834
    return terrno;
×
835
  }
836

837
  int32_t tbFilterSize = 0;
152,304✔
838
  int32_t code = TSDB_CODE_SUCCESS;
152,304✔
839
#ifdef TD_ENTERPRISE
840
  code = vnodeGetTimeSeriesBlackList(pVnode, &tbFilterSize);
152,304✔
841
  if (TSDB_CODE_SUCCESS != code) {
152,303!
842
    goto _exit;
×
843
  }
844
#endif
845

846
  if ((!tbFilterSize && vnodeGetStbIdList(pVnode, 0, suidList) < 0) ||
152,303!
847
      (tbFilterSize && vnodeGetStbIdListByFilter(pVnode, 0, suidList, vnodeTimeSeriesFilter, pVnode) < 0)) {
152,304!
848
    qError("vgId:%d, failed to get stb id list error: %s", TD_VID(pVnode), terrstr());
×
849
    taosArrayDestroy(suidList);
×
850
    return terrno;
×
851
  }
852

853
  *num = 0;
152,305✔
854
  int64_t arrSize = taosArrayGetSize(suidList);
152,305✔
855
  for (int64_t i = 0; i < arrSize; ++i) {
273,235✔
856
    tb_uid_t suid = *(tb_uid_t *)taosArrayGet(suidList, i);
120,928✔
857

858
    int64_t ctbNum = 0;
120,928✔
859
    int32_t numOfCols = 0;
120,928✔
860
    code = metaGetStbStats(pVnode, suid, &ctbNum, &numOfCols);
120,928✔
861
    if (TSDB_CODE_SUCCESS != code) {
120,931!
862
      goto _exit;
×
863
    }
864
    *num += ctbNum * (numOfCols - 1);
120,931✔
865
  }
866

867
_exit:
152,307✔
868
  taosArrayDestroy(suidList);
152,307✔
869
  return TSDB_CODE_SUCCESS;
152,306✔
870
}
871

872
int32_t vnodeGetAllCtbNum(SVnode *pVnode, int64_t *num) {
×
873
  SMStbCursor *pCur = metaOpenStbCursor(pVnode->pMeta, 0);
×
874
  if (!pCur) {
×
875
    return terrno;
×
876
  }
877

878
  *num = 0;
×
879
  while (1) {
×
880
    tb_uid_t id = metaStbCursorNext(pCur);
×
881
    if (id == 0) {
×
882
      break;
×
883
    }
884

885
    int64_t ctbNum = 0;
×
886
    int32_t code = vnodeGetCtbNum(pVnode, id, &ctbNum);
×
887
    if (TSDB_CODE_SUCCESS != code) {
×
888
      metaCloseStbCursor(pCur);
×
889
      return code;
×
890
    }
891

892
    *num += ctbNum;
×
893
  }
894

895
  metaCloseStbCursor(pCur);
×
896
  return TSDB_CODE_SUCCESS;
×
897
}
898

899
void *vnodeGetIdx(void *pVnode) {
61,576✔
900
  if (pVnode == NULL) {
61,576!
901
    return NULL;
×
902
  }
903

904
  return metaGetIdx(((SVnode *)pVnode)->pMeta);
61,576✔
905
}
906

907
void *vnodeGetIvtIdx(void *pVnode) {
61,504✔
908
  if (pVnode == NULL) {
61,504!
909
    return NULL;
×
910
  }
911
  return metaGetIvtIdx(((SVnode *)pVnode)->pMeta);
61,504✔
912
}
913

914
int32_t vnodeGetTableSchema(void *pVnode, int64_t uid, STSchema **pSchema, int64_t *suid) {
123✔
915
  return tsdbGetTableSchema(((SVnode *)pVnode)->pMeta, uid, pSchema, suid);
123✔
916
}
917

918
int32_t vnodeGetDBSize(void *pVnode, SDbSizeStatisInfo *pInfo) {
22✔
919
  SVnode *pVnodeObj = pVnode;
22✔
920
  if (pVnodeObj == NULL) {
22!
921
    return TSDB_CODE_VND_NOT_EXIST;
×
922
  }
923
  int32_t code = 0;
22✔
924
  char    path[TSDB_FILENAME_LEN] = {0};
22✔
925

926
  char   *dirName[] = {VNODE_TSDB_DIR, VNODE_WAL_DIR, VNODE_META_DIR, VNODE_TSDB_CACHE_DIR};
22✔
927
  int64_t dirSize[4];
928

929
  vnodeGetPrimaryDir(pVnodeObj->path, pVnodeObj->diskPrimary, pVnodeObj->pTfs, path, TSDB_FILENAME_LEN);
22✔
930
  int32_t offset = strlen(path);
22✔
931

932
  for (int i = 0; i < sizeof(dirName) / sizeof(dirName[0]); i++) {
110✔
933
    int64_t size = {0};
88✔
934
    (void)snprintf(path + offset, TSDB_FILENAME_LEN, "%s%s", TD_DIRSEP, dirName[i]);
88✔
935
    code = taosGetDirSize(path, &size);
88✔
936
    if (code != 0) {
88!
937
      return code;
×
938
    }
939
    path[offset] = 0;
88✔
940
    dirSize[i] = size;
88✔
941
  }
942

943
  pInfo->l1Size = dirSize[0] - dirSize[3];
22✔
944
  pInfo->walSize = dirSize[1];
22✔
945
  pInfo->metaSize = dirSize[2];
22✔
946
  pInfo->cacheSize = dirSize[3];
22✔
947

948
  code = tsdbGetS3Size(pVnodeObj->pTsdb, &pInfo->s3Size);
22✔
949

950
  return code;
22✔
951
}
952

953
int32_t vnodeGetStreamProgress(SVnode *pVnode, SRpcMsg *pMsg, bool direct) {
5,891✔
954
  int32_t            code = 0;
5,891✔
955
  SStreamProgressReq req;
956
  SStreamProgressRsp rsp = {0};
5,891✔
957
  SRpcMsg            rpcMsg = {.info = pMsg->info, .code = 0};
5,891✔
958
  char              *buf = NULL;
5,891✔
959
  int32_t            rspLen = 0;
5,891✔
960
  code = tDeserializeStreamProgressReq(pMsg->pCont, pMsg->contLen, &req);
5,891✔
961

962
  if (code == TSDB_CODE_SUCCESS) {
5,892!
963
    rsp.fetchIdx = req.fetchIdx;
5,892✔
964
    rsp.subFetchIdx = req.subFetchIdx;
5,892✔
965
    rsp.vgId = req.vgId;
5,892✔
966
    rsp.streamId = req.streamId;
5,892✔
967
    rspLen = tSerializeStreamProgressRsp(0, 0, &rsp);
5,892✔
968
    if (rspLen < 0) {
5,891!
969
      code = terrno;
×
970
      goto _OVER;
×
971
    }
972
    if (direct) {
5,891!
973
      buf = rpcMallocCont(rspLen);
×
974
    } else {
975
      buf = taosMemoryCalloc(1, rspLen);
5,891!
976
    }
977
    if (!buf) {
5,892!
978
      code = terrno;
×
979
      goto _OVER;
×
980
    }
981
  }
982

983
  if (code == TSDB_CODE_SUCCESS) {
5,892!
984
    code = tqGetStreamExecInfo(pVnode, req.streamId, &rsp.progressDelay, &rsp.fillHisFinished);
5,892✔
985
  }
986
  if (code == TSDB_CODE_SUCCESS) {
5,892!
987
    rspLen = tSerializeStreamProgressRsp(buf, rspLen, &rsp);
5,892✔
988
    if (rspLen < 0) {
5,892!
989
      code = terrno;
×
990
      goto _OVER;
×
991
    }
992
    rpcMsg.pCont = buf;
5,892✔
993
    buf = NULL;
5,892✔
994
    rpcMsg.contLen = rspLen;
5,892✔
995
    rpcMsg.code = code;
5,892✔
996
    rpcMsg.msgType = pMsg->msgType;
5,892✔
997
    if (direct) {
5,892!
998
      tmsgSendRsp(&rpcMsg);
×
999
    } else {
1000
      *pMsg = rpcMsg;
5,892✔
1001
    }
1002
  }
1003

1004
_OVER:
×
1005
  if (buf) {
5,892!
1006
    taosMemoryFree(buf);
×
1007
  }
1008
  return code;
5,892✔
1009
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc