• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4905

29 Dec 2025 02:08PM UTC coverage: 65.423% (-0.3%) from 65.734%
#4905

push

travis-ci

web-flow
enh: sign connect request (#34067)

23 of 29 new or added lines in 4 files covered. (79.31%)

11614 existing lines in 186 files now uncovered.

193476 of 295730 relevant lines covered (65.42%)

115752566.53 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.47
/source/dnode/mnode/impl/src/mndRsma.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "audit.h"
18
#include "functionMgt.h"
19
#include "mndDb.h"
20
#include "mndDnode.h"
21
#include "mndInfoSchema.h"
22
#include "mndMnode.h"
23
#include "mndPrivilege.h"
24
#include "mndShow.h"
25
#include "mndSma.h"
26
#include "mndStb.h"
27
#include "mndTrans.h"
28
#include "mndUser.h"
29
#include "mndVgroup.h"
30
#include "parser.h"
31
#include "tname.h"
32

33
#define MND_RSMA_VER_NUMBER   1
34
#define MND_RSMA_RESERVE_SIZE 64
35

36
static SSdbRaw *mndRsmaActionEncode(SRsmaObj *pSma);
37
static SSdbRow *mndRsmaActionDecode(SSdbRaw *pRaw);
38
static int32_t  mndRsmaActionInsert(SSdb *pSdb, SRsmaObj *pSma);
39
static int32_t  mndRsmaActionDelete(SSdb *pSdb, SRsmaObj *pSpSmatb);
40
static int32_t  mndRsmaActionUpdate(SSdb *pSdb, SRsmaObj *pOld, SRsmaObj *pNew);
41
static int32_t  mndProcessCreateRsmaReq(SRpcMsg *pReq);
42
static int32_t  mndProcessDropRsmaReq(SRpcMsg *pReq);
43
static int32_t  mndProcessAlterRsmaReq(SRpcMsg *pReq);
44
static int32_t  mndProcessGetRsmaReq(SRpcMsg *pReq);
45

46
static int32_t mndRetrieveRsma(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
47
static void    mndCancelRetrieveRsma(SMnode *pMnode, void *pIter);
48
static int32_t mndRetrieveRsmaTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
49
static void    mndCancelRetrieveRsmaTask(SMnode *pMnode, void *pIter);
50

51
int32_t mndInitRsma(SMnode *pMnode) {
385,813✔
52
  SSdbTable table = {
385,813✔
53
      .sdbType = SDB_RSMA,
54
      .keyType = SDB_KEY_BINARY,
55
      .encodeFp = (SdbEncodeFp)mndRsmaActionEncode,
56
      .decodeFp = (SdbDecodeFp)mndRsmaActionDecode,
57
      .insertFp = (SdbInsertFp)mndRsmaActionInsert,
58
      .updateFp = (SdbUpdateFp)mndRsmaActionUpdate,
59
      .deleteFp = (SdbDeleteFp)mndRsmaActionDelete,
60
  };
61

62
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_RSMA, mndProcessCreateRsmaReq);
385,813✔
63
  mndSetMsgHandle(pMnode, TDMT_VND_CREATE_RSMA_RSP, mndTransProcessRsp);
385,813✔
64
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_RSMA, mndProcessDropRsmaReq);
385,813✔
65
  mndSetMsgHandle(pMnode, TDMT_VND_DROP_RSMA_RSP, mndTransProcessRsp);
385,813✔
66
  mndSetMsgHandle(pMnode, TDMT_MND_ALTER_RSMA, mndProcessAlterRsmaReq);
385,813✔
67
  mndSetMsgHandle(pMnode, TDMT_VND_ALTER_RSMA_RSP, mndTransProcessRsp);
385,813✔
68
  mndSetMsgHandle(pMnode, TDMT_MND_GET_RSMA, mndProcessGetRsmaReq);
385,813✔
69
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_RSMA, mndRetrieveRsma);
385,813✔
70
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_RSMA, mndCancelRetrieveRsma);
385,813✔
71

72
  return sdbSetTable(pMnode->pSdb, table);
385,813✔
73
}
74

75
void mndCleanupRsma(SMnode *pMnode) {}
385,750✔
76

77
void mndRsmaFreeObj(SRsmaObj *pObj) {
60,544✔
78
  if (pObj) {
60,544✔
79
    taosMemoryFreeClear(pObj->funcColIds);
60,544✔
80
    taosMemoryFreeClear(pObj->funcIds);
60,544✔
81
  }
82
}
60,544✔
83

84
static int32_t tSerializeSRsmaObj(void *buf, int32_t bufLen, const SRsmaObj *pObj) {
91,520✔
85
  int32_t  code = 0, lino = 0;
91,520✔
86
  int32_t  tlen = 0;
91,520✔
87
  SEncoder encoder = {0};
91,520✔
88
  tEncoderInit(&encoder, buf, bufLen);
91,520✔
89

90
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
91,520✔
91

92
  TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pObj->name));
183,040✔
93
  TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pObj->tbName));
183,040✔
94
  TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pObj->dbFName));
183,040✔
95
  TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pObj->createUser));
183,040✔
96
  TAOS_CHECK_EXIT(tEncodeI64v(&encoder, pObj->createdTime));
183,040✔
97
  TAOS_CHECK_EXIT(tEncodeI64v(&encoder, pObj->updateTime));
183,040✔
98
  TAOS_CHECK_EXIT(tEncodeI64v(&encoder, pObj->uid));
183,040✔
99
  TAOS_CHECK_EXIT(tEncodeI64v(&encoder, pObj->tbUid));
183,040✔
100
  TAOS_CHECK_EXIT(tEncodeI64v(&encoder, pObj->dbUid));
183,040✔
101
  TAOS_CHECK_EXIT(tEncodeI64v(&encoder, pObj->interval[0]));
183,040✔
102
  TAOS_CHECK_EXIT(tEncodeI64v(&encoder, pObj->interval[1]));
183,040✔
103
  TAOS_CHECK_EXIT(tEncodeU64v(&encoder, pObj->reserved));
183,040✔
104
  TAOS_CHECK_EXIT(tEncodeI32v(&encoder, pObj->version));
183,040✔
105
  TAOS_CHECK_EXIT(tEncodeI8(&encoder, pObj->tbType));
183,040✔
106
  TAOS_CHECK_EXIT(tEncodeI8(&encoder, pObj->intervalUnit));
183,040✔
107
  TAOS_CHECK_EXIT(tEncodeI16v(&encoder, pObj->nFuncs));
183,040✔
108
  for (int16_t i = 0; i < pObj->nFuncs; ++i) {
608,256✔
109
    TAOS_CHECK_EXIT(tEncodeI16v(&encoder, pObj->funcColIds[i]));
1,033,472✔
110
    TAOS_CHECK_EXIT(tEncodeI32v(&encoder, pObj->funcIds[i]));
1,033,472✔
111
  }
112
  TAOS_CHECK_EXIT(tEncodeI64v(&encoder, pObj->ownerId));
183,040✔
113

114
  tEndEncode(&encoder);
91,520✔
115

116
  tlen = encoder.pos;
91,520✔
117
_exit:
91,520✔
118
  tEncoderClear(&encoder);
91,520✔
119
  if (code < 0) {
91,520✔
120
    mError("rsma, %s failed at line %d since %s", __func__, lino, tstrerror(code));
×
UNCOV
121
    TAOS_RETURN(code);
×
122
  }
123

124
  return tlen;
91,520✔
125
}
126

127
static int32_t tDeserializeSRsmaObj(void *buf, int32_t bufLen, SRsmaObj *pObj) {
44,352✔
128
  int32_t  code = 0, lino = 0;
44,352✔
129
  SDecoder decoder = {0};
44,352✔
130
  tDecoderInit(&decoder, buf, bufLen);
44,352✔
131

132
  TAOS_CHECK_EXIT(tStartDecode(&decoder));
44,352✔
133

134
  TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pObj->name));
44,352✔
135
  TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pObj->tbName));
44,352✔
136
  TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pObj->dbFName));
44,352✔
137
  TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pObj->createUser));
44,352✔
138
  TAOS_CHECK_EXIT(tDecodeI64v(&decoder, &pObj->createdTime));
88,704✔
139
  TAOS_CHECK_EXIT(tDecodeI64v(&decoder, &pObj->updateTime));
88,704✔
140
  TAOS_CHECK_EXIT(tDecodeI64v(&decoder, &pObj->uid));
88,704✔
141
  TAOS_CHECK_EXIT(tDecodeI64v(&decoder, &pObj->tbUid));
88,704✔
142
  TAOS_CHECK_EXIT(tDecodeI64v(&decoder, &pObj->dbUid));
88,704✔
143
  TAOS_CHECK_EXIT(tDecodeI64v(&decoder, &pObj->interval[0]));
88,704✔
144
  TAOS_CHECK_EXIT(tDecodeI64v(&decoder, &pObj->interval[1]));
88,704✔
145
  TAOS_CHECK_EXIT(tDecodeU64v(&decoder, &pObj->reserved));
88,704✔
146
  TAOS_CHECK_EXIT(tDecodeI32v(&decoder, &pObj->version));
88,704✔
147
  TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pObj->tbType));
88,704✔
148
  TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pObj->intervalUnit));
88,704✔
149
  TAOS_CHECK_EXIT(tDecodeI16v(&decoder, &pObj->nFuncs));
88,704✔
150
  if (pObj->nFuncs > 0) {
44,352✔
151
    if (!(pObj->funcColIds = taosMemoryMalloc(sizeof(col_id_t) * pObj->nFuncs))) {
44,352✔
UNCOV
152
      TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
×
153
    }
154
    if (!(pObj->funcIds = taosMemoryMalloc(sizeof(int32_t) * pObj->nFuncs))) {
44,352✔
UNCOV
155
      TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
×
156
    }
157
    for (int16_t i = 0; i < pObj->nFuncs; ++i) {
295,680✔
158
      TAOS_CHECK_EXIT(tDecodeI16v(&decoder, &pObj->funcColIds[i]));
502,656✔
159
      TAOS_CHECK_EXIT(tDecodeI32v(&decoder, &pObj->funcIds[i]));
502,656✔
160
    }
161
  }
162
  if (!tDecodeIsEnd(&decoder)) {
44,352✔
163
    TAOS_CHECK_EXIT(tDecodeI64v(&decoder, &pObj->ownerId));
88,704✔
164
  }
165

166
_exit:
44,352✔
167
  tEndDecode(&decoder);
44,352✔
168
  tDecoderClear(&decoder);
44,352✔
169
  if (code < 0) {
44,352✔
UNCOV
170
    mError("rsma, %s failed at line %d since %s, row:%p", __func__, lino, tstrerror(code), pObj);
×
171
  }
172
  TAOS_RETURN(code);
44,352✔
173
}
174

175
static SSdbRaw *mndRsmaActionEncode(SRsmaObj *pObj) {
45,760✔
176
  int32_t  code = 0, lino = 0;
45,760✔
177
  void    *buf = NULL;
45,760✔
178
  SSdbRaw *pRaw = NULL;
45,760✔
179
  int32_t  tlen = tSerializeSRsmaObj(NULL, 0, pObj);
45,760✔
180
  if (tlen < 0) {
45,760✔
UNCOV
181
    TAOS_CHECK_EXIT(tlen);
×
182
  }
183

184
  int32_t size = sizeof(int32_t) + tlen;
45,760✔
185
  pRaw = sdbAllocRaw(SDB_RSMA, MND_RSMA_VER_NUMBER, size);
45,760✔
186
  if (pRaw == NULL) {
45,760✔
UNCOV
187
    TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
×
188
  }
189

190
  buf = taosMemoryMalloc(tlen);
45,760✔
191
  if (buf == NULL) {
45,760✔
UNCOV
192
    TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
×
193
  }
194

195
  tlen = tSerializeSRsmaObj(buf, tlen, pObj);
45,760✔
196
  if (tlen < 0) {
45,760✔
UNCOV
197
    TAOS_CHECK_EXIT(tlen);
×
198
  }
199

200
  int32_t dataPos = 0;
45,760✔
201
  SDB_SET_INT32(pRaw, dataPos, tlen, _exit);
45,760✔
202
  SDB_SET_BINARY(pRaw, dataPos, buf, tlen, _exit);
45,760✔
203
  SDB_SET_DATALEN(pRaw, dataPos, _exit);
45,760✔
204

205
_exit:
45,760✔
206
  taosMemoryFreeClear(buf);
45,760✔
207
  if (code != TSDB_CODE_SUCCESS) {
45,760✔
UNCOV
208
    terrno = code;
×
UNCOV
209
    mError("rsma, failed at line %d to encode to raw:%p since %s", lino, pRaw, tstrerror(code));
×
UNCOV
210
    sdbFreeRaw(pRaw);
×
UNCOV
211
    return NULL;
×
212
  }
213

214
  mTrace("rsma, encode to raw:%p, row:%p", pRaw, pObj);
45,760✔
215
  return pRaw;
45,760✔
216
}
217

218
SSdbRow *mndRsmaActionDecode(SSdbRaw *pRaw) {
44,352✔
219
  int32_t   code = 0, lino = 0;
44,352✔
220
  SSdbRow  *pRow = NULL;
44,352✔
221
  SRsmaObj *pObj = NULL;
44,352✔
222
  void     *buf = NULL;
44,352✔
223

224
  int8_t sver = 0;
44,352✔
225
  TAOS_CHECK_EXIT(sdbGetRawSoftVer(pRaw, &sver));
44,352✔
226

227
  if (sver != MND_RSMA_VER_NUMBER) {
44,352✔
UNCOV
228
    mError("rsma read invalid ver, data ver: %d, curr ver: %d", sver, MND_RSMA_VER_NUMBER);
×
229
    TAOS_CHECK_EXIT(TSDB_CODE_SDB_INVALID_DATA_VER);
×
230
  }
231

232
  if (!(pRow = sdbAllocRow(sizeof(SRsmaObj)))) {
44,352✔
233
    TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
×
234
  }
235

236
  if (!(pObj = sdbGetRowObj(pRow))) {
44,352✔
UNCOV
237
    TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
×
238
  }
239

240
  int32_t tlen;
44,352✔
241
  int32_t dataPos = 0;
44,352✔
242
  SDB_GET_INT32(pRaw, dataPos, &tlen, _exit);
44,352✔
243
  buf = taosMemoryMalloc(tlen + 1);
44,352✔
244
  if (buf == NULL) {
44,352✔
UNCOV
245
    TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
×
246
  }
247
  SDB_GET_BINARY(pRaw, dataPos, buf, tlen, _exit);
44,352✔
248

249
  TAOS_CHECK_EXIT(tDeserializeSRsmaObj(buf, tlen, pObj));
44,352✔
250

251
  taosInitRWLatch(&pObj->lock);
44,352✔
252

253
_exit:
44,352✔
254
  taosMemoryFreeClear(buf);
44,352✔
255
  if (code != TSDB_CODE_SUCCESS) {
44,352✔
256
    terrno = code;
×
UNCOV
257
    mError("rsma, failed at line %d to decode from raw:%p since %s", lino, pRaw, tstrerror(code));
×
UNCOV
258
    mndRsmaFreeObj(pObj);
×
UNCOV
259
    taosMemoryFreeClear(pRow);
×
UNCOV
260
    return NULL;
×
261
  }
262
  mTrace("rsma, decode from raw:%p, row:%p", pRaw, pObj);
44,352✔
263
  return pRow;
44,352✔
264
}
265

266
static int32_t mndRsmaActionInsert(SSdb *pSdb, SRsmaObj *pObj) {
10,560✔
267
  mTrace("rsma:%s, perform insert action, row:%p", pObj->name, pObj);
10,560✔
268
  return 0;
10,560✔
269
}
270

271
static int32_t mndRsmaActionDelete(SSdb *pSdb, SRsmaObj *pObj) {
44,352✔
272
  mTrace("rsma:%s, perform delete action, row:%p", pObj->name, pObj);
44,352✔
273
  mndRsmaFreeObj(pObj);
44,352✔
274
  return 0;
44,352✔
275
}
276

277
static int32_t mndRsmaActionUpdate(SSdb *pSdb, SRsmaObj *pOld, SRsmaObj *pNew) {
23,936✔
278
  mTrace("rsma:%s, perform update action, old row:%p new row:%p", pOld->name, pOld, pNew);
23,936✔
279
  taosWLockLatch(&pOld->lock);
23,936✔
280
  pOld->updateTime = pNew->updateTime;
23,936✔
281
  pOld->nFuncs = pNew->nFuncs;
23,936✔
282
  pOld->ownerId = pNew->ownerId;
23,936✔
283
  TSWAP(pOld->funcColIds, pNew->funcColIds);
23,936✔
284
  TSWAP(pOld->funcIds, pNew->funcIds);
23,936✔
285
  taosWUnLockLatch(&pOld->lock);
23,936✔
286
  return 0;
23,936✔
287
}
288

289
SRsmaObj *mndAcquireRsma(SMnode *pMnode, char *name) {
46,464✔
290
  SSdb     *pSdb = pMnode->pSdb;
46,464✔
291
  SRsmaObj *pObj = sdbAcquire(pSdb, SDB_RSMA, name);
46,464✔
292
  if (pObj == NULL) {
46,464✔
293
    if (terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
19,008✔
294
      terrno = TSDB_CODE_RSMA_NOT_EXIST;
19,008✔
295
    } else if (terrno == TSDB_CODE_SDB_OBJ_CREATING) {
×
296
      terrno = TSDB_CODE_MND_RSMA_IN_CREATING;
×
UNCOV
297
    } else if (terrno == TSDB_CODE_SDB_OBJ_DROPPING) {
×
UNCOV
298
      terrno = TSDB_CODE_MND_RSMA_IN_DROPPING;
×
299
    } else {
UNCOV
300
      terrno = TSDB_CODE_APP_ERROR;
×
UNCOV
301
      mFatal("rsma:%s, failed to acquire rsma since %s", name, terrstr());
×
302
    }
303
  }
304
  return pObj;
46,464✔
305
}
306

307
void mndReleaseRsma(SMnode *pMnode, SRsmaObj *pSma) {
27,456✔
308
  SSdb *pSdb = pMnode->pSdb;
27,456✔
309
  sdbRelease(pSdb, pSma);
27,456✔
310
}
27,456✔
311
#ifdef TD_ENTERPRISE
312
static int32_t mndSetCreateRsmaRedoLogs(SMnode *pMnode, STrans *pTrans, SRsmaObj *pSma) {
×
313
  int32_t  code = 0;
×
UNCOV
314
  SSdbRaw *pRedoRaw = mndRsmaActionEncode(pSma);
×
315
  if (pRedoRaw == NULL) {
×
316
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
UNCOV
317
    if (terrno != 0) code = terrno;
×
318
    TAOS_RETURN(code);
×
319
  }
UNCOV
320
  TAOS_CHECK_RETURN(mndTransAppendRedolog(pTrans, pRedoRaw));
×
321
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING));
×
322

323
  TAOS_RETURN(code);
×
324
}
325

326
static int32_t mndSetCreateRsmaUndoLogs(SMnode *pMnode, STrans *pTrans, SRsmaObj *pSma) {
×
327
  int32_t  code = 0;
×
UNCOV
328
  SSdbRaw *pUndoRaw = mndRsmaActionEncode(pSma);
×
329
  if (!pUndoRaw) {
×
330
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
331
    if (terrno != 0) code = terrno;
×
UNCOV
332
    TAOS_RETURN(code);
×
333
  }
UNCOV
334
  TAOS_CHECK_RETURN(mndTransAppendUndolog(pTrans, pUndoRaw));
×
UNCOV
335
  TAOS_CHECK_RETURN(sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED));
×
UNCOV
336
  TAOS_RETURN(code);
×
337
}
338

339
static int32_t mndSetCreateRsmaPrepareActions(SMnode *pMnode, STrans *pTrans, SRsmaObj *pSma) {
10,560✔
340
  SSdbRaw *pDbRaw = mndRsmaActionEncode(pSma);
10,560✔
341
  if (pDbRaw == NULL) return -1;
10,560✔
342

343
  if (mndTransAppendPrepareLog(pTrans, pDbRaw) != 0) return -1;
10,560✔
344
  if (sdbSetRawStatus(pDbRaw, SDB_STATUS_CREATING) != 0) return -1;
10,560✔
345
  return 0;
10,560✔
346
}
347

348
static void *mndBuildVCreateRsmaReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, SRsmaObj *pObj,
21,120✔
349
                                    SMCreateRsmaReq *pCreate, int32_t *pContLen) {
350
  int32_t         code = 0, lino = 0;
21,120✔
351
  SMsgHead       *pHead = NULL;
21,120✔
352
  SVCreateRsmaReq req = *pCreate;
21,120✔
353

354
  req.uid = pObj->uid;  // use the uid generated by mnode
21,120✔
355

356
  int32_t contLen = tSerializeSVCreateRsmaReq(NULL, 0, &req);
21,120✔
357
  TAOS_CHECK_EXIT(contLen);
21,120✔
358
  contLen += sizeof(SMsgHead);
21,120✔
359
  TSDB_CHECK_NULL((pHead = taosMemoryMalloc(contLen)), code, lino, _exit, terrno);
21,120✔
360
  pHead->contLen = htonl(contLen);
21,120✔
361
  pHead->vgId = htonl(pVgroup->vgId);
21,120✔
362
  void *pBuf = POINTER_SHIFT(pHead, sizeof(SMsgHead));
21,120✔
363
  TAOS_CHECK_EXIT(tSerializeSVCreateRsmaReq(pBuf, contLen, &req));
21,120✔
364
_exit:
21,120✔
365
  if (code < 0) {
21,120✔
UNCOV
366
    taosMemoryFreeClear(pHead);
×
UNCOV
367
    terrno = code;
×
UNCOV
368
    *pContLen = 0;
×
UNCOV
369
    return NULL;
×
370
  }
371
  *pContLen = contLen;
21,120✔
372
  return pHead;
21,120✔
373
}
374

375
static int32_t mndSetCreateRsmaRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb, SRsmaObj *pObj,
10,560✔
376
                                           SMCreateRsmaReq *pCreate) {
377
  int32_t code = 0;
10,560✔
378
  SSdb   *pSdb = pMnode->pSdb;
10,560✔
379
  SVgObj *pVgroup = NULL;
10,560✔
380
  void   *pIter = NULL;
10,560✔
381

382
  SName name = {0};
10,560✔
383
  if ((code = tNameFromString(&name, pCreate->tbFName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE)) != 0) {
10,560✔
UNCOV
384
    return code;
×
385
  }
386
  tstrncpy(pCreate->tbFName, (char *)tNameGetTableName(&name), sizeof(pCreate->tbFName));  // convert tbFName to tbName
10,560✔
387

388
  while ((pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup))) {
52,800✔
389
    if (!mndVgroupInDb(pVgroup, pDb->uid)) {
42,240✔
390
      sdbRelease(pSdb, pVgroup);
21,120✔
391
      continue;
21,120✔
392
    }
393

394
    int32_t contLen = 0;
21,120✔
395
    void   *pReq = mndBuildVCreateRsmaReq(pMnode, pVgroup, pStb, pObj, pCreate, &contLen);
21,120✔
396
    if (pReq == NULL) {
21,120✔
UNCOV
397
      sdbCancelFetch(pSdb, pIter);
×
UNCOV
398
      sdbRelease(pSdb, pVgroup);
×
UNCOV
399
      code = terrno ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
×
UNCOV
400
      TAOS_RETURN(code);
×
401
    }
402

403
    STransAction action = {0};
21,120✔
404
    action.mTraceId = pTrans->mTraceId;
21,120✔
405
    action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
21,120✔
406
    action.pCont = pReq;
21,120✔
407
    action.contLen = contLen;
21,120✔
408
    action.msgType = TDMT_VND_CREATE_RSMA;
21,120✔
409
    action.acceptableCode = TSDB_CODE_RSMA_ALREADY_EXISTS;  // check whether the rsma uid exist
21,120✔
410
    action.retryCode = TSDB_CODE_TDB_STB_NOT_EXIST;         // retry if relative table not exist
21,120✔
411
    if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
21,120✔
UNCOV
412
      taosMemoryFree(pReq);
×
UNCOV
413
      sdbCancelFetch(pSdb, pIter);
×
UNCOV
414
      sdbRelease(pSdb, pVgroup);
×
UNCOV
415
      TAOS_RETURN(code);
×
416
    }
417
    sdbRelease(pSdb, pVgroup);
21,120✔
418
  }
419

420
  TAOS_RETURN(code);
10,560✔
421
}
422

423
static int32_t mndSetCreateRsmaCommitLogs(SMnode *pMnode, STrans *pTrans, SRsmaObj *pSma) {
16,192✔
424
  int32_t  code = 0;
16,192✔
425
  SSdbRaw *pCommitRaw = mndRsmaActionEncode(pSma);
16,192✔
426
  if (pCommitRaw == NULL) {
16,192✔
UNCOV
427
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
UNCOV
428
    if (terrno != 0) code = terrno;
×
UNCOV
429
    TAOS_RETURN(code);
×
430
  }
431
  TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pCommitRaw));
16,192✔
432
  TAOS_CHECK_RETURN(sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY));
16,192✔
433

434
  TAOS_RETURN(code);
16,192✔
435
}
436

437
static int32_t mndSetDropRsmaPrepareLogs(SMnode *pMnode, STrans *pTrans, SRsmaObj *pSma) {
2,112✔
438
  int32_t  code = 0;
2,112✔
439
  SSdbRaw *pRedoRaw = mndRsmaActionEncode(pSma);
2,112✔
440
  if (pRedoRaw == NULL) {
2,112✔
UNCOV
441
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
UNCOV
442
    if (terrno != 0) code = terrno;
×
UNCOV
443
    return -1;
×
444
  }
445
  TAOS_CHECK_RETURN(mndTransAppendPrepareLog(pTrans, pRedoRaw));
2,112✔
446
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING));
2,112✔
447

448
  return 0;
2,112✔
449
}
450

451
static int32_t mndSetDropRsmaCommitLogs(SMnode *pMnode, STrans *pTrans, SRsmaObj *pSma) {
9,856✔
452
  int32_t  code = 0;
9,856✔
453
  SSdbRaw *pCommitRaw = mndRsmaActionEncode(pSma);
9,856✔
454
  if (pCommitRaw == NULL) {
9,856✔
UNCOV
455
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
UNCOV
456
    if (terrno != 0) code = terrno;
×
UNCOV
457
    return -1;
×
458
  }
459
  TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pCommitRaw));
9,856✔
460
  TAOS_CHECK_RETURN(sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED));
9,856✔
461

462
  return 0;
9,856✔
463
}
464

465
static void *mndBuildVDropRsmaReq(SMnode *pMnode, SVgObj *pVgroup, SRsmaObj *pObj, int32_t *pContLen) {
4,224✔
466
  int32_t       code = 0, lino = 0;
4,224✔
467
  SMsgHead     *pHead = NULL;
4,224✔
468
  SVDropRsmaReq req = {0};
4,224✔
469

470
  (void)snprintf(req.tbName, sizeof(req.tbName), "%s", pObj->tbName);
4,224✔
471
  (void)snprintf(req.name, sizeof(req.name), "%s", pObj->name);
4,224✔
472
  req.tbType = pObj->tbType;
4,224✔
473
  req.uid = pObj->uid;
4,224✔
474
  req.tbUid = pObj->tbUid;
4,224✔
475

476
  int32_t contLen = tSerializeSVDropRsmaReq(NULL, 0, &req);
4,224✔
477
  TAOS_CHECK_EXIT(contLen);
4,224✔
478
  contLen += sizeof(SMsgHead);
4,224✔
479
  TSDB_CHECK_NULL((pHead = taosMemoryMalloc(contLen)), code, lino, _exit, terrno);
4,224✔
480
  pHead->contLen = htonl(contLen);
4,224✔
481
  pHead->vgId = htonl(pVgroup->vgId);
4,224✔
482
  void *pBuf = POINTER_SHIFT(pHead, sizeof(SMsgHead));
4,224✔
483
  TAOS_CHECK_EXIT(tSerializeSVDropRsmaReq(pBuf, contLen, &req));
4,224✔
484
_exit:
4,224✔
485
  if (code < 0) {
4,224✔
UNCOV
486
    taosMemoryFreeClear(pHead);
×
UNCOV
487
    terrno = code;
×
UNCOV
488
    *pContLen = 0;
×
UNCOV
489
    return NULL;
×
490
  }
491
  *pContLen = contLen;
4,224✔
492
  return pHead;
4,224✔
493
}
494

495
static int32_t mndSetDropRsmaRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SRsmaObj *pSma) {
2,112✔
496
  int32_t code = 0;
2,112✔
497
  SSdb   *pSdb = pMnode->pSdb;
2,112✔
498
  SVgObj *pVgroup = NULL;
2,112✔
499
  void   *pIter = NULL;
2,112✔
500

501
  while ((pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup))) {
10,560✔
502
    if (!mndVgroupInDb(pVgroup, pDb->uid)) {
8,448✔
503
      sdbRelease(pSdb, pVgroup);
4,224✔
504
      continue;
4,224✔
505
    }
506

507
    int32_t contLen = 0;
4,224✔
508
    void   *pReq = mndBuildVDropRsmaReq(pMnode, pVgroup, pSma, &contLen);
4,224✔
509
    if (pReq == NULL) {
4,224✔
UNCOV
510
      sdbCancelFetch(pSdb, pIter);
×
UNCOV
511
      sdbRelease(pSdb, pVgroup);
×
UNCOV
512
      code = terrno ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
×
UNCOV
513
      TAOS_RETURN(code);
×
514
    }
515

516
    STransAction action = {0};
4,224✔
517
    action.mTraceId = pTrans->mTraceId;
4,224✔
518
    action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
4,224✔
519
    action.pCont = pReq;
4,224✔
520
    action.contLen = contLen;
4,224✔
521
    action.msgType = TDMT_VND_DROP_RSMA;
4,224✔
522
    action.acceptableCode = TSDB_CODE_RSMA_NOT_EXIST;
4,224✔
523
    if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
4,224✔
UNCOV
524
      taosMemoryFree(pReq);
×
UNCOV
525
      sdbCancelFetch(pSdb, pIter);
×
UNCOV
526
      sdbRelease(pSdb, pVgroup);
×
UNCOV
527
      TAOS_RETURN(code);
×
528
    }
529
    sdbRelease(pSdb, pVgroup);
4,224✔
530
  }
531
  TAOS_RETURN(code);
2,112✔
532
}
533

534
static int32_t mndDropRsma(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SRsmaObj *pObj) {
2,112✔
535
  int32_t code = 0, lino = 0;
2,112✔
536

537
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, "drop-rsma");
2,112✔
538
  if (pTrans == NULL) {
2,112✔
UNCOV
539
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
UNCOV
540
    if (terrno != 0) code = terrno;
×
UNCOV
541
    goto _exit;
×
542
  }
543

544
  mInfo("trans:%d start to drop rsma:%s", pTrans->id, pObj->name);
2,112✔
545

546
  mndTransSetDbName(pTrans, pDb->name, pObj->name);
2,112✔
547
  mndTransSetKillMode(pTrans, TRN_KILL_MODE_SKIP);
2,112✔
548
  TAOS_CHECK_EXIT(mndTransCheckConflict(pMnode, pTrans));
2,112✔
549

550
  mndTransSetOper(pTrans, MND_OPER_DROP_RSMA);
2,112✔
551
  TAOS_CHECK_EXIT(mndSetDropRsmaPrepareLogs(pMnode, pTrans, pObj));
2,112✔
552
  TAOS_CHECK_EXIT(mndSetDropRsmaCommitLogs(pMnode, pTrans, pObj));
2,112✔
553
  TAOS_CHECK_EXIT(mndSetDropRsmaRedoActions(pMnode, pTrans, pDb, pObj));
2,112✔
554

555
  // int32_t rspLen = 0;
556
  // void   *pRsp = NULL;
557
  // TAOS_CHECK_EXIT(mndBuildDropRsmaRsp(pObj, &rspLen, &pRsp, false));
558
  // mndTransSetRpcRsp(pTrans, pRsp, rspLen);
559

560
  TAOS_CHECK_EXIT(mndTransPrepare(pMnode, pTrans));
2,112✔
561
_exit:
2,112✔
562
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
2,112✔
UNCOV
563
    mError("rsma:%s, failed to drop at line:%d since %s", pObj->name, lino, tstrerror(code));
×
564
  }
565
  mndTransDrop(pTrans);
2,112✔
566
  TAOS_RETURN(code);
2,112✔
567
}
568
#endif
569
static int32_t mndProcessDropRsmaReq(SRpcMsg *pReq) {
2,112✔
570
  SMnode *pMnode = pReq->info.node;
2,112✔
571
  int32_t code = 0, lino = 0;
2,112✔
572
#ifdef TD_ENTERPRISE
573
  SDbObj       *pDb = NULL;
2,112✔
574
  SRsmaObj     *pObj = NULL;
2,112✔
575
  SUserObj     *pUser = NULL;
2,112✔
576
  SMDropRsmaReq dropReq = {0};
2,112✔
577
  int64_t       tss = taosGetTimestampMs();
2,112✔
578

579
  TAOS_CHECK_GOTO(tDeserializeSMDropRsmaReq(pReq->pCont, pReq->contLen, &dropReq), NULL, _exit);
2,112✔
580

581
  mInfo("rsma:%s, start to drop", dropReq.name);
2,112✔
582

583
  pObj = mndAcquireRsma(pMnode, dropReq.name);
2,112✔
584
  if (pObj == NULL) {
2,112✔
UNCOV
585
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
UNCOV
586
    if (terrno != 0) code = terrno;
×
UNCOV
587
    if (dropReq.igNotExists) {
×
UNCOV
588
      code = 0;  // mndBuildDropMountRsp(pObj, &pReq->info.rspLen, &pReq->info.rsp, true);
×
589
    }
UNCOV
590
    goto _exit;
×
591
  }
592

593
  SName name = {0};
2,112✔
594
  TAOS_CHECK_EXIT(tNameFromString(&name, pObj->dbFName, T_NAME_ACCT | T_NAME_DB));
2,112✔
595
  if (!(pDb = mndAcquireDb(pMnode, pObj->dbFName))) {
2,112✔
UNCOV
596
    TAOS_CHECK_EXIT(TSDB_CODE_MND_DB_NOT_EXIST);
×
597
  }
598

599
  TAOS_CHECK_EXIT(mndAcquireUser(pMnode, RPC_MSG_USER(pReq), &pUser));
2,112✔
600

601
  // TAOS_CHECK_GOTO(mndCheckDbPrivilege(pMnode, RPC_MSG_USER(pReq), MND_OPER_WRITE_DB, pDb), NULL, _exit);
602
  TAOS_CHECK_EXIT(
2,112✔
603
      mndCheckObjPrivilegeRecF(pMnode, pUser, PRIV_CM_DROP, PRIV_OBJ_RSMA, pObj->ownerId, pObj->dbFName, pObj->name));
604

605
  code = mndDropRsma(pMnode, pReq, pDb, pObj);
2,112✔
606
  if (code == TSDB_CODE_SUCCESS) {
2,112✔
607
    code = TSDB_CODE_ACTION_IN_PROGRESS;
2,112✔
608
  }
609

610
  if (tsAuditLevel >= AUDIT_LEVEL_DATABASE) {
2,112✔
611
    int64_t tse = taosGetTimestampMs();
2,112✔
612
    double  duration = (double)(tse - tss);
2,112✔
613
    duration = duration / 1000;
2,112✔
614
    auditRecord(pReq, pMnode->clusterId, "dropRsma", dropReq.name, "", "", 0, duration, 0);
2,112✔
615
  }
616
_exit:
2,112✔
617
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
2,112✔
UNCOV
618
    mError("rsma:%s, failed at line %d to drop since %s", dropReq.name, lino, tstrerror(code));
×
619
  }
620

621
  mndReleaseDb(pMnode, pDb);
2,112✔
622
  mndReleaseRsma(pMnode, pObj);
2,112✔
623
  mndReleaseUser(pMnode, pUser);
2,112✔
624
#endif
625
  TAOS_RETURN(code);
2,112✔
626
}
627
#ifdef TD_ENTERPRISE
628
static int32_t mndCreateRsma(SMnode *pMnode, SRpcMsg *pReq, SUserObj *pUser, SDbObj *pDb, SStbObj *pStb,
10,560✔
629
                             SMCreateRsmaReq *pCreate) {
630
  int32_t  code = 0, lino = 0;
10,560✔
631
  SRsmaObj obj = {0};
10,560✔
632
  STrans  *pTrans = NULL;
10,560✔
633

634
  (void)snprintf(obj.name, TSDB_TABLE_NAME_LEN, "%s", pCreate->name);
10,560✔
635
  (void)snprintf(obj.dbFName, TSDB_DB_FNAME_LEN, "%s", pDb->name);
10,560✔
636

637
  const char *tbName = strrchr(pCreate->tbFName, '.');
10,560✔
638
  (void)snprintf(obj.tbName, TSDB_TABLE_NAME_LEN, "%s", tbName ? tbName + 1 : pCreate->tbFName);
10,560✔
639
  (void)snprintf(obj.createUser, TSDB_USER_LEN, "%s", pUser->user);
10,560✔
640
  obj.ownerId = pUser->uid;
10,560✔
641
  obj.createdTime = taosGetTimestampMs();
10,560✔
642
  obj.updateTime = obj.createdTime;
10,560✔
643
  obj.uid = mndGenerateUid(obj.name, strlen(obj.name));
10,560✔
644
  obj.tbUid = pCreate->tbUid;
10,560✔
645
  obj.dbUid = pDb->uid;
10,560✔
646
  obj.interval[0] = pCreate->interval[0];
10,560✔
647
  obj.interval[1] = pCreate->interval[1];
10,560✔
648
  obj.version = 1;
10,560✔
649
  obj.tbType = pCreate->tbType;  // ETableType: 1 stable. Only super table supported currently.
10,560✔
650
  obj.intervalUnit = pCreate->intervalUnit;
10,560✔
651
  obj.nFuncs = pCreate->nFuncs;
10,560✔
652
  if (obj.nFuncs > 0) {
10,560✔
653
    TSDB_CHECK_NULL((obj.funcColIds = taosMemoryCalloc(obj.nFuncs, sizeof(col_id_t))), code, lino, _exit, terrno);
10,560✔
654
    TSDB_CHECK_NULL((obj.funcIds = taosMemoryCalloc(obj.nFuncs, sizeof(func_id_t))), code, lino, _exit, terrno);
10,560✔
655
    for (int16_t i = 0; i < obj.nFuncs; ++i) {
68,992✔
656
      obj.funcColIds[i] = pCreate->funcColIds[i];
58,432✔
657
      obj.funcIds[i] = pCreate->funcIds[i];
58,432✔
658
    }
659
  }
660

661
  TSDB_CHECK_NULL((pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, "create-rsma")),
10,560✔
662
                  code, lino, _exit, terrno);
663
  mInfo("trans:%d, used to create rsma %s on tb %s.%s", pTrans->id, obj.name, obj.dbFName, obj.tbName);
10,560✔
664

665
  mndTransSetDbName(pTrans, obj.dbFName, obj.name);
10,560✔
666
  mndTransSetKillMode(pTrans, TRN_KILL_MODE_SKIP);
10,560✔
667
  TAOS_CHECK_EXIT(mndTransCheckConflict(pMnode, pTrans));
10,560✔
668

669
  mndTransSetOper(pTrans, MND_OPER_CREATE_RSMA);
10,560✔
670
  TAOS_CHECK_EXIT(mndSetCreateRsmaPrepareActions(pMnode, pTrans, &obj));
10,560✔
671
  TAOS_CHECK_EXIT(mndSetCreateRsmaRedoActions(pMnode, pTrans, pDb, pStb, &obj, pCreate));
10,560✔
672
  TAOS_CHECK_EXIT(mndSetCreateRsmaCommitLogs(pMnode, pTrans, &obj));
10,560✔
673
  TAOS_CHECK_EXIT(mndTransPrepare(pMnode, pTrans));
10,560✔
674
_exit:
10,560✔
675
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
10,560✔
UNCOV
676
    mError("rsma:%s, failed at line %d to create rsma, since %s", obj.name, lino, tstrerror(code));
×
677
  }
678
  mndTransDrop(pTrans);
10,560✔
679
  mndRsmaFreeObj(&obj);
10,560✔
680
  TAOS_RETURN(code);
10,560✔
681
}
682

683
static int32_t mndCheckCreateRsmaReq(SMCreateRsmaReq *pCreate) {
17,600✔
684
  int32_t code = TSDB_CODE_MND_INVALID_RSMA_OPTION;
17,600✔
685
  if (pCreate->name[0] == 0) goto _exit;
17,600✔
686
  if (pCreate->tbFName[0] == 0) goto _exit;
17,600✔
687
  if (pCreate->igExists < 0 || pCreate->igExists > 1) goto _exit;
17,600✔
688
  if (pCreate->intervalUnit < 0) goto _exit;
17,600✔
689
  if (pCreate->interval[0] < 0) goto _exit;
17,600✔
690
  if (pCreate->interval[1] < 0) goto _exit;
17,600✔
691
  if (pCreate->interval[0] == 0 && pCreate->interval[1] == 0) goto _exit;
17,600✔
692

693
  SName fname = {0};
17,600✔
694
  if ((code = tNameFromString(&fname, pCreate->tbFName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE)) < 0) goto _exit;
17,600✔
695
  if (*(char *)tNameGetTableName(&fname) == 0) goto _exit;
17,600✔
696
  code = 0;
17,600✔
697
_exit:
17,600✔
698
  TAOS_RETURN(code);
17,600✔
699
}
700

701
static int32_t mndCheckRsmaConflicts(SMnode *pMnode, SDbObj *pDbObj, SMCreateRsmaReq *pCreate) {
14,080✔
702
  void     *pIter = NULL;
14,080✔
703
  SSdb     *pSdb = pMnode->pSdb;
14,080✔
704
  SRsmaObj *pObj = NULL;
14,080✔
705
  while ((pIter = sdbFetch(pSdb, SDB_RSMA, pIter, (void **)&pObj))) {
19,712✔
706
    if (pObj->tbUid == pCreate->tbUid && pObj->dbUid == pDbObj->uid) {
9,152✔
707
      sdbCancelFetch(pSdb, (pIter));
3,520✔
708
      sdbRelease(pSdb, pObj);
3,520✔
709
      mError("rsma:%s, conflict with existing rsma %s on same table %s.%s:%" PRIi64, pCreate->name, pObj->name,
3,520✔
710
             pObj->dbFName, pObj->tbName, pObj->tbUid);
711
      return TSDB_CODE_MND_RSMA_EXIST_IN_TABLE;
3,520✔
712
    }
713
    sdbRelease(pSdb, pObj);
5,632✔
714
  }
715
  return 0;
10,560✔
716
}
717
#endif
718
static int32_t mndProcessCreateRsmaReq(SRpcMsg *pReq) {
17,600✔
719
  int32_t code = 0, lino = 0;
17,600✔
720
#ifdef TD_ENTERPRISE
721
  SMnode         *pMnode = pReq->info.node;
17,600✔
722
  SDbObj         *pDb = NULL;
17,600✔
723
  SStbObj        *pStb = NULL;
17,600✔
724
  SRsmaObj       *pSma = NULL;
17,600✔
725
  SUserObj       *pUser = NULL;
17,600✔
726
  int64_t         mTraceId = TRACE_GET_ROOTID(&pReq->info.traceId);
17,600✔
727
  SMCreateRsmaReq createReq = {0};
17,600✔
728
  int64_t         tss = taosGetTimestampMs();
17,600✔
729

730
  TAOS_CHECK_EXIT(tDeserializeSMCreateRsmaReq(pReq->pCont, pReq->contLen, &createReq));
17,600✔
731

732
  mInfo("start to create rsma: %s", createReq.name);
17,600✔
733
  TAOS_CHECK_EXIT(mndCheckCreateRsmaReq(&createReq));
17,600✔
734

735
  if ((pSma = mndAcquireRsma(pMnode, createReq.name))) {
17,600✔
736
    if (createReq.igExists) {
3,520✔
UNCOV
737
      mInfo("rsma:%s, already exist, ignore exist is set", createReq.name);
×
738
      code = 0;
×
UNCOV
739
      goto _exit;
×
740
    } else {
741
      TAOS_CHECK_EXIT(TSDB_CODE_RSMA_ALREADY_EXISTS);
3,520✔
742
    }
743
  } else {
744
    if ((code = terrno) == TSDB_CODE_RSMA_NOT_EXIST) {
14,080✔
745
      // continue
746
    } else {  // TSDB_CODE_MND_RSMA_IN_CREATING | TSDB_CODE_MND_RSMA_IN_DROPPING | TSDB_CODE_APP_ERROR
UNCOV
747
      goto _exit;
×
748
    }
749
  }
750

751
  SName name = {0};
14,080✔
752
  TAOS_CHECK_EXIT(tNameFromString(&name, createReq.tbFName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE));
14,080✔
753
  char db[TSDB_TABLE_FNAME_LEN] = {0};
14,080✔
754
  (void)tNameGetFullDbName(&name, db);
14,080✔
755

756
  pDb = mndAcquireDb(pMnode, db);
14,080✔
757
  if (pDb == NULL) {
14,080✔
UNCOV
758
    TAOS_CHECK_EXIT(TSDB_CODE_MND_DB_NOT_SELECTED);
×
759
  }
760

761
  TAOS_CHECK_EXIT(mndAcquireUser(pMnode, RPC_MSG_USER(pReq), &pUser));
14,080✔
762

763
  // TAOS_CHECK_EXIT(mndCheckDbPrivilege(pMnode, RPC_MSG_USER(pReq), MND_OPER_READ_DB, pDb));
764
  // TAOS_CHECK_EXIT(mndCheckDbPrivilege(pMnode, RPC_MSG_USER(pReq), MND_OPER_WRITE_DB, pDb));
765

766
  // already check select table/insert table/create rsma privileges in parser
767
  TAOS_CHECK_EXIT(
14,080✔
768
      mndCheckObjPrivilegeRec(pMnode, pUser, PRIV_DB_USE, PRIV_OBJ_DB, pDb->ownerId, name.acctId, name.dbname, NULL));
769

770
  pStb = mndAcquireStb(pMnode, createReq.tbFName);
14,080✔
771
  if (pStb == NULL) {
14,080✔
UNCOV
772
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STB_NOT_EXIST);
×
773
  }
774

775
  TAOS_CHECK_EXIT(mndCheckRsmaConflicts(pMnode, pDb, &createReq));
14,080✔
776

777
  TAOS_CHECK_EXIT(mndCreateRsma(pMnode, pReq, pUser, pDb, pStb, &createReq));
10,560✔
778

779
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
10,560✔
780

781
  if (tsAuditLevel >= AUDIT_LEVEL_DATABASE) {
10,560✔
782
    int64_t tse = taosGetTimestampMs();
10,560✔
783
    double  duration = (double)(tse - tss);
10,560✔
784
    duration = duration / 1000;
10,560✔
785
    auditRecord(pReq, pMnode->clusterId, "createRsma", createReq.name, createReq.tbFName, "", 0, duration, 0);
10,560✔
786
  }
787
_exit:
17,600✔
788
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
17,600✔
789
    mError("rsma:%s, failed at line %d to create since %s", createReq.name, lino, tstrerror(code));
7,040✔
790
  }
791
  if (pSma) mndReleaseRsma(pMnode, pSma);
17,600✔
792
  if (pStb) mndReleaseStb(pMnode, pStb);
17,600✔
793
  if (pDb) mndReleaseDb(pMnode, pDb);
17,600✔
794
  if (pUser) mndReleaseUser(pMnode, pUser);
17,600✔
795
  tFreeSMCreateRsmaReq(&createReq);
17,600✔
796
#endif
797
  TAOS_RETURN(code);
17,600✔
798
}
799

800
#ifdef TD_ENTERPRISE
801
static int32_t mndCheckAlterRsmaReq(SMAlterRsmaReq *pReq) {
5,632✔
802
  int32_t code = TSDB_CODE_MND_INVALID_RSMA_OPTION;
5,632✔
803
  if (pReq->name[0] == 0) goto _exit;
5,632✔
804
  if (pReq->igNotExists < 0 || pReq->igNotExists > 1) goto _exit;
5,632✔
805

806
  code = 0;
5,632✔
807
_exit:
5,632✔
808
  TAOS_RETURN(code);
5,632✔
809
}
810

811
static int32_t mndSetAlterRsmaPrepareActions(SMnode *pMnode, STrans *pTrans, SRsmaObj *pSma) {
5,632✔
812
  SSdbRaw *pDbRaw = mndRsmaActionEncode(pSma);
5,632✔
813
  if (pDbRaw == NULL) return -1;
5,632✔
814

815
  if (mndTransAppendPrepareLog(pTrans, pDbRaw) != 0) return -1;
5,632✔
816
  if (sdbSetRawStatus(pDbRaw, SDB_STATUS_READY) != 0) return -1;
5,632✔
817
  return 0;
5,632✔
818
}
819

820
static int32_t mndSetAlterRsmaCommitLogs(SMnode *pMnode, STrans *pTrans, SRsmaObj *pSma) {
5,632✔
821
  return mndSetCreateRsmaCommitLogs(pMnode, pTrans, pSma);
5,632✔
822
}
823

824
static void *mndBuildVAlterRsmaReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, SRsmaObj *pObj,
11,264✔
825
                                   SMAlterRsmaReq *pAlter, int32_t *pContLen) {
826
  int32_t        code = 0, lino = 0;
11,264✔
827
  SMsgHead      *pHead = NULL;
11,264✔
828
  SVAlterRsmaReq req = {0};
11,264✔
829
  req.alterType = pAlter->alterType;
11,264✔
830
  (void)snprintf(req.name, sizeof(req.name), "%s", pObj->name);
11,264✔
831
  (void)snprintf(req.tbName, sizeof(req.tbName), "%s", pObj->tbName);
11,264✔
832
  req.tbType = pObj->tbType;
11,264✔
833
  req.intervalUnit = pObj->intervalUnit;
11,264✔
834
  req.interval[0] = pObj->interval[0];
11,264✔
835
  req.interval[1] = pObj->interval[1];
11,264✔
836
  req.tbUid = pObj->tbUid;
11,264✔
837
  req.uid = pObj->uid;
11,264✔
838
  req.nFuncs = pObj->nFuncs;
11,264✔
839
  req.funcColIds = pObj->funcColIds;
11,264✔
840
  req.funcIds = pObj->funcIds;
11,264✔
841

842
  int32_t contLen = tSerializeSVAlterRsmaReq(NULL, 0, &req);
11,264✔
843
  TAOS_CHECK_EXIT(contLen);
11,264✔
844
  contLen += sizeof(SMsgHead);
11,264✔
845
  TSDB_CHECK_NULL((pHead = taosMemoryMalloc(contLen)), code, lino, _exit, terrno);
11,264✔
846
  pHead->contLen = htonl(contLen);
11,264✔
847
  pHead->vgId = htonl(pVgroup->vgId);
11,264✔
848
  void *pBuf = POINTER_SHIFT(pHead, sizeof(SMsgHead));
11,264✔
849
  TAOS_CHECK_EXIT(tSerializeSVAlterRsmaReq(pBuf, contLen, &req));
11,264✔
850
_exit:
11,264✔
851
  if (code < 0) {
11,264✔
UNCOV
852
    taosMemoryFreeClear(pHead);
×
UNCOV
853
    terrno = code;
×
UNCOV
854
    *pContLen = 0;
×
UNCOV
855
    return NULL;
×
856
  }
857
  *pContLen = contLen;
11,264✔
858
  return pHead;
11,264✔
859
}
860

861
static int32_t mndSetAlterRsmaRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb, SRsmaObj *pObj,
5,632✔
862
                                          SMAlterRsmaReq *pAlter) {
863
  int32_t code = 0;
5,632✔
864
  SSdb   *pSdb = pMnode->pSdb;
5,632✔
865
  SVgObj *pVgroup = NULL;
5,632✔
866
  void   *pIter = NULL;
5,632✔
867

868
  while ((pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup))) {
28,160✔
869
    if (!mndVgroupInDb(pVgroup, pDb->uid)) {
22,528✔
870
      sdbRelease(pSdb, pVgroup);
11,264✔
871
      continue;
11,264✔
872
    }
873

874
    int32_t contLen = 0;
11,264✔
875
    void   *pReq = mndBuildVAlterRsmaReq(pMnode, pVgroup, pStb, pObj, pAlter, &contLen);
11,264✔
876
    if (pReq == NULL) {
11,264✔
877
      sdbCancelFetch(pSdb, pIter);
×
878
      sdbRelease(pSdb, pVgroup);
×
879
      code = terrno ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
×
UNCOV
880
      TAOS_RETURN(code);
×
881
    }
882

883
    STransAction action = {0};
11,264✔
884
    action.mTraceId = pTrans->mTraceId;
11,264✔
885
    action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
11,264✔
886
    action.pCont = pReq;
11,264✔
887
    action.contLen = contLen;
11,264✔
888
    action.msgType = TDMT_VND_ALTER_RSMA;
11,264✔
889
    if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
11,264✔
UNCOV
890
      taosMemoryFree(pReq);
×
UNCOV
891
      sdbCancelFetch(pSdb, pIter);
×
UNCOV
892
      sdbRelease(pSdb, pVgroup);
×
UNCOV
893
      TAOS_RETURN(code);
×
894
    }
895
    sdbRelease(pSdb, pVgroup);
11,264✔
896
  }
897

898
  TAOS_RETURN(code);
5,632✔
899
}
900

901
static int32_t mndAlterRsma(SMnode *pMnode, SRpcMsg *pReq, SUserObj *pUser, SDbObj *pDb, SStbObj *pStb,
5,632✔
902
                            SMAlterRsmaReq *pAlter, SRsmaObj *pOld) {
903
  int32_t  code = 0, lino = 0;
5,632✔
904
  STrans  *pTrans = NULL;
5,632✔
905
  SRsmaObj obj = *pOld;
5,632✔
906

907
  obj.updateTime = taosGetTimestampMs();
5,632✔
908
  ++obj.version;
5,632✔
909
  if (pAlter->alterType == TSDB_ALTER_RSMA_FUNCTION) {
5,632✔
910
    obj.nFuncs = pOld->nFuncs + pAlter->nFuncs;
5,632✔
911
    obj.funcColIds = taosMemoryMalloc(obj.nFuncs * sizeof(col_id_t));
5,632✔
912
    obj.funcIds = taosMemoryMalloc(obj.nFuncs * sizeof(func_id_t));
5,632✔
913
    if (obj.funcColIds == NULL || obj.funcIds == NULL) {
5,632✔
UNCOV
914
      TAOS_CHECK_EXIT(terrno);
×
915
    }
916
    int32_t n = 0, i = 0, j = 0;
5,632✔
917
    while (i < pOld->nFuncs && j < pAlter->nFuncs) {
30,272✔
918
      if (pOld->funcColIds[i] < pAlter->funcColIds[j]) {
24,640✔
919
        obj.funcColIds[n] = pOld->funcColIds[i];
21,120✔
920
        obj.funcIds[n++] = pOld->funcIds[i++];
21,120✔
921
      } else if (pOld->funcColIds[i] > pAlter->funcColIds[j]) {
3,520✔
922
        obj.funcColIds[n] = pAlter->funcColIds[j];
3,520✔
923
        obj.funcIds[n++] = pAlter->funcIds[j++];
3,520✔
924
      } else {
UNCOV
925
        mError("rsma:%s, conflict function on column id:%d", pOld->name, pAlter->funcColIds[j]);
×
UNCOV
926
        TAOS_CHECK_EXIT(TSDB_CODE_MND_RSMA_FUNC_CONFLICT);
×
927
      }
928
    }
929
    if (i < pOld->nFuncs) {
5,632✔
930
      while (i < pOld->nFuncs) {
7,040✔
931
        obj.funcColIds[n] = pOld->funcColIds[i];
3,520✔
932
        obj.funcIds[n++] = pOld->funcIds[i++];
3,520✔
933
      }
934
    } else if (j < pAlter->nFuncs) {
2,112✔
935
      while (j < pAlter->nFuncs) {
4,224✔
936
        obj.funcColIds[n] = pAlter->funcColIds[j];
2,112✔
937
        obj.funcIds[n++] = pAlter->funcIds[j++];
2,112✔
938
      }
939
    }
940
  } else {
UNCOV
941
    TAOS_CHECK_EXIT(TSDB_CODE_OPS_NOT_SUPPORT);
×
942
  }
943

944
  TSDB_CHECK_NULL((pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, "alter-rsma")), code,
5,632✔
945
                  lino, _exit, terrno);
946
  mInfo("trans:%d, used to alter rsma %s on tb %s.%s", pTrans->id, obj.name, obj.dbFName, obj.tbName);
5,632✔
947

948
  mndTransSetDbName(pTrans, obj.dbFName, obj.name);
5,632✔
949
  mndTransSetKillMode(pTrans, TRN_KILL_MODE_SKIP);
5,632✔
950
  TAOS_CHECK_EXIT(mndTransCheckConflict(pMnode, pTrans));
5,632✔
951

952
  mndTransSetOper(pTrans, MND_OPER_ALTER_RSMA);
5,632✔
953
  TAOS_CHECK_EXIT(mndSetAlterRsmaPrepareActions(pMnode, pTrans, &obj));
5,632✔
954
  TAOS_CHECK_EXIT(mndSetAlterRsmaCommitLogs(pMnode, pTrans, &obj));
5,632✔
955
  TAOS_CHECK_EXIT(mndSetAlterRsmaRedoActions(pMnode, pTrans, pDb, pStb, &obj, pAlter));
5,632✔
956

957
  TAOS_CHECK_EXIT(mndTransPrepare(pMnode, pTrans));
5,632✔
958
_exit:
5,632✔
959
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
5,632✔
UNCOV
960
    mError("rsma:%s, failed at line %d to alter rsma, since %s", obj.name, lino, tstrerror(code));
×
961
  }
962
  mndTransDrop(pTrans);
5,632✔
963
  mndRsmaFreeObj(&obj);
5,632✔
964
  TAOS_RETURN(code);
5,632✔
965
}
966
#endif
967
static int32_t mndProcessAlterRsmaReq(SRpcMsg *pReq) {
5,632✔
968
  int32_t code = 0, lino = 0;
5,632✔
969
#ifdef TD_ENTERPRISE
970
  SMnode        *pMnode = pReq->info.node;
5,632✔
971
  SDbObj        *pDb = NULL;
5,632✔
972
  SStbObj       *pStb = NULL;
5,632✔
973
  SRsmaObj      *pObj = NULL;
5,632✔
974
  SUserObj      *pUser = NULL;
5,632✔
975
  int64_t        mTraceId = TRACE_GET_ROOTID(&pReq->info.traceId);
5,632✔
976
  SMAlterRsmaReq req = {0};
5,632✔
977
  char           tbFName[TSDB_TABLE_FNAME_LEN] = "\0";
5,632✔
978
  int64_t        tss = taosGetTimestampMs();
5,632✔
979

980
  TAOS_CHECK_EXIT(tDeserializeSMAlterRsmaReq(pReq->pCont, pReq->contLen, &req));
5,632✔
981

982
  mInfo("start to alter rsma: %s", req.name);
5,632✔
983
  TAOS_CHECK_EXIT(mndCheckAlterRsmaReq(&req));
5,632✔
984

985
  if (!(pObj = mndAcquireRsma(pMnode, req.name))) {
5,632✔
UNCOV
986
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
UNCOV
987
    if (terrno != 0) code = terrno;
×
UNCOV
988
    if (req.igNotExists) {
×
UNCOV
989
      code = 0;
×
990
    }
991
    goto _exit;
×
992
  }
993

994
  if (!(pDb = mndAcquireDb(pMnode, pObj->dbFName))) {
5,632✔
UNCOV
995
    TAOS_CHECK_EXIT(TSDB_CODE_MND_DB_NOT_EXIST);
×
996
  }
997

998
  TAOS_CHECK_EXIT(mndAcquireUser(pMnode, RPC_MSG_USER(pReq), &pUser));
5,632✔
999

1000
  // TAOS_CHECK_EXIT(mndCheckDbPrivilege(pMnode, RPC_MSG_USER(pReq), MND_OPER_READ_DB, pDb));
1001
  // TAOS_CHECK_EXIT(mndCheckDbPrivilege(pMnode, RPC_MSG_USER(pReq), MND_OPER_WRITE_DB, pDb));
1002
  TAOS_CHECK_EXIT(
5,632✔
1003
      mndCheckObjPrivilegeRecF(pMnode, pUser, PRIV_CM_ALTER, PRIV_OBJ_RSMA, pObj->ownerId, pObj->dbFName, pObj->name));
1004

1005
  (void)snprintf(tbFName, sizeof(tbFName), "%s.%s", pObj->dbFName, pObj->tbName);
5,632✔
1006

1007
  pStb = mndAcquireStb(pMnode, tbFName);
5,632✔
1008
  if (pStb == NULL) {
5,632✔
1009
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STB_NOT_EXIST);
×
1010
  }
1011

1012
  TAOS_CHECK_EXIT(mndAlterRsma(pMnode, pReq, pUser, pDb, pStb, &req, pObj));
5,632✔
1013

1014
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
5,632✔
1015

1016
  if (tsAuditLevel >= AUDIT_LEVEL_DATABASE) {
5,632✔
1017
    char alterType[32] = "\0";
5,632✔
1018
    (void)snprintf(alterType, sizeof(alterType), "alterType:%" PRIi8, req.alterType);
5,632✔
1019
    int64_t tse = taosGetTimestampMs();
5,632✔
1020
    double  duration = (double)(tse - tss);
5,632✔
1021
    duration = duration / 1000;
5,632✔
1022
    auditRecord(pReq, pMnode->clusterId, "alterRsma", req.name, tbFName, alterType, 0, duration, 0);
5,632✔
1023
  }
1024
_exit:
5,632✔
1025
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
5,632✔
UNCOV
1026
    mError("rsma:%s, failed at line %d to alter since %s", req.name, lino, tstrerror(code));
×
1027
  }
1028
  if (pObj) mndReleaseRsma(pMnode, pObj);
5,632✔
1029
  if (pStb) mndReleaseStb(pMnode, pStb);
5,632✔
1030
  if (pDb) mndReleaseDb(pMnode, pDb);
5,632✔
1031
  if (pUser) mndReleaseUser(pMnode, pUser);
5,632✔
1032
  tFreeSMAlterRsmaReq(&req);
5,632✔
1033
#endif
1034
  TAOS_RETURN(code);
5,632✔
1035
}
1036
#ifdef TD_ENTERPRISE
1037
static int32_t mndFillRsmaInfo(SRsmaObj *pObj, SStbObj *pStb, SRsmaInfoRsp *pRsp, bool withColName) {
16,192✔
1038
  int32_t code = 0, lino = 0;
16,192✔
1039
  pRsp->id = pObj->uid;
16,192✔
1040
  (void)snprintf(pRsp->name, sizeof(pRsp->name), "%s", pObj->name);
16,192✔
1041
  (void)snprintf(pRsp->tbFName, sizeof(pRsp->tbFName), "%s.%s", pObj->dbFName, pObj->tbName);
16,192✔
1042
  pRsp->ownerId = pObj->ownerId;
16,192✔
1043
  pRsp->version = pObj->version;
16,192✔
1044
  pRsp->tbType = pObj->tbType;
16,192✔
1045
  pRsp->intervalUnit = pObj->intervalUnit;
16,192✔
1046
  pRsp->nFuncs = pObj->nFuncs;
16,192✔
1047
  pRsp->interval[0] = pObj->interval[0];
16,192✔
1048
  pRsp->interval[1] = pObj->interval[1];
16,192✔
1049
  if (pRsp->nFuncs > 0) {
16,192✔
1050
    pRsp->funcColIds = pObj->funcColIds;  // shallow copy, no need to free
16,192✔
1051
    pRsp->funcIds = pObj->funcIds;        // shallow copy, no need to free
16,192✔
1052
    if (withColName) {
16,192✔
1053
      pRsp->colNames = taosArrayInit(pRsp->nFuncs, sizeof(char *));
16,192✔
1054
      if (pRsp->colNames == NULL) {
16,192✔
UNCOV
1055
        TAOS_CHECK_EXIT(terrno);
×
1056
      }
1057
      pRsp->nColNames = pRsp->nFuncs;
16,192✔
1058
      int16_t i = 0, j = 0;
16,192✔
1059
      for (; i < pRsp->nFuncs; ++i) {
95,744✔
1060
        bool found = false;
79,552✔
1061
        for (; j < pStb->numOfColumns;) {
170,368✔
1062
          if (pStb->pColumns[j].colId == pRsp->funcColIds[i]) {
170,368✔
1063
            found = true;
79,552✔
1064
            break;
79,552✔
1065
          } else if (pStb->pColumns[j].colId < pRsp->funcColIds[i]) {
90,816✔
1066
            ++j;
90,816✔
1067
          } else {
UNCOV
1068
            break;
×
1069
          }
1070
        }
1071
        if (found) {
79,552✔
1072
          SSchema *pCol = pStb->pColumns + j;
79,552✔
1073
          char    *colName = taosStrdup(pCol->name);
79,552✔
1074
          if (colName == NULL) {
79,552✔
UNCOV
1075
            TAOS_CHECK_EXIT(terrno);
×
1076
          }
1077
          if (!taosArrayPush(pRsp->colNames, &colName)) {
159,104✔
UNCOV
1078
            taosMemoryFree(colName);
×
UNCOV
1079
            TAOS_CHECK_EXIT(terrno);
×
1080
          }
1081
        } else {
UNCOV
1082
          TAOS_CHECK_EXIT(TSDB_CODE_MND_COLUMN_NOT_EXIST);
×
1083
        }
1084
      }
1085
    }
1086
  }
1087
_exit:
16,192✔
1088
  if (code != 0) {
16,192✔
UNCOV
1089
    mError("rsma:%s, failed at line %d to get rsma info since %s", pObj->name, lino, tstrerror(code));
×
1090
  }
1091
  TAOS_RETURN(code);
16,192✔
1092
}
1093
#endif
1094
static int32_t mndProcessGetRsmaReq(SRpcMsg *pReq) {
21,120✔
1095
#ifdef TD_ENTERPRISE
1096
  int32_t      code = 0, lino = 0;
21,120✔
1097
  SMnode      *pMnode = pReq->info.node;
21,120✔
1098
  SRsmaInfoReq req = {0};
21,120✔
1099
  SRsmaInfoRsp rsp = {0};
21,120✔
1100
  SRsmaObj    *pObj = NULL;
21,120✔
1101
  SStbObj     *pStb = NULL;
21,120✔
1102
  void        *pRsp = NULL;
21,120✔
1103
  int32_t      contLen = 0;
21,120✔
1104

1105
  TAOS_CHECK_EXIT(tDeserializeRsmaInfoReq(pReq->pCont, pReq->contLen, &req));
21,120✔
1106

1107
  if (!(pObj = mndAcquireRsma(pMnode, req.name))) {
21,120✔
1108
    TAOS_CHECK_EXIT(terrno);
4,928✔
1109
  }
1110

1111
  char tbFName[TSDB_TABLE_FNAME_LEN] = {0};
16,192✔
1112
  (void)snprintf(tbFName, sizeof(tbFName), "%s.%s", pObj->dbFName, pObj->tbName);
16,192✔
1113

1114
  if ((pStb = mndAcquireStb(pMnode, tbFName)) == NULL) {
16,192✔
UNCOV
1115
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STB_NOT_EXIST);
×
1116
  }
1117

1118
  TAOS_CHECK_EXIT(mndFillRsmaInfo(pObj, pStb, &rsp, req.withColName));
16,192✔
1119

1120
  if ((contLen = tSerializeRsmaInfoRsp(NULL, 0, &rsp)) < 0) {
16,192✔
UNCOV
1121
    TAOS_CHECK_EXIT(contLen);
×
1122
  }
1123
  if (!(pRsp = rpcMallocCont(contLen))) {
16,192✔
UNCOV
1124
    TAOS_CHECK_EXIT(terrno);
×
1125
  }
1126
  if ((contLen = tSerializeRsmaInfoRsp(pRsp, contLen, &rsp)) < 0) {
16,192✔
UNCOV
1127
    TAOS_CHECK_EXIT(contLen);
×
1128
  }
1129

1130
  pReq->info.rsp = pRsp;
16,192✔
1131
  pReq->info.rspLen = contLen;
16,192✔
1132

1133
_exit:
21,120✔
1134
  if (code != 0) {
21,120✔
1135
    rpcFreeCont(pRsp);
4,928✔
1136
  }
1137
  if (pObj) mndReleaseRsma(pMnode, pObj);
21,120✔
1138
  if (pStb) mndReleaseStb(pMnode, pStb);
21,120✔
1139
  tFreeRsmaInfoRsp(&rsp, false);
21,120✔
1140
  TAOS_RETURN(code);
21,120✔
1141
#else
1142
  return TSDB_CODE_OPS_NOT_SUPPORT;
1143
#endif
1144
}
1145
#ifdef TD_ENTERPRISE
1146
static void mndRetrieveRsmaFuncList(SMnode *pMnode, SRsmaObj *pObj, char *buf, int32_t bufLen) {
22,528✔
1147
  SSdb    *pSdb = pMnode->pSdb;
22,528✔
1148
  int32_t  numOfRows = 0;
22,528✔
1149
  SStbObj *pStb = NULL;
22,528✔
1150
  char    *qBuf = POINTER_SHIFT(buf, VARSTR_HEADER_SIZE);
22,528✔
1151
  int32_t  qBufLen = bufLen - VARSTR_HEADER_SIZE;
22,528✔
1152

1153
  qBuf[0] = 0;
22,528✔
1154
  varDataSetLen(buf, 0);  // initialize to empty string
22,528✔
1155

1156
  if (pObj->nFuncs <= 0) return;
22,528✔
1157

1158
  char tbFName[TSDB_TABLE_FNAME_LEN] = {0};
22,528✔
1159
  (void)snprintf(tbFName, sizeof(tbFName), "%s.%s", pObj->dbFName, pObj->tbName);
22,528✔
1160
  pStb = mndAcquireStb(pMnode, tbFName);
22,528✔
1161
  if (pStb == NULL) {
22,528✔
UNCOV
1162
    mWarn("rsma:%s, failed to acquire table %s for function list", pObj->name, tbFName);
×
UNCOV
1163
    return;
×
1164
  }
1165

1166
  SSchema *pColumns = pStb->pColumns;
22,528✔
1167

1168
  int32_t len = 0, j = 0;
22,528✔
1169
  char    colFunc[TSDB_COL_NAME_LEN + TSDB_FUNC_NAME_LEN + 2] = {0};
22,528✔
1170
  for (int32_t i = 0; i < pObj->nFuncs; ++i) {
161,216✔
1171
    col_id_t colId = pObj->funcColIds[i];
138,688✔
1172
    for (; j < pStb->numOfColumns;) {
287,232✔
1173
      if (pColumns[j].colId == colId) {
287,232✔
1174
        int32_t colFuncLen =
138,688✔
1175
            tsnprintf(colFunc, sizeof(colFunc), "%s(%s),", fmGetFuncName(pObj->funcIds[i]), pColumns[j].name);
138,688✔
1176
        if ((qBufLen - len) > colFuncLen) {
138,688✔
1177
          len += tsnprintf(qBuf + len, colFuncLen + 1, "%s", colFunc);
138,688✔
1178
        } else {
UNCOV
1179
          goto _exit;
×
1180
        }
1181
        break;
138,688✔
1182
      } else if (pColumns[j].colId > colId) {
148,544✔
UNCOV
1183
        break;
×
1184
      } else {
1185
        ++j;
148,544✔
1186
      }
1187
    }
1188
  }
1189
_exit:
22,528✔
1190
  qBuf[len > 0 ? len - 1 : 0] = 0;  // remove the last ','
22,528✔
1191
  varDataSetLen(buf, len > 0 ? len - 1 : 0);
22,528✔
1192
  mndReleaseStb(pMnode, pStb);
22,528✔
1193
}
1194
#endif
1195
static int32_t mndRetrieveRsma(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
15,488✔
1196
  SMnode          *pMnode = pReq->info.node;
15,488✔
1197
  int32_t          code = 0, lino = 0;
15,488✔
1198
  int32_t          numOfRows = 0;
15,488✔
1199
  int32_t          cols = 0;
15,488✔
1200
  char             tmp[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE];
15,488✔
1201
  int32_t          tmpLen = 0;
15,488✔
1202
  int32_t          bufLen = 0;
15,488✔
1203
  char            *pBuf = NULL;
15,488✔
1204
  char            *qBuf = NULL;
15,488✔
1205
  void            *pIter = NULL;
15,488✔
1206
  SSdb            *pSdb = pMnode->pSdb;
15,488✔
1207
  SColumnInfoData *pColInfo = NULL;
15,488✔
1208
  SUserObj        *pUser = NULL;
15,488✔
1209
  char             objFName[TSDB_OBJ_FNAME_LEN + 1] = {0};
15,488✔
1210
  bool             showAll = false;
15,488✔
1211

1212
#ifdef TD_ENTERPRISE
1213
  pBuf = tmp;
15,488✔
1214
  bufLen = sizeof(tmp) - VARSTR_HEADER_SIZE;
15,488✔
1215
  if (pShow->numOfRows < 1) {
15,488✔
1216
    TAOS_CHECK_EXIT(mndAcquireUser(pMnode, (RPC_MSG_USER(pReq)), &pUser));
15,488✔
1217
    (void)snprintf(objFName, sizeof(objFName), "%d.*", pUser->acctId);
15,488✔
1218
    int32_t objLevel = privObjGetLevel(PRIV_OBJ_RSMA);
15,488✔
1219
    showAll = (0 == mndCheckSysObjPrivilege(pMnode, pUser, PRIV_CM_SHOW, PRIV_OBJ_RSMA, 0, objFName,
15,488✔
1220
                                            objLevel == 0 ? NULL : "*"));  // 1.*.*
1221

1222
    SRsmaObj *pObj = NULL;
15,488✔
1223
    int32_t   index = 0;
15,488✔
1224
    while ((pIter = sdbFetch(pSdb, SDB_RSMA, pIter, (void **)&pObj))) {
38,016✔
1225
      if (!showAll) {
22,528✔
UNCOV
1226
        if (mndCheckObjPrivilegeRecF(pMnode, pUser, PRIV_CM_SHOW, PRIV_OBJ_RSMA, pObj->ownerId, pObj->dbFName,
×
1227
                                     objLevel == 0 ? NULL : pObj->name)) {  // 1.db1.rsma1
×
1228
          sdbRelease(pSdb, pObj);
×
1229
          continue;
×
1230
        }
1231
      }
1232

1233
      cols = 0;
22,528✔
1234
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols);
22,528✔
1235
      qBuf = POINTER_SHIFT(pBuf, VARSTR_HEADER_SIZE);
22,528✔
1236
      TAOS_UNUSED(snprintf(qBuf, bufLen, "%s", pObj->name));
22,528✔
1237
      varDataSetLen(pBuf, strlen(pBuf + VARSTR_HEADER_SIZE));
22,528✔
1238
      COL_DATA_SET_VAL_GOTO(pBuf, false, pObj, pIter, _exit);
22,528✔
1239

1240
      if ((pColInfo = taosArrayGet(pBlock->pDataBlock, ++cols))) {
22,528✔
1241
        COL_DATA_SET_VAL_GOTO((const char *)(&pObj->uid), false, pObj, pIter, _exit);
22,528✔
1242
      }
1243

1244
      if ((pColInfo = taosArrayGet(pBlock->pDataBlock, ++cols))) {
22,528✔
1245
        qBuf = POINTER_SHIFT(pBuf, VARSTR_HEADER_SIZE);
22,528✔
1246
        const char *db = strchr(pObj->dbFName, '.');
22,528✔
1247
        TAOS_UNUSED(snprintf(qBuf, bufLen, "%s", db ? db + 1 : pObj->dbFName));
22,528✔
1248
        varDataSetLen(pBuf, strlen(qBuf));
22,528✔
1249
        COL_DATA_SET_VAL_GOTO(pBuf, false, pObj, pIter, _exit);
22,528✔
1250
      }
1251

1252
      if ((pColInfo = taosArrayGet(pBlock->pDataBlock, ++cols))) {
22,528✔
1253
        qBuf = POINTER_SHIFT(pBuf, VARSTR_HEADER_SIZE);
22,528✔
1254
        TAOS_UNUSED(snprintf(qBuf, bufLen, "%s", pObj->tbName));
22,528✔
1255
        varDataSetLen(pBuf, strlen(qBuf));
22,528✔
1256
        COL_DATA_SET_VAL_GOTO(pBuf, false, pObj, pIter, _exit);
22,528✔
1257
      }
1258

1259
      if ((pColInfo = taosArrayGet(pBlock->pDataBlock, ++cols))) {
22,528✔
1260
        qBuf = POINTER_SHIFT(pBuf, VARSTR_HEADER_SIZE);
22,528✔
1261
        if (pObj->tbType == TSDB_SUPER_TABLE) {
22,528✔
1262
          TAOS_UNUSED(snprintf(qBuf, bufLen, "SUPER_TABLE"));
22,528✔
UNCOV
1263
        } else if (pObj->tbType == TSDB_NORMAL_TABLE) {
×
UNCOV
1264
          TAOS_UNUSED(snprintf(qBuf, bufLen, "NORMAL_TABLE"));
×
UNCOV
1265
        } else if (pObj->tbType == TSDB_CHILD_TABLE) {
×
UNCOV
1266
          TAOS_UNUSED(snprintf(qBuf, bufLen, "CHILD_TABLE"));
×
1267
        } else {
1268
          TAOS_UNUSED(snprintf(qBuf, bufLen, "UNKNOWN"));
×
1269
        }
1270
        varDataSetLen(pBuf, strlen(qBuf));
22,528✔
1271
        COL_DATA_SET_VAL_GOTO(pBuf, false, pObj, pIter, _exit);
22,528✔
1272
      }
1273

1274
      if ((pColInfo = taosArrayGet(pBlock->pDataBlock, ++cols))) {
22,528✔
1275
        COL_DATA_SET_VAL_GOTO((const char *)&pObj->createdTime, false, pObj, pIter, _exit);
22,528✔
1276
      }
1277

1278
      if ((pColInfo = taosArrayGet(pBlock->pDataBlock, ++cols))) {
22,528✔
1279
        qBuf = POINTER_SHIFT(pBuf, VARSTR_HEADER_SIZE);
22,528✔
1280
        TAOS_UNUSED(snprintf(qBuf, bufLen, "%" PRIi64 "%c", pObj->interval[0], pObj->intervalUnit));
22,528✔
1281
        if (pObj->interval[1] > 0) {
22,528✔
1282
          tmpLen = strlen(qBuf);
22,528✔
1283
          TAOS_UNUSED(
22,528✔
1284
              snprintf(qBuf + tmpLen, bufLen - tmpLen, ",%" PRIi64 "%c", pObj->interval[1], pObj->intervalUnit));
1285
        }
1286
        varDataSetLen(pBuf, strlen(qBuf));
22,528✔
1287
        COL_DATA_SET_VAL_GOTO(pBuf, false, pObj, pIter, _exit);
22,528✔
1288
      }
1289

1290
      if ((pColInfo = taosArrayGet(pBlock->pDataBlock, ++cols))) {
22,528✔
1291
        mndRetrieveRsmaFuncList(pMnode, pObj, pBuf, bufLen);
22,528✔
1292
        COL_DATA_SET_VAL_GOTO(pBuf, false, pObj, pIter, _exit);
22,528✔
1293
      }
1294

1295
      sdbRelease(pSdb, pObj);
22,528✔
1296
      ++numOfRows;
22,528✔
1297
    }
1298
  }
1299

1300
  pShow->numOfRows += numOfRows;
15,488✔
1301

1302
_exit:
15,488✔
1303
  if (pUser) mndReleaseUser(pMnode, pUser);
15,488✔
1304
  if (code < 0) {
15,488✔
UNCOV
1305
    mError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
UNCOV
1306
    TAOS_RETURN(code);
×
1307
  }
1308
#endif
1309
  return numOfRows;
15,488✔
1310
}
1311

1312
static void mndCancelRetrieveRsma(SMnode *pMnode, void *pIter) {
×
1313
  SSdb *pSdb = pMnode->pSdb;
×
UNCOV
1314
  sdbCancelFetchByType(pSdb, pIter, SDB_RSMA);
×
UNCOV
1315
}
×
1316

1317
int32_t mndDropRsmasByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
581,474✔
1318
  int32_t code = 0;
581,474✔
1319
#ifdef TD_ENTERPRISE
1320
  SSdb     *pSdb = pMnode->pSdb;
581,474✔
1321
  SRsmaObj *pObj = NULL;
581,474✔
1322
  void     *pIter = NULL;
581,474✔
1323

1324
  while ((pIter = sdbFetch(pSdb, SDB_RSMA, pIter, (void **)&pObj))) {
588,514✔
1325
    if (pObj->dbUid == pDb->uid) {
7,040✔
1326
      if ((code = mndSetDropRsmaCommitLogs(pMnode, pTrans, pObj)) != 0) {
7,040✔
UNCOV
1327
        sdbCancelFetch(pSdb, pIter);
×
UNCOV
1328
        sdbRelease(pSdb, pObj);
×
UNCOV
1329
        TAOS_RETURN(code);
×
1330
      }
1331
    }
1332
    sdbRelease(pSdb, pObj);
7,040✔
1333
  }
1334
#endif
1335
  TAOS_RETURN(code);
581,474✔
1336
}
1337

1338
int32_t mndDropRsmaByStb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
423,202✔
1339
  int32_t code = 0;
423,202✔
1340
#ifdef TD_ENTERPRISE
1341
  SSdb     *pSdb = pMnode->pSdb;
423,202✔
1342
  SRsmaObj *pObj = NULL;
423,202✔
1343
  void     *pIter = NULL;
423,202✔
1344

1345
  while ((pIter = sdbFetch(pSdb, SDB_RSMA, pIter, (void **)&pObj))) {
423,906✔
1346
    if (pObj->tbUid == pStb->uid && pObj->dbUid == pStb->dbUid) {
704✔
1347
      if ((code = mndSetDropRsmaCommitLogs(pMnode, pTrans, pObj)) != 0) {
704✔
UNCOV
1348
        sdbCancelFetch(pSdb, pIter);
×
UNCOV
1349
        sdbRelease(pSdb, pObj);
×
UNCOV
1350
        TAOS_RETURN(code);
×
1351
      }
1352
    }
1353
    sdbRelease(pSdb, pObj);
704✔
1354
  }
1355
#endif
1356
  TAOS_RETURN(code);
423,202✔
1357
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc