• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4851

14 Nov 2025 08:06AM UTC coverage: 63.754% (+0.03%) from 63.728%
#4851

push

travis-ci

guanshengliang
Merge branch '3.0' into cover/3.0

354 of 675 new or added lines in 18 files covered. (52.44%)

3145 existing lines in 113 files now uncovered.

149128 of 233910 relevant lines covered (63.75%)

117183401.67 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.23
/source/dnode/mnode/impl/src/mndRsma.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "audit.h"
18
#include "functionMgt.h"
19
#include "mndDb.h"
20
#include "mndDnode.h"
21
#include "mndInfoSchema.h"
22
#include "mndMnode.h"
23
#include "mndPrivilege.h"
24
#include "mndShow.h"
25
#include "mndSma.h"
26
#include "mndStb.h"
27
#include "mndTrans.h"
28
#include "mndUser.h"
29
#include "mndVgroup.h"
30
#include "parser.h"
31
#include "tname.h"
32

33
#define MND_RSMA_VER_NUMBER   1
34
#define MND_RSMA_RESERVE_SIZE 64
35

36
static SSdbRaw *mndRsmaActionEncode(SRsmaObj *pSma);
37
static SSdbRow *mndRsmaActionDecode(SSdbRaw *pRaw);
38
static int32_t  mndRsmaActionInsert(SSdb *pSdb, SRsmaObj *pSma);
39
static int32_t  mndRsmaActionDelete(SSdb *pSdb, SRsmaObj *pSpSmatb);
40
static int32_t  mndRsmaActionUpdate(SSdb *pSdb, SRsmaObj *pOld, SRsmaObj *pNew);
41
static int32_t  mndProcessCreateRsmaReq(SRpcMsg *pReq);
42
static int32_t  mndProcessDropRsmaReq(SRpcMsg *pReq);
43
static int32_t  mndProcessAlterRsmaReq(SRpcMsg *pReq);
44
static int32_t  mndProcessGetRsmaReq(SRpcMsg *pReq);
45

46
static int32_t mndRetrieveRsma(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
47
static void    mndCancelRetrieveRsma(SMnode *pMnode, void *pIter);
48
static int32_t mndRetrieveRsmaTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
49
static void    mndCancelRetrieveRsmaTask(SMnode *pMnode, void *pIter);
50

51
/**
 * Registers everything rsma-related on the given mnode: the SDB_RSMA table
 * callbacks, the create/drop/alter/get message handlers (vnode responses are
 * routed to mndTransProcessRsp), and the show-table retrieve/cancel handlers.
 *
 * @param pMnode mnode to register on
 * @return the result of sdbSetTable()
 */
int32_t mndInitRsma(SMnode *pMnode) {
  SSdbTable rsmaTable = {
      .sdbType = SDB_RSMA,
      .keyType = SDB_KEY_BINARY,
      .encodeFp = (SdbEncodeFp)mndRsmaActionEncode,
      .decodeFp = (SdbDecodeFp)mndRsmaActionDecode,
      .insertFp = (SdbInsertFp)mndRsmaActionInsert,
      .updateFp = (SdbUpdateFp)mndRsmaActionUpdate,
      .deleteFp = (SdbDeleteFp)mndRsmaActionDelete,
  };

  // create
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_RSMA, mndProcessCreateRsmaReq);
  mndSetMsgHandle(pMnode, TDMT_VND_CREATE_RSMA_RSP, mndTransProcessRsp);
  // drop
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_RSMA, mndProcessDropRsmaReq);
  mndSetMsgHandle(pMnode, TDMT_VND_DROP_RSMA_RSP, mndTransProcessRsp);
  // alter
  mndSetMsgHandle(pMnode, TDMT_MND_ALTER_RSMA, mndProcessAlterRsmaReq);
  mndSetMsgHandle(pMnode, TDMT_VND_ALTER_RSMA_RSP, mndTransProcessRsp);
  // get
  mndSetMsgHandle(pMnode, TDMT_MND_GET_RSMA, mndProcessGetRsmaReq);

  // show rsmas
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_RSMA, mndRetrieveRsma);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_RSMA, mndCancelRetrieveRsma);

  return sdbSetTable(pMnode->pSdb, rsmaTable);
}
74

75
// Intentionally empty: this function performs no work.
void mndCleanupRsma(SMnode *pMnode) {
  (void)pMnode;
}
489,696✔
76

77
/**
 * Releases the two function arrays (funcColIds, funcIds) held by an rsma
 * object. The object itself is not freed. A NULL pObj is accepted.
 */
void mndRsmaFreeObj(SRsmaObj *pObj) {
  if (pObj == NULL) {
    return;
  }
  taosMemoryFreeClear(pObj->funcColIds);
  taosMemoryFreeClear(pObj->funcIds);
}
132,784✔
83

84
static int32_t tSerializeSRsmaObj(void *buf, int32_t bufLen, const SRsmaObj *pObj) {
203,808✔
85
  int32_t  code = 0, lino = 0;
203,808✔
86
  int32_t  tlen = 0;
203,808✔
87
  SEncoder encoder = {0};
203,808✔
88
  tEncoderInit(&encoder, buf, bufLen);
203,808✔
89

90
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
203,808✔
91

92
  TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pObj->name));
407,616✔
93
  TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pObj->tbName));
407,616✔
94
  TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pObj->dbFName));
407,616✔
95
  TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pObj->createUser));
407,616✔
96
  TAOS_CHECK_EXIT(tEncodeI64v(&encoder, pObj->createdTime));
407,616✔
97
  TAOS_CHECK_EXIT(tEncodeI64v(&encoder, pObj->updateTime));
407,616✔
98
  TAOS_CHECK_EXIT(tEncodeI64v(&encoder, pObj->uid));
407,616✔
99
  TAOS_CHECK_EXIT(tEncodeI64v(&encoder, pObj->tbUid));
407,616✔
100
  TAOS_CHECK_EXIT(tEncodeI64v(&encoder, pObj->dbUid));
407,616✔
101
  TAOS_CHECK_EXIT(tEncodeI64v(&encoder, pObj->interval[0]));
407,616✔
102
  TAOS_CHECK_EXIT(tEncodeI64v(&encoder, pObj->interval[1]));
407,616✔
103
  TAOS_CHECK_EXIT(tEncodeU64v(&encoder, pObj->reserved));
407,616✔
104
  TAOS_CHECK_EXIT(tEncodeI32v(&encoder, pObj->version));
407,616✔
105
  TAOS_CHECK_EXIT(tEncodeI8(&encoder, pObj->tbType));
407,616✔
106
  TAOS_CHECK_EXIT(tEncodeI8(&encoder, pObj->intervalUnit));
407,616✔
107
  TAOS_CHECK_EXIT(tEncodeI16v(&encoder, pObj->nFuncs));
407,616✔
108
  for (int16_t i = 0; i < pObj->nFuncs; ++i) {
1,346,368✔
109
    TAOS_CHECK_EXIT(tEncodeI16v(&encoder, pObj->funcColIds[i]));
2,285,120✔
110
    TAOS_CHECK_EXIT(tEncodeI32v(&encoder, pObj->funcIds[i]));
2,285,120✔
111
  }
112

113
  tEndEncode(&encoder);
203,808✔
114

115
  tlen = encoder.pos;
203,808✔
116
_exit:
203,808✔
117
  tEncoderClear(&encoder);
203,808✔
118
  if (code < 0) {
203,808✔
119
    mError("rsma, %s failed at line %d since %s", __func__, lino, tstrerror(code));
×
120
    TAOS_RETURN(code);
×
121
  }
122

123
  return tlen;
203,808✔
124
}
125

126
/**
 * Decodes an rsma object from buf, reading fields in the order written by
 * tSerializeSRsmaObj().
 *
 * When the decoded nFuncs is positive, funcColIds and funcIds are allocated
 * here. On failure they are NOT released by this function; the visible caller
 * (mndRsmaActionDecode) releases them through mndRsmaFreeObj().
 *
 * @param buf    encoded bytes
 * @param bufLen number of bytes in buf
 * @param pObj   object to fill
 * @return 0 on success, otherwise a negative error code
 */
static int32_t tDeserializeSRsmaObj(void *buf, int32_t bufLen, SRsmaObj *pObj) {
  int32_t  code = 0, lino = 0;
  SDecoder decoder = {0};

  tDecoderInit(&decoder, buf, bufLen);
  TAOS_CHECK_EXIT(tStartDecode(&decoder));

  // string fields
  TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pObj->name));
  TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pObj->tbName));
  TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pObj->dbFName));
  TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pObj->createUser));

  // 64-bit fields
  TAOS_CHECK_EXIT(tDecodeI64v(&decoder, &pObj->createdTime));
  TAOS_CHECK_EXIT(tDecodeI64v(&decoder, &pObj->updateTime));
  TAOS_CHECK_EXIT(tDecodeI64v(&decoder, &pObj->uid));
  TAOS_CHECK_EXIT(tDecodeI64v(&decoder, &pObj->tbUid));
  TAOS_CHECK_EXIT(tDecodeI64v(&decoder, &pObj->dbUid));
  TAOS_CHECK_EXIT(tDecodeI64v(&decoder, &pObj->interval[0]));
  TAOS_CHECK_EXIT(tDecodeI64v(&decoder, &pObj->interval[1]));
  TAOS_CHECK_EXIT(tDecodeU64v(&decoder, &pObj->reserved));

  // small scalar fields
  TAOS_CHECK_EXIT(tDecodeI32v(&decoder, &pObj->version));
  TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pObj->tbType));
  TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pObj->intervalUnit));

  // function list: count followed by (column id, function id) pairs
  TAOS_CHECK_EXIT(tDecodeI16v(&decoder, &pObj->nFuncs));
  if (pObj->nFuncs > 0) {
    pObj->funcColIds = taosMemoryMalloc(sizeof(col_id_t) * pObj->nFuncs);
    if (pObj->funcColIds == NULL) {
      TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
    }

    pObj->funcIds = taosMemoryMalloc(sizeof(int32_t) * pObj->nFuncs);
    if (pObj->funcIds == NULL) {
      TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
    }

    for (int16_t idx = 0; idx < pObj->nFuncs; ++idx) {
      TAOS_CHECK_EXIT(tDecodeI16v(&decoder, &pObj->funcColIds[idx]));
      TAOS_CHECK_EXIT(tDecodeI32v(&decoder, &pObj->funcIds[idx]));
    }
  }

_exit:
  tEndDecode(&decoder);
  tDecoderClear(&decoder);
  if (code < 0) {
    mError("rsma, %s failed at line %d since %s, row:%p", __func__, lino, tstrerror(code), pObj);
  }
  TAOS_RETURN(code);
}
170

171
/**
 * SDB encode callback: turns an rsma object into an SSdbRaw.
 *
 * The raw data is an int32 length followed by that many bytes produced by
 * tSerializeSRsmaObj().
 *
 * @param pObj object to encode
 * @return the new raw, or NULL on failure (terrno is set to the error code)
 */
static SSdbRaw *mndRsmaActionEncode(SRsmaObj *pObj) {
  int32_t  code = 0, lino = 0;
  SSdbRaw *pRaw = NULL;
  void    *buf = NULL;
  int32_t  dataPos = 0;

  // Size query: the result is used to size both the raw and the scratch buffer.
  int32_t tlen = tSerializeSRsmaObj(NULL, 0, pObj);
  if (tlen < 0) {
    // Guarded on purpose: TAOS_CHECK_EXIT stores its argument in `code`, and a
    // positive length left there would be treated as failure at _exit.
    TAOS_CHECK_EXIT(tlen);
  }

  int32_t size = sizeof(int32_t) + tlen;
  pRaw = sdbAllocRaw(SDB_RSMA, MND_RSMA_VER_NUMBER, size);
  if (pRaw == NULL) {
    TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
  }

  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) {
    TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
  }

  tlen = tSerializeSRsmaObj(buf, tlen, pObj);
  if (tlen < 0) {
    TAOS_CHECK_EXIT(tlen);
  }

  SDB_SET_INT32(pRaw, dataPos, tlen, _exit);
  SDB_SET_BINARY(pRaw, dataPos, buf, tlen, _exit);
  SDB_SET_DATALEN(pRaw, dataPos, _exit);

_exit:
  taosMemoryFreeClear(buf);
  if (code == TSDB_CODE_SUCCESS) {
    mTrace("rsma, encode to raw:%p, row:%p", pRaw, pObj);
    return pRaw;
  }

  terrno = code;
  mError("rsma, failed at line %d to encode to raw:%p since %s", lino, pRaw, tstrerror(code));
  sdbFreeRaw(pRaw);
  return NULL;
}
213

214
/**
 * SDB decode callback: rebuilds an rsma row from an SSdbRaw.
 *
 * Rejects raws whose soft version differs from MND_RSMA_VER_NUMBER, then reads
 * the int32 length and the encoded bytes and hands them to
 * tDeserializeSRsmaObj().
 *
 * @param pRaw raw record to decode
 * @return the decoded row, or NULL on failure (terrno is set to the error code)
 */
SSdbRow *mndRsmaActionDecode(SSdbRaw *pRaw) {
  int32_t   code = 0, lino = 0;
  SSdbRow  *pRow = NULL;
  SRsmaObj *pObj = NULL;
  void     *buf = NULL;
  int8_t    sver = 0;
  int32_t   tlen = 0;
  int32_t   dataPos = 0;

  TAOS_CHECK_EXIT(sdbGetRawSoftVer(pRaw, &sver));
  if (sver != MND_RSMA_VER_NUMBER) {
    mError("rsma read invalid ver, data ver: %d, curr ver: %d", sver, MND_RSMA_VER_NUMBER);
    TAOS_CHECK_EXIT(TSDB_CODE_SDB_INVALID_DATA_VER);
  }

  pRow = sdbAllocRow(sizeof(SRsmaObj));
  if (pRow == NULL) {
    TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
  }

  pObj = sdbGetRowObj(pRow);
  if (pObj == NULL) {
    TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
  }

  SDB_GET_INT32(pRaw, dataPos, &tlen, _exit);
  buf = taosMemoryMalloc(tlen + 1);
  if (buf == NULL) {
    TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
  }
  SDB_GET_BINARY(pRaw, dataPos, buf, tlen, _exit);

  TAOS_CHECK_EXIT(tDeserializeSRsmaObj(buf, tlen, pObj));

  taosInitRWLatch(&pObj->lock);

_exit:
  taosMemoryFreeClear(buf);
  if (code == TSDB_CODE_SUCCESS) {
    mTrace("rsma, decode from raw:%p, row:%p", pRaw, pObj);
    return pRow;
  }

  terrno = code;
  mError("rsma, failed at line %d to decode from raw:%p since %s", lino, pRaw, tstrerror(code));
  mndRsmaFreeObj(pObj);  // releases any function arrays allocated during decoding
  taosMemoryFreeClear(pRow);
  return NULL;
}
261

262
// SDB insert callback: only emits a trace line and reports success.
static int32_t mndRsmaActionInsert(SSdb *pSdb, SRsmaObj *pObj) {
  (void)pSdb;
  mTrace("rsma:%s, perform insert action, row:%p", pObj->name, pObj);
  return 0;
}
266

267
// SDB delete callback: traces, then releases the object's function arrays via
// mndRsmaFreeObj(). Always returns 0.
static int32_t mndRsmaActionDelete(SSdb *pSdb, SRsmaObj *pObj) {
  (void)pSdb;
  mTrace("rsma:%s, perform delete action, row:%p", pObj->name, pObj);
  mndRsmaFreeObj(pObj);
  return 0;
}
272

273
/**
 * SDB update callback: copies updateTime and nFuncs from pNew into pOld and
 * swaps the two function arrays between them, all under pOld's write latch.
 * After the swap pNew holds pOld's previous arrays.
 *
 * @return always 0
 */
static int32_t mndRsmaActionUpdate(SSdb *pSdb, SRsmaObj *pOld, SRsmaObj *pNew) {
  (void)pSdb;
  mTrace("rsma:%s, perform update action, old row:%p new row:%p", pOld->name, pOld, pNew);

  taosWLockLatch(&pOld->lock);
  TSWAP(pOld->funcColIds, pNew->funcColIds);
  TSWAP(pOld->funcIds, pNew->funcIds);
  pOld->nFuncs = pNew->nFuncs;
  pOld->updateTime = pNew->updateTime;
  taosWUnLockLatch(&pOld->lock);

  return 0;
}
283

284
/**
 * Acquires the rsma object with the given name from the SDB.
 *
 * When sdbAcquire() returns NULL, terrno is rewritten to an rsma-specific code:
 * not-there -> TSDB_CODE_RSMA_NOT_EXIST, creating -> TSDB_CODE_MND_RSMA_IN_CREATING,
 * dropping -> TSDB_CODE_MND_RSMA_IN_DROPPING, anything else -> TSDB_CODE_APP_ERROR
 * (also logged as fatal).
 *
 * @return the acquired object, or NULL with terrno set as described above
 */
SRsmaObj *mndAcquireRsma(SMnode *pMnode, char *name) {
  SRsmaObj *pObj = sdbAcquire(pMnode->pSdb, SDB_RSMA, name);
  if (pObj != NULL) {
    return pObj;
  }

  if (terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
    terrno = TSDB_CODE_RSMA_NOT_EXIST;
  } else if (terrno == TSDB_CODE_SDB_OBJ_CREATING) {
    terrno = TSDB_CODE_MND_RSMA_IN_CREATING;
  } else if (terrno == TSDB_CODE_SDB_OBJ_DROPPING) {
    terrno = TSDB_CODE_MND_RSMA_IN_DROPPING;
  } else {
    terrno = TSDB_CODE_APP_ERROR;
    mFatal("rsma:%s, failed to acquire rsma since %s", name, terrstr());
  }
  return NULL;
}
301

302
// Hands an rsma object back to the mnode's SDB via sdbRelease().
void mndReleaseRsma(SMnode *pMnode, SRsmaObj *pSma) {
  sdbRelease(pMnode->pSdb, pSma);
}
60,216✔
306
#ifdef TD_ENTERPRISE
307
/**
 * Encodes pSma, appends the raw to the transaction's redo log and then marks
 * the raw SDB_STATUS_CREATING.
 *
 * @return 0 on success; terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when terrno
 *         is 0) if encoding fails; otherwise the failing call's code
 */
static int32_t mndSetCreateRsmaRedoLogs(SMnode *pMnode, STrans *pTrans, SRsmaObj *pSma) {
  int32_t  code = 0;
  SSdbRaw *pRedoRaw = mndRsmaActionEncode(pSma);

  if (pRedoRaw == NULL) {
    code = terrno ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  TAOS_CHECK_RETURN(mndTransAppendRedolog(pTrans, pRedoRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING));
  TAOS_RETURN(code);
}
320

321
/**
 * Encodes pSma, appends the raw to the transaction's undo log and then marks
 * the raw SDB_STATUS_DROPPED.
 *
 * @return 0 on success; terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when terrno
 *         is 0) if encoding fails; otherwise the failing call's code
 */
static int32_t mndSetCreateRsmaUndoLogs(SMnode *pMnode, STrans *pTrans, SRsmaObj *pSma) {
  int32_t  code = 0;
  SSdbRaw *pUndoRaw = mndRsmaActionEncode(pSma);

  if (pUndoRaw == NULL) {
    code = terrno ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  TAOS_CHECK_RETURN(mndTransAppendUndolog(pTrans, pUndoRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED));
  TAOS_RETURN(code);
}
333

334
/**
 * Encodes pSma, appends the raw to the transaction's prepare log and then
 * marks the raw SDB_STATUS_CREATING.
 *
 * Failures are reported with the underlying error code (as the sibling
 * redo/undo/commit-log helpers in this file do) instead of a bare -1, so the
 * caller's error log names the real cause.
 *
 * @return 0 on success; terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when terrno
 *         is 0) if encoding fails; otherwise the failing call's code
 */
static int32_t mndSetCreateRsmaPrepareActions(SMnode *pMnode, STrans *pTrans, SRsmaObj *pSma) {
  int32_t  code = 0;
  SSdbRaw *pDbRaw = mndRsmaActionEncode(pSma);
  if (pDbRaw == NULL) {
    // mndRsmaActionEncode() stores its failure code in terrno.
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  TAOS_CHECK_RETURN(mndTransAppendPrepareLog(pTrans, pDbRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pDbRaw, SDB_STATUS_CREATING));
  return 0;
}
342

343
static void *mndBuildVCreateRsmaReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, SRsmaObj *pObj,
46,320✔
344
                                    SMCreateRsmaReq *pCreate, int32_t *pContLen) {
345
  int32_t         code = 0, lino = 0;
46,320✔
346
  SMsgHead       *pHead = NULL;
46,320✔
347
  SVCreateRsmaReq req = *pCreate;
46,320✔
348

349
  req.uid = pObj->uid;  // use the uid generated by mnode
46,320✔
350

351
  int32_t contLen = tSerializeSVCreateRsmaReq(NULL, 0, &req);
46,320✔
352
  TAOS_CHECK_EXIT(contLen);
46,320✔
353
  contLen += sizeof(SMsgHead);
46,320✔
354
  TSDB_CHECK_NULL((pHead = taosMemoryMalloc(contLen)), code, lino, _exit, terrno);
46,320✔
355
  pHead->contLen = htonl(contLen);
46,320✔
356
  pHead->vgId = htonl(pVgroup->vgId);
46,320✔
357
  void *pBuf = POINTER_SHIFT(pHead, sizeof(SMsgHead));
46,320✔
358
  TAOS_CHECK_EXIT(tSerializeSVCreateRsmaReq(pBuf, contLen, &req));
46,320✔
359
_exit:
46,320✔
360
  if (code < 0) {
46,320✔
UNCOV
361
    taosMemoryFreeClear(pHead);
×
UNCOV
362
    terrno = code;
×
UNCOV
363
    *pContLen = 0;
×
UNCOV
364
    return NULL;
×
365
  }
366
  *pContLen = contLen;
46,320✔
367
  return pHead;
46,320✔
368
}
369

370
static int32_t mndSetCreateRsmaRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb, SRsmaObj *pObj,
23,160✔
371
                                           SMCreateRsmaReq *pCreate) {
372
  int32_t code = 0;
23,160✔
373
  SSdb   *pSdb = pMnode->pSdb;
23,160✔
374
  SVgObj *pVgroup = NULL;
23,160✔
375
  void   *pIter = NULL;
23,160✔
376

377
  SName name = {0};
23,160✔
378
  if ((code = tNameFromString(&name, pCreate->tbFName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE)) != 0) {
23,160✔
UNCOV
379
    return code;
×
380
  }
381
  tstrncpy(pCreate->tbFName, (char *)tNameGetTableName(&name), sizeof(pCreate->tbFName));  // convert tbFName to tbName
23,160✔
382

383
  while ((pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup))) {
115,800✔
384
    if (!mndVgroupInDb(pVgroup, pDb->uid)) {
92,640✔
385
      sdbRelease(pSdb, pVgroup);
46,320✔
386
      continue;
46,320✔
387
    }
388

389
    int32_t contLen = 0;
46,320✔
390
    void   *pReq = mndBuildVCreateRsmaReq(pMnode, pVgroup, pStb, pObj, pCreate, &contLen);
46,320✔
391
    if (pReq == NULL) {
46,320✔
UNCOV
392
      sdbCancelFetch(pSdb, pIter);
×
UNCOV
393
      sdbRelease(pSdb, pVgroup);
×
UNCOV
394
      code = terrno ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
×
UNCOV
395
      TAOS_RETURN(code);
×
396
    }
397

398
    STransAction action = {0};
46,320✔
399
    action.mTraceId = pTrans->mTraceId;
46,320✔
400
    action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
46,320✔
401
    action.pCont = pReq;
46,320✔
402
    action.contLen = contLen;
46,320✔
403
    action.msgType = TDMT_VND_CREATE_RSMA;
46,320✔
404
    action.acceptableCode = TSDB_CODE_RSMA_ALREADY_EXISTS;  // check whether the rsma uid exist
46,320✔
405
    action.retryCode = TSDB_CODE_TDB_STB_NOT_EXIST;         // retry if relative table not exist
46,320✔
406
    if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
46,320✔
UNCOV
407
      taosMemoryFree(pReq);
×
UNCOV
408
      sdbCancelFetch(pSdb, pIter);
×
UNCOV
409
      sdbRelease(pSdb, pVgroup);
×
UNCOV
410
      TAOS_RETURN(code);
×
411
    }
412
    sdbRelease(pSdb, pVgroup);
46,320✔
413
  }
414

415
  TAOS_RETURN(code);
23,160✔
416
}
417

418
/**
 * Encodes pSma, appends the raw to the transaction's commit log and then marks
 * the raw SDB_STATUS_READY.
 *
 * @return 0 on success; terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when terrno
 *         is 0) if encoding fails; otherwise the failing call's code
 */
static int32_t mndSetCreateRsmaCommitLogs(SMnode *pMnode, STrans *pTrans, SRsmaObj *pSma) {
  int32_t  code = 0;
  SSdbRaw *pCommitRaw = mndRsmaActionEncode(pSma);

  if (pCommitRaw == NULL) {
    code = terrno ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pCommitRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY));
  TAOS_RETURN(code);
}
431

432
/**
 * Encodes pSma, appends the raw to the transaction's prepare log and then
 * marks the raw SDB_STATUS_DROPPING.
 *
 * When encoding fails the computed error code is returned (previously it was
 * computed and then discarded in favour of -1).
 *
 * @return 0 on success; terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when terrno
 *         is 0) if encoding fails; otherwise the failing call's code
 */
static int32_t mndSetDropRsmaPrepareLogs(SMnode *pMnode, STrans *pTrans, SRsmaObj *pSma) {
  int32_t  code = 0;
  SSdbRaw *pRedoRaw = mndRsmaActionEncode(pSma);
  if (pRedoRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }
  TAOS_CHECK_RETURN(mndTransAppendPrepareLog(pTrans, pRedoRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING));

  return 0;
}
445

446
/**
 * Encodes pSma, appends the raw to the transaction's commit log and then marks
 * the raw SDB_STATUS_DROPPED.
 *
 * When encoding fails the computed error code is returned (previously it was
 * computed and then discarded in favour of -1).
 *
 * @return 0 on success; terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when terrno
 *         is 0) if encoding fails; otherwise the failing call's code
 */
static int32_t mndSetDropRsmaCommitLogs(SMnode *pMnode, STrans *pTrans, SRsmaObj *pSma) {
  int32_t  code = 0;
  SSdbRaw *pCommitRaw = mndRsmaActionEncode(pSma);
  if (pCommitRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }
  TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pCommitRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED));

  return 0;
}
459

460
/**
 * Builds the vnode drop-rsma message for one vgroup: an SMsgHead (contLen and
 * vgId in network byte order) followed by the serialized SVDropRsmaReq.
 *
 * The request carries tbName, name, tbType, uid and tbUid copied from pObj.
 *
 * @param pVgroup  target vgroup; its vgId goes into the header
 * @param pObj     rsma object being dropped
 * @param pContLen out: total message length (header + body); 0 on failure
 * @return heap buffer holding the message, or NULL on failure with terrno set.
 *         The visible caller hands the buffer to a transaction action and
 *         frees it itself only if appending that action fails.
 */
static void *mndBuildVDropRsmaReq(SMnode *pMnode, SVgObj *pVgroup, SRsmaObj *pObj, int32_t *pContLen) {
  int32_t       code = 0, lino = 0;
  int32_t       contLen = 0;
  SMsgHead     *pHead = NULL;
  SVDropRsmaReq req = {0};

  (void)snprintf(req.tbName, sizeof(req.tbName), "%s", pObj->tbName);
  (void)snprintf(req.name, sizeof(req.name), "%s", pObj->name);
  req.tbType = pObj->tbType;
  req.uid = pObj->uid;
  req.tbUid = pObj->tbUid;

  // Size query for the body only.
  int32_t bodyLen = tSerializeSVDropRsmaReq(NULL, 0, &req);
  TAOS_CHECK_EXIT(bodyLen);

  contLen = bodyLen + (int32_t)sizeof(SMsgHead);
  TSDB_CHECK_NULL((pHead = taosMemoryMalloc(contLen)), code, lino, _exit, terrno);
  pHead->contLen = htonl(contLen);
  pHead->vgId = htonl(pVgroup->vgId);

  // The body starts after the header, so only bodyLen bytes are available to
  // the serializer; passing contLen here would overstate the buffer.
  void *pBuf = POINTER_SHIFT(pHead, sizeof(SMsgHead));
  TAOS_CHECK_EXIT(tSerializeSVDropRsmaReq(pBuf, bodyLen, &req));

_exit:
  if (code < 0) {
    taosMemoryFreeClear(pHead);
    terrno = code;
    *pContLen = 0;
    return NULL;
  }
  *pContLen = contLen;
  return pHead;
}
489

490
/**
 * Appends one TDMT_VND_DROP_RSMA redo action to the transaction for every
 * vgroup that belongs to pDb. TSDB_CODE_RSMA_NOT_EXIST is set as the
 * acceptable code of each action.
 *
 * @return 0 on success, otherwise the first error encountered
 */
static int32_t mndSetDropRsmaRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SRsmaObj *pSma) {
  int32_t code = 0;
  SSdb   *pSdb = pMnode->pSdb;
  SVgObj *pVgroup = NULL;
  void   *pIter = NULL;

  for (;;) {
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) {
      break;
    }

    if (!mndVgroupInDb(pVgroup, pDb->uid)) {
      sdbRelease(pSdb, pVgroup);
      continue;
    }

    int32_t msgLen = 0;
    void   *pMsg = mndBuildVDropRsmaReq(pMnode, pVgroup, pSma, &msgLen);
    if (pMsg == NULL) {
      sdbCancelFetch(pSdb, pIter);
      sdbRelease(pSdb, pVgroup);
      code = terrno ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
      TAOS_RETURN(code);
    }

    STransAction action = {0};
    action.mTraceId = pTrans->mTraceId;
    action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
    action.pCont = pMsg;
    action.contLen = msgLen;
    action.msgType = TDMT_VND_DROP_RSMA;
    action.acceptableCode = TSDB_CODE_RSMA_NOT_EXIST;

    code = mndTransAppendRedoAction(pTrans, &action);
    if (code != 0) {
      // The message was not taken over by the transaction; free it here.
      taosMemoryFree(pMsg);
      sdbCancelFetch(pSdb, pIter);
      sdbRelease(pSdb, pVgroup);
      TAOS_RETURN(code);
    }
    sdbRelease(pSdb, pVgroup);
  }

  TAOS_RETURN(code);
}
528

529
/**
 * Creates and prepares the "drop-rsma" transaction for pObj: conflict check,
 * prepare log (DROPPING), commit log (DROPPED), per-vgroup drop redo actions,
 * then mndTransPrepare(). The transaction handle is dropped before returning
 * in every case.
 *
 * @return 0 on success, otherwise an error code
 */
static int32_t mndDropRsma(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SRsmaObj *pObj) {
  int32_t code = 0, lino = 0;
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, "drop-rsma");

  if (pTrans == NULL) {
    code = terrno ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto _exit;
  }

  mInfo("trans:%d start to drop rsma:%s", pTrans->id, pObj->name);

  mndTransSetDbName(pTrans, pDb->name, pObj->name);
  mndTransSetKillMode(pTrans, TRN_KILL_MODE_SKIP);
  TAOS_CHECK_EXIT(mndTransCheckConflict(pMnode, pTrans));

  mndTransSetOper(pTrans, MND_OPER_DROP_RSMA);
  TAOS_CHECK_EXIT(mndSetDropRsmaPrepareLogs(pMnode, pTrans, pObj));
  TAOS_CHECK_EXIT(mndSetDropRsmaCommitLogs(pMnode, pTrans, pObj));
  TAOS_CHECK_EXIT(mndSetDropRsmaRedoActions(pMnode, pTrans, pDb, pObj));

  // No RPC response body is attached to the transaction here; the
  // mndBuildDropRsmaRsp() / mndTransSetRpcRsp() path is disabled.

  TAOS_CHECK_EXIT(mndTransPrepare(pMnode, pTrans));

_exit:
  if (!(code == 0 || code == TSDB_CODE_ACTION_IN_PROGRESS)) {
    mError("rsma:%s, failed to drop at line:%d since %s", pObj->name, lino, tstrerror(code));
  }
  mndTransDrop(pTrans);
  TAOS_RETURN(code);
}
563
#endif
564
/**
 * Handler for TDMT_MND_DROP_RSMA.
 *
 * With TD_ENTERPRISE: deserializes the request, acquires the rsma and its
 * database, checks MND_OPER_WRITE_DB privilege, starts the drop transaction
 * and records an audit entry. Without TD_ENTERPRISE the body is compiled out
 * and the function returns 0.
 *
 * @return TSDB_CODE_ACTION_IN_PROGRESS when the drop transaction was started;
 *         0 when the rsma could not be acquired and igNotExists is set;
 *         otherwise an error code
 */
static int32_t mndProcessDropRsmaReq(SRpcMsg *pReq) {
  SMnode *pMnode = pReq->info.node;
  int32_t code = 0, lino = 0;
#ifdef TD_ENTERPRISE
  SDbObj       *pDb = NULL;
  SRsmaObj     *pObj = NULL;
  SMDropRsmaReq dropReq = {0};

  // Pass &lino so a failure here is reported with its line in the log below.
  TAOS_CHECK_GOTO(tDeserializeSMDropRsmaReq(pReq->pCont, pReq->contLen, &dropReq), &lino, _exit);

  mInfo("rsma:%s, start to drop", dropReq.name);

  pObj = mndAcquireRsma(pMnode, dropReq.name);
  if (pObj == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    if (dropReq.igNotExists) {
      code = 0;  // acquisition failure is ignored when igNotExists is set
    }
    goto _exit;
  }

  SName name = {0};
  TAOS_CHECK_EXIT(tNameFromString(&name, pObj->dbFName, T_NAME_ACCT | T_NAME_DB));

  char db[TSDB_TABLE_FNAME_LEN] = {0};
  (void)tNameGetFullDbName(&name, db);
  if (!(pDb = mndAcquireDb(pMnode, db))) {
    TAOS_CHECK_EXIT(TSDB_CODE_MND_DB_NOT_SELECTED);
  }

  TAOS_CHECK_GOTO(mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb), &lino, _exit);

  code = mndDropRsma(pMnode, pReq, pDb, pObj);
  if (code == TSDB_CODE_SUCCESS) {
    code = TSDB_CODE_ACTION_IN_PROGRESS;
  }

  auditRecord(pReq, pMnode->clusterId, "dropRsma", dropReq.name, "", "", 0);
_exit:
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("rsma:%s, failed at line %d to drop since %s", dropReq.name, lino, tstrerror(code));
  }

  // Both may still be NULL here when an early step failed.
  mndReleaseDb(pMnode, pDb);
  mndReleaseRsma(pMnode, pObj);
#endif
  TAOS_RETURN(code);
}
613
#ifdef TD_ENTERPRISE
614
static int32_t mndCreateRsma(SMnode *pMnode, SRpcMsg *pReq, SUserObj *pUser, SDbObj *pDb, SStbObj *pStb,
23,160✔
615
                             SMCreateRsmaReq *pCreate) {
616
  int32_t  code = 0, lino = 0;
23,160✔
617
  SRsmaObj obj = {0};
23,160✔
618
  STrans  *pTrans = NULL;
23,160✔
619

620
  (void)snprintf(obj.name, TSDB_TABLE_NAME_LEN, "%s", pCreate->name);
23,160✔
621
  (void)snprintf(obj.dbFName, TSDB_DB_FNAME_LEN, "%s", pDb->name);
23,160✔
622

623
  const char *tbName = strrchr(pCreate->tbFName, '.');
23,160✔
624
  (void)snprintf(obj.tbName, TSDB_TABLE_NAME_LEN, "%s", tbName ? tbName + 1 : pCreate->tbFName);
23,160✔
625
  (void)snprintf(obj.createUser, TSDB_USER_LEN, "%s", pUser->user);
23,160✔
626
  obj.createdTime = taosGetTimestampMs();
23,160✔
627
  obj.updateTime = obj.createdTime;
23,160✔
628
  obj.uid = mndGenerateUid(obj.name, strlen(obj.name));
23,160✔
629
  obj.tbUid = pCreate->tbUid;
23,160✔
630
  obj.dbUid = pDb->uid;
23,160✔
631
  obj.interval[0] = pCreate->interval[0];
23,160✔
632
  obj.interval[1] = pCreate->interval[1];
23,160✔
633
  obj.version = 1;
23,160✔
634
  obj.tbType = pCreate->tbType;  // ETableType: 1 stable. Only super table supported currently.
23,160✔
635
  obj.intervalUnit = pCreate->intervalUnit;
23,160✔
636
  obj.nFuncs = pCreate->nFuncs;
23,160✔
637
  if (obj.nFuncs > 0) {
23,160✔
638
    TSDB_CHECK_NULL((obj.funcColIds = taosMemoryCalloc(obj.nFuncs, sizeof(col_id_t))), code, lino, _exit, terrno);
23,160✔
639
    TSDB_CHECK_NULL((obj.funcIds = taosMemoryCalloc(obj.nFuncs, sizeof(func_id_t))), code, lino, _exit, terrno);
23,160✔
640
    for (int16_t i = 0; i < obj.nFuncs; ++i) {
151,312✔
641
      obj.funcColIds[i] = pCreate->funcColIds[i];
128,152✔
642
      obj.funcIds[i] = pCreate->funcIds[i];
128,152✔
643
    }
644
  }
645

646
  TSDB_CHECK_NULL((pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, "create-rsma")),
23,160✔
647
                  code, lino, _exit, terrno);
648
  mInfo("trans:%d, used to create rsma %s on tb %s.%s", pTrans->id, obj.name, obj.dbFName, obj.tbName);
23,160✔
649

650
  mndTransSetDbName(pTrans, obj.dbFName, obj.name);
23,160✔
651
  mndTransSetKillMode(pTrans, TRN_KILL_MODE_SKIP);
23,160✔
652
  TAOS_CHECK_EXIT(mndTransCheckConflict(pMnode, pTrans));
23,160✔
653

654
  mndTransSetOper(pTrans, MND_OPER_CREATE_RSMA);
23,160✔
655
  TAOS_CHECK_EXIT(mndSetCreateRsmaPrepareActions(pMnode, pTrans, &obj));
23,160✔
656
  TAOS_CHECK_EXIT(mndSetCreateRsmaRedoActions(pMnode, pTrans, pDb, pStb, &obj, pCreate));
23,160✔
657
  TAOS_CHECK_EXIT(mndSetCreateRsmaCommitLogs(pMnode, pTrans, &obj));
23,160✔
658
  TAOS_CHECK_EXIT(mndTransPrepare(pMnode, pTrans));
23,160✔
659
_exit:
23,160✔
660
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
23,160✔
UNCOV
661
    mError("rsma:%s, failed at line %d to create rsma, since %s", obj.name, lino, tstrerror(code));
×
662
  }
663
  mndTransDrop(pTrans);
23,160✔
664
  mndRsmaFreeObj(&obj);
23,160✔
665
  TAOS_RETURN(code);
23,160✔
666
}
667

668
/**
 * Validates a create-rsma request.
 *
 * Rejected with TSDB_CODE_MND_INVALID_RSMA_OPTION: empty name or tbFName,
 * igExists outside [0, 1], negative intervalUnit, a negative interval, both
 * intervals zero, or a tbFName whose table-name part is empty. A tbFName that
 * tNameFromString() cannot parse is rejected with that function's error code.
 *
 * @return 0 when the request is acceptable, otherwise an error code
 */
static int32_t mndCheckCreateRsmaReq(SMCreateRsmaReq *pCreate) {
  int32_t code = TSDB_CODE_MND_INVALID_RSMA_OPTION;
  if (pCreate->name[0] == 0) goto _exit;
  if (pCreate->tbFName[0] == 0) goto _exit;
  if (pCreate->igExists < 0 || pCreate->igExists > 1) goto _exit;
  if (pCreate->intervalUnit < 0) goto _exit;
  if (pCreate->interval[0] < 0) goto _exit;
  if (pCreate->interval[1] < 0) goto _exit;
  if (pCreate->interval[0] == 0 && pCreate->interval[1] == 0) goto _exit;

  SName fname = {0};
  if ((code = tNameFromString(&fname, pCreate->tbFName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE)) < 0) goto _exit;
  if (*(char *)tNameGetTableName(&fname) == 0) {
    // `code` now holds tNameFromString()'s success value; restore the
    // rejection code so an empty table name is not reported as valid.
    code = TSDB_CODE_MND_INVALID_RSMA_OPTION;
    goto _exit;
  }
  code = 0;
_exit:
  TAOS_RETURN(code);
}
685

686
/**
 * Scans all rsma objects for one already defined on the same table
 * (same tbUid and same database uid) as the create request.
 *
 * @param pDbObj  database the new rsma would belong to
 * @param pCreate create request supplying name and tbUid
 * @return TSDB_CODE_MND_RSMA_EXIST_IN_TABLE if such an rsma exists, else 0
 */
static int32_t mndCheckRsmaConflicts(SMnode *pMnode, SDbObj *pDbObj, SMCreateRsmaReq *pCreate) {
  void     *pIter = NULL;
  SSdb     *pSdb = pMnode->pSdb;
  SRsmaObj *pObj = NULL;
  while ((pIter = sdbFetch(pSdb, SDB_RSMA, pIter, (void **)&pObj))) {
    if (pObj->tbUid == pCreate->tbUid && pObj->dbUid == pDbObj->uid) {
      // Log while the reference is still held; pObj must not be read after
      // sdbRelease().
      mError("rsma:%s, conflict with existing rsma %s on same table %s.%s:%" PRIi64, pCreate->name, pObj->name,
             pObj->dbFName, pObj->tbName, pObj->tbUid);
      sdbCancelFetch(pSdb, (pIter));
      sdbRelease(pSdb, pObj);
      return TSDB_CODE_MND_RSMA_EXIST_IN_TABLE;
    }
    sdbRelease(pSdb, pObj);
  }
  return 0;
}
702
#endif
703
/**
 * Message handler for a create-RSMA request.
 *
 * With TD_ENTERPRISE defined: deserializes and validates the request, handles an already existing
 * rsma according to igExists, resolves the database and super table named by tbFName, requires read
 * and write privilege on the database, rejects a second rsma on the same table, then calls
 * mndCreateRsma and records an audit entry. Without TD_ENTERPRISE nothing is done and 0 is returned.
 *
 * @param pReq  incoming rpc message; pReq->info.node is the mnode
 * @return TSDB_CODE_ACTION_IN_PROGRESS after mndCreateRsma succeeded, 0 when an existing rsma is
 *         ignored, otherwise the error code of the step that failed
 */
static int32_t mndProcessCreateRsmaReq(SRpcMsg *pReq) {
  int32_t code = 0, lino = 0;
#ifdef TD_ENTERPRISE
  SMnode         *pMnode = pReq->info.node;
  SDbObj         *pDb = NULL;
  SStbObj        *pStb = NULL;
  SRsmaObj       *pSma = NULL;
  SUserObj       *pUser = NULL;
  int64_t         mTraceId = TRACE_GET_ROOTID(&pReq->info.traceId);
  SMCreateRsmaReq createReq = {0};

  TAOS_CHECK_EXIT(tDeserializeSMCreateRsmaReq(pReq->pCont, pReq->contLen, &createReq));

  mInfo("start to create rsma: %s", createReq.name);
  TAOS_CHECK_EXIT(mndCheckCreateRsmaReq(&createReq));

  if ((pSma = mndAcquireRsma(pMnode, createReq.name))) {
    if (createReq.igExists) {
      mInfo("rsma:%s, already exist, ignore exist is set", createReq.name);
      code = 0;
      goto _exit;
    } else {
      TAOS_CHECK_EXIT(TSDB_CODE_RSMA_ALREADY_EXISTS);
    }
  } else {
    if ((code = terrno) == TSDB_CODE_RSMA_NOT_EXIST) {
      // continue
    } else {  // TSDB_CODE_MND_RSMA_IN_CREATING | TSDB_CODE_MND_RSMA_IN_DROPPING | TSDB_CODE_APP_ERROR
      goto _exit;
    }
  }

  SName name = {0};
  TAOS_CHECK_EXIT(tNameFromString(&name, createReq.tbFName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE));
  char db[TSDB_TABLE_FNAME_LEN] = {0};
  (void)tNameGetFullDbName(&name, db);

  pDb = mndAcquireDb(pMnode, db);
  if (pDb == NULL) {
    TAOS_CHECK_EXIT(TSDB_CODE_MND_DB_NOT_SELECTED);
  }

  TAOS_CHECK_EXIT(mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_READ_DB, pDb));
  TAOS_CHECK_EXIT(mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb));

  pStb = mndAcquireStb(pMnode, createReq.tbFName);
  if (pStb == NULL) {
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STB_NOT_EXIST);
  }

  TAOS_CHECK_EXIT(mndCheckRsmaConflicts(pMnode, pDb, &createReq));

  TAOS_CHECK_EXIT(mndAcquireUser(pMnode, pReq->info.conn.user, &pUser));
  TAOS_CHECK_EXIT(mndCreateRsma(pMnode, pReq, pUser, pDb, pStb, &createReq));

  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;

  auditRecord(pReq, pMnode->clusterId, "createRsma", createReq.name, createReq.tbFName, "", 0);
_exit:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("rsma:%s, failed at line %d to create since %s", createReq.name, lino, tstrerror(code));
  }
  // Every object acquired above is handed back here, on success and on failure alike.
  if (pUser) mndReleaseUser(pMnode, pUser);
  if (pSma) mndReleaseRsma(pMnode, pSma);
  if (pStb) mndReleaseStb(pMnode, pStb);
  if (pDb) mndReleaseDb(pMnode, pDb);
  tFreeSMCreateRsmaReq(&createReq);
#endif
  TAOS_RETURN(code);
}
772

773
#ifdef TD_ENTERPRISE
774
/**
 * Validate an alter-RSMA request: the rsma name must be non-empty and igNotExists must be 0 or 1.
 *
 * @return 0 when both conditions hold, TSDB_CODE_MND_INVALID_RSMA_OPTION otherwise
 */
static int32_t mndCheckAlterRsmaReq(SMAlterRsmaReq *pReq) {
  const bool hasName = (pReq->name[0] != 0);
  const bool flagInRange = (pReq->igNotExists >= 0 && pReq->igNotExists <= 1);

  int32_t code = (hasName && flagInRange) ? 0 : TSDB_CODE_MND_INVALID_RSMA_OPTION;
  TAOS_RETURN(code);
}
783

784
/**
 * Encode pSma and append it to the transaction's prepare log, then mark the raw as SDB_STATUS_READY.
 *
 * @param pMnode  unused here
 * @param pTrans  transaction receiving the prepare log
 * @param pSma    rsma object to encode
 * @return 0 on success, -1 when encoding, appending or setting the status fails
 */
static int32_t mndSetAlterRsmaPrepareActions(SMnode *pMnode, STrans *pTrans, SRsmaObj *pSma) {
  SSdbRaw *pDbRaw = mndRsmaActionEncode(pSma);
  if (pDbRaw == NULL) return -1;

  if (mndTransAppendPrepareLog(pTrans, pDbRaw) != 0) {
    // NOTE(review): assumes a failed append leaves pDbRaw unowned by the transaction (the convention
    // used by other mnode modules) — confirm against mndTransAppendPrepareLog.
    sdbFreeRaw(pDbRaw);
    return -1;
  }
  // From here on the raw belongs to the transaction, so it is not freed on failure.
  if (sdbSetRawStatus(pDbRaw, SDB_STATUS_READY) != 0) return -1;
  return 0;
}
792

793
/**
 * Append the commit log for an altered rsma; delegates unchanged to mndSetCreateRsmaCommitLogs.
 *
 * @return whatever mndSetCreateRsmaCommitLogs returns
 */
static int32_t mndSetAlterRsmaCommitLogs(SMnode *pMnode, STrans *pTrans, SRsmaObj *pSma) {
  int32_t code = mndSetCreateRsmaCommitLogs(pMnode, pTrans, pSma);
  return code;
}
796

797
static void *mndBuildVAlterRsmaReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, SRsmaObj *pObj,
24,704✔
798
                                   SMAlterRsmaReq *pAlter, int32_t *pContLen) {
799
  int32_t        code = 0, lino = 0;
24,704✔
800
  SMsgHead      *pHead = NULL;
24,704✔
801
  SVAlterRsmaReq req = {0};
24,704✔
802
  req.alterType = pAlter->alterType;
24,704✔
803
  (void)snprintf(req.name, sizeof(req.name), "%s", pObj->name);
24,704✔
804
  (void)snprintf(req.tbName, sizeof(req.tbName), "%s", pObj->tbName);
24,704✔
805
  req.tbType = pObj->tbType;
24,704✔
806
  req.intervalUnit = pObj->intervalUnit;
24,704✔
807
  req.interval[0] = pObj->interval[0];
24,704✔
808
  req.interval[1] = pObj->interval[1];
24,704✔
809
  req.tbUid = pObj->tbUid;
24,704✔
810
  req.uid = pObj->uid;
24,704✔
811
  req.nFuncs = pObj->nFuncs;
24,704✔
812
  req.funcColIds = pObj->funcColIds;
24,704✔
813
  req.funcIds = pObj->funcIds;
24,704✔
814

815
  int32_t contLen = tSerializeSVAlterRsmaReq(NULL, 0, &req);
24,704✔
816
  TAOS_CHECK_EXIT(contLen);
24,704✔
817
  contLen += sizeof(SMsgHead);
24,704✔
818
  TSDB_CHECK_NULL((pHead = taosMemoryMalloc(contLen)), code, lino, _exit, terrno);
24,704✔
819
  pHead->contLen = htonl(contLen);
24,704✔
820
  pHead->vgId = htonl(pVgroup->vgId);
24,704✔
821
  void *pBuf = POINTER_SHIFT(pHead, sizeof(SMsgHead));
24,704✔
822
  TAOS_CHECK_EXIT(tSerializeSVAlterRsmaReq(pBuf, contLen, &req));
24,704✔
823
_exit:
24,704✔
824
  if (code < 0) {
24,704✔
UNCOV
825
    taosMemoryFreeClear(pHead);
×
UNCOV
826
    terrno = code;
×
UNCOV
827
    *pContLen = 0;
×
UNCOV
828
    return NULL;
×
829
  }
830
  *pContLen = contLen;
24,704✔
831
  return pHead;
24,704✔
832
}
833

834
static int32_t mndSetAlterRsmaRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb, SRsmaObj *pObj,
12,352✔
835
                                          SMAlterRsmaReq *pAlter) {
836
  int32_t code = 0;
12,352✔
837
  SSdb   *pSdb = pMnode->pSdb;
12,352✔
838
  SVgObj *pVgroup = NULL;
12,352✔
839
  void   *pIter = NULL;
12,352✔
840

841
  while ((pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup))) {
61,760✔
842
    if (!mndVgroupInDb(pVgroup, pDb->uid)) {
49,408✔
843
      sdbRelease(pSdb, pVgroup);
24,704✔
844
      continue;
24,704✔
845
    }
846

847
    int32_t contLen = 0;
24,704✔
848
    void   *pReq = mndBuildVAlterRsmaReq(pMnode, pVgroup, pStb, pObj, pAlter, &contLen);
24,704✔
849
    if (pReq == NULL) {
24,704✔
UNCOV
850
      sdbCancelFetch(pSdb, pIter);
×
UNCOV
851
      sdbRelease(pSdb, pVgroup);
×
UNCOV
852
      code = terrno ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
×
UNCOV
853
      TAOS_RETURN(code);
×
854
    }
855

856
    STransAction action = {0};
24,704✔
857
    action.mTraceId = pTrans->mTraceId;
24,704✔
858
    action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
24,704✔
859
    action.pCont = pReq;
24,704✔
860
    action.contLen = contLen;
24,704✔
861
    action.msgType = TDMT_VND_ALTER_RSMA;
24,704✔
862
    if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
24,704✔
UNCOV
863
      taosMemoryFree(pReq);
×
UNCOV
864
      sdbCancelFetch(pSdb, pIter);
×
UNCOV
865
      sdbRelease(pSdb, pVgroup);
×
UNCOV
866
      TAOS_RETURN(code);
×
867
    }
868
    sdbRelease(pSdb, pVgroup);
24,704✔
869
  }
870

871
  TAOS_RETURN(code);
12,352✔
872
}
873

874
static int32_t mndAlterRsma(SMnode *pMnode, SRpcMsg *pReq, SUserObj *pUser, SDbObj *pDb, SStbObj *pStb,
12,352✔
875
                            SMAlterRsmaReq *pAlter, SRsmaObj *pOld) {
876
  int32_t  code = 0, lino = 0;
12,352✔
877
  STrans  *pTrans = NULL;
12,352✔
878
  SRsmaObj obj = *pOld;
12,352✔
879

880
  obj.updateTime = taosGetTimestampMs();
12,352✔
881
  ++obj.version;
12,352✔
882
  if (pAlter->alterType == TSDB_ALTER_RSMA_FUNCTION) {
12,352✔
883
    obj.nFuncs = pOld->nFuncs + pAlter->nFuncs;
12,352✔
884
    obj.funcColIds = taosMemoryMalloc(obj.nFuncs * sizeof(col_id_t));
12,352✔
885
    obj.funcIds = taosMemoryMalloc(obj.nFuncs * sizeof(func_id_t));
12,352✔
886
    if (obj.funcColIds == NULL || obj.funcIds == NULL) {
12,352✔
UNCOV
887
      TAOS_CHECK_EXIT(terrno);
×
888
    }
889
    int32_t n = 0, i = 0, j = 0;
12,352✔
890
    while (i < pOld->nFuncs && j < pAlter->nFuncs) {
66,392✔
891
      if (pOld->funcColIds[i] < pAlter->funcColIds[j]) {
54,040✔
892
        obj.funcColIds[n] = pOld->funcColIds[i];
46,320✔
893
        obj.funcIds[n++] = pOld->funcIds[i++];
46,320✔
894
      } else if (pOld->funcColIds[i] > pAlter->funcColIds[j]) {
7,720✔
895
        obj.funcColIds[n] = pAlter->funcColIds[j];
7,720✔
896
        obj.funcIds[n++] = pAlter->funcIds[j++];
7,720✔
897
      } else {
UNCOV
898
        mError("rsma:%s, conflict function on column id:%d", pOld->name, pAlter->funcColIds[j]);
×
UNCOV
899
        TAOS_CHECK_EXIT(TSDB_CODE_MND_RSMA_FUNC_CONFLICT);
×
900
      }
901
    }
902
    if (i < pOld->nFuncs) {
12,352✔
903
      while (i < pOld->nFuncs) {
15,440✔
904
        obj.funcColIds[n] = pOld->funcColIds[i];
7,720✔
905
        obj.funcIds[n++] = pOld->funcIds[i++];
7,720✔
906
      }
907
    } else if (j < pAlter->nFuncs) {
4,632✔
908
      while (j < pAlter->nFuncs) {
9,264✔
909
        obj.funcColIds[n] = pAlter->funcColIds[j];
4,632✔
910
        obj.funcIds[n++] = pAlter->funcIds[j++];
4,632✔
911
      }
912
    }
913
  } else {
UNCOV
914
    TAOS_CHECK_EXIT(TSDB_CODE_OPS_NOT_SUPPORT);
×
915
  }
916

917
  TSDB_CHECK_NULL((pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, "alter-rsma")), code,
12,352✔
918
                  lino, _exit, terrno);
919
  mInfo("trans:%d, used to alter rsma %s on tb %s.%s", pTrans->id, obj.name, obj.dbFName, obj.tbName);
12,352✔
920

921
  mndTransSetDbName(pTrans, obj.dbFName, obj.name);
12,352✔
922
  mndTransSetKillMode(pTrans, TRN_KILL_MODE_SKIP);
12,352✔
923
  TAOS_CHECK_EXIT(mndTransCheckConflict(pMnode, pTrans));
12,352✔
924

925
  mndTransSetOper(pTrans, MND_OPER_ALTER_RSMA);
12,352✔
926
  TAOS_CHECK_EXIT(mndSetAlterRsmaPrepareActions(pMnode, pTrans, &obj));
12,352✔
927
  TAOS_CHECK_EXIT(mndSetAlterRsmaCommitLogs(pMnode, pTrans, &obj));
12,352✔
928
  TAOS_CHECK_EXIT(mndSetAlterRsmaRedoActions(pMnode, pTrans, pDb, pStb, &obj, pAlter));
12,352✔
929

930
  TAOS_CHECK_EXIT(mndTransPrepare(pMnode, pTrans));
12,352✔
931
_exit:
12,352✔
932
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
12,352✔
UNCOV
933
    mError("rsma:%s, failed at line %d to alter rsma, since %s", obj.name, lino, tstrerror(code));
×
934
  }
935
  mndTransDrop(pTrans);
12,352✔
936
  mndRsmaFreeObj(&obj);
12,352✔
937
  TAOS_RETURN(code);
12,352✔
938
}
939
#endif
940
/**
 * Message handler for an alter-RSMA request.
 *
 * With TD_ENTERPRISE defined: deserializes and validates the request, looks up the rsma (a failed
 * lookup is turned into success when igNotExists is set), acquires its database and super table,
 * requires read and write privilege on the database, runs mndAlterRsma and records an audit entry.
 * Without TD_ENTERPRISE nothing is done and 0 is returned.
 *
 * @param pReq  incoming rpc message; pReq->info.node is the mnode
 * @return TSDB_CODE_ACTION_IN_PROGRESS after mndAlterRsma succeeded, 0 when a failed lookup is
 *         ignored, otherwise the error code of the step that failed
 */
static int32_t mndProcessAlterRsmaReq(SRpcMsg *pReq) {
  int32_t code = 0, lino = 0;
#ifdef TD_ENTERPRISE
  SMnode        *pMnode = pReq->info.node;
  SDbObj        *pDb = NULL;
  SStbObj       *pStb = NULL;
  SRsmaObj      *pObj = NULL;
  SUserObj      *pUser = NULL;
  int64_t        mTraceId = TRACE_GET_ROOTID(&pReq->info.traceId);
  SMAlterRsmaReq req = {0};
  char           tbFName[TSDB_TABLE_FNAME_LEN] = "\0";

  TAOS_CHECK_EXIT(tDeserializeSMAlterRsmaReq(pReq->pCont, pReq->contLen, &req));

  mInfo("start to alter rsma: %s", req.name);
  TAOS_CHECK_EXIT(mndCheckAlterRsmaReq(&req));

  if (!(pObj = mndAcquireRsma(pMnode, req.name))) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    if (req.igNotExists) {
      code = 0;
    }
    goto _exit;
  }

  if (!(pDb = mndAcquireDb(pMnode, pObj->dbFName))) {
    TAOS_CHECK_EXIT(TSDB_CODE_MND_DB_NOT_SELECTED);
  }

  TAOS_CHECK_EXIT(mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_READ_DB, pDb));
  TAOS_CHECK_EXIT(mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb));

  (void)snprintf(tbFName, sizeof(tbFName), "%s.%s", pObj->dbFName, pObj->tbName);

  pStb = mndAcquireStb(pMnode, tbFName);
  if (pStb == NULL) {
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STB_NOT_EXIST);
  }

  TAOS_CHECK_EXIT(mndAcquireUser(pMnode, pReq->info.conn.user, &pUser));
  TAOS_CHECK_EXIT(mndAlterRsma(pMnode, pReq, pUser, pDb, pStb, &req, pObj));

  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;

  char alterType[32] = "\0";
  (void)snprintf(alterType, sizeof(alterType), "alterType:%" PRIi8, req.alterType);
  auditRecord(pReq, pMnode->clusterId, "alterRsma", req.name, tbFName, alterType, 0);
_exit:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("rsma:%s, failed at line %d to alter since %s", req.name, lino, tstrerror(code));
  }
  // Every object acquired above is handed back here, on success and on failure alike.
  if (pUser) mndReleaseUser(pMnode, pUser);
  if (pObj) mndReleaseRsma(pMnode, pObj);
  if (pStb) mndReleaseStb(pMnode, pStb);
  if (pDb) mndReleaseDb(pMnode, pDb);
  tFreeSMAlterRsmaReq(&req);
#endif
  TAOS_RETURN(code);
}
999
#ifdef TD_ENTERPRISE
1000
/**
 * Populate an SRsmaInfoRsp from an rsma object and, optionally, the column names of its functions.
 *
 * funcColIds/funcIds in the response alias pObj's arrays (no copy). When withColName is set, each
 * function's column id is looked up in pStb->pColumns with a cursor that only moves forward, and a
 * duplicate of the column name is pushed onto pRsp->colNames.
 *
 * @return 0 on success, TSDB_CODE_MND_COLUMN_NOT_EXIST when a column id is not found, or terrno
 *         after a failed allocation
 */
static int32_t mndFillRsmaInfo(SRsmaObj *pObj, SStbObj *pStb, SRsmaInfoRsp *pRsp, bool withColName) {
  int32_t code = 0, lino = 0;

  pRsp->id = pObj->uid;
  pRsp->version = pObj->version;
  pRsp->tbType = pObj->tbType;
  pRsp->intervalUnit = pObj->intervalUnit;
  pRsp->interval[0] = pObj->interval[0];
  pRsp->interval[1] = pObj->interval[1];
  pRsp->nFuncs = pObj->nFuncs;
  (void)snprintf(pRsp->name, sizeof(pRsp->name), "%s", pObj->name);
  (void)snprintf(pRsp->tbFName, sizeof(pRsp->tbFName), "%s.%s", pObj->dbFName, pObj->tbName);

  if (pRsp->nFuncs <= 0) goto _exit;

  // borrowed from pObj; the response must not free these
  pRsp->funcColIds = pObj->funcColIds;
  pRsp->funcIds = pObj->funcIds;
  if (!withColName) goto _exit;

  pRsp->colNames = taosArrayInit(pRsp->nFuncs, sizeof(char *));
  if (pRsp->colNames == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }
  pRsp->nColNames = pRsp->nFuncs;

  int16_t colIdx = 0;
  for (int16_t funcIdx = 0; funcIdx < pRsp->nFuncs; ++funcIdx) {
    // skip table columns whose id is below the one being looked for
    while (colIdx < pStb->numOfColumns && pStb->pColumns[colIdx].colId < pRsp->funcColIds[funcIdx]) {
      ++colIdx;
    }
    bool matched = (colIdx < pStb->numOfColumns) && (pStb->pColumns[colIdx].colId == pRsp->funcColIds[funcIdx]);
    if (matched) {
      char *nameCopy = taosStrdup(pStb->pColumns[colIdx].name);
      if (nameCopy == NULL) {
        TAOS_CHECK_EXIT(terrno);
      }
      if (!taosArrayPush(pRsp->colNames, &nameCopy)) {
        taosMemoryFree(nameCopy);
        TAOS_CHECK_EXIT(terrno);
      }
    } else {
      TAOS_CHECK_EXIT(TSDB_CODE_MND_COLUMN_NOT_EXIST);
    }
  }

_exit:
  if (code != 0) {
    mError("rsma:%s, failed at line %d to get rsma info since %s", pObj->name, lino, tstrerror(code));
  }
  TAOS_RETURN(code);
}
1055
#endif
1056
/**
 * Message handler returning the description of one rsma.
 *
 * With TD_ENTERPRISE defined: looks up the rsma named in the request and its super table, fills an
 * SRsmaInfoRsp, serializes it into an rpc buffer and attaches that buffer to pReq->info. Without
 * TD_ENTERPRISE the function returns TSDB_CODE_OPS_NOT_SUPPORT.
 *
 * @return 0 on success with pReq->info.rsp / rspLen set, otherwise the failing step's error code
 */
static int32_t mndProcessGetRsmaReq(SRpcMsg *pReq) {
#ifdef TD_ENTERPRISE
  int32_t      code = 0, lino = 0;
  int32_t      contLen = 0;
  void        *pCont = NULL;
  SMnode      *pMnode = pReq->info.node;
  SRsmaObj    *pObj = NULL;
  SStbObj     *pStb = NULL;
  SRsmaInfoReq infoReq = {0};
  SRsmaInfoRsp infoRsp = {0};

  TAOS_CHECK_EXIT(tDeserializeRsmaInfoReq(pReq->pCont, pReq->contLen, &infoReq));

  pObj = mndAcquireRsma(pMnode, infoReq.name);
  if (pObj == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }

  char tbFName[TSDB_TABLE_FNAME_LEN] = {0};
  (void)snprintf(tbFName, sizeof(tbFName), "%s.%s", pObj->dbFName, pObj->tbName);

  pStb = mndAcquireStb(pMnode, tbFName);
  if (pStb == NULL) {
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STB_NOT_EXIST);
  }

  TAOS_CHECK_EXIT(mndFillRsmaInfo(pObj, pStb, &infoRsp, infoReq.withColName));

  // size the response, allocate the rpc buffer, then serialize for real
  contLen = tSerializeRsmaInfoRsp(NULL, 0, &infoRsp);
  if (contLen < 0) {
    TAOS_CHECK_EXIT(contLen);
  }
  pCont = rpcMallocCont(contLen);
  if (pCont == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }
  contLen = tSerializeRsmaInfoRsp(pCont, contLen, &infoRsp);
  if (contLen < 0) {
    TAOS_CHECK_EXIT(contLen);
  }

  pReq->info.rsp = pCont;
  pReq->info.rspLen = contLen;

_exit:
  if (code != 0) {
    rpcFreeCont(pCont);
  }
  if (pObj) mndReleaseRsma(pMnode, pObj);
  if (pStb) mndReleaseStb(pMnode, pStb);
  tFreeRsmaInfoRsp(&infoRsp, false);
  TAOS_RETURN(code);
#else
  return TSDB_CODE_OPS_NOT_SUPPORT;
#endif
}
1107
#ifdef TD_ENTERPRISE
1108
/**
 * Render the function list of an rsma as "func(col),func(col),..." into a var-string buffer.
 *
 * buf starts with a VARSTR_HEADER_SIZE length header followed by the text. The buffer is first set
 * to the empty string; it stays empty when the rsma has no functions or its table cannot be
 * acquired. Items are appended while they fit; the first item that does not fit ends the list. The
 * trailing ',' is removed. Functions whose column id is not found among the table columns are
 * skipped.
 *
 * @param bufLen  size passed by the caller; VARSTR_HEADER_SIZE is subtracted from it here
 */
static void mndRetrieveRsmaFuncList(SMnode *pMnode, SRsmaObj *pObj, char *buf, int32_t bufLen) {
  char   *text = POINTER_SHIFT(buf, VARSTR_HEADER_SIZE);
  int32_t textCap = bufLen - VARSTR_HEADER_SIZE;

  text[0] = 0;
  varDataSetLen(buf, 0);  // start out as an empty string

  if (pObj->nFuncs <= 0) return;

  char tbFName[TSDB_TABLE_FNAME_LEN] = {0};
  (void)snprintf(tbFName, sizeof(tbFName), "%s.%s", pObj->dbFName, pObj->tbName);

  SStbObj *pStb = mndAcquireStb(pMnode, tbFName);
  if (pStb == NULL) {
    mWarn("rsma:%s, failed to acquire table %s for function list", pObj->name, tbFName);
    return;
  }

  SSchema *pColumns = pStb->pColumns;
  char     item[TSDB_COL_NAME_LEN + TSDB_FUNC_NAME_LEN + 2] = {0};
  int32_t  used = 0;
  int32_t  colIdx = 0;

  for (int32_t funcIdx = 0; funcIdx < pObj->nFuncs; ++funcIdx) {
    col_id_t wanted = pObj->funcColIds[funcIdx];

    // the column cursor only moves forward across functions
    while (colIdx < pStb->numOfColumns && pColumns[colIdx].colId < wanted) {
      ++colIdx;
    }
    if (colIdx >= pStb->numOfColumns || pColumns[colIdx].colId != wanted) {
      continue;
    }

    int32_t itemLen =
        tsnprintf(item, sizeof(item), "%s(%s),", fmGetFuncName(pObj->funcIds[funcIdx]), pColumns[colIdx].name);
    if ((textCap - used) <= itemLen) {
      break;  // no room left for this item
    }
    used += tsnprintf(text + used, itemLen + 1, "%s", item);
  }

  int32_t finalLen = used > 0 ? used - 1 : 0;
  text[finalLen] = 0;  // remove the last ','
  varDataSetLen(buf, finalLen);
  mndReleaseStb(pMnode, pStb);
}
1156
#endif
1157
/**
 * SHOW retrieve callback: emits one row per SDB_RSMA object into pBlock.
 *
 * With TD_ENTERPRISE defined and only while pShow->numOfRows < 1, every rsma is written in a single
 * pass; the rows argument is not consulted. Columns in order: name, uid, db name (text after the
 * first '.' of dbFName), table name, table type, created time, interval(s) with unit, function
 * list. Columns past the first are written only when taosArrayGet returns them.
 *
 * @return number of rows written, or a negative error code reported through COL_DATA_SET_VAL_GOTO
 */
static int32_t mndRetrieveRsma(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode          *pMnode = pReq->info.node;
  SSdb            *pSdb = pMnode->pSdb;
  SColumnInfoData *pColInfo = NULL;
  void            *pIter = NULL;
  int32_t          code = 0, lino = 0;
  int32_t          numOfRows = 0;
  int32_t          cols = 0;
#ifdef TD_ENTERPRISE
  char      tmp[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE];
  char     *pBuf = tmp;                                      // var-string: header + text
  char     *qBuf = POINTER_SHIFT(pBuf, VARSTR_HEADER_SIZE);  // text part of pBuf
  int32_t   bufLen = sizeof(tmp) - VARSTR_HEADER_SIZE;
  SRsmaObj *pObj = NULL;

  if (pShow->numOfRows < 1) {
    while ((pIter = sdbFetch(pSdb, SDB_RSMA, pIter, (void **)&pObj))) {
      // rsma name
      cols = 0;
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols);
      TAOS_UNUSED(snprintf(qBuf, bufLen, "%s", pObj->name));
      varDataSetLen(pBuf, strlen(qBuf));
      COL_DATA_SET_VAL_GOTO(pBuf, false, pObj, pIter, _exit);

      // rsma uid
      pColInfo = taosArrayGet(pBlock->pDataBlock, ++cols);
      if (pColInfo != NULL) {
        COL_DATA_SET_VAL_GOTO((const char *)(&pObj->uid), false, pObj, pIter, _exit);
      }

      // db name without the part before the first '.'
      pColInfo = taosArrayGet(pBlock->pDataBlock, ++cols);
      if (pColInfo != NULL) {
        const char *dot = strchr(pObj->dbFName, '.');
        TAOS_UNUSED(snprintf(qBuf, bufLen, "%s", dot ? dot + 1 : pObj->dbFName));
        varDataSetLen(pBuf, strlen(qBuf));
        COL_DATA_SET_VAL_GOTO(pBuf, false, pObj, pIter, _exit);
      }

      // table name
      pColInfo = taosArrayGet(pBlock->pDataBlock, ++cols);
      if (pColInfo != NULL) {
        TAOS_UNUSED(snprintf(qBuf, bufLen, "%s", pObj->tbName));
        varDataSetLen(pBuf, strlen(qBuf));
        COL_DATA_SET_VAL_GOTO(pBuf, false, pObj, pIter, _exit);
      }

      // table type
      pColInfo = taosArrayGet(pBlock->pDataBlock, ++cols);
      if (pColInfo != NULL) {
        const char *typeText = "UNKNOWN";
        if (pObj->tbType == TSDB_SUPER_TABLE) {
          typeText = "SUPER_TABLE";
        } else if (pObj->tbType == TSDB_NORMAL_TABLE) {
          typeText = "NORMAL_TABLE";
        } else if (pObj->tbType == TSDB_CHILD_TABLE) {
          typeText = "CHILD_TABLE";
        }
        TAOS_UNUSED(snprintf(qBuf, bufLen, "%s", typeText));
        varDataSetLen(pBuf, strlen(qBuf));
        COL_DATA_SET_VAL_GOTO(pBuf, false, pObj, pIter, _exit);
      }

      // created time
      pColInfo = taosArrayGet(pBlock->pDataBlock, ++cols);
      if (pColInfo != NULL) {
        COL_DATA_SET_VAL_GOTO((const char *)&pObj->createdTime, false, pObj, pIter, _exit);
      }

      // interval[0], plus ",interval[1]" when the latter is positive
      pColInfo = taosArrayGet(pBlock->pDataBlock, ++cols);
      if (pColInfo != NULL) {
        TAOS_UNUSED(snprintf(qBuf, bufLen, "%" PRIi64 "%c", pObj->interval[0], pObj->intervalUnit));
        if (pObj->interval[1] > 0) {
          int32_t written = strlen(qBuf);
          TAOS_UNUSED(
              snprintf(qBuf + written, bufLen - written, ",%" PRIi64 "%c", pObj->interval[1], pObj->intervalUnit));
        }
        varDataSetLen(pBuf, strlen(qBuf));
        COL_DATA_SET_VAL_GOTO(pBuf, false, pObj, pIter, _exit);
      }

      // function list
      pColInfo = taosArrayGet(pBlock->pDataBlock, ++cols);
      if (pColInfo != NULL) {
        mndRetrieveRsmaFuncList(pMnode, pObj, pBuf, bufLen);
        COL_DATA_SET_VAL_GOTO(pBuf, false, pObj, pIter, _exit);
      }

      sdbRelease(pSdb, pObj);
      ++numOfRows;
    }
  }

  pShow->numOfRows += numOfRows;

_exit:
  if (code < 0) {
    mError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    TAOS_RETURN(code);
  }
#endif
  return numOfRows;
}
1254

UNCOV
1255
/**
 * Cancel an in-progress SDB_RSMA iteration identified by pIter.
 */
static void mndCancelRetrieveRsma(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_RSMA);
}
×
1259

1260
/**
 * Append drop commit logs to pTrans for every rsma whose dbUid equals pDb->uid.
 *
 * Only active with TD_ENTERPRISE; otherwise returns 0 without doing anything.
 *
 * @return 0 on success, or the first non-zero code from mndSetDropRsmaCommitLogs
 */
int32_t mndDropRsmasByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
  int32_t code = 0;
#ifdef TD_ENTERPRISE
  SSdb     *pSdb = pMnode->pSdb;
  SRsmaObj *pRsma = NULL;
  void     *pCursor = NULL;

  for (pCursor = sdbFetch(pSdb, SDB_RSMA, NULL, (void **)&pRsma); pCursor != NULL;
       pCursor = sdbFetch(pSdb, SDB_RSMA, pCursor, (void **)&pRsma)) {
    if (pRsma->dbUid == pDb->uid) {
      code = mndSetDropRsmaCommitLogs(pMnode, pTrans, pRsma);
      if (code != 0) {
        // stop the scan and give back the object currently held
        sdbCancelFetch(pSdb, pCursor);
        sdbRelease(pSdb, pRsma);
        TAOS_RETURN(code);
      }
    }
    sdbRelease(pSdb, pRsma);
  }
#endif
  TAOS_RETURN(code);
}
1280

1281
/**
 * Append drop commit logs to pTrans for every rsma attached to the given super table, i.e. whose
 * tbUid equals pStb->uid and whose dbUid equals pStb->dbUid. pDb is not read.
 *
 * Only active with TD_ENTERPRISE; otherwise returns 0 without doing anything.
 *
 * @return 0 on success, or the first non-zero code from mndSetDropRsmaCommitLogs
 */
int32_t mndDropRsmaByStb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
  int32_t code = 0;
#ifdef TD_ENTERPRISE
  SSdb     *pSdb = pMnode->pSdb;
  SRsmaObj *pRsma = NULL;
  void     *pCursor = NULL;

  for (pCursor = sdbFetch(pSdb, SDB_RSMA, NULL, (void **)&pRsma); pCursor != NULL;
       pCursor = sdbFetch(pSdb, SDB_RSMA, pCursor, (void **)&pRsma)) {
    if (pRsma->tbUid == pStb->uid && pRsma->dbUid == pStb->dbUid) {
      code = mndSetDropRsmaCommitLogs(pMnode, pTrans, pRsma);
      if (code != 0) {
        // stop the scan and give back the object currently held
        sdbCancelFetch(pSdb, pCursor);
        sdbRelease(pSdb, pRsma);
        TAOS_RETURN(code);
      }
    }
    sdbRelease(pSdb, pRsma);
  }
#endif
  TAOS_RETURN(code);
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc