• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3530

16 Nov 2024 07:44AM UTC coverage: 60.219% (-0.7%) from 60.888%
#3530

push

travis-ci

web-flow
Update 03-ad.md

118417 of 252124 branches covered (46.97%)

Branch coverage included in aggregate %.

198982 of 274951 relevant lines covered (72.37%)

6072359.98 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

62.7
/source/dnode/mnode/impl/src/mndSma.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mndSma.h"
18
#include "mndDb.h"
19
#include "mndDnode.h"
20
#include "mndIndex.h"
21
#include "mndIndexComm.h"
22
#include "mndInfoSchema.h"
23
#include "mndMnode.h"
24
#include "mndPrivilege.h"
25
#include "mndScheduler.h"
26
#include "mndShow.h"
27
#include "mndStb.h"
28
#include "mndStream.h"
29
#include "mndTrans.h"
30
#include "mndUser.h"
31
#include "mndVgroup.h"
32
#include "parser.h"
33
#include "tname.h"
34
#include "functionMgt.h"
35

36
#define TSDB_SMA_VER_NUMBER   1
37
#define TSDB_SMA_RESERVE_SIZE 64
38

39
static SSdbRaw *mndSmaActionEncode(SSmaObj *pSma);
40
static SSdbRow *mndSmaActionDecode(SSdbRaw *pRaw);
41
static int32_t  mndSmaActionInsert(SSdb *pSdb, SSmaObj *pSma);
42
static int32_t  mndSmaActionDelete(SSdb *pSdb, SSmaObj *pSpSmatb);
43
static int32_t  mndSmaActionUpdate(SSdb *pSdb, SSmaObj *pOld, SSmaObj *pNew);
44
static int32_t  mndProcessCreateSmaReq(SRpcMsg *pReq);
45
static int32_t  mndProcessDropSmaReq(SRpcMsg *pReq);
46
static int32_t  mndProcessGetSmaReq(SRpcMsg *pReq);
47
static int32_t  mndProcessGetTbSmaReq(SRpcMsg *pReq);
48
static int32_t  mndRetrieveSma(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
49
static void     mndDestroySmaObj(SSmaObj *pSmaObj);
50

51
static int32_t mndProcessCreateTSMAReq(SRpcMsg* pReq);
52
static int32_t mndProcessDropTSMAReq(SRpcMsg* pReq);
53

54
// sma and tag index comm func
55
static int32_t mndProcessDropIdxReq(SRpcMsg *pReq);
56
static int32_t mndRetrieveIdx(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
57
static void    mndCancelRetrieveIdx(SMnode *pMnode, void *pIter);
58

59
static int32_t mndRetrieveTSMA(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
60
static void    mndCancelRetrieveTSMA(SMnode *pMnode, void *pIter);
61
static int32_t mndProcessGetTbTSMAReq(SRpcMsg *pReq);
62

63
typedef struct SCreateTSMACxt {
64
  SMnode *       pMnode;
65
  const SRpcMsg *pRpcReq;
66
  union {
67
    const SMCreateSmaReq *pCreateSmaReq;
68
    const SMDropSmaReq *  pDropSmaReq;
69
  };
70
  SDbObj             *pDb;
71
  SStbObj            *pSrcStb;
72
  SSmaObj            *pSma;
73
  const SSmaObj      *pBaseSma;
74
  SCMCreateStreamReq *pCreateStreamReq;
75
  SMDropStreamReq    *pDropStreamReq;
76
  const char         *streamName;
77
  const char         *targetStbFullName;
78
  SNodeList          *pProjects;
79
} SCreateTSMACxt;
80

81
/*
 * Register the SMA module with the mnode: message handlers, show-retrieve
 * callbacks, and the SDB table descriptor for SDB_SMA rows.
 *
 * Returns the result of sdbSetTable().
 */
int32_t mndInitSma(SMnode *pMnode) {
  SSdbTable smaTable = {
      .sdbType = SDB_SMA,
      .keyType = SDB_KEY_BINARY,
      .encodeFp = (SdbEncodeFp)mndSmaActionEncode,
      .decodeFp = (SdbDecodeFp)mndSmaActionDecode,
      .insertFp = (SdbInsertFp)mndSmaActionInsert,
      .updateFp = (SdbUpdateFp)mndSmaActionUpdate,
      .deleteFp = (SdbDeleteFp)mndSmaActionDelete,
  };

  // SMA / index messages
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_SMA, mndProcessCreateSmaReq);
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_SMA, mndProcessDropIdxReq);
  mndSetMsgHandle(pMnode, TDMT_VND_CREATE_SMA_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_DROP_SMA_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_GET_INDEX, mndProcessGetSmaReq);
  mndSetMsgHandle(pMnode, TDMT_MND_GET_TABLE_INDEX, mndProcessGetTbSmaReq);

  // "show indexes" style retrieval
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_INDEX, mndRetrieveIdx);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_INDEX, mndCancelRetrieveIdx);

  // TSMA messages; both GET_TABLE_TSMA and GET_TSMA are served by the same handler
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_TSMA, mndProcessCreateTSMAReq);
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_TSMA, mndProcessDropTSMAReq);
  mndSetMsgHandle(pMnode, TDMT_MND_GET_TABLE_TSMA, mndProcessGetTbTSMAReq);
  mndSetMsgHandle(pMnode, TDMT_MND_GET_TSMA, mndProcessGetTbTSMAReq);
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TSMAS, mndRetrieveTSMA);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TSMAS, mndCancelRetrieveTSMA);

  return sdbSetTable(pMnode->pSdb, smaTable);
}
111

112
/* Counterpart of mndInitSma(); intentionally does nothing. */
void mndCleanupSma(SMnode *pMnode) {
  (void)pMnode;
}
1,956✔
113

114
/*
 * Serialize an SMA object into a newly allocated SDB raw record.
 *
 * Layout: fixed-size names and scalars, the four length fields, the optional
 * variable-length payloads (expr, tagsFilter, sql, ast) in that order, the base
 * SMA name, then TSDB_SMA_RESERVE_SIZE reserved bytes.
 *
 * Returns the raw record, or NULL with terrno set on failure (the partially
 * written record is freed).
 */
static SSdbRaw *mndSmaActionEncode(SSmaObj *pSma) {
  // NOTE(review): code/lino are not used directly here; presumably the SDB_SET_* macros reference them — confirm.
  int32_t code = 0;
  int32_t lino = 0;

  // Pessimistic default: stays set if any step jumps to _OVER.
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  int32_t size =
      sizeof(SSmaObj) + pSma->exprLen + pSma->tagsFilterLen + pSma->sqlLen + pSma->astLen + TSDB_SMA_RESERVE_SIZE;
  SSdbRaw *pRaw = sdbAllocRaw(SDB_SMA, TSDB_SMA_VER_NUMBER, size);
  if (!pRaw) {
    goto _OVER;
  }

  int32_t dataPos = 0;

  // names
  SDB_SET_BINARY(pRaw, dataPos, pSma->name, TSDB_TABLE_FNAME_LEN, _OVER)
  SDB_SET_BINARY(pRaw, dataPos, pSma->stb, TSDB_TABLE_FNAME_LEN, _OVER)
  SDB_SET_BINARY(pRaw, dataPos, pSma->db, TSDB_DB_FNAME_LEN, _OVER)
  SDB_SET_BINARY(pRaw, dataPos, pSma->dstTbName, TSDB_TABLE_FNAME_LEN, _OVER)

  // identifiers and timestamps
  SDB_SET_INT64(pRaw, dataPos, pSma->createdTime, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pSma->uid, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pSma->stbUid, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pSma->dbUid, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pSma->dstTbUid, _OVER)

  // window definition
  SDB_SET_INT8(pRaw, dataPos, pSma->intervalUnit, _OVER)
  SDB_SET_INT8(pRaw, dataPos, pSma->slidingUnit, _OVER)
  SDB_SET_INT8(pRaw, dataPos, pSma->timezone, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pSma->dstVgId, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pSma->interval, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pSma->offset, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pSma->sliding, _OVER)

  // payload lengths and object version
  SDB_SET_INT32(pRaw, dataPos, pSma->exprLen, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pSma->tagsFilterLen, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pSma->sqlLen, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pSma->astLen, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pSma->version, _OVER)

  // optional payloads, written only when non-empty
  if (pSma->exprLen > 0) {
    SDB_SET_BINARY(pRaw, dataPos, pSma->expr, pSma->exprLen, _OVER)
  }
  if (pSma->tagsFilterLen > 0) {
    SDB_SET_BINARY(pRaw, dataPos, pSma->tagsFilter, pSma->tagsFilterLen, _OVER)
  }
  if (pSma->sqlLen > 0) {
    SDB_SET_BINARY(pRaw, dataPos, pSma->sql, pSma->sqlLen, _OVER)
  }
  if (pSma->astLen > 0) {
    SDB_SET_BINARY(pRaw, dataPos, pSma->ast, pSma->astLen, _OVER)
  }
  SDB_SET_BINARY(pRaw, dataPos, pSma->baseSmaName, TSDB_TABLE_FNAME_LEN, _OVER)

  SDB_SET_RESERVE(pRaw, dataPos, TSDB_SMA_RESERVE_SIZE, _OVER)
  SDB_SET_DATALEN(pRaw, dataPos, _OVER)

  terrno = 0;

_OVER:
  if (terrno == 0) {
    mTrace("sma:%s, encode to raw:%p, row:%p", pSma->name, pRaw, pSma);
    return pRaw;
  }

  mError("sma:%s, failed to encode to raw:%p since %s", pSma->name, pRaw, terrstr());
  sdbFreeRaw(pRaw);
  return NULL;
}
176

177
/*
 * Rebuild an SMA row from an SDB raw record; the inverse of mndSmaActionEncode().
 *
 * Rejects records whose soft version differs from TSDB_SMA_VER_NUMBER. The
 * variable-length payloads (expr, tagsFilter, sql, ast) are heap-allocated only
 * when their stored length is positive.
 *
 * Returns the row, or NULL with terrno set; on failure every payload allocated
 * so far and the row itself are freed.
 */
static SSdbRow *mndSmaActionDecode(SSdbRaw *pRaw) {
  // NOTE(review): code/lino are not used directly here; presumably the SDB_GET_* macros reference them — confirm.
  int32_t code = 0;
  int32_t lino = 0;
  SSdbRow *pRow = NULL;
  SSmaObj *pSma = NULL;
  int8_t   sver = 0;

  // Pessimistic default: stays set if any step jumps to _OVER without a more specific code.
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  if (sdbGetRawSoftVer(pRaw, &sver) != 0) {
    goto _OVER;
  }
  if (sver != TSDB_SMA_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto _OVER;
  }

  pRow = sdbAllocRow(sizeof(SSmaObj));
  if (!pRow) {
    goto _OVER;
  }

  pSma = sdbGetRowObj(pRow);
  if (!pSma) {
    goto _OVER;
  }

  int32_t dataPos = 0;

  // names
  SDB_GET_BINARY(pRaw, dataPos, pSma->name, TSDB_TABLE_FNAME_LEN, _OVER)
  SDB_GET_BINARY(pRaw, dataPos, pSma->stb, TSDB_TABLE_FNAME_LEN, _OVER)
  SDB_GET_BINARY(pRaw, dataPos, pSma->db, TSDB_DB_FNAME_LEN, _OVER)
  SDB_GET_BINARY(pRaw, dataPos, pSma->dstTbName, TSDB_TABLE_FNAME_LEN, _OVER)

  // identifiers and timestamps
  SDB_GET_INT64(pRaw, dataPos, &pSma->createdTime, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pSma->uid, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pSma->stbUid, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pSma->dbUid, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pSma->dstTbUid, _OVER)

  // window definition
  SDB_GET_INT8(pRaw, dataPos, &pSma->intervalUnit, _OVER)
  SDB_GET_INT8(pRaw, dataPos, &pSma->slidingUnit, _OVER)
  SDB_GET_INT8(pRaw, dataPos, &pSma->timezone, _OVER)
  SDB_GET_INT32(pRaw, dataPos, &pSma->dstVgId, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pSma->interval, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pSma->offset, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pSma->sliding, _OVER)

  // payload lengths and object version
  SDB_GET_INT32(pRaw, dataPos, &pSma->exprLen, _OVER)
  SDB_GET_INT32(pRaw, dataPos, &pSma->tagsFilterLen, _OVER)
  SDB_GET_INT32(pRaw, dataPos, &pSma->sqlLen, _OVER)
  SDB_GET_INT32(pRaw, dataPos, &pSma->astLen, _OVER)
  SDB_GET_INT32(pRaw, dataPos, &pSma->version, _OVER)

  // optional payloads, in the same order the encoder wrote them
  if (pSma->exprLen > 0) {
    pSma->expr = taosMemoryCalloc(pSma->exprLen, 1);
    if (!pSma->expr) {
      goto _OVER;
    }
    SDB_GET_BINARY(pRaw, dataPos, pSma->expr, pSma->exprLen, _OVER)
  }

  if (pSma->tagsFilterLen > 0) {
    pSma->tagsFilter = taosMemoryCalloc(pSma->tagsFilterLen, 1);
    if (!pSma->tagsFilter) {
      goto _OVER;
    }
    SDB_GET_BINARY(pRaw, dataPos, pSma->tagsFilter, pSma->tagsFilterLen, _OVER)
  }

  if (pSma->sqlLen > 0) {
    pSma->sql = taosMemoryCalloc(pSma->sqlLen, 1);
    if (!pSma->sql) {
      goto _OVER;
    }
    SDB_GET_BINARY(pRaw, dataPos, pSma->sql, pSma->sqlLen, _OVER)
  }

  if (pSma->astLen > 0) {
    pSma->ast = taosMemoryCalloc(pSma->astLen, 1);
    if (!pSma->ast) {
      goto _OVER;
    }
    SDB_GET_BINARY(pRaw, dataPos, pSma->ast, pSma->astLen, _OVER)
  }
  SDB_GET_BINARY(pRaw, dataPos, pSma->baseSmaName, TSDB_TABLE_FNAME_LEN, _OVER)

  SDB_GET_RESERVE(pRaw, dataPos, TSDB_SMA_RESERVE_SIZE, _OVER)

  terrno = 0;

_OVER:
  if (terrno == 0) {
    mTrace("sma:%s, decode from raw:%p, row:%p", pSma->name, pRaw, pSma);
    return pRow;
  }

  if (pSma != NULL) {
    mError("sma:%s, failed to decode from raw:%p since %s", pSma->name, pRaw, terrstr());
    taosMemoryFreeClear(pSma->expr);
    taosMemoryFreeClear(pSma->tagsFilter);
    taosMemoryFreeClear(pSma->sql);
    taosMemoryFreeClear(pSma->ast);
  }
  taosMemoryFreeClear(pRow);
  return NULL;
}
267

268
/* SDB insert callback for SMA rows: only traces the event. Always returns 0. */
static int32_t mndSmaActionInsert(SSdb *pSdb, SSmaObj *pSma) {
  (void)pSdb;
  mTrace("sma:%s, perform insert action, row:%p", pSma->name, pSma);
  return 0;
}
272

273
/*
 * SDB delete callback for SMA rows: traces the event and releases the four
 * heap-allocated payloads owned by the row. Always returns 0.
 */
static int32_t mndSmaActionDelete(SSdb *pSdb, SSmaObj *pSma) {
  (void)pSdb;
  mTrace("sma:%s, perform delete action, row:%p", pSma->name, pSma);

  taosMemoryFreeClear(pSma->expr);
  taosMemoryFreeClear(pSma->tagsFilter);
  taosMemoryFreeClear(pSma->sql);
  taosMemoryFreeClear(pSma->ast);

  return 0;
}
281

282
/* SDB update callback for SMA rows: only traces the event; nothing is copied from pNew. Always returns 0. */
static int32_t mndSmaActionUpdate(SSdb *pSdb, SSmaObj *pOld, SSmaObj *pNew) {
  (void)pSdb;
  mTrace("sma:%s, perform update action, old row:%p new row:%p", pOld->name, pOld, pNew);
  return 0;
}
286

287
/*
 * Look up an SMA object by name in the SDB.
 *
 * Returns the acquired object, or NULL. When the lookup fails with
 * TSDB_CODE_SDB_OBJ_NOT_THERE, terrno is rewritten to TSDB_CODE_MND_SMA_NOT_EXIST;
 * any other terrno value is left untouched. Pair with mndReleaseSma().
 */
SSmaObj *mndAcquireSma(SMnode *pMnode, char *smaName) {
  SSmaObj *pFound = sdbAcquire(pMnode->pSdb, SDB_SMA, smaName);
  if (pFound != NULL) {
    return pFound;
  }

  if (terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
    terrno = TSDB_CODE_MND_SMA_NOT_EXIST;
  }
  return NULL;
}
295

296
/* Hand an SMA object obtained from mndAcquireSma() back to the SDB via sdbRelease(). */
void mndReleaseSma(SMnode *pMnode, SSmaObj *pSma) {
  sdbRelease(pMnode->pSdb, pSma);
}
1,569✔
300

301
/* Thin wrapper: forwards directly to mndAcquireDb() with the given database name. */
SDbObj *mndAcquireDbBySma(SMnode *pMnode, const char *db) {
  SDbObj *pDb = mndAcquireDb(pMnode, db);
  return pDb;
}
305

306
/*
 * Build the vnode-side "create TSMA" message for pSma, addressed to pVgroup.
 *
 * The returned buffer is an SMsgHead (contLen and vgId in network byte order)
 * followed by the encoded SVCreateTSmaReq. On success *pContLen receives the
 * total buffer length, header included, and the caller owns the buffer.
 *
 * Returns NULL on failure. terrno is set when the SMA name cannot be parsed
 * (matching mndBuildVDropSmaReq) and when the buffer cannot be allocated.
 * pMnode is not used.
 */
static void *mndBuildVCreateSmaReq(SMnode *pMnode, SVgObj *pVgroup, SSmaObj *pSma, int32_t *pContLen) {
  SEncoder encoder = {0};
  int32_t  contLen = 0;
  SName    name = {0};
  int32_t  code = tNameFromString(&name, pSma->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
  if (TSDB_CODE_SUCCESS != code) {
    // Callers derive their error code from terrno when this returns NULL, so publish the real cause.
    terrno = code;
    return NULL;
  }

  // The request borrows expr/tagsFilter/schema pointers from pSma; nothing is copied here.
  SVCreateTSmaReq req = {0};
  req.version = 0;
  req.intervalUnit = pSma->intervalUnit;
  req.slidingUnit = pSma->slidingUnit;
  req.timezoneInt = pSma->timezone;
  tstrncpy(req.indexName, (char *)tNameGetTableName(&name), TSDB_INDEX_NAME_LEN);
  req.exprLen = pSma->exprLen;
  req.tagsFilterLen = pSma->tagsFilterLen;
  req.indexUid = pSma->uid;
  req.tableUid = pSma->stbUid;
  req.dstVgId = pSma->dstVgId;
  req.dstTbUid = pSma->dstTbUid;
  req.interval = pSma->interval;
  req.offset = pSma->offset;
  req.sliding = pSma->sliding;
  req.expr = pSma->expr;
  req.tagsFilter = pSma->tagsFilter;
  req.schemaRow = pSma->schemaRow;
  req.schemaTag = pSma->schemaTag;
  req.dstTbName = pSma->dstTbName;

  // First pass: measure the encoded size.
  int32_t ret = 0;
  tEncodeSize(tEncodeSVCreateTSmaReq, &req, contLen, ret);
  if (ret < 0) {
    return NULL;
  }
  contLen += sizeof(SMsgHead);

  SMsgHead *pHead = taosMemoryMalloc(contLen);
  if (pHead == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pHead->contLen = htonl(contLen);
  pHead->vgId = htonl(pVgroup->vgId);

  // Second pass: encode the body right after the header.
  void *pBuf = POINTER_SHIFT(pHead, sizeof(SMsgHead));
  tEncoderInit(&encoder, pBuf, contLen - sizeof(SMsgHead));
  if (tEncodeSVCreateTSmaReq(&encoder, &req) < 0) {
    taosMemoryFreeClear(pHead);
    tEncoderClear(&encoder);
    return NULL;
  }

  tEncoderClear(&encoder);

  *pContLen = contLen;
  return pHead;
}
366

367
/*
 * Build the vnode-side "drop TSMA" message for pSma, addressed to pVgroup.
 *
 * The returned buffer is an SMsgHead (contLen and vgId in network byte order)
 * followed by the encoded SVDropTSmaReq, which carries only the index uid and
 * the table-name part of the SMA name. On success *pContLen receives the total
 * length and the caller owns the buffer. Returns NULL on failure; terrno is set
 * for name-parse and allocation failures. pMnode is not used.
 */
static void *mndBuildVDropSmaReq(SMnode *pMnode, SVgObj *pVgroup, SSmaObj *pSma, int32_t *pContLen) {
  SEncoder encoder = {0};
  int32_t  contLen;
  SName    name = {0};

  int32_t code = tNameFromString(&name, pSma->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    return NULL;
  }

  SVDropTSmaReq req = {0};
  req.indexUid = pSma->uid;
  tstrncpy(req.indexName, (char *)tNameGetTableName(&name), TSDB_INDEX_NAME_LEN);

  // First pass: measure the encoded size.
  int32_t ret = 0;
  tEncodeSize(tEncodeSVDropTSmaReq, &req, contLen, ret);
  if (ret < 0) {
    return NULL;
  }
  contLen += sizeof(SMsgHead);

  SMsgHead *pHead = taosMemoryMalloc(contLen);
  if (!pHead) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pHead->contLen = htonl(contLen);
  pHead->vgId = htonl(pVgroup->vgId);

  // Second pass: encode the body right after the header.
  void *pBuf = POINTER_SHIFT(pHead, sizeof(SMsgHead));
  tEncoderInit(&encoder, pBuf, contLen - sizeof(SMsgHead));
  if (tEncodeSVDropTSmaReq(&encoder, &req) < 0) {
    taosMemoryFreeClear(pHead);
    tEncoderClear(&encoder);
    return NULL;
  }
  tEncoderClear(&encoder);

  *pContLen = contLen;
  return pHead;
}
412

413
/*
 * Encode pSma, append the raw record to the transaction's redo log, then mark
 * the record SDB_STATUS_CREATING.
 *
 * Returns 0 on success. If encoding yields NULL the result is terrno when it is
 * non-zero, otherwise TSDB_CODE_MND_RETURN_VALUE_NULL; append/status failures
 * are returned as-is.
 * NOTE(review): the raw record is not freed here if the append fails — confirm
 * who owns it in that case.
 */
static int32_t mndSetCreateSmaRedoLogs(SMnode *pMnode, STrans *pTrans, SSmaObj *pSma) {
  int32_t  code = 0;
  SSdbRaw *pRaw = mndSmaActionEncode(pSma);
  if (!pRaw) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  TAOS_CHECK_RETURN(mndTransAppendRedolog(pTrans, pRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRaw, SDB_STATUS_CREATING));

  TAOS_RETURN(code);
}
426

427
/*
 * Encode pSma, append the raw record to the transaction's undo log, then mark
 * the record SDB_STATUS_DROPPED.
 *
 * Returns 0 on success. If encoding yields NULL the result is terrno when it is
 * non-zero, otherwise TSDB_CODE_MND_RETURN_VALUE_NULL; append/status failures
 * are returned as-is.
 */
static int32_t mndSetCreateSmaUndoLogs(SMnode *pMnode, STrans *pTrans, SSmaObj *pSma) {
  int32_t  code = 0;
  SSdbRaw *pRaw = mndSmaActionEncode(pSma);
  if (pRaw == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  TAOS_CHECK_RETURN(mndTransAppendUndolog(pTrans, pRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED));

  TAOS_RETURN(code);
}
439

440
/*
 * Encode pSma, append the raw record to the transaction's commit log, then mark
 * the record SDB_STATUS_READY.
 *
 * Returns 0 on success. If encoding yields NULL the result is terrno when it is
 * non-zero, otherwise TSDB_CODE_MND_RETURN_VALUE_NULL; append/status failures
 * are returned as-is.
 */
static int32_t mndSetCreateSmaCommitLogs(SMnode *pMnode, STrans *pTrans, SSmaObj *pSma) {
  int32_t  code = 0;
  SSdbRaw *pRaw = mndSmaActionEncode(pSma);
  if (!pRaw) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRaw, SDB_STATUS_READY));

  TAOS_RETURN(code);
}
453

454
/*
 * Encode pVgroup, append the raw record to the transaction's redo log, then
 * mark the record SDB_STATUS_UPDATE.
 *
 * Returns 0 on success. If encoding yields NULL the result is terrno when it is
 * non-zero, otherwise TSDB_CODE_MND_RETURN_VALUE_NULL; append/status failures
 * are returned as-is.
 */
static int32_t mndSetCreateSmaVgroupRedoLogs(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup) {
  int32_t  code = 0;
  SSdbRaw *pRaw = mndVgroupActionEncode(pVgroup);
  if (!pRaw) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  TAOS_CHECK_RETURN(mndTransAppendRedolog(pTrans, pRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRaw, SDB_STATUS_UPDATE));

  TAOS_RETURN(code);
}
466

467
/*
 * Encode pVgroup, append the raw record to the transaction's commit log, then
 * mark the record SDB_STATUS_READY.
 *
 * Returns 0 on success. If encoding yields NULL the result is terrno when it is
 * non-zero, otherwise TSDB_CODE_MND_RETURN_VALUE_NULL; append/status failures
 * are returned as-is.
 */
static int32_t mndSetCreateSmaVgroupCommitLogs(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup) {
  int32_t  code = 0;
  SSdbRaw *pRaw = mndVgroupActionEncode(pVgroup);
  if (!pRaw) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRaw, SDB_STATUS_READY));

  TAOS_RETURN(code);
}
479

480
/*
 * Append a commit-log record for the source super table with its smaVer bumped
 * by one and updateTime refreshed.
 *
 * Works on a shallow copy of *pStb taken under the read latch. In the copy the
 * column/tag/func arrays are detached (counts zeroed, pointers NULLed) and the
 * lock field is reset before encoding. The record is marked SDB_STATUS_READY.
 *
 * Returns 0 on success. If encoding yields NULL the result is terrno when it is
 * non-zero, otherwise TSDB_CODE_MND_RETURN_VALUE_NULL; append/status failures
 * are returned as-is.
 */
static int32_t mndSetUpdateSmaStbCommitLogs(SMnode *pMnode, STrans *pTrans, SStbObj *pStb) {
  int32_t code = 0;
  SStbObj stbCopy = {0};

  taosRLockLatch(&pStb->lock);
  memcpy(&stbCopy, pStb, sizeof(SStbObj));
  taosRUnLockLatch(&pStb->lock);

  // Detach the arrays so the copy does not alias the live object's schema.
  stbCopy.numOfColumns = 0;
  stbCopy.pColumns = NULL;
  stbCopy.numOfTags = 0;
  stbCopy.pTags = NULL;
  stbCopy.numOfFuncs = 0;
  stbCopy.pFuncs = NULL;
  stbCopy.updateTime = taosGetTimestampMs();
  stbCopy.lock = 0;
  stbCopy.smaVer += 1;

  SSdbRaw *pRaw = mndStbActionEncode(&stbCopy);
  if (!pRaw) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRaw, SDB_STATUS_READY));

  TAOS_RETURN(code);
}
507

508
static int32_t mndSetCreateSmaVgroupRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
30✔
509
                                                SSmaObj *pSma) {
510
  int32_t    code = 0;
30✔
511
  SVnodeGid *pVgid = pVgroup->vnodeGid + 0;
30✔
512
  SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
30✔
513
  if (pDnode == NULL) {
30!
514
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
515
    if (terrno != 0) code = terrno;
×
516
    TAOS_RETURN(code);
×
517
  }
518

519
  STransAction action = {0};
30✔
520
  action.epSet = mndGetDnodeEpset(pDnode);
30✔
521
  mndReleaseDnode(pMnode, pDnode);
30✔
522

523
  // todo add sma info here
524
  SNode *pAst = NULL;
30✔
525
  TAOS_CHECK_RETURN(nodesStringToNode(pSma->ast, &pAst));
30!
526
  if ((code = qExtractResultSchema(pAst, &pSma->schemaRow.nCols, &pSma->schemaRow.pSchema)) != 0) {
30!
527
    nodesDestroyNode(pAst);
×
528
    TAOS_RETURN(code);
×
529
  }
530
  nodesDestroyNode(pAst);
30✔
531
  pSma->schemaRow.version = 1;
30✔
532

533
  // TODO: the schemaTag generated by qExtractResultXXX later.
534
  pSma->schemaTag.nCols = 1;
30✔
535
  pSma->schemaTag.version = 1;
30✔
536
  pSma->schemaTag.pSchema = taosMemoryCalloc(1, sizeof(SSchema));
30✔
537
  if (!pSma->schemaTag.pSchema) {
30!
538
    TAOS_RETURN(-1);
×
539
  }
540
  pSma->schemaTag.pSchema[0].type = TSDB_DATA_TYPE_BIGINT;
30✔
541
  pSma->schemaTag.pSchema[0].bytes = TYPE_BYTES[TSDB_DATA_TYPE_BIGINT];
30✔
542
  pSma->schemaTag.pSchema[0].colId = pSma->schemaRow.nCols + PRIMARYKEY_TIMESTAMP_COL_ID;
30✔
543
  pSma->schemaTag.pSchema[0].flags = 0;
30✔
544
  snprintf(pSma->schemaTag.pSchema[0].name, TSDB_COL_NAME_LEN, "groupId");
30✔
545

546
  int32_t smaContLen = 0;
30✔
547
  void   *pSmaReq = mndBuildVCreateSmaReq(pMnode, pVgroup, pSma, &smaContLen);
30✔
548
  if (pSmaReq == NULL) {
30!
549
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
550
    if (terrno != 0) code = terrno;
×
551
    TAOS_RETURN(code);
×
552
  }
553
  pVgroup->pTsma = pSmaReq;
30✔
554

555
  int32_t contLen = 0;
30✔
556
  void   *pReq = mndBuildCreateVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen);
30✔
557
  if (pReq == NULL) {
30!
558
    taosMemoryFreeClear(pSmaReq);
×
559
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
560
    if (terrno != 0) code = terrno;
×
561
    TAOS_RETURN(code);
×
562
  }
563

564
  action.mTraceId = pTrans->mTraceId;
30✔
565
  action.pCont = pReq;
30✔
566
  action.contLen = contLen;
30✔
567
  action.msgType = TDMT_DND_CREATE_VNODE;
30✔
568
  action.acceptableCode = TSDB_CODE_VND_ALREADY_EXIST;
30✔
569

570
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
30!
571
    taosMemoryFreeClear(pSmaReq);
×
572
    taosMemoryFree(pReq);
×
573
    TAOS_RETURN(code);
×
574
  }
575

576
  action.pCont = pSmaReq;
30✔
577
  action.contLen = smaContLen;
30✔
578
  action.msgType = TDMT_VND_CREATE_SMA;
30✔
579
  action.acceptableCode = TSDB_CODE_TSMA_ALREADY_EXIST;
30✔
580

581
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
30!
582
    taosMemoryFreeClear(pSmaReq);
×
583
    TAOS_RETURN(code);
×
584
  }
585

586
  TAOS_RETURN(code);
30✔
587
}
588

589
/*
 * Release the row and tag schema arrays attached to an SMA object.
 * Accepts NULL. The object itself and its other members are not freed.
 */
static void mndDestroySmaObj(SSmaObj *pSmaObj) {
  if (pSmaObj == NULL) {
    return;
  }

  taosMemoryFreeClear(pSmaObj->schemaRow.pSchema);
  taosMemoryFreeClear(pSmaObj->schemaTag.pSchema);
}
30✔
595

596
/*
 * Create an SMA on super table pStb in database pDb, together with the stream
 * that feeds it, as one "create-sma" transaction.
 *
 * Steps:
 *   1. reject databases with more than one replica;
 *   2. fill a local SSmaObj from the request (payload pointers are borrowed
 *      from pCreate, not copied);
 *   3. fill a local SStreamObj (sql/ast are duplicated), clamp the trigger
 *      delay, and allocate the sink vgroup;
 *   4. parse the AST, extract the output schema, and serialize the physical plan;
 *   5. create the transaction, append logs/actions, schedule and persist the
 *      stream, and prepare the transaction.
 *
 * Every failure after the stream's sql/ast have been duplicated leaves through
 * _OVER, so the stream object, the SMA schemas, the parsed AST, the query plan
 * and the transaction are released on all of those paths.
 *
 * Returns 0 on success, otherwise an error code.
 */
static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCreate, SDbObj *pDb, SStbObj *pStb,
                            const char *streamName) {
  int32_t     code = 0;
  STrans     *pTrans = NULL;
  SNode      *pAst = NULL;
  SQueryPlan *pPlan = NULL;

  if (pDb->cfg.replications > 1) {
    code = TSDB_CODE_MND_INVALID_SMA_OPTION;
    mError("sma:%s, failed to create since not support multiple replicas", pCreate->name);
    TAOS_RETURN(code);
  }

  SSmaObj smaObj = {0};
  memcpy(smaObj.name, pCreate->name, TSDB_TABLE_FNAME_LEN);
  memcpy(smaObj.stb, pStb->name, TSDB_TABLE_FNAME_LEN);
  memcpy(smaObj.db, pDb->name, TSDB_DB_FNAME_LEN);
  smaObj.createdTime = taosGetTimestampMs();
  smaObj.uid = mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN);

  // Result table name is the SMA name plus a fixed suffix, cut to the name field width.
  char resultTbName[TSDB_TABLE_FNAME_LEN + 16] = {0};
  snprintf(resultTbName, TSDB_TABLE_FNAME_LEN + 16, "%s_td_tsma_rst_tb", pCreate->name);
  memcpy(smaObj.dstTbName, resultTbName, TSDB_TABLE_FNAME_LEN);
  smaObj.dstTbUid = mndGenerateUid(smaObj.dstTbName, TSDB_TABLE_FNAME_LEN);
  smaObj.stbUid = pStb->uid;
  smaObj.dbUid = pStb->dbUid;
  smaObj.intervalUnit = pCreate->intervalUnit;
  smaObj.slidingUnit = pCreate->slidingUnit;
  smaObj.timezone = tsTimezone;  // use timezone of server
  smaObj.interval = pCreate->interval;
  smaObj.offset = pCreate->offset;
  smaObj.sliding = pCreate->sliding;
  smaObj.exprLen = pCreate->exprLen;
  smaObj.tagsFilterLen = pCreate->tagsFilterLen;
  smaObj.sqlLen = pCreate->sqlLen;
  smaObj.astLen = pCreate->astLen;
  if (smaObj.exprLen > 0) {
    smaObj.expr = pCreate->expr;
  }
  if (smaObj.tagsFilterLen > 0) {
    smaObj.tagsFilter = pCreate->tagsFilter;
  }
  if (smaObj.sqlLen > 0) {
    smaObj.sql = pCreate->sql;
  }
  if (smaObj.astLen > 0) {
    smaObj.ast = pCreate->ast;
  }

  SStreamObj streamObj = {0};
  tstrncpy(streamObj.name, streamName, TSDB_STREAM_FNAME_LEN);
  tstrncpy(streamObj.sourceDb, pDb->name, TSDB_DB_FNAME_LEN);
  tstrncpy(streamObj.targetDb, streamObj.sourceDb, TSDB_DB_FNAME_LEN);
  streamObj.createTime = taosGetTimestampMs();
  streamObj.updateTime = streamObj.createTime;
  streamObj.uid = mndGenerateUid(streamName, strlen(streamName));
  streamObj.sourceDbUid = pDb->uid;
  streamObj.targetDbUid = pDb->uid;
  streamObj.version = 1;
  streamObj.sql = taosStrdup(pCreate->sql);
  if (!streamObj.sql) {
    return terrno;
  }
  streamObj.smaId = smaObj.uid;
  streamObj.conf.watermark = pCreate->watermark;
  streamObj.deleteMark = pCreate->deleteMark;
  streamObj.conf.fillHistory = STREAM_FILL_HISTORY_ON;
  streamObj.conf.trigger = STREAM_TRIGGER_WINDOW_CLOSE;
  streamObj.conf.triggerParam = pCreate->maxDelay;
  streamObj.ast = taosStrdup(smaObj.ast);
  if (!streamObj.ast) {
    taosMemoryFree(streamObj.sql);
    return terrno;
  }
  streamObj.indexForMultiAggBalance = -1;

  // From here on streamObj owns heap memory: every failure must leave through _OVER.

  // check the maxDelay: raise values below the minimum to max(interval in ms, minimum), then cap at the maximum
  if (streamObj.conf.triggerParam < TSDB_MIN_ROLLUP_MAX_DELAY) {
    int64_t msInterval = -1;
    code = convertTimeFromPrecisionToUnit(pCreate->interval, pDb->cfg.precision, TIME_UNIT_MILLISECOND, &msInterval);
    if (TSDB_CODE_SUCCESS != code) {
      mError("sma:%s, failed to create since convert time failed: %s", smaObj.name, tstrerror(code));
      goto _OVER;
    }
    streamObj.conf.triggerParam = msInterval > TSDB_MIN_ROLLUP_MAX_DELAY ? msInterval : TSDB_MIN_ROLLUP_MAX_DELAY;
  }
  if (streamObj.conf.triggerParam > TSDB_MAX_ROLLUP_MAX_DELAY) {
    streamObj.conf.triggerParam = TSDB_MAX_ROLLUP_MAX_DELAY;
  }

  if ((code = mndAllocSmaVgroup(pMnode, pDb, &streamObj.fixedSinkVg)) != 0) {
    mError("sma:%s, failed to create since %s", smaObj.name, tstrerror(code));
    goto _OVER;
  }
  smaObj.dstVgId = streamObj.fixedSinkVg.vgId;
  streamObj.fixedSinkVgId = smaObj.dstVgId;

  if (nodesStringToNode(streamObj.ast, &pAst) < 0) {
    code = TSDB_CODE_MND_INVALID_SMA_OPTION;
    mError("sma:%s, failed to create since parse ast error", smaObj.name);
    goto _OVER;
  }

  // extract output schema from ast
  if (qExtractResultSchema(pAst, (int32_t *)&streamObj.outputSchema.nCols, &streamObj.outputSchema.pSchema) != 0) {
    code = TSDB_CODE_MND_INVALID_SMA_OPTION;
    mError("sma:%s, failed to create since extract result schema error", smaObj.name);
    goto _OVER;
  }

  SPlanContext cxt = {
      .pAstRoot = pAst,
      .topicQuery = false,
      .streamQuery = true,
      .triggerType = streamObj.conf.trigger,
      .watermark = streamObj.conf.watermark,
      .deleteMark = streamObj.deleteMark,
  };

  if (qCreateQueryPlan(&cxt, &pPlan, NULL) < 0) {
    code = TSDB_CODE_MND_INVALID_SMA_OPTION;
    mError("sma:%s, failed to create since create query plan error", smaObj.name);
    goto _OVER;
  }

  // save physical plan
  if (nodesNodeToString((SNode *)pPlan, false, &streamObj.physicalPlan, NULL) != 0) {
    code = TSDB_CODE_MND_INVALID_SMA_OPTION;
    mError("sma:%s, failed to create since save physcial plan error", smaObj.name);
    goto _OVER;
  }

  // The AST and plan are no longer needed once the plan text is stored; release them
  // now and clear the pointers so _OVER does not release them a second time.
  if (pAst != NULL) nodesDestroyNode(pAst);
  pAst = NULL;
  nodesDestroyNode((SNode *)pPlan);
  pPlan = NULL;

  code = -1;
  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq, "create-sma");
  if (pTrans == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }
  mndTransSetDbName(pTrans, pDb->name, NULL);
  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER);

  mndTransSetSerial(pTrans);
  mInfo("trans:%d, used to create sma:%s stream:%s", pTrans->id, pCreate->name, streamObj.name);
  TAOS_CHECK_GOTO(mndAddNewVgPrepareAction(pMnode, pTrans, &streamObj.fixedSinkVg), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetCreateSmaRedoLogs(pMnode, pTrans, &smaObj), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetCreateSmaVgroupRedoLogs(pMnode, pTrans, &streamObj.fixedSinkVg), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetCreateSmaCommitLogs(pMnode, pTrans, &smaObj), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetCreateSmaVgroupCommitLogs(pMnode, pTrans, &streamObj.fixedSinkVg), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetUpdateSmaStbCommitLogs(pMnode, pTrans, pStb), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetCreateSmaVgroupRedoActions(pMnode, pTrans, pDb, &streamObj.fixedSinkVg, &smaObj), NULL, _OVER);
  // NOTE(review): the timestamp literal is passed through unchanged; its meaning is not visible here.
  TAOS_CHECK_GOTO(mndScheduleStream(pMnode, &streamObj, 1685959190000, NULL), NULL, _OVER);
  TAOS_CHECK_GOTO(mndPersistStream(pTrans, &streamObj), NULL, _OVER);
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);

  mInfo("sma:%s, uid:%" PRIi64 " create on stb:%" PRIi64 ", dstSuid:%" PRIi64 " dstTb:%s dstVg:%d", pCreate->name,
        smaObj.uid, smaObj.stbUid, smaObj.dstTbUid, smaObj.dstTbName, smaObj.dstVgId);

  code = 0;

_OVER:
  // pAst/pPlan are non-NULL here only when a step between parsing and plan serialization failed.
  if (pAst != NULL) nodesDestroyNode(pAst);
  if (pPlan != NULL) nodesDestroyNode((SNode *)pPlan);
  tFreeStreamObj(&streamObj);
  mndDestroySmaObj(&smaObj);
  mndTransDrop(pTrans);
  TAOS_RETURN(code);
}
765

766
/*
 * Validate the fields of a create-SMA request.
 *
 * Checks that the SMA and stable names are non-empty, that the numeric
 * options are non-negative (igExists must be 0 or 1), that every non-zero
 * length field equals strlen() + 1 of its string, and that the SMA name
 * parses into account/db/table parts with a non-empty table part.
 *
 * Returns 0 when the request is acceptable, otherwise
 * TSDB_CODE_MND_INVALID_SMA_OPTION.
 */
static int32_t mndCheckCreateSmaReq(SMCreateSmaReq *pCreate) {
  int32_t code = TSDB_CODE_MND_INVALID_SMA_OPTION;
  if (pCreate->name[0] == 0) TAOS_RETURN(code);
  if (pCreate->stb[0] == 0) TAOS_RETURN(code);
  if (pCreate->igExists < 0 || pCreate->igExists > 1) TAOS_RETURN(code);
  if (pCreate->intervalUnit < 0) TAOS_RETURN(code);
  if (pCreate->slidingUnit < 0) TAOS_RETURN(code);
  if (pCreate->timezone < 0) TAOS_RETURN(code);
  if (pCreate->interval < 0) TAOS_RETURN(code);
  if (pCreate->offset < 0) TAOS_RETURN(code);
  if (pCreate->sliding < 0) TAOS_RETURN(code);
  if (pCreate->exprLen < 0) TAOS_RETURN(code);
  if (pCreate->tagsFilterLen < 0) TAOS_RETURN(code);
  if (pCreate->sqlLen < 0) TAOS_RETURN(code);
  if (pCreate->astLen < 0) TAOS_RETURN(code);

  // A non-zero length must include the terminating NUL of its string.
  if (pCreate->exprLen != 0 && strlen(pCreate->expr) + 1 != pCreate->exprLen) TAOS_RETURN(code);
  if (pCreate->tagsFilterLen != 0 && strlen(pCreate->tagsFilter) + 1 != pCreate->tagsFilterLen) TAOS_RETURN(code);
  if (pCreate->sqlLen != 0 && strlen(pCreate->sql) + 1 != pCreate->sqlLen) TAOS_RETURN(code);
  if (pCreate->astLen != 0 && strlen(pCreate->ast) + 1 != pCreate->astLen) TAOS_RETURN(code);

  // Report a malformed name with the same error code as the other checks
  // rather than a bare -1, so the caller can log a meaningful reason.
  SName smaName = {0};
  if (tNameFromString(&smaName, pCreate->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE) < 0) TAOS_RETURN(code);
  if (*(char *)tNameGetTableName(&smaName) == 0) TAOS_RETURN(code);

  code = 0;
  TAOS_RETURN(code);
}
793

794
static int32_t mndGetStreamNameFromSmaName(char *streamName, char *smaName) {
47✔
795
  SName n;
796
  int32_t code = tNameFromString(&n, smaName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
47✔
797
  if (TSDB_CODE_SUCCESS != code) {
47!
798
    return code;
×
799
  }
800
  sprintf(streamName, "%d.%s", n.acctId, n.tname);
47✔
801
  return TSDB_CODE_SUCCESS;
47✔
802
}
803

804
/*
 * Handle a create-SMA request.
 *
 * Deserializes and validates the request, then checks that the source stable
 * exists, that no stream with the derived name exists, that the SMA does not
 * already exist (honouring igExists), and that the user may write to the
 * database, before delegating to mndCreateSma().
 *
 * Returns TSDB_CODE_ACTION_IN_PROGRESS when mndCreateSma() succeeded, 0 when
 * the SMA already exists and igExists is set, otherwise an error code.
 * All acquired objects and the request are released before returning.
 */
static int32_t mndProcessCreateSmaReq(SRpcMsg *pReq) {
  SMnode        *pMnode = pReq->info.node;
  int32_t        code = -1;
  SStbObj       *pStb = NULL;
  SSmaObj       *pSma = NULL;
  SStreamObj    *pStream = NULL;
  SDbObj        *pDb = NULL;
  SMCreateSmaReq createReq = {0};

  int64_t mTraceId = TRACE_GET_ROOTID(&pReq->info.traceId);

  TAOS_CHECK_GOTO(tDeserializeSMCreateSmaReq(pReq->pCont, pReq->contLen, &createReq), NULL, _OVER);

#ifdef WINDOWS
  // `code` is 0 after the successful deserialization above, so it must be set
  // explicitly; setting only terrno would report success to the caller.
  code = TSDB_CODE_MND_INVALID_PLATFORM;
  terrno = code;
  goto _OVER;
#endif
  mInfo("sma:%s, start to create", createReq.name);
  TAOS_CHECK_GOTO(mndCheckCreateSmaReq(&createReq), NULL, _OVER);

  pStb = mndAcquireStb(pMnode, createReq.stb);
  if (pStb == NULL) {
    mError("sma:%s, failed to create since stb:%s not exist", createReq.name, createReq.stb);
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }

  char streamName[TSDB_TABLE_FNAME_LEN] = {0};
  code = mndGetStreamNameFromSmaName(streamName, createReq.name);
  if (TSDB_CODE_SUCCESS != code) {
    goto _OVER;
  }

  // A stream with the derived name must not exist yet.
  code = mndAcquireStream(pMnode, streamName, &pStream);
  if (pStream != NULL || code == 0) {
    mError("sma:%s, failed to create since stream:%s already exist", createReq.name, streamName);
    code = TSDB_CODE_MND_STREAM_ALREADY_EXIST;
    goto _OVER;
  }
  SSIdx idx = {0};
  if ((code = mndAcquireGlobalIdx(pMnode, createReq.name, SDB_SMA, &idx)) == 0) {
    pSma = idx.pIdx;
  } else {
    goto _OVER;
  }

  if (pSma != NULL) {
    if (createReq.igExists) {
      mInfo("sma:%s, already exist in sma:%s, ignore exist is set", createReq.name, pSma->name);
      code = 0;
      goto _OVER;
    } else {
      code = TSDB_CODE_MND_SMA_ALREADY_EXIST;
      goto _OVER;
    }
  }

  SName name = {0};
  code = tNameFromString(&name, createReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
  if (TSDB_CODE_SUCCESS != code) {
    goto _OVER;
  }
  char db[TSDB_TABLE_FNAME_LEN] = {0};
  (void)tNameGetFullDbName(&name, db);

  pDb = mndAcquireDb(pMnode, db);
  if (pDb == NULL) {
    code = TSDB_CODE_MND_DB_NOT_SELECTED;
    goto _OVER;
  }

  TAOS_CHECK_GOTO(mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb), NULL, _OVER);

  code = mndCreateSma(pMnode, pReq, &createReq, pDb, pStb, streamName);
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("sma:%s, failed to create since %s", createReq.name, tstrerror(code));
  }

  mndReleaseStb(pMnode, pStb);
  mndReleaseSma(pMnode, pSma);
  mndReleaseStream(pMnode, pStream);
  mndReleaseDb(pMnode, pDb);
  tFreeSMCreateSmaReq(&createReq);

  TAOS_RETURN(code);
}
894

895
/*
 * Append a redo log to pTrans that marks pSma as SDB_STATUS_DROPPING.
 *
 * Returns 0 on success. If encoding the SMA fails, returns terrno when it is
 * set and TSDB_CODE_MND_RETURN_VALUE_NULL otherwise; failures of the append
 * or status calls are returned through TAOS_CHECK_RETURN.
 */
static int32_t mndSetDropSmaRedoLogs(SMnode *pMnode, STrans *pTrans, SSmaObj *pSma) {
  int32_t  code = 0;
  SSdbRaw *pRedoRaw = mndSmaActionEncode(pSma);
  if (pRedoRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    // Return the computed code instead of a bare -1 so the reason is kept.
    TAOS_RETURN(code);
  }
  TAOS_CHECK_RETURN(mndTransAppendRedolog(pTrans, pRedoRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING));

  return 0;
}
908

909
/*
 * Append a commit log to pTrans that marks pSma as SDB_STATUS_DROPPED.
 *
 * Returns 0 on success. If encoding the SMA fails, returns terrno when it is
 * set and TSDB_CODE_MND_RETURN_VALUE_NULL otherwise; failures of the append
 * or status calls are returned through TAOS_CHECK_RETURN.
 */
static int32_t mndSetDropSmaCommitLogs(SMnode *pMnode, STrans *pTrans, SSmaObj *pSma) {
  int32_t  code = 0;
  SSdbRaw *pCommitRaw = mndSmaActionEncode(pSma);
  if (pCommitRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    // Return the computed code instead of a bare -1 so the reason is kept.
    TAOS_RETURN(code);
  }
  TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pCommitRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED));

  return 0;
}
922

923
/*
 * Append a redo log to pTrans that marks pVgroup as SDB_STATUS_DROPPING.
 *
 * Returns 0 on success. If encoding the vgroup fails, returns terrno when it
 * is set and TSDB_CODE_MND_RETURN_VALUE_NULL otherwise; failures of the
 * append or status calls are returned through TAOS_CHECK_RETURN.
 */
static int32_t mndSetDropSmaVgroupRedoLogs(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup) {
  int32_t  code = 0;
  SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroup);
  if (pVgRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    // Return the computed code instead of a bare -1 so the reason is kept.
    TAOS_RETURN(code);
  }
  TAOS_CHECK_RETURN(mndTransAppendRedolog(pTrans, pVgRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pVgRaw, SDB_STATUS_DROPPING));

  return 0;
}
936

937
/*
 * Append a commit log to pTrans that marks pVgroup as SDB_STATUS_DROPPED.
 *
 * Returns 0 on success. If encoding the vgroup fails, returns terrno when it
 * is set and TSDB_CODE_MND_RETURN_VALUE_NULL otherwise; failures of the
 * append or status calls are returned through TAOS_CHECK_RETURN.
 */
static int32_t mndSetDropSmaVgroupCommitLogs(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup) {
  int32_t  code = 0;
  SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroup);
  if (pVgRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    // Return the computed code instead of a bare -1 so the reason is kept.
    TAOS_RETURN(code);
  }
  TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pVgRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pVgRaw, SDB_STATUS_DROPPED));

  return 0;
}
950

951
/*
 * Append a redo action to pTrans that sends TDMT_DND_DROP_VNODE to the dnode
 * hosting the first vnode of pVgroup.
 *
 * TSDB_CODE_VND_NOT_EXIST is registered as an acceptable reply code.
 *
 * Returns 0 on success, otherwise an error code. The request buffer is freed
 * here when appending the action fails.
 */
static int32_t mndSetDropSmaVgroupRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) {
  int32_t    code = 0;
  SVnodeGid *pVgid = pVgroup->vnodeGid + 0;
  SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
  if (pDnode == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  STransAction action = {0};
  action.epSet = mndGetDnodeEpset(pDnode);

  // Build the request while the dnode reference is still held; it is released
  // only after its last use.
  int32_t contLen = 0;
  void   *pReq = mndBuildDropVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen);
  if (pReq == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    mndReleaseDnode(pMnode, pDnode);
    TAOS_RETURN(code);
  }
  mndReleaseDnode(pMnode, pDnode);

  action.pCont = pReq;
  action.contLen = contLen;
  action.msgType = TDMT_DND_DROP_VNODE;
  action.acceptableCode = TSDB_CODE_VND_NOT_EXIST;

  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
    taosMemoryFree(pReq);
    TAOS_RETURN(code);
  }

  TAOS_RETURN(code);
}
985

986
/*
 * Build and prepare the "drop-sma" transaction for pSma.
 *
 * Acquires the destination vgroup, the source stable and the backing stream,
 * then adds to the transaction: the stream drop action and log, the SMA and
 * vgroup redo/commit logs, the stable update commit log and the drop-vnode
 * redo action.
 *
 * Returns 0 once the transaction has been prepared, otherwise an error code.
 * Every object acquired here is released exactly once at _OVER.
 */
static int32_t mndDropSma(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SSmaObj *pSma) {
  int32_t  code = -1;
  SVgObj  *pVgroup = NULL;
  SStbObj *pStb = NULL;
  STrans  *pTrans = NULL;
  SStreamObj *pStream = NULL;

  pVgroup = mndAcquireVgroup(pMnode, pSma->dstVgId);
  if (pVgroup == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }

  pStb = mndAcquireStb(pMnode, pSma->stb);
  if (pStb == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }

  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq, "drop-sma");
  if (pTrans == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }

  mInfo("trans:%d, used to drop sma:%s", pTrans->id, pSma->name);
  mndTransSetDbName(pTrans, pDb->name, NULL);
  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER);

  mndTransSetSerial(pTrans);

  char streamName[TSDB_TABLE_FNAME_LEN] = {0};
  code = mndGetStreamNameFromSmaName(streamName, pSma->name);
  if (TSDB_CODE_SUCCESS != code) {
    goto _OVER;
  }

  // The stream is released only at _OVER; releasing it here as well would
  // drop the reference twice.
  code = mndAcquireStream(pMnode, streamName, &pStream);
  if (pStream == NULL || pStream->smaId != pSma->uid || code != 0) {
    // NOTE(review): if the stream exists but belongs to another SMA, code may
    // still be 0 here and the function returns success without preparing the
    // transaction -- confirm whether an error code should be set.
    goto _OVER;
  } else {
    if ((code = mndStreamSetDropAction(pMnode, pTrans, pStream)) < 0) {
      mError("stream:%s, failed to drop task since %s", pStream->name, tstrerror(code));
      goto _OVER;
    }

    // drop stream
    if ((code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_DROPPED)) < 0) {
      mError("stream:%s, failed to drop log since %s", pStream->name, tstrerror(code));
      goto _OVER;
    }
  }
  TAOS_CHECK_GOTO(mndSetDropSmaRedoLogs(pMnode, pTrans, pSma), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetDropSmaVgroupRedoLogs(pMnode, pTrans, pVgroup), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetDropSmaCommitLogs(pMnode, pTrans, pSma), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetDropSmaVgroupCommitLogs(pMnode, pTrans, pVgroup), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetUpdateSmaStbCommitLogs(pMnode, pTrans, pStb), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetDropSmaVgroupRedoActions(pMnode, pTrans, pDb, pVgroup), NULL, _OVER);
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);

  code = 0;

_OVER:
  mndTransDrop(pTrans);
  mndReleaseStream(pMnode, pStream);
  mndReleaseVgroup(pMnode, pVgroup);
  mndReleaseStb(pMnode, pStb);
  TAOS_RETURN(code);
}
1062

1063
/*
 * Add to pTrans the actions and logs that drop every SMA built on pStb.
 *
 * For each SMA whose stbUid matches pStb->uid: the transaction is marked
 * serial; if a stream with the derived name exists and belongs to this SMA,
 * its drop action and dropped log are added; then the vgroup commit log, the
 * drop-vnode redo action and the SMA commit log are added.
 *
 * Returns 0 on success, otherwise an error code. On failure the SDB iteration
 * is cancelled and the SMA and vgroup currently held are released.
 */
int32_t mndDropSmasByStb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
  SSdb    *pSdb = pMnode->pSdb;
  SSmaObj *pSma = NULL;
  void    *pIter = NULL;
  SVgObj  *pVgroup = NULL;
  int32_t  code = -1;

  while (1) {
    pIter = sdbFetch(pSdb, SDB_SMA, pIter, (void **)&pSma);
    if (pIter == NULL) break;

    if (pSma->stbUid == pStb->uid) {
      mndTransSetSerial(pTrans);
      pVgroup = mndAcquireVgroup(pMnode, pSma->dstVgId);
      if (pVgroup == NULL) {
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
        if (terrno != 0) code = terrno;
        goto _OVER;
      }

      char streamName[TSDB_TABLE_FNAME_LEN] = {0};
      code = mndGetStreamNameFromSmaName(streamName, pSma->name);
      if (TSDB_CODE_SUCCESS != code) {
        goto _OVER;
      }

      SStreamObj *pStream = NULL;
      code = mndAcquireStream(pMnode, streamName, &pStream);
      // Only touch the stream when it was actually acquired and belongs to
      // this SMA; a failed lookup leaves pStream NULL and must not be
      // dereferenced.
      if (code == 0 && pStream != NULL && pStream->smaId == pSma->uid) {
        if ((code = mndStreamSetDropAction(pMnode, pTrans, pStream)) < 0) {
          mError("stream:%s, failed to drop task since %s", pStream->name, tstrerror(code));
          mndReleaseStream(pMnode, pStream);
          goto _OVER;
        }

        if ((code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_DROPPED)) < 0) {
          mndReleaseStream(pMnode, pStream);
          goto _OVER;
        }
      }
      // Release in every case, including a stream that exists but belongs to
      // a different SMA.
      mndReleaseStream(pMnode, pStream);

      TAOS_CHECK_GOTO(mndSetDropSmaVgroupCommitLogs(pMnode, pTrans, pVgroup), NULL, _OVER);
      TAOS_CHECK_GOTO(mndSetDropSmaVgroupRedoActions(pMnode, pTrans, pDb, pVgroup), NULL, _OVER);
      TAOS_CHECK_GOTO(mndSetDropSmaCommitLogs(pMnode, pTrans, pSma), NULL, _OVER);
      mndReleaseVgroup(pMnode, pVgroup);
      pVgroup = NULL;
    }

    sdbRelease(pSdb, pSma);
  }

  code = 0;

_OVER:
  sdbCancelFetch(pSdb, pIter);
  sdbRelease(pSdb, pSma);
  mndReleaseVgroup(pMnode, pVgroup);
  TAOS_RETURN(code);
}
1124

1125
/*
 * Add to pTrans a "dropped" commit log for every SMA that belongs to pDb.
 *
 * Returns 0 on success, or the error code of the first commit log that could
 * not be added; in that case the SDB iteration is cancelled.
 */
int32_t mndDropSmasByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
  int32_t code = 0;
  SSdb *pSdb = pMnode->pSdb;
  void *pIter = NULL;

  while (1) {
    SSmaObj *pSma = NULL;
    pIter = sdbFetch(pSdb, SDB_SMA, pIter, (void **)&pSma);
    if (pIter == NULL) break;

    if (pSma->dbUid == pDb->uid) {
      if ((code = mndSetDropSmaCommitLogs(pMnode, pTrans, pSma)) != 0) {
        sdbRelease(pSdb, pSma);
        // Cancel with the iterator, not the fetched object.
        sdbCancelFetch(pSdb, pIter);
        TAOS_RETURN(code);
      }
    }

    sdbRelease(pSdb, pSma);
  }

  TAOS_RETURN(code);
}
1148

1149
/*
 * Handle a drop-SMA request.
 *
 * Looks the SMA up by name, honours igNotExists when it is missing, checks
 * write privilege on the owning database and delegates to mndDropSma().
 *
 * Returns TSDB_CODE_ACTION_IN_PROGRESS when mndDropSma() succeeded, 0 when
 * the SMA is missing and igNotExists is set, otherwise an error code.
 */
static int32_t mndProcessDropSmaReq(SRpcMsg *pReq) {
  SMnode      *pMnode = pReq->info.node;
  SMDropSmaReq dropReq = {0};
  SSmaObj     *pSma = NULL;
  SDbObj      *pDb = NULL;
  int32_t      code = -1;

  TAOS_CHECK_GOTO(tDeserializeSMDropSmaReq(pReq->pCont, pReq->contLen, &dropReq), NULL, _OVER);

  mInfo("sma:%s, start to drop", dropReq.name);

  // Resolve the name through the global index lookup.
  SSIdx idx = {0};
  code = mndAcquireGlobalIdx(pMnode, dropReq.name, SDB_SMA, &idx);
  if (code != 0) {
    goto _OVER;
  }
  pSma = idx.pIdx;

  if (pSma == NULL) {
    if (!dropReq.igNotExists) {
      code = TSDB_CODE_MND_SMA_NOT_EXIST;
    } else {
      mInfo("sma:%s, not exist, ignore not exist is set", dropReq.name);
      code = 0;
    }
    goto _OVER;
  }

  // Derive the full database name from the SMA name.
  SName name = {0};
  code = tNameFromString(&name, dropReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
  if (code != TSDB_CODE_SUCCESS) {
    goto _OVER;
  }
  char db[TSDB_TABLE_FNAME_LEN] = {0};
  (void)tNameGetFullDbName(&name, db);

  pDb = mndAcquireDb(pMnode, db);
  if (NULL == pDb) {
    code = TSDB_CODE_MND_DB_NOT_SELECTED;
    goto _OVER;
  }

  TAOS_CHECK_GOTO(mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb), NULL, _OVER);

  code = mndDropSma(pMnode, pReq, pDb, pSma);
  if (0 == code) {
    code = TSDB_CODE_ACTION_IN_PROGRESS;
  }

_OVER:
  if (code != TSDB_CODE_ACTION_IN_PROGRESS && code != 0) {
    mError("sma:%s, failed to drop since %s", dropReq.name, tstrerror(code));
  }

  mndReleaseSma(pMnode, pSma);
  mndReleaseDb(pMnode, pDb);
  TAOS_RETURN(code);
}
1205

1206
/*
 * Fill rsp with the description of the SMA index named indexReq->indexFName.
 *
 * On success rsp receives the database name, the stable name, the SMA index
 * type and a comma-separated list of the function names found in the SMA
 * expression.
 *
 * *exist is set to false (and 0 returned) when the index cannot be found,
 * and to true when rsp was filled. When the expression cannot be parsed the
 * parse error code is returned and *exist is left untouched.
 */
static int32_t mndGetSma(SMnode *pMnode, SUserIndexReq *indexReq, SUserIndexRsp *rsp, bool *exist) {
  int32_t  code = -1;
  SSmaObj *pSma = NULL;

  SSIdx idx = {0};
  if (0 == mndAcquireGlobalIdx(pMnode, indexReq->indexFName, SDB_SMA, &idx)) {
    pSma = idx.pIdx;
  } else {
    *exist = false;
    return 0;
  }

  if (pSma == NULL) {
    *exist = false;
    return 0;
  }

  memcpy(rsp->dbFName, pSma->db, sizeof(pSma->db));
  memcpy(rsp->tblFName, pSma->stb, sizeof(pSma->stb));
  strcpy(rsp->indexType, TSDB_INDEX_TYPE_SMA);

  SNodeList *pList = NULL;
  int32_t    extOffset = 0;
  code = nodesStringToList(pSma->expr, &pList);
  if (0 == code) {
    SNode *node = NULL;
    FOREACH(node, pList) {
      SFunctionNode *pFunc = (SFunctionNode *)node;
      extOffset += tsnprintf(rsp->indexExts + extOffset, sizeof(rsp->indexExts) - extOffset - 1, "%s%s",
                            (extOffset ? "," : ""), pFunc->functionName);
    }

    *exist = true;
    // The parsed list is only needed to collect the function names.
    nodesDestroyList(pList);
  }

  mndReleaseSma(pMnode, pSma);
  return code;
}
1244

1245
/*
 * Collect every SMA index defined on the stable named tbFName into rsp.
 *
 * rsp receives the database name, table name, suid and SMA version of the
 * stable, and one STableIndexInfo per matching SMA is pushed to rsp->pIndex.
 * Each info owns a heap copy of the SMA expression; entries already pushed
 * stay in rsp->pIndex when a later step fails.
 *
 * *exist is set to false when the stable does not exist and to true once at
 * least one index has been added; it is otherwise left untouched.
 *
 * Returns 0 on success (including "stable not found"), otherwise an error
 * code; the SDB iteration is cancelled before an error return.
 */
int32_t mndGetTableSma(SMnode *pMnode, char *tbFName, STableIndexRsp *rsp, bool *exist) {
  int32_t         code = 0;
  SSmaObj        *pSma = NULL;
  SSdb           *pSdb = pMnode->pSdb;
  void           *pIter = NULL;
  STableIndexInfo info = {0};

  SStbObj *pStb = mndAcquireStb(pMnode, tbFName);
  if (NULL == pStb) {
    *exist = false;
    return TSDB_CODE_SUCCESS;
  }

  strcpy(rsp->dbFName, pStb->db);
  // The table name is the part of the full stable name after "<db>.".
  strcpy(rsp->tbName, pStb->name + strlen(pStb->db) + 1);
  rsp->suid = pStb->uid;
  rsp->version = pStb->smaVer;
  mndReleaseStb(pMnode, pStb);

  while (1) {
    pIter = sdbFetch(pSdb, SDB_SMA, pIter, (void **)&pSma);
    if (pIter == NULL) break;

    // Cheap first-character test before the full string comparison.
    if (pSma->stb[0] != tbFName[0] || strcmp(pSma->stb, tbFName)) {
      sdbRelease(pSdb, pSma);
      continue;
    }

    info.intervalUnit = pSma->intervalUnit;
    info.slidingUnit = pSma->slidingUnit;
    info.interval = pSma->interval;
    info.offset = pSma->offset;
    info.sliding = pSma->sliding;
    info.dstTbUid = pSma->dstTbUid;
    info.dstVgId = pSma->dstVgId;

    SVgObj *pVg = mndAcquireVgroup(pMnode, pSma->dstVgId);
    if (pVg == NULL) {
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
      if (terrno != 0) code = terrno;
      sdbRelease(pSdb, pSma);
      sdbCancelFetch(pSdb, pIter);
      return code;
    }
    info.epSet = mndGetVgroupEpset(pMnode, pVg);
    // The vgroup is only needed for its endpoint set; give the reference back.
    mndReleaseVgroup(pMnode, pVg);

    info.expr = taosMemoryMalloc(pSma->exprLen + 1);
    if (info.expr == NULL) {
      code = terrno;
      sdbRelease(pSdb, pSma);
      sdbCancelFetch(pSdb, pIter);
      return code;
    }

    memcpy(info.expr, pSma->expr, pSma->exprLen);
    info.expr[pSma->exprLen] = 0;

    if (NULL == taosArrayPush(rsp->pIndex, &info)) {
      code = terrno;
      taosMemoryFree(info.expr);
      sdbRelease(pSdb, pSma);
      sdbCancelFetch(pSdb, pIter);
      return code;
    }

    rsp->indexSize += sizeof(info) + pSma->exprLen + 1;
    *exist = true;

    sdbRelease(pSdb, pSma);
  }

  TAOS_RETURN(code);
}
1318

1319
/*
 * Handle a "get index" request for an SMA index.
 *
 * Deserializes the request, looks the index up with mndGetSma() and, when it
 * exists, serializes the response into a buffer from rpcMallocCont() that is
 * handed to pReq->info.rsp.
 *
 * Returns 0 on success, TSDB_CODE_MND_DB_INDEX_NOT_EXIST when the index is
 * not found, otherwise an error code.
 */
static int32_t mndProcessGetSmaReq(SRpcMsg *pReq) {
  SUserIndexReq indexReq = {0};
  SMnode       *pMnode = pReq->info.node;
  int32_t       code = -1;
  SUserIndexRsp rsp = {0};
  bool          exist = false;

  TAOS_CHECK_GOTO(tDeserializeSUserIndexReq(pReq->pCont, pReq->contLen, &indexReq), NULL, _OVER);

  code = mndGetSma(pMnode, &indexReq, &rsp, &exist);
  if (code) {
    goto _OVER;
  }

  if (!exist) {
    // TODO GET INDEX FROM FULLTEXT
    code = TSDB_CODE_MND_DB_INDEX_NOT_EXIST;
  } else {
    // First pass computes the encoded size; do not allocate from a negative size.
    int32_t contLen = tSerializeSUserIndexRsp(NULL, 0, &rsp);
    if (contLen < 0) {
      code = terrno;
      goto _OVER;
    }
    void *pRsp = rpcMallocCont(contLen);
    if (pRsp == NULL) {
      code = terrno;
      goto _OVER;
    }

    contLen = tSerializeSUserIndexRsp(pRsp, contLen, &rsp);
    if (contLen < 0) {
      code = terrno;
      // The buffer was never attached to the request, so free it here.
      rpcFreeCont(pRsp);
      goto _OVER;
    }

    pReq->info.rsp = pRsp;
    pReq->info.rspLen = contLen;

    code = 0;
  }

_OVER:
  if (code != 0) {
    mError("failed to get index %s since %s", indexReq.indexFName, tstrerror(code));
  }

  TAOS_RETURN(code);
}
1363

1364
/*
 * Handle a "get table index" request.
 *
 * Deserializes the request, collects the SMA indexes of the table with
 * mndGetTableSma() and, when at least one exists, serializes the response
 * into a buffer from rpcMallocCont() that is handed to pReq->info.rsp.
 *
 * Returns 0 on success, TSDB_CODE_MND_DB_INDEX_NOT_EXIST when the table has
 * no index, otherwise an error code. The response structure is always freed
 * before returning.
 */
static int32_t mndProcessGetTbSmaReq(SRpcMsg *pReq) {
  STableIndexReq indexReq = {0};
  SMnode        *pMnode = pReq->info.node;
  int32_t        code = -1;
  STableIndexRsp rsp = {0};
  bool           exist = false;

  TAOS_CHECK_GOTO(tDeserializeSTableIndexReq(pReq->pCont, pReq->contLen, &indexReq), NULL, _OVER);

  rsp.pIndex = taosArrayInit(10, sizeof(STableIndexInfo));
  if (NULL == rsp.pIndex) {
    code = terrno;
    goto _OVER;
  }

  code = mndGetTableSma(pMnode, indexReq.tbFName, &rsp, &exist);
  if (code) {
    goto _OVER;
  }

  if (!exist) {
    code = TSDB_CODE_MND_DB_INDEX_NOT_EXIST;
  } else {
    // First pass computes the encoded size; do not allocate from a negative size.
    int32_t contLen = tSerializeSTableIndexRsp(NULL, 0, &rsp);
    if (contLen < 0) {
      code = terrno;
      goto _OVER;
    }
    void *pRsp = rpcMallocCont(contLen);
    if (pRsp == NULL) {
      code = terrno;
      goto _OVER;
    }

    contLen = tSerializeSTableIndexRsp(pRsp, contLen, &rsp);
    if (contLen < 0) {
      code = terrno;
      // The buffer was never attached to the request, so free it here.
      rpcFreeCont(pRsp);
      goto _OVER;
    }

    pReq->info.rsp = pRsp;
    pReq->info.rspLen = contLen;

    code = 0;
  }

_OVER:
  if (code != 0) {
    mError("failed to get table index %s since %s", indexReq.tbFName, tstrerror(code));
  }

  tFreeSerializeSTableIndexRsp(&rsp);
  TAOS_RETURN(code);
}
1414

1415
/*
 * Fill pBlock with up to `rows` rows describing SMA indexes.
 *
 * When pShow->db is non-empty only SMAs of that database are listed. Each row
 * holds, in column order: SMA table name, database name, stable name,
 * destination vgroup id, creation time, an empty string, and the literal
 * "sma_index". Iteration state is kept in the SSmaAndTagIter at pShow->pIter,
 * which the caller must have allocated.
 *
 * Returns the number of rows written, 0 when the database named in pShow
 * cannot be acquired, or -1 when a row could not be written (the SMA
 * iteration is cancelled in that case). pShow->numOfRows is advanced only by
 * rows that were actually written.
 */
static int32_t mndRetrieveSma(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode  *pMnode = pReq->info.node;
  SSdb    *pSdb = pMnode->pSdb;
  int32_t  numOfRows = 0;
  SSmaObj *pSma = NULL;
  int32_t  cols = 0;
  int32_t  code = 0;

  SDbObj *pDb = NULL;
  if (strlen(pShow->db) > 0) {
    pDb = mndAcquireDb(pMnode, pShow->db);
    if (pDb == NULL) return 0;
  }
  SSmaAndTagIter *pIter = pShow->pIter;
  while (numOfRows < rows) {
    pIter->pSmaIter = sdbFetch(pSdb, SDB_SMA, pIter->pSmaIter, (void **)&pSma);
    if (pIter->pSmaIter == NULL) break;

    if (NULL != pDb && pSma->dbUid != pDb->uid) {
      sdbRelease(pSdb, pSma);
      continue;
    }

    cols = 0;

    SName smaName = {0};
    SName stbName = {0};
    char n2[TSDB_DB_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
    char n3[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
    code = tNameFromString(&smaName, pSma->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
    char n1[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
    if (TSDB_CODE_SUCCESS == code) {
      STR_TO_VARSTR(n1, (char *)tNameGetTableName(&smaName));
      STR_TO_VARSTR(n2, (char *)mndGetDbStr(pSma->db));
      code = tNameFromString(&stbName, pSma->stb, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
    }
    SColumnInfoData* pColInfo = NULL;
    if (TSDB_CODE_SUCCESS == code) {
      STR_TO_VARSTR(n3, (char *)tNameGetTableName(&stbName));

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)n1, false);
    }
    if (TSDB_CODE_SUCCESS == code) {
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)n2, false);
    }
    if (TSDB_CODE_SUCCESS == code) {
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)n3, false);
    }
    if (TSDB_CODE_SUCCESS == code) {
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)&pSma->dstVgId, false);
    }
    if (TSDB_CODE_SUCCESS == code) {
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)&pSma->createdTime, false);
    }

    char col[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
    STR_TO_VARSTR(col, (char *)"");

    if (TSDB_CODE_SUCCESS == code) {
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)col, false);
    }

    if (TSDB_CODE_SUCCESS == code) {
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);

      char tag[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
      STR_TO_VARSTR(tag, (char *)"sma_index");
      code = colDataSetVal(pColInfo, numOfRows, (const char *)tag, false);
    }

    sdbRelease(pSdb, pSma);
    if (TSDB_CODE_SUCCESS != code) {
      sdbCancelFetch(pMnode->pSdb, pIter->pSmaIter);
      // Forget the cancelled iterator so it cannot be cancelled a second time.
      pIter->pSmaIter = NULL;
      numOfRows = -1;
      break;
    }
    // Count the row only after all of its columns were written.
    numOfRows++;
  }

  mndReleaseDb(pMnode, pDb);
  // Do not let the -1 error marker decrement the running total.
  if (numOfRows > 0) pShow->numOfRows += numOfRows;
  return numOfRows;
}
1504

1505
// common functions shared by SMA and tag indexes
1506
/*
 * Entry point for dropping an index of either kind.
 *
 * The request is first processed as an SMA drop. If that leaves terrno set to
 * TSDB_CODE_MND_TAG_INDEX_ALREADY_EXIST (presumably meaning the name refers
 * to a tag index -- confirm against mndAcquireGlobalIdx), terrno is cleared
 * and the request is processed as a tag-index drop instead.
 */
static int32_t mndProcessDropIdxReq(SRpcMsg *pReq) {
  int rc = mndProcessDropSmaReq(pReq);
  if (terrno != TSDB_CODE_MND_TAG_INDEX_ALREADY_EXIST) {
    return rc;
  }

  terrno = 0;
  rc = mndProcessDropTagIdxReq(pReq);
  return rc;
}
1514

1515
/*
 * Retrieve up to `rows` index rows: SMA indexes first, then tag indexes.
 *
 * The combined iterator state (SSmaAndTagIter) is allocated on first use and
 * stored in pShow->pIter; it is freed once fewer than `rows` rows were
 * produced.
 *
 * Returns the number of rows produced, terrno when the iterator cannot be
 * allocated, or the negative result of the SMA pass when that pass fails.
 */
static int32_t mndRetrieveIdx(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  if (pShow->pIter == NULL) {
    pShow->pIter = taosMemoryCalloc(1, sizeof(SSmaAndTagIter));
  }
  if (!pShow->pIter) {
    return terrno;
  }
  int32_t read = mndRetrieveSma(pReq, pShow, pBlock, rows);
  // Top up with tag-index rows only when the SMA pass succeeded; a negative
  // result would otherwise make `rows - read` larger than the requested count.
  if (read >= 0 && read < rows) {
    read += mndRetrieveTagIdx(pReq, pShow, pBlock, rows - read);
  }
  // no more to read
  if (read < rows) {
    taosMemoryFree(pShow->pIter);
    pShow->pIter = NULL;
  }
  return read;
}
1533
/*
 * Cancel an in-progress index retrieval.
 *
 * pIter is the SSmaAndTagIter used by the retrieval; both of its SDB
 * iterators are cancelled and the iterator object itself is freed.
 */
static void mndCancelRetrieveIdx(SMnode *pMnode, void *pIter) {
  SSmaAndTagIter *pIters = pIter;
  if (NULL == pIters) {
    taosMemoryFree(pIters);
    return;
  }

  SSdb *pSdb = pMnode->pSdb;
  sdbCancelFetchByType(pSdb, pIters->pSmaIter, SDB_SMA);
  sdbCancelFetchByType(pSdb, pIters->pIdxIter, SDB_IDX);
  taosMemoryFree(pIters);
}
×
1542

1543
/*
 * Populate pCxt->pSma from the create request and the other context fields.
 *
 * Names are copied, a uid is generated from the request name, and the
 * expr/sql/ast pointers of the request are stored in the SMA object as-is
 * (no copy is made).
 */
static void initSMAObj(SCreateTSMACxt* pCxt) {
  SSmaObj              *pSma = pCxt->pSma;
  const SMCreateSmaReq *pCreate = pCxt->pCreateSmaReq;

  // identity
  memcpy(pSma->name, pCreate->name, TSDB_TABLE_FNAME_LEN);
  memcpy(pSma->stb, pCreate->stb, TSDB_TABLE_FNAME_LEN);
  memcpy(pSma->db, pCxt->pDb->name, TSDB_DB_FNAME_LEN);
  if (pCxt->pBaseSma) {
    memcpy(pSma->baseSmaName, pCxt->pBaseSma->name, TSDB_TABLE_FNAME_LEN);
  }
  pSma->createdTime = taosGetTimestampMs();
  pSma->uid = mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN);

  // source and destination
  memcpy(pSma->dstTbName, pCxt->targetStbFullName, TSDB_TABLE_FNAME_LEN);
  pSma->dstTbUid = 0; // not used
  if (pCxt->pSrcStb) {
    pSma->stbUid = pCxt->pSrcStb->uid;
  } else {
    pSma->stbUid = pCreate->normSourceTbUid;
  }
  pSma->dbUid = pCxt->pDb->uid;

  // window options
  pSma->interval = pCreate->interval;
  pSma->intervalUnit = pCreate->intervalUnit;
  pSma->timezone = tsTimezone;
  pSma->version = 1;

  // definition strings: lengths first, then the borrowed pointers
  pSma->exprLen = pCreate->exprLen;
  pSma->sqlLen = pCreate->sqlLen;
  pSma->astLen = pCreate->astLen;
  pSma->expr = pCreate->expr;
  pSma->sql = pCreate->sql;
  pSma->ast = pCreate->ast;
}
259✔
1567

1568
/*
 * Fill pCxt->pCreateStreamReq with the definition of the stream that backs
 * the TSMA being created.
 *
 *  - scalar options are set to fixed values (max-delay trigger with a delay of
 *    10000, history fill on, target stable created by the stream);
 *  - ast and sql are duplicated from the create-SMA request;
 *  - pTags is allocated here and receives the source super table's tags (when
 *    a source super table is set) followed by one "tbname" tag;
 *  - pCols receives one field per projection in pCxt->pProjects. pCols is only
 *    appended to here, never allocated.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when a duplication, allocation or
 * array push fails. Nothing is released on failure: whatever was already
 * attached to the request stays attached to it.
 */
static int32_t mndCreateTSMABuildCreateStreamReq(SCreateTSMACxt *pCxt) {
  tstrncpy(pCxt->pCreateStreamReq->name, pCxt->streamName, TSDB_STREAM_FNAME_LEN);
  tstrncpy(pCxt->pCreateStreamReq->sourceDB, pCxt->pDb->name, TSDB_DB_FNAME_LEN);
  tstrncpy(pCxt->pCreateStreamReq->targetStbFullName, pCxt->targetStbFullName, TSDB_TABLE_FNAME_LEN);
  pCxt->pCreateStreamReq->igExists = false;
  pCxt->pCreateStreamReq->triggerType = STREAM_TRIGGER_MAX_DELAY;
  pCxt->pCreateStreamReq->igExpired = false;
  pCxt->pCreateStreamReq->fillHistory = STREAM_FILL_HISTORY_ON;
  pCxt->pCreateStreamReq->maxDelay = 10000;
  pCxt->pCreateStreamReq->watermark = 0;
  // one extra tag for "tbname", on top of the source super table's own tags
  pCxt->pCreateStreamReq->numOfTags = pCxt->pSrcStb ? pCxt->pSrcStb->numOfTags + 1 : 1;
  pCxt->pCreateStreamReq->checkpointFreq = 0;
  pCxt->pCreateStreamReq->createStb = 1;
  pCxt->pCreateStreamReq->targetStbUid = 0;
  pCxt->pCreateStreamReq->fillNullCols = NULL;
  pCxt->pCreateStreamReq->igUpdate = 0;
  pCxt->pCreateStreamReq->deleteMark = pCxt->pCreateSmaReq->deleteMark;
  pCxt->pCreateStreamReq->lastTs = pCxt->pCreateSmaReq->lastTs;
  pCxt->pCreateStreamReq->smaId = pCxt->pSma->uid;
  pCxt->pCreateStreamReq->ast = taosStrdup(pCxt->pCreateSmaReq->ast);
  if (!pCxt->pCreateStreamReq->ast) {
    return terrno;
  }
  pCxt->pCreateStreamReq->sql = taosStrdup(pCxt->pCreateSmaReq->sql);
  if (!pCxt->pCreateStreamReq->sql) {
    return terrno;
  }

  // construct tags
  pCxt->pCreateStreamReq->pTags = taosArrayInit(pCxt->pCreateStreamReq->numOfTags, sizeof(SField));
  if (!pCxt->pCreateStreamReq->pTags) {
    return terrno;
  }
  SField  f = {0};
  int32_t code = 0;
  if (pCxt->pSrcStb) {
    // numOfTags includes the trailing "tbname" tag, hence the "- 1"
    for (int32_t idx = 0; idx < pCxt->pCreateStreamReq->numOfTags - 1; ++idx) {
      SSchema *pSchema = &pCxt->pSrcStb->pTags[idx];
      f.bytes = pSchema->bytes;
      f.type = pSchema->type;
      f.flags = pSchema->flags;
      tstrncpy(f.name, pSchema->name, TSDB_COL_NAME_LEN);
      if (NULL == taosArrayPush(pCxt->pCreateStreamReq->pTags, &f)) {
        code = terrno;
        break;
      }
    }
  }

  if (TSDB_CODE_SUCCESS == code) {
    f.bytes = TSDB_TABLE_FNAME_LEN - 1 + VARSTR_HEADER_SIZE;
    f.flags = COL_SMA_ON;
    f.type = TSDB_DATA_TYPE_BINARY;
    tstrncpy(f.name, "tbname", strlen("tbname") + 1);
    if (NULL == taosArrayPush(pCxt->pCreateStreamReq->pTags, &f)) {
      code = terrno;
    }
  }

  if (TSDB_CODE_SUCCESS == code) {
    // construct output cols
    SNode* pNode;
    FOREACH(pNode, pCxt->pProjects) {
      SExprNode* pExprNode = (SExprNode*)pNode;
      f.bytes = pExprNode->resType.bytes;
      f.type = pExprNode->resType.type;
      f.flags = COL_SMA_ON;
      // bounded copy, same limit the tag loop above uses for f.name; an
      // unbounded strcpy here could overrun f.name on an over-long alias
      tstrncpy(f.name, pExprNode->userAlias, TSDB_COL_NAME_LEN);
      if (NULL == taosArrayPush(pCxt->pCreateStreamReq->pCols, &f)) {
        code = terrno;
        break;
      }
    }
  }
  return code;
}
1644

1645
/*
 * Fill pCxt->pDropStreamReq so that it drops the stream named
 * pCxt->streamName. igNotExists is cleared, and the request's sql text is a
 * heap duplicate of the TSMA name taken from pCxt->pDropSmaReq.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno if the duplication fails.
 */
static int32_t mndCreateTSMABuildDropStreamReq(SCreateTSMACxt* pCxt) {
  SMDropStreamReq *pDrop = pCxt->pDropStreamReq;

  tstrncpy(pDrop->name, pCxt->streamName, TSDB_STREAM_FNAME_LEN);
  pDrop->igNotExists = false;

  pDrop->sql = taosStrdup(pCxt->pDropSmaReq->name);
  if (NULL == pDrop->sql) {
    return terrno;
  }

  pDrop->sqlLen = strlen(pDrop->sql);
  return TSDB_CODE_SUCCESS;
}
1655

1656
/*
 * Encode pOld and append it to pTrans as a prepare log, then mark the raw
 * record SDB_STATUS_READY.
 *
 * pMnode and pNew are not read by this function.
 *
 * Returns the result of sdbSetRawStatus on the normal path. If encoding
 * yields NULL the result is terrno when that is non-zero, otherwise
 * TSDB_CODE_MND_RETURN_VALUE_NULL. If appending fails the raw record is freed
 * and the append error is returned.
 */
static int32_t mndSetUpdateDbTsmaVersionPrepareLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pOld, SDbObj *pNew) {
  int32_t  code = 0;
  SSdbRaw *pRaw = mndDbActionEncode(pOld);

  if (NULL == pRaw) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  code = mndTransAppendPrepareLog(pTrans, pRaw);
  if (0 != code) {
    sdbFreeRaw(pRaw);
    TAOS_RETURN(code);
  }

  TAOS_RETURN(sdbSetRawStatus(pRaw, SDB_STATUS_READY));
}
1671

1672
/*
 * Encode pNew and append it to pTrans as a commit log, then mark the raw
 * record SDB_STATUS_READY.
 *
 * pMnode and pOld are not read by this function.
 *
 * Returns the result of sdbSetRawStatus on the normal path. If encoding
 * yields NULL the result is terrno when that is non-zero, otherwise
 * TSDB_CODE_MND_RETURN_VALUE_NULL. If appending fails the raw record is freed
 * and the append error is returned.
 */
static int32_t mndSetUpdateDbTsmaVersionCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pOld, SDbObj *pNew) {
  int32_t  code = 0;
  SSdbRaw *pRaw = mndDbActionEncode(pNew);

  if (NULL == pRaw) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  code = mndTransAppendCommitlog(pTrans, pRaw);
  if (0 != code) {
    sdbFreeRaw(pRaw);
    TAOS_RETURN(code);
  }

  TAOS_RETURN(sdbSetRawStatus(pRaw, SDB_STATUS_READY));
}
1687

1688
/*
 * Build and prepare the "create-tsma" transaction.
 *
 * The transaction (rollback policy, serial execution) carries:
 *   - the SMA redo/undo/commit logs for pCxt->pSma,
 *   - a redo action that creates the stream (pCxt->pCreateStreamReq),
 *   - undo actions that drop the stream (pCxt->pDropStreamReq) and then the
 *     target super table,
 *   - prepare/commit logs that bump the database's tsmaVersion by one.
 *
 * Returns TSDB_CODE_SUCCESS once mndTransPrepare succeeded, otherwise the
 * first error encountered. The local transaction handle is dropped on every
 * path.
 *
 * Ownership of the serialized action payloads: each pCont is allocated here.
 * After a successful mndTransAppend*Action the local pointer is cleared so the
 * cleanup at _OVER frees only payloads that were never appended.
 * NOTE(review): this assumes a successful append hands pCont to the
 * transaction and a failed append leaves it with the caller - confirm against
 * mndTransAppendRedoAction / mndTransAppendUndoAction.
 */
static int32_t mndCreateTSMATxnPrepare(SCreateTSMACxt* pCxt) {
  int32_t      code = -1;
  STransAction createStreamRedoAction = {0};
  STransAction createStreamUndoAction = {0};
  STransAction dropStbUndoAction = {0};
  SMDropStbReq dropStbReq = {0};
  STrans      *pTrans =
      mndTransCreate(pCxt->pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_TSMA, pCxt->pRpcReq, "create-tsma");
  if (!pTrans) {
    code = terrno;
    goto _OVER;
  }
  mndTransSetDbName(pTrans, pCxt->pDb->name, NULL);
  TAOS_CHECK_GOTO(mndTransCheckConflict(pCxt->pMnode, pTrans), NULL, _OVER);

  mndTransSetSerial(pTrans);
  mInfo("trans:%d, used to create tsma:%s stream:%s", pTrans->id, pCxt->pCreateSmaReq->name,
        pCxt->pCreateStreamReq->name);

  // redo: create the stream. The first serialize call only measures the size.
  mndGetMnodeEpSet(pCxt->pMnode, &createStreamRedoAction.epSet);
  createStreamRedoAction.acceptableCode = TSDB_CODE_MND_STREAM_ALREADY_EXIST;
  createStreamRedoAction.msgType = TDMT_STREAM_CREATE;
  createStreamRedoAction.contLen = tSerializeSCMCreateStreamReq(0, 0, pCxt->pCreateStreamReq);
  createStreamRedoAction.pCont = taosMemoryCalloc(1, createStreamRedoAction.contLen);
  if (!createStreamRedoAction.pCont) {
    code = terrno;
    goto _OVER;
  }
  if (createStreamRedoAction.contLen != tSerializeSCMCreateStreamReq(createStreamRedoAction.pCont, createStreamRedoAction.contLen, pCxt->pCreateStreamReq)) {
    mError("sma: %s, failed to create due to create stream req encode failure", pCxt->pCreateSmaReq->name);
    code = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }

  // undo: drop the stream again
  createStreamUndoAction.epSet = createStreamRedoAction.epSet;
  createStreamUndoAction.acceptableCode = TSDB_CODE_MND_STREAM_NOT_EXIST;
  createStreamUndoAction.msgType = TDMT_STREAM_DROP;
  createStreamUndoAction.contLen = tSerializeSMDropStreamReq(0, 0, pCxt->pDropStreamReq);
  createStreamUndoAction.pCont = taosMemoryCalloc(1, createStreamUndoAction.contLen);
  if (!createStreamUndoAction.pCont) {
    code = terrno;
    goto _OVER;
  }
  if (createStreamUndoAction.contLen != tSerializeSMDropStreamReq(createStreamUndoAction.pCont, createStreamUndoAction.contLen, pCxt->pDropStreamReq)) {
    mError("sma: %s, failed to create due to drop stream req encode failure", pCxt->pCreateSmaReq->name);
    code = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }

  // undo: drop the target super table
  dropStbReq.igNotExists = true;
  // tstrncpy (as mndDropTSMA uses) always terminates the destination
  tstrncpy(dropStbReq.name, pCxt->targetStbFullName, TSDB_TABLE_FNAME_LEN);
  dropStbUndoAction.epSet = createStreamRedoAction.epSet;
  dropStbUndoAction.acceptableCode = TSDB_CODE_MND_STB_NOT_EXIST;
  dropStbUndoAction.retryCode = TSDB_CODE_MND_STREAM_MUST_BE_DELETED;
  dropStbUndoAction.msgType = TDMT_MND_STB_DROP;
  dropStbUndoAction.contLen = tSerializeSMDropStbReq(0, 0, &dropStbReq);
  dropStbUndoAction.pCont = taosMemoryCalloc(1, dropStbUndoAction.contLen);
  if (!dropStbUndoAction.pCont) {
    code = terrno;
    goto _OVER;
  }
  if (dropStbUndoAction.contLen != tSerializeSMDropStbReq(dropStbUndoAction.pCont, dropStbUndoAction.contLen, &dropStbReq)) {
    mError("sma: %s, failed to create due to drop stb req encode failure", pCxt->pCreateSmaReq->name);
    code = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }

  // copy of the db object with tsmaVersion bumped, used for the commit log
  SDbObj newDb = {0};
  memcpy(&newDb, pCxt->pDb, sizeof(SDbObj));
  newDb.tsmaVersion++;
  TAOS_CHECK_GOTO(mndSetCreateSmaRedoLogs(pCxt->pMnode, pTrans, pCxt->pSma), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetCreateSmaUndoLogs(pCxt->pMnode, pTrans, pCxt->pSma), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetCreateSmaCommitLogs(pCxt->pMnode, pTrans, pCxt->pSma), NULL, _OVER);
  TAOS_CHECK_GOTO(mndTransAppendRedoAction(pTrans, &createStreamRedoAction), NULL, _OVER);
  createStreamRedoAction.pCont = NULL;  // appended: no longer ours to free
  TAOS_CHECK_GOTO(mndTransAppendUndoAction(pTrans, &createStreamUndoAction), NULL, _OVER);
  createStreamUndoAction.pCont = NULL;  // appended: no longer ours to free
  TAOS_CHECK_GOTO(mndTransAppendUndoAction(pTrans, &dropStbUndoAction), NULL, _OVER);
  dropStbUndoAction.pCont = NULL;       // appended: no longer ours to free
  TAOS_CHECK_GOTO(mndSetUpdateDbTsmaVersionPrepareLogs(pCxt->pMnode, pTrans, pCxt->pDb, &newDb), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetUpdateDbTsmaVersionCommitLogs(pCxt->pMnode, pTrans, pCxt->pDb, &newDb), NULL, _OVER);
  TAOS_CHECK_GOTO(mndTransPrepare(pCxt->pMnode, pTrans), NULL, _OVER);

  code = TSDB_CODE_SUCCESS;

_OVER:
  // free payloads that never made it into the transaction (NULL otherwise)
  taosMemoryFree(createStreamRedoAction.pCont);
  taosMemoryFree(createStreamUndoAction.pCont);
  taosMemoryFree(dropStbUndoAction.pCont);
  mndTransDrop(pTrans);
  TAOS_RETURN(code);
}
1774

1775
/*
 * Create a TSMA described by pCxt.
 *
 * Steps: initialise a stack SSmaObj from the request, parse the request's
 * expr string into a projection list, build the create-stream and drop-stream
 * requests, and hand everything to mndCreateTSMATxnPrepare.
 *
 * While this function runs, pCxt->pSma, pCxt->pCreateStreamReq,
 * pCxt->pDropStreamReq and pCxt->pProjects point at objects owned by this
 * function. They are all reset to NULL before returning so the caller's
 * context never holds pointers to dead stack storage.
 *
 * Returns 0 on success, otherwise the first error code encountered.
 */
static int32_t mndCreateTSMA(SCreateTSMACxt *pCxt) {
  int32_t            code = 0;
  SSmaObj            sma = {0};
  SCMCreateStreamReq createStreamReq = {0};
  SMDropStreamReq    dropStreamReq = {0};

  pCxt->pSma = &sma;
  initSMAObj(pCxt);

  SNodeList* pProjects = NULL;
  code = nodesStringToList(pCxt->pCreateSmaReq->expr, &pProjects);
  if (TSDB_CODE_SUCCESS != code) {
    goto _OVER;
  }
  pCxt->pProjects = pProjects;

  pCxt->pCreateStreamReq = &createStreamReq;
  if (pCxt->pCreateSmaReq->pVgroupVerList) {
    pCxt->pCreateStreamReq->pVgroupVerList = taosArrayDup(pCxt->pCreateSmaReq->pVgroupVerList, NULL);
    if (!pCxt->pCreateStreamReq->pVgroupVerList) {
      code = terrno;
      goto _OVER;
    }
  }
  // pCols is allocated here, one slot per projection, and filled by
  // mndCreateTSMABuildCreateStreamReq
  if (LIST_LENGTH(pProjects) > 0) {
    createStreamReq.pCols = taosArrayInit(LIST_LENGTH(pProjects), sizeof(SField));
    if (!createStreamReq.pCols) {
      code = terrno;
      goto _OVER;
    }
  }
  pCxt->pDropStreamReq = &dropStreamReq;
  code = mndCreateTSMABuildCreateStreamReq(pCxt);
  if (TSDB_CODE_SUCCESS != code) {
    goto _OVER;
  }
  code = mndCreateTSMABuildDropStreamReq(pCxt);
  if (TSDB_CODE_SUCCESS != code) {
    goto _OVER;
  }

  code = mndCreateTSMATxnPrepare(pCxt);
  if (TSDB_CODE_SUCCESS != code) {
    goto _OVER;
  }
  mInfo("sma:%s, uid:%" PRIi64 " create on stb:%" PRIi64 " dstTb:%s dstVg:%d", pCxt->pCreateSmaReq->name, sma.uid,
        sma.stbUid, sma.dstTbName, sma.dstVgId);
  code = 0;

_OVER:
  // Either request pointer may still be unset when an early step failed, so
  // both frees are guarded the same way.
  if (pCxt->pCreateStreamReq) tFreeSCMCreateStreamReq(pCxt->pCreateStreamReq);
  if (pCxt->pDropStreamReq) tFreeMDropStreamReq(pCxt->pDropStreamReq);
  pCxt->pCreateStreamReq = NULL;
  pCxt->pDropStreamReq = NULL;  // pointed at the local dropStreamReq
  if (pProjects) nodesDestroyList(pProjects);
  pCxt->pProjects = NULL;
  pCxt->pSma = NULL;            // pointed at the local sma
  TAOS_RETURN(code);
}
1832

1833
static int32_t mndTSMAGenerateOutputName(const char* tsmaName, char* streamName, char* targetStbName) {
493✔
1834
  SName smaName;
1835
  int32_t code = tNameFromString(&smaName, tsmaName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
493✔
1836
  if (TSDB_CODE_SUCCESS != code) {
493!
1837
    return code;
×
1838
  }
1839
  sprintf(streamName, "%d.%s", smaName.acctId, smaName.tname);
493✔
1840
  snprintf(targetStbName, TSDB_TABLE_FNAME_LEN, "%s"TSMA_RES_STB_POSTFIX, tsmaName);
493✔
1841
  return TSDB_CODE_SUCCESS;
493✔
1842
}
1843

1844
/*
 * RPC handler for "create TSMA".
 *
 * Checks performed before any object is created, in this order:
 *   1. the number of stored SMAs is below tsMaxTsmaNum;
 *   2. the request deserializes and passes mndCheckCreateSmaReq;
 *   3. when normSourceTbUid is 0, the source super table exists (a missing
 *      one is tolerated for a recursive TSMA);
 *   4. no SMA with the requested name exists (success is returned instead of
 *      an error when igExists is set);
 *   5. neither the output super table nor the stream already exists;
 *   6. the database exists and the user may write to it;
 *   7. for a recursive TSMA, the base TSMA exists.
 *
 * Returns TSDB_CODE_ACTION_IN_PROGRESS when the create transaction was
 * prepared, 0 when the TSMA already exists and igExists is set, or an error
 * code. Every acquired object and the decoded request are released before
 * returning.
 */
static int32_t mndProcessCreateTSMAReq(SRpcMsg* pReq) {
#ifdef WINDOWS
  TAOS_RETURN(TSDB_CODE_MND_INVALID_PLATFORM);
#endif
  SMnode *       pMnode = pReq->info.node;
  int32_t        code = -1;
  SDbObj *       pDb = NULL;
  SStbObj *      pStb = NULL;
  SSmaObj *      pSma = NULL;
  SSmaObj *      pBaseTsma = NULL;
  SStreamObj *   pStream = NULL;
  int64_t        mTraceId = TRACE_GET_ROOTID(&pReq->info.traceId);
  SMCreateSmaReq createReq = {0};

  if (sdbGetSize(pMnode->pSdb, SDB_SMA) >= tsMaxTsmaNum) {
    code = TSDB_CODE_MND_MAX_TSMA_NUM_EXCEEDED;
    goto _OVER;
  }

  if (tDeserializeSMCreateSmaReq(pReq->pCont, pReq->contLen, &createReq) != 0) {
    code = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }

  mInfo("start to create tsma: %s", createReq.name);
  if ((code = mndCheckCreateSmaReq(&createReq)) != 0) goto _OVER;

  if (createReq.normSourceTbUid == 0) {
    pStb = mndAcquireStb(pMnode, createReq.stb);
    if (!pStb && !createReq.recursiveTsma) {
      mError("tsma:%s, failed to create since stb:%s not exist", createReq.name, createReq.stb);
      code = TSDB_CODE_MND_STB_NOT_EXIST;
      goto _OVER;
    }
  }

  char streamName[TSDB_TABLE_FNAME_LEN] = {0};
  char streamTargetStbFullName[TSDB_TABLE_FNAME_LEN] = {0};
  code = mndTSMAGenerateOutputName(createReq.name, streamName, streamTargetStbFullName);
  if (TSDB_CODE_SUCCESS != code) {
    mInfo("tsma:%s, faield to generate name", createReq.name);
    goto _OVER;
  }

  pSma = sdbAcquire(pMnode->pSdb, SDB_SMA, createReq.name);
  if (pSma && createReq.igExists) {
    mInfo("tsma:%s, already exists in sma:%s, ignore exist is set", createReq.name, pSma->name);
    code = 0;
    goto _OVER;
  }

  if (pSma) {
    code = TSDB_CODE_MND_SMA_ALREADY_EXIST;
    goto _OVER;
  }

  SStbObj *pTargetStb = mndAcquireStb(pMnode, streamTargetStbFullName);
  if (pTargetStb) {
    code = TSDB_CODE_TDB_STB_ALREADY_EXIST;
    mError("tsma: %s, failed to create since output stable already exists: %s", createReq.name,
           streamTargetStbFullName);
    // the acquire above took a reference; _OVER does not know about this
    // object, so give the reference back here
    mndReleaseStb(pMnode, pTargetStb);
    goto _OVER;
  }

  // the stream must not exist yet: anything other than a clean "not exist"
  // result is treated as a name clash
  code = mndAcquireStream(pMnode, streamName, &pStream);
  if (pStream != NULL || code != TSDB_CODE_MND_STREAM_NOT_EXIST) {
    mError("tsma:%s, failed to create since stream:%s already exist", createReq.name, streamName);
    code = TSDB_CODE_MND_SMA_ALREADY_EXIST;
    goto _OVER;
  }

  SName name = {0};
  code = tNameFromString(&name, createReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
  if (TSDB_CODE_SUCCESS != code) {
    goto _OVER;
  }
  char db[TSDB_TABLE_FNAME_LEN] = {0};
  (void)tNameGetFullDbName(&name, db);

  pDb = mndAcquireDb(pMnode, db);
  if (pDb == NULL) {
    code = TSDB_CODE_MND_DB_NOT_SELECTED;
    goto _OVER;
  }

  TAOS_CHECK_GOTO(mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb), NULL, _OVER);

  if (createReq.recursiveTsma) {
    pBaseTsma = sdbAcquire(pMnode->pSdb, SDB_SMA, createReq.baseTsmaName);
    if (!pBaseTsma) {
      mError("base tsma: %s not found when creating recursive tsma", createReq.baseTsmaName);
      code = TSDB_CODE_MND_SMA_NOT_EXIST;
      goto _OVER;
    }
    if (!pStb) {
      // no source super table was acquired: take the source uid from the
      // base TSMA instead
      createReq.normSourceTbUid = pBaseTsma->stbUid;
    }
  }

  SCreateTSMACxt cxt = {
    .pMnode = pMnode,
    .pCreateSmaReq = &createReq,
    .pCreateStreamReq = NULL,
    .streamName = streamName,
    .targetStbFullName = streamTargetStbFullName,
    .pDb = pDb,
    .pRpcReq = pReq,
    .pSma = NULL,
    .pBaseSma = pBaseTsma,
    .pSrcStb = pStb,
  };

  code = mndCreateTSMA(&cxt);
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("tsma:%s, failed to create since %s", createReq.name, tstrerror(code));
  }

  if (pStb) mndReleaseStb(pMnode, pStb);
  if (pBaseTsma) mndReleaseSma(pMnode, pBaseTsma);
  mndReleaseSma(pMnode, pSma);
  mndReleaseStream(pMnode, pStream);
  mndReleaseDb(pMnode, pDb);
  tFreeSMCreateSmaReq(&createReq);

  TAOS_RETURN(code);
}
1973

1974
/*
 * Build and prepare the "drop-tsma" transaction for pCxt->pSma.
 *
 * The transaction (retry policy, serial execution) carries:
 *   - the SMA drop redo/commit logs,
 *   - a redo action that drops the stream,
 *   - a redo action that drops the output super table (dropping the stream
 *     does not remove it),
 *   - prepare/commit logs that bump the database's tsmaVersion by one.
 *
 * pCxt->pDropStreamReq points at a local request while this function runs and
 * is reset to NULL before returning.
 *
 * Returns TSDB_CODE_SUCCESS once mndTransPrepare succeeded, otherwise the
 * first error encountered, including a transaction conflict.
 *
 * Ownership of the serialized action payloads: each pCont is allocated here.
 * After a successful mndTransAppendRedoAction the local pointer is cleared so
 * the cleanup at _OVER frees only payloads that were never appended.
 * NOTE(review): this assumes a successful append hands pCont to the
 * transaction and a failed append leaves it with the caller - confirm against
 * mndTransAppendRedoAction.
 */
static int32_t mndDropTSMA(SCreateTSMACxt* pCxt) {
  int32_t         code = -1;
  // everything touched at _OVER is declared and initialised up front so that
  // no goto can skip its initialisation
  STransAction    dropStreamRedoAction = {0};
  STransAction    dropStbRedoAction = {0};
  SMDropStreamReq dropStreamReq = {0};
  SMDropStbReq    dropStbReq = {0};
  STrans *pTrans = mndTransCreate(pCxt->pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_TSMA, pCxt->pRpcReq, "drop-tsma");
  if (!pTrans) {
    code = terrno;
    goto _OVER;
  }
  pCxt->pDropStreamReq = &dropStreamReq;
  code = mndCreateTSMABuildDropStreamReq(pCxt);
  if (TSDB_CODE_SUCCESS != code) {
    goto _OVER;
  }
  mndTransSetDbName(pTrans, pCxt->pDb->name, NULL);
  // propagate the conflict error: a bare goto here would leave code at 0 and
  // the caller would report the drop as in progress
  TAOS_CHECK_GOTO(mndTransCheckConflict(pCxt->pMnode, pTrans), NULL, _OVER);
  mndTransSetSerial(pTrans);

  // redo: drop the stream. The first serialize call only measures the size.
  mndGetMnodeEpSet(pCxt->pMnode, &dropStreamRedoAction.epSet);
  dropStreamRedoAction.acceptableCode = TSDB_CODE_MND_STREAM_NOT_EXIST;
  dropStreamRedoAction.msgType = TDMT_STREAM_DROP;
  dropStreamRedoAction.contLen = tSerializeSMDropStreamReq(0, 0, pCxt->pDropStreamReq);
  dropStreamRedoAction.pCont = taosMemoryCalloc(1, dropStreamRedoAction.contLen);
  if (!dropStreamRedoAction.pCont) {
    code = terrno;
    goto _OVER;
  }
  if (dropStreamRedoAction.contLen !=
      tSerializeSMDropStreamReq(dropStreamRedoAction.pCont, dropStreamRedoAction.contLen, pCxt->pDropStreamReq)) {
    mError("tsma: %s, failed to drop due to drop stream req encode failure", pCxt->pDropSmaReq->name);
    code = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }

  // output stable is not dropped when dropping stream, dropping it when dropping tsma
  dropStbReq.igNotExists = false;
  tstrncpy(dropStbReq.name, pCxt->targetStbFullName, TSDB_TABLE_FNAME_LEN);
  dropStbReq.sql = "drop";
  dropStbReq.sqlLen = 5;

  mndGetMnodeEpSet(pCxt->pMnode, &dropStbRedoAction.epSet);
  dropStbRedoAction.acceptableCode = TSDB_CODE_MND_STB_NOT_EXIST;
  dropStbRedoAction.msgType = TDMT_MND_STB_DROP;
  dropStbRedoAction.contLen = tSerializeSMDropStbReq(0, 0, &dropStbReq);
  dropStbRedoAction.pCont = taosMemoryCalloc(1, dropStbRedoAction.contLen);
  if (!dropStbRedoAction.pCont) {
    code = terrno;
    goto _OVER;
  }
  if (dropStbRedoAction.contLen != tSerializeSMDropStbReq(dropStbRedoAction.pCont, dropStbRedoAction.contLen, &dropStbReq)) {
    mError("tsma: %s, failedto drop due to drop stb req encode failure", pCxt->pDropSmaReq->name);
    code = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }

  // copy of the db object with tsmaVersion bumped, used for the commit log
  SDbObj newDb = {0};
  memcpy(&newDb, pCxt->pDb, sizeof(SDbObj));
  newDb.tsmaVersion++;
  TAOS_CHECK_GOTO(mndSetDropSmaRedoLogs(pCxt->pMnode, pTrans, pCxt->pSma), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetDropSmaCommitLogs(pCxt->pMnode, pTrans, pCxt->pSma), NULL, _OVER);
  TAOS_CHECK_GOTO(mndTransAppendRedoAction(pTrans, &dropStreamRedoAction), NULL, _OVER);
  dropStreamRedoAction.pCont = NULL;  // appended: no longer ours to free
  TAOS_CHECK_GOTO(mndTransAppendRedoAction(pTrans, &dropStbRedoAction), NULL, _OVER);
  dropStbRedoAction.pCont = NULL;     // appended: no longer ours to free
  TAOS_CHECK_GOTO(mndSetUpdateDbTsmaVersionPrepareLogs(pCxt->pMnode, pTrans, pCxt->pDb, &newDb), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetUpdateDbTsmaVersionCommitLogs(pCxt->pMnode, pTrans, pCxt->pDb, &newDb), NULL, _OVER);
  TAOS_CHECK_GOTO(mndTransPrepare(pCxt->pMnode, pTrans), NULL, _OVER);
  code = TSDB_CODE_SUCCESS;
_OVER:
  // free payloads that never made it into the transaction (NULL otherwise)
  taosMemoryFree(dropStreamRedoAction.pCont);
  taosMemoryFree(dropStbRedoAction.pCont);
  // still unset when mndTransCreate failed; guarded like mndCreateTSMA does
  if (pCxt->pDropStreamReq) tFreeMDropStreamReq(pCxt->pDropStreamReq);
  pCxt->pDropStreamReq = NULL;  // pointed at the local dropStreamReq
  mndTransDrop(pTrans);
  TAOS_RETURN(code);
}
2046

2047
/*
 * Report whether any stored SMA names pSma as its base TSMA.
 *
 * Walks every SDB_SMA object and compares its baseSmaName with pSma->name
 * (at most TSDB_TABLE_FNAME_LEN bytes). Each fetched object is released
 * again; on a match the fetch cursor is cancelled before returning.
 */
static bool hasRecursiveTsmasBasedOnMe(SMnode* pMnode, const SSmaObj* pSma) {
  SSmaObj *pCandidate = NULL;
  void    *pCursor = NULL;

  for (;;) {
    pCursor = sdbFetch(pMnode->pSdb, SDB_SMA, pCursor, (void **)&pCandidate);
    if (NULL == pCursor) {
      return false;
    }

    // compare before releasing: the candidate is only valid while held
    bool derivedFromMe = (0 == strncmp(pCandidate->baseSmaName, pSma->name, TSDB_TABLE_FNAME_LEN));
    sdbRelease(pMnode->pSdb, pCandidate);

    if (derivedFromMe) {
      sdbCancelFetch(pMnode->pSdb, pCursor);
      return true;
    }
  }
}
2062

2063
/*
 * RPC handler for "drop TSMA".
 *
 * Deserializes the request, derives the stream and output super table names,
 * and then requires that: the TSMA exists (a missing one is success when
 * igNotExists is set), its database exists, the user may write to that
 * database, and no other TSMA is based on this one. If all hold, the drop
 * transaction is built by mndDropTSMA.
 *
 * Returns TSDB_CODE_ACTION_IN_PROGRESS when the drop transaction was
 * prepared, 0 when the TSMA does not exist and igNotExists is set, or an
 * error code. Every acquired object is released before returning.
 */
static int32_t mndProcessDropTSMAReq(SRpcMsg* pReq) {
  int32_t      code = -1;
  SMDropSmaReq dropReq = {0};
  SSmaObj *    pSma = NULL;
  SDbObj *     pDb = NULL;
  // initialised here because _OVER reads it and can be reached before the
  // super table is acquired
  SStbObj *    pStb = NULL;
  SMnode *     pMnode = pReq->info.node;
  if (tDeserializeSMDropSmaReq(pReq->pCont, pReq->contLen, &dropReq) != TSDB_CODE_SUCCESS) {
    code = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }

  char  streamName[TSDB_TABLE_FNAME_LEN] = {0};
  char  streamTargetStbFullName[TSDB_TABLE_FNAME_LEN] = {0};
  code = mndTSMAGenerateOutputName(dropReq.name, streamName, streamTargetStbFullName);
  if (TSDB_CODE_SUCCESS != code) {
    goto _OVER;
  }

  // acquired only to be released at _OVER; not otherwise used below
  pStb = mndAcquireStb(pMnode, streamTargetStbFullName);

  pSma = mndAcquireSma(pMnode, dropReq.name);
  if (!pSma && dropReq.igNotExists) {
    code = 0;
    goto _OVER;
  }
  if (!pSma) {
    code = TSDB_CODE_MND_SMA_NOT_EXIST;
    goto _OVER;
  }
  SName name = {0};
  code = tNameFromString(&name, dropReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
  if (TSDB_CODE_SUCCESS != code) {
    goto _OVER;
  }
  char db[TSDB_TABLE_FNAME_LEN] = {0};
  (void)tNameGetFullDbName(&name, db);

  pDb = mndAcquireDb(pMnode, db);
  if (!pDb) {
    code = TSDB_CODE_MND_DB_NOT_EXIST;
    goto _OVER;
  }

  if ((code = mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb)) != 0) {
    goto _OVER;
  }

  // a TSMA that other TSMAs are built on must not be dropped
  if (hasRecursiveTsmasBasedOnMe(pMnode, pSma)) {
    code = TSDB_CODE_MND_INVALID_DROP_TSMA;
    goto _OVER;
  }

  SCreateTSMACxt cxt = {
    .pDb = pDb,
    .pMnode = pMnode,
    .pRpcReq = pReq,
    .pSma = pSma,
    .streamName = streamName,
    .targetStbFullName = streamTargetStbFullName,
    .pDropSmaReq = &dropReq,
  };

  code = mndDropTSMA(&cxt);

  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
_OVER:

  if (pStb) mndReleaseStb(pMnode, pStb);
  mndReleaseSma(pMnode, pSma);
  mndReleaseDb(pMnode, pDb);
  TAOS_RETURN(code);
}
2135

2136
/*
 * Produce up to `rows` rows of the TSMA listing into pBlock.
 *
 * The fetch position is kept in pShow->pIter (an SSmaAndTagIter allocated on
 * first use). When pShow->db is non-empty and that database can be acquired,
 * only TSMAs whose dbUid matches it are listed. TSMAs whose own database
 * cannot be acquired are skipped.
 *
 * Values written per row, in column order: TSMA table name, db name, source
 * table name, db name again, target table name, TSMA table name again (the
 * stream name column), created time, interval, create sql, and the
 * comma-separated aliases of the TSMA-supported functions found in the
 * projection list of the stored ast.
 *
 * Returns the number of rows written, or the error code of the first failing
 * step. pShow->numOfRows is advanced only on success. Whenever fewer than
 * `rows` rows are returned (end of data or error) the iterator is freed and
 * pShow->pIter is cleared.
 */
static int32_t mndRetrieveTSMA(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SDbObj *         pDb = NULL;
  int32_t          numOfRows = 0;
  SSmaObj *        pSma = NULL;
  SMnode *         pMnode = pReq->info.node;
  int32_t          code = 0;
  SColumnInfoData *pColInfo;
  if (pShow->pIter == NULL) {
    pShow->pIter = taosMemoryCalloc(1, sizeof(SSmaAndTagIter));
  }
  if (!pShow->pIter) {
    return terrno;
  }
  if (pShow->db[0]) {
    pDb = mndAcquireDb(pMnode, pShow->db);
  }
  SSmaAndTagIter *pIter = pShow->pIter;
  while (numOfRows < rows) {
    pIter->pSmaIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter->pSmaIter, (void **)&pSma);
    if (pIter->pSmaIter == NULL) break;
    SDbObj* pSrcDb = mndAcquireDb(pMnode, pSma->db);

    // skip TSMAs of other databases and TSMAs whose database is gone
    if ((pDb && pSma->dbUid != pDb->uid) || !pSrcDb) {
      sdbRelease(pMnode->pSdb, pSma);
      if (pSrcDb) mndReleaseDb(pMnode, pSrcDb);
      continue;
    }

    int32_t cols = 0;
    SName   n = {0};

    // From here on every step runs only while code is still success, so the
    // first failure skips the rest of the row.
    code = tNameFromString(&n, pSma->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
    char smaName[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
    if (TSDB_CODE_SUCCESS == code) {
      STR_TO_VARSTR(smaName, (char *)tNameGetTableName(&n));
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)smaName, false);
    }

    char db[TSDB_DB_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
    if (TSDB_CODE_SUCCESS == code) {
      STR_TO_VARSTR(db, (char *)mndGetDbStr(pSma->db));
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char*)db, false);
    }

    if (TSDB_CODE_SUCCESS == code) {
      code = tNameFromString(&n, pSma->stb, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
    }
    char srcTb[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
    if (TSDB_CODE_SUCCESS == code) {
      STR_TO_VARSTR(srcTb, (char *)tNameGetTableName(&n));
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char*)srcTb, false);
    }

    // the same db value is written a second time for this column
    if (TSDB_CODE_SUCCESS == code) {
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char*)db, false);
    }

    if (TSDB_CODE_SUCCESS == code) {
      code = tNameFromString(&n, pSma->dstTbName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
    }

    if (TSDB_CODE_SUCCESS == code) {
      char targetTb[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
      STR_TO_VARSTR(targetTb, (char*)tNameGetTableName(&n));
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char*)targetTb, false);
    }

    if (TSDB_CODE_SUCCESS == code) {
      // stream name
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char*)smaName, false);
    }

    if (TSDB_CODE_SUCCESS == code) {
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char*)(&pSma->createdTime), false);
    }

    // interval
    char interval[64 + VARSTR_HEADER_SIZE] = {0};
    int32_t len = 0;
    if (TSDB_CODE_SUCCESS == code) {
      // non-calendar units are printed with the source db's precision unit,
      // calendar units with the stored interval unit
      if (!IS_CALENDAR_TIME_DURATION(pSma->intervalUnit)) {
        len = tsnprintf(interval + VARSTR_HEADER_SIZE, 64, "%" PRId64 "%c", pSma->interval,
            getPrecisionUnit(pSrcDb->cfg.precision));
      } else {
        len = tsnprintf(interval + VARSTR_HEADER_SIZE, 64, "%" PRId64 "%c", pSma->interval, pSma->intervalUnit);
      }
      varDataSetLen(interval, len);
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, interval, false);
    }

    // buf is used twice: first for the create sql, then for the func list
    char buf[TSDB_MAX_SAVED_SQL_LEN + VARSTR_HEADER_SIZE] = {0};
    if (TSDB_CODE_SUCCESS == code) {
      // create sql
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      len = tsnprintf(buf + VARSTR_HEADER_SIZE, TSDB_MAX_SAVED_SQL_LEN, "%s", pSma->sql);
      varDataSetLen(buf, TMIN(len, TSDB_MAX_SAVED_SQL_LEN));
      code = colDataSetVal(pColInfo, numOfRows, buf, false);
    }

    // func list
    len = 0;
    SNode *pNode = NULL, *pFunc = NULL;
    if (TSDB_CODE_SUCCESS  == code) {
      code = nodesStringToNode(pSma->ast, &pNode);
    }
    if (TSDB_CODE_SUCCESS == code) {
      char * start = buf + VARSTR_HEADER_SIZE;
      FOREACH(pFunc, ((SSelectStmt *)pNode)->pProjectionList) {
        if (nodeType(pFunc) == QUERY_NODE_FUNCTION) {
          SFunctionNode *pFuncNode = (SFunctionNode *)pFunc;
          if (!fmIsTSMASupportedFunc(pFuncNode->funcId)) continue;
          // a comma goes before every alias except the first one written
          len += tsnprintf(start, TSDB_MAX_SAVED_SQL_LEN - len, "%s%s", start != buf + VARSTR_HEADER_SIZE ? "," : "",
                          ((SExprNode *)pFunc)->userAlias);
          if (len >= TSDB_MAX_SAVED_SQL_LEN) {
            len = TSDB_MAX_SAVED_SQL_LEN;
            break;
          }
          start = buf + VARSTR_HEADER_SIZE + len;
        }
      }
      nodesDestroyNode(pNode);
    }

    if (TSDB_CODE_SUCCESS == code) {
      varDataSetLen(buf, len);
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, buf, false);
    }

    numOfRows++;
    mndReleaseSma(pMnode, pSma);
    mndReleaseDb(pMnode, pSrcDb);
    if (TSDB_CODE_SUCCESS != code) {
      // the error code replaces the row count as the return value
      sdbCancelFetch(pMnode->pSdb, pIter->pSmaIter);
      numOfRows = code;
      break;
    }
  }
  mndReleaseDb(pMnode, pDb);
  // On failure numOfRows holds an error code, not a count, so it must not be
  // added to the running total. code is non-success here only on that path.
  if (TSDB_CODE_SUCCESS == code) {
    pShow->numOfRows += numOfRows;
  }
  if (numOfRows < rows) {
    taosMemoryFree(pShow->pIter);
    pShow->pIter = NULL;
  }
  return numOfRows;
}
2290

2291
/*
 * Abort a TSMA retrieval: cancel the SDB_SMA fetch referenced by the iterator
 * state (when a state object was supplied) and free the state object.
 */
static void mndCancelRetrieveTSMA(SMnode *pMnode, void *pIter) {
  SSmaAndTagIter *pState = (SSmaAndTagIter *)pIter;

  if (NULL != pState) {
    sdbCancelFetchByType(pMnode->pSdb, pState->pSmaIter, SDB_SMA);
  }

  // Called unconditionally, also for a NULL state.
  taosMemoryFree(pState);
}
×
2299

2300
/**
 * Fill `pInfo` from an SMA object and its destination super table.
 *
 * - Scalar fields (interval, unit, id, version, destination uid) and the
 *   tsma / target / source table names are copied from `pSma` / `pDestStb`.
 * - `pInfo->pFuncs` is built from the projection list of the parsed AST. The
 *   AST of `pBaseTsma` is used when it is non-NULL, otherwise `pSma->ast`.
 * - `pInfo->ast` is always a copy of `pSma->ast`.
 * - `pInfo->pTags` receives the tags of `pDestStb`; `pInfo->pUsedCols`
 *   receives destination columns with index in [1, numOfColumns - 2).
 *
 * @return TSDB_CODE_SUCCESS or an error code. Every error path also stores the
 *         code in terrno (via TAOS_RETURN). On error `pInfo` may be partially
 *         filled; members this function has already destroyed are reset to
 *         NULL so the caller can release `pInfo` safely.
 */
int32_t dumpTSMAInfoFromSmaObj(const SSmaObj* pSma, const SStbObj* pDestStb, STableTSMAInfo* pInfo, const SSmaObj* pBaseTsma) {
  int32_t code = 0;
  pInfo->interval = pSma->interval;
  pInfo->unit = pSma->intervalUnit;
  pInfo->tsmaId = pSma->uid;
  pInfo->version = pSma->version;
  pInfo->destTbUid = pDestStb->uid;

  SName sName = {0};
  code = tNameFromString(&sName, pSma->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
  if (TSDB_CODE_SUCCESS != code) {
    TAOS_RETURN(code);
  }
  tstrncpy(pInfo->name, sName.tname, TSDB_TABLE_NAME_LEN);
  tstrncpy(pInfo->targetDbFName, pSma->db, TSDB_DB_FNAME_LEN);

  code = tNameFromString(&sName, pSma->dstTbName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
  if (TSDB_CODE_SUCCESS != code) {
    TAOS_RETURN(code);
  }
  tstrncpy(pInfo->targetTb, sName.tname, TSDB_TABLE_NAME_LEN);
  tstrncpy(pInfo->dbFName, pSma->db, TSDB_DB_FNAME_LEN);

  code = tNameFromString(&sName, pSma->stb, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
  if (TSDB_CODE_SUCCESS != code) {
    TAOS_RETURN(code);
  }
  tstrncpy(pInfo->tb, sName.tname, TSDB_TABLE_NAME_LEN);

  pInfo->pFuncs = taosArrayInit(8, sizeof(STableTSMAFuncInfo));
  if (!pInfo->pFuncs) {
    TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
  }

  // Initialised so the NULL test below is well defined even if the parser
  // reports success without producing a node.
  SNode *pNode = NULL, *pFunc = NULL;
  if (TSDB_CODE_SUCCESS != nodesStringToNode(pBaseTsma ? pBaseTsma->ast : pSma->ast, &pNode)) {
    taosArrayDestroy(pInfo->pFuncs);
    pInfo->pFuncs = NULL;
    TAOS_RETURN(TSDB_CODE_TSMA_INVALID_STAT);
  }
  if (pNode) {
    SSelectStmt *pSelect = (SSelectStmt *)pNode;
    FOREACH(pFunc, pSelect->pProjectionList) {
      // Only function nodes carry a funcId; skip anything else before casting.
      if (nodeType(pFunc) != QUERY_NODE_FUNCTION) continue;
      SFunctionNode *pFuncNode = (SFunctionNode *)pFunc;
      if (!fmIsTSMASupportedFunc(pFuncNode->funcId)) continue;

      STableTSMAFuncInfo funcInfo = {0};
      funcInfo.funcId = pFuncNode->funcId;
      // NOTE(review): assumes every TSMA-supported function has a column node
      // as its first parameter - confirm against the parser/creation path.
      funcInfo.colId = ((SColumnNode *)pFuncNode->pParameterList->pHead->pNode)->colId;
      if (!taosArrayPush(pInfo->pFuncs, &funcInfo)) {
        code = terrno;
        taosArrayDestroy(pInfo->pFuncs);
        pInfo->pFuncs = NULL;  // avoid a second destroy when the caller frees pInfo
        nodesDestroyNode(pNode);
        TAOS_RETURN(code);
      }
    }
    nodesDestroyNode(pNode);
  }

  pInfo->ast = taosStrdup(pSma->ast);
  if (!pInfo->ast) code = terrno;

  if (code == TSDB_CODE_SUCCESS && pDestStb->numOfTags > 0) {
    pInfo->pTags = taosArrayInit(pDestStb->numOfTags, sizeof(SSchema));
    if (!pInfo->pTags) {
      code = terrno;
    } else {
      for (int32_t i = 0; i < pDestStb->numOfTags; ++i) {
        if (NULL == taosArrayPush(pInfo->pTags, &pDestStb->pTags[i])) {
          code = terrno;
          break;
        }
      }
    }
  }

  if (code == TSDB_CODE_SUCCESS) {
    pInfo->pUsedCols = taosArrayInit(pDestStb->numOfColumns - 3, sizeof(SSchema));
    if (!pInfo->pUsedCols) {
      code = terrno;
    } else {
      // skip _wstart, _wend, _duration
      for (int32_t i = 1; i < pDestStb->numOfColumns - 2; ++i) {
        if (NULL == taosArrayPush(pInfo->pUsedCols, &pDestStb->pColumns[i])) {
          code = terrno;
          break;
        }
      }
    }
  }
  TAOS_RETURN(code);
}
2384

2385
// @note remember to mndReleaseSma(*ppOut)
2386
static int32_t mndGetDeepestBaseForTsma(SMnode* pMnode, SSmaObj* pSma, SSmaObj** ppOut) {
1,986✔
2387
  int32_t code = 0;
1,986✔
2388
  SSmaObj* pRecursiveTsma = NULL;
1,986✔
2389
  if (pSma->baseSmaName[0]) {
1,986✔
2390
    pRecursiveTsma = mndAcquireSma(pMnode, pSma->baseSmaName);
435✔
2391
    if (!pRecursiveTsma) {
435!
2392
      mError("base tsma: %s for tsma: %s not found", pSma->baseSmaName, pSma->name);
×
2393
      return TSDB_CODE_MND_SMA_NOT_EXIST;
×
2394
    }
2395
    while (pRecursiveTsma->baseSmaName[0]) {
477✔
2396
      SSmaObj* pTmpSma = pRecursiveTsma;
42✔
2397
      pRecursiveTsma = mndAcquireSma(pMnode, pTmpSma->baseSmaName);
42✔
2398
      if (!pRecursiveTsma) {
42!
2399
        mError("base tsma: %s for tsma: %s not found", pTmpSma->baseSmaName, pTmpSma->name);
×
2400
        mndReleaseSma(pMnode, pTmpSma);
×
2401
        return TSDB_CODE_MND_SMA_NOT_EXIST;
×
2402
      }
2403
      mndReleaseSma(pMnode, pTmpSma);
42✔
2404
    }
2405
  }
2406
  *ppOut = pRecursiveTsma;
1,986✔
2407
  return code;
1,986✔
2408
}
2409

2410

2411
/*
 * Look up a single TSMA by its full name and, when it and its destination
 * super table can both be acquired, append its STableTSMAInfo to rsp->pTsmas.
 *
 * `*exist` is set to true only after the info was appended. When the SMA or
 * its destination super table cannot be acquired the function returns
 * TSDB_CODE_SUCCESS and leaves `*exist` untouched, so the caller can report
 * "not exist".
 *
 * @return TSDB_CODE_SUCCESS, or the error code of the failing step.
 */
static int32_t mndGetTSMA(SMnode *pMnode, char *tsmaFName, STableTSMAInfoRsp *rsp, bool *exist) {
  int32_t  code = TSDB_CODE_SUCCESS;
  SSmaObj *pSma = NULL;
  SSmaObj *pBaseTsma = NULL;
  SStbObj *pDstStb = NULL;

  pSma = sdbAcquire(pMnode->pSdb, SDB_SMA, tsmaFName);
  if (!pSma) {
    // Same outcome as a missing destination table below: not an error here.
    return TSDB_CODE_SUCCESS;
  }

  pDstStb = mndAcquireStb(pMnode, pSma->dstTbName);
  if (!pDstStb) {
    sdbRelease(pMnode->pSdb, pSma);
    return TSDB_CODE_SUCCESS;
  }

  STableTSMAInfo *pTsma = taosMemoryCalloc(1, sizeof(STableTSMAInfo));
  if (!pTsma) {
    code = terrno;
    sdbRelease(pMnode->pSdb, pSma);
    mndReleaseStb(pMnode, pDstStb);
    TAOS_RETURN(code);
  }

  code = mndGetDeepestBaseForTsma(pMnode, pSma, &pBaseTsma);
  if (code == 0) {
    code = dumpTSMAInfoFromSmaObj(pSma, pDstStb, pTsma, pBaseTsma);
  }
  mndReleaseStb(pMnode, pDstStb);
  sdbRelease(pMnode->pSdb, pSma);
  if (pBaseTsma) mndReleaseSma(pMnode, pBaseTsma);

  // Decide on the returned code, not on the global terrno, which may hold a
  // stale value or may be unset by a failing callee.
  if (code != TSDB_CODE_SUCCESS) {
    tFreeAndClearTableTSMAInfo(pTsma);
    TAOS_RETURN(code);
  }

  if (NULL == taosArrayPush(rsp->pTsmas, &pTsma)) {
    code = terrno;
    tFreeAndClearTableTSMAInfo(pTsma);
    TAOS_RETURN(code);
  }

  *exist = true;
  TAOS_RETURN(code);
}
2452

2453
// Predicate handed to mndGetSomeTsmas: returning true makes the iteration skip
// (release and ignore) pSma; `param` is the opaque argument passed through.
typedef bool (*tsmaFilter)(const SSmaObj* pSma, void* param);
2454

2455
/*
 * Iterate over all SDB_SMA objects and append a STableTSMAInfo to
 * pRsp->pTsmas for every SMA that `filtered(pSma, param)` does not reject.
 *
 * An SMA whose destination super table or stream cannot be acquired is
 * skipped and the function reports TSDB_CODE_NEED_RETRY after the scan.
 * `*exist` is set to true once at least one info was appended.
 *
 * Every early return releases the objects acquired for the current SMA and
 * cancels the sdb fetch iterator.
 *
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_NEED_RETRY, or the error code of the
 *         failing step.
 */
static int32_t mndGetSomeTsmas(SMnode* pMnode, STableTSMAInfoRsp* pRsp, tsmaFilter filtered, void* param, bool* exist) {
  int32_t  code = 0;
  SSmaObj *pSma = NULL;
  SSdb    *pSdb = pMnode->pSdb;
  void    *pIter = NULL;
  bool     shouldRetry = false;

  while (1) {
    pIter = sdbFetch(pSdb, SDB_SMA, pIter, (void **)&pSma);
    if (pIter == NULL) break;

    if (filtered(pSma, param)) {
      sdbRelease(pSdb, pSma);
      continue;
    }

    SStbObj *pStb = mndAcquireStb(pMnode, pSma->dstTbName);
    if (!pStb) {
      sdbRelease(pSdb, pSma);
      shouldRetry = true;
      continue;
    }

    SName smaName = {0};
    char  streamName[TSDB_TABLE_FNAME_LEN] = {0};
    code = tNameFromString(&smaName, pSma->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
    if (TSDB_CODE_SUCCESS != code) {
      sdbRelease(pSdb, pSma);
      mndReleaseStb(pMnode, pStb);
      sdbCancelFetch(pSdb, pIter);
      TAOS_RETURN(code);
    }
    // Bounded formatting; the stream is named "<acctId>.<tsma table name>".
    (void)snprintf(streamName, sizeof(streamName), "%d.%s", smaName.acctId, smaName.tname);

    SStreamObj *pStream = NULL;
    code = mndAcquireStream(pMnode, streamName, &pStream);
    if (!pStream) {
      shouldRetry = true;
      sdbRelease(pSdb, pSma);
      mndReleaseStb(pMnode, pStb);
      continue;
    }
    if (code != 0) {
      // NOTE(review): pStream is non-NULL here although an error was reported;
      // whether it then needs mndReleaseStream depends on mndAcquireStream.
      sdbRelease(pSdb, pSma);
      mndReleaseStb(pMnode, pStb);
      sdbCancelFetch(pSdb, pIter);
      TAOS_RETURN(code);
    }

    int64_t streamId = pStream->uid;
    mndReleaseStream(pMnode, pStream);

    STableTSMAInfo *pTsma = taosMemoryCalloc(1, sizeof(STableTSMAInfo));
    if (!pTsma) {
      code = terrno;
      mndReleaseStb(pMnode, pStb);
      sdbRelease(pSdb, pSma);
      sdbCancelFetch(pSdb, pIter);
      TAOS_RETURN(code);
    }
    pTsma->streamUid = streamId;

    // Fresh per iteration so a failed lookup can never leave the previous
    // iteration's (already released) base behind.
    SSmaObj *pBaseTsma = NULL;
    code = mndGetDeepestBaseForTsma(pMnode, pSma, &pBaseTsma);
    if (code == 0) {
      code = dumpTSMAInfoFromSmaObj(pSma, pStb, pTsma, pBaseTsma);
    }
    mndReleaseStb(pMnode, pStb);
    sdbRelease(pSdb, pSma);
    if (pBaseTsma) mndReleaseSma(pMnode, pBaseTsma);

    // Test the returned code rather than the global terrno.
    if (code != TSDB_CODE_SUCCESS) {
      tFreeAndClearTableTSMAInfo(pTsma);
      sdbCancelFetch(pSdb, pIter);
      TAOS_RETURN(code);
    }
    if (NULL == taosArrayPush(pRsp->pTsmas, &pTsma)) {
      code = terrno;
      tFreeAndClearTableTSMAInfo(pTsma);
      sdbCancelFetch(pSdb, pIter);
      TAOS_RETURN(code);
    }
    *exist = true;
  }

  if (shouldRetry) {
    return TSDB_CODE_NEED_RETRY;
  }
  return TSDB_CODE_SUCCESS;
}
2543

2544
static bool tsmaTbFilter(const SSmaObj* pSma, void* param) {
1,908✔
2545
  const char* tbFName = param;
1,908✔
2546
  return pSma->stb[0] != tbFName[0] || strcmp(pSma->stb, tbFName) != 0;
1,908!
2547
}
2548

2549
/*
 * Collect the TSMAs selected by tsmaTbFilter for `tbFName`; thin wrapper
 * around mndGetSomeTsmas whose result is returned unchanged.
 */
static int32_t mndGetTableTSMA(SMnode *pMnode, char *tbFName, STableTSMAInfoRsp *pRsp, bool *exist) {
  void *filterArg = tbFName;
  return mndGetSomeTsmas(pMnode, pRsp, tsmaTbFilter, filterArg, exist);
}
2552

2553
static bool tsmaDbFilter(const SSmaObj* pSma, void* param) {
351✔
2554
  uint64_t *dbUid = param;
351✔
2555
  return pSma->dbUid != *dbUid;
351✔
2556
}
2557

2558
/*
 * Collect the TSMAs selected by tsmaDbFilter for `dbUid`; thin wrapper around
 * mndGetSomeTsmas whose result is returned unchanged. `dbFName` is accepted
 * but not read by this function.
 */
int32_t mndGetDbTsmas(SMnode *pMnode, const char *dbFName, uint64_t dbUid, STableTSMAInfoRsp *pRsp, bool *exist) {
  uint64_t wantedUid = dbUid;
  return mndGetSomeTsmas(pMnode, pRsp, tsmaDbFilter, &wantedUid, exist);
}
2561

2562
/*
 * Handle a "get table TSMA" request.
 *
 * The request is deserialized from pReq->pCont. Depending on
 * tsmaReq.fetchingWithTsmaName, either one TSMA is looked up by name
 * (mndGetTSMA) or the TSMAs for the named table are collected
 * (mndGetTableTSMA; TSDB_CODE_NEED_RETRY from it is treated as success).
 * When nothing was found the result is TSDB_CODE_MND_SMA_NOT_EXIST; otherwise
 * the serialized response is attached to pReq->info.rsp / rspLen.
 *
 * The rpc response buffer is handed to pReq only on success and is freed on
 * every failure path.
 */
static int32_t mndProcessGetTbTSMAReq(SRpcMsg *pReq) {
  STableTSMAInfoRsp rsp = {0};
  int32_t           code = -1;
  STableTSMAInfoReq tsmaReq = {0};
  bool              exist = false;
  SMnode *          pMnode = pReq->info.node;

  TAOS_CHECK_GOTO(tDeserializeTableTSMAInfoReq(pReq->pCont, pReq->contLen, &tsmaReq), NULL, _OVER);

  rsp.pTsmas = taosArrayInit(4, POINTER_BYTES);
  if (NULL == rsp.pTsmas) {
    code = terrno;
    goto _OVER;
  }

  if (tsmaReq.fetchingWithTsmaName) {
    code = mndGetTSMA(pMnode, tsmaReq.name, &rsp, &exist);
  } else {
    code = mndGetTableTSMA(pMnode, tsmaReq.name, &rsp, &exist);
    if (TSDB_CODE_NEED_RETRY == code) {
      code = TSDB_CODE_SUCCESS;
    }
  }
  if (code) {
    goto _OVER;
  }

  if (!exist) {
    code = TSDB_CODE_MND_SMA_NOT_EXIST;
  } else {
    // First pass only computes the encoded size.
    int32_t contLen = tSerializeTableTSMAInfoRsp(NULL, 0, &rsp);
    if (contLen < 0) {
      code = terrno;
      goto _OVER;
    }

    void *pRsp = rpcMallocCont(contLen);
    if (pRsp == NULL) {
      code = terrno;
      goto _OVER;
    }

    int32_t len = tSerializeTableTSMAInfoRsp(pRsp, contLen, &rsp);
    if (len < 0) {
      code = terrno;
      rpcFreeCont(pRsp);  // not yet owned by pReq, so release it here
      goto _OVER;
    }

    pReq->info.rsp = pRsp;
    pReq->info.rspLen = contLen;

    code = 0;
  }

_OVER:
  tFreeTableTSMAInfoRsp(&rsp);
  TAOS_RETURN(code);
}
2615

2616
/*
 * Allocate a placeholder STableTSMAInfo for a TSMA version entry: identifiers
 * and names are copied from `pTsmaVer`, `pFuncs` stays NULL and `ast` is an
 * empty string.
 *
 * @param ppTsma receives the new object on success; untouched on failure.
 * @return TSDB_CODE_SUCCESS, or the terrno value of the failed allocation.
 */
static int32_t mkNonExistTSMAInfo(const STSMAVersion *pTsmaVer, STableTSMAInfo **ppTsma) {
  STableTSMAInfo *pInfo = taosMemoryCalloc(1, sizeof(STableTSMAInfo));
  if (!pInfo) {
    return terrno;
  }
  pInfo->pFuncs = NULL;
  pInfo->tsmaId = pTsmaVer->tsmaId;
  tstrncpy(pInfo->dbFName, pTsmaVer->dbFName, TSDB_DB_FNAME_LEN);
  tstrncpy(pInfo->tb, pTsmaVer->tbName, TSDB_TABLE_NAME_LEN);
  tstrncpy(pInfo->name, pTsmaVer->name, TSDB_TABLE_NAME_LEN);
  pInfo->dbId = pTsmaVer->dbId;
  pInfo->ast = taosMemoryCalloc(1, 1);
  if (!pInfo->ast) {
    // Capture the allocation error before the cleanup call can disturb terrno.
    int32_t code = terrno;
    taosMemoryFree(pInfo);
    return code;
  }
  *ppTsma = pInfo;
  return TSDB_CODE_SUCCESS;
}
2635

2636
int32_t mndValidateTSMAInfo(SMnode *pMnode, STSMAVersion *pTsmaVersions, int32_t numOfTsmas, void **ppRsp,
409✔
2637
                            int32_t *pRspLen) {
2638
  int32_t            code = -1;
409✔
2639
  STSMAHbRsp         hbRsp = {0};
409✔
2640
  int32_t            rspLen = 0;
409✔
2641
  void *             pRsp = NULL;
409✔
2642
  char               tsmaFName[TSDB_TABLE_FNAME_LEN] = {0};
409✔
2643
  STableTSMAInfo *   pTsmaInfo = NULL;
409✔
2644

2645
  hbRsp.pTsmas = taosArrayInit(numOfTsmas, POINTER_BYTES);
409✔
2646
  if (!hbRsp.pTsmas) {
409!
2647
    code = terrno;
×
2648
    TAOS_RETURN(code);
×
2649
  }
2650

2651
  for (int32_t i = 0; i < numOfTsmas; ++i) {
851✔
2652
    STSMAVersion* pTsmaVer = &pTsmaVersions[i];
442✔
2653
    pTsmaVer->dbId = be64toh(pTsmaVer->dbId);
442✔
2654
    pTsmaVer->tsmaId = be64toh(pTsmaVer->tsmaId);
442✔
2655
    pTsmaVer->version = ntohl(pTsmaVer->version);
442✔
2656

2657
    snprintf(tsmaFName, sizeof(tsmaFName), "%s.%s", pTsmaVer->dbFName, pTsmaVer->name);
442✔
2658
    SSmaObj* pSma = mndAcquireSma(pMnode, tsmaFName);
442✔
2659
    if (!pSma) {
442✔
2660
      code = mkNonExistTSMAInfo(pTsmaVer, &pTsmaInfo);
6✔
2661
      if (code) goto _OVER;
6!
2662
      if (NULL == taosArrayPush(hbRsp.pTsmas, &pTsmaInfo)) {
12!
2663
        code = terrno;
×
2664
        tFreeAndClearTableTSMAInfo(pTsmaInfo);
×
2665
        goto _OVER;
×
2666
      }
2667
      continue;
442✔
2668
    }
2669

2670
    if (pSma->uid != pTsmaVer->tsmaId) {
436!
2671
      mDebug("tsma: %s.%" PRIx64 " tsmaId mismatch with current %" PRIx64, tsmaFName, pTsmaVer->tsmaId, pSma->uid);
×
2672
      code = mkNonExistTSMAInfo(pTsmaVer, &pTsmaInfo);
×
2673
      mndReleaseSma(pMnode, pSma);
×
2674
      if (code) goto _OVER;
×
2675
      if (NULL == taosArrayPush(hbRsp.pTsmas, &pTsmaInfo)) {
×
2676
        code = terrno;
×
2677
        tFreeAndClearTableTSMAInfo(pTsmaInfo);
×
2678
        goto _OVER;
×
2679
      }
2680
      continue;
×
2681
    } else if (pSma->version == pTsmaVer->version) {
436!
2682
      mndReleaseSma(pMnode, pSma);
436✔
2683
      continue;
436✔
2684
    }
2685

2686
    SStbObj* pDestStb = mndAcquireStb(pMnode, pSma->dstTbName);
×
2687
    if (!pDestStb) {
×
2688
      mInfo("tsma: %s.%" PRIx64 " dest stb: %s not found, maybe dropped", tsmaFName, pTsmaVer->tsmaId, pSma->dstTbName);
×
2689
      code = mkNonExistTSMAInfo(pTsmaVer, &pTsmaInfo);
×
2690
      mndReleaseSma(pMnode, pSma);
×
2691
      if (code) goto _OVER;
×
2692
      if (NULL == taosArrayPush(hbRsp.pTsmas, &pTsmaInfo)) {
×
2693
        code = terrno;
×
2694
        tFreeAndClearTableTSMAInfo(pTsmaInfo);
×
2695
        goto _OVER;
×
2696
      }
2697
      continue;
×
2698
    }
2699

2700
    // dump smaObj into rsp
2701
    STableTSMAInfo *   pInfo = NULL;
×
2702
    pInfo = taosMemoryCalloc(1, sizeof(STableTSMAInfo));
×
2703
    if (!pInfo) {
×
2704
      code = terrno;
×
2705
      mndReleaseSma(pMnode, pSma);
×
2706
      mndReleaseStb(pMnode, pDestStb);
×
2707
      goto _OVER;
×
2708
    }
2709

2710
    SSmaObj* pBaseSma = NULL;
×
2711
    code = mndGetDeepestBaseForTsma(pMnode, pSma, &pBaseSma);
×
2712
    if (code == 0) code = dumpTSMAInfoFromSmaObj(pSma, pDestStb, pInfo, pBaseSma);
×
2713

2714
    mndReleaseStb(pMnode, pDestStb);
×
2715
    mndReleaseSma(pMnode, pSma);
×
2716
    if (pBaseSma) mndReleaseSma(pMnode, pBaseSma);
×
2717
    if (terrno) {
×
2718
      tFreeAndClearTableTSMAInfo(pInfo);
×
2719
      goto _OVER;
×
2720
    }
2721

2722
    if (NULL == taosArrayPush(hbRsp.pTsmas, pInfo)) {
×
2723
      code = terrno;
×
2724
      tFreeAndClearTableTSMAInfo(pInfo);
×
2725
      goto _OVER;
×
2726
    }
2727
  }
2728

2729
  rspLen = tSerializeTSMAHbRsp(NULL, 0, &hbRsp);
409✔
2730
  if (rspLen < 0) {
409!
2731
    code = terrno;
×
2732
    goto _OVER;
×
2733
  }
2734

2735
  pRsp = taosMemoryMalloc(rspLen);
409✔
2736
  if (!pRsp) {
409!
2737
    code = terrno;
×
2738
    rspLen = 0;
×
2739
    goto _OVER;
×
2740
  }
2741

2742
  rspLen = tSerializeTSMAHbRsp(pRsp, rspLen, &hbRsp);
409✔
2743
  if (rspLen < 0) {
409!
2744
    code = terrno;
×
2745
    goto _OVER;
×
2746
  }
2747
  code = 0;
409✔
2748
_OVER:
409✔
2749
  tFreeTSMAHbRsp(&hbRsp);
409✔
2750
  *ppRsp = pRsp;
409✔
2751
  *pRspLen = rspLen;
409✔
2752
  TAOS_RETURN(code);
409✔
2753
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc