• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3548

04 Dec 2024 01:03PM UTC coverage: 59.846% (-0.8%) from 60.691%
#3548

push

travis-ci

web-flow
Merge pull request #29033 from taosdata/fix/calculate-vnode-memory-used

fix/calculate-vnode-memory-used

118484 of 254183 branches covered (46.61%)

Branch coverage included in aggregate %.

199691 of 277471 relevant lines covered (71.97%)

18794141.86 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

68.05
/source/dnode/mnode/impl/src/mndStb.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mndStb.h"
18
#include "audit.h"
19
#include "mndDb.h"
20
#include "mndDnode.h"
21
#include "mndIndex.h"
22
#include "mndIndexComm.h"
23
#include "mndInfoSchema.h"
24
#include "mndMnode.h"
25
#include "mndPerfSchema.h"
26
#include "mndPrivilege.h"
27
#include "mndScheduler.h"
28
#include "mndShow.h"
29
#include "mndSma.h"
30
#include "mndTopic.h"
31
#include "mndTrans.h"
32
#include "mndUser.h"
33
#include "mndVgroup.h"
34
#include "tname.h"
35

36
#define STB_VER_NUMBER   2
37
#define STB_RESERVE_SIZE 64
38

39
static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw);
40
static int32_t  mndStbActionInsert(SSdb *pSdb, SStbObj *pStb);
41
static int32_t  mndStbActionDelete(SSdb *pSdb, SStbObj *pStb);
42
static int32_t  mndStbActionUpdate(SSdb *pSdb, SStbObj *pOld, SStbObj *pNew);
43
static int32_t  mndProcessTtlTimer(SRpcMsg *pReq);
44
static int32_t  mndProcessTrimDbTimer(SRpcMsg *pReq);
45
static int32_t  mndProcessS3MigrateDbTimer(SRpcMsg *pReq);
46
static int32_t  mndProcessS3MigrateDbRsp(SRpcMsg *pReq);
47
static int32_t  mndProcessCreateStbReq(SRpcMsg *pReq);
48
static int32_t  mndProcessAlterStbReq(SRpcMsg *pReq);
49
static int32_t  mndProcessDropStbReq(SRpcMsg *pReq);
50
static int32_t  mndProcessDropTtltbRsp(SRpcMsg *pReq);
51
static int32_t  mndProcessTrimDbRsp(SRpcMsg *pReq);
52
static int32_t  mndProcessTableMetaReq(SRpcMsg *pReq);
53
static int32_t  mndRetrieveStb(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
54
static int32_t  mndRetrieveStbCol(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
55
static void     mndCancelGetNextStb(SMnode *pMnode, void *pIter);
56
static int32_t  mndProcessTableCfgReq(SRpcMsg *pReq);
57
static int32_t  mndAlterStbImp(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *pStb, bool needRsp,
58
                               void *alterOriData, int32_t alterOriDataLen);
59
static int32_t  mndAlterStbAndUpdateTagIdxImp(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *pStb, bool needRsp,
60
                                              void *alterOriData, int32_t alterOriDataLen, const SMAlterStbReq *pAlter);
61

62
static int32_t mndProcessCreateIndexReq(SRpcMsg *pReq);
63
static int32_t mndProcessDropIndexReq(SRpcMsg *pReq);
64

65
static int32_t mndProcessDropStbReqFromMNode(SRpcMsg *pReq);
66
static int32_t mndProcessDropTbWithTsma(SRpcMsg *pReq);
67
static int32_t mndProcessFetchTtlExpiredTbs(SRpcMsg *pReq);
68

69
int32_t mndInitStb(SMnode *pMnode) {
1,701✔
70
  SSdbTable table = {
1,701✔
71
      .sdbType = SDB_STB,
72
      .keyType = SDB_KEY_BINARY,
73
      .encodeFp = (SdbEncodeFp)mndStbActionEncode,
74
      .decodeFp = (SdbDecodeFp)mndStbActionDecode,
75
      .insertFp = (SdbInsertFp)mndStbActionInsert,
76
      .updateFp = (SdbUpdateFp)mndStbActionUpdate,
77
      .deleteFp = (SdbDeleteFp)mndStbActionDelete,
78
  };
79

80
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STB, mndProcessCreateStbReq);
1,701✔
81
  mndSetMsgHandle(pMnode, TDMT_MND_ALTER_STB, mndProcessAlterStbReq);
1,701✔
82
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_STB, mndProcessDropStbReq);
1,701✔
83
  mndSetMsgHandle(pMnode, TDMT_VND_CREATE_STB_RSP, mndTransProcessRsp);
1,701✔
84
  mndSetMsgHandle(pMnode, TDMT_VND_DROP_TTL_TABLE_RSP, mndProcessDropTtltbRsp);
1,701✔
85
  mndSetMsgHandle(pMnode, TDMT_VND_TRIM_RSP, mndProcessTrimDbRsp);
1,701✔
86
  mndSetMsgHandle(pMnode, TDMT_VND_ALTER_STB_RSP, mndTransProcessRsp);
1,701✔
87
  mndSetMsgHandle(pMnode, TDMT_VND_DROP_STB_RSP, mndTransProcessRsp);
1,701✔
88
  mndSetMsgHandle(pMnode, TDMT_MND_TABLE_META, mndProcessTableMetaReq);
1,701✔
89
  mndSetMsgHandle(pMnode, TDMT_MND_TTL_TIMER, mndProcessTtlTimer);
1,701✔
90
  mndSetMsgHandle(pMnode, TDMT_MND_TRIM_DB_TIMER, mndProcessTrimDbTimer);
1,701✔
91
  mndSetMsgHandle(pMnode, TDMT_VND_S3MIGRATE_RSP, mndProcessS3MigrateDbRsp);
1,701✔
92
  mndSetMsgHandle(pMnode, TDMT_MND_S3MIGRATE_DB_TIMER, mndProcessS3MigrateDbTimer);
1,701✔
93
  mndSetMsgHandle(pMnode, TDMT_MND_TABLE_CFG, mndProcessTableCfgReq);
1,701✔
94
  mndSetMsgHandle(pMnode, TDMT_MND_STB_DROP, mndProcessDropStbReqFromMNode);
1,701✔
95
  mndSetMsgHandle(pMnode, TDMT_MND_STB_DROP_RSP, mndTransProcessRsp);
1,701✔
96
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_TB_WITH_TSMA, mndProcessDropTbWithTsma);
1,701✔
97
  mndSetMsgHandle(pMnode, TDMT_VND_FETCH_TTL_EXPIRED_TBS_RSP, mndProcessFetchTtlExpiredTbs);
1,701✔
98
  mndSetMsgHandle(pMnode, TDMT_VND_DROP_TABLE_RSP, mndTransProcessRsp);
1,701✔
99
  //  mndSetMsgHandle(pMnode, TDMT_MND_SYSTABLE_RETRIEVE, mndProcessRetrieveStbReq);
100

101
  // mndSetMsgHandle(pMnode, TDMT_MND_CREATE_INDEX, mndProcessCreateIndexReq);
102
  // mndSetMsgHandle(pMnode, TDMT_MND_DROP_INDEX, mndProcessDropIndexReq);
103
  // mndSetMsgHandle(pMnode, TDMT_VND_CREATE_INDEX_RSP, mndTransProcessRsp);
104
  // mndSetMsgHandle(pMnode, TDMT_VND_DROP_INDEX_RSP, mndTransProcessRsp);
105

106
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STB, mndRetrieveStb);
1,701✔
107
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STB, mndCancelGetNextStb);
1,701✔
108

109
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_COL, mndRetrieveStbCol);
1,701✔
110
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_COL, mndCancelGetNextStb);
1,701✔
111

112
  return sdbSetTable(pMnode->pSdb, table);
1,701✔
113
}
114

115
void mndCleanupStb(SMnode *pMnode) {}
1,701✔
116

117
SSdbRaw *mndStbActionEncode(SStbObj *pStb) {
30,561✔
118
  int32_t code = 0;
30,561✔
119
  int32_t lino = 0;
30,561✔
120
  terrno = TSDB_CODE_OUT_OF_MEMORY;
30,561✔
121

122
  int32_t size = sizeof(SStbObj) + (pStb->numOfColumns + pStb->numOfTags) * sizeof(SSchema) + pStb->commentLen +
61,122✔
123
                 pStb->ast1Len + pStb->ast2Len + pStb->numOfColumns * sizeof(SColCmpr) + STB_RESERVE_SIZE +
61,122✔
124
                 taosArrayGetSize(pStb->pFuncs) * TSDB_FUNC_NAME_LEN;
30,561✔
125
  SSdbRaw *pRaw = sdbAllocRaw(SDB_STB, STB_VER_NUMBER, size);
30,561✔
126
  if (pRaw == NULL) goto _OVER;
30,561!
127

128
  int32_t dataPos = 0;
30,561✔
129
  SDB_SET_BINARY(pRaw, dataPos, pStb->name, TSDB_TABLE_FNAME_LEN, _OVER)
30,561!
130
  SDB_SET_BINARY(pRaw, dataPos, pStb->db, TSDB_DB_FNAME_LEN, _OVER)
30,561!
131
  SDB_SET_INT64(pRaw, dataPos, pStb->createdTime, _OVER)
30,561!
132
  SDB_SET_INT64(pRaw, dataPos, pStb->updateTime, _OVER)
30,561!
133
  SDB_SET_INT64(pRaw, dataPos, pStb->uid, _OVER)
30,561!
134
  SDB_SET_INT64(pRaw, dataPos, pStb->dbUid, _OVER)
30,561!
135
  SDB_SET_INT32(pRaw, dataPos, pStb->tagVer, _OVER)
30,561!
136
  SDB_SET_INT32(pRaw, dataPos, pStb->colVer, _OVER)
30,561!
137
  SDB_SET_INT32(pRaw, dataPos, pStb->smaVer, _OVER)
30,561!
138
  SDB_SET_INT32(pRaw, dataPos, pStb->nextColId, _OVER)
30,561!
139
  SDB_SET_INT64(pRaw, dataPos, pStb->maxdelay[0], _OVER)
30,561!
140
  SDB_SET_INT64(pRaw, dataPos, pStb->maxdelay[1], _OVER)
30,561!
141
  SDB_SET_INT64(pRaw, dataPos, pStb->watermark[0], _OVER)
30,561!
142
  SDB_SET_INT64(pRaw, dataPos, pStb->watermark[1], _OVER)
30,561!
143
  SDB_SET_INT32(pRaw, dataPos, pStb->ttl, _OVER)
30,561!
144
  SDB_SET_INT32(pRaw, dataPos, pStb->numOfColumns, _OVER)
30,561!
145
  SDB_SET_INT32(pRaw, dataPos, pStb->numOfTags, _OVER)
30,561!
146
  SDB_SET_INT32(pRaw, dataPos, pStb->numOfFuncs, _OVER)
30,561!
147
  SDB_SET_INT32(pRaw, dataPos, pStb->commentLen, _OVER)
30,561!
148
  SDB_SET_INT32(pRaw, dataPos, pStb->ast1Len, _OVER)
30,561!
149
  SDB_SET_INT32(pRaw, dataPos, pStb->ast2Len, _OVER)
30,561!
150

151
  for (int32_t i = 0; i < pStb->numOfColumns; ++i) {
679,023✔
152
    SSchema *pSchema = &pStb->pColumns[i];
648,462✔
153
    SDB_SET_INT8(pRaw, dataPos, pSchema->type, _OVER)
648,462!
154
    SDB_SET_INT8(pRaw, dataPos, pSchema->flags, _OVER)
648,462!
155
    SDB_SET_INT16(pRaw, dataPos, pSchema->colId, _OVER)
648,462!
156
    SDB_SET_INT32(pRaw, dataPos, pSchema->bytes, _OVER)
648,462!
157
    SDB_SET_BINARY(pRaw, dataPos, pSchema->name, TSDB_COL_NAME_LEN, _OVER)
648,462!
158
  }
159

160
  for (int32_t i = 0; i < pStb->numOfTags; ++i) {
265,611✔
161
    SSchema *pSchema = &pStb->pTags[i];
235,050✔
162
    SDB_SET_INT8(pRaw, dataPos, pSchema->type, _OVER)
235,050!
163
    SDB_SET_INT8(pRaw, dataPos, pSchema->flags, _OVER)
235,050!
164
    SDB_SET_INT16(pRaw, dataPos, pSchema->colId, _OVER)
235,050!
165
    SDB_SET_INT32(pRaw, dataPos, pSchema->bytes, _OVER)
235,050!
166
    SDB_SET_BINARY(pRaw, dataPos, pSchema->name, TSDB_COL_NAME_LEN, _OVER)
235,050!
167
  }
168

169
  for (int32_t i = 0; i < pStb->numOfFuncs; ++i) {
30,567✔
170
    char *func = taosArrayGet(pStb->pFuncs, i);
6✔
171
    SDB_SET_BINARY(pRaw, dataPos, func, TSDB_FUNC_NAME_LEN, _OVER)
6!
172
  }
173

174
  if (pStb->commentLen > 0) {
30,561✔
175
    SDB_SET_BINARY(pRaw, dataPos, pStb->comment, pStb->commentLen + 1, _OVER)
131!
176
  }
177

178
  if (pStb->ast1Len > 0) {
30,561✔
179
    SDB_SET_BINARY(pRaw, dataPos, pStb->pAst1, pStb->ast1Len, _OVER)
6!
180
  }
181

182
  if (pStb->ast2Len > 0) {
30,561✔
183
    SDB_SET_BINARY(pRaw, dataPos, pStb->pAst2, pStb->ast2Len, _OVER)
6!
184
  }
185

186
  if (pStb->pCmpr != NULL) {
30,561!
187
    for (int i = 0; i < pStb->numOfColumns; i++) {
679,023✔
188
      SColCmpr *p = &pStb->pCmpr[i];
648,462✔
189
      SDB_SET_INT16(pRaw, dataPos, p->id, _OVER)
648,462!
190
      SDB_SET_INT32(pRaw, dataPos, p->alg, _OVER)
648,462!
191
    }
192
  }
193
  SDB_SET_RESERVE(pRaw, dataPos, STB_RESERVE_SIZE, _OVER)
30,561!
194
  SDB_SET_DATALEN(pRaw, dataPos, _OVER)
30,561!
195

196
  terrno = 0;
30,561✔
197

198
_OVER:
30,561✔
199
  if (terrno != 0) {
30,561!
200
    mError("stb:%s, failed to encode to raw:%p since %s", pStb->name, pRaw, terrstr());
×
201
    sdbFreeRaw(pRaw);
×
202
    return NULL;
×
203
  }
204

205
  mTrace("stb:%s, encode to raw:%p, row:%p", pStb->name, pRaw, pStb);
30,561✔
206
  return pRaw;
30,561✔
207
}
208

209
static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) {
20,311✔
210
  int32_t code = 0;
20,311✔
211
  int32_t lino = 0;
20,311✔
212
  terrno = TSDB_CODE_OUT_OF_MEMORY;
20,311✔
213
  SSdbRow *pRow = NULL;
20,311✔
214
  SStbObj *pStb = NULL;
20,311✔
215

216
  int8_t sver = 0;
20,311✔
217
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;
20,311!
218

219
  if (sver > STB_VER_NUMBER) {
20,311!
220
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
×
221
    goto _OVER;
×
222
  }
223

224
  pRow = sdbAllocRow(sizeof(SStbObj));
20,311✔
225
  if (pRow == NULL) goto _OVER;
20,311!
226

227
  pStb = sdbGetRowObj(pRow);
20,311✔
228
  if (pStb == NULL) goto _OVER;
20,311!
229

230
  int32_t dataPos = 0;
20,311✔
231
  SDB_GET_BINARY(pRaw, dataPos, pStb->name, TSDB_TABLE_FNAME_LEN, _OVER)
20,311!
232
  SDB_GET_BINARY(pRaw, dataPos, pStb->db, TSDB_DB_FNAME_LEN, _OVER)
20,311!
233
  SDB_GET_INT64(pRaw, dataPos, &pStb->createdTime, _OVER)
20,311!
234
  SDB_GET_INT64(pRaw, dataPos, &pStb->updateTime, _OVER)
20,311!
235
  SDB_GET_INT64(pRaw, dataPos, &pStb->uid, _OVER)
20,311!
236
  SDB_GET_INT64(pRaw, dataPos, &pStb->dbUid, _OVER)
20,311!
237
  SDB_GET_INT32(pRaw, dataPos, &pStb->tagVer, _OVER)
20,311!
238
  SDB_GET_INT32(pRaw, dataPos, &pStb->colVer, _OVER)
20,311!
239
  SDB_GET_INT32(pRaw, dataPos, &pStb->smaVer, _OVER)
20,311!
240
  SDB_GET_INT32(pRaw, dataPos, &pStb->nextColId, _OVER)
20,311!
241
  SDB_GET_INT64(pRaw, dataPos, &pStb->maxdelay[0], _OVER)
20,311!
242
  SDB_GET_INT64(pRaw, dataPos, &pStb->maxdelay[1], _OVER)
20,311!
243
  SDB_GET_INT64(pRaw, dataPos, &pStb->watermark[0], _OVER)
20,311!
244
  SDB_GET_INT64(pRaw, dataPos, &pStb->watermark[1], _OVER)
20,311!
245
  SDB_GET_INT32(pRaw, dataPos, &pStb->ttl, _OVER)
20,311!
246
  SDB_GET_INT32(pRaw, dataPos, &pStb->numOfColumns, _OVER)
20,311!
247
  SDB_GET_INT32(pRaw, dataPos, &pStb->numOfTags, _OVER)
20,311!
248
  SDB_GET_INT32(pRaw, dataPos, &pStb->numOfFuncs, _OVER)
20,311!
249
  SDB_GET_INT32(pRaw, dataPos, &pStb->commentLen, _OVER)
20,311!
250
  SDB_GET_INT32(pRaw, dataPos, &pStb->ast1Len, _OVER)
20,311!
251
  SDB_GET_INT32(pRaw, dataPos, &pStb->ast2Len, _OVER)
20,311!
252

253
  pStb->pColumns = taosMemoryCalloc(pStb->numOfColumns, sizeof(SSchema));
20,311✔
254
  pStb->pTags = taosMemoryCalloc(pStb->numOfTags, sizeof(SSchema));
20,311✔
255
  pStb->pFuncs = taosArrayInit(pStb->numOfFuncs, TSDB_FUNC_NAME_LEN);
20,311✔
256
  if (pStb->pColumns == NULL || pStb->pTags == NULL || pStb->pFuncs == NULL) {
20,311!
257
    goto _OVER;
×
258
  }
259

260
  for (int32_t i = 0; i < pStb->numOfColumns; ++i) {
430,377✔
261
    SSchema *pSchema = &pStb->pColumns[i];
410,066✔
262
    SDB_GET_INT8(pRaw, dataPos, &pSchema->type, _OVER)
410,066!
263
    SDB_GET_INT8(pRaw, dataPos, &pSchema->flags, _OVER)
410,066!
264
    SDB_GET_INT16(pRaw, dataPos, &pSchema->colId, _OVER)
410,066!
265
    SDB_GET_INT32(pRaw, dataPos, &pSchema->bytes, _OVER)
410,066!
266
    SDB_GET_BINARY(pRaw, dataPos, pSchema->name, TSDB_COL_NAME_LEN, _OVER)
410,066!
267
  }
268

269
  for (int32_t i = 0; i < pStb->numOfTags; ++i) {
157,541✔
270
    SSchema *pSchema = &pStb->pTags[i];
137,230✔
271
    SDB_GET_INT8(pRaw, dataPos, &pSchema->type, _OVER)
137,230!
272
    SDB_GET_INT8(pRaw, dataPos, &pSchema->flags, _OVER)
137,230!
273
    SDB_GET_INT16(pRaw, dataPos, &pSchema->colId, _OVER)
137,230!
274
    SDB_GET_INT32(pRaw, dataPos, &pSchema->bytes, _OVER)
137,230!
275
    SDB_GET_BINARY(pRaw, dataPos, pSchema->name, TSDB_COL_NAME_LEN, _OVER)
137,230!
276
  }
277

278
  for (int32_t i = 0; i < pStb->numOfFuncs; ++i) {
20,321✔
279
    char funcName[TSDB_FUNC_NAME_LEN] = {0};
10✔
280
    SDB_GET_BINARY(pRaw, dataPos, funcName, TSDB_FUNC_NAME_LEN, _OVER)
10!
281
    if (taosArrayPush(pStb->pFuncs, funcName) == NULL) goto _OVER;
20!
282
  }
283

284
  if (pStb->commentLen > 0) {
20,311✔
285
    pStb->comment = taosMemoryCalloc(pStb->commentLen + 1, 1);
113✔
286
    if (pStb->comment == NULL) goto _OVER;
113!
287
    SDB_GET_BINARY(pRaw, dataPos, pStb->comment, pStb->commentLen + 1, _OVER)
113!
288
  }
289

290
  if (pStb->ast1Len > 0) {
20,311✔
291
    pStb->pAst1 = taosMemoryCalloc(pStb->ast1Len, 1);
10✔
292
    if (pStb->pAst1 == NULL) goto _OVER;
10!
293
    SDB_GET_BINARY(pRaw, dataPos, pStb->pAst1, pStb->ast1Len, _OVER)
10!
294
  }
295

296
  if (pStb->ast2Len > 0) {
20,311✔
297
    pStb->pAst2 = taosMemoryCalloc(pStb->ast2Len, 1);
10✔
298
    if (pStb->pAst2 == NULL) goto _OVER;
10!
299
    SDB_GET_BINARY(pRaw, dataPos, pStb->pAst2, pStb->ast2Len, _OVER)
10!
300
  }
301

302
  pStb->pCmpr = taosMemoryCalloc(pStb->numOfColumns, sizeof(SColCmpr));
20,311✔
303
  if (sver < STB_VER_NUMBER) {
20,311!
304
    // compatible with old data, setup default compress value
305
    // impl later
306
    for (int i = 0; i < pStb->numOfColumns; i++) {
×
307
      SSchema  *pSchema = &pStb->pColumns[i];
×
308
      SColCmpr *pCmpr = &pStb->pCmpr[i];
×
309
      pCmpr->id = pSchema->colId;
×
310
      pCmpr->alg = createDefaultColCmprByType(pSchema->type);
×
311
    }
312
  } else {
313
    for (int i = 0; i < pStb->numOfColumns; i++) {
430,377✔
314
      SColCmpr *pCmpr = &pStb->pCmpr[i];
410,066✔
315
      SDB_GET_INT16(pRaw, dataPos, &pCmpr->id, _OVER)
410,066!
316
      SDB_GET_INT32(pRaw, dataPos, (int32_t *)&pCmpr->alg, _OVER)  // compatiable
410,066!
317
    }
318
  }
319

320
  SDB_GET_RESERVE(pRaw, dataPos, STB_RESERVE_SIZE, _OVER)
20,311!
321

322
  terrno = 0;
20,311✔
323

324
_OVER:
20,311✔
325
  if (terrno != 0) {
20,311!
326
    mError("stb:%s, failed to decode from raw:%p since %s", pStb == NULL ? "null" : pStb->name, pRaw, terrstr());
×
327
    if (pStb != NULL) {
×
328
      taosMemoryFreeClear(pStb->pColumns);
×
329
      taosMemoryFreeClear(pStb->pTags);
×
330
      taosMemoryFreeClear(pStb->comment);
×
331
      taosMemoryFree(pStb->pCmpr);
×
332
    }
333
    taosMemoryFreeClear(pRow);
×
334
    return NULL;
×
335
  }
336

337
  mTrace("stb:%s, decode from raw:%p, row:%p", pStb->name, pRaw, pStb);
20,311✔
338
  return pRow;
20,311✔
339
}
340

341
void mndFreeStb(SStbObj *pStb) {
28,691✔
342
  taosArrayDestroy(pStb->pFuncs);
28,691✔
343
  taosMemoryFreeClear(pStb->pColumns);
28,691!
344
  taosMemoryFreeClear(pStb->pTags);
28,691!
345
  taosMemoryFreeClear(pStb->comment);
28,691✔
346
  taosMemoryFreeClear(pStb->pAst1);
28,691✔
347
  taosMemoryFreeClear(pStb->pAst2);
28,691✔
348
  taosMemoryFreeClear(pStb->pCmpr);
28,691!
349
}
28,691✔
350

351
static int32_t mndStbActionInsert(SSdb *pSdb, SStbObj *pStb) {
10,881✔
352
  mTrace("stb:%s, perform insert action, row:%p", pStb->name, pStb);
10,881✔
353
  return 0;
10,881✔
354
}
355

356
static int32_t mndStbActionDelete(SSdb *pSdb, SStbObj *pStb) {
27,263✔
357
  mTrace("stb:%s, perform delete action, row:%p", pStb->name, pStb);
27,263✔
358
  mndFreeStb(pStb);
27,263✔
359
  return 0;
27,263✔
360
}
361

362
static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOld, SStbObj *pNew) {
4,340✔
363
  mTrace("stb:%s, perform update action, old row:%p new row:%p", pOld->name, pOld, pNew);
4,340✔
364

365
  taosWLockLatch(&pOld->lock);
4,340✔
366
  int32_t numOfColumns = pOld->numOfColumns;
4,340✔
367
  if (pOld->numOfColumns < pNew->numOfColumns) {
4,340✔
368
    void *pColumns = taosMemoryMalloc(pNew->numOfColumns * sizeof(SSchema));
203✔
369
    if (pColumns != NULL) {
203!
370
      taosMemoryFree(pOld->pColumns);
203✔
371
      pOld->pColumns = pColumns;
203✔
372
    } else {
373
      terrno = TSDB_CODE_OUT_OF_MEMORY;
×
374
      mTrace("stb:%s, failed to perform update action since %s", pOld->name, terrstr());
×
375
      taosWUnLockLatch(&pOld->lock);
×
376
    }
377
  }
378

379
  if (pOld->numOfTags < pNew->numOfTags) {
4,340✔
380
    void *pTags = taosMemoryMalloc(pNew->numOfTags * sizeof(SSchema));
227✔
381
    if (pTags != NULL) {
227!
382
      taosMemoryFree(pOld->pTags);
227✔
383
      pOld->pTags = pTags;
227✔
384
    } else {
385
      terrno = TSDB_CODE_OUT_OF_MEMORY;
×
386
      mTrace("stb:%s, failed to perform update action since %s", pOld->name, terrstr());
×
387
      taosWUnLockLatch(&pOld->lock);
×
388
    }
389
  }
390

391
  if (pOld->commentLen < pNew->commentLen && pNew->commentLen > 0) {
4,340!
392
    void *comment = taosMemoryMalloc(pNew->commentLen + 1);
16✔
393
    if (comment != NULL) {
16!
394
      taosMemoryFree(pOld->comment);
16✔
395
      pOld->comment = comment;
16✔
396
    } else {
397
      terrno = TSDB_CODE_OUT_OF_MEMORY;
×
398
      mTrace("stb:%s, failed to perform update action since %s", pOld->name, terrstr());
×
399
      taosWUnLockLatch(&pOld->lock);
×
400
    }
401
  }
402
  pOld->commentLen = pNew->commentLen;
4,340✔
403

404
  if (pOld->ast1Len < pNew->ast1Len) {
4,340!
405
    void *pAst1 = taosMemoryMalloc(pNew->ast1Len + 1);
×
406
    if (pAst1 != NULL) {
×
407
      taosMemoryFree(pOld->pAst1);
×
408
      pOld->pAst1 = pAst1;
×
409
    } else {
410
      terrno = TSDB_CODE_OUT_OF_MEMORY;
×
411
      mTrace("stb:%s, failed to perform update action since %s", pOld->name, terrstr());
×
412
      taosWUnLockLatch(&pOld->lock);
×
413
    }
414
  }
415

416
  if (pOld->ast2Len < pNew->ast2Len) {
4,340!
417
    void *pAst2 = taosMemoryMalloc(pNew->ast2Len + 1);
×
418
    if (pAst2 != NULL) {
×
419
      taosMemoryFree(pOld->pAst2);
×
420
      pOld->pAst2 = pAst2;
×
421
    } else {
422
      terrno = TSDB_CODE_OUT_OF_MEMORY;
×
423
      mTrace("stb:%s, failed to perform update action since %s", pOld->name, terrstr());
×
424
      taosWUnLockLatch(&pOld->lock);
×
425
    }
426
  }
427

428
  pOld->updateTime = pNew->updateTime;
4,340✔
429
  pOld->tagVer = pNew->tagVer;
4,340✔
430
  pOld->colVer = pNew->colVer;
4,340✔
431
  pOld->smaVer = pNew->smaVer;
4,340✔
432
  pOld->nextColId = pNew->nextColId;
4,340✔
433
  pOld->ttl = pNew->ttl;
4,340✔
434
  if (pNew->numOfColumns > 0) {
4,340✔
435
    pOld->numOfColumns = pNew->numOfColumns;
4,296✔
436
    memcpy(pOld->pColumns, pNew->pColumns, pOld->numOfColumns * sizeof(SSchema));
4,296✔
437
  }
438
  if (pNew->numOfTags > 0) {
4,340✔
439
    pOld->numOfTags = pNew->numOfTags;
4,296✔
440
    memcpy(pOld->pTags, pNew->pTags, pOld->numOfTags * sizeof(SSchema));
4,296✔
441
  }
442
  if (pNew->commentLen > 0) {
4,340✔
443
    memcpy(pOld->comment, pNew->comment, pNew->commentLen + 1);
94✔
444
    pOld->commentLen = pNew->commentLen;
94✔
445
  }
446
  if (pNew->ast1Len != 0) {
4,340!
447
    memcpy(pOld->pAst1, pNew->pAst1, pNew->ast1Len);
×
448
    pOld->ast1Len = pNew->ast1Len;
×
449
  }
450
  if (pNew->ast2Len != 0) {
4,340!
451
    memcpy(pOld->pAst2, pNew->pAst2, pNew->ast2Len);
×
452
    pOld->ast2Len = pNew->ast2Len;
×
453
  }
454
  if (numOfColumns < pNew->numOfColumns) {
4,340✔
455
    taosMemoryFree(pOld->pCmpr);
203✔
456
    pOld->pCmpr = taosMemoryCalloc(pNew->numOfColumns, sizeof(SColCmpr));
203✔
457
    memcpy(pOld->pCmpr, pNew->pCmpr, pNew->numOfColumns * sizeof(SColCmpr));
203✔
458
  } else {
459
    memcpy(pOld->pCmpr, pNew->pCmpr, pNew->numOfColumns * sizeof(SColCmpr));
4,137✔
460
  }
461

462
  taosWUnLockLatch(&pOld->lock);
4,340✔
463
  return 0;
4,340✔
464
}
465

466
SStbObj *mndAcquireStb(SMnode *pMnode, char *stbName) {
895,775✔
467
  SSdb    *pSdb = pMnode->pSdb;
895,775✔
468
  SStbObj *pStb = sdbAcquire(pSdb, SDB_STB, stbName);
895,775✔
469
  if (pStb == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
895,791!
470
    terrno = TSDB_CODE_MND_STB_NOT_EXIST;
15,565✔
471
  }
472
  return pStb;
895,782✔
473
}
474

475
void mndReleaseStb(SMnode *pMnode, SStbObj *pStb) {
888,892✔
476
  SSdb *pSdb = pMnode->pSdb;
888,892✔
477
  sdbRelease(pSdb, pStb);
888,892✔
478
}
888,893✔
479

480
SDbObj *mndAcquireDbByStb(SMnode *pMnode, const char *stbName) {
12,402✔
481
  SName name = {0};
12,402✔
482
  if ((terrno = tNameFromString(&name, stbName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE)) != 0) return NULL;
12,402!
483

484
  char db[TSDB_TABLE_FNAME_LEN] = {0};
12,402✔
485
  if ((terrno = tNameGetFullDbName(&name, db)) != 0) return NULL;
12,402!
486

487
  return mndAcquireDb(pMnode, db);
12,402✔
488
}
489

490
static FORCE_INLINE int32_t schemaExColIdCompare(const void *colId, const void *pSchema) {
491
  if (*(col_id_t *)colId < ((SSchema *)pSchema)->colId) {
492
    return -1;
493
  } else if (*(col_id_t *)colId > ((SSchema *)pSchema)->colId) {
494
    return 1;
495
  }
496
  return 0;
497
}
498

499
void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, int32_t *pContLen, void *alterOriData,
23,853✔
500
                            int32_t alterOriDataLen) {
501
  SEncoder       encoder = {0};
23,853✔
502
  int32_t        contLen;
503
  SName          name = {0};
23,853✔
504
  SVCreateStbReq req = {0};
23,853✔
505

506
  if ((terrno = tNameFromString(&name, pStb->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE)) != 0) {
23,853!
507
    goto _err;
×
508
  }
509
  char dbFName[TSDB_DB_FNAME_LEN] = {0};
23,853✔
510
  if ((terrno = tNameGetFullDbName(&name, dbFName)) != 0) {
23,853!
511
    goto _err;
×
512
  };
513

514
  req.name = (char *)tNameGetTableName(&name);
23,853✔
515
  req.suid = pStb->uid;
23,853✔
516
  req.rollup = pStb->ast1Len > 0 ? 1 : 0;
23,853✔
517
  req.alterOriData = alterOriData;
23,853✔
518
  req.alterOriDataLen = alterOriDataLen;
23,853✔
519
  req.source = pStb->source;
23,853✔
520
  // todo
521
  req.schemaRow.nCols = pStb->numOfColumns;
23,853✔
522
  req.schemaRow.version = pStb->colVer;
23,853✔
523
  req.schemaRow.pSchema = pStb->pColumns;
23,853✔
524
  req.schemaTag.nCols = pStb->numOfTags;
23,853✔
525
  req.schemaTag.version = pStb->tagVer;
23,853✔
526
  req.schemaTag.pSchema = pStb->pTags;
23,853✔
527

528
  req.colCmpred = 1;
23,853✔
529
  SColCmprWrapper *pCmpr = &req.colCmpr;
23,853✔
530
  pCmpr->version = pStb->colVer;
23,853✔
531
  pCmpr->nCols = pStb->numOfColumns;
23,853✔
532

533
  req.colCmpr.pColCmpr = taosMemoryCalloc(pCmpr->nCols, sizeof(SColCmpr));
23,853✔
534
  for (int32_t i = 0; i < pStb->numOfColumns; i++) {
442,454✔
535
    SColCmpr *p = &pCmpr->pColCmpr[i];
418,601✔
536
    p->alg = pStb->pCmpr[i].alg;
418,601✔
537
    p->id = pStb->pCmpr[i].id;
418,601✔
538
  }
539

540
  if (req.rollup) {
23,853✔
541
    req.rsmaParam.maxdelay[0] = pStb->maxdelay[0];
4✔
542
    req.rsmaParam.maxdelay[1] = pStb->maxdelay[1];
4✔
543
    req.rsmaParam.watermark[0] = pStb->watermark[0];
4✔
544
    req.rsmaParam.watermark[1] = pStb->watermark[1];
4✔
545
    if (pStb->ast1Len > 0) {
4!
546
      if (mndConvertRsmaTask(&req.rsmaParam.qmsg[0], &req.rsmaParam.qmsgLen[0], pStb->pAst1, pStb->uid,
4!
547
                             STREAM_TRIGGER_WINDOW_CLOSE, req.rsmaParam.watermark[0],
548
                             req.rsmaParam.deleteMark[0]) < 0) {
549
        goto _err;
×
550
      }
551
    }
552
    if (pStb->ast2Len > 0) {
4!
553
      if (mndConvertRsmaTask(&req.rsmaParam.qmsg[1], &req.rsmaParam.qmsgLen[1], pStb->pAst2, pStb->uid,
4!
554
                             STREAM_TRIGGER_WINDOW_CLOSE, req.rsmaParam.watermark[1],
555
                             req.rsmaParam.deleteMark[1]) < 0) {
556
        goto _err;
×
557
      }
558
    }
559
  }
560
  // get length
561
  int32_t ret = 0;
23,853✔
562
  tEncodeSize(tEncodeSVCreateStbReq, &req, contLen, ret);
23,853!
563
  if (ret < 0) {
23,853!
564
    goto _err;
×
565
  }
566

567
  contLen += sizeof(SMsgHead);
23,853✔
568

569
  SMsgHead *pHead = taosMemoryCalloc(1, contLen);
23,853✔
570
  if (pHead == NULL) {
23,853!
571
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
572
    goto _err;
×
573
  }
574

575
  pHead->contLen = htonl(contLen);
23,853✔
576
  pHead->vgId = htonl(pVgroup->vgId);
23,853✔
577

578
  void *pBuf = POINTER_SHIFT(pHead, sizeof(SMsgHead));
23,853✔
579
  tEncoderInit(&encoder, pBuf, contLen - sizeof(SMsgHead));
23,853✔
580
  if (tEncodeSVCreateStbReq(&encoder, &req) < 0) {
23,853!
581
    taosMemoryFreeClear(pHead);
×
582
    tEncoderClear(&encoder);
×
583
    goto _err;
×
584
  }
585
  tEncoderClear(&encoder);
23,853✔
586

587
  *pContLen = contLen;
23,853✔
588
  taosMemoryFreeClear(req.rsmaParam.qmsg[0]);
23,853✔
589
  taosMemoryFreeClear(req.rsmaParam.qmsg[1]);
23,853✔
590
  taosMemoryFreeClear(req.colCmpr.pColCmpr);
23,853!
591
  return pHead;
23,853✔
592
_err:
×
593
  taosMemoryFreeClear(req.rsmaParam.qmsg[0]);
×
594
  taosMemoryFreeClear(req.rsmaParam.qmsg[1]);
×
595
  taosMemoryFreeClear(req.colCmpr.pColCmpr);
×
596
  return NULL;
×
597
}
598

599
static void *mndBuildVDropStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, int32_t *pContLen) {
23,012✔
600
  SName        name = {0};
23,012✔
601
  SVDropStbReq req = {0};
23,012✔
602
  int32_t      contLen = 0;
23,012✔
603
  int32_t      ret = 0;
23,012✔
604
  SMsgHead    *pHead = NULL;
23,012✔
605
  SEncoder     encoder = {0};
23,012✔
606

607
  if ((terrno = tNameFromString(&name, pStb->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE)) != 0) {
23,012!
608
    return NULL;
×
609
  }
610

611
  req.name = (char *)tNameGetTableName(&name);
23,012✔
612
  req.suid = pStb->uid;
23,012✔
613

614
  tEncodeSize(tEncodeSVDropStbReq, &req, contLen, ret);
23,012!
615
  if (ret < 0) return NULL;
23,012!
616

617
  contLen += sizeof(SMsgHead);
23,012✔
618
  pHead = taosMemoryMalloc(contLen);
23,012✔
619
  if (pHead == NULL) {
23,012!
620
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
621
    return NULL;
×
622
  }
623

624
  pHead->contLen = htonl(contLen);
23,012✔
625
  pHead->vgId = htonl(pVgroup->vgId);
23,012✔
626

627
  void *pBuf = POINTER_SHIFT(pHead, sizeof(SMsgHead));
23,012✔
628

629
  tEncoderInit(&encoder, pBuf, contLen - sizeof(SMsgHead));
23,012✔
630
  terrno = tEncodeSVDropStbReq(&encoder, &req);
23,012✔
631
  tEncoderClear(&encoder);
23,012✔
632
  if (terrno != 0) return NULL;
23,012!
633

634
  *pContLen = contLen;
23,012✔
635
  return pHead;
23,012✔
636
}
637

638
int32_t mndCheckCreateStbReq(SMCreateStbReq *pCreate) {
8,579✔
639
  int32_t code = 0;
8,579✔
640
  if (pCreate->igExists < 0 || pCreate->igExists > 1) {
8,579!
641
    code = TSDB_CODE_MND_INVALID_STB_OPTION;
×
642
    TAOS_RETURN(code);
×
643
  }
644

645
  if (pCreate->numOfColumns < TSDB_MIN_COLUMNS || pCreate->numOfTags + pCreate->numOfColumns > TSDB_MAX_COLUMNS) {
8,579!
646
    code = TSDB_CODE_PAR_INVALID_COLUMNS_NUM;
×
647
    TAOS_RETURN(code);
×
648
  }
649

650
  if (pCreate->numOfTags <= 0 || pCreate->numOfTags > TSDB_MAX_TAGS) {
8,579!
651
    code = TSDB_CODE_MND_INVALID_STB_OPTION;
×
652
    TAOS_RETURN(code);
×
653
  }
654

655
  SField *pField = taosArrayGet(pCreate->pColumns, 0);
8,579✔
656
  if (pField->type != TSDB_DATA_TYPE_TIMESTAMP) {
8,579!
657
    code = TSDB_CODE_PAR_INVALID_FIRST_COLUMN;
×
658
    TAOS_RETURN(code);
×
659
  }
660

661
  for (int32_t i = 0; i < pCreate->numOfColumns; ++i) {
179,047✔
662
    SFieldWithOptions *pField1 = taosArrayGet(pCreate->pColumns, i);
170,468✔
663
    if (pField1->type >= TSDB_DATA_TYPE_MAX) {
170,468!
664
      code = TSDB_CODE_MND_INVALID_STB_OPTION;
×
665
      TAOS_RETURN(code);
×
666
    }
667
    if (pField1->bytes <= 0) {
170,468!
668
      code = TSDB_CODE_MND_INVALID_STB_OPTION;
×
669
      TAOS_RETURN(code);
×
670
    }
671
    if (pField1->name[0] == 0) {
170,468!
672
      code = TSDB_CODE_MND_INVALID_STB_OPTION;
×
673
      TAOS_RETURN(code);
×
674
    }
675
  }
676

677
  for (int32_t i = 0; i < pCreate->numOfTags; ++i) {
60,696✔
678
    SField *pField1 = taosArrayGet(pCreate->pTags, i);
52,117✔
679
    if (pField1->type >= TSDB_DATA_TYPE_MAX) {
52,117!
680
      code = TSDB_CODE_MND_INVALID_STB_OPTION;
×
681
      TAOS_RETURN(code);
×
682
    }
683
    if (pField1->bytes <= 0) {
52,117!
684
      code = TSDB_CODE_MND_INVALID_STB_OPTION;
×
685
      TAOS_RETURN(code);
×
686
    }
687
    if (pField1->name[0] == 0) {
52,117!
688
      code = TSDB_CODE_MND_INVALID_STB_OPTION;
×
689
      TAOS_RETURN(code);
×
690
    }
691
  }
692

693
  TAOS_RETURN(code);
8,579✔
694
}
695

696
static int32_t mndSetCreateStbPrepareLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
×
697
  int32_t  code = 0;
×
698
  SSdbRaw *pRedoRaw = mndStbActionEncode(pStb);
×
699
  if (pRedoRaw == NULL) {
×
700
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
701
    if (terrno != 0) code = terrno;
×
702
    TAOS_RETURN(code);
×
703
  }
704
  if ((code = mndTransAppendPrepareLog(pTrans, pRedoRaw)) != 0) {
×
705
    sdbFreeRaw(pRedoRaw);
×
706
    TAOS_RETURN(code);
×
707
  }
708
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING));
×
709

710
  TAOS_RETURN(code);
×
711
}
712

713
static int32_t mndSetCreateStbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
8,362✔
714
  int32_t  code = 0;
8,362✔
715
  SSdbRaw *pCommitRaw = mndStbActionEncode(pStb);
8,362✔
716
  if (pCommitRaw == NULL) {
8,362!
717
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
718
    if (terrno != 0) code = terrno;
×
719
    TAOS_RETURN(code);
×
720
  }
721
  if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
8,362!
722
    sdbFreeRaw(pCommitRaw);
×
723
    TAOS_RETURN(code);
×
724
  }
725
  TAOS_CHECK_RETURN(sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY));
8,362!
726

727
  TAOS_RETURN(code);
8,362✔
728
}
729

730
static int32_t mndSetCreateStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
8,362✔
731
  int32_t code = 0;
8,362✔
732
  SSdb   *pSdb = pMnode->pSdb;
8,362✔
733
  SVgObj *pVgroup = NULL;
8,362✔
734
  void   *pIter = NULL;
8,362✔
735
  int32_t contLen;
736

737
  while (1) {
73,056✔
738
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
81,418✔
739
    if (pIter == NULL) break;
81,418✔
740
    if (!mndVgroupInDb(pVgroup, pDb->uid)) {
73,056✔
741
      sdbRelease(pSdb, pVgroup);
52,234✔
742
      continue;
52,234✔
743
    }
744

745
    void *pReq = mndBuildVCreateStbReq(pMnode, pVgroup, pStb, &contLen, NULL, 0);
20,822✔
746
    if (pReq == NULL) {
20,822!
747
      sdbCancelFetch(pSdb, pIter);
×
748
      sdbRelease(pSdb, pVgroup);
×
749
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
750
      if (terrno != 0) code = terrno;
×
751
      TAOS_RETURN(code);
×
752
    }
753

754
    STransAction action = {0};
20,822✔
755
    action.mTraceId = pTrans->mTraceId;
20,822✔
756
    action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
20,822✔
757
    action.pCont = pReq;
20,822✔
758
    action.contLen = contLen;
20,822✔
759
    action.msgType = TDMT_VND_CREATE_STB;
20,822✔
760
    action.acceptableCode = TSDB_CODE_TDB_STB_ALREADY_EXIST;
20,822✔
761
    action.retryCode = TSDB_CODE_TDB_STB_NOT_EXIST;
20,822✔
762
    if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
20,822!
763
      taosMemoryFree(pReq);
×
764
      sdbCancelFetch(pSdb, pIter);
×
765
      sdbRelease(pSdb, pVgroup);
×
766
      TAOS_RETURN(code);
×
767
    }
768
    sdbRelease(pSdb, pVgroup);
20,822✔
769
  }
770

771
  TAOS_RETURN(code);
8,362✔
772
}
773

774
int32_t mndSetForceDropCreateStbRedoActions(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, SStbObj *pStb) {
2✔
775
  int32_t code = 0;
2✔
776
  SSdb   *pSdb = pMnode->pSdb;
2✔
777
  int32_t contLen;
778

779
  void *pReq = mndBuildVCreateStbReq(pMnode, pVgroup, pStb, &contLen, NULL, 0);
2✔
780
  if (pReq == NULL) {
2!
781
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
782
    if (terrno != 0) code = terrno;
×
783
    TAOS_RETURN(code);
×
784
  }
785

786
  STransAction action = {0};
2✔
787
  action.mTraceId = pTrans->mTraceId;
2✔
788
  action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
2✔
789
  action.pCont = pReq;
2✔
790
  action.contLen = contLen;
2✔
791
  action.msgType = TDMT_VND_CREATE_STB;
2✔
792
  action.acceptableCode = TSDB_CODE_TDB_STB_ALREADY_EXIST;
2✔
793
  action.retryCode = TSDB_CODE_TDB_STB_NOT_EXIST;
2✔
794
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
2!
795
    taosMemoryFree(pReq);
×
796
    TAOS_RETURN(code);
×
797
  }
798

799
  TAOS_RETURN(code);
2✔
800
}
801

802
static int32_t mndSetCreateStbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
8,362✔
803
  int32_t code = 0;
8,362✔
804
  SSdb   *pSdb = pMnode->pSdb;
8,362✔
805
  SVgObj *pVgroup = NULL;
8,362✔
806
  void   *pIter = NULL;
8,362✔
807

808
  while (1) {
73,056✔
809
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
81,418✔
810
    if (pIter == NULL) break;
81,418✔
811
    if (!mndVgroupInDb(pVgroup, pDb->uid)) {
73,056✔
812
      sdbRelease(pSdb, pVgroup);
52,234✔
813
      continue;
52,234✔
814
    }
815

816
    int32_t contLen = 0;
20,822✔
817
    void   *pReq = mndBuildVDropStbReq(pMnode, pVgroup, pStb, &contLen);
20,822✔
818
    if (pReq == NULL) {
20,822!
819
      sdbCancelFetch(pSdb, pIter);
×
820
      sdbRelease(pSdb, pVgroup);
×
821
      code = TSDB_CODE_OUT_OF_MEMORY;
×
822
      TAOS_RETURN(code);
×
823
    }
824

825
    STransAction action = {0};
20,822✔
826
    action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
20,822✔
827
    action.pCont = pReq;
20,822✔
828
    action.contLen = contLen;
20,822✔
829
    action.msgType = TDMT_VND_DROP_STB;
20,822✔
830
    action.acceptableCode = TSDB_CODE_TDB_STB_NOT_EXIST;
20,822✔
831
    if ((code = mndTransAppendUndoAction(pTrans, &action)) != 0) {
20,822!
832
      taosMemoryFree(pReq);
×
833
      sdbCancelFetch(pSdb, pIter);
×
834
      sdbRelease(pSdb, pVgroup);
×
835
      TAOS_RETURN(code);
×
836
    }
837
    sdbRelease(pSdb, pVgroup);
20,822✔
838
  }
839

840
  TAOS_RETURN(code);
8,362✔
841
}
842

843
static SSchema *mndFindStbColumns(const SStbObj *pStb, const char *colName) {
×
844
  for (int32_t col = 0; col < pStb->numOfColumns; ++col) {
×
845
    SSchema *pSchema = &pStb->pColumns[col];
×
846
    if (strncasecmp(pSchema->name, colName, TSDB_COL_NAME_LEN) == 0) {
×
847
      return pSchema;
×
848
    }
849
  }
850
  return NULL;
×
851
}
852

853
int32_t mndBuildStbFromReq(SMnode *pMnode, SStbObj *pDst, SMCreateStbReq *pCreate, SDbObj *pDb) {
8,380✔
854
  int32_t code = 0;
8,380✔
855
  memcpy(pDst->name, pCreate->name, TSDB_TABLE_FNAME_LEN);
8,380✔
856
  memcpy(pDst->db, pDb->name, TSDB_DB_FNAME_LEN);
8,380✔
857
  pDst->createdTime = taosGetTimestampMs();
8,380✔
858
  pDst->updateTime = pDst->createdTime;
8,380✔
859
  pDst->uid = (pCreate->source == TD_REQ_FROM_TAOX_OLD || pCreate->source == TD_REQ_FROM_TAOX)
8,380✔
860
                  ? pCreate->suid
861
                  : mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN);
16,760!
862
  pDst->dbUid = pDb->uid;
8,380✔
863
  pDst->tagVer = 1;
8,380✔
864
  pDst->colVer = 1;
8,380✔
865
  pDst->smaVer = 1;
8,380✔
866
  pDst->nextColId = 1;
8,380✔
867
  pDst->maxdelay[0] = pCreate->delay1;
8,380✔
868
  pDst->maxdelay[1] = pCreate->delay2;
8,380✔
869
  pDst->watermark[0] = pCreate->watermark1;
8,380✔
870
  pDst->watermark[1] = pCreate->watermark2;
8,380✔
871
  pDst->ttl = pCreate->ttl;
8,380✔
872
  pDst->numOfColumns = pCreate->numOfColumns;
8,380✔
873
  pDst->numOfTags = pCreate->numOfTags;
8,380✔
874
  pDst->numOfFuncs = pCreate->numOfFuncs;
8,380✔
875
  pDst->commentLen = pCreate->commentLen;
8,380✔
876
  pDst->pFuncs = pCreate->pFuncs;
8,380✔
877
  pDst->source = pCreate->source;
8,380✔
878
  pCreate->pFuncs = NULL;
8,380✔
879

880
  if (pDst->commentLen > 0) {
8,380✔
881
    pDst->comment = taosMemoryCalloc(pDst->commentLen + 1, 1);
16✔
882
    if (pDst->comment == NULL) {
16!
883
      code = terrno;
×
884
      TAOS_RETURN(code);
×
885
    }
886
    memcpy(pDst->comment, pCreate->pComment, pDst->commentLen + 1);
16✔
887
  }
888

889
  pDst->ast1Len = pCreate->ast1Len;
8,380✔
890
  if (pDst->ast1Len > 0) {
8,380✔
891
    pDst->pAst1 = taosMemoryCalloc(pDst->ast1Len, 1);
3✔
892
    if (pDst->pAst1 == NULL) {
3!
893
      code = terrno;
×
894
      TAOS_RETURN(code);
×
895
    }
896
    memcpy(pDst->pAst1, pCreate->pAst1, pDst->ast1Len);
3✔
897
  }
898

899
  pDst->ast2Len = pCreate->ast2Len;
8,380✔
900
  if (pDst->ast2Len > 0) {
8,380✔
901
    pDst->pAst2 = taosMemoryCalloc(pDst->ast2Len, 1);
3✔
902
    if (pDst->pAst2 == NULL) {
3!
903
      code = terrno;
×
904
      TAOS_RETURN(code);
×
905
    }
906
    memcpy(pDst->pAst2, pCreate->pAst2, pDst->ast2Len);
3✔
907
  }
908

909
  pDst->pColumns = taosMemoryCalloc(1, pDst->numOfColumns * sizeof(SSchema));
8,380✔
910
  pDst->pTags = taosMemoryCalloc(1, pDst->numOfTags * sizeof(SSchema));
8,380✔
911
  if (pDst->pColumns == NULL || pDst->pTags == NULL) {
8,380!
912
    code = terrno;
×
913
    TAOS_RETURN(code);
×
914
  }
915

916
  if (pDst->nextColId < 0 || pDst->nextColId >= 0x7fff - pDst->numOfColumns - pDst->numOfTags) {
8,380!
917
    code = TSDB_CODE_OUT_OF_RANGE;
×
918
    TAOS_RETURN(code);
×
919
  }
920

921
  for (int32_t i = 0; i < pDst->numOfColumns; ++i) {
177,607✔
922
    SFieldWithOptions *pField = taosArrayGet(pCreate->pColumns, i);
169,227✔
923
    SSchema           *pSchema = &pDst->pColumns[i];
169,227✔
924
    pSchema->type = pField->type;
169,227✔
925
    pSchema->bytes = pField->bytes;
169,227✔
926
    pSchema->flags = pField->flags;
169,227✔
927
    memcpy(pSchema->name, pField->name, TSDB_COL_NAME_LEN);
169,227✔
928
    pSchema->colId = pDst->nextColId;
169,227✔
929
    pDst->nextColId++;
169,227✔
930
  }
931

932
  for (int32_t i = 0; i < pDst->numOfTags; ++i) {
59,345✔
933
    SField  *pField = taosArrayGet(pCreate->pTags, i);
50,965✔
934
    SSchema *pSchema = &pDst->pTags[i];
50,965✔
935
    pSchema->type = pField->type;
50,965✔
936
    pSchema->bytes = pField->bytes;
50,965✔
937
    if (i == 0) {
50,965✔
938
      SSCHMEA_SET_IDX_ON(pSchema);
8,380✔
939
    }
940
    memcpy(pSchema->name, pField->name, TSDB_COL_NAME_LEN);
50,965✔
941
    pSchema->colId = pDst->nextColId;
50,965✔
942
    pDst->nextColId++;
50,965✔
943
  }
944
  // set col compress
945
  pDst->pCmpr = taosMemoryCalloc(1, pDst->numOfColumns * sizeof(SCmprObj));
8,380✔
946
  for (int32_t i = 0; i < pDst->numOfColumns; i++) {
177,607✔
947
    SFieldWithOptions *pField = taosArrayGet(pCreate->pColumns, i);
169,227✔
948
    SSchema           *pSchema = &pDst->pColumns[i];
169,227✔
949

950
    SColCmpr *pColCmpr = &pDst->pCmpr[i];
169,227✔
951
    pColCmpr->id = pSchema->colId;
169,227✔
952
    pColCmpr->alg = pField->compress;
169,227✔
953
  }
954
  TAOS_RETURN(code);
8,380✔
955
}
956
static int32_t mndGenIdxNameForFirstTag(char *fullname, char *dbname, char *stbname, char *tagname) {
6,952✔
957
  SName name = {0};
6,952✔
958
  if ((terrno = tNameFromString(&name, stbname, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE)) != 0) {
6,952!
959
    return -1;
×
960
  }
961
  return snprintf(fullname, TSDB_INDEX_FNAME_LEN, "%s.%s_%s", dbname, tagname, tNameGetTableName(&name));
6,952✔
962
}
963

964
static int32_t mndCreateStb(SMnode *pMnode, SRpcMsg *pReq, SMCreateStbReq *pCreate, SDbObj *pDb) {
6,952✔
965
  SStbObj stbObj = {0};
6,952✔
966
  int32_t code = -1;
6,952✔
967

968
  char fullIdxName[TSDB_INDEX_FNAME_LEN * 2] = {0};
6,952✔
969

970
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pReq, "create-stb");
6,952✔
971
  if (pTrans == NULL) {
6,952!
972
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
973
    if (terrno != 0) code = terrno;
×
974
    goto _OVER;
×
975
  }
976

977
  mInfo("trans:%d, used to create stb:%s", pTrans->id, pCreate->name);
6,952!
978
  TAOS_CHECK_GOTO(mndBuildStbFromReq(pMnode, &stbObj, pCreate, pDb), NULL, _OVER);
6,952!
979

980
  SSchema *pSchema = &(stbObj.pTags[0]);
6,952✔
981
  if (mndGenIdxNameForFirstTag(fullIdxName, pDb->name, stbObj.name, pSchema->name) < 0) {
6,952!
982
    goto _OVER;
×
983
  }
984
  SSIdx idx = {0};
6,952✔
985
  if (mndAcquireGlobalIdx(pMnode, fullIdxName, SDB_IDX, &idx) == 0 && idx.pIdx != NULL) {
6,952!
986
    code = TSDB_CODE_MND_TAG_INDEX_ALREADY_EXIST;
×
987
    mndReleaseIdx(pMnode, idx.pIdx);
×
988
    goto _OVER;
×
989
  }
990

991
  SIdxObj idxObj = {0};
6,952✔
992
  memcpy(idxObj.name, fullIdxName, TSDB_INDEX_FNAME_LEN);
6,952✔
993
  memcpy(idxObj.stb, stbObj.name, TSDB_TABLE_FNAME_LEN);
6,952✔
994
  memcpy(idxObj.db, stbObj.db, TSDB_DB_FNAME_LEN);
6,952✔
995
  memcpy(idxObj.colName, pSchema->name, TSDB_COL_NAME_LEN);
6,952✔
996
  idxObj.createdTime = taosGetTimestampMs();
6,952✔
997
  idxObj.uid = mndGenerateUid(fullIdxName, strlen(fullIdxName));
6,952✔
998
  idxObj.stbUid = stbObj.uid;
6,952✔
999
  idxObj.dbUid = stbObj.dbUid;
6,952✔
1000

1001
  TAOS_CHECK_GOTO(mndSetCreateIdxCommitLogs(pMnode, pTrans, &idxObj), NULL, _OVER);
6,952!
1002

1003
  TAOS_CHECK_GOTO(mndAddStbToTrans(pMnode, pTrans, pDb, &stbObj), NULL, _OVER);
6,952✔
1004
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
6,934✔
1005
  code = 0;
6,932✔
1006

1007
_OVER:
6,952✔
1008
  mndTransDrop(pTrans);
6,952✔
1009
  if (mndStbActionDelete(pMnode->pSdb, &stbObj) != 0) mError("failed to mndStbActionDelete");
6,952!
1010
  TAOS_RETURN(code);
6,952✔
1011
}
1012

1013
int32_t mndAddStbToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
8,380✔
1014
  mndTransSetDbName(pTrans, pDb->name, pStb->name);
8,380✔
1015
  TAOS_CHECK_RETURN(mndTransCheckConflict(pMnode, pTrans));
8,380✔
1016
  TAOS_CHECK_RETURN(mndSetCreateStbCommitLogs(pMnode, pTrans, pDb, pStb));
8,362!
1017
  TAOS_CHECK_RETURN(mndSetCreateStbRedoActions(pMnode, pTrans, pDb, pStb));
8,362!
1018
  TAOS_CHECK_RETURN(mndSetCreateStbUndoActions(pMnode, pTrans, pDb, pStb));
8,362!
1019
  return 0;
8,362✔
1020
}
1021

1022
static int32_t mndProcessTtlTimer(SRpcMsg *pReq) {
5,527✔
1023
  SMnode           *pMnode = pReq->info.node;
5,527✔
1024
  SSdb             *pSdb = pMnode->pSdb;
5,527✔
1025
  SVgObj           *pVgroup = NULL;
5,527✔
1026
  void             *pIter = NULL;
5,527✔
1027
  SVDropTtlTableReq ttlReq = {
5,527✔
1028
      .timestampSec = taosGetTimestampSec(), .ttlDropMaxCount = tsTtlBatchDropNum, .nUids = 0, .pTbUids = NULL};
5,527✔
1029
  int32_t reqLen = tSerializeSVDropTtlTableReq(NULL, 0, &ttlReq);
5,527✔
1030
  int32_t contLen = reqLen + sizeof(SMsgHead);
5,527✔
1031

1032
  mDebug("start to process ttl timer");
5,527✔
1033

1034
  while (1) {
97,436✔
1035
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
102,963✔
1036
    if (pIter == NULL) break;
102,963✔
1037

1038
    int32_t   code = 0;
97,436✔
1039
    SMsgHead *pHead = rpcMallocCont(contLen);
97,436✔
1040
    if (pHead == NULL) {
97,436!
1041
      sdbRelease(pSdb, pVgroup);
×
1042
      continue;
×
1043
    }
1044
    pHead->contLen = htonl(contLen);
97,436✔
1045
    pHead->vgId = htonl(pVgroup->vgId);
97,436✔
1046
    if ((code = tSerializeSVDropTtlTableReq((char *)pHead + sizeof(SMsgHead), reqLen, &ttlReq)) < 0) {
97,436!
1047
      mError("vgId:%d, failed to serialize drop ttl table request since %s", pVgroup->vgId, tstrerror(code));
×
1048
      sdbRelease(pSdb, pVgroup);
×
1049
      continue;
×
1050
    }
1051

1052
    SRpcMsg rpcMsg = {
97,436✔
1053
        .msgType = TDMT_VND_FETCH_TTL_EXPIRED_TBS, .pCont = pHead, .contLen = contLen, .info = pReq->info};
1054
    SEpSet epSet = mndGetVgroupEpset(pMnode, pVgroup);
97,436✔
1055
    code = tmsgSendReq(&epSet, &rpcMsg);
97,436✔
1056
    if (code != 0) {
97,436✔
1057
      mError("vgId:%d, failed to send drop ttl table request to vnode since 0x%x", pVgroup->vgId, code);
49!
1058
    } else {
1059
      mDebug("vgId:%d, send drop ttl table request to vnode, time:%" PRId32, pVgroup->vgId, ttlReq.timestampSec);
97,387✔
1060
    }
1061
    sdbRelease(pSdb, pVgroup);
97,436✔
1062
  }
1063

1064
  return 0;
5,527✔
1065
}
1066

1067
static int32_t mndProcessTrimDbTimer(SRpcMsg *pReq) {
4✔
1068
  SMnode     *pMnode = pReq->info.node;
4✔
1069
  SSdb       *pSdb = pMnode->pSdb;
4✔
1070
  SVgObj     *pVgroup = NULL;
4✔
1071
  void       *pIter = NULL;
4✔
1072
  SVTrimDbReq trimReq = {.timestamp = taosGetTimestampSec()};
4✔
1073
  int32_t     reqLen = tSerializeSVTrimDbReq(NULL, 0, &trimReq);
4✔
1074
  int32_t     contLen = reqLen + sizeof(SMsgHead);
4✔
1075

1076
  while (1) {
208✔
1077
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
212✔
1078
    if (pIter == NULL) break;
212✔
1079

1080
    int32_t code = 0;
208✔
1081

1082
    SMsgHead *pHead = rpcMallocCont(contLen);
208✔
1083
    if (pHead == NULL) {
208!
1084
      sdbCancelFetch(pSdb, pVgroup);
×
1085
      sdbRelease(pSdb, pVgroup);
×
1086
      continue;
×
1087
    }
1088
    pHead->contLen = htonl(contLen);
208✔
1089
    pHead->vgId = htonl(pVgroup->vgId);
208✔
1090
    if ((code = tSerializeSVTrimDbReq((char *)pHead + sizeof(SMsgHead), reqLen, &trimReq)) < 0) {
208!
1091
      mError("vgId:%d, failed to serialize trim db request since %s", pVgroup->vgId, tstrerror(code));
×
1092
    }
1093

1094
    SRpcMsg rpcMsg = {.msgType = TDMT_VND_TRIM, .pCont = pHead, .contLen = contLen};
208✔
1095
    SEpSet  epSet = mndGetVgroupEpset(pMnode, pVgroup);
208✔
1096
    code = tmsgSendReq(&epSet, &rpcMsg);
208✔
1097
    if (code != 0) {
208!
1098
      mError("vgId:%d, timer failed to send vnode-trim request to vnode since 0x%x", pVgroup->vgId, code);
×
1099
    } else {
1100
      mInfo("vgId:%d, timer send vnode-trim request to vnode, time:%d", pVgroup->vgId, trimReq.timestamp);
208!
1101
    }
1102
    sdbRelease(pSdb, pVgroup);
208✔
1103
  }
1104

1105
  return 0;
4✔
1106
}
1107

1108
static int32_t mndProcessS3MigrateDbTimer(SRpcMsg *pReq) {
×
1109
  SMnode          *pMnode = pReq->info.node;
×
1110
  SSdb            *pSdb = pMnode->pSdb;
×
1111
  SVgObj          *pVgroup = NULL;
×
1112
  void            *pIter = NULL;
×
1113
  SVS3MigrateDbReq s3migrateReq = {.timestamp = taosGetTimestampSec()};
×
1114
  int32_t          reqLen = tSerializeSVS3MigrateDbReq(NULL, 0, &s3migrateReq);
×
1115
  int32_t          contLen = reqLen + sizeof(SMsgHead);
×
1116

1117
  while (1) {
×
1118
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
×
1119
    if (pIter == NULL) break;
×
1120

1121
    int32_t code = 0;
×
1122

1123
    SMsgHead *pHead = rpcMallocCont(contLen);
×
1124
    if (pHead == NULL) {
×
1125
      sdbRelease(pSdb, pVgroup);
×
1126
      continue;
×
1127
    }
1128
    pHead->contLen = htonl(contLen);
×
1129
    pHead->vgId = htonl(pVgroup->vgId);
×
1130
    if ((code = tSerializeSVS3MigrateDbReq((char *)pHead + sizeof(SMsgHead), reqLen, &s3migrateReq)) < 0) {
×
1131
      mError("vgId:%d, failed to serialize s3migrate db request since %s", pVgroup->vgId, tstrerror(code));
×
1132
      sdbRelease(pSdb, pVgroup);
×
1133
      continue;
×
1134
    }
1135

1136
    SRpcMsg rpcMsg = {.msgType = TDMT_VND_S3MIGRATE, .pCont = pHead, .contLen = contLen};
×
1137
    SEpSet  epSet = mndGetVgroupEpset(pMnode, pVgroup);
×
1138
    code = tmsgSendReq(&epSet, &rpcMsg);
×
1139
    if (code != 0) {
×
1140
      mError("vgId:%d, timer failed to send vnode-s3migrate request to vnode since 0x%x", pVgroup->vgId, code);
×
1141
    } else {
1142
      mInfo("vgId:%d, timer send vnode-s3migrate request to vnode, time:%d", pVgroup->vgId, s3migrateReq.timestamp);
×
1143
    }
1144
    sdbRelease(pSdb, pVgroup);
×
1145
  }
1146

1147
  return 0;
×
1148
}
1149

1150
static int32_t mndFindSuperTableTagIndex(const SStbObj *pStb, const char *tagName) {
1,245✔
1151
  for (int32_t tag = 0; tag < pStb->numOfTags; tag++) {
5,744✔
1152
    if (strcmp(pStb->pTags[tag].name, tagName) == 0) {
5,265✔
1153
      return tag;
766✔
1154
    }
1155
  }
1156

1157
  return -1;
479✔
1158
}
1159

1160
static int32_t mndFindSuperTableColumnIndex(const SStbObj *pStb, const char *colName) {
1,291✔
1161
  for (int32_t col = 0; col < pStb->numOfColumns; col++) {
13,016✔
1162
    if (strcmp(pStb->pColumns[col].name, colName) == 0) {
12,515✔
1163
      return col;
790✔
1164
    }
1165
  }
1166

1167
  return -1;
501✔
1168
}
1169

1170
static bool mndValidateSchema(SSchema *pSchemas, int32_t nSchema, SArray *pFields, int32_t maxLen) {
376✔
1171
  int32_t rowLen = 0;
376✔
1172
  for (int32_t i = 0; i < nSchema; ++i) {
5,592✔
1173
    rowLen += (pSchemas + i)->bytes;
5,216✔
1174
  }
1175

1176
  int32_t nField = taosArrayGetSize(pFields);
376✔
1177
  for (int32_t i = 0; i < nField; ++i) {
752✔
1178
    rowLen += ((SField *)TARRAY_GET_ELEM(pFields, i))->bytes;
376✔
1179
  }
1180

1181
  return rowLen <= maxLen;
376✔
1182
}
1183

1184
static int32_t mndBuildStbFromAlter(SStbObj *pStb, SStbObj *pDst, SMCreateStbReq *createReq) {
93✔
1185
  int32_t code = 0;
93✔
1186
  taosRLockLatch(&pStb->lock);
93✔
1187
  memcpy(pDst, pStb, sizeof(SStbObj));
93✔
1188
  taosRUnLockLatch(&pStb->lock);
93✔
1189

1190
  pDst->source = createReq->source;
93✔
1191
  pDst->updateTime = taosGetTimestampMs();
93✔
1192
  pDst->numOfColumns = createReq->numOfColumns;
93✔
1193
  pDst->numOfTags = createReq->numOfTags;
93✔
1194
  pDst->pColumns = taosMemoryCalloc(1, pDst->numOfColumns * sizeof(SSchema));
93✔
1195
  pDst->pTags = taosMemoryCalloc(1, pDst->numOfTags * sizeof(SSchema));
93✔
1196
  pDst->pCmpr = taosMemoryCalloc(1, pDst->numOfColumns * sizeof(SColCmpr));
93✔
1197

1198
  if (pDst->pColumns == NULL || pDst->pTags == NULL || pDst->pCmpr == NULL) {
93!
1199
    code = terrno;
×
1200
    TAOS_RETURN(code);
×
1201
  }
1202

1203
  if (pDst->nextColId < 0 || pDst->nextColId >= 0x7fff - pDst->numOfColumns - pDst->numOfTags) {
93!
1204
    code = TSDB_CODE_OUT_OF_RANGE;
×
1205
    TAOS_RETURN(code);
×
1206
  }
1207

1208
  for (int32_t i = 0; i < pDst->numOfColumns; ++i) {
674✔
1209
    SFieldWithOptions *pField = taosArrayGet(createReq->pColumns, i);
581✔
1210
    SSchema           *pSchema = &pDst->pColumns[i];
581✔
1211
    pSchema->type = pField->type;
581✔
1212
    pSchema->bytes = pField->bytes;
581✔
1213
    pSchema->flags = pField->flags;
581✔
1214
    memcpy(pSchema->name, pField->name, TSDB_COL_NAME_LEN);
581✔
1215
    int32_t cIndex = mndFindSuperTableColumnIndex(pStb, pField->name);
581✔
1216
    if (cIndex >= 0) {
581✔
1217
      pSchema->colId = pStb->pColumns[cIndex].colId;
514✔
1218
    } else {
1219
      pSchema->colId = pDst->nextColId++;
67✔
1220
    }
1221
  }
1222

1223
  for (int32_t i = 0; i < pDst->numOfTags; ++i) {
614✔
1224
    SField  *pField = taosArrayGet(createReq->pTags, i);
521✔
1225
    SSchema *pSchema = &pDst->pTags[i];
521✔
1226
    pSchema->type = pField->type;
521✔
1227
    pSchema->bytes = pField->bytes;
521✔
1228
    memcpy(pSchema->name, pField->name, TSDB_COL_NAME_LEN);
521✔
1229
    int32_t cIndex = mndFindSuperTableTagIndex(pStb, pField->name);
521✔
1230
    if (cIndex >= 0) {
521✔
1231
      pSchema->colId = pStb->pTags[cIndex].colId;
466✔
1232
    } else {
1233
      pSchema->colId = pDst->nextColId++;
55✔
1234
    }
1235
  }
1236
  for (int32_t i = 0; i < pDst->numOfColumns; i++) {
674✔
1237
    SColCmpr          *p = pDst->pCmpr + i;
581✔
1238
    SFieldWithOptions *pField = taosArrayGet(createReq->pColumns, i);
581✔
1239
    SSchema           *pSchema = &pDst->pColumns[i];
581✔
1240
    p->id = pSchema->colId;
581✔
1241
    if (pField->compress == 0) {
581!
1242
      p->alg = createDefaultColCmprByType(pSchema->type);
×
1243
    } else {
1244
      p->alg = pField->compress;
581✔
1245
    }
1246
  }
1247
  pDst->tagVer = createReq->tagVer;
93✔
1248
  pDst->colVer = createReq->colVer;
93✔
1249
  return TSDB_CODE_SUCCESS;
93✔
1250
}
1251

1252
static int32_t mndProcessCreateStbReq(SRpcMsg *pReq) {
7,151✔
1253
  SMnode        *pMnode = pReq->info.node;
7,151✔
1254
  int32_t        code = -1;
7,151✔
1255
  SStbObj       *pStb = NULL;
7,151✔
1256
  SDbObj        *pDb = NULL;
7,151✔
1257
  SMCreateStbReq createReq = {0};
7,151✔
1258
  bool           isAlter = false;
7,151✔
1259

1260
  if (tDeserializeSMCreateStbReq(pReq->pCont, pReq->contLen, &createReq) != 0) {
7,151!
1261
    code = TSDB_CODE_INVALID_MSG;
×
1262
    goto _OVER;
×
1263
  }
1264

1265
  mInfo("stb:%s, start to create", createReq.name);
7,151!
1266
  if (mndCheckCreateStbReq(&createReq) != 0) {
7,151!
1267
    code = TSDB_CODE_INVALID_MSG;
×
1268
    goto _OVER;
×
1269
  }
1270

1271
  pStb = mndAcquireStb(pMnode, createReq.name);
7,151✔
1272
  if (pStb != NULL) {
7,151✔
1273
    if (createReq.igExists) {
197✔
1274
      if (createReq.source == TD_REQ_FROM_APP) {
195✔
1275
        mInfo("stb:%s, already exist, ignore exist is set", createReq.name);
11!
1276
        code = 0;
11✔
1277
        goto _OVER;
11✔
1278
      } else if (pStb->uid != createReq.suid) {
184!
1279
        mInfo("stb:%s, alter table does not need to be done, because table is deleted", createReq.name);
×
1280
        code = 0;
×
1281
        goto _OVER;
×
1282
      } else if (createReq.tagVer > 0 || createReq.colVer > 0) {
277!
1283
        int32_t tagDelta = createReq.tagVer - pStb->tagVer;
184✔
1284
        int32_t colDelta = createReq.colVer - pStb->colVer;
184✔
1285
        mInfo("stb:%s, already exist while create, input tagVer:%d colVer:%d, exist tagVer:%d colVer:%d",
184!
1286
              createReq.name, createReq.tagVer, createReq.colVer, pStb->tagVer, pStb->colVer);
1287
        if (tagDelta <= 0 && colDelta <= 0) {
184✔
1288
          mInfo("stb:%s, schema version is not incremented and nothing needs to be done", createReq.name);
91!
1289
          code = 0;
91✔
1290
          goto _OVER;
91✔
1291
        } else if ((tagDelta == 1 && colDelta == 0) || (tagDelta == 0 && colDelta == 1) ||
93!
1292
                   (pStb->colVer == 1 && createReq.colVer > 1) || (pStb->tagVer == 1 && createReq.tagVer > 1)) {
×
1293
          isAlter = true;
93✔
1294
          mInfo("stb:%s, schema version is only increased by 1 number, do alter operation", createReq.name);
93!
1295
        } else {
1296
          mError("stb:%s, schema version increase more than 1 number, error is returned", createReq.name);
×
1297
          code = TSDB_CODE_MND_INVALID_SCHEMA_VER;
×
1298
          goto _OVER;
×
1299
        }
1300
      } else {
1301
        mError("stb:%s, already exist while create, input tagVer:%d colVer:%d is invalid, origin tagVer:%d colVer:%d",
×
1302
               createReq.name, createReq.tagVer, createReq.colVer, pStb->tagVer, pStb->colVer);
1303
        code = TSDB_CODE_MND_INVALID_SCHEMA_VER;
×
1304
        goto _OVER;
×
1305
      }
1306
    } else {
1307
      code = TSDB_CODE_MND_STB_ALREADY_EXIST;
2✔
1308
      goto _OVER;
2✔
1309
    }
1310
  } else if (terrno != TSDB_CODE_MND_STB_NOT_EXIST) {
6,954!
1311
    goto _OVER;
×
1312
  } else if ((createReq.source == TD_REQ_FROM_TAOX_OLD || createReq.source == TD_REQ_FROM_TAOX) &&
6,954!
1313
             (createReq.tagVer != 1 || createReq.colVer != 1)) {
49!
1314
    mInfo("stb:%s, alter table does not need to be done, because table is deleted", createReq.name);
2!
1315
    code = 0;
2✔
1316
    goto _OVER;
2✔
1317
  }
1318

1319
  pDb = mndAcquireDbByStb(pMnode, createReq.name);
7,045✔
1320
  if (pDb == NULL) {
7,045!
1321
    code = TSDB_CODE_MND_DB_NOT_SELECTED;
×
1322
    goto _OVER;
×
1323
  }
1324

1325
  if ((code = mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb)) != 0) {
7,045!
1326
    goto _OVER;
×
1327
  }
1328

1329
  int32_t numOfStbs = -1;
7,045✔
1330
  if ((code = mndGetNumOfStbs(pMnode, pDb->name, &numOfStbs)) != 0) {
7,045!
1331
    goto _OVER;
×
1332
  }
1333

1334
  if (pDb->cfg.numOfStables == 1 && numOfStbs != 0) {
7,045!
1335
    code = TSDB_CODE_MND_SINGLE_STB_MODE_DB;
×
1336
    goto _OVER;
×
1337
  }
1338

1339
  if ((code = grantCheck(TSDB_GRANT_STABLE)) < 0) {
7,045!
1340
    goto _OVER;
×
1341
  }
1342

1343
  if (isAlter) {
7,045✔
1344
    bool    needRsp = false;
93✔
1345
    SStbObj pDst = {0};
93✔
1346
    if ((code = mndBuildStbFromAlter(pStb, &pDst, &createReq)) != 0) {
93!
1347
      taosMemoryFreeClear(pDst.pTags);
×
1348
      taosMemoryFreeClear(pDst.pColumns);
×
1349
      taosMemoryFreeClear(pDst.pCmpr);
×
1350
      goto _OVER;
×
1351
    }
1352

1353
    code = mndAlterStbImp(pMnode, pReq, pDb, &pDst, needRsp, NULL, 0);
93✔
1354
    taosMemoryFreeClear(pDst.pTags);
93!
1355
    taosMemoryFreeClear(pDst.pColumns);
93!
1356
    taosMemoryFreeClear(pDst.pCmpr);
93!
1357
  } else {
1358
    code = mndCreateStb(pMnode, pReq, &createReq, pDb);
6,952✔
1359
  }
1360
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
7,045✔
1361

1362
  SName name = {0};
7,045✔
1363
  TAOS_CHECK_RETURN(tNameFromString(&name, createReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE));
7,045!
1364

1365
  if (createReq.sql == NULL && createReq.sqlLen == 0) {
8,009!
1366
    char detail[1000] = {0};
964✔
1367

1368
    sprintf(detail, "dbname:%s, stable name:%s", name.dbname, name.tname);
964✔
1369

1370
    auditRecord(pReq, pMnode->clusterId, "createStb", name.dbname, name.tname, detail, strlen(detail));
964✔
1371
  } else {
1372
    auditRecord(pReq, pMnode->clusterId, "createStb", name.dbname, name.tname, createReq.sql, createReq.sqlLen);
6,081✔
1373
  }
1374
_OVER:
7,151✔
1375
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
7,151✔
1376
    mError("stb:%s, failed to create since %s", createReq.name, tstrerror(code));
22!
1377
  }
1378

1379
  mndReleaseStb(pMnode, pStb);
7,151✔
1380
  mndReleaseDb(pMnode, pDb);
7,151✔
1381
  tFreeSMCreateStbReq(&createReq);
7,151✔
1382

1383
  TAOS_RETURN(code);
7,151✔
1384
}
1385

1386
static int32_t mndCheckAlterStbReq(SMAlterStbReq *pAlter) {
956✔
1387
  int32_t code = 0;
956✔
1388
  if (pAlter->commentLen >= 0) return 0;
956!
1389
  if (pAlter->ttl != 0) return 0;
×
1390

1391
  if (pAlter->numOfFields < 1 || pAlter->numOfFields != (int32_t)taosArrayGetSize(pAlter->pFields)) {
×
1392
    code = TSDB_CODE_MND_INVALID_STB_OPTION;
×
1393
    TAOS_RETURN(code);
×
1394
  }
1395

1396
  for (int32_t i = 0; i < pAlter->numOfFields; ++i) {
×
1397
    SField *pField = taosArrayGet(pAlter->pFields, i);
×
1398
    if (pField->name[0] == 0) {
×
1399
      code = TSDB_CODE_MND_INVALID_STB_OPTION;
×
1400
      TAOS_RETURN(code);
×
1401
    }
1402
  }
1403

1404
  TAOS_RETURN(code);
×
1405
}
1406

1407
int32_t mndAllocStbSchemas(const SStbObj *pOld, SStbObj *pNew) {
2,344✔
1408
  pNew->pTags = taosMemoryCalloc(pNew->numOfTags, sizeof(SSchema));
2,344✔
1409
  pNew->pColumns = taosMemoryCalloc(pNew->numOfColumns, sizeof(SSchema));
2,344✔
1410
  pNew->pCmpr = taosMemoryCalloc(pNew->numOfColumns, sizeof(SColCmpr));
2,344✔
1411
  if (pNew->pTags == NULL || pNew->pColumns == NULL || pNew->pCmpr == NULL) {
2,344!
1412
    TAOS_RETURN(terrno);
×
1413
  }
1414

1415
  memcpy(pNew->pColumns, pOld->pColumns, sizeof(SSchema) * pOld->numOfColumns);
2,344✔
1416
  memcpy(pNew->pTags, pOld->pTags, sizeof(SSchema) * pOld->numOfTags);
2,344✔
1417
  memcpy(pNew->pCmpr, pOld->pCmpr, sizeof(SColCmpr) * pOld->numOfColumns);
2,344✔
1418

1419
  TAOS_RETURN(0);
2,344✔
1420
}
1421

1422
static int32_t mndUpdateStbCommentAndTTL(const SStbObj *pOld, SStbObj *pNew, char *pComment, int32_t commentLen,
21✔
1423
                                         int32_t ttl) {
1424
  int32_t code = 0;
21✔
1425
  if (commentLen > 0) {
21✔
1426
    pNew->commentLen = commentLen;
16✔
1427
    pNew->comment = taosMemoryCalloc(1, commentLen + 1);
16✔
1428
    if (pNew->comment == NULL) {
16!
1429
      terrno = TSDB_CODE_OUT_OF_MEMORY;
×
1430
      return -1;
×
1431
    }
1432
    memcpy(pNew->comment, pComment, commentLen + 1);
16✔
1433
  } else if (commentLen == 0) {
5!
1434
    pNew->commentLen = 0;
5✔
1435
  } else {
1436
  }
1437

1438
  if (ttl >= 0) {
21!
1439
    pNew->ttl = ttl;
21✔
1440
  }
1441

1442
  if ((code = mndAllocStbSchemas(pOld, pNew)) != 0) {
21!
1443
    TAOS_RETURN(code);
×
1444
  }
1445
  TAOS_RETURN(code);
21✔
1446
}
1447

1448
static int32_t mndAddSuperTableTag(const SStbObj *pOld, SStbObj *pNew, SArray *pFields, int32_t ntags) {
201✔
1449
  int32_t code = 0;
201✔
1450
  if (pOld->numOfTags + ntags > TSDB_MAX_TAGS) {
201!
1451
    code = TSDB_CODE_MND_TOO_MANY_TAGS;
×
1452
    TAOS_RETURN(code);
×
1453
  }
1454

1455
  if (pOld->numOfColumns + ntags + pOld->numOfTags > TSDB_MAX_COLUMNS) {
201!
1456
    code = TSDB_CODE_MND_TOO_MANY_COLUMNS;
×
1457
    TAOS_RETURN(code);
×
1458
  }
1459

1460
  if (!mndValidateSchema(pOld->pTags, pOld->numOfTags, pFields, TSDB_MAX_TAGS_LEN)) {
201!
1461
    code = TSDB_CODE_PAR_INVALID_TAGS_LENGTH;
×
1462
    TAOS_RETURN(code);
×
1463
  }
1464

1465
  pNew->numOfTags = pNew->numOfTags + ntags;
201✔
1466
  if ((code = mndAllocStbSchemas(pOld, pNew)) != 0) {
201!
1467
    TAOS_RETURN(code);
×
1468
  }
1469

1470
  if (pNew->nextColId < 0 || pNew->nextColId >= 0x7fff - ntags) {
201!
1471
    code = TSDB_CODE_OUT_OF_RANGE;
×
1472
    TAOS_RETURN(code);
×
1473
  }
1474

1475
  for (int32_t i = 0; i < ntags; i++) {
390✔
1476
    SField *pField = taosArrayGet(pFields, i);
201✔
1477
    if (mndFindSuperTableColumnIndex(pOld, pField->name) >= 0) {
201✔
1478
      code = TSDB_CODE_MND_COLUMN_ALREADY_EXIST;
5✔
1479
      TAOS_RETURN(code);
5✔
1480
    }
1481

1482
    if (mndFindSuperTableTagIndex(pOld, pField->name) >= 0) {
196✔
1483
      code = TSDB_CODE_MND_TAG_ALREADY_EXIST;
7✔
1484
      TAOS_RETURN(code);
7✔
1485
    }
1486

1487
    SSchema *pSchema = &pNew->pTags[pOld->numOfTags + i];
189✔
1488
    pSchema->bytes = pField->bytes;
189✔
1489
    pSchema->type = pField->type;
189✔
1490
    memcpy(pSchema->name, pField->name, TSDB_COL_NAME_LEN);
189✔
1491
    pSchema->colId = pNew->nextColId;
189✔
1492
    pNew->nextColId++;
189✔
1493

1494
    mInfo("stb:%s, start to add tag %s", pNew->name, pSchema->name);
189!
1495
  }
1496

1497
  pNew->tagVer++;
189✔
1498
  TAOS_RETURN(code);
189✔
1499
}
1500

1501
static int32_t mndCheckAlterColForTopic(SMnode *pMnode, const char *stbFullName, int64_t suid, col_id_t colId) {
2,516✔
1502
  int32_t code = 0;
2,516✔
1503
  SSdb   *pSdb = pMnode->pSdb;
2,516✔
1504
  void   *pIter = NULL;
2,516✔
1505
  while (1) {
1,017✔
1506
    SMqTopicObj *pTopic = NULL;
3,533✔
1507
    pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pTopic);
3,533✔
1508
    if (pIter == NULL) break;
3,533✔
1509

1510
    mInfo("topic:%s, check tag and column modifiable, stb:%s suid:%" PRId64 " colId:%d, subType:%d sql:%s",
1,095!
1511
          pTopic->name, stbFullName, suid, colId, pTopic->subType, pTopic->sql);
1512
    if (pTopic->ast == NULL) {
1,095✔
1513
      sdbRelease(pSdb, pTopic);
103✔
1514
      continue;
103✔
1515
    }
1516

1517
    SNode *pAst = NULL;
992✔
1518
    if (nodesStringToNode(pTopic->ast, &pAst) != 0) {
992!
1519
      code = TSDB_CODE_MND_FIELD_CONFLICT_WITH_TOPIC;
×
1520
      mError("topic:%s, create ast error", pTopic->name);
×
1521
      sdbRelease(pSdb, pTopic);
×
1522
      sdbCancelFetch(pSdb, pIter);
×
1523
      TAOS_RETURN(code);
78✔
1524
    }
1525

1526
    SNodeList *pNodeList = NULL;
992✔
1527
    if ((code = nodesCollectColumns((SSelectStmt *)pAst, SQL_CLAUSE_FROM, NULL, COLLECT_COL_TYPE_ALL, &pNodeList)) !=
992!
1528
        0) {
1529
      sdbRelease(pSdb, pTopic);
×
1530
      sdbCancelFetch(pSdb, pIter);
×
1531
      TAOS_RETURN(code);
×
1532
    }
1533
    SNode *pNode = NULL;
992✔
1534
    FOREACH(pNode, pNodeList) {
1,633✔
1535
      SColumnNode *pCol = (SColumnNode *)pNode;
1,556✔
1536
      mInfo("topic:%s, check colId:%d tableId:%" PRId64 " ctbStbUid:%" PRId64, pTopic->name, pCol->colId, pCol->tableId,
1,556!
1537
            pTopic->ctbStbUid);
1538

1539
      if (pCol->tableId != suid && pTopic->ctbStbUid != suid) {
1,556✔
1540
        mInfo("topic:%s, check colId:%d passed", pTopic->name, pCol->colId);
837!
1541
        goto NEXT;
837✔
1542
      }
1543
      if (pCol->colId > 0 && pCol->colId == colId) {
719!
1544
        code = TSDB_CODE_MND_FIELD_CONFLICT_WITH_TOPIC;
78✔
1545
        mError("topic:%s, check colId:%d conflicted", pTopic->name, pCol->colId);
78!
1546
        nodesDestroyNode(pAst);
78✔
1547
        nodesDestroyList(pNodeList);
78✔
1548
        sdbCancelFetch(pSdb, pIter);
78✔
1549
        sdbRelease(pSdb, pTopic);
78✔
1550
        TAOS_RETURN(code);
78✔
1551
      }
1552
      mInfo("topic:%s, check colId:%d passed", pTopic->name, pCol->colId);
641!
1553
    }
1554

1555
  NEXT:
77✔
1556
    sdbRelease(pSdb, pTopic);
914✔
1557
    nodesDestroyNode(pAst);
914✔
1558
    nodesDestroyList(pNodeList);
914✔
1559
  }
1560
  TAOS_RETURN(code);
2,438✔
1561
}
1562

1563
static int32_t mndCheckAlterColForStream(SMnode *pMnode, const char *stbFullName, int64_t suid, col_id_t colId) {
2,438✔
1564
  int32_t code = 0;
2,438✔
1565
  SSdb   *pSdb = pMnode->pSdb;
2,438✔
1566
  void   *pIter = NULL;
2,438✔
1567
  while (1) {
5,849✔
1568
    SStreamObj *pStream = NULL;
8,287✔
1569
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
8,287✔
1570
    if (pIter == NULL) break;
8,287✔
1571

1572
    SNode *pAst = NULL;
5,865✔
1573
    if (nodesStringToNode(pStream->ast, &pAst) != 0) {
5,865!
1574
      code = TSDB_CODE_MND_INVALID_STREAM_OPTION;
×
1575
      mError("stream:%s, create ast error", pStream->name);
×
1576
      sdbRelease(pSdb, pStream);
×
1577
      sdbCancelFetch(pSdb, pIter);
×
1578
      TAOS_RETURN(code);
16✔
1579
    }
1580

1581
    SNodeList *pNodeList = NULL;
5,865✔
1582
    if ((code = nodesCollectColumns((SSelectStmt *)pAst, SQL_CLAUSE_FROM, NULL, COLLECT_COL_TYPE_ALL, &pNodeList)) !=
5,865!
1583
        0) {
1584
      sdbRelease(pSdb, pStream);
×
1585
      sdbCancelFetch(pSdb, pIter);
×
1586
      TAOS_RETURN(code);
×
1587
    }
1588

1589
    SNode *pNode = NULL;
5,865✔
1590
    FOREACH(pNode, pNodeList) {
5,917!
1591
      SColumnNode *pCol = (SColumnNode *)pNode;
5,913✔
1592

1593
      if (pCol->tableId != suid) {
5,913✔
1594
        mInfo("stream:%s, check colId:%d passed", pStream->name, pCol->colId);
5,845!
1595
        goto NEXT;
5,845✔
1596
      }
1597
      if (pCol->colId > 0 && pCol->colId == colId) {
68!
1598
        code = TSDB_CODE_MND_STREAM_MUST_BE_DELETED;
16✔
1599
        mError("stream:%s, check colId:%d conflicted", pStream->name, pCol->colId);
16!
1600
        nodesDestroyNode(pAst);
16✔
1601
        nodesDestroyList(pNodeList);
16✔
1602
        sdbRelease(pSdb, pStream);
16✔
1603
        sdbCancelFetch(pSdb, pIter);
16✔
1604
        TAOS_RETURN(code);
16✔
1605
      }
1606
      mInfo("stream:%s, check colId:%d passed", pStream->name, pCol->colId);
52!
1607
    }
1608

1609
  NEXT:
4✔
1610
    sdbRelease(pSdb, pStream);
5,849✔
1611
    nodesDestroyNode(pAst);
5,849✔
1612
    nodesDestroyList(pNodeList);
5,849✔
1613
  }
1614
  TAOS_RETURN(code);
2,422✔
1615
}
1616

1617
static int32_t mndCheckAlterColForTSma(SMnode *pMnode, const char *stbFullName, int64_t suid, col_id_t colId) {
2,422✔
1618
  int32_t code = 0;
2,422✔
1619
  SSdb   *pSdb = pMnode->pSdb;
2,422✔
1620
  void   *pIter = NULL;
2,422✔
1621
  while (1) {
8✔
1622
    SSmaObj *pSma = NULL;
2,430✔
1623
    pIter = sdbFetch(pSdb, SDB_SMA, pIter, (void **)&pSma);
2,430✔
1624
    if (pIter == NULL) break;
2,430✔
1625

1626
    mInfo("tsma:%s, check tag and column modifiable, stb:%s suid:%" PRId64 " colId:%d, sql:%s", pSma->name, stbFullName,
8!
1627
          suid, colId, pSma->sql);
1628

1629
    SNode *pAst = NULL;
8✔
1630
    if (nodesStringToNode(pSma->ast, &pAst) != 0) {
8!
1631
      code = TSDB_CODE_SDB_INVALID_DATA_CONTENT;
×
1632
      mError("tsma:%s, check tag and column modifiable, stb:%s suid:%" PRId64 " colId:%d failed since parse AST err",
×
1633
             pSma->name, stbFullName, suid, colId);
1634
      sdbCancelFetch(pSdb, pIter);
×
1635
      TAOS_RETURN(code);
×
1636
    }
1637

1638
    SNodeList *pNodeList = NULL;
8✔
1639
    if ((code = nodesCollectColumns((SSelectStmt *)pAst, SQL_CLAUSE_FROM, NULL, COLLECT_COL_TYPE_ALL, &pNodeList)) !=
8!
1640
        0) {
1641
      sdbCancelFetch(pSdb, pIter);
×
1642
      TAOS_RETURN(code);
×
1643
    }
1644
    SNode *pNode = NULL;
8✔
1645
    FOREACH(pNode, pNodeList) {
44!
1646
      SColumnNode *pCol = (SColumnNode *)pNode;
40✔
1647
      mInfo("tsma:%s, check colId:%d tableId:%" PRId64, pSma->name, pCol->colId, pCol->tableId);
40!
1648

1649
      if ((pCol->tableId != suid) && (pSma->stbUid != suid)) {
40!
1650
        mInfo("tsma:%s, check colId:%d passed", pSma->name, pCol->colId);
4!
1651
        goto NEXT;
4✔
1652
      }
1653
      if ((pCol->colId) > 0 && (pCol->colId == colId)) {
36!
1654
        code = TSDB_CODE_MND_FIELD_CONFLICT_WITH_TSMA;
×
1655
        mError("tsma:%s, check colId:%d conflicted", pSma->name, pCol->colId);
×
1656
        nodesDestroyNode(pAst);
×
1657
        nodesDestroyList(pNodeList);
×
1658
        sdbRelease(pSdb, pSma);
×
1659
        sdbCancelFetch(pSdb, pIter);
×
1660
        TAOS_RETURN(code);
×
1661
      }
1662
      mInfo("tsma:%s, check colId:%d passed", pSma->name, pCol->colId);
36!
1663
    }
1664

1665
  NEXT:
4✔
1666
    sdbRelease(pSdb, pSma);
8✔
1667
    nodesDestroyNode(pAst);
8✔
1668
    nodesDestroyList(pNodeList);
8✔
1669
  }
1670
  TAOS_RETURN(code);
2,422✔
1671
}
1672

1673
int32_t mndCheckColAndTagModifiable(SMnode *pMnode, const char *stbFullName, int64_t suid, col_id_t colId) {
2,516✔
1674
  TAOS_CHECK_RETURN(mndCheckAlterColForTopic(pMnode, stbFullName, suid, colId));
2,516✔
1675
  TAOS_CHECK_RETURN(mndCheckAlterColForStream(pMnode, stbFullName, suid, colId));
2,438✔
1676
  TAOS_CHECK_RETURN(mndCheckAlterColForTSma(pMnode, stbFullName, suid, colId));
2,422!
1677
  TAOS_RETURN(0);
2,422✔
1678
}
1679

1680
static int32_t mndDropSuperTableTag(SMnode *pMnode, const SStbObj *pOld, SStbObj *pNew, const char *tagName) {
151✔
1681
  int32_t code = 0;
151✔
1682
  int32_t tag = mndFindSuperTableTagIndex(pOld, tagName);
151✔
1683
  if (tag < 0) {
151✔
1684
    code = TSDB_CODE_MND_TAG_NOT_EXIST;
3✔
1685
    TAOS_RETURN(code);
3✔
1686
  }
1687

1688
  col_id_t colId = pOld->pTags[tag].colId;
148✔
1689
  TAOS_CHECK_RETURN(mndCheckColAndTagModifiable(pMnode, pOld->name, pOld->uid, colId));
148✔
1690

1691
  TAOS_CHECK_RETURN(mndAllocStbSchemas(pOld, pNew));
122!
1692

1693
  memmove(pNew->pTags + tag, pNew->pTags + tag + 1, sizeof(SSchema) * (pNew->numOfTags - tag - 1));
122✔
1694
  pNew->numOfTags--;
122✔
1695

1696
  pNew->tagVer++;
122✔
1697

1698
  // if (mndDropIndexByTag(pMnode, pOld, tagName) != 0) {
1699
  //   return -1;
1700
  // }
1701
  mInfo("stb:%s, start to drop tag %s", pNew->name, tagName);
122!
1702
  TAOS_RETURN(code);
122✔
1703
}
1704

1705
static int32_t mndAlterStbTagName(SMnode *pMnode, const SStbObj *pOld, SStbObj *pNew, SArray *pFields) {
100✔
1706
  int32_t code = 0;
100✔
1707
  if ((int32_t)taosArrayGetSize(pFields) != 2) {
100!
1708
    code = TSDB_CODE_MND_INVALID_STB_OPTION;
×
1709
    TAOS_RETURN(code);
×
1710
  }
1711

1712
  SField *pField0 = taosArrayGet(pFields, 0);
100✔
1713
  SField *pField1 = taosArrayGet(pFields, 1);
100✔
1714

1715
  const char *oldTagName = pField0->name;
100✔
1716
  const char *newTagName = pField1->name;
100✔
1717

1718
  int32_t tag = mndFindSuperTableTagIndex(pOld, oldTagName);
100✔
1719
  if (tag < 0) {
100✔
1720
    code = TSDB_CODE_MND_TAG_NOT_EXIST;
7✔
1721
    TAOS_RETURN(code);
7✔
1722
  }
1723

1724
  col_id_t colId = pOld->pTags[tag].colId;
93✔
1725
  TAOS_CHECK_RETURN(mndCheckColAndTagModifiable(pMnode, pOld->name, pOld->uid, colId));
93✔
1726

1727
  if (mndFindSuperTableTagIndex(pOld, newTagName) >= 0) {
72✔
1728
    code = TSDB_CODE_MND_TAG_ALREADY_EXIST;
9✔
1729
    TAOS_RETURN(code);
9✔
1730
  }
1731

1732
  if (mndFindSuperTableColumnIndex(pOld, newTagName) >= 0) {
63!
1733
    code = TSDB_CODE_MND_COLUMN_ALREADY_EXIST;
×
1734
    TAOS_RETURN(code);
×
1735
  }
1736

1737
  TAOS_CHECK_RETURN(mndAllocStbSchemas(pOld, pNew));
63!
1738

1739
  SSchema *pSchema = (SSchema *)(pNew->pTags + tag);
63✔
1740
  memcpy(pSchema->name, newTagName, TSDB_COL_NAME_LEN);
63✔
1741

1742
  pNew->tagVer++;
63✔
1743
  mInfo("stb:%s, start to modify tag %s to %s", pNew->name, oldTagName, newTagName);
63!
1744
  TAOS_RETURN(code);
63✔
1745
}
1746

1747
static int32_t mndAlterStbTagBytes(SMnode *pMnode, const SStbObj *pOld, SStbObj *pNew, const SField *pField) {
37✔
1748
  int32_t code = 0;
37✔
1749
  int32_t tag = mndFindSuperTableTagIndex(pOld, pField->name);
37✔
1750
  if (tag < 0) {
37!
1751
    code = TSDB_CODE_MND_TAG_NOT_EXIST;
×
1752
    TAOS_RETURN(code);
×
1753
  }
1754

1755
  col_id_t colId = pOld->pTags[tag].colId;
37✔
1756
  TAOS_CHECK_RETURN(mndCheckColAndTagModifiable(pMnode, pOld->name, pOld->uid, colId));
37✔
1757

1758
  uint32_t nLen = 0;
28✔
1759
  for (int32_t i = 0; i < pOld->numOfTags; ++i) {
128✔
1760
    nLen += (pOld->pTags[i].colId == colId) ? pField->bytes : pOld->pTags[i].bytes;
100✔
1761
  }
1762

1763
  if (nLen > TSDB_MAX_TAGS_LEN) {
28!
1764
    code = TSDB_CODE_PAR_INVALID_TAGS_LENGTH;
×
1765
    TAOS_RETURN(code);
×
1766
  }
1767

1768
  TAOS_CHECK_RETURN(mndAllocStbSchemas(pOld, pNew));
28!
1769

1770
  SSchema *pTag = pNew->pTags + tag;
28✔
1771

1772
  if (!(pTag->type == TSDB_DATA_TYPE_BINARY || pTag->type == TSDB_DATA_TYPE_VARBINARY ||
28!
1773
        pTag->type == TSDB_DATA_TYPE_NCHAR || pTag->type == TSDB_DATA_TYPE_GEOMETRY)) {
8!
1774
    code = TSDB_CODE_MND_INVALID_STB_OPTION;
×
1775
    TAOS_RETURN(code);
×
1776
  }
1777

1778
  if (pField->bytes <= pTag->bytes) {
28!
1779
    code = TSDB_CODE_MND_INVALID_ROW_BYTES;
×
1780
    TAOS_RETURN(code);
×
1781
  }
1782

1783
  pTag->bytes = pField->bytes;
28✔
1784
  pNew->tagVer++;
28✔
1785

1786
  mInfo("stb:%s, start to modify tag len %s to %d", pNew->name, pField->name, pField->bytes);
28!
1787
  TAOS_RETURN(code);
28✔
1788
}
1789

1790
static int32_t mndUpdateSuperTableColumnCompress(SMnode *pMnode, const SStbObj *pOld, SStbObj *pNew, SArray *pField,
25✔
1791
                                                 int32_t nCols) {
1792
  // if (pColCmpr == NULL || colName == NULL) return -1;
1793

1794
  if (taosArrayGetSize(pField) != nCols) return TSDB_CODE_FAILED;
25!
1795
  TAOS_FIELD *p = taosArrayGet(pField, 0);
25✔
1796

1797
  int32_t code = 0;
25✔
1798
  int32_t idx = mndFindSuperTableColumnIndex(pOld, p->name);
25✔
1799
  if (idx == -1) {
25!
1800
    code = TSDB_CODE_MND_COLUMN_NOT_EXIST;
×
1801
    TAOS_RETURN(code);
×
1802
  }
1803
  SSchema *pTarget = &pOld->pColumns[idx];
25✔
1804
  col_id_t colId = pTarget->colId;
25✔
1805
  TAOS_CHECK_RETURN(mndCheckColAndTagModifiable(pMnode, pOld->name, pOld->uid, colId));
25!
1806

1807
  TAOS_CHECK_RETURN(mndAllocStbSchemas(pOld, pNew));
25!
1808
  code = validColCmprByType(pTarget->type, p->bytes);
25✔
1809
  if (code != TSDB_CODE_SUCCESS) {
25✔
1810
    TAOS_RETURN(code);
1✔
1811
  }
1812

1813
  int8_t updated = 0;
24✔
1814
  for (int i = 0; i < pNew->numOfColumns; i++) {
158!
1815
    SColCmpr *pCmpr = &pNew->pCmpr[i];
158✔
1816
    if (pCmpr->id == colId) {
158✔
1817
      uint32_t dst = 0;
24✔
1818
      updated = tUpdateCompress(pCmpr->alg, p->bytes, TSDB_COLVAL_COMPRESS_DISABLED, TSDB_COLVAL_LEVEL_DISABLED,
24✔
1819
                                TSDB_COLVAL_LEVEL_MEDIUM, &dst);
1820
      if (updated > 0) pCmpr->alg = dst;
24✔
1821
      break;
24✔
1822
    }
1823
  }
1824

1825
  if (updated == 0) {
24✔
1826
    code = TSDB_CODE_MND_COLUMN_COMPRESS_ALREADY_EXIST;
7✔
1827
    TAOS_RETURN(code);
7✔
1828
  } else if (updated == -1) {
17!
1829
    code = TSDB_CODE_TSC_COMPRESS_LEVEL_ERROR;
×
1830
    TAOS_RETURN(code);
×
1831
  }
1832

1833
  pNew->colVer++;
17✔
1834

1835
  TAOS_RETURN(code);
17✔
1836
}
1837
static int32_t mndAddSuperTableColumn(const SStbObj *pOld, SStbObj *pNew, SArray *pFields, int32_t ncols,
175✔
1838
                                      int8_t withCompress) {
1839
  int32_t code = 0;
175✔
1840
  if (pOld->numOfColumns + ncols + pOld->numOfTags > TSDB_MAX_COLUMNS) {
175!
1841
    code = TSDB_CODE_MND_TOO_MANY_COLUMNS;
×
1842
    TAOS_RETURN(code);
×
1843
  }
1844

1845
  if ((code = grantCheck(TSDB_GRANT_TIMESERIES)) != 0) {
175!
1846
    TAOS_RETURN(code);
×
1847
  }
1848

1849
  if (!mndValidateSchema(pOld->pColumns, pOld->numOfColumns, pFields, TSDB_MAX_BYTES_PER_ROW)) {
175!
1850
    code = TSDB_CODE_PAR_INVALID_ROW_LENGTH;
×
1851
    TAOS_RETURN(code);
×
1852
  }
1853

1854
  pNew->numOfColumns = pNew->numOfColumns + ncols;
175✔
1855

1856
  TAOS_CHECK_RETURN(mndAllocStbSchemas(pOld, pNew));
175!
1857

1858
  if (pNew->nextColId < 0 || pNew->nextColId >= 0x7fff - ncols) {
175!
1859
    code = TSDB_CODE_OUT_OF_RANGE;
×
1860
    TAOS_RETURN(code);
×
1861
  }
1862

1863
  for (int32_t i = 0; i < ncols; i++) {
337✔
1864
    if (withCompress) {
175✔
1865
      SFieldWithOptions *pField = taosArrayGet(pFields, i);
2✔
1866
      if (mndFindSuperTableColumnIndex(pOld, pField->name) >= 0) {
2!
1867
        code = TSDB_CODE_MND_COLUMN_ALREADY_EXIST;
×
1868
        TAOS_RETURN(code);
×
1869
      }
1870

1871
      if (mndFindSuperTableTagIndex(pOld, pField->name) >= 0) {
2!
1872
        code = TSDB_CODE_MND_TAG_ALREADY_EXIST;
×
1873
        TAOS_RETURN(code);
×
1874
      }
1875

1876
      SSchema *pSchema = &pNew->pColumns[pOld->numOfColumns + i];
2✔
1877
      pSchema->bytes = pField->bytes;
2✔
1878
      pSchema->type = pField->type;
2✔
1879
      memcpy(pSchema->name, pField->name, TSDB_COL_NAME_LEN);
2✔
1880
      pSchema->colId = pNew->nextColId;
2✔
1881
      pNew->nextColId++;
2✔
1882

1883
      SColCmpr *pCmpr = &pNew->pCmpr[pOld->numOfColumns + i];
2✔
1884
      pCmpr->id = pSchema->colId;
2✔
1885
      pCmpr->alg = pField->compress;
2✔
1886
      mInfo("stb:%s, start to add column %s", pNew->name, pSchema->name);
2!
1887
    } else {
1888
      SField *pField = taosArrayGet(pFields, i);
173✔
1889
      if (mndFindSuperTableColumnIndex(pOld, pField->name) >= 0) {
173✔
1890
        code = TSDB_CODE_MND_COLUMN_ALREADY_EXIST;
7✔
1891
        TAOS_RETURN(code);
7✔
1892
      }
1893

1894
      if (mndFindSuperTableTagIndex(pOld, pField->name) >= 0) {
166✔
1895
        code = TSDB_CODE_MND_TAG_ALREADY_EXIST;
6✔
1896
        TAOS_RETURN(code);
6✔
1897
      }
1898

1899
      SSchema *pSchema = &pNew->pColumns[pOld->numOfColumns + i];
160✔
1900
      pSchema->bytes = pField->bytes;
160✔
1901
      pSchema->type = pField->type;
160✔
1902
      memcpy(pSchema->name, pField->name, TSDB_COL_NAME_LEN);
160✔
1903
      pSchema->colId = pNew->nextColId;
160✔
1904
      pNew->nextColId++;
160✔
1905

1906
      SColCmpr *pCmpr = &pNew->pCmpr[pOld->numOfColumns + i];
160✔
1907
      pCmpr->id = pSchema->colId;
160✔
1908
      pCmpr->alg = createDefaultColCmprByType(pSchema->type);
160✔
1909
      mInfo("stb:%s, start to add column %s", pNew->name, pSchema->name);
160!
1910
    }
1911
  }
1912

1913
  pNew->colVer++;
162✔
1914
  TAOS_RETURN(code);
162✔
1915
}
1916

1917
static int32_t mndDropSuperTableColumn(SMnode *pMnode, const SStbObj *pOld, SStbObj *pNew, const char *colName) {
191✔
1918
  int32_t code = 0;
191✔
1919
  int32_t col = mndFindSuperTableColumnIndex(pOld, colName);
191✔
1920
  if (col < 0) {
191✔
1921
    code = TSDB_CODE_MND_COLUMN_NOT_EXIST;
6✔
1922
    TAOS_RETURN(code);
6✔
1923
  }
1924

1925
  if (col == 0) {
185✔
1926
    code = TSDB_CODE_MND_INVALID_STB_ALTER_OPTION;
4✔
1927
    TAOS_RETURN(code);
4✔
1928
  }
1929

1930
  if (pOld->numOfColumns == 2) {
181!
1931
    code = TSDB_CODE_PAR_INVALID_DROP_COL;
×
1932
    TAOS_RETURN(code);
×
1933
  }
1934

1935
  col_id_t colId = pOld->pColumns[col].colId;
181✔
1936
  TAOS_CHECK_RETURN(mndCheckColAndTagModifiable(pMnode, pOld->name, pOld->uid, colId));
181✔
1937

1938
  TAOS_CHECK_RETURN(mndAllocStbSchemas(pOld, pNew));
155!
1939

1940
  int32_t sz = pNew->numOfColumns - col - 1;
155✔
1941
  memmove(pNew->pColumns + col, pNew->pColumns + col + 1, sizeof(SSchema) * sz);
155✔
1942
  memmove(pNew->pCmpr + col, pNew->pCmpr + col + 1, sizeof(SColCmpr) * sz);
155✔
1943
  pNew->numOfColumns--;
155✔
1944

1945
  pNew->colVer++;
155✔
1946
  mInfo("stb:%s, start to drop col %s", pNew->name, colName);
155!
1947
  TAOS_RETURN(code);
155✔
1948
}
1949

1950
static int32_t mndAlterStbColumnBytes(SMnode *pMnode, const SStbObj *pOld, SStbObj *pNew, const SField *pField) {
55✔
1951
  int32_t code = 0;
55✔
1952
  int32_t col = mndFindSuperTableColumnIndex(pOld, pField->name);
55✔
1953
  if (col < 0) {
55✔
1954
    code = TSDB_CODE_MND_COLUMN_NOT_EXIST;
1✔
1955
    TAOS_RETURN(code);
1✔
1956
  }
1957

1958
  col_id_t colId = pOld->pColumns[col].colId;
54✔
1959

1960
  uint32_t nLen = 0;
54✔
1961
  for (int32_t i = 0; i < pOld->numOfColumns; ++i) {
319✔
1962
    nLen += (pOld->pColumns[i].colId == colId) ? pField->bytes : pOld->pColumns[i].bytes;
265✔
1963
  }
1964

1965
  if (nLen > TSDB_MAX_BYTES_PER_ROW) {
54!
1966
    code = TSDB_CODE_MND_INVALID_ROW_BYTES;
×
1967
    TAOS_RETURN(code);
×
1968
  }
1969

1970
  TAOS_CHECK_RETURN(mndCheckColAndTagModifiable(pMnode, pOld->name, pOld->uid, colId));
54✔
1971

1972
  TAOS_CHECK_RETURN(mndAllocStbSchemas(pOld, pNew));
42!
1973

1974
  SSchema *pCol = pNew->pColumns + col;
42✔
1975
  if (!(pCol->type == TSDB_DATA_TYPE_BINARY || pCol->type == TSDB_DATA_TYPE_VARBINARY ||
42✔
1976
        pCol->type == TSDB_DATA_TYPE_NCHAR || pCol->type == TSDB_DATA_TYPE_GEOMETRY)) {
8!
1977
    code = TSDB_CODE_MND_INVALID_STB_OPTION;
×
1978
    TAOS_RETURN(code);
×
1979
  }
1980

1981
  if (pField->bytes <= pCol->bytes) {
42!
1982
    code = TSDB_CODE_MND_INVALID_ROW_BYTES;
×
1983
    TAOS_RETURN(code);
×
1984
  }
1985

1986
  pCol->bytes = pField->bytes;
42✔
1987
  pNew->colVer++;
42✔
1988

1989
  mInfo("stb:%s, start to modify col len %s to %d", pNew->name, pField->name, pField->bytes);
42!
1990
  TAOS_RETURN(code);
42✔
1991
}
1992

1993
static int32_t mndSetAlterStbPrepareLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
892✔
1994
  int32_t  code = 0;
892✔
1995
  SSdbRaw *pRedoRaw = mndStbActionEncode(pStb);
892✔
1996
  if (pRedoRaw == NULL) {
892!
1997
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1998
    if (terrno != 0) code = terrno;
×
1999
    TAOS_RETURN(code);
×
2000
  }
2001
  if ((code = mndTransAppendPrepareLog(pTrans, pRedoRaw)) != 0) {
892!
2002
    sdbFreeRaw(pRedoRaw);
×
2003
    TAOS_RETURN(code);
×
2004
  }
2005
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY));
892!
2006

2007
  TAOS_RETURN(code);
892✔
2008
}
2009

2010
static int32_t mndSetAlterStbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
892✔
2011
  int32_t  code = 0;
892✔
2012
  SSdbRaw *pCommitRaw = mndStbActionEncode(pStb);
892✔
2013
  if (pCommitRaw == NULL) {
892!
2014
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2015
    if (terrno != 0) code = terrno;
×
2016
    TAOS_RETURN(code);
×
2017
  }
2018
  if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
892!
2019
    sdbFreeRaw(pCommitRaw);
×
2020
    TAOS_RETURN(code);
×
2021
  }
2022
  TAOS_CHECK_RETURN(sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY));
892!
2023

2024
  TAOS_RETURN(code);
892✔
2025
}
2026

2027
static int32_t mndSetAlterStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb, void *alterOriData,
892✔
2028
                                         int32_t alterOriDataLen) {
2029
  int32_t code = 0;
892✔
2030
  SSdb   *pSdb = pMnode->pSdb;
892✔
2031
  SVgObj *pVgroup = NULL;
892✔
2032
  void   *pIter = NULL;
892✔
2033
  int32_t contLen;
2034

2035
  while (1) {
5,603✔
2036
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
6,495✔
2037
    if (pIter == NULL) break;
6,495✔
2038
    if (!mndVgroupInDb(pVgroup, pDb->uid)) {
5,603✔
2039
      sdbRelease(pSdb, pVgroup);
3,505✔
2040
      continue;
3,505✔
2041
    }
2042

2043
    void *pReq = mndBuildVCreateStbReq(pMnode, pVgroup, pStb, &contLen, alterOriData, alterOriDataLen);
2,098✔
2044
    if (pReq == NULL) {
2,098!
2045
      sdbCancelFetch(pSdb, pIter);
×
2046
      sdbRelease(pSdb, pVgroup);
×
2047
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2048
      if (terrno != 0) code = terrno;
×
2049
      TAOS_RETURN(code);
×
2050
    }
2051
    STransAction action = {0};
2,098✔
2052
    action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
2,098✔
2053
    action.pCont = pReq;
2,098✔
2054
    action.contLen = contLen;
2,098✔
2055
    action.msgType = TDMT_VND_ALTER_STB;
2,098✔
2056
    if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
2,098!
2057
      taosMemoryFree(pReq);
×
2058
      sdbCancelFetch(pSdb, pIter);
×
2059
      sdbRelease(pSdb, pVgroup);
×
2060
      TAOS_RETURN(code);
×
2061
    }
2062
    sdbRelease(pSdb, pVgroup);
2,098✔
2063
  }
2064

2065
  TAOS_RETURN(code);
892✔
2066
}
2067

2068
static int32_t mndSetAlterStbRedoActions2(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb,
×
2069
                                          void *alterOriData, int32_t alterOriDataLen) {
2070
  int32_t code = 0;
×
2071
  SSdb   *pSdb = pMnode->pSdb;
×
2072
  SVgObj *pVgroup = NULL;
×
2073
  void   *pIter = NULL;
×
2074
  int32_t contLen;
2075

2076
  while (1) {
×
2077
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
×
2078
    if (pIter == NULL) break;
×
2079
    if (!mndVgroupInDb(pVgroup, pDb->uid)) {
×
2080
      sdbRelease(pSdb, pVgroup);
×
2081
      continue;
×
2082
    }
2083

2084
    void *pReq = mndBuildVCreateStbReq(pMnode, pVgroup, pStb, &contLen, alterOriData, alterOriDataLen);
×
2085
    if (pReq == NULL) {
×
2086
      sdbCancelFetch(pSdb, pIter);
×
2087
      sdbRelease(pSdb, pVgroup);
×
2088
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2089
      if (terrno != 0) code = terrno;
×
2090
      TAOS_RETURN(code);
×
2091
    }
2092
    STransAction action = {0};
×
2093
    action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
×
2094
    action.pCont = pReq;
×
2095
    action.contLen = contLen;
×
2096
    action.msgType = TDMT_VND_CREATE_INDEX;
×
2097
    if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
×
2098
      taosMemoryFree(pReq);
×
2099
      sdbCancelFetch(pSdb, pIter);
×
2100
      sdbRelease(pSdb, pVgroup);
×
2101
      TAOS_RETURN(code);
×
2102
    }
2103
    sdbRelease(pSdb, pVgroup);
×
2104
  }
2105

2106
  TAOS_RETURN(code);
×
2107
}
2108
static int32_t mndBuildStbSchemaImp(SDbObj *pDb, SStbObj *pStb, const char *tbName, STableMetaRsp *pRsp) {
796,608✔
2109
  int32_t code = 0;
796,608✔
2110
  taosRLockLatch(&pStb->lock);
796,608✔
2111

2112
  int32_t totalCols = pStb->numOfColumns + pStb->numOfTags;
796,626✔
2113
  pRsp->pSchemas = taosMemoryCalloc(totalCols, sizeof(SSchema));
796,626✔
2114
  if (pRsp->pSchemas == NULL) {
796,615!
2115
    taosRUnLockLatch(&pStb->lock);
×
2116
    code = terrno;
×
2117
    TAOS_RETURN(code);
×
2118
  }
2119
  pRsp->pSchemaExt = taosMemoryCalloc(pStb->numOfColumns, sizeof(SSchemaExt));
796,615✔
2120
  if (pRsp->pSchemaExt == NULL) {
796,619!
2121
    taosRUnLockLatch(&pStb->lock);
×
2122
    code = terrno;
×
2123
    TAOS_RETURN(code);
×
2124
  }
2125

2126
  tstrncpy(pRsp->dbFName, pStb->db, sizeof(pRsp->dbFName));
796,625✔
2127
  tstrncpy(pRsp->tbName, tbName, sizeof(pRsp->tbName));
796,625✔
2128
  tstrncpy(pRsp->stbName, tbName, sizeof(pRsp->stbName));
796,625✔
2129
  pRsp->dbId = pDb->uid;
796,625✔
2130
  pRsp->numOfTags = pStb->numOfTags;
796,625✔
2131
  pRsp->numOfColumns = pStb->numOfColumns;
796,625✔
2132
  pRsp->precision = pDb->cfg.precision;
796,625✔
2133
  pRsp->tableType = TSDB_SUPER_TABLE;
796,625✔
2134
  pRsp->sversion = pStb->colVer;
796,625✔
2135
  pRsp->tversion = pStb->tagVer;
796,625✔
2136
  pRsp->suid = pStb->uid;
796,625✔
2137
  pRsp->tuid = pStb->uid;
796,625✔
2138

2139
  for (int32_t i = 0; i < pStb->numOfColumns; ++i) {
20,823,197✔
2140
    SSchema *pSchema = &pRsp->pSchemas[i];
20,026,572✔
2141
    SSchema *pSrcSchema = &pStb->pColumns[i];
20,026,572✔
2142
    memcpy(pSchema->name, pSrcSchema->name, TSDB_COL_NAME_LEN);
20,026,572✔
2143
    pSchema->type = pSrcSchema->type;
20,026,572✔
2144
    pSchema->flags = pSrcSchema->flags;
20,026,572✔
2145
    pSchema->colId = pSrcSchema->colId;
20,026,572✔
2146
    pSchema->bytes = pSrcSchema->bytes;
20,026,572✔
2147
  }
2148

2149
  for (int32_t i = 0; i < pStb->numOfTags; ++i) {
12,593,111✔
2150
    SSchema *pSchema = &pRsp->pSchemas[i + pStb->numOfColumns];
11,796,486✔
2151
    SSchema *pSrcSchema = &pStb->pTags[i];
11,796,486✔
2152
    memcpy(pSchema->name, pSrcSchema->name, TSDB_COL_NAME_LEN);
11,796,486✔
2153
    pSchema->type = pSrcSchema->type;
11,796,486✔
2154
    pSchema->flags = pSrcSchema->flags;
11,796,486✔
2155
    pSchema->colId = pSrcSchema->colId;
11,796,486✔
2156
    pSchema->bytes = pSrcSchema->bytes;
11,796,486✔
2157
  }
2158
  for (int32_t i = 0; i < pStb->numOfColumns; i++) {
20,823,891✔
2159
    SColCmpr   *pCmpr = &pStb->pCmpr[i];
20,027,266✔
2160
    SSchemaExt *pSchEx = &pRsp->pSchemaExt[i];
20,027,266✔
2161
    pSchEx->colId = pCmpr->id;
20,027,266✔
2162
    pSchEx->compress = pCmpr->alg;
20,027,266✔
2163
  }
2164

2165
  taosRUnLockLatch(&pStb->lock);
796,625✔
2166
  TAOS_RETURN(code);
796,624✔
2167
}
2168

2169
static int32_t mndBuildStbCfgImp(SDbObj *pDb, SStbObj *pStb, const char *tbName, STableCfgRsp *pRsp) {
30✔
2170
  int32_t code = 0;
30✔
2171
  taosRLockLatch(&pStb->lock);
30✔
2172

2173
  int32_t totalCols = pStb->numOfColumns + pStb->numOfTags;
30✔
2174
  pRsp->pSchemas = taosMemoryCalloc(totalCols, sizeof(SSchema));
30✔
2175
  if (pRsp->pSchemas == NULL) {
30!
2176
    taosRUnLockLatch(&pStb->lock);
×
2177
    code = terrno;
×
2178
    TAOS_RETURN(code);
×
2179
  }
2180

2181
  tstrncpy(pRsp->dbFName, pStb->db, sizeof(pRsp->dbFName));
30✔
2182
  tstrncpy(pRsp->tbName, tbName, sizeof(pRsp->tbName));
30✔
2183
  tstrncpy(pRsp->stbName, tbName, sizeof(pRsp->stbName));
30✔
2184
  pRsp->numOfTags = pStb->numOfTags;
30✔
2185
  pRsp->numOfColumns = pStb->numOfColumns;
30✔
2186
  pRsp->tableType = TSDB_SUPER_TABLE;
30✔
2187
  pRsp->delay1 = pStb->maxdelay[0];
30✔
2188
  pRsp->delay2 = pStb->maxdelay[1];
30✔
2189
  pRsp->watermark1 = pStb->watermark[0];
30✔
2190
  pRsp->watermark2 = pStb->watermark[1];
30✔
2191
  pRsp->ttl = pStb->ttl;
30✔
2192
  pRsp->commentLen = pStb->commentLen;
30✔
2193
  if (pStb->commentLen > 0) {
30!
2194
    pRsp->pComment = taosStrdup(pStb->comment);
×
2195
  }
2196

2197
  for (int32_t i = 0; i < pStb->numOfColumns; ++i) {
138✔
2198
    SSchema *pSchema = &pRsp->pSchemas[i];
108✔
2199
    SSchema *pSrcSchema = &pStb->pColumns[i];
108✔
2200
    memcpy(pSchema->name, pSrcSchema->name, TSDB_COL_NAME_LEN);
108✔
2201
    pSchema->type = pSrcSchema->type;
108✔
2202
    pSchema->flags = pSrcSchema->flags;
108✔
2203
    pSchema->colId = pSrcSchema->colId;
108✔
2204
    pSchema->bytes = pSrcSchema->bytes;
108✔
2205
  }
2206

2207
  for (int32_t i = 0; i < pStb->numOfTags; ++i) {
83✔
2208
    SSchema *pSchema = &pRsp->pSchemas[i + pStb->numOfColumns];
53✔
2209
    SSchema *pSrcSchema = &pStb->pTags[i];
53✔
2210
    memcpy(pSchema->name, pSrcSchema->name, TSDB_COL_NAME_LEN);
53✔
2211
    pSchema->type = pSrcSchema->type;
53✔
2212
    pSchema->flags = pSrcSchema->flags;
53✔
2213
    pSchema->colId = pSrcSchema->colId;
53✔
2214
    pSchema->bytes = pSrcSchema->bytes;
53✔
2215
  }
2216

2217
  if (pStb->numOfFuncs > 0) {
30!
2218
    pRsp->pFuncs = taosArrayDup(pStb->pFuncs, NULL);
×
2219
  }
2220

2221
  pRsp->pSchemaExt = taosMemoryCalloc(pStb->numOfColumns, sizeof(SSchemaExt));
30✔
2222
  for (int32_t i = 0; i < pStb->numOfColumns; i++) {
138✔
2223
    SColCmpr *pCmpr = &pStb->pCmpr[i];
108✔
2224

2225
    SSchemaExt *pSchExt = &pRsp->pSchemaExt[i];
108✔
2226
    pSchExt->colId = pCmpr->id;
108✔
2227
    pSchExt->compress = pCmpr->alg;
108✔
2228
  }
2229

2230
  taosRUnLockLatch(&pStb->lock);
30✔
2231
  TAOS_RETURN(code);
30✔
2232
}
2233

2234
static int32_t mndValidateStbVersion(SMnode *pMnode, SSTableVersion *pStbVer, bool *schema, bool *sma) {
76,374✔
2235
  int32_t code = 0;
76,374✔
2236
  char    tbFName[TSDB_TABLE_FNAME_LEN] = {0};
76,374✔
2237
  snprintf(tbFName, sizeof(tbFName), "%s.%s", pStbVer->dbFName, pStbVer->stbName);
76,374✔
2238

2239
  SDbObj *pDb = mndAcquireDb(pMnode, pStbVer->dbFName);
76,374✔
2240
  if (pDb == NULL) {
76,374✔
2241
    code = TSDB_CODE_MND_DB_NOT_SELECTED;
358✔
2242
    TAOS_RETURN(code);
358✔
2243
  }
2244

2245
  if (pDb->uid != pStbVer->dbId) {
76,016✔
2246
    mndReleaseDb(pMnode, pDb);
215✔
2247
    code = TSDB_CODE_MND_DB_NOT_SELECTED;
215✔
2248
    TAOS_RETURN(code);
215✔
2249
  }
2250

2251
  SStbObj *pStb = mndAcquireStb(pMnode, tbFName);
75,801✔
2252
  if (pStb == NULL) {
75,801✔
2253
    mndReleaseDb(pMnode, pDb);
143✔
2254
    code = TSDB_CODE_PAR_TABLE_NOT_EXIST;
143✔
2255
    TAOS_RETURN(code);
143✔
2256
  }
2257

2258
  taosRLockLatch(&pStb->lock);
75,658✔
2259

2260
  if (pStbVer->sversion != pStb->colVer || pStbVer->tversion != pStb->tagVer) {
75,658✔
2261
    *schema = true;
367✔
2262
  } else {
2263
    *schema = false;
75,291✔
2264
  }
2265

2266
  if (pStbVer->smaVer && pStbVer->smaVer != pStb->smaVer) {
75,658!
2267
    *sma = true;
×
2268
  } else {
2269
    *sma = false;
75,658✔
2270
  }
2271

2272
  taosRUnLockLatch(&pStb->lock);
75,658✔
2273

2274
  mndReleaseDb(pMnode, pDb);
75,658✔
2275
  mndReleaseStb(pMnode, pStb);
75,658✔
2276
  return TSDB_CODE_SUCCESS;
75,658✔
2277
}
2278

2279
static int32_t mndBuildStbSchema(SMnode *pMnode, const char *dbFName, const char *tbName, STableMetaRsp *pRsp) {
790,126✔
2280
  int32_t code = 0;
790,126✔
2281
  char    tbFName[TSDB_TABLE_FNAME_LEN] = {0};
790,126✔
2282
  snprintf(tbFName, sizeof(tbFName), "%s.%s", dbFName, tbName);
790,126✔
2283

2284
  SDbObj *pDb = mndAcquireDb(pMnode, dbFName);
790,126✔
2285
  if (pDb == NULL) {
790,115!
2286
    code = TSDB_CODE_MND_DB_NOT_SELECTED;
×
2287
    TAOS_RETURN(code);
×
2288
  }
2289

2290
  SStbObj *pStb = mndAcquireStb(pMnode, tbFName);
790,115✔
2291
  if (pStb == NULL) {
790,114✔
2292
    mndReleaseDb(pMnode, pDb);
1,760✔
2293
    code = TSDB_CODE_PAR_TABLE_NOT_EXIST;
1,760✔
2294
    TAOS_RETURN(code);
1,760✔
2295
  }
2296

2297
  code = mndBuildStbSchemaImp(pDb, pStb, tbName, pRsp);
788,354✔
2298
  mndReleaseDb(pMnode, pDb);
788,359✔
2299
  mndReleaseStb(pMnode, pStb);
788,365✔
2300
  TAOS_RETURN(code);
788,366✔
2301
}
2302

2303
static int32_t mndBuildStbCfg(SMnode *pMnode, const char *dbFName, const char *tbName, STableCfgRsp *pRsp) {
30✔
2304
  int32_t code = 0;
30✔
2305
  char    tbFName[TSDB_TABLE_FNAME_LEN] = {0};
30✔
2306
  snprintf(tbFName, sizeof(tbFName), "%s.%s", dbFName, tbName);
30✔
2307

2308
  SDbObj *pDb = mndAcquireDb(pMnode, dbFName);
30✔
2309
  if (pDb == NULL) {
30!
2310
    code = TSDB_CODE_MND_DB_NOT_SELECTED;
×
2311
    TAOS_RETURN(code);
×
2312
  }
2313

2314
  SStbObj *pStb = mndAcquireStb(pMnode, tbFName);
30✔
2315
  if (pStb == NULL) {
30!
2316
    mndReleaseDb(pMnode, pDb);
×
2317
    code = TSDB_CODE_PAR_TABLE_NOT_EXIST;
×
2318
    TAOS_RETURN(code);
×
2319
  }
2320

2321
  code = mndBuildStbCfgImp(pDb, pStb, tbName, pRsp);
30✔
2322

2323
  mndReleaseDb(pMnode, pDb);
30✔
2324
  mndReleaseStb(pMnode, pStb);
30✔
2325
  TAOS_RETURN(code);
30✔
2326
}
2327

2328
static int32_t mndBuildSMAlterStbRsp(SDbObj *pDb, SStbObj *pObj, void **pCont, int32_t *pLen) {
778✔
2329
  int32_t       code = 0;
778✔
2330
  SEncoder      ec = {0};
778✔
2331
  uint32_t      contLen = 0;
778✔
2332
  SMAlterStbRsp alterRsp = {0};
778✔
2333
  SName         name = {0};
778✔
2334
  TAOS_CHECK_RETURN(tNameFromString(&name, pObj->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE));
778!
2335

2336
  alterRsp.pMeta = taosMemoryCalloc(1, sizeof(STableMetaRsp));
778✔
2337
  if (NULL == alterRsp.pMeta) {
778!
2338
    code = terrno;
×
2339
    TAOS_RETURN(code);
×
2340
  }
2341

2342
  code = mndBuildStbSchemaImp(pDb, pObj, name.tname, alterRsp.pMeta);
778✔
2343
  if (code) {
778!
2344
    tFreeSMAlterStbRsp(&alterRsp);
×
2345
    return code;
×
2346
  }
2347

2348
  tEncodeSize(tEncodeSMAlterStbRsp, &alterRsp, contLen, code);
778!
2349
  if (code) {
778!
2350
    tFreeSMAlterStbRsp(&alterRsp);
×
2351
    return code;
×
2352
  }
2353

2354
  void *cont = taosMemoryMalloc(contLen);
778✔
2355
  if (NULL == cont) {
778!
2356
    code = terrno;
×
2357
    tFreeSMAlterStbRsp(&alterRsp);
×
2358
    TAOS_RETURN(code);
×
2359
  }
2360
  tEncoderInit(&ec, cont, contLen);
778✔
2361
  code = tEncodeSMAlterStbRsp(&ec, &alterRsp);
778✔
2362
  tEncoderClear(&ec);
778✔
2363

2364
  tFreeSMAlterStbRsp(&alterRsp);
778✔
2365

2366
  if (code < 0) TAOS_RETURN(code);
778!
2367

2368
  *pCont = cont;
778✔
2369
  *pLen = contLen;
778✔
2370

2371
  TAOS_RETURN(code);
778✔
2372
}
2373

2374
int32_t mndBuildSMCreateStbRsp(SMnode *pMnode, char *dbFName, char *stbFName, void **pCont, int32_t *pLen) {
7,486✔
2375
  int32_t code = -1;
7,486✔
2376
  SDbObj *pDb = mndAcquireDb(pMnode, dbFName);
7,486✔
2377
  if (NULL == pDb) {
7,486!
2378
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2379
    if (terrno != 0) code = terrno;
×
2380
    TAOS_RETURN(code);
×
2381
  }
2382

2383
  SStbObj *pObj = mndAcquireStb(pMnode, stbFName);
7,486✔
2384
  if (NULL == pObj) {
7,486✔
2385
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
2✔
2386
    if (terrno != 0) code = terrno;
2!
2387
    goto _OVER;
2✔
2388
  }
2389

2390
  SEncoder       ec = {0};
7,484✔
2391
  uint32_t       contLen = 0;
7,484✔
2392
  SMCreateStbRsp stbRsp = {0};
7,484✔
2393
  SName          name = {0};
7,484✔
2394
  TAOS_CHECK_GOTO(tNameFromString(&name, pObj->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE), NULL, _OVER);
7,484!
2395

2396
  stbRsp.pMeta = taosMemoryCalloc(1, sizeof(STableMetaRsp));
7,484✔
2397
  if (NULL == stbRsp.pMeta) {
7,484!
2398
    code = terrno;
×
2399
    goto _OVER;
×
2400
  }
2401

2402
  code = mndBuildStbSchemaImp(pDb, pObj, name.tname, stbRsp.pMeta);
7,484✔
2403
  if (code) {
7,484!
2404
    tFreeSMCreateStbRsp(&stbRsp);
×
2405
    goto _OVER;
×
2406
  }
2407

2408
  tEncodeSize(tEncodeSMCreateStbRsp, &stbRsp, contLen, code);
7,484!
2409
  if (code) {
7,484!
2410
    tFreeSMCreateStbRsp(&stbRsp);
×
2411
    goto _OVER;
×
2412
  }
2413

2414
  void *cont = taosMemoryMalloc(contLen);
7,484✔
2415
  if (NULL == cont) {
7,484!
2416
    code = terrno;
×
2417
    tFreeSMCreateStbRsp(&stbRsp);
×
2418
    goto _OVER;
×
2419
  }
2420
  tEncoderInit(&ec, cont, contLen);
7,484✔
2421
  TAOS_CHECK_GOTO(tEncodeSMCreateStbRsp(&ec, &stbRsp), NULL, _OVER);
7,484!
2422
  tEncoderClear(&ec);
7,484✔
2423

2424
  tFreeSMCreateStbRsp(&stbRsp);
7,484✔
2425

2426
  *pCont = cont;
7,484✔
2427
  *pLen = contLen;
7,484✔
2428

2429
  code = 0;
7,484✔
2430

2431
_OVER:
7,486✔
2432
  if (pObj) {
7,486✔
2433
    mndReleaseStb(pMnode, pObj);
7,484✔
2434
  }
2435

2436
  if (pDb) {
7,486!
2437
    mndReleaseDb(pMnode, pDb);
7,486✔
2438
  }
2439

2440
  TAOS_RETURN(code);
7,486✔
2441
}
2442

2443
static int32_t mndAlterStbImp(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *pStb, bool needRsp,
707✔
2444
                              void *alterOriData, int32_t alterOriDataLen) {
2445
  int32_t code = -1;
707✔
2446
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, "alter-stb");
707✔
2447
  if (pTrans == NULL) {
707!
2448
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2449
    if (terrno != 0) code = terrno;
×
2450
    goto _OVER;
×
2451
  }
2452

2453
  mInfo("trans:%d, used to alter stb:%s", pTrans->id, pStb->name);
707!
2454
  mndTransSetDbName(pTrans, pDb->name, pStb->name);
707✔
2455
  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER);
707!
2456

2457
  if (needRsp) {
707✔
2458
    void   *pCont = NULL;
593✔
2459
    int32_t contLen = 0;
593✔
2460
    TAOS_CHECK_GOTO(mndBuildSMAlterStbRsp(pDb, pStb, &pCont, &contLen), NULL, _OVER);
593!
2461
    mndTransSetRpcRsp(pTrans, pCont, contLen);
593✔
2462
  }
2463

2464
  TAOS_CHECK_GOTO(mndSetAlterStbPrepareLogs(pMnode, pTrans, pDb, pStb), NULL, _OVER);
707!
2465
  TAOS_CHECK_GOTO(mndSetAlterStbCommitLogs(pMnode, pTrans, pDb, pStb), NULL, _OVER);
707!
2466
  TAOS_CHECK_GOTO(mndSetAlterStbRedoActions(pMnode, pTrans, pDb, pStb, alterOriData, alterOriDataLen), NULL, _OVER);
707!
2467
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
707!
2468

2469
  code = 0;
707✔
2470

2471
_OVER:
707✔
2472
  mndTransDrop(pTrans);
707✔
2473
  TAOS_RETURN(code);
707✔
2474
}
2475

2476
static int32_t mndAlterStbAndUpdateTagIdxImp(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *pStb, bool needRsp,
185✔
2477
                                             void *alterOriData, int32_t alterOriDataLen, const SMAlterStbReq *pAlter) {
2478
  int32_t code = -1;
185✔
2479
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, "alter-stb");
185✔
2480
  if (pTrans == NULL) {
185!
2481
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2482
    if (terrno != 0) code = terrno;
×
2483
    goto _OVER;
×
2484
  }
2485

2486
  mInfo("trans:%d, used to alter stb:%s", pTrans->id, pStb->name);
185!
2487
  mndTransSetDbName(pTrans, pDb->name, pStb->name);
185✔
2488

2489
  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER);
185!
2490

2491
  if (needRsp) {
185!
2492
    void   *pCont = NULL;
185✔
2493
    int32_t contLen = 0;
185✔
2494
    TAOS_CHECK_GOTO(mndBuildSMAlterStbRsp(pDb, pStb, &pCont, &contLen), NULL, _OVER);
185!
2495
    mndTransSetRpcRsp(pTrans, pCont, contLen);
185✔
2496
  }
2497

2498
  if (pAlter->alterType == TSDB_ALTER_TABLE_DROP_TAG) {
185✔
2499
    SIdxObj idxObj = {0};
122✔
2500
    SField *pField0 = taosArrayGet(pAlter->pFields, 0);
122✔
2501
    bool    exist = false;
122✔
2502
    if (mndGetIdxsByTagName(pMnode, pStb, pField0->name, &idxObj) == 0) {
122✔
2503
      exist = true;
6✔
2504
    }
2505
    TAOS_CHECK_GOTO(mndSetAlterStbPrepareLogs(pMnode, pTrans, pDb, pStb), NULL, _OVER);
122!
2506
    TAOS_CHECK_GOTO(mndSetAlterStbCommitLogs(pMnode, pTrans, pDb, pStb), NULL, _OVER);
122!
2507

2508
    if (exist == true) {
122✔
2509
      TAOS_CHECK_GOTO(mndSetDropIdxPrepareLogs(pMnode, pTrans, &idxObj), NULL, _OVER);
6!
2510
      TAOS_CHECK_GOTO(mndSetDropIdxCommitLogs(pMnode, pTrans, &idxObj), NULL, _OVER);
6!
2511
    }
2512

2513
    TAOS_CHECK_GOTO(mndSetAlterStbRedoActions(pMnode, pTrans, pDb, pStb, alterOriData, alterOriDataLen), NULL, _OVER);
122!
2514
    TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
122!
2515

2516
  } else if (pAlter->alterType == TSDB_ALTER_TABLE_UPDATE_TAG_NAME) {
63!
2517
    SIdxObj     idxObj = {0};
63✔
2518
    SField     *pField0 = taosArrayGet(pAlter->pFields, 0);
63✔
2519
    SField     *pField1 = taosArrayGet(pAlter->pFields, 1);
63✔
2520
    const char *oTagName = pField0->name;
63✔
2521
    const char *nTagName = pField1->name;
63✔
2522
    bool        exist = false;
63✔
2523

2524
    if (mndGetIdxsByTagName(pMnode, pStb, pField0->name, &idxObj) == 0) {
63✔
2525
      exist = true;
32✔
2526
    }
2527

2528
    TAOS_CHECK_GOTO(mndSetAlterStbPrepareLogs(pMnode, pTrans, pDb, pStb), NULL, _OVER);
63!
2529
    TAOS_CHECK_GOTO(mndSetAlterStbCommitLogs(pMnode, pTrans, pDb, pStb), NULL, _OVER);
63!
2530

2531
    if (exist == true) {
63✔
2532
      memcpy(idxObj.colName, nTagName, strlen(nTagName));
32✔
2533
      idxObj.colName[strlen(nTagName)] = 0;
32✔
2534
      TAOS_CHECK_GOTO(mndSetAlterIdxPrepareLogs(pMnode, pTrans, &idxObj), NULL, _OVER);
32!
2535
      TAOS_CHECK_GOTO(mndSetAlterIdxCommitLogs(pMnode, pTrans, &idxObj), NULL, _OVER);
32!
2536
    }
2537

2538
    TAOS_CHECK_GOTO(mndSetAlterStbRedoActions(pMnode, pTrans, pDb, pStb, alterOriData, alterOriDataLen), NULL, _OVER);
63!
2539
    TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
63!
2540
  }
2541
  code = 0;
185✔
2542

2543
_OVER:
185✔
2544
  mndTransDrop(pTrans);
185✔
2545
  TAOS_RETURN(code);
185✔
2546
}
2547

2548
static int32_t mndAlterStb(SMnode *pMnode, SRpcMsg *pReq, const SMAlterStbReq *pAlter, SDbObj *pDb, SStbObj *pOld) {
956✔
2549
  bool    needRsp = true;
956✔
2550
  int32_t code = -1;
956✔
2551
  SField *pField0 = NULL;
956✔
2552

2553
  SStbObj stbObj = {0};
956✔
2554
  taosRLockLatch(&pOld->lock);
956✔
2555
  memcpy(&stbObj, pOld, sizeof(SStbObj));
956✔
2556
  taosRUnLockLatch(&pOld->lock);
956✔
2557
  stbObj.pColumns = NULL;
956✔
2558
  stbObj.pTags = NULL;
956✔
2559
  stbObj.pFuncs = NULL;
956✔
2560
  stbObj.pCmpr = NULL;
956✔
2561
  stbObj.updateTime = taosGetTimestampMs();
956✔
2562
  stbObj.lock = 0;
956✔
2563
  bool updateTagIndex = false;
956✔
2564
  switch (pAlter->alterType) {
956!
2565
    case TSDB_ALTER_TABLE_ADD_TAG:
201✔
2566
      code = mndAddSuperTableTag(pOld, &stbObj, pAlter->pFields, pAlter->numOfFields);
201✔
2567
      break;
201✔
2568
    case TSDB_ALTER_TABLE_DROP_TAG:
151✔
2569
      pField0 = taosArrayGet(pAlter->pFields, 0);
151✔
2570
      code = mndDropSuperTableTag(pMnode, pOld, &stbObj, pField0->name);
151✔
2571
      updateTagIndex = true;
151✔
2572
      break;
151✔
2573
    case TSDB_ALTER_TABLE_UPDATE_TAG_NAME:
100✔
2574
      code = mndAlterStbTagName(pMnode, pOld, &stbObj, pAlter->pFields);
100✔
2575
      updateTagIndex = true;
100✔
2576
      break;
100✔
2577
    case TSDB_ALTER_TABLE_UPDATE_TAG_BYTES:
37✔
2578
      pField0 = taosArrayGet(pAlter->pFields, 0);
37✔
2579
      code = mndAlterStbTagBytes(pMnode, pOld, &stbObj, pField0);
37✔
2580
      break;
37✔
2581
    case TSDB_ALTER_TABLE_ADD_COLUMN:
173✔
2582
      code = mndAddSuperTableColumn(pOld, &stbObj, pAlter->pFields, pAlter->numOfFields, 0);
173✔
2583
      break;
173✔
2584
    case TSDB_ALTER_TABLE_DROP_COLUMN:
191✔
2585
      pField0 = taosArrayGet(pAlter->pFields, 0);
191✔
2586
      code = mndDropSuperTableColumn(pMnode, pOld, &stbObj, pField0->name);
191✔
2587
      break;
191✔
2588
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES:
55✔
2589
      pField0 = taosArrayGet(pAlter->pFields, 0);
55✔
2590
      code = mndAlterStbColumnBytes(pMnode, pOld, &stbObj, pField0);
55✔
2591
      break;
55✔
2592
    case TSDB_ALTER_TABLE_UPDATE_OPTIONS:
21✔
2593
      needRsp = false;
21✔
2594
      code = mndUpdateStbCommentAndTTL(pOld, &stbObj, pAlter->comment, pAlter->commentLen, pAlter->ttl);
21✔
2595
      break;
21✔
2596
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_COMPRESS:
25✔
2597
      code = mndUpdateSuperTableColumnCompress(pMnode, pOld, &stbObj, pAlter->pFields, pAlter->numOfFields);
25✔
2598
      break;
25✔
2599
    case TSDB_ALTER_TABLE_ADD_COLUMN_WITH_COMPRESS_OPTION:
2✔
2600
      code = mndAddSuperTableColumn(pOld, &stbObj, pAlter->pFields, pAlter->numOfFields, 1);
2✔
2601
      break;
2✔
2602
    default:
×
2603
      needRsp = false;
×
2604
      terrno = TSDB_CODE_OPS_NOT_SUPPORT;
×
2605
      break;
×
2606
  }
2607

2608
  if (code != 0) goto _OVER;
956✔
2609
  if (updateTagIndex == false) {
799✔
2610
    code = mndAlterStbImp(pMnode, pReq, pDb, &stbObj, needRsp, pReq->pCont, pReq->contLen);
614✔
2611
  } else {
2612
    code = mndAlterStbAndUpdateTagIdxImp(pMnode, pReq, pDb, &stbObj, needRsp, pReq->pCont, pReq->contLen, pAlter);
185✔
2613
  }
2614

2615
_OVER:
956✔
2616
  taosMemoryFreeClear(stbObj.pTags);
956✔
2617
  taosMemoryFreeClear(stbObj.pColumns);
956✔
2618
  taosMemoryFreeClear(stbObj.pCmpr);
956✔
2619
  if (pAlter->commentLen > 0) {
956✔
2620
    taosMemoryFreeClear(stbObj.comment);
16!
2621
  }
2622
  TAOS_RETURN(code);
956✔
2623
}
2624

2625
static int32_t mndProcessAlterStbReq(SRpcMsg *pReq) {
956✔
2626
  SMnode       *pMnode = pReq->info.node;
956✔
2627
  int32_t       code = -1;
956✔
2628
  SDbObj       *pDb = NULL;
956✔
2629
  SStbObj      *pStb = NULL;
956✔
2630
  SMAlterStbReq alterReq = {0};
956✔
2631

2632
  if (tDeserializeSMAlterStbReq(pReq->pCont, pReq->contLen, &alterReq) != 0) {
956!
2633
    code = TSDB_CODE_INVALID_MSG;
×
2634
    goto _OVER;
×
2635
  }
2636

2637
  mInfo("stb:%s, start to alter", alterReq.name);
956!
2638
  if (mndCheckAlterStbReq(&alterReq) != 0) goto _OVER;
956!
2639

2640
  pDb = mndAcquireDbByStb(pMnode, alterReq.name);
956✔
2641
  if (pDb == NULL) {
956!
2642
    code = TSDB_CODE_MND_DB_NOT_EXIST;
×
2643
    goto _OVER;
×
2644
  }
2645

2646
  pStb = mndAcquireStb(pMnode, alterReq.name);
956✔
2647
  if (pStb == NULL) {
956!
2648
    code = TSDB_CODE_MND_STB_NOT_EXIST;
×
2649
    goto _OVER;
×
2650
  }
2651

2652
  if ((code = mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb)) != 0) {
956!
2653
    goto _OVER;
×
2654
  }
2655

2656
  code = mndAlterStb(pMnode, pReq, &alterReq, pDb, pStb);
956✔
2657
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
956✔
2658

2659
  SName   name = {0};
956✔
2660
  int32_t ret = 0;
956✔
2661
  if ((ret = tNameFromString(&name, alterReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE)) != 0)
956!
2662
    mError("stb:%s, failed to tNameFromString since %s", alterReq.name, tstrerror(ret));
×
2663

2664
  auditRecord(pReq, pMnode->clusterId, "alterStb", name.dbname, name.tname, alterReq.sql, alterReq.sqlLen);
956✔
2665

2666
_OVER:
956✔
2667
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
956!
2668
    mError("stb:%s, failed to alter since %s", alterReq.name, tstrerror(code));
157!
2669
  }
2670

2671
  mndReleaseStb(pMnode, pStb);
956✔
2672
  mndReleaseDb(pMnode, pDb);
956✔
2673
  tFreeSMAltertbReq(&alterReq);
956✔
2674

2675
  TAOS_RETURN(code);
956✔
2676
}
2677

2678
static int32_t mndSetDropStbPrepareLogs(SMnode *pMnode, STrans *pTrans, SStbObj *pStb) {
925✔
2679
  int32_t  code = 0;
925✔
2680
  SSdbRaw *pRedoRaw = mndStbActionEncode(pStb);
925✔
2681
  if (pRedoRaw == NULL) {
925!
2682
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2683
    if (terrno != 0) code = terrno;
×
2684
    TAOS_RETURN(code);
×
2685
  }
2686
  if ((code = mndTransAppendPrepareLog(pTrans, pRedoRaw)) != 0) {
925!
2687
    sdbFreeRaw(pRedoRaw);
×
2688
    TAOS_RETURN(code);
×
2689
  }
2690
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING));
925!
2691

2692
  TAOS_RETURN(code);
925✔
2693
}
2694

2695
static int32_t mndSetDropStbCommitLogs(SMnode *pMnode, STrans *pTrans, SStbObj *pStb) {
925✔
2696
  int32_t  code = 0;
925✔
2697
  SSdbRaw *pCommitRaw = mndStbActionEncode(pStb);
925✔
2698
  if (pCommitRaw == NULL) {
925!
2699
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2700
    if (terrno != 0) code = terrno;
×
2701
    TAOS_RETURN(code);
×
2702
  }
2703
  if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
925!
2704
    sdbFreeRaw(pCommitRaw);
×
2705
    TAOS_RETURN(code);
×
2706
  }
2707
  TAOS_CHECK_RETURN(sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED));
925!
2708

2709
  TAOS_RETURN(code);
925✔
2710
}
2711

2712
static int32_t mndSetDropStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
925✔
2713
  int32_t code = 0;
925✔
2714
  SSdb   *pSdb = pMnode->pSdb;
925✔
2715
  SVgObj *pVgroup = NULL;
925✔
2716
  void   *pIter = NULL;
925✔
2717

2718
  while (1) {
3,963✔
2719
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
4,888✔
2720
    if (pIter == NULL) break;
4,888✔
2721
    if (!mndVgroupInDb(pVgroup, pDb->uid)) {
3,963✔
2722
      sdbRelease(pSdb, pVgroup);
1,773✔
2723
      continue;
1,773✔
2724
    }
2725

2726
    int32_t contLen = 0;
2,190✔
2727
    void   *pReq = mndBuildVDropStbReq(pMnode, pVgroup, pStb, &contLen);
2,190✔
2728
    if (pReq == NULL) {
2,190!
2729
      sdbCancelFetch(pSdb, pIter);
×
2730
      sdbRelease(pSdb, pVgroup);
×
2731
      code = TSDB_CODE_OUT_OF_MEMORY;
×
2732
      TAOS_RETURN(code);
×
2733
    }
2734

2735
    STransAction action = {0};
2,190✔
2736
    action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
2,190✔
2737
    action.pCont = pReq;
2,190✔
2738
    action.contLen = contLen;
2,190✔
2739
    action.msgType = TDMT_VND_DROP_STB;
2,190✔
2740
    action.acceptableCode = TSDB_CODE_TDB_STB_NOT_EXIST;
2,190✔
2741
    if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
2,190!
2742
      taosMemoryFree(pReq);
×
2743
      sdbCancelFetch(pSdb, pIter);
×
2744
      sdbRelease(pSdb, pVgroup);
×
2745
      TAOS_RETURN(code);
×
2746
    }
2747
    sdbRelease(pSdb, pVgroup);
2,190✔
2748
  }
2749

2750
  TAOS_RETURN(code);
925✔
2751
}
2752

2753
static int32_t mndDropStb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *pStb) {
925✔
2754
  int32_t code = -1;
925✔
2755
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, "drop-stb");
925✔
2756
  if (pTrans == NULL) {
925!
2757
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2758
    if (terrno != 0) code = terrno;
×
2759
    goto _OVER;
×
2760
  }
2761

2762
  mInfo("trans:%d, used to drop stb:%s", pTrans->id, pStb->name);
925!
2763
  mndTransSetDbName(pTrans, pDb->name, pStb->name);
925✔
2764
  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER);
925!
2765

2766
  TAOS_CHECK_GOTO(mndSetDropStbPrepareLogs(pMnode, pTrans, pStb), NULL, _OVER);
925!
2767
  TAOS_CHECK_GOTO(mndSetDropStbCommitLogs(pMnode, pTrans, pStb), NULL, _OVER);
925!
2768
  TAOS_CHECK_GOTO(mndSetDropStbRedoActions(pMnode, pTrans, pDb, pStb), NULL, _OVER);
925!
2769
  TAOS_CHECK_GOTO(mndDropIdxsByStb(pMnode, pTrans, pDb, pStb), NULL, _OVER);
925!
2770
  TAOS_CHECK_GOTO(mndDropSmasByStb(pMnode, pTrans, pDb, pStb), NULL, _OVER);
925!
2771
  TAOS_CHECK_GOTO(mndUserRemoveStb(pMnode, pTrans, pStb->name), NULL, _OVER);
925!
2772
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
925!
2773
  code = 0;
925✔
2774

2775
_OVER:
925✔
2776
  mndTransDrop(pTrans);
925✔
2777
  TAOS_RETURN(code);
925✔
2778
}
2779

2780
static int32_t mndCheckDropStbForTopic(SMnode *pMnode, const char *stbFullName, int64_t suid) {
929✔
2781
  int32_t code = 0;
929✔
2782
  SSdb   *pSdb = pMnode->pSdb;
929✔
2783
  void   *pIter = NULL;
929✔
2784
  while (1) {
80✔
2785
    SMqTopicObj *pTopic = NULL;
1,009✔
2786
    pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pTopic);
1,009✔
2787
    if (pIter == NULL) break;
1,009✔
2788

2789
    if (pTopic->subType == TOPIC_SUB_TYPE__TABLE) {
80✔
2790
      if (pTopic->stbUid == suid) {
14!
2791
        sdbRelease(pSdb, pTopic);
×
2792
        sdbCancelFetch(pSdb, pIter);
×
2793
        TAOS_RETURN(-1);
×
2794
      }
2795
    }
2796

2797
    if (pTopic->ast == NULL) {
80✔
2798
      sdbRelease(pSdb, pTopic);
38✔
2799
      continue;
38✔
2800
    }
2801

2802
    SNode *pAst = NULL;
42✔
2803
    if (nodesStringToNode(pTopic->ast, &pAst) != 0) {
42!
2804
      code = TSDB_CODE_MND_INVALID_TOPIC_OPTION;
×
2805
      mError("topic:%s, create ast error", pTopic->name);
×
2806
      sdbRelease(pSdb, pTopic);
×
2807
      sdbCancelFetch(pSdb, pIter);
×
2808
      TAOS_RETURN(code);
×
2809
    }
2810

2811
    SNodeList *pNodeList = NULL;
42✔
2812
    if ((code = nodesCollectColumns((SSelectStmt *)pAst, SQL_CLAUSE_FROM, NULL, COLLECT_COL_TYPE_ALL, &pNodeList)) !=
42!
2813
        0) {
2814
      sdbRelease(pSdb, pTopic);
×
2815
      sdbCancelFetch(pSdb, pIter);
×
2816
      TAOS_RETURN(code);
×
2817
    }
2818
    SNode *pNode = NULL;
42✔
2819
    FOREACH(pNode, pNodeList) {
42✔
2820
      SColumnNode *pCol = (SColumnNode *)pNode;
28✔
2821

2822
      if (pCol->tableId == suid) {
28!
2823
        sdbRelease(pSdb, pTopic);
×
2824
        nodesDestroyNode(pAst);
×
2825
        nodesDestroyList(pNodeList);
×
2826
        sdbCancelFetch(pSdb, pIter);
×
2827
        TAOS_RETURN(-1);
×
2828
      } else {
2829
        goto NEXT;
28✔
2830
      }
2831
    }
2832
  NEXT:
14✔
2833
    sdbRelease(pSdb, pTopic);
42✔
2834
    nodesDestroyNode(pAst);
42✔
2835
    nodesDestroyList(pNodeList);
42✔
2836
  }
2837
  TAOS_RETURN(code);
929✔
2838
}
2839

2840
static int32_t mndCheckDropStbForStream(SMnode *pMnode, const char *stbFullName, int64_t suid) {
929✔
2841
  int32_t code = 0;
929✔
2842
  SSdb   *pSdb = pMnode->pSdb;
929✔
2843
  void   *pIter = NULL;
929✔
2844
  while (1) {
124✔
2845
    SStreamObj *pStream = NULL;
1,053✔
2846
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
1,053✔
2847
    if (pIter == NULL) break;
1,053✔
2848

2849
    if (pStream->targetStbUid == suid) {
128!
2850
      sdbCancelFetch(pSdb, pIter);
×
2851
      sdbRelease(pSdb, pStream);
×
2852
      TAOS_RETURN(-1);
4✔
2853
    }
2854

2855
    SNode *pAst = NULL;
128✔
2856
    if (nodesStringToNode(pStream->ast, &pAst) != 0) {
128!
2857
      code = TSDB_CODE_MND_INVALID_STREAM_OPTION;
×
2858
      mError("stream:%s, create ast error", pStream->name);
×
2859
      sdbCancelFetch(pSdb, pIter);
×
2860
      sdbRelease(pSdb, pStream);
×
2861
      TAOS_RETURN(code);
×
2862
    }
2863

2864
    SNodeList *pNodeList = NULL;
128✔
2865
    if ((code = nodesCollectColumns((SSelectStmt *)pAst, SQL_CLAUSE_FROM, NULL, COLLECT_COL_TYPE_ALL, &pNodeList)) !=
128!
2866
        0) {
2867
      sdbCancelFetch(pSdb, pIter);
×
2868
      sdbRelease(pSdb, pStream);
×
2869
      TAOS_RETURN(code);
×
2870
    }
2871
    SNode *pNode = NULL;
128✔
2872
    FOREACH(pNode, pNodeList) {
128!
2873
      SColumnNode *pCol = (SColumnNode *)pNode;
128✔
2874

2875
      if (pCol->tableId == suid) {
128✔
2876
        sdbCancelFetch(pSdb, pIter);
4✔
2877
        sdbRelease(pSdb, pStream);
4✔
2878
        nodesDestroyNode(pAst);
4✔
2879
        nodesDestroyList(pNodeList);
4✔
2880
        TAOS_RETURN(-1);
4✔
2881
      } else {
2882
        goto NEXT;
124✔
2883
      }
2884
    }
2885
  NEXT:
×
2886
    sdbRelease(pSdb, pStream);
124✔
2887
    nodesDestroyNode(pAst);
124✔
2888
    nodesDestroyList(pNodeList);
124✔
2889
  }
2890
  TAOS_RETURN(code);
925✔
2891
}
2892

2893
static int32_t mndProcessDropTtltbRsp(SRpcMsg *pRsp) { return 0; }
×
2894
static int32_t mndProcessTrimDbRsp(SRpcMsg *pRsp) { return 0; }
210✔
2895
static int32_t mndProcessS3MigrateDbRsp(SRpcMsg *pRsp) { return 0; }
×
2896

2897
static int32_t mndProcessDropStbReq(SRpcMsg *pReq) {
1,205✔
2898
  SMnode      *pMnode = pReq->info.node;
1,205✔
2899
  int32_t      code = -1;
1,205✔
2900
  SDbObj      *pDb = NULL;
1,205✔
2901
  SStbObj     *pStb = NULL;
1,205✔
2902
  SMDropStbReq dropReq = {0};
1,205✔
2903

2904
  TAOS_CHECK_GOTO(tDeserializeSMDropStbReq(pReq->pCont, pReq->contLen, &dropReq), NULL, _OVER);
1,205!
2905

2906
  mInfo("stb:%s, start to drop", dropReq.name);
1,205!
2907

2908
  pStb = mndAcquireStb(pMnode, dropReq.name);
1,205✔
2909
  if (pStb == NULL) {
1,205✔
2910
    if (dropReq.igNotExists) {
276✔
2911
      mInfo("stb:%s, not exist, ignore not exist is set", dropReq.name);
275!
2912
      code = 0;
275✔
2913
      goto _OVER;
275✔
2914
    } else {
2915
      code = TSDB_CODE_MND_STB_NOT_EXIST;
1✔
2916
      goto _OVER;
1✔
2917
    }
2918
  }
2919

2920
  if ((dropReq.source == TD_REQ_FROM_TAOX_OLD || dropReq.source == TD_REQ_FROM_TAOX) && pStb->uid != dropReq.suid) {
929!
2921
    code = 0;
×
2922
    goto _OVER;
×
2923
  }
2924

2925
  pDb = mndAcquireDbByStb(pMnode, dropReq.name);
929✔
2926
  if (pDb == NULL) {
929!
2927
    code = TSDB_CODE_MND_DB_NOT_SELECTED;
×
2928
    goto _OVER;
×
2929
  }
2930

2931
  if ((code = mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb)) != 0) {
929!
2932
    goto _OVER;
×
2933
  }
2934

2935
  if (mndCheckDropStbForTopic(pMnode, dropReq.name, pStb->uid) < 0) {
929!
2936
    code = TSDB_CODE_MND_TOPIC_MUST_BE_DELETED;
×
2937
    goto _OVER;
×
2938
  }
2939

2940
  if (mndCheckDropStbForStream(pMnode, dropReq.name, pStb->uid) < 0) {
929✔
2941
    code = TSDB_CODE_MND_STREAM_MUST_BE_DELETED;
4✔
2942
    goto _OVER;
4✔
2943
  }
2944

2945
  code = mndDropStb(pMnode, pReq, pDb, pStb);
925✔
2946
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
925!
2947

2948
  SName   name = {0};
925✔
2949
  int32_t ret = 0;
925✔
2950
  if ((ret = tNameFromString(&name, dropReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE)) != 0)
925!
2951
    mError("stb:%s, failed to tNameFromString since %s", dropReq.name, tstrerror(ret));
×
2952

2953
  auditRecord(pReq, pMnode->clusterId, "dropStb", name.dbname, name.tname, dropReq.sql, dropReq.sqlLen);
925✔
2954

2955
_OVER:
1,205✔
2956
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
1,205✔
2957
    mError("stb:%s, failed to drop since %s", dropReq.name, tstrerror(code));
5!
2958
  }
2959

2960
  mndReleaseDb(pMnode, pDb);
1,205✔
2961
  mndReleaseStb(pMnode, pStb);
1,205✔
2962
  tFreeSMDropStbReq(&dropReq);
1,205✔
2963
  TAOS_RETURN(code);
1,205✔
2964
}
2965

2966
static int32_t mndProcessTableMetaReq(SRpcMsg *pReq) {
920,550✔
2967
  SMnode       *pMnode = pReq->info.node;
920,550✔
2968
  int32_t       code = -1;
920,550✔
2969
  STableInfoReq infoReq = {0};
920,550✔
2970
  STableMetaRsp metaRsp = {0};
920,550✔
2971
  SUserObj     *pUser = NULL;
920,550✔
2972

2973
  code = mndAcquireUser(pMnode, pReq->info.conn.user, &pUser);
920,550✔
2974
  if (pUser == NULL) return 0;
920,565!
2975
  bool sysinfo = pUser->sysInfo;
920,565✔
2976

2977
  TAOS_CHECK_GOTO(tDeserializeSTableInfoReq(pReq->pCont, pReq->contLen, &infoReq), NULL, _OVER);
920,565!
2978

2979
  if (0 == strcmp(infoReq.dbFName, TSDB_INFORMATION_SCHEMA_DB)) {
920,570✔
2980
    mInfo("information_schema table:%s.%s, start to retrieve meta", infoReq.dbFName, infoReq.tbName);
105,982!
2981
    TAOS_CHECK_GOTO(mndBuildInsTableSchema(pMnode, infoReq.dbFName, infoReq.tbName, sysinfo, &metaRsp), NULL, _OVER);
105,982✔
2982
  } else if (0 == strcmp(infoReq.dbFName, TSDB_PERFORMANCE_SCHEMA_DB)) {
814,588✔
2983
    mInfo("performance_schema table:%s.%s, start to retrieve meta", infoReq.dbFName, infoReq.tbName);
24,833!
2984
    TAOS_CHECK_GOTO(mndBuildPerfsTableSchema(pMnode, infoReq.dbFName, infoReq.tbName, &metaRsp), NULL, _OVER);
24,833✔
2985
  } else {
2986
    mInfo("stb:%s.%s, start to retrieve meta", infoReq.dbFName, infoReq.tbName);
789,755✔
2987
    TAOS_CHECK_GOTO(mndBuildStbSchema(pMnode, infoReq.dbFName, infoReq.tbName, &metaRsp), NULL, _OVER);
789,779✔
2988
  }
2989

2990
  int32_t rspLen = tSerializeSTableMetaRsp(NULL, 0, &metaRsp);
918,590✔
2991
  if (rspLen < 0) {
918,569!
2992
    code = TSDB_CODE_INVALID_MSG;
×
2993
    goto _OVER;
×
2994
  }
2995

2996
  void *pRsp = rpcMallocCont(rspLen);
918,569✔
2997
  if (pRsp == NULL) {
918,554!
2998
    code = terrno;
×
2999
    goto _OVER;
×
3000
  }
3001

3002
  if ((rspLen = tSerializeSTableMetaRsp(pRsp, rspLen, &metaRsp)) < 0) {
918,554!
3003
    code = rspLen;
×
3004
    goto _OVER;
×
3005
  }
3006
  pReq->info.rsp = pRsp;
918,581✔
3007
  pReq->info.rspLen = rspLen;
918,581✔
3008
  code = 0;
918,581✔
3009

3010
  mTrace("%s.%s, meta is retrieved", infoReq.dbFName, infoReq.tbName);
918,581✔
3011

3012
_OVER:
917,022✔
3013
  if (code != 0) {
920,565✔
3014
    mError("stb:%s.%s, failed to retrieve meta since %s", infoReq.dbFName, infoReq.tbName, tstrerror(code));
1,984!
3015
  }
3016

3017
  mndReleaseUser(pMnode, pUser);
920,565✔
3018
  tFreeSTableMetaRsp(&metaRsp);
920,574✔
3019
  // TODO change to TAOS_RETURN
3020
  return code;
920,569✔
3021
}
3022

3023
static int32_t mndProcessTableCfgReq(SRpcMsg *pReq) {
30✔
3024
  SMnode      *pMnode = pReq->info.node;
30✔
3025
  int32_t      code = -1;
30✔
3026
  STableCfgReq cfgReq = {0};
30✔
3027
  STableCfgRsp cfgRsp = {0};
30✔
3028

3029
  TAOS_CHECK_GOTO(tDeserializeSTableCfgReq(pReq->pCont, pReq->contLen, &cfgReq), NULL, _OVER);
30!
3030

3031
  char dbName[TSDB_DB_NAME_LEN] = {0};
30✔
3032
  TAOS_CHECK_GOTO(mndExtractShortDbNameFromDbFullName(cfgReq.dbFName, dbName), NULL, _OVER);
30!
3033
  if (0 == strcmp(dbName, TSDB_INFORMATION_SCHEMA_DB)) {
30!
3034
    mInfo("information_schema table:%s.%s, start to retrieve cfg", cfgReq.dbFName, cfgReq.tbName);
×
3035
    TAOS_CHECK_GOTO(mndBuildInsTableCfg(pMnode, cfgReq.dbFName, cfgReq.tbName, &cfgRsp), NULL, _OVER);
×
3036
  } else if (0 == strcmp(dbName, TSDB_PERFORMANCE_SCHEMA_DB)) {
30!
3037
    mInfo("performance_schema table:%s.%s, start to retrieve cfg", cfgReq.dbFName, cfgReq.tbName);
×
3038
    TAOS_CHECK_GOTO(mndBuildPerfsTableCfg(pMnode, cfgReq.dbFName, cfgReq.tbName, &cfgRsp), NULL, _OVER);
×
3039
  } else {
3040
    mInfo("stb:%s.%s, start to retrieve cfg", cfgReq.dbFName, cfgReq.tbName);
30!
3041
    TAOS_CHECK_GOTO(mndBuildStbCfg(pMnode, cfgReq.dbFName, cfgReq.tbName, &cfgRsp), NULL, _OVER);
30!
3042
  }
3043

3044
  int32_t rspLen = tSerializeSTableCfgRsp(NULL, 0, &cfgRsp);
30✔
3045
  if (rspLen < 0) {
30!
3046
    code = TSDB_CODE_INVALID_MSG;
×
3047
    goto _OVER;
×
3048
  }
3049

3050
  void *pRsp = rpcMallocCont(rspLen);
30✔
3051
  if (pRsp == NULL) {
30!
3052
    code = terrno;
×
3053
    goto _OVER;
×
3054
  }
3055

3056
  if ((rspLen = tSerializeSTableCfgRsp(pRsp, rspLen, &cfgRsp)) < 0) {
30!
3057
    code = rspLen;
×
3058
    goto _OVER;
×
3059
  }
3060
  pReq->info.rsp = pRsp;
30✔
3061
  pReq->info.rspLen = rspLen;
30✔
3062
  code = 0;
30✔
3063

3064
  mTrace("%s.%s, cfg is retrieved", cfgReq.dbFName, cfgReq.tbName);
30✔
3065

3066
_OVER:
26✔
3067
  if (code != 0) {
30!
3068
    mError("stb:%s.%s, failed to retrieve cfg since %s", cfgReq.dbFName, cfgReq.tbName, tstrerror(code));
×
3069
  }
3070

3071
  tFreeSTableCfgRsp(&cfgRsp);
30✔
3072
  TAOS_RETURN(code);
30✔
3073
}
3074

3075
int32_t mndValidateStbInfo(SMnode *pMnode, SSTableVersion *pStbVersions, int32_t numOfStbs, void **ppRsp,
62,878✔
3076
                           int32_t *pRspLen) {
3077
  int32_t   code = 0;
62,878✔
3078
  SSTbHbRsp hbRsp = {0};
62,878✔
3079
  hbRsp.pMetaRsp = taosArrayInit(numOfStbs, sizeof(STableMetaRsp));
62,878✔
3080
  if (hbRsp.pMetaRsp == NULL) {
62,878!
3081
    code = terrno;
×
3082
    TAOS_RETURN(code);
×
3083
  }
3084

3085
  hbRsp.pIndexRsp = taosArrayInit(numOfStbs, sizeof(STableIndexRsp));
62,878✔
3086
  if (NULL == hbRsp.pIndexRsp) {
62,878!
3087
    taosArrayDestroy(hbRsp.pMetaRsp);
×
3088
    code = terrno;
×
3089
    TAOS_RETURN(code);
×
3090
  }
3091

3092
  for (int32_t i = 0; i < numOfStbs; ++i) {
139,252✔
3093
    SSTableVersion *pStbVersion = &pStbVersions[i];
76,374✔
3094
    pStbVersion->suid = be64toh(pStbVersion->suid);
76,374✔
3095
    pStbVersion->sversion = ntohl(pStbVersion->sversion);
76,374✔
3096
    pStbVersion->tversion = ntohl(pStbVersion->tversion);
76,374✔
3097
    pStbVersion->smaVer = ntohl(pStbVersion->smaVer);
76,374✔
3098

3099
    bool    schema = false;
76,374✔
3100
    bool    sma = false;
76,374✔
3101
    int32_t code = mndValidateStbVersion(pMnode, pStbVersion, &schema, &sma);
76,374✔
3102
    if (TSDB_CODE_SUCCESS != code) {
76,374✔
3103
      STableMetaRsp metaRsp = {0};
716✔
3104
      metaRsp.numOfColumns = -1;
716✔
3105
      metaRsp.suid = pStbVersion->suid;
716✔
3106
      tstrncpy(metaRsp.dbFName, pStbVersion->dbFName, sizeof(metaRsp.dbFName));
716✔
3107
      tstrncpy(metaRsp.tbName, pStbVersion->stbName, sizeof(metaRsp.tbName));
716✔
3108
      tstrncpy(metaRsp.stbName, pStbVersion->stbName, sizeof(metaRsp.stbName));
716✔
3109
      if (taosArrayPush(hbRsp.pMetaRsp, &metaRsp) == NULL) {
1,432!
3110
        code = terrno;
×
3111
        return code;
×
3112
      }
3113
      continue;
716✔
3114
    }
3115

3116
    if (schema) {
75,658✔
3117
      STableMetaRsp metaRsp = {0};
367✔
3118
      mInfo("stb:%s.%s, start to retrieve meta", pStbVersion->dbFName, pStbVersion->stbName);
367!
3119
      if (mndBuildStbSchema(pMnode, pStbVersion->dbFName, pStbVersion->stbName, &metaRsp) != 0) {
367!
3120
        metaRsp.numOfColumns = -1;
×
3121
        metaRsp.suid = pStbVersion->suid;
×
3122
        tstrncpy(metaRsp.dbFName, pStbVersion->dbFName, sizeof(metaRsp.dbFName));
×
3123
        tstrncpy(metaRsp.tbName, pStbVersion->stbName, sizeof(metaRsp.tbName));
×
3124
        tstrncpy(metaRsp.stbName, pStbVersion->stbName, sizeof(metaRsp.stbName));
×
3125
        if (taosArrayPush(hbRsp.pMetaRsp, &metaRsp) == NULL) {
×
3126
          code = terrno;
×
3127
          return code;
×
3128
        }
3129
        continue;
×
3130
      }
3131

3132
      if (taosArrayPush(hbRsp.pMetaRsp, &metaRsp) == NULL) {
734!
3133
        code = terrno;
×
3134
        return code;
×
3135
      }
3136
    }
3137

3138
    if (sma) {
75,658!
3139
      bool           exist = false;
×
3140
      char           tbFName[TSDB_TABLE_FNAME_LEN];
3141
      STableIndexRsp indexRsp = {0};
×
3142
      indexRsp.pIndex = taosArrayInit(10, sizeof(STableIndexInfo));
×
3143
      if (NULL == indexRsp.pIndex) {
×
3144
        code = terrno;
×
3145
        TAOS_RETURN(code);
×
3146
      }
3147

3148
      sprintf(tbFName, "%s.%s", pStbVersion->dbFName, pStbVersion->stbName);
×
3149
      int32_t code = mndGetTableSma(pMnode, tbFName, &indexRsp, &exist);
×
3150
      if (code || !exist) {
×
3151
        indexRsp.suid = pStbVersion->suid;
×
3152
        indexRsp.version = -1;
×
3153
        indexRsp.pIndex = NULL;
×
3154
      }
3155

3156
      strcpy(indexRsp.dbFName, pStbVersion->dbFName);
×
3157
      strcpy(indexRsp.tbName, pStbVersion->stbName);
×
3158

3159
      if (taosArrayPush(hbRsp.pIndexRsp, &indexRsp) == NULL) {
×
3160
        code = terrno;
×
3161
        return code;
×
3162
      }
3163
    }
3164
  }
3165

3166
  int32_t rspLen = tSerializeSSTbHbRsp(NULL, 0, &hbRsp);
62,878✔
3167
  if (rspLen < 0) {
62,878!
3168
    tFreeSSTbHbRsp(&hbRsp);
×
3169
    code = TSDB_CODE_INVALID_MSG;
×
3170
    TAOS_RETURN(code);
×
3171
  }
3172

3173
  void *pRsp = taosMemoryMalloc(rspLen);
62,878✔
3174
  if (pRsp == NULL) {
62,878!
3175
    tFreeSSTbHbRsp(&hbRsp);
×
3176
    code = terrno;
×
3177
    TAOS_RETURN(code);
×
3178
  }
3179

3180
  rspLen = tSerializeSSTbHbRsp(pRsp, rspLen, &hbRsp);
62,878✔
3181
  tFreeSSTbHbRsp(&hbRsp);
62,878✔
3182
  if (rspLen < 0) return rspLen;
62,878!
3183
  *ppRsp = pRsp;
62,878✔
3184
  *pRspLen = rspLen;
62,878✔
3185
  TAOS_RETURN(code);
62,878✔
3186
}
3187

3188
int32_t mndGetNumOfStbs(SMnode *pMnode, char *dbName, int32_t *pNumOfStbs) {
8,473✔
3189
  int32_t code = 0;
8,473✔
3190
  SSdb   *pSdb = pMnode->pSdb;
8,473✔
3191
  SDbObj *pDb = mndAcquireDb(pMnode, dbName);
8,473✔
3192
  if (pDb == NULL) {
8,473!
3193
    code = TSDB_CODE_MND_DB_NOT_SELECTED;
×
3194
    TAOS_RETURN(code);
×
3195
  }
3196

3197
  int32_t numOfStbs = 0;
8,473✔
3198
  void   *pIter = NULL;
8,473✔
3199
  while (1) {
179,156✔
3200
    SStbObj *pStb = NULL;
187,629✔
3201
    pIter = sdbFetch(pSdb, SDB_STB, pIter, (void **)&pStb);
187,629✔
3202
    if (pIter == NULL) break;
187,629✔
3203

3204
    if (pStb->dbUid == pDb->uid) {
179,156✔
3205
      numOfStbs++;
84,868✔
3206
    }
3207

3208
    sdbRelease(pSdb, pStb);
179,156✔
3209
  }
3210

3211
  *pNumOfStbs = numOfStbs;
8,473✔
3212
  mndReleaseDb(pMnode, pDb);
8,473✔
3213
  TAOS_RETURN(code);
8,473✔
3214
}
3215

3216
int32_t mndExtractDbNameFromStbFullName(const char *stbFullName, char *dst) {
×
3217
  SName name = {0};
×
3218
  TAOS_CHECK_RETURN(tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE));
×
3219

3220
  TAOS_CHECK_RETURN(tNameGetFullDbName(&name, dst));
×
3221

3222
  return 0;
×
3223
}
3224

3225
int32_t mndExtractShortDbNameFromStbFullName(const char *stbFullName, char *dst) {
2✔
3226
  SName name = {0};
2✔
3227
  TAOS_CHECK_RETURN(tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE));
2!
3228

3229
  TAOS_CHECK_RETURN(tNameGetDbName(&name, dst));
2!
3230

3231
  return 0;
2✔
3232
}
3233

3234
int32_t mndExtractShortDbNameFromDbFullName(const char *stbFullName, char *dst) {
30✔
3235
  SName name = {0};
30✔
3236
  TAOS_CHECK_RETURN(tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB));
30!
3237

3238
  TAOS_CHECK_RETURN(tNameGetDbName(&name, dst));
30!
3239

3240
  return 0;
30✔
3241
}
3242

3243
void mndExtractTbNameFromStbFullName(const char *stbFullName, char *dst, int32_t dstSize) {
1,457,996✔
3244
  int32_t pos = -1;
1,457,996✔
3245
  int32_t num = 0;
1,457,996✔
3246
  for (pos = 0; stbFullName[pos] != 0; ++pos) {
25,478,798✔
3247
    if (stbFullName[pos] == TS_PATH_DELIMITER[0]) num++;
25,478,762✔
3248
    if (num == 2) break;
25,478,762✔
3249
  }
3250

3251
  if (num == 2) {
1,457,996✔
3252
    tstrncpy(dst, stbFullName + pos + 1, dstSize);
1,457,966✔
3253
  }
3254
}
1,457,996✔
3255

3256
// static int32_t mndProcessRetrieveStbReq(SRpcMsg *pReq) {
3257
//   SMnode    *pMnode = pReq->info.node;
3258
//   SShowMgmt *pMgmt = &pMnode->showMgmt;
3259
//   SShowObj  *pShow = NULL;
3260
//   int32_t    rowsToRead = SHOW_STEP_SIZE;
3261
//   int32_t    rowsRead = 0;
3262
//
3263
//   SRetrieveTableReq retrieveReq = {0};
3264
//   if (tDeserializeSRetrieveTableReq(pReq->pCont, pReq->contLen, &retrieveReq) != 0) {
3265
//     terrno = TSDB_CODE_INVALID_MSG;
3266
//     return -1;
3267
//   }
3268
//
3269
//   SMnode    *pMnode = pReq->info.node;
3270
//   SSdb      *pSdb = pMnode->pSdb;
3271
//   int32_t    numOfRows = 0;
3272
//   SDbObj    *pDb = NULL;
3273
//   ESdbStatus objStatus = 0;
3274
//
3275
//   SUserObj *pUser = mndAcquireUser(pMnode, pReq->info.conn.user);
3276
//   if (pUser == NULL) return 0;
3277
//   bool sysinfo = pUser->sysInfo;
3278
//
3279
//   // Append the information_schema database into the result.
3280
////  if (!pShow->sysDbRsp) {
3281
////    SDbObj infoschemaDb = {0};
3282
////    setInformationSchemaDbCfg(pMnode, &infoschemaDb);
3283
////    size_t numOfTables = 0;
3284
////    getVisibleInfosTablesNum(sysinfo, &numOfTables);
3285
////    mndDumpDbInfoData(pMnode, pBlock, &infoschemaDb, pShow, numOfRows, numOfTables, true, 0, 1);
3286
////
3287
////    numOfRows += 1;
3288
////
3289
////    SDbObj perfschemaDb = {0};
3290
////    setPerfSchemaDbCfg(pMnode, &perfschemaDb);
3291
////    numOfTables = 0;
3292
////    getPerfDbMeta(NULL, &numOfTables);
3293
////    mndDumpDbInfoData(pMnode, pBlock, &perfschemaDb, pShow, numOfRows, numOfTables, true, 0, 1);
3294
////
3295
////    numOfRows += 1;
3296
////    pShow->sysDbRsp = true;
3297
////  }
3298
//
3299
//  SSDataBlock* p = buildInfoSchemaTableMetaBlock(TSDB_INS_TABLE_COLS);
3300
//  blockDataEnsureCapacity(p, rowsToRead);
3301
//
3302
//  size_t               size = 0;
3303
//  const SSysTableMeta* pSysDbTableMeta = NULL;
3304
//
3305
//  getInfosDbMeta(&pSysDbTableMeta, &size);
3306
//  p->info.rows = buildDbColsInfoBlock(sysinfo, p, pSysDbTableMeta, size, TSDB_INFORMATION_SCHEMA_DB);
3307
//
3308
//  getPerfDbMeta(&pSysDbTableMeta, &size);
3309
//  p->info.rows = buildDbColsInfoBlock(sysinfo, p, pSysDbTableMeta, size, TSDB_PERFORMANCE_SCHEMA_DB);
3310
//
3311
//  blockDataDestroy(p);
3312
//
3313
//
3314
//  while (numOfRows < rowsToRead) {
3315
//    pShow->pIter = sdbFetchAll(pSdb, SDB_DB, pShow->pIter, (void **)&pDb, &objStatus, true);
3316
//    if (pShow->pIter == NULL) break;
3317
//    if (strncmp(retrieveReq.db, pDb->name, strlen(retrieveReq.db)) != 0){
3318
//      continue;
3319
//    }
3320
//    if (mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_READ_OR_WRITE_DB, pDb) != 0) {
3321
//      continue;
3322
//    }
3323
//
3324
//    while (numOfRows < rowsToRead) {
3325
//      pShow->pIter = sdbFetch(pSdb, SDB_STB, pShow->pIter, (void **)&pStb);
3326
//      if (pShow->pIter == NULL) break;
3327
//
3328
//      if (pDb != NULL && pStb->dbUid != pDb->uid) {
3329
//        sdbRelease(pSdb, pStb);
3330
//        continue;
3331
//      }
3332
//
3333
//      cols = 0;
3334
//
3335
//      SName name = {0};
3336
//      char  stbName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
3337
//      mndExtractTbNameFromStbFullName(pStb->name, &stbName[VARSTR_HEADER_SIZE], TSDB_TABLE_NAME_LEN);
3338
//      varDataSetLen(stbName, strlen(&stbName[VARSTR_HEADER_SIZE]));
3339
//
3340
//      SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
3341
//      colDataSetVal(pColInfo, numOfRows, (const char *)stbName, false);
3342
//
3343
//      char db[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
3344
//      tNameFromString(&name, pStb->db, T_NAME_ACCT | T_NAME_DB);
3345
//      tNameGetDbName(&name, varDataVal(db));
3346
//      varDataSetLen(db, strlen(varDataVal(db)));
3347
//
3348
//      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
3349
//      colDataSetVal(pColInfo, numOfRows, (const char *)db, false);
3350
//
3351
//      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
3352
//      colDataSetVal(pColInfo, numOfRows, (const char *)&pStb->createdTime, false);
3353
//
3354
//      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
3355
//      colDataSetVal(pColInfo, numOfRows, (const char *)&pStb->numOfColumns, false);
3356
//
3357
//      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
3358
//      colDataSetVal(pColInfo, numOfRows, (const char *)&pStb->numOfTags, false);
3359
//
3360
//      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
3361
//      colDataSetVal(pColInfo, numOfRows, (const char *)&pStb->updateTime, false);  // number of tables
3362
//
3363
//      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
3364
//      if (pStb->commentLen > 0) {
3365
//        char comment[TSDB_TB_COMMENT_LEN + VARSTR_HEADER_SIZE] = {0};
3366
//        STR_TO_VARSTR(comment, pStb->comment);
3367
//        colDataSetVal(pColInfo, numOfRows, comment, false);
3368
//      } else if (pStb->commentLen == 0) {
3369
//        char comment[VARSTR_HEADER_SIZE + VARSTR_HEADER_SIZE] = {0};
3370
//        STR_TO_VARSTR(comment, "");
3371
//        colDataSetVal(pColInfo, numOfRows, comment, false);
3372
//      } else {
3373
//        colDataSetNULL(pColInfo, numOfRows);
3374
//      }
3375
//
3376
//      char watermark[64 + VARSTR_HEADER_SIZE] = {0};
3377
//      sprintf(varDataVal(watermark), "%" PRId64 "a,%" PRId64 "a", pStb->watermark[0], pStb->watermark[1]);
3378
//      varDataSetLen(watermark, strlen(varDataVal(watermark)));
3379
//
3380
//      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
3381
//      colDataSetVal(pColInfo, numOfRows, (const char *)watermark, false);
3382
//
3383
//      char maxDelay[64 + VARSTR_HEADER_SIZE] = {0};
3384
//      sprintf(varDataVal(maxDelay), "%" PRId64 "a,%" PRId64 "a", pStb->maxdelay[0], pStb->maxdelay[1]);
3385
//      varDataSetLen(maxDelay, strlen(varDataVal(maxDelay)));
3386
//
3387
//      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
3388
//      colDataSetVal(pColInfo, numOfRows, (const char *)maxDelay, false);
3389
//
3390
//      char    rollup[160 + VARSTR_HEADER_SIZE] = {0};
3391
//      int32_t rollupNum = (int32_t)taosArrayGetSize(pStb->pFuncs);
3392
//      char   *sep = ", ";
3393
//      int32_t sepLen = strlen(sep);
3394
//      int32_t rollupLen = sizeof(rollup) - VARSTR_HEADER_SIZE - 2;
3395
//      for (int32_t i = 0; i < rollupNum; ++i) {
3396
//        char *funcName = taosArrayGet(pStb->pFuncs, i);
3397
//        if (i) {
3398
//          strncat(varDataVal(rollup), sep, rollupLen);
3399
//          rollupLen -= sepLen;
3400
//        }
3401
//        strncat(varDataVal(rollup), funcName, rollupLen);
3402
//        rollupLen -= strlen(funcName);
3403
//      }
3404
//      varDataSetLen(rollup, strlen(varDataVal(rollup)));
3405
//
3406
//      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
3407
//      colDataSetVal(pColInfo, numOfRows, (const char *)rollup, false);
3408
//
3409
//      numOfRows++;
3410
//      sdbRelease(pSdb, pStb);
3411
//    }
3412
//
3413
//    if (pDb != NULL) {
3414
//      mndReleaseDb(pMnode, pDb);
3415
//    }
3416
//
3417
//    sdbRelease(pSdb, pDb);
3418
//  }
3419
//
3420
//  pShow->numOfRows += numOfRows;
3421
//  mndReleaseUser(pMnode, pUser);
3422
//
3423
//
3424
//
3425
//
3426
//
3427
//
3428
//
3429
//
3430
//  ShowRetrieveFp retrieveFp = pMgmt->retrieveFps[pShow->type];
3431
//  if (retrieveFp == NULL) {
3432
//    mndReleaseShowObj(pShow, false);
3433
//    terrno = TSDB_CODE_MSG_NOT_PROCESSED;
3434
//    mError("show:0x%" PRIx64 ", failed to retrieve data since %s", pShow->id, terrstr());
3435
//    return -1;
3436
//  }
3437
//
3438
//  mDebug("show:0x%" PRIx64 ", start retrieve data, type:%d", pShow->id, pShow->type);
3439
//  if (retrieveReq.user[0] != 0) {
3440
//    memcpy(pReq->info.conn.user, retrieveReq.user, TSDB_USER_LEN);
3441
//  } else {
3442
//    memcpy(pReq->info.conn.user, TSDB_DEFAULT_USER, strlen(TSDB_DEFAULT_USER) + 1);
3443
//  }
3444
//  if (retrieveReq.db[0] && mndCheckShowPrivilege(pMnode, pReq->info.conn.user, pShow->type, retrieveReq.db) != 0) {
3445
//    return -1;
3446
//  }
3447
//
3448
//  int32_t numOfCols = pShow->pMeta->numOfColumns;
3449
//
3450
//  SSDataBlock *pBlock = createDataBlock();
3451
//  for (int32_t i = 0; i < numOfCols; ++i) {
3452
//    SColumnInfoData idata = {0};
3453
//
3454
//    SSchema *p = &pShow->pMeta->pSchemas[i];
3455
//
3456
//    idata.info.bytes = p->bytes;
3457
//    idata.info.type = p->type;
3458
//    idata.info.colId = p->colId;
3459
//    blockDataAppendColInfo(pBlock, &idata);
3460
//  }
3461
//
3462
//  blockDataEnsureCapacity(pBlock, rowsToRead);
3463
//
3464
//  if (mndCheckRetrieveFinished(pShow)) {
3465
//    mDebug("show:0x%" PRIx64 ", read finished, numOfRows:%d", pShow->id, pShow->numOfRows);
3466
//    rowsRead = 0;
3467
//  } else {
3468
//    rowsRead = (*retrieveFp)(pReq, pShow, pBlock, rowsToRead);
3469
//    if (rowsRead < 0) {
3470
//      terrno = rowsRead;
3471
//      mDebug("show:0x%" PRIx64 ", retrieve completed", pShow->id);
3472
//      mndReleaseShowObj(pShow, true);
3473
//      blockDataDestroy(pBlock);
3474
//      return -1;
3475
//    }
3476
//
3477
//    pBlock->info.rows = rowsRead;
3478
//    mDebug("show:0x%" PRIx64 ", stop retrieve data, rowsRead:%d numOfRows:%d", pShow->id, rowsRead, pShow->numOfRows);
3479
//  }
3480
//
3481
//  size = sizeof(SRetrieveMetaTableRsp) + sizeof(int32_t) + sizeof(SSysTableSchema) * pShow->pMeta->numOfColumns +
3482
//         blockDataGetSize(pBlock) + blockDataGetSerialMetaSize(taosArrayGetSize(pBlock->pDataBlock));
3483
//
3484
//  SRetrieveMetaTableRsp *pRsp = rpcMallocCont(size);
3485
//  if (pRsp == NULL) {
3486
//    mndReleaseShowObj(pShow, false);
3487
//    terrno = TSDB_CODE_OUT_OF_MEMORY;
3488
//    mError("show:0x%" PRIx64 ", failed to retrieve data since %s", pShow->id, terrstr());
3489
//    blockDataDestroy(pBlock);
3490
//    return -1;
3491
//  }
3492
//
3493
//  pRsp->handle = htobe64(pShow->id);
3494
//
3495
//  if (rowsRead > 0) {
3496
//    char    *pStart = pRsp->data;
3497
//    SSchema *ps = pShow->pMeta->pSchemas;
3498
//
3499
//    *(int32_t *)pStart = htonl(pShow->pMeta->numOfColumns);
3500
//    pStart += sizeof(int32_t);  // number of columns
3501
//
3502
//    for (int32_t i = 0; i < pShow->pMeta->numOfColumns; ++i) {
3503
//      SSysTableSchema *pSchema = (SSysTableSchema *)pStart;
3504
//      pSchema->bytes = htonl(ps[i].bytes);
3505
//      pSchema->colId = htons(ps[i].colId);
3506
//      pSchema->type = ps[i].type;
3507
//
3508
//      pStart += sizeof(SSysTableSchema);
3509
//    }
3510
//
3511
//    int32_t len = blockEncode(pBlock, pStart, pShow->pMeta->numOfColumns);
3512
//  }
3513
//
3514
//  pRsp->numOfRows = htonl(rowsRead);
3515
//  pRsp->precision = TSDB_TIME_PRECISION_MILLI;  // millisecond time precision
3516
//  pReq->info.rsp = pRsp;
3517
//  pReq->info.rspLen = size;
3518
//
3519
//  if (rowsRead == 0 || rowsRead < rowsToRead) {
3520
//    pRsp->completed = 1;
3521
//    mDebug("show:0x%" PRIx64 ", retrieve completed", pShow->id);
3522
//    mndReleaseShowObj(pShow, true);
3523
//  } else {
3524
//    mDebug("show:0x%" PRIx64 ", retrieve not completed yet", pShow->id);
3525
//    mndReleaseShowObj(pShow, false);
3526
//  }
3527
//
3528
//  blockDataDestroy(pBlock);
3529
//  return TSDB_CODE_SUCCESS;
3530
//}
3531

3532
static int32_t mndRetrieveStb(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
11,364✔
3533
  SMnode  *pMnode = pReq->info.node;
11,364✔
3534
  SSdb    *pSdb = pMnode->pSdb;
11,364✔
3535
  int32_t  numOfRows = 0;
11,364✔
3536
  SStbObj *pStb = NULL;
11,364✔
3537
  int32_t  cols = 0;
11,364✔
3538
  int32_t  lino = 0;
11,364✔
3539
  int32_t  code = 0;
11,364✔
3540

3541
  SDbObj *pDb = NULL;
11,364✔
3542
  if (strlen(pShow->db) > 0) {
11,364✔
3543
    pDb = mndAcquireDb(pMnode, pShow->db);
398✔
3544
    if (pDb == NULL) return terrno;
398!
3545
  }
3546

3547
  while (numOfRows < rows) {
802,589✔
3548
    pShow->pIter = sdbFetch(pSdb, SDB_STB, pShow->pIter, (void **)&pStb);
799,605✔
3549
    if (pShow->pIter == NULL) break;
799,239✔
3550

3551
    if (pDb != NULL && pStb->dbUid != pDb->uid) {
790,823✔
3552
      sdbRelease(pSdb, pStb);
782✔
3553
      continue;
794✔
3554
    }
3555

3556
    if (isTsmaResSTb(pStb->name)) {
790,041✔
3557
      sdbRelease(pSdb, pStb);
12✔
3558
      continue;
12✔
3559
    }
3560

3561
    cols = 0;
789,789✔
3562

3563
    SName name = {0};
789,789✔
3564

3565
    char stbName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
789,789✔
3566
    mndExtractTbNameFromStbFullName(pStb->name, &stbName[VARSTR_HEADER_SIZE], TSDB_TABLE_NAME_LEN);
789,789✔
3567
    varDataSetLen(stbName, strlen(&stbName[VARSTR_HEADER_SIZE]));
790,114✔
3568
    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
790,114✔
3569
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)stbName, false), pStb, &lino, _ERROR);
788,749!
3570

3571
    char db[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
786,154✔
3572
    RETRIEVE_CHECK_GOTO(tNameFromString(&name, pStb->db, T_NAME_ACCT | T_NAME_DB), pStb, &lino, _ERROR);
786,154!
3573
    RETRIEVE_CHECK_GOTO(tNameGetDbName(&name, varDataVal(db)), pStb, &lino, _ERROR);
788,141!
3574
    varDataSetLen(db, strlen(varDataVal(db)));
788,633✔
3575
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
788,633✔
3576
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)db, false), pStb, &lino, _ERROR);
787,759!
3577

3578
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
783,717✔
3579
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pStb->createdTime, false), pStb, &lino,
782,396!
3580
                        _ERROR);
3581

3582
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
780,512✔
3583
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pStb->numOfColumns, false), pStb, &lino,
779,238!
3584
                        _ERROR);
3585

3586
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
779,051✔
3587
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pStb->numOfTags, false), pStb, &lino, _ERROR);
778,194!
3588

3589
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
778,272✔
3590
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pStb->updateTime, false), pStb, &lino,
777,015!
3591
                        _ERROR);  // number of tables
3592

3593
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
778,035✔
3594
    if (pStb->commentLen > 0) {
776,876!
3595
      char comment[TSDB_TB_COMMENT_LEN + VARSTR_HEADER_SIZE] = {0};
×
3596
      STR_TO_VARSTR(comment, pStb->comment);
×
3597
      RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, comment, false), pStb, &lino, _ERROR);
×
3598
    } else if (pStb->commentLen == 0) {
777,970✔
3599
      char comment[VARSTR_HEADER_SIZE + VARSTR_HEADER_SIZE] = {0};
23,797✔
3600
      STR_TO_VARSTR(comment, "");
23,797✔
3601
      RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, comment, false), pStb, &lino, _ERROR);
23,797!
3602
    } else {
3603
      colDataSetNULL(pColInfo, numOfRows);
754,173!
3604
    }
3605

3606
    char watermark[64 + VARSTR_HEADER_SIZE] = {0};
778,051✔
3607
    sprintf(varDataVal(watermark), "%" PRId64 "a,%" PRId64 "a", pStb->watermark[0], pStb->watermark[1]);
778,051✔
3608
    varDataSetLen(watermark, strlen(varDataVal(watermark)));
778,051✔
3609

3610
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
778,051✔
3611
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)watermark, false), pStb, &lino, _ERROR);
786,785!
3612

3613
    char maxDelay[64 + VARSTR_HEADER_SIZE] = {0};
782,660✔
3614
    sprintf(varDataVal(maxDelay), "%" PRId64 "a,%" PRId64 "a", pStb->maxdelay[0], pStb->maxdelay[1]);
782,660✔
3615
    varDataSetLen(maxDelay, strlen(varDataVal(maxDelay)));
782,660✔
3616

3617
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
782,660✔
3618
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)maxDelay, false), pStb, &lino, _ERROR);
787,123!
3619

3620
    char    rollup[160 + VARSTR_HEADER_SIZE] = {0};
782,758✔
3621
    int32_t rollupNum = (int32_t)taosArrayGetSize(pStb->pFuncs);
782,758✔
3622
    char   *sep = ", ";
785,245✔
3623
    int32_t sepLen = strlen(sep);
785,245✔
3624
    int32_t rollupLen = sizeof(rollup) - VARSTR_HEADER_SIZE - 2;
785,245✔
3625
    for (int32_t i = 0; i < rollupNum; ++i) {
785,248✔
3626
      char *funcName = taosArrayGet(pStb->pFuncs, i);
3✔
3627
      if (i) {
3!
3628
        (void)strncat(varDataVal(rollup), sep, rollupLen);
×
3629
        rollupLen -= sepLen;
×
3630
      }
3631
      (void)strncat(varDataVal(rollup), funcName, rollupLen);
3✔
3632
      rollupLen -= strlen(funcName);
3✔
3633
    }
3634
    varDataSetLen(rollup, strlen(varDataVal(rollup)));
785,245✔
3635

3636
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
785,245✔
3637
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)rollup, false), pStb, &lino, _ERROR);
785,458!
3638

3639
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
781,422✔
3640
    if (pColInfo) {
779,470!
3641
      RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)(&pStb->uid), false), pStb, &lino, _ERROR);
779,494!
3642
    }
3643

3644
    numOfRows++;
777,955✔
3645
    sdbRelease(pSdb, pStb);
777,955✔
3646
  }
3647

3648
  if (pDb != NULL) {
11,400✔
3649
    mndReleaseDb(pMnode, pDb);
395✔
3650
  }
3651

3652
  goto _OVER;
11,372✔
3653

3654
_ERROR:
×
3655
  mError("show:0x%" PRIx64 ", failed to retrieve data at %s:%d since %s", pShow->id, __FUNCTION__, lino,
×
3656
         tstrerror(code));
3657

3658
_OVER:
×
3659
  pShow->numOfRows += numOfRows;
11,372✔
3660
  return numOfRows;
11,372✔
3661
}
3662

3663
static int32_t buildDbColsInfoBlock(const SSDataBlock *p, const SSysTableMeta *pSysDbTableMeta, size_t size,
26,628✔
3664
                                    const char *dbName, const char *tbName) {
3665
  char    tName[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
26,628✔
3666
  char    dName[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
26,628✔
3667
  char    typeName[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
26,628✔
3668
  int32_t numOfRows = p->info.rows;
26,628✔
3669
  int32_t lino = 0;
26,628✔
3670
  int32_t code = 0;
26,628✔
3671

3672
  STR_TO_VARSTR(dName, dbName);
26,628✔
3673
  STR_TO_VARSTR(typeName, "SYSTEM_TABLE");
26,628✔
3674

3675
  for (int32_t i = 0; i < size; ++i) {
541,291!
3676
    const SSysTableMeta *pm = &pSysDbTableMeta[i];
545,830✔
3677
    //    if (pm->sysInfo) {
3678
    //      continue;
3679
    //    }
3680
    if (tbName[0] && strncmp(tbName, pm->name, TSDB_TABLE_NAME_LEN) != 0) {
545,830!
3681
      continue;
×
3682
    }
3683

3684
    STR_TO_VARSTR(tName, pm->name);
545,830✔
3685

3686
    for (int32_t j = 0; j < pm->colNum; j++) {
5,137,911✔
3687
      // table name
3688
      SColumnInfoData *pColInfoData = taosArrayGet(p->pDataBlock, 0);
4,623,248✔
3689
      TAOS_CHECK_GOTO(colDataSetVal(pColInfoData, numOfRows, tName, false), &lino, _OVER);
4,621,751!
3690

3691
      // database name
3692
      pColInfoData = taosArrayGet(p->pDataBlock, 1);
4,589,735✔
3693
      TAOS_CHECK_GOTO(colDataSetVal(pColInfoData, numOfRows, dName, false), &lino, _OVER);
4,588,188!
3694

3695
      pColInfoData = taosArrayGet(p->pDataBlock, 2);
4,585,675✔
3696
      TAOS_CHECK_GOTO(colDataSetVal(pColInfoData, numOfRows, typeName, false), &lino, _OVER);
4,584,642!
3697

3698
      // col name
3699
      char colName[TSDB_COL_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
4,588,654✔
3700
      STR_TO_VARSTR(colName, pm->schema[j].name);
4,588,654✔
3701
      pColInfoData = taosArrayGet(p->pDataBlock, 3);
4,588,654✔
3702
      TAOS_CHECK_GOTO(colDataSetVal(pColInfoData, numOfRows, colName, false), &lino, _OVER);
4,619,154!
3703

3704
      // col type
3705
      int8_t colType = pm->schema[j].type;
4,594,404✔
3706
      pColInfoData = taosArrayGet(p->pDataBlock, 4);
4,594,404✔
3707
      char colTypeStr[VARSTR_HEADER_SIZE + 32];
3708
      int  colTypeLen = sprintf(varDataVal(colTypeStr), "%s", tDataTypes[colType].name);
4,593,595✔
3709
      if (colType == TSDB_DATA_TYPE_VARCHAR) {
4,593,595✔
3710
        colTypeLen +=
2,489,452✔
3711
            sprintf(varDataVal(colTypeStr) + colTypeLen, "(%d)", (int32_t)(pm->schema[j].bytes - VARSTR_HEADER_SIZE));
2,489,452✔
3712
      } else if (colType == TSDB_DATA_TYPE_NCHAR) {
2,104,143!
3713
        colTypeLen += sprintf(varDataVal(colTypeStr) + colTypeLen, "(%d)",
×
3714
                              (int32_t)((pm->schema[j].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE));
×
3715
      }
3716
      varDataSetLen(colTypeStr, colTypeLen);
4,593,595✔
3717
      TAOS_CHECK_GOTO(colDataSetVal(pColInfoData, numOfRows, (char *)colTypeStr, false), &lino, _OVER);
4,593,595!
3718

3719
      pColInfoData = taosArrayGet(p->pDataBlock, 5);
4,596,813✔
3720
      TAOS_CHECK_GOTO(colDataSetVal(pColInfoData, numOfRows, (const char *)&pm->schema[j].bytes, false), &lino, _OVER);
4,595,731!
3721
      for (int32_t k = 6; k <= 8; ++k) {
18,339,396✔
3722
        pColInfoData = taosArrayGet(p->pDataBlock, k);
13,747,315✔
3723
        colDataSetNULL(pColInfoData, numOfRows);
13,746,157!
3724
      }
3725

3726
      numOfRows += 1;
4,592,081✔
3727
    }
3728
  }
3729
_OVER:
×
3730
  mError("failed at %s:%d since %s", __FUNCTION__, lino, tstrerror(code));
×
3731
  return numOfRows;
26,630✔
3732
}
3733
#define BUILD_COL_FOR_INFO_DB 1
3734
#define BUILD_COL_FOR_PERF_DB 1 << 1
3735
#define BUILD_COL_FOR_USER_DB 1 << 2
3736
#define BUILD_COL_FOR_ALL_DB  (BUILD_COL_FOR_INFO_DB | BUILD_COL_FOR_PERF_DB | BUILD_COL_FOR_USER_DB)
3737

3738
static int32_t buildSysDbColsInfo(SSDataBlock *p, int8_t buildWhichDBs, char *tb) {
13,314✔
3739
  size_t               size = 0;
13,314✔
3740
  const SSysTableMeta *pSysDbTableMeta = NULL;
13,314✔
3741

3742
  if (buildWhichDBs & BUILD_COL_FOR_INFO_DB) {
13,314!
3743
    getInfosDbMeta(&pSysDbTableMeta, &size);
13,314✔
3744
    p->info.rows = buildDbColsInfoBlock(p, pSysDbTableMeta, size, TSDB_INFORMATION_SCHEMA_DB, tb);
13,314✔
3745
  }
3746

3747
  if (buildWhichDBs & BUILD_COL_FOR_PERF_DB) {
13,315!
3748
    getPerfDbMeta(&pSysDbTableMeta, &size);
13,315✔
3749
    p->info.rows = buildDbColsInfoBlock(p, pSysDbTableMeta, size, TSDB_PERFORMANCE_SCHEMA_DB, tb);
13,315✔
3750
  }
3751

3752
  return p->info.rows;
13,315✔
3753
}
3754

3755
static int8_t determineBuildColForWhichDBs(const char *db) {
13,314✔
3756
  int8_t buildWhichDBs;
3757
  if (!db[0])
13,314!
3758
    buildWhichDBs = BUILD_COL_FOR_ALL_DB;
13,315✔
3759
  else {
3760
    char *p = strchr(db, '.');
×
3761
    if (p && strcmp(p + 1, TSDB_INFORMATION_SCHEMA_DB) == 0) {
×
3762
      buildWhichDBs = BUILD_COL_FOR_INFO_DB;
×
3763
    } else if (p && strcmp(p + 1, TSDB_PERFORMANCE_SCHEMA_DB) == 0) {
×
3764
      buildWhichDBs = BUILD_COL_FOR_PERF_DB;
×
3765
    } else {
3766
      buildWhichDBs = BUILD_COL_FOR_USER_DB;
×
3767
    }
3768
  }
3769
  return buildWhichDBs;
13,314✔
3770
}
3771

3772
static int32_t mndRetrieveStbCol(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
13,314✔
3773
  uint8_t  buildWhichDBs;
3774
  SMnode  *pMnode = pReq->info.node;
13,314✔
3775
  SSdb    *pSdb = pMnode->pSdb;
13,314✔
3776
  SStbObj *pStb = NULL;
13,314✔
3777
  int32_t  numOfRows = 0;
13,314✔
3778
  int32_t  lino = 0;
13,314✔
3779
  int32_t  code = 0;
13,314✔
3780

3781
  buildWhichDBs = determineBuildColForWhichDBs(pShow->db);
13,314✔
3782

3783
  if (!pShow->sysDbRsp) {
13,315!
3784
    numOfRows = buildSysDbColsInfo(pBlock, buildWhichDBs, pShow->filterTb);
13,315✔
3785
    mDebug("mndRetrieveStbCol get system table cols, rows:%d, db:%s", numOfRows, pShow->db);
13,315✔
3786
    pShow->sysDbRsp = true;
13,315✔
3787
  }
3788

3789
  if (buildWhichDBs & BUILD_COL_FOR_USER_DB) {
13,315!
3790
    SDbObj *pDb = NULL;
13,315✔
3791
    if (strlen(pShow->db) > 0) {
13,315!
3792
      pDb = mndAcquireDb(pMnode, pShow->db);
×
3793
      if (pDb == NULL && TSDB_CODE_MND_DB_NOT_EXIST != terrno && pBlock->info.rows == 0) return terrno;
×
3794
    }
3795

3796
    char typeName[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
13,315✔
3797
    STR_TO_VARSTR(typeName, "SUPER_TABLE");
13,315✔
3798
    bool fetch = pShow->restore ? false : true;
13,315✔
3799
    pShow->restore = false;
13,315✔
3800
    while (numOfRows < rows) {
680,943✔
3801
      if (fetch) {
680,941!
3802
        pShow->pIter = sdbFetch(pSdb, SDB_STB, pShow->pIter, (void **)&pStb);
680,941✔
3803
        if (pShow->pIter == NULL) break;
680,915✔
3804
      } else {
3805
        fetch = true;
×
3806
        void *pKey = taosHashGetKey(pShow->pIter, NULL);
×
3807
        pStb = sdbAcquire(pSdb, SDB_STB, pKey);
×
3808
        if (!pStb) continue;
×
3809
      }
3810

3811
      if (pDb != NULL && pStb->dbUid != pDb->uid) {
667,600!
3812
        sdbRelease(pSdb, pStb);
×
3813
        continue;
×
3814
      }
3815

3816
      SName name = {0};
667,600✔
3817
      char  stbName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
667,600✔
3818
      mndExtractTbNameFromStbFullName(pStb->name, &stbName[VARSTR_HEADER_SIZE], TSDB_TABLE_NAME_LEN);
667,600✔
3819
      if (pShow->filterTb[0] && strncmp(pShow->filterTb, &stbName[VARSTR_HEADER_SIZE], TSDB_TABLE_NAME_LEN) != 0) {
667,572!
3820
        sdbRelease(pSdb, pStb);
×
3821
        continue;
×
3822
      }
3823

3824
      if ((numOfRows + pStb->numOfColumns) > rows) {
667,572!
3825
        pShow->restore = true;
×
3826
        if (numOfRows == 0) {
×
3827
          mError("mndRetrieveStbCol failed to get stable cols since buf:%d less than result:%d, stable name:%s, db:%s",
×
3828
                 rows, pStb->numOfColumns, pStb->name, pStb->db);
3829
        }
3830
        sdbRelease(pSdb, pStb);
×
3831
        break;
×
3832
      }
3833

3834
      varDataSetLen(stbName, strlen(&stbName[VARSTR_HEADER_SIZE]));
667,572✔
3835

3836
      mDebug("mndRetrieveStbCol get stable cols, stable name:%s, db:%s", pStb->name, pStb->db);
667,572✔
3837

3838
      char db[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
667,572✔
3839
      RETRIEVE_CHECK_GOTO(tNameFromString(&name, pStb->db, T_NAME_ACCT | T_NAME_DB), pStb, &lino, _OVER);
667,572!
3840
      RETRIEVE_CHECK_GOTO(tNameGetDbName(&name, varDataVal(db)), pStb, &lino, _OVER);
667,477!
3841
      varDataSetLen(db, strlen(varDataVal(db)));
667,544✔
3842

3843
      for (int i = 0; i < pStb->numOfColumns; i++) {
15,396,672✔
3844
        int32_t          cols = 0;
15,173,952✔
3845
        SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
15,173,952✔
3846
        RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)stbName, false), pStb, &lino, _OVER);
15,205,642!
3847

3848
        pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
14,913,292✔
3849
        RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)db, false), pStb, &lino, _OVER);
14,807,224!
3850

3851
        pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
14,741,119✔
3852
        RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, typeName, false), pStb, &lino, _OVER);
14,675,803!
3853

3854
        // col name
3855
        char colName[TSDB_COL_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
14,727,730✔
3856
        STR_TO_VARSTR(colName, pStb->pColumns[i].name);
14,727,730✔
3857
        pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
14,727,730✔
3858
        RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, colName, false), pStb, &lino, _OVER);
15,116,459!
3859

3860
        // col type
3861
        int8_t colType = pStb->pColumns[i].type;
14,922,731✔
3862
        pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
14,922,731✔
3863
        char colTypeStr[VARSTR_HEADER_SIZE + 32];
3864
        int  colTypeLen = sprintf(varDataVal(colTypeStr), "%s", tDataTypes[colType].name);
14,840,946✔
3865
        if (colType == TSDB_DATA_TYPE_VARCHAR) {
14,840,946✔
3866
          colTypeLen += sprintf(varDataVal(colTypeStr) + colTypeLen, "(%d)",
1,236,458✔
3867
                                (int32_t)(pStb->pColumns[i].bytes - VARSTR_HEADER_SIZE));
1,236,458✔
3868
        } else if (colType == TSDB_DATA_TYPE_NCHAR) {
13,604,488✔
3869
          colTypeLen += sprintf(varDataVal(colTypeStr) + colTypeLen, "(%d)",
1,249,435✔
3870
                                (int32_t)((pStb->pColumns[i].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE));
1,249,435✔
3871
        }
3872
        varDataSetLen(colTypeStr, colTypeLen);
14,840,946✔
3873
        RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (char *)colTypeStr, false), pStb, &lino, _OVER);
14,840,946!
3874

3875
        pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
15,087,150✔
3876
        RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pStb->pColumns[i].bytes, false), pStb,
14,966,912!
3877
                            &lino, _OVER);
3878
        while (cols < pShow->numOfColumns) {
58,209,387✔
3879
          pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
43,480,259✔
3880
          colDataSetNULL(pColInfo, numOfRows);
43,305,135✔
3881
        }
3882
        numOfRows++;
14,729,128✔
3883
      }
3884

3885
      sdbRelease(pSdb, pStb);
222,720✔
3886
    }
3887

3888
    if (pDb != NULL) {
13,317!
3889
      mndReleaseDb(pMnode, pDb);
×
3890
    }
3891
  }
3892

3893
  mDebug("mndRetrieveStbCol success, rows:%d, pShow->numOfRows:%d", numOfRows, pShow->numOfRows);
13,315✔
3894
  goto _OVER;
13,315✔
3895

3896
_ERROR:
3897
  mError("failed to mndRetrieveStbCol, rows:%d, pShow->numOfRows:%d, at %s:%d since %s", numOfRows, pShow->numOfRows,
3898
         __FUNCTION__, lino, tstrerror(code));
3899

3900
_OVER:
13,315✔
3901
  pShow->numOfRows += numOfRows;
13,315✔
3902
  return numOfRows;
13,315✔
3903
}
3904

3905
static void mndCancelGetNextStb(SMnode *pMnode, void *pIter) {
×
3906
  SSdb *pSdb = pMnode->pSdb;
×
3907
  sdbCancelFetchByType(pSdb, pIter, SDB_STB);
×
3908
}
×
3909

3910
const char *mndGetStbStr(const char *src) {
33,074✔
3911
  char *posDb = strstr(src, TS_PATH_DELIMITER);
33,074✔
3912
  if (posDb != NULL) ++posDb;
33,074!
3913
  if (posDb == NULL) return src;
33,074!
3914

3915
  char *posStb = strstr(posDb, TS_PATH_DELIMITER);
33,074✔
3916
  if (posStb != NULL) ++posStb;
33,074!
3917
  if (posStb == NULL) return posDb;
33,074!
3918
  return posStb;
33,074✔
3919
}
3920

3921
static int32_t mndCheckIndexReq(SCreateTagIndexReq *pReq) {
×
3922
  // impl
3923
  return TSDB_CODE_SUCCESS;
×
3924
}
3925

3926
/*int32_t mndAddIndexImpl(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *pStb, bool needRsp, void *sql,
3927
                        int32_t len) {
3928
  // impl later
3929
  int32_t code = 0;
3930
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, "create-stb-index");
3931
  if (pTrans == NULL) goto _OVER;
3932

3933
  mInfo("trans:%d, used to add index to stb:%s", pTrans->id, pStb->name);
3934
  mndTransSetDbName(pTrans, pDb->name, pStb->name);
3935
  if (mndTransCheckConflict(pMnode, pTrans) != 0) goto _OVER;
3936

3937
  if (mndSetAlterStbPrepareLogs(pMnode, pTrans, pDb, pStb) != 0) goto _OVER;
3938
  if (mndSetAlterStbCommitLogs(pMnode, pTrans, pDb, pStb) != 0) goto _OVER;
3939
  if (mndSetAlterStbRedoActions2(pMnode, pTrans, pDb, pStb, sql, len) != 0) goto _OVER;
3940
  if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
3941

3942
  return code;
3943

3944
_OVER:
3945
  mndTransDrop(pTrans);
3946
  return code;
3947
}
3948
static int32_t mndAddIndex(SMnode *pMnode, SRpcMsg *pReq, SCreateTagIndexReq *tagIdxReq, SDbObj *pDb, SStbObj *pOld) {
3949
  bool    needRsp = true;
3950
  int32_t code = -1;
3951
  SField *pField0 = NULL;
3952

3953
  SStbObj  stbObj = {0};
3954
  SStbObj *pNew = &stbObj;
3955

3956
  taosRLockLatch(&pOld->lock);
3957
  memcpy(&stbObj, pOld, sizeof(SStbObj));
3958
  taosRUnLockLatch(&pOld->lock);
3959

3960
  stbObj.pColumns = NULL;
3961
  stbObj.pTags = NULL;
3962
  stbObj.updateTime = taosGetTimestampMs();
3963
  stbObj.lock = 0;
3964

3965
  int32_t tag = mndFindSuperTableTagIndex(pOld, tagIdxReq->colName);
3966
  if (tag < 0) {
3967
    terrno = TSDB_CODE_MND_TAG_NOT_EXIST;
3968
    return -1;
3969
  }
3970
  col_id_t colId = pOld->pTags[tag].colId;
3971
  if (mndCheckColAndTagModifiable(pMnode, pOld->name, pOld->uid, colId) != 0) {
3972
    return -1;
3973
  }
3974
  if (mndAllocStbSchemas(pOld, pNew) != 0) {
3975
    return -1;
3976
  }
3977

3978
  SSchema *pTag = pNew->pTags + tag;
3979
  if (IS_IDX_ON(pTag)) {
3980
    terrno = TSDB_CODE_MND_TAG_INDEX_ALREADY_EXIST;
3981
    return -1;
3982
  } else {
3983
    pTag->flags |= COL_IDX_ON;
3984
  }
3985
  pNew->tagVer++;
3986

3987
  code = mndAddIndexImpl(pMnode, pReq, pDb, pNew, needRsp, pReq->pCont, pReq->contLen);
3988

3989
  return code;
3990
}
3991
static int32_t mndProcessCreateIndexReq(SRpcMsg *pReq) {
3992
  SMnode            *pMnode = pReq->info.node;
3993
  int32_t            code = -1;
3994
  SDbObj            *pDb = NULL;
3995
  SStbObj           *pStb = NULL;
3996
  SCreateTagIndexReq tagIdxReq = {0};
3997

3998
  if (tDeserializeSCreateTagIdxReq(pReq->pCont, pReq->contLen, &tagIdxReq) != 0) {
3999
    terrno = TSDB_CODE_INVALID_MSG;
4000
    goto _OVER;
4001
  }
4002

4003
  mInfo("stb:%s, start to alter", tagIdxReq.stbName);
4004

4005
  if (mndCheckIndexReq(&tagIdxReq) != TSDB_CODE_SUCCESS) {
4006
    goto _OVER;
4007
  }
4008

4009
  pDb = mndAcquireDbByStb(pMnode, tagIdxReq.dbFName);
4010
  if (pDb == NULL) {
4011
    terrno = TSDB_CODE_MND_DB_NOT_EXIST;
4012
    goto _OVER;
4013
  }
4014

4015
  pStb = mndAcquireStb(pMnode, tagIdxReq.stbName);
4016
  if (pStb == NULL) {
4017
    terrno = TSDB_CODE_MND_STB_NOT_EXIST;
4018
    goto _OVER;
4019
  }
4020
  if (mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb) != 0) {
4021
    goto _OVER;
4022
  }
4023

4024
  code = mndAddIndex(pMnode, pReq, &tagIdxReq, pDb, pStb);
4025
  if (terrno == TSDB_CODE_MND_TAG_INDEX_ALREADY_EXIST || terrno == TSDB_CODE_MND_TAG_NOT_EXIST) {
4026
    return terrno;
4027
  } else {
4028
    if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
4029
  }
4030
_OVER:
4031
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
4032
    mError("stb:%s, failed to create index since %s", tagIdxReq.stbName, terrstr());
4033
  }
4034
  mndReleaseStb(pMnode, pStb);
4035
  mndReleaseDb(pMnode, pDb);
4036
  return code;
4037
}
4038
static int32_t mndProcessDropIndexReq(SRpcMsg *pReq) {
4039
  SMnode          *pMnode = pReq->info.node;
4040
  int32_t          code = -1;
4041
  SDbObj          *pDb = NULL;
4042
  SStbObj         *pStb = NULL;
4043
  SDropTagIndexReq dropReq = {0};
4044
  if (tDeserializeSDropTagIdxReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
4045
    terrno = TSDB_CODE_INVALID_MSG;
4046
    goto _OVER;
4047
  }
4048
  //
4049
  return TSDB_CODE_SUCCESS;
4050
_OVER:
4051
  return code;
4052
}*/
4053

4054
static int32_t mndProcessDropStbReqFromMNode(SRpcMsg *pReq) {
189✔
4055
  int32_t code = mndProcessDropStbReq(pReq);
189✔
4056
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
189!
4057
    pReq->info.rsp = rpcMallocCont(1);
×
4058
    pReq->info.rspLen = 1;
×
4059
    pReq->info.noResp = false;
×
4060
    pReq->code = code;
×
4061
  }
4062
  return code;
189✔
4063
}
4064

4065
typedef struct SVDropTbVgReqs {
4066
  SArray     *pBatchReqs;
4067
  SVgroupInfo info;
4068
} SVDropTbVgReqs;
4069

4070
typedef struct SMDropTbDbInfo {
4071
  SArray *dbVgInfos;
4072
  int32_t hashPrefix;
4073
  int32_t hashSuffix;
4074
  int32_t hashMethod;
4075
} SMDropTbDbInfo;
4076

4077
typedef struct SMDropTbTsmaInfo {
4078
  char           tsmaResTbDbFName[TSDB_DB_FNAME_LEN];
4079
  char           tsmaResTbNamePrefix[TSDB_TABLE_FNAME_LEN];
4080
  int32_t        suid;
4081
  SMDropTbDbInfo dbInfo;  // reference to DbInfo in pDbMap
4082
} SMDropTbTsmaInfo;
4083

4084
typedef struct SMDropTbTsmaInfos {
4085
  SArray *pTsmaInfos;  // SMDropTbTsmaInfo
4086
} SMDropTbTsmaInfos;
4087

4088
typedef struct SMndDropTbsWithTsmaCtx {
4089
  SHashObj *pVgMap; // <vgId, SVDropTbVgReqs>
4090
} SMndDropTbsWithTsmaCtx;
4091

4092
static int32_t mndDropTbForSingleVg(SMnode *pMnode, SMndDropTbsWithTsmaCtx *pCtx, SArray *pTbs,
4093
                                                 int32_t vgId);
4094

4095
static void destroySVDropTbBatchReqs(void *p);
4096
static void mndDestroyDropTbsWithTsmaCtx(SMndDropTbsWithTsmaCtx *p) {
24✔
4097
  if (!p) return;
24!
4098

4099
  if (p->pVgMap) {
24!
4100
    void *pIter = taosHashIterate(p->pVgMap, NULL);
24✔
4101
    while (pIter) {
52✔
4102
      SVDropTbVgReqs *pReqs = pIter;
28✔
4103
      taosArrayDestroyEx(pReqs->pBatchReqs, destroySVDropTbBatchReqs);
28✔
4104
      pIter = taosHashIterate(p->pVgMap, pIter);
28✔
4105
    }
4106
    taosHashCleanup(p->pVgMap);
24✔
4107
  }
4108
  taosMemoryFree(p);
24✔
4109
}
4110

4111
static int32_t mndInitDropTbsWithTsmaCtx(SMndDropTbsWithTsmaCtx **ppCtx) {
24✔
4112
  int32_t                 code = 0;
24✔
4113
  SMndDropTbsWithTsmaCtx *pCtx = taosMemoryCalloc(1, sizeof(SMndDropTbsWithTsmaCtx));
24✔
4114
  if (!pCtx) return terrno;
24!
4115

4116
  pCtx->pVgMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
24✔
4117
  if (!pCtx->pVgMap) {
24!
4118
    code = terrno;
×
4119
    goto _end;
×
4120
  }
4121

4122
  *ppCtx = pCtx;
24✔
4123
_end:
24✔
4124
  if (code) mndDestroyDropTbsWithTsmaCtx(pCtx);
24!
4125
  return code;
24✔
4126
}
4127

4128
static void *mndBuildVDropTbsReq(SMnode *pMnode, const SVgroupInfo *pVgInfo, const SVDropTbBatchReq *pReq,
28✔
4129
                                 int32_t *len) {
4130
  int32_t   contLen = 0;
28✔
4131
  int32_t   ret = 0;
28✔
4132
  SMsgHead *pHead = NULL;
28✔
4133
  SEncoder  encoder = {0};
28✔
4134

4135
  tEncodeSize(tEncodeSVDropTbBatchReq, pReq, contLen, ret);
28!
4136
  if (ret < 0) return NULL;
28!
4137

4138
  contLen += sizeof(SMsgHead);
28✔
4139
  pHead = taosMemoryMalloc(contLen);
28✔
4140
  if (pHead == NULL) {
28!
4141
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
4142
    return NULL;
×
4143
  }
4144

4145
  pHead->contLen = htonl(contLen);
28✔
4146
  pHead->vgId = htonl(pVgInfo->vgId);
28✔
4147

4148
  void *pBuf = POINTER_SHIFT(pHead, sizeof(SMsgHead));
28✔
4149

4150
  tEncoderInit(&encoder, pBuf, contLen - sizeof(SMsgHead));
28✔
4151
  int32_t code = tEncodeSVDropTbBatchReq(&encoder, pReq);
28✔
4152
  tEncoderClear(&encoder);
28✔
4153
  if (code != 0) return NULL;
28!
4154

4155
  *len = contLen;
28✔
4156
  return pHead;
28✔
4157
}
4158

4159
static int32_t mndSetDropTbsRedoActions(SMnode *pMnode, STrans *pTrans, const SVDropTbVgReqs *pVgReqs, void *pCont,
28✔
4160
                                        int32_t contLen, tmsg_t msgType) {
4161
  STransAction action = {0};
28✔
4162
  action.epSet = pVgReqs->info.epSet;
28✔
4163
  action.pCont = pCont;
28✔
4164
  action.contLen = contLen;
28✔
4165
  action.msgType = msgType;
28✔
4166
  action.acceptableCode = TSDB_CODE_TDB_TABLE_NOT_EXIST;
28✔
4167
  return mndTransAppendRedoAction(pTrans, &action);
28✔
4168
}
4169

4170
static int32_t mndBuildDropTbRedoActions(SMnode *pMnode, STrans *pTrans, SHashObj *pVgMap, tmsg_t msgType) {
24✔
4171
  int32_t code = 0;
24✔
4172
  void   *pIter = taosHashIterate(pVgMap, NULL);
24✔
4173
  while (pIter) {
52✔
4174
    const SVDropTbVgReqs *pVgReqs = pIter;
28✔
4175
    int32_t               len = 0;
28✔
4176
    for (int32_t i = 0; i < taosArrayGetSize(pVgReqs->pBatchReqs) && code == TSDB_CODE_SUCCESS; ++i) {
56!
4177
      SVDropTbBatchReq *pBatchReq = taosArrayGet(pVgReqs->pBatchReqs, i);
28✔
4178
      void             *p = mndBuildVDropTbsReq(pMnode, &pVgReqs->info, pBatchReq, &len);
28✔
4179
      if (!p) {
28!
4180
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
4181
        if (terrno != 0) code = terrno;
×
4182
        break;
×
4183
      }
4184
      if ((code = mndSetDropTbsRedoActions(pMnode, pTrans, pVgReqs, p, len, msgType)) != 0) {
28!
4185
        break;
×
4186
      }
4187
    }
4188
    if (TSDB_CODE_SUCCESS != code) {
28!
4189
      taosHashCancelIterate(pVgMap, pIter);
×
4190
      break;
×
4191
    }
4192
    pIter = taosHashIterate(pVgMap, pIter);
28✔
4193
  }
4194
  return code;
24✔
4195
}
4196

4197
static int32_t mndCreateDropTbsTxnPrepare(SRpcMsg *pRsp, SMndDropTbsWithTsmaCtx *pCtx) {
24✔
4198
  int32_t code = 0;
24✔
4199
  SMnode *pMnode = pRsp->info.node;
24✔
4200
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pRsp, "drop-tbs");
24✔
4201
  mndTransSetChangeless(pTrans);
24✔
4202
  mndTransSetSerial(pTrans);
24✔
4203
  if (pTrans == NULL) {
24!
4204
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
4205
    if (terrno != 0) code = terrno;
×
4206
    goto _OVER;
×
4207
  }
4208

4209
  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER);
24!
4210

4211
  if ((code = mndBuildDropTbRedoActions(pMnode, pTrans, pCtx->pVgMap, TDMT_VND_DROP_TABLE)) != 0) goto _OVER;
24!
4212
  if ((code = mndTransPrepare(pMnode, pTrans)) != 0) goto _OVER;
24!
4213

4214
_OVER:
24✔
4215
  mndTransDrop(pTrans);
24✔
4216
  TAOS_RETURN(code);
24✔
4217
}
4218

4219
static int32_t mndProcessDropTbWithTsma(SRpcMsg *pReq) {
20✔
4220
  int32_t      code = -1;
20✔
4221
  SMnode      *pMnode = pReq->info.node;
20✔
4222
  SDbObj      *pDb = NULL;
20✔
4223
  SStbObj     *pStb = NULL;
20✔
4224
  SMDropTbsReq dropReq = {0};
20✔
4225
  bool         locked = false;
20✔
4226
  if (tDeserializeSMDropTbsReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
20!
4227
    code = TSDB_CODE_INVALID_MSG;
×
4228
    goto _OVER;
×
4229
  }
4230

4231
  SMndDropTbsWithTsmaCtx *pCtx = NULL;
20✔
4232
  code = mndInitDropTbsWithTsmaCtx(&pCtx);
20✔
4233
  if (code) goto _OVER;
20!
4234
  for (int32_t i = 0; i < dropReq.pVgReqs->size; ++i) {
44✔
4235
    SMDropTbReqsOnSingleVg *pReq = taosArrayGet(dropReq.pVgReqs, i);
24✔
4236
    code = mndDropTbForSingleVg(pMnode, pCtx, pReq->pTbs, pReq->vgInfo.vgId);
24✔
4237
    if (code) goto _OVER;
24!
4238
  }
4239
  code = mndCreateDropTbsTxnPrepare(pReq, pCtx);
20✔
4240
  if (code == 0) {
20!
4241
    code = TSDB_CODE_ACTION_IN_PROGRESS;
20✔
4242
  }
4243
_OVER:
×
4244
  tFreeSMDropTbsReq(&dropReq);
20✔
4245
  if (pCtx) mndDestroyDropTbsWithTsmaCtx(pCtx);
20!
4246
  TAOS_RETURN(code);
20✔
4247
}
4248

4249
static int32_t createDropTbBatchReq(const SVDropTbReq *pReq, SVDropTbBatchReq *pBatchReq) {
28✔
4250
  pBatchReq->nReqs = 1;
28✔
4251
  pBatchReq->pArray = taosArrayInit(TARRAY_MIN_SIZE, sizeof(SVDropTbReq));
28✔
4252
  if (!pBatchReq->pArray) return terrno;
28!
4253
  if (taosArrayPush(pBatchReq->pArray, pReq) == NULL) {
56!
4254
    taosArrayDestroy(pBatchReq->pArray);
×
4255
    pBatchReq->pArray = NULL;
×
4256
    return terrno;
×
4257
  }
4258
  return TSDB_CODE_SUCCESS;
28✔
4259
}
4260

4261
static void destroySVDropTbBatchReqs(void *p) {
28✔
4262
  SVDropTbBatchReq *pReq = p;
28✔
4263
  taosArrayDestroy(pReq->pArray);
28✔
4264
  pReq->pArray = NULL;
28✔
4265
}
28✔
4266

4267
static int32_t mndDropTbAdd(SMnode *pMnode, SHashObj *pVgHashMap, const SVgroupInfo *pVgInfo, char *name, tb_uid_t suid,
28✔
4268
                            bool ignoreNotExists) {
4269
  SVDropTbReq req = {.name = name, .suid = suid, .igNotExists = ignoreNotExists, .uid = 0};
28✔
4270

4271
  SVDropTbVgReqs *pVgReqs = taosHashGet(pVgHashMap, &pVgInfo->vgId, sizeof(pVgInfo->vgId));
28✔
4272
  SVDropTbVgReqs  vgReqs = {0};
28✔
4273
  if (pVgReqs == NULL) {
28!
4274
    vgReqs.info = *pVgInfo;
28✔
4275
    vgReqs.pBatchReqs = taosArrayInit(TARRAY_MIN_SIZE, sizeof(SVDropTbBatchReq));
28✔
4276
    if (!vgReqs.pBatchReqs) return terrno;
28!
4277
    SVDropTbBatchReq batchReq = {0};
28✔
4278
    int32_t          code = createDropTbBatchReq(&req, &batchReq);
28✔
4279
    if (TSDB_CODE_SUCCESS != code) return code;
28!
4280
    if (taosArrayPush(vgReqs.pBatchReqs, &batchReq) == NULL) {
56!
4281
      taosArrayDestroy(batchReq.pArray);
×
4282
      return terrno;
×
4283
    }
4284
    if (taosHashPut(pVgHashMap, &pVgInfo->vgId, sizeof(pVgInfo->vgId), &vgReqs, sizeof(vgReqs)) != 0) {
28!
4285
      taosArrayDestroyEx(vgReqs.pBatchReqs, destroySVDropTbBatchReqs);
×
4286
      return terrno;
×
4287
    }
4288
  } else {
4289
    SVDropTbBatchReq batchReq = {0};
×
4290
    int32_t          code = createDropTbBatchReq(&req, &batchReq);
×
4291
    if (TSDB_CODE_SUCCESS != code) return code;
×
4292
    if (taosArrayPush(pVgReqs->pBatchReqs, &batchReq) == NULL) {
×
4293
      taosArrayDestroy(batchReq.pArray);
×
4294
      return terrno;
×
4295
    }
4296
  }
4297
  return 0;
28✔
4298
}
4299

4300
static int32_t mndDropTbForSingleVg(SMnode *pMnode, SMndDropTbsWithTsmaCtx *pCtx, SArray *pTbs,
28✔
4301
                                                 int32_t vgId) {
4302
  int32_t code = 0;
28✔
4303

4304
  SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
28✔
4305
  if (!pVgObj) {
28!
4306
    code = 0;
×
4307
    goto _end;
×
4308
  }
4309
  SVgroupInfo vgInfo = {.hashBegin = pVgObj->hashBegin,
28✔
4310
                        .hashEnd = pVgObj->hashEnd,
28✔
4311
                        .numOfTable = pVgObj->numOfTables,
28✔
4312
                        .vgId = pVgObj->vgId};
28✔
4313
  vgInfo.epSet = mndGetVgroupEpset(pMnode, pVgObj);
28✔
4314
  mndReleaseVgroup(pMnode, pVgObj);
28✔
4315

4316
  for (int32_t i = 0; i < pTbs->size; ++i) {
56✔
4317
    SVDropTbReq *pTb = taosArrayGet(pTbs, i);
28✔
4318
    TAOS_CHECK_GOTO(mndDropTbAdd(pMnode, pCtx->pVgMap, &vgInfo, pTb->name, pTb->suid, pTb->igNotExists), NULL, _end);
28!
4319
  }
4320
_end:
28✔
4321
  return code;
28✔
4322
}
4323

4324
static int32_t mndProcessFetchTtlExpiredTbs(SRpcMsg *pRsp) {
97,362✔
4325
  int32_t                 code = -1;
97,362✔
4326
  SDecoder                decoder = {0};
97,362✔
4327
  SMnode                 *pMnode = pRsp->info.node;
97,362✔
4328
  SVFetchTtlExpiredTbsRsp rsp = {0};
97,362✔
4329
  SMndDropTbsWithTsmaCtx *pCtx = NULL;
97,362✔
4330
  if (pRsp->code != TSDB_CODE_SUCCESS) {
97,362✔
4331
    code = pRsp->code;
399✔
4332
    goto _end;
399✔
4333
  }
4334
  if (pRsp->contLen == 0) {
96,963✔
4335
    code = 0;
96,959✔
4336
    goto _end;
96,959✔
4337
  }
4338

4339
  tDecoderInit(&decoder, pRsp->pCont, pRsp->contLen);
4✔
4340
  code = tDecodeVFetchTtlExpiredTbsRsp(&decoder, &rsp);
4✔
4341
  if (code) goto _end;
4!
4342

4343
  code = mndInitDropTbsWithTsmaCtx(&pCtx);
4✔
4344
  if (code) goto _end;
4!
4345

4346
  code = mndDropTbForSingleVg(pMnode, pCtx, rsp.pExpiredTbs, rsp.vgId);
4✔
4347
  if (code) goto _end;
4!
4348
  code = mndCreateDropTbsTxnPrepare(pRsp, pCtx);
4✔
4349
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
4!
4350
_end:
×
4351
  if (pCtx) mndDestroyDropTbsWithTsmaCtx(pCtx);
97,362✔
4352
  tDecoderClear(&decoder);
97,362✔
4353
  tFreeFetchTtlExpiredTbsRsp(&rsp);
97,362✔
4354
  TAOS_RETURN(code);
97,362✔
4355
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc