• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3599

08 Feb 2025 11:23AM UTC coverage: 1.77% (-61.6%) from 63.396%
#3599

push

travis-ci

web-flow
Merge pull request #29712 from taosdata/fix/TD-33652-3.0

fix: reduce write rows from 30w to 3w

3776 of 278949 branches covered (1.35%)

Branch coverage included in aggregate %.

6012 of 274147 relevant lines covered (2.19%)

1642.73 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/source/dnode/mnode/impl/src/mndSma.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mndSma.h"
18
#include "functionMgt.h"
19
#include "mndDb.h"
20
#include "mndDnode.h"
21
#include "mndIndex.h"
22
#include "mndIndexComm.h"
23
#include "mndInfoSchema.h"
24
#include "mndMnode.h"
25
#include "mndPrivilege.h"
26
#include "mndScheduler.h"
27
#include "mndShow.h"
28
#include "mndStb.h"
29
#include "mndStream.h"
30
#include "mndTrans.h"
31
#include "mndUser.h"
32
#include "mndVgroup.h"
33
#include "parser.h"
34
#include "tname.h"
35

36
#define TSDB_SMA_VER_NUMBER   1
37
#define TSDB_SMA_RESERVE_SIZE 64
38

39
static SSdbRaw *mndSmaActionEncode(SSmaObj *pSma);
40
static SSdbRow *mndSmaActionDecode(SSdbRaw *pRaw);
41
static int32_t  mndSmaActionInsert(SSdb *pSdb, SSmaObj *pSma);
42
static int32_t  mndSmaActionDelete(SSdb *pSdb, SSmaObj *pSpSmatb);
43
static int32_t  mndSmaActionUpdate(SSdb *pSdb, SSmaObj *pOld, SSmaObj *pNew);
44
static int32_t  mndProcessCreateSmaReq(SRpcMsg *pReq);
45
static int32_t  mndProcessDropSmaReq(SRpcMsg *pReq);
46
static int32_t  mndProcessGetSmaReq(SRpcMsg *pReq);
47
static int32_t  mndProcessGetTbSmaReq(SRpcMsg *pReq);
48
static int32_t  mndRetrieveSma(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
49
static void     mndDestroySmaObj(SSmaObj *pSmaObj);
50

51
static int32_t mndProcessCreateTSMAReq(SRpcMsg *pReq);
52
static int32_t mndProcessDropTSMAReq(SRpcMsg *pReq);
53

54
// sma and tag index comm func
55
static int32_t mndProcessDropIdxReq(SRpcMsg *pReq);
56
static int32_t mndRetrieveIdx(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
57
static void    mndCancelRetrieveIdx(SMnode *pMnode, void *pIter);
58

59
static int32_t mndRetrieveTSMA(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
60
static void    mndCancelRetrieveTSMA(SMnode *pMnode, void *pIter);
61
static int32_t mndProcessGetTbTSMAReq(SRpcMsg *pReq);
62

63
typedef struct SCreateTSMACxt {
64
  SMnode        *pMnode;
65
  const SRpcMsg *pRpcReq;
66
  union {
67
    const SMCreateSmaReq *pCreateSmaReq;
68
    const SMDropSmaReq   *pDropSmaReq;
69
  };
70
  SDbObj             *pDb;
71
  SStbObj            *pSrcStb;
72
  SSmaObj            *pSma;
73
  const SSmaObj      *pBaseSma;
74
  SCMCreateStreamReq *pCreateStreamReq;
75
  SMDropStreamReq    *pDropStreamReq;
76
  const char         *streamName;
77
  const char         *targetStbFullName;
78
  SNodeList          *pProjects;
79
} SCreateTSMACxt;
80

81
int32_t mndInitSma(SMnode *pMnode) {
×
82
  SSdbTable table = {
×
83
      .sdbType = SDB_SMA,
84
      .keyType = SDB_KEY_BINARY,
85
      .encodeFp = (SdbEncodeFp)mndSmaActionEncode,
86
      .decodeFp = (SdbDecodeFp)mndSmaActionDecode,
87
      .insertFp = (SdbInsertFp)mndSmaActionInsert,
88
      .updateFp = (SdbUpdateFp)mndSmaActionUpdate,
89
      .deleteFp = (SdbDeleteFp)mndSmaActionDelete,
90
  };
91

92
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_SMA, mndProcessCreateSmaReq);
×
93
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_SMA, mndProcessDropIdxReq);
×
94
  mndSetMsgHandle(pMnode, TDMT_VND_CREATE_SMA_RSP, mndTransProcessRsp);
×
95
  mndSetMsgHandle(pMnode, TDMT_VND_DROP_SMA_RSP, mndTransProcessRsp);
×
96
  mndSetMsgHandle(pMnode, TDMT_MND_GET_INDEX, mndProcessGetSmaReq);
×
97
  mndSetMsgHandle(pMnode, TDMT_MND_GET_TABLE_INDEX, mndProcessGetTbSmaReq);
×
98

99
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_INDEX, mndRetrieveIdx);
×
100
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_INDEX, mndCancelRetrieveIdx);
×
101

102
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_TSMA, mndProcessCreateTSMAReq);
×
103
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_TSMA, mndProcessDropTSMAReq);
×
104
  mndSetMsgHandle(pMnode, TDMT_MND_GET_TABLE_TSMA, mndProcessGetTbTSMAReq);
×
105
  mndSetMsgHandle(pMnode, TDMT_MND_GET_TSMA, mndProcessGetTbTSMAReq);
×
106
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TSMAS, mndRetrieveTSMA);
×
107
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TSMAS, mndCancelRetrieveTSMA);
×
108

109
  return sdbSetTable(pMnode->pSdb, table);
×
110
}
111

112
void mndCleanupSma(SMnode *pMnode) {}
×
113

114
static SSdbRaw *mndSmaActionEncode(SSmaObj *pSma) {
×
115
  int32_t code = 0;
×
116
  int32_t lino = 0;
×
117
  terrno = TSDB_CODE_OUT_OF_MEMORY;
×
118

119
  int32_t size =
×
120
      sizeof(SSmaObj) + pSma->exprLen + pSma->tagsFilterLen + pSma->sqlLen + pSma->astLen + TSDB_SMA_RESERVE_SIZE;
×
121
  SSdbRaw *pRaw = sdbAllocRaw(SDB_SMA, TSDB_SMA_VER_NUMBER, size);
×
122
  if (pRaw == NULL) goto _OVER;
×
123

124
  int32_t dataPos = 0;
×
125
  SDB_SET_BINARY(pRaw, dataPos, pSma->name, TSDB_TABLE_FNAME_LEN, _OVER)
×
126
  SDB_SET_BINARY(pRaw, dataPos, pSma->stb, TSDB_TABLE_FNAME_LEN, _OVER)
×
127
  SDB_SET_BINARY(pRaw, dataPos, pSma->db, TSDB_DB_FNAME_LEN, _OVER)
×
128
  SDB_SET_BINARY(pRaw, dataPos, pSma->dstTbName, TSDB_TABLE_FNAME_LEN, _OVER)
×
129
  SDB_SET_INT64(pRaw, dataPos, pSma->createdTime, _OVER)
×
130
  SDB_SET_INT64(pRaw, dataPos, pSma->uid, _OVER)
×
131
  SDB_SET_INT64(pRaw, dataPos, pSma->stbUid, _OVER)
×
132
  SDB_SET_INT64(pRaw, dataPos, pSma->dbUid, _OVER)
×
133
  SDB_SET_INT64(pRaw, dataPos, pSma->dstTbUid, _OVER)
×
134
  SDB_SET_INT8(pRaw, dataPos, pSma->intervalUnit, _OVER)
×
135
  SDB_SET_INT8(pRaw, dataPos, pSma->slidingUnit, _OVER)
×
136
  SDB_SET_INT8(pRaw, dataPos, pSma->timezone, _OVER)
×
137
  SDB_SET_INT32(pRaw, dataPos, pSma->dstVgId, _OVER)
×
138
  SDB_SET_INT64(pRaw, dataPos, pSma->interval, _OVER)
×
139
  SDB_SET_INT64(pRaw, dataPos, pSma->offset, _OVER)
×
140
  SDB_SET_INT64(pRaw, dataPos, pSma->sliding, _OVER)
×
141
  SDB_SET_INT32(pRaw, dataPos, pSma->exprLen, _OVER)
×
142
  SDB_SET_INT32(pRaw, dataPos, pSma->tagsFilterLen, _OVER)
×
143
  SDB_SET_INT32(pRaw, dataPos, pSma->sqlLen, _OVER)
×
144
  SDB_SET_INT32(pRaw, dataPos, pSma->astLen, _OVER)
×
145
  SDB_SET_INT32(pRaw, dataPos, pSma->version, _OVER)
×
146

147
  if (pSma->exprLen > 0) {
×
148
    SDB_SET_BINARY(pRaw, dataPos, pSma->expr, pSma->exprLen, _OVER)
×
149
  }
150
  if (pSma->tagsFilterLen > 0) {
×
151
    SDB_SET_BINARY(pRaw, dataPos, pSma->tagsFilter, pSma->tagsFilterLen, _OVER)
×
152
  }
153
  if (pSma->sqlLen > 0) {
×
154
    SDB_SET_BINARY(pRaw, dataPos, pSma->sql, pSma->sqlLen, _OVER)
×
155
  }
156
  if (pSma->astLen > 0) {
×
157
    SDB_SET_BINARY(pRaw, dataPos, pSma->ast, pSma->astLen, _OVER)
×
158
  }
159
  SDB_SET_BINARY(pRaw, dataPos, pSma->baseSmaName, TSDB_TABLE_FNAME_LEN, _OVER)
×
160

161
  SDB_SET_RESERVE(pRaw, dataPos, TSDB_SMA_RESERVE_SIZE, _OVER)
×
162
  SDB_SET_DATALEN(pRaw, dataPos, _OVER)
×
163

164
  terrno = 0;
×
165

166
_OVER:
×
167
  if (terrno != 0) {
×
168
    mError("sma:%s, failed to encode to raw:%p since %s", pSma->name, pRaw, terrstr());
×
169
    sdbFreeRaw(pRaw);
×
170
    return NULL;
×
171
  }
172

173
  mTrace("sma:%s, encode to raw:%p, row:%p", pSma->name, pRaw, pSma);
×
174
  return pRaw;
×
175
}
176

177
static SSdbRow *mndSmaActionDecode(SSdbRaw *pRaw) {
×
178
  int32_t code = 0;
×
179
  int32_t lino = 0;
×
180
  terrno = TSDB_CODE_OUT_OF_MEMORY;
×
181
  SSdbRow *pRow = NULL;
×
182
  SSmaObj *pSma = NULL;
×
183

184
  int8_t sver = 0;
×
185
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;
×
186

187
  if (sver != TSDB_SMA_VER_NUMBER) {
×
188
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
×
189
    goto _OVER;
×
190
  }
191

192
  pRow = sdbAllocRow(sizeof(SSmaObj));
×
193
  if (pRow == NULL) goto _OVER;
×
194

195
  pSma = sdbGetRowObj(pRow);
×
196
  if (pSma == NULL) goto _OVER;
×
197

198
  int32_t dataPos = 0;
×
199

200
  SDB_GET_BINARY(pRaw, dataPos, pSma->name, TSDB_TABLE_FNAME_LEN, _OVER)
×
201
  SDB_GET_BINARY(pRaw, dataPos, pSma->stb, TSDB_TABLE_FNAME_LEN, _OVER)
×
202
  SDB_GET_BINARY(pRaw, dataPos, pSma->db, TSDB_DB_FNAME_LEN, _OVER)
×
203
  SDB_GET_BINARY(pRaw, dataPos, pSma->dstTbName, TSDB_TABLE_FNAME_LEN, _OVER)
×
204
  SDB_GET_INT64(pRaw, dataPos, &pSma->createdTime, _OVER)
×
205
  SDB_GET_INT64(pRaw, dataPos, &pSma->uid, _OVER)
×
206
  SDB_GET_INT64(pRaw, dataPos, &pSma->stbUid, _OVER)
×
207
  SDB_GET_INT64(pRaw, dataPos, &pSma->dbUid, _OVER)
×
208
  SDB_GET_INT64(pRaw, dataPos, &pSma->dstTbUid, _OVER)
×
209
  SDB_GET_INT8(pRaw, dataPos, &pSma->intervalUnit, _OVER)
×
210
  SDB_GET_INT8(pRaw, dataPos, &pSma->slidingUnit, _OVER)
×
211
  SDB_GET_INT8(pRaw, dataPos, &pSma->timezone, _OVER)
×
212
  SDB_GET_INT32(pRaw, dataPos, &pSma->dstVgId, _OVER)
×
213
  SDB_GET_INT64(pRaw, dataPos, &pSma->interval, _OVER)
×
214
  SDB_GET_INT64(pRaw, dataPos, &pSma->offset, _OVER)
×
215
  SDB_GET_INT64(pRaw, dataPos, &pSma->sliding, _OVER)
×
216
  SDB_GET_INT32(pRaw, dataPos, &pSma->exprLen, _OVER)
×
217
  SDB_GET_INT32(pRaw, dataPos, &pSma->tagsFilterLen, _OVER)
×
218
  SDB_GET_INT32(pRaw, dataPos, &pSma->sqlLen, _OVER)
×
219
  SDB_GET_INT32(pRaw, dataPos, &pSma->astLen, _OVER)
×
220
  SDB_GET_INT32(pRaw, dataPos, &pSma->version, _OVER)
×
221

222
  if (pSma->exprLen > 0) {
×
223
    pSma->expr = taosMemoryCalloc(pSma->exprLen, 1);
×
224
    if (pSma->expr == NULL) goto _OVER;
×
225
    SDB_GET_BINARY(pRaw, dataPos, pSma->expr, pSma->exprLen, _OVER)
×
226
  }
227

228
  if (pSma->tagsFilterLen > 0) {
×
229
    pSma->tagsFilter = taosMemoryCalloc(pSma->tagsFilterLen, 1);
×
230
    if (pSma->tagsFilter == NULL) goto _OVER;
×
231
    SDB_GET_BINARY(pRaw, dataPos, pSma->tagsFilter, pSma->tagsFilterLen, _OVER)
×
232
  }
233

234
  if (pSma->sqlLen > 0) {
×
235
    pSma->sql = taosMemoryCalloc(pSma->sqlLen, 1);
×
236
    if (pSma->sql == NULL) goto _OVER;
×
237
    SDB_GET_BINARY(pRaw, dataPos, pSma->sql, pSma->sqlLen, _OVER)
×
238
  }
239

240
  if (pSma->astLen > 0) {
×
241
    pSma->ast = taosMemoryCalloc(pSma->astLen, 1);
×
242
    if (pSma->ast == NULL) goto _OVER;
×
243
    SDB_GET_BINARY(pRaw, dataPos, pSma->ast, pSma->astLen, _OVER)
×
244
  }
245
  SDB_GET_BINARY(pRaw, dataPos, pSma->baseSmaName, TSDB_TABLE_FNAME_LEN, _OVER)
×
246

247
  SDB_GET_RESERVE(pRaw, dataPos, TSDB_SMA_RESERVE_SIZE, _OVER)
×
248

249
  terrno = 0;
×
250

251
_OVER:
×
252
  if (terrno != 0) {
×
253
    if (pSma != NULL) {
×
254
      mError("sma:%s, failed to decode from raw:%p since %s", pSma->name, pRaw, terrstr());
×
255
      taosMemoryFreeClear(pSma->expr);
×
256
      taosMemoryFreeClear(pSma->tagsFilter);
×
257
      taosMemoryFreeClear(pSma->sql);
×
258
      taosMemoryFreeClear(pSma->ast);
×
259
    }
260
    taosMemoryFreeClear(pRow);
×
261
    return NULL;
×
262
  }
263

264
  mTrace("sma:%s, decode from raw:%p, row:%p", pSma->name, pRaw, pSma);
×
265
  return pRow;
×
266
}
267

268
static int32_t mndSmaActionInsert(SSdb *pSdb, SSmaObj *pSma) {
×
269
  mTrace("sma:%s, perform insert action, row:%p", pSma->name, pSma);
×
270
  return 0;
×
271
}
272

273
static int32_t mndSmaActionDelete(SSdb *pSdb, SSmaObj *pSma) {
×
274
  mTrace("sma:%s, perform delete action, row:%p", pSma->name, pSma);
×
275
  taosMemoryFreeClear(pSma->tagsFilter);
×
276
  taosMemoryFreeClear(pSma->expr);
×
277
  taosMemoryFreeClear(pSma->sql);
×
278
  taosMemoryFreeClear(pSma->ast);
×
279
  return 0;
×
280
}
281

282
static int32_t mndSmaActionUpdate(SSdb *pSdb, SSmaObj *pOld, SSmaObj *pNew) {
×
283
  mTrace("sma:%s, perform update action, old row:%p new row:%p", pOld->name, pOld, pNew);
×
284
  return 0;
×
285
}
286

287
SSmaObj *mndAcquireSma(SMnode *pMnode, char *smaName) {
×
288
  SSdb    *pSdb = pMnode->pSdb;
×
289
  SSmaObj *pSma = sdbAcquire(pSdb, SDB_SMA, smaName);
×
290
  if (pSma == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
×
291
    terrno = TSDB_CODE_MND_SMA_NOT_EXIST;
×
292
  }
293
  return pSma;
×
294
}
295

296
void mndReleaseSma(SMnode *pMnode, SSmaObj *pSma) {
×
297
  SSdb *pSdb = pMnode->pSdb;
×
298
  sdbRelease(pSdb, pSma);
×
299
}
×
300

301
SDbObj *mndAcquireDbBySma(SMnode *pMnode, const char *db) { return mndAcquireDb(pMnode, db); }
×
302

303
static void *mndBuildVCreateSmaReq(SMnode *pMnode, SVgObj *pVgroup, SSmaObj *pSma, int32_t *pContLen) {
×
304
  SEncoder encoder = {0};
×
305
  int32_t  contLen = 0;
×
306
  SName    name = {0};
×
307
  int32_t  code = tNameFromString(&name, pSma->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
×
308
  if (TSDB_CODE_SUCCESS != code) {
×
309
    return NULL;
×
310
  }
311

312
  SVCreateTSmaReq req = {0};
×
313
  req.version = 0;
×
314
  req.intervalUnit = pSma->intervalUnit;
×
315
  req.slidingUnit = pSma->slidingUnit;
×
316
//  req.timezoneInt = pSma->timezone;
317
  tstrncpy(req.indexName, (char *)tNameGetTableName(&name), TSDB_INDEX_NAME_LEN);
×
318
  req.exprLen = pSma->exprLen;
×
319
  req.tagsFilterLen = pSma->tagsFilterLen;
×
320
  req.indexUid = pSma->uid;
×
321
  req.tableUid = pSma->stbUid;
×
322
  req.dstVgId = pSma->dstVgId;
×
323
  req.dstTbUid = pSma->dstTbUid;
×
324
  req.interval = pSma->interval;
×
325
  req.offset = pSma->offset;
×
326
  req.sliding = pSma->sliding;
×
327
  req.expr = pSma->expr;
×
328
  req.tagsFilter = pSma->tagsFilter;
×
329
  req.schemaRow = pSma->schemaRow;
×
330
  req.schemaTag = pSma->schemaTag;
×
331
  req.dstTbName = pSma->dstTbName;
×
332

333
  // get length
334
  int32_t ret = 0;
×
335
  tEncodeSize(tEncodeSVCreateTSmaReq, &req, contLen, ret);
×
336
  if (ret < 0) {
×
337
    return NULL;
×
338
  }
339
  contLen += sizeof(SMsgHead);
×
340

341
  SMsgHead *pHead = taosMemoryMalloc(contLen);
×
342
  if (pHead == NULL) {
×
343
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
344
    return NULL;
×
345
  }
346

347
  pHead->contLen = htonl(contLen);
×
348
  pHead->vgId = htonl(pVgroup->vgId);
×
349

350
  void *pBuf = POINTER_SHIFT(pHead, sizeof(SMsgHead));
×
351
  tEncoderInit(&encoder, pBuf, contLen - sizeof(SMsgHead));
×
352
  if (tEncodeSVCreateTSmaReq(&encoder, &req) < 0) {
×
353
    taosMemoryFreeClear(pHead);
×
354
    tEncoderClear(&encoder);
×
355
    return NULL;
×
356
  }
357

358
  tEncoderClear(&encoder);
×
359

360
  *pContLen = contLen;
×
361
  return pHead;
×
362
}
363

364
static void *mndBuildVDropSmaReq(SMnode *pMnode, SVgObj *pVgroup, SSmaObj *pSma, int32_t *pContLen) {
×
365
  SEncoder encoder = {0};
×
366
  int32_t  contLen;
367
  SName    name = {0};
×
368
  int32_t  code = tNameFromString(&name, pSma->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
×
369
  if (TSDB_CODE_SUCCESS != code) {
×
370
    terrno = code;
×
371
    return NULL;
×
372
  }
373

374
  SVDropTSmaReq req = {0};
×
375
  req.indexUid = pSma->uid;
×
376
  tstrncpy(req.indexName, (char *)tNameGetTableName(&name), TSDB_INDEX_NAME_LEN);
×
377

378
  // get length
379
  int32_t ret = 0;
×
380
  tEncodeSize(tEncodeSVDropTSmaReq, &req, contLen, ret);
×
381
  if (ret < 0) {
×
382
    return NULL;
×
383
  }
384

385
  contLen += sizeof(SMsgHead);
×
386

387
  SMsgHead *pHead = taosMemoryMalloc(contLen);
×
388
  if (pHead == NULL) {
×
389
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
390
    return NULL;
×
391
  }
392

393
  pHead->contLen = htonl(contLen);
×
394
  pHead->vgId = htonl(pVgroup->vgId);
×
395

396
  void *pBuf = POINTER_SHIFT(pHead, sizeof(SMsgHead));
×
397
  tEncoderInit(&encoder, pBuf, contLen - sizeof(SMsgHead));
×
398

399
  if (tEncodeSVDropTSmaReq(&encoder, &req) < 0) {
×
400
    taosMemoryFreeClear(pHead);
×
401
    tEncoderClear(&encoder);
×
402
    return NULL;
×
403
  }
404
  tEncoderClear(&encoder);
×
405

406
  *pContLen = contLen;
×
407
  return pHead;
×
408
}
409

410
static int32_t mndSetCreateSmaRedoLogs(SMnode *pMnode, STrans *pTrans, SSmaObj *pSma) {
×
411
  int32_t  code = 0;
×
412
  SSdbRaw *pRedoRaw = mndSmaActionEncode(pSma);
×
413
  if (pRedoRaw == NULL) {
×
414
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
415
    if (terrno != 0) code = terrno;
×
416
    TAOS_RETURN(code);
×
417
  }
418
  TAOS_CHECK_RETURN(mndTransAppendRedolog(pTrans, pRedoRaw));
×
419
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING));
×
420

421
  TAOS_RETURN(code);
×
422
}
423

424
static int32_t mndSetCreateSmaUndoLogs(SMnode *pMnode, STrans *pTrans, SSmaObj *pSma) {
×
425
  int32_t  code = 0;
×
426
  SSdbRaw *pUndoRaw = mndSmaActionEncode(pSma);
×
427
  if (!pUndoRaw) {
×
428
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
429
    if (terrno != 0) code = terrno;
×
430
    TAOS_RETURN(code);
×
431
  }
432
  TAOS_CHECK_RETURN(mndTransAppendUndolog(pTrans, pUndoRaw));
×
433
  TAOS_CHECK_RETURN(sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED));
×
434
  TAOS_RETURN(code);
×
435
}
436

437
static int32_t mndSetCreateSmaCommitLogs(SMnode *pMnode, STrans *pTrans, SSmaObj *pSma) {
×
438
  int32_t  code = 0;
×
439
  SSdbRaw *pCommitRaw = mndSmaActionEncode(pSma);
×
440
  if (pCommitRaw == NULL) {
×
441
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
442
    if (terrno != 0) code = terrno;
×
443
    TAOS_RETURN(code);
×
444
  }
445
  TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pCommitRaw));
×
446
  TAOS_CHECK_RETURN(sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY));
×
447

448
  TAOS_RETURN(code);
×
449
}
450

451
static int32_t mndSetCreateSmaVgroupRedoLogs(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup) {
×
452
  int32_t  code = 0;
×
453
  SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroup);
×
454
  if (pVgRaw == NULL) {
×
455
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
456
    if (terrno != 0) code = terrno;
×
457
    TAOS_RETURN(code);
×
458
  }
459
  TAOS_CHECK_RETURN(mndTransAppendRedolog(pTrans, pVgRaw));
×
460
  TAOS_CHECK_RETURN(sdbSetRawStatus(pVgRaw, SDB_STATUS_UPDATE));
×
461
  TAOS_RETURN(code);
×
462
}
463

464
static int32_t mndSetCreateSmaVgroupCommitLogs(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup) {
×
465
  int32_t  code = 0;
×
466
  SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroup);
×
467
  if (pVgRaw == NULL) {
×
468
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
469
    if (terrno != 0) code = terrno;
×
470
    TAOS_RETURN(code);
×
471
  }
472
  TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pVgRaw));
×
473
  TAOS_CHECK_RETURN(sdbSetRawStatus(pVgRaw, SDB_STATUS_READY));
×
474
  TAOS_RETURN(code);
×
475
}
476

477
static int32_t mndSetUpdateSmaStbCommitLogs(SMnode *pMnode, STrans *pTrans, SStbObj *pStb) {
×
478
  int32_t code = 0;
×
479
  SStbObj stbObj = {0};
×
480
  taosRLockLatch(&pStb->lock);
×
481
  memcpy(&stbObj, pStb, sizeof(SStbObj));
×
482
  taosRUnLockLatch(&pStb->lock);
×
483
  stbObj.numOfColumns = 0;
×
484
  stbObj.pColumns = NULL;
×
485
  stbObj.numOfTags = 0;
×
486
  stbObj.pTags = NULL;
×
487
  stbObj.numOfFuncs = 0;
×
488
  stbObj.pFuncs = NULL;
×
489
  stbObj.updateTime = taosGetTimestampMs();
×
490
  stbObj.lock = 0;
×
491
  stbObj.smaVer++;
×
492

493
  SSdbRaw *pCommitRaw = mndStbActionEncode(&stbObj);
×
494
  if (pCommitRaw == NULL) {
×
495
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
496
    if (terrno != 0) code = terrno;
×
497
    TAOS_RETURN(code);
×
498
  }
499
  TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pCommitRaw));
×
500
  TAOS_CHECK_RETURN(sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY));
×
501

502
  TAOS_RETURN(code);
×
503
}
504

505
static int32_t mndSetCreateSmaVgroupRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
×
506
                                                SSmaObj *pSma) {
507
  int32_t    code = 0;
×
508
  SVnodeGid *pVgid = pVgroup->vnodeGid + 0;
×
509
  SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
×
510
  if (pDnode == NULL) {
×
511
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
512
    if (terrno != 0) code = terrno;
×
513
    TAOS_RETURN(code);
×
514
  }
515

516
  STransAction action = {0};
×
517
  action.epSet = mndGetDnodeEpset(pDnode);
×
518
  mndReleaseDnode(pMnode, pDnode);
×
519

520
  // todo add sma info here
521
  SNode *pAst = NULL;
×
522
  TAOS_CHECK_RETURN(nodesStringToNode(pSma->ast, &pAst));
×
523
  if ((code = qExtractResultSchema(pAst, &pSma->schemaRow.nCols, &pSma->schemaRow.pSchema)) != 0) {
×
524
    nodesDestroyNode(pAst);
×
525
    TAOS_RETURN(code);
×
526
  }
527
  nodesDestroyNode(pAst);
×
528
  pSma->schemaRow.version = 1;
×
529

530
  // TODO: the schemaTag generated by qExtractResultXXX later.
531
  pSma->schemaTag.nCols = 1;
×
532
  pSma->schemaTag.version = 1;
×
533
  pSma->schemaTag.pSchema = taosMemoryCalloc(1, sizeof(SSchema));
×
534
  if (!pSma->schemaTag.pSchema) {
×
535
    TAOS_RETURN(-1);
×
536
  }
537
  pSma->schemaTag.pSchema[0].type = TSDB_DATA_TYPE_BIGINT;
×
538
  pSma->schemaTag.pSchema[0].bytes = TYPE_BYTES[TSDB_DATA_TYPE_BIGINT];
×
539
  pSma->schemaTag.pSchema[0].colId = pSma->schemaRow.nCols + PRIMARYKEY_TIMESTAMP_COL_ID;
×
540
  pSma->schemaTag.pSchema[0].flags = 0;
×
541
  snprintf(pSma->schemaTag.pSchema[0].name, TSDB_COL_NAME_LEN, "groupId");
×
542

543
  int32_t smaContLen = 0;
×
544
  void   *pSmaReq = mndBuildVCreateSmaReq(pMnode, pVgroup, pSma, &smaContLen);
×
545
  if (pSmaReq == NULL) {
×
546
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
547
    if (terrno != 0) code = terrno;
×
548
    TAOS_RETURN(code);
×
549
  }
550
  pVgroup->pTsma = pSmaReq;
×
551

552
  int32_t contLen = 0;
×
553
  void   *pReq = mndBuildCreateVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen);
×
554
  if (pReq == NULL) {
×
555
    taosMemoryFreeClear(pSmaReq);
×
556
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
557
    if (terrno != 0) code = terrno;
×
558
    TAOS_RETURN(code);
×
559
  }
560

561
  action.mTraceId = pTrans->mTraceId;
×
562
  action.pCont = pReq;
×
563
  action.contLen = contLen;
×
564
  action.msgType = TDMT_DND_CREATE_VNODE;
×
565
  action.acceptableCode = TSDB_CODE_VND_ALREADY_EXIST;
×
566

567
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
×
568
    taosMemoryFreeClear(pSmaReq);
×
569
    taosMemoryFree(pReq);
×
570
    TAOS_RETURN(code);
×
571
  }
572

573
  action.pCont = pSmaReq;
×
574
  action.contLen = smaContLen;
×
575
  action.msgType = TDMT_VND_CREATE_SMA;
×
576
  action.acceptableCode = TSDB_CODE_TSMA_ALREADY_EXIST;
×
577

578
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
×
579
    taosMemoryFreeClear(pSmaReq);
×
580
    TAOS_RETURN(code);
×
581
  }
582

583
  TAOS_RETURN(code);
×
584
}
585

586
static void mndDestroySmaObj(SSmaObj *pSmaObj) {
×
587
  if (pSmaObj) {
×
588
    taosMemoryFreeClear(pSmaObj->schemaRow.pSchema);
×
589
    taosMemoryFreeClear(pSmaObj->schemaTag.pSchema);
×
590
  }
591
}
×
592

593
static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCreate, SDbObj *pDb, SStbObj *pStb,
×
594
                            const char *streamName) {
595
  int32_t code = 0;
×
596
  if (pDb->cfg.replications > 1) {
×
597
    code = TSDB_CODE_MND_INVALID_SMA_OPTION;
×
598
    mError("sma:%s, failed to create since not support multiple replicas", pCreate->name);
×
599
    TAOS_RETURN(code);
×
600
  }
601
  SSmaObj smaObj = {0};
×
602
  memcpy(smaObj.name, pCreate->name, TSDB_TABLE_FNAME_LEN);
×
603
  memcpy(smaObj.stb, pStb->name, TSDB_TABLE_FNAME_LEN);
×
604
  memcpy(smaObj.db, pDb->name, TSDB_DB_FNAME_LEN);
×
605
  smaObj.createdTime = taosGetTimestampMs();
×
606
  smaObj.uid = mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN);
×
607

608
  char resultTbName[TSDB_TABLE_FNAME_LEN + 16] = {0};
×
609
  snprintf(resultTbName, TSDB_TABLE_FNAME_LEN + 16, "%s_td_tsma_rst_tb", pCreate->name);
×
610
  memcpy(smaObj.dstTbName, resultTbName, TSDB_TABLE_FNAME_LEN);
×
611
  smaObj.dstTbUid = mndGenerateUid(smaObj.dstTbName, TSDB_TABLE_FNAME_LEN);
×
612
  smaObj.stbUid = pStb->uid;
×
613
  smaObj.dbUid = pStb->dbUid;
×
614
  smaObj.intervalUnit = pCreate->intervalUnit;
×
615
  smaObj.slidingUnit = pCreate->slidingUnit;
×
616
#if 0
617
//  smaObj.timezone = pCreate->timezone;
618
#endif
619
//  smaObj.timezone = taosGetLocalTimezoneOffset();  // use timezone of server
620
  smaObj.interval = pCreate->interval;
×
621
  smaObj.offset = pCreate->offset;
×
622
  smaObj.sliding = pCreate->sliding;
×
623
  smaObj.exprLen = pCreate->exprLen;
×
624
  smaObj.tagsFilterLen = pCreate->tagsFilterLen;
×
625
  smaObj.sqlLen = pCreate->sqlLen;
×
626
  smaObj.astLen = pCreate->astLen;
×
627
  if (smaObj.exprLen > 0) {
×
628
    smaObj.expr = pCreate->expr;
×
629
  }
630
  if (smaObj.tagsFilterLen > 0) {
×
631
    smaObj.tagsFilter = pCreate->tagsFilter;
×
632
  }
633
  if (smaObj.sqlLen > 0) {
×
634
    smaObj.sql = pCreate->sql;
×
635
  }
636
  if (smaObj.astLen > 0) {
×
637
    smaObj.ast = pCreate->ast;
×
638
  }
639

640
  SStreamObj streamObj = {0};
×
641
  tstrncpy(streamObj.name, streamName, TSDB_STREAM_FNAME_LEN);
×
642
  tstrncpy(streamObj.sourceDb, pDb->name, TSDB_DB_FNAME_LEN);
×
643
  tstrncpy(streamObj.targetDb, streamObj.sourceDb, TSDB_DB_FNAME_LEN);
×
644
  streamObj.createTime = taosGetTimestampMs();
×
645
  streamObj.updateTime = streamObj.createTime;
×
646
  streamObj.uid = mndGenerateUid(streamName, strlen(streamName));
×
647
  streamObj.sourceDbUid = pDb->uid;
×
648
  streamObj.targetDbUid = pDb->uid;
×
649
  streamObj.version = 1;
×
650
  streamObj.sql = taosStrdup(pCreate->sql);
×
651
  if (!streamObj.sql) {
×
652
    return terrno;
×
653
  }
654
  streamObj.smaId = smaObj.uid;
×
655
  streamObj.conf.watermark = pCreate->watermark;
×
656
  streamObj.deleteMark = pCreate->deleteMark;
×
657
  streamObj.conf.fillHistory = STREAM_FILL_HISTORY_ON;
×
658
  streamObj.conf.trigger = STREAM_TRIGGER_WINDOW_CLOSE;
×
659
  streamObj.conf.triggerParam = pCreate->maxDelay;
×
660
  streamObj.ast = taosStrdup(smaObj.ast);
×
661
  if (!streamObj.ast) {
×
662
    taosMemoryFree(streamObj.sql);
×
663
    return terrno;
×
664
  }
665
  streamObj.indexForMultiAggBalance = -1;
×
666

667
  // check the maxDelay
668
  if (streamObj.conf.triggerParam < TSDB_MIN_ROLLUP_MAX_DELAY) {
×
669
    int64_t msInterval = -1;
×
670
    int32_t code =
671
        convertTimeFromPrecisionToUnit(pCreate->interval, pDb->cfg.precision, TIME_UNIT_MILLISECOND, &msInterval);
×
672
    if (TSDB_CODE_SUCCESS != code) {
×
673
      mError("sma:%s, failed to create since convert time failed: %s", smaObj.name, tstrerror(code));
×
674
      return code;
×
675
    }
676
    streamObj.conf.triggerParam = msInterval > TSDB_MIN_ROLLUP_MAX_DELAY ? msInterval : TSDB_MIN_ROLLUP_MAX_DELAY;
×
677
  }
678
  if (streamObj.conf.triggerParam > TSDB_MAX_ROLLUP_MAX_DELAY) {
×
679
    streamObj.conf.triggerParam = TSDB_MAX_ROLLUP_MAX_DELAY;
×
680
  }
681

682
  if ((code = mndAllocSmaVgroup(pMnode, pDb, &streamObj.fixedSinkVg)) != 0) {
×
683
    mError("sma:%s, failed to create since %s", smaObj.name, tstrerror(code));
×
684
    TAOS_RETURN(code);
×
685
  }
686
  smaObj.dstVgId = streamObj.fixedSinkVg.vgId;
×
687
  streamObj.fixedSinkVgId = smaObj.dstVgId;
×
688

689
  SNode *pAst = NULL;
×
690
  if (nodesStringToNode(streamObj.ast, &pAst) < 0) {
×
691
    code = TSDB_CODE_MND_INVALID_SMA_OPTION;
×
692
    mError("sma:%s, failed to create since parse ast error", smaObj.name);
×
693
    TAOS_RETURN(code);
×
694
  }
695

696
  // extract output schema from ast
697
  if (qExtractResultSchema(pAst, (int32_t *)&streamObj.outputSchema.nCols, &streamObj.outputSchema.pSchema) != 0) {
×
698
    code = TSDB_CODE_MND_INVALID_SMA_OPTION;
×
699
    mError("sma:%s, failed to create since extract result schema error", smaObj.name);
×
700
    TAOS_RETURN(code);
×
701
  }
702

703
  SQueryPlan  *pPlan = NULL;
×
704
  SPlanContext cxt = {
×
705
      .pAstRoot = pAst,
706
      .topicQuery = false,
707
      .streamQuery = true,
708
      .triggerType = streamObj.conf.trigger,
×
709
      .watermark = streamObj.conf.watermark,
×
710
      .deleteMark = streamObj.deleteMark,
×
711
  };
712

713
  if (qCreateQueryPlan(&cxt, &pPlan, NULL) < 0) {
×
714
    code = TSDB_CODE_MND_INVALID_SMA_OPTION;
×
715
    mError("sma:%s, failed to create since create query plan error", smaObj.name);
×
716
    TAOS_RETURN(code);
×
717
  }
718

719
  // save physcial plan
720
  if (nodesNodeToString((SNode *)pPlan, false, &streamObj.physicalPlan, NULL) != 0) {
×
721
    code = TSDB_CODE_MND_INVALID_SMA_OPTION;
×
722
    mError("sma:%s, failed to create since save physcial plan error", smaObj.name);
×
723
    TAOS_RETURN(code);
×
724
  }
725

726
  if (pAst != NULL) nodesDestroyNode(pAst);
×
727
  nodesDestroyNode((SNode *)pPlan);
×
728

729
  code = -1;
×
730
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq, "create-sma");
×
731
  if (pTrans == NULL) {
×
732
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
733
    if (terrno != 0) code = terrno;
×
734
    goto _OVER;
×
735
  }
736
  mndTransSetDbName(pTrans, pDb->name, NULL);
×
737
  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER);
×
738

739
  mndTransSetSerial(pTrans);
×
740
  mInfo("trans:%d, used to create sma:%s stream:%s", pTrans->id, pCreate->name, streamObj.name);
×
741
  TAOS_CHECK_GOTO(mndAddNewVgPrepareAction(pMnode, pTrans, &streamObj.fixedSinkVg), NULL, _OVER);
×
742
  TAOS_CHECK_GOTO(mndSetCreateSmaRedoLogs(pMnode, pTrans, &smaObj), NULL, _OVER);
×
743
  TAOS_CHECK_GOTO(mndSetCreateSmaVgroupRedoLogs(pMnode, pTrans, &streamObj.fixedSinkVg), NULL, _OVER);
×
744
  TAOS_CHECK_GOTO(mndSetCreateSmaCommitLogs(pMnode, pTrans, &smaObj), NULL, _OVER);
×
745
  TAOS_CHECK_GOTO(mndSetCreateSmaVgroupCommitLogs(pMnode, pTrans, &streamObj.fixedSinkVg), NULL, _OVER);
×
746
  TAOS_CHECK_GOTO(mndSetUpdateSmaStbCommitLogs(pMnode, pTrans, pStb), NULL, _OVER);
×
747
  TAOS_CHECK_GOTO(mndSetCreateSmaVgroupRedoActions(pMnode, pTrans, pDb, &streamObj.fixedSinkVg, &smaObj), NULL, _OVER);
×
748
  TAOS_CHECK_GOTO(mndScheduleStream(pMnode, &streamObj, 1685959190000, NULL), NULL, _OVER);
×
749
  TAOS_CHECK_GOTO(mndPersistStream(pTrans, &streamObj), NULL, _OVER);
×
750
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
×
751

752
  mInfo("sma:%s, uid:%" PRIi64 " create on stb:%" PRIi64 ", dstSuid:%" PRIi64 " dstTb:%s dstVg:%d", pCreate->name,
×
753
        smaObj.uid, smaObj.stbUid, smaObj.dstTbUid, smaObj.dstTbName, smaObj.dstVgId);
754

755
  code = 0;
×
756

757
_OVER:
×
758
  tFreeStreamObj(&streamObj);
×
759
  mndDestroySmaObj(&smaObj);
×
760
  mndTransDrop(pTrans);
×
761
  TAOS_RETURN(code);
×
762
}
763

764
static int32_t mndCheckCreateSmaReq(SMCreateSmaReq *pCreate) {
×
765
  int32_t code = TSDB_CODE_MND_INVALID_SMA_OPTION;
×
766
  if (pCreate->name[0] == 0) TAOS_RETURN(code);
×
767
  if (pCreate->stb[0] == 0) TAOS_RETURN(code);
×
768
  if (pCreate->igExists < 0 || pCreate->igExists > 1) TAOS_RETURN(code);
×
769
  if (pCreate->intervalUnit < 0) TAOS_RETURN(code);
×
770
  if (pCreate->slidingUnit < 0) TAOS_RETURN(code);
×
771
  if (pCreate->timezone < 0) TAOS_RETURN(code);
×
772
  if (pCreate->interval < 0) TAOS_RETURN(code);
×
773
  if (pCreate->offset < 0) TAOS_RETURN(code);
×
774
  if (pCreate->sliding < 0) TAOS_RETURN(code);
×
775
  if (pCreate->exprLen < 0) TAOS_RETURN(code);
×
776
  if (pCreate->tagsFilterLen < 0) TAOS_RETURN(code);
×
777
  if (pCreate->sqlLen < 0) TAOS_RETURN(code);
×
778
  if (pCreate->astLen < 0) TAOS_RETURN(code);
×
779
  if (pCreate->exprLen != 0 && strlen(pCreate->expr) + 1 != pCreate->exprLen) TAOS_RETURN(code);
×
780
  if (pCreate->tagsFilterLen != 0 && strlen(pCreate->tagsFilter) + 1 != pCreate->tagsFilterLen) TAOS_RETURN(code);
×
781
  if (pCreate->sqlLen != 0 && strlen(pCreate->sql) + 1 != pCreate->sqlLen) TAOS_RETURN(code);
×
782
  if (pCreate->astLen != 0 && strlen(pCreate->ast) + 1 != pCreate->astLen) TAOS_RETURN(code);
×
783

784
  SName smaName = {0};
×
785
  if (tNameFromString(&smaName, pCreate->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE) < 0) return -1;
×
786
  if (*(char *)tNameGetTableName(&smaName) == 0) return -1;
×
787

788
  code = 0;
×
789
  TAOS_RETURN(code);
×
790
}
791

792
static int32_t mndGetStreamNameFromSmaName(char *streamName, char *smaName) {
×
793
  SName   n;
794
  int32_t code = tNameFromString(&n, smaName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
×
795
  if (TSDB_CODE_SUCCESS != code) {
×
796
    return code;
×
797
  }
798
  snprintf(streamName, TSDB_TABLE_FNAME_LEN,"%d.%s", n.acctId, n.tname);
×
799
  return TSDB_CODE_SUCCESS;
×
800
}
801

802
static int32_t mndProcessCreateSmaReq(SRpcMsg *pReq) {
×
803
  SMnode        *pMnode = pReq->info.node;
×
804
  int32_t        code = -1;
×
805
  SStbObj       *pStb = NULL;
×
806
  SSmaObj       *pSma = NULL;
×
807
  SStreamObj    *pStream = NULL;
×
808
  SDbObj        *pDb = NULL;
×
809
  SMCreateSmaReq createReq = {0};
×
810

811
  int64_t mTraceId = TRACE_GET_ROOTID(&pReq->info.traceId);
×
812

813
  TAOS_CHECK_GOTO(tDeserializeSMCreateSmaReq(pReq->pCont, pReq->contLen, &createReq), NULL, _OVER);
×
814

815
#ifdef WINDOWS
816
  terrno = TSDB_CODE_MND_INVALID_PLATFORM;
817
  goto _OVER;
818
#endif
819
  mInfo("sma:%s, start to create", createReq.name);
×
820
  TAOS_CHECK_GOTO(mndCheckCreateSmaReq(&createReq), NULL, _OVER);
×
821

822
  pStb = mndAcquireStb(pMnode, createReq.stb);
×
823
  if (pStb == NULL) {
×
824
    mError("sma:%s, failed to create since stb:%s not exist", createReq.name, createReq.stb);
×
825
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
826
    if (terrno != 0) code = terrno;
×
827
    goto _OVER;
×
828
  }
829

830
  char streamName[TSDB_TABLE_FNAME_LEN] = {0};
×
831
  code = mndGetStreamNameFromSmaName(streamName, createReq.name);
×
832
  if (TSDB_CODE_SUCCESS != code) {
×
833
    goto _OVER;
×
834
  }
835

836
  code = mndAcquireStream(pMnode, streamName, &pStream);
×
837
  if (pStream != NULL || code == 0) {
×
838
    mError("sma:%s, failed to create since stream:%s already exist", createReq.name, streamName);
×
839
    code = TSDB_CODE_MND_STREAM_ALREADY_EXIST;
×
840
    goto _OVER;
×
841
  }
842
  SSIdx idx = {0};
×
843
  if ((code = mndAcquireGlobalIdx(pMnode, createReq.name, SDB_SMA, &idx)) == 0) {
×
844
    pSma = idx.pIdx;
×
845
  } else {
846
    goto _OVER;
×
847
  }
848

849
  if (pSma != NULL) {
×
850
    if (createReq.igExists) {
×
851
      mInfo("sma:%s, already exist in sma:%s, ignore exist is set", createReq.name, pSma->name);
×
852
      code = 0;
×
853
      goto _OVER;
×
854
    } else {
855
      code = TSDB_CODE_MND_SMA_ALREADY_EXIST;
×
856
      goto _OVER;
×
857
    }
858
  }
859

860
  SName name = {0};
×
861
  code = tNameFromString(&name, createReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
×
862
  if (TSDB_CODE_SUCCESS != code) {
×
863
    goto _OVER;
×
864
  }
865
  char db[TSDB_TABLE_FNAME_LEN] = {0};
×
866
  (void)tNameGetFullDbName(&name, db);
×
867

868
  pDb = mndAcquireDb(pMnode, db);
×
869
  if (pDb == NULL) {
×
870
    code = TSDB_CODE_MND_DB_NOT_SELECTED;
×
871
    goto _OVER;
×
872
  }
873

874
  TAOS_CHECK_GOTO(mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb), NULL, _OVER);
×
875

876
  code = mndCreateSma(pMnode, pReq, &createReq, pDb, pStb, streamName);
×
877
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
×
878

879
_OVER:
×
880
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
×
881
    mError("sma:%s, failed to create since %s", createReq.name, tstrerror(code));
×
882
  }
883

884
  mndReleaseStb(pMnode, pStb);
×
885
  mndReleaseSma(pMnode, pSma);
×
886
  mndReleaseStream(pMnode, pStream);
×
887
  mndReleaseDb(pMnode, pDb);
×
888
  tFreeSMCreateSmaReq(&createReq);
×
889

890
  TAOS_RETURN(code);
×
891
}
892

893
static int32_t mndSetDropSmaRedoLogs(SMnode *pMnode, STrans *pTrans, SSmaObj *pSma) {
×
894
  int32_t  code = 0;
×
895
  SSdbRaw *pRedoRaw = mndSmaActionEncode(pSma);
×
896
  if (pRedoRaw == NULL) {
×
897
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
898
    if (terrno != 0) code = terrno;
×
899
    return -1;
×
900
  }
901
  TAOS_CHECK_RETURN(mndTransAppendRedolog(pTrans, pRedoRaw));
×
902
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING));
×
903

904
  return 0;
×
905
}
906

907
static int32_t mndSetDropSmaCommitLogs(SMnode *pMnode, STrans *pTrans, SSmaObj *pSma) {
×
908
  int32_t  code = 0;
×
909
  SSdbRaw *pCommitRaw = mndSmaActionEncode(pSma);
×
910
  if (pCommitRaw == NULL) {
×
911
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
912
    if (terrno != 0) code = terrno;
×
913
    return -1;
×
914
  }
915
  TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pCommitRaw));
×
916
  TAOS_CHECK_RETURN(sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED));
×
917

918
  return 0;
×
919
}
920

921
static int32_t mndSetDropSmaVgroupRedoLogs(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup) {
×
922
  int32_t  code = 0;
×
923
  SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroup);
×
924
  if (pVgRaw == NULL) {
×
925
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
926
    if (terrno != 0) code = terrno;
×
927
    return -1;
×
928
  }
929
  TAOS_CHECK_RETURN(mndTransAppendRedolog(pTrans, pVgRaw));
×
930
  TAOS_CHECK_RETURN(sdbSetRawStatus(pVgRaw, SDB_STATUS_DROPPING));
×
931

932
  return 0;
×
933
}
934

935
static int32_t mndSetDropSmaVgroupCommitLogs(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup) {
×
936
  int32_t  code = 0;
×
937
  SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroup);
×
938
  if (pVgRaw == NULL) {
×
939
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
940
    if (terrno != 0) code = terrno;
×
941
    return -1;
×
942
  }
943
  TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pVgRaw));
×
944
  TAOS_CHECK_RETURN(sdbSetRawStatus(pVgRaw, SDB_STATUS_DROPPED));
×
945

946
  return 0;
×
947
}
948

949
static int32_t mndSetDropSmaVgroupRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) {
×
950
  int32_t    code = 0;
×
951
  SVnodeGid *pVgid = pVgroup->vnodeGid + 0;
×
952
  SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
×
953
  if (pDnode == NULL) {
×
954
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
955
    if (terrno != 0) code = terrno;
×
956
    TAOS_RETURN(code);
×
957
  }
958

959
  STransAction action = {0};
×
960
  action.epSet = mndGetDnodeEpset(pDnode);
×
961
  mndReleaseDnode(pMnode, pDnode);
×
962

963
  int32_t contLen = 0;
×
964
  void   *pReq = mndBuildDropVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen);
×
965
  if (pReq == NULL) {
×
966
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
967
    if (terrno != 0) code = terrno;
×
968
    TAOS_RETURN(code);
×
969
  }
970

971
  action.pCont = pReq;
×
972
  action.contLen = contLen;
×
973
  action.msgType = TDMT_DND_DROP_VNODE;
×
974
  action.acceptableCode = TSDB_CODE_VND_NOT_EXIST;
×
975

976
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
×
977
    taosMemoryFree(pReq);
×
978
    TAOS_RETURN(code);
×
979
  }
980

981
  TAOS_RETURN(code);
×
982
}
983

984
static int32_t mndDropSma(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SSmaObj *pSma) {
×
985
  int32_t     code = -1;
×
986
  SVgObj     *pVgroup = NULL;
×
987
  SStbObj    *pStb = NULL;
×
988
  STrans     *pTrans = NULL;
×
989
  SStreamObj *pStream = NULL;
×
990

991
  pVgroup = mndAcquireVgroup(pMnode, pSma->dstVgId);
×
992
  if (pVgroup == NULL) {
×
993
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
994
    if (terrno != 0) code = terrno;
×
995
    goto _OVER;
×
996
  }
997

998
  pStb = mndAcquireStb(pMnode, pSma->stb);
×
999
  if (pStb == NULL) {
×
1000
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1001
    if (terrno != 0) code = terrno;
×
1002
    goto _OVER;
×
1003
  }
1004

1005
  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq, "drop-sma");
×
1006
  if (pTrans == NULL) {
×
1007
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1008
    if (terrno != 0) code = terrno;
×
1009
    goto _OVER;
×
1010
  }
1011

1012
  mInfo("trans:%d, used to drop sma:%s", pTrans->id, pSma->name);
×
1013
  mndTransSetDbName(pTrans, pDb->name, NULL);
×
1014
  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER);
×
1015

1016
  mndTransSetSerial(pTrans);
×
1017

1018
  char streamName[TSDB_TABLE_FNAME_LEN] = {0};
×
1019
  code = mndGetStreamNameFromSmaName(streamName, pSma->name);
×
1020
  if (TSDB_CODE_SUCCESS != code) {
×
1021
    goto _OVER;
×
1022
  }
1023

1024
  code = mndAcquireStream(pMnode, streamName, &pStream);
×
1025
  if (pStream == NULL || pStream->smaId != pSma->uid || code != 0) {
×
1026
    sdbRelease(pMnode->pSdb, pStream);
×
1027
    goto _OVER;
×
1028
  } else {
1029
    if ((code = mndStreamSetDropAction(pMnode, pTrans, pStream)) < 0) {
×
1030
      mError("stream:%s, failed to drop task since %s", pStream->name, tstrerror(code));
×
1031
      sdbRelease(pMnode->pSdb, pStream);
×
1032
      goto _OVER;
×
1033
    }
1034

1035
    // drop stream
1036
    if ((code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_DROPPED)) < 0) {
×
1037
      mError("stream:%s, failed to drop log since %s", pStream->name, tstrerror(code));
×
1038
      sdbRelease(pMnode->pSdb, pStream);
×
1039
      goto _OVER;
×
1040
    }
1041
  }
1042
  TAOS_CHECK_GOTO(mndSetDropSmaRedoLogs(pMnode, pTrans, pSma), NULL, _OVER);
×
1043
  TAOS_CHECK_GOTO(mndSetDropSmaVgroupRedoLogs(pMnode, pTrans, pVgroup), NULL, _OVER);
×
1044
  TAOS_CHECK_GOTO(mndSetDropSmaCommitLogs(pMnode, pTrans, pSma), NULL, _OVER);
×
1045
  TAOS_CHECK_GOTO(mndSetDropSmaVgroupCommitLogs(pMnode, pTrans, pVgroup), NULL, _OVER);
×
1046
  TAOS_CHECK_GOTO(mndSetUpdateSmaStbCommitLogs(pMnode, pTrans, pStb), NULL, _OVER);
×
1047
  TAOS_CHECK_GOTO(mndSetDropSmaVgroupRedoActions(pMnode, pTrans, pDb, pVgroup), NULL, _OVER);
×
1048
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
×
1049

1050
  code = 0;
×
1051

1052
_OVER:
×
1053
  mndTransDrop(pTrans);
×
1054
  mndReleaseStream(pMnode, pStream);
×
1055
  mndReleaseVgroup(pMnode, pVgroup);
×
1056
  mndReleaseStb(pMnode, pStb);
×
1057
  TAOS_RETURN(code);
×
1058
}
1059

1060
int32_t mndDropSmasByStb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
×
1061
  SSdb    *pSdb = pMnode->pSdb;
×
1062
  SSmaObj *pSma = NULL;
×
1063
  void    *pIter = NULL;
×
1064
  SVgObj  *pVgroup = NULL;
×
1065
  int32_t  code = -1;
×
1066

1067
  while (1) {
1068
    pIter = sdbFetch(pSdb, SDB_SMA, pIter, (void **)&pSma);
×
1069
    if (pIter == NULL) break;
×
1070

1071
    if (pSma->stbUid == pStb->uid) {
×
1072
      mndTransSetSerial(pTrans);
×
1073
      pVgroup = mndAcquireVgroup(pMnode, pSma->dstVgId);
×
1074
      if (pVgroup == NULL) {
×
1075
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1076
        if (terrno != 0) code = terrno;
×
1077
        goto _OVER;
×
1078
      }
1079

1080
      char streamName[TSDB_TABLE_FNAME_LEN] = {0};
×
1081
      code = mndGetStreamNameFromSmaName(streamName, pSma->name);
×
1082
      if (TSDB_CODE_SUCCESS != code) {
×
1083
        goto _OVER;
×
1084
      }
1085

1086
      SStreamObj *pStream = NULL;
×
1087
      code = mndAcquireStream(pMnode, streamName, &pStream);
×
1088
      if ((pStream != NULL && pStream->smaId == pSma->uid) || code != 0) {
×
1089
        if ((code = mndStreamSetDropAction(pMnode, pTrans, pStream)) < 0) {
×
1090
          mError("stream:%s, failed to drop task since %s", pStream->name, terrstr());
×
1091
          mndReleaseStream(pMnode, pStream);
×
1092
          goto _OVER;
×
1093
        }
1094

1095
        if ((code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_DROPPED)) < 0) {
×
1096
          mndReleaseStream(pMnode, pStream);
×
1097
          goto _OVER;
×
1098
        }
1099

1100
        mndReleaseStream(pMnode, pStream);
×
1101
      }
1102

1103
      TAOS_CHECK_GOTO(mndSetDropSmaVgroupCommitLogs(pMnode, pTrans, pVgroup), NULL, _OVER);
×
1104
      TAOS_CHECK_GOTO(mndSetDropSmaVgroupRedoActions(pMnode, pTrans, pDb, pVgroup), NULL, _OVER);
×
1105
      TAOS_CHECK_GOTO(mndSetDropSmaCommitLogs(pMnode, pTrans, pSma), NULL, _OVER);
×
1106
      mndReleaseVgroup(pMnode, pVgroup);
×
1107
      pVgroup = NULL;
×
1108
    }
1109

1110
    sdbRelease(pSdb, pSma);
×
1111
  }
1112

1113
  code = 0;
×
1114

1115
_OVER:
×
1116
  sdbCancelFetch(pSdb, pIter);
×
1117
  sdbRelease(pSdb, pSma);
×
1118
  mndReleaseVgroup(pMnode, pVgroup);
×
1119
  TAOS_RETURN(code);
×
1120
}
1121

1122
int32_t mndDropSmasByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
×
1123
  int32_t code = 0;
×
1124
  SSdb   *pSdb = pMnode->pSdb;
×
1125
  void   *pIter = NULL;
×
1126

1127
  while (1) {
×
1128
    SSmaObj *pSma = NULL;
×
1129
    pIter = sdbFetch(pSdb, SDB_SMA, pIter, (void **)&pSma);
×
1130
    if (pIter == NULL) break;
×
1131

1132
    if (pSma->dbUid == pDb->uid) {
×
1133
      if ((code = mndSetDropSmaCommitLogs(pMnode, pTrans, pSma)) != 0) {
×
1134
        sdbRelease(pSdb, pSma);
×
1135
        sdbCancelFetch(pSdb, pSma);
×
1136
        TAOS_RETURN(code);
×
1137
      }
1138
    }
1139

1140
    sdbRelease(pSdb, pSma);
×
1141
  }
1142

1143
  TAOS_RETURN(code);
×
1144
}
1145

1146
static int32_t mndProcessDropSmaReq(SRpcMsg *pReq) {
×
1147
  SMnode      *pMnode = pReq->info.node;
×
1148
  int32_t      code = -1;
×
1149
  SDbObj      *pDb = NULL;
×
1150
  SSmaObj     *pSma = NULL;
×
1151
  SMDropSmaReq dropReq = {0};
×
1152

1153
  TAOS_CHECK_GOTO(tDeserializeSMDropSmaReq(pReq->pCont, pReq->contLen, &dropReq), NULL, _OVER);
×
1154

1155
  mInfo("sma:%s, start to drop", dropReq.name);
×
1156

1157
  SSIdx idx = {0};
×
1158
  if ((code = mndAcquireGlobalIdx(pMnode, dropReq.name, SDB_SMA, &idx)) == 0) {
×
1159
    pSma = idx.pIdx;
×
1160
  } else {
1161
    goto _OVER;
×
1162
  }
1163
  if (pSma == NULL) {
×
1164
    if (dropReq.igNotExists) {
×
1165
      mInfo("sma:%s, not exist, ignore not exist is set", dropReq.name);
×
1166
      code = 0;
×
1167
      goto _OVER;
×
1168
    } else {
1169
      code = TSDB_CODE_MND_SMA_NOT_EXIST;
×
1170
      goto _OVER;
×
1171
    }
1172
  }
1173

1174
  SName name = {0};
×
1175
  code = tNameFromString(&name, dropReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
×
1176
  if (TSDB_CODE_SUCCESS != code) {
×
1177
    goto _OVER;
×
1178
  }
1179
  char db[TSDB_TABLE_FNAME_LEN] = {0};
×
1180
  (void)tNameGetFullDbName(&name, db);
×
1181

1182
  pDb = mndAcquireDb(pMnode, db);
×
1183
  if (pDb == NULL) {
×
1184
    code = TSDB_CODE_MND_DB_NOT_SELECTED;
×
1185
    goto _OVER;
×
1186
  }
1187

1188
  TAOS_CHECK_GOTO(mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb), NULL, _OVER);
×
1189

1190
  code = mndDropSma(pMnode, pReq, pDb, pSma);
×
1191
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
×
1192

1193
_OVER:
×
1194
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
×
1195
    mError("sma:%s, failed to drop since %s", dropReq.name, tstrerror(code));
×
1196
  }
1197

1198
  mndReleaseSma(pMnode, pSma);
×
1199
  mndReleaseDb(pMnode, pDb);
×
1200
  TAOS_RETURN(code);
×
1201
}
1202

1203
static int32_t mndGetSma(SMnode *pMnode, SUserIndexReq *indexReq, SUserIndexRsp *rsp, bool *exist) {
×
1204
  int32_t  code = -1;
×
1205
  SSmaObj *pSma = NULL;
×
1206

1207
  SSIdx idx = {0};
×
1208
  if (0 == mndAcquireGlobalIdx(pMnode, indexReq->indexFName, SDB_SMA, &idx)) {
×
1209
    pSma = idx.pIdx;
×
1210
  } else {
1211
    *exist = false;
×
1212
    return 0;
×
1213
  }
1214

1215
  if (pSma == NULL) {
×
1216
    *exist = false;
×
1217
    return 0;
×
1218
  }
1219

1220
  memcpy(rsp->dbFName, pSma->db, sizeof(pSma->db));
×
1221
  memcpy(rsp->tblFName, pSma->stb, sizeof(pSma->stb));
×
1222
  tstrncpy(rsp->indexType, TSDB_INDEX_TYPE_SMA, TSDB_INDEX_TYPE_LEN);
×
1223

1224
  SNodeList *pList = NULL;
×
1225
  int32_t    extOffset = 0;
×
1226
  code = nodesStringToList(pSma->expr, &pList);
×
1227
  if (0 == code) {
×
1228
    SNode *node = NULL;
×
1229
    FOREACH(node, pList) {
×
1230
      SFunctionNode *pFunc = (SFunctionNode *)node;
×
1231
      extOffset += tsnprintf(rsp->indexExts + extOffset, sizeof(rsp->indexExts) - extOffset - 1, "%s%s",
×
1232
                             (extOffset ? "," : ""), pFunc->functionName);
×
1233
    }
1234

1235
    *exist = true;
×
1236
  }
1237

1238
  mndReleaseSma(pMnode, pSma);
×
1239
  return code;
×
1240
}
1241

1242
int32_t mndGetTableSma(SMnode *pMnode, char *tbFName, STableIndexRsp *rsp, bool *exist) {
×
1243
  int32_t         code = 0;
×
1244
  SSmaObj        *pSma = NULL;
×
1245
  SSdb           *pSdb = pMnode->pSdb;
×
1246
  void           *pIter = NULL;
×
1247
  STableIndexInfo info;
1248

1249
  SStbObj *pStb = mndAcquireStb(pMnode, tbFName);
×
1250
  if (NULL == pStb) {
×
1251
    *exist = false;
×
1252
    return TSDB_CODE_SUCCESS;
×
1253
  }
1254

1255
  tstrncpy(rsp->dbFName, pStb->db, TSDB_DB_FNAME_LEN);
×
1256
  tstrncpy(rsp->tbName, pStb->name + strlen(pStb->db) + 1, TSDB_TABLE_NAME_LEN);
×
1257
  rsp->suid = pStb->uid;
×
1258
  rsp->version = pStb->smaVer;
×
1259
  mndReleaseStb(pMnode, pStb);
×
1260

1261
  while (1) {
×
1262
    pIter = sdbFetch(pSdb, SDB_SMA, pIter, (void **)&pSma);
×
1263
    if (pIter == NULL) break;
×
1264

1265
    if (pSma->stb[0] != tbFName[0] || strcmp(pSma->stb, tbFName)) {
×
1266
      sdbRelease(pSdb, pSma);
×
1267
      continue;
×
1268
    }
1269

1270
    info.intervalUnit = pSma->intervalUnit;
×
1271
    info.slidingUnit = pSma->slidingUnit;
×
1272
    info.interval = pSma->interval;
×
1273
    info.offset = pSma->offset;
×
1274
    info.sliding = pSma->sliding;
×
1275
    info.dstTbUid = pSma->dstTbUid;
×
1276
    info.dstVgId = pSma->dstVgId;
×
1277

1278
    SVgObj *pVg = mndAcquireVgroup(pMnode, pSma->dstVgId);
×
1279
    if (pVg == NULL) {
×
1280
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1281
      if (terrno != 0) code = terrno;
×
1282
      sdbRelease(pSdb, pSma);
×
1283
      sdbCancelFetch(pSdb, pIter);
×
1284
      return code;
×
1285
    }
1286
    info.epSet = mndGetVgroupEpset(pMnode, pVg);
×
1287

1288
    info.expr = taosMemoryMalloc(pSma->exprLen + 1);
×
1289
    if (info.expr == NULL) {
×
1290
      code = terrno;
×
1291
      sdbRelease(pSdb, pSma);
×
1292
      sdbCancelFetch(pSdb, pIter);
×
1293
      return code;
×
1294
    }
1295

1296
    memcpy(info.expr, pSma->expr, pSma->exprLen);
×
1297
    info.expr[pSma->exprLen] = 0;
×
1298

1299
    if (NULL == taosArrayPush(rsp->pIndex, &info)) {
×
1300
      code = terrno;
×
1301
      taosMemoryFree(info.expr);
×
1302
      sdbRelease(pSdb, pSma);
×
1303
      sdbCancelFetch(pSdb, pIter);
×
1304
      return code;
×
1305
    }
1306

1307
    rsp->indexSize += sizeof(info) + pSma->exprLen + 1;
×
1308
    *exist = true;
×
1309

1310
    sdbRelease(pSdb, pSma);
×
1311
  }
1312

1313
  TAOS_RETURN(code);
×
1314
}
1315

1316
static int32_t mndProcessGetSmaReq(SRpcMsg *pReq) {
×
1317
  SUserIndexReq indexReq = {0};
×
1318
  SMnode       *pMnode = pReq->info.node;
×
1319
  int32_t       code = -1;
×
1320
  SUserIndexRsp rsp = {0};
×
1321
  bool          exist = false;
×
1322

1323
  TAOS_CHECK_GOTO(tDeserializeSUserIndexReq(pReq->pCont, pReq->contLen, &indexReq), NULL, _OVER);
×
1324

1325
  code = mndGetSma(pMnode, &indexReq, &rsp, &exist);
×
1326
  if (code) {
×
1327
    goto _OVER;
×
1328
  }
1329

1330
  if (!exist) {
×
1331
    // TODO GET INDEX FROM FULLTEXT
1332
    code = TSDB_CODE_MND_DB_INDEX_NOT_EXIST;
×
1333
  } else {
1334
    int32_t contLen = tSerializeSUserIndexRsp(NULL, 0, &rsp);
×
1335
    void   *pRsp = rpcMallocCont(contLen);
×
1336
    if (pRsp == NULL) {
×
1337
      code = terrno;
×
1338
      goto _OVER;
×
1339
    }
1340

1341
    contLen = tSerializeSUserIndexRsp(pRsp, contLen, &rsp);
×
1342
    if (contLen < 0) {
×
1343
      code = terrno;
×
1344
      goto _OVER;
×
1345
    }
1346

1347
    pReq->info.rsp = pRsp;
×
1348
    pReq->info.rspLen = contLen;
×
1349

1350
    code = 0;
×
1351
  }
1352

1353
_OVER:
×
1354
  if (code != 0) {
×
1355
    mError("failed to get index %s since %s", indexReq.indexFName, tstrerror(code));
×
1356
  }
1357

1358
  TAOS_RETURN(code);
×
1359
}
1360

1361
static int32_t mndProcessGetTbSmaReq(SRpcMsg *pReq) {
×
1362
  STableIndexReq indexReq = {0};
×
1363
  SMnode        *pMnode = pReq->info.node;
×
1364
  int32_t        code = -1;
×
1365
  STableIndexRsp rsp = {0};
×
1366
  bool           exist = false;
×
1367

1368
  TAOS_CHECK_GOTO(tDeserializeSTableIndexReq(pReq->pCont, pReq->contLen, &indexReq), NULL, _OVER);
×
1369

1370
  rsp.pIndex = taosArrayInit(10, sizeof(STableIndexInfo));
×
1371
  if (NULL == rsp.pIndex) {
×
1372
    code = terrno;
×
1373
    goto _OVER;
×
1374
  }
1375

1376
  code = mndGetTableSma(pMnode, indexReq.tbFName, &rsp, &exist);
×
1377
  if (code) {
×
1378
    goto _OVER;
×
1379
  }
1380

1381
  if (!exist) {
×
1382
    code = TSDB_CODE_MND_DB_INDEX_NOT_EXIST;
×
1383
  } else {
1384
    int32_t contLen = tSerializeSTableIndexRsp(NULL, 0, &rsp);
×
1385
    void   *pRsp = rpcMallocCont(contLen);
×
1386
    if (pRsp == NULL) {
×
1387
      code = terrno;
×
1388
      goto _OVER;
×
1389
    }
1390

1391
    contLen = tSerializeSTableIndexRsp(pRsp, contLen, &rsp);
×
1392
    if (contLen < 0) {
×
1393
      code = terrno;
×
1394
      goto _OVER;
×
1395
    }
1396

1397
    pReq->info.rsp = pRsp;
×
1398
    pReq->info.rspLen = contLen;
×
1399

1400
    code = 0;
×
1401
  }
1402

1403
_OVER:
×
1404
  if (code != 0) {
×
1405
    mError("failed to get table index %s since %s", indexReq.tbFName, tstrerror(code));
×
1406
  }
1407

1408
  tFreeSerializeSTableIndexRsp(&rsp);
×
1409
  TAOS_RETURN(code);
×
1410
}
1411

1412
static int32_t mndRetrieveSma(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
×
1413
  SMnode  *pMnode = pReq->info.node;
×
1414
  SSdb    *pSdb = pMnode->pSdb;
×
1415
  int32_t  numOfRows = 0;
×
1416
  SSmaObj *pSma = NULL;
×
1417
  int32_t  cols = 0;
×
1418
  int32_t  code = 0;
×
1419

1420
  SDbObj *pDb = NULL;
×
1421
  if (strlen(pShow->db) > 0) {
×
1422
    pDb = mndAcquireDb(pMnode, pShow->db);
×
1423
    if (pDb == NULL) return 0;
×
1424
  }
1425
  SSmaAndTagIter *pIter = pShow->pIter;
×
1426
  while (numOfRows < rows) {
×
1427
    pIter->pSmaIter = sdbFetch(pSdb, SDB_SMA, pIter->pSmaIter, (void **)&pSma);
×
1428
    if (pIter->pSmaIter == NULL) break;
×
1429

1430
    if (NULL != pDb && pSma->dbUid != pDb->uid) {
×
1431
      sdbRelease(pSdb, pSma);
×
1432
      continue;
×
1433
    }
1434

1435
    cols = 0;
×
1436

1437
    SName smaName = {0};
×
1438
    SName stbName = {0};
×
1439
    char  n2[TSDB_DB_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
×
1440
    char  n3[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
×
1441
    code = tNameFromString(&smaName, pSma->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
×
1442
    char n1[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
×
1443
    if (TSDB_CODE_SUCCESS == code) {
×
1444
      STR_TO_VARSTR(n1, (char *)tNameGetTableName(&smaName));
×
1445
      STR_TO_VARSTR(n2, (char *)mndGetDbStr(pSma->db));
×
1446
      code = tNameFromString(&stbName, pSma->stb, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
×
1447
    }
1448
    SColumnInfoData *pColInfo = NULL;
×
1449
    if (TSDB_CODE_SUCCESS == code) {
×
1450
      STR_TO_VARSTR(n3, (char *)tNameGetTableName(&stbName));
×
1451

1452
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
1453
      code = colDataSetVal(pColInfo, numOfRows, (const char *)n1, false);
×
1454
    }
1455
    if (TSDB_CODE_SUCCESS == code) {
×
1456
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
1457
      code = colDataSetVal(pColInfo, numOfRows, (const char *)n2, false);
×
1458
    }
1459
    if (TSDB_CODE_SUCCESS == code) {
×
1460
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
1461
      code = colDataSetVal(pColInfo, numOfRows, (const char *)n3, false);
×
1462
    }
1463
    if (TSDB_CODE_SUCCESS == code) {
×
1464
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
1465
      code = colDataSetVal(pColInfo, numOfRows, (const char *)&pSma->dstVgId, false);
×
1466
    }
1467
    if (TSDB_CODE_SUCCESS == code) {
×
1468
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
1469
      code = colDataSetVal(pColInfo, numOfRows, (const char *)&pSma->createdTime, false);
×
1470
    }
1471

1472
    char col[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
×
1473
    STR_TO_VARSTR(col, (char *)"");
×
1474

1475
    if (TSDB_CODE_SUCCESS == code) {
×
1476
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
1477
      code = colDataSetVal(pColInfo, numOfRows, (const char *)col, false);
×
1478
    }
1479

1480
    if (TSDB_CODE_SUCCESS == code) {
×
1481
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
1482

1483
      char tag[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
×
1484
      STR_TO_VARSTR(tag, (char *)"sma_index");
×
1485
      code = colDataSetVal(pColInfo, numOfRows, (const char *)tag, false);
×
1486
    }
1487

1488
    numOfRows++;
×
1489
    sdbRelease(pSdb, pSma);
×
1490
    if (TSDB_CODE_SUCCESS != code) {
×
1491
      sdbCancelFetch(pMnode->pSdb, pIter->pSmaIter);
×
1492
      numOfRows = -1;
×
1493
      break;
×
1494
    }
1495
  }
1496

1497
  mndReleaseDb(pMnode, pDb);
×
1498
  pShow->numOfRows += numOfRows;
×
1499
  return numOfRows;
×
1500
}
1501

1502
// sma and tag index comm func
1503
static int32_t mndProcessDropIdxReq(SRpcMsg *pReq) {
×
1504
  int ret = mndProcessDropSmaReq(pReq);
×
1505
  if (terrno == TSDB_CODE_MND_TAG_INDEX_ALREADY_EXIST) {
×
1506
    terrno = 0;
×
1507
    ret = mndProcessDropTagIdxReq(pReq);
×
1508
  }
1509
  return ret;
×
1510
}
1511

1512
static int32_t mndRetrieveIdx(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
×
1513
  if (pShow->pIter == NULL) {
×
1514
    pShow->pIter = taosMemoryCalloc(1, sizeof(SSmaAndTagIter));
×
1515
  }
1516
  if (!pShow->pIter) {
×
1517
    return terrno;
×
1518
  }
1519
  int32_t read = mndRetrieveSma(pReq, pShow, pBlock, rows);
×
1520
  if (read < rows) {
×
1521
    read += mndRetrieveTagIdx(pReq, pShow, pBlock, rows - read);
×
1522
  }
1523
  // no more to read
1524
  if (read < rows) {
×
1525
    taosMemoryFree(pShow->pIter);
×
1526
    pShow->pIter = NULL;
×
1527
  }
1528
  return read;
×
1529
}
1530
static void mndCancelRetrieveIdx(SMnode *pMnode, void *pIter) {
×
1531
  SSmaAndTagIter *p = pIter;
×
1532
  if (p != NULL) {
×
1533
    SSdb *pSdb = pMnode->pSdb;
×
1534
    sdbCancelFetchByType(pSdb, p->pSmaIter, SDB_SMA);
×
1535
    sdbCancelFetchByType(pSdb, p->pIdxIter, SDB_IDX);
×
1536
  }
1537
  taosMemoryFree(p);
×
1538
}
×
1539

1540
static void initSMAObj(SCreateTSMACxt *pCxt) {
×
1541
  memcpy(pCxt->pSma->name, pCxt->pCreateSmaReq->name, TSDB_TABLE_FNAME_LEN);
×
1542
  memcpy(pCxt->pSma->stb, pCxt->pCreateSmaReq->stb, TSDB_TABLE_FNAME_LEN);
×
1543
  memcpy(pCxt->pSma->db, pCxt->pDb->name, TSDB_DB_FNAME_LEN);
×
1544
  if (pCxt->pBaseSma) memcpy(pCxt->pSma->baseSmaName, pCxt->pBaseSma->name, TSDB_TABLE_FNAME_LEN);
×
1545
  pCxt->pSma->createdTime = taosGetTimestampMs();
×
1546
  pCxt->pSma->uid = mndGenerateUid(pCxt->pCreateSmaReq->name, TSDB_TABLE_FNAME_LEN);
×
1547

1548
  memcpy(pCxt->pSma->dstTbName, pCxt->targetStbFullName, TSDB_TABLE_FNAME_LEN);
×
1549
  pCxt->pSma->dstTbUid = 0;  // not used
×
1550
  pCxt->pSma->stbUid = pCxt->pSrcStb ? pCxt->pSrcStb->uid : pCxt->pCreateSmaReq->normSourceTbUid;
×
1551
  pCxt->pSma->dbUid = pCxt->pDb->uid;
×
1552
  pCxt->pSma->interval = pCxt->pCreateSmaReq->interval;
×
1553
  pCxt->pSma->intervalUnit = pCxt->pCreateSmaReq->intervalUnit;
×
1554
//  pCxt->pSma->timezone = taosGetLocalTimezoneOffset();
1555
  pCxt->pSma->version = 1;
×
1556

1557
  pCxt->pSma->exprLen = pCxt->pCreateSmaReq->exprLen;
×
1558
  pCxt->pSma->sqlLen = pCxt->pCreateSmaReq->sqlLen;
×
1559
  pCxt->pSma->astLen = pCxt->pCreateSmaReq->astLen;
×
1560
  pCxt->pSma->expr = pCxt->pCreateSmaReq->expr;
×
1561
  pCxt->pSma->sql = pCxt->pCreateSmaReq->sql;
×
1562
  pCxt->pSma->ast = pCxt->pCreateSmaReq->ast;
×
1563
}
×
1564

1565
static int32_t mndCreateTSMABuildCreateStreamReq(SCreateTSMACxt *pCxt) {
×
1566
  tstrncpy(pCxt->pCreateStreamReq->name, pCxt->streamName, TSDB_STREAM_FNAME_LEN);
×
1567
  tstrncpy(pCxt->pCreateStreamReq->sourceDB, pCxt->pDb->name, TSDB_DB_FNAME_LEN);
×
1568
  tstrncpy(pCxt->pCreateStreamReq->targetStbFullName, pCxt->targetStbFullName, TSDB_TABLE_FNAME_LEN);
×
1569
  pCxt->pCreateStreamReq->igExists = false;
×
1570
  pCxt->pCreateStreamReq->triggerType = STREAM_TRIGGER_MAX_DELAY;
×
1571
  pCxt->pCreateStreamReq->igExpired = false;
×
1572
  pCxt->pCreateStreamReq->fillHistory = STREAM_FILL_HISTORY_ON;
×
1573
  pCxt->pCreateStreamReq->maxDelay = 10000;
×
1574
  pCxt->pCreateStreamReq->watermark = 0;
×
1575
  pCxt->pCreateStreamReq->numOfTags = pCxt->pSrcStb ? pCxt->pSrcStb->numOfTags + 1 : 1;
×
1576
  pCxt->pCreateStreamReq->checkpointFreq = 0;
×
1577
  pCxt->pCreateStreamReq->createStb = 1;
×
1578
  pCxt->pCreateStreamReq->targetStbUid = 0;
×
1579
  pCxt->pCreateStreamReq->fillNullCols = NULL;
×
1580
  pCxt->pCreateStreamReq->igUpdate = 0;
×
1581
  pCxt->pCreateStreamReq->deleteMark = pCxt->pCreateSmaReq->deleteMark;
×
1582
  pCxt->pCreateStreamReq->lastTs = pCxt->pCreateSmaReq->lastTs;
×
1583
  pCxt->pCreateStreamReq->smaId = pCxt->pSma->uid;
×
1584
  pCxt->pCreateStreamReq->ast = taosStrdup(pCxt->pCreateSmaReq->ast);
×
1585
  if (!pCxt->pCreateStreamReq->ast) {
×
1586
    return terrno;
×
1587
  }
1588
  pCxt->pCreateStreamReq->sql = taosStrdup(pCxt->pCreateSmaReq->sql);
×
1589
  if (!pCxt->pCreateStreamReq->sql) {
×
1590
    return terrno;
×
1591
  }
1592

1593
  // construct tags
1594
  pCxt->pCreateStreamReq->pTags = taosArrayInit(pCxt->pCreateStreamReq->numOfTags, sizeof(SField));
×
1595
  if (!pCxt->pCreateStreamReq->pTags) {
×
1596
    return terrno;
×
1597
  }
1598
  SField  f = {0};
×
1599
  int32_t code = 0;
×
1600
  if (pCxt->pSrcStb) {
×
1601
    for (int32_t idx = 0; idx < pCxt->pCreateStreamReq->numOfTags - 1; ++idx) {
×
1602
      SSchema *pSchema = &pCxt->pSrcStb->pTags[idx];
×
1603
      f.bytes = pSchema->bytes;
×
1604
      f.type = pSchema->type;
×
1605
      f.flags = pSchema->flags;
×
1606
      tstrncpy(f.name, pSchema->name, TSDB_COL_NAME_LEN);
×
1607
      if (NULL == taosArrayPush(pCxt->pCreateStreamReq->pTags, &f)) {
×
1608
        code = terrno;
×
1609
        break;
×
1610
      }
1611
    }
1612
  }
1613

1614
  if (TSDB_CODE_SUCCESS == code) {
×
1615
    f.bytes = TSDB_TABLE_FNAME_LEN - 1 + VARSTR_HEADER_SIZE;
×
1616
    f.flags = COL_SMA_ON;
×
1617
    f.type = TSDB_DATA_TYPE_BINARY;
×
1618
    tstrncpy(f.name, "tbname", strlen("tbname") + 1);
×
1619
    if (NULL == taosArrayPush(pCxt->pCreateStreamReq->pTags, &f)) {
×
1620
      code = terrno;
×
1621
    }
1622
  }
1623

1624
  if (TSDB_CODE_SUCCESS == code) {
×
1625
    // construct output cols
1626
    SNode *pNode;
1627
    FOREACH(pNode, pCxt->pProjects) {
×
1628
      SExprNode *pExprNode = (SExprNode *)pNode;
×
1629
      f.bytes = pExprNode->resType.bytes;
×
1630
      f.type = pExprNode->resType.type;
×
1631
      f.flags = COL_SMA_ON;
×
1632
      tstrncpy(f.name, pExprNode->userAlias, TSDB_COL_NAME_LEN);
×
1633
      if (NULL == taosArrayPush(pCxt->pCreateStreamReq->pCols, &f)) {
×
1634
        code = terrno;
×
1635
        break;
×
1636
      }
1637
    }
1638
  }
1639
  return code;
×
1640
}
1641

1642
static int32_t mndCreateTSMABuildDropStreamReq(SCreateTSMACxt *pCxt) {
×
1643
  tstrncpy(pCxt->pDropStreamReq->name, pCxt->streamName, TSDB_STREAM_FNAME_LEN);
×
1644
  pCxt->pDropStreamReq->igNotExists = false;
×
1645
  pCxt->pDropStreamReq->sql = taosStrdup(pCxt->pDropSmaReq->name);
×
1646
  if (!pCxt->pDropStreamReq->sql) {
×
1647
    return terrno;
×
1648
  }
1649
  pCxt->pDropStreamReq->sqlLen = strlen(pCxt->pDropStreamReq->sql);
×
1650
  return TSDB_CODE_SUCCESS;
×
1651
}
1652

1653
static int32_t mndSetUpdateDbTsmaVersionPrepareLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pOld, SDbObj *pNew) {
×
1654
  int32_t  code = 0;
×
1655
  SSdbRaw *pRedoRaw = mndDbActionEncode(pOld);
×
1656
  if (pRedoRaw == NULL) {
×
1657
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1658
    if (terrno != 0) code = terrno;
×
1659
    TAOS_RETURN(code);
×
1660
  }
1661
  if ((code = mndTransAppendPrepareLog(pTrans, pRedoRaw)) != 0) {
×
1662
    sdbFreeRaw(pRedoRaw);
×
1663
    TAOS_RETURN(code);
×
1664
  }
1665

1666
  TAOS_RETURN(sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY));
×
1667
}
1668

1669
static int32_t mndSetUpdateDbTsmaVersionCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pOld, SDbObj *pNew) {
×
1670
  int32_t  code = 0;
×
1671
  SSdbRaw *pCommitRaw = mndDbActionEncode(pNew);
×
1672
  if (pCommitRaw == NULL) {
×
1673
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1674
    if (terrno != 0) code = terrno;
×
1675
    TAOS_RETURN(code);
×
1676
  }
1677
  if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
×
1678
    sdbFreeRaw(pCommitRaw);
×
1679
    TAOS_RETURN(code);
×
1680
  }
1681

1682
  TAOS_RETURN(sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY));
×
1683
}
1684

1685
static int32_t mndCreateTSMATxnPrepare(SCreateTSMACxt *pCxt) {
×
1686
  int32_t      code = -1;
×
1687
  STransAction createStreamRedoAction = {0};
×
1688
  STransAction createStreamUndoAction = {0};
×
1689
  STransAction dropStbUndoAction = {0};
×
1690
  SMDropStbReq dropStbReq = {0};
×
1691
  STrans *pTrans = mndTransCreate(pCxt->pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_TSMA, pCxt->pRpcReq, "create-tsma");
×
1692
  if (!pTrans) {
×
1693
    code = terrno;
×
1694
    goto _OVER;
×
1695
  }
1696
  mndTransSetDbName(pTrans, pCxt->pDb->name, NULL);
×
1697
  TAOS_CHECK_GOTO(mndTransCheckConflict(pCxt->pMnode, pTrans), NULL, _OVER);
×
1698

1699
  mndTransSetSerial(pTrans);
×
1700
  mInfo("trans:%d, used to create tsma:%s stream:%s", pTrans->id, pCxt->pCreateSmaReq->name,
×
1701
        pCxt->pCreateStreamReq->name);
1702

1703
  mndGetMnodeEpSet(pCxt->pMnode, &createStreamRedoAction.epSet);
×
1704
  createStreamRedoAction.acceptableCode = TSDB_CODE_MND_STREAM_ALREADY_EXIST;
×
1705
  createStreamRedoAction.msgType = TDMT_STREAM_CREATE;
×
1706
  createStreamRedoAction.contLen = tSerializeSCMCreateStreamReq(0, 0, pCxt->pCreateStreamReq);
×
1707
  createStreamRedoAction.pCont = taosMemoryCalloc(1, createStreamRedoAction.contLen);
×
1708
  if (!createStreamRedoAction.pCont) {
×
1709
    code = terrno;
×
1710
    goto _OVER;
×
1711
  }
1712
  if (createStreamRedoAction.contLen != tSerializeSCMCreateStreamReq(createStreamRedoAction.pCont,
×
1713
                                                                     createStreamRedoAction.contLen,
1714
                                                                     pCxt->pCreateStreamReq)) {
×
1715
    mError("sma: %s, failed to create due to create stream req encode failure", pCxt->pCreateSmaReq->name);
×
1716
    code = TSDB_CODE_INVALID_MSG;
×
1717
    goto _OVER;
×
1718
  }
1719

1720
  createStreamUndoAction.epSet = createStreamRedoAction.epSet;
×
1721
  createStreamUndoAction.acceptableCode = TSDB_CODE_MND_STREAM_NOT_EXIST;
×
1722
  createStreamUndoAction.msgType = TDMT_STREAM_DROP;
×
1723
  createStreamUndoAction.contLen = tSerializeSMDropStreamReq(0, 0, pCxt->pDropStreamReq);
×
1724
  createStreamUndoAction.pCont = taosMemoryCalloc(1, createStreamUndoAction.contLen);
×
1725
  if (!createStreamUndoAction.pCont) {
×
1726
    code = terrno;
×
1727
    goto _OVER;
×
1728
  }
1729
  if (createStreamUndoAction.contLen !=
×
1730
      tSerializeSMDropStreamReq(createStreamUndoAction.pCont, createStreamUndoAction.contLen, pCxt->pDropStreamReq)) {
×
1731
    mError("sma: %s, failed to create due to drop stream req encode failure", pCxt->pCreateSmaReq->name);
×
1732
    code = TSDB_CODE_INVALID_MSG;
×
1733
    goto _OVER;
×
1734
  }
1735

1736
  dropStbReq.igNotExists = true;
×
1737
  tstrncpy(dropStbReq.name, pCxt->targetStbFullName, TSDB_TABLE_FNAME_LEN);
×
1738
  dropStbUndoAction.epSet = createStreamRedoAction.epSet;
×
1739
  dropStbUndoAction.acceptableCode = TSDB_CODE_MND_STB_NOT_EXIST;
×
1740
  dropStbUndoAction.retryCode = TSDB_CODE_MND_STREAM_MUST_BE_DELETED;
×
1741
  dropStbUndoAction.msgType = TDMT_MND_STB_DROP;
×
1742
  dropStbUndoAction.contLen = tSerializeSMDropStbReq(0, 0, &dropStbReq);
×
1743
  dropStbUndoAction.pCont = taosMemoryCalloc(1, dropStbUndoAction.contLen);
×
1744
  if (!dropStbUndoAction.pCont) {
×
1745
    code = terrno;
×
1746
    goto _OVER;
×
1747
  }
1748
  if (dropStbUndoAction.contLen !=
×
1749
      tSerializeSMDropStbReq(dropStbUndoAction.pCont, dropStbUndoAction.contLen, &dropStbReq)) {
×
1750
    mError("sma: %s, failed to create due to drop stb req encode failure", pCxt->pCreateSmaReq->name);
×
1751
    code = TSDB_CODE_INVALID_MSG;
×
1752
    goto _OVER;
×
1753
  }
1754

1755
  SDbObj newDb = {0};
×
1756
  memcpy(&newDb, pCxt->pDb, sizeof(SDbObj));
×
1757
  newDb.tsmaVersion++;
×
1758
  TAOS_CHECK_GOTO(mndSetCreateSmaRedoLogs(pCxt->pMnode, pTrans, pCxt->pSma), NULL, _OVER);
×
1759
  TAOS_CHECK_GOTO(mndSetCreateSmaUndoLogs(pCxt->pMnode, pTrans, pCxt->pSma), NULL, _OVER);
×
1760
  TAOS_CHECK_GOTO(mndSetCreateSmaCommitLogs(pCxt->pMnode, pTrans, pCxt->pSma), NULL, _OVER);
×
1761
  TAOS_CHECK_GOTO(mndTransAppendRedoAction(pTrans, &createStreamRedoAction), NULL, _OVER);
×
1762
  TAOS_CHECK_GOTO(mndTransAppendUndoAction(pTrans, &createStreamUndoAction), NULL, _OVER);
×
1763
  TAOS_CHECK_GOTO(mndTransAppendUndoAction(pTrans, &dropStbUndoAction), NULL, _OVER);
×
1764
  TAOS_CHECK_GOTO(mndSetUpdateDbTsmaVersionPrepareLogs(pCxt->pMnode, pTrans, pCxt->pDb, &newDb), NULL, _OVER);
×
1765
  TAOS_CHECK_GOTO(mndSetUpdateDbTsmaVersionCommitLogs(pCxt->pMnode, pTrans, pCxt->pDb, &newDb), NULL, _OVER);
×
1766
  TAOS_CHECK_GOTO(mndTransPrepare(pCxt->pMnode, pTrans), NULL, _OVER);
×
1767

1768
  code = TSDB_CODE_SUCCESS;
×
1769

1770
_OVER:
×
1771
  mndTransDrop(pTrans);
×
1772
  TAOS_RETURN(code);
×
1773
}
1774

1775
static int32_t mndCreateTSMA(SCreateTSMACxt *pCxt) {
×
1776
  int32_t            code = 0;
×
1777
  SSmaObj            sma = {0};
×
1778
  SCMCreateStreamReq createStreamReq = {0};
×
1779
  SMDropStreamReq    dropStreamReq = {0};
×
1780

1781
  pCxt->pSma = &sma;
×
1782
  initSMAObj(pCxt);
×
1783

1784
  SNodeList *pProjects = NULL;
×
1785
  code = nodesStringToList(pCxt->pCreateSmaReq->expr, &pProjects);
×
1786
  if (TSDB_CODE_SUCCESS != code) {
×
1787
    goto _OVER;
×
1788
  }
1789
  pCxt->pProjects = pProjects;
×
1790

1791
  pCxt->pCreateStreamReq = &createStreamReq;
×
1792
  if (pCxt->pCreateSmaReq->pVgroupVerList) {
×
1793
    pCxt->pCreateStreamReq->pVgroupVerList = taosArrayDup(pCxt->pCreateSmaReq->pVgroupVerList, NULL);
×
1794
    if (!pCxt->pCreateStreamReq->pVgroupVerList) {
×
1795
      code = terrno;
×
1796
      goto _OVER;
×
1797
    }
1798
  }
1799
  if (LIST_LENGTH(pProjects) > 0) {
×
1800
    createStreamReq.pCols = taosArrayInit(LIST_LENGTH(pProjects), sizeof(SField));
×
1801
    if (!createStreamReq.pCols) {
×
1802
      code = terrno;
×
1803
      goto _OVER;
×
1804
    }
1805
  }
1806
  pCxt->pDropStreamReq = &dropStreamReq;
×
1807
  code = mndCreateTSMABuildCreateStreamReq(pCxt);
×
1808
  if (TSDB_CODE_SUCCESS != code) {
×
1809
    goto _OVER;
×
1810
  }
1811
  code = mndCreateTSMABuildDropStreamReq(pCxt);
×
1812
  if (TSDB_CODE_SUCCESS != code) {
×
1813
    goto _OVER;
×
1814
  }
1815

1816
  if (TSDB_CODE_SUCCESS != (code = mndCreateTSMATxnPrepare(pCxt))) {
×
1817
    goto _OVER;
×
1818
  } else {
1819
    mInfo("sma:%s, uid:%" PRIi64 " create on stb:%" PRIi64 " dstTb:%s dstVg:%d", pCxt->pCreateSmaReq->name, sma.uid,
×
1820
          sma.stbUid, sma.dstTbName, sma.dstVgId);
1821
    code = 0;
×
1822
  }
1823

1824
_OVER:
×
1825
  tFreeSCMCreateStreamReq(pCxt->pCreateStreamReq);
×
1826
  if (pCxt->pDropStreamReq) tFreeMDropStreamReq(pCxt->pDropStreamReq);
×
1827
  pCxt->pCreateStreamReq = NULL;
×
1828
  if (pProjects) nodesDestroyList(pProjects);
×
1829
  pCxt->pProjects = NULL;
×
1830
  TAOS_RETURN(code);
×
1831
}
1832

1833
static int32_t mndTSMAGenerateOutputName(const char *tsmaName, char *streamName, char *targetStbName) {
×
1834
  SName   smaName;
1835
  int32_t code = tNameFromString(&smaName, tsmaName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
×
1836
  if (TSDB_CODE_SUCCESS != code) {
×
1837
    return code;
×
1838
  }
1839
  snprintf(streamName, TSDB_TABLE_FNAME_LEN, "%d.%s", smaName.acctId, smaName.tname);
×
1840
  snprintf(targetStbName, TSDB_TABLE_FNAME_LEN, "%s"TSMA_RES_STB_POSTFIX, tsmaName);
×
1841
  return TSDB_CODE_SUCCESS;
×
1842
}
1843

1844
static int32_t mndProcessCreateTSMAReq(SRpcMsg *pReq) {
×
1845
#ifdef WINDOWS
1846
  TAOS_RETURN(TSDB_CODE_MND_INVALID_PLATFORM);
1847
#endif
1848
  SMnode        *pMnode = pReq->info.node;
×
1849
  int32_t        code = -1;
×
1850
  SDbObj        *pDb = NULL;
×
1851
  SStbObj       *pStb = NULL;
×
1852
  SSmaObj       *pSma = NULL;
×
1853
  SSmaObj       *pBaseTsma = NULL;
×
1854
  SStreamObj    *pStream = NULL;
×
1855
  int64_t        mTraceId = TRACE_GET_ROOTID(&pReq->info.traceId);
×
1856
  SMCreateSmaReq createReq = {0};
×
1857

1858
  if (sdbGetSize(pMnode->pSdb, SDB_SMA) >= tsMaxTsmaNum) {
×
1859
    code = TSDB_CODE_MND_MAX_TSMA_NUM_EXCEEDED;
×
1860
    goto _OVER;
×
1861
  }
1862

1863
  if (tDeserializeSMCreateSmaReq(pReq->pCont, pReq->contLen, &createReq) != 0) {
×
1864
    code = TSDB_CODE_INVALID_MSG;
×
1865
    goto _OVER;
×
1866
  }
1867

1868
  mInfo("start to create tsma: %s", createReq.name);
×
1869
  if ((code = mndCheckCreateSmaReq(&createReq)) != 0) goto _OVER;
×
1870

1871
  if (createReq.normSourceTbUid == 0) {
×
1872
    pStb = mndAcquireStb(pMnode, createReq.stb);
×
1873
    if (!pStb && !createReq.recursiveTsma) {
×
1874
      mError("tsma:%s, failed to create since stb:%s not exist", createReq.name, createReq.stb);
×
1875
      code = TSDB_CODE_MND_STB_NOT_EXIST;
×
1876
      goto _OVER;
×
1877
    }
1878
  }
1879

1880
  char streamName[TSDB_TABLE_FNAME_LEN] = {0};
×
1881
  char streamTargetStbFullName[TSDB_TABLE_FNAME_LEN] = {0};
×
1882
  code = mndTSMAGenerateOutputName(createReq.name, streamName, streamTargetStbFullName);
×
1883
  if (TSDB_CODE_SUCCESS != code) {
×
1884
    mInfo("tsma:%s, faield to generate name", createReq.name);
×
1885
    goto _OVER;
×
1886
  }
1887

1888
  pSma = sdbAcquire(pMnode->pSdb, SDB_SMA, createReq.name);
×
1889
  if (pSma && createReq.igExists) {
×
1890
    mInfo("tsma:%s, already exists in sma:%s, ignore exist is set", createReq.name, pSma->name);
×
1891
    code = 0;
×
1892
    goto _OVER;
×
1893
  }
1894

1895
  if (pSma) {
×
1896
    code = TSDB_CODE_MND_SMA_ALREADY_EXIST;
×
1897
    goto _OVER;
×
1898
  }
1899

1900
  SStbObj *pTargetStb = mndAcquireStb(pMnode, streamTargetStbFullName);
×
1901
  if (pTargetStb) {
×
1902
    code = TSDB_CODE_TDB_STB_ALREADY_EXIST;
×
1903
    mError("tsma: %s, failed to create since output stable already exists: %s", createReq.name,
×
1904
           streamTargetStbFullName);
1905
    goto _OVER;
×
1906
  }
1907

1908
  code = mndAcquireStream(pMnode, streamName, &pStream);
×
1909
  if (pStream != NULL || code != TSDB_CODE_MND_STREAM_NOT_EXIST) {
×
1910
    mError("tsma:%s, failed to create since stream:%s already exist", createReq.name, streamName);
×
1911
    code = TSDB_CODE_MND_SMA_ALREADY_EXIST;
×
1912
    goto _OVER;
×
1913
  }
1914

1915
  SName name = {0};
×
1916
  code = tNameFromString(&name, createReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
×
1917
  if (TSDB_CODE_SUCCESS != code) {
×
1918
    goto _OVER;
×
1919
  }
1920
  char db[TSDB_TABLE_FNAME_LEN] = {0};
×
1921
  (void)tNameGetFullDbName(&name, db);
×
1922

1923
  pDb = mndAcquireDb(pMnode, db);
×
1924
  if (pDb == NULL) {
×
1925
    code = TSDB_CODE_MND_DB_NOT_SELECTED;
×
1926
    goto _OVER;
×
1927
  }
1928

1929
  TAOS_CHECK_GOTO(mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb), NULL, _OVER);
×
1930

1931
  if (createReq.recursiveTsma) {
×
1932
    pBaseTsma = sdbAcquire(pMnode->pSdb, SDB_SMA, createReq.baseTsmaName);
×
1933
    if (!pBaseTsma) {
×
1934
      mError("base tsma: %s not found when creating recursive tsma", createReq.baseTsmaName);
×
1935
      code = TSDB_CODE_MND_SMA_NOT_EXIST;
×
1936
      goto _OVER;
×
1937
    }
1938
    if (!pStb) {
×
1939
      createReq.normSourceTbUid = pBaseTsma->stbUid;
×
1940
    }
1941
  }
1942

1943
  SCreateTSMACxt cxt = {
×
1944
      .pMnode = pMnode,
1945
      .pCreateSmaReq = &createReq,
1946
      .pCreateStreamReq = NULL,
1947
      .streamName = streamName,
1948
      .targetStbFullName = streamTargetStbFullName,
1949
      .pDb = pDb,
1950
      .pRpcReq = pReq,
1951
      .pSma = NULL,
1952
      .pBaseSma = pBaseTsma,
1953
      .pSrcStb = pStb,
1954
  };
1955

1956
  code = mndCreateTSMA(&cxt);
×
1957
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
×
1958

1959
_OVER:
×
1960
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
×
1961
    mError("tsma:%s, failed to create since %s", createReq.name, tstrerror(code));
×
1962
  }
1963

1964
  if (pStb) mndReleaseStb(pMnode, pStb);
×
1965
  if (pBaseTsma) mndReleaseSma(pMnode, pBaseTsma);
×
1966
  mndReleaseSma(pMnode, pSma);
×
1967
  mndReleaseStream(pMnode, pStream);
×
1968
  mndReleaseDb(pMnode, pDb);
×
1969
  tFreeSMCreateSmaReq(&createReq);
×
1970

1971
  TAOS_RETURN(code);
×
1972
}
1973

1974
static int32_t mndDropTSMA(SCreateTSMACxt *pCxt) {
×
1975
  int32_t      code = -1;
×
1976
  STransAction dropStreamRedoAction = {0};
×
1977
  STrans      *pTrans = mndTransCreate(pCxt->pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_TSMA, pCxt->pRpcReq, "drop-tsma");
×
1978
  if (!pTrans) {
×
1979
    code = terrno;
×
1980
    goto _OVER;
×
1981
  }
1982
  SMDropStreamReq dropStreamReq = {0};
×
1983
  pCxt->pDropStreamReq = &dropStreamReq;
×
1984
  code = mndCreateTSMABuildDropStreamReq(pCxt);
×
1985
  if (TSDB_CODE_SUCCESS != code) {
×
1986
    goto _OVER;
×
1987
  }
1988
  mndTransSetDbName(pTrans, pCxt->pDb->name, NULL);
×
1989
  if (mndTransCheckConflict(pCxt->pMnode, pTrans) != 0) goto _OVER;
×
1990
  mndTransSetSerial(pTrans);
×
1991
  mndGetMnodeEpSet(pCxt->pMnode, &dropStreamRedoAction.epSet);
×
1992
  dropStreamRedoAction.acceptableCode = TSDB_CODE_MND_STREAM_NOT_EXIST;
×
1993
  dropStreamRedoAction.msgType = TDMT_STREAM_DROP;
×
1994
  dropStreamRedoAction.contLen = tSerializeSMDropStreamReq(0, 0, pCxt->pDropStreamReq);
×
1995
  dropStreamRedoAction.pCont = taosMemoryCalloc(1, dropStreamRedoAction.contLen);
×
1996
  if (!dropStreamRedoAction.pCont) {
×
1997
    code = terrno;
×
1998
    goto _OVER;
×
1999
  }
2000
  if (dropStreamRedoAction.contLen !=
×
2001
      tSerializeSMDropStreamReq(dropStreamRedoAction.pCont, dropStreamRedoAction.contLen, pCxt->pDropStreamReq)) {
×
2002
    mError("tsma: %s, failed to drop due to drop stream req encode failure", pCxt->pDropSmaReq->name);
×
2003
    code = TSDB_CODE_INVALID_MSG;
×
2004
    goto _OVER;
×
2005
  }
2006

2007
  // output stable is not dropped when dropping stream, dropping it when dropping tsma
2008
  SMDropStbReq dropStbReq = {0};
×
2009
  dropStbReq.igNotExists = false;
×
2010
  tstrncpy(dropStbReq.name, pCxt->targetStbFullName, TSDB_TABLE_FNAME_LEN);
×
2011
  dropStbReq.sql = "drop";
×
2012
  dropStbReq.sqlLen = 5;
×
2013

2014
  STransAction dropStbRedoAction = {0};
×
2015
  mndGetMnodeEpSet(pCxt->pMnode, &dropStbRedoAction.epSet);
×
2016
  dropStbRedoAction.acceptableCode = TSDB_CODE_MND_STB_NOT_EXIST;
×
2017
  dropStbRedoAction.msgType = TDMT_MND_STB_DROP;
×
2018
  dropStbRedoAction.contLen = tSerializeSMDropStbReq(0, 0, &dropStbReq);
×
2019
  dropStbRedoAction.pCont = taosMemoryCalloc(1, dropStbRedoAction.contLen);
×
2020
  if (!dropStbRedoAction.pCont) {
×
2021
    code = terrno;
×
2022
    goto _OVER;
×
2023
  }
2024
  if (dropStbRedoAction.contLen !=
×
2025
      tSerializeSMDropStbReq(dropStbRedoAction.pCont, dropStbRedoAction.contLen, &dropStbReq)) {
×
2026
    mError("tsma: %s, failedto drop due to drop stb req encode failure", pCxt->pDropSmaReq->name);
×
2027
    code = TSDB_CODE_INVALID_MSG;
×
2028
    goto _OVER;
×
2029
  }
2030

2031
  SDbObj newDb = {0};
×
2032
  memcpy(&newDb, pCxt->pDb, sizeof(SDbObj));
×
2033
  newDb.tsmaVersion++;
×
2034
  TAOS_CHECK_GOTO(mndSetDropSmaRedoLogs(pCxt->pMnode, pTrans, pCxt->pSma), NULL, _OVER);
×
2035
  TAOS_CHECK_GOTO(mndSetDropSmaCommitLogs(pCxt->pMnode, pTrans, pCxt->pSma), NULL, _OVER);
×
2036
  TAOS_CHECK_GOTO(mndTransAppendRedoAction(pTrans, &dropStreamRedoAction), NULL, _OVER);
×
2037
  TAOS_CHECK_GOTO(mndTransAppendRedoAction(pTrans, &dropStbRedoAction), NULL, _OVER);
×
2038
  TAOS_CHECK_GOTO(mndSetUpdateDbTsmaVersionPrepareLogs(pCxt->pMnode, pTrans, pCxt->pDb, &newDb), NULL, _OVER);
×
2039
  TAOS_CHECK_GOTO(mndSetUpdateDbTsmaVersionCommitLogs(pCxt->pMnode, pTrans, pCxt->pDb, &newDb), NULL, _OVER);
×
2040
  TAOS_CHECK_GOTO(mndTransPrepare(pCxt->pMnode, pTrans), NULL, _OVER);
×
2041
  code = TSDB_CODE_SUCCESS;
×
2042
_OVER:
×
2043
  tFreeMDropStreamReq(pCxt->pDropStreamReq);
×
2044
  mndTransDrop(pTrans);
×
2045
  TAOS_RETURN(code);
×
2046
}
2047

2048
static bool hasRecursiveTsmasBasedOnMe(SMnode *pMnode, const SSmaObj *pSma) {
×
2049
  SSmaObj *pSmaObj = NULL;
×
2050
  void    *pIter = NULL;
×
2051
  while (1) {
2052
    pIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter, (void **)&pSmaObj);
×
2053
    if (pIter == NULL) break;
×
2054
    if (0 == strncmp(pSmaObj->baseSmaName, pSma->name, TSDB_TABLE_FNAME_LEN)) {
×
2055
      sdbRelease(pMnode->pSdb, pSmaObj);
×
2056
      sdbCancelFetch(pMnode->pSdb, pIter);
×
2057
      return true;
×
2058
    }
2059
    sdbRelease(pMnode->pSdb, pSmaObj);
×
2060
  }
2061
  return false;
×
2062
}
2063

2064
static int32_t mndProcessDropTSMAReq(SRpcMsg *pReq) {
×
2065
  int32_t      code = -1;
×
2066
  SMDropSmaReq dropReq = {0};
×
2067
  SSmaObj     *pSma = NULL;
×
2068
  SDbObj      *pDb = NULL;
×
2069
  SMnode      *pMnode = pReq->info.node;
×
2070
  if (tDeserializeSMDropSmaReq(pReq->pCont, pReq->contLen, &dropReq) != TSDB_CODE_SUCCESS) {
×
2071
    code = TSDB_CODE_INVALID_MSG;
×
2072
    goto _OVER;
×
2073
  }
2074

2075
  char streamName[TSDB_TABLE_FNAME_LEN] = {0};
×
2076
  char streamTargetStbFullName[TSDB_TABLE_FNAME_LEN] = {0};
×
2077
  code = mndTSMAGenerateOutputName(dropReq.name, streamName, streamTargetStbFullName);
×
2078
  if (TSDB_CODE_SUCCESS != code) {
×
2079
    goto _OVER;
×
2080
  }
2081

2082
  SStbObj *pStb = mndAcquireStb(pMnode, streamTargetStbFullName);
×
2083

2084
  pSma = mndAcquireSma(pMnode, dropReq.name);
×
2085
  if (!pSma && dropReq.igNotExists) {
×
2086
    code = 0;
×
2087
    goto _OVER;
×
2088
  }
2089
  if (!pSma) {
×
2090
    code = TSDB_CODE_MND_SMA_NOT_EXIST;
×
2091
    goto _OVER;
×
2092
  }
2093
  SName name = {0};
×
2094
  code = tNameFromString(&name, dropReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
×
2095
  if (TSDB_CODE_SUCCESS != code) {
×
2096
    goto _OVER;
×
2097
  }
2098
  char db[TSDB_TABLE_FNAME_LEN] = {0};
×
2099
  (void)tNameGetFullDbName(&name, db);
×
2100

2101
  pDb = mndAcquireDb(pMnode, db);
×
2102
  if (!pDb) {
×
2103
    code = TSDB_CODE_MND_DB_NOT_EXIST;
×
2104
    goto _OVER;
×
2105
  }
2106

2107
  if ((code = mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb)) != 0) {
×
2108
    goto _OVER;
×
2109
  }
2110

2111
  if (hasRecursiveTsmasBasedOnMe(pMnode, pSma)) {
×
2112
    code = TSDB_CODE_MND_INVALID_DROP_TSMA;
×
2113
    goto _OVER;
×
2114
  }
2115

2116
  SCreateTSMACxt cxt = {
×
2117
      .pDb = pDb,
2118
      .pMnode = pMnode,
2119
      .pRpcReq = pReq,
2120
      .pSma = pSma,
2121
      .streamName = streamName,
2122
      .targetStbFullName = streamTargetStbFullName,
2123
      .pDropSmaReq = &dropReq,
2124
  };
2125

2126
  code = mndDropTSMA(&cxt);
×
2127

2128
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
×
2129
_OVER:
×
2130

2131
  mndReleaseStb(pMnode, pStb);
×
2132
  mndReleaseSma(pMnode, pSma);
×
2133
  mndReleaseDb(pMnode, pDb);
×
2134
  TAOS_RETURN(code);
×
2135
}
2136

2137
static int32_t mndRetrieveTSMA(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
×
2138
  SDbObj          *pDb = NULL;
×
2139
  int32_t          numOfRows = 0;
×
2140
  SSmaObj         *pSma = NULL;
×
2141
  SMnode          *pMnode = pReq->info.node;
×
2142
  int32_t          code = 0;
×
2143
  SColumnInfoData *pColInfo;
2144
  if (pShow->pIter == NULL) {
×
2145
    pShow->pIter = taosMemoryCalloc(1, sizeof(SSmaAndTagIter));
×
2146
  }
2147
  if (!pShow->pIter) {
×
2148
    return terrno;
×
2149
  }
2150
  if (pShow->db[0]) {
×
2151
    pDb = mndAcquireDb(pMnode, pShow->db);
×
2152
  }
2153
  SSmaAndTagIter *pIter = pShow->pIter;
×
2154
  while (numOfRows < rows) {
×
2155
    pIter->pSmaIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter->pSmaIter, (void **)&pSma);
×
2156
    if (pIter->pSmaIter == NULL) break;
×
2157
    SDbObj *pSrcDb = mndAcquireDb(pMnode, pSma->db);
×
2158

2159
    if ((pDb && pSma->dbUid != pDb->uid) || !pSrcDb) {
×
2160
      sdbRelease(pMnode->pSdb, pSma);
×
2161
      if (pSrcDb) mndReleaseDb(pMnode, pSrcDb);
×
2162
      continue;
×
2163
    }
2164

2165
    int32_t cols = 0;
×
2166
    SName   n = {0};
×
2167

2168
    code = tNameFromString(&n, pSma->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
×
2169
    char smaName[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
×
2170
    if (TSDB_CODE_SUCCESS == code) {
×
2171
      STR_TO_VARSTR(smaName, (char *)tNameGetTableName(&n));
×
2172
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
2173
      code = colDataSetVal(pColInfo, numOfRows, (const char *)smaName, false);
×
2174
    }
2175

2176
    char db[TSDB_DB_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
×
2177
    if (TSDB_CODE_SUCCESS == code) {
×
2178
      STR_TO_VARSTR(db, (char *)mndGetDbStr(pSma->db));
×
2179
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
2180
      code = colDataSetVal(pColInfo, numOfRows, (const char *)db, false);
×
2181
    }
2182

2183
    if (TSDB_CODE_SUCCESS == code) {
×
2184
      code = tNameFromString(&n, pSma->stb, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
×
2185
    }
2186
    char srcTb[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
×
2187
    if (TSDB_CODE_SUCCESS == code) {
×
2188
      STR_TO_VARSTR(srcTb, (char *)tNameGetTableName(&n));
×
2189
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
2190
      code = colDataSetVal(pColInfo, numOfRows, (const char *)srcTb, false);
×
2191
    }
2192

2193
    if (TSDB_CODE_SUCCESS == code) {
×
2194
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
2195
      code = colDataSetVal(pColInfo, numOfRows, (const char *)db, false);
×
2196
    }
2197

2198
    if (TSDB_CODE_SUCCESS == code) {
×
2199
      code = tNameFromString(&n, pSma->dstTbName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
×
2200
    }
2201

2202
    if (TSDB_CODE_SUCCESS == code) {
×
2203
      char targetTb[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
×
2204
      STR_TO_VARSTR(targetTb, (char *)tNameGetTableName(&n));
×
2205
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
2206
      code = colDataSetVal(pColInfo, numOfRows, (const char *)targetTb, false);
×
2207
    }
2208

2209
    if (TSDB_CODE_SUCCESS == code) {
×
2210
      // stream name
2211
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
2212
      code = colDataSetVal(pColInfo, numOfRows, (const char *)smaName, false);
×
2213
    }
2214

2215
    if (TSDB_CODE_SUCCESS == code) {
×
2216
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
2217
      code = colDataSetVal(pColInfo, numOfRows, (const char *)(&pSma->createdTime), false);
×
2218
    }
2219

2220
    // interval
2221
    char    interval[64 + VARSTR_HEADER_SIZE] = {0};
×
2222
    int32_t len = 0;
×
2223
    if (TSDB_CODE_SUCCESS == code) {
×
2224
      if (!IS_CALENDAR_TIME_DURATION(pSma->intervalUnit)) {
×
2225
        len = tsnprintf(interval + VARSTR_HEADER_SIZE, 64, "%" PRId64 "%c", pSma->interval,
×
2226
                        getPrecisionUnit(pSrcDb->cfg.precision));
×
2227
      } else {
2228
        len = tsnprintf(interval + VARSTR_HEADER_SIZE, 64, "%" PRId64 "%c", pSma->interval, pSma->intervalUnit);
×
2229
      }
2230
      varDataSetLen(interval, len);
×
2231
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
2232
      code = colDataSetVal(pColInfo, numOfRows, interval, false);
×
2233
    }
2234

2235
    char buf[TSDB_MAX_SAVED_SQL_LEN + VARSTR_HEADER_SIZE] = {0};
×
2236
    if (TSDB_CODE_SUCCESS == code) {
×
2237
      // create sql
2238
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
2239
      len = tsnprintf(buf + VARSTR_HEADER_SIZE, TSDB_MAX_SAVED_SQL_LEN, "%s", pSma->sql);
×
2240
      varDataSetLen(buf, TMIN(len, TSDB_MAX_SAVED_SQL_LEN));
×
2241
      code = colDataSetVal(pColInfo, numOfRows, buf, false);
×
2242
    }
2243

2244
    // func list
2245
    len = 0;
×
2246
    SNode *pNode = NULL, *pFunc = NULL;
×
2247
    if (TSDB_CODE_SUCCESS == code) {
×
2248
      code = nodesStringToNode(pSma->ast, &pNode);
×
2249
    }
2250
    if (TSDB_CODE_SUCCESS == code) {
×
2251
      char *start = buf + VARSTR_HEADER_SIZE;
×
2252
      FOREACH(pFunc, ((SSelectStmt *)pNode)->pProjectionList) {
×
2253
        if (nodeType(pFunc) == QUERY_NODE_FUNCTION) {
×
2254
          SFunctionNode *pFuncNode = (SFunctionNode *)pFunc;
×
2255
          if (!fmIsTSMASupportedFunc(pFuncNode->funcId)) continue;
×
2256
          len += tsnprintf(start, TSDB_MAX_SAVED_SQL_LEN - len, "%s%s", start != buf + VARSTR_HEADER_SIZE ? "," : "",
×
2257
                           ((SExprNode *)pFunc)->userAlias);
×
2258
          if (len >= TSDB_MAX_SAVED_SQL_LEN) {
×
2259
            len = TSDB_MAX_SAVED_SQL_LEN;
×
2260
            break;
×
2261
          }
2262
          start = buf + VARSTR_HEADER_SIZE + len;
×
2263
        }
2264
      }
2265
      nodesDestroyNode(pNode);
×
2266
    }
2267

2268
    if (TSDB_CODE_SUCCESS == code) {
×
2269
      varDataSetLen(buf, len);
×
2270
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
2271
      code = colDataSetVal(pColInfo, numOfRows, buf, false);
×
2272
    }
2273

2274
    numOfRows++;
×
2275
    mndReleaseSma(pMnode, pSma);
×
2276
    mndReleaseDb(pMnode, pSrcDb);
×
2277
    if (TSDB_CODE_SUCCESS != code) {
×
2278
      sdbCancelFetch(pMnode->pSdb, pIter->pSmaIter);
×
2279
      numOfRows = code;
×
2280
      break;
×
2281
    }
2282
  }
2283
  mndReleaseDb(pMnode, pDb);
×
2284
  pShow->numOfRows += numOfRows;
×
2285
  if (numOfRows < rows) {
×
2286
    taosMemoryFree(pShow->pIter);
×
2287
    pShow->pIter = NULL;
×
2288
  }
2289
  return numOfRows;
×
2290
}
2291

2292
static void mndCancelRetrieveTSMA(SMnode *pMnode, void *pIter) {
×
2293
  SSmaAndTagIter *p = pIter;
×
2294
  if (p != NULL) {
×
2295
    SSdb *pSdb = pMnode->pSdb;
×
2296
    sdbCancelFetchByType(pSdb, p->pSmaIter, SDB_SMA);
×
2297
  }
2298
  taosMemoryFree(p);
×
2299
}
×
2300

2301
int32_t dumpTSMAInfoFromSmaObj(const SSmaObj *pSma, const SStbObj *pDestStb, STableTSMAInfo *pInfo,
×
2302
                               const SSmaObj *pBaseTsma) {
2303
  int32_t code = 0;
×
2304
  pInfo->interval = pSma->interval;
×
2305
  pInfo->unit = pSma->intervalUnit;
×
2306
  pInfo->tsmaId = pSma->uid;
×
2307
  pInfo->version = pSma->version;
×
2308
  pInfo->tsmaId = pSma->uid;
×
2309
  pInfo->destTbUid = pDestStb->uid;
×
2310
  SName sName = {0};
×
2311
  code = tNameFromString(&sName, pSma->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
×
2312
  if (TSDB_CODE_SUCCESS != code) {
×
2313
    return code;
×
2314
  }
2315
  tstrncpy(pInfo->name, sName.tname, TSDB_TABLE_NAME_LEN);
×
2316
  tstrncpy(pInfo->targetDbFName, pSma->db, TSDB_DB_FNAME_LEN);
×
2317
  code = tNameFromString(&sName, pSma->dstTbName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
×
2318
  if (TSDB_CODE_SUCCESS != code) {
×
2319
    return code;
×
2320
  }
2321
  tstrncpy(pInfo->targetTb, sName.tname, TSDB_TABLE_NAME_LEN);
×
2322
  tstrncpy(pInfo->dbFName, pSma->db, TSDB_DB_FNAME_LEN);
×
2323
  code = tNameFromString(&sName, pSma->stb, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
×
2324
  if (TSDB_CODE_SUCCESS != code) {
×
2325
    return code;
×
2326
  }
2327
  tstrncpy(pInfo->tb, sName.tname, TSDB_TABLE_NAME_LEN);
×
2328
  pInfo->pFuncs = taosArrayInit(8, sizeof(STableTSMAFuncInfo));
×
2329
  if (!pInfo->pFuncs) return TSDB_CODE_OUT_OF_MEMORY;
×
2330

2331
  SNode *pNode, *pFunc;
2332
  if (TSDB_CODE_SUCCESS != nodesStringToNode(pBaseTsma ? pBaseTsma->ast : pSma->ast, &pNode)) {
×
2333
    taosArrayDestroy(pInfo->pFuncs);
×
2334
    pInfo->pFuncs = NULL;
×
2335
    return TSDB_CODE_TSMA_INVALID_STAT;
×
2336
  }
2337
  if (pNode) {
×
2338
    SSelectStmt *pSelect = (SSelectStmt *)pNode;
×
2339
    FOREACH(pFunc, pSelect->pProjectionList) {
×
2340
      STableTSMAFuncInfo funcInfo = {0};
×
2341
      SFunctionNode     *pFuncNode = (SFunctionNode *)pFunc;
×
2342
      if (!fmIsTSMASupportedFunc(pFuncNode->funcId)) continue;
×
2343
      funcInfo.funcId = pFuncNode->funcId;
×
2344
      funcInfo.colId = ((SColumnNode *)pFuncNode->pParameterList->pHead->pNode)->colId;
×
2345
      if (!taosArrayPush(pInfo->pFuncs, &funcInfo)) {
×
2346
        code = terrno;
×
2347
        taosArrayDestroy(pInfo->pFuncs);
×
2348
        nodesDestroyNode(pNode);
×
2349
        return code;
×
2350
      }
2351
    }
2352
    nodesDestroyNode(pNode);
×
2353
  }
2354
  pInfo->ast = taosStrdup(pSma->ast);
×
2355
  if (!pInfo->ast) code = terrno;
×
2356

2357
  if (code == TSDB_CODE_SUCCESS && pDestStb->numOfTags > 0) {
×
2358
    pInfo->pTags = taosArrayInit(pDestStb->numOfTags, sizeof(SSchema));
×
2359
    if (!pInfo->pTags) {
×
2360
      code = terrno;
×
2361
    } else {
2362
      for (int32_t i = 0; i < pDestStb->numOfTags; ++i) {
×
2363
        if (NULL == taosArrayPush(pInfo->pTags, &pDestStb->pTags[i])) {
×
2364
          code = terrno;
×
2365
          break;
×
2366
        }
2367
      }
2368
    }
2369
  }
2370
  if (code == TSDB_CODE_SUCCESS) {
×
2371
    pInfo->pUsedCols = taosArrayInit(pDestStb->numOfColumns - 3, sizeof(SSchema));
×
2372
    if (!pInfo->pUsedCols)
×
2373
      code = terrno;
×
2374
    else {
2375
      // skip _wstart, _wend, _duration
2376
      for (int32_t i = 1; i < pDestStb->numOfColumns - 2; ++i) {
×
2377
        if (NULL == taosArrayPush(pInfo->pUsedCols, &pDestStb->pColumns[i])) {
×
2378
          code = terrno;
×
2379
          break;
×
2380
        }
2381
      }
2382
    }
2383
  }
2384
  TAOS_RETURN(code);
×
2385
}
2386

2387
// @note remember to mndReleaseSma(*ppOut)
2388
static int32_t mndGetDeepestBaseForTsma(SMnode *pMnode, SSmaObj *pSma, SSmaObj **ppOut) {
×
2389
  int32_t  code = 0;
×
2390
  SSmaObj *pRecursiveTsma = NULL;
×
2391
  if (pSma->baseSmaName[0]) {
×
2392
    pRecursiveTsma = mndAcquireSma(pMnode, pSma->baseSmaName);
×
2393
    if (!pRecursiveTsma) {
×
2394
      mError("base tsma: %s for tsma: %s not found", pSma->baseSmaName, pSma->name);
×
2395
      return TSDB_CODE_MND_SMA_NOT_EXIST;
×
2396
    }
2397
    while (pRecursiveTsma->baseSmaName[0]) {
×
2398
      SSmaObj *pTmpSma = pRecursiveTsma;
×
2399
      pRecursiveTsma = mndAcquireSma(pMnode, pTmpSma->baseSmaName);
×
2400
      if (!pRecursiveTsma) {
×
2401
        mError("base tsma: %s for tsma: %s not found", pTmpSma->baseSmaName, pTmpSma->name);
×
2402
        mndReleaseSma(pMnode, pTmpSma);
×
2403
        return TSDB_CODE_MND_SMA_NOT_EXIST;
×
2404
      }
2405
      mndReleaseSma(pMnode, pTmpSma);
×
2406
    }
2407
  }
2408
  *ppOut = pRecursiveTsma;
×
2409
  return code;
×
2410
}
2411

2412
static int32_t mndGetTSMA(SMnode *pMnode, char *tsmaFName, STableTSMAInfoRsp *rsp, bool *exist) {
×
2413
  int32_t  code = -1;
×
2414
  SSmaObj *pSma = NULL;
×
2415
  SSmaObj *pBaseTsma = NULL;
×
2416
  SStbObj *pDstStb = NULL;
×
2417

2418
  pSma = sdbAcquire(pMnode->pSdb, SDB_SMA, tsmaFName);
×
2419
  if (pSma) {
×
2420
    pDstStb = mndAcquireStb(pMnode, pSma->dstTbName);
×
2421
    if (!pDstStb) {
×
2422
      sdbRelease(pMnode->pSdb, pSma);
×
2423
      return TSDB_CODE_SUCCESS;
×
2424
    }
2425

2426
    STableTSMAInfo *pTsma = taosMemoryCalloc(1, sizeof(STableTSMAInfo));
×
2427
    if (!pTsma) {
×
2428
      code = terrno;
×
2429
      sdbRelease(pMnode->pSdb, pSma);
×
2430
      mndReleaseStb(pMnode, pDstStb);
×
2431
      TAOS_RETURN(code);
×
2432
    }
2433

2434
    code = mndGetDeepestBaseForTsma(pMnode, pSma, &pBaseTsma);
×
2435
    if (code == 0) {
×
2436
      code = dumpTSMAInfoFromSmaObj(pSma, pDstStb, pTsma, pBaseTsma);
×
2437
    }
2438
    mndReleaseStb(pMnode, pDstStb);
×
2439
    sdbRelease(pMnode->pSdb, pSma);
×
2440
    if (pBaseTsma) mndReleaseSma(pMnode, pBaseTsma);
×
2441
    if (terrno) {
×
2442
      tFreeAndClearTableTSMAInfo(pTsma);
×
2443
      TAOS_RETURN(code);
×
2444
    }
2445
    if (NULL == taosArrayPush(rsp->pTsmas, &pTsma)) {
×
2446
      code = terrno;
×
2447
      tFreeAndClearTableTSMAInfo(pTsma);
×
2448
    }
2449
    *exist = true;
×
2450
  }
2451
  TAOS_RETURN(code);
×
2452
}
2453

2454
typedef bool (*tsmaFilter)(const SSmaObj *pSma, void *param);
2455

2456
static int32_t mndGetSomeTsmas(SMnode *pMnode, STableTSMAInfoRsp *pRsp, tsmaFilter filtered, void *param, bool *exist) {
×
2457
  int32_t     code = 0;
×
2458
  SSmaObj    *pSma = NULL;
×
2459
  SSmaObj    *pBaseTsma = NULL;
×
2460
  SSdb       *pSdb = pMnode->pSdb;
×
2461
  void       *pIter = NULL;
×
2462
  SStreamObj *pStream = NULL;
×
2463
  SStbObj    *pStb = NULL;
×
2464
  bool        shouldRetry = false;
×
2465

2466
  while (1) {
×
2467
    pIter = sdbFetch(pSdb, SDB_SMA, pIter, (void **)&pSma);
×
2468
    if (pIter == NULL) break;
×
2469

2470
    if (filtered(pSma, param)) {
×
2471
      sdbRelease(pSdb, pSma);
×
2472
      continue;
×
2473
    }
2474

2475
    pStb = mndAcquireStb(pMnode, pSma->dstTbName);
×
2476
    if (!pStb) {
×
2477
      sdbRelease(pSdb, pSma);
×
2478
      shouldRetry = true;
×
2479
      continue;
×
2480
    }
2481

2482
    SName smaName;
2483
    char  streamName[TSDB_TABLE_FNAME_LEN] = {0};
×
2484
    code = tNameFromString(&smaName, pSma->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
×
2485
    if (TSDB_CODE_SUCCESS != code) {
×
2486
      sdbRelease(pSdb, pSma);
×
2487
      mndReleaseStb(pMnode, pStb);
×
2488
      TAOS_RETURN(code);
×
2489
    }
2490
    snprintf(streamName, TSDB_TABLE_FNAME_LEN, "%d.%s", smaName.acctId, smaName.tname);
×
2491
    pStream = NULL;
×
2492

2493
    code = mndAcquireStream(pMnode, streamName, &pStream);
×
2494
    if (!pStream) {
×
2495
      shouldRetry = true;
×
2496
      sdbRelease(pSdb, pSma);
×
2497
      mndReleaseStb(pMnode, pStb);
×
2498
      continue;
×
2499
    }
2500
    if (code != 0) {
×
2501
      sdbRelease(pSdb, pSma);
×
2502
      mndReleaseStb(pMnode, pStb);
×
2503
      TAOS_RETURN(code);
×
2504
    }
2505

2506
    int64_t streamId = pStream->uid;
×
2507
    mndReleaseStream(pMnode, pStream);
×
2508

2509
    STableTSMAInfo *pTsma = taosMemoryCalloc(1, sizeof(STableTSMAInfo));
×
2510
    if (!pTsma) {
×
2511
      code = terrno;
×
2512
      mndReleaseStb(pMnode, pStb);
×
2513
      sdbRelease(pSdb, pSma);
×
2514
      sdbCancelFetch(pSdb, pIter);
×
2515
      TAOS_RETURN(code);
×
2516
    }
2517
    pTsma->streamUid = streamId;
×
2518

2519
    code = mndGetDeepestBaseForTsma(pMnode, pSma, &pBaseTsma);
×
2520
    if (code == 0) {
×
2521
      code = dumpTSMAInfoFromSmaObj(pSma, pStb, pTsma, pBaseTsma);
×
2522
    }
2523
    mndReleaseStb(pMnode, pStb);
×
2524
    sdbRelease(pSdb, pSma);
×
2525
    if (pBaseTsma) mndReleaseSma(pMnode, pBaseTsma);
×
2526
    if (terrno) {
×
2527
      tFreeAndClearTableTSMAInfo(pTsma);
×
2528
      sdbCancelFetch(pSdb, pIter);
×
2529
      TAOS_RETURN(code);
×
2530
    }
2531
    if (NULL == taosArrayPush(pRsp->pTsmas, &pTsma)) {
×
2532
      code = terrno;
×
2533
      tFreeAndClearTableTSMAInfo(pTsma);
×
2534
      sdbCancelFetch(pSdb, pIter);
×
2535
      TAOS_RETURN(code);
×
2536
    }
2537
    *exist = true;
×
2538
  }
2539
  if (shouldRetry) {
×
2540
    return TSDB_CODE_NEED_RETRY;
×
2541
  }
2542
  return TSDB_CODE_SUCCESS;
×
2543
}
2544

2545
static bool tsmaTbFilter(const SSmaObj *pSma, void *param) {
×
2546
  const char *tbFName = param;
×
2547
  return pSma->stb[0] != tbFName[0] || strcmp(pSma->stb, tbFName) != 0;
×
2548
}
2549

2550
static int32_t mndGetTableTSMA(SMnode *pMnode, char *tbFName, STableTSMAInfoRsp *pRsp, bool *exist) {
×
2551
  return mndGetSomeTsmas(pMnode, pRsp, tsmaTbFilter, tbFName, exist);
×
2552
}
2553

2554
static bool tsmaDbFilter(const SSmaObj *pSma, void *param) {
×
2555
  uint64_t *dbUid = param;
×
2556
  return pSma->dbUid != *dbUid;
×
2557
}
2558

2559
int32_t mndGetDbTsmas(SMnode *pMnode, const char *dbFName, uint64_t dbUid, STableTSMAInfoRsp *pRsp, bool *exist) {
×
2560
  return mndGetSomeTsmas(pMnode, pRsp, tsmaDbFilter, &dbUid, exist);
×
2561
}
2562

2563
static int32_t mndProcessGetTbTSMAReq(SRpcMsg *pReq) {
×
2564
  STableTSMAInfoRsp rsp = {0};
×
2565
  int32_t           code = -1;
×
2566
  STableTSMAInfoReq tsmaReq = {0};
×
2567
  bool              exist = false;
×
2568
  SMnode           *pMnode = pReq->info.node;
×
2569

2570
  TAOS_CHECK_GOTO(tDeserializeTableTSMAInfoReq(pReq->pCont, pReq->contLen, &tsmaReq), NULL, _OVER);
×
2571

2572
  rsp.pTsmas = taosArrayInit(4, POINTER_BYTES);
×
2573
  if (NULL == rsp.pTsmas) {
×
2574
    code = terrno;
×
2575
    goto _OVER;
×
2576
  }
2577

2578
  if (tsmaReq.fetchingWithTsmaName) {
×
2579
    code = mndGetTSMA(pMnode, tsmaReq.name, &rsp, &exist);
×
2580
  } else {
2581
    code = mndGetTableTSMA(pMnode, tsmaReq.name, &rsp, &exist);
×
2582
    if (TSDB_CODE_NEED_RETRY == code) {
×
2583
      code = TSDB_CODE_SUCCESS;
×
2584
    }
2585
  }
2586
  if (code) {
×
2587
    goto _OVER;
×
2588
  }
2589

2590
  if (!exist) {
×
2591
    code = TSDB_CODE_MND_SMA_NOT_EXIST;
×
2592
  } else {
2593
    int32_t contLen = tSerializeTableTSMAInfoRsp(NULL, 0, &rsp);
×
2594
    void   *pRsp = rpcMallocCont(contLen);
×
2595
    if (pRsp == NULL) {
×
2596
      code = terrno;
×
2597
      goto _OVER;
×
2598
    }
2599

2600
    int32_t len = tSerializeTableTSMAInfoRsp(pRsp, contLen, &rsp);
×
2601
    if (len < 0) {
×
2602
      code = terrno;
×
2603
      goto _OVER;
×
2604
    }
2605

2606
    pReq->info.rsp = pRsp;
×
2607
    pReq->info.rspLen = contLen;
×
2608

2609
    code = 0;
×
2610
  }
2611

2612
_OVER:
×
2613
  tFreeTableTSMAInfoRsp(&rsp);
×
2614
  TAOS_RETURN(code);
×
2615
}
2616

2617
static int32_t mkNonExistTSMAInfo(const STSMAVersion *pTsmaVer, STableTSMAInfo **ppTsma) {
×
2618
  STableTSMAInfo *pInfo = taosMemoryCalloc(1, sizeof(STableTSMAInfo));
×
2619
  if (!pInfo) {
×
2620
    return terrno;
×
2621
  }
2622
  pInfo->pFuncs = NULL;
×
2623
  pInfo->tsmaId = pTsmaVer->tsmaId;
×
2624
  tstrncpy(pInfo->dbFName, pTsmaVer->dbFName, TSDB_DB_FNAME_LEN);
×
2625
  tstrncpy(pInfo->tb, pTsmaVer->tbName, TSDB_TABLE_NAME_LEN);
×
2626
  tstrncpy(pInfo->name, pTsmaVer->name, TSDB_TABLE_NAME_LEN);
×
2627
  pInfo->dbId = pTsmaVer->dbId;
×
2628
  pInfo->ast = taosMemoryCalloc(1, 1);
×
2629
  if (!pInfo->ast) {
×
2630
    taosMemoryFree(pInfo);
×
2631
    return terrno;
×
2632
  }
2633
  *ppTsma = pInfo;
×
2634
  return TSDB_CODE_SUCCESS;
×
2635
}
2636

2637
int32_t mndValidateTSMAInfo(SMnode *pMnode, STSMAVersion *pTsmaVersions, int32_t numOfTsmas, void **ppRsp,
×
2638
                            int32_t *pRspLen) {
2639
  int32_t         code = -1;
×
2640
  STSMAHbRsp      hbRsp = {0};
×
2641
  int32_t         rspLen = 0;
×
2642
  void           *pRsp = NULL;
×
2643
  char            tsmaFName[TSDB_TABLE_FNAME_LEN] = {0};
×
2644
  STableTSMAInfo *pTsmaInfo = NULL;
×
2645

2646
  hbRsp.pTsmas = taosArrayInit(numOfTsmas, POINTER_BYTES);
×
2647
  if (!hbRsp.pTsmas) {
×
2648
    code = terrno;
×
2649
    TAOS_RETURN(code);
×
2650
  }
2651

2652
  for (int32_t i = 0; i < numOfTsmas; ++i) {
×
2653
    STSMAVersion *pTsmaVer = &pTsmaVersions[i];
×
2654
    pTsmaVer->dbId = be64toh(pTsmaVer->dbId);
×
2655
    pTsmaVer->tsmaId = be64toh(pTsmaVer->tsmaId);
×
2656
    pTsmaVer->version = ntohl(pTsmaVer->version);
×
2657

2658
    snprintf(tsmaFName, sizeof(tsmaFName), "%s.%s", pTsmaVer->dbFName, pTsmaVer->name);
×
2659
    SSmaObj *pSma = mndAcquireSma(pMnode, tsmaFName);
×
2660
    if (!pSma) {
×
2661
      code = mkNonExistTSMAInfo(pTsmaVer, &pTsmaInfo);
×
2662
      if (code) goto _OVER;
×
2663
      if (NULL == taosArrayPush(hbRsp.pTsmas, &pTsmaInfo)) {
×
2664
        code = terrno;
×
2665
        tFreeAndClearTableTSMAInfo(pTsmaInfo);
×
2666
        goto _OVER;
×
2667
      }
2668
      continue;
×
2669
    }
2670

2671
    if (pSma->uid != pTsmaVer->tsmaId) {
×
2672
      mDebug("tsma: %s.%" PRIx64 " tsmaId mismatch with current %" PRIx64, tsmaFName, pTsmaVer->tsmaId, pSma->uid);
×
2673
      code = mkNonExistTSMAInfo(pTsmaVer, &pTsmaInfo);
×
2674
      mndReleaseSma(pMnode, pSma);
×
2675
      if (code) goto _OVER;
×
2676
      if (NULL == taosArrayPush(hbRsp.pTsmas, &pTsmaInfo)) {
×
2677
        code = terrno;
×
2678
        tFreeAndClearTableTSMAInfo(pTsmaInfo);
×
2679
        goto _OVER;
×
2680
      }
2681
      continue;
×
2682
    } else if (pSma->version == pTsmaVer->version) {
×
2683
      mndReleaseSma(pMnode, pSma);
×
2684
      continue;
×
2685
    }
2686

2687
    SStbObj *pDestStb = mndAcquireStb(pMnode, pSma->dstTbName);
×
2688
    if (!pDestStb) {
×
2689
      mInfo("tsma: %s.%" PRIx64 " dest stb: %s not found, maybe dropped", tsmaFName, pTsmaVer->tsmaId, pSma->dstTbName);
×
2690
      code = mkNonExistTSMAInfo(pTsmaVer, &pTsmaInfo);
×
2691
      mndReleaseSma(pMnode, pSma);
×
2692
      if (code) goto _OVER;
×
2693
      if (NULL == taosArrayPush(hbRsp.pTsmas, &pTsmaInfo)) {
×
2694
        code = terrno;
×
2695
        tFreeAndClearTableTSMAInfo(pTsmaInfo);
×
2696
        goto _OVER;
×
2697
      }
2698
      continue;
×
2699
    }
2700

2701
    // dump smaObj into rsp
2702
    STableTSMAInfo *pInfo = NULL;
×
2703
    pInfo = taosMemoryCalloc(1, sizeof(STableTSMAInfo));
×
2704
    if (!pInfo) {
×
2705
      code = terrno;
×
2706
      mndReleaseSma(pMnode, pSma);
×
2707
      mndReleaseStb(pMnode, pDestStb);
×
2708
      goto _OVER;
×
2709
    }
2710

2711
    SSmaObj *pBaseSma = NULL;
×
2712
    code = mndGetDeepestBaseForTsma(pMnode, pSma, &pBaseSma);
×
2713
    if (code == 0) code = dumpTSMAInfoFromSmaObj(pSma, pDestStb, pInfo, pBaseSma);
×
2714

2715
    mndReleaseStb(pMnode, pDestStb);
×
2716
    mndReleaseSma(pMnode, pSma);
×
2717
    if (pBaseSma) mndReleaseSma(pMnode, pBaseSma);
×
2718
    if (terrno) {
×
2719
      tFreeAndClearTableTSMAInfo(pInfo);
×
2720
      goto _OVER;
×
2721
    }
2722

2723
    if (NULL == taosArrayPush(hbRsp.pTsmas, pInfo)) {
×
2724
      code = terrno;
×
2725
      tFreeAndClearTableTSMAInfo(pInfo);
×
2726
      goto _OVER;
×
2727
    }
2728
  }
2729

2730
  rspLen = tSerializeTSMAHbRsp(NULL, 0, &hbRsp);
×
2731
  if (rspLen < 0) {
×
2732
    code = terrno;
×
2733
    goto _OVER;
×
2734
  }
2735

2736
  pRsp = taosMemoryMalloc(rspLen);
×
2737
  if (!pRsp) {
×
2738
    code = terrno;
×
2739
    rspLen = 0;
×
2740
    goto _OVER;
×
2741
  }
2742

2743
  rspLen = tSerializeTSMAHbRsp(pRsp, rspLen, &hbRsp);
×
2744
  if (rspLen < 0) {
×
2745
    code = terrno;
×
2746
    goto _OVER;
×
2747
  }
2748
  code = 0;
×
2749
_OVER:
×
2750
  tFreeTSMAHbRsp(&hbRsp);
×
2751
  *ppRsp = pRsp;
×
2752
  *pRspLen = rspLen;
×
2753
  TAOS_RETURN(code);
×
2754
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc