• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4850

14 Nov 2025 08:06AM UTC coverage: 63.728% (-0.1%) from 63.829%
#4850

push

travis-ci

guanshengliang
Merge branch '3.0' into cover/3.0

355 of 675 new or added lines in 18 files covered. (52.59%)

634 existing lines in 110 files now uncovered.

149066 of 233910 relevant lines covered (63.73%)

115676883.39 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.23
/source/dnode/mnode/impl/src/mndRsma.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "audit.h"
18
#include "functionMgt.h"
19
#include "mndDb.h"
20
#include "mndDnode.h"
21
#include "mndInfoSchema.h"
22
#include "mndMnode.h"
23
#include "mndPrivilege.h"
24
#include "mndShow.h"
25
#include "mndSma.h"
26
#include "mndStb.h"
27
#include "mndTrans.h"
28
#include "mndUser.h"
29
#include "mndVgroup.h"
30
#include "parser.h"
31
#include "tname.h"
32

33
#define MND_RSMA_VER_NUMBER   1
34
#define MND_RSMA_RESERVE_SIZE 64
35

36
static SSdbRaw *mndRsmaActionEncode(SRsmaObj *pSma);
37
static SSdbRow *mndRsmaActionDecode(SSdbRaw *pRaw);
38
static int32_t  mndRsmaActionInsert(SSdb *pSdb, SRsmaObj *pSma);
39
static int32_t  mndRsmaActionDelete(SSdb *pSdb, SRsmaObj *pSpSmatb);
40
static int32_t  mndRsmaActionUpdate(SSdb *pSdb, SRsmaObj *pOld, SRsmaObj *pNew);
41
static int32_t  mndProcessCreateRsmaReq(SRpcMsg *pReq);
42
static int32_t  mndProcessDropRsmaReq(SRpcMsg *pReq);
43
static int32_t  mndProcessAlterRsmaReq(SRpcMsg *pReq);
44
static int32_t  mndProcessGetRsmaReq(SRpcMsg *pReq);
45

46
static int32_t mndRetrieveRsma(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
47
static void    mndCancelRetrieveRsma(SMnode *pMnode, void *pIter);
48
static int32_t mndRetrieveRsmaTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
49
static void    mndCancelRetrieveRsmaTask(SMnode *pMnode, void *pIter);
50

51
/**
 * Register the RSMA module with the mnode.
 *
 * Installs the message handlers for the RSMA create/drop/alter/get requests
 * and their vnode responses, the SHOW retrieve/free-iterator callbacks, and
 * finally the SDB_RSMA table description.
 *
 * @param pMnode  mnode instance to register with.
 * @return the result of sdbSetTable().
 */
int32_t mndInitRsma(SMnode *pMnode) {
  SSdbTable rsmaTable = {0};
  rsmaTable.sdbType = SDB_RSMA;
  rsmaTable.keyType = SDB_KEY_BINARY;
  rsmaTable.encodeFp = (SdbEncodeFp)mndRsmaActionEncode;
  rsmaTable.decodeFp = (SdbDecodeFp)mndRsmaActionDecode;
  rsmaTable.insertFp = (SdbInsertFp)mndRsmaActionInsert;
  rsmaTable.updateFp = (SdbUpdateFp)mndRsmaActionUpdate;
  rsmaTable.deleteFp = (SdbDeleteFp)mndRsmaActionDelete;

  /* Requests handled by this module; vnode responses go to the transaction engine. */
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_RSMA, mndProcessCreateRsmaReq);
  mndSetMsgHandle(pMnode, TDMT_VND_CREATE_RSMA_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_RSMA, mndProcessDropRsmaReq);
  mndSetMsgHandle(pMnode, TDMT_VND_DROP_RSMA_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_ALTER_RSMA, mndProcessAlterRsmaReq);
  mndSetMsgHandle(pMnode, TDMT_VND_ALTER_RSMA_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_GET_RSMA, mndProcessGetRsmaReq);

  /* SHOW support for the RSMA management table. */
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_RSMA, mndRetrieveRsma);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_RSMA, mndCancelRetrieveRsma);

  return sdbSetTable(pMnode->pSdb, rsmaTable);
}
74

75
/* Module teardown hook; intentionally does nothing. */
void mndCleanupRsma(SMnode *pMnode) { (void)pMnode; }
480,874✔
76

77
/**
 * Release the function arrays held by an RSMA object.
 *
 * Only funcColIds and funcIds are freed (and cleared); the object itself is
 * not freed here. A NULL pObj is accepted.
 */
void mndRsmaFreeObj(SRsmaObj *pObj) {
  if (pObj == NULL) {
    return;
  }
  taosMemoryFreeClear(pObj->funcColIds);
  taosMemoryFreeClear(pObj->funcIds);
}
130,204✔
83

84
static int32_t tSerializeSRsmaObj(void *buf, int32_t bufLen, const SRsmaObj *pObj) {
199,848✔
85
  int32_t  code = 0, lino = 0;
199,848✔
86
  int32_t  tlen = 0;
199,848✔
87
  SEncoder encoder = {0};
199,848✔
88
  tEncoderInit(&encoder, buf, bufLen);
199,848✔
89

90
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
199,848✔
91

92
  TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pObj->name));
399,696✔
93
  TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pObj->tbName));
399,696✔
94
  TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pObj->dbFName));
399,696✔
95
  TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pObj->createUser));
399,696✔
96
  TAOS_CHECK_EXIT(tEncodeI64v(&encoder, pObj->createdTime));
399,696✔
97
  TAOS_CHECK_EXIT(tEncodeI64v(&encoder, pObj->updateTime));
399,696✔
98
  TAOS_CHECK_EXIT(tEncodeI64v(&encoder, pObj->uid));
399,696✔
99
  TAOS_CHECK_EXIT(tEncodeI64v(&encoder, pObj->tbUid));
399,696✔
100
  TAOS_CHECK_EXIT(tEncodeI64v(&encoder, pObj->dbUid));
399,696✔
101
  TAOS_CHECK_EXIT(tEncodeI64v(&encoder, pObj->interval[0]));
399,696✔
102
  TAOS_CHECK_EXIT(tEncodeI64v(&encoder, pObj->interval[1]));
399,696✔
103
  TAOS_CHECK_EXIT(tEncodeU64v(&encoder, pObj->reserved));
399,696✔
104
  TAOS_CHECK_EXIT(tEncodeI32v(&encoder, pObj->version));
399,696✔
105
  TAOS_CHECK_EXIT(tEncodeI8(&encoder, pObj->tbType));
399,696✔
106
  TAOS_CHECK_EXIT(tEncodeI8(&encoder, pObj->intervalUnit));
399,696✔
107
  TAOS_CHECK_EXIT(tEncodeI16v(&encoder, pObj->nFuncs));
399,696✔
108
  for (int16_t i = 0; i < pObj->nFuncs; ++i) {
1,317,180✔
109
    TAOS_CHECK_EXIT(tEncodeI16v(&encoder, pObj->funcColIds[i]));
2,234,664✔
110
    TAOS_CHECK_EXIT(tEncodeI32v(&encoder, pObj->funcIds[i]));
2,234,664✔
111
  }
112

113
  tEndEncode(&encoder);
199,848✔
114

115
  tlen = encoder.pos;
199,848✔
116
_exit:
199,848✔
117
  tEncoderClear(&encoder);
199,848✔
118
  if (code < 0) {
199,848✔
119
    mError("rsma, %s failed at line %d since %s", __func__, lino, tstrerror(code));
×
120
    TAOS_RETURN(code);
×
121
  }
122

123
  return tlen;
199,848✔
124
}
125

126
/**
 * Decode an SRsmaObj previously written by tSerializeSRsmaObj().
 *
 * When the decoded nFuncs is positive, funcColIds and funcIds are allocated
 * here. They are not released by this function on failure;
 * mndRsmaActionDecode() in this file does that through mndRsmaFreeObj().
 *
 * @param buf     encoded bytes.
 * @param bufLen  number of bytes in buf.
 * @param pObj    destination object.
 * @return 0 on success, otherwise a negative error code (via TAOS_RETURN).
 */
static int32_t tDeserializeSRsmaObj(void *buf, int32_t bufLen, SRsmaObj *pObj) {
  int32_t  code = 0, lino = 0;
  SDecoder dec = {0};

  tDecoderInit(&dec, buf, bufLen);
  TAOS_CHECK_EXIT(tStartDecode(&dec));

  /* names */
  TAOS_CHECK_EXIT(tDecodeCStrTo(&dec, pObj->name));
  TAOS_CHECK_EXIT(tDecodeCStrTo(&dec, pObj->tbName));
  TAOS_CHECK_EXIT(tDecodeCStrTo(&dec, pObj->dbFName));
  TAOS_CHECK_EXIT(tDecodeCStrTo(&dec, pObj->createUser));

  /* timestamps, uids, intervals */
  TAOS_CHECK_EXIT(tDecodeI64v(&dec, &pObj->createdTime));
  TAOS_CHECK_EXIT(tDecodeI64v(&dec, &pObj->updateTime));
  TAOS_CHECK_EXIT(tDecodeI64v(&dec, &pObj->uid));
  TAOS_CHECK_EXIT(tDecodeI64v(&dec, &pObj->tbUid));
  TAOS_CHECK_EXIT(tDecodeI64v(&dec, &pObj->dbUid));
  TAOS_CHECK_EXIT(tDecodeI64v(&dec, &pObj->interval[0]));
  TAOS_CHECK_EXIT(tDecodeI64v(&dec, &pObj->interval[1]));

  /* small scalars */
  TAOS_CHECK_EXIT(tDecodeU64v(&dec, &pObj->reserved));
  TAOS_CHECK_EXIT(tDecodeI32v(&dec, &pObj->version));
  TAOS_CHECK_EXIT(tDecodeI8(&dec, &pObj->tbType));
  TAOS_CHECK_EXIT(tDecodeI8(&dec, &pObj->intervalUnit));

  /* function list: count followed by (column id, function id) pairs */
  TAOS_CHECK_EXIT(tDecodeI16v(&dec, &pObj->nFuncs));
  if (pObj->nFuncs > 0) {
    pObj->funcColIds = taosMemoryMalloc(sizeof(col_id_t) * pObj->nFuncs);
    if (pObj->funcColIds == NULL) {
      TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
    }
    pObj->funcIds = taosMemoryMalloc(sizeof(int32_t) * pObj->nFuncs);
    if (pObj->funcIds == NULL) {
      TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
    }
    for (int16_t idx = 0; idx < pObj->nFuncs; ++idx) {
      TAOS_CHECK_EXIT(tDecodeI16v(&dec, &pObj->funcColIds[idx]));
      TAOS_CHECK_EXIT(tDecodeI32v(&dec, &pObj->funcIds[idx]));
    }
  }

_exit:
  tEndDecode(&dec);
  tDecoderClear(&dec);
  if (code < 0) {
    mError("rsma, %s failed at line %d since %s, row:%p", __func__, lino, tstrerror(code), pObj);
  }
  TAOS_RETURN(code);
}
170

171
/**
 * SDB encode callback: turn an SRsmaObj into a raw SDB record.
 *
 * The raw payload is a 32-bit length followed by the bytes produced by
 * tSerializeSRsmaObj(). The object is serialized twice: once with a NULL
 * buffer to learn the length, once into a temporary buffer of that length.
 *
 * @param pObj  object to encode.
 * @return the new raw record, or NULL with terrno set when `code` is
 *         non-zero at exit.
 */
static SSdbRaw *mndRsmaActionEncode(SRsmaObj *pObj) {
  int32_t  code = 0, lino = 0;
  void    *payload = NULL;
  SSdbRaw *pRaw = NULL;

  /* First pass only measures the encoded length. */
  int32_t payloadLen = tSerializeSRsmaObj(NULL, 0, pObj);
  if (payloadLen < 0) {
    TAOS_CHECK_EXIT(payloadLen);
  }

  int32_t rawSize = sizeof(int32_t) + payloadLen;
  pRaw = sdbAllocRaw(SDB_RSMA, MND_RSMA_VER_NUMBER, rawSize);
  if (pRaw == NULL) {
    TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
  }

  payload = taosMemoryMalloc(payloadLen);
  if (payload == NULL) {
    TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
  }

  /* Second pass writes the bytes. */
  payloadLen = tSerializeSRsmaObj(payload, payloadLen, pObj);
  if (payloadLen < 0) {
    TAOS_CHECK_EXIT(payloadLen);
  }

  int32_t dataPos = 0;
  SDB_SET_INT32(pRaw, dataPos, payloadLen, _exit);
  SDB_SET_BINARY(pRaw, dataPos, payload, payloadLen, _exit);
  SDB_SET_DATALEN(pRaw, dataPos, _exit);

_exit:
  taosMemoryFreeClear(payload);
  if (code == TSDB_CODE_SUCCESS) {
    mTrace("rsma, encode to raw:%p, row:%p", pRaw, pObj);
    return pRaw;
  }

  terrno = code;
  mError("rsma, failed at line %d to encode to raw:%p since %s", lino, pRaw, tstrerror(code));
  sdbFreeRaw(pRaw);
  return NULL;
}
213

214
/**
 * SDB decode callback: rebuild an SRsmaObj row from a raw SDB record.
 *
 * Checks the record's soft version against MND_RSMA_VER_NUMBER, allocates a
 * row, reads the 32-bit length and payload, deserializes the payload into
 * the row object and initializes its latch.
 *
 * @param pRaw  raw record to decode.
 * @return the new row, or NULL with terrno set when `code` is non-zero at
 *         exit (the partially built object and row are released first).
 */
SSdbRow *mndRsmaActionDecode(SSdbRaw *pRaw) {
  int32_t   code = 0, lino = 0;
  SSdbRow  *pRow = NULL;
  SRsmaObj *pObj = NULL;
  void     *payload = NULL;
  int8_t    sver = 0;

  TAOS_CHECK_EXIT(sdbGetRawSoftVer(pRaw, &sver));
  if (sver != MND_RSMA_VER_NUMBER) {
    mError("rsma read invalid ver, data ver: %d, curr ver: %d", sver, MND_RSMA_VER_NUMBER);
    TAOS_CHECK_EXIT(TSDB_CODE_SDB_INVALID_DATA_VER);
  }

  pRow = sdbAllocRow(sizeof(SRsmaObj));
  if (pRow == NULL) {
    TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
  }

  pObj = sdbGetRowObj(pRow);
  if (pObj == NULL) {
    TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
  }

  int32_t tlen;
  int32_t dataPos = 0;
  SDB_GET_INT32(pRaw, dataPos, &tlen, _exit);

  payload = taosMemoryMalloc(tlen + 1);
  if (payload == NULL) {
    TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
  }
  SDB_GET_BINARY(pRaw, dataPos, payload, tlen, _exit);

  TAOS_CHECK_EXIT(tDeserializeSRsmaObj(payload, tlen, pObj));

  taosInitRWLatch(&pObj->lock);

_exit:
  taosMemoryFreeClear(payload);
  if (code == TSDB_CODE_SUCCESS) {
    mTrace("rsma, decode from raw:%p, row:%p", pRaw, pObj);
    return pRow;
  }

  terrno = code;
  mError("rsma, failed at line %d to decode from raw:%p since %s", lino, pRaw, tstrerror(code));
  mndRsmaFreeObj(pObj);  /* accepts NULL */
  taosMemoryFreeClear(pRow);
  return NULL;
}
261

262
/* SDB insert callback: only traces the event; always returns 0. */
static int32_t mndRsmaActionInsert(SSdb *pSdb, SRsmaObj *pObj) {
  (void)pSdb;
  mTrace("rsma:%s, perform insert action, row:%p", pObj->name, pObj);
  return 0;
}
266

267
/*
 * SDB delete callback: traces the event and releases the object's function
 * arrays through mndRsmaFreeObj(); always returns 0.
 */
static int32_t mndRsmaActionDelete(SSdb *pSdb, SRsmaObj *pObj) {
  (void)pSdb;
  mTrace("rsma:%s, perform delete action, row:%p", pObj->name, pObj);
  mndRsmaFreeObj(pObj);
  return 0;
}
272

273
/**
 * SDB update callback: carry the mutable fields of pNew over to pOld.
 *
 * Under pOld's write latch, updateTime and nFuncs are copied and the two
 * function arrays are swapped between the objects, so pNew ends up holding
 * pOld's previous arrays. Always returns 0.
 */
static int32_t mndRsmaActionUpdate(SSdb *pSdb, SRsmaObj *pOld, SRsmaObj *pNew) {
  (void)pSdb;
  mTrace("rsma:%s, perform update action, old row:%p new row:%p", pOld->name, pOld, pNew);

  taosWLockLatch(&pOld->lock);
  TSWAP(pOld->funcColIds, pNew->funcColIds);
  TSWAP(pOld->funcIds, pNew->funcIds);
  pOld->nFuncs = pNew->nFuncs;
  pOld->updateTime = pNew->updateTime;
  taosWUnLockLatch(&pOld->lock);

  return 0;
}
283

284
/**
 * Acquire a reference to the RSMA object named `name`.
 *
 * On failure the SDB error left in terrno is translated to its RSMA-level
 * counterpart:
 *   TSDB_CODE_SDB_OBJ_NOT_THERE -> TSDB_CODE_RSMA_NOT_EXIST
 *   TSDB_CODE_SDB_OBJ_CREATING  -> TSDB_CODE_MND_RSMA_IN_CREATING
 *   TSDB_CODE_SDB_OBJ_DROPPING  -> TSDB_CODE_MND_RSMA_IN_DROPPING
 *   anything else               -> TSDB_CODE_APP_ERROR (logged as fatal)
 *
 * @param pMnode  mnode whose SDB is searched.
 * @param name    RSMA name used as the SDB key.
 * @return the acquired object (release with mndReleaseRsma()), or NULL with
 *         terrno set as described above.
 */
SRsmaObj *mndAcquireRsma(SMnode *pMnode, char *name) {
  SSdb     *pSdb = pMnode->pSdb;
  SRsmaObj *pObj = sdbAcquire(pSdb, SDB_RSMA, name);
  if (pObj != NULL) {
    return pObj;
  }

  /* Snapshot the SDB error so the fatal log can report the real cause. */
  int32_t sdbCode = terrno;
  if (sdbCode == TSDB_CODE_SDB_OBJ_NOT_THERE) {
    terrno = TSDB_CODE_RSMA_NOT_EXIST;
  } else if (sdbCode == TSDB_CODE_SDB_OBJ_CREATING) {
    terrno = TSDB_CODE_MND_RSMA_IN_CREATING;
  } else if (sdbCode == TSDB_CODE_SDB_OBJ_DROPPING) {
    terrno = TSDB_CODE_MND_RSMA_IN_DROPPING;
  } else {
    /*
     * Log the original code: overwriting terrno first would make the message
     * report the generic replacement instead of what actually went wrong.
     */
    mFatal("rsma:%s, failed to acquire rsma since %s", name, tstrerror(sdbCode));
    terrno = TSDB_CODE_APP_ERROR;
  }
  return NULL;
}
301

302
/* Hand pSma back to the mnode's SDB via sdbRelease(). */
void mndReleaseRsma(SMnode *pMnode, SRsmaObj *pSma) {
  sdbRelease(pMnode->pSdb, pSma);
}
59,046✔
306
#ifdef TD_ENTERPRISE
307
/**
 * Append a redo log for creating pSma to pTrans and mark the raw record
 * SDB_STATUS_CREATING.
 *
 * @return 0 on success; terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when
 *         terrno is 0) if encoding fails; otherwise the failing call's code.
 */
static int32_t mndSetCreateRsmaRedoLogs(SMnode *pMnode, STrans *pTrans, SRsmaObj *pSma) {
  int32_t  code = 0;
  SSdbRaw *pRaw = mndRsmaActionEncode(pSma);

  if (pRaw == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  TAOS_CHECK_RETURN(mndTransAppendRedolog(pTrans, pRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRaw, SDB_STATUS_CREATING));

  TAOS_RETURN(code);
}
320

321
/**
 * Append an undo log for creating pSma to pTrans and mark the raw record
 * SDB_STATUS_DROPPED.
 *
 * @return 0 on success; terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when
 *         terrno is 0) if encoding fails; otherwise the failing call's code.
 */
static int32_t mndSetCreateRsmaUndoLogs(SMnode *pMnode, STrans *pTrans, SRsmaObj *pSma) {
  int32_t  code = 0;
  SSdbRaw *pRaw = mndRsmaActionEncode(pSma);

  if (pRaw == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  TAOS_CHECK_RETURN(mndTransAppendUndolog(pTrans, pRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED));

  TAOS_RETURN(code);
}
333

334
/**
 * Append a prepare log for creating pSma to pTrans and mark the raw record
 * SDB_STATUS_CREATING.
 *
 * Failures are reported with the underlying error code, matching
 * mndSetCreateRsmaCommitLogs(), rather than a bare -1, so the caller's
 * tstrerror() log shows the real reason.
 *
 * @return 0 on success; terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when
 *         terrno is 0) if encoding fails; otherwise the failing call's code.
 */
static int32_t mndSetCreateRsmaPrepareActions(SMnode *pMnode, STrans *pTrans, SRsmaObj *pSma) {
  int32_t  code = 0;
  SSdbRaw *pDbRaw = mndRsmaActionEncode(pSma);
  if (pDbRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  TAOS_CHECK_RETURN(mndTransAppendPrepareLog(pTrans, pDbRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pDbRaw, SDB_STATUS_CREATING));
  return 0;
}
342

343
static void *mndBuildVCreateRsmaReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, SRsmaObj *pObj,
45,420✔
344
                                    SMCreateRsmaReq *pCreate, int32_t *pContLen) {
345
  int32_t         code = 0, lino = 0;
45,420✔
346
  SMsgHead       *pHead = NULL;
45,420✔
347
  SVCreateRsmaReq req = *pCreate;
45,420✔
348

349
  req.uid = pObj->uid;  // use the uid generated by mnode
45,420✔
350

351
  int32_t contLen = tSerializeSVCreateRsmaReq(NULL, 0, &req);
45,420✔
352
  TAOS_CHECK_EXIT(contLen);
45,420✔
353
  contLen += sizeof(SMsgHead);
45,420✔
354
  TSDB_CHECK_NULL((pHead = taosMemoryMalloc(contLen)), code, lino, _exit, terrno);
45,420✔
355
  pHead->contLen = htonl(contLen);
45,420✔
356
  pHead->vgId = htonl(pVgroup->vgId);
45,420✔
357
  void *pBuf = POINTER_SHIFT(pHead, sizeof(SMsgHead));
45,420✔
358
  TAOS_CHECK_EXIT(tSerializeSVCreateRsmaReq(pBuf, contLen, &req));
45,420✔
359
_exit:
45,420✔
360
  if (code < 0) {
45,420✔
361
    taosMemoryFreeClear(pHead);
×
362
    terrno = code;
×
363
    *pContLen = 0;
×
364
    return NULL;
×
365
  }
366
  *pContLen = contLen;
45,420✔
367
  return pHead;
45,420✔
368
}
369

370
static int32_t mndSetCreateRsmaRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb, SRsmaObj *pObj,
22,710✔
371
                                           SMCreateRsmaReq *pCreate) {
372
  int32_t code = 0;
22,710✔
373
  SSdb   *pSdb = pMnode->pSdb;
22,710✔
374
  SVgObj *pVgroup = NULL;
22,710✔
375
  void   *pIter = NULL;
22,710✔
376

377
  SName name = {0};
22,710✔
378
  if ((code = tNameFromString(&name, pCreate->tbFName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE)) != 0) {
22,710✔
379
    return code;
×
380
  }
381
  tstrncpy(pCreate->tbFName, (char *)tNameGetTableName(&name), sizeof(pCreate->tbFName));  // convert tbFName to tbName
22,710✔
382

383
  while ((pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup))) {
113,550✔
384
    if (!mndVgroupInDb(pVgroup, pDb->uid)) {
90,840✔
385
      sdbRelease(pSdb, pVgroup);
45,420✔
386
      continue;
45,420✔
387
    }
388

389
    int32_t contLen = 0;
45,420✔
390
    void   *pReq = mndBuildVCreateRsmaReq(pMnode, pVgroup, pStb, pObj, pCreate, &contLen);
45,420✔
391
    if (pReq == NULL) {
45,420✔
392
      sdbCancelFetch(pSdb, pIter);
×
393
      sdbRelease(pSdb, pVgroup);
×
394
      code = terrno ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
×
395
      TAOS_RETURN(code);
×
396
    }
397

398
    STransAction action = {0};
45,420✔
399
    action.mTraceId = pTrans->mTraceId;
45,420✔
400
    action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
45,420✔
401
    action.pCont = pReq;
45,420✔
402
    action.contLen = contLen;
45,420✔
403
    action.msgType = TDMT_VND_CREATE_RSMA;
45,420✔
404
    action.acceptableCode = TSDB_CODE_RSMA_ALREADY_EXISTS;  // check whether the rsma uid exist
45,420✔
405
    action.retryCode = TSDB_CODE_TDB_STB_NOT_EXIST;         // retry if relative table not exist
45,420✔
406
    if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
45,420✔
407
      taosMemoryFree(pReq);
×
408
      sdbCancelFetch(pSdb, pIter);
×
409
      sdbRelease(pSdb, pVgroup);
×
410
      TAOS_RETURN(code);
×
411
    }
412
    sdbRelease(pSdb, pVgroup);
45,420✔
413
  }
414

415
  TAOS_RETURN(code);
22,710✔
416
}
417

418
/**
 * Append a commit log for pSma to pTrans and mark the raw record
 * SDB_STATUS_READY.
 *
 * @return 0 on success; terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when
 *         terrno is 0) if encoding fails; otherwise the failing call's code.
 */
static int32_t mndSetCreateRsmaCommitLogs(SMnode *pMnode, STrans *pTrans, SRsmaObj *pSma) {
  int32_t  code = 0;
  SSdbRaw *pRaw = mndRsmaActionEncode(pSma);

  if (pRaw == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRaw, SDB_STATUS_READY));

  TAOS_RETURN(code);
}
431

432
/**
 * Append a prepare log for dropping pSma to pTrans and mark the raw record
 * SDB_STATUS_DROPPING.
 *
 * @return 0 on success; terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when
 *         terrno is 0) if encoding fails; otherwise the failing call's code.
 */
static int32_t mndSetDropRsmaPrepareLogs(SMnode *pMnode, STrans *pTrans, SRsmaObj *pSma) {
  int32_t  code = 0;
  SSdbRaw *pRedoRaw = mndRsmaActionEncode(pSma);
  if (pRedoRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    /* Return the code just computed; a bare -1 would discard it. */
    TAOS_RETURN(code);
  }
  TAOS_CHECK_RETURN(mndTransAppendPrepareLog(pTrans, pRedoRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING));

  return 0;
}
445

446
/**
 * Append a commit log for dropping pSma to pTrans and mark the raw record
 * SDB_STATUS_DROPPED.
 *
 * @return 0 on success; terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when
 *         terrno is 0) if encoding fails; otherwise the failing call's code.
 */
static int32_t mndSetDropRsmaCommitLogs(SMnode *pMnode, STrans *pTrans, SRsmaObj *pSma) {
  int32_t  code = 0;
  SSdbRaw *pCommitRaw = mndRsmaActionEncode(pSma);
  if (pCommitRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    /* Return the code just computed; a bare -1 would discard it. */
    TAOS_RETURN(code);
  }
  TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pCommitRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED));

  return 0;
}
459

460
/**
 * Build the vnode "drop rsma" message for one vgroup.
 *
 * The message is an SMsgHead (contLen and vgId in network byte order)
 * followed by a serialized SVDropRsmaReq filled from pObj (tbName, name,
 * tbType, uid, tbUid).
 *
 * @param pVgroup   target vgroup; supplies vgId for the header.
 * @param pObj      RSMA object being dropped.
 * @param pContLen  out: total message length (header + body), 0 on failure.
 * @return the allocated message (caller owns it), or NULL with terrno set.
 */
static void *mndBuildVDropRsmaReq(SMnode *pMnode, SVgObj *pVgroup, SRsmaObj *pObj, int32_t *pContLen) {
  int32_t       code = 0, lino = 0;
  int32_t       bodyLen = 0;
  int32_t       contLen = 0;
  SMsgHead     *pHead = NULL;
  SVDropRsmaReq req = {0};

  (void)snprintf(req.tbName, sizeof(req.tbName), "%s", pObj->tbName);
  (void)snprintf(req.name, sizeof(req.name), "%s", pObj->name);
  req.tbType = pObj->tbType;
  req.uid = pObj->uid;
  req.tbUid = pObj->tbUid;

  /* First pass only measures the serialized body. */
  bodyLen = tSerializeSVDropRsmaReq(NULL, 0, &req);
  TAOS_CHECK_EXIT(bodyLen);
  contLen = bodyLen + (int32_t)sizeof(SMsgHead);
  TSDB_CHECK_NULL((pHead = taosMemoryMalloc(contLen)), code, lino, _exit, terrno);
  pHead->contLen = htonl(contLen);
  pHead->vgId = htonl(pVgroup->vgId);
  void *pBuf = POINTER_SHIFT(pHead, sizeof(SMsgHead));
  /*
   * pBuf starts sizeof(SMsgHead) bytes into the allocation, so only bodyLen
   * bytes are writable from there; passing contLen would overstate the
   * buffer and defeat the serializer's bounds checking.
   */
  TAOS_CHECK_EXIT(tSerializeSVDropRsmaReq(pBuf, bodyLen, &req));
_exit:
  if (code < 0) {
    taosMemoryFreeClear(pHead);
    terrno = code;
    *pContLen = 0;
    return NULL;
  }
  *pContLen = contLen;
  return pHead;
}
489

490
/**
 * Append one TDMT_VND_DROP_RSMA redo action per vgroup of pDb to pTrans.
 *
 * TSDB_CODE_RSMA_NOT_EXIST is set as the action's acceptable code.
 *
 * @return 0 on success; terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL) if a
 *         request cannot be built; or the mndTransAppendRedoAction() failure
 *         code.
 */
static int32_t mndSetDropRsmaRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SRsmaObj *pSma) {
  int32_t code = 0;
  SSdb   *pSdb = pMnode->pSdb;
  SVgObj *pVg = NULL;
  void   *pCursor = NULL;

  for (;;) {
    pCursor = sdbFetch(pSdb, SDB_VGROUP, pCursor, (void **)&pVg);
    if (pCursor == NULL) {
      break;
    }

    /* Skip vgroups that belong to other databases. */
    if (!mndVgroupInDb(pVg, pDb->uid)) {
      sdbRelease(pSdb, pVg);
      continue;
    }

    int32_t reqLen = 0;
    void   *pVReq = mndBuildVDropRsmaReq(pMnode, pVg, pSma, &reqLen);
    if (pVReq == NULL) {
      sdbCancelFetch(pSdb, pCursor);
      sdbRelease(pSdb, pVg);
      code = terrno ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
      TAOS_RETURN(code);
    }

    STransAction action = {
        .mTraceId = pTrans->mTraceId,
        .epSet = mndGetVgroupEpset(pMnode, pVg),
        .pCont = pVReq,
        .contLen = reqLen,
        .msgType = TDMT_VND_DROP_RSMA,
        .acceptableCode = TSDB_CODE_RSMA_NOT_EXIST,
    };

    code = mndTransAppendRedoAction(pTrans, &action);
    if (code != 0) {
      taosMemoryFree(pVReq);
      sdbCancelFetch(pSdb, pCursor);
      sdbRelease(pSdb, pVg);
      TAOS_RETURN(code);
    }
    sdbRelease(pSdb, pVg);
  }
  TAOS_RETURN(code);
}
528

529
/**
 * Create and prepare the "drop-rsma" transaction for pObj.
 *
 * The transaction is given the prepare log, the commit log and the per-vgroup
 * redo actions, then handed to mndTransPrepare(). No RPC response payload is
 * attached to the transaction. The transaction handle is always passed to
 * mndTransDrop() before returning.
 *
 * @return 0 on success, otherwise the first failing step's code (terrno, or
 *         TSDB_CODE_MND_RETURN_VALUE_NULL, when the transaction cannot be
 *         created).
 */
static int32_t mndDropRsma(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SRsmaObj *pObj) {
  int32_t code = 0, lino = 0;
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, "drop-rsma");

  if (pTrans == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto _exit;
  }

  mInfo("trans:%d start to drop rsma:%s", pTrans->id, pObj->name);

  /* Transaction scope and conflict check. */
  mndTransSetDbName(pTrans, pDb->name, pObj->name);
  mndTransSetKillMode(pTrans, TRN_KILL_MODE_SKIP);
  TAOS_CHECK_EXIT(mndTransCheckConflict(pMnode, pTrans));

  /* Logs and vnode actions. */
  mndTransSetOper(pTrans, MND_OPER_DROP_RSMA);
  TAOS_CHECK_EXIT(mndSetDropRsmaPrepareLogs(pMnode, pTrans, pObj));
  TAOS_CHECK_EXIT(mndSetDropRsmaCommitLogs(pMnode, pTrans, pObj));
  TAOS_CHECK_EXIT(mndSetDropRsmaRedoActions(pMnode, pTrans, pDb, pObj));

  TAOS_CHECK_EXIT(mndTransPrepare(pMnode, pTrans));

_exit:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("rsma:%s, failed to drop at line:%d since %s", pObj->name, lino, tstrerror(code));
  }
  mndTransDrop(pTrans);
  TAOS_RETURN(code);
}
563
#endif
564
/**
 * Handler for TDMT_MND_DROP_RSMA.
 *
 * With TD_ENTERPRISE defined: deserializes the request, looks up the RSMA
 * and its database, checks MND_OPER_WRITE_DB privilege, starts the drop
 * transaction and records an audit entry. A missing RSMA yields success when
 * dropReq.igNotExists is set. Without TD_ENTERPRISE the handler returns 0
 * and does nothing.
 *
 * @return TSDB_CODE_ACTION_IN_PROGRESS once the transaction has been started
 *         successfully, 0 for an ignored missing RSMA, otherwise an error
 *         code.
 */
static int32_t mndProcessDropRsmaReq(SRpcMsg *pReq) {
  SMnode *pMnode = pReq->info.node;
  int32_t code = 0, lino = 0;
#ifdef TD_ENTERPRISE
  SMDropRsmaReq dropReq = {0};
  SRsmaObj     *pObj = NULL;
  SDbObj       *pDb = NULL;

  TAOS_CHECK_GOTO(tDeserializeSMDropRsmaReq(pReq->pCont, pReq->contLen, &dropReq), NULL, _exit);

  mInfo("rsma:%s, start to drop", dropReq.name);

  pObj = mndAcquireRsma(pMnode, dropReq.name);
  if (pObj == NULL) {
    if (dropReq.igNotExists) {
      code = 0;  // mndBuildDropMountRsp(pObj, &pReq->info.rspLen, &pReq->info.rsp, true);
    } else {
      code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    }
    goto _exit;
  }

  /* Resolve the database that owns this RSMA from its stored full name. */
  SName name = {0};
  TAOS_CHECK_EXIT(tNameFromString(&name, pObj->dbFName, T_NAME_ACCT | T_NAME_DB));

  char db[TSDB_TABLE_FNAME_LEN] = {0};
  (void)tNameGetFullDbName(&name, db);
  pDb = mndAcquireDb(pMnode, db);
  if (pDb == NULL) {
    TAOS_CHECK_EXIT(TSDB_CODE_MND_DB_NOT_SELECTED);
  }

  TAOS_CHECK_GOTO(mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb), NULL, _exit);

  code = mndDropRsma(pMnode, pReq, pDb, pObj);
  if (code == TSDB_CODE_SUCCESS) {
    /* The transaction completes asynchronously. */
    code = TSDB_CODE_ACTION_IN_PROGRESS;
  }

  auditRecord(pReq, pMnode->clusterId, "dropRsma", dropReq.name, "", "", 0);
_exit:
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("rsma:%s, failed at line %d to drop since %s", dropReq.name, lino, tstrerror(code));
  }

  mndReleaseDb(pMnode, pDb);
  mndReleaseRsma(pMnode, pObj);
#endif
  TAOS_RETURN(code);
}
613
#ifdef TD_ENTERPRISE
614
static int32_t mndCreateRsma(SMnode *pMnode, SRpcMsg *pReq, SUserObj *pUser, SDbObj *pDb, SStbObj *pStb,
22,710✔
615
                             SMCreateRsmaReq *pCreate) {
616
  int32_t  code = 0, lino = 0;
22,710✔
617
  SRsmaObj obj = {0};
22,710✔
618
  STrans  *pTrans = NULL;
22,710✔
619

620
  (void)snprintf(obj.name, TSDB_TABLE_NAME_LEN, "%s", pCreate->name);
22,710✔
621
  (void)snprintf(obj.dbFName, TSDB_DB_FNAME_LEN, "%s", pDb->name);
22,710✔
622

623
  const char *tbName = strrchr(pCreate->tbFName, '.');
22,710✔
624
  (void)snprintf(obj.tbName, TSDB_TABLE_NAME_LEN, "%s", tbName ? tbName + 1 : pCreate->tbFName);
22,710✔
625
  (void)snprintf(obj.createUser, TSDB_USER_LEN, "%s", pUser->user);
22,710✔
626
  obj.createdTime = taosGetTimestampMs();
22,710✔
627
  obj.updateTime = obj.createdTime;
22,710✔
628
  obj.uid = mndGenerateUid(obj.name, strlen(obj.name));
22,710✔
629
  obj.tbUid = pCreate->tbUid;
22,710✔
630
  obj.dbUid = pDb->uid;
22,710✔
631
  obj.interval[0] = pCreate->interval[0];
22,710✔
632
  obj.interval[1] = pCreate->interval[1];
22,710✔
633
  obj.version = 1;
22,710✔
634
  obj.tbType = pCreate->tbType;  // ETableType: 1 stable. Only super table supported currently.
22,710✔
635
  obj.intervalUnit = pCreate->intervalUnit;
22,710✔
636
  obj.nFuncs = pCreate->nFuncs;
22,710✔
637
  if (obj.nFuncs > 0) {
22,710✔
638
    TSDB_CHECK_NULL((obj.funcColIds = taosMemoryCalloc(obj.nFuncs, sizeof(col_id_t))), code, lino, _exit, terrno);
22,710✔
639
    TSDB_CHECK_NULL((obj.funcIds = taosMemoryCalloc(obj.nFuncs, sizeof(func_id_t))), code, lino, _exit, terrno);
22,710✔
640
    for (int16_t i = 0; i < obj.nFuncs; ++i) {
148,372✔
641
      obj.funcColIds[i] = pCreate->funcColIds[i];
125,662✔
642
      obj.funcIds[i] = pCreate->funcIds[i];
125,662✔
643
    }
644
  }
645

646
  TSDB_CHECK_NULL((pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, "create-rsma")),
22,710✔
647
                  code, lino, _exit, terrno);
648
  mInfo("trans:%d, used to create rsma %s on tb %s.%s", pTrans->id, obj.name, obj.dbFName, obj.tbName);
22,710✔
649

650
  mndTransSetDbName(pTrans, obj.dbFName, obj.name);
22,710✔
651
  mndTransSetKillMode(pTrans, TRN_KILL_MODE_SKIP);
22,710✔
652
  TAOS_CHECK_EXIT(mndTransCheckConflict(pMnode, pTrans));
22,710✔
653

654
  mndTransSetOper(pTrans, MND_OPER_CREATE_RSMA);
22,710✔
655
  TAOS_CHECK_EXIT(mndSetCreateRsmaPrepareActions(pMnode, pTrans, &obj));
22,710✔
656
  TAOS_CHECK_EXIT(mndSetCreateRsmaRedoActions(pMnode, pTrans, pDb, pStb, &obj, pCreate));
22,710✔
657
  TAOS_CHECK_EXIT(mndSetCreateRsmaCommitLogs(pMnode, pTrans, &obj));
22,710✔
658
  TAOS_CHECK_EXIT(mndTransPrepare(pMnode, pTrans));
22,710✔
659
_exit:
22,710✔
660
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
22,710✔
661
    mError("rsma:%s, failed at line %d to create rsma, since %s", obj.name, lino, tstrerror(code));
×
662
  }
663
  mndTransDrop(pTrans);
22,710✔
664
  mndRsmaFreeObj(&obj);
22,710✔
665
  TAOS_RETURN(code);
22,710✔
666
}
667

668
/**
 * Validate the fields of a create-RSMA request.
 *
 * Rejected with TSDB_CODE_MND_INVALID_RSMA_OPTION: empty name, empty
 * tbFName, igExists outside {0, 1}, negative intervalUnit, a negative
 * interval, both intervals zero, or a full table name whose table part is
 * empty. A tbFName that tNameFromString() cannot parse is rejected with that
 * function's error code.
 *
 * @return 0 when the request is acceptable, otherwise an error code.
 */
static int32_t mndCheckCreateRsmaReq(SMCreateRsmaReq *pCreate) {
  int32_t code = TSDB_CODE_MND_INVALID_RSMA_OPTION;
  if (pCreate->name[0] == 0) goto _exit;
  if (pCreate->tbFName[0] == 0) goto _exit;
  if (pCreate->igExists < 0 || pCreate->igExists > 1) goto _exit;
  if (pCreate->intervalUnit < 0) goto _exit;
  if (pCreate->interval[0] < 0) goto _exit;
  if (pCreate->interval[1] < 0) goto _exit;
  if (pCreate->interval[0] == 0 && pCreate->interval[1] == 0) goto _exit;

  SName fname = {0};
  if ((code = tNameFromString(&fname, pCreate->tbFName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE)) < 0) goto _exit;
  if (*(char *)tNameGetTableName(&fname) == 0) {
    /*
     * `code` now holds tNameFromString()'s success value, so it must be set
     * back to the error; otherwise an empty table name would be accepted.
     */
    code = TSDB_CODE_MND_INVALID_RSMA_OPTION;
    goto _exit;
  }
  code = 0;
_exit:
  TAOS_RETURN(code);
}
685

686
/**
 * Check whether an RSMA already exists on the table targeted by pCreate.
 *
 * Iterates all SDB_RSMA rows and looks for one whose tbUid equals
 * pCreate->tbUid and whose dbUid equals pDbObj->uid.
 *
 * @return TSDB_CODE_MND_RSMA_EXIST_IN_TABLE if such a row exists, else 0.
 */
static int32_t mndCheckRsmaConflicts(SMnode *pMnode, SDbObj *pDbObj, SMCreateRsmaReq *pCreate) {
  void     *pIter = NULL;
  SSdb     *pSdb = pMnode->pSdb;
  SRsmaObj *pObj = NULL;
  while ((pIter = sdbFetch(pSdb, SDB_RSMA, pIter, (void **)&pObj))) {
    if (pObj->tbUid == pCreate->tbUid && pObj->dbUid == pDbObj->uid) {
      /* Log while the row reference is still held; pObj must not be read after sdbRelease(). */
      mError("rsma:%s, conflict with existing rsma %s on same table %s.%s:%" PRIi64, pCreate->name, pObj->name,
             pObj->dbFName, pObj->tbName, pObj->tbUid);
      sdbCancelFetch(pSdb, (pIter));
      sdbRelease(pSdb, pObj);
      return TSDB_CODE_MND_RSMA_EXIST_IN_TABLE;
    }
    sdbRelease(pSdb, pObj);
  }
  return 0;
}
702
#endif
703
/*
 * Handle a create-rsma request.
 *
 * With TD_ENTERPRISE defined: deserializes and validates the request, rejects (or, with igExists,
 * ignores) an rsma that already exists under the same name, checks read/write privilege on the
 * database, verifies that the super table exists and carries no other rsma, then calls
 * mndCreateRsma(). A successful start is reported as TSDB_CODE_ACTION_IN_PROGRESS.
 * Without TD_ENTERPRISE the function returns 0 without doing anything.
 */
static int32_t mndProcessCreateRsmaReq(SRpcMsg *pReq) {
  int32_t code = 0, lino = 0;
#ifdef TD_ENTERPRISE
  SMnode         *pMnode = pReq->info.node;
  SDbObj         *pDb = NULL;
  SStbObj        *pStb = NULL;
  SRsmaObj       *pSma = NULL;
  SUserObj       *pUser = NULL;
  int64_t         mTraceId = TRACE_GET_ROOTID(&pReq->info.traceId);
  SMCreateRsmaReq createReq = {0};

  TAOS_CHECK_EXIT(tDeserializeSMCreateRsmaReq(pReq->pCont, pReq->contLen, &createReq));

  mInfo("start to create rsma: %s", createReq.name);
  TAOS_CHECK_EXIT(mndCheckCreateRsmaReq(&createReq));

  if ((pSma = mndAcquireRsma(pMnode, createReq.name))) {
    if (createReq.igExists) {
      mInfo("rsma:%s, already exist, ignore exist is set", createReq.name);
      code = 0;
      goto _exit;
    } else {
      TAOS_CHECK_EXIT(TSDB_CODE_RSMA_ALREADY_EXISTS);
    }
  } else {
    if ((code = terrno) == TSDB_CODE_RSMA_NOT_EXIST) {
      // continue
    } else {  // TSDB_CODE_MND_RSMA_IN_CREATING | TSDB_CODE_MND_RSMA_IN_DROPPING | TSDB_CODE_APP_ERROR
      goto _exit;
    }
  }

  SName name = {0};
  TAOS_CHECK_EXIT(tNameFromString(&name, createReq.tbFName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE));
  char db[TSDB_TABLE_FNAME_LEN] = {0};
  (void)tNameGetFullDbName(&name, db);

  pDb = mndAcquireDb(pMnode, db);
  if (pDb == NULL) {
    TAOS_CHECK_EXIT(TSDB_CODE_MND_DB_NOT_SELECTED);
  }

  TAOS_CHECK_EXIT(mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_READ_DB, pDb));
  TAOS_CHECK_EXIT(mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb));

  pStb = mndAcquireStb(pMnode, createReq.tbFName);
  if (pStb == NULL) {
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STB_NOT_EXIST);
  }

  TAOS_CHECK_EXIT(mndCheckRsmaConflicts(pMnode, pDb, &createReq));

  TAOS_CHECK_EXIT(mndAcquireUser(pMnode, pReq->info.conn.user, &pUser));
  TAOS_CHECK_EXIT(mndCreateRsma(pMnode, pReq, pUser, pDb, pStb, &createReq));

  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;

  auditRecord(pReq, pMnode->clusterId, "createRsma", createReq.name, createReq.tbFName, "", 0);
_exit:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("rsma:%s, failed at line %d to create since %s", createReq.name, lino, tstrerror(code));
  }
  // every object acquired above is released here, including the user reference
  if (pUser) mndReleaseUser(pMnode, pUser);
  if (pSma) mndReleaseRsma(pMnode, pSma);
  if (pStb) mndReleaseStb(pMnode, pStb);
  if (pDb) mndReleaseDb(pMnode, pDb);
  tFreeSMCreateRsmaReq(&createReq);
#endif
  TAOS_RETURN(code);
}
772

773
#ifdef TD_ENTERPRISE
774
/*
 * Validate an alter-rsma request: the name must be non-empty and igNotExists must be 0 or 1.
 * Returns 0 when both hold, TSDB_CODE_MND_INVALID_RSMA_OPTION otherwise.
 */
static int32_t mndCheckAlterRsmaReq(SMAlterRsmaReq *pReq) {
  bool nameMissing = (pReq->name[0] == 0);
  bool flagOutOfRange = (pReq->igNotExists < 0 || pReq->igNotExists > 1);

  int32_t code = (nameMissing || flagOutOfRange) ? TSDB_CODE_MND_INVALID_RSMA_OPTION : 0;
  TAOS_RETURN(code);
}
783

784
/*
 * Encode pSma, append the encoded row to the transaction's prepare log and then mark the row
 * SDB_STATUS_READY. Returns 0 when all three steps succeed and -1 as soon as one of them fails.
 */
static int32_t mndSetAlterRsmaPrepareActions(SMnode *pMnode, STrans *pTrans, SRsmaObj *pSma) {
  SSdbRaw *pRaw = mndRsmaActionEncode(pSma);

  // short-circuit evaluation keeps the steps in order and stops at the first failure
  if (pRaw == NULL || mndTransAppendPrepareLog(pTrans, pRaw) != 0 ||
      sdbSetRawStatus(pRaw, SDB_STATUS_READY) != 0) {
    return -1;
  }
  return 0;
}
792

793
/*
 * Append the commit log for an altered rsma. Altering reuses the create path unchanged, so this
 * simply forwards to mndSetCreateRsmaCommitLogs() and hands back its result.
 */
static int32_t mndSetAlterRsmaCommitLogs(SMnode *pMnode, STrans *pTrans, SRsmaObj *pSma) {
  int32_t code = mndSetCreateRsmaCommitLogs(pMnode, pTrans, pSma);
  return code;
}
796

797
static void *mndBuildVAlterRsmaReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, SRsmaObj *pObj,
24,224✔
798
                                   SMAlterRsmaReq *pAlter, int32_t *pContLen) {
799
  int32_t        code = 0, lino = 0;
24,224✔
800
  SMsgHead      *pHead = NULL;
24,224✔
801
  SVAlterRsmaReq req = {0};
24,224✔
802
  req.alterType = pAlter->alterType;
24,224✔
803
  (void)snprintf(req.name, sizeof(req.name), "%s", pObj->name);
24,224✔
804
  (void)snprintf(req.tbName, sizeof(req.tbName), "%s", pObj->tbName);
24,224✔
805
  req.tbType = pObj->tbType;
24,224✔
806
  req.intervalUnit = pObj->intervalUnit;
24,224✔
807
  req.interval[0] = pObj->interval[0];
24,224✔
808
  req.interval[1] = pObj->interval[1];
24,224✔
809
  req.tbUid = pObj->tbUid;
24,224✔
810
  req.uid = pObj->uid;
24,224✔
811
  req.nFuncs = pObj->nFuncs;
24,224✔
812
  req.funcColIds = pObj->funcColIds;
24,224✔
813
  req.funcIds = pObj->funcIds;
24,224✔
814

815
  int32_t contLen = tSerializeSVAlterRsmaReq(NULL, 0, &req);
24,224✔
816
  TAOS_CHECK_EXIT(contLen);
24,224✔
817
  contLen += sizeof(SMsgHead);
24,224✔
818
  TSDB_CHECK_NULL((pHead = taosMemoryMalloc(contLen)), code, lino, _exit, terrno);
24,224✔
819
  pHead->contLen = htonl(contLen);
24,224✔
820
  pHead->vgId = htonl(pVgroup->vgId);
24,224✔
821
  void *pBuf = POINTER_SHIFT(pHead, sizeof(SMsgHead));
24,224✔
822
  TAOS_CHECK_EXIT(tSerializeSVAlterRsmaReq(pBuf, contLen, &req));
24,224✔
823
_exit:
24,224✔
824
  if (code < 0) {
24,224✔
825
    taosMemoryFreeClear(pHead);
×
826
    terrno = code;
×
827
    *pContLen = 0;
×
828
    return NULL;
×
829
  }
830
  *pContLen = contLen;
24,224✔
831
  return pHead;
24,224✔
832
}
833

834
static int32_t mndSetAlterRsmaRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb, SRsmaObj *pObj,
12,112✔
835
                                          SMAlterRsmaReq *pAlter) {
836
  int32_t code = 0;
12,112✔
837
  SSdb   *pSdb = pMnode->pSdb;
12,112✔
838
  SVgObj *pVgroup = NULL;
12,112✔
839
  void   *pIter = NULL;
12,112✔
840

841
  while ((pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup))) {
60,560✔
842
    if (!mndVgroupInDb(pVgroup, pDb->uid)) {
48,448✔
843
      sdbRelease(pSdb, pVgroup);
24,224✔
844
      continue;
24,224✔
845
    }
846

847
    int32_t contLen = 0;
24,224✔
848
    void   *pReq = mndBuildVAlterRsmaReq(pMnode, pVgroup, pStb, pObj, pAlter, &contLen);
24,224✔
849
    if (pReq == NULL) {
24,224✔
850
      sdbCancelFetch(pSdb, pIter);
×
851
      sdbRelease(pSdb, pVgroup);
×
852
      code = terrno ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
×
853
      TAOS_RETURN(code);
×
854
    }
855

856
    STransAction action = {0};
24,224✔
857
    action.mTraceId = pTrans->mTraceId;
24,224✔
858
    action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
24,224✔
859
    action.pCont = pReq;
24,224✔
860
    action.contLen = contLen;
24,224✔
861
    action.msgType = TDMT_VND_ALTER_RSMA;
24,224✔
862
    if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
24,224✔
863
      taosMemoryFree(pReq);
×
864
      sdbCancelFetch(pSdb, pIter);
×
865
      sdbRelease(pSdb, pVgroup);
×
866
      TAOS_RETURN(code);
×
867
    }
868
    sdbRelease(pSdb, pVgroup);
24,224✔
869
  }
870

871
  TAOS_RETURN(code);
12,112✔
872
}
873

874
static int32_t mndAlterRsma(SMnode *pMnode, SRpcMsg *pReq, SUserObj *pUser, SDbObj *pDb, SStbObj *pStb,
12,112✔
875
                            SMAlterRsmaReq *pAlter, SRsmaObj *pOld) {
876
  int32_t  code = 0, lino = 0;
12,112✔
877
  STrans  *pTrans = NULL;
12,112✔
878
  SRsmaObj obj = *pOld;
12,112✔
879

880
  obj.updateTime = taosGetTimestampMs();
12,112✔
881
  ++obj.version;
12,112✔
882
  if (pAlter->alterType == TSDB_ALTER_RSMA_FUNCTION) {
12,112✔
883
    obj.nFuncs = pOld->nFuncs + pAlter->nFuncs;
12,112✔
884
    obj.funcColIds = taosMemoryMalloc(obj.nFuncs * sizeof(col_id_t));
12,112✔
885
    obj.funcIds = taosMemoryMalloc(obj.nFuncs * sizeof(func_id_t));
12,112✔
886
    if (obj.funcColIds == NULL || obj.funcIds == NULL) {
12,112✔
887
      TAOS_CHECK_EXIT(terrno);
×
888
    }
889
    int32_t n = 0, i = 0, j = 0;
12,112✔
890
    while (i < pOld->nFuncs && j < pAlter->nFuncs) {
65,102✔
891
      if (pOld->funcColIds[i] < pAlter->funcColIds[j]) {
52,990✔
892
        obj.funcColIds[n] = pOld->funcColIds[i];
45,420✔
893
        obj.funcIds[n++] = pOld->funcIds[i++];
45,420✔
894
      } else if (pOld->funcColIds[i] > pAlter->funcColIds[j]) {
7,570✔
895
        obj.funcColIds[n] = pAlter->funcColIds[j];
7,570✔
896
        obj.funcIds[n++] = pAlter->funcIds[j++];
7,570✔
897
      } else {
898
        mError("rsma:%s, conflict function on column id:%d", pOld->name, pAlter->funcColIds[j]);
×
899
        TAOS_CHECK_EXIT(TSDB_CODE_MND_RSMA_FUNC_CONFLICT);
×
900
      }
901
    }
902
    if (i < pOld->nFuncs) {
12,112✔
903
      while (i < pOld->nFuncs) {
15,140✔
904
        obj.funcColIds[n] = pOld->funcColIds[i];
7,570✔
905
        obj.funcIds[n++] = pOld->funcIds[i++];
7,570✔
906
      }
907
    } else if (j < pAlter->nFuncs) {
4,542✔
908
      while (j < pAlter->nFuncs) {
9,084✔
909
        obj.funcColIds[n] = pAlter->funcColIds[j];
4,542✔
910
        obj.funcIds[n++] = pAlter->funcIds[j++];
4,542✔
911
      }
912
    }
913
  } else {
914
    TAOS_CHECK_EXIT(TSDB_CODE_OPS_NOT_SUPPORT);
×
915
  }
916

917
  TSDB_CHECK_NULL((pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, "alter-rsma")), code,
12,112✔
918
                  lino, _exit, terrno);
919
  mInfo("trans:%d, used to alter rsma %s on tb %s.%s", pTrans->id, obj.name, obj.dbFName, obj.tbName);
12,112✔
920

921
  mndTransSetDbName(pTrans, obj.dbFName, obj.name);
12,112✔
922
  mndTransSetKillMode(pTrans, TRN_KILL_MODE_SKIP);
12,112✔
923
  TAOS_CHECK_EXIT(mndTransCheckConflict(pMnode, pTrans));
12,112✔
924

925
  mndTransSetOper(pTrans, MND_OPER_ALTER_RSMA);
12,112✔
926
  TAOS_CHECK_EXIT(mndSetAlterRsmaPrepareActions(pMnode, pTrans, &obj));
12,112✔
927
  TAOS_CHECK_EXIT(mndSetAlterRsmaCommitLogs(pMnode, pTrans, &obj));
12,112✔
928
  TAOS_CHECK_EXIT(mndSetAlterRsmaRedoActions(pMnode, pTrans, pDb, pStb, &obj, pAlter));
12,112✔
929

930
  TAOS_CHECK_EXIT(mndTransPrepare(pMnode, pTrans));
12,112✔
931
_exit:
12,112✔
932
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
12,112✔
933
    mError("rsma:%s, failed at line %d to alter rsma, since %s", obj.name, lino, tstrerror(code));
×
934
  }
935
  mndTransDrop(pTrans);
12,112✔
936
  mndRsmaFreeObj(&obj);
12,112✔
937
  TAOS_RETURN(code);
12,112✔
938
}
939
#endif
940
/*
 * Handle an alter-rsma request.
 *
 * With TD_ENTERPRISE defined: deserializes and validates the request, looks up the rsma (a
 * failed lookup is turned into success when igNotExists is set), checks read/write privilege on
 * the owning database, acquires the super table and calls mndAlterRsma(). A successful start is
 * reported as TSDB_CODE_ACTION_IN_PROGRESS and recorded through auditRecord().
 * Without TD_ENTERPRISE the function returns 0 without doing anything.
 */
static int32_t mndProcessAlterRsmaReq(SRpcMsg *pReq) {
  int32_t code = 0, lino = 0;
#ifdef TD_ENTERPRISE
  SMnode        *pMnode = pReq->info.node;
  SDbObj        *pDb = NULL;
  SStbObj       *pStb = NULL;
  SRsmaObj      *pObj = NULL;
  SUserObj      *pUser = NULL;
  int64_t        mTraceId = TRACE_GET_ROOTID(&pReq->info.traceId);
  SMAlterRsmaReq req = {0};
  char           tbFName[TSDB_TABLE_FNAME_LEN] = "\0";

  TAOS_CHECK_EXIT(tDeserializeSMAlterRsmaReq(pReq->pCont, pReq->contLen, &req));

  mInfo("start to alter rsma: %s", req.name);
  TAOS_CHECK_EXIT(mndCheckAlterRsmaReq(&req));

  if (!(pObj = mndAcquireRsma(pMnode, req.name))) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    if (req.igNotExists) {
      code = 0;
    }
    goto _exit;
  }

  if (!(pDb = mndAcquireDb(pMnode, pObj->dbFName))) {
    TAOS_CHECK_EXIT(TSDB_CODE_MND_DB_NOT_SELECTED);
  }

  TAOS_CHECK_EXIT(mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_READ_DB, pDb));
  TAOS_CHECK_EXIT(mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb));

  (void)snprintf(tbFName, sizeof(tbFName), "%s.%s", pObj->dbFName, pObj->tbName);

  pStb = mndAcquireStb(pMnode, tbFName);
  if (pStb == NULL) {
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STB_NOT_EXIST);
  }

  TAOS_CHECK_EXIT(mndAcquireUser(pMnode, pReq->info.conn.user, &pUser));
  TAOS_CHECK_EXIT(mndAlterRsma(pMnode, pReq, pUser, pDb, pStb, &req, pObj));

  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;

  char alterType[32] = "\0";
  (void)snprintf(alterType, sizeof(alterType), "alterType:%" PRIi8, req.alterType);
  auditRecord(pReq, pMnode->clusterId, "alterRsma", req.name, tbFName, alterType, 0);
_exit:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("rsma:%s, failed at line %d to alter since %s", req.name, lino, tstrerror(code));
  }
  // every object acquired above is released here, including the user reference
  if (pUser) mndReleaseUser(pMnode, pUser);
  if (pObj) mndReleaseRsma(pMnode, pObj);
  if (pStb) mndReleaseStb(pMnode, pStb);
  if (pDb) mndReleaseDb(pMnode, pDb);
  tFreeSMAlterRsmaReq(&req);
#endif
  TAOS_RETURN(code);
}
999
#ifdef TD_ENTERPRISE
1000
/*
 * Copy the description of pObj into pRsp.
 *
 * funcColIds and funcIds are shared with pObj (shallow copy). When withColName is set and the
 * rsma has functions, pRsp->colNames receives one duplicated column name per function, resolved
 * against pStb->pColumns with a single forward scan that never moves backwards.
 *
 * Returns 0 on success, TSDB_CODE_MND_COLUMN_NOT_EXIST when a function's column id is not found
 * in the table schema, or terrno when an allocation fails.
 */
static int32_t mndFillRsmaInfo(SRsmaObj *pObj, SStbObj *pStb, SRsmaInfoRsp *pRsp, bool withColName) {
  int32_t code = 0, lino = 0;
  int16_t fIdx = 0, cIdx = 0;

  pRsp->id = pObj->uid;
  (void)snprintf(pRsp->name, sizeof(pRsp->name), "%s", pObj->name);
  (void)snprintf(pRsp->tbFName, sizeof(pRsp->tbFName), "%s.%s", pObj->dbFName, pObj->tbName);
  pRsp->version = pObj->version;
  pRsp->tbType = pObj->tbType;
  pRsp->intervalUnit = pObj->intervalUnit;
  pRsp->nFuncs = pObj->nFuncs;
  pRsp->interval[0] = pObj->interval[0];
  pRsp->interval[1] = pObj->interval[1];

  if (pRsp->nFuncs <= 0) goto _exit;

  pRsp->funcColIds = pObj->funcColIds;  // shallow copy, no need to free
  pRsp->funcIds = pObj->funcIds;        // shallow copy, no need to free
  if (!withColName) goto _exit;

  pRsp->colNames = taosArrayInit(pRsp->nFuncs, sizeof(char *));
  if (pRsp->colNames == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }
  pRsp->nColNames = pRsp->nFuncs;

  for (fIdx = 0; fIdx < pRsp->nFuncs; ++fIdx) {
    // skip schema columns whose id is below the one we are looking for
    while (cIdx < pStb->numOfColumns && pStb->pColumns[cIdx].colId < pRsp->funcColIds[fIdx]) {
      ++cIdx;
    }
    bool matched = (cIdx < pStb->numOfColumns) && (pStb->pColumns[cIdx].colId == pRsp->funcColIds[fIdx]);

    if (matched) {
      SSchema *pCol = pStb->pColumns + cIdx;
      char    *colName = taosStrdup(pCol->name);
      if (colName == NULL) {
        TAOS_CHECK_EXIT(terrno);
      }
      if (!taosArrayPush(pRsp->colNames, &colName)) {
        taosMemoryFree(colName);
        TAOS_CHECK_EXIT(terrno);
      }
    } else {
      TAOS_CHECK_EXIT(TSDB_CODE_MND_COLUMN_NOT_EXIST);
    }
  }

_exit:
  if (code != 0) {
    mError("rsma:%s, failed at line %d to get rsma info since %s", pObj->name, lino, tstrerror(code));
  }
  TAOS_RETURN(code);
}
1055
#endif
1056
/*
 * Handle a get-rsma-info request.
 *
 * With TD_ENTERPRISE defined: looks up the rsma named in the request and its super table, fills
 * an SRsmaInfoRsp through mndFillRsmaInfo(), serializes it into an rpc buffer and attaches that
 * buffer to pReq->info.rsp / rspLen. On any failure the buffer is freed and the error returned.
 * Without TD_ENTERPRISE the function returns TSDB_CODE_OPS_NOT_SUPPORT.
 */
static int32_t mndProcessGetRsmaReq(SRpcMsg *pReq) {
#ifdef TD_ENTERPRISE
  int32_t      code = 0, lino = 0;
  SMnode      *pMnode = pReq->info.node;
  SRsmaInfoReq infoReq = {0};
  SRsmaInfoRsp infoRsp = {0};
  SRsmaObj    *pRsma = NULL;
  SStbObj     *pStb = NULL;
  void        *pCont = NULL;
  int32_t      rspLen = 0;
  char         tbFName[TSDB_TABLE_FNAME_LEN] = {0};

  TAOS_CHECK_EXIT(tDeserializeRsmaInfoReq(pReq->pCont, pReq->contLen, &infoReq));

  pRsma = mndAcquireRsma(pMnode, infoReq.name);
  if (pRsma == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }

  (void)snprintf(tbFName, sizeof(tbFName), "%s.%s", pRsma->dbFName, pRsma->tbName);

  pStb = mndAcquireStb(pMnode, tbFName);
  if (pStb == NULL) {
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STB_NOT_EXIST);
  }

  TAOS_CHECK_EXIT(mndFillRsmaInfo(pRsma, pStb, &infoRsp, infoReq.withColName));

  // measure first, then serialize into a buffer of exactly that size
  rspLen = tSerializeRsmaInfoRsp(NULL, 0, &infoRsp);
  if (rspLen < 0) {
    TAOS_CHECK_EXIT(rspLen);
  }
  pCont = rpcMallocCont(rspLen);
  if (pCont == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }
  rspLen = tSerializeRsmaInfoRsp(pCont, rspLen, &infoRsp);
  if (rspLen < 0) {
    TAOS_CHECK_EXIT(rspLen);
  }

  pReq->info.rsp = pCont;
  pReq->info.rspLen = rspLen;

_exit:
  if (code != 0) {
    rpcFreeCont(pCont);
  }
  if (pRsma) mndReleaseRsma(pMnode, pRsma);
  if (pStb) mndReleaseStb(pMnode, pStb);
  tFreeRsmaInfoRsp(&infoRsp, false);
  TAOS_RETURN(code);
#else
  return TSDB_CODE_OPS_NOT_SUPPORT;
#endif
}
1107
#ifdef TD_ENTERPRISE
1108
/*
 * Render the function list of an rsma as a var-string "func(col),func(col),..." into buf.
 *
 * buf starts with a VARSTR_HEADER_SIZE header followed by the text; bufLen is the size handed in
 * by the caller, of which bufLen - VARSTR_HEADER_SIZE bytes are used for the text. Column names
 * are resolved against the super table's schema with one forward scan. The result is an empty
 * var-string when the rsma has no functions or the table cannot be acquired; the list is cut
 * short (at an entry boundary) when the next entry would not fit.
 */
static void mndRetrieveRsmaFuncList(SMnode *pMnode, SRsmaObj *pObj, char *buf, int32_t bufLen) {
  SStbObj *pStb = NULL;
  char    *qBuf = POINTER_SHIFT(buf, VARSTR_HEADER_SIZE);
  int32_t  qBufLen = bufLen - VARSTR_HEADER_SIZE;

  qBuf[0] = 0;
  varDataSetLen(buf, 0);  // initialize to empty string

  if (pObj->nFuncs <= 0) return;

  char tbFName[TSDB_TABLE_FNAME_LEN] = {0};
  (void)snprintf(tbFName, sizeof(tbFName), "%s.%s", pObj->dbFName, pObj->tbName);
  pStb = mndAcquireStb(pMnode, tbFName);
  if (pStb == NULL) {
    mWarn("rsma:%s, failed to acquire table %s for function list", pObj->name, tbFName);
    return;
  }

  SSchema *pColumns = pStb->pColumns;

  int32_t len = 0, j = 0;
  char    colFunc[TSDB_COL_NAME_LEN + TSDB_FUNC_NAME_LEN + 2] = {0};
  for (int32_t i = 0; i < pObj->nFuncs; ++i) {
    col_id_t colId = pObj->funcColIds[i];
    // j is never reset: the scan resumes where the previous function's column was found
    while (j < pStb->numOfColumns) {
      if (pColumns[j].colId == colId) {
        int32_t colFuncLen =
            tsnprintf(colFunc, sizeof(colFunc), "%s(%s),", fmGetFuncName(pObj->funcIds[i]), pColumns[j].name);
        if ((qBufLen - len) > colFuncLen) {
          len += tsnprintf(qBuf + len, colFuncLen + 1, "%s", colFunc);
        } else {
          goto _exit;  // out of room, keep what has been written so far
        }
        break;
      } else if (pColumns[j].colId > colId) {
        break;  // column id not present in the schema, skip this function
      } else {
        ++j;
      }
    }
  }
_exit:
  qBuf[len > 0 ? len - 1 : 0] = 0;  // remove the last ','
  varDataSetLen(buf, len > 0 ? len - 1 : 0);
  mndReleaseStb(pMnode, pStb);
}
1156
#endif
1157
/*
 * Fill pBlock with one row per SDB_RSMA object for the show/retrieve path.
 *
 * With TD_ENTERPRISE defined, rows are produced only while pShow->numOfRows is still below 1,
 * i.e. the whole list is emitted by the first call; the rows argument is not consulted.
 * Columns are written in this order: name, uid, database name (text after the first '.' of
 * dbFName), table name, table type, created time, interval(s) with unit, function list.
 *
 * Returns the number of rows written, or a negative code when a column write fails (the jump to
 * _exit is performed by COL_DATA_SET_VAL_GOTO). Without TD_ENTERPRISE the function returns 0.
 */
static int32_t mndRetrieveRsma(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode          *pMnode = pReq->info.node;
  int32_t          code = 0, lino = 0;
  int32_t          numOfRows = 0;
  int32_t          cols = 0;
  char             tmp[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE];
  int32_t          tmpLen = 0;
  int32_t          bufLen = 0;
  char            *pBuf = NULL;
  char            *qBuf = NULL;
  void            *pIter = NULL;
  SSdb            *pSdb = pMnode->pSdb;
  SColumnInfoData *pColInfo = NULL;
#ifdef TD_ENTERPRISE
  // tmp is reused for every var-string column: header at pBuf, text at qBuf
  pBuf = tmp;
  bufLen = sizeof(tmp) - VARSTR_HEADER_SIZE;
  if (pShow->numOfRows < 1) {
    SRsmaObj *pObj = NULL;
    while ((pIter = sdbFetch(pSdb, SDB_RSMA, pIter, (void **)&pObj))) {
      // column: rsma name
      cols = 0;
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols);
      qBuf = POINTER_SHIFT(pBuf, VARSTR_HEADER_SIZE);
      TAOS_UNUSED(snprintf(qBuf, bufLen, "%s", pObj->name));
      varDataSetLen(pBuf, strlen(pBuf + VARSTR_HEADER_SIZE));
      COL_DATA_SET_VAL_GOTO(pBuf, false, pObj, pIter, _exit);

      // column: rsma uid
      if ((pColInfo = taosArrayGet(pBlock->pDataBlock, ++cols))) {
        COL_DATA_SET_VAL_GOTO((const char *)(&pObj->uid), false, pObj, pIter, _exit);
      }

      // column: database name without the prefix before the first '.'
      if ((pColInfo = taosArrayGet(pBlock->pDataBlock, ++cols))) {
        qBuf = POINTER_SHIFT(pBuf, VARSTR_HEADER_SIZE);
        const char *db = strchr(pObj->dbFName, '.');
        TAOS_UNUSED(snprintf(qBuf, bufLen, "%s", db ? db + 1 : pObj->dbFName));
        varDataSetLen(pBuf, strlen(qBuf));
        COL_DATA_SET_VAL_GOTO(pBuf, false, pObj, pIter, _exit);
      }

      // column: table name
      if ((pColInfo = taosArrayGet(pBlock->pDataBlock, ++cols))) {
        qBuf = POINTER_SHIFT(pBuf, VARSTR_HEADER_SIZE);
        TAOS_UNUSED(snprintf(qBuf, bufLen, "%s", pObj->tbName));
        varDataSetLen(pBuf, strlen(qBuf));
        COL_DATA_SET_VAL_GOTO(pBuf, false, pObj, pIter, _exit);
      }

      // column: table type as text
      if ((pColInfo = taosArrayGet(pBlock->pDataBlock, ++cols))) {
        qBuf = POINTER_SHIFT(pBuf, VARSTR_HEADER_SIZE);
        if (pObj->tbType == TSDB_SUPER_TABLE) {
          TAOS_UNUSED(snprintf(qBuf, bufLen, "SUPER_TABLE"));
        } else if (pObj->tbType == TSDB_NORMAL_TABLE) {
          TAOS_UNUSED(snprintf(qBuf, bufLen, "NORMAL_TABLE"));
        } else if (pObj->tbType == TSDB_CHILD_TABLE) {
          TAOS_UNUSED(snprintf(qBuf, bufLen, "CHILD_TABLE"));
        } else {
          TAOS_UNUSED(snprintf(qBuf, bufLen, "UNKNOWN"));
        }
        varDataSetLen(pBuf, strlen(qBuf));
        COL_DATA_SET_VAL_GOTO(pBuf, false, pObj, pIter, _exit);
      }

      // column: created time
      if ((pColInfo = taosArrayGet(pBlock->pDataBlock, ++cols))) {
        COL_DATA_SET_VAL_GOTO((const char *)&pObj->createdTime, false, pObj, pIter, _exit);
      }

      // column: "<interval0><unit>", followed by ",<interval1><unit>" when interval[1] > 0
      if ((pColInfo = taosArrayGet(pBlock->pDataBlock, ++cols))) {
        qBuf = POINTER_SHIFT(pBuf, VARSTR_HEADER_SIZE);
        TAOS_UNUSED(snprintf(qBuf, bufLen, "%" PRIi64 "%c", pObj->interval[0], pObj->intervalUnit));
        if (pObj->interval[1] > 0) {
          tmpLen = strlen(qBuf);
          TAOS_UNUSED(
              snprintf(qBuf + tmpLen, bufLen - tmpLen, ",%" PRIi64 "%c", pObj->interval[1], pObj->intervalUnit));
        }
        varDataSetLen(pBuf, strlen(qBuf));
        COL_DATA_SET_VAL_GOTO(pBuf, false, pObj, pIter, _exit);
      }

      // column: function list, rendered directly into pBuf as a var-string
      if ((pColInfo = taosArrayGet(pBlock->pDataBlock, ++cols))) {
        mndRetrieveRsmaFuncList(pMnode, pObj, pBuf, bufLen);
        COL_DATA_SET_VAL_GOTO(pBuf, false, pObj, pIter, _exit);
      }

      sdbRelease(pSdb, pObj);
      ++numOfRows;
    }
  }

  pShow->numOfRows += numOfRows;

_exit:
  if (code < 0) {
    mError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    TAOS_RETURN(code);
  }
#endif
  return numOfRows;
}
1254

1255
/* Cancel an in-flight SDB_RSMA iteration identified by pIter. */
static void mndCancelRetrieveRsma(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_RSMA);
}
×
1259

1260
/*
 * Add drop commit logs to pTrans for every rsma whose dbUid equals pDb->uid.
 *
 * Returns 0 when all matching rsmas were handled, or the first non-zero code from
 * mndSetDropRsmaCommitLogs(), in which case the iteration is cancelled.
 * Without TD_ENTERPRISE the function returns 0 without doing anything.
 */
int32_t mndDropRsmasByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
  int32_t code = 0;
#ifdef TD_ENTERPRISE
  SSdb     *pSdb = pMnode->pSdb;
  void     *pCursor = NULL;
  SRsmaObj *pRsma = NULL;

  for (;;) {
    pCursor = sdbFetch(pSdb, SDB_RSMA, pCursor, (void **)&pRsma);
    if (pCursor == NULL) break;

    if (pRsma->dbUid != pDb->uid) {
      // rsma of another database
      sdbRelease(pSdb, pRsma);
      continue;
    }

    code = mndSetDropRsmaCommitLogs(pMnode, pTrans, pRsma);
    if (code != 0) {
      sdbCancelFetch(pSdb, pCursor);
      sdbRelease(pSdb, pRsma);
      TAOS_RETURN(code);
    }
    sdbRelease(pSdb, pRsma);
  }
#endif
  TAOS_RETURN(code);
}
1280

1281
/*
 * Add drop commit logs to pTrans for every rsma bound to pStb, i.e. whose tbUid equals pStb->uid
 * and whose dbUid equals pStb->dbUid. pDb is not read by this function.
 *
 * Returns 0 when all matching rsmas were handled, or the first non-zero code from
 * mndSetDropRsmaCommitLogs(), in which case the iteration is cancelled.
 * Without TD_ENTERPRISE the function returns 0 without doing anything.
 */
int32_t mndDropRsmaByStb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
  int32_t code = 0;
#ifdef TD_ENTERPRISE
  SSdb     *pSdb = pMnode->pSdb;
  void     *pCursor = NULL;
  SRsmaObj *pRsma = NULL;

  for (;;) {
    pCursor = sdbFetch(pSdb, SDB_RSMA, pCursor, (void **)&pRsma);
    if (pCursor == NULL) break;

    if (pRsma->tbUid != pStb->uid || pRsma->dbUid != pStb->dbUid) {
      // rsma of another table
      sdbRelease(pSdb, pRsma);
      continue;
    }

    code = mndSetDropRsmaCommitLogs(pMnode, pTrans, pRsma);
    if (code != 0) {
      sdbCancelFetch(pSdb, pCursor);
      sdbRelease(pSdb, pRsma);
      TAOS_RETURN(code);
    }
    sdbRelease(pSdb, pRsma);
  }
#endif
  TAOS_RETURN(code);
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc