• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3660

15 Mar 2025 09:06AM UTC coverage: 62.039% (-1.3%) from 63.314%
#3660

push

travis-ci

web-flow
feat(stream): support stream processing for virtual tables (#30144)

* enh: add client processing

* enh: add mnode vtables processing

* enh: add mnode vtable processing

* enh: add normal child vtable support

* fix: compile issues

* fix: compile issues

* fix: create stream issues

* fix: multi stream scan issue

* fix: remove debug info

* fix: agg task and task level issues

* fix: correct task output type

* fix: split vtablescan from agg

* fix: memory leak issues

* fix: add limitations

* Update 09-error-code.md

* Update 09-error-code.md

* fix: remove useless case

* feat(stream): extract original table data in source scan task

Implemented functionality in the source task to extract data
corresponding to the virtual table from the original table using WAL.
The extracted data is then sent to the downstream merge task for further
processing.

* feat(stream): multi-way merge using loser tree in virtual merge task

Implemented multi-way merge in the merge task using a loser tree to
combine data from multiple original tables into a single virtual table.
The merged virtual table data is then pushed downstream for further
processing.  Introduced memory limit handling during the merge process
with configurable behavior when the memory limit is reached.

* fix(test): remove useless cases

---------

Co-authored-by: dapan1121 <wpan@taosdata.com>
Co-authored-by: Pan Wei <72057773+dapan1121@users.noreply.github.com>

154078 of 317582 branches covered (48.52%)

Branch coverage included in aggregate %.

313 of 2391 new or added lines in 34 files covered. (13.09%)

26134 existing lines in 205 files now uncovered.

240261 of 318051 relevant lines covered (75.54%)

16655189.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

59.77
/source/dnode/vnode/src/vnd/vnodeSvr.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "audit.h"
17
#include "cos.h"
18
#include "monitor.h"
19
#include "tencode.h"
20
#include "tglobal.h"
21
#include "tmsg.h"
22
#include "tstrbuild.h"
23
#include "vnd.h"
24
#include "vnode.h"
25
#include "vnodeInt.h"
26

27
extern taos_counter_t *tsInsertCounter;
28

29
static int32_t vnodeProcessCreateStbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
30
static int32_t vnodeProcessAlterStbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
31
static int32_t vnodeProcessDropStbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
32
static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp,
33
                                       SRpcMsg *pOriginRpc);
34
static int32_t vnodeProcessAlterTbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
35
static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp,
36
                                     SRpcMsg *pOriginRpc);
37
static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp,
38
                                     SRpcMsg *pOriginalMsg);
39
static int32_t vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
40
static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
41
static int32_t vnodeProcessAlterConfigReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
42
static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
43
static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
44
static int32_t vnodeProcessS3MigrateReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
45
static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp,
46
                                     SRpcMsg *pOriginalMsg);
47
static int32_t vnodeProcessBatchDeleteReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
48
static int32_t vnodeProcessCreateIndexReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
49
static int32_t vnodeProcessDropIndexReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
50
static int32_t vnodeProcessCompactVnodeReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
51
static int32_t vnodeProcessConfigChangeReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
52
static int32_t vnodeProcessArbCheckSyncReq(SVnode *pVnode, void *pReq, int32_t len, SRpcMsg *pRsp);
53
static int32_t vnodeProcessDropTSmaCtbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp,
54
                                          SRpcMsg *pOriginRpc);
55

56
static int32_t vnodeCheckToken(SVnode *pVnode, char *member0Token, char *member1Token);
57
static int32_t vnodeCheckSyncd(SVnode *pVnode, char *member0Token, char *member1Token);
58
static int32_t vnodeProcessFetchTtlExpiredTbs(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
59

60
extern int32_t vnodeProcessKillCompactReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
61
extern int32_t vnodeQueryCompactProgress(SVnode *pVnode, SRpcMsg *pMsg);
62

63
/**
 * Pre-process a single encoded create-table request in place.
 *
 * Skips the flags field, decodes the table name and looks it up in the vnode
 * meta. When the lookup yields 0 a fresh uid is generated; otherwise the
 * returned uid is reused. The uid and btime are then stored directly into the
 * message buffer at the decoder's current position (uid at offset 0, btime at
 * offset 8).
 *
 * @param pVnode vnode whose meta is queried for the table name
 * @param pCoder decoder positioned at the start of the create-table request
 * @param btime  timestamp written into the request right after the uid
 * @param pUid   optional; receives the chosen uid when the call succeeds
 * @return 0 on success, TSDB_CODE_INVALID_MSG when a decode step fails
 */
static int32_t vnodePreprocessCreateTableReq(SVnode *pVnode, SDecoder *pCoder, int64_t btime, int64_t *pUid) {
  int32_t code = 0;
  int32_t lino = 0;
  char   *tbName = NULL;
  int64_t tbUid = 0;

  if (tStartDecode(pCoder) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // flags: decoded only to advance the cursor
  if (tDecodeI32v(pCoder, NULL) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // table name
  if (tDecodeCStr(pCoder, &tbName) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // reuse the uid found for this name, otherwise allocate a new one
  tbUid = metaGetTableEntryUidByName(pVnode->pMeta, tbName);
  if (tbUid == 0) {
    tbUid = tGenIdPI64();
  }

  // patch uid and btime straight into the buffer at the current position
  // NOTE(review): no check is visible here that 16 bytes remain in the buffer - confirm the encoder reserves them
  int64_t *pUidSlot = (int64_t *)(pCoder->data + pCoder->pos);
  int64_t *pBtimeSlot = (int64_t *)(pCoder->data + pCoder->pos + 8);
  taosSetInt64Aligned(pUidSlot, tbUid);
  taosSetInt64Aligned(pBtimeSlot, btime);

  tEndDecode(pCoder);

_exit:
  if (code != 0) {
    vError("vgId:%d %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
    return code;
  }

  vTrace("vgId:%d %s done, table:%s uid generated:%" PRId64, TD_VID(pVnode), __func__, tbName, tbUid);
  if (pUid != NULL) {
    *pUid = tbUid;
  }
  return code;
}
106
/**
 * Pre-process a batched create-table message before it is proposed.
 *
 * Decodes the request count from the message body (after the SMsgHead) and
 * runs vnodePreprocessCreateTableReq() on every request, using one shared
 * btime taken at entry.
 *
 * Every failure path, including a failed tStartDecode(), leaves through
 * _exit so that the decoder is always cleared and the error is always logged.
 *
 * @param pVnode target vnode
 * @param pMsg   RPC message whose content is patched in place
 * @return 0 on success, TSDB_CODE_INVALID_MSG on a malformed message, or the
 *         code returned by vnodePreprocessCreateTableReq()
 */
static int32_t vnodePreProcessCreateTableMsg(SVnode *pVnode, SRpcMsg *pMsg) {
  int32_t code = 0;
  int32_t lino = 0;

  int64_t  btime = taosGetTimestampMs();
  SDecoder dc = {0};
  int32_t  nReqs = 0;

  tDecoderInit(&dc, (uint8_t *)pMsg->pCont + sizeof(SMsgHead), pMsg->contLen - sizeof(SMsgHead));
  if (tStartDecode(&dc) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  if (tDecodeI32v(&dc, &nReqs) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, _exit);
  }
  for (int32_t iReq = 0; iReq < nReqs; iReq++) {
    code = vnodePreprocessCreateTableReq(pVnode, &dc, btime, NULL);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  tEndDecode(&dc);

_exit:
  tDecoderClear(&dc);
  if (code) {
    vError("vgId:%d, %s:%d failed to preprocess create table request since %s, msg type:%s", TD_VID(pVnode), __func__,
           lino, tstrerror(code), TMSG_INFO(pMsg->msgType));
  }
  return code;
}
139

140
/**
 * Pre-process an alter-table message by stamping a creation time into it.
 *
 * The message body (after the SMsgHead) is decoded with
 * tDecodeSVAlterTbReqSetCtime(), which is handed the current timestamp in
 * milliseconds. The decoded pMultiTag array is released again before
 * returning.
 *
 * @param pVnode target vnode (used for logging)
 * @param pMsg   RPC message to process
 * @return 0 on success, TSDB_CODE_INVALID_MSG if decoding fails
 */
static int32_t vnodePreProcessAlterTableMsg(SVnode *pVnode, SRpcMsg *pMsg) {
  int32_t      code = TSDB_CODE_INVALID_MSG;
  int32_t      lino = 0;
  SDecoder     decoder = {0};
  SVAlterTbReq alterReq = {0};

  tDecoderInit(&decoder, (uint8_t *)pMsg->pCont + sizeof(SMsgHead), pMsg->contLen - sizeof(SMsgHead));

  int64_t ctimeMs = taosGetTimestampMs();
  int32_t decodeRet = tDecodeSVAlterTbReqSetCtime(&decoder, &alterReq, ctimeMs);

  // the tag array is only a by-product of decoding; drop it on both outcomes
  taosArrayDestroy(alterReq.pMultiTag);
  alterReq.pMultiTag = NULL;

  if (decodeRet >= 0) {
    code = 0;
  }

  tDecoderClear(&decoder);

  if (code != 0) {
    vError("vgId:%d %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
    return code;
  }

  vTrace("vgId:%d %s done, table:%s ctimeMs generated:%" PRId64, TD_VID(pVnode), __func__, alterReq.tbName,
         ctimeMs);
  return code;
}
169

170
/**
 * Pre-process a drop-TTL / fetch-TTL-expired message.
 *
 * Deserializes the request, asks the meta for tables expired at
 * timestampSec * 1000 (at most ttlDropMaxCount of them), and, if any were
 * found, replaces the message content with a re-serialized request that
 * carries the uid list. When nothing has expired, TSDB_CODE_MSG_PREPROCESSED
 * is returned and the message is left untouched.
 *
 * tSerializeSVDropTtlTableReq() returns the encoded length (that is how
 * reqLenNew is obtained), so only a negative result is treated as failure.
 * On failure the new buffer is released and the original message is kept.
 *
 * @param pVnode target vnode
 * @param pMsg   RPC message; pCont/contLen are replaced on success
 * @return 0 when the message was rewritten, TSDB_CODE_MSG_PREPROCESSED when
 *         no table expired, otherwise an error code
 */
static int32_t vnodePreProcessDropTtlMsg(SVnode *pVnode, SRpcMsg *pMsg) {
  int32_t code = TSDB_CODE_INVALID_MSG;
  int32_t lino = 0;

  SMsgHead *pContOld = pMsg->pCont;
  int32_t   reqLenOld = pMsg->contLen - sizeof(SMsgHead);

  SArray *tbUids = NULL;
  int64_t timestampMs = 0;

  SVDropTtlTableReq ttlReq = {0};
  if (tDeserializeSVDropTtlTableReq((char *)pContOld + sizeof(SMsgHead), reqLenOld, &ttlReq) != 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  {  // find expired uids
    tbUids = taosArrayInit(8, sizeof(tb_uid_t));
    if (tbUids == NULL) {
      code = terrno;
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    timestampMs = (int64_t)ttlReq.timestampSec * 1000;
    code = metaTtlFindExpired(pVnode->pMeta, timestampMs, tbUids, ttlReq.ttlDropMaxCount);
    if (code != 0) {
      code = TSDB_CODE_INVALID_MSG;
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    ttlReq.nUids = taosArrayGetSize(tbUids);
    ttlReq.pTbUids = tbUids;
  }

  if (ttlReq.nUids == 0) {
    code = TSDB_CODE_MSG_PREPROCESSED;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  {  // prepare new content
    int32_t reqLenNew = tSerializeSVDropTtlTableReq(NULL, 0, &ttlReq);
    if (reqLenNew < 0) {
      code = TSDB_CODE_INTERNAL_ERROR;
      TSDB_CHECK_CODE(code, lino, _exit);
    }
    int32_t contLenNew = reqLenNew + sizeof(SMsgHead);

    SMsgHead *pContNew = rpcMallocCont(contLenNew);
    if (pContNew == NULL) {
      code = terrno;
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    // a non-negative return is the encoded length, i.e. success
    if (tSerializeSVDropTtlTableReq((char *)pContNew + sizeof(SMsgHead), reqLenNew, &ttlReq) < 0) {
      vError("vgId:%d %s:%d failed to serialize drop ttl request", TD_VID(pVnode), __func__, lino);
      // do not install a buffer whose body was never written
      rpcFreeCont(pContNew);
      code = TSDB_CODE_INTERNAL_ERROR;
      TSDB_CHECK_CODE(code, lino, _exit);
    }
    pContNew->contLen = htonl(reqLenNew);
    pContNew->vgId = pContOld->vgId;

    rpcFreeCont(pContOld);
    pMsg->pCont = pContNew;
    pMsg->contLen = contLenNew;
  }

  code = 0;

_exit:
  taosArrayDestroy(tbUids);

  if (code && code != TSDB_CODE_MSG_PREPROCESSED) {
    vError("vgId:%d, %s:%d failed to preprocess drop ttl request since %s, msg type:%s", TD_VID(pVnode), __func__, lino,
           tstrerror(code), TMSG_INFO(pMsg->msgType));
  } else {
    vTrace("vgId:%d, %s done, timestampSec:%d, nUids:%d", TD_VID(pVnode), __func__, ttlReq.timestampSec, ttlReq.nUids);
  }

  return code;
}
244

245
extern int64_t tsMaxKeyByPrecision[];
246
/**
 * Pre-process one SSubmitTbData entry of a submit message in place.
 *
 * Steps, in order:
 *  - decode flags (low byte) and the encoding version (second byte);
 *  - for auto-create entries, pre-process the embedded create-table request
 *    and write the resulting uid into the uid field of the entry;
 *  - decode suid / uid / sver;
 *  - verify that every row timestamp lies in [minKey, maxKey], where minKey
 *    is "now" minus keep2 * tsTickPerMin[precision] and maxKey is
 *    tsMaxKeyByPrecision[precision]; "now" is btimeMs scaled by 1000 for
 *    microsecond and by 1000000 for nanosecond precision;
 *  - if the entry has bytes left, write ctimeMs at the current position.
 *
 * In column format only the first decoded column is checked as the
 * timestamp column; the remaining columns are decoded but not inspected.
 *
 * @param pVnode  vnode supplying meta and tsdb configuration
 * @param pCoder  decoder positioned at the start of the entry
 * @param btimeMs timestamp (ms) used for auto-created tables and as "now"
 * @param ctimeMs timestamp (ms) written at the end of the entry
 * @return 0 on success, TSDB_CODE_INVALID_MSG on a malformed entry,
 *         TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE for an out-of-range row, or
 *         the code from vnodePreprocessCreateTableReq()
 */
static int32_t vnodePreProcessSubmitTbData(SVnode *pVnode, SDecoder *pCoder, int64_t btimeMs, int64_t ctimeMs) {
  int32_t code = 0;
  int32_t lino = 0;

  if (tStartDecode(pCoder) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  SSubmitTbData submitTbData;
  uint8_t       version;
  if (tDecodeI32v(pCoder, &submitTbData.flags) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, _exit);
  }
  version = (submitTbData.flags >> 8) & 0xff;
  submitTbData.flags = submitTbData.flags & 0xff;

  int64_t uid = 0;
  if (submitTbData.flags & SUBMIT_REQ_AUTO_CREATE_TABLE) {
    code = vnodePreprocessCreateTableReq(pVnode, pCoder, btimeMs, &uid);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // submit data
  if (tDecodeI64(pCoder, &submitTbData.suid) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  if (submitTbData.flags & SUBMIT_REQ_AUTO_CREATE_TABLE) {
    // overwrite the uid field with the uid chosen for the auto-created table
    taosSetInt64Aligned((int64_t *)(pCoder->data + pCoder->pos), uid);
    pCoder->pos += sizeof(int64_t);
  } else {
    if (tDecodeI64(pCoder, &submitTbData.uid) < 0) {
      code = TSDB_CODE_INVALID_MSG;
      TSDB_CHECK_CODE(code, lino, _exit);
    }
  }

  if (tDecodeI32v(pCoder, &submitTbData.sver) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // scan and check
  TSKEY now = btimeMs;
  if (pVnode->config.tsdbCfg.precision == TSDB_TIME_PRECISION_MICRO) {
    now *= 1000;
  } else if (pVnode->config.tsdbCfg.precision == TSDB_TIME_PRECISION_NANO) {
    now *= 1000000;
  }

  int32_t keep = pVnode->config.tsdbCfg.keep2;
  /*
  int32_t nlevel = tfsGetLevel(pVnode->pTfs);
  if (nlevel > 1 && tsS3Enabled) {
    if (nlevel == 3) {
      keep = pVnode->config.tsdbCfg.keep1;
    } else if (nlevel == 2) {
      keep = pVnode->config.tsdbCfg.keep0;
    }
  }
  */

  TSKEY minKey = now - tsTickPerMin[pVnode->config.tsdbCfg.precision] * keep;
  TSKEY maxKey = tsMaxKeyByPrecision[pVnode->config.tsdbCfg.precision];
  if (submitTbData.flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
    uint64_t nColData;
    if (tDecodeU64v(pCoder, &nColData) < 0) {
      code = TSDB_CODE_INVALID_MSG;
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    // first column: treated as the timestamp column and range-checked
    SColData colData = {0};
    code = tDecodeColData(version, pCoder, &colData);
    if (code) {
      code = TSDB_CODE_INVALID_MSG;
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    if (colData.flag != HAS_VALUE) {
      code = TSDB_CODE_INVALID_MSG;
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    for (int32_t iRow = 0; iRow < colData.nVal; iRow++) {
      if (((TSKEY *)colData.pData)[iRow] < minKey || ((TSKEY *)colData.pData)[iRow] > maxKey) {
        code = TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE;
        TSDB_CHECK_CODE(code, lino, _exit);
      }
    }

    // remaining columns only need to be skipped over
    for (uint64_t i = 1; i < nColData; i++) {
      code = tDecodeColData(version, pCoder, &colData);
      if (code) {
        code = TSDB_CODE_INVALID_MSG;
        TSDB_CHECK_CODE(code, lino, _exit);
      }
    }
  } else {
    uint64_t nRow;
    if (tDecodeU64v(pCoder, &nRow) < 0) {
      code = TSDB_CODE_INVALID_MSG;
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    // index has the same type as nRow: no mixed-sign compare, no signed overflow
    // NOTE(review): pRow->len is taken from the message and the cursor is not
    // checked against the buffer end here - confirm this is validated elsewhere
    for (uint64_t iRow = 0; iRow < nRow; ++iRow) {
      SRow *pRow = (SRow *)(pCoder->data + pCoder->pos);
      pCoder->pos += pRow->len;
#ifndef NO_UNALIGNED_ACCESS
      TSKEY ts = pRow->ts;
#else
      TSKEY ts = taosGetInt64Aligned(&pRow->ts);
#endif
      if (ts < minKey || ts > maxKey) {
        code = TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE;
        TSDB_CHECK_CODE(code, lino, _exit);
      }
    }
  }

  // entries that still have bytes left carry a ctime slot; fill it in
  if (!tDecodeIsEnd(pCoder)) {
    taosSetInt64Aligned((int64_t *)(pCoder->data + pCoder->pos), ctimeMs);
    pCoder->pos += sizeof(int64_t);
  }

  tEndDecode(pCoder);

_exit:
  if (code) {
    vError("vgId:%d, %s:%d failed to vnodePreProcessSubmitTbData submit request since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
  }
  return code;
}
381
/**
 * Pre-process a submit message before it is proposed.
 *
 * Returns TSDB_CODE_MSG_PREPROCESSED immediately when the
 * TSDB_BYPASS_RA_RPC_RECV_SUBMIT bypass flag is set. Otherwise the
 * SSubmitReq2Msg header is validated (present and version 1), the number of
 * SSubmitTbData entries is decoded, and each entry is handed to
 * vnodePreProcessSubmitTbData() with one shared timestamp used as both btime
 * and ctime.
 *
 * @param pVnode target vnode
 * @param pMsg   RPC message whose content is patched in place
 * @return 0 on success, TSDB_CODE_MSG_PREPROCESSED when bypassed,
 *         TSDB_CODE_INVALID_MSG on a malformed message, or the code from
 *         vnodePreProcessSubmitTbData()
 */
static int32_t vnodePreProcessSubmitMsg(SVnode *pVnode, SRpcMsg *pMsg) {
  int32_t code = 0;
  int32_t lino = 0;

  if (tsBypassFlag & TSDB_BYPASS_RA_RPC_RECV_SUBMIT) {
    return TSDB_CODE_MSG_PREPROCESSED;
  }

  SDecoder *pCoder = &(SDecoder){0};

  // the header is read below and its size is subtracted from contLen, so a
  // shorter message must be rejected before either happens
  if (pMsg->pCont == NULL || pMsg->contLen < (int32_t)sizeof(SSubmitReq2Msg)) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  if (taosHton64(((SSubmitReq2Msg *)pMsg->pCont)->version) != 1) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  tDecoderInit(pCoder, (uint8_t *)pMsg->pCont + sizeof(SSubmitReq2Msg), pMsg->contLen - sizeof(SSubmitReq2Msg));

  if (tStartDecode(pCoder) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  uint64_t nSubmitTbData;
  if (tDecodeU64v(pCoder, &nSubmitTbData) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  int64_t btimeMs = taosGetTimestampMs();
  int64_t ctimeMs = btimeMs;
  // index has the same type as the decoded count
  for (uint64_t i = 0; i < nSubmitTbData; i++) {
    code = vnodePreProcessSubmitTbData(pVnode, pCoder, btimeMs, ctimeMs);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  tEndDecode(pCoder);

_exit:
  tDecoderClear(pCoder);
  if (code) {
    vError("vgId:%d, %s:%d failed to preprocess submit request since %s, msg type:%s", TD_VID(pVnode), __func__, lino,
           tstrerror(code), TMSG_INFO(pMsg->msgType));
  }
  return code;
}
426

427
/**
 * Pre-process a delete message by turning it into an encoded SDeleteRes.
 *
 * The delete is first handed to qWorkerProcessDeleteMsg(), which fills an
 * SDeleteRes. The result is stamped with the current time (ctimeMs), encoded
 * into a freshly allocated RPC buffer behind an SMsgHead, and that buffer
 * replaces the original message content.
 *
 * @param pVnode target vnode
 * @param pMsg   RPC message; pCont/contLen are replaced on success
 * @return 0 on success, the code from qWorkerProcessDeleteMsg(), or terrno
 *         when the new buffer cannot be allocated
 */
static int32_t vnodePreProcessDeleteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
  int32_t code = 0;

  int32_t    size = 0;
  int32_t    ret = 0;
  uint8_t   *pCont = NULL;
  SEncoder  *pCoder = &(SEncoder){0};
  SDeleteRes res = {0};

  SReadHandle handle = {.vnode = pVnode, .pMsgCb = &pVnode->msgCb, .skipRollup = 1};
  initStorageAPI(&handle.api);

  code = qWorkerProcessDeleteMsg(&handle, pVnode->pQuery, pMsg, &res);
  // NOTE(review): res.uidList is not released on this path - confirm that
  // qWorkerProcessDeleteMsg leaves nothing allocated when it fails
  if (code) goto _exit;

  res.ctimeMs = taosGetTimestampMs();
  // malloc and encode
  tEncodeSize(tEncodeDeleteRes, &res, size, ret);
  pCont = rpcMallocCont(size + sizeof(SMsgHead));
  if (pCont == NULL) {
    // keep the original message; only the intermediate result is dropped
    code = terrno;
    taosArrayDestroy(res.uidList);
    goto _exit;
  }

  ((SMsgHead *)pCont)->contLen = size + sizeof(SMsgHead);
  ((SMsgHead *)pCont)->vgId = TD_VID(pVnode);

  tEncoderInit(pCoder, pCont + sizeof(SMsgHead), size);
  if (tEncodeDeleteRes(pCoder, &res) != 0) {
    vError("vgId:%d %s failed to encode delete response", TD_VID(pVnode), __func__);
  }
  tEncoderClear(pCoder);

  rpcFreeCont(pMsg->pCont);
  pMsg->pCont = pCont;
  pMsg->contLen = size + sizeof(SMsgHead);

  taosArrayDestroy(res.uidList);

_exit:
  return code;
}
465

466
/**
 * Pre-process a batch-delete message by stamping a creation time into it.
 *
 * The message body (after the SMsgHead) is decoded with
 * tDecodeSBatchDeleteReqSetCtime(), which is handed the current timestamp in
 * milliseconds. The decoded request array is released again before
 * returning.
 *
 * @param pVnode target vnode (used for logging)
 * @param pMsg   RPC message to process
 * @return 0 on success, TSDB_CODE_INVALID_MSG if decoding fails
 */
static int32_t vnodePreProcessBatchDeleteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
  int32_t code = 0;
  int32_t lino = 0;

  int64_t         ctimeMs = taosGetTimestampMs();
  SBatchDeleteReq batchReq = {0};
  SDecoder        decoder = {0};

  tDecoderInit(&decoder, (uint8_t *)pMsg->pCont + sizeof(SMsgHead), pMsg->contLen - sizeof(SMsgHead));

  int32_t decodeRet = tDecodeSBatchDeleteReqSetCtime(&decoder, &batchReq, ctimeMs);
  if (decodeRet < 0) {
    code = TSDB_CODE_INVALID_MSG;
  }

  // release decoder state and the decoded array on both outcomes
  tDecoderClear(&decoder);
  taosArrayDestroy(batchReq.deleteReqs);

  if (code == 0) {
    vTrace("vgId:%d %s done, ctimeMs generated:%" PRId64, TD_VID(pVnode), __func__, ctimeMs);
    return code;
  }

  vError("vgId:%d %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
  return code;
}
490

491
/**
 * Pre-process an arbitrator check-sync message.
 *
 * Deserializes the request and validates its two member tokens with
 * vnodeCheckToken(). The value returned by vnodeCheckToken() is what gets
 * logged and what gets returned; terrno is deliberately not consulted, since
 * it may still hold an unrelated earlier error when the check succeeds.
 *
 * @param pVnode target vnode
 * @param pMsg   RPC message to process
 * @return 0 when the tokens are accepted, TSDB_CODE_INVALID_MSG if the
 *         request cannot be deserialized, otherwise the code returned by
 *         vnodeCheckToken()
 */
static int32_t vnodePreProcessArbCheckSyncMsg(SVnode *pVnode, SRpcMsg *pMsg) {
  SVArbCheckSyncReq syncReq = {0};

  if (tDeserializeSVArbCheckSyncReq((char *)pMsg->pCont + sizeof(SMsgHead), pMsg->contLen - sizeof(SMsgHead),
                                    &syncReq) != 0) {
    return TSDB_CODE_INVALID_MSG;
  }

  int32_t code = vnodeCheckToken(pVnode, syncReq.member0Token, syncReq.member1Token);
  if (code != 0) {
    vError("vgId:%d, failed to preprocess arb check sync request since %s", TD_VID(pVnode), tstrerror(code));
  }

  tFreeSVArbCheckSyncReq(&syncReq);

  return code;
}
509

510
/**
 * Pre-process a batched drop-table message by resolving table names to uids.
 *
 * Each request decoded from the message body is looked up by name in the
 * vnode meta. Requests whose lookup returns 0 are logged and left out; the
 * others get their uid filled in and are collected. The collected batch is
 * then encoded back over the original message body (after the SMsgHead).
 *
 * @param pVnode vnode whose meta is queried
 * @param pMsg   RPC message whose body is rewritten in place
 * @return TSDB_CODE_SUCCESS on success, the decode/encode error code, or
 *         terrno when an array operation fails
 */
int32_t vnodePreProcessDropTbMsg(SVnode *pVnode, SRpcMsg *pMsg) {
  int32_t          code = TSDB_CODE_SUCCESS;
  int32_t          lino = 0;
  int32_t          encodedLen = 0;
  SDecoder         decoder = {0};
  SEncoder         encoder = {0};
  SVDropTbBatchReq inReqs = {0};
  SVDropTbBatchReq outReqs = {0};

  tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), pMsg->contLen - sizeof(SMsgHead));

  code = tDecodeSVDropTbBatchReq(&decoder, &inReqs);
  if (code < 0) {
    terrno = code;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  outReqs.pArray = taosArrayInit(inReqs.nReqs, sizeof(SVDropTbReq));
  if (outReqs.pArray == NULL) {
    code = terrno;
    goto _exit;
  }

  for (int32_t iReq = 0; iReq < inReqs.nReqs; ++iReq) {
    SVDropTbReq *pDropReq = &inReqs.pReqs[iReq];
    tb_uid_t     tbUid = metaGetTableEntryUidByName(pVnode->pMeta, pDropReq->name);

    // a name the meta does not resolve is dropped from the outgoing batch
    if (tbUid == 0) {
      vWarn("vgId:%d, preprocess drop ctb: %s not found", TD_VID(pVnode), pDropReq->name);
      continue;
    }

    pDropReq->uid = tbUid;
    vDebug("vgId:%d %s for: %s, uid: %" PRId64, TD_VID(pVnode), __func__, pDropReq->name, pDropReq->uid);
    if (taosArrayPush(outReqs.pArray, pDropReq) == NULL) {
      code = terrno;
      goto _exit;
    }
  }
  outReqs.nReqs = outReqs.pArray->size;

  // size the filtered batch, then encode it over the original body
  tEncodeSize(tEncodeSVDropTbBatchReq, &outReqs, encodedLen, code);
  tEncoderInit(&encoder, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), encodedLen);
  code = tEncodeSVDropTbBatchReq(&encoder, &outReqs);
  tEncoderClear(&encoder);
  if (code != TSDB_CODE_SUCCESS) {
    vError("vgId:%d %s failed to encode drop tb batch req: %s", TD_VID(pVnode), __func__, tstrerror(code));
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  tDecoderClear(&decoder);
  if (outReqs.pArray != NULL) {
    taosArrayDestroy(outReqs.pArray);
  }
  return code;
}
564

565
/**
 * Dispatch a write message to its type-specific pre-processing routine.
 *
 * Message types without a dedicated routine pass through with code 0.
 * Any result other than 0 or TSDB_CODE_MSG_PREPROCESSED is logged as an
 * error before being returned.
 *
 * @param pVnode target vnode
 * @param pMsg   RPC message; some routines rewrite its content
 * @return the code produced by the selected routine, or 0 for other types
 */
int32_t vnodePreProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
  int32_t code = 0;

  switch (pMsg->msgType) {
    case TDMT_VND_CREATE_TABLE:
      code = vnodePreProcessCreateTableMsg(pVnode, pMsg);
      break;

    case TDMT_VND_ALTER_TABLE:
      code = vnodePreProcessAlterTableMsg(pVnode, pMsg);
      break;

    // both TTL message types share one routine
    case TDMT_VND_FETCH_TTL_EXPIRED_TBS:
    case TDMT_VND_DROP_TTL_TABLE:
      code = vnodePreProcessDropTtlMsg(pVnode, pMsg);
      break;

    case TDMT_VND_SUBMIT:
      code = vnodePreProcessSubmitMsg(pVnode, pMsg);
      break;

    case TDMT_VND_DELETE:
      code = vnodePreProcessDeleteMsg(pVnode, pMsg);
      break;

    case TDMT_VND_BATCH_DEL:
      code = vnodePreProcessBatchDeleteMsg(pVnode, pMsg);
      break;

    case TDMT_VND_ARB_CHECK_SYNC:
      code = vnodePreProcessArbCheckSyncMsg(pVnode, pMsg);
      break;

    case TDMT_VND_DROP_TABLE:
      code = vnodePreProcessDropTbMsg(pVnode, pMsg);
      break;

    default:
      break;
  }

  if (code != 0 && code != TSDB_CODE_MSG_PREPROCESSED) {
    vError("vgId:%d, failed to preprocess write request since %s, msg type:%s", TD_VID(pVnode), tstrerror(code),
           TMSG_INFO(pMsg->msgType));
  }
  return code;
}
604

605
int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg *pRsp) {
11,436,755✔
606
  int32_t code = 0;
11,436,755✔
607
  void   *ptr = NULL;
11,436,755✔
608
  void   *pReq;
609
  int32_t len;
610

611
  (void)taosThreadMutexLock(&pVnode->mutex);
11,436,755✔
612
  if (pVnode->disableWrite) {
11,437,288!
613
    (void)taosThreadMutexUnlock(&pVnode->mutex);
×
614
    vError("vgId:%d write is disabled for snapshot, version:%" PRId64, TD_VID(pVnode), ver);
×
615
    return TSDB_CODE_VND_WRITE_DISABLED;
×
616
  }
617
  (void)taosThreadMutexUnlock(&pVnode->mutex);
11,437,288✔
618

619
  if (ver <= pVnode->state.applied) {
11,437,238!
620
    vError("vgId:%d, duplicate write request. ver: %" PRId64 ", applied: %" PRId64 "", TD_VID(pVnode), ver,
×
621
           pVnode->state.applied);
622
    return terrno = TSDB_CODE_VND_DUP_REQUEST;
×
623
  }
624

625
  vDebug("vgId:%d, start to process write request %s, index:%" PRId64 ", applied:%" PRId64 ", state.applyTerm:%" PRId64
11,437,238!
626
         ", conn.applyTerm:%" PRId64 ", contLen:%d",
627
         TD_VID(pVnode), TMSG_INFO(pMsg->msgType), ver, pVnode->state.applied, pVnode->state.applyTerm,
628
         pMsg->info.conn.applyTerm, pMsg->contLen);
629

630
  if (!(pVnode->state.applyTerm <= pMsg->info.conn.applyTerm)) {
11,436,942!
631
    return terrno = TSDB_CODE_INTERNAL_ERROR;
×
632
  }
633

634
  if (!(pVnode->state.applied + 1 == ver)) {
11,436,942!
635
    return terrno = TSDB_CODE_INTERNAL_ERROR;
×
636
  }
637

638
  atomic_store_64(&pVnode->state.applied, ver);
11,436,942✔
639
  atomic_store_64(&pVnode->state.applyTerm, pMsg->info.conn.applyTerm);
11,437,268✔
640

641
  if (!syncUtilUserCommit(pMsg->msgType)) goto _exit;
11,437,361✔
642

643
  // skip header
644
  pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
11,411,511✔
645
  len = pMsg->contLen - sizeof(SMsgHead);
11,411,511✔
646
  bool needCommit = false;
11,411,511✔
647

648
  switch (pMsg->msgType) {
11,411,511!
649
    /* META */
650
    case TDMT_VND_CREATE_STB:
27,764✔
651
      if (vnodeProcessCreateStbReq(pVnode, ver, pReq, len, pRsp) < 0) goto _err;
27,764✔
652
      break;
27,876✔
653
    case TDMT_VND_ALTER_STB:
6,433✔
654
      if (vnodeProcessAlterStbReq(pVnode, ver, pReq, len, pRsp) < 0) goto _err;
6,433!
655
      break;
6,444✔
656
    case TDMT_VND_DROP_STB:
2,511✔
657
      if (vnodeProcessDropStbReq(pVnode, ver, pReq, len, pRsp) < 0) goto _err;
2,511!
658
      break;
2,523✔
659
    case TDMT_VND_CREATE_TABLE:
75,642✔
660
      if (vnodeProcessCreateTbReq(pVnode, ver, pReq, len, pRsp, pMsg) < 0) goto _err;
75,642!
661
      break;
75,689✔
662
    case TDMT_VND_ALTER_TABLE:
1,082✔
663
      if (vnodeProcessAlterTbReq(pVnode, ver, pReq, len, pRsp) < 0) goto _err;
1,082!
664
      break;
1,082✔
665
    case TDMT_VND_DROP_TABLE:
2,623✔
666
      if (vnodeProcessDropTbReq(pVnode, ver, pReq, len, pRsp, pMsg) < 0) goto _err;
2,623!
667
      break;
2,623✔
668
    case TDMT_VND_DROP_TTL_TABLE:
×
669
      if (vnodeProcessDropTtlTbReq(pVnode, ver, pReq, len, pRsp) < 0) goto _err;
×
670
      break;
×
671
    case TDMT_VND_FETCH_TTL_EXPIRED_TBS:
5✔
672
      if (vnodeProcessFetchTtlExpiredTbs(pVnode, ver, pReq, len, pRsp) < 0) goto _err;
5!
673
      break;
5✔
674
    case TDMT_VND_TRIM:
264✔
675
      if (vnodeProcessTrimReq(pVnode, ver, pReq, len, pRsp) < 0) goto _err;
264!
676
      break;
264✔
677
    case TDMT_VND_S3MIGRATE:
×
678
      if (vnodeProcessS3MigrateReq(pVnode, ver, pReq, len, pRsp) < 0) goto _err;
×
679
      break;
×
UNCOV
680
    case TDMT_VND_CREATE_SMA:
×
UNCOV
681
      if (vnodeProcessCreateTSmaReq(pVnode, ver, pReq, len, pRsp) < 0) goto _err;
×
UNCOV
682
      break;
×
683
    /* TSDB */
684
    case TDMT_VND_SUBMIT:
11,128,314✔
685
      if (vnodeProcessSubmitReq(pVnode, ver, pMsg->pCont, pMsg->contLen, pRsp, pMsg) < 0) goto _err;
11,128,314✔
686
      break;
11,127,757✔
687
    case TDMT_VND_DELETE:
68,437✔
688
      if (vnodeProcessDeleteReq(pVnode, ver, pReq, len, pRsp, pMsg) < 0) goto _err;
68,437!
689
      break;
68,434✔
690
    case TDMT_VND_BATCH_DEL:
2,657✔
691
      if (vnodeProcessBatchDeleteReq(pVnode, ver, pReq, len, pRsp) < 0) goto _err;
2,657!
692
      break;
2,663✔
693
    /* TQ */
694
#if defined(USE_TQ) || defined(USE_STREAM)
695
    case TDMT_VND_TMQ_SUBSCRIBE:
3,161✔
696
      if (tqProcessSubscribeReq(pVnode->pTq, ver, pReq, len) < 0) {
3,161✔
697
        goto _err;
1✔
698
      }
699
      break;
3,163✔
700
    case TDMT_VND_TMQ_DELETE_SUB:
925✔
701
      if (tqProcessDeleteSubReq(pVnode->pTq, ver, pMsg->pCont, pMsg->contLen) < 0) {
925!
702
        goto _err;
×
703
      }
704
      break;
926✔
705
    case TDMT_VND_TMQ_COMMIT_OFFSET:
12,418✔
706
      if (tqProcessOffsetCommitReq(pVnode->pTq, ver, pReq, len) < 0) {
12,418!
707
        goto _err;
×
708
      }
709
      break;
12,427✔
710
    case TDMT_VND_TMQ_ADD_CHECKINFO:
122✔
711
      if (tqProcessAddCheckInfoReq(pVnode->pTq, ver, pReq, len) < 0) {
122!
712
        goto _err;
×
713
      }
714
      break;
122✔
715
    case TDMT_VND_TMQ_DEL_CHECKINFO:
10✔
716
      if (tqProcessDelCheckInfoReq(pVnode->pTq, ver, pReq, len) < 0) {
10!
717
        goto _err;
×
718
      }
719
      break;
10✔
720
    case TDMT_STREAM_TASK_DEPLOY: {
14,068✔
721
      if ((code = tqProcessTaskDeployReq(pVnode->pTq, ver, pReq, len)) != TSDB_CODE_SUCCESS) {
14,068!
722
        goto _err;
×
723
      }
724
    } break;
14,085✔
725
    case TDMT_STREAM_TASK_DROP: {
6,944✔
726
      if ((code = tqProcessTaskDropReq(pVnode->pTq, pMsg->pCont, pMsg->contLen)) < 0) {
6,944!
727
        goto _err;
×
728
      }
729
    } break;
6,959✔
730
    case TDMT_STREAM_TASK_UPDATE_CHKPT: {
5,576✔
731
      if ((code = tqProcessTaskUpdateCheckpointReq(pVnode->pTq, pMsg->pCont, pMsg->contLen)) < 0) {
5,576!
732
        goto _err;
×
733
      }
734
    } break;
5,589✔
735
    case TDMT_STREAM_CONSEN_CHKPT: {
314✔
736
      if (pVnode->restored && (code = tqProcessTaskConsenChkptIdReq(pVnode->pTq, pMsg)) < 0) {
314!
737
        goto _err;
×
738
      }
739

740
    } break;
314✔
741
    case TDMT_STREAM_TASK_PAUSE: {
1,505✔
742
      if (pVnode->restored && vnodeIsLeader(pVnode) &&
2,904!
743
          (code = tqProcessTaskPauseReq(pVnode->pTq, ver, pMsg->pCont, pMsg->contLen)) < 0) {
1,397✔
744
        goto _err;
×
745
      }
746
    } break;
1,507✔
747
    case TDMT_STREAM_TASK_RESUME: {
2,670✔
748
      if (pVnode->restored && vnodeIsLeader(pVnode) &&
5,236!
749
          (code = tqProcessTaskResumeReq(pVnode->pTq, ver, pMsg->pCont, pMsg->contLen)) < 0) {
2,562✔
750
        goto _err;
×
751
      }
752
    } break;
2,673✔
753
    case TDMT_VND_STREAM_TASK_RESET: {
×
754
      if (pVnode->restored && vnodeIsLeader(pVnode) && (code = tqProcessTaskResetReq(pVnode->pTq, pMsg)) < 0) {
×
755
        goto _err;
×
756
      }
757

758
    } break;
×
759
    case TDMT_VND_STREAM_ALL_STOP: {
4,159✔
760
      if (pVnode->restored && vnodeIsLeader(pVnode) && (code = tqProcessAllTaskStopReq(pVnode->pTq, pMsg)) < 0) {
4,159!
761
        goto _err;
×
762
      }
763

764
    } break;
4,164✔
765
#endif
766
    case TDMT_VND_ALTER_CONFIRM:
7,997✔
767
      needCommit = pVnode->config.hashChange;
7,997✔
768
      if (vnodeProcessAlterConfirmReq(pVnode, ver, pReq, len, pRsp) < 0) {
7,997!
769
        goto _err;
×
770
      }
771
      break;
7,997✔
772
    case TDMT_VND_ALTER_CONFIG:
616✔
773
      vnodeProcessAlterConfigReq(pVnode, ver, pReq, len, pRsp);
616✔
774
      break;
620✔
775
    case TDMT_VND_COMMIT:
28,569✔
776
      needCommit = true;
28,569✔
777
      break;
28,569✔
778
    case TDMT_VND_CREATE_INDEX:
945✔
779
      vnodeProcessCreateIndexReq(pVnode, ver, pReq, len, pRsp);
945✔
780
      break;
945✔
781
    case TDMT_VND_DROP_INDEX:
2,130✔
782
      vnodeProcessDropIndexReq(pVnode, ver, pReq, len, pRsp);
2,130✔
783
      break;
2,130✔
784
    case TDMT_VND_STREAM_CHECK_POINT_SOURCE:  // always return true
3,352✔
785
      tqProcessTaskCheckPointSourceReq(pVnode->pTq, pMsg, pRsp);
3,352✔
786
      break;
3,357✔
787
    case TDMT_VND_STREAM_TASK_UPDATE:  // always return true
86✔
788
      tqProcessTaskUpdateReq(pVnode->pTq, pMsg);
86✔
789
      break;
86✔
790
    case TDMT_VND_COMPACT:
203✔
791
      vnodeProcessCompactVnodeReq(pVnode, ver, pReq, len, pRsp);
203✔
792
      goto _exit;
203✔
793
    case TDMT_SYNC_CONFIG_CHANGE:
×
794
      vnodeProcessConfigChangeReq(pVnode, ver, pReq, len, pRsp);
×
795
      break;
×
796
#ifdef TD_ENTERPRISE
797
    case TDMT_VND_KILL_COMPACT:
×
798
      vnodeProcessKillCompactReq(pVnode, ver, pReq, len, pRsp);
×
799
      break;
×
800
#endif
801
    /* ARB */
802
    case TDMT_VND_ARB_CHECK_SYNC:
9✔
803
      vnodeProcessArbCheckSyncReq(pVnode, pReq, len, pRsp);
9✔
804
      break;
9✔
805
    default:
×
806
      vError("vgId:%d, unprocessed msg, %d", TD_VID(pVnode), pMsg->msgType);
×
807
      return TSDB_CODE_INVALID_MSG;
×
808
  }
809

810
  vTrace("vgId:%d, process %s request, code:0x%x index:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), pRsp->code,
11,411,012!
811
         ver);
812

813
  walApplyVer(pVnode->pWal, ver);
11,411,012✔
814

815
  code = tqPushMsg(pVnode->pTq, pMsg->msgType);
11,410,843✔
816
  if (code) {
11,410,567!
817
    vError("vgId:%d, failed to push msg to TQ since %s", TD_VID(pVnode), tstrerror(terrno));
×
818
    return code;
×
819
  }
820

821
  // commit if need
822
  if (needCommit) {
11,410,567✔
823
    vInfo("vgId:%d, commit at version %" PRId64, TD_VID(pVnode), ver);
28,582✔
824
    code = vnodeAsyncCommit(pVnode);
28,632✔
825
    if (code) {
28,628!
826
      vError("vgId:%d, failed to vnode async commit since %s.", TD_VID(pVnode), tstrerror(terrno));
×
827
      goto _err;
×
828
    }
829

830
    // start a new one
831
    code = vnodeBegin(pVnode);
28,628✔
832
    if (code) {
28,621!
UNCOV
833
      vError("vgId:%d, failed to begin vnode since %s.", TD_VID(pVnode), tstrerror(terrno));
×
834
      goto _err;
×
835
    }
836
  }
837

838
_exit:
11,410,623✔
839
  return 0;
11,435,977✔
840

841
_err:
12✔
842
  vError("vgId:%d, process %s request failed since %s, ver:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType),
12!
843
         tstrerror(terrno), ver);
844
  return code;
12✔
845
}
846

847
// Pre-process a message for the query worker.
// Only TDMT_SCH_QUERY and TDMT_SCH_MERGE_QUERY are handed to qWorkerPreprocessQueryMsg();
// any other message type is left untouched and 0 is returned.
int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
  bool isQuery = (pMsg->msgType == TDMT_SCH_QUERY);
  bool isMergeQuery = (pMsg->msgType == TDMT_SCH_MERGE_QUERY);

  if (isQuery || isMergeQuery) {
    // the last argument tells the worker whether this is a plain (non-merge) query
    return qWorkerPreprocessQueryMsg(pVnode->pQuery, pMsg, isQuery);
  }

  return 0;
}
854

855
// Dispatch one message taken from the vnode query queue.
// TMQ consume requests are redirected (and 0 is returned) when the sync layer is not ready
// for reads or the vnode has not been restored yet. All other outcomes are the return value
// of the handler selected by the message type; unknown types yield TSDB_CODE_APP_ERROR.
int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
  vTrace("message in vnode query queue is processing");

  if (pMsg->msgType == TDMT_VND_TMQ_CONSUME) {
    // readiness is checked before the restored flag, so terrno from the sync layer wins
    if (!syncIsReadyForRead(pVnode->sync)) {
      vnodeRedirectRpcMsg(pVnode, pMsg, terrno);
      return 0;
    }
    if (!pVnode->restored) {
      vnodeRedirectRpcMsg(pVnode, pMsg, TSDB_CODE_SYN_RESTORING);
      return 0;
    }
  }

  SReadHandle handle = {.vnode = pVnode, .pMsgCb = &pVnode->msgCb, .pWorkerCb = pInfo->workerCb};
  initStorageAPI(&handle.api);

  switch (pMsg->msgType) {
    case TDMT_SCH_QUERY: {
      bool notReady = !syncIsReadyForRead(pVnode->sync);
      if (notReady) {
        pMsg->code = (terrno) ? terrno : TSDB_CODE_SYN_NOT_LEADER;
      }

      // the worker is still invoked when not ready; the redirect happens afterwards
      int32_t code = qWorkerProcessQueryMsg(&handle, pVnode->pQuery, pMsg, 0);
      if (notReady) {
        vnodeRedirectRpcMsg(pVnode, pMsg, pMsg->code);
        return 0;
      }
      return code;
    }
    case TDMT_SCH_MERGE_QUERY:
      return qWorkerProcessQueryMsg(&handle, pVnode->pQuery, pMsg, 0);
    case TDMT_SCH_QUERY_CONTINUE:
      return qWorkerProcessCQueryMsg(&handle, pVnode->pQuery, pMsg, 0);
    case TDMT_VND_TMQ_CONSUME:
      return tqProcessPollReq(pVnode->pTq, pMsg);
    case TDMT_VND_TMQ_CONSUME_PUSH:
      return tqProcessPollPush(pVnode->pTq);
    default:
      vError("unknown msg type:%d in query queue", pMsg->msgType);
      return TSDB_CODE_APP_ERROR;
  }
}
897

898
// Dispatch one message taken from the vnode fetch queue.
// Fetch and table-meta style requests are redirected (returning 0) when the sync layer is
// not ready for reads; otherwise the handler matching the message type is invoked and its
// result returned. Unknown types yield TSDB_CODE_APP_ERROR.
int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
  vTrace("vgId:%d, msg:%p in fetch queue is processing", pVnode->config.vgId, pMsg);

  // message types that must only be served when the vnode is readable
  bool requiresReadReady = false;
  switch (pMsg->msgType) {
    case TDMT_SCH_FETCH:
    case TDMT_VND_TABLE_META:
    case TDMT_VND_TABLE_CFG:
    case TDMT_VND_BATCH_META:
    case TDMT_VND_TABLE_NAME:
    case TDMT_VND_VSUBTABLES_META:
      requiresReadReady = true;
      break;
    default:
      break;
  }

  if (requiresReadReady && !syncIsReadyForRead(pVnode->sync)) {
    vnodeRedirectRpcMsg(pVnode, pMsg, terrno);
    return 0;
  }

  switch (pMsg->msgType) {
    case TDMT_SCH_FETCH:
    case TDMT_SCH_MERGE_FETCH:
      return qWorkerProcessFetchMsg(pVnode, pVnode->pQuery, pMsg, 0);
    case TDMT_SCH_FETCH_RSP:
      return qWorkerProcessRspMsg(pVnode, pVnode->pQuery, pMsg, 0);
    // case TDMT_SCH_CANCEL_TASK:
    //   return qWorkerProcessCancelMsg(pVnode, pVnode->pQuery, pMsg, 0);
    case TDMT_SCH_DROP_TASK:
      return qWorkerProcessDropMsg(pVnode, pVnode->pQuery, pMsg, 0);
    case TDMT_SCH_TASK_NOTIFY:
      return qWorkerProcessNotifyMsg(pVnode, pVnode->pQuery, pMsg, 0);
    case TDMT_SCH_QUERY_HEARTBEAT:
      return qWorkerProcessHbMsg(pVnode, pVnode->pQuery, pMsg, 0);
    case TDMT_VND_TABLE_META:
    case TDMT_VND_TABLE_NAME:
      return vnodeGetTableMeta(pVnode, pMsg, true);
    case TDMT_VND_TABLE_CFG:
      return vnodeGetTableCfg(pVnode, pMsg, true);
    case TDMT_VND_BATCH_META:
      return vnodeGetBatchMeta(pVnode, pMsg);
    case TDMT_VND_VSUBTABLES_META:
      return vnodeGetVSubtablesMeta(pVnode, pMsg);
#ifdef TD_ENTERPRISE
    case TDMT_VND_QUERY_COMPACT_PROGRESS:
      return vnodeQueryCompactProgress(pVnode, pMsg);
#endif
      //    case TDMT_VND_TMQ_CONSUME:
      //      return tqProcessPollReq(pVnode->pTq, pMsg);
#ifdef USE_TQ
    case TDMT_VND_TMQ_VG_WALINFO:
      return tqProcessVgWalInfoReq(pVnode->pTq, pMsg);
    case TDMT_VND_TMQ_VG_COMMITTEDINFO:
      return tqProcessVgCommittedInfoReq(pVnode->pTq, pMsg);
    case TDMT_VND_TMQ_SEEK:
      return tqProcessSeekReq(pVnode->pTq, pMsg);
#endif
    default:
      vError("unknown msg type:%d in fetch queue", pMsg->msgType);
      return TSDB_CODE_APP_ERROR;
  }
}
949
#ifdef USE_STREAM
950
// Dispatch one message taken from the vnode stream queue.
// Returns the result of the matching tq handler, 0 after a redirect, or
// TSDB_CODE_APP_ERROR for a message type this queue does not handle.
int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
  vTrace("vgId:%d, msg:%p in stream queue is processing", pVnode->config.vgId, pMsg);

  // todo: NOTE: some command needs to run on follower, such as, stop_all_tasks
  bool requiresReadReady = false;
  switch (pMsg->msgType) {
    case TDMT_SCH_FETCH:
    case TDMT_VND_TABLE_META:
    case TDMT_VND_TABLE_CFG:
    case TDMT_VND_BATCH_META:
      requiresReadReady = true;
      break;
    default:
      break;
  }

  if (requiresReadReady && !syncIsReadyForRead(pVnode->sync)) {
    vnodeRedirectRpcMsg(pVnode, pMsg, terrno);
    return 0;
  }

  switch (pMsg->msgType) {
    case TDMT_STREAM_TASK_RUN:
      return tqProcessTaskRunReq(pVnode->pTq, pMsg);
    case TDMT_STREAM_RETRIEVE:
      return tqProcessTaskRetrieveReq(pVnode->pTq, pMsg);
    case TDMT_STREAM_RETRIEVE_RSP:
      return tqProcessTaskRetrieveRsp(pVnode->pTq, pMsg);
    case TDMT_VND_GET_STREAM_PROGRESS:
      return tqStreamProgressRetrieveReq(pVnode->pTq, pMsg);
    default:
      vError("unknown msg type:%d in stream queue", pMsg->msgType);
      return TSDB_CODE_APP_ERROR;
  }
}
975

976
// Dispatch one message taken from the vnode stream control queue.
// Returns the result of the matching tq handler, 0 after a redirect, or
// TSDB_CODE_APP_ERROR for a message type this queue does not handle.
int32_t vnodeProcessStreamCtrlMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
  vTrace("vgId:%d, msg:%p in stream ctrl queue is processing", pVnode->config.vgId, pMsg);

  bool requiresReadReady = false;
  switch (pMsg->msgType) {
    case TDMT_SCH_FETCH:
    case TDMT_VND_TABLE_META:
    case TDMT_VND_TABLE_CFG:
    case TDMT_VND_BATCH_META:
      requiresReadReady = true;
      break;
    default:
      break;
  }

  if (requiresReadReady && !syncIsReadyForRead(pVnode->sync)) {
    vnodeRedirectRpcMsg(pVnode, pMsg, terrno);
    return 0;
  }

  STQ *pTq = pVnode->pTq;
  switch (pMsg->msgType) {
    case TDMT_MND_STREAM_HEARTBEAT_RSP:
      return tqProcessStreamHbRsp(pTq, pMsg);
    case TDMT_STREAM_TASK_DISPATCH:
      return tqProcessTaskDispatchReq(pTq, pMsg);
    case TDMT_STREAM_TASK_DISPATCH_RSP:
      return tqProcessTaskDispatchRsp(pTq, pMsg);
    case TDMT_VND_STREAM_TASK_CHECK:
      return tqProcessTaskCheckReq(pTq, pMsg);
    case TDMT_VND_STREAM_TASK_CHECK_RSP:
      return tqProcessTaskCheckRsp(pTq, pMsg);
    case TDMT_STREAM_TASK_CHECKPOINT_READY:
      return tqProcessTaskCheckpointReadyMsg(pTq, pMsg);
    case TDMT_STREAM_TASK_CHECKPOINT_READY_RSP:
      return tqProcessTaskCheckpointReadyRsp(pTq, pMsg);
    case TDMT_STREAM_RETRIEVE_TRIGGER:
      return tqProcessTaskRetrieveTriggerReq(pTq, pMsg);
    case TDMT_STREAM_RETRIEVE_TRIGGER_RSP:
      return tqProcessTaskRetrieveTriggerRsp(pTq, pMsg);
    case TDMT_MND_STREAM_REQ_CHKPT_RSP:
      return tqProcessStreamReqCheckpointRsp(pTq, pMsg);
    case TDMT_MND_STREAM_CHKPT_REPORT_RSP:
      return tqProcessTaskChkptReportRsp(pTq, pMsg);
    default:
      vError("unknown msg type:%d in stream ctrl queue", pMsg->msgType);
      return TSDB_CODE_APP_ERROR;
  }
}
1013

1014
// Dispatch one message taken from the vnode stream long-exec queue.
// Every message on this queue requires the sync layer to be ready for reads; otherwise the
// message is redirected and 0 is returned. Only TDMT_VND_STREAM_SCAN_HISTORY is handled.
int32_t vnodeProcessStreamLongExecMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
  vTrace("vgId:%d, msg:%p in stream long exec queue is processing", pVnode->config.vgId, pMsg);

  bool readable = syncIsReadyForRead(pVnode->sync);
  if (!readable) {
    vnodeRedirectRpcMsg(pVnode, pMsg, terrno);
    return 0;
  }

  if (pMsg->msgType == TDMT_VND_STREAM_SCAN_HISTORY) {
    return tqProcessTaskScanHistory(pVnode->pTq, pMsg);
  }

  vError("unknown msg type:%d in stream long exec queue", pMsg->msgType);
  return TSDB_CODE_APP_ERROR;
}
1029

1030
// Dispatch one message taken from the vnode stream checkpoint queue.
// Only TDMT_STREAM_CHKPT_EXEC is handled (via tqProcessTaskRunReq); any other type yields
// TSDB_CODE_APP_ERROR. Fetch/meta style types are redirected first when the vnode is not readable.
int32_t vnodeProcessStreamChkptMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
  vTrace("vgId:%d, msg:%p in stream chkpt queue is processing", pVnode->config.vgId, pMsg);

  bool requiresReadReady = false;
  switch (pMsg->msgType) {
    case TDMT_SCH_FETCH:
    case TDMT_VND_TABLE_META:
    case TDMT_VND_TABLE_CFG:
    case TDMT_VND_BATCH_META:
      requiresReadReady = true;
      break;
    default:
      break;
  }

  if (requiresReadReady && !syncIsReadyForRead(pVnode->sync)) {
    vnodeRedirectRpcMsg(pVnode, pMsg, terrno);
    return 0;
  }

  if (pMsg->msgType == TDMT_STREAM_CHKPT_EXEC) {
    return tqProcessTaskRunReq(pVnode->pTq, pMsg);
  }

  vError("unknown msg type:%d in stream chkpt queue", pMsg->msgType);
  return TSDB_CODE_APP_ERROR;
}
1047
#endif
1048

UNCOV
1049
void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) {
×
UNCOV
1050
  int32_t code = tdProcessTSmaInsert(((SVnode *)pVnode)->pSma, smaId, (const char *)data);
×
UNCOV
1051
  if (code) {
×
1052
    vError("failed to process sma result since %s", tstrerror(code));
×
1053
  }
UNCOV
1054
}
×
1055

1056
// Fill the vnode-level fields (database name/id, vgroup id, timestamp precision) of a
// table-meta response from the vnode configuration. A NULL response is ignored.
void vnodeUpdateMetaRsp(SVnode *pVnode, STableMetaRsp *pMetaRsp) {
  if (pMetaRsp != NULL) {
    tstrncpy(pMetaRsp->dbFName, pVnode->config.dbname, TSDB_DB_FNAME_LEN);
    pMetaRsp->dbId = pVnode->config.dbId;
    pMetaRsp->vgId = TD_VID(pVnode);
    pMetaRsp->precision = pVnode->config.tsdbCfg.precision;
  }
}
1066

1067
extern int32_t vnodeAsyncRetention(SVnode *pVnode, int64_t now);
1068

1069
// Handle a TRIM request: decode it and schedule an asynchronous retention run.
// Returns 0 when the request is ignored because the vnode is still restoring,
// TSDB_CODE_INVALID_MSG when the payload cannot be decoded, otherwise the result of
// vnodeAsyncRetention().
static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
  if (!pVnode->restored) {
    vInfo("vgId:%d, ignore trim req during restoring. ver:%" PRId64, TD_VID(pVnode), ver);
    return 0;
  }

  SVTrimDbReq trimReq = {0};

  // decode
  if (tDeserializeSVTrimDbReq(pReq, len, &trimReq) != 0) {
    return TSDB_CODE_INVALID_MSG;
  }

  vInfo("vgId:%d, trim vnode request will be processed, time:%d", pVnode->config.vgId, trimReq.timestamp);

  return vnodeAsyncRetention(pVnode, trimReq.timestamp);
}
1091

1092
extern int32_t vnodeAsyncS3Migrate(SVnode *pVnode, int64_t now);
1093

1094
// Handle an S3 migrate request: decode it and schedule an asynchronous S3 migration.
// Returns TSDB_CODE_INVALID_MSG when the payload cannot be decoded, otherwise the result of
// vnodeAsyncS3Migrate().
static int32_t vnodeProcessS3MigrateReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SVS3MigrateDbReq s3migrateReq = {0};

  // decode
  if (tDeserializeSVS3MigrateDbReq(pReq, len, &s3migrateReq) != 0) {
    return TSDB_CODE_INVALID_MSG;
  }

  vInfo("vgId:%d, s3migrate vnode request will be processed, time:%d", pVnode->config.vgId, s3migrateReq.timestamp);

  return vnodeAsyncS3Migrate(pVnode, s3migrateReq.timestamp);
}
1111

1112
// Handle a "drop TTL-expired tables" request.
//
// Decodes the request, drops every listed table uid from the meta store and then removes the
// uids from the TQ table list.
//
// Returns 0 on success, TSDB_CODE_INVALID_MSG when the payload cannot be decoded or its uid
// count does not match the decoded array, or the error code of metaDropMultipleTables().
// A failure of tqUpdateTbUidList() is only logged and does not change the return value.
// The decoded uid array is released on every path.
static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
  int32_t           ret = 0;
  SVDropTtlTableReq ttlReq = {0};

  if (tDeserializeSVDropTtlTableReq(pReq, len, &ttlReq) != 0) {
    ret = TSDB_CODE_INVALID_MSG;
    goto end;
  }

  if (ttlReq.nUids != taosArrayGetSize(ttlReq.pTbUids)) {
    ret = TSDB_CODE_INVALID_MSG;
    goto end;
  }

  if (ttlReq.nUids != 0) {
    vInfo("vgId:%d, drop ttl table req will be processed, time:%d, ntbUids:%d", pVnode->config.vgId,
          ttlReq.timestampSec, ttlReq.nUids);
  }

  if (ttlReq.nUids > 0) {
    // go through the common cleanup on failure so that ttlReq.pTbUids is not leaked
    ret = metaDropMultipleTables(pVnode->pMeta, ver, ttlReq.pTbUids);
    if (ret) {
      goto end;
    }

    // best effort: the tables are already dropped, so a TQ update failure is only reported
    int32_t code = tqUpdateTbUidList(pVnode->pTq, ttlReq.pTbUids, false);
    if (code) {
      vError("vgId:%d, failed to update tbUid list since %s", TD_VID(pVnode), tstrerror(code));
    }
  }

end:
  taosArrayDestroy(ttlReq.pTbUids);
  return ret;
}
1144

1145
// Build the response for a "fetch TTL-expired tables" request.
//
// For every uid in the request the table entry is looked up in the meta store and a
// SVDropTbReq (name, suid, igNotExists = true) is appended to the response, which is then
// encoded into pRsp->pCont.
//
// Returns 0 when the response was encoded successfully and -1 otherwise. pRsp->code is set to
// the value of terrno at the end of the function in both cases.
static int32_t vnodeProcessFetchTtlExpiredTbs(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
  int32_t                 code = -1;
  SMetaReader             mr = {0};
  SVDropTtlTableReq       ttlReq = {0};
  SVFetchTtlExpiredTbsRsp rsp = {0};
  SEncoder                encoder = {0};
  SArray                 *pNames = NULL;
  pRsp->msgType = TDMT_VND_FETCH_TTL_EXPIRED_TBS_RSP;
  pRsp->code = TSDB_CODE_SUCCESS;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  if (tDeserializeSVDropTtlTableReq(pReq, len, &ttlReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    goto _end;
  }

  if (!(ttlReq.nUids == taosArrayGetSize(ttlReq.pTbUids))) {
    terrno = TSDB_CODE_INVALID_MSG;
    goto _end;
  }

  SVDropTbReq expiredTb = {.igNotExists = true};
  metaReaderDoInit(&mr, pVnode->pMeta, 0);
  rsp.vgId = TD_VID(pVnode);
  rsp.pExpiredTbs = taosArrayInit(ttlReq.nUids, sizeof(SVDropTbReq));
  if (!rsp.pExpiredTbs) goto _end;

  // pNames owns the storage of every table name referenced by rsp.pExpiredTbs; it must stay
  // alive until the response has been encoded.
  pNames = taosArrayInit(ttlReq.nUids, TSDB_TABLE_NAME_LEN);
  if (!pNames) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _end;
  }

  char buf[TSDB_TABLE_NAME_LEN];
  for (int32_t i = 0; i < ttlReq.nUids; ++i) {
    tb_uid_t *uid = taosArrayGet(ttlReq.pTbUids, i);
    expiredTb.suid = *uid;
    terrno = metaReaderGetTableEntryByUid(&mr, *uid);
    if (terrno < 0) goto _end;

    tstrncpy(buf, mr.me.name, TSDB_TABLE_NAME_LEN);
    // NOTE(review): expiredTb.name points into pNames' element storage; this presumably relies
    // on pNames being created with room for nUids elements so it never reallocates - confirm.
    void *p = taosArrayPush(pNames, buf);
    if (p == NULL) {
      goto _end;
    }

    expiredTb.name = p;
    if (mr.me.type == TSDB_CHILD_TABLE) {
      // a child table reports the uid of its super table instead of its own uid
      expiredTb.suid = mr.me.ctbEntry.suid;
    }

    if (taosArrayPush(rsp.pExpiredTbs, &expiredTb) == NULL) {
      goto _end;
    }
  }

  int32_t ret = 0;
  tEncodeSize(tEncodeVFetchTtlExpiredTbsRsp, &rsp, pRsp->contLen, ret);
  pRsp->pCont = rpcMallocCont(pRsp->contLen);
  if (pRsp->pCont == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _end;
  }
  tEncoderInit(&encoder, pRsp->pCont, pRsp->contLen);
  terrno = tEncodeVFetchTtlExpiredTbsRsp(&encoder, &rsp);
  tEncoderClear(&encoder);

  if (terrno == 0) code = 0;

_end:
  metaReaderClear(&mr);
  tFreeFetchTtlExpiredTbsRsp(&rsp);
  taosArrayDestroy(ttlReq.pTbUids);
  if (pNames) taosArrayDestroy(pNames);
  pRsp->code = terrno;
  return code;
}
1223

1224
// Handle a CREATE STABLE request: decode it, create the super table in the meta store and
// register it with the rollup-SMA module.
// Returns 0 on success; on failure the error code is returned and also stored in pRsp->code.
static int32_t vnodeProcessCreateStbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SVCreateStbReq req = {0};
  SDecoder       coder;

  pRsp->msgType = TDMT_VND_CREATE_STB_RSP;
  pRsp->code = TSDB_CODE_SUCCESS;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  // decode and process req; each step runs only if the previous one succeeded
  tDecoderInit(&coder, pReq, len);

  int32_t code = tDecodeSVCreateStbReq(&coder, &req);
  if (code == 0) {
    code = metaCreateSuperTable(pVnode->pMeta, ver, &req);
  }
  if (code == 0) {
    // only a negative result of the rsma step counts as a failure
    int32_t smaCode = tdProcessRSmaCreate(pVnode->pSma, &req);
    code = (smaCode < 0) ? smaCode : 0;
  }

  if (code != 0) {
    pRsp->code = code;
  }

  tDecoderClear(&coder);
  return code;
}
1261

1262
// Handle a batched CREATE TABLE request.
//
// Every sub-request is validated against the vnode hash range and created in the meta store;
// the per-table result is collected into a SVCreateTbBatchRsp that is encoded into
// pRsp->pCont. Newly created uids are added to the TQ and rollup-SMA table lists, and, when
// auditing is enabled, a single "createTable" audit record listing all requested table names
// is emitted on behalf of pOriginRpc.
//
// Returns 0 when the batch was processed (individual failures are reported inside the
// response) and -1 with terrno set when the request cannot be decoded or memory runs out.
static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp,
                                       SRpcMsg *pOriginRpc) {
  SDecoder           decoder = {0};
  SEncoder           encoder = {0};
  int32_t            rcode = 0;
  SVCreateTbBatchReq req = {0};
  SVCreateTbReq     *pCreateReq;
  SVCreateTbBatchRsp rsp = {0};
  SVCreateTbRsp      cRsp = {0};
  char               tbName[TSDB_TABLE_FNAME_LEN];
  STbUidStore       *pStore = NULL;
  SArray            *tbUids = NULL;
  SArray            *tbNames = NULL;
  pRsp->msgType = TDMT_VND_CREATE_TABLE_RSP;
  pRsp->code = TSDB_CODE_SUCCESS;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  // decode
  tDecoderInit(&decoder, pReq, len);
  if (tDecodeSVCreateTbBatchReq(&decoder, &req) < 0) {
    rcode = -1;
    terrno = TSDB_CODE_INVALID_MSG;
    goto _exit;
  }

  rsp.pArray = taosArrayInit(req.nReqs, sizeof(cRsp));
  tbUids = taosArrayInit(req.nReqs, sizeof(int64_t));
  tbNames = taosArrayInit(req.nReqs, sizeof(char *));
  if (rsp.pArray == NULL || tbUids == NULL || tbNames == NULL) {
    rcode = -1;
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

  // loop to create table
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    pCreateReq = req.pReqs + iReq;
    memset(&cRsp, 0, sizeof(cRsp));

    if (tsEnableAudit && tsEnableAuditCreateTable) {
      // remember the requested name for the audit record built after the loop
      char *str = taosMemoryCalloc(1, TSDB_TABLE_FNAME_LEN);
      if (str == NULL) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        rcode = -1;
        goto _exit;
      }
      tstrncpy(str, pCreateReq->name, TSDB_TABLE_FNAME_LEN);
      if (taosArrayPush(tbNames, &str) == NULL) {
        // the array did not take ownership of str, so release it here
        taosMemoryFree(str);
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        rcode = -1;
        goto _exit;
      }
    }

    // validate hash
    (void)tsnprintf(tbName, TSDB_TABLE_FNAME_LEN, "%s.%s", pVnode->config.dbname, pCreateReq->name);
    if (vnodeValidateTableHash(pVnode, tbName) < 0) {
      cRsp.code = TSDB_CODE_VND_HASH_MISMATCH;
      if (taosArrayPush(rsp.pArray, &cRsp) == NULL) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        rcode = -1;
        goto _exit;
      }
      vError("vgId:%d create-table:%s failed due to hash value mismatch", TD_VID(pVnode), tbName);
      continue;
    }

    // do create table
    if (metaCreateTable2(pVnode->pMeta, ver, pCreateReq, &cRsp.pMeta) < 0) {
      // "already exists" is not an error when the request carries IF NOT EXISTS
      if (pCreateReq->flags & TD_CREATE_IF_NOT_EXISTS && terrno == TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
        cRsp.code = TSDB_CODE_SUCCESS;
      } else {
        cRsp.code = terrno;
      }
    } else {
      cRsp.code = TSDB_CODE_SUCCESS;
      if (tdFetchTbUidList(pVnode->pSma, &pStore, pCreateReq->ctb.suid, pCreateReq->uid) < 0) {
        vError("vgId:%d, failed to fetch tbUid list", TD_VID(pVnode));
      }
      if (taosArrayPush(tbUids, &pCreateReq->uid) == NULL) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        rcode = -1;
        goto _exit;
      }
      vnodeUpdateMetaRsp(pVnode, cRsp.pMeta);
    }

    if (taosArrayPush(rsp.pArray, &cRsp) == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      rcode = -1;
      goto _exit;
    }
  }

  vDebug("vgId:%d, add %d new created tables into query table list", TD_VID(pVnode), (int32_t)taosArrayGetSize(tbUids));
  if (tqUpdateTbUidList(pVnode->pTq, tbUids, true) < 0) {
    vError("vgId:%d, failed to update tbUid list since %s", TD_VID(pVnode), tstrerror(terrno));
  }
  if (tdUpdateTbUidList(pVnode->pSma, pStore, true) < 0) {
    // NOTE(review): this path leaves rcode at 0 and sends no response body - confirm intended
    goto _exit;
  }

  // prepare rsp
  int32_t ret = 0;
  tEncodeSize(tEncodeSVCreateTbBatchRsp, &rsp, pRsp->contLen, ret);
  pRsp->pCont = rpcMallocCont(pRsp->contLen);
  if (pRsp->pCont == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    rcode = -1;
    goto _exit;
  }
  tEncoderInit(&encoder, pRsp->pCont, pRsp->contLen);
  if (tEncodeSVCreateTbBatchRsp(&encoder, &rsp) != 0) {
    vError("vgId:%d, failed to encode create table batch response", TD_VID(pVnode));
  }

  if (tsEnableAudit && tsEnableAuditCreateTable) {
    int64_t clusterId = pVnode->config.syncCfg.nodeInfo[0].clusterId;

    SName name = {0};
    if (tNameFromString(&name, pVnode->config.dbname, T_NAME_ACCT | T_NAME_DB) < 0) {
      vError("vgId:%d, failed to get name from string", TD_VID(pVnode));
    }

    // join all requested table names with ',' into one audit detail string
    SStringBuilder sb = {0};
    for (int32_t i = 0; i < tbNames->size; i++) {
      char **key = (char **)taosArrayGet(tbNames, i);
      taosStringBuilderAppendStringLen(&sb, *key, strlen(*key));
      if (i < tbNames->size - 1) {
        taosStringBuilderAppendChar(&sb, ',');
      }
      // the name strings are not freed here; presumably taosArrayDestroyP() at _exit releases
      // them - TODO confirm
    }

    size_t len = 0;
    char  *keyJoined = taosStringBuilderGetResult(&sb, &len);

    if (pOriginRpc->info.conn.user != NULL && strlen(pOriginRpc->info.conn.user) > 0) {
      auditAddRecord(pOriginRpc, clusterId, "createTable", name.dbname, "", keyJoined, len);
    }

    taosStringBuilderDestroy(&sb);
  }

_exit:
  // released here (not only on the success path) so that early exits do not leak the store
  pStore = tdUidStoreFree(pStore);
  tDeleteSVCreateTbBatchReq(&req);
  taosArrayDestroyEx(rsp.pArray, tFreeSVCreateTbRsp);
  taosArrayDestroy(tbUids);
  tDecoderClear(&decoder);
  tEncoderClear(&encoder);
  taosArrayDestroyP(tbNames, NULL);
  return rcode;
}
1417

1418
// Handle an ALTER STABLE request: decode it and apply the change to the meta store.
// Returns 0 on success; on failure (decode or alter) the error code is returned and also
// stored in pRsp->code, matching the behaviour of the CREATE STABLE handler.
static int32_t vnodeProcessAlterStbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
  int32_t        code = 0;
  SVCreateStbReq req = {0};
  SDecoder       dc = {0};

  pRsp->msgType = TDMT_VND_ALTER_STB_RSP;
  pRsp->code = TSDB_CODE_SUCCESS;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  tDecoderInit(&dc, pReq, len);

  // decode req, then alter only if decoding succeeded
  code = tDecodeSVCreateStbReq(&dc, &req);
  if (code == 0) {
    code = metaAlterSuperTable(pVnode->pMeta, ver, &req);
  }

  if (code != 0) {
    // report the failure to the requester instead of leaving the response at SUCCESS
    pRsp->code = code;
  }

  tDecoderClear(&dc);
  return code;
}
1448

1449
// Handle a DROP STABLE request: decode it, drop the super table from the meta store, update
// the TQ table list and drop the associated rollup-SMA state.
//
// The function itself always returns 0; the outcome of the request is reported through
// pRsp->code (TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_MSG, TSDB_CODE_OUT_OF_MEMORY or the terrno
// of the failing step).
static int32_t vnodeProcessDropStbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SVDropStbReq req = {0};
  int32_t      rcode = TSDB_CODE_SUCCESS;
  SDecoder     decoder = {0};
  SArray      *tbUidList = NULL;

  pRsp->msgType = TDMT_VND_DROP_STB_RSP;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  // decode request
  tDecoderInit(&decoder, pReq, len);
  if (tDecodeSVDropStbReq(&decoder, &req) < 0) {
    rcode = TSDB_CODE_INVALID_MSG;
    goto _exit;
  }

  // process request
  tbUidList = taosArrayInit(8, sizeof(int64_t));
  if (tbUidList == NULL) {
    // do not report success when nothing was dropped
    rcode = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }
  if (metaDropSuperTable(pVnode->pMeta, ver, &req) < 0) {
    rcode = terrno;
    goto _exit;
  }

  // NOTE(review): tbUidList is never filled in this function, so an empty list is passed on
  if (tqUpdateTbUidList(pVnode->pTq, tbUidList, false) < 0) {
    rcode = terrno;
    goto _exit;
  }

  if (tdProcessRSmaDrop(pVnode->pSma, &req) < 0) {
    rcode = terrno;
    goto _exit;
  }

  // return rsp
_exit:
  if (tbUidList) taosArrayDestroy(tbUidList);
  pRsp->code = rcode;
  tDecoderClear(&decoder);
  return 0;
}
1491

1492
/**
 * @brief Handle an "alter table" request and build the encoded SVAlterTbRsp.
 *
 * Decodes the request, applies it through metaAlterTable and, for tag-value updates,
 * removes and re-adds the table uid in the TQ table list. Failures of decode/alter are
 * reported in the encoded response body (vAlterTbRsp.code); TQ refresh failures are only
 * logged. The function always returns 0.
 *
 * @param pVnode vnode whose pMeta / pTq handles are used
 * @param ver    version forwarded to metaAlterTable
 * @param pReq   encoded SVAlterTbReq buffer
 * @param len    size of pReq in bytes
 * @param pRsp   response; pCont is allocated with rpcMallocCont and filled with the encoded reply
 * @return int32_t always 0
 */
static int32_t vnodeProcessAlterTbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SVAlterTbReq  vAlterTbReq = {0};
  SVAlterTbRsp  vAlterTbRsp = {0};
  SDecoder      dc = {0};
  int32_t       code = 0;
  int32_t       lino = 0;
  int32_t       ret;
  SEncoder      ec = {0};
  STableMetaRsp vMetaRsp = {0};
  SArray       *tbUids = NULL;  // function scope so that every exit path releases it

  pRsp->msgType = TDMT_VND_ALTER_TABLE_RSP;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;
  pRsp->code = TSDB_CODE_SUCCESS;

  tDecoderInit(&dc, pReq, len);

  // decode
  if (tDecodeSVAlterTbReq(&dc, &vAlterTbReq) < 0) {
    vAlterTbRsp.code = TSDB_CODE_INVALID_MSG;
    tDecoderClear(&dc);
    goto _exit;
  }

  // process
  if (metaAlterTable(pVnode->pMeta, ver, &vAlterTbReq, &vMetaRsp) < 0) {
    vAlterTbRsp.code = terrno;
    tDecoderClear(&dc);
    goto _exit;
  }
  tDecoderClear(&dc);

  if (NULL != vMetaRsp.pSchemas) {
    vnodeUpdateMetaRsp(pVnode, &vMetaRsp);
    vAlterTbRsp.pMeta = &vMetaRsp;
  }

  if (vAlterTbReq.action == TSDB_ALTER_TABLE_UPDATE_TAG_VAL ||
      vAlterTbReq.action == TSDB_ALTER_TABLE_UPDATE_MULTI_TAG_VAL) {
    int64_t uid = metaGetTableEntryUidByName(pVnode->pMeta, vAlterTbReq.tbName);
    if (uid == 0) {
      vError("vgId:%d, %s failed at %s:%d since table %s not found", TD_VID(pVnode), __func__, __FILE__, __LINE__,
             vAlterTbReq.tbName);
      goto _exit;
    }

    // The alter itself already succeeded; refreshing the TQ list is best effort.
    tbUids = taosArrayInit(4, sizeof(int64_t));
    TSDB_CHECK_NULL(tbUids, code, lino, _exit, terrno);
    void *p = taosArrayPush(tbUids, &uid);
    TSDB_CHECK_NULL(p, code, lino, _exit, terrno);  // tbUids is released at _exit (used to leak here)

    vDebug("vgId:%d, remove tags value altered table:%s from query table list", TD_VID(pVnode), vAlterTbReq.tbName);
    if ((code = tqUpdateTbUidList(pVnode->pTq, tbUids, false)) < 0) {
      vError("vgId:%d, failed to remove tbUid list since %s", TD_VID(pVnode), tstrerror(code));
    }

    vDebug("vgId:%d, try to add table:%s in query table list", TD_VID(pVnode), vAlterTbReq.tbName);
    if ((code = tqUpdateTbUidList(pVnode->pTq, tbUids, true)) < 0) {
      vError("vgId:%d, failed to add tbUid list since %s", TD_VID(pVnode), tstrerror(code));
    }
  }

_exit:
  taosArrayDestroy(tbUids);
  taosArrayDestroy(vAlterTbReq.pMultiTag);

  // The reply is always encoded, including on failure, so the caller receives vAlterTbRsp.code.
  tEncodeSize(tEncodeSVAlterTbRsp, &vAlterTbRsp, pRsp->contLen, ret);
  pRsp->pCont = rpcMallocCont(pRsp->contLen);
  tEncoderInit(&ec, pRsp->pCont, pRsp->contLen);
  if (tEncodeSVAlterTbRsp(&ec, &vAlterTbRsp) != 0) {
    vError("vgId:%d, failed to encode alter table response", TD_VID(pVnode));
  }
  tEncoderClear(&ec);

  // vMetaRsp buffers were referenced by the encoded reply above; free them only now.
  if (vMetaRsp.pSchemas) {
    taosMemoryFree(vMetaRsp.pSchemas);
    taosMemoryFree(vMetaRsp.pSchemaExt);
  }
  if (vMetaRsp.pColRefs) {
    taosMemoryFree(vMetaRsp.pColRefs);
  }
  return 0;
}
1573

1574
/**
 * @brief Handle a batched "drop table" request and build the encoded SVDropTbBatchRsp.
 *
 * Each entry of the batch is dropped through metaDropTable2 and its per-table result code is
 * appended to the response array. When tsEnableAuditCreateTable is set, the dropped table
 * names are collected, joined with ',' and written as one audit record. The function always
 * returns 0; request-level failures are reported through pRsp->code.
 *
 * @param pVnode     vnode whose pMeta / pTq / pSma handles and config are used
 * @param ver        version forwarded to metaDropTable2
 * @param pReq       encoded SVDropTbBatchReq buffer
 * @param len        size of pReq in bytes
 * @param pRsp       response; pCont is allocated with rpcMallocCont and filled with the reply
 * @param pOriginRpc original RPC message; its conn.user decides whether an audit record is added
 * @return int32_t always 0
 */
static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp,
                                     SRpcMsg *pOriginRpc) {
  SVDropTbBatchReq req = {0};
  SVDropTbBatchRsp rsp = {0};
  SDecoder         decoder = {0};
  SEncoder         encoder = {0};
  int32_t          ret;
  SArray          *tbUids = NULL;
  STbUidStore     *pStore = NULL;
  SArray          *tbNames = NULL;  // array of heap-allocated char*; elements are owned by this function

  pRsp->msgType = TDMT_VND_DROP_TABLE_RSP;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;
  pRsp->code = TSDB_CODE_SUCCESS;

  // decode req
  tDecoderInit(&decoder, pReq, len);
  ret = tDecodeSVDropTbBatchReq(&decoder, &req);
  if (ret < 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    pRsp->code = terrno;
    goto _exit;
  }

  // process req
  tbUids = taosArrayInit(req.nReqs, sizeof(int64_t));
  rsp.pArray = taosArrayInit(req.nReqs, sizeof(SVDropTbRsp));
  tbNames = taosArrayInit(req.nReqs, sizeof(char *));
  if (tbUids == NULL || rsp.pArray == NULL || tbNames == NULL) {
    // Previously this path replied with TSDB_CODE_SUCCESS although nothing was processed.
    pRsp->code = terrno;
    goto _exit;
  }

  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    SVDropTbReq *pDropTbReq = req.pReqs + iReq;
    SVDropTbRsp  dropTbRsp = {0};
    tb_uid_t     tbUid = 0;

    ret = metaDropTable2(pVnode->pMeta, ver, pDropTbReq);
    if (ret < 0) {
      // "IF EXISTS" semantics: a missing table is not an error when igNotExists is set.
      if (pDropTbReq->igNotExists && terrno == TSDB_CODE_TDB_TABLE_NOT_EXIST) {
        dropTbRsp.code = TSDB_CODE_SUCCESS;
      } else {
        dropTbRsp.code = terrno;
      }
    } else {
      dropTbRsp.code = TSDB_CODE_SUCCESS;
      // NOTE(review): tbUid is never assigned after its initialisation to 0, so this branch is
      // unreachable and neither pStore nor tbUids is ever populated -- confirm intended behavior.
      if (tbUid > 0) {
        if (tdFetchTbUidList(pVnode->pSma, &pStore, pDropTbReq->suid, tbUid) < 0) {
          vError("vgId:%d, failed to fetch tbUid list", TD_VID(pVnode));
        }
      }
    }

    if (taosArrayPush(rsp.pArray, &dropTbRsp) == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      pRsp->code = terrno;
      goto _exit;
    }

    if (tsEnableAuditCreateTable) {
      char *str = taosMemoryCalloc(1, TSDB_TABLE_FNAME_LEN);
      if (str == NULL) {
        pRsp->code = terrno;
        goto _exit;
      }
      tstrncpy(str, pDropTbReq->name, TSDB_TABLE_FNAME_LEN);
      if (taosArrayPush(tbNames, &str) == NULL) {
        taosMemoryFree(str);  // not stored in tbNames, so it must be released here
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        pRsp->code = terrno;
        goto _exit;
      }
    }
  }

  if (tqUpdateTbUidList(pVnode->pTq, tbUids, false) < 0) {
    vError("vgId:%d, failed to update tbUid list since %s", TD_VID(pVnode), tstrerror(terrno));
  }

  if (tdUpdateTbUidList(pVnode->pSma, pStore, false) < 0) {
    goto _exit;
  }

  if (tsEnableAuditCreateTable) {
    int64_t clusterId = pVnode->config.syncCfg.nodeInfo[0].clusterId;

    SName name = {0};
    if (tNameFromString(&name, pVnode->config.dbname, T_NAME_ACCT | T_NAME_DB) != 0) {
      vError("vgId:%d, failed to get name from string", TD_VID(pVnode));
    }

    // Join all table names with ',' into a single audit detail string.
    SStringBuilder sb = {0};
    for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
      char **key = (char **)taosArrayGet(tbNames, iReq);
      taosStringBuilderAppendStringLen(&sb, *key, strlen(*key));
      if (iReq < req.nReqs - 1) {
        taosStringBuilderAppendChar(&sb, ',');
      }
      taosMemoryFreeClear(*key);
    }

    size_t joinedLen = 0;  // renamed: used to shadow the `len` parameter
    char  *keyJoined = taosStringBuilderGetResult(&sb, &joinedLen);

    if (pOriginRpc->info.conn.user != NULL && strlen(pOriginRpc->info.conn.user) > 0) {
      auditAddRecord(pOriginRpc, clusterId, "dropTable", name.dbname, "", keyJoined, joinedLen);
    }

    taosStringBuilderDestroy(&sb);
  }

_exit:
  taosArrayDestroy(tbUids);
  pStore = tdUidStoreFree(pStore);
  tDecoderClear(&decoder);
  tEncodeSize(tEncodeSVDropTbBatchRsp, &rsp, pRsp->contLen, ret);
  pRsp->pCont = rpcMallocCont(pRsp->contLen);
  tEncoderInit(&encoder, pRsp->pCont, pRsp->contLen);
  if (tEncodeSVDropTbBatchRsp(&encoder, &rsp) != 0) {
    vError("vgId:%d, failed to encode drop table batch response", TD_VID(pVnode));
  }
  tEncoderClear(&encoder);
  taosArrayDestroy(rsp.pArray);

  // Release names that were collected but not consumed by the audit step (early exits).
  // Entries already consumed were reset to NULL by taosMemoryFreeClear above.
  for (int32_t i = 0; i < (int32_t)taosArrayGetSize(tbNames); ++i) {
    char **ppName = (char **)taosArrayGet(tbNames, i);
    if (ppName != NULL) {
      taosMemoryFreeClear(*ppName);
    }
  }
  taosArrayDestroy(tbNames);
  return 0;
}
1699

1700
#ifdef BUILD_NO_CALL
1701
/**
 * @brief Debug helper: print every row of one submit block to stdout.
 *
 * Looks up the schema for msgIter->suid at the schema version of the block's first row and
 * prints each row through tdSRowPrint, prefixed with "<tags>: uid <uid> ".
 *
 * @param pMeta   meta handle used for the schema lookup
 * @param pBlock  submit block to print
 * @param msgIter iterator positioned on pBlock (provides suid / uid)
 * @param tags    caller-supplied prefix for every printed line
 * @return int32_t 0 when the block has no rows or printing finished, otherwise the schema
 *                 lookup error code
 */
static int32_t vnodeDebugPrintSingleSubmitMsg(SMeta *pMeta, SSubmitBlk *pBlock, SSubmitMsgIter *msgIter,
                                              const char *tags) {
  SSubmitBlkIter blkIter = {0};
  STSchema      *pSchema = NULL;
  STSRow        *row = NULL;

  tInitSubmitBlkIter(msgIter, pBlock, &blkIter);
  if (blkIter.row == NULL) return 0;

  int32_t code = metaGetTbTSchemaNotNull(pMeta, msgIter->suid, TD_ROW_SVER(blkIter.row), 1,
                                         &pSchema);  // TODO: use the real schema
  if (TSDB_CODE_SUCCESS != code) {
    printf("%s:%d no valid schema\n", tags, __LINE__);
    return code;
  }

  // Per-row prefix; snprintf truncates silently if `tags` is too long for the buffer.
  char rowTags[128] = {0};
  snprintf(rowTags, sizeof(rowTags), "%s: uid %" PRIi64 " ", tags, msgIter->uid);
  while ((row = tGetSubmitBlkNext(&blkIter))) {
    tdSRowPrint(row, pSchema, rowTags);
  }

  taosMemoryFreeClear(pSchema);

  return TSDB_CODE_SUCCESS;
}
1732
#endif
1733
typedef struct SSubmitReqConvertCxt {
1734
  SSubmitMsgIter msgIter;
1735
  SSubmitBlk    *pBlock;
1736
  SSubmitBlkIter blkIter;
1737
  STSRow        *pRow;
1738
  STSRowIter     rowIter;
1739
  SSubmitTbData *pTbData;
1740
  STSchema      *pTbSchema;
1741
  SArray        *pColValues;
1742
} SSubmitReqConvertCxt;
1743

UNCOV
1744
/**
 * @brief Re-arm the conversion context for the block msgIter currently points at.
 *
 * Fetches the table schema, resets the row iterator, (re)creates the SSubmitTbData output
 * and rebuilds pColValues with one "none" SColVal per schema column.
 *
 * @param pMeta meta handle used for the schema lookup
 * @param pCxt  conversion context to reset
 * @return int32_t TSDB_CODE_SUCCESS, the schema lookup error, or terrno on allocation failure
 */
static int32_t vnodeResetTableCxt(SMeta *pMeta, SSubmitReqConvertCxt *pCxt) {
  // Drop the previous schema and load the one matching this block; a positive suid is
  // used for the lookup in preference to the table's own uid.
  taosMemoryFreeClear(pCxt->pTbSchema);
  int32_t code = metaGetTbTSchemaNotNull(pMeta, pCxt->msgIter.suid > 0 ? pCxt->msgIter.suid : pCxt->msgIter.uid,
                                         pCxt->msgIter.sversion, 1, &pCxt->pTbSchema);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  STSchema *pSchema = pCxt->pTbSchema;
  tdSTSRowIterInit(&pCxt->rowIter, pSchema);

  // Release whatever the previous table left behind, allocating the holder on first use.
  tDestroySubmitTbData(pCxt->pTbData, TSDB_MSG_FLG_ENCODE);
  if (pCxt->pTbData == NULL) {
    pCxt->pTbData = taosMemoryCalloc(1, sizeof(SSubmitTbData));
    if (pCxt->pTbData == NULL) {
      return terrno;
    }
  }

  SSubmitTbData *pTbData = pCxt->pTbData;
  pTbData->flags = 0;
  pTbData->suid = pCxt->msgIter.suid;
  pTbData->uid = pCxt->msgIter.uid;
  pTbData->sver = pCxt->msgIter.sversion;
  pTbData->pCreateTbReq = NULL;
  pTbData->aRowP = taosArrayInit(128, POINTER_BYTES);
  if (pTbData->aRowP == NULL) {
    return terrno;
  }

  // One placeholder value per column, in schema order.
  taosArrayDestroy(pCxt->pColValues);
  pCxt->pColValues = taosArrayInit(pSchema->numOfCols, sizeof(SColVal));
  if (pCxt->pColValues == NULL) {
    return terrno;
  }
  for (int32_t iCol = 0; iCol < pSchema->numOfCols; ++iCol) {
    SColVal placeholder = COL_VAL_NONE(pSchema->columns[iCol].colId, pSchema->columns[iCol].type);
    if (taosArrayPush(pCxt->pColValues, &placeholder) == NULL) {
      return terrno;
    }
  }

  return TSDB_CODE_SUCCESS;
}
1784

1785
/**
 * @brief Release everything owned by a conversion context.
 *
 * Frees the cached schema, the column value array and the in-progress SSubmitTbData
 * (its contents first, then the holder). The context struct itself is not freed.
 *
 * @param pCxt conversion context to clean up
 */
static void vnodeDestroySubmitReqConvertCxt(SSubmitReqConvertCxt *pCxt) {
  // per-table output: contents first, then the holder itself
  tDestroySubmitTbData(pCxt->pTbData, TSDB_MSG_FLG_ENCODE);
  taosMemoryFreeClear(pCxt->pTbData);

  // schema and scratch column values
  taosMemoryFreeClear(pCxt->pTbSchema);
  taosArrayDestroy(pCxt->pColValues);
}
×
1791

1792
/**
 * @brief Translate one legacy cell value into an SColVal.
 *
 * "None" and NULL cells only set the flag. Otherwise the payload is copied according to
 * the column type and the flag becomes CV_FLAG_VALUE.
 *
 * @param pCol     column definition (type decides how the payload is read)
 * @param pCellVal source cell
 * @param pColVal  destination value; flag and, when present, value are written
 * @return int32_t always TSDB_CODE_SUCCESS
 */
static int32_t vnodeCellValConvertToColVal(STColumn *pCol, SCellVal *pCellVal, SColVal *pColVal) {
  if (tdValTypeIsNone(pCellVal->valType)) {
    pColVal->flag = CV_FLAG_NONE;
  } else if (tdValTypeIsNull(pCellVal->valType)) {
    pColVal->flag = CV_FLAG_NULL;
  } else {
    if (IS_VAR_DATA_TYPE(pCol->type)) {
      // variable-length data: reference the bytes in place, no copy
      pColVal->value.nData = varDataLen(pCellVal->val);
      pColVal->value.pData = (uint8_t *)varDataVal(pCellVal->val);
    } else if (TSDB_DATA_TYPE_FLOAT == pCol->type) {
      float fval = GET_FLOAT_VAL(pCellVal->val);
      valueSetDatum(&pColVal->value, pCol->type, &fval, sizeof(fval));
    } else if (TSDB_DATA_TYPE_DOUBLE == pCol->type) {
      // doubles are moved as raw 64-bit patterns
      taosSetPInt64Aligned(&pColVal->value.val, (int64_t *)pCellVal->val);
    } else {
      valueSetDatum(&pColVal->value, pCol->type, pCellVal->val, tDataTypes[pCol->type].bytes);
    }
    pColVal->flag = CV_FLAG_VALUE;
  }

  return TSDB_CODE_SUCCESS;
}
1818

1819
/**
 * @brief Convert the context's current row (pCxt->pRow) into pCxt->pColValues.
 *
 * Walks the schema columns in order, fetching each cell from the row iterator and
 * converting it into the SColVal at the same index. Stops early, still reporting
 * success, when the iterator has no cell for a column.
 *
 * @param pCxt conversion context holding the row, schema and destination array
 * @return int32_t TSDB_CODE_SUCCESS or the first conversion error
 */
static int32_t vnodeTSRowConvertToColValArray(SSubmitReqConvertCxt *pCxt) {
  tdSTSRowIterReset(&pCxt->rowIter, pCxt->pRow);

  for (int32_t iCol = 0; iCol < pCxt->pTbSchema->numOfCols; ++iCol) {
    STColumn *pColumn = pCxt->pTbSchema->columns + iCol;
    SCellVal  cell = {0};

    if (!tdSTSRowIterFetch(&pCxt->rowIter, pColumn->colId, pColumn->type, &cell)) {
      break;
    }

    int32_t code = vnodeCellValConvertToColVal(pColumn, &cell, (SColVal *)taosArrayGet(pCxt->pColValues, iCol));
    if (TSDB_CODE_SUCCESS != code) {
      return code;
    }
  }

  return TSDB_CODE_SUCCESS;
}
1832

UNCOV
1833
/**
 * @brief Decode the create-table request embedded at the start of the current block, if any.
 *
 * A block carries such a request when msgIter.schemaLen is positive; the decoded
 * SVCreateTbReq is attached to pCxt->pTbData->pCreateTbReq.
 *
 * @param pCxt conversion context positioned on a block
 * @return int32_t TSDB_CODE_SUCCESS when nothing is embedded, terrno on allocation
 *                 failure, otherwise the decoder's result
 */
static int32_t vnodeDecodeCreateTbReq(SSubmitReqConvertCxt *pCxt) {
  const int32_t hasCreateReq = (pCxt->msgIter.schemaLen > 0);
  if (hasCreateReq) {
    pCxt->pTbData->pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
    if (pCxt->pTbData->pCreateTbReq == NULL) {
      return terrno;
    }

    SDecoder reqDecoder = {0};
    tDecoderInit(&reqDecoder, (uint8_t *)pCxt->pBlock->data, pCxt->msgIter.schemaLen);
    int32_t decodeCode = tDecodeSVCreateTbReq(&reqDecoder, pCxt->pTbData->pCreateTbReq);
    tDecoderClear(&reqDecoder);
    return decodeCode;
  }

  return TSDB_CODE_SUCCESS;
}
1850

1851
/**
 * @brief Convert a legacy SSubmitReq into the SSubmitReq2 representation.
 *
 * Iterates over every block of pReq; for each block the table context is reset, an embedded
 * create-table request is decoded, every row is rebuilt with tRowBuild and the resulting
 * SSubmitTbData is appended to pReq2->aSubmitTbData.
 *
 * On failure pReq2 may be partially filled; releasing it is left to the caller.
 *
 * @param pVnode vnode whose pMeta is used for schema lookups
 * @param pReq   legacy submit request
 * @param pReq2  output request; aSubmitTbData is created here
 * @return int32_t TSDB_CODE_SUCCESS or the first error encountered
 */
static int32_t vnodeSubmitReqConvertToSubmitReq2(SVnode *pVnode, SSubmitReq *pReq, SSubmitReq2 *pReq2) {
  pReq2->aSubmitTbData = taosArrayInit(128, sizeof(SSubmitTbData));
  if (NULL == pReq2->aSubmitTbData) {
    return terrno;
  }

  SSubmitReqConvertCxt cxt = {0};

  int32_t code = tInitSubmitMsgIter(pReq, &cxt.msgIter);
  while (TSDB_CODE_SUCCESS == code) {
    code = tGetSubmitMsgNext(&cxt.msgIter, &cxt.pBlock);
    if (TSDB_CODE_SUCCESS == code) {
      if (NULL == cxt.pBlock) {
        break;  // no more blocks
      }
      code = vnodeResetTableCxt(pVnode->pMeta, &cxt);
    }
    if (TSDB_CODE_SUCCESS == code) {
      code = tInitSubmitBlkIter(&cxt.msgIter, cxt.pBlock, &cxt.blkIter);
    }
    if (TSDB_CODE_SUCCESS == code) {
      code = vnodeDecodeCreateTbReq(&cxt);
    }
    while (TSDB_CODE_SUCCESS == code && (cxt.pRow = tGetSubmitBlkNext(&cxt.blkIter)) != NULL) {
      code = vnodeTSRowConvertToColValArray(&cxt);
      if (TSDB_CODE_SUCCESS == code) {
        SRow **pNewRow = taosArrayReserve(cxt.pTbData->aRowP, 1);
        if (NULL == pNewRow) {
          // Reserving the slot failed; never hand a NULL destination to tRowBuild.
          // NOTE(review): assumes taosArrayReserve sets terrno on failure like taosArrayPush -- confirm.
          code = terrno;
        } else {
          code = tRowBuild(cxt.pColValues, cxt.pTbSchema, pNewRow);
        }
      }
    }
    if (TSDB_CODE_SUCCESS == code) {
      code = (NULL == taosArrayPush(pReq2->aSubmitTbData, cxt.pTbData) ? terrno : TSDB_CODE_SUCCESS);
    }
    if (TSDB_CODE_SUCCESS == code) {
      // The element was copied into the array, so only the holder struct is released here.
      taosMemoryFreeClear(cxt.pTbData);
    }
  }

  vnodeDestroySubmitReqConvertCxt(&cxt);
  return code;
}
1892

1893
/**
 * @brief Encode an SSubmitReq2 into a newly allocated buffer.
 *
 * @param pSubmitReq request to encode
 * @param ppMsg      receives the allocated buffer on success (released by the caller with
 *                   taosMemoryFree); left untouched on failure
 * @return int32_t TSDB_CODE_SUCCESS, terrno on allocation failure, or the encoder's error
 */
static int32_t vnodeRebuildSubmitReqMsg(SSubmitReq2 *pSubmitReq, void **ppMsg) {
  int32_t  code = TSDB_CODE_SUCCESS;
  char    *pMsg = NULL;
  uint32_t msglen = 0;

  // First pass computes the encoded size, second pass writes into the buffer.
  tEncodeSize(tEncodeSubmitReq, pSubmitReq, msglen, code);
  if (TSDB_CODE_SUCCESS == code) {
    pMsg = taosMemoryMalloc(msglen);
    if (NULL == pMsg) {
      code = terrno;
    }
  }
  if (TSDB_CODE_SUCCESS == code) {
    SEncoder encoder = {0};
    tEncoderInit(&encoder, (uint8_t *)pMsg, msglen);
    code = tEncodeSubmitReq(&encoder, pSubmitReq);
    tEncoderClear(&encoder);
  }

  if (TSDB_CODE_SUCCESS == code) {
    *ppMsg = pMsg;
  } else {
    // The buffer used to leak when encoding failed after a successful allocation.
    taosMemoryFree(pMsg);
  }
  return code;
}
1915

1916
/**
 * @brief Handle a data submit (insert) request.
 *
 * Steps, in order:
 *   1. Obtain an SSubmitReq2: version-0 messages are converted and re-encoded, newer ones
 *      are decoded after the SSubmitReq2Msg header.
 *   2. Scan: reject requests whose rows are outside [minKey, maxKey] or not strictly
 *      ordered by row key.
 *   3. Validate table identity / schema version and column-format consistency.
 *   4. For every table: auto-create it if requested, insert its data and update its
 *      change time.
 *   5. Encode the SSubmitRsp2 into pRsp, update statistics / metrics and forward the
 *      request to RSMA on success.
 *
 * @param pVnode       vnode receiving the data
 * @param ver          version forwarded to meta / tsdb / rsma
 * @param pReq         raw request buffer (starts with SSubmitReq2Msg)
 * @param len          size of pReq in bytes
 * @param pRsp         response; code and the encoded SSubmitRsp2 are written
 * @param pOriginalMsg original RPC message; conn.user is used as a metric label
 * @return int32_t 0 on success, otherwise an error code (also stored in terrno)
 */
static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp,
                                     SRpcMsg *pOriginalMsg) {
  int32_t code = 0;
  terrno = 0;

  SSubmitReq2 *pSubmitReq = &(SSubmitReq2){0};
  SSubmitRsp2 *pSubmitRsp = &(SSubmitRsp2){0};
  SArray      *newTbUids = NULL;
  int32_t      ret;
  SEncoder     ec = {0};

  pRsp->code = TSDB_CODE_SUCCESS;

  void           *pAllocMsg = NULL;
  SSubmitReq2Msg *pMsg = (SSubmitReq2Msg *)pReq;
  if (0 == pMsg->version) {
    // legacy format: convert in memory, then re-encode so that pReq holds the new encoding
    code = vnodeSubmitReqConvertToSubmitReq2(pVnode, (SSubmitReq *)pMsg, pSubmitReq);
    if (TSDB_CODE_SUCCESS == code) {
      code = vnodeRebuildSubmitReqMsg(pSubmitReq, &pReq);
    }
    if (TSDB_CODE_SUCCESS == code) {
      pAllocMsg = pReq;
    }
    if (TSDB_CODE_SUCCESS != code) {
      goto _exit;
    }
  } else {
    // decode
    pReq = POINTER_SHIFT(pReq, sizeof(SSubmitReq2Msg));
    len -= sizeof(SSubmitReq2Msg);
    SDecoder dc = {0};
    tDecoderInit(&dc, pReq, len);
    if (tDecodeSubmitReq(&dc, pSubmitReq, NULL) < 0) {
      code = TSDB_CODE_INVALID_MSG;
      tDecoderClear(&dc);  // was skipped on this path, unlike the other handlers in this file
      goto _exit;
    }
    tDecoderClear(&dc);
  }

  // scan
  TSKEY now = taosGetTimestamp(pVnode->config.tsdbCfg.precision);
  TSKEY minKey = now - tsTickPerMin[pVnode->config.tsdbCfg.precision] * pVnode->config.tsdbCfg.keep2;
  TSKEY maxKey = tsMaxKeyByPrecision[pVnode->config.tsdbCfg.precision];
  for (int32_t i = 0; i < TARRAY_SIZE(pSubmitReq->aSubmitTbData); ++i) {
    SSubmitTbData *pSubmitTbData = taosArrayGet(pSubmitReq->aSubmitTbData, i);

    if (pSubmitTbData->pCreateTbReq && pSubmitTbData->pCreateTbReq->uid == 0) {
      code = TSDB_CODE_INVALID_MSG;
      goto _exit;
    }

    if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
      if (TARRAY_SIZE(pSubmitTbData->aCol) <= 0) {
        code = TSDB_CODE_INVALID_MSG;
        goto _exit;
      }

      // NOTE(review): only key ordering is verified for column data; every row is compared
      // against row 0 (lastKey is never advanced) -- confirm this is intended.
      SColData *colDataArr = TARRAY_DATA(pSubmitTbData->aCol);
      SRowKey   lastKey;
      tColDataArrGetRowKey(colDataArr, TARRAY_SIZE(pSubmitTbData->aCol), 0, &lastKey);
      for (int32_t iRow = 1; iRow < colDataArr[0].nVal; iRow++) {
        SRowKey key;
        tColDataArrGetRowKey(TARRAY_DATA(pSubmitTbData->aCol), TARRAY_SIZE(pSubmitTbData->aCol), iRow, &key);
        if (tRowKeyCompare(&lastKey, &key) >= 0) {
          code = TSDB_CODE_INVALID_MSG;
          // log the actual failure code; terrno is not set on this path
          vError("vgId:%d %s failed 1 since %s, version:%" PRId64, TD_VID(pVnode), __func__, tstrerror(code), ver);
          goto _exit;
        }
      }
    } else {
      int32_t nRow = TARRAY_SIZE(pSubmitTbData->aRowP);
      SRow  **aRow = (SRow **)TARRAY_DATA(pSubmitTbData->aRowP);
      SRowKey lastRowKey;
      for (int32_t iRow = 0; iRow < nRow; ++iRow) {
#ifndef NO_UNALIGNED_ACCESS
        TSKEY rowTs = aRow[iRow]->ts;
#else
        TSKEY rowTs = taosGetInt64Aligned(&(aRow[iRow]->ts));
#endif
        if (rowTs < minKey || rowTs > maxKey) {
          code = TSDB_CODE_INVALID_MSG;
          vError("vgId:%d %s failed 2 since %s, version:%" PRId64, TD_VID(pVnode), __func__, tstrerror(code), ver);
          goto _exit;
        }
        if (iRow == 0) {
          tRowGetKey(aRow[iRow], &lastRowKey);
        } else {
          SRowKey rowKey;
          tRowGetKey(aRow[iRow], &rowKey);

          // keys must be strictly increasing within one table's rows
          if (tRowKeyCompare(&lastRowKey, &rowKey) >= 0) {
            code = TSDB_CODE_INVALID_MSG;
            vError("vgId:%d %s failed 3 since %s, version:%" PRId64, TD_VID(pVnode), __func__, tstrerror(code), ver);
            goto _exit;
          }
          lastRowKey = rowKey;
        }
      }
    }
  }

  // validate table identity, schema version and column-format shape
  for (int32_t i = 0; i < TARRAY_SIZE(pSubmitReq->aSubmitTbData); ++i) {
    SSubmitTbData *pSubmitTbData = taosArrayGet(pSubmitReq->aSubmitTbData, i);

    if (pSubmitTbData->pCreateTbReq) {
      pSubmitTbData->uid = pSubmitTbData->pCreateTbReq->uid;
    } else {
      SMetaInfo info = {0};

      code = metaGetInfo(pVnode->pMeta, pSubmitTbData->uid, &info, NULL);
      if (code) {
        code = TSDB_CODE_TDB_TABLE_NOT_EXIST;
        vWarn("vgId:%d, table uid:%" PRId64 " not exists", TD_VID(pVnode), pSubmitTbData->uid);
        goto _exit;
      }

      if (info.suid != pSubmitTbData->suid) {
        code = TSDB_CODE_INVALID_MSG;
        goto _exit;
      }

      // for child tables the schema version is taken from the super table's info
      if (info.suid) {
        if (metaGetInfo(pVnode->pMeta, info.suid, &info, NULL) != 0) {
          vWarn("vgId:%d, table uid:%" PRId64 " not exists", TD_VID(pVnode), info.suid);
        }
      }

      if (pSubmitTbData->sver != info.skmVer) {
        code = TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER;
        goto _exit;
      }
    }

    if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
      int32_t   nColData = TARRAY_SIZE(pSubmitTbData->aCol);
      SColData *aColData = (SColData *)TARRAY_DATA(pSubmitTbData->aCol);

      if (nColData <= 0) {
        code = TSDB_CODE_INVALID_MSG;
        goto _exit;
      }

      // first column must be the non-empty primary timestamp column
      if (aColData[0].cid != PRIMARYKEY_TIMESTAMP_COL_ID || aColData[0].type != TSDB_DATA_TYPE_TIMESTAMP ||
          aColData[0].nVal <= 0) {
        code = TSDB_CODE_INVALID_MSG;
        goto _exit;
      }

      // all columns must carry the same number of values
      for (int32_t j = 1; j < nColData; j++) {
        if (aColData[j].nVal != aColData[0].nVal) {
          code = TSDB_CODE_INVALID_MSG;
          goto _exit;
        }
      }
    }
  }

  vDebug("vgId:%d, submit block size %d", TD_VID(pVnode), (int32_t)taosArrayGetSize(pSubmitReq->aSubmitTbData));

  // loop to handle
  for (int32_t i = 0; i < TARRAY_SIZE(pSubmitReq->aSubmitTbData); ++i) {
    SSubmitTbData *pSubmitTbData = taosArrayGet(pSubmitReq->aSubmitTbData, i);

    // create table
    if (pSubmitTbData->pCreateTbReq) {
      // alloc if need
      if (pSubmitRsp->aCreateTbRsp == NULL &&
          (pSubmitRsp->aCreateTbRsp = taosArrayInit(TARRAY_SIZE(pSubmitReq->aSubmitTbData), sizeof(SVCreateTbRsp))) ==
              NULL) {
        code = terrno;
        goto _exit;
      }

      SVCreateTbRsp *pCreateTbRsp = taosArrayReserve(pSubmitRsp->aCreateTbRsp, 1);
      if (pCreateTbRsp == NULL) {
        // NOTE(review): assumes taosArrayReserve sets terrno on failure -- confirm.
        code = terrno;
        goto _exit;
      }

      // create table
      if (metaCreateTable2(pVnode->pMeta, ver, pSubmitTbData->pCreateTbReq, &pCreateTbRsp->pMeta) == 0) {
        // create table success

        if (newTbUids == NULL &&
            (newTbUids = taosArrayInit(TARRAY_SIZE(pSubmitReq->aSubmitTbData), sizeof(int64_t))) == NULL) {
          code = terrno;
          goto _exit;
        }

        if (taosArrayPush(newTbUids, &pSubmitTbData->uid) == NULL) {
          code = terrno;
          goto _exit;
        }

        if (pCreateTbRsp->pMeta) {
          vnodeUpdateMetaRsp(pVnode, pCreateTbRsp->pMeta);
        }
      } else {  // create table failed
        if (terrno != TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
          code = terrno;
          vError("vgId:%d failed to create table:%s, code:%s", TD_VID(pVnode), pSubmitTbData->pCreateTbReq->name,
                 tstrerror(terrno));
          goto _exit;
        }
        terrno = 0;
        pSubmitTbData->uid = pSubmitTbData->pCreateTbReq->uid;  // update uid if table exist for using below
      }
    }

    // insert data
    int32_t affectedRows;
    code = tsdbInsertTableData(pVnode->pTsdb, ver, pSubmitTbData, &affectedRows);
    if (code) goto _exit;

    code = metaUpdateChangeTimeWithLock(pVnode->pMeta, pSubmitTbData->uid, pSubmitTbData->ctimeMs);
    if (code) goto _exit;

    pSubmitRsp->affectedRows += affectedRows;
  }

  // update the affected table uid list
  if (taosArrayGetSize(newTbUids) > 0) {
    vDebug("vgId:%d, add %d table into query table list in handling submit", TD_VID(pVnode),
           (int32_t)taosArrayGetSize(newTbUids));
    if (tqUpdateTbUidList(pVnode->pTq, newTbUids, true) != 0) {
      vError("vgId:%d, failed to update tbUid list", TD_VID(pVnode));
    }
  }

_exit:
  // message: the response body is encoded on success and on failure alike
  pRsp->code = code;
  tEncodeSize(tEncodeSSubmitRsp2, pSubmitRsp, pRsp->contLen, ret);
  pRsp->pCont = rpcMallocCont(pRsp->contLen);
  tEncoderInit(&ec, pRsp->pCont, pRsp->contLen);
  if (tEncodeSSubmitRsp2(&ec, pSubmitRsp) < 0) {
    vError("vgId:%d, failed to encode submit response", TD_VID(pVnode));
  }
  tEncoderClear(&ec);

  // update statistics
  (void)atomic_add_fetch_64(&pVnode->statis.nInsert, pSubmitRsp->affectedRows);
  (void)atomic_add_fetch_64(&pVnode->statis.nInsertSuccess, pSubmitRsp->affectedRows);
  (void)atomic_add_fetch_64(&pVnode->statis.nBatchInsert, 1);

  if (tsEnableMonitor && tsMonitorFqdn[0] != 0 && tsMonitorPort != 0 && pSubmitRsp->affectedRows > 0 &&
      strlen(pOriginalMsg->info.conn.user) > 0 && tsInsertCounter != NULL) {
    const char *sample_labels[] = {VNODE_METRIC_TAG_VALUE_INSERT_AFFECTED_ROWS,
                                   pVnode->monitor.strClusterId,
                                   pVnode->monitor.strDnodeId,
                                   tsLocalEp,
                                   pVnode->monitor.strVgId,
                                   pOriginalMsg->info.conn.user,
                                   "Success"};
    // metric update is best effort; its result is intentionally ignored
    (void)taos_counter_add(tsInsertCounter, pSubmitRsp->affectedRows, sample_labels);
  }

  if (code == 0) {
    (void)atomic_add_fetch_64(&pVnode->statis.nBatchInsertSuccess, 1);
    // An RSMA failure changes the return value but not pRsp->code, which was set above.
    code = tdProcessRSmaSubmit(pVnode->pSma, ver, pSubmitReq, pReq, len);
  }

  // clear
  taosArrayDestroy(newTbUids);
  tDestroySubmitReq(pSubmitReq, 0 == pMsg->version ? TSDB_MSG_FLG_CMPT : TSDB_MSG_FLG_DECODE);
  tDestroySSubmitRsp2(pSubmitRsp, TSDB_MSG_FLG_ENCODE);

  if (code) terrno = code;

  taosMemoryFree(pAllocMsg);

  return code;
}
2202

UNCOV
2203
/**
 * @brief Handle a "create TSMA" request.
 *
 * Decodes an SVCreateTSmaReq and passes it to tdProcessTSmaCreate. When built without
 * USE_TSMA the function only returns TSDB_CODE_INTERNAL_ERROR.
 *
 * @param pVnode vnode whose pSma handle is used
 * @param ver    version forwarded to tdProcessTSmaCreate
 * @param pReq   encoded request buffer
 * @param len    size of pReq in bytes
 * @param pRsp   optional response (may be NULL); msgType / code / pCont / contLen are written
 * @return int32_t 0 on success, otherwise the error code (also left in terrno)
 */
static int32_t vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
#ifdef USE_TSMA
  SVCreateTSmaReq req = {0};
  SDecoder        coder = {0};
  int32_t         code = TSDB_CODE_SUCCESS;

  if (pRsp) {
    pRsp->msgType = TDMT_VND_CREATE_SMA_RSP;
    pRsp->code = TSDB_CODE_SUCCESS;
    pRsp->pCont = NULL;
    pRsp->contLen = 0;
  }

  // decode and process req
  tDecoderInit(&coder, pReq, len);

  if (tDecodeSVCreateTSmaReq(&coder, &req) < 0) {
    code = TSDB_CODE_MSG_DECODE_ERROR;
    goto _err;
  }

  if (tdProcessTSmaCreate(pVnode->pSma, ver, (const char *)&req) < 0) {
    code = terrno;  // capture immediately; cleanup and logging below may overwrite terrno
    goto _err;
  }

  tDecoderClear(&coder);
  vDebug("vgId:%d, success to create tsma %s:%" PRIi64 " version %" PRIi64 " for table %" PRIi64, TD_VID(pVnode),
         req.indexName, req.indexUid, ver, req.tableUid);
  return 0;

_err:
  if (pRsp) pRsp->code = code;
  tDecoderClear(&coder);
  vError("vgId:%d, failed to create tsma %s:%" PRIi64 " version %" PRIi64 " for table %" PRIi64 " since %s",
         TD_VID(pVnode), req.indexName, req.indexUid, ver, req.tableUid, tstrerror(code));
  terrno = code;  // keep terrno consistent with the returned value for callers that read it
  return code;
#else
  return TSDB_CODE_INTERNAL_ERROR;
#endif
}
2243

2244
/**
2245
 * @brief specific for smaDstVnode
2246
 *
2247
 * @param pVnode
2248
 * @param pCont
2249
 * @param contLen
2250
 * @return int32_t
2251
 */
UNCOV
2252
int32_t vnodeProcessCreateTSma(SVnode *pVnode, void *pCont, uint32_t contLen) {
×
UNCOV
2253
  return vnodeProcessCreateTSmaReq(pVnode, 1, pCont, contLen, NULL);
×
2254
}
2255

2256
/**
 * @brief Log the vnode's configured hash range and trim its meta tables.
 *
 * @param pVnode vnode whose meta is trimmed
 * @param ver    apply index, logged and forwarded to metaTrimTables()
 * @return result of metaTrimTables()
 */
static int32_t vnodeConsolidateAlterHashRange(SVnode *pVnode, int64_t ver) {
  vInfo("vgId:%d, trim meta of tables per hash range [%" PRIu32 ", %" PRIu32 "]. apply-index:%" PRId64, TD_VID(pVnode),
        pVnode->config.hashBegin, pVnode->config.hashEnd, ver);

  // TODO: trim meta of tables from TDB per hash range [pVnode->config.hashBegin, pVnode->config.hashEnd]
  return metaTrimTables(pVnode->pMeta, ver);
}
2267

2268
/**
 * @brief Handle an alter-confirm message.
 *
 * If the vnode config has a pending hash change, the hash range is
 * consolidated and the pending flag is cleared on success. The response is
 * always filled in as an empty TDMT_VND_ALTER_CONFIRM_RSP carrying the
 * resulting code.
 *
 * @param pVnode vnode being altered
 * @param ver    version forwarded to vnodeConsolidateAlterHashRange()
 * @param pReq   not read by this function
 * @param len    not read by this function
 * @param pRsp   response message, must not be NULL
 * @return TSDB_CODE_SUCCESS, or the consolidation result
 */
static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
  vInfo("vgId:%d, vnode handle msgType:alter-confirm, alter confirm msg is processed", TD_VID(pVnode));

  int32_t result = TSDB_CODE_SUCCESS;

  if (pVnode->config.hashChange) {
    result = vnodeConsolidateAlterHashRange(pVnode, ver);
    if (result < 0) {
      vError("vgId:%d, failed to consolidate alter hashrange since %s. version:%" PRId64, TD_VID(pVnode), terrstr(), ver);
    } else {
      // Only drop the pending flag once the consolidation went through.
      pVnode->config.hashChange = false;
    }
  }

  pRsp->msgType = TDMT_VND_ALTER_CONFIRM_RSP;
  pRsp->code = result;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  return result;
}
2290

2291
extern int32_t tsdbDisableAndCancelAllBgTask(STsdb *pTsdb);
2292
extern void    tsdbEnableBgTask(STsdb *pTsdb);
2293

2294
/**
 * @brief Apply an alter-vnode-config request to the vnode's config.
 *
 * Each field of the request is compared with the current config and copied
 * over when it differs. WAL changes are pushed with walAlter() and keep
 * changes with tsdbSetKeepCfg() once all fields have been processed.
 *
 * Fields are applied in order, so when metaAlterCache() fails the function
 * returns early with the cacheLastSize / szBuf changes already applied.
 *
 * @param pVnode vnode whose config is altered
 * @param ver    not read by this function
 * @param pReq   serialized SAlterVnodeConfigReq
 * @param len    length of pReq in bytes
 * @param pRsp   not read or written by this function
 * @return 0 on success, TSDB_CODE_INVALID_MSG if the request cannot be
 *         deserialized, or the metaAlterCache() error
 */
static int32_t vnodeProcessAlterConfigReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
  bool walChanged = false;
  bool tsdbChanged = false;

  SAlterVnodeConfigReq req = {0};
  if (tDeserializeSAlterVnodeConfigReq(pReq, len, &req) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return TSDB_CODE_INVALID_MSG;
  }

  vInfo("vgId:%d, start to alter vnode config, page:%d pageSize:%d buffer:%d szPage:%d szBuf:%" PRIu64
        " cacheLast:%d cacheLastSize:%d days:%d keep0:%d keep1:%d keep2:%d keepTimeOffset:%d s3KeepLocal:%d "
        "s3Compact:%d fsync:%d level:%d "
        "walRetentionPeriod:%d walRetentionSize:%d",
        TD_VID(pVnode), req.pages, req.pageSize, req.buffer, req.pageSize * 1024, (uint64_t)req.buffer * 1024 * 1024,
        req.cacheLast, req.cacheLastSize, req.daysPerFile, req.daysToKeep0, req.daysToKeep1, req.daysToKeep2,
        req.keepTimeOffset, req.s3KeepLocal, req.s3Compact, req.walFsyncPeriod, req.walLevel, req.walRetentionPeriod,
        req.walRetentionSize);

  // ---- cache / buffer ----
  if (pVnode->config.cacheLastSize != req.cacheLastSize) {
    pVnode->config.cacheLastSize = req.cacheLastSize;
    tsdbCacheSetCapacity(pVnode, (size_t)pVnode->config.cacheLastSize * 1024 * 1024);
  }

  if (pVnode->config.szBuf != req.buffer * 1024LL * 1024LL) {
    vInfo("vgId:%d, vnode buffer is changed from %" PRId64 " to %" PRId64, TD_VID(pVnode), pVnode->config.szBuf,
          (uint64_t)(req.buffer * 1024LL * 1024LL));
    pVnode->config.szBuf = req.buffer * 1024LL * 1024LL;
  }

  if (pVnode->config.szCache != req.pages) {
    if ((terrno = metaAlterCache(pVnode->pMeta, req.pages)) < 0) {
      vError("vgId:%d, failed to change vnode pages from %d to %d failed since %s", TD_VID(pVnode),
             pVnode->config.szCache, req.pages, tstrerror(terrno));
      return terrno;
    } else {
      vInfo("vgId:%d, vnode pages is changed from %d to %d", TD_VID(pVnode), pVnode->config.szCache, req.pages);
      pVnode->config.szCache = req.pages;
    }
  }

  if (pVnode->config.cacheLast != req.cacheLast) {
    pVnode->config.cacheLast = req.cacheLast;
  }

  // ---- WAL settings: collected here, applied once via walAlter() below ----
  if (pVnode->config.walCfg.fsyncPeriod != req.walFsyncPeriod) {
    pVnode->config.walCfg.fsyncPeriod = req.walFsyncPeriod;
    walChanged = true;
  }

  if (pVnode->config.walCfg.level != req.walLevel) {
    // Leaving level 0 additionally raises the clearFiles flag.
    if (pVnode->config.walCfg.level == 0) {
      pVnode->config.walCfg.clearFiles = 1;
    }
    pVnode->config.walCfg.level = req.walLevel;
    walChanged = true;
  }

  if (pVnode->config.walCfg.retentionPeriod != req.walRetentionPeriod) {
    pVnode->config.walCfg.retentionPeriod = req.walRetentionPeriod;
    walChanged = true;
  }

  if (pVnode->config.walCfg.retentionSize != req.walRetentionSize) {
    pVnode->config.walCfg.retentionSize = req.walRetentionSize;
    walChanged = true;
  }

  // ---- keep settings: stored always, pushed to tsdb only for non-RSMA vnodes ----
  if (pVnode->config.tsdbCfg.keep0 != req.daysToKeep0) {
    pVnode->config.tsdbCfg.keep0 = req.daysToKeep0;
    if (!VND_IS_RSMA(pVnode)) {
      tsdbChanged = true;
    }
  }

  if (pVnode->config.tsdbCfg.keep1 != req.daysToKeep1) {
    pVnode->config.tsdbCfg.keep1 = req.daysToKeep1;
    if (!VND_IS_RSMA(pVnode)) {
      tsdbChanged = true;
    }
  }

  if (pVnode->config.tsdbCfg.keep2 != req.daysToKeep2) {
    pVnode->config.tsdbCfg.keep2 = req.daysToKeep2;
    if (!VND_IS_RSMA(pVnode)) {
      tsdbChanged = true;
    }
  }

  if (pVnode->config.tsdbCfg.keepTimeOffset != req.keepTimeOffset) {
    pVnode->config.tsdbCfg.keepTimeOffset = req.keepTimeOffset;
    if (!VND_IS_RSMA(pVnode)) {
      tsdbChanged = true;
    }
  }

  // ---- optional fields: -1 means "not specified" ----
  if (req.sttTrigger != -1 && req.sttTrigger != pVnode->config.sttTrigger) {
    if (req.sttTrigger > 1 && pVnode->config.sttTrigger > 1) {
      pVnode->config.sttTrigger = req.sttTrigger;
    } else {
      // When either the old or the new value is <= 1, the commit task is
      // awaited and background tasks are disabled around the switch.
      vnodeAWait(&pVnode->commitTask);

      int32_t ret = tsdbDisableAndCancelAllBgTask(pVnode->pTsdb);
      if (ret != 0) {
        // Fixed: report the code returned by the failing call, not ERRNO.
        vError("vgId:%d, failed to disable bg task since %s", TD_VID(pVnode), tstrerror(ret));
      }

      pVnode->config.sttTrigger = req.sttTrigger;
      tsdbEnableBgTask(pVnode->pTsdb);
    }
  }

  if (req.minRows != -1 && req.minRows != pVnode->config.tsdbCfg.minRows) {
    pVnode->config.tsdbCfg.minRows = req.minRows;
  }

  if (req.s3KeepLocal != -1 && req.s3KeepLocal != pVnode->config.s3KeepLocal) {
    pVnode->config.s3KeepLocal = req.s3KeepLocal;
  }
  if (req.s3Compact != -1 && req.s3Compact != pVnode->config.s3Compact) {
    pVnode->config.s3Compact = req.s3Compact;
  }

  // ---- push accumulated changes; a walAlter() failure is logged, not returned ----
  if (walChanged) {
    if (walAlter(pVnode->pWal, &pVnode->config.walCfg) != 0) {
      vError("vgId:%d, failed to alter wal config since %s", TD_VID(pVnode), tstrerror(ERRNO));
    }
  }

  if (tsdbChanged) {
    tsdbSetKeepCfg(pVnode->pTsdb, &pVnode->config.tsdbCfg);
  }

  return 0;
}
2429

2430
/**
 * @brief Apply a batch of per-table delete requests.
 *
 * Each entry names a table; entries whose table cannot be found are skipped.
 * For the others the time range is deleted from the tsdb selected by
 * deleteReq.level (0: pVnode->pTsdb, 1: VND_RSMA1, otherwise VND_RSMA2) and,
 * for level 0, the table's change time is updated.
 *
 * Per-table failures are logged and stored in terrno, but do not stop the loop
 * or change the return value.
 *
 * @param pVnode vnode to delete from
 * @param ver    version forwarded to tsdbDeleteTableData()
 * @param pReq   encoded SBatchDeleteReq
 * @param len    length of pReq in bytes
 * @param pRsp   not read or written by this function
 * @return 0 once the batch has been processed, TSDB_CODE_INVALID_MSG if the
 *         request cannot be decoded
 */
static int32_t vnodeProcessBatchDeleteReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
  // Zero-initialised so no field is ever read uninitialised.
  SBatchDeleteReq deleteReq = {0};
  SDecoder        decoder = {0};
  tDecoderInit(&decoder, pReq, len);
  if (tDecodeSBatchDeleteReq(&decoder, &deleteReq) < 0) {
    tDecoderClear(&decoder);
    return terrno = TSDB_CODE_INVALID_MSG;
  }

  SMetaReader mr = {0};
  metaReaderDoInit(&mr, pVnode->pMeta, META_READER_NOLOCK);
  STsdb *pTsdb = pVnode->pTsdb;

  if (deleteReq.level) {
    pTsdb = deleteReq.level == 1 ? VND_RSMA1(pVnode) : VND_RSMA2(pVnode);
  }

  int32_t sz = taosArrayGetSize(deleteReq.deleteReqs);
  for (int32_t i = 0; i < sz; i++) {
    SSingleDeleteReq *pOneReq = taosArrayGet(deleteReq.deleteReqs, i);
    char             *name = pOneReq->tbname;
    if (metaGetTableEntryByName(&mr, name) < 0) {
      vDebug("vgId:%d, stream delete msg, skip since no table: %s", pVnode->config.vgId, name);
      continue;
    }

    int64_t uid = mr.me.uid;

    int32_t code = tsdbDeleteTableData(pTsdb, ver, deleteReq.suid, uid, pOneReq->startTs, pOneReq->endTs);
    if (code < 0) {
      terrno = code;
      vError("vgId:%d, delete error since %s, suid:%" PRId64 ", uid:%" PRId64 ", start ts:%" PRId64 ", end ts:%" PRId64,
             TD_VID(pVnode), terrstr(), deleteReq.suid, uid, pOneReq->startTs, pOneReq->endTs);
    }

    if (deleteReq.level == 0) {
      code = metaUpdateChangeTimeWithLock(pVnode->pMeta, uid, deleteReq.ctimeMs);
      if (code < 0) {
        terrno = code;
        vError("vgId:%d, update change time error since %s, suid:%" PRId64 ", uid:%" PRId64 ", start ts:%" PRId64
               ", end ts:%" PRId64,
               TD_VID(pVnode), terrstr(), deleteReq.suid, uid, pOneReq->startTs, pOneReq->endTs);
      }
    }
    // Reset the reader's decoder so the same reader can serve the next lookup.
    tDecoderClear(&mr.coder);
  }
  metaReaderClear(&mr);
  taosArrayDestroy(deleteReq.deleteReqs);
  // Fixed: the request decoder used to be cleared only on the failure path.
  tDecoderClear(&decoder);
  return 0;
}
2480

2481
/**
 * @brief Apply a decoded delete result to the tsdb and build the delete response.
 *
 * For every uid in the decoded SDeleteRes (when affectedRows > 0) the
 * [skey, ekey] range is deleted and the table's change time is updated. The
 * request is then passed to tdProcessRSmaDelete(), and an SVDeleteRsp carrying
 * affectedRows is encoded into pRsp->pCont.
 *
 * The decoder state and the uid list are released on every exit path.
 *
 * @param pVnode       vnode to delete from
 * @param ver          version forwarded to the tsdb and rsma calls
 * @param pReq         encoded SDeleteRes
 * @param len          length of pReq in bytes
 * @param pRsp         response message, must not be NULL
 * @param pOriginalMsg not read by this function
 * @return 0 on success, otherwise the code of the first failing step
 */
static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp,
                                     SRpcMsg *pOriginalMsg) {
  int32_t     code = 0;
  SDecoder   *pCoder = &(SDecoder){0};
  SDeleteRes *pRes = &(SDeleteRes){0};

  pRsp->msgType = TDMT_VND_DELETE_RSP;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;
  pRsp->code = TSDB_CODE_SUCCESS;

  pRes->uidList = taosArrayInit(0, sizeof(tb_uid_t));
  if (pRes->uidList == NULL) {
    // Nothing has been acquired yet, so there is nothing to release.
    return terrno;
  }

  tDecoderInit(pCoder, pReq, len);
  code = tDecodeDeleteRes(pCoder, pRes);
  if (code) goto _exit;

  if (pRes->affectedRows > 0) {
    for (int32_t iUid = 0; iUid < taosArrayGetSize(pRes->uidList); iUid++) {
      uint64_t uid = *(uint64_t *)taosArrayGet(pRes->uidList, iUid);
      code = tsdbDeleteTableData(pVnode->pTsdb, ver, pRes->suid, uid, pRes->skey, pRes->ekey);
      if (code) goto _exit;
      code = metaUpdateChangeTimeWithLock(pVnode->pMeta, uid, pRes->ctimeMs);
      if (code) goto _exit;
    }
  }

  // NOTE(review): this result is overwritten by the encode step below, so an
  // rsma delete failure is not reported to the caller - confirm this is intended.
  code = tdProcessRSmaDelete(pVnode->pSma, ver, pRes, pReq, len);

  {
    SVDeleteRsp rsp = {.affectedRows = pRes->affectedRows};
    int32_t     ret = 0;
    tEncodeSize(tEncodeSVDeleteRsp, &rsp, pRsp->contLen, ret);

    pRsp->pCont = rpcMallocCont(pRsp->contLen);
    if (pRsp->pCont == NULL) {
      // Fixed: the allocation result used to be passed to the encoder unchecked.
      pRsp->contLen = 0;
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _exit;
    }

    SEncoder ec = {0};
    tEncoderInit(&ec, pRsp->pCont, pRsp->contLen);
    code = tEncodeSVDeleteRsp(&ec, &rsp);
    // Fixed: the encoder is now cleared whether or not encoding succeeded.
    tEncoderClear(&ec);
  }

_exit:
  // Fixed: error paths used to return without releasing these two.
  tDecoderClear(pCoder);
  taosArrayDestroy(pRes->uidList);
  return code;
}
2546
/**
 * @brief Decode a create-index request and add the index to the super table.
 *
 * pRsp is reset to an empty TDMT_VND_CREATE_INDEX_RSP. Its code is updated
 * only when metaAddIndexToSuperTable() fails; a decode failure is reported
 * through the return value and terrno.
 *
 * @param pVnode vnode whose meta is updated
 * @param ver    version forwarded to metaAddIndexToSuperTable()
 * @param pReq   encoded SVCreateStbReq
 * @param len    length of pReq in bytes
 * @param pRsp   response message, must not be NULL
 * @return 0 on success, TSDB_CODE_INVALID_MSG on decode failure, otherwise the
 *         meta error code
 */
static int32_t vnodeProcessCreateIndexReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SVCreateStbReq stbReq = {0};
  SDecoder       decoder = {0};
  int32_t        code = 0;

  pRsp->msgType = TDMT_VND_CREATE_INDEX_RSP;
  pRsp->code = TSDB_CODE_SUCCESS;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  tDecoderInit(&decoder, pReq, len);

  const bool decoded = !(tDecodeSVCreateStbReq(&decoder, &stbReq) < 0);
  if (decoded) {
    code = metaAddIndexToSuperTable(pVnode->pMeta, ver, &stbReq);
    if (code) {
      pRsp->code = code;
    }
  }

  // Single release point for the decoder, whatever happened above.
  tDecoderClear(&decoder);

  if (!decoded) {
    return terrno = TSDB_CODE_INVALID_MSG;
  }
  return code;
}
2575
/**
 * @brief Deserialize a drop-index request and remove the index from the super table.
 *
 * pRsp is reset to an empty TDMT_VND_DROP_INDEX_RSP; on failure its code is set
 * to the failing step's code.
 *
 * @param pVnode vnode whose meta is updated
 * @param ver    version forwarded to metaDropIndexFromSuperTable()
 * @param pReq   serialized SDropIndexReq
 * @param len    length of pReq in bytes
 * @param pRsp   response message, must not be NULL
 * @return TSDB_CODE_SUCCESS, or the code of the step that failed
 */
static int32_t vnodeProcessDropIndexReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SDropIndexReq dropReq = {0};

  pRsp->msgType = TDMT_VND_DROP_INDEX_RSP;
  pRsp->code = TSDB_CODE_SUCCESS;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  int32_t code = tDeserializeSDropIdxReq(pReq, len, &dropReq);
  if (code == 0) {
    code = metaDropIndexFromSuperTable(pVnode->pMeta, ver, &dropReq);
  }

  if (code != 0) {
    pRsp->code = code;
    return code;
  }
  return TSDB_CODE_SUCCESS;
}
2595

2596
extern int32_t vnodeAsyncCompact(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
2597

2598
/**
 * @brief Forward a compact request to vnodeAsyncCompact() once the vnode is restored.
 *
 * While pVnode->restored is false the request is logged and ignored.
 *
 * @return 0 when the request is ignored, otherwise the vnodeAsyncCompact() result
 */
static int32_t vnodeProcessCompactVnodeReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
  if (pVnode->restored) {
    return vnodeAsyncCompact(pVnode, ver, pReq, len, pRsp);
  }

  vInfo("vgId:%d, ignore compact req during restoring. ver:%" PRId64, TD_VID(pVnode), ver);
  return 0;
}
2605

UNCOV
2606
/**
 * @brief Handle a sync config-change message.
 *
 * Runs syncCheckMember() on the vnode's sync handle; a failure is only logged.
 * The response is always an empty, successful TDMT_SYNC_CONFIG_CHANGE_RSP.
 *
 * @param pVnode vnode whose sync membership is checked
 * @param ver    not read by this function
 * @param pReq   not read by this function
 * @param len    not read by this function
 * @param pRsp   response message, must not be NULL
 * @return always 0
 */
static int32_t vnodeProcessConfigChangeReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
  const bool memberOk = (syncCheckMember(pVnode->sync) == 0);
  if (!memberOk) {
    vError("vgId:%d, failed to check member", TD_VID(pVnode));
  }

  pRsp->msgType = TDMT_SYNC_CONFIG_CHANGE_RSP;
  pRsp->code = TSDB_CODE_SUCCESS;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  return 0;
}
2618

2619
/**
 * @brief Check that this vnode is the sync leader and that its arb token
 *        equals one of the two member tokens.
 *
 * terrno is set to the returned code on every path, including success.
 *
 * @param pVnode       vnode to check
 * @param member0Token first candidate token
 * @param member1Token second candidate token (compared only if the first differs)
 * @return 0 on match, TSDB_CODE_SYN_NOT_LEADER, TSDB_CODE_NOT_FOUND or
 *         TSDB_CODE_MND_ARB_TOKEN_MISMATCH otherwise
 */
static int32_t vnodeCheckToken(SVnode *pVnode, char *member0Token, char *member1Token) {
  SSyncState state = syncGetState(pVnode->sync);
  if (state.state != TAOS_SYNC_STATE_LEADER) {
    return terrno = TSDB_CODE_SYN_NOT_LEADER;
  }

  char localToken[TSDB_ARB_TOKEN_SIZE] = {0};
  if (vnodeGetArbToken(pVnode, localToken) != 0) {
    return terrno = TSDB_CODE_NOT_FOUND;
  }

  // Short-circuit: member1Token is only compared when member0Token does not match.
  const bool tokenKnown = strncmp(localToken, member0Token, TSDB_ARB_TOKEN_SIZE) == 0 ||
                          strncmp(localToken, member1Token, TSDB_ARB_TOKEN_SIZE) == 0;
  if (!tokenKnown) {
    return terrno = TSDB_CODE_MND_ARB_TOKEN_MISMATCH;
  }

  terrno = TSDB_CODE_SUCCESS;
  return 0;
}
2638

2639
/**
 * @brief Validate the arb token, then ask the sync layer whether the vnode is synced.
 *
 * @return the vnodeCheckToken() error if the token check fails, otherwise the
 *         result of syncCheckSynced()
 */
static int32_t vnodeCheckSyncd(SVnode *pVnode, char *member0Token, char *member1Token) {
  const int32_t tokenCode = vnodeCheckToken(pVnode, member0Token, member1Token);

  return (tokenCode == 0) ? syncCheckSynced(pVnode->sync) : tokenCode;
}
2647

2648
/**
 * @brief Answer an arbitrator check-sync request.
 *
 * The response echoes the request's tokens and carries, in errCode, the value
 * of terrno after vnodeCheckSyncd() has run. The arb term is then updated and
 * the response serialized into pRsp->pCont.
 *
 * @param pVnode vnode being checked
 * @param pReq   serialized SVArbCheckSyncReq
 * @param len    length of pReq in bytes
 * @param pRsp   response message, must not be NULL
 * @return 0 on success, the deserialization code if the request is invalid,
 *         -1 if updating the term or building the response fails
 */
static int32_t vnodeProcessArbCheckSyncReq(SVnode *pVnode, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SVArbCheckSyncReq syncReq = {0};

  int32_t code = tDeserializeSVArbCheckSyncReq(pReq, len, &syncReq);
  if (code) {
    return terrno = code;
  }

  pRsp->msgType = TDMT_VND_ARB_CHECK_SYNC_RSP;
  pRsp->code = TSDB_CODE_SUCCESS;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  // The response borrows the request's token pointers; they stay valid until
  // tFreeSVArbCheckSyncReq() at the end of this function.
  SVArbCheckSyncRsp syncRsp = {
      .arbToken = syncReq.arbToken,
      .member0Token = syncReq.member0Token,
      .member1Token = syncReq.member1Token,
      .vgId = TD_VID(pVnode),
  };

  if (vnodeCheckSyncd(pVnode, syncReq.member0Token, syncReq.member1Token) != 0) {
    vError("vgId:%d, failed to check assigned log syncd", TD_VID(pVnode));
  }
  // The check outcome is taken from terrno, not from the return value above.
  syncRsp.errCode = terrno;

  do {
    if (vnodeUpdateArbTerm(pVnode, syncReq.arbTerm) != 0) {
      vError("vgId:%d, failed to update arb term", TD_VID(pVnode));
      code = -1;
      break;
    }

    // First pass with a NULL buffer yields the required length.
    int32_t contLen = tSerializeSVArbCheckSyncRsp(NULL, 0, &syncRsp);
    if (contLen <= 0) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      code = -1;
      break;
    }

    void *pHead = rpcMallocCont(contLen);
    if (pHead == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      code = -1;
      break;
    }

    if (tSerializeSVArbCheckSyncRsp(pHead, contLen, &syncRsp) <= 0) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      rpcFreeCont(pHead);
      code = -1;
      break;
    }

    pRsp->pCont = pHead;
    pRsp->contLen = contLen;

    terrno = TSDB_CODE_SUCCESS;
  } while (0);

  tFreeSVArbCheckSyncReq(&syncReq);
  return code;
}
2709

2710
#ifndef TD_ENTERPRISE
2711
int32_t vnodeAsyncCompact(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) { return 0; }
2712
int32_t tsdbAsyncCompact(STsdb *tsdb, const STimeWindow *tw, bool sync) { return 0; }
2713
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc