• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4905

29 Dec 2025 02:08PM UTC coverage: 65.423% (-0.3%) from 65.734%
#4905

push

travis-ci

web-flow
enh: sign connect request (#34067)

23 of 29 new or added lines in 4 files covered. (79.31%)

11614 existing lines in 186 files now uncovered.

193476 of 295730 relevant lines covered (65.42%)

115752566.53 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

7.01
/source/dnode/mnode/impl/src/mndSma.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mndSma.h"
18
#include "functionMgt.h"
19
#include "mndDb.h"
20
#include "mndDnode.h"
21
#include "mndIndex.h"
22
#include "mndIndexComm.h"
23
#include "mndInfoSchema.h"
24
#include "mndMnode.h"
25
#include "mndPrivilege.h"
26
#include "mndShow.h"
27
#include "mndStb.h"
28
#include "mndStream.h"
29
#include "mndTrans.h"
30
#include "mndUser.h"
31
#include "mndVgroup.h"
32
#include "parser.h"
33
#include "tname.h"
34

35
#define TSDB_SMA_VER_NUMBER   1
36
#define TSDB_SMA_RESERVE_SIZE 32
37

38
static SSdbRaw *mndSmaActionEncode(SSmaObj *pSma);
39
static SSdbRow *mndSmaActionDecode(SSdbRaw *pRaw);
40
static int32_t  mndSmaActionInsert(SSdb *pSdb, SSmaObj *pSma);
41
static int32_t  mndSmaActionDelete(SSdb *pSdb, SSmaObj *pSpSmatb);
42
static int32_t  mndSmaActionUpdate(SSdb *pSdb, SSmaObj *pOld, SSmaObj *pNew);
43
static int32_t  mndProcessDropSmaReq(SRpcMsg *pReq);
44
static int32_t  mndProcessGetSmaReq(SRpcMsg *pReq);
45
static int32_t  mndProcessGetTbSmaReq(SRpcMsg *pReq);
46
static int32_t  mndRetrieveSma(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
47

48
static int32_t mndProcessCreateTSMAReq(SRpcMsg *pReq);
49
static int32_t mndProcessDropTSMAReq(SRpcMsg *pReq);
50

51
// sma and tag index comm func
52
static int32_t mndProcessDropIdxReq(SRpcMsg *pReq);
53
static int32_t mndRetrieveIdx(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
54
static void    mndCancelRetrieveIdx(SMnode *pMnode, void *pIter);
55

56
static int32_t mndRetrieveTSMA(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
57
static void    mndCancelRetrieveTSMA(SMnode *pMnode, void *pIter);
58
static int32_t mndProcessGetTbTSMAReq(SRpcMsg *pReq);
59

60
typedef struct SCreateTSMACxt {
61
  SMnode        *pMnode;
62
  const SRpcMsg *pRpcReq;
63
  union {
64
    const SMCreateSmaReq *pCreateSmaReq;
65
    const SMDropSmaReq   *pDropSmaReq;
66
  };
67
  SDbObj             *pDb;
68
  SStbObj            *pSrcStb;
69
  SSmaObj            *pSma;
70
  const SSmaObj      *pBaseSma;
71
  SUserObj           *pOperUser;
72
  const char         *streamName;
73
  const char         *targetStbFullName;
74
  SNodeList          *pProjects;
75
} SCreateTSMACxt;
76

77
/*
 * Registers the SMA / TSMA / index message handlers and show callbacks on the
 * mnode, then installs the SDB_SMA table descriptor.
 *
 * Returns whatever sdbSetTable() returns for the descriptor.
 */
int32_t mndInitSma(SMnode *pMnode) {
  // Members that are not assigned below remain zero.
  SSdbTable smaTable = {0};
  smaTable.sdbType = SDB_SMA;
  smaTable.keyType = SDB_KEY_BINARY;
  smaTable.encodeFp = (SdbEncodeFp)mndSmaActionEncode;
  smaTable.decodeFp = (SdbDecodeFp)mndSmaActionDecode;
  smaTable.insertFp = (SdbInsertFp)mndSmaActionInsert;
  smaTable.updateFp = (SdbUpdateFp)mndSmaActionUpdate;
  smaTable.deleteFp = (SdbDeleteFp)mndSmaActionDelete;

  // sma / index requests (no handler is registered for TDMT_MND_CREATE_SMA here)
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_SMA, mndProcessDropIdxReq);
  mndSetMsgHandle(pMnode, TDMT_MND_GET_INDEX, mndProcessGetSmaReq);
  mndSetMsgHandle(pMnode, TDMT_MND_GET_TABLE_INDEX, mndProcessGetTbSmaReq);

  // "show indexes" retrieval and iterator cleanup
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_INDEX, mndRetrieveIdx);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_INDEX, mndCancelRetrieveIdx);

  // tsma requests and the transaction responses routed to mndTransProcessRsp
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_TSMA, mndProcessCreateTSMAReq);
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STREAM_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_STB_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_TSMA, mndProcessDropTSMAReq);
  mndSetMsgHandle(pMnode, TDMT_MND_GET_TABLE_TSMA, mndProcessGetTbTSMAReq);
  mndSetMsgHandle(pMnode, TDMT_MND_GET_TSMA, mndProcessGetTbTSMAReq);

  // "show tsmas" retrieval and iterator cleanup
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TSMAS, mndRetrieveTSMA);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TSMAS, mndCancelRetrieveTSMA);

  return sdbSetTable(pMnode->pSdb, smaTable);
}
108

109
// Counterpart of mndInitSma(); there is nothing to tear down here.
void mndCleanupSma(SMnode *pMnode) {
  (void)pMnode;
}
110

111
/*
 * Serializes an SSmaObj into a freshly allocated SSdbRaw.
 *
 * Layout: fixed-width header fields, the four length-prefixed blobs
 * (expr, tagsFilter, sql, ast), the base sma name, create user and owner id,
 * then TSDB_SMA_RESERVE_SIZE reserved bytes.  mndSmaActionDecode() reads the
 * fields back in the same order.
 *
 * Returns the raw on success; on failure frees the raw and returns NULL with
 * terrno left non-zero.
 */
static SSdbRaw *mndSmaActionEncode(SSmaObj *pSma) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t dataPos = 0;

  // Pessimistic default; cleared only once every field has been written.
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  int32_t size =
      sizeof(SSmaObj) + pSma->exprLen + pSma->tagsFilterLen + pSma->sqlLen + pSma->astLen + TSDB_SMA_RESERVE_SIZE;
  SSdbRaw *pRaw = sdbAllocRaw(SDB_SMA, TSDB_SMA_VER_NUMBER, size);
  if (pRaw == NULL) goto _OVER;

  // names
  SDB_SET_BINARY(pRaw, dataPos, pSma->name, TSDB_TABLE_FNAME_LEN, _OVER)
  SDB_SET_BINARY(pRaw, dataPos, pSma->stb, TSDB_TABLE_FNAME_LEN, _OVER)
  SDB_SET_BINARY(pRaw, dataPos, pSma->db, TSDB_DB_FNAME_LEN, _OVER)
  SDB_SET_BINARY(pRaw, dataPos, pSma->dstTbName, TSDB_TABLE_FNAME_LEN, _OVER)

  // timestamps and uids
  SDB_SET_INT64(pRaw, dataPos, pSma->createdTime, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pSma->uid, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pSma->stbUid, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pSma->dbUid, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pSma->dstTbUid, _OVER)

  // window definition and destination vgroup
  SDB_SET_INT8(pRaw, dataPos, pSma->intervalUnit, _OVER)
  SDB_SET_INT8(pRaw, dataPos, pSma->slidingUnit, _OVER)
  SDB_SET_INT8(pRaw, dataPos, pSma->timezone, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pSma->dstVgId, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pSma->interval, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pSma->offset, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pSma->sliding, _OVER)

  // blob lengths, then version
  SDB_SET_INT32(pRaw, dataPos, pSma->exprLen, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pSma->tagsFilterLen, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pSma->sqlLen, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pSma->astLen, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pSma->version, _OVER)

  // blobs are written only when their length is positive
  if (pSma->exprLen > 0) {
    SDB_SET_BINARY(pRaw, dataPos, pSma->expr, pSma->exprLen, _OVER)
  }
  if (pSma->tagsFilterLen > 0) {
    SDB_SET_BINARY(pRaw, dataPos, pSma->tagsFilter, pSma->tagsFilterLen, _OVER)
  }
  if (pSma->sqlLen > 0) {
    SDB_SET_BINARY(pRaw, dataPos, pSma->sql, pSma->sqlLen, _OVER)
  }
  if (pSma->astLen > 0) {
    SDB_SET_BINARY(pRaw, dataPos, pSma->ast, pSma->astLen, _OVER)
  }

  SDB_SET_BINARY(pRaw, dataPos, pSma->baseSmaName, TSDB_TABLE_FNAME_LEN, _OVER)
  SDB_SET_BINARY(pRaw, dataPos, pSma->createUser, TSDB_USER_LEN, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pSma->ownerId, _OVER)

  SDB_SET_RESERVE(pRaw, dataPos, TSDB_SMA_RESERVE_SIZE, _OVER)
  SDB_SET_DATALEN(pRaw, dataPos, _OVER)

  terrno = 0;

_OVER:
  if (terrno == 0) {
    mTrace("sma:%s, encode to raw:%p, row:%p", pSma->name, pRaw, pSma);
    return pRaw;
  }

  mError("sma:%s, failed to encode to raw:%p since %s", pSma->name, pRaw, terrstr());
  sdbFreeRaw(pRaw);
  return NULL;
}
175

176
/*
 * Rebuilds an SSmaObj row from an SSdbRaw written by mndSmaActionEncode().
 *
 * Only raws whose soft version equals TSDB_SMA_VER_NUMBER are accepted.
 * The four variable-length blobs are heap-allocated when their stored length
 * is positive.
 *
 * Returns the row on success; on failure releases any blobs allocated so far,
 * frees the row and returns NULL with terrno left non-zero.
 */
static SSdbRow *mndSmaActionDecode(SSdbRaw *pRaw) {
  int32_t  code = 0;
  int32_t  lino = 0;
  int32_t  dataPos = 0;
  int8_t   sver = 0;
  SSdbRow *pRow = NULL;
  SSmaObj *pSma = NULL;

  // Pessimistic default; cleared only once every field has been read.
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;
  if (sver != TSDB_SMA_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto _OVER;
  }

  pRow = sdbAllocRow(sizeof(SSmaObj));
  if (pRow == NULL) goto _OVER;

  pSma = sdbGetRowObj(pRow);
  if (pSma == NULL) goto _OVER;

  // names
  SDB_GET_BINARY(pRaw, dataPos, pSma->name, TSDB_TABLE_FNAME_LEN, _OVER)
  SDB_GET_BINARY(pRaw, dataPos, pSma->stb, TSDB_TABLE_FNAME_LEN, _OVER)
  SDB_GET_BINARY(pRaw, dataPos, pSma->db, TSDB_DB_FNAME_LEN, _OVER)
  SDB_GET_BINARY(pRaw, dataPos, pSma->dstTbName, TSDB_TABLE_FNAME_LEN, _OVER)

  // timestamps and uids
  SDB_GET_INT64(pRaw, dataPos, &pSma->createdTime, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pSma->uid, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pSma->stbUid, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pSma->dbUid, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pSma->dstTbUid, _OVER)

  // window definition and destination vgroup
  SDB_GET_INT8(pRaw, dataPos, &pSma->intervalUnit, _OVER)
  SDB_GET_INT8(pRaw, dataPos, &pSma->slidingUnit, _OVER)
  SDB_GET_INT8(pRaw, dataPos, &pSma->timezone, _OVER)
  SDB_GET_INT32(pRaw, dataPos, &pSma->dstVgId, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pSma->interval, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pSma->offset, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pSma->sliding, _OVER)

  // blob lengths, then version
  SDB_GET_INT32(pRaw, dataPos, &pSma->exprLen, _OVER)
  SDB_GET_INT32(pRaw, dataPos, &pSma->tagsFilterLen, _OVER)
  SDB_GET_INT32(pRaw, dataPos, &pSma->sqlLen, _OVER)
  SDB_GET_INT32(pRaw, dataPos, &pSma->astLen, _OVER)
  SDB_GET_INT32(pRaw, dataPos, &pSma->version, _OVER)

  // each blob: allocate zeroed storage of the stored length, then read into it
  if (pSma->exprLen > 0) {
    pSma->expr = taosMemoryCalloc(pSma->exprLen, 1);
    if (pSma->expr == NULL) goto _OVER;
    SDB_GET_BINARY(pRaw, dataPos, pSma->expr, pSma->exprLen, _OVER)
  }

  if (pSma->tagsFilterLen > 0) {
    pSma->tagsFilter = taosMemoryCalloc(pSma->tagsFilterLen, 1);
    if (pSma->tagsFilter == NULL) goto _OVER;
    SDB_GET_BINARY(pRaw, dataPos, pSma->tagsFilter, pSma->tagsFilterLen, _OVER)
  }

  if (pSma->sqlLen > 0) {
    pSma->sql = taosMemoryCalloc(pSma->sqlLen, 1);
    if (pSma->sql == NULL) goto _OVER;
    SDB_GET_BINARY(pRaw, dataPos, pSma->sql, pSma->sqlLen, _OVER)
  }

  if (pSma->astLen > 0) {
    pSma->ast = taosMemoryCalloc(pSma->astLen, 1);
    if (pSma->ast == NULL) goto _OVER;
    SDB_GET_BINARY(pRaw, dataPos, pSma->ast, pSma->astLen, _OVER)
  }

  SDB_GET_BINARY(pRaw, dataPos, pSma->baseSmaName, TSDB_TABLE_FNAME_LEN, _OVER)
  SDB_GET_BINARY(pRaw, dataPos, pSma->createUser, TSDB_USER_LEN, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pSma->ownerId, _OVER)

  SDB_GET_RESERVE(pRaw, dataPos, TSDB_SMA_RESERVE_SIZE, _OVER)

  terrno = 0;

_OVER:
  if (terrno == 0) {
    mTrace("sma:%s, decode from raw:%p, row:%p", pSma->name, pRaw, pSma);
    return pRow;
  }

  // pSma is NULL when the failure happened before the row object was obtained.
  if (pSma != NULL) {
    mError("sma:%s, failed to decode from raw:%p since %s", pSma->name, pRaw, terrstr());
    taosMemoryFreeClear(pSma->expr);
    taosMemoryFreeClear(pSma->tagsFilter);
    taosMemoryFreeClear(pSma->sql);
    taosMemoryFreeClear(pSma->ast);
  }
  taosMemoryFreeClear(pRow);
  return NULL;
}
268

269
// sdb insert callback for SDB_SMA: only traces the row; always returns 0.
static int32_t mndSmaActionInsert(SSdb *pSdb, SSmaObj *pSma) {
  mTrace("sma:%s, perform insert action, row:%p", pSma->name, pSma);

  return 0;
}
273

274
// sdb delete callback for SDB_SMA: frees the four heap blobs hanging off the
// row (the same ones mndSmaActionDecode allocates); always returns 0.
static int32_t mndSmaActionDelete(SSdb *pSdb, SSmaObj *pSma) {
  mTrace("sma:%s, perform delete action, row:%p", pSma->name, pSma);

  taosMemoryFreeClear(pSma->expr);
  taosMemoryFreeClear(pSma->tagsFilter);
  taosMemoryFreeClear(pSma->sql);
  taosMemoryFreeClear(pSma->ast);

  return 0;
}
282

283
// sdb update callback for SDB_SMA: ownerId is the only field carried over from
// the new row into the existing one; always returns 0.
static int32_t mndSmaActionUpdate(SSdb *pSdb, SSmaObj *pOld, SSmaObj *pNew) {
  mTrace("sma:%s, perform update action, old row:%p new row:%p", pOld->name, pOld, pNew);

  pOld->ownerId = pNew->ownerId;

  return 0;
}
288

289
/*
 * Looks up an sma row by name through sdbAcquire().
 *
 * Returns the row, or NULL when the lookup fails.  A lookup failure reported as
 * TSDB_CODE_SDB_OBJ_NOT_THERE is re-labelled TSDB_CODE_MND_SMA_NOT_EXIST in
 * terrno; any other terrno value is left untouched.
 */
SSmaObj *mndAcquireSma(SMnode *pMnode, char *smaName) {
  SSmaObj *pFound = sdbAcquire(pMnode->pSdb, SDB_SMA, smaName);
  if (pFound != NULL) {
    return pFound;
  }

  if (terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
    terrno = TSDB_CODE_MND_SMA_NOT_EXIST;
  }
  return NULL;
}
297

298
// Hands an sma row back to the mnode's sdb via sdbRelease().
void mndReleaseSma(SMnode *pMnode, SSmaObj *pSma) {
  sdbRelease(pMnode->pSdb, pSma);
}
302

303
/*
 * Encodes pSma and appends it to the transaction's redo log, then marks the raw
 * SDB_STATUS_CREATING.
 *
 * When encoding yields NULL the result is terrno if it is non-zero, otherwise
 * TSDB_CODE_MND_RETURN_VALUE_NULL.
 */
static int32_t mndSetCreateSmaRedoLogs(SMnode *pMnode, STrans *pTrans, SSmaObj *pSma) {
  int32_t  code = 0;
  SSdbRaw *pRaw = mndSmaActionEncode(pSma);

  if (pRaw == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  TAOS_CHECK_RETURN(mndTransAppendRedolog(pTrans, pRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRaw, SDB_STATUS_CREATING));

  TAOS_RETURN(code);
}
316

317
/*
 * Encodes pSma and appends it to the transaction's undo log, then marks the raw
 * SDB_STATUS_DROPPED.
 *
 * When encoding yields NULL the result is terrno if it is non-zero, otherwise
 * TSDB_CODE_MND_RETURN_VALUE_NULL.
 */
static int32_t mndSetCreateSmaUndoLogs(SMnode *pMnode, STrans *pTrans, SSmaObj *pSma) {
  int32_t  code = 0;
  SSdbRaw *pRaw = mndSmaActionEncode(pSma);

  if (pRaw == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  TAOS_CHECK_RETURN(mndTransAppendUndolog(pTrans, pRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED));

  TAOS_RETURN(code);
}
329

330
/*
 * Encodes pSma and appends it to the transaction's commit log, then marks the
 * raw SDB_STATUS_READY.
 *
 * When encoding yields NULL the result is terrno if it is non-zero, otherwise
 * TSDB_CODE_MND_RETURN_VALUE_NULL.
 */
static int32_t mndSetCreateSmaCommitLogs(SMnode *pMnode, STrans *pTrans, SSmaObj *pSma) {
  int32_t  code = 0;
  SSdbRaw *pRaw = mndSmaActionEncode(pSma);

  if (pRaw == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRaw, SDB_STATUS_READY));

  TAOS_RETURN(code);
}
343

344
/*
 * Appends a commit-log entry for the super table with smaVer incremented.
 *
 * The entry is encoded from a local copy of *pStb taken under its read latch.
 * In the copy the column/tag/func arrays are dropped (pointers NULL, counts 0),
 * the latch word is reset, and updateTime is set to the current time in ms.
 * The raw is marked SDB_STATUS_READY.
 */
static int32_t mndSetUpdateSmaStbCommitLogs(SMnode *pMnode, STrans *pTrans, SStbObj *pStb) {
  int32_t code = 0;
  SStbObj stbCopy = {0};

  taosRLockLatch(&pStb->lock);
  memcpy(&stbCopy, pStb, sizeof(SStbObj));
  taosRUnLockLatch(&pStb->lock);

  // The copy must not carry the source's array pointers or its latch state.
  stbCopy.pColumns = NULL;
  stbCopy.numOfColumns = 0;
  stbCopy.pTags = NULL;
  stbCopy.numOfTags = 0;
  stbCopy.pFuncs = NULL;
  stbCopy.numOfFuncs = 0;
  stbCopy.lock = 0;

  stbCopy.updateTime = taosGetTimestampMs();
  stbCopy.smaVer++;

  SSdbRaw *pRaw = mndStbActionEncode(&stbCopy);
  if (pRaw == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRaw, SDB_STATUS_READY));

  TAOS_RETURN(code);
}
371

372
/*
 * Validates the fields of a create-sma request.
 *
 * Returns 0 when every check passes, TSDB_CODE_MND_INVALID_SMA_OPTION otherwise.
 * Checks, in order: non-empty name and stb, igExists in {0,1}, non-negative
 * units / window values / blob lengths, each non-empty blob's length equal to
 * strlen()+1 of its string, and a name that parses as acct.db.table with a
 * non-empty table part.
 *
 * The two name-parsing failures used to return a bare -1; they now return the
 * same error code as every other rejected request.
 */
static int32_t mndCheckCreateSmaReq(SMCreateSmaReq *pCreate) {
  int32_t code = TSDB_CODE_MND_INVALID_SMA_OPTION;

  if (pCreate->name[0] == 0) TAOS_RETURN(code);
  if (pCreate->stb[0] == 0) TAOS_RETURN(code);
  if (pCreate->igExists < 0 || pCreate->igExists > 1) TAOS_RETURN(code);

  if (pCreate->intervalUnit < 0) TAOS_RETURN(code);
  if (pCreate->slidingUnit < 0) TAOS_RETURN(code);
  if (pCreate->timezone < 0) TAOS_RETURN(code);
  if (pCreate->interval < 0) TAOS_RETURN(code);
  if (pCreate->offset < 0) TAOS_RETURN(code);
  if (pCreate->sliding < 0) TAOS_RETURN(code);

  if (pCreate->exprLen < 0) TAOS_RETURN(code);
  if (pCreate->tagsFilterLen < 0) TAOS_RETURN(code);
  if (pCreate->sqlLen < 0) TAOS_RETURN(code);
  if (pCreate->astLen < 0) TAOS_RETURN(code);

  // A non-zero length must count the string plus its terminating NUL.
  if (pCreate->exprLen != 0 && strlen(pCreate->expr) + 1 != pCreate->exprLen) TAOS_RETURN(code);
  if (pCreate->tagsFilterLen != 0 && strlen(pCreate->tagsFilter) + 1 != pCreate->tagsFilterLen) TAOS_RETURN(code);
  if (pCreate->sqlLen != 0 && strlen(pCreate->sql) + 1 != pCreate->sqlLen) TAOS_RETURN(code);
  if (pCreate->astLen != 0 && strlen(pCreate->ast) + 1 != pCreate->astLen) TAOS_RETURN(code);

  SName smaName = {0};
  if (tNameFromString(&smaName, pCreate->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE) < 0) TAOS_RETURN(code);
  if (*(char *)tNameGetTableName(&smaName) == 0) TAOS_RETURN(code);

  code = 0;
  TAOS_RETURN(code);
}
399

400
static int32_t mndGetStreamNameFromSmaName(char *streamName, char *smaName) {
×
401
  SName   n;
×
UNCOV
402
  int32_t code = tNameFromString(&n, smaName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
×
UNCOV
403
  if (TSDB_CODE_SUCCESS != code) {
×
404
    return code;
×
405
  }
406
  snprintf(streamName, TSDB_TABLE_FNAME_LEN, "%d.%s", n.acctId, n.tname);
×
407
  return TSDB_CODE_SUCCESS;
×
408
}
409

410
/*
 * Encodes pSma and appends it to the transaction's redo log, then marks the raw
 * SDB_STATUS_DROPPING.
 *
 * When encoding yields NULL the result is terrno if it is non-zero, otherwise
 * TSDB_CODE_MND_RETURN_VALUE_NULL.  (Previously that code was computed and then
 * thrown away in favour of a bare -1.)
 */
static int32_t mndSetDropSmaRedoLogs(SMnode *pMnode, STrans *pTrans, SSmaObj *pSma) {
  int32_t  code = 0;
  SSdbRaw *pRedoRaw = mndSmaActionEncode(pSma);
  if (pRedoRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }
  TAOS_CHECK_RETURN(mndTransAppendRedolog(pTrans, pRedoRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING));

  return 0;
}
423

424
/*
 * Encodes pSma and appends it to the transaction's commit log, then marks the
 * raw SDB_STATUS_DROPPED.
 *
 * When encoding yields NULL the result is terrno if it is non-zero, otherwise
 * TSDB_CODE_MND_RETURN_VALUE_NULL.  (Previously that code was computed and then
 * thrown away in favour of a bare -1.)
 */
static int32_t mndSetDropSmaCommitLogs(SMnode *pMnode, STrans *pTrans, SSmaObj *pSma) {
  int32_t  code = 0;
  SSdbRaw *pCommitRaw = mndSmaActionEncode(pSma);
  if (pCommitRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }
  TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pCommitRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED));

  return 0;
}
437

438
/*
 * Encodes pVgroup and appends it to the transaction's redo log, then marks the
 * raw SDB_STATUS_DROPPING.
 *
 * When encoding yields NULL the result is terrno if it is non-zero, otherwise
 * TSDB_CODE_MND_RETURN_VALUE_NULL.  (Previously that code was computed and then
 * thrown away in favour of a bare -1.)
 */
static int32_t mndSetDropSmaVgroupRedoLogs(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup) {
  int32_t  code = 0;
  SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroup);
  if (pVgRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }
  TAOS_CHECK_RETURN(mndTransAppendRedolog(pTrans, pVgRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pVgRaw, SDB_STATUS_DROPPING));

  return 0;
}
451

452
/*
 * Encodes pVgroup and appends it to the transaction's commit log, then marks
 * the raw SDB_STATUS_DROPPED.
 *
 * When encoding yields NULL the result is terrno if it is non-zero, otherwise
 * TSDB_CODE_MND_RETURN_VALUE_NULL.  (Previously that code was computed and then
 * thrown away in favour of a bare -1.)
 */
static int32_t mndSetDropSmaVgroupCommitLogs(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup) {
  int32_t  code = 0;
  SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroup);
  if (pVgRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }
  TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pVgRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pVgRaw, SDB_STATUS_DROPPED));

  return 0;
}
465

466
/*
 * Appends a TDMT_DND_DROP_VNODE redo action for the vgroup's first vnode
 * (vnodeGid[0]), addressed to that vnode's dnode.  TSDB_CODE_VND_NOT_EXIST is
 * registered as an acceptable reply code.
 *
 * Returns 0 on success.  When the dnode cannot be acquired or the request
 * cannot be built, returns terrno if non-zero, otherwise
 * TSDB_CODE_MND_RETURN_VALUE_NULL.  When appending the action fails, the request
 * buffer is freed and the append error is returned.
 */
static int32_t mndSetDropSmaVgroupRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) {
  int32_t    code = 0;
  SVnodeGid *pVgid = pVgroup->vnodeGid + 0;
  SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
  if (pDnode == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  STransAction action = {0};
  action.epSet = mndGetDnodeEpset(pDnode);

  // Build the request while the dnode is still held: pDnode is an argument of
  // mndBuildDropVnodeReq(), so it must not be released before that call.
  int32_t contLen = 0;
  void   *pReq = mndBuildDropVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen);
  if (pReq == NULL) {
    // Capture the failure code before the release call below can touch terrno.
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
  }
  mndReleaseDnode(pMnode, pDnode);

  if (pReq == NULL) {
    TAOS_RETURN(code);
  }

  action.pCont = pReq;
  action.contLen = contLen;
  action.msgType = TDMT_DND_DROP_VNODE;
  action.acceptableCode = TSDB_CODE_VND_NOT_EXIST;

  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
    taosMemoryFree(pReq);
    TAOS_RETURN(code);
  }

  TAOS_RETURN(code);
}
500

UNCOV
501
/*
 * Builds and prepares the "drop-sma" transaction for pSma.
 *
 * Steps: acquire the destination vgroup and the source stb, create a serial
 * retry transaction scoped to pDb, append the drop of the sma's stream, then
 * the sma / vgroup redo and commit logs, the stb smaVer bump, the drop-vnode
 * redo action, and finally prepare the transaction.
 *
 * Every exit goes through _OVER, which drops the transaction handle and
 * releases the stream, vgroup and stb that were acquired.
 *
 * Returns 0 on success, otherwise the code of the step that failed.
 */
static int32_t mndDropSma(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SSmaObj *pSma) {
  int32_t     code = -1;
  SVgObj     *pVgroup = NULL;
  SStbObj    *pStb = NULL;
  STrans     *pTrans = NULL;
  SStreamObj *pStream = NULL;
  char        streamName[TSDB_TABLE_FNAME_LEN] = {0};

  pVgroup = mndAcquireVgroup(pMnode, pSma->dstVgId);
  if (pVgroup == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }

  pStb = mndAcquireStb(pMnode, pSma->stb);
  if (pStb == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }

  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq, "drop-sma");
  if (pTrans == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }

  mInfo("trans:%d, used to drop sma:%s", pTrans->id, pSma->name);
  mndTransSetDbName(pTrans, pDb->name, NULL);
  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER);

  mndTransSetSerial(pTrans);

  code = mndGetStreamNameFromSmaName(streamName, pSma->name);
  if (TSDB_CODE_SUCCESS != code) {
    goto _OVER;
  }

  code = mndAcquireStream(pMnode, streamName, &pStream);
  if (pStream == NULL || pStream->pCreate->streamId != pSma->uid || code != 0) {
    // NOTE(review): when the stream is missing or its id does not match while
    // code == 0, this path returns 0 without preparing the transaction -
    // confirm that this is intended.
    sdbRelease(pMnode->pSdb, pStream);
    // Released right here; clear the pointer so that _OVER does not pass the
    // same object to mndReleaseStream() a second time.
    pStream = NULL;
    goto _OVER;
  }

  // drop stream
  if ((code = mndStreamTransAppend(pStream, pTrans, SDB_STATUS_DROPPED)) < 0) {
    mError("stream:%s, failed to drop log since %s", pStream->pCreate->name, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    pStream = NULL;  // same reason as above: avoid a second release at _OVER
    goto _OVER;
  }

  TAOS_CHECK_GOTO(mndSetDropSmaRedoLogs(pMnode, pTrans, pSma), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetDropSmaVgroupRedoLogs(pMnode, pTrans, pVgroup), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetDropSmaCommitLogs(pMnode, pTrans, pSma), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetDropSmaVgroupCommitLogs(pMnode, pTrans, pVgroup), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetUpdateSmaStbCommitLogs(pMnode, pTrans, pStb), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetDropSmaVgroupRedoActions(pMnode, pTrans, pDb, pVgroup), NULL, _OVER);
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);

  code = 0;

_OVER:
  mndTransDrop(pTrans);
  mndReleaseStream(pMnode, pStream);
  mndReleaseVgroup(pMnode, pVgroup);
  mndReleaseStb(pMnode, pStb);
  TAOS_RETURN(code);
}
570

571
/*
 * Handles a drop-sma request.
 *
 * Flow: deserialize the request, look the sma up through mndAcquireGlobalIdx(),
 * resolve its database from the sma name, check MND_OPER_WRITE_DB privilege,
 * then start the drop transaction with mndDropSma().
 *
 * Returns TSDB_CODE_ACTION_IN_PROGRESS once the transaction has been started,
 * 0 when the sma is absent and igNotExists is set, TSDB_CODE_MND_SMA_NOT_EXIST
 * when it is absent otherwise, TSDB_CODE_MND_DB_NOT_SELECTED when the database
 * cannot be acquired, or the code of whichever step failed.
 */
static int32_t mndProcessDropSmaReq(SRpcMsg *pReq) {
  SMnode      *pMnode = pReq->info.node;
  int32_t      code = -1;
  SDbObj      *pDb = NULL;
  SSmaObj     *pSma = NULL;
  SMDropSmaReq dropReq = {0};
  SSIdx        idx = {0};
  SName        name = {0};
  char         db[TSDB_TABLE_FNAME_LEN] = {0};

  TAOS_CHECK_GOTO(tDeserializeSMDropSmaReq(pReq->pCont, pReq->contLen, &dropReq), NULL, _OVER);

  mInfo("sma:%s, start to drop", dropReq.name);

  code = mndAcquireGlobalIdx(pMnode, dropReq.name, SDB_SMA, &idx);
  if (code != 0) {
    goto _OVER;
  }

  pSma = idx.pIdx;
  if (pSma == NULL) {
    if (dropReq.igNotExists) {
      mInfo("sma:%s, not exist, ignore not exist is set", dropReq.name);
      code = 0;
    } else {
      code = TSDB_CODE_MND_SMA_NOT_EXIST;
    }
    goto _OVER;
  }

  code = tNameFromString(&name, dropReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
  if (TSDB_CODE_SUCCESS != code) {
    goto _OVER;
  }
  (void)tNameGetFullDbName(&name, db);

  pDb = mndAcquireDb(pMnode, db);
  if (pDb == NULL) {
    code = TSDB_CODE_MND_DB_NOT_SELECTED;
    goto _OVER;
  }

  TAOS_CHECK_GOTO(mndCheckDbPrivilege(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_WRITE_DB, pDb), NULL, _OVER);

  code = mndDropSma(pMnode, pReq, pDb, pSma);
  if (code == 0) {
    code = TSDB_CODE_ACTION_IN_PROGRESS;
  }

_OVER:
  // "in progress" is the normal outcome of a started transaction, not a failure
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("sma:%s, failed to drop since %s", dropReq.name, tstrerror(code));
  }

  mndReleaseSma(pMnode, pSma);
  mndReleaseDb(pMnode, pDb);
  TAOS_RETURN(code);
}
627

628
/*
 * Fills a user-index response for the sma named by indexReq->indexFName.
 *
 * @param exist  set to false when the sma cannot be found, to true when the
 *               response has been filled; left untouched when parsing the sma
 *               expression fails
 * @return 0 when the sma is missing or the response was filled, otherwise the
 *         code returned by nodesStringToList()
 *
 * rsp->indexExts receives the comma-separated function names taken from the
 * sma's expression list.  The parsed list is now destroyed before returning;
 * it was previously never released.
 */
static int32_t mndGetSma(SMnode *pMnode, SUserIndexReq *indexReq, SUserIndexRsp *rsp, bool *exist) {
  int32_t  code = -1;
  SSmaObj *pSma = NULL;

  SSIdx idx = {0};
  if (0 == mndAcquireGlobalIdx(pMnode, indexReq->indexFName, SDB_SMA, &idx)) {
    pSma = idx.pIdx;
  } else {
    *exist = false;
    return 0;
  }

  if (pSma == NULL) {
    *exist = false;
    return 0;
  }

  memcpy(rsp->dbFName, pSma->db, sizeof(pSma->db));
  memcpy(rsp->tblFName, pSma->stb, sizeof(pSma->stb));
  tstrncpy(rsp->indexType, TSDB_INDEX_TYPE_SMA, TSDB_INDEX_TYPE_LEN);

  SNodeList *pList = NULL;
  int32_t    extOffset = 0;
  code = nodesStringToList(pSma->expr, &pList);
  if (0 == code) {
    SNode *node = NULL;
    FOREACH(node, pList) {
      SFunctionNode *pFunc = (SFunctionNode *)node;
      // a "," separator is emitted before every entry except the first
      extOffset += tsnprintf(rsp->indexExts + extOffset, sizeof(rsp->indexExts) - extOffset - 1, "%s%s",
                             (extOffset ? "," : ""), pFunc->functionName);
    }

    *exist = true;

    // The list was only needed to collect the function names above.
    if (pList != NULL) {
      nodesDestroyList(pList);
      pList = NULL;
    }
  }

  mndReleaseSma(pMnode, pSma);
  return code;
}
666

UNCOV
667
/*
 * Collects every sma defined on super table tbFName into rsp.
 *
 * @param tbFName  full name of the super table
 * @param rsp      receives dbFName, tbName, suid, version (the stb's smaVer) and
 *                 one STableIndexInfo per matching sma in rsp->pIndex;
 *                 rsp->indexSize grows by sizeof(info) + exprLen + 1 per entry
 * @param exist    set to false when the stb cannot be acquired, to true once at
 *                 least one sma has been added; otherwise left as the caller set it
 * @return 0 on success (including "stb not found"), otherwise the failure code
 *
 * Each pushed entry owns a heap copy of the sma expression (NUL-terminated).
 * The vgroup acquired to read the epset is now released again right after the
 * epset is copied, and `info` is zero-initialized so that no indeterminate bytes
 * are pushed into the array.
 */
int32_t mndGetTableSma(SMnode *pMnode, char *tbFName, STableIndexRsp *rsp, bool *exist) {
  int32_t  code = 0;
  SSmaObj *pSma = NULL;
  SSdb    *pSdb = pMnode->pSdb;
  void    *pIter = NULL;

  SStbObj *pStb = mndAcquireStb(pMnode, tbFName);
  if (NULL == pStb) {
    *exist = false;
    return TSDB_CODE_SUCCESS;
  }

  tstrncpy(rsp->dbFName, pStb->db, TSDB_DB_FNAME_LEN);
  // the table name is what follows "<db>" and one separator character in the stb name
  tstrncpy(rsp->tbName, pStb->name + strlen(pStb->db) + 1, TSDB_TABLE_NAME_LEN);
  rsp->suid = pStb->uid;
  rsp->version = pStb->smaVer;
  mndReleaseStb(pMnode, pStb);

  while (1) {
    pIter = sdbFetch(pSdb, SDB_SMA, pIter, (void **)&pSma);
    if (pIter == NULL) break;

    // first-character comparison skips strcmp() for most non-matching rows
    if (pSma->stb[0] != tbFName[0] || strcmp(pSma->stb, tbFName)) {
      sdbRelease(pSdb, pSma);
      continue;
    }

    STableIndexInfo info = {0};
    info.intervalUnit = pSma->intervalUnit;
    info.slidingUnit = pSma->slidingUnit;
    info.interval = pSma->interval;
    info.offset = pSma->offset;
    info.sliding = pSma->sliding;
    info.dstTbUid = pSma->dstTbUid;
    info.dstVgId = pSma->dstVgId;

    SVgObj *pVg = mndAcquireVgroup(pMnode, pSma->dstVgId);
    if (pVg == NULL) {
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
      if (terrno != 0) code = terrno;
      sdbRelease(pSdb, pSma);
      sdbCancelFetch(pSdb, pIter);
      return code;
    }
    info.epSet = mndGetVgroupEpset(pMnode, pVg);
    // The epset has been copied by value; the vgroup is not needed any longer.
    mndReleaseVgroup(pMnode, pVg);

    info.expr = taosMemoryMalloc(pSma->exprLen + 1);
    if (info.expr == NULL) {
      code = terrno;
      sdbRelease(pSdb, pSma);
      sdbCancelFetch(pSdb, pIter);
      return code;
    }

    memcpy(info.expr, pSma->expr, pSma->exprLen);
    info.expr[pSma->exprLen] = 0;

    if (NULL == taosArrayPush(rsp->pIndex, &info)) {
      code = terrno;
      taosMemoryFree(info.expr);
      sdbRelease(pSdb, pSma);
      sdbCancelFetch(pSdb, pIter);
      return code;
    }

    rsp->indexSize += sizeof(info) + pSma->exprLen + 1;
    *exist = true;

    sdbRelease(pSdb, pSma);
  }

  TAOS_RETURN(code);
}
740

UNCOV
741
/*
 * Handles a get-index request: looks the sma up with mndGetSma() and, when it
 * exists, serializes a SUserIndexRsp into pReq->info.rsp / rspLen.
 *
 * Returns 0 with the response attached, TSDB_CODE_MND_DB_INDEX_NOT_EXIST when
 * the index is absent, or the code of the step that failed.
 *
 * The size probe is now checked before allocating, and the response buffer is
 * freed when the second serialization pass fails (it used to leak).
 */
static int32_t mndProcessGetSmaReq(SRpcMsg *pReq) {
  SUserIndexReq indexReq = {0};
  SMnode       *pMnode = pReq->info.node;
  int32_t       code = -1;
  SUserIndexRsp rsp = {0};
  bool          exist = false;

  TAOS_CHECK_GOTO(tDeserializeSUserIndexReq(pReq->pCont, pReq->contLen, &indexReq), NULL, _OVER);

  code = mndGetSma(pMnode, &indexReq, &rsp, &exist);
  if (code) {
    goto _OVER;
  }

  if (!exist) {
    // TODO GET INDEX FROM FULLTEXT
    code = TSDB_CODE_MND_DB_INDEX_NOT_EXIST;
  } else {
    // first pass with a NULL buffer only computes the encoded size
    int32_t contLen = tSerializeSUserIndexRsp(NULL, 0, &rsp);
    if (contLen < 0) {
      code = terrno;
      goto _OVER;
    }

    void *pRsp = rpcMallocCont(contLen);
    if (pRsp == NULL) {
      code = terrno;
      goto _OVER;
    }

    contLen = tSerializeSUserIndexRsp(pRsp, contLen, &rsp);
    if (contLen < 0) {
      code = terrno;
      rpcFreeCont(pRsp);  // not attached to pReq yet, so nobody else frees it
      goto _OVER;
    }

    pReq->info.rsp = pRsp;
    pReq->info.rspLen = contLen;

    code = 0;
  }

_OVER:
  if (code != 0) {
    mError("failed to get index %s since %s", indexReq.indexFName, tstrerror(code));
  }

  TAOS_RETURN(code);
}
785

UNCOV
786
/**
 * Handle a "get table index" request: return every SMA defined on one table.
 *
 * Deserializes the STableIndexReq, collects the matching entries through
 * mndGetTableSma() into rsp.pIndex and, when at least one exists, serializes
 * the STableIndexRsp into a newly allocated RPC buffer attached to
 * pReq->info.rsp / pReq->info.rspLen. The response object is always released
 * with tFreeSerializeSTableIndexRsp() before returning.
 *
 * @param pReq incoming RPC message; pReq->info.node is used as the mnode.
 * @return 0 on success, TSDB_CODE_MND_DB_INDEX_NOT_EXIST when nothing matched,
 *         otherwise the code of the failing step.
 */
static int32_t mndProcessGetTbSmaReq(SRpcMsg *pReq) {
  STableIndexReq indexReq = {0};
  SMnode        *pMnode = pReq->info.node;
  int32_t        code = -1;
  STableIndexRsp rsp = {0};
  bool           exist = false;
  int32_t        contLen = 0;
  void          *pRsp = NULL;

  TAOS_CHECK_GOTO(tDeserializeSTableIndexReq(pReq->pCont, pReq->contLen, &indexReq), NULL, _OVER);

  rsp.pIndex = taosArrayInit(10, sizeof(STableIndexInfo));
  if (NULL == rsp.pIndex) {
    code = terrno;
    goto _OVER;
  }

  code = mndGetTableSma(pMnode, indexReq.tbFName, &rsp, &exist);
  if (code) {
    goto _OVER;
  }

  if (!exist) {
    code = TSDB_CODE_MND_DB_INDEX_NOT_EXIST;
    goto _OVER;
  }

  // First pass only computes the encoded size; a negative value is a failure
  // and must not be used as an allocation size.
  contLen = tSerializeSTableIndexRsp(NULL, 0, &rsp);
  if (contLen < 0) {
    // Never report success for a failed encode, even if terrno was left at 0.
    code = (terrno != 0) ? terrno : contLen;
    goto _OVER;
  }

  pRsp = rpcMallocCont(contLen);
  if (pRsp == NULL) {
    code = terrno;
    goto _OVER;
  }

  contLen = tSerializeSTableIndexRsp(pRsp, contLen, &rsp);
  if (contLen < 0) {
    code = (terrno != 0) ? terrno : contLen;
    // The buffer was never handed to pReq, so it has to be released here.
    rpcFreeCont(pRsp);
    goto _OVER;
  }

  pReq->info.rsp = pRsp;
  pReq->info.rspLen = contLen;
  code = 0;

_OVER:
  if (code != 0) {
    mError("failed to get table index %s since %s", indexReq.tbFName, tstrerror(code));
  }

  tFreeSerializeSTableIndexRsp(&rsp);
  TAOS_RETURN(code);
}
836

837
/**
 * Fill pBlock with up to `rows` SMA rows for the combined index listing.
 *
 * Iteration state lives in the SSmaAndTagIter stored in pShow->pIter, so the
 * caller must have allocated it. When pShow->db is non-empty only SMAs whose
 * dbUid matches that database are emitted. Seven columns are written per row:
 * sma name, db name, source table name, dstVgId, createdTime, an empty string
 * and the literal "sma_index".
 *
 * @return number of rows written; 0 when pShow->db cannot be acquired; -1 when
 *         writing a row failed (the sdb iterator is cancelled in that case).
 */
static int32_t mndRetrieveSma(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode  *pMnode = pReq->info.node;
  SSdb    *pSdb = pMnode->pSdb;
  int32_t  numOfRows = 0;
  SSmaObj *pSma = NULL;
  int32_t  cols = 0;
  int32_t  code = 0;

  SDbObj *pDb = NULL;
  if (strlen(pShow->db) > 0) {
    pDb = mndAcquireDb(pMnode, pShow->db);
    if (pDb == NULL) return 0;
  }

  SSmaAndTagIter *pIter = pShow->pIter;
  while (numOfRows < rows) {
    pIter->pSmaIter = sdbFetch(pSdb, SDB_SMA, pIter->pSmaIter, (void **)&pSma);
    if (pIter->pSmaIter == NULL) break;

    // Skip SMAs that belong to a different database than the one requested.
    if (NULL != pDb && pSma->dbUid != pDb->uid) {
      sdbRelease(pSdb, pSma);
      continue;
    }

    cols = 0;

    SName smaName = {0};
    SName stbName = {0};
    char  n1[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
    char  n2[TSDB_DB_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
    char  n3[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};

    code = tNameFromString(&smaName, pSma->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
    if (TSDB_CODE_SUCCESS == code) {
      STR_TO_VARSTR(n1, (char *)tNameGetTableName(&smaName));
      STR_TO_VARSTR(n2, (char *)mndGetDbStr(pSma->db));
      code = tNameFromString(&stbName, pSma->stb, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
    }

    SColumnInfoData *pColInfo = NULL;
    if (TSDB_CODE_SUCCESS == code) {
      STR_TO_VARSTR(n3, (char *)tNameGetTableName(&stbName));

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)n1, false);
    }
    if (TSDB_CODE_SUCCESS == code) {
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)n2, false);
    }
    if (TSDB_CODE_SUCCESS == code) {
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)n3, false);
    }
    if (TSDB_CODE_SUCCESS == code) {
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)&pSma->dstVgId, false);
    }
    if (TSDB_CODE_SUCCESS == code) {
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)&pSma->createdTime, false);
    }

    char col[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
    STR_TO_VARSTR(col, (char *)"");

    if (TSDB_CODE_SUCCESS == code) {
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)col, false);
    }

    if (TSDB_CODE_SUCCESS == code) {
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);

      char tag[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
      STR_TO_VARSTR(tag, (char *)"sma_index");
      code = colDataSetVal(pColInfo, numOfRows, (const char *)tag, false);
    }

    sdbRelease(pSdb, pSma);
    if (TSDB_CODE_SUCCESS != code) {
      sdbCancelFetch(pSdb, pIter->pSmaIter);
      // Do not keep a pointer to an iterator that has just been cancelled.
      pIter->pSmaIter = NULL;
      numOfRows = -1;
      break;
    }
    numOfRows++;
  }

  mndReleaseDb(pMnode, pDb);
  // The -1 error marker is not a row count; keep it out of the running total.
  if (numOfRows > 0) {
    pShow->numOfRows += numOfRows;
  }
  return numOfRows;
}
927

928
// sma and tag index comm func
929
/**
 * Drop an index by name, trying the SMA handler first and the tag-index
 * handler second.
 *
 * The request goes to mndProcessDropSmaReq(). Only when that call returns
 * TSDB_CODE_MND_TAG_INDEX_ALREADY_EXIST or TSDB_CODE_MND_SMA_NOT_EXIST is
 * terrno cleared and the same request passed on to mndProcessDropTagIdxReq().
 *
 * @return the result of whichever handler ran last.
 */
static int32_t mndProcessDropIdxReq(SRpcMsg *pReq) {
  int smaRet = mndProcessDropSmaReq(pReq);
  if (smaRet != TSDB_CODE_MND_TAG_INDEX_ALREADY_EXIST && smaRet != TSDB_CODE_MND_SMA_NOT_EXIST) {
    return smaRet;
  }

  terrno = 0;
  return mndProcessDropTagIdxReq(pReq);
}
937

938
/**
 * Retrieve rows for the combined index listing: SMA rows first, then tag
 * index rows for whatever capacity is left.
 *
 * The shared SSmaAndTagIter is allocated lazily in pShow->pIter and released
 * once fewer than `rows` rows were produced (nothing more to read).
 *
 * @return number of rows written, terrno when the iterator cannot be
 *         allocated, or the negative value reported by mndRetrieveSma().
 */
static int32_t mndRetrieveIdx(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  if (pShow->pIter == NULL) {
    pShow->pIter = taosMemoryCalloc(1, sizeof(SSmaAndTagIter));
  }
  if (!pShow->pIter) {
    return terrno;
  }

  int32_t read = mndRetrieveSma(pReq, pShow, pBlock, rows);
  if (read < 0) {
    // A negative value is an error, not a row count. Using it below would ask
    // for `rows - read` (> rows) tag-index rows and hide the failure.
    // mndRetrieveSma() has already cancelled its own iterator; only the tag
    // index iterator may still be open.
    SMnode         *pMnode = pReq->info.node;
    SSmaAndTagIter *pIter = pShow->pIter;
    sdbCancelFetchByType(pMnode->pSdb, pIter->pIdxIter, SDB_IDX);
    taosMemoryFree(pIter);
    pShow->pIter = NULL;
    return read;
  }

  if (read < rows) {
    read += mndRetrieveTagIdx(pReq, pShow, pBlock, rows - read);
  }
  // no more to read
  if (read < rows) {
    taosMemoryFree(pShow->pIter);
    pShow->pIter = NULL;
  }
  return read;
}
956
/**
 * Abort an in-progress combined index listing.
 *
 * Cancels both sdb iterators held by the SSmaAndTagIter (SMA and tag index)
 * when one was supplied, then frees the iterator object itself.
 */
static void mndCancelRetrieveIdx(SMnode *pMnode, void *pIter) {
  SSmaAndTagIter *pBoth = (SSmaAndTagIter *)pIter;

  if (NULL != pBoth) {
    SSdb *pStore = pMnode->pSdb;
    sdbCancelFetchByType(pStore, pBoth->pSmaIter, SDB_SMA);
    sdbCancelFetchByType(pStore, pBoth->pIdxIter, SDB_IDX);
  }

  taosMemoryFree(pBoth);
}
×
965

UNCOV
966
/**
 * Populate pCxt->pSma from the create request and the objects held in pCxt.
 *
 * Names, uids, interval settings and the creating user are copied in. The
 * expr/sql/ast pointers are taken over from the request as-is (no deep copy),
 * so the request must outlive the SMA object filled here.
 */
static void initSMAObj(SCreateTSMACxt *pCxt) {
  SSmaObj *pObj = pCxt->pSma;

  // identity and ownership
  memcpy(pObj->name, pCxt->pCreateSmaReq->name, TSDB_TABLE_FNAME_LEN);
  memcpy(pObj->stb, pCxt->pCreateSmaReq->stb, TSDB_TABLE_FNAME_LEN);
  memcpy(pObj->db, pCxt->pDb->name, TSDB_DB_FNAME_LEN);
  (void)snprintf(pObj->createUser, sizeof(pObj->createUser), "%s", pCxt->pOperUser->name);
  pObj->ownerId = pCxt->pOperUser->uid;
  if (pCxt->pBaseSma) {
    memcpy(pObj->baseSmaName, pCxt->pBaseSma->name, TSDB_TABLE_FNAME_LEN);
  }
  pObj->createdTime = taosGetTimestampMs();
  pObj->uid = mndGenerateUid(pCxt->pCreateSmaReq->name, TSDB_TABLE_FNAME_LEN);

  // source and destination tables
  memcpy(pObj->dstTbName, pCxt->targetStbFullName, TSDB_TABLE_FNAME_LEN);
  pObj->dstTbUid = 0;  // not used
  if (pCxt->pSrcStb) {
    pObj->stbUid = pCxt->pSrcStb->uid;
  } else {
    pObj->stbUid = pCxt->pCreateSmaReq->normSourceTbUid;
  }
  pObj->dbUid = pCxt->pDb->uid;

  // aggregation window
  pObj->interval = pCxt->pCreateSmaReq->interval;
  pObj->intervalUnit = pCxt->pCreateSmaReq->intervalUnit;
  //  pObj->timezone = taosGetLocalTimezoneOffset();
  pObj->version = 1;

  // payloads borrowed from the request
  pObj->exprLen = pCxt->pCreateSmaReq->exprLen;
  pObj->sqlLen = pCxt->pCreateSmaReq->sqlLen;
  pObj->astLen = pCxt->pCreateSmaReq->astLen;
  pObj->expr = pCxt->pCreateSmaReq->expr;
  pObj->sql = pCxt->pCreateSmaReq->sql;
  pObj->ast = pCxt->pCreateSmaReq->ast;
}
×
992

993
/**
 * Append the encoded *old* database object to the transaction's prepare log
 * and mark the raw record SDB_STATUS_READY.
 *
 * @return 0 on success; when encoding yields NULL, terrno if it is set,
 *         otherwise TSDB_CODE_MND_RETURN_VALUE_NULL; or the append / status
 *         error code. The raw record is freed when the append fails.
 */
static int32_t mndSetUpdateDbTsmaVersionPrepareLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pOld, SDbObj *pNew) {
  SSdbRaw *pRaw = mndDbActionEncode(pOld);
  if (NULL == pRaw) {
    int32_t failCode = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) {
      failCode = terrno;
    }
    TAOS_RETURN(failCode);
  }

  int32_t code = mndTransAppendPrepareLog(pTrans, pRaw);
  if (code != 0) {
    sdbFreeRaw(pRaw);
    TAOS_RETURN(code);
  }

  TAOS_RETURN(sdbSetRawStatus(pRaw, SDB_STATUS_READY));
}
1008

1009
/**
 * Append the encoded *new* database object to the transaction's commit log
 * and mark the raw record SDB_STATUS_READY.
 *
 * @return 0 on success; when encoding yields NULL, terrno if it is set,
 *         otherwise TSDB_CODE_MND_RETURN_VALUE_NULL; or the append / status
 *         error code. The raw record is freed when the append fails.
 */
static int32_t mndSetUpdateDbTsmaVersionCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pOld, SDbObj *pNew) {
  SSdbRaw *pRaw = mndDbActionEncode(pNew);
  if (NULL == pRaw) {
    int32_t failCode = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) {
      failCode = terrno;
    }
    TAOS_RETURN(failCode);
  }

  int32_t code = mndTransAppendCommitlog(pTrans, pRaw);
  if (code != 0) {
    sdbFreeRaw(pRaw);
    TAOS_RETURN(code);
  }

  TAOS_RETURN(sdbSetRawStatus(pRaw, SDB_STATUS_READY));
}
1024

1025
/**
 * Build and prepare the "create-tsma" transaction.
 *
 * The transaction (rollback policy, serial execution) carries:
 *  - redo/undo/commit logs for the new SMA object (pCxt->pSma),
 *  - a redo action that creates the stream and an undo action that drops it,
 *  - an undo action that drops the output super table,
 *  - prepare/commit logs that bump the database's tsmaVersion.
 *
 * Payload buffers are heap copies. Once an action has been appended
 * successfully its buffer pointer is cleared locally, because the transaction
 * is then expected to release it (NOTE(review): relies on mndTransDrop()
 * freeing the pCont of appended actions - confirm against mndTrans.c).
 * Buffers that were never appended are freed here.
 *
 * @return TSDB_CODE_SUCCESS, or the code of the first failing step.
 */
static int32_t mndCreateTSMATxnPrepare(SCreateTSMACxt *pCxt) {
  int32_t      code = -1;
  STransAction createStreamRedoAction = {0};
  STransAction createStreamUndoAction = {0};
  STransAction dropStbUndoAction = {0};
  SMDropStbReq dropStbReq = {0};
  SDbObj       newDb = {0};
  STrans *pTrans = mndTransCreate(pCxt->pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_TSMA, pCxt->pRpcReq, "create-tsma");
  if (!pTrans) {
    code = terrno;
    goto _OVER;
  }
  mndTransSetDbName(pTrans, pCxt->pDb->name, NULL);
  TAOS_CHECK_GOTO(mndTransCheckConflict(pCxt->pMnode, pTrans), NULL, _OVER);

  mndTransSetSerial(pTrans);
  mInfo("trans:%d, used to create tsma:%s", pTrans->id, pCxt->pCreateSmaReq->name);

  // redo: create the stream
  mndGetMnodeEpSet(pCxt->pMnode, &createStreamRedoAction.epSet);
  createStreamRedoAction.acceptableCode = TSDB_CODE_MND_STREAM_ALREADY_EXIST;
  createStreamRedoAction.msgType = TDMT_MND_CREATE_STREAM;
  createStreamRedoAction.contLen = pCxt->pCreateSmaReq->streamReqLen;
  createStreamRedoAction.pCont = taosMemoryCalloc(1, createStreamRedoAction.contLen);
  if (!createStreamRedoAction.pCont) {
    code = terrno;
    goto _OVER;
  }
  memcpy(createStreamRedoAction.pCont, pCxt->pCreateSmaReq->createStreamReq, createStreamRedoAction.contLen);

  // undo: drop the stream again
  createStreamUndoAction.epSet = createStreamRedoAction.epSet;
  createStreamUndoAction.acceptableCode = TSDB_CODE_MND_STREAM_NOT_EXIST;
  createStreamUndoAction.msgType = TDMT_MND_DROP_STREAM;
  createStreamUndoAction.contLen = pCxt->pCreateSmaReq->dropStreamReqLen;
  createStreamUndoAction.pCont = taosMemoryCalloc(1, createStreamUndoAction.contLen);
  if (!createStreamUndoAction.pCont) {
    code = terrno;
    goto _OVER;
  }
  memcpy(createStreamUndoAction.pCont, pCxt->pCreateSmaReq->dropStreamReq, createStreamUndoAction.contLen);

  // undo: drop the output super table
  dropStbReq.igNotExists = true;
  tstrncpy(dropStbReq.name, pCxt->targetStbFullName, TSDB_TABLE_FNAME_LEN);
  dropStbUndoAction.epSet = createStreamRedoAction.epSet;
  dropStbUndoAction.acceptableCode = TSDB_CODE_MND_STB_NOT_EXIST;
  dropStbUndoAction.retryCode = TSDB_CODE_MND_STREAM_MUST_BE_DELETED;
  dropStbUndoAction.msgType = TDMT_MND_STB_DROP;
  dropStbUndoAction.contLen = tSerializeSMDropStbReq(0, 0, &dropStbReq);
  if (dropStbUndoAction.contLen < 0) {
    // A failed size computation must not be used as an allocation size.
    mError("sma: %s, failed to create due to drop stb req encode failure", pCxt->pCreateSmaReq->name);
    code = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }
  dropStbUndoAction.pCont = taosMemoryCalloc(1, dropStbUndoAction.contLen);
  if (!dropStbUndoAction.pCont) {
    code = terrno;
    goto _OVER;
  }
  if (dropStbUndoAction.contLen !=
      tSerializeSMDropStbReq(dropStbUndoAction.pCont, dropStbUndoAction.contLen, &dropStbReq)) {
    mError("sma: %s, failed to create due to drop stb req encode failure", pCxt->pCreateSmaReq->name);
    code = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }

  memcpy(&newDb, pCxt->pDb, sizeof(SDbObj));
  newDb.tsmaVersion++;
  TAOS_CHECK_GOTO(mndSetCreateSmaRedoLogs(pCxt->pMnode, pTrans, pCxt->pSma), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetCreateSmaUndoLogs(pCxt->pMnode, pTrans, pCxt->pSma), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetCreateSmaCommitLogs(pCxt->pMnode, pTrans, pCxt->pSma), NULL, _OVER);

  TAOS_CHECK_GOTO(mndTransAppendRedoAction(pTrans, &createStreamRedoAction), NULL, _OVER);
  createStreamRedoAction.pCont = NULL;  // now held by pTrans
  TAOS_CHECK_GOTO(mndTransAppendUndoAction(pTrans, &createStreamUndoAction), NULL, _OVER);
  createStreamUndoAction.pCont = NULL;  // now held by pTrans
  TAOS_CHECK_GOTO(mndTransAppendUndoAction(pTrans, &dropStbUndoAction), NULL, _OVER);
  dropStbUndoAction.pCont = NULL;  // now held by pTrans

  TAOS_CHECK_GOTO(mndSetUpdateDbTsmaVersionPrepareLogs(pCxt->pMnode, pTrans, pCxt->pDb, &newDb), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetUpdateDbTsmaVersionCommitLogs(pCxt->pMnode, pTrans, pCxt->pDb, &newDb), NULL, _OVER);
  TAOS_CHECK_GOTO(mndTransPrepare(pCxt->pMnode, pTrans), NULL, _OVER);

  code = TSDB_CODE_SUCCESS;

_OVER:
  // Only buffers that never reached the transaction are still non-NULL here.
  taosMemoryFree(createStreamRedoAction.pCont);
  taosMemoryFree(createStreamUndoAction.pCont);
  taosMemoryFree(dropStbUndoAction.pCont);
  mndTransDrop(pTrans);
  TAOS_RETURN(code);
}
1094

1095
/**
 * Create a TSMA: build the SMA object on the stack, parse the projection list
 * from the request's expr string and run the create transaction.
 *
 * pCxt->pSma and pCxt->pProjects are only valid for the duration of this call;
 * both are reset to NULL before returning, because they reference a local
 * object and a list that is destroyed here.
 *
 * @return 0 on success, otherwise the code from nodesStringToList() or
 *         mndCreateTSMATxnPrepare().
 */
static int32_t mndCreateTSMA(SCreateTSMACxt *pCxt) {
  int32_t    code = 0;
  SSmaObj    sma = {0};
  SNodeList *pProjects = NULL;

  pCxt->pSma = &sma;
  initSMAObj(pCxt);

  code = nodesStringToList(pCxt->pCreateSmaReq->expr, &pProjects);
  if (TSDB_CODE_SUCCESS != code) {
    goto _OVER;
  }
  pCxt->pProjects = pProjects;

  if (TSDB_CODE_SUCCESS != (code = mndCreateTSMATxnPrepare(pCxt))) {
    goto _OVER;
  } else {
    mInfo("sma:%s, uid:%" PRIi64 " create on stb:%" PRIi64 " dstTb:%s dstVg:%d", pCxt->pCreateSmaReq->name, sma.uid,
          sma.stbUid, sma.dstTbName, sma.dstVgId);
    code = 0;
  }

_OVER:
  if (pProjects) nodesDestroyList(pProjects);
  pCxt->pProjects = NULL;
  // `sma` goes out of scope on return; do not leave a dangling pointer behind.
  pCxt->pSma = NULL;
  TAOS_RETURN(code);
}
1123

UNCOV
1124
static int32_t mndTSMAGenerateOutputName(const char *tsmaName, char *streamName, char *targetStbName) {
×
UNCOV
1125
  SName   smaName;
×
1126
  int32_t code = tNameFromString(&smaName, tsmaName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
×
UNCOV
1127
  if (TSDB_CODE_SUCCESS != code) {
×
UNCOV
1128
    return code;
×
1129
  }
1130
  snprintf(streamName, TSDB_TABLE_FNAME_LEN, "%d.%s", smaName.acctId, smaName.tname);
×
1131
  snprintf(targetStbName, TSDB_TABLE_FNAME_LEN, "%s" TSMA_RES_STB_POSTFIX, tsmaName);
×
1132
  return TSDB_CODE_SUCCESS;
×
1133
}
1134

1135
/**
 * Handle a CREATE TSMA request.
 *
 * Steps, in order: enforce tsMaxTsmaNum, deserialize and validate the request,
 * acquire the requesting user and (unless a normal source table uid is given)
 * the source super table, derive stream / output table names, reject
 * duplicates (SMA, output super table, stream), acquire the database, check
 * the PRIV_DB_USE and PRIV_TBL_CREATE privileges, resolve the base TSMA for a
 * recursive TSMA, and finally start the create transaction.
 *
 * Every object acquired along the way is released before returning.
 *
 * @return TSDB_CODE_ACTION_IN_PROGRESS when the transaction was started,
 *         0 when the TSMA already exists and igExists is set, otherwise an
 *         error code.
 */
static int32_t mndProcessCreateTSMAReq(SRpcMsg *pReq) {
#ifdef WINDOWS
  TAOS_RETURN(TSDB_CODE_MND_INVALID_PLATFORM);
#endif
  SMnode        *pMnode = pReq->info.node;
  int32_t        code = -1;
  SDbObj        *pDb = NULL;
  SStbObj       *pStb = NULL;
  SStbObj       *pTargetStb = NULL;
  SSmaObj       *pSma = NULL;
  SSmaObj       *pBaseTsma = NULL;
  SStreamObj    *pStream = NULL;
  SUserObj      *pOperUser = NULL;
  int64_t        mTraceId = TRACE_GET_ROOTID(&pReq->info.traceId);
  SMCreateSmaReq createReq = {0};

  if (sdbGetSize(pMnode->pSdb, SDB_SMA) >= tsMaxTsmaNum) {
    code = TSDB_CODE_MND_MAX_TSMA_NUM_EXCEEDED;
    goto _OVER;
  }

  if (tDeserializeSMCreateSmaReq(pReq->pCont, pReq->contLen, &createReq) != 0) {
    code = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }

  mInfo("start to create tsma: %s", createReq.name);
  if ((code = mndCheckCreateSmaReq(&createReq)) != 0) goto _OVER;
  if ((code = mndAcquireUser(pMnode, RPC_MSG_USER(pReq), &pOperUser)) != 0) goto _OVER;

  if (createReq.normSourceTbUid == 0) {
    pStb = mndAcquireStb(pMnode, createReq.stb);
    // A recursive TSMA may have no source super table of its own.
    if (!pStb && !createReq.recursiveTsma) {
      mError("tsma:%s, failed to create since stb:%s not exist", createReq.name, createReq.stb);
      code = TSDB_CODE_MND_STB_NOT_EXIST;
      goto _OVER;
    }
  }

  char streamName[TSDB_TABLE_FNAME_LEN] = {0};
  char streamTargetStbFullName[TSDB_TABLE_FNAME_LEN] = {0};
  code = mndTSMAGenerateOutputName(createReq.name, streamName, streamTargetStbFullName);
  if (TSDB_CODE_SUCCESS != code) {
    mInfo("tsma:%s, faield to generate name", createReq.name);
    goto _OVER;
  }

  pSma = sdbAcquire(pMnode->pSdb, SDB_SMA, createReq.name);
  if (pSma && createReq.igExists) {
    mInfo("tsma:%s, already exists in sma:%s, ignore exist is set", createReq.name, pSma->name);
    code = 0;
    goto _OVER;
  }

  if (pSma) {
    code = TSDB_CODE_MND_SMA_ALREADY_EXIST;
    goto _OVER;
  }

  // The acquired reference is dropped at _OVER; it used to leak on this path.
  pTargetStb = mndAcquireStb(pMnode, streamTargetStbFullName);
  if (pTargetStb) {
    code = TSDB_CODE_TDB_STB_ALREADY_EXIST;
    mError("tsma: %s, failed to create since output stable already exists: %s", createReq.name,
           streamTargetStbFullName);
    goto _OVER;
  }

  code = mndAcquireStream(pMnode, streamName, &pStream);
  if (pStream != NULL || code != TSDB_CODE_MND_STREAM_NOT_EXIST) {
    mError("tsma:%s, failed to create since stream:%s already exist", createReq.name, streamName);
    code = TSDB_CODE_MND_SMA_ALREADY_EXIST;
    goto _OVER;
  }

  SName name = {0};
  code = tNameFromString(&name, createReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
  if (TSDB_CODE_SUCCESS != code) {
    goto _OVER;
  }
  char db[TSDB_TABLE_FNAME_LEN] = {0};
  (void)tNameGetFullDbName(&name, db);

  pDb = mndAcquireDb(pMnode, db);
  if (pDb == NULL) {
    code = TSDB_CODE_MND_DB_NOT_SELECTED;
    goto _OVER;
  }

  // TAOS_CHECK_GOTO(mndCheckDbPrivilege(pMnode, RPC_MSG_USER(pReq), MND_OPER_WRITE_DB, pDb), NULL, _OVER);
  if ((code = mndCheckObjPrivilegeRec(pMnode, pOperUser, PRIV_DB_USE, PRIV_OBJ_DB, pDb->ownerId, name.acctId, name.dbname,
                                      NULL))) {
    goto _OVER;
  }
  if ((code = mndCheckObjPrivilegeRec(pMnode, pOperUser, PRIV_TBL_CREATE, PRIV_OBJ_DB, pDb->ownerId, name.acctId,
                                      name.dbname, NULL))) {
    goto _OVER;
  }

  if (createReq.recursiveTsma) {
    pBaseTsma = sdbAcquire(pMnode->pSdb, SDB_SMA, createReq.baseTsmaName);
    if (!pBaseTsma) {
      mError("base tsma: %s not found when creating recursive tsma", createReq.baseTsmaName);
      code = TSDB_CODE_MND_SMA_NOT_EXIST;
      goto _OVER;
    }
    // Without a source super table, inherit the source table uid of the base TSMA.
    if (!pStb) {
      createReq.normSourceTbUid = pBaseTsma->stbUid;
    }
  }

  SCreateTSMACxt cxt = {
      .pMnode = pMnode,
      .pCreateSmaReq = &createReq,
      .streamName = streamName,
      .targetStbFullName = streamTargetStbFullName,
      .pDb = pDb,
      .pRpcReq = pReq,
      .pSma = NULL,
      .pBaseSma = pBaseTsma,
      .pSrcStb = pStb,
      .pOperUser = pOperUser,
  };

  code = mndCreateTSMA(&cxt);
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("tsma:%s, failed to create since %s", createReq.name, tstrerror(code));
  }

  if (pStb) mndReleaseStb(pMnode, pStb);
  if (pTargetStb) mndReleaseStb(pMnode, pTargetStb);
  if (pBaseTsma) mndReleaseSma(pMnode, pBaseTsma);
  mndReleaseSma(pMnode, pSma);
  mndReleaseStream(pMnode, pStream);
  mndReleaseDb(pMnode, pDb);
  mndReleaseUser(pMnode, pOperUser);
  tFreeSMCreateSmaReq(&createReq);

  TAOS_RETURN(code);
}
1275

1276
/**
 * Build and prepare the "drop-tsma" transaction.
 *
 * The transaction (retry policy, serial execution) carries:
 *  - redo/commit logs that remove the SMA object (pCxt->pSma),
 *  - a redo action that drops the stream,
 *  - a redo action that drops the output super table,
 *  - prepare/commit logs that bump the database's tsmaVersion.
 *
 * Payload buffers are heap copies. Once an action has been appended
 * successfully its buffer pointer is cleared locally, because the transaction
 * is then expected to release it (NOTE(review): relies on mndTransDrop()
 * freeing the pCont of appended actions - confirm against mndTrans.c).
 * Buffers that were never appended are freed here.
 *
 * @return TSDB_CODE_SUCCESS, or the code of the first failing step.
 */
static int32_t mndDropTSMA(SCreateTSMACxt *pCxt) {
  int32_t      code = -1;
  STransAction dropStreamRedoAction = {0};
  STransAction dropStbRedoAction = {0};
  SMDropStbReq dropStbReq = {0};
  SDbObj       newDb = {0};
  STrans      *pTrans = mndTransCreate(pCxt->pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_TSMA, pCxt->pRpcReq, "drop-tsma");
  if (!pTrans) {
    code = terrno;
    goto _OVER;
  }
  mndTransSetDbName(pTrans, pCxt->pDb->name, NULL);
  // Propagate the real conflict code instead of the initial -1.
  TAOS_CHECK_GOTO(mndTransCheckConflict(pCxt->pMnode, pTrans), NULL, _OVER);
  mndTransSetSerial(pTrans);

  // redo: drop the stream
  mndGetMnodeEpSet(pCxt->pMnode, &dropStreamRedoAction.epSet);
  dropStreamRedoAction.acceptableCode = TSDB_CODE_MND_STREAM_NOT_EXIST;
  dropStreamRedoAction.msgType = TDMT_MND_DROP_STREAM;
  dropStreamRedoAction.contLen = pCxt->pDropSmaReq->dropStreamReqLen;
  dropStreamRedoAction.pCont = taosMemoryCalloc(1, dropStreamRedoAction.contLen);
  if (!dropStreamRedoAction.pCont) {
    code = terrno;
    goto _OVER;
  }
  memcpy(dropStreamRedoAction.pCont, pCxt->pDropSmaReq->dropStreamReq, dropStreamRedoAction.contLen);

  // output stable is not dropped when dropping stream, dropping it when dropping tsma
  dropStbReq.igNotExists = false;
  tstrncpy(dropStbReq.name, pCxt->targetStbFullName, TSDB_TABLE_FNAME_LEN);
  dropStbReq.sql = "drop";
  dropStbReq.sqlLen = 5;

  mndGetMnodeEpSet(pCxt->pMnode, &dropStbRedoAction.epSet);
  dropStbRedoAction.acceptableCode = TSDB_CODE_MND_STB_NOT_EXIST;
  dropStbRedoAction.msgType = TDMT_MND_STB_DROP;
  dropStbRedoAction.contLen = tSerializeSMDropStbReq(0, 0, &dropStbReq);
  if (dropStbRedoAction.contLen < 0) {
    // A failed size computation must not be used as an allocation size.
    mError("tsma: %s, failedto drop due to drop stb req encode failure", pCxt->pDropSmaReq->name);
    code = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }
  dropStbRedoAction.pCont = taosMemoryCalloc(1, dropStbRedoAction.contLen);
  if (!dropStbRedoAction.pCont) {
    code = terrno;
    goto _OVER;
  }
  if (dropStbRedoAction.contLen !=
      tSerializeSMDropStbReq(dropStbRedoAction.pCont, dropStbRedoAction.contLen, &dropStbReq)) {
    mError("tsma: %s, failedto drop due to drop stb req encode failure", pCxt->pDropSmaReq->name);
    code = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }

  memcpy(&newDb, pCxt->pDb, sizeof(SDbObj));
  newDb.tsmaVersion++;
  TAOS_CHECK_GOTO(mndSetDropSmaRedoLogs(pCxt->pMnode, pTrans, pCxt->pSma), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetDropSmaCommitLogs(pCxt->pMnode, pTrans, pCxt->pSma), NULL, _OVER);

  TAOS_CHECK_GOTO(mndTransAppendRedoAction(pTrans, &dropStreamRedoAction), NULL, _OVER);
  dropStreamRedoAction.pCont = NULL;  // now held by pTrans
  TAOS_CHECK_GOTO(mndTransAppendRedoAction(pTrans, &dropStbRedoAction), NULL, _OVER);
  dropStbRedoAction.pCont = NULL;  // now held by pTrans

  TAOS_CHECK_GOTO(mndSetUpdateDbTsmaVersionPrepareLogs(pCxt->pMnode, pTrans, pCxt->pDb, &newDb), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetUpdateDbTsmaVersionCommitLogs(pCxt->pMnode, pTrans, pCxt->pDb, &newDb), NULL, _OVER);
  TAOS_CHECK_GOTO(mndTransPrepare(pCxt->pMnode, pTrans), NULL, _OVER);
  code = TSDB_CODE_SUCCESS;

_OVER:
  // Only buffers that never reached the transaction are still non-NULL here.
  taosMemoryFree(dropStreamRedoAction.pCont);
  taosMemoryFree(dropStbRedoAction.pCont);
  mndTransDrop(pTrans);
  TAOS_RETURN(code);
}
1333

1334
/**
 * Report whether any SMA in the sdb names pSma as its base TSMA.
 *
 * Walks all SDB_SMA rows and compares each row's baseSmaName with pSma->name
 * (at most TSDB_TABLE_FNAME_LEN bytes). The walk is cancelled as soon as a
 * match is found.
 *
 * @return true when at least one SMA is based on pSma, false otherwise.
 */
static bool hasRecursiveTsmasBasedOnMe(SMnode *pMnode, const SSmaObj *pSma) {
  SSdb    *pStore = pMnode->pSdb;
  SSmaObj *pCandidate = NULL;
  void    *pCursor = sdbFetch(pStore, SDB_SMA, NULL, (void **)&pCandidate);

  for (; pCursor != NULL; pCursor = sdbFetch(pStore, SDB_SMA, pCursor, (void **)&pCandidate)) {
    bool basedOnMe = (strncmp(pCandidate->baseSmaName, pSma->name, TSDB_TABLE_FNAME_LEN) == 0);

    sdbRelease(pStore, pCandidate);
    if (basedOnMe) {
      sdbCancelFetch(pStore, pCursor);
      return true;
    }
  }

  return false;
}
1349

1350
/**
 * Handle a DROP TSMA request.
 *
 * Deserializes the request, derives the stream / output table names, acquires
 * the output super table, the SMA, its database and the requesting user,
 * checks the PRIV_CM_DROP privilege on the TSMA, refuses to drop a TSMA that
 * other TSMAs are based on, and then starts the drop transaction.
 *
 * Everything acquired is released before returning.
 *
 * @return TSDB_CODE_ACTION_IN_PROGRESS when the transaction was started,
 *         0 when the TSMA does not exist and igNotExists is set, otherwise an
 *         error code.
 */
static int32_t mndProcessDropTSMAReq(SRpcMsg *pReq) {
  int32_t      code = -1;
  SMDropSmaReq dropReq = {0};
  SSmaObj     *pSma = NULL;
  SDbObj      *pDb = NULL;
  SMnode      *pMnode = pReq->info.node;
  SStbObj     *pStb = NULL;
  SUserObj    *pUser = NULL;

  if (TSDB_CODE_SUCCESS != tDeserializeSMDropSmaReq(pReq->pCont, pReq->contLen, &dropReq)) {
    code = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }

  char streamName[TSDB_TABLE_FNAME_LEN] = {0};
  char streamTargetStbFullName[TSDB_TABLE_FNAME_LEN] = {0};
  code = mndTSMAGenerateOutputName(dropReq.name, streamName, streamTargetStbFullName);
  if (code != TSDB_CODE_SUCCESS) {
    goto _OVER;
  }

  pStb = mndAcquireStb(pMnode, streamTargetStbFullName);

  pSma = mndAcquireSma(pMnode, dropReq.name);
  if (pSma == NULL) {
    // A missing TSMA is only an error when the caller did not ask to ignore it.
    code = dropReq.igNotExists ? 0 : TSDB_CODE_MND_SMA_NOT_EXIST;
    goto _OVER;
  }

  SName name = {0};
  code = tNameFromString(&name, dropReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
  if (code != TSDB_CODE_SUCCESS) {
    goto _OVER;
  }
  char db[TSDB_TABLE_FNAME_LEN] = {0};
  (void)tNameGetFullDbName(&name, db);

  pDb = mndAcquireDb(pMnode, db);
  if (pDb == NULL) {
    code = TSDB_CODE_MND_DB_NOT_EXIST;
    goto _OVER;
  }

  // if ((code = mndCheckDbPrivilege(pMnode, RPC_MSG_USER(pReq), MND_OPER_WRITE_DB, pDb)) != 0) {
  //   goto _OVER;
  // }
  code = mndAcquireUser(pMnode, RPC_MSG_USER(pReq), &pUser);
  if (code != 0) {
    goto _OVER;
  }
  code = mndCheckObjPrivilegeRecF(pMnode, pUser, PRIV_CM_DROP, PRIV_OBJ_TSMA, pSma->ownerId, pSma->db, pSma->name);
  if (code) {
    goto _OVER;
  }

  if (hasRecursiveTsmasBasedOnMe(pMnode, pSma)) {
    code = TSDB_CODE_MND_INVALID_DROP_TSMA;
    goto _OVER;
  }

  SCreateTSMACxt cxt = {
      .pDb = pDb,
      .pMnode = pMnode,
      .pRpcReq = pReq,
      .pSma = pSma,
      .streamName = streamName,
      .targetStbFullName = streamTargetStbFullName,
      .pDropSmaReq = &dropReq,
  };

  code = mndDropTSMA(&cxt);
  if (code == 0) {
    code = TSDB_CODE_ACTION_IN_PROGRESS;
  }

_OVER:
  tFreeSMDropSmaReq(&dropReq);
  mndReleaseStb(pMnode, pStb);
  mndReleaseSma(pMnode, pSma);
  mndReleaseDb(pMnode, pDb);
  mndReleaseUser(pMnode, pUser);
  TAOS_RETURN(code);
}
1431

1432
/**
 * Fill pBlock with up to `rows` rows of the TSMA listing.
 *
 * Iteration state is kept in an SSmaAndTagIter stored in pShow->pIter, which
 * is allocated on first use and freed once fewer than `rows` rows were
 * produced. When pShow->db is set only TSMAs of that database are listed.
 * TSMAs the requesting user may not see (PRIV_CM_SHOW on PRIV_OBJ_TSMA) are
 * skipped unless the user holds the privilege on all objects.
 *
 * Columns written per row: tsma name, db name, source table, db name (again),
 * target table, stream name (same as the tsma name), create time, interval,
 * create sql and the list of TSMA-supported function aliases.
 *
 * @return number of rows written, or an error code when a step failed; the
 *         sdb iterator is cancelled before an error is returned from the loop.
 */
static int32_t mndRetrieveTSMA(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SDbObj          *pDb = NULL;
  int32_t          numOfRows = 0;
  SSmaObj         *pSma = NULL;
  SMnode          *pMnode = pReq->info.node;
  SSdb            *pSdb = pMnode->pSdb;
  int32_t          code = 0, lino = 0;
  SUserObj        *pUser = NULL;
  char             objFName[TSDB_OBJ_FNAME_LEN + 1] = {0};
  bool             showAll = false;
  SColumnInfoData *pColInfo;

  if (pShow->pIter == NULL) {
    pShow->pIter = taosMemoryCalloc(1, sizeof(SSmaAndTagIter));
  }
  if (!pShow->pIter) {
    return terrno;
  }
  if (pShow->db[0]) {
    pDb = mndAcquireDb(pMnode, pShow->db);
    if (!pDb) {
      taosMemoryFreeClear(pShow->pIter);
      return terrno;
    }
  }

  TAOS_CHECK_EXIT(mndAcquireUser(pMnode, RPC_MSG_USER(pReq), &pUser));
  int32_t objLevel = privObjGetLevel(PRIV_OBJ_TSMA);
  // First try the privilege on every object of the user's account ...
  (void)snprintf(objFName, sizeof(objFName), "%d.*", pUser->acctId);
  showAll = (0 == mndCheckSysObjPrivilege(pMnode, pUser, PRIV_CM_SHOW, PRIV_OBJ_TSMA, 0, objFName,
                                          objLevel == 0 ? NULL : "*"));
  // ... then, when a database filter is set, on every object of that database.
  if (!showAll && pShow->db[0] != 0) {
    showAll = (0 == mndCheckSysObjPrivilege(pMnode, pUser, PRIV_CM_SHOW, PRIV_OBJ_TSMA, pUser->uid, pShow->db,
                                            objLevel == 0 ? NULL : "*"));
  }

  SSmaAndTagIter *pIter = pShow->pIter;
  while (numOfRows < rows) {
    pIter->pSmaIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter->pSmaIter, (void **)&pSma);
    if (pIter->pSmaIter == NULL) break;
    SDbObj *pSrcDb = mndAcquireDb(pMnode, pSma->db);

    // Skip TSMAs of other databases and TSMAs whose database cannot be acquired.
    if ((pDb && pSma->dbUid != pDb->uid) || !pSrcDb) {
      sdbRelease(pMnode->pSdb, pSma);
      if (pSrcDb) mndReleaseDb(pMnode, pSrcDb);
      continue;
    }

    int32_t cols = 0;
    SName   n = {0};

    code = tNameFromString(&n, pSma->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);

    if (TSDB_CODE_SUCCESS != code) {
      sdbRelease(pSdb, pSma);
      if (pSrcDb) mndReleaseDb(pMnode, pSrcDb);
      // The iterator object is freed below on error, so the fetch has to be
      // cancelled here just like on the row-write failure path.
      sdbCancelFetch(pSdb, pIter->pSmaIter);
      pIter->pSmaIter = NULL;
      break;
    }

    if (!showAll) {
      (void)snprintf(objFName, sizeof(objFName), "%s", pSma->db);
      if (mndCheckObjPrivilegeRecF(pMnode, pUser, PRIV_CM_SHOW, PRIV_OBJ_TSMA, pSma->ownerId, objFName,
                                   objLevel == 0 ? NULL : n.tname)) {  // 1.db1.tsma1
        sdbRelease(pSdb, pSma);
        if (pSrcDb) mndReleaseDb(pMnode, pSrcDb);
        continue;
      }
    }

    // tsma name
    char smaName[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
    if (TSDB_CODE_SUCCESS == code) {
      STR_TO_VARSTR(smaName, (char *)tNameGetTableName(&n));
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)smaName, false);
    }

    // db name
    char db[TSDB_DB_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
    if (TSDB_CODE_SUCCESS == code) {
      STR_TO_VARSTR(db, (char *)mndGetDbStr(pSma->db));
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)db, false);
    }

    // source table name
    if (TSDB_CODE_SUCCESS == code) {
      code = tNameFromString(&n, pSma->stb, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
    }
    char srcTb[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
    if (TSDB_CODE_SUCCESS == code) {
      STR_TO_VARSTR(srcTb, (char *)tNameGetTableName(&n));
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)srcTb, false);
    }

    // db name is written a second time for the next column
    if (TSDB_CODE_SUCCESS == code) {
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)db, false);
    }

    // target table name
    if (TSDB_CODE_SUCCESS == code) {
      code = tNameFromString(&n, pSma->dstTbName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
    }

    if (TSDB_CODE_SUCCESS == code) {
      char targetTb[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
      STR_TO_VARSTR(targetTb, (char *)tNameGetTableName(&n));
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)targetTb, false);
    }

    if (TSDB_CODE_SUCCESS == code) {
      // stream name
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)smaName, false);
    }

    // create time
    if (TSDB_CODE_SUCCESS == code) {
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)(&pSma->createdTime), false);
    }

    // interval
    char    interval[64 + VARSTR_HEADER_SIZE] = {0};
    int32_t len = 0;
    if (TSDB_CODE_SUCCESS == code) {
      if (!IS_CALENDAR_TIME_DURATION(pSma->intervalUnit)) {
        len = tsnprintf(interval + VARSTR_HEADER_SIZE, 64, "%" PRId64 "%c", pSma->interval,
                        getPrecisionUnit(pSrcDb->cfg.precision));
      } else {
        len = tsnprintf(interval + VARSTR_HEADER_SIZE, 64, "%" PRId64 "%c", pSma->interval, pSma->intervalUnit);
      }
      varDataSetLen(interval, len);
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, interval, false);
    }

    char buf[TSDB_MAX_SAVED_SQL_LEN + VARSTR_HEADER_SIZE] = {0};
    if (TSDB_CODE_SUCCESS == code) {
      // create sql
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      len = tsnprintf(buf + VARSTR_HEADER_SIZE, TSDB_MAX_SAVED_SQL_LEN, "%s", pSma->sql);
      varDataSetLen(buf, TMIN(len, TSDB_MAX_SAVED_SQL_LEN));
      code = colDataSetVal(pColInfo, numOfRows, buf, false);
    }

    // func list: comma-separated aliases of the TSMA-supported functions,
    // built in the same buffer that held the create sql
    len = 0;
    SNode *pNode = NULL, *pFunc = NULL;
    if (TSDB_CODE_SUCCESS == code) {
      code = nodesStringToNode(pSma->ast, &pNode);
    }
    if (TSDB_CODE_SUCCESS == code) {
      char *start = buf + VARSTR_HEADER_SIZE;
      FOREACH(pFunc, ((SSelectStmt *)pNode)->pProjectionList) {
        if (nodeType(pFunc) == QUERY_NODE_FUNCTION) {
          SFunctionNode *pFuncNode = (SFunctionNode *)pFunc;
          if (!fmIsTSMASupportedFunc(pFuncNode->funcId)) continue;
          len += tsnprintf(start, TSDB_MAX_SAVED_SQL_LEN - len, "%s%s", start != buf + VARSTR_HEADER_SIZE ? "," : "",
                           ((SExprNode *)pFunc)->userAlias);
          if (len >= TSDB_MAX_SAVED_SQL_LEN) {
            len = TSDB_MAX_SAVED_SQL_LEN;
            break;
          }
          start = buf + VARSTR_HEADER_SIZE + len;
        }
      }
      nodesDestroyNode(pNode);
    }

    if (TSDB_CODE_SUCCESS == code) {
      varDataSetLen(buf, len);
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, buf, false);
    }

    mndReleaseSma(pMnode, pSma);
    mndReleaseDb(pMnode, pSrcDb);
    if (TSDB_CODE_SUCCESS != code) {
      sdbCancelFetch(pMnode->pSdb, pIter->pSmaIter);
      pIter->pSmaIter = NULL;
      numOfRows = code;
      break;
    }
    numOfRows++;
  }
_exit:
  if (code != 0) {
    numOfRows = code;
  }
  mndReleaseDb(pMnode, pDb);
  mndReleaseUser(pMnode, pUser);
  // An error code is not a row count; keep it out of the running total.
  if (numOfRows > 0) {
    pShow->numOfRows += numOfRows;
  }
  if (numOfRows < rows) {
    taosMemoryFree(pShow->pIter);
    pShow->pIter = NULL;
  }
  return numOfRows;
}
1629

1630
/**
 * Cancel an in-progress TSMA retrieve and dispose of its iterator state.
 *
 * @param pMnode mnode whose sdb owns the SMA iterator
 * @param pIter  SSmaAndTagIter allocated for the show; may be NULL
 */
static void mndCancelRetrieveTSMA(SMnode *pMnode, void *pIter) {
  SSmaAndTagIter *pSmaTagIter = (SSmaAndTagIter *)pIter;

  // Only a live iterator state carries an sdb cursor to cancel.
  if (NULL != pSmaTagIter) {
    sdbCancelFetchByType(pMnode->pSdb, pSmaTagIter->pSmaIter, SDB_SMA);
  }

  taosMemoryFree(pSmaTagIter);
}
×
1638

1639
/**
 * Populate a STableTSMAInfo from a TSMA object and its destination super table.
 *
 * Copies the scalar attributes, splits the full names of the tsma, its target
 * table and its source table, collects the supported functions found in the
 * projection list of the AST, duplicates the AST string and copies the tag /
 * column schemas of the destination super table.
 *
 * On failure pInfo may be partially filled; members that were destroyed here
 * are reset to NULL so the caller can release pInfo without a double free.
 *
 * @param pSma      tsma object to dump
 * @param pDestStb  destination super table of the tsma
 * @param pInfo     [out] zero-initialized info object to fill
 * @param pBaseTsma optional base tsma; when non-NULL its AST supplies the function list
 * @return 0 on success, otherwise an error code
 */
int32_t dumpTSMAInfoFromSmaObj(const SSmaObj *pSma, const SStbObj *pDestStb, STableTSMAInfo *pInfo,
                               const SSmaObj *pBaseTsma) {
  int32_t code = 0;
  pInfo->interval = pSma->interval;
  pInfo->unit = pSma->intervalUnit;
  pInfo->tsmaId = pSma->uid;
  pInfo->version = pSma->version;
  pInfo->destTbUid = pDestStb->uid;

  SName sName = {0};
  code = tNameFromString(&sName, pSma->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }
  tstrncpy(pInfo->name, sName.tname, TSDB_TABLE_NAME_LEN);
  tstrncpy(pInfo->targetDbFName, pSma->db, TSDB_DB_FNAME_LEN);

  code = tNameFromString(&sName, pSma->dstTbName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }
  tstrncpy(pInfo->targetTb, sName.tname, TSDB_TABLE_NAME_LEN);
  tstrncpy(pInfo->dbFName, pSma->db, TSDB_DB_FNAME_LEN);

  code = tNameFromString(&sName, pSma->stb, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }
  tstrncpy(pInfo->tb, sName.tname, TSDB_TABLE_NAME_LEN);

  pInfo->pFuncs = taosArrayInit(8, sizeof(STableTSMAFuncInfo));
  if (!pInfo->pFuncs) return TSDB_CODE_OUT_OF_MEMORY;

  SNode *pNode = NULL, *pFunc = NULL;
  if (TSDB_CODE_SUCCESS != nodesStringToNode(pBaseTsma ? pBaseTsma->ast : pSma->ast, &pNode)) {
    taosArrayDestroy(pInfo->pFuncs);
    pInfo->pFuncs = NULL;
    return TSDB_CODE_TSMA_INVALID_STAT;
  }
  if (pNode) {
    SSelectStmt *pSelect = (SSelectStmt *)pNode;
    FOREACH(pFunc, pSelect->pProjectionList) {
      // Only function nodes carry a funcId; never reinterpret other node kinds.
      if (nodeType(pFunc) != QUERY_NODE_FUNCTION) continue;
      SFunctionNode *pFuncNode = (SFunctionNode *)pFunc;
      if (!fmIsTSMASupportedFunc(pFuncNode->funcId)) continue;

      STableTSMAFuncInfo funcInfo = {0};
      funcInfo.funcId = pFuncNode->funcId;
      // NOTE(review): assumes every supported function has a column as its first parameter - confirm.
      funcInfo.colId = ((SColumnNode *)pFuncNode->pParameterList->pHead->pNode)->colId;
      if (!taosArrayPush(pInfo->pFuncs, &funcInfo)) {
        code = terrno;
        taosArrayDestroy(pInfo->pFuncs);
        pInfo->pFuncs = NULL;  // callers free pInfo on error; do not leave a dangling array
        nodesDestroyNode(pNode);
        return code;
      }
    }
    nodesDestroyNode(pNode);
  }

  pInfo->ast = taosStrdup(pSma->ast);
  if (!pInfo->ast) code = terrno;

  if (code == TSDB_CODE_SUCCESS && pDestStb->numOfTags > 0) {
    pInfo->pTags = taosArrayInit(pDestStb->numOfTags, sizeof(SSchema));
    if (!pInfo->pTags) {
      code = terrno;
    } else {
      for (int32_t i = 0; i < pDestStb->numOfTags; ++i) {
        if (NULL == taosArrayPush(pInfo->pTags, &pDestStb->pTags[i])) {
          code = terrno;
          break;
        }
      }
    }
  }

  if (code == TSDB_CODE_SUCCESS) {
    pInfo->pUsedCols = taosArrayInit(pDestStb->numOfColumns - 3, sizeof(SSchema));
    if (!pInfo->pUsedCols) {
      code = terrno;
    } else {
      // skip _wstart, _wend, _duration
      for (int32_t i = 1; i < pDestStb->numOfColumns - 2; ++i) {
        if (NULL == taosArrayPush(pInfo->pUsedCols, &pDestStb->pColumns[i])) {
          code = terrno;
          break;
        }
      }
    }
  }
  TAOS_RETURN(code);
}
1724

1725
// @note remember to mndReleaseSma(*ppOut)
UNCOV
1726
static int32_t mndGetDeepestBaseForTsma(SMnode *pMnode, SSmaObj *pSma, SSmaObj **ppOut) {
×
UNCOV
1727
  int32_t  code = 0;
×
UNCOV
1728
  SSmaObj *pRecursiveTsma = NULL;
×
UNCOV
1729
  if (pSma->baseSmaName[0]) {
×
UNCOV
1730
    pRecursiveTsma = mndAcquireSma(pMnode, pSma->baseSmaName);
×
UNCOV
1731
    if (!pRecursiveTsma) {
×
UNCOV
1732
      mError("base tsma: %s for tsma: %s not found", pSma->baseSmaName, pSma->name);
×
1733
      return TSDB_CODE_MND_SMA_NOT_EXIST;
×
1734
    }
UNCOV
1735
    while (pRecursiveTsma->baseSmaName[0]) {
×
UNCOV
1736
      SSmaObj *pTmpSma = pRecursiveTsma;
×
1737
      pRecursiveTsma = mndAcquireSma(pMnode, pTmpSma->baseSmaName);
×
1738
      if (!pRecursiveTsma) {
×
1739
        mError("base tsma: %s for tsma: %s not found", pTmpSma->baseSmaName, pTmpSma->name);
×
UNCOV
1740
        mndReleaseSma(pMnode, pTmpSma);
×
UNCOV
1741
        return TSDB_CODE_MND_SMA_NOT_EXIST;
×
1742
      }
1743
      mndReleaseSma(pMnode, pTmpSma);
×
1744
    }
1745
  }
1746
  *ppOut = pRecursiveTsma;
×
UNCOV
1747
  return code;
×
1748
}
1749

1750
/**
 * Look up one tsma by its full name and append its info to rsp->pTsmas.
 *
 * @param pMnode    mnode to query
 * @param tsmaFName full name of the tsma
 * @param rsp       response whose pTsmas array receives the new STableTSMAInfo pointer
 * @param exist     [out] set to true only when an entry was appended
 * @return 0 when an entry was appended or the destination stb is missing;
 *         otherwise an error code
 */
static int32_t mndGetTSMA(SMnode *pMnode, char *tsmaFName, STableTSMAInfoRsp *rsp, bool *exist) {
  int32_t  code = -1;
  SSmaObj *pBaseTsma = NULL;

  SSmaObj *pSma = sdbAcquire(pMnode->pSdb, SDB_SMA, tsmaFName);
  if (!pSma) {
    // NOTE(review): a missing tsma yields -1 here rather than 0 with *exist == false;
    // kept as-is for compatibility - confirm this is intended.
    TAOS_RETURN(code);
  }

  SStbObj *pDstStb = mndAcquireStb(pMnode, pSma->dstTbName);
  if (!pDstStb) {
    sdbRelease(pMnode->pSdb, pSma);
    return TSDB_CODE_SUCCESS;
  }

  STableTSMAInfo *pTsma = taosMemoryCalloc(1, sizeof(STableTSMAInfo));
  if (!pTsma) {
    code = terrno;
    sdbRelease(pMnode->pSdb, pSma);
    mndReleaseStb(pMnode, pDstStb);
    TAOS_RETURN(code);
  }

  code = mndGetDeepestBaseForTsma(pMnode, pSma, &pBaseTsma);
  if (code == 0) {
    code = dumpTSMAInfoFromSmaObj(pSma, pDstStb, pTsma, pBaseTsma);
  }
  mndReleaseStb(pMnode, pDstStb);
  sdbRelease(pMnode->pSdb, pSma);
  if (pBaseTsma) mndReleaseSma(pMnode, pBaseTsma);

  // Decide on the returned code, not on terrno, which may be stale or unset.
  if (code != 0) {
    tFreeAndClearTableTSMAInfo(pTsma);
    TAOS_RETURN(code);
  }

  if (NULL == taosArrayPush(rsp->pTsmas, &pTsma)) {
    code = terrno;
    tFreeAndClearTableTSMAInfo(pTsma);
    TAOS_RETURN(code);
  }

  *exist = true;
  TAOS_RETURN(code);
}
1791

1792
// Predicate used when scanning tsma objects: returns true when pSma must be
// skipped (filtered out); param is an opaque argument forwarded by the caller.
typedef bool (*tsmaFilter)(const SSmaObj *pSma, void *param);
1793

1794
/**
 * Scan every tsma in the sdb and append the info of those not rejected by
 * the filter to pRsp->pTsmas.
 *
 * A tsma whose destination stb or stream cannot be acquired is skipped and
 * the scan reports TSDB_CODE_NEED_RETRY once it has finished.
 *
 * @param pMnode   mnode to scan
 * @param pRsp     response whose pTsmas array receives STableTSMAInfo pointers
 * @param filtered predicate returning true for tsmas that must be skipped
 * @param param    opaque argument forwarded to the predicate
 * @param exist    [out] set to true when at least one entry was appended
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_NEED_RETRY, or an error code; on an
 *         error the sdb iteration is cancelled and all held objects released
 */
static int32_t mndGetSomeTsmas(SMnode *pMnode, STableTSMAInfoRsp *pRsp, tsmaFilter filtered, void *param, bool *exist) {
  int32_t         code = 0;
  SSdb           *pSdb = pMnode->pSdb;
  void           *pIter = NULL;
  SSmaObj        *pSma = NULL;
  SSmaObj        *pBaseTsma = NULL;
  SStbObj        *pStb = NULL;
  SStreamObj     *pStream = NULL;
  STableTSMAInfo *pTsma = NULL;
  bool            shouldRetry = false;

  while (1) {
    // Reset per-iteration state so the failure path never touches stale objects.
    pSma = NULL;
    pBaseTsma = NULL;
    pStb = NULL;
    pStream = NULL;
    pTsma = NULL;

    pIter = sdbFetch(pSdb, SDB_SMA, pIter, (void **)&pSma);
    if (pIter == NULL) break;

    if (filtered(pSma, param)) {
      sdbRelease(pSdb, pSma);
      continue;
    }

    pStb = mndAcquireStb(pMnode, pSma->dstTbName);
    if (!pStb) {
      sdbRelease(pSdb, pSma);
      shouldRetry = true;
      continue;
    }

    SName smaName = {0};
    char  streamName[TSDB_TABLE_FNAME_LEN] = {0};
    code = tNameFromString(&smaName, pSma->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
    if (TSDB_CODE_SUCCESS != code) {
      goto _fail;
    }
    snprintf(streamName, TSDB_TABLE_FNAME_LEN, "%d.%s.%s", smaName.acctId, smaName.dbname, smaName.tname);

    code = mndAcquireStream(pMnode, streamName, &pStream);
    if (!pStream) {
      shouldRetry = true;
      sdbRelease(pSdb, pSma);
      mndReleaseStb(pMnode, pStb);
      continue;
    }
    if (code != 0) {
      // NOTE(review): pStream is not released on this path, as in the original - confirm
      // whether mndAcquireStream can return an error together with a held stream.
      goto _fail;
    }

    int64_t streamId = pStream->pCreate->streamId;
    mndReleaseStream(pMnode, pStream);

    pTsma = taosMemoryCalloc(1, sizeof(STableTSMAInfo));
    if (!pTsma) {
      code = terrno;
      goto _fail;
    }

    pTsma->streamAddr = taosMemoryCalloc(1, sizeof(SStreamTaskAddr));
    if (!pTsma->streamAddr) {
      code = terrno;
      goto _fail;
    }
    code = msmGetTriggerTaskAddr(pMnode, streamId, pTsma->streamAddr);
    if (code != 0) {
      goto _fail;
    }
    pTsma->streamUid = streamId;

    code = mndGetDeepestBaseForTsma(pMnode, pSma, &pBaseTsma);
    if (code == 0) {
      code = dumpTSMAInfoFromSmaObj(pSma, pStb, pTsma, pBaseTsma);
    }
    mndReleaseStb(pMnode, pStb);
    pStb = NULL;
    sdbRelease(pSdb, pSma);
    pSma = NULL;
    if (pBaseTsma) {
      mndReleaseSma(pMnode, pBaseTsma);
      pBaseTsma = NULL;
    }
    // Decide on the returned code, not on terrno, which may be stale or unset.
    if (code != 0) {
      goto _fail;
    }

    if (NULL == taosArrayPush(pRsp->pTsmas, &pTsma)) {
      code = terrno;
      goto _fail;
    }
    *exist = true;
  }

  if (shouldRetry) {
    return TSDB_CODE_NEED_RETRY;
  }
  return TSDB_CODE_SUCCESS;

_fail:
  // Single exit for mid-scan failures: drop everything still held and end the iteration.
  if (pTsma) tFreeAndClearTableTSMAInfo(pTsma);
  if (pBaseTsma) mndReleaseSma(pMnode, pBaseTsma);
  if (pStb) mndReleaseStb(pMnode, pStb);
  if (pSma) sdbRelease(pSdb, pSma);
  sdbCancelFetch(pSdb, pIter);
  TAOS_RETURN(code);
}
1893

UNCOV
1894
static bool tsmaTbFilter(const SSmaObj *pSma, void *param) {
×
1895
  const char *tbFName = param;
×
1896
  return pSma->stb[0] != tbFName[0] || strcmp(pSma->stb, tbFName) != 0;
×
1897
}
1898

UNCOV
1899
/**
 * Collect the tsmas whose source table full name equals tbFName.
 *
 * @return the result of mndGetSomeTsmas run with tsmaTbFilter
 */
static int32_t mndGetTableTSMA(SMnode *pMnode, char *tbFName, STableTSMAInfoRsp *pRsp, bool *exist) {
  void   *pFilterArg = tbFName;
  int32_t code = mndGetSomeTsmas(pMnode, pRsp, tsmaTbFilter, pFilterArg, exist);
  return code;
}
1902

1903
static bool tsmaDbFilter(const SSmaObj *pSma, void *param) {
×
1904
  uint64_t *dbUid = param;
×
1905
  return pSma->dbUid != *dbUid;
×
1906
}
1907

1908
/**
 * Collect the tsmas that belong to the database identified by dbUid.
 *
 * dbFName is not consulted; matching is done on dbUid only.
 *
 * @return the result of mndGetSomeTsmas run with tsmaDbFilter
 */
int32_t mndGetDbTsmas(SMnode *pMnode, const char *dbFName, uint64_t dbUid, STableTSMAInfoRsp *pRsp, bool *exist) {
  void   *pFilterArg = &dbUid;
  int32_t code = mndGetSomeTsmas(pMnode, pRsp, tsmaDbFilter, pFilterArg, exist);
  return code;
}
1911

1912
/**
 * Handle a "get table tsma" request.
 *
 * Deserializes the request, gathers either the single tsma named in the
 * request or every tsma of the named table, and on success attaches the
 * serialized STableTSMAInfoRsp to pReq->info.
 *
 * @param pReq incoming rpc message; info.rsp / info.rspLen are set on success
 * @return 0 on success, TSDB_CODE_MND_SMA_NOT_EXIST when nothing matched,
 *         otherwise an error code
 */
static int32_t mndProcessGetTbTSMAReq(SRpcMsg *pReq) {
  STableTSMAInfoRsp rsp = {0};
  int32_t           code = -1;
  STableTSMAInfoReq tsmaReq = {0};
  bool              exist = false;
  SMnode           *pMnode = pReq->info.node;
  void             *pRsp = NULL;
  int32_t           contLen = 0;
  int32_t           len = 0;

  TAOS_CHECK_GOTO(tDeserializeTableTSMAInfoReq(pReq->pCont, pReq->contLen, &tsmaReq), NULL, _OVER);

  rsp.pTsmas = taosArrayInit(4, POINTER_BYTES);
  if (NULL == rsp.pTsmas) {
    code = terrno;
    goto _OVER;
  }

  if (tsmaReq.fetchingWithTsmaName) {
    code = mndGetTSMA(pMnode, tsmaReq.name, &rsp, &exist);
  } else {
    code = mndGetTableTSMA(pMnode, tsmaReq.name, &rsp, &exist);
    // A partially available table scan is still answered with what was found.
    if (TSDB_CODE_NEED_RETRY == code) {
      code = TSDB_CODE_SUCCESS;
    }
  }
  if (code) {
    goto _OVER;
  }

  if (!exist) {
    code = TSDB_CODE_MND_SMA_NOT_EXIST;
    goto _OVER;
  }

  contLen = tSerializeTableTSMAInfoRsp(NULL, 0, &rsp);
  if (contLen < 0) {
    // Never hand a negative size to the allocator.
    code = terrno;
    goto _OVER;
  }

  pRsp = rpcMallocCont(contLen);
  if (pRsp == NULL) {
    code = terrno;
    goto _OVER;
  }

  len = tSerializeTableTSMAInfoRsp(pRsp, contLen, &rsp);
  if (len < 0) {
    code = terrno;
    // The buffer was never attached to the message, so it must be freed here.
    rpcFreeCont(pRsp);
    pRsp = NULL;
    goto _OVER;
  }

  pReq->info.rsp = pRsp;
  pReq->info.rspLen = contLen;
  code = 0;

_OVER:
  tFreeTableTSMAInfoRsp(&rsp);
  TAOS_RETURN(code);
}
1965

1966
/**
 * Build a placeholder STableTSMAInfo for a tsma the client knows about but
 * that cannot be served: identifiers and names are copied from pTsmaVer,
 * pFuncs stays NULL and ast is an empty string.
 *
 * @param pTsmaVer client-side version record of the tsma
 * @param ppTsma   [out] newly allocated info; written only on success
 * @return TSDB_CODE_SUCCESS, or the allocation error code
 */
static int32_t mkNonExistTSMAInfo(const STSMAVersion *pTsmaVer, STableTSMAInfo **ppTsma) {
  // calloc leaves every member zeroed, including pFuncs.
  STableTSMAInfo *pInfo = taosMemoryCalloc(1, sizeof(STableTSMAInfo));
  if (!pInfo) {
    return terrno;
  }
  pInfo->tsmaId = pTsmaVer->tsmaId;
  tstrncpy(pInfo->dbFName, pTsmaVer->dbFName, TSDB_DB_FNAME_LEN);
  tstrncpy(pInfo->tb, pTsmaVer->tbName, TSDB_TABLE_NAME_LEN);
  tstrncpy(pInfo->name, pTsmaVer->name, TSDB_TABLE_NAME_LEN);
  pInfo->dbId = pTsmaVer->dbId;

  pInfo->ast = taosMemoryCalloc(1, 1);
  if (!pInfo->ast) {
    // Capture the error before the cleanup call so nothing can change the value returned.
    int32_t code = terrno;
    taosMemoryFree(pInfo);
    return code;
  }

  *ppTsma = pInfo;
  return TSDB_CODE_SUCCESS;
}
1985

1986
int32_t mndValidateTSMAInfo(SMnode *pMnode, STSMAVersion *pTsmaVersions, int32_t numOfTsmas, void **ppRsp,
×
1987
                            int32_t *pRspLen) {
UNCOV
1988
  int32_t         code = -1;
×
1989
  STSMAHbRsp      hbRsp = {0};
×
1990
  int32_t         rspLen = 0;
×
1991
  void           *pRsp = NULL;
×
UNCOV
1992
  char            tsmaFName[TSDB_TABLE_FNAME_LEN] = {0};
×
1993
  STableTSMAInfo *pTsmaInfo = NULL;
×
1994

1995
  hbRsp.pTsmas = taosArrayInit(numOfTsmas, POINTER_BYTES);
×
1996
  if (!hbRsp.pTsmas) {
×
1997
    code = terrno;
×
1998
    TAOS_RETURN(code);
×
1999
  }
2000

2001
  for (int32_t i = 0; i < numOfTsmas; ++i) {
×
2002
    STSMAVersion *pTsmaVer = &pTsmaVersions[i];
×
2003
    pTsmaVer->dbId = be64toh(pTsmaVer->dbId);
×
2004
    pTsmaVer->tsmaId = be64toh(pTsmaVer->tsmaId);
×
UNCOV
2005
    pTsmaVer->version = ntohl(pTsmaVer->version);
×
2006

UNCOV
2007
    snprintf(tsmaFName, sizeof(tsmaFName), "%s.%s", pTsmaVer->dbFName, pTsmaVer->name);
×
2008
    SSmaObj *pSma = mndAcquireSma(pMnode, tsmaFName);
×
2009
    if (!pSma) {
×
2010
      code = mkNonExistTSMAInfo(pTsmaVer, &pTsmaInfo);
×
2011
      if (code) goto _OVER;
×
UNCOV
2012
      if (NULL == taosArrayPush(hbRsp.pTsmas, &pTsmaInfo)) {
×
UNCOV
2013
        code = terrno;
×
2014
        tFreeAndClearTableTSMAInfo(pTsmaInfo);
×
2015
        goto _OVER;
×
2016
      }
2017
      continue;
×
2018
    }
2019

UNCOV
2020
    if (pSma->uid != pTsmaVer->tsmaId) {
×
2021
      mDebug("tsma: %s.%" PRIx64 " tsmaId mismatch with current %" PRIx64, tsmaFName, pTsmaVer->tsmaId, pSma->uid);
×
2022
      code = mkNonExistTSMAInfo(pTsmaVer, &pTsmaInfo);
×
2023
      mndReleaseSma(pMnode, pSma);
×
2024
      if (code) goto _OVER;
×
UNCOV
2025
      if (NULL == taosArrayPush(hbRsp.pTsmas, &pTsmaInfo)) {
×
2026
        code = terrno;
×
2027
        tFreeAndClearTableTSMAInfo(pTsmaInfo);
×
2028
        goto _OVER;
×
2029
      }
2030
      continue;
×
2031
    } else if (pSma->version == pTsmaVer->version) {
×
UNCOV
2032
      mndReleaseSma(pMnode, pSma);
×
UNCOV
2033
      continue;
×
2034
    }
2035

UNCOV
2036
    SStbObj *pDestStb = mndAcquireStb(pMnode, pSma->dstTbName);
×
UNCOV
2037
    if (!pDestStb) {
×
UNCOV
2038
      mInfo("tsma: %s.%" PRIx64 " dest stb: %s not found, maybe dropped", tsmaFName, pTsmaVer->tsmaId, pSma->dstTbName);
×
2039
      code = mkNonExistTSMAInfo(pTsmaVer, &pTsmaInfo);
×
UNCOV
2040
      mndReleaseSma(pMnode, pSma);
×
UNCOV
2041
      if (code) goto _OVER;
×
UNCOV
2042
      if (NULL == taosArrayPush(hbRsp.pTsmas, &pTsmaInfo)) {
×
UNCOV
2043
        code = terrno;
×
2044
        tFreeAndClearTableTSMAInfo(pTsmaInfo);
×
2045
        goto _OVER;
×
2046
      }
2047
      continue;
×
2048
    }
2049

2050
    // dump smaObj into rsp
UNCOV
2051
    STableTSMAInfo *pInfo = NULL;
×
2052
    pInfo = taosMemoryCalloc(1, sizeof(STableTSMAInfo));
×
UNCOV
2053
    if (!pInfo) {
×
UNCOV
2054
      code = terrno;
×
UNCOV
2055
      mndReleaseSma(pMnode, pSma);
×
UNCOV
2056
      mndReleaseStb(pMnode, pDestStb);
×
UNCOV
2057
      goto _OVER;
×
2058
    }
2059

UNCOV
2060
    SSmaObj *pBaseSma = NULL;
×
UNCOV
2061
    code = mndGetDeepestBaseForTsma(pMnode, pSma, &pBaseSma);
×
UNCOV
2062
    if (code == 0) code = dumpTSMAInfoFromSmaObj(pSma, pDestStb, pInfo, pBaseSma);
×
2063

UNCOV
2064
    mndReleaseStb(pMnode, pDestStb);
×
UNCOV
2065
    mndReleaseSma(pMnode, pSma);
×
UNCOV
2066
    if (pBaseSma) mndReleaseSma(pMnode, pBaseSma);
×
UNCOV
2067
    if (terrno) {
×
UNCOV
2068
      tFreeAndClearTableTSMAInfo(pInfo);
×
UNCOV
2069
      goto _OVER;
×
2070
    }
2071

UNCOV
2072
    if (NULL == taosArrayPush(hbRsp.pTsmas, pInfo)) {
×
UNCOV
2073
      code = terrno;
×
UNCOV
2074
      tFreeAndClearTableTSMAInfo(pInfo);
×
UNCOV
2075
      goto _OVER;
×
2076
    }
2077
  }
2078

UNCOV
2079
  rspLen = tSerializeTSMAHbRsp(NULL, 0, &hbRsp);
×
UNCOV
2080
  if (rspLen < 0) {
×
UNCOV
2081
    code = terrno;
×
UNCOV
2082
    goto _OVER;
×
2083
  }
2084

UNCOV
2085
  pRsp = taosMemoryMalloc(rspLen);
×
UNCOV
2086
  if (!pRsp) {
×
UNCOV
2087
    code = terrno;
×
UNCOV
2088
    rspLen = 0;
×
UNCOV
2089
    goto _OVER;
×
2090
  }
2091

UNCOV
2092
  rspLen = tSerializeTSMAHbRsp(pRsp, rspLen, &hbRsp);
×
UNCOV
2093
  if (rspLen < 0) {
×
UNCOV
2094
    code = terrno;
×
UNCOV
2095
    goto _OVER;
×
2096
  }
UNCOV
2097
  code = 0;
×
UNCOV
2098
_OVER:
×
UNCOV
2099
  tFreeTSMAHbRsp(&hbRsp);
×
UNCOV
2100
  *ppRsp = pRsp;
×
UNCOV
2101
  *pRspLen = rspLen;
×
UNCOV
2102
  TAOS_RETURN(code);
×
2103
}
2104

2105
/**
 * Add drop commit logs to pTrans for every tsma that belongs to pDb.
 *
 * @param pMnode mnode whose sdb is scanned
 * @param pTrans transaction that receives the commit logs
 * @param pDb    database being dropped
 * @return 0 on success, otherwise the error from mndSetDropSmaCommitLogs
 */
int32_t mndDropTSMAsByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
  int32_t code = 0;
  SSdb   *pSdb = pMnode->pSdb;
  void   *pIter = NULL;

  while (1) {
    SSmaObj *pSma = NULL;
    pIter = sdbFetch(pSdb, SDB_SMA, pIter, (void **)&pSma);
    if (pIter == NULL) break;

    if (pSma->dbUid == pDb->uid) {
      if ((code = mndSetDropSmaCommitLogs(pMnode, pTrans, pSma)) != 0) {
        sdbRelease(pSdb, pSma);
        // Cancel with the iterator handle, not with the (already released) row.
        sdbCancelFetch(pSdb, pIter);
        TAOS_RETURN(code);
      }
    }

    sdbRelease(pSdb, pSma);
  }

  TAOS_RETURN(code);
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc