• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4791

13 Oct 2025 06:50AM UTC coverage: 57.628% (-0.8%) from 58.476%
#4791

push

travis-ci

web-flow
Merge pull request #33213 from taosdata/fix/huoh/timemoe_model_directory

fix: fix tdgpt timemoe model directory

136628 of 303332 branches covered (45.04%)

Branch coverage included in aggregate %.

208121 of 294900 relevant lines covered (70.57%)

4250784.02 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

62.57
/source/dnode/mnode/impl/src/mndRsma.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "audit.h"
18
#include "functionMgt.h"
19
#include "mndDb.h"
20
#include "mndDnode.h"
21
#include "mndInfoSchema.h"
22
#include "mndMnode.h"
23
#include "mndPrivilege.h"
24
#include "mndShow.h"
25
#include "mndSma.h"
26
#include "mndStb.h"
27
#include "mndTrans.h"
28
#include "mndUser.h"
29
#include "mndVgroup.h"
30
#include "parser.h"
31
#include "tname.h"
32

33
#define MND_RSMA_VER_NUMBER   1
34
#define MND_RSMA_RESERVE_SIZE 64
35

36
static SSdbRaw *mndRsmaActionEncode(SRsmaObj *pSma);
37
static SSdbRow *mndRsmaActionDecode(SSdbRaw *pRaw);
38
static int32_t  mndRsmaActionInsert(SSdb *pSdb, SRsmaObj *pSma);
39
static int32_t  mndRsmaActionDelete(SSdb *pSdb, SRsmaObj *pSpSmatb);
40
static int32_t  mndRsmaActionUpdate(SSdb *pSdb, SRsmaObj *pOld, SRsmaObj *pNew);
41
static int32_t  mndProcessCreateRsmaReq(SRpcMsg *pReq);
42
static int32_t  mndProcessDropRsmaReq(SRpcMsg *pReq);
43
static int32_t  mndProcessAlterRsmaReq(SRpcMsg *pReq);
44
static int32_t  mndProcessGetRsmaReq(SRpcMsg *pReq);
45

46
static int32_t mndRetrieveRsma(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
47
static void    mndCancelRetrieveRsma(SMnode *pMnode, void *pIter);
48
static int32_t mndRetrieveRsmaTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
49
static void    mndCancelRetrieveRsmaTask(SMnode *pMnode, void *pIter);
50

51
/*
 * Registers everything the mnode needs to manage RSMA objects: the SDB table
 * callbacks, the request/response message handlers and the "show" hooks.
 * Returns whatever sdbSetTable() reports.
 */
int32_t mndInitRsma(SMnode *pMnode) {
  // row callbacks for SDB_RSMA; rows are looked up by a binary key
  SSdbTable rsmaTable = {
      .sdbType = SDB_RSMA,
      .keyType = SDB_KEY_BINARY,
      .encodeFp = (SdbEncodeFp)mndRsmaActionEncode,
      .decodeFp = (SdbDecodeFp)mndRsmaActionDecode,
      .insertFp = (SdbInsertFp)mndRsmaActionInsert,
      .updateFp = (SdbUpdateFp)mndRsmaActionUpdate,
      .deleteFp = (SdbDeleteFp)mndRsmaActionDelete,
  };

  // each mnode request is paired with the vnode response routed to the transaction layer
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_RSMA, mndProcessCreateRsmaReq);
  mndSetMsgHandle(pMnode, TDMT_VND_CREATE_RSMA_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_RSMA, mndProcessDropRsmaReq);
  mndSetMsgHandle(pMnode, TDMT_VND_DROP_RSMA_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_ALTER_RSMA, mndProcessAlterRsmaReq);
  mndSetMsgHandle(pMnode, TDMT_VND_ALTER_RSMA_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_GET_RSMA, mndProcessGetRsmaReq);

  // retrieval and iterator-cancel hooks for the RSMA management table
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_RSMA, mndRetrieveRsma);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_RSMA, mndCancelRetrieveRsma);

  return sdbSetTable(pMnode->pSdb, rsmaTable);
}
74

75
// Intentionally empty counterpart of mndInitRsma().
void mndCleanupRsma(SMnode *pMnode) { (void)pMnode; }
1,332✔
76

77
/*
 * Frees the two heap arrays owned by an RSMA object and clears the pointers.
 * The object itself is not freed. A NULL pObj is accepted.
 */
void mndRsmaFreeObj(SRsmaObj *pObj) {
  if (pObj == NULL) {
    return;
  }

  taosMemoryFreeClear(pObj->funcColIds);
  taosMemoryFreeClear(pObj->funcIds);
}
78✔
83

84
/*
 * Encodes pObj into buf (bufLen bytes). The field order below is the storage
 * layout and has to match tDeserializeSRsmaObj() field by field.
 *
 * Returns the encoder position after encoding on success, or a negative
 * error code on failure.
 */
static int32_t tSerializeSRsmaObj(void *buf, int32_t bufLen, const SRsmaObj *pObj) {
  int32_t  code = 0, lino = 0;
  int32_t  encodedLen = 0;
  SEncoder enc = {0};

  tEncoderInit(&enc, buf, bufLen);
  TAOS_CHECK_EXIT(tStartEncode(&enc));

  // identifying strings
  TAOS_CHECK_EXIT(tEncodeCStr(&enc, pObj->name));
  TAOS_CHECK_EXIT(tEncodeCStr(&enc, pObj->tbName));
  TAOS_CHECK_EXIT(tEncodeCStr(&enc, pObj->dbFName));
  TAOS_CHECK_EXIT(tEncodeCStr(&enc, pObj->createUser));

  // timestamps and uids
  TAOS_CHECK_EXIT(tEncodeI64v(&enc, pObj->createdTime));
  TAOS_CHECK_EXIT(tEncodeI64v(&enc, pObj->updateTime));
  TAOS_CHECK_EXIT(tEncodeI64v(&enc, pObj->uid));
  TAOS_CHECK_EXIT(tEncodeI64v(&enc, pObj->tbUid));
  TAOS_CHECK_EXIT(tEncodeI64v(&enc, pObj->dbUid));

  // intervals and scalar attributes
  TAOS_CHECK_EXIT(tEncodeI64v(&enc, pObj->interval[0]));
  TAOS_CHECK_EXIT(tEncodeI64v(&enc, pObj->interval[1]));
  TAOS_CHECK_EXIT(tEncodeU64v(&enc, pObj->reserved));
  TAOS_CHECK_EXIT(tEncodeI32v(&enc, pObj->version));
  TAOS_CHECK_EXIT(tEncodeI8(&enc, pObj->tbType));
  TAOS_CHECK_EXIT(tEncodeI8(&enc, pObj->intervalUnit));

  // function list: count first, then (column id, function id) pairs
  TAOS_CHECK_EXIT(tEncodeI16v(&enc, pObj->nFuncs));
  for (int16_t idx = 0; idx < pObj->nFuncs; ++idx) {
    TAOS_CHECK_EXIT(tEncodeI16v(&enc, pObj->funcColIds[idx]));
    TAOS_CHECK_EXIT(tEncodeI32v(&enc, pObj->funcIds[idx]));
  }

  tEndEncode(&enc);
  encodedLen = enc.pos;

_exit:
  tEncoderClear(&enc);
  if (code < 0) {
    mError("rsma, %s failed at line %d since %s", __func__, lino, tstrerror(code));
    TAOS_RETURN(code);
  }

  return encodedLen;
}
125

126
/*
 * Decodes an RSMA object from buf (bufLen bytes) into pObj, reading the
 * fields in the same order tSerializeSRsmaObj() writes them.
 *
 * When nFuncs > 0 the funcColIds/funcIds arrays are allocated here. On a
 * failure after allocation they are left attached to pObj and are not freed
 * by this function.
 *
 * Returns `code` through TAOS_RETURN: 0 unless a check failed.
 */
static int32_t tDeserializeSRsmaObj(void *buf, int32_t bufLen, SRsmaObj *pObj) {
  int32_t  code = 0, lino = 0;
  SDecoder dec = {0};

  tDecoderInit(&dec, buf, bufLen);
  TAOS_CHECK_EXIT(tStartDecode(&dec));

  // identifying strings
  TAOS_CHECK_EXIT(tDecodeCStrTo(&dec, pObj->name));
  TAOS_CHECK_EXIT(tDecodeCStrTo(&dec, pObj->tbName));
  TAOS_CHECK_EXIT(tDecodeCStrTo(&dec, pObj->dbFName));
  TAOS_CHECK_EXIT(tDecodeCStrTo(&dec, pObj->createUser));

  // timestamps and uids
  TAOS_CHECK_EXIT(tDecodeI64v(&dec, &pObj->createdTime));
  TAOS_CHECK_EXIT(tDecodeI64v(&dec, &pObj->updateTime));
  TAOS_CHECK_EXIT(tDecodeI64v(&dec, &pObj->uid));
  TAOS_CHECK_EXIT(tDecodeI64v(&dec, &pObj->tbUid));
  TAOS_CHECK_EXIT(tDecodeI64v(&dec, &pObj->dbUid));

  // intervals and scalar attributes
  TAOS_CHECK_EXIT(tDecodeI64v(&dec, &pObj->interval[0]));
  TAOS_CHECK_EXIT(tDecodeI64v(&dec, &pObj->interval[1]));
  TAOS_CHECK_EXIT(tDecodeU64v(&dec, &pObj->reserved));
  TAOS_CHECK_EXIT(tDecodeI32v(&dec, &pObj->version));
  TAOS_CHECK_EXIT(tDecodeI8(&dec, &pObj->tbType));
  TAOS_CHECK_EXIT(tDecodeI8(&dec, &pObj->intervalUnit));

  // function list: count first, then (column id, function id) pairs
  TAOS_CHECK_EXIT(tDecodeI16v(&dec, &pObj->nFuncs));
  if (pObj->nFuncs > 0) {
    pObj->funcColIds = taosMemoryMalloc(sizeof(col_id_t) * pObj->nFuncs);
    if (pObj->funcColIds == NULL) {
      TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
    }

    pObj->funcIds = taosMemoryMalloc(sizeof(int32_t) * pObj->nFuncs);
    if (pObj->funcIds == NULL) {
      TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
    }

    for (int16_t idx = 0; idx < pObj->nFuncs; ++idx) {
      TAOS_CHECK_EXIT(tDecodeI16v(&dec, &pObj->funcColIds[idx]));
      TAOS_CHECK_EXIT(tDecodeI32v(&dec, &pObj->funcIds[idx]));
    }
  }

_exit:
  tEndDecode(&dec);
  tDecoderClear(&dec);
  if (code < 0) {
    mError("rsma, %s failed at line %d since %s, row:%p", __func__, lino, tstrerror(code), pObj);
  }
  TAOS_RETURN(code);
}
170

171
/*
 * Builds the SDB raw record for an RSMA object.
 *
 * The raw payload is an int32 length followed by the bytes produced by
 * tSerializeSRsmaObj(). Returns the new raw on success; on failure returns
 * NULL with terrno set, after freeing any partially built raw.
 */
static SSdbRaw *mndRsmaActionEncode(SRsmaObj *pObj) {
  int32_t  code = 0, lino = 0;
  void    *buf = NULL;
  SSdbRaw *pRaw = NULL;
  int32_t  dataPos = 0;

  // sizing pass: no buffer, only the encoded length is wanted
  int32_t tlen = tSerializeSRsmaObj(NULL, 0, pObj);
  if (tlen < 0) {
    TAOS_CHECK_EXIT(tlen);
  }

  int32_t rawSize = sizeof(int32_t) + tlen;
  pRaw = sdbAllocRaw(SDB_RSMA, MND_RSMA_VER_NUMBER, rawSize);
  if (!pRaw) {
    TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
  }

  buf = taosMemoryMalloc(tlen);
  if (!buf) {
    TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
  }

  // real pass into the scratch buffer
  tlen = tSerializeSRsmaObj(buf, tlen, pObj);
  if (tlen < 0) {
    TAOS_CHECK_EXIT(tlen);
  }

  SDB_SET_INT32(pRaw, dataPos, tlen, _exit);
  SDB_SET_BINARY(pRaw, dataPos, buf, tlen, _exit);
  SDB_SET_DATALEN(pRaw, dataPos, _exit);

_exit:
  taosMemoryFreeClear(buf);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    mError("rsma, failed at line %d to encode to raw:%p since %s", lino, pRaw, tstrerror(code));
    sdbFreeRaw(pRaw);
    return NULL;
  }

  mTrace("rsma, encode to raw:%p, row:%p", pRaw, pObj);
  return pRaw;
}
213

214
/*
 * Rebuilds an RSMA row from an SDB raw record.
 *
 * The raw payload is read as an int32 length followed by that many bytes,
 * which are handed to tDeserializeSRsmaObj(). Only records whose soft
 * version equals MND_RSMA_VER_NUMBER are accepted.
 *
 * Returns the new row on success. On failure returns NULL with terrno set
 * to the code of the step that failed; the row and any arrays already
 * attached to the object are freed.
 */
SSdbRow *mndRsmaActionDecode(SSdbRaw *pRaw) {
  int32_t   code = 0, lino = 0;
  SSdbRow  *pRow = NULL;
  SRsmaObj *pObj = NULL;
  void     *buf = NULL;
  int32_t   tlen = 0;
  int32_t   dataPos = 0;
  int8_t    sver = 0;

  if (sdbGetRawSoftVer(pRaw, &sver) != 0) {
    // treat this as a real failure instead of falling through with code == 0;
    // keep the reason left in terrno when there is one
    code = (terrno != 0) ? terrno : TSDB_CODE_APP_ERROR;
    lino = __LINE__;
    goto _exit;
  }

  if (sver != MND_RSMA_VER_NUMBER) {
    code = TSDB_CODE_SDB_INVALID_DATA_VER;
    lino = __LINE__;
    mError("rsma read invalid ver, data ver: %d, curr ver: %d", sver, MND_RSMA_VER_NUMBER);
    goto _exit;
  }

  if (!(pRow = sdbAllocRow(sizeof(SRsmaObj)))) {
    TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
  }

  if (!(pObj = sdbGetRowObj(pRow))) {
    TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
  }

  SDB_GET_INT32(pRaw, dataPos, &tlen, _exit);
  buf = taosMemoryMalloc(tlen + 1);
  if (buf == NULL) {
    TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
  }
  SDB_GET_BINARY(pRaw, dataPos, buf, tlen, _exit);

  // propagate the deserializer's own error code rather than reporting
  // every failure as out-of-memory
  TAOS_CHECK_EXIT(tDeserializeSRsmaObj(buf, tlen, pObj));

  taosInitRWLatch(&pObj->lock);

_exit:
  taosMemoryFreeClear(buf);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    mError("rsma, failed at line %d to decode from raw:%p since %s", lino, pRaw, tstrerror(code));
    mndRsmaFreeObj(pObj);
    taosMemoryFreeClear(pRow);
    return NULL;
  }
  mTrace("rsma, decode from raw:%p, row:%p", pRaw, pObj);
  return pRow;
}
270

271
// SDB insert callback: only traces the event; always returns 0.
static int32_t mndRsmaActionInsert(SSdb *pSdb, SRsmaObj *pObj) {
  (void)pSdb;
  mTrace("rsma:%s, perform insert action, row:%p", pObj->name, pObj);
  return 0;
}
275

276
// SDB delete callback: traces the event and frees the arrays owned by the row; always returns 0.
static int32_t mndRsmaActionDelete(SSdb *pSdb, SRsmaObj *pObj) {
  (void)pSdb;
  mTrace("rsma:%s, perform delete action, row:%p", pObj->name, pObj);
  mndRsmaFreeObj(pObj);
  return 0;
}
281

282
/*
 * SDB update callback. Under the write latch of the old row it copies
 * updateTime and nFuncs from the new row and swaps the two function arrays,
 * so pNew ends up holding the arrays pOld had before. Always returns 0.
 */
static int32_t mndRsmaActionUpdate(SSdb *pSdb, SRsmaObj *pOld, SRsmaObj *pNew) {
  (void)pSdb;
  mTrace("rsma:%s, perform update action, old row:%p new row:%p", pOld->name, pOld, pNew);

  taosWLockLatch(&pOld->lock);
  TSWAP(pOld->funcColIds, pNew->funcColIds);
  TSWAP(pOld->funcIds, pNew->funcIds);
  pOld->nFuncs = pNew->nFuncs;
  pOld->updateTime = pNew->updateTime;
  taosWUnLockLatch(&pOld->lock);

  return 0;
}
292

293
/*
 * Looks up an RSMA object by name via sdbAcquire().
 *
 * Returns the acquired object, or NULL with terrno translated from the
 * generic SDB code to the RSMA-specific one:
 *   TSDB_CODE_SDB_OBJ_NOT_THERE -> TSDB_CODE_RSMA_NOT_EXIST
 *   TSDB_CODE_SDB_OBJ_CREATING  -> TSDB_CODE_MND_RSMA_IN_CREATING
 *   TSDB_CODE_SDB_OBJ_DROPPING  -> TSDB_CODE_MND_RSMA_IN_DROPPING
 *   anything else               -> TSDB_CODE_APP_ERROR
 */
SRsmaObj *mndAcquireRsma(SMnode *pMnode, char *name) {
  SSdb     *pSdb = pMnode->pSdb;
  SRsmaObj *pObj = sdbAcquire(pSdb, SDB_RSMA, name);
  if (pObj != NULL) {
    return pObj;
  }

  int32_t sdbCode = terrno;
  if (sdbCode == TSDB_CODE_SDB_OBJ_NOT_THERE) {
    terrno = TSDB_CODE_RSMA_NOT_EXIST;
  } else if (sdbCode == TSDB_CODE_SDB_OBJ_CREATING) {
    terrno = TSDB_CODE_MND_RSMA_IN_CREATING;
  } else if (sdbCode == TSDB_CODE_SDB_OBJ_DROPPING) {
    terrno = TSDB_CODE_MND_RSMA_IN_DROPPING;
  } else {
    // log the reason sdb reported; previously terrno was overwritten first,
    // so the message could only ever show the generic app error
    mFatal("rsma:%s, failed to acquire rsma since %s", name, tstrerror(sdbCode));
    terrno = TSDB_CODE_APP_ERROR;
  }
  return NULL;
}
310

311
// Hands an RSMA object obtained from mndAcquireRsma() back to the SDB.
void mndReleaseRsma(SMnode *pMnode, SRsmaObj *pSma) {
  sdbRelease(pMnode->pSdb, pSma);
}
37✔
315
#ifdef TD_ENTERPRISE
316
/*
 * Encodes pSma, appends it to the transaction's redo log and marks the raw
 * as SDB_STATUS_CREATING. Returns 0 or the error code of the failing step.
 */
static int32_t mndSetCreateRsmaRedoLogs(SMnode *pMnode, STrans *pTrans, SRsmaObj *pSma) {
  int32_t  code = 0;
  SSdbRaw *pRedoRaw = mndRsmaActionEncode(pSma);

  if (!pRedoRaw) {
    // prefer the reason left in terrno by the encoder
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  TAOS_CHECK_RETURN(mndTransAppendRedolog(pTrans, pRedoRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING));

  TAOS_RETURN(code);
}
329

330
/*
 * Encodes pSma, appends it to the transaction's undo log and marks the raw
 * as SDB_STATUS_DROPPED. Returns 0 or the error code of the failing step.
 */
static int32_t mndSetCreateRsmaUndoLogs(SMnode *pMnode, STrans *pTrans, SRsmaObj *pSma) {
  int32_t  code = 0;
  SSdbRaw *pUndoRaw = mndRsmaActionEncode(pSma);

  if (pUndoRaw == NULL) {
    // prefer the reason left in terrno by the encoder
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  TAOS_CHECK_RETURN(mndTransAppendUndolog(pTrans, pUndoRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED));

  TAOS_RETURN(code);
}
342

343
/*
 * Encodes pSma, appends it to the transaction's prepare log and marks the
 * raw as SDB_STATUS_CREATING.
 *
 * Returns 0 on success. On failure returns the error code of the failing
 * step (previously a bare -1, which made the caller log a meaningless
 * reason), matching mndSetCreateRsmaCommitLogs().
 */
static int32_t mndSetCreateRsmaPrepareActions(SMnode *pMnode, STrans *pTrans, SRsmaObj *pSma) {
  int32_t  code = 0;
  SSdbRaw *pDbRaw = mndRsmaActionEncode(pSma);
  if (pDbRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  TAOS_CHECK_RETURN(mndTransAppendPrepareLog(pTrans, pDbRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pDbRaw, SDB_STATUS_CREATING));
  return 0;
}
351

352
static void *mndBuildVCreateRsmaReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, SRsmaObj *pObj,
28✔
353
                                    SMCreateRsmaReq *pCreate, int32_t *pContLen) {
354
  int32_t         code = 0, lino = 0;
28✔
355
  SMsgHead       *pHead = NULL;
28✔
356
  SVCreateRsmaReq req = *pCreate;
28✔
357

358
  req.uid = pObj->uid;  // use the uid generated by mnode
28✔
359

360
  int32_t contLen = tSerializeSVCreateRsmaReq(NULL, 0, &req);
28✔
361
  TAOS_CHECK_EXIT(contLen);
28!
362
  contLen += sizeof(SMsgHead);
28✔
363
  TSDB_CHECK_NULL((pHead = taosMemoryMalloc(contLen)), code, lino, _exit, terrno);
28!
364
  pHead->contLen = htonl(contLen);
28✔
365
  pHead->vgId = htonl(pVgroup->vgId);
28✔
366
  void *pBuf = POINTER_SHIFT(pHead, sizeof(SMsgHead));
28✔
367
  TAOS_CHECK_EXIT(tSerializeSVCreateRsmaReq(pBuf, contLen, &req));
28!
368
_exit:
28✔
369
  if (code < 0) {
28!
370
    taosMemoryFreeClear(pHead);
×
371
    terrno = code;
×
372
    *pContLen = 0;
×
373
    return NULL;
×
374
  }
375
  *pContLen = contLen;
28✔
376
  return pHead;
28✔
377
}
378

379
static int32_t mndSetCreateRsmaRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb, SRsmaObj *pObj,
14✔
380
                                           SMCreateRsmaReq *pCreate) {
381
  int32_t code = 0;
14✔
382
  SSdb   *pSdb = pMnode->pSdb;
14✔
383
  SVgObj *pVgroup = NULL;
14✔
384
  void   *pIter = NULL;
14✔
385

386
  SName name = {0};
14✔
387
  if ((code = tNameFromString(&name, pCreate->tbFName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE)) != 0) {
14!
388
    return code;
×
389
  }
390
  tstrncpy(pCreate->tbFName, (char *)tNameGetTableName(&name), sizeof(pCreate->tbFName));  // convert tbFName to tbName
14✔
391

392
  while ((pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup))) {
70✔
393
    if (!mndVgroupInDb(pVgroup, pDb->uid)) {
56✔
394
      sdbRelease(pSdb, pVgroup);
28✔
395
      continue;
28✔
396
    }
397

398
    int32_t contLen = 0;
28✔
399
    void   *pReq = mndBuildVCreateRsmaReq(pMnode, pVgroup, pStb, pObj, pCreate, &contLen);
28✔
400
    if (pReq == NULL) {
28!
401
      sdbCancelFetch(pSdb, pIter);
×
402
      sdbRelease(pSdb, pVgroup);
×
403
      code = terrno ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
×
404
      TAOS_RETURN(code);
×
405
    }
406

407
    STransAction action = {0};
28✔
408
    action.mTraceId = pTrans->mTraceId;
28✔
409
    action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
28✔
410
    action.pCont = pReq;
28✔
411
    action.contLen = contLen;
28✔
412
    action.msgType = TDMT_VND_CREATE_RSMA;
28✔
413
    action.acceptableCode = TSDB_CODE_RSMA_ALREADY_EXISTS;  // check whether the rsma uid exist
28✔
414
    action.retryCode = TSDB_CODE_TDB_STB_NOT_EXIST;         // retry if relative table not exist
28✔
415
    if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
28!
416
      taosMemoryFree(pReq);
×
417
      sdbCancelFetch(pSdb, pIter);
×
418
      sdbRelease(pSdb, pVgroup);
×
419
      TAOS_RETURN(code);
×
420
    }
421
    sdbRelease(pSdb, pVgroup);
28✔
422
  }
423

424
  TAOS_RETURN(code);
14✔
425
}
426

427
/*
 * Encodes pSma, appends it to the transaction's commit log and marks the raw
 * as SDB_STATUS_READY. Returns 0 or the error code of the failing step.
 */
static int32_t mndSetCreateRsmaCommitLogs(SMnode *pMnode, STrans *pTrans, SRsmaObj *pSma) {
  int32_t  code = 0;
  SSdbRaw *pCommitRaw = mndRsmaActionEncode(pSma);

  if (!pCommitRaw) {
    // prefer the reason left in terrno by the encoder
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pCommitRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY));

  TAOS_RETURN(code);
}
440

441
/*
 * Encodes pSma, appends it to the transaction's prepare log and marks the
 * raw as SDB_STATUS_DROPPING.
 *
 * Returns 0 on success or the error code of the failing step. The encode
 * failure path used to compute that code and then return a bare -1.
 */
static int32_t mndSetDropRsmaPrepareLogs(SMnode *pMnode, STrans *pTrans, SRsmaObj *pSma) {
  int32_t  code = 0;
  SSdbRaw *pRedoRaw = mndRsmaActionEncode(pSma);
  if (pRedoRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }
  TAOS_CHECK_RETURN(mndTransAppendPrepareLog(pTrans, pRedoRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING));

  return 0;
}
454

455
/*
 * Encodes pSma, appends it to the transaction's commit log and marks the
 * raw as SDB_STATUS_DROPPED.
 *
 * Returns 0 on success or the error code of the failing step. The encode
 * failure path used to compute that code and then return a bare -1.
 */
static int32_t mndSetDropRsmaCommitLogs(SMnode *pMnode, STrans *pTrans, SRsmaObj *pSma) {
  int32_t  code = 0;
  SSdbRaw *pCommitRaw = mndRsmaActionEncode(pSma);
  if (pCommitRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }
  TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pCommitRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED));

  return 0;
}
468

469
/*
 * Builds the vnode "drop rsma" message for one vgroup.
 *
 * The message is an SMsgHead (contLen and vgId in network byte order)
 * followed by the serialized request, whose names, table type and uids are
 * copied from pObj.
 *
 * On success returns the allocated message and stores its total length in
 * *pContLen. On failure returns NULL, sets terrno and stores 0 in *pContLen.
 */
static void *mndBuildVDropRsmaReq(SMnode *pMnode, SVgObj *pVgroup, SRsmaObj *pObj, int32_t *pContLen) {
  int32_t       code = 0, lino = 0;
  int32_t       bodyLen = 0, contLen = 0;
  void         *pBuf = NULL;
  SMsgHead     *pHead = NULL;
  SVDropRsmaReq req = {0};

  (void)snprintf(req.tbName, sizeof(req.tbName), "%s", pObj->tbName);
  (void)snprintf(req.name, sizeof(req.name), "%s", pObj->name);
  req.tbType = pObj->tbType;
  req.uid = pObj->uid;
  req.tbUid = pObj->tbUid;

  // sizing pass
  bodyLen = tSerializeSVDropRsmaReq(NULL, 0, &req);
  TAOS_CHECK_EXIT(bodyLen);
  contLen = bodyLen + (int32_t)sizeof(SMsgHead);

  TSDB_CHECK_NULL((pHead = taosMemoryMalloc(contLen)), code, lino, _exit, terrno);
  pHead->contLen = htonl(contLen);
  pHead->vgId = htonl(pVgroup->vgId);

  // the body starts after the header, so only bodyLen bytes are writable there;
  // passing contLen would overstate the buffer by sizeof(SMsgHead)
  pBuf = POINTER_SHIFT(pHead, sizeof(SMsgHead));
  TAOS_CHECK_EXIT(tSerializeSVDropRsmaReq(pBuf, bodyLen, &req));

_exit:
  if (code < 0) {
    taosMemoryFreeClear(pHead);
    terrno = code;
    *pContLen = 0;
    return NULL;
  }
  *pContLen = contLen;
  return pHead;
}
498

499
/*
 * Appends one TDMT_VND_DROP_RSMA redo action to the transaction for every
 * vgroup that belongs to pDb. TSDB_CODE_RSMA_NOT_EXIST is set as the
 * acceptable code of each action.
 *
 * Returns 0 on success or the error code of the first failing step; the
 * vgroup iteration is cancelled and the current vgroup released on failure.
 */
static int32_t mndSetDropRsmaRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SRsmaObj *pSma) {
  int32_t code = 0;
  SSdb   *pSdb = pMnode->pSdb;
  SVgObj *pVgroup = NULL;
  void   *pIter = NULL;

  for (;;) {
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) {
      break;
    }

    if (mndVgroupInDb(pVgroup, pDb->uid)) {
      int32_t msgLen = 0;
      void   *pMsg = mndBuildVDropRsmaReq(pMnode, pVgroup, pSma, &msgLen);
      if (pMsg == NULL) {
        sdbCancelFetch(pSdb, pIter);
        sdbRelease(pSdb, pVgroup);
        code = terrno ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
        TAOS_RETURN(code);
      }

      STransAction action = {0};
      action.mTraceId = pTrans->mTraceId;
      action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
      action.pCont = pMsg;
      action.contLen = msgLen;
      action.msgType = TDMT_VND_DROP_RSMA;
      action.acceptableCode = TSDB_CODE_RSMA_NOT_EXIST;

      code = mndTransAppendRedoAction(pTrans, &action);
      if (code != 0) {
        taosMemoryFree(pMsg);
        sdbCancelFetch(pSdb, pIter);
        sdbRelease(pSdb, pVgroup);
        TAOS_RETURN(code);
      }
    }

    // reached for vgroups of other databases and after a successful append
    sdbRelease(pSdb, pVgroup);
  }

  TAOS_RETURN(code);
}
537

538
/*
 * Creates and prepares the "drop-rsma" transaction for pObj: conflict
 * check, prepare log, commit log, per-vgroup redo actions, then
 * mndTransPrepare(). No rpc response payload is attached to the transaction.
 *
 * The transaction handle is always dropped before returning. Returns the
 * code of the last executed step.
 */
static int32_t mndDropRsma(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SRsmaObj *pObj) {
  int32_t code = 0, lino = 0;
  STrans *pTrans = NULL;

  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, "drop-rsma");
  if (!pTrans) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto _exit;
  }

  mInfo("trans:%d start to drop rsma:%s", pTrans->id, pObj->name);

  mndTransSetDbName(pTrans, pDb->name, pObj->name);
  mndTransSetKillMode(pTrans, TRN_KILL_MODE_SKIP);
  TAOS_CHECK_EXIT(mndTransCheckConflict(pMnode, pTrans));

  mndTransSetOper(pTrans, MND_OPER_DROP_RSMA);
  TAOS_CHECK_EXIT(mndSetDropRsmaPrepareLogs(pMnode, pTrans, pObj));
  TAOS_CHECK_EXIT(mndSetDropRsmaCommitLogs(pMnode, pTrans, pObj));
  TAOS_CHECK_EXIT(mndSetDropRsmaRedoActions(pMnode, pTrans, pDb, pObj));

  TAOS_CHECK_EXIT(mndTransPrepare(pMnode, pTrans));

_exit:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("rsma:%s, failed to drop at line:%d since %s", pObj->name, lino, tstrerror(code));
  }
  mndTransDrop(pTrans);
  TAOS_RETURN(code);
}
572
#endif
573
/*
 * Handler for TDMT_MND_DROP_RSMA.
 *
 * Without TD_ENTERPRISE the body is compiled out and the handler returns 0.
 * Otherwise it deserializes the request, acquires the rsma and its
 * database, checks MND_OPER_WRITE_DB privilege and starts the drop
 * transaction.
 *
 * Returns TSDB_CODE_ACTION_IN_PROGRESS once the transaction is started,
 * 0 when the rsma does not exist and igNotExists is set, or an error code.
 */
static int32_t mndProcessDropRsmaReq(SRpcMsg *pReq) {
  SMnode *pMnode = pReq->info.node;
  int32_t code = 0, lino = 0;
#ifdef TD_ENTERPRISE
  SDbObj       *pDb = NULL;
  SRsmaObj     *pObj = NULL;
  SMDropRsmaReq dropReq = {0};

  // pass &lino so the failure log below reports a real line number
  TAOS_CHECK_GOTO(tDeserializeSMDropRsmaReq(pReq->pCont, pReq->contLen, &dropReq), &lino, _exit);

  mInfo("rsma:%s, start to drop", dropReq.name);

  pObj = mndAcquireRsma(pMnode, dropReq.name);
  if (pObj == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    // "if exists" may only hide a missing rsma; an rsma that is being
    // created/dropped, or an internal error, must still be reported
    if (dropReq.igNotExists && code == TSDB_CODE_RSMA_NOT_EXIST) {
      code = 0;
    }
    lino = __LINE__;
    goto _exit;
  }

  SName name = {0};
  TAOS_CHECK_EXIT(tNameFromString(&name, pObj->dbFName, T_NAME_ACCT | T_NAME_DB));

  char db[TSDB_TABLE_FNAME_LEN] = {0};
  (void)tNameGetFullDbName(&name, db);
  if (!(pDb = mndAcquireDb(pMnode, db))) {
    TAOS_CHECK_EXIT(TSDB_CODE_MND_DB_NOT_SELECTED);
  }

  TAOS_CHECK_GOTO(mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb), &lino, _exit);

  code = mndDropRsma(pMnode, pReq, pDb, pObj);
  if (code == TSDB_CODE_SUCCESS) {
    code = TSDB_CODE_ACTION_IN_PROGRESS;
  }

  auditRecord(pReq, pMnode->clusterId, "dropRsma", dropReq.name, "", "", 0);
_exit:
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("rsma:%s, failed at line %d to drop since %s", dropReq.name, lino, tstrerror(code));
  }

  mndReleaseDb(pMnode, pDb);
  mndReleaseRsma(pMnode, pObj);
#endif
  TAOS_RETURN(code);
}
622
#ifdef TD_ENTERPRISE
623
static int32_t mndCreateRsma(SMnode *pMnode, SRpcMsg *pReq, SUserObj *pUser, SDbObj *pDb, SStbObj *pStb,
14✔
624
                             SMCreateRsmaReq *pCreate) {
625
  int32_t  code = 0, lino = 0;
14✔
626
  SRsmaObj obj = {0};
14✔
627
  STrans  *pTrans = NULL;
14✔
628

629
  (void)snprintf(obj.name, TSDB_TABLE_NAME_LEN, "%s", pCreate->name);
14✔
630
  (void)snprintf(obj.dbFName, TSDB_DB_FNAME_LEN, "%s", pDb->name);
14✔
631

632
  const char *tbName = strrchr(pCreate->tbFName, '.');
14✔
633
  (void)snprintf(obj.tbName, TSDB_TABLE_NAME_LEN, "%s", tbName ? tbName + 1 : pCreate->tbFName);
14!
634
  (void)snprintf(obj.createUser, TSDB_USER_LEN, "%s", pUser->user);
14✔
635
  obj.createdTime = taosGetTimestampMs();
14✔
636
  obj.updateTime = obj.createdTime;
14✔
637
  obj.uid = mndGenerateUid(obj.name, TSDB_TABLE_FNAME_LEN);
14✔
638
  obj.tbUid = pCreate->tbUid;
14✔
639
  obj.dbUid = pDb->uid;
14✔
640
  obj.interval[0] = pCreate->interval[0];
14✔
641
  obj.interval[1] = pCreate->interval[1];
14✔
642
  obj.version = 1;
14✔
643
  obj.tbType = pCreate->tbType;  // ETableType: 1 stable. Only super table supported currently.
14✔
644
  obj.intervalUnit = pCreate->intervalUnit;
14✔
645
  obj.nFuncs = pCreate->nFuncs;
14✔
646
  if (obj.nFuncs > 0) {
14!
647
    TSDB_CHECK_NULL((obj.funcColIds = taosMemoryCalloc(obj.nFuncs, sizeof(col_id_t))), code, lino, _exit, terrno);
14!
648
    TSDB_CHECK_NULL((obj.funcIds = taosMemoryCalloc(obj.nFuncs, sizeof(func_id_t))), code, lino, _exit, terrno);
14!
649
    for (int16_t i = 0; i < obj.nFuncs; ++i) {
93✔
650
      obj.funcColIds[i] = pCreate->funcColIds[i];
79✔
651
      obj.funcIds[i] = pCreate->funcIds[i];
79✔
652
    }
653
  }
654

655
  TSDB_CHECK_NULL((pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, "create-rsma")),
14!
656
                  code, lino, _exit, terrno);
657
  mInfo("trans:%d, used to create rsma %s on tb %s.%s", pTrans->id, obj.name, obj.dbFName, obj.tbName);
14!
658

659
  mndTransSetDbName(pTrans, obj.dbFName, obj.name);
14✔
660
  mndTransSetKillMode(pTrans, TRN_KILL_MODE_SKIP);
14✔
661
  TAOS_CHECK_EXIT(mndTransCheckConflict(pMnode, pTrans));
14!
662

663
  mndTransSetOper(pTrans, MND_OPER_CREATE_RSMA);
14✔
664
  TAOS_CHECK_EXIT(mndSetCreateRsmaPrepareActions(pMnode, pTrans, &obj));
14!
665
  TAOS_CHECK_EXIT(mndSetCreateRsmaRedoActions(pMnode, pTrans, pDb, pStb, &obj, pCreate));
14!
666
  TAOS_CHECK_EXIT(mndSetCreateRsmaCommitLogs(pMnode, pTrans, &obj));
14!
667
  TAOS_CHECK_EXIT(mndTransPrepare(pMnode, pTrans));
14!
668
_exit:
14✔
669
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
14!
670
    mError("rsma:%s, failed at line %d to create rsma, since %s", obj.name, lino, tstrerror(code));
×
671
  }
672
  mndTransDrop(pTrans);
14✔
673
  mndRsmaFreeObj(&obj);
14✔
674
  TAOS_RETURN(code);
14✔
675
}
676

677
/*
 * Validates a create-rsma request.
 *
 * Rejected with TSDB_CODE_MND_INVALID_RSMA_OPTION: empty name or tbFName,
 * igExists outside 0..1, negative intervalUnit, a negative interval, both
 * intervals zero, or an empty table part in tbFName. A tbFName that
 * tNameFromString() cannot parse is rejected with that function's code.
 *
 * Returns 0 when the request is acceptable.
 */
static int32_t mndCheckCreateRsmaReq(SMCreateRsmaReq *pCreate) {
  int32_t code = TSDB_CODE_MND_INVALID_RSMA_OPTION;
  if (pCreate->name[0] == 0) goto _exit;
  if (pCreate->tbFName[0] == 0) goto _exit;
  if (pCreate->igExists < 0 || pCreate->igExists > 1) goto _exit;
  if (pCreate->intervalUnit < 0) goto _exit;
  if (pCreate->interval[0] < 0) goto _exit;
  if (pCreate->interval[1] < 0) goto _exit;
  if (pCreate->interval[0] == 0 && pCreate->interval[1] == 0) goto _exit;

  SName fname = {0};
  if ((code = tNameFromString(&fname, pCreate->tbFName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE)) < 0) goto _exit;
  if (*(char *)tNameGetTableName(&fname) == 0) {
    // code was overwritten by the successful parse above, so it has to be
    // set again; otherwise an empty table name would be accepted
    code = TSDB_CODE_MND_INVALID_RSMA_OPTION;
    goto _exit;
  }
  code = 0;
_exit:
  TAOS_RETURN(code);
}
694

695
/*
 * Scans all RSMA rows for one that is already defined on the same table
 * (same tbUid) of the same database (same dbUid) as the create request.
 *
 * Returns TSDB_CODE_MND_RSMA_EXIST_IN_TABLE when such a row exists,
 * otherwise 0.
 */
static int32_t mndCheckRsmaConflicts(SMnode *pMnode, SDbObj *pDbObj, SMCreateRsmaReq *pCreate) {
  void     *pIter = NULL;
  SSdb     *pSdb = pMnode->pSdb;
  SRsmaObj *pObj = NULL;
  while ((pIter = sdbFetch(pSdb, SDB_RSMA, pIter, (void **)&pObj))) {
    if (pObj->tbUid == pCreate->tbUid && pObj->dbUid == pDbObj->uid) {
      // log first: pObj must not be read once its reference has been released
      mError("rsma:%s, conflict with existing rsma %s on same table %s.%s:%" PRIi64, pCreate->name, pObj->name,
             pObj->dbFName, pObj->tbName, pObj->tbUid);
      sdbCancelFetch(pSdb, (pIter));
      sdbRelease(pSdb, pObj);
      return TSDB_CODE_MND_RSMA_EXIST_IN_TABLE;
    }
    sdbRelease(pSdb, pObj);
  }
  return 0;
}
711
#endif
712
/*
 * Handler for TDMT_MND_CREATE_RSMA.
 *
 * Without TD_ENTERPRISE the body is compiled out and the handler returns 0.
 * Otherwise it deserializes and validates the request, makes sure no rsma
 * of that name exists, acquires the database and super table, checks read
 * and write privilege on the database, rejects a second rsma on the same
 * table and finally starts the create transaction.
 *
 * Returns TSDB_CODE_ACTION_IN_PROGRESS once the transaction is started,
 * 0 when the rsma already exists and igExists is set, or an error code.
 * Every object acquired here is released before returning.
 */
static int32_t mndProcessCreateRsmaReq(SRpcMsg *pReq) {
  int32_t code = 0, lino = 0;
#ifdef TD_ENTERPRISE
  SMnode         *pMnode = pReq->info.node;
  SDbObj         *pDb = NULL;
  SStbObj        *pStb = NULL;
  SRsmaObj       *pSma = NULL;
  SUserObj       *pUser = NULL;
  SMCreateRsmaReq createReq = {0};

  TAOS_CHECK_EXIT(tDeserializeSMCreateRsmaReq(pReq->pCont, pReq->contLen, &createReq));

  mInfo("start to create rsma: %s", createReq.name);
  TAOS_CHECK_EXIT(mndCheckCreateRsmaReq(&createReq));

  if ((pSma = mndAcquireRsma(pMnode, createReq.name))) {
    if (createReq.igExists) {
      mInfo("rsma:%s, already exist, ignore exist is set", createReq.name);
      code = 0;
      goto _exit;
    } else {
      TAOS_CHECK_EXIT(TSDB_CODE_RSMA_ALREADY_EXISTS);
    }
  } else {
    if ((code = terrno) == TSDB_CODE_RSMA_NOT_EXIST) {
      // continue
    } else {  // TSDB_CODE_MND_RSMA_IN_CREATING | TSDB_CODE_MND_RSMA_IN_DROPPING | TSDB_CODE_APP_ERROR
      goto _exit;
    }
  }

  SName name = {0};
  TAOS_CHECK_EXIT(tNameFromString(&name, createReq.tbFName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE));
  char db[TSDB_TABLE_FNAME_LEN] = {0};
  (void)tNameGetFullDbName(&name, db);

  pDb = mndAcquireDb(pMnode, db);
  if (pDb == NULL) {
    TAOS_CHECK_EXIT(TSDB_CODE_MND_DB_NOT_SELECTED);
  }

  TAOS_CHECK_EXIT(mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_READ_DB, pDb));
  TAOS_CHECK_EXIT(mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb));

  pStb = mndAcquireStb(pMnode, createReq.tbFName);
  if (pStb == NULL) {
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STB_NOT_EXIST);
  }

  TAOS_CHECK_EXIT(mndCheckRsmaConflicts(pMnode, pDb, &createReq));

  TAOS_CHECK_EXIT(mndAcquireUser(pMnode, pReq->info.conn.user, &pUser));
  TAOS_CHECK_EXIT(mndCreateRsma(pMnode, pReq, pUser, pDb, pStb, &createReq));

  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;

  auditRecord(pReq, pMnode->clusterId, "createRsma", createReq.name, createReq.tbFName, "", 0);
_exit:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("rsma:%s, failed at line %d to create since %s", createReq.name, lino, tstrerror(code));
  }
  if (pSma) mndReleaseRsma(pMnode, pSma);
  if (pStb) mndReleaseStb(pMnode, pStb);
  if (pDb) mndReleaseDb(pMnode, pDb);
  // the user object was acquired above and was never given back
  if (pUser) mndReleaseUser(pMnode, pUser);
  tFreeSMCreateRsmaReq(&createReq);
#endif
  TAOS_RETURN(code);
}
781

782
#ifdef TD_ENTERPRISE
783
/**
 * Sanity-check an alter-rsma request before any catalog lookup is done.
 *
 * The request is accepted only when the rsma name is non-empty and
 * igNotExists is either 0 or 1.
 *
 * @param pReq  request to validate
 * @return 0 when valid, TSDB_CODE_MND_INVALID_RSMA_OPTION otherwise
 */
static int32_t mndCheckAlterRsmaReq(SMAlterRsmaReq *pReq) {
  const bool nameGiven = (pReq->name[0] != 0);
  const bool ignoreFlagOk = (pReq->igNotExists >= 0 && pReq->igNotExists <= 1);

  int32_t code = (nameGiven && ignoreFlagOk) ? 0 : TSDB_CODE_MND_INVALID_RSMA_OPTION;
  TAOS_RETURN(code);
}
792

793
/**
 * Encode the altered rsma object and attach it to the transaction as a
 * prepare log, then mark the raw record SDB_STATUS_READY.
 *
 * @param pMnode  mnode handle (not used in this function)
 * @param pTrans  transaction receiving the prepare log
 * @param pSma    rsma object to encode
 * @return 0 on success, -1 if encoding, appending, or the status update fails
 */
static int32_t mndSetAlterRsmaPrepareActions(SMnode *pMnode, STrans *pTrans, SRsmaObj *pSma) {
  SSdbRaw *pRaw = mndRsmaActionEncode(pSma);

  // The steps run strictly in order; the first one that fails stops the chain.
  if (pRaw == NULL || mndTransAppendPrepareLog(pTrans, pRaw) != 0 ||
      sdbSetRawStatus(pRaw, SDB_STATUS_READY) != 0) {
    return -1;
  }
  return 0;
}
801

802
/**
 * Attach the commit log for an altered rsma to the transaction.
 *
 * Delegates to mndSetCreateRsmaCommitLogs() with the same arguments and
 * returns its result unchanged.
 */
static int32_t mndSetAlterRsmaCommitLogs(SMnode *pMnode, STrans *pTrans, SRsmaObj *pSma) {
  const int32_t code = mndSetCreateRsmaCommitLogs(pMnode, pTrans, pSma);
  return code;
}
805

806
static void *mndBuildVAlterRsmaReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, SRsmaObj *pObj,
14✔
807
                                   SMAlterRsmaReq *pAlter, int32_t *pContLen) {
808
  int32_t        code = 0, lino = 0;
14✔
809
  SMsgHead      *pHead = NULL;
14✔
810
  SVAlterRsmaReq req = {0};
14✔
811
  req.alterType = pAlter->alterType;
14✔
812
  (void)snprintf(req.name, sizeof(req.name), "%s", pObj->name);
14✔
813
  (void)snprintf(req.tbName, sizeof(req.tbName), "%s", pObj->tbName);
14✔
814
  req.tbType = pObj->tbType;
14✔
815
  req.intervalUnit = pObj->intervalUnit;
14✔
816
  req.interval[0] = pObj->interval[0];
14✔
817
  req.interval[1] = pObj->interval[1];
14✔
818
  req.tbUid = pObj->tbUid;
14✔
819
  req.uid = pObj->uid;
14✔
820
  req.nFuncs = pObj->nFuncs;
14✔
821
  req.funcColIds = pObj->funcColIds;
14✔
822
  req.funcIds = pObj->funcIds;
14✔
823

824
  int32_t contLen = tSerializeSVAlterRsmaReq(NULL, 0, &req);
14✔
825
  TAOS_CHECK_EXIT(contLen);
14!
826
  contLen += sizeof(SMsgHead);
14✔
827
  TSDB_CHECK_NULL((pHead = taosMemoryMalloc(contLen)), code, lino, _exit, terrno);
14!
828
  pHead->contLen = htonl(contLen);
14✔
829
  pHead->vgId = htonl(pVgroup->vgId);
14✔
830
  void *pBuf = POINTER_SHIFT(pHead, sizeof(SMsgHead));
14✔
831
  TAOS_CHECK_EXIT(tSerializeSVAlterRsmaReq(pBuf, contLen, &req));
14!
832
_exit:
14✔
833
  if (code < 0) {
14!
834
    taosMemoryFreeClear(pHead);
×
835
    terrno = code;
×
836
    *pContLen = 0;
×
837
    return NULL;
×
838
  }
839
  *pContLen = contLen;
14✔
840
  return pHead;
14✔
841
}
842

843
static int32_t mndSetAlterRsmaRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb, SRsmaObj *pObj,
7✔
844
                                          SMAlterRsmaReq *pAlter) {
845
  int32_t code = 0;
7✔
846
  SSdb   *pSdb = pMnode->pSdb;
7✔
847
  SVgObj *pVgroup = NULL;
7✔
848
  void   *pIter = NULL;
7✔
849

850
  while ((pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup))) {
35✔
851
    if (!mndVgroupInDb(pVgroup, pDb->uid)) {
28✔
852
      sdbRelease(pSdb, pVgroup);
14✔
853
      continue;
14✔
854
    }
855

856
    int32_t contLen = 0;
14✔
857
    void   *pReq = mndBuildVAlterRsmaReq(pMnode, pVgroup, pStb, pObj, pAlter, &contLen);
14✔
858
    if (pReq == NULL) {
14!
859
      sdbCancelFetch(pSdb, pIter);
×
860
      sdbRelease(pSdb, pVgroup);
×
861
      code = terrno ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
×
862
      TAOS_RETURN(code);
×
863
    }
864

865
    STransAction action = {0};
14✔
866
    action.mTraceId = pTrans->mTraceId;
14✔
867
    action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
14✔
868
    action.pCont = pReq;
14✔
869
    action.contLen = contLen;
14✔
870
    action.msgType = TDMT_VND_ALTER_RSMA;
14✔
871
    if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
14!
872
      taosMemoryFree(pReq);
×
873
      sdbCancelFetch(pSdb, pIter);
×
874
      sdbRelease(pSdb, pVgroup);
×
875
      TAOS_RETURN(code);
×
876
    }
877
    sdbRelease(pSdb, pVgroup);
14✔
878
  }
879

880
  TAOS_RETURN(code);
7✔
881
}
882

883
static int32_t mndAlterRsma(SMnode *pMnode, SRpcMsg *pReq, SUserObj *pUser, SDbObj *pDb, SStbObj *pStb,
7✔
884
                            SMAlterRsmaReq *pAlter, SRsmaObj *pOld) {
885
  int32_t  code = 0, lino = 0;
7✔
886
  STrans  *pTrans = NULL;
7✔
887
  SRsmaObj obj = *pOld;
7✔
888

889
  obj.updateTime = taosGetTimestampMs();
7✔
890
  ++obj.version;
7✔
891
  if (pAlter->alterType == TSDB_ALTER_RSMA_FUNCTION) {
7!
892
    obj.nFuncs = pOld->nFuncs + pAlter->nFuncs;
7✔
893
    obj.funcColIds = taosMemoryMalloc(obj.nFuncs * sizeof(col_id_t));
7!
894
    obj.funcIds = taosMemoryMalloc(obj.nFuncs * sizeof(func_id_t));
7!
895
    if (obj.funcColIds == NULL || obj.funcIds == NULL) {
7!
896
      TAOS_CHECK_EXIT(terrno);
×
897
    }
898
    int32_t n = 0, i = 0, j = 0;
7✔
899
    while (i < pOld->nFuncs && j < pAlter->nFuncs) {
38✔
900
      if (pOld->funcColIds[i] < pAlter->funcColIds[j]) {
31✔
901
        obj.funcColIds[n] = pOld->funcColIds[i];
26✔
902
        obj.funcIds[n++] = pOld->funcIds[i++];
26✔
903
      } else if (pOld->funcColIds[i] > pAlter->funcColIds[j]) {
5!
904
        obj.funcColIds[n] = pAlter->funcColIds[j];
5✔
905
        obj.funcIds[n++] = pAlter->funcIds[j++];
5✔
906
      } else {
907
        mError("rsma:%s, conflict function on column id:%d", pOld->name, pAlter->funcColIds[j]);
×
908
        TAOS_CHECK_EXIT(TSDB_CODE_MND_RSMA_FUNC_CONFLICT);
×
909
      }
910
    }
911
    if (i < pOld->nFuncs) {
7✔
912
      while (i < pOld->nFuncs) {
10✔
913
        obj.funcColIds[n] = pOld->funcColIds[i];
5✔
914
        obj.funcIds[n++] = pOld->funcIds[i++];
5✔
915
      }
916
    } else if (j < pAlter->nFuncs) {
2!
917
      while (j < pAlter->nFuncs) {
4✔
918
        obj.funcColIds[n] = pAlter->funcColIds[j];
2✔
919
        obj.funcIds[n++] = pAlter->funcIds[j++];
2✔
920
      }
921
    }
922
  } else {
923
    TAOS_CHECK_EXIT(TSDB_CODE_OPS_NOT_SUPPORT);
×
924
  }
925

926
  TSDB_CHECK_NULL((pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, "alter-rsma")), code,
7!
927
                  lino, _exit, terrno);
928
  mInfo("trans:%d, used to alter rsma %s on tb %s.%s", pTrans->id, obj.name, obj.dbFName, obj.tbName);
7!
929

930
  mndTransSetDbName(pTrans, obj.dbFName, obj.name);
7✔
931
  mndTransSetKillMode(pTrans, TRN_KILL_MODE_SKIP);
7✔
932
  TAOS_CHECK_EXIT(mndTransCheckConflict(pMnode, pTrans));
7!
933

934
  mndTransSetOper(pTrans, MND_OPER_ALTER_RSMA);
7✔
935
  TAOS_CHECK_EXIT(mndSetAlterRsmaPrepareActions(pMnode, pTrans, &obj));
7!
936
  TAOS_CHECK_EXIT(mndSetAlterRsmaCommitLogs(pMnode, pTrans, &obj));
7!
937
  TAOS_CHECK_EXIT(mndSetAlterRsmaRedoActions(pMnode, pTrans, pDb, pStb, &obj, pAlter));
7!
938

939
  TAOS_CHECK_EXIT(mndTransPrepare(pMnode, pTrans));
7!
940
_exit:
7✔
941
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
7!
942
    mError("rsma:%s, failed at line %d to alter rsma, since %s", obj.name, lino, tstrerror(code));
×
943
  }
944
  mndTransDrop(pTrans);
7✔
945
  mndRsmaFreeObj(&obj);
7✔
946
  TAOS_RETURN(code);
7✔
947
}
948
#endif
949
/**
 * Handle an alter-rsma request sent to the mnode.
 *
 * With TD_ENTERPRISE defined: deserialize and validate the request, acquire
 * the rsma, its database and its table, check read and write privilege on the
 * database, start the alter transaction and write an audit record. A missing
 * rsma yields success when igNotExists is set. Without TD_ENTERPRISE the
 * function just returns 0.
 *
 * @param pReq  incoming rpc message
 * @return TSDB_CODE_ACTION_IN_PROGRESS once the transaction has been started,
 *         0 when there is nothing to do, otherwise a TSDB error code
 */
static int32_t mndProcessAlterRsmaReq(SRpcMsg *pReq) {
  int32_t code = 0, lino = 0;
#ifdef TD_ENTERPRISE
  SMnode        *pMnode = pReq->info.node;
  SDbObj        *pDb = NULL;
  SStbObj       *pStb = NULL;
  SRsmaObj      *pObj = NULL;
  SUserObj      *pUser = NULL;
  int64_t        mTraceId = TRACE_GET_ROOTID(&pReq->info.traceId);
  SMAlterRsmaReq req = {0};
  char           tbFName[TSDB_TABLE_FNAME_LEN] = "\0";

  TAOS_CHECK_EXIT(tDeserializeSMAlterRsmaReq(pReq->pCont, pReq->contLen, &req));

  mInfo("start to alter rsma: %s", req.name);
  TAOS_CHECK_EXIT(mndCheckAlterRsmaReq(&req));

  if (!(pObj = mndAcquireRsma(pMnode, req.name))) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    if (req.igNotExists) {
      code = 0;  // caller asked to ignore a missing rsma
    }
    goto _exit;
  }

  if (!(pDb = mndAcquireDb(pMnode, pObj->dbFName))) {
    TAOS_CHECK_EXIT(TSDB_CODE_MND_DB_NOT_SELECTED);
  }

  TAOS_CHECK_EXIT(mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_READ_DB, pDb));
  TAOS_CHECK_EXIT(mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb));

  (void)snprintf(tbFName, sizeof(tbFName), "%s.%s", pObj->dbFName, pObj->tbName);

  pStb = mndAcquireStb(pMnode, tbFName);
  if (pStb == NULL) {
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STB_NOT_EXIST);
  }

  TAOS_CHECK_EXIT(mndAcquireUser(pMnode, pReq->info.conn.user, &pUser));
  TAOS_CHECK_EXIT(mndAlterRsma(pMnode, pReq, pUser, pDb, pStb, &req, pObj));

  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;

  char alterType[32] = "\0";
  (void)snprintf(alterType, sizeof(alterType), "alterType:%" PRIi8, req.alterType);
  auditRecord(pReq, pMnode->clusterId, "alterRsma", req.name, tbFName, alterType, 0);
_exit:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("rsma:%s, failed at line %d to alter since %s", req.name, lino, tstrerror(code));
  }
  // Give back every object acquired above, including the user object
  // obtained through mndAcquireUser().
  if (pUser) mndReleaseUser(pMnode, pUser);
  if (pObj) mndReleaseRsma(pMnode, pObj);
  if (pStb) mndReleaseStb(pMnode, pStb);
  if (pDb) mndReleaseDb(pMnode, pDb);
  tFreeSMAlterRsmaReq(&req);
#endif
  TAOS_RETURN(code);
}
1008
#ifdef TD_ENTERPRISE
1009
/**
 * Populate an SRsmaInfoRsp from an rsma object.
 *
 * funcColIds/funcIds in the response are pointer copies of the arrays held by
 * pObj. When withColName is set, colNames is filled with one duplicated
 * column name per function; the name is looked up in pStb->pColumns by column
 * id using a single forward scan (the scan position is never reset, so both
 * sequences are presumably ordered by column id — TODO confirm).
 *
 * @param pObj         source rsma object
 * @param pStb         table supplying the column schema
 * @param pRsp         response to fill
 * @param withColName  whether to resolve column names
 * @return 0 on success; TSDB_CODE_MND_COLUMN_NOT_EXIST if a column id has no
 *         match; terrno when an allocation fails
 */
static int32_t mndFillRsmaInfo(SRsmaObj *pObj, SStbObj *pStb, SRsmaInfoRsp *pRsp, bool withColName) {
  int32_t code = 0, lino = 0;
  int16_t funcIdx = 0, colIdx = 0;

  pRsp->id = pObj->uid;
  (void)snprintf(pRsp->name, sizeof(pRsp->name), "%s", pObj->name);
  (void)snprintf(pRsp->tbFName, sizeof(pRsp->tbFName), "%s.%s", pObj->dbFName, pObj->tbName);
  pRsp->version = pObj->version;
  pRsp->tbType = pObj->tbType;
  pRsp->intervalUnit = pObj->intervalUnit;
  pRsp->nFuncs = pObj->nFuncs;
  pRsp->interval[0] = pObj->interval[0];
  pRsp->interval[1] = pObj->interval[1];

  if (pRsp->nFuncs <= 0) goto _exit;

  // Borrowed from pObj, not duplicated.
  pRsp->funcColIds = pObj->funcColIds;
  pRsp->funcIds = pObj->funcIds;

  if (!withColName) goto _exit;

  pRsp->colNames = taosArrayInit(pRsp->nFuncs, sizeof(char *));
  if (pRsp->colNames == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }
  pRsp->nColNames = pRsp->nFuncs;

  for (funcIdx = 0; funcIdx < pRsp->nFuncs; ++funcIdx) {
    // Advance over schema columns whose id is smaller than the wanted one.
    while (colIdx < pStb->numOfColumns && pStb->pColumns[colIdx].colId < pRsp->funcColIds[funcIdx]) {
      ++colIdx;
    }

    bool matched = (colIdx < pStb->numOfColumns) && (pStb->pColumns[colIdx].colId == pRsp->funcColIds[funcIdx]);
    if (!matched) {
      TAOS_CHECK_EXIT(TSDB_CODE_MND_COLUMN_NOT_EXIST);
    } else {
      char *nameCopy = taosStrdup(pStb->pColumns[colIdx].name);
      if (nameCopy == NULL) {
        TAOS_CHECK_EXIT(terrno);
      }
      if (!taosArrayPush(pRsp->colNames, &nameCopy)) {
        taosMemoryFree(nameCopy);
        TAOS_CHECK_EXIT(terrno);
      }
    }
  }

_exit:
  if (code != 0) {
    mError("rsma:%s, failed at line %d to get rsma info since %s", pObj->name, lino, tstrerror(code));
  }
  TAOS_RETURN(code);
}
1064
#endif
1065
/**
 * Handle a get-rsma-info request.
 *
 * With TD_ENTERPRISE defined: look up the rsma named in the request and the
 * table it is built on, fill an SRsmaInfoRsp, serialize it into an rpc buffer
 * and attach that buffer to pReq->info as the response. Without
 * TD_ENTERPRISE the function returns TSDB_CODE_OPS_NOT_SUPPORT.
 *
 * @param pReq  incoming rpc message; receives rsp/rspLen on success
 * @return 0 on success, otherwise a TSDB error code
 */
static int32_t mndProcessGetRsmaReq(SRpcMsg *pReq) {
#ifdef TD_ENTERPRISE
  int32_t      code = 0, lino = 0;
  int32_t      contLen = 0;
  void        *pRsp = NULL;
  SRsmaObj    *pObj = NULL;
  SStbObj     *pStb = NULL;
  SMnode      *pMnode = pReq->info.node;
  SRsmaInfoReq req = {0};
  SRsmaInfoRsp rsp = {0};
  char         tbFName[TSDB_TABLE_FNAME_LEN] = {0};

  TAOS_CHECK_EXIT(tDeserializeRsmaInfoReq(pReq->pCont, pReq->contLen, &req));

  pObj = mndAcquireRsma(pMnode, req.name);
  if (pObj == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }

  (void)snprintf(tbFName, sizeof(tbFName), "%s.%s", pObj->dbFName, pObj->tbName);

  pStb = mndAcquireStb(pMnode, tbFName);
  if (pStb == NULL) {
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STB_NOT_EXIST);
  }

  TAOS_CHECK_EXIT(mndFillRsmaInfo(pObj, pStb, &rsp, req.withColName));

  // Measure first, then allocate and serialize for real. The length is only
  // routed through TAOS_CHECK_EXIT when negative, so `code` stays 0 on the
  // success path.
  contLen = tSerializeRsmaInfoRsp(NULL, 0, &rsp);
  if (contLen < 0) {
    TAOS_CHECK_EXIT(contLen);
  }
  pRsp = rpcMallocCont(contLen);
  if (pRsp == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }
  contLen = tSerializeRsmaInfoRsp(pRsp, contLen, &rsp);
  if (contLen < 0) {
    TAOS_CHECK_EXIT(contLen);
  }

  pReq->info.rsp = pRsp;
  pReq->info.rspLen = contLen;

_exit:
  if (code != 0) {
    rpcFreeCont(pRsp);
  }
  if (pObj) mndReleaseRsma(pMnode, pObj);
  if (pStb) mndReleaseStb(pMnode, pStb);
  tFreeRsmaInfoRsp(&rsp, false);
  TAOS_RETURN(code);
#else
  return TSDB_CODE_OPS_NOT_SUPPORT;
#endif
}
1116
#ifdef TD_ENTERPRISE
1117
/**
 * Render the function list of an rsma as a var-string, e.g. "f1(c1),f2(c2)".
 *
 * buf is laid out as a var-string: a length header of VARSTR_HEADER_SIZE
 * bytes followed by the text. Each function id is paired with the name of the
 * column whose id matches; column ids without a match in the table schema are
 * skipped. Output stops at the first entry that no longer fits. The result is
 * an empty string when the rsma has no functions or the table cannot be
 * acquired.
 *
 * @param pMnode  mnode handle
 * @param pObj    rsma object supplying funcIds/funcColIds
 * @param buf     destination var-string buffer
 * @param bufLen  size budget for buf; VARSTR_HEADER_SIZE is subtracted from it
 */
static void mndRetrieveRsmaFuncList(SMnode *pMnode, SRsmaObj *pObj, char *buf, int32_t bufLen) {
  char   *text = POINTER_SHIFT(buf, VARSTR_HEADER_SIZE);
  int32_t textCap = bufLen - VARSTR_HEADER_SIZE;

  // Start from an empty string so every early return leaves a valid value.
  text[0] = 0;
  varDataSetLen(buf, 0);

  if (pObj->nFuncs <= 0) return;

  char tbFName[TSDB_TABLE_FNAME_LEN] = {0};
  (void)snprintf(tbFName, sizeof(tbFName), "%s.%s", pObj->dbFName, pObj->tbName);

  SStbObj *pStb = mndAcquireStb(pMnode, tbFName);
  if (pStb == NULL) {
    mWarn("rsma:%s, failed to acquire table %s for function list", pObj->name, tbFName);
    return;
  }

  SSchema *pCols = pStb->pColumns;
  int32_t  used = 0;
  int32_t  colIdx = 0;  // never rewound between functions
  char     item[TSDB_COL_NAME_LEN + TSDB_FUNC_NAME_LEN + 2] = {0};

  for (int32_t funcIdx = 0; funcIdx < pObj->nFuncs; ++funcIdx) {
    col_id_t wanted = pObj->funcColIds[funcIdx];

    while (colIdx < pStb->numOfColumns && pCols[colIdx].colId < wanted) {
      ++colIdx;
    }
    if (colIdx >= pStb->numOfColumns || pCols[colIdx].colId != wanted) {
      continue;  // no such column in the schema: skip this function
    }

    int32_t itemLen = tsnprintf(item, sizeof(item), "%s(%s),", fmGetFuncName(pObj->funcIds[funcIdx]), pCols[colIdx].name);
    if ((textCap - used) <= itemLen) {
      break;  // out of room: keep what has been written so far
    }
    used += tsnprintf(text + used, itemLen + 1, "%s", item);
  }

  // Drop the trailing ',' if anything was written.
  int32_t finalLen = used > 0 ? used - 1 : 0;
  text[finalLen] = 0;
  varDataSetLen(buf, finalLen);
  mndReleaseStb(pMnode, pStb);
}
1165
#endif
1166
/**
 * Fill a result block with one row per rsma object for a show/retrieve query.
 *
 * With TD_ENTERPRISE defined and no rows returned yet for this show object,
 * every SDB_RSMA entry is visited and its columns are written in this order:
 * name, uid, database name (text after the first '.' of dbFName), table name,
 * table type, created time, interval(s) and function list. Columns after the
 * first are written only when the block has a column at that index. Without
 * TD_ENTERPRISE nothing is written and 0 is returned.
 *
 * @param pReq    rpc message; supplies the mnode handle
 * @param pShow   show object; its numOfRows is advanced by the rows written
 * @param pBlock  destination data block
 * @param rows    not used in this function
 * @return number of rows written, or a negative TSDB error code
 */
static int32_t mndRetrieveRsma(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode          *pMnode = pReq->info.node;
  SSdb            *pSdb = pMnode->pSdb;
  SColumnInfoData *pColInfo = NULL;
  void            *pIter = NULL;
  int32_t          code = 0, lino = 0;
  int32_t          numOfRows = 0;
  int32_t          cols = 0;
  int32_t          tmpLen = 0;
  int32_t          bufLen = 0;
  char             tmp[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE];
  char            *pBuf = NULL;
  char            *qBuf = NULL;
#ifdef TD_ENTERPRISE
  // pBuf is the var-string (header + text); qBuf always addresses its text.
  pBuf = tmp;
  qBuf = POINTER_SHIFT(pBuf, VARSTR_HEADER_SIZE);
  bufLen = sizeof(tmp) - VARSTR_HEADER_SIZE;

  if (pShow->numOfRows < 1) {
    SRsmaObj *pObj = NULL;

    while ((pIter = sdbFetch(pSdb, SDB_RSMA, pIter, (void **)&pObj)) != NULL) {
      // rsma name
      cols = 0;
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols);
      TAOS_UNUSED(snprintf(qBuf, bufLen, "%s", pObj->name));
      varDataSetLen(pBuf, strlen(pBuf + VARSTR_HEADER_SIZE));
      COL_DATA_SET_VAL_GOTO(pBuf, false, pObj, pIter, _exit);

      // rsma uid
      pColInfo = taosArrayGet(pBlock->pDataBlock, ++cols);
      if (pColInfo != NULL) {
        COL_DATA_SET_VAL_GOTO((const char *)(&pObj->uid), false, pObj, pIter, _exit);
      }

      // database name without the leading "<acct>." part
      pColInfo = taosArrayGet(pBlock->pDataBlock, ++cols);
      if (pColInfo != NULL) {
        const char *dot = strchr(pObj->dbFName, '.');
        TAOS_UNUSED(snprintf(qBuf, bufLen, "%s", dot ? dot + 1 : pObj->dbFName));
        varDataSetLen(pBuf, strlen(qBuf));
        COL_DATA_SET_VAL_GOTO(pBuf, false, pObj, pIter, _exit);
      }

      // table name
      pColInfo = taosArrayGet(pBlock->pDataBlock, ++cols);
      if (pColInfo != NULL) {
        TAOS_UNUSED(snprintf(qBuf, bufLen, "%s", pObj->tbName));
        varDataSetLen(pBuf, strlen(qBuf));
        COL_DATA_SET_VAL_GOTO(pBuf, false, pObj, pIter, _exit);
      }

      // table type
      pColInfo = taosArrayGet(pBlock->pDataBlock, ++cols);
      if (pColInfo != NULL) {
        const char *typeName = "UNKNOWN";
        if (pObj->tbType == TSDB_SUPER_TABLE) {
          typeName = "SUPER_TABLE";
        } else if (pObj->tbType == TSDB_NORMAL_TABLE) {
          typeName = "NORMAL_TABLE";
        } else if (pObj->tbType == TSDB_CHILD_TABLE) {
          typeName = "CHILD_TABLE";
        }
        TAOS_UNUSED(snprintf(qBuf, bufLen, "%s", typeName));
        varDataSetLen(pBuf, strlen(qBuf));
        COL_DATA_SET_VAL_GOTO(pBuf, false, pObj, pIter, _exit);
      }

      // created time
      pColInfo = taosArrayGet(pBlock->pDataBlock, ++cols);
      if (pColInfo != NULL) {
        COL_DATA_SET_VAL_GOTO((const char *)&pObj->createdTime, false, pObj, pIter, _exit);
      }

      // interval(s): "<n><unit>" plus ",<m><unit>" when a second one is set
      pColInfo = taosArrayGet(pBlock->pDataBlock, ++cols);
      if (pColInfo != NULL) {
        TAOS_UNUSED(snprintf(qBuf, bufLen, "%" PRIi64 "%c", pObj->interval[0], pObj->intervalUnit));
        if (pObj->interval[1] > 0) {
          tmpLen = strlen(qBuf);
          TAOS_UNUSED(
              snprintf(qBuf + tmpLen, bufLen - tmpLen, ",%" PRIi64 "%c", pObj->interval[1], pObj->intervalUnit));
        }
        varDataSetLen(pBuf, strlen(qBuf));
        COL_DATA_SET_VAL_GOTO(pBuf, false, pObj, pIter, _exit);
      }

      // function list
      pColInfo = taosArrayGet(pBlock->pDataBlock, ++cols);
      if (pColInfo != NULL) {
        mndRetrieveRsmaFuncList(pMnode, pObj, pBuf, bufLen);
        COL_DATA_SET_VAL_GOTO(pBuf, false, pObj, pIter, _exit);
      }

      sdbRelease(pSdb, pObj);
      ++numOfRows;
    }
  }

  pShow->numOfRows += numOfRows;

_exit:
  if (code < 0) {
    mError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    TAOS_RETURN(code);
  }
#endif
  return numOfRows;
}
1263

1264
/**
 * Cancel an in-progress iteration over SDB_RSMA objects.
 *
 * @param pMnode  mnode handle supplying the sdb
 * @param pIter   iterator to cancel
 */
static void mndCancelRetrieveRsma(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_RSMA);
}
×
1268

1269
/**
 * Add drop commit logs to pTrans for every rsma whose dbUid equals pDb->uid.
 *
 * Does nothing and returns 0 when TD_ENTERPRISE is not defined.
 *
 * @param pMnode  mnode handle
 * @param pTrans  transaction receiving the commit logs
 * @param pDb     database whose rsma objects are selected
 * @return 0 on success, otherwise the first error from
 *         mndSetDropRsmaCommitLogs()
 */
int32_t mndDropRsmasByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
  int32_t code = 0;
#ifdef TD_ENTERPRISE
  SSdb *pSdb = pMnode->pSdb;
  void *pCursor = NULL;

  for (;;) {
    SRsmaObj *pRsma = NULL;
    pCursor = sdbFetch(pSdb, SDB_RSMA, pCursor, (void **)&pRsma);
    if (pCursor == NULL) break;

    if (pRsma->dbUid == pDb->uid) {
      code = mndSetDropRsmaCommitLogs(pMnode, pTrans, pRsma);
      if (code != 0) {
        // Abort the scan; cancel the fetch and release the current object.
        sdbCancelFetch(pSdb, pCursor);
        sdbRelease(pSdb, pRsma);
        break;
      }
    }
    sdbRelease(pSdb, pRsma);
  }
#endif
  TAOS_RETURN(code);
}
1289

1290
/**
 * Add drop commit logs to pTrans for every rsma built on the given table,
 * i.e. whose tbUid equals pStb->uid and whose dbUid equals pStb->dbUid.
 *
 * Does nothing and returns 0 when TD_ENTERPRISE is not defined.
 *
 * @param pMnode  mnode handle
 * @param pTrans  transaction receiving the commit logs
 * @param pDb     database handle (not used in this function)
 * @param pStb    table whose rsma objects are selected
 * @return 0 on success, otherwise the first error from
 *         mndSetDropRsmaCommitLogs()
 */
int32_t mndDropRsmaByStb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
  int32_t code = 0;
#ifdef TD_ENTERPRISE
  SSdb *pSdb = pMnode->pSdb;
  void *pCursor = NULL;

  for (;;) {
    SRsmaObj *pRsma = NULL;
    pCursor = sdbFetch(pSdb, SDB_RSMA, pCursor, (void **)&pRsma);
    if (pCursor == NULL) break;

    if (pRsma->tbUid == pStb->uid && pRsma->dbUid == pStb->dbUid) {
      code = mndSetDropRsmaCommitLogs(pMnode, pTrans, pRsma);
      if (code != 0) {
        // Abort the scan; cancel the fetch and release the current object.
        sdbCancelFetch(pSdb, pCursor);
        sdbRelease(pSdb, pRsma);
        break;
      }
    }
    sdbRelease(pSdb, pRsma);
  }
#endif
  TAOS_RETURN(code);
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc