• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4761

28 Sep 2025 10:49AM UTC coverage: 57.837% (-1.0%) from 58.866%
#4761

push

travis-ci

web-flow
merge: set version (#33122)

136913 of 302095 branches covered (45.32%)

Branch coverage included in aggregate %.

207750 of 293830 relevant lines covered (70.7%)

5673932.16 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

6.34
/source/dnode/mnode/impl/src/mndSma.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mndSma.h"
18
#include "functionMgt.h"
19
#include "mndDb.h"
20
#include "mndDnode.h"
21
#include "mndIndex.h"
22
#include "mndIndexComm.h"
23
#include "mndInfoSchema.h"
24
#include "mndMnode.h"
25
#include "mndPrivilege.h"
26
#include "mndShow.h"
27
#include "mndStb.h"
28
#include "mndStream.h"
29
#include "mndTrans.h"
30
#include "mndUser.h"
31
#include "mndVgroup.h"
32
#include "parser.h"
33
#include "tname.h"
34

35
#define TSDB_SMA_VER_NUMBER   1
36
#define TSDB_SMA_RESERVE_SIZE 64
37

38
static SSdbRaw *mndSmaActionEncode(SSmaObj *pSma);
39
static SSdbRow *mndSmaActionDecode(SSdbRaw *pRaw);
40
static int32_t  mndSmaActionInsert(SSdb *pSdb, SSmaObj *pSma);
41
static int32_t  mndSmaActionDelete(SSdb *pSdb, SSmaObj *pSpSmatb);
42
static int32_t  mndSmaActionUpdate(SSdb *pSdb, SSmaObj *pOld, SSmaObj *pNew);
43
static int32_t  mndProcessDropSmaReq(SRpcMsg *pReq);
44
static int32_t  mndProcessGetSmaReq(SRpcMsg *pReq);
45
static int32_t  mndProcessGetTbSmaReq(SRpcMsg *pReq);
46
static int32_t  mndRetrieveSma(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
47

48
static int32_t mndProcessCreateTSMAReq(SRpcMsg *pReq);
49
static int32_t mndProcessDropTSMAReq(SRpcMsg *pReq);
50

51
// sma and tag index comm func
52
static int32_t mndProcessDropIdxReq(SRpcMsg *pReq);
53
static int32_t mndRetrieveIdx(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
54
static void    mndCancelRetrieveIdx(SMnode *pMnode, void *pIter);
55

56
static int32_t mndRetrieveTSMA(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
57
static void    mndCancelRetrieveTSMA(SMnode *pMnode, void *pIter);
58
static int32_t mndProcessGetTbTSMAReq(SRpcMsg *pReq);
59

60
typedef struct SCreateTSMACxt {
61
  SMnode        *pMnode;
62
  const SRpcMsg *pRpcReq;
63
  union {
64
    const SMCreateSmaReq *pCreateSmaReq;
65
    const SMDropSmaReq   *pDropSmaReq;
66
  };
67
  SDbObj             *pDb;
68
  SStbObj            *pSrcStb;
69
  SSmaObj            *pSma;
70
  const SSmaObj      *pBaseSma;
71
  const char         *streamName;
72
  const char         *targetStbFullName;
73
  SNodeList          *pProjects;
74
} SCreateTSMACxt;
75

76
// Registers the SDB_SMA table callbacks plus all message and show handlers
// owned by this file.
// Returns the result of sdbSetTable().
int32_t mndInitSma(SMnode *pMnode) {
  // sdb callbacks for SSmaObj rows; rows are keyed by a binary key
  SSdbTable smaTable = {
      .sdbType = SDB_SMA,
      .keyType = SDB_KEY_BINARY,
      .encodeFp = (SdbEncodeFp)mndSmaActionEncode,
      .decodeFp = (SdbDecodeFp)mndSmaActionDecode,
      .insertFp = (SdbInsertFp)mndSmaActionInsert,
      .updateFp = (SdbUpdateFp)mndSmaActionUpdate,
      .deleteFp = (SdbDeleteFp)mndSmaActionDelete,
  };

  // index requests (no handler is registered for TDMT_MND_CREATE_SMA here)
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_SMA, mndProcessDropIdxReq);
  mndSetMsgHandle(pMnode, TDMT_MND_GET_INDEX, mndProcessGetSmaReq);
  mndSetMsgHandle(pMnode, TDMT_MND_GET_TABLE_INDEX, mndProcessGetTbSmaReq);

  // "show indexes" retrieval
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_INDEX, mndRetrieveIdx);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_INDEX, mndCancelRetrieveIdx);

  // tsma requests and the transaction responses they depend on
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_TSMA, mndProcessCreateTSMAReq);
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STREAM_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_STB_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_TSMA, mndProcessDropTSMAReq);
  mndSetMsgHandle(pMnode, TDMT_MND_GET_TABLE_TSMA, mndProcessGetTbTSMAReq);
  mndSetMsgHandle(pMnode, TDMT_MND_GET_TSMA, mndProcessGetTbTSMAReq);

  // "show tsmas" retrieval
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TSMAS, mndRetrieveTSMA);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TSMAS, mndCancelRetrieveTSMA);

  return sdbSetTable(pMnode->pSdb, smaTable);
}
107

108
// Counterpart of mndInitSma(); there is nothing to tear down.
void mndCleanupSma(SMnode *pMnode) {
  (void)pMnode;
}
1,944✔
109

110
// Serializes an SSmaObj into a newly allocated SSdbRaw.
// The field order written here must match mndSmaActionDecode().
// Returns the raw on success; on failure logs, frees the raw and returns NULL
// with terrno left non-zero.
static SSdbRaw *mndSmaActionEncode(SSmaObj *pSma) {
  // NOTE(review): code/lino/dataPos are presumably referenced by the SDB_SET_* macros — keep the names
  int32_t code = 0;
  int32_t lino = 0;
  int32_t dataPos = 0;

  // pessimistic default: any early jump to _OVER is reported as a failure
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  // fixed part + the four variable-length buffers + reserved tail
  int32_t size =
      sizeof(SSmaObj) + pSma->exprLen + pSma->tagsFilterLen + pSma->sqlLen + pSma->astLen + TSDB_SMA_RESERVE_SIZE;
  SSdbRaw *pRaw = sdbAllocRaw(SDB_SMA, TSDB_SMA_VER_NUMBER, size);
  if (NULL == pRaw) goto _OVER;

  // names
  SDB_SET_BINARY(pRaw, dataPos, pSma->name, TSDB_TABLE_FNAME_LEN, _OVER)
  SDB_SET_BINARY(pRaw, dataPos, pSma->stb, TSDB_TABLE_FNAME_LEN, _OVER)
  SDB_SET_BINARY(pRaw, dataPos, pSma->db, TSDB_DB_FNAME_LEN, _OVER)
  SDB_SET_BINARY(pRaw, dataPos, pSma->dstTbName, TSDB_TABLE_FNAME_LEN, _OVER)
  // timestamps and uids
  SDB_SET_INT64(pRaw, dataPos, pSma->createdTime, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pSma->uid, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pSma->stbUid, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pSma->dbUid, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pSma->dstTbUid, _OVER)
  // window definition
  SDB_SET_INT8(pRaw, dataPos, pSma->intervalUnit, _OVER)
  SDB_SET_INT8(pRaw, dataPos, pSma->slidingUnit, _OVER)
  SDB_SET_INT8(pRaw, dataPos, pSma->timezone, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pSma->dstVgId, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pSma->interval, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pSma->offset, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pSma->sliding, _OVER)
  // lengths of the variable-length buffers, then the object version
  SDB_SET_INT32(pRaw, dataPos, pSma->exprLen, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pSma->tagsFilterLen, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pSma->sqlLen, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pSma->astLen, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pSma->version, _OVER)

  // variable-length buffers, written only when non-empty
  if (pSma->exprLen > 0) {
    SDB_SET_BINARY(pRaw, dataPos, pSma->expr, pSma->exprLen, _OVER)
  }
  if (pSma->tagsFilterLen > 0) {
    SDB_SET_BINARY(pRaw, dataPos, pSma->tagsFilter, pSma->tagsFilterLen, _OVER)
  }
  if (pSma->sqlLen > 0) {
    SDB_SET_BINARY(pRaw, dataPos, pSma->sql, pSma->sqlLen, _OVER)
  }
  if (pSma->astLen > 0) {
    SDB_SET_BINARY(pRaw, dataPos, pSma->ast, pSma->astLen, _OVER)
  }
  SDB_SET_BINARY(pRaw, dataPos, pSma->baseSmaName, TSDB_TABLE_FNAME_LEN, _OVER)

  SDB_SET_RESERVE(pRaw, dataPos, TSDB_SMA_RESERVE_SIZE, _OVER)
  SDB_SET_DATALEN(pRaw, dataPos, _OVER)

  terrno = 0;

_OVER:
  if (terrno == 0) {
    mTrace("sma:%s, encode to raw:%p, row:%p", pSma->name, pRaw, pSma);
    return pRaw;
  }

  mError("sma:%s, failed to encode to raw:%p since %s", pSma->name, pRaw, terrstr());
  sdbFreeRaw(pRaw);
  return NULL;
}
172

173
// Rebuilds an SSmaObj row from its raw form; field order mirrors mndSmaActionEncode().
// Only raws with soft version TSDB_SMA_VER_NUMBER are accepted.
// Returns the row on success; on failure frees any buffers allocated so far
// plus the row itself and returns NULL with terrno left non-zero.
static SSdbRow *mndSmaActionDecode(SSdbRaw *pRaw) {
  // NOTE(review): code/lino/dataPos are presumably referenced by the SDB_GET_* macros — keep the names
  int32_t  code = 0;
  int32_t  lino = 0;
  int32_t  dataPos = 0;
  int8_t   sver = 0;
  SSdbRow *pRow = NULL;
  SSmaObj *pSma = NULL;

  // pessimistic default: any early jump to _OVER is reported as a failure
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;
  if (TSDB_SMA_VER_NUMBER != sver) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto _OVER;
  }

  pRow = sdbAllocRow(sizeof(SSmaObj));
  if (NULL == pRow) goto _OVER;

  pSma = sdbGetRowObj(pRow);
  if (NULL == pSma) goto _OVER;

  // names
  SDB_GET_BINARY(pRaw, dataPos, pSma->name, TSDB_TABLE_FNAME_LEN, _OVER)
  SDB_GET_BINARY(pRaw, dataPos, pSma->stb, TSDB_TABLE_FNAME_LEN, _OVER)
  SDB_GET_BINARY(pRaw, dataPos, pSma->db, TSDB_DB_FNAME_LEN, _OVER)
  SDB_GET_BINARY(pRaw, dataPos, pSma->dstTbName, TSDB_TABLE_FNAME_LEN, _OVER)
  // timestamps and uids
  SDB_GET_INT64(pRaw, dataPos, &pSma->createdTime, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pSma->uid, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pSma->stbUid, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pSma->dbUid, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pSma->dstTbUid, _OVER)
  // window definition
  SDB_GET_INT8(pRaw, dataPos, &pSma->intervalUnit, _OVER)
  SDB_GET_INT8(pRaw, dataPos, &pSma->slidingUnit, _OVER)
  SDB_GET_INT8(pRaw, dataPos, &pSma->timezone, _OVER)
  SDB_GET_INT32(pRaw, dataPos, &pSma->dstVgId, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pSma->interval, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pSma->offset, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pSma->sliding, _OVER)
  // lengths of the variable-length buffers, then the object version
  SDB_GET_INT32(pRaw, dataPos, &pSma->exprLen, _OVER)
  SDB_GET_INT32(pRaw, dataPos, &pSma->tagsFilterLen, _OVER)
  SDB_GET_INT32(pRaw, dataPos, &pSma->sqlLen, _OVER)
  SDB_GET_INT32(pRaw, dataPos, &pSma->astLen, _OVER)
  SDB_GET_INT32(pRaw, dataPos, &pSma->version, _OVER)

  // variable-length buffers: allocate exactly the recorded length, then fill
  if (pSma->exprLen > 0) {
    pSma->expr = taosMemoryCalloc(pSma->exprLen, 1);
    if (NULL == pSma->expr) goto _OVER;
    SDB_GET_BINARY(pRaw, dataPos, pSma->expr, pSma->exprLen, _OVER)
  }

  if (pSma->tagsFilterLen > 0) {
    pSma->tagsFilter = taosMemoryCalloc(pSma->tagsFilterLen, 1);
    if (NULL == pSma->tagsFilter) goto _OVER;
    SDB_GET_BINARY(pRaw, dataPos, pSma->tagsFilter, pSma->tagsFilterLen, _OVER)
  }

  if (pSma->sqlLen > 0) {
    pSma->sql = taosMemoryCalloc(pSma->sqlLen, 1);
    if (NULL == pSma->sql) goto _OVER;
    SDB_GET_BINARY(pRaw, dataPos, pSma->sql, pSma->sqlLen, _OVER)
  }

  if (pSma->astLen > 0) {
    pSma->ast = taosMemoryCalloc(pSma->astLen, 1);
    if (NULL == pSma->ast) goto _OVER;
    SDB_GET_BINARY(pRaw, dataPos, pSma->ast, pSma->astLen, _OVER)
  }
  SDB_GET_BINARY(pRaw, dataPos, pSma->baseSmaName, TSDB_TABLE_FNAME_LEN, _OVER)

  SDB_GET_RESERVE(pRaw, dataPos, TSDB_SMA_RESERVE_SIZE, _OVER)

  terrno = 0;

_OVER:
  if (terrno == 0) {
    mTrace("sma:%s, decode from raw:%p, row:%p", pSma->name, pRaw, pSma);
    return pRow;
  }

  // failure: release whatever was allocated before the jump
  if (pSma != NULL) {
    mError("sma:%s, failed to decode from raw:%p since %s", pSma->name, pRaw, terrstr());
    taosMemoryFreeClear(pSma->expr);
    taosMemoryFreeClear(pSma->tagsFilter);
    taosMemoryFreeClear(pSma->sql);
    taosMemoryFreeClear(pSma->ast);
  }
  taosMemoryFreeClear(pRow);
  return NULL;
}
263

264
// sdb insert callback: only traces the event; always returns 0.
static int32_t mndSmaActionInsert(SSdb *pSdb, SSmaObj *pSma) {
  (void)pSdb;
  mTrace("sma:%s, perform insert action, row:%p", pSma->name, pSma);
  return 0;
}
268

269
// sdb delete callback: frees the four heap buffers owned by the row
// (expr, tagsFilter, sql, ast). Always returns 0.
static int32_t mndSmaActionDelete(SSdb *pSdb, SSmaObj *pSma) {
  (void)pSdb;
  mTrace("sma:%s, perform delete action, row:%p", pSma->name, pSma);

  taosMemoryFreeClear(pSma->expr);
  taosMemoryFreeClear(pSma->tagsFilter);
  taosMemoryFreeClear(pSma->sql);
  taosMemoryFreeClear(pSma->ast);

  return 0;
}
277

278
// sdb update callback: only traces the event, nothing is copied from pNew
// into pOld. Always returns 0.
static int32_t mndSmaActionUpdate(SSdb *pSdb, SSmaObj *pOld, SSmaObj *pNew) {
  (void)pSdb;
  mTrace("sma:%s, perform update action, old row:%p new row:%p", pOld->name, pOld, pNew);
  return 0;
}
282

283
// Looks up an sma by name in the sdb.
// Returns the acquired object, or NULL; a generic "object not there" terrno is
// translated to TSDB_CODE_MND_SMA_NOT_EXIST.
SSmaObj *mndAcquireSma(SMnode *pMnode, char *smaName) {
  SSmaObj *pFound = sdbAcquire(pMnode->pSdb, SDB_SMA, smaName);
  if (pFound != NULL) {
    return pFound;
  }

  if (terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
    terrno = TSDB_CODE_MND_SMA_NOT_EXIST;
  }
  return NULL;
}
291

292
// Hands an sma object back to the sdb via sdbRelease().
void mndReleaseSma(SMnode *pMnode, SSmaObj *pSma) {
  sdbRelease(pMnode->pSdb, pSma);
}
10✔
296

297
// Encodes pSma, appends it to the transaction's redo log and marks the raw as
// SDB_STATUS_CREATING.
// Returns 0 on success; terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when terrno
// is unset) if encoding fails; otherwise the failing step's code.
// NOTE(review): the raw is not freed here when the append fails — confirm who owns it.
static int32_t mndSetCreateSmaRedoLogs(SMnode *pMnode, STrans *pTrans, SSmaObj *pSma) {
  int32_t  code = 0;
  SSdbRaw *pRedoRaw = mndSmaActionEncode(pSma);

  if (NULL == pRedoRaw) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  TAOS_CHECK_RETURN(mndTransAppendRedolog(pTrans, pRedoRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING));
  TAOS_RETURN(code);
}
310

311
// Encodes pSma, appends it to the transaction's undo log and marks the raw as
// SDB_STATUS_DROPPED.
// Returns 0 on success; terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when terrno
// is unset) if encoding fails; otherwise the failing step's code.
static int32_t mndSetCreateSmaUndoLogs(SMnode *pMnode, STrans *pTrans, SSmaObj *pSma) {
  int32_t  code = 0;
  SSdbRaw *pUndoRaw = mndSmaActionEncode(pSma);

  if (NULL == pUndoRaw) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  TAOS_CHECK_RETURN(mndTransAppendUndolog(pTrans, pUndoRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED));
  TAOS_RETURN(code);
}
323

324
// Encodes pSma, appends it to the transaction's commit log and marks the raw
// as SDB_STATUS_READY.
// Returns 0 on success; terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when terrno
// is unset) if encoding fails; otherwise the failing step's code.
static int32_t mndSetCreateSmaCommitLogs(SMnode *pMnode, STrans *pTrans, SSmaObj *pSma) {
  int32_t  code = 0;
  SSdbRaw *pCommitRaw = mndSmaActionEncode(pSma);

  if (NULL == pCommitRaw) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pCommitRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY));
  TAOS_RETURN(code);
}
337

338
// Appends a commit log that bumps the super table's smaVer.
// A shallow copy of *pStb is taken under its read latch; the copy's column,
// tag and function arrays are then detached (count 0, pointer NULL), its
// updateTime refreshed and its lock word reset before it is encoded.
// Returns 0 on success, or the failing step's code.
static int32_t mndSetUpdateSmaStbCommitLogs(SMnode *pMnode, STrans *pTrans, SStbObj *pStb) {
  int32_t code = 0;
  SStbObj stbCopy = {0};

  // snapshot the object while holding the read latch
  taosRLockLatch(&pStb->lock);
  memcpy(&stbCopy, pStb, sizeof(SStbObj));
  taosRUnLockLatch(&pStb->lock);

  // the copy must not carry the original's array pointers
  stbCopy.pColumns = NULL;
  stbCopy.numOfColumns = 0;
  stbCopy.pTags = NULL;
  stbCopy.numOfTags = 0;
  stbCopy.pFuncs = NULL;
  stbCopy.numOfFuncs = 0;

  stbCopy.lock = 0;
  stbCopy.updateTime = taosGetTimestampMs();
  stbCopy.smaVer++;

  SSdbRaw *pCommitRaw = mndStbActionEncode(&stbCopy);
  if (NULL == pCommitRaw) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pCommitRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY));
  TAOS_RETURN(code);
}
365

366
// Validates a create-sma request.
// Returns 0 when every field is acceptable, otherwise
// TSDB_CODE_MND_INVALID_SMA_OPTION. The name-parse failures previously
// returned a bare -1; they now report the same error code as every other
// check and go through TAOS_RETURN like the rest of the function.
static int32_t mndCheckCreateSmaReq(SMCreateSmaReq *pCreate) {
  int32_t code = TSDB_CODE_MND_INVALID_SMA_OPTION;

  // mandatory names
  if (pCreate->name[0] == 0) TAOS_RETURN(code);
  if (pCreate->stb[0] == 0) TAOS_RETURN(code);

  // flag and scalar ranges
  if (pCreate->igExists < 0 || pCreate->igExists > 1) TAOS_RETURN(code);
  if (pCreate->intervalUnit < 0) TAOS_RETURN(code);
  if (pCreate->slidingUnit < 0) TAOS_RETURN(code);
  if (pCreate->timezone < 0) TAOS_RETURN(code);
  if (pCreate->interval < 0) TAOS_RETURN(code);
  if (pCreate->offset < 0) TAOS_RETURN(code);
  if (pCreate->sliding < 0) TAOS_RETURN(code);

  // buffer lengths must be non-negative ...
  if (pCreate->exprLen < 0) TAOS_RETURN(code);
  if (pCreate->tagsFilterLen < 0) TAOS_RETURN(code);
  if (pCreate->sqlLen < 0) TAOS_RETURN(code);
  if (pCreate->astLen < 0) TAOS_RETURN(code);

  // ... and, when non-zero, equal to the string length plus its terminator
  if (pCreate->exprLen != 0 && strlen(pCreate->expr) + 1 != pCreate->exprLen) TAOS_RETURN(code);
  if (pCreate->tagsFilterLen != 0 && strlen(pCreate->tagsFilter) + 1 != pCreate->tagsFilterLen) TAOS_RETURN(code);
  if (pCreate->sqlLen != 0 && strlen(pCreate->sql) + 1 != pCreate->sqlLen) TAOS_RETURN(code);
  if (pCreate->astLen != 0 && strlen(pCreate->ast) + 1 != pCreate->astLen) TAOS_RETURN(code);

  // the name must parse as acct.db.table and have a non-empty table part
  SName smaName = {0};
  if (tNameFromString(&smaName, pCreate->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE) < 0) TAOS_RETURN(code);
  if (*(char *)tNameGetTableName(&smaName) == 0) TAOS_RETURN(code);

  code = 0;
  TAOS_RETURN(code);
}
393

394
static int32_t mndGetStreamNameFromSmaName(char *streamName, char *smaName) {
×
395
  SName   n;
396
  int32_t code = tNameFromString(&n, smaName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
×
397
  if (TSDB_CODE_SUCCESS != code) {
×
398
    return code;
×
399
  }
400
  snprintf(streamName, TSDB_TABLE_FNAME_LEN, "%d.%s", n.acctId, n.tname);
×
401
  return TSDB_CODE_SUCCESS;
×
402
}
403

404
// Encodes pSma, appends it to the transaction's redo log and marks the raw as
// SDB_STATUS_DROPPING.
// Returns 0 on success. If encoding fails, returns terrno (or
// TSDB_CODE_MND_RETURN_VALUE_NULL when terrno is unset); the original computed
// this code and then discarded it by returning -1.
static int32_t mndSetDropSmaRedoLogs(SMnode *pMnode, STrans *pTrans, SSmaObj *pSma) {
  int32_t  code = 0;
  SSdbRaw *pRedoRaw = mndSmaActionEncode(pSma);
  if (pRedoRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }
  TAOS_CHECK_RETURN(mndTransAppendRedolog(pTrans, pRedoRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING));

  TAOS_RETURN(code);
}
417

418
// Encodes pSma, appends it to the transaction's commit log and marks the raw
// as SDB_STATUS_DROPPED.
// Returns 0 on success. If encoding fails, returns terrno (or
// TSDB_CODE_MND_RETURN_VALUE_NULL when terrno is unset); the original computed
// this code and then discarded it by returning -1.
static int32_t mndSetDropSmaCommitLogs(SMnode *pMnode, STrans *pTrans, SSmaObj *pSma) {
  int32_t  code = 0;
  SSdbRaw *pCommitRaw = mndSmaActionEncode(pSma);
  if (pCommitRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }
  TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pCommitRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED));

  TAOS_RETURN(code);
}
431

432
// Encodes pVgroup, appends it to the transaction's redo log and marks the raw
// as SDB_STATUS_DROPPING.
// Returns 0 on success. If encoding fails, returns terrno (or
// TSDB_CODE_MND_RETURN_VALUE_NULL when terrno is unset); the original computed
// this code and then discarded it by returning -1.
static int32_t mndSetDropSmaVgroupRedoLogs(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup) {
  int32_t  code = 0;
  SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroup);
  if (pVgRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }
  TAOS_CHECK_RETURN(mndTransAppendRedolog(pTrans, pVgRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pVgRaw, SDB_STATUS_DROPPING));

  TAOS_RETURN(code);
}
445

446
// Encodes pVgroup, appends it to the transaction's commit log and marks the
// raw as SDB_STATUS_DROPPED.
// Returns 0 on success. If encoding fails, returns terrno (or
// TSDB_CODE_MND_RETURN_VALUE_NULL when terrno is unset); the original computed
// this code and then discarded it by returning -1.
static int32_t mndSetDropSmaVgroupCommitLogs(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup) {
  int32_t  code = 0;
  SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroup);
  if (pVgRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }
  TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pVgRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pVgRaw, SDB_STATUS_DROPPED));

  TAOS_RETURN(code);
}
459

460
// Appends a TDMT_DND_DROP_VNODE redo action for the first vnode of pVgroup.
// TSDB_CODE_VND_NOT_EXIST is set as the action's acceptable code.
// Returns 0 on success, otherwise the failing step's code. The request buffer
// is freed here when appending the action fails.
//
// Fix: the dnode object used to be released before being passed to
// mndBuildDropVnodeReq(); it is now held until the request has been built.
static int32_t mndSetDropSmaVgroupRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) {
  int32_t    code = 0;
  SVnodeGid *pVgid = pVgroup->vnodeGid + 0;
  SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
  if (pDnode == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  STransAction action = {0};
  action.epSet = mndGetDnodeEpset(pDnode);

  int32_t contLen = 0;
  void   *pReq = mndBuildDropVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen);
  if (pReq == NULL) {
    // capture the error before releasing, then drop the dnode reference
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    mndReleaseDnode(pMnode, pDnode);
    TAOS_RETURN(code);
  }
  // epset copied and request built: the dnode object is no longer needed
  mndReleaseDnode(pMnode, pDnode);

  action.pCont = pReq;
  action.contLen = contLen;
  action.msgType = TDMT_DND_DROP_VNODE;
  action.acceptableCode = TSDB_CODE_VND_NOT_EXIST;

  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
    taosMemoryFree(pReq);
    TAOS_RETURN(code);
  }

  TAOS_RETURN(code);
}
494

495
// Builds and prepares the "drop-sma" transaction: drops the sma's stream,
// the sma row, its destination vgroup (logs + vnode drop action) and bumps
// the source super table's smaVer.
// Returns 0 when the transaction was prepared, otherwise an error code.
//
// Fixes:
//  - the stream object was released with sdbRelease() on the error paths and
//    then released again at _OVER; it is now released only at _OVER
//    (NOTE(review): assumes mndReleaseStream() is the matching release for
//    mndAcquireStream(), as the original _OVER path already relied on).
//  - a missing or mismatching stream with code == 0 used to fall through to
//    _OVER and return success with no transaction prepared; it is now an error.
static int32_t mndDropSma(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SSmaObj *pSma) {
  int32_t     code = -1;
  SVgObj     *pVgroup = NULL;
  SStbObj    *pStb = NULL;
  STrans     *pTrans = NULL;
  SStreamObj *pStream = NULL;

  pVgroup = mndAcquireVgroup(pMnode, pSma->dstVgId);
  if (pVgroup == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }

  pStb = mndAcquireStb(pMnode, pSma->stb);
  if (pStb == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }

  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq, "drop-sma");
  if (pTrans == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }

  mInfo("trans:%d, used to drop sma:%s", pTrans->id, pSma->name);
  mndTransSetDbName(pTrans, pDb->name, NULL);
  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER);

  mndTransSetSerial(pTrans);

  char streamName[TSDB_TABLE_FNAME_LEN] = {0};
  code = mndGetStreamNameFromSmaName(streamName, pSma->name);
  if (TSDB_CODE_SUCCESS != code) {
    goto _OVER;
  }

  // the stream must exist and belong to this sma (stream id == sma uid)
  code = mndAcquireStream(pMnode, streamName, &pStream);
  if (code != 0 || pStream == NULL || pStream->pCreate->streamId != pSma->uid) {
    if (code == 0) code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto _OVER;  // pStream (if any) is released at _OVER
  }

  // drop stream
  if ((code = mndStreamTransAppend(pStream, pTrans, SDB_STATUS_DROPPED)) < 0) {
    mError("stream:%s, failed to drop log since %s", pStream->pCreate->name, tstrerror(code));
    goto _OVER;
  }

  TAOS_CHECK_GOTO(mndSetDropSmaRedoLogs(pMnode, pTrans, pSma), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetDropSmaVgroupRedoLogs(pMnode, pTrans, pVgroup), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetDropSmaCommitLogs(pMnode, pTrans, pSma), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetDropSmaVgroupCommitLogs(pMnode, pTrans, pVgroup), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetUpdateSmaStbCommitLogs(pMnode, pTrans, pStb), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetDropSmaVgroupRedoActions(pMnode, pTrans, pDb, pVgroup), NULL, _OVER);
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);

  code = 0;

_OVER:
  // single cleanup point for everything acquired above
  mndTransDrop(pTrans);
  mndReleaseStream(pMnode, pStream);
  mndReleaseVgroup(pMnode, pVgroup);
  mndReleaseStb(pMnode, pStb);
  TAOS_RETURN(code);
}
564

565
// Handles a drop-sma request: deserializes it, resolves the sma through the
// global index lookup, checks write privilege on its database and starts the
// drop transaction.
// Returns TSDB_CODE_ACTION_IN_PROGRESS once the transaction is prepared, 0 when
// the sma is absent and igNotExists is set, otherwise an error code.
static int32_t mndProcessDropSmaReq(SRpcMsg *pReq) {
  SMnode      *pMnode = pReq->info.node;
  SMDropSmaReq dropReq = {0};
  SSIdx        idx = {0};
  SName        name = {0};
  char         db[TSDB_TABLE_FNAME_LEN] = {0};
  SSmaObj     *pSma = NULL;
  SDbObj      *pDb = NULL;
  int32_t      code = -1;

  TAOS_CHECK_GOTO(tDeserializeSMDropSmaReq(pReq->pCont, pReq->contLen, &dropReq), NULL, _OVER);

  mInfo("sma:%s, start to drop", dropReq.name);

  code = mndAcquireGlobalIdx(pMnode, dropReq.name, SDB_SMA, &idx);
  if (code != 0) {
    goto _OVER;
  }

  pSma = idx.pIdx;
  if (NULL == pSma) {
    if (!dropReq.igNotExists) {
      code = TSDB_CODE_MND_SMA_NOT_EXIST;
    } else {
      mInfo("sma:%s, not exist, ignore not exist is set", dropReq.name);
      code = 0;
    }
    goto _OVER;
  }

  // resolve the owning database from the full sma name
  code = tNameFromString(&name, dropReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
  if (code != TSDB_CODE_SUCCESS) {
    goto _OVER;
  }
  (void)tNameGetFullDbName(&name, db);

  pDb = mndAcquireDb(pMnode, db);
  if (NULL == pDb) {
    code = TSDB_CODE_MND_DB_NOT_SELECTED;
    goto _OVER;
  }

  TAOS_CHECK_GOTO(mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb), NULL, _OVER);

  code = mndDropSma(pMnode, pReq, pDb, pSma);
  if (0 == code) {
    code = TSDB_CODE_ACTION_IN_PROGRESS;
  }

_OVER:
  if (code != TSDB_CODE_ACTION_IN_PROGRESS && code != 0) {
    mError("sma:%s, failed to drop since %s", dropReq.name, tstrerror(code));
  }

  mndReleaseSma(pMnode, pSma);
  mndReleaseDb(pMnode, pDb);
  TAOS_RETURN(code);
}
621

622
// Fills rsp with the description of the sma named indexReq->indexFName.
// *exist is set to false when the sma cannot be found, and to true when it was
// found and its expression list parsed; it is left untouched when parsing fails.
// rsp->indexExts receives the comma-separated function names from the sma's
// expression list.
// Returns 0 when the sma is absent or fully described, otherwise the
// nodesStringToList() error.
//
// Fix: the node list produced by nodesStringToList() was never destroyed.
static int32_t mndGetSma(SMnode *pMnode, SUserIndexReq *indexReq, SUserIndexRsp *rsp, bool *exist) {
  int32_t  code = -1;
  SSmaObj *pSma = NULL;

  SSIdx idx = {0};
  if (0 == mndAcquireGlobalIdx(pMnode, indexReq->indexFName, SDB_SMA, &idx)) {
    pSma = idx.pIdx;
  } else {
    *exist = false;
    return 0;
  }

  if (pSma == NULL) {
    *exist = false;
    return 0;
  }

  memcpy(rsp->dbFName, pSma->db, sizeof(pSma->db));
  memcpy(rsp->tblFName, pSma->stb, sizeof(pSma->stb));
  tstrncpy(rsp->indexType, TSDB_INDEX_TYPE_SMA, TSDB_INDEX_TYPE_LEN);

  SNodeList *pList = NULL;
  int32_t    extOffset = 0;
  code = nodesStringToList(pSma->expr, &pList);
  if (0 == code) {
    SNode *node = NULL;
    FOREACH(node, pList) {
      // NOTE(review): every list entry is treated as an SFunctionNode without a type check — confirm
      SFunctionNode *pFunc = (SFunctionNode *)node;
      extOffset += tsnprintf(rsp->indexExts + extOffset, sizeof(rsp->indexExts) - extOffset - 1, "%s%s",
                             (extOffset ? "," : ""), pFunc->functionName);
    }

    *exist = true;

    // the parsed list is only needed for the names copied above
    nodesDestroyList(pList);
    pList = NULL;
  }

  mndReleaseSma(pMnode, pSma);
  return code;
}
660

661
// Collects every sma defined on super table tbFName into rsp->pIndex.
// rsp->dbFName/tbName/suid/version are filled from the super table;
// rsp->indexSize is increased for each entry. *exist is set to false when the
// super table cannot be acquired and to true once at least one sma is added;
// it is not touched otherwise.
// Each pushed STableIndexInfo owns a heap copy of the sma expression
// (NUL-terminated) that the caller must free.
// Returns 0 on success or when the table is absent, otherwise an error code.
//
// Fixes:
//  - the vgroup acquired to read the epset was never released;
//  - `info` is now zero-initialised per entry, so fields not set here are not
//    pushed as garbage;
//  - memcpy is skipped for an empty expression (source may be NULL).
int32_t mndGetTableSma(SMnode *pMnode, char *tbFName, STableIndexRsp *rsp, bool *exist) {
  int32_t  code = 0;
  SSmaObj *pSma = NULL;
  SSdb    *pSdb = pMnode->pSdb;
  void    *pIter = NULL;

  SStbObj *pStb = mndAcquireStb(pMnode, tbFName);
  if (NULL == pStb) {
    *exist = false;
    return TSDB_CODE_SUCCESS;
  }

  tstrncpy(rsp->dbFName, pStb->db, TSDB_DB_FNAME_LEN);
  // table name = full name with the "<db>." prefix stripped
  tstrncpy(rsp->tbName, pStb->name + strlen(pStb->db) + 1, TSDB_TABLE_NAME_LEN);
  rsp->suid = pStb->uid;
  rsp->version = pStb->smaVer;
  mndReleaseStb(pMnode, pStb);

  while (1) {
    pIter = sdbFetch(pSdb, SDB_SMA, pIter, (void **)&pSma);
    if (pIter == NULL) break;

    // first-character check is a cheap pre-filter before the full compare
    if (pSma->stb[0] != tbFName[0] || strcmp(pSma->stb, tbFName)) {
      sdbRelease(pSdb, pSma);
      continue;
    }

    STableIndexInfo info = {0};
    info.intervalUnit = pSma->intervalUnit;
    info.slidingUnit = pSma->slidingUnit;
    info.interval = pSma->interval;
    info.offset = pSma->offset;
    info.sliding = pSma->sliding;
    info.dstTbUid = pSma->dstTbUid;
    info.dstVgId = pSma->dstVgId;

    SVgObj *pVg = mndAcquireVgroup(pMnode, pSma->dstVgId);
    if (pVg == NULL) {
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
      if (terrno != 0) code = terrno;
      sdbRelease(pSdb, pSma);
      sdbCancelFetch(pSdb, pIter);
      return code;
    }
    // the epset is copied by value, so the vgroup can be released right away
    info.epSet = mndGetVgroupEpset(pMnode, pVg);
    mndReleaseVgroup(pMnode, pVg);

    info.expr = taosMemoryMalloc(pSma->exprLen + 1);
    if (info.expr == NULL) {
      code = terrno;
      sdbRelease(pSdb, pSma);
      sdbCancelFetch(pSdb, pIter);
      return code;
    }

    if (pSma->exprLen > 0) {
      memcpy(info.expr, pSma->expr, pSma->exprLen);
    }
    info.expr[pSma->exprLen] = 0;

    if (NULL == taosArrayPush(rsp->pIndex, &info)) {
      code = terrno;
      taosMemoryFree(info.expr);
      sdbRelease(pSdb, pSma);
      sdbCancelFetch(pSdb, pIter);
      return code;
    }

    rsp->indexSize += sizeof(info) + pSma->exprLen + 1;
    *exist = true;

    sdbRelease(pSdb, pSma);
  }

  TAOS_RETURN(code);
}
734

735
// Handles a get-index request: looks up the sma and, when found, serializes an
// SUserIndexRsp into a freshly allocated rpc buffer attached to pReq->info.
// Returns 0 on success, TSDB_CODE_MND_DB_INDEX_NOT_EXIST when no sma matches,
// otherwise an error code.
//
// Fixes:
//  - the rpc buffer leaked when the second serialize call failed;
//  - the length from the sizing call is checked before it is used to allocate.
static int32_t mndProcessGetSmaReq(SRpcMsg *pReq) {
  SUserIndexReq indexReq = {0};
  SMnode       *pMnode = pReq->info.node;
  int32_t       code = -1;
  SUserIndexRsp rsp = {0};
  bool          exist = false;

  TAOS_CHECK_GOTO(tDeserializeSUserIndexReq(pReq->pCont, pReq->contLen, &indexReq), NULL, _OVER);

  code = mndGetSma(pMnode, &indexReq, &rsp, &exist);
  if (code) {
    goto _OVER;
  }

  if (!exist) {
    // TODO GET INDEX FROM FULLTEXT
    code = TSDB_CODE_MND_DB_INDEX_NOT_EXIST;
  } else {
    // first call only computes the encoded size
    int32_t contLen = tSerializeSUserIndexRsp(NULL, 0, &rsp);
    if (contLen < 0) {
      code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
      goto _OVER;
    }

    void *pRsp = rpcMallocCont(contLen);
    if (pRsp == NULL) {
      code = terrno;
      goto _OVER;
    }

    contLen = tSerializeSUserIndexRsp(pRsp, contLen, &rsp);
    if (contLen < 0) {
      code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
      rpcFreeCont(pRsp);  // not handed to pReq yet, so it must be freed here
      goto _OVER;
    }

    // ownership of pRsp passes to the request
    pReq->info.rsp = pRsp;
    pReq->info.rspLen = contLen;

    code = 0;
  }

_OVER:
  if (code != 0) {
    mError("failed to get index %s since %s", indexReq.indexFName, tstrerror(code));
  }

  TAOS_RETURN(code);
}
779

780
// Handles a get-table-index request: gathers all smas of the requested table
// and serializes an STableIndexRsp into a freshly allocated rpc buffer
// attached to pReq->info. The local response is always released with
// tFreeSerializeSTableIndexRsp() before returning.
// Returns 0 on success, TSDB_CODE_MND_DB_INDEX_NOT_EXIST when the table has no
// sma, otherwise an error code.
//
// Fixes:
//  - the rpc buffer leaked when the second serialize call failed;
//  - the length from the sizing call is checked before it is used to allocate.
static int32_t mndProcessGetTbSmaReq(SRpcMsg *pReq) {
  STableIndexReq indexReq = {0};
  SMnode        *pMnode = pReq->info.node;
  int32_t        code = -1;
  STableIndexRsp rsp = {0};
  bool           exist = false;

  TAOS_CHECK_GOTO(tDeserializeSTableIndexReq(pReq->pCont, pReq->contLen, &indexReq), NULL, _OVER);

  rsp.pIndex = taosArrayInit(10, sizeof(STableIndexInfo));
  if (NULL == rsp.pIndex) {
    code = terrno;
    goto _OVER;
  }

  code = mndGetTableSma(pMnode, indexReq.tbFName, &rsp, &exist);
  if (code) {
    goto _OVER;
  }

  if (!exist) {
    code = TSDB_CODE_MND_DB_INDEX_NOT_EXIST;
  } else {
    // first call only computes the encoded size
    int32_t contLen = tSerializeSTableIndexRsp(NULL, 0, &rsp);
    if (contLen < 0) {
      code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
      goto _OVER;
    }

    void *pRsp = rpcMallocCont(contLen);
    if (pRsp == NULL) {
      code = terrno;
      goto _OVER;
    }

    contLen = tSerializeSTableIndexRsp(pRsp, contLen, &rsp);
    if (contLen < 0) {
      code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
      rpcFreeCont(pRsp);  // not handed to pReq yet, so it must be freed here
      goto _OVER;
    }

    // ownership of pRsp passes to the request
    pReq->info.rsp = pRsp;
    pReq->info.rspLen = contLen;

    code = 0;
  }

_OVER:
  if (code != 0) {
    mError("failed to get table index %s since %s", indexReq.tbFName, tstrerror(code));
  }

  tFreeSerializeSTableIndexRsp(&rsp);
  TAOS_RETURN(code);
}
830

831
/**
 * Append up to `rows` SMA entries to pBlock, continuing from the cursor kept in
 * pShow->pIter (an SSmaAndTagIter that the caller must have allocated).
 *
 * Columns written per row, in order: SMA table name, db name, source table
 * name, dstVgId, createdTime, an empty string, and the literal "sma_index".
 * When pShow->db is non-empty only SMAs whose dbUid matches that db are emitted.
 *
 * @return number of rows written (0 when pShow->db names a db that cannot be
 *         acquired), or -1 if building a row failed. On failure the SDB fetch
 *         is cancelled, the cursor is cleared and pShow->numOfRows is untouched.
 */
static int32_t mndRetrieveSma(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode  *pMnode = pReq->info.node;
  SSdb    *pSdb = pMnode->pSdb;
  int32_t  numOfRows = 0;
  SSmaObj *pSma = NULL;
  int32_t  cols = 0;
  int32_t  code = 0;

  SDbObj *pDb = NULL;
  if (strlen(pShow->db) > 0) {
    pDb = mndAcquireDb(pMnode, pShow->db);
    if (pDb == NULL) return 0;
  }
  SSmaAndTagIter *pIter = pShow->pIter;
  while (numOfRows < rows) {
    pIter->pSmaIter = sdbFetch(pSdb, SDB_SMA, pIter->pSmaIter, (void **)&pSma);
    if (pIter->pSmaIter == NULL) break;

    // Skip SMAs that belong to a different database than the one requested.
    if (NULL != pDb && pSma->dbUid != pDb->uid) {
      sdbRelease(pSdb, pSma);
      continue;
    }

    cols = 0;

    SName smaName = {0};
    SName stbName = {0};
    char  n1[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
    char  n2[TSDB_DB_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
    char  n3[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
    code = tNameFromString(&smaName, pSma->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
    if (TSDB_CODE_SUCCESS == code) {
      STR_TO_VARSTR(n1, (char *)tNameGetTableName(&smaName));
      STR_TO_VARSTR(n2, (char *)mndGetDbStr(pSma->db));
      code = tNameFromString(&stbName, pSma->stb, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
    }
    SColumnInfoData *pColInfo = NULL;
    if (TSDB_CODE_SUCCESS == code) {
      STR_TO_VARSTR(n3, (char *)tNameGetTableName(&stbName));

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)n1, false);
    }
    if (TSDB_CODE_SUCCESS == code) {
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)n2, false);
    }
    if (TSDB_CODE_SUCCESS == code) {
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)n3, false);
    }
    if (TSDB_CODE_SUCCESS == code) {
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)&pSma->dstVgId, false);
    }
    if (TSDB_CODE_SUCCESS == code) {
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)&pSma->createdTime, false);
    }

    char col[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
    STR_TO_VARSTR(col, (char *)"");

    if (TSDB_CODE_SUCCESS == code) {
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)col, false);
    }

    if (TSDB_CODE_SUCCESS == code) {
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);

      char tag[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
      STR_TO_VARSTR(tag, (char *)"sma_index");
      code = colDataSetVal(pColInfo, numOfRows, (const char *)tag, false);
    }

    sdbRelease(pSdb, pSma);
    if (TSDB_CODE_SUCCESS != code) {
      sdbCancelFetch(pSdb, pIter->pSmaIter);
      // Forget the cancelled cursor so a later cancel/continue cannot reuse it.
      pIter->pSmaIter = NULL;
      numOfRows = -1;
      break;
    }
    // Count the row only once it has been written completely.
    numOfRows++;
  }

  mndReleaseDb(pMnode, pDb);
  // Never fold the failure sentinel into the running row counter.
  if (numOfRows > 0) {
    pShow->numOfRows += numOfRows;
  }
  return numOfRows;
}
920

921
// sma and tag index comm func
922
/**
 * Common entry point for dropping an index: try the SMA drop handler first and
 * fall back to the tag-index drop handler when the SMA handler answers with
 * TSDB_CODE_MND_TAG_INDEX_ALREADY_EXIST or TSDB_CODE_MND_SMA_NOT_EXIST.
 *
 * @return the result of whichever handler produced the final answer.
 */
static int32_t mndProcessDropIdxReq(SRpcMsg *pReq) {
  int smaResult = mndProcessDropSmaReq(pReq);

  if (smaResult != TSDB_CODE_MND_TAG_INDEX_ALREADY_EXIST && smaResult != TSDB_CODE_MND_SMA_NOT_EXIST) {
    return smaResult;
  }

  // Clear the error left by the SMA attempt before retrying as a tag index.
  terrno = 0;
  int tagResult = mndProcessDropTagIdxReq(pReq);
  return tagResult;
}
930

931
/**
 * Retrieve index rows for a show request: SMA entries first, then tag indexes
 * for whatever capacity is left in the batch.
 *
 * The combined cursor (SSmaAndTagIter) is lazily allocated into pShow->pIter
 * and released again as soon as a call returns fewer than `rows` rows, which
 * includes the failure case.
 *
 * @return number of rows produced, terrno if the cursor cannot be allocated,
 *         or the negative value reported by the failing sub-retriever.
 */
static int32_t mndRetrieveIdx(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  if (pShow->pIter == NULL) {
    pShow->pIter = taosMemoryCalloc(1, sizeof(SSmaAndTagIter));
  }
  if (!pShow->pIter) {
    return terrno;
  }

  int32_t read = mndRetrieveSma(pReq, pShow, pBlock, rows);
  // Only continue with tag indexes when the SMA pass succeeded; a negative
  // result is an error, not a row count, and must not enlarge the remaining
  // capacity passed to the next retriever.
  if (read >= 0 && read < rows) {
    int32_t tagRows = mndRetrieveTagIdx(pReq, pShow, pBlock, rows - read);
    read = (tagRows < 0) ? tagRows : read + tagRows;
  }

  // no more to read (or the retrieval failed)
  if (read < rows) {
    taosMemoryFree(pShow->pIter);
    pShow->pIter = NULL;
  }
  return read;
}
949
/**
 * Abort an in-flight index retrieval: cancel both SDB cursors held by the
 * SSmaAndTagIter (if one was passed) and free the iterator object.
 */
static void mndCancelRetrieveIdx(SMnode *pMnode, void *pIter) {
  SSmaAndTagIter *pCursor = (SSmaAndTagIter *)pIter;

  if (pCursor) {
    sdbCancelFetchByType(pMnode->pSdb, pCursor->pSmaIter, SDB_SMA);
    sdbCancelFetchByType(pMnode->pSdb, pCursor->pIdxIter, SDB_IDX);
  }

  taosMemoryFree(pCursor);
}
×
958

959
/**
 * Fill pCxt->pSma from the create request and the objects already resolved in
 * the context (database, optional base TSMA, optional source stable).
 *
 * Names are copied by value. expr/sql/ast are copied as pointers only, so they
 * keep referring to the buffers held by pCxt->pCreateSmaReq.
 */
static void initSMAObj(SCreateTSMACxt *pCxt) {
  SSmaObj *pObj = pCxt->pSma;

  // identity
  memcpy(pObj->name, pCxt->pCreateSmaReq->name, TSDB_TABLE_FNAME_LEN);
  memcpy(pObj->stb, pCxt->pCreateSmaReq->stb, TSDB_TABLE_FNAME_LEN);
  memcpy(pObj->db, pCxt->pDb->name, TSDB_DB_FNAME_LEN);
  if (pCxt->pBaseSma) {
    memcpy(pObj->baseSmaName, pCxt->pBaseSma->name, TSDB_TABLE_FNAME_LEN);
  }
  pObj->createdTime = taosGetTimestampMs();
  pObj->uid = mndGenerateUid(pCxt->pCreateSmaReq->name, TSDB_TABLE_FNAME_LEN);

  // output table
  memcpy(pObj->dstTbName, pCxt->targetStbFullName, TSDB_TABLE_FNAME_LEN);
  pObj->dstTbUid = 0;  // not used

  // source table: the stable's uid when one was resolved, otherwise the uid
  // carried in the request
  if (pCxt->pSrcStb) {
    pObj->stbUid = pCxt->pSrcStb->uid;
  } else {
    pObj->stbUid = pCxt->pCreateSmaReq->normSourceTbUid;
  }
  pObj->dbUid = pCxt->pDb->uid;

  // window definition
  pObj->interval = pCxt->pCreateSmaReq->interval;
  pObj->intervalUnit = pCxt->pCreateSmaReq->intervalUnit;
  //  pCxt->pSma->timezone = taosGetLocalTimezoneOffset();
  pObj->version = 1;

  // definition text: lengths plus borrowed pointers
  pObj->exprLen = pCxt->pCreateSmaReq->exprLen;
  pObj->expr = pCxt->pCreateSmaReq->expr;
  pObj->sqlLen = pCxt->pCreateSmaReq->sqlLen;
  pObj->sql = pCxt->pCreateSmaReq->sql;
  pObj->astLen = pCxt->pCreateSmaReq->astLen;
  pObj->ast = pCxt->pCreateSmaReq->ast;
}
×
983

984
/**
 * Encode pOld and append it to pTrans as a prepare log, then mark the raw
 * record SDB_STATUS_READY.
 *
 * pMnode and pNew are accepted but not used by this function.
 *
 * @return 0 on success; terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when terrno
 *         is 0) if encoding fails; otherwise the code of the failing call.
 */
static int32_t mndSetUpdateDbTsmaVersionPrepareLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pOld, SDbObj *pNew) {
  int32_t  code = 0;
  SSdbRaw *pRaw = mndDbActionEncode(pOld);

  if (!pRaw) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  code = mndTransAppendPrepareLog(pTrans, pRaw);
  if (code != 0) {
    // The raw record was not appended to the transaction, so release it here.
    sdbFreeRaw(pRaw);
    TAOS_RETURN(code);
  }

  code = sdbSetRawStatus(pRaw, SDB_STATUS_READY);
  TAOS_RETURN(code);
}
999

1000
/**
 * Encode pNew and append it to pTrans as a commit log, then mark the raw
 * record SDB_STATUS_READY.
 *
 * pMnode and pOld are accepted but not used by this function.
 *
 * @return 0 on success; terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when terrno
 *         is 0) if encoding fails; otherwise the code of the failing call.
 */
static int32_t mndSetUpdateDbTsmaVersionCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pOld, SDbObj *pNew) {
  int32_t  code = 0;
  SSdbRaw *pRaw = mndDbActionEncode(pNew);

  if (!pRaw) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  code = mndTransAppendCommitlog(pTrans, pRaw);
  if (code != 0) {
    // The raw record was not appended to the transaction, so release it here.
    sdbFreeRaw(pRaw);
    TAOS_RETURN(code);
  }

  code = sdbSetRawStatus(pRaw, SDB_STATUS_READY);
  TAOS_RETURN(code);
}
1015

1016
/**
 * Build and prepare the "create-tsma" transaction.
 *
 * The transaction carries:
 *   - redo/undo/commit logs for the new SMA object (pCxt->pSma),
 *   - a redo action that creates the stream (payload copied from the request),
 *   - undo actions that drop the stream and the output stable,
 *   - prepare/commit logs that bump the database's tsmaVersion.
 *
 * Action payloads are heap copies. Once an action has been appended
 * successfully its payload is treated as owned by the transaction; payloads
 * that never got appended are released here.
 * NOTE(review): this assumes a failed mndTransAppend*Action() leaves the
 * payload with the caller — confirm against mndTrans.
 *
 * @return TSDB_CODE_SUCCESS, or the code of the first failing step.
 */
static int32_t mndCreateTSMATxnPrepare(SCreateTSMACxt *pCxt) {
  int32_t      code = -1;
  STransAction createStreamRedoAction = {0};
  STransAction createStreamUndoAction = {0};
  STransAction dropStbUndoAction = {0};
  SMDropStbReq dropStbReq = {0};
  SDbObj       newDb = {0};
  STrans *pTrans = mndTransCreate(pCxt->pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_TSMA, pCxt->pRpcReq, "create-tsma");
  if (!pTrans) {
    code = terrno;
    goto _OVER;
  }
  mndTransSetDbName(pTrans, pCxt->pDb->name, NULL);
  TAOS_CHECK_GOTO(mndTransCheckConflict(pCxt->pMnode, pTrans), NULL, _OVER);

  mndTransSetSerial(pTrans);
  mInfo("trans:%d, used to create tsma:%s", pTrans->id, pCxt->pCreateSmaReq->name);

  // redo: create the stream
  mndGetMnodeEpSet(pCxt->pMnode, &createStreamRedoAction.epSet);
  createStreamRedoAction.acceptableCode = TSDB_CODE_MND_STREAM_ALREADY_EXIST;
  createStreamRedoAction.msgType = TDMT_MND_CREATE_STREAM;
  createStreamRedoAction.contLen = pCxt->pCreateSmaReq->streamReqLen;
  createStreamRedoAction.pCont = taosMemoryCalloc(1, createStreamRedoAction.contLen);
  if (!createStreamRedoAction.pCont) {
    code = terrno;
    goto _OVER;
  }
  memcpy(createStreamRedoAction.pCont, pCxt->pCreateSmaReq->createStreamReq, createStreamRedoAction.contLen);

  // undo: drop the stream again
  createStreamUndoAction.epSet = createStreamRedoAction.epSet;
  createStreamUndoAction.acceptableCode = TSDB_CODE_MND_STREAM_NOT_EXIST;
  createStreamUndoAction.msgType = TDMT_MND_DROP_STREAM;
  createStreamUndoAction.contLen = pCxt->pCreateSmaReq->dropStreamReqLen;
  createStreamUndoAction.pCont = taosMemoryCalloc(1, createStreamUndoAction.contLen);
  if (!createStreamUndoAction.pCont) {
    code = terrno;
    goto _OVER;
  }
  memcpy(createStreamUndoAction.pCont, pCxt->pCreateSmaReq->dropStreamReq, createStreamUndoAction.contLen);

  // undo: drop the output stable
  dropStbReq.igNotExists = true;
  tstrncpy(dropStbReq.name, pCxt->targetStbFullName, TSDB_TABLE_FNAME_LEN);
  dropStbUndoAction.epSet = createStreamRedoAction.epSet;
  dropStbUndoAction.acceptableCode = TSDB_CODE_MND_STB_NOT_EXIST;
  dropStbUndoAction.retryCode = TSDB_CODE_MND_STREAM_MUST_BE_DELETED;
  dropStbUndoAction.msgType = TDMT_MND_STB_DROP;
  dropStbUndoAction.contLen = tSerializeSMDropStbReq(0, 0, &dropStbReq);
  if (dropStbUndoAction.contLen < 0) {
    // The sizing pass itself failed; do not pass a negative size to the allocator.
    mError("sma: %s, failed to create due to drop stb req encode failure", pCxt->pCreateSmaReq->name);
    code = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }
  dropStbUndoAction.pCont = taosMemoryCalloc(1, dropStbUndoAction.contLen);
  if (!dropStbUndoAction.pCont) {
    code = terrno;
    goto _OVER;
  }
  if (dropStbUndoAction.contLen !=
      tSerializeSMDropStbReq(dropStbUndoAction.pCont, dropStbUndoAction.contLen, &dropStbReq)) {
    mError("sma: %s, failed to create due to drop stb req encode failure", pCxt->pCreateSmaReq->name);
    code = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }

  memcpy(&newDb, pCxt->pDb, sizeof(SDbObj));
  newDb.tsmaVersion++;
  TAOS_CHECK_GOTO(mndSetCreateSmaRedoLogs(pCxt->pMnode, pTrans, pCxt->pSma), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetCreateSmaUndoLogs(pCxt->pMnode, pTrans, pCxt->pSma), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetCreateSmaCommitLogs(pCxt->pMnode, pTrans, pCxt->pSma), NULL, _OVER);

  // After each successful append, drop the local reference to the payload so
  // the cleanup below only frees what was never appended.
  TAOS_CHECK_GOTO(mndTransAppendRedoAction(pTrans, &createStreamRedoAction), NULL, _OVER);
  createStreamRedoAction.pCont = NULL;
  TAOS_CHECK_GOTO(mndTransAppendUndoAction(pTrans, &createStreamUndoAction), NULL, _OVER);
  createStreamUndoAction.pCont = NULL;
  TAOS_CHECK_GOTO(mndTransAppendUndoAction(pTrans, &dropStbUndoAction), NULL, _OVER);
  dropStbUndoAction.pCont = NULL;

  TAOS_CHECK_GOTO(mndSetUpdateDbTsmaVersionPrepareLogs(pCxt->pMnode, pTrans, pCxt->pDb, &newDb), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetUpdateDbTsmaVersionCommitLogs(pCxt->pMnode, pTrans, pCxt->pDb, &newDb), NULL, _OVER);
  TAOS_CHECK_GOTO(mndTransPrepare(pCxt->pMnode, pTrans), NULL, _OVER);

  code = TSDB_CODE_SUCCESS;

_OVER:
  taosMemoryFree(createStreamRedoAction.pCont);
  taosMemoryFree(createStreamUndoAction.pCont);
  taosMemoryFree(dropStbUndoAction.pCont);
  mndTransDrop(pTrans);
  TAOS_RETURN(code);
}
1085

1086
/**
 * Create a TSMA: build the SMA object on the stack, parse the projection list
 * from the request's expr string, and run the create transaction.
 *
 * pCxt->pSma and pCxt->pProjects are only valid while this function runs; both
 * are reset to NULL before returning so the context never keeps a pointer to
 * the local SMA object or to the destroyed node list.
 *
 * @return 0 on success, otherwise the code of the failing step.
 */
static int32_t mndCreateTSMA(SCreateTSMACxt *pCxt) {
  int32_t    code = 0;
  SSmaObj    sma = {0};
  SNodeList *pProjects = NULL;

  pCxt->pSma = &sma;
  initSMAObj(pCxt);

  code = nodesStringToList(pCxt->pCreateSmaReq->expr, &pProjects);
  if (TSDB_CODE_SUCCESS != code) {
    goto _OVER;
  }
  pCxt->pProjects = pProjects;

  code = mndCreateTSMATxnPrepare(pCxt);
  if (TSDB_CODE_SUCCESS != code) {
    goto _OVER;
  }

  mInfo("sma:%s, uid:%" PRIi64 " create on stb:%" PRIi64 " dstTb:%s dstVg:%d", pCxt->pCreateSmaReq->name, sma.uid,
        sma.stbUid, sma.dstTbName, sma.dstVgId);

_OVER:
  if (pProjects) nodesDestroyList(pProjects);
  pCxt->pProjects = NULL;
  // `sma` goes out of scope on return; do not leave a dangling pointer behind.
  pCxt->pSma = NULL;
  TAOS_RETURN(code);
}
1114

1115
static int32_t mndTSMAGenerateOutputName(const char *tsmaName, char *streamName, char *targetStbName) {
×
1116
  SName   smaName;
1117
  int32_t code = tNameFromString(&smaName, tsmaName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
×
1118
  if (TSDB_CODE_SUCCESS != code) {
×
1119
    return code;
×
1120
  }
1121
  snprintf(streamName, TSDB_TABLE_FNAME_LEN, "%d.%s", smaName.acctId, smaName.tname);
×
1122
  snprintf(targetStbName, TSDB_TABLE_FNAME_LEN, "%s" TSMA_RES_STB_POSTFIX, tsmaName);
×
1123
  return TSDB_CODE_SUCCESS;
×
1124
}
1125

1126
/**
 * Handle a CREATE TSMA request.
 *
 * Steps: enforce tsMaxTsmaNum, deserialize and validate the request, resolve
 * the source stable (unless normSourceTbUid is set), derive the stream and
 * output-stable names, reject the request if the TSMA, the output stable or
 * the stream already exists, check write privilege on the database, resolve
 * the base TSMA for recursive TSMAs, and finally start the create transaction.
 *
 * Every object acquired along the way is released at _OVER, including the
 * output stable found by the "already exists" check.
 *
 * @return TSDB_CODE_ACTION_IN_PROGRESS when the transaction was started, 0 when
 *         the TSMA already exists and igExists is set, otherwise an error code.
 *         On WINDOWS builds the request is rejected with
 *         TSDB_CODE_MND_INVALID_PLATFORM.
 */
static int32_t mndProcessCreateTSMAReq(SRpcMsg *pReq) {
#ifdef WINDOWS
  TAOS_RETURN(TSDB_CODE_MND_INVALID_PLATFORM);
#endif
  SMnode        *pMnode = pReq->info.node;
  int32_t        code = -1;
  SDbObj        *pDb = NULL;
  SStbObj       *pStb = NULL;
  SStbObj       *pTargetStb = NULL;
  SSmaObj       *pSma = NULL;
  SSmaObj       *pBaseTsma = NULL;
  SStreamObj    *pStream = NULL;
  int64_t        mTraceId = TRACE_GET_ROOTID(&pReq->info.traceId);
  SMCreateSmaReq createReq = {0};

  if (sdbGetSize(pMnode->pSdb, SDB_SMA) >= tsMaxTsmaNum) {
    code = TSDB_CODE_MND_MAX_TSMA_NUM_EXCEEDED;
    goto _OVER;
  }

  if (tDeserializeSMCreateSmaReq(pReq->pCont, pReq->contLen, &createReq) != 0) {
    code = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }

  mInfo("start to create tsma: %s", createReq.name);
  if ((code = mndCheckCreateSmaReq(&createReq)) != 0) goto _OVER;

  if (createReq.normSourceTbUid == 0) {
    pStb = mndAcquireStb(pMnode, createReq.stb);
    // A missing stable is tolerated only for recursive TSMAs.
    if (!pStb && !createReq.recursiveTsma) {
      mError("tsma:%s, failed to create since stb:%s not exist", createReq.name, createReq.stb);
      code = TSDB_CODE_MND_STB_NOT_EXIST;
      goto _OVER;
    }
  }

  char streamName[TSDB_TABLE_FNAME_LEN] = {0};
  char streamTargetStbFullName[TSDB_TABLE_FNAME_LEN] = {0};
  code = mndTSMAGenerateOutputName(createReq.name, streamName, streamTargetStbFullName);
  if (TSDB_CODE_SUCCESS != code) {
    mInfo("tsma:%s, faield to generate name", createReq.name);
    goto _OVER;
  }

  pSma = sdbAcquire(pMnode->pSdb, SDB_SMA, createReq.name);
  if (pSma && createReq.igExists) {
    mInfo("tsma:%s, already exists in sma:%s, ignore exist is set", createReq.name, pSma->name);
    code = 0;
    goto _OVER;
  }

  if (pSma) {
    code = TSDB_CODE_MND_SMA_ALREADY_EXIST;
    goto _OVER;
  }

  // The acquired reference is released at _OVER; returning without releasing
  // it would leave the stable pinned.
  pTargetStb = mndAcquireStb(pMnode, streamTargetStbFullName);
  if (pTargetStb) {
    code = TSDB_CODE_TDB_STB_ALREADY_EXIST;
    mError("tsma: %s, failed to create since output stable already exists: %s", createReq.name,
           streamTargetStbFullName);
    goto _OVER;
  }

  code = mndAcquireStream(pMnode, streamName, &pStream);
  if (pStream != NULL || code != TSDB_CODE_MND_STREAM_NOT_EXIST) {
    mError("tsma:%s, failed to create since stream:%s already exist", createReq.name, streamName);
    code = TSDB_CODE_MND_SMA_ALREADY_EXIST;
    goto _OVER;
  }

  SName name = {0};
  code = tNameFromString(&name, createReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
  if (TSDB_CODE_SUCCESS != code) {
    goto _OVER;
  }
  char db[TSDB_TABLE_FNAME_LEN] = {0};
  (void)tNameGetFullDbName(&name, db);

  pDb = mndAcquireDb(pMnode, db);
  if (pDb == NULL) {
    code = TSDB_CODE_MND_DB_NOT_SELECTED;
    goto _OVER;
  }

  TAOS_CHECK_GOTO(mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb), NULL, _OVER);

  if (createReq.recursiveTsma) {
    pBaseTsma = sdbAcquire(pMnode->pSdb, SDB_SMA, createReq.baseTsmaName);
    if (!pBaseTsma) {
      mError("base tsma: %s not found when creating recursive tsma", createReq.baseTsmaName);
      code = TSDB_CODE_MND_SMA_NOT_EXIST;
      goto _OVER;
    }
    // Without a source stable, inherit the source table uid from the base TSMA.
    if (!pStb) {
      createReq.normSourceTbUid = pBaseTsma->stbUid;
    }
  }

  SCreateTSMACxt cxt = {
      .pMnode = pMnode,
      .pCreateSmaReq = &createReq,
      .streamName = streamName,
      .targetStbFullName = streamTargetStbFullName,
      .pDb = pDb,
      .pRpcReq = pReq,
      .pSma = NULL,
      .pBaseSma = pBaseTsma,
      .pSrcStb = pStb,
  };

  code = mndCreateTSMA(&cxt);
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("tsma:%s, failed to create since %s", createReq.name, tstrerror(code));
  }

  if (pStb) mndReleaseStb(pMnode, pStb);
  if (pTargetStb) mndReleaseStb(pMnode, pTargetStb);
  if (pBaseTsma) mndReleaseSma(pMnode, pBaseTsma);
  mndReleaseSma(pMnode, pSma);
  mndReleaseStream(pMnode, pStream);
  mndReleaseDb(pMnode, pDb);
  tFreeSMCreateSmaReq(&createReq);

  TAOS_RETURN(code);
}
1254

1255
/**
 * Build and prepare the "drop-tsma" transaction.
 *
 * The transaction carries redo/commit logs that remove pCxt->pSma, a redo
 * action that drops the stream (payload copied from the drop request), a redo
 * action that drops the output stable, and prepare/commit logs that bump the
 * database's tsmaVersion.
 *
 * Action payloads are heap copies. Once an action has been appended
 * successfully its payload is treated as owned by the transaction; payloads
 * that never got appended are released here.
 * NOTE(review): this assumes a failed mndTransAppendRedoAction() leaves the
 * payload with the caller — confirm against mndTrans.
 *
 * @return TSDB_CODE_SUCCESS, or the code of the first failing step.
 */
static int32_t mndDropTSMA(SCreateTSMACxt *pCxt) {
  int32_t      code = -1;
  STransAction dropStreamRedoAction = {0};
  STransAction dropStbRedoAction = {0};
  SMDropStbReq dropStbReq = {0};
  SDbObj       newDb = {0};
  STrans      *pTrans = mndTransCreate(pCxt->pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_TSMA, pCxt->pRpcReq, "drop-tsma");
  if (!pTrans) {
    code = terrno;
    goto _OVER;
  }
  mndTransSetDbName(pTrans, pCxt->pDb->name, NULL);
  // Report the real conflict code instead of the initial -1.
  TAOS_CHECK_GOTO(mndTransCheckConflict(pCxt->pMnode, pTrans), NULL, _OVER);
  mndTransSetSerial(pTrans);

  // redo: drop the stream
  mndGetMnodeEpSet(pCxt->pMnode, &dropStreamRedoAction.epSet);
  dropStreamRedoAction.acceptableCode = TSDB_CODE_MND_STREAM_NOT_EXIST;
  dropStreamRedoAction.msgType = TDMT_MND_DROP_STREAM;
  dropStreamRedoAction.contLen = pCxt->pDropSmaReq->dropStreamReqLen;
  dropStreamRedoAction.pCont = taosMemoryCalloc(1, dropStreamRedoAction.contLen);
  if (!dropStreamRedoAction.pCont) {
    code = terrno;
    goto _OVER;
  }
  memcpy(dropStreamRedoAction.pCont, pCxt->pDropSmaReq->dropStreamReq, dropStreamRedoAction.contLen);

  // output stable is not dropped when dropping stream, dropping it when dropping tsma
  dropStbReq.igNotExists = false;
  tstrncpy(dropStbReq.name, pCxt->targetStbFullName, TSDB_TABLE_FNAME_LEN);
  dropStbReq.sql = "drop";
  dropStbReq.sqlLen = 5;

  mndGetMnodeEpSet(pCxt->pMnode, &dropStbRedoAction.epSet);
  dropStbRedoAction.acceptableCode = TSDB_CODE_MND_STB_NOT_EXIST;
  dropStbRedoAction.msgType = TDMT_MND_STB_DROP;
  dropStbRedoAction.contLen = tSerializeSMDropStbReq(0, 0, &dropStbReq);
  if (dropStbRedoAction.contLen < 0) {
    // The sizing pass itself failed; do not pass a negative size to the allocator.
    mError("tsma: %s, failedto drop due to drop stb req encode failure", pCxt->pDropSmaReq->name);
    code = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }
  dropStbRedoAction.pCont = taosMemoryCalloc(1, dropStbRedoAction.contLen);
  if (!dropStbRedoAction.pCont) {
    code = terrno;
    goto _OVER;
  }
  if (dropStbRedoAction.contLen !=
      tSerializeSMDropStbReq(dropStbRedoAction.pCont, dropStbRedoAction.contLen, &dropStbReq)) {
    mError("tsma: %s, failedto drop due to drop stb req encode failure", pCxt->pDropSmaReq->name);
    code = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }

  memcpy(&newDb, pCxt->pDb, sizeof(SDbObj));
  newDb.tsmaVersion++;
  TAOS_CHECK_GOTO(mndSetDropSmaRedoLogs(pCxt->pMnode, pTrans, pCxt->pSma), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetDropSmaCommitLogs(pCxt->pMnode, pTrans, pCxt->pSma), NULL, _OVER);

  // After each successful append, drop the local reference to the payload so
  // the cleanup below only frees what was never appended.
  TAOS_CHECK_GOTO(mndTransAppendRedoAction(pTrans, &dropStreamRedoAction), NULL, _OVER);
  dropStreamRedoAction.pCont = NULL;
  TAOS_CHECK_GOTO(mndTransAppendRedoAction(pTrans, &dropStbRedoAction), NULL, _OVER);
  dropStbRedoAction.pCont = NULL;

  TAOS_CHECK_GOTO(mndSetUpdateDbTsmaVersionPrepareLogs(pCxt->pMnode, pTrans, pCxt->pDb, &newDb), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetUpdateDbTsmaVersionCommitLogs(pCxt->pMnode, pTrans, pCxt->pDb, &newDb), NULL, _OVER);
  TAOS_CHECK_GOTO(mndTransPrepare(pCxt->pMnode, pTrans), NULL, _OVER);
  code = TSDB_CODE_SUCCESS;

_OVER:
  taosMemoryFree(dropStreamRedoAction.pCont);
  taosMemoryFree(dropStbRedoAction.pCont);
  mndTransDrop(pTrans);
  TAOS_RETURN(code);
}
1312

1313
/**
 * Report whether any SMA object in the SDB names pSma as its base
 * (baseSmaName equals pSma->name within TSDB_TABLE_FNAME_LEN bytes).
 *
 * The scan stops at the first match and cancels the pending fetch.
 */
static bool hasRecursiveTsmasBasedOnMe(SMnode *pMnode, const SSmaObj *pSma) {
  SSdb    *pSdb = pMnode->pSdb;
  SSmaObj *pCandidate = NULL;
  void    *pCursor = sdbFetch(pSdb, SDB_SMA, NULL, (void **)&pCandidate);

  for (; pCursor != NULL; pCursor = sdbFetch(pSdb, SDB_SMA, pCursor, (void **)&pCandidate)) {
    bool basedOnMe = (strncmp(pCandidate->baseSmaName, pSma->name, TSDB_TABLE_FNAME_LEN) == 0);

    // The fetched object is released whether or not it matched.
    sdbRelease(pSdb, pCandidate);

    if (basedOnMe) {
      sdbCancelFetch(pSdb, pCursor);
      return true;
    }
  }

  return false;
}
1328

1329
/**
 * Handle a DROP TSMA request.
 *
 * Deserializes the request, derives the stream / output-stable names, looks up
 * the TSMA and its database, checks write privilege, refuses to drop a TSMA
 * that other TSMAs are based on, and then starts the drop transaction.
 *
 * @return TSDB_CODE_ACTION_IN_PROGRESS when the transaction was started, 0 when
 *         the TSMA is missing and igNotExists is set, otherwise an error code.
 */
static int32_t mndProcessDropTSMAReq(SRpcMsg *pReq) {
  SMnode      *pMnode = pReq->info.node;
  SMDropSmaReq dropReq = {0};
  SStbObj     *pStb = NULL;
  SSmaObj     *pSma = NULL;
  SDbObj      *pDb = NULL;
  int32_t      code = -1;

  if (TSDB_CODE_SUCCESS != tDeserializeSMDropSmaReq(pReq->pCont, pReq->contLen, &dropReq)) {
    code = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }

  char streamName[TSDB_TABLE_FNAME_LEN] = {0};
  char streamTargetStbFullName[TSDB_TABLE_FNAME_LEN] = {0};
  code = mndTSMAGenerateOutputName(dropReq.name, streamName, streamTargetStbFullName);
  if (code != TSDB_CODE_SUCCESS) goto _OVER;

  pStb = mndAcquireStb(pMnode, streamTargetStbFullName);

  pSma = mndAcquireSma(pMnode, dropReq.name);
  if (pSma == NULL) {
    // A missing TSMA is an error only when the request did not ask to ignore it.
    code = dropReq.igNotExists ? 0 : TSDB_CODE_MND_SMA_NOT_EXIST;
    goto _OVER;
  }

  SName name = {0};
  code = tNameFromString(&name, dropReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
  if (code != TSDB_CODE_SUCCESS) goto _OVER;

  char db[TSDB_TABLE_FNAME_LEN] = {0};
  (void)tNameGetFullDbName(&name, db);

  pDb = mndAcquireDb(pMnode, db);
  if (pDb == NULL) {
    code = TSDB_CODE_MND_DB_NOT_EXIST;
    goto _OVER;
  }

  code = mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb);
  if (code != 0) goto _OVER;

  if (hasRecursiveTsmasBasedOnMe(pMnode, pSma)) {
    code = TSDB_CODE_MND_INVALID_DROP_TSMA;
    goto _OVER;
  }

  SCreateTSMACxt cxt = {
      .pDb = pDb,
      .pMnode = pMnode,
      .pRpcReq = pReq,
      .pSma = pSma,
      .streamName = streamName,
      .targetStbFullName = streamTargetStbFullName,
      .pDropSmaReq = &dropReq,
  };

  code = mndDropTSMA(&cxt);
  if (0 == code) {
    code = TSDB_CODE_ACTION_IN_PROGRESS;
  }

_OVER:
  tFreeSMDropSmaReq(&dropReq);
  mndReleaseStb(pMnode, pStb);
  mndReleaseSma(pMnode, pSma);
  mndReleaseDb(pMnode, pDb);
  TAOS_RETURN(code);
}
1403

1404
/**
 * Append up to `rows` TSMA entries to pBlock, continuing from the cursor kept
 * in pShow->pIter (allocated here on first use, freed once fewer than `rows`
 * rows are returned).
 *
 * Columns written per row, in order: TSMA name, db name, source table name,
 * db name again, target table name, stream name (same value as the TSMA name),
 * createdTime, interval text, the stored SQL text, and a comma-separated list
 * of the user aliases of the TSMA-supported functions found in the AST's
 * projection list.
 *
 * When pShow->db is set and can be acquired, only TSMAs of that database are
 * emitted; TSMAs whose own database cannot be acquired are skipped.
 *
 * @return number of rows written, terrno if the cursor cannot be allocated, or
 *         the error code of the step that failed while building a row. On
 *         failure the SDB fetch is cancelled and pShow->numOfRows is untouched.
 */
static int32_t mndRetrieveTSMA(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SDbObj          *pDb = NULL;
  int32_t          numOfRows = 0;
  SSmaObj         *pSma = NULL;
  SMnode          *pMnode = pReq->info.node;
  int32_t          code = 0;
  SColumnInfoData *pColInfo = NULL;
  if (pShow->pIter == NULL) {
    pShow->pIter = taosMemoryCalloc(1, sizeof(SSmaAndTagIter));
  }
  if (!pShow->pIter) {
    return terrno;
  }
  if (pShow->db[0]) {
    pDb = mndAcquireDb(pMnode, pShow->db);
  }
  SSmaAndTagIter *pIter = pShow->pIter;
  while (numOfRows < rows) {
    pIter->pSmaIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter->pSmaIter, (void **)&pSma);
    if (pIter->pSmaIter == NULL) break;
    SDbObj *pSrcDb = mndAcquireDb(pMnode, pSma->db);

    if ((pDb && pSma->dbUid != pDb->uid) || !pSrcDb) {
      sdbRelease(pMnode->pSdb, pSma);
      if (pSrcDb) mndReleaseDb(pMnode, pSrcDb);
      continue;
    }

    int32_t cols = 0;
    SName   n = {0};

    code = tNameFromString(&n, pSma->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
    char smaName[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
    if (TSDB_CODE_SUCCESS == code) {
      STR_TO_VARSTR(smaName, (char *)tNameGetTableName(&n));
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)smaName, false);
    }

    char db[TSDB_DB_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
    if (TSDB_CODE_SUCCESS == code) {
      STR_TO_VARSTR(db, (char *)mndGetDbStr(pSma->db));
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)db, false);
    }

    if (TSDB_CODE_SUCCESS == code) {
      code = tNameFromString(&n, pSma->stb, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
    }
    char srcTb[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
    if (TSDB_CODE_SUCCESS == code) {
      STR_TO_VARSTR(srcTb, (char *)tNameGetTableName(&n));
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)srcTb, false);
    }

    if (TSDB_CODE_SUCCESS == code) {
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)db, false);
    }

    if (TSDB_CODE_SUCCESS == code) {
      code = tNameFromString(&n, pSma->dstTbName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
    }

    if (TSDB_CODE_SUCCESS == code) {
      char targetTb[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
      STR_TO_VARSTR(targetTb, (char *)tNameGetTableName(&n));
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)targetTb, false);
    }

    if (TSDB_CODE_SUCCESS == code) {
      // stream name
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)smaName, false);
    }

    if (TSDB_CODE_SUCCESS == code) {
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)(&pSma->createdTime), false);
    }

    // interval
    char    interval[64 + VARSTR_HEADER_SIZE] = {0};
    int32_t len = 0;
    if (TSDB_CODE_SUCCESS == code) {
      // Non-calendar units are printed with the source db's precision unit,
      // calendar units with the stored interval unit.
      if (!IS_CALENDAR_TIME_DURATION(pSma->intervalUnit)) {
        len = tsnprintf(interval + VARSTR_HEADER_SIZE, 64, "%" PRId64 "%c", pSma->interval,
                        getPrecisionUnit(pSrcDb->cfg.precision));
      } else {
        len = tsnprintf(interval + VARSTR_HEADER_SIZE, 64, "%" PRId64 "%c", pSma->interval, pSma->intervalUnit);
      }
      varDataSetLen(interval, len);
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, interval, false);
    }

    char buf[TSDB_MAX_SAVED_SQL_LEN + VARSTR_HEADER_SIZE] = {0};
    if (TSDB_CODE_SUCCESS == code) {
      // create sql
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      len = tsnprintf(buf + VARSTR_HEADER_SIZE, TSDB_MAX_SAVED_SQL_LEN, "%s", pSma->sql);
      varDataSetLen(buf, TMIN(len, TSDB_MAX_SAVED_SQL_LEN));
      code = colDataSetVal(pColInfo, numOfRows, buf, false);
    }

    // func list
    len = 0;
    SNode *pNode = NULL, *pFunc = NULL;
    if (TSDB_CODE_SUCCESS == code) {
      code = nodesStringToNode(pSma->ast, &pNode);
    }
    // A successful parse may still yield no node; in that case the function
    // list stays empty instead of dereferencing NULL.
    if (TSDB_CODE_SUCCESS == code && pNode != NULL) {
      char *start = buf + VARSTR_HEADER_SIZE;
      FOREACH(pFunc, ((SSelectStmt *)pNode)->pProjectionList) {
        if (nodeType(pFunc) == QUERY_NODE_FUNCTION) {
          SFunctionNode *pFuncNode = (SFunctionNode *)pFunc;
          if (!fmIsTSMASupportedFunc(pFuncNode->funcId)) continue;
          len += tsnprintf(start, TSDB_MAX_SAVED_SQL_LEN - len, "%s%s", start != buf + VARSTR_HEADER_SIZE ? "," : "",
                           ((SExprNode *)pFunc)->userAlias);
          if (len >= TSDB_MAX_SAVED_SQL_LEN) {
            len = TSDB_MAX_SAVED_SQL_LEN;
            break;
          }
          start = buf + VARSTR_HEADER_SIZE + len;
        }
      }
      nodesDestroyNode(pNode);
    }

    if (TSDB_CODE_SUCCESS == code) {
      varDataSetLen(buf, len);
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, buf, false);
    }

    mndReleaseSma(pMnode, pSma);
    mndReleaseDb(pMnode, pSrcDb);
    if (TSDB_CODE_SUCCESS != code) {
      sdbCancelFetch(pMnode->pSdb, pIter->pSmaIter);
      numOfRows = code;
      break;
    }
    // Count the row only once it has been written completely.
    numOfRows++;
  }
  mndReleaseDb(pMnode, pDb);
  // An error code must not be added to the running row counter.
  if (numOfRows > 0) {
    pShow->numOfRows += numOfRows;
  }
  if (numOfRows < rows) {
    taosMemoryFree(pShow->pIter);
    pShow->pIter = NULL;
  }
  return numOfRows;
}
1558

1559
// Cancel callback for a TSMA retrieve: when an iterator wrapper was handed in,
// cancel its pending SDB_SMA fetch; the wrapper itself is freed in every case.
static void mndCancelRetrieveTSMA(SMnode *pMnode, void *pIter) {
  SSmaAndTagIter *pSmaTagIter = pIter;

  if (NULL != pSmaTagIter) {
    sdbCancelFetchByType(pMnode->pSdb, pSmaTagIter->pSmaIter, SDB_SMA);
  }

  taosMemoryFree(pSmaTagIter);
}
×
1567

1568
// Populate `pInfo` from the TSMA object `pSma` and its destination super table `pDestStb`.
//
// - interval/unit/tsmaId/version/destTbUid and the table/db names are copied from `pSma`
//   and `pDestStb`.
// - `pInfo->pFuncs` is built from the projection list of the AST of `pBaseTsma` when it is
//   non-NULL, otherwise from the AST of `pSma`; only projections that are function nodes
//   accepted by fmIsTSMASupportedFunc() are recorded.
// - `pInfo->ast` is a duplicate of `pSma->ast`.
// - `pInfo->pTags` / `pInfo->pUsedCols` are copied from the destination super table schema.
//
// Returns 0 on success or an error code. Every failure exit goes through TAOS_RETURN so that
// terrno agrees with the returned code. On failure `pInfo` may be partially filled; the
// callers in this file free it with tFreeAndClearTableTSMAInfo().
int32_t dumpTSMAInfoFromSmaObj(const SSmaObj *pSma, const SStbObj *pDestStb, STableTSMAInfo *pInfo,
                               const SSmaObj *pBaseTsma) {
  int32_t code = 0;
  SName   sName = {0};
  SNode  *pNode = NULL;
  SNode  *pFunc = NULL;

  pInfo->interval = pSma->interval;
  pInfo->unit = pSma->intervalUnit;
  pInfo->tsmaId = pSma->uid;
  pInfo->version = pSma->version;
  pInfo->destTbUid = pDestStb->uid;

  code = tNameFromString(&sName, pSma->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
  if (TSDB_CODE_SUCCESS != code) {
    TAOS_RETURN(code);
  }
  tstrncpy(pInfo->name, sName.tname, TSDB_TABLE_NAME_LEN);
  tstrncpy(pInfo->targetDbFName, pSma->db, TSDB_DB_FNAME_LEN);

  code = tNameFromString(&sName, pSma->dstTbName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
  if (TSDB_CODE_SUCCESS != code) {
    TAOS_RETURN(code);
  }
  tstrncpy(pInfo->targetTb, sName.tname, TSDB_TABLE_NAME_LEN);
  tstrncpy(pInfo->dbFName, pSma->db, TSDB_DB_FNAME_LEN);

  code = tNameFromString(&sName, pSma->stb, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
  if (TSDB_CODE_SUCCESS != code) {
    TAOS_RETURN(code);
  }
  tstrncpy(pInfo->tb, sName.tname, TSDB_TABLE_NAME_LEN);

  pInfo->pFuncs = taosArrayInit(8, sizeof(STableTSMAFuncInfo));
  if (!pInfo->pFuncs) {
    TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
  }

  if (TSDB_CODE_SUCCESS != nodesStringToNode(pBaseTsma ? pBaseTsma->ast : pSma->ast, &pNode)) {
    taosArrayDestroy(pInfo->pFuncs);
    pInfo->pFuncs = NULL;
    TAOS_RETURN(TSDB_CODE_TSMA_INVALID_STAT);
  }
  if (pNode) {
    SSelectStmt *pSelect = (SSelectStmt *)pNode;
    FOREACH(pFunc, pSelect->pProjectionList) {
      // Only function nodes may be viewed as SFunctionNode; skip every other projection.
      if (nodeType(pFunc) != QUERY_NODE_FUNCTION) continue;

      SFunctionNode *pFuncNode = (SFunctionNode *)pFunc;
      if (!fmIsTSMASupportedFunc(pFuncNode->funcId)) continue;

      STableTSMAFuncInfo funcInfo = {0};
      funcInfo.funcId = pFuncNode->funcId;
      // NOTE(review): assumes the first parameter of a supported function is a column node.
      funcInfo.colId = ((SColumnNode *)pFuncNode->pParameterList->pHead->pNode)->colId;
      if (!taosArrayPush(pInfo->pFuncs, &funcInfo)) {
        code = terrno;
        taosArrayDestroy(pInfo->pFuncs);
        pInfo->pFuncs = NULL;  // do not leave a dangling pointer for the caller's cleanup
        nodesDestroyNode(pNode);
        TAOS_RETURN(code);
      }
    }
    nodesDestroyNode(pNode);
  }

  pInfo->ast = taosStrdup(pSma->ast);
  if (!pInfo->ast) code = terrno;

  if (code == TSDB_CODE_SUCCESS && pDestStb->numOfTags > 0) {
    pInfo->pTags = taosArrayInit(pDestStb->numOfTags, sizeof(SSchema));
    if (!pInfo->pTags) {
      code = terrno;
    } else {
      for (int32_t i = 0; i < pDestStb->numOfTags; ++i) {
        if (NULL == taosArrayPush(pInfo->pTags, &pDestStb->pTags[i])) {
          code = terrno;
          break;
        }
      }
    }
  }

  if (code == TSDB_CODE_SUCCESS) {
    pInfo->pUsedCols = taosArrayInit(pDestStb->numOfColumns - 3, sizeof(SSchema));
    if (!pInfo->pUsedCols) {
      code = terrno;
    } else {
      // skip _wstart, _wend, _duration
      for (int32_t i = 1; i < pDestStb->numOfColumns - 2; ++i) {
        if (NULL == taosArrayPush(pInfo->pUsedCols, &pDestStb->pColumns[i])) {
          code = terrno;
          break;
        }
      }
    }
  }
  TAOS_RETURN(code);
}
1653

1654
// @note remember to mndReleaseSma(*ppOut)
1655
// Follow the baseSmaName chain starting at `pSma` and hand back the last TSMA of the chain
// (the one whose own baseSmaName is empty) through `*ppOut`.
//
// `*ppOut` is always written: it is NULL when `pSma` has no base or when the lookup fails,
// so a caller that reuses its variable never sees a stale pointer. A non-NULL `*ppOut` is an
// acquired reference that the caller must pass to mndReleaseSma().
//
// Returns 0 on success, TSDB_CODE_MND_SMA_NOT_EXIST when a link of the chain cannot be acquired.
static int32_t mndGetDeepestBaseForTsma(SMnode *pMnode, SSmaObj *pSma, SSmaObj **ppOut) {
  SSmaObj *pCurrent = pSma;  // object whose base is looked up next; `pSma` itself is not owned here
  SSmaObj *pAcquired = NULL; // last base acquired by this function (owned until handed out)

  *ppOut = NULL;

  while (pCurrent->baseSmaName[0]) {
    SSmaObj *pNext = mndAcquireSma(pMnode, pCurrent->baseSmaName);
    if (!pNext) {
      mError("base tsma: %s for tsma: %s not found", pCurrent->baseSmaName, pCurrent->name);
      if (pAcquired) mndReleaseSma(pMnode, pAcquired);
      return TSDB_CODE_MND_SMA_NOT_EXIST;
    }
    // The intermediate link is no longer needed once its base has been acquired.
    if (pAcquired) mndReleaseSma(pMnode, pAcquired);
    pAcquired = pNext;
    pCurrent = pNext;
  }

  *ppOut = pAcquired;
  return 0;
}
1678

1679
// Look up the TSMA named `tsmaFName` and, when it and its destination super table both exist,
// append a freshly allocated STableTSMAInfo describing it to `rsp->pTsmas`.
//
// `*exist` is set to true only after the info has been appended successfully.
// Returns:
//   - -1 when the TSMA object itself is not found in the sdb,
//   - TSDB_CODE_SUCCESS (with `*exist` untouched) when the destination super table is missing,
//   - the error code of the failing step otherwise, or 0 on success.
static int32_t mndGetTSMA(SMnode *pMnode, char *tsmaFName, STableTSMAInfoRsp *rsp, bool *exist) {
  int32_t         code = -1;
  SSmaObj        *pBaseTsma = NULL;
  STableTSMAInfo *pTsma = NULL;

  SSmaObj *pSma = sdbAcquire(pMnode->pSdb, SDB_SMA, tsmaFName);
  if (!pSma) {
    // NOTE(review): a missing TSMA is reported as the generic -1, while a missing destination
    // table below is reported as success/not-exist; confirm this asymmetry is intended.
    TAOS_RETURN(code);
  }

  SStbObj *pDstStb = mndAcquireStb(pMnode, pSma->dstTbName);
  if (!pDstStb) {
    sdbRelease(pMnode->pSdb, pSma);
    return TSDB_CODE_SUCCESS;
  }

  pTsma = taosMemoryCalloc(1, sizeof(STableTSMAInfo));
  if (!pTsma) {
    code = terrno;
    sdbRelease(pMnode->pSdb, pSma);
    mndReleaseStb(pMnode, pDstStb);
    TAOS_RETURN(code);
  }

  code = mndGetDeepestBaseForTsma(pMnode, pSma, &pBaseTsma);
  if (code == 0) {
    code = dumpTSMAInfoFromSmaObj(pSma, pDstStb, pTsma, pBaseTsma);
  }
  mndReleaseStb(pMnode, pDstStb);
  sdbRelease(pMnode->pSdb, pSma);
  if (pBaseTsma) mndReleaseSma(pMnode, pBaseTsma);

  // Decide on the returned code, not on terrno: terrno is thread-global and is not
  // guaranteed to mirror the result of the two calls above.
  if (code != 0) {
    tFreeAndClearTableTSMAInfo(pTsma);
    TAOS_RETURN(code);
  }

  if (NULL == taosArrayPush(rsp->pTsmas, &pTsma)) {
    code = terrno;
    tFreeAndClearTableTSMAInfo(pTsma);
    TAOS_RETURN(code);
  }

  *exist = true;
  TAOS_RETURN(code);
}
1720

1721
// Predicate passed to mndGetSomeTsmas(): it is called with each fetched SMA object and the
// caller-supplied `param`; returning true makes mndGetSomeTsmas() skip (filter out) that SMA.
typedef bool (*tsmaFilter)(const SSmaObj *pSma, void *param);
1722

1723
// Iterate over every SDB_SMA object, skip those for which `filtered(pSma, param)` is true,
// and append one STableTSMAInfo per remaining TSMA to `pRsp->pTsmas`.
//
// A TSMA whose destination super table or stream cannot be acquired is skipped and the
// function reports TSDB_CODE_NEED_RETRY once the scan has finished. Any other failure aborts
// the scan: the sdb iterator is cancelled and the error code is returned.
//
// `*exist` is set to true as soon as at least one info has been appended.
static int32_t mndGetSomeTsmas(SMnode *pMnode, STableTSMAInfoRsp *pRsp, tsmaFilter filtered, void *param, bool *exist) {
  int32_t code = 0;
  SSdb   *pSdb = pMnode->pSdb;
  void   *pIter = NULL;
  bool    shouldRetry = false;

  while (1) {
    // Per-iteration state; reset every round so nothing stale survives from the previous TSMA.
    SSmaObj    *pSma = NULL;
    SSmaObj    *pBaseTsma = NULL;
    SStbObj    *pStb = NULL;
    SStreamObj *pStream = NULL;

    pIter = sdbFetch(pSdb, SDB_SMA, pIter, (void **)&pSma);
    if (pIter == NULL) break;

    if (filtered(pSma, param)) {
      sdbRelease(pSdb, pSma);
      continue;
    }

    pStb = mndAcquireStb(pMnode, pSma->dstTbName);
    if (!pStb) {
      sdbRelease(pSdb, pSma);
      shouldRetry = true;
      continue;
    }

    SName smaName = {0};
    char  streamName[TSDB_TABLE_FNAME_LEN] = {0};
    code = tNameFromString(&smaName, pSma->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
    if (TSDB_CODE_SUCCESS != code) {
      sdbRelease(pSdb, pSma);
      mndReleaseStb(pMnode, pStb);
      sdbCancelFetch(pSdb, pIter);  // aborting the scan: the iterator must be cancelled
      TAOS_RETURN(code);
    }
    snprintf(streamName, TSDB_TABLE_FNAME_LEN, "%d.%s.%s", smaName.acctId, smaName.dbname, smaName.tname);

    code = mndAcquireStream(pMnode, streamName, &pStream);
    if (!pStream) {
      shouldRetry = true;
      sdbRelease(pSdb, pSma);
      mndReleaseStb(pMnode, pStb);
      continue;
    }
    if (code != 0) {
      sdbRelease(pSdb, pSma);
      mndReleaseStb(pMnode, pStb);
      sdbCancelFetch(pSdb, pIter);  // aborting the scan: the iterator must be cancelled
      TAOS_RETURN(code);
    }

    int64_t streamId = pStream->pCreate->streamId;
    mndReleaseStream(pMnode, pStream);

    STableTSMAInfo *pTsma = taosMemoryCalloc(1, sizeof(STableTSMAInfo));
    if (!pTsma) {
      code = terrno;
      mndReleaseStb(pMnode, pStb);
      sdbRelease(pSdb, pSma);
      sdbCancelFetch(pSdb, pIter);
      TAOS_RETURN(code);
    }

    pTsma->streamAddr = taosMemoryCalloc(1, sizeof(SStreamTaskAddr));
    if (!pTsma->streamAddr) {
      // Never hand a NULL address buffer to msmGetTriggerTaskAddr().
      code = terrno;
      mndReleaseStb(pMnode, pStb);
      sdbRelease(pSdb, pSma);
      tFreeAndClearTableTSMAInfo(pTsma);
      sdbCancelFetch(pSdb, pIter);
      TAOS_RETURN(code);
    }

    code = msmGetTriggerTaskAddr(pMnode, streamId, pTsma->streamAddr);
    if (code != 0) {
      // NOTE(review): the original set shouldRetry here but still returned `code`;
      // the return is kept, confirm whether skip-and-retry was the intent.
      mndReleaseStb(pMnode, pStb);
      sdbRelease(pSdb, pSma);
      tFreeAndClearTableTSMAInfo(pTsma);
      sdbCancelFetch(pSdb, pIter);
      TAOS_RETURN(code);
    }
    pTsma->streamUid = streamId;

    code = mndGetDeepestBaseForTsma(pMnode, pSma, &pBaseTsma);
    if (code == 0) {
      code = dumpTSMAInfoFromSmaObj(pSma, pStb, pTsma, pBaseTsma);
    }
    mndReleaseStb(pMnode, pStb);
    sdbRelease(pSdb, pSma);
    if (pBaseTsma) mndReleaseSma(pMnode, pBaseTsma);

    // Decide on the returned code, not on the thread-global terrno.
    if (code != 0) {
      tFreeAndClearTableTSMAInfo(pTsma);
      sdbCancelFetch(pSdb, pIter);
      TAOS_RETURN(code);
    }
    if (NULL == taosArrayPush(pRsp->pTsmas, &pTsma)) {
      code = terrno;
      tFreeAndClearTableTSMAInfo(pTsma);
      sdbCancelFetch(pSdb, pIter);
      TAOS_RETURN(code);
    }
    *exist = true;
  }

  if (shouldRetry) {
    return TSDB_CODE_NEED_RETRY;
  }
  return TSDB_CODE_SUCCESS;
}
1822

1823
static bool tsmaTbFilter(const SSmaObj *pSma, void *param) {
×
1824
  const char *tbFName = param;
×
1825
  return pSma->stb[0] != tbFName[0] || strcmp(pSma->stb, tbFName) != 0;
×
1826
}
1827

1828
// Collect the TSMAs kept by tsmaTbFilter for the table full name `tbFName`;
// thin wrapper that forwards to mndGetSomeTsmas() and returns its result unchanged.
static int32_t mndGetTableTSMA(SMnode *pMnode, char *tbFName, STableTSMAInfoRsp *pRsp, bool *exist) {
  int32_t result = mndGetSomeTsmas(pMnode, pRsp, tsmaTbFilter, tbFName, exist);
  return result;
}
1831

1832
static bool tsmaDbFilter(const SSmaObj *pSma, void *param) {
×
1833
  uint64_t *dbUid = param;
×
1834
  return pSma->dbUid != *dbUid;
×
1835
}
1836

1837
// Collect the TSMAs whose dbUid equals `dbUid` by running mndGetSomeTsmas() with
// tsmaDbFilter. `dbFName` is accepted but not used by this function.
int32_t mndGetDbTsmas(SMnode *pMnode, const char *dbFName, uint64_t dbUid, STableTSMAInfoRsp *pRsp, bool *exist) {
  uint64_t wantedDbUid = dbUid;  // local copy whose address is handed to the filter
  return mndGetSomeTsmas(pMnode, pRsp, tsmaDbFilter, &wantedDbUid, exist);
}
1840

1841
// Handle a "get table TSMA" request.
//
// The request is deserialized from `pReq->pCont`. When `fetchingWithTsmaName` is set, the
// single TSMA named in the request is looked up; otherwise the TSMAs selected by table name
// are collected (a TSDB_CODE_NEED_RETRY result of that collection is treated as success).
// When at least one TSMA was found, the serialized STableTSMAInfoRsp is attached to
// `pReq->info.rsp` / `pReq->info.rspLen`; otherwise TSDB_CODE_MND_SMA_NOT_EXIST is returned.
static int32_t mndProcessGetTbTSMAReq(SRpcMsg *pReq) {
  STableTSMAInfoRsp rsp = {0};
  int32_t           code = -1;
  STableTSMAInfoReq tsmaReq = {0};
  bool              exist = false;
  SMnode           *pMnode = pReq->info.node;
  void             *pRsp = NULL;
  int32_t           contLen = 0;

  TAOS_CHECK_GOTO(tDeserializeTableTSMAInfoReq(pReq->pCont, pReq->contLen, &tsmaReq), NULL, _OVER);

  rsp.pTsmas = taosArrayInit(4, POINTER_BYTES);
  if (NULL == rsp.pTsmas) {
    code = terrno;
    goto _OVER;
  }

  if (tsmaReq.fetchingWithTsmaName) {
    code = mndGetTSMA(pMnode, tsmaReq.name, &rsp, &exist);
  } else {
    code = mndGetTableTSMA(pMnode, tsmaReq.name, &rsp, &exist);
    if (TSDB_CODE_NEED_RETRY == code) {
      code = TSDB_CODE_SUCCESS;
    }
  }
  if (code) {
    goto _OVER;
  }

  if (!exist) {
    code = TSDB_CODE_MND_SMA_NOT_EXIST;
    goto _OVER;
  }

  // First pass with a NULL buffer only computes the encoded length.
  contLen = tSerializeTableTSMAInfoRsp(NULL, 0, &rsp);
  if (contLen < 0) {
    code = terrno;
    goto _OVER;
  }

  pRsp = rpcMallocCont(contLen);
  if (pRsp == NULL) {
    code = terrno;
    goto _OVER;
  }

  if (tSerializeTableTSMAInfoRsp(pRsp, contLen, &rsp) < 0) {
    code = terrno;
    rpcFreeCont(pRsp);  // not handed to pReq yet, so it must be released here
    goto _OVER;
  }

  pReq->info.rsp = pRsp;
  pReq->info.rspLen = contLen;
  code = 0;

_OVER:
  tFreeTableTSMAInfoRsp(&rsp);
  TAOS_RETURN(code);
}
1894

1895
// Allocate a placeholder STableTSMAInfo for the TSMA described by `pTsmaVer`: ids and
// names are copied from `pTsmaVer`, `pFuncs` is NULL and `ast` is a one-byte zeroed buffer.
// On success the new object is stored in `*ppTsma` and TSDB_CODE_SUCCESS is returned;
// on allocation failure terrno is returned and `*ppTsma` is left untouched.
static int32_t mkNonExistTSMAInfo(const STSMAVersion *pTsmaVer, STableTSMAInfo **ppTsma) {
  STableTSMAInfo *pPlaceholder = taosMemoryCalloc(1, sizeof(STableTSMAInfo));
  if (NULL == pPlaceholder) {
    return terrno;
  }

  pPlaceholder->ast = taosMemoryCalloc(1, 1);
  if (NULL == pPlaceholder->ast) {
    taosMemoryFree(pPlaceholder);
    return terrno;
  }

  pPlaceholder->pFuncs = NULL;
  pPlaceholder->tsmaId = pTsmaVer->tsmaId;
  pPlaceholder->dbId = pTsmaVer->dbId;
  tstrncpy(pPlaceholder->dbFName, pTsmaVer->dbFName, TSDB_DB_FNAME_LEN);
  tstrncpy(pPlaceholder->tb, pTsmaVer->tbName, TSDB_TABLE_NAME_LEN);
  tstrncpy(pPlaceholder->name, pTsmaVer->name, TSDB_TABLE_NAME_LEN);

  *ppTsma = pPlaceholder;
  return TSDB_CODE_SUCCESS;
}
1914

1915
int32_t mndValidateTSMAInfo(SMnode *pMnode, STSMAVersion *pTsmaVersions, int32_t numOfTsmas, void **ppRsp,
×
1916
                            int32_t *pRspLen) {
1917
  int32_t         code = -1;
×
1918
  STSMAHbRsp      hbRsp = {0};
×
1919
  int32_t         rspLen = 0;
×
1920
  void           *pRsp = NULL;
×
1921
  char            tsmaFName[TSDB_TABLE_FNAME_LEN] = {0};
×
1922
  STableTSMAInfo *pTsmaInfo = NULL;
×
1923

1924
  hbRsp.pTsmas = taosArrayInit(numOfTsmas, POINTER_BYTES);
×
1925
  if (!hbRsp.pTsmas) {
×
1926
    code = terrno;
×
1927
    TAOS_RETURN(code);
×
1928
  }
1929

1930
  for (int32_t i = 0; i < numOfTsmas; ++i) {
×
1931
    STSMAVersion *pTsmaVer = &pTsmaVersions[i];
×
1932
    pTsmaVer->dbId = be64toh(pTsmaVer->dbId);
×
1933
    pTsmaVer->tsmaId = be64toh(pTsmaVer->tsmaId);
×
1934
    pTsmaVer->version = ntohl(pTsmaVer->version);
×
1935

1936
    snprintf(tsmaFName, sizeof(tsmaFName), "%s.%s", pTsmaVer->dbFName, pTsmaVer->name);
×
1937
    SSmaObj *pSma = mndAcquireSma(pMnode, tsmaFName);
×
1938
    if (!pSma) {
×
1939
      code = mkNonExistTSMAInfo(pTsmaVer, &pTsmaInfo);
×
1940
      if (code) goto _OVER;
×
1941
      if (NULL == taosArrayPush(hbRsp.pTsmas, &pTsmaInfo)) {
×
1942
        code = terrno;
×
1943
        tFreeAndClearTableTSMAInfo(pTsmaInfo);
×
1944
        goto _OVER;
×
1945
      }
1946
      continue;
×
1947
    }
1948

1949
    if (pSma->uid != pTsmaVer->tsmaId) {
×
1950
      mDebug("tsma: %s.%" PRIx64 " tsmaId mismatch with current %" PRIx64, tsmaFName, pTsmaVer->tsmaId, pSma->uid);
×
1951
      code = mkNonExistTSMAInfo(pTsmaVer, &pTsmaInfo);
×
1952
      mndReleaseSma(pMnode, pSma);
×
1953
      if (code) goto _OVER;
×
1954
      if (NULL == taosArrayPush(hbRsp.pTsmas, &pTsmaInfo)) {
×
1955
        code = terrno;
×
1956
        tFreeAndClearTableTSMAInfo(pTsmaInfo);
×
1957
        goto _OVER;
×
1958
      }
1959
      continue;
×
1960
    } else if (pSma->version == pTsmaVer->version) {
×
1961
      mndReleaseSma(pMnode, pSma);
×
1962
      continue;
×
1963
    }
1964

1965
    SStbObj *pDestStb = mndAcquireStb(pMnode, pSma->dstTbName);
×
1966
    if (!pDestStb) {
×
1967
      mInfo("tsma: %s.%" PRIx64 " dest stb: %s not found, maybe dropped", tsmaFName, pTsmaVer->tsmaId, pSma->dstTbName);
×
1968
      code = mkNonExistTSMAInfo(pTsmaVer, &pTsmaInfo);
×
1969
      mndReleaseSma(pMnode, pSma);
×
1970
      if (code) goto _OVER;
×
1971
      if (NULL == taosArrayPush(hbRsp.pTsmas, &pTsmaInfo)) {
×
1972
        code = terrno;
×
1973
        tFreeAndClearTableTSMAInfo(pTsmaInfo);
×
1974
        goto _OVER;
×
1975
      }
1976
      continue;
×
1977
    }
1978

1979
    // dump smaObj into rsp
1980
    STableTSMAInfo *pInfo = NULL;
×
1981
    pInfo = taosMemoryCalloc(1, sizeof(STableTSMAInfo));
×
1982
    if (!pInfo) {
×
1983
      code = terrno;
×
1984
      mndReleaseSma(pMnode, pSma);
×
1985
      mndReleaseStb(pMnode, pDestStb);
×
1986
      goto _OVER;
×
1987
    }
1988

1989
    SSmaObj *pBaseSma = NULL;
×
1990
    code = mndGetDeepestBaseForTsma(pMnode, pSma, &pBaseSma);
×
1991
    if (code == 0) code = dumpTSMAInfoFromSmaObj(pSma, pDestStb, pInfo, pBaseSma);
×
1992

1993
    mndReleaseStb(pMnode, pDestStb);
×
1994
    mndReleaseSma(pMnode, pSma);
×
1995
    if (pBaseSma) mndReleaseSma(pMnode, pBaseSma);
×
1996
    if (terrno) {
×
1997
      tFreeAndClearTableTSMAInfo(pInfo);
×
1998
      goto _OVER;
×
1999
    }
2000

2001
    if (NULL == taosArrayPush(hbRsp.pTsmas, pInfo)) {
×
2002
      code = terrno;
×
2003
      tFreeAndClearTableTSMAInfo(pInfo);
×
2004
      goto _OVER;
×
2005
    }
2006
  }
2007

2008
  rspLen = tSerializeTSMAHbRsp(NULL, 0, &hbRsp);
×
2009
  if (rspLen < 0) {
×
2010
    code = terrno;
×
2011
    goto _OVER;
×
2012
  }
2013

2014
  pRsp = taosMemoryMalloc(rspLen);
×
2015
  if (!pRsp) {
×
2016
    code = terrno;
×
2017
    rspLen = 0;
×
2018
    goto _OVER;
×
2019
  }
2020

2021
  rspLen = tSerializeTSMAHbRsp(pRsp, rspLen, &hbRsp);
×
2022
  if (rspLen < 0) {
×
2023
    code = terrno;
×
2024
    goto _OVER;
×
2025
  }
2026
  code = 0;
×
2027
_OVER:
×
2028
  tFreeTSMAHbRsp(&hbRsp);
×
2029
  *ppRsp = pRsp;
×
2030
  *pRspLen = rspLen;
×
2031
  TAOS_RETURN(code);
×
2032
}
2033

2034
// Append drop-commit logs to `pTrans` for every SMA object whose dbUid matches `pDb->uid`.
// Returns 0 when all matching SMAs were handled, or the first error reported by
// mndSetDropSmaCommitLogs(); in that case the sdb iteration is cancelled before returning.
int32_t mndDropTSMAsByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
  int32_t code = 0;
  SSdb   *pSdb = pMnode->pSdb;
  void   *pIter = NULL;

  while (1) {
    SSmaObj *pSma = NULL;
    pIter = sdbFetch(pSdb, SDB_SMA, pIter, (void **)&pSma);
    if (pIter == NULL) break;

    if (pSma->dbUid == pDb->uid) {
      if ((code = mndSetDropSmaCommitLogs(pMnode, pTrans, pSma)) != 0) {
        sdbRelease(pSdb, pSma);
        // Cancel with the iterator returned by sdbFetch(), not with the fetched object.
        sdbCancelFetch(pSdb, pIter);
        TAOS_RETURN(code);
      }
    }

    sdbRelease(pSdb, pSma);
  }

  TAOS_RETURN(code);
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc