• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3546

03 Dec 2024 10:02AM UTC coverage: 60.691% (-0.1%) from 60.839%
#3546

push

travis-ci

web-flow
Merge pull request #29015 from taosdata/fix/TS-5668

[TS-5668] fix(keeper): fix endpoint value too long for column/tag and eliminate warnings

120577 of 253823 branches covered (47.5%)

Branch coverage included in aggregate %.

201666 of 277134 relevant lines covered (72.77%)

18719900.08 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

69.99
/source/dnode/mnode/impl/src/mndStb.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mndStb.h"
18
#include "audit.h"
19
#include "mndDb.h"
20
#include "mndDnode.h"
21
#include "mndIndex.h"
22
#include "mndIndexComm.h"
23
#include "mndInfoSchema.h"
24
#include "mndMnode.h"
25
#include "mndPerfSchema.h"
26
#include "mndPrivilege.h"
27
#include "mndScheduler.h"
28
#include "mndShow.h"
29
#include "mndSma.h"
30
#include "mndTopic.h"
31
#include "mndTrans.h"
32
#include "mndUser.h"
33
#include "mndVgroup.h"
34
#include "tname.h"
35

36
#define STB_VER_NUMBER   2
37
#define STB_RESERVE_SIZE 64
38

39
static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw);
40
static int32_t  mndStbActionInsert(SSdb *pSdb, SStbObj *pStb);
41
static int32_t  mndStbActionDelete(SSdb *pSdb, SStbObj *pStb);
42
static int32_t  mndStbActionUpdate(SSdb *pSdb, SStbObj *pOld, SStbObj *pNew);
43
static int32_t  mndProcessTtlTimer(SRpcMsg *pReq);
44
static int32_t  mndProcessTrimDbTimer(SRpcMsg *pReq);
45
static int32_t  mndProcessS3MigrateDbTimer(SRpcMsg *pReq);
46
static int32_t  mndProcessS3MigrateDbRsp(SRpcMsg *pReq);
47
static int32_t  mndProcessCreateStbReq(SRpcMsg *pReq);
48
static int32_t  mndProcessAlterStbReq(SRpcMsg *pReq);
49
static int32_t  mndProcessDropStbReq(SRpcMsg *pReq);
50
static int32_t  mndProcessDropTtltbRsp(SRpcMsg *pReq);
51
static int32_t  mndProcessTrimDbRsp(SRpcMsg *pReq);
52
static int32_t  mndProcessTableMetaReq(SRpcMsg *pReq);
53
static int32_t  mndRetrieveStb(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
54
static int32_t  mndRetrieveStbCol(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
55
static void     mndCancelGetNextStb(SMnode *pMnode, void *pIter);
56
static int32_t  mndProcessTableCfgReq(SRpcMsg *pReq);
57
static int32_t  mndAlterStbImp(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *pStb, bool needRsp,
58
                               void *alterOriData, int32_t alterOriDataLen);
59
static int32_t  mndAlterStbAndUpdateTagIdxImp(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *pStb, bool needRsp,
60
                                              void *alterOriData, int32_t alterOriDataLen, const SMAlterStbReq *pAlter);
61

62
static int32_t mndProcessCreateIndexReq(SRpcMsg *pReq);
63
static int32_t mndProcessDropIndexReq(SRpcMsg *pReq);
64

65
static int32_t mndProcessDropStbReqFromMNode(SRpcMsg *pReq);
66
static int32_t mndProcessDropTbWithTsma(SRpcMsg *pReq);
67
static int32_t mndProcessFetchTtlExpiredTbs(SRpcMsg *pReq);
68

69
/**
 * Register the super-table (STB) module with the mnode.
 *
 * Installs the request/response/timer message handlers, the SHOW retrieve and
 * free-iterator callbacks for the STB and COL system tables, and finally the
 * SDB table descriptor for SDB_STB rows.
 *
 * @param pMnode mnode instance the handlers are registered on.
 * @return whatever sdbSetTable() returns for the STB table descriptor.
 */
int32_t mndInitStb(SMnode *pMnode) {
  SSdbTable stbTable = {
      .sdbType = SDB_STB,
      .keyType = SDB_KEY_BINARY,
      .encodeFp = (SdbEncodeFp)mndStbActionEncode,
      .decodeFp = (SdbDecodeFp)mndStbActionDecode,
      .insertFp = (SdbInsertFp)mndStbActionInsert,
      .updateFp = (SdbUpdateFp)mndStbActionUpdate,
      .deleteFp = (SdbDeleteFp)mndStbActionDelete,
  };

  // Message handlers, in the same order as before.
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STB, mndProcessCreateStbReq);
  mndSetMsgHandle(pMnode, TDMT_MND_ALTER_STB, mndProcessAlterStbReq);
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_STB, mndProcessDropStbReq);
  mndSetMsgHandle(pMnode, TDMT_VND_CREATE_STB_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_DROP_TTL_TABLE_RSP, mndProcessDropTtltbRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_TRIM_RSP, mndProcessTrimDbRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_ALTER_STB_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_DROP_STB_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_TABLE_META, mndProcessTableMetaReq);
  mndSetMsgHandle(pMnode, TDMT_MND_TTL_TIMER, mndProcessTtlTimer);
  mndSetMsgHandle(pMnode, TDMT_MND_TRIM_DB_TIMER, mndProcessTrimDbTimer);
  mndSetMsgHandle(pMnode, TDMT_VND_S3MIGRATE_RSP, mndProcessS3MigrateDbRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_S3MIGRATE_DB_TIMER, mndProcessS3MigrateDbTimer);
  mndSetMsgHandle(pMnode, TDMT_MND_TABLE_CFG, mndProcessTableCfgReq);
  mndSetMsgHandle(pMnode, TDMT_MND_STB_DROP, mndProcessDropStbReqFromMNode);
  mndSetMsgHandle(pMnode, TDMT_MND_STB_DROP_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_TB_WITH_TSMA, mndProcessDropTbWithTsma);
  mndSetMsgHandle(pMnode, TDMT_VND_FETCH_TTL_EXPIRED_TBS_RSP, mndProcessFetchTtlExpiredTbs);
  mndSetMsgHandle(pMnode, TDMT_VND_DROP_TABLE_RSP, mndTransProcessRsp);

  // Not registered in this module (kept for reference):
  //  mndSetMsgHandle(pMnode, TDMT_MND_SYSTABLE_RETRIEVE, mndProcessRetrieveStbReq);
  //  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_INDEX, mndProcessCreateIndexReq);
  //  mndSetMsgHandle(pMnode, TDMT_MND_DROP_INDEX, mndProcessDropIndexReq);
  //  mndSetMsgHandle(pMnode, TDMT_VND_CREATE_INDEX_RSP, mndTransProcessRsp);
  //  mndSetMsgHandle(pMnode, TDMT_VND_DROP_INDEX_RSP, mndTransProcessRsp);

  // SHOW handlers: both system tables share the same iterator-cancel callback.
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STB, mndRetrieveStb);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STB, mndCancelGetNextStb);

  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_COL, mndRetrieveStbCol);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_COL, mndCancelGetNextStb);

  return sdbSetTable(pMnode->pSdb, stbTable);
}
114

115
/**
 * Tear-down hook for the STB module. Currently has nothing to release.
 *
 * @param pMnode mnode instance (unused).
 */
void mndCleanupStb(SMnode *pMnode) {
  (void)pMnode;
}
116

117
/**
 * Serialize a super-table object into a newly allocated SDB raw record.
 *
 * Field order written here must match the read order in mndStbActionDecode().
 *
 * @param pStb super-table object to serialize.
 * @return the encoded raw record, or NULL on failure (terrno is non-zero and
 *         any partially built record has been freed).
 */
SSdbRaw *mndStbActionEncode(SStbObj *pStb) {
  // NOTE(review): `code` and `lino` are not referenced directly below; they are
  // presumably consumed by the SDB_SET_* macros, so they are kept as-is.
  int32_t code = 0;
  int32_t lino = 0;
  // Pessimistic default: every `goto _OVER` before the final reset reports an error.
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  // The comment is written together with its trailing NUL (commentLen + 1 bytes),
  // so reserve that extra byte explicitly instead of relying on struct slack.
  int32_t commentSize = (pStb->commentLen > 0) ? (pStb->commentLen + 1) : 0;

  int32_t size = sizeof(SStbObj) + (pStb->numOfColumns + pStb->numOfTags) * sizeof(SSchema) + commentSize +
                 pStb->ast1Len + pStb->ast2Len + pStb->numOfColumns * sizeof(SColCmpr) + STB_RESERVE_SIZE +
                 taosArrayGetSize(pStb->pFuncs) * TSDB_FUNC_NAME_LEN;
  SSdbRaw *pRaw = sdbAllocRaw(SDB_STB, STB_VER_NUMBER, size);
  if (pRaw == NULL) goto _OVER;

  int32_t dataPos = 0;
  SDB_SET_BINARY(pRaw, dataPos, pStb->name, TSDB_TABLE_FNAME_LEN, _OVER)
  SDB_SET_BINARY(pRaw, dataPos, pStb->db, TSDB_DB_FNAME_LEN, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pStb->createdTime, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pStb->updateTime, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pStb->uid, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pStb->dbUid, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pStb->tagVer, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pStb->colVer, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pStb->smaVer, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pStb->nextColId, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pStb->maxdelay[0], _OVER)
  SDB_SET_INT64(pRaw, dataPos, pStb->maxdelay[1], _OVER)
  SDB_SET_INT64(pRaw, dataPos, pStb->watermark[0], _OVER)
  SDB_SET_INT64(pRaw, dataPos, pStb->watermark[1], _OVER)
  SDB_SET_INT32(pRaw, dataPos, pStb->ttl, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pStb->numOfColumns, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pStb->numOfTags, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pStb->numOfFuncs, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pStb->commentLen, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pStb->ast1Len, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pStb->ast2Len, _OVER)

  // Column schemas.
  for (int32_t i = 0; i < pStb->numOfColumns; ++i) {
    SSchema *pSchema = &pStb->pColumns[i];
    SDB_SET_INT8(pRaw, dataPos, pSchema->type, _OVER)
    SDB_SET_INT8(pRaw, dataPos, pSchema->flags, _OVER)
    SDB_SET_INT16(pRaw, dataPos, pSchema->colId, _OVER)
    SDB_SET_INT32(pRaw, dataPos, pSchema->bytes, _OVER)
    SDB_SET_BINARY(pRaw, dataPos, pSchema->name, TSDB_COL_NAME_LEN, _OVER)
  }

  // Tag schemas.
  for (int32_t i = 0; i < pStb->numOfTags; ++i) {
    SSchema *pSchema = &pStb->pTags[i];
    SDB_SET_INT8(pRaw, dataPos, pSchema->type, _OVER)
    SDB_SET_INT8(pRaw, dataPos, pSchema->flags, _OVER)
    SDB_SET_INT16(pRaw, dataPos, pSchema->colId, _OVER)
    SDB_SET_INT32(pRaw, dataPos, pSchema->bytes, _OVER)
    SDB_SET_BINARY(pRaw, dataPos, pSchema->name, TSDB_COL_NAME_LEN, _OVER)
  }

  // Function names, each stored as a fixed TSDB_FUNC_NAME_LEN slot.
  for (int32_t i = 0; i < pStb->numOfFuncs; ++i) {
    char *func = taosArrayGet(pStb->pFuncs, i);
    // numOfFuncs and the array can disagree; do not hand a NULL source to the encoder.
    if (func == NULL) goto _OVER;
    SDB_SET_BINARY(pRaw, dataPos, func, TSDB_FUNC_NAME_LEN, _OVER)
  }

  if (pStb->commentLen > 0) {
    SDB_SET_BINARY(pRaw, dataPos, pStb->comment, pStb->commentLen + 1, _OVER)
  }

  if (pStb->ast1Len > 0) {
    SDB_SET_BINARY(pRaw, dataPos, pStb->pAst1, pStb->ast1Len, _OVER)
  }

  if (pStb->ast2Len > 0) {
    SDB_SET_BINARY(pRaw, dataPos, pStb->pAst2, pStb->ast2Len, _OVER)
  }

  // Per-column compression settings; skipped entirely when the object carries none.
  if (pStb->pCmpr != NULL) {
    for (int i = 0; i < pStb->numOfColumns; i++) {
      SColCmpr *p = &pStb->pCmpr[i];
      SDB_SET_INT16(pRaw, dataPos, p->id, _OVER)
      SDB_SET_INT32(pRaw, dataPos, p->alg, _OVER)
    }
  }
  SDB_SET_RESERVE(pRaw, dataPos, STB_RESERVE_SIZE, _OVER)
  SDB_SET_DATALEN(pRaw, dataPos, _OVER)

  terrno = 0;

_OVER:
  if (terrno != 0) {
    mError("stb:%s, failed to encode to raw:%p since %s", pStb->name, pRaw, terrstr());
    sdbFreeRaw(pRaw);
    return NULL;
  }

  mTrace("stb:%s, encode to raw:%p, row:%p", pStb->name, pRaw, pStb);
  return pRaw;
}
208

209
/**
 * Rebuild a super-table row from an SDB raw record.
 *
 * Reads fields in the order mndStbActionEncode() writes them. Records older
 * than STB_VER_NUMBER carry no compression section, so default compression is
 * derived from each column's type instead.
 *
 * @param pRaw raw record to decode.
 * @return the decoded row, or NULL on failure (terrno is non-zero and every
 *         buffer allocated while decoding has been released).
 */
static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) {
  // NOTE(review): `code` and `lino` are not referenced directly below; they are
  // presumably consumed by the SDB_GET_* macros, so they are kept as-is.
  int32_t code = 0;
  int32_t lino = 0;
  // Pessimistic default: every `goto _OVER` before the final reset reports an error.
  terrno = TSDB_CODE_OUT_OF_MEMORY;
  SSdbRow *pRow = NULL;
  SStbObj *pStb = NULL;

  int8_t sver = 0;
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;

  if (sver > STB_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto _OVER;
  }

  pRow = sdbAllocRow(sizeof(SStbObj));
  if (pRow == NULL) goto _OVER;

  pStb = sdbGetRowObj(pRow);
  if (pStb == NULL) goto _OVER;

  int32_t dataPos = 0;
  SDB_GET_BINARY(pRaw, dataPos, pStb->name, TSDB_TABLE_FNAME_LEN, _OVER)
  SDB_GET_BINARY(pRaw, dataPos, pStb->db, TSDB_DB_FNAME_LEN, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pStb->createdTime, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pStb->updateTime, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pStb->uid, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pStb->dbUid, _OVER)
  SDB_GET_INT32(pRaw, dataPos, &pStb->tagVer, _OVER)
  SDB_GET_INT32(pRaw, dataPos, &pStb->colVer, _OVER)
  SDB_GET_INT32(pRaw, dataPos, &pStb->smaVer, _OVER)
  SDB_GET_INT32(pRaw, dataPos, &pStb->nextColId, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pStb->maxdelay[0], _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pStb->maxdelay[1], _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pStb->watermark[0], _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pStb->watermark[1], _OVER)
  SDB_GET_INT32(pRaw, dataPos, &pStb->ttl, _OVER)
  SDB_GET_INT32(pRaw, dataPos, &pStb->numOfColumns, _OVER)
  SDB_GET_INT32(pRaw, dataPos, &pStb->numOfTags, _OVER)
  SDB_GET_INT32(pRaw, dataPos, &pStb->numOfFuncs, _OVER)
  SDB_GET_INT32(pRaw, dataPos, &pStb->commentLen, _OVER)
  SDB_GET_INT32(pRaw, dataPos, &pStb->ast1Len, _OVER)
  SDB_GET_INT32(pRaw, dataPos, &pStb->ast2Len, _OVER)

  pStb->pColumns = taosMemoryCalloc(pStb->numOfColumns, sizeof(SSchema));
  pStb->pTags = taosMemoryCalloc(pStb->numOfTags, sizeof(SSchema));
  pStb->pFuncs = taosArrayInit(pStb->numOfFuncs, TSDB_FUNC_NAME_LEN);
  if (pStb->pColumns == NULL || pStb->pTags == NULL || pStb->pFuncs == NULL) {
    goto _OVER;
  }

  // Column schemas.
  for (int32_t i = 0; i < pStb->numOfColumns; ++i) {
    SSchema *pSchema = &pStb->pColumns[i];
    SDB_GET_INT8(pRaw, dataPos, &pSchema->type, _OVER)
    SDB_GET_INT8(pRaw, dataPos, &pSchema->flags, _OVER)
    SDB_GET_INT16(pRaw, dataPos, &pSchema->colId, _OVER)
    SDB_GET_INT32(pRaw, dataPos, &pSchema->bytes, _OVER)
    SDB_GET_BINARY(pRaw, dataPos, pSchema->name, TSDB_COL_NAME_LEN, _OVER)
  }

  // Tag schemas.
  for (int32_t i = 0; i < pStb->numOfTags; ++i) {
    SSchema *pSchema = &pStb->pTags[i];
    SDB_GET_INT8(pRaw, dataPos, &pSchema->type, _OVER)
    SDB_GET_INT8(pRaw, dataPos, &pSchema->flags, _OVER)
    SDB_GET_INT16(pRaw, dataPos, &pSchema->colId, _OVER)
    SDB_GET_INT32(pRaw, dataPos, &pSchema->bytes, _OVER)
    SDB_GET_BINARY(pRaw, dataPos, pSchema->name, TSDB_COL_NAME_LEN, _OVER)
  }

  // Function names, each stored as a fixed TSDB_FUNC_NAME_LEN slot.
  for (int32_t i = 0; i < pStb->numOfFuncs; ++i) {
    char funcName[TSDB_FUNC_NAME_LEN] = {0};
    SDB_GET_BINARY(pRaw, dataPos, funcName, TSDB_FUNC_NAME_LEN, _OVER)
    if (taosArrayPush(pStb->pFuncs, funcName) == NULL) goto _OVER;
  }

  if (pStb->commentLen > 0) {
    pStb->comment = taosMemoryCalloc(pStb->commentLen + 1, 1);
    if (pStb->comment == NULL) goto _OVER;
    SDB_GET_BINARY(pRaw, dataPos, pStb->comment, pStb->commentLen + 1, _OVER)
  }

  if (pStb->ast1Len > 0) {
    pStb->pAst1 = taosMemoryCalloc(pStb->ast1Len, 1);
    if (pStb->pAst1 == NULL) goto _OVER;
    SDB_GET_BINARY(pRaw, dataPos, pStb->pAst1, pStb->ast1Len, _OVER)
  }

  if (pStb->ast2Len > 0) {
    pStb->pAst2 = taosMemoryCalloc(pStb->ast2Len, 1);
    if (pStb->pAst2 == NULL) goto _OVER;
    SDB_GET_BINARY(pRaw, dataPos, pStb->pAst2, pStb->ast2Len, _OVER)
  }

  pStb->pCmpr = taosMemoryCalloc(pStb->numOfColumns, sizeof(SColCmpr));
  if (pStb->pCmpr == NULL) {
    // Both branches below index pCmpr, so a failed allocation must stop here.
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }
  if (sver < STB_VER_NUMBER) {
    // compatible with old data, setup default compress value
    for (int i = 0; i < pStb->numOfColumns; i++) {
      SSchema  *pSchema = &pStb->pColumns[i];
      SColCmpr *pCmpr = &pStb->pCmpr[i];
      pCmpr->id = pSchema->colId;
      pCmpr->alg = createDefaultColCmprByType(pSchema->type);
    }
  } else {
    for (int i = 0; i < pStb->numOfColumns; i++) {
      SColCmpr *pCmpr = &pStb->pCmpr[i];
      SDB_GET_INT16(pRaw, dataPos, &pCmpr->id, _OVER)
      SDB_GET_INT32(pRaw, dataPos, (int32_t *)&pCmpr->alg, _OVER)  // compatible
    }
  }

  SDB_GET_RESERVE(pRaw, dataPos, STB_RESERVE_SIZE, _OVER)

  terrno = 0;

_OVER:
  if (terrno != 0) {
    mError("stb:%s, failed to decode from raw:%p since %s", pStb == NULL ? "null" : pStb->name, pRaw, terrstr());
    if (pStb != NULL) {
      // Release everything this function may have allocated, not just a subset.
      if (pStb->pFuncs != NULL) {
        taosArrayDestroy(pStb->pFuncs);
        pStb->pFuncs = NULL;
      }
      taosMemoryFreeClear(pStb->pColumns);
      taosMemoryFreeClear(pStb->pTags);
      taosMemoryFreeClear(pStb->comment);
      taosMemoryFreeClear(pStb->pAst1);
      taosMemoryFreeClear(pStb->pAst2);
      taosMemoryFreeClear(pStb->pCmpr);
    }
    taosMemoryFreeClear(pRow);
    return NULL;
  }

  mTrace("stb:%s, decode from raw:%p, row:%p", pStb->name, pRaw, pStb);
  return pRow;
}
340

341
/**
 * Release every heap buffer owned by a super-table object.
 *
 * The object itself is not freed. Pointer fields released through
 * taosMemoryFreeClear() are the ones listed below.
 *
 * @param pStb super-table object whose members are released.
 */
void mndFreeStb(SStbObj *pStb) {
  // Function-name array.
  taosArrayDestroy(pStb->pFuncs);

  // Schema arrays.
  taosMemoryFreeClear(pStb->pColumns);
  taosMemoryFreeClear(pStb->pTags);

  // Optional comment and the two AST blobs.
  taosMemoryFreeClear(pStb->comment);
  taosMemoryFreeClear(pStb->pAst1);
  taosMemoryFreeClear(pStb->pAst2);

  // Per-column compression settings.
  taosMemoryFreeClear(pStb->pCmpr);
}
350

351
/**
 * SDB insert callback for super-table rows: only traces the event.
 *
 * @param pSdb SDB instance (unused).
 * @param pStb row being inserted.
 * @return always 0.
 */
static int32_t mndStbActionInsert(SSdb *pSdb, SStbObj *pStb) {
  (void)pSdb;
  mTrace("stb:%s, perform insert action, row:%p", pStb->name, pStb);
  return 0;
}
355

356
/**
 * SDB delete callback for super-table rows: traces the event and releases the
 * buffers owned by the row through mndFreeStb().
 *
 * @param pSdb SDB instance (unused).
 * @param pStb row being deleted.
 * @return always 0.
 */
static int32_t mndStbActionDelete(SSdb *pSdb, SStbObj *pStb) {
  (void)pSdb;
  mTrace("stb:%s, perform delete action, row:%p", pStb->name, pStb);
  mndFreeStb(pStb);
  return 0;
}
361

362
/**
 * SDB update callback: copy the mutable state of pNew into the live row pOld
 * while holding pOld's write latch.
 *
 * Buffers are grown only when the new contents do not fit. All replacement
 * buffers are allocated before pOld is touched, so an allocation failure
 * leaves pOld exactly as it was and releases the latch exactly once.
 *
 * @param pSdb SDB instance (unused).
 * @param pOld live row that receives the new state.
 * @param pNew row carrying the new state; not modified.
 * @return 0 on success, TSDB_CODE_OUT_OF_MEMORY (also stored in terrno) if a
 *         replacement buffer could not be allocated.
 */
static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOld, SStbObj *pNew) {
  mTrace("stb:%s, perform update action, old row:%p new row:%p", pOld->name, pOld, pNew);

  void *pColumns = NULL;
  void *pCmpr = NULL;
  void *pTags = NULL;
  void *comment = NULL;
  void *pAst1 = NULL;
  void *pAst2 = NULL;

  taosWLockLatch(&pOld->lock);

  const bool growColumns = pOld->numOfColumns < pNew->numOfColumns;
  const bool growTags = pOld->numOfTags < pNew->numOfTags;
  const bool growComment = pOld->commentLen < pNew->commentLen && pNew->commentLen > 0;
  const bool growAst1 = pOld->ast1Len < pNew->ast1Len;
  const bool growAst2 = pOld->ast2Len < pNew->ast2Len;

  // Phase 1: allocate every replacement buffer up front.
  if (growColumns) {
    pColumns = taosMemoryMalloc(pNew->numOfColumns * sizeof(SSchema));
    pCmpr = taosMemoryCalloc(pNew->numOfColumns, sizeof(SColCmpr));
    if (pColumns == NULL || pCmpr == NULL) goto _OOM;
  }
  if (growTags) {
    pTags = taosMemoryMalloc(pNew->numOfTags * sizeof(SSchema));
    if (pTags == NULL) goto _OOM;
  }
  if (growComment) {
    comment = taosMemoryMalloc(pNew->commentLen + 1);
    if (comment == NULL) goto _OOM;
  }
  if (growAst1) {
    pAst1 = taosMemoryMalloc(pNew->ast1Len + 1);
    if (pAst1 == NULL) goto _OOM;
  }
  if (growAst2) {
    pAst2 = taosMemoryMalloc(pNew->ast2Len + 1);
    if (pAst2 == NULL) goto _OOM;
  }

  // Phase 2: nothing can fail from here on; swap in the larger buffers.
  if (growColumns) {
    taosMemoryFree(pOld->pColumns);
    pOld->pColumns = pColumns;
    taosMemoryFree(pOld->pCmpr);
    pOld->pCmpr = pCmpr;
  }
  if (growTags) {
    taosMemoryFree(pOld->pTags);
    pOld->pTags = pTags;
  }
  if (growComment) {
    taosMemoryFree(pOld->comment);
    pOld->comment = comment;
  }
  if (growAst1) {
    taosMemoryFree(pOld->pAst1);
    pOld->pAst1 = pAst1;
  }
  if (growAst2) {
    taosMemoryFree(pOld->pAst2);
    pOld->pAst2 = pAst2;
  }

  // Phase 3: copy scalar state and buffer contents.
  pOld->commentLen = pNew->commentLen;
  pOld->updateTime = pNew->updateTime;
  pOld->tagVer = pNew->tagVer;
  pOld->colVer = pNew->colVer;
  pOld->smaVer = pNew->smaVer;
  pOld->nextColId = pNew->nextColId;
  pOld->ttl = pNew->ttl;
  if (pNew->numOfColumns > 0) {
    pOld->numOfColumns = pNew->numOfColumns;
    memcpy(pOld->pColumns, pNew->pColumns, pOld->numOfColumns * sizeof(SSchema));
  }
  if (pNew->numOfTags > 0) {
    pOld->numOfTags = pNew->numOfTags;
    memcpy(pOld->pTags, pNew->pTags, pOld->numOfTags * sizeof(SSchema));
  }
  if (pNew->commentLen > 0) {
    memcpy(pOld->comment, pNew->comment, pNew->commentLen + 1);
    pOld->commentLen = pNew->commentLen;
  }
  if (pNew->ast1Len != 0) {
    memcpy(pOld->pAst1, pNew->pAst1, pNew->ast1Len);
    pOld->ast1Len = pNew->ast1Len;
  }
  if (pNew->ast2Len != 0) {
    memcpy(pOld->pAst2, pNew->pAst2, pNew->ast2Len);
    pOld->ast2Len = pNew->ast2Len;
  }
  // Skip the zero-length copy so memcpy is never called with possibly-NULL pointers.
  if (pNew->numOfColumns > 0) {
    memcpy(pOld->pCmpr, pNew->pCmpr, pNew->numOfColumns * sizeof(SColCmpr));
  }

  taosWUnLockLatch(&pOld->lock);
  return 0;

_OOM:
  // pOld has not been modified yet; drop whatever was allocated and bail out.
  taosMemoryFree(pColumns);
  taosMemoryFree(pCmpr);
  taosMemoryFree(pTags);
  taosMemoryFree(comment);
  taosMemoryFree(pAst1);
  taosMemoryFree(pAst2);
  terrno = TSDB_CODE_OUT_OF_MEMORY;
  mTrace("stb:%s, failed to perform update action since %s", pOld->name, terrstr());
  taosWUnLockLatch(&pOld->lock);
  return TSDB_CODE_OUT_OF_MEMORY;
}
465

466
/**
 * Look up a super table by its full name and take a reference on it.
 *
 * When the SDB reports TSDB_CODE_SDB_OBJ_NOT_THERE, terrno is rewritten to the
 * module-specific TSDB_CODE_MND_STB_NOT_EXIST; other error codes are left as set.
 *
 * @param pMnode  mnode instance that owns the SDB.
 * @param stbName lookup key passed to sdbAcquire().
 * @return the acquired object, or NULL if the lookup failed.
 */
SStbObj *mndAcquireStb(SMnode *pMnode, char *stbName) {
  SStbObj *pFound = sdbAcquire(pMnode->pSdb, SDB_STB, stbName);
  if (pFound != NULL) {
    return pFound;
  }

  if (terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
    terrno = TSDB_CODE_MND_STB_NOT_EXIST;
  }
  return NULL;
}
474

475
/**
 * Drop a reference previously obtained with mndAcquireStb().
 *
 * @param pMnode mnode instance that owns the SDB.
 * @param pStb   object handed back to sdbRelease().
 */
void mndReleaseStb(SMnode *pMnode, SStbObj *pStb) {
  sdbRelease(pMnode->pSdb, pStb);
}
479

480
/**
 * Acquire the database that a fully-qualified super-table name belongs to.
 *
 * The name is parsed as account + db + table, the full db name is derived from
 * it, and that db is acquired. Parse failures are stored in terrno.
 *
 * @param pMnode  mnode instance.
 * @param stbName fully-qualified super-table name.
 * @return the result of mndAcquireDb(), or NULL if the name cannot be parsed.
 */
SDbObj *mndAcquireDbByStb(SMnode *pMnode, const char *stbName) {
  SName parsed = {0};
  char  dbFName[TSDB_TABLE_FNAME_LEN] = {0};

  terrno = tNameFromString(&parsed, stbName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
  if (terrno != 0) {
    return NULL;
  }

  terrno = tNameGetFullDbName(&parsed, dbFName);
  if (terrno != 0) {
    return NULL;
  }

  return mndAcquireDb(pMnode, dbFName);
}
489

490
/**
 * Three-way comparator between a column-id key and a schema entry.
 *
 * @param colId   pointer to a col_id_t key.
 * @param pSchema pointer to an SSchema whose colId is compared against the key.
 * @return -1 if the key is smaller, 1 if it is larger, 0 if equal.
 */
static FORCE_INLINE int32_t schemaExColIdCompare(const void *colId, const void *pSchema) {
  const col_id_t key = *(col_id_t *)colId;
  const SSchema *pEntry = (SSchema *)pSchema;

  // (a > b) - (a < b) yields exactly 1, -1 or 0.
  return (key > pEntry->colId) - (key < pEntry->colId);
}
498

499
/**
 * Build the serialized create-super-table request sent to one vgroup.
 *
 * The returned buffer is an SMsgHead followed by the encoded SVCreateStbReq.
 * Temporary buffers created while building the request are released before
 * returning, on both the success and the error path.
 *
 * @param pMnode          mnode instance (not used directly here).
 * @param pVgroup         target vgroup; its vgId goes into the message head.
 * @param pStb            super table to describe.
 * @param pContLen        out: total length of the returned buffer (set only on success).
 * @param alterOriData    stored in the request as-is.
 * @param alterOriDataLen stored in the request as-is.
 * @return the message buffer, which the caller must free, or NULL on failure.
 */
void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, int32_t *pContLen, void *alterOriData,
                            int32_t alterOriDataLen) {
  SEncoder       encoder = {0};
  int32_t        contLen;
  int32_t        ret = 0;
  SName          name = {0};
  SVCreateStbReq req = {0};
  SMsgHead      *pHead = NULL;
  char           dbFName[TSDB_DB_FNAME_LEN] = {0};

  if ((terrno = tNameFromString(&name, pStb->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE)) != 0) {
    goto _err;
  }
  // dbFName is not used afterwards; the call only checks that the db name can be derived.
  if ((terrno = tNameGetFullDbName(&name, dbFName)) != 0) {
    goto _err;
  }

  req.name = (char *)tNameGetTableName(&name);
  req.suid = pStb->uid;
  req.rollup = pStb->ast1Len > 0 ? 1 : 0;
  req.alterOriData = alterOriData;
  req.alterOriDataLen = alterOriDataLen;
  req.source = pStb->source;
  // todo
  req.schemaRow.nCols = pStb->numOfColumns;
  req.schemaRow.version = pStb->colVer;
  req.schemaRow.pSchema = pStb->pColumns;
  req.schemaTag.nCols = pStb->numOfTags;
  req.schemaTag.version = pStb->tagVer;
  req.schemaTag.pSchema = pStb->pTags;

  req.colCmpred = 1;
  SColCmprWrapper *pCmpr = &req.colCmpr;
  pCmpr->version = pStb->colVer;
  pCmpr->nCols = pStb->numOfColumns;

  req.colCmpr.pColCmpr = taosMemoryCalloc(pCmpr->nCols, sizeof(SColCmpr));
  if (pCmpr->nCols > 0 && req.colCmpr.pColCmpr == NULL) {
    // The loop below writes through pColCmpr, so a failed allocation must stop here.
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }
  for (int32_t i = 0; i < pStb->numOfColumns; i++) {
    SColCmpr *p = &pCmpr->pColCmpr[i];
    p->alg = pStb->pCmpr[i].alg;
    p->id = pStb->pCmpr[i].id;
  }

  if (req.rollup) {
    req.rsmaParam.maxdelay[0] = pStb->maxdelay[0];
    req.rsmaParam.maxdelay[1] = pStb->maxdelay[1];
    req.rsmaParam.watermark[0] = pStb->watermark[0];
    req.rsmaParam.watermark[1] = pStb->watermark[1];
    if (pStb->ast1Len > 0) {
      if (mndConvertRsmaTask(&req.rsmaParam.qmsg[0], &req.rsmaParam.qmsgLen[0], pStb->pAst1, pStb->uid,
                             STREAM_TRIGGER_WINDOW_CLOSE, req.rsmaParam.watermark[0],
                             req.rsmaParam.deleteMark[0]) < 0) {
        goto _err;
      }
    }
    if (pStb->ast2Len > 0) {
      if (mndConvertRsmaTask(&req.rsmaParam.qmsg[1], &req.rsmaParam.qmsgLen[1], pStb->pAst2, pStb->uid,
                             STREAM_TRIGGER_WINDOW_CLOSE, req.rsmaParam.watermark[1],
                             req.rsmaParam.deleteMark[1]) < 0) {
        goto _err;
      }
    }
  }

  // get length
  tEncodeSize(tEncodeSVCreateStbReq, &req, contLen, ret);
  if (ret < 0) {
    goto _err;
  }

  contLen += sizeof(SMsgHead);

  pHead = taosMemoryCalloc(1, contLen);
  if (pHead == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  pHead->contLen = htonl(contLen);
  pHead->vgId = htonl(pVgroup->vgId);

  // The request payload is encoded right after the message head.
  void *pBuf = POINTER_SHIFT(pHead, sizeof(SMsgHead));
  tEncoderInit(&encoder, pBuf, contLen - sizeof(SMsgHead));
  if (tEncodeSVCreateStbReq(&encoder, &req) < 0) {
    taosMemoryFreeClear(pHead);
    tEncoderClear(&encoder);
    goto _err;
  }
  tEncoderClear(&encoder);

  *pContLen = contLen;
  taosMemoryFreeClear(req.rsmaParam.qmsg[0]);
  taosMemoryFreeClear(req.rsmaParam.qmsg[1]);
  taosMemoryFreeClear(req.colCmpr.pColCmpr);
  return pHead;

_err:
  taosMemoryFreeClear(req.rsmaParam.qmsg[0]);
  taosMemoryFreeClear(req.rsmaParam.qmsg[1]);
  taosMemoryFreeClear(req.colCmpr.pColCmpr);
  return NULL;
}
598

599
/**
 * Build the serialized drop-super-table request sent to one vgroup.
 *
 * The returned buffer is an SMsgHead followed by the encoded SVDropStbReq.
 *
 * @param pMnode   mnode instance (not used directly here).
 * @param pVgroup  target vgroup; its vgId goes into the message head.
 * @param pStb     super table to drop.
 * @param pContLen out: total length of the returned buffer (set only on success).
 * @return the message buffer, which the caller must free, or NULL on failure
 *         (no buffer is leaked in that case).
 */
static void *mndBuildVDropStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, int32_t *pContLen) {
  SName        name = {0};
  SVDropStbReq req = {0};
  int32_t      contLen = 0;
  int32_t      ret = 0;
  SMsgHead    *pHead = NULL;
  SEncoder     encoder = {0};

  if ((terrno = tNameFromString(&name, pStb->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE)) != 0) {
    return NULL;
  }

  req.name = (char *)tNameGetTableName(&name);
  req.suid = pStb->uid;

  tEncodeSize(tEncodeSVDropStbReq, &req, contLen, ret);
  if (ret < 0) return NULL;

  contLen += sizeof(SMsgHead);
  pHead = taosMemoryMalloc(contLen);
  if (pHead == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pHead->contLen = htonl(contLen);
  pHead->vgId = htonl(pVgroup->vgId);

  // The request payload is encoded right after the message head.
  void *pBuf = POINTER_SHIFT(pHead, sizeof(SMsgHead));

  tEncoderInit(&encoder, pBuf, contLen - sizeof(SMsgHead));
  terrno = tEncodeSVDropStbReq(&encoder, &req);
  tEncoderClear(&encoder);
  if (terrno != 0) {
    // The buffer is ours until it is returned; release it on encode failure.
    taosMemoryFreeClear(pHead);
    return NULL;
  }

  *pContLen = contLen;
  return pHead;
}
637

638
int32_t mndCheckCreateStbReq(SMCreateStbReq *pCreate) {
10,613✔
639
  int32_t code = 0;
10,613✔
640
  if (pCreate->igExists < 0 || pCreate->igExists > 1) {
10,613!
641
    code = TSDB_CODE_MND_INVALID_STB_OPTION;
×
642
    TAOS_RETURN(code);
×
643
  }
644

645
  if (pCreate->numOfColumns < TSDB_MIN_COLUMNS || pCreate->numOfTags + pCreate->numOfColumns > TSDB_MAX_COLUMNS) {
10,613!
646
    code = TSDB_CODE_PAR_INVALID_COLUMNS_NUM;
×
647
    TAOS_RETURN(code);
×
648
  }
649

650
  if (pCreate->numOfTags <= 0 || pCreate->numOfTags > TSDB_MAX_TAGS) {
10,613!
651
    code = TSDB_CODE_MND_INVALID_STB_OPTION;
×
652
    TAOS_RETURN(code);
×
653
  }
654

655
  SField *pField = taosArrayGet(pCreate->pColumns, 0);
10,613✔
656
  if (pField->type != TSDB_DATA_TYPE_TIMESTAMP) {
10,613!
657
    code = TSDB_CODE_PAR_INVALID_FIRST_COLUMN;
×
658
    TAOS_RETURN(code);
×
659
  }
660

661
  for (int32_t i = 0; i < pCreate->numOfColumns; ++i) {
357,299✔
662
    SFieldWithOptions *pField1 = taosArrayGet(pCreate->pColumns, i);
346,686✔
663
    if (pField1->type >= TSDB_DATA_TYPE_MAX) {
346,686!
664
      code = TSDB_CODE_MND_INVALID_STB_OPTION;
×
665
      TAOS_RETURN(code);
×
666
    }
667
    if (pField1->bytes <= 0) {
346,686!
668
      code = TSDB_CODE_MND_INVALID_STB_OPTION;
×
669
      TAOS_RETURN(code);
×
670
    }
671
    if (pField1->name[0] == 0) {
346,686!
672
      code = TSDB_CODE_MND_INVALID_STB_OPTION;
×
673
      TAOS_RETURN(code);
×
674
    }
675
  }
676

677
  for (int32_t i = 0; i < pCreate->numOfTags; ++i) {
73,830✔
678
    SField *pField1 = taosArrayGet(pCreate->pTags, i);
63,217✔
679
    if (pField1->type >= TSDB_DATA_TYPE_MAX) {
63,217!
680
      code = TSDB_CODE_MND_INVALID_STB_OPTION;
×
681
      TAOS_RETURN(code);
×
682
    }
683
    if (pField1->bytes <= 0) {
63,217!
684
      code = TSDB_CODE_MND_INVALID_STB_OPTION;
×
685
      TAOS_RETURN(code);
×
686
    }
687
    if (pField1->name[0] == 0) {
63,217!
688
      code = TSDB_CODE_MND_INVALID_STB_OPTION;
×
689
      TAOS_RETURN(code);
×
690
    }
691
  }
692

693
  TAOS_RETURN(code);
10,613✔
694
}
695

696
/**
 * Append the encoded super table to a transaction's prepare log and mark the
 * raw record as SDB_STATUS_CREATING.
 *
 * @param pMnode mnode instance (unused here).
 * @param pTrans transaction receiving the prepare log.
 * @param pDb    owning database (unused here).
 * @param pStb   super table to encode.
 * @return 0 on success; terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when terrno
 *         is 0) if encoding fails; otherwise the failing call's error code.
 */
static int32_t mndSetCreateStbPrepareLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
  int32_t  code = 0;
  SSdbRaw *pPrepareRaw = mndStbActionEncode(pStb);

  if (pPrepareRaw == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  code = mndTransAppendPrepareLog(pTrans, pPrepareRaw);
  if (code != 0) {
    // The append failed, so the raw record is still ours to free.
    sdbFreeRaw(pPrepareRaw);
    TAOS_RETURN(code);
  }

  TAOS_CHECK_RETURN(sdbSetRawStatus(pPrepareRaw, SDB_STATUS_CREATING));

  TAOS_RETURN(code);
}
712

713
// Encodes pStb into an sdb raw record, appends it to pTrans as a commit log and marks the raw as
// SDB_STATUS_READY.
// Returns 0 on success. If encoding yields NULL the result is terrno when it is non-zero, otherwise
// TSDB_CODE_MND_RETURN_VALUE_NULL. If the append fails the raw is freed here and the append error is returned.
// pMnode and pDb are not used by this function.
static int32_t mndSetCreateStbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
  SSdbRaw *pRaw = mndStbActionEncode(pStb);
  if (pRaw == NULL) {
    int32_t encodeCode = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(encodeCode);
  }

  int32_t code = mndTransAppendCommitlog(pTrans, pRaw);
  if (code != 0) {
    sdbFreeRaw(pRaw);
    TAOS_RETURN(code);
  }

  TAOS_CHECK_RETURN(sdbSetRawStatus(pRaw, SDB_STATUS_READY));
  TAOS_RETURN(code);
}
729

730
// Walks every SDB_VGROUP row and, for each vgroup that mndVgroupInDb() accepts for pDb->uid, appends a
// TDMT_VND_CREATE_STB redo action carrying the request built by mndBuildVCreateStbReq().
// Returns 0 when all actions were appended. On the first failure the fetch is cancelled, the current vgroup
// is released and the error is returned (terrno or TSDB_CODE_MND_RETURN_VALUE_NULL when the request could
// not be built, otherwise the append error; in the latter case the request buffer is freed here).
static int32_t mndSetCreateStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
  SSdb   *pSdb = pMnode->pSdb;
  SVgObj *pVg = NULL;
  void   *pCursor = NULL;
  int32_t code = 0;

  for (;;) {
    pCursor = sdbFetch(pSdb, SDB_VGROUP, pCursor, (void **)&pVg);
    if (pCursor == NULL) {
      break;
    }

    if (mndVgroupInDb(pVg, pDb->uid)) {
      int32_t reqLen = 0;
      void   *pBody = mndBuildVCreateStbReq(pMnode, pVg, pStb, &reqLen, NULL, 0);
      if (pBody == NULL) {
        sdbCancelFetch(pSdb, pCursor);
        sdbRelease(pSdb, pVg);
        code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
        TAOS_RETURN(code);
      }

      STransAction action = {.mTraceId = pTrans->mTraceId,
                             .epSet = mndGetVgroupEpset(pMnode, pVg),
                             .pCont = pBody,
                             .contLen = reqLen,
                             .msgType = TDMT_VND_CREATE_STB,
                             .acceptableCode = TSDB_CODE_TDB_STB_ALREADY_EXIST,
                             .retryCode = TSDB_CODE_TDB_STB_NOT_EXIST};

      code = mndTransAppendRedoAction(pTrans, &action);
      if (code != 0) {
        taosMemoryFree(pBody);
        sdbCancelFetch(pSdb, pCursor);
        sdbRelease(pSdb, pVg);
        TAOS_RETURN(code);
      }
    }

    // Reached both for skipped vgroups and for vgroups whose action was appended.
    sdbRelease(pSdb, pVg);
  }

  TAOS_RETURN(code);
}
773

774
// Appends a single TDMT_VND_CREATE_STB redo action for pStb, targeted at the given pVgroup, to pTrans.
// Returns 0 on success. When the request cannot be built the result is terrno if non-zero, otherwise
// TSDB_CODE_MND_RETURN_VALUE_NULL. When the append fails the request buffer is freed here and the append
// error is returned.
int32_t mndSetForceDropCreateStbRedoActions(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, SStbObj *pStb) {
  int32_t code = 0;
  int32_t contLen = 0;  // initialized so it never carries an indeterminate value

  void *pReq = mndBuildVCreateStbReq(pMnode, pVgroup, pStb, &contLen, NULL, 0);
  if (pReq == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  STransAction action = {0};
  action.mTraceId = pTrans->mTraceId;
  action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
  action.pCont = pReq;
  action.contLen = contLen;
  action.msgType = TDMT_VND_CREATE_STB;
  action.acceptableCode = TSDB_CODE_TDB_STB_ALREADY_EXIST;
  action.retryCode = TSDB_CODE_TDB_STB_NOT_EXIST;
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
    taosMemoryFree(pReq);
    TAOS_RETURN(code);
  }

  TAOS_RETURN(code);
}
801

802
// Walks every SDB_VGROUP row and, for each vgroup that mndVgroupInDb() accepts for pDb->uid, appends a
// TDMT_VND_DROP_STB undo action (built by mndBuildVDropStbReq()) to pTrans.
// Returns 0 when all actions were appended. If a request cannot be built, TSDB_CODE_OUT_OF_MEMORY is
// returned; if the append fails, the request buffer is freed and the append error is returned. In both
// cases the fetch is cancelled and the current vgroup is released first.
static int32_t mndSetCreateStbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
  SSdb   *pSdb = pMnode->pSdb;
  SVgObj *pVg = NULL;
  void   *pCursor = NULL;
  int32_t code = 0;

  for (;;) {
    pCursor = sdbFetch(pSdb, SDB_VGROUP, pCursor, (void **)&pVg);
    if (pCursor == NULL) {
      break;
    }

    if (mndVgroupInDb(pVg, pDb->uid)) {
      int32_t reqLen = 0;
      void   *pBody = mndBuildVDropStbReq(pMnode, pVg, pStb, &reqLen);
      if (pBody == NULL) {
        sdbCancelFetch(pSdb, pCursor);
        sdbRelease(pSdb, pVg);
        code = TSDB_CODE_OUT_OF_MEMORY;
        TAOS_RETURN(code);
      }

      STransAction action = {.epSet = mndGetVgroupEpset(pMnode, pVg),
                             .pCont = pBody,
                             .contLen = reqLen,
                             .msgType = TDMT_VND_DROP_STB,
                             .acceptableCode = TSDB_CODE_TDB_STB_NOT_EXIST};

      code = mndTransAppendUndoAction(pTrans, &action);
      if (code != 0) {
        taosMemoryFree(pBody);
        sdbCancelFetch(pSdb, pCursor);
        sdbRelease(pSdb, pVg);
        TAOS_RETURN(code);
      }
    }

    // Reached both for skipped vgroups and for vgroups whose action was appended.
    sdbRelease(pSdb, pVg);
  }

  TAOS_RETURN(code);
}
842

843
static SSchema *mndFindStbColumns(const SStbObj *pStb, const char *colName) {
×
844
  for (int32_t col = 0; col < pStb->numOfColumns; ++col) {
×
845
    SSchema *pSchema = &pStb->pColumns[col];
×
846
    if (strncasecmp(pSchema->name, colName, TSDB_COL_NAME_LEN) == 0) {
×
847
      return pSchema;
×
848
    }
849
  }
850
  return NULL;
×
851
}
852

853
// Fills pDst from a create-stable request.
//  - Scalar options (delays, watermarks, ttl, counts, source) are copied from pCreate.
//  - uid is pCreate->suid for TD_REQ_FROM_TAOX_OLD / TD_REQ_FROM_TAOX requests, otherwise it is generated
//    from the table name with mndGenerateUid().
//  - pFuncs is moved out of pCreate (pCreate->pFuncs is set to NULL afterwards).
//  - comment, pAst1, pAst2, pColumns, pTags and pCmpr are freshly allocated copies.
//  - Column ids are assigned sequentially from 1, columns first and then tags; the first tag is flagged
//    with SSCHMEA_SET_IDX_ON.
// Returns 0 on success, terrno when an allocation fails, or TSDB_CODE_OUT_OF_RANGE when the column id space
// would be exhausted. On failure pDst may hold partially allocated members; this function does not free
// them, so the caller has to release pDst.
// pMnode is not used by this function.
int32_t mndBuildStbFromReq(SMnode *pMnode, SStbObj *pDst, SMCreateStbReq *pCreate, SDbObj *pDb) {
  int32_t code = 0;
  memcpy(pDst->name, pCreate->name, TSDB_TABLE_FNAME_LEN);
  memcpy(pDst->db, pDb->name, TSDB_DB_FNAME_LEN);
  pDst->createdTime = taosGetTimestampMs();
  pDst->updateTime = pDst->createdTime;
  pDst->uid = (pCreate->source == TD_REQ_FROM_TAOX_OLD || pCreate->source == TD_REQ_FROM_TAOX)
                  ? pCreate->suid
                  : mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN);
  pDst->dbUid = pDb->uid;
  pDst->tagVer = 1;
  pDst->colVer = 1;
  pDst->smaVer = 1;
  pDst->nextColId = 1;
  pDst->maxdelay[0] = pCreate->delay1;
  pDst->maxdelay[1] = pCreate->delay2;
  pDst->watermark[0] = pCreate->watermark1;
  pDst->watermark[1] = pCreate->watermark2;
  pDst->ttl = pCreate->ttl;
  pDst->numOfColumns = pCreate->numOfColumns;
  pDst->numOfTags = pCreate->numOfTags;
  pDst->numOfFuncs = pCreate->numOfFuncs;
  pDst->commentLen = pCreate->commentLen;
  pDst->pFuncs = pCreate->pFuncs;
  pDst->source = pCreate->source;
  pCreate->pFuncs = NULL;  // the array now belongs to pDst

  if (pDst->commentLen > 0) {
    pDst->comment = taosMemoryCalloc(pDst->commentLen + 1, 1);
    if (pDst->comment == NULL) {
      code = terrno;
      TAOS_RETURN(code);
    }
    memcpy(pDst->comment, pCreate->pComment, pDst->commentLen + 1);
  }

  pDst->ast1Len = pCreate->ast1Len;
  if (pDst->ast1Len > 0) {
    pDst->pAst1 = taosMemoryCalloc(pDst->ast1Len, 1);
    if (pDst->pAst1 == NULL) {
      code = terrno;
      TAOS_RETURN(code);
    }
    memcpy(pDst->pAst1, pCreate->pAst1, pDst->ast1Len);
  }

  pDst->ast2Len = pCreate->ast2Len;
  if (pDst->ast2Len > 0) {
    pDst->pAst2 = taosMemoryCalloc(pDst->ast2Len, 1);
    if (pDst->pAst2 == NULL) {
      code = terrno;
      TAOS_RETURN(code);
    }
    memcpy(pDst->pAst2, pCreate->pAst2, pDst->ast2Len);
  }

  pDst->pColumns = taosMemoryCalloc(1, pDst->numOfColumns * sizeof(SSchema));
  pDst->pTags = taosMemoryCalloc(1, pDst->numOfTags * sizeof(SSchema));
  if (pDst->pColumns == NULL || pDst->pTags == NULL) {
    code = terrno;
    TAOS_RETURN(code);
  }

  // Make sure the ids handed out below stay under 0x7fff.
  if (pDst->nextColId < 0 || pDst->nextColId >= 0x7fff - pDst->numOfColumns - pDst->numOfTags) {
    code = TSDB_CODE_OUT_OF_RANGE;
    TAOS_RETURN(code);
  }

  for (int32_t i = 0; i < pDst->numOfColumns; ++i) {
    SFieldWithOptions *pField = taosArrayGet(pCreate->pColumns, i);
    SSchema           *pSchema = &pDst->pColumns[i];
    pSchema->type = pField->type;
    pSchema->bytes = pField->bytes;
    pSchema->flags = pField->flags;
    memcpy(pSchema->name, pField->name, TSDB_COL_NAME_LEN);
    pSchema->colId = pDst->nextColId;
    pDst->nextColId++;
  }

  for (int32_t i = 0; i < pDst->numOfTags; ++i) {
    SField  *pField = taosArrayGet(pCreate->pTags, i);
    SSchema *pSchema = &pDst->pTags[i];
    pSchema->type = pField->type;
    pSchema->bytes = pField->bytes;
    if (i == 0) {
      SSCHMEA_SET_IDX_ON(pSchema);
    }
    memcpy(pSchema->name, pField->name, TSDB_COL_NAME_LEN);
    pSchema->colId = pDst->nextColId;
    pDst->nextColId++;
  }

  // set col compress
  // The array is indexed as SColCmpr below, so it is sized with that type, and the allocation is checked
  // before use instead of being dereferenced blindly.
  pDst->pCmpr = taosMemoryCalloc(1, pDst->numOfColumns * sizeof(SColCmpr));
  if (pDst->pCmpr == NULL) {
    code = terrno;
    TAOS_RETURN(code);
  }
  for (int32_t i = 0; i < pDst->numOfColumns; i++) {
    SFieldWithOptions *pField = taosArrayGet(pCreate->pColumns, i);
    SSchema           *pSchema = &pDst->pColumns[i];

    SColCmpr *pColCmpr = &pDst->pCmpr[i];
    pColCmpr->id = pSchema->colId;
    pColCmpr->alg = pField->compress;
  }
  TAOS_RETURN(code);
}
956
static int32_t mndGenIdxNameForFirstTag(char *fullname, char *dbname, char *stbname, char *tagname) {
8,940✔
957
  SName name = {0};
8,940✔
958
  if ((terrno = tNameFromString(&name, stbname, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE)) != 0) {
8,940!
959
    return -1;
×
960
  }
961
  return snprintf(fullname, TSDB_INDEX_FNAME_LEN, "%s.%s_%s", dbname, tagname, tNameGetTableName(&name));
8,940✔
962
}
963

964
// Creates a "create-stb" transaction for the request in pCreate:
//  1. builds the in-memory stable object with mndBuildStbFromReq(),
//  2. derives the index name for the first tag and fails with TSDB_CODE_MND_TAG_INDEX_ALREADY_EXIST if an
//     index of that name can already be acquired,
//  3. appends the index commit log and the stable logs/actions, then prepares the transaction.
// Returns 0 when the transaction was prepared, otherwise an error code. The transaction handle and the
// temporary stable object are always released at _OVER.
static int32_t mndCreateStb(SMnode *pMnode, SRpcMsg *pReq, SMCreateStbReq *pCreate, SDbObj *pDb) {
  SStbObj stbObj = {0};
  int32_t code = -1;

  char fullIdxName[TSDB_INDEX_FNAME_LEN * 2] = {0};

  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pReq, "create-stb");
  if (pTrans == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }

  mInfo("trans:%d, used to create stb:%s", pTrans->id, pCreate->name);
  TAOS_CHECK_GOTO(mndBuildStbFromReq(pMnode, &stbObj, pCreate, pDb), NULL, _OVER);

  SSchema *pSchema = &(stbObj.pTags[0]);
  if (mndGenIdxNameForFirstTag(fullIdxName, pDb->name, stbObj.name, pSchema->name) < 0) {
    // Set an explicit failure code: the successful check above must not leak through as the result,
    // otherwise the caller would see success although no transaction was prepared.
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto _OVER;
  }
  SSIdx idx = {0};
  if (mndAcquireGlobalIdx(pMnode, fullIdxName, SDB_IDX, &idx) == 0 && idx.pIdx != NULL) {
    code = TSDB_CODE_MND_TAG_INDEX_ALREADY_EXIST;
    mndReleaseIdx(pMnode, idx.pIdx);
    goto _OVER;
  }

  SIdxObj idxObj = {0};
  memcpy(idxObj.name, fullIdxName, TSDB_INDEX_FNAME_LEN);
  memcpy(idxObj.stb, stbObj.name, TSDB_TABLE_FNAME_LEN);
  memcpy(idxObj.db, stbObj.db, TSDB_DB_FNAME_LEN);
  memcpy(idxObj.colName, pSchema->name, TSDB_COL_NAME_LEN);
  idxObj.createdTime = taosGetTimestampMs();
  idxObj.uid = mndGenerateUid(fullIdxName, strlen(fullIdxName));
  idxObj.stbUid = stbObj.uid;
  idxObj.dbUid = stbObj.dbUid;

  TAOS_CHECK_GOTO(mndSetCreateIdxCommitLogs(pMnode, pTrans, &idxObj), NULL, _OVER);

  TAOS_CHECK_GOTO(mndAddStbToTrans(pMnode, pTrans, pDb, &stbObj), NULL, _OVER);
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
  code = 0;

_OVER:
  mndTransDrop(pTrans);
  if (mndStbActionDelete(pMnode->pSdb, &stbObj) != 0) mError("failed to mndStbActionDelete");
  TAOS_RETURN(code);
}
1012

1013
// Registers pStb with pTrans: sets the transaction's db/stb names, checks for conflicts and then appends
// the commit logs, redo actions and undo actions for the stable, in that order.
// Returns 0 on success; the first failing step's code is returned through TAOS_CHECK_RETURN.
int32_t mndAddStbToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
  mndTransSetDbName(pTrans, pDb->name, pStb->name);

  // Conflict check comes first so nothing is appended to a conflicting transaction.
  TAOS_CHECK_RETURN(mndTransCheckConflict(pMnode, pTrans));

  TAOS_CHECK_RETURN(mndSetCreateStbCommitLogs(pMnode, pTrans, pDb, pStb));
  TAOS_CHECK_RETURN(mndSetCreateStbRedoActions(pMnode, pTrans, pDb, pStb));
  TAOS_CHECK_RETURN(mndSetCreateStbUndoActions(pMnode, pTrans, pDb, pStb));

  return 0;
}
1021

1022
// Timer handler: sends a TDMT_VND_FETCH_TTL_EXPIRED_TBS request to every vgroup in the sdb.
// The request carries the current timestamp in seconds and tsTtlBatchDropNum as the drop limit.
// Failures for an individual vgroup (allocation, serialization, send) are logged or skipped and do not
// stop the iteration; the function always returns 0.
static int32_t mndProcessTtlTimer(SRpcMsg *pReq) {
  SMnode           *pMnode = pReq->info.node;
  SSdb             *pSdb = pMnode->pSdb;
  SVgObj           *pVgroup = NULL;
  void             *pIter = NULL;
  SVDropTtlTableReq ttlReq = {
      .timestampSec = taosGetTimestampSec(), .ttlDropMaxCount = tsTtlBatchDropNum, .nUids = 0, .pTbUids = NULL};
  // A first pass with a NULL buffer yields the serialized size.
  int32_t reqLen = tSerializeSVDropTtlTableReq(NULL, 0, &ttlReq);
  int32_t contLen = reqLen + sizeof(SMsgHead);

  mDebug("start to process ttl timer");

  while (1) {
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) break;

    int32_t   code = 0;
    SMsgHead *pHead = rpcMallocCont(contLen);
    if (pHead == NULL) {
      sdbRelease(pSdb, pVgroup);
      continue;
    }
    pHead->contLen = htonl(contLen);
    pHead->vgId = htonl(pVgroup->vgId);
    if ((code = tSerializeSVDropTtlTableReq((char *)pHead + sizeof(SMsgHead), reqLen, &ttlReq)) < 0) {
      mError("vgId:%d, failed to serialize drop ttl table request since %s", pVgroup->vgId, tstrerror(code));
      // The buffer was never handed to the RPC layer, so it has to be released here.
      rpcFreeCont(pHead);
      sdbRelease(pSdb, pVgroup);
      continue;
    }

    SRpcMsg rpcMsg = {
        .msgType = TDMT_VND_FETCH_TTL_EXPIRED_TBS, .pCont = pHead, .contLen = contLen, .info = pReq->info};
    SEpSet epSet = mndGetVgroupEpset(pMnode, pVgroup);
    code = tmsgSendReq(&epSet, &rpcMsg);
    if (code != 0) {
      mError("vgId:%d, failed to send drop ttl table request to vnode since 0x%x", pVgroup->vgId, code);
    } else {
      mDebug("vgId:%d, send drop ttl table request to vnode, time:%" PRId32, pVgroup->vgId, ttlReq.timestampSec);
    }
    sdbRelease(pSdb, pVgroup);
  }

  return 0;
}
1066

1067
// Timer handler: sends a TDMT_VND_TRIM request stamped with the current time (seconds) to every vgroup in
// the sdb. Failures for an individual vgroup (allocation, serialization, send) are logged or skipped and
// do not stop the iteration; the function always returns 0.
static int32_t mndProcessTrimDbTimer(SRpcMsg *pReq) {
  SMnode     *pMnode = pReq->info.node;
  SSdb       *pSdb = pMnode->pSdb;
  SVgObj     *pVgroup = NULL;
  void       *pIter = NULL;
  SVTrimDbReq trimReq = {.timestamp = taosGetTimestampSec()};
  // A first pass with a NULL buffer yields the serialized size.
  int32_t     reqLen = tSerializeSVTrimDbReq(NULL, 0, &trimReq);
  int32_t     contLen = reqLen + sizeof(SMsgHead);

  while (1) {
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) break;

    int32_t code = 0;

    SMsgHead *pHead = rpcMallocCont(contLen);
    if (pHead == NULL) {
      // Only this vgroup is skipped. The fetch must not be cancelled here because the loop keeps using
      // pIter, and sdbCancelFetch() expects the iterator rather than the fetched row.
      sdbRelease(pSdb, pVgroup);
      continue;
    }
    pHead->contLen = htonl(contLen);
    pHead->vgId = htonl(pVgroup->vgId);
    if ((code = tSerializeSVTrimDbReq((char *)pHead + sizeof(SMsgHead), reqLen, &trimReq)) < 0) {
      mError("vgId:%d, failed to serialize trim db request since %s", pVgroup->vgId, tstrerror(code));
      // Do not send a message whose body was not serialized; release the unsent buffer and move on,
      // as the sibling timer handlers do.
      rpcFreeCont(pHead);
      sdbRelease(pSdb, pVgroup);
      continue;
    }

    SRpcMsg rpcMsg = {.msgType = TDMT_VND_TRIM, .pCont = pHead, .contLen = contLen};
    SEpSet  epSet = mndGetVgroupEpset(pMnode, pVgroup);
    code = tmsgSendReq(&epSet, &rpcMsg);
    if (code != 0) {
      mError("vgId:%d, timer failed to send vnode-trim request to vnode since 0x%x", pVgroup->vgId, code);
    } else {
      mInfo("vgId:%d, timer send vnode-trim request to vnode, time:%d", pVgroup->vgId, trimReq.timestamp);
    }
    sdbRelease(pSdb, pVgroup);
  }

  return 0;
}
1107

1108
// Timer handler: sends a TDMT_VND_S3MIGRATE request stamped with the current time (seconds) to every
// vgroup in the sdb. Failures for an individual vgroup (allocation, serialization, send) are logged or
// skipped and do not stop the iteration; the function always returns 0.
static int32_t mndProcessS3MigrateDbTimer(SRpcMsg *pReq) {
  SMnode          *pMnode = pReq->info.node;
  SSdb            *pSdb = pMnode->pSdb;
  SVgObj          *pVgroup = NULL;
  void            *pIter = NULL;
  SVS3MigrateDbReq s3migrateReq = {.timestamp = taosGetTimestampSec()};
  // A first pass with a NULL buffer yields the serialized size.
  int32_t          reqLen = tSerializeSVS3MigrateDbReq(NULL, 0, &s3migrateReq);
  int32_t          contLen = reqLen + sizeof(SMsgHead);

  while (1) {
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) break;

    int32_t code = 0;

    SMsgHead *pHead = rpcMallocCont(contLen);
    if (pHead == NULL) {
      sdbRelease(pSdb, pVgroup);
      continue;
    }
    pHead->contLen = htonl(contLen);
    pHead->vgId = htonl(pVgroup->vgId);
    if ((code = tSerializeSVS3MigrateDbReq((char *)pHead + sizeof(SMsgHead), reqLen, &s3migrateReq)) < 0) {
      mError("vgId:%d, failed to serialize s3migrate db request since %s", pVgroup->vgId, tstrerror(code));
      // The buffer was never handed to the RPC layer, so it has to be released here.
      rpcFreeCont(pHead);
      sdbRelease(pSdb, pVgroup);
      continue;
    }

    SRpcMsg rpcMsg = {.msgType = TDMT_VND_S3MIGRATE, .pCont = pHead, .contLen = contLen};
    SEpSet  epSet = mndGetVgroupEpset(pMnode, pVgroup);
    code = tmsgSendReq(&epSet, &rpcMsg);
    if (code != 0) {
      mError("vgId:%d, timer failed to send vnode-s3migrate request to vnode since 0x%x", pVgroup->vgId, code);
    } else {
      mInfo("vgId:%d, timer send vnode-s3migrate request to vnode, time:%d", pVgroup->vgId, s3migrateReq.timestamp);
    }
    sdbRelease(pSdb, pVgroup);
  }

  return 0;
}
1149

1150
static int32_t mndFindSuperTableTagIndex(const SStbObj *pStb, const char *tagName) {
2,616✔
1151
  for (int32_t tag = 0; tag < pStb->numOfTags; tag++) {
18,366✔
1152
    if (strcmp(pStb->pTags[tag].name, tagName) == 0) {
17,145✔
1153
      return tag;
1,395✔
1154
    }
1155
  }
1156

1157
  return -1;
1,221✔
1158
}
1159

1160
static int32_t mndFindSuperTableColumnIndex(const SStbObj *pStb, const char *colName) {
3,482✔
1161
  for (int32_t col = 0; col < pStb->numOfColumns; col++) {
202,563✔
1162
    if (strcmp(pStb->pColumns[col].name, colName) == 0) {
201,221✔
1163
      return col;
2,140✔
1164
    }
1165
  }
1166

1167
  return -1;
1,342✔
1168
}
1169

1170
// Sums the byte widths of the nSchema existing schema entries and of every SField in pFields, and reports
// whether the total still fits into maxLen.
// Returns true when the total is <= maxLen, false otherwise.
static bool mndValidateSchema(SSchema *pSchemas, int32_t nSchema, SArray *pFields, int32_t maxLen) {
  int32_t total = 0;
  int32_t idx = 0;

  // Existing entries.
  while (idx < nSchema) {
    total += pSchemas[idx].bytes;
    ++idx;
  }

  // Entries about to be added.
  int32_t fieldCount = taosArrayGetSize(pFields);
  for (idx = 0; idx < fieldCount; ++idx) {
    SField *pField = (SField *)TARRAY_GET_ELEM(pFields, idx);
    total += pField->bytes;
  }

  return !(total > maxLen);
}
1183

1184
// Builds the altered stable object pDst from the existing stable pStb and a create request that describes
// the new schema.
//  - pDst starts as a byte copy of *pStb taken under pStb's read latch; source, updateTime, the column/tag
//    counts and the pColumns/pTags/pCmpr arrays are then replaced.
//  - A column or tag whose name already exists in pStb keeps its colId; any other one receives
//    pDst->nextColId, which is then incremented.
//  - A column whose requested compress value is 0 gets createDefaultColCmprByType() for its type.
//  - tagVer and colVer are taken from the request.
// Returns TSDB_CODE_SUCCESS, terrno when an allocation fails, or TSDB_CODE_OUT_OF_RANGE when the column id
// space would be exhausted. On failure the arrays allocated so far are left in pDst and are not freed here.
static int32_t mndBuildStbFromAlter(SStbObj *pStb, SStbObj *pDst, SMCreateStbReq *createReq) {
  int32_t code = 0;

  taosRLockLatch(&pStb->lock);
  memcpy(pDst, pStb, sizeof(SStbObj));
  taosRUnLockLatch(&pStb->lock);

  pDst->source = createReq->source;
  pDst->updateTime = taosGetTimestampMs();
  pDst->numOfColumns = createReq->numOfColumns;
  pDst->numOfTags = createReq->numOfTags;

  pDst->pColumns = taosMemoryCalloc(1, pDst->numOfColumns * sizeof(SSchema));
  pDst->pTags = taosMemoryCalloc(1, pDst->numOfTags * sizeof(SSchema));
  pDst->pCmpr = taosMemoryCalloc(1, pDst->numOfColumns * sizeof(SColCmpr));
  if (pDst->pColumns == NULL || pDst->pTags == NULL || pDst->pCmpr == NULL) {
    code = terrno;
    TAOS_RETURN(code);
  }

  // Make sure the ids that may be handed out below stay under 0x7fff.
  if (pDst->nextColId < 0 || pDst->nextColId >= 0x7fff - pDst->numOfColumns - pDst->numOfTags) {
    code = TSDB_CODE_OUT_OF_RANGE;
    TAOS_RETURN(code);
  }

  // Columns: schema entry and compression entry are filled together.
  for (int32_t c = 0; c < pDst->numOfColumns; ++c) {
    SFieldWithOptions *pSrc = taosArrayGet(createReq->pColumns, c);
    SSchema           *pCol = &pDst->pColumns[c];
    SColCmpr          *pCmpr = &pDst->pCmpr[c];

    pCol->type = pSrc->type;
    pCol->bytes = pSrc->bytes;
    pCol->flags = pSrc->flags;
    memcpy(pCol->name, pSrc->name, TSDB_COL_NAME_LEN);

    int32_t oldPos = mndFindSuperTableColumnIndex(pStb, pSrc->name);
    if (oldPos < 0) {
      pCol->colId = pDst->nextColId++;
    } else {
      pCol->colId = pStb->pColumns[oldPos].colId;
    }

    pCmpr->id = pCol->colId;
    pCmpr->alg = (pSrc->compress == 0) ? createDefaultColCmprByType(pCol->type) : pSrc->compress;
  }

  // Tags.
  for (int32_t t = 0; t < pDst->numOfTags; ++t) {
    SField  *pSrc = taosArrayGet(createReq->pTags, t);
    SSchema *pTag = &pDst->pTags[t];

    pTag->type = pSrc->type;
    pTag->bytes = pSrc->bytes;
    memcpy(pTag->name, pSrc->name, TSDB_COL_NAME_LEN);

    int32_t oldPos = mndFindSuperTableTagIndex(pStb, pSrc->name);
    if (oldPos < 0) {
      pTag->colId = pDst->nextColId++;
    } else {
      pTag->colId = pStb->pTags[oldPos].colId;
    }
  }

  pDst->tagVer = createReq->tagVer;
  pDst->colVer = createReq->colVer;
  return TSDB_CODE_SUCCESS;
}
1251

1252
// RPC handler for a create-stable request.
//  - Deserializes and validates the request.
//  - If the stable already exists: with igExists set, an application request is a no-op; for other
//    sources the request is treated as a schema alter when the uid matches and the versions advanced in
//    an accepted way, as a no-op when nothing advanced or the uid differs, and as
//    TSDB_CODE_MND_INVALID_SCHEMA_VER otherwise. Without igExists it fails with
//    TSDB_CODE_MND_STB_ALREADY_EXIST.
//  - If the stable does not exist: a TAOX request whose tagVer/colVer is not 1 is a no-op.
//  - Otherwise checks the database, privilege, single-stable mode and grant, then runs the alter or
//    create path and writes an audit record.
// Returns TSDB_CODE_ACTION_IN_PROGRESS when the alter/create path returned 0, 0 for the no-op cases, or an
// error code. The stb/db references and the request are released on every exit path.
static int32_t mndProcessCreateStbReq(SRpcMsg *pReq) {
  SMnode        *pMnode = pReq->info.node;
  int32_t        code = -1;
  SStbObj       *pStb = NULL;
  SDbObj        *pDb = NULL;
  SMCreateStbReq createReq = {0};
  bool           isAlter = false;

  if (tDeserializeSMCreateStbReq(pReq->pCont, pReq->contLen, &createReq) != 0) {
    code = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }

  mInfo("stb:%s, start to create", createReq.name);
  if (mndCheckCreateStbReq(&createReq) != 0) {
    code = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }

  pStb = mndAcquireStb(pMnode, createReq.name);
  if (pStb != NULL) {
    if (createReq.igExists) {
      if (createReq.source == TD_REQ_FROM_APP) {
        mInfo("stb:%s, already exist, ignore exist is set", createReq.name);
        code = 0;
        goto _OVER;
      } else if (pStb->uid != createReq.suid) {
        mInfo("stb:%s, alter table does not need to be done, because table is deleted", createReq.name);
        code = 0;
        goto _OVER;
      } else if (createReq.tagVer > 0 || createReq.colVer > 0) {
        int32_t tagDelta = createReq.tagVer - pStb->tagVer;
        int32_t colDelta = createReq.colVer - pStb->colVer;
        mInfo("stb:%s, already exist while create, input tagVer:%d colVer:%d, exist tagVer:%d colVer:%d",
              createReq.name, createReq.tagVer, createReq.colVer, pStb->tagVer, pStb->colVer);
        if (tagDelta <= 0 && colDelta <= 0) {
          mInfo("stb:%s, schema version is not incremented and nothing needs to be done", createReq.name);
          code = 0;
          goto _OVER;
        } else if ((tagDelta == 1 && colDelta == 0) || (tagDelta == 0 && colDelta == 1) ||
                   (pStb->colVer == 1 && createReq.colVer > 1) || (pStb->tagVer == 1 && createReq.tagVer > 1)) {
          isAlter = true;
          mInfo("stb:%s, schema version is only increased by 1 number, do alter operation", createReq.name);
        } else {
          mError("stb:%s, schema version increase more than 1 number, error is returned", createReq.name);
          code = TSDB_CODE_MND_INVALID_SCHEMA_VER;
          goto _OVER;
        }
      } else {
        mError("stb:%s, already exist while create, input tagVer:%d colVer:%d is invalid, origin tagVer:%d colVer:%d",
               createReq.name, createReq.tagVer, createReq.colVer, pStb->tagVer, pStb->colVer);
        code = TSDB_CODE_MND_INVALID_SCHEMA_VER;
        goto _OVER;
      }
    } else {
      code = TSDB_CODE_MND_STB_ALREADY_EXIST;
      goto _OVER;
    }
  } else if (terrno != TSDB_CODE_MND_STB_NOT_EXIST) {
    // Report the acquire error itself instead of the initial -1 placeholder.
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto _OVER;
  } else if ((createReq.source == TD_REQ_FROM_TAOX_OLD || createReq.source == TD_REQ_FROM_TAOX) &&
             (createReq.tagVer != 1 || createReq.colVer != 1)) {
    mInfo("stb:%s, alter table does not need to be done, because table is deleted", createReq.name);
    code = 0;
    goto _OVER;
  }

  pDb = mndAcquireDbByStb(pMnode, createReq.name);
  if (pDb == NULL) {
    code = TSDB_CODE_MND_DB_NOT_SELECTED;
    goto _OVER;
  }

  if ((code = mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb)) != 0) {
    goto _OVER;
  }

  int32_t numOfStbs = -1;
  if ((code = mndGetNumOfStbs(pMnode, pDb->name, &numOfStbs)) != 0) {
    goto _OVER;
  }

  if (pDb->cfg.numOfStables == 1 && numOfStbs != 0) {
    code = TSDB_CODE_MND_SINGLE_STB_MODE_DB;
    goto _OVER;
  }

  if ((code = grantCheck(TSDB_GRANT_STABLE)) < 0) {
    goto _OVER;
  }

  if (isAlter) {
    bool    needRsp = false;
    SStbObj pDst = {0};
    if ((code = mndBuildStbFromAlter(pStb, &pDst, &createReq)) != 0) {
      taosMemoryFreeClear(pDst.pTags);
      taosMemoryFreeClear(pDst.pColumns);
      taosMemoryFreeClear(pDst.pCmpr);
      goto _OVER;
    }

    code = mndAlterStbImp(pMnode, pReq, pDb, &pDst, needRsp, NULL, 0);
    taosMemoryFreeClear(pDst.pTags);
    taosMemoryFreeClear(pDst.pColumns);
    taosMemoryFreeClear(pDst.pCmpr);
  } else {
    code = mndCreateStb(pMnode, pReq, &createReq, pDb);
  }
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;

  SName name = {0};
  // Use a separate variable so a successful parse does not overwrite `code`, and leave through _OVER so
  // the stb/db references and the request are released even when the name cannot be parsed.
  int32_t nameCode = tNameFromString(&name, createReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
  if (nameCode != 0) {
    code = nameCode;
    goto _OVER;
  }

  if (createReq.sql == NULL && createReq.sqlLen == 0) {
    char detail[1000] = {0};

    // Bounded formatting; the buffer is zero-initialized and snprintf always terminates it.
    (void)snprintf(detail, sizeof(detail), "dbname:%s, stable name:%s", name.dbname, name.tname);

    auditRecord(pReq, pMnode->clusterId, "createStb", name.dbname, name.tname, detail, strlen(detail));
  } else {
    auditRecord(pReq, pMnode->clusterId, "createStb", name.dbname, name.tname, createReq.sql, createReq.sqlLen);
  }
_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("stb:%s, failed to create since %s", createReq.name, tstrerror(code));
  }

  mndReleaseStb(pMnode, pStb);
  mndReleaseDb(pMnode, pDb);
  tFreeSMCreateStbReq(&createReq);

  TAOS_RETURN(code);
}
1385

1386
// Validates an alter-stable request.
// A request with commentLen >= 0 or a non-zero ttl is accepted without looking at its fields. Any other
// request must carry at least one field, numOfFields must equal the size of pFields, and every field
// must have a non-empty name; otherwise TSDB_CODE_MND_INVALID_STB_OPTION is returned.
// Returns 0 when the request is acceptable.
static int32_t mndCheckAlterStbReq(SMAlterStbReq *pAlter) {
  int32_t code = 0;

  if (pAlter->commentLen >= 0 || pAlter->ttl != 0) {
    return 0;
  }

  if (pAlter->numOfFields < 1 || pAlter->numOfFields != (int32_t)taosArrayGetSize(pAlter->pFields)) {
    code = TSDB_CODE_MND_INVALID_STB_OPTION;
    TAOS_RETURN(code);
  }

  int32_t idx = 0;
  while (idx < pAlter->numOfFields) {
    SField *pEntry = taosArrayGet(pAlter->pFields, idx);
    if (pEntry->name[0] == 0) {
      code = TSDB_CODE_MND_INVALID_STB_OPTION;
      TAOS_RETURN(code);
    }
    ++idx;
  }

  TAOS_RETURN(code);
}
1406

1407
// Allocates pNew's tag, column and compression arrays, sized by pNew's counts, and copies pOld's entries
// into them (the copy lengths come from pOld's counts).
// Returns 0 on success, or terrno when any allocation fails; arrays that were allocated before the
// failure are left in pNew and are not freed here.
// NOTE(review): the copies assume pNew's counts are not smaller than pOld's - confirm against callers.
int32_t mndAllocStbSchemas(const SStbObj *pOld, SStbObj *pNew) {
  pNew->pTags = taosMemoryCalloc(pNew->numOfTags, sizeof(SSchema));
  pNew->pColumns = taosMemoryCalloc(pNew->numOfColumns, sizeof(SSchema));
  pNew->pCmpr = taosMemoryCalloc(pNew->numOfColumns, sizeof(SColCmpr));

  bool allocated = (pNew->pTags != NULL) && (pNew->pColumns != NULL) && (pNew->pCmpr != NULL);
  if (!allocated) {
    TAOS_RETURN(terrno);
  }

  memcpy(pNew->pTags, pOld->pTags, sizeof(SSchema) * pOld->numOfTags);
  memcpy(pNew->pColumns, pOld->pColumns, sizeof(SSchema) * pOld->numOfColumns);
  memcpy(pNew->pCmpr, pOld->pCmpr, sizeof(SColCmpr) * pOld->numOfColumns);

  TAOS_RETURN(0);
}
1421

1422
// Applies a comment and/or ttl change to pNew and then copies pOld's schema arrays into it.
//  - commentLen > 0: pNew gets a freshly allocated copy of pComment (commentLen + 1 bytes).
//  - commentLen == 0: pNew->commentLen is set to 0.
//  - commentLen < 0: the comment fields are left untouched.
//  - ttl >= 0: pNew->ttl is set to ttl.
// Returns the result of mndAllocStbSchemas(), or -1 with terrno = TSDB_CODE_OUT_OF_MEMORY when the comment
// buffer cannot be allocated.
static int32_t mndUpdateStbCommentAndTTL(const SStbObj *pOld, SStbObj *pNew, char *pComment, int32_t commentLen,
                                         int32_t ttl) {
  if (commentLen == 0) {
    pNew->commentLen = 0;
  } else if (commentLen > 0) {
    pNew->commentLen = commentLen;
    pNew->comment = taosMemoryCalloc(1, commentLen + 1);
    if (pNew->comment == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }
    memcpy(pNew->comment, pComment, commentLen + 1);
  }

  if (ttl >= 0) {
    pNew->ttl = ttl;
  }

  int32_t code = mndAllocStbSchemas(pOld, pNew);
  TAOS_RETURN(code);
}
1447

1448
// Adds ntags new tags, described by the SField entries of pFields, to pNew (which is based on pOld).
// Checks, in order: the tag count limit (TSDB_MAX_TAGS), the combined column+tag limit
// (TSDB_MAX_COLUMNS), the total tag length (TSDB_MAX_TAGS_LEN) and the column id range. Each new name must
// not collide with an existing column or tag of pOld.
// On success the new tags are stored after the old ones, each receiving the next column id, and
// pNew->tagVer is incremented. Returns 0 on success or the error code of the first failed check; on a
// name collision pNew keeps the arrays allocated so far and the enlarged tag count.
static int32_t mndAddSuperTableTag(const SStbObj *pOld, SStbObj *pNew, SArray *pFields, int32_t ntags) {
  int32_t code = 0;

  if (pOld->numOfTags + ntags > TSDB_MAX_TAGS) {
    code = TSDB_CODE_MND_TOO_MANY_TAGS;
    TAOS_RETURN(code);
  }

  if (pOld->numOfColumns + ntags + pOld->numOfTags > TSDB_MAX_COLUMNS) {
    code = TSDB_CODE_MND_TOO_MANY_COLUMNS;
    TAOS_RETURN(code);
  }

  bool lengthOk = mndValidateSchema(pOld->pTags, pOld->numOfTags, pFields, TSDB_MAX_TAGS_LEN);
  if (!lengthOk) {
    code = TSDB_CODE_PAR_INVALID_TAGS_LENGTH;
    TAOS_RETURN(code);
  }

  pNew->numOfTags += ntags;
  code = mndAllocStbSchemas(pOld, pNew);
  if (code != 0) {
    TAOS_RETURN(code);
  }

  // Make sure the ids handed out below stay under 0x7fff.
  if (pNew->nextColId < 0 || pNew->nextColId >= 0x7fff - ntags) {
    code = TSDB_CODE_OUT_OF_RANGE;
    TAOS_RETURN(code);
  }

  int32_t idx = 0;
  while (idx < ntags) {
    SField *pAdded = taosArrayGet(pFields, idx);

    if (mndFindSuperTableColumnIndex(pOld, pAdded->name) >= 0) {
      code = TSDB_CODE_MND_COLUMN_ALREADY_EXIST;
      TAOS_RETURN(code);
    }

    if (mndFindSuperTableTagIndex(pOld, pAdded->name) >= 0) {
      code = TSDB_CODE_MND_TAG_ALREADY_EXIST;
      TAOS_RETURN(code);
    }

    // New tags go right behind the tags copied from pOld.
    SSchema *pSlot = &pNew->pTags[pOld->numOfTags + idx];
    pSlot->bytes = pAdded->bytes;
    pSlot->type = pAdded->type;
    memcpy(pSlot->name, pAdded->name, TSDB_COL_NAME_LEN);
    pSlot->colId = pNew->nextColId;
    pNew->nextColId++;

    mInfo("stb:%s, start to add tag %s", pNew->name, pSlot->name);
    ++idx;
  }

  pNew->tagVer++;
  TAOS_RETURN(code);
}
1500

1501
/*
 * Checks whether column/tag `colId` of super table `suid` is referenced by any topic.
 *
 * Iterates over all SDB_TOPIC objects. Topics without an AST are skipped. For every other topic the AST
 * string is parsed and its columns are collected; a column with a positive id equal to `colId` that
 * belongs to `suid` (by column tableId or by the topic's ctbStbUid) is a conflict.
 *
 * Returns 0 when no topic conflicts; TSDB_CODE_MND_FIELD_CONFLICT_WITH_TOPIC when a conflict is found or
 * a topic AST cannot be parsed; or the error returned by nodesCollectColumns().
 *
 * On every early return the fetch iterator is cancelled and the current topic is released.
 */
static int32_t mndCheckAlterColForTopic(SMnode *pMnode, const char *stbFullName, int64_t suid, col_id_t colId) {
  int32_t code = 0;
  SSdb   *pSdb = pMnode->pSdb;
  void   *pIter = NULL;
  while (1) {
    SMqTopicObj *pTopic = NULL;
    pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pTopic);
    if (pIter == NULL) break;

    mInfo("topic:%s, check tag and column modifiable, stb:%s suid:%" PRId64 " colId:%d, subType:%d sql:%s",
          pTopic->name, stbFullName, suid, colId, pTopic->subType, pTopic->sql);
    if (pTopic->ast == NULL) {
      sdbRelease(pSdb, pTopic);
      continue;
    }

    SNode *pAst = NULL;
    if (nodesStringToNode(pTopic->ast, &pAst) != 0) {
      code = TSDB_CODE_MND_FIELD_CONFLICT_WITH_TOPIC;
      mError("topic:%s, create ast error", pTopic->name);
      sdbRelease(pSdb, pTopic);
      sdbCancelFetch(pSdb, pIter);
      TAOS_RETURN(code);
    }

    SNodeList *pNodeList = NULL;
    if ((code = nodesCollectColumns((SSelectStmt *)pAst, SQL_CLAUSE_FROM, NULL, COLLECT_COL_TYPE_ALL, &pNodeList)) !=
        0) {
      // The AST was parsed successfully above, so it must be destroyed here as well to avoid leaking it.
      nodesDestroyNode(pAst);
      sdbRelease(pSdb, pTopic);
      sdbCancelFetch(pSdb, pIter);
      TAOS_RETURN(code);
    }
    SNode *pNode = NULL;
    FOREACH(pNode, pNodeList) {
      SColumnNode *pCol = (SColumnNode *)pNode;
      mInfo("topic:%s, check colId:%d tableId:%" PRId64 " ctbStbUid:%" PRId64, pTopic->name, pCol->colId, pCol->tableId,
            pTopic->ctbStbUid);

      // A column unrelated to this super table ends the check of the current topic.
      if (pCol->tableId != suid && pTopic->ctbStbUid != suid) {
        mInfo("topic:%s, check colId:%d passed", pTopic->name, pCol->colId);
        goto NEXT;
      }
      if (pCol->colId > 0 && pCol->colId == colId) {
        code = TSDB_CODE_MND_FIELD_CONFLICT_WITH_TOPIC;
        mError("topic:%s, check colId:%d conflicted", pTopic->name, pCol->colId);
        nodesDestroyNode(pAst);
        nodesDestroyList(pNodeList);
        sdbCancelFetch(pSdb, pIter);
        sdbRelease(pSdb, pTopic);
        TAOS_RETURN(code);
      }
      mInfo("topic:%s, check colId:%d passed", pTopic->name, pCol->colId);
    }

  NEXT:
    // Per-topic cleanup shared by the normal path and the early exit above.
    sdbRelease(pSdb, pTopic);
    nodesDestroyNode(pAst);
    nodesDestroyList(pNodeList);
  }
  TAOS_RETURN(code);
}
1562

1563
/*
 * Checks whether column/tag `colId` of super table `suid` is referenced by any stream.
 *
 * Iterates over all SDB_STREAM objects, parses each stream's AST string and collects its columns. A
 * column whose tableId equals `suid` and whose positive colId equals `colId` is a conflict.
 *
 * Returns 0 when no stream conflicts; TSDB_CODE_MND_STREAM_MUST_BE_DELETED on a conflict;
 * TSDB_CODE_MND_INVALID_STREAM_OPTION when a stream AST cannot be parsed; or the error returned by
 * nodesCollectColumns().
 *
 * On every early return the fetch iterator is cancelled and the current stream is released.
 * `stbFullName` is not used by this function.
 */
static int32_t mndCheckAlterColForStream(SMnode *pMnode, const char *stbFullName, int64_t suid, col_id_t colId) {
  int32_t code = 0;
  SSdb   *pSdb = pMnode->pSdb;
  void   *pIter = NULL;
  while (1) {
    SStreamObj *pStream = NULL;
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
    if (pIter == NULL) break;

    SNode *pAst = NULL;
    if (nodesStringToNode(pStream->ast, &pAst) != 0) {
      code = TSDB_CODE_MND_INVALID_STREAM_OPTION;
      mError("stream:%s, create ast error", pStream->name);
      sdbRelease(pSdb, pStream);
      sdbCancelFetch(pSdb, pIter);
      TAOS_RETURN(code);
    }

    SNodeList *pNodeList = NULL;
    if ((code = nodesCollectColumns((SSelectStmt *)pAst, SQL_CLAUSE_FROM, NULL, COLLECT_COL_TYPE_ALL, &pNodeList)) !=
        0) {
      // The AST was parsed successfully above, so it must be destroyed here as well to avoid leaking it.
      nodesDestroyNode(pAst);
      sdbRelease(pSdb, pStream);
      sdbCancelFetch(pSdb, pIter);
      TAOS_RETURN(code);
    }

    SNode *pNode = NULL;
    FOREACH(pNode, pNodeList) {
      SColumnNode *pCol = (SColumnNode *)pNode;

      // A column from another table ends the check of the current stream.
      if (pCol->tableId != suid) {
        mInfo("stream:%s, check colId:%d passed", pStream->name, pCol->colId);
        goto NEXT;
      }
      if (pCol->colId > 0 && pCol->colId == colId) {
        code = TSDB_CODE_MND_STREAM_MUST_BE_DELETED;
        mError("stream:%s, check colId:%d conflicted", pStream->name, pCol->colId);
        nodesDestroyNode(pAst);
        nodesDestroyList(pNodeList);
        sdbRelease(pSdb, pStream);
        sdbCancelFetch(pSdb, pIter);
        TAOS_RETURN(code);
      }
      mInfo("stream:%s, check colId:%d passed", pStream->name, pCol->colId);
    }

  NEXT:
    // Per-stream cleanup shared by the normal path and the early exit above.
    sdbRelease(pSdb, pStream);
    nodesDestroyNode(pAst);
    nodesDestroyList(pNodeList);
  }
  TAOS_RETURN(code);
}
1616

1617
/*
 * Checks whether column/tag `colId` of super table `suid` is referenced by any TSMA.
 *
 * Iterates over all SDB_SMA objects, parses each SMA's AST string and collects its columns. A column
 * with a positive id equal to `colId` that belongs to `suid` (by column tableId or by the SMA's stbUid)
 * is a conflict.
 *
 * Returns 0 when no TSMA conflicts; TSDB_CODE_MND_FIELD_CONFLICT_WITH_TSMA on a conflict;
 * TSDB_CODE_SDB_INVALID_DATA_CONTENT when an SMA AST cannot be parsed; or the error returned by
 * nodesCollectColumns().
 *
 * On every early return the fetch iterator is cancelled and the current SMA object is released.
 */
static int32_t mndCheckAlterColForTSma(SMnode *pMnode, const char *stbFullName, int64_t suid, col_id_t colId) {
  int32_t code = 0;
  SSdb   *pSdb = pMnode->pSdb;
  void   *pIter = NULL;
  while (1) {
    SSmaObj *pSma = NULL;
    pIter = sdbFetch(pSdb, SDB_SMA, pIter, (void **)&pSma);
    if (pIter == NULL) break;

    mInfo("tsma:%s, check tag and column modifiable, stb:%s suid:%" PRId64 " colId:%d, sql:%s", pSma->name, stbFullName,
          suid, colId, pSma->sql);

    SNode *pAst = NULL;
    if (nodesStringToNode(pSma->ast, &pAst) != 0) {
      code = TSDB_CODE_SDB_INVALID_DATA_CONTENT;
      mError("tsma:%s, check tag and column modifiable, stb:%s suid:%" PRId64 " colId:%d failed since parse AST err",
             pSma->name, stbFullName, suid, colId);
      // Release the fetched object before bailing out, as every other exit path of this loop does.
      sdbRelease(pSdb, pSma);
      sdbCancelFetch(pSdb, pIter);
      TAOS_RETURN(code);
    }

    SNodeList *pNodeList = NULL;
    if ((code = nodesCollectColumns((SSelectStmt *)pAst, SQL_CLAUSE_FROM, NULL, COLLECT_COL_TYPE_ALL, &pNodeList)) !=
        0) {
      // Both the parsed AST and the fetched object are still held at this point; drop them before returning.
      nodesDestroyNode(pAst);
      sdbRelease(pSdb, pSma);
      sdbCancelFetch(pSdb, pIter);
      TAOS_RETURN(code);
    }
    SNode *pNode = NULL;
    FOREACH(pNode, pNodeList) {
      SColumnNode *pCol = (SColumnNode *)pNode;
      mInfo("tsma:%s, check colId:%d tableId:%" PRId64, pSma->name, pCol->colId, pCol->tableId);

      // A column unrelated to this super table ends the check of the current TSMA.
      if ((pCol->tableId != suid) && (pSma->stbUid != suid)) {
        mInfo("tsma:%s, check colId:%d passed", pSma->name, pCol->colId);
        goto NEXT;
      }
      if ((pCol->colId) > 0 && (pCol->colId == colId)) {
        code = TSDB_CODE_MND_FIELD_CONFLICT_WITH_TSMA;
        mError("tsma:%s, check colId:%d conflicted", pSma->name, pCol->colId);
        nodesDestroyNode(pAst);
        nodesDestroyList(pNodeList);
        sdbRelease(pSdb, pSma);
        sdbCancelFetch(pSdb, pIter);
        TAOS_RETURN(code);
      }
      mInfo("tsma:%s, check colId:%d passed", pSma->name, pCol->colId);
    }

  NEXT:
    // Per-TSMA cleanup shared by the normal path and the early exit above.
    sdbRelease(pSdb, pSma);
    nodesDestroyNode(pAst);
    nodesDestroyList(pNodeList);
  }
  TAOS_RETURN(code);
}
1672

1673
/*
 * Verifies that column/tag `colId` of super table `suid` may be altered.
 *
 * Runs the topic, stream and TSMA checks in that order and stops at the first one that fails.
 * Returns 0 when all three checks pass, otherwise the code of the failing check.
 */
int32_t mndCheckColAndTagModifiable(SMnode *pMnode, const char *stbFullName, int64_t suid, col_id_t colId) {
  int32_t code = mndCheckAlterColForTopic(pMnode, stbFullName, suid, colId);
  if (code != 0) {
    TAOS_RETURN(code);
  }

  code = mndCheckAlterColForStream(pMnode, stbFullName, suid, colId);
  if (code != 0) {
    TAOS_RETURN(code);
  }

  code = mndCheckAlterColForTSma(pMnode, stbFullName, suid, colId);
  if (code != 0) {
    TAOS_RETURN(code);
  }

  TAOS_RETURN(0);
}
1679

1680
/*
 * Removes the tag named `tagName` from the tag schema of `pNew`.
 *
 * The tag is looked up in `pOld`; it must exist and must pass mndCheckColAndTagModifiable().
 * On success the tag entry is removed from pNew->pTags, numOfTags is decremented and tagVer is bumped.
 *
 * Returns 0 on success, TSDB_CODE_MND_TAG_NOT_EXIST when the tag is unknown, or the error of the
 * modifiable check / mndAllocStbSchemas().
 */
static int32_t mndDropSuperTableTag(SMnode *pMnode, const SStbObj *pOld, SStbObj *pNew, const char *tagName) {
  int32_t code = 0;

  int32_t tagIdx = mndFindSuperTableTagIndex(pOld, tagName);
  if (tagIdx < 0) {
    code = TSDB_CODE_MND_TAG_NOT_EXIST;
    TAOS_RETURN(code);
  }

  col_id_t tagColId = pOld->pTags[tagIdx].colId;
  TAOS_CHECK_RETURN(mndCheckColAndTagModifiable(pMnode, pOld->name, pOld->uid, tagColId));
  TAOS_CHECK_RETURN(mndAllocStbSchemas(pOld, pNew));

  // Close the gap by shifting every tag that follows the dropped one down by one slot.
  int32_t  trailing = pNew->numOfTags - tagIdx - 1;
  SSchema *pSlot = pNew->pTags + tagIdx;
  memmove(pSlot, pSlot + 1, sizeof(SSchema) * trailing);
  pNew->numOfTags--;
  pNew->tagVer++;

  // NOTE: a call to mndDropIndexByTag(pMnode, pOld, tagName) was commented out at this point in the
  // original code; it is still not performed.
  mInfo("stb:%s, start to drop tag %s", pNew->name, tagName);
  TAOS_RETURN(code);
}
1704

1705
/*
 * Renames a tag of the super table.
 *
 * `pFields` must hold exactly two SField entries: element 0 carries the current tag name and element 1
 * the new name. The old tag must exist and be modifiable, and the new name must not clash with an
 * existing tag or column. On success the name is rewritten in pNew->pTags and tagVer is bumped.
 *
 * Returns 0 on success, otherwise TSDB_CODE_MND_INVALID_STB_OPTION, TSDB_CODE_MND_TAG_NOT_EXIST,
 * TSDB_CODE_MND_TAG_ALREADY_EXIST, TSDB_CODE_MND_COLUMN_ALREADY_EXIST, or the error of the modifiable
 * check / mndAllocStbSchemas().
 */
static int32_t mndAlterStbTagName(SMnode *pMnode, const SStbObj *pOld, SStbObj *pNew, SArray *pFields) {
  int32_t code = 0;

  if ((int32_t)taosArrayGetSize(pFields) != 2) {
    code = TSDB_CODE_MND_INVALID_STB_OPTION;
    TAOS_RETURN(code);
  }

  SField     *pFrom = taosArrayGet(pFields, 0);
  SField     *pTo = taosArrayGet(pFields, 1);
  const char *oldTagName = pFrom->name;
  const char *newTagName = pTo->name;

  int32_t tagIdx = mndFindSuperTableTagIndex(pOld, oldTagName);
  if (tagIdx < 0) {
    code = TSDB_CODE_MND_TAG_NOT_EXIST;
    TAOS_RETURN(code);
  }

  col_id_t tagColId = pOld->pTags[tagIdx].colId;
  TAOS_CHECK_RETURN(mndCheckColAndTagModifiable(pMnode, pOld->name, pOld->uid, tagColId));

  // The new name is checked against tags first, then against columns.
  if (mndFindSuperTableTagIndex(pOld, newTagName) >= 0) {
    code = TSDB_CODE_MND_TAG_ALREADY_EXIST;
  } else if (mndFindSuperTableColumnIndex(pOld, newTagName) >= 0) {
    code = TSDB_CODE_MND_COLUMN_ALREADY_EXIST;
  }
  if (code != 0) {
    TAOS_RETURN(code);
  }

  TAOS_CHECK_RETURN(mndAllocStbSchemas(pOld, pNew));

  SSchema *pTarget = &pNew->pTags[tagIdx];
  memcpy(pTarget->name, newTagName, TSDB_COL_NAME_LEN);
  pNew->tagVer++;

  mInfo("stb:%s, start to modify tag %s to %s", pNew->name, oldTagName, newTagName);
  TAOS_RETURN(code);
}
1746

1747
/*
 * Enlarges the byte length of the tag named pField->name to pField->bytes.
 *
 * The tag must exist in `pOld` and be modifiable; the total tag length with the new size must not exceed
 * TSDB_MAX_TAGS_LEN; the tag type must be BINARY, VARBINARY, NCHAR or GEOMETRY; and the new length must
 * be strictly larger than the current one. On success the length is updated in pNew and tagVer is bumped.
 *
 * Returns 0 on success, otherwise TSDB_CODE_MND_TAG_NOT_EXIST, TSDB_CODE_PAR_INVALID_TAGS_LENGTH,
 * TSDB_CODE_MND_INVALID_STB_OPTION, TSDB_CODE_MND_INVALID_ROW_BYTES, or the error of the modifiable
 * check / mndAllocStbSchemas().
 */
static int32_t mndAlterStbTagBytes(SMnode *pMnode, const SStbObj *pOld, SStbObj *pNew, const SField *pField) {
  int32_t code = 0;

  int32_t tagIdx = mndFindSuperTableTagIndex(pOld, pField->name);
  if (tagIdx < 0) {
    code = TSDB_CODE_MND_TAG_NOT_EXIST;
    TAOS_RETURN(code);
  }

  col_id_t tagColId = pOld->pTags[tagIdx].colId;
  TAOS_CHECK_RETURN(mndCheckColAndTagModifiable(pMnode, pOld->name, pOld->uid, tagColId));

  // Sum all tag lengths, substituting the requested length for the tag being altered.
  uint32_t totalLen = 0;
  for (int32_t i = 0; i < pOld->numOfTags; ++i) {
    const SSchema *pCur = &pOld->pTags[i];
    if (pCur->colId == tagColId) {
      totalLen += pField->bytes;
    } else {
      totalLen += pCur->bytes;
    }
  }
  if (totalLen > TSDB_MAX_TAGS_LEN) {
    code = TSDB_CODE_PAR_INVALID_TAGS_LENGTH;
    TAOS_RETURN(code);
  }

  TAOS_CHECK_RETURN(mndAllocStbSchemas(pOld, pNew));

  SSchema *pTag = &pNew->pTags[tagIdx];

  // Only these four types may have their length changed.
  if (pTag->type != TSDB_DATA_TYPE_BINARY && pTag->type != TSDB_DATA_TYPE_VARBINARY &&
      pTag->type != TSDB_DATA_TYPE_NCHAR && pTag->type != TSDB_DATA_TYPE_GEOMETRY) {
    code = TSDB_CODE_MND_INVALID_STB_OPTION;
    TAOS_RETURN(code);
  }

  // The length may only grow.
  if (pField->bytes <= pTag->bytes) {
    code = TSDB_CODE_MND_INVALID_ROW_BYTES;
    TAOS_RETURN(code);
  }

  pTag->bytes = pField->bytes;
  pNew->tagVer++;

  mInfo("stb:%s, start to modify tag len %s to %d", pNew->name, pField->name, pField->bytes);
  TAOS_RETURN(code);
}
1789

1790
/*
 * Updates the compression setting of one column of the super table.
 *
 * `pField` must contain exactly `nCols` entries; only element 0 is read. Its `name` selects the column
 * and its `bytes` field is passed as the requested compression value to validColCmprByType() and
 * tUpdateCompress(). On success the matching entry in pNew->pCmpr is updated and colVer is bumped.
 *
 * Returns 0 on success; TSDB_CODE_FAILED when the array size does not equal nCols;
 * TSDB_CODE_MND_COLUMN_NOT_EXIST; TSDB_CODE_MND_COLUMN_COMPRESS_ALREADY_EXIST when nothing was updated;
 * TSDB_CODE_TSC_COMPRESS_LEVEL_ERROR when tUpdateCompress() reports -1; or the error of the modifiable
 * check, mndAllocStbSchemas() or validColCmprByType().
 */
static int32_t mndUpdateSuperTableColumnCompress(SMnode *pMnode, const SStbObj *pOld, SStbObj *pNew, SArray *pField,
                                                 int32_t nCols) {
  if (taosArrayGetSize(pField) != nCols) return TSDB_CODE_FAILED;

  int32_t     code = 0;
  TAOS_FIELD *pReq = taosArrayGet(pField, 0);

  int32_t colIdx = mndFindSuperTableColumnIndex(pOld, pReq->name);
  if (colIdx == -1) {
    code = TSDB_CODE_MND_COLUMN_NOT_EXIST;
    TAOS_RETURN(code);
  }

  SSchema *pTarget = &pOld->pColumns[colIdx];
  col_id_t targetColId = pTarget->colId;
  TAOS_CHECK_RETURN(mndCheckColAndTagModifiable(pMnode, pOld->name, pOld->uid, targetColId));
  TAOS_CHECK_RETURN(mndAllocStbSchemas(pOld, pNew));

  code = validColCmprByType(pTarget->type, pReq->bytes);
  if (code != TSDB_CODE_SUCCESS) {
    TAOS_RETURN(code);
  }

  // Locate the compression entry of the column; `updated` stays 0 when no entry matches.
  int8_t updated = 0;
  for (int i = 0; i < pNew->numOfColumns; i++) {
    SColCmpr *pEntry = &pNew->pCmpr[i];
    if (pEntry->id != targetColId) {
      continue;
    }

    uint32_t newAlg = 0;
    updated = tUpdateCompress(pEntry->alg, pReq->bytes, TSDB_COLVAL_COMPRESS_DISABLED, TSDB_COLVAL_LEVEL_DISABLED,
                              TSDB_COLVAL_LEVEL_MEDIUM, &newAlg);
    if (updated > 0) {
      pEntry->alg = newAlg;
    }
    break;
  }

  if (updated == 0) {
    code = TSDB_CODE_MND_COLUMN_COMPRESS_ALREADY_EXIST;
    TAOS_RETURN(code);
  }
  if (updated == -1) {
    code = TSDB_CODE_TSC_COMPRESS_LEVEL_ERROR;
    TAOS_RETURN(code);
  }

  pNew->colVer++;
  TAOS_RETURN(code);
}
1837
static int32_t mndAddSuperTableColumn(const SStbObj *pOld, SStbObj *pNew, SArray *pFields, int32_t ncols,
1,230✔
1838
                                      int8_t withCompress) {
1839
  int32_t code = 0;
1,230✔
1840
  if (pOld->numOfColumns + ncols + pOld->numOfTags > TSDB_MAX_COLUMNS) {
1,230!
1841
    code = TSDB_CODE_MND_TOO_MANY_COLUMNS;
×
1842
    TAOS_RETURN(code);
×
1843
  }
1844

1845
  if ((code = grantCheck(TSDB_GRANT_TIMESERIES)) != 0) {
1,230!
1846
    TAOS_RETURN(code);
×
1847
  }
1848

1849
  if (!mndValidateSchema(pOld->pColumns, pOld->numOfColumns, pFields, TSDB_MAX_BYTES_PER_ROW)) {
1,230✔
1850
    code = TSDB_CODE_PAR_INVALID_ROW_LENGTH;
17✔
1851
    TAOS_RETURN(code);
17✔
1852
  }
1853

1854
  pNew->numOfColumns = pNew->numOfColumns + ncols;
1,213✔
1855

1856
  TAOS_CHECK_RETURN(mndAllocStbSchemas(pOld, pNew));
1,213!
1857

1858
  if (pNew->nextColId < 0 || pNew->nextColId >= 0x7fff - ncols) {
1,213!
1859
    code = TSDB_CODE_OUT_OF_RANGE;
×
1860
    TAOS_RETURN(code);
×
1861
  }
1862

1863
  for (int32_t i = 0; i < ncols; i++) {
1,923✔
1864
    if (withCompress) {
1,213✔
1865
      SFieldWithOptions *pField = taosArrayGet(pFields, i);
2✔
1866
      if (mndFindSuperTableColumnIndex(pOld, pField->name) >= 0) {
2!
1867
        code = TSDB_CODE_MND_COLUMN_ALREADY_EXIST;
×
1868
        TAOS_RETURN(code);
×
1869
      }
1870

1871
      if (mndFindSuperTableTagIndex(pOld, pField->name) >= 0) {
2!
1872
        code = TSDB_CODE_MND_TAG_ALREADY_EXIST;
×
1873
        TAOS_RETURN(code);
×
1874
      }
1875

1876
      SSchema *pSchema = &pNew->pColumns[pOld->numOfColumns + i];
2✔
1877
      pSchema->bytes = pField->bytes;
2✔
1878
      pSchema->type = pField->type;
2✔
1879
      memcpy(pSchema->name, pField->name, TSDB_COL_NAME_LEN);
2✔
1880
      pSchema->colId = pNew->nextColId;
2✔
1881
      pNew->nextColId++;
2✔
1882

1883
      SColCmpr *pCmpr = &pNew->pCmpr[pOld->numOfColumns + i];
2✔
1884
      pCmpr->id = pSchema->colId;
2✔
1885
      pCmpr->alg = pField->compress;
2✔
1886
      mInfo("stb:%s, start to add column %s", pNew->name, pSchema->name);
2!
1887
    } else {
1888
      SField *pField = taosArrayGet(pFields, i);
1,211✔
1889
      if (mndFindSuperTableColumnIndex(pOld, pField->name) >= 0) {
1,211✔
1890
        code = TSDB_CODE_MND_COLUMN_ALREADY_EXIST;
497✔
1891
        TAOS_RETURN(code);
497✔
1892
      }
1893

1894
      if (mndFindSuperTableTagIndex(pOld, pField->name) >= 0) {
714✔
1895
        code = TSDB_CODE_MND_TAG_ALREADY_EXIST;
6✔
1896
        TAOS_RETURN(code);
6✔
1897
      }
1898

1899
      SSchema *pSchema = &pNew->pColumns[pOld->numOfColumns + i];
708✔
1900
      pSchema->bytes = pField->bytes;
708✔
1901
      pSchema->type = pField->type;
708✔
1902
      memcpy(pSchema->name, pField->name, TSDB_COL_NAME_LEN);
708✔
1903
      pSchema->colId = pNew->nextColId;
708✔
1904
      pNew->nextColId++;
708✔
1905

1906
      SColCmpr *pCmpr = &pNew->pCmpr[pOld->numOfColumns + i];
708✔
1907
      pCmpr->id = pSchema->colId;
708✔
1908
      pCmpr->alg = createDefaultColCmprByType(pSchema->type);
708✔
1909
      mInfo("stb:%s, start to add column %s", pNew->name, pSchema->name);
708!
1910
    }
1911
  }
1912

1913
  pNew->colVer++;
710✔
1914
  TAOS_RETURN(code);
710✔
1915
}
1916

1917
/*
 * Removes the column named `colName` from the column schema of `pNew`.
 *
 * The column must exist in `pOld`, must not be the column at index 0, and the table must have more than
 * two columns. The column must also pass mndCheckColAndTagModifiable(). On success the column and its
 * compression entry are removed from pNew, numOfColumns is decremented and colVer is bumped.
 *
 * Returns 0 on success, otherwise TSDB_CODE_MND_COLUMN_NOT_EXIST, TSDB_CODE_MND_INVALID_STB_ALTER_OPTION,
 * TSDB_CODE_PAR_INVALID_DROP_COL, or the error of the modifiable check / mndAllocStbSchemas().
 */
static int32_t mndDropSuperTableColumn(SMnode *pMnode, const SStbObj *pOld, SStbObj *pNew, const char *colName) {
  int32_t code = 0;
  int32_t colIdx = mndFindSuperTableColumnIndex(pOld, colName);

  if (colIdx < 0) {
    code = TSDB_CODE_MND_COLUMN_NOT_EXIST;
  } else if (colIdx == 0) {
    // The first column cannot be dropped.
    code = TSDB_CODE_MND_INVALID_STB_ALTER_OPTION;
  } else if (pOld->numOfColumns == 2) {
    // A table with exactly two columns cannot lose another one.
    code = TSDB_CODE_PAR_INVALID_DROP_COL;
  }
  if (code != 0) {
    TAOS_RETURN(code);
  }

  col_id_t dropColId = pOld->pColumns[colIdx].colId;
  TAOS_CHECK_RETURN(mndCheckColAndTagModifiable(pMnode, pOld->name, pOld->uid, dropColId));
  TAOS_CHECK_RETURN(mndAllocStbSchemas(pOld, pNew));

  // Shift the trailing schema and compression entries down by one slot.
  int32_t   trailing = pNew->numOfColumns - colIdx - 1;
  SSchema  *pSchemaSlot = pNew->pColumns + colIdx;
  SColCmpr *pCmprSlot = pNew->pCmpr + colIdx;
  memmove(pSchemaSlot, pSchemaSlot + 1, sizeof(SSchema) * trailing);
  memmove(pCmprSlot, pCmprSlot + 1, sizeof(SColCmpr) * trailing);
  pNew->numOfColumns--;
  pNew->colVer++;

  mInfo("stb:%s, start to drop col %s", pNew->name, colName);
  TAOS_RETURN(code);
}
1949

1950
/*
 * Enlarges the byte length of the column named pField->name to pField->bytes.
 *
 * The column must exist in `pOld`; the total row length with the new size must not exceed
 * TSDB_MAX_BYTES_PER_ROW; the column must be modifiable; its type must be BINARY, VARBINARY, NCHAR or
 * GEOMETRY; and the new length must be strictly larger than the current one. On success the length is
 * updated in pNew and colVer is bumped.
 *
 * Returns 0 on success, otherwise TSDB_CODE_MND_COLUMN_NOT_EXIST, TSDB_CODE_MND_INVALID_ROW_BYTES,
 * TSDB_CODE_MND_INVALID_STB_OPTION, or the error of the modifiable check / mndAllocStbSchemas().
 */
static int32_t mndAlterStbColumnBytes(SMnode *pMnode, const SStbObj *pOld, SStbObj *pNew, const SField *pField) {
  int32_t code = 0;

  int32_t colIdx = mndFindSuperTableColumnIndex(pOld, pField->name);
  if (colIdx < 0) {
    code = TSDB_CODE_MND_COLUMN_NOT_EXIST;
    TAOS_RETURN(code);
  }

  col_id_t targetColId = pOld->pColumns[colIdx].colId;

  // Sum all column lengths, substituting the requested length for the column being altered.
  uint32_t rowLen = 0;
  for (int32_t i = 0; i < pOld->numOfColumns; ++i) {
    const SSchema *pCur = &pOld->pColumns[i];
    if (pCur->colId == targetColId) {
      rowLen += pField->bytes;
    } else {
      rowLen += pCur->bytes;
    }
  }
  if (rowLen > TSDB_MAX_BYTES_PER_ROW) {
    code = TSDB_CODE_MND_INVALID_ROW_BYTES;
    TAOS_RETURN(code);
  }

  TAOS_CHECK_RETURN(mndCheckColAndTagModifiable(pMnode, pOld->name, pOld->uid, targetColId));
  TAOS_CHECK_RETURN(mndAllocStbSchemas(pOld, pNew));

  SSchema *pCol = &pNew->pColumns[colIdx];

  // Only these four types may have their length changed.
  if (pCol->type != TSDB_DATA_TYPE_BINARY && pCol->type != TSDB_DATA_TYPE_VARBINARY &&
      pCol->type != TSDB_DATA_TYPE_NCHAR && pCol->type != TSDB_DATA_TYPE_GEOMETRY) {
    code = TSDB_CODE_MND_INVALID_STB_OPTION;
    TAOS_RETURN(code);
  }

  // The length may only grow.
  if (pField->bytes <= pCol->bytes) {
    code = TSDB_CODE_MND_INVALID_ROW_BYTES;
    TAOS_RETURN(code);
  }

  pCol->bytes = pField->bytes;
  pNew->colVer++;

  mInfo("stb:%s, start to modify col len %s to %d", pNew->name, pField->name, pField->bytes);
  TAOS_RETURN(code);
}
1992

1993
/*
 * Encodes `pStb` and appends it to `pTrans` as a prepare log with status SDB_STATUS_READY.
 *
 * Returns 0 on success. When encoding yields NULL the result is terrno if it is non-zero, otherwise
 * TSDB_CODE_MND_RETURN_VALUE_NULL. When appending fails the raw record is freed and the append error is
 * returned. `pMnode` and `pDb` are not used by this function.
 */
static int32_t mndSetAlterStbPrepareLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
  int32_t code = 0;

  SSdbRaw *pRaw = mndStbActionEncode(pStb);
  if (pRaw == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  code = mndTransAppendPrepareLog(pTrans, pRaw);
  if (code != 0) {
    sdbFreeRaw(pRaw);
    TAOS_RETURN(code);
  }

  // The status is set after the record has been appended to the transaction.
  code = sdbSetRawStatus(pRaw, SDB_STATUS_READY);
  TAOS_RETURN(code);
}
2009

2010
/*
 * Encodes `pStb` and appends it to `pTrans` as a commit log with status SDB_STATUS_READY.
 *
 * Returns 0 on success. When encoding yields NULL the result is terrno if it is non-zero, otherwise
 * TSDB_CODE_MND_RETURN_VALUE_NULL. When appending fails the raw record is freed and the append error is
 * returned. `pMnode` and `pDb` are not used by this function.
 */
static int32_t mndSetAlterStbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
  int32_t code = 0;

  SSdbRaw *pRaw = mndStbActionEncode(pStb);
  if (pRaw == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  code = mndTransAppendCommitlog(pTrans, pRaw);
  if (code != 0) {
    sdbFreeRaw(pRaw);
    TAOS_RETURN(code);
  }

  // The status is set after the record has been appended to the transaction.
  code = sdbSetRawStatus(pRaw, SDB_STATUS_READY);
  TAOS_RETURN(code);
}
2026

2027
/*
 * Appends one TDMT_VND_ALTER_STB redo action to `pTrans` for every vgroup that belongs to `pDb`.
 *
 * For each matching vgroup a request is built with mndBuildVCreateStbReq() from `pStb` and the original
 * alter payload (`alterOriData`, `alterOriDataLen`) and attached to the action.
 *
 * Returns 0 on success. When building a request yields NULL the result is terrno if it is non-zero,
 * otherwise TSDB_CODE_MND_RETURN_VALUE_NULL. When appending fails the request is freed and the append
 * error is returned. On any failure the vgroup iteration is cancelled and the current vgroup released.
 */
static int32_t mndSetAlterStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb, void *alterOriData,
                                         int32_t alterOriDataLen) {
  int32_t code = 0;
  SSdb   *pSdb = pMnode->pSdb;
  SVgObj *pVgroup = NULL;
  void   *pIter = NULL;
  int32_t contLen;

  for (;;) {
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) break;

    // Only vgroups of the target database receive an action; the others are just released.
    if (mndVgroupInDb(pVgroup, pDb->uid)) {
      void *pReq = mndBuildVCreateStbReq(pMnode, pVgroup, pStb, &contLen, alterOriData, alterOriDataLen);
      if (pReq == NULL) {
        sdbCancelFetch(pSdb, pIter);
        sdbRelease(pSdb, pVgroup);
        code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
        TAOS_RETURN(code);
      }

      STransAction action = {0};
      action.msgType = TDMT_VND_ALTER_STB;
      action.pCont = pReq;
      action.contLen = contLen;
      action.epSet = mndGetVgroupEpset(pMnode, pVgroup);

      code = mndTransAppendRedoAction(pTrans, &action);
      if (code != 0) {
        taosMemoryFree(pReq);
        sdbCancelFetch(pSdb, pIter);
        sdbRelease(pSdb, pVgroup);
        TAOS_RETURN(code);
      }
    }

    sdbRelease(pSdb, pVgroup);
  }

  TAOS_RETURN(code);
}
2067

2068
/*
 * Appends one TDMT_VND_CREATE_INDEX redo action to `pTrans` for every vgroup that belongs to `pDb`.
 *
 * For each matching vgroup a request is built with mndBuildVCreateStbReq() from `pStb` and the original
 * alter payload (`alterOriData`, `alterOriDataLen`) and attached to the action.
 *
 * Returns 0 on success. When building a request yields NULL the result is terrno if it is non-zero,
 * otherwise TSDB_CODE_MND_RETURN_VALUE_NULL. When appending fails the request is freed and the append
 * error is returned. On any failure the vgroup iteration is cancelled and the current vgroup released.
 */
static int32_t mndSetAlterStbRedoActions2(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb,
                                          void *alterOriData, int32_t alterOriDataLen) {
  int32_t code = 0;
  SSdb   *pSdb = pMnode->pSdb;
  SVgObj *pVgroup = NULL;
  void   *pIter = NULL;
  int32_t contLen;

  for (;;) {
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) break;

    // Only vgroups of the target database receive an action; the others are just released.
    if (mndVgroupInDb(pVgroup, pDb->uid)) {
      void *pReq = mndBuildVCreateStbReq(pMnode, pVgroup, pStb, &contLen, alterOriData, alterOriDataLen);
      if (pReq == NULL) {
        sdbCancelFetch(pSdb, pIter);
        sdbRelease(pSdb, pVgroup);
        code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
        TAOS_RETURN(code);
      }

      STransAction action = {0};
      action.msgType = TDMT_VND_CREATE_INDEX;
      action.pCont = pReq;
      action.contLen = contLen;
      action.epSet = mndGetVgroupEpset(pMnode, pVgroup);

      code = mndTransAppendRedoAction(pTrans, &action);
      if (code != 0) {
        taosMemoryFree(pReq);
        sdbCancelFetch(pSdb, pIter);
        sdbRelease(pSdb, pVgroup);
        TAOS_RETURN(code);
      }
    }

    sdbRelease(pSdb, pVgroup);
  }

  TAOS_RETURN(code);
}
2108
/*
 * Fills the table-meta response `pRsp` from super table `pStb` while holding the table's read latch.
 *
 * pRsp->pSchemas receives the columns followed by the tags; pRsp->pSchemaExt receives one entry per
 * column carrying its compression setting. Both tbName and stbName of the response are set to `tbName`.
 *
 * Returns 0 on success, or terrno when one of the two allocations fails.
 */
static int32_t mndBuildStbSchemaImp(SDbObj *pDb, SStbObj *pStb, const char *tbName, STableMetaRsp *pRsp) {
  int32_t code = 0;
  taosRLockLatch(&pStb->lock);

  int32_t totalCols = pStb->numOfColumns + pStb->numOfTags;
  pRsp->pSchemas = taosMemoryCalloc(totalCols, sizeof(SSchema));
  if (pRsp->pSchemas == NULL) {
    taosRUnLockLatch(&pStb->lock);
    code = terrno;
    TAOS_RETURN(code);
  }
  pRsp->pSchemaExt = taosMemoryCalloc(pStb->numOfColumns, sizeof(SSchemaExt));
  if (pRsp->pSchemaExt == NULL) {
    taosRUnLockLatch(&pStb->lock);
    code = terrno;
    TAOS_RETURN(code);
  }

  // Names.
  tstrncpy(pRsp->dbFName, pStb->db, sizeof(pRsp->dbFName));
  tstrncpy(pRsp->tbName, tbName, sizeof(pRsp->tbName));
  tstrncpy(pRsp->stbName, tbName, sizeof(pRsp->stbName));

  // Scalar attributes taken from the database and the super table.
  pRsp->dbId = pDb->uid;
  pRsp->precision = pDb->cfg.precision;
  pRsp->tableType = TSDB_SUPER_TABLE;
  pRsp->numOfColumns = pStb->numOfColumns;
  pRsp->numOfTags = pStb->numOfTags;
  pRsp->sversion = pStb->colVer;
  pRsp->tversion = pStb->tagVer;
  pRsp->suid = pStb->uid;
  pRsp->tuid = pStb->uid;

  // Columns occupy the first numOfColumns slots of pSchemas; each also gets a compression entry.
  for (int32_t c = 0; c < pStb->numOfColumns; ++c) {
    SSchema *pSrc = &pStb->pColumns[c];
    SSchema *pDst = &pRsp->pSchemas[c];
    memcpy(pDst->name, pSrc->name, TSDB_COL_NAME_LEN);
    pDst->type = pSrc->type;
    pDst->flags = pSrc->flags;
    pDst->colId = pSrc->colId;
    pDst->bytes = pSrc->bytes;

    SColCmpr   *pCmpr = &pStb->pCmpr[c];
    SSchemaExt *pExt = &pRsp->pSchemaExt[c];
    pExt->colId = pCmpr->id;
    pExt->compress = pCmpr->alg;
  }

  // Tags follow the columns in pSchemas.
  for (int32_t t = 0; t < pStb->numOfTags; ++t) {
    SSchema *pSrc = &pStb->pTags[t];
    SSchema *pDst = &pRsp->pSchemas[pStb->numOfColumns + t];
    memcpy(pDst->name, pSrc->name, TSDB_COL_NAME_LEN);
    pDst->type = pSrc->type;
    pDst->flags = pSrc->flags;
    pDst->colId = pSrc->colId;
    pDst->bytes = pSrc->bytes;
  }

  taosRUnLockLatch(&pStb->lock);
  TAOS_RETURN(code);
}
2168

2169
/*
 * Fills the table-config response `pRsp` from super table `pStb` while holding the table's read latch.
 *
 * pRsp->pSchemas receives the columns followed by the tags; pRsp->pSchemaExt receives one entry per
 * column carrying its compression setting. The comment is duplicated when commentLen > 0 and the
 * function list is duplicated when numOfFuncs > 0. Both tbName and stbName of the response are set to
 * `tbName`. `pDb` is not used by this function.
 *
 * Returns 0 on success, or terrno when allocating pSchemas or pSchemaExt fails.
 */
static int32_t mndBuildStbCfgImp(SDbObj *pDb, SStbObj *pStb, const char *tbName, STableCfgRsp *pRsp) {
  int32_t code = 0;
  taosRLockLatch(&pStb->lock);

  int32_t totalCols = pStb->numOfColumns + pStb->numOfTags;
  pRsp->pSchemas = taosMemoryCalloc(totalCols, sizeof(SSchema));
  if (pRsp->pSchemas == NULL) {
    taosRUnLockLatch(&pStb->lock);
    code = terrno;
    TAOS_RETURN(code);
  }

  tstrncpy(pRsp->dbFName, pStb->db, sizeof(pRsp->dbFName));
  tstrncpy(pRsp->tbName, tbName, sizeof(pRsp->tbName));
  tstrncpy(pRsp->stbName, tbName, sizeof(pRsp->stbName));
  pRsp->numOfTags = pStb->numOfTags;
  pRsp->numOfColumns = pStb->numOfColumns;
  pRsp->tableType = TSDB_SUPER_TABLE;
  pRsp->delay1 = pStb->maxdelay[0];
  pRsp->delay2 = pStb->maxdelay[1];
  pRsp->watermark1 = pStb->watermark[0];
  pRsp->watermark2 = pStb->watermark[1];
  pRsp->ttl = pStb->ttl;
  pRsp->commentLen = pStb->commentLen;
  if (pStb->commentLen > 0) {
    pRsp->pComment = taosStrdup(pStb->comment);
  }

  // Columns occupy the first numOfColumns slots of pSchemas.
  for (int32_t i = 0; i < pStb->numOfColumns; ++i) {
    SSchema *pSchema = &pRsp->pSchemas[i];
    SSchema *pSrcSchema = &pStb->pColumns[i];
    memcpy(pSchema->name, pSrcSchema->name, TSDB_COL_NAME_LEN);
    pSchema->type = pSrcSchema->type;
    pSchema->flags = pSrcSchema->flags;
    pSchema->colId = pSrcSchema->colId;
    pSchema->bytes = pSrcSchema->bytes;
  }

  // Tags follow the columns in pSchemas.
  for (int32_t i = 0; i < pStb->numOfTags; ++i) {
    SSchema *pSchema = &pRsp->pSchemas[i + pStb->numOfColumns];
    SSchema *pSrcSchema = &pStb->pTags[i];
    memcpy(pSchema->name, pSrcSchema->name, TSDB_COL_NAME_LEN);
    pSchema->type = pSrcSchema->type;
    pSchema->flags = pSrcSchema->flags;
    pSchema->colId = pSrcSchema->colId;
    pSchema->bytes = pSrcSchema->bytes;
  }

  if (pStb->numOfFuncs > 0) {
    pRsp->pFuncs = taosArrayDup(pStb->pFuncs, NULL);
  }

  pRsp->pSchemaExt = taosMemoryCalloc(pStb->numOfColumns, sizeof(SSchemaExt));
  if (pRsp->pSchemaExt == NULL) {
    // Without this check the loop below would write through a NULL pointer when the allocation fails.
    // The error is reported the same way as for pSchemas above.
    taosRUnLockLatch(&pStb->lock);
    code = terrno;
    TAOS_RETURN(code);
  }
  for (int32_t i = 0; i < pStb->numOfColumns; i++) {
    SColCmpr *pCmpr = &pStb->pCmpr[i];

    SSchemaExt *pSchExt = &pRsp->pSchemaExt[i];
    pSchExt->colId = pCmpr->id;
    pSchExt->compress = pCmpr->alg;
  }

  taosRUnLockLatch(&pStb->lock);
  TAOS_RETURN(code);
}
2233

2234
/*
 * Compares the versions in `pStbVer` against the current super table.
 *
 * *schema is set to true when the column version or the tag version differs from the table's.
 * *sma is set to true when pStbVer->smaVer is non-zero and differs from the table's smaVer.
 *
 * Returns TSDB_CODE_SUCCESS when the comparison was made; TSDB_CODE_MND_DB_NOT_SELECTED when the
 * database cannot be acquired or its uid does not match pStbVer->dbId; TSDB_CODE_PAR_TABLE_NOT_EXIST
 * when the super table cannot be acquired. The outputs are only written on success.
 */
static int32_t mndValidateStbVersion(SMnode *pMnode, SSTableVersion *pStbVer, bool *schema, bool *sma) {
  int32_t code = 0;
  char    tbFName[TSDB_TABLE_FNAME_LEN] = {0};
  snprintf(tbFName, sizeof(tbFName), "%s.%s", pStbVer->dbFName, pStbVer->stbName);

  SDbObj *pDb = mndAcquireDb(pMnode, pStbVer->dbFName);
  if (pDb == NULL) {
    code = TSDB_CODE_MND_DB_NOT_SELECTED;
    TAOS_RETURN(code);
  }

  // A different uid means the database is not the one the caller's version refers to.
  if (pDb->uid != pStbVer->dbId) {
    mndReleaseDb(pMnode, pDb);
    code = TSDB_CODE_MND_DB_NOT_SELECTED;
    TAOS_RETURN(code);
  }

  SStbObj *pStb = mndAcquireStb(pMnode, tbFName);
  if (pStb == NULL) {
    mndReleaseDb(pMnode, pDb);
    code = TSDB_CODE_PAR_TABLE_NOT_EXIST;
    TAOS_RETURN(code);
  }

  taosRLockLatch(&pStb->lock);
  *schema = (pStbVer->sversion != pStb->colVer || pStbVer->tversion != pStb->tagVer);
  *sma = (pStbVer->smaVer && pStbVer->smaVer != pStb->smaVer);
  taosRUnLockLatch(&pStb->lock);

  mndReleaseDb(pMnode, pDb);
  mndReleaseStb(pMnode, pStb);
  return TSDB_CODE_SUCCESS;
}
2278

2279
// Acquires the database dbFName and the super table "<dbFName>.<tbName>", then fills pRsp through
// mndBuildStbSchemaImp. Both objects are released before returning.
// Returns TSDB_CODE_MND_DB_NOT_SELECTED when the database cannot be acquired,
// TSDB_CODE_PAR_TABLE_NOT_EXIST when the table cannot be acquired, otherwise the result of
// mndBuildStbSchemaImp.
static int32_t mndBuildStbSchema(SMnode *pMnode, const char *dbFName, const char *tbName, STableMetaRsp *pRsp) {
  char stbFullName[TSDB_TABLE_FNAME_LEN] = {0};
  snprintf(stbFullName, sizeof(stbFullName), "%s.%s", dbFName, tbName);

  SDbObj *pDb = mndAcquireDb(pMnode, dbFName);
  if (pDb == NULL) {
    TAOS_RETURN(TSDB_CODE_MND_DB_NOT_SELECTED);
  }

  int32_t  code = TSDB_CODE_PAR_TABLE_NOT_EXIST;
  SStbObj *pStb = mndAcquireStb(pMnode, stbFullName);
  if (pStb != NULL) {
    code = mndBuildStbSchemaImp(pDb, pStb, tbName, pRsp);
  }

  mndReleaseDb(pMnode, pDb);
  if (pStb != NULL) {
    mndReleaseStb(pMnode, pStb);
  }
  TAOS_RETURN(code);
}
2302

2303
// Acquires the database dbFName and the super table "<dbFName>.<tbName>", then fills pRsp through
// mndBuildStbCfgImp. Both objects are released before returning.
// Returns TSDB_CODE_MND_DB_NOT_SELECTED when the database cannot be acquired,
// TSDB_CODE_PAR_TABLE_NOT_EXIST when the table cannot be acquired, otherwise the result of
// mndBuildStbCfgImp.
static int32_t mndBuildStbCfg(SMnode *pMnode, const char *dbFName, const char *tbName, STableCfgRsp *pRsp) {
  char stbFullName[TSDB_TABLE_FNAME_LEN] = {0};
  snprintf(stbFullName, sizeof(stbFullName), "%s.%s", dbFName, tbName);

  SDbObj *pDb = mndAcquireDb(pMnode, dbFName);
  if (pDb == NULL) {
    TAOS_RETURN(TSDB_CODE_MND_DB_NOT_SELECTED);
  }

  int32_t  code = TSDB_CODE_PAR_TABLE_NOT_EXIST;
  SStbObj *pStb = mndAcquireStb(pMnode, stbFullName);
  if (pStb != NULL) {
    code = mndBuildStbCfgImp(pDb, pStb, tbName, pRsp);
  }

  mndReleaseDb(pMnode, pDb);
  if (pStb != NULL) {
    mndReleaseStb(pMnode, pStb);
  }
  TAOS_RETURN(code);
}
2327

2328
// Builds the encoded SMAlterStbRsp for super table pObj.
//   pDb, pObj - database and super table passed on to mndBuildStbSchemaImp
//   pCont     - out: buffer from taosMemoryMalloc holding the encoded response
//   pLen      - out: number of bytes in *pCont
// The out parameters are written only when encoding succeeds. On every failure path all memory
// allocated here is freed, including the output buffer when the final encode step fails.
static int32_t mndBuildSMAlterStbRsp(SDbObj *pDb, SStbObj *pObj, void **pCont, int32_t *pLen) {
  int32_t       code = 0;
  SEncoder      ec = {0};
  uint32_t      contLen = 0;
  SMAlterStbRsp alterRsp = {0};
  SName         name = {0};
  TAOS_CHECK_RETURN(tNameFromString(&name, pObj->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE));

  alterRsp.pMeta = taosMemoryCalloc(1, sizeof(STableMetaRsp));
  if (NULL == alterRsp.pMeta) {
    code = terrno;
    TAOS_RETURN(code);
  }

  code = mndBuildStbSchemaImp(pDb, pObj, name.tname, alterRsp.pMeta);
  if (code) {
    tFreeSMAlterStbRsp(&alterRsp);
    return code;
  }

  // First pass: compute the encoded size only.
  tEncodeSize(tEncodeSMAlterStbRsp, &alterRsp, contLen, code);
  if (code) {
    tFreeSMAlterStbRsp(&alterRsp);
    return code;
  }

  void *cont = taosMemoryMalloc(contLen);
  if (NULL == cont) {
    code = terrno;
    tFreeSMAlterStbRsp(&alterRsp);
    TAOS_RETURN(code);
  }

  // Second pass: encode into the allocated buffer.
  tEncoderInit(&ec, cont, contLen);
  code = tEncodeSMAlterStbRsp(&ec, &alterRsp);
  tEncoderClear(&ec);

  tFreeSMAlterStbRsp(&alterRsp);

  if (code < 0) {
    // The buffer has not been handed to the caller yet, so it must be released here.
    taosMemoryFree(cont);
    TAOS_RETURN(code);
  }

  *pCont = cont;
  *pLen = contLen;

  TAOS_RETURN(code);
}
2373

2374
// Builds the encoded SMCreateStbRsp for the super table stbFName in database dbFName.
//   pCont - out: buffer from taosMemoryMalloc holding the encoded response
//   pLen  - out: number of bytes in *pCont
// The out parameters are written only on success. When the database or the super table cannot be
// acquired, terrno is returned if it is set, otherwise TSDB_CODE_MND_RETURN_VALUE_NULL.
// Both acquired objects are released before returning.
int32_t mndBuildSMCreateStbRsp(SMnode *pMnode, char *dbFName, char *stbFName, void **pCont, int32_t *pLen) {
  int32_t code = -1;
  SDbObj *pDb = mndAcquireDb(pMnode, dbFName);
  if (NULL == pDb) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  SStbObj *pObj = mndAcquireStb(pMnode, stbFName);
  if (NULL == pObj) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }

  SEncoder       ec = {0};
  uint32_t       contLen = 0;
  SMCreateStbRsp stbRsp = {0};
  SName          name = {0};
  TAOS_CHECK_GOTO(tNameFromString(&name, pObj->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE), NULL, _OVER);

  stbRsp.pMeta = taosMemoryCalloc(1, sizeof(STableMetaRsp));
  if (NULL == stbRsp.pMeta) {
    code = terrno;
    goto _OVER;
  }

  code = mndBuildStbSchemaImp(pDb, pObj, name.tname, stbRsp.pMeta);
  if (code) {
    tFreeSMCreateStbRsp(&stbRsp);
    goto _OVER;
  }

  // First pass: compute the encoded size only.
  tEncodeSize(tEncodeSMCreateStbRsp, &stbRsp, contLen, code);
  if (code) {
    tFreeSMCreateStbRsp(&stbRsp);
    goto _OVER;
  }

  void *cont = taosMemoryMalloc(contLen);
  if (NULL == cont) {
    code = terrno;
    tFreeSMCreateStbRsp(&stbRsp);
    goto _OVER;
  }

  // Second pass: encode into the buffer. The encoder and the response are always cleaned up,
  // and the buffer is released if encoding failed, so no path to _OVER leaks memory.
  tEncoderInit(&ec, cont, contLen);
  code = tEncodeSMCreateStbRsp(&ec, &stbRsp);
  tEncoderClear(&ec);

  tFreeSMCreateStbRsp(&stbRsp);

  if (code < 0) {
    taosMemoryFree(cont);
    goto _OVER;
  }

  *pCont = cont;
  *pLen = contLen;

  code = 0;

_OVER:
  if (pObj) {
    mndReleaseStb(pMnode, pObj);
  }

  if (pDb) {
    mndReleaseDb(pMnode, pDb);
  }

  TAOS_RETURN(code);
}
2442

2443
// Creates an "alter-stb" transaction for the already-modified super table pStb and prepares it.
//   needRsp                       - when true, an SMAlterStbRsp is built and attached to the
//                                   transaction via mndTransSetRpcRsp
//   alterOriData, alterOriDataLen - forwarded unchanged to mndSetAlterStbRedoActions
// Returns 0 once mndTransPrepare succeeds, otherwise the code of the first failing step.
// The local transaction handle is dropped on every path.
static int32_t mndAlterStbImp(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *pStb, bool needRsp,
                              void *alterOriData, int32_t alterOriDataLen) {
  int32_t code = -1;
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, "alter-stb");
  if (pTrans == NULL) {
    // Prefer the specific error left in terrno when there is one.
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto _OVER;
  }

  mInfo("trans:%d, used to alter stb:%s", pTrans->id, pStb->name);
  mndTransSetDbName(pTrans, pDb->name, pStb->name);
  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER);

  if (needRsp) {
    void   *pRspCont = NULL;
    int32_t rspLen = 0;
    TAOS_CHECK_GOTO(mndBuildSMAlterStbRsp(pDb, pStb, &pRspCont, &rspLen), NULL, _OVER);
    mndTransSetRpcRsp(pTrans, pRspCont, rspLen);
  }

  TAOS_CHECK_GOTO(mndSetAlterStbPrepareLogs(pMnode, pTrans, pDb, pStb), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetAlterStbCommitLogs(pMnode, pTrans, pDb, pStb), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetAlterStbRedoActions(pMnode, pTrans, pDb, pStb, alterOriData, alterOriDataLen), NULL, _OVER);
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);

  code = 0;

_OVER:
  mndTransDrop(pTrans);
  TAOS_RETURN(code);
}
2475

2476
// Variant of the alter-stb transaction used when a tag is dropped or renamed: in addition to the
// super table logs it appends logs for the tag index found by mndGetIdxsByTagName, if any.
//   TSDB_ALTER_TABLE_DROP_TAG        - the index on the dropped tag gets drop logs
//   TSDB_ALTER_TABLE_UPDATE_TAG_NAME - the index's colName is rewritten to the new tag name and
//                                      it gets alter logs
// For any other alterType nothing beyond the optional response is added and 0 is returned.
// The local transaction handle is dropped on every path.
static int32_t mndAlterStbAndUpdateTagIdxImp(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *pStb, bool needRsp,
                                             void *alterOriData, int32_t alterOriDataLen, const SMAlterStbReq *pAlter) {
  int32_t code = -1;
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, "alter-stb");
  if (pTrans == NULL) {
    // Prefer the specific error left in terrno when there is one.
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto _OVER;
  }

  mInfo("trans:%d, used to alter stb:%s", pTrans->id, pStb->name);
  mndTransSetDbName(pTrans, pDb->name, pStb->name);

  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER);

  if (needRsp) {
    void   *pRspCont = NULL;
    int32_t rspLen = 0;
    TAOS_CHECK_GOTO(mndBuildSMAlterStbRsp(pDb, pStb, &pRspCont, &rspLen), NULL, _OVER);
    mndTransSetRpcRsp(pTrans, pRspCont, rspLen);
  }

  switch (pAlter->alterType) {
    case TSDB_ALTER_TABLE_DROP_TAG: {
      SIdxObj idxObj = {0};
      SField *pTag = taosArrayGet(pAlter->pFields, 0);
      bool    hasIdx = (mndGetIdxsByTagName(pMnode, pStb, pTag->name, &idxObj) == 0);

      TAOS_CHECK_GOTO(mndSetAlterStbPrepareLogs(pMnode, pTrans, pDb, pStb), NULL, _OVER);
      TAOS_CHECK_GOTO(mndSetAlterStbCommitLogs(pMnode, pTrans, pDb, pStb), NULL, _OVER);

      if (hasIdx) {
        TAOS_CHECK_GOTO(mndSetDropIdxPrepareLogs(pMnode, pTrans, &idxObj), NULL, _OVER);
        TAOS_CHECK_GOTO(mndSetDropIdxCommitLogs(pMnode, pTrans, &idxObj), NULL, _OVER);
      }

      TAOS_CHECK_GOTO(mndSetAlterStbRedoActions(pMnode, pTrans, pDb, pStb, alterOriData, alterOriDataLen), NULL,
                      _OVER);
      TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
      break;
    }
    case TSDB_ALTER_TABLE_UPDATE_TAG_NAME: {
      SIdxObj     idxObj = {0};
      SField     *pOldTag = taosArrayGet(pAlter->pFields, 0);
      SField     *pNewTag = taosArrayGet(pAlter->pFields, 1);
      const char *newTagName = pNewTag->name;
      bool        hasIdx = (mndGetIdxsByTagName(pMnode, pStb, pOldTag->name, &idxObj) == 0);

      TAOS_CHECK_GOTO(mndSetAlterStbPrepareLogs(pMnode, pTrans, pDb, pStb), NULL, _OVER);
      TAOS_CHECK_GOTO(mndSetAlterStbCommitLogs(pMnode, pTrans, pDb, pStb), NULL, _OVER);

      if (hasIdx) {
        // Point the index at the renamed tag before logging it.
        size_t newNameLen = strlen(newTagName);
        memcpy(idxObj.colName, newTagName, newNameLen);
        idxObj.colName[newNameLen] = 0;
        TAOS_CHECK_GOTO(mndSetAlterIdxPrepareLogs(pMnode, pTrans, &idxObj), NULL, _OVER);
        TAOS_CHECK_GOTO(mndSetAlterIdxCommitLogs(pMnode, pTrans, &idxObj), NULL, _OVER);
      }

      TAOS_CHECK_GOTO(mndSetAlterStbRedoActions(pMnode, pTrans, pDb, pStb, alterOriData, alterOriDataLen), NULL,
                      _OVER);
      TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
      break;
    }
    default:
      break;
  }
  code = 0;

_OVER:
  mndTransDrop(pTrans);
  TAOS_RETURN(code);
}
2547

2548
// Applies the alteration described by pAlter to a working copy of pOld and submits it as a
// transaction.
// The copy starts as a shallow memcpy of *pOld taken under pOld's read latch. Its pColumns, pTags,
// pFuncs and pCmpr pointers are then cleared; whatever the per-type helper leaves in pTags,
// pColumns and pCmpr is freed at _OVER.
// Tag drop and tag rename go through mndAlterStbAndUpdateTagIdxImp; every other type goes through
// mndAlterStbImp.
// Returns 0 when the transaction has been prepared, TSDB_CODE_OPS_NOT_SUPPORT for an unknown
// alterType, otherwise the code of the failing step.
static int32_t mndAlterStb(SMnode *pMnode, SRpcMsg *pReq, const SMAlterStbReq *pAlter, SDbObj *pDb, SStbObj *pOld) {
  bool    needRsp = true;
  int32_t code = -1;
  SField *pField0 = NULL;

  SStbObj stbObj = {0};
  taosRLockLatch(&pOld->lock);
  memcpy(&stbObj, pOld, sizeof(SStbObj));
  taosRUnLockLatch(&pOld->lock);
  // Drop the array pointers copied from pOld so the cleanup at _OVER cannot free pOld's arrays.
  stbObj.pColumns = NULL;
  stbObj.pTags = NULL;
  stbObj.pFuncs = NULL;
  stbObj.pCmpr = NULL;
  stbObj.updateTime = taosGetTimestampMs();
  stbObj.lock = 0;
  bool updateTagIndex = false;
  switch (pAlter->alterType) {
    case TSDB_ALTER_TABLE_ADD_TAG:
      code = mndAddSuperTableTag(pOld, &stbObj, pAlter->pFields, pAlter->numOfFields);
      break;
    case TSDB_ALTER_TABLE_DROP_TAG:
      pField0 = taosArrayGet(pAlter->pFields, 0);
      code = mndDropSuperTableTag(pMnode, pOld, &stbObj, pField0->name);
      updateTagIndex = true;
      break;
    case TSDB_ALTER_TABLE_UPDATE_TAG_NAME:
      code = mndAlterStbTagName(pMnode, pOld, &stbObj, pAlter->pFields);
      updateTagIndex = true;
      break;
    case TSDB_ALTER_TABLE_UPDATE_TAG_BYTES:
      pField0 = taosArrayGet(pAlter->pFields, 0);
      code = mndAlterStbTagBytes(pMnode, pOld, &stbObj, pField0);
      break;
    case TSDB_ALTER_TABLE_ADD_COLUMN:
      code = mndAddSuperTableColumn(pOld, &stbObj, pAlter->pFields, pAlter->numOfFields, 0);
      break;
    case TSDB_ALTER_TABLE_DROP_COLUMN:
      pField0 = taosArrayGet(pAlter->pFields, 0);
      code = mndDropSuperTableColumn(pMnode, pOld, &stbObj, pField0->name);
      break;
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES:
      pField0 = taosArrayGet(pAlter->pFields, 0);
      code = mndAlterStbColumnBytes(pMnode, pOld, &stbObj, pField0);
      break;
    case TSDB_ALTER_TABLE_UPDATE_OPTIONS:
      needRsp = false;
      code = mndUpdateStbCommentAndTTL(pOld, &stbObj, pAlter->comment, pAlter->commentLen, pAlter->ttl);
      break;
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_COMPRESS:
      code = mndUpdateSuperTableColumnCompress(pMnode, pOld, &stbObj, pAlter->pFields, pAlter->numOfFields);
      break;
    case TSDB_ALTER_TABLE_ADD_COLUMN_WITH_COMPRESS_OPTION:
      code = mndAddSuperTableColumn(pOld, &stbObj, pAlter->pFields, pAlter->numOfFields, 1);
      break;
    default:
      needRsp = false;
      // Set the code itself: TAOS_RETURN(code) below overwrites terrno, so assigning only terrno
      // here would leave the caller with a bare -1.
      code = TSDB_CODE_OPS_NOT_SUPPORT;
      terrno = code;
      break;
  }

  if (code != 0) goto _OVER;
  if (updateTagIndex == false) {
    code = mndAlterStbImp(pMnode, pReq, pDb, &stbObj, needRsp, pReq->pCont, pReq->contLen);
  } else {
    code = mndAlterStbAndUpdateTagIdxImp(pMnode, pReq, pDb, &stbObj, needRsp, pReq->pCont, pReq->contLen, pAlter);
  }

_OVER:
  taosMemoryFreeClear(stbObj.pTags);
  taosMemoryFreeClear(stbObj.pColumns);
  taosMemoryFreeClear(stbObj.pCmpr);
  // NOTE(review): stbObj.comment is freed only when the request carries a comment; this presumably
  // relies on mndUpdateStbCommentAndTTL having replaced the pointer copied from pOld - confirm.
  if (pAlter->commentLen > 0) {
    taosMemoryFreeClear(stbObj.comment);
  }
  TAOS_RETURN(code);
}
2624

2625
// Handler for an alter-super-table request.
// Deserializes the request, validates it, acquires the database and super table, checks
// MND_OPER_WRITE_DB privilege, runs mndAlterStb and writes an audit record.
// Returns TSDB_CODE_ACTION_IN_PROGRESS when mndAlterStb returns 0, otherwise the error code of
// the failing step. The acquired objects and the decoded request are released on every path.
static int32_t mndProcessAlterStbReq(SRpcMsg *pReq) {
  SMnode       *pMnode = pReq->info.node;
  int32_t       code = -1;
  SDbObj       *pDb = NULL;
  SStbObj      *pStb = NULL;
  SMAlterStbReq alterReq = {0};

  if (tDeserializeSMAlterStbReq(pReq->pCont, pReq->contLen, &alterReq) != 0) {
    code = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }

  mInfo("stb:%s, start to alter", alterReq.name);
  // Keep the validator's own error code instead of falling through with the generic -1.
  if ((code = mndCheckAlterStbReq(&alterReq)) != 0) goto _OVER;

  pDb = mndAcquireDbByStb(pMnode, alterReq.name);
  if (pDb == NULL) {
    code = TSDB_CODE_MND_DB_NOT_EXIST;
    goto _OVER;
  }

  pStb = mndAcquireStb(pMnode, alterReq.name);
  if (pStb == NULL) {
    code = TSDB_CODE_MND_STB_NOT_EXIST;
    goto _OVER;
  }

  if ((code = mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb)) != 0) {
    goto _OVER;
  }

  code = mndAlterStb(pMnode, pReq, &alterReq, pDb, pStb);
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;

  // The audit record is written whether or not mndAlterStb succeeded.
  SName   name = {0};
  int32_t ret = 0;
  if ((ret = tNameFromString(&name, alterReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE)) != 0) {
    mError("stb:%s, failed to tNameFromString since %s", alterReq.name, tstrerror(ret));
  }

  auditRecord(pReq, pMnode->clusterId, "alterStb", name.dbname, name.tname, alterReq.sql, alterReq.sqlLen);

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("stb:%s, failed to alter since %s", alterReq.name, tstrerror(code));
  }

  mndReleaseStb(pMnode, pStb);
  mndReleaseDb(pMnode, pDb);
  tFreeSMAltertbReq(&alterReq);

  TAOS_RETURN(code);
}
2677

2678
// Encodes pStb and appends it to pTrans as a prepare log, then marks the raw record
// SDB_STATUS_DROPPING.
// Returns terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when terrno is 0) if encoding fails, the
// append error if appending fails (the raw record is freed in that case), otherwise the result
// of sdbSetRawStatus.
static int32_t mndSetDropStbPrepareLogs(SMnode *pMnode, STrans *pTrans, SStbObj *pStb) {
  SSdbRaw *pRaw = mndStbActionEncode(pStb);
  if (pRaw == NULL) {
    int32_t encodeErr = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(encodeErr);
  }

  int32_t code = mndTransAppendPrepareLog(pTrans, pRaw);
  if (code != 0) {
    sdbFreeRaw(pRaw);
    TAOS_RETURN(code);
  }

  TAOS_CHECK_RETURN(sdbSetRawStatus(pRaw, SDB_STATUS_DROPPING));

  TAOS_RETURN(code);
}
2694

2695
// Encodes pStb and appends it to pTrans as a commit log, then marks the raw record
// SDB_STATUS_DROPPED.
// Returns terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when terrno is 0) if encoding fails, the
// append error if appending fails (the raw record is freed in that case), otherwise the result
// of sdbSetRawStatus.
static int32_t mndSetDropStbCommitLogs(SMnode *pMnode, STrans *pTrans, SStbObj *pStb) {
  int32_t  code = 0;
  SSdbRaw *pCommitRaw = mndStbActionEncode(pStb);
  if (pCommitRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }
  // Capture the append result; returning the still-zero code here would report success.
  if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
    sdbFreeRaw(pCommitRaw);
    TAOS_RETURN(code);
  }
  TAOS_CHECK_RETURN(sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED));

  TAOS_RETURN(code);
}
2711

2712
// Walks every vgroup in the sdb and, for each one that mndVgroupInDb reports as belonging to pDb,
// appends a TDMT_VND_DROP_STB redo action for pStb to pTrans.
// TSDB_CODE_TDB_STB_NOT_EXIST is registered as an acceptable code for each action.
// Returns TSDB_CODE_OUT_OF_MEMORY when a request cannot be built, the append error when appending
// fails, otherwise 0. The fetch is cancelled and the current vgroup released on early exit.
static int32_t mndSetDropStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
  int32_t code = 0;
  SSdb   *pSdb = pMnode->pSdb;
  SVgObj *pVgroup = NULL;
  void   *pIter = NULL;

  while ((pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup)) != NULL) {
    if (mndVgroupInDb(pVgroup, pDb->uid)) {
      int32_t reqLen = 0;
      void   *pDropReq = mndBuildVDropStbReq(pMnode, pVgroup, pStb, &reqLen);
      if (pDropReq == NULL) {
        sdbCancelFetch(pSdb, pIter);
        sdbRelease(pSdb, pVgroup);
        TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
      }

      STransAction action = {
          .epSet = mndGetVgroupEpset(pMnode, pVgroup),
          .pCont = pDropReq,
          .contLen = reqLen,
          .msgType = TDMT_VND_DROP_STB,
          .acceptableCode = TSDB_CODE_TDB_STB_NOT_EXIST,
      };
      code = mndTransAppendRedoAction(pTrans, &action);
      if (code != 0) {
        // The request was not handed to the transaction, so free it here.
        taosMemoryFree(pDropReq);
        sdbCancelFetch(pSdb, pIter);
        sdbRelease(pSdb, pVgroup);
        TAOS_RETURN(code);
      }
    }
    sdbRelease(pSdb, pVgroup);
  }

  TAOS_RETURN(code);
}
2752

2753
// Creates a "drop-stb" transaction for pStb, fills it with the prepare/commit logs, the vnode
// redo actions, the index and SMA drops and the user-privilege cleanup, then prepares it.
// Returns 0 once mndTransPrepare succeeds, otherwise the code of the first failing step.
// The local transaction handle is dropped on every path.
static int32_t mndDropStb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *pStb) {
  int32_t code = -1;
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, "drop-stb");
  if (pTrans == NULL) {
    // Prefer the specific error left in terrno when there is one.
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto _OVER;
  }

  mInfo("trans:%d, used to drop stb:%s", pTrans->id, pStb->name);
  mndTransSetDbName(pTrans, pDb->name, pStb->name);
  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER);

  // Logs and actions for the super table itself.
  TAOS_CHECK_GOTO(mndSetDropStbPrepareLogs(pMnode, pTrans, pStb), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetDropStbCommitLogs(pMnode, pTrans, pStb), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetDropStbRedoActions(pMnode, pTrans, pDb, pStb), NULL, _OVER);

  // Dependent objects.
  TAOS_CHECK_GOTO(mndDropIdxsByStb(pMnode, pTrans, pDb, pStb), NULL, _OVER);
  TAOS_CHECK_GOTO(mndDropSmasByStb(pMnode, pTrans, pDb, pStb), NULL, _OVER);
  TAOS_CHECK_GOTO(mndUserRemoveStb(pMnode, pTrans, pStb->name), NULL, _OVER);

  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
  code = 0;

_OVER:
  mndTransDrop(pTrans);
  TAOS_RETURN(code);
}
2779

2780
// Scans all topics to decide whether the super table with uid suid is still referenced.
// Returns -1 when a TOPIC_SUB_TYPE__TABLE topic has stbUid == suid, or when the first column
// collected from a topic's AST has tableId == suid.
// Returns TSDB_CODE_MND_INVALID_TOPIC_OPTION when a topic's AST string cannot be parsed, the
// nodesCollectColumns error when column collection fails, and 0 when no reference is found.
// stbFullName is not used by this function.
static int32_t mndCheckDropStbForTopic(SMnode *pMnode, const char *stbFullName, int64_t suid) {
  int32_t code = 0;
  SSdb   *pSdb = pMnode->pSdb;
  void   *pIter = NULL;
  while (1) {
    SMqTopicObj *pTopic = NULL;
    pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pTopic);
    if (pIter == NULL) break;

    if (pTopic->subType == TOPIC_SUB_TYPE__TABLE) {
      if (pTopic->stbUid == suid) {
        sdbRelease(pSdb, pTopic);
        sdbCancelFetch(pSdb, pIter);
        TAOS_RETURN(-1);
      }
    }

    // Topics without an AST have nothing further to inspect.
    if (pTopic->ast == NULL) {
      sdbRelease(pSdb, pTopic);
      continue;
    }

    SNode *pAst = NULL;
    if (nodesStringToNode(pTopic->ast, &pAst) != 0) {
      code = TSDB_CODE_MND_INVALID_TOPIC_OPTION;
      mError("topic:%s, create ast error", pTopic->name);
      sdbRelease(pSdb, pTopic);
      sdbCancelFetch(pSdb, pIter);
      TAOS_RETURN(code);
    }

    SNodeList *pNodeList = NULL;
    if ((code = nodesCollectColumns((SSelectStmt *)pAst, SQL_CLAUSE_FROM, NULL, COLLECT_COL_TYPE_ALL, &pNodeList)) !=
        0) {
      sdbRelease(pSdb, pTopic);
      // The AST was parsed successfully above and must be destroyed on this error path too.
      nodesDestroyNode(pAst);
      sdbCancelFetch(pSdb, pIter);
      TAOS_RETURN(code);
    }
    SNode *pNode = NULL;
    // Both branches leave the loop, so only the first collected column is examined.
    FOREACH(pNode, pNodeList) {
      SColumnNode *pCol = (SColumnNode *)pNode;

      if (pCol->tableId == suid) {
        sdbRelease(pSdb, pTopic);
        nodesDestroyNode(pAst);
        nodesDestroyList(pNodeList);
        sdbCancelFetch(pSdb, pIter);
        TAOS_RETURN(-1);
      } else {
        goto NEXT;
      }
    }
  NEXT:
    sdbRelease(pSdb, pTopic);
    nodesDestroyNode(pAst);
    nodesDestroyList(pNodeList);
  }
  TAOS_RETURN(code);
}
2839

2840
// Scans all streams to decide whether the super table with uid suid is still referenced.
// Returns -1 when a stream has targetStbUid == suid, or when the first column collected from a
// stream's AST has tableId == suid.
// Returns TSDB_CODE_MND_INVALID_STREAM_OPTION when a stream's AST string cannot be parsed, the
// nodesCollectColumns error when column collection fails, and 0 when no reference is found.
// stbFullName is not used by this function.
static int32_t mndCheckDropStbForStream(SMnode *pMnode, const char *stbFullName, int64_t suid) {
  int32_t code = 0;
  SSdb   *pSdb = pMnode->pSdb;
  void   *pIter = NULL;
  while (1) {
    SStreamObj *pStream = NULL;
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
    if (pIter == NULL) break;

    if (pStream->targetStbUid == suid) {
      sdbCancelFetch(pSdb, pIter);
      sdbRelease(pSdb, pStream);
      TAOS_RETURN(-1);
    }

    SNode *pAst = NULL;
    if (nodesStringToNode(pStream->ast, &pAst) != 0) {
      code = TSDB_CODE_MND_INVALID_STREAM_OPTION;
      mError("stream:%s, create ast error", pStream->name);
      sdbCancelFetch(pSdb, pIter);
      sdbRelease(pSdb, pStream);
      TAOS_RETURN(code);
    }

    SNodeList *pNodeList = NULL;
    if ((code = nodesCollectColumns((SSelectStmt *)pAst, SQL_CLAUSE_FROM, NULL, COLLECT_COL_TYPE_ALL, &pNodeList)) !=
        0) {
      sdbCancelFetch(pSdb, pIter);
      sdbRelease(pSdb, pStream);
      // The AST was parsed successfully above and must be destroyed on this error path too.
      nodesDestroyNode(pAst);
      TAOS_RETURN(code);
    }
    SNode *pNode = NULL;
    // Both branches leave the loop, so only the first collected column is examined.
    FOREACH(pNode, pNodeList) {
      SColumnNode *pCol = (SColumnNode *)pNode;

      if (pCol->tableId == suid) {
        sdbCancelFetch(pSdb, pIter);
        sdbRelease(pSdb, pStream);
        nodesDestroyNode(pAst);
        nodesDestroyList(pNodeList);
        TAOS_RETURN(-1);
      } else {
        goto NEXT;
      }
    }
  NEXT:
    sdbRelease(pSdb, pStream);
    nodesDestroyNode(pAst);
    nodesDestroyList(pNodeList);
  }
  TAOS_RETURN(code);
}
2892

2893
// Response handler that ignores pRsp and always reports success.
static int32_t mndProcessDropTtltbRsp(SRpcMsg *pRsp) {
  (void)pRsp;
  return 0;
}
×
2894
// Response handler that ignores pRsp and always reports success.
static int32_t mndProcessTrimDbRsp(SRpcMsg *pRsp) {
  (void)pRsp;
  return 0;
}
176✔
2895
// Response handler that ignores pRsp and always reports success.
static int32_t mndProcessS3MigrateDbRsp(SRpcMsg *pRsp) {
  (void)pRsp;
  return 0;
}
×
2896

2897
// Handler for a drop-super-table request.
// Returns 0 without dropping anything when the table does not exist and igNotExists is set, or
// when the request source is TD_REQ_FROM_TAOX_OLD / TD_REQ_FROM_TAOX and dropReq.suid differs
// from the table's uid.
// Returns TSDB_CODE_MND_TOPIC_MUST_BE_DELETED / TSDB_CODE_MND_STREAM_MUST_BE_DELETED when the
// topic / stream check returns a negative value.
// Returns TSDB_CODE_ACTION_IN_PROGRESS when mndDropStb returns 0, otherwise the failing step's code.
// The acquired objects and the decoded request are released on every path.
static int32_t mndProcessDropStbReq(SRpcMsg *pReq) {
  SMnode      *pMnode = pReq->info.node;
  int32_t      code = -1;
  SDbObj      *pDb = NULL;
  SStbObj     *pStb = NULL;
  SMDropStbReq dropReq = {0};

  TAOS_CHECK_GOTO(tDeserializeSMDropStbReq(pReq->pCont, pReq->contLen, &dropReq), NULL, _OVER);

  mInfo("stb:%s, start to drop", dropReq.name);

  pStb = mndAcquireStb(pMnode, dropReq.name);
  if (pStb == NULL) {
    if (!dropReq.igNotExists) {
      code = TSDB_CODE_MND_STB_NOT_EXIST;
      goto _OVER;
    }
    mInfo("stb:%s, not exist, ignore not exist is set", dropReq.name);
    code = 0;
    goto _OVER;
  }

  bool fromTaosX = (dropReq.source == TD_REQ_FROM_TAOX_OLD) || (dropReq.source == TD_REQ_FROM_TAOX);
  if (fromTaosX && pStb->uid != dropReq.suid) {
    code = 0;
    goto _OVER;
  }

  pDb = mndAcquireDbByStb(pMnode, dropReq.name);
  if (pDb == NULL) {
    code = TSDB_CODE_MND_DB_NOT_SELECTED;
    goto _OVER;
  }

  code = mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb);
  if (code != 0) {
    goto _OVER;
  }

  // A super table still referenced by a topic or a stream may not be dropped.
  if (mndCheckDropStbForTopic(pMnode, dropReq.name, pStb->uid) < 0) {
    code = TSDB_CODE_MND_TOPIC_MUST_BE_DELETED;
    goto _OVER;
  }

  if (mndCheckDropStbForStream(pMnode, dropReq.name, pStb->uid) < 0) {
    code = TSDB_CODE_MND_STREAM_MUST_BE_DELETED;
    goto _OVER;
  }

  code = mndDropStb(pMnode, pReq, pDb, pStb);
  if (code == 0) {
    code = TSDB_CODE_ACTION_IN_PROGRESS;
  }

  // The audit record is written whether or not mndDropStb succeeded.
  SName   name = {0};
  int32_t nameRet = tNameFromString(&name, dropReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
  if (nameRet != 0) {
    mError("stb:%s, failed to tNameFromString since %s", dropReq.name, tstrerror(nameRet));
  }

  auditRecord(pReq, pMnode->clusterId, "dropStb", name.dbname, name.tname, dropReq.sql, dropReq.sqlLen);

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("stb:%s, failed to drop since %s", dropReq.name, tstrerror(code));
  }

  mndReleaseDb(pMnode, pDb);
  mndReleaseStb(pMnode, pStb);
  tFreeSMDropStbReq(&dropReq);
  TAOS_RETURN(code);
}
2965

2966
// Handler for a table-meta request.
// Builds the STableMetaRsp from the information_schema tables, the performance_schema tables or a
// super table, depending on infoReq.dbFName. The serialized response is stored in
// pReq->info.rsp / pReq->info.rspLen.
// Returns 0 on success, otherwise the error code of the failing step. The response buffer is
// freed again if serialization into it fails, so it is handed over only on success.
static int32_t mndProcessTableMetaReq(SRpcMsg *pReq) {
  SMnode       *pMnode = pReq->info.node;
  int32_t       code = -1;
  STableInfoReq infoReq = {0};
  STableMetaRsp metaRsp = {0};
  SUserObj     *pUser = NULL;

  code = mndAcquireUser(pMnode, pReq->info.conn.user, &pUser);
  // NOTE(review): a missing user returns 0 with no response attached; the code returned by
  // mndAcquireUser is dropped here - confirm this is intended.
  if (pUser == NULL) return 0;
  bool sysinfo = pUser->sysInfo;

  TAOS_CHECK_GOTO(tDeserializeSTableInfoReq(pReq->pCont, pReq->contLen, &infoReq), NULL, _OVER);

  if (0 == strcmp(infoReq.dbFName, TSDB_INFORMATION_SCHEMA_DB)) {
    mInfo("information_schema table:%s.%s, start to retrieve meta", infoReq.dbFName, infoReq.tbName);
    TAOS_CHECK_GOTO(mndBuildInsTableSchema(pMnode, infoReq.dbFName, infoReq.tbName, sysinfo, &metaRsp), NULL, _OVER);
  } else if (0 == strcmp(infoReq.dbFName, TSDB_PERFORMANCE_SCHEMA_DB)) {
    mInfo("performance_schema table:%s.%s, start to retrieve meta", infoReq.dbFName, infoReq.tbName);
    TAOS_CHECK_GOTO(mndBuildPerfsTableSchema(pMnode, infoReq.dbFName, infoReq.tbName, &metaRsp), NULL, _OVER);
  } else {
    mInfo("stb:%s.%s, start to retrieve meta", infoReq.dbFName, infoReq.tbName);
    TAOS_CHECK_GOTO(mndBuildStbSchema(pMnode, infoReq.dbFName, infoReq.tbName, &metaRsp), NULL, _OVER);
  }

  // First pass with a NULL buffer yields the serialized size.
  int32_t rspLen = tSerializeSTableMetaRsp(NULL, 0, &metaRsp);
  if (rspLen < 0) {
    code = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }

  void *pRsp = rpcMallocCont(rspLen);
  if (pRsp == NULL) {
    code = terrno;
    goto _OVER;
  }

  if ((rspLen = tSerializeSTableMetaRsp(pRsp, rspLen, &metaRsp)) < 0) {
    code = rspLen;
    // The buffer has not been attached to pReq yet, so release it here.
    rpcFreeCont(pRsp);
    goto _OVER;
  }
  pReq->info.rsp = pRsp;
  pReq->info.rspLen = rspLen;
  code = 0;

  mTrace("%s.%s, meta is retrieved", infoReq.dbFName, infoReq.tbName);

_OVER:
  if (code != 0) {
    mError("stb:%s.%s, failed to retrieve meta since %s", infoReq.dbFName, infoReq.tbName, tstrerror(code));
  }

  mndReleaseUser(pMnode, pUser);
  tFreeSTableMetaRsp(&metaRsp);
  // TODO change to TAOS_RETURN
  return code;
}
3022

3023
/*
 * Handle a table-cfg request.
 *
 * Deserializes the request, extracts the short database name from the full
 * database name, and dispatches to the information_schema, performance_schema
 * or super-table cfg builder. The resulting STableCfgRsp is serialized into a
 * buffer obtained from rpcMallocCont() and attached to pReq->info.rsp /
 * pReq->info.rspLen.
 *
 * Returns 0 on success, otherwise the error code of the failing step
 * (TSDB_CODE_INVALID_MSG when the size probe of the response fails).
 */
static int32_t mndProcessTableCfgReq(SRpcMsg *pReq) {
  SMnode      *pMnode = pReq->info.node;
  int32_t      code = -1;
  STableCfgReq cfgReq = {0};
  STableCfgRsp cfgRsp = {0};

  TAOS_CHECK_GOTO(tDeserializeSTableCfgReq(pReq->pCont, pReq->contLen, &cfgReq), NULL, _OVER);

  char dbName[TSDB_DB_NAME_LEN] = {0};
  TAOS_CHECK_GOTO(mndExtractShortDbNameFromDbFullName(cfgReq.dbFName, dbName), NULL, _OVER);
  if (0 == strcmp(dbName, TSDB_INFORMATION_SCHEMA_DB)) {
    mInfo("information_schema table:%s.%s, start to retrieve cfg", cfgReq.dbFName, cfgReq.tbName);
    TAOS_CHECK_GOTO(mndBuildInsTableCfg(pMnode, cfgReq.dbFName, cfgReq.tbName, &cfgRsp), NULL, _OVER);
  } else if (0 == strcmp(dbName, TSDB_PERFORMANCE_SCHEMA_DB)) {
    mInfo("performance_schema table:%s.%s, start to retrieve cfg", cfgReq.dbFName, cfgReq.tbName);
    TAOS_CHECK_GOTO(mndBuildPerfsTableCfg(pMnode, cfgReq.dbFName, cfgReq.tbName, &cfgRsp), NULL, _OVER);
  } else {
    mInfo("stb:%s.%s, start to retrieve cfg", cfgReq.dbFName, cfgReq.tbName);
    TAOS_CHECK_GOTO(mndBuildStbCfg(pMnode, cfgReq.dbFName, cfgReq.tbName, &cfgRsp), NULL, _OVER);
  }

  // First pass with a NULL buffer only computes the encoded size.
  int32_t rspLen = tSerializeSTableCfgRsp(NULL, 0, &cfgRsp);
  if (rspLen < 0) {
    code = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }

  void *pRsp = rpcMallocCont(rspLen);
  if (pRsp == NULL) {
    code = terrno;
    goto _OVER;
  }

  if ((rspLen = tSerializeSTableCfgRsp(pRsp, rspLen, &cfgRsp)) < 0) {
    code = rspLen;
    // The buffer was never handed to pReq, so it must be released here.
    rpcFreeCont(pRsp);
    goto _OVER;
  }
  pReq->info.rsp = pRsp;
  pReq->info.rspLen = rspLen;
  code = 0;

  mTrace("%s.%s, cfg is retrieved", cfgReq.dbFName, cfgReq.tbName);

_OVER:
  if (code != 0) {
    mError("stb:%s.%s, failed to retrieve cfg since %s", cfgReq.dbFName, cfgReq.tbName, tstrerror(code));
  }

  tFreeSTableCfgRsp(&cfgRsp);
  TAOS_RETURN(code);
}
3074

3075
int32_t mndValidateStbInfo(SMnode *pMnode, SSTableVersion *pStbVersions, int32_t numOfStbs, void **ppRsp,
62,711✔
3076
                           int32_t *pRspLen) {
3077
  int32_t   code = 0;
62,711✔
3078
  SSTbHbRsp hbRsp = {0};
62,711✔
3079
  hbRsp.pMetaRsp = taosArrayInit(numOfStbs, sizeof(STableMetaRsp));
62,711✔
3080
  if (hbRsp.pMetaRsp == NULL) {
62,712!
3081
    code = terrno;
×
3082
    TAOS_RETURN(code);
×
3083
  }
3084

3085
  hbRsp.pIndexRsp = taosArrayInit(numOfStbs, sizeof(STableIndexRsp));
62,712✔
3086
  if (NULL == hbRsp.pIndexRsp) {
62,711!
3087
    taosArrayDestroy(hbRsp.pMetaRsp);
×
3088
    code = terrno;
×
3089
    TAOS_RETURN(code);
×
3090
  }
3091

3092
  for (int32_t i = 0; i < numOfStbs; ++i) {
138,404✔
3093
    SSTableVersion *pStbVersion = &pStbVersions[i];
75,692✔
3094
    pStbVersion->suid = be64toh(pStbVersion->suid);
75,692✔
3095
    pStbVersion->sversion = ntohl(pStbVersion->sversion);
75,692✔
3096
    pStbVersion->tversion = ntohl(pStbVersion->tversion);
75,692✔
3097
    pStbVersion->smaVer = ntohl(pStbVersion->smaVer);
75,692✔
3098

3099
    bool    schema = false;
75,692✔
3100
    bool    sma = false;
75,692✔
3101
    int32_t code = mndValidateStbVersion(pMnode, pStbVersion, &schema, &sma);
75,692✔
3102
    if (TSDB_CODE_SUCCESS != code) {
75,693✔
3103
      STableMetaRsp metaRsp = {0};
735✔
3104
      metaRsp.numOfColumns = -1;
735✔
3105
      metaRsp.suid = pStbVersion->suid;
735✔
3106
      tstrncpy(metaRsp.dbFName, pStbVersion->dbFName, sizeof(metaRsp.dbFName));
735✔
3107
      tstrncpy(metaRsp.tbName, pStbVersion->stbName, sizeof(metaRsp.tbName));
735✔
3108
      tstrncpy(metaRsp.stbName, pStbVersion->stbName, sizeof(metaRsp.stbName));
735✔
3109
      if (taosArrayPush(hbRsp.pMetaRsp, &metaRsp) == NULL) {
1,470!
3110
        code = terrno;
×
3111
        return code;
×
3112
      }
3113
      continue;
735✔
3114
    }
3115

3116
    if (schema) {
74,958✔
3117
      STableMetaRsp metaRsp = {0};
358✔
3118
      mInfo("stb:%s.%s, start to retrieve meta", pStbVersion->dbFName, pStbVersion->stbName);
358!
3119
      if (mndBuildStbSchema(pMnode, pStbVersion->dbFName, pStbVersion->stbName, &metaRsp) != 0) {
358!
3120
        metaRsp.numOfColumns = -1;
×
3121
        metaRsp.suid = pStbVersion->suid;
×
3122
        tstrncpy(metaRsp.dbFName, pStbVersion->dbFName, sizeof(metaRsp.dbFName));
×
3123
        tstrncpy(metaRsp.tbName, pStbVersion->stbName, sizeof(metaRsp.tbName));
×
3124
        tstrncpy(metaRsp.stbName, pStbVersion->stbName, sizeof(metaRsp.stbName));
×
3125
        if (taosArrayPush(hbRsp.pMetaRsp, &metaRsp) == NULL) {
×
3126
          code = terrno;
×
3127
          return code;
×
3128
        }
3129
        continue;
×
3130
      }
3131

3132
      if (taosArrayPush(hbRsp.pMetaRsp, &metaRsp) == NULL) {
716!
3133
        code = terrno;
×
3134
        return code;
×
3135
      }
3136
    }
3137

3138
    if (sma) {
74,958!
3139
      bool           exist = false;
×
3140
      char           tbFName[TSDB_TABLE_FNAME_LEN];
3141
      STableIndexRsp indexRsp = {0};
×
3142
      indexRsp.pIndex = taosArrayInit(10, sizeof(STableIndexInfo));
×
3143
      if (NULL == indexRsp.pIndex) {
×
3144
        code = terrno;
×
3145
        TAOS_RETURN(code);
×
3146
      }
3147

3148
      sprintf(tbFName, "%s.%s", pStbVersion->dbFName, pStbVersion->stbName);
×
3149
      int32_t code = mndGetTableSma(pMnode, tbFName, &indexRsp, &exist);
×
3150
      if (code || !exist) {
×
3151
        indexRsp.suid = pStbVersion->suid;
×
3152
        indexRsp.version = -1;
×
3153
        indexRsp.pIndex = NULL;
×
3154
      }
3155

3156
      strcpy(indexRsp.dbFName, pStbVersion->dbFName);
×
3157
      strcpy(indexRsp.tbName, pStbVersion->stbName);
×
3158

3159
      if (taosArrayPush(hbRsp.pIndexRsp, &indexRsp) == NULL) {
×
3160
        code = terrno;
×
3161
        return code;
×
3162
      }
3163
    }
3164
  }
3165

3166
  int32_t rspLen = tSerializeSSTbHbRsp(NULL, 0, &hbRsp);
62,712✔
3167
  if (rspLen < 0) {
62,711!
3168
    tFreeSSTbHbRsp(&hbRsp);
×
3169
    code = TSDB_CODE_INVALID_MSG;
×
3170
    TAOS_RETURN(code);
×
3171
  }
3172

3173
  void *pRsp = taosMemoryMalloc(rspLen);
62,711✔
3174
  if (pRsp == NULL) {
62,711!
3175
    tFreeSSTbHbRsp(&hbRsp);
×
3176
    code = terrno;
×
3177
    TAOS_RETURN(code);
×
3178
  }
3179

3180
  rspLen = tSerializeSSTbHbRsp(pRsp, rspLen, &hbRsp);
62,711✔
3181
  tFreeSSTbHbRsp(&hbRsp);
62,712✔
3182
  if (rspLen < 0) return rspLen;
62,711!
3183
  *ppRsp = pRsp;
62,711✔
3184
  *pRspLen = rspLen;
62,711✔
3185
  TAOS_RETURN(code);
62,711✔
3186
}
3187

3188
/*
 * Count the super tables whose dbUid matches the database named dbName.
 *
 * Walks every SDB_STB object in the sdb and stores the number of matches in
 * *pNumOfStbs. Returns TSDB_CODE_MND_DB_NOT_SELECTED (without writing
 * *pNumOfStbs) when the database cannot be acquired, 0 otherwise.
 */
int32_t mndGetNumOfStbs(SMnode *pMnode, char *dbName, int32_t *pNumOfStbs) {
  int32_t code = 0;
  SSdb   *pSdb = pMnode->pSdb;

  SDbObj *pDb = mndAcquireDb(pMnode, dbName);
  if (pDb == NULL) {
    code = TSDB_CODE_MND_DB_NOT_SELECTED;
    TAOS_RETURN(code);
  }

  int32_t matched = 0;
  void   *pCursor = NULL;
  for (;;) {
    SStbObj *pCandidate = NULL;
    pCursor = sdbFetch(pSdb, SDB_STB, pCursor, (void **)&pCandidate);
    if (pCursor == NULL) {
      break;  // iteration exhausted
    }

    if (pCandidate->dbUid == pDb->uid) {
      ++matched;
    }

    // Every fetched object is released, whether it matched or not.
    sdbRelease(pSdb, pCandidate);
  }

  *pNumOfStbs = matched;
  mndReleaseDb(pMnode, pDb);
  TAOS_RETURN(code);
}
3215

3216
int32_t mndExtractDbNameFromStbFullName(const char *stbFullName, char *dst) {
×
3217
  SName name = {0};
×
3218
  TAOS_CHECK_RETURN(tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE));
×
3219

3220
  TAOS_CHECK_RETURN(tNameGetFullDbName(&name, dst));
×
3221

3222
  return 0;
×
3223
}
3224

3225
int32_t mndExtractShortDbNameFromStbFullName(const char *stbFullName, char *dst) {
30✔
3226
  SName name = {0};
30✔
3227
  TAOS_CHECK_RETURN(tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE));
30!
3228

3229
  TAOS_CHECK_RETURN(tNameGetDbName(&name, dst));
30!
3230

3231
  return 0;
30✔
3232
}
3233

3234
int32_t mndExtractShortDbNameFromDbFullName(const char *stbFullName, char *dst) {
68✔
3235
  SName name = {0};
68✔
3236
  TAOS_CHECK_RETURN(tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB));
68!
3237

3238
  TAOS_CHECK_RETURN(tNameGetDbName(&name, dst));
68!
3239

3240
  return 0;
68✔
3241
}
3242

3243
void mndExtractTbNameFromStbFullName(const char *stbFullName, char *dst, int32_t dstSize) {
1,483,221✔
3244
  int32_t pos = -1;
1,483,221✔
3245
  int32_t num = 0;
1,483,221✔
3246
  for (pos = 0; stbFullName[pos] != 0; ++pos) {
25,939,596✔
3247
    if (stbFullName[pos] == TS_PATH_DELIMITER[0]) num++;
25,939,583✔
3248
    if (num == 2) break;
25,939,583✔
3249
  }
3250

3251
  if (num == 2) {
1,483,221✔
3252
    tstrncpy(dst, stbFullName + pos + 1, dstSize);
1,482,987✔
3253
  }
3254
}
1,483,221✔
3255

3256
// static int32_t mndProcessRetrieveStbReq(SRpcMsg *pReq) {
3257
//   SMnode    *pMnode = pReq->info.node;
3258
//   SShowMgmt *pMgmt = &pMnode->showMgmt;
3259
//   SShowObj  *pShow = NULL;
3260
//   int32_t    rowsToRead = SHOW_STEP_SIZE;
3261
//   int32_t    rowsRead = 0;
3262
//
3263
//   SRetrieveTableReq retrieveReq = {0};
3264
//   if (tDeserializeSRetrieveTableReq(pReq->pCont, pReq->contLen, &retrieveReq) != 0) {
3265
//     terrno = TSDB_CODE_INVALID_MSG;
3266
//     return -1;
3267
//   }
3268
//
3269
//   SMnode    *pMnode = pReq->info.node;
3270
//   SSdb      *pSdb = pMnode->pSdb;
3271
//   int32_t    numOfRows = 0;
3272
//   SDbObj    *pDb = NULL;
3273
//   ESdbStatus objStatus = 0;
3274
//
3275
//   SUserObj *pUser = mndAcquireUser(pMnode, pReq->info.conn.user);
3276
//   if (pUser == NULL) return 0;
3277
//   bool sysinfo = pUser->sysInfo;
3278
//
3279
//   // Append the information_schema database into the result.
3280
////  if (!pShow->sysDbRsp) {
3281
////    SDbObj infoschemaDb = {0};
3282
////    setInformationSchemaDbCfg(pMnode, &infoschemaDb);
3283
////    size_t numOfTables = 0;
3284
////    getVisibleInfosTablesNum(sysinfo, &numOfTables);
3285
////    mndDumpDbInfoData(pMnode, pBlock, &infoschemaDb, pShow, numOfRows, numOfTables, true, 0, 1);
3286
////
3287
////    numOfRows += 1;
3288
////
3289
////    SDbObj perfschemaDb = {0};
3290
////    setPerfSchemaDbCfg(pMnode, &perfschemaDb);
3291
////    numOfTables = 0;
3292
////    getPerfDbMeta(NULL, &numOfTables);
3293
////    mndDumpDbInfoData(pMnode, pBlock, &perfschemaDb, pShow, numOfRows, numOfTables, true, 0, 1);
3294
////
3295
////    numOfRows += 1;
3296
////    pShow->sysDbRsp = true;
3297
////  }
3298
//
3299
//  SSDataBlock* p = buildInfoSchemaTableMetaBlock(TSDB_INS_TABLE_COLS);
3300
//  blockDataEnsureCapacity(p, rowsToRead);
3301
//
3302
//  size_t               size = 0;
3303
//  const SSysTableMeta* pSysDbTableMeta = NULL;
3304
//
3305
//  getInfosDbMeta(&pSysDbTableMeta, &size);
3306
//  p->info.rows = buildDbColsInfoBlock(sysinfo, p, pSysDbTableMeta, size, TSDB_INFORMATION_SCHEMA_DB);
3307
//
3308
//  getPerfDbMeta(&pSysDbTableMeta, &size);
3309
//  p->info.rows = buildDbColsInfoBlock(sysinfo, p, pSysDbTableMeta, size, TSDB_PERFORMANCE_SCHEMA_DB);
3310
//
3311
//  blockDataDestroy(p);
3312
//
3313
//
3314
//  while (numOfRows < rowsToRead) {
3315
//    pShow->pIter = sdbFetchAll(pSdb, SDB_DB, pShow->pIter, (void **)&pDb, &objStatus, true);
3316
//    if (pShow->pIter == NULL) break;
3317
//    if (strncmp(retrieveReq.db, pDb->name, strlen(retrieveReq.db)) != 0){
3318
//      continue;
3319
//    }
3320
//    if (mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_READ_OR_WRITE_DB, pDb) != 0) {
3321
//      continue;
3322
//    }
3323
//
3324
//    while (numOfRows < rowsToRead) {
3325
//      pShow->pIter = sdbFetch(pSdb, SDB_STB, pShow->pIter, (void **)&pStb);
3326
//      if (pShow->pIter == NULL) break;
3327
//
3328
//      if (pDb != NULL && pStb->dbUid != pDb->uid) {
3329
//        sdbRelease(pSdb, pStb);
3330
//        continue;
3331
//      }
3332
//
3333
//      cols = 0;
3334
//
3335
//      SName name = {0};
3336
//      char  stbName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
3337
//      mndExtractTbNameFromStbFullName(pStb->name, &stbName[VARSTR_HEADER_SIZE], TSDB_TABLE_NAME_LEN);
3338
//      varDataSetLen(stbName, strlen(&stbName[VARSTR_HEADER_SIZE]));
3339
//
3340
//      SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
3341
//      colDataSetVal(pColInfo, numOfRows, (const char *)stbName, false);
3342
//
3343
//      char db[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
3344
//      tNameFromString(&name, pStb->db, T_NAME_ACCT | T_NAME_DB);
3345
//      tNameGetDbName(&name, varDataVal(db));
3346
//      varDataSetLen(db, strlen(varDataVal(db)));
3347
//
3348
//      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
3349
//      colDataSetVal(pColInfo, numOfRows, (const char *)db, false);
3350
//
3351
//      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
3352
//      colDataSetVal(pColInfo, numOfRows, (const char *)&pStb->createdTime, false);
3353
//
3354
//      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
3355
//      colDataSetVal(pColInfo, numOfRows, (const char *)&pStb->numOfColumns, false);
3356
//
3357
//      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
3358
//      colDataSetVal(pColInfo, numOfRows, (const char *)&pStb->numOfTags, false);
3359
//
3360
//      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
3361
//      colDataSetVal(pColInfo, numOfRows, (const char *)&pStb->updateTime, false);  // number of tables
3362
//
3363
//      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
3364
//      if (pStb->commentLen > 0) {
3365
//        char comment[TSDB_TB_COMMENT_LEN + VARSTR_HEADER_SIZE] = {0};
3366
//        STR_TO_VARSTR(comment, pStb->comment);
3367
//        colDataSetVal(pColInfo, numOfRows, comment, false);
3368
//      } else if (pStb->commentLen == 0) {
3369
//        char comment[VARSTR_HEADER_SIZE + VARSTR_HEADER_SIZE] = {0};
3370
//        STR_TO_VARSTR(comment, "");
3371
//        colDataSetVal(pColInfo, numOfRows, comment, false);
3372
//      } else {
3373
//        colDataSetNULL(pColInfo, numOfRows);
3374
//      }
3375
//
3376
//      char watermark[64 + VARSTR_HEADER_SIZE] = {0};
3377
//      sprintf(varDataVal(watermark), "%" PRId64 "a,%" PRId64 "a", pStb->watermark[0], pStb->watermark[1]);
3378
//      varDataSetLen(watermark, strlen(varDataVal(watermark)));
3379
//
3380
//      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
3381
//      colDataSetVal(pColInfo, numOfRows, (const char *)watermark, false);
3382
//
3383
//      char maxDelay[64 + VARSTR_HEADER_SIZE] = {0};
3384
//      sprintf(varDataVal(maxDelay), "%" PRId64 "a,%" PRId64 "a", pStb->maxdelay[0], pStb->maxdelay[1]);
3385
//      varDataSetLen(maxDelay, strlen(varDataVal(maxDelay)));
3386
//
3387
//      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
3388
//      colDataSetVal(pColInfo, numOfRows, (const char *)maxDelay, false);
3389
//
3390
//      char    rollup[160 + VARSTR_HEADER_SIZE] = {0};
3391
//      int32_t rollupNum = (int32_t)taosArrayGetSize(pStb->pFuncs);
3392
//      char   *sep = ", ";
3393
//      int32_t sepLen = strlen(sep);
3394
//      int32_t rollupLen = sizeof(rollup) - VARSTR_HEADER_SIZE - 2;
3395
//      for (int32_t i = 0; i < rollupNum; ++i) {
3396
//        char *funcName = taosArrayGet(pStb->pFuncs, i);
3397
//        if (i) {
3398
//          strncat(varDataVal(rollup), sep, rollupLen);
3399
//          rollupLen -= sepLen;
3400
//        }
3401
//        strncat(varDataVal(rollup), funcName, rollupLen);
3402
//        rollupLen -= strlen(funcName);
3403
//      }
3404
//      varDataSetLen(rollup, strlen(varDataVal(rollup)));
3405
//
3406
//      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
3407
//      colDataSetVal(pColInfo, numOfRows, (const char *)rollup, false);
3408
//
3409
//      numOfRows++;
3410
//      sdbRelease(pSdb, pStb);
3411
//    }
3412
//
3413
//    if (pDb != NULL) {
3414
//      mndReleaseDb(pMnode, pDb);
3415
//    }
3416
//
3417
//    sdbRelease(pSdb, pDb);
3418
//  }
3419
//
3420
//  pShow->numOfRows += numOfRows;
3421
//  mndReleaseUser(pMnode, pUser);
3422
//
3423
//
3424
//
3425
//
3426
//
3427
//
3428
//
3429
//
3430
//  ShowRetrieveFp retrieveFp = pMgmt->retrieveFps[pShow->type];
3431
//  if (retrieveFp == NULL) {
3432
//    mndReleaseShowObj(pShow, false);
3433
//    terrno = TSDB_CODE_MSG_NOT_PROCESSED;
3434
//    mError("show:0x%" PRIx64 ", failed to retrieve data since %s", pShow->id, terrstr());
3435
//    return -1;
3436
//  }
3437
//
3438
//  mDebug("show:0x%" PRIx64 ", start retrieve data, type:%d", pShow->id, pShow->type);
3439
//  if (retrieveReq.user[0] != 0) {
3440
//    memcpy(pReq->info.conn.user, retrieveReq.user, TSDB_USER_LEN);
3441
//  } else {
3442
//    memcpy(pReq->info.conn.user, TSDB_DEFAULT_USER, strlen(TSDB_DEFAULT_USER) + 1);
3443
//  }
3444
//  if (retrieveReq.db[0] && mndCheckShowPrivilege(pMnode, pReq->info.conn.user, pShow->type, retrieveReq.db) != 0) {
3445
//    return -1;
3446
//  }
3447
//
3448
//  int32_t numOfCols = pShow->pMeta->numOfColumns;
3449
//
3450
//  SSDataBlock *pBlock = createDataBlock();
3451
//  for (int32_t i = 0; i < numOfCols; ++i) {
3452
//    SColumnInfoData idata = {0};
3453
//
3454
//    SSchema *p = &pShow->pMeta->pSchemas[i];
3455
//
3456
//    idata.info.bytes = p->bytes;
3457
//    idata.info.type = p->type;
3458
//    idata.info.colId = p->colId;
3459
//    blockDataAppendColInfo(pBlock, &idata);
3460
//  }
3461
//
3462
//  blockDataEnsureCapacity(pBlock, rowsToRead);
3463
//
3464
//  if (mndCheckRetrieveFinished(pShow)) {
3465
//    mDebug("show:0x%" PRIx64 ", read finished, numOfRows:%d", pShow->id, pShow->numOfRows);
3466
//    rowsRead = 0;
3467
//  } else {
3468
//    rowsRead = (*retrieveFp)(pReq, pShow, pBlock, rowsToRead);
3469
//    if (rowsRead < 0) {
3470
//      terrno = rowsRead;
3471
//      mDebug("show:0x%" PRIx64 ", retrieve completed", pShow->id);
3472
//      mndReleaseShowObj(pShow, true);
3473
//      blockDataDestroy(pBlock);
3474
//      return -1;
3475
//    }
3476
//
3477
//    pBlock->info.rows = rowsRead;
3478
//    mDebug("show:0x%" PRIx64 ", stop retrieve data, rowsRead:%d numOfRows:%d", pShow->id, rowsRead, pShow->numOfRows);
3479
//  }
3480
//
3481
//  size = sizeof(SRetrieveMetaTableRsp) + sizeof(int32_t) + sizeof(SSysTableSchema) * pShow->pMeta->numOfColumns +
3482
//         blockDataGetSize(pBlock) + blockDataGetSerialMetaSize(taosArrayGetSize(pBlock->pDataBlock));
3483
//
3484
//  SRetrieveMetaTableRsp *pRsp = rpcMallocCont(size);
3485
//  if (pRsp == NULL) {
3486
//    mndReleaseShowObj(pShow, false);
3487
//    terrno = TSDB_CODE_OUT_OF_MEMORY;
3488
//    mError("show:0x%" PRIx64 ", failed to retrieve data since %s", pShow->id, terrstr());
3489
//    blockDataDestroy(pBlock);
3490
//    return -1;
3491
//  }
3492
//
3493
//  pRsp->handle = htobe64(pShow->id);
3494
//
3495
//  if (rowsRead > 0) {
3496
//    char    *pStart = pRsp->data;
3497
//    SSchema *ps = pShow->pMeta->pSchemas;
3498
//
3499
//    *(int32_t *)pStart = htonl(pShow->pMeta->numOfColumns);
3500
//    pStart += sizeof(int32_t);  // number of columns
3501
//
3502
//    for (int32_t i = 0; i < pShow->pMeta->numOfColumns; ++i) {
3503
//      SSysTableSchema *pSchema = (SSysTableSchema *)pStart;
3504
//      pSchema->bytes = htonl(ps[i].bytes);
3505
//      pSchema->colId = htons(ps[i].colId);
3506
//      pSchema->type = ps[i].type;
3507
//
3508
//      pStart += sizeof(SSysTableSchema);
3509
//    }
3510
//
3511
//    int32_t len = blockEncode(pBlock, pStart, pShow->pMeta->numOfColumns);
3512
//  }
3513
//
3514
//  pRsp->numOfRows = htonl(rowsRead);
3515
//  pRsp->precision = TSDB_TIME_PRECISION_MILLI;  // millisecond time precision
3516
//  pReq->info.rsp = pRsp;
3517
//  pReq->info.rspLen = size;
3518
//
3519
//  if (rowsRead == 0 || rowsRead < rowsToRead) {
3520
//    pRsp->completed = 1;
3521
//    mDebug("show:0x%" PRIx64 ", retrieve completed", pShow->id);
3522
//    mndReleaseShowObj(pShow, true);
3523
//  } else {
3524
//    mDebug("show:0x%" PRIx64 ", retrieve not completed yet", pShow->id);
3525
//    mndReleaseShowObj(pShow, false);
3526
//  }
3527
//
3528
//  blockDataDestroy(pBlock);
3529
//  return TSDB_CODE_SUCCESS;
3530
//}
3531

3532
/*
 * Show-retrieve callback for super tables.
 *
 * Continues the sdb iteration stored in pShow->pIter and appends up to `rows`
 * rows to pBlock. When pShow->db is non-empty only super tables of that
 * database are emitted; tables for which isTsmaResSTb() is true are skipped.
 * Columns are written in this order: stb name, db name, createdTime,
 * numOfColumns, numOfTags, updateTime, comment, watermark, max delay, rollup
 * function list and, if the block has a further column, uid.
 *
 * Returns terrno if the database named by pShow->db cannot be acquired;
 * otherwise the number of rows written by this call (also added to
 * pShow->numOfRows). A failure while filling a row is only logged and ends
 * the call with the rows completed so far.
 */
static int32_t mndRetrieveStb(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode  *pMnode = pReq->info.node;
  SSdb    *pSdb = pMnode->pSdb;
  int32_t  numOfRows = 0;
  SStbObj *pStb = NULL;
  int32_t  cols = 0;
  int32_t  lino = 0;
  int32_t  code = 0;

  SDbObj *pDb = NULL;
  if (strlen(pShow->db) > 0) {
    pDb = mndAcquireDb(pMnode, pShow->db);
    if (pDb == NULL) return terrno;
  }

  while (numOfRows < rows) {
    pShow->pIter = sdbFetch(pSdb, SDB_STB, pShow->pIter, (void **)&pStb);
    if (pShow->pIter == NULL) break;

    if (pDb != NULL && pStb->dbUid != pDb->uid) {
      sdbRelease(pSdb, pStb);
      continue;
    }

    if (isTsmaResSTb(pStb->name)) {
      sdbRelease(pSdb, pStb);
      continue;
    }

    cols = 0;

    SName name = {0};

    // super table name
    char stbName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
    mndExtractTbNameFromStbFullName(pStb->name, &stbName[VARSTR_HEADER_SIZE], TSDB_TABLE_NAME_LEN);
    varDataSetLen(stbName, strlen(&stbName[VARSTR_HEADER_SIZE]));
    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)stbName, false), pStb, &lino, _ERROR);

    // database name
    char db[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
    RETRIEVE_CHECK_GOTO(tNameFromString(&name, pStb->db, T_NAME_ACCT | T_NAME_DB), pStb, &lino, _ERROR);
    RETRIEVE_CHECK_GOTO(tNameGetDbName(&name, varDataVal(db)), pStb, &lino, _ERROR);
    varDataSetLen(db, strlen(varDataVal(db)));
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)db, false), pStb, &lino, _ERROR);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pStb->createdTime, false), pStb, &lino,
                        _ERROR);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pStb->numOfColumns, false), pStb, &lino,
                        _ERROR);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pStb->numOfTags, false), pStb, &lino, _ERROR);

    // NOTE(review): this column was previously commented as "number of tables",
    // but the value written is pStb->updateTime - confirm the intended label.
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pStb->updateTime, false), pStb, &lino,
                        _ERROR);

    // comment: positive length -> text, zero -> empty string, negative -> NULL
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    if (pStb->commentLen > 0) {
      char comment[TSDB_TB_COMMENT_LEN + VARSTR_HEADER_SIZE] = {0};
      STR_TO_VARSTR(comment, pStb->comment);
      RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, comment, false), pStb, &lino, _ERROR);
    } else if (pStb->commentLen == 0) {
      char comment[VARSTR_HEADER_SIZE + VARSTR_HEADER_SIZE] = {0};
      STR_TO_VARSTR(comment, "");
      RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, comment, false), pStb, &lino, _ERROR);
    } else {
      colDataSetNULL(pColInfo, numOfRows);
    }

    char watermark[64 + VARSTR_HEADER_SIZE] = {0};
    (void)snprintf(varDataVal(watermark), sizeof(watermark) - VARSTR_HEADER_SIZE, "%" PRId64 "a,%" PRId64 "a",
                   pStb->watermark[0], pStb->watermark[1]);
    varDataSetLen(watermark, strlen(varDataVal(watermark)));

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)watermark, false), pStb, &lino, _ERROR);

    char maxDelay[64 + VARSTR_HEADER_SIZE] = {0};
    (void)snprintf(varDataVal(maxDelay), sizeof(maxDelay) - VARSTR_HEADER_SIZE, "%" PRId64 "a,%" PRId64 "a",
                   pStb->maxdelay[0], pStb->maxdelay[1]);
    varDataSetLen(maxDelay, strlen(varDataVal(maxDelay)));

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)maxDelay, false), pStb, &lino, _ERROR);

    // rollup: function names joined with ", ", truncated to the buffer capacity
    char    rollup[160 + VARSTR_HEADER_SIZE] = {0};
    int32_t rollupNum = (int32_t)taosArrayGetSize(pStb->pFuncs);
    char   *sep = ", ";
    int32_t sepLen = strlen(sep);
    int32_t rollupLen = sizeof(rollup) - VARSTR_HEADER_SIZE - 2;
    // rollupLen is the remaining capacity. It must never reach strncat() as a
    // non-positive value: converted to size_t it would become a huge bound.
    for (int32_t i = 0; i < rollupNum && rollupLen > 0; ++i) {
      char *funcName = taosArrayGet(pStb->pFuncs, i);
      if (i) {
        (void)strncat(varDataVal(rollup), sep, rollupLen);
        rollupLen -= sepLen;
        if (rollupLen <= 0) break;
      }
      (void)strncat(varDataVal(rollup), funcName, rollupLen);
      rollupLen -= (int32_t)strlen(funcName);
    }
    varDataSetLen(rollup, strlen(varDataVal(rollup)));

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)rollup, false), pStb, &lino, _ERROR);

    // uid is written only when the block actually has this column
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    if (pColInfo) {
      RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)(&pStb->uid), false), pStb, &lino, _ERROR);
    }

    numOfRows++;
    sdbRelease(pSdb, pStb);
  }

  goto _OVER;

_ERROR:
  mError("show:0x%" PRIx64 ", failed to retrieve data at %s:%d since %s", pShow->id, __FUNCTION__, lino,
         tstrerror(code));

_OVER:
  // Release the database reference on both the success and the error path.
  if (pDb != NULL) {
    mndReleaseDb(pMnode, pDb);
  }
  pShow->numOfRows += numOfRows;
  return numOfRows;
}
3662

3663
static int32_t buildDbColsInfoBlock(const SSDataBlock *p, const SSysTableMeta *pSysDbTableMeta, size_t size,
26,649✔
3664
                                    const char *dbName, const char *tbName) {
3665
  char    tName[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
26,649✔
3666
  char    dName[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
26,649✔
3667
  char    typeName[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
26,649✔
3668
  int32_t numOfRows = p->info.rows;
26,649✔
3669
  int32_t lino = 0;
26,649✔
3670
  int32_t code = 0;
26,649✔
3671

3672
  STR_TO_VARSTR(dName, dbName);
26,649✔
3673
  STR_TO_VARSTR(typeName, "SYSTEM_TABLE");
26,649✔
3674

3675
  for (int32_t i = 0; i < size; ++i) {
543,309!
3676
    const SSysTableMeta *pm = &pSysDbTableMeta[i];
546,251✔
3677
    //    if (pm->sysInfo) {
3678
    //      continue;
3679
    //    }
3680
    if (tbName[0] && strncmp(tbName, pm->name, TSDB_TABLE_NAME_LEN) != 0) {
546,251!
3681
      continue;
×
3682
    }
3683

3684
    STR_TO_VARSTR(tName, pm->name);
546,251✔
3685

3686
    for (int32_t j = 0; j < pm->colNum; j++) {
5,140,745✔
3687
      // table name
3688
      SColumnInfoData *pColInfoData = taosArrayGet(p->pDataBlock, 0);
4,624,085✔
3689
      TAOS_CHECK_GOTO(colDataSetVal(pColInfoData, numOfRows, tName, false), &lino, _OVER);
4,622,653!
3690

3691
      // database name
3692
      pColInfoData = taosArrayGet(p->pDataBlock, 1);
4,594,816✔
3693
      TAOS_CHECK_GOTO(colDataSetVal(pColInfoData, numOfRows, dName, false), &lino, _OVER);
4,593,101!
3694

3695
      pColInfoData = taosArrayGet(p->pDataBlock, 2);
4,590,315✔
3696
      TAOS_CHECK_GOTO(colDataSetVal(pColInfoData, numOfRows, typeName, false), &lino, _OVER);
4,589,069!
3697

3698
      // col name
3699
      char colName[TSDB_COL_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
4,592,616✔
3700
      STR_TO_VARSTR(colName, pm->schema[j].name);
4,592,616✔
3701
      pColInfoData = taosArrayGet(p->pDataBlock, 3);
4,592,616✔
3702
      TAOS_CHECK_GOTO(colDataSetVal(pColInfoData, numOfRows, colName, false), &lino, _OVER);
4,620,075!
3703

3704
      // col type
3705
      int8_t colType = pm->schema[j].type;
4,598,853✔
3706
      pColInfoData = taosArrayGet(p->pDataBlock, 4);
4,598,853✔
3707
      char colTypeStr[VARSTR_HEADER_SIZE + 32];
3708
      int  colTypeLen = sprintf(varDataVal(colTypeStr), "%s", tDataTypes[colType].name);
4,597,492✔
3709
      if (colType == TSDB_DATA_TYPE_VARCHAR) {
4,597,492✔
3710
        colTypeLen +=
2,491,321✔
3711
            sprintf(varDataVal(colTypeStr) + colTypeLen, "(%d)", (int32_t)(pm->schema[j].bytes - VARSTR_HEADER_SIZE));
2,491,321✔
3712
      } else if (colType == TSDB_DATA_TYPE_NCHAR) {
2,106,171!
3713
        colTypeLen += sprintf(varDataVal(colTypeStr) + colTypeLen, "(%d)",
×
3714
                              (int32_t)((pm->schema[j].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE));
×
3715
      }
3716
      varDataSetLen(colTypeStr, colTypeLen);
4,597,492✔
3717
      TAOS_CHECK_GOTO(colDataSetVal(pColInfoData, numOfRows, (char *)colTypeStr, false), &lino, _OVER);
4,597,492!
3718

3719
      pColInfoData = taosArrayGet(p->pDataBlock, 5);
4,599,136✔
3720
      TAOS_CHECK_GOTO(colDataSetVal(pColInfoData, numOfRows, (const char *)&pm->schema[j].bytes, false), &lino, _OVER);
4,597,387!
3721
      for (int32_t k = 6; k <= 8; ++k) {
18,338,646✔
3722
        pColInfoData = taosArrayGet(p->pDataBlock, k);
13,744,152✔
3723
        colDataSetNULL(pColInfoData, numOfRows);
13,743,588!
3724
      }
3725

3726
      numOfRows += 1;
4,594,494✔
3727
    }
3728
  }
3729
_OVER:
×
3730
  mError("failed at %s:%d since %s", __FUNCTION__, lino, tstrerror(code));
×
3731
  return numOfRows;
26,650✔
3732
}
3733
// Bit flags selecting which databases' column metadata to build.
// Each expansion is parenthesized so the shift cannot be re-associated by a
// neighbouring operator of higher precedence (e.g. `FLAG + 1`).
#define BUILD_COL_FOR_INFO_DB (1)
#define BUILD_COL_FOR_PERF_DB (1 << 1)
#define BUILD_COL_FOR_USER_DB (1 << 2)
#define BUILD_COL_FOR_ALL_DB  (BUILD_COL_FOR_INFO_DB | BUILD_COL_FOR_PERF_DB | BUILD_COL_FOR_USER_DB)
3737

3738
/*
 * Fill block p with column rows of the system databases selected by
 * buildWhichDBs: information_schema when BUILD_COL_FOR_INFO_DB is set, then
 * performance_schema when BUILD_COL_FOR_PERF_DB is set. `tb` is forwarded to
 * buildDbColsInfoBlock() as the table-name filter.
 * p->info.rows is updated after each database; its final value is returned.
 */
static int32_t buildSysDbColsInfo(SSDataBlock *p, int8_t buildWhichDBs, char *tb) {
  const SSysTableMeta *pMetaList = NULL;
  size_t               metaCount = 0;

  const bool wantInfoDb = (buildWhichDBs & BUILD_COL_FOR_INFO_DB) != 0;
  const bool wantPerfDb = (buildWhichDBs & BUILD_COL_FOR_PERF_DB) != 0;

  if (wantInfoDb) {
    getInfosDbMeta(&pMetaList, &metaCount);
    // rows must be stored before the next call, which continues from p->info.rows
    p->info.rows = buildDbColsInfoBlock(p, pMetaList, metaCount, TSDB_INFORMATION_SCHEMA_DB, tb);
  }

  if (wantPerfDb) {
    getPerfDbMeta(&pMetaList, &metaCount);
    p->info.rows = buildDbColsInfoBlock(p, pMetaList, metaCount, TSDB_PERFORMANCE_SCHEMA_DB, tb);
  }

  return p->info.rows;
}
3754

3755
static int8_t determineBuildColForWhichDBs(const char *db) {
13,356✔
3756
  int8_t buildWhichDBs;
3757
  if (!db[0])
13,356✔
3758
    buildWhichDBs = BUILD_COL_FOR_ALL_DB;
13,324✔
3759
  else {
3760
    char *p = strchr(db, '.');
32✔
3761
    if (p && strcmp(p + 1, TSDB_INFORMATION_SCHEMA_DB) == 0) {
32!
3762
      buildWhichDBs = BUILD_COL_FOR_INFO_DB;
1✔
3763
    } else if (p && strcmp(p + 1, TSDB_PERFORMANCE_SCHEMA_DB) == 0) {
31!
3764
      buildWhichDBs = BUILD_COL_FOR_PERF_DB;
1✔
3765
    } else {
3766
      buildWhichDBs = BUILD_COL_FOR_USER_DB;
30✔
3767
    }
3768
  }
3769
  return buildWhichDBs;
13,356✔
3770
}
3771

3772
/*
 * Retrieve callback that emits one result row per column of every matching
 * super table (plus, on the first call, the rows for the system databases).
 *
 * pReq   - request; only pReq->info.node (the SMnode) is used.
 * pShow  - show/iteration state: db and filterTb restrict the output, pIter is
 *          the sdb cursor, sysDbRsp records that the system-db rows were
 *          already produced, restore marks that the previous call stopped on a
 *          super table that did not fit and must be revisited.
 * pBlock - output data block; columns 0..5 receive table name, db name, table
 *          type, column name, column type string and column byte width, every
 *          remaining column up to pShow->numOfColumns is set to NULL.
 * rows   - maximum number of rows this call may produce.
 *
 * Returns the number of rows produced by this call (also added to
 * pShow->numOfRows), or terrno when the requested database cannot be acquired
 * for a reason other than "not exist" and nothing has been produced yet.
 *
 * NOTE(review): RETRIEVE_CHECK_GOTO is defined outside this view; it is
 * assumed to store the failing status in `code`, the line in `lino`, release
 * the passed object and jump to the label - confirm against its definition.
 */
static int32_t mndRetrieveStbCol(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  uint8_t  buildWhichDBs;
  SMnode  *pMnode = pReq->info.node;
  SSdb    *pSdb = pMnode->pSdb;
  SStbObj *pStb = NULL;
  SDbObj  *pDb = NULL;  // function scope so the error path can release it too
  int32_t  numOfRows = 0;
  int32_t  lino = 0;
  int32_t  code = 0;

  buildWhichDBs = determineBuildColForWhichDBs(pShow->db);

  // System-database rows are produced once per show session.
  if (!pShow->sysDbRsp) {
    numOfRows = buildSysDbColsInfo(pBlock, buildWhichDBs, pShow->filterTb);
    mDebug("mndRetrieveStbCol get system table cols, rows:%d, db:%s", numOfRows, pShow->db);
    pShow->sysDbRsp = true;
  }

  if (buildWhichDBs & BUILD_COL_FOR_USER_DB) {
    if (strlen(pShow->db) > 0) {
      pDb = mndAcquireDb(pMnode, pShow->db);
      if (pDb == NULL && TSDB_CODE_MND_DB_NOT_EXIST != terrno && pBlock->info.rows == 0) return terrno;
    }

    char typeName[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
    STR_TO_VARSTR(typeName, "SUPER_TABLE");
    // When the previous call stopped on a table that did not fit, re-acquire
    // that same table from the cursor instead of advancing.
    bool fetch = pShow->restore ? false : true;
    pShow->restore = false;
    while (numOfRows < rows) {
      if (fetch) {
        pShow->pIter = sdbFetch(pSdb, SDB_STB, pShow->pIter, (void **)&pStb);
        if (pShow->pIter == NULL) break;
      } else {
        fetch = true;
        void *pKey = taosHashGetKey(pShow->pIter, NULL);
        pStb = sdbAcquire(pSdb, SDB_STB, pKey);
        if (!pStb) continue;
      }

      // Skip super tables that belong to a different database.
      if (pDb != NULL && pStb->dbUid != pDb->uid) {
        sdbRelease(pSdb, pStb);
        continue;
      }

      SName name = {0};
      char  stbName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
      mndExtractTbNameFromStbFullName(pStb->name, &stbName[VARSTR_HEADER_SIZE], TSDB_TABLE_NAME_LEN);
      if (pShow->filterTb[0] && strncmp(pShow->filterTb, &stbName[VARSTR_HEADER_SIZE], TSDB_TABLE_NAME_LEN) != 0) {
        sdbRelease(pSdb, pStb);
        continue;
      }

      // All columns of one super table are emitted together; if they do not
      // fit, stop here and revisit this table on the next call.
      if ((numOfRows + pStb->numOfColumns) > rows) {
        pShow->restore = true;
        if (numOfRows == 0) {
          mError("mndRetrieveStbCol failed to get stable cols since buf:%d less than result:%d, stable name:%s, db:%s",
                 rows, pStb->numOfColumns, pStb->name, pStb->db);
        }
        sdbRelease(pSdb, pStb);
        break;
      }

      varDataSetLen(stbName, strlen(&stbName[VARSTR_HEADER_SIZE]));

      mDebug("mndRetrieveStbCol get stable cols, stable name:%s, db:%s", pStb->name, pStb->db);

      char db[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
      RETRIEVE_CHECK_GOTO(tNameFromString(&name, pStb->db, T_NAME_ACCT | T_NAME_DB), pStb, &lino, _OVER);
      RETRIEVE_CHECK_GOTO(tNameGetDbName(&name, varDataVal(db)), pStb, &lino, _OVER);
      varDataSetLen(db, strlen(varDataVal(db)));

      for (int i = 0; i < pStb->numOfColumns; i++) {
        int32_t          cols = 0;
        SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
        RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)stbName, false), pStb, &lino, _OVER);

        pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
        RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)db, false), pStb, &lino, _OVER);

        pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
        RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, typeName, false), pStb, &lino, _OVER);

        // col name
        char colName[TSDB_COL_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
        STR_TO_VARSTR(colName, pStb->pColumns[i].name);
        pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
        RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, colName, false), pStb, &lino, _OVER);

        // col type: type name, followed by "(length)" for VARCHAR / NCHAR
        int8_t colType = pStb->pColumns[i].type;
        pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
        char colTypeStr[VARSTR_HEADER_SIZE + 32];
        int  colTypeLen = sprintf(varDataVal(colTypeStr), "%s", tDataTypes[colType].name);
        if (colType == TSDB_DATA_TYPE_VARCHAR) {
          colTypeLen += sprintf(varDataVal(colTypeStr) + colTypeLen, "(%d)",
                                (int32_t)(pStb->pColumns[i].bytes - VARSTR_HEADER_SIZE));
        } else if (colType == TSDB_DATA_TYPE_NCHAR) {
          colTypeLen += sprintf(varDataVal(colTypeStr) + colTypeLen, "(%d)",
                                (int32_t)((pStb->pColumns[i].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE));
        }
        varDataSetLen(colTypeStr, colTypeLen);
        RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (char *)colTypeStr, false), pStb, &lino, _OVER);

        // col byte width
        pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
        RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pStb->pColumns[i].bytes, false), pStb,
                            &lino, _OVER);

        // remaining output columns carry no data for super-table columns
        while (cols < pShow->numOfColumns) {
          pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
          colDataSetNULL(pColInfo, numOfRows);
        }
        numOfRows++;
      }

      sdbRelease(pSdb, pStb);
    }
  }

  mDebug("mndRetrieveStbCol success, rows:%d, pShow->numOfRows:%d", numOfRows, pShow->numOfRows);

_OVER:
  // Reached on success and on every RETRIEVE_CHECK_GOTO failure, so the
  // database reference is dropped on both paths.
  if (pDb != NULL) {
    mndReleaseDb(pMnode, pDb);
  }
  if (code != 0) {
    mError("failed to mndRetrieveStbCol, rows:%d, pShow->numOfRows:%d, at %s:%d since %s", numOfRows, pShow->numOfRows,
           __FUNCTION__, lino, tstrerror(code));
  }
  pShow->numOfRows += numOfRows;
  return numOfRows;
}
3904

3905
/* Cancel an in-progress super-table iteration identified by pIter. */
static void mndCancelGetNextStb(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_STB);
}
1✔
3909

3910
const char *mndGetStbStr(const char *src) {
33,110✔
3911
  char *posDb = strstr(src, TS_PATH_DELIMITER);
33,110✔
3912
  if (posDb != NULL) ++posDb;
33,110!
3913
  if (posDb == NULL) return src;
33,110✔
3914

3915
  char *posStb = strstr(posDb, TS_PATH_DELIMITER);
33,109✔
3916
  if (posStb != NULL) ++posStb;
33,109!
3917
  if (posStb == NULL) return posDb;
33,109!
3918
  return posStb;
33,109✔
3919
}
3920

3921
/*
 * Validation hook for a create-tag-index request.
 * Currently a placeholder: the request is not inspected and the call always
 * succeeds.
 */
static int32_t mndCheckIndexReq(SCreateTagIndexReq *pReq) {
  (void)pReq;  // not examined yet
  return TSDB_CODE_SUCCESS;
}
3925

3926
/*int32_t mndAddIndexImpl(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *pStb, bool needRsp, void *sql,
3927
                        int32_t len) {
3928
  // impl later
3929
  int32_t code = 0;
3930
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, "create-stb-index");
3931
  if (pTrans == NULL) goto _OVER;
3932

3933
  mInfo("trans:%d, used to add index to stb:%s", pTrans->id, pStb->name);
3934
  mndTransSetDbName(pTrans, pDb->name, pStb->name);
3935
  if (mndTransCheckConflict(pMnode, pTrans) != 0) goto _OVER;
3936

3937
  if (mndSetAlterStbPrepareLogs(pMnode, pTrans, pDb, pStb) != 0) goto _OVER;
3938
  if (mndSetAlterStbCommitLogs(pMnode, pTrans, pDb, pStb) != 0) goto _OVER;
3939
  if (mndSetAlterStbRedoActions2(pMnode, pTrans, pDb, pStb, sql, len) != 0) goto _OVER;
3940
  if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
3941

3942
  return code;
3943

3944
_OVER:
3945
  mndTransDrop(pTrans);
3946
  return code;
3947
}
3948
static int32_t mndAddIndex(SMnode *pMnode, SRpcMsg *pReq, SCreateTagIndexReq *tagIdxReq, SDbObj *pDb, SStbObj *pOld) {
3949
  bool    needRsp = true;
3950
  int32_t code = -1;
3951
  SField *pField0 = NULL;
3952

3953
  SStbObj  stbObj = {0};
3954
  SStbObj *pNew = &stbObj;
3955

3956
  taosRLockLatch(&pOld->lock);
3957
  memcpy(&stbObj, pOld, sizeof(SStbObj));
3958
  taosRUnLockLatch(&pOld->lock);
3959

3960
  stbObj.pColumns = NULL;
3961
  stbObj.pTags = NULL;
3962
  stbObj.updateTime = taosGetTimestampMs();
3963
  stbObj.lock = 0;
3964

3965
  int32_t tag = mndFindSuperTableTagIndex(pOld, tagIdxReq->colName);
3966
  if (tag < 0) {
3967
    terrno = TSDB_CODE_MND_TAG_NOT_EXIST;
3968
    return -1;
3969
  }
3970
  col_id_t colId = pOld->pTags[tag].colId;
3971
  if (mndCheckColAndTagModifiable(pMnode, pOld->name, pOld->uid, colId) != 0) {
3972
    return -1;
3973
  }
3974
  if (mndAllocStbSchemas(pOld, pNew) != 0) {
3975
    return -1;
3976
  }
3977

3978
  SSchema *pTag = pNew->pTags + tag;
3979
  if (IS_IDX_ON(pTag)) {
3980
    terrno = TSDB_CODE_MND_TAG_INDEX_ALREADY_EXIST;
3981
    return -1;
3982
  } else {
3983
    pTag->flags |= COL_IDX_ON;
3984
  }
3985
  pNew->tagVer++;
3986

3987
  code = mndAddIndexImpl(pMnode, pReq, pDb, pNew, needRsp, pReq->pCont, pReq->contLen);
3988

3989
  return code;
3990
}
3991
static int32_t mndProcessCreateIndexReq(SRpcMsg *pReq) {
3992
  SMnode            *pMnode = pReq->info.node;
3993
  int32_t            code = -1;
3994
  SDbObj            *pDb = NULL;
3995
  SStbObj           *pStb = NULL;
3996
  SCreateTagIndexReq tagIdxReq = {0};
3997

3998
  if (tDeserializeSCreateTagIdxReq(pReq->pCont, pReq->contLen, &tagIdxReq) != 0) {
3999
    terrno = TSDB_CODE_INVALID_MSG;
4000
    goto _OVER;
4001
  }
4002

4003
  mInfo("stb:%s, start to alter", tagIdxReq.stbName);
4004

4005
  if (mndCheckIndexReq(&tagIdxReq) != TSDB_CODE_SUCCESS) {
4006
    goto _OVER;
4007
  }
4008

4009
  pDb = mndAcquireDbByStb(pMnode, tagIdxReq.dbFName);
4010
  if (pDb == NULL) {
4011
    terrno = TSDB_CODE_MND_DB_NOT_EXIST;
4012
    goto _OVER;
4013
  }
4014

4015
  pStb = mndAcquireStb(pMnode, tagIdxReq.stbName);
4016
  if (pStb == NULL) {
4017
    terrno = TSDB_CODE_MND_STB_NOT_EXIST;
4018
    goto _OVER;
4019
  }
4020
  if (mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb) != 0) {
4021
    goto _OVER;
4022
  }
4023

4024
  code = mndAddIndex(pMnode, pReq, &tagIdxReq, pDb, pStb);
4025
  if (terrno == TSDB_CODE_MND_TAG_INDEX_ALREADY_EXIST || terrno == TSDB_CODE_MND_TAG_NOT_EXIST) {
4026
    return terrno;
4027
  } else {
4028
    if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
4029
  }
4030
_OVER:
4031
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
4032
    mError("stb:%s, failed to create index since %s", tagIdxReq.stbName, terrstr());
4033
  }
4034
  mndReleaseStb(pMnode, pStb);
4035
  mndReleaseDb(pMnode, pDb);
4036
  return code;
4037
}
4038
static int32_t mndProcessDropIndexReq(SRpcMsg *pReq) {
4039
  SMnode          *pMnode = pReq->info.node;
4040
  int32_t          code = -1;
4041
  SDbObj          *pDb = NULL;
4042
  SStbObj         *pStb = NULL;
4043
  SDropTagIndexReq dropReq = {0};
4044
  if (tDeserializeSDropTagIdxReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
4045
    terrno = TSDB_CODE_INVALID_MSG;
4046
    goto _OVER;
4047
  }
4048
  //
4049
  return TSDB_CODE_SUCCESS;
4050
_OVER:
4051
  return code;
4052
}*/
4053

4054
/*
 * Wrapper around mndProcessDropStbReq().
 *
 * When the drop request fails outright (any status other than 0 or
 * TSDB_CODE_ACTION_IN_PROGRESS) a one-byte response buffer is attached to the
 * request, noResp is cleared and the status is stored in pReq->code.
 * The status of mndProcessDropStbReq() is returned unchanged.
 */
static int32_t mndProcessDropStbReqFromMNode(SRpcMsg *pReq) {
  const int32_t status = mndProcessDropStbReq(pReq);

  if (status == 0 || status == TSDB_CODE_ACTION_IN_PROGRESS) {
    return status;
  }

  pReq->info.rsp = rpcMallocCont(1);
  pReq->info.rspLen = 1;
  pReq->info.noResp = false;
  pReq->code = status;
  return status;
}
4064

4065
typedef struct SVDropTbVgReqs {
4066
  SArray     *pBatchReqs;
4067
  SVgroupInfo info;
4068
} SVDropTbVgReqs;
4069

4070
typedef struct SMDropTbDbInfo {
4071
  SArray *dbVgInfos;
4072
  int32_t hashPrefix;
4073
  int32_t hashSuffix;
4074
  int32_t hashMethod;
4075
} SMDropTbDbInfo;
4076

4077
typedef struct SMDropTbTsmaInfo {
4078
  char           tsmaResTbDbFName[TSDB_DB_FNAME_LEN];
4079
  char           tsmaResTbNamePrefix[TSDB_TABLE_FNAME_LEN];
4080
  int32_t        suid;
4081
  SMDropTbDbInfo dbInfo;  // reference to DbInfo in pDbMap
4082
} SMDropTbTsmaInfo;
4083

4084
typedef struct SMDropTbTsmaInfos {
4085
  SArray *pTsmaInfos;  // SMDropTbTsmaInfo
4086
} SMDropTbTsmaInfos;
4087

4088
typedef struct SMndDropTbsWithTsmaCtx {
4089
  SHashObj *pVgMap; // <vgId, SVDropTbVgReqs>
4090
} SMndDropTbsWithTsmaCtx;
4091

4092
static int32_t mndDropTbForSingleVg(SMnode *pMnode, SMndDropTbsWithTsmaCtx *pCtx, SArray *pTbs,
4093
                                                 int32_t vgId);
4094

4095
static void destroySVDropTbBatchReqs(void *p);
4096
/*
 * Free a drop-tables context: every per-vgroup batch array stored in the
 * vgroup map, the map itself, and finally the context. NULL is accepted.
 */
static void mndDestroyDropTbsWithTsmaCtx(SMndDropTbsWithTsmaCtx *p) {
  if (p == NULL) {
    return;
  }

  if (p->pVgMap != NULL) {
    for (void *pIter = taosHashIterate(p->pVgMap, NULL); pIter != NULL; pIter = taosHashIterate(p->pVgMap, pIter)) {
      SVDropTbVgReqs *pVgReqs = pIter;
      taosArrayDestroyEx(pVgReqs->pBatchReqs, destroySVDropTbBatchReqs);
    }
    taosHashCleanup(p->pVgMap);
  }

  taosMemoryFree(p);
}
4110

4111
/*
 * Allocate a drop-tables context with an empty vgroup map.
 *
 * On success *ppCtx receives the new context and 0 is returned. On failure
 * terrno is returned and *ppCtx is left untouched.
 */
static int32_t mndInitDropTbsWithTsmaCtx(SMndDropTbsWithTsmaCtx **ppCtx) {
  SMndDropTbsWithTsmaCtx *pNew = taosMemoryCalloc(1, sizeof(SMndDropTbsWithTsmaCtx));
  if (pNew == NULL) {
    return terrno;
  }

  pNew->pVgMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  if (pNew->pVgMap == NULL) {
    // capture terrno before the cleanup call gets a chance to change it
    int32_t failure = terrno;
    if (failure != 0) {
      mndDestroyDropTbsWithTsmaCtx(pNew);
    }
    return failure;
  }

  *ppCtx = pNew;
  return 0;
}
4127

4128
/*
 * Serialize one drop-table batch into a freshly allocated vnode message:
 * an SMsgHead (contLen and vgId in network byte order) followed by the
 * encoded SVDropTbBatchReq.
 *
 * pMnode  - unused here.
 * pVgInfo - target vgroup; only vgId is read.
 * pReq    - batch to encode.
 * len     - out: total message length (head + payload); written on success only.
 *
 * Returns the message buffer, owned by the caller, or NULL on failure.
 * terrno is set to TSDB_CODE_OUT_OF_MEMORY when the allocation fails.
 */
static void *mndBuildVDropTbsReq(SMnode *pMnode, const SVgroupInfo *pVgInfo, const SVDropTbBatchReq *pReq,
                                 int32_t *len) {
  int32_t   contLen = 0;
  int32_t   ret = 0;
  SMsgHead *pHead = NULL;
  SEncoder  encoder = {0};

  // First pass: compute the encoded payload size.
  tEncodeSize(tEncodeSVDropTbBatchReq, pReq, contLen, ret);
  if (ret < 0) return NULL;

  contLen += sizeof(SMsgHead);
  pHead = taosMemoryMalloc(contLen);
  if (pHead == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pHead->contLen = htonl(contLen);
  pHead->vgId = htonl(pVgInfo->vgId);

  void *pBuf = POINTER_SHIFT(pHead, sizeof(SMsgHead));

  // Second pass: encode the payload right behind the head.
  tEncoderInit(&encoder, pBuf, contLen - sizeof(SMsgHead));
  int32_t code = tEncodeSVDropTbBatchReq(&encoder, pReq);
  tEncoderClear(&encoder);
  if (code != 0) {
    // the buffer has not been handed to anyone yet, so release it here
    taosMemoryFree(pHead);
    return NULL;
  }

  *len = contLen;
  return pHead;
}
4158

4159
/*
 * Append one redo action to pTrans that sends the prepared message
 * (pCont / contLen, type msgType) to the vgroup described by pVgReqs.
 * A TSDB_CODE_TDB_TABLE_NOT_EXIST reply is registered as acceptable.
 *
 * Returns the status of mndTransAppendRedoAction().
 */
static int32_t mndSetDropTbsRedoActions(SMnode *pMnode, STrans *pTrans, const SVDropTbVgReqs *pVgReqs, void *pCont,
                                        int32_t contLen, tmsg_t msgType) {
  STransAction redo = {
      .epSet = pVgReqs->info.epSet,
      .pCont = pCont,
      .contLen = contLen,
      .msgType = msgType,
      .acceptableCode = TSDB_CODE_TDB_TABLE_NOT_EXIST,
  };
  return mndTransAppendRedoAction(pTrans, &redo);
}
4169

4170
/*
 * For every vgroup in pVgMap, encode each collected batch and append it to
 * pTrans as a redo action of type msgType.
 *
 * Returns 0 on success. On the first failure the iteration is cancelled and
 * the failing status is returned: terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL
 * when terrno is 0) if a message could not be built, otherwise the status of
 * mndSetDropTbsRedoActions().
 */
static int32_t mndBuildDropTbRedoActions(SMnode *pMnode, STrans *pTrans, SHashObj *pVgMap, tmsg_t msgType) {
  int32_t code = 0;
  void   *pIter = taosHashIterate(pVgMap, NULL);
  while (pIter) {
    const SVDropTbVgReqs *pVgReqs = pIter;
    int32_t               len = 0;
    for (int32_t i = 0; i < taosArrayGetSize(pVgReqs->pBatchReqs); ++i) {
      SVDropTbBatchReq *pBatchReq = taosArrayGet(pVgReqs->pBatchReqs, i);
      void             *p = mndBuildVDropTbsReq(pMnode, &pVgReqs->info, pBatchReq, &len);
      if (!p) {
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
        if (terrno != 0) code = terrno;
        break;
      }
      if ((code = mndSetDropTbsRedoActions(pMnode, pTrans, pVgReqs, p, len, msgType)) != 0) {
        // the action was not appended, so the message is still owned here
        taosMemoryFree(p);
        break;
      }
    }
    if (TSDB_CODE_SUCCESS != code) {
      // leaving the loop early: the hash iterator must be cancelled explicitly
      taosHashCancelIterate(pVgMap, pIter);
      break;
    }
    pIter = taosHashIterate(pVgMap, pIter);
  }
  return code;
}
4196

4197
/*
 * Create and prepare the "drop-tbs" transaction that carries the drop-table
 * redo actions collected in pCtx->pVgMap.
 *
 * pRsp - originating message; supplies the SMnode and is passed to
 *        mndTransCreate().
 * pCtx - context whose vgroup map holds the batches to send.
 *
 * Returns 0 when the transaction was prepared, otherwise the failing status
 * (terrno or TSDB_CODE_MND_RETURN_VALUE_NULL when the transaction could not
 * be created). The local transaction handle is always dropped before return.
 */
static int32_t mndCreateDropTbsTxnPrepare(SRpcMsg *pRsp, SMndDropTbsWithTsmaCtx *pCtx) {
  int32_t code = 0;
  SMnode *pMnode = pRsp->info.node;
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pRsp, "drop-tbs");
  // Check the handle before configuring it: the setters below are given the
  // transaction pointer and must not be called with NULL.
  if (pTrans == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }
  mndTransSetChangeless(pTrans);
  mndTransSetSerial(pTrans);

  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER);

  if ((code = mndBuildDropTbRedoActions(pMnode, pTrans, pCtx->pVgMap, TDMT_VND_DROP_TABLE)) != 0) goto _OVER;
  if ((code = mndTransPrepare(pMnode, pTrans)) != 0) goto _OVER;

_OVER:
  mndTransDrop(pTrans);
  TAOS_RETURN(code);
}
4218

4219
/*
 * Handle a drop-tables request: deserialize it, group the tables per vgroup
 * and start the transaction that sends the drops.
 *
 * Returns TSDB_CODE_ACTION_IN_PROGRESS when the transaction was prepared,
 * TSDB_CODE_INVALID_MSG when the request cannot be deserialized, otherwise
 * the status of the step that failed.
 */
static int32_t mndProcessDropTbWithTsma(SRpcMsg *pReq) {
  int32_t                 code = -1;
  SMnode                 *pMnode = pReq->info.node;
  SMDropTbsReq            dropReq = {0};
  // Declared and initialized before the first `goto _OVER`, because the
  // cleanup code at _OVER reads it on every path.
  SMndDropTbsWithTsmaCtx *pCtx = NULL;

  if (tDeserializeSMDropTbsReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
    code = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }

  code = mndInitDropTbsWithTsmaCtx(&pCtx);
  if (code) goto _OVER;
  for (int32_t i = 0; i < taosArrayGetSize(dropReq.pVgReqs); ++i) {
    SMDropTbReqsOnSingleVg *pVgReq = taosArrayGet(dropReq.pVgReqs, i);
    code = mndDropTbForSingleVg(pMnode, pCtx, pVgReq->pTbs, pVgReq->vgInfo.vgId);
    if (code) goto _OVER;
  }
  code = mndCreateDropTbsTxnPrepare(pReq, pCtx);
  if (code == 0) {
    code = TSDB_CODE_ACTION_IN_PROGRESS;
  }
_OVER:
  tFreeSMDropTbsReq(&dropReq);
  if (pCtx) mndDestroyDropTbsWithTsmaCtx(pCtx);
  TAOS_RETURN(code);
}
4248

4249
/*
 * Initialize pBatchReq as a batch holding exactly one request, a copy of *pReq.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when the array cannot be created or
 * the element cannot be pushed (pBatchReq->pArray is NULL in both cases).
 */
static int32_t createDropTbBatchReq(const SVDropTbReq *pReq, SVDropTbBatchReq *pBatchReq) {
  pBatchReq->nReqs = 1;
  pBatchReq->pArray = taosArrayInit(TARRAY_MIN_SIZE, sizeof(SVDropTbReq));
  if (pBatchReq->pArray == NULL) {
    return terrno;
  }

  if (taosArrayPush(pBatchReq->pArray, pReq) != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  // push failed: undo the allocation so the batch holds no dangling array
  taosArrayDestroy(pBatchReq->pArray);
  pBatchReq->pArray = NULL;
  return terrno;
}
4260

4261
static void destroySVDropTbBatchReqs(void *p) {
171✔
4262
  SVDropTbBatchReq *pReq = p;
171✔
4263
  taosArrayDestroy(pReq->pArray);
171✔
4264
  pReq->pArray = NULL;
171✔
4265
}
171✔
4266

4267
/*
 * Record one table to drop under its vgroup in pVgHashMap.
 *
 * pMnode          - unused here.
 * pVgHashMap      - map <vgId, SVDropTbVgReqs>; a new entry is created the
 *                   first time a vgroup is seen.
 * pVgInfo         - vgroup the table lives in (copied into a new entry).
 * name            - table name; the pointer is stored, not copied.
 * suid            - super-table uid placed in the request.
 * ignoreNotExists - stored in the request's igNotExists field.
 *
 * Returns 0 on success, otherwise the failing status (terrno for allocation,
 * push and hash failures). Everything allocated by a failed call is released
 * before returning.
 */
static int32_t mndDropTbAdd(SMnode *pMnode, SHashObj *pVgHashMap, const SVgroupInfo *pVgInfo, char *name, tb_uid_t suid,
                            bool ignoreNotExists) {
  SVDropTbReq req = {.name = name, .suid = suid, .igNotExists = ignoreNotExists, .uid = 0};

  // Each table travels in its own single-element batch.
  SVDropTbBatchReq batchReq = {0};
  int32_t          code = createDropTbBatchReq(&req, &batchReq);
  if (TSDB_CODE_SUCCESS != code) return code;

  SVDropTbVgReqs *pVgReqs = taosHashGet(pVgHashMap, &pVgInfo->vgId, sizeof(pVgInfo->vgId));
  if (pVgReqs != NULL) {
    // vgroup already known: just append the batch
    if (taosArrayPush(pVgReqs->pBatchReqs, &batchReq) == NULL) {
      code = terrno;
      taosArrayDestroy(batchReq.pArray);
      return code;
    }
    return 0;
  }

  // First table for this vgroup: build a new map entry.
  SVDropTbVgReqs vgReqs = {0};
  vgReqs.info = *pVgInfo;
  vgReqs.pBatchReqs = taosArrayInit(TARRAY_MIN_SIZE, sizeof(SVDropTbBatchReq));
  if (!vgReqs.pBatchReqs) {
    code = terrno;
    taosArrayDestroy(batchReq.pArray);
    return code;
  }
  if (taosArrayPush(vgReqs.pBatchReqs, &batchReq) == NULL) {
    code = terrno;
    // the batch was not stored, so both arrays are still owned here
    taosArrayDestroy(batchReq.pArray);
    taosArrayDestroy(vgReqs.pBatchReqs);
    return code;
  }
  if (taosHashPut(pVgHashMap, &pVgInfo->vgId, sizeof(pVgInfo->vgId), &vgReqs, sizeof(vgReqs)) != 0) {
    code = terrno;
    // the batch now lives inside pBatchReqs; destroy it together with the array
    taosArrayDestroyEx(vgReqs.pBatchReqs, destroySVDropTbBatchReqs);
    return code;
  }
  return 0;
}
4299

4300
/*
 * Register every table in pTbs (elements are SVDropTbReq) for dropping on
 * vgroup vgId by adding it to pCtx->pVgMap.
 *
 * When the vgroup cannot be acquired nothing is added and 0 is returned.
 * Otherwise returns 0, or the status of the first mndDropTbAdd() that failed.
 */
static int32_t mndDropTbForSingleVg(SMnode *pMnode, SMndDropTbsWithTsmaCtx *pCtx, SArray *pTbs,
                                    int32_t vgId) {
  int32_t code = 0;

  SVgObj *pVgroup = mndAcquireVgroup(pMnode, vgId);
  if (pVgroup == NULL) {
    return 0;
  }

  // Snapshot what is needed from the vgroup object, then release it.
  SVgroupInfo target = {0};
  target.hashBegin = pVgroup->hashBegin;
  target.hashEnd = pVgroup->hashEnd;
  target.numOfTable = pVgroup->numOfTables;
  target.vgId = pVgroup->vgId;
  target.epSet = mndGetVgroupEpset(pMnode, pVgroup);
  mndReleaseVgroup(pMnode, pVgroup);

  for (int32_t idx = 0; idx < pTbs->size; ++idx) {
    SVDropTbReq *pDrop = taosArrayGet(pTbs, idx);
    TAOS_CHECK_GOTO(mndDropTbAdd(pMnode, pCtx->pVgMap, &target, pDrop->name, pDrop->suid, pDrop->igNotExists), NULL,
                    _end);
  }

_end:
  return code;
}
4323

4324
/*
 * Handle the reply to a "fetch TTL-expired tables" request.
 *
 *  - reply carries an error code  -> that code is returned
 *  - reply has no content         -> 0 is returned
 *  - otherwise the reply is decoded, the listed tables are registered for
 *    dropping on the reported vgroup and a drop transaction is prepared;
 *    TSDB_CODE_ACTION_IN_PROGRESS is returned when that succeeds, the failing
 *    status otherwise.
 *
 * The drop context, the decoder and the decoded reply are released on every
 * path.
 */
static int32_t mndProcessFetchTtlExpiredTbs(SRpcMsg *pRsp) {
  int32_t                 code = -1;
  SMnode                 *pMnode = pRsp->info.node;
  SMndDropTbsWithTsmaCtx *pDropCtx = NULL;
  SVFetchTtlExpiredTbsRsp expired = {0};
  SDecoder                decoder = {0};

  if (pRsp->code != TSDB_CODE_SUCCESS) {
    code = pRsp->code;
  } else if (pRsp->contLen == 0) {
    code = 0;
  } else {
    tDecoderInit(&decoder, pRsp->pCont, pRsp->contLen);
    code = tDecodeVFetchTtlExpiredTbsRsp(&decoder, &expired);

    // each step runs only while every previous one succeeded
    if (code == 0) {
      code = mndInitDropTbsWithTsmaCtx(&pDropCtx);
    }
    if (code == 0) {
      code = mndDropTbForSingleVg(pMnode, pDropCtx, expired.pExpiredTbs, expired.vgId);
    }
    if (code == 0) {
      code = mndCreateDropTbsTxnPrepare(pRsp, pDropCtx);
      if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
    }
  }

  if (pDropCtx) mndDestroyDropTbsWithTsmaCtx(pDropCtx);
  tDecoderClear(&decoder);
  tFreeFetchTtlExpiredTbsRsp(&expired);
  TAOS_RETURN(code);
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc