• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3796

31 Mar 2025 10:39AM UTC coverage: 30.372% (-7.1%) from 37.443%
#3796

push

travis-ci

happyguoxy
test:add test cases

69287 of 309062 branches covered (22.42%)

Branch coverage included in aggregate %.

118044 of 307720 relevant lines covered (38.36%)

278592.15 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

1.37
/source/dnode/mnode/impl/src/mndSma.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mndSma.h"
18
#include "functionMgt.h"
19
#include "mndDb.h"
20
#include "mndDnode.h"
21
#include "mndIndex.h"
22
#include "mndIndexComm.h"
23
#include "mndInfoSchema.h"
24
#include "mndMnode.h"
25
#include "mndPrivilege.h"
26
#include "mndScheduler.h"
27
#include "mndShow.h"
28
#include "mndStb.h"
29
#include "mndStream.h"
30
#include "mndTrans.h"
31
#include "mndUser.h"
32
#include "mndVgroup.h"
33
#include "parser.h"
34
#include "tname.h"
35

36
#define TSDB_SMA_VER_NUMBER   1
37
#define TSDB_SMA_RESERVE_SIZE 64
38

39
static SSdbRaw *mndSmaActionEncode(SSmaObj *pSma);
40
static SSdbRow *mndSmaActionDecode(SSdbRaw *pRaw);
41
static int32_t  mndSmaActionInsert(SSdb *pSdb, SSmaObj *pSma);
42
static int32_t  mndSmaActionDelete(SSdb *pSdb, SSmaObj *pSpSmatb);
43
static int32_t  mndSmaActionUpdate(SSdb *pSdb, SSmaObj *pOld, SSmaObj *pNew);
44
static int32_t  mndProcessCreateSmaReq(SRpcMsg *pReq);
45
static int32_t  mndProcessDropSmaReq(SRpcMsg *pReq);
46
static int32_t  mndProcessGetSmaReq(SRpcMsg *pReq);
47
static int32_t  mndProcessGetTbSmaReq(SRpcMsg *pReq);
48
static int32_t  mndRetrieveSma(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
49
static void     mndDestroySmaObj(SSmaObj *pSmaObj);
50

51
static int32_t mndProcessCreateTSMAReq(SRpcMsg *pReq);
52
static int32_t mndProcessDropTSMAReq(SRpcMsg *pReq);
53

54
// sma and tag index comm func
55
static int32_t mndProcessDropIdxReq(SRpcMsg *pReq);
56
static int32_t mndRetrieveIdx(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
57
static void    mndCancelRetrieveIdx(SMnode *pMnode, void *pIter);
58

59
static int32_t mndRetrieveTSMA(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
60
static void    mndCancelRetrieveTSMA(SMnode *pMnode, void *pIter);
61
static int32_t mndProcessGetTbTSMAReq(SRpcMsg *pReq);
62

63
typedef struct SCreateTSMACxt {
64
  SMnode        *pMnode;
65
  const SRpcMsg *pRpcReq;
66
  union {
67
    const SMCreateSmaReq *pCreateSmaReq;
68
    const SMDropSmaReq   *pDropSmaReq;
69
  };
70
  SDbObj             *pDb;
71
  SStbObj            *pSrcStb;
72
  SSmaObj            *pSma;
73
  const SSmaObj      *pBaseSma;
74
  SCMCreateStreamReq *pCreateStreamReq;
75
  SMDropStreamReq    *pDropStreamReq;
76
  const char         *streamName;
77
  const char         *targetStbFullName;
78
  SNodeList          *pProjects;
79
} SCreateTSMACxt;
80

81
/*
 * Wires the SMA module into the mnode: registers the message handlers for
 * SMA / index / TSMA requests, the show retrieve and free-iterator handlers,
 * and finally the SDB_SMA table callbacks.
 *
 * Returns the result of sdbSetTable().
 */
int32_t mndInitSma(SMnode *pMnode) {
  SSdbTable smaTable = {0};
  smaTable.sdbType = SDB_SMA;
  smaTable.keyType = SDB_KEY_BINARY;
  smaTable.encodeFp = (SdbEncodeFp)mndSmaActionEncode;
  smaTable.decodeFp = (SdbDecodeFp)mndSmaActionDecode;
  smaTable.insertFp = (SdbInsertFp)mndSmaActionInsert;
  smaTable.updateFp = (SdbUpdateFp)mndSmaActionUpdate;
  smaTable.deleteFp = (SdbDeleteFp)mndSmaActionDelete;

  // TDMT_MND_CREATE_SMA deliberately has no handler here: mndProcessCreateSmaReq is compiled out.
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_SMA, mndProcessDropIdxReq);
  mndSetMsgHandle(pMnode, TDMT_VND_CREATE_SMA_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_DROP_SMA_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_GET_INDEX, mndProcessGetSmaReq);
  mndSetMsgHandle(pMnode, TDMT_MND_GET_TABLE_INDEX, mndProcessGetTbSmaReq);

  // "show indexes" support
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_INDEX, mndRetrieveIdx);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_INDEX, mndCancelRetrieveIdx);

  // TSMA requests; both TSMA "get" message types share one handler
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_TSMA, mndProcessCreateTSMAReq);
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_TSMA, mndProcessDropTSMAReq);
  mndSetMsgHandle(pMnode, TDMT_MND_GET_TABLE_TSMA, mndProcessGetTbTSMAReq);
  mndSetMsgHandle(pMnode, TDMT_MND_GET_TSMA, mndProcessGetTbTSMAReq);
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TSMAS, mndRetrieveTSMA);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TSMAS, mndCancelRetrieveTSMA);

  int32_t rc = sdbSetTable(pMnode->pSdb, smaTable);
  return rc;
}
111

112
/* Counterpart of mndInitSma(); the SMA module currently has nothing to tear down. */
void mndCleanupSma(SMnode *pMnode) {
  (void)pMnode;
}
9✔
113

114
/*
 * Serializes an SMA object into a freshly allocated SDB raw record.
 *
 * Layout, in write order: the four name buffers, the timestamps/uids, the unit
 * and timezone bytes, dstVgId, interval/offset/sliding, the four payload lengths,
 * version, then each non-empty payload (expr, tagsFilter, sql, ast), the base SMA
 * name and a reserved area of TSDB_SMA_RESERVE_SIZE bytes.
 *
 * Returns the raw record, or NULL with terrno left non-zero on failure.
 */
static SSdbRaw *mndSmaActionEncode(SSmaObj *pSma) {
  // NOTE(review): code/lino are not referenced directly below; presumably the
  // SDB_SET_* macros use them — confirm before removing.
  int32_t code = 0;
  int32_t lino = 0;

  // Pessimistic default: any early jump to _OVER reports a failure.
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  int32_t rawSize =
      sizeof(SSmaObj) + pSma->exprLen + pSma->tagsFilterLen + pSma->sqlLen + pSma->astLen + TSDB_SMA_RESERVE_SIZE;
  SSdbRaw *pRaw = sdbAllocRaw(SDB_SMA, TSDB_SMA_VER_NUMBER, rawSize);
  if (pRaw == NULL) {
    goto _OVER;
  }

  int32_t dataPos = 0;

  // fixed-size names
  SDB_SET_BINARY(pRaw, dataPos, pSma->name, TSDB_TABLE_FNAME_LEN, _OVER)
  SDB_SET_BINARY(pRaw, dataPos, pSma->stb, TSDB_TABLE_FNAME_LEN, _OVER)
  SDB_SET_BINARY(pRaw, dataPos, pSma->db, TSDB_DB_FNAME_LEN, _OVER)
  SDB_SET_BINARY(pRaw, dataPos, pSma->dstTbName, TSDB_TABLE_FNAME_LEN, _OVER)

  // scalar attributes
  SDB_SET_INT64(pRaw, dataPos, pSma->createdTime, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pSma->uid, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pSma->stbUid, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pSma->dbUid, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pSma->dstTbUid, _OVER)
  SDB_SET_INT8(pRaw, dataPos, pSma->intervalUnit, _OVER)
  SDB_SET_INT8(pRaw, dataPos, pSma->slidingUnit, _OVER)
  SDB_SET_INT8(pRaw, dataPos, pSma->timezone, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pSma->dstVgId, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pSma->interval, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pSma->offset, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pSma->sliding, _OVER)

  // payload lengths come before the payloads so the decoder can size its buffers
  SDB_SET_INT32(pRaw, dataPos, pSma->exprLen, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pSma->tagsFilterLen, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pSma->sqlLen, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pSma->astLen, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pSma->version, _OVER)

  // variable-length payloads; empty ones occupy no bytes
  if (pSma->exprLen > 0) {
    SDB_SET_BINARY(pRaw, dataPos, pSma->expr, pSma->exprLen, _OVER)
  }
  if (pSma->tagsFilterLen > 0) {
    SDB_SET_BINARY(pRaw, dataPos, pSma->tagsFilter, pSma->tagsFilterLen, _OVER)
  }
  if (pSma->sqlLen > 0) {
    SDB_SET_BINARY(pRaw, dataPos, pSma->sql, pSma->sqlLen, _OVER)
  }
  if (pSma->astLen > 0) {
    SDB_SET_BINARY(pRaw, dataPos, pSma->ast, pSma->astLen, _OVER)
  }
  SDB_SET_BINARY(pRaw, dataPos, pSma->baseSmaName, TSDB_TABLE_FNAME_LEN, _OVER)

  SDB_SET_RESERVE(pRaw, dataPos, TSDB_SMA_RESERVE_SIZE, _OVER)
  SDB_SET_DATALEN(pRaw, dataPos, _OVER)

  terrno = 0;

_OVER:
  if (terrno == 0) {
    mTrace("sma:%s, encode to raw:%p, row:%p", pSma->name, pRaw, pSma);
    return pRaw;
  }

  mError("sma:%s, failed to encode to raw:%p since %s", pSma->name, pRaw, terrstr());
  sdbFreeRaw(pRaw);
  return NULL;
}
176

177
/*
 * Rebuilds an SMA row from an SDB raw record written by mndSmaActionEncode().
 *
 * Only records whose soft version equals TSDB_SMA_VER_NUMBER are accepted. The
 * variable-length payloads (expr, tagsFilter, sql, ast) are allocated here when
 * their stored length is positive.
 *
 * Returns the new row, or NULL with terrno non-zero; on failure every buffer
 * allocated here, and the row itself, is released.
 */
static SSdbRow *mndSmaActionDecode(SSdbRaw *pRaw) {
  // NOTE(review): code/lino are not referenced directly below; presumably the
  // SDB_GET_* macros use them — confirm before removing.
  int32_t  code = 0;
  int32_t  lino = 0;
  SSdbRow *pRow = NULL;
  SSmaObj *pSma = NULL;
  int8_t   sver = 0;

  // Pessimistic default: any early jump to _OVER reports a failure.
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  if (sdbGetRawSoftVer(pRaw, &sver) != 0) {
    goto _OVER;
  }
  if (sver != TSDB_SMA_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto _OVER;
  }

  pRow = sdbAllocRow(sizeof(SSmaObj));
  if (pRow == NULL) {
    goto _OVER;
  }
  pSma = sdbGetRowObj(pRow);
  if (pSma == NULL) {
    goto _OVER;
  }

  int32_t dataPos = 0;

  // fixed-size names
  SDB_GET_BINARY(pRaw, dataPos, pSma->name, TSDB_TABLE_FNAME_LEN, _OVER)
  SDB_GET_BINARY(pRaw, dataPos, pSma->stb, TSDB_TABLE_FNAME_LEN, _OVER)
  SDB_GET_BINARY(pRaw, dataPos, pSma->db, TSDB_DB_FNAME_LEN, _OVER)
  SDB_GET_BINARY(pRaw, dataPos, pSma->dstTbName, TSDB_TABLE_FNAME_LEN, _OVER)

  // scalar attributes
  SDB_GET_INT64(pRaw, dataPos, &pSma->createdTime, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pSma->uid, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pSma->stbUid, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pSma->dbUid, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pSma->dstTbUid, _OVER)
  SDB_GET_INT8(pRaw, dataPos, &pSma->intervalUnit, _OVER)
  SDB_GET_INT8(pRaw, dataPos, &pSma->slidingUnit, _OVER)
  SDB_GET_INT8(pRaw, dataPos, &pSma->timezone, _OVER)
  SDB_GET_INT32(pRaw, dataPos, &pSma->dstVgId, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pSma->interval, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pSma->offset, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pSma->sliding, _OVER)

  // payload lengths, then version
  SDB_GET_INT32(pRaw, dataPos, &pSma->exprLen, _OVER)
  SDB_GET_INT32(pRaw, dataPos, &pSma->tagsFilterLen, _OVER)
  SDB_GET_INT32(pRaw, dataPos, &pSma->sqlLen, _OVER)
  SDB_GET_INT32(pRaw, dataPos, &pSma->astLen, _OVER)
  SDB_GET_INT32(pRaw, dataPos, &pSma->version, _OVER)

  // variable-length payloads: allocate, then read; empty ones were not written
  if (pSma->exprLen > 0) {
    pSma->expr = taosMemoryCalloc(pSma->exprLen, 1);
    if (pSma->expr == NULL) {
      goto _OVER;
    }
    SDB_GET_BINARY(pRaw, dataPos, pSma->expr, pSma->exprLen, _OVER)
  }

  if (pSma->tagsFilterLen > 0) {
    pSma->tagsFilter = taosMemoryCalloc(pSma->tagsFilterLen, 1);
    if (pSma->tagsFilter == NULL) {
      goto _OVER;
    }
    SDB_GET_BINARY(pRaw, dataPos, pSma->tagsFilter, pSma->tagsFilterLen, _OVER)
  }

  if (pSma->sqlLen > 0) {
    pSma->sql = taosMemoryCalloc(pSma->sqlLen, 1);
    if (pSma->sql == NULL) {
      goto _OVER;
    }
    SDB_GET_BINARY(pRaw, dataPos, pSma->sql, pSma->sqlLen, _OVER)
  }

  if (pSma->astLen > 0) {
    pSma->ast = taosMemoryCalloc(pSma->astLen, 1);
    if (pSma->ast == NULL) {
      goto _OVER;
    }
    SDB_GET_BINARY(pRaw, dataPos, pSma->ast, pSma->astLen, _OVER)
  }
  SDB_GET_BINARY(pRaw, dataPos, pSma->baseSmaName, TSDB_TABLE_FNAME_LEN, _OVER)

  SDB_GET_RESERVE(pRaw, dataPos, TSDB_SMA_RESERVE_SIZE, _OVER)

  terrno = 0;

_OVER:
  if (terrno == 0) {
    mTrace("sma:%s, decode from raw:%p, row:%p", pSma->name, pRaw, pSma);
    return pRow;
  }

  // Failure: release whatever was allocated so far, then the row itself.
  if (pSma != NULL) {
    mError("sma:%s, failed to decode from raw:%p since %s", pSma->name, pRaw, terrstr());
    taosMemoryFreeClear(pSma->expr);
    taosMemoryFreeClear(pSma->tagsFilter);
    taosMemoryFreeClear(pSma->sql);
    taosMemoryFreeClear(pSma->ast);
  }
  taosMemoryFreeClear(pRow);
  return NULL;
}
267

268
/* SDB insert callback for SMA rows: only traces the event. Always returns 0. */
static int32_t mndSmaActionInsert(SSdb *pSdb, SSmaObj *pSma) {
  mTrace("sma:%s, perform insert action, row:%p", pSma->name, pSma);

  const int32_t rc = 0;
  return rc;
}
272

273
/*
 * SDB delete callback for SMA rows: traces the event and releases the four
 * payload buffers (expr, tagsFilter, sql, ast) held by the row.
 * Always returns 0.
 */
static int32_t mndSmaActionDelete(SSdb *pSdb, SSmaObj *pSma) {
  mTrace("sma:%s, perform delete action, row:%p", pSma->name, pSma);

  // The four frees are independent of one another.
  taosMemoryFreeClear(pSma->expr);
  taosMemoryFreeClear(pSma->tagsFilter);
  taosMemoryFreeClear(pSma->ast);
  taosMemoryFreeClear(pSma->sql);

  const int32_t rc = 0;
  return rc;
}
281

282
/* SDB update callback for SMA rows: only traces the event, nothing is copied from pNew. Always returns 0. */
static int32_t mndSmaActionUpdate(SSdb *pSdb, SSmaObj *pOld, SSmaObj *pNew) {
  mTrace("sma:%s, perform update action, old row:%p new row:%p", pOld->name, pOld, pNew);

  const int32_t rc = 0;
  return rc;
}
286

287
/*
 * Looks up an SMA object by name via sdbAcquire().
 *
 * Returns the object, or NULL. When the lookup fails with
 * TSDB_CODE_SDB_OBJ_NOT_THERE, terrno is rewritten to TSDB_CODE_MND_SMA_NOT_EXIST;
 * any other terrno value is left untouched.
 */
SSmaObj *mndAcquireSma(SMnode *pMnode, char *smaName) {
  SSmaObj *pFound = sdbAcquire(pMnode->pSdb, SDB_SMA, smaName);
  if (pFound != NULL) {
    return pFound;
  }

  if (terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
    terrno = TSDB_CODE_MND_SMA_NOT_EXIST;
  }
  return NULL;
}
295

296
/* Hands an SMA object obtained from mndAcquireSma() back to the SDB via sdbRelease(). */
void mndReleaseSma(SMnode *pMnode, SSmaObj *pSma) {
  sdbRelease(pMnode->pSdb, pSma);
}
×
300

301
/* Thin forwarder to mndAcquireDb(); returns whatever that call returns. */
SDbObj *mndAcquireDbBySma(SMnode *pMnode, const char *db) {
  SDbObj *pDb = mndAcquireDb(pMnode, db);
  return pDb;
}
×
302

303
/*
 * Builds the vnode "create SMA" message for pSma, addressed to pVgroup.
 *
 * The returned heap buffer is an SMsgHead (contLen and vgId stored via htonl)
 * followed by the encoded SVCreateTSmaReq. The request borrows pSma's expr,
 * tagsFilter, dstTbName and schema pointers only while encoding.
 *
 * On success *pContLen receives the total length (head included) and the buffer
 * is returned for the caller to free. On failure NULL is returned and *pContLen
 * is left untouched; terrno is set here for the name-parse and allocation
 * failures (the encoder-failure paths do not set it in this function).
 */
static void *mndBuildVCreateSmaReq(SMnode *pMnode, SVgObj *pVgroup, SSmaObj *pSma, int32_t *pContLen) {
  SEncoder encoder = {0};
  int32_t  contLen = 0;
  SName    name = {0};
  int32_t  code = tNameFromString(&name, pSma->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
  if (TSDB_CODE_SUCCESS != code) {
    // Report the parse error through terrno, as mndBuildVDropSmaReq does; callers
    // of this function read terrno when they get NULL back.
    terrno = code;
    return NULL;
  }

  SVCreateTSmaReq req = {0};
  req.version = 0;
  req.intervalUnit = pSma->intervalUnit;
  req.slidingUnit = pSma->slidingUnit;
  //  req.timezoneInt = pSma->timezone;
  tstrncpy(req.indexName, (char *)tNameGetTableName(&name), TSDB_INDEX_NAME_LEN);
  req.exprLen = pSma->exprLen;
  req.tagsFilterLen = pSma->tagsFilterLen;
  req.indexUid = pSma->uid;
  req.tableUid = pSma->stbUid;
  req.dstVgId = pSma->dstVgId;
  req.dstTbUid = pSma->dstTbUid;
  req.interval = pSma->interval;
  req.offset = pSma->offset;
  req.sliding = pSma->sliding;
  req.expr = pSma->expr;
  req.tagsFilter = pSma->tagsFilter;
  req.schemaRow = pSma->schemaRow;
  req.schemaTag = pSma->schemaTag;
  req.dstTbName = pSma->dstTbName;

  // First pass: measure the encoded size so the buffer can be allocated exactly.
  int32_t ret = 0;
  tEncodeSize(tEncodeSVCreateTSmaReq, &req, contLen, ret);
  if (ret < 0) {
    return NULL;
  }
  contLen += sizeof(SMsgHead);

  SMsgHead *pHead = taosMemoryMalloc(contLen);
  if (pHead == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pHead->contLen = htonl(contLen);
  pHead->vgId = htonl(pVgroup->vgId);

  // Second pass: encode the request right behind the message head.
  void *pBuf = POINTER_SHIFT(pHead, sizeof(SMsgHead));
  tEncoderInit(&encoder, pBuf, contLen - sizeof(SMsgHead));
  if (tEncodeSVCreateTSmaReq(&encoder, &req) < 0) {
    taosMemoryFreeClear(pHead);
    tEncoderClear(&encoder);
    return NULL;
  }

  tEncoderClear(&encoder);

  *pContLen = contLen;
  return pHead;
}
363

364
/*
 * Builds the vnode "drop SMA" message for pSma, addressed to pVgroup.
 *
 * The returned heap buffer is an SMsgHead (contLen and vgId stored via htonl)
 * followed by the encoded SVDropTSmaReq, which carries the SMA uid and its
 * table-level name.
 *
 * On success *pContLen receives the total length and the buffer is returned;
 * on failure NULL is returned. terrno is set here for the name-parse and
 * allocation failures.
 */
static void *mndBuildVDropSmaReq(SMnode *pMnode, SVgObj *pVgroup, SSmaObj *pSma, int32_t *pContLen) {
  SName   smaName = {0};
  int32_t code = tNameFromString(&smaName, pSma->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    return NULL;
  }

  SVDropTSmaReq dropReq = {0};
  dropReq.indexUid = pSma->uid;
  tstrncpy(dropReq.indexName, (char *)tNameGetTableName(&smaName), TSDB_INDEX_NAME_LEN);

  // First pass: measure the encoded size.
  int32_t bodyLen = 0;
  int32_t sizeRet = 0;
  tEncodeSize(tEncodeSVDropTSmaReq, &dropReq, bodyLen, sizeRet);
  if (sizeRet < 0) {
    return NULL;
  }

  int32_t   totalLen = bodyLen + sizeof(SMsgHead);
  SMsgHead *pMsg = taosMemoryMalloc(totalLen);
  if (pMsg == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  pMsg->contLen = htonl(totalLen);
  pMsg->vgId = htonl(pVgroup->vgId);

  // Second pass: encode the request right behind the message head.
  SEncoder enc = {0};
  tEncoderInit(&enc, POINTER_SHIFT(pMsg, sizeof(SMsgHead)), totalLen - sizeof(SMsgHead));
  int32_t encRet = tEncodeSVDropTSmaReq(&enc, &dropReq);
  if (encRet < 0) {
    taosMemoryFreeClear(pMsg);
    tEncoderClear(&enc);
    return NULL;
  }
  tEncoderClear(&enc);

  *pContLen = totalLen;
  return pMsg;
}
409

410
/*
 * Encodes pSma and queues it on pTrans as a redo log marked SDB_STATUS_CREATING.
 *
 * Returns 0 on success. If encoding yields NULL the result is terrno when it is
 * non-zero, else TSDB_CODE_MND_RETURN_VALUE_NULL; otherwise the error code of
 * the failing append / status call is returned.
 */
static int32_t mndSetCreateSmaRedoLogs(SMnode *pMnode, STrans *pTrans, SSmaObj *pSma) {
  int32_t  code = 0;
  SSdbRaw *pRedoRaw = mndSmaActionEncode(pSma);
  if (pRedoRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  code = mndTransAppendRedolog(pTrans, pRedoRaw);
  if (code != 0) {
    // The raw was never attached to the transaction, so release it here instead of leaking it.
    // NOTE(review): assumes a failed append does not take ownership of the raw — confirm in mndTrans.c.
    sdbFreeRaw(pRedoRaw);
    TAOS_RETURN(code);
  }
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING));

  TAOS_RETURN(code);
}
423

424
/*
 * Encodes pSma and queues it on pTrans as an undo log marked SDB_STATUS_DROPPED.
 *
 * Returns 0 on success. If encoding yields NULL the result is terrno when it is
 * non-zero, else TSDB_CODE_MND_RETURN_VALUE_NULL; otherwise the error code of
 * the failing append / status call is returned.
 */
static int32_t mndSetCreateSmaUndoLogs(SMnode *pMnode, STrans *pTrans, SSmaObj *pSma) {
  int32_t  code = 0;
  SSdbRaw *pUndoRaw = mndSmaActionEncode(pSma);
  if (pUndoRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  code = mndTransAppendUndolog(pTrans, pUndoRaw);
  if (code != 0) {
    // The raw was never attached to the transaction, so release it here instead of leaking it.
    // NOTE(review): assumes a failed append does not take ownership of the raw — confirm in mndTrans.c.
    sdbFreeRaw(pUndoRaw);
    TAOS_RETURN(code);
  }
  TAOS_CHECK_RETURN(sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED));

  TAOS_RETURN(code);
}
436

437
/*
 * Encodes pSma and queues it on pTrans as a commit log marked SDB_STATUS_READY.
 *
 * Returns 0 on success. If encoding yields NULL the result is terrno when it is
 * non-zero, else TSDB_CODE_MND_RETURN_VALUE_NULL; otherwise the error code of
 * the failing append / status call is returned.
 */
static int32_t mndSetCreateSmaCommitLogs(SMnode *pMnode, STrans *pTrans, SSmaObj *pSma) {
  int32_t  code = 0;
  SSdbRaw *pCommitRaw = mndSmaActionEncode(pSma);
  if (pCommitRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  code = mndTransAppendCommitlog(pTrans, pCommitRaw);
  if (code != 0) {
    // The raw was never attached to the transaction, so release it here instead of leaking it.
    // NOTE(review): assumes a failed append does not take ownership of the raw — confirm in mndTrans.c.
    sdbFreeRaw(pCommitRaw);
    TAOS_RETURN(code);
  }
  TAOS_CHECK_RETURN(sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY));

  TAOS_RETURN(code);
}
450

451
/*
 * Encodes pVgroup and queues it on pTrans as a redo log marked SDB_STATUS_UPDATE.
 *
 * Returns 0 on success. If encoding yields NULL the result is terrno when it is
 * non-zero, else TSDB_CODE_MND_RETURN_VALUE_NULL; otherwise the error code of
 * the failing append / status call is returned.
 */
static int32_t mndSetCreateSmaVgroupRedoLogs(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup) {
  int32_t  code = 0;
  SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroup);
  if (pVgRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  code = mndTransAppendRedolog(pTrans, pVgRaw);
  if (code != 0) {
    // The raw was never attached to the transaction, so release it here instead of leaking it.
    // NOTE(review): assumes a failed append does not take ownership of the raw — confirm in mndTrans.c.
    sdbFreeRaw(pVgRaw);
    TAOS_RETURN(code);
  }
  TAOS_CHECK_RETURN(sdbSetRawStatus(pVgRaw, SDB_STATUS_UPDATE));

  TAOS_RETURN(code);
}
463

464
/*
 * Encodes pVgroup and queues it on pTrans as a commit log marked SDB_STATUS_READY.
 *
 * Returns 0 on success. If encoding yields NULL the result is terrno when it is
 * non-zero, else TSDB_CODE_MND_RETURN_VALUE_NULL; otherwise the error code of
 * the failing append / status call is returned.
 */
static int32_t mndSetCreateSmaVgroupCommitLogs(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup) {
  int32_t  code = 0;
  SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroup);
  if (pVgRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  code = mndTransAppendCommitlog(pTrans, pVgRaw);
  if (code != 0) {
    // The raw was never attached to the transaction, so release it here instead of leaking it.
    // NOTE(review): assumes a failed append does not take ownership of the raw — confirm in mndTrans.c.
    sdbFreeRaw(pVgRaw);
    TAOS_RETURN(code);
  }
  TAOS_CHECK_RETURN(sdbSetRawStatus(pVgRaw, SDB_STATUS_READY));

  TAOS_RETURN(code);
}
476

477
/*
 * Queues a commit log on pTrans that bumps the super table's smaVer.
 *
 * A shallow copy of *pStb is taken under its read latch. The copy's column, tag
 * and function arrays are detached (count 0, pointer NULL), its latch is reset,
 * updateTime is refreshed and smaVer is incremented before it is encoded.
 * *pStb itself is not modified.
 *
 * Returns 0 on success. If encoding yields NULL the result is terrno when it is
 * non-zero, else TSDB_CODE_MND_RETURN_VALUE_NULL; otherwise the error code of
 * the failing append / status call is returned.
 */
static int32_t mndSetUpdateSmaStbCommitLogs(SMnode *pMnode, STrans *pTrans, SStbObj *pStb) {
  int32_t code = 0;
  SStbObj stbObj = {0};
  taosRLockLatch(&pStb->lock);
  memcpy(&stbObj, pStb, sizeof(SStbObj));
  taosRUnLockLatch(&pStb->lock);
  stbObj.numOfColumns = 0;
  stbObj.pColumns = NULL;
  stbObj.numOfTags = 0;
  stbObj.pTags = NULL;
  stbObj.numOfFuncs = 0;
  stbObj.pFuncs = NULL;
  stbObj.updateTime = taosGetTimestampMs();
  stbObj.lock = 0;  // the copied latch state belongs to *pStb, not to this local copy
  stbObj.smaVer++;

  SSdbRaw *pCommitRaw = mndStbActionEncode(&stbObj);
  if (pCommitRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  code = mndTransAppendCommitlog(pTrans, pCommitRaw);
  if (code != 0) {
    // The raw was never attached to the transaction, so release it here instead of leaking it.
    // NOTE(review): assumes a failed append does not take ownership of the raw — confirm in mndTrans.c.
    sdbFreeRaw(pCommitRaw);
    TAOS_RETURN(code);
  }
  TAOS_CHECK_RETURN(sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY));

  TAOS_RETURN(code);
}
504

505
static int32_t mndSetCreateSmaVgroupRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
×
506
                                                SSmaObj *pSma) {
507
  int32_t    code = 0;
×
508
  SVnodeGid *pVgid = pVgroup->vnodeGid + 0;
×
509
  SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
×
510
  if (pDnode == NULL) {
×
511
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
512
    if (terrno != 0) code = terrno;
×
513
    TAOS_RETURN(code);
×
514
  }
515

516
  STransAction action = {0};
×
517
  action.epSet = mndGetDnodeEpset(pDnode);
×
518
  mndReleaseDnode(pMnode, pDnode);
×
519

520
  // todo add sma info here
521
  SNode *pAst = NULL;
×
522
  TAOS_CHECK_RETURN(nodesStringToNode(pSma->ast, &pAst));
×
523
  if ((code = qExtractResultSchema(pAst, &pSma->schemaRow.nCols, &pSma->schemaRow.pSchema)) != 0) {
×
524
    nodesDestroyNode(pAst);
×
525
    TAOS_RETURN(code);
×
526
  }
527
  nodesDestroyNode(pAst);
×
528
  pSma->schemaRow.version = 1;
×
529

530
  // TODO: the schemaTag generated by qExtractResultXXX later.
531
  pSma->schemaTag.nCols = 1;
×
532
  pSma->schemaTag.version = 1;
×
533
  pSma->schemaTag.pSchema = taosMemoryCalloc(1, sizeof(SSchema));
×
534
  if (!pSma->schemaTag.pSchema) {
×
535
    TAOS_RETURN(-1);
×
536
  }
537
  pSma->schemaTag.pSchema[0].type = TSDB_DATA_TYPE_BIGINT;
×
538
  pSma->schemaTag.pSchema[0].bytes = TYPE_BYTES[TSDB_DATA_TYPE_BIGINT];
×
539
  pSma->schemaTag.pSchema[0].colId = pSma->schemaRow.nCols + PRIMARYKEY_TIMESTAMP_COL_ID;
×
540
  pSma->schemaTag.pSchema[0].flags = 0;
×
541
  snprintf(pSma->schemaTag.pSchema[0].name, TSDB_COL_NAME_LEN, "groupId");
×
542

543
  int32_t smaContLen = 0;
×
544
  void   *pSmaReq = mndBuildVCreateSmaReq(pMnode, pVgroup, pSma, &smaContLen);
×
545
  if (pSmaReq == NULL) {
×
546
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
547
    if (terrno != 0) code = terrno;
×
548
    TAOS_RETURN(code);
×
549
  }
550
  pVgroup->pTsma = pSmaReq;
×
551

552
  int32_t contLen = 0;
×
553
  void   *pReq = mndBuildCreateVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen);
×
554
  if (pReq == NULL) {
×
555
    taosMemoryFreeClear(pSmaReq);
×
556
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
557
    if (terrno != 0) code = terrno;
×
558
    TAOS_RETURN(code);
×
559
  }
560

561
  action.mTraceId = pTrans->mTraceId;
×
562
  action.pCont = pReq;
×
563
  action.contLen = contLen;
×
564
  action.msgType = TDMT_DND_CREATE_VNODE;
×
565
  action.acceptableCode = TSDB_CODE_VND_ALREADY_EXIST;
×
566

567
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
×
568
    taosMemoryFreeClear(pSmaReq);
×
569
    taosMemoryFree(pReq);
×
570
    TAOS_RETURN(code);
×
571
  }
572

573
  action.pCont = pSmaReq;
×
574
  action.contLen = smaContLen;
×
575
  action.msgType = TDMT_VND_CREATE_SMA;
×
576
  action.acceptableCode = TSDB_CODE_TSMA_ALREADY_EXIST;
×
577

578
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
×
579
    taosMemoryFreeClear(pSmaReq);
×
580
    TAOS_RETURN(code);
×
581
  }
582

583
  TAOS_RETURN(code);
×
584
}
585

586
/*
 * Frees the row and tag schema arrays attached to an SMA object and clears the
 * pointers. The object itself is not freed. A NULL argument is a no-op.
 */
static void mndDestroySmaObj(SSmaObj *pSmaObj) {
  if (pSmaObj == NULL) {
    return;
  }

  taosMemoryFreeClear(pSmaObj->schemaRow.pSchema);
  taosMemoryFreeClear(pSmaObj->schemaTag.pSchema);
}
×
592

593
#if 0
594
static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCreate, SDbObj *pDb, SStbObj *pStb,
595
                            const char *streamName) {
596
  int32_t code = 0;
597
  if (pDb->cfg.replications > 1) {
598
    code = TSDB_CODE_MND_INVALID_SMA_OPTION;
599
    mError("sma:%s, failed to create since not support multiple replicas", pCreate->name);
600
    TAOS_RETURN(code);
601
  }
602
  SSmaObj smaObj = {0};
603
  memcpy(smaObj.name, pCreate->name, TSDB_TABLE_FNAME_LEN);
604
  memcpy(smaObj.stb, pStb->name, TSDB_TABLE_FNAME_LEN);
605
  memcpy(smaObj.db, pDb->name, TSDB_DB_FNAME_LEN);
606
  smaObj.createdTime = taosGetTimestampMs();
607
  smaObj.uid = mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN);
608

609
  char resultTbName[TSDB_TABLE_FNAME_LEN + 16] = {0};
610
  snprintf(resultTbName, TSDB_TABLE_FNAME_LEN + 16, "%s_td_tsma_rst_tb", pCreate->name);
611
  memcpy(smaObj.dstTbName, resultTbName, TSDB_TABLE_FNAME_LEN);
612
  smaObj.dstTbUid = mndGenerateUid(smaObj.dstTbName, TSDB_TABLE_FNAME_LEN);
613
  smaObj.stbUid = pStb->uid;
614
  smaObj.dbUid = pStb->dbUid;
615
  smaObj.intervalUnit = pCreate->intervalUnit;
616
  smaObj.slidingUnit = pCreate->slidingUnit;
617
#if 0
618
//  smaObj.timezone = pCreate->timezone;
619
#endif
620
  //  smaObj.timezone = taosGetLocalTimezoneOffset();  // use timezone of server
621
  smaObj.interval = pCreate->interval;
622
  smaObj.offset = pCreate->offset;
623
  smaObj.sliding = pCreate->sliding;
624
  smaObj.exprLen = pCreate->exprLen;
625
  smaObj.tagsFilterLen = pCreate->tagsFilterLen;
626
  smaObj.sqlLen = pCreate->sqlLen;
627
  smaObj.astLen = pCreate->astLen;
628
  if (smaObj.exprLen > 0) {
629
    smaObj.expr = pCreate->expr;
630
  }
631
  if (smaObj.tagsFilterLen > 0) {
632
    smaObj.tagsFilter = pCreate->tagsFilter;
633
  }
634
  if (smaObj.sqlLen > 0) {
635
    smaObj.sql = pCreate->sql;
636
  }
637
  if (smaObj.astLen > 0) {
638
    smaObj.ast = pCreate->ast;
639
  }
640

641
  SStreamObj streamObj = {0};
642
  tstrncpy(streamObj.name, streamName, TSDB_STREAM_FNAME_LEN);
643
  tstrncpy(streamObj.sourceDb, pDb->name, TSDB_DB_FNAME_LEN);
644
  tstrncpy(streamObj.targetDb, streamObj.sourceDb, TSDB_DB_FNAME_LEN);
645
  streamObj.createTime = taosGetTimestampMs();
646
  streamObj.updateTime = streamObj.createTime;
647
  streamObj.uid = mndGenerateUid(streamName, strlen(streamName));
648
  streamObj.sourceDbUid = pDb->uid;
649
  streamObj.targetDbUid = pDb->uid;
650
  streamObj.version = 1;
651
  streamObj.sql = taosStrdup(pCreate->sql);
652
  if (!streamObj.sql) {
653
    return terrno;
654
  }
655
  streamObj.smaId = smaObj.uid;
656
  streamObj.conf.watermark = pCreate->watermark;
657
  streamObj.deleteMark = pCreate->deleteMark;
658
  streamObj.conf.fillHistory = STREAM_FILL_HISTORY_ON;
659
  streamObj.conf.trigger = STREAM_TRIGGER_WINDOW_CLOSE;
660
  streamObj.conf.triggerParam = pCreate->maxDelay;
661
  streamObj.ast = taosStrdup(smaObj.ast);
662
  if (!streamObj.ast) {
663
    taosMemoryFree(streamObj.sql);
664
    return terrno;
665
  }
666
  streamObj.indexForMultiAggBalance = -1;
667

668
  // check the maxDelay
669
  if (streamObj.conf.triggerParam < TSDB_MIN_ROLLUP_MAX_DELAY) {
670
    int64_t msInterval = -1;
671
    int32_t code =
672
        convertTimeFromPrecisionToUnit(pCreate->interval, pDb->cfg.precision, TIME_UNIT_MILLISECOND, &msInterval);
673
    if (TSDB_CODE_SUCCESS != code) {
674
      mError("sma:%s, failed to create since convert time failed: %s", smaObj.name, tstrerror(code));
675
      return code;
676
    }
677
    streamObj.conf.triggerParam = msInterval > TSDB_MIN_ROLLUP_MAX_DELAY ? msInterval : TSDB_MIN_ROLLUP_MAX_DELAY;
678
  }
679
  if (streamObj.conf.triggerParam > TSDB_MAX_ROLLUP_MAX_DELAY) {
680
    streamObj.conf.triggerParam = TSDB_MAX_ROLLUP_MAX_DELAY;
681
  }
682

683
  if ((code = mndAllocSmaVgroup(pMnode, pDb, &streamObj.fixedSinkVg)) != 0) {
684
    mError("sma:%s, failed to create since %s", smaObj.name, tstrerror(code));
685
    TAOS_RETURN(code);
686
  }
687
  smaObj.dstVgId = streamObj.fixedSinkVg.vgId;
688
  streamObj.fixedSinkVgId = smaObj.dstVgId;
689

690
  SNode *pAst = NULL;
691
  if (nodesStringToNode(streamObj.ast, &pAst) < 0) {
692
    code = TSDB_CODE_MND_INVALID_SMA_OPTION;
693
    mError("sma:%s, failed to create since parse ast error", smaObj.name);
694
    TAOS_RETURN(code);
695
  }
696

697
  // extract output schema from ast
698
  if (qExtractResultSchema(pAst, (int32_t *)&streamObj.outputSchema.nCols, &streamObj.outputSchema.pSchema) != 0) {
699
    code = TSDB_CODE_MND_INVALID_SMA_OPTION;
700
    mError("sma:%s, failed to create since extract result schema error", smaObj.name);
701
    TAOS_RETURN(code);
702
  }
703

704
  SQueryPlan  *pPlan = NULL;
705
  SPlanContext cxt = {
706
      .pAstRoot = pAst,
707
      .topicQuery = false,
708
      .streamQuery = true,
709
      .triggerType = streamObj.conf.trigger,
710
      .watermark = streamObj.conf.watermark,
711
      .deleteMark = streamObj.deleteMark,
712
  };
713

714
  if (qCreateQueryPlan(&cxt, &pPlan, NULL) < 0) {
715
    code = TSDB_CODE_MND_INVALID_SMA_OPTION;
716
    mError("sma:%s, failed to create since create query plan error", smaObj.name);
717
    TAOS_RETURN(code);
718
  }
719

720
  // save physcial plan
721
  if (nodesNodeToString((SNode *)pPlan, false, &streamObj.physicalPlan, NULL) != 0) {
722
    code = TSDB_CODE_MND_INVALID_SMA_OPTION;
723
    mError("sma:%s, failed to create since save physcial plan error", smaObj.name);
724
    TAOS_RETURN(code);
725
  }
726

727
  if (pAst != NULL) nodesDestroyNode(pAst);
728
  nodesDestroyNode((SNode *)pPlan);
729

730
  code = -1;
731
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq, "create-sma");
732
  if (pTrans == NULL) {
733
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
734
    if (terrno != 0) code = terrno;
735
    goto _OVER;
736
  }
737
  mndTransSetDbName(pTrans, pDb->name, NULL);
738
  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER);
739

740
  mndTransSetSerial(pTrans);
741
  mInfo("trans:%d, used to create sma:%s stream:%s", pTrans->id, pCreate->name, streamObj.name);
742
  TAOS_CHECK_GOTO(mndAddNewVgPrepareAction(pMnode, pTrans, &streamObj.fixedSinkVg), NULL, _OVER);
743
  TAOS_CHECK_GOTO(mndSetCreateSmaRedoLogs(pMnode, pTrans, &smaObj), NULL, _OVER);
744
  TAOS_CHECK_GOTO(mndSetCreateSmaVgroupRedoLogs(pMnode, pTrans, &streamObj.fixedSinkVg), NULL, _OVER);
745
  TAOS_CHECK_GOTO(mndSetCreateSmaCommitLogs(pMnode, pTrans, &smaObj), NULL, _OVER);
746
  TAOS_CHECK_GOTO(mndSetCreateSmaVgroupCommitLogs(pMnode, pTrans, &streamObj.fixedSinkVg), NULL, _OVER);
747
  TAOS_CHECK_GOTO(mndSetUpdateSmaStbCommitLogs(pMnode, pTrans, pStb), NULL, _OVER);
748
  TAOS_CHECK_GOTO(mndSetCreateSmaVgroupRedoActions(pMnode, pTrans, pDb, &streamObj.fixedSinkVg, &smaObj), NULL, _OVER);
749
  TAOS_CHECK_GOTO(mndScheduleStream(pMnode, &streamObj, 1685959190000, NULL), NULL, _OVER);
750
  TAOS_CHECK_GOTO(mndPersistStream(pTrans, &streamObj), NULL, _OVER);
751
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
752

753
  mInfo("sma:%s, uid:%" PRIi64 " create on stb:%" PRIi64 ", dstSuid:%" PRIi64 " dstTb:%s dstVg:%d", pCreate->name,
754
        smaObj.uid, smaObj.stbUid, smaObj.dstTbUid, smaObj.dstTbName, smaObj.dstVgId);
755

756
  code = 0;
757

758
_OVER:
759
  tFreeStreamObj(&streamObj);
760
  mndDestroySmaObj(&smaObj);
761
  mndTransDrop(pTrans);
762
  TAOS_RETURN(code);
763
}
764
#endif
765

766
/*
 * Validates a create-SMA request.
 *
 * Checks, in order: the SMA and super table names are non-empty; igExists is
 * 0 or 1; the unit, timezone, interval, offset, sliding and length fields are
 * non-negative; every non-zero payload length matches strlen(payload) + 1;
 * and the SMA name parses as acct.db.table with a non-empty table part.
 *
 * Returns 0 when the request is acceptable, otherwise
 * TSDB_CODE_MND_INVALID_SMA_OPTION (also stored in terrno by TAOS_RETURN).
 */
static int32_t mndCheckCreateSmaReq(SMCreateSmaReq *pCreate) {
  int32_t code = TSDB_CODE_MND_INVALID_SMA_OPTION;
  if (pCreate->name[0] == 0) TAOS_RETURN(code);
  if (pCreate->stb[0] == 0) TAOS_RETURN(code);
  if (pCreate->igExists < 0 || pCreate->igExists > 1) TAOS_RETURN(code);
  if (pCreate->intervalUnit < 0) TAOS_RETURN(code);
  if (pCreate->slidingUnit < 0) TAOS_RETURN(code);
  if (pCreate->timezone < 0) TAOS_RETURN(code);
  if (pCreate->interval < 0) TAOS_RETURN(code);
  if (pCreate->offset < 0) TAOS_RETURN(code);
  if (pCreate->sliding < 0) TAOS_RETURN(code);
  if (pCreate->exprLen < 0) TAOS_RETURN(code);
  if (pCreate->tagsFilterLen < 0) TAOS_RETURN(code);
  if (pCreate->sqlLen < 0) TAOS_RETURN(code);
  if (pCreate->astLen < 0) TAOS_RETURN(code);

  // A non-zero length with a missing payload is rejected instead of being passed to strlen().
  if (pCreate->exprLen != 0 && (pCreate->expr == NULL || strlen(pCreate->expr) + 1 != (size_t)pCreate->exprLen)) {
    TAOS_RETURN(code);
  }
  if (pCreate->tagsFilterLen != 0 &&
      (pCreate->tagsFilter == NULL || strlen(pCreate->tagsFilter) + 1 != (size_t)pCreate->tagsFilterLen)) {
    TAOS_RETURN(code);
  }
  if (pCreate->sqlLen != 0 && (pCreate->sql == NULL || strlen(pCreate->sql) + 1 != (size_t)pCreate->sqlLen)) {
    TAOS_RETURN(code);
  }
  if (pCreate->astLen != 0 && (pCreate->ast == NULL || strlen(pCreate->ast) + 1 != (size_t)pCreate->astLen)) {
    TAOS_RETURN(code);
  }

  // Name problems are reported with the same error code as every other check, not a bare -1.
  SName smaName = {0};
  if (tNameFromString(&smaName, pCreate->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE) < 0) TAOS_RETURN(code);
  if (*(char *)tNameGetTableName(&smaName) == 0) TAOS_RETURN(code);

  code = 0;
  TAOS_RETURN(code);
}
793

794
static int32_t mndGetStreamNameFromSmaName(char *streamName, char *smaName) {
×
795
  SName   n;
796
  int32_t code = tNameFromString(&n, smaName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
×
797
  if (TSDB_CODE_SUCCESS != code) {
×
798
    return code;
×
799
  }
800
  snprintf(streamName, TSDB_TABLE_FNAME_LEN, "%d.%s", n.acctId, n.tname);
×
801
  return TSDB_CODE_SUCCESS;
×
802
}
803

804
#if 0
805
/**
 * Handle a create-SMA request (this definition sits inside an `#if 0` region
 * and is currently compiled out).
 *
 * Steps: deserialize and validate the request, acquire the source super table,
 * make sure neither the backing stream nor the SMA already exists, resolve and
 * acquire the database, check write privilege, then call mndCreateSma.
 *
 * @param pReq  incoming RPC message
 * @return TSDB_CODE_ACTION_IN_PROGRESS when the create was started, 0 when the
 *         SMA already exists and igExists is set, otherwise an error code
 */
static int32_t mndProcessCreateSmaReq(SRpcMsg *pReq) {
  SMnode        *pMnode = pReq->info.node;
  int32_t        code = -1;
  SStbObj       *pStb = NULL;
  SSmaObj       *pSma = NULL;
  SStreamObj    *pStream = NULL;
  SDbObj        *pDb = NULL;
  SMCreateSmaReq createReq = {0};

  int64_t mTraceId = TRACE_GET_ROOTID(&pReq->info.traceId);

  TAOS_CHECK_GOTO(tDeserializeSMCreateSmaReq(pReq->pCont, pReq->contLen, &createReq), NULL, _OVER);

#ifdef WINDOWS
  // code is 0 at this point (deserialization succeeded); it must carry the
  // failure, otherwise the unsupported platform would be reported as success
  code = TSDB_CODE_MND_INVALID_PLATFORM;
  terrno = code;
  goto _OVER;
#endif
  mInfo("sma:%s, start to create", createReq.name);
  TAOS_CHECK_GOTO(mndCheckCreateSmaReq(&createReq), NULL, _OVER);

  pStb = mndAcquireStb(pMnode, createReq.stb);
  if (pStb == NULL) {
    mError("sma:%s, failed to create since stb:%s not exist", createReq.name, createReq.stb);
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }

  char streamName[TSDB_TABLE_FNAME_LEN] = {0};
  code = mndGetStreamNameFromSmaName(streamName, createReq.name);
  if (TSDB_CODE_SUCCESS != code) {
    goto _OVER;
  }

  // the backing stream must not exist yet
  code = mndAcquireStream(pMnode, streamName, &pStream);
  if (pStream != NULL || code == 0) {
    mError("sma:%s, failed to create since stream:%s already exist", createReq.name, streamName);
    code = TSDB_CODE_MND_STREAM_ALREADY_EXIST;
    goto _OVER;
  }
  SSIdx idx = {0};
  if ((code = mndAcquireGlobalIdx(pMnode, createReq.name, SDB_SMA, &idx)) == 0) {
    pSma = idx.pIdx;
  } else {
    goto _OVER;
  }

  if (pSma != NULL) {
    if (createReq.igExists) {
      mInfo("sma:%s, already exist in sma:%s, ignore exist is set", createReq.name, pSma->name);
      code = 0;
      goto _OVER;
    } else {
      code = TSDB_CODE_MND_SMA_ALREADY_EXIST;
      goto _OVER;
    }
  }

  SName name = {0};
  code = tNameFromString(&name, createReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
  if (TSDB_CODE_SUCCESS != code) {
    goto _OVER;
  }
  char db[TSDB_TABLE_FNAME_LEN] = {0};
  (void)tNameGetFullDbName(&name, db);

  pDb = mndAcquireDb(pMnode, db);
  if (pDb == NULL) {
    code = TSDB_CODE_MND_DB_NOT_SELECTED;
    goto _OVER;
  }

  TAOS_CHECK_GOTO(mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb), NULL, _OVER);

  code = mndCreateSma(pMnode, pReq, &createReq, pDb, pStb, streamName);
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("sma:%s, failed to create since %s", createReq.name, tstrerror(code));
  }

  mndReleaseStb(pMnode, pStb);
  mndReleaseSma(pMnode, pSma);
  mndReleaseStream(pMnode, pStream);
  mndReleaseDb(pMnode, pDb);
  tFreeSMCreateSmaReq(&createReq);

  TAOS_RETURN(code);
}
895
#endif
896

897
/**
 * Append a redo log to the transaction that puts the SMA object into
 * SDB_STATUS_DROPPING.
 *
 * @param pMnode  unused here; kept for signature symmetry with sibling helpers
 * @param pTrans  transaction receiving the log
 * @param pSma    SMA object to encode
 * @return 0 on success, otherwise the error code (terrno of the encoder, or
 *         TSDB_CODE_MND_RETURN_VALUE_NULL when terrno is unset)
 */
static int32_t mndSetDropSmaRedoLogs(SMnode *pMnode, STrans *pTrans, SSmaObj *pSma) {
  int32_t  code = 0;
  SSdbRaw *pRedoRaw = mndSmaActionEncode(pSma);
  if (pRedoRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    // report the real cause instead of a bare -1
    TAOS_RETURN(code);
  }
  if ((code = mndTransAppendRedolog(pTrans, pRedoRaw)) != 0) {
    // the raw was not handed over to the transaction, so free it here
    sdbFreeRaw(pRedoRaw);
    TAOS_RETURN(code);
  }
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING));

  return 0;
}
910

911
/**
 * Append a commit log to the transaction that puts the SMA object into
 * SDB_STATUS_DROPPED.
 *
 * @param pMnode  unused here; kept for signature symmetry with sibling helpers
 * @param pTrans  transaction receiving the log
 * @param pSma    SMA object to encode
 * @return 0 on success, otherwise the error code (terrno of the encoder, or
 *         TSDB_CODE_MND_RETURN_VALUE_NULL when terrno is unset)
 */
static int32_t mndSetDropSmaCommitLogs(SMnode *pMnode, STrans *pTrans, SSmaObj *pSma) {
  int32_t  code = 0;
  SSdbRaw *pCommitRaw = mndSmaActionEncode(pSma);
  if (pCommitRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    // report the real cause instead of a bare -1
    TAOS_RETURN(code);
  }
  if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
    // the raw was not handed over to the transaction, so free it here
    sdbFreeRaw(pCommitRaw);
    TAOS_RETURN(code);
  }
  TAOS_CHECK_RETURN(sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED));

  return 0;
}
924

925
/**
 * Append a redo log to the transaction that puts the SMA's vgroup into
 * SDB_STATUS_DROPPING.
 *
 * @param pMnode   unused here; kept for signature symmetry with sibling helpers
 * @param pTrans   transaction receiving the log
 * @param pVgroup  vgroup object to encode
 * @return 0 on success, otherwise the error code (terrno of the encoder, or
 *         TSDB_CODE_MND_RETURN_VALUE_NULL when terrno is unset)
 */
static int32_t mndSetDropSmaVgroupRedoLogs(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup) {
  int32_t  code = 0;
  SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroup);
  if (pVgRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    // report the real cause instead of a bare -1
    TAOS_RETURN(code);
  }
  if ((code = mndTransAppendRedolog(pTrans, pVgRaw)) != 0) {
    // the raw was not handed over to the transaction, so free it here
    sdbFreeRaw(pVgRaw);
    TAOS_RETURN(code);
  }
  TAOS_CHECK_RETURN(sdbSetRawStatus(pVgRaw, SDB_STATUS_DROPPING));

  return 0;
}
938

939
/**
 * Append a commit log to the transaction that puts the SMA's vgroup into
 * SDB_STATUS_DROPPED.
 *
 * @param pMnode   unused here; kept for signature symmetry with sibling helpers
 * @param pTrans   transaction receiving the log
 * @param pVgroup  vgroup object to encode
 * @return 0 on success, otherwise the error code (terrno of the encoder, or
 *         TSDB_CODE_MND_RETURN_VALUE_NULL when terrno is unset)
 */
static int32_t mndSetDropSmaVgroupCommitLogs(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup) {
  int32_t  code = 0;
  SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroup);
  if (pVgRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    // report the real cause instead of a bare -1
    TAOS_RETURN(code);
  }
  if ((code = mndTransAppendCommitlog(pTrans, pVgRaw)) != 0) {
    // the raw was not handed over to the transaction, so free it here
    sdbFreeRaw(pVgRaw);
    TAOS_RETURN(code);
  }
  TAOS_CHECK_RETURN(sdbSetRawStatus(pVgRaw, SDB_STATUS_DROPPED));

  return 0;
}
952

953
/**
 * Append the redo action that asks the dnode hosting the SMA vgroup's first
 * vnode to drop that vnode.
 *
 * The action is sent as TDMT_DND_DROP_VNODE and TSDB_CODE_VND_NOT_EXIST is
 * accepted as a successful outcome.
 *
 * @param pMnode   mnode handle
 * @param pTrans   transaction receiving the action
 * @param pDb      database the vgroup belongs to
 * @param pVgroup  vgroup whose vnodeGid[0] is targeted
 * @return 0 on success, otherwise an error code
 */
static int32_t mndSetDropSmaVgroupRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) {
  int32_t    code = 0;
  SVnodeGid *pVgid = pVgroup->vnodeGid + 0;
  SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
  if (pDnode == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  STransAction action = {0};
  action.epSet = mndGetDnodeEpset(pDnode);

  int32_t contLen = 0;
  void   *pReq = mndBuildDropVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen);
  if (pReq == NULL) {
    // capture the failure before any other call can touch terrno
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
  }

  // pDnode is still read by mndBuildDropVnodeReq, so release it only now
  mndReleaseDnode(pMnode, pDnode);
  if (pReq == NULL) {
    TAOS_RETURN(code);
  }

  action.pCont = pReq;
  action.contLen = contLen;
  action.msgType = TDMT_DND_DROP_VNODE;
  action.acceptableCode = TSDB_CODE_VND_NOT_EXIST;

  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
    taosMemoryFree(pReq);
    TAOS_RETURN(code);
  }

  TAOS_RETURN(code);
}
987

988
/**
 * Build and prepare the "drop-sma" transaction for one SMA.
 *
 * The transaction drops the backing stream (tasks and sdb record), the SMA
 * object, the SMA's destination vgroup and its vnode, and updates the source
 * super table via mndSetUpdateSmaStbCommitLogs.
 *
 * @param pMnode  mnode handle
 * @param pReq    originating RPC message, attached to the transaction
 * @param pDb     database owning the SMA
 * @param pSma    SMA to drop
 * @return 0 when the transaction was prepared, otherwise an error code
 */
static int32_t mndDropSma(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SSmaObj *pSma) {
  int32_t     code = -1;
  SVgObj     *pVgroup = NULL;
  SStbObj    *pStb = NULL;
  STrans     *pTrans = NULL;
  SStreamObj *pStream = NULL;

  pVgroup = mndAcquireVgroup(pMnode, pSma->dstVgId);
  if (pVgroup == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }

  pStb = mndAcquireStb(pMnode, pSma->stb);
  if (pStb == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }

  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq, "drop-sma");
  if (pTrans == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }

  mInfo("trans:%d, used to drop sma:%s", pTrans->id, pSma->name);
  mndTransSetDbName(pTrans, pDb->name, NULL);
  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER);

  mndTransSetSerial(pTrans);

  char streamName[TSDB_TABLE_FNAME_LEN] = {0};
  code = mndGetStreamNameFromSmaName(streamName, pSma->name);
  if (TSDB_CODE_SUCCESS != code) {
    goto _OVER;
  }

  // pStream is released exactly once, at _OVER; releasing it here as well
  // would drop the reference twice.
  code = mndAcquireStream(pMnode, streamName, &pStream);
  if (code != 0 || pStream == NULL || pStream->smaId != pSma->uid) {
    // Nothing has been queued in the transaction, so this must not be reported
    // as success (the caller would turn 0 into "action in progress").
    if (code == 0) code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto _OVER;
  }

  if ((code = mndStreamSetDropAction(pMnode, pTrans, pStream)) < 0) {
    mError("stream:%s, failed to drop task since %s", pStream->name, tstrerror(code));
    goto _OVER;
  }

  // drop stream
  if ((code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_DROPPED)) < 0) {
    mError("stream:%s, failed to drop log since %s", pStream->name, tstrerror(code));
    goto _OVER;
  }

  TAOS_CHECK_GOTO(mndSetDropSmaRedoLogs(pMnode, pTrans, pSma), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetDropSmaVgroupRedoLogs(pMnode, pTrans, pVgroup), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetDropSmaCommitLogs(pMnode, pTrans, pSma), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetDropSmaVgroupCommitLogs(pMnode, pTrans, pVgroup), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetUpdateSmaStbCommitLogs(pMnode, pTrans, pStb), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetDropSmaVgroupRedoActions(pMnode, pTrans, pDb, pVgroup), NULL, _OVER);
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);

  code = 0;

_OVER:
  mndTransDrop(pTrans);
  mndReleaseStream(pMnode, pStream);
  mndReleaseVgroup(pMnode, pVgroup);
  mndReleaseStb(pMnode, pStb);
  TAOS_RETURN(code);
}
1063

1064
/**
 * Add to pTrans everything needed to drop all SMAs built on a super table.
 *
 * For each SMA whose stbUid equals pStb->uid: the backing stream is dropped
 * when it exists and its smaId matches the SMA, then the destination vgroup
 * (commit log and drop-vnode redo action) and the SMA commit log are appended.
 * A missing or mismatched stream is skipped.
 *
 * @param pMnode  mnode handle
 * @param pTrans  transaction being assembled by the caller
 * @param pDb     database owning the super table
 * @param pStb    super table being dropped
 * @return 0 on success, otherwise an error code
 */
int32_t mndDropSmasByStb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
  SSdb    *pSdb = pMnode->pSdb;
  SSmaObj *pSma = NULL;
  void    *pIter = NULL;
  SVgObj  *pVgroup = NULL;
  int32_t  code = -1;

  while (1) {
    pIter = sdbFetch(pSdb, SDB_SMA, pIter, (void **)&pSma);
    if (pIter == NULL) break;

    if (pSma->stbUid == pStb->uid) {
      mndTransSetSerial(pTrans);
      pVgroup = mndAcquireVgroup(pMnode, pSma->dstVgId);
      if (pVgroup == NULL) {
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
        if (terrno != 0) code = terrno;
        goto _OVER;
      }

      char streamName[TSDB_TABLE_FNAME_LEN] = {0};
      code = mndGetStreamNameFromSmaName(streamName, pSma->name);
      if (TSDB_CODE_SUCCESS != code) {
        goto _OVER;
      }

      SStreamObj *pStream = NULL;
      code = mndAcquireStream(pMnode, streamName, &pStream);
      // Touch the stream only when the acquire succeeded and it really belongs
      // to this SMA; a failed acquire leaves pStream NULL.
      if (code == 0 && pStream != NULL && pStream->smaId == pSma->uid) {
        if ((code = mndStreamSetDropAction(pMnode, pTrans, pStream)) < 0) {
          mError("stream:%s, failed to drop task since %s", pStream->name, tstrerror(code));
          mndReleaseStream(pMnode, pStream);
          goto _OVER;
        }

        if ((code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_DROPPED)) < 0) {
          mndReleaseStream(pMnode, pStream);
          goto _OVER;
        }
      }
      // always give the reference back, including the smaId-mismatch case
      mndReleaseStream(pMnode, pStream);

      TAOS_CHECK_GOTO(mndSetDropSmaVgroupCommitLogs(pMnode, pTrans, pVgroup), NULL, _OVER);
      TAOS_CHECK_GOTO(mndSetDropSmaVgroupRedoActions(pMnode, pTrans, pDb, pVgroup), NULL, _OVER);
      TAOS_CHECK_GOTO(mndSetDropSmaCommitLogs(pMnode, pTrans, pSma), NULL, _OVER);
      mndReleaseVgroup(pMnode, pVgroup);
      pVgroup = NULL;
    }

    sdbRelease(pSdb, pSma);
    pSma = NULL;  // so the release at _OVER cannot act on it a second time
  }

  code = 0;

_OVER:
  sdbCancelFetch(pSdb, pIter);
  sdbRelease(pSdb, pSma);
  mndReleaseVgroup(pMnode, pVgroup);
  TAOS_RETURN(code);
}
1125

1126
/**
 * Append a drop commit log to pTrans for every SMA that belongs to a database.
 *
 * @param pMnode  mnode handle
 * @param pTrans  transaction being assembled by the caller
 * @param pDb     database being dropped; SMAs are matched on dbUid
 * @return 0 on success, otherwise the error from mndSetDropSmaCommitLogs
 */
int32_t mndDropSmasByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
  int32_t code = 0;
  SSdb   *pSdb = pMnode->pSdb;
  void   *pIter = NULL;

  while (1) {
    SSmaObj *pSma = NULL;
    pIter = sdbFetch(pSdb, SDB_SMA, pIter, (void **)&pSma);
    if (pIter == NULL) break;

    if (pSma->dbUid == pDb->uid) {
      if ((code = mndSetDropSmaCommitLogs(pMnode, pTrans, pSma)) != 0) {
        sdbRelease(pSdb, pSma);
        // cancel the iteration with the iterator, not with the fetched object
        sdbCancelFetch(pSdb, pIter);
        TAOS_RETURN(code);
      }
    }

    sdbRelease(pSdb, pSma);
  }

  TAOS_RETURN(code);
}
1149

1150
/**
 * Handle a drop-SMA request.
 *
 * Looks the SMA up by name, honours igNotExists when it is absent, resolves
 * the owning database from the SMA name, checks write privilege and then
 * calls mndDropSma.
 *
 * @param pReq  incoming RPC message
 * @return TSDB_CODE_ACTION_IN_PROGRESS when the drop was started, 0 when the
 *         SMA is absent and igNotExists is set, otherwise an error code
 */
static int32_t mndProcessDropSmaReq(SRpcMsg *pReq) {
  SMnode      *pMnode = pReq->info.node;
  SMDropSmaReq dropReq = {0};
  SSmaObj     *pSma = NULL;
  SDbObj      *pDb = NULL;
  int32_t      code = -1;

  TAOS_CHECK_GOTO(tDeserializeSMDropSmaReq(pReq->pCont, pReq->contLen, &dropReq), NULL, _OVER);

  mInfo("sma:%s, start to drop", dropReq.name);

  SSIdx idx = {0};
  code = mndAcquireGlobalIdx(pMnode, dropReq.name, SDB_SMA, &idx);
  if (code != 0) {
    goto _OVER;
  }
  pSma = idx.pIdx;

  if (pSma == NULL) {
    if (!dropReq.igNotExists) {
      code = TSDB_CODE_MND_SMA_NOT_EXIST;
      goto _OVER;
    }
    mInfo("sma:%s, not exist, ignore not exist is set", dropReq.name);
    code = 0;
    goto _OVER;
  }

  // the database name is taken from the SMA's full name
  SName name = {0};
  code = tNameFromString(&name, dropReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
  if (code != TSDB_CODE_SUCCESS) {
    goto _OVER;
  }
  char db[TSDB_TABLE_FNAME_LEN] = {0};
  (void)tNameGetFullDbName(&name, db);

  pDb = mndAcquireDb(pMnode, db);
  if (pDb == NULL) {
    code = TSDB_CODE_MND_DB_NOT_SELECTED;
    goto _OVER;
  }

  TAOS_CHECK_GOTO(mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb), NULL, _OVER);

  code = mndDropSma(pMnode, pReq, pDb, pSma);
  if (code == 0) {
    code = TSDB_CODE_ACTION_IN_PROGRESS;
  }

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("sma:%s, failed to drop since %s", dropReq.name, tstrerror(code));
  }

  mndReleaseSma(pMnode, pSma);
  mndReleaseDb(pMnode, pDb);
  TAOS_RETURN(code);
}
1206

1207
/**
 * Fill a user-index response from the SMA named in the request.
 *
 * When the SMA is found, the response receives the SMA's db and super-table
 * names, the SMA index type, and a comma-separated list of the function names
 * parsed from the SMA expression.
 *
 * @param pMnode    mnode handle
 * @param indexReq  request; indexFName is the SMA full name
 * @param rsp       response to fill
 * @param exist     set to false when the SMA cannot be acquired, true when the
 *                  expression was parsed; left untouched when parsing fails
 * @return 0 when the SMA is absent or the response was filled, otherwise the
 *         error returned by nodesStringToList
 */
static int32_t mndGetSma(SMnode *pMnode, SUserIndexReq *indexReq, SUserIndexRsp *rsp, bool *exist) {
  int32_t  code = -1;
  SSmaObj *pSma = NULL;

  SSIdx idx = {0};
  if (0 == mndAcquireGlobalIdx(pMnode, indexReq->indexFName, SDB_SMA, &idx)) {
    pSma = idx.pIdx;
  } else {
    *exist = false;
    return 0;
  }

  if (pSma == NULL) {
    *exist = false;
    return 0;
  }

  memcpy(rsp->dbFName, pSma->db, sizeof(pSma->db));
  memcpy(rsp->tblFName, pSma->stb, sizeof(pSma->stb));
  tstrncpy(rsp->indexType, TSDB_INDEX_TYPE_SMA, TSDB_INDEX_TYPE_LEN);

  SNodeList *pList = NULL;
  int32_t    extOffset = 0;
  code = nodesStringToList(pSma->expr, &pList);
  if (0 == code) {
    SNode *node = NULL;
    FOREACH(node, pList) {
      SFunctionNode *pFunc = (SFunctionNode *)node;
      extOffset += tsnprintf(rsp->indexExts + extOffset, sizeof(rsp->indexExts) - extOffset - 1, "%s%s",
                             (extOffset ? "," : ""), pFunc->functionName);
    }

    *exist = true;
  }

  // the parsed list is only needed to collect the names; free it so it does not leak
  if (pList != NULL) {
    nodesDestroyList(pList);
  }

  mndReleaseSma(pMnode, pSma);
  return code;
}
1245

1246
/**
 * Collect every SMA defined on a super table into a table-index response.
 *
 * The response header (db name, table name, suid, version) comes from the
 * super table. Each SMA whose stb name equals tbFName contributes one
 * STableIndexInfo with its window settings, destination table/vgroup, the
 * vgroup's epset and a heap copy of the expression.
 *
 * @param pMnode   mnode handle
 * @param tbFName  full name of the super table
 * @param rsp      response; rsp->pIndex must already be an initialized array
 * @param exist    set to false when the super table is absent, true once at
 *                 least one SMA was added; otherwise left untouched
 * @return 0 on success (including "super table not found"), otherwise an
 *         error code
 */
int32_t mndGetTableSma(SMnode *pMnode, char *tbFName, STableIndexRsp *rsp, bool *exist) {
  int32_t         code = 0;
  SSmaObj        *pSma = NULL;
  SSdb           *pSdb = pMnode->pSdb;
  void           *pIter = NULL;
  STableIndexInfo info = {0};

  SStbObj *pStb = mndAcquireStb(pMnode, tbFName);
  if (NULL == pStb) {
    *exist = false;
    return TSDB_CODE_SUCCESS;
  }

  tstrncpy(rsp->dbFName, pStb->db, TSDB_DB_FNAME_LEN);
  tstrncpy(rsp->tbName, pStb->name + strlen(pStb->db) + 1, TSDB_TABLE_NAME_LEN);
  rsp->suid = pStb->uid;
  rsp->version = pStb->smaVer;
  mndReleaseStb(pMnode, pStb);

  while (1) {
    pIter = sdbFetch(pSdb, SDB_SMA, pIter, (void **)&pSma);
    if (pIter == NULL) break;

    // cheap first-character test before the full comparison
    if (pSma->stb[0] != tbFName[0] || strcmp(pSma->stb, tbFName)) {
      sdbRelease(pSdb, pSma);
      continue;
    }

    info.intervalUnit = pSma->intervalUnit;
    info.slidingUnit = pSma->slidingUnit;
    info.interval = pSma->interval;
    info.offset = pSma->offset;
    info.sliding = pSma->sliding;
    info.dstTbUid = pSma->dstTbUid;
    info.dstVgId = pSma->dstVgId;

    SVgObj *pVg = mndAcquireVgroup(pMnode, pSma->dstVgId);
    if (pVg == NULL) {
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
      if (terrno != 0) code = terrno;
      sdbRelease(pSdb, pSma);
      sdbCancelFetch(pSdb, pIter);
      return code;
    }
    info.epSet = mndGetVgroupEpset(pMnode, pVg);
    // the epset is copied by value, so the vgroup reference can go back now
    mndReleaseVgroup(pMnode, pVg);

    info.expr = taosMemoryMalloc(pSma->exprLen + 1);
    if (info.expr == NULL) {
      code = terrno;
      sdbRelease(pSdb, pSma);
      sdbCancelFetch(pSdb, pIter);
      return code;
    }

    memcpy(info.expr, pSma->expr, pSma->exprLen);
    info.expr[pSma->exprLen] = 0;

    if (NULL == taosArrayPush(rsp->pIndex, &info)) {
      code = terrno;
      taosMemoryFree(info.expr);
      sdbRelease(pSdb, pSma);
      sdbCancelFetch(pSdb, pIter);
      return code;
    }

    rsp->indexSize += sizeof(info) + pSma->exprLen + 1;
    *exist = true;

    sdbRelease(pSdb, pSma);
  }

  TAOS_RETURN(code);
}
1319

1320
/**
 * Handle a "get index" request for a single SMA.
 *
 * Deserializes the request, looks the SMA up with mndGetSma and, when it
 * exists, serializes the SUserIndexRsp into an RPC buffer attached to pReq.
 *
 * @param pReq  incoming RPC message; info.rsp / info.rspLen are set on success
 * @return 0 on success, TSDB_CODE_MND_DB_INDEX_NOT_EXIST when the SMA is
 *         absent, otherwise an error code
 */
static int32_t mndProcessGetSmaReq(SRpcMsg *pReq) {
  SUserIndexReq indexReq = {0};
  SMnode       *pMnode = pReq->info.node;
  int32_t       code = -1;
  SUserIndexRsp rsp = {0};
  bool          exist = false;

  TAOS_CHECK_GOTO(tDeserializeSUserIndexReq(pReq->pCont, pReq->contLen, &indexReq), NULL, _OVER);

  code = mndGetSma(pMnode, &indexReq, &rsp, &exist);
  if (code) {
    goto _OVER;
  }

  if (!exist) {
    // TODO GET INDEX FROM FULLTEXT
    code = TSDB_CODE_MND_DB_INDEX_NOT_EXIST;
  } else {
    // first pass only measures the encoded size
    int32_t contLen = tSerializeSUserIndexRsp(NULL, 0, &rsp);
    if (contLen < 0) {
      // never hand a negative size to the allocator
      code = terrno;
      goto _OVER;
    }

    void *pRsp = rpcMallocCont(contLen);
    if (pRsp == NULL) {
      code = terrno;
      goto _OVER;
    }

    contLen = tSerializeSUserIndexRsp(pRsp, contLen, &rsp);
    if (contLen < 0) {
      code = terrno;
      // the buffer was never attached to pReq, so it has to be freed here
      rpcFreeCont(pRsp);
      goto _OVER;
    }

    pReq->info.rsp = pRsp;
    pReq->info.rspLen = contLen;

    code = 0;
  }

_OVER:
  if (code != 0) {
    mError("failed to get index %s since %s", indexReq.indexFName, tstrerror(code));
  }

  TAOS_RETURN(code);
}
1364

1365
/**
 * Handle a "get table index" request: return every SMA defined on a table.
 *
 * Deserializes the request, collects the SMAs with mndGetTableSma and, when at
 * least one exists, serializes the STableIndexRsp into an RPC buffer attached
 * to pReq. The response structure is freed before returning.
 *
 * @param pReq  incoming RPC message; info.rsp / info.rspLen are set on success
 * @return 0 on success, TSDB_CODE_MND_DB_INDEX_NOT_EXIST when the table has no
 *         SMA, otherwise an error code
 */
static int32_t mndProcessGetTbSmaReq(SRpcMsg *pReq) {
  STableIndexReq indexReq = {0};
  SMnode        *pMnode = pReq->info.node;
  int32_t        code = -1;
  STableIndexRsp rsp = {0};
  bool           exist = false;

  TAOS_CHECK_GOTO(tDeserializeSTableIndexReq(pReq->pCont, pReq->contLen, &indexReq), NULL, _OVER);

  rsp.pIndex = taosArrayInit(10, sizeof(STableIndexInfo));
  if (NULL == rsp.pIndex) {
    code = terrno;
    goto _OVER;
  }

  code = mndGetTableSma(pMnode, indexReq.tbFName, &rsp, &exist);
  if (code) {
    goto _OVER;
  }

  if (!exist) {
    code = TSDB_CODE_MND_DB_INDEX_NOT_EXIST;
  } else {
    // first pass only measures the encoded size
    int32_t contLen = tSerializeSTableIndexRsp(NULL, 0, &rsp);
    if (contLen < 0) {
      // never hand a negative size to the allocator
      code = terrno;
      goto _OVER;
    }

    void *pRsp = rpcMallocCont(contLen);
    if (pRsp == NULL) {
      code = terrno;
      goto _OVER;
    }

    contLen = tSerializeSTableIndexRsp(pRsp, contLen, &rsp);
    if (contLen < 0) {
      code = terrno;
      // the buffer was never attached to pReq, so it has to be freed here
      rpcFreeCont(pRsp);
      goto _OVER;
    }

    pReq->info.rsp = pRsp;
    pReq->info.rspLen = contLen;

    code = 0;
  }

_OVER:
  if (code != 0) {
    mError("failed to get table index %s since %s", indexReq.tbFName, tstrerror(code));
  }

  tFreeSerializeSTableIndexRsp(&rsp);
  TAOS_RETURN(code);
}
1415

1416
/**
 * Fill up to `rows` rows of the show-indexes result with SMA entries.
 *
 * Each row carries, in order: SMA table name, database name, source table
 * name, destination vgroup id, creation time, an empty string column and the
 * literal "sma_index". When pShow->db is non-empty only SMAs of that database
 * are listed; if that database cannot be acquired the function returns 0.
 *
 * @param pReq    RPC message carrying the mnode handle
 * @param pShow   show context; pShow->pIter is read as an SSmaAndTagIter
 * @param pBlock  output data block
 * @param rows    maximum number of rows to produce
 * @return number of rows written, or -1 after a failure while filling a row
 */
static int32_t mndRetrieveSma(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode  *pMnode = pReq->info.node;
  SSdb    *pSdb = pMnode->pSdb;
  SSmaObj *pSma = NULL;
  SDbObj  *pDb = NULL;
  int32_t  numOfRows = 0;
  int32_t  cols = 0;
  int32_t  code = 0;

  if (strlen(pShow->db) > 0) {
    pDb = mndAcquireDb(pMnode, pShow->db);
    if (pDb == NULL) return 0;
  }

  SSmaAndTagIter *pIter = pShow->pIter;
  while (numOfRows < rows) {
    pIter->pSmaIter = sdbFetch(pSdb, SDB_SMA, pIter->pSmaIter, (void **)&pSma);
    if (pIter->pSmaIter == NULL) break;

    // skip SMAs that belong to another database when a filter is active
    if (NULL != pDb && pSma->dbUid != pDb->uid) {
      sdbRelease(pSdb, pSma);
      continue;
    }

    cols = 0;

    SName            smaName = {0};
    SName            stbName = {0};
    char             smaCol[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
    char             dbCol[TSDB_DB_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
    char             stbCol[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
    char             emptyCol[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
    SColumnInfoData *pColInfo = NULL;

    STR_TO_VARSTR(emptyCol, (char *)"");

    // one pass; the first failing step leaves the block with `code` set
    do {
      code = tNameFromString(&smaName, pSma->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
      if (TSDB_CODE_SUCCESS != code) break;

      STR_TO_VARSTR(smaCol, (char *)tNameGetTableName(&smaName));
      STR_TO_VARSTR(dbCol, (char *)mndGetDbStr(pSma->db));
      code = tNameFromString(&stbName, pSma->stb, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
      if (TSDB_CODE_SUCCESS != code) break;

      STR_TO_VARSTR(stbCol, (char *)tNameGetTableName(&stbName));

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)smaCol, false);
      if (TSDB_CODE_SUCCESS != code) break;

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)dbCol, false);
      if (TSDB_CODE_SUCCESS != code) break;

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)stbCol, false);
      if (TSDB_CODE_SUCCESS != code) break;

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)&pSma->dstVgId, false);
      if (TSDB_CODE_SUCCESS != code) break;

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)&pSma->createdTime, false);
      if (TSDB_CODE_SUCCESS != code) break;

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)emptyCol, false);
      if (TSDB_CODE_SUCCESS != code) break;

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);

      char tag[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
      STR_TO_VARSTR(tag, (char *)"sma_index");
      code = colDataSetVal(pColInfo, numOfRows, (const char *)tag, false);
    } while (0);

    numOfRows++;
    sdbRelease(pSdb, pSma);
    if (TSDB_CODE_SUCCESS != code) {
      sdbCancelFetch(pMnode->pSdb, pIter->pSmaIter);
      numOfRows = -1;
      break;
    }
  }

  mndReleaseDb(pMnode, pDb);
  pShow->numOfRows += numOfRows;
  return numOfRows;
}
1505

1506
// common functions shared by SMA and tag indexes
1507
/**
 * Handle a drop-index request: try it as an SMA first and, when the SMA path
 * returns TSDB_CODE_MND_TAG_INDEX_ALREADY_EXIST or TSDB_CODE_MND_SMA_NOT_EXIST,
 * clear terrno and retry it as a tag index.
 *
 * @param pReq  incoming RPC message
 * @return result of whichever handler had the final say
 */
static int32_t mndProcessDropIdxReq(SRpcMsg *pReq) {
  int rc = mndProcessDropSmaReq(pReq);
  if (rc != TSDB_CODE_MND_TAG_INDEX_ALREADY_EXIST && rc != TSDB_CODE_MND_SMA_NOT_EXIST) {
    return rc;
  }

  terrno = 0;
  rc = mndProcessDropTagIdxReq(pReq);
  return rc;
}
1515

1516
/**
 * Retrieve show-indexes rows: SMA entries first, then tag-index entries.
 *
 * A combined SSmaAndTagIter is allocated on first use and stored in
 * pShow->pIter; it is freed once fewer than `rows` rows could be produced.
 *
 * @param pReq    RPC message
 * @param pShow   show context
 * @param pBlock  output data block
 * @param rows    maximum number of rows to produce
 * @return sum of the two retrieve results, or terrno when the iterator
 *         allocation fails
 */
static int32_t mndRetrieveIdx(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  if (pShow->pIter == NULL) {
    pShow->pIter = taosMemoryCalloc(1, sizeof(SSmaAndTagIter));
    if (pShow->pIter == NULL) {
      return terrno;
    }
  }

  int32_t fetched = mndRetrieveSma(pReq, pShow, pBlock, rows);
  if (fetched < rows) {
    // room left: continue with tag indexes
    fetched += mndRetrieveTagIdx(pReq, pShow, pBlock, rows - fetched);
  }

  if (fetched < rows) {
    // no more to read
    taosMemoryFree(pShow->pIter);
    pShow->pIter = NULL;
  }
  return fetched;
}
1534
/**
 * Cancel an in-flight show-indexes retrieval: cancel both the SMA and the tag
 * index sdb iterations held by the combined iterator, then free it.
 *
 * @param pMnode  mnode handle
 * @param pIter   SSmaAndTagIter allocated by mndRetrieveIdx, may be NULL
 */
static void mndCancelRetrieveIdx(SMnode *pMnode, void *pIter) {
  SSmaAndTagIter *pPair = pIter;
  if (pPair != NULL) {
    sdbCancelFetchByType(pMnode->pSdb, pPair->pSmaIter, SDB_SMA);
    sdbCancelFetchByType(pMnode->pSdb, pPair->pIdxIter, SDB_IDX);
  }
  taosMemoryFree(pPair);
}
×
1543

1544
/**
 * Populate pCxt->pSma from the create request and the objects in the context.
 *
 * Names are copied into the SMA object; expr, sql and ast are stored as the
 * request's pointers (no copy is made here).
 *
 * @param pCxt  TSMA creation context; pSma, pCreateSmaReq, pDb and
 *              targetStbFullName are read, pBaseSma and pSrcStb are optional
 */
static void initSMAObj(SCreateTSMACxt *pCxt) {
  SSmaObj              *pSma = pCxt->pSma;
  const SMCreateSmaReq *pCreate = pCxt->pCreateSmaReq;

  // names
  memcpy(pSma->name, pCreate->name, TSDB_TABLE_FNAME_LEN);
  memcpy(pSma->stb, pCreate->stb, TSDB_TABLE_FNAME_LEN);
  memcpy(pSma->db, pCxt->pDb->name, TSDB_DB_FNAME_LEN);
  if (pCxt->pBaseSma) {
    memcpy(pSma->baseSmaName, pCxt->pBaseSma->name, TSDB_TABLE_FNAME_LEN);
  }

  // identity
  pSma->createdTime = taosGetTimestampMs();
  pSma->uid = mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN);

  // destination and source tables
  memcpy(pSma->dstTbName, pCxt->targetStbFullName, TSDB_TABLE_FNAME_LEN);
  pSma->dstTbUid = 0;  // not used
  if (pCxt->pSrcStb) {
    pSma->stbUid = pCxt->pSrcStb->uid;
  } else {
    // no source super table in the context: use the uid carried by the request
    pSma->stbUid = pCreate->normSourceTbUid;
  }
  pSma->dbUid = pCxt->pDb->uid;

  // window definition
  pSma->interval = pCreate->interval;
  pSma->intervalUnit = pCreate->intervalUnit;
  //  pCxt->pSma->timezone = taosGetLocalTimezoneOffset();
  pSma->version = 1;

  // payloads: lengths are copied, pointers are shared with the request
  pSma->exprLen = pCreate->exprLen;
  pSma->sqlLen = pCreate->sqlLen;
  pSma->astLen = pCreate->astLen;
  pSma->expr = pCreate->expr;
  pSma->sql = pCreate->sql;
  pSma->ast = pCreate->ast;
}
×
1568

1569
/**
 * Fill pCxt->pCreateStreamReq with the stream definition that backs a TSMA.
 *
 * Sets the stream name, source database, target super table and fixed stream
 * options, duplicates the request's ast and sql strings, builds the tag list
 * (every source super-table tag, if any, followed by a "tbname" tag) and
 * appends one output column per projection in pCxt->pProjects.
 *
 * NOTE(review): pCols is pushed to but not created here; it is presumably
 * initialized by the caller. Confirm.
 *
 * @param pCxt  TSMA creation context
 * @return 0 on success, otherwise terrno from the failing allocation
 */
static int32_t mndCreateTSMABuildCreateStreamReq(SCreateTSMACxt *pCxt) {
  // stream identity
  tstrncpy(pCxt->pCreateStreamReq->name, pCxt->streamName, TSDB_STREAM_FNAME_LEN);
  tstrncpy(pCxt->pCreateStreamReq->sourceDB, pCxt->pDb->name, TSDB_DB_FNAME_LEN);
  tstrncpy(pCxt->pCreateStreamReq->targetStbFullName, pCxt->targetStbFullName, TSDB_TABLE_FNAME_LEN);

  // fixed stream options
  pCxt->pCreateStreamReq->igExists = false;
  pCxt->pCreateStreamReq->triggerType = STREAM_TRIGGER_MAX_DELAY;
  pCxt->pCreateStreamReq->igExpired = false;
  pCxt->pCreateStreamReq->fillHistory = STREAM_FILL_HISTORY_ON;
  pCxt->pCreateStreamReq->maxDelay = 10000;
  pCxt->pCreateStreamReq->watermark = 0;
  if (pCxt->pSrcStb) {
    // every source tag plus the extra "tbname" tag
    pCxt->pCreateStreamReq->numOfTags = pCxt->pSrcStb->numOfTags + 1;
  } else {
    pCxt->pCreateStreamReq->numOfTags = 1;
  }
  pCxt->pCreateStreamReq->checkpointFreq = 0;
  pCxt->pCreateStreamReq->createStb = 1;
  pCxt->pCreateStreamReq->targetStbUid = 0;
  pCxt->pCreateStreamReq->fillNullCols = NULL;
  pCxt->pCreateStreamReq->igUpdate = 0;

  // values taken from the create-SMA request and the SMA object
  pCxt->pCreateStreamReq->deleteMark = pCxt->pCreateSmaReq->deleteMark;
  pCxt->pCreateStreamReq->lastTs = pCxt->pCreateSmaReq->lastTs;
  pCxt->pCreateStreamReq->smaId = pCxt->pSma->uid;

  pCxt->pCreateStreamReq->ast = taosStrdup(pCxt->pCreateSmaReq->ast);
  if (pCxt->pCreateStreamReq->ast == NULL) {
    return terrno;
  }
  pCxt->pCreateStreamReq->sql = taosStrdup(pCxt->pCreateSmaReq->sql);
  if (pCxt->pCreateStreamReq->sql == NULL) {
    return terrno;
  }

  // construct tags
  pCxt->pCreateStreamReq->pTags = taosArrayInit(pCxt->pCreateStreamReq->numOfTags, sizeof(SField));
  if (pCxt->pCreateStreamReq->pTags == NULL) {
    return terrno;
  }

  SFieldWithOptions field = {0};

  if (pCxt->pSrcStb) {
    int32_t numSrcTags = pCxt->pCreateStreamReq->numOfTags - 1;
    for (int32_t i = 0; i < numSrcTags; ++i) {
      SSchema *pTagSchema = &pCxt->pSrcStb->pTags[i];
      field.bytes = pTagSchema->bytes;
      field.type = pTagSchema->type;
      field.flags = pTagSchema->flags;
      tstrncpy(field.name, pTagSchema->name, TSDB_COL_NAME_LEN);
      if (taosArrayPush(pCxt->pCreateStreamReq->pTags, &field) == NULL) {
        return terrno;
      }
    }
  }

  // trailing "tbname" tag
  field.bytes = TSDB_TABLE_FNAME_LEN - 1 + VARSTR_HEADER_SIZE;
  field.flags = COL_SMA_ON;
  field.type = TSDB_DATA_TYPE_BINARY;
  tstrncpy(field.name, "tbname", strlen("tbname") + 1);
  if (taosArrayPush(pCxt->pCreateStreamReq->pTags, &field) == NULL) {
    return terrno;
  }

  // construct output cols
  SNode *pProj;
  FOREACH(pProj, pCxt->pProjects) {
    SExprNode *pExpr = (SExprNode *)pProj;
    field.bytes = pExpr->resType.bytes;
    field.type = pExpr->resType.type;
    field.flags = COL_SMA_ON;
    tstrncpy(field.name, pExpr->userAlias, TSDB_COL_NAME_LEN);
    if (IS_DECIMAL_TYPE(field.type)) {
      field.typeMod = decimalCalcTypeMod(pExpr->resType.precision, pExpr->resType.scale);
      field.flags |= COL_HAS_TYPE_MOD;
    }
    if (taosArrayPush(pCxt->pCreateStreamReq->pCols, &field) == NULL) {
      return terrno;
    }
  }

  return 0;
}
1649

1650
/**
 * Fill pCxt->pDropStreamReq for dropping the stream that backs a TSMA.
 *
 * The stream name comes from the context, igNotExists is cleared, and the sql
 * field receives a heap copy of the drop-SMA request's name.
 *
 * @param pCxt  TSMA context holding pDropStreamReq, pDropSmaReq and streamName
 * @return TSDB_CODE_SUCCESS, or terrno when the string copy fails
 */
static int32_t mndCreateTSMABuildDropStreamReq(SCreateTSMACxt *pCxt) {
  tstrncpy(pCxt->pDropStreamReq->name, pCxt->streamName, TSDB_STREAM_FNAME_LEN);
  pCxt->pDropStreamReq->igNotExists = false;

  pCxt->pDropStreamReq->sql = taosStrdup(pCxt->pDropSmaReq->name);
  if (pCxt->pDropStreamReq->sql == NULL) {
    return terrno;
  }

  pCxt->pDropStreamReq->sqlLen = strlen(pCxt->pDropStreamReq->sql);
  return TSDB_CODE_SUCCESS;
}
1660

1661
// Encodes pOld and appends it to pTrans as a prepare log, then marks the raw as SDB_STATUS_READY.
// pMnode and pNew are not used here.
// Returns 0 on success; on encode failure returns terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when
// terrno is 0); otherwise the error from the append / status call.
static int32_t mndSetUpdateDbTsmaVersionPrepareLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pOld, SDbObj *pNew) {
  SSdbRaw *pRaw = mndDbActionEncode(pOld);
  if (NULL == pRaw) {
    int32_t err = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(err);
  }

  int32_t code = mndTransAppendPrepareLog(pTrans, pRaw);
  if (code != 0) {
    // the raw was not appended, so it is released here
    sdbFreeRaw(pRaw);
    TAOS_RETURN(code);
  }

  TAOS_RETURN(sdbSetRawStatus(pRaw, SDB_STATUS_READY));
}
1676

1677
// Encodes pNew and appends it to pTrans as a commit log, then marks the raw as SDB_STATUS_READY.
// pMnode and pOld are not used here.
// Returns 0 on success; on encode failure returns terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when
// terrno is 0); otherwise the error from the append / status call.
static int32_t mndSetUpdateDbTsmaVersionCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pOld, SDbObj *pNew) {
  SSdbRaw *pRaw = mndDbActionEncode(pNew);
  if (pRaw == NULL) {
    int32_t encodeErr = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) encodeErr = terrno;
    TAOS_RETURN(encodeErr);
  }

  int32_t appendRc = mndTransAppendCommitlog(pTrans, pRaw);
  if (appendRc == 0) {
    TAOS_RETURN(sdbSetRawStatus(pRaw, SDB_STATUS_READY));
  }

  // the raw was not appended, so it is released here
  sdbFreeRaw(pRaw);
  TAOS_RETURN(appendRc);
}
1692

1693
// Builds and prepares the "create-tsma" transaction (rollback policy, serial execution):
//   redo action : create the stream described by pCxt->pCreateStreamReq
//   undo actions: drop that stream, then drop the target stable pCxt->targetStbFullName
//   logs        : create-sma redo/undo/commit logs for pCxt->pSma, plus prepare/commit logs that
//                 bump the db's tsmaVersion by one
// Returns TSDB_CODE_SUCCESS once mndTransPrepare succeeded, otherwise the first error hit.
//
// Serialized request buffers (pCont) that were never appended to the transaction are freed on
// exit; previously they leaked on every early `goto _OVER`.
// NOTE(review): assumes mndTransAppendRedoAction/mndTransAppendUndoAction copy the STransAction
// by value and that pTrans owns pCont after a successful append -- confirm in mndTrans.c.
static int32_t mndCreateTSMATxnPrepare(SCreateTSMACxt *pCxt) {
  int32_t      code = -1;
  STransAction createStreamRedoAction = {0};
  STransAction createStreamUndoAction = {0};
  STransAction dropStbUndoAction = {0};
  SMDropStbReq dropStbReq = {0};
  SDbObj       newDb = {0};
  STrans *pTrans = mndTransCreate(pCxt->pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_TSMA, pCxt->pRpcReq, "create-tsma");
  if (!pTrans) {
    code = terrno;
    goto _OVER;
  }
  mndTransSetDbName(pTrans, pCxt->pDb->name, NULL);
  TAOS_CHECK_GOTO(mndTransCheckConflict(pCxt->pMnode, pTrans), NULL, _OVER);

  mndTransSetSerial(pTrans);
  mInfo("trans:%d, used to create tsma:%s stream:%s", pTrans->id, pCxt->pCreateSmaReq->name,
        pCxt->pCreateStreamReq->name);

  // redo: create stream (first call with a NULL buffer only computes the encoded length)
  mndGetMnodeEpSet(pCxt->pMnode, &createStreamRedoAction.epSet);
  createStreamRedoAction.acceptableCode = TSDB_CODE_MND_STREAM_ALREADY_EXIST;
  createStreamRedoAction.msgType = TDMT_STREAM_CREATE;
  createStreamRedoAction.contLen = tSerializeSCMCreateStreamReq(0, 0, pCxt->pCreateStreamReq);
  createStreamRedoAction.pCont = taosMemoryCalloc(1, createStreamRedoAction.contLen);
  if (!createStreamRedoAction.pCont) {
    code = terrno;
    goto _OVER;
  }
  if (createStreamRedoAction.contLen != tSerializeSCMCreateStreamReq(createStreamRedoAction.pCont,
                                                                     createStreamRedoAction.contLen,
                                                                     pCxt->pCreateStreamReq)) {
    mError("sma: %s, failed to create due to create stream req encode failure", pCxt->pCreateSmaReq->name);
    code = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }

  // undo: drop the stream again
  createStreamUndoAction.epSet = createStreamRedoAction.epSet;
  createStreamUndoAction.acceptableCode = TSDB_CODE_MND_STREAM_NOT_EXIST;
  createStreamUndoAction.msgType = TDMT_STREAM_DROP;
  createStreamUndoAction.contLen = tSerializeSMDropStreamReq(0, 0, pCxt->pDropStreamReq);
  createStreamUndoAction.pCont = taosMemoryCalloc(1, createStreamUndoAction.contLen);
  if (!createStreamUndoAction.pCont) {
    code = terrno;
    goto _OVER;
  }
  if (createStreamUndoAction.contLen !=
      tSerializeSMDropStreamReq(createStreamUndoAction.pCont, createStreamUndoAction.contLen, pCxt->pDropStreamReq)) {
    mError("sma: %s, failed to create due to drop stream req encode failure", pCxt->pCreateSmaReq->name);
    code = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }

  // undo: drop the stream's target stable
  dropStbReq.igNotExists = true;
  tstrncpy(dropStbReq.name, pCxt->targetStbFullName, TSDB_TABLE_FNAME_LEN);
  dropStbUndoAction.epSet = createStreamRedoAction.epSet;
  dropStbUndoAction.acceptableCode = TSDB_CODE_MND_STB_NOT_EXIST;
  dropStbUndoAction.retryCode = TSDB_CODE_MND_STREAM_MUST_BE_DELETED;
  dropStbUndoAction.msgType = TDMT_MND_STB_DROP;
  dropStbUndoAction.contLen = tSerializeSMDropStbReq(0, 0, &dropStbReq);
  dropStbUndoAction.pCont = taosMemoryCalloc(1, dropStbUndoAction.contLen);
  if (!dropStbUndoAction.pCont) {
    code = terrno;
    goto _OVER;
  }
  if (dropStbUndoAction.contLen !=
      tSerializeSMDropStbReq(dropStbUndoAction.pCont, dropStbUndoAction.contLen, &dropStbReq)) {
    mError("sma: %s, failed to create due to drop stb req encode failure", pCxt->pCreateSmaReq->name);
    code = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }

  // the commit log carries a copy of the db object with tsmaVersion bumped by one
  memcpy(&newDb, pCxt->pDb, sizeof(SDbObj));
  newDb.tsmaVersion++;
  TAOS_CHECK_GOTO(mndSetCreateSmaRedoLogs(pCxt->pMnode, pTrans, pCxt->pSma), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetCreateSmaUndoLogs(pCxt->pMnode, pTrans, pCxt->pSma), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetCreateSmaCommitLogs(pCxt->pMnode, pTrans, pCxt->pSma), NULL, _OVER);
  TAOS_CHECK_GOTO(mndTransAppendRedoAction(pTrans, &createStreamRedoAction), NULL, _OVER);
  createStreamRedoAction.pCont = NULL;  // handed over to pTrans
  TAOS_CHECK_GOTO(mndTransAppendUndoAction(pTrans, &createStreamUndoAction), NULL, _OVER);
  createStreamUndoAction.pCont = NULL;  // handed over to pTrans
  TAOS_CHECK_GOTO(mndTransAppendUndoAction(pTrans, &dropStbUndoAction), NULL, _OVER);
  dropStbUndoAction.pCont = NULL;  // handed over to pTrans
  TAOS_CHECK_GOTO(mndSetUpdateDbTsmaVersionPrepareLogs(pCxt->pMnode, pTrans, pCxt->pDb, &newDb), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetUpdateDbTsmaVersionCommitLogs(pCxt->pMnode, pTrans, pCxt->pDb, &newDb), NULL, _OVER);
  TAOS_CHECK_GOTO(mndTransPrepare(pCxt->pMnode, pTrans), NULL, _OVER);

  code = TSDB_CODE_SUCCESS;

_OVER:
  // only buffers that never reached the transaction are still non-NULL here
  taosMemoryFree(createStreamRedoAction.pCont);
  taosMemoryFree(createStreamUndoAction.pCont);
  taosMemoryFree(dropStbUndoAction.pCont);
  mndTransDrop(pTrans);
  TAOS_RETURN(code);
}
1782

1783
// Drives creation of one TSMA: initialises the SMA object, parses the projection list from the
// request's `expr`, builds the create-stream and drop-stream requests, and prepares the
// transaction. The SMA object and both stream requests live on this function's stack and are
// attached to pCxt for the helpers to use.
// Returns 0 on success, otherwise the first error encountered.
static int32_t mndCreateTSMA(SCreateTSMACxt *pCxt) {
  int32_t            code = 0;
  SSmaObj            smaObj = {0};
  SCMCreateStreamReq streamReq = {0};
  SMDropStreamReq    dropReq = {0};
  SNodeList         *pProjList = NULL;

  pCxt->pSma = &smaObj;
  initSMAObj(pCxt);

  if ((code = nodesStringToList(pCxt->pCreateSmaReq->expr, &pProjList)) != TSDB_CODE_SUCCESS) goto _OVER;
  pCxt->pProjects = pProjList;
  pCxt->pCreateStreamReq = &streamReq;

  if (pCxt->pCreateSmaReq->pVgroupVerList != NULL) {
    streamReq.pVgroupVerList = taosArrayDup(pCxt->pCreateSmaReq->pVgroupVerList, NULL);
    if (streamReq.pVgroupVerList == NULL) {
      code = terrno;
      goto _OVER;
    }
  }

  // the output column array is only allocated when there is at least one projection
  if (LIST_LENGTH(pProjList) > 0) {
    streamReq.pCols = taosArrayInit(LIST_LENGTH(pProjList), sizeof(SFieldWithOptions));
    if (streamReq.pCols == NULL) {
      code = terrno;
      goto _OVER;
    }
  }

  pCxt->pDropStreamReq = &dropReq;
  if ((code = mndCreateTSMABuildCreateStreamReq(pCxt)) != TSDB_CODE_SUCCESS) goto _OVER;
  if ((code = mndCreateTSMABuildDropStreamReq(pCxt)) != TSDB_CODE_SUCCESS) goto _OVER;
  if ((code = mndCreateTSMATxnPrepare(pCxt)) != TSDB_CODE_SUCCESS) goto _OVER;

  mInfo("sma:%s, uid:%" PRIi64 " create on stb:%" PRIi64 " dstTb:%s dstVg:%d", pCxt->pCreateSmaReq->name,
        smaObj.uid, smaObj.stbUid, smaObj.dstTbName, smaObj.dstVgId);
  code = 0;

_OVER:
  tFreeSCMCreateStreamReq(pCxt->pCreateStreamReq);
  if (pCxt->pDropStreamReq) tFreeMDropStreamReq(pCxt->pDropStreamReq);
  pCxt->pCreateStreamReq = NULL;
  if (pProjList) nodesDestroyList(pProjList);
  pCxt->pProjects = NULL;
  TAOS_RETURN(code);
}
1840

1841
static int32_t mndTSMAGenerateOutputName(const char *tsmaName, char *streamName, char *targetStbName) {
×
1842
  SName   smaName;
1843
  int32_t code = tNameFromString(&smaName, tsmaName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
×
1844
  if (TSDB_CODE_SUCCESS != code) {
×
1845
    return code;
×
1846
  }
1847
  snprintf(streamName, TSDB_TABLE_FNAME_LEN, "%d.%s", smaName.acctId, smaName.tname);
×
1848
  snprintf(targetStbName, TSDB_TABLE_FNAME_LEN, "%s" TSMA_RES_STB_POSTFIX, tsmaName);
×
1849
  return TSDB_CODE_SUCCESS;
×
1850
}
1851

1852
// Handles a create-TSMA request.
// Validates the request (TSMA count limit, decode, request check, source stable, name generation,
// no existing SMA / output stable / stream with the derived names, db exists, write privilege,
// base TSMA for recursive TSMAs) and then hands over to mndCreateTSMA().
// Returns TSDB_CODE_ACTION_IN_PROGRESS when the transaction was prepared, 0 when the TSMA already
// exists and igExists is set, otherwise an error code.
//
// The reference taken on the output stable (pTargetStb) is now released on exit; it used to be
// leaked when the stable already existed.
static int32_t mndProcessCreateTSMAReq(SRpcMsg *pReq) {
#ifdef WINDOWS
  TAOS_RETURN(TSDB_CODE_MND_INVALID_PLATFORM);
#endif
  SMnode        *pMnode = pReq->info.node;
  int32_t        code = -1;
  SDbObj        *pDb = NULL;
  SStbObj       *pStb = NULL;
  SStbObj       *pTargetStb = NULL;
  SSmaObj       *pSma = NULL;
  SSmaObj       *pBaseTsma = NULL;
  SStreamObj    *pStream = NULL;
  int64_t        mTraceId = TRACE_GET_ROOTID(&pReq->info.traceId);
  SMCreateSmaReq createReq = {0};

  if (sdbGetSize(pMnode->pSdb, SDB_SMA) >= tsMaxTsmaNum) {
    code = TSDB_CODE_MND_MAX_TSMA_NUM_EXCEEDED;
    goto _OVER;
  }

  if (tDeserializeSMCreateSmaReq(pReq->pCont, pReq->contLen, &createReq) != 0) {
    code = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }

  mInfo("start to create tsma: %s", createReq.name);
  if ((code = mndCheckCreateSmaReq(&createReq)) != 0) goto _OVER;

  // a missing source stable is only tolerated for recursive TSMAs
  if (createReq.normSourceTbUid == 0) {
    pStb = mndAcquireStb(pMnode, createReq.stb);
    if (!pStb && !createReq.recursiveTsma) {
      mError("tsma:%s, failed to create since stb:%s not exist", createReq.name, createReq.stb);
      code = TSDB_CODE_MND_STB_NOT_EXIST;
      goto _OVER;
    }
  }

  char streamName[TSDB_TABLE_FNAME_LEN] = {0};
  char streamTargetStbFullName[TSDB_TABLE_FNAME_LEN] = {0};
  code = mndTSMAGenerateOutputName(createReq.name, streamName, streamTargetStbFullName);
  if (TSDB_CODE_SUCCESS != code) {
    mInfo("tsma:%s, faield to generate name", createReq.name);
    goto _OVER;
  }

  pSma = sdbAcquire(pMnode->pSdb, SDB_SMA, createReq.name);
  if (pSma && createReq.igExists) {
    mInfo("tsma:%s, already exists in sma:%s, ignore exist is set", createReq.name, pSma->name);
    code = 0;
    goto _OVER;
  }

  if (pSma) {
    code = TSDB_CODE_MND_SMA_ALREADY_EXIST;
    goto _OVER;
  }

  // the output stable must not exist yet; the acquired reference is dropped at _OVER
  pTargetStb = mndAcquireStb(pMnode, streamTargetStbFullName);
  if (pTargetStb) {
    code = TSDB_CODE_TDB_STB_ALREADY_EXIST;
    mError("tsma: %s, failed to create since output stable already exists: %s", createReq.name,
           streamTargetStbFullName);
    goto _OVER;
  }

  // anything other than "stream not exist" is treated as a name clash
  code = mndAcquireStream(pMnode, streamName, &pStream);
  if (pStream != NULL || code != TSDB_CODE_MND_STREAM_NOT_EXIST) {
    mError("tsma:%s, failed to create since stream:%s already exist", createReq.name, streamName);
    code = TSDB_CODE_MND_SMA_ALREADY_EXIST;
    goto _OVER;
  }

  SName name = {0};
  code = tNameFromString(&name, createReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
  if (TSDB_CODE_SUCCESS != code) {
    goto _OVER;
  }
  char db[TSDB_TABLE_FNAME_LEN] = {0};
  (void)tNameGetFullDbName(&name, db);

  pDb = mndAcquireDb(pMnode, db);
  if (pDb == NULL) {
    code = TSDB_CODE_MND_DB_NOT_SELECTED;
    goto _OVER;
  }

  TAOS_CHECK_GOTO(mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb), NULL, _OVER);

  if (createReq.recursiveTsma) {
    pBaseTsma = sdbAcquire(pMnode->pSdb, SDB_SMA, createReq.baseTsmaName);
    if (!pBaseTsma) {
      mError("base tsma: %s not found when creating recursive tsma", createReq.baseTsmaName);
      code = TSDB_CODE_MND_SMA_NOT_EXIST;
      goto _OVER;
    }
    if (!pStb) {
      // no source stable was acquired: fall back to the base TSMA's stbUid
      createReq.normSourceTbUid = pBaseTsma->stbUid;
    }
  }

  SCreateTSMACxt cxt = {
      .pMnode = pMnode,
      .pCreateSmaReq = &createReq,
      .pCreateStreamReq = NULL,
      .streamName = streamName,
      .targetStbFullName = streamTargetStbFullName,
      .pDb = pDb,
      .pRpcReq = pReq,
      .pSma = NULL,
      .pBaseSma = pBaseTsma,
      .pSrcStb = pStb,
  };

  code = mndCreateTSMA(&cxt);
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("tsma:%s, failed to create since %s", createReq.name, tstrerror(code));
  }

  if (pStb) mndReleaseStb(pMnode, pStb);
  if (pTargetStb) mndReleaseStb(pMnode, pTargetStb);
  if (pBaseTsma) mndReleaseSma(pMnode, pBaseTsma);
  mndReleaseSma(pMnode, pSma);
  mndReleaseStream(pMnode, pStream);
  mndReleaseDb(pMnode, pDb);
  tFreeSMCreateSmaReq(&createReq);

  TAOS_RETURN(code);
}
1981

1982
// Builds and prepares the "drop-tsma" transaction (retry policy, serial execution):
//   redo actions: drop the stream pCxt->streamName, then drop the stable pCxt->targetStbFullName
//   logs        : drop-sma redo/commit logs for pCxt->pSma, plus prepare/commit logs that bump the
//                 db's tsmaVersion by one
// Returns TSDB_CODE_SUCCESS once mndTransPrepare succeeded, otherwise the first error hit.
//
// Fixes relative to the previous version:
//  - a transaction conflict is now reported; before, `code` was still 0 at that point, so the
//    function returned success without preparing anything;
//  - pCxt->pDropStreamReq is attached before the first `goto _OVER`, so the cleanup never sees a
//    NULL request, and it is detached again before returning because it points at a stack local;
//  - serialized buffers (pCont) that never reached the transaction are freed on exit.
// NOTE(review): assumes mndTransAppendRedoAction copies the STransAction by value and that pTrans
// owns pCont after a successful append -- confirm in mndTrans.c.
static int32_t mndDropTSMA(SCreateTSMACxt *pCxt) {
  int32_t         code = -1;
  STransAction    dropStreamRedoAction = {0};
  STransAction    dropStbRedoAction = {0};
  SMDropStreamReq dropStreamReq = {0};
  SMDropStbReq    dropStbReq = {0};
  SDbObj          newDb = {0};
  STrans         *pTrans = NULL;

  pCxt->pDropStreamReq = &dropStreamReq;

  pTrans = mndTransCreate(pCxt->pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_TSMA, pCxt->pRpcReq, "drop-tsma");
  if (!pTrans) {
    code = terrno;
    goto _OVER;
  }
  code = mndCreateTSMABuildDropStreamReq(pCxt);
  if (TSDB_CODE_SUCCESS != code) {
    goto _OVER;
  }
  mndTransSetDbName(pTrans, pCxt->pDb->name, NULL);
  TAOS_CHECK_GOTO(mndTransCheckConflict(pCxt->pMnode, pTrans), NULL, _OVER);
  mndTransSetSerial(pTrans);

  // redo: drop stream (first call with a NULL buffer only computes the encoded length)
  mndGetMnodeEpSet(pCxt->pMnode, &dropStreamRedoAction.epSet);
  dropStreamRedoAction.acceptableCode = TSDB_CODE_MND_STREAM_NOT_EXIST;
  dropStreamRedoAction.msgType = TDMT_STREAM_DROP;
  dropStreamRedoAction.contLen = tSerializeSMDropStreamReq(0, 0, pCxt->pDropStreamReq);
  dropStreamRedoAction.pCont = taosMemoryCalloc(1, dropStreamRedoAction.contLen);
  if (!dropStreamRedoAction.pCont) {
    code = terrno;
    goto _OVER;
  }
  if (dropStreamRedoAction.contLen !=
      tSerializeSMDropStreamReq(dropStreamRedoAction.pCont, dropStreamRedoAction.contLen, pCxt->pDropStreamReq)) {
    mError("tsma: %s, failed to drop due to drop stream req encode failure", pCxt->pDropSmaReq->name);
    code = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }

  // output stable is not dropped when dropping stream, dropping it when dropping tsma
  dropStbReq.igNotExists = false;
  tstrncpy(dropStbReq.name, pCxt->targetStbFullName, TSDB_TABLE_FNAME_LEN);
  dropStbReq.sql = "drop";
  dropStbReq.sqlLen = 5;

  mndGetMnodeEpSet(pCxt->pMnode, &dropStbRedoAction.epSet);
  dropStbRedoAction.acceptableCode = TSDB_CODE_MND_STB_NOT_EXIST;
  dropStbRedoAction.msgType = TDMT_MND_STB_DROP;
  dropStbRedoAction.contLen = tSerializeSMDropStbReq(0, 0, &dropStbReq);
  dropStbRedoAction.pCont = taosMemoryCalloc(1, dropStbRedoAction.contLen);
  if (!dropStbRedoAction.pCont) {
    code = terrno;
    goto _OVER;
  }
  if (dropStbRedoAction.contLen !=
      tSerializeSMDropStbReq(dropStbRedoAction.pCont, dropStbRedoAction.contLen, &dropStbReq)) {
    mError("tsma: %s, failedto drop due to drop stb req encode failure", pCxt->pDropSmaReq->name);
    code = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }

  // the commit log carries a copy of the db object with tsmaVersion bumped by one
  memcpy(&newDb, pCxt->pDb, sizeof(SDbObj));
  newDb.tsmaVersion++;
  TAOS_CHECK_GOTO(mndSetDropSmaRedoLogs(pCxt->pMnode, pTrans, pCxt->pSma), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetDropSmaCommitLogs(pCxt->pMnode, pTrans, pCxt->pSma), NULL, _OVER);
  TAOS_CHECK_GOTO(mndTransAppendRedoAction(pTrans, &dropStreamRedoAction), NULL, _OVER);
  dropStreamRedoAction.pCont = NULL;  // handed over to pTrans
  TAOS_CHECK_GOTO(mndTransAppendRedoAction(pTrans, &dropStbRedoAction), NULL, _OVER);
  dropStbRedoAction.pCont = NULL;  // handed over to pTrans
  TAOS_CHECK_GOTO(mndSetUpdateDbTsmaVersionPrepareLogs(pCxt->pMnode, pTrans, pCxt->pDb, &newDb), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetUpdateDbTsmaVersionCommitLogs(pCxt->pMnode, pTrans, pCxt->pDb, &newDb), NULL, _OVER);
  TAOS_CHECK_GOTO(mndTransPrepare(pCxt->pMnode, pTrans), NULL, _OVER);
  code = TSDB_CODE_SUCCESS;

_OVER:
  // only buffers that never reached the transaction are still non-NULL here
  taosMemoryFree(dropStreamRedoAction.pCont);
  taosMemoryFree(dropStbRedoAction.pCont);
  tFreeMDropStreamReq(&dropStreamReq);
  pCxt->pDropStreamReq = NULL;  // do not leave a pointer to this frame in the context
  mndTransDrop(pTrans);
  TAOS_RETURN(code);
}
2055

2056
// Scans every SDB_SMA entry and reports whether any of them names pSma as its base
// (baseSmaName equal to pSma->name, compared over at most TSDB_TABLE_FNAME_LEN bytes).
// Each fetched entry is released again; the fetch is cancelled when the scan stops early.
static bool hasRecursiveTsmasBasedOnMe(SMnode *pMnode, const SSmaObj *pSma) {
  SSdb    *pSdb = pMnode->pSdb;
  SSmaObj *pCandidate = NULL;
  void    *pCursor = sdbFetch(pSdb, SDB_SMA, NULL, (void **)&pCandidate);

  for (; pCursor != NULL; pCursor = sdbFetch(pSdb, SDB_SMA, pCursor, (void **)&pCandidate)) {
    bool basedOnMe = (strncmp(pCandidate->baseSmaName, pSma->name, TSDB_TABLE_FNAME_LEN) == 0);
    sdbRelease(pSdb, pCandidate);
    if (basedOnMe) {
      sdbCancelFetch(pSdb, pCursor);
      return true;
    }
  }
  return false;
}
2071

2072
// Handles a drop-TSMA request.
// Decodes the request, derives the stream / output stable names, checks that the TSMA and its db
// exist, that the user may write to the db and that no recursive TSMA is based on this one, then
// hands over to mndDropTSMA().
// Returns TSDB_CODE_ACTION_IN_PROGRESS when the transaction was prepared, 0 when the TSMA does not
// exist and igNotExists is set, otherwise an error code.
//
// pStb is declared and NULL-initialised up front: it used to be declared after the first
// `goto _OVER` sites, so the cleanup released an uninitialised pointer on early failures.
static int32_t mndProcessDropTSMAReq(SRpcMsg *pReq) {
  int32_t      code = -1;
  SMDropSmaReq dropReq = {0};
  SSmaObj     *pSma = NULL;
  SDbObj      *pDb = NULL;
  SStbObj     *pStb = NULL;
  SMnode      *pMnode = pReq->info.node;
  if (tDeserializeSMDropSmaReq(pReq->pCont, pReq->contLen, &dropReq) != TSDB_CODE_SUCCESS) {
    code = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }

  char streamName[TSDB_TABLE_FNAME_LEN] = {0};
  char streamTargetStbFullName[TSDB_TABLE_FNAME_LEN] = {0};
  code = mndTSMAGenerateOutputName(dropReq.name, streamName, streamTargetStbFullName);
  if (TSDB_CODE_SUCCESS != code) {
    goto _OVER;
  }

  // the output stable reference is only held for the duration of this request
  pStb = mndAcquireStb(pMnode, streamTargetStbFullName);

  pSma = mndAcquireSma(pMnode, dropReq.name);
  if (!pSma && dropReq.igNotExists) {
    code = 0;
    goto _OVER;
  }
  if (!pSma) {
    code = TSDB_CODE_MND_SMA_NOT_EXIST;
    goto _OVER;
  }
  SName name = {0};
  code = tNameFromString(&name, dropReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
  if (TSDB_CODE_SUCCESS != code) {
    goto _OVER;
  }
  char db[TSDB_TABLE_FNAME_LEN] = {0};
  (void)tNameGetFullDbName(&name, db);

  pDb = mndAcquireDb(pMnode, db);
  if (!pDb) {
    code = TSDB_CODE_MND_DB_NOT_EXIST;
    goto _OVER;
  }

  if ((code = mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb)) != 0) {
    goto _OVER;
  }

  // a TSMA that serves as base of another TSMA must not be dropped
  if (hasRecursiveTsmasBasedOnMe(pMnode, pSma)) {
    code = TSDB_CODE_MND_INVALID_DROP_TSMA;
    goto _OVER;
  }

  SCreateTSMACxt cxt = {
      .pDb = pDb,
      .pMnode = pMnode,
      .pRpcReq = pReq,
      .pSma = pSma,
      .streamName = streamName,
      .targetStbFullName = streamTargetStbFullName,
      .pDropSmaReq = &dropReq,
  };

  code = mndDropTSMA(&cxt);

  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
_OVER:

  if (pStb) mndReleaseStb(pMnode, pStb);
  mndReleaseSma(pMnode, pSma);
  mndReleaseDb(pMnode, pDb);
  TAOS_RETURN(code);
}
2144

2145
// Fills pBlock with up to `rows` TSMA rows for a show/retrieve request.
// Columns are written in this order: tsma name, db, source table, db (again), target table,
// stream name (same value as the tsma name), created time, interval, create sql, function list.
// When pShow->db is set only TSMAs whose dbUid matches that db are listed; TSMAs whose db cannot
// be acquired are skipped.
// The scan position is kept in pShow->pIter (an SSmaAndTagIter) between calls and freed once the
// scan is finished or has failed.
// Returns the number of rows written, terrno if the iterator cannot be allocated, or the error
// code of the failing step.
//
// On failure the error code is no longer added to pShow->numOfRows; only real row counts are
// accumulated there.
static int32_t mndRetrieveTSMA(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SDbObj          *pDb = NULL;
  int32_t          numOfRows = 0;
  SSmaObj         *pSma = NULL;
  SMnode          *pMnode = pReq->info.node;
  int32_t          code = 0;
  bool             failed = false;
  SColumnInfoData *pColInfo;
  if (pShow->pIter == NULL) {
    pShow->pIter = taosMemoryCalloc(1, sizeof(SSmaAndTagIter));
  }
  if (!pShow->pIter) {
    return terrno;
  }
  if (pShow->db[0]) {
    pDb = mndAcquireDb(pMnode, pShow->db);
  }
  SSmaAndTagIter *pIter = pShow->pIter;
  while (numOfRows < rows) {
    pIter->pSmaIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter->pSmaIter, (void **)&pSma);
    if (pIter->pSmaIter == NULL) break;
    SDbObj *pSrcDb = mndAcquireDb(pMnode, pSma->db);

    // skip entries of other dbs and entries whose db is gone
    if ((pDb && pSma->dbUid != pDb->uid) || !pSrcDb) {
      sdbRelease(pMnode->pSdb, pSma);
      if (pSrcDb) mndReleaseDb(pMnode, pSrcDb);
      continue;
    }

    int32_t cols = 0;
    SName   n = {0};

    // tsma name
    code = tNameFromString(&n, pSma->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
    char smaName[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
    if (TSDB_CODE_SUCCESS == code) {
      STR_TO_VARSTR(smaName, (char *)tNameGetTableName(&n));
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)smaName, false);
    }

    // db
    char db[TSDB_DB_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
    if (TSDB_CODE_SUCCESS == code) {
      STR_TO_VARSTR(db, (char *)mndGetDbStr(pSma->db));
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)db, false);
    }

    // source table
    if (TSDB_CODE_SUCCESS == code) {
      code = tNameFromString(&n, pSma->stb, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
    }
    char srcTb[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
    if (TSDB_CODE_SUCCESS == code) {
      STR_TO_VARSTR(srcTb, (char *)tNameGetTableName(&n));
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)srcTb, false);
    }

    // the same db value is written a second time
    if (TSDB_CODE_SUCCESS == code) {
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)db, false);
    }

    // target table
    if (TSDB_CODE_SUCCESS == code) {
      code = tNameFromString(&n, pSma->dstTbName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
    }

    if (TSDB_CODE_SUCCESS == code) {
      char targetTb[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
      STR_TO_VARSTR(targetTb, (char *)tNameGetTableName(&n));
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)targetTb, false);
    }

    if (TSDB_CODE_SUCCESS == code) {
      // stream name
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)smaName, false);
    }

    // created time
    if (TSDB_CODE_SUCCESS == code) {
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)(&pSma->createdTime), false);
    }

    // interval: non-calendar units are printed with the db's precision unit
    char    interval[64 + VARSTR_HEADER_SIZE] = {0};
    int32_t len = 0;
    if (TSDB_CODE_SUCCESS == code) {
      if (!IS_CALENDAR_TIME_DURATION(pSma->intervalUnit)) {
        len = tsnprintf(interval + VARSTR_HEADER_SIZE, 64, "%" PRId64 "%c", pSma->interval,
                        getPrecisionUnit(pSrcDb->cfg.precision));
      } else {
        len = tsnprintf(interval + VARSTR_HEADER_SIZE, 64, "%" PRId64 "%c", pSma->interval, pSma->intervalUnit);
      }
      varDataSetLen(interval, len);
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, interval, false);
    }

    char buf[TSDB_MAX_SAVED_SQL_LEN + VARSTR_HEADER_SIZE] = {0};
    if (TSDB_CODE_SUCCESS == code) {
      // create sql
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      len = tsnprintf(buf + VARSTR_HEADER_SIZE, TSDB_MAX_SAVED_SQL_LEN, "%s", pSma->sql);
      varDataSetLen(buf, TMIN(len, TSDB_MAX_SAVED_SQL_LEN));
      code = colDataSetVal(pColInfo, numOfRows, buf, false);
    }

    // func list: comma separated aliases of the TSMA-supported functions in the projection list,
    // built in the same buffer that held the create sql
    len = 0;
    SNode *pNode = NULL, *pFunc = NULL;
    if (TSDB_CODE_SUCCESS == code) {
      code = nodesStringToNode(pSma->ast, &pNode);
    }
    if (TSDB_CODE_SUCCESS == code) {
      char *start = buf + VARSTR_HEADER_SIZE;
      FOREACH(pFunc, ((SSelectStmt *)pNode)->pProjectionList) {
        if (nodeType(pFunc) == QUERY_NODE_FUNCTION) {
          SFunctionNode *pFuncNode = (SFunctionNode *)pFunc;
          if (!fmIsTSMASupportedFunc(pFuncNode->funcId)) continue;
          len += tsnprintf(start, TSDB_MAX_SAVED_SQL_LEN - len, "%s%s", start != buf + VARSTR_HEADER_SIZE ? "," : "",
                           ((SExprNode *)pFunc)->userAlias);
          if (len >= TSDB_MAX_SAVED_SQL_LEN) {
            len = TSDB_MAX_SAVED_SQL_LEN;
            break;
          }
          start = buf + VARSTR_HEADER_SIZE + len;
        }
      }
      nodesDestroyNode(pNode);
    }

    if (TSDB_CODE_SUCCESS == code) {
      varDataSetLen(buf, len);
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, buf, false);
    }

    mndReleaseSma(pMnode, pSma);
    mndReleaseDb(pMnode, pSrcDb);
    if (TSDB_CODE_SUCCESS != code) {
      sdbCancelFetch(pMnode->pSdb, pIter->pSmaIter);
      failed = true;
      break;
    }
    numOfRows++;
  }
  mndReleaseDb(pMnode, pDb);

  if (failed) {
    // the fetch was cancelled above, so the saved position is dropped as well
    taosMemoryFree(pShow->pIter);
    pShow->pIter = NULL;
    return code;
  }

  pShow->numOfRows += numOfRows;
  if (numOfRows < rows) {
    // scan exhausted
    taosMemoryFree(pShow->pIter);
    pShow->pIter = NULL;
  }
  return numOfRows;
}
2299

2300
// Cancels an in-progress TSMA retrieve: cancels the SDB_SMA fetch held by the iterator (when an
// iterator is given) and frees the iterator itself.
static void mndCancelRetrieveTSMA(SMnode *pMnode, void *pIter) {
  SSmaAndTagIter *pCursor = pIter;
  if (pCursor) sdbCancelFetchByType(pMnode->pSdb, pCursor->pSmaIter, SDB_SMA);
  taosMemoryFree(pCursor);
}
×
2308

2309
// Populates pInfo from an SMA object and its destination stable.
//   pSma      : the TSMA being described (interval, unit, uid, version, names, ast)
//   pDestStb  : destination stable; supplies destTbUid, the tag schemas and the used columns
//   pInfo     : output; pFuncs, ast, pTags and pUsedCols are allocated here
//   pBaseTsma : optional; when non-NULL its ast (instead of pSma's) is parsed for the function list
// Returns 0 on success or an error code. On failure pInfo may be partially filled and is left for
// the caller to release.
//
// pInfo->pFuncs is reset to NULL whenever it is destroyed here, so the caller's cleanup of pInfo
// cannot free it a second time.
int32_t dumpTSMAInfoFromSmaObj(const SSmaObj *pSma, const SStbObj *pDestStb, STableTSMAInfo *pInfo,
                               const SSmaObj *pBaseTsma) {
  int32_t code = 0;
  pInfo->interval = pSma->interval;
  pInfo->unit = pSma->intervalUnit;
  pInfo->tsmaId = pSma->uid;
  pInfo->version = pSma->version;
  pInfo->destTbUid = pDestStb->uid;
  SName sName = {0};
  code = tNameFromString(&sName, pSma->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }
  tstrncpy(pInfo->name, sName.tname, TSDB_TABLE_NAME_LEN);
  tstrncpy(pInfo->targetDbFName, pSma->db, TSDB_DB_FNAME_LEN);
  code = tNameFromString(&sName, pSma->dstTbName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }
  tstrncpy(pInfo->targetTb, sName.tname, TSDB_TABLE_NAME_LEN);
  tstrncpy(pInfo->dbFName, pSma->db, TSDB_DB_FNAME_LEN);
  code = tNameFromString(&sName, pSma->stb, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }
  tstrncpy(pInfo->tb, sName.tname, TSDB_TABLE_NAME_LEN);
  pInfo->pFuncs = taosArrayInit(8, sizeof(STableTSMAFuncInfo));
  if (!pInfo->pFuncs) return TSDB_CODE_OUT_OF_MEMORY;

  SNode *pNode = NULL, *pFunc = NULL;
  if (TSDB_CODE_SUCCESS != nodesStringToNode(pBaseTsma ? pBaseTsma->ast : pSma->ast, &pNode)) {
    taosArrayDestroy(pInfo->pFuncs);
    pInfo->pFuncs = NULL;
    return TSDB_CODE_TSMA_INVALID_STAT;
  }
  if (pNode) {
    SSelectStmt *pSelect = (SSelectStmt *)pNode;
    FOREACH(pFunc, pSelect->pProjectionList) {
      STableTSMAFuncInfo funcInfo = {0};
      // NOTE(review): every projection is treated as a function node whose first parameter is a
      // column -- confirm that TSMA asts cannot contain anything else
      SFunctionNode     *pFuncNode = (SFunctionNode *)pFunc;
      if (!fmIsTSMASupportedFunc(pFuncNode->funcId)) continue;
      funcInfo.funcId = pFuncNode->funcId;
      funcInfo.colId = ((SColumnNode *)pFuncNode->pParameterList->pHead->pNode)->colId;
      if (!taosArrayPush(pInfo->pFuncs, &funcInfo)) {
        code = terrno;
        taosArrayDestroy(pInfo->pFuncs);
        pInfo->pFuncs = NULL;
        nodesDestroyNode(pNode);
        return code;
      }
    }
    nodesDestroyNode(pNode);
  }
  pInfo->ast = taosStrdup(pSma->ast);
  if (!pInfo->ast) code = terrno;

  if (code == TSDB_CODE_SUCCESS && pDestStb->numOfTags > 0) {
    pInfo->pTags = taosArrayInit(pDestStb->numOfTags, sizeof(SSchema));
    if (!pInfo->pTags) {
      code = terrno;
    } else {
      for (int32_t i = 0; i < pDestStb->numOfTags; ++i) {
        if (NULL == taosArrayPush(pInfo->pTags, &pDestStb->pTags[i])) {
          code = terrno;
          break;
        }
      }
    }
  }
  if (code == TSDB_CODE_SUCCESS) {
    pInfo->pUsedCols = taosArrayInit(pDestStb->numOfColumns - 3, sizeof(SSchema));
    if (!pInfo->pUsedCols)
      code = terrno;
    else {
      // skip _wstart, _wend, _duration
      for (int32_t i = 1; i < pDestStb->numOfColumns - 2; ++i) {
        if (NULL == taosArrayPush(pInfo->pUsedCols, &pDestStb->pColumns[i])) {
          code = terrno;
          break;
        }
      }
    }
  }
  TAOS_RETURN(code);
}
2394

2395
// @note remember to mndReleaseSma(*ppOut)
2396
static int32_t mndGetDeepestBaseForTsma(SMnode *pMnode, SSmaObj *pSma, SSmaObj **ppOut) {
×
2397
  int32_t  code = 0;
×
2398
  SSmaObj *pRecursiveTsma = NULL;
×
2399
  if (pSma->baseSmaName[0]) {
×
2400
    pRecursiveTsma = mndAcquireSma(pMnode, pSma->baseSmaName);
×
2401
    if (!pRecursiveTsma) {
×
2402
      mError("base tsma: %s for tsma: %s not found", pSma->baseSmaName, pSma->name);
×
2403
      return TSDB_CODE_MND_SMA_NOT_EXIST;
×
2404
    }
2405
    while (pRecursiveTsma->baseSmaName[0]) {
×
2406
      SSmaObj *pTmpSma = pRecursiveTsma;
×
2407
      pRecursiveTsma = mndAcquireSma(pMnode, pTmpSma->baseSmaName);
×
2408
      if (!pRecursiveTsma) {
×
2409
        mError("base tsma: %s for tsma: %s not found", pTmpSma->baseSmaName, pTmpSma->name);
×
2410
        mndReleaseSma(pMnode, pTmpSma);
×
2411
        return TSDB_CODE_MND_SMA_NOT_EXIST;
×
2412
      }
2413
      mndReleaseSma(pMnode, pTmpSma);
×
2414
    }
2415
  }
2416
  *ppOut = pRecursiveTsma;
×
2417
  return code;
×
2418
}
2419

2420
/**
 * Look up a single TSMA by its full name and append its description to rsp->pTsmas.
 *
 * @param pMnode     mnode owning the sdb
 * @param tsmaFName  full name of the TSMA, used as the SDB_SMA key
 * @param rsp        response whose pTsmas array receives one STableTSMAInfo pointer
 * @param exist      set to true only after an entry was successfully appended
 * @return -1 when the TSMA is not in the sdb, TSDB_CODE_SUCCESS (with *exist untouched)
 *         when its destination stable cannot be acquired, otherwise the result of
 *         building and appending the entry.
 */
static int32_t mndGetTSMA(SMnode *pMnode, char *tsmaFName, STableTSMAInfoRsp *rsp, bool *exist) {
  int32_t  code = -1;
  SSmaObj *pSma = NULL;
  SSmaObj *pBaseTsma = NULL;
  SStbObj *pDstStb = NULL;

  pSma = sdbAcquire(pMnode->pSdb, SDB_SMA, tsmaFName);
  if (!pSma) {
    // NOTE(review): a missing tsma is reported as a bare -1 rather than
    // TSDB_CODE_MND_SMA_NOT_EXIST; kept as-is because callers may rely on it -- confirm.
    TAOS_RETURN(code);
  }

  pDstStb = mndAcquireStb(pMnode, pSma->dstTbName);
  if (!pDstStb) {
    // Destination stable is gone: report "nothing found" without an error.
    sdbRelease(pMnode->pSdb, pSma);
    return TSDB_CODE_SUCCESS;
  }

  STableTSMAInfo *pTsma = taosMemoryCalloc(1, sizeof(STableTSMAInfo));
  if (!pTsma) {
    code = terrno;
    sdbRelease(pMnode->pSdb, pSma);
    mndReleaseStb(pMnode, pDstStb);
    TAOS_RETURN(code);
  }

  code = mndGetDeepestBaseForTsma(pMnode, pSma, &pBaseTsma);
  if (code == 0) {
    code = dumpTSMAInfoFromSmaObj(pSma, pDstStb, pTsma, pBaseTsma);
  }
  mndReleaseStb(pMnode, pDstStb);
  sdbRelease(pMnode->pSdb, pSma);
  if (pBaseTsma) mndReleaseSma(pMnode, pBaseTsma);

  // Decide on the local result, not on the thread-global terrno: terrno may be
  // stale from unrelated earlier calls, or already cleared although code is set.
  if (code) {
    tFreeAndClearTableTSMAInfo(pTsma);
    TAOS_RETURN(code);
  }

  if (NULL == taosArrayPush(rsp->pTsmas, &pTsma)) {
    code = terrno;
    tFreeAndClearTableTSMAInfo(pTsma);
    TAOS_RETURN(code);
  }

  *exist = true;
  TAOS_RETURN(code);
}
2461

2462
// Predicate used by mndGetSomeTsmas: returning true means the given SMA is
// filtered OUT (skipped); returning false keeps it. `param` is the opaque
// argument supplied by the caller of mndGetSomeTsmas.
typedef bool (*tsmaFilter)(const SSmaObj *pSma, void *param);
2463

2464
/**
 * Iterate over every SMA in the sdb and append a STableTSMAInfo for each one
 * that the filter does not reject.
 *
 * @param pMnode    mnode owning the sdb
 * @param pRsp      response whose pTsmas array receives the entries
 * @param filtered  predicate; a true result skips the SMA
 * @param param     opaque argument forwarded to the predicate
 * @param exist     set to true once at least one entry was appended
 * @return TSDB_CODE_SUCCESS when the scan completed, TSDB_CODE_NEED_RETRY when it
 *         completed but at least one matching SMA was skipped because its
 *         destination stable or its stream could not be acquired, otherwise the
 *         first error encountered. Every early return cancels the sdb iteration.
 */
static int32_t mndGetSomeTsmas(SMnode *pMnode, STableTSMAInfoRsp *pRsp, tsmaFilter filtered, void *param, bool *exist) {
  int32_t     code = 0;
  SSmaObj    *pSma = NULL;
  SSmaObj    *pBaseTsma = NULL;
  SSdb       *pSdb = pMnode->pSdb;
  void       *pIter = NULL;
  SStreamObj *pStream = NULL;
  SStbObj    *pStb = NULL;
  bool        shouldRetry = false;

  while (1) {
    pIter = sdbFetch(pSdb, SDB_SMA, pIter, (void **)&pSma);
    if (pIter == NULL) break;

    if (filtered(pSma, param)) {
      sdbRelease(pSdb, pSma);
      continue;
    }

    pStb = mndAcquireStb(pMnode, pSma->dstTbName);
    if (!pStb) {
      sdbRelease(pSdb, pSma);
      shouldRetry = true;
      continue;
    }

    SName smaName;
    char  streamName[TSDB_TABLE_FNAME_LEN] = {0};
    code = tNameFromString(&smaName, pSma->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
    if (TSDB_CODE_SUCCESS != code) {
      sdbRelease(pSdb, pSma);
      mndReleaseStb(pMnode, pStb);
      sdbCancelFetch(pSdb, pIter);  // leaving the loop early: the iterator must be cancelled
      TAOS_RETURN(code);
    }
    // The stream is looked up under "<acctId>.<tsma table name>".
    snprintf(streamName, TSDB_TABLE_FNAME_LEN, "%d.%s", smaName.acctId, smaName.tname);
    pStream = NULL;

    code = mndAcquireStream(pMnode, streamName, &pStream);
    if (!pStream) {
      shouldRetry = true;
      sdbRelease(pSdb, pSma);
      mndReleaseStb(pMnode, pStb);
      continue;
    }
    if (code != 0) {
      // A stream object was handed out together with an error: give it back too.
      mndReleaseStream(pMnode, pStream);
      sdbRelease(pSdb, pSma);
      mndReleaseStb(pMnode, pStb);
      sdbCancelFetch(pSdb, pIter);
      TAOS_RETURN(code);
    }

    int64_t streamId = pStream->uid;
    mndReleaseStream(pMnode, pStream);

    STableTSMAInfo *pTsma = taosMemoryCalloc(1, sizeof(STableTSMAInfo));
    if (!pTsma) {
      code = terrno;
      mndReleaseStb(pMnode, pStb);
      sdbRelease(pSdb, pSma);
      sdbCancelFetch(pSdb, pIter);
      TAOS_RETURN(code);
    }
    pTsma->streamUid = streamId;

    // Reset per iteration so a failed lookup can never leave the pointer of a
    // previous (already released) base tsma behind.
    pBaseTsma = NULL;
    code = mndGetDeepestBaseForTsma(pMnode, pSma, &pBaseTsma);
    if (code == 0) {
      code = dumpTSMAInfoFromSmaObj(pSma, pStb, pTsma, pBaseTsma);
    }
    mndReleaseStb(pMnode, pStb);
    sdbRelease(pSdb, pSma);
    if (pBaseTsma) mndReleaseSma(pMnode, pBaseTsma);

    // Judge by the local result; the thread-global terrno may be stale.
    if (code) {
      tFreeAndClearTableTSMAInfo(pTsma);
      sdbCancelFetch(pSdb, pIter);
      TAOS_RETURN(code);
    }
    if (NULL == taosArrayPush(pRsp->pTsmas, &pTsma)) {
      code = terrno;
      tFreeAndClearTableTSMAInfo(pTsma);
      sdbCancelFetch(pSdb, pIter);
      TAOS_RETURN(code);
    }
    *exist = true;
  }

  if (shouldRetry) {
    return TSDB_CODE_NEED_RETRY;
  }
  return TSDB_CODE_SUCCESS;
}
2552

2553
static bool tsmaTbFilter(const SSmaObj *pSma, void *param) {
×
2554
  const char *tbFName = param;
×
2555
  return pSma->stb[0] != tbFName[0] || strcmp(pSma->stb, tbFName) != 0;
×
2556
}
2557

2558
// Collect the TSMAs whose `stb` field equals tbFName into pRsp by running
// mndGetSomeTsmas with the table-name filter; the result code is passed through.
static int32_t mndGetTableTSMA(SMnode *pMnode, char *tbFName, STableTSMAInfoRsp *pRsp, bool *exist) {
  void *pFilterArg = tbFName;
  return mndGetSomeTsmas(pMnode, pRsp, tsmaTbFilter, pFilterArg, exist);
}
2561

2562
static bool tsmaDbFilter(const SSmaObj *pSma, void *param) {
×
2563
  uint64_t *dbUid = param;
×
2564
  return pSma->dbUid != *dbUid;
×
2565
}
2566

2567
// Collect the TSMAs whose dbUid equals `dbUid` into pRsp by running
// mndGetSomeTsmas with the database-uid filter. dbFName is not consulted here;
// matching is done on the uid only.
int32_t mndGetDbTsmas(SMnode *pMnode, const char *dbFName, uint64_t dbUid, STableTSMAInfoRsp *pRsp, bool *exist) {
  uint64_t wantedDbUid = dbUid;
  return mndGetSomeTsmas(pMnode, pRsp, tsmaDbFilter, &wantedDbUid, exist);
}
2570

2571
/**
 * Handle a "get table TSMA" request: deserialize it, collect the matching TSMA
 * descriptions and attach the serialized response to pReq->info.
 *
 * When tsmaReq.fetchingWithTsmaName is set the name is looked up as a TSMA name,
 * otherwise as a table name; in the table case TSDB_CODE_NEED_RETRY from the
 * scan is treated as success.
 *
 * @param pReq  incoming rpc message; on success info.rsp / info.rspLen are filled
 * @return 0 on success, TSDB_CODE_MND_SMA_NOT_EXIST when nothing matched,
 *         otherwise the error of the failing step.
 */
static int32_t mndProcessGetTbTSMAReq(SRpcMsg *pReq) {
  STableTSMAInfoRsp rsp = {0};
  int32_t           code = -1;
  STableTSMAInfoReq tsmaReq = {0};
  bool              exist = false;
  SMnode           *pMnode = pReq->info.node;
  int32_t           contLen = 0;
  int32_t           len = 0;
  void             *pRsp = NULL;

  TAOS_CHECK_GOTO(tDeserializeTableTSMAInfoReq(pReq->pCont, pReq->contLen, &tsmaReq), NULL, _OVER);

  rsp.pTsmas = taosArrayInit(4, POINTER_BYTES);
  if (NULL == rsp.pTsmas) {
    code = terrno;
    goto _OVER;
  }

  if (tsmaReq.fetchingWithTsmaName) {
    code = mndGetTSMA(pMnode, tsmaReq.name, &rsp, &exist);
  } else {
    code = mndGetTableTSMA(pMnode, tsmaReq.name, &rsp, &exist);
    if (TSDB_CODE_NEED_RETRY == code) {
      code = TSDB_CODE_SUCCESS;
    }
  }
  if (code) {
    goto _OVER;
  }

  if (!exist) {
    code = TSDB_CODE_MND_SMA_NOT_EXIST;
    goto _OVER;
  }

  // First pass with a NULL buffer only computes the encoded size.
  contLen = tSerializeTableTSMAInfoRsp(NULL, 0, &rsp);
  if (contLen < 0) {
    // Never hand a negative size to the allocator.
    code = terrno;
    goto _OVER;
  }

  pRsp = rpcMallocCont(contLen);
  if (pRsp == NULL) {
    code = terrno;
    goto _OVER;
  }

  len = tSerializeTableTSMAInfoRsp(pRsp, contLen, &rsp);
  if (len < 0) {
    code = terrno;
    // The buffer was not attached to the request yet, so it must be freed here.
    rpcFreeCont(pRsp);
    pRsp = NULL;
    goto _OVER;
  }

  pReq->info.rsp = pRsp;
  pReq->info.rspLen = contLen;

  code = 0;

_OVER:
  tFreeTableTSMAInfoRsp(&rsp);
  TAOS_RETURN(code);
}
2624

2625
/**
 * Build a placeholder STableTSMAInfo that carries only the identity fields
 * copied from pTsmaVer (tsmaId, dbFName, tb, name, dbId), no functions and an
 * empty ast string.
 *
 * @param pTsmaVer  version record supplying the identity fields
 * @param ppTsma    receives the newly allocated info on success; untouched on failure
 * @return TSDB_CODE_SUCCESS, or the terrno of the failed allocation.
 */
static int32_t mkNonExistTSMAInfo(const STSMAVersion *pTsmaVer, STableTSMAInfo **ppTsma) {
  STableTSMAInfo *pInfo = taosMemoryCalloc(1, sizeof(STableTSMAInfo));
  if (!pInfo) {
    return terrno;
  }
  pInfo->pFuncs = NULL;
  pInfo->tsmaId = pTsmaVer->tsmaId;
  tstrncpy(pInfo->dbFName, pTsmaVer->dbFName, TSDB_DB_FNAME_LEN);
  tstrncpy(pInfo->tb, pTsmaVer->tbName, TSDB_TABLE_NAME_LEN);
  tstrncpy(pInfo->name, pTsmaVer->name, TSDB_TABLE_NAME_LEN);
  pInfo->dbId = pTsmaVer->dbId;
  // One zeroed byte, i.e. an empty NUL-terminated string.
  pInfo->ast = taosMemoryCalloc(1, 1);
  if (!pInfo->ast) {
    // Capture the error before the free below gets a chance to disturb terrno.
    int32_t code = terrno;
    taosMemoryFree(pInfo);
    return code;
  }
  *ppTsma = pInfo;
  return TSDB_CODE_SUCCESS;
}
2644

2645
int32_t mndValidateTSMAInfo(SMnode *pMnode, STSMAVersion *pTsmaVersions, int32_t numOfTsmas, void **ppRsp,
×
2646
                            int32_t *pRspLen) {
2647
  int32_t         code = -1;
×
2648
  STSMAHbRsp      hbRsp = {0};
×
2649
  int32_t         rspLen = 0;
×
2650
  void           *pRsp = NULL;
×
2651
  char            tsmaFName[TSDB_TABLE_FNAME_LEN] = {0};
×
2652
  STableTSMAInfo *pTsmaInfo = NULL;
×
2653

2654
  hbRsp.pTsmas = taosArrayInit(numOfTsmas, POINTER_BYTES);
×
2655
  if (!hbRsp.pTsmas) {
×
2656
    code = terrno;
×
2657
    TAOS_RETURN(code);
×
2658
  }
2659

2660
  for (int32_t i = 0; i < numOfTsmas; ++i) {
×
2661
    STSMAVersion *pTsmaVer = &pTsmaVersions[i];
×
2662
    pTsmaVer->dbId = be64toh(pTsmaVer->dbId);
×
2663
    pTsmaVer->tsmaId = be64toh(pTsmaVer->tsmaId);
×
2664
    pTsmaVer->version = ntohl(pTsmaVer->version);
×
2665

2666
    snprintf(tsmaFName, sizeof(tsmaFName), "%s.%s", pTsmaVer->dbFName, pTsmaVer->name);
×
2667
    SSmaObj *pSma = mndAcquireSma(pMnode, tsmaFName);
×
2668
    if (!pSma) {
×
2669
      code = mkNonExistTSMAInfo(pTsmaVer, &pTsmaInfo);
×
2670
      if (code) goto _OVER;
×
2671
      if (NULL == taosArrayPush(hbRsp.pTsmas, &pTsmaInfo)) {
×
2672
        code = terrno;
×
2673
        tFreeAndClearTableTSMAInfo(pTsmaInfo);
×
2674
        goto _OVER;
×
2675
      }
2676
      continue;
×
2677
    }
2678

2679
    if (pSma->uid != pTsmaVer->tsmaId) {
×
2680
      mDebug("tsma: %s.%" PRIx64 " tsmaId mismatch with current %" PRIx64, tsmaFName, pTsmaVer->tsmaId, pSma->uid);
×
2681
      code = mkNonExistTSMAInfo(pTsmaVer, &pTsmaInfo);
×
2682
      mndReleaseSma(pMnode, pSma);
×
2683
      if (code) goto _OVER;
×
2684
      if (NULL == taosArrayPush(hbRsp.pTsmas, &pTsmaInfo)) {
×
2685
        code = terrno;
×
2686
        tFreeAndClearTableTSMAInfo(pTsmaInfo);
×
2687
        goto _OVER;
×
2688
      }
2689
      continue;
×
2690
    } else if (pSma->version == pTsmaVer->version) {
×
2691
      mndReleaseSma(pMnode, pSma);
×
2692
      continue;
×
2693
    }
2694

2695
    SStbObj *pDestStb = mndAcquireStb(pMnode, pSma->dstTbName);
×
2696
    if (!pDestStb) {
×
2697
      mInfo("tsma: %s.%" PRIx64 " dest stb: %s not found, maybe dropped", tsmaFName, pTsmaVer->tsmaId, pSma->dstTbName);
×
2698
      code = mkNonExistTSMAInfo(pTsmaVer, &pTsmaInfo);
×
2699
      mndReleaseSma(pMnode, pSma);
×
2700
      if (code) goto _OVER;
×
2701
      if (NULL == taosArrayPush(hbRsp.pTsmas, &pTsmaInfo)) {
×
2702
        code = terrno;
×
2703
        tFreeAndClearTableTSMAInfo(pTsmaInfo);
×
2704
        goto _OVER;
×
2705
      }
2706
      continue;
×
2707
    }
2708

2709
    // dump smaObj into rsp
2710
    STableTSMAInfo *pInfo = NULL;
×
2711
    pInfo = taosMemoryCalloc(1, sizeof(STableTSMAInfo));
×
2712
    if (!pInfo) {
×
2713
      code = terrno;
×
2714
      mndReleaseSma(pMnode, pSma);
×
2715
      mndReleaseStb(pMnode, pDestStb);
×
2716
      goto _OVER;
×
2717
    }
2718

2719
    SSmaObj *pBaseSma = NULL;
×
2720
    code = mndGetDeepestBaseForTsma(pMnode, pSma, &pBaseSma);
×
2721
    if (code == 0) code = dumpTSMAInfoFromSmaObj(pSma, pDestStb, pInfo, pBaseSma);
×
2722

2723
    mndReleaseStb(pMnode, pDestStb);
×
2724
    mndReleaseSma(pMnode, pSma);
×
2725
    if (pBaseSma) mndReleaseSma(pMnode, pBaseSma);
×
2726
    if (terrno) {
×
2727
      tFreeAndClearTableTSMAInfo(pInfo);
×
2728
      goto _OVER;
×
2729
    }
2730

2731
    if (NULL == taosArrayPush(hbRsp.pTsmas, pInfo)) {
×
2732
      code = terrno;
×
2733
      tFreeAndClearTableTSMAInfo(pInfo);
×
2734
      goto _OVER;
×
2735
    }
2736
  }
2737

2738
  rspLen = tSerializeTSMAHbRsp(NULL, 0, &hbRsp);
×
2739
  if (rspLen < 0) {
×
2740
    code = terrno;
×
2741
    goto _OVER;
×
2742
  }
2743

2744
  pRsp = taosMemoryMalloc(rspLen);
×
2745
  if (!pRsp) {
×
2746
    code = terrno;
×
2747
    rspLen = 0;
×
2748
    goto _OVER;
×
2749
  }
2750

2751
  rspLen = tSerializeTSMAHbRsp(pRsp, rspLen, &hbRsp);
×
2752
  if (rspLen < 0) {
×
2753
    code = terrno;
×
2754
    goto _OVER;
×
2755
  }
2756
  code = 0;
×
2757
_OVER:
×
2758
  tFreeTSMAHbRsp(&hbRsp);
×
2759
  *ppRsp = pRsp;
×
2760
  *pRspLen = rspLen;
×
2761
  TAOS_RETURN(code);
×
2762
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc