• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3660

15 Mar 2025 09:06AM UTC coverage: 62.039% (-1.3%) from 63.314%
#3660

push

travis-ci

web-flow
feat(stream): support stream processing for virtual tables (#30144)

* enh: add client processing

* enh: add mnode vtables processing

* enh: add mnode vtable processing

* enh: add normal child vtable support

* fix: compile issues

* fix: compile issues

* fix: create stream issues

* fix: multi stream scan issue

* fix: remove debug info

* fix: agg task and task level issues

* fix: correct task output type

* fix: split vtablescan from agg

* fix: memory leak issues

* fix: add limitations

* Update 09-error-code.md

* Update 09-error-code.md

* fix: remove usless case

* feat(stream): extract original table data in source scan task

Implemented functionality in the source task to extract data
corresponding to the virtual table from the original table using WAL.
The extracted data is then sent to the downstream merge task for further
processing.

* feat(stream): multi-way merge using loser tree in virtual merge task

Implemented multi-way merge in the merge task using a loser tree to
combine data from multiple original table into a single virtual table.
The merged virtual table data is then pushed downstream for further
processing.  Introduced memory limit handling during the merge process
with configurable behavior when the memory limit is reached.

* fix(test): remove useless cases

---------

Co-authored-by: dapan1121 <wpan@taosdata.com>
Co-authored-by: Pan Wei <72057773+dapan1121@users.noreply.github.com>

154078 of 317582 branches covered (48.52%)

Branch coverage included in aggregate %.

313 of 2391 new or added lines in 34 files covered. (13.09%)

26134 existing lines in 205 files now uncovered.

240261 of 318051 relevant lines covered (75.54%)

16655189.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

48.41
/source/dnode/mnode/impl/src/mndSma.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mndSma.h"
18
#include "functionMgt.h"
19
#include "mndDb.h"
20
#include "mndDnode.h"
21
#include "mndIndex.h"
22
#include "mndIndexComm.h"
23
#include "mndInfoSchema.h"
24
#include "mndMnode.h"
25
#include "mndPrivilege.h"
26
#include "mndScheduler.h"
27
#include "mndShow.h"
28
#include "mndStb.h"
29
#include "mndStream.h"
30
#include "mndTrans.h"
31
#include "mndUser.h"
32
#include "mndVgroup.h"
33
#include "parser.h"
34
#include "tname.h"
35

36
#define TSDB_SMA_VER_NUMBER   1
37
#define TSDB_SMA_RESERVE_SIZE 64
38

39
static SSdbRaw *mndSmaActionEncode(SSmaObj *pSma);
40
static SSdbRow *mndSmaActionDecode(SSdbRaw *pRaw);
41
static int32_t  mndSmaActionInsert(SSdb *pSdb, SSmaObj *pSma);
42
static int32_t  mndSmaActionDelete(SSdb *pSdb, SSmaObj *pSpSmatb);
43
static int32_t  mndSmaActionUpdate(SSdb *pSdb, SSmaObj *pOld, SSmaObj *pNew);
44
static int32_t  mndProcessCreateSmaReq(SRpcMsg *pReq);
45
static int32_t  mndProcessDropSmaReq(SRpcMsg *pReq);
46
static int32_t  mndProcessGetSmaReq(SRpcMsg *pReq);
47
static int32_t  mndProcessGetTbSmaReq(SRpcMsg *pReq);
48
static int32_t  mndRetrieveSma(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
49
static void     mndDestroySmaObj(SSmaObj *pSmaObj);
50

51
static int32_t mndProcessCreateTSMAReq(SRpcMsg *pReq);
52
static int32_t mndProcessDropTSMAReq(SRpcMsg *pReq);
53

54
// sma and tag index comm func
55
static int32_t mndProcessDropIdxReq(SRpcMsg *pReq);
56
static int32_t mndRetrieveIdx(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
57
static void    mndCancelRetrieveIdx(SMnode *pMnode, void *pIter);
58

59
static int32_t mndRetrieveTSMA(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
60
static void    mndCancelRetrieveTSMA(SMnode *pMnode, void *pIter);
61
static int32_t mndProcessGetTbTSMAReq(SRpcMsg *pReq);
62

63
typedef struct SCreateTSMACxt {
64
  SMnode        *pMnode;
65
  const SRpcMsg *pRpcReq;
66
  union {
67
    const SMCreateSmaReq *pCreateSmaReq;
68
    const SMDropSmaReq   *pDropSmaReq;
69
  };
70
  SDbObj             *pDb;
71
  SStbObj            *pSrcStb;
72
  SSmaObj            *pSma;
73
  const SSmaObj      *pBaseSma;
74
  SCMCreateStreamReq *pCreateStreamReq;
75
  SMDropStreamReq    *pDropStreamReq;
76
  const char         *streamName;
77
  const char         *targetStbFullName;
78
  SNodeList          *pProjects;
79
} SCreateTSMACxt;
80

81
int32_t mndInitSma(SMnode *pMnode) {
1,748✔
82
  SSdbTable table = {
1,748✔
83
      .sdbType = SDB_SMA,
84
      .keyType = SDB_KEY_BINARY,
85
      .encodeFp = (SdbEncodeFp)mndSmaActionEncode,
86
      .decodeFp = (SdbDecodeFp)mndSmaActionDecode,
87
      .insertFp = (SdbInsertFp)mndSmaActionInsert,
88
      .updateFp = (SdbUpdateFp)mndSmaActionUpdate,
89
      .deleteFp = (SdbDeleteFp)mndSmaActionDelete,
90
  };
91

92
//  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_SMA, mndProcessCreateSmaReq);
93
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_SMA, mndProcessDropIdxReq);
1,748✔
94
  mndSetMsgHandle(pMnode, TDMT_VND_CREATE_SMA_RSP, mndTransProcessRsp);
1,748✔
95
  mndSetMsgHandle(pMnode, TDMT_VND_DROP_SMA_RSP, mndTransProcessRsp);
1,748✔
96
  mndSetMsgHandle(pMnode, TDMT_MND_GET_INDEX, mndProcessGetSmaReq);
1,748✔
97
  mndSetMsgHandle(pMnode, TDMT_MND_GET_TABLE_INDEX, mndProcessGetTbSmaReq);
1,748✔
98

99
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_INDEX, mndRetrieveIdx);
1,748✔
100
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_INDEX, mndCancelRetrieveIdx);
1,748✔
101

102
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_TSMA, mndProcessCreateTSMAReq);
1,748✔
103
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_TSMA, mndProcessDropTSMAReq);
1,748✔
104
  mndSetMsgHandle(pMnode, TDMT_MND_GET_TABLE_TSMA, mndProcessGetTbTSMAReq);
1,748✔
105
  mndSetMsgHandle(pMnode, TDMT_MND_GET_TSMA, mndProcessGetTbTSMAReq);
1,748✔
106
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TSMAS, mndRetrieveTSMA);
1,748✔
107
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TSMAS, mndCancelRetrieveTSMA);
1,748✔
108

109
  return sdbSetTable(pMnode->pSdb, table);
1,748✔
110
}
111

112
void mndCleanupSma(SMnode *pMnode) {}
1,747✔
113

114
static SSdbRaw *mndSmaActionEncode(SSmaObj *pSma) {
1,295✔
115
  int32_t code = 0;
1,295✔
116
  int32_t lino = 0;
1,295✔
117
  terrno = TSDB_CODE_OUT_OF_MEMORY;
1,295✔
118

119
  int32_t size =
1,295✔
120
      sizeof(SSmaObj) + pSma->exprLen + pSma->tagsFilterLen + pSma->sqlLen + pSma->astLen + TSDB_SMA_RESERVE_SIZE;
1,295✔
121
  SSdbRaw *pRaw = sdbAllocRaw(SDB_SMA, TSDB_SMA_VER_NUMBER, size);
1,295✔
122
  if (pRaw == NULL) goto _OVER;
1,295!
123

124
  int32_t dataPos = 0;
1,295✔
125
  SDB_SET_BINARY(pRaw, dataPos, pSma->name, TSDB_TABLE_FNAME_LEN, _OVER)
1,295!
126
  SDB_SET_BINARY(pRaw, dataPos, pSma->stb, TSDB_TABLE_FNAME_LEN, _OVER)
1,295!
127
  SDB_SET_BINARY(pRaw, dataPos, pSma->db, TSDB_DB_FNAME_LEN, _OVER)
1,295!
128
  SDB_SET_BINARY(pRaw, dataPos, pSma->dstTbName, TSDB_TABLE_FNAME_LEN, _OVER)
1,295!
129
  SDB_SET_INT64(pRaw, dataPos, pSma->createdTime, _OVER)
1,295!
130
  SDB_SET_INT64(pRaw, dataPos, pSma->uid, _OVER)
1,295!
131
  SDB_SET_INT64(pRaw, dataPos, pSma->stbUid, _OVER)
1,295!
132
  SDB_SET_INT64(pRaw, dataPos, pSma->dbUid, _OVER)
1,295!
133
  SDB_SET_INT64(pRaw, dataPos, pSma->dstTbUid, _OVER)
1,295!
134
  SDB_SET_INT8(pRaw, dataPos, pSma->intervalUnit, _OVER)
1,295!
135
  SDB_SET_INT8(pRaw, dataPos, pSma->slidingUnit, _OVER)
1,295!
136
  SDB_SET_INT8(pRaw, dataPos, pSma->timezone, _OVER)
1,295!
137
  SDB_SET_INT32(pRaw, dataPos, pSma->dstVgId, _OVER)
1,295!
138
  SDB_SET_INT64(pRaw, dataPos, pSma->interval, _OVER)
1,295!
139
  SDB_SET_INT64(pRaw, dataPos, pSma->offset, _OVER)
1,295!
140
  SDB_SET_INT64(pRaw, dataPos, pSma->sliding, _OVER)
1,295!
141
  SDB_SET_INT32(pRaw, dataPos, pSma->exprLen, _OVER)
1,295!
142
  SDB_SET_INT32(pRaw, dataPos, pSma->tagsFilterLen, _OVER)
1,295!
143
  SDB_SET_INT32(pRaw, dataPos, pSma->sqlLen, _OVER)
1,295!
144
  SDB_SET_INT32(pRaw, dataPos, pSma->astLen, _OVER)
1,295!
145
  SDB_SET_INT32(pRaw, dataPos, pSma->version, _OVER)
1,295!
146

147
  if (pSma->exprLen > 0) {
1,295!
148
    SDB_SET_BINARY(pRaw, dataPos, pSma->expr, pSma->exprLen, _OVER)
1,295!
149
  }
150
  if (pSma->tagsFilterLen > 0) {
1,295!
151
    SDB_SET_BINARY(pRaw, dataPos, pSma->tagsFilter, pSma->tagsFilterLen, _OVER)
×
152
  }
153
  if (pSma->sqlLen > 0) {
1,295!
154
    SDB_SET_BINARY(pRaw, dataPos, pSma->sql, pSma->sqlLen, _OVER)
1,295!
155
  }
156
  if (pSma->astLen > 0) {
1,295!
157
    SDB_SET_BINARY(pRaw, dataPos, pSma->ast, pSma->astLen, _OVER)
1,295!
158
  }
159
  SDB_SET_BINARY(pRaw, dataPos, pSma->baseSmaName, TSDB_TABLE_FNAME_LEN, _OVER)
1,295!
160

161
  SDB_SET_RESERVE(pRaw, dataPos, TSDB_SMA_RESERVE_SIZE, _OVER)
1,295!
162
  SDB_SET_DATALEN(pRaw, dataPos, _OVER)
1,295!
163

164
  terrno = 0;
1,295✔
165

166
_OVER:
1,295✔
167
  if (terrno != 0) {
1,295!
168
    mError("sma:%s, failed to encode to raw:%p since %s", pSma->name, pRaw, terrstr());
×
169
    sdbFreeRaw(pRaw);
×
170
    return NULL;
×
171
  }
172

173
  mTrace("sma:%s, encode to raw:%p, row:%p", pSma->name, pRaw, pSma);
1,295✔
174
  return pRaw;
1,295✔
175
}
176

177
static SSdbRow *mndSmaActionDecode(SSdbRaw *pRaw) {
995✔
178
  int32_t code = 0;
995✔
179
  int32_t lino = 0;
995✔
180
  terrno = TSDB_CODE_OUT_OF_MEMORY;
995✔
181
  SSdbRow *pRow = NULL;
995✔
182
  SSmaObj *pSma = NULL;
995✔
183

184
  int8_t sver = 0;
995✔
185
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;
995!
186

187
  if (sver != TSDB_SMA_VER_NUMBER) {
995!
188
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
×
189
    goto _OVER;
×
190
  }
191

192
  pRow = sdbAllocRow(sizeof(SSmaObj));
995✔
193
  if (pRow == NULL) goto _OVER;
995!
194

195
  pSma = sdbGetRowObj(pRow);
995✔
196
  if (pSma == NULL) goto _OVER;
995!
197

198
  int32_t dataPos = 0;
995✔
199

200
  SDB_GET_BINARY(pRaw, dataPos, pSma->name, TSDB_TABLE_FNAME_LEN, _OVER)
995!
201
  SDB_GET_BINARY(pRaw, dataPos, pSma->stb, TSDB_TABLE_FNAME_LEN, _OVER)
995!
202
  SDB_GET_BINARY(pRaw, dataPos, pSma->db, TSDB_DB_FNAME_LEN, _OVER)
995!
203
  SDB_GET_BINARY(pRaw, dataPos, pSma->dstTbName, TSDB_TABLE_FNAME_LEN, _OVER)
995!
204
  SDB_GET_INT64(pRaw, dataPos, &pSma->createdTime, _OVER)
995!
205
  SDB_GET_INT64(pRaw, dataPos, &pSma->uid, _OVER)
995!
206
  SDB_GET_INT64(pRaw, dataPos, &pSma->stbUid, _OVER)
995!
207
  SDB_GET_INT64(pRaw, dataPos, &pSma->dbUid, _OVER)
995!
208
  SDB_GET_INT64(pRaw, dataPos, &pSma->dstTbUid, _OVER)
995!
209
  SDB_GET_INT8(pRaw, dataPos, &pSma->intervalUnit, _OVER)
995!
210
  SDB_GET_INT8(pRaw, dataPos, &pSma->slidingUnit, _OVER)
995!
211
  SDB_GET_INT8(pRaw, dataPos, &pSma->timezone, _OVER)
995!
212
  SDB_GET_INT32(pRaw, dataPos, &pSma->dstVgId, _OVER)
995!
213
  SDB_GET_INT64(pRaw, dataPos, &pSma->interval, _OVER)
995!
214
  SDB_GET_INT64(pRaw, dataPos, &pSma->offset, _OVER)
995!
215
  SDB_GET_INT64(pRaw, dataPos, &pSma->sliding, _OVER)
995!
216
  SDB_GET_INT32(pRaw, dataPos, &pSma->exprLen, _OVER)
995!
217
  SDB_GET_INT32(pRaw, dataPos, &pSma->tagsFilterLen, _OVER)
995!
218
  SDB_GET_INT32(pRaw, dataPos, &pSma->sqlLen, _OVER)
995!
219
  SDB_GET_INT32(pRaw, dataPos, &pSma->astLen, _OVER)
995!
220
  SDB_GET_INT32(pRaw, dataPos, &pSma->version, _OVER)
995!
221

222
  if (pSma->exprLen > 0) {
995!
223
    pSma->expr = taosMemoryCalloc(pSma->exprLen, 1);
995!
224
    if (pSma->expr == NULL) goto _OVER;
995!
225
    SDB_GET_BINARY(pRaw, dataPos, pSma->expr, pSma->exprLen, _OVER)
995!
226
  }
227

228
  if (pSma->tagsFilterLen > 0) {
995!
229
    pSma->tagsFilter = taosMemoryCalloc(pSma->tagsFilterLen, 1);
×
230
    if (pSma->tagsFilter == NULL) goto _OVER;
×
231
    SDB_GET_BINARY(pRaw, dataPos, pSma->tagsFilter, pSma->tagsFilterLen, _OVER)
×
232
  }
233

234
  if (pSma->sqlLen > 0) {
995!
235
    pSma->sql = taosMemoryCalloc(pSma->sqlLen, 1);
995!
236
    if (pSma->sql == NULL) goto _OVER;
995!
237
    SDB_GET_BINARY(pRaw, dataPos, pSma->sql, pSma->sqlLen, _OVER)
995!
238
  }
239

240
  if (pSma->astLen > 0) {
995!
241
    pSma->ast = taosMemoryCalloc(pSma->astLen, 1);
995!
242
    if (pSma->ast == NULL) goto _OVER;
995!
243
    SDB_GET_BINARY(pRaw, dataPos, pSma->ast, pSma->astLen, _OVER)
995!
244
  }
245
  SDB_GET_BINARY(pRaw, dataPos, pSma->baseSmaName, TSDB_TABLE_FNAME_LEN, _OVER)
995!
246

247
  SDB_GET_RESERVE(pRaw, dataPos, TSDB_SMA_RESERVE_SIZE, _OVER)
995!
248

249
  terrno = 0;
995✔
250

251
_OVER:
995✔
252
  if (terrno != 0) {
995!
253
    if (pSma != NULL) {
×
254
      mError("sma:%s, failed to decode from raw:%p since %s", pSma->name, pRaw, terrstr());
×
255
      taosMemoryFreeClear(pSma->expr);
×
256
      taosMemoryFreeClear(pSma->tagsFilter);
×
257
      taosMemoryFreeClear(pSma->sql);
×
258
      taosMemoryFreeClear(pSma->ast);
×
259
    }
260
    taosMemoryFreeClear(pRow);
×
261
    return NULL;
×
262
  }
263

264
  mTrace("sma:%s, decode from raw:%p, row:%p", pSma->name, pRaw, pSma);
995✔
265
  return pRow;
995✔
266
}
267

268
static int32_t mndSmaActionInsert(SSdb *pSdb, SSmaObj *pSma) {
267✔
269
  mTrace("sma:%s, perform insert action, row:%p", pSma->name, pSma);
267✔
270
  return 0;
267✔
271
}
272

273
static int32_t mndSmaActionDelete(SSdb *pSdb, SSmaObj *pSma) {
995✔
274
  mTrace("sma:%s, perform delete action, row:%p", pSma->name, pSma);
995✔
275
  taosMemoryFreeClear(pSma->tagsFilter);
995!
276
  taosMemoryFreeClear(pSma->expr);
995!
277
  taosMemoryFreeClear(pSma->sql);
995!
278
  taosMemoryFreeClear(pSma->ast);
995!
279
  return 0;
995✔
280
}
281

282
static int32_t mndSmaActionUpdate(SSdb *pSdb, SSmaObj *pOld, SSmaObj *pNew) {
485✔
283
  mTrace("sma:%s, perform update action, old row:%p new row:%p", pOld->name, pOld, pNew);
485✔
284
  return 0;
485✔
285
}
286

287
SSmaObj *mndAcquireSma(SMnode *pMnode, char *smaName) {
1,159✔
288
  SSdb    *pSdb = pMnode->pSdb;
1,159✔
289
  SSmaObj *pSma = sdbAcquire(pSdb, SDB_SMA, smaName);
1,159✔
290
  if (pSma == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
1,159!
UNCOV
291
    terrno = TSDB_CODE_MND_SMA_NOT_EXIST;
×
292
  }
293
  return pSma;
1,159✔
294
}
295

296
void mndReleaseSma(SMnode *pMnode, SSmaObj *pSma) {
2,597✔
297
  SSdb *pSdb = pMnode->pSdb;
2,597✔
298
  sdbRelease(pSdb, pSma);
2,597✔
299
}
2,597✔
300

301
SDbObj *mndAcquireDbBySma(SMnode *pMnode, const char *db) { return mndAcquireDb(pMnode, db); }
×
302

UNCOV
303
static void *mndBuildVCreateSmaReq(SMnode *pMnode, SVgObj *pVgroup, SSmaObj *pSma, int32_t *pContLen) {
×
UNCOV
304
  SEncoder encoder = {0};
×
UNCOV
305
  int32_t  contLen = 0;
×
UNCOV
306
  SName    name = {0};
×
UNCOV
307
  int32_t  code = tNameFromString(&name, pSma->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
×
UNCOV
308
  if (TSDB_CODE_SUCCESS != code) {
×
309
    return NULL;
×
310
  }
311

UNCOV
312
  SVCreateTSmaReq req = {0};
×
UNCOV
313
  req.version = 0;
×
UNCOV
314
  req.intervalUnit = pSma->intervalUnit;
×
UNCOV
315
  req.slidingUnit = pSma->slidingUnit;
×
316
  //  req.timezoneInt = pSma->timezone;
UNCOV
317
  tstrncpy(req.indexName, (char *)tNameGetTableName(&name), TSDB_INDEX_NAME_LEN);
×
UNCOV
318
  req.exprLen = pSma->exprLen;
×
UNCOV
319
  req.tagsFilterLen = pSma->tagsFilterLen;
×
UNCOV
320
  req.indexUid = pSma->uid;
×
UNCOV
321
  req.tableUid = pSma->stbUid;
×
UNCOV
322
  req.dstVgId = pSma->dstVgId;
×
UNCOV
323
  req.dstTbUid = pSma->dstTbUid;
×
UNCOV
324
  req.interval = pSma->interval;
×
UNCOV
325
  req.offset = pSma->offset;
×
UNCOV
326
  req.sliding = pSma->sliding;
×
UNCOV
327
  req.expr = pSma->expr;
×
UNCOV
328
  req.tagsFilter = pSma->tagsFilter;
×
UNCOV
329
  req.schemaRow = pSma->schemaRow;
×
UNCOV
330
  req.schemaTag = pSma->schemaTag;
×
UNCOV
331
  req.dstTbName = pSma->dstTbName;
×
332

333
  // get length
UNCOV
334
  int32_t ret = 0;
×
UNCOV
335
  tEncodeSize(tEncodeSVCreateTSmaReq, &req, contLen, ret);
×
UNCOV
336
  if (ret < 0) {
×
337
    return NULL;
×
338
  }
UNCOV
339
  contLen += sizeof(SMsgHead);
×
340

UNCOV
341
  SMsgHead *pHead = taosMemoryMalloc(contLen);
×
UNCOV
342
  if (pHead == NULL) {
×
343
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
344
    return NULL;
×
345
  }
346

UNCOV
347
  pHead->contLen = htonl(contLen);
×
UNCOV
348
  pHead->vgId = htonl(pVgroup->vgId);
×
349

UNCOV
350
  void *pBuf = POINTER_SHIFT(pHead, sizeof(SMsgHead));
×
UNCOV
351
  tEncoderInit(&encoder, pBuf, contLen - sizeof(SMsgHead));
×
UNCOV
352
  if (tEncodeSVCreateTSmaReq(&encoder, &req) < 0) {
×
353
    taosMemoryFreeClear(pHead);
×
354
    tEncoderClear(&encoder);
×
355
    return NULL;
×
356
  }
357

UNCOV
358
  tEncoderClear(&encoder);
×
359

UNCOV
360
  *pContLen = contLen;
×
UNCOV
361
  return pHead;
×
362
}
363

364
static void *mndBuildVDropSmaReq(SMnode *pMnode, SVgObj *pVgroup, SSmaObj *pSma, int32_t *pContLen) {
×
365
  SEncoder encoder = {0};
×
366
  int32_t  contLen;
367
  SName    name = {0};
×
368
  int32_t  code = tNameFromString(&name, pSma->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
×
369
  if (TSDB_CODE_SUCCESS != code) {
×
370
    terrno = code;
×
371
    return NULL;
×
372
  }
373

374
  SVDropTSmaReq req = {0};
×
375
  req.indexUid = pSma->uid;
×
376
  tstrncpy(req.indexName, (char *)tNameGetTableName(&name), TSDB_INDEX_NAME_LEN);
×
377

378
  // get length
379
  int32_t ret = 0;
×
380
  tEncodeSize(tEncodeSVDropTSmaReq, &req, contLen, ret);
×
381
  if (ret < 0) {
×
382
    return NULL;
×
383
  }
384

385
  contLen += sizeof(SMsgHead);
×
386

387
  SMsgHead *pHead = taosMemoryMalloc(contLen);
×
388
  if (pHead == NULL) {
×
389
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
390
    return NULL;
×
391
  }
392

393
  pHead->contLen = htonl(contLen);
×
394
  pHead->vgId = htonl(pVgroup->vgId);
×
395

396
  void *pBuf = POINTER_SHIFT(pHead, sizeof(SMsgHead));
×
397
  tEncoderInit(&encoder, pBuf, contLen - sizeof(SMsgHead));
×
398

399
  if (tEncodeSVDropTSmaReq(&encoder, &req) < 0) {
×
400
    taosMemoryFreeClear(pHead);
×
401
    tEncoderClear(&encoder);
×
402
    return NULL;
×
403
  }
404
  tEncoderClear(&encoder);
×
405

406
  *pContLen = contLen;
×
407
  return pHead;
×
408
}
409

410
static int32_t mndSetCreateSmaRedoLogs(SMnode *pMnode, STrans *pTrans, SSmaObj *pSma) {
263✔
411
  int32_t  code = 0;
263✔
412
  SSdbRaw *pRedoRaw = mndSmaActionEncode(pSma);
263✔
413
  if (pRedoRaw == NULL) {
263!
414
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
415
    if (terrno != 0) code = terrno;
×
416
    TAOS_RETURN(code);
×
417
  }
418
  TAOS_CHECK_RETURN(mndTransAppendRedolog(pTrans, pRedoRaw));
263!
419
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING));
263!
420

421
  TAOS_RETURN(code);
263✔
422
}
423

424
static int32_t mndSetCreateSmaUndoLogs(SMnode *pMnode, STrans *pTrans, SSmaObj *pSma) {
263✔
425
  int32_t  code = 0;
263✔
426
  SSdbRaw *pUndoRaw = mndSmaActionEncode(pSma);
263✔
427
  if (!pUndoRaw) {
263!
428
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
429
    if (terrno != 0) code = terrno;
×
430
    TAOS_RETURN(code);
×
431
  }
432
  TAOS_CHECK_RETURN(mndTransAppendUndolog(pTrans, pUndoRaw));
263!
433
  TAOS_CHECK_RETURN(sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED));
263!
434
  TAOS_RETURN(code);
263✔
435
}
436

437
static int32_t mndSetCreateSmaCommitLogs(SMnode *pMnode, STrans *pTrans, SSmaObj *pSma) {
263✔
438
  int32_t  code = 0;
263✔
439
  SSdbRaw *pCommitRaw = mndSmaActionEncode(pSma);
263✔
440
  if (pCommitRaw == NULL) {
263!
441
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
442
    if (terrno != 0) code = terrno;
×
443
    TAOS_RETURN(code);
×
444
  }
445
  TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pCommitRaw));
263!
446
  TAOS_CHECK_RETURN(sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY));
263!
447

448
  TAOS_RETURN(code);
263✔
449
}
450

UNCOV
451
static int32_t mndSetCreateSmaVgroupRedoLogs(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup) {
×
UNCOV
452
  int32_t  code = 0;
×
UNCOV
453
  SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroup);
×
UNCOV
454
  if (pVgRaw == NULL) {
×
455
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
456
    if (terrno != 0) code = terrno;
×
457
    TAOS_RETURN(code);
×
458
  }
UNCOV
459
  TAOS_CHECK_RETURN(mndTransAppendRedolog(pTrans, pVgRaw));
×
UNCOV
460
  TAOS_CHECK_RETURN(sdbSetRawStatus(pVgRaw, SDB_STATUS_UPDATE));
×
UNCOV
461
  TAOS_RETURN(code);
×
462
}
463

UNCOV
464
static int32_t mndSetCreateSmaVgroupCommitLogs(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup) {
×
UNCOV
465
  int32_t  code = 0;
×
UNCOV
466
  SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroup);
×
UNCOV
467
  if (pVgRaw == NULL) {
×
468
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
469
    if (terrno != 0) code = terrno;
×
470
    TAOS_RETURN(code);
×
471
  }
UNCOV
472
  TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pVgRaw));
×
UNCOV
473
  TAOS_CHECK_RETURN(sdbSetRawStatus(pVgRaw, SDB_STATUS_READY));
×
UNCOV
474
  TAOS_RETURN(code);
×
475
}
476

UNCOV
477
static int32_t mndSetUpdateSmaStbCommitLogs(SMnode *pMnode, STrans *pTrans, SStbObj *pStb) {
×
UNCOV
478
  int32_t code = 0;
×
UNCOV
479
  SStbObj stbObj = {0};
×
UNCOV
480
  taosRLockLatch(&pStb->lock);
×
UNCOV
481
  memcpy(&stbObj, pStb, sizeof(SStbObj));
×
UNCOV
482
  taosRUnLockLatch(&pStb->lock);
×
UNCOV
483
  stbObj.numOfColumns = 0;
×
UNCOV
484
  stbObj.pColumns = NULL;
×
UNCOV
485
  stbObj.numOfTags = 0;
×
UNCOV
486
  stbObj.pTags = NULL;
×
UNCOV
487
  stbObj.numOfFuncs = 0;
×
UNCOV
488
  stbObj.pFuncs = NULL;
×
UNCOV
489
  stbObj.updateTime = taosGetTimestampMs();
×
UNCOV
490
  stbObj.lock = 0;
×
UNCOV
491
  stbObj.smaVer++;
×
492

UNCOV
493
  SSdbRaw *pCommitRaw = mndStbActionEncode(&stbObj);
×
UNCOV
494
  if (pCommitRaw == NULL) {
×
495
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
496
    if (terrno != 0) code = terrno;
×
497
    TAOS_RETURN(code);
×
498
  }
UNCOV
499
  TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pCommitRaw));
×
UNCOV
500
  TAOS_CHECK_RETURN(sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY));
×
501

UNCOV
502
  TAOS_RETURN(code);
×
503
}
504

UNCOV
505
static int32_t mndSetCreateSmaVgroupRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
×
506
                                                SSmaObj *pSma) {
UNCOV
507
  int32_t    code = 0;
×
UNCOV
508
  SVnodeGid *pVgid = pVgroup->vnodeGid + 0;
×
UNCOV
509
  SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
×
UNCOV
510
  if (pDnode == NULL) {
×
511
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
512
    if (terrno != 0) code = terrno;
×
513
    TAOS_RETURN(code);
×
514
  }
515

UNCOV
516
  STransAction action = {0};
×
UNCOV
517
  action.epSet = mndGetDnodeEpset(pDnode);
×
UNCOV
518
  mndReleaseDnode(pMnode, pDnode);
×
519

520
  // todo add sma info here
UNCOV
521
  SNode *pAst = NULL;
×
UNCOV
522
  TAOS_CHECK_RETURN(nodesStringToNode(pSma->ast, &pAst));
×
UNCOV
523
  if ((code = qExtractResultSchema(pAst, &pSma->schemaRow.nCols, &pSma->schemaRow.pSchema)) != 0) {
×
524
    nodesDestroyNode(pAst);
×
525
    TAOS_RETURN(code);
×
526
  }
UNCOV
527
  nodesDestroyNode(pAst);
×
UNCOV
528
  pSma->schemaRow.version = 1;
×
529

530
  // TODO: the schemaTag generated by qExtractResultXXX later.
UNCOV
531
  pSma->schemaTag.nCols = 1;
×
UNCOV
532
  pSma->schemaTag.version = 1;
×
UNCOV
533
  pSma->schemaTag.pSchema = taosMemoryCalloc(1, sizeof(SSchema));
×
UNCOV
534
  if (!pSma->schemaTag.pSchema) {
×
535
    TAOS_RETURN(-1);
×
536
  }
UNCOV
537
  pSma->schemaTag.pSchema[0].type = TSDB_DATA_TYPE_BIGINT;
×
UNCOV
538
  pSma->schemaTag.pSchema[0].bytes = TYPE_BYTES[TSDB_DATA_TYPE_BIGINT];
×
UNCOV
539
  pSma->schemaTag.pSchema[0].colId = pSma->schemaRow.nCols + PRIMARYKEY_TIMESTAMP_COL_ID;
×
UNCOV
540
  pSma->schemaTag.pSchema[0].flags = 0;
×
UNCOV
541
  snprintf(pSma->schemaTag.pSchema[0].name, TSDB_COL_NAME_LEN, "groupId");
×
542

UNCOV
543
  int32_t smaContLen = 0;
×
UNCOV
544
  void   *pSmaReq = mndBuildVCreateSmaReq(pMnode, pVgroup, pSma, &smaContLen);
×
UNCOV
545
  if (pSmaReq == NULL) {
×
546
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
547
    if (terrno != 0) code = terrno;
×
548
    TAOS_RETURN(code);
×
549
  }
UNCOV
550
  pVgroup->pTsma = pSmaReq;
×
551

UNCOV
552
  int32_t contLen = 0;
×
UNCOV
553
  void   *pReq = mndBuildCreateVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen);
×
UNCOV
554
  if (pReq == NULL) {
×
555
    taosMemoryFreeClear(pSmaReq);
×
556
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
557
    if (terrno != 0) code = terrno;
×
558
    TAOS_RETURN(code);
×
559
  }
560

UNCOV
561
  action.mTraceId = pTrans->mTraceId;
×
UNCOV
562
  action.pCont = pReq;
×
UNCOV
563
  action.contLen = contLen;
×
UNCOV
564
  action.msgType = TDMT_DND_CREATE_VNODE;
×
UNCOV
565
  action.acceptableCode = TSDB_CODE_VND_ALREADY_EXIST;
×
566

UNCOV
567
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
×
568
    taosMemoryFreeClear(pSmaReq);
×
569
    taosMemoryFree(pReq);
×
570
    TAOS_RETURN(code);
×
571
  }
572

UNCOV
573
  action.pCont = pSmaReq;
×
UNCOV
574
  action.contLen = smaContLen;
×
UNCOV
575
  action.msgType = TDMT_VND_CREATE_SMA;
×
UNCOV
576
  action.acceptableCode = TSDB_CODE_TSMA_ALREADY_EXIST;
×
577

UNCOV
578
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
×
579
    taosMemoryFreeClear(pSmaReq);
×
580
    TAOS_RETURN(code);
×
581
  }
582

UNCOV
583
  TAOS_RETURN(code);
×
584
}
585

UNCOV
586
static void mndDestroySmaObj(SSmaObj *pSmaObj) {
×
UNCOV
587
  if (pSmaObj) {
×
UNCOV
588
    taosMemoryFreeClear(pSmaObj->schemaRow.pSchema);
×
UNCOV
589
    taosMemoryFreeClear(pSmaObj->schemaTag.pSchema);
×
590
  }
UNCOV
591
}
×
592

593
#if 0
594
static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCreate, SDbObj *pDb, SStbObj *pStb,
595
                            const char *streamName) {
596
  int32_t code = 0;
597
  if (pDb->cfg.replications > 1) {
598
    code = TSDB_CODE_MND_INVALID_SMA_OPTION;
599
    mError("sma:%s, failed to create since not support multiple replicas", pCreate->name);
600
    TAOS_RETURN(code);
601
  }
602
  SSmaObj smaObj = {0};
603
  memcpy(smaObj.name, pCreate->name, TSDB_TABLE_FNAME_LEN);
604
  memcpy(smaObj.stb, pStb->name, TSDB_TABLE_FNAME_LEN);
605
  memcpy(smaObj.db, pDb->name, TSDB_DB_FNAME_LEN);
606
  smaObj.createdTime = taosGetTimestampMs();
607
  smaObj.uid = mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN);
608

609
  char resultTbName[TSDB_TABLE_FNAME_LEN + 16] = {0};
610
  snprintf(resultTbName, TSDB_TABLE_FNAME_LEN + 16, "%s_td_tsma_rst_tb", pCreate->name);
611
  memcpy(smaObj.dstTbName, resultTbName, TSDB_TABLE_FNAME_LEN);
612
  smaObj.dstTbUid = mndGenerateUid(smaObj.dstTbName, TSDB_TABLE_FNAME_LEN);
613
  smaObj.stbUid = pStb->uid;
614
  smaObj.dbUid = pStb->dbUid;
615
  smaObj.intervalUnit = pCreate->intervalUnit;
616
  smaObj.slidingUnit = pCreate->slidingUnit;
617
#if 0
618
//  smaObj.timezone = pCreate->timezone;
619
#endif
620
  //  smaObj.timezone = taosGetLocalTimezoneOffset();  // use timezone of server
621
  smaObj.interval = pCreate->interval;
622
  smaObj.offset = pCreate->offset;
623
  smaObj.sliding = pCreate->sliding;
624
  smaObj.exprLen = pCreate->exprLen;
625
  smaObj.tagsFilterLen = pCreate->tagsFilterLen;
626
  smaObj.sqlLen = pCreate->sqlLen;
627
  smaObj.astLen = pCreate->astLen;
628
  if (smaObj.exprLen > 0) {
629
    smaObj.expr = pCreate->expr;
630
  }
631
  if (smaObj.tagsFilterLen > 0) {
632
    smaObj.tagsFilter = pCreate->tagsFilter;
633
  }
634
  if (smaObj.sqlLen > 0) {
635
    smaObj.sql = pCreate->sql;
636
  }
637
  if (smaObj.astLen > 0) {
638
    smaObj.ast = pCreate->ast;
639
  }
640

641
  SStreamObj streamObj = {0};
642
  tstrncpy(streamObj.name, streamName, TSDB_STREAM_FNAME_LEN);
643
  tstrncpy(streamObj.sourceDb, pDb->name, TSDB_DB_FNAME_LEN);
644
  tstrncpy(streamObj.targetDb, streamObj.sourceDb, TSDB_DB_FNAME_LEN);
645
  streamObj.createTime = taosGetTimestampMs();
646
  streamObj.updateTime = streamObj.createTime;
647
  streamObj.uid = mndGenerateUid(streamName, strlen(streamName));
648
  streamObj.sourceDbUid = pDb->uid;
649
  streamObj.targetDbUid = pDb->uid;
650
  streamObj.version = 1;
651
  streamObj.sql = taosStrdup(pCreate->sql);
652
  if (!streamObj.sql) {
653
    return terrno;
654
  }
655
  streamObj.smaId = smaObj.uid;
656
  streamObj.conf.watermark = pCreate->watermark;
657
  streamObj.deleteMark = pCreate->deleteMark;
658
  streamObj.conf.fillHistory = STREAM_FILL_HISTORY_ON;
659
  streamObj.conf.trigger = STREAM_TRIGGER_WINDOW_CLOSE;
660
  streamObj.conf.triggerParam = pCreate->maxDelay;
661
  streamObj.ast = taosStrdup(smaObj.ast);
662
  if (!streamObj.ast) {
663
    taosMemoryFree(streamObj.sql);
664
    return terrno;
665
  }
666
  streamObj.indexForMultiAggBalance = -1;
667

668
  // check the maxDelay
669
  if (streamObj.conf.triggerParam < TSDB_MIN_ROLLUP_MAX_DELAY) {
670
    int64_t msInterval = -1;
671
    int32_t code =
672
        convertTimeFromPrecisionToUnit(pCreate->interval, pDb->cfg.precision, TIME_UNIT_MILLISECOND, &msInterval);
673
    if (TSDB_CODE_SUCCESS != code) {
674
      mError("sma:%s, failed to create since convert time failed: %s", smaObj.name, tstrerror(code));
675
      return code;
676
    }
677
    streamObj.conf.triggerParam = msInterval > TSDB_MIN_ROLLUP_MAX_DELAY ? msInterval : TSDB_MIN_ROLLUP_MAX_DELAY;
678
  }
679
  if (streamObj.conf.triggerParam > TSDB_MAX_ROLLUP_MAX_DELAY) {
680
    streamObj.conf.triggerParam = TSDB_MAX_ROLLUP_MAX_DELAY;
681
  }
682

683
  if ((code = mndAllocSmaVgroup(pMnode, pDb, &streamObj.fixedSinkVg)) != 0) {
684
    mError("sma:%s, failed to create since %s", smaObj.name, tstrerror(code));
685
    TAOS_RETURN(code);
686
  }
687
  smaObj.dstVgId = streamObj.fixedSinkVg.vgId;
688
  streamObj.fixedSinkVgId = smaObj.dstVgId;
689

690
  SNode *pAst = NULL;
691
  if (nodesStringToNode(streamObj.ast, &pAst) < 0) {
692
    code = TSDB_CODE_MND_INVALID_SMA_OPTION;
693
    mError("sma:%s, failed to create since parse ast error", smaObj.name);
694
    TAOS_RETURN(code);
695
  }
696

697
  // extract output schema from ast
698
  if (qExtractResultSchema(pAst, (int32_t *)&streamObj.outputSchema.nCols, &streamObj.outputSchema.pSchema) != 0) {
699
    code = TSDB_CODE_MND_INVALID_SMA_OPTION;
700
    mError("sma:%s, failed to create since extract result schema error", smaObj.name);
701
    TAOS_RETURN(code);
702
  }
703

704
  SQueryPlan  *pPlan = NULL;
705
  SPlanContext cxt = {
706
      .pAstRoot = pAst,
707
      .topicQuery = false,
708
      .streamQuery = true,
709
      .triggerType = streamObj.conf.trigger,
710
      .watermark = streamObj.conf.watermark,
711
      .deleteMark = streamObj.deleteMark,
712
  };
713

714
  if (qCreateQueryPlan(&cxt, &pPlan, NULL) < 0) {
715
    code = TSDB_CODE_MND_INVALID_SMA_OPTION;
716
    mError("sma:%s, failed to create since create query plan error", smaObj.name);
717
    TAOS_RETURN(code);
718
  }
719

720
  // save physcial plan
721
  if (nodesNodeToString((SNode *)pPlan, false, &streamObj.physicalPlan, NULL) != 0) {
722
    code = TSDB_CODE_MND_INVALID_SMA_OPTION;
723
    mError("sma:%s, failed to create since save physcial plan error", smaObj.name);
724
    TAOS_RETURN(code);
725
  }
726

727
  if (pAst != NULL) nodesDestroyNode(pAst);
728
  nodesDestroyNode((SNode *)pPlan);
729

730
  code = -1;
731
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq, "create-sma");
732
  if (pTrans == NULL) {
733
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
734
    if (terrno != 0) code = terrno;
735
    goto _OVER;
736
  }
737
  mndTransSetDbName(pTrans, pDb->name, NULL);
738
  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER);
739

740
  mndTransSetSerial(pTrans);
741
  mInfo("trans:%d, used to create sma:%s stream:%s", pTrans->id, pCreate->name, streamObj.name);
742
  TAOS_CHECK_GOTO(mndAddNewVgPrepareAction(pMnode, pTrans, &streamObj.fixedSinkVg), NULL, _OVER);
743
  TAOS_CHECK_GOTO(mndSetCreateSmaRedoLogs(pMnode, pTrans, &smaObj), NULL, _OVER);
744
  TAOS_CHECK_GOTO(mndSetCreateSmaVgroupRedoLogs(pMnode, pTrans, &streamObj.fixedSinkVg), NULL, _OVER);
745
  TAOS_CHECK_GOTO(mndSetCreateSmaCommitLogs(pMnode, pTrans, &smaObj), NULL, _OVER);
746
  TAOS_CHECK_GOTO(mndSetCreateSmaVgroupCommitLogs(pMnode, pTrans, &streamObj.fixedSinkVg), NULL, _OVER);
747
  TAOS_CHECK_GOTO(mndSetUpdateSmaStbCommitLogs(pMnode, pTrans, pStb), NULL, _OVER);
748
  TAOS_CHECK_GOTO(mndSetCreateSmaVgroupRedoActions(pMnode, pTrans, pDb, &streamObj.fixedSinkVg, &smaObj), NULL, _OVER);
749
  TAOS_CHECK_GOTO(mndScheduleStream(pMnode, &streamObj, 1685959190000, NULL), NULL, _OVER);
750
  TAOS_CHECK_GOTO(mndPersistStream(pTrans, &streamObj), NULL, _OVER);
751
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
752

753
  mInfo("sma:%s, uid:%" PRIi64 " create on stb:%" PRIi64 ", dstSuid:%" PRIi64 " dstTb:%s dstVg:%d", pCreate->name,
754
        smaObj.uid, smaObj.stbUid, smaObj.dstTbUid, smaObj.dstTbName, smaObj.dstVgId);
755

756
  code = 0;
757

758
_OVER:
759
  tFreeStreamObj(&streamObj);
760
  mndDestroySmaObj(&smaObj);
761
  mndTransDrop(pTrans);
762
  TAOS_RETURN(code);
763
}
764
#endif
765

766
static int32_t mndCheckCreateSmaReq(SMCreateSmaReq *pCreate) {
278✔
767
  int32_t code = TSDB_CODE_MND_INVALID_SMA_OPTION;
278✔
768
  if (pCreate->name[0] == 0) TAOS_RETURN(code);
278!
769
  if (pCreate->stb[0] == 0) TAOS_RETURN(code);
278!
770
  if (pCreate->igExists < 0 || pCreate->igExists > 1) TAOS_RETURN(code);
278!
771
  if (pCreate->intervalUnit < 0) TAOS_RETURN(code);
278!
772
  if (pCreate->slidingUnit < 0) TAOS_RETURN(code);
278!
773
  if (pCreate->timezone < 0) TAOS_RETURN(code);
278!
774
  if (pCreate->interval < 0) TAOS_RETURN(code);
278!
775
  if (pCreate->offset < 0) TAOS_RETURN(code);
278!
776
  if (pCreate->sliding < 0) TAOS_RETURN(code);
278!
777
  if (pCreate->exprLen < 0) TAOS_RETURN(code);
278!
778
  if (pCreate->tagsFilterLen < 0) TAOS_RETURN(code);
278!
779
  if (pCreate->sqlLen < 0) TAOS_RETURN(code);
278!
780
  if (pCreate->astLen < 0) TAOS_RETURN(code);
278!
781
  if (pCreate->exprLen != 0 && strlen(pCreate->expr) + 1 != pCreate->exprLen) TAOS_RETURN(code);
278!
782
  if (pCreate->tagsFilterLen != 0 && strlen(pCreate->tagsFilter) + 1 != pCreate->tagsFilterLen) TAOS_RETURN(code);
278!
783
  if (pCreate->sqlLen != 0 && strlen(pCreate->sql) + 1 != pCreate->sqlLen) TAOS_RETURN(code);
278!
784
  if (pCreate->astLen != 0 && strlen(pCreate->ast) + 1 != pCreate->astLen) TAOS_RETURN(code);
278!
785

786
  SName smaName = {0};
278✔
787
  if (tNameFromString(&smaName, pCreate->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE) < 0) return -1;
278!
788
  if (*(char *)tNameGetTableName(&smaName) == 0) return -1;
278!
789

790
  code = 0;
278✔
791
  TAOS_RETURN(code);
278✔
792
}
793

UNCOV
794
static int32_t mndGetStreamNameFromSmaName(char *streamName, char *smaName) {
×
795
  SName   n;
UNCOV
796
  int32_t code = tNameFromString(&n, smaName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
×
UNCOV
797
  if (TSDB_CODE_SUCCESS != code) {
×
798
    return code;
×
799
  }
UNCOV
800
  snprintf(streamName, TSDB_TABLE_FNAME_LEN, "%d.%s", n.acctId, n.tname);
×
UNCOV
801
  return TSDB_CODE_SUCCESS;
×
802
}
803

804
#if 0
805
static int32_t mndProcessCreateSmaReq(SRpcMsg *pReq) {
806
  SMnode        *pMnode = pReq->info.node;
807
  int32_t        code = -1;
808
  SStbObj       *pStb = NULL;
809
  SSmaObj       *pSma = NULL;
810
  SStreamObj    *pStream = NULL;
811
  SDbObj        *pDb = NULL;
812
  SMCreateSmaReq createReq = {0};
813

814
  int64_t mTraceId = TRACE_GET_ROOTID(&pReq->info.traceId);
815

816
  TAOS_CHECK_GOTO(tDeserializeSMCreateSmaReq(pReq->pCont, pReq->contLen, &createReq), NULL, _OVER);
817

818
#ifdef WINDOWS
819
  terrno = TSDB_CODE_MND_INVALID_PLATFORM;
820
  goto _OVER;
821
#endif
822
  mInfo("sma:%s, start to create", createReq.name);
823
  TAOS_CHECK_GOTO(mndCheckCreateSmaReq(&createReq), NULL, _OVER);
824

825
  pStb = mndAcquireStb(pMnode, createReq.stb);
826
  if (pStb == NULL) {
827
    mError("sma:%s, failed to create since stb:%s not exist", createReq.name, createReq.stb);
828
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
829
    if (terrno != 0) code = terrno;
830
    goto _OVER;
831
  }
832

833
  char streamName[TSDB_TABLE_FNAME_LEN] = {0};
834
  code = mndGetStreamNameFromSmaName(streamName, createReq.name);
835
  if (TSDB_CODE_SUCCESS != code) {
836
    goto _OVER;
837
  }
838

839
  code = mndAcquireStream(pMnode, streamName, &pStream);
840
  if (pStream != NULL || code == 0) {
841
    mError("sma:%s, failed to create since stream:%s already exist", createReq.name, streamName);
842
    code = TSDB_CODE_MND_STREAM_ALREADY_EXIST;
843
    goto _OVER;
844
  }
845
  SSIdx idx = {0};
846
  if ((code = mndAcquireGlobalIdx(pMnode, createReq.name, SDB_SMA, &idx)) == 0) {
847
    pSma = idx.pIdx;
848
  } else {
849
    goto _OVER;
850
  }
851

852
  if (pSma != NULL) {
853
    if (createReq.igExists) {
854
      mInfo("sma:%s, already exist in sma:%s, ignore exist is set", createReq.name, pSma->name);
855
      code = 0;
856
      goto _OVER;
857
    } else {
858
      code = TSDB_CODE_MND_SMA_ALREADY_EXIST;
859
      goto _OVER;
860
    }
861
  }
862

863
  SName name = {0};
864
  code = tNameFromString(&name, createReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
865
  if (TSDB_CODE_SUCCESS != code) {
866
    goto _OVER;
867
  }
868
  char db[TSDB_TABLE_FNAME_LEN] = {0};
869
  (void)tNameGetFullDbName(&name, db);
870

871
  pDb = mndAcquireDb(pMnode, db);
872
  if (pDb == NULL) {
873
    code = TSDB_CODE_MND_DB_NOT_SELECTED;
874
    goto _OVER;
875
  }
876

877
  TAOS_CHECK_GOTO(mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb), NULL, _OVER);
878

879
  code = mndCreateSma(pMnode, pReq, &createReq, pDb, pStb, streamName);
880
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
881

882
_OVER:
883
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
884
    mError("sma:%s, failed to create since %s", createReq.name, tstrerror(code));
885
  }
886

887
  mndReleaseStb(pMnode, pStb);
888
  mndReleaseSma(pMnode, pSma);
889
  mndReleaseStream(pMnode, pStream);
890
  mndReleaseDb(pMnode, pDb);
891
  tFreeSMCreateSmaReq(&createReq);
892

893
  TAOS_RETURN(code);
894
}
895
#endif
896

897
static int32_t mndSetDropSmaRedoLogs(SMnode *pMnode, STrans *pTrans, SSmaObj *pSma) {
214✔
898
  int32_t  code = 0;
214✔
899
  SSdbRaw *pRedoRaw = mndSmaActionEncode(pSma);
214✔
900
  if (pRedoRaw == NULL) {
214!
901
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
902
    if (terrno != 0) code = terrno;
×
903
    return -1;
×
904
  }
905
  TAOS_CHECK_RETURN(mndTransAppendRedolog(pTrans, pRedoRaw));
214!
906
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING));
214!
907

908
  return 0;
214✔
909
}
910

911
static int32_t mndSetDropSmaCommitLogs(SMnode *pMnode, STrans *pTrans, SSmaObj *pSma) {
239✔
912
  int32_t  code = 0;
239✔
913
  SSdbRaw *pCommitRaw = mndSmaActionEncode(pSma);
239✔
914
  if (pCommitRaw == NULL) {
239!
915
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
916
    if (terrno != 0) code = terrno;
×
917
    return -1;
×
918
  }
919
  TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pCommitRaw));
239!
920
  TAOS_CHECK_RETURN(sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED));
239!
921

922
  return 0;
239✔
923
}
924

UNCOV
925
static int32_t mndSetDropSmaVgroupRedoLogs(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup) {
×
UNCOV
926
  int32_t  code = 0;
×
UNCOV
927
  SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroup);
×
UNCOV
928
  if (pVgRaw == NULL) {
×
929
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
930
    if (terrno != 0) code = terrno;
×
931
    return -1;
×
932
  }
UNCOV
933
  TAOS_CHECK_RETURN(mndTransAppendRedolog(pTrans, pVgRaw));
×
UNCOV
934
  TAOS_CHECK_RETURN(sdbSetRawStatus(pVgRaw, SDB_STATUS_DROPPING));
×
935

UNCOV
936
  return 0;
×
937
}
938

UNCOV
939
static int32_t mndSetDropSmaVgroupCommitLogs(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup) {
×
UNCOV
940
  int32_t  code = 0;
×
UNCOV
941
  SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroup);
×
UNCOV
942
  if (pVgRaw == NULL) {
×
943
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
944
    if (terrno != 0) code = terrno;
×
945
    return -1;
×
946
  }
UNCOV
947
  TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pVgRaw));
×
UNCOV
948
  TAOS_CHECK_RETURN(sdbSetRawStatus(pVgRaw, SDB_STATUS_DROPPED));
×
949

UNCOV
950
  return 0;
×
951
}
952

UNCOV
953
static int32_t mndSetDropSmaVgroupRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) {
×
UNCOV
954
  int32_t    code = 0;
×
UNCOV
955
  SVnodeGid *pVgid = pVgroup->vnodeGid + 0;
×
UNCOV
956
  SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
×
UNCOV
957
  if (pDnode == NULL) {
×
958
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
959
    if (terrno != 0) code = terrno;
×
960
    TAOS_RETURN(code);
×
961
  }
962

UNCOV
963
  STransAction action = {0};
×
UNCOV
964
  action.epSet = mndGetDnodeEpset(pDnode);
×
UNCOV
965
  mndReleaseDnode(pMnode, pDnode);
×
966

UNCOV
967
  int32_t contLen = 0;
×
UNCOV
968
  void   *pReq = mndBuildDropVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen);
×
UNCOV
969
  if (pReq == NULL) {
×
970
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
971
    if (terrno != 0) code = terrno;
×
972
    TAOS_RETURN(code);
×
973
  }
974

UNCOV
975
  action.pCont = pReq;
×
UNCOV
976
  action.contLen = contLen;
×
UNCOV
977
  action.msgType = TDMT_DND_DROP_VNODE;
×
UNCOV
978
  action.acceptableCode = TSDB_CODE_VND_NOT_EXIST;
×
979

UNCOV
980
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
×
981
    taosMemoryFree(pReq);
×
982
    TAOS_RETURN(code);
×
983
  }
984

UNCOV
985
  TAOS_RETURN(code);
×
986
}
987

UNCOV
988
static int32_t mndDropSma(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SSmaObj *pSma) {
×
UNCOV
989
  int32_t     code = -1;
×
UNCOV
990
  SVgObj     *pVgroup = NULL;
×
UNCOV
991
  SStbObj    *pStb = NULL;
×
UNCOV
992
  STrans     *pTrans = NULL;
×
UNCOV
993
  SStreamObj *pStream = NULL;
×
994

UNCOV
995
  pVgroup = mndAcquireVgroup(pMnode, pSma->dstVgId);
×
UNCOV
996
  if (pVgroup == NULL) {
×
997
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
998
    if (terrno != 0) code = terrno;
×
999
    goto _OVER;
×
1000
  }
1001

UNCOV
1002
  pStb = mndAcquireStb(pMnode, pSma->stb);
×
UNCOV
1003
  if (pStb == NULL) {
×
1004
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1005
    if (terrno != 0) code = terrno;
×
1006
    goto _OVER;
×
1007
  }
1008

UNCOV
1009
  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq, "drop-sma");
×
UNCOV
1010
  if (pTrans == NULL) {
×
1011
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1012
    if (terrno != 0) code = terrno;
×
1013
    goto _OVER;
×
1014
  }
1015

UNCOV
1016
  mInfo("trans:%d, used to drop sma:%s", pTrans->id, pSma->name);
×
UNCOV
1017
  mndTransSetDbName(pTrans, pDb->name, NULL);
×
UNCOV
1018
  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER);
×
1019

UNCOV
1020
  mndTransSetSerial(pTrans);
×
1021

UNCOV
1022
  char streamName[TSDB_TABLE_FNAME_LEN] = {0};
×
UNCOV
1023
  code = mndGetStreamNameFromSmaName(streamName, pSma->name);
×
UNCOV
1024
  if (TSDB_CODE_SUCCESS != code) {
×
1025
    goto _OVER;
×
1026
  }
1027

UNCOV
1028
  code = mndAcquireStream(pMnode, streamName, &pStream);
×
UNCOV
1029
  if (pStream == NULL || pStream->smaId != pSma->uid || code != 0) {
×
1030
    sdbRelease(pMnode->pSdb, pStream);
×
1031
    goto _OVER;
×
1032
  } else {
UNCOV
1033
    if ((code = mndStreamSetDropAction(pMnode, pTrans, pStream)) < 0) {
×
1034
      mError("stream:%s, failed to drop task since %s", pStream->name, tstrerror(code));
×
1035
      sdbRelease(pMnode->pSdb, pStream);
×
1036
      goto _OVER;
×
1037
    }
1038

1039
    // drop stream
UNCOV
1040
    if ((code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_DROPPED)) < 0) {
×
1041
      mError("stream:%s, failed to drop log since %s", pStream->name, tstrerror(code));
×
1042
      sdbRelease(pMnode->pSdb, pStream);
×
1043
      goto _OVER;
×
1044
    }
1045
  }
UNCOV
1046
  TAOS_CHECK_GOTO(mndSetDropSmaRedoLogs(pMnode, pTrans, pSma), NULL, _OVER);
×
UNCOV
1047
  TAOS_CHECK_GOTO(mndSetDropSmaVgroupRedoLogs(pMnode, pTrans, pVgroup), NULL, _OVER);
×
UNCOV
1048
  TAOS_CHECK_GOTO(mndSetDropSmaCommitLogs(pMnode, pTrans, pSma), NULL, _OVER);
×
UNCOV
1049
  TAOS_CHECK_GOTO(mndSetDropSmaVgroupCommitLogs(pMnode, pTrans, pVgroup), NULL, _OVER);
×
UNCOV
1050
  TAOS_CHECK_GOTO(mndSetUpdateSmaStbCommitLogs(pMnode, pTrans, pStb), NULL, _OVER);
×
UNCOV
1051
  TAOS_CHECK_GOTO(mndSetDropSmaVgroupRedoActions(pMnode, pTrans, pDb, pVgroup), NULL, _OVER);
×
UNCOV
1052
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
×
1053

UNCOV
1054
  code = 0;
×
1055

UNCOV
1056
_OVER:
×
UNCOV
1057
  mndTransDrop(pTrans);
×
UNCOV
1058
  mndReleaseStream(pMnode, pStream);
×
UNCOV
1059
  mndReleaseVgroup(pMnode, pVgroup);
×
UNCOV
1060
  mndReleaseStb(pMnode, pStb);
×
UNCOV
1061
  TAOS_RETURN(code);
×
1062
}
1063

1064
int32_t mndDropSmasByStb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
1,043✔
1065
  SSdb    *pSdb = pMnode->pSdb;
1,043✔
1066
  SSmaObj *pSma = NULL;
1,043✔
1067
  void    *pIter = NULL;
1,043✔
1068
  SVgObj  *pVgroup = NULL;
1,043✔
1069
  int32_t  code = -1;
1,043✔
1070

1071
  while (1) {
1072
    pIter = sdbFetch(pSdb, SDB_SMA, pIter, (void **)&pSma);
1,185✔
1073
    if (pIter == NULL) break;
1,185✔
1074

1075
    if (pSma->stbUid == pStb->uid) {
142!
1076
      mndTransSetSerial(pTrans);
×
1077
      pVgroup = mndAcquireVgroup(pMnode, pSma->dstVgId);
×
1078
      if (pVgroup == NULL) {
×
1079
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1080
        if (terrno != 0) code = terrno;
×
1081
        goto _OVER;
×
1082
      }
1083

1084
      char streamName[TSDB_TABLE_FNAME_LEN] = {0};
×
1085
      code = mndGetStreamNameFromSmaName(streamName, pSma->name);
×
1086
      if (TSDB_CODE_SUCCESS != code) {
×
1087
        goto _OVER;
×
1088
      }
1089

1090
      SStreamObj *pStream = NULL;
×
1091
      code = mndAcquireStream(pMnode, streamName, &pStream);
×
1092
      if ((pStream != NULL && pStream->smaId == pSma->uid) || code != 0) {
×
1093
        if ((code = mndStreamSetDropAction(pMnode, pTrans, pStream)) < 0) {
×
1094
          mError("stream:%s, failed to drop task since %s", pStream->name, terrstr());
×
1095
          mndReleaseStream(pMnode, pStream);
×
1096
          goto _OVER;
×
1097
        }
1098

1099
        if ((code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_DROPPED)) < 0) {
×
1100
          mndReleaseStream(pMnode, pStream);
×
1101
          goto _OVER;
×
1102
        }
1103

1104
        mndReleaseStream(pMnode, pStream);
×
1105
      }
1106

1107
      TAOS_CHECK_GOTO(mndSetDropSmaVgroupCommitLogs(pMnode, pTrans, pVgroup), NULL, _OVER);
×
1108
      TAOS_CHECK_GOTO(mndSetDropSmaVgroupRedoActions(pMnode, pTrans, pDb, pVgroup), NULL, _OVER);
×
1109
      TAOS_CHECK_GOTO(mndSetDropSmaCommitLogs(pMnode, pTrans, pSma), NULL, _OVER);
×
1110
      mndReleaseVgroup(pMnode, pVgroup);
×
1111
      pVgroup = NULL;
×
1112
    }
1113

1114
    sdbRelease(pSdb, pSma);
142✔
1115
  }
1116

1117
  code = 0;
1,043✔
1118

1119
_OVER:
1,043✔
1120
  sdbCancelFetch(pSdb, pIter);
1,043✔
1121
  sdbRelease(pSdb, pSma);
1,043✔
1122
  mndReleaseVgroup(pMnode, pVgroup);
1,043✔
1123
  TAOS_RETURN(code);
1,043✔
1124
}
1125

1126
int32_t mndDropSmasByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
1,922✔
1127
  int32_t code = 0;
1,922✔
1128
  SSdb   *pSdb = pMnode->pSdb;
1,922✔
1129
  void   *pIter = NULL;
1,922✔
1130

1131
  while (1) {
35✔
1132
    SSmaObj *pSma = NULL;
1,957✔
1133
    pIter = sdbFetch(pSdb, SDB_SMA, pIter, (void **)&pSma);
1,957✔
1134
    if (pIter == NULL) break;
1,957✔
1135

1136
    if (pSma->dbUid == pDb->uid) {
35✔
1137
      if ((code = mndSetDropSmaCommitLogs(pMnode, pTrans, pSma)) != 0) {
25!
1138
        sdbRelease(pSdb, pSma);
×
1139
        sdbCancelFetch(pSdb, pSma);
×
1140
        TAOS_RETURN(code);
×
1141
      }
1142
    }
1143

1144
    sdbRelease(pSdb, pSma);
35✔
1145
  }
1146

1147
  TAOS_RETURN(code);
1,922✔
1148
}
1149

1150
static int32_t mndProcessDropSmaReq(SRpcMsg *pReq) {
1,068✔
1151
  SMnode      *pMnode = pReq->info.node;
1,068✔
1152
  int32_t      code = -1;
1,068✔
1153
  SDbObj      *pDb = NULL;
1,068✔
1154
  SSmaObj     *pSma = NULL;
1,068✔
1155
  SMDropSmaReq dropReq = {0};
1,068✔
1156

1157
  TAOS_CHECK_GOTO(tDeserializeSMDropSmaReq(pReq->pCont, pReq->contLen, &dropReq), NULL, _OVER);
1,068!
1158

1159
  mInfo("sma:%s, start to drop", dropReq.name);
1,068!
1160

1161
  SSIdx idx = {0};
1,068✔
1162
  if ((code = mndAcquireGlobalIdx(pMnode, dropReq.name, SDB_SMA, &idx)) == 0) {
1,068✔
1163
    pSma = idx.pIdx;
3✔
1164
  } else {
1165
    goto _OVER;
1,065✔
1166
  }
1167
  if (pSma == NULL) {
3!
1168
    if (dropReq.igNotExists) {
3!
1169
      mInfo("sma:%s, not exist, ignore not exist is set", dropReq.name);
×
1170
      code = 0;
×
1171
      goto _OVER;
×
1172
    } else {
1173
      code = TSDB_CODE_MND_SMA_NOT_EXIST;
3✔
1174
      goto _OVER;
3✔
1175
    }
1176
  }
1177

UNCOV
1178
  SName name = {0};
×
UNCOV
1179
  code = tNameFromString(&name, dropReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
×
UNCOV
1180
  if (TSDB_CODE_SUCCESS != code) {
×
1181
    goto _OVER;
×
1182
  }
UNCOV
1183
  char db[TSDB_TABLE_FNAME_LEN] = {0};
×
UNCOV
1184
  (void)tNameGetFullDbName(&name, db);
×
1185

UNCOV
1186
  pDb = mndAcquireDb(pMnode, db);
×
UNCOV
1187
  if (pDb == NULL) {
×
1188
    code = TSDB_CODE_MND_DB_NOT_SELECTED;
×
1189
    goto _OVER;
×
1190
  }
1191

UNCOV
1192
  TAOS_CHECK_GOTO(mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb), NULL, _OVER);
×
1193

UNCOV
1194
  code = mndDropSma(pMnode, pReq, pDb, pSma);
×
UNCOV
1195
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
×
1196

1197
_OVER:
×
1198
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
1,068!
1199
    mError("sma:%s, failed to drop since %s", dropReq.name, tstrerror(code));
1,068!
1200
  }
1201

1202
  mndReleaseSma(pMnode, pSma);
1,068✔
1203
  mndReleaseDb(pMnode, pDb);
1,068✔
1204
  TAOS_RETURN(code);
1,068✔
1205
}
1206

1207
static int32_t mndGetSma(SMnode *pMnode, SUserIndexReq *indexReq, SUserIndexRsp *rsp, bool *exist) {
×
1208
  int32_t  code = -1;
×
1209
  SSmaObj *pSma = NULL;
×
1210

1211
  SSIdx idx = {0};
×
1212
  if (0 == mndAcquireGlobalIdx(pMnode, indexReq->indexFName, SDB_SMA, &idx)) {
×
1213
    pSma = idx.pIdx;
×
1214
  } else {
1215
    *exist = false;
×
1216
    return 0;
×
1217
  }
1218

1219
  if (pSma == NULL) {
×
1220
    *exist = false;
×
1221
    return 0;
×
1222
  }
1223

1224
  memcpy(rsp->dbFName, pSma->db, sizeof(pSma->db));
×
1225
  memcpy(rsp->tblFName, pSma->stb, sizeof(pSma->stb));
×
1226
  tstrncpy(rsp->indexType, TSDB_INDEX_TYPE_SMA, TSDB_INDEX_TYPE_LEN);
×
1227

1228
  SNodeList *pList = NULL;
×
1229
  int32_t    extOffset = 0;
×
1230
  code = nodesStringToList(pSma->expr, &pList);
×
1231
  if (0 == code) {
×
1232
    SNode *node = NULL;
×
1233
    FOREACH(node, pList) {
×
1234
      SFunctionNode *pFunc = (SFunctionNode *)node;
×
1235
      extOffset += tsnprintf(rsp->indexExts + extOffset, sizeof(rsp->indexExts) - extOffset - 1, "%s%s",
×
1236
                             (extOffset ? "," : ""), pFunc->functionName);
×
1237
    }
1238

1239
    *exist = true;
×
1240
  }
1241

1242
  mndReleaseSma(pMnode, pSma);
×
1243
  return code;
×
1244
}
1245

1246
int32_t mndGetTableSma(SMnode *pMnode, char *tbFName, STableIndexRsp *rsp, bool *exist) {
9,250✔
1247
  int32_t         code = 0;
9,250✔
1248
  SSmaObj        *pSma = NULL;
9,250✔
1249
  SSdb           *pSdb = pMnode->pSdb;
9,250✔
1250
  void           *pIter = NULL;
9,250✔
1251
  STableIndexInfo info;
1252

1253
  SStbObj *pStb = mndAcquireStb(pMnode, tbFName);
9,250✔
1254
  if (NULL == pStb) {
9,250✔
1255
    *exist = false;
5,867✔
1256
    return TSDB_CODE_SUCCESS;
5,867✔
1257
  }
1258

1259
  tstrncpy(rsp->dbFName, pStb->db, TSDB_DB_FNAME_LEN);
3,383✔
1260
  tstrncpy(rsp->tbName, pStb->name + strlen(pStb->db) + 1, TSDB_TABLE_NAME_LEN);
3,383✔
1261
  rsp->suid = pStb->uid;
3,383✔
1262
  rsp->version = pStb->smaVer;
3,383✔
1263
  mndReleaseStb(pMnode, pStb);
3,383✔
1264

1265
  while (1) {
2,177✔
1266
    pIter = sdbFetch(pSdb, SDB_SMA, pIter, (void **)&pSma);
5,560✔
1267
    if (pIter == NULL) break;
5,560!
1268

1269
    if (pSma->stb[0] != tbFName[0] || strcmp(pSma->stb, tbFName)) {
5,560!
1270
      sdbRelease(pSdb, pSma);
2,177✔
1271
      continue;
2,177✔
1272
    }
1273

1274
    info.intervalUnit = pSma->intervalUnit;
3,383✔
1275
    info.slidingUnit = pSma->slidingUnit;
3,383✔
1276
    info.interval = pSma->interval;
3,383✔
1277
    info.offset = pSma->offset;
3,383✔
1278
    info.sliding = pSma->sliding;
3,383✔
1279
    info.dstTbUid = pSma->dstTbUid;
3,383✔
1280
    info.dstVgId = pSma->dstVgId;
3,383✔
1281

1282
    SVgObj *pVg = mndAcquireVgroup(pMnode, pSma->dstVgId);
3,383✔
1283
    if (pVg == NULL) {
3,383!
1284
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
3,383✔
1285
      if (terrno != 0) code = terrno;
3,383!
1286
      sdbRelease(pSdb, pSma);
3,383✔
1287
      sdbCancelFetch(pSdb, pIter);
3,383✔
1288
      return code;
3,383✔
1289
    }
UNCOV
1290
    info.epSet = mndGetVgroupEpset(pMnode, pVg);
×
1291

UNCOV
1292
    info.expr = taosMemoryMalloc(pSma->exprLen + 1);
×
UNCOV
1293
    if (info.expr == NULL) {
×
1294
      code = terrno;
×
1295
      sdbRelease(pSdb, pSma);
×
1296
      sdbCancelFetch(pSdb, pIter);
×
1297
      return code;
×
1298
    }
1299

UNCOV
1300
    memcpy(info.expr, pSma->expr, pSma->exprLen);
×
UNCOV
1301
    info.expr[pSma->exprLen] = 0;
×
1302

UNCOV
1303
    if (NULL == taosArrayPush(rsp->pIndex, &info)) {
×
1304
      code = terrno;
×
1305
      taosMemoryFree(info.expr);
×
1306
      sdbRelease(pSdb, pSma);
×
1307
      sdbCancelFetch(pSdb, pIter);
×
1308
      return code;
×
1309
    }
1310

UNCOV
1311
    rsp->indexSize += sizeof(info) + pSma->exprLen + 1;
×
UNCOV
1312
    *exist = true;
×
1313

UNCOV
1314
    sdbRelease(pSdb, pSma);
×
1315
  }
1316

UNCOV
1317
  TAOS_RETURN(code);
×
1318
}
1319

1320
static int32_t mndProcessGetSmaReq(SRpcMsg *pReq) {
×
1321
  SUserIndexReq indexReq = {0};
×
1322
  SMnode       *pMnode = pReq->info.node;
×
1323
  int32_t       code = -1;
×
1324
  SUserIndexRsp rsp = {0};
×
1325
  bool          exist = false;
×
1326

1327
  TAOS_CHECK_GOTO(tDeserializeSUserIndexReq(pReq->pCont, pReq->contLen, &indexReq), NULL, _OVER);
×
1328

1329
  code = mndGetSma(pMnode, &indexReq, &rsp, &exist);
×
1330
  if (code) {
×
1331
    goto _OVER;
×
1332
  }
1333

1334
  if (!exist) {
×
1335
    // TODO GET INDEX FROM FULLTEXT
1336
    code = TSDB_CODE_MND_DB_INDEX_NOT_EXIST;
×
1337
  } else {
1338
    int32_t contLen = tSerializeSUserIndexRsp(NULL, 0, &rsp);
×
1339
    void   *pRsp = rpcMallocCont(contLen);
×
1340
    if (pRsp == NULL) {
×
1341
      code = terrno;
×
1342
      goto _OVER;
×
1343
    }
1344

1345
    contLen = tSerializeSUserIndexRsp(pRsp, contLen, &rsp);
×
1346
    if (contLen < 0) {
×
1347
      code = terrno;
×
1348
      goto _OVER;
×
1349
    }
1350

1351
    pReq->info.rsp = pRsp;
×
1352
    pReq->info.rspLen = contLen;
×
1353

1354
    code = 0;
×
1355
  }
1356

1357
_OVER:
×
1358
  if (code != 0) {
×
1359
    mError("failed to get index %s since %s", indexReq.indexFName, tstrerror(code));
×
1360
  }
1361

1362
  TAOS_RETURN(code);
×
1363
}
1364

1365
static int32_t mndProcessGetTbSmaReq(SRpcMsg *pReq) {
9,250✔
1366
  STableIndexReq indexReq = {0};
9,250✔
1367
  SMnode        *pMnode = pReq->info.node;
9,250✔
1368
  int32_t        code = -1;
9,250✔
1369
  STableIndexRsp rsp = {0};
9,250✔
1370
  bool           exist = false;
9,250✔
1371

1372
  TAOS_CHECK_GOTO(tDeserializeSTableIndexReq(pReq->pCont, pReq->contLen, &indexReq), NULL, _OVER);
9,250!
1373

1374
  rsp.pIndex = taosArrayInit(10, sizeof(STableIndexInfo));
9,250✔
1375
  if (NULL == rsp.pIndex) {
9,250!
1376
    code = terrno;
×
1377
    goto _OVER;
×
1378
  }
1379

1380
  code = mndGetTableSma(pMnode, indexReq.tbFName, &rsp, &exist);
9,250✔
1381
  if (code) {
9,250✔
1382
    goto _OVER;
3,383✔
1383
  }
1384

1385
  if (!exist) {
5,867!
1386
    code = TSDB_CODE_MND_DB_INDEX_NOT_EXIST;
5,867✔
1387
  } else {
UNCOV
1388
    int32_t contLen = tSerializeSTableIndexRsp(NULL, 0, &rsp);
×
UNCOV
1389
    void   *pRsp = rpcMallocCont(contLen);
×
UNCOV
1390
    if (pRsp == NULL) {
×
1391
      code = terrno;
×
1392
      goto _OVER;
×
1393
    }
1394

UNCOV
1395
    contLen = tSerializeSTableIndexRsp(pRsp, contLen, &rsp);
×
UNCOV
1396
    if (contLen < 0) {
×
1397
      code = terrno;
×
1398
      goto _OVER;
×
1399
    }
1400

UNCOV
1401
    pReq->info.rsp = pRsp;
×
UNCOV
1402
    pReq->info.rspLen = contLen;
×
1403

UNCOV
1404
    code = 0;
×
1405
  }
1406

1407
_OVER:
9,250✔
1408
  if (code != 0) {
9,250!
1409
    mError("failed to get table index %s since %s", indexReq.tbFName, tstrerror(code));
9,250!
1410
  }
1411

1412
  tFreeSerializeSTableIndexRsp(&rsp);
9,250✔
1413
  TAOS_RETURN(code);
9,250✔
1414
}
1415

1416
static int32_t mndRetrieveSma(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
6,751✔
1417
  SMnode  *pMnode = pReq->info.node;
6,751✔
1418
  SSdb    *pSdb = pMnode->pSdb;
6,751✔
1419
  int32_t  numOfRows = 0;
6,751✔
1420
  SSmaObj *pSma = NULL;
6,751✔
1421
  int32_t  cols = 0;
6,751✔
1422
  int32_t  code = 0;
6,751✔
1423

1424
  SDbObj *pDb = NULL;
6,751✔
1425
  if (strlen(pShow->db) > 0) {
6,751✔
1426
    pDb = mndAcquireDb(pMnode, pShow->db);
103✔
1427
    if (pDb == NULL) return 0;
103!
1428
  }
1429
  SSmaAndTagIter *pIter = pShow->pIter;
6,751✔
1430
  while (numOfRows < rows) {
6,751!
1431
    pIter->pSmaIter = sdbFetch(pSdb, SDB_SMA, pIter->pSmaIter, (void **)&pSma);
6,753✔
1432
    if (pIter->pSmaIter == NULL) break;
6,757!
1433

UNCOV
1434
    if (NULL != pDb && pSma->dbUid != pDb->uid) {
×
1435
      sdbRelease(pSdb, pSma);
×
1436
      continue;
×
1437
    }
1438

UNCOV
1439
    cols = 0;
×
1440

UNCOV
1441
    SName smaName = {0};
×
UNCOV
1442
    SName stbName = {0};
×
UNCOV
1443
    char  n2[TSDB_DB_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
×
UNCOV
1444
    char  n3[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
×
UNCOV
1445
    code = tNameFromString(&smaName, pSma->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
×
UNCOV
1446
    char n1[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
×
UNCOV
1447
    if (TSDB_CODE_SUCCESS == code) {
×
UNCOV
1448
      STR_TO_VARSTR(n1, (char *)tNameGetTableName(&smaName));
×
UNCOV
1449
      STR_TO_VARSTR(n2, (char *)mndGetDbStr(pSma->db));
×
UNCOV
1450
      code = tNameFromString(&stbName, pSma->stb, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
×
1451
    }
UNCOV
1452
    SColumnInfoData *pColInfo = NULL;
×
UNCOV
1453
    if (TSDB_CODE_SUCCESS == code) {
×
UNCOV
1454
      STR_TO_VARSTR(n3, (char *)tNameGetTableName(&stbName));
×
1455

UNCOV
1456
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
UNCOV
1457
      code = colDataSetVal(pColInfo, numOfRows, (const char *)n1, false);
×
1458
    }
UNCOV
1459
    if (TSDB_CODE_SUCCESS == code) {
×
UNCOV
1460
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
UNCOV
1461
      code = colDataSetVal(pColInfo, numOfRows, (const char *)n2, false);
×
1462
    }
UNCOV
1463
    if (TSDB_CODE_SUCCESS == code) {
×
UNCOV
1464
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
UNCOV
1465
      code = colDataSetVal(pColInfo, numOfRows, (const char *)n3, false);
×
1466
    }
UNCOV
1467
    if (TSDB_CODE_SUCCESS == code) {
×
UNCOV
1468
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
UNCOV
1469
      code = colDataSetVal(pColInfo, numOfRows, (const char *)&pSma->dstVgId, false);
×
1470
    }
UNCOV
1471
    if (TSDB_CODE_SUCCESS == code) {
×
UNCOV
1472
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
UNCOV
1473
      code = colDataSetVal(pColInfo, numOfRows, (const char *)&pSma->createdTime, false);
×
1474
    }
1475

UNCOV
1476
    char col[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
×
UNCOV
1477
    STR_TO_VARSTR(col, (char *)"");
×
1478

UNCOV
1479
    if (TSDB_CODE_SUCCESS == code) {
×
UNCOV
1480
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
UNCOV
1481
      code = colDataSetVal(pColInfo, numOfRows, (const char *)col, false);
×
1482
    }
1483

UNCOV
1484
    if (TSDB_CODE_SUCCESS == code) {
×
UNCOV
1485
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
1486

UNCOV
1487
      char tag[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
×
UNCOV
1488
      STR_TO_VARSTR(tag, (char *)"sma_index");
×
UNCOV
1489
      code = colDataSetVal(pColInfo, numOfRows, (const char *)tag, false);
×
1490
    }
1491

UNCOV
1492
    numOfRows++;
×
UNCOV
1493
    sdbRelease(pSdb, pSma);
×
UNCOV
1494
    if (TSDB_CODE_SUCCESS != code) {
×
1495
      sdbCancelFetch(pMnode->pSdb, pIter->pSmaIter);
×
1496
      numOfRows = -1;
×
1497
      break;
×
1498
    }
1499
  }
1500

1501
  mndReleaseDb(pMnode, pDb);
6,755✔
1502
  pShow->numOfRows += numOfRows;
6,751✔
1503
  return numOfRows;
6,751✔
1504
}
1505

1506
// sma and tag index comm func
1507
static int32_t mndProcessDropIdxReq(SRpcMsg *pReq) {
1,068✔
1508
  int ret = mndProcessDropSmaReq(pReq);
1,068✔
1509
  if (ret == TSDB_CODE_MND_TAG_INDEX_ALREADY_EXIST || ret == TSDB_CODE_MND_SMA_NOT_EXIST) {
1,068!
1510
    terrno = 0;
1,068✔
1511
    ret = mndProcessDropTagIdxReq(pReq);
1,068✔
1512
  }
1513
  return ret;
1,068✔
1514
}
1515

1516
static int32_t mndRetrieveIdx(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
6,751✔
1517
  if (pShow->pIter == NULL) {
6,751!
1518
    pShow->pIter = taosMemoryCalloc(1, sizeof(SSmaAndTagIter));
6,753!
1519
  }
1520
  if (!pShow->pIter) {
6,755!
1521
    return terrno;
×
1522
  }
1523
  int32_t read = mndRetrieveSma(pReq, pShow, pBlock, rows);
6,755✔
1524
  if (read < rows) {
6,754!
1525
    read += mndRetrieveTagIdx(pReq, pShow, pBlock, rows - read);
6,754✔
1526
  }
1527
  // no more to read
1528
  if (read < rows) {
6,758!
1529
    taosMemoryFree(pShow->pIter);
6,758!
1530
    pShow->pIter = NULL;
6,758✔
1531
  }
1532
  return read;
6,758✔
1533
}
1534
static void mndCancelRetrieveIdx(SMnode *pMnode, void *pIter) {
×
1535
  SSmaAndTagIter *p = pIter;
×
1536
  if (p != NULL) {
×
1537
    SSdb *pSdb = pMnode->pSdb;
×
1538
    sdbCancelFetchByType(pSdb, p->pSmaIter, SDB_SMA);
×
1539
    sdbCancelFetchByType(pSdb, p->pIdxIter, SDB_IDX);
×
1540
  }
1541
  taosMemoryFree(p);
×
1542
}
×
1543

1544
static void initSMAObj(SCreateTSMACxt *pCxt) {
263✔
1545
  memcpy(pCxt->pSma->name, pCxt->pCreateSmaReq->name, TSDB_TABLE_FNAME_LEN);
263✔
1546
  memcpy(pCxt->pSma->stb, pCxt->pCreateSmaReq->stb, TSDB_TABLE_FNAME_LEN);
263✔
1547
  memcpy(pCxt->pSma->db, pCxt->pDb->name, TSDB_DB_FNAME_LEN);
263✔
1548
  if (pCxt->pBaseSma) memcpy(pCxt->pSma->baseSmaName, pCxt->pBaseSma->name, TSDB_TABLE_FNAME_LEN);
263✔
1549
  pCxt->pSma->createdTime = taosGetTimestampMs();
263✔
1550
  pCxt->pSma->uid = mndGenerateUid(pCxt->pCreateSmaReq->name, TSDB_TABLE_FNAME_LEN);
263✔
1551

1552
  memcpy(pCxt->pSma->dstTbName, pCxt->targetStbFullName, TSDB_TABLE_FNAME_LEN);
263✔
1553
  pCxt->pSma->dstTbUid = 0;  // not used
263✔
1554
  pCxt->pSma->stbUid = pCxt->pSrcStb ? pCxt->pSrcStb->uid : pCxt->pCreateSmaReq->normSourceTbUid;
263✔
1555
  pCxt->pSma->dbUid = pCxt->pDb->uid;
263✔
1556
  pCxt->pSma->interval = pCxt->pCreateSmaReq->interval;
263✔
1557
  pCxt->pSma->intervalUnit = pCxt->pCreateSmaReq->intervalUnit;
263✔
1558
  //  pCxt->pSma->timezone = taosGetLocalTimezoneOffset();
1559
  pCxt->pSma->version = 1;
263✔
1560

1561
  pCxt->pSma->exprLen = pCxt->pCreateSmaReq->exprLen;
263✔
1562
  pCxt->pSma->sqlLen = pCxt->pCreateSmaReq->sqlLen;
263✔
1563
  pCxt->pSma->astLen = pCxt->pCreateSmaReq->astLen;
263✔
1564
  pCxt->pSma->expr = pCxt->pCreateSmaReq->expr;
263✔
1565
  pCxt->pSma->sql = pCxt->pCreateSmaReq->sql;
263✔
1566
  pCxt->pSma->ast = pCxt->pCreateSmaReq->ast;
263✔
1567
}
263✔
1568

1569
static int32_t mndCreateTSMABuildCreateStreamReq(SCreateTSMACxt *pCxt) {
263✔
1570
  tstrncpy(pCxt->pCreateStreamReq->name, pCxt->streamName, TSDB_STREAM_FNAME_LEN);
263✔
1571
  tstrncpy(pCxt->pCreateStreamReq->sourceDB, pCxt->pDb->name, TSDB_DB_FNAME_LEN);
263✔
1572
  tstrncpy(pCxt->pCreateStreamReq->targetStbFullName, pCxt->targetStbFullName, TSDB_TABLE_FNAME_LEN);
263✔
1573
  pCxt->pCreateStreamReq->igExists = false;
263✔
1574
  pCxt->pCreateStreamReq->triggerType = STREAM_TRIGGER_MAX_DELAY;
263✔
1575
  pCxt->pCreateStreamReq->igExpired = false;
263✔
1576
  pCxt->pCreateStreamReq->fillHistory = STREAM_FILL_HISTORY_ON;
263✔
1577
  pCxt->pCreateStreamReq->maxDelay = 10000;
263✔
1578
  pCxt->pCreateStreamReq->watermark = 0;
263✔
1579
  pCxt->pCreateStreamReq->numOfTags = pCxt->pSrcStb ? pCxt->pSrcStb->numOfTags + 1 : 1;
263✔
1580
  pCxt->pCreateStreamReq->checkpointFreq = 0;
263✔
1581
  pCxt->pCreateStreamReq->createStb = 1;
263✔
1582
  pCxt->pCreateStreamReq->targetStbUid = 0;
263✔
1583
  pCxt->pCreateStreamReq->fillNullCols = NULL;
263✔
1584
  pCxt->pCreateStreamReq->igUpdate = 0;
263✔
1585
  pCxt->pCreateStreamReq->deleteMark = pCxt->pCreateSmaReq->deleteMark;
263✔
1586
  pCxt->pCreateStreamReq->lastTs = pCxt->pCreateSmaReq->lastTs;
263✔
1587
  pCxt->pCreateStreamReq->smaId = pCxt->pSma->uid;
263✔
1588
  pCxt->pCreateStreamReq->ast = taosStrdup(pCxt->pCreateSmaReq->ast);
263!
1589
  if (!pCxt->pCreateStreamReq->ast) {
263!
1590
    return terrno;
×
1591
  }
1592
  pCxt->pCreateStreamReq->sql = taosStrdup(pCxt->pCreateSmaReq->sql);
263!
1593
  if (!pCxt->pCreateStreamReq->sql) {
263!
1594
    return terrno;
×
1595
  }
1596

1597
  // construct tags
1598
  pCxt->pCreateStreamReq->pTags = taosArrayInit(pCxt->pCreateStreamReq->numOfTags, sizeof(SField));
263✔
1599
  if (!pCxt->pCreateStreamReq->pTags) {
263!
1600
    return terrno;
×
1601
  }
1602
  SFieldWithOptions f = {0};
263✔
1603
  int32_t           code = 0;
263✔
1604
  if (pCxt->pSrcStb) {
263✔
1605
    for (int32_t idx = 0; idx < pCxt->pCreateStreamReq->numOfTags - 1; ++idx) {
1,556✔
1606
      SSchema *pSchema = &pCxt->pSrcStb->pTags[idx];
1,310✔
1607
      f.bytes = pSchema->bytes;
1,310✔
1608
      f.type = pSchema->type;
1,310✔
1609
      f.flags = pSchema->flags;
1,310✔
1610
      tstrncpy(f.name, pSchema->name, TSDB_COL_NAME_LEN);
1,310✔
1611
      if (NULL == taosArrayPush(pCxt->pCreateStreamReq->pTags, &f)) {
2,620!
1612
        code = terrno;
×
1613
        break;
×
1614
      }
1615
    }
1616
  }
1617

1618
  if (TSDB_CODE_SUCCESS == code) {
263!
1619
    f.bytes = TSDB_TABLE_FNAME_LEN - 1 + VARSTR_HEADER_SIZE;
263✔
1620
    f.flags = COL_SMA_ON;
263✔
1621
    f.type = TSDB_DATA_TYPE_BINARY;
263✔
1622
    tstrncpy(f.name, "tbname", strlen("tbname") + 1);
263✔
1623
    if (NULL == taosArrayPush(pCxt->pCreateStreamReq->pTags, &f)) {
526!
1624
      code = terrno;
×
1625
    }
1626
  }
1627

1628
  if (TSDB_CODE_SUCCESS == code) {
263!
1629
    // construct output cols
1630
    SNode *pNode;
1631
    FOREACH(pNode, pCxt->pProjects) {
42,736!
1632
      SExprNode *pExprNode = (SExprNode *)pNode;
42,473✔
1633
      f.bytes = pExprNode->resType.bytes;
42,473✔
1634
      f.type = pExprNode->resType.type;
42,473✔
1635
      f.flags = COL_SMA_ON;
42,473✔
1636
      tstrncpy(f.name, pExprNode->userAlias, TSDB_COL_NAME_LEN);
42,473✔
1637
      if (IS_DECIMAL_TYPE(f.type)) {
42,473!
1638
        f.typeMod = decimalCalcTypeMod(pExprNode->resType.precision, pExprNode->resType.scale);
8✔
1639
        f.flags |= COL_HAS_TYPE_MOD;
8✔
1640
      }
1641
      if (NULL == taosArrayPush(pCxt->pCreateStreamReq->pCols, &f)) {
84,946!
UNCOV
1642
        code = terrno;
×
UNCOV
1643
        break;
×
1644
      }
1645
    }
1646
  }
1647
  return code;
263✔
1648
}
1649

1650
static int32_t mndCreateTSMABuildDropStreamReq(SCreateTSMACxt *pCxt) {
477✔
1651
  tstrncpy(pCxt->pDropStreamReq->name, pCxt->streamName, TSDB_STREAM_FNAME_LEN);
477✔
1652
  pCxt->pDropStreamReq->igNotExists = false;
477✔
1653
  pCxt->pDropStreamReq->sql = taosStrdup(pCxt->pDropSmaReq->name);
477!
1654
  if (!pCxt->pDropStreamReq->sql) {
477!
UNCOV
1655
    return terrno;
×
1656
  }
1657
  pCxt->pDropStreamReq->sqlLen = strlen(pCxt->pDropStreamReq->sql);
477✔
1658
  return TSDB_CODE_SUCCESS;
477✔
1659
}
1660

1661
static int32_t mndSetUpdateDbTsmaVersionPrepareLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pOld, SDbObj *pNew) {
477✔
1662
  int32_t  code = 0;
477✔
1663
  SSdbRaw *pRedoRaw = mndDbActionEncode(pOld);
477✔
1664
  if (pRedoRaw == NULL) {
477!
UNCOV
1665
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1666
    if (terrno != 0) code = terrno;
×
1667
    TAOS_RETURN(code);
×
1668
  }
1669
  if ((code = mndTransAppendPrepareLog(pTrans, pRedoRaw)) != 0) {
477!
UNCOV
1670
    sdbFreeRaw(pRedoRaw);
×
UNCOV
1671
    TAOS_RETURN(code);
×
1672
  }
1673

1674
  TAOS_RETURN(sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY));
477✔
1675
}
1676

1677
static int32_t mndSetUpdateDbTsmaVersionCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pOld, SDbObj *pNew) {
477✔
1678
  int32_t  code = 0;
477✔
1679
  SSdbRaw *pCommitRaw = mndDbActionEncode(pNew);
477✔
1680
  if (pCommitRaw == NULL) {
477!
UNCOV
1681
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1682
    if (terrno != 0) code = terrno;
×
1683
    TAOS_RETURN(code);
×
1684
  }
1685
  if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
477!
UNCOV
1686
    sdbFreeRaw(pCommitRaw);
×
UNCOV
1687
    TAOS_RETURN(code);
×
1688
  }
1689

1690
  TAOS_RETURN(sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY));
477✔
1691
}
1692

1693
static int32_t mndCreateTSMATxnPrepare(SCreateTSMACxt *pCxt) {
263✔
1694
  int32_t      code = -1;
263✔
1695
  STransAction createStreamRedoAction = {0};
263✔
1696
  STransAction createStreamUndoAction = {0};
263✔
1697
  STransAction dropStbUndoAction = {0};
263✔
1698
  SMDropStbReq dropStbReq = {0};
263✔
1699
  STrans *pTrans = mndTransCreate(pCxt->pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_TSMA, pCxt->pRpcReq, "create-tsma");
263✔
1700
  if (!pTrans) {
263!
UNCOV
1701
    code = terrno;
×
UNCOV
1702
    goto _OVER;
×
1703
  }
1704
  mndTransSetDbName(pTrans, pCxt->pDb->name, NULL);
263✔
1705
  TAOS_CHECK_GOTO(mndTransCheckConflict(pCxt->pMnode, pTrans), NULL, _OVER);
263!
1706

1707
  mndTransSetSerial(pTrans);
263✔
1708
  mInfo("trans:%d, used to create tsma:%s stream:%s", pTrans->id, pCxt->pCreateSmaReq->name,
263!
1709
        pCxt->pCreateStreamReq->name);
1710

1711
  mndGetMnodeEpSet(pCxt->pMnode, &createStreamRedoAction.epSet);
263✔
1712
  createStreamRedoAction.acceptableCode = TSDB_CODE_MND_STREAM_ALREADY_EXIST;
263✔
1713
  createStreamRedoAction.msgType = TDMT_STREAM_CREATE;
263✔
1714
  createStreamRedoAction.contLen = tSerializeSCMCreateStreamReq(0, 0, pCxt->pCreateStreamReq);
263✔
1715
  createStreamRedoAction.pCont = taosMemoryCalloc(1, createStreamRedoAction.contLen);
263!
1716
  if (!createStreamRedoAction.pCont) {
263!
UNCOV
1717
    code = terrno;
×
UNCOV
1718
    goto _OVER;
×
1719
  }
1720
  if (createStreamRedoAction.contLen != tSerializeSCMCreateStreamReq(createStreamRedoAction.pCont,
263!
1721
                                                                     createStreamRedoAction.contLen,
1722
                                                                     pCxt->pCreateStreamReq)) {
263✔
UNCOV
1723
    mError("sma: %s, failed to create due to create stream req encode failure", pCxt->pCreateSmaReq->name);
×
UNCOV
1724
    code = TSDB_CODE_INVALID_MSG;
×
UNCOV
1725
    goto _OVER;
×
1726
  }
1727

1728
  createStreamUndoAction.epSet = createStreamRedoAction.epSet;
263✔
1729
  createStreamUndoAction.acceptableCode = TSDB_CODE_MND_STREAM_NOT_EXIST;
263✔
1730
  createStreamUndoAction.msgType = TDMT_STREAM_DROP;
263✔
1731
  createStreamUndoAction.contLen = tSerializeSMDropStreamReq(0, 0, pCxt->pDropStreamReq);
263✔
1732
  createStreamUndoAction.pCont = taosMemoryCalloc(1, createStreamUndoAction.contLen);
263!
1733
  if (!createStreamUndoAction.pCont) {
263!
UNCOV
1734
    code = terrno;
×
1735
    goto _OVER;
×
1736
  }
1737
  if (createStreamUndoAction.contLen !=
263!
1738
      tSerializeSMDropStreamReq(createStreamUndoAction.pCont, createStreamUndoAction.contLen, pCxt->pDropStreamReq)) {
263✔
UNCOV
1739
    mError("sma: %s, failed to create due to drop stream req encode failure", pCxt->pCreateSmaReq->name);
×
UNCOV
1740
    code = TSDB_CODE_INVALID_MSG;
×
UNCOV
1741
    goto _OVER;
×
1742
  }
1743

1744
  dropStbReq.igNotExists = true;
263✔
1745
  tstrncpy(dropStbReq.name, pCxt->targetStbFullName, TSDB_TABLE_FNAME_LEN);
263✔
1746
  dropStbUndoAction.epSet = createStreamRedoAction.epSet;
263✔
1747
  dropStbUndoAction.acceptableCode = TSDB_CODE_MND_STB_NOT_EXIST;
263✔
1748
  dropStbUndoAction.retryCode = TSDB_CODE_MND_STREAM_MUST_BE_DELETED;
263✔
1749
  dropStbUndoAction.msgType = TDMT_MND_STB_DROP;
263✔
1750
  dropStbUndoAction.contLen = tSerializeSMDropStbReq(0, 0, &dropStbReq);
263✔
1751
  dropStbUndoAction.pCont = taosMemoryCalloc(1, dropStbUndoAction.contLen);
263!
1752
  if (!dropStbUndoAction.pCont) {
263!
UNCOV
1753
    code = terrno;
×
1754
    goto _OVER;
×
1755
  }
1756
  if (dropStbUndoAction.contLen !=
263!
1757
      tSerializeSMDropStbReq(dropStbUndoAction.pCont, dropStbUndoAction.contLen, &dropStbReq)) {
263✔
UNCOV
1758
    mError("sma: %s, failed to create due to drop stb req encode failure", pCxt->pCreateSmaReq->name);
×
UNCOV
1759
    code = TSDB_CODE_INVALID_MSG;
×
UNCOV
1760
    goto _OVER;
×
1761
  }
1762

1763
  SDbObj newDb = {0};
263✔
1764
  memcpy(&newDb, pCxt->pDb, sizeof(SDbObj));
263✔
1765
  newDb.tsmaVersion++;
263✔
1766
  TAOS_CHECK_GOTO(mndSetCreateSmaRedoLogs(pCxt->pMnode, pTrans, pCxt->pSma), NULL, _OVER);
263!
1767
  TAOS_CHECK_GOTO(mndSetCreateSmaUndoLogs(pCxt->pMnode, pTrans, pCxt->pSma), NULL, _OVER);
263!
1768
  TAOS_CHECK_GOTO(mndSetCreateSmaCommitLogs(pCxt->pMnode, pTrans, pCxt->pSma), NULL, _OVER);
263!
1769
  TAOS_CHECK_GOTO(mndTransAppendRedoAction(pTrans, &createStreamRedoAction), NULL, _OVER);
263!
1770
  TAOS_CHECK_GOTO(mndTransAppendUndoAction(pTrans, &createStreamUndoAction), NULL, _OVER);
263!
1771
  TAOS_CHECK_GOTO(mndTransAppendUndoAction(pTrans, &dropStbUndoAction), NULL, _OVER);
263!
1772
  TAOS_CHECK_GOTO(mndSetUpdateDbTsmaVersionPrepareLogs(pCxt->pMnode, pTrans, pCxt->pDb, &newDb), NULL, _OVER);
263!
1773
  TAOS_CHECK_GOTO(mndSetUpdateDbTsmaVersionCommitLogs(pCxt->pMnode, pTrans, pCxt->pDb, &newDb), NULL, _OVER);
263!
1774
  TAOS_CHECK_GOTO(mndTransPrepare(pCxt->pMnode, pTrans), NULL, _OVER);
263!
1775

1776
  code = TSDB_CODE_SUCCESS;
263✔
1777

1778
_OVER:
263✔
1779
  mndTransDrop(pTrans);
263✔
1780
  TAOS_RETURN(code);
263✔
1781
}
1782

1783
static int32_t mndCreateTSMA(SCreateTSMACxt *pCxt) {
263✔
1784
  int32_t            code = 0;
263✔
1785
  SSmaObj            sma = {0};
263✔
1786
  SCMCreateStreamReq createStreamReq = {0};
263✔
1787
  SMDropStreamReq    dropStreamReq = {0};
263✔
1788

1789
  pCxt->pSma = &sma;
263✔
1790
  initSMAObj(pCxt);
263✔
1791

1792
  SNodeList *pProjects = NULL;
263✔
1793
  code = nodesStringToList(pCxt->pCreateSmaReq->expr, &pProjects);
263✔
1794
  if (TSDB_CODE_SUCCESS != code) {
263!
UNCOV
1795
    goto _OVER;
×
1796
  }
1797
  pCxt->pProjects = pProjects;
263✔
1798

1799
  pCxt->pCreateStreamReq = &createStreamReq;
263✔
1800
  if (pCxt->pCreateSmaReq->pVgroupVerList) {
263✔
1801
    pCxt->pCreateStreamReq->pVgroupVerList = taosArrayDup(pCxt->pCreateSmaReq->pVgroupVerList, NULL);
234✔
1802
    if (!pCxt->pCreateStreamReq->pVgroupVerList) {
234!
UNCOV
1803
      code = terrno;
×
UNCOV
1804
      goto _OVER;
×
1805
    }
1806
  }
1807
  if (LIST_LENGTH(pProjects) > 0) {
263!
1808
    createStreamReq.pCols = taosArrayInit(LIST_LENGTH(pProjects), sizeof(SFieldWithOptions));
263!
1809
    if (!createStreamReq.pCols) {
263!
UNCOV
1810
      code = terrno;
×
UNCOV
1811
      goto _OVER;
×
1812
    }
1813
  }
1814
  pCxt->pDropStreamReq = &dropStreamReq;
263✔
1815
  code = mndCreateTSMABuildCreateStreamReq(pCxt);
263✔
1816
  if (TSDB_CODE_SUCCESS != code) {
263!
1817
    goto _OVER;
×
1818
  }
1819
  code = mndCreateTSMABuildDropStreamReq(pCxt);
263✔
1820
  if (TSDB_CODE_SUCCESS != code) {
263!
1821
    goto _OVER;
×
1822
  }
1823

1824
  if (TSDB_CODE_SUCCESS != (code = mndCreateTSMATxnPrepare(pCxt))) {
263!
UNCOV
1825
    goto _OVER;
×
1826
  } else {
1827
    mInfo("sma:%s, uid:%" PRIi64 " create on stb:%" PRIi64 " dstTb:%s dstVg:%d", pCxt->pCreateSmaReq->name, sma.uid,
263!
1828
          sma.stbUid, sma.dstTbName, sma.dstVgId);
1829
    code = 0;
263✔
1830
  }
1831

1832
_OVER:
263✔
1833
  tFreeSCMCreateStreamReq(pCxt->pCreateStreamReq);
263✔
1834
  if (pCxt->pDropStreamReq) tFreeMDropStreamReq(pCxt->pDropStreamReq);
263!
1835
  pCxt->pCreateStreamReq = NULL;
263✔
1836
  if (pProjects) nodesDestroyList(pProjects);
263!
1837
  pCxt->pProjects = NULL;
263✔
1838
  TAOS_RETURN(code);
263✔
1839
}
1840

1841
static int32_t mndTSMAGenerateOutputName(const char *tsmaName, char *streamName, char *targetStbName) {
497✔
1842
  SName   smaName;
1843
  int32_t code = tNameFromString(&smaName, tsmaName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
497✔
1844
  if (TSDB_CODE_SUCCESS != code) {
497!
UNCOV
1845
    return code;
×
1846
  }
1847
  snprintf(streamName, TSDB_TABLE_FNAME_LEN, "%d.%s", smaName.acctId, smaName.tname);
497✔
1848
  snprintf(targetStbName, TSDB_TABLE_FNAME_LEN, "%s" TSMA_RES_STB_POSTFIX, tsmaName);
497✔
1849
  return TSDB_CODE_SUCCESS;
497✔
1850
}
1851

1852
static int32_t mndProcessCreateTSMAReq(SRpcMsg *pReq) {
278✔
1853
#ifdef WINDOWS
1854
  TAOS_RETURN(TSDB_CODE_MND_INVALID_PLATFORM);
1855
#endif
1856
  SMnode        *pMnode = pReq->info.node;
278✔
1857
  int32_t        code = -1;
278✔
1858
  SDbObj        *pDb = NULL;
278✔
1859
  SStbObj       *pStb = NULL;
278✔
1860
  SSmaObj       *pSma = NULL;
278✔
1861
  SSmaObj       *pBaseTsma = NULL;
278✔
1862
  SStreamObj    *pStream = NULL;
278✔
1863
  int64_t        mTraceId = TRACE_GET_ROOTID(&pReq->info.traceId);
278✔
1864
  SMCreateSmaReq createReq = {0};
278✔
1865

1866
  if (sdbGetSize(pMnode->pSdb, SDB_SMA) >= tsMaxTsmaNum) {
278!
UNCOV
1867
    code = TSDB_CODE_MND_MAX_TSMA_NUM_EXCEEDED;
×
1868
    goto _OVER;
×
1869
  }
1870

1871
  if (tDeserializeSMCreateSmaReq(pReq->pCont, pReq->contLen, &createReq) != 0) {
278!
UNCOV
1872
    code = TSDB_CODE_INVALID_MSG;
×
UNCOV
1873
    goto _OVER;
×
1874
  }
1875

1876
  mInfo("start to create tsma: %s", createReq.name);
278!
1877
  if ((code = mndCheckCreateSmaReq(&createReq)) != 0) goto _OVER;
278!
1878

1879
  if (createReq.normSourceTbUid == 0) {
278✔
1880
    pStb = mndAcquireStb(pMnode, createReq.stb);
261✔
1881
    if (!pStb && !createReq.recursiveTsma) {
261!
UNCOV
1882
      mError("tsma:%s, failed to create since stb:%s not exist", createReq.name, createReq.stb);
×
UNCOV
1883
      code = TSDB_CODE_MND_STB_NOT_EXIST;
×
UNCOV
1884
      goto _OVER;
×
1885
    }
1886
  }
1887

1888
  char streamName[TSDB_TABLE_FNAME_LEN] = {0};
278✔
1889
  char streamTargetStbFullName[TSDB_TABLE_FNAME_LEN] = {0};
278✔
1890
  code = mndTSMAGenerateOutputName(createReq.name, streamName, streamTargetStbFullName);
278✔
1891
  if (TSDB_CODE_SUCCESS != code) {
278!
UNCOV
1892
    mInfo("tsma:%s, faield to generate name", createReq.name);
×
UNCOV
1893
    goto _OVER;
×
1894
  }
1895

1896
  pSma = sdbAcquire(pMnode->pSdb, SDB_SMA, createReq.name);
278✔
1897
  if (pSma && createReq.igExists) {
278!
UNCOV
1898
    mInfo("tsma:%s, already exists in sma:%s, ignore exist is set", createReq.name, pSma->name);
×
UNCOV
1899
    code = 0;
×
UNCOV
1900
    goto _OVER;
×
1901
  }
1902

1903
  if (pSma) {
278✔
1904
    code = TSDB_CODE_MND_SMA_ALREADY_EXIST;
10✔
1905
    goto _OVER;
10✔
1906
  }
1907

1908
  SStbObj *pTargetStb = mndAcquireStb(pMnode, streamTargetStbFullName);
268✔
1909
  if (pTargetStb) {
268!
UNCOV
1910
    code = TSDB_CODE_TDB_STB_ALREADY_EXIST;
×
UNCOV
1911
    mError("tsma: %s, failed to create since output stable already exists: %s", createReq.name,
×
1912
           streamTargetStbFullName);
UNCOV
1913
    goto _OVER;
×
1914
  }
1915

1916
  code = mndAcquireStream(pMnode, streamName, &pStream);
268✔
1917
  if (pStream != NULL || code != TSDB_CODE_MND_STREAM_NOT_EXIST) {
268!
1918
    mError("tsma:%s, failed to create since stream:%s already exist", createReq.name, streamName);
5!
1919
    code = TSDB_CODE_MND_SMA_ALREADY_EXIST;
5✔
1920
    goto _OVER;
5✔
1921
  }
1922

1923
  SName name = {0};
263✔
1924
  code = tNameFromString(&name, createReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
263✔
1925
  if (TSDB_CODE_SUCCESS != code) {
263!
UNCOV
1926
    goto _OVER;
×
1927
  }
1928
  char db[TSDB_TABLE_FNAME_LEN] = {0};
263✔
1929
  (void)tNameGetFullDbName(&name, db);
263✔
1930

1931
  pDb = mndAcquireDb(pMnode, db);
263✔
1932
  if (pDb == NULL) {
263!
UNCOV
1933
    code = TSDB_CODE_MND_DB_NOT_SELECTED;
×
UNCOV
1934
    goto _OVER;
×
1935
  }
1936

1937
  TAOS_CHECK_GOTO(mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb), NULL, _OVER);
263!
1938

1939
  if (createReq.recursiveTsma) {
263✔
1940
    pBaseTsma = sdbAcquire(pMnode->pSdb, SDB_SMA, createReq.baseTsmaName);
85✔
1941
    if (!pBaseTsma) {
85!
UNCOV
1942
      mError("base tsma: %s not found when creating recursive tsma", createReq.baseTsmaName);
×
1943
      code = TSDB_CODE_MND_SMA_NOT_EXIST;
×
UNCOV
1944
      goto _OVER;
×
1945
    }
1946
    if (!pStb) {
85!
UNCOV
1947
      createReq.normSourceTbUid = pBaseTsma->stbUid;
×
1948
    }
1949
  }
1950

1951
  SCreateTSMACxt cxt = {
263✔
1952
      .pMnode = pMnode,
1953
      .pCreateSmaReq = &createReq,
1954
      .pCreateStreamReq = NULL,
1955
      .streamName = streamName,
1956
      .targetStbFullName = streamTargetStbFullName,
1957
      .pDb = pDb,
1958
      .pRpcReq = pReq,
1959
      .pSma = NULL,
1960
      .pBaseSma = pBaseTsma,
1961
      .pSrcStb = pStb,
1962
  };
1963

1964
  code = mndCreateTSMA(&cxt);
263✔
1965
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
263!
1966

UNCOV
1967
_OVER:
×
1968
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
278!
1969
    mError("tsma:%s, failed to create since %s", createReq.name, tstrerror(code));
15!
1970
  }
1971

1972
  if (pStb) mndReleaseStb(pMnode, pStb);
278✔
1973
  if (pBaseTsma) mndReleaseSma(pMnode, pBaseTsma);
278✔
1974
  mndReleaseSma(pMnode, pSma);
278✔
1975
  mndReleaseStream(pMnode, pStream);
278✔
1976
  mndReleaseDb(pMnode, pDb);
278✔
1977
  tFreeSMCreateSmaReq(&createReq);
278✔
1978

1979
  TAOS_RETURN(code);
278✔
1980
}
1981

1982
static int32_t mndDropTSMA(SCreateTSMACxt *pCxt) {
214✔
1983
  int32_t      code = -1;
214✔
1984
  STransAction dropStreamRedoAction = {0};
214✔
1985
  STrans      *pTrans = mndTransCreate(pCxt->pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_TSMA, pCxt->pRpcReq, "drop-tsma");
214✔
1986
  if (!pTrans) {
214!
UNCOV
1987
    code = terrno;
×
UNCOV
1988
    goto _OVER;
×
1989
  }
1990
  SMDropStreamReq dropStreamReq = {0};
214✔
1991
  pCxt->pDropStreamReq = &dropStreamReq;
214✔
1992
  code = mndCreateTSMABuildDropStreamReq(pCxt);
214✔
1993
  if (TSDB_CODE_SUCCESS != code) {
214!
UNCOV
1994
    goto _OVER;
×
1995
  }
1996
  mndTransSetDbName(pTrans, pCxt->pDb->name, NULL);
214✔
1997
  if (mndTransCheckConflict(pCxt->pMnode, pTrans) != 0) goto _OVER;
214!
1998
  mndTransSetSerial(pTrans);
214✔
1999
  mndGetMnodeEpSet(pCxt->pMnode, &dropStreamRedoAction.epSet);
214✔
2000
  dropStreamRedoAction.acceptableCode = TSDB_CODE_MND_STREAM_NOT_EXIST;
214✔
2001
  dropStreamRedoAction.msgType = TDMT_STREAM_DROP;
214✔
2002
  dropStreamRedoAction.contLen = tSerializeSMDropStreamReq(0, 0, pCxt->pDropStreamReq);
214✔
2003
  dropStreamRedoAction.pCont = taosMemoryCalloc(1, dropStreamRedoAction.contLen);
214!
2004
  if (!dropStreamRedoAction.pCont) {
214!
UNCOV
2005
    code = terrno;
×
2006
    goto _OVER;
×
2007
  }
2008
  if (dropStreamRedoAction.contLen !=
214!
2009
      tSerializeSMDropStreamReq(dropStreamRedoAction.pCont, dropStreamRedoAction.contLen, pCxt->pDropStreamReq)) {
214✔
UNCOV
2010
    mError("tsma: %s, failed to drop due to drop stream req encode failure", pCxt->pDropSmaReq->name);
×
UNCOV
2011
    code = TSDB_CODE_INVALID_MSG;
×
UNCOV
2012
    goto _OVER;
×
2013
  }
2014

2015
  // output stable is not dropped when dropping stream, dropping it when dropping tsma
2016
  SMDropStbReq dropStbReq = {0};
214✔
2017
  dropStbReq.igNotExists = false;
214✔
2018
  tstrncpy(dropStbReq.name, pCxt->targetStbFullName, TSDB_TABLE_FNAME_LEN);
214✔
2019
  dropStbReq.sql = "drop";
214✔
2020
  dropStbReq.sqlLen = 5;
214✔
2021

2022
  STransAction dropStbRedoAction = {0};
214✔
2023
  mndGetMnodeEpSet(pCxt->pMnode, &dropStbRedoAction.epSet);
214✔
2024
  dropStbRedoAction.acceptableCode = TSDB_CODE_MND_STB_NOT_EXIST;
214✔
2025
  dropStbRedoAction.msgType = TDMT_MND_STB_DROP;
214✔
2026
  dropStbRedoAction.contLen = tSerializeSMDropStbReq(0, 0, &dropStbReq);
214✔
2027
  dropStbRedoAction.pCont = taosMemoryCalloc(1, dropStbRedoAction.contLen);
214!
2028
  if (!dropStbRedoAction.pCont) {
214!
UNCOV
2029
    code = terrno;
×
2030
    goto _OVER;
×
2031
  }
2032
  if (dropStbRedoAction.contLen !=
214!
2033
      tSerializeSMDropStbReq(dropStbRedoAction.pCont, dropStbRedoAction.contLen, &dropStbReq)) {
214✔
UNCOV
2034
    mError("tsma: %s, failedto drop due to drop stb req encode failure", pCxt->pDropSmaReq->name);
×
UNCOV
2035
    code = TSDB_CODE_INVALID_MSG;
×
UNCOV
2036
    goto _OVER;
×
2037
  }
2038

2039
  SDbObj newDb = {0};
214✔
2040
  memcpy(&newDb, pCxt->pDb, sizeof(SDbObj));
214✔
2041
  newDb.tsmaVersion++;
214✔
2042
  TAOS_CHECK_GOTO(mndSetDropSmaRedoLogs(pCxt->pMnode, pTrans, pCxt->pSma), NULL, _OVER);
214!
2043
  TAOS_CHECK_GOTO(mndSetDropSmaCommitLogs(pCxt->pMnode, pTrans, pCxt->pSma), NULL, _OVER);
214!
2044
  TAOS_CHECK_GOTO(mndTransAppendRedoAction(pTrans, &dropStreamRedoAction), NULL, _OVER);
214!
2045
  TAOS_CHECK_GOTO(mndTransAppendRedoAction(pTrans, &dropStbRedoAction), NULL, _OVER);
214!
2046
  TAOS_CHECK_GOTO(mndSetUpdateDbTsmaVersionPrepareLogs(pCxt->pMnode, pTrans, pCxt->pDb, &newDb), NULL, _OVER);
214!
2047
  TAOS_CHECK_GOTO(mndSetUpdateDbTsmaVersionCommitLogs(pCxt->pMnode, pTrans, pCxt->pDb, &newDb), NULL, _OVER);
214!
2048
  TAOS_CHECK_GOTO(mndTransPrepare(pCxt->pMnode, pTrans), NULL, _OVER);
214!
2049
  code = TSDB_CODE_SUCCESS;
214✔
2050
_OVER:
214✔
2051
  tFreeMDropStreamReq(pCxt->pDropStreamReq);
214✔
2052
  mndTransDrop(pTrans);
214✔
2053
  TAOS_RETURN(code);
214✔
2054
}
2055

2056
static bool hasRecursiveTsmasBasedOnMe(SMnode *pMnode, const SSmaObj *pSma) {
219✔
2057
  SSmaObj *pSmaObj = NULL;
219✔
2058
  void    *pIter = NULL;
219✔
2059
  while (1) {
2060
    pIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter, (void **)&pSmaObj);
580✔
2061
    if (pIter == NULL) break;
580✔
2062
    if (0 == strncmp(pSmaObj->baseSmaName, pSma->name, TSDB_TABLE_FNAME_LEN)) {
366✔
2063
      sdbRelease(pMnode->pSdb, pSmaObj);
5✔
2064
      sdbCancelFetch(pMnode->pSdb, pIter);
5✔
2065
      return true;
5✔
2066
    }
2067
    sdbRelease(pMnode->pSdb, pSmaObj);
361✔
2068
  }
2069
  return false;
214✔
2070
}
2071

2072
static int32_t mndProcessDropTSMAReq(SRpcMsg *pReq) {
219✔
2073
  int32_t      code = -1;
219✔
2074
  SMDropSmaReq dropReq = {0};
219✔
2075
  SSmaObj     *pSma = NULL;
219✔
2076
  SDbObj      *pDb = NULL;
219✔
2077
  SMnode      *pMnode = pReq->info.node;
219✔
2078
  if (tDeserializeSMDropSmaReq(pReq->pCont, pReq->contLen, &dropReq) != TSDB_CODE_SUCCESS) {
219!
UNCOV
2079
    code = TSDB_CODE_INVALID_MSG;
×
UNCOV
2080
    goto _OVER;
×
2081
  }
2082

2083
  char streamName[TSDB_TABLE_FNAME_LEN] = {0};
219✔
2084
  char streamTargetStbFullName[TSDB_TABLE_FNAME_LEN] = {0};
219✔
2085
  code = mndTSMAGenerateOutputName(dropReq.name, streamName, streamTargetStbFullName);
219✔
2086
  if (TSDB_CODE_SUCCESS != code) {
219!
UNCOV
2087
    goto _OVER;
×
2088
  }
2089

2090
  SStbObj *pStb = mndAcquireStb(pMnode, streamTargetStbFullName);
219✔
2091

2092
  pSma = mndAcquireSma(pMnode, dropReq.name);
219✔
2093
  if (!pSma && dropReq.igNotExists) {
219!
2094
    code = 0;
×
2095
    goto _OVER;
×
2096
  }
2097
  if (!pSma) {
219!
UNCOV
2098
    code = TSDB_CODE_MND_SMA_NOT_EXIST;
×
UNCOV
2099
    goto _OVER;
×
2100
  }
2101
  SName name = {0};
219✔
2102
  code = tNameFromString(&name, dropReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
219✔
2103
  if (TSDB_CODE_SUCCESS != code) {
219!
UNCOV
2104
    goto _OVER;
×
2105
  }
2106
  char db[TSDB_TABLE_FNAME_LEN] = {0};
219✔
2107
  (void)tNameGetFullDbName(&name, db);
219✔
2108

2109
  pDb = mndAcquireDb(pMnode, db);
219✔
2110
  if (!pDb) {
219!
UNCOV
2111
    code = TSDB_CODE_MND_DB_NOT_EXIST;
×
2112
    goto _OVER;
×
2113
  }
2114

2115
  if ((code = mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb)) != 0) {
219!
UNCOV
2116
    goto _OVER;
×
2117
  }
2118

2119
  if (hasRecursiveTsmasBasedOnMe(pMnode, pSma)) {
219✔
2120
    code = TSDB_CODE_MND_INVALID_DROP_TSMA;
5✔
2121
    goto _OVER;
5✔
2122
  }
2123

2124
  SCreateTSMACxt cxt = {
214✔
2125
      .pDb = pDb,
2126
      .pMnode = pMnode,
2127
      .pRpcReq = pReq,
2128
      .pSma = pSma,
2129
      .streamName = streamName,
2130
      .targetStbFullName = streamTargetStbFullName,
2131
      .pDropSmaReq = &dropReq,
2132
  };
2133

2134
  code = mndDropTSMA(&cxt);
214✔
2135

2136
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
214!
UNCOV
2137
_OVER:
×
2138

2139
  mndReleaseStb(pMnode, pStb);
219✔
2140
  mndReleaseSma(pMnode, pSma);
219✔
2141
  mndReleaseDb(pMnode, pDb);
219✔
2142
  TAOS_RETURN(code);
219✔
2143
}
2144

2145
static int32_t mndRetrieveTSMA(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
6✔
2146
  SDbObj          *pDb = NULL;
6✔
2147
  int32_t          numOfRows = 0;
6✔
2148
  SSmaObj         *pSma = NULL;
6✔
2149
  SMnode          *pMnode = pReq->info.node;
6✔
2150
  int32_t          code = 0;
6✔
2151
  SColumnInfoData *pColInfo;
2152
  if (pShow->pIter == NULL) {
6!
2153
    pShow->pIter = taosMemoryCalloc(1, sizeof(SSmaAndTagIter));
6!
2154
  }
2155
  if (!pShow->pIter) {
6!
UNCOV
2156
    return terrno;
×
2157
  }
2158
  if (pShow->db[0]) {
6✔
2159
    pDb = mndAcquireDb(pMnode, pShow->db);
2✔
2160
  }
2161
  SSmaAndTagIter *pIter = pShow->pIter;
6✔
2162
  while (numOfRows < rows) {
14!
2163
    pIter->pSmaIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter->pSmaIter, (void **)&pSma);
14✔
2164
    if (pIter->pSmaIter == NULL) break;
14✔
2165
    SDbObj *pSrcDb = mndAcquireDb(pMnode, pSma->db);
8✔
2166

2167
    if ((pDb && pSma->dbUid != pDb->uid) || !pSrcDb) {
8!
UNCOV
2168
      sdbRelease(pMnode->pSdb, pSma);
×
UNCOV
2169
      if (pSrcDb) mndReleaseDb(pMnode, pSrcDb);
×
UNCOV
2170
      continue;
×
2171
    }
2172

2173
    int32_t cols = 0;
8✔
2174
    SName   n = {0};
8✔
2175

2176
    code = tNameFromString(&n, pSma->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
8✔
2177
    char smaName[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
8✔
2178
    if (TSDB_CODE_SUCCESS == code) {
8!
2179
      STR_TO_VARSTR(smaName, (char *)tNameGetTableName(&n));
8✔
2180
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
8✔
2181
      code = colDataSetVal(pColInfo, numOfRows, (const char *)smaName, false);
8✔
2182
    }
2183

2184
    char db[TSDB_DB_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
8✔
2185
    if (TSDB_CODE_SUCCESS == code) {
8!
2186
      STR_TO_VARSTR(db, (char *)mndGetDbStr(pSma->db));
8✔
2187
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
8✔
2188
      code = colDataSetVal(pColInfo, numOfRows, (const char *)db, false);
8✔
2189
    }
2190

2191
    if (TSDB_CODE_SUCCESS == code) {
8!
2192
      code = tNameFromString(&n, pSma->stb, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
8✔
2193
    }
2194
    char srcTb[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
8✔
2195
    if (TSDB_CODE_SUCCESS == code) {
8!
2196
      STR_TO_VARSTR(srcTb, (char *)tNameGetTableName(&n));
8✔
2197
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
8✔
2198
      code = colDataSetVal(pColInfo, numOfRows, (const char *)srcTb, false);
8✔
2199
    }
2200

2201
    if (TSDB_CODE_SUCCESS == code) {
8!
2202
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
8✔
2203
      code = colDataSetVal(pColInfo, numOfRows, (const char *)db, false);
8✔
2204
    }
2205

2206
    if (TSDB_CODE_SUCCESS == code) {
8!
2207
      code = tNameFromString(&n, pSma->dstTbName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
8✔
2208
    }
2209

2210
    if (TSDB_CODE_SUCCESS == code) {
8!
2211
      char targetTb[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
8✔
2212
      STR_TO_VARSTR(targetTb, (char *)tNameGetTableName(&n));
8✔
2213
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
8✔
2214
      code = colDataSetVal(pColInfo, numOfRows, (const char *)targetTb, false);
8✔
2215
    }
2216

2217
    if (TSDB_CODE_SUCCESS == code) {
8!
2218
      // stream name
2219
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
8✔
2220
      code = colDataSetVal(pColInfo, numOfRows, (const char *)smaName, false);
8✔
2221
    }
2222

2223
    if (TSDB_CODE_SUCCESS == code) {
8!
2224
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
8✔
2225
      code = colDataSetVal(pColInfo, numOfRows, (const char *)(&pSma->createdTime), false);
8✔
2226
    }
2227

2228
    // interval
2229
    char    interval[64 + VARSTR_HEADER_SIZE] = {0};
8✔
2230
    int32_t len = 0;
8✔
2231
    if (TSDB_CODE_SUCCESS == code) {
8!
2232
      if (!IS_CALENDAR_TIME_DURATION(pSma->intervalUnit)) {
8!
2233
        len = tsnprintf(interval + VARSTR_HEADER_SIZE, 64, "%" PRId64 "%c", pSma->interval,
8✔
2234
                        getPrecisionUnit(pSrcDb->cfg.precision));
8✔
2235
      } else {
UNCOV
2236
        len = tsnprintf(interval + VARSTR_HEADER_SIZE, 64, "%" PRId64 "%c", pSma->interval, pSma->intervalUnit);
×
2237
      }
2238
      varDataSetLen(interval, len);
8✔
2239
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
8✔
2240
      code = colDataSetVal(pColInfo, numOfRows, interval, false);
8✔
2241
    }
2242

2243
    char buf[TSDB_MAX_SAVED_SQL_LEN + VARSTR_HEADER_SIZE] = {0};
8✔
2244
    if (TSDB_CODE_SUCCESS == code) {
8!
2245
      // create sql
2246
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
8✔
2247
      len = tsnprintf(buf + VARSTR_HEADER_SIZE, TSDB_MAX_SAVED_SQL_LEN, "%s", pSma->sql);
8✔
2248
      varDataSetLen(buf, TMIN(len, TSDB_MAX_SAVED_SQL_LEN));
8✔
2249
      code = colDataSetVal(pColInfo, numOfRows, buf, false);
8✔
2250
    }
2251

2252
    // func list
2253
    len = 0;
8✔
2254
    SNode *pNode = NULL, *pFunc = NULL;
8✔
2255
    if (TSDB_CODE_SUCCESS == code) {
8!
2256
      code = nodesStringToNode(pSma->ast, &pNode);
8✔
2257
    }
2258
    if (TSDB_CODE_SUCCESS == code) {
8!
2259
      char *start = buf + VARSTR_HEADER_SIZE;
8✔
2260
      FOREACH(pFunc, ((SSelectStmt *)pNode)->pProjectionList) {
48!
2261
        if (nodeType(pFunc) == QUERY_NODE_FUNCTION) {
40!
2262
          SFunctionNode *pFuncNode = (SFunctionNode *)pFunc;
40✔
2263
          if (!fmIsTSMASupportedFunc(pFuncNode->funcId)) continue;
40✔
2264
          len += tsnprintf(start, TSDB_MAX_SAVED_SQL_LEN - len, "%s%s", start != buf + VARSTR_HEADER_SIZE ? "," : "",
16✔
2265
                           ((SExprNode *)pFunc)->userAlias);
16✔
2266
          if (len >= TSDB_MAX_SAVED_SQL_LEN) {
16!
UNCOV
2267
            len = TSDB_MAX_SAVED_SQL_LEN;
×
UNCOV
2268
            break;
×
2269
          }
2270
          start = buf + VARSTR_HEADER_SIZE + len;
16✔
2271
        }
2272
      }
2273
      nodesDestroyNode(pNode);
8✔
2274
    }
2275

2276
    if (TSDB_CODE_SUCCESS == code) {
8!
2277
      varDataSetLen(buf, len);
8✔
2278
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
8✔
2279
      code = colDataSetVal(pColInfo, numOfRows, buf, false);
8✔
2280
    }
2281

2282
    numOfRows++;
8✔
2283
    mndReleaseSma(pMnode, pSma);
8✔
2284
    mndReleaseDb(pMnode, pSrcDb);
8✔
2285
    if (TSDB_CODE_SUCCESS != code) {
8!
UNCOV
2286
      sdbCancelFetch(pMnode->pSdb, pIter->pSmaIter);
×
UNCOV
2287
      numOfRows = code;
×
UNCOV
2288
      break;
×
2289
    }
2290
  }
2291
  mndReleaseDb(pMnode, pDb);
6✔
2292
  pShow->numOfRows += numOfRows;
6✔
2293
  if (numOfRows < rows) {
6!
2294
    taosMemoryFree(pShow->pIter);
6!
2295
    pShow->pIter = NULL;
6✔
2296
  }
2297
  return numOfRows;
6✔
2298
}
2299

2300
static void mndCancelRetrieveTSMA(SMnode *pMnode, void *pIter) {
×
UNCOV
2301
  SSmaAndTagIter *p = pIter;
×
2302
  if (p != NULL) {
×
2303
    SSdb *pSdb = pMnode->pSdb;
×
UNCOV
2304
    sdbCancelFetchByType(pSdb, p->pSmaIter, SDB_SMA);
×
2305
  }
UNCOV
2306
  taosMemoryFree(p);
×
UNCOV
2307
}
×
2308

2309
int32_t dumpTSMAInfoFromSmaObj(const SSmaObj *pSma, const SStbObj *pDestStb, STableTSMAInfo *pInfo,
2,012✔
2310
                               const SSmaObj *pBaseTsma) {
2311
  int32_t code = 0;
2,012✔
2312
  pInfo->interval = pSma->interval;
2,012✔
2313
  pInfo->unit = pSma->intervalUnit;
2,012✔
2314
  pInfo->tsmaId = pSma->uid;
2,012✔
2315
  pInfo->version = pSma->version;
2,012✔
2316
  pInfo->tsmaId = pSma->uid;
2,012✔
2317
  pInfo->destTbUid = pDestStb->uid;
2,012✔
2318
  SName sName = {0};
2,012✔
2319
  code = tNameFromString(&sName, pSma->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
2,012✔
2320
  if (TSDB_CODE_SUCCESS != code) {
2,012!
UNCOV
2321
    return code;
×
2322
  }
2323
  tstrncpy(pInfo->name, sName.tname, TSDB_TABLE_NAME_LEN);
2,012✔
2324
  tstrncpy(pInfo->targetDbFName, pSma->db, TSDB_DB_FNAME_LEN);
2,012✔
2325
  code = tNameFromString(&sName, pSma->dstTbName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
2,012✔
2326
  if (TSDB_CODE_SUCCESS != code) {
2,012!
UNCOV
2327
    return code;
×
2328
  }
2329
  tstrncpy(pInfo->targetTb, sName.tname, TSDB_TABLE_NAME_LEN);
2,012✔
2330
  tstrncpy(pInfo->dbFName, pSma->db, TSDB_DB_FNAME_LEN);
2,012✔
2331
  code = tNameFromString(&sName, pSma->stb, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
2,012✔
2332
  if (TSDB_CODE_SUCCESS != code) {
2,012!
UNCOV
2333
    return code;
×
2334
  }
2335
  tstrncpy(pInfo->tb, sName.tname, TSDB_TABLE_NAME_LEN);
2,012✔
2336
  pInfo->pFuncs = taosArrayInit(8, sizeof(STableTSMAFuncInfo));
2,012✔
2337
  if (!pInfo->pFuncs) return TSDB_CODE_OUT_OF_MEMORY;
2,012!
2338

2339
  SNode *pNode, *pFunc;
2340
  if (TSDB_CODE_SUCCESS != nodesStringToNode(pBaseTsma ? pBaseTsma->ast : pSma->ast, &pNode)) {
2,012!
UNCOV
2341
    taosArrayDestroy(pInfo->pFuncs);
×
UNCOV
2342
    pInfo->pFuncs = NULL;
×
UNCOV
2343
    return TSDB_CODE_TSMA_INVALID_STAT;
×
2344
  }
2345
  if (pNode) {
2,012!
2346
    SSelectStmt *pSelect = (SSelectStmt *)pNode;
2,012✔
2347
    FOREACH(pFunc, pSelect->pProjectionList) {
353,094!
2348
      STableTSMAFuncInfo funcInfo = {0};
351,082✔
2349
      SFunctionNode     *pFuncNode = (SFunctionNode *)pFunc;
351,082✔
2350
      if (!fmIsTSMASupportedFunc(pFuncNode->funcId)) continue;
351,082✔
2351
      funcInfo.funcId = pFuncNode->funcId;
345,046✔
2352
      funcInfo.colId = ((SColumnNode *)pFuncNode->pParameterList->pHead->pNode)->colId;
345,046✔
2353
      if (!taosArrayPush(pInfo->pFuncs, &funcInfo)) {
690,092!
UNCOV
2354
        code = terrno;
×
UNCOV
2355
        taosArrayDestroy(pInfo->pFuncs);
×
UNCOV
2356
        nodesDestroyNode(pNode);
×
UNCOV
2357
        return code;
×
2358
      }
2359
    }
2360
    nodesDestroyNode(pNode);
2,012✔
2361
  }
2362
  pInfo->ast = taosStrdup(pSma->ast);
2,012!
2363
  if (!pInfo->ast) code = terrno;
2,012!
2364

2365
  if (code == TSDB_CODE_SUCCESS && pDestStb->numOfTags > 0) {
2,012!
2366
    pInfo->pTags = taosArrayInit(pDestStb->numOfTags, sizeof(SSchema));
2,012✔
2367
    if (!pInfo->pTags) {
2,012!
2368
      code = terrno;
×
2369
    } else {
2370
      for (int32_t i = 0; i < pDestStb->numOfTags; ++i) {
14,832✔
2371
        if (NULL == taosArrayPush(pInfo->pTags, &pDestStb->pTags[i])) {
25,640!
UNCOV
2372
          code = terrno;
×
UNCOV
2373
          break;
×
2374
        }
2375
      }
2376
    }
2377
  }
2378
  if (code == TSDB_CODE_SUCCESS) {
2,012!
2379
    pInfo->pUsedCols = taosArrayInit(pDestStb->numOfColumns - 3, sizeof(SSchema));
2,012✔
2380
    if (!pInfo->pUsedCols)
2,012!
UNCOV
2381
      code = terrno;
×
2382
    else {
2383
      // skip _wstart, _wend, _duration
2384
      for (int32_t i = 1; i < pDestStb->numOfColumns - 2; ++i) {
347,058✔
2385
        if (NULL == taosArrayPush(pInfo->pUsedCols, &pDestStb->pColumns[i])) {
690,092!
UNCOV
2386
          code = terrno;
×
UNCOV
2387
          break;
×
2388
        }
2389
      }
2390
    }
2391
  }
2392
  TAOS_RETURN(code);
2,012✔
2393
}
2394

2395
// @note remember to mndReleaseSma(*ppOut)
2396
static int32_t mndGetDeepestBaseForTsma(SMnode *pMnode, SSmaObj *pSma, SSmaObj **ppOut) {
2,012✔
2397
  int32_t  code = 0;
2,012✔
2398
  SSmaObj *pRecursiveTsma = NULL;
2,012✔
2399
  if (pSma->baseSmaName[0]) {
2,012✔
2400
    pRecursiveTsma = mndAcquireSma(pMnode, pSma->baseSmaName);
445✔
2401
    if (!pRecursiveTsma) {
445!
UNCOV
2402
      mError("base tsma: %s for tsma: %s not found", pSma->baseSmaName, pSma->name);
×
UNCOV
2403
      return TSDB_CODE_MND_SMA_NOT_EXIST;
×
2404
    }
2405
    while (pRecursiveTsma->baseSmaName[0]) {
490✔
2406
      SSmaObj *pTmpSma = pRecursiveTsma;
45✔
2407
      pRecursiveTsma = mndAcquireSma(pMnode, pTmpSma->baseSmaName);
45✔
2408
      if (!pRecursiveTsma) {
45!
UNCOV
2409
        mError("base tsma: %s for tsma: %s not found", pTmpSma->baseSmaName, pTmpSma->name);
×
UNCOV
2410
        mndReleaseSma(pMnode, pTmpSma);
×
UNCOV
2411
        return TSDB_CODE_MND_SMA_NOT_EXIST;
×
2412
      }
2413
      mndReleaseSma(pMnode, pTmpSma);
45✔
2414
    }
2415
  }
2416
  *ppOut = pRecursiveTsma;
2,012✔
2417
  return code;
2,012✔
2418
}
2419

2420
static int32_t mndGetTSMA(SMnode *pMnode, char *tsmaFName, STableTSMAInfoRsp *rsp, bool *exist) {
94✔
2421
  int32_t  code = -1;
94✔
2422
  SSmaObj *pSma = NULL;
94✔
2423
  SSmaObj *pBaseTsma = NULL;
94✔
2424
  SStbObj *pDstStb = NULL;
94✔
2425

2426
  pSma = sdbAcquire(pMnode->pSdb, SDB_SMA, tsmaFName);
94✔
2427
  if (pSma) {
94!
2428
    pDstStb = mndAcquireStb(pMnode, pSma->dstTbName);
94✔
2429
    if (!pDstStb) {
94!
UNCOV
2430
      sdbRelease(pMnode->pSdb, pSma);
×
UNCOV
2431
      return TSDB_CODE_SUCCESS;
×
2432
    }
2433

2434
    STableTSMAInfo *pTsma = taosMemoryCalloc(1, sizeof(STableTSMAInfo));
94!
2435
    if (!pTsma) {
94!
UNCOV
2436
      code = terrno;
×
UNCOV
2437
      sdbRelease(pMnode->pSdb, pSma);
×
UNCOV
2438
      mndReleaseStb(pMnode, pDstStb);
×
UNCOV
2439
      TAOS_RETURN(code);
×
2440
    }
2441

2442
    code = mndGetDeepestBaseForTsma(pMnode, pSma, &pBaseTsma);
94✔
2443
    if (code == 0) {
94!
2444
      code = dumpTSMAInfoFromSmaObj(pSma, pDstStb, pTsma, pBaseTsma);
94✔
2445
    }
2446
    mndReleaseStb(pMnode, pDstStb);
94✔
2447
    sdbRelease(pMnode->pSdb, pSma);
94✔
2448
    if (pBaseTsma) mndReleaseSma(pMnode, pBaseTsma);
94✔
2449
    if (terrno) {
94!
2450
      tFreeAndClearTableTSMAInfo(pTsma);
×
2451
      TAOS_RETURN(code);
×
2452
    }
2453
    if (NULL == taosArrayPush(rsp->pTsmas, &pTsma)) {
188!
UNCOV
2454
      code = terrno;
×
UNCOV
2455
      tFreeAndClearTableTSMAInfo(pTsma);
×
2456
    }
2457
    *exist = true;
94✔
2458
  }
2459
  TAOS_RETURN(code);
94✔
2460
}
2461

2462
typedef bool (*tsmaFilter)(const SSmaObj *pSma, void *param);
2463

2464
static int32_t mndGetSomeTsmas(SMnode *pMnode, STableTSMAInfoRsp *pRsp, tsmaFilter filtered, void *param, bool *exist) {
13,731✔
2465
  int32_t     code = 0;
13,731✔
2466
  SSmaObj    *pSma = NULL;
13,731✔
2467
  SSmaObj    *pBaseTsma = NULL;
13,731✔
2468
  SSdb       *pSdb = pMnode->pSdb;
13,731✔
2469
  void       *pIter = NULL;
13,731✔
2470
  SStreamObj *pStream = NULL;
13,731✔
2471
  SStbObj    *pStb = NULL;
13,731✔
2472
  bool        shouldRetry = false;
13,731✔
2473

2474
  while (1) {
2,299✔
2475
    pIter = sdbFetch(pSdb, SDB_SMA, pIter, (void **)&pSma);
16,030✔
2476
    if (pIter == NULL) break;
16,030✔
2477

2478
    if (filtered(pSma, param)) {
2,299✔
2479
      sdbRelease(pSdb, pSma);
381✔
2480
      continue;
381✔
2481
    }
2482

2483
    pStb = mndAcquireStb(pMnode, pSma->dstTbName);
1,918✔
2484
    if (!pStb) {
1,918!
UNCOV
2485
      sdbRelease(pSdb, pSma);
×
UNCOV
2486
      shouldRetry = true;
×
UNCOV
2487
      continue;
×
2488
    }
2489

2490
    SName smaName;
2491
    char  streamName[TSDB_TABLE_FNAME_LEN] = {0};
1,918✔
2492
    code = tNameFromString(&smaName, pSma->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
1,918✔
2493
    if (TSDB_CODE_SUCCESS != code) {
1,918!
UNCOV
2494
      sdbRelease(pSdb, pSma);
×
UNCOV
2495
      mndReleaseStb(pMnode, pStb);
×
UNCOV
2496
      TAOS_RETURN(code);
×
2497
    }
2498
    snprintf(streamName, TSDB_TABLE_FNAME_LEN, "%d.%s", smaName.acctId, smaName.tname);
1,918✔
2499
    pStream = NULL;
1,918✔
2500

2501
    code = mndAcquireStream(pMnode, streamName, &pStream);
1,918✔
2502
    if (!pStream) {
1,918!
UNCOV
2503
      shouldRetry = true;
×
UNCOV
2504
      sdbRelease(pSdb, pSma);
×
2505
      mndReleaseStb(pMnode, pStb);
×
2506
      continue;
×
2507
    }
2508
    if (code != 0) {
1,918!
UNCOV
2509
      sdbRelease(pSdb, pSma);
×
UNCOV
2510
      mndReleaseStb(pMnode, pStb);
×
UNCOV
2511
      TAOS_RETURN(code);
×
2512
    }
2513

2514
    int64_t streamId = pStream->uid;
1,918✔
2515
    mndReleaseStream(pMnode, pStream);
1,918✔
2516

2517
    STableTSMAInfo *pTsma = taosMemoryCalloc(1, sizeof(STableTSMAInfo));
1,918!
2518
    if (!pTsma) {
1,918!
2519
      code = terrno;
×
UNCOV
2520
      mndReleaseStb(pMnode, pStb);
×
UNCOV
2521
      sdbRelease(pSdb, pSma);
×
UNCOV
2522
      sdbCancelFetch(pSdb, pIter);
×
UNCOV
2523
      TAOS_RETURN(code);
×
2524
    }
2525
    pTsma->streamUid = streamId;
1,918✔
2526

2527
    code = mndGetDeepestBaseForTsma(pMnode, pSma, &pBaseTsma);
1,918✔
2528
    if (code == 0) {
1,918!
2529
      code = dumpTSMAInfoFromSmaObj(pSma, pStb, pTsma, pBaseTsma);
1,918✔
2530
    }
2531
    mndReleaseStb(pMnode, pStb);
1,918✔
2532
    sdbRelease(pSdb, pSma);
1,918✔
2533
    if (pBaseTsma) mndReleaseSma(pMnode, pBaseTsma);
1,918✔
2534
    if (terrno) {
1,918!
UNCOV
2535
      tFreeAndClearTableTSMAInfo(pTsma);
×
2536
      sdbCancelFetch(pSdb, pIter);
×
2537
      TAOS_RETURN(code);
×
2538
    }
2539
    if (NULL == taosArrayPush(pRsp->pTsmas, &pTsma)) {
3,836!
UNCOV
2540
      code = terrno;
×
UNCOV
2541
      tFreeAndClearTableTSMAInfo(pTsma);
×
UNCOV
2542
      sdbCancelFetch(pSdb, pIter);
×
UNCOV
2543
      TAOS_RETURN(code);
×
2544
    }
2545
    *exist = true;
1,918✔
2546
  }
2547
  if (shouldRetry) {
13,731!
UNCOV
2548
    return TSDB_CODE_NEED_RETRY;
×
2549
  }
2550
  return TSDB_CODE_SUCCESS;
13,731✔
2551
}
2552

2553
static bool tsmaTbFilter(const SSmaObj *pSma, void *param) {
1,954✔
2554
  const char *tbFName = param;
1,954✔
2555
  return pSma->stb[0] != tbFName[0] || strcmp(pSma->stb, tbFName) != 0;
1,954!
2556
}
2557

2558
static int32_t mndGetTableTSMA(SMnode *pMnode, char *tbFName, STableTSMAInfoRsp *pRsp, bool *exist) {
1,307✔
2559
  return mndGetSomeTsmas(pMnode, pRsp, tsmaTbFilter, tbFName, exist);
1,307✔
2560
}
2561

2562
static bool tsmaDbFilter(const SSmaObj *pSma, void *param) {
345✔
2563
  uint64_t *dbUid = param;
345✔
2564
  return pSma->dbUid != *dbUid;
345✔
2565
}
2566

2567
int32_t mndGetDbTsmas(SMnode *pMnode, const char *dbFName, uint64_t dbUid, STableTSMAInfoRsp *pRsp, bool *exist) {
12,424✔
2568
  return mndGetSomeTsmas(pMnode, pRsp, tsmaDbFilter, &dbUid, exist);
12,424✔
2569
}
2570

2571
static int32_t mndProcessGetTbTSMAReq(SRpcMsg *pReq) {
1,401✔
2572
  STableTSMAInfoRsp rsp = {0};
1,401✔
2573
  int32_t           code = -1;
1,401✔
2574
  STableTSMAInfoReq tsmaReq = {0};
1,401✔
2575
  bool              exist = false;
1,401✔
2576
  SMnode           *pMnode = pReq->info.node;
1,401✔
2577

2578
  TAOS_CHECK_GOTO(tDeserializeTableTSMAInfoReq(pReq->pCont, pReq->contLen, &tsmaReq), NULL, _OVER);
1,401!
2579

2580
  rsp.pTsmas = taosArrayInit(4, POINTER_BYTES);
1,401✔
2581
  if (NULL == rsp.pTsmas) {
1,401!
UNCOV
2582
    code = terrno;
×
UNCOV
2583
    goto _OVER;
×
2584
  }
2585

2586
  if (tsmaReq.fetchingWithTsmaName) {
1,401✔
2587
    code = mndGetTSMA(pMnode, tsmaReq.name, &rsp, &exist);
94✔
2588
  } else {
2589
    code = mndGetTableTSMA(pMnode, tsmaReq.name, &rsp, &exist);
1,307✔
2590
    if (TSDB_CODE_NEED_RETRY == code) {
1,307!
2591
      code = TSDB_CODE_SUCCESS;
×
2592
    }
2593
  }
2594
  if (code) {
1,401!
UNCOV
2595
    goto _OVER;
×
2596
  }
2597

2598
  if (!exist) {
1,401✔
2599
    code = TSDB_CODE_MND_SMA_NOT_EXIST;
240✔
2600
  } else {
2601
    int32_t contLen = tSerializeTableTSMAInfoRsp(NULL, 0, &rsp);
1,161✔
2602
    void   *pRsp = rpcMallocCont(contLen);
1,161✔
2603
    if (pRsp == NULL) {
1,161!
UNCOV
2604
      code = terrno;
×
UNCOV
2605
      goto _OVER;
×
2606
    }
2607

2608
    int32_t len = tSerializeTableTSMAInfoRsp(pRsp, contLen, &rsp);
1,161✔
2609
    if (len < 0) {
1,161!
UNCOV
2610
      code = terrno;
×
UNCOV
2611
      goto _OVER;
×
2612
    }
2613

2614
    pReq->info.rsp = pRsp;
1,161✔
2615
    pReq->info.rspLen = contLen;
1,161✔
2616

2617
    code = 0;
1,161✔
2618
  }
2619

2620
_OVER:
1,401✔
2621
  tFreeTableTSMAInfoRsp(&rsp);
1,401✔
2622
  TAOS_RETURN(code);
1,401✔
2623
}
2624

2625
static int32_t mkNonExistTSMAInfo(const STSMAVersion *pTsmaVer, STableTSMAInfo **ppTsma) {
1✔
2626
  STableTSMAInfo *pInfo = taosMemoryCalloc(1, sizeof(STableTSMAInfo));
1!
2627
  if (!pInfo) {
1!
UNCOV
2628
    return terrno;
×
2629
  }
2630
  pInfo->pFuncs = NULL;
1✔
2631
  pInfo->tsmaId = pTsmaVer->tsmaId;
1✔
2632
  tstrncpy(pInfo->dbFName, pTsmaVer->dbFName, TSDB_DB_FNAME_LEN);
1✔
2633
  tstrncpy(pInfo->tb, pTsmaVer->tbName, TSDB_TABLE_NAME_LEN);
1✔
2634
  tstrncpy(pInfo->name, pTsmaVer->name, TSDB_TABLE_NAME_LEN);
1✔
2635
  pInfo->dbId = pTsmaVer->dbId;
1✔
2636
  pInfo->ast = taosMemoryCalloc(1, 1);
1!
2637
  if (!pInfo->ast) {
1!
UNCOV
2638
    taosMemoryFree(pInfo);
×
UNCOV
2639
    return terrno;
×
2640
  }
2641
  *ppTsma = pInfo;
1✔
2642
  return TSDB_CODE_SUCCESS;
1✔
2643
}
2644

2645
int32_t mndValidateTSMAInfo(SMnode *pMnode, STSMAVersion *pTsmaVersions, int32_t numOfTsmas, void **ppRsp,
409✔
2646
                            int32_t *pRspLen) {
2647
  int32_t         code = -1;
409✔
2648
  STSMAHbRsp      hbRsp = {0};
409✔
2649
  int32_t         rspLen = 0;
409✔
2650
  void           *pRsp = NULL;
409✔
2651
  char            tsmaFName[TSDB_TABLE_FNAME_LEN] = {0};
409✔
2652
  STableTSMAInfo *pTsmaInfo = NULL;
409✔
2653

2654
  hbRsp.pTsmas = taosArrayInit(numOfTsmas, POINTER_BYTES);
409✔
2655
  if (!hbRsp.pTsmas) {
409!
UNCOV
2656
    code = terrno;
×
UNCOV
2657
    TAOS_RETURN(code);
×
2658
  }
2659

2660
  for (int32_t i = 0; i < numOfTsmas; ++i) {
859✔
2661
    STSMAVersion *pTsmaVer = &pTsmaVersions[i];
450✔
2662
    pTsmaVer->dbId = be64toh(pTsmaVer->dbId);
450✔
2663
    pTsmaVer->tsmaId = be64toh(pTsmaVer->tsmaId);
450✔
2664
    pTsmaVer->version = ntohl(pTsmaVer->version);
450✔
2665

2666
    snprintf(tsmaFName, sizeof(tsmaFName), "%s.%s", pTsmaVer->dbFName, pTsmaVer->name);
450✔
2667
    SSmaObj *pSma = mndAcquireSma(pMnode, tsmaFName);
450✔
2668
    if (!pSma) {
450✔
2669
      code = mkNonExistTSMAInfo(pTsmaVer, &pTsmaInfo);
1✔
2670
      if (code) goto _OVER;
1!
2671
      if (NULL == taosArrayPush(hbRsp.pTsmas, &pTsmaInfo)) {
2!
UNCOV
2672
        code = terrno;
×
UNCOV
2673
        tFreeAndClearTableTSMAInfo(pTsmaInfo);
×
UNCOV
2674
        goto _OVER;
×
2675
      }
2676
      continue;
450✔
2677
    }
2678

2679
    if (pSma->uid != pTsmaVer->tsmaId) {
449!
2680
      mDebug("tsma: %s.%" PRIx64 " tsmaId mismatch with current %" PRIx64, tsmaFName, pTsmaVer->tsmaId, pSma->uid);
×
2681
      code = mkNonExistTSMAInfo(pTsmaVer, &pTsmaInfo);
×
2682
      mndReleaseSma(pMnode, pSma);
×
2683
      if (code) goto _OVER;
×
UNCOV
2684
      if (NULL == taosArrayPush(hbRsp.pTsmas, &pTsmaInfo)) {
×
2685
        code = terrno;
×
UNCOV
2686
        tFreeAndClearTableTSMAInfo(pTsmaInfo);
×
UNCOV
2687
        goto _OVER;
×
2688
      }
UNCOV
2689
      continue;
×
2690
    } else if (pSma->version == pTsmaVer->version) {
449!
2691
      mndReleaseSma(pMnode, pSma);
449✔
2692
      continue;
449✔
2693
    }
2694

2695
    SStbObj *pDestStb = mndAcquireStb(pMnode, pSma->dstTbName);
×
2696
    if (!pDestStb) {
×
2697
      mInfo("tsma: %s.%" PRIx64 " dest stb: %s not found, maybe dropped", tsmaFName, pTsmaVer->tsmaId, pSma->dstTbName);
×
2698
      code = mkNonExistTSMAInfo(pTsmaVer, &pTsmaInfo);
×
2699
      mndReleaseSma(pMnode, pSma);
×
2700
      if (code) goto _OVER;
×
UNCOV
2701
      if (NULL == taosArrayPush(hbRsp.pTsmas, &pTsmaInfo)) {
×
2702
        code = terrno;
×
UNCOV
2703
        tFreeAndClearTableTSMAInfo(pTsmaInfo);
×
UNCOV
2704
        goto _OVER;
×
2705
      }
2706
      continue;
×
2707
    }
2708

2709
    // dump smaObj into rsp
2710
    STableTSMAInfo *pInfo = NULL;
×
2711
    pInfo = taosMemoryCalloc(1, sizeof(STableTSMAInfo));
×
2712
    if (!pInfo) {
×
UNCOV
2713
      code = terrno;
×
UNCOV
2714
      mndReleaseSma(pMnode, pSma);
×
2715
      mndReleaseStb(pMnode, pDestStb);
×
2716
      goto _OVER;
×
2717
    }
2718

2719
    SSmaObj *pBaseSma = NULL;
×
2720
    code = mndGetDeepestBaseForTsma(pMnode, pSma, &pBaseSma);
×
2721
    if (code == 0) code = dumpTSMAInfoFromSmaObj(pSma, pDestStb, pInfo, pBaseSma);
×
2722

2723
    mndReleaseStb(pMnode, pDestStb);
×
2724
    mndReleaseSma(pMnode, pSma);
×
UNCOV
2725
    if (pBaseSma) mndReleaseSma(pMnode, pBaseSma);
×
UNCOV
2726
    if (terrno) {
×
2727
      tFreeAndClearTableTSMAInfo(pInfo);
×
2728
      goto _OVER;
×
2729
    }
2730

UNCOV
2731
    if (NULL == taosArrayPush(hbRsp.pTsmas, pInfo)) {
×
UNCOV
2732
      code = terrno;
×
UNCOV
2733
      tFreeAndClearTableTSMAInfo(pInfo);
×
UNCOV
2734
      goto _OVER;
×
2735
    }
2736
  }
2737

2738
  rspLen = tSerializeTSMAHbRsp(NULL, 0, &hbRsp);
409✔
2739
  if (rspLen < 0) {
409!
UNCOV
2740
    code = terrno;
×
UNCOV
2741
    goto _OVER;
×
2742
  }
2743

2744
  pRsp = taosMemoryMalloc(rspLen);
409!
2745
  if (!pRsp) {
409!
UNCOV
2746
    code = terrno;
×
UNCOV
2747
    rspLen = 0;
×
UNCOV
2748
    goto _OVER;
×
2749
  }
2750

2751
  rspLen = tSerializeTSMAHbRsp(pRsp, rspLen, &hbRsp);
409✔
2752
  if (rspLen < 0) {
409!
UNCOV
2753
    code = terrno;
×
UNCOV
2754
    goto _OVER;
×
2755
  }
2756
  code = 0;
409✔
2757
_OVER:
409✔
2758
  tFreeTSMAHbRsp(&hbRsp);
409✔
2759
  *ppRsp = pRsp;
409✔
2760
  *pRspLen = rspLen;
409✔
2761
  TAOS_RETURN(code);
409✔
2762
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc