• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4945

30 Jan 2026 06:19AM UTC coverage: 66.87% (+0.02%) from 66.849%
#4945

push

travis-ci

web-flow
merge: from main to 3.0 #34453

1126 of 2018 new or added lines in 72 files covered. (55.8%)

13708 existing lines in 159 files now uncovered.

205277 of 306978 relevant lines covered (66.87%)

126353544.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

7.01
/source/dnode/mnode/impl/src/mndSma.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mndSma.h"
18
#include "functionMgt.h"
19
#include "mndDb.h"
20
#include "mndDnode.h"
21
#include "mndIndex.h"
22
#include "mndIndexComm.h"
23
#include "mndInfoSchema.h"
24
#include "mndMnode.h"
25
#include "mndPrivilege.h"
26
#include "mndShow.h"
27
#include "mndStb.h"
28
#include "mndStream.h"
29
#include "mndTrans.h"
30
#include "mndUser.h"
31
#include "mndVgroup.h"
32
#include "parser.h"
33
#include "tname.h"
34

35
#define TSDB_SMA_VER_NUMBER   1
36
#define TSDB_SMA_RESERVE_SIZE 32
37

38
static SSdbRaw *mndSmaActionEncode(SSmaObj *pSma);
39
static SSdbRow *mndSmaActionDecode(SSdbRaw *pRaw);
40
static int32_t  mndSmaActionInsert(SSdb *pSdb, SSmaObj *pSma);
41
static int32_t  mndSmaActionDelete(SSdb *pSdb, SSmaObj *pSpSmatb);
42
static int32_t  mndSmaActionUpdate(SSdb *pSdb, SSmaObj *pOld, SSmaObj *pNew);
43
static int32_t  mndProcessDropSmaReq(SRpcMsg *pReq);
44
static int32_t  mndProcessGetSmaReq(SRpcMsg *pReq);
45
static int32_t  mndProcessGetTbSmaReq(SRpcMsg *pReq);
46
static int32_t  mndRetrieveSma(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
47

48
static int32_t mndProcessCreateTSMAReq(SRpcMsg *pReq);
49
static int32_t mndProcessDropTSMAReq(SRpcMsg *pReq);
50

51
// sma and tag index comm func
52
static int32_t mndProcessDropIdxReq(SRpcMsg *pReq);
53
static int32_t mndRetrieveIdx(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
54
static void    mndCancelRetrieveIdx(SMnode *pMnode, void *pIter);
55

56
static int32_t mndRetrieveTSMA(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
57
static void    mndCancelRetrieveTSMA(SMnode *pMnode, void *pIter);
58
static int32_t mndProcessGetTbTSMAReq(SRpcMsg *pReq);
59

60
typedef struct SCreateTSMACxt {
61
  SMnode        *pMnode;
62
  const SRpcMsg *pRpcReq;
63
  union {
64
    const SMCreateSmaReq *pCreateSmaReq;
65
    const SMDropSmaReq   *pDropSmaReq;
66
  };
67
  SDbObj             *pDb;
68
  SStbObj            *pSrcStb;
69
  SSmaObj            *pSma;
70
  const SSmaObj      *pBaseSma;
71
  SUserObj           *pOperUser;
72
  const char         *streamName;
73
  const char         *targetStbFullName;
74
  SNodeList          *pProjects;
75
} SCreateTSMACxt;
76

77
// Wires the SMA module into the mnode: installs the message handlers and
// show-retrieve callbacks below, then registers the SDB_SMA table.
// Returns the result of sdbSetTable().
int32_t mndInitSma(SMnode *pMnode) {
  SSdbTable smaTable = {0};
  smaTable.sdbType = SDB_SMA;
  smaTable.keyType = SDB_KEY_BINARY;
  smaTable.encodeFp = (SdbEncodeFp)mndSmaActionEncode;
  smaTable.decodeFp = (SdbDecodeFp)mndSmaActionDecode;
  smaTable.insertFp = (SdbInsertFp)mndSmaActionInsert;
  smaTable.updateFp = (SdbUpdateFp)mndSmaActionUpdate;
  smaTable.deleteFp = (SdbDeleteFp)mndSmaActionDelete;

  // Index (SMA / tag index) messages. No handler is registered here for
  // TDMT_MND_CREATE_SMA.
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_SMA, mndProcessDropIdxReq);
  mndSetMsgHandle(pMnode, TDMT_MND_GET_INDEX, mndProcessGetSmaReq);
  mndSetMsgHandle(pMnode, TDMT_MND_GET_TABLE_INDEX, mndProcessGetTbSmaReq);

  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_INDEX, mndRetrieveIdx);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_INDEX, mndCancelRetrieveIdx);

  // TSMA messages; both TSMA "get" message types share one handler.
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_TSMA, mndProcessCreateTSMAReq);
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STREAM_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_STB_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_TSMA, mndProcessDropTSMAReq);
  mndSetMsgHandle(pMnode, TDMT_MND_GET_TABLE_TSMA, mndProcessGetTbTSMAReq);
  mndSetMsgHandle(pMnode, TDMT_MND_GET_TSMA, mndProcessGetTbTSMAReq);
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TSMAS, mndRetrieveTSMA);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TSMAS, mndCancelRetrieveTSMA);

  return sdbSetTable(pMnode->pSdb, smaTable);
}
108

109
// Counterpart of mndInitSma(); currently there is nothing to tear down.
void mndCleanupSma(SMnode *pMnode) {
  (void)pMnode;
}
398,680✔
110

111
// Serializes an SMA object into a freshly allocated SSdbRaw.
// The field order written here must stay in lock-step with
// mndSmaActionDecode(). Returns NULL (with terrno set) on failure.
static SSdbRaw *mndSmaActionEncode(SSmaObj *pSma) {
  // code/lino are kept in scope for the SDB_SET_* macros -- presumably they
  // reference them; confirm against the macro definitions.
  int32_t code = 0;
  int32_t lino = 0;
  // Pessimistic default: any early exit reports out-of-memory unless a macro
  // overwrote terrno.
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  int32_t size =
      sizeof(SSmaObj) + pSma->exprLen + pSma->tagsFilterLen + pSma->sqlLen + pSma->astLen + TSDB_SMA_RESERVE_SIZE;
  SSdbRaw *pRaw = sdbAllocRaw(SDB_SMA, TSDB_SMA_VER_NUMBER, size);
  if (pRaw == NULL) goto _OVER;

  int32_t pos = 0;

  // Fixed-size identity fields.
  SDB_SET_BINARY(pRaw, pos, pSma->name, TSDB_TABLE_FNAME_LEN, _OVER)
  SDB_SET_BINARY(pRaw, pos, pSma->stb, TSDB_TABLE_FNAME_LEN, _OVER)
  SDB_SET_BINARY(pRaw, pos, pSma->db, TSDB_DB_FNAME_LEN, _OVER)
  SDB_SET_BINARY(pRaw, pos, pSma->dstTbName, TSDB_TABLE_FNAME_LEN, _OVER)
  SDB_SET_INT64(pRaw, pos, pSma->createdTime, _OVER)
  SDB_SET_INT64(pRaw, pos, pSma->uid, _OVER)
  SDB_SET_INT64(pRaw, pos, pSma->stbUid, _OVER)
  SDB_SET_INT64(pRaw, pos, pSma->dbUid, _OVER)
  SDB_SET_INT64(pRaw, pos, pSma->dstTbUid, _OVER)

  // Window parameters.
  SDB_SET_INT8(pRaw, pos, pSma->intervalUnit, _OVER)
  SDB_SET_INT8(pRaw, pos, pSma->slidingUnit, _OVER)
  SDB_SET_INT8(pRaw, pos, pSma->timezone, _OVER)
  SDB_SET_INT32(pRaw, pos, pSma->dstVgId, _OVER)
  SDB_SET_INT64(pRaw, pos, pSma->interval, _OVER)
  SDB_SET_INT64(pRaw, pos, pSma->offset, _OVER)
  SDB_SET_INT64(pRaw, pos, pSma->sliding, _OVER)

  // Lengths of the variable-size payloads, then the payloads themselves.
  SDB_SET_INT32(pRaw, pos, pSma->exprLen, _OVER)
  SDB_SET_INT32(pRaw, pos, pSma->tagsFilterLen, _OVER)
  SDB_SET_INT32(pRaw, pos, pSma->sqlLen, _OVER)
  SDB_SET_INT32(pRaw, pos, pSma->astLen, _OVER)
  SDB_SET_INT32(pRaw, pos, pSma->version, _OVER)

  if (pSma->exprLen > 0) {
    SDB_SET_BINARY(pRaw, pos, pSma->expr, pSma->exprLen, _OVER)
  }
  if (pSma->tagsFilterLen > 0) {
    SDB_SET_BINARY(pRaw, pos, pSma->tagsFilter, pSma->tagsFilterLen, _OVER)
  }
  if (pSma->sqlLen > 0) {
    SDB_SET_BINARY(pRaw, pos, pSma->sql, pSma->sqlLen, _OVER)
  }
  if (pSma->astLen > 0) {
    SDB_SET_BINARY(pRaw, pos, pSma->ast, pSma->astLen, _OVER)
  }

  SDB_SET_BINARY(pRaw, pos, pSma->baseSmaName, TSDB_TABLE_FNAME_LEN, _OVER)
  SDB_SET_BINARY(pRaw, pos, pSma->createUser, TSDB_USER_LEN, _OVER)
  SDB_SET_INT64(pRaw, pos, pSma->ownerId, _OVER)

  SDB_SET_RESERVE(pRaw, pos, TSDB_SMA_RESERVE_SIZE, _OVER)
  SDB_SET_DATALEN(pRaw, pos, _OVER)

  terrno = 0;

_OVER:
  if (terrno == 0) {
    mTrace("sma:%s, encode to raw:%p, row:%p", pSma->name, pRaw, pSma);
    return pRaw;
  }

  mError("sma:%s, failed to encode to raw:%p since %s", pSma->name, pRaw, terrstr());
  sdbFreeRaw(pRaw);
  return NULL;
}
175

176
// Rebuilds an SMA row from its raw sdb encoding.
// Reads fields in exactly the order mndSmaActionEncode() wrote them and
// allocates the four variable-length buffers (expr, tagsFilter, sql, ast).
// Returns NULL (with terrno set) on failure; partial allocations are freed.
static SSdbRow *mndSmaActionDecode(SSdbRaw *pRaw) {
  // code/lino are kept in scope for the SDB_GET_* macros -- presumably they
  // reference them; confirm against the macro definitions.
  int32_t code = 0;
  int32_t lino = 0;
  terrno = TSDB_CODE_OUT_OF_MEMORY;
  SSdbRow *pRow = NULL;
  SSmaObj *pSma = NULL;

  int8_t sver = 0;
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;

  // Only the current on-disk layout version is understood.
  if (sver != TSDB_SMA_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto _OVER;
  }

  pRow = sdbAllocRow(sizeof(SSmaObj));
  if (pRow == NULL) goto _OVER;

  pSma = sdbGetRowObj(pRow);
  if (pSma == NULL) goto _OVER;

  int32_t pos = 0;

  SDB_GET_BINARY(pRaw, pos, pSma->name, TSDB_TABLE_FNAME_LEN, _OVER)
  SDB_GET_BINARY(pRaw, pos, pSma->stb, TSDB_TABLE_FNAME_LEN, _OVER)
  SDB_GET_BINARY(pRaw, pos, pSma->db, TSDB_DB_FNAME_LEN, _OVER)
  SDB_GET_BINARY(pRaw, pos, pSma->dstTbName, TSDB_TABLE_FNAME_LEN, _OVER)
  SDB_GET_INT64(pRaw, pos, &pSma->createdTime, _OVER)
  SDB_GET_INT64(pRaw, pos, &pSma->uid, _OVER)
  SDB_GET_INT64(pRaw, pos, &pSma->stbUid, _OVER)
  SDB_GET_INT64(pRaw, pos, &pSma->dbUid, _OVER)
  SDB_GET_INT64(pRaw, pos, &pSma->dstTbUid, _OVER)
  SDB_GET_INT8(pRaw, pos, &pSma->intervalUnit, _OVER)
  SDB_GET_INT8(pRaw, pos, &pSma->slidingUnit, _OVER)
  SDB_GET_INT8(pRaw, pos, &pSma->timezone, _OVER)
  SDB_GET_INT32(pRaw, pos, &pSma->dstVgId, _OVER)
  SDB_GET_INT64(pRaw, pos, &pSma->interval, _OVER)
  SDB_GET_INT64(pRaw, pos, &pSma->offset, _OVER)
  SDB_GET_INT64(pRaw, pos, &pSma->sliding, _OVER)
  SDB_GET_INT32(pRaw, pos, &pSma->exprLen, _OVER)
  SDB_GET_INT32(pRaw, pos, &pSma->tagsFilterLen, _OVER)
  SDB_GET_INT32(pRaw, pos, &pSma->sqlLen, _OVER)
  SDB_GET_INT32(pRaw, pos, &pSma->astLen, _OVER)
  SDB_GET_INT32(pRaw, pos, &pSma->version, _OVER)

  // Variable-length payloads: each is present only when its length is > 0.
  if (pSma->exprLen > 0) {
    pSma->expr = taosMemoryCalloc(pSma->exprLen, 1);
    if (pSma->expr == NULL) goto _OVER;
    SDB_GET_BINARY(pRaw, pos, pSma->expr, pSma->exprLen, _OVER)
  }

  if (pSma->tagsFilterLen > 0) {
    pSma->tagsFilter = taosMemoryCalloc(pSma->tagsFilterLen, 1);
    if (pSma->tagsFilter == NULL) goto _OVER;
    SDB_GET_BINARY(pRaw, pos, pSma->tagsFilter, pSma->tagsFilterLen, _OVER)
  }

  if (pSma->sqlLen > 0) {
    pSma->sql = taosMemoryCalloc(pSma->sqlLen, 1);
    if (pSma->sql == NULL) goto _OVER;
    SDB_GET_BINARY(pRaw, pos, pSma->sql, pSma->sqlLen, _OVER)
  }

  if (pSma->astLen > 0) {
    pSma->ast = taosMemoryCalloc(pSma->astLen, 1);
    if (pSma->ast == NULL) goto _OVER;
    SDB_GET_BINARY(pRaw, pos, pSma->ast, pSma->astLen, _OVER)
  }

  SDB_GET_BINARY(pRaw, pos, pSma->baseSmaName, TSDB_TABLE_FNAME_LEN, _OVER)
  SDB_GET_BINARY(pRaw, pos, pSma->createUser, TSDB_USER_LEN, _OVER)
  SDB_GET_INT64(pRaw, pos, &pSma->ownerId, _OVER)

  SDB_GET_RESERVE(pRaw, pos, TSDB_SMA_RESERVE_SIZE, _OVER)

  terrno = 0;

_OVER:
  if (terrno == 0) {
    mTrace("sma:%s, decode from raw:%p, row:%p", pSma->name, pRaw, pSma);
    return pRow;
  }

  // Failure: drop whatever payload buffers were allocated, then the row.
  if (pSma != NULL) {
    mError("sma:%s, failed to decode from raw:%p since %s", pSma->name, pRaw, terrstr());
    taosMemoryFreeClear(pSma->expr);
    taosMemoryFreeClear(pSma->tagsFilter);
    taosMemoryFreeClear(pSma->sql);
    taosMemoryFreeClear(pSma->ast);
  }
  taosMemoryFreeClear(pRow);
  return NULL;
}
268

269
// sdb insert hook for SMA rows: only traces the event and reports success.
static int32_t mndSmaActionInsert(SSdb *pSdb, SSmaObj *pSma) {
  (void)pSdb;
  mTrace("sma:%s, perform insert action, row:%p", pSma->name, pSma);
  return 0;
}
273

274
// sdb delete hook for SMA rows: frees the four heap-allocated payload
// buffers owned by the row (the ones mndSmaActionDecode() allocates).
static int32_t mndSmaActionDelete(SSdb *pSdb, SSmaObj *pSma) {
  (void)pSdb;
  mTrace("sma:%s, perform delete action, row:%p", pSma->name, pSma);

  taosMemoryFreeClear(pSma->expr);
  taosMemoryFreeClear(pSma->tagsFilter);
  taosMemoryFreeClear(pSma->sql);
  taosMemoryFreeClear(pSma->ast);

  return 0;
}
282

283
// sdb update hook for SMA rows. The only field carried over from the new
// row into the existing one is ownerId.
static int32_t mndSmaActionUpdate(SSdb *pSdb, SSmaObj *pOld, SSmaObj *pNew) {
  (void)pSdb;
  mTrace("sma:%s, perform update action, old row:%p new row:%p", pOld->name, pOld, pNew);

  pOld->ownerId = pNew->ownerId;

  return 0;
}
288

289
// Looks up an SMA object by name through sdbAcquire().
// When the lookup fails with the generic "object not there" error, terrno is
// rewritten to the SMA-specific TSDB_CODE_MND_SMA_NOT_EXIST.
// A non-NULL result should be handed back via mndReleaseSma().
SSmaObj *mndAcquireSma(SMnode *pMnode, char *smaName) {
  SSmaObj *pFound = sdbAcquire(pMnode->pSdb, SDB_SMA, smaName);
  if (pFound != NULL) {
    return pFound;
  }

  if (terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
    terrno = TSDB_CODE_MND_SMA_NOT_EXIST;
  }
  return NULL;
}
297

298
// Hands an SMA object back to the sdb via sdbRelease(). Callers in this file
// also pass NULL here.
void mndReleaseSma(SMnode *pMnode, SSmaObj *pSma) {
  sdbRelease(pMnode->pSdb, pSma);
}
3,210✔
302

303
// Appends the encoded SMA row to the transaction's redo log and marks it
// SDB_STATUS_CREATING. Returns 0 on success, otherwise an error code.
static int32_t mndSetCreateSmaRedoLogs(SMnode *pMnode, STrans *pTrans, SSmaObj *pSma) {
  int32_t  code = 0;
  SSdbRaw *pRaw = mndSmaActionEncode(pSma);
  if (!pRaw) {
    // Prefer the encoder's terrno; fall back to the generic NULL-result code.
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  TAOS_CHECK_RETURN(mndTransAppendRedolog(pTrans, pRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRaw, SDB_STATUS_CREATING));

  TAOS_RETURN(code);
}
316

317
// Appends the encoded SMA row to the transaction's undo log and marks it
// SDB_STATUS_DROPPED. Returns 0 on success, otherwise an error code.
static int32_t mndSetCreateSmaUndoLogs(SMnode *pMnode, STrans *pTrans, SSmaObj *pSma) {
  int32_t  code = 0;
  SSdbRaw *pRaw = mndSmaActionEncode(pSma);
  if (pRaw == NULL) {
    // Prefer the encoder's terrno; fall back to the generic NULL-result code.
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  TAOS_CHECK_RETURN(mndTransAppendUndolog(pTrans, pRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED));

  TAOS_RETURN(code);
}
329

330
// Appends the encoded SMA row to the transaction's commit log and marks it
// SDB_STATUS_READY. Returns 0 on success, otherwise an error code.
static int32_t mndSetCreateSmaCommitLogs(SMnode *pMnode, STrans *pTrans, SSmaObj *pSma) {
  int32_t  code = 0;
  SSdbRaw *pRaw = mndSmaActionEncode(pSma);
  if (!pRaw) {
    // Prefer the encoder's terrno; fall back to the generic NULL-result code.
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRaw, SDB_STATUS_READY));

  TAOS_RETURN(code);
}
343

344
// Adds a commit-log entry that bumps the super table's smaVer.
// Works on a stack snapshot of *pStb taken under its read latch; the
// snapshot's column/tag/func arrays are detached (count 0, pointer NULL) and
// its latch is reset before encoding.
static int32_t mndSetUpdateSmaStbCommitLogs(SMnode *pMnode, STrans *pTrans, SStbObj *pStb) {
  int32_t code = 0;
  SStbObj snapshot = {0};

  taosRLockLatch(&pStb->lock);
  memcpy(&snapshot, pStb, sizeof(SStbObj));
  taosRUnLockLatch(&pStb->lock);

  // Detach the heap-backed arrays so the snapshot does not alias the live
  // object's buffers.
  snapshot.pColumns = NULL;
  snapshot.numOfColumns = 0;
  snapshot.pTags = NULL;
  snapshot.numOfTags = 0;
  snapshot.pFuncs = NULL;
  snapshot.numOfFuncs = 0;

  snapshot.updateTime = taosGetTimestampMs();
  snapshot.lock = 0;
  snapshot.smaVer++;

  SSdbRaw *pRaw = mndStbActionEncode(&snapshot);
  if (!pRaw) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRaw, SDB_STATUS_READY));

  TAOS_RETURN(code);
}
371

372
// Validates a create-SMA request.
// Returns 0 when every field is acceptable, otherwise
// TSDB_CODE_MND_INVALID_SMA_OPTION. Every rejection, including an
// unparsable or empty SMA name, now goes through TAOS_RETURN with that
// code; the name checks previously returned a bare -1.
static int32_t mndCheckCreateSmaReq(SMCreateSmaReq *pCreate) {
  int32_t code = TSDB_CODE_MND_INVALID_SMA_OPTION;

  // Mandatory names and flag/unit ranges.
  if (pCreate->name[0] == 0) TAOS_RETURN(code);
  if (pCreate->stb[0] == 0) TAOS_RETURN(code);
  if (pCreate->igExists < 0 || pCreate->igExists > 1) TAOS_RETURN(code);
  if (pCreate->intervalUnit < 0) TAOS_RETURN(code);
  if (pCreate->slidingUnit < 0) TAOS_RETURN(code);
  if (pCreate->timezone < 0) TAOS_RETURN(code);
  if (pCreate->interval < 0) TAOS_RETURN(code);
  if (pCreate->offset < 0) TAOS_RETURN(code);
  if (pCreate->sliding < 0) TAOS_RETURN(code);

  // Payload lengths must be non-negative ...
  if (pCreate->exprLen < 0) TAOS_RETURN(code);
  if (pCreate->tagsFilterLen < 0) TAOS_RETURN(code);
  if (pCreate->sqlLen < 0) TAOS_RETURN(code);
  if (pCreate->astLen < 0) TAOS_RETURN(code);

  // ... and, when non-zero, must equal the string length plus its terminator.
  if (pCreate->exprLen != 0 && strlen(pCreate->expr) + 1 != pCreate->exprLen) TAOS_RETURN(code);
  if (pCreate->tagsFilterLen != 0 && strlen(pCreate->tagsFilter) + 1 != pCreate->tagsFilterLen) TAOS_RETURN(code);
  if (pCreate->sqlLen != 0 && strlen(pCreate->sql) + 1 != pCreate->sqlLen) TAOS_RETURN(code);
  if (pCreate->astLen != 0 && strlen(pCreate->ast) + 1 != pCreate->astLen) TAOS_RETURN(code);

  // The SMA name must parse as acct.db.table with a non-empty table part.
  SName smaName = {0};
  if (tNameFromString(&smaName, pCreate->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE) != TSDB_CODE_SUCCESS) {
    TAOS_RETURN(code);
  }
  if (*(char *)tNameGetTableName(&smaName) == 0) TAOS_RETURN(code);

  code = 0;
  TAOS_RETURN(code);
}
399

400
static int32_t mndGetStreamNameFromSmaName(char *streamName, char *smaName) {
×
401
  SName   n;
×
402
  int32_t code = tNameFromString(&n, smaName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
×
403
  if (TSDB_CODE_SUCCESS != code) {
×
404
    return code;
×
405
  }
406
  snprintf(streamName, TSDB_TABLE_FNAME_LEN, "%d.%s", n.acctId, n.tname);
×
407
  return TSDB_CODE_SUCCESS;
×
408
}
409

410
// Appends the encoded SMA row to the transaction's redo log and marks it
// SDB_STATUS_DROPPING.
// Returns 0 on success. When encoding fails, the real error code (terrno if
// set, else TSDB_CODE_MND_RETURN_VALUE_NULL) is returned; it used to be
// computed and then discarded in favour of -1.
static int32_t mndSetDropSmaRedoLogs(SMnode *pMnode, STrans *pTrans, SSmaObj *pSma) {
  int32_t  code = 0;
  SSdbRaw *pRedoRaw = mndSmaActionEncode(pSma);
  if (pRedoRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }
  TAOS_CHECK_RETURN(mndTransAppendRedolog(pTrans, pRedoRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING));

  return 0;
}
423

424
// Appends the encoded SMA row to the transaction's commit log and marks it
// SDB_STATUS_DROPPED.
// Returns 0 on success. When encoding fails, the real error code (terrno if
// set, else TSDB_CODE_MND_RETURN_VALUE_NULL) is returned; it used to be
// computed and then discarded in favour of -1.
static int32_t mndSetDropSmaCommitLogs(SMnode *pMnode, STrans *pTrans, SSmaObj *pSma) {
  int32_t  code = 0;
  SSdbRaw *pCommitRaw = mndSmaActionEncode(pSma);
  if (pCommitRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }
  TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pCommitRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED));

  return 0;
}
437

438
// Appends the encoded vgroup row to the transaction's redo log and marks it
// SDB_STATUS_DROPPING.
// Returns 0 on success. When encoding fails, the real error code (terrno if
// set, else TSDB_CODE_MND_RETURN_VALUE_NULL) is returned; it used to be
// computed and then discarded in favour of -1.
static int32_t mndSetDropSmaVgroupRedoLogs(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup) {
  int32_t  code = 0;
  SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroup);
  if (pVgRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }
  TAOS_CHECK_RETURN(mndTransAppendRedolog(pTrans, pVgRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pVgRaw, SDB_STATUS_DROPPING));

  return 0;
}
451

452
// Appends the encoded vgroup row to the transaction's commit log and marks
// it SDB_STATUS_DROPPED.
// Returns 0 on success. When encoding fails, the real error code (terrno if
// set, else TSDB_CODE_MND_RETURN_VALUE_NULL) is returned; it used to be
// computed and then discarded in favour of -1.
static int32_t mndSetDropSmaVgroupCommitLogs(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup) {
  int32_t  code = 0;
  SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroup);
  if (pVgRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }
  TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pVgRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pVgRaw, SDB_STATUS_DROPPED));

  return 0;
}
465

466
// Adds a TDMT_DND_DROP_VNODE redo action for the vgroup's first vnode,
// addressed to the dnode that hosts it. TSDB_CODE_VND_NOT_EXIST is accepted
// as a successful outcome of the action.
// Returns 0 on success, otherwise an error code.
static int32_t mndSetDropSmaVgroupRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) {
  int32_t    code = 0;
  SVnodeGid *pVgid = pVgroup->vnodeGid + 0;
  SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
  if (pDnode == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  STransAction action = {0};
  action.epSet = mndGetDnodeEpset(pDnode);

  // Build the request while the dnode reference is still held; the dnode
  // used to be released before being passed to mndBuildDropVnodeReq().
  int32_t contLen = 0;
  void   *pReq = mndBuildDropVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen);
  if (pReq == NULL) {
    // Capture the error before the release below can touch terrno.
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
  }
  mndReleaseDnode(pMnode, pDnode);
  if (pReq == NULL) {
    TAOS_RETURN(code);
  }

  action.pCont = pReq;
  action.contLen = contLen;
  action.msgType = TDMT_DND_DROP_VNODE;
  action.acceptableCode = TSDB_CODE_VND_NOT_EXIST;

  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
    // The request buffer is freed here when it could not be appended.
    taosMemoryFree(pReq);
    TAOS_RETURN(code);
  }

  TAOS_RETURN(code);
}
500

501
// Builds and prepares the "drop-sma" transaction: drops the backing stream,
// the SMA row and its destination vgroup, and bumps the source super table's
// SMA version.
// Returns 0 once the transaction has been prepared, otherwise an error code.
//
// Fixes relative to the previous version:
//  * the stream reference is released exactly once, at _OVER (it used to be
//    released inline and then again at _OVER);
//  * a missing stream, or one whose streamId does not match the SMA uid, now
//    yields an error instead of falling through with code == 0 and no
//    prepared transaction.
static int32_t mndDropSma(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SSmaObj *pSma) {
  int32_t     code = -1;
  SVgObj     *pVgroup = NULL;
  SStbObj    *pStb = NULL;
  STrans     *pTrans = NULL;
  SStreamObj *pStream = NULL;

  pVgroup = mndAcquireVgroup(pMnode, pSma->dstVgId);
  if (pVgroup == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }

  pStb = mndAcquireStb(pMnode, pSma->stb);
  if (pStb == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }

  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq, "drop-sma");
  if (pTrans == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }

  mInfo("trans:%d, used to drop sma:%s", pTrans->id, pSma->name);
  mndTransSetDbName(pTrans, pDb->name, NULL);
  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER);

  mndTransSetSerial(pTrans);

  char streamName[TSDB_TABLE_FNAME_LEN] = {0};
  code = mndGetStreamNameFromSmaName(streamName, pSma->name);
  if (TSDB_CODE_SUCCESS != code) {
    goto _OVER;
  }

  code = mndAcquireStream(pMnode, streamName, &pStream);
  if (code != 0) {
    goto _OVER;
  }
  if (pStream == NULL || pStream->pCreate->streamId != pSma->uid) {
    // No usable stream for this SMA: report failure rather than success.
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto _OVER;
  }

  // drop stream
  if ((code = mndStreamTransAppend(pStream, pTrans, SDB_STATUS_DROPPED)) < 0) {
    mError("stream:%s, failed to drop log since %s", pStream->pCreate->name, tstrerror(code));
    goto _OVER;
  }

  TAOS_CHECK_GOTO(mndSetDropSmaRedoLogs(pMnode, pTrans, pSma), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetDropSmaVgroupRedoLogs(pMnode, pTrans, pVgroup), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetDropSmaCommitLogs(pMnode, pTrans, pSma), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetDropSmaVgroupCommitLogs(pMnode, pTrans, pVgroup), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetUpdateSmaStbCommitLogs(pMnode, pTrans, pStb), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetDropSmaVgroupRedoActions(pMnode, pTrans, pDb, pVgroup), NULL, _OVER);
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);

  code = 0;

_OVER:
  // Single release point for everything acquired above; each pointer is
  // still NULL if its acquisition was never reached.
  mndTransDrop(pTrans);
  mndReleaseStream(pMnode, pStream);
  mndReleaseVgroup(pMnode, pVgroup);
  mndReleaseStb(pMnode, pStb);
  TAOS_RETURN(code);
}
570

571
// Handles a drop-SMA request: deserializes it, resolves the SMA through the
// global index lookup, checks write privilege on the owning database and
// starts the drop transaction.
// Returns TSDB_CODE_ACTION_IN_PROGRESS when the transaction was started,
// 0 when the SMA is absent and igNotExists is set, otherwise an error code.
static int32_t mndProcessDropSmaReq(SRpcMsg *pReq) {
  SMnode      *pMnode = pReq->info.node;
  int32_t      code = -1;
  SDbObj      *pDb = NULL;
  SSmaObj     *pSma = NULL;
  SMDropSmaReq dropReq = {0};

  TAOS_CHECK_GOTO(tDeserializeSMDropSmaReq(pReq->pCont, pReq->contLen, &dropReq), NULL, _OVER);

  mInfo("sma:%s, start to drop", dropReq.name);

  SSIdx idx = {0};
  code = mndAcquireGlobalIdx(pMnode, dropReq.name, SDB_SMA, &idx);
  if (code != 0) {
    goto _OVER;
  }
  pSma = idx.pIdx;

  if (pSma == NULL) {
    if (!dropReq.igNotExists) {
      code = TSDB_CODE_MND_SMA_NOT_EXIST;
      goto _OVER;
    }
    mInfo("sma:%s, not exist, ignore not exist is set", dropReq.name);
    code = 0;
    goto _OVER;
  }

  // Resolve the owning database from the fully qualified SMA name.
  SName name = {0};
  code = tNameFromString(&name, dropReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
  if (code != TSDB_CODE_SUCCESS) {
    goto _OVER;
  }
  char db[TSDB_TABLE_FNAME_LEN] = {0};
  (void)tNameGetFullDbName(&name, db);

  pDb = mndAcquireDb(pMnode, db);
  if (pDb == NULL) {
    code = TSDB_CODE_MND_DB_NOT_SELECTED;
    goto _OVER;
  }

  TAOS_CHECK_GOTO(mndCheckDbPrivilege(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_WRITE_DB, pDb), NULL, _OVER);

  code = mndDropSma(pMnode, pReq, pDb, pSma);
  if (code == 0) {
    code = TSDB_CODE_ACTION_IN_PROGRESS;
  }

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("sma:%s, failed to drop since %s", dropReq.name, tstrerror(code));
  }

  mndReleaseSma(pMnode, pSma);
  mndReleaseDb(pMnode, pDb);
  TAOS_RETURN(code);
}
627

628
// Fills *rsp with the description of the SMA index named in indexReq.
// *exist is set to false (and 0 returned) when the index cannot be found.
// On success *exist is true and rsp->indexExts holds the comma-separated
// function names parsed from the SMA expression.
// Returns 0 or the error from nodesStringToList().
//
// Fix: the node list parsed from the SMA expression is now destroyed after
// use; it used to be leaked on every successful call.
static int32_t mndGetSma(SMnode *pMnode, SUserIndexReq *indexReq, SUserIndexRsp *rsp, bool *exist) {
  int32_t  code = -1;
  SSmaObj *pSma = NULL;

  SSIdx idx = {0};
  if (0 == mndAcquireGlobalIdx(pMnode, indexReq->indexFName, SDB_SMA, &idx)) {
    pSma = idx.pIdx;
  } else {
    *exist = false;
    return 0;
  }

  if (pSma == NULL) {
    *exist = false;
    return 0;
  }

  memcpy(rsp->dbFName, pSma->db, sizeof(pSma->db));
  memcpy(rsp->tblFName, pSma->stb, sizeof(pSma->stb));
  tstrncpy(rsp->indexType, TSDB_INDEX_TYPE_SMA, TSDB_INDEX_TYPE_LEN);

  SNodeList *pList = NULL;
  int32_t    extOffset = 0;
  code = nodesStringToList(pSma->expr, &pList);
  if (0 == code) {
    SNode *node = NULL;
    FOREACH(node, pList) {
      SFunctionNode *pFunc = (SFunctionNode *)node;
      // Append ",<functionName>" (no leading comma for the first entry).
      extOffset += tsnprintf(rsp->indexExts + extOffset, sizeof(rsp->indexExts) - extOffset - 1, "%s%s",
                             (extOffset ? "," : ""), pFunc->functionName);
    }

    *exist = true;

    // The parsed list is only needed to collect the function names.
    nodesDestroyList(pList);
    pList = NULL;
  }

  mndReleaseSma(pMnode, pSma);
  return code;
}
666

667
// Collects every SMA defined on super table tbFName into rsp->pIndex.
// rsp->pIndex must already be an initialized array of STableIndexInfo.
// *exist is set to false when the super table cannot be found, and to true
// once at least one matching SMA has been appended; otherwise it is left
// untouched. Each appended entry owns a heap copy of the SMA expression.
// Returns 0 on success, otherwise an error code (entries appended before
// the failure stay in rsp->pIndex).
//
// Fixes relative to the previous version:
//  * the vgroup reference taken for the epset is released again;
//  * the per-SMA info record is zero-initialized before being pushed;
//  * memcpy is skipped for an empty expression.
int32_t mndGetTableSma(SMnode *pMnode, char *tbFName, STableIndexRsp *rsp, bool *exist) {
  int32_t  code = 0;
  SSmaObj *pSma = NULL;
  SSdb    *pSdb = pMnode->pSdb;
  void    *pIter = NULL;

  SStbObj *pStb = mndAcquireStb(pMnode, tbFName);
  if (NULL == pStb) {
    *exist = false;
    return TSDB_CODE_SUCCESS;
  }

  tstrncpy(rsp->dbFName, pStb->db, TSDB_DB_FNAME_LEN);
  // The table name is what follows "<db>." in the full table name.
  tstrncpy(rsp->tbName, pStb->name + strlen(pStb->db) + 1, TSDB_TABLE_NAME_LEN);
  rsp->suid = pStb->uid;
  rsp->version = pStb->smaVer;
  mndReleaseStb(pMnode, pStb);

  while (1) {
    pIter = sdbFetch(pSdb, SDB_SMA, pIter, (void **)&pSma);
    if (pIter == NULL) break;

    // Skip SMAs that belong to other super tables.
    if (pSma->stb[0] != tbFName[0] || strcmp(pSma->stb, tbFName)) {
      sdbRelease(pSdb, pSma);
      continue;
    }

    STableIndexInfo info = {0};
    info.intervalUnit = pSma->intervalUnit;
    info.slidingUnit = pSma->slidingUnit;
    info.interval = pSma->interval;
    info.offset = pSma->offset;
    info.sliding = pSma->sliding;
    info.dstTbUid = pSma->dstTbUid;
    info.dstVgId = pSma->dstVgId;

    SVgObj *pVg = mndAcquireVgroup(pMnode, pSma->dstVgId);
    if (pVg == NULL) {
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
      if (terrno != 0) code = terrno;
      sdbRelease(pSdb, pSma);
      sdbCancelFetch(pSdb, pIter);
      return code;
    }
    info.epSet = mndGetVgroupEpset(pMnode, pVg);
    // The vgroup is only needed for its epset; give the reference back.
    mndReleaseVgroup(pMnode, pVg);

    info.expr = taosMemoryMalloc(pSma->exprLen + 1);
    if (info.expr == NULL) {
      code = terrno;
      sdbRelease(pSdb, pSma);
      sdbCancelFetch(pSdb, pIter);
      return code;
    }

    if (pSma->exprLen > 0) {
      memcpy(info.expr, pSma->expr, pSma->exprLen);
    }
    info.expr[pSma->exprLen] = 0;

    if (NULL == taosArrayPush(rsp->pIndex, &info)) {
      code = terrno;
      taosMemoryFree(info.expr);
      sdbRelease(pSdb, pSma);
      sdbCancelFetch(pSdb, pIter);
      return code;
    }

    rsp->indexSize += sizeof(info) + pSma->exprLen + 1;
    *exist = true;

    sdbRelease(pSdb, pSma);
  }

  TAOS_RETURN(code);
}
740

741
// Handles a get-index request: looks the SMA index up via mndGetSma() and,
// when found, serializes the answer into pReq->info.rsp / rspLen.
// Returns 0 on success, TSDB_CODE_MND_DB_INDEX_NOT_EXIST when the index is
// unknown, otherwise an error code.
//
// Fixes relative to the previous version:
//  * the response buffer is freed when serialization into it fails;
//  * a failing size query is detected before the buffer is allocated;
//  * a serialization failure can no longer be reported as success when
//    terrno happens to be 0.
static int32_t mndProcessGetSmaReq(SRpcMsg *pReq) {
  SUserIndexReq indexReq = {0};
  SMnode       *pMnode = pReq->info.node;
  int32_t       code = -1;
  SUserIndexRsp rsp = {0};
  bool          exist = false;

  TAOS_CHECK_GOTO(tDeserializeSUserIndexReq(pReq->pCont, pReq->contLen, &indexReq), NULL, _OVER);

  code = mndGetSma(pMnode, &indexReq, &rsp, &exist);
  if (code) {
    goto _OVER;
  }

  if (!exist) {
    // TODO GET INDEX FROM FULLTEXT
    code = TSDB_CODE_MND_DB_INDEX_NOT_EXIST;
  } else {
    // First pass with a NULL buffer only computes the encoded size.
    int32_t contLen = tSerializeSUserIndexRsp(NULL, 0, &rsp);
    if (contLen < 0) {
      code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
      goto _OVER;
    }

    void *pRsp = rpcMallocCont(contLen);
    if (pRsp == NULL) {
      code = terrno;
      goto _OVER;
    }

    contLen = tSerializeSUserIndexRsp(pRsp, contLen, &rsp);
    if (contLen < 0) {
      code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
      // The buffer was never attached to the request, so free it here.
      rpcFreeCont(pRsp);
      goto _OVER;
    }

    pReq->info.rsp = pRsp;
    pReq->info.rspLen = contLen;

    code = 0;
  }

_OVER:
  if (code != 0) {
    mError("failed to get index %s since %s", indexReq.indexFName, tstrerror(code));
  }

  TAOS_RETURN(code);
}
785

786
// Handles a get-table-index request: gathers every SMA of the requested
// table via mndGetTableSma() and serializes the list into
// pReq->info.rsp / rspLen.
// Returns 0 on success, TSDB_CODE_MND_DB_INDEX_NOT_EXIST when the table has
// no SMA, otherwise an error code. The intermediate rsp is always released
// through tFreeSerializeSTableIndexRsp().
//
// Fixes relative to the previous version:
//  * the response buffer is freed when serialization into it fails;
//  * a failing size query is detected before the buffer is allocated;
//  * a serialization failure can no longer be reported as success when
//    terrno happens to be 0.
static int32_t mndProcessGetTbSmaReq(SRpcMsg *pReq) {
  STableIndexReq indexReq = {0};
  SMnode        *pMnode = pReq->info.node;
  int32_t        code = -1;
  STableIndexRsp rsp = {0};
  bool           exist = false;

  TAOS_CHECK_GOTO(tDeserializeSTableIndexReq(pReq->pCont, pReq->contLen, &indexReq), NULL, _OVER);

  rsp.pIndex = taosArrayInit(10, sizeof(STableIndexInfo));
  if (NULL == rsp.pIndex) {
    code = terrno;
    goto _OVER;
  }

  code = mndGetTableSma(pMnode, indexReq.tbFName, &rsp, &exist);
  if (code) {
    goto _OVER;
  }

  if (!exist) {
    code = TSDB_CODE_MND_DB_INDEX_NOT_EXIST;
  } else {
    // First pass with a NULL buffer only computes the encoded size.
    int32_t contLen = tSerializeSTableIndexRsp(NULL, 0, &rsp);
    if (contLen < 0) {
      code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
      goto _OVER;
    }

    void *pRsp = rpcMallocCont(contLen);
    if (pRsp == NULL) {
      code = terrno;
      goto _OVER;
    }

    contLen = tSerializeSTableIndexRsp(pRsp, contLen, &rsp);
    if (contLen < 0) {
      code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
      // The buffer was never attached to the request, so free it here.
      rpcFreeCont(pRsp);
      goto _OVER;
    }

    pReq->info.rsp = pRsp;
    pReq->info.rspLen = contLen;

    code = 0;
  }

_OVER:
  if (code != 0) {
    mError("failed to get table index %s since %s", indexReq.tbFName, tstrerror(code));
  }

  tFreeSerializeSTableIndexRsp(&rsp);
  TAOS_RETURN(code);
}
836

837
/**
 * Fill pBlock with up to `rows` SMA index rows.
 *
 * Iteration state lives in the SSmaAndTagIter stored in pShow->pIter, which
 * must already be allocated (it is dereferenced unconditionally). When
 * pShow->db is non-empty, only SMAs whose dbUid matches that database are
 * emitted.
 *
 * Columns written per row, in order: index name, database name, source table
 * name, dstVgId, createdTime, an empty string, and the literal "sma_index".
 *
 * @return number of rows written; 0 when pShow->db names a database that
 *         cannot be acquired; -1 when filling a row failed (the SMA iterator
 *         is cancelled and pShow->numOfRows is left untouched in that case).
 */
static int32_t mndRetrieveSma(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode  *pMnode = pReq->info.node;
  SSdb    *pSdb = pMnode->pSdb;
  int32_t  numOfRows = 0;
  SSmaObj *pSma = NULL;
  int32_t  cols = 0;
  int32_t  code = 0;

  SDbObj *pDb = NULL;
  if (strlen(pShow->db) > 0) {
    pDb = mndAcquireDb(pMnode, pShow->db);
    if (pDb == NULL) return 0;
  }

  SSmaAndTagIter *pIter = pShow->pIter;
  while (numOfRows < rows) {
    pIter->pSmaIter = sdbFetch(pSdb, SDB_SMA, pIter->pSmaIter, (void **)&pSma);
    if (pIter->pSmaIter == NULL) break;

    // Skip SMAs that belong to a different database than the one requested.
    if (NULL != pDb && pSma->dbUid != pDb->uid) {
      sdbRelease(pSdb, pSma);
      continue;
    }

    cols = 0;

    SName smaName = {0};
    SName stbName = {0};
    char  n1[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
    char  n2[TSDB_DB_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
    char  n3[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};

    code = tNameFromString(&smaName, pSma->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
    if (TSDB_CODE_SUCCESS == code) {
      STR_TO_VARSTR(n1, (char *)tNameGetTableName(&smaName));
      STR_TO_VARSTR(n2, (char *)mndGetDbStr(pSma->db));
      code = tNameFromString(&stbName, pSma->stb, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
    }

    SColumnInfoData *pColInfo = NULL;
    if (TSDB_CODE_SUCCESS == code) {
      STR_TO_VARSTR(n3, (char *)tNameGetTableName(&stbName));

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)n1, false);
    }
    if (TSDB_CODE_SUCCESS == code) {
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)n2, false);
    }
    if (TSDB_CODE_SUCCESS == code) {
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)n3, false);
    }
    if (TSDB_CODE_SUCCESS == code) {
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)&pSma->dstVgId, false);
    }
    if (TSDB_CODE_SUCCESS == code) {
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)&pSma->createdTime, false);
    }

    // This column is always emitted as an empty string for SMA rows.
    char col[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
    STR_TO_VARSTR(col, (char *)"");

    if (TSDB_CODE_SUCCESS == code) {
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)col, false);
    }

    if (TSDB_CODE_SUCCESS == code) {
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);

      char tag[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
      STR_TO_VARSTR(tag, (char *)"sma_index");
      code = colDataSetVal(pColInfo, numOfRows, (const char *)tag, false);
    }

    sdbRelease(pSdb, pSma);
    if (TSDB_CODE_SUCCESS != code) {
      sdbCancelFetch(pSdb, pIter->pSmaIter);
      // Do not leave a cancelled iterator behind in the shared iterator state.
      pIter->pSmaIter = NULL;
      numOfRows = -1;
      break;
    }

    // Count the row only once every column has been written.
    numOfRows++;
  }

  mndReleaseDb(pMnode, pDb);
  // The -1 failure marker must not be folded into the running row total.
  if (numOfRows > 0) {
    pShow->numOfRows += numOfRows;
  }
  return numOfRows;
}
927

928
// Common functions shared by SMA indexes and tag indexes
929
/**
 * Drop an index that may be either an SMA or a tag index.
 *
 * The request is first handled as an SMA drop. Only when that attempt returns
 * TSDB_CODE_MND_TAG_INDEX_ALREADY_EXIST or TSDB_CODE_MND_SMA_NOT_EXIST is
 * terrno reset and the same request retried as a tag-index drop.
 *
 * @return the result of whichever handler ran last.
 */
static int32_t mndProcessDropIdxReq(SRpcMsg *pReq) {
  int smaResult = mndProcessDropSmaReq(pReq);

  const bool retryAsTagIdx =
      (smaResult == TSDB_CODE_MND_TAG_INDEX_ALREADY_EXIST) || (smaResult == TSDB_CODE_MND_SMA_NOT_EXIST);
  if (!retryAsTagIdx) {
    return smaResult;
  }

  terrno = 0;
  return mndProcessDropTagIdxReq(pReq);
}
937

938
/**
 * Retrieve index rows: SMA indexes first, then tag indexes for whatever room
 * is left in the block.
 *
 * The combined iterator (SSmaAndTagIter) is lazily allocated into
 * pShow->pIter and freed again as soon as fewer than `rows` rows come back.
 *
 * @return number of rows produced, terrno when the iterator cannot be
 *         allocated, or the negative value reported by a failing retriever.
 */
static int32_t mndRetrieveIdx(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  if (pShow->pIter == NULL) {
    pShow->pIter = taosMemoryCalloc(1, sizeof(SSmaAndTagIter));
  }
  if (!pShow->pIter) {
    return terrno;
  }

  int32_t read = mndRetrieveSma(pReq, pShow, pBlock, rows);
  // A negative count is a failure marker, not a row count. It must neither
  // enlarge the tag-index request (rows - read) nor be summed with later rows.
  if (read >= 0 && read < rows) {
    int32_t tagRows = mndRetrieveTagIdx(pReq, pShow, pBlock, rows - read);
    read = (tagRows < 0) ? tagRows : read + tagRows;
  }

  // no more to read (or the retrieval failed)
  if (read < rows) {
    taosMemoryFree(pShow->pIter);
    pShow->pIter = NULL;
  }
  return read;
}
956
/**
 * Abort an in-flight index retrieval.
 *
 * Cancels both the SMA and the tag-index fetch held by the combined iterator
 * (when one is supplied) and then frees the iterator itself.
 *
 * @param pMnode mnode whose SDB owns the iterators.
 * @param pIter  SSmaAndTagIter previously stored in the show object; may be NULL.
 */
static void mndCancelRetrieveIdx(SMnode *pMnode, void *pIter) {
  SSmaAndTagIter *pBoth = (SSmaAndTagIter *)pIter;

  if (NULL != pBoth) {
    SSdb *pStore = pMnode->pSdb;
    sdbCancelFetchByType(pStore, pBoth->pSmaIter, SDB_SMA);
    sdbCancelFetchByType(pStore, pBoth->pIdxIter, SDB_IDX);
  }

  taosMemoryFree(pBoth);
}
×
965

966
/**
 * Populate pCxt->pSma from the create request and the objects held in pCxt.
 *
 * Names are copied with fixed-length memcpy. expr, sql and ast are copied as
 * pointers only, so the SMA object shares those buffers with
 * pCxt->pCreateSmaReq.
 */
static void initSMAObj(SCreateTSMACxt *pCxt) {
  SSmaObj *pObj = pCxt->pSma;

  /* names */
  memcpy(pObj->name, pCxt->pCreateSmaReq->name, TSDB_TABLE_FNAME_LEN);
  memcpy(pObj->stb, pCxt->pCreateSmaReq->stb, TSDB_TABLE_FNAME_LEN);
  memcpy(pObj->db, pCxt->pDb->name, TSDB_DB_FNAME_LEN);

  /* creator / owner */
  (void)snprintf(pObj->createUser, sizeof(pObj->createUser), "%s", pCxt->pOperUser->name);
  pObj->ownerId = pCxt->pOperUser->uid;

  /* a recursive TSMA records the name of the TSMA it is built on */
  if (pCxt->pBaseSma) {
    memcpy(pObj->baseSmaName, pCxt->pBaseSma->name, TSDB_TABLE_FNAME_LEN);
  }

  pObj->createdTime = taosGetTimestampMs();
  pObj->uid = mndGenerateUid(pCxt->pCreateSmaReq->name, TSDB_TABLE_FNAME_LEN);

  /* destination table and source identifiers */
  memcpy(pObj->dstTbName, pCxt->targetStbFullName, TSDB_TABLE_FNAME_LEN);
  pObj->dstTbUid = 0;  // not used
  if (pCxt->pSrcStb) {
    pObj->stbUid = pCxt->pSrcStb->uid;
  } else {
    pObj->stbUid = pCxt->pCreateSmaReq->normSourceTbUid;
  }
  pObj->dbUid = pCxt->pDb->uid;

  /* interval */
  pObj->interval = pCxt->pCreateSmaReq->interval;
  pObj->intervalUnit = pCxt->pCreateSmaReq->intervalUnit;
  //  pCxt->pSma->timezone = taosGetLocalTimezoneOffset();
  pObj->version = 1;

  /* expression / sql / ast: lengths and borrowed pointers */
  pObj->exprLen = pCxt->pCreateSmaReq->exprLen;
  pObj->sqlLen = pCxt->pCreateSmaReq->sqlLen;
  pObj->astLen = pCxt->pCreateSmaReq->astLen;
  pObj->expr = pCxt->pCreateSmaReq->expr;
  pObj->sql = pCxt->pCreateSmaReq->sql;
  pObj->ast = pCxt->pCreateSmaReq->ast;
}
×
992

993
/**
 * Append the encoded *old* database object to the transaction's prepare log
 * and mark that raw record SDB_STATUS_READY.
 *
 * pMnode and pNew are not read by this function.
 *
 * @return 0 on success; terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when terrno
 *         is 0) if encoding fails; otherwise the code of the failing call.
 */
static int32_t mndSetUpdateDbTsmaVersionPrepareLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pOld, SDbObj *pNew) {
  SSdbRaw *pRaw = mndDbActionEncode(pOld);
  if (NULL == pRaw) {
    int32_t encodeErr = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(encodeErr);
  }

  int32_t code = mndTransAppendPrepareLog(pTrans, pRaw);
  if (code != 0) {
    // The raw record was not taken over by the transaction, so free it here.
    sdbFreeRaw(pRaw);
    TAOS_RETURN(code);
  }

  TAOS_RETURN(sdbSetRawStatus(pRaw, SDB_STATUS_READY));
}
1008

1009
/**
 * Append the encoded *new* database object to the transaction's commit log
 * and mark that raw record SDB_STATUS_READY.
 *
 * pMnode and pOld are not read by this function.
 *
 * @return 0 on success; terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when terrno
 *         is 0) if encoding fails; otherwise the code of the failing call.
 */
static int32_t mndSetUpdateDbTsmaVersionCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pOld, SDbObj *pNew) {
  SSdbRaw *pRaw = mndDbActionEncode(pNew);
  if (NULL == pRaw) {
    int32_t encodeErr = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(encodeErr);
  }

  int32_t code = mndTransAppendCommitlog(pTrans, pRaw);
  if (code != 0) {
    // The raw record was not taken over by the transaction, so free it here.
    sdbFreeRaw(pRaw);
    TAOS_RETURN(code);
  }

  TAOS_RETURN(sdbSetRawStatus(pRaw, SDB_STATUS_READY));
}
1024

1025
/**
 * Build and prepare the "create-tsma" transaction.
 *
 * The transaction (rollback policy, serial execution) carries:
 *   - redo/undo/commit logs for pCxt->pSma,
 *   - a redo action that creates the stream (payload copied from the request),
 *   - undo actions that drop the stream and the target super table,
 *   - prepare/commit logs that bump the database's tsmaVersion by one.
 *
 * Action payloads are allocated here. Once an action has been appended
 * successfully the local pointer is cleared so that only payloads that never
 * reached the transaction are freed on exit.
 *
 * @return TSDB_CODE_SUCCESS, or the code of the first failing step.
 */
static int32_t mndCreateTSMATxnPrepare(SCreateTSMACxt *pCxt) {
  int32_t      code = -1;
  STransAction createStreamRedoAction = {0};
  STransAction createStreamUndoAction = {0};
  STransAction dropStbUndoAction = {0};
  SMDropStbReq dropStbReq = {0};
  SDbObj       newDb = {0};
  STrans *pTrans = mndTransCreate(pCxt->pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_TSMA, pCxt->pRpcReq, "create-tsma");
  if (!pTrans) {
    code = terrno;
    goto _OVER;
  }
  mndTransSetDbName(pTrans, pCxt->pDb->name, NULL);
  TAOS_CHECK_GOTO(mndTransCheckConflict(pCxt->pMnode, pTrans), NULL, _OVER);

  mndTransSetSerial(pTrans);
  mInfo("trans:%d, used to create tsma:%s", pTrans->id, pCxt->pCreateSmaReq->name);

  // redo: create the stream
  mndGetMnodeEpSet(pCxt->pMnode, &createStreamRedoAction.epSet);
  createStreamRedoAction.acceptableCode = TSDB_CODE_MND_STREAM_ALREADY_EXIST;
  createStreamRedoAction.msgType = TDMT_MND_CREATE_STREAM;
  createStreamRedoAction.contLen = pCxt->pCreateSmaReq->streamReqLen;
  createStreamRedoAction.pCont = taosMemoryCalloc(1, createStreamRedoAction.contLen);
  if (!createStreamRedoAction.pCont) {
    code = terrno;
    goto _OVER;
  }
  memcpy(createStreamRedoAction.pCont, pCxt->pCreateSmaReq->createStreamReq, createStreamRedoAction.contLen);

  // undo: drop the stream again
  createStreamUndoAction.epSet = createStreamRedoAction.epSet;
  createStreamUndoAction.acceptableCode = TSDB_CODE_MND_STREAM_NOT_EXIST;
  createStreamUndoAction.msgType = TDMT_MND_DROP_STREAM;
  createStreamUndoAction.contLen = pCxt->pCreateSmaReq->dropStreamReqLen;
  createStreamUndoAction.pCont = taosMemoryCalloc(1, createStreamUndoAction.contLen);
  if (!createStreamUndoAction.pCont) {
    code = terrno;
    goto _OVER;
  }
  memcpy(createStreamUndoAction.pCont, pCxt->pCreateSmaReq->dropStreamReq, createStreamUndoAction.contLen);

  // undo: drop the target super table
  dropStbReq.igNotExists = true;
  tstrncpy(dropStbReq.name, pCxt->targetStbFullName, TSDB_TABLE_FNAME_LEN);
  dropStbUndoAction.epSet = createStreamRedoAction.epSet;
  dropStbUndoAction.acceptableCode = TSDB_CODE_MND_STB_NOT_EXIST;
  dropStbUndoAction.retryCode = TSDB_CODE_MND_STREAM_MUST_BE_DELETED;
  dropStbUndoAction.msgType = TDMT_MND_STB_DROP;
  dropStbUndoAction.contLen = tSerializeSMDropStbReq(0, 0, &dropStbReq);
  dropStbUndoAction.pCont = taosMemoryCalloc(1, dropStbUndoAction.contLen);
  if (!dropStbUndoAction.pCont) {
    code = terrno;
    goto _OVER;
  }
  if (dropStbUndoAction.contLen !=
      tSerializeSMDropStbReq(dropStbUndoAction.pCont, dropStbUndoAction.contLen, &dropStbReq)) {
    mError("sma: %s, failed to create due to drop stb req encode failure", pCxt->pCreateSmaReq->name);
    code = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }

  memcpy(&newDb, pCxt->pDb, sizeof(SDbObj));
  newDb.tsmaVersion++;
  TAOS_CHECK_GOTO(mndSetCreateSmaRedoLogs(pCxt->pMnode, pTrans, pCxt->pSma), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetCreateSmaUndoLogs(pCxt->pMnode, pTrans, pCxt->pSma), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetCreateSmaCommitLogs(pCxt->pMnode, pTrans, pCxt->pSma), NULL, _OVER);

  // Each line after an append runs only on success: the payload then belongs
  // to the transaction and must not be freed below.
  TAOS_CHECK_GOTO(mndTransAppendRedoAction(pTrans, &createStreamRedoAction), NULL, _OVER);
  createStreamRedoAction.pCont = NULL;
  TAOS_CHECK_GOTO(mndTransAppendUndoAction(pTrans, &createStreamUndoAction), NULL, _OVER);
  createStreamUndoAction.pCont = NULL;
  TAOS_CHECK_GOTO(mndTransAppendUndoAction(pTrans, &dropStbUndoAction), NULL, _OVER);
  dropStbUndoAction.pCont = NULL;

  TAOS_CHECK_GOTO(mndSetUpdateDbTsmaVersionPrepareLogs(pCxt->pMnode, pTrans, pCxt->pDb, &newDb), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetUpdateDbTsmaVersionCommitLogs(pCxt->pMnode, pTrans, pCxt->pDb, &newDb), NULL, _OVER);
  TAOS_CHECK_GOTO(mndTransPrepare(pCxt->pMnode, pTrans), NULL, _OVER);

  code = TSDB_CODE_SUCCESS;

_OVER:
  // Release payloads that were allocated but never handed to the transaction.
  taosMemoryFree(createStreamRedoAction.pCont);
  taosMemoryFree(createStreamUndoAction.pCont);
  taosMemoryFree(dropStbUndoAction.pCont);
  mndTransDrop(pTrans);
  TAOS_RETURN(code);
}
1094

1095
/**
 * Create a TSMA described by pCxt.
 *
 * A stack-local SSmaObj is filled through initSMAObj(), the request's
 * expression string is parsed into a node list (exposed to callees as
 * pCxt->pProjects), and the creation transaction is prepared.
 *
 * Both pCxt->pProjects and pCxt->pSma are reset to NULL before returning: the
 * node list is destroyed here and the SMA object lives on this stack frame,
 * so neither pointer may outlive the call.
 *
 * @return 0 on success, otherwise the code of the failing step.
 */
static int32_t mndCreateTSMA(SCreateTSMACxt *pCxt) {
  int32_t    code = 0;
  SSmaObj    sma = {0};
  SNodeList *pProjects = NULL;

  pCxt->pSma = &sma;
  initSMAObj(pCxt);

  code = nodesStringToList(pCxt->pCreateSmaReq->expr, &pProjects);
  if (TSDB_CODE_SUCCESS != code) {
    goto _OVER;
  }
  pCxt->pProjects = pProjects;

  code = mndCreateTSMATxnPrepare(pCxt);
  if (TSDB_CODE_SUCCESS != code) {
    goto _OVER;
  }

  mInfo("sma:%s, uid:%" PRIi64 " create on stb:%" PRIi64 " dstTb:%s dstVg:%d", pCxt->pCreateSmaReq->name, sma.uid,
        sma.stbUid, sma.dstTbName, sma.dstVgId);

_OVER:
  if (pProjects) nodesDestroyList(pProjects);
  pCxt->pProjects = NULL;
  // `sma` goes out of scope on return; do not leave a dangling pointer in the context.
  pCxt->pSma = NULL;
  TAOS_RETURN(code);
}
1123

1124
static int32_t mndTSMAGenerateOutputName(const char *tsmaName, char *streamName, char *targetStbName) {
×
1125
  SName   smaName;
×
1126
  int32_t code = tNameFromString(&smaName, tsmaName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
×
1127
  if (TSDB_CODE_SUCCESS != code) {
×
1128
    return code;
×
1129
  }
1130
  snprintf(streamName, TSDB_TABLE_FNAME_LEN, "%d.%s", smaName.acctId, smaName.tname);
×
1131
  snprintf(targetStbName, TSDB_TABLE_FNAME_LEN, "%s" TSMA_RES_STB_POSTFIX, tsmaName);
×
1132
  return TSDB_CODE_SUCCESS;
×
1133
}
1134

1135
/**
 * Handle a CREATE TSMA request.
 *
 * Steps, in order:
 *   1. reject the request when the number of SMA objects has reached tsMaxTsmaNum;
 *   2. deserialize and validate the request, acquire the operating user;
 *   3. acquire the source super table unless a normal-table uid was supplied
 *      (a missing super table is tolerated only for recursive TSMAs);
 *   4. derive the stream / target super table names and make sure neither the
 *      TSMA, the target super table nor the stream already exists;
 *   5. acquire the database and check USE-db and create-table privileges;
 *   6. for a recursive TSMA, acquire the base TSMA and inherit its stbUid when
 *      no source super table was found;
 *   7. run mndCreateTSMA().
 *
 * @return TSDB_CODE_ACTION_IN_PROGRESS once the transaction has been prepared,
 *         0 when the TSMA exists and igExists is set, otherwise an error code.
 */
static int32_t mndProcessCreateTSMAReq(SRpcMsg *pReq) {
#ifdef WINDOWS
  TAOS_RETURN(TSDB_CODE_MND_INVALID_PLATFORM);
#endif
  SMnode        *pMnode = pReq->info.node;
  int32_t        code = -1;
  SDbObj        *pDb = NULL;
  SStbObj       *pStb = NULL;
  SSmaObj       *pSma = NULL;
  SSmaObj       *pBaseTsma = NULL;
  SStreamObj    *pStream = NULL;
  SUserObj      *pOperUser = NULL;
  int64_t        mTraceId = TRACE_GET_ROOTID(&pReq->info.traceId);
  SMCreateSmaReq createReq = {0};

  if (sdbGetSize(pMnode->pSdb, SDB_SMA) >= tsMaxTsmaNum) {
    code = TSDB_CODE_MND_MAX_TSMA_NUM_EXCEEDED;
    goto _OVER;
  }

  if (tDeserializeSMCreateSmaReq(pReq->pCont, pReq->contLen, &createReq) != 0) {
    code = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }

  mInfo("start to create tsma: %s", createReq.name);
  if ((code = mndCheckCreateSmaReq(&createReq)) != 0) goto _OVER;
  if ((code = mndAcquireUser(pMnode, RPC_MSG_USER(pReq), &pOperUser)) != 0) goto _OVER;

  if (createReq.normSourceTbUid == 0) {
    pStb = mndAcquireStb(pMnode, createReq.stb);
    if (!pStb && !createReq.recursiveTsma) {
      mError("tsma:%s, failed to create since stb:%s not exist", createReq.name, createReq.stb);
      code = TSDB_CODE_MND_STB_NOT_EXIST;
      goto _OVER;
    }
  }

  char streamName[TSDB_TABLE_FNAME_LEN] = {0};
  char streamTargetStbFullName[TSDB_TABLE_FNAME_LEN] = {0};
  code = mndTSMAGenerateOutputName(createReq.name, streamName, streamTargetStbFullName);
  if (TSDB_CODE_SUCCESS != code) {
    mInfo("tsma:%s, faield to generate name", createReq.name);
    goto _OVER;
  }

  pSma = sdbAcquire(pMnode->pSdb, SDB_SMA, createReq.name);
  if (pSma && createReq.igExists) {
    mInfo("tsma:%s, already exists in sma:%s, ignore exist is set", createReq.name, pSma->name);
    code = 0;
    goto _OVER;
  }

  if (pSma) {
    code = TSDB_CODE_MND_SMA_ALREADY_EXIST;
    goto _OVER;
  }

  SStbObj *pTargetStb = mndAcquireStb(pMnode, streamTargetStbFullName);
  if (pTargetStb) {
    code = TSDB_CODE_TDB_STB_ALREADY_EXIST;
    mError("tsma: %s, failed to create since output stable already exists: %s", createReq.name,
           streamTargetStbFullName);
    // Released here rather than at _OVER: the variable is declared after
    // earlier jumps to _OVER and would be uninitialized on those paths.
    mndReleaseStb(pMnode, pTargetStb);
    goto _OVER;
  }

  code = mndAcquireStream(pMnode, streamName, &pStream);
  if (pStream != NULL || code != TSDB_CODE_MND_STREAM_NOT_EXIST) {
    mError("tsma:%s, failed to create since stream:%s already exist", createReq.name, streamName);
    code = TSDB_CODE_MND_SMA_ALREADY_EXIST;
    goto _OVER;
  }

  SName name = {0};
  code = tNameFromString(&name, createReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
  if (TSDB_CODE_SUCCESS != code) {
    goto _OVER;
  }
  char db[TSDB_TABLE_FNAME_LEN] = {0};
  (void)tNameGetFullDbName(&name, db);

  pDb = mndAcquireDb(pMnode, db);
  if (pDb == NULL) {
    code = TSDB_CODE_MND_DB_NOT_SELECTED;
    goto _OVER;
  }

  if ((code = mndCheckDbPrivilege(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_USE_DB, pDb))) {
    goto _OVER;
  }
  if ((code = mndCheckObjPrivilegeRec(pMnode, pOperUser, PRIV_TBL_CREATE, PRIV_OBJ_DB, pDb->ownerId, name.acctId,
                                      name.dbname, NULL))) {
    goto _OVER;
  }

  if (createReq.recursiveTsma) {
    pBaseTsma = sdbAcquire(pMnode->pSdb, SDB_SMA, createReq.baseTsmaName);
    if (!pBaseTsma) {
      mError("base tsma: %s not found when creating recursive tsma", createReq.baseTsmaName);
      code = TSDB_CODE_MND_SMA_NOT_EXIST;
      goto _OVER;
    }
    if (!pStb) {
      // No source super table was found: inherit the source uid from the base TSMA.
      createReq.normSourceTbUid = pBaseTsma->stbUid;
    }
  }

  SCreateTSMACxt cxt = {
      .pMnode = pMnode,
      .pCreateSmaReq = &createReq,
      .streamName = streamName,
      .targetStbFullName = streamTargetStbFullName,
      .pDb = pDb,
      .pRpcReq = pReq,
      .pSma = NULL,
      .pBaseSma = pBaseTsma,
      .pSrcStb = pStb,
      .pOperUser = pOperUser,
  };

  code = mndCreateTSMA(&cxt);
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("tsma:%s, failed to create since %s", createReq.name, tstrerror(code));
  }

  if (pStb) mndReleaseStb(pMnode, pStb);
  if (pBaseTsma) mndReleaseSma(pMnode, pBaseTsma);
  mndReleaseSma(pMnode, pSma);
  mndReleaseStream(pMnode, pStream);
  mndReleaseDb(pMnode, pDb);
  mndReleaseUser(pMnode, pOperUser);
  tFreeSMCreateSmaReq(&createReq);

  TAOS_RETURN(code);
}
1274

1275
/**
 * Build and prepare the "drop-tsma" transaction.
 *
 * The transaction (retry policy, serial execution) carries:
 *   - redo/commit logs that drop pCxt->pSma,
 *   - a redo action that drops the stream (payload copied from the request),
 *   - a redo action that drops the output super table,
 *   - prepare/commit logs that bump the database's tsmaVersion by one.
 *
 * Action payloads are allocated here. Once an action has been appended
 * successfully the local pointer is cleared so that only payloads that never
 * reached the transaction are freed on exit.
 *
 * @return TSDB_CODE_SUCCESS, or the code of the first failing step.
 */
static int32_t mndDropTSMA(SCreateTSMACxt *pCxt) {
  int32_t      code = -1;
  STransAction dropStreamRedoAction = {0};
  STransAction dropStbRedoAction = {0};
  SMDropStbReq dropStbReq = {0};
  SDbObj       newDb = {0};
  STrans      *pTrans = mndTransCreate(pCxt->pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_TSMA, pCxt->pRpcReq, "drop-tsma");
  if (!pTrans) {
    code = terrno;
    goto _OVER;
  }
  mndTransSetDbName(pTrans, pCxt->pDb->name, NULL);
  // Propagate the real conflict code instead of the initial -1.
  TAOS_CHECK_GOTO(mndTransCheckConflict(pCxt->pMnode, pTrans), NULL, _OVER);
  mndTransSetSerial(pTrans);

  mndGetMnodeEpSet(pCxt->pMnode, &dropStreamRedoAction.epSet);
  dropStreamRedoAction.acceptableCode = TSDB_CODE_MND_STREAM_NOT_EXIST;
  dropStreamRedoAction.msgType = TDMT_MND_DROP_STREAM;
  dropStreamRedoAction.contLen = pCxt->pDropSmaReq->dropStreamReqLen;
  dropStreamRedoAction.pCont = taosMemoryCalloc(1, dropStreamRedoAction.contLen);
  if (!dropStreamRedoAction.pCont) {
    code = terrno;
    goto _OVER;
  }
  memcpy(dropStreamRedoAction.pCont, pCxt->pDropSmaReq->dropStreamReq, dropStreamRedoAction.contLen);

  // output stable is not dropped when dropping stream, dropping it when dropping tsma
  dropStbReq.igNotExists = false;
  tstrncpy(dropStbReq.name, pCxt->targetStbFullName, TSDB_TABLE_FNAME_LEN);
  dropStbReq.sql = "drop";
  dropStbReq.sqlLen = 5;

  mndGetMnodeEpSet(pCxt->pMnode, &dropStbRedoAction.epSet);
  dropStbRedoAction.acceptableCode = TSDB_CODE_MND_STB_NOT_EXIST;
  dropStbRedoAction.msgType = TDMT_MND_STB_DROP;
  dropStbRedoAction.contLen = tSerializeSMDropStbReq(0, 0, &dropStbReq);
  dropStbRedoAction.pCont = taosMemoryCalloc(1, dropStbRedoAction.contLen);
  if (!dropStbRedoAction.pCont) {
    code = terrno;
    goto _OVER;
  }
  if (dropStbRedoAction.contLen !=
      tSerializeSMDropStbReq(dropStbRedoAction.pCont, dropStbRedoAction.contLen, &dropStbReq)) {
    mError("tsma: %s, failedto drop due to drop stb req encode failure", pCxt->pDropSmaReq->name);
    code = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }

  memcpy(&newDb, pCxt->pDb, sizeof(SDbObj));
  newDb.tsmaVersion++;
  TAOS_CHECK_GOTO(mndSetDropSmaRedoLogs(pCxt->pMnode, pTrans, pCxt->pSma), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetDropSmaCommitLogs(pCxt->pMnode, pTrans, pCxt->pSma), NULL, _OVER);

  // Each line after an append runs only on success: the payload then belongs
  // to the transaction and must not be freed below.
  TAOS_CHECK_GOTO(mndTransAppendRedoAction(pTrans, &dropStreamRedoAction), NULL, _OVER);
  dropStreamRedoAction.pCont = NULL;
  TAOS_CHECK_GOTO(mndTransAppendRedoAction(pTrans, &dropStbRedoAction), NULL, _OVER);
  dropStbRedoAction.pCont = NULL;

  TAOS_CHECK_GOTO(mndSetUpdateDbTsmaVersionPrepareLogs(pCxt->pMnode, pTrans, pCxt->pDb, &newDb), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetUpdateDbTsmaVersionCommitLogs(pCxt->pMnode, pTrans, pCxt->pDb, &newDb), NULL, _OVER);
  TAOS_CHECK_GOTO(mndTransPrepare(pCxt->pMnode, pTrans), NULL, _OVER);
  code = TSDB_CODE_SUCCESS;

_OVER:
  // Release payloads that were allocated but never handed to the transaction.
  taosMemoryFree(dropStreamRedoAction.pCont);
  taosMemoryFree(dropStbRedoAction.pCont);
  mndTransDrop(pTrans);
  TAOS_RETURN(code);
}
1332

1333
/**
 * Tell whether any SMA object in the SDB names pSma as its base.
 *
 * Walks every SDB_SMA entry and compares its baseSmaName against pSma->name
 * (bounded by TSDB_TABLE_FNAME_LEN). The walk is cancelled as soon as a match
 * is found.
 *
 * @return true if at least one SMA is based on pSma, false otherwise.
 */
static bool hasRecursiveTsmasBasedOnMe(SMnode *pMnode, const SSmaObj *pSma) {
  SSmaObj *pCandidate = NULL;
  void    *pCursor = NULL;

  for (pCursor = sdbFetch(pMnode->pSdb, SDB_SMA, pCursor, (void **)&pCandidate); pCursor != NULL;
       pCursor = sdbFetch(pMnode->pSdb, SDB_SMA, pCursor, (void **)&pCandidate)) {
    const bool basedOnMe = (strncmp(pCandidate->baseSmaName, pSma->name, TSDB_TABLE_FNAME_LEN) == 0);

    // The fetched object is released in both outcomes before anything else happens.
    sdbRelease(pMnode->pSdb, pCandidate);
    if (basedOnMe) {
      sdbCancelFetch(pMnode->pSdb, pCursor);
      return true;
    }
  }

  return false;
}
1348

1349
/**
 * Handle a DROP TSMA request.
 *
 * Deserializes the request, derives the stream / target super table names,
 * looks up the TSMA and its database, checks the caller's drop privilege,
 * refuses to drop a TSMA that other TSMAs are based on, and finally runs
 * mndDropTSMA().
 *
 * @return TSDB_CODE_ACTION_IN_PROGRESS once the transaction has been prepared,
 *         0 when the TSMA is missing and igNotExists is set, otherwise an
 *         error code.
 */
static int32_t mndProcessDropTSMAReq(SRpcMsg *pReq) {
  SMnode      *pMnode = pReq->info.node;
  SMDropSmaReq dropReq = {0};
  SSmaObj     *pSma = NULL;
  SDbObj      *pDb = NULL;
  SStbObj     *pStb = NULL;
  SUserObj    *pUser = NULL;
  int32_t      code = -1;
  char         streamName[TSDB_TABLE_FNAME_LEN] = {0};
  char         streamTargetStbFullName[TSDB_TABLE_FNAME_LEN] = {0};
  char         db[TSDB_TABLE_FNAME_LEN] = {0};
  SName        name = {0};

  if (TSDB_CODE_SUCCESS != tDeserializeSMDropSmaReq(pReq->pCont, pReq->contLen, &dropReq)) {
    code = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }

  code = mndTSMAGenerateOutputName(dropReq.name, streamName, streamTargetStbFullName);
  if (code != TSDB_CODE_SUCCESS) {
    goto _OVER;
  }

  // Held only for the duration of the request; released at _OVER.
  pStb = mndAcquireStb(pMnode, streamTargetStbFullName);

  pSma = mndAcquireSma(pMnode, dropReq.name);
  if (pSma == NULL) {
    code = dropReq.igNotExists ? 0 : TSDB_CODE_MND_SMA_NOT_EXIST;
    goto _OVER;
  }

  code = tNameFromString(&name, dropReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
  if (code != TSDB_CODE_SUCCESS) {
    goto _OVER;
  }
  (void)tNameGetFullDbName(&name, db);

  pDb = mndAcquireDb(pMnode, db);
  if (pDb == NULL) {
    code = TSDB_CODE_MND_DB_NOT_EXIST;
    goto _OVER;
  }

  // if ((code = mndCheckDbPrivilege(pMnode, RPC_MSG_USER(pReq), MND_OPER_WRITE_DB, pDb)) != 0) {
  //   goto _OVER;
  // }
  code = mndAcquireUser(pMnode, RPC_MSG_USER(pReq), &pUser);
  if (code != 0) goto _OVER;

  code = mndCheckObjPrivilegeRecF(pMnode, pUser, PRIV_CM_DROP, PRIV_OBJ_TSMA, pSma->ownerId, pSma->db, pSma->name);
  if (code) {
    goto _OVER;
  }

  // A TSMA that serves as the base of other TSMAs cannot be dropped.
  if (hasRecursiveTsmasBasedOnMe(pMnode, pSma)) {
    code = TSDB_CODE_MND_INVALID_DROP_TSMA;
    goto _OVER;
  }

  SCreateTSMACxt cxt = {
      .pDb = pDb,
      .pMnode = pMnode,
      .pRpcReq = pReq,
      .pSma = pSma,
      .streamName = streamName,
      .targetStbFullName = streamTargetStbFullName,
      .pDropSmaReq = &dropReq,
  };

  code = mndDropTSMA(&cxt);
  if (0 == code) {
    code = TSDB_CODE_ACTION_IN_PROGRESS;
  }

_OVER:
  tFreeSMDropSmaReq(&dropReq);
  mndReleaseStb(pMnode, pStb);
  mndReleaseSma(pMnode, pSma);
  mndReleaseDb(pMnode, pDb);
  mndReleaseUser(pMnode, pUser);
  TAOS_RETURN(code);
}
1430

1431
/**
 * Fill pBlock with up to `rows` TSMA rows.
 *
 * The iterator (SSmaAndTagIter) is lazily allocated into pShow->pIter and
 * freed once fewer than `rows` rows are returned. When pShow->db is set only
 * TSMAs of that database are listed. Unless the caller passes the system-level
 * show check (account wide, or for pShow->db), each TSMA is additionally
 * filtered through a per-object privilege check.
 *
 * Columns written per row, in order: tsma name, db name, source table name,
 * db name (again), target table name, stream name (same value as the tsma
 * name), created time, interval, create sql, and the comma separated list of
 * TSMA-supported function aliases taken from the stored AST.
 *
 * @return number of rows written, or an error code; on error the SMA fetch is
 *         cancelled and pShow->numOfRows is left unchanged.
 */
static int32_t mndRetrieveTSMA(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SDbObj          *pDb = NULL;
  int32_t          numOfRows = 0;
  SSmaObj         *pSma = NULL;
  SMnode          *pMnode = pReq->info.node;
  SSdb            *pSdb = pMnode->pSdb;
  int32_t          code = 0, lino = 0;
  SUserObj        *pUser = NULL;
  char             objFName[TSDB_OBJ_FNAME_LEN + 1] = {0};
  bool             showAll = false;
  int64_t          dbUid = 0;
  SColumnInfoData *pColInfo;

  if (pShow->pIter == NULL) {
    pShow->pIter = taosMemoryCalloc(1, sizeof(SSmaAndTagIter));
  }
  if (!pShow->pIter) {
    return terrno;
  }
  if (pShow->db[0]) {
    pDb = mndAcquireDb(pMnode, pShow->db);
    if (!pDb) {
      taosMemoryFreeClear(pShow->pIter);
      return terrno;
    }
  }

  TAOS_CHECK_EXIT(mndAcquireUser(pMnode, RPC_MSG_USER(pReq), &pUser));
  int32_t objLevel = privObjGetLevel(PRIV_OBJ_TSMA);
  (void)snprintf(objFName, sizeof(objFName), "%d.*", pUser->acctId);
  showAll = (0 == mndCheckSysObjPrivilege(pMnode, pUser, RPC_MSG_TOKEN(pReq), PRIV_CM_SHOW, PRIV_OBJ_TSMA, 0, objFName,
                                          objLevel == 0 ? NULL : "*"));
  if (!showAll && pShow->db[0] != 0) {
    showAll = (0 == mndCheckSysObjPrivilege(pMnode, pUser, RPC_MSG_TOKEN(pReq), PRIV_CM_SHOW, PRIV_OBJ_TSMA, pUser->uid,
                                            pShow->db, objLevel == 0 ? NULL : "*"));
  }

  SSmaAndTagIter *pIter = pShow->pIter;
  while (numOfRows < rows) {
    pIter->pSmaIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter->pSmaIter, (void **)&pSma);
    if (pIter->pSmaIter == NULL) break;
    SDbObj *pSrcDb = mndAcquireDb(pMnode, pSma->db);

    // Skip TSMAs of other databases and TSMAs whose own database cannot be acquired.
    if ((pDb && pSma->dbUid != pDb->uid) || !pSrcDb) {
      sdbRelease(pMnode->pSdb, pSma);
      if (pSrcDb) mndReleaseDb(pMnode, pSrcDb);
      continue;
    }

    int32_t cols = 0;
    SName   n = {0};

    code = tNameFromString(&n, pSma->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);

    if (TSDB_CODE_SUCCESS != code) {
      sdbRelease(pSdb, pSma);
      if (pSrcDb) mndReleaseDb(pMnode, pSrcDb);
      // The iteration is abandoned here, so the fetch has to be cancelled just
      // like on the row-fill failure path below.
      sdbCancelFetch(pSdb, pIter->pSmaIter);
      break;
    }

    if (!showAll) {
      (void)snprintf(objFName, sizeof(objFName), "%s", pSma->db);
      if (mndCheckObjPrivilegeRecF(pMnode, pUser, PRIV_CM_SHOW, PRIV_OBJ_TSMA, pSma->ownerId, objFName,
                                   objLevel == 0 ? NULL : n.tname)) {  // 1.db1.tsma1
        sdbRelease(pSdb, pSma);
        if (pSrcDb) mndReleaseDb(pMnode, pSrcDb);
        continue;
      }
    }

    // tsma name
    char smaName[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
    if (TSDB_CODE_SUCCESS == code) {
      STR_TO_VARSTR(smaName, (char *)tNameGetTableName(&n));
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)smaName, false);
    }

    // db name
    char db[TSDB_DB_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
    if (TSDB_CODE_SUCCESS == code) {
      STR_TO_VARSTR(db, (char *)mndGetDbStr(pSma->db));
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)db, false);
    }

    // source table name
    if (TSDB_CODE_SUCCESS == code) {
      code = tNameFromString(&n, pSma->stb, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
    }
    char srcTb[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
    if (TSDB_CODE_SUCCESS == code) {
      STR_TO_VARSTR(srcTb, (char *)tNameGetTableName(&n));
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)srcTb, false);
    }

    // db name again (written from the same buffer as the db column above)
    if (TSDB_CODE_SUCCESS == code) {
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)db, false);
    }

    // target table name
    if (TSDB_CODE_SUCCESS == code) {
      code = tNameFromString(&n, pSma->dstTbName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
    }

    if (TSDB_CODE_SUCCESS == code) {
      char targetTb[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
      STR_TO_VARSTR(targetTb, (char *)tNameGetTableName(&n));
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)targetTb, false);
    }

    if (TSDB_CODE_SUCCESS == code) {
      // stream name
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)smaName, false);
    }

    // created time
    if (TSDB_CODE_SUCCESS == code) {
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)(&pSma->createdTime), false);
    }

    // interval
    char    interval[64 + VARSTR_HEADER_SIZE] = {0};
    int32_t len = 0;
    if (TSDB_CODE_SUCCESS == code) {
      if (!IS_CALENDAR_TIME_DURATION(pSma->intervalUnit)) {
        // Non-calendar intervals are printed with the source database's precision unit.
        len = tsnprintf(interval + VARSTR_HEADER_SIZE, 64, "%" PRId64 "%c", pSma->interval,
                        getPrecisionUnit(pSrcDb->cfg.precision));
      } else {
        len = tsnprintf(interval + VARSTR_HEADER_SIZE, 64, "%" PRId64 "%c", pSma->interval, pSma->intervalUnit);
      }
      varDataSetLen(interval, len);
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, interval, false);
    }

    char buf[TSDB_MAX_SAVED_SQL_LEN + VARSTR_HEADER_SIZE] = {0};
    if (TSDB_CODE_SUCCESS == code) {
      // create sql
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      len = tsnprintf(buf + VARSTR_HEADER_SIZE, TSDB_MAX_SAVED_SQL_LEN, "%s", pSma->sql);
      varDataSetLen(buf, TMIN(len, TSDB_MAX_SAVED_SQL_LEN));
      code = colDataSetVal(pColInfo, numOfRows, buf, false);
    }

    // func list (buf is reused; the sql column has already been written from it)
    len = 0;
    SNode *pNode = NULL, *pFunc = NULL;
    if (TSDB_CODE_SUCCESS == code) {
      code = nodesStringToNode(pSma->ast, &pNode);
    }
    if (TSDB_CODE_SUCCESS == code) {
      char *start = buf + VARSTR_HEADER_SIZE;
      FOREACH(pFunc, ((SSelectStmt *)pNode)->pProjectionList) {
        if (nodeType(pFunc) == QUERY_NODE_FUNCTION) {
          SFunctionNode *pFuncNode = (SFunctionNode *)pFunc;
          if (!fmIsTSMASupportedFunc(pFuncNode->funcId)) continue;
          len += tsnprintf(start, TSDB_MAX_SAVED_SQL_LEN - len, "%s%s", start != buf + VARSTR_HEADER_SIZE ? "," : "",
                           ((SExprNode *)pFunc)->userAlias);
          if (len >= TSDB_MAX_SAVED_SQL_LEN) {
            len = TSDB_MAX_SAVED_SQL_LEN;
            break;
          }
          start = buf + VARSTR_HEADER_SIZE + len;
        }
      }
      nodesDestroyNode(pNode);
    }

    if (TSDB_CODE_SUCCESS == code) {
      varDataSetLen(buf, len);
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, buf, false);
    }

    numOfRows++;
    mndReleaseSma(pMnode, pSma);
    mndReleaseDb(pMnode, pSrcDb);
    if (TSDB_CODE_SUCCESS != code) {
      sdbCancelFetch(pMnode->pSdb, pIter->pSmaIter);
      numOfRows = code;
      break;
    }
  }
_exit:
  if (code != 0) {
    numOfRows = code;
  }
  mndReleaseDb(pMnode, pDb);
  mndReleaseUser(pMnode, pUser);
  // Only real row counts are accumulated; an error code must not be summed in.
  if (numOfRows > 0) {
    pShow->numOfRows += numOfRows;
  }
  if (numOfRows < rows) {
    taosMemoryFree(pShow->pIter);
    pShow->pIter = NULL;
  }
  return numOfRows;
}
1628

1629
/*
 * Cancel an in-progress TSMA retrieve.
 *
 * pIter is the SSmaAndTagIter kept by the show framework. When it is non-NULL
 * the SMA fetch it wraps is cancelled first; the iterator itself is then freed
 * (freeing is attempted for a NULL iterator as well).
 */
static void mndCancelRetrieveTSMA(SMnode *pMnode, void *pIter) {
  SSmaAndTagIter *pSmaTagIter = (SSmaAndTagIter *)pIter;

  if (NULL == pSmaTagIter) {
    taosMemoryFree(pSmaTagIter);
    return;
  }

  sdbCancelFetchByType(pMnode->pSdb, pSmaTagIter->pSmaIter, SDB_SMA);
  taosMemoryFree(pSmaTagIter);
}
×
1637

UNCOV
1638
/*
 * Populate a STableTSMAInfo from a TSMA object and its destination super table.
 *
 * @param pSma      TSMA whose interval, ids, names and ast are copied into pInfo.
 * @param pDestStb  destination super table; supplies destTbUid, the tag schemas
 *                  and the "used columns" schemas.
 * @param pInfo     output; its arrays (pFuncs, pTags, pUsedCols) and ast string
 *                  are allocated here and are owned by pInfo afterwards.
 * @param pBaseTsma optional base TSMA; when non-NULL its ast (instead of
 *                  pSma->ast) is parsed to build the function list.
 * @return 0 on success, otherwise an error code. On failure pInfo may be
 *         partially filled; every pointer left in it is either valid or NULL,
 *         so the caller can release it as a whole.
 */
int32_t dumpTSMAInfoFromSmaObj(const SSmaObj *pSma, const SStbObj *pDestStb, STableTSMAInfo *pInfo,
                               const SSmaObj *pBaseTsma) {
  int32_t code = 0;
  SName   sName = {0};
  SNode  *pNode = NULL;
  SNode  *pFunc = NULL;

  pInfo->interval = pSma->interval;
  pInfo->unit = pSma->intervalUnit;
  pInfo->tsmaId = pSma->uid;
  pInfo->version = pSma->version;
  pInfo->destTbUid = pDestStb->uid;

  code = tNameFromString(&sName, pSma->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }
  tstrncpy(pInfo->name, sName.tname, TSDB_TABLE_NAME_LEN);
  tstrncpy(pInfo->targetDbFName, pSma->db, TSDB_DB_FNAME_LEN);

  code = tNameFromString(&sName, pSma->dstTbName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }
  tstrncpy(pInfo->targetTb, sName.tname, TSDB_TABLE_NAME_LEN);
  tstrncpy(pInfo->dbFName, pSma->db, TSDB_DB_FNAME_LEN);

  code = tNameFromString(&sName, pSma->stb, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }
  tstrncpy(pInfo->tb, sName.tname, TSDB_TABLE_NAME_LEN);

  pInfo->pFuncs = taosArrayInit(8, sizeof(STableTSMAFuncInfo));
  if (!pInfo->pFuncs) return TSDB_CODE_OUT_OF_MEMORY;

  if (TSDB_CODE_SUCCESS != nodesStringToNode(pBaseTsma ? pBaseTsma->ast : pSma->ast, &pNode)) {
    taosArrayDestroy(pInfo->pFuncs);
    pInfo->pFuncs = NULL;
    return TSDB_CODE_TSMA_INVALID_STAT;
  }
  if (pNode) {
    SSelectStmt *pSelect = (SSelectStmt *)pNode;
    FOREACH(pFunc, pSelect->pProjectionList) {
      // Only function nodes carry a funcId; skip anything else instead of
      // reinterpreting it as an SFunctionNode.
      if (nodeType(pFunc) != QUERY_NODE_FUNCTION) continue;

      SFunctionNode *pFuncNode = (SFunctionNode *)pFunc;
      if (!fmIsTSMASupportedFunc(pFuncNode->funcId)) continue;

      STableTSMAFuncInfo funcInfo = {0};
      funcInfo.funcId = pFuncNode->funcId;
      // NOTE(review): assumes every supported function has a column as its
      // first parameter - confirm against the TSMA creation path.
      funcInfo.colId = ((SColumnNode *)pFuncNode->pParameterList->pHead->pNode)->colId;
      if (!taosArrayPush(pInfo->pFuncs, &funcInfo)) {
        code = terrno;
        taosArrayDestroy(pInfo->pFuncs);
        // Clear the pointer so that a later release of pInfo by the caller
        // does not destroy the array a second time.
        pInfo->pFuncs = NULL;
        nodesDestroyNode(pNode);
        return code;
      }
    }
    nodesDestroyNode(pNode);
  }

  pInfo->ast = taosStrdup(pSma->ast);
  if (!pInfo->ast) code = terrno;

  if (code == TSDB_CODE_SUCCESS && pDestStb->numOfTags > 0) {
    pInfo->pTags = taosArrayInit(pDestStb->numOfTags, sizeof(SSchema));
    if (!pInfo->pTags) {
      code = terrno;
    } else {
      for (int32_t i = 0; i < pDestStb->numOfTags; ++i) {
        if (NULL == taosArrayPush(pInfo->pTags, &pDestStb->pTags[i])) {
          code = terrno;
          break;
        }
      }
    }
  }

  if (code == TSDB_CODE_SUCCESS) {
    pInfo->pUsedCols = taosArrayInit(pDestStb->numOfColumns - 3, sizeof(SSchema));
    if (!pInfo->pUsedCols) {
      code = terrno;
    } else {
      // skip _wstart, _wend, _duration
      for (int32_t i = 1; i < pDestStb->numOfColumns - 2; ++i) {
        if (NULL == taosArrayPush(pInfo->pUsedCols, &pDestStb->pColumns[i])) {
          code = terrno;
          break;
        }
      }
    }
  }
  TAOS_RETURN(code);
}
1723

1724
// @note remember to mndReleaseSma(*ppOut)
1725
/*
 * Follow the baseSmaName chain starting at pSma and return the last TSMA in
 * that chain (the one whose own baseSmaName is empty).
 *
 * @param pMnode mnode used to acquire/release SMA objects.
 * @param pSma   starting TSMA; it is not released here.
 * @param ppOut  receives the deepest base, acquired; the caller must release
 *               it with mndReleaseSma. Set to NULL when pSma has no base and
 *               also on every failure, so the caller never sees a stale value.
 * @return 0 on success, TSDB_CODE_MND_SMA_NOT_EXIST when a base named in the
 *         chain cannot be acquired.
 */
static int32_t mndGetDeepestBaseForTsma(SMnode *pMnode, SSmaObj *pSma, SSmaObj **ppOut) {
  SSmaObj *pCurrent = NULL;

  *ppOut = NULL;
  if (!pSma->baseSmaName[0]) {
    return 0;
  }

  pCurrent = mndAcquireSma(pMnode, pSma->baseSmaName);
  if (!pCurrent) {
    mError("base tsma: %s for tsma: %s not found", pSma->baseSmaName, pSma->name);
    return TSDB_CODE_MND_SMA_NOT_EXIST;
  }

  while (pCurrent->baseSmaName[0]) {
    SSmaObj *pPrevious = pCurrent;
    pCurrent = mndAcquireSma(pMnode, pPrevious->baseSmaName);
    if (!pCurrent) {
      mError("base tsma: %s for tsma: %s not found", pPrevious->baseSmaName, pPrevious->name);
      mndReleaseSma(pMnode, pPrevious);
      return TSDB_CODE_MND_SMA_NOT_EXIST;
    }
    // Intermediate links are only needed to find the next name.
    mndReleaseSma(pMnode, pPrevious);
  }

  *ppOut = pCurrent;
  return 0;
}
1748

1749
/*
 * Look up one TSMA by its full name and append its description to rsp->pTsmas.
 *
 * @param pMnode    mnode handle.
 * @param tsmaFName full name of the TSMA, used as the SDB_SMA key.
 * @param rsp       response whose pTsmas array receives a heap-allocated
 *                  STableTSMAInfo pointer on success.
 * @param exist     set to true only when an entry was appended.
 * @return 0 on success; 0 with *exist untouched when the destination super
 *         table cannot be acquired; -1 when the TSMA itself is not found;
 *         otherwise the error code of the failing step.
 */
static int32_t mndGetTSMA(SMnode *pMnode, char *tsmaFName, STableTSMAInfoRsp *rsp, bool *exist) {
  int32_t         code = -1;
  SSmaObj        *pSma = NULL;
  SSmaObj        *pBaseTsma = NULL;
  SStbObj        *pDstStb = NULL;
  STableTSMAInfo *pTsma = NULL;

  pSma = sdbAcquire(pMnode->pSdb, SDB_SMA, tsmaFName);
  if (!pSma) {
    TAOS_RETURN(code);
  }

  pDstStb = mndAcquireStb(pMnode, pSma->dstTbName);
  if (!pDstStb) {
    sdbRelease(pMnode->pSdb, pSma);
    return TSDB_CODE_SUCCESS;
  }

  pTsma = taosMemoryCalloc(1, sizeof(STableTSMAInfo));
  if (!pTsma) {
    code = terrno;
    sdbRelease(pMnode->pSdb, pSma);
    mndReleaseStb(pMnode, pDstStb);
    TAOS_RETURN(code);
  }

  code = mndGetDeepestBaseForTsma(pMnode, pSma, &pBaseTsma);
  if (code == 0) {
    code = dumpTSMAInfoFromSmaObj(pSma, pDstStb, pTsma, pBaseTsma);
  }
  mndReleaseStb(pMnode, pDstStb);
  sdbRelease(pMnode->pSdb, pSma);
  if (pBaseTsma) mndReleaseSma(pMnode, pBaseTsma);

  // Decide on the returned code, not on terrno: terrno may be stale from an
  // unrelated earlier call, or unset by a step that only returned an error.
  if (code != 0) {
    tFreeAndClearTableTSMAInfo(pTsma);
    TAOS_RETURN(code);
  }

  if (NULL == taosArrayPush(rsp->pTsmas, &pTsma)) {
    code = terrno;
    tFreeAndClearTableTSMAInfo(pTsma);
    TAOS_RETURN(code);
  }

  *exist = true;
  TAOS_RETURN(code);
}
1790

1791
// Predicate used by mndGetSomeTsmas to decide which SMA objects to skip:
// returning true excludes pSma from the result, returning false keeps it.
// `param` is the opaque argument the caller handed to mndGetSomeTsmas.
typedef bool (*tsmaFilter)(const SSmaObj *pSma, void *param);
1792

1793
/*
 * Iterate over all SDB_SMA objects and append a STableTSMAInfo to pRsp->pTsmas
 * for every TSMA that the filter does not exclude.
 *
 * @param pMnode   mnode handle.
 * @param pRsp     response whose pTsmas array receives heap-allocated
 *                 STableTSMAInfo pointers.
 * @param filtered predicate; a TSMA for which it returns true is skipped.
 * @param param    opaque argument forwarded to the predicate.
 * @param exist    set to true once at least one entry has been appended.
 * @return TSDB_CODE_SUCCESS when every selected TSMA was dumped,
 *         TSDB_CODE_NEED_RETRY when some TSMA was skipped because its
 *         destination super table or stream could not be acquired,
 *         otherwise the error code of the failing step. Entries appended
 *         before a failure stay in pRsp.
 */
static int32_t mndGetSomeTsmas(SMnode *pMnode, STableTSMAInfoRsp *pRsp, tsmaFilter filtered, void *param, bool *exist) {
  int32_t code = 0;
  SSdb   *pSdb = pMnode->pSdb;
  void   *pIter = NULL;
  bool    shouldRetry = false;

  while (1) {
    SSmaObj    *pSma = NULL;
    // Reset per iteration: a failed base lookup must not leave the previous
    // iteration's (already released) base behind to be released again.
    SSmaObj    *pBaseTsma = NULL;
    SStbObj    *pStb = NULL;
    SStreamObj *pStream = NULL;
    SName       smaName = {0};
    char        streamName[TSDB_TABLE_FNAME_LEN] = {0};

    pIter = sdbFetch(pSdb, SDB_SMA, pIter, (void **)&pSma);
    if (pIter == NULL) break;

    if (filtered(pSma, param)) {
      sdbRelease(pSdb, pSma);
      continue;
    }

    pStb = mndAcquireStb(pMnode, pSma->dstTbName);
    if (!pStb) {
      sdbRelease(pSdb, pSma);
      shouldRetry = true;
      continue;
    }

    code = tNameFromString(&smaName, pSma->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
    if (TSDB_CODE_SUCCESS != code) {
      sdbRelease(pSdb, pSma);
      mndReleaseStb(pMnode, pStb);
      // Leaving the loop early: the fetch has to be cancelled like on the
      // other error paths.
      sdbCancelFetch(pSdb, pIter);
      TAOS_RETURN(code);
    }
    snprintf(streamName, TSDB_TABLE_FNAME_LEN, "%d.%s.%s", smaName.acctId, smaName.dbname, smaName.tname);

    code = mndAcquireStream(pMnode, streamName, &pStream);
    if (!pStream) {
      shouldRetry = true;
      sdbRelease(pSdb, pSma);
      mndReleaseStb(pMnode, pStb);
      continue;
    }
    if (code != 0) {
      mndReleaseStream(pMnode, pStream);
      sdbRelease(pSdb, pSma);
      mndReleaseStb(pMnode, pStb);
      sdbCancelFetch(pSdb, pIter);
      TAOS_RETURN(code);
    }

    int64_t streamId = pStream->pCreate->streamId;
    mndReleaseStream(pMnode, pStream);

    STableTSMAInfo *pTsma = taosMemoryCalloc(1, sizeof(STableTSMAInfo));
    if (!pTsma) {
      code = terrno;
      mndReleaseStb(pMnode, pStb);
      sdbRelease(pSdb, pSma);
      sdbCancelFetch(pSdb, pIter);
      TAOS_RETURN(code);
    }

    pTsma->streamAddr = taosMemoryCalloc(1, sizeof(SStreamTaskAddr));
    if (!pTsma->streamAddr) {
      // Do not hand a NULL address buffer to msmGetTriggerTaskAddr.
      code = terrno;
      mndReleaseStb(pMnode, pStb);
      sdbRelease(pSdb, pSma);
      tFreeAndClearTableTSMAInfo(pTsma);
      sdbCancelFetch(pSdb, pIter);
      TAOS_RETURN(code);
    }

    code = msmGetTriggerTaskAddr(pMnode, streamId, pTsma->streamAddr);
    if (code != 0) {
      mndReleaseStb(pMnode, pStb);
      sdbRelease(pSdb, pSma);
      tFreeAndClearTableTSMAInfo(pTsma);
      sdbCancelFetch(pSdb, pIter);
      TAOS_RETURN(code);
    }
    pTsma->streamUid = streamId;

    code = mndGetDeepestBaseForTsma(pMnode, pSma, &pBaseTsma);
    if (code == 0) {
      code = dumpTSMAInfoFromSmaObj(pSma, pStb, pTsma, pBaseTsma);
    }
    mndReleaseStb(pMnode, pStb);
    sdbRelease(pSdb, pSma);
    if (pBaseTsma) mndReleaseSma(pMnode, pBaseTsma);

    // Decide on the returned code, not on terrno, which may be stale or unset.
    if (code != 0) {
      tFreeAndClearTableTSMAInfo(pTsma);
      sdbCancelFetch(pSdb, pIter);
      TAOS_RETURN(code);
    }

    if (NULL == taosArrayPush(pRsp->pTsmas, &pTsma)) {
      code = terrno;
      tFreeAndClearTableTSMAInfo(pTsma);
      sdbCancelFetch(pSdb, pIter);
      TAOS_RETURN(code);
    }
    *exist = true;
  }

  if (shouldRetry) {
    return TSDB_CODE_NEED_RETRY;
  }
  return TSDB_CODE_SUCCESS;
}
1892

1893
static bool tsmaTbFilter(const SSmaObj *pSma, void *param) {
×
1894
  const char *tbFName = param;
×
UNCOV
1895
  return pSma->stb[0] != tbFName[0] || strcmp(pSma->stb, tbFName) != 0;
×
1896
}
1897

1898
/*
 * Collect every TSMA whose source table name equals tbFName into pRsp.
 * Delegates to mndGetSomeTsmas with tsmaTbFilter and returns its result.
 */
static int32_t mndGetTableTSMA(SMnode *pMnode, char *tbFName, STableTSMAInfoRsp *pRsp, bool *exist) {
  int32_t result = mndGetSomeTsmas(pMnode, pRsp, tsmaTbFilter, tbFName, exist);
  return result;
}
1901

1902
static bool tsmaDbFilter(const SSmaObj *pSma, void *param) {
×
1903
  uint64_t *dbUid = param;
×
UNCOV
1904
  return pSma->dbUid != *dbUid;
×
1905
}
1906

1907
/*
 * Collect every TSMA that belongs to the database identified by dbUid into
 * pRsp. Delegates to mndGetSomeTsmas with tsmaDbFilter and returns its result.
 * dbFName is accepted but not consulted; selection is done by uid only.
 */
int32_t mndGetDbTsmas(SMnode *pMnode, const char *dbFName, uint64_t dbUid, STableTSMAInfoRsp *pRsp, bool *exist) {
  uint64_t filterDbUid = dbUid;
  int32_t  result = mndGetSomeTsmas(pMnode, pRsp, tsmaDbFilter, &filterDbUid, exist);
  return result;
}
1910

1911
/*
 * Handle a "get table TSMA" request.
 *
 * The request either names a TSMA directly (fetchingWithTsmaName) or names a
 * table whose TSMAs are wanted. On success the serialized STableTSMAInfoRsp
 * is attached to pReq->info.rsp / rspLen.
 *
 * @return 0 on success, TSDB_CODE_MND_SMA_NOT_EXIST when nothing matched,
 *         otherwise the error code of the failing step. A
 *         TSDB_CODE_NEED_RETRY from the by-table lookup is treated as success.
 */
static int32_t mndProcessGetTbTSMAReq(SRpcMsg *pReq) {
  STableTSMAInfoRsp rsp = {0};
  int32_t           code = -1;
  STableTSMAInfoReq tsmaReq = {0};
  bool              exist = false;
  SMnode           *pMnode = pReq->info.node;
  int32_t           contLen = 0;
  void             *pRsp = NULL;

  TAOS_CHECK_GOTO(tDeserializeTableTSMAInfoReq(pReq->pCont, pReq->contLen, &tsmaReq), NULL, _OVER);

  rsp.pTsmas = taosArrayInit(4, POINTER_BYTES);
  if (NULL == rsp.pTsmas) {
    code = terrno;
    goto _OVER;
  }

  if (tsmaReq.fetchingWithTsmaName) {
    code = mndGetTSMA(pMnode, tsmaReq.name, &rsp, &exist);
  } else {
    code = mndGetTableTSMA(pMnode, tsmaReq.name, &rsp, &exist);
    if (TSDB_CODE_NEED_RETRY == code) {
      code = TSDB_CODE_SUCCESS;
    }
  }
  if (code) {
    goto _OVER;
  }

  if (!exist) {
    code = TSDB_CODE_MND_SMA_NOT_EXIST;
    goto _OVER;
  }

  // First pass only computes the encoded size.
  contLen = tSerializeTableTSMAInfoRsp(NULL, 0, &rsp);
  if (contLen < 0) {
    code = terrno;
    goto _OVER;
  }

  pRsp = rpcMallocCont(contLen);
  if (pRsp == NULL) {
    code = terrno;
    goto _OVER;
  }

  if (tSerializeTableTSMAInfoRsp(pRsp, contLen, &rsp) < 0) {
    code = terrno;
    // The buffer was never attached to the request, so release it here.
    rpcFreeCont(pRsp);
    goto _OVER;
  }

  pReq->info.rsp = pRsp;
  pReq->info.rspLen = contLen;
  code = 0;

_OVER:
  tFreeTableTSMAInfoRsp(&rsp);
  TAOS_RETURN(code);
}
1964

1965
/*
 * Build a placeholder STableTSMAInfo for a TSMA version reported by a client.
 * Only the identifying fields (tsmaId, dbFName, tb, name, dbId) are copied
 * from pTsmaVer; pFuncs stays NULL and ast is an empty string.
 *
 * @param pTsmaVer version record to copy the identifying fields from.
 * @param ppTsma   receives the new object on success; untouched on failure.
 *                 The caller owns the object.
 * @return TSDB_CODE_SUCCESS, or the terrno of the failed allocation.
 */
static int32_t mkNonExistTSMAInfo(const STSMAVersion *pTsmaVer, STableTSMAInfo **ppTsma) {
  STableTSMAInfo *pInfo = taosMemoryCalloc(1, sizeof(STableTSMAInfo));
  if (!pInfo) {
    return terrno;
  }
  pInfo->pFuncs = NULL;
  pInfo->tsmaId = pTsmaVer->tsmaId;
  tstrncpy(pInfo->dbFName, pTsmaVer->dbFName, TSDB_DB_FNAME_LEN);
  tstrncpy(pInfo->tb, pTsmaVer->tbName, TSDB_TABLE_NAME_LEN);
  tstrncpy(pInfo->name, pTsmaVer->name, TSDB_TABLE_NAME_LEN);
  pInfo->dbId = pTsmaVer->dbId;
  pInfo->ast = taosMemoryCalloc(1, 1);
  if (!pInfo->ast) {
    // Capture the error before the cleanup call so it cannot be disturbed.
    int32_t code = terrno;
    taosMemoryFree(pInfo);
    return code;
  }
  *ppTsma = pInfo;
  return TSDB_CODE_SUCCESS;
}
1984

UNCOV
1985
/*
 * Compare the TSMA versions reported by a client with the current catalog and
 * build a serialized STSMAHbRsp describing what changed.
 *
 * For every reported version:
 *  - TSMA missing, uid mismatch, or destination super table missing:
 *    a placeholder entry (see mkNonExistTSMAInfo) is added;
 *  - same uid and same version: nothing is added;
 *  - otherwise the current TSMA is dumped into a full entry.
 *
 * The id/version fields of pTsmaVersions are converted from network to host
 * byte order in place.
 *
 * @param ppRsp   receives the serialized response buffer (taosMemoryMalloc'ed)
 *                on success, NULL on failure.
 * @param pRspLen receives the serialized length on success, 0 on failure.
 * @return 0 on success, otherwise the error code of the failing step.
 */
int32_t mndValidateTSMAInfo(SMnode *pMnode, STSMAVersion *pTsmaVersions, int32_t numOfTsmas, void **ppRsp,
                            int32_t *pRspLen) {
  int32_t         code = -1;
  STSMAHbRsp      hbRsp = {0};
  int32_t         rspLen = 0;
  void           *pRsp = NULL;
  char            tsmaFName[TSDB_TABLE_FNAME_LEN] = {0};
  STableTSMAInfo *pTsmaInfo = NULL;

  hbRsp.pTsmas = taosArrayInit(numOfTsmas, POINTER_BYTES);
  if (!hbRsp.pTsmas) {
    code = terrno;
    TAOS_RETURN(code);
  }

  for (int32_t i = 0; i < numOfTsmas; ++i) {
    STSMAVersion *pTsmaVer = &pTsmaVersions[i];
    pTsmaVer->dbId = be64toh(pTsmaVer->dbId);
    pTsmaVer->tsmaId = be64toh(pTsmaVer->tsmaId);
    pTsmaVer->version = ntohl(pTsmaVer->version);

    snprintf(tsmaFName, sizeof(tsmaFName), "%s.%s", pTsmaVer->dbFName, pTsmaVer->name);
    SSmaObj *pSma = mndAcquireSma(pMnode, tsmaFName);
    if (!pSma) {
      code = mkNonExistTSMAInfo(pTsmaVer, &pTsmaInfo);
      if (code) goto _OVER;
      if (NULL == taosArrayPush(hbRsp.pTsmas, &pTsmaInfo)) {
        code = terrno;
        tFreeAndClearTableTSMAInfo(pTsmaInfo);
        goto _OVER;
      }
      continue;
    }

    if (pSma->uid != pTsmaVer->tsmaId) {
      mDebug("tsma: %s.%" PRIx64 " tsmaId mismatch with current %" PRIx64, tsmaFName, pTsmaVer->tsmaId, pSma->uid);
      code = mkNonExistTSMAInfo(pTsmaVer, &pTsmaInfo);
      mndReleaseSma(pMnode, pSma);
      if (code) goto _OVER;
      if (NULL == taosArrayPush(hbRsp.pTsmas, &pTsmaInfo)) {
        code = terrno;
        tFreeAndClearTableTSMAInfo(pTsmaInfo);
        goto _OVER;
      }
      continue;
    } else if (pSma->version == pTsmaVer->version) {
      mndReleaseSma(pMnode, pSma);
      continue;
    }

    SStbObj *pDestStb = mndAcquireStb(pMnode, pSma->dstTbName);
    if (!pDestStb) {
      mInfo("tsma: %s.%" PRIx64 " dest stb: %s not found, maybe dropped", tsmaFName, pTsmaVer->tsmaId, pSma->dstTbName);
      code = mkNonExistTSMAInfo(pTsmaVer, &pTsmaInfo);
      mndReleaseSma(pMnode, pSma);
      if (code) goto _OVER;
      if (NULL == taosArrayPush(hbRsp.pTsmas, &pTsmaInfo)) {
        code = terrno;
        tFreeAndClearTableTSMAInfo(pTsmaInfo);
        goto _OVER;
      }
      continue;
    }

    // dump smaObj into rsp
    STableTSMAInfo *pInfo = taosMemoryCalloc(1, sizeof(STableTSMAInfo));
    if (!pInfo) {
      code = terrno;
      mndReleaseSma(pMnode, pSma);
      mndReleaseStb(pMnode, pDestStb);
      goto _OVER;
    }

    SSmaObj *pBaseSma = NULL;
    code = mndGetDeepestBaseForTsma(pMnode, pSma, &pBaseSma);
    if (code == 0) code = dumpTSMAInfoFromSmaObj(pSma, pDestStb, pInfo, pBaseSma);

    mndReleaseStb(pMnode, pDestStb);
    mndReleaseSma(pMnode, pSma);
    if (pBaseSma) mndReleaseSma(pMnode, pBaseSma);

    // Decide on the returned code, not on terrno, which may be stale or unset.
    if (code != 0) {
      tFreeAndClearTableTSMAInfo(pInfo);
      goto _OVER;
    }

    // The array stores pointers (POINTER_BYTES per element), so the address of
    // the pointer is pushed, exactly like the placeholder entries above.
    if (NULL == taosArrayPush(hbRsp.pTsmas, &pInfo)) {
      code = terrno;
      tFreeAndClearTableTSMAInfo(pInfo);
      goto _OVER;
    }
  }

  // First pass only computes the encoded size.
  rspLen = tSerializeTSMAHbRsp(NULL, 0, &hbRsp);
  if (rspLen < 0) {
    code = terrno;
    goto _OVER;
  }

  pRsp = taosMemoryMalloc(rspLen);
  if (!pRsp) {
    code = terrno;
    rspLen = 0;
    goto _OVER;
  }

  rspLen = tSerializeTSMAHbRsp(pRsp, rspLen, &hbRsp);
  if (rspLen < 0) {
    code = terrno;
    goto _OVER;
  }
  code = 0;

_OVER:
  if (code != 0) {
    // Never hand back a buffer or a negative length together with an error.
    taosMemoryFree(pRsp);
    pRsp = NULL;
    rspLen = 0;
  }
  tFreeTSMAHbRsp(&hbRsp);
  *ppRsp = pRsp;
  *pRspLen = rspLen;
  TAOS_RETURN(code);
}
2103

2104
/*
 * Add drop commit logs to pTrans for every TSMA that belongs to database pDb.
 *
 * @param pMnode mnode handle.
 * @param pTrans transaction that receives the drop commit logs.
 * @param pDb    database whose TSMAs (matched by uid) are dropped.
 * @return 0 on success, otherwise the error returned by
 *         mndSetDropSmaCommitLogs for the first TSMA that failed.
 */
int32_t mndDropTSMAsByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
  int32_t code = 0;
  SSdb   *pSdb = pMnode->pSdb;
  void   *pIter = NULL;

  while (1) {
    SSmaObj *pSma = NULL;
    pIter = sdbFetch(pSdb, SDB_SMA, pIter, (void **)&pSma);
    if (pIter == NULL) break;

    if (pSma->dbUid == pDb->uid) {
      if ((code = mndSetDropSmaCommitLogs(pMnode, pTrans, pSma)) != 0) {
        sdbRelease(pSdb, pSma);
        // Cancel the fetch with the iterator, not with the fetched object.
        sdbCancelFetch(pSdb, pIter);
        TAOS_RETURN(code);
      }
    }

    sdbRelease(pSdb, pSma);
  }

  TAOS_RETURN(code);
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc