• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4945

30 Jan 2026 06:19AM UTC coverage: 66.87% (+0.02%) from 66.849%
#4945

push

travis-ci

web-flow
merge: from main to 3.0 #34453

1126 of 2018 new or added lines in 72 files covered. (55.8%)

13708 existing lines in 159 files now uncovered.

205277 of 306978 relevant lines covered (66.87%)

126353544.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.49
/source/dnode/mnode/impl/src/mndRsma.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "audit.h"
18
#include "functionMgt.h"
19
#include "mndDb.h"
20
#include "mndDnode.h"
21
#include "mndInfoSchema.h"
22
#include "mndMnode.h"
23
#include "mndPrivilege.h"
24
#include "mndShow.h"
25
#include "mndSma.h"
26
#include "mndStb.h"
27
#include "mndTrans.h"
28
#include "mndUser.h"
29
#include "mndVgroup.h"
30
#include "parser.h"
31
#include "tname.h"
32

33
#define MND_RSMA_VER_NUMBER   1
34
#define MND_RSMA_RESERVE_SIZE 64
35

36
static SSdbRaw *mndRsmaActionEncode(SRsmaObj *pSma);
37
static SSdbRow *mndRsmaActionDecode(SSdbRaw *pRaw);
38
static int32_t  mndRsmaActionInsert(SSdb *pSdb, SRsmaObj *pSma);
39
static int32_t  mndRsmaActionDelete(SSdb *pSdb, SRsmaObj *pSpSmatb);
40
static int32_t  mndRsmaActionUpdate(SSdb *pSdb, SRsmaObj *pOld, SRsmaObj *pNew);
41
static int32_t  mndProcessCreateRsmaReq(SRpcMsg *pReq);
42
static int32_t  mndProcessDropRsmaReq(SRpcMsg *pReq);
43
static int32_t  mndProcessAlterRsmaReq(SRpcMsg *pReq);
44
static int32_t  mndProcessGetRsmaReq(SRpcMsg *pReq);
45

46
static int32_t mndRetrieveRsma(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
47
static void    mndCancelRetrieveRsma(SMnode *pMnode, void *pIter);
48
static int32_t mndRetrieveRsmaTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
49
static void    mndCancelRetrieveRsmaTask(SMnode *pMnode, void *pIter);
50

51
/*
 * Wires the RSMA module into the mnode: registers the message handlers and
 * show hooks, then installs the SDB_RSMA table callbacks.
 *
 * Returns whatever sdbSetTable() returns.
 */
int32_t mndInitRsma(SMnode *pMnode) {
  // Row callbacks for SDB_RSMA; rows are looked up by a binary key.
  SSdbTable rsmaTable = {
      .sdbType = SDB_RSMA,
      .keyType = SDB_KEY_BINARY,
      .encodeFp = (SdbEncodeFp)mndRsmaActionEncode,
      .decodeFp = (SdbDecodeFp)mndRsmaActionDecode,
      .insertFp = (SdbInsertFp)mndRsmaActionInsert,
      .updateFp = (SdbUpdateFp)mndRsmaActionUpdate,
      .deleteFp = (SdbDeleteFp)mndRsmaActionDelete,
  };

  // TDMT_MND_* requests are processed in this file; every TDMT_VND_*_RSP
  // message is routed to mndTransProcessRsp.
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_RSMA, mndProcessCreateRsmaReq);
  mndSetMsgHandle(pMnode, TDMT_VND_CREATE_RSMA_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_RSMA, mndProcessDropRsmaReq);
  mndSetMsgHandle(pMnode, TDMT_VND_DROP_RSMA_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_ALTER_RSMA, mndProcessAlterRsmaReq);
  mndSetMsgHandle(pMnode, TDMT_VND_ALTER_RSMA_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_GET_RSMA, mndProcessGetRsmaReq);

  // Hooks used when the RSMA management table is listed.
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_RSMA, mndRetrieveRsma);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_RSMA, mndCancelRetrieveRsma);

  return sdbSetTable(pMnode->pSdb, rsmaTable);
}
74

75
/* Counterpart of mndInitRsma(); intentionally a no-op. */
void mndCleanupRsma(SMnode *pMnode) {
  (void)pMnode;
}
399,495✔
76

77
/*
 * Releases the two heap arrays owned by an RSMA object (funcColIds and
 * funcIds). The object itself is not freed. A NULL pObj is accepted.
 */
void mndRsmaFreeObj(SRsmaObj *pObj) {
  if (pObj == NULL) {
    return;
  }

  taosMemoryFreeClear(pObj->funcColIds);
  taosMemoryFreeClear(pObj->funcIds);
}
61,748✔
83

84
static int32_t tSerializeSRsmaObj(void *buf, int32_t bufLen, const SRsmaObj *pObj) {
93,340✔
85
  int32_t  code = 0, lino = 0;
93,340✔
86
  int32_t  tlen = 0;
93,340✔
87
  SEncoder encoder = {0};
93,340✔
88
  tEncoderInit(&encoder, buf, bufLen);
93,340✔
89

90
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
93,340✔
91

92
  TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pObj->name));
186,680✔
93
  TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pObj->tbName));
186,680✔
94
  TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pObj->dbFName));
186,680✔
95
  TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pObj->createUser));
186,680✔
96
  TAOS_CHECK_EXIT(tEncodeI64v(&encoder, pObj->createdTime));
186,680✔
97
  TAOS_CHECK_EXIT(tEncodeI64v(&encoder, pObj->updateTime));
186,680✔
98
  TAOS_CHECK_EXIT(tEncodeI64v(&encoder, pObj->uid));
186,680✔
99
  TAOS_CHECK_EXIT(tEncodeI64v(&encoder, pObj->tbUid));
186,680✔
100
  TAOS_CHECK_EXIT(tEncodeI64v(&encoder, pObj->dbUid));
186,680✔
101
  TAOS_CHECK_EXIT(tEncodeI64v(&encoder, pObj->interval[0]));
186,680✔
102
  TAOS_CHECK_EXIT(tEncodeI64v(&encoder, pObj->interval[1]));
186,680✔
103
  TAOS_CHECK_EXIT(tEncodeU64v(&encoder, pObj->reserved));
186,680✔
104
  TAOS_CHECK_EXIT(tEncodeI32v(&encoder, pObj->version));
186,680✔
105
  TAOS_CHECK_EXIT(tEncodeI8(&encoder, pObj->tbType));
186,680✔
106
  TAOS_CHECK_EXIT(tEncodeI8(&encoder, pObj->intervalUnit));
186,680✔
107
  TAOS_CHECK_EXIT(tEncodeI16v(&encoder, pObj->nFuncs));
186,680✔
108
  for (int16_t i = 0; i < pObj->nFuncs; ++i) {
618,916✔
109
    TAOS_CHECK_EXIT(tEncodeI16v(&encoder, pObj->funcColIds[i]));
1,051,152✔
110
    TAOS_CHECK_EXIT(tEncodeI32v(&encoder, pObj->funcIds[i]));
1,051,152✔
111
  }
112
  TAOS_CHECK_EXIT(tEncodeI64v(&encoder, pObj->ownerId));
186,680✔
113

114
  tEndEncode(&encoder);
93,340✔
115

116
  tlen = encoder.pos;
93,340✔
117
_exit:
93,340✔
118
  tEncoderClear(&encoder);
93,340✔
119
  if (code < 0) {
93,340✔
120
    mError("rsma, %s failed at line %d since %s", __func__, lino, tstrerror(code));
×
121
    TAOS_RETURN(code);
×
122
  }
123

124
  return tlen;
93,340✔
125
}
126

127
/*
 * Decodes an SRsmaObj from buf (bufLen bytes), mirroring the field order of
 * tSerializeSRsmaObj().
 *
 * When nFuncs > 0 the funcColIds/funcIds arrays are heap-allocated here and
 * stored in pObj; this function does not free them on failure, so the caller
 * is expected to release them (mndRsmaActionDecode() does so through
 * mndRsmaFreeObj()).
 *
 * Returns the last status code via TAOS_RETURN; a negative code is logged.
 */
static int32_t tDeserializeSRsmaObj(void *buf, int32_t bufLen, SRsmaObj *pObj) {
  int32_t  code = 0;
  int32_t  lino = 0;
  SDecoder decoder = {0};

  tDecoderInit(&decoder, buf, bufLen);
  TAOS_CHECK_EXIT(tStartDecode(&decoder));

  // names
  TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pObj->name));
  TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pObj->tbName));
  TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pObj->dbFName));
  TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pObj->createUser));

  // timestamps and identifiers
  TAOS_CHECK_EXIT(tDecodeI64v(&decoder, &pObj->createdTime));
  TAOS_CHECK_EXIT(tDecodeI64v(&decoder, &pObj->updateTime));
  TAOS_CHECK_EXIT(tDecodeI64v(&decoder, &pObj->uid));
  TAOS_CHECK_EXIT(tDecodeI64v(&decoder, &pObj->tbUid));
  TAOS_CHECK_EXIT(tDecodeI64v(&decoder, &pObj->dbUid));

  // intervals and scalar attributes
  TAOS_CHECK_EXIT(tDecodeI64v(&decoder, &pObj->interval[0]));
  TAOS_CHECK_EXIT(tDecodeI64v(&decoder, &pObj->interval[1]));
  TAOS_CHECK_EXIT(tDecodeU64v(&decoder, &pObj->reserved));
  TAOS_CHECK_EXIT(tDecodeI32v(&decoder, &pObj->version));
  TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pObj->tbType));
  TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pObj->intervalUnit));

  // function list: count followed by (column id, function id) pairs
  TAOS_CHECK_EXIT(tDecodeI16v(&decoder, &pObj->nFuncs));
  if (pObj->nFuncs > 0) {
    pObj->funcColIds = taosMemoryMalloc(sizeof(col_id_t) * pObj->nFuncs);
    if (pObj->funcColIds == NULL) {
      TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
    }
    pObj->funcIds = taosMemoryMalloc(sizeof(int32_t) * pObj->nFuncs);
    if (pObj->funcIds == NULL) {
      TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
    }
    for (int16_t idx = 0; idx < pObj->nFuncs; ++idx) {
      TAOS_CHECK_EXIT(tDecodeI16v(&decoder, &pObj->funcColIds[idx]));
      TAOS_CHECK_EXIT(tDecodeI32v(&decoder, &pObj->funcIds[idx]));
    }
  }

  // ownerId is read only when bytes remain (presumably so rows written
  // without it still decode -- TODO confirm).
  if (!tDecodeIsEnd(&decoder)) {
    TAOS_CHECK_EXIT(tDecodeI64v(&decoder, &pObj->ownerId));
  }

_exit:
  tEndDecode(&decoder);
  tDecoderClear(&decoder);
  if (code < 0) {
    mError("rsma, %s failed at line %d since %s, row:%p", __func__, lino, tstrerror(code), pObj);
  }
  TAOS_RETURN(code);
}
174

175
/*
 * sdb encode callback: packs an SRsmaObj into a freshly allocated SSdbRaw.
 *
 * Raw layout: [int32 payload length][payload produced by tSerializeSRsmaObj].
 *
 * Returns the raw record on success. On failure returns NULL with terrno set
 * to the error code; any partially built raw is freed.
 */
static SSdbRaw *mndRsmaActionEncode(SRsmaObj *pObj) {
  int32_t  code = 0, lino = 0;
  SSdbRaw *pRaw = NULL;
  void    *payload = NULL;

  // First pass with a NULL buffer only measures the encoded size.
  int32_t tlen = tSerializeSRsmaObj(NULL, 0, pObj);
  if (tlen < 0) {
    TAOS_CHECK_EXIT(tlen);
  }

  int32_t rawSize = sizeof(int32_t) + tlen;
  pRaw = sdbAllocRaw(SDB_RSMA, MND_RSMA_VER_NUMBER, rawSize);
  if (!pRaw) {
    TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
  }

  payload = taosMemoryMalloc(tlen);
  if (!payload) {
    TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
  }

  // Second pass writes the payload into the scratch buffer.
  tlen = tSerializeSRsmaObj(payload, tlen, pObj);
  if (tlen < 0) {
    TAOS_CHECK_EXIT(tlen);
  }

  int32_t dataPos = 0;
  SDB_SET_INT32(pRaw, dataPos, tlen, _exit);
  SDB_SET_BINARY(pRaw, dataPos, payload, tlen, _exit);
  SDB_SET_DATALEN(pRaw, dataPos, _exit);

_exit:
  taosMemoryFreeClear(payload);
  if (code == TSDB_CODE_SUCCESS) {
    mTrace("rsma, encode to raw:%p, row:%p", pRaw, pObj);
    return pRaw;
  }

  terrno = code;
  mError("rsma, failed at line %d to encode to raw:%p since %s", lino, pRaw, tstrerror(code));
  sdbFreeRaw(pRaw);
  return NULL;
}
217

218
/*
 * sdb decode callback: rebuilds an SRsmaObj row from a raw record produced by
 * mndRsmaActionEncode().
 *
 * Rejects records whose soft version differs from MND_RSMA_VER_NUMBER.
 * Returns the new row on success. On failure returns NULL with terrno set;
 * the row and any arrays already attached to the object are freed.
 */
SSdbRow *mndRsmaActionDecode(SSdbRaw *pRaw) {
  int32_t   code = 0, lino = 0;
  SSdbRow  *pRow = NULL;
  SRsmaObj *pObj = NULL;
  void     *payload = NULL;
  int8_t    sver = 0;

  TAOS_CHECK_EXIT(sdbGetRawSoftVer(pRaw, &sver));
  if (sver != MND_RSMA_VER_NUMBER) {
    mError("rsma read invalid ver, data ver: %d, curr ver: %d", sver, MND_RSMA_VER_NUMBER);
    TAOS_CHECK_EXIT(TSDB_CODE_SDB_INVALID_DATA_VER);
  }

  pRow = sdbAllocRow(sizeof(SRsmaObj));
  if (pRow == NULL) {
    TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
  }

  pObj = sdbGetRowObj(pRow);
  if (pObj == NULL) {
    TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
  }

  // Raw layout: int32 payload length followed by the payload bytes.
  int32_t tlen;
  int32_t dataPos = 0;
  SDB_GET_INT32(pRaw, dataPos, &tlen, _exit);

  payload = taosMemoryMalloc(tlen + 1);
  if (payload == NULL) {
    TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
  }
  SDB_GET_BINARY(pRaw, dataPos, payload, tlen, _exit);

  TAOS_CHECK_EXIT(tDeserializeSRsmaObj(payload, tlen, pObj));

  taosInitRWLatch(&pObj->lock);

_exit:
  taosMemoryFreeClear(payload);
  if (code == TSDB_CODE_SUCCESS) {
    mTrace("rsma, decode from raw:%p, row:%p", pRaw, pObj);
    return pRow;
  }

  terrno = code;
  mError("rsma, failed at line %d to decode from raw:%p since %s", lino, pRaw, tstrerror(code));
  mndRsmaFreeObj(pObj);  // tolerates NULL; releases arrays allocated while decoding
  taosMemoryFreeClear(pRow);
  return NULL;
}
265

266
/* sdb insert callback: only traces the event; always returns 0. */
static int32_t mndRsmaActionInsert(SSdb *pSdb, SRsmaObj *pObj) {
  (void)pSdb;
  mTrace("rsma:%s, perform insert action, row:%p", pObj->name, pObj);
  return 0;
}
270

271
/*
 * sdb delete callback: traces the event and releases the arrays owned by the
 * object through mndRsmaFreeObj(). Always returns 0.
 */
static int32_t mndRsmaActionDelete(SSdb *pSdb, SRsmaObj *pObj) {
  (void)pSdb;
  mTrace("rsma:%s, perform delete action, row:%p", pObj->name, pObj);
  mndRsmaFreeObj(pObj);
  return 0;
}
276

277
/*
 * sdb update callback: copies the mutable fields of pNew into pOld while
 * holding pOld's write latch.
 *
 * The function arrays are exchanged rather than copied, so after the call
 * pNew holds pOld's previous arrays. Always returns 0.
 */
static int32_t mndRsmaActionUpdate(SSdb *pSdb, SRsmaObj *pOld, SRsmaObj *pNew) {
  (void)pSdb;
  mTrace("rsma:%s, perform update action, old row:%p new row:%p", pOld->name, pOld, pNew);

  taosWLockLatch(&pOld->lock);
  {
    // scalar fields
    pOld->updateTime = pNew->updateTime;
    pOld->nFuncs = pNew->nFuncs;
    pOld->ownerId = pNew->ownerId;

    // array ownership is swapped between the two rows
    TSWAP(pOld->funcColIds, pNew->funcColIds);
    TSWAP(pOld->funcIds, pNew->funcIds);
  }
  taosWUnLockLatch(&pOld->lock);

  return 0;
}
288

289
/*
 * Looks up an RSMA object by name in the SDB_RSMA table.
 *
 * Returns the acquired object, or NULL on failure. On failure the sdb error
 * found in terrno is translated into an RSMA-specific code:
 *   TSDB_CODE_SDB_OBJ_NOT_THERE -> TSDB_CODE_RSMA_NOT_EXIST
 *   TSDB_CODE_SDB_OBJ_CREATING  -> TSDB_CODE_MND_RSMA_IN_CREATING
 *   TSDB_CODE_SDB_OBJ_DROPPING  -> TSDB_CODE_MND_RSMA_IN_DROPPING
 *   anything else               -> TSDB_CODE_APP_ERROR (logged as fatal)
 *
 * Callers in this file pair a successful acquire with mndReleaseRsma().
 */
SRsmaObj *mndAcquireRsma(SMnode *pMnode, char *name) {
  SSdb     *pSdb = pMnode->pSdb;
  SRsmaObj *pObj = sdbAcquire(pSdb, SDB_RSMA, name);
  if (pObj != NULL) {
    return pObj;
  }

  // Snapshot the sdb error before terrno is rewritten below.
  int32_t sdbCode = terrno;
  if (sdbCode == TSDB_CODE_SDB_OBJ_NOT_THERE) {
    terrno = TSDB_CODE_RSMA_NOT_EXIST;
  } else if (sdbCode == TSDB_CODE_SDB_OBJ_CREATING) {
    terrno = TSDB_CODE_MND_RSMA_IN_CREATING;
  } else if (sdbCode == TSDB_CODE_SDB_OBJ_DROPPING) {
    terrno = TSDB_CODE_MND_RSMA_IN_DROPPING;
  } else {
    // Log the underlying cause; previously terrno was overwritten first, so
    // the message always reported the generic APP_ERROR text.
    mFatal("rsma:%s, failed to acquire rsma since %s", name, tstrerror(sdbCode));
    terrno = TSDB_CODE_APP_ERROR;
  }
  return NULL;
}
306

307
/* Hands an RSMA object back to the sdb via sdbRelease(). */
void mndReleaseRsma(SMnode *pMnode, SRsmaObj *pSma) {
  sdbRelease(pMnode->pSdb, pSma);
}
28,002✔
311
#ifdef TD_ENTERPRISE
312
/*
 * Encodes pSma, appends it to the transaction's redo log and marks the raw
 * record SDB_STATUS_CREATING.
 *
 * Returns 0 on success; otherwise the failing step's error code (terrno, or
 * TSDB_CODE_MND_RETURN_VALUE_NULL when encoding fails without setting it).
 */
static int32_t mndSetCreateRsmaRedoLogs(SMnode *pMnode, STrans *pTrans, SRsmaObj *pSma) {
  int32_t  code = 0;
  SSdbRaw *pRaw = mndRsmaActionEncode(pSma);

  if (!pRaw) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  TAOS_CHECK_RETURN(mndTransAppendRedolog(pTrans, pRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRaw, SDB_STATUS_CREATING));

  TAOS_RETURN(code);
}
325

326
/*
 * Encodes pSma, appends it to the transaction's undo log and marks the raw
 * record SDB_STATUS_DROPPED.
 *
 * Returns 0 on success; otherwise the failing step's error code (terrno, or
 * TSDB_CODE_MND_RETURN_VALUE_NULL when encoding fails without setting it).
 */
static int32_t mndSetCreateRsmaUndoLogs(SMnode *pMnode, STrans *pTrans, SRsmaObj *pSma) {
  int32_t  code = 0;
  SSdbRaw *pRaw = mndRsmaActionEncode(pSma);

  if (pRaw == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  TAOS_CHECK_RETURN(mndTransAppendUndolog(pTrans, pRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED));

  TAOS_RETURN(code);
}
338

339
/*
 * Encodes pSma, appends it to the transaction's prepare log and marks the raw
 * record SDB_STATUS_CREATING.
 *
 * Returns 0 on success. On failure returns the failing step's error code
 * (terrno, or TSDB_CODE_MND_RETURN_VALUE_NULL when encoding fails without
 * setting it), matching the sibling mndSetCreateRsma*Logs helpers. The
 * previous bare -1 made the caller log tstrerror(-1).
 */
static int32_t mndSetCreateRsmaPrepareActions(SMnode *pMnode, STrans *pTrans, SRsmaObj *pSma) {
  int32_t  code = 0;
  SSdbRaw *pDbRaw = mndRsmaActionEncode(pSma);
  if (pDbRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  TAOS_CHECK_RETURN(mndTransAppendPrepareLog(pTrans, pDbRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pDbRaw, SDB_STATUS_CREATING));
  return 0;
}
347

348
static void *mndBuildVCreateRsmaReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, SRsmaObj *pObj,
21,540✔
349
                                    SMCreateRsmaReq *pCreate, int32_t *pContLen) {
350
  int32_t         code = 0, lino = 0;
21,540✔
351
  SMsgHead       *pHead = NULL;
21,540✔
352
  SVCreateRsmaReq req = *pCreate;
21,540✔
353

354
  req.uid = pObj->uid;  // use the uid generated by mnode
21,540✔
355

356
  int32_t contLen = tSerializeSVCreateRsmaReq(NULL, 0, &req);
21,540✔
357
  TAOS_CHECK_EXIT(contLen);
21,540✔
358
  contLen += sizeof(SMsgHead);
21,540✔
359
  TSDB_CHECK_NULL((pHead = taosMemoryMalloc(contLen)), code, lino, _exit, terrno);
21,540✔
360
  pHead->contLen = htonl(contLen);
21,540✔
361
  pHead->vgId = htonl(pVgroup->vgId);
21,540✔
362
  void *pBuf = POINTER_SHIFT(pHead, sizeof(SMsgHead));
21,540✔
363
  TAOS_CHECK_EXIT(tSerializeSVCreateRsmaReq(pBuf, contLen, &req));
21,540✔
364
_exit:
21,540✔
365
  if (code < 0) {
21,540✔
366
    taosMemoryFreeClear(pHead);
×
367
    terrno = code;
×
368
    *pContLen = 0;
×
369
    return NULL;
×
370
  }
371
  *pContLen = contLen;
21,540✔
372
  return pHead;
21,540✔
373
}
374

375
static int32_t mndSetCreateRsmaRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb, SRsmaObj *pObj,
10,770✔
376
                                           SMCreateRsmaReq *pCreate) {
377
  int32_t code = 0;
10,770✔
378
  SSdb   *pSdb = pMnode->pSdb;
10,770✔
379
  SVgObj *pVgroup = NULL;
10,770✔
380
  void   *pIter = NULL;
10,770✔
381

382
  SName name = {0};
10,770✔
383
  if ((code = tNameFromString(&name, pCreate->tbFName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE)) != 0) {
10,770✔
384
    return code;
×
385
  }
386
  tstrncpy(pCreate->tbFName, (char *)tNameGetTableName(&name), sizeof(pCreate->tbFName));  // convert tbFName to tbName
10,770✔
387

388
  while ((pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup))) {
53,850✔
389
    if (!mndVgroupInDb(pVgroup, pDb->uid)) {
43,080✔
390
      sdbRelease(pSdb, pVgroup);
21,540✔
391
      continue;
21,540✔
392
    }
393

394
    int32_t contLen = 0;
21,540✔
395
    void   *pReq = mndBuildVCreateRsmaReq(pMnode, pVgroup, pStb, pObj, pCreate, &contLen);
21,540✔
396
    if (pReq == NULL) {
21,540✔
397
      sdbCancelFetch(pSdb, pIter);
×
398
      sdbRelease(pSdb, pVgroup);
×
399
      code = terrno ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
×
400
      TAOS_RETURN(code);
×
401
    }
402

403
    STransAction action = {0};
21,540✔
404
    action.mTraceId = pTrans->mTraceId;
21,540✔
405
    action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
21,540✔
406
    action.pCont = pReq;
21,540✔
407
    action.contLen = contLen;
21,540✔
408
    action.msgType = TDMT_VND_CREATE_RSMA;
21,540✔
409
    action.acceptableCode = TSDB_CODE_RSMA_ALREADY_EXISTS;  // check whether the rsma uid exist
21,540✔
410
    action.retryCode = TSDB_CODE_TDB_STB_NOT_EXIST;         // retry if relative table not exist
21,540✔
411
    if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
21,540✔
412
      taosMemoryFree(pReq);
×
413
      sdbCancelFetch(pSdb, pIter);
×
414
      sdbRelease(pSdb, pVgroup);
×
415
      TAOS_RETURN(code);
×
416
    }
417
    sdbRelease(pSdb, pVgroup);
21,540✔
418
  }
419

420
  TAOS_RETURN(code);
10,770✔
421
}
422

423
/*
 * Encodes pSma, appends it to the transaction's commit log and marks the raw
 * record SDB_STATUS_READY.
 *
 * Returns 0 on success; otherwise the failing step's error code (terrno, or
 * TSDB_CODE_MND_RETURN_VALUE_NULL when encoding fails without setting it).
 */
static int32_t mndSetCreateRsmaCommitLogs(SMnode *pMnode, STrans *pTrans, SRsmaObj *pSma) {
  int32_t  code = 0;
  SSdbRaw *pRaw = mndRsmaActionEncode(pSma);

  if (!pRaw) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRaw, SDB_STATUS_READY));

  TAOS_RETURN(code);
}
436

437
/*
 * Encodes pSma, appends it to the transaction's prepare log and marks the raw
 * record SDB_STATUS_DROPPING.
 *
 * Returns 0 on success. On failure returns the failing step's error code
 * (terrno, or TSDB_CODE_MND_RETURN_VALUE_NULL when encoding fails without
 * setting it). The encode-failure path used to compute that code and then
 * discard it by returning -1.
 */
static int32_t mndSetDropRsmaPrepareLogs(SMnode *pMnode, STrans *pTrans, SRsmaObj *pSma) {
  int32_t  code = 0;
  SSdbRaw *pRedoRaw = mndRsmaActionEncode(pSma);
  if (pRedoRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }
  TAOS_CHECK_RETURN(mndTransAppendPrepareLog(pTrans, pRedoRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING));

  return 0;
}
450

451
/*
 * Encodes pSma, appends it to the transaction's commit log and marks the raw
 * record SDB_STATUS_DROPPED.
 *
 * Returns 0 on success. On failure returns the failing step's error code
 * (terrno, or TSDB_CODE_MND_RETURN_VALUE_NULL when encoding fails without
 * setting it). The encode-failure path used to compute that code and then
 * discard it by returning -1.
 */
static int32_t mndSetDropRsmaCommitLogs(SMnode *pMnode, STrans *pTrans, SRsmaObj *pSma) {
  int32_t  code = 0;
  SSdbRaw *pCommitRaw = mndRsmaActionEncode(pSma);
  if (pCommitRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }
  TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pCommitRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED));

  return 0;
}
464

465
/*
 * Builds the TDMT_VND_DROP_RSMA message body for one vgroup.
 *
 * The message is an SMsgHead (contLen and vgId in network byte order)
 * followed by a serialized SVDropRsmaReq filled from pObj (names, table type,
 * uid and table uid).
 *
 * Returns a heap buffer the caller owns, with *pContLen set to the total
 * length (header + payload). On failure returns NULL, sets terrno and writes
 * 0 to *pContLen.
 */
static void *mndBuildVDropRsmaReq(SMnode *pMnode, SVgObj *pVgroup, SRsmaObj *pObj, int32_t *pContLen) {
  int32_t       code = 0, lino = 0;
  int32_t       bodyLen = 0;
  int32_t       contLen = 0;
  SMsgHead     *pHead = NULL;
  SVDropRsmaReq req = {0};

  (void)snprintf(req.tbName, sizeof(req.tbName), "%s", pObj->tbName);
  (void)snprintf(req.name, sizeof(req.name), "%s", pObj->name);
  req.tbType = pObj->tbType;
  req.uid = pObj->uid;
  req.tbUid = pObj->tbUid;

  // Size-only pass: a NULL buffer yields the payload length.
  bodyLen = tSerializeSVDropRsmaReq(NULL, 0, &req);
  TAOS_CHECK_EXIT(bodyLen);
  contLen = bodyLen + (int32_t)sizeof(SMsgHead);

  TSDB_CHECK_NULL((pHead = taosMemoryMalloc(contLen)), code, lino, _exit, terrno);
  pHead->contLen = htonl(contLen);
  pHead->vgId = htonl(pVgroup->vgId);

  // The payload area starts after the header, so its capacity is bodyLen,
  // not contLen; passing contLen overstated the writable space.
  void *pBuf = POINTER_SHIFT(pHead, sizeof(SMsgHead));
  TAOS_CHECK_EXIT(tSerializeSVDropRsmaReq(pBuf, bodyLen, &req));

_exit:
  if (code < 0) {
    taosMemoryFreeClear(pHead);
    terrno = code;
    *pContLen = 0;
    return NULL;
  }
  *pContLen = contLen;
  return pHead;
}
494

495
/*
 * Appends one TDMT_VND_DROP_RSMA redo action to pTrans for every vgroup that
 * belongs to pDb. TSDB_CODE_RSMA_NOT_EXIST is set as the action's acceptable
 * code.
 *
 * Returns 0 on success or the first error encountered. On error the sdb
 * iteration is cancelled and the current vgroup reference is released.
 */
static int32_t mndSetDropRsmaRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SRsmaObj *pSma) {
  int32_t code = 0;
  SSdb   *pSdb = pMnode->pSdb;
  void   *pIter = NULL;
  SVgObj *pVgroup = NULL;

  while ((pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup)) != NULL) {
    if (mndVgroupInDb(pVgroup, pDb->uid)) {
      int32_t contLen = 0;
      void   *pCont = mndBuildVDropRsmaReq(pMnode, pVgroup, pSma, &contLen);
      if (!pCont) {
        sdbCancelFetch(pSdb, pIter);
        sdbRelease(pSdb, pVgroup);
        code = terrno ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
        TAOS_RETURN(code);
      }

      STransAction action = {0};
      action.mTraceId = pTrans->mTraceId;
      action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
      action.pCont = pCont;
      action.contLen = contLen;
      action.msgType = TDMT_VND_DROP_RSMA;
      action.acceptableCode = TSDB_CODE_RSMA_NOT_EXIST;

      code = mndTransAppendRedoAction(pTrans, &action);
      if (code != 0) {
        // the message buffer is freed here when the append fails
        taosMemoryFree(pCont);
        sdbCancelFetch(pSdb, pIter);
        sdbRelease(pSdb, pVgroup);
        TAOS_RETURN(code);
      }
    }

    // vgroups of other databases are skipped; either way drop the reference
    sdbRelease(pSdb, pVgroup);
  }

  TAOS_RETURN(code);
}
533

534
/*
 * Creates and prepares the "drop-rsma" transaction for pObj.
 *
 * The transaction gets, in this order: a conflict check, the prepare log
 * (DROPPING), the commit log (DROPPED) and one drop action per vgroup of pDb.
 * No custom RPC response is attached (the mndBuildDropRsmaRsp call that once
 * did so is disabled).
 *
 * Returns the resulting code; failures other than
 * TSDB_CODE_ACTION_IN_PROGRESS are logged. The transaction handle is always
 * dropped before returning.
 */
static int32_t mndDropRsma(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SRsmaObj *pObj) {
  int32_t code = 0, lino = 0;
  STrans *pTrans = NULL;

  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, "drop-rsma");
  if (!pTrans) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto _exit;
  }

  mInfo("trans:%d start to drop rsma:%s", pTrans->id, pObj->name);

  mndTransSetDbName(pTrans, pDb->name, pObj->name);
  mndTransSetKillMode(pTrans, TRN_KILL_MODE_SKIP);
  TAOS_CHECK_EXIT(mndTransCheckConflict(pMnode, pTrans));

  mndTransSetOper(pTrans, MND_OPER_DROP_RSMA);
  TAOS_CHECK_EXIT(mndSetDropRsmaPrepareLogs(pMnode, pTrans, pObj));
  TAOS_CHECK_EXIT(mndSetDropRsmaCommitLogs(pMnode, pTrans, pObj));
  TAOS_CHECK_EXIT(mndSetDropRsmaRedoActions(pMnode, pTrans, pDb, pObj));

  TAOS_CHECK_EXIT(mndTransPrepare(pMnode, pTrans));

_exit:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("rsma:%s, failed to drop at line:%d since %s", pObj->name, lino, tstrerror(code));
  }
  mndTransDrop(pTrans);
  TAOS_RETURN(code);
}
568
#endif
569
/*
 * Handler for TDMT_MND_DROP_RSMA.
 *
 * Only does work when TD_ENTERPRISE is defined; otherwise it returns 0.
 * Enterprise flow: decode the request, acquire the RSMA (a missing RSMA is
 * treated as success when igNotExists is set), validate its db name, acquire
 * the db and the requesting user, check the drop privilege, start the drop
 * transaction and, when tsAuditLevel >= AUDIT_LEVEL_DATABASE, write an audit
 * record.
 *
 * Returns TSDB_CODE_ACTION_IN_PROGRESS once the transaction has been started
 * successfully, 0 for the ignored not-exist case, or an error code.
 */
static int32_t mndProcessDropRsmaReq(SRpcMsg *pReq) {
  SMnode *pMnode = pReq->info.node;
  int32_t code = 0, lino = 0;
#ifdef TD_ENTERPRISE
  SMDropRsmaReq dropReq = {0};
  SRsmaObj     *pObj = NULL;
  SDbObj       *pDb = NULL;
  SUserObj     *pUser = NULL;
  int64_t       tss = taosGetTimestampMs();

  TAOS_CHECK_GOTO(tDeserializeSMDropRsmaReq(pReq->pCont, pReq->contLen, &dropReq), NULL, _exit);

  mInfo("rsma:%s, start to drop", dropReq.name);

  pObj = mndAcquireRsma(pMnode, dropReq.name);
  if (pObj == NULL) {
    if (dropReq.igNotExists) {
      code = 0;  // IF EXISTS semantics: nothing to drop is not an error
    } else {
      code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    }
    goto _exit;
  }

  // The parsed name is not used afterwards; the call validates dbFName.
  SName dbName = {0};
  TAOS_CHECK_EXIT(tNameFromString(&dbName, pObj->dbFName, T_NAME_ACCT | T_NAME_DB));

  pDb = mndAcquireDb(pMnode, pObj->dbFName);
  if (pDb == NULL) {
    TAOS_CHECK_EXIT(TSDB_CODE_MND_DB_NOT_EXIST);
  }

  TAOS_CHECK_EXIT(mndAcquireUser(pMnode, RPC_MSG_USER(pReq), &pUser));

  TAOS_CHECK_EXIT(
      mndCheckObjPrivilegeRecF(pMnode, pUser, PRIV_CM_DROP, PRIV_OBJ_RSMA, pObj->ownerId, pObj->dbFName, pObj->name));

  code = mndDropRsma(pMnode, pReq, pDb, pObj);
  if (code == TSDB_CODE_SUCCESS) {
    code = TSDB_CODE_ACTION_IN_PROGRESS;
  }

  if (tsAuditLevel >= AUDIT_LEVEL_DATABASE) {
    int64_t tse = taosGetTimestampMs();
    double  duration = (double)(tse - tss) / 1000;  // elapsed ms divided by 1000
    auditRecord(pReq, pMnode->clusterId, "dropRsma", dropReq.name, "", "", 0, duration, 0);
  }

_exit:
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("rsma:%s, failed at line %d to drop since %s", dropReq.name, lino, tstrerror(code));
  }

  // Each handle may still be NULL here if the flow bailed out early.
  mndReleaseDb(pMnode, pDb);
  mndReleaseRsma(pMnode, pObj);
  mndReleaseUser(pMnode, pUser);
#endif
  TAOS_RETURN(code);
}
627
#ifdef TD_ENTERPRISE
628
static int32_t mndCreateRsma(SMnode *pMnode, SRpcMsg *pReq, SUserObj *pUser, SDbObj *pDb, SStbObj *pStb,
10,770✔
629
                             SMCreateRsmaReq *pCreate) {
630
  int32_t  code = 0, lino = 0;
10,770✔
631
  SRsmaObj obj = {0};
10,770✔
632
  STrans  *pTrans = NULL;
10,770✔
633

634
  (void)snprintf(obj.name, TSDB_TABLE_NAME_LEN, "%s", pCreate->name);
10,770✔
635
  (void)snprintf(obj.dbFName, TSDB_DB_FNAME_LEN, "%s", pDb->name);
10,770✔
636

637
  const char *tbName = strrchr(pCreate->tbFName, '.');
10,770✔
638
  (void)snprintf(obj.tbName, TSDB_TABLE_NAME_LEN, "%s", tbName ? tbName + 1 : pCreate->tbFName);
10,770✔
639
  (void)snprintf(obj.createUser, TSDB_USER_LEN, "%s", pUser->user);
10,770✔
640
  obj.ownerId = pUser->uid;
10,770✔
641
  obj.createdTime = taosGetTimestampMs();
10,770✔
642
  obj.updateTime = obj.createdTime;
10,770✔
643
  obj.uid = mndGenerateUid(obj.name, strlen(obj.name));
10,770✔
644
  obj.tbUid = pCreate->tbUid;
10,770✔
645
  obj.dbUid = pDb->uid;
10,770✔
646
  obj.interval[0] = pCreate->interval[0];
10,770✔
647
  obj.interval[1] = pCreate->interval[1];
10,770✔
648
  obj.version = 1;
10,770✔
649
  obj.tbType = pCreate->tbType;  // ETableType: 1 stable. Only super table supported currently.
10,770✔
650
  obj.intervalUnit = pCreate->intervalUnit;
10,770✔
651
  obj.nFuncs = pCreate->nFuncs;
10,770✔
652
  if (obj.nFuncs > 0) {
10,770✔
653
    TSDB_CHECK_NULL((obj.funcColIds = taosMemoryCalloc(obj.nFuncs, sizeof(col_id_t))), code, lino, _exit, terrno);
10,770✔
654
    TSDB_CHECK_NULL((obj.funcIds = taosMemoryCalloc(obj.nFuncs, sizeof(func_id_t))), code, lino, _exit, terrno);
10,770✔
655
    for (int16_t i = 0; i < obj.nFuncs; ++i) {
70,364✔
656
      obj.funcColIds[i] = pCreate->funcColIds[i];
59,594✔
657
      obj.funcIds[i] = pCreate->funcIds[i];
59,594✔
658
    }
659
  }
660

661
  TSDB_CHECK_NULL((pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, "create-rsma")),
10,770✔
662
                  code, lino, _exit, terrno);
663
  mInfo("trans:%d, used to create rsma %s on tb %s.%s", pTrans->id, obj.name, obj.dbFName, obj.tbName);
10,770✔
664

665
  mndTransSetDbName(pTrans, obj.dbFName, obj.name);
10,770✔
666
  mndTransSetKillMode(pTrans, TRN_KILL_MODE_SKIP);
10,770✔
667
  TAOS_CHECK_EXIT(mndTransCheckConflict(pMnode, pTrans));
10,770✔
668

669
  mndTransSetOper(pTrans, MND_OPER_CREATE_RSMA);
10,770✔
670
  TAOS_CHECK_EXIT(mndSetCreateRsmaPrepareActions(pMnode, pTrans, &obj));
10,770✔
671
  TAOS_CHECK_EXIT(mndSetCreateRsmaRedoActions(pMnode, pTrans, pDb, pStb, &obj, pCreate));
10,770✔
672
  TAOS_CHECK_EXIT(mndSetCreateRsmaCommitLogs(pMnode, pTrans, &obj));
10,770✔
673
  TAOS_CHECK_EXIT(mndTransPrepare(pMnode, pTrans));
10,770✔
674
_exit:
10,770✔
675
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
10,770✔
676
    mError("rsma:%s, failed at line %d to create rsma, since %s", obj.name, lino, tstrerror(code));
×
677
  }
678
  mndTransDrop(pTrans);
10,770✔
679
  mndRsmaFreeObj(&obj);
10,770✔
680
  TAOS_RETURN(code);
10,770✔
681
}
682

683
/*
 * Validates the fields of a create-RSMA request.
 *
 * Returns 0 when the request is acceptable. Returns
 * TSDB_CODE_MND_INVALID_RSMA_OPTION when a field check fails: empty name or
 * tbFName, igExists outside {0,1}, negative intervalUnit or interval, both
 * intervals zero, or an empty table part in tbFName. Returns the negative
 * code from tNameFromString() when tbFName cannot be parsed.
 */
static int32_t mndCheckCreateRsmaReq(SMCreateRsmaReq *pCreate) {
  int32_t code = TSDB_CODE_MND_INVALID_RSMA_OPTION;
  if (pCreate->name[0] == 0) goto _exit;
  if (pCreate->tbFName[0] == 0) goto _exit;
  if (pCreate->igExists < 0 || pCreate->igExists > 1) goto _exit;
  if (pCreate->intervalUnit < 0) goto _exit;
  if (pCreate->interval[0] < 0) goto _exit;
  if (pCreate->interval[1] < 0) goto _exit;
  if (pCreate->interval[0] == 0 && pCreate->interval[1] == 0) goto _exit;

  SName fname = {0};
  if ((code = tNameFromString(&fname, pCreate->tbFName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE)) < 0) goto _exit;
  if (*(char *)tNameGetTableName(&fname) == 0) {
    // code holds the parse result (success) at this point; restore the
    // rejection code so an empty table name is not reported as valid.
    code = TSDB_CODE_MND_INVALID_RSMA_OPTION;
    goto _exit;
  }
  code = 0;
_exit:
  TAOS_RETURN(code);
}
700

701
/*
 * Scans every SDB_RSMA row for an RSMA already defined on the same table
 * (same tbUid within the same database uid) as the create request.
 *
 * Returns TSDB_CODE_MND_RSMA_EXIST_IN_TABLE on the first match, 0 otherwise.
 */
static int32_t mndCheckRsmaConflicts(SMnode *pMnode, SDbObj *pDbObj, SMCreateRsmaReq *pCreate) {
  void     *pIter = NULL;
  SSdb     *pSdb = pMnode->pSdb;
  SRsmaObj *pObj = NULL;
  while ((pIter = sdbFetch(pSdb, SDB_RSMA, pIter, (void **)&pObj))) {
    if (pObj->tbUid == pCreate->tbUid && pObj->dbUid == pDbObj->uid) {
      // Log while the reference is still held; the fields of pObj must not be
      // read after sdbRelease().
      mError("rsma:%s, conflict with existing rsma %s on same table %s.%s:%" PRIi64, pCreate->name, pObj->name,
             pObj->dbFName, pObj->tbName, pObj->tbUid);
      sdbCancelFetch(pSdb, (pIter));
      sdbRelease(pSdb, pObj);
      return TSDB_CODE_MND_RSMA_EXIST_IN_TABLE;
    }
    sdbRelease(pSdb, pObj);
  }
  return 0;
}
717
#endif
718
/**
 * Message handler for a CREATE RSMA request.
 *
 * Enterprise builds only: deserializes and validates the request, resolves
 * the rsma / db / user / stable objects, checks the USE_DB privilege and
 * table-level conflicts, then starts the create transaction through
 * mndCreateRsma(). Every acquired object is released at _exit.
 *
 * @param pReq  Incoming rpc message.
 * @return TSDB_CODE_ACTION_IN_PROGRESS when the transaction was started,
 *         0 when the rsma already exists and igExists is set, otherwise an
 *         error code. Without TD_ENTERPRISE the function returns 0.
 */
static int32_t mndProcessCreateRsmaReq(SRpcMsg *pReq) {
  int32_t code = 0, lino = 0;
#ifdef TD_ENTERPRISE
  SMnode         *pMnode = pReq->info.node;
  SDbObj         *pDb = NULL;
  SStbObj        *pStb = NULL;
  SRsmaObj       *pSma = NULL;
  SUserObj       *pUser = NULL;
  int64_t         mTraceId = TRACE_GET_ROOTID(&pReq->info.traceId);
  SMCreateRsmaReq createReq = {0};
  int64_t         tss = taosGetTimestampMs();

  TAOS_CHECK_EXIT(tDeserializeSMCreateRsmaReq(pReq->pCont, pReq->contLen, &createReq));

  mInfo("start to create rsma: %s", createReq.name);
  TAOS_CHECK_EXIT(mndCheckCreateRsmaReq(&createReq));

  pSma = mndAcquireRsma(pMnode, createReq.name);
  if (pSma != NULL) {
    if (!createReq.igExists) {
      TAOS_CHECK_EXIT(TSDB_CODE_RSMA_ALREADY_EXISTS);
    } else {
      mInfo("rsma:%s, already exist, ignore exist is set", createReq.name);
      code = 0;
      goto _exit;
    }
  } else if ((code = terrno) != TSDB_CODE_RSMA_NOT_EXIST) {
    // TSDB_CODE_MND_RSMA_IN_CREATING | TSDB_CODE_MND_RSMA_IN_DROPPING | TSDB_CODE_APP_ERROR
    goto _exit;
  }

  // derive the full database name from the full table name
  SName name = {0};
  TAOS_CHECK_EXIT(tNameFromString(&name, createReq.tbFName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE));
  char db[TSDB_TABLE_FNAME_LEN] = {0};
  (void)tNameGetFullDbName(&name, db);

  pDb = mndAcquireDb(pMnode, db);
  if (!pDb) {
    TAOS_CHECK_EXIT(TSDB_CODE_MND_DB_NOT_SELECTED);
  }

  TAOS_CHECK_EXIT(mndAcquireUser(pMnode, RPC_MSG_USER(pReq), &pUser));

  // already check select table/insert table/create rsma privileges in parser
  TAOS_CHECK_EXIT(mndCheckDbPrivilege(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_USE_DB, pDb));

  pStb = mndAcquireStb(pMnode, createReq.tbFName);
  if (!pStb) {
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STB_NOT_EXIST);
  }

  TAOS_CHECK_EXIT(mndCheckRsmaConflicts(pMnode, pDb, &createReq));

  TAOS_CHECK_EXIT(mndCreateRsma(pMnode, pReq, pUser, pDb, pStb, &createReq));

  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;

  if (tsAuditLevel >= AUDIT_LEVEL_DATABASE) {
    int64_t tse = taosGetTimestampMs();
    // elapsed time in seconds
    double duration = (double)(tse - tss) / 1000;
    auditRecord(pReq, pMnode->clusterId, "createRsma", createReq.name, createReq.tbFName, "", 0, duration, 0);
  }
_exit:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("rsma:%s, failed at line %d to create since %s", createReq.name, lino, tstrerror(code));
  }
  if (pSma) mndReleaseRsma(pMnode, pSma);
  if (pStb) mndReleaseStb(pMnode, pStb);
  if (pDb) mndReleaseDb(pMnode, pDb);
  if (pUser) mndReleaseUser(pMnode, pUser);
  tFreeSMCreateRsmaReq(&createReq);
#endif
  TAOS_RETURN(code);
}
798

799
#ifdef TD_ENTERPRISE
800
/**
 * Validate the fields of an ALTER RSMA request.
 *
 * @param pReq  Deserialized alter request.
 * @return 0 when the name is non-empty and igNotExists is 0 or 1, otherwise
 *         TSDB_CODE_MND_INVALID_RSMA_OPTION.
 */
static int32_t mndCheckAlterRsmaReq(SMAlterRsmaReq *pReq) {
  bool nameGiven = (pReq->name[0] != 0);
  bool flagInRange = (pReq->igNotExists >= 0 && pReq->igNotExists <= 1);

  int32_t code = (nameGiven && flagInRange) ? 0 : TSDB_CODE_MND_INVALID_RSMA_OPTION;
  TAOS_RETURN(code);
}
809

810
/**
 * Encode the altered rsma object and append it to the transaction's prepare
 * log with status SDB_STATUS_READY.
 *
 * @param pMnode  Not referenced by the body.
 * @param pTrans  Transaction receiving the prepare log.
 * @param pSma    Rsma object to encode.
 * @return 0 on success, -1 when encoding, appending or the status update fails.
 */
static int32_t mndSetAlterRsmaPrepareActions(SMnode *pMnode, STrans *pTrans, SRsmaObj *pSma) {
  SSdbRaw *pRaw = mndRsmaActionEncode(pSma);
  if (!pRaw) {
    return -1;
  }

  // NOTE(review): if the append fails the raw record is not released here;
  // confirm whether mndTransAppendPrepareLog() takes ownership on failure.
  if (mndTransAppendPrepareLog(pTrans, pRaw) != 0 || sdbSetRawStatus(pRaw, SDB_STATUS_READY) != 0) {
    return -1;
  }
  return 0;
}
818

819
/**
 * Append the commit log for an altered rsma. Alter reuses the create path:
 * the call is forwarded unchanged to mndSetCreateRsmaCommitLogs().
 *
 * @return whatever mndSetCreateRsmaCommitLogs() returns.
 */
static int32_t mndSetAlterRsmaCommitLogs(SMnode *pMnode, STrans *pTrans, SRsmaObj *pSma) {
  int32_t code = mndSetCreateRsmaCommitLogs(pMnode, pTrans, pSma);
  return code;
}
822

823
static void *mndBuildVAlterRsmaReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, SRsmaObj *pObj,
11,488✔
824
                                   SMAlterRsmaReq *pAlter, int32_t *pContLen) {
825
  int32_t        code = 0, lino = 0;
11,488✔
826
  SMsgHead      *pHead = NULL;
11,488✔
827
  SVAlterRsmaReq req = {0};
11,488✔
828
  req.alterType = pAlter->alterType;
11,488✔
829
  (void)snprintf(req.name, sizeof(req.name), "%s", pObj->name);
11,488✔
830
  (void)snprintf(req.tbName, sizeof(req.tbName), "%s", pObj->tbName);
11,488✔
831
  req.tbType = pObj->tbType;
11,488✔
832
  req.intervalUnit = pObj->intervalUnit;
11,488✔
833
  req.interval[0] = pObj->interval[0];
11,488✔
834
  req.interval[1] = pObj->interval[1];
11,488✔
835
  req.tbUid = pObj->tbUid;
11,488✔
836
  req.uid = pObj->uid;
11,488✔
837
  req.nFuncs = pObj->nFuncs;
11,488✔
838
  req.funcColIds = pObj->funcColIds;
11,488✔
839
  req.funcIds = pObj->funcIds;
11,488✔
840

841
  int32_t contLen = tSerializeSVAlterRsmaReq(NULL, 0, &req);
11,488✔
842
  TAOS_CHECK_EXIT(contLen);
11,488✔
843
  contLen += sizeof(SMsgHead);
11,488✔
844
  TSDB_CHECK_NULL((pHead = taosMemoryMalloc(contLen)), code, lino, _exit, terrno);
11,488✔
845
  pHead->contLen = htonl(contLen);
11,488✔
846
  pHead->vgId = htonl(pVgroup->vgId);
11,488✔
847
  void *pBuf = POINTER_SHIFT(pHead, sizeof(SMsgHead));
11,488✔
848
  TAOS_CHECK_EXIT(tSerializeSVAlterRsmaReq(pBuf, contLen, &req));
11,488✔
849
_exit:
11,488✔
850
  if (code < 0) {
11,488✔
851
    taosMemoryFreeClear(pHead);
×
852
    terrno = code;
×
853
    *pContLen = 0;
×
UNCOV
854
    return NULL;
×
855
  }
856
  *pContLen = contLen;
11,488✔
857
  return pHead;
11,488✔
858
}
859

860
static int32_t mndSetAlterRsmaRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb, SRsmaObj *pObj,
5,744✔
861
                                          SMAlterRsmaReq *pAlter) {
862
  int32_t code = 0;
5,744✔
863
  SSdb   *pSdb = pMnode->pSdb;
5,744✔
864
  SVgObj *pVgroup = NULL;
5,744✔
865
  void   *pIter = NULL;
5,744✔
866

867
  while ((pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup))) {
28,720✔
868
    if (!mndVgroupInDb(pVgroup, pDb->uid)) {
22,976✔
869
      sdbRelease(pSdb, pVgroup);
11,488✔
870
      continue;
11,488✔
871
    }
872

873
    int32_t contLen = 0;
11,488✔
874
    void   *pReq = mndBuildVAlterRsmaReq(pMnode, pVgroup, pStb, pObj, pAlter, &contLen);
11,488✔
875
    if (pReq == NULL) {
11,488✔
876
      sdbCancelFetch(pSdb, pIter);
×
877
      sdbRelease(pSdb, pVgroup);
×
878
      code = terrno ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
×
UNCOV
879
      TAOS_RETURN(code);
×
880
    }
881

882
    STransAction action = {0};
11,488✔
883
    action.mTraceId = pTrans->mTraceId;
11,488✔
884
    action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
11,488✔
885
    action.pCont = pReq;
11,488✔
886
    action.contLen = contLen;
11,488✔
887
    action.msgType = TDMT_VND_ALTER_RSMA;
11,488✔
888
    if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
11,488✔
889
      taosMemoryFree(pReq);
×
890
      sdbCancelFetch(pSdb, pIter);
×
891
      sdbRelease(pSdb, pVgroup);
×
UNCOV
892
      TAOS_RETURN(code);
×
893
    }
894
    sdbRelease(pSdb, pVgroup);
11,488✔
895
  }
896

897
  TAOS_RETURN(code);
5,744✔
898
}
899

900
static int32_t mndAlterRsma(SMnode *pMnode, SRpcMsg *pReq, SUserObj *pUser, SDbObj *pDb, SStbObj *pStb,
5,744✔
901
                            SMAlterRsmaReq *pAlter, SRsmaObj *pOld) {
902
  int32_t  code = 0, lino = 0;
5,744✔
903
  STrans  *pTrans = NULL;
5,744✔
904
  SRsmaObj obj = *pOld;
5,744✔
905

906
  obj.updateTime = taosGetTimestampMs();
5,744✔
907
  ++obj.version;
5,744✔
908
  if (pAlter->alterType == TSDB_ALTER_RSMA_FUNCTION) {
5,744✔
909
    obj.nFuncs = pOld->nFuncs + pAlter->nFuncs;
5,744✔
910
    obj.funcColIds = taosMemoryMalloc(obj.nFuncs * sizeof(col_id_t));
5,744✔
911
    obj.funcIds = taosMemoryMalloc(obj.nFuncs * sizeof(func_id_t));
5,744✔
912
    if (obj.funcColIds == NULL || obj.funcIds == NULL) {
5,744✔
UNCOV
913
      TAOS_CHECK_EXIT(terrno);
×
914
    }
915
    int32_t n = 0, i = 0, j = 0;
5,744✔
916
    while (i < pOld->nFuncs && j < pAlter->nFuncs) {
30,874✔
917
      if (pOld->funcColIds[i] < pAlter->funcColIds[j]) {
25,130✔
918
        obj.funcColIds[n] = pOld->funcColIds[i];
21,540✔
919
        obj.funcIds[n++] = pOld->funcIds[i++];
21,540✔
920
      } else if (pOld->funcColIds[i] > pAlter->funcColIds[j]) {
3,590✔
921
        obj.funcColIds[n] = pAlter->funcColIds[j];
3,590✔
922
        obj.funcIds[n++] = pAlter->funcIds[j++];
3,590✔
923
      } else {
924
        mError("rsma:%s, conflict function on column id:%d", pOld->name, pAlter->funcColIds[j]);
×
UNCOV
925
        TAOS_CHECK_EXIT(TSDB_CODE_MND_RSMA_FUNC_CONFLICT);
×
926
      }
927
    }
928
    if (i < pOld->nFuncs) {
5,744✔
929
      while (i < pOld->nFuncs) {
7,180✔
930
        obj.funcColIds[n] = pOld->funcColIds[i];
3,590✔
931
        obj.funcIds[n++] = pOld->funcIds[i++];
3,590✔
932
      }
933
    } else if (j < pAlter->nFuncs) {
2,154✔
934
      while (j < pAlter->nFuncs) {
4,308✔
935
        obj.funcColIds[n] = pAlter->funcColIds[j];
2,154✔
936
        obj.funcIds[n++] = pAlter->funcIds[j++];
2,154✔
937
      }
938
    }
939
  } else {
UNCOV
940
    TAOS_CHECK_EXIT(TSDB_CODE_OPS_NOT_SUPPORT);
×
941
  }
942

943
  TSDB_CHECK_NULL((pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, "alter-rsma")), code,
5,744✔
944
                  lino, _exit, terrno);
945
  mInfo("trans:%d, used to alter rsma %s on tb %s.%s", pTrans->id, obj.name, obj.dbFName, obj.tbName);
5,744✔
946

947
  mndTransSetDbName(pTrans, obj.dbFName, obj.name);
5,744✔
948
  mndTransSetKillMode(pTrans, TRN_KILL_MODE_SKIP);
5,744✔
949
  TAOS_CHECK_EXIT(mndTransCheckConflict(pMnode, pTrans));
5,744✔
950

951
  mndTransSetOper(pTrans, MND_OPER_ALTER_RSMA);
5,744✔
952
  TAOS_CHECK_EXIT(mndSetAlterRsmaPrepareActions(pMnode, pTrans, &obj));
5,744✔
953
  TAOS_CHECK_EXIT(mndSetAlterRsmaCommitLogs(pMnode, pTrans, &obj));
5,744✔
954
  TAOS_CHECK_EXIT(mndSetAlterRsmaRedoActions(pMnode, pTrans, pDb, pStb, &obj, pAlter));
5,744✔
955

956
  TAOS_CHECK_EXIT(mndTransPrepare(pMnode, pTrans));
5,744✔
957
_exit:
5,744✔
958
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
5,744✔
UNCOV
959
    mError("rsma:%s, failed at line %d to alter rsma, since %s", obj.name, lino, tstrerror(code));
×
960
  }
961
  mndTransDrop(pTrans);
5,744✔
962
  mndRsmaFreeObj(&obj);
5,744✔
963
  TAOS_RETURN(code);
5,744✔
964
}
965
#endif
966
/**
 * Message handler for an ALTER RSMA request.
 *
 * Enterprise builds only: deserializes and validates the request, resolves
 * the rsma / db / user / stable objects, checks the ALTER privilege on the
 * rsma and starts the alter transaction through mndAlterRsma(). Every
 * acquired object is released at _exit.
 *
 * @param pReq  Incoming rpc message.
 * @return TSDB_CODE_ACTION_IN_PROGRESS when the transaction was started,
 *         0 when the rsma cannot be acquired and igNotExists is set,
 *         otherwise an error code. Without TD_ENTERPRISE the function
 *         returns 0.
 */
static int32_t mndProcessAlterRsmaReq(SRpcMsg *pReq) {
  int32_t code = 0, lino = 0;
#ifdef TD_ENTERPRISE
  SMnode        *pMnode = pReq->info.node;
  SDbObj        *pDb = NULL;
  SStbObj       *pStb = NULL;
  SRsmaObj      *pObj = NULL;
  SUserObj      *pUser = NULL;
  int64_t        mTraceId = TRACE_GET_ROOTID(&pReq->info.traceId);
  SMAlterRsmaReq req = {0};
  char           tbFName[TSDB_TABLE_FNAME_LEN] = {0};
  int64_t        tss = taosGetTimestampMs();

  TAOS_CHECK_EXIT(tDeserializeSMAlterRsmaReq(pReq->pCont, pReq->contLen, &req));

  mInfo("start to alter rsma: %s", req.name);
  TAOS_CHECK_EXIT(mndCheckAlterRsmaReq(&req));

  pObj = mndAcquireRsma(pMnode, req.name);
  if (pObj == NULL) {
    if (req.igNotExists) {
      code = 0;
    } else {
      code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    }
    goto _exit;
  }

  pDb = mndAcquireDb(pMnode, pObj->dbFName);
  if (pDb == NULL) {
    TAOS_CHECK_EXIT(TSDB_CODE_MND_DB_NOT_EXIST);
  }

  TAOS_CHECK_EXIT(mndAcquireUser(pMnode, RPC_MSG_USER(pReq), &pUser));

  TAOS_CHECK_EXIT(
      mndCheckObjPrivilegeRecF(pMnode, pUser, PRIV_CM_ALTER, PRIV_OBJ_RSMA, pObj->ownerId, pObj->dbFName, pObj->name));

  // full table name is "<dbFName>.<tbName>"
  (void)snprintf(tbFName, sizeof(tbFName), "%s.%s", pObj->dbFName, pObj->tbName);

  pStb = mndAcquireStb(pMnode, tbFName);
  if (!pStb) {
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STB_NOT_EXIST);
  }

  TAOS_CHECK_EXIT(mndAlterRsma(pMnode, pReq, pUser, pDb, pStb, &req, pObj));

  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;

  if (tsAuditLevel >= AUDIT_LEVEL_DATABASE) {
    char alterType[32] = {0};
    (void)snprintf(alterType, sizeof(alterType), "alterType:%" PRIi8, req.alterType);
    int64_t tse = taosGetTimestampMs();
    // elapsed time in seconds
    double duration = (double)(tse - tss) / 1000;
    auditRecord(pReq, pMnode->clusterId, "alterRsma", req.name, tbFName, alterType, 0, duration, 0);
  }
_exit:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("rsma:%s, failed at line %d to alter since %s", req.name, lino, tstrerror(code));
  }
  if (pObj) mndReleaseRsma(pMnode, pObj);
  if (pStb) mndReleaseStb(pMnode, pStb);
  if (pDb) mndReleaseDb(pMnode, pDb);
  if (pUser) mndReleaseUser(pMnode, pUser);
  tFreeSMAlterRsmaReq(&req);
#endif
  TAOS_RETURN(code);
}
1035
#ifdef TD_ENTERPRISE
1036
/**
 * Fill an SRsmaInfoRsp from an rsma object.
 *
 * funcColIds / funcIds are shared with pObj (shallow copy, not to be freed
 * through the response). When withColName is set, colNames receives one
 * heap-allocated copy of the column name per function, resolved against
 * pStb->pColumns. The lookup never moves its column cursor backwards, so
 * both lists are presumably ordered by ascending column id (not verified
 * here).
 *
 * @return 0 on success, TSDB_CODE_MND_COLUMN_NOT_EXIST when a function's
 *         column is not found in the table, or terrno on allocation failure.
 *         On failure colNames may be partially filled.
 */
static int32_t mndFillRsmaInfo(SRsmaObj *pObj, SStbObj *pStb, SRsmaInfoRsp *pRsp, bool withColName) {
  int32_t code = 0, lino = 0;
  int16_t i = 0, j = 0;  // i: function cursor, j: table-column cursor

  pRsp->id = pObj->uid;
  pRsp->ownerId = pObj->ownerId;
  pRsp->version = pObj->version;
  pRsp->tbType = pObj->tbType;
  pRsp->intervalUnit = pObj->intervalUnit;
  pRsp->interval[0] = pObj->interval[0];
  pRsp->interval[1] = pObj->interval[1];
  pRsp->nFuncs = pObj->nFuncs;
  (void)snprintf(pRsp->name, sizeof(pRsp->name), "%s", pObj->name);
  (void)snprintf(pRsp->tbFName, sizeof(pRsp->tbFName), "%s.%s", pObj->dbFName, pObj->tbName);

  if (pRsp->nFuncs <= 0) goto _exit;

  pRsp->funcColIds = pObj->funcColIds;  // shallow copy, no need to free
  pRsp->funcIds = pObj->funcIds;        // shallow copy, no need to free
  if (!withColName) goto _exit;

  pRsp->colNames = taosArrayInit(pRsp->nFuncs, sizeof(char *));
  if (pRsp->colNames == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }
  pRsp->nColNames = pRsp->nFuncs;

  for (i = 0; i < pRsp->nFuncs; ++i) {
    // advance to the first table column whose id is not below the wanted one
    while (j < pStb->numOfColumns && pStb->pColumns[j].colId < pRsp->funcColIds[i]) {
      ++j;
    }
    bool matched = (j < pStb->numOfColumns) && (pStb->pColumns[j].colId == pRsp->funcColIds[i]);
    if (!matched) {
      TAOS_CHECK_EXIT(TSDB_CODE_MND_COLUMN_NOT_EXIST);
    } else {
      char *nameCopy = taosStrdup(pStb->pColumns[j].name);
      if (nameCopy == NULL) {
        TAOS_CHECK_EXIT(terrno);
      }
      if (!taosArrayPush(pRsp->colNames, &nameCopy)) {
        taosMemoryFree(nameCopy);
        TAOS_CHECK_EXIT(terrno);
      }
    }
  }

_exit:
  if (code != 0) {
    mError("rsma:%s, failed at line %d to get rsma info since %s", pObj->name, lino, tstrerror(code));
  }
  TAOS_RETURN(code);
}
1092
#endif
1093
/**
 * Message handler that returns the definition of one rsma.
 *
 * Enterprise builds only: looks up the rsma named in the request and its
 * table, fills an SRsmaInfoRsp via mndFillRsmaInfo() and serializes it into
 * an rpc buffer stored in pReq->info.rsp / rspLen.
 *
 * @param pReq  Incoming rpc message; receives the response on success.
 * @return 0 on success or an error code; TSDB_CODE_OPS_NOT_SUPPORT when built
 *         without TD_ENTERPRISE.
 */
static int32_t mndProcessGetRsmaReq(SRpcMsg *pReq) {
#ifdef TD_ENTERPRISE
  int32_t      code = 0, lino = 0;
  SMnode      *pMnode = pReq->info.node;
  SRsmaInfoReq req = {0};
  SRsmaInfoRsp rsp = {0};
  SRsmaObj    *pObj = NULL;
  SStbObj     *pStb = NULL;
  void        *pRsp = NULL;
  int32_t      contLen = 0;

  TAOS_CHECK_EXIT(tDeserializeRsmaInfoReq(pReq->pCont, pReq->contLen, &req));

  if (!(pObj = mndAcquireRsma(pMnode, req.name))) {
    // Always leave on a NULL object, even if terrno was left at 0; falling
    // through would dereference pObj below.
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    lino = __LINE__;
    goto _exit;
  }

  char tbFName[TSDB_TABLE_FNAME_LEN] = {0};
  (void)snprintf(tbFName, sizeof(tbFName), "%s.%s", pObj->dbFName, pObj->tbName);

  if ((pStb = mndAcquireStb(pMnode, tbFName)) == NULL) {
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STB_NOT_EXIST);
  }

  TAOS_CHECK_EXIT(mndFillRsmaInfo(pObj, pStb, &rsp, req.withColName));

  // first pass with a NULL buffer only computes the encoded size
  if ((contLen = tSerializeRsmaInfoRsp(NULL, 0, &rsp)) < 0) {
    TAOS_CHECK_EXIT(contLen);
  }
  if (!(pRsp = rpcMallocCont(contLen))) {
    // same guard as above: never serialize into a NULL buffer
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    lino = __LINE__;
    goto _exit;
  }
  if ((contLen = tSerializeRsmaInfoRsp(pRsp, contLen, &rsp)) < 0) {
    TAOS_CHECK_EXIT(contLen);
  }

  pReq->info.rsp = pRsp;
  pReq->info.rspLen = contLen;

_exit:
  if (code != 0) {
    rpcFreeCont(pRsp);
  }
  if (pObj) mndReleaseRsma(pMnode, pObj);
  if (pStb) mndReleaseStb(pMnode, pStb);
  tFreeRsmaInfoRsp(&rsp, false);
  TAOS_RETURN(code);
#else
  return TSDB_CODE_OPS_NOT_SUPPORT;
#endif
}
1144
#ifdef TD_ENTERPRISE
1145
/**
 * Render the function list of an rsma as a var-string "f1(col1),f2(col2)".
 *
 * buf is laid out as a VARSTR_HEADER_SIZE length header followed by the
 * text. The result is the empty string when the rsma has no functions or
 * its table cannot be acquired. Entries are appended until the next one no
 * longer fits; the trailing ',' is removed. Functions whose column id is not
 * found in the table schema are skipped. The lookup never moves its column
 * cursor backwards, so both lists are presumably ordered by ascending column
 * id (not verified here).
 *
 * @param pMnode  Mnode used to acquire the table.
 * @param pObj    Rsma object whose functions are listed.
 * @param buf     Output var-string buffer.
 * @param bufLen  Capacity the caller grants for the text; the header size is
 *                subtracted once more from it inside this function.
 */
static void mndRetrieveRsmaFuncList(SMnode *pMnode, SRsmaObj *pObj, char *buf, int32_t bufLen) {
  SStbObj *pStb = NULL;
  char    *qBuf = POINTER_SHIFT(buf, VARSTR_HEADER_SIZE);
  int32_t  qBufLen = bufLen - VARSTR_HEADER_SIZE;

  qBuf[0] = 0;
  varDataSetLen(buf, 0);  // initialize to empty string

  if (pObj->nFuncs <= 0) return;

  char tbFName[TSDB_TABLE_FNAME_LEN] = {0};
  (void)snprintf(tbFName, sizeof(tbFName), "%s.%s", pObj->dbFName, pObj->tbName);
  pStb = mndAcquireStb(pMnode, tbFName);
  if (pStb == NULL) {
    mWarn("rsma:%s, failed to acquire table %s for function list", pObj->name, tbFName);
    return;
  }

  SSchema *pColumns = pStb->pColumns;

  int32_t len = 0, j = 0;
  char    colFunc[TSDB_COL_NAME_LEN + TSDB_FUNC_NAME_LEN + 2] = {0};
  for (int32_t i = 0; i < pObj->nFuncs; ++i) {
    col_id_t colId = pObj->funcColIds[i];

    // advance to the first table column whose id is not below colId
    while (j < pStb->numOfColumns && pColumns[j].colId < colId) {
      ++j;
    }
    if (j >= pStb->numOfColumns || pColumns[j].colId != colId) {
      continue;  // column not present in the schema
    }

    int32_t colFuncLen =
        tsnprintf(colFunc, sizeof(colFunc), "%s(%s),", fmGetFuncName(pObj->funcIds[i]), pColumns[j].name);
    if ((qBufLen - len) <= colFuncLen) {
      break;  // no room left for this entry, keep what fits
    }
    len += tsnprintf(qBuf + len, colFuncLen + 1, "%s", colFunc);
  }

  qBuf[len > 0 ? len - 1 : 0] = 0;  // remove the last ','
  varDataSetLen(buf, len > 0 ? len - 1 : 0);
  mndReleaseStb(pMnode, pStb);
}
1193
#endif
1194
/**
 * Show-retrieve callback producing one result row per rsma.
 *
 * Enterprise builds only. All rows are produced on the first call (while
 * pShow->numOfRows < 1); later calls add nothing. Rsmas the user may not
 * see are skipped unless the user holds the system-level SHOW privilege.
 * Columns, in order: name, uid, db name (without the account prefix), table
 * name, table type, created time, interval(s) with unit, function list.
 *
 * @param pReq    Rpc message; identifies the mnode and the requesting user.
 * @param pShow   Show object whose numOfRows is advanced.
 * @param pBlock  Data block whose columns are filled.
 * @param rows    Not referenced by the body.
 * @return Number of rows written, or a negative error code.
 */
static int32_t mndRetrieveRsma(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode          *pMnode = pReq->info.node;
  int32_t          code = 0, lino = 0;
  int32_t          numOfRows = 0;
  int32_t          cols = 0;
  char             tmp[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE];
  int32_t          tmpLen = 0;
  int32_t          bufLen = 0;
  char            *pBuf = NULL;
  char            *qBuf = NULL;
  void            *pIter = NULL;
  SSdb            *pSdb = pMnode->pSdb;
  SColumnInfoData *pColInfo = NULL;
  SUserObj        *pUser = NULL;
  char             objFName[TSDB_OBJ_FNAME_LEN + 1] = {0};
  bool             showAll = false;

#ifdef TD_ENTERPRISE
  // pBuf is a var-string: length header first, text (qBuf) right after it
  pBuf = tmp;
  qBuf = POINTER_SHIFT(pBuf, VARSTR_HEADER_SIZE);
  bufLen = sizeof(tmp) - VARSTR_HEADER_SIZE;
  if (pShow->numOfRows < 1) {
    TAOS_CHECK_EXIT(mndAcquireUser(pMnode, (RPC_MSG_USER(pReq)), &pUser));
    (void)snprintf(objFName, sizeof(objFName), "%d.*", pUser->acctId);
    int32_t objLevel = privObjGetLevel(PRIV_OBJ_RSMA);
    showAll =
        (0 == mndCheckSysObjPrivilege(pMnode, pUser, RPC_MSG_TOKEN(pReq), PRIV_CM_SHOW, PRIV_OBJ_RSMA, 0, objFName,
                                      objLevel == 0 ? NULL : "*"));  // 1.*.*

    SRsmaObj *pObj = NULL;
    while ((pIter = sdbFetch(pSdb, SDB_RSMA, pIter, (void **)&pObj))) {
      // per-object privilege check, only needed without the global privilege
      if (!showAll && mndCheckObjPrivilegeRecF(pMnode, pUser, PRIV_CM_SHOW, PRIV_OBJ_RSMA, pObj->ownerId,
                                               pObj->dbFName, objLevel == 0 ? NULL : pObj->name)) {  // 1.db1.rsma1
        sdbRelease(pSdb, pObj);
        continue;
      }

      // rsma name
      cols = 0;
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols);
      TAOS_UNUSED(snprintf(qBuf, bufLen, "%s", pObj->name));
      varDataSetLen(pBuf, strlen(qBuf));
      COL_DATA_SET_VAL_GOTO(pBuf, false, pObj, pIter, _exit);

      // rsma uid
      pColInfo = taosArrayGet(pBlock->pDataBlock, ++cols);
      if (pColInfo != NULL) {
        COL_DATA_SET_VAL_GOTO((const char *)(&pObj->uid), false, pObj, pIter, _exit);
      }

      // database name: the part after the first '.', or the whole dbFName
      pColInfo = taosArrayGet(pBlock->pDataBlock, ++cols);
      if (pColInfo != NULL) {
        const char *dot = strchr(pObj->dbFName, '.');
        TAOS_UNUSED(snprintf(qBuf, bufLen, "%s", dot ? dot + 1 : pObj->dbFName));
        varDataSetLen(pBuf, strlen(qBuf));
        COL_DATA_SET_VAL_GOTO(pBuf, false, pObj, pIter, _exit);
      }

      // table name
      pColInfo = taosArrayGet(pBlock->pDataBlock, ++cols);
      if (pColInfo != NULL) {
        TAOS_UNUSED(snprintf(qBuf, bufLen, "%s", pObj->tbName));
        varDataSetLen(pBuf, strlen(qBuf));
        COL_DATA_SET_VAL_GOTO(pBuf, false, pObj, pIter, _exit);
      }

      // table type
      pColInfo = taosArrayGet(pBlock->pDataBlock, ++cols);
      if (pColInfo != NULL) {
        const char *typeText = (pObj->tbType == TSDB_SUPER_TABLE)    ? "SUPER_TABLE"
                               : (pObj->tbType == TSDB_NORMAL_TABLE) ? "NORMAL_TABLE"
                               : (pObj->tbType == TSDB_CHILD_TABLE)  ? "CHILD_TABLE"
                                                                     : "UNKNOWN";
        TAOS_UNUSED(snprintf(qBuf, bufLen, "%s", typeText));
        varDataSetLen(pBuf, strlen(qBuf));
        COL_DATA_SET_VAL_GOTO(pBuf, false, pObj, pIter, _exit);
      }

      // created time
      pColInfo = taosArrayGet(pBlock->pDataBlock, ++cols);
      if (pColInfo != NULL) {
        COL_DATA_SET_VAL_GOTO((const char *)&pObj->createdTime, false, pObj, pIter, _exit);
      }

      // interval(s): "<i0><unit>" plus ",<i1><unit>" when a second one is set
      pColInfo = taosArrayGet(pBlock->pDataBlock, ++cols);
      if (pColInfo != NULL) {
        TAOS_UNUSED(snprintf(qBuf, bufLen, "%" PRIi64 "%c", pObj->interval[0], pObj->intervalUnit));
        if (pObj->interval[1] > 0) {
          tmpLen = strlen(qBuf);
          TAOS_UNUSED(
              snprintf(qBuf + tmpLen, bufLen - tmpLen, ",%" PRIi64 "%c", pObj->interval[1], pObj->intervalUnit));
        }
        varDataSetLen(pBuf, strlen(qBuf));
        COL_DATA_SET_VAL_GOTO(pBuf, false, pObj, pIter, _exit);
      }

      // function list
      pColInfo = taosArrayGet(pBlock->pDataBlock, ++cols);
      if (pColInfo != NULL) {
        mndRetrieveRsmaFuncList(pMnode, pObj, pBuf, bufLen);
        COL_DATA_SET_VAL_GOTO(pBuf, false, pObj, pIter, _exit);
      }

      sdbRelease(pSdb, pObj);
      ++numOfRows;
    }
  }

  pShow->numOfRows += numOfRows;

_exit:
  if (pUser) mndReleaseUser(pMnode, pUser);
  if (code < 0) {
    mError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    TAOS_RETURN(code);
  }
#endif
  return numOfRows;
}
1311

1312
/**
 * Cancel an unfinished SDB_RSMA iteration started by a show-retrieve.
 *
 * @param pMnode  Mnode owning the sdb.
 * @param pIter   Iterator to cancel.
 */
static void mndCancelRetrieveRsma(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_RSMA);
}
×
1316

1317
/**
 * Append drop commit logs for every rsma whose dbUid equals pDb->uid.
 *
 * Enterprise builds only; otherwise this is a no-op returning 0.
 *
 * @param pMnode  Mnode whose sdb is scanned.
 * @param pTrans  Transaction receiving the commit logs.
 * @param pDb     Database being dropped.
 * @return 0 on success, or the first error from mndSetDropRsmaCommitLogs().
 */
int32_t mndDropRsmasByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
  int32_t code = 0;
#ifdef TD_ENTERPRISE
  SSdb     *pSdb = pMnode->pSdb;
  SRsmaObj *pObj = NULL;
  void     *pIter = NULL;

  while ((pIter = sdbFetch(pSdb, SDB_RSMA, pIter, (void **)&pObj)) != NULL) {
    if (pObj->dbUid != pDb->uid) {
      sdbRelease(pSdb, pObj);
      continue;
    }

    code = mndSetDropRsmaCommitLogs(pMnode, pTrans, pObj);
    if (code != 0) {
      // stop the scan and drop the row reference before reporting the error
      sdbCancelFetch(pSdb, pIter);
      sdbRelease(pSdb, pObj);
      TAOS_RETURN(code);
    }
    sdbRelease(pSdb, pObj);
  }
#endif
  TAOS_RETURN(code);
}
1337

1338
/**
 * Append drop commit logs for every rsma defined on a given table, matched
 * by (pStb->uid, pStb->dbUid).
 *
 * Enterprise builds only; otherwise this is a no-op returning 0.
 *
 * @param pMnode  Mnode whose sdb is scanned.
 * @param pTrans  Transaction receiving the commit logs.
 * @param pDb     Not referenced by the body.
 * @param pStb    Table being dropped.
 * @return 0 on success, or the first error from mndSetDropRsmaCommitLogs().
 */
int32_t mndDropRsmaByStb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
  int32_t code = 0;
#ifdef TD_ENTERPRISE
  SSdb     *pSdb = pMnode->pSdb;
  SRsmaObj *pObj = NULL;
  void     *pIter = NULL;

  while ((pIter = sdbFetch(pSdb, SDB_RSMA, pIter, (void **)&pObj)) != NULL) {
    if (pObj->tbUid != pStb->uid || pObj->dbUid != pStb->dbUid) {
      sdbRelease(pSdb, pObj);
      continue;
    }

    code = mndSetDropRsmaCommitLogs(pMnode, pTrans, pObj);
    if (code != 0) {
      // stop the scan and drop the row reference before reporting the error
      sdbCancelFetch(pSdb, pIter);
      sdbRelease(pSdb, pObj);
      TAOS_RETURN(code);
    }
    sdbRelease(pSdb, pObj);
  }
#endif
  TAOS_RETURN(code);
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc