• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3533

20 Nov 2024 07:11AM UTC coverage: 58.848% (-1.9%) from 60.78%
#3533

push

travis-ci

web-flow
Merge pull request #28823 from taosdata/fix/3.0/TD-32587

fix:[TD-32587]fix stmt segmentation fault

115578 of 252434 branches covered (45.79%)

Branch coverage included in aggregate %.

1 of 4 new or added lines in 1 file covered. (25.0%)

8038 existing lines in 233 files now uncovered.

194926 of 275199 relevant lines covered (70.83%)

1494459.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.89
/source/dnode/mnode/impl/src/mndStb.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mndStb.h"
18
#include "audit.h"
19
#include "mndDb.h"
20
#include "mndDnode.h"
21
#include "mndIndex.h"
22
#include "mndIndexComm.h"
23
#include "mndInfoSchema.h"
24
#include "mndMnode.h"
25
#include "mndPerfSchema.h"
26
#include "mndPrivilege.h"
27
#include "mndScheduler.h"
28
#include "mndShow.h"
29
#include "mndSma.h"
30
#include "mndTopic.h"
31
#include "mndTrans.h"
32
#include "mndUser.h"
33
#include "mndVgroup.h"
34
#include "tname.h"
35

36
#define STB_VER_NUMBER   2
37
#define STB_RESERVE_SIZE 64
38

39
static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw);
40
static int32_t  mndStbActionInsert(SSdb *pSdb, SStbObj *pStb);
41
static int32_t  mndStbActionDelete(SSdb *pSdb, SStbObj *pStb);
42
static int32_t  mndStbActionUpdate(SSdb *pSdb, SStbObj *pOld, SStbObj *pNew);
43
static int32_t  mndProcessTtlTimer(SRpcMsg *pReq);
44
static int32_t  mndProcessTrimDbTimer(SRpcMsg *pReq);
45
static int32_t  mndProcessS3MigrateDbTimer(SRpcMsg *pReq);
46
static int32_t  mndProcessS3MigrateDbRsp(SRpcMsg *pReq);
47
static int32_t  mndProcessCreateStbReq(SRpcMsg *pReq);
48
static int32_t  mndProcessAlterStbReq(SRpcMsg *pReq);
49
static int32_t  mndProcessDropStbReq(SRpcMsg *pReq);
50
static int32_t  mndProcessDropTtltbRsp(SRpcMsg *pReq);
51
static int32_t  mndProcessTrimDbRsp(SRpcMsg *pReq);
52
static int32_t  mndProcessTableMetaReq(SRpcMsg *pReq);
53
static int32_t  mndRetrieveStb(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
54
static int32_t  mndRetrieveStbCol(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
55
static void     mndCancelGetNextStb(SMnode *pMnode, void *pIter);
56
static int32_t  mndProcessTableCfgReq(SRpcMsg *pReq);
57
static int32_t  mndAlterStbImp(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *pStb, bool needRsp,
58
                               void *alterOriData, int32_t alterOriDataLen);
59
static int32_t  mndAlterStbAndUpdateTagIdxImp(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *pStb, bool needRsp,
60
                                              void *alterOriData, int32_t alterOriDataLen, const SMAlterStbReq *pAlter);
61

62
static int32_t mndProcessCreateIndexReq(SRpcMsg *pReq);
63
static int32_t mndProcessDropIndexReq(SRpcMsg *pReq);
64

65
static int32_t mndProcessDropStbReqFromMNode(SRpcMsg *pReq);
66
static int32_t mndProcessDropTbWithTsma(SRpcMsg *pReq);
67
static int32_t mndProcessFetchTtlExpiredTbs(SRpcMsg *pReq);
68

69
/*
 * Wires the super-table (STB) module into the mnode.
 *
 * Installs the message handlers and the show-retrieve / free-iterator
 * callbacks for the STB and COL management tables, then registers the SDB_STB
 * table descriptor with the SDB.
 *
 * Returns the result of sdbSetTable().
 */
int32_t mndInitStb(SMnode *pMnode) {
  // Message handlers (requests, vnode responses and timers).
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STB, mndProcessCreateStbReq);
  mndSetMsgHandle(pMnode, TDMT_MND_ALTER_STB, mndProcessAlterStbReq);
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_STB, mndProcessDropStbReq);
  mndSetMsgHandle(pMnode, TDMT_VND_CREATE_STB_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_DROP_TTL_TABLE_RSP, mndProcessDropTtltbRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_TRIM_RSP, mndProcessTrimDbRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_ALTER_STB_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_DROP_STB_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_TABLE_META, mndProcessTableMetaReq);
  mndSetMsgHandle(pMnode, TDMT_MND_TTL_TIMER, mndProcessTtlTimer);
  mndSetMsgHandle(pMnode, TDMT_MND_TRIM_DB_TIMER, mndProcessTrimDbTimer);
  mndSetMsgHandle(pMnode, TDMT_VND_S3MIGRATE_RSP, mndProcessS3MigrateDbRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_S3MIGRATE_DB_TIMER, mndProcessS3MigrateDbTimer);
  mndSetMsgHandle(pMnode, TDMT_MND_TABLE_CFG, mndProcessTableCfgReq);
  mndSetMsgHandle(pMnode, TDMT_MND_STB_DROP, mndProcessDropStbReqFromMNode);
  mndSetMsgHandle(pMnode, TDMT_MND_STB_DROP_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_TB_WITH_TSMA, mndProcessDropTbWithTsma);
  mndSetMsgHandle(pMnode, TDMT_VND_FETCH_TTL_EXPIRED_TBS_RSP, mndProcessFetchTtlExpiredTbs);
  mndSetMsgHandle(pMnode, TDMT_VND_DROP_TABLE_RSP, mndTransProcessRsp);
  //  mndSetMsgHandle(pMnode, TDMT_MND_SYSTABLE_RETRIEVE, mndProcessRetrieveStbReq);

  // mndSetMsgHandle(pMnode, TDMT_MND_CREATE_INDEX, mndProcessCreateIndexReq);
  // mndSetMsgHandle(pMnode, TDMT_MND_DROP_INDEX, mndProcessDropIndexReq);
  // mndSetMsgHandle(pMnode, TDMT_VND_CREATE_INDEX_RSP, mndTransProcessRsp);
  // mndSetMsgHandle(pMnode, TDMT_VND_DROP_INDEX_RSP, mndTransProcessRsp);

  // SHOW support: both management tables share the same iterator-cancel callback.
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STB, mndRetrieveStb);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STB, mndCancelGetNextStb);

  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_COL, mndRetrieveStbCol);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_COL, mndCancelGetNextStb);

  // SDB descriptor: binary-keyed STB rows with their encode/decode/CRUD hooks.
  SSdbTable stbTable = {
      .sdbType = SDB_STB,
      .keyType = SDB_KEY_BINARY,
      .encodeFp = (SdbEncodeFp)mndStbActionEncode,
      .decodeFp = (SdbDecodeFp)mndStbActionDecode,
      .insertFp = (SdbInsertFp)mndStbActionInsert,
      .updateFp = (SdbUpdateFp)mndStbActionUpdate,
      .deleteFp = (SdbDeleteFp)mndStbActionDelete,
  };

  return sdbSetTable(pMnode->pSdb, stbTable);
}
114

115
/* Counterpart of mndInitStb(); currently there is nothing to tear down. */
void mndCleanupStb(SMnode *pMnode) {
  (void)pMnode;
}
116

117
/*
 * Serialises a super-table object into a newly allocated SDB raw record.
 *
 * Layout written, in order: name, db, timestamps/uids, versions, rsma
 * parameters, ttl, the element counts and blob lengths, every column schema,
 * every tag schema, every function name, the comment (commentLen + 1 bytes),
 * both AST blobs, the per-column compression entries (only when pCmpr is set),
 * and finally STB_RESERVE_SIZE reserved bytes plus the data length.
 *
 * terrno is preset to TSDB_CODE_OUT_OF_MEMORY and cleared only once everything
 * has been written; if it is non-zero at exit the raw record is freed and
 * NULL is returned.
 */
SSdbRaw *mndStbActionEncode(SStbObj *pStb) {
  // NOTE(review): code/lino are not used directly here; presumably the
  // SDB_SET_* macros reference them, so they are kept — confirm.
  int32_t code = 0;
  int32_t lino = 0;
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  int32_t rawSize = sizeof(SStbObj) + (pStb->numOfColumns + pStb->numOfTags) * sizeof(SSchema) + pStb->commentLen +
                    pStb->ast1Len + pStb->ast2Len + pStb->numOfColumns * sizeof(SColCmpr) + STB_RESERVE_SIZE +
                    taosArrayGetSize(pStb->pFuncs) * TSDB_FUNC_NAME_LEN;
  SSdbRaw *pRaw = sdbAllocRaw(SDB_STB, STB_VER_NUMBER, rawSize);
  if (pRaw == NULL) goto _OVER;

  int32_t dataPos = 0;

  // Fixed-size header fields.
  SDB_SET_BINARY(pRaw, dataPos, pStb->name, TSDB_TABLE_FNAME_LEN, _OVER)
  SDB_SET_BINARY(pRaw, dataPos, pStb->db, TSDB_DB_FNAME_LEN, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pStb->createdTime, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pStb->updateTime, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pStb->uid, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pStb->dbUid, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pStb->tagVer, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pStb->colVer, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pStb->smaVer, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pStb->nextColId, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pStb->maxdelay[0], _OVER)
  SDB_SET_INT64(pRaw, dataPos, pStb->maxdelay[1], _OVER)
  SDB_SET_INT64(pRaw, dataPos, pStb->watermark[0], _OVER)
  SDB_SET_INT64(pRaw, dataPos, pStb->watermark[1], _OVER)
  SDB_SET_INT32(pRaw, dataPos, pStb->ttl, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pStb->numOfColumns, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pStb->numOfTags, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pStb->numOfFuncs, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pStb->commentLen, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pStb->ast1Len, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pStb->ast2Len, _OVER)

  // Column schemas.
  for (int32_t col = 0; col < pStb->numOfColumns; ++col) {
    SSchema *pCol = &pStb->pColumns[col];
    SDB_SET_INT8(pRaw, dataPos, pCol->type, _OVER)
    SDB_SET_INT8(pRaw, dataPos, pCol->flags, _OVER)
    SDB_SET_INT16(pRaw, dataPos, pCol->colId, _OVER)
    SDB_SET_INT32(pRaw, dataPos, pCol->bytes, _OVER)
    SDB_SET_BINARY(pRaw, dataPos, pCol->name, TSDB_COL_NAME_LEN, _OVER)
  }

  // Tag schemas.
  for (int32_t tag = 0; tag < pStb->numOfTags; ++tag) {
    SSchema *pTag = &pStb->pTags[tag];
    SDB_SET_INT8(pRaw, dataPos, pTag->type, _OVER)
    SDB_SET_INT8(pRaw, dataPos, pTag->flags, _OVER)
    SDB_SET_INT16(pRaw, dataPos, pTag->colId, _OVER)
    SDB_SET_INT32(pRaw, dataPos, pTag->bytes, _OVER)
    SDB_SET_BINARY(pRaw, dataPos, pTag->name, TSDB_COL_NAME_LEN, _OVER)
  }

  // Function names, one fixed-width slot each.
  for (int32_t fn = 0; fn < pStb->numOfFuncs; ++fn) {
    char *funcName = taosArrayGet(pStb->pFuncs, fn);
    SDB_SET_BINARY(pRaw, dataPos, funcName, TSDB_FUNC_NAME_LEN, _OVER)
  }

  // Optional variable-length blobs.
  if (pStb->commentLen > 0) {
    SDB_SET_BINARY(pRaw, dataPos, pStb->comment, pStb->commentLen + 1, _OVER)
  }

  if (pStb->ast1Len > 0) {
    SDB_SET_BINARY(pRaw, dataPos, pStb->pAst1, pStb->ast1Len, _OVER)
  }

  if (pStb->ast2Len > 0) {
    SDB_SET_BINARY(pRaw, dataPos, pStb->pAst2, pStb->ast2Len, _OVER)
  }

  // Per-column compression settings (skipped entirely when absent).
  if (pStb->pCmpr != NULL) {
    for (int col = 0; col < pStb->numOfColumns; col++) {
      SColCmpr *pColCmpr = &pStb->pCmpr[col];
      SDB_SET_INT16(pRaw, dataPos, pColCmpr->id, _OVER)
      SDB_SET_INT32(pRaw, dataPos, pColCmpr->alg, _OVER)
    }
  }
  SDB_SET_RESERVE(pRaw, dataPos, STB_RESERVE_SIZE, _OVER)
  SDB_SET_DATALEN(pRaw, dataPos, _OVER)

  terrno = 0;

_OVER:
  if (terrno == 0) {
    mTrace("stb:%s, encode to raw:%p, row:%p", pStb->name, pRaw, pStb);
    return pRaw;
  }

  mError("stb:%s, failed to encode to raw:%p since %s", pStb->name, pRaw, terrstr());
  sdbFreeRaw(pRaw);
  return NULL;
}
208

209
/*
 * Rebuilds a super-table row from an SDB raw record (inverse of
 * mndStbActionEncode).
 *
 * Records whose soft version is newer than STB_VER_NUMBER are rejected with
 * TSDB_CODE_SDB_INVALID_DATA_VER. Records older than STB_VER_NUMBER carry no
 * compression section, so a default per-column compression entry is
 * synthesised from each column's type instead.
 *
 * terrno is preset to TSDB_CODE_OUT_OF_MEMORY and cleared only after the whole
 * record has been read. Returns the new row on success; on failure every
 * buffer allocated so far is released and NULL is returned.
 */
static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) {
  // NOTE(review): code/lino are not used directly here; presumably the
  // SDB_GET_* macros reference them, so they are kept — confirm.
  int32_t code = 0;
  int32_t lino = 0;
  terrno = TSDB_CODE_OUT_OF_MEMORY;
  SSdbRow *pRow = NULL;
  SStbObj *pStb = NULL;

  int8_t sver = 0;
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;

  if (sver > STB_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto _OVER;
  }

  pRow = sdbAllocRow(sizeof(SStbObj));
  if (pRow == NULL) goto _OVER;

  pStb = sdbGetRowObj(pRow);
  if (pStb == NULL) goto _OVER;

  int32_t dataPos = 0;
  SDB_GET_BINARY(pRaw, dataPos, pStb->name, TSDB_TABLE_FNAME_LEN, _OVER)
  SDB_GET_BINARY(pRaw, dataPos, pStb->db, TSDB_DB_FNAME_LEN, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pStb->createdTime, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pStb->updateTime, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pStb->uid, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pStb->dbUid, _OVER)
  SDB_GET_INT32(pRaw, dataPos, &pStb->tagVer, _OVER)
  SDB_GET_INT32(pRaw, dataPos, &pStb->colVer, _OVER)
  SDB_GET_INT32(pRaw, dataPos, &pStb->smaVer, _OVER)
  SDB_GET_INT32(pRaw, dataPos, &pStb->nextColId, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pStb->maxdelay[0], _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pStb->maxdelay[1], _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pStb->watermark[0], _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pStb->watermark[1], _OVER)
  SDB_GET_INT32(pRaw, dataPos, &pStb->ttl, _OVER)
  SDB_GET_INT32(pRaw, dataPos, &pStb->numOfColumns, _OVER)
  SDB_GET_INT32(pRaw, dataPos, &pStb->numOfTags, _OVER)
  SDB_GET_INT32(pRaw, dataPos, &pStb->numOfFuncs, _OVER)
  SDB_GET_INT32(pRaw, dataPos, &pStb->commentLen, _OVER)
  SDB_GET_INT32(pRaw, dataPos, &pStb->ast1Len, _OVER)
  SDB_GET_INT32(pRaw, dataPos, &pStb->ast2Len, _OVER)

  pStb->pColumns = taosMemoryCalloc(pStb->numOfColumns, sizeof(SSchema));
  pStb->pTags = taosMemoryCalloc(pStb->numOfTags, sizeof(SSchema));
  pStb->pFuncs = taosArrayInit(pStb->numOfFuncs, TSDB_FUNC_NAME_LEN);
  if (pStb->pColumns == NULL || pStb->pTags == NULL || pStb->pFuncs == NULL) {
    goto _OVER;
  }

  for (int32_t i = 0; i < pStb->numOfColumns; ++i) {
    SSchema *pSchema = &pStb->pColumns[i];
    SDB_GET_INT8(pRaw, dataPos, &pSchema->type, _OVER)
    SDB_GET_INT8(pRaw, dataPos, &pSchema->flags, _OVER)
    SDB_GET_INT16(pRaw, dataPos, &pSchema->colId, _OVER)
    SDB_GET_INT32(pRaw, dataPos, &pSchema->bytes, _OVER)
    SDB_GET_BINARY(pRaw, dataPos, pSchema->name, TSDB_COL_NAME_LEN, _OVER)
  }

  for (int32_t i = 0; i < pStb->numOfTags; ++i) {
    SSchema *pSchema = &pStb->pTags[i];
    SDB_GET_INT8(pRaw, dataPos, &pSchema->type, _OVER)
    SDB_GET_INT8(pRaw, dataPos, &pSchema->flags, _OVER)
    SDB_GET_INT16(pRaw, dataPos, &pSchema->colId, _OVER)
    SDB_GET_INT32(pRaw, dataPos, &pSchema->bytes, _OVER)
    SDB_GET_BINARY(pRaw, dataPos, pSchema->name, TSDB_COL_NAME_LEN, _OVER)
  }

  for (int32_t i = 0; i < pStb->numOfFuncs; ++i) {
    char funcName[TSDB_FUNC_NAME_LEN] = {0};
    SDB_GET_BINARY(pRaw, dataPos, funcName, TSDB_FUNC_NAME_LEN, _OVER)
    if (taosArrayPush(pStb->pFuncs, funcName) == NULL) goto _OVER;
  }

  if (pStb->commentLen > 0) {
    pStb->comment = taosMemoryCalloc(pStb->commentLen + 1, 1);
    if (pStb->comment == NULL) goto _OVER;
    SDB_GET_BINARY(pRaw, dataPos, pStb->comment, pStb->commentLen + 1, _OVER)
  }

  if (pStb->ast1Len > 0) {
    pStb->pAst1 = taosMemoryCalloc(pStb->ast1Len, 1);
    if (pStb->pAst1 == NULL) goto _OVER;
    SDB_GET_BINARY(pRaw, dataPos, pStb->pAst1, pStb->ast1Len, _OVER)
  }

  if (pStb->ast2Len > 0) {
    pStb->pAst2 = taosMemoryCalloc(pStb->ast2Len, 1);
    if (pStb->pAst2 == NULL) goto _OVER;
    SDB_GET_BINARY(pRaw, dataPos, pStb->pAst2, pStb->ast2Len, _OVER)
  }

  pStb->pCmpr = taosMemoryCalloc(pStb->numOfColumns, sizeof(SColCmpr));
  if (pStb->pCmpr == NULL) {
    // Both loops below write through pCmpr; never proceed with a NULL array.
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }
  if (sver < STB_VER_NUMBER) {
    // compatible with old data, setup default compress value
    for (int i = 0; i < pStb->numOfColumns; i++) {
      SSchema  *pSchema = &pStb->pColumns[i];
      SColCmpr *pCmpr = &pStb->pCmpr[i];
      pCmpr->id = pSchema->colId;
      pCmpr->alg = createDefaultColCmprByType(pSchema->type);
    }
  } else {
    for (int i = 0; i < pStb->numOfColumns; i++) {
      SColCmpr *pCmpr = &pStb->pCmpr[i];
      SDB_GET_INT16(pRaw, dataPos, &pCmpr->id, _OVER)
      SDB_GET_INT32(pRaw, dataPos, (int32_t *)&pCmpr->alg, _OVER)  // compatible
    }
  }

  SDB_GET_RESERVE(pRaw, dataPos, STB_RESERVE_SIZE, _OVER)

  terrno = 0;

_OVER:
  if (terrno != 0) {
    mError("stb:%s, failed to decode from raw:%p since %s", pStb == NULL ? "null" : pStb->name, pRaw, terrstr());
    if (pStb != NULL) {
      // Release every buffer owned by the half-built object. Pointers not yet
      // assigned are expected to be NULL — the previous cleanup of comment and
      // pCmpr already depended on the row starting zeroed (TODO confirm
      // against sdbAllocRow).
      if (pStb->pFuncs != NULL) {
        taosArrayDestroy(pStb->pFuncs);
        pStb->pFuncs = NULL;
      }
      taosMemoryFreeClear(pStb->pColumns);
      taosMemoryFreeClear(pStb->pTags);
      taosMemoryFreeClear(pStb->comment);
      taosMemoryFreeClear(pStb->pAst1);
      taosMemoryFreeClear(pStb->pAst2);
      taosMemoryFreeClear(pStb->pCmpr);
    }
    taosMemoryFreeClear(pRow);
    return NULL;
  }

  mTrace("stb:%s, decode from raw:%p, row:%p", pStb->name, pRaw, pStb);
  return pRow;
}
340

341
/*
 * Releases every heap buffer owned by a super-table object (schemas, comment,
 * AST blobs, compression table and the function-name array). The object
 * itself is not freed.
 */
void mndFreeStb(SStbObj *pStb) {
  // Array container first, matching the original release order.
  taosArrayDestroy(pStb->pFuncs);

  // Schema arrays.
  taosMemoryFreeClear(pStb->pColumns);
  taosMemoryFreeClear(pStb->pTags);

  // Optional blobs.
  taosMemoryFreeClear(pStb->comment);
  taosMemoryFreeClear(pStb->pAst1);
  taosMemoryFreeClear(pStb->pAst2);

  // Per-column compression table.
  taosMemoryFreeClear(pStb->pCmpr);
}
350

351
/* SDB insert hook for super tables: only traces the event; always returns 0. */
static int32_t mndStbActionInsert(SSdb *pSdb, SStbObj *pStb) {
  (void)pSdb;
  mTrace("stb:%s, perform insert action, row:%p", pStb->name, pStb);
  return 0;
}
355

356
/*
 * SDB delete hook for super tables: traces the event and releases the buffers
 * owned by the object via mndFreeStb(). Always returns 0.
 */
static int32_t mndStbActionDelete(SSdb *pSdb, SStbObj *pStb) {
  (void)pSdb;
  mTrace("stb:%s, perform delete action, row:%p", pStb->name, pStb);
  mndFreeStb(pStb);
  return 0;
}
361

362
/*
 * SDB update hook: copies the mutable state of pNew into the live object pOld
 * while holding pOld's write latch.
 *
 * Buffers in pOld (columns, tags, comment, AST blobs, compression table) are
 * replaced only when the new contents need more room than the old ones.
 * All replacement buffers are allocated up front, before pOld is touched, so
 * an allocation failure leaves pOld exactly as it was.
 *
 * Returns 0 on success. On allocation failure sets terrno and returns
 * TSDB_CODE_OUT_OF_MEMORY with the latch released exactly once. (Previously
 * the failure branches released the latch and then fell through, copying into
 * the undersized old buffers and releasing the latch a second time.)
 */
static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOld, SStbObj *pNew) {
  mTrace("stb:%s, perform update action, old row:%p new row:%p", pOld->name, pOld, pNew);

  void *pColumns = NULL;
  void *pCmpr = NULL;
  void *pTags = NULL;
  void *comment = NULL;
  void *pAst1 = NULL;
  void *pAst2 = NULL;

  taosWLockLatch(&pOld->lock);

  const bool growColumns = pOld->numOfColumns < pNew->numOfColumns;
  const bool growTags = pOld->numOfTags < pNew->numOfTags;
  const bool growComment = pOld->commentLen < pNew->commentLen && pNew->commentLen > 0;
  const bool growAst1 = pOld->ast1Len < pNew->ast1Len;
  const bool growAst2 = pOld->ast2Len < pNew->ast2Len;

  // Phase 1: reserve every buffer that has to grow. Nothing in pOld changes yet.
  if (growColumns) {
    pColumns = taosMemoryMalloc(pNew->numOfColumns * sizeof(SSchema));
    pCmpr = taosMemoryCalloc(pNew->numOfColumns, sizeof(SColCmpr));
    if (pColumns == NULL || pCmpr == NULL) goto _OOM;
  }
  if (growTags) {
    pTags = taosMemoryMalloc(pNew->numOfTags * sizeof(SSchema));
    if (pTags == NULL) goto _OOM;
  }
  if (growComment) {
    comment = taosMemoryMalloc(pNew->commentLen + 1);
    if (comment == NULL) goto _OOM;
  }
  if (growAst1) {
    pAst1 = taosMemoryMalloc(pNew->ast1Len + 1);
    if (pAst1 == NULL) goto _OOM;
  }
  if (growAst2) {
    pAst2 = taosMemoryMalloc(pNew->ast2Len + 1);
    if (pAst2 == NULL) goto _OOM;
  }

  // Phase 2: swap in the larger buffers; this cannot fail.
  if (growColumns) {
    taosMemoryFree(pOld->pColumns);
    pOld->pColumns = pColumns;
    taosMemoryFree(pOld->pCmpr);
    pOld->pCmpr = pCmpr;
  }
  if (growTags) {
    taosMemoryFree(pOld->pTags);
    pOld->pTags = pTags;
  }
  if (growComment) {
    taosMemoryFree(pOld->comment);
    pOld->comment = comment;
  }
  if (growAst1) {
    taosMemoryFree(pOld->pAst1);
    pOld->pAst1 = pAst1;
  }
  if (growAst2) {
    taosMemoryFree(pOld->pAst2);
    pOld->pAst2 = pAst2;
  }

  // Phase 3: copy scalar fields and payloads.
  pOld->commentLen = pNew->commentLen;
  pOld->updateTime = pNew->updateTime;
  pOld->tagVer = pNew->tagVer;
  pOld->colVer = pNew->colVer;
  pOld->smaVer = pNew->smaVer;
  pOld->nextColId = pNew->nextColId;
  pOld->ttl = pNew->ttl;
  if (pNew->numOfColumns > 0) {
    pOld->numOfColumns = pNew->numOfColumns;
    memcpy(pOld->pColumns, pNew->pColumns, pOld->numOfColumns * sizeof(SSchema));
  }
  if (pNew->numOfTags > 0) {
    pOld->numOfTags = pNew->numOfTags;
    memcpy(pOld->pTags, pNew->pTags, pOld->numOfTags * sizeof(SSchema));
  }
  if (pNew->commentLen > 0) {
    memcpy(pOld->comment, pNew->comment, pNew->commentLen + 1);
    pOld->commentLen = pNew->commentLen;
  }
  if (pNew->ast1Len != 0) {
    memcpy(pOld->pAst1, pNew->pAst1, pNew->ast1Len);
    pOld->ast1Len = pNew->ast1Len;
  }
  if (pNew->ast2Len != 0) {
    memcpy(pOld->pAst2, pNew->pAst2, pNew->ast2Len);
    pOld->ast2Len = pNew->ast2Len;
  }
  // pOld->pCmpr is either the freshly grown table or the existing one, which
  // already holds at least pNew->numOfColumns entries.
  memcpy(pOld->pCmpr, pNew->pCmpr, pNew->numOfColumns * sizeof(SColCmpr));

  taosWUnLockLatch(&pOld->lock);
  return 0;

_OOM:
  // Drop whatever was reserved; pOld has not been modified.
  taosMemoryFree(pColumns);
  taosMemoryFree(pCmpr);
  taosMemoryFree(pTags);
  taosMemoryFree(comment);
  taosMemoryFree(pAst1);
  taosMemoryFree(pAst2);
  terrno = TSDB_CODE_OUT_OF_MEMORY;
  mError("stb:%s, failed to perform update action since %s", pOld->name, terrstr());
  taosWUnLockLatch(&pOld->lock);
  return TSDB_CODE_OUT_OF_MEMORY;
}
465

466
/*
 * Looks up a super table by name in the SDB and returns the acquired object,
 * or NULL when the lookup fails. A TSDB_CODE_SDB_OBJ_NOT_THERE failure is
 * reported to the caller as TSDB_CODE_MND_STB_NOT_EXIST via terrno; any other
 * terrno value is left untouched.
 */
SStbObj *mndAcquireStb(SMnode *pMnode, char *stbName) {
  SStbObj *pFound = sdbAcquire(pMnode->pSdb, SDB_STB, stbName);
  if (pFound != NULL) {
    return pFound;
  }

  if (terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
    terrno = TSDB_CODE_MND_STB_NOT_EXIST;
  }
  return NULL;
}
474

475
/* Hands a super-table object obtained from the SDB back via sdbRelease(). */
void mndReleaseStb(SMnode *pMnode, SStbObj *pStb) {
  sdbRelease(pMnode->pSdb, pStb);
}
479

480
/*
 * Acquires the database that a fully-qualified super-table name belongs to.
 *
 * The name is parsed as account + db + table, the full db name is derived
 * from it, and the db is acquired with mndAcquireDb(). If parsing or
 * derivation fails, the failing call's code is stored in terrno and NULL is
 * returned.
 */
SDbObj *mndAcquireDbByStb(SMnode *pMnode, const char *stbName) {
  SName parsed = {0};
  char  dbFName[TSDB_TABLE_FNAME_LEN] = {0};

  terrno = tNameFromString(&parsed, stbName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
  if (terrno != 0) {
    return NULL;
  }

  terrno = tNameGetFullDbName(&parsed, dbFName);
  if (terrno != 0) {
    return NULL;
  }

  return mndAcquireDb(pMnode, dbFName);
}
489

490
/*
 * Three-way comparator between a column-id key and a schema entry.
 *
 * colId points at a col_id_t; pSchema points at an SSchema. Returns -1, 0 or 1
 * when the key is less than, equal to, or greater than the entry's colId.
 */
static FORCE_INLINE int32_t schemaExColIdCompare(const void *colId, const void *pSchema) {
  const col_id_t key = *(col_id_t *)colId;
  const SSchema *pEntry = (SSchema *)pSchema;

  if (key < pEntry->colId) return -1;
  return (key > pEntry->colId) ? 1 : 0;
}
498

499
/*
 * Builds the serialized create-super-table request for one vgroup.
 *
 * The returned buffer is an SMsgHead (contLen and vgId in network byte order)
 * followed by the encoded SVCreateStbReq; its total size is stored in
 * *pContLen. The request references pStb's schema arrays directly and carries
 * a temporary copy of the per-column compression settings, which is released
 * before returning. When pStb has rollup ASTs, the rsma query messages are
 * generated with mndConvertRsmaTask() and likewise released before returning.
 *
 * Returns the heap buffer (owned by the caller) or NULL on failure. terrno is
 * set on name parsing and allocation failures.
 */
void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, int32_t *pContLen, void *alterOriData,
                            int32_t alterOriDataLen) {
  SEncoder       encoder = {0};
  int32_t        contLen;
  SName          name = {0};
  SVCreateStbReq req = {0};

  if ((terrno = tNameFromString(&name, pStb->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE)) != 0) {
    goto _err;
  }
  char dbFName[TSDB_DB_FNAME_LEN] = {0};
  if ((terrno = tNameGetFullDbName(&name, dbFName)) != 0) {
    goto _err;
  }

  req.name = (char *)tNameGetTableName(&name);
  req.suid = pStb->uid;
  req.rollup = pStb->ast1Len > 0 ? 1 : 0;
  req.alterOriData = alterOriData;
  req.alterOriDataLen = alterOriDataLen;
  req.source = pStb->source;
  // todo
  req.schemaRow.nCols = pStb->numOfColumns;
  req.schemaRow.version = pStb->colVer;
  req.schemaRow.pSchema = pStb->pColumns;
  req.schemaTag.nCols = pStb->numOfTags;
  req.schemaTag.version = pStb->tagVer;
  req.schemaTag.pSchema = pStb->pTags;

  req.colCmpred = 1;
  SColCmprWrapper *pCmpr = &req.colCmpr;
  pCmpr->version = pStb->colVer;
  pCmpr->nCols = pStb->numOfColumns;

  req.colCmpr.pColCmpr = taosMemoryCalloc(pCmpr->nCols, sizeof(SColCmpr));
  if (pCmpr->nCols > 0 && req.colCmpr.pColCmpr == NULL) {
    // The copy loop below writes through pColCmpr; bail out instead of
    // dereferencing NULL when the allocation fails.
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }
  for (int32_t i = 0; i < pStb->numOfColumns; i++) {
    SColCmpr *p = &pCmpr->pColCmpr[i];
    p->alg = pStb->pCmpr[i].alg;
    p->id = pStb->pCmpr[i].id;
  }

  if (req.rollup) {
    req.rsmaParam.maxdelay[0] = pStb->maxdelay[0];
    req.rsmaParam.maxdelay[1] = pStb->maxdelay[1];
    req.rsmaParam.watermark[0] = pStb->watermark[0];
    req.rsmaParam.watermark[1] = pStb->watermark[1];
    if (pStb->ast1Len > 0) {
      if (mndConvertRsmaTask(&req.rsmaParam.qmsg[0], &req.rsmaParam.qmsgLen[0], pStb->pAst1, pStb->uid,
                             STREAM_TRIGGER_WINDOW_CLOSE, req.rsmaParam.watermark[0],
                             req.rsmaParam.deleteMark[0]) < 0) {
        goto _err;
      }
    }
    if (pStb->ast2Len > 0) {
      if (mndConvertRsmaTask(&req.rsmaParam.qmsg[1], &req.rsmaParam.qmsgLen[1], pStb->pAst2, pStb->uid,
                             STREAM_TRIGGER_WINDOW_CLOSE, req.rsmaParam.watermark[1],
                             req.rsmaParam.deleteMark[1]) < 0) {
        goto _err;
      }
    }
  }
  // get length
  int32_t ret = 0;
  tEncodeSize(tEncodeSVCreateStbReq, &req, contLen, ret);
  if (ret < 0) {
    goto _err;
  }

  contLen += sizeof(SMsgHead);

  SMsgHead *pHead = taosMemoryCalloc(1, contLen);
  if (pHead == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  pHead->contLen = htonl(contLen);
  pHead->vgId = htonl(pVgroup->vgId);

  // The encoded request goes right after the message head.
  void *pBuf = POINTER_SHIFT(pHead, sizeof(SMsgHead));
  tEncoderInit(&encoder, pBuf, contLen - sizeof(SMsgHead));
  if (tEncodeSVCreateStbReq(&encoder, &req) < 0) {
    taosMemoryFreeClear(pHead);
    tEncoderClear(&encoder);
    goto _err;
  }
  tEncoderClear(&encoder);

  *pContLen = contLen;
  taosMemoryFreeClear(req.rsmaParam.qmsg[0]);
  taosMemoryFreeClear(req.rsmaParam.qmsg[1]);
  taosMemoryFreeClear(req.colCmpr.pColCmpr);
  return pHead;
_err:
  // Temporaries owned by the request are released on every failure path.
  taosMemoryFreeClear(req.rsmaParam.qmsg[0]);
  taosMemoryFreeClear(req.rsmaParam.qmsg[1]);
  taosMemoryFreeClear(req.colCmpr.pColCmpr);
  return NULL;
}
598

599
/*
 * Builds the serialized drop-super-table request for one vgroup.
 *
 * The returned buffer is an SMsgHead (contLen and vgId in network byte order)
 * followed by the encoded SVDropStbReq; its total size is stored in
 * *pContLen.
 *
 * Returns the heap buffer (owned by the caller) or NULL on failure. terrno
 * carries the code from name parsing, allocation, or encoding.
 */
static void *mndBuildVDropStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, int32_t *pContLen) {
  SName        name = {0};
  SVDropStbReq req = {0};
  int32_t      contLen = 0;
  int32_t      ret = 0;
  SMsgHead    *pHead = NULL;
  SEncoder     encoder = {0};

  if ((terrno = tNameFromString(&name, pStb->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE)) != 0) {
    return NULL;
  }

  req.name = (char *)tNameGetTableName(&name);
  req.suid = pStb->uid;

  tEncodeSize(tEncodeSVDropStbReq, &req, contLen, ret);
  if (ret < 0) return NULL;

  contLen += sizeof(SMsgHead);
  pHead = taosMemoryMalloc(contLen);
  if (pHead == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pHead->contLen = htonl(contLen);
  pHead->vgId = htonl(pVgroup->vgId);

  // The encoded request goes right after the message head.
  void *pBuf = POINTER_SHIFT(pHead, sizeof(SMsgHead));

  tEncoderInit(&encoder, pBuf, contLen - sizeof(SMsgHead));
  terrno = tEncodeSVDropStbReq(&encoder, &req);
  tEncoderClear(&encoder);
  if (terrno != 0) {
    // The message buffer is not handed to the caller on failure; release it
    // here so it does not leak.
    taosMemoryFree(pHead);
    return NULL;
  }

  *pContLen = contLen;
  return pHead;
}
637

638
/*
 * Validates a create-super-table request.
 *
 * Checks, in order: igExists is 0 or 1; the column count is at least
 * TSDB_MIN_COLUMNS and columns + tags do not exceed TSDB_MAX_COLUMNS; the tag
 * count is in 1..TSDB_MAX_TAGS; the first column is a timestamp; every column
 * and every tag has a type below TSDB_DATA_TYPE_MAX, a positive byte width and
 * a non-empty name.
 *
 * Returns 0 when the request is acceptable, otherwise
 * TSDB_CODE_MND_INVALID_STB_OPTION, TSDB_CODE_PAR_INVALID_COLUMNS_NUM or
 * TSDB_CODE_PAR_INVALID_FIRST_COLUMN (all via TAOS_RETURN).
 */
int32_t mndCheckCreateStbReq(SMCreateStbReq *pCreate) {
  int32_t code = 0;

  if (pCreate->igExists < 0 || pCreate->igExists > 1) {
    code = TSDB_CODE_MND_INVALID_STB_OPTION;
    TAOS_RETURN(code);
  }

  if (pCreate->numOfColumns < TSDB_MIN_COLUMNS || pCreate->numOfTags + pCreate->numOfColumns > TSDB_MAX_COLUMNS) {
    code = TSDB_CODE_PAR_INVALID_COLUMNS_NUM;
    TAOS_RETURN(code);
  }

  if (pCreate->numOfTags <= 0 || pCreate->numOfTags > TSDB_MAX_TAGS) {
    code = TSDB_CODE_MND_INVALID_STB_OPTION;
    TAOS_RETURN(code);
  }

  // NOTE(review): element 0 is read as SField here while the loop below reads
  // the same array as SFieldWithOptions — confirm the two layouts agree on `type`.
  SField *pFirst = taosArrayGet(pCreate->pColumns, 0);
  if (pFirst->type != TSDB_DATA_TYPE_TIMESTAMP) {
    code = TSDB_CODE_PAR_INVALID_FIRST_COLUMN;
    TAOS_RETURN(code);
  }

  for (int32_t col = 0; col < pCreate->numOfColumns; ++col) {
    SFieldWithOptions *pCol = taosArrayGet(pCreate->pColumns, col);
    if (pCol->type >= TSDB_DATA_TYPE_MAX || pCol->bytes <= 0 || pCol->name[0] == 0) {
      code = TSDB_CODE_MND_INVALID_STB_OPTION;
      TAOS_RETURN(code);
    }
  }

  for (int32_t tag = 0; tag < pCreate->numOfTags; ++tag) {
    SField *pTag = taosArrayGet(pCreate->pTags, tag);
    if (pTag->type >= TSDB_DATA_TYPE_MAX || pTag->bytes <= 0 || pTag->name[0] == 0) {
      code = TSDB_CODE_MND_INVALID_STB_OPTION;
      TAOS_RETURN(code);
    }
  }

  TAOS_RETURN(code);
}
695

696
/*
 * Encodes pStb and appends it to the transaction's prepare log with status
 * SDB_STATUS_CREATING.
 *
 * Returns 0 on success. If encoding fails, returns terrno when it is set,
 * otherwise TSDB_CODE_MND_RETURN_VALUE_NULL. If appending fails, the encoded
 * record is freed and the append error is returned.
 */
static int32_t mndSetCreateStbPrepareLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
  int32_t  code = 0;
  SSdbRaw *pRaw = mndStbActionEncode(pStb);

  if (pRaw == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  code = mndTransAppendPrepareLog(pTrans, pRaw);
  if (code != 0) {
    sdbFreeRaw(pRaw);
    TAOS_RETURN(code);
  }

  TAOS_CHECK_RETURN(sdbSetRawStatus(pRaw, SDB_STATUS_CREATING));

  TAOS_RETURN(code);
}
712

713
/*
 * Encodes pStb and appends it to the transaction's commit log with status
 * SDB_STATUS_READY.
 *
 * Returns 0 on success. If encoding fails, returns terrno when it is set,
 * otherwise TSDB_CODE_MND_RETURN_VALUE_NULL. If appending fails, the encoded
 * record is freed and the append error is returned.
 */
static int32_t mndSetCreateStbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
  int32_t  code = 0;
  SSdbRaw *pRaw = mndStbActionEncode(pStb);

  if (pRaw == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  code = mndTransAppendCommitlog(pTrans, pRaw);
  if (code != 0) {
    sdbFreeRaw(pRaw);
    TAOS_RETURN(code);
  }

  TAOS_CHECK_RETURN(sdbSetRawStatus(pRaw, SDB_STATUS_READY));

  TAOS_RETURN(code);
}
729

730
// Appends one TDMT_VND_CREATE_STB redo action to pTrans for every vgroup that
// belongs to pDb. Vgroups of other databases are skipped.
// Returns 0 on success; on failure the vgroup iteration is cancelled and the
// error code is returned.
static int32_t mndSetCreateStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
  SSdb   *pSdb = pMnode->pSdb;
  SVgObj *pVgroup = NULL;
  void   *pIter = NULL;
  int32_t code = 0;

  while ((pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup)) != NULL) {
    if (mndVgroupInDb(pVgroup, pDb->uid)) {
      int32_t contLen = 0;
      void   *pCont = mndBuildVCreateStbReq(pMnode, pVgroup, pStb, &contLen, NULL, 0);
      if (pCont == NULL) {
        sdbCancelFetch(pSdb, pIter);
        sdbRelease(pSdb, pVgroup);
        code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
        TAOS_RETURN(code);
      }

      STransAction action = {0};
      action.mTraceId = pTrans->mTraceId;
      action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
      action.pCont = pCont;
      action.contLen = contLen;
      action.msgType = TDMT_VND_CREATE_STB;
      action.acceptableCode = TSDB_CODE_TDB_STB_ALREADY_EXIST;
      action.retryCode = TSDB_CODE_TDB_STB_NOT_EXIST;

      code = mndTransAppendRedoAction(pTrans, &action);
      if (code != 0) {
        // The request buffer was not taken over by the transaction.
        taosMemoryFree(pCont);
        sdbCancelFetch(pSdb, pIter);
        sdbRelease(pSdb, pVgroup);
        TAOS_RETURN(code);
      }
    }

    sdbRelease(pSdb, pVgroup);
  }

  TAOS_RETURN(code);
}
773

774
// Appends a single TDMT_VND_CREATE_STB redo action for the given vgroup to
// pTrans. Returns 0 on success or the error code of the failing step.
int32_t mndSetForceDropCreateStbRedoActions(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, SStbObj *pStb) {
  int32_t code = 0;
  int32_t contLen = 0;

  void *pCont = mndBuildVCreateStbReq(pMnode, pVgroup, pStb, &contLen, NULL, 0);
  if (pCont == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  STransAction action = {0};
  action.mTraceId = pTrans->mTraceId;
  action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
  action.pCont = pCont;
  action.contLen = contLen;
  action.msgType = TDMT_VND_CREATE_STB;
  action.acceptableCode = TSDB_CODE_TDB_STB_ALREADY_EXIST;
  action.retryCode = TSDB_CODE_TDB_STB_NOT_EXIST;

  code = mndTransAppendRedoAction(pTrans, &action);
  if (code != 0) {
    // The request buffer was not taken over by the transaction.
    taosMemoryFree(pCont);
  }

  TAOS_RETURN(code);
}
801

802
// Appends one TDMT_VND_DROP_STB undo action to pTrans for every vgroup that
// belongs to pDb, so a rolled-back create removes the stable from the vnodes.
// Returns 0 on success; on failure the vgroup iteration is cancelled and the
// error code is returned.
static int32_t mndSetCreateStbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
  SSdb   *pSdb = pMnode->pSdb;
  SVgObj *pVgroup = NULL;
  void   *pIter = NULL;
  int32_t code = 0;

  while ((pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup)) != NULL) {
    if (mndVgroupInDb(pVgroup, pDb->uid)) {
      int32_t contLen = 0;
      void   *pCont = mndBuildVDropStbReq(pMnode, pVgroup, pStb, &contLen);
      if (pCont == NULL) {
        sdbCancelFetch(pSdb, pIter);
        sdbRelease(pSdb, pVgroup);
        TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
      }

      STransAction action = {0};
      action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
      action.pCont = pCont;
      action.contLen = contLen;
      action.msgType = TDMT_VND_DROP_STB;
      action.acceptableCode = TSDB_CODE_TDB_STB_NOT_EXIST;

      code = mndTransAppendUndoAction(pTrans, &action);
      if (code != 0) {
        // The request buffer was not taken over by the transaction.
        taosMemoryFree(pCont);
        sdbCancelFetch(pSdb, pIter);
        sdbRelease(pSdb, pVgroup);
        TAOS_RETURN(code);
      }
    }

    sdbRelease(pSdb, pVgroup);
  }

  TAOS_RETURN(code);
}
842

843
static SSchema *mndFindStbColumns(const SStbObj *pStb, const char *colName) {
×
844
  for (int32_t col = 0; col < pStb->numOfColumns; ++col) {
×
845
    SSchema *pSchema = &pStb->pColumns[col];
×
846
    if (strncasecmp(pSchema->name, colName, TSDB_COL_NAME_LEN) == 0) {
×
847
      return pSchema;
×
848
    }
849
  }
850
  return NULL;
×
851
}
852

853
// Fills pDst from a create-stable request.
//
// Scalar options are copied from pCreate, and pDb supplies the database name
// and uid. The comment and the two AST buffers are deep-copied. Ownership of
// pCreate->pFuncs is moved to pDst (pCreate->pFuncs is set to NULL). Column and
// tag schemas get consecutive column ids starting at 1, the first tag is
// flagged as indexed, and per-column compression settings are taken from the
// request.
//
// Returns 0 on success or an error code. On failure pDst may hold buffers that
// were already allocated; this function does not release them.
int32_t mndBuildStbFromReq(SMnode *pMnode, SStbObj *pDst, SMCreateStbReq *pCreate, SDbObj *pDb) {
  int32_t code = 0;
  memcpy(pDst->name, pCreate->name, TSDB_TABLE_FNAME_LEN);
  memcpy(pDst->db, pDb->name, TSDB_DB_FNAME_LEN);
  pDst->createdTime = taosGetTimestampMs();
  pDst->updateTime = pDst->createdTime;
  // Requests coming from taosX carry the uid to use; otherwise generate one.
  pDst->uid = (pCreate->source == TD_REQ_FROM_TAOX_OLD || pCreate->source == TD_REQ_FROM_TAOX)
                  ? pCreate->suid
                  : mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN);
  pDst->dbUid = pDb->uid;
  pDst->tagVer = 1;
  pDst->colVer = 1;
  pDst->smaVer = 1;
  pDst->nextColId = 1;
  pDst->maxdelay[0] = pCreate->delay1;
  pDst->maxdelay[1] = pCreate->delay2;
  pDst->watermark[0] = pCreate->watermark1;
  pDst->watermark[1] = pCreate->watermark2;
  pDst->ttl = pCreate->ttl;
  pDst->numOfColumns = pCreate->numOfColumns;
  pDst->numOfTags = pCreate->numOfTags;
  pDst->numOfFuncs = pCreate->numOfFuncs;
  pDst->commentLen = pCreate->commentLen;
  pDst->pFuncs = pCreate->pFuncs;
  pDst->source = pCreate->source;
  // pFuncs now belongs to pDst; clear it in the request.
  pCreate->pFuncs = NULL;

  if (pDst->commentLen > 0) {
    pDst->comment = taosMemoryCalloc(pDst->commentLen + 1, 1);
    if (pDst->comment == NULL) {
      code = terrno;
      TAOS_RETURN(code);
    }
    memcpy(pDst->comment, pCreate->pComment, pDst->commentLen + 1);
  }

  pDst->ast1Len = pCreate->ast1Len;
  if (pDst->ast1Len > 0) {
    pDst->pAst1 = taosMemoryCalloc(pDst->ast1Len, 1);
    if (pDst->pAst1 == NULL) {
      code = terrno;
      TAOS_RETURN(code);
    }
    memcpy(pDst->pAst1, pCreate->pAst1, pDst->ast1Len);
  }

  pDst->ast2Len = pCreate->ast2Len;
  if (pDst->ast2Len > 0) {
    pDst->pAst2 = taosMemoryCalloc(pDst->ast2Len, 1);
    if (pDst->pAst2 == NULL) {
      code = terrno;
      TAOS_RETURN(code);
    }
    memcpy(pDst->pAst2, pCreate->pAst2, pDst->ast2Len);
  }

  // Allocate every schema buffer up front and verify all of them before any is
  // written. pCmpr is sized by its element type, SColCmpr.
  pDst->pColumns = taosMemoryCalloc(1, pDst->numOfColumns * sizeof(SSchema));
  pDst->pTags = taosMemoryCalloc(1, pDst->numOfTags * sizeof(SSchema));
  pDst->pCmpr = taosMemoryCalloc(1, pDst->numOfColumns * sizeof(SColCmpr));
  if (pDst->pColumns == NULL || pDst->pTags == NULL || pDst->pCmpr == NULL) {
    code = terrno;
    TAOS_RETURN(code);
  }

  // Make sure the ids assigned below stay under 0x7fff.
  if (pDst->nextColId < 0 || pDst->nextColId >= 0x7fff - pDst->numOfColumns - pDst->numOfTags) {
    code = TSDB_CODE_OUT_OF_RANGE;
    TAOS_RETURN(code);
  }

  for (int32_t i = 0; i < pDst->numOfColumns; ++i) {
    SFieldWithOptions *pField = taosArrayGet(pCreate->pColumns, i);
    SSchema           *pSchema = &pDst->pColumns[i];
    pSchema->type = pField->type;
    pSchema->bytes = pField->bytes;
    pSchema->flags = pField->flags;
    memcpy(pSchema->name, pField->name, TSDB_COL_NAME_LEN);
    pSchema->colId = pDst->nextColId;
    pDst->nextColId++;
  }

  for (int32_t i = 0; i < pDst->numOfTags; ++i) {
    SField  *pField = taosArrayGet(pCreate->pTags, i);
    SSchema *pSchema = &pDst->pTags[i];
    pSchema->type = pField->type;
    pSchema->bytes = pField->bytes;
    if (i == 0) {
      // The first tag carries the index flag.
      SSCHMEA_SET_IDX_ON(pSchema);
    }
    memcpy(pSchema->name, pField->name, TSDB_COL_NAME_LEN);
    pSchema->colId = pDst->nextColId;
    pDst->nextColId++;
  }

  // set col compress
  for (int32_t i = 0; i < pDst->numOfColumns; i++) {
    SFieldWithOptions *pField = taosArrayGet(pCreate->pColumns, i);
    SSchema           *pSchema = &pDst->pColumns[i];

    SColCmpr *pColCmpr = &pDst->pCmpr[i];
    pColCmpr->id = pSchema->colId;
    pColCmpr->alg = pField->compress;
  }
  TAOS_RETURN(code);
}
956
static int32_t mndGenIdxNameForFirstTag(char *fullname, char *dbname, char *stbname, char *tagname) {
2,789✔
957
  SName name = {0};
2,789✔
958
  if ((terrno = tNameFromString(&name, stbname, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE)) != 0) {
2,789!
959
    return -1;
×
960
  }
961
  return snprintf(fullname, TSDB_INDEX_FNAME_LEN, "%s.%s_%s", dbname, tagname, tNameGetTableName(&name));
2,789✔
962
}
963

964
// Creates a super table together with the index on its first tag inside one
// rollback transaction.
//
// Steps: create the transaction, build the stable object from the request,
// derive the index name for the first tag and reject the request if that index
// already exists, then append the index commit log and the stable logs and
// actions, and finally prepare the transaction.
//
// Returns 0 when the transaction was prepared, otherwise an error code. The
// transaction handle and the temporary stable object are always released
// before returning.
static int32_t mndCreateStb(SMnode *pMnode, SRpcMsg *pReq, SMCreateStbReq *pCreate, SDbObj *pDb) {
  SStbObj stbObj = {0};
  int32_t code = -1;

  char fullIdxName[TSDB_INDEX_FNAME_LEN * 2] = {0};

  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pReq, "create-stb");
  if (pTrans == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }

  mInfo("trans:%d, used to create stb:%s", pTrans->id, pCreate->name);
  TAOS_CHECK_GOTO(mndBuildStbFromReq(pMnode, &stbObj, pCreate, pDb), NULL, _OVER);

  SSchema *pSchema = &(stbObj.pTags[0]);
  if (mndGenIdxNameForFirstTag(fullIdxName, pDb->name, stbObj.name, pSchema->name) < 0) {
    // code is 0 here after the successful build above. Without an explicit
    // error the caller would be told the create succeeded although nothing was
    // prepared.
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto _OVER;
  }
  SSIdx idx = {0};
  if (mndAcquireGlobalIdx(pMnode, fullIdxName, SDB_IDX, &idx) == 0 && idx.pIdx != NULL) {
    code = TSDB_CODE_MND_TAG_INDEX_ALREADY_EXIST;
    mndReleaseIdx(pMnode, idx.pIdx);
    goto _OVER;
  }

  SIdxObj idxObj = {0};
  memcpy(idxObj.name, fullIdxName, TSDB_INDEX_FNAME_LEN);
  memcpy(idxObj.stb, stbObj.name, TSDB_TABLE_FNAME_LEN);
  memcpy(idxObj.db, stbObj.db, TSDB_DB_FNAME_LEN);
  memcpy(idxObj.colName, pSchema->name, TSDB_COL_NAME_LEN);
  idxObj.createdTime = taosGetTimestampMs();
  idxObj.uid = mndGenerateUid(fullIdxName, strlen(fullIdxName));
  idxObj.stbUid = stbObj.uid;
  idxObj.dbUid = stbObj.dbUid;

  TAOS_CHECK_GOTO(mndSetCreateIdxCommitLogs(pMnode, pTrans, &idxObj), NULL, _OVER);

  TAOS_CHECK_GOTO(mndAddStbToTrans(pMnode, pTrans, pDb, &stbObj), NULL, _OVER);
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
  code = 0;

_OVER:
  mndTransDrop(pTrans);
  if (mndStbActionDelete(pMnode->pSdb, &stbObj) != 0) mError("failed to mndStbActionDelete");
  TAOS_RETURN(code);
}
1012

1013
// Registers pStb with pTrans: sets the transaction's db/stb names, checks for
// conflicting transactions, then appends the commit log, the redo actions and
// the undo actions, in that order.
// Returns 0 on success or the error code of the first failing step.
int32_t mndAddStbToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
  int32_t code = 0;

  mndTransSetDbName(pTrans, pDb->name, pStb->name);

  if ((code = mndTransCheckConflict(pMnode, pTrans)) != 0) {
    TAOS_RETURN(code);
  }
  if ((code = mndSetCreateStbCommitLogs(pMnode, pTrans, pDb, pStb)) != 0) {
    TAOS_RETURN(code);
  }
  if ((code = mndSetCreateStbRedoActions(pMnode, pTrans, pDb, pStb)) != 0) {
    TAOS_RETURN(code);
  }
  if ((code = mndSetCreateStbUndoActions(pMnode, pTrans, pDb, pStb)) != 0) {
    TAOS_RETURN(code);
  }
  return 0;
}
1021

1022
// Timer handler: sends a TDMT_VND_FETCH_TTL_EXPIRED_TBS request to every
// vgroup.
//
// Best effort: a vgroup whose request cannot be allocated, serialized or sent
// is skipped (send failures are logged) and the loop carries on. Always
// returns 0.
static int32_t mndProcessTtlTimer(SRpcMsg *pReq) {
  SMnode           *pMnode = pReq->info.node;
  SSdb             *pSdb = pMnode->pSdb;
  SVgObj           *pVgroup = NULL;
  void             *pIter = NULL;
  SVDropTtlTableReq ttlReq = {
      .timestampSec = taosGetTimestampSec(), .ttlDropMaxCount = tsTtlBatchDropNum, .nUids = 0, .pTbUids = NULL};
  // A first pass with a NULL buffer yields the serialized length.
  int32_t reqLen = tSerializeSVDropTtlTableReq(NULL, 0, &ttlReq);
  int32_t contLen = reqLen + sizeof(SMsgHead);

  mDebug("start to process ttl timer");

  while (1) {
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) break;

    int32_t   code = 0;
    SMsgHead *pHead = rpcMallocCont(contLen);
    if (pHead == NULL) {
      sdbRelease(pSdb, pVgroup);
      continue;
    }
    pHead->contLen = htonl(contLen);
    pHead->vgId = htonl(pVgroup->vgId);
    if ((code = tSerializeSVDropTtlTableReq((char *)pHead + sizeof(SMsgHead), reqLen, &ttlReq)) < 0) {
      mError("vgId:%d, failed to serialize drop ttl table request since %s", pVgroup->vgId, tstrerror(code));
      // The buffer is never handed to the transport on this path, so it is
      // released here.
      rpcFreeCont(pHead);
      sdbRelease(pSdb, pVgroup);
      continue;
    }

    SRpcMsg rpcMsg = {
        .msgType = TDMT_VND_FETCH_TTL_EXPIRED_TBS, .pCont = pHead, .contLen = contLen, .info = pReq->info};
    SEpSet epSet = mndGetVgroupEpset(pMnode, pVgroup);
    code = tmsgSendReq(&epSet, &rpcMsg);
    if (code != 0) {
      mError("vgId:%d, failed to send drop ttl table request to vnode since 0x%x", pVgroup->vgId, code);
    } else {
      mDebug("vgId:%d, send drop ttl table request to vnode, time:%" PRId32, pVgroup->vgId, ttlReq.timestampSec);
    }
    sdbRelease(pSdb, pVgroup);
  }

  return 0;
}
1066

UNCOV
1067
// Timer handler: sends a TDMT_VND_TRIM request to every vgroup.
//
// Best effort: a vgroup whose request cannot be allocated, serialized or sent
// is skipped (serialize and send failures are logged) and the loop carries on.
// Always returns 0.
static int32_t mndProcessTrimDbTimer(SRpcMsg *pReq) {
  SMnode     *pMnode = pReq->info.node;
  SSdb       *pSdb = pMnode->pSdb;
  SVgObj     *pVgroup = NULL;
  void       *pIter = NULL;
  SVTrimDbReq trimReq = {.timestamp = taosGetTimestampSec()};
  // A first pass with a NULL buffer yields the serialized length.
  int32_t     reqLen = tSerializeSVTrimDbReq(NULL, 0, &trimReq);
  int32_t     contLen = reqLen + sizeof(SMsgHead);

  while (1) {
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) break;

    int32_t code = 0;

    SMsgHead *pHead = rpcMallocCont(contLen);
    if (pHead == NULL) {
      // Iteration continues with the next vgroup, so the fetch must not be
      // cancelled here; only the current vgroup is released.
      sdbRelease(pSdb, pVgroup);
      continue;
    }
    pHead->contLen = htonl(contLen);
    pHead->vgId = htonl(pVgroup->vgId);
    if ((code = tSerializeSVTrimDbReq((char *)pHead + sizeof(SMsgHead), reqLen, &trimReq)) < 0) {
      mError("vgId:%d, failed to serialize trim db request since %s", pVgroup->vgId, tstrerror(code));
      // Do not send a request that failed to serialize; release the buffer and
      // skip this vgroup.
      rpcFreeCont(pHead);
      sdbRelease(pSdb, pVgroup);
      continue;
    }

    SRpcMsg rpcMsg = {.msgType = TDMT_VND_TRIM, .pCont = pHead, .contLen = contLen};
    SEpSet  epSet = mndGetVgroupEpset(pMnode, pVgroup);
    code = tmsgSendReq(&epSet, &rpcMsg);
    if (code != 0) {
      mError("vgId:%d, timer failed to send vnode-trim request to vnode since 0x%x", pVgroup->vgId, code);
    } else {
      mInfo("vgId:%d, timer send vnode-trim request to vnode, time:%d", pVgroup->vgId, trimReq.timestamp);
    }
    sdbRelease(pSdb, pVgroup);
  }

  return 0;
}
1107

1108
// Timer handler: sends a TDMT_VND_S3MIGRATE request to every vgroup.
//
// Best effort: a vgroup whose request cannot be allocated, serialized or sent
// is skipped (serialize and send failures are logged) and the loop carries on.
// Always returns 0.
static int32_t mndProcessS3MigrateDbTimer(SRpcMsg *pReq) {
  SMnode          *pMnode = pReq->info.node;
  SSdb            *pSdb = pMnode->pSdb;
  SVgObj          *pVgroup = NULL;
  void            *pIter = NULL;
  SVS3MigrateDbReq s3migrateReq = {.timestamp = taosGetTimestampSec()};
  // A first pass with a NULL buffer yields the serialized length.
  int32_t          reqLen = tSerializeSVS3MigrateDbReq(NULL, 0, &s3migrateReq);
  int32_t          contLen = reqLen + sizeof(SMsgHead);

  while (1) {
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) break;

    int32_t code = 0;

    SMsgHead *pHead = rpcMallocCont(contLen);
    if (pHead == NULL) {
      sdbRelease(pSdb, pVgroup);
      continue;
    }
    pHead->contLen = htonl(contLen);
    pHead->vgId = htonl(pVgroup->vgId);
    if ((code = tSerializeSVS3MigrateDbReq((char *)pHead + sizeof(SMsgHead), reqLen, &s3migrateReq)) < 0) {
      mError("vgId:%d, failed to serialize s3migrate db request since %s", pVgroup->vgId, tstrerror(code));
      // The buffer is never handed to the transport on this path, so it is
      // released here.
      rpcFreeCont(pHead);
      sdbRelease(pSdb, pVgroup);
      continue;
    }

    SRpcMsg rpcMsg = {.msgType = TDMT_VND_S3MIGRATE, .pCont = pHead, .contLen = contLen};
    SEpSet  epSet = mndGetVgroupEpset(pMnode, pVgroup);
    code = tmsgSendReq(&epSet, &rpcMsg);
    if (code != 0) {
      mError("vgId:%d, timer failed to send vnode-s3migrate request to vnode since 0x%x", pVgroup->vgId, code);
    } else {
      mInfo("vgId:%d, timer send vnode-s3migrate request to vnode, time:%d", pVgroup->vgId, s3migrateReq.timestamp);
    }
    sdbRelease(pSdb, pVgroup);
  }

  return 0;
}
1149

1150
static int32_t mndFindSuperTableTagIndex(const SStbObj *pStb, const char *tagName) {
694✔
1151
  for (int32_t tag = 0; tag < pStb->numOfTags; tag++) {
2,458✔
1152
    if (strcmp(pStb->pTags[tag].name, tagName) == 0) {
2,151✔
1153
      return tag;
387✔
1154
    }
1155
  }
1156

1157
  return -1;
307✔
1158
}
1159

1160
static int32_t mndFindSuperTableColumnIndex(const SStbObj *pStb, const char *colName) {
682✔
1161
  for (int32_t col = 0; col < pStb->numOfColumns; col++) {
6,148✔
1162
    if (strcmp(pStb->pColumns[col].name, colName) == 0) {
5,829✔
1163
      return col;
363✔
1164
    }
1165
  }
1166

1167
  return -1;
319✔
1168
}
1169

1170
// Checks that the byte widths of the nSchema existing schema entries plus
// those of every field in pFields add up to no more than maxLen.
static bool mndValidateSchema(SSchema *pSchemas, int32_t nSchema, SArray *pFields, int32_t maxLen) {
  int32_t totalBytes = 0;

  int32_t s = 0;
  while (s < nSchema) {
    totalBytes += pSchemas[s].bytes;
    ++s;
  }

  int32_t nField = taosArrayGetSize(pFields);
  int32_t f = 0;
  while (f < nField) {
    SField *pField = (SField *)TARRAY_GET_ELEM(pFields, f);
    totalBytes += pField->bytes;
    ++f;
  }

  return !(totalBytes > maxLen);
}
1183

1184
// Builds the altered stable object pDst from the existing stable pStb and a
// create request that carries the new schema.
//
// pDst starts as a byte copy of pStb taken under pStb's read latch. Its column,
// tag and compression arrays are then replaced with fresh buffers filled from
// createReq. A column or tag whose name already exists in pStb keeps its old
// column id; new ones take the next free id. A compression value of 0 in the
// request is replaced by the default for the column type.
//
// Returns TSDB_CODE_SUCCESS or an error code. On failure pDst may hold buffers
// that were already allocated; this function does not release them.
static int32_t mndBuildStbFromAlter(SStbObj *pStb, SStbObj *pDst, SMCreateStbReq *createReq) {
  int32_t code = 0;

  taosRLockLatch(&pStb->lock);
  memcpy(pDst, pStb, sizeof(SStbObj));
  taosRUnLockLatch(&pStb->lock);

  pDst->source = createReq->source;
  pDst->updateTime = taosGetTimestampMs();
  pDst->numOfColumns = createReq->numOfColumns;
  pDst->numOfTags = createReq->numOfTags;
  pDst->pColumns = taosMemoryCalloc(1, pDst->numOfColumns * sizeof(SSchema));
  pDst->pTags = taosMemoryCalloc(1, pDst->numOfTags * sizeof(SSchema));
  pDst->pCmpr = taosMemoryCalloc(1, pDst->numOfColumns * sizeof(SColCmpr));

  if (pDst->pColumns == NULL || pDst->pTags == NULL || pDst->pCmpr == NULL) {
    code = terrno;
    TAOS_RETURN(code);
  }

  // Make sure any ids handed out below stay under 0x7fff.
  if (pDst->nextColId < 0 || pDst->nextColId >= 0x7fff - pDst->numOfColumns - pDst->numOfTags) {
    code = TSDB_CODE_OUT_OF_RANGE;
    TAOS_RETURN(code);
  }

  // Columns: copy the definition and resolve the column id.
  for (int32_t c = 0; c < pDst->numOfColumns; ++c) {
    SFieldWithOptions *pSrc = taosArrayGet(createReq->pColumns, c);
    SSchema           *pCol = pDst->pColumns + c;

    pCol->type = pSrc->type;
    pCol->bytes = pSrc->bytes;
    pCol->flags = pSrc->flags;
    memcpy(pCol->name, pSrc->name, TSDB_COL_NAME_LEN);

    int32_t oldPos = mndFindSuperTableColumnIndex(pStb, pSrc->name);
    if (oldPos < 0) {
      pCol->colId = pDst->nextColId++;
    } else {
      pCol->colId = pStb->pColumns[oldPos].colId;
    }
  }

  // Tags: same id resolution, against the old tag list.
  for (int32_t t = 0; t < pDst->numOfTags; ++t) {
    SField  *pSrc = taosArrayGet(createReq->pTags, t);
    SSchema *pTag = pDst->pTags + t;

    pTag->type = pSrc->type;
    pTag->bytes = pSrc->bytes;
    memcpy(pTag->name, pSrc->name, TSDB_COL_NAME_LEN);

    int32_t oldPos = mndFindSuperTableTagIndex(pStb, pSrc->name);
    if (oldPos < 0) {
      pTag->colId = pDst->nextColId++;
    } else {
      pTag->colId = pStb->pTags[oldPos].colId;
    }
  }

  // Compression settings, one entry per column.
  for (int32_t c = 0; c < pDst->numOfColumns; ++c) {
    SFieldWithOptions *pSrc = taosArrayGet(createReq->pColumns, c);
    SSchema           *pCol = pDst->pColumns + c;
    SColCmpr          *pCmpr = &pDst->pCmpr[c];

    pCmpr->id = pCol->colId;
    if (pSrc->compress != 0) {
      pCmpr->alg = pSrc->compress;
    } else {
      pCmpr->alg = createDefaultColCmprByType(pCol->type);
    }
  }

  pDst->tagVer = createReq->tagVer;
  pDst->colVer = createReq->colVer;
  return TSDB_CODE_SUCCESS;
}
1251

1252
// Handles a create-super-table request.
//
// After deserializing and validating the request:
//  - If the stable already exists and igExists is set, the request is ignored
//    (code 0) when it comes from the app, when the uid differs from the
//    request's suid, or when the schema versions did not advance. It becomes
//    an alter when the schema versions advanced in an accepted way. Any other
//    version combination fails with TSDB_CODE_MND_INVALID_SCHEMA_VER.
//  - If the stable already exists and igExists is not set, the request fails
//    with TSDB_CODE_MND_STB_ALREADY_EXIST.
//  - If the stable does not exist, a taosX request whose tagVer/colVer is not
//    1 is ignored (code 0). Everything else goes on to privilege,
//    single-stable-mode and grant checks, and then to the create.
// An audit record is written once the create/alter has been attempted.
//
// Returns TSDB_CODE_ACTION_IN_PROGRESS when a transaction was started, 0 when
// nothing had to be done, otherwise an error code. Every exit goes through
// _OVER, which releases the stable, the db and the decoded request.
static int32_t mndProcessCreateStbReq(SRpcMsg *pReq) {
  SMnode        *pMnode = pReq->info.node;
  int32_t        code = -1;
  SStbObj       *pStb = NULL;
  SDbObj        *pDb = NULL;
  SMCreateStbReq createReq = {0};
  bool           isAlter = false;

  if (tDeserializeSMCreateStbReq(pReq->pCont, pReq->contLen, &createReq) != 0) {
    code = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }

  mInfo("stb:%s, start to create", createReq.name);
  if (mndCheckCreateStbReq(&createReq) != 0) {
    code = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }

  pStb = mndAcquireStb(pMnode, createReq.name);
  if (pStb != NULL) {
    if (createReq.igExists) {
      if (createReq.source == TD_REQ_FROM_APP) {
        mInfo("stb:%s, already exist, ignore exist is set", createReq.name);
        code = 0;
        goto _OVER;
      } else if (pStb->uid != createReq.suid) {
        mInfo("stb:%s, alter table does not need to be done, because table is deleted", createReq.name);
        code = 0;
        goto _OVER;
      } else if (createReq.tagVer > 0 || createReq.colVer > 0) {
        int32_t tagDelta = createReq.tagVer - pStb->tagVer;
        int32_t colDelta = createReq.colVer - pStb->colVer;
        mInfo("stb:%s, already exist while create, input tagVer:%d colVer:%d, exist tagVer:%d colVer:%d",
              createReq.name, createReq.tagVer, createReq.colVer, pStb->tagVer, pStb->colVer);
        if (tagDelta <= 0 && colDelta <= 0) {
          mInfo("stb:%s, schema version is not incremented and nothing needs to be done", createReq.name);
          code = 0;
          goto _OVER;
        } else if ((tagDelta == 1 && colDelta == 0) || (tagDelta == 0 && colDelta == 1) ||
                   (pStb->colVer == 1 && createReq.colVer > 1) || (pStb->tagVer == 1 && createReq.tagVer > 1)) {
          isAlter = true;
          mInfo("stb:%s, schema version is only increased by 1 number, do alter operation", createReq.name);
        } else {
          mError("stb:%s, schema version increase more than 1 number, error is returned", createReq.name);
          code = TSDB_CODE_MND_INVALID_SCHEMA_VER;
          goto _OVER;
        }
      } else {
        mError("stb:%s, already exist while create, input tagVer:%d colVer:%d is invalid, origin tagVer:%d colVer:%d",
               createReq.name, createReq.tagVer, createReq.colVer, pStb->tagVer, pStb->colVer);
        code = TSDB_CODE_MND_INVALID_SCHEMA_VER;
        goto _OVER;
      }
    } else {
      code = TSDB_CODE_MND_STB_ALREADY_EXIST;
      goto _OVER;
    }
  } else if (terrno != TSDB_CODE_MND_STB_NOT_EXIST) {
    // Report the real acquire error instead of the generic -1 so it is not
    // overwritten when returning.
    if (terrno != 0) code = terrno;
    goto _OVER;
  } else if ((createReq.source == TD_REQ_FROM_TAOX_OLD || createReq.source == TD_REQ_FROM_TAOX) &&
             (createReq.tagVer != 1 || createReq.colVer != 1)) {
    mInfo("stb:%s, alter table does not need to be done, because table is deleted", createReq.name);
    code = 0;
    goto _OVER;
  }

  pDb = mndAcquireDbByStb(pMnode, createReq.name);
  if (pDb == NULL) {
    code = TSDB_CODE_MND_DB_NOT_SELECTED;
    goto _OVER;
  }

  if ((code = mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb)) != 0) {
    goto _OVER;
  }

  int32_t numOfStbs = -1;
  if ((code = mndGetNumOfStbs(pMnode, pDb->name, &numOfStbs)) != 0) {
    goto _OVER;
  }

  if (pDb->cfg.numOfStables == 1 && numOfStbs != 0) {
    code = TSDB_CODE_MND_SINGLE_STB_MODE_DB;
    goto _OVER;
  }

  if ((code = grantCheck(TSDB_GRANT_STABLE)) < 0) {
    goto _OVER;
  }

  if (isAlter) {
    bool    needRsp = false;
    SStbObj pDst = {0};
    if ((code = mndBuildStbFromAlter(pStb, &pDst, &createReq)) != 0) {
      taosMemoryFreeClear(pDst.pTags);
      taosMemoryFreeClear(pDst.pColumns);
      taosMemoryFreeClear(pDst.pCmpr);
      goto _OVER;
    }

    code = mndAlterStbImp(pMnode, pReq, pDb, &pDst, needRsp, NULL, 0);
    taosMemoryFreeClear(pDst.pTags);
    taosMemoryFreeClear(pDst.pColumns);
    taosMemoryFreeClear(pDst.pCmpr);
  } else {
    code = mndCreateStb(pMnode, pReq, &createReq, pDb);
  }
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;

  SName   name = {0};
  int32_t nameCode = tNameFromString(&name, createReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
  if (nameCode != 0) {
    // Leave through _OVER so the stable, the db and the decoded request are
    // released; the returned code is the name-parse error.
    code = nameCode;
    goto _OVER;
  }

  if (createReq.sql == NULL && createReq.sqlLen == 0) {
    char detail[1000] = {0};

    snprintf(detail, sizeof(detail), "dbname:%s, stable name:%s", name.dbname, name.tname);

    auditRecord(pReq, pMnode->clusterId, "createStb", name.dbname, name.tname, detail, strlen(detail));
  } else {
    auditRecord(pReq, pMnode->clusterId, "createStb", name.dbname, name.tname, createReq.sql, createReq.sqlLen);
  }
_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("stb:%s, failed to create since %s", createReq.name, tstrerror(code));
  }

  mndReleaseStb(pMnode, pStb);
  mndReleaseDb(pMnode, pDb);
  tFreeSMCreateStbReq(&createReq);

  TAOS_RETURN(code);
}
1385

1386
// Validates an alter-stable request.
//
// A request with commentLen >= 0 or with a non-zero ttl is accepted without
// looking at its fields. Otherwise numOfFields must be at least 1 and equal to
// the size of pFields, and every field needs a non-empty name.
// Returns 0 when valid, TSDB_CODE_MND_INVALID_STB_OPTION otherwise.
static int32_t mndCheckAlterStbReq(SMAlterStbReq *pAlter) {
  if (pAlter->commentLen >= 0 || pAlter->ttl != 0) {
    return 0;
  }

  int32_t nFields = pAlter->numOfFields;
  if (nFields < 1 || nFields != (int32_t)taosArrayGetSize(pAlter->pFields)) {
    TAOS_RETURN(TSDB_CODE_MND_INVALID_STB_OPTION);
  }

  for (int32_t idx = 0; idx < pAlter->numOfFields; ++idx) {
    SField *pEntry = taosArrayGet(pAlter->pFields, idx);
    if (pEntry->name[0] == 0) {
      TAOS_RETURN(TSDB_CODE_MND_INVALID_STB_OPTION);
    }
  }

  TAOS_RETURN(0);
}
1406

1407
// Allocates zeroed tag, column and compression arrays for pNew, sized by
// pNew's counts, and copies pOld's entries (sized by pOld's counts) into them.
// Returns 0 on success or terrno when any allocation fails; on failure the
// buffers that were allocated stay attached to pNew.
int32_t mndAllocStbSchemas(const SStbObj *pOld, SStbObj *pNew) {
  pNew->pTags = taosMemoryCalloc(pNew->numOfTags, sizeof(SSchema));
  pNew->pColumns = taosMemoryCalloc(pNew->numOfColumns, sizeof(SSchema));
  pNew->pCmpr = taosMemoryCalloc(pNew->numOfColumns, sizeof(SColCmpr));

  bool allocated = (pNew->pTags != NULL) && (pNew->pColumns != NULL) && (pNew->pCmpr != NULL);
  if (!allocated) {
    TAOS_RETURN(terrno);
  }

  size_t colBytes = sizeof(SSchema) * pOld->numOfColumns;
  size_t tagBytes = sizeof(SSchema) * pOld->numOfTags;
  size_t cmprBytes = sizeof(SColCmpr) * pOld->numOfColumns;

  memcpy(pNew->pColumns, pOld->pColumns, colBytes);
  memcpy(pNew->pTags, pOld->pTags, tagBytes);
  memcpy(pNew->pCmpr, pOld->pCmpr, cmprBytes);

  TAOS_RETURN(0);
}
1421

1422
// Applies a new comment and/or ttl to pNew and then clones the schema arrays
// from pOld.
//
// commentLen > 0 : pComment (commentLen + 1 bytes) is copied into a new buffer.
// commentLen == 0: the comment length is reset to 0.
// commentLen < 0 : the comment is left untouched.
// ttl is applied only when it is >= 0.
// Returns 0 on success; -1 with terrno set to TSDB_CODE_OUT_OF_MEMORY when the
// comment buffer cannot be allocated; otherwise the schema-clone error code.
static int32_t mndUpdateStbCommentAndTTL(const SStbObj *pOld, SStbObj *pNew, char *pComment, int32_t commentLen,
                                         int32_t ttl) {
  if (commentLen == 0) {
    pNew->commentLen = 0;
  } else if (commentLen > 0) {
    pNew->commentLen = commentLen;
    pNew->comment = taosMemoryCalloc(1, commentLen + 1);
    if (pNew->comment == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }
    memcpy(pNew->comment, pComment, commentLen + 1);
  }

  if (ttl >= 0) {
    pNew->ttl = ttl;
  }

  int32_t code = mndAllocStbSchemas(pOld, pNew);
  TAOS_RETURN(code);
}
1447

1448
// Appends ntags new tags, described by pFields, to the schema of pNew.
//
// Checks in order: tag count limit, combined column+tag count limit, total tag
// byte length, column-id range, and, per field, that the name is not already
// used by a column or a tag of pOld. The schema arrays of pNew are reallocated
// (cloned from pOld) before the per-field checks run. On success tagVer is
// incremented.
// Returns 0 on success or the error code of the first failing check.
static int32_t mndAddSuperTableTag(const SStbObj *pOld, SStbObj *pNew, SArray *pFields, int32_t ntags) {
  int32_t code = 0;

  if (pOld->numOfTags + ntags > TSDB_MAX_TAGS) {
    TAOS_RETURN(TSDB_CODE_MND_TOO_MANY_TAGS);
  }
  if (pOld->numOfColumns + ntags + pOld->numOfTags > TSDB_MAX_COLUMNS) {
    TAOS_RETURN(TSDB_CODE_MND_TOO_MANY_COLUMNS);
  }
  if (!mndValidateSchema(pOld->pTags, pOld->numOfTags, pFields, TSDB_MAX_TAGS_LEN)) {
    TAOS_RETURN(TSDB_CODE_PAR_INVALID_TAGS_LENGTH);
  }

  pNew->numOfTags += ntags;
  code = mndAllocStbSchemas(pOld, pNew);
  if (code != 0) {
    TAOS_RETURN(code);
  }

  // Make sure the ids handed out below stay under 0x7fff.
  if (pNew->nextColId < 0 || pNew->nextColId >= 0x7fff - ntags) {
    TAOS_RETURN(TSDB_CODE_OUT_OF_RANGE);
  }

  for (int32_t k = 0; k < ntags; ++k) {
    SField *pAdded = taosArrayGet(pFields, k);

    // The new name must not collide with an existing column or tag.
    if (mndFindSuperTableColumnIndex(pOld, pAdded->name) >= 0) {
      TAOS_RETURN(TSDB_CODE_MND_COLUMN_ALREADY_EXIST);
    }
    if (mndFindSuperTableTagIndex(pOld, pAdded->name) >= 0) {
      TAOS_RETURN(TSDB_CODE_MND_TAG_ALREADY_EXIST);
    }

    // New tags go right after the tags copied from pOld.
    SSchema *pSlot = pNew->pTags + pOld->numOfTags + k;
    pSlot->bytes = pAdded->bytes;
    pSlot->type = pAdded->type;
    memcpy(pSlot->name, pAdded->name, TSDB_COL_NAME_LEN);
    pSlot->colId = pNew->nextColId;
    pNew->nextColId++;

    mInfo("stb:%s, start to add tag %s", pNew->name, pSlot->name);
  }

  pNew->tagVer++;
  TAOS_RETURN(code);
}
1500

1501
/*
 * Check whether column/tag `colId` of super table `suid` is referenced by any topic.
 *
 * Iterates over every SDB_TOPIC object. Topics without an AST are skipped. For the
 * others the AST is parsed, its columns are collected, and the function fails with
 * TSDB_CODE_MND_FIELD_CONFLICT_WITH_TOPIC as soon as a collected column belongs to
 * `suid` (by table id, or via the topic's ctbStbUid) and has the same column id.
 *
 * Returns 0 when no topic conflicts, TSDB_CODE_MND_FIELD_CONFLICT_WITH_TOPIC on a
 * conflict or when a topic AST cannot be parsed, or the error code reported by
 * nodesCollectColumns(). Every exit path releases the fetched topic, cancels the
 * iterator when leaving early, and destroys the parsed AST.
 */
static int32_t mndCheckAlterColForTopic(SMnode *pMnode, const char *stbFullName, int64_t suid, col_id_t colId) {
  int32_t code = 0;
  SSdb   *pSdb = pMnode->pSdb;
  void   *pIter = NULL;
  while (1) {
    SMqTopicObj *pTopic = NULL;
    pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pTopic);
    if (pIter == NULL) break;

    mInfo("topic:%s, check tag and column modifiable, stb:%s suid:%" PRId64 " colId:%d, subType:%d sql:%s",
          pTopic->name, stbFullName, suid, colId, pTopic->subType, pTopic->sql);
    if (pTopic->ast == NULL) {
      // Nothing to inspect for a topic that carries no AST.
      sdbRelease(pSdb, pTopic);
      continue;
    }

    SNode *pAst = NULL;
    if (nodesStringToNode(pTopic->ast, &pAst) != 0) {
      code = TSDB_CODE_MND_FIELD_CONFLICT_WITH_TOPIC;
      mError("topic:%s, create ast error", pTopic->name);
      sdbRelease(pSdb, pTopic);
      sdbCancelFetch(pSdb, pIter);
      TAOS_RETURN(code);
    }

    SNodeList *pNodeList = NULL;
    if ((code = nodesCollectColumns((SSelectStmt *)pAst, SQL_CLAUSE_FROM, NULL, COLLECT_COL_TYPE_ALL, &pNodeList)) !=
        0) {
      // The AST was parsed successfully above, so it must be destroyed here as well.
      nodesDestroyNode(pAst);
      sdbRelease(pSdb, pTopic);
      sdbCancelFetch(pSdb, pIter);
      TAOS_RETURN(code);
    }
    SNode *pNode = NULL;
    FOREACH(pNode, pNodeList) {
      SColumnNode *pCol = (SColumnNode *)pNode;
      mInfo("topic:%s, check colId:%d tableId:%" PRId64 " ctbStbUid:%" PRId64, pTopic->name, pCol->colId, pCol->tableId,
            pTopic->ctbStbUid);

      // The first column that belongs to another table ends the check for this topic.
      if (pCol->tableId != suid && pTopic->ctbStbUid != suid) {
        mInfo("topic:%s, check colId:%d passed", pTopic->name, pCol->colId);
        goto NEXT;
      }
      if (pCol->colId > 0 && pCol->colId == colId) {
        code = TSDB_CODE_MND_FIELD_CONFLICT_WITH_TOPIC;
        mError("topic:%s, check colId:%d conflicted", pTopic->name, pCol->colId);
        nodesDestroyNode(pAst);
        nodesDestroyList(pNodeList);
        sdbCancelFetch(pSdb, pIter);
        sdbRelease(pSdb, pTopic);
        TAOS_RETURN(code);
      }
      mInfo("topic:%s, check colId:%d passed", pTopic->name, pCol->colId);
    }

  NEXT:
    sdbRelease(pSdb, pTopic);
    nodesDestroyNode(pAst);
    nodesDestroyList(pNodeList);
  }
  TAOS_RETURN(code);
}
1562

1563
/*
 * Check whether column/tag `colId` of super table `suid` is referenced by any stream.
 *
 * Iterates over every SDB_STREAM object, parses its AST, collects the referenced
 * columns and fails with TSDB_CODE_MND_STREAM_MUST_BE_DELETED as soon as a column of
 * table `suid` with the same column id is found.
 *
 * Returns 0 when no stream conflicts, TSDB_CODE_MND_INVALID_STREAM_OPTION when a
 * stream AST cannot be parsed, or the error code reported by nodesCollectColumns().
 * Every exit path releases the fetched stream, cancels the iterator when leaving
 * early, and destroys the parsed AST. `stbFullName` is not used by this check.
 */
static int32_t mndCheckAlterColForStream(SMnode *pMnode, const char *stbFullName, int64_t suid, col_id_t colId) {
  int32_t code = 0;
  SSdb   *pSdb = pMnode->pSdb;
  void   *pIter = NULL;
  while (1) {
    SStreamObj *pStream = NULL;
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
    if (pIter == NULL) break;

    SNode *pAst = NULL;
    if (nodesStringToNode(pStream->ast, &pAst) != 0) {
      code = TSDB_CODE_MND_INVALID_STREAM_OPTION;
      mError("stream:%s, create ast error", pStream->name);
      sdbRelease(pSdb, pStream);
      sdbCancelFetch(pSdb, pIter);
      TAOS_RETURN(code);
    }

    SNodeList *pNodeList = NULL;
    if ((code = nodesCollectColumns((SSelectStmt *)pAst, SQL_CLAUSE_FROM, NULL, COLLECT_COL_TYPE_ALL, &pNodeList)) !=
        0) {
      // The AST was parsed successfully above, so it must be destroyed here as well.
      nodesDestroyNode(pAst);
      sdbRelease(pSdb, pStream);
      sdbCancelFetch(pSdb, pIter);
      TAOS_RETURN(code);
    }

    SNode *pNode = NULL;
    FOREACH(pNode, pNodeList) {
      SColumnNode *pCol = (SColumnNode *)pNode;

      // The first column that belongs to another table ends the check for this stream.
      if (pCol->tableId != suid) {
        mInfo("stream:%s, check colId:%d passed", pStream->name, pCol->colId);
        goto NEXT;
      }
      if (pCol->colId > 0 && pCol->colId == colId) {
        code = TSDB_CODE_MND_STREAM_MUST_BE_DELETED;
        mError("stream:%s, check colId:%d conflicted", pStream->name, pCol->colId);
        nodesDestroyNode(pAst);
        nodesDestroyList(pNodeList);
        sdbRelease(pSdb, pStream);
        sdbCancelFetch(pSdb, pIter);
        TAOS_RETURN(code);
      }
      mInfo("stream:%s, check colId:%d passed", pStream->name, pCol->colId);
    }

  NEXT:
    sdbRelease(pSdb, pStream);
    nodesDestroyNode(pAst);
    nodesDestroyList(pNodeList);
  }
  TAOS_RETURN(code);
}
1616

1617
/*
 * Check whether column/tag `colId` of super table `suid` is referenced by any TSMA.
 *
 * Iterates over every SDB_SMA object, parses its AST, collects the referenced
 * columns and fails with TSDB_CODE_MND_FIELD_CONFLICT_WITH_TSMA as soon as a column
 * that belongs to `suid` (by table id, or via the SMA's stbUid) has the same column id.
 *
 * Returns 0 when no TSMA conflicts, TSDB_CODE_SDB_INVALID_DATA_CONTENT when an SMA
 * AST cannot be parsed, or the error code reported by nodesCollectColumns().
 * Every exit path releases the fetched SMA object, cancels the iterator when
 * leaving early, and destroys the parsed AST.
 */
static int32_t mndCheckAlterColForTSma(SMnode *pMnode, const char *stbFullName, int64_t suid, col_id_t colId) {
  int32_t code = 0;
  SSdb   *pSdb = pMnode->pSdb;
  void   *pIter = NULL;
  while (1) {
    SSmaObj *pSma = NULL;
    pIter = sdbFetch(pSdb, SDB_SMA, pIter, (void **)&pSma);
    if (pIter == NULL) break;

    mInfo("tsma:%s, check tag and column modifiable, stb:%s suid:%" PRId64 " colId:%d, sql:%s", pSma->name, stbFullName,
          suid, colId, pSma->sql);

    SNode *pAst = NULL;
    if (nodesStringToNode(pSma->ast, &pAst) != 0) {
      code = TSDB_CODE_SDB_INVALID_DATA_CONTENT;
      mError("tsma:%s, check tag and column modifiable, stb:%s suid:%" PRId64 " colId:%d failed since parse AST err",
             pSma->name, stbFullName, suid, colId);
      // Drop the reference taken by sdbFetch() before abandoning the iteration.
      sdbRelease(pSdb, pSma);
      sdbCancelFetch(pSdb, pIter);
      TAOS_RETURN(code);
    }

    SNodeList *pNodeList = NULL;
    if ((code = nodesCollectColumns((SSelectStmt *)pAst, SQL_CLAUSE_FROM, NULL, COLLECT_COL_TYPE_ALL, &pNodeList)) !=
        0) {
      // Both the parsed AST and the fetched SMA object are still held at this point.
      nodesDestroyNode(pAst);
      sdbRelease(pSdb, pSma);
      sdbCancelFetch(pSdb, pIter);
      TAOS_RETURN(code);
    }
    SNode *pNode = NULL;
    FOREACH(pNode, pNodeList) {
      SColumnNode *pCol = (SColumnNode *)pNode;
      mInfo("tsma:%s, check colId:%d tableId:%" PRId64, pSma->name, pCol->colId, pCol->tableId);

      // The first column that belongs to another table ends the check for this TSMA.
      if ((pCol->tableId != suid) && (pSma->stbUid != suid)) {
        mInfo("tsma:%s, check colId:%d passed", pSma->name, pCol->colId);
        goto NEXT;
      }
      if ((pCol->colId) > 0 && (pCol->colId == colId)) {
        code = TSDB_CODE_MND_FIELD_CONFLICT_WITH_TSMA;
        mError("tsma:%s, check colId:%d conflicted", pSma->name, pCol->colId);
        nodesDestroyNode(pAst);
        nodesDestroyList(pNodeList);
        sdbRelease(pSdb, pSma);
        sdbCancelFetch(pSdb, pIter);
        TAOS_RETURN(code);
      }
      mInfo("tsma:%s, check colId:%d passed", pSma->name, pCol->colId);
    }

  NEXT:
    sdbRelease(pSdb, pSma);
    nodesDestroyNode(pAst);
    nodesDestroyList(pNodeList);
  }
  TAOS_RETURN(code);
}
1672

1673
/*
 * Decide whether column/tag `colId` of super table `suid` may be altered.
 *
 * Runs the topic, stream and TSMA checks in that order and stops at the first one
 * that reports a non-zero code; that code is returned. Returns 0 when all three
 * checks pass.
 */
int32_t mndCheckColAndTagModifiable(SMnode *pMnode, const char *stbFullName, int64_t suid, col_id_t colId) {
  int32_t code = mndCheckAlterColForTopic(pMnode, stbFullName, suid, colId);

  if (code == TSDB_CODE_SUCCESS) {
    code = mndCheckAlterColForStream(pMnode, stbFullName, suid, colId);
  }
  if (code == TSDB_CODE_SUCCESS) {
    code = mndCheckAlterColForTSma(pMnode, stbFullName, suid, colId);
  }

  TAOS_RETURN(code);
}
1679

1680
/*
 * Remove the tag called `tagName` from the tag schema of pNew.
 *
 * Fails with TSDB_CODE_MND_TAG_NOT_EXIST when pOld has no such tag, or with the
 * code reported by mndCheckColAndTagModifiable() / mndAllocStbSchemas(). On
 * success the following tags are shifted down by one slot, numOfTags is
 * decremented and tagVer is incremented.
 */
static int32_t mndDropSuperTableTag(SMnode *pMnode, const SStbObj *pOld, SStbObj *pNew, const char *tagName) {
  const int32_t tagIdx = mndFindSuperTableTagIndex(pOld, tagName);
  if (tagIdx < 0) {
    TAOS_RETURN(TSDB_CODE_MND_TAG_NOT_EXIST);
  }

  const col_id_t tagColId = pOld->pTags[tagIdx].colId;
  TAOS_CHECK_RETURN(mndCheckColAndTagModifiable(pMnode, pOld->name, pOld->uid, tagColId));
  TAOS_CHECK_RETURN(mndAllocStbSchemas(pOld, pNew));

  // Close the gap left by the dropped tag.
  SSchema      *pGap = pNew->pTags + tagIdx;
  const int32_t nTrailing = pNew->numOfTags - tagIdx - 1;
  memmove(pGap, pGap + 1, sizeof(SSchema) * nTrailing);
  pNew->numOfTags--;
  pNew->tagVer++;

  // NOTE: indexes built on the dropped tag are not removed here (the call to
  // mndDropIndexByTag was disabled in this function).
  mInfo("stb:%s, start to drop tag %s", pNew->name, tagName);
  TAOS_RETURN(0);
}
1704

1705
/*
 * Rename a tag of the super table.
 *
 * pFields must hold exactly two entries: element 0 carries the current tag name,
 * element 1 the new name. Fails when the old tag does not exist, when the tag is
 * not modifiable (see mndCheckColAndTagModifiable), or when the new name is already
 * used by a tag or a column of pOld. On success the name is written into pNew and
 * pNew->tagVer is incremented.
 */
static int32_t mndAlterStbTagName(SMnode *pMnode, const SStbObj *pOld, SStbObj *pNew, SArray *pFields) {
  if ((int32_t)taosArrayGetSize(pFields) != 2) {
    TAOS_RETURN(TSDB_CODE_MND_INVALID_STB_OPTION);
  }

  SField     *pFrom = taosArrayGet(pFields, 0);
  SField     *pTo = taosArrayGet(pFields, 1);
  const char *oldTagName = pFrom->name;
  const char *newTagName = pTo->name;

  const int32_t tagIdx = mndFindSuperTableTagIndex(pOld, oldTagName);
  if (tagIdx < 0) {
    TAOS_RETURN(TSDB_CODE_MND_TAG_NOT_EXIST);
  }

  const col_id_t tagColId = pOld->pTags[tagIdx].colId;
  TAOS_CHECK_RETURN(mndCheckColAndTagModifiable(pMnode, pOld->name, pOld->uid, tagColId));

  // The new name must not clash with any existing tag or column.
  if (mndFindSuperTableTagIndex(pOld, newTagName) >= 0) {
    TAOS_RETURN(TSDB_CODE_MND_TAG_ALREADY_EXIST);
  }
  if (mndFindSuperTableColumnIndex(pOld, newTagName) >= 0) {
    TAOS_RETURN(TSDB_CODE_MND_COLUMN_ALREADY_EXIST);
  }

  TAOS_CHECK_RETURN(mndAllocStbSchemas(pOld, pNew));

  memcpy(pNew->pTags[tagIdx].name, newTagName, TSDB_COL_NAME_LEN);
  pNew->tagVer++;

  mInfo("stb:%s, start to modify tag %s to %s", pNew->name, oldTagName, newTagName);
  TAOS_RETURN(0);
}
1746

1747
/*
 * Enlarge the byte length of the tag named pField->name to pField->bytes.
 *
 * Fails when the tag does not exist, when it is not modifiable (see
 * mndCheckColAndTagModifiable), when the resulting total tag length would exceed
 * TSDB_MAX_TAGS_LEN, when the tag type is not BINARY / VARBINARY / NCHAR /
 * GEOMETRY, or when the requested length is not larger than the current one.
 * On success pNew carries the new length and pNew->tagVer is incremented.
 */
static int32_t mndAlterStbTagBytes(SMnode *pMnode, const SStbObj *pOld, SStbObj *pNew, const SField *pField) {
  const int32_t tagIdx = mndFindSuperTableTagIndex(pOld, pField->name);
  if (tagIdx < 0) {
    TAOS_RETURN(TSDB_CODE_MND_TAG_NOT_EXIST);
  }

  const col_id_t tagColId = pOld->pTags[tagIdx].colId;
  TAOS_CHECK_RETURN(mndCheckColAndTagModifiable(pMnode, pOld->name, pOld->uid, tagColId));

  // Total tag length as it would be after the change.
  uint32_t totalLen = 0;
  for (int32_t t = 0; t < pOld->numOfTags; ++t) {
    if (pOld->pTags[t].colId == tagColId) {
      totalLen += pField->bytes;
    } else {
      totalLen += pOld->pTags[t].bytes;
    }
  }
  if (totalLen > TSDB_MAX_TAGS_LEN) {
    TAOS_RETURN(TSDB_CODE_PAR_INVALID_TAGS_LENGTH);
  }

  TAOS_CHECK_RETURN(mndAllocStbSchemas(pOld, pNew));

  SSchema *pTarget = &pNew->pTags[tagIdx];

  // Only these four types can have their length altered here.
  if (pTarget->type != TSDB_DATA_TYPE_BINARY && pTarget->type != TSDB_DATA_TYPE_VARBINARY &&
      pTarget->type != TSDB_DATA_TYPE_NCHAR && pTarget->type != TSDB_DATA_TYPE_GEOMETRY) {
    TAOS_RETURN(TSDB_CODE_MND_INVALID_STB_OPTION);
  }

  // The length may only grow.
  if (pField->bytes <= pTarget->bytes) {
    TAOS_RETURN(TSDB_CODE_MND_INVALID_ROW_BYTES);
  }

  pTarget->bytes = pField->bytes;
  pNew->tagVer++;

  mInfo("stb:%s, start to modify tag len %s to %d", pNew->name, pField->name, pField->bytes);
  TAOS_RETURN(0);
}
1789

UNCOV
1790
/*
 * Change the compression setting of one column of the super table.
 *
 * pField must contain exactly nCols entries; only element 0 is used. Its name
 * selects the column and its `bytes` member is passed on as the requested
 * compression value to validColCmprByType() and tUpdateCompress().
 *
 * Returns TSDB_CODE_FAILED on a size mismatch, TSDB_CODE_MND_COLUMN_NOT_EXIST when
 * the column lookup returns -1, TSDB_CODE_MND_COLUMN_COMPRESS_ALREADY_EXIST when
 * nothing was updated, TSDB_CODE_TSC_COMPRESS_LEVEL_ERROR when tUpdateCompress()
 * reports -1, or the code of a failed helper. On success pNew->colVer is
 * incremented.
 */
static int32_t mndUpdateSuperTableColumnCompress(SMnode *pMnode, const SStbObj *pOld, SStbObj *pNew, SArray *pField,
                                                 int32_t nCols) {
  if (taosArrayGetSize(pField) != nCols) return TSDB_CODE_FAILED;

  TAOS_FIELD *pReq = taosArrayGet(pField, 0);

  int32_t code = 0;
  int32_t colIdx = mndFindSuperTableColumnIndex(pOld, pReq->name);
  if (colIdx == -1) {
    TAOS_RETURN(TSDB_CODE_MND_COLUMN_NOT_EXIST);
  }

  SSchema       *pTarget = &pOld->pColumns[colIdx];
  const col_id_t targetColId = pTarget->colId;
  TAOS_CHECK_RETURN(mndCheckColAndTagModifiable(pMnode, pOld->name, pOld->uid, targetColId));
  TAOS_CHECK_RETURN(mndAllocStbSchemas(pOld, pNew));

  code = validColCmprByType(pTarget->type, pReq->bytes);
  if (code != TSDB_CODE_SUCCESS) {
    TAOS_RETURN(code);
  }

  // Locate the compression entry of the column and try to apply the new setting.
  int8_t updated = 0;
  for (int c = 0; c < pNew->numOfColumns; c++) {
    SColCmpr *pEntry = pNew->pCmpr + c;
    if (pEntry->id != targetColId) {
      continue;
    }

    uint32_t newAlg = 0;
    updated = tUpdateCompress(pEntry->alg, pReq->bytes, TSDB_COLVAL_COMPRESS_DISABLED, TSDB_COLVAL_LEVEL_DISABLED,
                              TSDB_COLVAL_LEVEL_MEDIUM, &newAlg);
    if (updated > 0) {
      pEntry->alg = newAlg;
    }
    break;
  }

  switch (updated) {
    case 0:
      TAOS_RETURN(TSDB_CODE_MND_COLUMN_COMPRESS_ALREADY_EXIST);
    case -1:
      TAOS_RETURN(TSDB_CODE_TSC_COMPRESS_LEVEL_ERROR);
    default:
      break;
  }

  pNew->colVer++;
  TAOS_RETURN(code);
}
1837
/*
 * Append `ncols` columns, read from pFields, to the column schema of pNew.
 *
 * When withCompress is non-zero the array elements are read as SFieldWithOptions
 * and the compression value comes from the element; otherwise they are read as
 * SField and createDefaultColCmprByType() supplies the compression value.
 *
 * Fails when the column limit would be exceeded, when grantCheck() or
 * mndValidateSchema() rejects the request, when the column-id space is exhausted,
 * or when a new name collides with an existing column or tag of pOld. On success
 * every new column gets a fresh id from pNew->nextColId and pNew->colVer is
 * incremented.
 */
static int32_t mndAddSuperTableColumn(const SStbObj *pOld, SStbObj *pNew, SArray *pFields, int32_t ncols,
                                      int8_t withCompress) {
  int32_t code = 0;

  if (pOld->numOfColumns + ncols + pOld->numOfTags > TSDB_MAX_COLUMNS) {
    TAOS_RETURN(TSDB_CODE_MND_TOO_MANY_COLUMNS);
  }

  code = grantCheck(TSDB_GRANT_TIMESERIES);
  if (code != 0) {
    TAOS_RETURN(code);
  }

  if (!mndValidateSchema(pOld->pColumns, pOld->numOfColumns, pFields, TSDB_MAX_BYTES_PER_ROW)) {
    TAOS_RETURN(TSDB_CODE_PAR_INVALID_ROW_LENGTH);
  }

  // Grow the column count first so that mndAllocStbSchemas() sizes pNew for the new columns.
  pNew->numOfColumns += ncols;
  TAOS_CHECK_RETURN(mndAllocStbSchemas(pOld, pNew));

  // Column ids are limited to 0x7fff; refuse if the new columns would not fit.
  if (pNew->nextColId < 0 || pNew->nextColId >= 0x7fff - ncols) {
    TAOS_RETURN(TSDB_CODE_OUT_OF_RANGE);
  }

  const int32_t firstNewSlot = pOld->numOfColumns;
  for (int32_t idx = 0; idx < ncols; ++idx) {
    void              *pItem = taosArrayGet(pFields, idx);
    SFieldWithOptions *pOptField = withCompress ? (SFieldWithOptions *)pItem : NULL;
    SField            *pPlainField = withCompress ? NULL : (SField *)pItem;
    const char        *fieldName = withCompress ? pOptField->name : pPlainField->name;

    if (mndFindSuperTableColumnIndex(pOld, fieldName) >= 0) {
      TAOS_RETURN(TSDB_CODE_MND_COLUMN_ALREADY_EXIST);
    }
    if (mndFindSuperTableTagIndex(pOld, fieldName) >= 0) {
      TAOS_RETURN(TSDB_CODE_MND_TAG_ALREADY_EXIST);
    }

    SSchema  *pSchema = pNew->pColumns + firstNewSlot + idx;
    SColCmpr *pCmpr = pNew->pCmpr + firstNewSlot + idx;

    if (withCompress) {
      pSchema->bytes = pOptField->bytes;
      pSchema->type = pOptField->type;
    } else {
      pSchema->bytes = pPlainField->bytes;
      pSchema->type = pPlainField->type;
    }
    memcpy(pSchema->name, fieldName, TSDB_COL_NAME_LEN);
    pSchema->colId = pNew->nextColId++;

    pCmpr->id = pSchema->colId;
    if (withCompress) {
      pCmpr->alg = pOptField->compress;
    } else {
      pCmpr->alg = createDefaultColCmprByType(pSchema->type);
    }

    mInfo("stb:%s, start to add column %s", pNew->name, pSchema->name);
  }

  pNew->colVer++;
  TAOS_RETURN(code);
}
1916

1917
/*
 * Remove the column called `colName` from the column schema of pNew.
 *
 * Fails when the column does not exist, when it is the first column (index 0),
 * when pOld has only two columns, or when the column is not modifiable (see
 * mndCheckColAndTagModifiable). On success the schema and compression entries
 * that follow the dropped column are shifted down by one slot, numOfColumns is
 * decremented and colVer is incremented.
 */
static int32_t mndDropSuperTableColumn(SMnode *pMnode, const SStbObj *pOld, SStbObj *pNew, const char *colName) {
  const int32_t colIdx = mndFindSuperTableColumnIndex(pOld, colName);
  if (colIdx < 0) {
    TAOS_RETURN(TSDB_CODE_MND_COLUMN_NOT_EXIST);
  }

  // The column at index 0 can never be dropped.
  if (colIdx == 0) {
    TAOS_RETURN(TSDB_CODE_MND_INVALID_STB_ALTER_OPTION);
  }

  // A table with only two columns cannot lose one.
  if (pOld->numOfColumns == 2) {
    TAOS_RETURN(TSDB_CODE_PAR_INVALID_DROP_COL);
  }

  const col_id_t droppedColId = pOld->pColumns[colIdx].colId;
  TAOS_CHECK_RETURN(mndCheckColAndTagModifiable(pMnode, pOld->name, pOld->uid, droppedColId));
  TAOS_CHECK_RETURN(mndAllocStbSchemas(pOld, pNew));

  // Shift the trailing schema and compression entries over the dropped slot.
  const int32_t nTrailing = pNew->numOfColumns - colIdx - 1;
  SSchema      *pSchemaGap = &pNew->pColumns[colIdx];
  SColCmpr     *pCmprGap = &pNew->pCmpr[colIdx];
  memmove(pSchemaGap, pSchemaGap + 1, sizeof(SSchema) * nTrailing);
  memmove(pCmprGap, pCmprGap + 1, sizeof(SColCmpr) * nTrailing);
  pNew->numOfColumns--;
  pNew->colVer++;

  mInfo("stb:%s, start to drop col %s", pNew->name, colName);
  TAOS_RETURN(0);
}
1949

1950
/*
 * Enlarge the byte length of the column named pField->name to pField->bytes.
 *
 * Fails when the column does not exist, when the resulting row length would exceed
 * TSDB_MAX_BYTES_PER_ROW, when the column is not modifiable (see
 * mndCheckColAndTagModifiable), when the column type is not BINARY / VARBINARY /
 * NCHAR / GEOMETRY, or when the requested length is not larger than the current
 * one. On success pNew carries the new length and pNew->colVer is incremented.
 */
static int32_t mndAlterStbColumnBytes(SMnode *pMnode, const SStbObj *pOld, SStbObj *pNew, const SField *pField) {
  const int32_t colIdx = mndFindSuperTableColumnIndex(pOld, pField->name);
  if (colIdx < 0) {
    TAOS_RETURN(TSDB_CODE_MND_COLUMN_NOT_EXIST);
  }

  const col_id_t targetColId = pOld->pColumns[colIdx].colId;

  // Row length as it would be after the change.
  uint32_t rowLen = 0;
  for (int32_t c = 0; c < pOld->numOfColumns; ++c) {
    if (pOld->pColumns[c].colId == targetColId) {
      rowLen += pField->bytes;
    } else {
      rowLen += pOld->pColumns[c].bytes;
    }
  }
  if (rowLen > TSDB_MAX_BYTES_PER_ROW) {
    TAOS_RETURN(TSDB_CODE_MND_INVALID_ROW_BYTES);
  }

  TAOS_CHECK_RETURN(mndCheckColAndTagModifiable(pMnode, pOld->name, pOld->uid, targetColId));
  TAOS_CHECK_RETURN(mndAllocStbSchemas(pOld, pNew));

  SSchema *pTarget = &pNew->pColumns[colIdx];

  // Only these four types can have their length altered here.
  if (pTarget->type != TSDB_DATA_TYPE_BINARY && pTarget->type != TSDB_DATA_TYPE_VARBINARY &&
      pTarget->type != TSDB_DATA_TYPE_NCHAR && pTarget->type != TSDB_DATA_TYPE_GEOMETRY) {
    TAOS_RETURN(TSDB_CODE_MND_INVALID_STB_OPTION);
  }

  // The length may only grow.
  if (pField->bytes <= pTarget->bytes) {
    TAOS_RETURN(TSDB_CODE_MND_INVALID_ROW_BYTES);
  }

  pTarget->bytes = pField->bytes;
  pNew->colVer++;

  mInfo("stb:%s, start to modify col len %s to %d", pNew->name, pField->name, pField->bytes);
  TAOS_RETURN(0);
}
1992

1993
/*
 * Encode pStb and append it to pTrans as a prepare log, then mark the raw record
 * SDB_STATUS_READY.
 *
 * Returns terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when terrno is 0) if encoding
 * yields NULL, the append error (after freeing the raw record) if appending fails,
 * or the result of sdbSetRawStatus(). pMnode and pDb are not used.
 */
static int32_t mndSetAlterStbPrepareLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
  SSdbRaw *pRaw = mndStbActionEncode(pStb);
  if (pRaw == NULL) {
    int32_t encodeErr = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(encodeErr);
  }

  int32_t code = mndTransAppendPrepareLog(pTrans, pRaw);
  if (code != 0) {
    // The raw record is freed here when the append fails.
    sdbFreeRaw(pRaw);
    TAOS_RETURN(code);
  }

  code = sdbSetRawStatus(pRaw, SDB_STATUS_READY);
  TAOS_RETURN(code);
}
2009

2010
/*
 * Encode pStb and append it to pTrans as a commit log, then mark the raw record
 * SDB_STATUS_READY.
 *
 * Returns terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when terrno is 0) if encoding
 * yields NULL, the append error (after freeing the raw record) if appending fails,
 * or the result of sdbSetRawStatus(). pMnode and pDb are not used.
 */
static int32_t mndSetAlterStbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
  SSdbRaw *pRaw = mndStbActionEncode(pStb);
  if (pRaw == NULL) {
    int32_t encodeErr = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(encodeErr);
  }

  int32_t code = mndTransAppendCommitlog(pTrans, pRaw);
  if (code != 0) {
    // The raw record is freed here when the append fails.
    sdbFreeRaw(pRaw);
    TAOS_RETURN(code);
  }

  code = sdbSetRawStatus(pRaw, SDB_STATUS_READY);
  TAOS_RETURN(code);
}
2026

2027
/*
 * Append one TDMT_VND_ALTER_STB redo action to pTrans for every vgroup that
 * belongs to database pDb.
 *
 * The request body for each vgroup is built by mndBuildVCreateStbReq(). When
 * building fails the function returns terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL
 * when terrno is 0); when appending fails the request buffer is freed and the
 * append error is returned. In both cases the vgroup iteration is cancelled and
 * the current vgroup released. Returns 0 when every matching vgroup was handled.
 */
static int32_t mndSetAlterStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb, void *alterOriData,
                                         int32_t alterOriDataLen) {
  int32_t code = 0;
  SSdb   *pSdb = pMnode->pSdb;
  SVgObj *pVgroup = NULL;
  void   *pIter = NULL;
  int32_t reqLen;

  for (;;) {
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) {
      break;
    }

    // Only vgroups of the target database receive the alter request.
    if (!mndVgroupInDb(pVgroup, pDb->uid)) {
      sdbRelease(pSdb, pVgroup);
      continue;
    }

    void *pReq = mndBuildVCreateStbReq(pMnode, pVgroup, pStb, &reqLen, alterOriData, alterOriDataLen);
    if (pReq == NULL) {
      sdbCancelFetch(pSdb, pIter);
      sdbRelease(pSdb, pVgroup);
      code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
      TAOS_RETURN(code);
    }

    STransAction action = {
        .epSet = mndGetVgroupEpset(pMnode, pVgroup),
        .pCont = pReq,
        .contLen = reqLen,
        .msgType = TDMT_VND_ALTER_STB,
    };
    code = mndTransAppendRedoAction(pTrans, &action);
    if (code != 0) {
      taosMemoryFree(pReq);
      sdbCancelFetch(pSdb, pIter);
      sdbRelease(pSdb, pVgroup);
      TAOS_RETURN(code);
    }

    sdbRelease(pSdb, pVgroup);
  }

  TAOS_RETURN(code);
}
2067

2068
/*
 * Append one TDMT_VND_CREATE_INDEX redo action to pTrans for every vgroup that
 * belongs to database pDb.
 *
 * The request body for each vgroup is built by mndBuildVCreateStbReq(). When
 * building fails the function returns terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL
 * when terrno is 0); when appending fails the request buffer is freed and the
 * append error is returned. In both cases the vgroup iteration is cancelled and
 * the current vgroup released. Returns 0 when every matching vgroup was handled.
 */
static int32_t mndSetAlterStbRedoActions2(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb,
                                          void *alterOriData, int32_t alterOriDataLen) {
  int32_t code = 0;
  SSdb   *pSdb = pMnode->pSdb;
  SVgObj *pVgroup = NULL;
  void   *pIter = NULL;
  int32_t reqLen;

  for (;;) {
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) {
      break;
    }

    // Only vgroups of the target database receive the request.
    if (!mndVgroupInDb(pVgroup, pDb->uid)) {
      sdbRelease(pSdb, pVgroup);
      continue;
    }

    void *pReq = mndBuildVCreateStbReq(pMnode, pVgroup, pStb, &reqLen, alterOriData, alterOriDataLen);
    if (pReq == NULL) {
      sdbCancelFetch(pSdb, pIter);
      sdbRelease(pSdb, pVgroup);
      code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
      TAOS_RETURN(code);
    }

    STransAction action = {
        .epSet = mndGetVgroupEpset(pMnode, pVgroup),
        .pCont = pReq,
        .contLen = reqLen,
        .msgType = TDMT_VND_CREATE_INDEX,
    };
    code = mndTransAppendRedoAction(pTrans, &action);
    if (code != 0) {
      taosMemoryFree(pReq);
      sdbCancelFetch(pSdb, pIter);
      sdbRelease(pSdb, pVgroup);
      TAOS_RETURN(code);
    }

    sdbRelease(pSdb, pVgroup);
  }

  TAOS_RETURN(code);
}
2108
/*
 * Fill the table-meta response pRsp from super table pStb while holding the
 * table's read latch.
 *
 * Allocates pRsp->pSchemas (columns followed by tags) and pRsp->pSchemaExt (one
 * entry per column, carrying the column id and compression value). tbName is
 * copied into both the tbName and stbName fields of the response. Returns terrno
 * when an allocation yields NULL, 0 otherwise; the latch is released on every path.
 */
static int32_t mndBuildStbSchemaImp(SDbObj *pDb, SStbObj *pStb, const char *tbName, STableMetaRsp *pRsp) {
  taosRLockLatch(&pStb->lock);

  const int32_t nCols = pStb->numOfColumns;
  const int32_t nTags = pStb->numOfTags;

  pRsp->pSchemas = taosMemoryCalloc(nCols + nTags, sizeof(SSchema));
  if (pRsp->pSchemas == NULL) {
    taosRUnLockLatch(&pStb->lock);
    TAOS_RETURN(terrno);
  }

  pRsp->pSchemaExt = taosMemoryCalloc(nCols, sizeof(SSchemaExt));
  if (pRsp->pSchemaExt == NULL) {
    taosRUnLockLatch(&pStb->lock);
    TAOS_RETURN(terrno);
  }

  // Identification and version fields.
  tstrncpy(pRsp->dbFName, pStb->db, sizeof(pRsp->dbFName));
  tstrncpy(pRsp->tbName, tbName, sizeof(pRsp->tbName));
  tstrncpy(pRsp->stbName, tbName, sizeof(pRsp->stbName));
  pRsp->dbId = pDb->uid;
  pRsp->numOfTags = nTags;
  pRsp->numOfColumns = nCols;
  pRsp->precision = pDb->cfg.precision;
  pRsp->tableType = TSDB_SUPER_TABLE;
  pRsp->sversion = pStb->colVer;
  pRsp->tversion = pStb->tagVer;
  pRsp->suid = pStb->uid;
  pRsp->tuid = pStb->uid;

  // Columns occupy the first nCols schema slots; each also gets a schema-ext entry.
  for (int32_t c = 0; c < nCols; ++c) {
    const SSchema *pSrc = pStb->pColumns + c;
    SSchema       *pDst = pRsp->pSchemas + c;
    memcpy(pDst->name, pSrc->name, TSDB_COL_NAME_LEN);
    pDst->type = pSrc->type;
    pDst->flags = pSrc->flags;
    pDst->colId = pSrc->colId;
    pDst->bytes = pSrc->bytes;

    pRsp->pSchemaExt[c].colId = pStb->pCmpr[c].id;
    pRsp->pSchemaExt[c].compress = pStb->pCmpr[c].alg;
  }

  // Tags follow the columns in the same schema array.
  for (int32_t t = 0; t < nTags; ++t) {
    const SSchema *pSrc = pStb->pTags + t;
    SSchema       *pDst = pRsp->pSchemas + nCols + t;
    memcpy(pDst->name, pSrc->name, TSDB_COL_NAME_LEN);
    pDst->type = pSrc->type;
    pDst->flags = pSrc->flags;
    pDst->colId = pSrc->colId;
    pDst->bytes = pSrc->bytes;
  }

  taosRUnLockLatch(&pStb->lock);
  TAOS_RETURN(0);
}
2168

2169
/*
 * Fill the table-config response pRsp from super table pStb while holding the
 * table's read latch.
 *
 * Allocates pRsp->pSchemas (columns followed by tags) and pRsp->pSchemaExt (one
 * entry per column, carrying the column id and compression value), duplicates the
 * comment when commentLen > 0 and the function list when numOfFuncs > 0. tbName is
 * copied into both the tbName and stbName fields of the response. pDb is not used.
 *
 * Returns terrno when the pSchemas or pSchemaExt allocation yields NULL, 0
 * otherwise; the latch is released on every path. On failure pRsp keeps whatever
 * was already allocated.
 * NOTE(review): the caller presumably frees pRsp on failure; confirm.
 */
static int32_t mndBuildStbCfgImp(SDbObj *pDb, SStbObj *pStb, const char *tbName, STableCfgRsp *pRsp) {
  int32_t code = 0;
  taosRLockLatch(&pStb->lock);

  int32_t totalCols = pStb->numOfColumns + pStb->numOfTags;
  pRsp->pSchemas = taosMemoryCalloc(totalCols, sizeof(SSchema));
  if (pRsp->pSchemas == NULL) {
    taosRUnLockLatch(&pStb->lock);
    code = terrno;
    TAOS_RETURN(code);
  }

  tstrncpy(pRsp->dbFName, pStb->db, sizeof(pRsp->dbFName));
  tstrncpy(pRsp->tbName, tbName, sizeof(pRsp->tbName));
  tstrncpy(pRsp->stbName, tbName, sizeof(pRsp->stbName));
  pRsp->numOfTags = pStb->numOfTags;
  pRsp->numOfColumns = pStb->numOfColumns;
  pRsp->tableType = TSDB_SUPER_TABLE;
  pRsp->delay1 = pStb->maxdelay[0];
  pRsp->delay2 = pStb->maxdelay[1];
  pRsp->watermark1 = pStb->watermark[0];
  pRsp->watermark2 = pStb->watermark[1];
  pRsp->ttl = pStb->ttl;
  pRsp->commentLen = pStb->commentLen;
  if (pStb->commentLen > 0) {
    pRsp->pComment = taosStrdup(pStb->comment);
  }

  // Columns occupy the first numOfColumns schema slots.
  for (int32_t i = 0; i < pStb->numOfColumns; ++i) {
    SSchema *pSchema = &pRsp->pSchemas[i];
    SSchema *pSrcSchema = &pStb->pColumns[i];
    memcpy(pSchema->name, pSrcSchema->name, TSDB_COL_NAME_LEN);
    pSchema->type = pSrcSchema->type;
    pSchema->flags = pSrcSchema->flags;
    pSchema->colId = pSrcSchema->colId;
    pSchema->bytes = pSrcSchema->bytes;
  }

  // Tags follow the columns in the same schema array.
  for (int32_t i = 0; i < pStb->numOfTags; ++i) {
    SSchema *pSchema = &pRsp->pSchemas[i + pStb->numOfColumns];
    SSchema *pSrcSchema = &pStb->pTags[i];
    memcpy(pSchema->name, pSrcSchema->name, TSDB_COL_NAME_LEN);
    pSchema->type = pSrcSchema->type;
    pSchema->flags = pSrcSchema->flags;
    pSchema->colId = pSrcSchema->colId;
    pSchema->bytes = pSrcSchema->bytes;
  }

  if (pStb->numOfFuncs > 0) {
    pRsp->pFuncs = taosArrayDup(pStb->pFuncs, NULL);
  }

  pRsp->pSchemaExt = taosMemoryCalloc(pStb->numOfColumns, sizeof(SSchemaExt));
  if (pRsp->pSchemaExt == NULL) {
    // Without this check the loop below would write through a NULL pointer.
    taosRUnLockLatch(&pStb->lock);
    code = terrno;
    TAOS_RETURN(code);
  }
  for (int32_t i = 0; i < pStb->numOfColumns; i++) {
    SColCmpr *pCmpr = &pStb->pCmpr[i];

    SSchemaExt *pSchExt = &pRsp->pSchemaExt[i];
    pSchExt->colId = pCmpr->id;
    pSchExt->compress = pCmpr->alg;
  }

  taosRUnLockLatch(&pStb->lock);
  TAOS_RETURN(code);
}
2233

2234
/*
 * Compare the versions in pStbVer with the current state of the super table
 * "<dbFName>.<stbName>".
 *
 * *schema is set to true when the column or tag version differs; *sma is set to
 * true when pStbVer->smaVer is non-zero and differs from the table's smaVer.
 * Returns TSDB_CODE_MND_DB_NOT_SELECTED when the database cannot be acquired or
 * its uid does not match pStbVer->dbId, TSDB_CODE_PAR_TABLE_NOT_EXIST when the
 * super table cannot be acquired, TSDB_CODE_SUCCESS otherwise. The output flags
 * are only written on success.
 */
static int32_t mndValidateStbVersion(SMnode *pMnode, SSTableVersion *pStbVer, bool *schema, bool *sma) {
  char fullName[TSDB_TABLE_FNAME_LEN] = {0};
  snprintf(fullName, sizeof(fullName), "%s.%s", pStbVer->dbFName, pStbVer->stbName);

  SDbObj *pDb = mndAcquireDb(pMnode, pStbVer->dbFName);
  if (pDb == NULL) {
    TAOS_RETURN(TSDB_CODE_MND_DB_NOT_SELECTED);
  }

  // A different uid means the request refers to another database with the same name.
  if (pDb->uid != pStbVer->dbId) {
    mndReleaseDb(pMnode, pDb);
    TAOS_RETURN(TSDB_CODE_MND_DB_NOT_SELECTED);
  }

  SStbObj *pStb = mndAcquireStb(pMnode, fullName);
  if (pStb == NULL) {
    mndReleaseDb(pMnode, pDb);
    TAOS_RETURN(TSDB_CODE_PAR_TABLE_NOT_EXIST);
  }

  taosRLockLatch(&pStb->lock);
  *schema = (pStbVer->sversion != pStb->colVer) || (pStbVer->tversion != pStb->tagVer);
  *sma = (pStbVer->smaVer != 0) && (pStbVer->smaVer != pStb->smaVer);
  taosRUnLockLatch(&pStb->lock);

  mndReleaseDb(pMnode, pDb);
  mndReleaseStb(pMnode, pStb);
  return TSDB_CODE_SUCCESS;
}
2278

2279
// Acquire the db and the super table "<dbFName>.<tbName>" and fill pRsp through
// mndBuildStbSchemaImp. Returns TSDB_CODE_MND_DB_NOT_SELECTED when the db cannot be acquired,
// TSDB_CODE_PAR_TABLE_NOT_EXIST when the stb cannot be acquired, otherwise the builder's result.
static int32_t mndBuildStbSchema(SMnode *pMnode, const char *dbFName, const char *tbName, STableMetaRsp *pRsp) {
  char fullName[TSDB_TABLE_FNAME_LEN] = {0};
  snprintf(fullName, sizeof(fullName), "%s.%s", dbFName, tbName);

  SDbObj *pDb = mndAcquireDb(pMnode, dbFName);
  if (pDb == NULL) {
    TAOS_RETURN(TSDB_CODE_MND_DB_NOT_SELECTED);
  }

  int32_t  code = TSDB_CODE_PAR_TABLE_NOT_EXIST;
  SStbObj *pStb = mndAcquireStb(pMnode, fullName);
  if (pStb != NULL) {
    code = mndBuildStbSchemaImp(pDb, pStb, tbName, pRsp);
  }

  // The db reference is always dropped first; the stb reference only exists on the found path.
  mndReleaseDb(pMnode, pDb);
  if (pStb != NULL) {
    mndReleaseStb(pMnode, pStb);
  }
  TAOS_RETURN(code);
}
2302

2303
// Acquire the db and the super table "<dbFName>.<tbName>" and fill pRsp through
// mndBuildStbCfgImp. Returns TSDB_CODE_MND_DB_NOT_SELECTED when the db cannot be acquired,
// TSDB_CODE_PAR_TABLE_NOT_EXIST when the stb cannot be acquired, otherwise the builder's result.
static int32_t mndBuildStbCfg(SMnode *pMnode, const char *dbFName, const char *tbName, STableCfgRsp *pRsp) {
  char fullName[TSDB_TABLE_FNAME_LEN] = {0};
  snprintf(fullName, sizeof(fullName), "%s.%s", dbFName, tbName);

  SDbObj *pDb = mndAcquireDb(pMnode, dbFName);
  if (pDb == NULL) {
    TAOS_RETURN(TSDB_CODE_MND_DB_NOT_SELECTED);
  }

  SStbObj *pStb = mndAcquireStb(pMnode, fullName);
  if (pStb == NULL) {
    mndReleaseDb(pMnode, pDb);
    TAOS_RETURN(TSDB_CODE_PAR_TABLE_NOT_EXIST);
  }

  int32_t code = mndBuildStbCfgImp(pDb, pStb, tbName, pRsp);

  mndReleaseDb(pMnode, pDb);
  mndReleaseStb(pMnode, pStb);
  TAOS_RETURN(code);
}
2327

2328
// Build the encoded SMAlterStbRsp for super table pObj.
//   pDb, pObj : objects passed to mndBuildStbSchemaImp to produce the table meta
//   pCont     : on success receives the encoded buffer (allocated with taosMemoryMalloc)
//   pLen      : on success receives the encoded length
// Returns 0 on success or an error code. On failure *pCont and *pLen are left untouched and
// every buffer allocated here is released.
static int32_t mndBuildSMAlterStbRsp(SDbObj *pDb, SStbObj *pObj, void **pCont, int32_t *pLen) {
  int32_t       code = 0;
  SEncoder      ec = {0};
  uint32_t      contLen = 0;
  SMAlterStbRsp alterRsp = {0};
  SName         name = {0};
  TAOS_CHECK_RETURN(tNameFromString(&name, pObj->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE));

  alterRsp.pMeta = taosMemoryCalloc(1, sizeof(STableMetaRsp));
  if (NULL == alterRsp.pMeta) {
    code = terrno;
    TAOS_RETURN(code);
  }

  code = mndBuildStbSchemaImp(pDb, pObj, name.tname, alterRsp.pMeta);
  if (code) {
    tFreeSMAlterStbRsp(&alterRsp);
    return code;
  }

  // First pass only computes the encoded size.
  tEncodeSize(tEncodeSMAlterStbRsp, &alterRsp, contLen, code);
  if (code) {
    tFreeSMAlterStbRsp(&alterRsp);
    return code;
  }

  void *cont = taosMemoryMalloc(contLen);
  if (NULL == cont) {
    code = terrno;
    tFreeSMAlterStbRsp(&alterRsp);
    TAOS_RETURN(code);
  }
  tEncoderInit(&ec, cont, contLen);
  code = tEncodeSMAlterStbRsp(&ec, &alterRsp);
  tEncoderClear(&ec);

  tFreeSMAlterStbRsp(&alterRsp);

  if (code < 0) {
    // The buffer is not handed to the caller on failure, so it must be freed here.
    taosMemoryFree(cont);
    TAOS_RETURN(code);
  }

  *pCont = cont;
  *pLen = contLen;

  TAOS_RETURN(code);
}
2373

2374
// Build the encoded SMCreateStbRsp for the super table named stbFName in db dbFName.
//   pCont : on success receives the encoded buffer (allocated with taosMemoryMalloc)
//   pLen  : on success receives the encoded length
// Returns 0 on success or an error code. On failure *pCont and *pLen are left untouched and
// every buffer allocated here is released. The db and stb references are always released.
int32_t mndBuildSMCreateStbRsp(SMnode *pMnode, char *dbFName, char *stbFName, void **pCont, int32_t *pLen) {
  int32_t        code = -1;
  SStbObj       *pObj = NULL;
  void          *cont = NULL;
  SEncoder       ec = {0};
  uint32_t       contLen = 0;
  SMCreateStbRsp stbRsp = {0};
  SName          name = {0};

  SDbObj *pDb = mndAcquireDb(pMnode, dbFName);
  if (NULL == pDb) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  pObj = mndAcquireStb(pMnode, stbFName);
  if (NULL == pObj) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }

  TAOS_CHECK_GOTO(tNameFromString(&name, pObj->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE), NULL, _OVER);

  stbRsp.pMeta = taosMemoryCalloc(1, sizeof(STableMetaRsp));
  if (NULL == stbRsp.pMeta) {
    code = terrno;
    goto _OVER;
  }

  code = mndBuildStbSchemaImp(pDb, pObj, name.tname, stbRsp.pMeta);
  if (code) {
    tFreeSMCreateStbRsp(&stbRsp);
    goto _OVER;
  }

  // First pass only computes the encoded size.
  tEncodeSize(tEncodeSMCreateStbRsp, &stbRsp, contLen, code);
  if (code) {
    tFreeSMCreateStbRsp(&stbRsp);
    goto _OVER;
  }

  cont = taosMemoryMalloc(contLen);
  if (NULL == cont) {
    code = terrno;
    tFreeSMCreateStbRsp(&stbRsp);
    goto _OVER;
  }
  tEncoderInit(&ec, cont, contLen);
  code = tEncodeSMCreateStbRsp(&ec, &stbRsp);
  // The encoder and the response are released on both the success and the failure path.
  tEncoderClear(&ec);
  tFreeSMCreateStbRsp(&stbRsp);
  if (code != TSDB_CODE_SUCCESS) {
    // The buffer is not handed to the caller on failure, so it must be freed here.
    taosMemoryFree(cont);
    goto _OVER;
  }

  *pCont = cont;
  *pLen = contLen;

  code = 0;

_OVER:
  if (pObj) {
    mndReleaseStb(pMnode, pObj);
  }

  if (pDb) {
    mndReleaseDb(pMnode, pDb);
  }

  TAOS_RETURN(code);
}
2442

2443
// Create an "alter-stb" transaction for pStb, attach the optional rpc response, append the
// prepare logs, commit logs and redo actions, then prepare the transaction.
//   needRsp                       : when true an SMAlterStbRsp is built and set on the transaction
//   alterOriData/alterOriDataLen  : forwarded unchanged to mndSetAlterStbRedoActions
// Returns 0 on success or the first error code encountered. The transaction handle is dropped
// before returning on every path.
static int32_t mndAlterStbImp(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *pStb, bool needRsp,
                              void *alterOriData, int32_t alterOriDataLen) {
  int32_t code = -1;
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, "alter-stb");
  if (pTrans == NULL) {
    // Prefer the error recorded in terrno; fall back to the generic "returned NULL" code.
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto _OVER;
  }

  mInfo("trans:%d, used to alter stb:%s", pTrans->id, pStb->name);
  mndTransSetDbName(pTrans, pDb->name, pStb->name);
  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER);

  if (needRsp) {
    int32_t rspLen = 0;
    void   *rspBuf = NULL;
    TAOS_CHECK_GOTO(mndBuildSMAlterStbRsp(pDb, pStb, &rspBuf, &rspLen), NULL, _OVER);
    mndTransSetRpcRsp(pTrans, rspBuf, rspLen);
  }

  TAOS_CHECK_GOTO(mndSetAlterStbPrepareLogs(pMnode, pTrans, pDb, pStb), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetAlterStbCommitLogs(pMnode, pTrans, pDb, pStb), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetAlterStbRedoActions(pMnode, pTrans, pDb, pStb, alterOriData, alterOriDataLen), NULL, _OVER);
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);

  code = 0;

_OVER:
  mndTransDrop(pTrans);
  TAOS_RETURN(code);
}
2475

2476
// Variant of the alter-stb transaction used when the altered tag may have a tag index.
// For TSDB_ALTER_TABLE_DROP_TAG the matching index (if any) gets drop logs appended; for
// TSDB_ALTER_TABLE_UPDATE_TAG_NAME the matching index (if any) has its colName rewritten to the
// new tag name and gets alter logs appended.
// Returns 0 on success or the first error code encountered. The transaction handle is dropped
// before returning on every path.
// NOTE(review): for any other pAlter->alterType nothing is appended or prepared and 0 is
// returned; the visible caller only routes the two alter types above here.
static int32_t mndAlterStbAndUpdateTagIdxImp(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *pStb, bool needRsp,
                                             void *alterOriData, int32_t alterOriDataLen, const SMAlterStbReq *pAlter) {
  int32_t code = -1;
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, "alter-stb");
  if (pTrans == NULL) {
    // Prefer the error recorded in terrno; fall back to the generic "returned NULL" code.
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }

  mInfo("trans:%d, used to alter stb:%s", pTrans->id, pStb->name);
  mndTransSetDbName(pTrans, pDb->name, pStb->name);

  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER);

  if (needRsp) {
    void   *pCont = NULL;
    int32_t contLen = 0;
    TAOS_CHECK_GOTO(mndBuildSMAlterStbRsp(pDb, pStb, &pCont, &contLen), NULL, _OVER);
    mndTransSetRpcRsp(pTrans, pCont, contLen);
  }

  if (pAlter->alterType == TSDB_ALTER_TABLE_DROP_TAG) {
    SIdxObj idxObj = {0};
    // NOTE(review): the result of taosArrayGet is dereferenced without a NULL check.
    SField *pField0 = taosArrayGet(pAlter->pFields, 0);
    bool    exist = false;
    // A zero return is treated as "an index on this tag was found and copied into idxObj".
    if (mndGetIdxsByTagName(pMnode, pStb, pField0->name, &idxObj) == 0) {
      exist = true;
    }
    TAOS_CHECK_GOTO(mndSetAlterStbPrepareLogs(pMnode, pTrans, pDb, pStb), NULL, _OVER);
    TAOS_CHECK_GOTO(mndSetAlterStbCommitLogs(pMnode, pTrans, pDb, pStb), NULL, _OVER);

    if (exist == true) {
      TAOS_CHECK_GOTO(mndSetDropIdxPrepareLogs(pMnode, pTrans, &idxObj), NULL, _OVER);
      TAOS_CHECK_GOTO(mndSetDropIdxCommitLogs(pMnode, pTrans, &idxObj), NULL, _OVER);
    }

    TAOS_CHECK_GOTO(mndSetAlterStbRedoActions(pMnode, pTrans, pDb, pStb, alterOriData, alterOriDataLen), NULL, _OVER);
    TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);

  } else if (pAlter->alterType == TSDB_ALTER_TABLE_UPDATE_TAG_NAME) {
    SIdxObj     idxObj = {0};
    // Field 0 carries the old tag name, field 1 the new tag name.
    // NOTE(review): both taosArrayGet results are dereferenced without a NULL check.
    SField     *pField0 = taosArrayGet(pAlter->pFields, 0);
    SField     *pField1 = taosArrayGet(pAlter->pFields, 1);
    const char *oTagName = pField0->name;
    const char *nTagName = pField1->name;
    bool        exist = false;

    if (mndGetIdxsByTagName(pMnode, pStb, pField0->name, &idxObj) == 0) {
      exist = true;
    }

    TAOS_CHECK_GOTO(mndSetAlterStbPrepareLogs(pMnode, pTrans, pDb, pStb), NULL, _OVER);
    TAOS_CHECK_GOTO(mndSetAlterStbCommitLogs(pMnode, pTrans, pDb, pStb), NULL, _OVER);

    if (exist == true) {
      // Rename the indexed column to the new tag name.
      // NOTE(review): the copy is bounded only by strlen(nTagName), not by the capacity of
      // idxObj.colName - confirm the two buffers have compatible sizes.
      memcpy(idxObj.colName, nTagName, strlen(nTagName));
      idxObj.colName[strlen(nTagName)] = 0;
      TAOS_CHECK_GOTO(mndSetAlterIdxPrepareLogs(pMnode, pTrans, &idxObj), NULL, _OVER);
      TAOS_CHECK_GOTO(mndSetAlterIdxCommitLogs(pMnode, pTrans, &idxObj), NULL, _OVER);
    }

    TAOS_CHECK_GOTO(mndSetAlterStbRedoActions(pMnode, pTrans, pDb, pStb, alterOriData, alterOriDataLen), NULL, _OVER);
    TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
  }
  code = 0;

_OVER:
  mndTransDrop(pTrans);
  TAOS_RETURN(code);
}
2547

2548
// Apply one alter operation to a shallow copy of pOld and submit the alter transaction.
//   pAlter : decoded alter request; pAlter->alterType selects the operation
//   pOld   : the current super table object; it is copied under its read latch
// Returns 0 on success or an error code. An unknown alter type now yields
// TSDB_CODE_OPS_NOT_SUPPORT instead of the initial -1.
static int32_t mndAlterStb(SMnode *pMnode, SRpcMsg *pReq, const SMAlterStbReq *pAlter, SDbObj *pDb, SStbObj *pOld) {
  bool    needRsp = true;
  int32_t code = -1;
  SField *pField0 = NULL;

  // Work on a private copy; the array pointers are cleared so the helpers below allocate
  // fresh arrays for the copy instead of sharing pOld's.
  SStbObj stbObj = {0};
  taosRLockLatch(&pOld->lock);
  memcpy(&stbObj, pOld, sizeof(SStbObj));
  taosRUnLockLatch(&pOld->lock);
  stbObj.pColumns = NULL;
  stbObj.pTags = NULL;
  stbObj.pFuncs = NULL;
  stbObj.pCmpr = NULL;
  stbObj.updateTime = taosGetTimestampMs();
  stbObj.lock = 0;
  bool updateTagIndex = false;
  switch (pAlter->alterType) {
    case TSDB_ALTER_TABLE_ADD_TAG:
      code = mndAddSuperTableTag(pOld, &stbObj, pAlter->pFields, pAlter->numOfFields);
      break;
    case TSDB_ALTER_TABLE_DROP_TAG:
      pField0 = taosArrayGet(pAlter->pFields, 0);
      code = mndDropSuperTableTag(pMnode, pOld, &stbObj, pField0->name);
      updateTagIndex = true;
      break;
    case TSDB_ALTER_TABLE_UPDATE_TAG_NAME:
      code = mndAlterStbTagName(pMnode, pOld, &stbObj, pAlter->pFields);
      updateTagIndex = true;
      break;
    case TSDB_ALTER_TABLE_UPDATE_TAG_BYTES:
      pField0 = taosArrayGet(pAlter->pFields, 0);
      code = mndAlterStbTagBytes(pMnode, pOld, &stbObj, pField0);
      break;
    case TSDB_ALTER_TABLE_ADD_COLUMN:
      code = mndAddSuperTableColumn(pOld, &stbObj, pAlter->pFields, pAlter->numOfFields, 0);
      break;
    case TSDB_ALTER_TABLE_DROP_COLUMN:
      pField0 = taosArrayGet(pAlter->pFields, 0);
      code = mndDropSuperTableColumn(pMnode, pOld, &stbObj, pField0->name);
      break;
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES:
      pField0 = taosArrayGet(pAlter->pFields, 0);
      code = mndAlterStbColumnBytes(pMnode, pOld, &stbObj, pField0);
      break;
    case TSDB_ALTER_TABLE_UPDATE_OPTIONS:
      needRsp = false;
      code = mndUpdateStbCommentAndTTL(pOld, &stbObj, pAlter->comment, pAlter->commentLen, pAlter->ttl);
      break;
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_COMPRESS:
      code = mndUpdateSuperTableColumnCompress(pMnode, pOld, &stbObj, pAlter->pFields, pAlter->numOfFields);
      break;
    case TSDB_ALTER_TABLE_ADD_COLUMN_WITH_COMPRESS_OPTION:
      code = mndAddSuperTableColumn(pOld, &stbObj, pAlter->pFields, pAlter->numOfFields, 1);
      break;
    default:
      needRsp = false;
      // Report the real reason through the return value as well; leaving code at -1 would
      // hand the caller a meaningless error.
      code = TSDB_CODE_OPS_NOT_SUPPORT;
      terrno = TSDB_CODE_OPS_NOT_SUPPORT;
      break;
  }

  if (code != 0) goto _OVER;
  // Drop-tag and rename-tag may touch a tag index, so they use the index-aware transaction.
  if (updateTagIndex == false) {
    code = mndAlterStbImp(pMnode, pReq, pDb, &stbObj, needRsp, pReq->pCont, pReq->contLen);
  } else {
    code = mndAlterStbAndUpdateTagIdxImp(pMnode, pReq, pDb, &stbObj, needRsp, pReq->pCont, pReq->contLen, pAlter);
  }

_OVER:
  taosMemoryFreeClear(stbObj.pTags);
  taosMemoryFreeClear(stbObj.pColumns);
  taosMemoryFreeClear(stbObj.pCmpr);
  // NOTE(review): stbObj.comment still holds the pointer copied from pOld unless a helper
  // replaced it; this free presumably relies on commentLen > 0 implying a fresh allocation.
  if (pAlter->commentLen > 0) {
    taosMemoryFreeClear(stbObj.comment);
  }
  TAOS_RETURN(code);
}
2624

2625
// Handle an alter-stb request: decode and validate it, acquire the db and stb, check the
// write privilege, run the alter, and write an audit record.
// Returns TSDB_CODE_ACTION_IN_PROGRESS when the alter transaction was submitted, otherwise an
// error code. The db/stb references and the decoded request are released on every path.
static int32_t mndProcessAlterStbReq(SRpcMsg *pReq) {
  SMnode       *pMnode = pReq->info.node;
  int32_t       code = -1;
  SDbObj       *pDb = NULL;
  SStbObj      *pStb = NULL;
  SMAlterStbReq alterReq = {0};

  if (tDeserializeSMAlterStbReq(pReq->pCont, pReq->contLen, &alterReq) != 0) {
    code = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }

  mInfo("stb:%s, start to alter", alterReq.name);
  // Keep the validator's own error code instead of failing with the initial -1.
  if ((code = mndCheckAlterStbReq(&alterReq)) != 0) goto _OVER;

  pDb = mndAcquireDbByStb(pMnode, alterReq.name);
  if (pDb == NULL) {
    code = TSDB_CODE_MND_DB_NOT_EXIST;
    goto _OVER;
  }

  pStb = mndAcquireStb(pMnode, alterReq.name);
  if (pStb == NULL) {
    code = TSDB_CODE_MND_STB_NOT_EXIST;
    goto _OVER;
  }

  if ((code = mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb)) != 0) {
    goto _OVER;
  }

  code = mndAlterStb(pMnode, pReq, &alterReq, pDb, pStb);
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;

  // The audit record is written whether or not the alter succeeded; a name parse failure is
  // only logged and the (then empty) name fields are still passed to auditRecord.
  SName   name = {0};
  int32_t ret = 0;
  if ((ret = tNameFromString(&name, alterReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE)) != 0)
    mError("stb:%s, failed to tNameFromString since %s", alterReq.name, tstrerror(ret));

  auditRecord(pReq, pMnode->clusterId, "alterStb", name.dbname, name.tname, alterReq.sql, alterReq.sqlLen);

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("stb:%s, failed to alter since %s", alterReq.name, tstrerror(code));
  }

  mndReleaseStb(pMnode, pStb);
  mndReleaseDb(pMnode, pDb);
  tFreeSMAltertbReq(&alterReq);

  TAOS_RETURN(code);
}
2677

2678
// Encode pStb, append the raw record to the transaction's prepare log and mark it
// SDB_STATUS_DROPPING. Returns 0 on success or an error code; the raw record is freed here
// only when appending it fails.
static int32_t mndSetDropStbPrepareLogs(SMnode *pMnode, STrans *pTrans, SStbObj *pStb) {
  SSdbRaw *pRaw = mndStbActionEncode(pStb);
  if (pRaw == NULL) {
    // Prefer the error recorded in terrno; fall back to the generic "returned NULL" code.
    int32_t encodeErr = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(encodeErr);
  }

  int32_t code = mndTransAppendPrepareLog(pTrans, pRaw);
  if (code != 0) {
    sdbFreeRaw(pRaw);
    TAOS_RETURN(code);
  }

  TAOS_CHECK_RETURN(sdbSetRawStatus(pRaw, SDB_STATUS_DROPPING));

  TAOS_RETURN(code);
}
2694

2695
// Encode pStb, append the raw record to the transaction's commit log and mark it
// SDB_STATUS_DROPPED. Returns 0 on success or an error code; the raw record is freed here
// only when appending it fails.
static int32_t mndSetDropStbCommitLogs(SMnode *pMnode, STrans *pTrans, SStbObj *pStb) {
  int32_t  code = 0;
  SSdbRaw *pCommitRaw = mndStbActionEncode(pStb);
  if (pCommitRaw == NULL) {
    // Prefer the error recorded in terrno; fall back to the generic "returned NULL" code.
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }
  // Capture the append result: returning the untouched `code` (0) here would report success
  // even though the commit log was not appended.
  if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
    sdbFreeRaw(pCommitRaw);
    TAOS_RETURN(code);
  }
  TAOS_CHECK_RETURN(sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED));

  TAOS_RETURN(code);
}
2711

2712
// Walk every vgroup in the sdb and, for each one that belongs to pDb, append a
// TDMT_VND_DROP_STB redo action carrying the request built by mndBuildVDropStbReq.
// Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY when a request cannot be built, or the error
// from mndTransAppendRedoAction. The iteration is cancelled on early exit.
static int32_t mndSetDropStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
  SSdb   *pSdb = pMnode->pSdb;
  void   *pIter = NULL;
  SVgObj *pVgroup = NULL;
  int32_t code = 0;

  for (;;) {
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) {
      break;
    }

    if (mndVgroupInDb(pVgroup, pDb->uid)) {
      int32_t reqLen = 0;
      void   *pDropReq = mndBuildVDropStbReq(pMnode, pVgroup, pStb, &reqLen);
      if (pDropReq == NULL) {
        sdbCancelFetch(pSdb, pIter);
        sdbRelease(pSdb, pVgroup);
        TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
      }

      // "stb not exist" from the vnode is accepted as a successful drop.
      STransAction action = {0};
      action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
      action.pCont = pDropReq;
      action.contLen = reqLen;
      action.msgType = TDMT_VND_DROP_STB;
      action.acceptableCode = TSDB_CODE_TDB_STB_NOT_EXIST;

      code = mndTransAppendRedoAction(pTrans, &action);
      if (code != 0) {
        // The request buffer was not taken over by the transaction, so free it here.
        taosMemoryFree(pDropReq);
        sdbCancelFetch(pSdb, pIter);
        sdbRelease(pSdb, pVgroup);
        TAOS_RETURN(code);
      }
    }

    sdbRelease(pSdb, pVgroup);
  }

  TAOS_RETURN(code);
}
2752

2753
// Create a "drop-stb" transaction for pStb, append its prepare/commit logs and vnode redo
// actions, drop the indexes and smas of the stb, remove the stb from users, then prepare the
// transaction. Returns 0 on success or the first error code encountered. The transaction
// handle is dropped before returning on every path.
static int32_t mndDropStb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *pStb) {
  int32_t code = -1;
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, "drop-stb");
  if (pTrans == NULL) {
    // Prefer the error recorded in terrno; fall back to the generic "returned NULL" code.
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto _OVER;
  }

  mInfo("trans:%d, used to drop stb:%s", pTrans->id, pStb->name);
  mndTransSetDbName(pTrans, pDb->name, pStb->name);
  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER);

  // Logs and redo actions for the stb itself.
  TAOS_CHECK_GOTO(mndSetDropStbPrepareLogs(pMnode, pTrans, pStb), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetDropStbCommitLogs(pMnode, pTrans, pStb), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetDropStbRedoActions(pMnode, pTrans, pDb, pStb), NULL, _OVER);

  // Dependent objects.
  TAOS_CHECK_GOTO(mndDropIdxsByStb(pMnode, pTrans, pDb, pStb), NULL, _OVER);
  TAOS_CHECK_GOTO(mndDropSmasByStb(pMnode, pTrans, pDb, pStb), NULL, _OVER);
  TAOS_CHECK_GOTO(mndUserRemoveStb(pMnode, pTrans, pStb->name), NULL, _OVER);

  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
  code = 0;

_OVER:
  mndTransDrop(pTrans);
  TAOS_RETURN(code);
}
2779

2780
// Check whether any topic refers to the super table with uid `suid`.
// Returns -1 when a topic subscribes to the table directly (subType TOPIC_SUB_TYPE__TABLE with
// a matching stbUid) or when the column examined from the topic's ast has tableId == suid,
// an error code when a topic's ast cannot be parsed or its columns cannot be collected, and
// 0 otherwise. stbFullName is not used.
static int32_t mndCheckDropStbForTopic(SMnode *pMnode, const char *stbFullName, int64_t suid) {
  int32_t code = 0;
  SSdb   *pSdb = pMnode->pSdb;
  void   *pIter = NULL;
  while (1) {
    SMqTopicObj *pTopic = NULL;
    pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pTopic);
    if (pIter == NULL) break;

    if (pTopic->subType == TOPIC_SUB_TYPE__TABLE) {
      if (pTopic->stbUid == suid) {
        sdbRelease(pSdb, pTopic);
        sdbCancelFetch(pSdb, pIter);
        TAOS_RETURN(-1);
      }
    }

    // Topics without an ast have nothing more to inspect.
    if (pTopic->ast == NULL) {
      sdbRelease(pSdb, pTopic);
      continue;
    }

    SNode *pAst = NULL;
    if (nodesStringToNode(pTopic->ast, &pAst) != 0) {
      code = TSDB_CODE_MND_INVALID_TOPIC_OPTION;
      mError("topic:%s, create ast error", pTopic->name);
      sdbRelease(pSdb, pTopic);
      sdbCancelFetch(pSdb, pIter);
      TAOS_RETURN(code);
    }

    SNodeList *pNodeList = NULL;
    if ((code = nodesCollectColumns((SSelectStmt *)pAst, SQL_CLAUSE_FROM, NULL, COLLECT_COL_TYPE_ALL, &pNodeList)) !=
        0) {
      sdbRelease(pSdb, pTopic);
      // The parsed ast is owned by this loop iteration and must not leak on this error path.
      nodesDestroyNode(pAst);
      sdbCancelFetch(pSdb, pIter);
      TAOS_RETURN(code);
    }
    SNode *pNode = NULL;
    // NOTE(review): because the else-branch jumps to NEXT, only the first collected column is
    // examined for each topic.
    FOREACH(pNode, pNodeList) {
      SColumnNode *pCol = (SColumnNode *)pNode;

      if (pCol->tableId == suid) {
        sdbRelease(pSdb, pTopic);
        nodesDestroyNode(pAst);
        nodesDestroyList(pNodeList);
        sdbCancelFetch(pSdb, pIter);
        TAOS_RETURN(-1);
      } else {
        goto NEXT;
      }
    }
  NEXT:
    sdbRelease(pSdb, pTopic);
    nodesDestroyNode(pAst);
    nodesDestroyList(pNodeList);
  }
  TAOS_RETURN(code);
}
2839

2840
// Check whether any stream refers to the super table with uid `suid`.
// Returns -1 when a stream's targetStbUid equals suid or when the column examined from the
// stream's ast has tableId == suid, an error code when a stream's ast cannot be parsed or its
// columns cannot be collected, and 0 otherwise. stbFullName is not used.
static int32_t mndCheckDropStbForStream(SMnode *pMnode, const char *stbFullName, int64_t suid) {
  int32_t code = 0;
  SSdb   *pSdb = pMnode->pSdb;
  void   *pIter = NULL;
  while (1) {
    SStreamObj *pStream = NULL;
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
    if (pIter == NULL) break;

    if (pStream->targetStbUid == suid) {
      sdbCancelFetch(pSdb, pIter);
      sdbRelease(pSdb, pStream);
      TAOS_RETURN(-1);
    }

    SNode *pAst = NULL;
    if (nodesStringToNode(pStream->ast, &pAst) != 0) {
      code = TSDB_CODE_MND_INVALID_STREAM_OPTION;
      mError("stream:%s, create ast error", pStream->name);
      sdbCancelFetch(pSdb, pIter);
      sdbRelease(pSdb, pStream);
      TAOS_RETURN(code);
    }

    SNodeList *pNodeList = NULL;
    if ((code = nodesCollectColumns((SSelectStmt *)pAst, SQL_CLAUSE_FROM, NULL, COLLECT_COL_TYPE_ALL, &pNodeList)) !=
        0) {
      sdbCancelFetch(pSdb, pIter);
      sdbRelease(pSdb, pStream);
      // The parsed ast is owned by this loop iteration and must not leak on this error path.
      nodesDestroyNode(pAst);
      TAOS_RETURN(code);
    }
    SNode *pNode = NULL;
    // NOTE(review): because the else-branch jumps to NEXT, only the first collected column is
    // examined for each stream.
    FOREACH(pNode, pNodeList) {
      SColumnNode *pCol = (SColumnNode *)pNode;

      if (pCol->tableId == suid) {
        sdbCancelFetch(pSdb, pIter);
        sdbRelease(pSdb, pStream);
        nodesDestroyNode(pAst);
        nodesDestroyList(pNodeList);
        TAOS_RETURN(-1);
      } else {
        goto NEXT;
      }
    }
  NEXT:
    sdbRelease(pSdb, pStream);
    nodesDestroyNode(pAst);
    nodesDestroyList(pNodeList);
  }
  TAOS_RETURN(code);
}
2892

2893
// Ignores pRsp and always returns 0.
static int32_t mndProcessDropTtltbRsp(SRpcMsg *pRsp) {
  return 0;
}
×
UNCOV
2894
// Ignores pRsp and always returns 0.
static int32_t mndProcessTrimDbRsp(SRpcMsg *pRsp) {
  return 0;
}
×
2895
// Ignores pRsp and always returns 0.
static int32_t mndProcessS3MigrateDbRsp(SRpcMsg *pRsp) {
  return 0;
}
×
2896

2897
// Handle a drop-stb request: decode it, acquire the stb and its db, check the write
// privilege, refuse the drop while a topic or stream still refers to the stb, submit the drop
// transaction and write an audit record.
// Returns TSDB_CODE_ACTION_IN_PROGRESS when the transaction was submitted, 0 when the request
// is ignored (missing stb with igNotExists set, or a taosX request whose suid does not match),
// otherwise an error code. The db/stb references and the decoded request are released on
// every path.
static int32_t mndProcessDropStbReq(SRpcMsg *pReq) {
  SMnode      *pMnode = pReq->info.node;
  SMDropStbReq dropReq = {0};
  SStbObj     *pStb = NULL;
  SDbObj      *pDb = NULL;
  SName        name = {0};
  int32_t      ret = 0;
  int32_t      code = -1;

  TAOS_CHECK_GOTO(tDeserializeSMDropStbReq(pReq->pCont, pReq->contLen, &dropReq), NULL, _OVER);

  mInfo("stb:%s, start to drop", dropReq.name);

  pStb = mndAcquireStb(pMnode, dropReq.name);
  if (pStb == NULL) {
    if (dropReq.igNotExists) {
      mInfo("stb:%s, not exist, ignore not exist is set", dropReq.name);
      code = 0;
    } else {
      code = TSDB_CODE_MND_STB_NOT_EXIST;
    }
    goto _OVER;
  }

  // Requests coming from taosX carry the suid they expect; a mismatch is silently ignored.
  bool fromTaosX = (dropReq.source == TD_REQ_FROM_TAOX_OLD) || (dropReq.source == TD_REQ_FROM_TAOX);
  if (fromTaosX && pStb->uid != dropReq.suid) {
    code = 0;
    goto _OVER;
  }

  pDb = mndAcquireDbByStb(pMnode, dropReq.name);
  if (pDb == NULL) {
    code = TSDB_CODE_MND_DB_NOT_SELECTED;
    goto _OVER;
  }

  code = mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb);
  if (code != 0) {
    goto _OVER;
  }

  // Any negative result from the dependency checks is reported as "must be deleted first".
  if (mndCheckDropStbForTopic(pMnode, dropReq.name, pStb->uid) < 0) {
    code = TSDB_CODE_MND_TOPIC_MUST_BE_DELETED;
    goto _OVER;
  }

  if (mndCheckDropStbForStream(pMnode, dropReq.name, pStb->uid) < 0) {
    code = TSDB_CODE_MND_STREAM_MUST_BE_DELETED;
    goto _OVER;
  }

  code = mndDropStb(pMnode, pReq, pDb, pStb);
  if (code == 0) {
    code = TSDB_CODE_ACTION_IN_PROGRESS;
  }

  // The audit record is written whether or not the drop succeeded; a name parse failure is
  // only logged.
  ret = tNameFromString(&name, dropReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
  if (ret != 0) {
    mError("stb:%s, failed to tNameFromString since %s", dropReq.name, tstrerror(ret));
  }

  auditRecord(pReq, pMnode->clusterId, "dropStb", name.dbname, name.tname, dropReq.sql, dropReq.sqlLen);

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("stb:%s, failed to drop since %s", dropReq.name, tstrerror(code));
  }

  mndReleaseDb(pMnode, pDb);
  mndReleaseStb(pMnode, pStb);
  tFreeSMDropStbReq(&dropReq);
  TAOS_RETURN(code);
}
2965

2966
// Handle a table-meta request: build the meta for an information_schema table, a
// performance_schema table or a super table, serialize it and attach it to pReq->info.
// Returns 0 on success or an error code. When no user object can be acquired, the result of
// mndAcquireUser is returned instead of an unconditional 0.
static int32_t mndProcessTableMetaReq(SRpcMsg *pReq) {
  SMnode       *pMnode = pReq->info.node;
  int32_t       code = -1;
  STableInfoReq infoReq = {0};
  STableMetaRsp metaRsp = {0};
  SUserObj     *pUser = NULL;

  code = mndAcquireUser(pMnode, pReq->info.conn.user, &pUser);
  if (pUser == NULL) {
    // Nothing has been allocated yet; hand the lookup result back to the caller rather than
    // reporting success with no response attached.
    return code;
  }
  bool sysinfo = pUser->sysInfo;

  TAOS_CHECK_GOTO(tDeserializeSTableInfoReq(pReq->pCont, pReq->contLen, &infoReq), NULL, _OVER);

  if (0 == strcmp(infoReq.dbFName, TSDB_INFORMATION_SCHEMA_DB)) {
    mInfo("information_schema table:%s.%s, start to retrieve meta", infoReq.dbFName, infoReq.tbName);
    TAOS_CHECK_GOTO(mndBuildInsTableSchema(pMnode, infoReq.dbFName, infoReq.tbName, sysinfo, &metaRsp), NULL, _OVER);
  } else if (0 == strcmp(infoReq.dbFName, TSDB_PERFORMANCE_SCHEMA_DB)) {
    mInfo("performance_schema table:%s.%s, start to retrieve meta", infoReq.dbFName, infoReq.tbName);
    TAOS_CHECK_GOTO(mndBuildPerfsTableSchema(pMnode, infoReq.dbFName, infoReq.tbName, &metaRsp), NULL, _OVER);
  } else {
    mInfo("stb:%s.%s, start to retrieve meta", infoReq.dbFName, infoReq.tbName);
    TAOS_CHECK_GOTO(mndBuildStbSchema(pMnode, infoReq.dbFName, infoReq.tbName, &metaRsp), NULL, _OVER);
  }

  // First call with a NULL buffer only computes the serialized length.
  int32_t rspLen = tSerializeSTableMetaRsp(NULL, 0, &metaRsp);
  if (rspLen < 0) {
    code = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }

  void *pRsp = rpcMallocCont(rspLen);
  if (pRsp == NULL) {
    code = terrno;
    goto _OVER;
  }

  // NOTE(review): pRsp is not released when this second serialization fails - confirm
  // whether it should be freed on that path.
  if ((rspLen = tSerializeSTableMetaRsp(pRsp, rspLen, &metaRsp)) < 0) {
    code = rspLen;
    goto _OVER;
  }
  pReq->info.rsp = pRsp;
  pReq->info.rspLen = rspLen;
  code = 0;

  mTrace("%s.%s, meta is retrieved", infoReq.dbFName, infoReq.tbName);

_OVER:
  if (code != 0) {
    mError("stb:%s.%s, failed to retrieve meta since %s", infoReq.dbFName, infoReq.tbName, tstrerror(code));
  }

  mndReleaseUser(pMnode, pUser);
  tFreeSTableMetaRsp(&metaRsp);
  // TODO change to TAOS_RETURN
  return code;
}
3022

3023
/*
 * Handle a table-cfg request.
 *
 * The request is deserialized, the short db name is extracted from
 * cfgReq.dbFName, and the cfg response is built by one of three builders:
 * information_schema, performance_schema, or a regular super table.
 * The response is then serialized into an rpc buffer which is attached to
 * pReq->info.rsp / pReq->info.rspLen.
 *
 * Returns 0 on success, otherwise the failing step's error code. On failure
 * no response buffer is attached to pReq.
 */
static int32_t mndProcessTableCfgReq(SRpcMsg *pReq) {
  SMnode      *pMnode = pReq->info.node;
  int32_t      code = -1;
  STableCfgReq cfgReq = {0};
  STableCfgRsp cfgRsp = {0};
  char         dbName[TSDB_DB_NAME_LEN] = {0};
  void        *pRsp = NULL;
  int32_t      rspLen = 0;

  TAOS_CHECK_GOTO(tDeserializeSTableCfgReq(pReq->pCont, pReq->contLen, &cfgReq), NULL, _OVER);

  TAOS_CHECK_GOTO(mndExtractShortDbNameFromDbFullName(cfgReq.dbFName, dbName), NULL, _OVER);
  if (0 == strcmp(dbName, TSDB_INFORMATION_SCHEMA_DB)) {
    mInfo("information_schema table:%s.%s, start to retrieve cfg", cfgReq.dbFName, cfgReq.tbName);
    TAOS_CHECK_GOTO(mndBuildInsTableCfg(pMnode, cfgReq.dbFName, cfgReq.tbName, &cfgRsp), NULL, _OVER);
  } else if (0 == strcmp(dbName, TSDB_PERFORMANCE_SCHEMA_DB)) {
    mInfo("performance_schema table:%s.%s, start to retrieve cfg", cfgReq.dbFName, cfgReq.tbName);
    TAOS_CHECK_GOTO(mndBuildPerfsTableCfg(pMnode, cfgReq.dbFName, cfgReq.tbName, &cfgRsp), NULL, _OVER);
  } else {
    mInfo("stb:%s.%s, start to retrieve cfg", cfgReq.dbFName, cfgReq.tbName);
    TAOS_CHECK_GOTO(mndBuildStbCfg(pMnode, cfgReq.dbFName, cfgReq.tbName, &cfgRsp), NULL, _OVER);
  }

  // First pass with a NULL buffer only computes the encoded length.
  rspLen = tSerializeSTableCfgRsp(NULL, 0, &cfgRsp);
  if (rspLen < 0) {
    code = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }

  pRsp = rpcMallocCont(rspLen);
  if (pRsp == NULL) {
    code = terrno;
    goto _OVER;
  }

  if ((rspLen = tSerializeSTableCfgRsp(pRsp, rspLen, &cfgRsp)) < 0) {
    code = rspLen;
    // The buffer was never handed to pReq, so it must be released here.
    rpcFreeCont(pRsp);
    pRsp = NULL;
    goto _OVER;
  }
  pReq->info.rsp = pRsp;
  pReq->info.rspLen = rspLen;
  code = 0;

  mTrace("%s.%s, cfg is retrieved", cfgReq.dbFName, cfgReq.tbName);

_OVER:
  if (code != 0) {
    mError("stb:%s.%s, failed to retrieve cfg since %s", cfgReq.dbFName, cfgReq.tbName, tstrerror(code));
  }

  tFreeSTableCfgRsp(&cfgRsp);
  TAOS_RETURN(code);
}
3074

3075
/*
 * Validate a batch of client-cached super table versions and build the
 * serialized heartbeat response.
 *
 * Each entry of pStbVersions is converted in place from network byte order
 * and checked with mndValidateStbVersion:
 *   - if validation fails, a stub STableMetaRsp with numOfColumns = -1 is
 *     queued;
 *   - if the schema needs refreshing, the schema built by mndBuildStbSchema
 *     is queued (or the same stub when that build fails);
 *   - if the sma info needs refreshing, an STableIndexRsp is queued; when
 *     mndGetTableSma fails or reports no sma, it carries version = -1 and no
 *     index array.
 *
 * On success *ppRsp receives a buffer from taosMemoryMalloc and *pRspLen its
 * encoded length. On failure neither output is written and every
 * intermediate allocation is released.
 *
 * Returns 0 on success, otherwise an error code.
 */
int32_t mndValidateStbInfo(SMnode *pMnode, SSTableVersion *pStbVersions, int32_t numOfStbs, void **ppRsp,
                           int32_t *pRspLen) {
  int32_t   code = 0;
  int32_t   rspLen = 0;
  void     *pRsp = NULL;
  SSTbHbRsp hbRsp = {0};

  hbRsp.pMetaRsp = taosArrayInit(numOfStbs, sizeof(STableMetaRsp));
  if (hbRsp.pMetaRsp == NULL) {
    code = terrno;
    TAOS_RETURN(code);
  }

  hbRsp.pIndexRsp = taosArrayInit(numOfStbs, sizeof(STableIndexRsp));
  if (NULL == hbRsp.pIndexRsp) {
    // Capture terrno before the cleanup call gets a chance to touch it.
    code = terrno;
    taosArrayDestroy(hbRsp.pMetaRsp);
    TAOS_RETURN(code);
  }

  for (int32_t i = 0; i < numOfStbs; ++i) {
    SSTableVersion *pStbVersion = &pStbVersions[i];
    pStbVersion->suid = be64toh(pStbVersion->suid);
    pStbVersion->sversion = ntohl(pStbVersion->sversion);
    pStbVersion->tversion = ntohl(pStbVersion->tversion);
    pStbVersion->smaVer = ntohl(pStbVersion->smaVer);

    bool schema = false;
    bool sma = false;
    // A per-table validation failure is reported inside the response and
    // deliberately does not become the return code of this function.
    int32_t verCode = mndValidateStbVersion(pMnode, pStbVersion, &schema, &sma);
    if (TSDB_CODE_SUCCESS != verCode) {
      STableMetaRsp metaRsp = {0};
      metaRsp.numOfColumns = -1;
      metaRsp.suid = pStbVersion->suid;
      tstrncpy(metaRsp.dbFName, pStbVersion->dbFName, sizeof(metaRsp.dbFName));
      tstrncpy(metaRsp.tbName, pStbVersion->stbName, sizeof(metaRsp.tbName));
      tstrncpy(metaRsp.stbName, pStbVersion->stbName, sizeof(metaRsp.stbName));
      if (taosArrayPush(hbRsp.pMetaRsp, &metaRsp) == NULL) {
        code = terrno;
        goto _OVER;
      }
      continue;
    }

    if (schema) {
      STableMetaRsp metaRsp = {0};
      mInfo("stb:%s.%s, start to retrieve meta", pStbVersion->dbFName, pStbVersion->stbName);
      if (mndBuildStbSchema(pMnode, pStbVersion->dbFName, pStbVersion->stbName, &metaRsp) != 0) {
        metaRsp.numOfColumns = -1;
        metaRsp.suid = pStbVersion->suid;
        tstrncpy(metaRsp.dbFName, pStbVersion->dbFName, sizeof(metaRsp.dbFName));
        tstrncpy(metaRsp.tbName, pStbVersion->stbName, sizeof(metaRsp.tbName));
        tstrncpy(metaRsp.stbName, pStbVersion->stbName, sizeof(metaRsp.stbName));
        if (taosArrayPush(hbRsp.pMetaRsp, &metaRsp) == NULL) {
          code = terrno;
          goto _OVER;
        }
        continue;
      }

      if (taosArrayPush(hbRsp.pMetaRsp, &metaRsp) == NULL) {
        code = terrno;
        // The built schema never reached the array, so it is still owned here.
        tFreeSTableMetaRsp(&metaRsp);
        goto _OVER;
      }
    }

    if (sma) {
      bool           exist = false;
      char           tbFName[TSDB_TABLE_FNAME_LEN];
      STableIndexRsp indexRsp = {0};
      indexRsp.pIndex = taosArrayInit(10, sizeof(STableIndexInfo));
      if (NULL == indexRsp.pIndex) {
        code = terrno;
        goto _OVER;
      }

      (void)snprintf(tbFName, sizeof(tbFName), "%s.%s", pStbVersion->dbFName, pStbVersion->stbName);
      int32_t smaCode = mndGetTableSma(pMnode, tbFName, &indexRsp, &exist);
      if (smaCode || !exist) {
        indexRsp.suid = pStbVersion->suid;
        indexRsp.version = -1;
        // Release the array before dropping the only pointer to it.
        // NOTE(review): elements pushed by mndGetTableSma before a failure may
        // own further memory; confirm whether a deep free is needed here.
        taosArrayDestroy(indexRsp.pIndex);
        indexRsp.pIndex = NULL;
      }

      tstrncpy(indexRsp.dbFName, pStbVersion->dbFName, sizeof(indexRsp.dbFName));
      tstrncpy(indexRsp.tbName, pStbVersion->stbName, sizeof(indexRsp.tbName));

      if (taosArrayPush(hbRsp.pIndexRsp, &indexRsp) == NULL) {
        code = terrno;
        taosArrayDestroy(indexRsp.pIndex);
        goto _OVER;
      }
    }
  }

  // First pass with a NULL buffer only computes the encoded length.
  rspLen = tSerializeSSTbHbRsp(NULL, 0, &hbRsp);
  if (rspLen < 0) {
    code = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }

  pRsp = taosMemoryMalloc(rspLen);
  if (pRsp == NULL) {
    code = terrno;
    goto _OVER;
  }

  rspLen = tSerializeSSTbHbRsp(pRsp, rspLen, &hbRsp);
  if (rspLen < 0) {
    code = rspLen;
    taosMemoryFree(pRsp);
    goto _OVER;
  }

  *ppRsp = pRsp;
  *pRspLen = rspLen;

_OVER:
  // Single cleanup point: the response struct is released on every path.
  tFreeSSTbHbRsp(&hbRsp);
  TAOS_RETURN(code);
}
3187

3188
/*
 * Count the super tables whose dbUid matches the database named dbName.
 *
 * Walks every SDB_STB row in the sdb. The count is written to *pNumOfStbs.
 * Returns 0 on success, or TSDB_CODE_MND_DB_NOT_SELECTED when the database
 * cannot be acquired (in which case *pNumOfStbs is left untouched).
 */
int32_t mndGetNumOfStbs(SMnode *pMnode, char *dbName, int32_t *pNumOfStbs) {
  int32_t code = 0;
  SSdb   *pSdb = pMnode->pSdb;

  SDbObj *pDb = mndAcquireDb(pMnode, dbName);
  if (pDb == NULL) {
    code = TSDB_CODE_MND_DB_NOT_SELECTED;
    TAOS_RETURN(code);
  }

  int32_t  matched = 0;
  void    *pCursor = NULL;
  SStbObj *pStb = NULL;
  do {
    pStb = NULL;
    pCursor = sdbFetch(pSdb, SDB_STB, pCursor, (void **)&pStb);
    if (pCursor != NULL) {
      if (pStb->dbUid == pDb->uid) {
        ++matched;
      }
      // Every fetched row is released before moving to the next one.
      sdbRelease(pSdb, pStb);
    }
  } while (pCursor != NULL);

  *pNumOfStbs = matched;
  mndReleaseDb(pMnode, pDb);
  TAOS_RETURN(code);
}
3215

3216
int32_t mndExtractDbNameFromStbFullName(const char *stbFullName, char *dst) {
×
3217
  SName name = {0};
×
3218
  TAOS_CHECK_RETURN(tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE));
×
3219

3220
  TAOS_CHECK_RETURN(tNameGetFullDbName(&name, dst));
×
3221

3222
  return 0;
×
3223
}
3224

3225
int32_t mndExtractShortDbNameFromStbFullName(const char *stbFullName, char *dst) {
30✔
3226
  SName name = {0};
30✔
3227
  TAOS_CHECK_RETURN(tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE));
30!
3228

3229
  TAOS_CHECK_RETURN(tNameGetDbName(&name, dst));
30!
3230

3231
  return 0;
30✔
3232
}
3233

3234
int32_t mndExtractShortDbNameFromDbFullName(const char *stbFullName, char *dst) {
48✔
3235
  SName name = {0};
48✔
3236
  TAOS_CHECK_RETURN(tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB));
48!
3237

3238
  TAOS_CHECK_RETURN(tNameGetDbName(&name, dst));
48!
3239

3240
  return 0;
48✔
3241
}
3242

3243
void mndExtractTbNameFromStbFullName(const char *stbFullName, char *dst, int32_t dstSize) {
1,813✔
3244
  int32_t pos = -1;
1,813✔
3245
  int32_t num = 0;
1,813✔
3246
  for (pos = 0; stbFullName[pos] != 0; ++pos) {
12,521!
3247
    if (stbFullName[pos] == TS_PATH_DELIMITER[0]) num++;
12,521✔
3248
    if (num == 2) break;
12,521✔
3249
  }
3250

3251
  if (num == 2) {
1,813!
3252
    tstrncpy(dst, stbFullName + pos + 1, dstSize);
1,813✔
3253
  }
3254
}
1,813✔
3255

3256
// static int32_t mndProcessRetrieveStbReq(SRpcMsg *pReq) {
3257
//   SMnode    *pMnode = pReq->info.node;
3258
//   SShowMgmt *pMgmt = &pMnode->showMgmt;
3259
//   SShowObj  *pShow = NULL;
3260
//   int32_t    rowsToRead = SHOW_STEP_SIZE;
3261
//   int32_t    rowsRead = 0;
3262
//
3263
//   SRetrieveTableReq retrieveReq = {0};
3264
//   if (tDeserializeSRetrieveTableReq(pReq->pCont, pReq->contLen, &retrieveReq) != 0) {
3265
//     terrno = TSDB_CODE_INVALID_MSG;
3266
//     return -1;
3267
//   }
3268
//
3269
//   SMnode    *pMnode = pReq->info.node;
3270
//   SSdb      *pSdb = pMnode->pSdb;
3271
//   int32_t    numOfRows = 0;
3272
//   SDbObj    *pDb = NULL;
3273
//   ESdbStatus objStatus = 0;
3274
//
3275
//   SUserObj *pUser = mndAcquireUser(pMnode, pReq->info.conn.user);
3276
//   if (pUser == NULL) return 0;
3277
//   bool sysinfo = pUser->sysInfo;
3278
//
3279
//   // Append the information_schema database into the result.
3280
////  if (!pShow->sysDbRsp) {
3281
////    SDbObj infoschemaDb = {0};
3282
////    setInformationSchemaDbCfg(pMnode, &infoschemaDb);
3283
////    size_t numOfTables = 0;
3284
////    getVisibleInfosTablesNum(sysinfo, &numOfTables);
3285
////    mndDumpDbInfoData(pMnode, pBlock, &infoschemaDb, pShow, numOfRows, numOfTables, true, 0, 1);
3286
////
3287
////    numOfRows += 1;
3288
////
3289
////    SDbObj perfschemaDb = {0};
3290
////    setPerfSchemaDbCfg(pMnode, &perfschemaDb);
3291
////    numOfTables = 0;
3292
////    getPerfDbMeta(NULL, &numOfTables);
3293
////    mndDumpDbInfoData(pMnode, pBlock, &perfschemaDb, pShow, numOfRows, numOfTables, true, 0, 1);
3294
////
3295
////    numOfRows += 1;
3296
////    pShow->sysDbRsp = true;
3297
////  }
3298
//
3299
//  SSDataBlock* p = buildInfoSchemaTableMetaBlock(TSDB_INS_TABLE_COLS);
3300
//  blockDataEnsureCapacity(p, rowsToRead);
3301
//
3302
//  size_t               size = 0;
3303
//  const SSysTableMeta* pSysDbTableMeta = NULL;
3304
//
3305
//  getInfosDbMeta(&pSysDbTableMeta, &size);
3306
//  p->info.rows = buildDbColsInfoBlock(sysinfo, p, pSysDbTableMeta, size, TSDB_INFORMATION_SCHEMA_DB);
3307
//
3308
//  getPerfDbMeta(&pSysDbTableMeta, &size);
3309
//  p->info.rows = buildDbColsInfoBlock(sysinfo, p, pSysDbTableMeta, size, TSDB_PERFORMANCE_SCHEMA_DB);
3310
//
3311
//  blockDataDestroy(p);
3312
//
3313
//
3314
//  while (numOfRows < rowsToRead) {
3315
//    pShow->pIter = sdbFetchAll(pSdb, SDB_DB, pShow->pIter, (void **)&pDb, &objStatus, true);
3316
//    if (pShow->pIter == NULL) break;
3317
//    if (strncmp(retrieveReq.db, pDb->name, strlen(retrieveReq.db)) != 0){
3318
//      continue;
3319
//    }
3320
//    if (mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_READ_OR_WRITE_DB, pDb) != 0) {
3321
//      continue;
3322
//    }
3323
//
3324
//    while (numOfRows < rowsToRead) {
3325
//      pShow->pIter = sdbFetch(pSdb, SDB_STB, pShow->pIter, (void **)&pStb);
3326
//      if (pShow->pIter == NULL) break;
3327
//
3328
//      if (pDb != NULL && pStb->dbUid != pDb->uid) {
3329
//        sdbRelease(pSdb, pStb);
3330
//        continue;
3331
//      }
3332
//
3333
//      cols = 0;
3334
//
3335
//      SName name = {0};
3336
//      char  stbName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
3337
//      mndExtractTbNameFromStbFullName(pStb->name, &stbName[VARSTR_HEADER_SIZE], TSDB_TABLE_NAME_LEN);
3338
//      varDataSetLen(stbName, strlen(&stbName[VARSTR_HEADER_SIZE]));
3339
//
3340
//      SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
3341
//      colDataSetVal(pColInfo, numOfRows, (const char *)stbName, false);
3342
//
3343
//      char db[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
3344
//      tNameFromString(&name, pStb->db, T_NAME_ACCT | T_NAME_DB);
3345
//      tNameGetDbName(&name, varDataVal(db));
3346
//      varDataSetLen(db, strlen(varDataVal(db)));
3347
//
3348
//      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
3349
//      colDataSetVal(pColInfo, numOfRows, (const char *)db, false);
3350
//
3351
//      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
3352
//      colDataSetVal(pColInfo, numOfRows, (const char *)&pStb->createdTime, false);
3353
//
3354
//      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
3355
//      colDataSetVal(pColInfo, numOfRows, (const char *)&pStb->numOfColumns, false);
3356
//
3357
//      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
3358
//      colDataSetVal(pColInfo, numOfRows, (const char *)&pStb->numOfTags, false);
3359
//
3360
//      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
3361
//      colDataSetVal(pColInfo, numOfRows, (const char *)&pStb->updateTime, false);  // number of tables
3362
//
3363
//      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
3364
//      if (pStb->commentLen > 0) {
3365
//        char comment[TSDB_TB_COMMENT_LEN + VARSTR_HEADER_SIZE] = {0};
3366
//        STR_TO_VARSTR(comment, pStb->comment);
3367
//        colDataSetVal(pColInfo, numOfRows, comment, false);
3368
//      } else if (pStb->commentLen == 0) {
3369
//        char comment[VARSTR_HEADER_SIZE + VARSTR_HEADER_SIZE] = {0};
3370
//        STR_TO_VARSTR(comment, "");
3371
//        colDataSetVal(pColInfo, numOfRows, comment, false);
3372
//      } else {
3373
//        colDataSetNULL(pColInfo, numOfRows);
3374
//      }
3375
//
3376
//      char watermark[64 + VARSTR_HEADER_SIZE] = {0};
3377
//      sprintf(varDataVal(watermark), "%" PRId64 "a,%" PRId64 "a", pStb->watermark[0], pStb->watermark[1]);
3378
//      varDataSetLen(watermark, strlen(varDataVal(watermark)));
3379
//
3380
//      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
3381
//      colDataSetVal(pColInfo, numOfRows, (const char *)watermark, false);
3382
//
3383
//      char maxDelay[64 + VARSTR_HEADER_SIZE] = {0};
3384
//      sprintf(varDataVal(maxDelay), "%" PRId64 "a,%" PRId64 "a", pStb->maxdelay[0], pStb->maxdelay[1]);
3385
//      varDataSetLen(maxDelay, strlen(varDataVal(maxDelay)));
3386
//
3387
//      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
3388
//      colDataSetVal(pColInfo, numOfRows, (const char *)maxDelay, false);
3389
//
3390
//      char    rollup[160 + VARSTR_HEADER_SIZE] = {0};
3391
//      int32_t rollupNum = (int32_t)taosArrayGetSize(pStb->pFuncs);
3392
//      char   *sep = ", ";
3393
//      int32_t sepLen = strlen(sep);
3394
//      int32_t rollupLen = sizeof(rollup) - VARSTR_HEADER_SIZE - 2;
3395
//      for (int32_t i = 0; i < rollupNum; ++i) {
3396
//        char *funcName = taosArrayGet(pStb->pFuncs, i);
3397
//        if (i) {
3398
//          strncat(varDataVal(rollup), sep, rollupLen);
3399
//          rollupLen -= sepLen;
3400
//        }
3401
//        strncat(varDataVal(rollup), funcName, rollupLen);
3402
//        rollupLen -= strlen(funcName);
3403
//      }
3404
//      varDataSetLen(rollup, strlen(varDataVal(rollup)));
3405
//
3406
//      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
3407
//      colDataSetVal(pColInfo, numOfRows, (const char *)rollup, false);
3408
//
3409
//      numOfRows++;
3410
//      sdbRelease(pSdb, pStb);
3411
//    }
3412
//
3413
//    if (pDb != NULL) {
3414
//      mndReleaseDb(pMnode, pDb);
3415
//    }
3416
//
3417
//    sdbRelease(pSdb, pDb);
3418
//  }
3419
//
3420
//  pShow->numOfRows += numOfRows;
3421
//  mndReleaseUser(pMnode, pUser);
3422
//
3423
//
3424
//
3425
//
3426
//
3427
//
3428
//
3429
//
3430
//  ShowRetrieveFp retrieveFp = pMgmt->retrieveFps[pShow->type];
3431
//  if (retrieveFp == NULL) {
3432
//    mndReleaseShowObj(pShow, false);
3433
//    terrno = TSDB_CODE_MSG_NOT_PROCESSED;
3434
//    mError("show:0x%" PRIx64 ", failed to retrieve data since %s", pShow->id, terrstr());
3435
//    return -1;
3436
//  }
3437
//
3438
//  mDebug("show:0x%" PRIx64 ", start retrieve data, type:%d", pShow->id, pShow->type);
3439
//  if (retrieveReq.user[0] != 0) {
3440
//    memcpy(pReq->info.conn.user, retrieveReq.user, TSDB_USER_LEN);
3441
//  } else {
3442
//    memcpy(pReq->info.conn.user, TSDB_DEFAULT_USER, strlen(TSDB_DEFAULT_USER) + 1);
3443
//  }
3444
//  if (retrieveReq.db[0] && mndCheckShowPrivilege(pMnode, pReq->info.conn.user, pShow->type, retrieveReq.db) != 0) {
3445
//    return -1;
3446
//  }
3447
//
3448
//  int32_t numOfCols = pShow->pMeta->numOfColumns;
3449
//
3450
//  SSDataBlock *pBlock = createDataBlock();
3451
//  for (int32_t i = 0; i < numOfCols; ++i) {
3452
//    SColumnInfoData idata = {0};
3453
//
3454
//    SSchema *p = &pShow->pMeta->pSchemas[i];
3455
//
3456
//    idata.info.bytes = p->bytes;
3457
//    idata.info.type = p->type;
3458
//    idata.info.colId = p->colId;
3459
//    blockDataAppendColInfo(pBlock, &idata);
3460
//  }
3461
//
3462
//  blockDataEnsureCapacity(pBlock, rowsToRead);
3463
//
3464
//  if (mndCheckRetrieveFinished(pShow)) {
3465
//    mDebug("show:0x%" PRIx64 ", read finished, numOfRows:%d", pShow->id, pShow->numOfRows);
3466
//    rowsRead = 0;
3467
//  } else {
3468
//    rowsRead = (*retrieveFp)(pReq, pShow, pBlock, rowsToRead);
3469
//    if (rowsRead < 0) {
3470
//      terrno = rowsRead;
3471
//      mDebug("show:0x%" PRIx64 ", retrieve completed", pShow->id);
3472
//      mndReleaseShowObj(pShow, true);
3473
//      blockDataDestroy(pBlock);
3474
//      return -1;
3475
//    }
3476
//
3477
//    pBlock->info.rows = rowsRead;
3478
//    mDebug("show:0x%" PRIx64 ", stop retrieve data, rowsRead:%d numOfRows:%d", pShow->id, rowsRead, pShow->numOfRows);
3479
//  }
3480
//
3481
//  size = sizeof(SRetrieveMetaTableRsp) + sizeof(int32_t) + sizeof(SSysTableSchema) * pShow->pMeta->numOfColumns +
3482
//         blockDataGetSize(pBlock) + blockDataGetSerialMetaSize(taosArrayGetSize(pBlock->pDataBlock));
3483
//
3484
//  SRetrieveMetaTableRsp *pRsp = rpcMallocCont(size);
3485
//  if (pRsp == NULL) {
3486
//    mndReleaseShowObj(pShow, false);
3487
//    terrno = TSDB_CODE_OUT_OF_MEMORY;
3488
//    mError("show:0x%" PRIx64 ", failed to retrieve data since %s", pShow->id, terrstr());
3489
//    blockDataDestroy(pBlock);
3490
//    return -1;
3491
//  }
3492
//
3493
//  pRsp->handle = htobe64(pShow->id);
3494
//
3495
//  if (rowsRead > 0) {
3496
//    char    *pStart = pRsp->data;
3497
//    SSchema *ps = pShow->pMeta->pSchemas;
3498
//
3499
//    *(int32_t *)pStart = htonl(pShow->pMeta->numOfColumns);
3500
//    pStart += sizeof(int32_t);  // number of columns
3501
//
3502
//    for (int32_t i = 0; i < pShow->pMeta->numOfColumns; ++i) {
3503
//      SSysTableSchema *pSchema = (SSysTableSchema *)pStart;
3504
//      pSchema->bytes = htonl(ps[i].bytes);
3505
//      pSchema->colId = htons(ps[i].colId);
3506
//      pSchema->type = ps[i].type;
3507
//
3508
//      pStart += sizeof(SSysTableSchema);
3509
//    }
3510
//
3511
//    int32_t len = blockEncode(pBlock, pStart, pShow->pMeta->numOfColumns);
3512
//  }
3513
//
3514
//  pRsp->numOfRows = htonl(rowsRead);
3515
//  pRsp->precision = TSDB_TIME_PRECISION_MILLI;  // millisecond time precision
3516
//  pReq->info.rsp = pRsp;
3517
//  pReq->info.rspLen = size;
3518
//
3519
//  if (rowsRead == 0 || rowsRead < rowsToRead) {
3520
//    pRsp->completed = 1;
3521
//    mDebug("show:0x%" PRIx64 ", retrieve completed", pShow->id);
3522
//    mndReleaseShowObj(pShow, true);
3523
//  } else {
3524
//    mDebug("show:0x%" PRIx64 ", retrieve not completed yet", pShow->id);
3525
//    mndReleaseShowObj(pShow, false);
3526
//  }
3527
//
3528
//  blockDataDestroy(pBlock);
3529
//  return TSDB_CODE_SUCCESS;
3530
//}
3531

3532
/*
 * Fill pBlock with up to `rows` super table rows for a show/retrieve request.
 *
 * Iteration resumes from pShow->pIter. When pShow->db is non-empty only super
 * tables of that database are emitted; tables for which isTsmaResSTb() is
 * true are skipped. Columns are written in this order: table name, db name,
 * created time, column count, tag count, update time, comment, watermark,
 * max delay, rollup function list, and (when the block has the column) uid.
 *
 * Returns the number of rows written, which is also added to
 * pShow->numOfRows. If the database named by pShow->db cannot be acquired,
 * terrno is returned instead. When a column write fails the error is logged
 * and the rows completed so far are returned.
 */
static int32_t mndRetrieveStb(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode  *pMnode = pReq->info.node;
  SSdb    *pSdb = pMnode->pSdb;
  int32_t  numOfRows = 0;
  SStbObj *pStb = NULL;
  int32_t  cols = 0;
  int32_t  lino = 0;
  int32_t  code = 0;

  SDbObj *pDb = NULL;
  if (strlen(pShow->db) > 0) {
    pDb = mndAcquireDb(pMnode, pShow->db);
    if (pDb == NULL) return terrno;
  }

  while (numOfRows < rows) {
    pShow->pIter = sdbFetch(pSdb, SDB_STB, pShow->pIter, (void **)&pStb);
    if (pShow->pIter == NULL) break;

    if (pDb != NULL && pStb->dbUid != pDb->uid) {
      sdbRelease(pSdb, pStb);
      continue;
    }

    if (isTsmaResSTb(pStb->name)) {
      sdbRelease(pSdb, pStb);
      continue;
    }

    cols = 0;

    SName name = {0};

    // NOTE(review): RETRIEVE_CHECK_GOTO is handed pStb on every call below;
    // presumably it releases the row before jumping - confirm in the macro.

    // table name
    char stbName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
    mndExtractTbNameFromStbFullName(pStb->name, &stbName[VARSTR_HEADER_SIZE], TSDB_TABLE_NAME_LEN);
    varDataSetLen(stbName, strlen(&stbName[VARSTR_HEADER_SIZE]));
    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)stbName, false), pStb, &lino, _ERROR);

    // database name
    char db[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
    RETRIEVE_CHECK_GOTO(tNameFromString(&name, pStb->db, T_NAME_ACCT | T_NAME_DB), pStb, &lino, _ERROR);
    RETRIEVE_CHECK_GOTO(tNameGetDbName(&name, varDataVal(db)), pStb, &lino, _ERROR);
    varDataSetLen(db, strlen(varDataVal(db)));
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)db, false), pStb, &lino, _ERROR);

    // created time
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pStb->createdTime, false), pStb, &lino,
                        _ERROR);

    // number of columns
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pStb->numOfColumns, false), pStb, &lino,
                        _ERROR);

    // number of tags
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pStb->numOfTags, false), pStb, &lino, _ERROR);

    // last update time
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pStb->updateTime, false), pStb, &lino,
                        _ERROR);

    // comment: positive length -> text, zero -> empty string, negative -> NULL
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    if (pStb->commentLen > 0) {
      char comment[TSDB_TB_COMMENT_LEN + VARSTR_HEADER_SIZE] = {0};
      STR_TO_VARSTR(comment, pStb->comment);
      RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, comment, false), pStb, &lino, _ERROR);
    } else if (pStb->commentLen == 0) {
      char comment[VARSTR_HEADER_SIZE + VARSTR_HEADER_SIZE] = {0};
      STR_TO_VARSTR(comment, "");
      RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, comment, false), pStb, &lino, _ERROR);
    } else {
      colDataSetNULL(pColInfo, numOfRows);
    }

    // watermark
    char watermark[64 + VARSTR_HEADER_SIZE] = {0};
    (void)snprintf(varDataVal(watermark), sizeof(watermark) - VARSTR_HEADER_SIZE, "%" PRId64 "a,%" PRId64 "a",
                   pStb->watermark[0], pStb->watermark[1]);
    varDataSetLen(watermark, strlen(varDataVal(watermark)));

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)watermark, false), pStb, &lino, _ERROR);

    // max delay
    char maxDelay[64 + VARSTR_HEADER_SIZE] = {0};
    (void)snprintf(varDataVal(maxDelay), sizeof(maxDelay) - VARSTR_HEADER_SIZE, "%" PRId64 "a,%" PRId64 "a",
                   pStb->maxdelay[0], pStb->maxdelay[1]);
    varDataSetLen(maxDelay, strlen(varDataVal(maxDelay)));

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)maxDelay, false), pStb, &lino, _ERROR);

    // rollup: ", "-separated function names, truncated to the buffer
    char    rollup[160 + VARSTR_HEADER_SIZE] = {0};
    int32_t rollupNum = (int32_t)taosArrayGetSize(pStb->pFuncs);
    char   *sep = ", ";
    int32_t sepLen = strlen(sep);
    int32_t rollupLen = sizeof(rollup) - VARSTR_HEADER_SIZE - 2;
    // Stop as soon as the budget is used up: a negative rollupLen would be
    // converted to a huge size_t by strncat and overflow the buffer.
    for (int32_t i = 0; i < rollupNum && rollupLen > 0; ++i) {
      char *funcName = taosArrayGet(pStb->pFuncs, i);
      if (i) {
        (void)strncat(varDataVal(rollup), sep, rollupLen);
        rollupLen -= sepLen;
        if (rollupLen <= 0) break;
      }
      (void)strncat(varDataVal(rollup), funcName, rollupLen);
      rollupLen -= strlen(funcName);
    }
    varDataSetLen(rollup, strlen(varDataVal(rollup)));

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)rollup, false), pStb, &lino, _ERROR);

    // uid (only when the block carries this column)
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    if (pColInfo) {
      RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)(&pStb->uid), false), pStb, &lino, _ERROR);
    }

    numOfRows++;
    sdbRelease(pSdb, pStb);
  }

  goto _OVER;

_ERROR:
  mError("show:0x%" PRIx64 ", failed to retrieve data at %s:%d since %s", pShow->id, __FUNCTION__, lino,
         tstrerror(code));

_OVER:
  // Released here so the error path does not leak the db reference.
  if (pDb != NULL) {
    mndReleaseDb(pMnode, pDb);
  }
  pShow->numOfRows += numOfRows;
  return numOfRows;
}
3662

3663
static int32_t buildDbColsInfoBlock(const SSDataBlock *p, const SSysTableMeta *pSysDbTableMeta, size_t size,
12,905✔
3664
                                    const char *dbName, const char *tbName) {
3665
  char    tName[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
12,905✔
3666
  char    dName[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
12,905✔
3667
  char    typeName[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
12,905✔
3668
  int32_t numOfRows = p->info.rows;
12,905✔
3669
  int32_t lino = 0;
12,905✔
3670
  int32_t code = 0;
12,905✔
3671

3672
  STR_TO_VARSTR(dName, dbName);
12,905✔
3673
  STR_TO_VARSTR(typeName, "SYSTEM_TABLE");
12,905✔
3674

3675
  for (int32_t i = 0; i < size; ++i) {
270,686✔
3676
    const SSysTableMeta *pm = &pSysDbTableMeta[i];
258,111✔
3677
    //    if (pm->sysInfo) {
3678
    //      continue;
3679
    //    }
3680
    if (tbName[0] && strncmp(tbName, pm->name, TSDB_TABLE_NAME_LEN) != 0) {
258,111!
3681
      continue;
×
3682
    }
3683

3684
    STR_TO_VARSTR(tName, pm->name);
258,111✔
3685

3686
    for (int32_t j = 0; j < pm->colNum; j++) {
2,437,365✔
3687
      // table name
3688
      SColumnInfoData *pColInfoData = taosArrayGet(p->pDataBlock, 0);
2,179,584✔
3689
      TAOS_CHECK_GOTO(colDataSetVal(pColInfoData, numOfRows, tName, false), &lino, _OVER);
2,179,527!
3690

3691
      // database name
3692
      pColInfoData = taosArrayGet(p->pDataBlock, 1);
2,179,396✔
3693
      TAOS_CHECK_GOTO(colDataSetVal(pColInfoData, numOfRows, dName, false), &lino, _OVER);
2,179,303!
3694

3695
      pColInfoData = taosArrayGet(p->pDataBlock, 2);
2,179,122✔
3696
      TAOS_CHECK_GOTO(colDataSetVal(pColInfoData, numOfRows, typeName, false), &lino, _OVER);
2,179,022!
3697

3698
      // col name
3699
      char colName[TSDB_COL_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
2,179,206✔
3700
      STR_TO_VARSTR(colName, pm->schema[j].name);
2,179,206✔
3701
      pColInfoData = taosArrayGet(p->pDataBlock, 3);
2,179,206✔
3702
      TAOS_CHECK_GOTO(colDataSetVal(pColInfoData, numOfRows, colName, false), &lino, _OVER);
2,179,846!
3703

3704
      // col type
3705
      int8_t colType = pm->schema[j].type;
2,179,760✔
3706
      pColInfoData = taosArrayGet(p->pDataBlock, 4);
2,179,760✔
3707
      char colTypeStr[VARSTR_HEADER_SIZE + 32];
3708
      int  colTypeLen = sprintf(varDataVal(colTypeStr), "%s", tDataTypes[colType].name);
2,179,672✔
3709
      if (colType == TSDB_DATA_TYPE_VARCHAR) {
2,179,672✔
3710
        colTypeLen +=
1,200,047✔
3711
            sprintf(varDataVal(colTypeStr) + colTypeLen, "(%d)", (int32_t)(pm->schema[j].bytes - VARSTR_HEADER_SIZE));
1,200,047✔
3712
      } else if (colType == TSDB_DATA_TYPE_NCHAR) {
979,625!
3713
        colTypeLen += sprintf(varDataVal(colTypeStr) + colTypeLen, "(%d)",
×
3714
                              (int32_t)((pm->schema[j].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE));
×
3715
      }
3716
      varDataSetLen(colTypeStr, colTypeLen);
2,179,672✔
3717
      TAOS_CHECK_GOTO(colDataSetVal(pColInfoData, numOfRows, (char *)colTypeStr, false), &lino, _OVER);
2,179,672!
3718

3719
      pColInfoData = taosArrayGet(p->pDataBlock, 5);
2,180,124✔
3720
      TAOS_CHECK_GOTO(colDataSetVal(pColInfoData, numOfRows, (const char *)&pm->schema[j].bytes, false), &lino, _OVER);
2,179,994!
3721
      for (int32_t k = 6; k <= 8; ++k) {
8,713,484✔
3722
        pColInfoData = taosArrayGet(p->pDataBlock, k);
6,534,230✔
3723
        colDataSetNULL(pColInfoData, numOfRows);
6,533,741!
3724
      }
3725

3726
      numOfRows += 1;
2,179,254✔
3727
    }
3728
  }
3729
_OVER:
12,575✔
3730
  mError("failed at %s:%d since %s", __FUNCTION__, lino, tstrerror(code));
12,575!
3731
  return numOfRows;
12,906✔
3732
}
3733
// Bit flags selecting which databases' column info to build. Each expansion
// is parenthesized so the flags stay correct inside any larger expression.
#define BUILD_COL_FOR_INFO_DB (1 << 0)
#define BUILD_COL_FOR_PERF_DB (1 << 1)
#define BUILD_COL_FOR_USER_DB (1 << 2)
#define BUILD_COL_FOR_ALL_DB  (BUILD_COL_FOR_INFO_DB | BUILD_COL_FOR_PERF_DB | BUILD_COL_FOR_USER_DB)
3737

3738
/*
 * Append column-info rows for the system databases selected by buildWhichDBs
 * to the block p: information_schema when BUILD_COL_FOR_INFO_DB is set, then
 * performance_schema when BUILD_COL_FOR_PERF_DB is set. `tb` is forwarded to
 * buildDbColsInfoBlock as the table-name filter.
 *
 * p->info.rows is updated after each database; its final value is returned.
 */
static int32_t buildSysDbColsInfo(SSDataBlock *p, int8_t buildWhichDBs, char *tb) {
  const SSysTableMeta *pTables = NULL;
  size_t               numOfTables = 0;

  if ((buildWhichDBs & BUILD_COL_FOR_INFO_DB) != 0) {
    getInfosDbMeta(&pTables, &numOfTables);
    p->info.rows = buildDbColsInfoBlock(p, pTables, numOfTables, TSDB_INFORMATION_SCHEMA_DB, tb);
  }

  if ((buildWhichDBs & BUILD_COL_FOR_PERF_DB) != 0) {
    getPerfDbMeta(&pTables, &numOfTables);
    p->info.rows = buildDbColsInfoBlock(p, pTables, numOfTables, TSDB_PERFORMANCE_SCHEMA_DB, tb);
  }

  return p->info.rows;
}
3754

3755
static int8_t determineBuildColForWhichDBs(const char *db) {
6,489✔
3756
  int8_t buildWhichDBs;
3757
  if (!db[0])
6,489✔
3758
    buildWhichDBs = BUILD_COL_FOR_ALL_DB;
6,452✔
3759
  else {
3760
    char *p = strchr(db, '.');
37✔
3761
    if (p && strcmp(p + 1, TSDB_INFORMATION_SCHEMA_DB) == 0) {
37!
3762
      buildWhichDBs = BUILD_COL_FOR_INFO_DB;
1✔
3763
    } else if (p && strcmp(p + 1, TSDB_PERFORMANCE_SCHEMA_DB) == 0) {
36!
3764
      buildWhichDBs = BUILD_COL_FOR_PERF_DB;
1✔
3765
    } else {
3766
      buildWhichDBs = BUILD_COL_FOR_USER_DB;
35✔
3767
    }
3768
  }
3769
  return buildWhichDBs;
6,489✔
3770
}
3771

3772
// Retrieve callback that emits one row per column of the tables visible to `pShow`.
//
// On the first call for a given pShow (sysDbRsp not yet set) the system-database
// columns are produced through buildSysDbColsInfo. When user databases are
// selected, super tables are then iterated from the SDB and, for each column,
// a row is written with: table name, db name, "SUPER_TABLE", column name,
// column type string, column bytes, and NULL for every remaining output column.
//
// A super table whose columns would not fit in the remaining `rows` budget is
// not emitted; pShow->restore is set so the next call re-acquires that same
// table from pShow->pIter instead of fetching the next one.
//
// Returns the number of rows written in this call (also added to
// pShow->numOfRows), or terrno when the requested db cannot be acquired for a
// reason other than "not exist" and the block is still empty.
static int32_t mndRetrieveStbCol(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  int8_t   buildWhichDBs;  // same type as determineBuildColForWhichDBs() returns
  SMnode  *pMnode = pReq->info.node;
  SSdb    *pSdb = pMnode->pSdb;
  SStbObj *pStb = NULL;
  SDbObj  *pDb = NULL;  // function scope so every exit through _OVER can release it
  int32_t  numOfRows = 0;
  int32_t  lino = 0;
  int32_t  code = 0;

  buildWhichDBs = determineBuildColForWhichDBs(pShow->db);

  if (!pShow->sysDbRsp) {
    numOfRows = buildSysDbColsInfo(pBlock, buildWhichDBs, pShow->filterTb);
    mDebug("mndRetrieveStbCol get system table cols, rows:%d, db:%s", numOfRows, pShow->db);
    pShow->sysDbRsp = true;
  }

  if (buildWhichDBs & BUILD_COL_FOR_USER_DB) {
    if (strlen(pShow->db) > 0) {
      pDb = mndAcquireDb(pMnode, pShow->db);
      // pDb is NULL here, so returning directly does not leak a db reference.
      if (pDb == NULL && TSDB_CODE_MND_DB_NOT_EXIST != terrno && pBlock->info.rows == 0) return terrno;
    }

    char typeName[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
    STR_TO_VARSTR(typeName, "SUPER_TABLE");
    bool fetch = pShow->restore ? false : true;
    pShow->restore = false;
    while (numOfRows < rows) {
      if (fetch) {
        pShow->pIter = sdbFetch(pSdb, SDB_STB, pShow->pIter, (void **)&pStb);
        if (pShow->pIter == NULL) break;
      } else {
        // Resume with the table that did not fit in the previous call.
        fetch = true;
        void *pKey = taosHashGetKey(pShow->pIter, NULL);
        pStb = sdbAcquire(pSdb, SDB_STB, pKey);
        if (!pStb) continue;
      }

      // Skip super tables that belong to a different database than the requested one.
      if (pDb != NULL && pStb->dbUid != pDb->uid) {
        sdbRelease(pSdb, pStb);
        continue;
      }

      SName name = {0};
      char  stbName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
      mndExtractTbNameFromStbFullName(pStb->name, &stbName[VARSTR_HEADER_SIZE], TSDB_TABLE_NAME_LEN);
      if (pShow->filterTb[0] && strncmp(pShow->filterTb, &stbName[VARSTR_HEADER_SIZE], TSDB_TABLE_NAME_LEN) != 0) {
        sdbRelease(pSdb, pStb);
        continue;
      }

      // Never emit a partial table: stop and remember to restore this one next time.
      if ((numOfRows + pStb->numOfColumns) > rows) {
        pShow->restore = true;
        if (numOfRows == 0) {
          mError("mndRetrieveStbCol failed to get stable cols since buf:%d less than result:%d, stable name:%s, db:%s",
                 rows, pStb->numOfColumns, pStb->name, pStb->db);
        }
        sdbRelease(pSdb, pStb);
        break;
      }

      varDataSetLen(stbName, strlen(&stbName[VARSTR_HEADER_SIZE]));

      mDebug("mndRetrieveStbCol get stable cols, stable name:%s, db:%s", pStb->name, pStb->db);

      // NOTE(review): RETRIEVE_CHECK_GOTO is defined outside this view; it is passed
      // pStb and &lino and is presumed to set `code`, release pStb and jump to the
      // label on failure - confirm against the macro definition.
      char db[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
      RETRIEVE_CHECK_GOTO(tNameFromString(&name, pStb->db, T_NAME_ACCT | T_NAME_DB), pStb, &lino, _OVER);
      RETRIEVE_CHECK_GOTO(tNameGetDbName(&name, varDataVal(db)), pStb, &lino, _OVER);
      varDataSetLen(db, strlen(varDataVal(db)));

      for (int i = 0; i < pStb->numOfColumns; i++) {
        int32_t          cols = 0;
        SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
        RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)stbName, false), pStb, &lino, _OVER);

        pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
        RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)db, false), pStb, &lino, _OVER);

        pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
        RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, typeName, false), pStb, &lino, _OVER);

        // col name
        char colName[TSDB_COL_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
        STR_TO_VARSTR(colName, pStb->pColumns[i].name);
        pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
        RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, colName, false), pStb, &lino, _OVER);

        // col type: type name, followed by "(length)" for VARCHAR and NCHAR
        int8_t colType = pStb->pColumns[i].type;
        pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
        char colTypeStr[VARSTR_HEADER_SIZE + 32];
        int  colTypeLen = sprintf(varDataVal(colTypeStr), "%s", tDataTypes[colType].name);
        if (colType == TSDB_DATA_TYPE_VARCHAR) {
          colTypeLen += sprintf(varDataVal(colTypeStr) + colTypeLen, "(%d)",
                                (int32_t)(pStb->pColumns[i].bytes - VARSTR_HEADER_SIZE));
        } else if (colType == TSDB_DATA_TYPE_NCHAR) {
          colTypeLen += sprintf(varDataVal(colTypeStr) + colTypeLen, "(%d)",
                                (int32_t)((pStb->pColumns[i].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE));
        }
        varDataSetLen(colTypeStr, colTypeLen);
        RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (char *)colTypeStr, false), pStb, &lino, _OVER);

        pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
        RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pStb->pColumns[i].bytes, false), pStb,
                            &lino, _OVER);

        // Remaining output columns carry no value for super-table columns.
        while (cols < pShow->numOfColumns) {
          pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
          colDataSetNULL(pColInfo, numOfRows);
        }
        numOfRows++;
      }

      sdbRelease(pSdb, pStb);
    }
  }

_OVER:
  // Single release point: reached both on normal completion and from the
  // RETRIEVE_CHECK_GOTO failure jumps, so the db reference is never leaked.
  if (pDb != NULL) {
    mndReleaseDb(pMnode, pDb);
  }

  if (code != 0) {
    mError("failed to mndRetrieveStbCol, rows:%d, pShow->numOfRows:%d, at %s:%d since %s", numOfRows, pShow->numOfRows,
           __FUNCTION__, lino, tstrerror(code));
  } else {
    mDebug("mndRetrieveStbCol success, rows:%d, pShow->numOfRows:%d", numOfRows, pShow->numOfRows);
  }

  pShow->numOfRows += numOfRows;
  return numOfRows;
}
3904

3905
// Cancels an in-progress SDB_STB fetch identified by `pIter` on the mnode's SDB.
static void mndCancelGetNextStb(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_STB);
}
1✔
3909

3910
const char *mndGetStbStr(const char *src) {
151✔
3911
  char *posDb = strstr(src, TS_PATH_DELIMITER);
151✔
3912
  if (posDb != NULL) ++posDb;
151✔
3913
  if (posDb == NULL) return src;
151✔
3914

3915
  char *posStb = strstr(posDb, TS_PATH_DELIMITER);
150✔
3916
  if (posStb != NULL) ++posStb;
150!
3917
  if (posStb == NULL) return posDb;
150!
3918
  return posStb;
150✔
3919
}
3920

3921
// Placeholder validation for a create-tag-index request: no checks are
// implemented, so every request is reported as valid.
static int32_t mndCheckIndexReq(SCreateTagIndexReq *pReq) {
  (void)pReq;  // not inspected yet
  return TSDB_CODE_SUCCESS;
}
3925

3926
/*int32_t mndAddIndexImpl(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *pStb, bool needRsp, void *sql,
3927
                        int32_t len) {
3928
  // impl later
3929
  int32_t code = 0;
3930
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, "create-stb-index");
3931
  if (pTrans == NULL) goto _OVER;
3932

3933
  mInfo("trans:%d, used to add index to stb:%s", pTrans->id, pStb->name);
3934
  mndTransSetDbName(pTrans, pDb->name, pStb->name);
3935
  if (mndTransCheckConflict(pMnode, pTrans) != 0) goto _OVER;
3936

3937
  if (mndSetAlterStbPrepareLogs(pMnode, pTrans, pDb, pStb) != 0) goto _OVER;
3938
  if (mndSetAlterStbCommitLogs(pMnode, pTrans, pDb, pStb) != 0) goto _OVER;
3939
  if (mndSetAlterStbRedoActions2(pMnode, pTrans, pDb, pStb, sql, len) != 0) goto _OVER;
3940
  if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
3941

3942
  return code;
3943

3944
_OVER:
3945
  mndTransDrop(pTrans);
3946
  return code;
3947
}
3948
static int32_t mndAddIndex(SMnode *pMnode, SRpcMsg *pReq, SCreateTagIndexReq *tagIdxReq, SDbObj *pDb, SStbObj *pOld) {
3949
  bool    needRsp = true;
3950
  int32_t code = -1;
3951
  SField *pField0 = NULL;
3952

3953
  SStbObj  stbObj = {0};
3954
  SStbObj *pNew = &stbObj;
3955

3956
  taosRLockLatch(&pOld->lock);
3957
  memcpy(&stbObj, pOld, sizeof(SStbObj));
3958
  taosRUnLockLatch(&pOld->lock);
3959

3960
  stbObj.pColumns = NULL;
3961
  stbObj.pTags = NULL;
3962
  stbObj.updateTime = taosGetTimestampMs();
3963
  stbObj.lock = 0;
3964

3965
  int32_t tag = mndFindSuperTableTagIndex(pOld, tagIdxReq->colName);
3966
  if (tag < 0) {
3967
    terrno = TSDB_CODE_MND_TAG_NOT_EXIST;
3968
    return -1;
3969
  }
3970
  col_id_t colId = pOld->pTags[tag].colId;
3971
  if (mndCheckColAndTagModifiable(pMnode, pOld->name, pOld->uid, colId) != 0) {
3972
    return -1;
3973
  }
3974
  if (mndAllocStbSchemas(pOld, pNew) != 0) {
3975
    return -1;
3976
  }
3977

3978
  SSchema *pTag = pNew->pTags + tag;
3979
  if (IS_IDX_ON(pTag)) {
3980
    terrno = TSDB_CODE_MND_TAG_INDEX_ALREADY_EXIST;
3981
    return -1;
3982
  } else {
3983
    pTag->flags |= COL_IDX_ON;
3984
  }
3985
  pNew->tagVer++;
3986

3987
  code = mndAddIndexImpl(pMnode, pReq, pDb, pNew, needRsp, pReq->pCont, pReq->contLen);
3988

3989
  return code;
3990
}
3991
static int32_t mndProcessCreateIndexReq(SRpcMsg *pReq) {
3992
  SMnode            *pMnode = pReq->info.node;
3993
  int32_t            code = -1;
3994
  SDbObj            *pDb = NULL;
3995
  SStbObj           *pStb = NULL;
3996
  SCreateTagIndexReq tagIdxReq = {0};
3997

3998
  if (tDeserializeSCreateTagIdxReq(pReq->pCont, pReq->contLen, &tagIdxReq) != 0) {
3999
    terrno = TSDB_CODE_INVALID_MSG;
4000
    goto _OVER;
4001
  }
4002

4003
  mInfo("stb:%s, start to alter", tagIdxReq.stbName);
4004

4005
  if (mndCheckIndexReq(&tagIdxReq) != TSDB_CODE_SUCCESS) {
4006
    goto _OVER;
4007
  }
4008

4009
  pDb = mndAcquireDbByStb(pMnode, tagIdxReq.dbFName);
4010
  if (pDb == NULL) {
4011
    terrno = TSDB_CODE_MND_DB_NOT_EXIST;
4012
    goto _OVER;
4013
  }
4014

4015
  pStb = mndAcquireStb(pMnode, tagIdxReq.stbName);
4016
  if (pStb == NULL) {
4017
    terrno = TSDB_CODE_MND_STB_NOT_EXIST;
4018
    goto _OVER;
4019
  }
4020
  if (mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb) != 0) {
4021
    goto _OVER;
4022
  }
4023

4024
  code = mndAddIndex(pMnode, pReq, &tagIdxReq, pDb, pStb);
4025
  if (terrno == TSDB_CODE_MND_TAG_INDEX_ALREADY_EXIST || terrno == TSDB_CODE_MND_TAG_NOT_EXIST) {
4026
    return terrno;
4027
  } else {
4028
    if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
4029
  }
4030
_OVER:
4031
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
4032
    mError("stb:%s, failed to create index since %s", tagIdxReq.stbName, terrstr());
4033
  }
4034
  mndReleaseStb(pMnode, pStb);
4035
  mndReleaseDb(pMnode, pDb);
4036
  return code;
4037
}
4038
static int32_t mndProcessDropIndexReq(SRpcMsg *pReq) {
4039
  SMnode          *pMnode = pReq->info.node;
4040
  int32_t          code = -1;
4041
  SDbObj          *pDb = NULL;
4042
  SStbObj         *pStb = NULL;
4043
  SDropTagIndexReq dropReq = {0};
4044
  if (tDeserializeSDropTagIdxReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
4045
    terrno = TSDB_CODE_INVALID_MSG;
4046
    goto _OVER;
4047
  }
4048
  //
4049
  return TSDB_CODE_SUCCESS;
4050
_OVER:
4051
  return code;
4052
}*/
4053

4054
// Wrapper around mndProcessDropStbReq. When that call fails with anything
// other than TSDB_CODE_ACTION_IN_PROGRESS, a 1-byte response buffer is attached
// to the request, noResp is cleared and the failure code is stored in
// pReq->code. The code from mndProcessDropStbReq is always returned.
static int32_t mndProcessDropStbReqFromMNode(SRpcMsg *pReq) {
  const int32_t code = mndProcessDropStbReq(pReq);
  if (code == 0 || code == TSDB_CODE_ACTION_IN_PROGRESS) {
    return code;
  }

  pReq->info.rsp = rpcMallocCont(1);
  pReq->info.rspLen = 1;
  pReq->info.noResp = false;
  pReq->code = code;
  return code;
}
4064

4065
typedef struct SVDropTbVgReqs {
4066
  SArray     *pBatchReqs;
4067
  SVgroupInfo info;
4068
} SVDropTbVgReqs;
4069

4070
typedef struct SMDropTbDbInfo {
4071
  SArray *dbVgInfos;
4072
  int32_t hashPrefix;
4073
  int32_t hashSuffix;
4074
  int32_t hashMethod;
4075
} SMDropTbDbInfo;
4076

4077
typedef struct SMDropTbTsmaInfo {
4078
  char           tsmaResTbDbFName[TSDB_DB_FNAME_LEN];
4079
  char           tsmaResTbNamePrefix[TSDB_TABLE_FNAME_LEN];
4080
  int32_t        suid;
4081
  SMDropTbDbInfo dbInfo;  // reference to DbInfo in pDbMap
4082
} SMDropTbTsmaInfo;
4083

4084
typedef struct SMDropTbTsmaInfos {
4085
  SArray *pTsmaInfos;  // SMDropTbTsmaInfo
4086
} SMDropTbTsmaInfos;
4087

4088
typedef struct SMndDropTbsWithTsmaCtx {
4089
  SHashObj *pVgMap; // <vgId, SVDropTbVgReqs>
4090
} SMndDropTbsWithTsmaCtx;
4091

4092
static int32_t mndDropTbForSingleVg(SMnode *pMnode, SMndDropTbsWithTsmaCtx *pCtx, SArray *pTbs,
4093
                                                 int32_t vgId);
4094

4095
static void destroySVDropTbBatchReqs(void *p);
4096
// Frees a drop-tables context: destroys the batch-request array of every
// SVDropTbVgReqs stored in pVgMap, cleans up the map, then frees the context.
// A NULL `p` is ignored.
static void mndDestroyDropTbsWithTsmaCtx(SMndDropTbsWithTsmaCtx *p) {
  if (p == NULL) {
    return;
  }

  if (p->pVgMap != NULL) {
    for (void *pIter = taosHashIterate(p->pVgMap, NULL); pIter != NULL; pIter = taosHashIterate(p->pVgMap, pIter)) {
      SVDropTbVgReqs *pVgReqs = pIter;
      taosArrayDestroyEx(pVgReqs->pBatchReqs, destroySVDropTbBatchReqs);
    }
    taosHashCleanup(p->pVgMap);
  }

  taosMemoryFree(p);
}
4110

4111
// Allocates a zeroed drop-tables context with an empty vgId-keyed hash map.
// On success stores it in *ppCtx and returns 0. On failure returns terrno and
// leaves *ppCtx untouched.
static int32_t mndInitDropTbsWithTsmaCtx(SMndDropTbsWithTsmaCtx **ppCtx) {
  SMndDropTbsWithTsmaCtx *pNew = taosMemoryCalloc(1, sizeof(SMndDropTbsWithTsmaCtx));
  if (pNew == NULL) {
    return terrno;
  }

  pNew->pVgMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  if (pNew->pVgMap == NULL) {
    const int32_t code = terrno;
    if (code != 0) {
      mndDestroyDropTbsWithTsmaCtx(pNew);
    }
    return code;
  }

  *ppCtx = pNew;
  return 0;
}
4127

4128
// Serializes `pReq` into a newly allocated message: an SMsgHead (contLen and
// vgId stored via htonl) followed by the encoded SVDropTbBatchReq.
//
// On success returns the buffer and writes its total length to *len; the
// caller owns the buffer. Returns NULL when sizing, allocation or encoding
// fails; *len is not written in that case. terrno is set to
// TSDB_CODE_OUT_OF_MEMORY on allocation failure.
static void *mndBuildVDropTbsReq(SMnode *pMnode, const SVgroupInfo *pVgInfo, const SVDropTbBatchReq *pReq,
                                 int32_t *len) {
  int32_t   contLen = 0;
  int32_t   ret = 0;
  SMsgHead *pHead = NULL;
  SEncoder  encoder = {0};

  tEncodeSize(tEncodeSVDropTbBatchReq, pReq, contLen, ret);
  if (ret < 0) return NULL;

  contLen += sizeof(SMsgHead);
  pHead = taosMemoryMalloc(contLen);
  if (pHead == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pHead->contLen = htonl(contLen);
  pHead->vgId = htonl(pVgInfo->vgId);

  void *pBuf = POINTER_SHIFT(pHead, sizeof(SMsgHead));

  tEncoderInit(&encoder, pBuf, contLen - sizeof(SMsgHead));
  int32_t code = tEncodeSVDropTbBatchReq(&encoder, pReq);
  tEncoderClear(&encoder);
  if (code != 0) {
    // The buffer is not handed to the caller on failure, so release it here.
    taosMemoryFree(pHead);
    return NULL;
  }

  *len = contLen;
  return pHead;
}
4158

4159
// Appends one redo action to `pTrans` that sends `pCont` (contLen bytes, type
// `msgType`) to the endpoint set of pVgReqs->info. TSDB_CODE_TDB_TABLE_NOT_EXIST
// is registered as an acceptable result code.
// Returns the result of mndTransAppendRedoAction.
static int32_t mndSetDropTbsRedoActions(SMnode *pMnode, STrans *pTrans, const SVDropTbVgReqs *pVgReqs, void *pCont,
                                        int32_t contLen, tmsg_t msgType) {
  STransAction action = {
      .epSet = pVgReqs->info.epSet,
      .pCont = pCont,
      .contLen = contLen,
      .msgType = msgType,
      .acceptableCode = TSDB_CODE_TDB_TABLE_NOT_EXIST,
  };
  return mndTransAppendRedoAction(pTrans, &action);
}
4169

4170
// For every vgroup entry in `pVgMap`, encodes each of its batch requests and
// appends it to `pTrans` as a redo action of type `msgType`.
//
// Stops at the first failure, cancels the hash iteration and returns the error:
// terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when terrno is 0) if a message
// could not be built, otherwise the code from mndSetDropTbsRedoActions.
// Returns 0 when all actions were appended.
static int32_t mndBuildDropTbRedoActions(SMnode *pMnode, STrans *pTrans, SHashObj *pVgMap, tmsg_t msgType) {
  int32_t code = 0;
  void   *pIter = taosHashIterate(pVgMap, NULL);
  while (pIter) {
    const SVDropTbVgReqs *pVgReqs = pIter;
    int32_t               len = 0;
    for (int32_t i = 0; i < taosArrayGetSize(pVgReqs->pBatchReqs) && code == TSDB_CODE_SUCCESS; ++i) {
      SVDropTbBatchReq *pBatchReq = taosArrayGet(pVgReqs->pBatchReqs, i);
      void             *p = mndBuildVDropTbsReq(pMnode, &pVgReqs->info, pBatchReq, &len);
      if (!p) {
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
        if (terrno != 0) code = terrno;
        break;
      }
      if ((code = mndSetDropTbsRedoActions(pMnode, pTrans, pVgReqs, p, len, msgType)) != 0) {
        // The action was not appended, so the transaction never took the
        // message buffer; free it here instead of leaking it.
        // NOTE(review): assumes mndTransAppendRedoAction leaves pCont with the
        // caller on failure - confirm against its implementation.
        taosMemoryFree(p);
        break;
      }
    }
    if (TSDB_CODE_SUCCESS != code) {
      taosHashCancelIterate(pVgMap, pIter);
      break;
    }
    pIter = taosHashIterate(pVgMap, pIter);
  }
  return code;
}
4196

4197
// Creates the "drop-tbs" transaction for the requests collected in `pCtx`:
// marks it changeless and serial, checks for conflicts, appends one
// TDMT_VND_DROP_TABLE redo action per batch request and prepares it.
//
// Returns 0 on success, otherwise the first error encountered. When the
// transaction cannot be created the result is terrno, or
// TSDB_CODE_MND_RETURN_VALUE_NULL if terrno is 0.
// The local transaction handle is dropped before returning in every case.
static int32_t mndCreateDropTbsTxnPrepare(SRpcMsg *pRsp, SMndDropTbsWithTsmaCtx *pCtx) {
  int32_t code = 0;
  SMnode *pMnode = pRsp->info.node;
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pRsp, "drop-tbs");
  // Validate the handle before configuring it; the setters must not see NULL.
  if (pTrans == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }
  mndTransSetChangeless(pTrans);
  mndTransSetSerial(pTrans);

  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER);

  if ((code = mndBuildDropTbRedoActions(pMnode, pTrans, pCtx->pVgMap, TDMT_VND_DROP_TABLE)) != 0) goto _OVER;
  if ((code = mndTransPrepare(pMnode, pTrans)) != 0) goto _OVER;

_OVER:
  mndTransDrop(pTrans);
  TAOS_RETURN(code);
}
4218

UNCOV
4219
// Handles a drop-tables request: deserializes it, collects the tables of each
// per-vgroup entry into a drop context and prepares the drop transaction.
//
// Returns TSDB_CODE_ACTION_IN_PROGRESS when the transaction was prepared,
// TSDB_CODE_INVALID_MSG when the request cannot be deserialized, otherwise the
// first error reported by a helper. The decoded request and the context are
// released on every path.
static int32_t mndProcessDropTbWithTsma(SRpcMsg *pReq) {
  int32_t                 code = -1;
  SMnode                 *pMnode = pReq->info.node;
  SMDropTbsReq            dropReq = {0};
  // Declared and initialized before the first `goto _OVER`, so the cleanup
  // code never reads an indeterminate pointer.
  SMndDropTbsWithTsmaCtx *pCtx = NULL;

  if (tDeserializeSMDropTbsReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
    code = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }

  code = mndInitDropTbsWithTsmaCtx(&pCtx);
  if (code) goto _OVER;
  for (int32_t i = 0; i < dropReq.pVgReqs->size; ++i) {
    SMDropTbReqsOnSingleVg *pVgReq = taosArrayGet(dropReq.pVgReqs, i);
    code = mndDropTbForSingleVg(pMnode, pCtx, pVgReq->pTbs, pVgReq->vgInfo.vgId);
    if (code) goto _OVER;
  }
  code = mndCreateDropTbsTxnPrepare(pReq, pCtx);
  if (code == 0) {
    code = TSDB_CODE_ACTION_IN_PROGRESS;
  }
_OVER:
  tFreeSMDropTbsReq(&dropReq);
  if (pCtx) mndDestroyDropTbsWithTsmaCtx(pCtx);
  TAOS_RETURN(code);
}
4248

4249
// Initializes `pBatchReq` as a batch holding exactly one request, a copy of
// `*pReq`. nReqs is set to 1 unconditionally. On failure pArray is left NULL
// and terrno is returned; on success TSDB_CODE_SUCCESS is returned.
static int32_t createDropTbBatchReq(const SVDropTbReq *pReq, SVDropTbBatchReq *pBatchReq) {
  SArray *pReqs = taosArrayInit(TARRAY_MIN_SIZE, sizeof(SVDropTbReq));
  pBatchReq->nReqs = 1;
  pBatchReq->pArray = pReqs;
  if (pReqs == NULL) {
    return terrno;
  }

  if (taosArrayPush(pReqs, pReq) != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  // Push failed: drop the empty array so the batch does not keep a dangling pointer.
  taosArrayDestroy(pReqs);
  pBatchReq->pArray = NULL;
  return terrno;
}
4260

4261
static void destroySVDropTbBatchReqs(void *p) {
74✔
4262
  SVDropTbBatchReq *pReq = p;
74✔
4263
  taosArrayDestroy(pReq->pArray);
74✔
4264
  pReq->pArray = NULL;
74✔
4265
}
74✔
4266

4267
// Queues a drop request for table `name` under the vgroup `pVgInfo->vgId` in
// `pVgHashMap`. Each call adds a new single-request batch; the vgroup entry is
// created on first use. The request stores the `name` pointer itself, not a
// copy of the string.
//
// Returns 0 on success. On failure returns the error from
// createDropTbBatchReq or terrno, and frees everything allocated by this call.
static int32_t mndDropTbAdd(SMnode *pMnode, SHashObj *pVgHashMap, const SVgroupInfo *pVgInfo, char *name, tb_uid_t suid,
                            bool ignoreNotExists) {
  SVDropTbReq req = {.name = name, .suid = suid, .igNotExists = ignoreNotExists, .uid = 0};

  SVDropTbVgReqs *pVgReqs = taosHashGet(pVgHashMap, &pVgInfo->vgId, sizeof(pVgInfo->vgId));
  SVDropTbVgReqs  vgReqs = {0};
  if (pVgReqs == NULL) {
    // First table for this vgroup: build a fresh entry and insert it.
    vgReqs.info = *pVgInfo;
    vgReqs.pBatchReqs = taosArrayInit(TARRAY_MIN_SIZE, sizeof(SVDropTbBatchReq));
    if (!vgReqs.pBatchReqs) return terrno;
    SVDropTbBatchReq batchReq = {0};
    int32_t          code = createDropTbBatchReq(&req, &batchReq);
    if (TSDB_CODE_SUCCESS != code) {
      // The entry is not in the map yet, so its array must be freed here.
      taosArrayDestroy(vgReqs.pBatchReqs);
      return code;
    }
    if (taosArrayPush(vgReqs.pBatchReqs, &batchReq) == NULL) {
      code = terrno;  // capture before cleanup
      taosArrayDestroy(batchReq.pArray);
      taosArrayDestroy(vgReqs.pBatchReqs);
      return code;
    }
    if (taosHashPut(pVgHashMap, &pVgInfo->vgId, sizeof(pVgInfo->vgId), &vgReqs, sizeof(vgReqs)) != 0) {
      code = terrno;  // capture before cleanup
      taosArrayDestroyEx(vgReqs.pBatchReqs, destroySVDropTbBatchReqs);
      return code;
    }
  } else {
    // Existing vgroup entry: append one more batch to it.
    SVDropTbBatchReq batchReq = {0};
    int32_t          code = createDropTbBatchReq(&req, &batchReq);
    if (TSDB_CODE_SUCCESS != code) return code;
    if (taosArrayPush(pVgReqs->pBatchReqs, &batchReq) == NULL) {
      code = terrno;  // capture before cleanup
      taosArrayDestroy(batchReq.pArray);
      return code;
    }
  }
  return 0;
}
4299

4300
// Adds every SVDropTbReq in `pTbs` to the context's vgroup map under vgroup
// `vgId`. The vgroup's hash range, table count, id and endpoint set are
// captured first, and the vgroup object is released before the tables are queued.
// Returns 0 when the vgroup cannot be acquired (nothing is queued) or when all
// tables were added; otherwise the error from mndDropTbAdd.
static int32_t mndDropTbForSingleVg(SMnode *pMnode, SMndDropTbsWithTsmaCtx *pCtx, SArray *pTbs, int32_t vgId) {
  int32_t code = 0;

  SVgObj *pVgroup = mndAcquireVgroup(pMnode, vgId);
  if (pVgroup == NULL) {
    return 0;
  }

  SVgroupInfo vgInfo = {0};
  vgInfo.hashBegin = pVgroup->hashBegin;
  vgInfo.hashEnd = pVgroup->hashEnd;
  vgInfo.numOfTable = pVgroup->numOfTables;
  vgInfo.vgId = pVgroup->vgId;
  vgInfo.epSet = mndGetVgroupEpset(pMnode, pVgroup);
  mndReleaseVgroup(pMnode, pVgroup);

  for (int32_t idx = 0; idx < pTbs->size; ++idx) {
    SVDropTbReq *pDropReq = taosArrayGet(pTbs, idx);
    TAOS_CHECK_GOTO(mndDropTbAdd(pMnode, pCtx->pVgMap, &vgInfo, pDropReq->name, pDropReq->suid, pDropReq->igNotExists),
                    NULL, _end);
  }

_end:
  return code;
}
4323

4324
// Handles the response to a fetch-TTL-expired-tables request.
//   - a failed response: its code is returned as is
//   - an empty response body: returns 0
//   - otherwise: decodes the expired tables, queues them for rsp.vgId and
//     prepares the drop transaction; returns TSDB_CODE_ACTION_IN_PROGRESS when
//     that succeeds, else the first error.
// The context, decoder and decoded response are released on every path.
static int32_t mndProcessFetchTtlExpiredTbs(SRpcMsg *pRsp) {
  SMnode                 *pMnode = pRsp->info.node;
  SMndDropTbsWithTsmaCtx *pDropCtx = NULL;
  SVFetchTtlExpiredTbsRsp ttlRsp = {0};
  SDecoder                decoder = {0};
  int32_t                 code = -1;

  if (pRsp->code != TSDB_CODE_SUCCESS) {
    code = pRsp->code;
  } else if (pRsp->contLen == 0) {
    code = 0;
  } else {
    tDecoderInit(&decoder, pRsp->pCont, pRsp->contLen);
    code = tDecodeVFetchTtlExpiredTbsRsp(&decoder, &ttlRsp);

    // Each step runs only if every previous one succeeded.
    if (code == 0) {
      code = mndInitDropTbsWithTsmaCtx(&pDropCtx);
    }
    if (code == 0) {
      code = mndDropTbForSingleVg(pMnode, pDropCtx, ttlRsp.pExpiredTbs, ttlRsp.vgId);
    }
    if (code == 0) {
      code = mndCreateDropTbsTxnPrepare(pRsp, pDropCtx);
      if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
    }
  }

  if (pDropCtx) mndDestroyDropTbsWithTsmaCtx(pDropCtx);
  tDecoderClear(&decoder);
  tFreeFetchTtlExpiredTbsRsp(&ttlRsp);
  TAOS_RETURN(code);
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc