• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5071

17 May 2026 01:15AM UTC coverage: 63.054% (-10.3%) from 73.326%
#5071

push

travis-ci

web-flow
feat (TDgpt): Dynamic Model Synchronization Enhancements (#35344)

* refactor: do some internal refactor.

* fix: fix multiprocess sync issue.

* feat: add dynamic anomaly detection and forecasting services

* fix: log error message for undeploying model in exception handling

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* fix: handle undeploy when model exists only on disk

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/286aafa0-c3ce-4c27-b803-2707571e9dc1

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: guard dynamic registry concurrent access

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/5e4db858-6458-40f4-ac28-d1b1b7f97c18

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: tighten service list locking scope

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/5e4db858-6458-40f4-ac28-d1b1b7f97c18

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: restore prophet support and update tests per review feedback

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/92298ae1-7da6-4d07-b20e-101c7cd0b26b

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: improve test name and move copy inside lock scope

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/92298ae1-7da6-4d07-b20e-101c7cd0b26b

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* Potential fix for pull request finding

Co-au... (continued)

238317 of 377957 relevant lines covered (63.05%)

130539817.12 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.47
/source/dnode/mnode/impl/src/mndRsma.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "audit.h"
18
#include "functionMgt.h"
19
#include "mndDb.h"
20
#include "mndDnode.h"
21
#include "mndInfoSchema.h"
22
#include "mndMnode.h"
23
#include "mndPrivilege.h"
24
#include "mndShow.h"
25
#include "mndSma.h"
26
#include "mndStb.h"
27
#include "mndTrans.h"
28
#include "mndUser.h"
29
#include "mndVgroup.h"
30
#include "parser.h"
31
#include "tname.h"
32

33
#define MND_RSMA_VER_NUMBER   1
34
#define MND_RSMA_RESERVE_SIZE 64
35

36
static SSdbRaw *mndRsmaActionEncode(SRsmaObj *pSma);
37
static SSdbRow *mndRsmaActionDecode(SSdbRaw *pRaw);
38
static int32_t  mndRsmaActionInsert(SSdb *pSdb, SRsmaObj *pSma);
39
static int32_t  mndRsmaActionDelete(SSdb *pSdb, SRsmaObj *pSpSmatb);
40
static int32_t  mndRsmaActionUpdate(SSdb *pSdb, SRsmaObj *pOld, SRsmaObj *pNew);
41
static int32_t  mndProcessCreateRsmaReq(SRpcMsg *pReq);
42
static int32_t  mndProcessDropRsmaReq(SRpcMsg *pReq);
43
static int32_t  mndProcessAlterRsmaReq(SRpcMsg *pReq);
44
static int32_t  mndProcessGetRsmaReq(SRpcMsg *pReq);
45

46
static int32_t mndRetrieveRsma(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
47
static void    mndCancelRetrieveRsma(SMnode *pMnode, void *pIter);
48
static int32_t mndRetrieveRsmaTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
49
static void    mndCancelRetrieveRsmaTask(SMnode *pMnode, void *pIter);
50

51
/*
 * Initialise RSMA support in the mnode: install the message and show
 * handlers, then register the SDB_RSMA table with the sdb.
 * Returns the result of sdbSetTable().
 */
int32_t mndInitRsma(SMnode *pMnode) {
  SSdbTable rsmaTable = {0};
  rsmaTable.sdbType = SDB_RSMA;
  rsmaTable.keyType = SDB_KEY_BINARY;
  rsmaTable.encodeFp = (SdbEncodeFp)mndRsmaActionEncode;
  rsmaTable.decodeFp = (SdbDecodeFp)mndRsmaActionDecode;
  rsmaTable.insertFp = (SdbInsertFp)mndRsmaActionInsert;
  rsmaTable.updateFp = (SdbUpdateFp)mndRsmaActionUpdate;
  rsmaTable.deleteFp = (SdbDeleteFp)mndRsmaActionDelete;

  // create / drop / alter: the mnode request plus the vnode response handled by the transaction layer
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_RSMA, mndProcessCreateRsmaReq);
  mndSetMsgHandle(pMnode, TDMT_VND_CREATE_RSMA_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_RSMA, mndProcessDropRsmaReq);
  mndSetMsgHandle(pMnode, TDMT_VND_DROP_RSMA_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_ALTER_RSMA, mndProcessAlterRsmaReq);
  mndSetMsgHandle(pMnode, TDMT_VND_ALTER_RSMA_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_GET_RSMA, mndProcessGetRsmaReq);

  // SHOW support
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_RSMA, mndRetrieveRsma);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_RSMA, mndCancelRetrieveRsma);

  return sdbSetTable(pMnode->pSdb, rsmaTable);
}
74

75
/* Counterpart of mndInitRsma(); intentionally a no-op. */
void mndCleanupRsma(SMnode *pMnode) {
  (void)pMnode;
}
76

77
/*
 * Free the function arrays held by an RSMA object.
 * The object itself is not freed. A NULL pObj is accepted.
 */
void mndRsmaFreeObj(SRsmaObj *pObj) {
  if (pObj == NULL) {
    return;
  }
  taosMemoryFreeClear(pObj->funcColIds);
  taosMemoryFreeClear(pObj->funcIds);
}
83

84
static int32_t tSerializeSRsmaObj(void *buf, int32_t bufLen, const SRsmaObj *pObj) {
182,304✔
85
  int32_t  code = 0, lino = 0;
182,304✔
86
  int32_t  tlen = 0;
182,304✔
87
  SEncoder encoder = {0};
182,304✔
88
  tEncoderInit(&encoder, buf, bufLen);
182,304✔
89

90
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
182,304✔
91

92
  TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pObj->name));
364,608✔
93
  TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pObj->tbName));
364,608✔
94
  TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pObj->dbFName));
364,608✔
95
  TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pObj->createUser));
364,608✔
96
  TAOS_CHECK_EXIT(tEncodeI64v(&encoder, pObj->createdTime));
364,608✔
97
  TAOS_CHECK_EXIT(tEncodeI64v(&encoder, pObj->updateTime));
364,608✔
98
  TAOS_CHECK_EXIT(tEncodeI64v(&encoder, pObj->uid));
364,608✔
99
  TAOS_CHECK_EXIT(tEncodeI64v(&encoder, pObj->tbUid));
364,608✔
100
  TAOS_CHECK_EXIT(tEncodeI64v(&encoder, pObj->dbUid));
364,608✔
101
  TAOS_CHECK_EXIT(tEncodeI64v(&encoder, pObj->interval[0]));
364,608✔
102
  TAOS_CHECK_EXIT(tEncodeI64v(&encoder, pObj->interval[1]));
364,608✔
103
  TAOS_CHECK_EXIT(tEncodeU64v(&encoder, pObj->reserved));
364,608✔
104
  TAOS_CHECK_EXIT(tEncodeI32v(&encoder, pObj->version));
364,608✔
105
  TAOS_CHECK_EXIT(tEncodeI8(&encoder, pObj->tbType));
364,608✔
106
  TAOS_CHECK_EXIT(tEncodeI8(&encoder, pObj->intervalUnit));
364,608✔
107
  TAOS_CHECK_EXIT(tEncodeI16v(&encoder, pObj->nFuncs));
364,608✔
108
  for (int16_t i = 0; i < pObj->nFuncs; ++i) {
867,632✔
109
    TAOS_CHECK_EXIT(tEncodeI16v(&encoder, pObj->funcColIds[i]));
1,370,656✔
110
    TAOS_CHECK_EXIT(tEncodeI32v(&encoder, pObj->funcIds[i]));
1,370,656✔
111
  }
112
  TAOS_CHECK_EXIT(tEncodeI64v(&encoder, pObj->ownerId));
364,608✔
113

114
  tEndEncode(&encoder);
182,304✔
115

116
  tlen = encoder.pos;
182,304✔
117
_exit:
182,304✔
118
  tEncoderClear(&encoder);
182,304✔
119
  if (code < 0) {
182,304✔
120
    mError("rsma, %s failed at line %d since %s", __func__, lino, tstrerror(code));
×
121
    TAOS_RETURN(code);
×
122
  }
123

124
  return tlen;
182,304✔
125
}
126

127
/*
 * Decode an RSMA object from buf (bufLen bytes) into pObj.
 *
 * When nFuncs > 0 the two function arrays are allocated here and stored
 * in pObj; they stay attached to pObj even when a later field fails to
 * decode, so the caller is expected to release them (see mndRsmaFreeObj).
 * ownerId is read only if the decoder has not reached the end of the data.
 * Returns 0 on success or a negative error code.
 */
static int32_t tDeserializeSRsmaObj(void *buf, int32_t bufLen, SRsmaObj *pObj) {
  int32_t  code = 0, lino = 0;
  SDecoder dec = {0};

  tDecoderInit(&dec, buf, bufLen);
  TAOS_CHECK_EXIT(tStartDecode(&dec));

  // names
  TAOS_CHECK_EXIT(tDecodeCStrTo(&dec, pObj->name));
  TAOS_CHECK_EXIT(tDecodeCStrTo(&dec, pObj->tbName));
  TAOS_CHECK_EXIT(tDecodeCStrTo(&dec, pObj->dbFName));
  TAOS_CHECK_EXIT(tDecodeCStrTo(&dec, pObj->createUser));

  // timestamps and identifiers
  TAOS_CHECK_EXIT(tDecodeI64v(&dec, &pObj->createdTime));
  TAOS_CHECK_EXIT(tDecodeI64v(&dec, &pObj->updateTime));
  TAOS_CHECK_EXIT(tDecodeI64v(&dec, &pObj->uid));
  TAOS_CHECK_EXIT(tDecodeI64v(&dec, &pObj->tbUid));
  TAOS_CHECK_EXIT(tDecodeI64v(&dec, &pObj->dbUid));

  // options
  TAOS_CHECK_EXIT(tDecodeI64v(&dec, &pObj->interval[0]));
  TAOS_CHECK_EXIT(tDecodeI64v(&dec, &pObj->interval[1]));
  TAOS_CHECK_EXIT(tDecodeU64v(&dec, &pObj->reserved));
  TAOS_CHECK_EXIT(tDecodeI32v(&dec, &pObj->version));
  TAOS_CHECK_EXIT(tDecodeI8(&dec, &pObj->tbType));
  TAOS_CHECK_EXIT(tDecodeI8(&dec, &pObj->intervalUnit));

  // function list
  TAOS_CHECK_EXIT(tDecodeI16v(&dec, &pObj->nFuncs));
  if (pObj->nFuncs > 0) {
    pObj->funcColIds = taosMemoryMalloc(sizeof(col_id_t) * pObj->nFuncs);
    if (pObj->funcColIds == NULL) {
      TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
    }
    pObj->funcIds = taosMemoryMalloc(sizeof(int32_t) * pObj->nFuncs);
    if (pObj->funcIds == NULL) {
      TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
    }
    int16_t idx = 0;
    while (idx < pObj->nFuncs) {
      TAOS_CHECK_EXIT(tDecodeI16v(&dec, &pObj->funcColIds[idx]));
      TAOS_CHECK_EXIT(tDecodeI32v(&dec, &pObj->funcIds[idx]));
      ++idx;
    }
  }

  // trailing optional field
  if (!tDecodeIsEnd(&dec)) {
    TAOS_CHECK_EXIT(tDecodeI64v(&dec, &pObj->ownerId));
  }

_exit:
  tEndDecode(&dec);
  tDecoderClear(&dec);
  if (code < 0) {
    mError("rsma, %s failed at line %d since %s, row:%p", __func__, lino, tstrerror(code), pObj);
  }
  TAOS_RETURN(code);
}
174

175
/*
 * Build an sdb raw record for an RSMA object.
 *
 * Raw payload layout: an int32 length followed by the bytes produced by
 * tSerializeSRsmaObj(). Returns the new raw record, or NULL with terrno
 * set to the failure code.
 */
static SSdbRaw *mndRsmaActionEncode(SRsmaObj *pObj) {
  int32_t  code = 0, lino = 0;
  void    *pPayload = NULL;
  SSdbRaw *pRaw = NULL;
  int32_t  dataPos = 0;

  // first call, without a buffer, yields the encoded length
  int32_t tlen = tSerializeSRsmaObj(NULL, 0, pObj);
  if (tlen < 0) {
    TAOS_CHECK_EXIT(tlen);
  }

  pRaw = sdbAllocRaw(SDB_RSMA, MND_RSMA_VER_NUMBER, (int32_t)(sizeof(int32_t) + tlen));
  if (pRaw == NULL) {
    TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
  }

  pPayload = taosMemoryMalloc(tlen);
  if (pPayload == NULL) {
    TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
  }

  // second call writes the object into the temporary buffer
  tlen = tSerializeSRsmaObj(pPayload, tlen, pObj);
  if (tlen < 0) {
    TAOS_CHECK_EXIT(tlen);
  }

  SDB_SET_INT32(pRaw, dataPos, tlen, _exit);
  SDB_SET_BINARY(pRaw, dataPos, pPayload, tlen, _exit);
  SDB_SET_DATALEN(pRaw, dataPos, _exit);

_exit:
  taosMemoryFreeClear(pPayload);
  if (code == TSDB_CODE_SUCCESS) {
    mTrace("rsma, encode to raw:%p, row:%p", pRaw, pObj);
    return pRaw;
  }

  terrno = code;
  mError("rsma, failed at line %d to encode to raw:%p since %s", lino, pRaw, tstrerror(code));
  sdbFreeRaw(pRaw);
  return NULL;
}
217

218
/*
 * Rebuild an RSMA sdb row from a raw record.
 *
 * Rejects records whose soft version differs from MND_RSMA_VER_NUMBER,
 * reads the int32 length prefix and the payload, deserializes it into
 * the row object and initialises the object's latch.
 * Returns the row, or NULL with terrno set to the failure code.
 */
SSdbRow *mndRsmaActionDecode(SSdbRaw *pRaw) {
  int32_t   code = 0, lino = 0;
  SSdbRow  *pRow = NULL;
  SRsmaObj *pObj = NULL;
  void     *pPayload = NULL;
  int8_t    sver = 0;
  int32_t   tlen = 0;
  int32_t   dataPos = 0;

  TAOS_CHECK_EXIT(sdbGetRawSoftVer(pRaw, &sver));
  if (sver != MND_RSMA_VER_NUMBER) {
    mError("rsma read invalid ver, data ver: %d, curr ver: %d", sver, MND_RSMA_VER_NUMBER);
    TAOS_CHECK_EXIT(TSDB_CODE_SDB_INVALID_DATA_VER);
  }

  pRow = sdbAllocRow(sizeof(SRsmaObj));
  if (pRow == NULL) {
    TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
  }

  pObj = sdbGetRowObj(pRow);
  if (pObj == NULL) {
    TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
  }

  // length prefix, then the serialized object
  SDB_GET_INT32(pRaw, dataPos, &tlen, _exit);
  pPayload = taosMemoryMalloc(tlen + 1);
  if (pPayload == NULL) {
    TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
  }
  SDB_GET_BINARY(pRaw, dataPos, pPayload, tlen, _exit);

  TAOS_CHECK_EXIT(tDeserializeSRsmaObj(pPayload, tlen, pObj));

  taosInitRWLatch(&pObj->lock);

_exit:
  taosMemoryFreeClear(pPayload);
  if (code == TSDB_CODE_SUCCESS) {
    mTrace("rsma, decode from raw:%p, row:%p", pRaw, pObj);
    return pRow;
  }

  // failure: drop whatever the partially decoded object owns, then the row
  terrno = code;
  mError("rsma, failed at line %d to decode from raw:%p since %s", lino, pRaw, tstrerror(code));
  mndRsmaFreeObj(pObj);
  taosMemoryFreeClear(pRow);
  return NULL;
}
265

266
/* sdb insert callback for RSMA rows: only traces the event. */
static int32_t mndRsmaActionInsert(SSdb *pSdb, SRsmaObj *pObj) {
  (void)pSdb;
  mTrace("rsma:%s, perform insert action, row:%p", pObj->name, pObj);
  return TSDB_CODE_SUCCESS;
}
270

271
/* sdb delete callback for RSMA rows: traces the event and frees the row's function arrays. */
static int32_t mndRsmaActionDelete(SSdb *pSdb, SRsmaObj *pObj) {
  (void)pSdb;
  mTrace("rsma:%s, perform delete action, row:%p", pObj->name, pObj);
  mndRsmaFreeObj(pObj);
  return TSDB_CODE_SUCCESS;
}
276

277
/*
 * sdb update callback for RSMA rows.
 *
 * Under the old row's write latch, takes updateTime, nFuncs and ownerId
 * from pNew and exchanges the two function arrays with pNew, so pNew
 * ends up holding the arrays that previously belonged to pOld.
 */
static int32_t mndRsmaActionUpdate(SSdb *pSdb, SRsmaObj *pOld, SRsmaObj *pNew) {
  (void)pSdb;
  mTrace("rsma:%s, perform update action, old row:%p new row:%p", pOld->name, pOld, pNew);

  taosWLockLatch(&pOld->lock);
  TSWAP(pOld->funcColIds, pNew->funcColIds);
  TSWAP(pOld->funcIds, pNew->funcIds);
  pOld->nFuncs = pNew->nFuncs;
  pOld->ownerId = pNew->ownerId;
  pOld->updateTime = pNew->updateTime;
  taosWUnLockLatch(&pOld->lock);

  return TSDB_CODE_SUCCESS;
}
288

289
/*
 * Acquire the RSMA object called `name` from the sdb.
 *
 * Returns the acquired object, or NULL with terrno translated from the
 * sdb-level code to the RSMA-level one:
 *   TSDB_CODE_SDB_OBJ_NOT_THERE -> TSDB_CODE_RSMA_NOT_EXIST
 *   TSDB_CODE_SDB_OBJ_CREATING  -> TSDB_CODE_MND_RSMA_IN_CREATING
 *   TSDB_CODE_SDB_OBJ_DROPPING  -> TSDB_CODE_MND_RSMA_IN_DROPPING
 *   anything else               -> TSDB_CODE_APP_ERROR (logged as fatal)
 * A non-NULL result is handed back with mndReleaseRsma().
 */
SRsmaObj *mndAcquireRsma(SMnode *pMnode, char *name) {
  SSdb     *pSdb = pMnode->pSdb;
  SRsmaObj *pObj = sdbAcquire(pSdb, SDB_RSMA, name);
  if (pObj != NULL) {
    return pObj;
  }

  if (terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
    terrno = TSDB_CODE_RSMA_NOT_EXIST;
  } else if (terrno == TSDB_CODE_SDB_OBJ_CREATING) {
    terrno = TSDB_CODE_MND_RSMA_IN_CREATING;
  } else if (terrno == TSDB_CODE_SDB_OBJ_DROPPING) {
    terrno = TSDB_CODE_MND_RSMA_IN_DROPPING;
  } else {
    // Log first: terrstr() reports the current terrno, so replacing it
    // before logging would hide the real reason behind the generic code.
    mFatal("rsma:%s, failed to acquire rsma since %s", name, terrstr());
    terrno = TSDB_CODE_APP_ERROR;
  }
  return NULL;
}
306

307
/* Hand an RSMA object obtained from mndAcquireRsma() back to the sdb. */
void mndReleaseRsma(SMnode *pMnode, SRsmaObj *pSma) {
  sdbRelease(pMnode->pSdb, pSma);
}
311
#ifdef TD_ENTERPRISE
312
/*
 * Append a redo log for creating pSma to the transaction and mark the
 * raw record SDB_STATUS_CREATING.
 * Returns 0 on success or an error code.
 */
static int32_t mndSetCreateRsmaRedoLogs(SMnode *pMnode, STrans *pTrans, SRsmaObj *pSma) {
  int32_t  code = 0;
  SSdbRaw *pRaw = mndRsmaActionEncode(pSma);

  if (pRaw == NULL) {
    // prefer the encoder's terrno; fall back to the generic "NULL returned" code
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  TAOS_CHECK_RETURN(mndTransAppendRedolog(pTrans, pRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRaw, SDB_STATUS_CREATING));

  TAOS_RETURN(code);
}
325

326
/*
 * Append an undo log for creating pSma to the transaction and mark the
 * raw record SDB_STATUS_DROPPED.
 * Returns 0 on success or an error code.
 */
static int32_t mndSetCreateRsmaUndoLogs(SMnode *pMnode, STrans *pTrans, SRsmaObj *pSma) {
  int32_t  code = 0;
  SSdbRaw *pRaw = mndRsmaActionEncode(pSma);

  if (pRaw == NULL) {
    // prefer the encoder's terrno; fall back to the generic "NULL returned" code
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  TAOS_CHECK_RETURN(mndTransAppendUndolog(pTrans, pRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED));

  TAOS_RETURN(code);
}
338

339
/*
 * Append a prepare log for creating pSma to the transaction and mark
 * the raw record SDB_STATUS_CREATING.
 *
 * Returns 0 on success. On failure the specific error code is returned
 * (the encoder's terrno, or the code reported by the failing call)
 * rather than a bare -1, so the caller can log and report the real cause.
 */
static int32_t mndSetCreateRsmaPrepareActions(SMnode *pMnode, STrans *pTrans, SRsmaObj *pSma) {
  int32_t  code = 0;
  SSdbRaw *pDbRaw = mndRsmaActionEncode(pSma);
  if (pDbRaw == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  TAOS_CHECK_RETURN(mndTransAppendPrepareLog(pTrans, pDbRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pDbRaw, SDB_STATUS_CREATING));
  return 0;
}
347

348
static void *mndBuildVCreateRsmaReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, SRsmaObj *pObj,
43,888✔
349
                                    SMCreateRsmaReq *pCreate, int32_t *pContLen) {
350
  int32_t         code = 0, lino = 0;
43,888✔
351
  SMsgHead       *pHead = NULL;
43,888✔
352
  SVCreateRsmaReq req = *pCreate;
43,888✔
353

354
  req.uid = pObj->uid;  // use the uid generated by mnode
43,888✔
355

356
  int32_t contLen = tSerializeSVCreateRsmaReq(NULL, 0, &req);
43,888✔
357
  TAOS_CHECK_EXIT(contLen);
43,888✔
358
  contLen += sizeof(SMsgHead);
43,888✔
359
  TSDB_CHECK_NULL((pHead = taosMemoryMalloc(contLen)), code, lino, _exit, terrno);
43,888✔
360
  pHead->contLen = htonl(contLen);
43,888✔
361
  pHead->vgId = htonl(pVgroup->vgId);
43,888✔
362
  void *pBuf = POINTER_SHIFT(pHead, sizeof(SMsgHead));
43,888✔
363
  TAOS_CHECK_EXIT(tSerializeSVCreateRsmaReq(pBuf, contLen, &req));
43,888✔
364
_exit:
43,888✔
365
  if (code < 0) {
43,888✔
366
    taosMemoryFreeClear(pHead);
×
367
    terrno = code;
×
368
    *pContLen = 0;
×
369
    return NULL;
×
370
  }
371
  *pContLen = contLen;
43,888✔
372
  return pHead;
43,888✔
373
}
374

375
static int32_t mndSetCreateRsmaRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb, SRsmaObj *pObj,
21,944✔
376
                                           SMCreateRsmaReq *pCreate) {
377
  int32_t code = 0;
21,944✔
378
  SSdb   *pSdb = pMnode->pSdb;
21,944✔
379
  SVgObj *pVgroup = NULL;
21,944✔
380
  void   *pIter = NULL;
21,944✔
381

382
  SName name = {0};
21,944✔
383
  if ((code = tNameFromString(&name, pCreate->tbFName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE)) != 0) {
21,944✔
384
    return code;
×
385
  }
386
  tstrncpy(pCreate->tbFName, (char *)tNameGetTableName(&name), sizeof(pCreate->tbFName));  // convert tbFName to tbName
21,944✔
387

388
  while ((pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup))) {
109,720✔
389
    if (!mndVgroupInDb(pVgroup, pDb->uid)) {
87,776✔
390
      sdbRelease(pSdb, pVgroup);
43,888✔
391
      continue;
43,888✔
392
    }
393

394
    int32_t contLen = 0;
43,888✔
395
    void   *pReq = mndBuildVCreateRsmaReq(pMnode, pVgroup, pStb, pObj, pCreate, &contLen);
43,888✔
396
    if (pReq == NULL) {
43,888✔
397
      sdbCancelFetch(pSdb, pIter);
×
398
      sdbRelease(pSdb, pVgroup);
×
399
      code = terrno ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
×
400
      TAOS_RETURN(code);
×
401
    }
402

403
    STransAction action = {0};
43,888✔
404
    action.mTraceId = pTrans->mTraceId;
43,888✔
405
    action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
43,888✔
406
    action.pCont = pReq;
43,888✔
407
    action.contLen = contLen;
43,888✔
408
    action.msgType = TDMT_VND_CREATE_RSMA;
43,888✔
409
    action.acceptableCode = TSDB_CODE_RSMA_ALREADY_EXISTS;  // check whether the rsma uid exist
43,888✔
410
    action.retryCode = TSDB_CODE_TDB_STB_NOT_EXIST;         // retry if relative table not exist
43,888✔
411
    if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
43,888✔
412
      taosMemoryFree(pReq);
×
413
      sdbCancelFetch(pSdb, pIter);
×
414
      sdbRelease(pSdb, pVgroup);
×
415
      TAOS_RETURN(code);
×
416
    }
417
    sdbRelease(pSdb, pVgroup);
43,888✔
418
  }
419

420
  TAOS_RETURN(code);
21,944✔
421
}
422

423
/*
 * Append a commit log for pSma to the transaction and mark the raw
 * record SDB_STATUS_READY.
 * Returns 0 on success or an error code.
 */
static int32_t mndSetCreateRsmaCommitLogs(SMnode *pMnode, STrans *pTrans, SRsmaObj *pSma) {
  int32_t  code = 0;
  SSdbRaw *pRaw = mndRsmaActionEncode(pSma);

  if (pRaw == NULL) {
    // prefer the encoder's terrno; fall back to the generic "NULL returned" code
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRaw, SDB_STATUS_READY));

  TAOS_RETURN(code);
}
436

437
/*
 * Append a prepare log for dropping pSma to the transaction and mark
 * the raw record SDB_STATUS_DROPPING.
 *
 * Returns 0 on success. When encoding fails, the encoder's terrno (or
 * TSDB_CODE_MND_RETURN_VALUE_NULL if terrno is unset) is returned; the
 * code used to be computed and then discarded in favour of -1.
 */
static int32_t mndSetDropRsmaPrepareLogs(SMnode *pMnode, STrans *pTrans, SRsmaObj *pSma) {
  int32_t  code = 0;
  SSdbRaw *pRedoRaw = mndRsmaActionEncode(pSma);
  if (pRedoRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }
  TAOS_CHECK_RETURN(mndTransAppendPrepareLog(pTrans, pRedoRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING));

  return 0;
}
450

451
/*
 * Append a commit log for dropping pSma to the transaction and mark the
 * raw record SDB_STATUS_DROPPED.
 *
 * Returns 0 on success. When encoding fails, the encoder's terrno (or
 * TSDB_CODE_MND_RETURN_VALUE_NULL if terrno is unset) is returned; the
 * code used to be computed and then discarded in favour of -1.
 */
static int32_t mndSetDropRsmaCommitLogs(SMnode *pMnode, STrans *pTrans, SRsmaObj *pSma) {
  int32_t  code = 0;
  SSdbRaw *pCommitRaw = mndRsmaActionEncode(pSma);
  if (pCommitRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }
  TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pCommitRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED));

  return 0;
}
464

465
/*
 * Build the vnode drop-rsma message for one vgroup.
 *
 * The message is an SMsgHead (contLen and vgId in network byte order)
 * followed by a serialized SVDropRsmaReq filled from pObj (tbName, name,
 * tbType, uid, tbUid).
 *
 * On success returns the allocated message and stores its total length
 * in *pContLen; the caller owns the buffer. On failure returns NULL,
 * sets terrno and stores 0 in *pContLen.
 */
static void *mndBuildVDropRsmaReq(SMnode *pMnode, SVgObj *pVgroup, SRsmaObj *pObj, int32_t *pContLen) {
  int32_t       code = 0, lino = 0;
  SMsgHead     *pHead = NULL;
  SVDropRsmaReq req = {0};
  int32_t       contLen = 0;

  (void)snprintf(req.tbName, sizeof(req.tbName), "%s", pObj->tbName);
  (void)snprintf(req.name, sizeof(req.name), "%s", pObj->name);
  req.tbType = pObj->tbType;
  req.uid = pObj->uid;
  req.tbUid = pObj->tbUid;

  // size of the serialized body only (no header)
  int32_t bodyLen = tSerializeSVDropRsmaReq(NULL, 0, &req);
  TAOS_CHECK_EXIT(bodyLen);

  contLen = bodyLen + (int32_t)sizeof(SMsgHead);
  TSDB_CHECK_NULL((pHead = taosMemoryMalloc(contLen)), code, lino, _exit, terrno);
  pHead->contLen = htonl(contLen);
  pHead->vgId = htonl(pVgroup->vgId);

  // The body starts after the header, so only bodyLen bytes are available
  // to the serializer; passing contLen would overstate the capacity.
  void *pBuf = POINTER_SHIFT(pHead, sizeof(SMsgHead));
  TAOS_CHECK_EXIT(tSerializeSVDropRsmaReq(pBuf, bodyLen, &req));

_exit:
  if (code < 0) {
    taosMemoryFreeClear(pHead);
    terrno = code;
    *pContLen = 0;
    return NULL;
  }
  *pContLen = contLen;
  return pHead;
}
494

495
/*
 * Append one TDMT_VND_DROP_RSMA redo action per vgroup of pDb.
 * Returns 0 on success or an error code; on failure the sdb iteration
 * is cancelled and the current vgroup released.
 */
static int32_t mndSetDropRsmaRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SRsmaObj *pSma) {
  int32_t code = 0;
  SSdb   *pSdb = pMnode->pSdb;
  SVgObj *pVg = NULL;
  void   *pIter = NULL;

  for (;;) {
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVg);
    if (pIter == NULL) {
      break;
    }

    // skip vgroups that belong to other databases
    if (!mndVgroupInDb(pVg, pDb->uid)) {
      sdbRelease(pSdb, pVg);
      continue;
    }

    int32_t msgLen = 0;
    void   *pMsg = mndBuildVDropRsmaReq(pMnode, pVg, pSma, &msgLen);
    if (pMsg == NULL) {
      sdbCancelFetch(pSdb, pIter);
      sdbRelease(pSdb, pVg);
      code = terrno ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
      TAOS_RETURN(code);
    }

    STransAction action = {
        .mTraceId = pTrans->mTraceId,
        .epSet = mndGetVgroupEpset(pMnode, pVg),
        .pCont = pMsg,
        .contLen = msgLen,
        .msgType = TDMT_VND_DROP_RSMA,
        .acceptableCode = TSDB_CODE_RSMA_NOT_EXIST,
    };

    code = mndTransAppendRedoAction(pTrans, &action);
    if (code != 0) {
      // the message was not accepted by the transaction, so it is freed here
      taosMemoryFree(pMsg);
      sdbCancelFetch(pSdb, pIter);
      sdbRelease(pSdb, pVg);
      TAOS_RETURN(code);
    }
    sdbRelease(pSdb, pVg);
  }

  TAOS_RETURN(code);
}
533

534
/*
 * Create and prepare the "drop-rsma" transaction for pObj in pDb.
 *
 * Steps: create the transaction, check for conflicts, append the
 * prepare log, commit log and per-vgroup redo actions, then prepare it.
 * No rpc response payload is attached to the transaction.
 * The transaction handle is dropped before returning in every case.
 * Returns the resulting code (0 on success).
 */
static int32_t mndDropRsma(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SRsmaObj *pObj) {
  int32_t code = 0, lino = 0;
  STrans *pTrans = NULL;

  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, "drop-rsma");
  if (pTrans == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto _exit;
  }

  mInfo("trans:%d start to drop rsma:%s", pTrans->id, pObj->name);

  mndTransSetDbName(pTrans, pDb->name, pObj->name);
  mndTransSetKillMode(pTrans, TRN_KILL_MODE_SKIP);
  TAOS_CHECK_EXIT(mndTransCheckConflict(pMnode, pTrans));

  mndTransSetOper(pTrans, MND_OPER_DROP_RSMA);
  TAOS_CHECK_EXIT(mndSetDropRsmaPrepareLogs(pMnode, pTrans, pObj));
  TAOS_CHECK_EXIT(mndSetDropRsmaCommitLogs(pMnode, pTrans, pObj));
  TAOS_CHECK_EXIT(mndSetDropRsmaRedoActions(pMnode, pTrans, pDb, pObj));

  TAOS_CHECK_EXIT(mndTransPrepare(pMnode, pTrans));

_exit:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("rsma:%s, failed to drop at line:%d since %s", pObj->name, lino, tstrerror(code));
  }
  mndTransDrop(pTrans);
  TAOS_RETURN(code);
}
568
#endif
569
/*
 * Handler for TDMT_MND_DROP_RSMA.
 *
 * Without TD_ENTERPRISE the handler does nothing and returns 0.
 * With TD_ENTERPRISE it deserializes the request, acquires the RSMA, its
 * database and the requesting user, checks the drop privilege and starts
 * the drop transaction. A successful start is reported as
 * TSDB_CODE_ACTION_IN_PROGRESS. A missing RSMA is reported as success
 * when dropReq.igNotExists is set. An audit record is written when
 * tsAuditLevel >= AUDIT_LEVEL_DATABASE.
 */
static int32_t mndProcessDropRsmaReq(SRpcMsg *pReq) {
  SMnode *pMnode = pReq->info.node;
  int32_t code = 0, lino = 0;
#ifdef TD_ENTERPRISE
  SDbObj       *pDb = NULL;
  SRsmaObj     *pObj = NULL;
  SUserObj     *pUser = NULL;
  SMDropRsmaReq dropReq = {0};
  int64_t       startMs = taosGetTimestampMs();

  TAOS_CHECK_GOTO(tDeserializeSMDropRsmaReq(pReq->pCont, pReq->contLen, &dropReq), NULL, _exit);

  mInfo("rsma:%s, start to drop", dropReq.name);

  pObj = mndAcquireRsma(pMnode, dropReq.name);
  if (pObj == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (dropReq.igNotExists) {
      code = 0;  // "if exists" form: a missing rsma is not an error
    }
    goto _exit;
  }

  // the parsed name is not used further; the call only validates dbFName
  SName dbName = {0};
  TAOS_CHECK_EXIT(tNameFromString(&dbName, pObj->dbFName, T_NAME_ACCT | T_NAME_DB));

  pDb = mndAcquireDb(pMnode, pObj->dbFName);
  if (pDb == NULL) {
    TAOS_CHECK_EXIT(TSDB_CODE_MND_DB_NOT_EXIST);
  }

  TAOS_CHECK_EXIT(mndAcquireUser(pMnode, RPC_MSG_USER(pReq), &pUser));

  TAOS_CHECK_EXIT(
      mndCheckObjPrivilegeRecF(pMnode, pUser, PRIV_CM_DROP, PRIV_OBJ_RSMA, pObj->ownerId, pObj->dbFName, pObj->name));

  code = mndDropRsma(pMnode, pReq, pDb, pObj);
  if (code == TSDB_CODE_SUCCESS) {
    code = TSDB_CODE_ACTION_IN_PROGRESS;
  }

  if (tsAuditLevel >= AUDIT_LEVEL_DATABASE) {
    int64_t endMs = taosGetTimestampMs();
    double  duration = (double)(endMs - startMs) / 1000;
    auditRecord(pReq, pMnode->clusterId, "dropRsma", dropReq.name, "", "", 0, duration, 0);
  }

_exit:
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("rsma:%s, failed at line %d to drop since %s", dropReq.name, lino, tstrerror(code));
  }

  mndReleaseDb(pMnode, pDb);
  mndReleaseRsma(pMnode, pObj);
  mndReleaseUser(pMnode, pUser);
#endif
  TAOS_RETURN(code);
}
627
#ifdef TD_ENTERPRISE
628
static int32_t mndCreateRsma(SMnode *pMnode, SRpcMsg *pReq, SUserObj *pUser, SDbObj *pDb, SStbObj *pStb,
21,944✔
629
                             SMCreateRsmaReq *pCreate) {
630
  int32_t  code = 0, lino = 0;
21,944✔
631
  SRsmaObj obj = {0};
21,944✔
632
  STrans  *pTrans = NULL;
21,944✔
633

634
  (void)snprintf(obj.name, TSDB_TABLE_NAME_LEN, "%s", pCreate->name);
21,944✔
635
  (void)snprintf(obj.dbFName, TSDB_DB_FNAME_LEN, "%s", pDb->name);
21,944✔
636

637
  const char *tbName = strrchr(pCreate->tbFName, '.');
21,944✔
638
  (void)snprintf(obj.tbName, TSDB_TABLE_NAME_LEN, "%s", tbName ? tbName + 1 : pCreate->tbFName);
21,944✔
639
  (void)snprintf(obj.createUser, TSDB_USER_LEN, "%s", pUser->user);
21,944✔
640
  obj.ownerId = pUser->uid;
21,944✔
641
  obj.createdTime = taosGetTimestampMs();
21,944✔
642
  obj.updateTime = obj.createdTime;
21,944✔
643
  obj.uid = mndGenerateUid(obj.name, strlen(obj.name));
21,944✔
644
  obj.tbUid = pCreate->tbUid;
21,944✔
645
  obj.dbUid = pDb->uid;
21,944✔
646
  obj.interval[0] = pCreate->interval[0];
21,944✔
647
  obj.interval[1] = pCreate->interval[1];
21,944✔
648
  obj.version = 1;
21,944✔
649
  obj.tbType = pCreate->tbType;  // ETableType: 1 stable. Only super table supported currently.
21,944✔
650
  obj.intervalUnit = pCreate->intervalUnit;
21,944✔
651
  obj.nFuncs = pCreate->nFuncs;
21,944✔
652
  if (obj.nFuncs > 0) {
21,944✔
653
    TSDB_CHECK_NULL((obj.funcColIds = taosMemoryCalloc(obj.nFuncs, sizeof(col_id_t))), code, lino, _exit, terrno);
21,944✔
654
    TSDB_CHECK_NULL((obj.funcIds = taosMemoryCalloc(obj.nFuncs, sizeof(func_id_t))), code, lino, _exit, terrno);
21,944✔
655
    for (int16_t i = 0; i < obj.nFuncs; ++i) {
101,280✔
656
      obj.funcColIds[i] = pCreate->funcColIds[i];
79,336✔
657
      obj.funcIds[i] = pCreate->funcIds[i];
79,336✔
658
    }
659
  }
660

661
  TSDB_CHECK_NULL((pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, "create-rsma")),
21,944✔
662
                  code, lino, _exit, terrno);
663
  mInfo("trans:%d, used to create rsma %s on tb %s.%s", pTrans->id, obj.name, obj.dbFName, obj.tbName);
21,944✔
664

665
  mndTransSetDbName(pTrans, obj.dbFName, obj.name);
21,944✔
666
  mndTransSetKillMode(pTrans, TRN_KILL_MODE_SKIP);
21,944✔
667
  TAOS_CHECK_EXIT(mndTransCheckConflict(pMnode, pTrans));
21,944✔
668

669
  mndTransSetOper(pTrans, MND_OPER_CREATE_RSMA);
21,944✔
670
  TAOS_CHECK_EXIT(mndSetCreateRsmaPrepareActions(pMnode, pTrans, &obj));
21,944✔
671
  TAOS_CHECK_EXIT(mndSetCreateRsmaRedoActions(pMnode, pTrans, pDb, pStb, &obj, pCreate));
21,944✔
672
  TAOS_CHECK_EXIT(mndSetCreateRsmaCommitLogs(pMnode, pTrans, &obj));
21,944✔
673
  TAOS_CHECK_EXIT(mndTransPrepare(pMnode, pTrans));
21,944✔
674
_exit:
21,944✔
675
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
21,944✔
676
    mError("rsma:%s, failed at line %d to create rsma, since %s", obj.name, lino, tstrerror(code));
×
677
  }
678
  mndTransDrop(pTrans);
21,944✔
679
  mndRsmaFreeObj(&obj);
21,944✔
680
  TAOS_RETURN(code);
21,944✔
681
}
682

683
// Validate the fields of a create-rsma request before any object is acquired.
//
// Rejects with TSDB_CODE_MND_INVALID_RSMA_OPTION when the rsma name or table full name is empty,
// igExists is not 0/1, intervalUnit or either interval is negative, both intervals are zero,
// or the parsed table name is empty. A failure of tNameFromString() is returned as-is.
// Returns 0 when every check passes.
static int32_t mndCheckCreateRsmaReq(SMCreateRsmaReq *pCreate) {
  int32_t code = TSDB_CODE_MND_INVALID_RSMA_OPTION;
  SName   fname = {0};

  if (pCreate->name[0] == 0 || pCreate->tbFName[0] == 0) goto _exit;
  if (pCreate->igExists < 0 || pCreate->igExists > 1) goto _exit;
  if (pCreate->intervalUnit < 0) goto _exit;
  if (pCreate->interval[0] < 0 || pCreate->interval[1] < 0) goto _exit;
  if (pCreate->interval[0] == 0 && pCreate->interval[1] == 0) goto _exit;

  if ((code = tNameFromString(&fname, pCreate->tbFName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE)) < 0) goto _exit;
  if (*(char *)tNameGetTableName(&fname) == 0) {
    // code holds the non-negative parse result here; restore the error so an empty
    // table name is rejected instead of being reported as success.
    code = TSDB_CODE_MND_INVALID_RSMA_OPTION;
    goto _exit;
  }
  code = 0;
_exit:
  TAOS_RETURN(code);
}
700

701
// Check whether the table targeted by a create-rsma request already has an rsma.
//
// Iterates all SDB_RSMA rows and compares each row's (dbUid, tbUid) with pDbObj->uid and
// pCreate->tbUid. Returns TSDB_CODE_MND_RSMA_EXIST_IN_TABLE on the first match, 0 otherwise.
static int32_t mndCheckRsmaConflicts(SMnode *pMnode, SDbObj *pDbObj, SMCreateRsmaReq *pCreate) {
  void     *pIter = NULL;
  SSdb     *pSdb = pMnode->pSdb;
  SRsmaObj *pObj = NULL;
  while ((pIter = sdbFetch(pSdb, SDB_RSMA, pIter, (void **)&pObj))) {
    if (pObj->tbUid == pCreate->tbUid && pObj->dbUid == pDbObj->uid) {
      // Log before releasing: the message reads fields of pObj, which must not be
      // touched once the reference taken by sdbFetch() has been given back.
      mError("rsma:%s, conflict with existing rsma %s on same table %s.%s:%" PRIi64, pCreate->name, pObj->name,
             pObj->dbFName, pObj->tbName, pObj->tbUid);
      sdbCancelFetch(pSdb, (pIter));
      sdbRelease(pSdb, pObj);
      return TSDB_CODE_MND_RSMA_EXIST_IN_TABLE;
    }
    sdbRelease(pSdb, pObj);
  }
  return 0;
}
717
#endif
718
// Message handler for a create-rsma request.
//
// Enterprise builds: deserialize and validate the request, make sure no rsma with the same name
// exists (unless igExists is set), acquire the database, user and super table, check the USE_DB
// privilege and per-table conflicts, then start the create transaction via mndCreateRsma().
// On success the code is turned into TSDB_CODE_ACTION_IN_PROGRESS and, when the audit level
// allows, an audit record is written. All acquired objects are released at _exit.
// Non-enterprise builds: returns 0 without doing anything.
static int32_t mndProcessCreateRsmaReq(SRpcMsg *pReq) {
  int32_t code = 0, lino = 0;
#ifdef TD_ENTERPRISE
  SMnode         *pMnode = pReq->info.node;
  SDbObj         *pDb = NULL;
  SStbObj        *pStb = NULL;
  SRsmaObj       *pSma = NULL;
  SUserObj       *pUser = NULL;
  int64_t         mTraceId = TRACE_GET_ROOTID(&pReq->info.traceId);
  SMCreateRsmaReq createReq = {0};
  int64_t         startMs = taosGetTimestampMs();
  SName           tbName = {0};
  char            dbFName[TSDB_TABLE_FNAME_LEN] = {0};

  TAOS_CHECK_EXIT(tDeserializeSMCreateRsmaReq(pReq->pCont, pReq->contLen, &createReq));

  mInfo("start to create rsma: %s", createReq.name);
  TAOS_CHECK_EXIT(mndCheckCreateRsmaReq(&createReq));

  pSma = mndAcquireRsma(pMnode, createReq.name);
  if (pSma != NULL) {
    if (!createReq.igExists) {
      TAOS_CHECK_EXIT(TSDB_CODE_RSMA_ALREADY_EXISTS);
    } else {
      mInfo("rsma:%s, already exist, ignore exist is set", createReq.name);
      code = 0;
      goto _exit;
    }
  } else if ((code = terrno) != TSDB_CODE_RSMA_NOT_EXIST) {
    // TSDB_CODE_MND_RSMA_IN_CREATING | TSDB_CODE_MND_RSMA_IN_DROPPING | TSDB_CODE_APP_ERROR
    goto _exit;
  }

  TAOS_CHECK_EXIT(tNameFromString(&tbName, createReq.tbFName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE));
  (void)tNameGetFullDbName(&tbName, dbFName);

  pDb = mndAcquireDb(pMnode, dbFName);
  if (NULL == pDb) {
    TAOS_CHECK_EXIT(TSDB_CODE_MND_DB_NOT_SELECTED);
  }

  TAOS_CHECK_EXIT(mndAcquireUser(pMnode, RPC_MSG_USER(pReq), &pUser));

  // already check select table/insert table/create rsma privileges in parser
  TAOS_CHECK_EXIT(mndCheckDbPrivilege(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_USE_DB, pDb));

  pStb = mndAcquireStb(pMnode, createReq.tbFName);
  if (NULL == pStb) {
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STB_NOT_EXIST);
  }

  TAOS_CHECK_EXIT(mndCheckRsmaConflicts(pMnode, pDb, &createReq));
  TAOS_CHECK_EXIT(mndCreateRsma(pMnode, pReq, pUser, pDb, pStb, &createReq));

  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;

  if (tsAuditLevel >= AUDIT_LEVEL_DATABASE) {
    // elapsed time is reported in seconds
    double duration = (double)(taosGetTimestampMs() - startMs) / 1000;
    auditRecord(pReq, pMnode->clusterId, "createRsma", createReq.name, createReq.tbFName, "", 0, duration, 0);
  }
_exit:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("rsma:%s, failed at line %d to create since %s", createReq.name, lino, tstrerror(code));
  }
  if (pSma != NULL) mndReleaseRsma(pMnode, pSma);
  if (pStb != NULL) mndReleaseStb(pMnode, pStb);
  if (pDb != NULL) mndReleaseDb(pMnode, pDb);
  if (pUser != NULL) mndReleaseUser(pMnode, pUser);
  tFreeSMCreateRsmaReq(&createReq);
#endif
  TAOS_RETURN(code);
}
798

799
#ifdef TD_ENTERPRISE
800
// Validate an alter-rsma request: the name must be non-empty and igNotExists must be 0 or 1.
// Returns 0 when valid, TSDB_CODE_MND_INVALID_RSMA_OPTION otherwise.
static int32_t mndCheckAlterRsmaReq(SMAlterRsmaReq *pReq) {
  int32_t code = 0;
  if (pReq->name[0] == 0 || pReq->igNotExists < 0 || pReq->igNotExists > 1) {
    code = TSDB_CODE_MND_INVALID_RSMA_OPTION;
  }
  TAOS_RETURN(code);
}
809

810
// Encode pSma, append the raw row to the transaction's prepare log and mark it SDB_STATUS_READY.
// Returns 0 on success and -1 as soon as any of the three steps fails.
static int32_t mndSetAlterRsmaPrepareActions(SMnode *pMnode, STrans *pTrans, SRsmaObj *pSma) {
  SSdbRaw *pRaw = mndRsmaActionEncode(pSma);

  // Short-circuit evaluation keeps the steps in order and stops at the first failure.
  if (pRaw == NULL || mndTransAppendPrepareLog(pTrans, pRaw) != 0 ||
      sdbSetRawStatus(pRaw, SDB_STATUS_READY) != 0) {
    return -1;
  }
  return 0;
}
818

819
// Alter reuses the commit-log construction of the create path unchanged.
static int32_t mndSetAlterRsmaCommitLogs(SMnode *pMnode, STrans *pTrans, SRsmaObj *pSma) {
  int32_t code = mndSetCreateRsmaCommitLogs(pMnode, pTrans, pSma);
  return code;
}
822

823
static void *mndBuildVAlterRsmaReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, SRsmaObj *pObj,
13,504✔
824
                                   SMAlterRsmaReq *pAlter, int32_t *pContLen) {
825
  int32_t        code = 0, lino = 0;
13,504✔
826
  SMsgHead      *pHead = NULL;
13,504✔
827
  SVAlterRsmaReq req = {0};
13,504✔
828
  req.alterType = pAlter->alterType;
13,504✔
829
  (void)snprintf(req.name, sizeof(req.name), "%s", pObj->name);
13,504✔
830
  (void)snprintf(req.tbName, sizeof(req.tbName), "%s", pObj->tbName);
13,504✔
831
  req.tbType = pObj->tbType;
13,504✔
832
  req.intervalUnit = pObj->intervalUnit;
13,504✔
833
  req.interval[0] = pObj->interval[0];
13,504✔
834
  req.interval[1] = pObj->interval[1];
13,504✔
835
  req.tbUid = pObj->tbUid;
13,504✔
836
  req.uid = pObj->uid;
13,504✔
837
  req.nFuncs = pObj->nFuncs;
13,504✔
838
  req.funcColIds = pObj->funcColIds;
13,504✔
839
  req.funcIds = pObj->funcIds;
13,504✔
840

841
  int32_t contLen = tSerializeSVAlterRsmaReq(NULL, 0, &req);
13,504✔
842
  TAOS_CHECK_EXIT(contLen);
13,504✔
843
  contLen += sizeof(SMsgHead);
13,504✔
844
  TSDB_CHECK_NULL((pHead = taosMemoryMalloc(contLen)), code, lino, _exit, terrno);
13,504✔
845
  pHead->contLen = htonl(contLen);
13,504✔
846
  pHead->vgId = htonl(pVgroup->vgId);
13,504✔
847
  void *pBuf = POINTER_SHIFT(pHead, sizeof(SMsgHead));
13,504✔
848
  TAOS_CHECK_EXIT(tSerializeSVAlterRsmaReq(pBuf, contLen, &req));
13,504✔
849
_exit:
13,504✔
850
  if (code < 0) {
13,504✔
851
    taosMemoryFreeClear(pHead);
×
852
    terrno = code;
×
853
    *pContLen = 0;
×
854
    return NULL;
×
855
  }
856
  *pContLen = contLen;
13,504✔
857
  return pHead;
13,504✔
858
}
859

860
static int32_t mndSetAlterRsmaRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb, SRsmaObj *pObj,
6,752✔
861
                                          SMAlterRsmaReq *pAlter) {
862
  int32_t code = 0;
6,752✔
863
  SSdb   *pSdb = pMnode->pSdb;
6,752✔
864
  SVgObj *pVgroup = NULL;
6,752✔
865
  void   *pIter = NULL;
6,752✔
866

867
  while ((pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup))) {
33,760✔
868
    if (!mndVgroupInDb(pVgroup, pDb->uid)) {
27,008✔
869
      sdbRelease(pSdb, pVgroup);
13,504✔
870
      continue;
13,504✔
871
    }
872

873
    int32_t contLen = 0;
13,504✔
874
    void   *pReq = mndBuildVAlterRsmaReq(pMnode, pVgroup, pStb, pObj, pAlter, &contLen);
13,504✔
875
    if (pReq == NULL) {
13,504✔
876
      sdbCancelFetch(pSdb, pIter);
×
877
      sdbRelease(pSdb, pVgroup);
×
878
      code = terrno ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
×
879
      TAOS_RETURN(code);
×
880
    }
881

882
    STransAction action = {0};
13,504✔
883
    action.mTraceId = pTrans->mTraceId;
13,504✔
884
    action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
13,504✔
885
    action.pCont = pReq;
13,504✔
886
    action.contLen = contLen;
13,504✔
887
    action.msgType = TDMT_VND_ALTER_RSMA;
13,504✔
888
    if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
13,504✔
889
      taosMemoryFree(pReq);
×
890
      sdbCancelFetch(pSdb, pIter);
×
891
      sdbRelease(pSdb, pVgroup);
×
892
      TAOS_RETURN(code);
×
893
    }
894
    sdbRelease(pSdb, pVgroup);
13,504✔
895
  }
896

897
  TAOS_RETURN(code);
6,752✔
898
}
899

900
static int32_t mndAlterRsma(SMnode *pMnode, SRpcMsg *pReq, SUserObj *pUser, SDbObj *pDb, SStbObj *pStb,
6,752✔
901
                            SMAlterRsmaReq *pAlter, SRsmaObj *pOld) {
902
  int32_t  code = 0, lino = 0;
6,752✔
903
  STrans  *pTrans = NULL;
6,752✔
904
  SRsmaObj obj = *pOld;
6,752✔
905

906
  obj.updateTime = taosGetTimestampMs();
6,752✔
907
  ++obj.version;
6,752✔
908
  if (pAlter->alterType == TSDB_ALTER_RSMA_FUNCTION) {
6,752✔
909
    obj.nFuncs = pOld->nFuncs + pAlter->nFuncs;
6,752✔
910
    obj.funcColIds = taosMemoryMalloc(obj.nFuncs * sizeof(col_id_t));
6,752✔
911
    obj.funcIds = taosMemoryMalloc(obj.nFuncs * sizeof(func_id_t));
6,752✔
912
    if (obj.funcColIds == NULL || obj.funcIds == NULL) {
6,752✔
913
      TAOS_CHECK_EXIT(terrno);
×
914
    }
915
    int32_t n = 0, i = 0, j = 0;
6,752✔
916
    while (i < pOld->nFuncs && j < pAlter->nFuncs) {
36,292✔
917
      if (pOld->funcColIds[i] < pAlter->funcColIds[j]) {
29,540✔
918
        obj.funcColIds[n] = pOld->funcColIds[i];
25,320✔
919
        obj.funcIds[n++] = pOld->funcIds[i++];
25,320✔
920
      } else if (pOld->funcColIds[i] > pAlter->funcColIds[j]) {
4,220✔
921
        obj.funcColIds[n] = pAlter->funcColIds[j];
4,220✔
922
        obj.funcIds[n++] = pAlter->funcIds[j++];
4,220✔
923
      } else {
924
        mError("rsma:%s, conflict function on column id:%d", pOld->name, pAlter->funcColIds[j]);
×
925
        TAOS_CHECK_EXIT(TSDB_CODE_MND_RSMA_FUNC_CONFLICT);
×
926
      }
927
    }
928
    if (i < pOld->nFuncs) {
6,752✔
929
      while (i < pOld->nFuncs) {
8,440✔
930
        obj.funcColIds[n] = pOld->funcColIds[i];
4,220✔
931
        obj.funcIds[n++] = pOld->funcIds[i++];
4,220✔
932
      }
933
    } else if (j < pAlter->nFuncs) {
2,532✔
934
      while (j < pAlter->nFuncs) {
5,064✔
935
        obj.funcColIds[n] = pAlter->funcColIds[j];
2,532✔
936
        obj.funcIds[n++] = pAlter->funcIds[j++];
2,532✔
937
      }
938
    }
939
  } else {
940
    TAOS_CHECK_EXIT(TSDB_CODE_OPS_NOT_SUPPORT);
×
941
  }
942

943
  TSDB_CHECK_NULL((pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, "alter-rsma")), code,
6,752✔
944
                  lino, _exit, terrno);
945
  mInfo("trans:%d, used to alter rsma %s on tb %s.%s", pTrans->id, obj.name, obj.dbFName, obj.tbName);
6,752✔
946

947
  mndTransSetDbName(pTrans, obj.dbFName, obj.name);
6,752✔
948
  mndTransSetKillMode(pTrans, TRN_KILL_MODE_SKIP);
6,752✔
949
  TAOS_CHECK_EXIT(mndTransCheckConflict(pMnode, pTrans));
6,752✔
950

951
  mndTransSetOper(pTrans, MND_OPER_ALTER_RSMA);
6,752✔
952
  TAOS_CHECK_EXIT(mndSetAlterRsmaPrepareActions(pMnode, pTrans, &obj));
6,752✔
953
  TAOS_CHECK_EXIT(mndSetAlterRsmaCommitLogs(pMnode, pTrans, &obj));
6,752✔
954
  TAOS_CHECK_EXIT(mndSetAlterRsmaRedoActions(pMnode, pTrans, pDb, pStb, &obj, pAlter));
6,752✔
955

956
  TAOS_CHECK_EXIT(mndTransPrepare(pMnode, pTrans));
6,752✔
957
_exit:
6,752✔
958
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
6,752✔
959
    mError("rsma:%s, failed at line %d to alter rsma, since %s", obj.name, lino, tstrerror(code));
×
960
  }
961
  mndTransDrop(pTrans);
6,752✔
962
  mndRsmaFreeObj(&obj);
6,752✔
963
  TAOS_RETURN(code);
6,752✔
964
}
965
#endif
966
// Message handler for an alter-rsma request.
//
// Enterprise builds: deserialize and validate the request, acquire the rsma (a missing rsma is
// not an error when igNotExists is set), its database, the requesting user and the super table,
// check the ALTER privilege on the rsma and start the alter transaction via mndAlterRsma().
// On success the code becomes TSDB_CODE_ACTION_IN_PROGRESS and an audit record is written when
// the audit level allows. Non-enterprise builds: returns 0 without doing anything.
static int32_t mndProcessAlterRsmaReq(SRpcMsg *pReq) {
  int32_t code = 0, lino = 0;
#ifdef TD_ENTERPRISE
  SMnode        *pMnode = pReq->info.node;
  SDbObj        *pDb = NULL;
  SStbObj       *pStb = NULL;
  SRsmaObj      *pObj = NULL;
  SUserObj      *pUser = NULL;
  int64_t        mTraceId = TRACE_GET_ROOTID(&pReq->info.traceId);
  SMAlterRsmaReq req = {0};
  char           tbFName[TSDB_TABLE_FNAME_LEN] = {0};
  int64_t        startMs = taosGetTimestampMs();

  TAOS_CHECK_EXIT(tDeserializeSMAlterRsmaReq(pReq->pCont, pReq->contLen, &req));

  mInfo("start to alter rsma: %s", req.name);
  TAOS_CHECK_EXIT(mndCheckAlterRsmaReq(&req));

  pObj = mndAcquireRsma(pMnode, req.name);
  if (pObj == NULL) {
    if (req.igNotExists) {
      code = 0;
    } else {
      code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    }
    goto _exit;
  }

  pDb = mndAcquireDb(pMnode, pObj->dbFName);
  if (pDb == NULL) {
    TAOS_CHECK_EXIT(TSDB_CODE_MND_DB_NOT_EXIST);
  }

  TAOS_CHECK_EXIT(mndAcquireUser(pMnode, RPC_MSG_USER(pReq), &pUser));

  TAOS_CHECK_EXIT(
      mndCheckObjPrivilegeRecF(pMnode, pUser, PRIV_CM_ALTER, PRIV_OBJ_RSMA, pObj->ownerId, pObj->dbFName, pObj->name));

  // the super table is looked up by "<dbFName>.<tbName>"
  (void)snprintf(tbFName, sizeof(tbFName), "%s.%s", pObj->dbFName, pObj->tbName);

  pStb = mndAcquireStb(pMnode, tbFName);
  if (pStb == NULL) {
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STB_NOT_EXIST);
  }

  TAOS_CHECK_EXIT(mndAlterRsma(pMnode, pReq, pUser, pDb, pStb, &req, pObj));

  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;

  if (tsAuditLevel >= AUDIT_LEVEL_DATABASE) {
    char alterType[32] = {0};
    (void)snprintf(alterType, sizeof(alterType), "alterType:%" PRIi8, req.alterType);
    // elapsed time is reported in seconds
    double duration = (double)(taosGetTimestampMs() - startMs) / 1000;
    auditRecord(pReq, pMnode->clusterId, "alterRsma", req.name, tbFName, alterType, 0, duration, 0);
  }
_exit:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("rsma:%s, failed at line %d to alter since %s", req.name, lino, tstrerror(code));
  }
  if (pObj != NULL) mndReleaseRsma(pMnode, pObj);
  if (pStb != NULL) mndReleaseStb(pMnode, pStb);
  if (pDb != NULL) mndReleaseDb(pMnode, pDb);
  if (pUser != NULL) mndReleaseUser(pMnode, pUser);
  tFreeSMAlterRsmaReq(&req);
#endif
  TAOS_RETURN(code);
}
1035
#ifdef TD_ENTERPRISE
1036
// Populate an SRsmaInfoRsp from an rsma object.
//
// Scalar fields and names are copied; funcColIds/funcIds are shared with pObj (shallow copy).
// When withColName is set, colNames receives one heap-allocated copy of the column name per
// function, resolved against pStb->pColumns with a single forward scan (both sequences are
// walked in ascending colId order). A function column that cannot be found yields
// TSDB_CODE_MND_COLUMN_NOT_EXIST. Names pushed before a failure stay in pRsp->colNames.
static int32_t mndFillRsmaInfo(SRsmaObj *pObj, SStbObj *pStb, SRsmaInfoRsp *pRsp, bool withColName) {
  int32_t code = 0, lino = 0;

  (void)snprintf(pRsp->name, sizeof(pRsp->name), "%s", pObj->name);
  (void)snprintf(pRsp->tbFName, sizeof(pRsp->tbFName), "%s.%s", pObj->dbFName, pObj->tbName);
  pRsp->id = pObj->uid;
  pRsp->ownerId = pObj->ownerId;
  pRsp->version = pObj->version;
  pRsp->tbType = pObj->tbType;
  pRsp->intervalUnit = pObj->intervalUnit;
  pRsp->interval[0] = pObj->interval[0];
  pRsp->interval[1] = pObj->interval[1];
  pRsp->nFuncs = pObj->nFuncs;

  if (pRsp->nFuncs <= 0) goto _exit;

  pRsp->funcColIds = pObj->funcColIds;  // shallow copy, no need to free
  pRsp->funcIds = pObj->funcIds;        // shallow copy, no need to free

  if (!withColName) goto _exit;

  pRsp->colNames = taosArrayInit(pRsp->nFuncs, sizeof(char *));
  if (pRsp->colNames == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }
  pRsp->nColNames = pRsp->nFuncs;

  int16_t j = 0;  // cursor into pStb->pColumns, never rewound
  for (int16_t i = 0; i < pRsp->nFuncs; ++i) {
    // skip table columns whose id is below the one we are looking for
    while (j < pStb->numOfColumns && pStb->pColumns[j].colId < pRsp->funcColIds[i]) {
      ++j;
    }
    if (j < pStb->numOfColumns && pStb->pColumns[j].colId == pRsp->funcColIds[i]) {
      char *colName = taosStrdup(pStb->pColumns[j].name);
      if (colName == NULL) {
        TAOS_CHECK_EXIT(terrno);
      }
      if (!taosArrayPush(pRsp->colNames, &colName)) {
        taosMemoryFree(colName);
        TAOS_CHECK_EXIT(terrno);
      }
    } else {
      TAOS_CHECK_EXIT(TSDB_CODE_MND_COLUMN_NOT_EXIST);
    }
  }
_exit:
  if (code != 0) {
    mError("rsma:%s, failed at line %d to get rsma info since %s", pObj->name, lino, tstrerror(code));
  }
  TAOS_RETURN(code);
}
1092
#endif
1093
// Message handler that returns the description of one rsma.
//
// Enterprise builds: look up the rsma named in the request and its super table, fill an
// SRsmaInfoRsp, serialize it into an rpc buffer and attach that buffer to pReq->info.
// The rpc buffer is freed here when any step fails. Non-enterprise builds: always returns
// TSDB_CODE_OPS_NOT_SUPPORT.
static int32_t mndProcessGetRsmaReq(SRpcMsg *pReq) {
#ifdef TD_ENTERPRISE
  int32_t      code = 0, lino = 0;
  SMnode      *pMnode = pReq->info.node;
  SRsmaInfoReq req = {0};
  SRsmaInfoRsp rsp = {0};
  SRsmaObj    *pObj = NULL;
  SStbObj     *pStb = NULL;
  void        *pRsp = NULL;
  int32_t      contLen = 0;
  char         tbFName[TSDB_TABLE_FNAME_LEN] = {0};

  TAOS_CHECK_EXIT(tDeserializeRsmaInfoReq(pReq->pCont, pReq->contLen, &req));

  pObj = mndAcquireRsma(pMnode, req.name);
  if (pObj == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }

  (void)snprintf(tbFName, sizeof(tbFName), "%s.%s", pObj->dbFName, pObj->tbName);

  pStb = mndAcquireStb(pMnode, tbFName);
  if (pStb == NULL) {
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STB_NOT_EXIST);
  }

  TAOS_CHECK_EXIT(mndFillRsmaInfo(pObj, pStb, &rsp, req.withColName));

  // measure first, then encode into an rpc-owned buffer of exactly that size
  contLen = tSerializeRsmaInfoRsp(NULL, 0, &rsp);
  if (contLen < 0) {
    TAOS_CHECK_EXIT(contLen);
  }
  pRsp = rpcMallocCont(contLen);
  if (pRsp == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }
  contLen = tSerializeRsmaInfoRsp(pRsp, contLen, &rsp);
  if (contLen < 0) {
    TAOS_CHECK_EXIT(contLen);
  }

  pReq->info.rsp = pRsp;
  pReq->info.rspLen = contLen;

_exit:
  if (code != 0) {
    rpcFreeCont(pRsp);
  }
  if (pObj != NULL) mndReleaseRsma(pMnode, pObj);
  if (pStb != NULL) mndReleaseStb(pMnode, pStb);
  tFreeRsmaInfoRsp(&rsp, false);
  TAOS_RETURN(code);
#else
  return TSDB_CODE_OPS_NOT_SUPPORT;
#endif
}
1144
#ifdef TD_ENTERPRISE
1145
// Render the function list of an rsma as a var-string "func(col),func(col),..." into buf.
//
// buf is laid out as a VARSTR header followed by the text; bufLen is the size handed in by the
// caller, of which VARSTR_HEADER_SIZE is reserved here for the header. The result is an empty
// string when the rsma has no functions or its table cannot be acquired. Function columns that
// are not found in the table schema are skipped, and output stops at the first entry that no
// longer fits.
static void mndRetrieveRsmaFuncList(SMnode *pMnode, SRsmaObj *pObj, char *buf, int32_t bufLen) {
  char   *qBuf = POINTER_SHIFT(buf, VARSTR_HEADER_SIZE);
  int32_t qBufLen = bufLen - VARSTR_HEADER_SIZE;

  qBuf[0] = 0;
  varDataSetLen(buf, 0);  // initialize to empty string

  if (pObj->nFuncs <= 0) return;

  char tbFName[TSDB_TABLE_FNAME_LEN] = {0};
  (void)snprintf(tbFName, sizeof(tbFName), "%s.%s", pObj->dbFName, pObj->tbName);
  SStbObj *pStb = mndAcquireStb(pMnode, tbFName);
  if (pStb == NULL) {
    mWarn("rsma:%s, failed to acquire table %s for function list", pObj->name, tbFName);
    return;
  }

  SSchema *pColumns = pStb->pColumns;
  int32_t  len = 0;
  int32_t  j = 0;  // cursor into pColumns, shared across all functions
  char     colFunc[TSDB_COL_NAME_LEN + TSDB_FUNC_NAME_LEN + 2] = {0};

  for (int32_t i = 0; i < pObj->nFuncs; ++i) {
    col_id_t colId = pObj->funcColIds[i];

    while (j < pStb->numOfColumns && pColumns[j].colId < colId) {
      ++j;
    }
    if (j >= pStb->numOfColumns || pColumns[j].colId != colId) {
      continue;  // column not present in the schema
    }

    int32_t colFuncLen =
        snprintf(colFunc, sizeof(colFunc), "%s(%s),", fmGetFuncName(pObj->funcIds[i]), pColumns[j].name);
    if ((qBufLen - len) <= colFuncLen) {
      break;  // no room left for this entry
    }
    len += snprintf(qBuf + len, colFuncLen + 1, "%s", colFunc);
  }

  int32_t textLen = (len > 0) ? len - 1 : 0;
  qBuf[textLen] = 0;  // remove the last ','
  varDataSetLen(buf, textLen);
  mndReleaseStb(pMnode, pStb);
}
1193
#endif
1194
// SHOW handler that fills pBlock with one row per visible rsma.
//
// Enterprise builds: rows are produced only on the first call for a show object
// (pShow->numOfRows < 1). A user holding the system-level SHOW privilege sees every rsma;
// otherwise each rsma is checked individually and skipped when the check fails.
// Columns written, in order: name, uid, database (without the account prefix), table name,
// table type, created time, interval(s) with unit, function list.
// Returns the number of rows written, or a negative error code.
// Non-enterprise builds: returns 0 rows.
static int32_t mndRetrieveRsma(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode          *pMnode = pReq->info.node;
  int32_t          code = 0, lino = 0;
  int32_t          numOfRows = 0;
  int32_t          cols = 0;
  char             tmp[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE];
  int32_t          tmpLen = 0;
  int32_t          bufLen = 0;
  char            *pBuf = NULL;
  char            *qBuf = NULL;
  void            *pIter = NULL;
  SSdb            *pSdb = pMnode->pSdb;
  SColumnInfoData *pColInfo = NULL;
  SUserObj        *pUser = NULL;
  char             objFName[TSDB_OBJ_FNAME_LEN + 1] = {0};
  bool             showAll = false;

#ifdef TD_ENTERPRISE
  pBuf = tmp;
  qBuf = POINTER_SHIFT(pBuf, VARSTR_HEADER_SIZE);  // text area right behind the var-string header
  bufLen = sizeof(tmp) - VARSTR_HEADER_SIZE;
  if (pShow->numOfRows < 1) {
    TAOS_CHECK_EXIT(mndAcquireUser(pMnode, (RPC_MSG_USER(pReq)), &pUser));
    (void)snprintf(objFName, sizeof(objFName), "%d.*", pUser->acctId);
    int32_t objLevel = privObjGetLevel(PRIV_OBJ_RSMA);
    showAll =
        (0 == mndCheckSysObjPrivilege(pMnode, pUser, RPC_MSG_TOKEN(pReq), PRIV_CM_SHOW, PRIV_OBJ_RSMA, 0, objFName,
                                      objLevel == 0 ? NULL : "*"));  // 1.*.*

    SRsmaObj *pObj = NULL;
    while ((pIter = sdbFetch(pSdb, SDB_RSMA, pIter, (void **)&pObj))) {
      if (!showAll && mndCheckObjPrivilegeRecF(pMnode, pUser, PRIV_CM_SHOW, PRIV_OBJ_RSMA, pObj->ownerId,
                                               pObj->dbFName,
                                               objLevel == 0 ? NULL : pObj->name)) {  // 1.db1.rsma1
        sdbRelease(pSdb, pObj);
        continue;
      }

      // rsma name
      cols = 0;
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols);
      TAOS_UNUSED(snprintf(qBuf, bufLen, "%s", pObj->name));
      varDataSetLen(pBuf, strlen(qBuf));
      COL_DATA_SET_VAL_GOTO(pBuf, false, pObj, pIter, _exit);

      // rsma uid
      pColInfo = taosArrayGet(pBlock->pDataBlock, ++cols);
      if (pColInfo != NULL) {
        COL_DATA_SET_VAL_GOTO((const char *)(&pObj->uid), false, pObj, pIter, _exit);
      }

      // database name, with everything up to the first '.' stripped when present
      pColInfo = taosArrayGet(pBlock->pDataBlock, ++cols);
      if (pColInfo != NULL) {
        const char *dot = strchr(pObj->dbFName, '.');
        TAOS_UNUSED(snprintf(qBuf, bufLen, "%s", dot ? dot + 1 : pObj->dbFName));
        varDataSetLen(pBuf, strlen(qBuf));
        COL_DATA_SET_VAL_GOTO(pBuf, false, pObj, pIter, _exit);
      }

      // table name
      pColInfo = taosArrayGet(pBlock->pDataBlock, ++cols);
      if (pColInfo != NULL) {
        TAOS_UNUSED(snprintf(qBuf, bufLen, "%s", pObj->tbName));
        varDataSetLen(pBuf, strlen(qBuf));
        COL_DATA_SET_VAL_GOTO(pBuf, false, pObj, pIter, _exit);
      }

      // table type
      pColInfo = taosArrayGet(pBlock->pDataBlock, ++cols);
      if (pColInfo != NULL) {
        const char *tbTypeStr = "UNKNOWN";
        if (pObj->tbType == TSDB_SUPER_TABLE) {
          tbTypeStr = "SUPER_TABLE";
        } else if (pObj->tbType == TSDB_NORMAL_TABLE) {
          tbTypeStr = "NORMAL_TABLE";
        } else if (pObj->tbType == TSDB_CHILD_TABLE) {
          tbTypeStr = "CHILD_TABLE";
        }
        TAOS_UNUSED(snprintf(qBuf, bufLen, "%s", tbTypeStr));
        varDataSetLen(pBuf, strlen(qBuf));
        COL_DATA_SET_VAL_GOTO(pBuf, false, pObj, pIter, _exit);
      }

      // created time
      pColInfo = taosArrayGet(pBlock->pDataBlock, ++cols);
      if (pColInfo != NULL) {
        COL_DATA_SET_VAL_GOTO((const char *)&pObj->createdTime, false, pObj, pIter, _exit);
      }

      // interval(s): "<n><unit>" or "<n><unit>,<m><unit>" when the second interval is positive
      pColInfo = taosArrayGet(pBlock->pDataBlock, ++cols);
      if (pColInfo != NULL) {
        TAOS_UNUSED(snprintf(qBuf, bufLen, "%" PRIi64 "%c", pObj->interval[0], pObj->intervalUnit));
        if (pObj->interval[1] > 0) {
          tmpLen = strlen(qBuf);
          TAOS_UNUSED(
              snprintf(qBuf + tmpLen, bufLen - tmpLen, ",%" PRIi64 "%c", pObj->interval[1], pObj->intervalUnit));
        }
        varDataSetLen(pBuf, strlen(qBuf));
        COL_DATA_SET_VAL_GOTO(pBuf, false, pObj, pIter, _exit);
      }

      // function list
      pColInfo = taosArrayGet(pBlock->pDataBlock, ++cols);
      if (pColInfo != NULL) {
        mndRetrieveRsmaFuncList(pMnode, pObj, pBuf, bufLen);
        COL_DATA_SET_VAL_GOTO(pBuf, false, pObj, pIter, _exit);
      }

      sdbRelease(pSdb, pObj);
      ++numOfRows;
    }
  }

  pShow->numOfRows += numOfRows;

_exit:
  if (pUser != NULL) mndReleaseUser(pMnode, pUser);
  if (code < 0) {
    mError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    TAOS_RETURN(code);
  }
#endif
  return numOfRows;
}
1311

1312
// Cancel an in-flight SHOW iteration over SDB_RSMA rows.
static void mndCancelRetrieveRsma(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_RSMA);
}
×
1316

1317
// Add drop commit logs to pTrans for every rsma whose dbUid matches pDb->uid.
// Enterprise builds stop at, and return, the first non-zero code from
// mndSetDropRsmaCommitLogs(); non-enterprise builds return 0 without doing anything.
int32_t mndDropRsmasByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
  int32_t code = 0;
#ifdef TD_ENTERPRISE
  SSdb     *pSdb = pMnode->pSdb;
  SRsmaObj *pObj = NULL;
  void     *pIter = NULL;

  for (pIter = sdbFetch(pSdb, SDB_RSMA, NULL, (void **)&pObj); pIter != NULL;
       pIter = sdbFetch(pSdb, SDB_RSMA, pIter, (void **)&pObj)) {
    if (pObj->dbUid == pDb->uid) {
      code = mndSetDropRsmaCommitLogs(pMnode, pTrans, pObj);
    }
    if (code != 0) {
      // abandon the iteration and hand back the row before bailing out
      sdbCancelFetch(pSdb, pIter);
      sdbRelease(pSdb, pObj);
      break;
    }
    sdbRelease(pSdb, pObj);
  }
#endif
  TAOS_RETURN(code);
}
1337

1338
// Add drop commit logs to pTrans for every rsma defined on the given super table, i.e. whose
// (tbUid, dbUid) equals (pStb->uid, pStb->dbUid).
// Enterprise builds stop at, and return, the first non-zero code from
// mndSetDropRsmaCommitLogs(); non-enterprise builds return 0 without doing anything.
int32_t mndDropRsmaByStb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
  int32_t code = 0;
#ifdef TD_ENTERPRISE
  SSdb     *pSdb = pMnode->pSdb;
  SRsmaObj *pObj = NULL;
  void     *pIter = NULL;

  for (pIter = sdbFetch(pSdb, SDB_RSMA, NULL, (void **)&pObj); pIter != NULL;
       pIter = sdbFetch(pSdb, SDB_RSMA, pIter, (void **)&pObj)) {
    if (pObj->tbUid == pStb->uid && pObj->dbUid == pStb->dbUid) {
      code = mndSetDropRsmaCommitLogs(pMnode, pTrans, pObj);
    }
    if (code != 0) {
      // abandon the iteration and hand back the row before bailing out
      sdbCancelFetch(pSdb, pIter);
      sdbRelease(pSdb, pObj);
      break;
    }
    sdbRelease(pSdb, pObj);
  }
#endif
  TAOS_RETURN(code);
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc