• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4768

01 Oct 2025 04:06AM UTC coverage: 57.85% (-0.8%) from 58.606%
#4768

push

travis-ci

web-flow
Merge pull request #33171 from taosdata/merge/3.3.6tomain

merge: from 3.3.6 to main branch

137167 of 302743 branches covered (45.31%)

Branch coverage included in aggregate %.

15 of 20 new or added lines in 2 files covered. (75.0%)

12125 existing lines in 175 files now uncovered.

208282 of 294403 relevant lines covered (70.75%)

5618137.93 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

61.73
/source/dnode/mnode/impl/src/mndRsma.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "audit.h"
18
#include "functionMgt.h"
19
#include "mndDb.h"
20
#include "mndDnode.h"
21
#include "mndInfoSchema.h"
22
#include "mndMnode.h"
23
#include "mndPrivilege.h"
24
#include "mndShow.h"
25
#include "mndSma.h"
26
#include "mndStb.h"
27
#include "mndTrans.h"
28
#include "mndUser.h"
29
#include "mndVgroup.h"
30
#include "parser.h"
31
#include "tname.h"
32

33
#define MND_RSMA_VER_NUMBER   1
34
#define MND_RSMA_RESERVE_SIZE 64
35

36
static SSdbRaw *mndRsmaActionEncode(SRsmaObj *pSma);
37
static SSdbRow *mndRsmaActionDecode(SSdbRaw *pRaw);
38
static int32_t  mndRsmaActionInsert(SSdb *pSdb, SRsmaObj *pSma);
39
static int32_t  mndRsmaActionDelete(SSdb *pSdb, SRsmaObj *pSpSmatb);
40
static int32_t  mndRsmaActionUpdate(SSdb *pSdb, SRsmaObj *pOld, SRsmaObj *pNew);
41
static int32_t  mndProcessCreateRsmaReq(SRpcMsg *pReq);
42
static int32_t  mndProcessDropRsmaReq(SRpcMsg *pReq);
43
static int32_t  mndProcessGetRsmaReq(SRpcMsg *pReq);
44

45
static int32_t mndRetrieveRsma(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
46
static void    mndCancelRetrieveRsma(SMnode *pMnode, void *pIter);
47
static int32_t mndRetrieveRsmaTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
48
static void    mndCancelRetrieveRsmaTask(SMnode *pMnode, void *pIter);
49

50
int32_t mndInitRsma(SMnode *pMnode) {
1,948✔
51
  SSdbTable table = {
1,948✔
52
      .sdbType = SDB_RSMA,
53
      .keyType = SDB_KEY_BINARY,
54
      .encodeFp = (SdbEncodeFp)mndRsmaActionEncode,
55
      .decodeFp = (SdbDecodeFp)mndRsmaActionDecode,
56
      .insertFp = (SdbInsertFp)mndRsmaActionInsert,
57
      .updateFp = (SdbUpdateFp)mndRsmaActionUpdate,
58
      .deleteFp = (SdbDeleteFp)mndRsmaActionDelete,
59
  };
60

61
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_RSMA, mndProcessCreateRsmaReq);
1,948✔
62
  mndSetMsgHandle(pMnode, TDMT_VND_CREATE_RSMA_RSP, mndTransProcessRsp);
1,948✔
63
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_RSMA, mndProcessDropRsmaReq);
1,948✔
64
  mndSetMsgHandle(pMnode, TDMT_VND_DROP_RSMA_RSP, mndTransProcessRsp);
1,948✔
65
  mndSetMsgHandle(pMnode, TDMT_MND_GET_RSMA, mndProcessGetRsmaReq);
1,948✔
66
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_RSMA, mndRetrieveRsma);
1,948✔
67
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_RSMA, mndCancelRetrieveRsma);
1,948✔
68

69
  return sdbSetTable(pMnode->pSdb, table);
1,948✔
70
}
71

72
void mndCleanupRsma(SMnode *pMnode) {}
1,948✔
73

74
void mndRsmaFreeObj(SRsmaObj *pObj) {
49✔
75
  if (pObj) {
49!
76
    taosMemoryFreeClear(pObj->funcColIds);
49!
77
    taosMemoryFreeClear(pObj->funcIds);
49!
78
  }
79
}
49✔
80

81
static int32_t tSerializeSRsmaObj(void *buf, int32_t bufLen, const SRsmaObj *pObj) {
78✔
82
  int32_t  code = 0, lino = 0;
78✔
83
  int32_t  tlen = 0;
78✔
84
  SEncoder encoder = {0};
78✔
85
  tEncoderInit(&encoder, buf, bufLen);
78✔
86

87
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
78!
88

89
  TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pObj->name));
156!
90
  TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pObj->tbName));
156!
91
  TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pObj->dbFName));
156!
92
  TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pObj->createUser));
156!
93
  TAOS_CHECK_EXIT(tEncodeI64v(&encoder, pObj->createdTime));
156!
94
  TAOS_CHECK_EXIT(tEncodeI64v(&encoder, pObj->updateTime));
156!
95
  TAOS_CHECK_EXIT(tEncodeI64v(&encoder, pObj->uid));
156!
96
  TAOS_CHECK_EXIT(tEncodeI64v(&encoder, pObj->tbUid));
156!
97
  TAOS_CHECK_EXIT(tEncodeI64v(&encoder, pObj->dbUid));
156!
98
  TAOS_CHECK_EXIT(tEncodeI64v(&encoder, pObj->interval[0]));
156!
99
  TAOS_CHECK_EXIT(tEncodeI64v(&encoder, pObj->interval[1]));
156!
100
  TAOS_CHECK_EXIT(tEncodeU64v(&encoder, pObj->reserved));
156!
101
  TAOS_CHECK_EXIT(tEncodeI32v(&encoder, pObj->version));
156!
102
  TAOS_CHECK_EXIT(tEncodeI8(&encoder, pObj->tbType));
156!
103
  TAOS_CHECK_EXIT(tEncodeI8(&encoder, pObj->intervalUnit));
156!
104
  TAOS_CHECK_EXIT(tEncodeI16v(&encoder, pObj->nFuncs));
156!
105
  for (int16_t i = 0; i < pObj->nFuncs; ++i) {
584✔
106
    TAOS_CHECK_EXIT(tEncodeI16v(&encoder, pObj->funcColIds[i]));
1,012!
107
    TAOS_CHECK_EXIT(tEncodeI32v(&encoder, pObj->funcIds[i]));
1,012!
108
  }
109

110
  tEndEncode(&encoder);
78✔
111

112
  tlen = encoder.pos;
78✔
113
_exit:
78✔
114
  tEncoderClear(&encoder);
78✔
115
  if (code < 0) {
78!
UNCOV
116
    mError("rsma, %s failed at line %d since %s", __func__, lino, tstrerror(code));
×
UNCOV
117
    TAOS_RETURN(code);
×
118
  }
119

120
  return tlen;
78✔
121
}
122

123
static int32_t tDeserializeSRsmaObj(void *buf, int32_t bufLen, SRsmaObj *pObj) {
37✔
124
  int32_t  code = 0, lino = 0;
37✔
125
  SDecoder decoder = {0};
37✔
126
  tDecoderInit(&decoder, buf, bufLen);
37✔
127

128
  TAOS_CHECK_EXIT(tStartDecode(&decoder));
37!
129

130
  TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pObj->name));
37!
131
  TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pObj->tbName));
37!
132
  TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pObj->dbFName));
37!
133
  TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pObj->createUser));
37!
134
  TAOS_CHECK_EXIT(tDecodeI64v(&decoder, &pObj->createdTime));
74!
135
  TAOS_CHECK_EXIT(tDecodeI64v(&decoder, &pObj->updateTime));
74!
136
  TAOS_CHECK_EXIT(tDecodeI64v(&decoder, &pObj->uid));
74!
137
  TAOS_CHECK_EXIT(tDecodeI64v(&decoder, &pObj->tbUid));
74!
138
  TAOS_CHECK_EXIT(tDecodeI64v(&decoder, &pObj->dbUid));
74!
139
  TAOS_CHECK_EXIT(tDecodeI64v(&decoder, &pObj->interval[0]));
74!
140
  TAOS_CHECK_EXIT(tDecodeI64v(&decoder, &pObj->interval[1]));
74!
141
  TAOS_CHECK_EXIT(tDecodeU64v(&decoder, &pObj->reserved));
74!
142
  TAOS_CHECK_EXIT(tDecodeI32v(&decoder, &pObj->version));
74!
143
  TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pObj->tbType));
74!
144
  TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pObj->intervalUnit));
74!
145
  TAOS_CHECK_EXIT(tDecodeI16v(&decoder, &pObj->nFuncs));
74!
146
  if (pObj->nFuncs > 0) {
37!
147
    if (!(pObj->funcColIds = taosMemoryMalloc(sizeof(col_id_t) * pObj->nFuncs))) {
37!
UNCOV
148
      TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
×
149
    }
150
    if (!(pObj->funcIds = taosMemoryMalloc(sizeof(int32_t) * pObj->nFuncs))) {
37!
UNCOV
151
      TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
×
152
    }
153
    for (int16_t i = 0; i < pObj->nFuncs; ++i) {
277✔
154
      TAOS_CHECK_EXIT(tDecodeI16v(&decoder, &pObj->funcColIds[i]));
480!
155
      TAOS_CHECK_EXIT(tDecodeI32v(&decoder, &pObj->funcIds[i]));
480!
156
    }
157
  }
158

159
_exit:
37✔
160
  tEndDecode(&decoder);
37✔
161
  tDecoderClear(&decoder);
37✔
162
  if (code < 0) {
37!
UNCOV
163
    mError("rsma, %s failed at line %d since %s, row:%p", __func__, lino, tstrerror(code), pObj);
×
164
  }
165
  TAOS_RETURN(code);
37✔
166
}
167

168
static SSdbRaw *mndRsmaActionEncode(SRsmaObj *pObj) {
39✔
169
  int32_t  code = 0, lino = 0;
39✔
170
  void    *buf = NULL;
39✔
171
  SSdbRaw *pRaw = NULL;
39✔
172
  int32_t  tlen = tSerializeSRsmaObj(NULL, 0, pObj);
39✔
173
  if (tlen < 0) {
39!
UNCOV
174
    TAOS_CHECK_EXIT(tlen);
×
175
  }
176

177
  int32_t size = sizeof(int32_t) + tlen;
39✔
178
  pRaw = sdbAllocRaw(SDB_RSMA, MND_RSMA_VER_NUMBER, size);
39✔
179
  if (pRaw == NULL) {
39!
UNCOV
180
    TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
×
181
  }
182

183
  buf = taosMemoryMalloc(tlen);
39!
184
  if (buf == NULL) {
39!
UNCOV
185
    TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
×
186
  }
187

188
  tlen = tSerializeSRsmaObj(buf, tlen, pObj);
39✔
189
  if (tlen < 0) {
39!
UNCOV
190
    TAOS_CHECK_EXIT(tlen);
×
191
  }
192

193
  int32_t dataPos = 0;
39✔
194
  SDB_SET_INT32(pRaw, dataPos, tlen, _exit);
39!
195
  SDB_SET_BINARY(pRaw, dataPos, buf, tlen, _exit);
39!
196
  SDB_SET_DATALEN(pRaw, dataPos, _exit);
39!
197

198
_exit:
39✔
199
  taosMemoryFreeClear(buf);
39!
200
  if (code != TSDB_CODE_SUCCESS) {
39!
UNCOV
201
    terrno = code;
×
UNCOV
202
    mError("rsma, failed at line %d to encode to raw:%p since %s", lino, pRaw, tstrerror(code));
×
UNCOV
203
    sdbFreeRaw(pRaw);
×
UNCOV
204
    return NULL;
×
205
  }
206

207
  mTrace("rsma, encode to raw:%p, row:%p", pRaw, pObj);
39!
208
  return pRaw;
39✔
209
}
210

211
SSdbRow *mndRsmaActionDecode(SSdbRaw *pRaw) {
37✔
212
  int32_t   code = 0, lino = 0;
37✔
213
  SSdbRow  *pRow = NULL;
37✔
214
  SRsmaObj *pObj = NULL;
37✔
215
  void     *buf = NULL;
37✔
216

217
  int8_t sver = 0;
37✔
218
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) {
37!
219
    goto _exit;
×
220
  }
221

222
  if (sver != MND_RSMA_VER_NUMBER) {
37!
UNCOV
223
    code = TSDB_CODE_SDB_INVALID_DATA_VER;
×
UNCOV
224
    mError("rsma read invalid ver, data ver: %d, curr ver: %d", sver, MND_RSMA_VER_NUMBER);
×
225
    goto _exit;
×
226
  }
227

228
  if (!(pRow = sdbAllocRow(sizeof(SRsmaObj)))) {
37!
UNCOV
229
    code = TSDB_CODE_OUT_OF_MEMORY;
×
230
    goto _exit;
×
231
  }
232

233
  if (!(pObj = sdbGetRowObj(pRow))) {
37!
UNCOV
234
    code = TSDB_CODE_OUT_OF_MEMORY;
×
UNCOV
235
    goto _exit;
×
236
  }
237

238
  int32_t tlen;
239
  int32_t dataPos = 0;
37✔
240
  SDB_GET_INT32(pRaw, dataPos, &tlen, _exit);
37!
241
  buf = taosMemoryMalloc(tlen + 1);
37!
242
  if (buf == NULL) {
37!
UNCOV
243
    code = TSDB_CODE_OUT_OF_MEMORY;
×
UNCOV
244
    goto _exit;
×
245
  }
246
  SDB_GET_BINARY(pRaw, dataPos, buf, tlen, _exit);
37!
247

248
  if (tDeserializeSRsmaObj(buf, tlen, pObj) < 0) {
37!
UNCOV
249
    code = TSDB_CODE_OUT_OF_MEMORY;
×
UNCOV
250
    goto _exit;
×
251
  }
252

253
  taosInitRWLatch(&pObj->lock);
37✔
254

255
_exit:
37✔
256
  taosMemoryFreeClear(buf);
37!
257
  if (code != TSDB_CODE_SUCCESS) {
37!
258
    terrno = code;
×
UNCOV
259
    mError("rsma, failed at line %d to decode from raw:%p since %s", lino, pRaw, tstrerror(code));
×
UNCOV
260
    mndRsmaFreeObj(pObj);
×
UNCOV
261
    taosMemoryFreeClear(pRow);
×
UNCOV
262
    return NULL;
×
263
  }
264
  mTrace("rsma, decode from raw:%p, row:%p", pRaw, pObj);
37!
265
  return pRow;
37✔
266
}
267

268
static int32_t mndRsmaActionInsert(SSdb *pSdb, SRsmaObj *pObj) {
12✔
269
  mTrace("rsma:%s, perform insert action, row:%p", pObj->name, pObj);
12!
270
  return 0;
12✔
271
}
272

273
static int32_t mndRsmaActionDelete(SSdb *pSdb, SRsmaObj *pObj) {
37✔
274
  mTrace("rsma:%s, perform delete action, row:%p", pObj->name, pObj);
37!
275
  mndRsmaFreeObj(pObj);
37✔
276
  return 0;
37✔
277
}
278

279
static int32_t mndRsmaActionUpdate(SSdb *pSdb, SRsmaObj *pOld, SRsmaObj *pNew) {
15✔
280
  mTrace("rsma:%s, perform update action, old row:%p new row:%p", pOld->name, pOld, pNew);
15!
281
  taosWLockLatch(&pOld->lock);
15✔
282
  pOld->updateTime = pNew->updateTime;
15✔
283
  pOld->nFuncs = pNew->nFuncs;
15✔
284
  TSWAP(pOld->funcColIds, pNew->funcColIds);
15✔
285
  TSWAP(pOld->funcIds, pNew->funcIds);
15✔
286
  taosWUnLockLatch(&pOld->lock);
15✔
287
  return 0;
15✔
288
}
289

290
SRsmaObj *mndAcquireRsma(SMnode *pMnode, char *name) {
28✔
291
  SSdb     *pSdb = pMnode->pSdb;
28✔
292
  SRsmaObj *pObj = sdbAcquire(pSdb, SDB_RSMA, name);
28✔
293
  if (pObj == NULL) {
28✔
294
    if (terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
18!
295
      terrno = TSDB_CODE_RSMA_NOT_EXIST;
18✔
UNCOV
296
    } else if (terrno == TSDB_CODE_SDB_OBJ_CREATING) {
×
297
      terrno = TSDB_CODE_MND_RSMA_IN_CREATING;
×
298
    } else if (terrno == TSDB_CODE_SDB_OBJ_DROPPING) {
×
UNCOV
299
      terrno = TSDB_CODE_MND_RSMA_IN_DROPPING;
×
300
    } else {
UNCOV
301
      terrno = TSDB_CODE_APP_ERROR;
×
UNCOV
302
      mFatal("rsma:%s, failed to acquire rsma since %s", name, terrstr());
×
303
    }
304
  }
305
  return pObj;
28✔
306
}
307

308
void mndReleaseRsma(SMnode *pMnode, SRsmaObj *pSma) {
10✔
309
  SSdb *pSdb = pMnode->pSdb;
10✔
310
  sdbRelease(pSdb, pSma);
10✔
311
}
10✔
312

313
static int32_t mndSetCreateRsmaRedoLogs(SMnode *pMnode, STrans *pTrans, SRsmaObj *pSma) {
×
314
  int32_t  code = 0;
×
315
  SSdbRaw *pRedoRaw = mndRsmaActionEncode(pSma);
×
UNCOV
316
  if (pRedoRaw == NULL) {
×
317
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
318
    if (terrno != 0) code = terrno;
×
UNCOV
319
    TAOS_RETURN(code);
×
320
  }
UNCOV
321
  TAOS_CHECK_RETURN(mndTransAppendRedolog(pTrans, pRedoRaw));
×
UNCOV
322
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING));
×
323

324
  TAOS_RETURN(code);
×
325
}
326

327
static int32_t mndSetCreateRsmaUndoLogs(SMnode *pMnode, STrans *pTrans, SRsmaObj *pSma) {
×
328
  int32_t  code = 0;
×
329
  SSdbRaw *pUndoRaw = mndRsmaActionEncode(pSma);
×
UNCOV
330
  if (!pUndoRaw) {
×
331
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
332
    if (terrno != 0) code = terrno;
×
333
    TAOS_RETURN(code);
×
334
  }
UNCOV
335
  TAOS_CHECK_RETURN(mndTransAppendUndolog(pTrans, pUndoRaw));
×
UNCOV
336
  TAOS_CHECK_RETURN(sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED));
×
UNCOV
337
  TAOS_RETURN(code);
×
338
}
339

340
static int32_t mndSetCreateRsmaPrepareActions(SMnode *pMnode, STrans *pTrans, SRsmaObj *pSma) {
12✔
341
  SSdbRaw *pDbRaw = mndRsmaActionEncode(pSma);
12✔
342
  if (pDbRaw == NULL) return -1;
12!
343

344
  if (mndTransAppendPrepareLog(pTrans, pDbRaw) != 0) return -1;
12!
345
  if (sdbSetRawStatus(pDbRaw, SDB_STATUS_CREATING) != 0) return -1;
12!
346
  return 0;
12✔
347
}
348

349
static void *mndBuildVCreateRsmaReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, SRsmaObj *pObj,
24✔
350
                                    SMCreateRsmaReq *pCreate, int32_t *pContLen) {
351
  int32_t         code = 0, lino = 0;
24✔
352
  SMsgHead       *pHead = NULL;
24✔
353
  SVCreateRsmaReq req = *pCreate;
24✔
354

355
  req.uid = pObj->uid;  // use the uid generated by mnode
24✔
356

357
  int32_t contLen = tSerializeSVCreateRsmaReq(NULL, 0, &req);
24✔
358
  TAOS_CHECK_EXIT(contLen);
24!
359
  contLen += sizeof(SMsgHead);
24✔
360
  TSDB_CHECK_NULL((pHead = taosMemoryMalloc(contLen)), code, lino, _exit, terrno);
24!
361
  pHead->contLen = htonl(contLen);
24✔
362
  pHead->vgId = htonl(pVgroup->vgId);
24✔
363
  void *pBuf = POINTER_SHIFT(pHead, sizeof(SMsgHead));
24✔
364
  TAOS_CHECK_EXIT(tSerializeSVCreateRsmaReq(pBuf, contLen, &req));
24!
365
_exit:
24✔
366
  if (code < 0) {
24!
UNCOV
367
    taosMemoryFreeClear(pHead);
×
UNCOV
368
    terrno = code;
×
UNCOV
369
    *pContLen = 0;
×
UNCOV
370
    return NULL;
×
371
  }
372
  *pContLen = contLen;
24✔
373
  return pHead;
24✔
374
}
375

376
static int32_t mndSetCreateRsmaRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb, SRsmaObj *pObj,
12✔
377
                                           SMCreateRsmaReq *pCreate) {
378
  int32_t code = 0;
12✔
379
  SSdb   *pSdb = pMnode->pSdb;
12✔
380
  SVgObj *pVgroup = NULL;
12✔
381
  void   *pIter = NULL;
12✔
382

383
  SName name = {0};
12✔
384
  if ((code = tNameFromString(&name, pCreate->tbFName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE)) != 0) {
12!
UNCOV
385
    return code;
×
386
  }
387
  tstrncpy(pCreate->tbFName, (char *)tNameGetTableName(&name), sizeof(pCreate->tbFName)); // convert tbFName to tbName
12✔
388

389
  while ((pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup))) {
60✔
390
    if (!mndVgroupInDb(pVgroup, pDb->uid)) {
48✔
391
      sdbRelease(pSdb, pVgroup);
24✔
392
      continue;
24✔
393
    }
394

395
    int32_t contLen = 0;
24✔
396
    void   *pReq = mndBuildVCreateRsmaReq(pMnode, pVgroup, pStb, pObj, pCreate, &contLen);
24✔
397
    if (pReq == NULL) {
24!
UNCOV
398
      sdbCancelFetch(pSdb, pIter);
×
UNCOV
399
      sdbRelease(pSdb, pVgroup);
×
UNCOV
400
      code = terrno ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
×
UNCOV
401
      TAOS_RETURN(code);
×
402
    }
403

404
    STransAction action = {0};
24✔
405
    action.mTraceId = pTrans->mTraceId;
24✔
406
    action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
24✔
407
    action.pCont = pReq;
24✔
408
    action.contLen = contLen;
24✔
409
    action.msgType = TDMT_VND_CREATE_RSMA;
24✔
410
    action.acceptableCode = TSDB_CODE_RSMA_ALREADY_EXISTS;  // check whether the rsma uid exist
24✔
411
    action.retryCode = TSDB_CODE_TDB_STB_NOT_EXIST;         // retry if relative table not exist
24✔
412
    if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
24!
UNCOV
413
      taosMemoryFree(pReq);
×
UNCOV
414
      sdbCancelFetch(pSdb, pIter);
×
UNCOV
415
      sdbRelease(pSdb, pVgroup);
×
UNCOV
416
      TAOS_RETURN(code);
×
417
    }
418
    sdbRelease(pSdb, pVgroup);
24✔
419
  }
420

421
  TAOS_RETURN(code);
12✔
422
}
423

424
static int32_t mndSetCreateRsmaCommitLogs(SMnode *pMnode, STrans *pTrans, SRsmaObj *pSma) {
12✔
425
  int32_t  code = 0;
12✔
426
  SSdbRaw *pCommitRaw = mndRsmaActionEncode(pSma);
12✔
427
  if (pCommitRaw == NULL) {
12!
UNCOV
428
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
UNCOV
429
    if (terrno != 0) code = terrno;
×
UNCOV
430
    TAOS_RETURN(code);
×
431
  }
432
  TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pCommitRaw));
12!
433
  TAOS_CHECK_RETURN(sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY));
12!
434

435
  TAOS_RETURN(code);
12✔
436
}
437

438
static int32_t mndSetDropRsmaPrepareLogs(SMnode *pMnode, STrans *pTrans, SRsmaObj *pSma) {
3✔
439
  int32_t  code = 0;
3✔
440
  SSdbRaw *pRedoRaw = mndRsmaActionEncode(pSma);
3✔
441
  if (pRedoRaw == NULL) {
3!
UNCOV
442
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
UNCOV
443
    if (terrno != 0) code = terrno;
×
UNCOV
444
    return -1;
×
445
  }
446
  TAOS_CHECK_RETURN(mndTransAppendPrepareLog(pTrans, pRedoRaw));
3!
447
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING));
3!
448

449
  return 0;
3✔
450
}
451

452
static int32_t mndSetDropRsmaCommitLogs(SMnode *pMnode, STrans *pTrans, SRsmaObj *pSma) {
10✔
453
  int32_t  code = 0;
10✔
454
  SSdbRaw *pCommitRaw = mndRsmaActionEncode(pSma);
10✔
455
  if (pCommitRaw == NULL) {
10!
UNCOV
456
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
UNCOV
457
    if (terrno != 0) code = terrno;
×
UNCOV
458
    return -1;
×
459
  }
460
  TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pCommitRaw));
10!
461
  TAOS_CHECK_RETURN(sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED));
10!
462

463
  return 0;
10✔
464
}
465

466
static void *mndBuildVDropRsmaReq(SMnode *pMnode, SVgObj *pVgroup, SRsmaObj *pObj, int32_t *pContLen) {
6✔
467
  int32_t       code = 0, lino = 0;
6✔
468
  SMsgHead     *pHead = NULL;
6✔
469
  SVDropRsmaReq req = {0};
6✔
470

471
  (void)snprintf(req.tbName, sizeof(req.tbName), "%s",  pObj->tbName);
6✔
472
  (void)snprintf(req.name, sizeof(req.name), "%s", pObj->name);
6✔
473
  req.tbType = pObj->tbType;
6✔
474
  req.uid = pObj->uid;
6✔
475
  req.tbUid = pObj->tbUid;
6✔
476

477
  int32_t contLen = tSerializeSVDropRsmaReq(NULL, 0, &req);
6✔
478
  TAOS_CHECK_EXIT(contLen);
6!
479
  contLen += sizeof(SMsgHead);
6✔
480
  TSDB_CHECK_NULL((pHead = taosMemoryMalloc(contLen)), code, lino, _exit, terrno);
6!
481
  pHead->contLen = htonl(contLen);
6✔
482
  pHead->vgId = htonl(pVgroup->vgId);
6✔
483
  void *pBuf = POINTER_SHIFT(pHead, sizeof(SMsgHead));
6✔
484
  TAOS_CHECK_EXIT(tSerializeSVDropRsmaReq(pBuf, contLen, &req));
6!
485
_exit:
6✔
486
  if (code < 0) {
6!
UNCOV
487
    taosMemoryFreeClear(pHead);
×
UNCOV
488
    terrno = code;
×
UNCOV
489
    *pContLen = 0;
×
UNCOV
490
    return NULL;
×
491
  }
492
  *pContLen = contLen;
6✔
493
  return pHead;
6✔
494
}
495

496
static int32_t mndSetDropRsmaRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SRsmaObj *pSma) {
3✔
497
  int32_t code = 0;
3✔
498
  SSdb   *pSdb = pMnode->pSdb;
3✔
499
  SVgObj *pVgroup = NULL;
3✔
500
  void   *pIter = NULL;
3✔
501

502
  while ((pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup))) {
15✔
503
    if (!mndVgroupInDb(pVgroup, pDb->uid)) {
12✔
504
      sdbRelease(pSdb, pVgroup);
6✔
505
      continue;
6✔
506
    }
507

508
    int32_t contLen = 0;
6✔
509
    void   *pReq = mndBuildVDropRsmaReq(pMnode, pVgroup, pSma, &contLen);
6✔
510
    if (pReq == NULL) {
6!
UNCOV
511
      sdbCancelFetch(pSdb, pIter);
×
UNCOV
512
      sdbRelease(pSdb, pVgroup);
×
UNCOV
513
      code = terrno ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
×
UNCOV
514
      TAOS_RETURN(code);
×
515
    }
516

517
    STransAction action = {0};
6✔
518
    action.mTraceId = pTrans->mTraceId;
6✔
519
    action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
6✔
520
    action.pCont = pReq;
6✔
521
    action.contLen = contLen;
6✔
522
    action.msgType = TDMT_VND_DROP_RSMA;
6✔
523
    action.acceptableCode = TSDB_CODE_RSMA_NOT_EXIST;
6✔
524
    if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
6!
UNCOV
525
      taosMemoryFree(pReq);
×
UNCOV
526
      sdbCancelFetch(pSdb, pIter);
×
UNCOV
527
      sdbRelease(pSdb, pVgroup);
×
UNCOV
528
      TAOS_RETURN(code);
×
529
    }
530
    sdbRelease(pSdb, pVgroup);
6✔
531
  }
532
  TAOS_RETURN(code);
3✔
533
}
534

535
static int32_t mndDropRsma(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SRsmaObj *pObj) {
3✔
536
  int32_t code = 0, lino = 0;
3✔
537

538
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, "drop-rsma");
3✔
539
  if (pTrans == NULL) {
3!
UNCOV
540
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
UNCOV
541
    if (terrno != 0) code = terrno;
×
UNCOV
542
    goto _exit;
×
543
  }
544

545
  mInfo("trans:%d start to drop rsma:%s", pTrans->id, pObj->name);
3!
546

547
  mndTransSetDbName(pTrans, pDb->name, pObj->name);
3✔
548
  mndTransSetKillMode(pTrans, TRN_KILL_MODE_SKIP);
3✔
549
  TAOS_CHECK_EXIT(mndTransCheckConflict(pMnode, pTrans));
3!
550

551
  mndTransSetOper(pTrans, MND_OPER_DROP_RSMA);
3✔
552
  TAOS_CHECK_EXIT(mndSetDropRsmaPrepareLogs(pMnode, pTrans, pObj));
3!
553
  TAOS_CHECK_EXIT(mndSetDropRsmaCommitLogs(pMnode, pTrans, pObj));
3!
554
  TAOS_CHECK_EXIT(mndSetDropRsmaRedoActions(pMnode, pTrans, pDb, pObj));
3!
555

556
  // int32_t rspLen = 0;
557
  // void   *pRsp = NULL;
558
  // TAOS_CHECK_EXIT(mndBuildDropRsmaRsp(pObj, &rspLen, &pRsp, false));
559
  // mndTransSetRpcRsp(pTrans, pRsp, rspLen);
560

561
  TAOS_CHECK_EXIT(mndTransPrepare(pMnode, pTrans));
3!
562
_exit:
3✔
563
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
3!
UNCOV
564
    mError("rsma:%s, failed to drop at line:%d since %s", pObj->name, lino, tstrerror(code));
×
565
  }
566
  mndTransDrop(pTrans);
3✔
567
  TAOS_RETURN(code);
3✔
568
}
569

570
static int32_t mndProcessDropRsmaReq(SRpcMsg *pReq) {
3✔
571
  SMnode       *pMnode = pReq->info.node;
3✔
572
  int32_t       code = 0, lino = 0;
3✔
573
  SDbObj       *pDb = NULL;
3✔
574
  SRsmaObj     *pObj = NULL;
3✔
575
  SMDropRsmaReq dropReq = {0};
3✔
576

577
  TAOS_CHECK_GOTO(tDeserializeSMDropRsmaReq(pReq->pCont, pReq->contLen, &dropReq), NULL, _exit);
3!
578

579
  mInfo("rsma:%s, start to drop", dropReq.name);
3!
580

581
  pObj = mndAcquireRsma(pMnode, dropReq.name);
3✔
582
  if (pObj == NULL) {
3!
UNCOV
583
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
584
    if (terrno != 0) code = terrno;
×
UNCOV
585
    if (dropReq.igNotExists) {
×
UNCOV
586
      code = 0;  // mndBuildDropMountRsp(pObj, &pReq->info.rspLen, &pReq->info.rsp, true);
×
587
    }
UNCOV
588
    goto _exit;
×
589
  }
590

591
  SName name = {0};
3✔
592
  TAOS_CHECK_EXIT(tNameFromString(&name, pObj->dbFName, T_NAME_ACCT | T_NAME_DB));
3!
593

594
  char db[TSDB_TABLE_FNAME_LEN] = {0};
3✔
595
  (void)tNameGetFullDbName(&name, db);
3✔
596
  if (!(pDb = mndAcquireDb(pMnode, db))) {
3!
UNCOV
597
    TAOS_CHECK_EXIT(TSDB_CODE_MND_DB_NOT_SELECTED);
×
598
  }
599

600
  TAOS_CHECK_GOTO(mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb), NULL, _exit);
3!
601

602
  code = mndDropRsma(pMnode, pReq, pDb, pObj);
3✔
603
  if (code == TSDB_CODE_SUCCESS) {
3!
604
    code = TSDB_CODE_ACTION_IN_PROGRESS;
3✔
605
  }
606

607
  auditRecord(pReq, pMnode->clusterId, "dropRsma", dropReq.name, "", "", 0);
3✔
608
_exit:
3✔
609
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
3!
UNCOV
610
    mError("rsma:%s, failed at line %d to drop since %s", dropReq.name, lino, tstrerror(code));
×
611
  }
612

613
  mndReleaseDb(pMnode, pDb);
3✔
614
  mndReleaseRsma(pMnode, pObj);
3✔
615
  TAOS_RETURN(code);
3✔
616
}
617

618
static int32_t mndCreateRsma(SMnode *pMnode, SRpcMsg *pReq, SUserObj *pUser, SDbObj *pDb, SStbObj *pStb,
12✔
619
                             SMCreateRsmaReq *pCreate) {
620
  int32_t  code = 0, lino = 0;
12✔
621
  SRsmaObj obj = {0};
12✔
622
  int32_t  nDbs = 0, nVgs = 0, nStbs = 0;
12✔
623
  SDbObj  *pDbs = NULL;
12✔
624
  SStbObj *pStbs = NULL;
12✔
625
  STrans  *pTrans = NULL;
12✔
626

627
  (void)snprintf(obj.name, TSDB_TABLE_NAME_LEN, "%s", pCreate->name);
12✔
628
  (void)snprintf(obj.dbFName, TSDB_DB_FNAME_LEN, "%s", pDb->name);
12✔
629

630
  const char *tbName = strrchr(pCreate->tbFName, '.');
12✔
631
  (void)snprintf(obj.tbName, TSDB_TABLE_NAME_LEN, "%s", tbName ? tbName + 1 : pCreate->tbFName);
12!
632
  (void)snprintf(obj.createUser, TSDB_USER_LEN, "%s", pUser->user);
12✔
633
  obj.createdTime = taosGetTimestampMs();
12✔
634
  obj.updateTime = obj.createdTime;
12✔
635
  obj.uid = mndGenerateUid(obj.name, TSDB_TABLE_FNAME_LEN);
12✔
636
  obj.tbUid = pCreate->tbUid;
12✔
637
  obj.dbUid = pDb->uid;
12✔
638
  obj.interval[0] = pCreate->interval[0];
12✔
639
  obj.interval[1] = pCreate->interval[1];
12✔
640
  obj.version = 1;
12✔
641
  obj.tbType = pCreate->tbType;  // ETableType: 1 stable. Only super table supported currently.
12✔
642
  obj.intervalUnit = pCreate->intervalUnit;
12✔
643
  obj.nFuncs = pCreate->nFuncs;
12✔
644
  if (obj.nFuncs > 0) {
12!
645
    TSDB_CHECK_NULL((obj.funcColIds = taosMemoryCalloc(obj.nFuncs, sizeof(col_id_t))), code, lino, _exit, terrno);
12!
646
    TSDB_CHECK_NULL((obj.funcIds = taosMemoryCalloc(obj.nFuncs, sizeof(func_id_t))), code, lino, _exit, terrno);
12!
647
    for (int16_t i = 0; i < obj.nFuncs; ++i) {
90✔
648
      obj.funcColIds[i] = pCreate->funcColIds[i];
78✔
649
      obj.funcIds[i] = pCreate->funcIds[i];
78✔
650
    }
651
  }
652

653
  TSDB_CHECK_NULL((pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, "create-rsma")),
12!
654
                  code, lino, _exit, terrno);
655
  mInfo("trans:%d, used to create rsma %s on tb %s.%s", pTrans->id, obj.name, obj.dbFName, obj.tbName);
12!
656

657
  mndTransSetDbName(pTrans, obj.dbFName, obj.name);
12✔
658
  mndTransSetKillMode(pTrans, TRN_KILL_MODE_SKIP);
12✔
659
  TAOS_CHECK_EXIT(mndTransCheckConflict(pMnode, pTrans));
12!
660

661
  mndTransSetOper(pTrans, MND_OPER_CREATE_RSMA);
12✔
662
  TAOS_CHECK_EXIT(mndSetCreateRsmaPrepareActions(pMnode, pTrans, &obj));
12!
663
  TAOS_CHECK_EXIT(mndSetCreateRsmaRedoActions(pMnode, pTrans, pDb, pStb, &obj, pCreate));
12!
664
  TAOS_CHECK_EXIT(mndSetCreateRsmaCommitLogs(pMnode, pTrans, &obj));
12!
665
  TAOS_CHECK_EXIT(mndTransPrepare(pMnode, pTrans));
12!
666
_exit:
12✔
667
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
12!
UNCOV
668
    mError("rsma:%s, failed at line %d to create rsma, since %s", obj.name, lino, tstrerror(code));
×
669
  }
670
  mndTransDrop(pTrans);
12✔
671
  mndRsmaFreeObj(&obj);
12✔
672
  if (pStbs) {
12!
UNCOV
673
    for (int32_t i = 0; i < nStbs; ++i) {
×
UNCOV
674
      mndFreeStb(pStbs + i);
×
675
    }
UNCOV
676
    taosMemFreeClear(pStbs);
×
677
  }
678
  TAOS_RETURN(code);
12✔
679
}
680

681
static int32_t mndCheckCreateRsmaReq(SMCreateRsmaReq *pCreate) {
22✔
682
  int32_t code = TSDB_CODE_MND_INVALID_RSMA_OPTION;
22✔
683
  if (pCreate->name[0] == 0) goto _exit;
22!
684
  if (pCreate->tbFName[0] == 0) goto _exit;
22!
685
  if (pCreate->igExists < 0 || pCreate->igExists > 1) goto _exit;
22!
686
  if (pCreate->intervalUnit < 0) goto _exit;
22!
687
  if (pCreate->interval[0] <= 0) goto _exit;
22!
688
  if (pCreate->interval[1] < 0) goto _exit;
22!
689

690
  SName fname = {0};
22✔
691
  if ((code = tNameFromString(&fname, pCreate->tbFName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE)) < 0) goto _exit;
22!
692
  if (*(char *)tNameGetTableName(&fname) == 0) goto _exit;
22!
693
  code = 0;
22✔
694
_exit:
22✔
695
  TAOS_RETURN(code);
22✔
696
}
697

698
static int32_t mndCheckRsmaConflicts(SMnode *pMnode, SDbObj *pDbObj, SMCreateRsmaReq *pCreate) {
17✔
699
  void     *pIter = NULL;
17✔
700
  SSdb     *pSdb = pMnode->pSdb;
17✔
701
  SRsmaObj *pObj = NULL;
17✔
702
  while ((pIter = sdbFetch(pSdb, SDB_RSMA, pIter, (void **)&pObj))) {
24✔
703
    if (pObj->tbUid == pCreate->tbUid && pObj->dbUid == pDbObj->uid) {
12!
704
      sdbCancelFetch(pSdb, (pIter));
5✔
705
      sdbRelease(pSdb, pObj);
5✔
706
      mError("rsma:%s, conflict with existing rsma %s on same table %s.%s:%" PRIi64, pCreate->name, pObj->name,
5!
707
             pObj->dbFName, pObj->tbName, pObj->tbUid);
708
      return TSDB_CODE_MND_RSMA_EXIST_IN_TABLE;
5✔
709
    }
710
    sdbRelease(pSdb, pObj);
7✔
711
  }
712
  return 0;
12✔
713
}
714

715
static int32_t mndProcessCreateRsmaReq(SRpcMsg *pReq) {
22✔
716
  int32_t         code = 0, lino = 0;
22✔
717
  SMnode         *pMnode = pReq->info.node;
22✔
718
  SDbObj         *pDb = NULL;
22✔
719
  SStbObj        *pStb = NULL;
22✔
720
  SRsmaObj       *pSma = NULL;
22✔
721
  SUserObj       *pUser = NULL;
22✔
722
  int64_t         mTraceId = TRACE_GET_ROOTID(&pReq->info.traceId);
22✔
723
  SMCreateRsmaReq createReq = {0};
22✔
724

725
  TAOS_CHECK_EXIT(tDeserializeSMCreateRsmaReq(pReq->pCont, pReq->contLen, &createReq));
22!
726

727
  mInfo("start to create rsma: %s", createReq.name);
22!
728
  TAOS_CHECK_EXIT(mndCheckCreateRsmaReq(&createReq));
22!
729

730
  if ((pSma = mndAcquireRsma(pMnode, createReq.name))) {
22✔
731
    if (createReq.igExists) {
5!
UNCOV
732
      mInfo("rsma:%s, already exist, ignore exist is set", createReq.name);
×
UNCOV
733
      code = 0;
×
UNCOV
734
      goto _exit;
×
735
    } else {
736
      TAOS_CHECK_EXIT(TSDB_CODE_RSMA_ALREADY_EXISTS);
5!
737
    }
738
  } else {
739
    if ((code = terrno) == TSDB_CODE_RSMA_NOT_EXIST) {
17!
740
      // continue
741
    } else {  // TSDB_CODE_MND_RSMA_IN_CREATING | TSDB_CODE_MND_RSMA_IN_DROPPING | TSDB_CODE_APP_ERROR
UNCOV
742
      goto _exit;
×
743
    }
744
  }
745

746
  SName name = {0};
17✔
747
  TAOS_CHECK_EXIT(tNameFromString(&name, createReq.tbFName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE));
17!
748
  char db[TSDB_TABLE_FNAME_LEN] = {0};
17✔
749
  (void)tNameGetFullDbName(&name, db);
17✔
750

751
  pDb = mndAcquireDb(pMnode, db);
17✔
752
  if (pDb == NULL) {
17!
UNCOV
753
    TAOS_CHECK_EXIT(TSDB_CODE_MND_DB_NOT_SELECTED);
×
754
  }
755

756
  TAOS_CHECK_EXIT(mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_READ_DB, pDb));
17!
757
  TAOS_CHECK_EXIT(mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb));
17!
758

759
  pStb = mndAcquireStb(pMnode, createReq.tbFName);
17✔
760
  if (pStb == NULL) {
17!
UNCOV
761
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STB_NOT_EXIST);
×
762
  }
763

764
  TAOS_CHECK_EXIT(mndCheckRsmaConflicts(pMnode, pDb, &createReq));
17✔
765

766
  TAOS_CHECK_EXIT(mndAcquireUser(pMnode, pReq->info.conn.user, &pUser));
12!
767
  TAOS_CHECK_EXIT(mndCreateRsma(pMnode, pReq, pUser, pDb, pStb, &createReq));
12!
768

769
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
12!
770

771
  auditRecord(pReq, pMnode->clusterId, "createRsma", createReq.name, createReq.tbFName, "", 0);
12✔
772
_exit:
22✔
773
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
22!
774
    mError("rsma:%s, failed at line %d to create since %s", createReq.name, lino, tstrerror(code));
10!
775
  }
776
  if (pSma) mndReleaseRsma(pMnode, pSma);
22✔
777
  if (pStb) mndReleaseStb(pMnode, pStb);
22✔
778
  if (pDb) mndReleaseDb(pMnode, pDb);
22✔
779
  tFreeSMCreateRsmaReq(&createReq);
22✔
780
  TAOS_RETURN(code);
22✔
781
}
782

783
#ifdef TD_ENTERPRISE
784
static int32_t mndFillRsmaInfo(SRsmaObj *pObj, SStbObj *pStb, SRsmaInfoRsp *pRsp, bool withColName) {
2✔
785
  int32_t code = 0, lino = 0;
2✔
786
  pRsp->id = pObj->uid;
2✔
787
  (void)snprintf(pRsp->name, sizeof(pRsp->name), "%s", pObj->name);
2✔
788
  (void)snprintf(pRsp->tbFName, sizeof(pRsp->tbFName), "%s.%s", pObj->dbFName, pObj->tbName);
2✔
789
  pRsp->version = pObj->version;
2✔
790
  pRsp->tbType = pObj->tbType;
2✔
791
  pRsp->intervalUnit = pObj->intervalUnit;
2✔
792
  pRsp->nFuncs = pObj->nFuncs;
2✔
793
  pRsp->interval[0] = pObj->interval[0];
2✔
794
  pRsp->interval[1] = pObj->interval[1];
2✔
795
  if (pRsp->nFuncs > 0) {
2!
796
    pRsp->funcColIds = pObj->funcColIds;  // shallow copy, no need to free
2✔
797
    pRsp->funcIds = pObj->funcIds;        // shallow copy, no need to free
2✔
798
    if (withColName) {
2!
799
      pRsp->colNames = taosArrayInit(pRsp->nFuncs, sizeof(char *));
2✔
800
      if (pRsp->colNames == NULL) {
2!
UNCOV
801
        TAOS_CHECK_EXIT(terrno);
×
802
      }
803
      pRsp->nColNames = pRsp->nFuncs;
2✔
804
      int16_t i = 0, j = 0;
2✔
805
      for (; i < pRsp->nFuncs; ++i) {
15✔
806
        bool found = false;
13✔
807
        for (; j < pStb->numOfColumns;) {
27!
808
          if (pStb->pColumns[j].colId == pRsp->funcColIds[i]) {
27✔
809
            found = true;
13✔
810
            break;
13✔
811
          } else if (pStb->pColumns[j].colId < pRsp->funcColIds[i]) {
14!
812
            ++j;
14✔
813
          } else {
UNCOV
814
            break;
×
815
          }
816
        }
817
        if (found) {
13!
818
          SSchema *pCol = pStb->pColumns + j;
13✔
819
          char    *colName = taosStrdup(pCol->name);
13!
820
          if (colName == NULL) {
13!
UNCOV
821
            TAOS_CHECK_EXIT(terrno);
×
822
          }
823
          if (!taosArrayPush(pRsp->colNames, &colName)) {
26!
UNCOV
824
            taosMemoryFree(colName);
×
UNCOV
825
            TAOS_CHECK_EXIT(terrno);
×
826
          }
827
        } else {
UNCOV
828
          TAOS_CHECK_EXIT(TSDB_CODE_MND_COLUMN_NOT_EXIST);
×
829
        }
830
      }
831
    }
832
  }
833
_exit:
2✔
834
  if (code != 0) {
2!
UNCOV
835
    mError("rsma:%s, failed at line %d to get rsma info since %s", pObj->name, lino, tstrerror(code));
×
836
  }
837
  TAOS_RETURN(code);
2✔
838
}
839
#endif
840

841
static int32_t mndProcessGetRsmaReq(SRpcMsg *pReq) {
3✔
842
#ifdef TD_ENTERPRISE
843
  int32_t      code = 0, lino = 0;
3✔
844
  SMnode      *pMnode = pReq->info.node;
3✔
845
  SRsmaInfoReq req = {0};
3✔
846
  SRsmaInfoRsp rsp = {0};
3✔
847
  SRsmaObj    *pObj = NULL;
3✔
848
  SStbObj     *pStb = NULL;
3✔
849
  void        *pRsp = NULL;
3✔
850
  int32_t      contLen = 0;
3✔
851

852
  TAOS_CHECK_EXIT(tDeserializeRsmaInfoReq(pReq->pCont, pReq->contLen, &req));
3!
853

854
  if (!(pObj = mndAcquireRsma(pMnode, req.name))) {
3✔
855
    TAOS_CHECK_EXIT(terrno);
1!
856
  }
857

858
  char tbFName[TSDB_TABLE_FNAME_LEN] = {0};
2✔
859
  (void)snprintf(tbFName, sizeof(tbFName), "%s.%s", pObj->dbFName, pObj->tbName);
2✔
860

861
  if( (pStb = mndAcquireStb(pMnode, tbFName)) == NULL) {
2!
UNCOV
862
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STB_NOT_EXIST);
×
863
  }
864

865
  TAOS_CHECK_EXIT(mndFillRsmaInfo(pObj, pStb, &rsp, req.withColName));
2!
866

867
  if ((contLen = tSerializeRsmaInfoRsp(NULL, 0, &rsp)) < 0) {
2!
UNCOV
868
    TAOS_CHECK_EXIT(contLen);
×
869
  }
870
  if (!(pRsp = rpcMallocCont(contLen))) {
2!
UNCOV
871
    TAOS_CHECK_EXIT(terrno);
×
872
  }
873
  if ((contLen = tSerializeRsmaInfoRsp(pRsp, contLen, &rsp)) < 0) {
2!
UNCOV
874
    TAOS_CHECK_EXIT(contLen);
×
875
  }
876

877
  pReq->info.rsp = pRsp;
2✔
878
  pReq->info.rspLen = contLen;
2✔
879

880
_exit:
3✔
881
  if (code != 0) {
3✔
882
    rpcFreeCont(pRsp);
1✔
883
  }
884
  if (pObj) mndReleaseRsma(pMnode, pObj);
3✔
885
  if (pStb) mndReleaseStb(pMnode, pStb);
3✔
886
  tFreeRsmaInfoRsp(&rsp, false);
3✔
887
  TAOS_RETURN(code);
3✔
888
#else
889
  return TSDB_CODE_OPS_NOT_SUPPORT;
890
#endif
891
}
892

893
static void mndRetrieveRsmaFuncList(SMnode *pMnode, SRsmaObj *pObj, char *buf, int32_t bufLen) {
27✔
894
  SSdb    *pSdb = pMnode->pSdb;
27✔
895
  int32_t  numOfRows = 0;
27✔
896
  SStbObj *pStb = NULL;
27✔
897
  char    *qBuf = POINTER_SHIFT(buf, VARSTR_HEADER_SIZE);
27✔
898
  int32_t  qBufLen = bufLen - VARSTR_HEADER_SIZE;
27✔
899

900
  qBuf[0] = 0;
27✔
901
  varDataSetLen(buf, 0);  // initialize to empty string
27✔
902

903
  if (pObj->nFuncs <= 0) return;
27!
904

905
  char tbFName[TSDB_TABLE_FNAME_LEN] = {0};
27✔
906
  (void)snprintf(tbFName, sizeof(tbFName), "%s.%s", pObj->dbFName, pObj->tbName);
27✔
907
  pStb = mndAcquireStb(pMnode, tbFName);
27✔
908
  if (pStb == NULL) {
27!
UNCOV
909
    mWarn("rsma:%s, failed to acquire table %s for function list", pObj->name, tbFName);
×
UNCOV
910
    return;
×
911
  }
912

913
  SSchema *pColumns = pStb->pColumns;
27✔
914

915
  int32_t  len = 0, j = 0;
27✔
916
  char     colFunc[TSDB_COL_NAME_LEN + TSDB_FUNC_NAME_LEN + 2] = {0};
27✔
917
  for (int32_t i = 0; i < pObj->nFuncs; ++i) {
203✔
918
    col_id_t colId = pObj->funcColIds[i];
176✔
919
    for (; j < pStb->numOfColumns;) {
366!
920
      if (pColumns[j].colId == colId) {
366✔
921
        int32_t colFuncLen =
176✔
922
            tsnprintf(colFunc, sizeof(colFunc), "%s(%s),", fmGetFuncName(pObj->funcIds[i]), pColumns[j].name);
176✔
923
        if ((qBufLen - len) > colFuncLen) {
176!
924
          len += tsnprintf(qBuf + len, colFuncLen + 1, "%s", colFunc);
176✔
925
        } else {
UNCOV
926
          goto _exit;
×
927
        }
928
        break;
176✔
929
      } else if (pColumns[j].colId > colId) {
190!
930
        break;
×
931
      } else {
932
        ++j;
190✔
933
      }
934
    }
935
  }
936
_exit:
27✔
937
  qBuf[len > 0 ? len - 1 : 0] = 0;  // remove the last ','
27!
938
  varDataSetLen(buf, len > 0 ? len - 1 : 0);
27!
939
  mndReleaseStb(pMnode, pStb);
27✔
940
}
941

942
static int32_t mndRetrieveRsma(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
17✔
943
  SMnode          *pMnode = pReq->info.node;
17✔
944
  int32_t          code = 0, lino = 0;
17✔
945
  int32_t          numOfRows = 0;
17✔
946
  int32_t          cols = 0;
17✔
947
  char             tmp[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE];
948
  int32_t          tmpLen = 0;
17✔
949
  int32_t          bufLen = 0;
17✔
950
  char            *pBuf = NULL;
17✔
951
  char            *qBuf = NULL;
17✔
952
  void            *pIter = NULL;
17✔
953
  SSdb            *pSdb = pMnode->pSdb;
17✔
954
  SColumnInfoData *pColInfo = NULL;
17✔
955

956
  pBuf = tmp;
17✔
957
  bufLen = sizeof(tmp) - VARSTR_HEADER_SIZE;
17✔
958
  if (pShow->numOfRows < 1) {
17!
959
    SRsmaObj *pObj = NULL;
17✔
960
    int32_t   index = 0;
17✔
961
    while ((pIter = sdbFetch(pSdb, SDB_RSMA, pIter, (void **)&pObj))) {
44✔
962
      cols = 0;
27✔
963
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols);
27✔
964
      qBuf = POINTER_SHIFT(pBuf, VARSTR_HEADER_SIZE);
27✔
965
      TAOS_UNUSED(snprintf(qBuf, bufLen, "%s", pObj->name));
27✔
966
      varDataSetLen(pBuf, strlen(pBuf + VARSTR_HEADER_SIZE));
27✔
967
      COL_DATA_SET_VAL_GOTO(pBuf, false, pObj, pIter, _exit);
27!
968

969
      if ((pColInfo = taosArrayGet(pBlock->pDataBlock, ++cols))) {
27!
970
        COL_DATA_SET_VAL_GOTO((const char *)(&pObj->uid), false, pObj, pIter, _exit);
27!
971
      }
972

973
      if ((pColInfo = taosArrayGet(pBlock->pDataBlock, ++cols))) {
27!
974
        qBuf = POINTER_SHIFT(pBuf, VARSTR_HEADER_SIZE);
27✔
975
        const char *db = strchr(pObj->dbFName, '.');
27✔
976
        TAOS_UNUSED(snprintf(qBuf, bufLen, "%s", db ? db + 1 : pObj->dbFName));
27!
977
        varDataSetLen(pBuf, strlen(qBuf));
27✔
978
        COL_DATA_SET_VAL_GOTO(pBuf, false, pObj, pIter, _exit);
27!
979
      }
980

981
      if ((pColInfo = taosArrayGet(pBlock->pDataBlock, ++cols))) {
27!
982
        qBuf = POINTER_SHIFT(pBuf, VARSTR_HEADER_SIZE);
27✔
983
        TAOS_UNUSED(snprintf(qBuf, bufLen, "%s", pObj->tbName));
27✔
984
        varDataSetLen(pBuf, strlen(qBuf));
27✔
985
        COL_DATA_SET_VAL_GOTO(pBuf, false, pObj, pIter, _exit);
27!
986
      }
987

988
      if ((pColInfo = taosArrayGet(pBlock->pDataBlock, ++cols))) {
27!
989
        qBuf = POINTER_SHIFT(pBuf, VARSTR_HEADER_SIZE);
27✔
990
        if (pObj->tbType == TSDB_SUPER_TABLE) {
27!
991
          TAOS_UNUSED(snprintf(qBuf, bufLen, "SUPER_TABLE"));
27✔
UNCOV
992
        } else if (pObj->tbType == TSDB_NORMAL_TABLE) {
×
UNCOV
993
          TAOS_UNUSED(snprintf(qBuf, bufLen, "NORMAL_TABLE"));
×
UNCOV
994
        } else if (pObj->tbType == TSDB_CHILD_TABLE) {
×
UNCOV
995
          TAOS_UNUSED(snprintf(qBuf, bufLen, "CHILD_TABLE"));
×
996
        } else {
UNCOV
997
          TAOS_UNUSED(snprintf(qBuf, bufLen, "UNKNOWN"));
×
998
        }
999
        varDataSetLen(pBuf, strlen(qBuf));
27✔
1000
        COL_DATA_SET_VAL_GOTO(pBuf, false, pObj, pIter, _exit);
27!
1001
      }
1002

1003
      if ((pColInfo = taosArrayGet(pBlock->pDataBlock, ++cols))) {
27!
1004
        COL_DATA_SET_VAL_GOTO((const char *)&pObj->createdTime, false, pObj, pIter, _exit);
27!
1005
      }
1006

1007
      if ((pColInfo = taosArrayGet(pBlock->pDataBlock, ++cols))) {
27!
1008
        qBuf = POINTER_SHIFT(pBuf, VARSTR_HEADER_SIZE);
27✔
1009
        TAOS_UNUSED(snprintf(qBuf, bufLen, "%" PRIi64 "%c", pObj->interval[0], pObj->intervalUnit));
27✔
1010
        if (pObj->interval[1] > 0) {
27!
1011
          tmpLen = strlen(qBuf);
27✔
1012
          TAOS_UNUSED(
27✔
1013
              snprintf(qBuf + tmpLen, bufLen - tmpLen, ",%" PRIi64 "%c", pObj->interval[1], pObj->intervalUnit));
1014
        }
1015
        varDataSetLen(pBuf, strlen(qBuf));
27✔
1016
        COL_DATA_SET_VAL_GOTO(pBuf, false, pObj, pIter, _exit);
27!
1017
      }
1018

1019
      if ((pColInfo = taosArrayGet(pBlock->pDataBlock, ++cols))) {
27!
1020
        mndRetrieveRsmaFuncList(pMnode, pObj, pBuf, bufLen);
27✔
1021
        COL_DATA_SET_VAL_GOTO(pBuf, false, pObj, pIter, _exit);
27!
1022
      }
1023

1024
      sdbRelease(pSdb, pObj);
27✔
1025
      ++numOfRows;
27✔
1026
    }
1027
  }
1028

1029
  pShow->numOfRows += numOfRows;
17✔
1030

1031
_exit:
17✔
1032
  if (code < 0) {
17!
UNCOV
1033
    mError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
UNCOV
1034
    TAOS_RETURN(code);
×
1035
  }
1036
  return numOfRows;
17✔
1037
}
1038

UNCOV
1039
static void mndCancelRetrieveRsma(SMnode *pMnode, void *pIter) {
×
UNCOV
1040
  SSdb *pSdb = pMnode->pSdb;
×
UNCOV
1041
  sdbCancelFetchByType(pSdb, pIter, SDB_RSMA);
×
UNCOV
1042
}
×
1043

1044
int32_t mndDropRsmasByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
1,931✔
1045
  int32_t   code = 0;
1,931✔
1046
  SSdb     *pSdb = pMnode->pSdb;
1,931✔
1047
  SRsmaObj *pObj = NULL;
1,931✔
1048
  void     *pIter = NULL;
1,931✔
1049

1050
  while ((pIter = sdbFetch(pSdb, SDB_RSMA, pIter, (void **)&pObj))) {
1,937✔
1051
    if (pObj->dbUid == pDb->uid) {
6!
1052
      if ((code = mndSetDropRsmaCommitLogs(pMnode, pTrans, pObj)) != 0) {
6!
UNCOV
1053
        sdbCancelFetch(pSdb, pIter);
×
UNCOV
1054
        sdbRelease(pSdb, pObj);
×
UNCOV
1055
        TAOS_RETURN(code);
×
1056
      }
1057
    }
1058
    sdbRelease(pSdb, pObj);
6✔
1059
  }
1060

1061
  TAOS_RETURN(code);
1,931✔
1062
}
1063

1064
int32_t mndDropRsmaByStb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
1,020✔
1065
  int32_t   code = 0;
1,020✔
1066
  SSdb     *pSdb = pMnode->pSdb;
1,020✔
1067
  SRsmaObj *pObj = NULL;
1,020✔
1068
  void     *pIter = NULL;
1,020✔
1069

1070
  while ((pIter = sdbFetch(pSdb, SDB_RSMA, pIter, (void **)&pObj))) {
1,021✔
1071
    if (pObj->tbUid == pStb->uid && pObj->dbUid == pStb->dbUid) {
1!
1072
      if ((code = mndSetDropRsmaCommitLogs(pMnode, pTrans, pObj)) != 0) {
1!
UNCOV
1073
        sdbCancelFetch(pSdb, pIter);
×
UNCOV
1074
        sdbRelease(pSdb, pObj);
×
UNCOV
1075
        TAOS_RETURN(code);
×
1076
      }
1077
    }
1078
    sdbRelease(pSdb, pObj);
1✔
1079
  }
1080

1081
  TAOS_RETURN(code);
1,020✔
1082
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc