• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3558

17 Dec 2024 06:05AM UTC coverage: 59.778% (+1.6%) from 58.204%
#3558

push

travis-ci

web-flow
Merge pull request #29179 from taosdata/merge/mainto3.0

merge: form main to 3.0 branch

132787 of 287595 branches covered (46.17%)

Branch coverage included in aggregate %.

104 of 191 new or added lines in 5 files covered. (54.45%)

6085 existing lines in 168 files now uncovered.

209348 of 284746 relevant lines covered (73.52%)

8164844.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

56.18
/source/dnode/mnode/impl/src/mndSma.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mndSma.h"
18
#include "functionMgt.h"
19
#include "mndDb.h"
20
#include "mndDnode.h"
21
#include "mndIndex.h"
22
#include "mndIndexComm.h"
23
#include "mndInfoSchema.h"
24
#include "mndMnode.h"
25
#include "mndPrivilege.h"
26
#include "mndScheduler.h"
27
#include "mndShow.h"
28
#include "mndStb.h"
29
#include "mndStream.h"
30
#include "mndTrans.h"
31
#include "mndUser.h"
32
#include "mndVgroup.h"
33
#include "parser.h"
34
#include "tname.h"
35

36
#define TSDB_SMA_VER_NUMBER   1
37
#define TSDB_SMA_RESERVE_SIZE 64
38

39
static SSdbRaw *mndSmaActionEncode(SSmaObj *pSma);
40
static SSdbRow *mndSmaActionDecode(SSdbRaw *pRaw);
41
static int32_t  mndSmaActionInsert(SSdb *pSdb, SSmaObj *pSma);
42
static int32_t  mndSmaActionDelete(SSdb *pSdb, SSmaObj *pSpSmatb);
43
static int32_t  mndSmaActionUpdate(SSdb *pSdb, SSmaObj *pOld, SSmaObj *pNew);
44
static int32_t  mndProcessCreateSmaReq(SRpcMsg *pReq);
45
static int32_t  mndProcessDropSmaReq(SRpcMsg *pReq);
46
static int32_t  mndProcessGetSmaReq(SRpcMsg *pReq);
47
static int32_t  mndProcessGetTbSmaReq(SRpcMsg *pReq);
48
static int32_t  mndRetrieveSma(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
49
static void     mndDestroySmaObj(SSmaObj *pSmaObj);
50

51
static int32_t mndProcessCreateTSMAReq(SRpcMsg *pReq);
52
static int32_t mndProcessDropTSMAReq(SRpcMsg *pReq);
53

54
// sma and tag index comm func
55
static int32_t mndProcessDropIdxReq(SRpcMsg *pReq);
56
static int32_t mndRetrieveIdx(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
57
static void    mndCancelRetrieveIdx(SMnode *pMnode, void *pIter);
58

59
static int32_t mndRetrieveTSMA(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
60
static void    mndCancelRetrieveTSMA(SMnode *pMnode, void *pIter);
61
static int32_t mndProcessGetTbTSMAReq(SRpcMsg *pReq);
62

63
typedef struct SCreateTSMACxt {
64
  SMnode        *pMnode;
65
  const SRpcMsg *pRpcReq;
66
  union {
67
    const SMCreateSmaReq *pCreateSmaReq;
68
    const SMDropSmaReq   *pDropSmaReq;
69
  };
70
  SDbObj             *pDb;
71
  SStbObj            *pSrcStb;
72
  SSmaObj            *pSma;
73
  const SSmaObj      *pBaseSma;
74
  SCMCreateStreamReq *pCreateStreamReq;
75
  SMDropStreamReq    *pDropStreamReq;
76
  const char         *streamName;
77
  const char         *targetStbFullName;
78
  SNodeList          *pProjects;
79
} SCreateTSMACxt;
80

81
/**
 * Install everything the SMA module needs on an mnode: message handlers,
 * show-retrieve/free-iterator callbacks, and the SDB_SMA table definition.
 *
 * @param pMnode  mnode whose handler tables and sdb are populated
 * @return        whatever sdbSetTable() returns for the SDB_SMA table
 */
int32_t mndInitSma(SMnode *pMnode) {
  // describe how SDB_SMA rows are (de)serialized and which hooks fire on changes
  SSdbTable smaTable = {0};
  smaTable.sdbType = SDB_SMA;
  smaTable.keyType = SDB_KEY_BINARY;
  smaTable.encodeFp = (SdbEncodeFp)mndSmaActionEncode;
  smaTable.decodeFp = (SdbDecodeFp)mndSmaActionDecode;
  smaTable.insertFp = (SdbInsertFp)mndSmaActionInsert;
  smaTable.updateFp = (SdbUpdateFp)mndSmaActionUpdate;
  smaTable.deleteFp = (SdbDeleteFp)mndSmaActionDelete;

  // classic SMA / index messages; vnode responses are routed to the transaction engine
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_SMA, mndProcessCreateSmaReq);
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_SMA, mndProcessDropIdxReq);
  mndSetMsgHandle(pMnode, TDMT_VND_CREATE_SMA_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_DROP_SMA_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_GET_INDEX, mndProcessGetSmaReq);
  mndSetMsgHandle(pMnode, TDMT_MND_GET_TABLE_INDEX, mndProcessGetTbSmaReq);

  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_INDEX, mndRetrieveIdx);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_INDEX, mndCancelRetrieveIdx);

  // TSMA messages; both GET variants share one handler
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_TSMA, mndProcessCreateTSMAReq);
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_TSMA, mndProcessDropTSMAReq);
  mndSetMsgHandle(pMnode, TDMT_MND_GET_TABLE_TSMA, mndProcessGetTbTSMAReq);
  mndSetMsgHandle(pMnode, TDMT_MND_GET_TSMA, mndProcessGetTbTSMAReq);
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TSMAS, mndRetrieveTSMA);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TSMAS, mndCancelRetrieveTSMA);

  return sdbSetTable(pMnode->pSdb, smaTable);
}
111

112
// Counterpart of mndInitSma; intentionally a no-op.
void mndCleanupSma(SMnode *pMnode) {
  (void)pMnode;
}
1,515✔
113

114
/**
 * Serialize an SSmaObj into a freshly allocated sdb raw record.
 *
 * Layout: fixed-size header fields, the four length fields, then each
 * variable-length payload (expr, tagsFilter, sql, ast) that has a positive
 * length, then baseSmaName and a reserved tail.
 *
 * @param pSma  object to encode
 * @return      the raw record, or NULL with terrno set on failure
 */
static SSdbRaw *mndSmaActionEncode(SSmaObj *pSma) {
  // code/lino are kept as in the original; the SDB_SET_* macros may reference them
  int32_t code = 0;
  int32_t lino = 0;
  int32_t dataPos = 0;

  // pessimistic default: any early jump to _OVER reports a failure
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  int32_t size =
      sizeof(SSmaObj) + pSma->exprLen + pSma->tagsFilterLen + pSma->sqlLen + pSma->astLen + TSDB_SMA_RESERVE_SIZE;
  SSdbRaw *pRaw = sdbAllocRaw(SDB_SMA, TSDB_SMA_VER_NUMBER, size);
  if (pRaw == NULL) goto _OVER;

  // names
  SDB_SET_BINARY(pRaw, dataPos, pSma->name, TSDB_TABLE_FNAME_LEN, _OVER)
  SDB_SET_BINARY(pRaw, dataPos, pSma->stb, TSDB_TABLE_FNAME_LEN, _OVER)
  SDB_SET_BINARY(pRaw, dataPos, pSma->db, TSDB_DB_FNAME_LEN, _OVER)
  SDB_SET_BINARY(pRaw, dataPos, pSma->dstTbName, TSDB_TABLE_FNAME_LEN, _OVER)
  // timestamps and uids
  SDB_SET_INT64(pRaw, dataPos, pSma->createdTime, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pSma->uid, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pSma->stbUid, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pSma->dbUid, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pSma->dstTbUid, _OVER)
  // window definition
  SDB_SET_INT8(pRaw, dataPos, pSma->intervalUnit, _OVER)
  SDB_SET_INT8(pRaw, dataPos, pSma->slidingUnit, _OVER)
  SDB_SET_INT8(pRaw, dataPos, pSma->timezone, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pSma->dstVgId, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pSma->interval, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pSma->offset, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pSma->sliding, _OVER)
  // payload lengths, then the object version
  SDB_SET_INT32(pRaw, dataPos, pSma->exprLen, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pSma->tagsFilterLen, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pSma->sqlLen, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pSma->astLen, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pSma->version, _OVER)

  // variable-length payloads are written only when present
  if (pSma->exprLen > 0) {
    SDB_SET_BINARY(pRaw, dataPos, pSma->expr, pSma->exprLen, _OVER)
  }
  if (pSma->tagsFilterLen > 0) {
    SDB_SET_BINARY(pRaw, dataPos, pSma->tagsFilter, pSma->tagsFilterLen, _OVER)
  }
  if (pSma->sqlLen > 0) {
    SDB_SET_BINARY(pRaw, dataPos, pSma->sql, pSma->sqlLen, _OVER)
  }
  if (pSma->astLen > 0) {
    SDB_SET_BINARY(pRaw, dataPos, pSma->ast, pSma->astLen, _OVER)
  }
  SDB_SET_BINARY(pRaw, dataPos, pSma->baseSmaName, TSDB_TABLE_FNAME_LEN, _OVER)

  SDB_SET_RESERVE(pRaw, dataPos, TSDB_SMA_RESERVE_SIZE, _OVER)
  SDB_SET_DATALEN(pRaw, dataPos, _OVER)

  terrno = 0;

_OVER:
  if (terrno == 0) {
    mTrace("sma:%s, encode to raw:%p, row:%p", pSma->name, pRaw, pSma);
    return pRaw;
  }

  mError("sma:%s, failed to encode to raw:%p since %s", pSma->name, pRaw, terrstr());
  sdbFreeRaw(pRaw);
  return NULL;
}
176

177
/**
 * Rebuild an SSmaObj row from an sdb raw record written by mndSmaActionEncode.
 *
 * Fields are read in exactly the order the encoder wrote them. Each
 * variable-length payload with a positive length is read into its own heap
 * buffer.
 *
 * @param pRaw  raw record to decode
 * @return      a new sdb row wrapping the SSmaObj, or NULL with terrno set
 */
static SSdbRow *mndSmaActionDecode(SSdbRaw *pRaw) {
  // code/lino are kept as in the original; the SDB_GET_* macros may reference them
  int32_t  code = 0;
  int32_t  lino = 0;
  int32_t  dataPos = 0;
  int8_t   sver = 0;
  SSdbRow *pRow = NULL;
  SSmaObj *pSma = NULL;

  // pessimistic default: any early jump to _OVER reports a failure
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;

  // only the current on-disk version is understood
  if (sver != TSDB_SMA_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto _OVER;
  }

  pRow = sdbAllocRow(sizeof(SSmaObj));
  if (pRow == NULL) goto _OVER;

  pSma = sdbGetRowObj(pRow);
  if (pSma == NULL) goto _OVER;

  // names
  SDB_GET_BINARY(pRaw, dataPos, pSma->name, TSDB_TABLE_FNAME_LEN, _OVER)
  SDB_GET_BINARY(pRaw, dataPos, pSma->stb, TSDB_TABLE_FNAME_LEN, _OVER)
  SDB_GET_BINARY(pRaw, dataPos, pSma->db, TSDB_DB_FNAME_LEN, _OVER)
  SDB_GET_BINARY(pRaw, dataPos, pSma->dstTbName, TSDB_TABLE_FNAME_LEN, _OVER)
  // timestamps and uids
  SDB_GET_INT64(pRaw, dataPos, &pSma->createdTime, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pSma->uid, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pSma->stbUid, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pSma->dbUid, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pSma->dstTbUid, _OVER)
  // window definition
  SDB_GET_INT8(pRaw, dataPos, &pSma->intervalUnit, _OVER)
  SDB_GET_INT8(pRaw, dataPos, &pSma->slidingUnit, _OVER)
  SDB_GET_INT8(pRaw, dataPos, &pSma->timezone, _OVER)
  SDB_GET_INT32(pRaw, dataPos, &pSma->dstVgId, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pSma->interval, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pSma->offset, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pSma->sliding, _OVER)
  // payload lengths, then the object version
  SDB_GET_INT32(pRaw, dataPos, &pSma->exprLen, _OVER)
  SDB_GET_INT32(pRaw, dataPos, &pSma->tagsFilterLen, _OVER)
  SDB_GET_INT32(pRaw, dataPos, &pSma->sqlLen, _OVER)
  SDB_GET_INT32(pRaw, dataPos, &pSma->astLen, _OVER)
  SDB_GET_INT32(pRaw, dataPos, &pSma->version, _OVER)

  // each present payload gets its own zeroed buffer before being filled
  if (pSma->exprLen > 0) {
    pSma->expr = taosMemoryCalloc(pSma->exprLen, 1);
    if (pSma->expr == NULL) goto _OVER;
    SDB_GET_BINARY(pRaw, dataPos, pSma->expr, pSma->exprLen, _OVER)
  }

  if (pSma->tagsFilterLen > 0) {
    pSma->tagsFilter = taosMemoryCalloc(pSma->tagsFilterLen, 1);
    if (pSma->tagsFilter == NULL) goto _OVER;
    SDB_GET_BINARY(pRaw, dataPos, pSma->tagsFilter, pSma->tagsFilterLen, _OVER)
  }

  if (pSma->sqlLen > 0) {
    pSma->sql = taosMemoryCalloc(pSma->sqlLen, 1);
    if (pSma->sql == NULL) goto _OVER;
    SDB_GET_BINARY(pRaw, dataPos, pSma->sql, pSma->sqlLen, _OVER)
  }

  if (pSma->astLen > 0) {
    pSma->ast = taosMemoryCalloc(pSma->astLen, 1);
    if (pSma->ast == NULL) goto _OVER;
    SDB_GET_BINARY(pRaw, dataPos, pSma->ast, pSma->astLen, _OVER)
  }
  SDB_GET_BINARY(pRaw, dataPos, pSma->baseSmaName, TSDB_TABLE_FNAME_LEN, _OVER)

  SDB_GET_RESERVE(pRaw, dataPos, TSDB_SMA_RESERVE_SIZE, _OVER)

  terrno = 0;

_OVER:
  if (terrno == 0) {
    mTrace("sma:%s, decode from raw:%p, row:%p", pSma->name, pRaw, pSma);
    return pRow;
  }

  // failure: drop whatever payload buffers were already allocated, then the row itself
  if (pSma != NULL) {
    mError("sma:%s, failed to decode from raw:%p since %s", pSma->name, pRaw, terrstr());
    taosMemoryFreeClear(pSma->expr);
    taosMemoryFreeClear(pSma->tagsFilter);
    taosMemoryFreeClear(pSma->sql);
    taosMemoryFreeClear(pSma->ast);
  }
  taosMemoryFreeClear(pRow);
  return NULL;
}
267

268
// sdb insert hook for SDB_SMA rows: only traces the event and always reports success.
static int32_t mndSmaActionInsert(SSdb *pSdb, SSmaObj *pSma) {
  (void)pSdb;
  mTrace("sma:%s, perform insert action, row:%p", pSma->name, pSma);
  return 0;
}
272

273
// sdb delete hook for SDB_SMA rows: frees the four heap payloads attached to the
// row (the same ones mndSmaActionDecode allocates) and always reports success.
static int32_t mndSmaActionDelete(SSdb *pSdb, SSmaObj *pSma) {
  (void)pSdb;
  mTrace("sma:%s, perform delete action, row:%p", pSma->name, pSma);

  taosMemoryFreeClear(pSma->expr);
  taosMemoryFreeClear(pSma->tagsFilter);
  taosMemoryFreeClear(pSma->sql);
  taosMemoryFreeClear(pSma->ast);

  return 0;
}
281

282
// sdb update hook for SDB_SMA rows: nothing is copied from pNew into pOld here;
// the event is only traced and success is reported.
static int32_t mndSmaActionUpdate(SSdb *pSdb, SSmaObj *pOld, SSmaObj *pNew) {
  (void)pSdb;
  mTrace("sma:%s, perform update action, old row:%p new row:%p", pOld->name, pOld, pNew);
  return 0;
}
286

287
/**
 * Look up an SMA object by name in the mnode's sdb.
 *
 * @param pMnode   mnode owning the sdb
 * @param smaName  key passed to sdbAcquire for the SDB_SMA table
 * @return         the acquired object (hand back with mndReleaseSma), or NULL;
 *                 a generic "object not there" terrno is translated to
 *                 TSDB_CODE_MND_SMA_NOT_EXIST
 */
SSmaObj *mndAcquireSma(SMnode *pMnode, char *smaName) {
  SSmaObj *pFound = sdbAcquire(pMnode->pSdb, SDB_SMA, smaName);
  if (pFound != NULL) {
    return pFound;
  }

  if (terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
    terrno = TSDB_CODE_MND_SMA_NOT_EXIST;
  }
  return NULL;
}
295

296
// Hand an SMA object back to the mnode's sdb via sdbRelease.
void mndReleaseSma(SMnode *pMnode, SSmaObj *pSma) {
  sdbRelease(pMnode->pSdb, pSma);
}
1,427✔
300

301
// Thin alias: forwards to mndAcquireDb with the same arguments.
SDbObj *mndAcquireDbBySma(SMnode *pMnode, const char *db) {
  SDbObj *pDb = mndAcquireDb(pMnode, db);
  return pDb;
}
×
302

303
/**
 * Build the vnode-side "create tsma" message for an SMA object.
 *
 * The returned buffer is an SMsgHead (contLen/vgId in network byte order)
 * immediately followed by the encoded SVCreateTSmaReq.
 *
 * @param pMnode    unused here
 * @param pVgroup   vgroup whose vgId is stamped into the message head
 * @param pSma      SMA object the request is derived from; expr/tagsFilter/
 *                  schema pointers are borrowed only for the duration of encoding
 * @param pContLen  out: total message length including the head (written on success only)
 * @return          heap buffer owned by the caller, or NULL on failure
 */
static void *mndBuildVCreateSmaReq(SMnode *pMnode, SVgObj *pVgroup, SSmaObj *pSma, int32_t *pContLen) {
  SEncoder encoder = {0};
  int32_t  contLen = 0;
  SName    name = {0};
  int32_t  code = tNameFromString(&name, pSma->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
  if (TSDB_CODE_SUCCESS != code) {
    // callers turn a NULL result into an error code by reading terrno, so record
    // the parse failure here (mndBuildVDropSmaReq does the same)
    terrno = code;
    return NULL;
  }

  SVCreateTSmaReq req = {0};
  req.version = 0;
  req.intervalUnit = pSma->intervalUnit;
  req.slidingUnit = pSma->slidingUnit;
//  req.timezoneInt = pSma->timezone;
  tstrncpy(req.indexName, (char *)tNameGetTableName(&name), TSDB_INDEX_NAME_LEN);
  req.exprLen = pSma->exprLen;
  req.tagsFilterLen = pSma->tagsFilterLen;
  req.indexUid = pSma->uid;
  req.tableUid = pSma->stbUid;
  req.dstVgId = pSma->dstVgId;
  req.dstTbUid = pSma->dstTbUid;
  req.interval = pSma->interval;
  req.offset = pSma->offset;
  req.sliding = pSma->sliding;
  req.expr = pSma->expr;
  req.tagsFilter = pSma->tagsFilter;
  req.schemaRow = pSma->schemaRow;
  req.schemaTag = pSma->schemaTag;
  req.dstTbName = pSma->dstTbName;

  // first pass: measure the encoded size
  int32_t ret = 0;
  tEncodeSize(tEncodeSVCreateTSmaReq, &req, contLen, ret);
  if (ret < 0) {
    return NULL;
  }
  contLen += sizeof(SMsgHead);

  SMsgHead *pHead = taosMemoryMalloc(contLen);
  if (pHead == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pHead->contLen = htonl(contLen);
  pHead->vgId = htonl(pVgroup->vgId);

  // second pass: encode the body right behind the message head
  void *pBuf = POINTER_SHIFT(pHead, sizeof(SMsgHead));
  tEncoderInit(&encoder, pBuf, contLen - sizeof(SMsgHead));
  if (tEncodeSVCreateTSmaReq(&encoder, &req) < 0) {
    taosMemoryFreeClear(pHead);
    tEncoderClear(&encoder);
    return NULL;
  }

  tEncoderClear(&encoder);

  *pContLen = contLen;
  return pHead;
}
363

364
/**
 * Build the vnode-side "drop tsma" message for an SMA object.
 *
 * The returned buffer is an SMsgHead (contLen/vgId in network byte order)
 * immediately followed by the encoded SVDropTSmaReq.
 *
 * @param pMnode    unused here
 * @param pVgroup   vgroup whose vgId is stamped into the message head
 * @param pSma      SMA object supplying the index uid and name
 * @param pContLen  out: total message length including the head (written on success only)
 * @return          heap buffer owned by the caller, or NULL on failure
 */
static void *mndBuildVDropSmaReq(SMnode *pMnode, SVgObj *pVgroup, SSmaObj *pSma, int32_t *pContLen) {
  SEncoder enc = {0};
  int32_t  totalLen = 0;
  SName    smaName = {0};

  int32_t code = tNameFromString(&smaName, pSma->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    return NULL;
  }

  SVDropTSmaReq dropReq = {0};
  dropReq.indexUid = pSma->uid;
  tstrncpy(dropReq.indexName, (char *)tNameGetTableName(&smaName), TSDB_INDEX_NAME_LEN);

  // first pass: measure the encoded size
  int32_t ret = 0;
  tEncodeSize(tEncodeSVDropTSmaReq, &dropReq, totalLen, ret);
  if (ret < 0) {
    return NULL;
  }

  totalLen += sizeof(SMsgHead);

  SMsgHead *pMsg = taosMemoryMalloc(totalLen);
  if (pMsg == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pMsg->contLen = htonl(totalLen);
  pMsg->vgId = htonl(pVgroup->vgId);

  // second pass: encode the body right behind the message head
  void *pBody = POINTER_SHIFT(pMsg, sizeof(SMsgHead));
  tEncoderInit(&enc, pBody, totalLen - sizeof(SMsgHead));

  if (tEncodeSVDropTSmaReq(&enc, &dropReq) < 0) {
    taosMemoryFreeClear(pMsg);
    tEncoderClear(&enc);
    return NULL;
  }
  tEncoderClear(&enc);

  *pContLen = totalLen;
  return pMsg;
}
409

410
/**
 * Append the encoded SMA object to the transaction's redo log, marked
 * SDB_STATUS_CREATING.
 *
 * @return 0 on success; terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when terrno
 *         is 0) if encoding fails; otherwise the failing callee's code
 */
static int32_t mndSetCreateSmaRedoLogs(SMnode *pMnode, STrans *pTrans, SSmaObj *pSma) {
  SSdbRaw *pRaw = mndSmaActionEncode(pSma);
  if (pRaw == NULL) {
    int32_t err = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(err);
  }

  TAOS_CHECK_RETURN(mndTransAppendRedolog(pTrans, pRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRaw, SDB_STATUS_CREATING));

  TAOS_RETURN(0);
}
423

424
/**
 * Append the encoded SMA object to the transaction's undo log, marked
 * SDB_STATUS_DROPPED.
 *
 * @return 0 on success; terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when terrno
 *         is 0) if encoding fails; otherwise the failing callee's code
 */
static int32_t mndSetCreateSmaUndoLogs(SMnode *pMnode, STrans *pTrans, SSmaObj *pSma) {
  SSdbRaw *pRaw = mndSmaActionEncode(pSma);
  if (pRaw == NULL) {
    int32_t err = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(err);
  }

  TAOS_CHECK_RETURN(mndTransAppendUndolog(pTrans, pRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED));

  TAOS_RETURN(0);
}
436

437
/**
 * Append the encoded SMA object to the transaction's commit log, marked
 * SDB_STATUS_READY.
 *
 * @return 0 on success; terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when terrno
 *         is 0) if encoding fails; otherwise the failing callee's code
 */
static int32_t mndSetCreateSmaCommitLogs(SMnode *pMnode, STrans *pTrans, SSmaObj *pSma) {
  SSdbRaw *pRaw = mndSmaActionEncode(pSma);
  if (pRaw == NULL) {
    int32_t err = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(err);
  }

  TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRaw, SDB_STATUS_READY));

  TAOS_RETURN(0);
}
450

451
/**
 * Append the encoded vgroup to the transaction's redo log, marked
 * SDB_STATUS_UPDATE.
 *
 * @return 0 on success; terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when terrno
 *         is 0) if encoding fails; otherwise the failing callee's code
 */
static int32_t mndSetCreateSmaVgroupRedoLogs(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup) {
  SSdbRaw *pRaw = mndVgroupActionEncode(pVgroup);
  if (pRaw == NULL) {
    int32_t err = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(err);
  }

  TAOS_CHECK_RETURN(mndTransAppendRedolog(pTrans, pRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRaw, SDB_STATUS_UPDATE));

  TAOS_RETURN(0);
}
463

464
/**
 * Append the encoded vgroup to the transaction's commit log, marked
 * SDB_STATUS_READY.
 *
 * @return 0 on success; terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when terrno
 *         is 0) if encoding fails; otherwise the failing callee's code
 */
static int32_t mndSetCreateSmaVgroupCommitLogs(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup) {
  SSdbRaw *pRaw = mndVgroupActionEncode(pVgroup);
  if (pRaw == NULL) {
    int32_t err = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(err);
  }

  TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRaw, SDB_STATUS_READY));

  TAOS_RETURN(0);
}
476

477
/**
 * Queue a commit-log entry that bumps the super table's smaVer.
 *
 * A snapshot of *pStb is taken under its read latch, its column/tag/func arrays
 * are detached (counts zeroed, pointers NULLed), updateTime is refreshed and
 * smaVer incremented; the snapshot is then encoded and appended with
 * SDB_STATUS_READY. The live object is not modified.
 * NOTE(review): presumably the stb update hook treats zero counts as "leave the
 * schema alone" — confirm against the stb module.
 *
 * @return 0 on success; terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when terrno
 *         is 0) if encoding fails; otherwise the failing callee's code
 */
static int32_t mndSetUpdateSmaStbCommitLogs(SMnode *pMnode, STrans *pTrans, SStbObj *pStb) {
  SStbObj snapshot = {0};

  taosRLockLatch(&pStb->lock);
  memcpy(&snapshot, pStb, sizeof(SStbObj));
  taosRUnLockLatch(&pStb->lock);

  // detach the borrowed arrays so the snapshot carries header fields only
  snapshot.numOfColumns = 0;
  snapshot.pColumns = NULL;
  snapshot.numOfTags = 0;
  snapshot.pTags = NULL;
  snapshot.numOfFuncs = 0;
  snapshot.pFuncs = NULL;
  snapshot.updateTime = taosGetTimestampMs();
  snapshot.lock = 0;  // the copied latch state is meaningless in the snapshot
  snapshot.smaVer++;

  SSdbRaw *pRaw = mndStbActionEncode(&snapshot);
  if (pRaw == NULL) {
    int32_t err = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(err);
  }

  TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRaw, SDB_STATUS_READY));

  TAOS_RETURN(0);
}
504

505
static int32_t mndSetCreateSmaVgroupRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
26✔
506
                                                SSmaObj *pSma) {
507
  int32_t    code = 0;
26✔
508
  SVnodeGid *pVgid = pVgroup->vnodeGid + 0;
26✔
509
  SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
26✔
510
  if (pDnode == NULL) {
26!
511
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
512
    if (terrno != 0) code = terrno;
×
513
    TAOS_RETURN(code);
×
514
  }
515

516
  STransAction action = {0};
26✔
517
  action.epSet = mndGetDnodeEpset(pDnode);
26✔
518
  mndReleaseDnode(pMnode, pDnode);
26✔
519

520
  // todo add sma info here
521
  SNode *pAst = NULL;
26✔
522
  TAOS_CHECK_RETURN(nodesStringToNode(pSma->ast, &pAst));
26!
523
  if ((code = qExtractResultSchema(pAst, &pSma->schemaRow.nCols, &pSma->schemaRow.pSchema)) != 0) {
26!
524
    nodesDestroyNode(pAst);
×
525
    TAOS_RETURN(code);
×
526
  }
527
  nodesDestroyNode(pAst);
26✔
528
  pSma->schemaRow.version = 1;
26✔
529

530
  // TODO: the schemaTag generated by qExtractResultXXX later.
531
  pSma->schemaTag.nCols = 1;
26✔
532
  pSma->schemaTag.version = 1;
26✔
533
  pSma->schemaTag.pSchema = taosMemoryCalloc(1, sizeof(SSchema));
26!
534
  if (!pSma->schemaTag.pSchema) {
26!
535
    TAOS_RETURN(-1);
×
536
  }
537
  pSma->schemaTag.pSchema[0].type = TSDB_DATA_TYPE_BIGINT;
26✔
538
  pSma->schemaTag.pSchema[0].bytes = TYPE_BYTES[TSDB_DATA_TYPE_BIGINT];
26✔
539
  pSma->schemaTag.pSchema[0].colId = pSma->schemaRow.nCols + PRIMARYKEY_TIMESTAMP_COL_ID;
26✔
540
  pSma->schemaTag.pSchema[0].flags = 0;
26✔
541
  snprintf(pSma->schemaTag.pSchema[0].name, TSDB_COL_NAME_LEN, "groupId");
26✔
542

543
  int32_t smaContLen = 0;
26✔
544
  void   *pSmaReq = mndBuildVCreateSmaReq(pMnode, pVgroup, pSma, &smaContLen);
26✔
545
  if (pSmaReq == NULL) {
26!
546
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
547
    if (terrno != 0) code = terrno;
×
548
    TAOS_RETURN(code);
×
549
  }
550
  pVgroup->pTsma = pSmaReq;
26✔
551

552
  int32_t contLen = 0;
26✔
553
  void   *pReq = mndBuildCreateVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen);
26✔
554
  if (pReq == NULL) {
26!
555
    taosMemoryFreeClear(pSmaReq);
×
556
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
557
    if (terrno != 0) code = terrno;
×
558
    TAOS_RETURN(code);
×
559
  }
560

561
  action.mTraceId = pTrans->mTraceId;
26✔
562
  action.pCont = pReq;
26✔
563
  action.contLen = contLen;
26✔
564
  action.msgType = TDMT_DND_CREATE_VNODE;
26✔
565
  action.acceptableCode = TSDB_CODE_VND_ALREADY_EXIST;
26✔
566

567
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
26!
568
    taosMemoryFreeClear(pSmaReq);
×
569
    taosMemoryFree(pReq);
×
570
    TAOS_RETURN(code);
×
571
  }
572

573
  action.pCont = pSmaReq;
26✔
574
  action.contLen = smaContLen;
26✔
575
  action.msgType = TDMT_VND_CREATE_SMA;
26✔
576
  action.acceptableCode = TSDB_CODE_TSMA_ALREADY_EXIST;
26✔
577

578
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
26!
579
    taosMemoryFreeClear(pSmaReq);
×
580
    TAOS_RETURN(code);
×
581
  }
582

583
  TAOS_RETURN(code);
26✔
584
}
585

586
// Free the row/tag schema arrays attached to an SMA object (the ones filled in by
// mndSetCreateSmaVgroupRedoActions). The object itself and its other payloads are
// left untouched. NULL is accepted.
static void mndDestroySmaObj(SSmaObj *pSmaObj) {
  if (pSmaObj == NULL) {
    return;
  }

  taosMemoryFreeClear(pSmaObj->schemaRow.pSchema);
  taosMemoryFreeClear(pSmaObj->schemaTag.pSchema);
}
26✔
592

593
/**
 * Create an SMA together with its backing stream inside one transaction.
 *
 * Steps: build the SSmaObj from the request, build the SStreamObj (sql/ast are
 * duplicated, maxDelay is clamped into [TSDB_MIN_ROLLUP_MAX_DELAY,
 * TSDB_MAX_ROLLUP_MAX_DELAY]), allocate the destination vgroup, derive the
 * output schema and physical plan from the AST, then assemble and prepare the
 * "create-sma" transaction.
 *
 * Every failure after the stream object starts owning heap memory leaves
 * through _OVER, so the stream object, the parsed AST, the query plan, the SMA
 * schemas and the transaction are released on all paths.
 *
 * @param pMnode      mnode executing the request
 * @param pReq        originating RPC message (attached to the transaction)
 * @param pCreate     decoded create request; its expr/tagsFilter/sql/ast buffers
 *                    are borrowed by the local SSmaObj, not copied
 * @param pDb         database the SMA lives in; must not have more than one replica
 * @param pStb        source super table
 * @param streamName  name of the stream to create
 * @return 0 on success, otherwise an error code (also stored in terrno)
 */
static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCreate, SDbObj *pDb, SStbObj *pStb,
                            const char *streamName) {
  int32_t     code = 0;
  SNode      *pAst = NULL;
  SQueryPlan *pPlan = NULL;
  STrans     *pTrans = NULL;

  if (pDb->cfg.replications > 1) {
    code = TSDB_CODE_MND_INVALID_SMA_OPTION;
    mError("sma:%s, failed to create since not support multiple replicas", pCreate->name);
    TAOS_RETURN(code);
  }

  SSmaObj smaObj = {0};
  memcpy(smaObj.name, pCreate->name, TSDB_TABLE_FNAME_LEN);
  memcpy(smaObj.stb, pStb->name, TSDB_TABLE_FNAME_LEN);
  memcpy(smaObj.db, pDb->name, TSDB_DB_FNAME_LEN);
  smaObj.createdTime = taosGetTimestampMs();
  smaObj.uid = mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN);

  // result table name = "<sma name>_td_tsma_rst_tb", truncated to the fname limit
  char resultTbName[TSDB_TABLE_FNAME_LEN + 16] = {0};
  snprintf(resultTbName, TSDB_TABLE_FNAME_LEN + 16, "%s_td_tsma_rst_tb", pCreate->name);
  memcpy(smaObj.dstTbName, resultTbName, TSDB_TABLE_FNAME_LEN);
  smaObj.dstTbUid = mndGenerateUid(smaObj.dstTbName, TSDB_TABLE_FNAME_LEN);
  smaObj.stbUid = pStb->uid;
  smaObj.dbUid = pStb->dbUid;
  smaObj.intervalUnit = pCreate->intervalUnit;
  smaObj.slidingUnit = pCreate->slidingUnit;
#if 0
//  smaObj.timezone = pCreate->timezone;
#endif
//  smaObj.timezone = taosGetLocalTimezoneOffset();  // use timezone of server
  smaObj.interval = pCreate->interval;
  smaObj.offset = pCreate->offset;
  smaObj.sliding = pCreate->sliding;
  smaObj.exprLen = pCreate->exprLen;
  smaObj.tagsFilterLen = pCreate->tagsFilterLen;
  smaObj.sqlLen = pCreate->sqlLen;
  smaObj.astLen = pCreate->astLen;
  // payload pointers are borrowed from the request; they stay NULL when empty
  if (smaObj.exprLen > 0) {
    smaObj.expr = pCreate->expr;
  }
  if (smaObj.tagsFilterLen > 0) {
    smaObj.tagsFilter = pCreate->tagsFilter;
  }
  if (smaObj.sqlLen > 0) {
    smaObj.sql = pCreate->sql;
  }
  if (smaObj.astLen > 0) {
    smaObj.ast = pCreate->ast;
  }

  SStreamObj streamObj = {0};
  tstrncpy(streamObj.name, streamName, TSDB_STREAM_FNAME_LEN);
  tstrncpy(streamObj.sourceDb, pDb->name, TSDB_DB_FNAME_LEN);
  tstrncpy(streamObj.targetDb, streamObj.sourceDb, TSDB_DB_FNAME_LEN);
  streamObj.createTime = taosGetTimestampMs();
  streamObj.updateTime = streamObj.createTime;
  streamObj.uid = mndGenerateUid(streamName, strlen(streamName));
  streamObj.sourceDbUid = pDb->uid;
  streamObj.targetDbUid = pDb->uid;
  streamObj.version = 1;
  streamObj.sql = taosStrdup(pCreate->sql);
  if (!streamObj.sql) {
    code = terrno;
    goto _OVER;
  }
  streamObj.smaId = smaObj.uid;
  streamObj.conf.watermark = pCreate->watermark;
  streamObj.deleteMark = pCreate->deleteMark;
  streamObj.conf.fillHistory = STREAM_FILL_HISTORY_ON;
  streamObj.conf.trigger = STREAM_TRIGGER_WINDOW_CLOSE;
  streamObj.conf.triggerParam = pCreate->maxDelay;
  streamObj.ast = taosStrdup(smaObj.ast);
  if (!streamObj.ast) {
    code = terrno;
    goto _OVER;
  }
  streamObj.indexForMultiAggBalance = -1;

  // check the maxDelay
  if (streamObj.conf.triggerParam < TSDB_MIN_ROLLUP_MAX_DELAY) {
    int64_t msInterval = -1;
    code = convertTimeFromPrecisionToUnit(pCreate->interval, pDb->cfg.precision, TIME_UNIT_MILLISECOND, &msInterval);
    if (TSDB_CODE_SUCCESS != code) {
      mError("sma:%s, failed to create since convert time failed: %s", smaObj.name, tstrerror(code));
      goto _OVER;
    }
    streamObj.conf.triggerParam = msInterval > TSDB_MIN_ROLLUP_MAX_DELAY ? msInterval : TSDB_MIN_ROLLUP_MAX_DELAY;
  }
  if (streamObj.conf.triggerParam > TSDB_MAX_ROLLUP_MAX_DELAY) {
    streamObj.conf.triggerParam = TSDB_MAX_ROLLUP_MAX_DELAY;
  }

  if ((code = mndAllocSmaVgroup(pMnode, pDb, &streamObj.fixedSinkVg)) != 0) {
    mError("sma:%s, failed to create since %s", smaObj.name, tstrerror(code));
    goto _OVER;
  }
  smaObj.dstVgId = streamObj.fixedSinkVg.vgId;
  streamObj.fixedSinkVgId = smaObj.dstVgId;

  if (nodesStringToNode(streamObj.ast, &pAst) < 0) {
    code = TSDB_CODE_MND_INVALID_SMA_OPTION;
    mError("sma:%s, failed to create since parse ast error", smaObj.name);
    goto _OVER;
  }

  // extract output schema from ast
  if (qExtractResultSchema(pAst, (int32_t *)&streamObj.outputSchema.nCols, &streamObj.outputSchema.pSchema) != 0) {
    code = TSDB_CODE_MND_INVALID_SMA_OPTION;
    mError("sma:%s, failed to create since extract result schema error", smaObj.name);
    goto _OVER;
  }

  {
    // scoped so that the gotos above never jump past an initialized declaration
    SPlanContext cxt = {
        .pAstRoot = pAst,
        .topicQuery = false,
        .streamQuery = true,
        .triggerType = streamObj.conf.trigger,
        .watermark = streamObj.conf.watermark,
        .deleteMark = streamObj.deleteMark,
    };

    if (qCreateQueryPlan(&cxt, &pPlan, NULL) < 0) {
      code = TSDB_CODE_MND_INVALID_SMA_OPTION;
      mError("sma:%s, failed to create since create query plan error", smaObj.name);
      goto _OVER;
    }
  }

  // save physcial plan
  if (nodesNodeToString((SNode *)pPlan, false, &streamObj.physicalPlan, NULL) != 0) {
    code = TSDB_CODE_MND_INVALID_SMA_OPTION;
    mError("sma:%s, failed to create since save physcial plan error", smaObj.name);
    goto _OVER;
  }

  // the parsed tree and plan are no longer needed once the plan string is saved
  if (pAst != NULL) nodesDestroyNode(pAst);
  pAst = NULL;
  nodesDestroyNode((SNode *)pPlan);
  pPlan = NULL;

  code = -1;
  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq, "create-sma");
  if (pTrans == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }
  mndTransSetDbName(pTrans, pDb->name, NULL);
  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER);

  mndTransSetSerial(pTrans);
  mInfo("trans:%d, used to create sma:%s stream:%s", pTrans->id, pCreate->name, streamObj.name);
  TAOS_CHECK_GOTO(mndAddNewVgPrepareAction(pMnode, pTrans, &streamObj.fixedSinkVg), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetCreateSmaRedoLogs(pMnode, pTrans, &smaObj), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetCreateSmaVgroupRedoLogs(pMnode, pTrans, &streamObj.fixedSinkVg), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetCreateSmaCommitLogs(pMnode, pTrans, &smaObj), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetCreateSmaVgroupCommitLogs(pMnode, pTrans, &streamObj.fixedSinkVg), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetUpdateSmaStbCommitLogs(pMnode, pTrans, pStb), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetCreateSmaVgroupRedoActions(pMnode, pTrans, pDb, &streamObj.fixedSinkVg, &smaObj), NULL, _OVER);
  TAOS_CHECK_GOTO(mndScheduleStream(pMnode, &streamObj, 1685959190000, NULL), NULL, _OVER);
  TAOS_CHECK_GOTO(mndPersistStream(pTrans, &streamObj), NULL, _OVER);
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);

  mInfo("sma:%s, uid:%" PRIi64 " create on stb:%" PRIi64 ", dstSuid:%" PRIi64 " dstTb:%s dstVg:%d", pCreate->name,
        smaObj.uid, smaObj.stbUid, smaObj.dstTbUid, smaObj.dstTbName, smaObj.dstVgId);

  code = 0;

_OVER:
  // pAst/pPlan are non-NULL here only when a planning step failed part-way
  if (pAst != NULL) nodesDestroyNode(pAst);
  if (pPlan != NULL) nodesDestroyNode((SNode *)pPlan);
  tFreeStreamObj(&streamObj);
  mndDestroySmaObj(&smaObj);
  mndTransDrop(pTrans);
  TAOS_RETURN(code);
}
763

764
/*
 * Validate the fields of a create-SMA request.
 *
 * Returns 0 when every check passes. Returns TSDB_CODE_MND_INVALID_SMA_OPTION when a mandatory name is
 * empty, a numeric field is negative, igExists is not 0/1, a declared length does not match the
 * NUL-terminated payload, or the parsed sma name has an empty table part. When the sma name cannot be
 * parsed, the error reported by tNameFromString() is returned (previously a bare -1).
 */
static int32_t mndCheckCreateSmaReq(SMCreateSmaReq *pCreate) {
  int32_t code = TSDB_CODE_MND_INVALID_SMA_OPTION;
  if (pCreate->name[0] == 0) TAOS_RETURN(code);
  if (pCreate->stb[0] == 0) TAOS_RETURN(code);
  if (pCreate->igExists < 0 || pCreate->igExists > 1) TAOS_RETURN(code);
  if (pCreate->intervalUnit < 0) TAOS_RETURN(code);
  if (pCreate->slidingUnit < 0) TAOS_RETURN(code);
  if (pCreate->timezone < 0) TAOS_RETURN(code);
  if (pCreate->interval < 0) TAOS_RETURN(code);
  if (pCreate->offset < 0) TAOS_RETURN(code);
  if (pCreate->sliding < 0) TAOS_RETURN(code);
  if (pCreate->exprLen < 0) TAOS_RETURN(code);
  if (pCreate->tagsFilterLen < 0) TAOS_RETURN(code);
  if (pCreate->sqlLen < 0) TAOS_RETURN(code);
  if (pCreate->astLen < 0) TAOS_RETURN(code);

  // a non-zero length must equal strlen + 1, i.e. it counts the terminating NUL
  if (pCreate->exprLen != 0 && strlen(pCreate->expr) + 1 != pCreate->exprLen) TAOS_RETURN(code);
  if (pCreate->tagsFilterLen != 0 && strlen(pCreate->tagsFilter) + 1 != pCreate->tagsFilterLen) TAOS_RETURN(code);
  if (pCreate->sqlLen != 0 && strlen(pCreate->sql) + 1 != pCreate->sqlLen) TAOS_RETURN(code);
  if (pCreate->astLen != 0 && strlen(pCreate->ast) + 1 != pCreate->astLen) TAOS_RETURN(code);

  SName   smaName = {0};
  int32_t nameCode = tNameFromString(&smaName, pCreate->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
  if (nameCode < 0) TAOS_RETURN(nameCode);  // propagate the parser's error instead of -1
  if (*(char *)tNameGetTableName(&smaName) == 0) TAOS_RETURN(code);

  code = 0;
  TAOS_RETURN(code);
}
791

792
static int32_t mndGetStreamNameFromSmaName(char *streamName, char *smaName) {
42✔
793
  SName   n;
794
  int32_t code = tNameFromString(&n, smaName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
42✔
795
  if (TSDB_CODE_SUCCESS != code) {
42!
796
    return code;
×
797
  }
798
  snprintf(streamName, TSDB_TABLE_FNAME_LEN,"%d.%s", n.acctId, n.tname);
42✔
799
  return TSDB_CODE_SUCCESS;
42✔
800
}
801

802
/*
 * Handle a create-SMA request.
 *
 * The request is deserialized and validated, then rejected when the source stable cannot be acquired,
 * when a stream with the derived name already exists, or when the global index lookup fails. An SMA
 * that already exists is either ignored (igExists set, returns 0) or reported as
 * TSDB_CODE_MND_SMA_ALREADY_EXIST. Otherwise the owning database is acquired, the write privilege is
 * checked and mndCreateSma() is called.
 *
 * Returns TSDB_CODE_ACTION_IN_PROGRESS when mndCreateSma() succeeds, or an error code. All acquired
 * objects and the decoded request are released at _OVER on every path.
 */
static int32_t mndProcessCreateSmaReq(SRpcMsg *pReq) {
  SMnode        *pMnode = pReq->info.node;
  int32_t        code = -1;
  SStbObj       *pStb = NULL;
  SSmaObj       *pSma = NULL;
  SStreamObj    *pStream = NULL;
  SDbObj        *pDb = NULL;
  SMCreateSmaReq createReq = {0};

  int64_t mTraceId = TRACE_GET_ROOTID(&pReq->info.traceId);

  TAOS_CHECK_GOTO(tDeserializeSMCreateSmaReq(pReq->pCont, pReq->contLen, &createReq), NULL, _OVER);

#ifdef WINDOWS
  // The error must be carried in `code`: it is what _OVER logs and returns. Setting only terrno left
  // code == 0 after the successful deserialization, so the request was reported as a success.
  code = TSDB_CODE_MND_INVALID_PLATFORM;
  goto _OVER;
#endif
  mInfo("sma:%s, start to create", createReq.name);
  TAOS_CHECK_GOTO(mndCheckCreateSmaReq(&createReq), NULL, _OVER);

  pStb = mndAcquireStb(pMnode, createReq.stb);
  if (pStb == NULL) {
    mError("sma:%s, failed to create since stb:%s not exist", createReq.name, createReq.stb);
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }

  char streamName[TSDB_TABLE_FNAME_LEN] = {0};
  code = mndGetStreamNameFromSmaName(streamName, createReq.name);
  if (TSDB_CODE_SUCCESS != code) {
    goto _OVER;
  }

  // the stream backing the SMA must not exist yet; a failed lookup is the expected outcome here
  code = mndAcquireStream(pMnode, streamName, &pStream);
  if (pStream != NULL || code == 0) {
    mError("sma:%s, failed to create since stream:%s already exist", createReq.name, streamName);
    code = TSDB_CODE_MND_STREAM_ALREADY_EXIST;
    goto _OVER;
  }
  SSIdx idx = {0};
  if ((code = mndAcquireGlobalIdx(pMnode, createReq.name, SDB_SMA, &idx)) == 0) {
    pSma = idx.pIdx;
  } else {
    goto _OVER;
  }

  if (pSma != NULL) {
    if (createReq.igExists) {
      mInfo("sma:%s, already exist in sma:%s, ignore exist is set", createReq.name, pSma->name);
      code = 0;
      goto _OVER;
    } else {
      code = TSDB_CODE_MND_SMA_ALREADY_EXIST;
      goto _OVER;
    }
  }

  SName name = {0};
  code = tNameFromString(&name, createReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
  if (TSDB_CODE_SUCCESS != code) {
    goto _OVER;
  }
  char db[TSDB_TABLE_FNAME_LEN] = {0};
  (void)tNameGetFullDbName(&name, db);

  pDb = mndAcquireDb(pMnode, db);
  if (pDb == NULL) {
    code = TSDB_CODE_MND_DB_NOT_SELECTED;
    goto _OVER;
  }

  TAOS_CHECK_GOTO(mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb), NULL, _OVER);

  code = mndCreateSma(pMnode, pReq, &createReq, pDb, pStb, streamName);
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("sma:%s, failed to create since %s", createReq.name, tstrerror(code));
  }

  mndReleaseStb(pMnode, pStb);
  mndReleaseSma(pMnode, pSma);
  mndReleaseStream(pMnode, pStream);
  mndReleaseDb(pMnode, pDb);
  tFreeSMCreateSmaReq(&createReq);

  TAOS_RETURN(code);
}
892

893
/*
 * Append a redo log that marks the SMA as SDB_STATUS_DROPPING to the transaction.
 *
 * Returns 0 on success. When the SMA cannot be encoded the error is terrno if it is set, otherwise
 * TSDB_CODE_MND_RETURN_VALUE_NULL (the computed code used to be discarded in favour of -1).
 */
static int32_t mndSetDropSmaRedoLogs(SMnode *pMnode, STrans *pTrans, SSmaObj *pSma) {
  int32_t  code = 0;
  SSdbRaw *pRedoRaw = mndSmaActionEncode(pSma);
  if (pRedoRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }
  TAOS_CHECK_RETURN(mndTransAppendRedolog(pTrans, pRedoRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING));

  return 0;
}
906

907
/*
 * Append a commit log that marks the SMA as SDB_STATUS_DROPPED to the transaction.
 *
 * Returns 0 on success. When the SMA cannot be encoded the error is terrno if it is set, otherwise
 * TSDB_CODE_MND_RETURN_VALUE_NULL (the computed code used to be discarded in favour of -1).
 */
static int32_t mndSetDropSmaCommitLogs(SMnode *pMnode, STrans *pTrans, SSmaObj *pSma) {
  int32_t  code = 0;
  SSdbRaw *pCommitRaw = mndSmaActionEncode(pSma);
  if (pCommitRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }
  TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pCommitRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED));

  return 0;
}
920

921
/*
 * Append a redo log that marks the SMA's vgroup as SDB_STATUS_DROPPING to the transaction.
 *
 * Returns 0 on success. When the vgroup cannot be encoded the error is terrno if it is set, otherwise
 * TSDB_CODE_MND_RETURN_VALUE_NULL (the computed code used to be discarded in favour of -1).
 */
static int32_t mndSetDropSmaVgroupRedoLogs(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup) {
  int32_t  code = 0;
  SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroup);
  if (pVgRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }
  TAOS_CHECK_RETURN(mndTransAppendRedolog(pTrans, pVgRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pVgRaw, SDB_STATUS_DROPPING));

  return 0;
}
934

935
/*
 * Append a commit log that marks the SMA's vgroup as SDB_STATUS_DROPPED to the transaction.
 *
 * Returns 0 on success. When the vgroup cannot be encoded the error is terrno if it is set, otherwise
 * TSDB_CODE_MND_RETURN_VALUE_NULL (the computed code used to be discarded in favour of -1).
 */
static int32_t mndSetDropSmaVgroupCommitLogs(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup) {
  int32_t  code = 0;
  SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroup);
  if (pVgRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }
  TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pVgRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pVgRaw, SDB_STATUS_DROPPED));

  return 0;
}
948

949
/*
 * Append a TDMT_DND_DROP_VNODE redo action for the first vnode of the SMA's vgroup.
 *
 * The action is addressed to the dnode hosting vnodeGid[0] and accepts TSDB_CODE_VND_NOT_EXIST as a
 * successful answer. Returns 0 on success or an error code; the request buffer is freed here when it
 * cannot be appended to the transaction.
 */
static int32_t mndSetDropSmaVgroupRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) {
  int32_t    code = 0;
  SVnodeGid *pVgid = pVgroup->vnodeGid + 0;
  SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
  if (pDnode == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  STransAction action = {0};
  action.epSet = mndGetDnodeEpset(pDnode);

  int32_t contLen = 0;
  void   *pReq = mndBuildDropVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen);
  if (pReq == NULL) {
    // capture the failure reason before any further call can touch terrno
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
  }

  // pDnode is still passed to mndBuildDropVnodeReq() above, so it is released only after that call
  mndReleaseDnode(pMnode, pDnode);

  if (pReq == NULL) {
    TAOS_RETURN(code);
  }

  action.pCont = pReq;
  action.contLen = contLen;
  action.msgType = TDMT_DND_DROP_VNODE;
  action.acceptableCode = TSDB_CODE_VND_NOT_EXIST;

  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
    taosMemoryFree(pReq);
    TAOS_RETURN(code);
  }

  TAOS_RETURN(code);
}
983

984
/*
 * Build and prepare the "drop-sma" transaction for one SMA.
 *
 * Acquires the destination vgroup, the source stable and the stream derived from the SMA name, then
 * adds the stream drop action/log, the SMA and vgroup redo/commit logs, the stable update log and the
 * drop-vnode redo action before preparing the transaction.
 *
 * Returns 0 when the transaction was prepared, otherwise an error code. The transaction and every
 * acquired object are dropped/released exactly once at _OVER.
 */
static int32_t mndDropSma(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SSmaObj *pSma) {
  int32_t     code = -1;
  SVgObj     *pVgroup = NULL;
  SStbObj    *pStb = NULL;
  STrans     *pTrans = NULL;
  SStreamObj *pStream = NULL;

  pVgroup = mndAcquireVgroup(pMnode, pSma->dstVgId);
  if (pVgroup == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }

  pStb = mndAcquireStb(pMnode, pSma->stb);
  if (pStb == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }

  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq, "drop-sma");
  if (pTrans == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }

  mInfo("trans:%d, used to drop sma:%s", pTrans->id, pSma->name);
  mndTransSetDbName(pTrans, pDb->name, NULL);
  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER);

  mndTransSetSerial(pTrans);

  char streamName[TSDB_TABLE_FNAME_LEN] = {0};
  code = mndGetStreamNameFromSmaName(streamName, pSma->name);
  if (TSDB_CODE_SUCCESS != code) {
    goto _OVER;
  }

  // pStream is released once, at _OVER. The error paths below used to call sdbRelease() on it and
  // then fall into mndReleaseStream() at _OVER, releasing the same reference twice.
  code = mndAcquireStream(pMnode, streamName, &pStream);
  if (pStream == NULL || pStream->smaId != pSma->uid || code != 0) {
    // never leave with code == 0 here: no transaction has been prepared, so reporting success would
    // make the caller answer "in progress" for work that will never run
    if (code == 0) code = TSDB_CODE_MND_STREAM_NOT_EXIST;
    goto _OVER;
  }

  if ((code = mndStreamSetDropAction(pMnode, pTrans, pStream)) < 0) {
    mError("stream:%s, failed to drop task since %s", pStream->name, tstrerror(code));
    goto _OVER;
  }

  // drop stream
  if ((code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_DROPPED)) < 0) {
    mError("stream:%s, failed to drop log since %s", pStream->name, tstrerror(code));
    goto _OVER;
  }

  TAOS_CHECK_GOTO(mndSetDropSmaRedoLogs(pMnode, pTrans, pSma), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetDropSmaVgroupRedoLogs(pMnode, pTrans, pVgroup), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetDropSmaCommitLogs(pMnode, pTrans, pSma), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetDropSmaVgroupCommitLogs(pMnode, pTrans, pVgroup), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetUpdateSmaStbCommitLogs(pMnode, pTrans, pStb), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetDropSmaVgroupRedoActions(pMnode, pTrans, pDb, pVgroup), NULL, _OVER);
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);

  code = 0;

_OVER:
  mndTransDrop(pTrans);
  mndReleaseStream(pMnode, pStream);
  mndReleaseVgroup(pMnode, pVgroup);
  mndReleaseStb(pMnode, pStb);
  TAOS_RETURN(code);
}
1059

1060
/*
 * Add the drop actions for every SMA built on the given stable to an existing transaction.
 *
 * For each SMA whose stbUid matches pStb->uid the transaction is switched to serial mode, the stream
 * derived from the SMA name is dropped when it exists and belongs to that SMA, and the vgroup commit
 * log, the drop-vnode redo action and the SMA commit log are appended.
 *
 * Returns 0 on success or the first error code. The fetch iterator, the current SMA row and the
 * current vgroup are released at _OVER.
 */
int32_t mndDropSmasByStb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
  SSdb    *pSdb = pMnode->pSdb;
  SSmaObj *pSma = NULL;
  void    *pIter = NULL;
  SVgObj  *pVgroup = NULL;
  int32_t  code = -1;

  while (1) {
    pIter = sdbFetch(pSdb, SDB_SMA, pIter, (void **)&pSma);
    if (pIter == NULL) break;

    if (pSma->stbUid == pStb->uid) {
      mndTransSetSerial(pTrans);
      pVgroup = mndAcquireVgroup(pMnode, pSma->dstVgId);
      if (pVgroup == NULL) {
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
        if (terrno != 0) code = terrno;
        goto _OVER;
      }

      char streamName[TSDB_TABLE_FNAME_LEN] = {0};
      code = mndGetStreamNameFromSmaName(streamName, pSma->name);
      if (TSDB_CODE_SUCCESS != code) {
        goto _OVER;
      }

      SStreamObj *pStream = NULL;
      code = mndAcquireStream(pMnode, streamName, &pStream);
      // Only touch the stream when it was really acquired and belongs to this SMA. The previous
      // condition also entered this branch when the lookup failed, handing a possibly NULL stream to
      // the drop helpers and to the error log.
      if (code == 0 && pStream != NULL && pStream->smaId == pSma->uid) {
        if ((code = mndStreamSetDropAction(pMnode, pTrans, pStream)) < 0) {
          mError("stream:%s, failed to drop task since %s", pStream->name, tstrerror(code));
          mndReleaseStream(pMnode, pStream);
          goto _OVER;
        }

        if ((code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_DROPPED)) < 0) {
          mndReleaseStream(pMnode, pStream);
          goto _OVER;
        }
      }
      // release on every path, including an acquired stream that belongs to a different SMA
      mndReleaseStream(pMnode, pStream);

      TAOS_CHECK_GOTO(mndSetDropSmaVgroupCommitLogs(pMnode, pTrans, pVgroup), NULL, _OVER);
      TAOS_CHECK_GOTO(mndSetDropSmaVgroupRedoActions(pMnode, pTrans, pDb, pVgroup), NULL, _OVER);
      TAOS_CHECK_GOTO(mndSetDropSmaCommitLogs(pMnode, pTrans, pSma), NULL, _OVER);
      mndReleaseVgroup(pMnode, pVgroup);
      pVgroup = NULL;
    }

    sdbRelease(pSdb, pSma);
  }

  code = 0;

_OVER:
  sdbCancelFetch(pSdb, pIter);
  sdbRelease(pSdb, pSma);
  mndReleaseVgroup(pMnode, pVgroup);
  TAOS_RETURN(code);
}
1121

1122
/*
 * Append a "dropped" commit log for every SMA that belongs to the given database.
 *
 * Returns 0 on success. On the first failure the current row is released, the fetch is cancelled and
 * the error code is returned.
 */
int32_t mndDropSmasByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
  int32_t code = 0;
  SSdb   *pSdb = pMnode->pSdb;
  void   *pIter = NULL;

  while (1) {
    SSmaObj *pSma = NULL;
    pIter = sdbFetch(pSdb, SDB_SMA, pIter, (void **)&pSma);
    if (pIter == NULL) break;

    if (pSma->dbUid == pDb->uid) {
      if ((code = mndSetDropSmaCommitLogs(pMnode, pTrans, pSma)) != 0) {
        sdbRelease(pSdb, pSma);
        // cancel with the iterator; the row pointer was passed here by mistake
        sdbCancelFetch(pSdb, pIter);
        TAOS_RETURN(code);
      }
    }

    sdbRelease(pSdb, pSma);
  }

  TAOS_RETURN(code);
}
1145

1146
/*
 * Handle a drop-SMA request.
 *
 * Looks the SMA up through the global index. A missing SMA yields 0 when igNotExists is set and
 * TSDB_CODE_MND_SMA_NOT_EXIST otherwise. For an existing SMA the owning database is acquired, the
 * write privilege is checked and mndDropSma() is called; its success is reported as
 * TSDB_CODE_ACTION_IN_PROGRESS.
 */
static int32_t mndProcessDropSmaReq(SRpcMsg *pReq) {
  SMnode      *pMnode = pReq->info.node;
  SMDropSmaReq dropReq = {0};
  SSmaObj     *pSma = NULL;
  SDbObj      *pDb = NULL;
  int32_t      code = -1;

  TAOS_CHECK_GOTO(tDeserializeSMDropSmaReq(pReq->pCont, pReq->contLen, &dropReq), NULL, _OVER);

  mInfo("sma:%s, start to drop", dropReq.name);

  SSIdx idx = {0};
  code = mndAcquireGlobalIdx(pMnode, dropReq.name, SDB_SMA, &idx);
  if (code != 0) {
    goto _OVER;
  }
  pSma = idx.pIdx;

  if (pSma == NULL) {
    if (!dropReq.igNotExists) {
      code = TSDB_CODE_MND_SMA_NOT_EXIST;
    } else {
      mInfo("sma:%s, not exist, ignore not exist is set", dropReq.name);
      code = 0;
    }
    goto _OVER;
  }

  // derive the full database name from the sma name
  SName name = {0};
  code = tNameFromString(&name, dropReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
  if (code != TSDB_CODE_SUCCESS) {
    goto _OVER;
  }
  char db[TSDB_TABLE_FNAME_LEN] = {0};
  (void)tNameGetFullDbName(&name, db);

  pDb = mndAcquireDb(pMnode, db);
  if (pDb == NULL) {
    code = TSDB_CODE_MND_DB_NOT_SELECTED;
    goto _OVER;
  }

  TAOS_CHECK_GOTO(mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb), NULL, _OVER);

  code = mndDropSma(pMnode, pReq, pDb, pSma);
  if (code == 0) {
    code = TSDB_CODE_ACTION_IN_PROGRESS;
  }

_OVER:
  if (!(code == 0 || code == TSDB_CODE_ACTION_IN_PROGRESS)) {
    mError("sma:%s, failed to drop since %s", dropReq.name, tstrerror(code));
  }

  mndReleaseSma(pMnode, pSma);
  mndReleaseDb(pMnode, pDb);
  TAOS_RETURN(code);
}
1202

1203
/*
 * Fill a user-index response from the SMA named by indexReq->indexFName.
 *
 * *exist is set to false (and 0 returned) when the global index lookup fails or finds no SMA. When
 * the SMA is found, its db/stb names and the SMA index type are copied into rsp, and the function
 * names of the SMA expression list are joined with ',' into rsp->indexExts; *exist is set to true
 * only when the expression string could be parsed.
 *
 * Returns 0 or the error from nodesStringToList().
 */
static int32_t mndGetSma(SMnode *pMnode, SUserIndexReq *indexReq, SUserIndexRsp *rsp, bool *exist) {
  int32_t  code = -1;
  SSmaObj *pSma = NULL;

  SSIdx idx = {0};
  if (0 == mndAcquireGlobalIdx(pMnode, indexReq->indexFName, SDB_SMA, &idx)) {
    pSma = idx.pIdx;
  } else {
    *exist = false;
    return 0;
  }

  if (pSma == NULL) {
    *exist = false;
    return 0;
  }

  memcpy(rsp->dbFName, pSma->db, sizeof(pSma->db));
  memcpy(rsp->tblFName, pSma->stb, sizeof(pSma->stb));
  tstrncpy(rsp->indexType, TSDB_INDEX_TYPE_SMA, TSDB_INDEX_TYPE_LEN);

  SNodeList *pList = NULL;
  int32_t    extOffset = 0;
  code = nodesStringToList(pSma->expr, &pList);
  if (0 == code) {
    SNode *node = NULL;
    FOREACH(node, pList) {
      SFunctionNode *pFunc = (SFunctionNode *)node;
      extOffset += tsnprintf(rsp->indexExts + extOffset, sizeof(rsp->indexExts) - extOffset - 1, "%s%s",
                             (extOffset ? "," : ""), pFunc->functionName);
    }

    *exist = true;
  }

  // the parsed list is only needed to collect the function names; it was never freed before
  if (pList != NULL) {
    nodesDestroyList(pList);
  }

  mndReleaseSma(pMnode, pSma);
  return code;
}
1241

1242
/*
 * Collect the SMA indexes defined on the stable named tbFName into rsp.
 *
 * When the stable does not exist, *exist is set to false and TSDB_CODE_SUCCESS is returned. Otherwise
 * rsp's db/table name, suid and version are filled from the stable, and one STableIndexInfo (with a
 * heap copy of the SMA expression) is pushed to rsp->pIndex for every SMA whose stb equals tbFName;
 * *exist is set to true once at least one entry was pushed.
 *
 * Returns 0 on success, or an error code when a destination vgroup cannot be acquired or an
 * allocation fails. Entries already pushed stay in rsp->pIndex in that case.
 */
int32_t mndGetTableSma(SMnode *pMnode, char *tbFName, STableIndexRsp *rsp, bool *exist) {
  int32_t         code = 0;
  SSmaObj        *pSma = NULL;
  SSdb           *pSdb = pMnode->pSdb;
  void           *pIter = NULL;
  STableIndexInfo info = {0};  // zeroed so no indeterminate bytes are copied into rsp->pIndex

  SStbObj *pStb = mndAcquireStb(pMnode, tbFName);
  if (NULL == pStb) {
    *exist = false;
    return TSDB_CODE_SUCCESS;
  }

  tstrncpy(rsp->dbFName, pStb->db, TSDB_DB_FNAME_LEN);
  // the stable name is "<db>.<table>": skip the db prefix and the separator
  tstrncpy(rsp->tbName, pStb->name + strlen(pStb->db) + 1, TSDB_TABLE_NAME_LEN);
  rsp->suid = pStb->uid;
  rsp->version = pStb->smaVer;
  mndReleaseStb(pMnode, pStb);

  while (1) {
    pIter = sdbFetch(pSdb, SDB_SMA, pIter, (void **)&pSma);
    if (pIter == NULL) break;

    // cheap first-character test before the full comparison
    if (pSma->stb[0] != tbFName[0] || strcmp(pSma->stb, tbFName)) {
      sdbRelease(pSdb, pSma);
      continue;
    }

    info.intervalUnit = pSma->intervalUnit;
    info.slidingUnit = pSma->slidingUnit;
    info.interval = pSma->interval;
    info.offset = pSma->offset;
    info.sliding = pSma->sliding;
    info.dstTbUid = pSma->dstTbUid;
    info.dstVgId = pSma->dstVgId;

    SVgObj *pVg = mndAcquireVgroup(pMnode, pSma->dstVgId);
    if (pVg == NULL) {
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
      if (terrno != 0) code = terrno;
      sdbRelease(pSdb, pSma);
      sdbCancelFetch(pSdb, pIter);
      return code;
    }
    info.epSet = mndGetVgroupEpset(pMnode, pVg);
    // the vgroup is only needed for its epset; the acquired reference was never released before
    mndReleaseVgroup(pMnode, pVg);

    info.expr = taosMemoryMalloc(pSma->exprLen + 1);
    if (info.expr == NULL) {
      code = terrno;
      sdbRelease(pSdb, pSma);
      sdbCancelFetch(pSdb, pIter);
      return code;
    }

    memcpy(info.expr, pSma->expr, pSma->exprLen);
    info.expr[pSma->exprLen] = 0;

    if (NULL == taosArrayPush(rsp->pIndex, &info)) {
      code = terrno;
      taosMemoryFree(info.expr);
      sdbRelease(pSdb, pSma);
      sdbCancelFetch(pSdb, pIter);
      return code;
    }

    rsp->indexSize += sizeof(info) + pSma->exprLen + 1;
    *exist = true;

    sdbRelease(pSdb, pSma);
  }

  TAOS_RETURN(code);
}
1315

1316
/*
 * Handle a "get index" request for an SMA.
 *
 * Deserializes the request, resolves the SMA with mndGetSma() and, when it exists, serializes the
 * answer into a buffer from rpcMallocCont() that is attached to pReq->info.rsp. A missing index is
 * reported as TSDB_CODE_MND_DB_INDEX_NOT_EXIST.
 *
 * Returns 0 on success or an error code. The response buffer is freed here when it could not be
 * filled, so it is either attached to the request or released.
 */
static int32_t mndProcessGetSmaReq(SRpcMsg *pReq) {
  SUserIndexReq indexReq = {0};
  SMnode       *pMnode = pReq->info.node;
  int32_t       code = -1;
  SUserIndexRsp rsp = {0};
  bool          exist = false;

  TAOS_CHECK_GOTO(tDeserializeSUserIndexReq(pReq->pCont, pReq->contLen, &indexReq), NULL, _OVER);

  code = mndGetSma(pMnode, &indexReq, &rsp, &exist);
  if (code) {
    goto _OVER;
  }

  if (!exist) {
    // TODO GET INDEX FROM FULLTEXT
    code = TSDB_CODE_MND_DB_INDEX_NOT_EXIST;
  } else {
    // first pass computes the encoded size only
    int32_t contLen = tSerializeSUserIndexRsp(NULL, 0, &rsp);
    if (contLen < 0) {
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
      if (terrno != 0) code = terrno;
      goto _OVER;
    }

    void *pRsp = rpcMallocCont(contLen);
    if (pRsp == NULL) {
      code = terrno;
      goto _OVER;
    }

    contLen = tSerializeSUserIndexRsp(pRsp, contLen, &rsp);
    if (contLen < 0) {
      // fall back to a generic error so a zero terrno cannot turn this failure into a success
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
      if (terrno != 0) code = terrno;
      rpcFreeCont(pRsp);  // not attached to the request yet, so it has to be freed here
      goto _OVER;
    }

    pReq->info.rsp = pRsp;
    pReq->info.rspLen = contLen;

    code = 0;
  }

_OVER:
  if (code != 0) {
    mError("failed to get index %s since %s", indexReq.indexFName, tstrerror(code));
  }

  TAOS_RETURN(code);
}
1360

1361
/*
 * Handle a "get table index" request.
 *
 * Deserializes the request, collects the table's SMA indexes with mndGetTableSma() and, when at least
 * one exists, serializes them into a buffer from rpcMallocCont() that is attached to pReq->info.rsp.
 * A table without SMA indexes is reported as TSDB_CODE_MND_DB_INDEX_NOT_EXIST.
 *
 * Returns 0 on success or an error code. rsp is always released with tFreeSerializeSTableIndexRsp(),
 * and the response buffer is freed here when it could not be filled.
 */
static int32_t mndProcessGetTbSmaReq(SRpcMsg *pReq) {
  STableIndexReq indexReq = {0};
  SMnode        *pMnode = pReq->info.node;
  int32_t        code = -1;
  STableIndexRsp rsp = {0};
  bool           exist = false;

  TAOS_CHECK_GOTO(tDeserializeSTableIndexReq(pReq->pCont, pReq->contLen, &indexReq), NULL, _OVER);

  rsp.pIndex = taosArrayInit(10, sizeof(STableIndexInfo));
  if (NULL == rsp.pIndex) {
    code = terrno;
    goto _OVER;
  }

  code = mndGetTableSma(pMnode, indexReq.tbFName, &rsp, &exist);
  if (code) {
    goto _OVER;
  }

  if (!exist) {
    code = TSDB_CODE_MND_DB_INDEX_NOT_EXIST;
  } else {
    // first pass computes the encoded size only
    int32_t contLen = tSerializeSTableIndexRsp(NULL, 0, &rsp);
    if (contLen < 0) {
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
      if (terrno != 0) code = terrno;
      goto _OVER;
    }

    void *pRsp = rpcMallocCont(contLen);
    if (pRsp == NULL) {
      code = terrno;
      goto _OVER;
    }

    contLen = tSerializeSTableIndexRsp(pRsp, contLen, &rsp);
    if (contLen < 0) {
      // fall back to a generic error so a zero terrno cannot turn this failure into a success
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
      if (terrno != 0) code = terrno;
      rpcFreeCont(pRsp);  // not attached to the request yet, so it has to be freed here
      goto _OVER;
    }

    pReq->info.rsp = pRsp;
    pReq->info.rspLen = contLen;

    code = 0;
  }

_OVER:
  if (code != 0) {
    mError("failed to get table index %s since %s", indexReq.tbFName, tstrerror(code));
  }

  tFreeSerializeSTableIndexRsp(&rsp);
  TAOS_RETURN(code);
}
1411

1412
/*
 * Emit up to `rows` SMA rows into pBlock for a "show indexes" style query.
 *
 * Iteration state lives in the SSmaAndTagIter stored in pShow->pIter (pSmaIter). When pShow->db is
 * non-empty only SMAs of that database are returned; if that database cannot be acquired, 0 is
 * returned. Each row holds, in order: sma table name, db name, source table name, dstVgId,
 * createdTime, an empty string column and the literal "sma_index".
 *
 * Returns the number of rows written, or -1 when a name could not be parsed or a column value could
 * not be set. In the error case the SMA fetch is cancelled and pShow->numOfRows is left unchanged.
 */
static int32_t mndRetrieveSma(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode  *pMnode = pReq->info.node;
  SSdb    *pSdb = pMnode->pSdb;
  int32_t  numOfRows = 0;
  SSmaObj *pSma = NULL;
  int32_t  cols = 0;
  int32_t  code = 0;

  SDbObj *pDb = NULL;
  if (strlen(pShow->db) > 0) {
    pDb = mndAcquireDb(pMnode, pShow->db);
    if (pDb == NULL) return 0;
  }
  SSmaAndTagIter *pIter = pShow->pIter;
  while (numOfRows < rows) {
    pIter->pSmaIter = sdbFetch(pSdb, SDB_SMA, pIter->pSmaIter, (void **)&pSma);
    if (pIter->pSmaIter == NULL) break;

    if (NULL != pDb && pSma->dbUid != pDb->uid) {
      sdbRelease(pSdb, pSma);
      continue;
    }

    cols = 0;

    SName smaName = {0};
    SName stbName = {0};
    char  n2[TSDB_DB_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
    char  n3[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
    code = tNameFromString(&smaName, pSma->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
    char n1[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
    if (TSDB_CODE_SUCCESS == code) {
      STR_TO_VARSTR(n1, (char *)tNameGetTableName(&smaName));
      STR_TO_VARSTR(n2, (char *)mndGetDbStr(pSma->db));
      code = tNameFromString(&stbName, pSma->stb, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
    }
    SColumnInfoData *pColInfo = NULL;
    if (TSDB_CODE_SUCCESS == code) {
      STR_TO_VARSTR(n3, (char *)tNameGetTableName(&stbName));

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)n1, false);
    }
    if (TSDB_CODE_SUCCESS == code) {
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)n2, false);
    }
    if (TSDB_CODE_SUCCESS == code) {
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)n3, false);
    }
    if (TSDB_CODE_SUCCESS == code) {
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)&pSma->dstVgId, false);
    }
    if (TSDB_CODE_SUCCESS == code) {
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)&pSma->createdTime, false);
    }

    char col[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
    STR_TO_VARSTR(col, (char *)"");

    if (TSDB_CODE_SUCCESS == code) {
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)col, false);
    }

    if (TSDB_CODE_SUCCESS == code) {
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);

      char tag[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
      STR_TO_VARSTR(tag, (char *)"sma_index");
      code = colDataSetVal(pColInfo, numOfRows, (const char *)tag, false);
    }

    sdbRelease(pSdb, pSma);
    if (TSDB_CODE_SUCCESS != code) {
      sdbCancelFetch(pMnode->pSdb, pIter->pSmaIter);
      // forget the cancelled iterator so it can neither be cancelled again nor be resumed
      pIter->pSmaIter = NULL;
      numOfRows = -1;
      break;
    }
    numOfRows++;
  }

  mndReleaseDb(pMnode, pDb);
  // the -1 error marker must not be subtracted from the running row total
  if (numOfRows > 0) {
    pShow->numOfRows += numOfRows;
  }
  return numOfRows;
}
1501

1502
// sma and tag index comm func
1503
/*
 * Common entry point for dropping an index: the request is first handled as an SMA drop; when that
 * leaves terrno at TSDB_CODE_MND_TAG_INDEX_ALREADY_EXIST, terrno is cleared and the request is
 * handled as a tag-index drop instead.
 */
static int32_t mndProcessDropIdxReq(SRpcMsg *pReq) {
  int rc = mndProcessDropSmaReq(pReq);
  if (terrno != TSDB_CODE_MND_TAG_INDEX_ALREADY_EXIST) {
    return rc;
  }

  terrno = 0;
  rc = mndProcessDropTagIdxReq(pReq);
  return rc;
}
1511

1512
/*
 * Retrieve index rows for a show query: SMA rows first, then tag-index rows for whatever room is
 * left in the block.
 *
 * The combined iterator (SSmaAndTagIter) is allocated lazily in pShow->pIter and freed once fewer
 * than `rows` rows were produced. Returns the number of rows written, terrno when the iterator cannot
 * be allocated, or the negative value reported by mndRetrieveSma().
 */
static int32_t mndRetrieveIdx(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  if (pShow->pIter == NULL) {
    pShow->pIter = taosMemoryCalloc(1, sizeof(SSmaAndTagIter));
  }
  if (!pShow->pIter) {
    return terrno;
  }
  int32_t read = mndRetrieveSma(pReq, pShow, pBlock, rows);
  if (read < 0) {
    // The SMA pass failed and has already cancelled its own fetch. Do not fold the negative value
    // into the row arithmetic below: that would request rows + 1 tag rows and hide the error.
    SMnode         *pMnode = pReq->info.node;
    SSmaAndTagIter *pIter = pShow->pIter;
    sdbCancelFetchByType(pMnode->pSdb, pIter->pIdxIter, SDB_IDX);
    taosMemoryFree(pIter);
    pShow->pIter = NULL;
    return read;
  }
  if (read < rows) {
    read += mndRetrieveTagIdx(pReq, pShow, pBlock, rows - read);
  }
  // no more to read
  if (read < rows) {
    taosMemoryFree(pShow->pIter);
    pShow->pIter = NULL;
  }
  return read;
}
1530
/*
 * Cancel an index retrieval in progress: both fetches recorded in the combined iterator (SMA and
 * tag index) are cancelled, then the iterator itself is freed. A NULL iterator is passed straight to
 * taosMemoryFree().
 */
static void mndCancelRetrieveIdx(SMnode *pMnode, void *pIter) {
  SSmaAndTagIter *iters = pIter;
  if (iters == NULL) {
    taosMemoryFree(iters);
    return;
  }

  SSdb *pSdb = pMnode->pSdb;
  sdbCancelFetchByType(pSdb, iters->pSmaIter, SDB_SMA);
  sdbCancelFetchByType(pSdb, iters->pIdxIter, SDB_IDX);
  taosMemoryFree(iters);
}
×
1539

1540
/*
 * Populate pCxt->pSma from the create request and the objects held by the context.
 *
 * Names are copied as fixed-size buffers. expr, sql and ast are not duplicated: the SMA object ends
 * up pointing at the request's buffers.
 */
static void initSMAObj(SCreateTSMACxt *pCxt) {
  SSmaObj              *pSma = pCxt->pSma;
  const SMCreateSmaReq *pCreate = pCxt->pCreateSmaReq;

  // identity
  memcpy(pSma->name, pCreate->name, TSDB_TABLE_FNAME_LEN);
  memcpy(pSma->stb, pCreate->stb, TSDB_TABLE_FNAME_LEN);
  memcpy(pSma->db, pCxt->pDb->name, TSDB_DB_FNAME_LEN);
  if (pCxt->pBaseSma) {
    memcpy(pSma->baseSmaName, pCxt->pBaseSma->name, TSDB_TABLE_FNAME_LEN);
  }
  pSma->createdTime = taosGetTimestampMs();
  pSma->uid = mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN);

  // target table and source references
  memcpy(pSma->dstTbName, pCxt->targetStbFullName, TSDB_TABLE_FNAME_LEN);
  pSma->dstTbUid = 0;  // not used
  if (pCxt->pSrcStb) {
    pSma->stbUid = pCxt->pSrcStb->uid;
  } else {
    // no source stable in the context: use the uid carried by the request
    pSma->stbUid = pCreate->normSourceTbUid;
  }
  pSma->dbUid = pCxt->pDb->uid;

  // window definition
  pSma->interval = pCreate->interval;
  pSma->intervalUnit = pCreate->intervalUnit;
  // NOTE: pSma->timezone is not assigned here; the taosGetLocalTimezoneOffset() assignment is disabled
  pSma->version = 1;

  // payloads are shared with the request, not copied
  pSma->exprLen = pCreate->exprLen;
  pSma->sqlLen = pCreate->sqlLen;
  pSma->astLen = pCreate->astLen;
  pSma->expr = pCreate->expr;
  pSma->sql = pCreate->sql;
  pSma->ast = pCreate->ast;
}
235✔
1564

1565
// Fills pCxt->pCreateStreamReq with the settings of the stream that backs a TSMA.
// Scalar options are fixed (max-delay trigger, fill-history on, create target stb),
// ast/sql are duplicated from the create-SMA request, the tag list is the source
// stable's tags (if any) followed by a "tbname" tag, and one output column is
// appended per projection in pCxt->pProjects.
// Returns TSDB_CODE_SUCCESS, or terrno captured at the failing allocation/push.
static int32_t mndCreateTSMABuildCreateStreamReq(SCreateTSMACxt *pCxt) {
  SCMCreateStreamReq   *pStream = pCxt->pCreateStreamReq;
  const SMCreateSmaReq *pCreate = pCxt->pCreateSmaReq;

  tstrncpy(pStream->name, pCxt->streamName, TSDB_STREAM_FNAME_LEN);
  tstrncpy(pStream->sourceDB, pCxt->pDb->name, TSDB_DB_FNAME_LEN);
  tstrncpy(pStream->targetStbFullName, pCxt->targetStbFullName, TSDB_TABLE_FNAME_LEN);
  pStream->igExists = false;
  pStream->triggerType = STREAM_TRIGGER_MAX_DELAY;
  pStream->igExpired = false;
  pStream->fillHistory = STREAM_FILL_HISTORY_ON;
  pStream->maxDelay = 10000;
  pStream->watermark = 0;
  // one extra tag is reserved for "tbname"
  pStream->numOfTags = pCxt->pSrcStb ? pCxt->pSrcStb->numOfTags + 1 : 1;
  pStream->checkpointFreq = 0;
  pStream->createStb = 1;
  pStream->targetStbUid = 0;
  pStream->fillNullCols = NULL;
  pStream->igUpdate = 0;
  pStream->deleteMark = pCreate->deleteMark;
  pStream->lastTs = pCreate->lastTs;
  pStream->smaId = pCxt->pSma->uid;

  pStream->ast = taosStrdup(pCreate->ast);
  if (NULL == pStream->ast) {
    return terrno;
  }
  pStream->sql = taosStrdup(pCreate->sql);
  if (NULL == pStream->sql) {
    return terrno;
  }

  // construct tags
  pStream->pTags = taosArrayInit(pStream->numOfTags, sizeof(SField));
  if (NULL == pStream->pTags) {
    return terrno;
  }

  // a single SField is reused for every pushed entry
  SField  field = {0};
  int32_t code = 0;

  if (pCxt->pSrcStb) {
    for (int32_t tagIdx = 0; tagIdx < pStream->numOfTags - 1; ++tagIdx) {
      const SSchema *pTag = &pCxt->pSrcStb->pTags[tagIdx];
      field.bytes = pTag->bytes;
      field.type = pTag->type;
      field.flags = pTag->flags;
      tstrncpy(field.name, pTag->name, TSDB_COL_NAME_LEN);
      if (NULL == taosArrayPush(pStream->pTags, &field)) {
        code = terrno;
        break;
      }
    }
  }
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  // trailing "tbname" tag
  field.bytes = TSDB_TABLE_FNAME_LEN - 1 + VARSTR_HEADER_SIZE;
  field.flags = COL_SMA_ON;
  field.type = TSDB_DATA_TYPE_BINARY;
  tstrncpy(field.name, "tbname", strlen("tbname") + 1);
  if (NULL == taosArrayPush(pStream->pTags, &field)) {
    code = terrno;
  }
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  // construct output cols
  SNode *pProject;
  FOREACH(pProject, pCxt->pProjects) {
    SExprNode *pExpr = (SExprNode *)pProject;
    field.bytes = pExpr->resType.bytes;
    field.type = pExpr->resType.type;
    field.flags = COL_SMA_ON;
    tstrncpy(field.name, pExpr->userAlias, TSDB_COL_NAME_LEN);
    if (NULL == taosArrayPush(pStream->pCols, &field)) {
      code = terrno;
      break;
    }
  }
  return code;
}
1641

1642
// Fills pCxt->pDropStreamReq for dropping the TSMA's stream: the stream name comes
// from the context, igNotExists is cleared, and the "sql" text is a heap copy of
// the name found in pCxt->pDropSmaReq. Returns terrno if that copy fails.
static int32_t mndCreateTSMABuildDropStreamReq(SCreateTSMACxt *pCxt) {
  SMDropStreamReq *pDrop = pCxt->pDropStreamReq;

  tstrncpy(pDrop->name, pCxt->streamName, TSDB_STREAM_FNAME_LEN);
  pDrop->igNotExists = false;

  pDrop->sql = taosStrdup(pCxt->pDropSmaReq->name);
  if (NULL == pDrop->sql) {
    return terrno;
  }

  pDrop->sqlLen = strlen(pDrop->sql);
  return TSDB_CODE_SUCCESS;
}
1652

1653
// Encodes pOld and appends it to pTrans as a prepare log, then marks the raw
// record SDB_STATUS_READY. pMnode and pNew are not used by this function.
// The raw record is freed here only when appending it to the transaction fails.
static int32_t mndSetUpdateDbTsmaVersionPrepareLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pOld, SDbObj *pNew) {
  SSdbRaw *pRaw = mndDbActionEncode(pOld);
  if (NULL == pRaw) {
    // prefer the encoder's terrno; fall back to a generic "returned NULL" code
    int32_t err = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) err = terrno;
    TAOS_RETURN(err);
  }

  int32_t code = mndTransAppendPrepareLog(pTrans, pRaw);
  if (0 != code) {
    sdbFreeRaw(pRaw);
    TAOS_RETURN(code);
  }

  TAOS_RETURN(sdbSetRawStatus(pRaw, SDB_STATUS_READY));
}
1668

1669
// Encodes pNew and appends it to pTrans as a commit log, then marks the raw
// record SDB_STATUS_READY. pMnode and pOld are not used by this function.
// The raw record is freed here only when appending it to the transaction fails.
static int32_t mndSetUpdateDbTsmaVersionCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pOld, SDbObj *pNew) {
  SSdbRaw *pRaw = mndDbActionEncode(pNew);
  if (NULL == pRaw) {
    // prefer the encoder's terrno; fall back to a generic "returned NULL" code
    int32_t err = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) err = terrno;
    TAOS_RETURN(err);
  }

  int32_t code = mndTransAppendCommitlog(pTrans, pRaw);
  if (0 != code) {
    sdbFreeRaw(pRaw);
    TAOS_RETURN(code);
  }

  TAOS_RETURN(sdbSetRawStatus(pRaw, SDB_STATUS_READY));
}
1684

1685
// Builds and prepares the "create-tsma" transaction.
//
// The transaction carries:
//   - redo action : create the backing stream (TDMT_STREAM_CREATE)
//   - undo actions: drop that stream, then drop the output stable
//   - redo/undo/commit logs for the SMA object itself
//   - prepare/commit logs that bump the database's tsmaVersion
//
// Each action payload (pCont) is heap-allocated here. Once an action has been
// appended to the transaction the local pCont is cleared so that this function
// no longer frees it (NOTE(review): assumes the transaction releases appended
// payloads when it is dropped — confirm against mndTrans). Payloads that were
// allocated but never appended are released at _OVER, so error paths no longer
// leak them.
//
// Returns TSDB_CODE_SUCCESS when the transaction was prepared, otherwise the
// failing step's error code.
static int32_t mndCreateTSMATxnPrepare(SCreateTSMACxt *pCxt) {
  int32_t      code = -1;
  STransAction createStreamRedoAction = {0};
  STransAction createStreamUndoAction = {0};
  STransAction dropStbUndoAction = {0};
  SMDropStbReq dropStbReq = {0};
  SDbObj       newDb = {0};
  STrans *pTrans = mndTransCreate(pCxt->pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_TSMA, pCxt->pRpcReq, "create-tsma");
  if (!pTrans) {
    code = terrno;
    goto _OVER;
  }
  mndTransSetDbName(pTrans, pCxt->pDb->name, NULL);
  TAOS_CHECK_GOTO(mndTransCheckConflict(pCxt->pMnode, pTrans), NULL, _OVER);

  mndTransSetSerial(pTrans);
  mInfo("trans:%d, used to create tsma:%s stream:%s", pTrans->id, pCxt->pCreateSmaReq->name,
        pCxt->pCreateStreamReq->name);

  // redo: create the stream; an already existing stream is acceptable
  mndGetMnodeEpSet(pCxt->pMnode, &createStreamRedoAction.epSet);
  createStreamRedoAction.acceptableCode = TSDB_CODE_MND_STREAM_ALREADY_EXIST;
  createStreamRedoAction.msgType = TDMT_STREAM_CREATE;
  // first call with a NULL buffer only computes the encoded length
  createStreamRedoAction.contLen = tSerializeSCMCreateStreamReq(0, 0, pCxt->pCreateStreamReq);
  createStreamRedoAction.pCont = taosMemoryCalloc(1, createStreamRedoAction.contLen);
  if (!createStreamRedoAction.pCont) {
    code = terrno;
    goto _OVER;
  }
  if (createStreamRedoAction.contLen != tSerializeSCMCreateStreamReq(createStreamRedoAction.pCont,
                                                                     createStreamRedoAction.contLen,
                                                                     pCxt->pCreateStreamReq)) {
    mError("sma: %s, failed to create due to create stream req encode failure", pCxt->pCreateSmaReq->name);
    code = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }

  // undo: drop the stream; a missing stream is acceptable
  createStreamUndoAction.epSet = createStreamRedoAction.epSet;
  createStreamUndoAction.acceptableCode = TSDB_CODE_MND_STREAM_NOT_EXIST;
  createStreamUndoAction.msgType = TDMT_STREAM_DROP;
  createStreamUndoAction.contLen = tSerializeSMDropStreamReq(0, 0, pCxt->pDropStreamReq);
  createStreamUndoAction.pCont = taosMemoryCalloc(1, createStreamUndoAction.contLen);
  if (!createStreamUndoAction.pCont) {
    code = terrno;
    goto _OVER;
  }
  if (createStreamUndoAction.contLen !=
      tSerializeSMDropStreamReq(createStreamUndoAction.pCont, createStreamUndoAction.contLen, pCxt->pDropStreamReq)) {
    mError("sma: %s, failed to create due to drop stream req encode failure", pCxt->pCreateSmaReq->name);
    code = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }

  // undo: drop the output stable; a missing stable is acceptable
  dropStbReq.igNotExists = true;
  tstrncpy(dropStbReq.name, pCxt->targetStbFullName, TSDB_TABLE_FNAME_LEN);
  dropStbUndoAction.epSet = createStreamRedoAction.epSet;
  dropStbUndoAction.acceptableCode = TSDB_CODE_MND_STB_NOT_EXIST;
  dropStbUndoAction.retryCode = TSDB_CODE_MND_STREAM_MUST_BE_DELETED;
  dropStbUndoAction.msgType = TDMT_MND_STB_DROP;
  dropStbUndoAction.contLen = tSerializeSMDropStbReq(0, 0, &dropStbReq);
  dropStbUndoAction.pCont = taosMemoryCalloc(1, dropStbUndoAction.contLen);
  if (!dropStbUndoAction.pCont) {
    code = terrno;
    goto _OVER;
  }
  if (dropStbUndoAction.contLen !=
      tSerializeSMDropStbReq(dropStbUndoAction.pCont, dropStbUndoAction.contLen, &dropStbReq)) {
    mError("sma: %s, failed to create due to drop stb req encode failure", pCxt->pCreateSmaReq->name);
    code = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }

  // the db object is re-written with tsmaVersion + 1
  memcpy(&newDb, pCxt->pDb, sizeof(SDbObj));
  newDb.tsmaVersion++;

  TAOS_CHECK_GOTO(mndSetCreateSmaRedoLogs(pCxt->pMnode, pTrans, pCxt->pSma), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetCreateSmaUndoLogs(pCxt->pMnode, pTrans, pCxt->pSma), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetCreateSmaCommitLogs(pCxt->pMnode, pTrans, pCxt->pSma), NULL, _OVER);

  TAOS_CHECK_GOTO(mndTransAppendRedoAction(pTrans, &createStreamRedoAction), NULL, _OVER);
  createStreamRedoAction.pCont = NULL;  // appended: no longer released locally
  TAOS_CHECK_GOTO(mndTransAppendUndoAction(pTrans, &createStreamUndoAction), NULL, _OVER);
  createStreamUndoAction.pCont = NULL;  // appended: no longer released locally
  TAOS_CHECK_GOTO(mndTransAppendUndoAction(pTrans, &dropStbUndoAction), NULL, _OVER);
  dropStbUndoAction.pCont = NULL;  // appended: no longer released locally

  TAOS_CHECK_GOTO(mndSetUpdateDbTsmaVersionPrepareLogs(pCxt->pMnode, pTrans, pCxt->pDb, &newDb), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetUpdateDbTsmaVersionCommitLogs(pCxt->pMnode, pTrans, pCxt->pDb, &newDb), NULL, _OVER);
  TAOS_CHECK_GOTO(mndTransPrepare(pCxt->pMnode, pTrans), NULL, _OVER);

  code = TSDB_CODE_SUCCESS;

_OVER:
  // release payloads that never made it into the transaction
  taosMemoryFree(createStreamRedoAction.pCont);
  taosMemoryFree(createStreamUndoAction.pCont);
  taosMemoryFree(dropStbUndoAction.pCont);
  mndTransDrop(pTrans);
  TAOS_RETURN(code);
}
1774

1775
// Drives the creation of one TSMA: initialises a stack SMA object, parses the
// projection list from the request's expr string, builds the create-stream and
// drop-stream requests, and hands everything to mndCreateTSMATxnPrepare().
// All temporaries built here are released before returning, and the context's
// pCreateStreamReq / pProjects are reset to NULL.
static int32_t mndCreateTSMA(SCreateTSMACxt *pCxt) {
  SSmaObj            smaObj = {0};
  SCMCreateStreamReq streamReq = {0};
  SMDropStreamReq    dropStreamReq = {0};
  SNodeList         *pExprList = NULL;
  int32_t            code = 0;

  pCxt->pSma = &smaObj;
  initSMAObj(pCxt);

  code = nodesStringToList(pCxt->pCreateSmaReq->expr, &pExprList);
  if (code != TSDB_CODE_SUCCESS) goto _OVER;
  pCxt->pProjects = pExprList;

  pCxt->pCreateStreamReq = &streamReq;
  if (pCxt->pCreateSmaReq->pVgroupVerList != NULL) {
    streamReq.pVgroupVerList = taosArrayDup(pCxt->pCreateSmaReq->pVgroupVerList, NULL);
    if (streamReq.pVgroupVerList == NULL) {
      code = terrno;
      goto _OVER;
    }
  }

  // output columns are only pre-allocated when there is at least one projection
  if (LIST_LENGTH(pExprList) > 0) {
    streamReq.pCols = taosArrayInit(LIST_LENGTH(pExprList), sizeof(SField));
    if (streamReq.pCols == NULL) {
      code = terrno;
      goto _OVER;
    }
  }

  pCxt->pDropStreamReq = &dropStreamReq;

  code = mndCreateTSMABuildCreateStreamReq(pCxt);
  if (code != TSDB_CODE_SUCCESS) goto _OVER;

  code = mndCreateTSMABuildDropStreamReq(pCxt);
  if (code != TSDB_CODE_SUCCESS) goto _OVER;

  code = mndCreateTSMATxnPrepare(pCxt);
  if (code != TSDB_CODE_SUCCESS) goto _OVER;

  mInfo("sma:%s, uid:%" PRIi64 " create on stb:%" PRIi64 " dstTb:%s dstVg:%d", pCxt->pCreateSmaReq->name,
        smaObj.uid, smaObj.stbUid, smaObj.dstTbName, smaObj.dstVgId);
  code = 0;

_OVER:
  tFreeSCMCreateStreamReq(pCxt->pCreateStreamReq);
  // pDropStreamReq is only set once the earlier steps succeeded
  if (pCxt->pDropStreamReq) tFreeMDropStreamReq(pCxt->pDropStreamReq);
  pCxt->pCreateStreamReq = NULL;
  if (pExprList) nodesDestroyList(pExprList);
  pCxt->pProjects = NULL;
  TAOS_RETURN(code);
}
1832

1833
static int32_t mndTSMAGenerateOutputName(const char *tsmaName, char *streamName, char *targetStbName) {
448✔
1834
  SName   smaName;
1835
  int32_t code = tNameFromString(&smaName, tsmaName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
448✔
1836
  if (TSDB_CODE_SUCCESS != code) {
448!
1837
    return code;
×
1838
  }
1839
  snprintf(streamName, TSDB_TABLE_FNAME_LEN, "%d.%s", smaName.acctId, smaName.tname);
448✔
1840
  snprintf(targetStbName, TSDB_TABLE_FNAME_LEN, "%s"TSMA_RES_STB_POSTFIX, tsmaName);
448✔
1841
  return TSDB_CODE_SUCCESS;
448✔
1842
}
1843

1844
// RPC handler for "create TSMA".
//
// Steps, in order: enforce tsMaxTsmaNum, deserialize and validate the request,
// resolve the source stable (unless a normal-table uid was supplied), derive the
// stream / output-stable names, reject the request when the TSMA, its output
// stable or its stream already exists, resolve the database and check write
// privilege, resolve the base TSMA for recursive TSMAs, then call mndCreateTSMA().
//
// Returns TSDB_CODE_ACTION_IN_PROGRESS when the transaction was started, 0 when
// the TSMA already exists and igExists is set, otherwise an error code.
// Every object acquired along the way is released at _OVER, including the
// output-stable reference taken for the existence check.
static int32_t mndProcessCreateTSMAReq(SRpcMsg *pReq) {
#ifdef WINDOWS
  TAOS_RETURN(TSDB_CODE_MND_INVALID_PLATFORM);
#endif
  SMnode        *pMnode = pReq->info.node;
  int32_t        code = -1;
  SDbObj        *pDb = NULL;
  SStbObj       *pStb = NULL;
  SStbObj       *pTargetStb = NULL;
  SSmaObj       *pSma = NULL;
  SSmaObj       *pBaseTsma = NULL;
  SStreamObj    *pStream = NULL;
  SMCreateSmaReq createReq = {0};

  if (sdbGetSize(pMnode->pSdb, SDB_SMA) >= tsMaxTsmaNum) {
    code = TSDB_CODE_MND_MAX_TSMA_NUM_EXCEEDED;
    goto _OVER;
  }

  if (tDeserializeSMCreateSmaReq(pReq->pCont, pReq->contLen, &createReq) != 0) {
    code = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }

  mInfo("start to create tsma: %s", createReq.name);
  if ((code = mndCheckCreateSmaReq(&createReq)) != 0) goto _OVER;

  // a zero normSourceTbUid means the source is looked up as a stable by name
  if (createReq.normSourceTbUid == 0) {
    pStb = mndAcquireStb(pMnode, createReq.stb);
    if (!pStb && !createReq.recursiveTsma) {
      mError("tsma:%s, failed to create since stb:%s not exist", createReq.name, createReq.stb);
      code = TSDB_CODE_MND_STB_NOT_EXIST;
      goto _OVER;
    }
  }

  char streamName[TSDB_TABLE_FNAME_LEN] = {0};
  char streamTargetStbFullName[TSDB_TABLE_FNAME_LEN] = {0};
  code = mndTSMAGenerateOutputName(createReq.name, streamName, streamTargetStbFullName);
  if (TSDB_CODE_SUCCESS != code) {
    mInfo("tsma:%s, failed to generate name", createReq.name);
    goto _OVER;
  }

  pSma = sdbAcquire(pMnode->pSdb, SDB_SMA, createReq.name);
  if (pSma && createReq.igExists) {
    mInfo("tsma:%s, already exists in sma:%s, ignore exist is set", createReq.name, pSma->name);
    code = 0;
    goto _OVER;
  }

  if (pSma) {
    code = TSDB_CODE_MND_SMA_ALREADY_EXIST;
    goto _OVER;
  }

  // the output stable must not exist yet; the reference is released at _OVER
  pTargetStb = mndAcquireStb(pMnode, streamTargetStbFullName);
  if (pTargetStb) {
    code = TSDB_CODE_TDB_STB_ALREADY_EXIST;
    mError("tsma: %s, failed to create since output stable already exists: %s", createReq.name,
           streamTargetStbFullName);
    goto _OVER;
  }

  // only "stream not found" lets the creation continue
  code = mndAcquireStream(pMnode, streamName, &pStream);
  if (pStream != NULL || code != TSDB_CODE_MND_STREAM_NOT_EXIST) {
    mError("tsma:%s, failed to create since stream:%s already exist", createReq.name, streamName);
    code = TSDB_CODE_MND_SMA_ALREADY_EXIST;
    goto _OVER;
  }

  SName name = {0};
  code = tNameFromString(&name, createReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
  if (TSDB_CODE_SUCCESS != code) {
    goto _OVER;
  }
  char db[TSDB_TABLE_FNAME_LEN] = {0};
  (void)tNameGetFullDbName(&name, db);

  pDb = mndAcquireDb(pMnode, db);
  if (pDb == NULL) {
    code = TSDB_CODE_MND_DB_NOT_SELECTED;
    goto _OVER;
  }

  TAOS_CHECK_GOTO(mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb), NULL, _OVER);

  if (createReq.recursiveTsma) {
    pBaseTsma = sdbAcquire(pMnode->pSdb, SDB_SMA, createReq.baseTsmaName);
    if (!pBaseTsma) {
      mError("base tsma: %s not found when creating recursive tsma", createReq.baseTsmaName);
      code = TSDB_CODE_MND_SMA_NOT_EXIST;
      goto _OVER;
    }
    // no source stable found: inherit the source uid recorded on the base TSMA
    if (!pStb) {
      createReq.normSourceTbUid = pBaseTsma->stbUid;
    }
  }

  SCreateTSMACxt cxt = {
      .pMnode = pMnode,
      .pCreateSmaReq = &createReq,
      .pCreateStreamReq = NULL,
      .streamName = streamName,
      .targetStbFullName = streamTargetStbFullName,
      .pDb = pDb,
      .pRpcReq = pReq,
      .pSma = NULL,
      .pBaseSma = pBaseTsma,
      .pSrcStb = pStb,
  };

  code = mndCreateTSMA(&cxt);
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("tsma:%s, failed to create since %s", createReq.name, tstrerror(code));
  }

  if (pStb) mndReleaseStb(pMnode, pStb);
  if (pTargetStb) mndReleaseStb(pMnode, pTargetStb);
  if (pBaseTsma) mndReleaseSma(pMnode, pBaseTsma);
  mndReleaseSma(pMnode, pSma);
  mndReleaseStream(pMnode, pStream);
  mndReleaseDb(pMnode, pDb);
  tFreeSMCreateSmaReq(&createReq);

  TAOS_RETURN(code);
}
1973

1974
// Builds and prepares the "drop-tsma" transaction.
//
// The transaction carries two redo actions (drop the backing stream, then drop
// the output stable), the redo/commit logs that remove the SMA object, and the
// prepare/commit logs that bump the database's tsmaVersion.
//
// Error handling notes:
//   - A transaction conflict is reported through its own error code; it must not
//     fall through with the success code left by the previous step.
//   - The drop-stream request lives on this function's stack. It is always
//     released through the local object and pCxt->pDropStreamReq is cleared
//     before returning, so the context never keeps a pointer to it.
//   - Action payloads (pCont) that were allocated but never appended to the
//     transaction are freed at _OVER. After a successful append the local
//     pointer is cleared (NOTE(review): assumes the transaction releases
//     appended payloads when it is dropped — confirm against mndTrans).
//
// Returns TSDB_CODE_SUCCESS when the transaction was prepared, otherwise the
// failing step's error code.
static int32_t mndDropTSMA(SCreateTSMACxt *pCxt) {
  int32_t         code = -1;
  STransAction    dropStreamRedoAction = {0};
  STransAction    dropStbRedoAction = {0};
  SMDropStreamReq dropStreamReq = {0};
  SMDropStbReq    dropStbReq = {0};
  SDbObj          newDb = {0};
  STrans         *pTrans = mndTransCreate(pCxt->pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_TSMA, pCxt->pRpcReq, "drop-tsma");
  if (!pTrans) {
    code = terrno;
    goto _OVER;
  }

  pCxt->pDropStreamReq = &dropStreamReq;
  code = mndCreateTSMABuildDropStreamReq(pCxt);
  if (TSDB_CODE_SUCCESS != code) {
    goto _OVER;
  }

  mndTransSetDbName(pTrans, pCxt->pDb->name, NULL);
  TAOS_CHECK_GOTO(mndTransCheckConflict(pCxt->pMnode, pTrans), NULL, _OVER);
  mndTransSetSerial(pTrans);

  // redo: drop the stream; a missing stream is acceptable
  mndGetMnodeEpSet(pCxt->pMnode, &dropStreamRedoAction.epSet);
  dropStreamRedoAction.acceptableCode = TSDB_CODE_MND_STREAM_NOT_EXIST;
  dropStreamRedoAction.msgType = TDMT_STREAM_DROP;
  // first call with a NULL buffer only computes the encoded length
  dropStreamRedoAction.contLen = tSerializeSMDropStreamReq(0, 0, pCxt->pDropStreamReq);
  dropStreamRedoAction.pCont = taosMemoryCalloc(1, dropStreamRedoAction.contLen);
  if (!dropStreamRedoAction.pCont) {
    code = terrno;
    goto _OVER;
  }
  if (dropStreamRedoAction.contLen !=
      tSerializeSMDropStreamReq(dropStreamRedoAction.pCont, dropStreamRedoAction.contLen, pCxt->pDropStreamReq)) {
    mError("tsma: %s, failed to drop due to drop stream req encode failure", pCxt->pDropSmaReq->name);
    code = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }

  // output stable is not dropped when dropping stream, dropping it when dropping tsma
  dropStbReq.igNotExists = false;
  tstrncpy(dropStbReq.name, pCxt->targetStbFullName, TSDB_TABLE_FNAME_LEN);
  dropStbReq.sql = "drop";
  dropStbReq.sqlLen = 5;

  mndGetMnodeEpSet(pCxt->pMnode, &dropStbRedoAction.epSet);
  dropStbRedoAction.acceptableCode = TSDB_CODE_MND_STB_NOT_EXIST;
  dropStbRedoAction.msgType = TDMT_MND_STB_DROP;
  dropStbRedoAction.contLen = tSerializeSMDropStbReq(0, 0, &dropStbReq);
  dropStbRedoAction.pCont = taosMemoryCalloc(1, dropStbRedoAction.contLen);
  if (!dropStbRedoAction.pCont) {
    code = terrno;
    goto _OVER;
  }
  if (dropStbRedoAction.contLen !=
      tSerializeSMDropStbReq(dropStbRedoAction.pCont, dropStbRedoAction.contLen, &dropStbReq)) {
    mError("tsma: %s, failedto drop due to drop stb req encode failure", pCxt->pDropSmaReq->name);
    code = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }

  // the db object is re-written with tsmaVersion + 1
  memcpy(&newDb, pCxt->pDb, sizeof(SDbObj));
  newDb.tsmaVersion++;

  TAOS_CHECK_GOTO(mndSetDropSmaRedoLogs(pCxt->pMnode, pTrans, pCxt->pSma), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetDropSmaCommitLogs(pCxt->pMnode, pTrans, pCxt->pSma), NULL, _OVER);

  TAOS_CHECK_GOTO(mndTransAppendRedoAction(pTrans, &dropStreamRedoAction), NULL, _OVER);
  dropStreamRedoAction.pCont = NULL;  // appended: no longer released locally
  TAOS_CHECK_GOTO(mndTransAppendRedoAction(pTrans, &dropStbRedoAction), NULL, _OVER);
  dropStbRedoAction.pCont = NULL;  // appended: no longer released locally

  TAOS_CHECK_GOTO(mndSetUpdateDbTsmaVersionPrepareLogs(pCxt->pMnode, pTrans, pCxt->pDb, &newDb), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetUpdateDbTsmaVersionCommitLogs(pCxt->pMnode, pTrans, pCxt->pDb, &newDb), NULL, _OVER);
  TAOS_CHECK_GOTO(mndTransPrepare(pCxt->pMnode, pTrans), NULL, _OVER);
  code = TSDB_CODE_SUCCESS;

_OVER:
  // release payloads that never made it into the transaction
  taosMemoryFree(dropStreamRedoAction.pCont);
  taosMemoryFree(dropStbRedoAction.pCont);
  // free through the local object: it is zero-initialised, so this is valid even
  // when the request was never built
  tFreeMDropStreamReq(&dropStreamReq);
  pCxt->pDropStreamReq = NULL;
  mndTransDrop(pTrans);
  TAOS_RETURN(code);
}
2047

2048
// Scans every SDB_SMA object and reports whether any of them names pSma as its
// base TSMA (baseSmaName equals pSma->name). Each visited object is released
// again; on a match the in-progress fetch is cancelled before returning true.
static bool hasRecursiveTsmasBasedOnMe(SMnode *pMnode, const SSmaObj *pSma) {
  SSdb    *pSdb = pMnode->pSdb;
  SSmaObj *pCandidate = NULL;
  void    *pCursor = NULL;

  for (pCursor = sdbFetch(pSdb, SDB_SMA, pCursor, (void **)&pCandidate); pCursor != NULL;
       pCursor = sdbFetch(pSdb, SDB_SMA, pCursor, (void **)&pCandidate)) {
    bool basedOnMe = (strncmp(pCandidate->baseSmaName, pSma->name, TSDB_TABLE_FNAME_LEN) == 0);
    sdbRelease(pSdb, pCandidate);
    if (basedOnMe) {
      sdbCancelFetch(pSdb, pCursor);
      return true;
    }
  }
  return false;
}
2063

2064
// RPC handler for "drop TSMA".
//
// Steps, in order: deserialize the request, derive the stream / output-stable
// names, look up the TSMA (honouring igNotExists), resolve its database and
// check write privilege, refuse the drop while another TSMA is based on this
// one, then call mndDropTSMA().
//
// Returns TSDB_CODE_ACTION_IN_PROGRESS when the transaction was started, 0 when
// the TSMA is missing and igNotExists is set, otherwise an error code.
//
// pStb is declared and initialised up front because the early failure paths
// jump to _OVER, which releases it; declaring it mid-function would leave it
// indeterminate on those paths.
static int32_t mndProcessDropTSMAReq(SRpcMsg *pReq) {
  int32_t      code = -1;
  SMDropSmaReq dropReq = {0};
  SSmaObj     *pSma = NULL;
  SDbObj      *pDb = NULL;
  SStbObj     *pStb = NULL;
  SMnode      *pMnode = pReq->info.node;
  if (tDeserializeSMDropSmaReq(pReq->pCont, pReq->contLen, &dropReq) != TSDB_CODE_SUCCESS) {
    code = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }

  char streamName[TSDB_TABLE_FNAME_LEN] = {0};
  char streamTargetStbFullName[TSDB_TABLE_FNAME_LEN] = {0};
  code = mndTSMAGenerateOutputName(dropReq.name, streamName, streamTargetStbFullName);
  if (TSDB_CODE_SUCCESS != code) {
    goto _OVER;
  }

  // the output stable is only acquired here and released again at _OVER
  pStb = mndAcquireStb(pMnode, streamTargetStbFullName);

  pSma = mndAcquireSma(pMnode, dropReq.name);
  if (!pSma && dropReq.igNotExists) {
    code = 0;
    goto _OVER;
  }
  if (!pSma) {
    code = TSDB_CODE_MND_SMA_NOT_EXIST;
    goto _OVER;
  }
  SName name = {0};
  code = tNameFromString(&name, dropReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
  if (TSDB_CODE_SUCCESS != code) {
    goto _OVER;
  }
  char db[TSDB_TABLE_FNAME_LEN] = {0};
  (void)tNameGetFullDbName(&name, db);

  pDb = mndAcquireDb(pMnode, db);
  if (!pDb) {
    code = TSDB_CODE_MND_DB_NOT_EXIST;
    goto _OVER;
  }

  if ((code = mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb)) != 0) {
    goto _OVER;
  }

  // a TSMA that serves as the base of another TSMA cannot be dropped
  if (hasRecursiveTsmasBasedOnMe(pMnode, pSma)) {
    code = TSDB_CODE_MND_INVALID_DROP_TSMA;
    goto _OVER;
  }

  SCreateTSMACxt cxt = {
      .pDb = pDb,
      .pMnode = pMnode,
      .pRpcReq = pReq,
      .pSma = pSma,
      .streamName = streamName,
      .targetStbFullName = streamTargetStbFullName,
      .pDropSmaReq = &dropReq,
  };

  code = mndDropTSMA(&cxt);

  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
_OVER:

  if (pStb) mndReleaseStb(pMnode, pStb);
  mndReleaseSma(pMnode, pSma);
  mndReleaseDb(pMnode, pDb);
  TAOS_RETURN(code);
}
2136

UNCOV
2137
// Show-retrieve callback that writes up to `rows` TSMA rows into pBlock.
//
// The iteration state is a heap-allocated SSmaAndTagIter stored in pShow->pIter;
// it is created on the first call and freed once fewer than `rows` rows are
// produced. When pShow->db is set and the database can be acquired, only SMAs
// whose dbUid matches it are listed. SMAs whose own database cannot be acquired
// are skipped.
//
// Returns the number of rows written, or an error code if filling a row failed.
// pShow->numOfRows is only advanced on success, so an error code is never
// folded into the running row count.
static int32_t mndRetrieveTSMA(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SDbObj          *pDb = NULL;
  int32_t          numOfRows = 0;
  SSmaObj         *pSma = NULL;
  SMnode          *pMnode = pReq->info.node;
  int32_t          code = 0;
  SColumnInfoData *pColInfo;
  if (pShow->pIter == NULL) {
    pShow->pIter = taosMemoryCalloc(1, sizeof(SSmaAndTagIter));
  }
  if (!pShow->pIter) {
    return terrno;
  }
  if (pShow->db[0]) {
    pDb = mndAcquireDb(pMnode, pShow->db);
  }
  SSmaAndTagIter *pIter = pShow->pIter;
  while (numOfRows < rows) {
    pIter->pSmaIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter->pSmaIter, (void **)&pSma);
    if (pIter->pSmaIter == NULL) break;
    SDbObj *pSrcDb = mndAcquireDb(pMnode, pSma->db);

    // skip SMAs of other databases and SMAs whose database is unavailable
    if ((pDb && pSma->dbUid != pDb->uid) || !pSrcDb) {
      sdbRelease(pMnode->pSdb, pSma);
      if (pSrcDb) mndReleaseDb(pMnode, pSrcDb);
      continue;
    }

    int32_t cols = 0;
    SName   n = {0};

    // column: table part of the SMA name
    code = tNameFromString(&n, pSma->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
    char smaName[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
    if (TSDB_CODE_SUCCESS == code) {
      STR_TO_VARSTR(smaName, (char *)tNameGetTableName(&n));
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)smaName, false);
    }

    // column: database string of the SMA
    char db[TSDB_DB_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
    if (TSDB_CODE_SUCCESS == code) {
      STR_TO_VARSTR(db, (char *)mndGetDbStr(pSma->db));
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)db, false);
    }

    // column: table part of the source table name (pSma->stb)
    if (TSDB_CODE_SUCCESS == code) {
      code = tNameFromString(&n, pSma->stb, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
    }
    char srcTb[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
    if (TSDB_CODE_SUCCESS == code) {
      STR_TO_VARSTR(srcTb, (char *)tNameGetTableName(&n));
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)srcTb, false);
    }

    // column: the same database string is written a second time
    if (TSDB_CODE_SUCCESS == code) {
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)db, false);
    }

    // column: table part of the destination table name (pSma->dstTbName)
    if (TSDB_CODE_SUCCESS == code) {
      code = tNameFromString(&n, pSma->dstTbName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
    }

    if (TSDB_CODE_SUCCESS == code) {
      char targetTb[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
      STR_TO_VARSTR(targetTb, (char *)tNameGetTableName(&n));
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)targetTb, false);
    }

    if (TSDB_CODE_SUCCESS == code) {
      // stream name (reported with the same value as the SMA name column)
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)smaName, false);
    }

    // column: creation time
    if (TSDB_CODE_SUCCESS == code) {
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)(&pSma->createdTime), false);
    }

    // interval
    char    interval[64 + VARSTR_HEADER_SIZE] = {0};
    int32_t len = 0;
    if (TSDB_CODE_SUCCESS == code) {
      // non-calendar durations are printed with the database's precision unit,
      // calendar durations with the SMA's own interval unit
      if (!IS_CALENDAR_TIME_DURATION(pSma->intervalUnit)) {
        len = tsnprintf(interval + VARSTR_HEADER_SIZE, 64, "%" PRId64 "%c", pSma->interval,
                        getPrecisionUnit(pSrcDb->cfg.precision));
      } else {
        len = tsnprintf(interval + VARSTR_HEADER_SIZE, 64, "%" PRId64 "%c", pSma->interval, pSma->intervalUnit);
      }
      varDataSetLen(interval, len);
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, interval, false);
    }

    // buf is shared by the "create sql" and "func list" columns
    char buf[TSDB_MAX_SAVED_SQL_LEN + VARSTR_HEADER_SIZE] = {0};
    if (TSDB_CODE_SUCCESS == code) {
      // create sql
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      len = tsnprintf(buf + VARSTR_HEADER_SIZE, TSDB_MAX_SAVED_SQL_LEN, "%s", pSma->sql);
      varDataSetLen(buf, TMIN(len, TSDB_MAX_SAVED_SQL_LEN));
      code = colDataSetVal(pColInfo, numOfRows, buf, false);
    }

    // func list
    len = 0;
    SNode *pNode = NULL, *pFunc = NULL;
    if (TSDB_CODE_SUCCESS == code) {
      code = nodesStringToNode(pSma->ast, &pNode);
    }
    if (TSDB_CODE_SUCCESS == code) {
      // comma-separated aliases of the TSMA-supported functions in the projection list
      char *start = buf + VARSTR_HEADER_SIZE;
      FOREACH(pFunc, ((SSelectStmt *)pNode)->pProjectionList) {
        if (nodeType(pFunc) == QUERY_NODE_FUNCTION) {
          SFunctionNode *pFuncNode = (SFunctionNode *)pFunc;
          if (!fmIsTSMASupportedFunc(pFuncNode->funcId)) continue;
          len += tsnprintf(start, TSDB_MAX_SAVED_SQL_LEN - len, "%s%s", start != buf + VARSTR_HEADER_SIZE ? "," : "",
                           ((SExprNode *)pFunc)->userAlias);
          if (len >= TSDB_MAX_SAVED_SQL_LEN) {
            len = TSDB_MAX_SAVED_SQL_LEN;
            break;
          }
          start = buf + VARSTR_HEADER_SIZE + len;
        }
      }
      nodesDestroyNode(pNode);
    }

    if (TSDB_CODE_SUCCESS == code) {
      varDataSetLen(buf, len);
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, buf, false);
    }

    numOfRows++;
    mndReleaseSma(pMnode, pSma);
    mndReleaseDb(pMnode, pSrcDb);
    if (TSDB_CODE_SUCCESS != code) {
      // abort the scan and hand the error code back through the return value
      sdbCancelFetch(pMnode->pSdb, pIter->pSmaIter);
      numOfRows = code;
      break;
    }
  }
  mndReleaseDb(pMnode, pDb);
  // on failure numOfRows holds an error code, which must not be added to the row count
  if (TSDB_CODE_SUCCESS == code) {
    pShow->numOfRows += numOfRows;
  }
  if (numOfRows < rows) {
    taosMemoryFree(pShow->pIter);
    pShow->pIter = NULL;
  }
  return numOfRows;
}
2291

2292
// Cancel an in-progress TSMA retrieve: stop the SMA sdb fetch held by the iterator (if any)
// and free the iterator object itself.
static void mndCancelRetrieveTSMA(SMnode *pMnode, void *pIter) {
  SSmaAndTagIter *pSmaTagIter = (SSmaAndTagIter *)pIter;
  if (NULL == pSmaTagIter) {
    // nothing to cancel; the free call is kept so the NULL path matches the non-NULL one
    taosMemoryFree(pSmaTagIter);
    return;
  }

  sdbCancelFetchByType(pMnode->pSdb, pSmaTagIter->pSmaIter, SDB_SMA);
  taosMemoryFree(pSmaTagIter);
}
×
2300

2301
/**
 * Fill pInfo from a TSMA object and its destination super table.
 *
 * Copies interval/unit/uid/version, the short names parsed from the full names stored in pSma,
 * the list of TSMA-supported functions found in the projection list of the AST, a duplicate of
 * pSma->ast, the tags of pDestStb and the columns of pDestStb in index range [1, numOfColumns - 2).
 *
 * @param pSma      TSMA object to dump.
 * @param pDestStb  destination super table of the TSMA.
 * @param pInfo     output; members allocated here (pFuncs, ast, pTags, pUsedCols) are left in place
 *                  on failure, so the caller is expected to release pInfo on error.
 * @param pBaseTsma when non-NULL, its AST is parsed for the function list instead of pSma->ast.
 * @return 0 on success, otherwise an error code. terrno is set to the returned code on every exit.
 */
int32_t dumpTSMAInfoFromSmaObj(const SSmaObj *pSma, const SStbObj *pDestStb, STableTSMAInfo *pInfo,
                               const SSmaObj *pBaseTsma) {
  int32_t code = 0;
  pInfo->interval = pSma->interval;
  pInfo->unit = pSma->intervalUnit;
  pInfo->tsmaId = pSma->uid;
  pInfo->version = pSma->version;
  pInfo->destTbUid = pDestStb->uid;

  SName sName = {0};
  code = tNameFromString(&sName, pSma->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
  if (TSDB_CODE_SUCCESS != code) {
    TAOS_RETURN(code);
  }
  tstrncpy(pInfo->name, sName.tname, TSDB_TABLE_NAME_LEN);
  tstrncpy(pInfo->targetDbFName, pSma->db, TSDB_DB_FNAME_LEN);

  code = tNameFromString(&sName, pSma->dstTbName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
  if (TSDB_CODE_SUCCESS != code) {
    TAOS_RETURN(code);
  }
  tstrncpy(pInfo->targetTb, sName.tname, TSDB_TABLE_NAME_LEN);
  tstrncpy(pInfo->dbFName, pSma->db, TSDB_DB_FNAME_LEN);

  code = tNameFromString(&sName, pSma->stb, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
  if (TSDB_CODE_SUCCESS != code) {
    TAOS_RETURN(code);
  }
  tstrncpy(pInfo->tb, sName.tname, TSDB_TABLE_NAME_LEN);

  pInfo->pFuncs = taosArrayInit(8, sizeof(STableTSMAFuncInfo));
  if (!pInfo->pFuncs) {
    TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
  }

  SNode *pNode = NULL, *pFunc = NULL;
  if (TSDB_CODE_SUCCESS != nodesStringToNode(pBaseTsma ? pBaseTsma->ast : pSma->ast, &pNode)) {
    taosArrayDestroy(pInfo->pFuncs);
    pInfo->pFuncs = NULL;
    TAOS_RETURN(TSDB_CODE_TSMA_INVALID_STAT);
  }
  if (pNode) {
    SSelectStmt *pSelect = (SSelectStmt *)pNode;
    FOREACH(pFunc, pSelect->pProjectionList) {
      // only function nodes may be viewed as SFunctionNode; skip anything else in the projection list
      if (nodeType(pFunc) != QUERY_NODE_FUNCTION) continue;
      SFunctionNode *pFuncNode = (SFunctionNode *)pFunc;
      if (!fmIsTSMASupportedFunc(pFuncNode->funcId)) continue;

      STableTSMAFuncInfo funcInfo = {0};
      funcInfo.funcId = pFuncNode->funcId;
      // NOTE(review): assumes the first parameter of a supported function is a column node - confirm
      funcInfo.colId = ((SColumnNode *)pFuncNode->pParameterList->pHead->pNode)->colId;
      if (!taosArrayPush(pInfo->pFuncs, &funcInfo)) {
        code = terrno;
        taosArrayDestroy(pInfo->pFuncs);
        pInfo->pFuncs = NULL;  // avoid a dangling pointer when the caller releases pInfo
        nodesDestroyNode(pNode);
        TAOS_RETURN(code);
      }
    }
    nodesDestroyNode(pNode);
  }

  pInfo->ast = taosStrdup(pSma->ast);
  if (!pInfo->ast) code = terrno;

  if (code == TSDB_CODE_SUCCESS && pDestStb->numOfTags > 0) {
    pInfo->pTags = taosArrayInit(pDestStb->numOfTags, sizeof(SSchema));
    if (!pInfo->pTags) {
      code = terrno;
    } else {
      for (int32_t i = 0; i < pDestStb->numOfTags; ++i) {
        if (NULL == taosArrayPush(pInfo->pTags, &pDestStb->pTags[i])) {
          code = terrno;
          break;
        }
      }
    }
  }

  if (code == TSDB_CODE_SUCCESS) {
    pInfo->pUsedCols = taosArrayInit(pDestStb->numOfColumns - 3, sizeof(SSchema));
    if (!pInfo->pUsedCols) {
      code = terrno;
    } else {
      // skip _wstart, _wend, _duration
      for (int32_t i = 1; i < pDestStb->numOfColumns - 2; ++i) {
        if (NULL == taosArrayPush(pInfo->pUsedCols, &pDestStb->pColumns[i])) {
          code = terrno;
          break;
        }
      }
    }
  }
  TAOS_RETURN(code);
}
2386

2387
// @note remember to mndReleaseSma(*ppOut)
2388
static int32_t mndGetDeepestBaseForTsma(SMnode *pMnode, SSmaObj *pSma, SSmaObj **ppOut) {
1,798✔
2389
  int32_t  code = 0;
1,798✔
2390
  SSmaObj *pRecursiveTsma = NULL;
1,798✔
2391
  if (pSma->baseSmaName[0]) {
1,798✔
2392
    pRecursiveTsma = mndAcquireSma(pMnode, pSma->baseSmaName);
385✔
2393
    if (!pRecursiveTsma) {
385!
2394
      mError("base tsma: %s for tsma: %s not found", pSma->baseSmaName, pSma->name);
×
2395
      return TSDB_CODE_MND_SMA_NOT_EXIST;
×
2396
    }
2397
    while (pRecursiveTsma->baseSmaName[0]) {
424✔
2398
      SSmaObj *pTmpSma = pRecursiveTsma;
39✔
2399
      pRecursiveTsma = mndAcquireSma(pMnode, pTmpSma->baseSmaName);
39✔
2400
      if (!pRecursiveTsma) {
39!
2401
        mError("base tsma: %s for tsma: %s not found", pTmpSma->baseSmaName, pTmpSma->name);
×
2402
        mndReleaseSma(pMnode, pTmpSma);
×
2403
        return TSDB_CODE_MND_SMA_NOT_EXIST;
×
2404
      }
2405
      mndReleaseSma(pMnode, pTmpSma);
39✔
2406
    }
2407
  }
2408
  *ppOut = pRecursiveTsma;
1,798✔
2409
  return code;
1,798✔
2410
}
2411

2412
/**
 * Look up one TSMA by its full name and append its info to rsp->pTsmas.
 *
 * @param pMnode    mnode handle.
 * @param tsmaFName full name of the TSMA used as the SDB_SMA key.
 * @param rsp       response whose pTsmas array receives a newly allocated STableTSMAInfo pointer.
 * @param exist     set to true only when an entry was appended; otherwise left untouched.
 * @return 0 when an entry was appended, or when the TSMA exists but its destination super table
 *         cannot be acquired (nothing appended); TSDB_CODE_MND_SMA_NOT_EXIST when the TSMA itself
 *         is not found; another error code on failure.
 */
static int32_t mndGetTSMA(SMnode *pMnode, char *tsmaFName, STableTSMAInfoRsp *rsp, bool *exist) {
  int32_t  code = TSDB_CODE_SUCCESS;
  SSmaObj *pBaseTsma = NULL;

  SSmaObj *pSma = sdbAcquire(pMnode->pSdb, SDB_SMA, tsmaFName);
  if (!pSma) {
    // report a meaningful error code instead of a raw -1
    TAOS_RETURN(TSDB_CODE_MND_SMA_NOT_EXIST);
  }

  SStbObj *pDstStb = mndAcquireStb(pMnode, pSma->dstTbName);
  if (!pDstStb) {
    sdbRelease(pMnode->pSdb, pSma);
    return TSDB_CODE_SUCCESS;
  }

  STableTSMAInfo *pTsma = taosMemoryCalloc(1, sizeof(STableTSMAInfo));
  if (!pTsma) {
    code = terrno;
    sdbRelease(pMnode->pSdb, pSma);
    mndReleaseStb(pMnode, pDstStb);
    TAOS_RETURN(code);
  }

  code = mndGetDeepestBaseForTsma(pMnode, pSma, &pBaseTsma);
  if (code == 0) {
    code = dumpTSMAInfoFromSmaObj(pSma, pDstStb, pTsma, pBaseTsma);
  }
  mndReleaseStb(pMnode, pDstStb);
  sdbRelease(pMnode->pSdb, pSma);
  if (pBaseTsma) mndReleaseSma(pMnode, pBaseTsma);

  // decide on the code that is actually returned, not on terrno which may be stale or unset
  if (code != 0) {
    tFreeAndClearTableTSMAInfo(pTsma);
    TAOS_RETURN(code);
  }

  if (NULL == taosArrayPush(rsp->pTsmas, &pTsma)) {
    code = terrno;
    tFreeAndClearTableTSMAInfo(pTsma);
    TAOS_RETURN(code);  // nothing was appended, so *exist must not be set
  }
  *exist = true;
  TAOS_RETURN(code);
}
2453

2454
// Predicate passed to mndGetSomeTsmas: returning true makes the scan skip pSma (it is filtered out),
// returning false keeps it. param is the opaque argument forwarded unchanged by mndGetSomeTsmas.
typedef bool (*tsmaFilter)(const SSmaObj *pSma, void *param);
2455

2456
/**
 * Scan every SDB_SMA row, skip those rejected by the filter, and append a STableTSMAInfo for each
 * remaining TSMA to pRsp->pTsmas.
 *
 * A TSMA whose destination super table or stream cannot be acquired is skipped and the scan
 * continues; in that case the function returns TSDB_CODE_NEED_RETRY once the scan completes.
 *
 * @param pMnode   mnode handle.
 * @param pRsp     response whose pTsmas array receives newly allocated STableTSMAInfo pointers.
 * @param filtered predicate; a true result means the TSMA is skipped.
 * @param param    opaque argument forwarded to the predicate.
 * @param exist    set to true when at least one entry was appended; otherwise left untouched.
 * @return 0 on success, TSDB_CODE_NEED_RETRY when some TSMA was skipped as described above,
 *         or an error code. On every early error exit the sdb iterator is cancelled.
 */
static int32_t mndGetSomeTsmas(SMnode *pMnode, STableTSMAInfoRsp *pRsp, tsmaFilter filtered, void *param, bool *exist) {
  int32_t  code = 0;
  SSmaObj *pSma = NULL;
  SSdb    *pSdb = pMnode->pSdb;
  void    *pIter = NULL;
  bool     shouldRetry = false;

  while (1) {
    pIter = sdbFetch(pSdb, SDB_SMA, pIter, (void **)&pSma);
    if (pIter == NULL) break;

    if (filtered(pSma, param)) {
      sdbRelease(pSdb, pSma);
      continue;
    }

    SStbObj *pStb = mndAcquireStb(pMnode, pSma->dstTbName);
    if (!pStb) {
      sdbRelease(pSdb, pSma);
      shouldRetry = true;
      continue;
    }

    SName smaName = {0};
    char  streamName[TSDB_TABLE_FNAME_LEN] = {0};
    code = tNameFromString(&smaName, pSma->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
    if (TSDB_CODE_SUCCESS != code) {
      sdbRelease(pSdb, pSma);
      mndReleaseStb(pMnode, pStb);
      // leaving the loop early: cancel the iterator like the other early exits below
      sdbCancelFetch(pSdb, pIter);
      TAOS_RETURN(code);
    }
    snprintf(streamName, TSDB_TABLE_FNAME_LEN, "%d.%s", smaName.acctId, smaName.tname);

    SStreamObj *pStream = NULL;
    code = mndAcquireStream(pMnode, streamName, &pStream);
    if (!pStream) {
      shouldRetry = true;
      sdbRelease(pSdb, pSma);
      mndReleaseStb(pMnode, pStb);
      continue;
    }
    if (code != 0) {
      // a stream object was handed back together with an error: do not leak the reference
      mndReleaseStream(pMnode, pStream);
      sdbRelease(pSdb, pSma);
      mndReleaseStb(pMnode, pStb);
      sdbCancelFetch(pSdb, pIter);
      TAOS_RETURN(code);
    }

    int64_t streamId = pStream->uid;
    mndReleaseStream(pMnode, pStream);

    STableTSMAInfo *pTsma = taosMemoryCalloc(1, sizeof(STableTSMAInfo));
    if (!pTsma) {
      code = terrno;
      mndReleaseStb(pMnode, pStb);
      sdbRelease(pSdb, pSma);
      sdbCancelFetch(pSdb, pIter);
      TAOS_RETURN(code);
    }
    pTsma->streamUid = streamId;

    // scoped per iteration so a base TSMA released in a previous round is never released again
    SSmaObj *pBaseTsma = NULL;
    code = mndGetDeepestBaseForTsma(pMnode, pSma, &pBaseTsma);
    if (code == 0) {
      code = dumpTSMAInfoFromSmaObj(pSma, pStb, pTsma, pBaseTsma);
    }
    mndReleaseStb(pMnode, pStb);
    sdbRelease(pSdb, pSma);
    if (pBaseTsma) mndReleaseSma(pMnode, pBaseTsma);

    // decide on the code that is actually returned, not on terrno which may be stale or unset
    if (code != 0) {
      tFreeAndClearTableTSMAInfo(pTsma);
      sdbCancelFetch(pSdb, pIter);
      TAOS_RETURN(code);
    }
    if (NULL == taosArrayPush(pRsp->pTsmas, &pTsma)) {
      code = terrno;
      tFreeAndClearTableTSMAInfo(pTsma);
      sdbCancelFetch(pSdb, pIter);
      TAOS_RETURN(code);
    }
    *exist = true;
  }

  if (shouldRetry) {
    return TSDB_CODE_NEED_RETRY;
  }
  return TSDB_CODE_SUCCESS;
}
2544

2545
static bool tsmaTbFilter(const SSmaObj *pSma, void *param) {
1,767✔
2546
  const char *tbFName = param;
1,767✔
2547
  return pSma->stb[0] != tbFName[0] || strcmp(pSma->stb, tbFName) != 0;
1,767!
2548
}
2549

2550
// Collect the TSMAs whose source table full name equals tbFName (see tsmaTbFilter) into pRsp.
static int32_t mndGetTableTSMA(SMnode *pMnode, char *tbFName, STableTSMAInfoRsp *pRsp, bool *exist) {
  void   *filterArg = tbFName;
  int32_t result = mndGetSomeTsmas(pMnode, pRsp, tsmaTbFilter, filterArg, exist);
  return result;
}
2553

2554
static bool tsmaDbFilter(const SSmaObj *pSma, void *param) {
325✔
2555
  uint64_t *dbUid = param;
325✔
2556
  return pSma->dbUid != *dbUid;
325✔
2557
}
2558

2559
// Collect the TSMAs whose dbUid equals the given dbUid (see tsmaDbFilter) into pRsp.
// dbFName is accepted but not used by this function.
int32_t mndGetDbTsmas(SMnode *pMnode, const char *dbFName, uint64_t dbUid, STableTSMAInfoRsp *pRsp, bool *exist) {
  uint64_t *pUid = &dbUid;
  int32_t   result = mndGetSomeTsmas(pMnode, pRsp, tsmaDbFilter, pUid, exist);
  return result;
}
2562

2563
/**
 * Handle a "get table TSMA" request.
 *
 * Deserializes the request, collects either one TSMA by name (fetchingWithTsmaName) or all TSMAs of
 * a table, and on success attaches the serialized STableTSMAInfoRsp to pReq->info.rsp / rspLen.
 * TSDB_CODE_NEED_RETRY from the per-table lookup is treated as success.
 *
 * @return 0 on success, TSDB_CODE_MND_SMA_NOT_EXIST when nothing was found, or another error code.
 */
static int32_t mndProcessGetTbTSMAReq(SRpcMsg *pReq) {
  STableTSMAInfoRsp rsp = {0};
  int32_t           code = -1;
  STableTSMAInfoReq tsmaReq = {0};
  bool              exist = false;
  SMnode           *pMnode = pReq->info.node;

  TAOS_CHECK_GOTO(tDeserializeTableTSMAInfoReq(pReq->pCont, pReq->contLen, &tsmaReq), NULL, _OVER);

  rsp.pTsmas = taosArrayInit(4, POINTER_BYTES);
  if (NULL == rsp.pTsmas) {
    code = terrno;
    goto _OVER;
  }

  if (tsmaReq.fetchingWithTsmaName) {
    code = mndGetTSMA(pMnode, tsmaReq.name, &rsp, &exist);
  } else {
    code = mndGetTableTSMA(pMnode, tsmaReq.name, &rsp, &exist);
    if (TSDB_CODE_NEED_RETRY == code) {
      code = TSDB_CODE_SUCCESS;
    }
  }
  if (code) {
    goto _OVER;
  }

  if (!exist) {
    code = TSDB_CODE_MND_SMA_NOT_EXIST;
  } else {
    // first pass only computes the encoded size
    int32_t contLen = tSerializeTableTSMAInfoRsp(NULL, 0, &rsp);
    if (contLen < 0) {
      code = terrno;
      goto _OVER;
    }

    void *pRsp = rpcMallocCont(contLen);
    if (pRsp == NULL) {
      code = terrno;
      goto _OVER;
    }

    int32_t len = tSerializeTableTSMAInfoRsp(pRsp, contLen, &rsp);
    if (len < 0) {
      code = terrno;
      rpcFreeCont(pRsp);  // not handed over to pReq yet, so it must be freed here
      goto _OVER;
    }

    pReq->info.rsp = pRsp;
    pReq->info.rspLen = contLen;

    code = 0;
  }

_OVER:
  tFreeTableTSMAInfoRsp(&rsp);
  TAOS_RETURN(code);
}
2616

2617
/**
 * Build a placeholder STableTSMAInfo from a client-reported TSMA version: ids and names are copied
 * from pTsmaVer, pFuncs stays NULL and ast is an empty string.
 *
 * @param pTsmaVer version record supplying tsmaId, dbId, dbFName, tbName and name.
 * @param ppTsma   output; receives the newly allocated info on success and is untouched on failure.
 * @return TSDB_CODE_SUCCESS, or the terrno value observed right after a failed allocation.
 */
static int32_t mkNonExistTSMAInfo(const STSMAVersion *pTsmaVer, STableTSMAInfo **ppTsma) {
  STableTSMAInfo *pInfo = taosMemoryCalloc(1, sizeof(STableTSMAInfo));
  if (!pInfo) {
    return terrno;
  }
  pInfo->pFuncs = NULL;
  pInfo->tsmaId = pTsmaVer->tsmaId;
  tstrncpy(pInfo->dbFName, pTsmaVer->dbFName, TSDB_DB_FNAME_LEN);
  tstrncpy(pInfo->tb, pTsmaVer->tbName, TSDB_TABLE_NAME_LEN);
  tstrncpy(pInfo->name, pTsmaVer->name, TSDB_TABLE_NAME_LEN);
  pInfo->dbId = pTsmaVer->dbId;
  pInfo->ast = taosMemoryCalloc(1, 1);
  if (!pInfo->ast) {
    // capture the error before any further call gets a chance to overwrite terrno
    int32_t code = terrno;
    taosMemoryFree(pInfo);
    return code;
  }
  *ppTsma = pInfo;
  return TSDB_CODE_SUCCESS;
}
2636

2637
int32_t mndValidateTSMAInfo(SMnode *pMnode, STSMAVersion *pTsmaVersions, int32_t numOfTsmas, void **ppRsp,
411✔
2638
                            int32_t *pRspLen) {
2639
  int32_t         code = -1;
411✔
2640
  STSMAHbRsp      hbRsp = {0};
411✔
2641
  int32_t         rspLen = 0;
411✔
2642
  void           *pRsp = NULL;
411✔
2643
  char            tsmaFName[TSDB_TABLE_FNAME_LEN] = {0};
411✔
2644
  STableTSMAInfo *pTsmaInfo = NULL;
411✔
2645

2646
  hbRsp.pTsmas = taosArrayInit(numOfTsmas, POINTER_BYTES);
411✔
2647
  if (!hbRsp.pTsmas) {
411!
2648
    code = terrno;
×
2649
    TAOS_RETURN(code);
×
2650
  }
2651

2652
  for (int32_t i = 0; i < numOfTsmas; ++i) {
843✔
2653
    STSMAVersion *pTsmaVer = &pTsmaVersions[i];
432✔
2654
    pTsmaVer->dbId = be64toh(pTsmaVer->dbId);
432✔
2655
    pTsmaVer->tsmaId = be64toh(pTsmaVer->tsmaId);
432✔
2656
    pTsmaVer->version = ntohl(pTsmaVer->version);
432✔
2657

2658
    snprintf(tsmaFName, sizeof(tsmaFName), "%s.%s", pTsmaVer->dbFName, pTsmaVer->name);
432✔
2659
    SSmaObj *pSma = mndAcquireSma(pMnode, tsmaFName);
432✔
2660
    if (!pSma) {
432✔
2661
      code = mkNonExistTSMAInfo(pTsmaVer, &pTsmaInfo);
4✔
2662
      if (code) goto _OVER;
4!
2663
      if (NULL == taosArrayPush(hbRsp.pTsmas, &pTsmaInfo)) {
8!
2664
        code = terrno;
×
2665
        tFreeAndClearTableTSMAInfo(pTsmaInfo);
×
2666
        goto _OVER;
×
2667
      }
2668
      continue;
432✔
2669
    }
2670

2671
    if (pSma->uid != pTsmaVer->tsmaId) {
428!
2672
      mDebug("tsma: %s.%" PRIx64 " tsmaId mismatch with current %" PRIx64, tsmaFName, pTsmaVer->tsmaId, pSma->uid);
×
2673
      code = mkNonExistTSMAInfo(pTsmaVer, &pTsmaInfo);
×
2674
      mndReleaseSma(pMnode, pSma);
×
2675
      if (code) goto _OVER;
×
2676
      if (NULL == taosArrayPush(hbRsp.pTsmas, &pTsmaInfo)) {
×
2677
        code = terrno;
×
2678
        tFreeAndClearTableTSMAInfo(pTsmaInfo);
×
2679
        goto _OVER;
×
2680
      }
2681
      continue;
×
2682
    } else if (pSma->version == pTsmaVer->version) {
428!
2683
      mndReleaseSma(pMnode, pSma);
428✔
2684
      continue;
428✔
2685
    }
2686

2687
    SStbObj *pDestStb = mndAcquireStb(pMnode, pSma->dstTbName);
×
2688
    if (!pDestStb) {
×
2689
      mInfo("tsma: %s.%" PRIx64 " dest stb: %s not found, maybe dropped", tsmaFName, pTsmaVer->tsmaId, pSma->dstTbName);
×
2690
      code = mkNonExistTSMAInfo(pTsmaVer, &pTsmaInfo);
×
2691
      mndReleaseSma(pMnode, pSma);
×
2692
      if (code) goto _OVER;
×
2693
      if (NULL == taosArrayPush(hbRsp.pTsmas, &pTsmaInfo)) {
×
2694
        code = terrno;
×
2695
        tFreeAndClearTableTSMAInfo(pTsmaInfo);
×
2696
        goto _OVER;
×
2697
      }
2698
      continue;
×
2699
    }
2700

2701
    // dump smaObj into rsp
2702
    STableTSMAInfo *pInfo = NULL;
×
2703
    pInfo = taosMemoryCalloc(1, sizeof(STableTSMAInfo));
×
2704
    if (!pInfo) {
×
2705
      code = terrno;
×
2706
      mndReleaseSma(pMnode, pSma);
×
2707
      mndReleaseStb(pMnode, pDestStb);
×
2708
      goto _OVER;
×
2709
    }
2710

2711
    SSmaObj *pBaseSma = NULL;
×
2712
    code = mndGetDeepestBaseForTsma(pMnode, pSma, &pBaseSma);
×
2713
    if (code == 0) code = dumpTSMAInfoFromSmaObj(pSma, pDestStb, pInfo, pBaseSma);
×
2714

2715
    mndReleaseStb(pMnode, pDestStb);
×
2716
    mndReleaseSma(pMnode, pSma);
×
2717
    if (pBaseSma) mndReleaseSma(pMnode, pBaseSma);
×
2718
    if (terrno) {
×
2719
      tFreeAndClearTableTSMAInfo(pInfo);
×
2720
      goto _OVER;
×
2721
    }
2722

2723
    if (NULL == taosArrayPush(hbRsp.pTsmas, pInfo)) {
×
2724
      code = terrno;
×
2725
      tFreeAndClearTableTSMAInfo(pInfo);
×
2726
      goto _OVER;
×
2727
    }
2728
  }
2729

2730
  rspLen = tSerializeTSMAHbRsp(NULL, 0, &hbRsp);
411✔
2731
  if (rspLen < 0) {
411!
2732
    code = terrno;
×
2733
    goto _OVER;
×
2734
  }
2735

2736
  pRsp = taosMemoryMalloc(rspLen);
411!
2737
  if (!pRsp) {
411!
2738
    code = terrno;
×
2739
    rspLen = 0;
×
2740
    goto _OVER;
×
2741
  }
2742

2743
  rspLen = tSerializeTSMAHbRsp(pRsp, rspLen, &hbRsp);
411✔
2744
  if (rspLen < 0) {
411!
2745
    code = terrno;
×
2746
    goto _OVER;
×
2747
  }
2748
  code = 0;
411✔
2749
_OVER:
411✔
2750
  tFreeTSMAHbRsp(&hbRsp);
411✔
2751
  *ppRsp = pRsp;
411✔
2752
  *pRspLen = rspLen;
411✔
2753
  TAOS_RETURN(code);
411✔
2754
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc