• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3562

20 Dec 2024 09:57AM UTC coverage: 26.655% (-32.2%) from 58.812%
#3562

push

travis-ci

web-flow
Merge pull request #29229 from taosdata/enh/TS-5749-3.0

enh: seperate tsdb async tasks to different thread pools

21498 of 109421 branches covered (19.65%)

Branch coverage included in aggregate %.

66 of 96 new or added lines in 7 files covered. (68.75%)

39441 existing lines in 157 files now uncovered.

35007 of 102566 relevant lines covered (34.13%)

53922.97 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

3.95
/source/dnode/mnode/impl/src/mndSma.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mndSma.h"
18
#include "functionMgt.h"
19
#include "mndDb.h"
20
#include "mndDnode.h"
21
#include "mndIndex.h"
22
#include "mndIndexComm.h"
23
#include "mndInfoSchema.h"
24
#include "mndMnode.h"
25
#include "mndPrivilege.h"
26
#include "mndScheduler.h"
27
#include "mndShow.h"
28
#include "mndStb.h"
29
#include "mndStream.h"
30
#include "mndTrans.h"
31
#include "mndUser.h"
32
#include "mndVgroup.h"
33
#include "parser.h"
34
#include "tname.h"
35

36
#define TSDB_SMA_VER_NUMBER   1
37
#define TSDB_SMA_RESERVE_SIZE 64
38

39
static SSdbRaw *mndSmaActionEncode(SSmaObj *pSma);
40
static SSdbRow *mndSmaActionDecode(SSdbRaw *pRaw);
41
static int32_t  mndSmaActionInsert(SSdb *pSdb, SSmaObj *pSma);
42
static int32_t  mndSmaActionDelete(SSdb *pSdb, SSmaObj *pSpSmatb);
43
static int32_t  mndSmaActionUpdate(SSdb *pSdb, SSmaObj *pOld, SSmaObj *pNew);
44
static int32_t  mndProcessCreateSmaReq(SRpcMsg *pReq);
45
static int32_t  mndProcessDropSmaReq(SRpcMsg *pReq);
46
static int32_t  mndProcessGetSmaReq(SRpcMsg *pReq);
47
static int32_t  mndProcessGetTbSmaReq(SRpcMsg *pReq);
48
static int32_t  mndRetrieveSma(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
49
static void     mndDestroySmaObj(SSmaObj *pSmaObj);
50

51
static int32_t mndProcessCreateTSMAReq(SRpcMsg *pReq);
52
static int32_t mndProcessDropTSMAReq(SRpcMsg *pReq);
53

54
// sma and tag index comm func
55
static int32_t mndProcessDropIdxReq(SRpcMsg *pReq);
56
static int32_t mndRetrieveIdx(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
57
static void    mndCancelRetrieveIdx(SMnode *pMnode, void *pIter);
58

59
static int32_t mndRetrieveTSMA(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
60
static void    mndCancelRetrieveTSMA(SMnode *pMnode, void *pIter);
61
static int32_t mndProcessGetTbTSMAReq(SRpcMsg *pReq);
62

63
typedef struct SCreateTSMACxt {
64
  SMnode        *pMnode;
65
  const SRpcMsg *pRpcReq;
66
  union {
67
    const SMCreateSmaReq *pCreateSmaReq;
68
    const SMDropSmaReq   *pDropSmaReq;
69
  };
70
  SDbObj             *pDb;
71
  SStbObj            *pSrcStb;
72
  SSmaObj            *pSma;
73
  const SSmaObj      *pBaseSma;
74
  SCMCreateStreamReq *pCreateStreamReq;
75
  SMDropStreamReq    *pDropStreamReq;
76
  const char         *streamName;
77
  const char         *targetStbFullName;
78
  SNodeList          *pProjects;
79
} SCreateTSMACxt;
80

81
int32_t mndInitSma(SMnode *pMnode) {
13✔
82
  SSdbTable table = {
13✔
83
      .sdbType = SDB_SMA,
84
      .keyType = SDB_KEY_BINARY,
85
      .encodeFp = (SdbEncodeFp)mndSmaActionEncode,
86
      .decodeFp = (SdbDecodeFp)mndSmaActionDecode,
87
      .insertFp = (SdbInsertFp)mndSmaActionInsert,
88
      .updateFp = (SdbUpdateFp)mndSmaActionUpdate,
89
      .deleteFp = (SdbDeleteFp)mndSmaActionDelete,
90
  };
91

92
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_SMA, mndProcessCreateSmaReq);
13✔
93
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_SMA, mndProcessDropIdxReq);
13✔
94
  mndSetMsgHandle(pMnode, TDMT_VND_CREATE_SMA_RSP, mndTransProcessRsp);
13✔
95
  mndSetMsgHandle(pMnode, TDMT_VND_DROP_SMA_RSP, mndTransProcessRsp);
13✔
96
  mndSetMsgHandle(pMnode, TDMT_MND_GET_INDEX, mndProcessGetSmaReq);
13✔
97
  mndSetMsgHandle(pMnode, TDMT_MND_GET_TABLE_INDEX, mndProcessGetTbSmaReq);
13✔
98

99
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_INDEX, mndRetrieveIdx);
13✔
100
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_INDEX, mndCancelRetrieveIdx);
13✔
101

102
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_TSMA, mndProcessCreateTSMAReq);
13✔
103
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_TSMA, mndProcessDropTSMAReq);
13✔
104
  mndSetMsgHandle(pMnode, TDMT_MND_GET_TABLE_TSMA, mndProcessGetTbTSMAReq);
13✔
105
  mndSetMsgHandle(pMnode, TDMT_MND_GET_TSMA, mndProcessGetTbTSMAReq);
13✔
106
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TSMAS, mndRetrieveTSMA);
13✔
107
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TSMAS, mndCancelRetrieveTSMA);
13✔
108

109
  return sdbSetTable(pMnode->pSdb, table);
13✔
110
}
111

112
void mndCleanupSma(SMnode *pMnode) {}
13✔
113

UNCOV
114
static SSdbRaw *mndSmaActionEncode(SSmaObj *pSma) {
×
UNCOV
115
  int32_t code = 0;
×
UNCOV
116
  int32_t lino = 0;
×
UNCOV
117
  terrno = TSDB_CODE_OUT_OF_MEMORY;
×
118

UNCOV
119
  int32_t size =
×
UNCOV
120
      sizeof(SSmaObj) + pSma->exprLen + pSma->tagsFilterLen + pSma->sqlLen + pSma->astLen + TSDB_SMA_RESERVE_SIZE;
×
UNCOV
121
  SSdbRaw *pRaw = sdbAllocRaw(SDB_SMA, TSDB_SMA_VER_NUMBER, size);
×
UNCOV
122
  if (pRaw == NULL) goto _OVER;
×
123

UNCOV
124
  int32_t dataPos = 0;
×
UNCOV
125
  SDB_SET_BINARY(pRaw, dataPos, pSma->name, TSDB_TABLE_FNAME_LEN, _OVER)
×
UNCOV
126
  SDB_SET_BINARY(pRaw, dataPos, pSma->stb, TSDB_TABLE_FNAME_LEN, _OVER)
×
UNCOV
127
  SDB_SET_BINARY(pRaw, dataPos, pSma->db, TSDB_DB_FNAME_LEN, _OVER)
×
UNCOV
128
  SDB_SET_BINARY(pRaw, dataPos, pSma->dstTbName, TSDB_TABLE_FNAME_LEN, _OVER)
×
UNCOV
129
  SDB_SET_INT64(pRaw, dataPos, pSma->createdTime, _OVER)
×
UNCOV
130
  SDB_SET_INT64(pRaw, dataPos, pSma->uid, _OVER)
×
UNCOV
131
  SDB_SET_INT64(pRaw, dataPos, pSma->stbUid, _OVER)
×
UNCOV
132
  SDB_SET_INT64(pRaw, dataPos, pSma->dbUid, _OVER)
×
UNCOV
133
  SDB_SET_INT64(pRaw, dataPos, pSma->dstTbUid, _OVER)
×
UNCOV
134
  SDB_SET_INT8(pRaw, dataPos, pSma->intervalUnit, _OVER)
×
UNCOV
135
  SDB_SET_INT8(pRaw, dataPos, pSma->slidingUnit, _OVER)
×
UNCOV
136
  SDB_SET_INT8(pRaw, dataPos, pSma->timezone, _OVER)
×
UNCOV
137
  SDB_SET_INT32(pRaw, dataPos, pSma->dstVgId, _OVER)
×
UNCOV
138
  SDB_SET_INT64(pRaw, dataPos, pSma->interval, _OVER)
×
UNCOV
139
  SDB_SET_INT64(pRaw, dataPos, pSma->offset, _OVER)
×
UNCOV
140
  SDB_SET_INT64(pRaw, dataPos, pSma->sliding, _OVER)
×
UNCOV
141
  SDB_SET_INT32(pRaw, dataPos, pSma->exprLen, _OVER)
×
UNCOV
142
  SDB_SET_INT32(pRaw, dataPos, pSma->tagsFilterLen, _OVER)
×
UNCOV
143
  SDB_SET_INT32(pRaw, dataPos, pSma->sqlLen, _OVER)
×
UNCOV
144
  SDB_SET_INT32(pRaw, dataPos, pSma->astLen, _OVER)
×
UNCOV
145
  SDB_SET_INT32(pRaw, dataPos, pSma->version, _OVER)
×
146

UNCOV
147
  if (pSma->exprLen > 0) {
×
UNCOV
148
    SDB_SET_BINARY(pRaw, dataPos, pSma->expr, pSma->exprLen, _OVER)
×
149
  }
UNCOV
150
  if (pSma->tagsFilterLen > 0) {
×
151
    SDB_SET_BINARY(pRaw, dataPos, pSma->tagsFilter, pSma->tagsFilterLen, _OVER)
×
152
  }
UNCOV
153
  if (pSma->sqlLen > 0) {
×
UNCOV
154
    SDB_SET_BINARY(pRaw, dataPos, pSma->sql, pSma->sqlLen, _OVER)
×
155
  }
UNCOV
156
  if (pSma->astLen > 0) {
×
UNCOV
157
    SDB_SET_BINARY(pRaw, dataPos, pSma->ast, pSma->astLen, _OVER)
×
158
  }
UNCOV
159
  SDB_SET_BINARY(pRaw, dataPos, pSma->baseSmaName, TSDB_TABLE_FNAME_LEN, _OVER)
×
160

UNCOV
161
  SDB_SET_RESERVE(pRaw, dataPos, TSDB_SMA_RESERVE_SIZE, _OVER)
×
UNCOV
162
  SDB_SET_DATALEN(pRaw, dataPos, _OVER)
×
163

UNCOV
164
  terrno = 0;
×
165

UNCOV
166
_OVER:
×
UNCOV
167
  if (terrno != 0) {
×
168
    mError("sma:%s, failed to encode to raw:%p since %s", pSma->name, pRaw, terrstr());
×
169
    sdbFreeRaw(pRaw);
×
170
    return NULL;
×
171
  }
172

UNCOV
173
  mTrace("sma:%s, encode to raw:%p, row:%p", pSma->name, pRaw, pSma);
×
UNCOV
174
  return pRaw;
×
175
}
176

UNCOV
177
static SSdbRow *mndSmaActionDecode(SSdbRaw *pRaw) {
×
UNCOV
178
  int32_t code = 0;
×
UNCOV
179
  int32_t lino = 0;
×
UNCOV
180
  terrno = TSDB_CODE_OUT_OF_MEMORY;
×
UNCOV
181
  SSdbRow *pRow = NULL;
×
UNCOV
182
  SSmaObj *pSma = NULL;
×
183

UNCOV
184
  int8_t sver = 0;
×
UNCOV
185
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;
×
186

UNCOV
187
  if (sver != TSDB_SMA_VER_NUMBER) {
×
188
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
×
189
    goto _OVER;
×
190
  }
191

UNCOV
192
  pRow = sdbAllocRow(sizeof(SSmaObj));
×
UNCOV
193
  if (pRow == NULL) goto _OVER;
×
194

UNCOV
195
  pSma = sdbGetRowObj(pRow);
×
UNCOV
196
  if (pSma == NULL) goto _OVER;
×
197

UNCOV
198
  int32_t dataPos = 0;
×
199

UNCOV
200
  SDB_GET_BINARY(pRaw, dataPos, pSma->name, TSDB_TABLE_FNAME_LEN, _OVER)
×
UNCOV
201
  SDB_GET_BINARY(pRaw, dataPos, pSma->stb, TSDB_TABLE_FNAME_LEN, _OVER)
×
UNCOV
202
  SDB_GET_BINARY(pRaw, dataPos, pSma->db, TSDB_DB_FNAME_LEN, _OVER)
×
UNCOV
203
  SDB_GET_BINARY(pRaw, dataPos, pSma->dstTbName, TSDB_TABLE_FNAME_LEN, _OVER)
×
UNCOV
204
  SDB_GET_INT64(pRaw, dataPos, &pSma->createdTime, _OVER)
×
UNCOV
205
  SDB_GET_INT64(pRaw, dataPos, &pSma->uid, _OVER)
×
UNCOV
206
  SDB_GET_INT64(pRaw, dataPos, &pSma->stbUid, _OVER)
×
UNCOV
207
  SDB_GET_INT64(pRaw, dataPos, &pSma->dbUid, _OVER)
×
UNCOV
208
  SDB_GET_INT64(pRaw, dataPos, &pSma->dstTbUid, _OVER)
×
UNCOV
209
  SDB_GET_INT8(pRaw, dataPos, &pSma->intervalUnit, _OVER)
×
UNCOV
210
  SDB_GET_INT8(pRaw, dataPos, &pSma->slidingUnit, _OVER)
×
UNCOV
211
  SDB_GET_INT8(pRaw, dataPos, &pSma->timezone, _OVER)
×
UNCOV
212
  SDB_GET_INT32(pRaw, dataPos, &pSma->dstVgId, _OVER)
×
UNCOV
213
  SDB_GET_INT64(pRaw, dataPos, &pSma->interval, _OVER)
×
UNCOV
214
  SDB_GET_INT64(pRaw, dataPos, &pSma->offset, _OVER)
×
UNCOV
215
  SDB_GET_INT64(pRaw, dataPos, &pSma->sliding, _OVER)
×
UNCOV
216
  SDB_GET_INT32(pRaw, dataPos, &pSma->exprLen, _OVER)
×
UNCOV
217
  SDB_GET_INT32(pRaw, dataPos, &pSma->tagsFilterLen, _OVER)
×
UNCOV
218
  SDB_GET_INT32(pRaw, dataPos, &pSma->sqlLen, _OVER)
×
UNCOV
219
  SDB_GET_INT32(pRaw, dataPos, &pSma->astLen, _OVER)
×
UNCOV
220
  SDB_GET_INT32(pRaw, dataPos, &pSma->version, _OVER)
×
221

UNCOV
222
  if (pSma->exprLen > 0) {
×
UNCOV
223
    pSma->expr = taosMemoryCalloc(pSma->exprLen, 1);
×
UNCOV
224
    if (pSma->expr == NULL) goto _OVER;
×
UNCOV
225
    SDB_GET_BINARY(pRaw, dataPos, pSma->expr, pSma->exprLen, _OVER)
×
226
  }
227

UNCOV
228
  if (pSma->tagsFilterLen > 0) {
×
229
    pSma->tagsFilter = taosMemoryCalloc(pSma->tagsFilterLen, 1);
×
230
    if (pSma->tagsFilter == NULL) goto _OVER;
×
231
    SDB_GET_BINARY(pRaw, dataPos, pSma->tagsFilter, pSma->tagsFilterLen, _OVER)
×
232
  }
233

UNCOV
234
  if (pSma->sqlLen > 0) {
×
UNCOV
235
    pSma->sql = taosMemoryCalloc(pSma->sqlLen, 1);
×
UNCOV
236
    if (pSma->sql == NULL) goto _OVER;
×
UNCOV
237
    SDB_GET_BINARY(pRaw, dataPos, pSma->sql, pSma->sqlLen, _OVER)
×
238
  }
239

UNCOV
240
  if (pSma->astLen > 0) {
×
UNCOV
241
    pSma->ast = taosMemoryCalloc(pSma->astLen, 1);
×
UNCOV
242
    if (pSma->ast == NULL) goto _OVER;
×
UNCOV
243
    SDB_GET_BINARY(pRaw, dataPos, pSma->ast, pSma->astLen, _OVER)
×
244
  }
UNCOV
245
  SDB_GET_BINARY(pRaw, dataPos, pSma->baseSmaName, TSDB_TABLE_FNAME_LEN, _OVER)
×
246

UNCOV
247
  SDB_GET_RESERVE(pRaw, dataPos, TSDB_SMA_RESERVE_SIZE, _OVER)
×
248

UNCOV
249
  terrno = 0;
×
250

UNCOV
251
_OVER:
×
UNCOV
252
  if (terrno != 0) {
×
253
    if (pSma != NULL) {
×
254
      mError("sma:%s, failed to decode from raw:%p since %s", pSma->name, pRaw, terrstr());
×
255
      taosMemoryFreeClear(pSma->expr);
×
256
      taosMemoryFreeClear(pSma->tagsFilter);
×
257
      taosMemoryFreeClear(pSma->sql);
×
258
      taosMemoryFreeClear(pSma->ast);
×
259
    }
260
    taosMemoryFreeClear(pRow);
×
261
    return NULL;
×
262
  }
263

UNCOV
264
  mTrace("sma:%s, decode from raw:%p, row:%p", pSma->name, pRaw, pSma);
×
UNCOV
265
  return pRow;
×
266
}
267

UNCOV
268
static int32_t mndSmaActionInsert(SSdb *pSdb, SSmaObj *pSma) {
×
UNCOV
269
  mTrace("sma:%s, perform insert action, row:%p", pSma->name, pSma);
×
UNCOV
270
  return 0;
×
271
}
272

UNCOV
273
static int32_t mndSmaActionDelete(SSdb *pSdb, SSmaObj *pSma) {
×
UNCOV
274
  mTrace("sma:%s, perform delete action, row:%p", pSma->name, pSma);
×
UNCOV
275
  taosMemoryFreeClear(pSma->tagsFilter);
×
UNCOV
276
  taosMemoryFreeClear(pSma->expr);
×
UNCOV
277
  taosMemoryFreeClear(pSma->sql);
×
UNCOV
278
  taosMemoryFreeClear(pSma->ast);
×
UNCOV
279
  return 0;
×
280
}
281

UNCOV
282
static int32_t mndSmaActionUpdate(SSdb *pSdb, SSmaObj *pOld, SSmaObj *pNew) {
×
UNCOV
283
  mTrace("sma:%s, perform update action, old row:%p new row:%p", pOld->name, pOld, pNew);
×
UNCOV
284
  return 0;
×
285
}
286

UNCOV
287
SSmaObj *mndAcquireSma(SMnode *pMnode, char *smaName) {
×
UNCOV
288
  SSdb    *pSdb = pMnode->pSdb;
×
UNCOV
289
  SSmaObj *pSma = sdbAcquire(pSdb, SDB_SMA, smaName);
×
UNCOV
290
  if (pSma == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
×
291
    terrno = TSDB_CODE_MND_SMA_NOT_EXIST;
×
292
  }
UNCOV
293
  return pSma;
×
294
}
295

296
void mndReleaseSma(SMnode *pMnode, SSmaObj *pSma) {
12✔
297
  SSdb *pSdb = pMnode->pSdb;
12✔
298
  sdbRelease(pSdb, pSma);
12✔
299
}
12✔
300

301
SDbObj *mndAcquireDbBySma(SMnode *pMnode, const char *db) { return mndAcquireDb(pMnode, db); }
×
302

UNCOV
303
static void *mndBuildVCreateSmaReq(SMnode *pMnode, SVgObj *pVgroup, SSmaObj *pSma, int32_t *pContLen) {
×
UNCOV
304
  SEncoder encoder = {0};
×
UNCOV
305
  int32_t  contLen = 0;
×
UNCOV
306
  SName    name = {0};
×
UNCOV
307
  int32_t  code = tNameFromString(&name, pSma->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
×
UNCOV
308
  if (TSDB_CODE_SUCCESS != code) {
×
309
    return NULL;
×
310
  }
311

UNCOV
312
  SVCreateTSmaReq req = {0};
×
UNCOV
313
  req.version = 0;
×
UNCOV
314
  req.intervalUnit = pSma->intervalUnit;
×
UNCOV
315
  req.slidingUnit = pSma->slidingUnit;
×
316
//  req.timezoneInt = pSma->timezone;
UNCOV
317
  tstrncpy(req.indexName, (char *)tNameGetTableName(&name), TSDB_INDEX_NAME_LEN);
×
UNCOV
318
  req.exprLen = pSma->exprLen;
×
UNCOV
319
  req.tagsFilterLen = pSma->tagsFilterLen;
×
UNCOV
320
  req.indexUid = pSma->uid;
×
UNCOV
321
  req.tableUid = pSma->stbUid;
×
UNCOV
322
  req.dstVgId = pSma->dstVgId;
×
UNCOV
323
  req.dstTbUid = pSma->dstTbUid;
×
UNCOV
324
  req.interval = pSma->interval;
×
UNCOV
325
  req.offset = pSma->offset;
×
UNCOV
326
  req.sliding = pSma->sliding;
×
UNCOV
327
  req.expr = pSma->expr;
×
UNCOV
328
  req.tagsFilter = pSma->tagsFilter;
×
UNCOV
329
  req.schemaRow = pSma->schemaRow;
×
UNCOV
330
  req.schemaTag = pSma->schemaTag;
×
UNCOV
331
  req.dstTbName = pSma->dstTbName;
×
332

333
  // get length
UNCOV
334
  int32_t ret = 0;
×
UNCOV
335
  tEncodeSize(tEncodeSVCreateTSmaReq, &req, contLen, ret);
×
UNCOV
336
  if (ret < 0) {
×
337
    return NULL;
×
338
  }
UNCOV
339
  contLen += sizeof(SMsgHead);
×
340

UNCOV
341
  SMsgHead *pHead = taosMemoryMalloc(contLen);
×
UNCOV
342
  if (pHead == NULL) {
×
343
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
344
    return NULL;
×
345
  }
346

UNCOV
347
  pHead->contLen = htonl(contLen);
×
UNCOV
348
  pHead->vgId = htonl(pVgroup->vgId);
×
349

UNCOV
350
  void *pBuf = POINTER_SHIFT(pHead, sizeof(SMsgHead));
×
UNCOV
351
  tEncoderInit(&encoder, pBuf, contLen - sizeof(SMsgHead));
×
UNCOV
352
  if (tEncodeSVCreateTSmaReq(&encoder, &req) < 0) {
×
353
    taosMemoryFreeClear(pHead);
×
354
    tEncoderClear(&encoder);
×
355
    return NULL;
×
356
  }
357

UNCOV
358
  tEncoderClear(&encoder);
×
359

UNCOV
360
  *pContLen = contLen;
×
UNCOV
361
  return pHead;
×
362
}
363

364
static void *mndBuildVDropSmaReq(SMnode *pMnode, SVgObj *pVgroup, SSmaObj *pSma, int32_t *pContLen) {
×
365
  SEncoder encoder = {0};
×
366
  int32_t  contLen;
367
  SName    name = {0};
×
368
  int32_t  code = tNameFromString(&name, pSma->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
×
369
  if (TSDB_CODE_SUCCESS != code) {
×
370
    terrno = code;
×
371
    return NULL;
×
372
  }
373

374
  SVDropTSmaReq req = {0};
×
375
  req.indexUid = pSma->uid;
×
376
  tstrncpy(req.indexName, (char *)tNameGetTableName(&name), TSDB_INDEX_NAME_LEN);
×
377

378
  // get length
379
  int32_t ret = 0;
×
380
  tEncodeSize(tEncodeSVDropTSmaReq, &req, contLen, ret);
×
381
  if (ret < 0) {
×
382
    return NULL;
×
383
  }
384

385
  contLen += sizeof(SMsgHead);
×
386

387
  SMsgHead *pHead = taosMemoryMalloc(contLen);
×
388
  if (pHead == NULL) {
×
389
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
390
    return NULL;
×
391
  }
392

393
  pHead->contLen = htonl(contLen);
×
394
  pHead->vgId = htonl(pVgroup->vgId);
×
395

396
  void *pBuf = POINTER_SHIFT(pHead, sizeof(SMsgHead));
×
397
  tEncoderInit(&encoder, pBuf, contLen - sizeof(SMsgHead));
×
398

399
  if (tEncodeSVDropTSmaReq(&encoder, &req) < 0) {
×
400
    taosMemoryFreeClear(pHead);
×
401
    tEncoderClear(&encoder);
×
402
    return NULL;
×
403
  }
404
  tEncoderClear(&encoder);
×
405

406
  *pContLen = contLen;
×
407
  return pHead;
×
408
}
409

UNCOV
410
static int32_t mndSetCreateSmaRedoLogs(SMnode *pMnode, STrans *pTrans, SSmaObj *pSma) {
×
UNCOV
411
  int32_t  code = 0;
×
UNCOV
412
  SSdbRaw *pRedoRaw = mndSmaActionEncode(pSma);
×
UNCOV
413
  if (pRedoRaw == NULL) {
×
414
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
415
    if (terrno != 0) code = terrno;
×
416
    TAOS_RETURN(code);
×
417
  }
UNCOV
418
  TAOS_CHECK_RETURN(mndTransAppendRedolog(pTrans, pRedoRaw));
×
UNCOV
419
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING));
×
420

UNCOV
421
  TAOS_RETURN(code);
×
422
}
423

UNCOV
424
static int32_t mndSetCreateSmaUndoLogs(SMnode *pMnode, STrans *pTrans, SSmaObj *pSma) {
×
UNCOV
425
  int32_t  code = 0;
×
UNCOV
426
  SSdbRaw *pUndoRaw = mndSmaActionEncode(pSma);
×
UNCOV
427
  if (!pUndoRaw) {
×
428
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
429
    if (terrno != 0) code = terrno;
×
430
    TAOS_RETURN(code);
×
431
  }
UNCOV
432
  TAOS_CHECK_RETURN(mndTransAppendUndolog(pTrans, pUndoRaw));
×
UNCOV
433
  TAOS_CHECK_RETURN(sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED));
×
UNCOV
434
  TAOS_RETURN(code);
×
435
}
436

UNCOV
437
static int32_t mndSetCreateSmaCommitLogs(SMnode *pMnode, STrans *pTrans, SSmaObj *pSma) {
×
UNCOV
438
  int32_t  code = 0;
×
UNCOV
439
  SSdbRaw *pCommitRaw = mndSmaActionEncode(pSma);
×
UNCOV
440
  if (pCommitRaw == NULL) {
×
441
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
442
    if (terrno != 0) code = terrno;
×
443
    TAOS_RETURN(code);
×
444
  }
UNCOV
445
  TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pCommitRaw));
×
UNCOV
446
  TAOS_CHECK_RETURN(sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY));
×
447

UNCOV
448
  TAOS_RETURN(code);
×
449
}
450

UNCOV
451
static int32_t mndSetCreateSmaVgroupRedoLogs(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup) {
×
UNCOV
452
  int32_t  code = 0;
×
UNCOV
453
  SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroup);
×
UNCOV
454
  if (pVgRaw == NULL) {
×
455
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
456
    if (terrno != 0) code = terrno;
×
457
    TAOS_RETURN(code);
×
458
  }
UNCOV
459
  TAOS_CHECK_RETURN(mndTransAppendRedolog(pTrans, pVgRaw));
×
UNCOV
460
  TAOS_CHECK_RETURN(sdbSetRawStatus(pVgRaw, SDB_STATUS_UPDATE));
×
UNCOV
461
  TAOS_RETURN(code);
×
462
}
463

UNCOV
464
static int32_t mndSetCreateSmaVgroupCommitLogs(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup) {
×
UNCOV
465
  int32_t  code = 0;
×
UNCOV
466
  SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroup);
×
UNCOV
467
  if (pVgRaw == NULL) {
×
468
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
469
    if (terrno != 0) code = terrno;
×
470
    TAOS_RETURN(code);
×
471
  }
UNCOV
472
  TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pVgRaw));
×
UNCOV
473
  TAOS_CHECK_RETURN(sdbSetRawStatus(pVgRaw, SDB_STATUS_READY));
×
UNCOV
474
  TAOS_RETURN(code);
×
475
}
476

UNCOV
477
static int32_t mndSetUpdateSmaStbCommitLogs(SMnode *pMnode, STrans *pTrans, SStbObj *pStb) {
×
UNCOV
478
  int32_t code = 0;
×
UNCOV
479
  SStbObj stbObj = {0};
×
UNCOV
480
  taosRLockLatch(&pStb->lock);
×
UNCOV
481
  memcpy(&stbObj, pStb, sizeof(SStbObj));
×
UNCOV
482
  taosRUnLockLatch(&pStb->lock);
×
UNCOV
483
  stbObj.numOfColumns = 0;
×
UNCOV
484
  stbObj.pColumns = NULL;
×
UNCOV
485
  stbObj.numOfTags = 0;
×
UNCOV
486
  stbObj.pTags = NULL;
×
UNCOV
487
  stbObj.numOfFuncs = 0;
×
UNCOV
488
  stbObj.pFuncs = NULL;
×
UNCOV
489
  stbObj.updateTime = taosGetTimestampMs();
×
UNCOV
490
  stbObj.lock = 0;
×
UNCOV
491
  stbObj.smaVer++;
×
492

UNCOV
493
  SSdbRaw *pCommitRaw = mndStbActionEncode(&stbObj);
×
UNCOV
494
  if (pCommitRaw == NULL) {
×
495
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
496
    if (terrno != 0) code = terrno;
×
497
    TAOS_RETURN(code);
×
498
  }
UNCOV
499
  TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pCommitRaw));
×
UNCOV
500
  TAOS_CHECK_RETURN(sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY));
×
501

UNCOV
502
  TAOS_RETURN(code);
×
503
}
504

UNCOV
505
static int32_t mndSetCreateSmaVgroupRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
×
506
                                                SSmaObj *pSma) {
UNCOV
507
  int32_t    code = 0;
×
UNCOV
508
  SVnodeGid *pVgid = pVgroup->vnodeGid + 0;
×
UNCOV
509
  SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
×
UNCOV
510
  if (pDnode == NULL) {
×
511
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
512
    if (terrno != 0) code = terrno;
×
513
    TAOS_RETURN(code);
×
514
  }
515

UNCOV
516
  STransAction action = {0};
×
UNCOV
517
  action.epSet = mndGetDnodeEpset(pDnode);
×
UNCOV
518
  mndReleaseDnode(pMnode, pDnode);
×
519

520
  // todo add sma info here
UNCOV
521
  SNode *pAst = NULL;
×
UNCOV
522
  TAOS_CHECK_RETURN(nodesStringToNode(pSma->ast, &pAst));
×
UNCOV
523
  if ((code = qExtractResultSchema(pAst, &pSma->schemaRow.nCols, &pSma->schemaRow.pSchema)) != 0) {
×
524
    nodesDestroyNode(pAst);
×
525
    TAOS_RETURN(code);
×
526
  }
UNCOV
527
  nodesDestroyNode(pAst);
×
UNCOV
528
  pSma->schemaRow.version = 1;
×
529

530
  // TODO: the schemaTag generated by qExtractResultXXX later.
UNCOV
531
  pSma->schemaTag.nCols = 1;
×
UNCOV
532
  pSma->schemaTag.version = 1;
×
UNCOV
533
  pSma->schemaTag.pSchema = taosMemoryCalloc(1, sizeof(SSchema));
×
UNCOV
534
  if (!pSma->schemaTag.pSchema) {
×
535
    TAOS_RETURN(-1);
×
536
  }
UNCOV
537
  pSma->schemaTag.pSchema[0].type = TSDB_DATA_TYPE_BIGINT;
×
UNCOV
538
  pSma->schemaTag.pSchema[0].bytes = TYPE_BYTES[TSDB_DATA_TYPE_BIGINT];
×
UNCOV
539
  pSma->schemaTag.pSchema[0].colId = pSma->schemaRow.nCols + PRIMARYKEY_TIMESTAMP_COL_ID;
×
UNCOV
540
  pSma->schemaTag.pSchema[0].flags = 0;
×
UNCOV
541
  snprintf(pSma->schemaTag.pSchema[0].name, TSDB_COL_NAME_LEN, "groupId");
×
542

UNCOV
543
  int32_t smaContLen = 0;
×
UNCOV
544
  void   *pSmaReq = mndBuildVCreateSmaReq(pMnode, pVgroup, pSma, &smaContLen);
×
UNCOV
545
  if (pSmaReq == NULL) {
×
546
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
547
    if (terrno != 0) code = terrno;
×
548
    TAOS_RETURN(code);
×
549
  }
UNCOV
550
  pVgroup->pTsma = pSmaReq;
×
551

UNCOV
552
  int32_t contLen = 0;
×
UNCOV
553
  void   *pReq = mndBuildCreateVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen);
×
UNCOV
554
  if (pReq == NULL) {
×
555
    taosMemoryFreeClear(pSmaReq);
×
556
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
557
    if (terrno != 0) code = terrno;
×
558
    TAOS_RETURN(code);
×
559
  }
560

UNCOV
561
  action.mTraceId = pTrans->mTraceId;
×
UNCOV
562
  action.pCont = pReq;
×
UNCOV
563
  action.contLen = contLen;
×
UNCOV
564
  action.msgType = TDMT_DND_CREATE_VNODE;
×
UNCOV
565
  action.acceptableCode = TSDB_CODE_VND_ALREADY_EXIST;
×
566

UNCOV
567
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
×
568
    taosMemoryFreeClear(pSmaReq);
×
569
    taosMemoryFree(pReq);
×
570
    TAOS_RETURN(code);
×
571
  }
572

UNCOV
573
  action.pCont = pSmaReq;
×
UNCOV
574
  action.contLen = smaContLen;
×
UNCOV
575
  action.msgType = TDMT_VND_CREATE_SMA;
×
UNCOV
576
  action.acceptableCode = TSDB_CODE_TSMA_ALREADY_EXIST;
×
577

UNCOV
578
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
×
579
    taosMemoryFreeClear(pSmaReq);
×
580
    TAOS_RETURN(code);
×
581
  }
582

UNCOV
583
  TAOS_RETURN(code);
×
584
}
585

UNCOV
586
static void mndDestroySmaObj(SSmaObj *pSmaObj) {
×
UNCOV
587
  if (pSmaObj) {
×
UNCOV
588
    taosMemoryFreeClear(pSmaObj->schemaRow.pSchema);
×
UNCOV
589
    taosMemoryFreeClear(pSmaObj->schemaTag.pSchema);
×
590
  }
UNCOV
591
}
×
592

UNCOV
593
static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCreate, SDbObj *pDb, SStbObj *pStb,
×
594
                            const char *streamName) {
UNCOV
595
  int32_t code = 0;
×
UNCOV
596
  if (pDb->cfg.replications > 1) {
×
597
    code = TSDB_CODE_MND_INVALID_SMA_OPTION;
×
598
    mError("sma:%s, failed to create since not support multiple replicas", pCreate->name);
×
599
    TAOS_RETURN(code);
×
600
  }
UNCOV
601
  SSmaObj smaObj = {0};
×
UNCOV
602
  memcpy(smaObj.name, pCreate->name, TSDB_TABLE_FNAME_LEN);
×
UNCOV
603
  memcpy(smaObj.stb, pStb->name, TSDB_TABLE_FNAME_LEN);
×
UNCOV
604
  memcpy(smaObj.db, pDb->name, TSDB_DB_FNAME_LEN);
×
UNCOV
605
  smaObj.createdTime = taosGetTimestampMs();
×
UNCOV
606
  smaObj.uid = mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN);
×
607

UNCOV
608
  char resultTbName[TSDB_TABLE_FNAME_LEN + 16] = {0};
×
UNCOV
609
  snprintf(resultTbName, TSDB_TABLE_FNAME_LEN + 16, "%s_td_tsma_rst_tb", pCreate->name);
×
UNCOV
610
  memcpy(smaObj.dstTbName, resultTbName, TSDB_TABLE_FNAME_LEN);
×
UNCOV
611
  smaObj.dstTbUid = mndGenerateUid(smaObj.dstTbName, TSDB_TABLE_FNAME_LEN);
×
UNCOV
612
  smaObj.stbUid = pStb->uid;
×
UNCOV
613
  smaObj.dbUid = pStb->dbUid;
×
UNCOV
614
  smaObj.intervalUnit = pCreate->intervalUnit;
×
UNCOV
615
  smaObj.slidingUnit = pCreate->slidingUnit;
×
616
#if 0
617
//  smaObj.timezone = pCreate->timezone;
618
#endif
619
//  smaObj.timezone = taosGetLocalTimezoneOffset();  // use timezone of server
UNCOV
620
  smaObj.interval = pCreate->interval;
×
UNCOV
621
  smaObj.offset = pCreate->offset;
×
UNCOV
622
  smaObj.sliding = pCreate->sliding;
×
UNCOV
623
  smaObj.exprLen = pCreate->exprLen;
×
UNCOV
624
  smaObj.tagsFilterLen = pCreate->tagsFilterLen;
×
UNCOV
625
  smaObj.sqlLen = pCreate->sqlLen;
×
UNCOV
626
  smaObj.astLen = pCreate->astLen;
×
UNCOV
627
  if (smaObj.exprLen > 0) {
×
UNCOV
628
    smaObj.expr = pCreate->expr;
×
629
  }
UNCOV
630
  if (smaObj.tagsFilterLen > 0) {
×
631
    smaObj.tagsFilter = pCreate->tagsFilter;
×
632
  }
UNCOV
633
  if (smaObj.sqlLen > 0) {
×
UNCOV
634
    smaObj.sql = pCreate->sql;
×
635
  }
UNCOV
636
  if (smaObj.astLen > 0) {
×
UNCOV
637
    smaObj.ast = pCreate->ast;
×
638
  }
639

UNCOV
640
  SStreamObj streamObj = {0};
×
UNCOV
641
  tstrncpy(streamObj.name, streamName, TSDB_STREAM_FNAME_LEN);
×
UNCOV
642
  tstrncpy(streamObj.sourceDb, pDb->name, TSDB_DB_FNAME_LEN);
×
UNCOV
643
  tstrncpy(streamObj.targetDb, streamObj.sourceDb, TSDB_DB_FNAME_LEN);
×
UNCOV
644
  streamObj.createTime = taosGetTimestampMs();
×
UNCOV
645
  streamObj.updateTime = streamObj.createTime;
×
UNCOV
646
  streamObj.uid = mndGenerateUid(streamName, strlen(streamName));
×
UNCOV
647
  streamObj.sourceDbUid = pDb->uid;
×
UNCOV
648
  streamObj.targetDbUid = pDb->uid;
×
UNCOV
649
  streamObj.version = 1;
×
UNCOV
650
  streamObj.sql = taosStrdup(pCreate->sql);
×
UNCOV
651
  if (!streamObj.sql) {
×
652
    return terrno;
×
653
  }
UNCOV
654
  streamObj.smaId = smaObj.uid;
×
UNCOV
655
  streamObj.conf.watermark = pCreate->watermark;
×
UNCOV
656
  streamObj.deleteMark = pCreate->deleteMark;
×
UNCOV
657
  streamObj.conf.fillHistory = STREAM_FILL_HISTORY_ON;
×
UNCOV
658
  streamObj.conf.trigger = STREAM_TRIGGER_WINDOW_CLOSE;
×
UNCOV
659
  streamObj.conf.triggerParam = pCreate->maxDelay;
×
UNCOV
660
  streamObj.ast = taosStrdup(smaObj.ast);
×
UNCOV
661
  if (!streamObj.ast) {
×
662
    taosMemoryFree(streamObj.sql);
×
663
    return terrno;
×
664
  }
UNCOV
665
  streamObj.indexForMultiAggBalance = -1;
×
666

667
  // check the maxDelay
UNCOV
668
  if (streamObj.conf.triggerParam < TSDB_MIN_ROLLUP_MAX_DELAY) {
×
UNCOV
669
    int64_t msInterval = -1;
×
670
    int32_t code =
UNCOV
671
        convertTimeFromPrecisionToUnit(pCreate->interval, pDb->cfg.precision, TIME_UNIT_MILLISECOND, &msInterval);
×
UNCOV
672
    if (TSDB_CODE_SUCCESS != code) {
×
673
      mError("sma:%s, failed to create since convert time failed: %s", smaObj.name, tstrerror(code));
×
674
      return code;
×
675
    }
UNCOV
676
    streamObj.conf.triggerParam = msInterval > TSDB_MIN_ROLLUP_MAX_DELAY ? msInterval : TSDB_MIN_ROLLUP_MAX_DELAY;
×
677
  }
UNCOV
678
  if (streamObj.conf.triggerParam > TSDB_MAX_ROLLUP_MAX_DELAY) {
×
679
    streamObj.conf.triggerParam = TSDB_MAX_ROLLUP_MAX_DELAY;
×
680
  }
681

UNCOV
682
  if ((code = mndAllocSmaVgroup(pMnode, pDb, &streamObj.fixedSinkVg)) != 0) {
×
683
    mError("sma:%s, failed to create since %s", smaObj.name, tstrerror(code));
×
684
    TAOS_RETURN(code);
×
685
  }
UNCOV
686
  smaObj.dstVgId = streamObj.fixedSinkVg.vgId;
×
UNCOV
687
  streamObj.fixedSinkVgId = smaObj.dstVgId;
×
688

UNCOV
689
  SNode *pAst = NULL;
×
UNCOV
690
  if (nodesStringToNode(streamObj.ast, &pAst) < 0) {
×
691
    code = TSDB_CODE_MND_INVALID_SMA_OPTION;
×
692
    mError("sma:%s, failed to create since parse ast error", smaObj.name);
×
693
    TAOS_RETURN(code);
×
694
  }
695

696
  // extract output schema from ast
UNCOV
697
  if (qExtractResultSchema(pAst, (int32_t *)&streamObj.outputSchema.nCols, &streamObj.outputSchema.pSchema) != 0) {
×
698
    code = TSDB_CODE_MND_INVALID_SMA_OPTION;
×
699
    mError("sma:%s, failed to create since extract result schema error", smaObj.name);
×
700
    TAOS_RETURN(code);
×
701
  }
702

UNCOV
703
  SQueryPlan  *pPlan = NULL;
×
UNCOV
704
  SPlanContext cxt = {
×
705
      .pAstRoot = pAst,
706
      .topicQuery = false,
707
      .streamQuery = true,
UNCOV
708
      .triggerType = streamObj.conf.trigger,
×
UNCOV
709
      .watermark = streamObj.conf.watermark,
×
UNCOV
710
      .deleteMark = streamObj.deleteMark,
×
711
  };
712

UNCOV
713
  if (qCreateQueryPlan(&cxt, &pPlan, NULL) < 0) {
×
714
    code = TSDB_CODE_MND_INVALID_SMA_OPTION;
×
715
    mError("sma:%s, failed to create since create query plan error", smaObj.name);
×
716
    TAOS_RETURN(code);
×
717
  }
718

719
  // save physcial plan
UNCOV
720
  if (nodesNodeToString((SNode *)pPlan, false, &streamObj.physicalPlan, NULL) != 0) {
×
721
    code = TSDB_CODE_MND_INVALID_SMA_OPTION;
×
722
    mError("sma:%s, failed to create since save physcial plan error", smaObj.name);
×
723
    TAOS_RETURN(code);
×
724
  }
725

UNCOV
726
  if (pAst != NULL) nodesDestroyNode(pAst);
×
UNCOV
727
  nodesDestroyNode((SNode *)pPlan);
×
728

UNCOV
729
  code = -1;
×
UNCOV
730
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq, "create-sma");
×
UNCOV
731
  if (pTrans == NULL) {
×
732
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
733
    if (terrno != 0) code = terrno;
×
734
    goto _OVER;
×
735
  }
UNCOV
736
  mndTransSetDbName(pTrans, pDb->name, NULL);
×
UNCOV
737
  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER);
×
738

UNCOV
739
  mndTransSetSerial(pTrans);
×
UNCOV
740
  mInfo("trans:%d, used to create sma:%s stream:%s", pTrans->id, pCreate->name, streamObj.name);
×
UNCOV
741
  TAOS_CHECK_GOTO(mndAddNewVgPrepareAction(pMnode, pTrans, &streamObj.fixedSinkVg), NULL, _OVER);
×
UNCOV
742
  TAOS_CHECK_GOTO(mndSetCreateSmaRedoLogs(pMnode, pTrans, &smaObj), NULL, _OVER);
×
UNCOV
743
  TAOS_CHECK_GOTO(mndSetCreateSmaVgroupRedoLogs(pMnode, pTrans, &streamObj.fixedSinkVg), NULL, _OVER);
×
UNCOV
744
  TAOS_CHECK_GOTO(mndSetCreateSmaCommitLogs(pMnode, pTrans, &smaObj), NULL, _OVER);
×
UNCOV
745
  TAOS_CHECK_GOTO(mndSetCreateSmaVgroupCommitLogs(pMnode, pTrans, &streamObj.fixedSinkVg), NULL, _OVER);
×
UNCOV
746
  TAOS_CHECK_GOTO(mndSetUpdateSmaStbCommitLogs(pMnode, pTrans, pStb), NULL, _OVER);
×
UNCOV
747
  TAOS_CHECK_GOTO(mndSetCreateSmaVgroupRedoActions(pMnode, pTrans, pDb, &streamObj.fixedSinkVg, &smaObj), NULL, _OVER);
×
UNCOV
748
  TAOS_CHECK_GOTO(mndScheduleStream(pMnode, &streamObj, 1685959190000, NULL), NULL, _OVER);
×
UNCOV
749
  TAOS_CHECK_GOTO(mndPersistStream(pTrans, &streamObj), NULL, _OVER);
×
UNCOV
750
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
×
751

UNCOV
752
  mInfo("sma:%s, uid:%" PRIi64 " create on stb:%" PRIi64 ", dstSuid:%" PRIi64 " dstTb:%s dstVg:%d", pCreate->name,
×
753
        smaObj.uid, smaObj.stbUid, smaObj.dstTbUid, smaObj.dstTbName, smaObj.dstVgId);
754

UNCOV
755
  code = 0;
×
756

UNCOV
757
_OVER:
×
UNCOV
758
  tFreeStreamObj(&streamObj);
×
UNCOV
759
  mndDestroySmaObj(&smaObj);
×
UNCOV
760
  mndTransDrop(pTrans);
×
UNCOV
761
  TAOS_RETURN(code);
×
762
}
763

UNCOV
764
static int32_t mndCheckCreateSmaReq(SMCreateSmaReq *pCreate) {
×
UNCOV
765
  int32_t code = TSDB_CODE_MND_INVALID_SMA_OPTION;
×
UNCOV
766
  if (pCreate->name[0] == 0) TAOS_RETURN(code);
×
UNCOV
767
  if (pCreate->stb[0] == 0) TAOS_RETURN(code);
×
UNCOV
768
  if (pCreate->igExists < 0 || pCreate->igExists > 1) TAOS_RETURN(code);
×
UNCOV
769
  if (pCreate->intervalUnit < 0) TAOS_RETURN(code);
×
UNCOV
770
  if (pCreate->slidingUnit < 0) TAOS_RETURN(code);
×
UNCOV
771
  if (pCreate->timezone < 0) TAOS_RETURN(code);
×
UNCOV
772
  if (pCreate->interval < 0) TAOS_RETURN(code);
×
UNCOV
773
  if (pCreate->offset < 0) TAOS_RETURN(code);
×
UNCOV
774
  if (pCreate->sliding < 0) TAOS_RETURN(code);
×
UNCOV
775
  if (pCreate->exprLen < 0) TAOS_RETURN(code);
×
UNCOV
776
  if (pCreate->tagsFilterLen < 0) TAOS_RETURN(code);
×
UNCOV
777
  if (pCreate->sqlLen < 0) TAOS_RETURN(code);
×
UNCOV
778
  if (pCreate->astLen < 0) TAOS_RETURN(code);
×
UNCOV
779
  if (pCreate->exprLen != 0 && strlen(pCreate->expr) + 1 != pCreate->exprLen) TAOS_RETURN(code);
×
UNCOV
780
  if (pCreate->tagsFilterLen != 0 && strlen(pCreate->tagsFilter) + 1 != pCreate->tagsFilterLen) TAOS_RETURN(code);
×
UNCOV
781
  if (pCreate->sqlLen != 0 && strlen(pCreate->sql) + 1 != pCreate->sqlLen) TAOS_RETURN(code);
×
UNCOV
782
  if (pCreate->astLen != 0 && strlen(pCreate->ast) + 1 != pCreate->astLen) TAOS_RETURN(code);
×
783

UNCOV
784
  SName smaName = {0};
×
UNCOV
785
  if (tNameFromString(&smaName, pCreate->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE) < 0) return -1;
×
UNCOV
786
  if (*(char *)tNameGetTableName(&smaName) == 0) return -1;
×
787

UNCOV
788
  code = 0;
×
UNCOV
789
  TAOS_RETURN(code);
×
790
}
791

UNCOV
792
static int32_t mndGetStreamNameFromSmaName(char *streamName, char *smaName) {
×
793
  SName   n;
UNCOV
794
  int32_t code = tNameFromString(&n, smaName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
×
UNCOV
795
  if (TSDB_CODE_SUCCESS != code) {
×
796
    return code;
×
797
  }
UNCOV
798
  snprintf(streamName, TSDB_TABLE_FNAME_LEN,"%d.%s", n.acctId, n.tname);
×
UNCOV
799
  return TSDB_CODE_SUCCESS;
×
800
}
801

UNCOV
802
static int32_t mndProcessCreateSmaReq(SRpcMsg *pReq) {
×
UNCOV
803
  SMnode        *pMnode = pReq->info.node;
×
UNCOV
804
  int32_t        code = -1;
×
UNCOV
805
  SStbObj       *pStb = NULL;
×
UNCOV
806
  SSmaObj       *pSma = NULL;
×
UNCOV
807
  SStreamObj    *pStream = NULL;
×
UNCOV
808
  SDbObj        *pDb = NULL;
×
UNCOV
809
  SMCreateSmaReq createReq = {0};
×
810

UNCOV
811
  int64_t mTraceId = TRACE_GET_ROOTID(&pReq->info.traceId);
×
812

UNCOV
813
  TAOS_CHECK_GOTO(tDeserializeSMCreateSmaReq(pReq->pCont, pReq->contLen, &createReq), NULL, _OVER);
×
814

815
#ifdef WINDOWS
816
  terrno = TSDB_CODE_MND_INVALID_PLATFORM;
817
  goto _OVER;
818
#endif
UNCOV
819
  mInfo("sma:%s, start to create", createReq.name);
×
UNCOV
820
  TAOS_CHECK_GOTO(mndCheckCreateSmaReq(&createReq), NULL, _OVER);
×
821

UNCOV
822
  pStb = mndAcquireStb(pMnode, createReq.stb);
×
UNCOV
823
  if (pStb == NULL) {
×
824
    mError("sma:%s, failed to create since stb:%s not exist", createReq.name, createReq.stb);
×
825
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
826
    if (terrno != 0) code = terrno;
×
827
    goto _OVER;
×
828
  }
829

UNCOV
830
  char streamName[TSDB_TABLE_FNAME_LEN] = {0};
×
UNCOV
831
  code = mndGetStreamNameFromSmaName(streamName, createReq.name);
×
UNCOV
832
  if (TSDB_CODE_SUCCESS != code) {
×
833
    goto _OVER;
×
834
  }
835

UNCOV
836
  code = mndAcquireStream(pMnode, streamName, &pStream);
×
UNCOV
837
  if (pStream != NULL || code == 0) {
×
838
    mError("sma:%s, failed to create since stream:%s already exist", createReq.name, streamName);
×
839
    code = TSDB_CODE_MND_STREAM_ALREADY_EXIST;
×
840
    goto _OVER;
×
841
  }
UNCOV
842
  SSIdx idx = {0};
×
UNCOV
843
  if ((code = mndAcquireGlobalIdx(pMnode, createReq.name, SDB_SMA, &idx)) == 0) {
×
UNCOV
844
    pSma = idx.pIdx;
×
845
  } else {
UNCOV
846
    goto _OVER;
×
847
  }
848

UNCOV
849
  if (pSma != NULL) {
×
850
    if (createReq.igExists) {
×
851
      mInfo("sma:%s, already exist in sma:%s, ignore exist is set", createReq.name, pSma->name);
×
852
      code = 0;
×
853
      goto _OVER;
×
854
    } else {
855
      code = TSDB_CODE_MND_SMA_ALREADY_EXIST;
×
856
      goto _OVER;
×
857
    }
858
  }
859

UNCOV
860
  SName name = {0};
×
UNCOV
861
  code = tNameFromString(&name, createReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
×
UNCOV
862
  if (TSDB_CODE_SUCCESS != code) {
×
863
    goto _OVER;
×
864
  }
UNCOV
865
  char db[TSDB_TABLE_FNAME_LEN] = {0};
×
UNCOV
866
  (void)tNameGetFullDbName(&name, db);
×
867

UNCOV
868
  pDb = mndAcquireDb(pMnode, db);
×
UNCOV
869
  if (pDb == NULL) {
×
870
    code = TSDB_CODE_MND_DB_NOT_SELECTED;
×
871
    goto _OVER;
×
872
  }
873

UNCOV
874
  TAOS_CHECK_GOTO(mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb), NULL, _OVER);
×
875

UNCOV
876
  code = mndCreateSma(pMnode, pReq, &createReq, pDb, pStb, streamName);
×
UNCOV
877
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
×
878

879
_OVER:
×
UNCOV
880
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
×
UNCOV
881
    mError("sma:%s, failed to create since %s", createReq.name, tstrerror(code));
×
882
  }
883

UNCOV
884
  mndReleaseStb(pMnode, pStb);
×
UNCOV
885
  mndReleaseSma(pMnode, pSma);
×
UNCOV
886
  mndReleaseStream(pMnode, pStream);
×
UNCOV
887
  mndReleaseDb(pMnode, pDb);
×
UNCOV
888
  tFreeSMCreateSmaReq(&createReq);
×
889

UNCOV
890
  TAOS_RETURN(code);
×
891
}
892

UNCOV
893
static int32_t mndSetDropSmaRedoLogs(SMnode *pMnode, STrans *pTrans, SSmaObj *pSma) {
×
UNCOV
894
  int32_t  code = 0;
×
UNCOV
895
  SSdbRaw *pRedoRaw = mndSmaActionEncode(pSma);
×
UNCOV
896
  if (pRedoRaw == NULL) {
×
897
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
898
    if (terrno != 0) code = terrno;
×
899
    return -1;
×
900
  }
UNCOV
901
  TAOS_CHECK_RETURN(mndTransAppendRedolog(pTrans, pRedoRaw));
×
UNCOV
902
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING));
×
903

UNCOV
904
  return 0;
×
905
}
906

UNCOV
907
static int32_t mndSetDropSmaCommitLogs(SMnode *pMnode, STrans *pTrans, SSmaObj *pSma) {
×
UNCOV
908
  int32_t  code = 0;
×
UNCOV
909
  SSdbRaw *pCommitRaw = mndSmaActionEncode(pSma);
×
UNCOV
910
  if (pCommitRaw == NULL) {
×
911
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
912
    if (terrno != 0) code = terrno;
×
913
    return -1;
×
914
  }
UNCOV
915
  TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pCommitRaw));
×
UNCOV
916
  TAOS_CHECK_RETURN(sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED));
×
917

UNCOV
918
  return 0;
×
919
}
920

UNCOV
921
static int32_t mndSetDropSmaVgroupRedoLogs(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup) {
×
UNCOV
922
  int32_t  code = 0;
×
UNCOV
923
  SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroup);
×
UNCOV
924
  if (pVgRaw == NULL) {
×
925
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
926
    if (terrno != 0) code = terrno;
×
927
    return -1;
×
928
  }
UNCOV
929
  TAOS_CHECK_RETURN(mndTransAppendRedolog(pTrans, pVgRaw));
×
UNCOV
930
  TAOS_CHECK_RETURN(sdbSetRawStatus(pVgRaw, SDB_STATUS_DROPPING));
×
931

UNCOV
932
  return 0;
×
933
}
934

UNCOV
935
static int32_t mndSetDropSmaVgroupCommitLogs(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup) {
×
UNCOV
936
  int32_t  code = 0;
×
UNCOV
937
  SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroup);
×
UNCOV
938
  if (pVgRaw == NULL) {
×
939
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
940
    if (terrno != 0) code = terrno;
×
941
    return -1;
×
942
  }
UNCOV
943
  TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pVgRaw));
×
UNCOV
944
  TAOS_CHECK_RETURN(sdbSetRawStatus(pVgRaw, SDB_STATUS_DROPPED));
×
945

UNCOV
946
  return 0;
×
947
}
948

UNCOV
949
static int32_t mndSetDropSmaVgroupRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) {
×
UNCOV
950
  int32_t    code = 0;
×
UNCOV
951
  SVnodeGid *pVgid = pVgroup->vnodeGid + 0;
×
UNCOV
952
  SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
×
UNCOV
953
  if (pDnode == NULL) {
×
954
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
955
    if (terrno != 0) code = terrno;
×
956
    TAOS_RETURN(code);
×
957
  }
958

UNCOV
959
  STransAction action = {0};
×
UNCOV
960
  action.epSet = mndGetDnodeEpset(pDnode);
×
UNCOV
961
  mndReleaseDnode(pMnode, pDnode);
×
962

UNCOV
963
  int32_t contLen = 0;
×
UNCOV
964
  void   *pReq = mndBuildDropVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen);
×
UNCOV
965
  if (pReq == NULL) {
×
966
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
967
    if (terrno != 0) code = terrno;
×
968
    TAOS_RETURN(code);
×
969
  }
970

UNCOV
971
  action.pCont = pReq;
×
UNCOV
972
  action.contLen = contLen;
×
UNCOV
973
  action.msgType = TDMT_DND_DROP_VNODE;
×
UNCOV
974
  action.acceptableCode = TSDB_CODE_VND_NOT_EXIST;
×
975

UNCOV
976
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
×
977
    taosMemoryFree(pReq);
×
978
    TAOS_RETURN(code);
×
979
  }
980

UNCOV
981
  TAOS_RETURN(code);
×
982
}
983

UNCOV
984
static int32_t mndDropSma(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SSmaObj *pSma) {
×
UNCOV
985
  int32_t     code = -1;
×
UNCOV
986
  SVgObj     *pVgroup = NULL;
×
UNCOV
987
  SStbObj    *pStb = NULL;
×
UNCOV
988
  STrans     *pTrans = NULL;
×
UNCOV
989
  SStreamObj *pStream = NULL;
×
990

UNCOV
991
  pVgroup = mndAcquireVgroup(pMnode, pSma->dstVgId);
×
UNCOV
992
  if (pVgroup == NULL) {
×
993
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
994
    if (terrno != 0) code = terrno;
×
995
    goto _OVER;
×
996
  }
997

UNCOV
998
  pStb = mndAcquireStb(pMnode, pSma->stb);
×
UNCOV
999
  if (pStb == NULL) {
×
1000
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1001
    if (terrno != 0) code = terrno;
×
1002
    goto _OVER;
×
1003
  }
1004

UNCOV
1005
  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq, "drop-sma");
×
UNCOV
1006
  if (pTrans == NULL) {
×
1007
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1008
    if (terrno != 0) code = terrno;
×
1009
    goto _OVER;
×
1010
  }
1011

UNCOV
1012
  mInfo("trans:%d, used to drop sma:%s", pTrans->id, pSma->name);
×
UNCOV
1013
  mndTransSetDbName(pTrans, pDb->name, NULL);
×
UNCOV
1014
  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER);
×
1015

UNCOV
1016
  mndTransSetSerial(pTrans);
×
1017

UNCOV
1018
  char streamName[TSDB_TABLE_FNAME_LEN] = {0};
×
UNCOV
1019
  code = mndGetStreamNameFromSmaName(streamName, pSma->name);
×
UNCOV
1020
  if (TSDB_CODE_SUCCESS != code) {
×
1021
    goto _OVER;
×
1022
  }
1023

UNCOV
1024
  code = mndAcquireStream(pMnode, streamName, &pStream);
×
UNCOV
1025
  if (pStream == NULL || pStream->smaId != pSma->uid || code != 0) {
×
1026
    sdbRelease(pMnode->pSdb, pStream);
×
1027
    goto _OVER;
×
1028
  } else {
UNCOV
1029
    if ((code = mndStreamSetDropAction(pMnode, pTrans, pStream)) < 0) {
×
1030
      mError("stream:%s, failed to drop task since %s", pStream->name, tstrerror(code));
×
1031
      sdbRelease(pMnode->pSdb, pStream);
×
1032
      goto _OVER;
×
1033
    }
1034

1035
    // drop stream
UNCOV
1036
    if ((code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_DROPPED)) < 0) {
×
1037
      mError("stream:%s, failed to drop log since %s", pStream->name, tstrerror(code));
×
1038
      sdbRelease(pMnode->pSdb, pStream);
×
1039
      goto _OVER;
×
1040
    }
1041
  }
UNCOV
1042
  TAOS_CHECK_GOTO(mndSetDropSmaRedoLogs(pMnode, pTrans, pSma), NULL, _OVER);
×
UNCOV
1043
  TAOS_CHECK_GOTO(mndSetDropSmaVgroupRedoLogs(pMnode, pTrans, pVgroup), NULL, _OVER);
×
UNCOV
1044
  TAOS_CHECK_GOTO(mndSetDropSmaCommitLogs(pMnode, pTrans, pSma), NULL, _OVER);
×
UNCOV
1045
  TAOS_CHECK_GOTO(mndSetDropSmaVgroupCommitLogs(pMnode, pTrans, pVgroup), NULL, _OVER);
×
UNCOV
1046
  TAOS_CHECK_GOTO(mndSetUpdateSmaStbCommitLogs(pMnode, pTrans, pStb), NULL, _OVER);
×
UNCOV
1047
  TAOS_CHECK_GOTO(mndSetDropSmaVgroupRedoActions(pMnode, pTrans, pDb, pVgroup), NULL, _OVER);
×
UNCOV
1048
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
×
1049

UNCOV
1050
  code = 0;
×
1051

UNCOV
1052
_OVER:
×
UNCOV
1053
  mndTransDrop(pTrans);
×
UNCOV
1054
  mndReleaseStream(pMnode, pStream);
×
UNCOV
1055
  mndReleaseVgroup(pMnode, pVgroup);
×
UNCOV
1056
  mndReleaseStb(pMnode, pStb);
×
UNCOV
1057
  TAOS_RETURN(code);
×
1058
}
1059

1060
int32_t mndDropSmasByStb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
1✔
1061
  SSdb    *pSdb = pMnode->pSdb;
1✔
1062
  SSmaObj *pSma = NULL;
1✔
1063
  void    *pIter = NULL;
1✔
1064
  SVgObj  *pVgroup = NULL;
1✔
1065
  int32_t  code = -1;
1✔
1066

1067
  while (1) {
1068
    pIter = sdbFetch(pSdb, SDB_SMA, pIter, (void **)&pSma);
1✔
1069
    if (pIter == NULL) break;
1!
1070

UNCOV
1071
    if (pSma->stbUid == pStb->uid) {
×
1072
      mndTransSetSerial(pTrans);
×
1073
      pVgroup = mndAcquireVgroup(pMnode, pSma->dstVgId);
×
1074
      if (pVgroup == NULL) {
×
1075
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1076
        if (terrno != 0) code = terrno;
×
1077
        goto _OVER;
×
1078
      }
1079

1080
      char streamName[TSDB_TABLE_FNAME_LEN] = {0};
×
1081
      code = mndGetStreamNameFromSmaName(streamName, pSma->name);
×
1082
      if (TSDB_CODE_SUCCESS != code) {
×
1083
        goto _OVER;
×
1084
      }
1085

1086
      SStreamObj *pStream = NULL;
×
1087
      code = mndAcquireStream(pMnode, streamName, &pStream);
×
1088
      if ((pStream != NULL && pStream->smaId == pSma->uid) || code != 0) {
×
1089
        if ((code = mndStreamSetDropAction(pMnode, pTrans, pStream)) < 0) {
×
1090
          mError("stream:%s, failed to drop task since %s", pStream->name, terrstr());
×
1091
          mndReleaseStream(pMnode, pStream);
×
1092
          goto _OVER;
×
1093
        }
1094

1095
        if ((code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_DROPPED)) < 0) {
×
1096
          mndReleaseStream(pMnode, pStream);
×
1097
          goto _OVER;
×
1098
        }
1099

1100
        mndReleaseStream(pMnode, pStream);
×
1101
      }
1102

1103
      TAOS_CHECK_GOTO(mndSetDropSmaVgroupCommitLogs(pMnode, pTrans, pVgroup), NULL, _OVER);
×
1104
      TAOS_CHECK_GOTO(mndSetDropSmaVgroupRedoActions(pMnode, pTrans, pDb, pVgroup), NULL, _OVER);
×
1105
      TAOS_CHECK_GOTO(mndSetDropSmaCommitLogs(pMnode, pTrans, pSma), NULL, _OVER);
×
1106
      mndReleaseVgroup(pMnode, pVgroup);
×
1107
      pVgroup = NULL;
×
1108
    }
1109

UNCOV
1110
    sdbRelease(pSdb, pSma);
×
1111
  }
1112

1113
  code = 0;
1✔
1114

1115
_OVER:
1✔
1116
  sdbCancelFetch(pSdb, pIter);
1✔
1117
  sdbRelease(pSdb, pSma);
1✔
1118
  mndReleaseVgroup(pMnode, pVgroup);
1✔
1119
  TAOS_RETURN(code);
1✔
1120
}
1121

1122
int32_t mndDropSmasByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
8✔
1123
  int32_t code = 0;
8✔
1124
  SSdb   *pSdb = pMnode->pSdb;
8✔
1125
  void   *pIter = NULL;
8✔
1126

UNCOV
1127
  while (1) {
×
1128
    SSmaObj *pSma = NULL;
8✔
1129
    pIter = sdbFetch(pSdb, SDB_SMA, pIter, (void **)&pSma);
8✔
1130
    if (pIter == NULL) break;
8!
1131

UNCOV
1132
    if (pSma->dbUid == pDb->uid) {
×
UNCOV
1133
      if ((code = mndSetDropSmaCommitLogs(pMnode, pTrans, pSma)) != 0) {
×
1134
        sdbRelease(pSdb, pSma);
×
1135
        sdbCancelFetch(pSdb, pSma);
×
1136
        TAOS_RETURN(code);
×
1137
      }
1138
    }
1139

UNCOV
1140
    sdbRelease(pSdb, pSma);
×
1141
  }
1142

1143
  TAOS_RETURN(code);
8✔
1144
}
1145

1146
static int32_t mndProcessDropSmaReq(SRpcMsg *pReq) {
12✔
1147
  SMnode      *pMnode = pReq->info.node;
12✔
1148
  int32_t      code = -1;
12✔
1149
  SDbObj      *pDb = NULL;
12✔
1150
  SSmaObj     *pSma = NULL;
12✔
1151
  SMDropSmaReq dropReq = {0};
12✔
1152

1153
  TAOS_CHECK_GOTO(tDeserializeSMDropSmaReq(pReq->pCont, pReq->contLen, &dropReq), NULL, _OVER);
12!
1154

1155
  mInfo("sma:%s, start to drop", dropReq.name);
12!
1156

1157
  SSIdx idx = {0};
12✔
1158
  if ((code = mndAcquireGlobalIdx(pMnode, dropReq.name, SDB_SMA, &idx)) == 0) {
12!
UNCOV
1159
    pSma = idx.pIdx;
×
1160
  } else {
1161
    goto _OVER;
12✔
1162
  }
UNCOV
1163
  if (pSma == NULL) {
×
UNCOV
1164
    if (dropReq.igNotExists) {
×
1165
      mInfo("sma:%s, not exist, ignore not exist is set", dropReq.name);
×
1166
      code = 0;
×
1167
      goto _OVER;
×
1168
    } else {
UNCOV
1169
      code = TSDB_CODE_MND_SMA_NOT_EXIST;
×
UNCOV
1170
      goto _OVER;
×
1171
    }
1172
  }
1173

UNCOV
1174
  SName name = {0};
×
UNCOV
1175
  code = tNameFromString(&name, dropReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
×
UNCOV
1176
  if (TSDB_CODE_SUCCESS != code) {
×
1177
    goto _OVER;
×
1178
  }
UNCOV
1179
  char db[TSDB_TABLE_FNAME_LEN] = {0};
×
UNCOV
1180
  (void)tNameGetFullDbName(&name, db);
×
1181

UNCOV
1182
  pDb = mndAcquireDb(pMnode, db);
×
UNCOV
1183
  if (pDb == NULL) {
×
1184
    code = TSDB_CODE_MND_DB_NOT_SELECTED;
×
1185
    goto _OVER;
×
1186
  }
1187

UNCOV
1188
  TAOS_CHECK_GOTO(mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb), NULL, _OVER);
×
1189

UNCOV
1190
  code = mndDropSma(pMnode, pReq, pDb, pSma);
×
UNCOV
1191
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
×
1192

1193
_OVER:
×
1194
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
12!
1195
    mError("sma:%s, failed to drop since %s", dropReq.name, tstrerror(code));
12!
1196
  }
1197

1198
  mndReleaseSma(pMnode, pSma);
12✔
1199
  mndReleaseDb(pMnode, pDb);
12✔
1200
  TAOS_RETURN(code);
12✔
1201
}
1202

1203
static int32_t mndGetSma(SMnode *pMnode, SUserIndexReq *indexReq, SUserIndexRsp *rsp, bool *exist) {
×
1204
  int32_t  code = -1;
×
1205
  SSmaObj *pSma = NULL;
×
1206

1207
  SSIdx idx = {0};
×
1208
  if (0 == mndAcquireGlobalIdx(pMnode, indexReq->indexFName, SDB_SMA, &idx)) {
×
1209
    pSma = idx.pIdx;
×
1210
  } else {
1211
    *exist = false;
×
1212
    return 0;
×
1213
  }
1214

1215
  if (pSma == NULL) {
×
1216
    *exist = false;
×
1217
    return 0;
×
1218
  }
1219

1220
  memcpy(rsp->dbFName, pSma->db, sizeof(pSma->db));
×
1221
  memcpy(rsp->tblFName, pSma->stb, sizeof(pSma->stb));
×
1222
  tstrncpy(rsp->indexType, TSDB_INDEX_TYPE_SMA, TSDB_INDEX_TYPE_LEN);
×
1223

1224
  SNodeList *pList = NULL;
×
1225
  int32_t    extOffset = 0;
×
1226
  code = nodesStringToList(pSma->expr, &pList);
×
1227
  if (0 == code) {
×
1228
    SNode *node = NULL;
×
1229
    FOREACH(node, pList) {
×
1230
      SFunctionNode *pFunc = (SFunctionNode *)node;
×
1231
      extOffset += tsnprintf(rsp->indexExts + extOffset, sizeof(rsp->indexExts) - extOffset - 1, "%s%s",
×
1232
                             (extOffset ? "," : ""), pFunc->functionName);
×
1233
    }
1234

1235
    *exist = true;
×
1236
  }
1237

1238
  mndReleaseSma(pMnode, pSma);
×
1239
  return code;
×
1240
}
1241

UNCOV
1242
int32_t mndGetTableSma(SMnode *pMnode, char *tbFName, STableIndexRsp *rsp, bool *exist) {
×
UNCOV
1243
  int32_t         code = 0;
×
UNCOV
1244
  SSmaObj        *pSma = NULL;
×
UNCOV
1245
  SSdb           *pSdb = pMnode->pSdb;
×
UNCOV
1246
  void           *pIter = NULL;
×
1247
  STableIndexInfo info;
1248

UNCOV
1249
  SStbObj *pStb = mndAcquireStb(pMnode, tbFName);
×
UNCOV
1250
  if (NULL == pStb) {
×
UNCOV
1251
    *exist = false;
×
UNCOV
1252
    return TSDB_CODE_SUCCESS;
×
1253
  }
1254

UNCOV
1255
  tstrncpy(rsp->dbFName, pStb->db, TSDB_DB_FNAME_LEN);
×
UNCOV
1256
  tstrncpy(rsp->tbName, pStb->name + strlen(pStb->db) + 1, TSDB_TABLE_NAME_LEN);
×
UNCOV
1257
  rsp->suid = pStb->uid;
×
UNCOV
1258
  rsp->version = pStb->smaVer;
×
UNCOV
1259
  mndReleaseStb(pMnode, pStb);
×
1260

UNCOV
1261
  while (1) {
×
UNCOV
1262
    pIter = sdbFetch(pSdb, SDB_SMA, pIter, (void **)&pSma);
×
UNCOV
1263
    if (pIter == NULL) break;
×
1264

UNCOV
1265
    if (pSma->stb[0] != tbFName[0] || strcmp(pSma->stb, tbFName)) {
×
UNCOV
1266
      sdbRelease(pSdb, pSma);
×
UNCOV
1267
      continue;
×
1268
    }
1269

UNCOV
1270
    info.intervalUnit = pSma->intervalUnit;
×
UNCOV
1271
    info.slidingUnit = pSma->slidingUnit;
×
UNCOV
1272
    info.interval = pSma->interval;
×
UNCOV
1273
    info.offset = pSma->offset;
×
UNCOV
1274
    info.sliding = pSma->sliding;
×
UNCOV
1275
    info.dstTbUid = pSma->dstTbUid;
×
UNCOV
1276
    info.dstVgId = pSma->dstVgId;
×
1277

UNCOV
1278
    SVgObj *pVg = mndAcquireVgroup(pMnode, pSma->dstVgId);
×
UNCOV
1279
    if (pVg == NULL) {
×
UNCOV
1280
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
UNCOV
1281
      if (terrno != 0) code = terrno;
×
UNCOV
1282
      sdbRelease(pSdb, pSma);
×
UNCOV
1283
      sdbCancelFetch(pSdb, pIter);
×
UNCOV
1284
      return code;
×
1285
    }
UNCOV
1286
    info.epSet = mndGetVgroupEpset(pMnode, pVg);
×
1287

UNCOV
1288
    info.expr = taosMemoryMalloc(pSma->exprLen + 1);
×
UNCOV
1289
    if (info.expr == NULL) {
×
1290
      code = terrno;
×
1291
      sdbRelease(pSdb, pSma);
×
1292
      sdbCancelFetch(pSdb, pIter);
×
1293
      return code;
×
1294
    }
1295

UNCOV
1296
    memcpy(info.expr, pSma->expr, pSma->exprLen);
×
UNCOV
1297
    info.expr[pSma->exprLen] = 0;
×
1298

UNCOV
1299
    if (NULL == taosArrayPush(rsp->pIndex, &info)) {
×
1300
      code = terrno;
×
1301
      taosMemoryFree(info.expr);
×
1302
      sdbRelease(pSdb, pSma);
×
1303
      sdbCancelFetch(pSdb, pIter);
×
1304
      return code;
×
1305
    }
1306

UNCOV
1307
    rsp->indexSize += sizeof(info) + pSma->exprLen + 1;
×
UNCOV
1308
    *exist = true;
×
1309

UNCOV
1310
    sdbRelease(pSdb, pSma);
×
1311
  }
1312

UNCOV
1313
  TAOS_RETURN(code);
×
1314
}
1315

1316
static int32_t mndProcessGetSmaReq(SRpcMsg *pReq) {
×
1317
  SUserIndexReq indexReq = {0};
×
1318
  SMnode       *pMnode = pReq->info.node;
×
1319
  int32_t       code = -1;
×
1320
  SUserIndexRsp rsp = {0};
×
1321
  bool          exist = false;
×
1322

1323
  TAOS_CHECK_GOTO(tDeserializeSUserIndexReq(pReq->pCont, pReq->contLen, &indexReq), NULL, _OVER);
×
1324

1325
  code = mndGetSma(pMnode, &indexReq, &rsp, &exist);
×
1326
  if (code) {
×
1327
    goto _OVER;
×
1328
  }
1329

1330
  if (!exist) {
×
1331
    // TODO GET INDEX FROM FULLTEXT
1332
    code = TSDB_CODE_MND_DB_INDEX_NOT_EXIST;
×
1333
  } else {
1334
    int32_t contLen = tSerializeSUserIndexRsp(NULL, 0, &rsp);
×
1335
    void   *pRsp = rpcMallocCont(contLen);
×
1336
    if (pRsp == NULL) {
×
1337
      code = terrno;
×
1338
      goto _OVER;
×
1339
    }
1340

1341
    contLen = tSerializeSUserIndexRsp(pRsp, contLen, &rsp);
×
1342
    if (contLen < 0) {
×
1343
      code = terrno;
×
1344
      goto _OVER;
×
1345
    }
1346

1347
    pReq->info.rsp = pRsp;
×
1348
    pReq->info.rspLen = contLen;
×
1349

1350
    code = 0;
×
1351
  }
1352

1353
_OVER:
×
1354
  if (code != 0) {
×
1355
    mError("failed to get index %s since %s", indexReq.indexFName, tstrerror(code));
×
1356
  }
1357

1358
  TAOS_RETURN(code);
×
1359
}
1360

UNCOV
1361
static int32_t mndProcessGetTbSmaReq(SRpcMsg *pReq) {
×
UNCOV
1362
  STableIndexReq indexReq = {0};
×
UNCOV
1363
  SMnode        *pMnode = pReq->info.node;
×
UNCOV
1364
  int32_t        code = -1;
×
UNCOV
1365
  STableIndexRsp rsp = {0};
×
UNCOV
1366
  bool           exist = false;
×
1367

UNCOV
1368
  TAOS_CHECK_GOTO(tDeserializeSTableIndexReq(pReq->pCont, pReq->contLen, &indexReq), NULL, _OVER);
×
1369

UNCOV
1370
  rsp.pIndex = taosArrayInit(10, sizeof(STableIndexInfo));
×
UNCOV
1371
  if (NULL == rsp.pIndex) {
×
1372
    code = terrno;
×
1373
    goto _OVER;
×
1374
  }
1375

UNCOV
1376
  code = mndGetTableSma(pMnode, indexReq.tbFName, &rsp, &exist);
×
UNCOV
1377
  if (code) {
×
UNCOV
1378
    goto _OVER;
×
1379
  }
1380

UNCOV
1381
  if (!exist) {
×
UNCOV
1382
    code = TSDB_CODE_MND_DB_INDEX_NOT_EXIST;
×
1383
  } else {
UNCOV
1384
    int32_t contLen = tSerializeSTableIndexRsp(NULL, 0, &rsp);
×
UNCOV
1385
    void   *pRsp = rpcMallocCont(contLen);
×
UNCOV
1386
    if (pRsp == NULL) {
×
1387
      code = terrno;
×
1388
      goto _OVER;
×
1389
    }
1390

UNCOV
1391
    contLen = tSerializeSTableIndexRsp(pRsp, contLen, &rsp);
×
UNCOV
1392
    if (contLen < 0) {
×
1393
      code = terrno;
×
1394
      goto _OVER;
×
1395
    }
1396

UNCOV
1397
    pReq->info.rsp = pRsp;
×
UNCOV
1398
    pReq->info.rspLen = contLen;
×
1399

UNCOV
1400
    code = 0;
×
1401
  }
1402

UNCOV
1403
_OVER:
×
UNCOV
1404
  if (code != 0) {
×
UNCOV
1405
    mError("failed to get table index %s since %s", indexReq.tbFName, tstrerror(code));
×
1406
  }
1407

UNCOV
1408
  tFreeSerializeSTableIndexRsp(&rsp);
×
UNCOV
1409
  TAOS_RETURN(code);
×
1410
}
1411

1412
static int32_t mndRetrieveSma(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
16✔
1413
  SMnode  *pMnode = pReq->info.node;
16✔
1414
  SSdb    *pSdb = pMnode->pSdb;
16✔
1415
  int32_t  numOfRows = 0;
16✔
1416
  SSmaObj *pSma = NULL;
16✔
1417
  int32_t  cols = 0;
16✔
1418
  int32_t  code = 0;
16✔
1419

1420
  SDbObj *pDb = NULL;
16✔
1421
  if (strlen(pShow->db) > 0) {
16!
UNCOV
1422
    pDb = mndAcquireDb(pMnode, pShow->db);
×
UNCOV
1423
    if (pDb == NULL) return 0;
×
1424
  }
1425
  SSmaAndTagIter *pIter = pShow->pIter;
16✔
1426
  while (numOfRows < rows) {
16!
1427
    pIter->pSmaIter = sdbFetch(pSdb, SDB_SMA, pIter->pSmaIter, (void **)&pSma);
16✔
1428
    if (pIter->pSmaIter == NULL) break;
16!
1429

UNCOV
1430
    if (NULL != pDb && pSma->dbUid != pDb->uid) {
×
1431
      sdbRelease(pSdb, pSma);
×
1432
      continue;
×
1433
    }
1434

UNCOV
1435
    cols = 0;
×
1436

UNCOV
1437
    SName smaName = {0};
×
UNCOV
1438
    SName stbName = {0};
×
UNCOV
1439
    char  n2[TSDB_DB_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
×
UNCOV
1440
    char  n3[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
×
UNCOV
1441
    code = tNameFromString(&smaName, pSma->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
×
UNCOV
1442
    char n1[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
×
UNCOV
1443
    if (TSDB_CODE_SUCCESS == code) {
×
UNCOV
1444
      STR_TO_VARSTR(n1, (char *)tNameGetTableName(&smaName));
×
UNCOV
1445
      STR_TO_VARSTR(n2, (char *)mndGetDbStr(pSma->db));
×
UNCOV
1446
      code = tNameFromString(&stbName, pSma->stb, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
×
1447
    }
UNCOV
1448
    SColumnInfoData *pColInfo = NULL;
×
UNCOV
1449
    if (TSDB_CODE_SUCCESS == code) {
×
UNCOV
1450
      STR_TO_VARSTR(n3, (char *)tNameGetTableName(&stbName));
×
1451

UNCOV
1452
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
UNCOV
1453
      code = colDataSetVal(pColInfo, numOfRows, (const char *)n1, false);
×
1454
    }
UNCOV
1455
    if (TSDB_CODE_SUCCESS == code) {
×
UNCOV
1456
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
UNCOV
1457
      code = colDataSetVal(pColInfo, numOfRows, (const char *)n2, false);
×
1458
    }
UNCOV
1459
    if (TSDB_CODE_SUCCESS == code) {
×
UNCOV
1460
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
UNCOV
1461
      code = colDataSetVal(pColInfo, numOfRows, (const char *)n3, false);
×
1462
    }
UNCOV
1463
    if (TSDB_CODE_SUCCESS == code) {
×
UNCOV
1464
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
UNCOV
1465
      code = colDataSetVal(pColInfo, numOfRows, (const char *)&pSma->dstVgId, false);
×
1466
    }
UNCOV
1467
    if (TSDB_CODE_SUCCESS == code) {
×
UNCOV
1468
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
UNCOV
1469
      code = colDataSetVal(pColInfo, numOfRows, (const char *)&pSma->createdTime, false);
×
1470
    }
1471

UNCOV
1472
    char col[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
×
UNCOV
1473
    STR_TO_VARSTR(col, (char *)"");
×
1474

UNCOV
1475
    if (TSDB_CODE_SUCCESS == code) {
×
UNCOV
1476
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
UNCOV
1477
      code = colDataSetVal(pColInfo, numOfRows, (const char *)col, false);
×
1478
    }
1479

UNCOV
1480
    if (TSDB_CODE_SUCCESS == code) {
×
UNCOV
1481
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
1482

UNCOV
1483
      char tag[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
×
UNCOV
1484
      STR_TO_VARSTR(tag, (char *)"sma_index");
×
UNCOV
1485
      code = colDataSetVal(pColInfo, numOfRows, (const char *)tag, false);
×
1486
    }
1487

UNCOV
1488
    numOfRows++;
×
UNCOV
1489
    sdbRelease(pSdb, pSma);
×
UNCOV
1490
    if (TSDB_CODE_SUCCESS != code) {
×
1491
      sdbCancelFetch(pMnode->pSdb, pIter->pSmaIter);
×
1492
      numOfRows = -1;
×
1493
      break;
×
1494
    }
1495
  }
1496

1497
  mndReleaseDb(pMnode, pDb);
16✔
1498
  pShow->numOfRows += numOfRows;
16✔
1499
  return numOfRows;
16✔
1500
}
1501

1502
// sma and tag index comm func
1503
static int32_t mndProcessDropIdxReq(SRpcMsg *pReq) {
12✔
1504
  int ret = mndProcessDropSmaReq(pReq);
12✔
1505
  if (terrno == TSDB_CODE_MND_TAG_INDEX_ALREADY_EXIST) {
12!
1506
    terrno = 0;
12✔
1507
    ret = mndProcessDropTagIdxReq(pReq);
12✔
1508
  }
1509
  return ret;
12✔
1510
}
1511

1512
static int32_t mndRetrieveIdx(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
16✔
1513
  if (pShow->pIter == NULL) {
16!
1514
    pShow->pIter = taosMemoryCalloc(1, sizeof(SSmaAndTagIter));
16!
1515
  }
1516
  if (!pShow->pIter) {
16!
1517
    return terrno;
×
1518
  }
1519
  int32_t read = mndRetrieveSma(pReq, pShow, pBlock, rows);
16✔
1520
  if (read < rows) {
16!
1521
    read += mndRetrieveTagIdx(pReq, pShow, pBlock, rows - read);
16✔
1522
  }
1523
  // no more to read
1524
  if (read < rows) {
16!
1525
    taosMemoryFree(pShow->pIter);
16!
1526
    pShow->pIter = NULL;
16✔
1527
  }
1528
  return read;
16✔
1529
}
1530
static void mndCancelRetrieveIdx(SMnode *pMnode, void *pIter) {
×
1531
  SSmaAndTagIter *p = pIter;
×
1532
  if (p != NULL) {
×
1533
    SSdb *pSdb = pMnode->pSdb;
×
1534
    sdbCancelFetchByType(pSdb, p->pSmaIter, SDB_SMA);
×
1535
    sdbCancelFetchByType(pSdb, p->pIdxIter, SDB_IDX);
×
1536
  }
1537
  taosMemoryFree(p);
×
1538
}
×
1539

UNCOV
1540
static void initSMAObj(SCreateTSMACxt *pCxt) {
×
UNCOV
1541
  memcpy(pCxt->pSma->name, pCxt->pCreateSmaReq->name, TSDB_TABLE_FNAME_LEN);
×
UNCOV
1542
  memcpy(pCxt->pSma->stb, pCxt->pCreateSmaReq->stb, TSDB_TABLE_FNAME_LEN);
×
UNCOV
1543
  memcpy(pCxt->pSma->db, pCxt->pDb->name, TSDB_DB_FNAME_LEN);
×
UNCOV
1544
  if (pCxt->pBaseSma) memcpy(pCxt->pSma->baseSmaName, pCxt->pBaseSma->name, TSDB_TABLE_FNAME_LEN);
×
UNCOV
1545
  pCxt->pSma->createdTime = taosGetTimestampMs();
×
UNCOV
1546
  pCxt->pSma->uid = mndGenerateUid(pCxt->pCreateSmaReq->name, TSDB_TABLE_FNAME_LEN);
×
1547

UNCOV
1548
  memcpy(pCxt->pSma->dstTbName, pCxt->targetStbFullName, TSDB_TABLE_FNAME_LEN);
×
UNCOV
1549
  pCxt->pSma->dstTbUid = 0;  // not used
×
UNCOV
1550
  pCxt->pSma->stbUid = pCxt->pSrcStb ? pCxt->pSrcStb->uid : pCxt->pCreateSmaReq->normSourceTbUid;
×
UNCOV
1551
  pCxt->pSma->dbUid = pCxt->pDb->uid;
×
UNCOV
1552
  pCxt->pSma->interval = pCxt->pCreateSmaReq->interval;
×
UNCOV
1553
  pCxt->pSma->intervalUnit = pCxt->pCreateSmaReq->intervalUnit;
×
1554
//  pCxt->pSma->timezone = taosGetLocalTimezoneOffset();
UNCOV
1555
  pCxt->pSma->version = 1;
×
1556

UNCOV
1557
  pCxt->pSma->exprLen = pCxt->pCreateSmaReq->exprLen;
×
UNCOV
1558
  pCxt->pSma->sqlLen = pCxt->pCreateSmaReq->sqlLen;
×
UNCOV
1559
  pCxt->pSma->astLen = pCxt->pCreateSmaReq->astLen;
×
UNCOV
1560
  pCxt->pSma->expr = pCxt->pCreateSmaReq->expr;
×
UNCOV
1561
  pCxt->pSma->sql = pCxt->pCreateSmaReq->sql;
×
UNCOV
1562
  pCxt->pSma->ast = pCxt->pCreateSmaReq->ast;
×
UNCOV
1563
}
×
1564

UNCOV
1565
static int32_t mndCreateTSMABuildCreateStreamReq(SCreateTSMACxt *pCxt) {
×
UNCOV
1566
  tstrncpy(pCxt->pCreateStreamReq->name, pCxt->streamName, TSDB_STREAM_FNAME_LEN);
×
UNCOV
1567
  tstrncpy(pCxt->pCreateStreamReq->sourceDB, pCxt->pDb->name, TSDB_DB_FNAME_LEN);
×
UNCOV
1568
  tstrncpy(pCxt->pCreateStreamReq->targetStbFullName, pCxt->targetStbFullName, TSDB_TABLE_FNAME_LEN);
×
UNCOV
1569
  pCxt->pCreateStreamReq->igExists = false;
×
UNCOV
1570
  pCxt->pCreateStreamReq->triggerType = STREAM_TRIGGER_MAX_DELAY;
×
UNCOV
1571
  pCxt->pCreateStreamReq->igExpired = false;
×
UNCOV
1572
  pCxt->pCreateStreamReq->fillHistory = STREAM_FILL_HISTORY_ON;
×
UNCOV
1573
  pCxt->pCreateStreamReq->maxDelay = 10000;
×
UNCOV
1574
  pCxt->pCreateStreamReq->watermark = 0;
×
UNCOV
1575
  pCxt->pCreateStreamReq->numOfTags = pCxt->pSrcStb ? pCxt->pSrcStb->numOfTags + 1 : 1;
×
UNCOV
1576
  pCxt->pCreateStreamReq->checkpointFreq = 0;
×
UNCOV
1577
  pCxt->pCreateStreamReq->createStb = 1;
×
UNCOV
1578
  pCxt->pCreateStreamReq->targetStbUid = 0;
×
UNCOV
1579
  pCxt->pCreateStreamReq->fillNullCols = NULL;
×
UNCOV
1580
  pCxt->pCreateStreamReq->igUpdate = 0;
×
UNCOV
1581
  pCxt->pCreateStreamReq->deleteMark = pCxt->pCreateSmaReq->deleteMark;
×
UNCOV
1582
  pCxt->pCreateStreamReq->lastTs = pCxt->pCreateSmaReq->lastTs;
×
UNCOV
1583
  pCxt->pCreateStreamReq->smaId = pCxt->pSma->uid;
×
UNCOV
1584
  pCxt->pCreateStreamReq->ast = taosStrdup(pCxt->pCreateSmaReq->ast);
×
UNCOV
1585
  if (!pCxt->pCreateStreamReq->ast) {
×
1586
    return terrno;
×
1587
  }
UNCOV
1588
  pCxt->pCreateStreamReq->sql = taosStrdup(pCxt->pCreateSmaReq->sql);
×
UNCOV
1589
  if (!pCxt->pCreateStreamReq->sql) {
×
1590
    return terrno;
×
1591
  }
1592

1593
  // construct tags
UNCOV
1594
  pCxt->pCreateStreamReq->pTags = taosArrayInit(pCxt->pCreateStreamReq->numOfTags, sizeof(SField));
×
UNCOV
1595
  if (!pCxt->pCreateStreamReq->pTags) {
×
1596
    return terrno;
×
1597
  }
UNCOV
1598
  SField  f = {0};
×
UNCOV
1599
  int32_t code = 0;
×
UNCOV
1600
  if (pCxt->pSrcStb) {
×
UNCOV
1601
    for (int32_t idx = 0; idx < pCxt->pCreateStreamReq->numOfTags - 1; ++idx) {
×
UNCOV
1602
      SSchema *pSchema = &pCxt->pSrcStb->pTags[idx];
×
UNCOV
1603
      f.bytes = pSchema->bytes;
×
UNCOV
1604
      f.type = pSchema->type;
×
UNCOV
1605
      f.flags = pSchema->flags;
×
UNCOV
1606
      tstrncpy(f.name, pSchema->name, TSDB_COL_NAME_LEN);
×
UNCOV
1607
      if (NULL == taosArrayPush(pCxt->pCreateStreamReq->pTags, &f)) {
×
1608
        code = terrno;
×
1609
        break;
×
1610
      }
1611
    }
1612
  }
1613

UNCOV
1614
  if (TSDB_CODE_SUCCESS == code) {
×
UNCOV
1615
    f.bytes = TSDB_TABLE_FNAME_LEN - 1 + VARSTR_HEADER_SIZE;
×
UNCOV
1616
    f.flags = COL_SMA_ON;
×
UNCOV
1617
    f.type = TSDB_DATA_TYPE_BINARY;
×
UNCOV
1618
    tstrncpy(f.name, "tbname", strlen("tbname") + 1);
×
UNCOV
1619
    if (NULL == taosArrayPush(pCxt->pCreateStreamReq->pTags, &f)) {
×
1620
      code = terrno;
×
1621
    }
1622
  }
1623

UNCOV
1624
  if (TSDB_CODE_SUCCESS == code) {
×
1625
    // construct output cols
1626
    SNode *pNode;
UNCOV
1627
    FOREACH(pNode, pCxt->pProjects) {
×
UNCOV
1628
      SExprNode *pExprNode = (SExprNode *)pNode;
×
UNCOV
1629
      f.bytes = pExprNode->resType.bytes;
×
UNCOV
1630
      f.type = pExprNode->resType.type;
×
UNCOV
1631
      f.flags = COL_SMA_ON;
×
UNCOV
1632
      tstrncpy(f.name, pExprNode->userAlias, TSDB_COL_NAME_LEN);
×
UNCOV
1633
      if (NULL == taosArrayPush(pCxt->pCreateStreamReq->pCols, &f)) {
×
1634
        code = terrno;
×
1635
        break;
×
1636
      }
1637
    }
1638
  }
UNCOV
1639
  return code;
×
1640
}
1641

UNCOV
1642
static int32_t mndCreateTSMABuildDropStreamReq(SCreateTSMACxt *pCxt) {
×
UNCOV
1643
  tstrncpy(pCxt->pDropStreamReq->name, pCxt->streamName, TSDB_STREAM_FNAME_LEN);
×
UNCOV
1644
  pCxt->pDropStreamReq->igNotExists = false;
×
UNCOV
1645
  pCxt->pDropStreamReq->sql = taosStrdup(pCxt->pDropSmaReq->name);
×
UNCOV
1646
  if (!pCxt->pDropStreamReq->sql) {
×
1647
    return terrno;
×
1648
  }
UNCOV
1649
  pCxt->pDropStreamReq->sqlLen = strlen(pCxt->pDropStreamReq->sql);
×
UNCOV
1650
  return TSDB_CODE_SUCCESS;
×
1651
}
1652

UNCOV
1653
static int32_t mndSetUpdateDbTsmaVersionPrepareLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pOld, SDbObj *pNew) {
×
UNCOV
1654
  int32_t  code = 0;
×
UNCOV
1655
  SSdbRaw *pRedoRaw = mndDbActionEncode(pOld);
×
UNCOV
1656
  if (pRedoRaw == NULL) {
×
1657
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1658
    if (terrno != 0) code = terrno;
×
1659
    TAOS_RETURN(code);
×
1660
  }
UNCOV
1661
  if ((code = mndTransAppendPrepareLog(pTrans, pRedoRaw)) != 0) {
×
1662
    sdbFreeRaw(pRedoRaw);
×
1663
    TAOS_RETURN(code);
×
1664
  }
1665

UNCOV
1666
  TAOS_RETURN(sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY));
×
1667
}
1668

UNCOV
1669
static int32_t mndSetUpdateDbTsmaVersionCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pOld, SDbObj *pNew) {
×
UNCOV
1670
  int32_t  code = 0;
×
UNCOV
1671
  SSdbRaw *pCommitRaw = mndDbActionEncode(pNew);
×
UNCOV
1672
  if (pCommitRaw == NULL) {
×
1673
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1674
    if (terrno != 0) code = terrno;
×
1675
    TAOS_RETURN(code);
×
1676
  }
UNCOV
1677
  if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
×
1678
    sdbFreeRaw(pCommitRaw);
×
1679
    TAOS_RETURN(code);
×
1680
  }
1681

UNCOV
1682
  TAOS_RETURN(sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY));
×
1683
}
1684

UNCOV
1685
static int32_t mndCreateTSMATxnPrepare(SCreateTSMACxt *pCxt) {
×
UNCOV
1686
  int32_t      code = -1;
×
UNCOV
1687
  STransAction createStreamRedoAction = {0};
×
UNCOV
1688
  STransAction createStreamUndoAction = {0};
×
UNCOV
1689
  STransAction dropStbUndoAction = {0};
×
UNCOV
1690
  SMDropStbReq dropStbReq = {0};
×
UNCOV
1691
  STrans *pTrans = mndTransCreate(pCxt->pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_TSMA, pCxt->pRpcReq, "create-tsma");
×
UNCOV
1692
  if (!pTrans) {
×
1693
    code = terrno;
×
1694
    goto _OVER;
×
1695
  }
UNCOV
1696
  mndTransSetDbName(pTrans, pCxt->pDb->name, NULL);
×
UNCOV
1697
  TAOS_CHECK_GOTO(mndTransCheckConflict(pCxt->pMnode, pTrans), NULL, _OVER);
×
1698

UNCOV
1699
  mndTransSetSerial(pTrans);
×
UNCOV
1700
  mInfo("trans:%d, used to create tsma:%s stream:%s", pTrans->id, pCxt->pCreateSmaReq->name,
×
1701
        pCxt->pCreateStreamReq->name);
1702

UNCOV
1703
  mndGetMnodeEpSet(pCxt->pMnode, &createStreamRedoAction.epSet);
×
UNCOV
1704
  createStreamRedoAction.acceptableCode = TSDB_CODE_MND_STREAM_ALREADY_EXIST;
×
UNCOV
1705
  createStreamRedoAction.msgType = TDMT_STREAM_CREATE;
×
UNCOV
1706
  createStreamRedoAction.contLen = tSerializeSCMCreateStreamReq(0, 0, pCxt->pCreateStreamReq);
×
UNCOV
1707
  createStreamRedoAction.pCont = taosMemoryCalloc(1, createStreamRedoAction.contLen);
×
UNCOV
1708
  if (!createStreamRedoAction.pCont) {
×
1709
    code = terrno;
×
1710
    goto _OVER;
×
1711
  }
UNCOV
1712
  if (createStreamRedoAction.contLen != tSerializeSCMCreateStreamReq(createStreamRedoAction.pCont,
×
1713
                                                                     createStreamRedoAction.contLen,
UNCOV
1714
                                                                     pCxt->pCreateStreamReq)) {
×
1715
    mError("sma: %s, failed to create due to create stream req encode failure", pCxt->pCreateSmaReq->name);
×
1716
    code = TSDB_CODE_INVALID_MSG;
×
1717
    goto _OVER;
×
1718
  }
1719

UNCOV
1720
  createStreamUndoAction.epSet = createStreamRedoAction.epSet;
×
UNCOV
1721
  createStreamUndoAction.acceptableCode = TSDB_CODE_MND_STREAM_NOT_EXIST;
×
UNCOV
1722
  createStreamUndoAction.msgType = TDMT_STREAM_DROP;
×
UNCOV
1723
  createStreamUndoAction.contLen = tSerializeSMDropStreamReq(0, 0, pCxt->pDropStreamReq);
×
UNCOV
1724
  createStreamUndoAction.pCont = taosMemoryCalloc(1, createStreamUndoAction.contLen);
×
UNCOV
1725
  if (!createStreamUndoAction.pCont) {
×
1726
    code = terrno;
×
1727
    goto _OVER;
×
1728
  }
UNCOV
1729
  if (createStreamUndoAction.contLen !=
×
UNCOV
1730
      tSerializeSMDropStreamReq(createStreamUndoAction.pCont, createStreamUndoAction.contLen, pCxt->pDropStreamReq)) {
×
1731
    mError("sma: %s, failed to create due to drop stream req encode failure", pCxt->pCreateSmaReq->name);
×
1732
    code = TSDB_CODE_INVALID_MSG;
×
1733
    goto _OVER;
×
1734
  }
1735

UNCOV
1736
  dropStbReq.igNotExists = true;
×
UNCOV
1737
  tstrncpy(dropStbReq.name, pCxt->targetStbFullName, TSDB_TABLE_FNAME_LEN);
×
UNCOV
1738
  dropStbUndoAction.epSet = createStreamRedoAction.epSet;
×
UNCOV
1739
  dropStbUndoAction.acceptableCode = TSDB_CODE_MND_STB_NOT_EXIST;
×
UNCOV
1740
  dropStbUndoAction.retryCode = TSDB_CODE_MND_STREAM_MUST_BE_DELETED;
×
UNCOV
1741
  dropStbUndoAction.msgType = TDMT_MND_STB_DROP;
×
UNCOV
1742
  dropStbUndoAction.contLen = tSerializeSMDropStbReq(0, 0, &dropStbReq);
×
UNCOV
1743
  dropStbUndoAction.pCont = taosMemoryCalloc(1, dropStbUndoAction.contLen);
×
UNCOV
1744
  if (!dropStbUndoAction.pCont) {
×
1745
    code = terrno;
×
1746
    goto _OVER;
×
1747
  }
UNCOV
1748
  if (dropStbUndoAction.contLen !=
×
UNCOV
1749
      tSerializeSMDropStbReq(dropStbUndoAction.pCont, dropStbUndoAction.contLen, &dropStbReq)) {
×
1750
    mError("sma: %s, failed to create due to drop stb req encode failure", pCxt->pCreateSmaReq->name);
×
1751
    code = TSDB_CODE_INVALID_MSG;
×
1752
    goto _OVER;
×
1753
  }
1754

UNCOV
1755
  SDbObj newDb = {0};
×
UNCOV
1756
  memcpy(&newDb, pCxt->pDb, sizeof(SDbObj));
×
UNCOV
1757
  newDb.tsmaVersion++;
×
UNCOV
1758
  TAOS_CHECK_GOTO(mndSetCreateSmaRedoLogs(pCxt->pMnode, pTrans, pCxt->pSma), NULL, _OVER);
×
UNCOV
1759
  TAOS_CHECK_GOTO(mndSetCreateSmaUndoLogs(pCxt->pMnode, pTrans, pCxt->pSma), NULL, _OVER);
×
UNCOV
1760
  TAOS_CHECK_GOTO(mndSetCreateSmaCommitLogs(pCxt->pMnode, pTrans, pCxt->pSma), NULL, _OVER);
×
UNCOV
1761
  TAOS_CHECK_GOTO(mndTransAppendRedoAction(pTrans, &createStreamRedoAction), NULL, _OVER);
×
UNCOV
1762
  TAOS_CHECK_GOTO(mndTransAppendUndoAction(pTrans, &createStreamUndoAction), NULL, _OVER);
×
UNCOV
1763
  TAOS_CHECK_GOTO(mndTransAppendUndoAction(pTrans, &dropStbUndoAction), NULL, _OVER);
×
UNCOV
1764
  TAOS_CHECK_GOTO(mndSetUpdateDbTsmaVersionPrepareLogs(pCxt->pMnode, pTrans, pCxt->pDb, &newDb), NULL, _OVER);
×
UNCOV
1765
  TAOS_CHECK_GOTO(mndSetUpdateDbTsmaVersionCommitLogs(pCxt->pMnode, pTrans, pCxt->pDb, &newDb), NULL, _OVER);
×
UNCOV
1766
  TAOS_CHECK_GOTO(mndTransPrepare(pCxt->pMnode, pTrans), NULL, _OVER);
×
1767

UNCOV
1768
  code = TSDB_CODE_SUCCESS;
×
1769

UNCOV
1770
_OVER:
×
UNCOV
1771
  mndTransDrop(pTrans);
×
UNCOV
1772
  TAOS_RETURN(code);
×
1773
}
1774

UNCOV
1775
static int32_t mndCreateTSMA(SCreateTSMACxt *pCxt) {
×
UNCOV
1776
  int32_t            code = 0;
×
UNCOV
1777
  SSmaObj            sma = {0};
×
UNCOV
1778
  SCMCreateStreamReq createStreamReq = {0};
×
UNCOV
1779
  SMDropStreamReq    dropStreamReq = {0};
×
1780

UNCOV
1781
  pCxt->pSma = &sma;
×
UNCOV
1782
  initSMAObj(pCxt);
×
1783

UNCOV
1784
  SNodeList *pProjects = NULL;
×
UNCOV
1785
  code = nodesStringToList(pCxt->pCreateSmaReq->expr, &pProjects);
×
UNCOV
1786
  if (TSDB_CODE_SUCCESS != code) {
×
1787
    goto _OVER;
×
1788
  }
UNCOV
1789
  pCxt->pProjects = pProjects;
×
1790

UNCOV
1791
  pCxt->pCreateStreamReq = &createStreamReq;
×
UNCOV
1792
  if (pCxt->pCreateSmaReq->pVgroupVerList) {
×
UNCOV
1793
    pCxt->pCreateStreamReq->pVgroupVerList = taosArrayDup(pCxt->pCreateSmaReq->pVgroupVerList, NULL);
×
UNCOV
1794
    if (!pCxt->pCreateStreamReq->pVgroupVerList) {
×
1795
      code = terrno;
×
1796
      goto _OVER;
×
1797
    }
1798
  }
UNCOV
1799
  if (LIST_LENGTH(pProjects) > 0) {
×
UNCOV
1800
    createStreamReq.pCols = taosArrayInit(LIST_LENGTH(pProjects), sizeof(SField));
×
UNCOV
1801
    if (!createStreamReq.pCols) {
×
1802
      code = terrno;
×
1803
      goto _OVER;
×
1804
    }
1805
  }
UNCOV
1806
  pCxt->pDropStreamReq = &dropStreamReq;
×
UNCOV
1807
  code = mndCreateTSMABuildCreateStreamReq(pCxt);
×
UNCOV
1808
  if (TSDB_CODE_SUCCESS != code) {
×
1809
    goto _OVER;
×
1810
  }
UNCOV
1811
  code = mndCreateTSMABuildDropStreamReq(pCxt);
×
UNCOV
1812
  if (TSDB_CODE_SUCCESS != code) {
×
1813
    goto _OVER;
×
1814
  }
1815

UNCOV
1816
  if (TSDB_CODE_SUCCESS != (code = mndCreateTSMATxnPrepare(pCxt))) {
×
1817
    goto _OVER;
×
1818
  } else {
UNCOV
1819
    mInfo("sma:%s, uid:%" PRIi64 " create on stb:%" PRIi64 " dstTb:%s dstVg:%d", pCxt->pCreateSmaReq->name, sma.uid,
×
1820
          sma.stbUid, sma.dstTbName, sma.dstVgId);
UNCOV
1821
    code = 0;
×
1822
  }
1823

UNCOV
1824
_OVER:
×
UNCOV
1825
  tFreeSCMCreateStreamReq(pCxt->pCreateStreamReq);
×
UNCOV
1826
  if (pCxt->pDropStreamReq) tFreeMDropStreamReq(pCxt->pDropStreamReq);
×
UNCOV
1827
  pCxt->pCreateStreamReq = NULL;
×
UNCOV
1828
  if (pProjects) nodesDestroyList(pProjects);
×
UNCOV
1829
  pCxt->pProjects = NULL;
×
UNCOV
1830
  TAOS_RETURN(code);
×
1831
}
1832

UNCOV
1833
static int32_t mndTSMAGenerateOutputName(const char *tsmaName, char *streamName, char *targetStbName) {
×
1834
  SName   smaName;
UNCOV
1835
  int32_t code = tNameFromString(&smaName, tsmaName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
×
UNCOV
1836
  if (TSDB_CODE_SUCCESS != code) {
×
1837
    return code;
×
1838
  }
UNCOV
1839
  snprintf(streamName, TSDB_TABLE_FNAME_LEN, "%d.%s", smaName.acctId, smaName.tname);
×
UNCOV
1840
  snprintf(targetStbName, TSDB_TABLE_FNAME_LEN, "%s"TSMA_RES_STB_POSTFIX, tsmaName);
×
UNCOV
1841
  return TSDB_CODE_SUCCESS;
×
1842
}
1843

UNCOV
1844
static int32_t mndProcessCreateTSMAReq(SRpcMsg *pReq) {
×
1845
#ifdef WINDOWS
1846
  TAOS_RETURN(TSDB_CODE_MND_INVALID_PLATFORM);
1847
#endif
UNCOV
1848
  SMnode        *pMnode = pReq->info.node;
×
UNCOV
1849
  int32_t        code = -1;
×
UNCOV
1850
  SDbObj        *pDb = NULL;
×
UNCOV
1851
  SStbObj       *pStb = NULL;
×
UNCOV
1852
  SSmaObj       *pSma = NULL;
×
UNCOV
1853
  SSmaObj       *pBaseTsma = NULL;
×
UNCOV
1854
  SStreamObj    *pStream = NULL;
×
UNCOV
1855
  int64_t        mTraceId = TRACE_GET_ROOTID(&pReq->info.traceId);
×
UNCOV
1856
  SMCreateSmaReq createReq = {0};
×
1857

UNCOV
1858
  if (sdbGetSize(pMnode->pSdb, SDB_SMA) >= tsMaxTsmaNum) {
×
1859
    code = TSDB_CODE_MND_MAX_TSMA_NUM_EXCEEDED;
×
1860
    goto _OVER;
×
1861
  }
1862

UNCOV
1863
  if (tDeserializeSMCreateSmaReq(pReq->pCont, pReq->contLen, &createReq) != 0) {
×
1864
    code = TSDB_CODE_INVALID_MSG;
×
1865
    goto _OVER;
×
1866
  }
1867

UNCOV
1868
  mInfo("start to create tsma: %s", createReq.name);
×
UNCOV
1869
  if ((code = mndCheckCreateSmaReq(&createReq)) != 0) goto _OVER;
×
1870

UNCOV
1871
  if (createReq.normSourceTbUid == 0) {
×
UNCOV
1872
    pStb = mndAcquireStb(pMnode, createReq.stb);
×
UNCOV
1873
    if (!pStb && !createReq.recursiveTsma) {
×
1874
      mError("tsma:%s, failed to create since stb:%s not exist", createReq.name, createReq.stb);
×
1875
      code = TSDB_CODE_MND_STB_NOT_EXIST;
×
1876
      goto _OVER;
×
1877
    }
1878
  }
1879

UNCOV
1880
  char streamName[TSDB_TABLE_FNAME_LEN] = {0};
×
UNCOV
1881
  char streamTargetStbFullName[TSDB_TABLE_FNAME_LEN] = {0};
×
UNCOV
1882
  code = mndTSMAGenerateOutputName(createReq.name, streamName, streamTargetStbFullName);
×
UNCOV
1883
  if (TSDB_CODE_SUCCESS != code) {
×
1884
    mInfo("tsma:%s, faield to generate name", createReq.name);
×
1885
    goto _OVER;
×
1886
  }
1887

UNCOV
1888
  pSma = sdbAcquire(pMnode->pSdb, SDB_SMA, createReq.name);
×
UNCOV
1889
  if (pSma && createReq.igExists) {
×
1890
    mInfo("tsma:%s, already exists in sma:%s, ignore exist is set", createReq.name, pSma->name);
×
1891
    code = 0;
×
1892
    goto _OVER;
×
1893
  }
1894

UNCOV
1895
  if (pSma) {
×
UNCOV
1896
    code = TSDB_CODE_MND_SMA_ALREADY_EXIST;
×
UNCOV
1897
    goto _OVER;
×
1898
  }
1899

UNCOV
1900
  SStbObj *pTargetStb = mndAcquireStb(pMnode, streamTargetStbFullName);
×
UNCOV
1901
  if (pTargetStb) {
×
1902
    code = TSDB_CODE_TDB_STB_ALREADY_EXIST;
×
1903
    mError("tsma: %s, failed to create since output stable already exists: %s", createReq.name,
×
1904
           streamTargetStbFullName);
1905
    goto _OVER;
×
1906
  }
1907

UNCOV
1908
  code = mndAcquireStream(pMnode, streamName, &pStream);
×
UNCOV
1909
  if (pStream != NULL || code != TSDB_CODE_MND_STREAM_NOT_EXIST) {
×
UNCOV
1910
    mError("tsma:%s, failed to create since stream:%s already exist", createReq.name, streamName);
×
UNCOV
1911
    code = TSDB_CODE_MND_SMA_ALREADY_EXIST;
×
UNCOV
1912
    goto _OVER;
×
1913
  }
1914

UNCOV
1915
  SName name = {0};
×
UNCOV
1916
  code = tNameFromString(&name, createReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
×
UNCOV
1917
  if (TSDB_CODE_SUCCESS != code) {
×
1918
    goto _OVER;
×
1919
  }
UNCOV
1920
  char db[TSDB_TABLE_FNAME_LEN] = {0};
×
UNCOV
1921
  (void)tNameGetFullDbName(&name, db);
×
1922

UNCOV
1923
  pDb = mndAcquireDb(pMnode, db);
×
UNCOV
1924
  if (pDb == NULL) {
×
1925
    code = TSDB_CODE_MND_DB_NOT_SELECTED;
×
1926
    goto _OVER;
×
1927
  }
1928

UNCOV
1929
  TAOS_CHECK_GOTO(mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb), NULL, _OVER);
×
1930

UNCOV
1931
  if (createReq.recursiveTsma) {
×
UNCOV
1932
    pBaseTsma = sdbAcquire(pMnode->pSdb, SDB_SMA, createReq.baseTsmaName);
×
UNCOV
1933
    if (!pBaseTsma) {
×
1934
      mError("base tsma: %s not found when creating recursive tsma", createReq.baseTsmaName);
×
1935
      code = TSDB_CODE_MND_SMA_NOT_EXIST;
×
1936
      goto _OVER;
×
1937
    }
UNCOV
1938
    if (!pStb) {
×
1939
      createReq.normSourceTbUid = pBaseTsma->stbUid;
×
1940
    }
1941
  }
1942

UNCOV
1943
  SCreateTSMACxt cxt = {
×
1944
      .pMnode = pMnode,
1945
      .pCreateSmaReq = &createReq,
1946
      .pCreateStreamReq = NULL,
1947
      .streamName = streamName,
1948
      .targetStbFullName = streamTargetStbFullName,
1949
      .pDb = pDb,
1950
      .pRpcReq = pReq,
1951
      .pSma = NULL,
1952
      .pBaseSma = pBaseTsma,
1953
      .pSrcStb = pStb,
1954
  };
1955

UNCOV
1956
  code = mndCreateTSMA(&cxt);
×
UNCOV
1957
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
×
1958

1959
_OVER:
×
UNCOV
1960
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
×
UNCOV
1961
    mError("tsma:%s, failed to create since %s", createReq.name, tstrerror(code));
×
1962
  }
1963

UNCOV
1964
  if (pStb) mndReleaseStb(pMnode, pStb);
×
UNCOV
1965
  if (pBaseTsma) mndReleaseSma(pMnode, pBaseTsma);
×
UNCOV
1966
  mndReleaseSma(pMnode, pSma);
×
UNCOV
1967
  mndReleaseStream(pMnode, pStream);
×
UNCOV
1968
  mndReleaseDb(pMnode, pDb);
×
UNCOV
1969
  tFreeSMCreateSmaReq(&createReq);
×
1970

UNCOV
1971
  TAOS_RETURN(code);
×
1972
}
1973

UNCOV
1974
static int32_t mndDropTSMA(SCreateTSMACxt *pCxt) {
×
UNCOV
1975
  int32_t      code = -1;
×
UNCOV
1976
  STransAction dropStreamRedoAction = {0};
×
UNCOV
1977
  STrans      *pTrans = mndTransCreate(pCxt->pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_TSMA, pCxt->pRpcReq, "drop-tsma");
×
UNCOV
1978
  if (!pTrans) {
×
1979
    code = terrno;
×
1980
    goto _OVER;
×
1981
  }
UNCOV
1982
  SMDropStreamReq dropStreamReq = {0};
×
UNCOV
1983
  pCxt->pDropStreamReq = &dropStreamReq;
×
UNCOV
1984
  code = mndCreateTSMABuildDropStreamReq(pCxt);
×
UNCOV
1985
  if (TSDB_CODE_SUCCESS != code) {
×
1986
    goto _OVER;
×
1987
  }
UNCOV
1988
  mndTransSetDbName(pTrans, pCxt->pDb->name, NULL);
×
UNCOV
1989
  if (mndTransCheckConflict(pCxt->pMnode, pTrans) != 0) goto _OVER;
×
UNCOV
1990
  mndTransSetSerial(pTrans);
×
UNCOV
1991
  mndGetMnodeEpSet(pCxt->pMnode, &dropStreamRedoAction.epSet);
×
UNCOV
1992
  dropStreamRedoAction.acceptableCode = TSDB_CODE_MND_STREAM_NOT_EXIST;
×
UNCOV
1993
  dropStreamRedoAction.msgType = TDMT_STREAM_DROP;
×
UNCOV
1994
  dropStreamRedoAction.contLen = tSerializeSMDropStreamReq(0, 0, pCxt->pDropStreamReq);
×
UNCOV
1995
  dropStreamRedoAction.pCont = taosMemoryCalloc(1, dropStreamRedoAction.contLen);
×
UNCOV
1996
  if (!dropStreamRedoAction.pCont) {
×
1997
    code = terrno;
×
1998
    goto _OVER;
×
1999
  }
UNCOV
2000
  if (dropStreamRedoAction.contLen !=
×
UNCOV
2001
      tSerializeSMDropStreamReq(dropStreamRedoAction.pCont, dropStreamRedoAction.contLen, pCxt->pDropStreamReq)) {
×
2002
    mError("tsma: %s, failed to drop due to drop stream req encode failure", pCxt->pDropSmaReq->name);
×
2003
    code = TSDB_CODE_INVALID_MSG;
×
2004
    goto _OVER;
×
2005
  }
2006

2007
  // output stable is not dropped when dropping stream, dropping it when dropping tsma
UNCOV
2008
  SMDropStbReq dropStbReq = {0};
×
UNCOV
2009
  dropStbReq.igNotExists = false;
×
UNCOV
2010
  tstrncpy(dropStbReq.name, pCxt->targetStbFullName, TSDB_TABLE_FNAME_LEN);
×
UNCOV
2011
  dropStbReq.sql = "drop";
×
UNCOV
2012
  dropStbReq.sqlLen = 5;
×
2013

UNCOV
2014
  STransAction dropStbRedoAction = {0};
×
UNCOV
2015
  mndGetMnodeEpSet(pCxt->pMnode, &dropStbRedoAction.epSet);
×
UNCOV
2016
  dropStbRedoAction.acceptableCode = TSDB_CODE_MND_STB_NOT_EXIST;
×
UNCOV
2017
  dropStbRedoAction.msgType = TDMT_MND_STB_DROP;
×
UNCOV
2018
  dropStbRedoAction.contLen = tSerializeSMDropStbReq(0, 0, &dropStbReq);
×
UNCOV
2019
  dropStbRedoAction.pCont = taosMemoryCalloc(1, dropStbRedoAction.contLen);
×
UNCOV
2020
  if (!dropStbRedoAction.pCont) {
×
2021
    code = terrno;
×
2022
    goto _OVER;
×
2023
  }
UNCOV
2024
  if (dropStbRedoAction.contLen !=
×
UNCOV
2025
      tSerializeSMDropStbReq(dropStbRedoAction.pCont, dropStbRedoAction.contLen, &dropStbReq)) {
×
2026
    mError("tsma: %s, failedto drop due to drop stb req encode failure", pCxt->pDropSmaReq->name);
×
2027
    code = TSDB_CODE_INVALID_MSG;
×
2028
    goto _OVER;
×
2029
  }
2030

UNCOV
2031
  SDbObj newDb = {0};
×
UNCOV
2032
  memcpy(&newDb, pCxt->pDb, sizeof(SDbObj));
×
UNCOV
2033
  newDb.tsmaVersion++;
×
UNCOV
2034
  TAOS_CHECK_GOTO(mndSetDropSmaRedoLogs(pCxt->pMnode, pTrans, pCxt->pSma), NULL, _OVER);
×
UNCOV
2035
  TAOS_CHECK_GOTO(mndSetDropSmaCommitLogs(pCxt->pMnode, pTrans, pCxt->pSma), NULL, _OVER);
×
UNCOV
2036
  TAOS_CHECK_GOTO(mndTransAppendRedoAction(pTrans, &dropStreamRedoAction), NULL, _OVER);
×
UNCOV
2037
  TAOS_CHECK_GOTO(mndTransAppendRedoAction(pTrans, &dropStbRedoAction), NULL, _OVER);
×
UNCOV
2038
  TAOS_CHECK_GOTO(mndSetUpdateDbTsmaVersionPrepareLogs(pCxt->pMnode, pTrans, pCxt->pDb, &newDb), NULL, _OVER);
×
UNCOV
2039
  TAOS_CHECK_GOTO(mndSetUpdateDbTsmaVersionCommitLogs(pCxt->pMnode, pTrans, pCxt->pDb, &newDb), NULL, _OVER);
×
UNCOV
2040
  TAOS_CHECK_GOTO(mndTransPrepare(pCxt->pMnode, pTrans), NULL, _OVER);
×
UNCOV
2041
  code = TSDB_CODE_SUCCESS;
×
UNCOV
2042
_OVER:
×
UNCOV
2043
  tFreeMDropStreamReq(pCxt->pDropStreamReq);
×
UNCOV
2044
  mndTransDrop(pTrans);
×
UNCOV
2045
  TAOS_RETURN(code);
×
2046
}
2047

UNCOV
2048
static bool hasRecursiveTsmasBasedOnMe(SMnode *pMnode, const SSmaObj *pSma) {
×
UNCOV
2049
  SSmaObj *pSmaObj = NULL;
×
UNCOV
2050
  void    *pIter = NULL;
×
2051
  while (1) {
UNCOV
2052
    pIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter, (void **)&pSmaObj);
×
UNCOV
2053
    if (pIter == NULL) break;
×
UNCOV
2054
    if (0 == strncmp(pSmaObj->baseSmaName, pSma->name, TSDB_TABLE_FNAME_LEN)) {
×
UNCOV
2055
      sdbRelease(pMnode->pSdb, pSmaObj);
×
UNCOV
2056
      sdbCancelFetch(pMnode->pSdb, pIter);
×
UNCOV
2057
      return true;
×
2058
    }
UNCOV
2059
    sdbRelease(pMnode->pSdb, pSmaObj);
×
2060
  }
UNCOV
2061
  return false;
×
2062
}
2063

UNCOV
2064
static int32_t mndProcessDropTSMAReq(SRpcMsg *pReq) {
×
UNCOV
2065
  int32_t      code = -1;
×
UNCOV
2066
  SMDropSmaReq dropReq = {0};
×
UNCOV
2067
  SSmaObj     *pSma = NULL;
×
UNCOV
2068
  SDbObj      *pDb = NULL;
×
UNCOV
2069
  SMnode      *pMnode = pReq->info.node;
×
UNCOV
2070
  if (tDeserializeSMDropSmaReq(pReq->pCont, pReq->contLen, &dropReq) != TSDB_CODE_SUCCESS) {
×
2071
    code = TSDB_CODE_INVALID_MSG;
×
2072
    goto _OVER;
×
2073
  }
2074

UNCOV
2075
  char streamName[TSDB_TABLE_FNAME_LEN] = {0};
×
UNCOV
2076
  char streamTargetStbFullName[TSDB_TABLE_FNAME_LEN] = {0};
×
UNCOV
2077
  code = mndTSMAGenerateOutputName(dropReq.name, streamName, streamTargetStbFullName);
×
UNCOV
2078
  if (TSDB_CODE_SUCCESS != code) {
×
2079
    goto _OVER;
×
2080
  }
2081

UNCOV
2082
  SStbObj *pStb = mndAcquireStb(pMnode, streamTargetStbFullName);
×
2083

UNCOV
2084
  pSma = mndAcquireSma(pMnode, dropReq.name);
×
UNCOV
2085
  if (!pSma && dropReq.igNotExists) {
×
2086
    code = 0;
×
2087
    goto _OVER;
×
2088
  }
UNCOV
2089
  if (!pSma) {
×
2090
    code = TSDB_CODE_MND_SMA_NOT_EXIST;
×
2091
    goto _OVER;
×
2092
  }
UNCOV
2093
  SName name = {0};
×
UNCOV
2094
  code = tNameFromString(&name, dropReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
×
UNCOV
2095
  if (TSDB_CODE_SUCCESS != code) {
×
2096
    goto _OVER;
×
2097
  }
UNCOV
2098
  char db[TSDB_TABLE_FNAME_LEN] = {0};
×
UNCOV
2099
  (void)tNameGetFullDbName(&name, db);
×
2100

UNCOV
2101
  pDb = mndAcquireDb(pMnode, db);
×
UNCOV
2102
  if (!pDb) {
×
2103
    code = TSDB_CODE_MND_DB_NOT_EXIST;
×
2104
    goto _OVER;
×
2105
  }
2106

UNCOV
2107
  if ((code = mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb)) != 0) {
×
2108
    goto _OVER;
×
2109
  }
2110

UNCOV
2111
  if (hasRecursiveTsmasBasedOnMe(pMnode, pSma)) {
×
UNCOV
2112
    code = TSDB_CODE_MND_INVALID_DROP_TSMA;
×
UNCOV
2113
    goto _OVER;
×
2114
  }
2115

UNCOV
2116
  SCreateTSMACxt cxt = {
×
2117
      .pDb = pDb,
2118
      .pMnode = pMnode,
2119
      .pRpcReq = pReq,
2120
      .pSma = pSma,
2121
      .streamName = streamName,
2122
      .targetStbFullName = streamTargetStbFullName,
2123
      .pDropSmaReq = &dropReq,
2124
  };
2125

UNCOV
2126
  code = mndDropTSMA(&cxt);
×
2127

UNCOV
2128
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
×
2129
_OVER:
×
2130

UNCOV
2131
  mndReleaseStb(pMnode, pStb);
×
UNCOV
2132
  mndReleaseSma(pMnode, pSma);
×
UNCOV
2133
  mndReleaseDb(pMnode, pDb);
×
UNCOV
2134
  TAOS_RETURN(code);
×
2135
}
2136

2137
static int32_t mndRetrieveTSMA(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
×
2138
  SDbObj          *pDb = NULL;
×
2139
  int32_t          numOfRows = 0;
×
2140
  SSmaObj         *pSma = NULL;
×
2141
  SMnode          *pMnode = pReq->info.node;
×
2142
  int32_t          code = 0;
×
2143
  SColumnInfoData *pColInfo;
2144
  if (pShow->pIter == NULL) {
×
2145
    pShow->pIter = taosMemoryCalloc(1, sizeof(SSmaAndTagIter));
×
2146
  }
2147
  if (!pShow->pIter) {
×
2148
    return terrno;
×
2149
  }
2150
  if (pShow->db[0]) {
×
2151
    pDb = mndAcquireDb(pMnode, pShow->db);
×
2152
  }
2153
  SSmaAndTagIter *pIter = pShow->pIter;
×
2154
  while (numOfRows < rows) {
×
2155
    pIter->pSmaIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter->pSmaIter, (void **)&pSma);
×
2156
    if (pIter->pSmaIter == NULL) break;
×
2157
    SDbObj *pSrcDb = mndAcquireDb(pMnode, pSma->db);
×
2158

2159
    if ((pDb && pSma->dbUid != pDb->uid) || !pSrcDb) {
×
2160
      sdbRelease(pMnode->pSdb, pSma);
×
2161
      if (pSrcDb) mndReleaseDb(pMnode, pSrcDb);
×
2162
      continue;
×
2163
    }
2164

2165
    int32_t cols = 0;
×
2166
    SName   n = {0};
×
2167

2168
    code = tNameFromString(&n, pSma->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
×
2169
    char smaName[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
×
2170
    if (TSDB_CODE_SUCCESS == code) {
×
2171
      STR_TO_VARSTR(smaName, (char *)tNameGetTableName(&n));
×
2172
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
2173
      code = colDataSetVal(pColInfo, numOfRows, (const char *)smaName, false);
×
2174
    }
2175

2176
    char db[TSDB_DB_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
×
2177
    if (TSDB_CODE_SUCCESS == code) {
×
2178
      STR_TO_VARSTR(db, (char *)mndGetDbStr(pSma->db));
×
2179
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
2180
      code = colDataSetVal(pColInfo, numOfRows, (const char *)db, false);
×
2181
    }
2182

2183
    if (TSDB_CODE_SUCCESS == code) {
×
2184
      code = tNameFromString(&n, pSma->stb, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
×
2185
    }
2186
    char srcTb[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
×
2187
    if (TSDB_CODE_SUCCESS == code) {
×
2188
      STR_TO_VARSTR(srcTb, (char *)tNameGetTableName(&n));
×
2189
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
2190
      code = colDataSetVal(pColInfo, numOfRows, (const char *)srcTb, false);
×
2191
    }
2192

2193
    if (TSDB_CODE_SUCCESS == code) {
×
2194
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
2195
      code = colDataSetVal(pColInfo, numOfRows, (const char *)db, false);
×
2196
    }
2197

2198
    if (TSDB_CODE_SUCCESS == code) {
×
2199
      code = tNameFromString(&n, pSma->dstTbName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
×
2200
    }
2201

2202
    if (TSDB_CODE_SUCCESS == code) {
×
2203
      char targetTb[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
×
2204
      STR_TO_VARSTR(targetTb, (char *)tNameGetTableName(&n));
×
2205
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
2206
      code = colDataSetVal(pColInfo, numOfRows, (const char *)targetTb, false);
×
2207
    }
2208

2209
    if (TSDB_CODE_SUCCESS == code) {
×
2210
      // stream name
2211
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
2212
      code = colDataSetVal(pColInfo, numOfRows, (const char *)smaName, false);
×
2213
    }
2214

2215
    if (TSDB_CODE_SUCCESS == code) {
×
2216
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
2217
      code = colDataSetVal(pColInfo, numOfRows, (const char *)(&pSma->createdTime), false);
×
2218
    }
2219

2220
    // interval
2221
    char    interval[64 + VARSTR_HEADER_SIZE] = {0};
×
2222
    int32_t len = 0;
×
2223
    if (TSDB_CODE_SUCCESS == code) {
×
2224
      if (!IS_CALENDAR_TIME_DURATION(pSma->intervalUnit)) {
×
2225
        len = tsnprintf(interval + VARSTR_HEADER_SIZE, 64, "%" PRId64 "%c", pSma->interval,
×
2226
                        getPrecisionUnit(pSrcDb->cfg.precision));
×
2227
      } else {
2228
        len = tsnprintf(interval + VARSTR_HEADER_SIZE, 64, "%" PRId64 "%c", pSma->interval, pSma->intervalUnit);
×
2229
      }
2230
      varDataSetLen(interval, len);
×
2231
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
2232
      code = colDataSetVal(pColInfo, numOfRows, interval, false);
×
2233
    }
2234

2235
    char buf[TSDB_MAX_SAVED_SQL_LEN + VARSTR_HEADER_SIZE] = {0};
×
2236
    if (TSDB_CODE_SUCCESS == code) {
×
2237
      // create sql
2238
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
2239
      len = tsnprintf(buf + VARSTR_HEADER_SIZE, TSDB_MAX_SAVED_SQL_LEN, "%s", pSma->sql);
×
2240
      varDataSetLen(buf, TMIN(len, TSDB_MAX_SAVED_SQL_LEN));
×
2241
      code = colDataSetVal(pColInfo, numOfRows, buf, false);
×
2242
    }
2243

2244
    // func list
2245
    len = 0;
×
2246
    SNode *pNode = NULL, *pFunc = NULL;
×
2247
    if (TSDB_CODE_SUCCESS == code) {
×
2248
      code = nodesStringToNode(pSma->ast, &pNode);
×
2249
    }
2250
    if (TSDB_CODE_SUCCESS == code) {
×
2251
      char *start = buf + VARSTR_HEADER_SIZE;
×
2252
      FOREACH(pFunc, ((SSelectStmt *)pNode)->pProjectionList) {
×
2253
        if (nodeType(pFunc) == QUERY_NODE_FUNCTION) {
×
2254
          SFunctionNode *pFuncNode = (SFunctionNode *)pFunc;
×
2255
          if (!fmIsTSMASupportedFunc(pFuncNode->funcId)) continue;
×
2256
          len += tsnprintf(start, TSDB_MAX_SAVED_SQL_LEN - len, "%s%s", start != buf + VARSTR_HEADER_SIZE ? "," : "",
×
2257
                           ((SExprNode *)pFunc)->userAlias);
×
2258
          if (len >= TSDB_MAX_SAVED_SQL_LEN) {
×
2259
            len = TSDB_MAX_SAVED_SQL_LEN;
×
2260
            break;
×
2261
          }
2262
          start = buf + VARSTR_HEADER_SIZE + len;
×
2263
        }
2264
      }
2265
      nodesDestroyNode(pNode);
×
2266
    }
2267

2268
    if (TSDB_CODE_SUCCESS == code) {
×
2269
      varDataSetLen(buf, len);
×
2270
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
2271
      code = colDataSetVal(pColInfo, numOfRows, buf, false);
×
2272
    }
2273

2274
    numOfRows++;
×
2275
    mndReleaseSma(pMnode, pSma);
×
2276
    mndReleaseDb(pMnode, pSrcDb);
×
2277
    if (TSDB_CODE_SUCCESS != code) {
×
2278
      sdbCancelFetch(pMnode->pSdb, pIter->pSmaIter);
×
2279
      numOfRows = code;
×
2280
      break;
×
2281
    }
2282
  }
2283
  mndReleaseDb(pMnode, pDb);
×
2284
  pShow->numOfRows += numOfRows;
×
2285
  if (numOfRows < rows) {
×
2286
    taosMemoryFree(pShow->pIter);
×
2287
    pShow->pIter = NULL;
×
2288
  }
2289
  return numOfRows;
×
2290
}
2291

2292
static void mndCancelRetrieveTSMA(SMnode *pMnode, void *pIter) {
×
2293
  SSmaAndTagIter *p = pIter;
×
2294
  if (p != NULL) {
×
2295
    SSdb *pSdb = pMnode->pSdb;
×
2296
    sdbCancelFetchByType(pSdb, p->pSmaIter, SDB_SMA);
×
2297
  }
2298
  taosMemoryFree(p);
×
2299
}
×
2300

UNCOV
2301
int32_t dumpTSMAInfoFromSmaObj(const SSmaObj *pSma, const SStbObj *pDestStb, STableTSMAInfo *pInfo,
×
2302
                               const SSmaObj *pBaseTsma) {
UNCOV
2303
  int32_t code = 0;
×
UNCOV
2304
  pInfo->interval = pSma->interval;
×
UNCOV
2305
  pInfo->unit = pSma->intervalUnit;
×
UNCOV
2306
  pInfo->tsmaId = pSma->uid;
×
UNCOV
2307
  pInfo->version = pSma->version;
×
UNCOV
2308
  pInfo->tsmaId = pSma->uid;
×
UNCOV
2309
  pInfo->destTbUid = pDestStb->uid;
×
UNCOV
2310
  SName sName = {0};
×
UNCOV
2311
  code = tNameFromString(&sName, pSma->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
×
UNCOV
2312
  if (TSDB_CODE_SUCCESS != code) {
×
2313
    return code;
×
2314
  }
UNCOV
2315
  tstrncpy(pInfo->name, sName.tname, TSDB_TABLE_NAME_LEN);
×
UNCOV
2316
  tstrncpy(pInfo->targetDbFName, pSma->db, TSDB_DB_FNAME_LEN);
×
UNCOV
2317
  code = tNameFromString(&sName, pSma->dstTbName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
×
UNCOV
2318
  if (TSDB_CODE_SUCCESS != code) {
×
2319
    return code;
×
2320
  }
UNCOV
2321
  tstrncpy(pInfo->targetTb, sName.tname, TSDB_TABLE_NAME_LEN);
×
UNCOV
2322
  tstrncpy(pInfo->dbFName, pSma->db, TSDB_DB_FNAME_LEN);
×
UNCOV
2323
  code = tNameFromString(&sName, pSma->stb, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
×
UNCOV
2324
  if (TSDB_CODE_SUCCESS != code) {
×
2325
    return code;
×
2326
  }
UNCOV
2327
  tstrncpy(pInfo->tb, sName.tname, TSDB_TABLE_NAME_LEN);
×
UNCOV
2328
  pInfo->pFuncs = taosArrayInit(8, sizeof(STableTSMAFuncInfo));
×
UNCOV
2329
  if (!pInfo->pFuncs) return TSDB_CODE_OUT_OF_MEMORY;
×
2330

2331
  SNode *pNode, *pFunc;
UNCOV
2332
  if (TSDB_CODE_SUCCESS != nodesStringToNode(pBaseTsma ? pBaseTsma->ast : pSma->ast, &pNode)) {
×
2333
    taosArrayDestroy(pInfo->pFuncs);
×
2334
    pInfo->pFuncs = NULL;
×
2335
    return TSDB_CODE_TSMA_INVALID_STAT;
×
2336
  }
UNCOV
2337
  if (pNode) {
×
UNCOV
2338
    SSelectStmt *pSelect = (SSelectStmt *)pNode;
×
UNCOV
2339
    FOREACH(pFunc, pSelect->pProjectionList) {
×
UNCOV
2340
      STableTSMAFuncInfo funcInfo = {0};
×
UNCOV
2341
      SFunctionNode     *pFuncNode = (SFunctionNode *)pFunc;
×
UNCOV
2342
      if (!fmIsTSMASupportedFunc(pFuncNode->funcId)) continue;
×
UNCOV
2343
      funcInfo.funcId = pFuncNode->funcId;
×
UNCOV
2344
      funcInfo.colId = ((SColumnNode *)pFuncNode->pParameterList->pHead->pNode)->colId;
×
UNCOV
2345
      if (!taosArrayPush(pInfo->pFuncs, &funcInfo)) {
×
2346
        code = terrno;
×
2347
        taosArrayDestroy(pInfo->pFuncs);
×
2348
        nodesDestroyNode(pNode);
×
2349
        return code;
×
2350
      }
2351
    }
UNCOV
2352
    nodesDestroyNode(pNode);
×
2353
  }
UNCOV
2354
  pInfo->ast = taosStrdup(pSma->ast);
×
UNCOV
2355
  if (!pInfo->ast) code = terrno;
×
2356

UNCOV
2357
  if (code == TSDB_CODE_SUCCESS && pDestStb->numOfTags > 0) {
×
UNCOV
2358
    pInfo->pTags = taosArrayInit(pDestStb->numOfTags, sizeof(SSchema));
×
UNCOV
2359
    if (!pInfo->pTags) {
×
2360
      code = terrno;
×
2361
    } else {
UNCOV
2362
      for (int32_t i = 0; i < pDestStb->numOfTags; ++i) {
×
UNCOV
2363
        if (NULL == taosArrayPush(pInfo->pTags, &pDestStb->pTags[i])) {
×
2364
          code = terrno;
×
2365
          break;
×
2366
        }
2367
      }
2368
    }
2369
  }
UNCOV
2370
  if (code == TSDB_CODE_SUCCESS) {
×
UNCOV
2371
    pInfo->pUsedCols = taosArrayInit(pDestStb->numOfColumns - 3, sizeof(SSchema));
×
UNCOV
2372
    if (!pInfo->pUsedCols)
×
2373
      code = terrno;
×
2374
    else {
2375
      // skip _wstart, _wend, _duration
UNCOV
2376
      for (int32_t i = 1; i < pDestStb->numOfColumns - 2; ++i) {
×
UNCOV
2377
        if (NULL == taosArrayPush(pInfo->pUsedCols, &pDestStb->pColumns[i])) {
×
2378
          code = terrno;
×
2379
          break;
×
2380
        }
2381
      }
2382
    }
2383
  }
UNCOV
2384
  TAOS_RETURN(code);
×
2385
}
2386

2387
// @note remember to mndReleaseSma(*ppOut)
UNCOV
2388
static int32_t mndGetDeepestBaseForTsma(SMnode *pMnode, SSmaObj *pSma, SSmaObj **ppOut) {
×
UNCOV
2389
  int32_t  code = 0;
×
UNCOV
2390
  SSmaObj *pRecursiveTsma = NULL;
×
UNCOV
2391
  if (pSma->baseSmaName[0]) {
×
UNCOV
2392
    pRecursiveTsma = mndAcquireSma(pMnode, pSma->baseSmaName);
×
UNCOV
2393
    if (!pRecursiveTsma) {
×
2394
      mError("base tsma: %s for tsma: %s not found", pSma->baseSmaName, pSma->name);
×
2395
      return TSDB_CODE_MND_SMA_NOT_EXIST;
×
2396
    }
UNCOV
2397
    while (pRecursiveTsma->baseSmaName[0]) {
×
UNCOV
2398
      SSmaObj *pTmpSma = pRecursiveTsma;
×
UNCOV
2399
      pRecursiveTsma = mndAcquireSma(pMnode, pTmpSma->baseSmaName);
×
UNCOV
2400
      if (!pRecursiveTsma) {
×
2401
        mError("base tsma: %s for tsma: %s not found", pTmpSma->baseSmaName, pTmpSma->name);
×
2402
        mndReleaseSma(pMnode, pTmpSma);
×
2403
        return TSDB_CODE_MND_SMA_NOT_EXIST;
×
2404
      }
UNCOV
2405
      mndReleaseSma(pMnode, pTmpSma);
×
2406
    }
2407
  }
UNCOV
2408
  *ppOut = pRecursiveTsma;
×
UNCOV
2409
  return code;
×
2410
}
2411

UNCOV
2412
static int32_t mndGetTSMA(SMnode *pMnode, char *tsmaFName, STableTSMAInfoRsp *rsp, bool *exist) {
×
UNCOV
2413
  int32_t  code = -1;
×
UNCOV
2414
  SSmaObj *pSma = NULL;
×
UNCOV
2415
  SSmaObj *pBaseTsma = NULL;
×
UNCOV
2416
  SStbObj *pDstStb = NULL;
×
2417

UNCOV
2418
  pSma = sdbAcquire(pMnode->pSdb, SDB_SMA, tsmaFName);
×
UNCOV
2419
  if (pSma) {
×
UNCOV
2420
    pDstStb = mndAcquireStb(pMnode, pSma->dstTbName);
×
UNCOV
2421
    if (!pDstStb) {
×
2422
      sdbRelease(pMnode->pSdb, pSma);
×
2423
      return TSDB_CODE_SUCCESS;
×
2424
    }
2425

UNCOV
2426
    STableTSMAInfo *pTsma = taosMemoryCalloc(1, sizeof(STableTSMAInfo));
×
UNCOV
2427
    if (!pTsma) {
×
2428
      code = terrno;
×
2429
      sdbRelease(pMnode->pSdb, pSma);
×
2430
      mndReleaseStb(pMnode, pDstStb);
×
2431
      TAOS_RETURN(code);
×
2432
    }
2433

UNCOV
2434
    code = mndGetDeepestBaseForTsma(pMnode, pSma, &pBaseTsma);
×
UNCOV
2435
    if (code == 0) {
×
UNCOV
2436
      code = dumpTSMAInfoFromSmaObj(pSma, pDstStb, pTsma, pBaseTsma);
×
2437
    }
UNCOV
2438
    mndReleaseStb(pMnode, pDstStb);
×
UNCOV
2439
    sdbRelease(pMnode->pSdb, pSma);
×
UNCOV
2440
    if (pBaseTsma) mndReleaseSma(pMnode, pBaseTsma);
×
UNCOV
2441
    if (terrno) {
×
2442
      tFreeAndClearTableTSMAInfo(pTsma);
×
2443
      TAOS_RETURN(code);
×
2444
    }
UNCOV
2445
    if (NULL == taosArrayPush(rsp->pTsmas, &pTsma)) {
×
2446
      code = terrno;
×
2447
      tFreeAndClearTableTSMAInfo(pTsma);
×
2448
    }
UNCOV
2449
    *exist = true;
×
2450
  }
UNCOV
2451
  TAOS_RETURN(code);
×
2452
}
2453

2454
typedef bool (*tsmaFilter)(const SSmaObj *pSma, void *param);
2455

2456
static int32_t mndGetSomeTsmas(SMnode *pMnode, STableTSMAInfoRsp *pRsp, tsmaFilter filtered, void *param, bool *exist) {
29✔
2457
  int32_t     code = 0;
29✔
2458
  SSmaObj    *pSma = NULL;
29✔
2459
  SSmaObj    *pBaseTsma = NULL;
29✔
2460
  SSdb       *pSdb = pMnode->pSdb;
29✔
2461
  void       *pIter = NULL;
29✔
2462
  SStreamObj *pStream = NULL;
29✔
2463
  SStbObj    *pStb = NULL;
29✔
2464
  bool        shouldRetry = false;
29✔
2465

UNCOV
2466
  while (1) {
×
2467
    pIter = sdbFetch(pSdb, SDB_SMA, pIter, (void **)&pSma);
29✔
2468
    if (pIter == NULL) break;
29!
2469

UNCOV
2470
    if (filtered(pSma, param)) {
×
UNCOV
2471
      sdbRelease(pSdb, pSma);
×
UNCOV
2472
      continue;
×
2473
    }
2474

UNCOV
2475
    pStb = mndAcquireStb(pMnode, pSma->dstTbName);
×
UNCOV
2476
    if (!pStb) {
×
UNCOV
2477
      sdbRelease(pSdb, pSma);
×
UNCOV
2478
      shouldRetry = true;
×
UNCOV
2479
      continue;
×
2480
    }
2481

2482
    SName smaName;
UNCOV
2483
    char  streamName[TSDB_TABLE_FNAME_LEN] = {0};
×
UNCOV
2484
    code = tNameFromString(&smaName, pSma->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
×
UNCOV
2485
    if (TSDB_CODE_SUCCESS != code) {
×
2486
      sdbRelease(pSdb, pSma);
×
2487
      mndReleaseStb(pMnode, pStb);
×
2488
      TAOS_RETURN(code);
×
2489
    }
UNCOV
2490
    snprintf(streamName, TSDB_TABLE_FNAME_LEN, "%d.%s", smaName.acctId, smaName.tname);
×
UNCOV
2491
    pStream = NULL;
×
2492

UNCOV
2493
    code = mndAcquireStream(pMnode, streamName, &pStream);
×
UNCOV
2494
    if (!pStream) {
×
2495
      shouldRetry = true;
×
2496
      sdbRelease(pSdb, pSma);
×
2497
      mndReleaseStb(pMnode, pStb);
×
2498
      continue;
×
2499
    }
UNCOV
2500
    if (code != 0) {
×
2501
      sdbRelease(pSdb, pSma);
×
2502
      mndReleaseStb(pMnode, pStb);
×
2503
      TAOS_RETURN(code);
×
2504
    }
2505

UNCOV
2506
    int64_t streamId = pStream->uid;
×
UNCOV
2507
    mndReleaseStream(pMnode, pStream);
×
2508

UNCOV
2509
    STableTSMAInfo *pTsma = taosMemoryCalloc(1, sizeof(STableTSMAInfo));
×
UNCOV
2510
    if (!pTsma) {
×
2511
      code = terrno;
×
2512
      mndReleaseStb(pMnode, pStb);
×
2513
      sdbRelease(pSdb, pSma);
×
2514
      sdbCancelFetch(pSdb, pIter);
×
2515
      TAOS_RETURN(code);
×
2516
    }
UNCOV
2517
    pTsma->streamUid = streamId;
×
2518

UNCOV
2519
    code = mndGetDeepestBaseForTsma(pMnode, pSma, &pBaseTsma);
×
UNCOV
2520
    if (code == 0) {
×
UNCOV
2521
      code = dumpTSMAInfoFromSmaObj(pSma, pStb, pTsma, pBaseTsma);
×
2522
    }
UNCOV
2523
    mndReleaseStb(pMnode, pStb);
×
UNCOV
2524
    sdbRelease(pSdb, pSma);
×
UNCOV
2525
    if (pBaseTsma) mndReleaseSma(pMnode, pBaseTsma);
×
UNCOV
2526
    if (terrno) {
×
2527
      tFreeAndClearTableTSMAInfo(pTsma);
×
2528
      sdbCancelFetch(pSdb, pIter);
×
2529
      TAOS_RETURN(code);
×
2530
    }
UNCOV
2531
    if (NULL == taosArrayPush(pRsp->pTsmas, &pTsma)) {
×
2532
      code = terrno;
×
2533
      tFreeAndClearTableTSMAInfo(pTsma);
×
2534
      sdbCancelFetch(pSdb, pIter);
×
2535
      TAOS_RETURN(code);
×
2536
    }
UNCOV
2537
    *exist = true;
×
2538
  }
2539
  if (shouldRetry) {
29!
UNCOV
2540
    return TSDB_CODE_NEED_RETRY;
×
2541
  }
2542
  return TSDB_CODE_SUCCESS;
29✔
2543
}
2544

UNCOV
2545
static bool tsmaTbFilter(const SSmaObj *pSma, void *param) {
×
UNCOV
2546
  const char *tbFName = param;
×
UNCOV
2547
  return pSma->stb[0] != tbFName[0] || strcmp(pSma->stb, tbFName) != 0;
×
2548
}
2549

UNCOV
2550
static int32_t mndGetTableTSMA(SMnode *pMnode, char *tbFName, STableTSMAInfoRsp *pRsp, bool *exist) {
×
UNCOV
2551
  return mndGetSomeTsmas(pMnode, pRsp, tsmaTbFilter, tbFName, exist);
×
2552
}
2553

UNCOV
2554
static bool tsmaDbFilter(const SSmaObj *pSma, void *param) {
×
UNCOV
2555
  uint64_t *dbUid = param;
×
UNCOV
2556
  return pSma->dbUid != *dbUid;
×
2557
}
2558

2559
int32_t mndGetDbTsmas(SMnode *pMnode, const char *dbFName, uint64_t dbUid, STableTSMAInfoRsp *pRsp, bool *exist) {
29✔
2560
  return mndGetSomeTsmas(pMnode, pRsp, tsmaDbFilter, &dbUid, exist);
29✔
2561
}
2562

UNCOV
2563
static int32_t mndProcessGetTbTSMAReq(SRpcMsg *pReq) {
×
UNCOV
2564
  STableTSMAInfoRsp rsp = {0};
×
UNCOV
2565
  int32_t           code = -1;
×
UNCOV
2566
  STableTSMAInfoReq tsmaReq = {0};
×
UNCOV
2567
  bool              exist = false;
×
UNCOV
2568
  SMnode           *pMnode = pReq->info.node;
×
2569

UNCOV
2570
  TAOS_CHECK_GOTO(tDeserializeTableTSMAInfoReq(pReq->pCont, pReq->contLen, &tsmaReq), NULL, _OVER);
×
2571

UNCOV
2572
  rsp.pTsmas = taosArrayInit(4, POINTER_BYTES);
×
UNCOV
2573
  if (NULL == rsp.pTsmas) {
×
2574
    code = terrno;
×
2575
    goto _OVER;
×
2576
  }
2577

UNCOV
2578
  if (tsmaReq.fetchingWithTsmaName) {
×
UNCOV
2579
    code = mndGetTSMA(pMnode, tsmaReq.name, &rsp, &exist);
×
2580
  } else {
UNCOV
2581
    code = mndGetTableTSMA(pMnode, tsmaReq.name, &rsp, &exist);
×
UNCOV
2582
    if (TSDB_CODE_NEED_RETRY == code) {
×
2583
      code = TSDB_CODE_SUCCESS;
×
2584
    }
2585
  }
UNCOV
2586
  if (code) {
×
2587
    goto _OVER;
×
2588
  }
2589

UNCOV
2590
  if (!exist) {
×
UNCOV
2591
    code = TSDB_CODE_MND_SMA_NOT_EXIST;
×
2592
  } else {
UNCOV
2593
    int32_t contLen = tSerializeTableTSMAInfoRsp(NULL, 0, &rsp);
×
UNCOV
2594
    void   *pRsp = rpcMallocCont(contLen);
×
UNCOV
2595
    if (pRsp == NULL) {
×
2596
      code = terrno;
×
2597
      goto _OVER;
×
2598
    }
2599

UNCOV
2600
    int32_t len = tSerializeTableTSMAInfoRsp(pRsp, contLen, &rsp);
×
UNCOV
2601
    if (len < 0) {
×
2602
      code = terrno;
×
2603
      goto _OVER;
×
2604
    }
2605

UNCOV
2606
    pReq->info.rsp = pRsp;
×
UNCOV
2607
    pReq->info.rspLen = contLen;
×
2608

UNCOV
2609
    code = 0;
×
2610
  }
2611

UNCOV
2612
_OVER:
×
UNCOV
2613
  tFreeTableTSMAInfoRsp(&rsp);
×
UNCOV
2614
  TAOS_RETURN(code);
×
2615
}
2616

UNCOV
2617
static int32_t mkNonExistTSMAInfo(const STSMAVersion *pTsmaVer, STableTSMAInfo **ppTsma) {
×
UNCOV
2618
  STableTSMAInfo *pInfo = taosMemoryCalloc(1, sizeof(STableTSMAInfo));
×
UNCOV
2619
  if (!pInfo) {
×
2620
    return terrno;
×
2621
  }
UNCOV
2622
  pInfo->pFuncs = NULL;
×
UNCOV
2623
  pInfo->tsmaId = pTsmaVer->tsmaId;
×
UNCOV
2624
  tstrncpy(pInfo->dbFName, pTsmaVer->dbFName, TSDB_DB_FNAME_LEN);
×
UNCOV
2625
  tstrncpy(pInfo->tb, pTsmaVer->tbName, TSDB_TABLE_NAME_LEN);
×
UNCOV
2626
  tstrncpy(pInfo->name, pTsmaVer->name, TSDB_TABLE_NAME_LEN);
×
UNCOV
2627
  pInfo->dbId = pTsmaVer->dbId;
×
UNCOV
2628
  pInfo->ast = taosMemoryCalloc(1, 1);
×
UNCOV
2629
  if (!pInfo->ast) {
×
2630
    taosMemoryFree(pInfo);
×
2631
    return terrno;
×
2632
  }
UNCOV
2633
  *ppTsma = pInfo;
×
UNCOV
2634
  return TSDB_CODE_SUCCESS;
×
2635
}
2636

UNCOV
2637
int32_t mndValidateTSMAInfo(SMnode *pMnode, STSMAVersion *pTsmaVersions, int32_t numOfTsmas, void **ppRsp,
×
2638
                            int32_t *pRspLen) {
UNCOV
2639
  int32_t         code = -1;
×
UNCOV
2640
  STSMAHbRsp      hbRsp = {0};
×
UNCOV
2641
  int32_t         rspLen = 0;
×
UNCOV
2642
  void           *pRsp = NULL;
×
UNCOV
2643
  char            tsmaFName[TSDB_TABLE_FNAME_LEN] = {0};
×
UNCOV
2644
  STableTSMAInfo *pTsmaInfo = NULL;
×
2645

UNCOV
2646
  hbRsp.pTsmas = taosArrayInit(numOfTsmas, POINTER_BYTES);
×
UNCOV
2647
  if (!hbRsp.pTsmas) {
×
2648
    code = terrno;
×
2649
    TAOS_RETURN(code);
×
2650
  }
2651

UNCOV
2652
  for (int32_t i = 0; i < numOfTsmas; ++i) {
×
UNCOV
2653
    STSMAVersion *pTsmaVer = &pTsmaVersions[i];
×
UNCOV
2654
    pTsmaVer->dbId = be64toh(pTsmaVer->dbId);
×
UNCOV
2655
    pTsmaVer->tsmaId = be64toh(pTsmaVer->tsmaId);
×
UNCOV
2656
    pTsmaVer->version = ntohl(pTsmaVer->version);
×
2657

UNCOV
2658
    snprintf(tsmaFName, sizeof(tsmaFName), "%s.%s", pTsmaVer->dbFName, pTsmaVer->name);
×
UNCOV
2659
    SSmaObj *pSma = mndAcquireSma(pMnode, tsmaFName);
×
UNCOV
2660
    if (!pSma) {
×
UNCOV
2661
      code = mkNonExistTSMAInfo(pTsmaVer, &pTsmaInfo);
×
UNCOV
2662
      if (code) goto _OVER;
×
UNCOV
2663
      if (NULL == taosArrayPush(hbRsp.pTsmas, &pTsmaInfo)) {
×
2664
        code = terrno;
×
2665
        tFreeAndClearTableTSMAInfo(pTsmaInfo);
×
2666
        goto _OVER;
×
2667
      }
UNCOV
2668
      continue;
×
2669
    }
2670

UNCOV
2671
    if (pSma->uid != pTsmaVer->tsmaId) {
×
2672
      mDebug("tsma: %s.%" PRIx64 " tsmaId mismatch with current %" PRIx64, tsmaFName, pTsmaVer->tsmaId, pSma->uid);
×
2673
      code = mkNonExistTSMAInfo(pTsmaVer, &pTsmaInfo);
×
2674
      mndReleaseSma(pMnode, pSma);
×
2675
      if (code) goto _OVER;
×
2676
      if (NULL == taosArrayPush(hbRsp.pTsmas, &pTsmaInfo)) {
×
2677
        code = terrno;
×
2678
        tFreeAndClearTableTSMAInfo(pTsmaInfo);
×
2679
        goto _OVER;
×
2680
      }
2681
      continue;
×
UNCOV
2682
    } else if (pSma->version == pTsmaVer->version) {
×
UNCOV
2683
      mndReleaseSma(pMnode, pSma);
×
UNCOV
2684
      continue;
×
2685
    }
2686

2687
    SStbObj *pDestStb = mndAcquireStb(pMnode, pSma->dstTbName);
×
2688
    if (!pDestStb) {
×
2689
      mInfo("tsma: %s.%" PRIx64 " dest stb: %s not found, maybe dropped", tsmaFName, pTsmaVer->tsmaId, pSma->dstTbName);
×
2690
      code = mkNonExistTSMAInfo(pTsmaVer, &pTsmaInfo);
×
2691
      mndReleaseSma(pMnode, pSma);
×
2692
      if (code) goto _OVER;
×
2693
      if (NULL == taosArrayPush(hbRsp.pTsmas, &pTsmaInfo)) {
×
2694
        code = terrno;
×
2695
        tFreeAndClearTableTSMAInfo(pTsmaInfo);
×
2696
        goto _OVER;
×
2697
      }
2698
      continue;
×
2699
    }
2700

2701
    // dump smaObj into rsp
2702
    STableTSMAInfo *pInfo = NULL;
×
2703
    pInfo = taosMemoryCalloc(1, sizeof(STableTSMAInfo));
×
2704
    if (!pInfo) {
×
2705
      code = terrno;
×
2706
      mndReleaseSma(pMnode, pSma);
×
2707
      mndReleaseStb(pMnode, pDestStb);
×
2708
      goto _OVER;
×
2709
    }
2710

2711
    SSmaObj *pBaseSma = NULL;
×
2712
    code = mndGetDeepestBaseForTsma(pMnode, pSma, &pBaseSma);
×
2713
    if (code == 0) code = dumpTSMAInfoFromSmaObj(pSma, pDestStb, pInfo, pBaseSma);
×
2714

2715
    mndReleaseStb(pMnode, pDestStb);
×
2716
    mndReleaseSma(pMnode, pSma);
×
2717
    if (pBaseSma) mndReleaseSma(pMnode, pBaseSma);
×
2718
    if (terrno) {
×
2719
      tFreeAndClearTableTSMAInfo(pInfo);
×
2720
      goto _OVER;
×
2721
    }
2722

2723
    if (NULL == taosArrayPush(hbRsp.pTsmas, pInfo)) {
×
2724
      code = terrno;
×
2725
      tFreeAndClearTableTSMAInfo(pInfo);
×
2726
      goto _OVER;
×
2727
    }
2728
  }
2729

UNCOV
2730
  rspLen = tSerializeTSMAHbRsp(NULL, 0, &hbRsp);
×
UNCOV
2731
  if (rspLen < 0) {
×
2732
    code = terrno;
×
2733
    goto _OVER;
×
2734
  }
2735

UNCOV
2736
  pRsp = taosMemoryMalloc(rspLen);
×
UNCOV
2737
  if (!pRsp) {
×
2738
    code = terrno;
×
2739
    rspLen = 0;
×
2740
    goto _OVER;
×
2741
  }
2742

UNCOV
2743
  rspLen = tSerializeTSMAHbRsp(pRsp, rspLen, &hbRsp);
×
UNCOV
2744
  if (rspLen < 0) {
×
2745
    code = terrno;
×
2746
    goto _OVER;
×
2747
  }
UNCOV
2748
  code = 0;
×
UNCOV
2749
_OVER:
×
UNCOV
2750
  tFreeTSMAHbRsp(&hbRsp);
×
UNCOV
2751
  *ppRsp = pRsp;
×
UNCOV
2752
  *pRspLen = rspLen;
×
UNCOV
2753
  TAOS_RETURN(code);
×
2754
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc