• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3523

06 Nov 2024 02:29AM UTC coverage: 55.861% (-2.4%) from 58.216%
#3523

push

travis-ci

web-flow
Merge pull request #28551 from taosdata/feat/TS-5215-2

test(blob): testing & fixes for blob

106075 of 245834 branches covered (43.15%)

Branch coverage included in aggregate %.

0 of 15 new or added lines in 2 files covered. (0.0%)

17003 existing lines in 254 files now uncovered.

181910 of 269703 relevant lines covered (67.45%)

1527639.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

53.17
/source/dnode/mnode/impl/src/mndStb.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mndStb.h"
18
#include "audit.h"
19
#include "mndDb.h"
20
#include "mndDnode.h"
21
#include "mndIndex.h"
22
#include "mndIndexComm.h"
23
#include "mndInfoSchema.h"
24
#include "mndMnode.h"
25
#include "mndPerfSchema.h"
26
#include "mndPrivilege.h"
27
#include "mndScheduler.h"
28
#include "mndShow.h"
29
#include "mndSma.h"
30
#include "mndTopic.h"
31
#include "mndTrans.h"
32
#include "mndUser.h"
33
#include "mndVgroup.h"
34
#include "tname.h"
35

36
#define STB_VER_NUMBER   2
37
#define STB_RESERVE_SIZE 64
38

39
static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw);
40
static int32_t  mndStbActionInsert(SSdb *pSdb, SStbObj *pStb);
41
static int32_t  mndStbActionDelete(SSdb *pSdb, SStbObj *pStb);
42
static int32_t  mndStbActionUpdate(SSdb *pSdb, SStbObj *pOld, SStbObj *pNew);
43
static int32_t  mndProcessTtlTimer(SRpcMsg *pReq);
44
static int32_t  mndProcessTrimDbTimer(SRpcMsg *pReq);
45
static int32_t  mndProcessS3MigrateDbTimer(SRpcMsg *pReq);
46
static int32_t  mndProcessS3MigrateDbRsp(SRpcMsg *pReq);
47
static int32_t  mndProcessCreateStbReq(SRpcMsg *pReq);
48
static int32_t  mndProcessAlterStbReq(SRpcMsg *pReq);
49
static int32_t  mndProcessDropStbReq(SRpcMsg *pReq);
50
static int32_t  mndProcessDropTtltbRsp(SRpcMsg *pReq);
51
static int32_t  mndProcessTrimDbRsp(SRpcMsg *pReq);
52
static int32_t  mndProcessTableMetaReq(SRpcMsg *pReq);
53
static int32_t  mndRetrieveStb(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
54
static int32_t  mndRetrieveStbCol(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
55
static void     mndCancelGetNextStb(SMnode *pMnode, void *pIter);
56
static int32_t  mndProcessTableCfgReq(SRpcMsg *pReq);
57
static int32_t  mndAlterStbImp(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *pStb, bool needRsp,
58
                               void *alterOriData, int32_t alterOriDataLen);
59
static int32_t  mndAlterStbAndUpdateTagIdxImp(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *pStb, bool needRsp,
60
                                              void *alterOriData, int32_t alterOriDataLen, const SMAlterStbReq *pAlter);
61

62
static int32_t mndProcessCreateIndexReq(SRpcMsg *pReq);
63
static int32_t mndProcessDropIndexReq(SRpcMsg *pReq);
64

65
static int32_t mndProcessDropStbReqFromMNode(SRpcMsg *pReq);
66
static int32_t mndProcessDropTbWithTsma(SRpcMsg *pReq);
67
static int32_t mndProcessFetchTtlExpiredTbs(SRpcMsg *pReq);
68

69
int32_t mndInitStb(SMnode *pMnode) {
716✔
70
  SSdbTable table = {
716✔
71
      .sdbType = SDB_STB,
72
      .keyType = SDB_KEY_BINARY,
73
      .encodeFp = (SdbEncodeFp)mndStbActionEncode,
74
      .decodeFp = (SdbDecodeFp)mndStbActionDecode,
75
      .insertFp = (SdbInsertFp)mndStbActionInsert,
76
      .updateFp = (SdbUpdateFp)mndStbActionUpdate,
77
      .deleteFp = (SdbDeleteFp)mndStbActionDelete,
78
  };
79

80
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STB, mndProcessCreateStbReq);
716✔
81
  mndSetMsgHandle(pMnode, TDMT_MND_ALTER_STB, mndProcessAlterStbReq);
716✔
82
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_STB, mndProcessDropStbReq);
716✔
83
  mndSetMsgHandle(pMnode, TDMT_VND_CREATE_STB_RSP, mndTransProcessRsp);
716✔
84
  mndSetMsgHandle(pMnode, TDMT_VND_DROP_TTL_TABLE_RSP, mndProcessDropTtltbRsp);
716✔
85
  mndSetMsgHandle(pMnode, TDMT_VND_TRIM_RSP, mndProcessTrimDbRsp);
716✔
86
  mndSetMsgHandle(pMnode, TDMT_VND_ALTER_STB_RSP, mndTransProcessRsp);
716✔
87
  mndSetMsgHandle(pMnode, TDMT_VND_DROP_STB_RSP, mndTransProcessRsp);
716✔
88
  mndSetMsgHandle(pMnode, TDMT_MND_TABLE_META, mndProcessTableMetaReq);
716✔
89
  mndSetMsgHandle(pMnode, TDMT_MND_TTL_TIMER, mndProcessTtlTimer);
716✔
90
  mndSetMsgHandle(pMnode, TDMT_MND_TRIM_DB_TIMER, mndProcessTrimDbTimer);
716✔
91
  mndSetMsgHandle(pMnode, TDMT_VND_S3MIGRATE_RSP, mndProcessS3MigrateDbRsp);
716✔
92
  mndSetMsgHandle(pMnode, TDMT_MND_S3MIGRATE_DB_TIMER, mndProcessS3MigrateDbTimer);
716✔
93
  mndSetMsgHandle(pMnode, TDMT_MND_TABLE_CFG, mndProcessTableCfgReq);
716✔
94
  mndSetMsgHandle(pMnode, TDMT_MND_STB_DROP, mndProcessDropStbReqFromMNode);
716✔
95
  mndSetMsgHandle(pMnode, TDMT_MND_STB_DROP_RSP, mndTransProcessRsp);
716✔
96
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_TB_WITH_TSMA, mndProcessDropTbWithTsma);
716✔
97
  mndSetMsgHandle(pMnode, TDMT_VND_FETCH_TTL_EXPIRED_TBS_RSP, mndProcessFetchTtlExpiredTbs);
716✔
98
  mndSetMsgHandle(pMnode, TDMT_VND_DROP_TABLE_RSP, mndTransProcessRsp);
716✔
99
  //  mndSetMsgHandle(pMnode, TDMT_MND_SYSTABLE_RETRIEVE, mndProcessRetrieveStbReq);
100

101
  // mndSetMsgHandle(pMnode, TDMT_MND_CREATE_INDEX, mndProcessCreateIndexReq);
102
  // mndSetMsgHandle(pMnode, TDMT_MND_DROP_INDEX, mndProcessDropIndexReq);
103
  // mndSetMsgHandle(pMnode, TDMT_VND_CREATE_INDEX_RSP, mndTransProcessRsp);
104
  // mndSetMsgHandle(pMnode, TDMT_VND_DROP_INDEX_RSP, mndTransProcessRsp);
105

106
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STB, mndRetrieveStb);
716✔
107
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STB, mndCancelGetNextStb);
716✔
108

109
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_COL, mndRetrieveStbCol);
716✔
110
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_COL, mndCancelGetNextStb);
716✔
111

112
  return sdbSetTable(pMnode->pSdb, table);
716✔
113
}
114

115
void mndCleanupStb(SMnode *pMnode) {}
715✔
116

117
SSdbRaw *mndStbActionEncode(SStbObj *pStb) {
21,990✔
118
  int32_t code = 0;
21,990✔
119
  int32_t lino = 0;
21,990✔
120
  terrno = TSDB_CODE_OUT_OF_MEMORY;
21,990✔
121

122
  int32_t size = sizeof(SStbObj) + (pStb->numOfColumns + pStb->numOfTags) * sizeof(SSchema) + pStb->commentLen +
43,980✔
123
                 pStb->ast1Len + pStb->ast2Len + pStb->numOfColumns * sizeof(SColCmpr) + STB_RESERVE_SIZE +
43,980✔
124
                 taosArrayGetSize(pStb->pFuncs) * TSDB_FUNC_NAME_LEN;
21,990✔
125
  SSdbRaw *pRaw = sdbAllocRaw(SDB_STB, STB_VER_NUMBER, size);
21,990✔
126
  if (pRaw == NULL) goto _OVER;
21,990!
127

128
  int32_t dataPos = 0;
21,990✔
129
  SDB_SET_BINARY(pRaw, dataPos, pStb->name, TSDB_TABLE_FNAME_LEN, _OVER)
21,990!
130
  SDB_SET_BINARY(pRaw, dataPos, pStb->db, TSDB_DB_FNAME_LEN, _OVER)
21,990!
131
  SDB_SET_INT64(pRaw, dataPos, pStb->createdTime, _OVER)
21,990!
132
  SDB_SET_INT64(pRaw, dataPos, pStb->updateTime, _OVER)
21,990!
133
  SDB_SET_INT64(pRaw, dataPos, pStb->uid, _OVER)
21,990!
134
  SDB_SET_INT64(pRaw, dataPos, pStb->dbUid, _OVER)
21,990!
135
  SDB_SET_INT32(pRaw, dataPos, pStb->tagVer, _OVER)
21,990!
136
  SDB_SET_INT32(pRaw, dataPos, pStb->colVer, _OVER)
21,990!
137
  SDB_SET_INT32(pRaw, dataPos, pStb->smaVer, _OVER)
21,990!
138
  SDB_SET_INT32(pRaw, dataPos, pStb->nextColId, _OVER)
21,990!
139
  SDB_SET_INT64(pRaw, dataPos, pStb->maxdelay[0], _OVER)
21,990!
140
  SDB_SET_INT64(pRaw, dataPos, pStb->maxdelay[1], _OVER)
21,990!
141
  SDB_SET_INT64(pRaw, dataPos, pStb->watermark[0], _OVER)
21,990!
142
  SDB_SET_INT64(pRaw, dataPos, pStb->watermark[1], _OVER)
21,990!
143
  SDB_SET_INT32(pRaw, dataPos, pStb->ttl, _OVER)
21,990!
144
  SDB_SET_INT32(pRaw, dataPos, pStb->numOfColumns, _OVER)
21,990!
145
  SDB_SET_INT32(pRaw, dataPos, pStb->numOfTags, _OVER)
21,990!
146
  SDB_SET_INT32(pRaw, dataPos, pStb->numOfFuncs, _OVER)
21,990!
147
  SDB_SET_INT32(pRaw, dataPos, pStb->commentLen, _OVER)
21,990!
148
  SDB_SET_INT32(pRaw, dataPos, pStb->ast1Len, _OVER)
21,990!
149
  SDB_SET_INT32(pRaw, dataPos, pStb->ast2Len, _OVER)
21,990!
150

151
  for (int32_t i = 0; i < pStb->numOfColumns; ++i) {
470,170✔
152
    SSchema *pSchema = &pStb->pColumns[i];
448,180✔
153
    SDB_SET_INT8(pRaw, dataPos, pSchema->type, _OVER)
448,180!
154
    SDB_SET_INT8(pRaw, dataPos, pSchema->flags, _OVER)
448,180!
155
    SDB_SET_INT16(pRaw, dataPos, pSchema->colId, _OVER)
448,180!
156
    SDB_SET_INT32(pRaw, dataPos, pSchema->bytes, _OVER)
448,180!
157
    SDB_SET_BINARY(pRaw, dataPos, pSchema->name, TSDB_COL_NAME_LEN, _OVER)
448,180!
158
  }
159

160
  for (int32_t i = 0; i < pStb->numOfTags; ++i) {
273,919✔
161
    SSchema *pSchema = &pStb->pTags[i];
251,929✔
162
    SDB_SET_INT8(pRaw, dataPos, pSchema->type, _OVER)
251,929!
163
    SDB_SET_INT8(pRaw, dataPos, pSchema->flags, _OVER)
251,929!
164
    SDB_SET_INT16(pRaw, dataPos, pSchema->colId, _OVER)
251,929!
165
    SDB_SET_INT32(pRaw, dataPos, pSchema->bytes, _OVER)
251,929!
166
    SDB_SET_BINARY(pRaw, dataPos, pSchema->name, TSDB_COL_NAME_LEN, _OVER)
251,929!
167
  }
168

169
  for (int32_t i = 0; i < pStb->numOfFuncs; ++i) {
21,996✔
170
    char *func = taosArrayGet(pStb->pFuncs, i);
6✔
171
    SDB_SET_BINARY(pRaw, dataPos, func, TSDB_FUNC_NAME_LEN, _OVER)
6!
172
  }
173

174
  if (pStb->commentLen > 0) {
21,990✔
175
    SDB_SET_BINARY(pRaw, dataPos, pStb->comment, pStb->commentLen + 1, _OVER)
91!
176
  }
177

178
  if (pStb->ast1Len > 0) {
21,990✔
179
    SDB_SET_BINARY(pRaw, dataPos, pStb->pAst1, pStb->ast1Len, _OVER)
6!
180
  }
181

182
  if (pStb->ast2Len > 0) {
21,990✔
183
    SDB_SET_BINARY(pRaw, dataPos, pStb->pAst2, pStb->ast2Len, _OVER)
6!
184
  }
185

186
  if (pStb->pCmpr != NULL) {
21,990!
187
    for (int i = 0; i < pStb->numOfColumns; i++) {
470,170✔
188
      SColCmpr *p = &pStb->pCmpr[i];
448,180✔
189
      SDB_SET_INT16(pRaw, dataPos, p->id, _OVER)
448,180!
190
      SDB_SET_INT32(pRaw, dataPos, p->alg, _OVER)
448,180!
191
    }
192
  }
193
  SDB_SET_RESERVE(pRaw, dataPos, STB_RESERVE_SIZE, _OVER)
21,990!
194
  SDB_SET_DATALEN(pRaw, dataPos, _OVER)
21,990!
195

196
  terrno = 0;
21,990✔
197

198
_OVER:
21,990✔
199
  if (terrno != 0) {
21,990!
200
    mError("stb:%s, failed to encode to raw:%p since %s", pStb->name, pRaw, terrstr());
×
201
    sdbFreeRaw(pRaw);
×
202
    return NULL;
×
203
  }
204

205
  mTrace("stb:%s, encode to raw:%p, row:%p", pStb->name, pRaw, pStb);
21,990✔
206
  return pRaw;
21,990✔
207
}
208

209
static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) {
7,562✔
210
  int32_t code = 0;
7,562✔
211
  int32_t lino = 0;
7,562✔
212
  terrno = TSDB_CODE_OUT_OF_MEMORY;
7,562✔
213
  SSdbRow *pRow = NULL;
7,562✔
214
  SStbObj *pStb = NULL;
7,562✔
215

216
  int8_t sver = 0;
7,562✔
217
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;
7,562!
218

219
  if (sver > STB_VER_NUMBER) {
7,562!
220
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
×
221
    goto _OVER;
×
222
  }
223

224
  pRow = sdbAllocRow(sizeof(SStbObj));
7,562✔
225
  if (pRow == NULL) goto _OVER;
7,562!
226

227
  pStb = sdbGetRowObj(pRow);
7,562✔
228
  if (pStb == NULL) goto _OVER;
7,562!
229

230
  int32_t dataPos = 0;
7,562✔
231
  SDB_GET_BINARY(pRaw, dataPos, pStb->name, TSDB_TABLE_FNAME_LEN, _OVER)
7,562!
232
  SDB_GET_BINARY(pRaw, dataPos, pStb->db, TSDB_DB_FNAME_LEN, _OVER)
7,562!
233
  SDB_GET_INT64(pRaw, dataPos, &pStb->createdTime, _OVER)
7,562!
234
  SDB_GET_INT64(pRaw, dataPos, &pStb->updateTime, _OVER)
7,562!
235
  SDB_GET_INT64(pRaw, dataPos, &pStb->uid, _OVER)
7,562!
236
  SDB_GET_INT64(pRaw, dataPos, &pStb->dbUid, _OVER)
7,562!
237
  SDB_GET_INT32(pRaw, dataPos, &pStb->tagVer, _OVER)
7,562!
238
  SDB_GET_INT32(pRaw, dataPos, &pStb->colVer, _OVER)
7,562!
239
  SDB_GET_INT32(pRaw, dataPos, &pStb->smaVer, _OVER)
7,562!
240
  SDB_GET_INT32(pRaw, dataPos, &pStb->nextColId, _OVER)
7,562!
241
  SDB_GET_INT64(pRaw, dataPos, &pStb->maxdelay[0], _OVER)
7,562!
242
  SDB_GET_INT64(pRaw, dataPos, &pStb->maxdelay[1], _OVER)
7,562!
243
  SDB_GET_INT64(pRaw, dataPos, &pStb->watermark[0], _OVER)
7,562!
244
  SDB_GET_INT64(pRaw, dataPos, &pStb->watermark[1], _OVER)
7,562!
245
  SDB_GET_INT32(pRaw, dataPos, &pStb->ttl, _OVER)
7,562!
246
  SDB_GET_INT32(pRaw, dataPos, &pStb->numOfColumns, _OVER)
7,562!
247
  SDB_GET_INT32(pRaw, dataPos, &pStb->numOfTags, _OVER)
7,562!
248
  SDB_GET_INT32(pRaw, dataPos, &pStb->numOfFuncs, _OVER)
7,562!
249
  SDB_GET_INT32(pRaw, dataPos, &pStb->commentLen, _OVER)
7,562!
250
  SDB_GET_INT32(pRaw, dataPos, &pStb->ast1Len, _OVER)
7,562!
251
  SDB_GET_INT32(pRaw, dataPos, &pStb->ast2Len, _OVER)
7,562!
252

253
  pStb->pColumns = taosMemoryCalloc(pStb->numOfColumns, sizeof(SSchema));
7,562✔
254
  pStb->pTags = taosMemoryCalloc(pStb->numOfTags, sizeof(SSchema));
7,562✔
255
  pStb->pFuncs = taosArrayInit(pStb->numOfFuncs, TSDB_FUNC_NAME_LEN);
7,562✔
256
  if (pStb->pColumns == NULL || pStb->pTags == NULL || pStb->pFuncs == NULL) {
7,562!
257
    goto _OVER;
×
258
  }
259

260
  for (int32_t i = 0; i < pStb->numOfColumns; ++i) {
116,775✔
261
    SSchema *pSchema = &pStb->pColumns[i];
109,213✔
262
    SDB_GET_INT8(pRaw, dataPos, &pSchema->type, _OVER)
109,213!
263
    SDB_GET_INT8(pRaw, dataPos, &pSchema->flags, _OVER)
109,213!
264
    SDB_GET_INT16(pRaw, dataPos, &pSchema->colId, _OVER)
109,213!
265
    SDB_GET_INT32(pRaw, dataPos, &pSchema->bytes, _OVER)
109,213!
266
    SDB_GET_BINARY(pRaw, dataPos, pSchema->name, TSDB_COL_NAME_LEN, _OVER)
109,213!
267
  }
268

269
  for (int32_t i = 0; i < pStb->numOfTags; ++i) {
59,994✔
270
    SSchema *pSchema = &pStb->pTags[i];
52,432✔
271
    SDB_GET_INT8(pRaw, dataPos, &pSchema->type, _OVER)
52,432!
272
    SDB_GET_INT8(pRaw, dataPos, &pSchema->flags, _OVER)
52,432!
273
    SDB_GET_INT16(pRaw, dataPos, &pSchema->colId, _OVER)
52,432!
274
    SDB_GET_INT32(pRaw, dataPos, &pSchema->bytes, _OVER)
52,432!
275
    SDB_GET_BINARY(pRaw, dataPos, pSchema->name, TSDB_COL_NAME_LEN, _OVER)
52,432!
276
  }
277

278
  for (int32_t i = 0; i < pStb->numOfFuncs; ++i) {
7,572✔
279
    char funcName[TSDB_FUNC_NAME_LEN] = {0};
10✔
280
    SDB_GET_BINARY(pRaw, dataPos, funcName, TSDB_FUNC_NAME_LEN, _OVER)
10!
281
    if (taosArrayPush(pStb->pFuncs, funcName) == NULL) goto _OVER;
20!
282
  }
283

284
  if (pStb->commentLen > 0) {
7,562✔
285
    pStb->comment = taosMemoryCalloc(pStb->commentLen + 1, 1);
83✔
286
    if (pStb->comment == NULL) goto _OVER;
83!
287
    SDB_GET_BINARY(pRaw, dataPos, pStb->comment, pStb->commentLen + 1, _OVER)
83!
288
  }
289

290
  if (pStb->ast1Len > 0) {
7,562✔
291
    pStb->pAst1 = taosMemoryCalloc(pStb->ast1Len, 1);
10✔
292
    if (pStb->pAst1 == NULL) goto _OVER;
10!
293
    SDB_GET_BINARY(pRaw, dataPos, pStb->pAst1, pStb->ast1Len, _OVER)
10!
294
  }
295

296
  if (pStb->ast2Len > 0) {
7,562✔
297
    pStb->pAst2 = taosMemoryCalloc(pStb->ast2Len, 1);
10✔
298
    if (pStb->pAst2 == NULL) goto _OVER;
10!
299
    SDB_GET_BINARY(pRaw, dataPos, pStb->pAst2, pStb->ast2Len, _OVER)
10!
300
  }
301

302
  pStb->pCmpr = taosMemoryCalloc(pStb->numOfColumns, sizeof(SColCmpr));
7,562✔
303
  if (sver < STB_VER_NUMBER) {
7,562!
304
    // compatible with old data, setup default compress value
305
    // impl later
306
    for (int i = 0; i < pStb->numOfColumns; i++) {
×
307
      SSchema  *pSchema = &pStb->pColumns[i];
×
308
      SColCmpr *pCmpr = &pStb->pCmpr[i];
×
309
      pCmpr->id = pSchema->colId;
×
310
      pCmpr->alg = createDefaultColCmprByType(pSchema->type);
×
311
    }
312
  } else {
313
    for (int i = 0; i < pStb->numOfColumns; i++) {
116,775✔
314
      SColCmpr *pCmpr = &pStb->pCmpr[i];
109,213✔
315
      SDB_GET_INT16(pRaw, dataPos, &pCmpr->id, _OVER)
109,213!
316
      SDB_GET_INT32(pRaw, dataPos, (int32_t *)&pCmpr->alg, _OVER)  // compatiable
109,213!
317
    }
318
  }
319

320
  SDB_GET_RESERVE(pRaw, dataPos, STB_RESERVE_SIZE, _OVER)
7,562!
321

322
  terrno = 0;
7,562✔
323

324
_OVER:
7,562✔
325
  if (terrno != 0) {
7,562!
326
    mError("stb:%s, failed to decode from raw:%p since %s", pStb == NULL ? "null" : pStb->name, pRaw, terrstr());
×
327
    if (pStb != NULL) {
×
328
      taosMemoryFreeClear(pStb->pColumns);
×
329
      taosMemoryFreeClear(pStb->pTags);
×
330
      taosMemoryFreeClear(pStb->comment);
×
331
      taosMemoryFree(pStb->pCmpr);
×
332
    }
333
    taosMemoryFreeClear(pRow);
×
334
    return NULL;
×
335
  }
336

337
  mTrace("stb:%s, decode from raw:%p, row:%p", pStb->name, pRaw, pStb);
7,562✔
338
  return pRow;
7,562✔
339
}
340

341
void mndFreeStb(SStbObj *pStb) {
10,647✔
342
  taosArrayDestroy(pStb->pFuncs);
10,647✔
343
  taosMemoryFreeClear(pStb->pColumns);
10,647!
344
  taosMemoryFreeClear(pStb->pTags);
10,647!
345
  taosMemoryFreeClear(pStb->comment);
10,647✔
346
  taosMemoryFreeClear(pStb->pAst1);
10,647✔
347
  taosMemoryFreeClear(pStb->pAst2);
10,647✔
348
  taosMemoryFreeClear(pStb->pCmpr);
10,647!
349
}
10,647✔
350

351
static int32_t mndStbActionInsert(SSdb *pSdb, SStbObj *pStb) {
3,864✔
352
  mTrace("stb:%s, perform insert action, row:%p", pStb->name, pStb);
3,864✔
353
  return 0;
3,864✔
354
}
355

356
static int32_t mndStbActionDelete(SSdb *pSdb, SStbObj *pStb) {
9,936✔
357
  mTrace("stb:%s, perform delete action, row:%p", pStb->name, pStb);
9,936✔
358
  mndFreeStb(pStb);
9,936✔
359
  return 0;
9,936✔
360
}
361

362
static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOld, SStbObj *pNew) {
2,035✔
363
  mTrace("stb:%s, perform update action, old row:%p new row:%p", pOld->name, pOld, pNew);
2,035!
364

365
  taosWLockLatch(&pOld->lock);
2,035✔
366
  int32_t numOfColumns = pOld->numOfColumns;
2,035✔
367
  if (pOld->numOfColumns < pNew->numOfColumns) {
2,035✔
368
    void *pColumns = taosMemoryMalloc(pNew->numOfColumns * sizeof(SSchema));
34✔
369
    if (pColumns != NULL) {
34!
370
      taosMemoryFree(pOld->pColumns);
34✔
371
      pOld->pColumns = pColumns;
34✔
372
    } else {
373
      terrno = TSDB_CODE_OUT_OF_MEMORY;
×
374
      mTrace("stb:%s, failed to perform update action since %s", pOld->name, terrstr());
×
375
      taosWUnLockLatch(&pOld->lock);
×
376
    }
377
  }
378

379
  if (pOld->numOfTags < pNew->numOfTags) {
2,035✔
380
    void *pTags = taosMemoryMalloc(pNew->numOfTags * sizeof(SSchema));
79✔
381
    if (pTags != NULL) {
79!
382
      taosMemoryFree(pOld->pTags);
79✔
383
      pOld->pTags = pTags;
79✔
384
    } else {
385
      terrno = TSDB_CODE_OUT_OF_MEMORY;
×
386
      mTrace("stb:%s, failed to perform update action since %s", pOld->name, terrstr());
×
387
      taosWUnLockLatch(&pOld->lock);
×
388
    }
389
  }
390

391
  if (pOld->commentLen < pNew->commentLen && pNew->commentLen > 0) {
2,035!
392
    void *comment = taosMemoryMalloc(pNew->commentLen + 1);
2✔
393
    if (comment != NULL) {
2!
394
      taosMemoryFree(pOld->comment);
2✔
395
      pOld->comment = comment;
2✔
396
    } else {
397
      terrno = TSDB_CODE_OUT_OF_MEMORY;
×
398
      mTrace("stb:%s, failed to perform update action since %s", pOld->name, terrstr());
×
399
      taosWUnLockLatch(&pOld->lock);
×
400
    }
401
  }
402
  pOld->commentLen = pNew->commentLen;
2,035✔
403

404
  if (pOld->ast1Len < pNew->ast1Len) {
2,035!
405
    void *pAst1 = taosMemoryMalloc(pNew->ast1Len + 1);
×
406
    if (pAst1 != NULL) {
×
407
      taosMemoryFree(pOld->pAst1);
×
408
      pOld->pAst1 = pAst1;
×
409
    } else {
410
      terrno = TSDB_CODE_OUT_OF_MEMORY;
×
411
      mTrace("stb:%s, failed to perform update action since %s", pOld->name, terrstr());
×
412
      taosWUnLockLatch(&pOld->lock);
×
413
    }
414
  }
415

416
  if (pOld->ast2Len < pNew->ast2Len) {
2,035!
417
    void *pAst2 = taosMemoryMalloc(pNew->ast2Len + 1);
×
418
    if (pAst2 != NULL) {
×
419
      taosMemoryFree(pOld->pAst2);
×
420
      pOld->pAst2 = pAst2;
×
421
    } else {
422
      terrno = TSDB_CODE_OUT_OF_MEMORY;
×
423
      mTrace("stb:%s, failed to perform update action since %s", pOld->name, terrstr());
×
424
      taosWUnLockLatch(&pOld->lock);
×
425
    }
426
  }
427

428
  pOld->updateTime = pNew->updateTime;
2,035✔
429
  pOld->tagVer = pNew->tagVer;
2,035✔
430
  pOld->colVer = pNew->colVer;
2,035✔
431
  pOld->smaVer = pNew->smaVer;
2,035✔
432
  pOld->nextColId = pNew->nextColId;
2,035✔
433
  pOld->ttl = pNew->ttl;
2,035✔
434
  if (pNew->numOfColumns > 0) {
2,035✔
435
    pOld->numOfColumns = pNew->numOfColumns;
1,996✔
436
    memcpy(pOld->pColumns, pNew->pColumns, pOld->numOfColumns * sizeof(SSchema));
1,996✔
437
  }
438
  if (pNew->numOfTags > 0) {
2,035✔
439
    pOld->numOfTags = pNew->numOfTags;
1,996✔
440
    memcpy(pOld->pTags, pNew->pTags, pOld->numOfTags * sizeof(SSchema));
1,996✔
441
  }
442
  if (pNew->commentLen > 0) {
2,035✔
443
    memcpy(pOld->comment, pNew->comment, pNew->commentLen + 1);
66✔
444
    pOld->commentLen = pNew->commentLen;
66✔
445
  }
446
  if (pNew->ast1Len != 0) {
2,035!
447
    memcpy(pOld->pAst1, pNew->pAst1, pNew->ast1Len);
×
448
    pOld->ast1Len = pNew->ast1Len;
×
449
  }
450
  if (pNew->ast2Len != 0) {
2,035!
451
    memcpy(pOld->pAst2, pNew->pAst2, pNew->ast2Len);
×
452
    pOld->ast2Len = pNew->ast2Len;
×
453
  }
454
  if (numOfColumns < pNew->numOfColumns) {
2,035✔
455
    taosMemoryFree(pOld->pCmpr);
34✔
456
    pOld->pCmpr = taosMemoryCalloc(pNew->numOfColumns, sizeof(SColCmpr));
34✔
457
    memcpy(pOld->pCmpr, pNew->pCmpr, pNew->numOfColumns * sizeof(SColCmpr));
34✔
458
  } else {
459
    memcpy(pOld->pCmpr, pNew->pCmpr, pNew->numOfColumns * sizeof(SColCmpr));
2,001✔
460
  }
461

462
  taosWUnLockLatch(&pOld->lock);
2,035✔
463
  return 0;
2,035✔
464
}
465

466
SStbObj *mndAcquireStb(SMnode *pMnode, char *stbName) {
40,968✔
467
  SSdb    *pSdb = pMnode->pSdb;
40,968✔
468
  SStbObj *pStb = sdbAcquire(pSdb, SDB_STB, stbName);
40,968✔
469
  if (pStb == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
40,968!
470
    terrno = TSDB_CODE_MND_STB_NOT_EXIST;
4,145✔
471
  }
472
  return pStb;
40,968✔
473
}
474

475
void mndReleaseStb(SMnode *pMnode, SStbObj *pStb) {
39,969✔
476
  SSdb *pSdb = pMnode->pSdb;
39,969✔
477
  sdbRelease(pSdb, pStb);
39,969✔
478
}
39,969✔
479

480
SDbObj *mndAcquireDbByStb(SMnode *pMnode, const char *stbName) {
4,875✔
481
  SName name = {0};
4,875✔
482
  if ((terrno = tNameFromString(&name, stbName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE)) != 0) return NULL;
4,875!
483

484
  char db[TSDB_TABLE_FNAME_LEN] = {0};
4,875✔
485
  if ((terrno = tNameGetFullDbName(&name, db)) != 0) return NULL;
4,875!
486

487
  return mndAcquireDb(pMnode, db);
4,875✔
488
}
489

490
static FORCE_INLINE int32_t schemaExColIdCompare(const void *colId, const void *pSchema) {
491
  if (*(col_id_t *)colId < ((SSchema *)pSchema)->colId) {
492
    return -1;
493
  } else if (*(col_id_t *)colId > ((SSchema *)pSchema)->colId) {
494
    return 1;
495
  }
496
  return 0;
497
}
498

499
void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, int32_t *pContLen, void *alterOriData,
8,614✔
500
                            int32_t alterOriDataLen) {
501
  SEncoder       encoder = {0};
8,614✔
502
  int32_t        contLen;
503
  SName          name = {0};
8,614✔
504
  SVCreateStbReq req = {0};
8,614✔
505

506
  if ((terrno = tNameFromString(&name, pStb->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE)) != 0) {
8,614!
507
    goto _err;
×
508
  }
509
  char dbFName[TSDB_DB_FNAME_LEN] = {0};
8,614✔
510
  if ((terrno = tNameGetFullDbName(&name, dbFName)) != 0) {
8,614!
511
    goto _err;
×
512
  };
513

514
  req.name = (char *)tNameGetTableName(&name);
8,614✔
515
  req.suid = pStb->uid;
8,614✔
516
  req.rollup = pStb->ast1Len > 0 ? 1 : 0;
8,614✔
517
  req.alterOriData = alterOriData;
8,614✔
518
  req.alterOriDataLen = alterOriDataLen;
8,614✔
519
  req.source = pStb->source;
8,614✔
520
  // todo
521
  req.schemaRow.nCols = pStb->numOfColumns;
8,614✔
522
  req.schemaRow.version = pStb->colVer;
8,614✔
523
  req.schemaRow.pSchema = pStb->pColumns;
8,614✔
524
  req.schemaTag.nCols = pStb->numOfTags;
8,614✔
525
  req.schemaTag.version = pStb->tagVer;
8,614✔
526
  req.schemaTag.pSchema = pStb->pTags;
8,614✔
527

528
  req.colCmpred = 1;
8,614✔
529
  SColCmprWrapper *pCmpr = &req.colCmpr;
8,614✔
530
  pCmpr->version = pStb->colVer;
8,614✔
531
  pCmpr->nCols = pStb->numOfColumns;
8,614✔
532

533
  req.colCmpr.pColCmpr = taosMemoryCalloc(pCmpr->nCols, sizeof(SColCmpr));
8,614✔
534
  for (int32_t i = 0; i < pStb->numOfColumns; i++) {
124,726✔
535
    SColCmpr *p = &pCmpr->pColCmpr[i];
116,112✔
536
    p->alg = pStb->pCmpr[i].alg;
116,112✔
537
    p->id = pStb->pCmpr[i].id;
116,112✔
538
  }
539

540
  if (req.rollup) {
8,614✔
541
    req.rsmaParam.maxdelay[0] = pStb->maxdelay[0];
4✔
542
    req.rsmaParam.maxdelay[1] = pStb->maxdelay[1];
4✔
543
    req.rsmaParam.watermark[0] = pStb->watermark[0];
4✔
544
    req.rsmaParam.watermark[1] = pStb->watermark[1];
4✔
545
    if (pStb->ast1Len > 0) {
4!
546
      if (mndConvertRsmaTask(&req.rsmaParam.qmsg[0], &req.rsmaParam.qmsgLen[0], pStb->pAst1, pStb->uid,
4!
547
                             STREAM_TRIGGER_WINDOW_CLOSE, req.rsmaParam.watermark[0],
548
                             req.rsmaParam.deleteMark[0]) < 0) {
549
        goto _err;
×
550
      }
551
    }
552
    if (pStb->ast2Len > 0) {
4!
553
      if (mndConvertRsmaTask(&req.rsmaParam.qmsg[1], &req.rsmaParam.qmsgLen[1], pStb->pAst2, pStb->uid,
4!
554
                             STREAM_TRIGGER_WINDOW_CLOSE, req.rsmaParam.watermark[1],
555
                             req.rsmaParam.deleteMark[1]) < 0) {
556
        goto _err;
×
557
      }
558
    }
559
  }
560
  // get length
561
  int32_t ret = 0;
8,614✔
562
  tEncodeSize(tEncodeSVCreateStbReq, &req, contLen, ret);
8,614!
563
  if (ret < 0) {
8,614!
564
    goto _err;
×
565
  }
566

567
  contLen += sizeof(SMsgHead);
8,614✔
568

569
  SMsgHead *pHead = taosMemoryCalloc(1, contLen);
8,614✔
570
  if (pHead == NULL) {
8,614!
571
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
572
    goto _err;
×
573
  }
574

575
  pHead->contLen = htonl(contLen);
8,614✔
576
  pHead->vgId = htonl(pVgroup->vgId);
8,614✔
577

578
  void *pBuf = POINTER_SHIFT(pHead, sizeof(SMsgHead));
8,614✔
579
  tEncoderInit(&encoder, pBuf, contLen - sizeof(SMsgHead));
8,614✔
580
  if (tEncodeSVCreateStbReq(&encoder, &req) < 0) {
8,614!
581
    taosMemoryFreeClear(pHead);
×
582
    tEncoderClear(&encoder);
×
583
    goto _err;
×
584
  }
585
  tEncoderClear(&encoder);
8,614✔
586

587
  *pContLen = contLen;
8,614✔
588
  taosMemoryFreeClear(req.rsmaParam.qmsg[0]);
8,614✔
589
  taosMemoryFreeClear(req.rsmaParam.qmsg[1]);
8,614✔
590
  taosMemoryFreeClear(req.colCmpr.pColCmpr);
8,614!
591
  return pHead;
8,614✔
592
_err:
×
593
  taosMemoryFreeClear(req.rsmaParam.qmsg[0]);
×
594
  taosMemoryFreeClear(req.rsmaParam.qmsg[1]);
×
595
  taosMemoryFreeClear(req.colCmpr.pColCmpr);
×
596
  return NULL;
×
597
}
598

599
static void *mndBuildVDropStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, int32_t *pContLen) {
8,371✔
600
  SName        name = {0};
8,371✔
601
  SVDropStbReq req = {0};
8,371✔
602
  int32_t      contLen = 0;
8,371✔
603
  int32_t      ret = 0;
8,371✔
604
  SMsgHead    *pHead = NULL;
8,371✔
605
  SEncoder     encoder = {0};
8,371✔
606

607
  if ((terrno = tNameFromString(&name, pStb->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE)) != 0) {
8,371!
608
    return NULL;
×
609
  }
610

611
  req.name = (char *)tNameGetTableName(&name);
8,371✔
612
  req.suid = pStb->uid;
8,371✔
613

614
  tEncodeSize(tEncodeSVDropStbReq, &req, contLen, ret);
8,371!
615
  if (ret < 0) return NULL;
8,371!
616

617
  contLen += sizeof(SMsgHead);
8,371✔
618
  pHead = taosMemoryMalloc(contLen);
8,371✔
619
  if (pHead == NULL) {
8,371!
620
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
621
    return NULL;
×
622
  }
623

624
  pHead->contLen = htonl(contLen);
8,371✔
625
  pHead->vgId = htonl(pVgroup->vgId);
8,371✔
626

627
  void *pBuf = POINTER_SHIFT(pHead, sizeof(SMsgHead));
8,371✔
628

629
  tEncoderInit(&encoder, pBuf, contLen - sizeof(SMsgHead));
8,371✔
630
  terrno = tEncodeSVDropStbReq(&encoder, &req);
8,371✔
631
  tEncoderClear(&encoder);
8,371✔
632
  if (terrno != 0) return NULL;
8,371!
633

634
  *pContLen = contLen;
8,371✔
635
  return pHead;
8,371✔
636
}
637

638
int32_t mndCheckCreateStbReq(SMCreateStbReq *pCreate) {
3,110✔
639
  int32_t code = 0;
3,110✔
640
  if (pCreate->igExists < 0 || pCreate->igExists > 1) {
3,110!
641
    code = TSDB_CODE_MND_INVALID_STB_OPTION;
×
642
    TAOS_RETURN(code);
×
643
  }
644

645
  if (pCreate->numOfColumns < TSDB_MIN_COLUMNS || pCreate->numOfTags + pCreate->numOfColumns > TSDB_MAX_COLUMNS) {
3,110!
646
    code = TSDB_CODE_PAR_INVALID_COLUMNS_NUM;
×
647
    TAOS_RETURN(code);
×
648
  }
649

650
  if (pCreate->numOfTags <= 0 || pCreate->numOfTags > TSDB_MAX_TAGS) {
3,110!
651
    code = TSDB_CODE_MND_INVALID_STB_OPTION;
×
652
    TAOS_RETURN(code);
×
653
  }
654

655
  SField *pField = taosArrayGet(pCreate->pColumns, 0);
3,110✔
656
  if (pField->type != TSDB_DATA_TYPE_TIMESTAMP) {
3,110!
657
    code = TSDB_CODE_PAR_INVALID_FIRST_COLUMN;
×
658
    TAOS_RETURN(code);
×
659
  }
660

661
  for (int32_t i = 0; i < pCreate->numOfColumns; ++i) {
46,239✔
662
    SFieldWithOptions *pField1 = taosArrayGet(pCreate->pColumns, i);
43,129✔
663
    if (pField1->type >= TSDB_DATA_TYPE_MAX) {
43,129!
664
      code = TSDB_CODE_MND_INVALID_STB_OPTION;
×
665
      TAOS_RETURN(code);
×
666
    }
667
    if (pField1->bytes <= 0) {
43,129!
668
      code = TSDB_CODE_MND_INVALID_STB_OPTION;
×
669
      TAOS_RETURN(code);
×
670
    }
671
    if (pField1->name[0] == 0) {
43,129!
672
      code = TSDB_CODE_MND_INVALID_STB_OPTION;
×
673
      TAOS_RETURN(code);
×
674
    }
675
  }
676

677
  for (int32_t i = 0; i < pCreate->numOfTags; ++i) {
23,176✔
678
    SField *pField1 = taosArrayGet(pCreate->pTags, i);
20,066✔
679
    if (pField1->type >= TSDB_DATA_TYPE_MAX) {
20,066!
680
      code = TSDB_CODE_MND_INVALID_STB_OPTION;
×
681
      TAOS_RETURN(code);
×
682
    }
683
    if (pField1->bytes <= 0) {
20,066!
684
      code = TSDB_CODE_MND_INVALID_STB_OPTION;
×
685
      TAOS_RETURN(code);
×
686
    }
687
    if (pField1->name[0] == 0) {
20,066!
688
      code = TSDB_CODE_MND_INVALID_STB_OPTION;
×
689
      TAOS_RETURN(code);
×
690
    }
691
  }
692

693
  TAOS_RETURN(code);
3,110✔
694
}
695

696
static int32_t mndSetCreateStbPrepareLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
×
697
  int32_t  code = 0;
×
698
  SSdbRaw *pRedoRaw = mndStbActionEncode(pStb);
×
699
  if (pRedoRaw == NULL) {
×
700
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
701
    if (terrno != 0) code = terrno;
×
702
    TAOS_RETURN(code);
×
703
  }
704
  if ((code = mndTransAppendPrepareLog(pTrans, pRedoRaw)) != 0) {
×
705
    sdbFreeRaw(pRedoRaw);
×
706
    TAOS_RETURN(code);
×
707
  }
708
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING));
×
709

710
  TAOS_RETURN(code);
×
711
}
712

713
static int32_t mndSetCreateStbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
3,079✔
714
  int32_t  code = 0;
3,079✔
715
  SSdbRaw *pCommitRaw = mndStbActionEncode(pStb);
3,079✔
716
  if (pCommitRaw == NULL) {
3,079!
717
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
718
    if (terrno != 0) code = terrno;
×
719
    TAOS_RETURN(code);
×
720
  }
721
  if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
3,079!
722
    sdbFreeRaw(pCommitRaw);
×
723
    TAOS_RETURN(code);
×
724
  }
725
  TAOS_CHECK_RETURN(sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY));
3,079!
726

727
  TAOS_RETURN(code);
3,079✔
728
}
729

730
static int32_t mndSetCreateStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
3,079✔
731
  int32_t code = 0;
3,079✔
732
  SSdb   *pSdb = pMnode->pSdb;
3,079✔
733
  SVgObj *pVgroup = NULL;
3,079✔
734
  void   *pIter = NULL;
3,079✔
735
  int32_t contLen;
736

737
  while (1) {
82,347✔
738
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
85,426✔
739
    if (pIter == NULL) break;
85,426✔
740
    if (!mndVgroupInDb(pVgroup, pDb->uid)) {
82,347✔
741
      sdbRelease(pSdb, pVgroup);
74,842✔
742
      continue;
74,842✔
743
    }
744

745
    void *pReq = mndBuildVCreateStbReq(pMnode, pVgroup, pStb, &contLen, NULL, 0);
7,505✔
746
    if (pReq == NULL) {
7,505!
747
      sdbCancelFetch(pSdb, pIter);
×
748
      sdbRelease(pSdb, pVgroup);
×
749
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
750
      if (terrno != 0) code = terrno;
×
751
      TAOS_RETURN(code);
×
752
    }
753

754
    STransAction action = {0};
7,505✔
755
    action.mTraceId = pTrans->mTraceId;
7,505✔
756
    action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
7,505✔
757
    action.pCont = pReq;
7,505✔
758
    action.contLen = contLen;
7,505✔
759
    action.msgType = TDMT_VND_CREATE_STB;
7,505✔
760
    action.acceptableCode = TSDB_CODE_TDB_STB_ALREADY_EXIST;
7,505✔
761
    action.retryCode = TSDB_CODE_TDB_STB_NOT_EXIST;
7,505✔
762
    if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
7,505!
763
      taosMemoryFree(pReq);
×
764
      sdbCancelFetch(pSdb, pIter);
×
765
      sdbRelease(pSdb, pVgroup);
×
766
      TAOS_RETURN(code);
×
767
    }
768
    sdbRelease(pSdb, pVgroup);
7,505✔
769
  }
770

771
  TAOS_RETURN(code);
3,079✔
772
}
773

UNCOV
774
int32_t mndSetForceDropCreateStbRedoActions(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, SStbObj *pStb) {
×
UNCOV
775
  int32_t code = 0;
×
UNCOV
776
  SSdb   *pSdb = pMnode->pSdb;
×
777
  int32_t contLen;
778

UNCOV
779
  void *pReq = mndBuildVCreateStbReq(pMnode, pVgroup, pStb, &contLen, NULL, 0);
×
UNCOV
780
  if (pReq == NULL) {
×
781
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
782
    if (terrno != 0) code = terrno;
×
783
    TAOS_RETURN(code);
×
784
  }
785

UNCOV
786
  STransAction action = {0};
×
UNCOV
787
  action.mTraceId = pTrans->mTraceId;
×
UNCOV
788
  action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
×
UNCOV
789
  action.pCont = pReq;
×
UNCOV
790
  action.contLen = contLen;
×
UNCOV
791
  action.msgType = TDMT_VND_CREATE_STB;
×
UNCOV
792
  action.acceptableCode = TSDB_CODE_TDB_STB_ALREADY_EXIST;
×
UNCOV
793
  action.retryCode = TSDB_CODE_TDB_STB_NOT_EXIST;
×
UNCOV
794
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
×
795
    taosMemoryFree(pReq);
×
796
    TAOS_RETURN(code);
×
797
  }
798

UNCOV
799
  TAOS_RETURN(code);
×
800
}
801

802
static int32_t mndSetCreateStbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
3,079✔
803
  int32_t code = 0;
3,079✔
804
  SSdb   *pSdb = pMnode->pSdb;
3,079✔
805
  SVgObj *pVgroup = NULL;
3,079✔
806
  void   *pIter = NULL;
3,079✔
807

808
  while (1) {
82,347✔
809
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
85,426✔
810
    if (pIter == NULL) break;
85,426✔
811
    if (!mndVgroupInDb(pVgroup, pDb->uid)) {
82,347✔
812
      sdbRelease(pSdb, pVgroup);
74,842✔
813
      continue;
74,842✔
814
    }
815

816
    int32_t contLen = 0;
7,505✔
817
    void   *pReq = mndBuildVDropStbReq(pMnode, pVgroup, pStb, &contLen);
7,505✔
818
    if (pReq == NULL) {
7,505!
819
      sdbCancelFetch(pSdb, pIter);
×
820
      sdbRelease(pSdb, pVgroup);
×
821
      code = TSDB_CODE_OUT_OF_MEMORY;
×
822
      TAOS_RETURN(code);
×
823
    }
824

825
    STransAction action = {0};
7,505✔
826
    action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
7,505✔
827
    action.pCont = pReq;
7,505✔
828
    action.contLen = contLen;
7,505✔
829
    action.msgType = TDMT_VND_DROP_STB;
7,505✔
830
    action.acceptableCode = TSDB_CODE_TDB_STB_NOT_EXIST;
7,505✔
831
    if ((code = mndTransAppendUndoAction(pTrans, &action)) != 0) {
7,505!
832
      taosMemoryFree(pReq);
×
833
      sdbCancelFetch(pSdb, pIter);
×
834
      sdbRelease(pSdb, pVgroup);
×
835
      TAOS_RETURN(code);
×
836
    }
837
    sdbRelease(pSdb, pVgroup);
7,505✔
838
  }
839

840
  TAOS_RETURN(code);
3,079✔
841
}
842

843
static SSchema *mndFindStbColumns(const SStbObj *pStb, const char *colName) {
×
844
  for (int32_t col = 0; col < pStb->numOfColumns; ++col) {
×
845
    SSchema *pSchema = &pStb->pColumns[col];
×
846
    if (strncasecmp(pSchema->name, colName, TSDB_COL_NAME_LEN) == 0) {
×
847
      return pSchema;
×
848
    }
849
  }
850
  return NULL;
×
851
}
852

853
int32_t mndBuildStbFromReq(SMnode *pMnode, SStbObj *pDst, SMCreateStbReq *pCreate, SDbObj *pDb) {
3,088✔
854
  int32_t code = 0;
3,088✔
855
  memcpy(pDst->name, pCreate->name, TSDB_TABLE_FNAME_LEN);
3,088✔
856
  memcpy(pDst->db, pDb->name, TSDB_DB_FNAME_LEN);
3,088✔
857
  pDst->createdTime = taosGetTimestampMs();
3,088✔
858
  pDst->updateTime = pDst->createdTime;
3,088✔
859
  pDst->uid = (pCreate->source == TD_REQ_FROM_TAOX_OLD || pCreate->source == TD_REQ_FROM_TAOX)
3,088!
860
                  ? pCreate->suid
861
                  : mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN);
6,176!
862
  pDst->dbUid = pDb->uid;
3,088✔
863
  pDst->tagVer = 1;
3,088✔
864
  pDst->colVer = 1;
3,088✔
865
  pDst->smaVer = 1;
3,088✔
866
  pDst->nextColId = 1;
3,088✔
867
  pDst->maxdelay[0] = pCreate->delay1;
3,088✔
868
  pDst->maxdelay[1] = pCreate->delay2;
3,088✔
869
  pDst->watermark[0] = pCreate->watermark1;
3,088✔
870
  pDst->watermark[1] = pCreate->watermark2;
3,088✔
871
  pDst->ttl = pCreate->ttl;
3,088✔
872
  pDst->numOfColumns = pCreate->numOfColumns;
3,088✔
873
  pDst->numOfTags = pCreate->numOfTags;
3,088✔
874
  pDst->numOfFuncs = pCreate->numOfFuncs;
3,088✔
875
  pDst->commentLen = pCreate->commentLen;
3,088✔
876
  pDst->pFuncs = pCreate->pFuncs;
3,088✔
877
  pDst->source = pCreate->source;
3,088✔
878
  pCreate->pFuncs = NULL;
3,088✔
879

880
  if (pDst->commentLen > 0) {
3,088✔
881
    pDst->comment = taosMemoryCalloc(pDst->commentLen + 1, 1);
12✔
882
    if (pDst->comment == NULL) {
12!
883
      code = terrno;
×
884
      TAOS_RETURN(code);
×
885
    }
886
    memcpy(pDst->comment, pCreate->pComment, pDst->commentLen + 1);
12✔
887
  }
888

889
  pDst->ast1Len = pCreate->ast1Len;
3,088✔
890
  if (pDst->ast1Len > 0) {
3,088✔
891
    pDst->pAst1 = taosMemoryCalloc(pDst->ast1Len, 1);
3✔
892
    if (pDst->pAst1 == NULL) {
3!
893
      code = terrno;
×
894
      TAOS_RETURN(code);
×
895
    }
896
    memcpy(pDst->pAst1, pCreate->pAst1, pDst->ast1Len);
3✔
897
  }
898

899
  pDst->ast2Len = pCreate->ast2Len;
3,088✔
900
  if (pDst->ast2Len > 0) {
3,088✔
901
    pDst->pAst2 = taosMemoryCalloc(pDst->ast2Len, 1);
3✔
902
    if (pDst->pAst2 == NULL) {
3!
903
      code = terrno;
×
904
      TAOS_RETURN(code);
×
905
    }
906
    memcpy(pDst->pAst2, pCreate->pAst2, pDst->ast2Len);
3✔
907
  }
908

909
  pDst->pColumns = taosMemoryCalloc(1, pDst->numOfColumns * sizeof(SSchema));
3,088✔
910
  pDst->pTags = taosMemoryCalloc(1, pDst->numOfTags * sizeof(SSchema));
3,088✔
911
  if (pDst->pColumns == NULL || pDst->pTags == NULL) {
3,088!
912
    code = terrno;
×
913
    TAOS_RETURN(code);
×
914
  }
915

916
  if (pDst->nextColId < 0 || pDst->nextColId >= 0x7fff - pDst->numOfColumns - pDst->numOfTags) {
3,088!
917
    code = TSDB_CODE_OUT_OF_RANGE;
×
918
    TAOS_RETURN(code);
×
919
  }
920

921
  for (int32_t i = 0; i < pDst->numOfColumns; ++i) {
46,165✔
922
    SFieldWithOptions *pField = taosArrayGet(pCreate->pColumns, i);
43,077✔
923
    SSchema           *pSchema = &pDst->pColumns[i];
43,077✔
924
    pSchema->type = pField->type;
43,077✔
925
    pSchema->bytes = pField->bytes;
43,077✔
926
    pSchema->flags = pField->flags;
43,077✔
927
    memcpy(pSchema->name, pField->name, TSDB_COL_NAME_LEN);
43,077✔
928
    pSchema->colId = pDst->nextColId;
43,077✔
929
    pDst->nextColId++;
43,077✔
930
  }
931

932
  for (int32_t i = 0; i < pDst->numOfTags; ++i) {
23,005✔
933
    SField  *pField = taosArrayGet(pCreate->pTags, i);
19,917✔
934
    SSchema *pSchema = &pDst->pTags[i];
19,917✔
935
    pSchema->type = pField->type;
19,917✔
936
    pSchema->bytes = pField->bytes;
19,917✔
937
    if (i == 0) {
19,917✔
938
      SSCHMEA_SET_IDX_ON(pSchema);
3,088✔
939
    }
940
    memcpy(pSchema->name, pField->name, TSDB_COL_NAME_LEN);
19,917✔
941
    pSchema->colId = pDst->nextColId;
19,917✔
942
    pDst->nextColId++;
19,917✔
943
  }
944
  // set col compress
945
  pDst->pCmpr = taosMemoryCalloc(1, pDst->numOfColumns * sizeof(SCmprObj));
3,088✔
946
  for (int32_t i = 0; i < pDst->numOfColumns; i++) {
46,165✔
947
    SFieldWithOptions *pField = taosArrayGet(pCreate->pColumns, i);
43,077✔
948
    SSchema           *pSchema = &pDst->pColumns[i];
43,077✔
949

950
    SColCmpr *pColCmpr = &pDst->pCmpr[i];
43,077✔
951
    pColCmpr->id = pSchema->colId;
43,077✔
952
    pColCmpr->alg = pField->compress;
43,077✔
953
  }
954
  TAOS_RETURN(code);
3,088✔
955
}
956
static int32_t mndGenIdxNameForFirstTag(char *fullname, char *dbname, char *stbname, char *tagname) {
2,377✔
957
  SName name = {0};
2,377✔
958
  if ((terrno = tNameFromString(&name, stbname, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE)) != 0) {
2,377!
959
    return -1;
×
960
  }
961
  return snprintf(fullname, TSDB_INDEX_FNAME_LEN, "%s.%s_%s", dbname, tagname, tNameGetTableName(&name));
2,377✔
962
}
963

964
static int32_t mndCreateStb(SMnode *pMnode, SRpcMsg *pReq, SMCreateStbReq *pCreate, SDbObj *pDb) {
2,377✔
965
  SStbObj stbObj = {0};
2,377✔
966
  int32_t code = -1;
2,377✔
967

968
  char fullIdxName[TSDB_INDEX_FNAME_LEN * 2] = {0};
2,377✔
969

970
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pReq, "create-stb");
2,377✔
971
  if (pTrans == NULL) {
2,377!
972
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
973
    if (terrno != 0) code = terrno;
×
974
    goto _OVER;
×
975
  }
976

977
  mInfo("trans:%d, used to create stb:%s", pTrans->id, pCreate->name);
2,377!
978
  TAOS_CHECK_GOTO(mndBuildStbFromReq(pMnode, &stbObj, pCreate, pDb), NULL, _OVER);
2,377!
979

980
  SSchema *pSchema = &(stbObj.pTags[0]);
2,377✔
981
  if (mndGenIdxNameForFirstTag(fullIdxName, pDb->name, stbObj.name, pSchema->name) < 0) {
2,377!
982
    goto _OVER;
×
983
  }
984
  SSIdx idx = {0};
2,377✔
985
  if (mndAcquireGlobalIdx(pMnode, fullIdxName, SDB_IDX, &idx) == 0 && idx.pIdx != NULL) {
2,377!
986
    code = TSDB_CODE_MND_TAG_INDEX_ALREADY_EXIST;
×
987
    mndReleaseIdx(pMnode, idx.pIdx);
×
988
    goto _OVER;
×
989
  }
990

991
  SIdxObj idxObj = {0};
2,377✔
992
  memcpy(idxObj.name, fullIdxName, TSDB_INDEX_FNAME_LEN);
2,377✔
993
  memcpy(idxObj.stb, stbObj.name, TSDB_TABLE_FNAME_LEN);
2,377✔
994
  memcpy(idxObj.db, stbObj.db, TSDB_DB_FNAME_LEN);
2,377✔
995
  memcpy(idxObj.colName, pSchema->name, TSDB_COL_NAME_LEN);
2,377✔
996
  idxObj.createdTime = taosGetTimestampMs();
2,377✔
997
  idxObj.uid = mndGenerateUid(fullIdxName, strlen(fullIdxName));
2,377✔
998
  idxObj.stbUid = stbObj.uid;
2,377✔
999
  idxObj.dbUid = stbObj.dbUid;
2,377✔
1000

1001
  TAOS_CHECK_GOTO(mndSetCreateIdxCommitLogs(pMnode, pTrans, &idxObj), NULL, _OVER);
2,377!
1002

1003
  TAOS_CHECK_GOTO(mndAddStbToTrans(pMnode, pTrans, pDb, &stbObj), NULL, _OVER);
2,377✔
1004
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
2,368✔
1005
  code = 0;
2,367✔
1006

1007
_OVER:
2,377✔
1008
  mndTransDrop(pTrans);
2,377✔
1009
  if (mndStbActionDelete(pMnode->pSdb, &stbObj) != 0) mError("failed to mndStbActionDelete");
2,377!
1010
  TAOS_RETURN(code);
2,377✔
1011
}
1012

1013
int32_t mndAddStbToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
3,088✔
1014
  mndTransSetDbName(pTrans, pDb->name, pStb->name);
3,088✔
1015
  TAOS_CHECK_RETURN(mndTransCheckConflict(pMnode, pTrans));
3,088✔
1016
  TAOS_CHECK_RETURN(mndSetCreateStbCommitLogs(pMnode, pTrans, pDb, pStb));
3,079!
1017
  TAOS_CHECK_RETURN(mndSetCreateStbRedoActions(pMnode, pTrans, pDb, pStb));
3,079!
1018
  TAOS_CHECK_RETURN(mndSetCreateStbUndoActions(pMnode, pTrans, pDb, pStb));
3,079!
1019
  return 0;
3,079✔
1020
}
1021

1022
static int32_t mndProcessTtlTimer(SRpcMsg *pReq) {
4,140✔
1023
  SMnode           *pMnode = pReq->info.node;
4,140✔
1024
  SSdb             *pSdb = pMnode->pSdb;
4,140✔
1025
  SVgObj           *pVgroup = NULL;
4,140✔
1026
  void             *pIter = NULL;
4,140✔
1027
  SVDropTtlTableReq ttlReq = {
4,140✔
1028
      .timestampSec = taosGetTimestampSec(), .ttlDropMaxCount = tsTtlBatchDropNum, .nUids = 0, .pTbUids = NULL};
4,140✔
1029
  int32_t reqLen = tSerializeSVDropTtlTableReq(NULL, 0, &ttlReq);
4,140✔
1030
  int32_t contLen = reqLen + sizeof(SMsgHead);
4,140✔
1031

1032
  mDebug("start to process ttl timer");
4,140✔
1033

1034
  while (1) {
396,559✔
1035
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
400,699✔
1036
    if (pIter == NULL) break;
400,699✔
1037

1038
    int32_t   code = 0;
396,559✔
1039
    SMsgHead *pHead = rpcMallocCont(contLen);
396,559✔
1040
    if (pHead == NULL) {
396,559!
1041
      sdbRelease(pSdb, pVgroup);
×
1042
      continue;
×
1043
    }
1044
    pHead->contLen = htonl(contLen);
396,559✔
1045
    pHead->vgId = htonl(pVgroup->vgId);
396,559✔
1046
    if ((code = tSerializeSVDropTtlTableReq((char *)pHead + sizeof(SMsgHead), reqLen, &ttlReq)) < 0) {
396,559!
1047
      mError("vgId:%d, failed to serialize drop ttl table request since %s", pVgroup->vgId, tstrerror(code));
×
1048
      sdbRelease(pSdb, pVgroup);
×
1049
      continue;
×
1050
    }
1051

1052
    SRpcMsg rpcMsg = {
396,559✔
1053
        .msgType = TDMT_VND_FETCH_TTL_EXPIRED_TBS, .pCont = pHead, .contLen = contLen, .info = pReq->info};
1054
    SEpSet epSet = mndGetVgroupEpset(pMnode, pVgroup);
396,559✔
1055
    code = tmsgSendReq(&epSet, &rpcMsg);
396,559✔
1056
    if (code != 0) {
396,559✔
1057
      mError("vgId:%d, failed to send drop ttl table request to vnode since 0x%x", pVgroup->vgId, code);
4!
1058
    } else {
1059
      mDebug("vgId:%d, send drop ttl table request to vnode, time:%" PRId32, pVgroup->vgId, ttlReq.timestampSec);
396,555✔
1060
    }
1061
    sdbRelease(pSdb, pVgroup);
396,559✔
1062
  }
1063

1064
  return 0;
4,140✔
1065
}
1066

1067
static int32_t mndProcessTrimDbTimer(SRpcMsg *pReq) {
7✔
1068
  SMnode     *pMnode = pReq->info.node;
7✔
1069
  SSdb       *pSdb = pMnode->pSdb;
7✔
1070
  SVgObj     *pVgroup = NULL;
7✔
1071
  void       *pIter = NULL;
7✔
1072
  SVTrimDbReq trimReq = {.timestamp = taosGetTimestampSec()};
7✔
1073
  int32_t     reqLen = tSerializeSVTrimDbReq(NULL, 0, &trimReq);
7✔
1074
  int32_t     contLen = reqLen + sizeof(SMsgHead);
7✔
1075

1076
  while (1) {
662✔
1077
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
669✔
1078
    if (pIter == NULL) break;
669✔
1079

1080
    int32_t code = 0;
662✔
1081

1082
    SMsgHead *pHead = rpcMallocCont(contLen);
662✔
1083
    if (pHead == NULL) {
662!
1084
      sdbCancelFetch(pSdb, pVgroup);
×
1085
      sdbRelease(pSdb, pVgroup);
×
1086
      continue;
×
1087
    }
1088
    pHead->contLen = htonl(contLen);
662✔
1089
    pHead->vgId = htonl(pVgroup->vgId);
662✔
1090
    if ((code = tSerializeSVTrimDbReq((char *)pHead + sizeof(SMsgHead), reqLen, &trimReq)) < 0) {
662!
1091
      mError("vgId:%d, failed to serialize trim db request since %s", pVgroup->vgId, tstrerror(code));
×
1092
    }
1093

1094
    SRpcMsg rpcMsg = {.msgType = TDMT_VND_TRIM, .pCont = pHead, .contLen = contLen};
662✔
1095
    SEpSet  epSet = mndGetVgroupEpset(pMnode, pVgroup);
662✔
1096
    code = tmsgSendReq(&epSet, &rpcMsg);
662✔
1097
    if (code != 0) {
662!
1098
      mError("vgId:%d, timer failed to send vnode-trim request to vnode since 0x%x", pVgroup->vgId, code);
×
1099
    } else {
1100
      mInfo("vgId:%d, timer send vnode-trim request to vnode, time:%d", pVgroup->vgId, trimReq.timestamp);
662!
1101
    }
1102
    sdbRelease(pSdb, pVgroup);
662✔
1103
  }
1104

1105
  return 0;
7✔
1106
}
1107

1108
static int32_t mndProcessS3MigrateDbTimer(SRpcMsg *pReq) {
×
1109
  SMnode          *pMnode = pReq->info.node;
×
1110
  SSdb            *pSdb = pMnode->pSdb;
×
1111
  SVgObj          *pVgroup = NULL;
×
1112
  void            *pIter = NULL;
×
1113
  SVS3MigrateDbReq s3migrateReq = {.timestamp = taosGetTimestampSec()};
×
1114
  int32_t          reqLen = tSerializeSVS3MigrateDbReq(NULL, 0, &s3migrateReq);
×
1115
  int32_t          contLen = reqLen + sizeof(SMsgHead);
×
1116

1117
  while (1) {
×
1118
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
×
1119
    if (pIter == NULL) break;
×
1120

1121
    int32_t code = 0;
×
1122

1123
    SMsgHead *pHead = rpcMallocCont(contLen);
×
1124
    if (pHead == NULL) {
×
1125
      sdbRelease(pSdb, pVgroup);
×
1126
      continue;
×
1127
    }
1128
    pHead->contLen = htonl(contLen);
×
1129
    pHead->vgId = htonl(pVgroup->vgId);
×
1130
    if ((code = tSerializeSVS3MigrateDbReq((char *)pHead + sizeof(SMsgHead), reqLen, &s3migrateReq)) < 0) {
×
1131
      mError("vgId:%d, failed to serialize s3migrate db request since %s", pVgroup->vgId, tstrerror(code));
×
1132
      sdbRelease(pSdb, pVgroup);
×
1133
      continue;
×
1134
    }
1135

1136
    SRpcMsg rpcMsg = {.msgType = TDMT_VND_S3MIGRATE, .pCont = pHead, .contLen = contLen};
×
1137
    SEpSet  epSet = mndGetVgroupEpset(pMnode, pVgroup);
×
1138
    code = tmsgSendReq(&epSet, &rpcMsg);
×
1139
    if (code != 0) {
×
1140
      mError("vgId:%d, timer failed to send vnode-s3migrate request to vnode since 0x%x", pVgroup->vgId, code);
×
1141
    } else {
1142
      mInfo("vgId:%d, timer send vnode-s3migrate request to vnode, time:%d", pVgroup->vgId, s3migrateReq.timestamp);
×
1143
    }
1144
    sdbRelease(pSdb, pVgroup);
×
1145
  }
1146

1147
  return 0;
×
1148
}
1149

1150
static int32_t mndFindSuperTableTagIndex(const SStbObj *pStb, const char *tagName) {
321✔
1151
  for (int32_t tag = 0; tag < pStb->numOfTags; tag++) {
1,000✔
1152
    if (strcmp(pStb->pTags[tag].name, tagName) == 0) {
847✔
1153
      return tag;
168✔
1154
    }
1155
  }
1156

1157
  return -1;
153✔
1158
}
1159

1160
static int32_t mndFindSuperTableColumnIndex(const SStbObj *pStb, const char *colName) {
240✔
1161
  for (int32_t col = 0; col < pStb->numOfColumns; col++) {
928✔
1162
    if (strcmp(pStb->pColumns[col].name, colName) == 0) {
772✔
1163
      return col;
84✔
1164
    }
1165
  }
1166

1167
  return -1;
156✔
1168
}
1169

1170
static bool mndValidateSchema(SSchema *pSchemas, int32_t nSchema, SArray *pFields, int32_t maxLen) {
131✔
1171
  int32_t rowLen = 0;
131✔
1172
  for (int32_t i = 0; i < nSchema; ++i) {
501✔
1173
    rowLen += (pSchemas + i)->bytes;
370✔
1174
  }
1175

1176
  int32_t nField = taosArrayGetSize(pFields);
131✔
1177
  for (int32_t i = 0; i < nField; ++i) {
262✔
1178
    rowLen += ((SField *)TARRAY_GET_ELEM(pFields, i))->bytes;
131✔
1179
  }
1180

1181
  return rowLen <= maxLen;
131✔
1182
}
1183

1184
static int32_t mndBuildStbFromAlter(SStbObj *pStb, SStbObj *pDst, SMCreateStbReq *createReq) {
5✔
1185
  int32_t code = 0;
5✔
1186
  taosRLockLatch(&pStb->lock);
5✔
1187
  memcpy(pDst, pStb, sizeof(SStbObj));
5✔
1188
  taosRUnLockLatch(&pStb->lock);
5✔
1189

1190
  pDst->source = createReq->source;
5✔
1191
  pDst->updateTime = taosGetTimestampMs();
5✔
1192
  pDst->numOfColumns = createReq->numOfColumns;
5✔
1193
  pDst->numOfTags = createReq->numOfTags;
5✔
1194
  pDst->pColumns = taosMemoryCalloc(1, pDst->numOfColumns * sizeof(SSchema));
5✔
1195
  pDst->pTags = taosMemoryCalloc(1, pDst->numOfTags * sizeof(SSchema));
5✔
1196
  pDst->pCmpr = taosMemoryCalloc(1, pDst->numOfColumns * sizeof(SColCmpr));
5✔
1197

1198
  if (pDst->pColumns == NULL || pDst->pTags == NULL || pDst->pCmpr == NULL) {
5!
1199
    code = terrno;
×
1200
    TAOS_RETURN(code);
×
1201
  }
1202

1203
  if (pDst->nextColId < 0 || pDst->nextColId >= 0x7fff - pDst->numOfColumns - pDst->numOfTags) {
5!
1204
    code = TSDB_CODE_OUT_OF_RANGE;
×
1205
    TAOS_RETURN(code);
×
1206
  }
1207

1208
  for (int32_t i = 0; i < pDst->numOfColumns; ++i) {
15✔
1209
    SFieldWithOptions *pField = taosArrayGet(createReq->pColumns, i);
10✔
1210
    SSchema           *pSchema = &pDst->pColumns[i];
10✔
1211
    pSchema->type = pField->type;
10✔
1212
    pSchema->bytes = pField->bytes;
10✔
1213
    pSchema->flags = pField->flags;
10✔
1214
    memcpy(pSchema->name, pField->name, TSDB_COL_NAME_LEN);
10✔
1215
    int32_t cIndex = mndFindSuperTableColumnIndex(pStb, pField->name);
10✔
1216
    if (cIndex >= 0) {
10!
1217
      pSchema->colId = pStb->pColumns[cIndex].colId;
10✔
1218
    } else {
1219
      pSchema->colId = pDst->nextColId++;
×
1220
    }
1221
  }
1222

1223
  for (int32_t i = 0; i < pDst->numOfTags; ++i) {
48✔
1224
    SField  *pField = taosArrayGet(createReq->pTags, i);
43✔
1225
    SSchema *pSchema = &pDst->pTags[i];
43✔
1226
    pSchema->type = pField->type;
43✔
1227
    pSchema->bytes = pField->bytes;
43✔
1228
    memcpy(pSchema->name, pField->name, TSDB_COL_NAME_LEN);
43✔
1229
    int32_t cIndex = mndFindSuperTableTagIndex(pStb, pField->name);
43✔
1230
    if (cIndex >= 0) {
43✔
1231
      pSchema->colId = pStb->pTags[cIndex].colId;
37✔
1232
    } else {
1233
      pSchema->colId = pDst->nextColId++;
6✔
1234
    }
1235
  }
1236
  for (int32_t i = 0; i < pDst->numOfColumns; i++) {
15✔
1237
    SColCmpr          *p = pDst->pCmpr + i;
10✔
1238
    SFieldWithOptions *pField = taosArrayGet(createReq->pColumns, i);
10✔
1239
    SSchema           *pSchema = &pDst->pColumns[i];
10✔
1240
    p->id = pSchema->colId;
10✔
1241
    if (pField->compress == 0) {
10!
1242
      p->alg = createDefaultColCmprByType(pSchema->type);
×
1243
    } else {
1244
      p->alg = pField->compress;
10✔
1245
    }
1246
  }
1247
  pDst->tagVer = createReq->tagVer;
5✔
1248
  pDst->colVer = createReq->colVer;
5✔
1249
  return TSDB_CODE_SUCCESS;
5✔
1250
}
1251

1252
static int32_t mndProcessCreateStbReq(SRpcMsg *pReq) {
2,399✔
1253
  SMnode        *pMnode = pReq->info.node;
2,399✔
1254
  int32_t        code = -1;
2,399✔
1255
  SStbObj       *pStb = NULL;
2,399✔
1256
  SDbObj        *pDb = NULL;
2,399✔
1257
  SMCreateStbReq createReq = {0};
2,399✔
1258
  bool           isAlter = false;
2,399✔
1259

1260
  if (tDeserializeSMCreateStbReq(pReq->pCont, pReq->contLen, &createReq) != 0) {
2,399!
1261
    code = TSDB_CODE_INVALID_MSG;
×
1262
    goto _OVER;
×
1263
  }
1264

1265
  mInfo("stb:%s, start to create", createReq.name);
2,399!
1266
  if (mndCheckCreateStbReq(&createReq) != 0) {
2,399!
1267
    code = TSDB_CODE_INVALID_MSG;
×
1268
    goto _OVER;
×
1269
  }
1270

1271
  pStb = mndAcquireStb(pMnode, createReq.name);
2,399✔
1272
  if (pStb != NULL) {
2,399✔
1273
    if (createReq.igExists) {
22✔
1274
      if (createReq.source == TD_REQ_FROM_APP) {
19✔
1275
        mInfo("stb:%s, already exist, ignore exist is set", createReq.name);
6!
1276
        code = 0;
6✔
1277
        goto _OVER;
6✔
1278
      } else if (pStb->uid != createReq.suid) {
13!
1279
        mInfo("stb:%s, alter table does not need to be done, because table is deleted", createReq.name);
×
1280
        code = 0;
×
1281
        goto _OVER;
×
1282
      } else if (createReq.tagVer > 0 || createReq.colVer > 0) {
18!
1283
        int32_t tagDelta = createReq.tagVer - pStb->tagVer;
13✔
1284
        int32_t colDelta = createReq.colVer - pStb->colVer;
13✔
1285
        mInfo("stb:%s, already exist while create, input tagVer:%d colVer:%d, exist tagVer:%d colVer:%d",
13!
1286
              createReq.name, createReq.tagVer, createReq.colVer, pStb->tagVer, pStb->colVer);
1287
        if (tagDelta <= 0 && colDelta <= 0) {
13!
1288
          mInfo("stb:%s, schema version is not incremented and nothing needs to be done", createReq.name);
8!
1289
          code = 0;
8✔
1290
          goto _OVER;
8✔
1291
        } else if ((tagDelta == 1 && colDelta == 0) || (tagDelta == 0 && colDelta == 1) ||
5!
1292
                   (pStb->colVer == 1 && createReq.colVer > 1) || (pStb->tagVer == 1 && createReq.tagVer > 1)) {
×
1293
          isAlter = true;
5✔
1294
          mInfo("stb:%s, schema version is only increased by 1 number, do alter operation", createReq.name);
5!
1295
        } else {
1296
          mError("stb:%s, schema version increase more than 1 number, error is returned", createReq.name);
×
1297
          code = TSDB_CODE_MND_INVALID_SCHEMA_VER;
×
1298
          goto _OVER;
×
1299
        }
1300
      } else {
1301
        mError("stb:%s, already exist while create, input tagVer:%d colVer:%d is invalid, origin tagVer:%d colVer:%d",
×
1302
               createReq.name, createReq.tagVer, createReq.colVer, pStb->tagVer, pStb->colVer);
1303
        code = TSDB_CODE_MND_INVALID_SCHEMA_VER;
×
1304
        goto _OVER;
×
1305
      }
1306
    } else {
1307
      code = TSDB_CODE_MND_STB_ALREADY_EXIST;
3✔
1308
      goto _OVER;
3✔
1309
    }
1310
  } else if (terrno != TSDB_CODE_MND_STB_NOT_EXIST) {
2,377!
1311
    goto _OVER;
×
1312
  } else if ((createReq.source == TD_REQ_FROM_TAOX_OLD || createReq.source == TD_REQ_FROM_TAOX) &&
2,377!
1313
             (createReq.tagVer != 1 || createReq.colVer != 1)) {
×
1314
    mInfo("stb:%s, alter table does not need to be done, because table is deleted", createReq.name);
×
1315
    code = 0;
×
1316
    goto _OVER;
×
1317
  }
1318

1319
  pDb = mndAcquireDbByStb(pMnode, createReq.name);
2,382✔
1320
  if (pDb == NULL) {
2,382!
1321
    code = TSDB_CODE_MND_DB_NOT_SELECTED;
×
1322
    goto _OVER;
×
1323
  }
1324

1325
  if ((code = mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb)) != 0) {
2,382!
1326
    goto _OVER;
×
1327
  }
1328

1329
  int32_t numOfStbs = -1;
2,382✔
1330
  if ((code = mndGetNumOfStbs(pMnode, pDb->name, &numOfStbs)) != 0) {
2,382!
1331
    goto _OVER;
×
1332
  }
1333

1334
  if (pDb->cfg.numOfStables == 1 && numOfStbs != 0) {
2,382!
1335
    code = TSDB_CODE_MND_SINGLE_STB_MODE_DB;
×
1336
    goto _OVER;
×
1337
  }
1338

1339
  if ((code = grantCheck(TSDB_GRANT_STABLE)) < 0) {
2,382!
1340
    goto _OVER;
×
1341
  }
1342

1343
  if (isAlter) {
2,382✔
1344
    bool    needRsp = false;
5✔
1345
    SStbObj pDst = {0};
5✔
1346
    if ((code = mndBuildStbFromAlter(pStb, &pDst, &createReq)) != 0) {
5!
1347
      taosMemoryFreeClear(pDst.pTags);
×
1348
      taosMemoryFreeClear(pDst.pColumns);
×
1349
      taosMemoryFreeClear(pDst.pCmpr);
×
1350
      goto _OVER;
×
1351
    }
1352

1353
    code = mndAlterStbImp(pMnode, pReq, pDb, &pDst, needRsp, NULL, 0);
5✔
1354
    taosMemoryFreeClear(pDst.pTags);
5!
1355
    taosMemoryFreeClear(pDst.pColumns);
5!
1356
    taosMemoryFreeClear(pDst.pCmpr);
5!
1357
  } else {
1358
    code = mndCreateStb(pMnode, pReq, &createReq, pDb);
2,377✔
1359
  }
1360
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
2,382✔
1361

1362
  SName name = {0};
2,382✔
1363
  TAOS_CHECK_RETURN(tNameFromString(&name, createReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE));
2,382!
1364

1365
  if (createReq.sql == NULL && createReq.sqlLen == 0) {
2,855!
1366
    char detail[1000] = {0};
473✔
1367

1368
    sprintf(detail, "dbname:%s, stable name:%s", name.dbname, name.tname);
473✔
1369

1370
    auditRecord(pReq, pMnode->clusterId, "createStb", name.dbname, name.tname, detail, strlen(detail));
473✔
1371
  } else {
1372
    auditRecord(pReq, pMnode->clusterId, "createStb", name.dbname, name.tname, createReq.sql, createReq.sqlLen);
1,909✔
1373
  }
1374
_OVER:
2,399✔
1375
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
2,399✔
1376
    mError("stb:%s, failed to create since %s", createReq.name, tstrerror(code));
13!
1377
  }
1378

1379
  mndReleaseStb(pMnode, pStb);
2,399✔
1380
  mndReleaseDb(pMnode, pDb);
2,399✔
1381
  tFreeSMCreateStbReq(&createReq);
2,399✔
1382

1383
  TAOS_RETURN(code);
2,399✔
1384
}
1385

1386
static int32_t mndCheckAlterStbReq(SMAlterStbReq *pAlter) {
323✔
1387
  int32_t code = 0;
323✔
1388
  if (pAlter->commentLen >= 0) return 0;
323!
1389
  if (pAlter->ttl != 0) return 0;
×
1390

1391
  if (pAlter->numOfFields < 1 || pAlter->numOfFields != (int32_t)taosArrayGetSize(pAlter->pFields)) {
×
1392
    code = TSDB_CODE_MND_INVALID_STB_OPTION;
×
1393
    TAOS_RETURN(code);
×
1394
  }
1395

1396
  for (int32_t i = 0; i < pAlter->numOfFields; ++i) {
×
1397
    SField *pField = taosArrayGet(pAlter->pFields, i);
×
1398
    if (pField->name[0] == 0) {
×
1399
      code = TSDB_CODE_MND_INVALID_STB_OPTION;
×
1400
      TAOS_RETURN(code);
×
1401
    }
1402
  }
1403

1404
  TAOS_RETURN(code);
×
1405
}
1406

1407
int32_t mndAllocStbSchemas(const SStbObj *pOld, SStbObj *pNew) {
1,309✔
1408
  pNew->pTags = taosMemoryCalloc(pNew->numOfTags, sizeof(SSchema));
1,309✔
1409
  pNew->pColumns = taosMemoryCalloc(pNew->numOfColumns, sizeof(SSchema));
1,309✔
1410
  pNew->pCmpr = taosMemoryCalloc(pNew->numOfColumns, sizeof(SColCmpr));
1,309✔
1411
  if (pNew->pTags == NULL || pNew->pColumns == NULL || pNew->pCmpr == NULL) {
1,309!
1412
    TAOS_RETURN(terrno);
×
1413
  }
1414

1415
  memcpy(pNew->pColumns, pOld->pColumns, sizeof(SSchema) * pOld->numOfColumns);
1,309✔
1416
  memcpy(pNew->pTags, pOld->pTags, sizeof(SSchema) * pOld->numOfTags);
1,309✔
1417
  memcpy(pNew->pCmpr, pOld->pCmpr, sizeof(SColCmpr) * pOld->numOfColumns);
1,309✔
1418

1419
  TAOS_RETURN(0);
1,309✔
1420
}
1421

1422
static int32_t mndUpdateStbCommentAndTTL(const SStbObj *pOld, SStbObj *pNew, char *pComment, int32_t commentLen,
3✔
1423
                                         int32_t ttl) {
1424
  int32_t code = 0;
3✔
1425
  if (commentLen > 0) {
3✔
1426
    pNew->commentLen = commentLen;
2✔
1427
    pNew->comment = taosMemoryCalloc(1, commentLen + 1);
2✔
1428
    if (pNew->comment == NULL) {
2!
1429
      terrno = TSDB_CODE_OUT_OF_MEMORY;
×
1430
      return -1;
×
1431
    }
1432
    memcpy(pNew->comment, pComment, commentLen + 1);
2✔
1433
  } else if (commentLen == 0) {
1!
1434
    pNew->commentLen = 0;
1✔
1435
  } else {
1436
  }
1437

1438
  if (ttl >= 0) {
3!
1439
    pNew->ttl = ttl;
3✔
1440
  }
1441

1442
  if ((code = mndAllocStbSchemas(pOld, pNew)) != 0) {
3!
1443
    TAOS_RETURN(code);
×
1444
  }
1445
  TAOS_RETURN(code);
3✔
1446
}
1447

1448
static int32_t mndAddSuperTableTag(const SStbObj *pOld, SStbObj *pNew, SArray *pFields, int32_t ntags) {
85✔
1449
  int32_t code = 0;
85✔
1450
  if (pOld->numOfTags + ntags > TSDB_MAX_TAGS) {
85!
1451
    code = TSDB_CODE_MND_TOO_MANY_TAGS;
×
1452
    TAOS_RETURN(code);
×
1453
  }
1454

1455
  if (pOld->numOfColumns + ntags + pOld->numOfTags > TSDB_MAX_COLUMNS) {
85!
1456
    code = TSDB_CODE_MND_TOO_MANY_COLUMNS;
×
1457
    TAOS_RETURN(code);
×
1458
  }
1459

1460
  if (!mndValidateSchema(pOld->pTags, pOld->numOfTags, pFields, TSDB_MAX_TAGS_LEN)) {
85!
1461
    code = TSDB_CODE_PAR_INVALID_TAGS_LENGTH;
×
1462
    TAOS_RETURN(code);
×
1463
  }
1464

1465
  pNew->numOfTags = pNew->numOfTags + ntags;
85✔
1466
  if ((code = mndAllocStbSchemas(pOld, pNew)) != 0) {
85!
1467
    TAOS_RETURN(code);
×
1468
  }
1469

1470
  if (pNew->nextColId < 0 || pNew->nextColId >= 0x7fff - ntags) {
85!
1471
    code = TSDB_CODE_OUT_OF_RANGE;
×
1472
    TAOS_RETURN(code);
×
1473
  }
1474

1475
  for (int32_t i = 0; i < ntags; i++) {
159✔
1476
    SField *pField = taosArrayGet(pFields, i);
85✔
1477
    if (mndFindSuperTableColumnIndex(pOld, pField->name) >= 0) {
85✔
1478
      code = TSDB_CODE_MND_COLUMN_ALREADY_EXIST;
5✔
1479
      TAOS_RETURN(code);
5✔
1480
    }
1481

1482
    if (mndFindSuperTableTagIndex(pOld, pField->name) >= 0) {
80✔
1483
      code = TSDB_CODE_MND_TAG_ALREADY_EXIST;
6✔
1484
      TAOS_RETURN(code);
6✔
1485
    }
1486

1487
    SSchema *pSchema = &pNew->pTags[pOld->numOfTags + i];
74✔
1488
    pSchema->bytes = pField->bytes;
74✔
1489
    pSchema->type = pField->type;
74✔
1490
    memcpy(pSchema->name, pField->name, TSDB_COL_NAME_LEN);
74✔
1491
    pSchema->colId = pNew->nextColId;
74✔
1492
    pNew->nextColId++;
74✔
1493

1494
    mInfo("stb:%s, start to add tag %s", pNew->name, pSchema->name);
74!
1495
  }
1496

1497
  pNew->tagVer++;
74✔
1498
  TAOS_RETURN(code);
74✔
1499
}
1500

1501
static int32_t mndCheckAlterColForTopic(SMnode *pMnode, const char *stbFullName, int64_t suid, col_id_t colId) {
1,474✔
1502
  int32_t code = 0;
1,474✔
1503
  SSdb   *pSdb = pMnode->pSdb;
1,474✔
1504
  void   *pIter = NULL;
1,474✔
1505
  while (1) {
×
1506
    SMqTopicObj *pTopic = NULL;
1,474✔
1507
    pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pTopic);
1,474✔
1508
    if (pIter == NULL) break;
1,474!
1509

1510
    mInfo("topic:%s, check tag and column modifiable, stb:%s suid:%" PRId64 " colId:%d, subType:%d sql:%s",
×
1511
          pTopic->name, stbFullName, suid, colId, pTopic->subType, pTopic->sql);
1512
    if (pTopic->ast == NULL) {
×
1513
      sdbRelease(pSdb, pTopic);
×
1514
      continue;
×
1515
    }
1516

1517
    SNode *pAst = NULL;
×
1518
    if (nodesStringToNode(pTopic->ast, &pAst) != 0) {
×
1519
      code = TSDB_CODE_MND_FIELD_CONFLICT_WITH_TOPIC;
×
1520
      mError("topic:%s, create ast error", pTopic->name);
×
1521
      sdbRelease(pSdb, pTopic);
×
1522
      sdbCancelFetch(pSdb, pIter);
×
1523
      TAOS_RETURN(code);
×
1524
    }
1525

1526
    SNodeList *pNodeList = NULL;
×
1527
    if ((code = nodesCollectColumns((SSelectStmt *)pAst, SQL_CLAUSE_FROM, NULL, COLLECT_COL_TYPE_ALL, &pNodeList)) !=
×
1528
        0) {
1529
      sdbRelease(pSdb, pTopic);
×
1530
      sdbCancelFetch(pSdb, pIter);
×
1531
      TAOS_RETURN(code);
×
1532
    }
1533
    SNode *pNode = NULL;
×
1534
    FOREACH(pNode, pNodeList) {
×
1535
      SColumnNode *pCol = (SColumnNode *)pNode;
×
1536
      mInfo("topic:%s, check colId:%d tableId:%" PRId64 " ctbStbUid:%" PRId64, pTopic->name, pCol->colId, pCol->tableId,
×
1537
            pTopic->ctbStbUid);
1538

1539
      if (pCol->tableId != suid && pTopic->ctbStbUid != suid) {
×
1540
        mInfo("topic:%s, check colId:%d passed", pTopic->name, pCol->colId);
×
1541
        goto NEXT;
×
1542
      }
1543
      if (pCol->colId > 0 && pCol->colId == colId) {
×
1544
        code = TSDB_CODE_MND_FIELD_CONFLICT_WITH_TOPIC;
×
1545
        mError("topic:%s, check colId:%d conflicted", pTopic->name, pCol->colId);
×
1546
        nodesDestroyNode(pAst);
×
1547
        nodesDestroyList(pNodeList);
×
1548
        sdbCancelFetch(pSdb, pIter);
×
1549
        sdbRelease(pSdb, pTopic);
×
1550
        TAOS_RETURN(code);
×
1551
      }
1552
      mInfo("topic:%s, check colId:%d passed", pTopic->name, pCol->colId);
×
1553
    }
1554

1555
  NEXT:
×
1556
    sdbRelease(pSdb, pTopic);
×
1557
    nodesDestroyNode(pAst);
×
1558
    nodesDestroyList(pNodeList);
×
1559
  }
1560
  TAOS_RETURN(code);
1,474✔
1561
}
1562

1563
static int32_t mndCheckAlterColForStream(SMnode *pMnode, const char *stbFullName, int64_t suid, col_id_t colId) {
1,474✔
1564
  int32_t code = 0;
1,474✔
1565
  SSdb   *pSdb = pMnode->pSdb;
1,474✔
1566
  void   *pIter = NULL;
1,474✔
1567
  while (1) {
2,564✔
1568
    SStreamObj *pStream = NULL;
4,038✔
1569
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
4,038✔
1570
    if (pIter == NULL) break;
4,038✔
1571

1572
    SNode *pAst = NULL;
2,564✔
1573
    if (nodesStringToNode(pStream->ast, &pAst) != 0) {
2,564!
1574
      code = TSDB_CODE_MND_INVALID_STREAM_OPTION;
×
1575
      mError("stream:%s, create ast error", pStream->name);
×
1576
      sdbRelease(pSdb, pStream);
×
1577
      sdbCancelFetch(pSdb, pIter);
×
1578
      TAOS_RETURN(code);
×
1579
    }
1580

1581
    SNodeList *pNodeList = NULL;
2,564✔
1582
    if ((code = nodesCollectColumns((SSelectStmt *)pAst, SQL_CLAUSE_FROM, NULL, COLLECT_COL_TYPE_ALL, &pNodeList)) !=
2,564!
1583
        0) {
1584
      sdbRelease(pSdb, pStream);
×
1585
      sdbCancelFetch(pSdb, pIter);
×
1586
      TAOS_RETURN(code);
×
1587
    }
1588

1589
    SNode *pNode = NULL;
2,564✔
1590
    FOREACH(pNode, pNodeList) {
2,564!
1591
      SColumnNode *pCol = (SColumnNode *)pNode;
2,564✔
1592

1593
      if (pCol->tableId != suid) {
2,564!
1594
        mInfo("stream:%s, check colId:%d passed", pStream->name, pCol->colId);
2,564!
1595
        goto NEXT;
2,564✔
1596
      }
1597
      if (pCol->colId > 0 && pCol->colId == colId) {
×
1598
        code = TSDB_CODE_MND_STREAM_MUST_BE_DELETED;
×
1599
        mError("stream:%s, check colId:%d conflicted", pStream->name, pCol->colId);
×
1600
        nodesDestroyNode(pAst);
×
1601
        nodesDestroyList(pNodeList);
×
1602
        sdbRelease(pSdb, pStream);
×
1603
        sdbCancelFetch(pSdb, pIter);
×
1604
        TAOS_RETURN(code);
×
1605
      }
1606
      mInfo("stream:%s, check colId:%d passed", pStream->name, pCol->colId);
×
1607
    }
1608

1609
  NEXT:
×
1610
    sdbRelease(pSdb, pStream);
2,564✔
1611
    nodesDestroyNode(pAst);
2,564✔
1612
    nodesDestroyList(pNodeList);
2,564✔
1613
  }
1614
  TAOS_RETURN(code);
1,474✔
1615
}
1616

1617
static int32_t mndCheckAlterColForTSma(SMnode *pMnode, const char *stbFullName, int64_t suid, col_id_t colId) {
1,474✔
1618
  int32_t code = 0;
1,474✔
1619
  SSdb   *pSdb = pMnode->pSdb;
1,474✔
1620
  void   *pIter = NULL;
1,474✔
1621
  while (1) {
×
1622
    SSmaObj *pSma = NULL;
1,474✔
1623
    pIter = sdbFetch(pSdb, SDB_SMA, pIter, (void **)&pSma);
1,474✔
1624
    if (pIter == NULL) break;
1,474!
1625

1626
    mInfo("tsma:%s, check tag and column modifiable, stb:%s suid:%" PRId64 " colId:%d, sql:%s", pSma->name, stbFullName,
×
1627
          suid, colId, pSma->sql);
1628

1629
    SNode *pAst = NULL;
×
1630
    if (nodesStringToNode(pSma->ast, &pAst) != 0) {
×
1631
      code = TSDB_CODE_SDB_INVALID_DATA_CONTENT;
×
1632
      mError("tsma:%s, check tag and column modifiable, stb:%s suid:%" PRId64 " colId:%d failed since parse AST err",
×
1633
             pSma->name, stbFullName, suid, colId);
1634
      sdbCancelFetch(pSdb, pIter);
×
1635
      TAOS_RETURN(code);
×
1636
    }
1637

1638
    SNodeList *pNodeList = NULL;
×
1639
    if ((code = nodesCollectColumns((SSelectStmt *)pAst, SQL_CLAUSE_FROM, NULL, COLLECT_COL_TYPE_ALL, &pNodeList)) !=
×
1640
        0) {
1641
      sdbCancelFetch(pSdb, pIter);
×
1642
      TAOS_RETURN(code);
×
1643
    }
1644
    SNode *pNode = NULL;
×
1645
    FOREACH(pNode, pNodeList) {
×
1646
      SColumnNode *pCol = (SColumnNode *)pNode;
×
1647
      mInfo("tsma:%s, check colId:%d tableId:%" PRId64, pSma->name, pCol->colId, pCol->tableId);
×
1648

1649
      if ((pCol->tableId != suid) && (pSma->stbUid != suid)) {
×
1650
        mInfo("tsma:%s, check colId:%d passed", pSma->name, pCol->colId);
×
1651
        goto NEXT;
×
1652
      }
1653
      if ((pCol->colId) > 0 && (pCol->colId == colId)) {
×
1654
        code = TSDB_CODE_MND_FIELD_CONFLICT_WITH_TSMA;
×
1655
        mError("tsma:%s, check colId:%d conflicted", pSma->name, pCol->colId);
×
1656
        nodesDestroyNode(pAst);
×
1657
        nodesDestroyList(pNodeList);
×
1658
        sdbRelease(pSdb, pSma);
×
1659
        sdbCancelFetch(pSdb, pIter);
×
1660
        TAOS_RETURN(code);
×
1661
      }
1662
      mInfo("tsma:%s, check colId:%d passed", pSma->name, pCol->colId);
×
1663
    }
1664

1665
  NEXT:
×
1666
    sdbRelease(pSdb, pSma);
×
1667
    nodesDestroyNode(pAst);
×
1668
    nodesDestroyList(pNodeList);
×
1669
  }
1670
  TAOS_RETURN(code);
1,474✔
1671
}
1672

1673
int32_t mndCheckColAndTagModifiable(SMnode *pMnode, const char *stbFullName, int64_t suid, col_id_t colId) {
1,474✔
1674
  TAOS_CHECK_RETURN(mndCheckAlterColForTopic(pMnode, stbFullName, suid, colId));
1,474!
1675
  TAOS_CHECK_RETURN(mndCheckAlterColForStream(pMnode, stbFullName, suid, colId));
1,474!
1676
  TAOS_CHECK_RETURN(mndCheckAlterColForTSma(pMnode, stbFullName, suid, colId));
1,474!
1677
  TAOS_RETURN(0);
1,474✔
1678
}
1679

1680
static int32_t mndDropSuperTableTag(SMnode *pMnode, const SStbObj *pOld, SStbObj *pNew, const char *tagName) {
65✔
1681
  int32_t code = 0;
65✔
1682
  int32_t tag = mndFindSuperTableTagIndex(pOld, tagName);
65✔
1683
  if (tag < 0) {
65✔
1684
    code = TSDB_CODE_MND_TAG_NOT_EXIST;
3✔
1685
    TAOS_RETURN(code);
3✔
1686
  }
1687

1688
  col_id_t colId = pOld->pTags[tag].colId;
62✔
1689
  TAOS_CHECK_RETURN(mndCheckColAndTagModifiable(pMnode, pOld->name, pOld->uid, colId));
62!
1690

1691
  TAOS_CHECK_RETURN(mndAllocStbSchemas(pOld, pNew));
62!
1692

1693
  memmove(pNew->pTags + tag, pNew->pTags + tag + 1, sizeof(SSchema) * (pNew->numOfTags - tag - 1));
62✔
1694
  pNew->numOfTags--;
62✔
1695

1696
  pNew->tagVer++;
62✔
1697

1698
  // if (mndDropIndexByTag(pMnode, pOld, tagName) != 0) {
1699
  //   return -1;
1700
  // }
1701
  mInfo("stb:%s, start to drop tag %s", pNew->name, tagName);
62!
1702
  TAOS_RETURN(code);
62✔
1703
}
1704

1705
static int32_t mndAlterStbTagName(SMnode *pMnode, const SStbObj *pOld, SStbObj *pNew, SArray *pFields) {
46✔
1706
  int32_t code = 0;
46✔
1707
  if ((int32_t)taosArrayGetSize(pFields) != 2) {
46!
1708
    code = TSDB_CODE_MND_INVALID_STB_OPTION;
×
1709
    TAOS_RETURN(code);
×
1710
  }
1711

1712
  SField *pField0 = taosArrayGet(pFields, 0);
46✔
1713
  SField *pField1 = taosArrayGet(pFields, 1);
46✔
1714

1715
  const char *oldTagName = pField0->name;
46✔
1716
  const char *newTagName = pField1->name;
46✔
1717

1718
  int32_t tag = mndFindSuperTableTagIndex(pOld, oldTagName);
46✔
1719
  if (tag < 0) {
46✔
1720
    code = TSDB_CODE_MND_TAG_NOT_EXIST;
7✔
1721
    TAOS_RETURN(code);
7✔
1722
  }
1723

1724
  col_id_t colId = pOld->pTags[tag].colId;
39✔
1725
  TAOS_CHECK_RETURN(mndCheckColAndTagModifiable(pMnode, pOld->name, pOld->uid, colId));
39!
1726

1727
  if (mndFindSuperTableTagIndex(pOld, newTagName) >= 0) {
39✔
1728
    code = TSDB_CODE_MND_TAG_ALREADY_EXIST;
9✔
1729
    TAOS_RETURN(code);
9✔
1730
  }
1731

1732
  if (mndFindSuperTableColumnIndex(pOld, newTagName) >= 0) {
30!
1733
    code = TSDB_CODE_MND_COLUMN_ALREADY_EXIST;
×
1734
    TAOS_RETURN(code);
×
1735
  }
1736

1737
  TAOS_CHECK_RETURN(mndAllocStbSchemas(pOld, pNew));
30!
1738

1739
  SSchema *pSchema = (SSchema *)(pNew->pTags + tag);
30✔
1740
  memcpy(pSchema->name, newTagName, TSDB_COL_NAME_LEN);
30✔
1741

1742
  pNew->tagVer++;
30✔
1743
  mInfo("stb:%s, start to modify tag %s to %s", pNew->name, oldTagName, newTagName);
30!
1744
  TAOS_RETURN(code);
30✔
1745
}
1746

1747
static int32_t mndAlterStbTagBytes(SMnode *pMnode, const SStbObj *pOld, SStbObj *pNew, const SField *pField) {
9✔
1748
  int32_t code = 0;
9✔
1749
  int32_t tag = mndFindSuperTableTagIndex(pOld, pField->name);
9✔
1750
  if (tag < 0) {
9!
1751
    code = TSDB_CODE_MND_TAG_NOT_EXIST;
×
1752
    TAOS_RETURN(code);
×
1753
  }
1754

1755
  col_id_t colId = pOld->pTags[tag].colId;
9✔
1756
  TAOS_CHECK_RETURN(mndCheckColAndTagModifiable(pMnode, pOld->name, pOld->uid, colId));
9!
1757

1758
  uint32_t nLen = 0;
9✔
1759
  for (int32_t i = 0; i < pOld->numOfTags; ++i) {
27✔
1760
    nLen += (pOld->pTags[i].colId == colId) ? pField->bytes : pOld->pTags[i].bytes;
18✔
1761
  }
1762

1763
  if (nLen > TSDB_MAX_TAGS_LEN) {
9!
1764
    code = TSDB_CODE_PAR_INVALID_TAGS_LENGTH;
×
1765
    TAOS_RETURN(code);
×
1766
  }
1767

1768
  TAOS_CHECK_RETURN(mndAllocStbSchemas(pOld, pNew));
9!
1769

1770
  SSchema *pTag = pNew->pTags + tag;
9✔
1771

1772
  if (!(pTag->type == TSDB_DATA_TYPE_BINARY || pTag->type == TSDB_DATA_TYPE_VARBINARY ||
9!
1773
        pTag->type == TSDB_DATA_TYPE_NCHAR || pTag->type == TSDB_DATA_TYPE_GEOMETRY)) {
3!
1774
    code = TSDB_CODE_MND_INVALID_STB_OPTION;
×
1775
    TAOS_RETURN(code);
×
1776
  }
1777

1778
  if (pField->bytes <= pTag->bytes) {
9!
1779
    code = TSDB_CODE_MND_INVALID_ROW_BYTES;
×
1780
    TAOS_RETURN(code);
×
1781
  }
1782

1783
  pTag->bytes = pField->bytes;
9✔
1784
  pNew->tagVer++;
9✔
1785

1786
  mInfo("stb:%s, start to modify tag len %s to %d", pNew->name, pField->name, pField->bytes);
9!
1787
  TAOS_RETURN(code);
9✔
1788
}
1789

1790
static int32_t mndUpdateSuperTableColumnCompress(SMnode *pMnode, const SStbObj *pOld, SStbObj *pNew, SArray *pField,
25✔
1791
                                                 int32_t nCols) {
1792
  // if (pColCmpr == NULL || colName == NULL) return -1;
1793

1794
  if (taosArrayGetSize(pField) != nCols) return TSDB_CODE_FAILED;
25!
1795
  TAOS_FIELD *p = taosArrayGet(pField, 0);
25✔
1796

1797
  int32_t code = 0;
25✔
1798
  int32_t idx = mndFindSuperTableColumnIndex(pOld, p->name);
25✔
1799
  if (idx == -1) {
25!
1800
    code = TSDB_CODE_MND_COLUMN_NOT_EXIST;
×
1801
    TAOS_RETURN(code);
×
1802
  }
1803
  SSchema *pTarget = &pOld->pColumns[idx];
25✔
1804
  col_id_t colId = pTarget->colId;
25✔
1805
  TAOS_CHECK_RETURN(mndCheckColAndTagModifiable(pMnode, pOld->name, pOld->uid, colId));
25!
1806

1807
  TAOS_CHECK_RETURN(mndAllocStbSchemas(pOld, pNew));
25!
1808
  code = validColCmprByType(pTarget->type, p->bytes);
25✔
1809
  if (code != TSDB_CODE_SUCCESS) {
25✔
1810
    TAOS_RETURN(code);
1✔
1811
  }
1812

1813
  int8_t updated = 0;
24✔
1814
  for (int i = 0; i < pNew->numOfColumns; i++) {
158!
1815
    SColCmpr *pCmpr = &pNew->pCmpr[i];
158✔
1816
    if (pCmpr->id == colId) {
158✔
1817
      uint32_t dst = 0;
24✔
1818
      updated = tUpdateCompress(pCmpr->alg, p->bytes, TSDB_COLVAL_COMPRESS_DISABLED, TSDB_COLVAL_LEVEL_DISABLED,
24✔
1819
                                TSDB_COLVAL_LEVEL_MEDIUM, &dst);
1820
      if (updated > 0) pCmpr->alg = dst;
24✔
1821
      break;
24✔
1822
    }
1823
  }
1824

1825
  if (updated == 0) {
24✔
1826
    code = TSDB_CODE_MND_COLUMN_COMPRESS_ALREADY_EXIST;
8✔
1827
    TAOS_RETURN(code);
8✔
1828
  } else if (updated == -1) {
16!
1829
    code = TSDB_CODE_TSC_COMPRESS_LEVEL_ERROR;
×
1830
    TAOS_RETURN(code);
×
1831
  }
1832

1833
  pNew->colVer++;
16✔
1834

1835
  TAOS_RETURN(code);
16✔
1836
}
1837
static int32_t mndAddSuperTableColumn(const SStbObj *pOld, SStbObj *pNew, SArray *pFields, int32_t ncols,
46✔
1838
                                      int8_t withCompress) {
1839
  int32_t code = 0;
46✔
1840
  if (pOld->numOfColumns + ncols + pOld->numOfTags > TSDB_MAX_COLUMNS) {
46!
1841
    code = TSDB_CODE_MND_TOO_MANY_COLUMNS;
×
1842
    TAOS_RETURN(code);
×
1843
  }
1844

1845
  if ((code = grantCheck(TSDB_GRANT_TIMESERIES)) != 0) {
46!
1846
    TAOS_RETURN(code);
×
1847
  }
1848

1849
  if (!mndValidateSchema(pOld->pColumns, pOld->numOfColumns, pFields, TSDB_MAX_BYTES_PER_ROW)) {
46!
1850
    code = TSDB_CODE_PAR_INVALID_ROW_LENGTH;
×
1851
    TAOS_RETURN(code);
×
1852
  }
1853

1854
  pNew->numOfColumns = pNew->numOfColumns + ncols;
46✔
1855

1856
  TAOS_CHECK_RETURN(mndAllocStbSchemas(pOld, pNew));
46!
1857

1858
  if (pNew->nextColId < 0 || pNew->nextColId >= 0x7fff - ncols) {
46!
1859
    code = TSDB_CODE_OUT_OF_RANGE;
×
1860
    TAOS_RETURN(code);
×
1861
  }
1862

1863
  for (int32_t i = 0; i < ncols; i++) {
79✔
1864
    if (withCompress) {
46✔
1865
      SFieldWithOptions *pField = taosArrayGet(pFields, i);
2✔
1866
      if (mndFindSuperTableColumnIndex(pOld, pField->name) >= 0) {
2!
1867
        code = TSDB_CODE_MND_COLUMN_ALREADY_EXIST;
×
1868
        TAOS_RETURN(code);
×
1869
      }
1870

1871
      if (mndFindSuperTableTagIndex(pOld, pField->name) >= 0) {
2!
1872
        code = TSDB_CODE_MND_TAG_ALREADY_EXIST;
×
1873
        TAOS_RETURN(code);
×
1874
      }
1875

1876
      SSchema *pSchema = &pNew->pColumns[pOld->numOfColumns + i];
2✔
1877
      pSchema->bytes = pField->bytes;
2✔
1878
      pSchema->type = pField->type;
2✔
1879
      memcpy(pSchema->name, pField->name, TSDB_COL_NAME_LEN);
2✔
1880
      pSchema->colId = pNew->nextColId;
2✔
1881
      pNew->nextColId++;
2✔
1882

1883
      SColCmpr *pCmpr = &pNew->pCmpr[pOld->numOfColumns + i];
2✔
1884
      pCmpr->id = pSchema->colId;
2✔
1885
      pCmpr->alg = pField->compress;
2✔
1886
      mInfo("stb:%s, start to add column %s", pNew->name, pSchema->name);
2!
1887
    } else {
1888
      SField *pField = taosArrayGet(pFields, i);
44✔
1889
      if (mndFindSuperTableColumnIndex(pOld, pField->name) >= 0) {
44✔
1890
        code = TSDB_CODE_MND_COLUMN_ALREADY_EXIST;
7✔
1891
        TAOS_RETURN(code);
7✔
1892
      }
1893

1894
      if (mndFindSuperTableTagIndex(pOld, pField->name) >= 0) {
37✔
1895
        code = TSDB_CODE_MND_TAG_ALREADY_EXIST;
6✔
1896
        TAOS_RETURN(code);
6✔
1897
      }
1898

1899
      SSchema *pSchema = &pNew->pColumns[pOld->numOfColumns + i];
31✔
1900
      pSchema->bytes = pField->bytes;
31✔
1901
      pSchema->type = pField->type;
31✔
1902
      memcpy(pSchema->name, pField->name, TSDB_COL_NAME_LEN);
31✔
1903
      pSchema->colId = pNew->nextColId;
31✔
1904
      pNew->nextColId++;
31✔
1905

1906
      SColCmpr *pCmpr = &pNew->pCmpr[pOld->numOfColumns + i];
31✔
1907
      pCmpr->id = pSchema->colId;
31✔
1908
      pCmpr->alg = createDefaultColCmprByType(pSchema->type);
31✔
1909
      mInfo("stb:%s, start to add column %s", pNew->name, pSchema->name);
31!
1910
    }
1911
  }
1912

1913
  pNew->colVer++;
33✔
1914
  TAOS_RETURN(code);
33✔
1915
}
1916

1917
static int32_t mndDropSuperTableColumn(SMnode *pMnode, const SStbObj *pOld, SStbObj *pNew, const char *colName) {
30✔
1918
  int32_t code = 0;
30✔
1919
  int32_t col = mndFindSuperTableColumnIndex(pOld, colName);
30✔
1920
  if (col < 0) {
30✔
1921
    code = TSDB_CODE_MND_COLUMN_NOT_EXIST;
6✔
1922
    TAOS_RETURN(code);
6✔
1923
  }
1924

1925
  if (col == 0) {
24✔
1926
    code = TSDB_CODE_MND_INVALID_STB_ALTER_OPTION;
4✔
1927
    TAOS_RETURN(code);
4✔
1928
  }
1929

1930
  if (pOld->numOfColumns == 2) {
20!
1931
    code = TSDB_CODE_PAR_INVALID_DROP_COL;
×
1932
    TAOS_RETURN(code);
×
1933
  }
1934

1935
  col_id_t colId = pOld->pColumns[col].colId;
20✔
1936
  TAOS_CHECK_RETURN(mndCheckColAndTagModifiable(pMnode, pOld->name, pOld->uid, colId));
20!
1937

1938
  TAOS_CHECK_RETURN(mndAllocStbSchemas(pOld, pNew));
20!
1939

1940
  int32_t sz = pNew->numOfColumns - col - 1;
20✔
1941
  memmove(pNew->pColumns + col, pNew->pColumns + col + 1, sizeof(SSchema) * sz);
20✔
1942
  memmove(pNew->pCmpr + col, pNew->pCmpr + col + 1, sizeof(SColCmpr) * sz);
20✔
1943
  pNew->numOfColumns--;
20✔
1944

1945
  pNew->colVer++;
20✔
1946
  mInfo("stb:%s, start to drop col %s", pNew->name, colName);
20!
1947
  TAOS_RETURN(code);
20✔
1948
}
1949

1950
static int32_t mndAlterStbColumnBytes(SMnode *pMnode, const SStbObj *pOld, SStbObj *pNew, const SField *pField) {
14✔
1951
  int32_t code = 0;
14✔
1952
  int32_t col = mndFindSuperTableColumnIndex(pOld, pField->name);
14✔
1953
  if (col < 0) {
14✔
1954
    code = TSDB_CODE_MND_COLUMN_NOT_EXIST;
1✔
1955
    TAOS_RETURN(code);
1✔
1956
  }
1957

1958
  col_id_t colId = pOld->pColumns[col].colId;
13✔
1959

1960
  uint32_t nLen = 0;
13✔
1961
  for (int32_t i = 0; i < pOld->numOfColumns; ++i) {
60✔
1962
    nLen += (pOld->pColumns[i].colId == colId) ? pField->bytes : pOld->pColumns[i].bytes;
47✔
1963
  }
1964

1965
  if (nLen > TSDB_MAX_BYTES_PER_ROW) {
13!
1966
    code = TSDB_CODE_MND_INVALID_ROW_BYTES;
×
1967
    TAOS_RETURN(code);
×
1968
  }
1969

1970
  TAOS_CHECK_RETURN(mndCheckColAndTagModifiable(pMnode, pOld->name, pOld->uid, colId));
13!
1971

1972
  TAOS_CHECK_RETURN(mndAllocStbSchemas(pOld, pNew));
13!
1973

1974
  SSchema *pCol = pNew->pColumns + col;
13✔
1975
  if (!(pCol->type == TSDB_DATA_TYPE_BINARY || pCol->type == TSDB_DATA_TYPE_VARBINARY ||
13!
1976
        pCol->type == TSDB_DATA_TYPE_NCHAR || pCol->type == TSDB_DATA_TYPE_GEOMETRY)) {
3!
1977
    code = TSDB_CODE_MND_INVALID_STB_OPTION;
×
1978
    TAOS_RETURN(code);
×
1979
  }
1980

1981
  if (pField->bytes <= pCol->bytes) {
13!
1982
    code = TSDB_CODE_MND_INVALID_ROW_BYTES;
×
1983
    TAOS_RETURN(code);
×
1984
  }
1985

1986
  pCol->bytes = pField->bytes;
13✔
1987
  pNew->colVer++;
13✔
1988

1989
  mInfo("stb:%s, start to modify col len %s to %d", pNew->name, pField->name, pField->bytes);
13!
1990
  TAOS_RETURN(code);
13✔
1991
}
1992

1993
static int32_t mndSetAlterStbPrepareLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
265✔
1994
  int32_t  code = 0;
265✔
1995
  SSdbRaw *pRedoRaw = mndStbActionEncode(pStb);
265✔
1996
  if (pRedoRaw == NULL) {
265!
1997
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1998
    if (terrno != 0) code = terrno;
×
1999
    TAOS_RETURN(code);
×
2000
  }
2001
  if ((code = mndTransAppendPrepareLog(pTrans, pRedoRaw)) != 0) {
265!
2002
    sdbFreeRaw(pRedoRaw);
×
2003
    TAOS_RETURN(code);
×
2004
  }
2005
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY));
265!
2006

2007
  TAOS_RETURN(code);
265✔
2008
}
2009

2010
static int32_t mndSetAlterStbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
265✔
2011
  int32_t  code = 0;
265✔
2012
  SSdbRaw *pCommitRaw = mndStbActionEncode(pStb);
265✔
2013
  if (pCommitRaw == NULL) {
265!
2014
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2015
    if (terrno != 0) code = terrno;
×
2016
    TAOS_RETURN(code);
×
2017
  }
2018
  if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
265!
2019
    sdbFreeRaw(pCommitRaw);
×
2020
    TAOS_RETURN(code);
×
2021
  }
2022
  TAOS_CHECK_RETURN(sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY));
265!
2023

2024
  TAOS_RETURN(code);
265✔
2025
}
2026

2027
static int32_t mndSetAlterStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb, void *alterOriData,
265✔
2028
                                         int32_t alterOriDataLen) {
2029
  int32_t code = 0;
265✔
2030
  SSdb   *pSdb = pMnode->pSdb;
265✔
2031
  SVgObj *pVgroup = NULL;
265✔
2032
  void   *pIter = NULL;
265✔
2033
  int32_t contLen;
2034

2035
  while (1) {
648✔
2036
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
913✔
2037
    if (pIter == NULL) break;
913✔
2038
    if (!mndVgroupInDb(pVgroup, pDb->uid)) {
648✔
2039
      sdbRelease(pSdb, pVgroup);
118✔
2040
      continue;
118✔
2041
    }
2042

2043
    void *pReq = mndBuildVCreateStbReq(pMnode, pVgroup, pStb, &contLen, alterOriData, alterOriDataLen);
530✔
2044
    if (pReq == NULL) {
530!
2045
      sdbCancelFetch(pSdb, pIter);
×
2046
      sdbRelease(pSdb, pVgroup);
×
2047
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2048
      if (terrno != 0) code = terrno;
×
2049
      TAOS_RETURN(code);
×
2050
    }
2051
    STransAction action = {0};
530✔
2052
    action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
530✔
2053
    action.pCont = pReq;
530✔
2054
    action.contLen = contLen;
530✔
2055
    action.msgType = TDMT_VND_ALTER_STB;
530✔
2056
    if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
530!
2057
      taosMemoryFree(pReq);
×
2058
      sdbCancelFetch(pSdb, pIter);
×
2059
      sdbRelease(pSdb, pVgroup);
×
2060
      TAOS_RETURN(code);
×
2061
    }
2062
    sdbRelease(pSdb, pVgroup);
530✔
2063
  }
2064

2065
  TAOS_RETURN(code);
265✔
2066
}
2067

2068
static int32_t mndSetAlterStbRedoActions2(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb,
×
2069
                                          void *alterOriData, int32_t alterOriDataLen) {
2070
  int32_t code = 0;
×
2071
  SSdb   *pSdb = pMnode->pSdb;
×
2072
  SVgObj *pVgroup = NULL;
×
2073
  void   *pIter = NULL;
×
2074
  int32_t contLen;
2075

2076
  while (1) {
×
2077
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
×
2078
    if (pIter == NULL) break;
×
2079
    if (!mndVgroupInDb(pVgroup, pDb->uid)) {
×
2080
      sdbRelease(pSdb, pVgroup);
×
2081
      continue;
×
2082
    }
2083

2084
    void *pReq = mndBuildVCreateStbReq(pMnode, pVgroup, pStb, &contLen, alterOriData, alterOriDataLen);
×
2085
    if (pReq == NULL) {
×
2086
      sdbCancelFetch(pSdb, pIter);
×
2087
      sdbRelease(pSdb, pVgroup);
×
2088
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2089
      if (terrno != 0) code = terrno;
×
2090
      TAOS_RETURN(code);
×
2091
    }
2092
    STransAction action = {0};
×
2093
    action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
×
2094
    action.pCont = pReq;
×
2095
    action.contLen = contLen;
×
2096
    action.msgType = TDMT_VND_CREATE_INDEX;
×
2097
    if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
×
2098
      taosMemoryFree(pReq);
×
2099
      sdbCancelFetch(pSdb, pIter);
×
2100
      sdbRelease(pSdb, pVgroup);
×
2101
      TAOS_RETURN(code);
×
2102
    }
2103
    sdbRelease(pSdb, pVgroup);
×
2104
  }
2105

2106
  TAOS_RETURN(code);
×
2107
}
2108
static int32_t mndBuildStbSchemaImp(SDbObj *pDb, SStbObj *pStb, const char *tbName, STableMetaRsp *pRsp) {
16,719✔
2109
  int32_t code = 0;
16,719✔
2110
  taosRLockLatch(&pStb->lock);
16,719✔
2111

2112
  int32_t totalCols = pStb->numOfColumns + pStb->numOfTags;
16,718✔
2113
  pRsp->pSchemas = taosMemoryCalloc(totalCols, sizeof(SSchema));
16,718✔
2114
  if (pRsp->pSchemas == NULL) {
16,719!
2115
    taosRUnLockLatch(&pStb->lock);
×
2116
    code = terrno;
×
2117
    TAOS_RETURN(code);
×
2118
  }
2119
  pRsp->pSchemaExt = taosMemoryCalloc(pStb->numOfColumns, sizeof(SSchemaExt));
16,719✔
2120
  if (pRsp->pSchemaExt == NULL) {
16,719!
2121
    taosRUnLockLatch(&pStb->lock);
×
2122
    code = terrno;
×
2123
    TAOS_RETURN(code);
×
2124
  }
2125

2126
  tstrncpy(pRsp->dbFName, pStb->db, sizeof(pRsp->dbFName));
16,719✔
2127
  tstrncpy(pRsp->tbName, tbName, sizeof(pRsp->tbName));
16,719✔
2128
  tstrncpy(pRsp->stbName, tbName, sizeof(pRsp->stbName));
16,719✔
2129
  pRsp->dbId = pDb->uid;
16,719✔
2130
  pRsp->numOfTags = pStb->numOfTags;
16,719✔
2131
  pRsp->numOfColumns = pStb->numOfColumns;
16,719✔
2132
  pRsp->precision = pDb->cfg.precision;
16,719✔
2133
  pRsp->tableType = TSDB_SUPER_TABLE;
16,719✔
2134
  pRsp->sversion = pStb->colVer;
16,719✔
2135
  pRsp->tversion = pStb->tagVer;
16,719✔
2136
  pRsp->suid = pStb->uid;
16,719✔
2137
  pRsp->tuid = pStb->uid;
16,719✔
2138

2139
  for (int32_t i = 0; i < pStb->numOfColumns; ++i) {
379,846✔
2140
    SSchema *pSchema = &pRsp->pSchemas[i];
363,127✔
2141
    SSchema *pSrcSchema = &pStb->pColumns[i];
363,127✔
2142
    memcpy(pSchema->name, pSrcSchema->name, TSDB_COL_NAME_LEN);
363,127✔
2143
    pSchema->type = pSrcSchema->type;
363,127✔
2144
    pSchema->flags = pSrcSchema->flags;
363,127✔
2145
    pSchema->colId = pSrcSchema->colId;
363,127✔
2146
    pSchema->bytes = pSrcSchema->bytes;
363,127✔
2147
  }
2148

2149
  for (int32_t i = 0; i < pStb->numOfTags; ++i) {
230,070✔
2150
    SSchema *pSchema = &pRsp->pSchemas[i + pStb->numOfColumns];
213,351✔
2151
    SSchema *pSrcSchema = &pStb->pTags[i];
213,351✔
2152
    memcpy(pSchema->name, pSrcSchema->name, TSDB_COL_NAME_LEN);
213,351✔
2153
    pSchema->type = pSrcSchema->type;
213,351✔
2154
    pSchema->flags = pSrcSchema->flags;
213,351✔
2155
    pSchema->colId = pSrcSchema->colId;
213,351✔
2156
    pSchema->bytes = pSrcSchema->bytes;
213,351✔
2157
  }
2158
  for (int32_t i = 0; i < pStb->numOfColumns; i++) {
379,839✔
2159
    SColCmpr   *pCmpr = &pStb->pCmpr[i];
363,120✔
2160
    SSchemaExt *pSchEx = &pRsp->pSchemaExt[i];
363,120✔
2161
    pSchEx->colId = pCmpr->id;
363,120✔
2162
    pSchEx->compress = pCmpr->alg;
363,120✔
2163
  }
2164

2165
  taosRUnLockLatch(&pStb->lock);
16,719✔
2166
  TAOS_RETURN(code);
16,719✔
2167
}
2168

2169
static int32_t mndBuildStbCfgImp(SDbObj *pDb, SStbObj *pStb, const char *tbName, STableCfgRsp *pRsp) {
22✔
2170
  int32_t code = 0;
22✔
2171
  taosRLockLatch(&pStb->lock);
22✔
2172

2173
  int32_t totalCols = pStb->numOfColumns + pStb->numOfTags;
22✔
2174
  pRsp->pSchemas = taosMemoryCalloc(totalCols, sizeof(SSchema));
22✔
2175
  if (pRsp->pSchemas == NULL) {
22!
2176
    taosRUnLockLatch(&pStb->lock);
×
2177
    code = terrno;
×
2178
    TAOS_RETURN(code);
×
2179
  }
2180

2181
  tstrncpy(pRsp->dbFName, pStb->db, sizeof(pRsp->dbFName));
22✔
2182
  tstrncpy(pRsp->tbName, tbName, sizeof(pRsp->tbName));
22✔
2183
  tstrncpy(pRsp->stbName, tbName, sizeof(pRsp->stbName));
22✔
2184
  pRsp->numOfTags = pStb->numOfTags;
22✔
2185
  pRsp->numOfColumns = pStb->numOfColumns;
22✔
2186
  pRsp->tableType = TSDB_SUPER_TABLE;
22✔
2187
  pRsp->delay1 = pStb->maxdelay[0];
22✔
2188
  pRsp->delay2 = pStb->maxdelay[1];
22✔
2189
  pRsp->watermark1 = pStb->watermark[0];
22✔
2190
  pRsp->watermark2 = pStb->watermark[1];
22✔
2191
  pRsp->ttl = pStb->ttl;
22✔
2192
  pRsp->commentLen = pStb->commentLen;
22✔
2193
  if (pStb->commentLen > 0) {
22!
2194
    pRsp->pComment = taosStrdup(pStb->comment);
×
2195
  }
2196

2197
  for (int32_t i = 0; i < pStb->numOfColumns; ++i) {
70✔
2198
    SSchema *pSchema = &pRsp->pSchemas[i];
48✔
2199
    SSchema *pSrcSchema = &pStb->pColumns[i];
48✔
2200
    memcpy(pSchema->name, pSrcSchema->name, TSDB_COL_NAME_LEN);
48✔
2201
    pSchema->type = pSrcSchema->type;
48✔
2202
    pSchema->flags = pSrcSchema->flags;
48✔
2203
    pSchema->colId = pSrcSchema->colId;
48✔
2204
    pSchema->bytes = pSrcSchema->bytes;
48✔
2205
  }
2206

2207
  for (int32_t i = 0; i < pStb->numOfTags; ++i) {
47✔
2208
    SSchema *pSchema = &pRsp->pSchemas[i + pStb->numOfColumns];
25✔
2209
    SSchema *pSrcSchema = &pStb->pTags[i];
25✔
2210
    memcpy(pSchema->name, pSrcSchema->name, TSDB_COL_NAME_LEN);
25✔
2211
    pSchema->type = pSrcSchema->type;
25✔
2212
    pSchema->flags = pSrcSchema->flags;
25✔
2213
    pSchema->colId = pSrcSchema->colId;
25✔
2214
    pSchema->bytes = pSrcSchema->bytes;
25✔
2215
  }
2216

2217
  if (pStb->numOfFuncs > 0) {
22!
2218
    pRsp->pFuncs = taosArrayDup(pStb->pFuncs, NULL);
×
2219
  }
2220

2221
  pRsp->pSchemaExt = taosMemoryCalloc(pStb->numOfColumns, sizeof(SSchemaExt));
22✔
2222
  for (int32_t i = 0; i < pStb->numOfColumns; i++) {
70✔
2223
    SColCmpr *pCmpr = &pStb->pCmpr[i];
48✔
2224

2225
    SSchemaExt *pSchExt = &pRsp->pSchemaExt[i];
48✔
2226
    pSchExt->colId = pCmpr->id;
48✔
2227
    pSchExt->compress = pCmpr->alg;
48✔
2228
  }
2229

2230
  taosRUnLockLatch(&pStb->lock);
22✔
2231
  TAOS_RETURN(code);
22✔
2232
}
2233

2234
static int32_t mndValidateStbVersion(SMnode *pMnode, SSTableVersion *pStbVer, bool *schema, bool *sma) {
18,637✔
2235
  int32_t code = 0;
18,637✔
2236
  char    tbFName[TSDB_TABLE_FNAME_LEN] = {0};
18,637✔
2237
  snprintf(tbFName, sizeof(tbFName), "%s.%s", pStbVer->dbFName, pStbVer->stbName);
18,637✔
2238

2239
  SDbObj *pDb = mndAcquireDb(pMnode, pStbVer->dbFName);
18,637✔
2240
  if (pDb == NULL) {
18,637✔
2241
    code = TSDB_CODE_MND_DB_NOT_SELECTED;
142✔
2242
    TAOS_RETURN(code);
142✔
2243
  }
2244

2245
  if (pDb->uid != pStbVer->dbId) {
18,495!
2246
    mndReleaseDb(pMnode, pDb);
×
2247
    code = TSDB_CODE_MND_DB_NOT_SELECTED;
×
2248
    TAOS_RETURN(code);
×
2249
  }
2250

2251
  SStbObj *pStb = mndAcquireStb(pMnode, tbFName);
18,495✔
2252
  if (pStb == NULL) {
18,495✔
2253
    mndReleaseDb(pMnode, pDb);
1✔
2254
    code = TSDB_CODE_PAR_TABLE_NOT_EXIST;
1✔
2255
    TAOS_RETURN(code);
1✔
2256
  }
2257

2258
  taosRLockLatch(&pStb->lock);
18,494✔
2259

2260
  if (pStbVer->sversion != pStb->colVer || pStbVer->tversion != pStb->tagVer) {
18,494!
2261
    *schema = true;
314✔
2262
  } else {
2263
    *schema = false;
18,180✔
2264
  }
2265

2266
  if (pStbVer->smaVer && pStbVer->smaVer != pStb->smaVer) {
18,494!
2267
    *sma = true;
×
2268
  } else {
2269
    *sma = false;
18,494✔
2270
  }
2271

2272
  taosRUnLockLatch(&pStb->lock);
18,494✔
2273

2274
  mndReleaseDb(pMnode, pDb);
18,494✔
2275
  mndReleaseStb(pMnode, pStb);
18,494✔
2276
  return TSDB_CODE_SUCCESS;
18,494✔
2277
}
2278

2279
static int32_t mndBuildStbSchema(SMnode *pMnode, const char *dbFName, const char *tbName, STableMetaRsp *pRsp) {
14,800✔
2280
  int32_t code = 0;
14,800✔
2281
  char    tbFName[TSDB_TABLE_FNAME_LEN] = {0};
14,800✔
2282
  snprintf(tbFName, sizeof(tbFName), "%s.%s", dbFName, tbName);
14,800✔
2283

2284
  SDbObj *pDb = mndAcquireDb(pMnode, dbFName);
14,800✔
2285
  if (pDb == NULL) {
14,800!
2286
    code = TSDB_CODE_MND_DB_NOT_SELECTED;
×
2287
    TAOS_RETURN(code);
×
2288
  }
2289

2290
  SStbObj *pStb = mndAcquireStb(pMnode, tbFName);
14,800✔
2291
  if (pStb == NULL) {
14,800✔
2292
    mndReleaseDb(pMnode, pDb);
990✔
2293
    code = TSDB_CODE_PAR_TABLE_NOT_EXIST;
990✔
2294
    TAOS_RETURN(code);
990✔
2295
  }
2296

2297
  code = mndBuildStbSchemaImp(pDb, pStb, tbName, pRsp);
13,810✔
2298
  mndReleaseDb(pMnode, pDb);
13,810✔
2299
  mndReleaseStb(pMnode, pStb);
13,810✔
2300
  TAOS_RETURN(code);
13,810✔
2301
}
2302

2303
static int32_t mndBuildStbCfg(SMnode *pMnode, const char *dbFName, const char *tbName, STableCfgRsp *pRsp) {
22✔
2304
  int32_t code = 0;
22✔
2305
  char    tbFName[TSDB_TABLE_FNAME_LEN] = {0};
22✔
2306
  snprintf(tbFName, sizeof(tbFName), "%s.%s", dbFName, tbName);
22✔
2307

2308
  SDbObj *pDb = mndAcquireDb(pMnode, dbFName);
22✔
2309
  if (pDb == NULL) {
22!
2310
    code = TSDB_CODE_MND_DB_NOT_SELECTED;
×
2311
    TAOS_RETURN(code);
×
2312
  }
2313

2314
  SStbObj *pStb = mndAcquireStb(pMnode, tbFName);
22✔
2315
  if (pStb == NULL) {
22!
2316
    mndReleaseDb(pMnode, pDb);
×
2317
    code = TSDB_CODE_PAR_TABLE_NOT_EXIST;
×
2318
    TAOS_RETURN(code);
×
2319
  }
2320

2321
  code = mndBuildStbCfgImp(pDb, pStb, tbName, pRsp);
22✔
2322

2323
  mndReleaseDb(pMnode, pDb);
22✔
2324
  mndReleaseStb(pMnode, pStb);
22✔
2325
  TAOS_RETURN(code);
22✔
2326
}
2327

2328
static int32_t mndBuildSMAlterStbRsp(SDbObj *pDb, SStbObj *pObj, void **pCont, int32_t *pLen) {
257✔
2329
  int32_t       code = 0;
257✔
2330
  SEncoder      ec = {0};
257✔
2331
  uint32_t      contLen = 0;
257✔
2332
  SMAlterStbRsp alterRsp = {0};
257✔
2333
  SName         name = {0};
257✔
2334
  TAOS_CHECK_RETURN(tNameFromString(&name, pObj->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE));
257!
2335

2336
  alterRsp.pMeta = taosMemoryCalloc(1, sizeof(STableMetaRsp));
257✔
2337
  if (NULL == alterRsp.pMeta) {
257!
2338
    code = terrno;
×
2339
    TAOS_RETURN(code);
×
2340
  }
2341

2342
  code = mndBuildStbSchemaImp(pDb, pObj, name.tname, alterRsp.pMeta);
257✔
2343
  if (code) {
257!
2344
    tFreeSMAlterStbRsp(&alterRsp);
×
2345
    return code;
×
2346
  }
2347

2348
  tEncodeSize(tEncodeSMAlterStbRsp, &alterRsp, contLen, code);
257!
2349
  if (code) {
257!
2350
    tFreeSMAlterStbRsp(&alterRsp);
×
2351
    return code;
×
2352
  }
2353

2354
  void *cont = taosMemoryMalloc(contLen);
257✔
2355
  if (NULL == cont) {
257!
2356
    code = terrno;
×
2357
    tFreeSMAlterStbRsp(&alterRsp);
×
2358
    TAOS_RETURN(code);
×
2359
  }
2360
  tEncoderInit(&ec, cont, contLen);
257✔
2361
  code = tEncodeSMAlterStbRsp(&ec, &alterRsp);
257✔
2362
  tEncoderClear(&ec);
257✔
2363

2364
  tFreeSMAlterStbRsp(&alterRsp);
257✔
2365

2366
  if (code < 0) TAOS_RETURN(code);
257!
2367

2368
  *pCont = cont;
257✔
2369
  *pLen = contLen;
257✔
2370

2371
  TAOS_RETURN(code);
257✔
2372
}
2373

2374
int32_t mndBuildSMCreateStbRsp(SMnode *pMnode, char *dbFName, char *stbFName, void **pCont, int32_t *pLen) {
2,655✔
2375
  int32_t code = -1;
2,655✔
2376
  SDbObj *pDb = mndAcquireDb(pMnode, dbFName);
2,655✔
2377
  if (NULL == pDb) {
2,655!
2378
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2379
    if (terrno != 0) code = terrno;
×
2380
    TAOS_RETURN(code);
×
2381
  }
2382

2383
  SStbObj *pObj = mndAcquireStb(pMnode, stbFName);
2,655✔
2384
  if (NULL == pObj) {
2,655✔
2385
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
3✔
2386
    if (terrno != 0) code = terrno;
3!
2387
    goto _OVER;
3✔
2388
  }
2389

2390
  SEncoder       ec = {0};
2,652✔
2391
  uint32_t       contLen = 0;
2,652✔
2392
  SMCreateStbRsp stbRsp = {0};
2,652✔
2393
  SName          name = {0};
2,652✔
2394
  TAOS_CHECK_GOTO(tNameFromString(&name, pObj->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE), NULL, _OVER);
2,652!
2395

2396
  stbRsp.pMeta = taosMemoryCalloc(1, sizeof(STableMetaRsp));
2,652✔
2397
  if (NULL == stbRsp.pMeta) {
2,652!
2398
    code = terrno;
×
2399
    goto _OVER;
×
2400
  }
2401

2402
  code = mndBuildStbSchemaImp(pDb, pObj, name.tname, stbRsp.pMeta);
2,652✔
2403
  if (code) {
2,652!
2404
    tFreeSMCreateStbRsp(&stbRsp);
×
2405
    goto _OVER;
×
2406
  }
2407

2408
  tEncodeSize(tEncodeSMCreateStbRsp, &stbRsp, contLen, code);
2,652!
2409
  if (code) {
2,652!
2410
    tFreeSMCreateStbRsp(&stbRsp);
×
2411
    goto _OVER;
×
2412
  }
2413

2414
  void *cont = taosMemoryMalloc(contLen);
2,652✔
2415
  if (NULL == cont) {
2,652!
2416
    code = terrno;
×
2417
    tFreeSMCreateStbRsp(&stbRsp);
×
2418
    goto _OVER;
×
2419
  }
2420
  tEncoderInit(&ec, cont, contLen);
2,652✔
2421
  TAOS_CHECK_GOTO(tEncodeSMCreateStbRsp(&ec, &stbRsp), NULL, _OVER);
2,652!
2422
  tEncoderClear(&ec);
2,652✔
2423

2424
  tFreeSMCreateStbRsp(&stbRsp);
2,652✔
2425

2426
  *pCont = cont;
2,652✔
2427
  *pLen = contLen;
2,652✔
2428

2429
  code = 0;
2,652✔
2430

2431
_OVER:
2,655✔
2432
  if (pObj) {
2,655✔
2433
    mndReleaseStb(pMnode, pObj);
2,652✔
2434
  }
2435

2436
  if (pDb) {
2,655!
2437
    mndReleaseDb(pMnode, pDb);
2,655✔
2438
  }
2439

2440
  TAOS_RETURN(code);
2,655✔
2441
}
2442

2443
static int32_t mndAlterStbImp(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *pStb, bool needRsp,
173✔
2444
                              void *alterOriData, int32_t alterOriDataLen) {
2445
  int32_t code = -1;
173✔
2446
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, "alter-stb");
173✔
2447
  if (pTrans == NULL) {
173!
2448
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2449
    if (terrno != 0) code = terrno;
×
2450
    goto _OVER;
×
2451
  }
2452

2453
  mInfo("trans:%d, used to alter stb:%s", pTrans->id, pStb->name);
173!
2454
  mndTransSetDbName(pTrans, pDb->name, pStb->name);
173✔
2455
  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER);
173!
2456

2457
  if (needRsp) {
173✔
2458
    void   *pCont = NULL;
165✔
2459
    int32_t contLen = 0;
165✔
2460
    TAOS_CHECK_GOTO(mndBuildSMAlterStbRsp(pDb, pStb, &pCont, &contLen), NULL, _OVER);
165!
2461
    mndTransSetRpcRsp(pTrans, pCont, contLen);
165✔
2462
  }
2463

2464
  TAOS_CHECK_GOTO(mndSetAlterStbPrepareLogs(pMnode, pTrans, pDb, pStb), NULL, _OVER);
173!
2465
  TAOS_CHECK_GOTO(mndSetAlterStbCommitLogs(pMnode, pTrans, pDb, pStb), NULL, _OVER);
173!
2466
  TAOS_CHECK_GOTO(mndSetAlterStbRedoActions(pMnode, pTrans, pDb, pStb, alterOriData, alterOriDataLen), NULL, _OVER);
173!
2467
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
173!
2468

2469
  code = 0;
173✔
2470

2471
_OVER:
173✔
2472
  mndTransDrop(pTrans);
173✔
2473
  TAOS_RETURN(code);
173✔
2474
}
2475

2476
static int32_t mndAlterStbAndUpdateTagIdxImp(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *pStb, bool needRsp,
92✔
2477
                                             void *alterOriData, int32_t alterOriDataLen, const SMAlterStbReq *pAlter) {
2478
  int32_t code = -1;
92✔
2479
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, "alter-stb");
92✔
2480
  if (pTrans == NULL) {
92!
2481
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2482
    if (terrno != 0) code = terrno;
×
2483
    goto _OVER;
×
2484
  }
2485

2486
  mInfo("trans:%d, used to alter stb:%s", pTrans->id, pStb->name);
92!
2487
  mndTransSetDbName(pTrans, pDb->name, pStb->name);
92✔
2488

2489
  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER);
92!
2490

2491
  if (needRsp) {
92!
2492
    void   *pCont = NULL;
92✔
2493
    int32_t contLen = 0;
92✔
2494
    TAOS_CHECK_GOTO(mndBuildSMAlterStbRsp(pDb, pStb, &pCont, &contLen), NULL, _OVER);
92!
2495
    mndTransSetRpcRsp(pTrans, pCont, contLen);
92✔
2496
  }
2497

2498
  if (pAlter->alterType == TSDB_ALTER_TABLE_DROP_TAG) {
92✔
2499
    SIdxObj idxObj = {0};
62✔
2500
    SField *pField0 = taosArrayGet(pAlter->pFields, 0);
62✔
2501
    bool    exist = false;
62✔
2502
    if (mndGetIdxsByTagName(pMnode, pStb, pField0->name, &idxObj) == 0) {
62✔
2503
      exist = true;
2✔
2504
    }
2505
    TAOS_CHECK_GOTO(mndSetAlterStbPrepareLogs(pMnode, pTrans, pDb, pStb), NULL, _OVER);
62!
2506
    TAOS_CHECK_GOTO(mndSetAlterStbCommitLogs(pMnode, pTrans, pDb, pStb), NULL, _OVER);
62!
2507

2508
    if (exist == true) {
62✔
2509
      TAOS_CHECK_GOTO(mndSetDropIdxPrepareLogs(pMnode, pTrans, &idxObj), NULL, _OVER);
2!
2510
      TAOS_CHECK_GOTO(mndSetDropIdxCommitLogs(pMnode, pTrans, &idxObj), NULL, _OVER);
2!
2511
    }
2512

2513
    TAOS_CHECK_GOTO(mndSetAlterStbRedoActions(pMnode, pTrans, pDb, pStb, alterOriData, alterOriDataLen), NULL, _OVER);
62!
2514
    TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
62!
2515

2516
  } else if (pAlter->alterType == TSDB_ALTER_TABLE_UPDATE_TAG_NAME) {
30!
2517
    SIdxObj     idxObj = {0};
30✔
2518
    SField     *pField0 = taosArrayGet(pAlter->pFields, 0);
30✔
2519
    SField     *pField1 = taosArrayGet(pAlter->pFields, 1);
30✔
2520
    const char *oTagName = pField0->name;
30✔
2521
    const char *nTagName = pField1->name;
30✔
2522
    bool        exist = false;
30✔
2523

2524
    if (mndGetIdxsByTagName(pMnode, pStb, pField0->name, &idxObj) == 0) {
30✔
2525
      exist = true;
20✔
2526
    }
2527

2528
    TAOS_CHECK_GOTO(mndSetAlterStbPrepareLogs(pMnode, pTrans, pDb, pStb), NULL, _OVER);
30!
2529
    TAOS_CHECK_GOTO(mndSetAlterStbCommitLogs(pMnode, pTrans, pDb, pStb), NULL, _OVER);
30!
2530

2531
    if (exist == true) {
30✔
2532
      memcpy(idxObj.colName, nTagName, strlen(nTagName));
20✔
2533
      idxObj.colName[strlen(nTagName)] = 0;
20✔
2534
      TAOS_CHECK_GOTO(mndSetAlterIdxPrepareLogs(pMnode, pTrans, &idxObj), NULL, _OVER);
20!
2535
      TAOS_CHECK_GOTO(mndSetAlterIdxCommitLogs(pMnode, pTrans, &idxObj), NULL, _OVER);
20!
2536
    }
2537

2538
    TAOS_CHECK_GOTO(mndSetAlterStbRedoActions(pMnode, pTrans, pDb, pStb, alterOriData, alterOriDataLen), NULL, _OVER);
30!
2539
    TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
30!
2540
  }
2541
  code = 0;
92✔
2542

2543
_OVER:
92✔
2544
  mndTransDrop(pTrans);
92✔
2545
  TAOS_RETURN(code);
92✔
2546
}
2547

2548
static int32_t mndAlterStb(SMnode *pMnode, SRpcMsg *pReq, const SMAlterStbReq *pAlter, SDbObj *pDb, SStbObj *pOld) {
323✔
2549
  bool    needRsp = true;
323✔
2550
  int32_t code = -1;
323✔
2551
  SField *pField0 = NULL;
323✔
2552

2553
  SStbObj stbObj = {0};
323✔
2554
  taosRLockLatch(&pOld->lock);
323✔
2555
  memcpy(&stbObj, pOld, sizeof(SStbObj));
323✔
2556
  taosRUnLockLatch(&pOld->lock);
323✔
2557
  stbObj.pColumns = NULL;
323✔
2558
  stbObj.pTags = NULL;
323✔
2559
  stbObj.pFuncs = NULL;
323✔
2560
  stbObj.pCmpr = NULL;
323✔
2561
  stbObj.updateTime = taosGetTimestampMs();
323✔
2562
  stbObj.lock = 0;
323✔
2563
  bool updateTagIndex = false;
323✔
2564
  switch (pAlter->alterType) {
323!
2565
    case TSDB_ALTER_TABLE_ADD_TAG:
85✔
2566
      code = mndAddSuperTableTag(pOld, &stbObj, pAlter->pFields, pAlter->numOfFields);
85✔
2567
      break;
85✔
2568
    case TSDB_ALTER_TABLE_DROP_TAG:
65✔
2569
      pField0 = taosArrayGet(pAlter->pFields, 0);
65✔
2570
      code = mndDropSuperTableTag(pMnode, pOld, &stbObj, pField0->name);
65✔
2571
      updateTagIndex = true;
65✔
2572
      break;
65✔
2573
    case TSDB_ALTER_TABLE_UPDATE_TAG_NAME:
46✔
2574
      code = mndAlterStbTagName(pMnode, pOld, &stbObj, pAlter->pFields);
46✔
2575
      updateTagIndex = true;
46✔
2576
      break;
46✔
2577
    case TSDB_ALTER_TABLE_UPDATE_TAG_BYTES:
9✔
2578
      pField0 = taosArrayGet(pAlter->pFields, 0);
9✔
2579
      code = mndAlterStbTagBytes(pMnode, pOld, &stbObj, pField0);
9✔
2580
      break;
9✔
2581
    case TSDB_ALTER_TABLE_ADD_COLUMN:
44✔
2582
      code = mndAddSuperTableColumn(pOld, &stbObj, pAlter->pFields, pAlter->numOfFields, 0);
44✔
2583
      break;
44✔
2584
    case TSDB_ALTER_TABLE_DROP_COLUMN:
30✔
2585
      pField0 = taosArrayGet(pAlter->pFields, 0);
30✔
2586
      code = mndDropSuperTableColumn(pMnode, pOld, &stbObj, pField0->name);
30✔
2587
      break;
30✔
2588
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES:
14✔
2589
      pField0 = taosArrayGet(pAlter->pFields, 0);
14✔
2590
      code = mndAlterStbColumnBytes(pMnode, pOld, &stbObj, pField0);
14✔
2591
      break;
14✔
2592
    case TSDB_ALTER_TABLE_UPDATE_OPTIONS:
3✔
2593
      needRsp = false;
3✔
2594
      code = mndUpdateStbCommentAndTTL(pOld, &stbObj, pAlter->comment, pAlter->commentLen, pAlter->ttl);
3✔
2595
      break;
3✔
2596
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_COMPRESS:
25✔
2597
      code = mndUpdateSuperTableColumnCompress(pMnode, pOld, &stbObj, pAlter->pFields, pAlter->numOfFields);
25✔
2598
      break;
25✔
2599
    case TSDB_ALTER_TABLE_ADD_COLUMN_WITH_COMPRESS_OPTION:
2✔
2600
      code = mndAddSuperTableColumn(pOld, &stbObj, pAlter->pFields, pAlter->numOfFields, 1);
2✔
2601
      break;
2✔
2602
    default:
×
2603
      needRsp = false;
×
2604
      terrno = TSDB_CODE_OPS_NOT_SUPPORT;
×
2605
      break;
×
2606
  }
2607

2608
  if (code != 0) goto _OVER;
323✔
2609
  if (updateTagIndex == false) {
260✔
2610
    code = mndAlterStbImp(pMnode, pReq, pDb, &stbObj, needRsp, pReq->pCont, pReq->contLen);
168✔
2611
  } else {
2612
    code = mndAlterStbAndUpdateTagIdxImp(pMnode, pReq, pDb, &stbObj, needRsp, pReq->pCont, pReq->contLen, pAlter);
92✔
2613
  }
2614

2615
_OVER:
323✔
2616
  taosMemoryFreeClear(stbObj.pTags);
323✔
2617
  taosMemoryFreeClear(stbObj.pColumns);
323✔
2618
  taosMemoryFreeClear(stbObj.pCmpr);
323✔
2619
  if (pAlter->commentLen > 0) {
323✔
2620
    taosMemoryFreeClear(stbObj.comment);
2!
2621
  }
2622
  TAOS_RETURN(code);
323✔
2623
}
2624

2625
static int32_t mndProcessAlterStbReq(SRpcMsg *pReq) {
323✔
2626
  SMnode       *pMnode = pReq->info.node;
323✔
2627
  int32_t       code = -1;
323✔
2628
  SDbObj       *pDb = NULL;
323✔
2629
  SStbObj      *pStb = NULL;
323✔
2630
  SMAlterStbReq alterReq = {0};
323✔
2631

2632
  if (tDeserializeSMAlterStbReq(pReq->pCont, pReq->contLen, &alterReq) != 0) {
323!
2633
    code = TSDB_CODE_INVALID_MSG;
×
2634
    goto _OVER;
×
2635
  }
2636

2637
  mInfo("stb:%s, start to alter", alterReq.name);
323!
2638
  if (mndCheckAlterStbReq(&alterReq) != 0) goto _OVER;
323!
2639

2640
  pDb = mndAcquireDbByStb(pMnode, alterReq.name);
323✔
2641
  if (pDb == NULL) {
323!
2642
    code = TSDB_CODE_MND_DB_NOT_EXIST;
×
2643
    goto _OVER;
×
2644
  }
2645

2646
  pStb = mndAcquireStb(pMnode, alterReq.name);
323✔
2647
  if (pStb == NULL) {
323!
2648
    code = TSDB_CODE_MND_STB_NOT_EXIST;
×
2649
    goto _OVER;
×
2650
  }
2651

2652
  if ((code = mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb)) != 0) {
323!
2653
    goto _OVER;
×
2654
  }
2655

2656
  code = mndAlterStb(pMnode, pReq, &alterReq, pDb, pStb);
323✔
2657
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
323✔
2658

2659
  SName   name = {0};
323✔
2660
  int32_t ret = 0;
323✔
2661
  if ((ret = tNameFromString(&name, alterReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE)) != 0)
323!
2662
    mError("stb:%s, failed to tNameFromString since %s", alterReq.name, tstrerror(ret));
×
2663

2664
  auditRecord(pReq, pMnode->clusterId, "alterStb", name.dbname, name.tname, alterReq.sql, alterReq.sqlLen);
323✔
2665

2666
_OVER:
323✔
2667
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
323!
2668
    mError("stb:%s, failed to alter since %s", alterReq.name, tstrerror(code));
63!
2669
  }
2670

2671
  mndReleaseStb(pMnode, pStb);
323✔
2672
  mndReleaseDb(pMnode, pDb);
323✔
2673
  tFreeSMAltertbReq(&alterReq);
323✔
2674

2675
  TAOS_RETURN(code);
323✔
2676
}
2677

2678
static int32_t mndSetDropStbPrepareLogs(SMnode *pMnode, STrans *pTrans, SStbObj *pStb) {
433✔
2679
  int32_t  code = 0;
433✔
2680
  SSdbRaw *pRedoRaw = mndStbActionEncode(pStb);
433✔
2681
  if (pRedoRaw == NULL) {
433!
2682
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2683
    if (terrno != 0) code = terrno;
×
2684
    TAOS_RETURN(code);
×
2685
  }
2686
  if ((code = mndTransAppendPrepareLog(pTrans, pRedoRaw)) != 0) {
433!
2687
    sdbFreeRaw(pRedoRaw);
×
2688
    TAOS_RETURN(code);
×
2689
  }
2690
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING));
433!
2691

2692
  TAOS_RETURN(code);
433✔
2693
}
2694

2695
static int32_t mndSetDropStbCommitLogs(SMnode *pMnode, STrans *pTrans, SStbObj *pStb) {
433✔
2696
  int32_t  code = 0;
433✔
2697
  SSdbRaw *pCommitRaw = mndStbActionEncode(pStb);
433✔
2698
  if (pCommitRaw == NULL) {
433!
2699
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2700
    if (terrno != 0) code = terrno;
×
2701
    TAOS_RETURN(code);
×
2702
  }
2703
  if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
433!
2704
    sdbFreeRaw(pCommitRaw);
×
2705
    TAOS_RETURN(code);
×
2706
  }
2707
  TAOS_CHECK_RETURN(sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED));
433!
2708

2709
  TAOS_RETURN(code);
433✔
2710
}
2711

2712
static int32_t mndSetDropStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
433✔
2713
  int32_t code = 0;
433✔
2714
  SSdb   *pSdb = pMnode->pSdb;
433✔
2715
  SVgObj *pVgroup = NULL;
433✔
2716
  void   *pIter = NULL;
433✔
2717

2718
  while (1) {
1,599✔
2719
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
2,032✔
2720
    if (pIter == NULL) break;
2,032✔
2721
    if (!mndVgroupInDb(pVgroup, pDb->uid)) {
1,599✔
2722
      sdbRelease(pSdb, pVgroup);
733✔
2723
      continue;
733✔
2724
    }
2725

2726
    int32_t contLen = 0;
866✔
2727
    void   *pReq = mndBuildVDropStbReq(pMnode, pVgroup, pStb, &contLen);
866✔
2728
    if (pReq == NULL) {
866!
2729
      sdbCancelFetch(pSdb, pIter);
×
2730
      sdbRelease(pSdb, pVgroup);
×
2731
      code = TSDB_CODE_OUT_OF_MEMORY;
×
2732
      TAOS_RETURN(code);
×
2733
    }
2734

2735
    STransAction action = {0};
866✔
2736
    action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
866✔
2737
    action.pCont = pReq;
866✔
2738
    action.contLen = contLen;
866✔
2739
    action.msgType = TDMT_VND_DROP_STB;
866✔
2740
    action.acceptableCode = TSDB_CODE_TDB_STB_NOT_EXIST;
866✔
2741
    if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
866!
2742
      taosMemoryFree(pReq);
×
2743
      sdbCancelFetch(pSdb, pIter);
×
2744
      sdbRelease(pSdb, pVgroup);
×
2745
      TAOS_RETURN(code);
×
2746
    }
2747
    sdbRelease(pSdb, pVgroup);
866✔
2748
  }
2749

2750
  TAOS_RETURN(code);
433✔
2751
}
2752

2753
static int32_t mndDropStb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *pStb) {
433✔
2754
  int32_t code = -1;
433✔
2755
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, "drop-stb");
433✔
2756
  if (pTrans == NULL) {
433!
2757
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2758
    if (terrno != 0) code = terrno;
×
2759
    goto _OVER;
×
2760
  }
2761

2762
  mInfo("trans:%d, used to drop stb:%s", pTrans->id, pStb->name);
433!
2763
  mndTransSetDbName(pTrans, pDb->name, pStb->name);
433✔
2764
  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER);
433!
2765

2766
  TAOS_CHECK_GOTO(mndSetDropStbPrepareLogs(pMnode, pTrans, pStb), NULL, _OVER);
433!
2767
  TAOS_CHECK_GOTO(mndSetDropStbCommitLogs(pMnode, pTrans, pStb), NULL, _OVER);
433!
2768
  TAOS_CHECK_GOTO(mndSetDropStbRedoActions(pMnode, pTrans, pDb, pStb), NULL, _OVER);
433!
2769
  TAOS_CHECK_GOTO(mndDropIdxsByStb(pMnode, pTrans, pDb, pStb), NULL, _OVER);
433!
2770
  TAOS_CHECK_GOTO(mndDropSmasByStb(pMnode, pTrans, pDb, pStb), NULL, _OVER);
433!
2771
  TAOS_CHECK_GOTO(mndUserRemoveStb(pMnode, pTrans, pStb->name), NULL, _OVER);
433!
2772
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
433!
2773
  code = 0;
433✔
2774

2775
_OVER:
433✔
2776
  mndTransDrop(pTrans);
433✔
2777
  TAOS_RETURN(code);
433✔
2778
}
2779

2780
static int32_t mndCheckDropStbForTopic(SMnode *pMnode, const char *stbFullName, int64_t suid) {
433✔
2781
  int32_t code = 0;
433✔
2782
  SSdb   *pSdb = pMnode->pSdb;
433✔
2783
  void   *pIter = NULL;
433✔
2784
  while (1) {
1✔
2785
    SMqTopicObj *pTopic = NULL;
434✔
2786
    pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pTopic);
434✔
2787
    if (pIter == NULL) break;
434✔
2788

2789
    if (pTopic->subType == TOPIC_SUB_TYPE__TABLE) {
1!
2790
      if (pTopic->stbUid == suid) {
×
2791
        sdbRelease(pSdb, pTopic);
×
2792
        sdbCancelFetch(pSdb, pIter);
×
2793
        TAOS_RETURN(-1);
×
2794
      }
2795
    }
2796

2797
    if (pTopic->ast == NULL) {
1!
2798
      sdbRelease(pSdb, pTopic);
1✔
2799
      continue;
1✔
2800
    }
2801

2802
    SNode *pAst = NULL;
×
2803
    if (nodesStringToNode(pTopic->ast, &pAst) != 0) {
×
2804
      code = TSDB_CODE_MND_INVALID_TOPIC_OPTION;
×
2805
      mError("topic:%s, create ast error", pTopic->name);
×
2806
      sdbRelease(pSdb, pTopic);
×
2807
      sdbCancelFetch(pSdb, pIter);
×
2808
      TAOS_RETURN(code);
×
2809
    }
2810

2811
    SNodeList *pNodeList = NULL;
×
2812
    if ((code = nodesCollectColumns((SSelectStmt *)pAst, SQL_CLAUSE_FROM, NULL, COLLECT_COL_TYPE_ALL, &pNodeList)) !=
×
2813
        0) {
2814
      sdbRelease(pSdb, pTopic);
×
2815
      sdbCancelFetch(pSdb, pIter);
×
2816
      TAOS_RETURN(code);
×
2817
    }
2818
    SNode *pNode = NULL;
×
2819
    FOREACH(pNode, pNodeList) {
×
2820
      SColumnNode *pCol = (SColumnNode *)pNode;
×
2821

2822
      if (pCol->tableId == suid) {
×
2823
        sdbRelease(pSdb, pTopic);
×
2824
        nodesDestroyNode(pAst);
×
2825
        nodesDestroyList(pNodeList);
×
2826
        sdbCancelFetch(pSdb, pIter);
×
2827
        TAOS_RETURN(-1);
×
2828
      } else {
2829
        goto NEXT;
×
2830
      }
2831
    }
2832
  NEXT:
×
2833
    sdbRelease(pSdb, pTopic);
×
2834
    nodesDestroyNode(pAst);
×
2835
    nodesDestroyList(pNodeList);
×
2836
  }
2837
  TAOS_RETURN(code);
433✔
2838
}
2839

2840
static int32_t mndCheckDropStbForStream(SMnode *pMnode, const char *stbFullName, int64_t suid) {
433✔
2841
  int32_t code = 0;
433✔
2842
  SSdb   *pSdb = pMnode->pSdb;
433✔
2843
  void   *pIter = NULL;
433✔
2844
  while (1) {
×
2845
    SStreamObj *pStream = NULL;
433✔
2846
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
433✔
2847
    if (pIter == NULL) break;
433!
2848

2849
    if (pStream->targetStbUid == suid) {
×
2850
      sdbCancelFetch(pSdb, pIter);
×
2851
      sdbRelease(pSdb, pStream);
×
2852
      TAOS_RETURN(-1);
×
2853
    }
2854

2855
    SNode *pAst = NULL;
×
2856
    if (nodesStringToNode(pStream->ast, &pAst) != 0) {
×
2857
      code = TSDB_CODE_MND_INVALID_STREAM_OPTION;
×
2858
      mError("stream:%s, create ast error", pStream->name);
×
2859
      sdbCancelFetch(pSdb, pIter);
×
2860
      sdbRelease(pSdb, pStream);
×
2861
      TAOS_RETURN(code);
×
2862
    }
2863

2864
    SNodeList *pNodeList = NULL;
×
2865
    if ((code = nodesCollectColumns((SSelectStmt *)pAst, SQL_CLAUSE_FROM, NULL, COLLECT_COL_TYPE_ALL, &pNodeList)) !=
×
2866
        0) {
2867
      sdbCancelFetch(pSdb, pIter);
×
2868
      sdbRelease(pSdb, pStream);
×
2869
      TAOS_RETURN(code);
×
2870
    }
2871
    SNode *pNode = NULL;
×
2872
    FOREACH(pNode, pNodeList) {
×
2873
      SColumnNode *pCol = (SColumnNode *)pNode;
×
2874

2875
      if (pCol->tableId == suid) {
×
2876
        sdbCancelFetch(pSdb, pIter);
×
2877
        sdbRelease(pSdb, pStream);
×
2878
        nodesDestroyNode(pAst);
×
2879
        nodesDestroyList(pNodeList);
×
2880
        TAOS_RETURN(-1);
×
2881
      } else {
2882
        goto NEXT;
×
2883
      }
2884
    }
2885
  NEXT:
×
2886
    sdbRelease(pSdb, pStream);
×
2887
    nodesDestroyNode(pAst);
×
2888
    nodesDestroyList(pNodeList);
×
2889
  }
2890
  TAOS_RETURN(code);
433✔
2891
}
2892

2893
static int32_t mndProcessDropTtltbRsp(SRpcMsg *pRsp) { return 0; }
×
2894
static int32_t mndProcessTrimDbRsp(SRpcMsg *pRsp) { return 0; }
662✔
UNCOV
2895
static int32_t mndProcessS3MigrateDbRsp(SRpcMsg *pRsp) { return 0; }
×
2896

2897
static int32_t mndProcessDropStbReq(SRpcMsg *pReq) {
494✔
2898
  SMnode      *pMnode = pReq->info.node;
494✔
2899
  int32_t      code = -1;
494✔
2900
  SDbObj      *pDb = NULL;
494✔
2901
  SStbObj     *pStb = NULL;
494✔
2902
  SMDropStbReq dropReq = {0};
494✔
2903

2904
  TAOS_CHECK_GOTO(tDeserializeSMDropStbReq(pReq->pCont, pReq->contLen, &dropReq), NULL, _OVER);
494!
2905

2906
  mInfo("stb:%s, start to drop", dropReq.name);
494!
2907

2908
  pStb = mndAcquireStb(pMnode, dropReq.name);
494✔
2909
  if (pStb == NULL) {
494✔
2910
    if (dropReq.igNotExists) {
61✔
2911
      mInfo("stb:%s, not exist, ignore not exist is set", dropReq.name);
60!
2912
      code = 0;
60✔
2913
      goto _OVER;
60✔
2914
    } else {
2915
      code = TSDB_CODE_MND_STB_NOT_EXIST;
1✔
2916
      goto _OVER;
1✔
2917
    }
2918
  }
2919

2920
  if ((dropReq.source == TD_REQ_FROM_TAOX_OLD || dropReq.source == TD_REQ_FROM_TAOX) && pStb->uid != dropReq.suid) {
433!
2921
    code = 0;
×
2922
    goto _OVER;
×
2923
  }
2924

2925
  pDb = mndAcquireDbByStb(pMnode, dropReq.name);
433✔
2926
  if (pDb == NULL) {
433!
2927
    code = TSDB_CODE_MND_DB_NOT_SELECTED;
×
2928
    goto _OVER;
×
2929
  }
2930

2931
  if ((code = mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb)) != 0) {
433!
2932
    goto _OVER;
×
2933
  }
2934

2935
  if (mndCheckDropStbForTopic(pMnode, dropReq.name, pStb->uid) < 0) {
433!
2936
    code = TSDB_CODE_MND_TOPIC_MUST_BE_DELETED;
×
2937
    goto _OVER;
×
2938
  }
2939

2940
  if (mndCheckDropStbForStream(pMnode, dropReq.name, pStb->uid) < 0) {
433!
2941
    code = TSDB_CODE_MND_STREAM_MUST_BE_DELETED;
×
2942
    goto _OVER;
×
2943
  }
2944

2945
  code = mndDropStb(pMnode, pReq, pDb, pStb);
433✔
2946
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
433!
2947

2948
  SName   name = {0};
433✔
2949
  int32_t ret = 0;
433✔
2950
  if ((ret = tNameFromString(&name, dropReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE)) != 0)
433!
2951
    mError("stb:%s, failed to tNameFromString since %s", dropReq.name, tstrerror(ret));
×
2952

2953
  auditRecord(pReq, pMnode->clusterId, "dropStb", name.dbname, name.tname, dropReq.sql, dropReq.sqlLen);
433✔
2954

2955
_OVER:
494✔
2956
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
494✔
2957
    mError("stb:%s, failed to drop since %s", dropReq.name, tstrerror(code));
1!
2958
  }
2959

2960
  mndReleaseDb(pMnode, pDb);
494✔
2961
  mndReleaseStb(pMnode, pStb);
494✔
2962
  tFreeSMDropStbReq(&dropReq);
494✔
2963
  TAOS_RETURN(code);
494✔
2964
}
2965

2966
static int32_t mndProcessTableMetaReq(SRpcMsg *pReq) {
15,945✔
2967
  SMnode       *pMnode = pReq->info.node;
15,945✔
2968
  int32_t       code = -1;
15,945✔
2969
  STableInfoReq infoReq = {0};
15,945✔
2970
  STableMetaRsp metaRsp = {0};
15,945✔
2971
  SUserObj     *pUser = NULL;
15,945✔
2972

2973
  code = mndAcquireUser(pMnode, pReq->info.conn.user, &pUser);
15,945✔
2974
  if (pUser == NULL) return 0;
15,946!
2975
  bool sysinfo = pUser->sysInfo;
15,946✔
2976

2977
  TAOS_CHECK_GOTO(tDeserializeSTableInfoReq(pReq->pCont, pReq->contLen, &infoReq), NULL, _OVER);
15,946!
2978

2979
  if (0 == strcmp(infoReq.dbFName, TSDB_INFORMATION_SCHEMA_DB)) {
15,946✔
2980
    mInfo("information_schema table:%s.%s, start to retrieve meta", infoReq.dbFName, infoReq.tbName);
1,332!
2981
    TAOS_CHECK_GOTO(mndBuildInsTableSchema(pMnode, infoReq.dbFName, infoReq.tbName, sysinfo, &metaRsp), NULL, _OVER);
1,332✔
2982
  } else if (0 == strcmp(infoReq.dbFName, TSDB_PERFORMANCE_SCHEMA_DB)) {
14,614✔
2983
    mInfo("performance_schema table:%s.%s, start to retrieve meta", infoReq.dbFName, infoReq.tbName);
128!
2984
    TAOS_CHECK_GOTO(mndBuildPerfsTableSchema(pMnode, infoReq.dbFName, infoReq.tbName, &metaRsp), NULL, _OVER);
128✔
2985
  } else {
2986
    mInfo("stb:%s.%s, start to retrieve meta", infoReq.dbFName, infoReq.tbName);
14,486✔
2987
    TAOS_CHECK_GOTO(mndBuildStbSchema(pMnode, infoReq.dbFName, infoReq.tbName, &metaRsp), NULL, _OVER);
14,487✔
2988
  }
2989

2990
  int32_t rspLen = tSerializeSTableMetaRsp(NULL, 0, &metaRsp);
14,753✔
2991
  if (rspLen < 0) {
14,753!
2992
    code = TSDB_CODE_INVALID_MSG;
×
2993
    goto _OVER;
×
2994
  }
2995

2996
  void *pRsp = rpcMallocCont(rspLen);
14,753✔
2997
  if (pRsp == NULL) {
14,753!
2998
    code = terrno;
×
2999
    goto _OVER;
×
3000
  }
3001

3002
  if ((rspLen = tSerializeSTableMetaRsp(pRsp, rspLen, &metaRsp)) < 0) {
14,753!
3003
    code = rspLen;
×
3004
    goto _OVER;
×
3005
  }
3006
  pReq->info.rsp = pRsp;
14,753✔
3007
  pReq->info.rspLen = rspLen;
14,753✔
3008
  code = 0;
14,753✔
3009

3010
  mTrace("%s.%s, meta is retrieved", infoReq.dbFName, infoReq.tbName);
14,753✔
3011

3012
_OVER:
14,750✔
3013
  if (code != 0) {
15,946✔
3014
    mError("stb:%s.%s, failed to retrieve meta since %s", infoReq.dbFName, infoReq.tbName, tstrerror(code));
1,193!
3015
  }
3016

3017
  mndReleaseUser(pMnode, pUser);
15,946✔
3018
  tFreeSTableMetaRsp(&metaRsp);
15,946✔
3019
  // TODO change to TAOS_RETURN
3020
  return code;
15,946✔
3021
}
3022

3023
static int32_t mndProcessTableCfgReq(SRpcMsg *pReq) {
22✔
3024
  SMnode      *pMnode = pReq->info.node;
22✔
3025
  int32_t      code = -1;
22✔
3026
  STableCfgReq cfgReq = {0};
22✔
3027
  STableCfgRsp cfgRsp = {0};
22✔
3028

3029
  TAOS_CHECK_GOTO(tDeserializeSTableCfgReq(pReq->pCont, pReq->contLen, &cfgReq), NULL, _OVER);
22!
3030

3031
  char dbName[TSDB_DB_NAME_LEN] = {0};
22✔
3032
  TAOS_CHECK_GOTO(mndExtractShortDbNameFromDbFullName(cfgReq.dbFName, dbName), NULL, _OVER);
22!
3033
  if (0 == strcmp(dbName, TSDB_INFORMATION_SCHEMA_DB)) {
22!
3034
    mInfo("information_schema table:%s.%s, start to retrieve cfg", cfgReq.dbFName, cfgReq.tbName);
×
3035
    TAOS_CHECK_GOTO(mndBuildInsTableCfg(pMnode, cfgReq.dbFName, cfgReq.tbName, &cfgRsp), NULL, _OVER);
×
3036
  } else if (0 == strcmp(dbName, TSDB_PERFORMANCE_SCHEMA_DB)) {
22!
3037
    mInfo("performance_schema table:%s.%s, start to retrieve cfg", cfgReq.dbFName, cfgReq.tbName);
×
3038
    TAOS_CHECK_GOTO(mndBuildPerfsTableCfg(pMnode, cfgReq.dbFName, cfgReq.tbName, &cfgRsp), NULL, _OVER);
×
3039
  } else {
3040
    mInfo("stb:%s.%s, start to retrieve cfg", cfgReq.dbFName, cfgReq.tbName);
22!
3041
    TAOS_CHECK_GOTO(mndBuildStbCfg(pMnode, cfgReq.dbFName, cfgReq.tbName, &cfgRsp), NULL, _OVER);
22!
3042
  }
3043

3044
  int32_t rspLen = tSerializeSTableCfgRsp(NULL, 0, &cfgRsp);
22✔
3045
  if (rspLen < 0) {
22!
3046
    code = TSDB_CODE_INVALID_MSG;
×
3047
    goto _OVER;
×
3048
  }
3049

3050
  void *pRsp = rpcMallocCont(rspLen);
22✔
3051
  if (pRsp == NULL) {
22!
3052
    code = terrno;
×
3053
    goto _OVER;
×
3054
  }
3055

3056
  if ((rspLen = tSerializeSTableCfgRsp(pRsp, rspLen, &cfgRsp)) < 0) {
22!
3057
    code = rspLen;
×
3058
    goto _OVER;
×
3059
  }
3060
  pReq->info.rsp = pRsp;
22✔
3061
  pReq->info.rspLen = rspLen;
22✔
3062
  code = 0;
22✔
3063

3064
  mTrace("%s.%s, cfg is retrieved", cfgReq.dbFName, cfgReq.tbName);
22!
3065

3066
_OVER:
22✔
3067
  if (code != 0) {
22!
3068
    mError("stb:%s.%s, failed to retrieve cfg since %s", cfgReq.dbFName, cfgReq.tbName, tstrerror(code));
×
3069
  }
3070

3071
  tFreeSTableCfgRsp(&cfgRsp);
22✔
3072
  TAOS_RETURN(code);
22✔
3073
}
3074

3075
int32_t mndValidateStbInfo(SMnode *pMnode, SSTableVersion *pStbVersions, int32_t numOfStbs, void **ppRsp,
14,627✔
3076
                           int32_t *pRspLen) {
3077
  int32_t   code = 0;
14,627✔
3078
  SSTbHbRsp hbRsp = {0};
14,627✔
3079
  hbRsp.pMetaRsp = taosArrayInit(numOfStbs, sizeof(STableMetaRsp));
14,627✔
3080
  if (hbRsp.pMetaRsp == NULL) {
14,627!
3081
    code = terrno;
×
3082
    TAOS_RETURN(code);
×
3083
  }
3084

3085
  hbRsp.pIndexRsp = taosArrayInit(numOfStbs, sizeof(STableIndexRsp));
14,627✔
3086
  if (NULL == hbRsp.pIndexRsp) {
14,627!
3087
    taosArrayDestroy(hbRsp.pMetaRsp);
×
3088
    code = terrno;
×
3089
    TAOS_RETURN(code);
×
3090
  }
3091

3092
  for (int32_t i = 0; i < numOfStbs; ++i) {
33,264✔
3093
    SSTableVersion *pStbVersion = &pStbVersions[i];
18,637✔
3094
    pStbVersion->suid = be64toh(pStbVersion->suid);
18,637✔
3095
    pStbVersion->sversion = ntohl(pStbVersion->sversion);
18,637✔
3096
    pStbVersion->tversion = ntohl(pStbVersion->tversion);
18,637✔
3097
    pStbVersion->smaVer = ntohl(pStbVersion->smaVer);
18,637✔
3098

3099
    bool    schema = false;
18,637✔
3100
    bool    sma = false;
18,637✔
3101
    int32_t code = mndValidateStbVersion(pMnode, pStbVersion, &schema, &sma);
18,637✔
3102
    if (TSDB_CODE_SUCCESS != code) {
18,637✔
3103
      STableMetaRsp metaRsp = {0};
143✔
3104
      metaRsp.numOfColumns = -1;
143✔
3105
      metaRsp.suid = pStbVersion->suid;
143✔
3106
      tstrncpy(metaRsp.dbFName, pStbVersion->dbFName, sizeof(metaRsp.dbFName));
143✔
3107
      tstrncpy(metaRsp.tbName, pStbVersion->stbName, sizeof(metaRsp.tbName));
143✔
3108
      tstrncpy(metaRsp.stbName, pStbVersion->stbName, sizeof(metaRsp.stbName));
143✔
3109
      if (taosArrayPush(hbRsp.pMetaRsp, &metaRsp) == NULL) {
286!
3110
        code = terrno;
×
3111
        return code;
×
3112
      }
3113
      continue;
143✔
3114
    }
3115

3116
    if (schema) {
18,494✔
3117
      STableMetaRsp metaRsp = {0};
314✔
3118
      mInfo("stb:%s.%s, start to retrieve meta", pStbVersion->dbFName, pStbVersion->stbName);
314!
3119
      if (mndBuildStbSchema(pMnode, pStbVersion->dbFName, pStbVersion->stbName, &metaRsp) != 0) {
314!
3120
        metaRsp.numOfColumns = -1;
×
3121
        metaRsp.suid = pStbVersion->suid;
×
3122
        tstrncpy(metaRsp.dbFName, pStbVersion->dbFName, sizeof(metaRsp.dbFName));
×
3123
        tstrncpy(metaRsp.tbName, pStbVersion->stbName, sizeof(metaRsp.tbName));
×
3124
        tstrncpy(metaRsp.stbName, pStbVersion->stbName, sizeof(metaRsp.stbName));
×
3125
        if (taosArrayPush(hbRsp.pMetaRsp, &metaRsp) == NULL) {
×
3126
          code = terrno;
×
3127
          return code;
×
3128
        }
3129
        continue;
×
3130
      }
3131

3132
      if (taosArrayPush(hbRsp.pMetaRsp, &metaRsp) == NULL) {
628!
3133
        code = terrno;
×
3134
        return code;
×
3135
      }
3136
    }
3137

3138
    if (sma) {
18,494!
3139
      bool           exist = false;
×
3140
      char           tbFName[TSDB_TABLE_FNAME_LEN];
3141
      STableIndexRsp indexRsp = {0};
×
3142
      indexRsp.pIndex = taosArrayInit(10, sizeof(STableIndexInfo));
×
3143
      if (NULL == indexRsp.pIndex) {
×
3144
        code = terrno;
×
3145
        TAOS_RETURN(code);
×
3146
      }
3147

3148
      sprintf(tbFName, "%s.%s", pStbVersion->dbFName, pStbVersion->stbName);
×
3149
      int32_t code = mndGetTableSma(pMnode, tbFName, &indexRsp, &exist);
×
3150
      if (code || !exist) {
×
3151
        indexRsp.suid = pStbVersion->suid;
×
3152
        indexRsp.version = -1;
×
3153
        indexRsp.pIndex = NULL;
×
3154
      }
3155

3156
      strcpy(indexRsp.dbFName, pStbVersion->dbFName);
×
3157
      strcpy(indexRsp.tbName, pStbVersion->stbName);
×
3158

3159
      if (taosArrayPush(hbRsp.pIndexRsp, &indexRsp) == NULL) {
×
3160
        code = terrno;
×
3161
        return code;
×
3162
      }
3163
    }
3164
  }
3165

3166
  int32_t rspLen = tSerializeSSTbHbRsp(NULL, 0, &hbRsp);
14,627✔
3167
  if (rspLen < 0) {
14,627!
3168
    tFreeSSTbHbRsp(&hbRsp);
×
3169
    code = TSDB_CODE_INVALID_MSG;
×
3170
    TAOS_RETURN(code);
×
3171
  }
3172

3173
  void *pRsp = taosMemoryMalloc(rspLen);
14,627✔
3174
  if (pRsp == NULL) {
14,627!
3175
    tFreeSSTbHbRsp(&hbRsp);
×
3176
    code = terrno;
×
3177
    TAOS_RETURN(code);
×
3178
  }
3179

3180
  rspLen = tSerializeSSTbHbRsp(pRsp, rspLen, &hbRsp);
14,627✔
3181
  tFreeSSTbHbRsp(&hbRsp);
14,627✔
3182
  if (rspLen < 0) return rspLen;
14,627!
3183
  *ppRsp = pRsp;
14,627✔
3184
  *pRspLen = rspLen;
14,627✔
3185
  TAOS_RETURN(code);
14,627✔
3186
}
3187

3188
int32_t mndGetNumOfStbs(SMnode *pMnode, char *dbName, int32_t *pNumOfStbs) {
3,093✔
3189
  int32_t code = 0;
3,093✔
3190
  SSdb   *pSdb = pMnode->pSdb;
3,093✔
3191
  SDbObj *pDb = mndAcquireDb(pMnode, dbName);
3,093✔
3192
  if (pDb == NULL) {
3,093!
3193
    code = TSDB_CODE_MND_DB_NOT_SELECTED;
×
3194
    TAOS_RETURN(code);
×
3195
  }
3196

3197
  int32_t numOfStbs = 0;
3,093✔
3198
  void   *pIter = NULL;
3,093✔
3199
  while (1) {
196,874✔
3200
    SStbObj *pStb = NULL;
199,967✔
3201
    pIter = sdbFetch(pSdb, SDB_STB, pIter, (void **)&pStb);
199,967✔
3202
    if (pIter == NULL) break;
199,967✔
3203

3204
    if (pStb->dbUid == pDb->uid) {
196,874✔
3205
      numOfStbs++;
27,169✔
3206
    }
3207

3208
    sdbRelease(pSdb, pStb);
196,874✔
3209
  }
3210

3211
  *pNumOfStbs = numOfStbs;
3,093✔
3212
  mndReleaseDb(pMnode, pDb);
3,093✔
3213
  TAOS_RETURN(code);
3,093✔
3214
}
3215

3216
int32_t mndExtractDbNameFromStbFullName(const char *stbFullName, char *dst) {
×
3217
  SName name = {0};
×
3218
  TAOS_CHECK_RETURN(tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE));
×
3219

3220
  TAOS_CHECK_RETURN(tNameGetFullDbName(&name, dst));
×
3221

3222
  return 0;
×
3223
}
3224

3225
int32_t mndExtractShortDbNameFromStbFullName(const char *stbFullName, char *dst) {
2✔
3226
  SName name = {0};
2✔
3227
  TAOS_CHECK_RETURN(tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE));
2!
3228

3229
  TAOS_CHECK_RETURN(tNameGetDbName(&name, dst));
2!
3230

3231
  return 0;
2✔
3232
}
3233

3234
int32_t mndExtractShortDbNameFromDbFullName(const char *stbFullName, char *dst) {
22✔
3235
  SName name = {0};
22✔
3236
  TAOS_CHECK_RETURN(tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB));
22!
3237

3238
  TAOS_CHECK_RETURN(tNameGetDbName(&name, dst));
22!
3239

3240
  return 0;
22✔
3241
}
3242

3243
void mndExtractTbNameFromStbFullName(const char *stbFullName, char *dst, int32_t dstSize) {
494✔
3244
  int32_t pos = -1;
494✔
3245
  int32_t num = 0;
494✔
3246
  for (pos = 0; stbFullName[pos] != 0; ++pos) {
4,753!
3247
    if (stbFullName[pos] == TS_PATH_DELIMITER[0]) num++;
4,753✔
3248
    if (num == 2) break;
4,753✔
3249
  }
3250

3251
  if (num == 2) {
494!
3252
    tstrncpy(dst, stbFullName + pos + 1, dstSize);
494✔
3253
  }
3254
}
494✔
3255

3256
// static int32_t mndProcessRetrieveStbReq(SRpcMsg *pReq) {
3257
//   SMnode    *pMnode = pReq->info.node;
3258
//   SShowMgmt *pMgmt = &pMnode->showMgmt;
3259
//   SShowObj  *pShow = NULL;
3260
//   int32_t    rowsToRead = SHOW_STEP_SIZE;
3261
//   int32_t    rowsRead = 0;
3262
//
3263
//   SRetrieveTableReq retrieveReq = {0};
3264
//   if (tDeserializeSRetrieveTableReq(pReq->pCont, pReq->contLen, &retrieveReq) != 0) {
3265
//     terrno = TSDB_CODE_INVALID_MSG;
3266
//     return -1;
3267
//   }
3268
//
3269
//   SMnode    *pMnode = pReq->info.node;
3270
//   SSdb      *pSdb = pMnode->pSdb;
3271
//   int32_t    numOfRows = 0;
3272
//   SDbObj    *pDb = NULL;
3273
//   ESdbStatus objStatus = 0;
3274
//
3275
//   SUserObj *pUser = mndAcquireUser(pMnode, pReq->info.conn.user);
3276
//   if (pUser == NULL) return 0;
3277
//   bool sysinfo = pUser->sysInfo;
3278
//
3279
//   // Append the information_schema database into the result.
3280
////  if (!pShow->sysDbRsp) {
3281
////    SDbObj infoschemaDb = {0};
3282
////    setInformationSchemaDbCfg(pMnode, &infoschemaDb);
3283
////    size_t numOfTables = 0;
3284
////    getVisibleInfosTablesNum(sysinfo, &numOfTables);
3285
////    mndDumpDbInfoData(pMnode, pBlock, &infoschemaDb, pShow, numOfRows, numOfTables, true, 0, 1);
3286
////
3287
////    numOfRows += 1;
3288
////
3289
////    SDbObj perfschemaDb = {0};
3290
////    setPerfSchemaDbCfg(pMnode, &perfschemaDb);
3291
////    numOfTables = 0;
3292
////    getPerfDbMeta(NULL, &numOfTables);
3293
////    mndDumpDbInfoData(pMnode, pBlock, &perfschemaDb, pShow, numOfRows, numOfTables, true, 0, 1);
3294
////
3295
////    numOfRows += 1;
3296
////    pShow->sysDbRsp = true;
3297
////  }
3298
//
3299
//  SSDataBlock* p = buildInfoSchemaTableMetaBlock(TSDB_INS_TABLE_COLS);
3300
//  blockDataEnsureCapacity(p, rowsToRead);
3301
//
3302
//  size_t               size = 0;
3303
//  const SSysTableMeta* pSysDbTableMeta = NULL;
3304
//
3305
//  getInfosDbMeta(&pSysDbTableMeta, &size);
3306
//  p->info.rows = buildDbColsInfoBlock(sysinfo, p, pSysDbTableMeta, size, TSDB_INFORMATION_SCHEMA_DB);
3307
//
3308
//  getPerfDbMeta(&pSysDbTableMeta, &size);
3309
//  p->info.rows = buildDbColsInfoBlock(sysinfo, p, pSysDbTableMeta, size, TSDB_PERFORMANCE_SCHEMA_DB);
3310
//
3311
//  blockDataDestroy(p);
3312
//
3313
//
3314
//  while (numOfRows < rowsToRead) {
3315
//    pShow->pIter = sdbFetchAll(pSdb, SDB_DB, pShow->pIter, (void **)&pDb, &objStatus, true);
3316
//    if (pShow->pIter == NULL) break;
3317
//    if (strncmp(retrieveReq.db, pDb->name, strlen(retrieveReq.db)) != 0){
3318
//      continue;
3319
//    }
3320
//    if (mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_READ_OR_WRITE_DB, pDb) != 0) {
3321
//      continue;
3322
//    }
3323
//
3324
//    while (numOfRows < rowsToRead) {
3325
//      pShow->pIter = sdbFetch(pSdb, SDB_STB, pShow->pIter, (void **)&pStb);
3326
//      if (pShow->pIter == NULL) break;
3327
//
3328
//      if (pDb != NULL && pStb->dbUid != pDb->uid) {
3329
//        sdbRelease(pSdb, pStb);
3330
//        continue;
3331
//      }
3332
//
3333
//      cols = 0;
3334
//
3335
//      SName name = {0};
3336
//      char  stbName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
3337
//      mndExtractTbNameFromStbFullName(pStb->name, &stbName[VARSTR_HEADER_SIZE], TSDB_TABLE_NAME_LEN);
3338
//      varDataSetLen(stbName, strlen(&stbName[VARSTR_HEADER_SIZE]));
3339
//
3340
//      SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
3341
//      colDataSetVal(pColInfo, numOfRows, (const char *)stbName, false);
3342
//
3343
//      char db[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
3344
//      tNameFromString(&name, pStb->db, T_NAME_ACCT | T_NAME_DB);
3345
//      tNameGetDbName(&name, varDataVal(db));
3346
//      varDataSetLen(db, strlen(varDataVal(db)));
3347
//
3348
//      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
3349
//      colDataSetVal(pColInfo, numOfRows, (const char *)db, false);
3350
//
3351
//      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
3352
//      colDataSetVal(pColInfo, numOfRows, (const char *)&pStb->createdTime, false);
3353
//
3354
//      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
3355
//      colDataSetVal(pColInfo, numOfRows, (const char *)&pStb->numOfColumns, false);
3356
//
3357
//      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
3358
//      colDataSetVal(pColInfo, numOfRows, (const char *)&pStb->numOfTags, false);
3359
//
3360
//      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
3361
//      colDataSetVal(pColInfo, numOfRows, (const char *)&pStb->updateTime, false);  // number of tables
3362
//
3363
//      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
3364
//      if (pStb->commentLen > 0) {
3365
//        char comment[TSDB_TB_COMMENT_LEN + VARSTR_HEADER_SIZE] = {0};
3366
//        STR_TO_VARSTR(comment, pStb->comment);
3367
//        colDataSetVal(pColInfo, numOfRows, comment, false);
3368
//      } else if (pStb->commentLen == 0) {
3369
//        char comment[VARSTR_HEADER_SIZE + VARSTR_HEADER_SIZE] = {0};
3370
//        STR_TO_VARSTR(comment, "");
3371
//        colDataSetVal(pColInfo, numOfRows, comment, false);
3372
//      } else {
3373
//        colDataSetNULL(pColInfo, numOfRows);
3374
//      }
3375
//
3376
//      char watermark[64 + VARSTR_HEADER_SIZE] = {0};
3377
//      sprintf(varDataVal(watermark), "%" PRId64 "a,%" PRId64 "a", pStb->watermark[0], pStb->watermark[1]);
3378
//      varDataSetLen(watermark, strlen(varDataVal(watermark)));
3379
//
3380
//      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
3381
//      colDataSetVal(pColInfo, numOfRows, (const char *)watermark, false);
3382
//
3383
//      char maxDelay[64 + VARSTR_HEADER_SIZE] = {0};
3384
//      sprintf(varDataVal(maxDelay), "%" PRId64 "a,%" PRId64 "a", pStb->maxdelay[0], pStb->maxdelay[1]);
3385
//      varDataSetLen(maxDelay, strlen(varDataVal(maxDelay)));
3386
//
3387
//      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
3388
//      colDataSetVal(pColInfo, numOfRows, (const char *)maxDelay, false);
3389
//
3390
//      char    rollup[160 + VARSTR_HEADER_SIZE] = {0};
3391
//      int32_t rollupNum = (int32_t)taosArrayGetSize(pStb->pFuncs);
3392
//      char   *sep = ", ";
3393
//      int32_t sepLen = strlen(sep);
3394
//      int32_t rollupLen = sizeof(rollup) - VARSTR_HEADER_SIZE - 2;
3395
//      for (int32_t i = 0; i < rollupNum; ++i) {
3396
//        char *funcName = taosArrayGet(pStb->pFuncs, i);
3397
//        if (i) {
3398
//          strncat(varDataVal(rollup), sep, rollupLen);
3399
//          rollupLen -= sepLen;
3400
//        }
3401
//        strncat(varDataVal(rollup), funcName, rollupLen);
3402
//        rollupLen -= strlen(funcName);
3403
//      }
3404
//      varDataSetLen(rollup, strlen(varDataVal(rollup)));
3405
//
3406
//      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
3407
//      colDataSetVal(pColInfo, numOfRows, (const char *)rollup, false);
3408
//
3409
//      numOfRows++;
3410
//      sdbRelease(pSdb, pStb);
3411
//    }
3412
//
3413
//    if (pDb != NULL) {
3414
//      mndReleaseDb(pMnode, pDb);
3415
//    }
3416
//
3417
//    sdbRelease(pSdb, pDb);
3418
//  }
3419
//
3420
//  pShow->numOfRows += numOfRows;
3421
//  mndReleaseUser(pMnode, pUser);
3422
//
3423
//
3424
//
3425
//
3426
//
3427
//
3428
//
3429
//
3430
//  ShowRetrieveFp retrieveFp = pMgmt->retrieveFps[pShow->type];
3431
//  if (retrieveFp == NULL) {
3432
//    mndReleaseShowObj(pShow, false);
3433
//    terrno = TSDB_CODE_MSG_NOT_PROCESSED;
3434
//    mError("show:0x%" PRIx64 ", failed to retrieve data since %s", pShow->id, terrstr());
3435
//    return -1;
3436
//  }
3437
//
3438
//  mDebug("show:0x%" PRIx64 ", start retrieve data, type:%d", pShow->id, pShow->type);
3439
//  if (retrieveReq.user[0] != 0) {
3440
//    memcpy(pReq->info.conn.user, retrieveReq.user, TSDB_USER_LEN);
3441
//  } else {
3442
//    memcpy(pReq->info.conn.user, TSDB_DEFAULT_USER, strlen(TSDB_DEFAULT_USER) + 1);
3443
//  }
3444
//  if (retrieveReq.db[0] && mndCheckShowPrivilege(pMnode, pReq->info.conn.user, pShow->type, retrieveReq.db) != 0) {
3445
//    return -1;
3446
//  }
3447
//
3448
//  int32_t numOfCols = pShow->pMeta->numOfColumns;
3449
//
3450
//  SSDataBlock *pBlock = createDataBlock();
3451
//  for (int32_t i = 0; i < numOfCols; ++i) {
3452
//    SColumnInfoData idata = {0};
3453
//
3454
//    SSchema *p = &pShow->pMeta->pSchemas[i];
3455
//
3456
//    idata.info.bytes = p->bytes;
3457
//    idata.info.type = p->type;
3458
//    idata.info.colId = p->colId;
3459
//    blockDataAppendColInfo(pBlock, &idata);
3460
//  }
3461
//
3462
//  blockDataEnsureCapacity(pBlock, rowsToRead);
3463
//
3464
//  if (mndCheckRetrieveFinished(pShow)) {
3465
//    mDebug("show:0x%" PRIx64 ", read finished, numOfRows:%d", pShow->id, pShow->numOfRows);
3466
//    rowsRead = 0;
3467
//  } else {
3468
//    rowsRead = (*retrieveFp)(pReq, pShow, pBlock, rowsToRead);
3469
//    if (rowsRead < 0) {
3470
//      terrno = rowsRead;
3471
//      mDebug("show:0x%" PRIx64 ", retrieve completed", pShow->id);
3472
//      mndReleaseShowObj(pShow, true);
3473
//      blockDataDestroy(pBlock);
3474
//      return -1;
3475
//    }
3476
//
3477
//    pBlock->info.rows = rowsRead;
3478
//    mDebug("show:0x%" PRIx64 ", stop retrieve data, rowsRead:%d numOfRows:%d", pShow->id, rowsRead, pShow->numOfRows);
3479
//  }
3480
//
3481
//  size = sizeof(SRetrieveMetaTableRsp) + sizeof(int32_t) + sizeof(SSysTableSchema) * pShow->pMeta->numOfColumns +
3482
//         blockDataGetSize(pBlock) + blockDataGetSerialMetaSize(taosArrayGetSize(pBlock->pDataBlock));
3483
//
3484
//  SRetrieveMetaTableRsp *pRsp = rpcMallocCont(size);
3485
//  if (pRsp == NULL) {
3486
//    mndReleaseShowObj(pShow, false);
3487
//    terrno = TSDB_CODE_OUT_OF_MEMORY;
3488
//    mError("show:0x%" PRIx64 ", failed to retrieve data since %s", pShow->id, terrstr());
3489
//    blockDataDestroy(pBlock);
3490
//    return -1;
3491
//  }
3492
//
3493
//  pRsp->handle = htobe64(pShow->id);
3494
//
3495
//  if (rowsRead > 0) {
3496
//    char    *pStart = pRsp->data;
3497
//    SSchema *ps = pShow->pMeta->pSchemas;
3498
//
3499
//    *(int32_t *)pStart = htonl(pShow->pMeta->numOfColumns);
3500
//    pStart += sizeof(int32_t);  // number of columns
3501
//
3502
//    for (int32_t i = 0; i < pShow->pMeta->numOfColumns; ++i) {
3503
//      SSysTableSchema *pSchema = (SSysTableSchema *)pStart;
3504
//      pSchema->bytes = htonl(ps[i].bytes);
3505
//      pSchema->colId = htons(ps[i].colId);
3506
//      pSchema->type = ps[i].type;
3507
//
3508
//      pStart += sizeof(SSysTableSchema);
3509
//    }
3510
//
3511
//    int32_t len = blockEncode(pBlock, pStart, pShow->pMeta->numOfColumns);
3512
//  }
3513
//
3514
//  pRsp->numOfRows = htonl(rowsRead);
3515
//  pRsp->precision = TSDB_TIME_PRECISION_MILLI;  // millisecond time precision
3516
//  pReq->info.rsp = pRsp;
3517
//  pReq->info.rspLen = size;
3518
//
3519
//  if (rowsRead == 0 || rowsRead < rowsToRead) {
3520
//    pRsp->completed = 1;
3521
//    mDebug("show:0x%" PRIx64 ", retrieve completed", pShow->id);
3522
//    mndReleaseShowObj(pShow, true);
3523
//  } else {
3524
//    mDebug("show:0x%" PRIx64 ", retrieve not completed yet", pShow->id);
3525
//    mndReleaseShowObj(pShow, false);
3526
//  }
3527
//
3528
//  blockDataDestroy(pBlock);
3529
//  return TSDB_CODE_SUCCESS;
3530
//}
3531

3532
static int32_t mndRetrieveStb(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
260✔
3533
  SMnode  *pMnode = pReq->info.node;
260✔
3534
  SSdb    *pSdb = pMnode->pSdb;
260✔
3535
  int32_t  numOfRows = 0;
260✔
3536
  SStbObj *pStb = NULL;
260✔
3537
  int32_t  cols = 0;
260✔
3538
  int32_t  lino = 0;
260✔
3539
  int32_t  code = 0;
260✔
3540

3541
  SDbObj *pDb = NULL;
260✔
3542
  if (strlen(pShow->db) > 0) {
260✔
3543
    pDb = mndAcquireDb(pMnode, pShow->db);
215✔
3544
    if (pDb == NULL) return terrno;
215!
3545
  }
3546

3547
  while (numOfRows < rows) {
1,367!
3548
    pShow->pIter = sdbFetch(pSdb, SDB_STB, pShow->pIter, (void **)&pStb);
1,367✔
3549
    if (pShow->pIter == NULL) break;
1,367✔
3550

3551
    if (pDb != NULL && pStb->dbUid != pDb->uid) {
1,107✔
3552
      sdbRelease(pSdb, pStb);
615✔
3553
      continue;
615✔
3554
    }
3555

3556
    if (isTsmaResSTb(pStb->name)) {
492!
3557
      sdbRelease(pSdb, pStb);
×
3558
      continue;
×
3559
    }
3560

3561
    cols = 0;
492✔
3562

3563
    SName name = {0};
492✔
3564

3565
    char stbName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
492✔
3566
    mndExtractTbNameFromStbFullName(pStb->name, &stbName[VARSTR_HEADER_SIZE], TSDB_TABLE_NAME_LEN);
492✔
3567
    varDataSetLen(stbName, strlen(&stbName[VARSTR_HEADER_SIZE]));
492✔
3568
    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
492✔
3569
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)stbName, false), pStb, &lino, _ERROR);
492!
3570

3571
    char db[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
492✔
3572
    RETRIEVE_CHECK_GOTO(tNameFromString(&name, pStb->db, T_NAME_ACCT | T_NAME_DB), pStb, &lino, _ERROR);
492!
3573
    RETRIEVE_CHECK_GOTO(tNameGetDbName(&name, varDataVal(db)), pStb, &lino, _ERROR);
492!
3574
    varDataSetLen(db, strlen(varDataVal(db)));
492✔
3575
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
492✔
3576
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)db, false), pStb, &lino, _ERROR);
492!
3577

3578
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
492✔
3579
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pStb->createdTime, false), pStb, &lino,
492!
3580
                        _ERROR);
3581

3582
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
492✔
3583
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pStb->numOfColumns, false), pStb, &lino,
492!
3584
                        _ERROR);
3585

3586
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
492✔
3587
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pStb->numOfTags, false), pStb, &lino, _ERROR);
492!
3588

3589
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
492✔
3590
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pStb->updateTime, false), pStb, &lino,
492!
3591
                        _ERROR);  // number of tables
3592

3593
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
492✔
3594
    if (pStb->commentLen > 0) {
492✔
3595
      char comment[TSDB_TB_COMMENT_LEN + VARSTR_HEADER_SIZE] = {0};
20✔
3596
      STR_TO_VARSTR(comment, pStb->comment);
20✔
3597
      RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, comment, false), pStb, &lino, _ERROR);
20!
3598
    } else if (pStb->commentLen == 0) {
472✔
3599
      char comment[VARSTR_HEADER_SIZE + VARSTR_HEADER_SIZE] = {0};
3✔
3600
      STR_TO_VARSTR(comment, "");
3✔
3601
      RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, comment, false), pStb, &lino, _ERROR);
3!
3602
    } else {
3603
      colDataSetNULL(pColInfo, numOfRows);
469!
3604
    }
3605

3606
    char watermark[64 + VARSTR_HEADER_SIZE] = {0};
492✔
3607
    sprintf(varDataVal(watermark), "%" PRId64 "a,%" PRId64 "a", pStb->watermark[0], pStb->watermark[1]);
492✔
3608
    varDataSetLen(watermark, strlen(varDataVal(watermark)));
492✔
3609

3610
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
492✔
3611
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)watermark, false), pStb, &lino, _ERROR);
492!
3612

3613
    char maxDelay[64 + VARSTR_HEADER_SIZE] = {0};
492✔
3614
    sprintf(varDataVal(maxDelay), "%" PRId64 "a,%" PRId64 "a", pStb->maxdelay[0], pStb->maxdelay[1]);
492✔
3615
    varDataSetLen(maxDelay, strlen(varDataVal(maxDelay)));
492✔
3616

3617
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
492✔
3618
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)maxDelay, false), pStb, &lino, _ERROR);
492!
3619

3620
    char    rollup[160 + VARSTR_HEADER_SIZE] = {0};
492✔
3621
    int32_t rollupNum = (int32_t)taosArrayGetSize(pStb->pFuncs);
492✔
3622
    char   *sep = ", ";
492✔
3623
    int32_t sepLen = strlen(sep);
492✔
3624
    int32_t rollupLen = sizeof(rollup) - VARSTR_HEADER_SIZE - 2;
492✔
3625
    for (int32_t i = 0; i < rollupNum; ++i) {
495✔
3626
      char *funcName = taosArrayGet(pStb->pFuncs, i);
3✔
3627
      if (i) {
3!
3628
        (void)strncat(varDataVal(rollup), sep, rollupLen);
×
3629
        rollupLen -= sepLen;
×
3630
      }
3631
      (void)strncat(varDataVal(rollup), funcName, rollupLen);
3✔
3632
      rollupLen -= strlen(funcName);
3✔
3633
    }
3634
    varDataSetLen(rollup, strlen(varDataVal(rollup)));
492✔
3635

3636
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
492✔
3637
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)rollup, false), pStb, &lino, _ERROR);
492!
3638

3639
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
492✔
3640
    if (pColInfo) {
492!
3641
      RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)(&pStb->uid), false), pStb, &lino, _ERROR);
492!
3642
    }
3643

3644
    numOfRows++;
492✔
3645
    sdbRelease(pSdb, pStb);
492✔
3646
  }
3647

3648
  if (pDb != NULL) {
260✔
3649
    mndReleaseDb(pMnode, pDb);
215✔
3650
  }
3651

3652
  goto _OVER;
260✔
3653

3654
_ERROR:
×
3655
  mError("show:0x%" PRIx64 ", failed to retrieve data at %s:%d since %s", pShow->id, __FUNCTION__, lino,
×
3656
         tstrerror(code));
3657

3658
_OVER:
×
3659
  pShow->numOfRows += numOfRows;
260✔
3660
  return numOfRows;
260✔
3661
}
3662

3663
static int32_t buildDbColsInfoBlock(const SSDataBlock *p, const SSysTableMeta *pSysDbTableMeta, size_t size,
×
3664
                                    const char *dbName, const char *tbName) {
3665
  char    tName[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
×
3666
  char    dName[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
×
3667
  char    typeName[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
×
3668
  int32_t numOfRows = p->info.rows;
×
3669
  int32_t lino = 0;
×
3670
  int32_t code = 0;
×
3671

3672
  STR_TO_VARSTR(dName, dbName);
×
3673
  STR_TO_VARSTR(typeName, "SYSTEM_TABLE");
×
3674

3675
  for (int32_t i = 0; i < size; ++i) {
×
3676
    const SSysTableMeta *pm = &pSysDbTableMeta[i];
×
3677
    //    if (pm->sysInfo) {
3678
    //      continue;
3679
    //    }
3680
    if (tbName[0] && strncmp(tbName, pm->name, TSDB_TABLE_NAME_LEN) != 0) {
×
3681
      continue;
×
3682
    }
3683

3684
    STR_TO_VARSTR(tName, pm->name);
×
3685

3686
    for (int32_t j = 0; j < pm->colNum; j++) {
×
3687
      // table name
3688
      SColumnInfoData *pColInfoData = taosArrayGet(p->pDataBlock, 0);
×
3689
      TAOS_CHECK_GOTO(colDataSetVal(pColInfoData, numOfRows, tName, false), &lino, _OVER);
×
3690

3691
      // database name
3692
      pColInfoData = taosArrayGet(p->pDataBlock, 1);
×
3693
      TAOS_CHECK_GOTO(colDataSetVal(pColInfoData, numOfRows, dName, false), &lino, _OVER);
×
3694

3695
      pColInfoData = taosArrayGet(p->pDataBlock, 2);
×
3696
      TAOS_CHECK_GOTO(colDataSetVal(pColInfoData, numOfRows, typeName, false), &lino, _OVER);
×
3697

3698
      // col name
3699
      char colName[TSDB_COL_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
×
3700
      STR_TO_VARSTR(colName, pm->schema[j].name);
×
3701
      pColInfoData = taosArrayGet(p->pDataBlock, 3);
×
3702
      TAOS_CHECK_GOTO(colDataSetVal(pColInfoData, numOfRows, colName, false), &lino, _OVER);
×
3703

3704
      // col type
3705
      int8_t colType = pm->schema[j].type;
×
3706
      pColInfoData = taosArrayGet(p->pDataBlock, 4);
×
3707
      char colTypeStr[VARSTR_HEADER_SIZE + 32];
3708
      int  colTypeLen = sprintf(varDataVal(colTypeStr), "%s", tDataTypes[colType].name);
×
3709
      if (colType == TSDB_DATA_TYPE_VARCHAR) {
×
3710
        colTypeLen +=
×
3711
            sprintf(varDataVal(colTypeStr) + colTypeLen, "(%d)", (int32_t)(pm->schema[j].bytes - VARSTR_HEADER_SIZE));
×
3712
      } else if (colType == TSDB_DATA_TYPE_NCHAR) {
×
3713
        colTypeLen += sprintf(varDataVal(colTypeStr) + colTypeLen, "(%d)",
×
3714
                              (int32_t)((pm->schema[j].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE));
×
3715
      }
3716
      varDataSetLen(colTypeStr, colTypeLen);
×
3717
      TAOS_CHECK_GOTO(colDataSetVal(pColInfoData, numOfRows, (char *)colTypeStr, false), &lino, _OVER);
×
3718

3719
      pColInfoData = taosArrayGet(p->pDataBlock, 5);
×
3720
      TAOS_CHECK_GOTO(colDataSetVal(pColInfoData, numOfRows, (const char *)&pm->schema[j].bytes, false), &lino, _OVER);
×
3721
      for (int32_t k = 6; k <= 8; ++k) {
×
3722
        pColInfoData = taosArrayGet(p->pDataBlock, k);
×
3723
        colDataSetNULL(pColInfoData, numOfRows);
×
3724
      }
3725

3726
      numOfRows += 1;
×
3727
    }
3728
  }
3729
_OVER:
×
3730
  mError("failed at %s:%d since %s", __FUNCTION__, lino, tstrerror(code));
×
3731
  return numOfRows;
×
3732
}
3733
#define BUILD_COL_FOR_INFO_DB 1
3734
#define BUILD_COL_FOR_PERF_DB 1 << 1
3735
#define BUILD_COL_FOR_USER_DB 1 << 2
3736
#define BUILD_COL_FOR_ALL_DB  (BUILD_COL_FOR_INFO_DB | BUILD_COL_FOR_PERF_DB | BUILD_COL_FOR_USER_DB)
3737

3738
static int32_t buildSysDbColsInfo(SSDataBlock *p, int8_t buildWhichDBs, char *tb) {
×
3739
  size_t               size = 0;
×
3740
  const SSysTableMeta *pSysDbTableMeta = NULL;
×
3741

3742
  if (buildWhichDBs & BUILD_COL_FOR_INFO_DB) {
×
3743
    getInfosDbMeta(&pSysDbTableMeta, &size);
×
3744
    p->info.rows = buildDbColsInfoBlock(p, pSysDbTableMeta, size, TSDB_INFORMATION_SCHEMA_DB, tb);
×
3745
  }
3746

3747
  if (buildWhichDBs & BUILD_COL_FOR_PERF_DB) {
×
3748
    getPerfDbMeta(&pSysDbTableMeta, &size);
×
3749
    p->info.rows = buildDbColsInfoBlock(p, pSysDbTableMeta, size, TSDB_PERFORMANCE_SCHEMA_DB, tb);
×
3750
  }
3751

3752
  return p->info.rows;
×
3753
}
3754

3755
static int8_t determineBuildColForWhichDBs(const char *db) {
×
3756
  int8_t buildWhichDBs;
3757
  if (!db[0])
×
3758
    buildWhichDBs = BUILD_COL_FOR_ALL_DB;
×
3759
  else {
3760
    char *p = strchr(db, '.');
×
3761
    if (p && strcmp(p + 1, TSDB_INFORMATION_SCHEMA_DB) == 0) {
×
3762
      buildWhichDBs = BUILD_COL_FOR_INFO_DB;
×
3763
    } else if (p && strcmp(p + 1, TSDB_PERFORMANCE_SCHEMA_DB) == 0) {
×
3764
      buildWhichDBs = BUILD_COL_FOR_PERF_DB;
×
3765
    } else {
3766
      buildWhichDBs = BUILD_COL_FOR_USER_DB;
×
3767
    }
3768
  }
3769
  return buildWhichDBs;
×
3770
}
3771

3772
static int32_t mndRetrieveStbCol(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
×
3773
  uint8_t  buildWhichDBs;
3774
  SMnode  *pMnode = pReq->info.node;
×
3775
  SSdb    *pSdb = pMnode->pSdb;
×
3776
  SStbObj *pStb = NULL;
×
3777
  int32_t  numOfRows = 0;
×
3778
  int32_t  lino = 0;
×
3779
  int32_t  code = 0;
×
3780

3781
  buildWhichDBs = determineBuildColForWhichDBs(pShow->db);
×
3782

3783
  if (!pShow->sysDbRsp) {
×
3784
    numOfRows = buildSysDbColsInfo(pBlock, buildWhichDBs, pShow->filterTb);
×
3785
    mDebug("mndRetrieveStbCol get system table cols, rows:%d, db:%s", numOfRows, pShow->db);
×
3786
    pShow->sysDbRsp = true;
×
3787
  }
3788

3789
  if (buildWhichDBs & BUILD_COL_FOR_USER_DB) {
×
3790
    SDbObj *pDb = NULL;
×
3791
    if (strlen(pShow->db) > 0) {
×
3792
      pDb = mndAcquireDb(pMnode, pShow->db);
×
3793
      if (pDb == NULL && TSDB_CODE_MND_DB_NOT_EXIST != terrno && pBlock->info.rows == 0) return terrno;
×
3794
    }
3795

3796
    char typeName[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
×
3797
    STR_TO_VARSTR(typeName, "SUPER_TABLE");
×
3798
    bool fetch = pShow->restore ? false : true;
×
3799
    pShow->restore = false;
×
3800
    while (numOfRows < rows) {
×
3801
      if (fetch) {
×
3802
        pShow->pIter = sdbFetch(pSdb, SDB_STB, pShow->pIter, (void **)&pStb);
×
3803
        if (pShow->pIter == NULL) break;
×
3804
      } else {
3805
        fetch = true;
×
3806
        void *pKey = taosHashGetKey(pShow->pIter, NULL);
×
3807
        pStb = sdbAcquire(pSdb, SDB_STB, pKey);
×
3808
        if (!pStb) continue;
×
3809
      }
3810

3811
      if (pDb != NULL && pStb->dbUid != pDb->uid) {
×
3812
        sdbRelease(pSdb, pStb);
×
3813
        continue;
×
3814
      }
3815

3816
      SName name = {0};
×
3817
      char  stbName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
×
3818
      mndExtractTbNameFromStbFullName(pStb->name, &stbName[VARSTR_HEADER_SIZE], TSDB_TABLE_NAME_LEN);
×
3819
      if (pShow->filterTb[0] && strncmp(pShow->filterTb, &stbName[VARSTR_HEADER_SIZE], TSDB_TABLE_NAME_LEN) != 0) {
×
3820
        sdbRelease(pSdb, pStb);
×
3821
        continue;
×
3822
      }
3823

3824
      if ((numOfRows + pStb->numOfColumns) > rows) {
×
3825
        pShow->restore = true;
×
3826
        if (numOfRows == 0) {
×
3827
          mError("mndRetrieveStbCol failed to get stable cols since buf:%d less than result:%d, stable name:%s, db:%s",
×
3828
                 rows, pStb->numOfColumns, pStb->name, pStb->db);
3829
        }
3830
        sdbRelease(pSdb, pStb);
×
3831
        break;
×
3832
      }
3833

3834
      varDataSetLen(stbName, strlen(&stbName[VARSTR_HEADER_SIZE]));
×
3835

3836
      mDebug("mndRetrieveStbCol get stable cols, stable name:%s, db:%s", pStb->name, pStb->db);
×
3837

3838
      char db[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
×
3839
      RETRIEVE_CHECK_GOTO(tNameFromString(&name, pStb->db, T_NAME_ACCT | T_NAME_DB), pStb, &lino, _OVER);
×
3840
      RETRIEVE_CHECK_GOTO(tNameGetDbName(&name, varDataVal(db)), pStb, &lino, _OVER);
×
3841
      varDataSetLen(db, strlen(varDataVal(db)));
×
3842

3843
      for (int i = 0; i < pStb->numOfColumns; i++) {
×
3844
        int32_t          cols = 0;
×
3845
        SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
3846
        RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)stbName, false), pStb, &lino, _OVER);
×
3847

3848
        pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
3849
        RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)db, false), pStb, &lino, _OVER);
×
3850

3851
        pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
3852
        RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, typeName, false), pStb, &lino, _OVER);
×
3853

3854
        // col name
3855
        char colName[TSDB_COL_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
×
3856
        STR_TO_VARSTR(colName, pStb->pColumns[i].name);
×
3857
        pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
3858
        RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, colName, false), pStb, &lino, _OVER);
×
3859

3860
        // col type
3861
        int8_t colType = pStb->pColumns[i].type;
×
3862
        pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
3863
        char colTypeStr[VARSTR_HEADER_SIZE + 32];
3864
        int  colTypeLen = sprintf(varDataVal(colTypeStr), "%s", tDataTypes[colType].name);
×
3865
        if (colType == TSDB_DATA_TYPE_VARCHAR) {
×
3866
          colTypeLen += sprintf(varDataVal(colTypeStr) + colTypeLen, "(%d)",
×
3867
                                (int32_t)(pStb->pColumns[i].bytes - VARSTR_HEADER_SIZE));
×
3868
        } else if (colType == TSDB_DATA_TYPE_NCHAR) {
×
3869
          colTypeLen += sprintf(varDataVal(colTypeStr) + colTypeLen, "(%d)",
×
3870
                                (int32_t)((pStb->pColumns[i].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE));
×
3871
        }
3872
        varDataSetLen(colTypeStr, colTypeLen);
×
3873
        RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (char *)colTypeStr, false), pStb, &lino, _OVER);
×
3874

3875
        pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
3876
        RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pStb->pColumns[i].bytes, false), pStb,
×
3877
                            &lino, _OVER);
3878
        while (cols < pShow->numOfColumns) {
×
3879
          pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
3880
          colDataSetNULL(pColInfo, numOfRows);
×
3881
        }
3882
        numOfRows++;
×
3883
      }
3884

3885
      sdbRelease(pSdb, pStb);
×
3886
    }
3887

3888
    if (pDb != NULL) {
×
3889
      mndReleaseDb(pMnode, pDb);
×
3890
    }
3891
  }
3892

3893
  mDebug("mndRetrieveStbCol success, rows:%d, pShow->numOfRows:%d", numOfRows, pShow->numOfRows);
×
3894
  goto _OVER;
×
3895

3896
_ERROR:
3897
  mError("failed to mndRetrieveStbCol, rows:%d, pShow->numOfRows:%d, at %s:%d since %s", numOfRows, pShow->numOfRows,
3898
         __FUNCTION__, lino, tstrerror(code));
3899

3900
_OVER:
×
3901
  pShow->numOfRows += numOfRows;
×
3902
  return numOfRows;
×
3903
}
3904

3905
static void mndCancelGetNextStb(SMnode *pMnode, void *pIter) {
×
3906
  SSdb *pSdb = pMnode->pSdb;
×
3907
  sdbCancelFetchByType(pSdb, pIter, SDB_STB);
×
3908
}
×
3909

3910
const char *mndGetStbStr(const char *src) {
360✔
3911
  char *posDb = strstr(src, TS_PATH_DELIMITER);
360✔
3912
  if (posDb != NULL) ++posDb;
360✔
3913
  if (posDb == NULL) return src;
360✔
3914

3915
  char *posStb = strstr(posDb, TS_PATH_DELIMITER);
359✔
3916
  if (posStb != NULL) ++posStb;
359!
3917
  if (posStb == NULL) return posDb;
359!
3918
  return posStb;
359✔
3919
}
3920

3921
static int32_t mndCheckIndexReq(SCreateTagIndexReq *pReq) {
×
3922
  // impl
3923
  return TSDB_CODE_SUCCESS;
×
3924
}
3925

3926
/*int32_t mndAddIndexImpl(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *pStb, bool needRsp, void *sql,
3927
                        int32_t len) {
3928
  // impl later
3929
  int32_t code = 0;
3930
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, "create-stb-index");
3931
  if (pTrans == NULL) goto _OVER;
3932

3933
  mInfo("trans:%d, used to add index to stb:%s", pTrans->id, pStb->name);
3934
  mndTransSetDbName(pTrans, pDb->name, pStb->name);
3935
  if (mndTransCheckConflict(pMnode, pTrans) != 0) goto _OVER;
3936

3937
  if (mndSetAlterStbPrepareLogs(pMnode, pTrans, pDb, pStb) != 0) goto _OVER;
3938
  if (mndSetAlterStbCommitLogs(pMnode, pTrans, pDb, pStb) != 0) goto _OVER;
3939
  if (mndSetAlterStbRedoActions2(pMnode, pTrans, pDb, pStb, sql, len) != 0) goto _OVER;
3940
  if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
3941

3942
  return code;
3943

3944
_OVER:
3945
  mndTransDrop(pTrans);
3946
  return code;
3947
}
3948
static int32_t mndAddIndex(SMnode *pMnode, SRpcMsg *pReq, SCreateTagIndexReq *tagIdxReq, SDbObj *pDb, SStbObj *pOld) {
3949
  bool    needRsp = true;
3950
  int32_t code = -1;
3951
  SField *pField0 = NULL;
3952

3953
  SStbObj  stbObj = {0};
3954
  SStbObj *pNew = &stbObj;
3955

3956
  taosRLockLatch(&pOld->lock);
3957
  memcpy(&stbObj, pOld, sizeof(SStbObj));
3958
  taosRUnLockLatch(&pOld->lock);
3959

3960
  stbObj.pColumns = NULL;
3961
  stbObj.pTags = NULL;
3962
  stbObj.updateTime = taosGetTimestampMs();
3963
  stbObj.lock = 0;
3964

3965
  int32_t tag = mndFindSuperTableTagIndex(pOld, tagIdxReq->colName);
3966
  if (tag < 0) {
3967
    terrno = TSDB_CODE_MND_TAG_NOT_EXIST;
3968
    return -1;
3969
  }
3970
  col_id_t colId = pOld->pTags[tag].colId;
3971
  if (mndCheckColAndTagModifiable(pMnode, pOld->name, pOld->uid, colId) != 0) {
3972
    return -1;
3973
  }
3974
  if (mndAllocStbSchemas(pOld, pNew) != 0) {
3975
    return -1;
3976
  }
3977

3978
  SSchema *pTag = pNew->pTags + tag;
3979
  if (IS_IDX_ON(pTag)) {
3980
    terrno = TSDB_CODE_MND_TAG_INDEX_ALREADY_EXIST;
3981
    return -1;
3982
  } else {
3983
    pTag->flags |= COL_IDX_ON;
3984
  }
3985
  pNew->tagVer++;
3986

3987
  code = mndAddIndexImpl(pMnode, pReq, pDb, pNew, needRsp, pReq->pCont, pReq->contLen);
3988

3989
  return code;
3990
}
3991
static int32_t mndProcessCreateIndexReq(SRpcMsg *pReq) {
3992
  SMnode            *pMnode = pReq->info.node;
3993
  int32_t            code = -1;
3994
  SDbObj            *pDb = NULL;
3995
  SStbObj           *pStb = NULL;
3996
  SCreateTagIndexReq tagIdxReq = {0};
3997

3998
  if (tDeserializeSCreateTagIdxReq(pReq->pCont, pReq->contLen, &tagIdxReq) != 0) {
3999
    terrno = TSDB_CODE_INVALID_MSG;
4000
    goto _OVER;
4001
  }
4002

4003
  mInfo("stb:%s, start to alter", tagIdxReq.stbName);
4004

4005
  if (mndCheckIndexReq(&tagIdxReq) != TSDB_CODE_SUCCESS) {
4006
    goto _OVER;
4007
  }
4008

4009
  pDb = mndAcquireDbByStb(pMnode, tagIdxReq.dbFName);
4010
  if (pDb == NULL) {
4011
    terrno = TSDB_CODE_MND_DB_NOT_EXIST;
4012
    goto _OVER;
4013
  }
4014

4015
  pStb = mndAcquireStb(pMnode, tagIdxReq.stbName);
4016
  if (pStb == NULL) {
4017
    terrno = TSDB_CODE_MND_STB_NOT_EXIST;
4018
    goto _OVER;
4019
  }
4020
  if (mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb) != 0) {
4021
    goto _OVER;
4022
  }
4023

4024
  code = mndAddIndex(pMnode, pReq, &tagIdxReq, pDb, pStb);
4025
  if (terrno == TSDB_CODE_MND_TAG_INDEX_ALREADY_EXIST || terrno == TSDB_CODE_MND_TAG_NOT_EXIST) {
4026
    return terrno;
4027
  } else {
4028
    if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
4029
  }
4030
_OVER:
4031
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
4032
    mError("stb:%s, failed to create index since %s", tagIdxReq.stbName, terrstr());
4033
  }
4034
  mndReleaseStb(pMnode, pStb);
4035
  mndReleaseDb(pMnode, pDb);
4036
  return code;
4037
}
4038
static int32_t mndProcessDropIndexReq(SRpcMsg *pReq) {
4039
  SMnode          *pMnode = pReq->info.node;
4040
  int32_t          code = -1;
4041
  SDbObj          *pDb = NULL;
4042
  SStbObj         *pStb = NULL;
4043
  SDropTagIndexReq dropReq = {0};
4044
  if (tDeserializeSDropTagIdxReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
4045
    terrno = TSDB_CODE_INVALID_MSG;
4046
    goto _OVER;
4047
  }
4048
  //
4049
  return TSDB_CODE_SUCCESS;
4050
_OVER:
4051
  return code;
4052
}*/
4053

4054
static int32_t mndProcessDropStbReqFromMNode(SRpcMsg *pReq) {
×
4055
  int32_t code = mndProcessDropStbReq(pReq);
×
4056
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
×
4057
    pReq->info.rsp = rpcMallocCont(1);
×
4058
    pReq->info.rspLen = 1;
×
4059
    pReq->info.noResp = false;
×
4060
    pReq->code = code;
×
4061
  }
4062
  return code;
×
4063
}
4064

4065
typedef struct SVDropTbVgReqs {
4066
  SVDropTbBatchReq req;
4067
  SVgroupInfo      info;
4068
} SVDropTbVgReqs;
4069

4070
typedef struct SMDropTbDbInfo {
4071
  SArray *dbVgInfos;
4072
  int32_t hashPrefix;
4073
  int32_t hashSuffix;
4074
  int32_t hashMethod;
4075
} SMDropTbDbInfo;
4076

4077
typedef struct SMDropTbTsmaInfo {
4078
  char           tsmaResTbDbFName[TSDB_DB_FNAME_LEN];
4079
  char           tsmaResTbNamePrefix[TSDB_TABLE_FNAME_LEN];
4080
  int32_t        suid;
4081
  SMDropTbDbInfo dbInfo;  // reference to DbInfo in pDbMap
4082
} SMDropTbTsmaInfo;
4083

4084
typedef struct SMDropTbTsmaInfos {
4085
  SArray *pTsmaInfos;  // SMDropTbTsmaInfo
4086
} SMDropTbTsmaInfos;
4087

4088
typedef struct SMndDropTbsWithTsmaCtx {
4089
  SHashObj *pTsmaMap;     // <suid, SMDropTbTsmaInfos>
4090
  SHashObj *pDbMap;       // <dbuid, SMDropTbDbInfo>
4091
  SHashObj *pVgMap;       // <vgId, SVDropTbVgReqs>
4092
  SArray   *pResTbNames;  // SArray<char*>
4093
} SMndDropTbsWithTsmaCtx;
4094

4095
static int32_t mndDropTbAddTsmaResTbsForSingleVg(SMnode *pMnode, SMndDropTbsWithTsmaCtx *pCtx, SArray *pTbs,
4096
                                                 int32_t vgId);
4097

4098
static void mndDestroyDropTbsWithTsmaCtx(SMndDropTbsWithTsmaCtx *p) {
×
4099
  if (!p) return;
×
4100

4101
  if (p->pDbMap) {
×
4102
    void *pIter = taosHashIterate(p->pDbMap, NULL);
×
4103
    while (pIter) {
×
4104
      SMDropTbDbInfo *pInfo = pIter;
×
4105
      taosArrayDestroy(pInfo->dbVgInfos);
×
4106
      pIter = taosHashIterate(p->pDbMap, pIter);
×
4107
    }
4108
    taosHashCleanup(p->pDbMap);
×
4109
  }
4110
  if (p->pResTbNames) {
×
4111
    taosArrayDestroyP(p->pResTbNames, taosMemoryFree);
×
4112
  }
4113
  if (p->pTsmaMap) {
×
4114
    void *pIter = taosHashIterate(p->pTsmaMap, NULL);
×
4115
    while (pIter) {
×
4116
      SMDropTbTsmaInfos *pInfos = pIter;
×
4117
      taosArrayDestroy(pInfos->pTsmaInfos);
×
4118
      pIter = taosHashIterate(p->pTsmaMap, pIter);
×
4119
    }
4120
    taosHashCleanup(p->pTsmaMap);
×
4121
  }
4122

4123
  if (p->pVgMap) {
×
4124
    void *pIter = taosHashIterate(p->pVgMap, NULL);
×
4125
    while (pIter) {
×
4126
      SVDropTbVgReqs *pReqs = pIter;
×
4127
      taosArrayDestroy(pReqs->req.pArray);
×
4128
      pIter = taosHashIterate(p->pVgMap, pIter);
×
4129
    }
4130
    taosHashCleanup(p->pVgMap);
×
4131
  }
4132
  taosMemoryFree(p);
×
4133
}
4134

4135
static int32_t mndInitDropTbsWithTsmaCtx(SMndDropTbsWithTsmaCtx **ppCtx) {
×
4136
  int32_t                 code = 0;
×
4137
  SMndDropTbsWithTsmaCtx *pCtx = taosMemoryCalloc(1, sizeof(SMndDropTbsWithTsmaCtx));
×
4138
  if (!pCtx) return terrno;
×
4139
  pCtx->pTsmaMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
×
4140
  if (!pCtx->pTsmaMap) {
×
4141
    code = terrno;
×
4142
    goto _end;
×
4143
  }
4144

4145
  pCtx->pDbMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
×
4146
  if (!pCtx->pDbMap) {
×
4147
    code = terrno;
×
4148
    goto _end;
×
4149
  }
4150
  pCtx->pResTbNames = taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES);
×
4151

4152
  pCtx->pVgMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
×
4153
  if (!pCtx->pVgMap) {
×
4154
    code = terrno;
×
4155
    goto _end;
×
4156
  }
4157
  *ppCtx = pCtx;
×
4158
_end:
×
4159
  if (code) mndDestroyDropTbsWithTsmaCtx(pCtx);
×
4160
  return code;
×
4161
}
4162

4163
static void *mndBuildVDropTbsReq(SMnode *pMnode, const SVgroupInfo *pVgInfo, const SVDropTbBatchReq *pReq,
×
4164
                                 int32_t *len) {
4165
  int32_t   contLen = 0;
×
4166
  int32_t   ret = 0;
×
4167
  SMsgHead *pHead = NULL;
×
4168
  SEncoder  encoder = {0};
×
4169

4170
  tEncodeSize(tEncodeSVDropTbBatchReq, pReq, contLen, ret);
×
4171
  if (ret < 0) return NULL;
×
4172

4173
  contLen += sizeof(SMsgHead);
×
4174
  pHead = taosMemoryMalloc(contLen);
×
4175
  if (pHead == NULL) {
×
4176
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
4177
    return NULL;
×
4178
  }
4179

4180
  pHead->contLen = htonl(contLen);
×
4181
  pHead->vgId = htonl(pVgInfo->vgId);
×
4182

4183
  void *pBuf = POINTER_SHIFT(pHead, sizeof(SMsgHead));
×
4184

4185
  tEncoderInit(&encoder, pBuf, contLen - sizeof(SMsgHead));
×
4186
  int32_t code = tEncodeSVDropTbBatchReq(&encoder, pReq);
×
4187
  tEncoderClear(&encoder);
×
4188
  if (code != 0) return NULL;
×
4189

4190
  *len = contLen;
×
4191
  return pHead;
×
4192
}
4193

4194
static int32_t mndSetDropTbsRedoActions(SMnode *pMnode, STrans *pTrans, const SVDropTbVgReqs *pVgReqs, void *pCont,
×
4195
                                        int32_t contLen) {
4196
  STransAction action = {0};
×
4197
  action.epSet = pVgReqs->info.epSet;
×
4198
  action.pCont = pCont;
×
4199
  action.contLen = contLen;
×
4200
  action.msgType = TDMT_VND_DROP_TABLE;
×
4201
  action.acceptableCode = TSDB_CODE_TDB_TABLE_NOT_EXIST;
×
4202
  return mndTransAppendRedoAction(pTrans, &action);
×
4203
}
4204

4205
static int32_t mndCreateDropTbsTxnPrepare(SRpcMsg *pRsp, SMndDropTbsWithTsmaCtx *pCtx) {
×
4206
  int32_t code = 0;
×
4207
  SMnode *pMnode = pRsp->info.node;
×
4208
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pRsp, "drop-tbs");
×
4209
  mndTransSetChangeless(pTrans);
×
4210
  mndTransSetSerial(pTrans);
×
4211
  if (pTrans == NULL) {
×
4212
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
4213
    if (terrno != 0) code = terrno;
×
4214
    goto _OVER;
×
4215
  }
4216

4217
  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER);
×
4218

4219
  void *pIter = taosHashIterate(pCtx->pVgMap, NULL);
×
4220
  while (pIter) {
×
4221
    const SVDropTbVgReqs *pVgReqs = pIter;
×
4222
    int32_t               len = 0;
×
4223
    void                 *p = mndBuildVDropTbsReq(pMnode, &pVgReqs->info, &pVgReqs->req, &len);
×
4224
    if (!p) {
×
4225
      taosHashCancelIterate(pCtx->pVgMap, pIter);
×
4226
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
4227
      if (terrno != 0) code = terrno;
×
4228
      goto _OVER;
×
4229
    }
4230
    if ((code = mndSetDropTbsRedoActions(pMnode, pTrans, pVgReqs, p, len)) != 0) {
×
4231
      taosHashCancelIterate(pCtx->pVgMap, pIter);
×
4232
      goto _OVER;
×
4233
    }
4234
    pIter = taosHashIterate(pCtx->pVgMap, pIter);
×
4235
  }
4236
  if ((code = mndTransPrepare(pMnode, pTrans)) != 0) goto _OVER;
×
4237

4238
_OVER:
×
4239
  mndTransDrop(pTrans);
×
4240
  TAOS_RETURN(code);
×
4241
}
4242

4243
static int32_t mndProcessDropTbWithTsma(SRpcMsg *pReq) {
×
4244
  int32_t      code = -1;
×
4245
  SMnode      *pMnode = pReq->info.node;
×
4246
  SDbObj      *pDb = NULL;
×
4247
  SStbObj     *pStb = NULL;
×
4248
  SMDropTbsReq dropReq = {0};
×
4249
  bool         locked = false;
×
4250
  if (tDeserializeSMDropTbsReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
×
4251
    code = TSDB_CODE_INVALID_MSG;
×
4252
    goto _OVER;
×
4253
  }
4254

4255
  SMndDropTbsWithTsmaCtx *pCtx = NULL;
×
4256
  code = mndInitDropTbsWithTsmaCtx(&pCtx);
×
4257
  if (code) goto _OVER;
×
4258
  for (int32_t i = 0; i < dropReq.pVgReqs->size; ++i) {
×
4259
    SMDropTbReqsOnSingleVg *pReq = taosArrayGet(dropReq.pVgReqs, i);
×
4260
    code = mndDropTbAddTsmaResTbsForSingleVg(pMnode, pCtx, pReq->pTbs, pReq->vgInfo.vgId);
×
4261
    if (code) goto _OVER;
×
4262
  }
4263
  if (mndCreateDropTbsTxnPrepare(pReq, pCtx) == 0) {
×
4264
    code = TSDB_CODE_ACTION_IN_PROGRESS;
×
4265
  }
4266
_OVER:
×
4267
  tFreeSMDropTbsReq(&dropReq);
×
4268
  if (pCtx) mndDestroyDropTbsWithTsmaCtx(pCtx);
×
4269
  TAOS_RETURN(code);
×
4270
}
4271

4272
static int32_t mndDropTbAdd(SMnode *pMnode, SHashObj *pVgHashMap, const SVgroupInfo *pVgInfo, char *name, tb_uid_t suid,
×
4273
                            bool ignoreNotExists) {
4274
  SVDropTbReq req = {.name = name, .suid = suid, .igNotExists = ignoreNotExists};
×
4275

4276
  SVDropTbVgReqs *pReqs = taosHashGet(pVgHashMap, &pVgInfo->vgId, sizeof(pVgInfo->vgId));
×
4277
  SVDropTbVgReqs  reqs = {0};
×
4278
  if (pReqs == NULL) {
×
4279
    reqs.info = *pVgInfo;
×
4280
    reqs.req.pArray = taosArrayInit(TARRAY_MIN_SIZE, sizeof(SVDropTbReq));
×
4281
    if (reqs.req.pArray == NULL) {
×
4282
      return terrno;
×
4283
    }
4284
    if (taosArrayPush(reqs.req.pArray, &req) == NULL) {
×
4285
      return terrno;
×
4286
    }
4287
    if (taosHashPut(pVgHashMap, &pVgInfo->vgId, sizeof(pVgInfo->vgId), &reqs, sizeof(reqs)) != 0) {
×
4288
      return terrno;
×
4289
    }
4290
  } else {
4291
    if (taosArrayPush(pReqs->req.pArray, &req) == NULL) {
×
4292
      return terrno;
×
4293
    }
4294
  }
4295
  return 0;
×
4296
}
4297

4298
int vgInfoCmp(const void *lp, const void *rp) {
×
4299
  SVgroupInfo *pLeft = (SVgroupInfo *)lp;
×
4300
  SVgroupInfo *pRight = (SVgroupInfo *)rp;
×
4301
  if (pLeft->hashBegin < pRight->hashBegin) {
×
4302
    return -1;
×
4303
  } else if (pLeft->hashBegin > pRight->hashBegin) {
×
4304
    return 1;
×
4305
  }
4306

4307
  return 0;
×
4308
}
4309

4310
static int32_t mndGetDbVgInfoForTsma(SMnode *pMnode, const char *dbname, SMDropTbTsmaInfo *pInfo) {
×
4311
  int32_t code = 0;
×
4312
  SDbObj *pDb = mndAcquireDb(pMnode, dbname);
×
4313
  if (!pDb) {
×
4314
    code = TSDB_CODE_MND_DB_NOT_EXIST;
×
4315
    goto _end;
×
4316
  }
4317

4318
  pInfo->dbInfo.dbVgInfos = taosArrayInit(pDb->cfg.numOfVgroups, sizeof(SVgroupInfo));
×
4319
  if (!pInfo->dbInfo.dbVgInfos) {
×
4320
    code = terrno;
×
4321
    goto _end;
×
4322
  }
4323
  mndBuildDBVgroupInfo(pDb, pMnode, pInfo->dbInfo.dbVgInfos);
×
4324
  taosArraySort(pInfo->dbInfo.dbVgInfos, vgInfoCmp);
×
4325

4326
  pInfo->dbInfo.hashPrefix = pDb->cfg.hashPrefix;
×
4327
  pInfo->dbInfo.hashSuffix = pDb->cfg.hashSuffix;
×
4328
  pInfo->dbInfo.hashMethod = pDb->cfg.hashMethod;
×
4329

4330
_end:
×
4331
  if (pDb) mndReleaseDb(pMnode, pDb);
×
4332
  if (code && pInfo->dbInfo.dbVgInfos) {
×
4333
    taosArrayDestroy(pInfo->dbInfo.dbVgInfos);
×
4334
    pInfo->dbInfo.dbVgInfos = NULL;
×
4335
  }
4336
  TAOS_RETURN(code);
×
4337
}
4338

4339
int32_t vgHashValCmp(const void *lp, const void *rp) {
×
4340
  uint32_t    *key = (uint32_t *)lp;
×
4341
  SVgroupInfo *pVg = (SVgroupInfo *)rp;
×
4342

4343
  if (*key < pVg->hashBegin) {
×
4344
    return -1;
×
4345
  } else if (*key > pVg->hashEnd) {
×
4346
    return 1;
×
4347
  }
4348

4349
  return 0;
×
4350
}
4351

4352
static int32_t mndDropTbAddTsmaResTbsForSingleVg(SMnode *pMnode, SMndDropTbsWithTsmaCtx *pCtx, SArray *pTbs,
×
4353
                                                 int32_t vgId) {
4354
  int32_t code = 0;
×
4355

4356
  SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
×
4357
  if (!pVgObj) {
×
4358
    code = 0;
×
4359
    goto _end;
×
4360
  }
4361
  SVgroupInfo vgInfo = {.hashBegin = pVgObj->hashBegin,
×
4362
                        .hashEnd = pVgObj->hashEnd,
×
4363
                        .numOfTable = pVgObj->numOfTables,
×
4364
                        .vgId = pVgObj->vgId};
×
4365
  vgInfo.epSet = mndGetVgroupEpset(pMnode, pVgObj);
×
4366
  mndReleaseVgroup(pMnode, pVgObj);
×
4367

4368
  // get all stb uids
4369
  for (int32_t i = 0; i < pTbs->size; ++i) {
×
4370
    const SVDropTbReq *pTb = taosArrayGet(pTbs, i);
×
4371
    if (taosHashGet(pCtx->pTsmaMap, &pTb->suid, sizeof(pTb->suid))) {
×
4372
    } else {
4373
      SMDropTbTsmaInfos infos = {0};
×
4374
      infos.pTsmaInfos = taosArrayInit(2, sizeof(SMDropTbTsmaInfo));
×
4375
      if (!infos.pTsmaInfos) {
×
4376
        code = terrno;
×
4377
        goto _end;
×
4378
      }
4379
      if (taosHashPut(pCtx->pTsmaMap, &pTb->suid, sizeof(pTb->suid), &infos, sizeof(infos)) != 0) {
×
4380
        code = terrno;
×
4381
        goto _end;
×
4382
      }
4383
    }
4384
  }
4385

4386
  void    *pIter = NULL;
×
4387
  SSmaObj *pSma = NULL;
×
4388
  char     buf[TSDB_TABLE_FNAME_LEN] = {0};
×
4389
  // get used tsmas and it's dbs
4390
  while (1) {
×
4391
    pIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter, (void **)&pSma);
×
4392
    if (!pIter) break;
×
4393
    SMDropTbTsmaInfos *pInfos = taosHashGet(pCtx->pTsmaMap, &pSma->stbUid, sizeof(pSma->stbUid));
×
4394
    if (pInfos) {
×
4395
      SMDropTbTsmaInfo info = {0};
×
4396
      int32_t          len = sprintf(buf, "%s", pSma->name);
×
4397
      sprintf(info.tsmaResTbDbFName, "%s", pSma->db);
×
4398
      snprintf(info.tsmaResTbNamePrefix, TSDB_TABLE_FNAME_LEN, "%s", buf);
×
4399
      SMDropTbDbInfo *pDbInfo = taosHashGet(pCtx->pDbMap, pSma->db, TSDB_DB_FNAME_LEN);
×
4400
      info.suid = pSma->dstTbUid;
×
4401
      if (!pDbInfo) {
×
4402
        code = mndGetDbVgInfoForTsma(pMnode, pSma->db, &info);
×
4403
        if (code != TSDB_CODE_SUCCESS) {
×
4404
          sdbCancelFetch(pMnode->pSdb, pIter);
×
4405
          sdbRelease(pMnode->pSdb, pSma);
×
4406
          goto _end;
×
4407
        }
4408
        if (taosHashPut(pCtx->pDbMap, pSma->db, TSDB_DB_FNAME_LEN, &info.dbInfo, sizeof(SMDropTbDbInfo)) != 0) {
×
4409
          sdbCancelFetch(pMnode->pSdb, pIter);
×
4410
          sdbRelease(pMnode->pSdb, pSma);
×
4411
          goto _end;
×
4412
        }
4413
      } else {
4414
        info.dbInfo = *pDbInfo;
×
4415
      }
4416
      if (taosArrayPush(pInfos->pTsmaInfos, &info) == NULL) {
×
4417
        code = terrno;
×
4418
        sdbCancelFetch(pMnode->pSdb, pIter);
×
4419
        sdbRelease(pMnode->pSdb, pSma);
×
4420
        goto _end;
×
4421
      }
4422
    }
4423
    sdbRelease(pMnode->pSdb, pSma);
×
4424
  }
4425

4426
  // generate vg req map
4427
  for (int32_t i = 0; i < pTbs->size; ++i) {
×
4428
    SVDropTbReq *pTb = taosArrayGet(pTbs, i);
×
4429
    TAOS_CHECK_GOTO(mndDropTbAdd(pMnode, pCtx->pVgMap, &vgInfo, pTb->name, pTb->suid, pTb->igNotExists), NULL, _end);
×
4430

4431
    SMDropTbTsmaInfos *pInfos = taosHashGet(pCtx->pTsmaMap, &pTb->suid, sizeof(pTb->suid));
×
4432
    SArray            *pVgInfos = NULL;
×
4433
    char               buf[TSDB_TABLE_FNAME_LEN + TSDB_TABLE_NAME_LEN + 1];
4434
    char               resTbFullName[TSDB_TABLE_FNAME_LEN + 1] = {0};
×
4435
    for (int32_t j = 0; j < pInfos->pTsmaInfos->size; ++j) {
×
4436
      SMDropTbTsmaInfo *pInfo = taosArrayGet(pInfos->pTsmaInfos, j);
×
4437
      int32_t           len = sprintf(buf, "%s_%s", pInfo->tsmaResTbNamePrefix, pTb->name);
×
4438
      len = taosCreateMD5Hash(buf, len);
×
4439
      len = snprintf(resTbFullName, TSDB_TABLE_FNAME_LEN + 1, "%s.%s", pInfo->tsmaResTbDbFName, buf);
×
4440
      uint32_t hashVal = taosGetTbHashVal(resTbFullName, len, pInfo->dbInfo.hashMethod, pInfo->dbInfo.hashPrefix,
×
4441
                                          pInfo->dbInfo.hashSuffix);
4442
      const SVgroupInfo *pVgInfo = taosArraySearch(pInfo->dbInfo.dbVgInfos, &hashVal, vgHashValCmp, TD_EQ);
×
4443
      void              *p = taosStrdup(resTbFullName + strlen(pInfo->tsmaResTbDbFName) + TSDB_NAME_DELIMITER_LEN);
×
4444
      if (taosArrayPush(pCtx->pResTbNames, &p) == NULL) {
×
4445
        code = terrno;
×
4446
        goto _end;
×
4447
      }
4448
      TAOS_CHECK_GOTO(mndDropTbAdd(pMnode, pCtx->pVgMap, pVgInfo, p, pInfo->suid, true), NULL, _end);
×
4449
    }
4450
  }
4451
_end:
×
4452
  return code;
×
4453
}
4454

4455
static int32_t mndProcessFetchTtlExpiredTbs(SRpcMsg *pRsp) {
396,518✔
4456
  int32_t                 code = -1;
396,518✔
4457
  SDecoder                decoder = {0};
396,518✔
4458
  SMnode                 *pMnode = pRsp->info.node;
396,518✔
4459
  SVFetchTtlExpiredTbsRsp rsp = {0};
396,518✔
4460
  SMndDropTbsWithTsmaCtx *pCtx = NULL;
396,518✔
4461
  if (pRsp->code != TSDB_CODE_SUCCESS) {
396,518✔
4462
    code = pRsp->code;
95,236✔
4463
    goto _end;
95,236✔
4464
  }
4465
  if (pRsp->contLen == 0) {
301,282!
4466
    code = 0;
301,282✔
4467
    goto _end;
301,282✔
4468
  }
4469

4470
  tDecoderInit(&decoder, pRsp->pCont, pRsp->contLen);
×
4471
  code = tDecodeVFetchTtlExpiredTbsRsp(&decoder, &rsp);
×
4472
  if (code) goto _end;
×
4473

4474
  code = mndInitDropTbsWithTsmaCtx(&pCtx);
×
4475
  if (code) goto _end;
×
4476

4477
  code = mndDropTbAddTsmaResTbsForSingleVg(pMnode, pCtx, rsp.pExpiredTbs, rsp.vgId);
×
4478
  if (code) goto _end;
×
4479
  if (mndCreateDropTbsTxnPrepare(pRsp, pCtx) == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
×
4480
_end:
×
4481
  if (pCtx) mndDestroyDropTbsWithTsmaCtx(pCtx);
396,518!
4482
  tDecoderClear(&decoder);
396,518✔
4483
  tFreeFetchTtlExpiredTbsRsp(&rsp);
396,518✔
4484
  TAOS_RETURN(code);
396,518✔
4485
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc