• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4944

30 Jan 2026 06:19AM UTC coverage: 66.849% (+0.1%) from 66.718%
#4944

push

travis-ci

web-flow
merge: from main to 3.0 #34453

1124 of 2018 new or added lines in 72 files covered. (55.7%)

13677 existing lines in 155 files now uncovered.

205211 of 306978 relevant lines covered (66.85%)

125657591.7 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.49
/source/dnode/mnode/impl/src/mndRsma.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "audit.h"
18
#include "functionMgt.h"
19
#include "mndDb.h"
20
#include "mndDnode.h"
21
#include "mndInfoSchema.h"
22
#include "mndMnode.h"
23
#include "mndPrivilege.h"
24
#include "mndShow.h"
25
#include "mndSma.h"
26
#include "mndStb.h"
27
#include "mndTrans.h"
28
#include "mndUser.h"
29
#include "mndVgroup.h"
30
#include "parser.h"
31
#include "tname.h"
32

33
#define MND_RSMA_VER_NUMBER   1
34
#define MND_RSMA_RESERVE_SIZE 64
35

36
static SSdbRaw *mndRsmaActionEncode(SRsmaObj *pSma);
37
static SSdbRow *mndRsmaActionDecode(SSdbRaw *pRaw);
38
static int32_t  mndRsmaActionInsert(SSdb *pSdb, SRsmaObj *pSma);
39
static int32_t  mndRsmaActionDelete(SSdb *pSdb, SRsmaObj *pSpSmatb);
40
static int32_t  mndRsmaActionUpdate(SSdb *pSdb, SRsmaObj *pOld, SRsmaObj *pNew);
41
static int32_t  mndProcessCreateRsmaReq(SRpcMsg *pReq);
42
static int32_t  mndProcessDropRsmaReq(SRpcMsg *pReq);
43
static int32_t  mndProcessAlterRsmaReq(SRpcMsg *pReq);
44
static int32_t  mndProcessGetRsmaReq(SRpcMsg *pReq);
45

46
static int32_t mndRetrieveRsma(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
47
static void    mndCancelRetrieveRsma(SMnode *pMnode, void *pIter);
48
static int32_t mndRetrieveRsmaTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
49
static void    mndCancelRetrieveRsmaTask(SMnode *pMnode, void *pIter);
50

51
int32_t mndInitRsma(SMnode *pMnode) {
414,868✔
52
  SSdbTable table = {
414,868✔
53
      .sdbType = SDB_RSMA,
54
      .keyType = SDB_KEY_BINARY,
55
      .encodeFp = (SdbEncodeFp)mndRsmaActionEncode,
56
      .decodeFp = (SdbDecodeFp)mndRsmaActionDecode,
57
      .insertFp = (SdbInsertFp)mndRsmaActionInsert,
58
      .updateFp = (SdbUpdateFp)mndRsmaActionUpdate,
59
      .deleteFp = (SdbDeleteFp)mndRsmaActionDelete,
60
  };
61

62
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_RSMA, mndProcessCreateRsmaReq);
414,868✔
63
  mndSetMsgHandle(pMnode, TDMT_VND_CREATE_RSMA_RSP, mndTransProcessRsp);
414,868✔
64
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_RSMA, mndProcessDropRsmaReq);
414,868✔
65
  mndSetMsgHandle(pMnode, TDMT_VND_DROP_RSMA_RSP, mndTransProcessRsp);
414,868✔
66
  mndSetMsgHandle(pMnode, TDMT_MND_ALTER_RSMA, mndProcessAlterRsmaReq);
414,868✔
67
  mndSetMsgHandle(pMnode, TDMT_VND_ALTER_RSMA_RSP, mndTransProcessRsp);
414,868✔
68
  mndSetMsgHandle(pMnode, TDMT_MND_GET_RSMA, mndProcessGetRsmaReq);
414,868✔
69
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_RSMA, mndRetrieveRsma);
414,868✔
70
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_RSMA, mndCancelRetrieveRsma);
414,868✔
71

72
  return sdbSetTable(pMnode->pSdb, table);
414,868✔
73
}
74

75
void mndCleanupRsma(SMnode *pMnode) {}
414,808✔
76

77
void mndRsmaFreeObj(SRsmaObj *pObj) {
63,468✔
78
  if (pObj) {
63,468✔
79
    taosMemoryFreeClear(pObj->funcColIds);
63,468✔
80
    taosMemoryFreeClear(pObj->funcIds);
63,468✔
81
  }
82
}
63,468✔
83

84
static int32_t tSerializeSRsmaObj(void *buf, int32_t bufLen, const SRsmaObj *pObj) {
97,416✔
85
  int32_t  code = 0, lino = 0;
97,416✔
86
  int32_t  tlen = 0;
97,416✔
87
  SEncoder encoder = {0};
97,416✔
88
  tEncoderInit(&encoder, buf, bufLen);
97,416✔
89

90
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
97,416✔
91

92
  TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pObj->name));
194,832✔
93
  TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pObj->tbName));
194,832✔
94
  TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pObj->dbFName));
194,832✔
95
  TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pObj->createUser));
194,832✔
96
  TAOS_CHECK_EXIT(tEncodeI64v(&encoder, pObj->createdTime));
194,832✔
97
  TAOS_CHECK_EXIT(tEncodeI64v(&encoder, pObj->updateTime));
194,832✔
98
  TAOS_CHECK_EXIT(tEncodeI64v(&encoder, pObj->uid));
194,832✔
99
  TAOS_CHECK_EXIT(tEncodeI64v(&encoder, pObj->tbUid));
194,832✔
100
  TAOS_CHECK_EXIT(tEncodeI64v(&encoder, pObj->dbUid));
194,832✔
101
  TAOS_CHECK_EXIT(tEncodeI64v(&encoder, pObj->interval[0]));
194,832✔
102
  TAOS_CHECK_EXIT(tEncodeI64v(&encoder, pObj->interval[1]));
194,832✔
103
  TAOS_CHECK_EXIT(tEncodeU64v(&encoder, pObj->reserved));
194,832✔
104
  TAOS_CHECK_EXIT(tEncodeI32v(&encoder, pObj->version));
194,832✔
105
  TAOS_CHECK_EXIT(tEncodeI8(&encoder, pObj->tbType));
194,832✔
106
  TAOS_CHECK_EXIT(tEncodeI8(&encoder, pObj->intervalUnit));
194,832✔
107
  TAOS_CHECK_EXIT(tEncodeI16v(&encoder, pObj->nFuncs));
194,832✔
108
  for (int16_t i = 0; i < pObj->nFuncs; ++i) {
643,536✔
109
    TAOS_CHECK_EXIT(tEncodeI16v(&encoder, pObj->funcColIds[i]));
1,092,240✔
110
    TAOS_CHECK_EXIT(tEncodeI32v(&encoder, pObj->funcIds[i]));
1,092,240✔
111
  }
112
  TAOS_CHECK_EXIT(tEncodeI64v(&encoder, pObj->ownerId));
194,832✔
113

114
  tEndEncode(&encoder);
97,416✔
115

116
  tlen = encoder.pos;
97,416✔
117
_exit:
97,416✔
118
  tEncoderClear(&encoder);
97,416✔
119
  if (code < 0) {
97,416✔
120
    mError("rsma, %s failed at line %d since %s", __func__, lino, tstrerror(code));
×
121
    TAOS_RETURN(code);
×
122
  }
123

124
  return tlen;
97,416✔
125
}
126

127
static int32_t tDeserializeSRsmaObj(void *buf, int32_t bufLen, SRsmaObj *pObj) {
46,494✔
128
  int32_t  code = 0, lino = 0;
46,494✔
129
  SDecoder decoder = {0};
46,494✔
130
  tDecoderInit(&decoder, buf, bufLen);
46,494✔
131

132
  TAOS_CHECK_EXIT(tStartDecode(&decoder));
46,494✔
133

134
  TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pObj->name));
46,494✔
135
  TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pObj->tbName));
46,494✔
136
  TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pObj->dbFName));
46,494✔
137
  TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pObj->createUser));
46,494✔
138
  TAOS_CHECK_EXIT(tDecodeI64v(&decoder, &pObj->createdTime));
92,988✔
139
  TAOS_CHECK_EXIT(tDecodeI64v(&decoder, &pObj->updateTime));
92,988✔
140
  TAOS_CHECK_EXIT(tDecodeI64v(&decoder, &pObj->uid));
92,988✔
141
  TAOS_CHECK_EXIT(tDecodeI64v(&decoder, &pObj->tbUid));
92,988✔
142
  TAOS_CHECK_EXIT(tDecodeI64v(&decoder, &pObj->dbUid));
92,988✔
143
  TAOS_CHECK_EXIT(tDecodeI64v(&decoder, &pObj->interval[0]));
92,988✔
144
  TAOS_CHECK_EXIT(tDecodeI64v(&decoder, &pObj->interval[1]));
92,988✔
145
  TAOS_CHECK_EXIT(tDecodeU64v(&decoder, &pObj->reserved));
92,988✔
146
  TAOS_CHECK_EXIT(tDecodeI32v(&decoder, &pObj->version));
92,988✔
147
  TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pObj->tbType));
92,988✔
148
  TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pObj->intervalUnit));
92,988✔
149
  TAOS_CHECK_EXIT(tDecodeI16v(&decoder, &pObj->nFuncs));
92,988✔
150
  if (pObj->nFuncs > 0) {
46,494✔
151
    if (!(pObj->funcColIds = taosMemoryMalloc(sizeof(col_id_t) * pObj->nFuncs))) {
46,494✔
152
      TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
×
153
    }
154
    if (!(pObj->funcIds = taosMemoryMalloc(sizeof(int32_t) * pObj->nFuncs))) {
46,494✔
155
      TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
×
156
    }
157
    for (int16_t i = 0; i < pObj->nFuncs; ++i) {
309,960✔
158
      TAOS_CHECK_EXIT(tDecodeI16v(&decoder, &pObj->funcColIds[i]));
526,932✔
159
      TAOS_CHECK_EXIT(tDecodeI32v(&decoder, &pObj->funcIds[i]));
526,932✔
160
    }
161
  }
162
  if (!tDecodeIsEnd(&decoder)) {
46,494✔
163
    TAOS_CHECK_EXIT(tDecodeI64v(&decoder, &pObj->ownerId));
92,988✔
164
  }
165

166
_exit:
46,494✔
167
  tEndDecode(&decoder);
46,494✔
168
  tDecoderClear(&decoder);
46,494✔
169
  if (code < 0) {
46,494✔
170
    mError("rsma, %s failed at line %d since %s, row:%p", __func__, lino, tstrerror(code), pObj);
×
171
  }
172
  TAOS_RETURN(code);
46,494✔
173
}
174

175
static SSdbRaw *mndRsmaActionEncode(SRsmaObj *pObj) {
48,708✔
176
  int32_t  code = 0, lino = 0;
48,708✔
177
  void    *buf = NULL;
48,708✔
178
  SSdbRaw *pRaw = NULL;
48,708✔
179
  int32_t  tlen = tSerializeSRsmaObj(NULL, 0, pObj);
48,708✔
180
  if (tlen < 0) {
48,708✔
181
    TAOS_CHECK_EXIT(tlen);
×
182
  }
183

184
  int32_t size = sizeof(int32_t) + tlen;
48,708✔
185
  pRaw = sdbAllocRaw(SDB_RSMA, MND_RSMA_VER_NUMBER, size);
48,708✔
186
  if (pRaw == NULL) {
48,708✔
187
    TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
×
188
  }
189

190
  buf = taosMemoryMalloc(tlen);
48,708✔
191
  if (buf == NULL) {
48,708✔
192
    TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
×
193
  }
194

195
  tlen = tSerializeSRsmaObj(buf, tlen, pObj);
48,708✔
196
  if (tlen < 0) {
48,708✔
197
    TAOS_CHECK_EXIT(tlen);
×
198
  }
199

200
  int32_t dataPos = 0;
48,708✔
201
  SDB_SET_INT32(pRaw, dataPos, tlen, _exit);
48,708✔
202
  SDB_SET_BINARY(pRaw, dataPos, buf, tlen, _exit);
48,708✔
203
  SDB_SET_DATALEN(pRaw, dataPos, _exit);
48,708✔
204

205
_exit:
48,708✔
206
  taosMemoryFreeClear(buf);
48,708✔
207
  if (code != TSDB_CODE_SUCCESS) {
48,708✔
208
    terrno = code;
×
209
    mError("rsma, failed at line %d to encode to raw:%p since %s", lino, pRaw, tstrerror(code));
×
210
    sdbFreeRaw(pRaw);
×
211
    return NULL;
×
212
  }
213

214
  mTrace("rsma, encode to raw:%p, row:%p", pRaw, pObj);
48,708✔
215
  return pRaw;
48,708✔
216
}
217

218
SSdbRow *mndRsmaActionDecode(SSdbRaw *pRaw) {
46,494✔
219
  int32_t   code = 0, lino = 0;
46,494✔
220
  SSdbRow  *pRow = NULL;
46,494✔
221
  SRsmaObj *pObj = NULL;
46,494✔
222
  void     *buf = NULL;
46,494✔
223

224
  int8_t sver = 0;
46,494✔
225
  TAOS_CHECK_EXIT(sdbGetRawSoftVer(pRaw, &sver));
46,494✔
226

227
  if (sver != MND_RSMA_VER_NUMBER) {
46,494✔
228
    mError("rsma read invalid ver, data ver: %d, curr ver: %d", sver, MND_RSMA_VER_NUMBER);
×
229
    TAOS_CHECK_EXIT(TSDB_CODE_SDB_INVALID_DATA_VER);
×
230
  }
231

232
  if (!(pRow = sdbAllocRow(sizeof(SRsmaObj)))) {
46,494✔
233
    TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
×
234
  }
235

236
  if (!(pObj = sdbGetRowObj(pRow))) {
46,494✔
237
    TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
×
238
  }
239

240
  int32_t tlen;
46,494✔
241
  int32_t dataPos = 0;
46,494✔
242
  SDB_GET_INT32(pRaw, dataPos, &tlen, _exit);
46,494✔
243
  buf = taosMemoryMalloc(tlen + 1);
46,494✔
244
  if (buf == NULL) {
46,494✔
245
    TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
×
246
  }
247
  SDB_GET_BINARY(pRaw, dataPos, buf, tlen, _exit);
46,494✔
248

249
  TAOS_CHECK_EXIT(tDeserializeSRsmaObj(buf, tlen, pObj));
46,494✔
250

251
  taosInitRWLatch(&pObj->lock);
46,494✔
252

253
_exit:
46,494✔
254
  taosMemoryFreeClear(buf);
46,494✔
255
  if (code != TSDB_CODE_SUCCESS) {
46,494✔
256
    terrno = code;
×
257
    mError("rsma, failed at line %d to decode from raw:%p since %s", lino, pRaw, tstrerror(code));
×
258
    mndRsmaFreeObj(pObj);
×
259
    taosMemoryFreeClear(pRow);
×
260
    return NULL;
×
261
  }
262
  mTrace("rsma, decode from raw:%p, row:%p", pRaw, pObj);
46,494✔
263
  return pRow;
46,494✔
264
}
265

266
static int32_t mndRsmaActionInsert(SSdb *pSdb, SRsmaObj *pObj) {
11,070✔
267
  mTrace("rsma:%s, perform insert action, row:%p", pObj->name, pObj);
11,070✔
268
  return 0;
11,070✔
269
}
270

271
static int32_t mndRsmaActionDelete(SSdb *pSdb, SRsmaObj *pObj) {
46,494✔
272
  mTrace("rsma:%s, perform delete action, row:%p", pObj->name, pObj);
46,494✔
273
  mndRsmaFreeObj(pObj);
46,494✔
274
  return 0;
46,494✔
275
}
276

277
static int32_t mndRsmaActionUpdate(SSdb *pSdb, SRsmaObj *pOld, SRsmaObj *pNew) {
25,092✔
278
  mTrace("rsma:%s, perform update action, old row:%p new row:%p", pOld->name, pOld, pNew);
25,092✔
279
  taosWLockLatch(&pOld->lock);
25,092✔
280
  pOld->updateTime = pNew->updateTime;
25,092✔
281
  pOld->nFuncs = pNew->nFuncs;
25,092✔
282
  pOld->ownerId = pNew->ownerId;
25,092✔
283
  TSWAP(pOld->funcColIds, pNew->funcColIds);
25,092✔
284
  TSWAP(pOld->funcIds, pNew->funcIds);
25,092✔
285
  taosWUnLockLatch(&pOld->lock);
25,092✔
286
  return 0;
25,092✔
287
}
288

289
SRsmaObj *mndAcquireRsma(SMnode *pMnode, char *name) {
48,708✔
290
  SSdb     *pSdb = pMnode->pSdb;
48,708✔
291
  SRsmaObj *pObj = sdbAcquire(pSdb, SDB_RSMA, name);
48,708✔
292
  if (pObj == NULL) {
48,708✔
293
    if (terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
19,926✔
294
      terrno = TSDB_CODE_RSMA_NOT_EXIST;
19,926✔
295
    } else if (terrno == TSDB_CODE_SDB_OBJ_CREATING) {
×
296
      terrno = TSDB_CODE_MND_RSMA_IN_CREATING;
×
297
    } else if (terrno == TSDB_CODE_SDB_OBJ_DROPPING) {
×
298
      terrno = TSDB_CODE_MND_RSMA_IN_DROPPING;
×
299
    } else {
300
      terrno = TSDB_CODE_APP_ERROR;
×
301
      mFatal("rsma:%s, failed to acquire rsma since %s", name, terrstr());
×
302
    }
303
  }
304
  return pObj;
48,708✔
305
}
306

307
void mndReleaseRsma(SMnode *pMnode, SRsmaObj *pSma) {
28,782✔
308
  SSdb *pSdb = pMnode->pSdb;
28,782✔
309
  sdbRelease(pSdb, pSma);
28,782✔
310
}
28,782✔
311
#ifdef TD_ENTERPRISE
312
static int32_t mndSetCreateRsmaRedoLogs(SMnode *pMnode, STrans *pTrans, SRsmaObj *pSma) {
×
313
  int32_t  code = 0;
×
314
  SSdbRaw *pRedoRaw = mndRsmaActionEncode(pSma);
×
315
  if (pRedoRaw == NULL) {
×
316
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
317
    if (terrno != 0) code = terrno;
×
318
    TAOS_RETURN(code);
×
319
  }
320
  TAOS_CHECK_RETURN(mndTransAppendRedolog(pTrans, pRedoRaw));
×
321
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING));
×
322

323
  TAOS_RETURN(code);
×
324
}
325

326
static int32_t mndSetCreateRsmaUndoLogs(SMnode *pMnode, STrans *pTrans, SRsmaObj *pSma) {
×
327
  int32_t  code = 0;
×
328
  SSdbRaw *pUndoRaw = mndRsmaActionEncode(pSma);
×
329
  if (!pUndoRaw) {
×
330
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
331
    if (terrno != 0) code = terrno;
×
332
    TAOS_RETURN(code);
×
333
  }
334
  TAOS_CHECK_RETURN(mndTransAppendUndolog(pTrans, pUndoRaw));
×
335
  TAOS_CHECK_RETURN(sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED));
×
336
  TAOS_RETURN(code);
×
337
}
338

339
static int32_t mndSetCreateRsmaPrepareActions(SMnode *pMnode, STrans *pTrans, SRsmaObj *pSma) {
11,070✔
340
  SSdbRaw *pDbRaw = mndRsmaActionEncode(pSma);
11,070✔
341
  if (pDbRaw == NULL) return -1;
11,070✔
342

343
  if (mndTransAppendPrepareLog(pTrans, pDbRaw) != 0) return -1;
11,070✔
344
  if (sdbSetRawStatus(pDbRaw, SDB_STATUS_CREATING) != 0) return -1;
11,070✔
345
  return 0;
11,070✔
346
}
347

348
static void *mndBuildVCreateRsmaReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, SRsmaObj *pObj,
22,140✔
349
                                    SMCreateRsmaReq *pCreate, int32_t *pContLen) {
350
  int32_t         code = 0, lino = 0;
22,140✔
351
  SMsgHead       *pHead = NULL;
22,140✔
352
  SVCreateRsmaReq req = *pCreate;
22,140✔
353

354
  req.uid = pObj->uid;  // use the uid generated by mnode
22,140✔
355

356
  int32_t contLen = tSerializeSVCreateRsmaReq(NULL, 0, &req);
22,140✔
357
  TAOS_CHECK_EXIT(contLen);
22,140✔
358
  contLen += sizeof(SMsgHead);
22,140✔
359
  TSDB_CHECK_NULL((pHead = taosMemoryMalloc(contLen)), code, lino, _exit, terrno);
22,140✔
360
  pHead->contLen = htonl(contLen);
22,140✔
361
  pHead->vgId = htonl(pVgroup->vgId);
22,140✔
362
  void *pBuf = POINTER_SHIFT(pHead, sizeof(SMsgHead));
22,140✔
363
  TAOS_CHECK_EXIT(tSerializeSVCreateRsmaReq(pBuf, contLen, &req));
22,140✔
364
_exit:
22,140✔
365
  if (code < 0) {
22,140✔
366
    taosMemoryFreeClear(pHead);
×
367
    terrno = code;
×
368
    *pContLen = 0;
×
369
    return NULL;
×
370
  }
371
  *pContLen = contLen;
22,140✔
372
  return pHead;
22,140✔
373
}
374

375
static int32_t mndSetCreateRsmaRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb, SRsmaObj *pObj,
11,070✔
376
                                           SMCreateRsmaReq *pCreate) {
377
  int32_t code = 0;
11,070✔
378
  SSdb   *pSdb = pMnode->pSdb;
11,070✔
379
  SVgObj *pVgroup = NULL;
11,070✔
380
  void   *pIter = NULL;
11,070✔
381

382
  SName name = {0};
11,070✔
383
  if ((code = tNameFromString(&name, pCreate->tbFName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE)) != 0) {
11,070✔
384
    return code;
×
385
  }
386
  tstrncpy(pCreate->tbFName, (char *)tNameGetTableName(&name), sizeof(pCreate->tbFName));  // convert tbFName to tbName
11,070✔
387

388
  while ((pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup))) {
55,350✔
389
    if (!mndVgroupInDb(pVgroup, pDb->uid)) {
44,280✔
390
      sdbRelease(pSdb, pVgroup);
22,140✔
391
      continue;
22,140✔
392
    }
393

394
    int32_t contLen = 0;
22,140✔
395
    void   *pReq = mndBuildVCreateRsmaReq(pMnode, pVgroup, pStb, pObj, pCreate, &contLen);
22,140✔
396
    if (pReq == NULL) {
22,140✔
397
      sdbCancelFetch(pSdb, pIter);
×
398
      sdbRelease(pSdb, pVgroup);
×
399
      code = terrno ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
×
400
      TAOS_RETURN(code);
×
401
    }
402

403
    STransAction action = {0};
22,140✔
404
    action.mTraceId = pTrans->mTraceId;
22,140✔
405
    action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
22,140✔
406
    action.pCont = pReq;
22,140✔
407
    action.contLen = contLen;
22,140✔
408
    action.msgType = TDMT_VND_CREATE_RSMA;
22,140✔
409
    action.acceptableCode = TSDB_CODE_RSMA_ALREADY_EXISTS;  // check whether the rsma uid exist
22,140✔
410
    action.retryCode = TSDB_CODE_TDB_STB_NOT_EXIST;         // retry if relative table not exist
22,140✔
411
    if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
22,140✔
412
      taosMemoryFree(pReq);
×
413
      sdbCancelFetch(pSdb, pIter);
×
414
      sdbRelease(pSdb, pVgroup);
×
415
      TAOS_RETURN(code);
×
416
    }
417
    sdbRelease(pSdb, pVgroup);
22,140✔
418
  }
419

420
  TAOS_RETURN(code);
11,070✔
421
}
422

423
static int32_t mndSetCreateRsmaCommitLogs(SMnode *pMnode, STrans *pTrans, SRsmaObj *pSma) {
16,974✔
424
  int32_t  code = 0;
16,974✔
425
  SSdbRaw *pCommitRaw = mndRsmaActionEncode(pSma);
16,974✔
426
  if (pCommitRaw == NULL) {
16,974✔
427
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
428
    if (terrno != 0) code = terrno;
×
429
    TAOS_RETURN(code);
×
430
  }
431
  TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pCommitRaw));
16,974✔
432
  TAOS_CHECK_RETURN(sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY));
16,974✔
433

434
  TAOS_RETURN(code);
16,974✔
435
}
436

437
static int32_t mndSetDropRsmaPrepareLogs(SMnode *pMnode, STrans *pTrans, SRsmaObj *pSma) {
2,214✔
438
  int32_t  code = 0;
2,214✔
439
  SSdbRaw *pRedoRaw = mndRsmaActionEncode(pSma);
2,214✔
440
  if (pRedoRaw == NULL) {
2,214✔
441
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
442
    if (terrno != 0) code = terrno;
×
443
    return -1;
×
444
  }
445
  TAOS_CHECK_RETURN(mndTransAppendPrepareLog(pTrans, pRedoRaw));
2,214✔
446
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING));
2,214✔
447

448
  return 0;
2,214✔
449
}
450

451
static int32_t mndSetDropRsmaCommitLogs(SMnode *pMnode, STrans *pTrans, SRsmaObj *pSma) {
10,332✔
452
  int32_t  code = 0;
10,332✔
453
  SSdbRaw *pCommitRaw = mndRsmaActionEncode(pSma);
10,332✔
454
  if (pCommitRaw == NULL) {
10,332✔
455
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
456
    if (terrno != 0) code = terrno;
×
457
    return -1;
×
458
  }
459
  TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pCommitRaw));
10,332✔
460
  TAOS_CHECK_RETURN(sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED));
10,332✔
461

462
  return 0;
10,332✔
463
}
464

465
static void *mndBuildVDropRsmaReq(SMnode *pMnode, SVgObj *pVgroup, SRsmaObj *pObj, int32_t *pContLen) {
4,428✔
466
  int32_t       code = 0, lino = 0;
4,428✔
467
  SMsgHead     *pHead = NULL;
4,428✔
468
  SVDropRsmaReq req = {0};
4,428✔
469

470
  (void)snprintf(req.tbName, sizeof(req.tbName), "%s", pObj->tbName);
4,428✔
471
  (void)snprintf(req.name, sizeof(req.name), "%s", pObj->name);
4,428✔
472
  req.tbType = pObj->tbType;
4,428✔
473
  req.uid = pObj->uid;
4,428✔
474
  req.tbUid = pObj->tbUid;
4,428✔
475

476
  int32_t contLen = tSerializeSVDropRsmaReq(NULL, 0, &req);
4,428✔
477
  TAOS_CHECK_EXIT(contLen);
4,428✔
478
  contLen += sizeof(SMsgHead);
4,428✔
479
  TSDB_CHECK_NULL((pHead = taosMemoryMalloc(contLen)), code, lino, _exit, terrno);
4,428✔
480
  pHead->contLen = htonl(contLen);
4,428✔
481
  pHead->vgId = htonl(pVgroup->vgId);
4,428✔
482
  void *pBuf = POINTER_SHIFT(pHead, sizeof(SMsgHead));
4,428✔
483
  TAOS_CHECK_EXIT(tSerializeSVDropRsmaReq(pBuf, contLen, &req));
4,428✔
484
_exit:
4,428✔
485
  if (code < 0) {
4,428✔
486
    taosMemoryFreeClear(pHead);
×
487
    terrno = code;
×
488
    *pContLen = 0;
×
489
    return NULL;
×
490
  }
491
  *pContLen = contLen;
4,428✔
492
  return pHead;
4,428✔
493
}
494

495
static int32_t mndSetDropRsmaRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SRsmaObj *pSma) {
2,214✔
496
  int32_t code = 0;
2,214✔
497
  SSdb   *pSdb = pMnode->pSdb;
2,214✔
498
  SVgObj *pVgroup = NULL;
2,214✔
499
  void   *pIter = NULL;
2,214✔
500

501
  while ((pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup))) {
11,070✔
502
    if (!mndVgroupInDb(pVgroup, pDb->uid)) {
8,856✔
503
      sdbRelease(pSdb, pVgroup);
4,428✔
504
      continue;
4,428✔
505
    }
506

507
    int32_t contLen = 0;
4,428✔
508
    void   *pReq = mndBuildVDropRsmaReq(pMnode, pVgroup, pSma, &contLen);
4,428✔
509
    if (pReq == NULL) {
4,428✔
510
      sdbCancelFetch(pSdb, pIter);
×
511
      sdbRelease(pSdb, pVgroup);
×
512
      code = terrno ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
×
513
      TAOS_RETURN(code);
×
514
    }
515

516
    STransAction action = {0};
4,428✔
517
    action.mTraceId = pTrans->mTraceId;
4,428✔
518
    action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
4,428✔
519
    action.pCont = pReq;
4,428✔
520
    action.contLen = contLen;
4,428✔
521
    action.msgType = TDMT_VND_DROP_RSMA;
4,428✔
522
    action.acceptableCode = TSDB_CODE_RSMA_NOT_EXIST;
4,428✔
523
    if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
4,428✔
524
      taosMemoryFree(pReq);
×
525
      sdbCancelFetch(pSdb, pIter);
×
526
      sdbRelease(pSdb, pVgroup);
×
527
      TAOS_RETURN(code);
×
528
    }
529
    sdbRelease(pSdb, pVgroup);
4,428✔
530
  }
531
  TAOS_RETURN(code);
2,214✔
532
}
533

534
static int32_t mndDropRsma(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SRsmaObj *pObj) {
2,214✔
535
  int32_t code = 0, lino = 0;
2,214✔
536

537
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, "drop-rsma");
2,214✔
538
  if (pTrans == NULL) {
2,214✔
539
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
540
    if (terrno != 0) code = terrno;
×
541
    goto _exit;
×
542
  }
543

544
  mInfo("trans:%d start to drop rsma:%s", pTrans->id, pObj->name);
2,214✔
545

546
  mndTransSetDbName(pTrans, pDb->name, pObj->name);
2,214✔
547
  mndTransSetKillMode(pTrans, TRN_KILL_MODE_SKIP);
2,214✔
548
  TAOS_CHECK_EXIT(mndTransCheckConflict(pMnode, pTrans));
2,214✔
549

550
  mndTransSetOper(pTrans, MND_OPER_DROP_RSMA);
2,214✔
551
  TAOS_CHECK_EXIT(mndSetDropRsmaPrepareLogs(pMnode, pTrans, pObj));
2,214✔
552
  TAOS_CHECK_EXIT(mndSetDropRsmaCommitLogs(pMnode, pTrans, pObj));
2,214✔
553
  TAOS_CHECK_EXIT(mndSetDropRsmaRedoActions(pMnode, pTrans, pDb, pObj));
2,214✔
554

555
  // int32_t rspLen = 0;
556
  // void   *pRsp = NULL;
557
  // TAOS_CHECK_EXIT(mndBuildDropRsmaRsp(pObj, &rspLen, &pRsp, false));
558
  // mndTransSetRpcRsp(pTrans, pRsp, rspLen);
559

560
  TAOS_CHECK_EXIT(mndTransPrepare(pMnode, pTrans));
2,214✔
561
_exit:
2,214✔
562
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
2,214✔
563
    mError("rsma:%s, failed to drop at line:%d since %s", pObj->name, lino, tstrerror(code));
×
564
  }
565
  mndTransDrop(pTrans);
2,214✔
566
  TAOS_RETURN(code);
2,214✔
567
}
568
#endif
569
static int32_t mndProcessDropRsmaReq(SRpcMsg *pReq) {
2,214✔
570
  SMnode *pMnode = pReq->info.node;
2,214✔
571
  int32_t code = 0, lino = 0;
2,214✔
572
#ifdef TD_ENTERPRISE
573
  SDbObj       *pDb = NULL;
2,214✔
574
  SRsmaObj     *pObj = NULL;
2,214✔
575
  SUserObj     *pUser = NULL;
2,214✔
576
  SMDropRsmaReq dropReq = {0};
2,214✔
577
  int64_t       tss = taosGetTimestampMs();
2,214✔
578

579
  TAOS_CHECK_GOTO(tDeserializeSMDropRsmaReq(pReq->pCont, pReq->contLen, &dropReq), NULL, _exit);
2,214✔
580

581
  mInfo("rsma:%s, start to drop", dropReq.name);
2,214✔
582

583
  pObj = mndAcquireRsma(pMnode, dropReq.name);
2,214✔
584
  if (pObj == NULL) {
2,214✔
585
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
586
    if (terrno != 0) code = terrno;
×
587
    if (dropReq.igNotExists) {
×
588
      code = 0;  // mndBuildDropMountRsp(pObj, &pReq->info.rspLen, &pReq->info.rsp, true);
×
589
    }
590
    goto _exit;
×
591
  }
592

593
  SName name = {0};
2,214✔
594
  TAOS_CHECK_EXIT(tNameFromString(&name, pObj->dbFName, T_NAME_ACCT | T_NAME_DB));
2,214✔
595
  if (!(pDb = mndAcquireDb(pMnode, pObj->dbFName))) {
2,214✔
596
    TAOS_CHECK_EXIT(TSDB_CODE_MND_DB_NOT_EXIST);
×
597
  }
598

599
  TAOS_CHECK_EXIT(mndAcquireUser(pMnode, RPC_MSG_USER(pReq), &pUser));
2,214✔
600

601
  // TAOS_CHECK_GOTO(mndCheckDbPrivilege(pMnode, RPC_MSG_USER(pReq), MND_OPER_WRITE_DB, pDb), NULL, _exit);
602
  TAOS_CHECK_EXIT(
2,214✔
603
      mndCheckObjPrivilegeRecF(pMnode, pUser, PRIV_CM_DROP, PRIV_OBJ_RSMA, pObj->ownerId, pObj->dbFName, pObj->name));
604

605
  code = mndDropRsma(pMnode, pReq, pDb, pObj);
2,214✔
606
  if (code == TSDB_CODE_SUCCESS) {
2,214✔
607
    code = TSDB_CODE_ACTION_IN_PROGRESS;
2,214✔
608
  }
609

610
  if (tsAuditLevel >= AUDIT_LEVEL_DATABASE) {
2,214✔
611
    int64_t tse = taosGetTimestampMs();
2,214✔
612
    double  duration = (double)(tse - tss);
2,214✔
613
    duration = duration / 1000;
2,214✔
614
    auditRecord(pReq, pMnode->clusterId, "dropRsma", dropReq.name, "", "", 0, duration, 0);
2,214✔
615
  }
616
_exit:
2,214✔
617
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
2,214✔
618
    mError("rsma:%s, failed at line %d to drop since %s", dropReq.name, lino, tstrerror(code));
×
619
  }
620

621
  mndReleaseDb(pMnode, pDb);
2,214✔
622
  mndReleaseRsma(pMnode, pObj);
2,214✔
623
  mndReleaseUser(pMnode, pUser);
2,214✔
624
#endif
625
  TAOS_RETURN(code);
2,214✔
626
}
627
#ifdef TD_ENTERPRISE
628
static int32_t mndCreateRsma(SMnode *pMnode, SRpcMsg *pReq, SUserObj *pUser, SDbObj *pDb, SStbObj *pStb,
11,070✔
629
                             SMCreateRsmaReq *pCreate) {
630
  int32_t  code = 0, lino = 0;
11,070✔
631
  SRsmaObj obj = {0};
11,070✔
632
  STrans  *pTrans = NULL;
11,070✔
633

634
  (void)snprintf(obj.name, TSDB_TABLE_NAME_LEN, "%s", pCreate->name);
11,070✔
635
  (void)snprintf(obj.dbFName, TSDB_DB_FNAME_LEN, "%s", pDb->name);
11,070✔
636

637
  const char *tbName = strrchr(pCreate->tbFName, '.');
11,070✔
638
  (void)snprintf(obj.tbName, TSDB_TABLE_NAME_LEN, "%s", tbName ? tbName + 1 : pCreate->tbFName);
11,070✔
639
  (void)snprintf(obj.createUser, TSDB_USER_LEN, "%s", pUser->user);
11,070✔
640
  obj.ownerId = pUser->uid;
11,070✔
641
  obj.createdTime = taosGetTimestampMs();
11,070✔
642
  obj.updateTime = obj.createdTime;
11,070✔
643
  obj.uid = mndGenerateUid(obj.name, strlen(obj.name));
11,070✔
644
  obj.tbUid = pCreate->tbUid;
11,070✔
645
  obj.dbUid = pDb->uid;
11,070✔
646
  obj.interval[0] = pCreate->interval[0];
11,070✔
647
  obj.interval[1] = pCreate->interval[1];
11,070✔
648
  obj.version = 1;
11,070✔
649
  obj.tbType = pCreate->tbType;  // ETableType: 1 stable. Only super table supported currently.
11,070✔
650
  obj.intervalUnit = pCreate->intervalUnit;
11,070✔
651
  obj.nFuncs = pCreate->nFuncs;
11,070✔
652
  if (obj.nFuncs > 0) {
11,070✔
653
    TSDB_CHECK_NULL((obj.funcColIds = taosMemoryCalloc(obj.nFuncs, sizeof(col_id_t))), code, lino, _exit, terrno);
11,070✔
654
    TSDB_CHECK_NULL((obj.funcIds = taosMemoryCalloc(obj.nFuncs, sizeof(func_id_t))), code, lino, _exit, terrno);
11,070✔
655
    for (int16_t i = 0; i < obj.nFuncs; ++i) {
72,324✔
656
      obj.funcColIds[i] = pCreate->funcColIds[i];
61,254✔
657
      obj.funcIds[i] = pCreate->funcIds[i];
61,254✔
658
    }
659
  }
660

661
  TSDB_CHECK_NULL((pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, "create-rsma")),
11,070✔
662
                  code, lino, _exit, terrno);
663
  mInfo("trans:%d, used to create rsma %s on tb %s.%s", pTrans->id, obj.name, obj.dbFName, obj.tbName);
11,070✔
664

665
  mndTransSetDbName(pTrans, obj.dbFName, obj.name);
11,070✔
666
  mndTransSetKillMode(pTrans, TRN_KILL_MODE_SKIP);
11,070✔
667
  TAOS_CHECK_EXIT(mndTransCheckConflict(pMnode, pTrans));
11,070✔
668

669
  mndTransSetOper(pTrans, MND_OPER_CREATE_RSMA);
11,070✔
670
  TAOS_CHECK_EXIT(mndSetCreateRsmaPrepareActions(pMnode, pTrans, &obj));
11,070✔
671
  TAOS_CHECK_EXIT(mndSetCreateRsmaRedoActions(pMnode, pTrans, pDb, pStb, &obj, pCreate));
11,070✔
672
  TAOS_CHECK_EXIT(mndSetCreateRsmaCommitLogs(pMnode, pTrans, &obj));
11,070✔
673
  TAOS_CHECK_EXIT(mndTransPrepare(pMnode, pTrans));
11,070✔
674
_exit:
11,070✔
675
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
11,070✔
676
    mError("rsma:%s, failed at line %d to create rsma, since %s", obj.name, lino, tstrerror(code));
×
677
  }
678
  mndTransDrop(pTrans);
11,070✔
679
  mndRsmaFreeObj(&obj);
11,070✔
680
  TAOS_RETURN(code);
11,070✔
681
}
682

683
static int32_t mndCheckCreateRsmaReq(SMCreateRsmaReq *pCreate) {
18,450✔
684
  int32_t code = TSDB_CODE_MND_INVALID_RSMA_OPTION;
18,450✔
685
  if (pCreate->name[0] == 0) goto _exit;
18,450✔
686
  if (pCreate->tbFName[0] == 0) goto _exit;
18,450✔
687
  if (pCreate->igExists < 0 || pCreate->igExists > 1) goto _exit;
18,450✔
688
  if (pCreate->intervalUnit < 0) goto _exit;
18,450✔
689
  if (pCreate->interval[0] < 0) goto _exit;
18,450✔
690
  if (pCreate->interval[1] < 0) goto _exit;
18,450✔
691
  if (pCreate->interval[0] == 0 && pCreate->interval[1] == 0) goto _exit;
18,450✔
692

693
  SName fname = {0};
18,450✔
694
  if ((code = tNameFromString(&fname, pCreate->tbFName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE)) < 0) goto _exit;
18,450✔
695
  if (*(char *)tNameGetTableName(&fname) == 0) goto _exit;
18,450✔
696
  code = 0;
18,450✔
697
_exit:
18,450✔
698
  TAOS_RETURN(code);
18,450✔
699
}
700

701
static int32_t mndCheckRsmaConflicts(SMnode *pMnode, SDbObj *pDbObj, SMCreateRsmaReq *pCreate) {
14,760✔
702
  void     *pIter = NULL;
14,760✔
703
  SSdb     *pSdb = pMnode->pSdb;
14,760✔
704
  SRsmaObj *pObj = NULL;
14,760✔
705
  while ((pIter = sdbFetch(pSdb, SDB_RSMA, pIter, (void **)&pObj))) {
20,664✔
706
    if (pObj->tbUid == pCreate->tbUid && pObj->dbUid == pDbObj->uid) {
9,594✔
707
      sdbCancelFetch(pSdb, (pIter));
3,690✔
708
      sdbRelease(pSdb, pObj);
3,690✔
709
      mError("rsma:%s, conflict with existing rsma %s on same table %s.%s:%" PRIi64, pCreate->name, pObj->name,
3,690✔
710
             pObj->dbFName, pObj->tbName, pObj->tbUid);
711
      return TSDB_CODE_MND_RSMA_EXIST_IN_TABLE;
3,690✔
712
    }
713
    sdbRelease(pSdb, pObj);
5,904✔
714
  }
715
  return 0;
11,070✔
716
}
717
#endif
718
static int32_t mndProcessCreateRsmaReq(SRpcMsg *pReq) {
18,450✔
719
  int32_t code = 0, lino = 0;
18,450✔
720
#ifdef TD_ENTERPRISE
721
  SMnode         *pMnode = pReq->info.node;
18,450✔
722
  SDbObj         *pDb = NULL;
18,450✔
723
  SStbObj        *pStb = NULL;
18,450✔
724
  SRsmaObj       *pSma = NULL;
18,450✔
725
  SUserObj       *pUser = NULL;
18,450✔
726
  int64_t         mTraceId = TRACE_GET_ROOTID(&pReq->info.traceId);
18,450✔
727
  SMCreateRsmaReq createReq = {0};
18,450✔
728
  int64_t         tss = taosGetTimestampMs();
18,450✔
729

730
  TAOS_CHECK_EXIT(tDeserializeSMCreateRsmaReq(pReq->pCont, pReq->contLen, &createReq));
18,450✔
731

732
  mInfo("start to create rsma: %s", createReq.name);
18,450✔
733
  TAOS_CHECK_EXIT(mndCheckCreateRsmaReq(&createReq));
18,450✔
734

735
  if ((pSma = mndAcquireRsma(pMnode, createReq.name))) {
18,450✔
736
    if (createReq.igExists) {
3,690✔
737
      mInfo("rsma:%s, already exist, ignore exist is set", createReq.name);
×
738
      code = 0;
×
739
      goto _exit;
×
740
    } else {
741
      TAOS_CHECK_EXIT(TSDB_CODE_RSMA_ALREADY_EXISTS);
3,690✔
742
    }
743
  } else {
744
    if ((code = terrno) == TSDB_CODE_RSMA_NOT_EXIST) {
14,760✔
745
      // continue
746
    } else {  // TSDB_CODE_MND_RSMA_IN_CREATING | TSDB_CODE_MND_RSMA_IN_DROPPING | TSDB_CODE_APP_ERROR
747
      goto _exit;
×
748
    }
749
  }
750

751
  SName name = {0};
14,760✔
752
  TAOS_CHECK_EXIT(tNameFromString(&name, createReq.tbFName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE));
14,760✔
753
  char db[TSDB_TABLE_FNAME_LEN] = {0};
14,760✔
754
  (void)tNameGetFullDbName(&name, db);
14,760✔
755

756
  pDb = mndAcquireDb(pMnode, db);
14,760✔
757
  if (pDb == NULL) {
14,760✔
758
    TAOS_CHECK_EXIT(TSDB_CODE_MND_DB_NOT_SELECTED);
×
759
  }
760

761
  TAOS_CHECK_EXIT(mndAcquireUser(pMnode, RPC_MSG_USER(pReq), &pUser));
14,760✔
762

763
  // TAOS_CHECK_EXIT(mndCheckDbPrivilege(pMnode, RPC_MSG_USER(pReq), MND_OPER_READ_DB, pDb));
764
  // TAOS_CHECK_EXIT(mndCheckDbPrivilege(pMnode, RPC_MSG_USER(pReq), MND_OPER_WRITE_DB, pDb));
765

766
  // already check select table/insert table/create rsma privileges in parser
767
  TAOS_CHECK_EXIT(mndCheckDbPrivilege(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_USE_DB, pDb));
14,760✔
768

769
  pStb = mndAcquireStb(pMnode, createReq.tbFName);
14,760✔
770
  if (pStb == NULL) {
14,760✔
UNCOV
771
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STB_NOT_EXIST);
×
772
  }
773

774
  TAOS_CHECK_EXIT(mndCheckRsmaConflicts(pMnode, pDb, &createReq));
14,760✔
775

776
  TAOS_CHECK_EXIT(mndCreateRsma(pMnode, pReq, pUser, pDb, pStb, &createReq));
11,070✔
777

778
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
11,070✔
779

780
  if (tsAuditLevel >= AUDIT_LEVEL_DATABASE) {
11,070✔
781
    int64_t tse = taosGetTimestampMs();
11,070✔
782
    double  duration = (double)(tse - tss);
11,070✔
783
    duration = duration / 1000;
11,070✔
784
    auditRecord(pReq, pMnode->clusterId, "createRsma", createReq.name, createReq.tbFName, "", 0, duration, 0);
11,070✔
785
  }
786
_exit:
18,450✔
787
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
18,450✔
788
    mError("rsma:%s, failed at line %d to create since %s", createReq.name, lino, tstrerror(code));
7,380✔
789
  }
790
  if (pSma) mndReleaseRsma(pMnode, pSma);
18,450✔
791
  if (pStb) mndReleaseStb(pMnode, pStb);
18,450✔
792
  if (pDb) mndReleaseDb(pMnode, pDb);
18,450✔
793
  if (pUser) mndReleaseUser(pMnode, pUser);
18,450✔
794
  tFreeSMCreateRsmaReq(&createReq);
18,450✔
795
#endif
796
  TAOS_RETURN(code);
18,450✔
797
}
798

799
#ifdef TD_ENTERPRISE
800
static int32_t mndCheckAlterRsmaReq(SMAlterRsmaReq *pReq) {
5,904✔
801
  int32_t code = TSDB_CODE_MND_INVALID_RSMA_OPTION;
5,904✔
802
  if (pReq->name[0] == 0) goto _exit;
5,904✔
803
  if (pReq->igNotExists < 0 || pReq->igNotExists > 1) goto _exit;
5,904✔
804

805
  code = 0;
5,904✔
806
_exit:
5,904✔
807
  TAOS_RETURN(code);
5,904✔
808
}
809

810
static int32_t mndSetAlterRsmaPrepareActions(SMnode *pMnode, STrans *pTrans, SRsmaObj *pSma) {
5,904✔
811
  SSdbRaw *pDbRaw = mndRsmaActionEncode(pSma);
5,904✔
812
  if (pDbRaw == NULL) return -1;
5,904✔
813

814
  if (mndTransAppendPrepareLog(pTrans, pDbRaw) != 0) return -1;
5,904✔
815
  if (sdbSetRawStatus(pDbRaw, SDB_STATUS_READY) != 0) return -1;
5,904✔
816
  return 0;
5,904✔
817
}
818

819
static int32_t mndSetAlterRsmaCommitLogs(SMnode *pMnode, STrans *pTrans, SRsmaObj *pSma) {
5,904✔
820
  return mndSetCreateRsmaCommitLogs(pMnode, pTrans, pSma);
5,904✔
821
}
822

823
static void *mndBuildVAlterRsmaReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, SRsmaObj *pObj,
11,808✔
824
                                   SMAlterRsmaReq *pAlter, int32_t *pContLen) {
825
  int32_t        code = 0, lino = 0;
11,808✔
826
  SMsgHead      *pHead = NULL;
11,808✔
827
  SVAlterRsmaReq req = {0};
11,808✔
828
  req.alterType = pAlter->alterType;
11,808✔
829
  (void)snprintf(req.name, sizeof(req.name), "%s", pObj->name);
11,808✔
830
  (void)snprintf(req.tbName, sizeof(req.tbName), "%s", pObj->tbName);
11,808✔
831
  req.tbType = pObj->tbType;
11,808✔
832
  req.intervalUnit = pObj->intervalUnit;
11,808✔
833
  req.interval[0] = pObj->interval[0];
11,808✔
834
  req.interval[1] = pObj->interval[1];
11,808✔
835
  req.tbUid = pObj->tbUid;
11,808✔
836
  req.uid = pObj->uid;
11,808✔
837
  req.nFuncs = pObj->nFuncs;
11,808✔
838
  req.funcColIds = pObj->funcColIds;
11,808✔
839
  req.funcIds = pObj->funcIds;
11,808✔
840

841
  int32_t contLen = tSerializeSVAlterRsmaReq(NULL, 0, &req);
11,808✔
842
  TAOS_CHECK_EXIT(contLen);
11,808✔
843
  contLen += sizeof(SMsgHead);
11,808✔
844
  TSDB_CHECK_NULL((pHead = taosMemoryMalloc(contLen)), code, lino, _exit, terrno);
11,808✔
845
  pHead->contLen = htonl(contLen);
11,808✔
846
  pHead->vgId = htonl(pVgroup->vgId);
11,808✔
847
  void *pBuf = POINTER_SHIFT(pHead, sizeof(SMsgHead));
11,808✔
848
  TAOS_CHECK_EXIT(tSerializeSVAlterRsmaReq(pBuf, contLen, &req));
11,808✔
849
_exit:
11,808✔
850
  if (code < 0) {
11,808✔
851
    taosMemoryFreeClear(pHead);
×
852
    terrno = code;
×
853
    *pContLen = 0;
×
UNCOV
854
    return NULL;
×
855
  }
856
  *pContLen = contLen;
11,808✔
857
  return pHead;
11,808✔
858
}
859

860
static int32_t mndSetAlterRsmaRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb, SRsmaObj *pObj,
5,904✔
861
                                          SMAlterRsmaReq *pAlter) {
862
  int32_t code = 0;
5,904✔
863
  SSdb   *pSdb = pMnode->pSdb;
5,904✔
864
  SVgObj *pVgroup = NULL;
5,904✔
865
  void   *pIter = NULL;
5,904✔
866

867
  while ((pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup))) {
29,520✔
868
    if (!mndVgroupInDb(pVgroup, pDb->uid)) {
23,616✔
869
      sdbRelease(pSdb, pVgroup);
11,808✔
870
      continue;
11,808✔
871
    }
872

873
    int32_t contLen = 0;
11,808✔
874
    void   *pReq = mndBuildVAlterRsmaReq(pMnode, pVgroup, pStb, pObj, pAlter, &contLen);
11,808✔
875
    if (pReq == NULL) {
11,808✔
876
      sdbCancelFetch(pSdb, pIter);
×
877
      sdbRelease(pSdb, pVgroup);
×
878
      code = terrno ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
×
UNCOV
879
      TAOS_RETURN(code);
×
880
    }
881

882
    STransAction action = {0};
11,808✔
883
    action.mTraceId = pTrans->mTraceId;
11,808✔
884
    action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
11,808✔
885
    action.pCont = pReq;
11,808✔
886
    action.contLen = contLen;
11,808✔
887
    action.msgType = TDMT_VND_ALTER_RSMA;
11,808✔
888
    if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
11,808✔
889
      taosMemoryFree(pReq);
×
890
      sdbCancelFetch(pSdb, pIter);
×
891
      sdbRelease(pSdb, pVgroup);
×
UNCOV
892
      TAOS_RETURN(code);
×
893
    }
894
    sdbRelease(pSdb, pVgroup);
11,808✔
895
  }
896

897
  TAOS_RETURN(code);
5,904✔
898
}
899

900
static int32_t mndAlterRsma(SMnode *pMnode, SRpcMsg *pReq, SUserObj *pUser, SDbObj *pDb, SStbObj *pStb,
5,904✔
901
                            SMAlterRsmaReq *pAlter, SRsmaObj *pOld) {
902
  int32_t  code = 0, lino = 0;
5,904✔
903
  STrans  *pTrans = NULL;
5,904✔
904
  SRsmaObj obj = *pOld;
5,904✔
905

906
  obj.updateTime = taosGetTimestampMs();
5,904✔
907
  ++obj.version;
5,904✔
908
  if (pAlter->alterType == TSDB_ALTER_RSMA_FUNCTION) {
5,904✔
909
    obj.nFuncs = pOld->nFuncs + pAlter->nFuncs;
5,904✔
910
    obj.funcColIds = taosMemoryMalloc(obj.nFuncs * sizeof(col_id_t));
5,904✔
911
    obj.funcIds = taosMemoryMalloc(obj.nFuncs * sizeof(func_id_t));
5,904✔
912
    if (obj.funcColIds == NULL || obj.funcIds == NULL) {
5,904✔
UNCOV
913
      TAOS_CHECK_EXIT(terrno);
×
914
    }
915
    int32_t n = 0, i = 0, j = 0;
5,904✔
916
    while (i < pOld->nFuncs && j < pAlter->nFuncs) {
31,734✔
917
      if (pOld->funcColIds[i] < pAlter->funcColIds[j]) {
25,830✔
918
        obj.funcColIds[n] = pOld->funcColIds[i];
22,140✔
919
        obj.funcIds[n++] = pOld->funcIds[i++];
22,140✔
920
      } else if (pOld->funcColIds[i] > pAlter->funcColIds[j]) {
3,690✔
921
        obj.funcColIds[n] = pAlter->funcColIds[j];
3,690✔
922
        obj.funcIds[n++] = pAlter->funcIds[j++];
3,690✔
923
      } else {
924
        mError("rsma:%s, conflict function on column id:%d", pOld->name, pAlter->funcColIds[j]);
×
UNCOV
925
        TAOS_CHECK_EXIT(TSDB_CODE_MND_RSMA_FUNC_CONFLICT);
×
926
      }
927
    }
928
    if (i < pOld->nFuncs) {
5,904✔
929
      while (i < pOld->nFuncs) {
7,380✔
930
        obj.funcColIds[n] = pOld->funcColIds[i];
3,690✔
931
        obj.funcIds[n++] = pOld->funcIds[i++];
3,690✔
932
      }
933
    } else if (j < pAlter->nFuncs) {
2,214✔
934
      while (j < pAlter->nFuncs) {
4,428✔
935
        obj.funcColIds[n] = pAlter->funcColIds[j];
2,214✔
936
        obj.funcIds[n++] = pAlter->funcIds[j++];
2,214✔
937
      }
938
    }
939
  } else {
UNCOV
940
    TAOS_CHECK_EXIT(TSDB_CODE_OPS_NOT_SUPPORT);
×
941
  }
942

943
  TSDB_CHECK_NULL((pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, "alter-rsma")), code,
5,904✔
944
                  lino, _exit, terrno);
945
  mInfo("trans:%d, used to alter rsma %s on tb %s.%s", pTrans->id, obj.name, obj.dbFName, obj.tbName);
5,904✔
946

947
  mndTransSetDbName(pTrans, obj.dbFName, obj.name);
5,904✔
948
  mndTransSetKillMode(pTrans, TRN_KILL_MODE_SKIP);
5,904✔
949
  TAOS_CHECK_EXIT(mndTransCheckConflict(pMnode, pTrans));
5,904✔
950

951
  mndTransSetOper(pTrans, MND_OPER_ALTER_RSMA);
5,904✔
952
  TAOS_CHECK_EXIT(mndSetAlterRsmaPrepareActions(pMnode, pTrans, &obj));
5,904✔
953
  TAOS_CHECK_EXIT(mndSetAlterRsmaCommitLogs(pMnode, pTrans, &obj));
5,904✔
954
  TAOS_CHECK_EXIT(mndSetAlterRsmaRedoActions(pMnode, pTrans, pDb, pStb, &obj, pAlter));
5,904✔
955

956
  TAOS_CHECK_EXIT(mndTransPrepare(pMnode, pTrans));
5,904✔
957
_exit:
5,904✔
958
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
5,904✔
UNCOV
959
    mError("rsma:%s, failed at line %d to alter rsma, since %s", obj.name, lino, tstrerror(code));
×
960
  }
961
  mndTransDrop(pTrans);
5,904✔
962
  mndRsmaFreeObj(&obj);
5,904✔
963
  TAOS_RETURN(code);
5,904✔
964
}
965
#endif
966
static int32_t mndProcessAlterRsmaReq(SRpcMsg *pReq) {
5,904✔
967
  int32_t code = 0, lino = 0;
5,904✔
968
#ifdef TD_ENTERPRISE
969
  SMnode        *pMnode = pReq->info.node;
5,904✔
970
  SDbObj        *pDb = NULL;
5,904✔
971
  SStbObj       *pStb = NULL;
5,904✔
972
  SRsmaObj      *pObj = NULL;
5,904✔
973
  SUserObj      *pUser = NULL;
5,904✔
974
  int64_t        mTraceId = TRACE_GET_ROOTID(&pReq->info.traceId);
5,904✔
975
  SMAlterRsmaReq req = {0};
5,904✔
976
  char           tbFName[TSDB_TABLE_FNAME_LEN] = "\0";
5,904✔
977
  int64_t        tss = taosGetTimestampMs();
5,904✔
978

979
  TAOS_CHECK_EXIT(tDeserializeSMAlterRsmaReq(pReq->pCont, pReq->contLen, &req));
5,904✔
980

981
  mInfo("start to alter rsma: %s", req.name);
5,904✔
982
  TAOS_CHECK_EXIT(mndCheckAlterRsmaReq(&req));
5,904✔
983

984
  if (!(pObj = mndAcquireRsma(pMnode, req.name))) {
5,904✔
985
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
986
    if (terrno != 0) code = terrno;
×
987
    if (req.igNotExists) {
×
UNCOV
988
      code = 0;
×
989
    }
UNCOV
990
    goto _exit;
×
991
  }
992

993
  if (!(pDb = mndAcquireDb(pMnode, pObj->dbFName))) {
5,904✔
UNCOV
994
    TAOS_CHECK_EXIT(TSDB_CODE_MND_DB_NOT_EXIST);
×
995
  }
996

997
  TAOS_CHECK_EXIT(mndAcquireUser(pMnode, RPC_MSG_USER(pReq), &pUser));
5,904✔
998

999
  // TAOS_CHECK_EXIT(mndCheckDbPrivilege(pMnode, RPC_MSG_USER(pReq), MND_OPER_READ_DB, pDb));
1000
  // TAOS_CHECK_EXIT(mndCheckDbPrivilege(pMnode, RPC_MSG_USER(pReq), MND_OPER_WRITE_DB, pDb));
1001
  TAOS_CHECK_EXIT(
5,904✔
1002
      mndCheckObjPrivilegeRecF(pMnode, pUser, PRIV_CM_ALTER, PRIV_OBJ_RSMA, pObj->ownerId, pObj->dbFName, pObj->name));
1003

1004
  (void)snprintf(tbFName, sizeof(tbFName), "%s.%s", pObj->dbFName, pObj->tbName);
5,904✔
1005

1006
  pStb = mndAcquireStb(pMnode, tbFName);
5,904✔
1007
  if (pStb == NULL) {
5,904✔
UNCOV
1008
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STB_NOT_EXIST);
×
1009
  }
1010

1011
  TAOS_CHECK_EXIT(mndAlterRsma(pMnode, pReq, pUser, pDb, pStb, &req, pObj));
5,904✔
1012

1013
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
5,904✔
1014

1015
  if (tsAuditLevel >= AUDIT_LEVEL_DATABASE) {
5,904✔
1016
    char alterType[32] = "\0";
5,904✔
1017
    (void)snprintf(alterType, sizeof(alterType), "alterType:%" PRIi8, req.alterType);
5,904✔
1018
    int64_t tse = taosGetTimestampMs();
5,904✔
1019
    double  duration = (double)(tse - tss);
5,904✔
1020
    duration = duration / 1000;
5,904✔
1021
    auditRecord(pReq, pMnode->clusterId, "alterRsma", req.name, tbFName, alterType, 0, duration, 0);
5,904✔
1022
  }
1023
_exit:
5,904✔
1024
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
5,904✔
UNCOV
1025
    mError("rsma:%s, failed at line %d to alter since %s", req.name, lino, tstrerror(code));
×
1026
  }
1027
  if (pObj) mndReleaseRsma(pMnode, pObj);
5,904✔
1028
  if (pStb) mndReleaseStb(pMnode, pStb);
5,904✔
1029
  if (pDb) mndReleaseDb(pMnode, pDb);
5,904✔
1030
  if (pUser) mndReleaseUser(pMnode, pUser);
5,904✔
1031
  tFreeSMAlterRsmaReq(&req);
5,904✔
1032
#endif
1033
  TAOS_RETURN(code);
5,904✔
1034
}
1035
#ifdef TD_ENTERPRISE
1036
static int32_t mndFillRsmaInfo(SRsmaObj *pObj, SStbObj *pStb, SRsmaInfoRsp *pRsp, bool withColName) {
16,974✔
1037
  int32_t code = 0, lino = 0;
16,974✔
1038
  pRsp->id = pObj->uid;
16,974✔
1039
  (void)snprintf(pRsp->name, sizeof(pRsp->name), "%s", pObj->name);
16,974✔
1040
  (void)snprintf(pRsp->tbFName, sizeof(pRsp->tbFName), "%s.%s", pObj->dbFName, pObj->tbName);
16,974✔
1041
  pRsp->ownerId = pObj->ownerId;
16,974✔
1042
  pRsp->version = pObj->version;
16,974✔
1043
  pRsp->tbType = pObj->tbType;
16,974✔
1044
  pRsp->intervalUnit = pObj->intervalUnit;
16,974✔
1045
  pRsp->nFuncs = pObj->nFuncs;
16,974✔
1046
  pRsp->interval[0] = pObj->interval[0];
16,974✔
1047
  pRsp->interval[1] = pObj->interval[1];
16,974✔
1048
  if (pRsp->nFuncs > 0) {
16,974✔
1049
    pRsp->funcColIds = pObj->funcColIds;  // shallow copy, no need to free
16,974✔
1050
    pRsp->funcIds = pObj->funcIds;        // shallow copy, no need to free
16,974✔
1051
    if (withColName) {
16,974✔
1052
      pRsp->colNames = taosArrayInit(pRsp->nFuncs, sizeof(char *));
16,974✔
1053
      if (pRsp->colNames == NULL) {
16,974✔
UNCOV
1054
        TAOS_CHECK_EXIT(terrno);
×
1055
      }
1056
      pRsp->nColNames = pRsp->nFuncs;
16,974✔
1057
      int16_t i = 0, j = 0;
16,974✔
1058
      for (; i < pRsp->nFuncs; ++i) {
100,368✔
1059
        bool found = false;
83,394✔
1060
        for (; j < pStb->numOfColumns;) {
178,596✔
1061
          if (pStb->pColumns[j].colId == pRsp->funcColIds[i]) {
178,596✔
1062
            found = true;
83,394✔
1063
            break;
83,394✔
1064
          } else if (pStb->pColumns[j].colId < pRsp->funcColIds[i]) {
95,202✔
1065
            ++j;
95,202✔
1066
          } else {
UNCOV
1067
            break;
×
1068
          }
1069
        }
1070
        if (found) {
83,394✔
1071
          SSchema *pCol = pStb->pColumns + j;
83,394✔
1072
          char    *colName = taosStrdup(pCol->name);
83,394✔
1073
          if (colName == NULL) {
83,394✔
UNCOV
1074
            TAOS_CHECK_EXIT(terrno);
×
1075
          }
1076
          if (!taosArrayPush(pRsp->colNames, &colName)) {
166,788✔
1077
            taosMemoryFree(colName);
×
UNCOV
1078
            TAOS_CHECK_EXIT(terrno);
×
1079
          }
1080
        } else {
UNCOV
1081
          TAOS_CHECK_EXIT(TSDB_CODE_MND_COLUMN_NOT_EXIST);
×
1082
        }
1083
      }
1084
    }
1085
  }
1086
_exit:
16,974✔
1087
  if (code != 0) {
16,974✔
UNCOV
1088
    mError("rsma:%s, failed at line %d to get rsma info since %s", pObj->name, lino, tstrerror(code));
×
1089
  }
1090
  TAOS_RETURN(code);
16,974✔
1091
}
1092
#endif
1093
static int32_t mndProcessGetRsmaReq(SRpcMsg *pReq) {
22,140✔
1094
#ifdef TD_ENTERPRISE
1095
  int32_t      code = 0, lino = 0;
22,140✔
1096
  SMnode      *pMnode = pReq->info.node;
22,140✔
1097
  SRsmaInfoReq req = {0};
22,140✔
1098
  SRsmaInfoRsp rsp = {0};
22,140✔
1099
  SRsmaObj    *pObj = NULL;
22,140✔
1100
  SStbObj     *pStb = NULL;
22,140✔
1101
  void        *pRsp = NULL;
22,140✔
1102
  int32_t      contLen = 0;
22,140✔
1103

1104
  TAOS_CHECK_EXIT(tDeserializeRsmaInfoReq(pReq->pCont, pReq->contLen, &req));
22,140✔
1105

1106
  if (!(pObj = mndAcquireRsma(pMnode, req.name))) {
22,140✔
1107
    TAOS_CHECK_EXIT(terrno);
5,166✔
1108
  }
1109

1110
  char tbFName[TSDB_TABLE_FNAME_LEN] = {0};
16,974✔
1111
  (void)snprintf(tbFName, sizeof(tbFName), "%s.%s", pObj->dbFName, pObj->tbName);
16,974✔
1112

1113
  if ((pStb = mndAcquireStb(pMnode, tbFName)) == NULL) {
16,974✔
UNCOV
1114
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STB_NOT_EXIST);
×
1115
  }
1116

1117
  TAOS_CHECK_EXIT(mndFillRsmaInfo(pObj, pStb, &rsp, req.withColName));
16,974✔
1118

1119
  if ((contLen = tSerializeRsmaInfoRsp(NULL, 0, &rsp)) < 0) {
16,974✔
UNCOV
1120
    TAOS_CHECK_EXIT(contLen);
×
1121
  }
1122
  if (!(pRsp = rpcMallocCont(contLen))) {
16,974✔
UNCOV
1123
    TAOS_CHECK_EXIT(terrno);
×
1124
  }
1125
  if ((contLen = tSerializeRsmaInfoRsp(pRsp, contLen, &rsp)) < 0) {
16,974✔
UNCOV
1126
    TAOS_CHECK_EXIT(contLen);
×
1127
  }
1128

1129
  pReq->info.rsp = pRsp;
16,974✔
1130
  pReq->info.rspLen = contLen;
16,974✔
1131

1132
_exit:
22,140✔
1133
  if (code != 0) {
22,140✔
1134
    rpcFreeCont(pRsp);
5,166✔
1135
  }
1136
  if (pObj) mndReleaseRsma(pMnode, pObj);
22,140✔
1137
  if (pStb) mndReleaseStb(pMnode, pStb);
22,140✔
1138
  tFreeRsmaInfoRsp(&rsp, false);
22,140✔
1139
  TAOS_RETURN(code);
22,140✔
1140
#else
1141
  return TSDB_CODE_OPS_NOT_SUPPORT;
1142
#endif
1143
}
1144
#ifdef TD_ENTERPRISE
1145
static void mndRetrieveRsmaFuncList(SMnode *pMnode, SRsmaObj *pObj, char *buf, int32_t bufLen) {
23,616✔
1146
  SSdb    *pSdb = pMnode->pSdb;
23,616✔
1147
  int32_t  numOfRows = 0;
23,616✔
1148
  SStbObj *pStb = NULL;
23,616✔
1149
  char    *qBuf = POINTER_SHIFT(buf, VARSTR_HEADER_SIZE);
23,616✔
1150
  int32_t  qBufLen = bufLen - VARSTR_HEADER_SIZE;
23,616✔
1151

1152
  qBuf[0] = 0;
23,616✔
1153
  varDataSetLen(buf, 0);  // initialize to empty string
23,616✔
1154

1155
  if (pObj->nFuncs <= 0) return;
23,616✔
1156

1157
  char tbFName[TSDB_TABLE_FNAME_LEN] = {0};
23,616✔
1158
  (void)snprintf(tbFName, sizeof(tbFName), "%s.%s", pObj->dbFName, pObj->tbName);
23,616✔
1159
  pStb = mndAcquireStb(pMnode, tbFName);
23,616✔
1160
  if (pStb == NULL) {
23,616✔
1161
    mWarn("rsma:%s, failed to acquire table %s for function list", pObj->name, tbFName);
×
UNCOV
1162
    return;
×
1163
  }
1164

1165
  SSchema *pColumns = pStb->pColumns;
23,616✔
1166

1167
  int32_t len = 0, j = 0;
23,616✔
1168
  char    colFunc[TSDB_COL_NAME_LEN + TSDB_FUNC_NAME_LEN + 2] = {0};
23,616✔
1169
  for (int32_t i = 0; i < pObj->nFuncs; ++i) {
169,002✔
1170
    col_id_t colId = pObj->funcColIds[i];
145,386✔
1171
    for (; j < pStb->numOfColumns;) {
301,104✔
1172
      if (pColumns[j].colId == colId) {
301,104✔
1173
        int32_t colFuncLen =
145,386✔
1174
            tsnprintf(colFunc, sizeof(colFunc), "%s(%s),", fmGetFuncName(pObj->funcIds[i]), pColumns[j].name);
145,386✔
1175
        if ((qBufLen - len) > colFuncLen) {
145,386✔
1176
          len += tsnprintf(qBuf + len, colFuncLen + 1, "%s", colFunc);
145,386✔
1177
        } else {
UNCOV
1178
          goto _exit;
×
1179
        }
1180
        break;
145,386✔
1181
      } else if (pColumns[j].colId > colId) {
155,718✔
UNCOV
1182
        break;
×
1183
      } else {
1184
        ++j;
155,718✔
1185
      }
1186
    }
1187
  }
1188
_exit:
23,616✔
1189
  qBuf[len > 0 ? len - 1 : 0] = 0;  // remove the last ','
23,616✔
1190
  varDataSetLen(buf, len > 0 ? len - 1 : 0);
23,616✔
1191
  mndReleaseStb(pMnode, pStb);
23,616✔
1192
}
1193
#endif
1194
static int32_t mndRetrieveRsma(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
16,236✔
1195
  SMnode          *pMnode = pReq->info.node;
16,236✔
1196
  int32_t          code = 0, lino = 0;
16,236✔
1197
  int32_t          numOfRows = 0;
16,236✔
1198
  int32_t          cols = 0;
16,236✔
1199
  char             tmp[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE];
16,236✔
1200
  int32_t          tmpLen = 0;
16,236✔
1201
  int32_t          bufLen = 0;
16,236✔
1202
  char            *pBuf = NULL;
16,236✔
1203
  char            *qBuf = NULL;
16,236✔
1204
  void            *pIter = NULL;
16,236✔
1205
  SSdb            *pSdb = pMnode->pSdb;
16,236✔
1206
  SColumnInfoData *pColInfo = NULL;
16,236✔
1207
  SUserObj        *pUser = NULL;
16,236✔
1208
  char             objFName[TSDB_OBJ_FNAME_LEN + 1] = {0};
16,236✔
1209
  bool             showAll = false;
16,236✔
1210

1211
#ifdef TD_ENTERPRISE
1212
  pBuf = tmp;
16,236✔
1213
  bufLen = sizeof(tmp) - VARSTR_HEADER_SIZE;
16,236✔
1214
  if (pShow->numOfRows < 1) {
16,236✔
1215
    TAOS_CHECK_EXIT(mndAcquireUser(pMnode, (RPC_MSG_USER(pReq)), &pUser));
16,236✔
1216
    (void)snprintf(objFName, sizeof(objFName), "%d.*", pUser->acctId);
16,236✔
1217
    int32_t objLevel = privObjGetLevel(PRIV_OBJ_RSMA);
16,236✔
1218
    showAll =
16,236✔
1219
        (0 == mndCheckSysObjPrivilege(pMnode, pUser, RPC_MSG_TOKEN(pReq), PRIV_CM_SHOW, PRIV_OBJ_RSMA, 0, objFName,
16,236✔
1220
                                      objLevel == 0 ? NULL : "*"));  // 1.*.*
1221

1222
    SRsmaObj *pObj = NULL;
16,236✔
1223
    int32_t   index = 0;
16,236✔
1224
    while ((pIter = sdbFetch(pSdb, SDB_RSMA, pIter, (void **)&pObj))) {
39,852✔
1225
      if (!showAll) {
23,616✔
1226
        if (mndCheckObjPrivilegeRecF(pMnode, pUser, PRIV_CM_SHOW, PRIV_OBJ_RSMA, pObj->ownerId, pObj->dbFName,
×
1227
                                     objLevel == 0 ? NULL : pObj->name)) {  // 1.db1.rsma1
×
1228
          sdbRelease(pSdb, pObj);
×
1229
          continue;
×
1230
        }
1231
      }
1232

1233
      cols = 0;
23,616✔
1234
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols);
23,616✔
1235
      qBuf = POINTER_SHIFT(pBuf, VARSTR_HEADER_SIZE);
23,616✔
1236
      TAOS_UNUSED(snprintf(qBuf, bufLen, "%s", pObj->name));
23,616✔
1237
      varDataSetLen(pBuf, strlen(pBuf + VARSTR_HEADER_SIZE));
23,616✔
1238
      COL_DATA_SET_VAL_GOTO(pBuf, false, pObj, pIter, _exit);
23,616✔
1239

1240
      if ((pColInfo = taosArrayGet(pBlock->pDataBlock, ++cols))) {
23,616✔
1241
        COL_DATA_SET_VAL_GOTO((const char *)(&pObj->uid), false, pObj, pIter, _exit);
23,616✔
1242
      }
1243

1244
      if ((pColInfo = taosArrayGet(pBlock->pDataBlock, ++cols))) {
23,616✔
1245
        qBuf = POINTER_SHIFT(pBuf, VARSTR_HEADER_SIZE);
23,616✔
1246
        const char *db = strchr(pObj->dbFName, '.');
23,616✔
1247
        TAOS_UNUSED(snprintf(qBuf, bufLen, "%s", db ? db + 1 : pObj->dbFName));
23,616✔
1248
        varDataSetLen(pBuf, strlen(qBuf));
23,616✔
1249
        COL_DATA_SET_VAL_GOTO(pBuf, false, pObj, pIter, _exit);
23,616✔
1250
      }
1251

1252
      if ((pColInfo = taosArrayGet(pBlock->pDataBlock, ++cols))) {
23,616✔
1253
        qBuf = POINTER_SHIFT(pBuf, VARSTR_HEADER_SIZE);
23,616✔
1254
        TAOS_UNUSED(snprintf(qBuf, bufLen, "%s", pObj->tbName));
23,616✔
1255
        varDataSetLen(pBuf, strlen(qBuf));
23,616✔
1256
        COL_DATA_SET_VAL_GOTO(pBuf, false, pObj, pIter, _exit);
23,616✔
1257
      }
1258

1259
      if ((pColInfo = taosArrayGet(pBlock->pDataBlock, ++cols))) {
23,616✔
1260
        qBuf = POINTER_SHIFT(pBuf, VARSTR_HEADER_SIZE);
23,616✔
1261
        if (pObj->tbType == TSDB_SUPER_TABLE) {
23,616✔
1262
          TAOS_UNUSED(snprintf(qBuf, bufLen, "SUPER_TABLE"));
23,616✔
1263
        } else if (pObj->tbType == TSDB_NORMAL_TABLE) {
×
1264
          TAOS_UNUSED(snprintf(qBuf, bufLen, "NORMAL_TABLE"));
×
1265
        } else if (pObj->tbType == TSDB_CHILD_TABLE) {
×
1266
          TAOS_UNUSED(snprintf(qBuf, bufLen, "CHILD_TABLE"));
×
1267
        } else {
1268
          TAOS_UNUSED(snprintf(qBuf, bufLen, "UNKNOWN"));
×
1269
        }
1270
        varDataSetLen(pBuf, strlen(qBuf));
23,616✔
1271
        COL_DATA_SET_VAL_GOTO(pBuf, false, pObj, pIter, _exit);
23,616✔
1272
      }
1273

1274
      if ((pColInfo = taosArrayGet(pBlock->pDataBlock, ++cols))) {
23,616✔
1275
        COL_DATA_SET_VAL_GOTO((const char *)&pObj->createdTime, false, pObj, pIter, _exit);
23,616✔
1276
      }
1277

1278
      if ((pColInfo = taosArrayGet(pBlock->pDataBlock, ++cols))) {
23,616✔
1279
        qBuf = POINTER_SHIFT(pBuf, VARSTR_HEADER_SIZE);
23,616✔
1280
        TAOS_UNUSED(snprintf(qBuf, bufLen, "%" PRIi64 "%c", pObj->interval[0], pObj->intervalUnit));
23,616✔
1281
        if (pObj->interval[1] > 0) {
23,616✔
1282
          tmpLen = strlen(qBuf);
23,616✔
1283
          TAOS_UNUSED(
23,616✔
1284
              snprintf(qBuf + tmpLen, bufLen - tmpLen, ",%" PRIi64 "%c", pObj->interval[1], pObj->intervalUnit));
1285
        }
1286
        varDataSetLen(pBuf, strlen(qBuf));
23,616✔
1287
        COL_DATA_SET_VAL_GOTO(pBuf, false, pObj, pIter, _exit);
23,616✔
1288
      }
1289

1290
      if ((pColInfo = taosArrayGet(pBlock->pDataBlock, ++cols))) {
23,616✔
1291
        mndRetrieveRsmaFuncList(pMnode, pObj, pBuf, bufLen);
23,616✔
1292
        COL_DATA_SET_VAL_GOTO(pBuf, false, pObj, pIter, _exit);
23,616✔
1293
      }
1294

1295
      sdbRelease(pSdb, pObj);
23,616✔
1296
      ++numOfRows;
23,616✔
1297
    }
1298
  }
1299

1300
  pShow->numOfRows += numOfRows;
16,236✔
1301

1302
_exit:
16,236✔
1303
  if (pUser) mndReleaseUser(pMnode, pUser);
16,236✔
1304
  if (code < 0) {
16,236✔
1305
    mError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1306
    TAOS_RETURN(code);
×
1307
  }
1308
#endif
1309
  return numOfRows;
16,236✔
1310
}
1311

1312
static void mndCancelRetrieveRsma(SMnode *pMnode, void *pIter) {
×
1313
  SSdb *pSdb = pMnode->pSdb;
×
1314
  sdbCancelFetchByType(pSdb, pIter, SDB_RSMA);
×
1315
}
×
1316

1317
int32_t mndDropRsmasByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
627,958✔
1318
  int32_t code = 0;
627,958✔
1319
#ifdef TD_ENTERPRISE
1320
  SSdb     *pSdb = pMnode->pSdb;
627,958✔
1321
  SRsmaObj *pObj = NULL;
627,958✔
1322
  void     *pIter = NULL;
627,958✔
1323

1324
  while ((pIter = sdbFetch(pSdb, SDB_RSMA, pIter, (void **)&pObj))) {
635,338✔
1325
    if (pObj->dbUid == pDb->uid) {
7,380✔
1326
      if ((code = mndSetDropRsmaCommitLogs(pMnode, pTrans, pObj)) != 0) {
7,380✔
1327
        sdbCancelFetch(pSdb, pIter);
×
1328
        sdbRelease(pSdb, pObj);
×
1329
        TAOS_RETURN(code);
×
1330
      }
1331
    }
1332
    sdbRelease(pSdb, pObj);
7,380✔
1333
  }
1334
#endif
1335
  TAOS_RETURN(code);
627,958✔
1336
}
1337

1338
int32_t mndDropRsmaByStb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
446,386✔
1339
  int32_t code = 0;
446,386✔
1340
#ifdef TD_ENTERPRISE
1341
  SSdb     *pSdb = pMnode->pSdb;
446,386✔
1342
  SRsmaObj *pObj = NULL;
446,386✔
1343
  void     *pIter = NULL;
446,386✔
1344

1345
  while ((pIter = sdbFetch(pSdb, SDB_RSMA, pIter, (void **)&pObj))) {
447,124✔
1346
    if (pObj->tbUid == pStb->uid && pObj->dbUid == pStb->dbUid) {
738✔
1347
      if ((code = mndSetDropRsmaCommitLogs(pMnode, pTrans, pObj)) != 0) {
738✔
1348
        sdbCancelFetch(pSdb, pIter);
×
1349
        sdbRelease(pSdb, pObj);
×
1350
        TAOS_RETURN(code);
×
1351
      }
1352
    }
1353
    sdbRelease(pSdb, pObj);
738✔
1354
  }
1355
#endif
1356
  TAOS_RETURN(code);
446,386✔
1357
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc