• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4969

27 Feb 2026 07:19AM UTC coverage: 67.69% (+0.8%) from 66.902%
#4969

push

travis-ci

web-flow
merge: from main to 3.0 #34603

15 of 58 new or added lines in 2 files covered. (25.86%)

5075 existing lines in 154 files now uncovered.

208337 of 307781 relevant lines covered (67.69%)

129686642.38 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

69.74
/source/dnode/mnode/impl/src/mndStream.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "mndStream.h"
17
#include "audit.h"
18
#include "mndDb.h"
19
#include "mndPrivilege.h"
20
#include "mndShow.h"
21
#include "mndStb.h"
22
#include "mndTrans.h"
23
#include "mndUser.h"
24
#include "osMemory.h"
25
#include "parser.h"
26
#include "taoserror.h"
27
#include "tmisce.h"
28
#include "tname.h"
29

30
#define MND_STREAM_MAX_NUM 100000
31

32
typedef struct {
33
  int8_t placeHolder;  // // to fix windows compile error, define place holder
34
} SMStreamNodeCheckMsg;
35

36
static int32_t  mndNodeCheckSentinel = 0;
37
SStmRuntime  mStreamMgmt = {0};
38

39
static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream);
40
static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream);
41
static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pOldStream, SStreamObj *pNewStream);
42
static int32_t mndProcessDropStreamReq(SRpcMsg *pReq);
43

44
static int32_t mndProcessCreateStreamReqFromMNode(SRpcMsg *pReq);
45
static int32_t mndProcessDropStreamReqFromMNode(SRpcMsg *pReq);
46

47
static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
48
static void    mndCancelGetNextStream(SMnode *pMnode, void *pIter);
49
static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
50
static void    mndCancelGetNextStreamTask(SMnode *pMnode, void *pIter);
51
static int32_t mndProcessStopStreamReq(SRpcMsg *pReq);
52
static int32_t mndProcessStartStreamReq(SRpcMsg *pReq);
53

54
static SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw);
55

56
SSdbRaw       *mndStreamSeqActionEncode(SStreamObj *pStream);
57
SSdbRow       *mndStreamSeqActionDecode(SSdbRaw *pRaw);
58
static int32_t mndStreamSeqActionInsert(SSdb *pSdb, SStreamSeq *pStream);
59
static int32_t mndStreamSeqActionDelete(SSdb *pSdb, SStreamSeq *pStream);
60
static int32_t mndStreamSeqActionUpdate(SSdb *pSdb, SStreamSeq *pOldStream, SStreamSeq *pNewStream);
61
static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq);
62

63
void mndCleanupStream(SMnode *pMnode) {
418,480✔
64
  mDebug("try to clean up stream");
418,480✔
65
  
66
  msmHandleBecomeNotLeader(pMnode);
418,480✔
67
  
68
  mDebug("mnd stream runtime info cleanup");
418,480✔
69
}
418,480✔
70

71
SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw) {
197,459✔
72
  int32_t     code = 0;
197,459✔
73
  int32_t     lino = 0;
197,459✔
74
  SSdbRow    *pRow = NULL;
197,459✔
75
  SStreamObj *pStream = NULL;
197,459✔
76
  void       *buf = NULL;
197,459✔
77
  int8_t      sver = 0;
197,459✔
78
  int32_t     tlen;
197,060✔
79
  int32_t     dataPos = 0;
197,459✔
80

81
  code = sdbGetRawSoftVer(pRaw, &sver);
197,459✔
82
  TSDB_CHECK_CODE(code, lino, _over);
197,459✔
83

84
  if (sver != MND_STREAM_VER_NUMBER && sver != MND_STREAM_COMPATIBLE_VER_NUMBER) {
197,459✔
85
    mError("stream read invalid ver, data ver: %d, curr ver: %d", sver, MND_STREAM_VER_NUMBER);
×
86
    goto _over;
×
87
  }
88

89
  pRow = sdbAllocRow(sizeof(SStreamObj));
197,459✔
90
  TSDB_CHECK_NULL(pRow, code, lino, _over, terrno);
197,459✔
91

92
  pStream = sdbGetRowObj(pRow);
197,459✔
93
  TSDB_CHECK_NULL(pStream, code, lino, _over, terrno);
197,459✔
94

95
  SDB_GET_INT32(pRaw, dataPos, &tlen, _over);
197,459✔
96

97
  buf = taosMemoryMalloc(tlen + 1);
197,459✔
98
  TSDB_CHECK_NULL(buf, code, lino, _over, terrno);
197,459✔
99

100
  SDB_GET_BINARY(pRaw, dataPos, buf, tlen, _over);
197,459✔
101

102
  SDecoder decoder;
197,060✔
103
  tDecoderInit(&decoder, buf, tlen + 1);
197,459✔
104
  code = tDecodeSStreamObj(&decoder, pStream, sver);
197,459✔
105
  tDecoderClear(&decoder);
197,459✔
106

107
  if (code < 0) {
197,459✔
108
    tFreeStreamObj(pStream);
×
109
  }
110

111
_over:
197,459✔
112
  taosMemoryFreeClear(buf);
197,459✔
113

114
  if (code != TSDB_CODE_SUCCESS) {
197,459✔
115
    char *p = (pStream == NULL || NULL == pStream->pCreate) ? "null" : pStream->pCreate->name;
×
116
    mError("stream:%s, failed to decode from raw:%p since %s at:%d", p, pRaw, tstrerror(code), lino);
×
117
    taosMemoryFreeClear(pRow);
×
118

119
    terrno = code;
×
120
    return NULL;
×
121
  } else {
122
    mTrace("stream:%s, decode from raw:%p, row:%p", pStream->pCreate->name, pRaw, pStream);
197,459✔
123

124
    terrno = 0;
197,459✔
125
    return pRow;
197,459✔
126
  }
127
}
128

129
static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream) {
166,434✔
130
  mTrace("stream:%s, perform insert action", pStream->pCreate->name);
166,434✔
131
  return 0;
166,434✔
132
}
133

134
static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream) {
197,459✔
135
  mInfo("stream:%s, perform delete action", pStream->pCreate->name);
197,459✔
136
  tFreeStreamObj(pStream);
197,459✔
137
  return 0;
197,459✔
138
}
139

140
static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pOldStream, SStreamObj *pNewStream) {
5,601✔
141
  mTrace("stream:%s, perform update action", pOldStream->pCreate->name);
5,601✔
142

143
  atomic_store_32(&pOldStream->mainSnodeId, pNewStream->mainSnodeId);
5,601✔
144
  atomic_store_8(&pOldStream->userStopped, atomic_load_8(&pNewStream->userStopped));
5,601✔
145
  pOldStream->ownerId = pNewStream->ownerId;
5,601✔
146
  pOldStream->updateTime = pNewStream->updateTime;
5,601✔
147
  
148
  return 0;
5,601✔
149
}
150

151
int32_t mndAcquireStream(SMnode *pMnode, char *streamName, SStreamObj **pStream) {
395,969✔
152
  int32_t code = 0;
395,969✔
153
  SSdb   *pSdb = pMnode->pSdb;
395,969✔
154
  (*pStream) = sdbAcquire(pSdb, SDB_STREAM, streamName);
395,969✔
155
  if ((*pStream) == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
395,969✔
156
    code = TSDB_CODE_MND_STREAM_NOT_EXIST;
177,110✔
157
  }
158
  return code;
395,969✔
159
}
160

161
static bool mndStreamGetNameFromId(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
×
162
  SStreamObj* pStream = pObj;
×
163

164
  if (pStream->pCreate->streamId == *(int64_t*)p1) {
×
165
    strncpy((char*)p2, pStream->name, TSDB_STREAM_NAME_LEN);
×
166
    return false;
×
167
  }
168

169
  return true;
×
170
}
171

172
int32_t mndAcquireStreamById(SMnode *pMnode, int64_t streamId, SStreamObj **pStream) {
×
173
  int32_t code = 0;
×
174
  SSdb   *pSdb = pMnode->pSdb;
×
175
  char streamName[TSDB_STREAM_NAME_LEN];
×
176
  streamName[0] = 0;
×
177
  
178
  sdbTraverse(pSdb, SDB_STREAM, mndStreamGetNameFromId, &streamId, streamName, NULL);
×
179
  if (streamName[0]) {
×
180
    (*pStream) = sdbAcquire(pSdb, SDB_STREAM, streamName);
×
181
    if ((*pStream) == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
×
182
      code = TSDB_CODE_MND_STREAM_NOT_EXIST;
×
183
    }
184
  }
185
  
186
  return code;
×
187
}
188

189
void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream) {
197,761✔
190
  SSdb *pSdb = pMnode->pSdb;
197,761✔
191
  sdbRelease(pSdb, pStream);
197,761✔
192
}
197,761✔
193

194
SSdbRaw *mndStreamSeqActionEncode(SStreamObj *pStream) { return NULL; }
×
195
SSdbRow *mndStreamSeqActionDecode(SSdbRaw *pRaw) { return NULL; }
×
196
int32_t  mndStreamSeqActionInsert(SSdb *pSdb, SStreamSeq *pStream) { return 0; }
×
197
int32_t  mndStreamSeqActionDelete(SSdb *pSdb, SStreamSeq *pStream) { return 0; }
×
198
int32_t  mndStreamSeqActionUpdate(SSdb *pSdb, SStreamSeq *pOldStream, SStreamSeq *pNewStream) { return 0; }
×
199

200
static void mndStreamBuildObj(SMnode *pMnode, SStreamObj *pObj, SCMCreateStreamReq *pCreate, SUserObj *pOperUser,
165,600✔
201
                              int32_t snodeId) {
202
  int32_t code = 0;
165,600✔
203

204
  pObj->pCreate = pCreate;
165,600✔
205
  strncpy(pObj->name, pCreate->name, TSDB_STREAM_FNAME_LEN);
165,600✔
206
  (void)snprintf(pObj->createUser, sizeof(pObj->createUser), "%s", pOperUser->name);
165,600✔
207
  pObj->ownerId = pOperUser->uid;
165,600✔
208
  pObj->mainSnodeId = snodeId;
165,600✔
209

210
  pObj->userDropped = 0;
165,600✔
211
  pObj->userStopped = 0;
165,600✔
212

213
  pObj->createTime = taosGetTimestampMs();
165,600✔
214
  pObj->updateTime = pObj->createTime;
165,600✔
215

216
  mstLogSStreamObj("create stream", pObj);
165,600✔
217
}
165,600✔
218

219
static int32_t mndStreamCreateOutStb(SMnode *pMnode, STrans *pTrans, const SCMCreateStreamReq *pStream, const char *user) {
62,144✔
220
  SStbObj *pStb = NULL;
62,144✔
221
  SDbObj  *pDb = NULL;
62,144✔
222
  int32_t  code = 0;
62,144✔
223
  int32_t  lino = 0;
62,144✔
224

225
  SMCreateStbReq createReq = {0};
62,144✔
226
  TAOS_STRNCAT(createReq.name, pStream->outDB, TSDB_DB_FNAME_LEN);
62,144✔
227
  TAOS_STRNCAT(createReq.name, ".", 2);
62,144✔
228
  TAOS_STRNCAT(createReq.name,  pStream->outTblName, TSDB_TABLE_NAME_LEN);
62,144✔
229
  createReq.numOfColumns = taosArrayGetSize(pStream->outCols);
62,144✔
230
  createReq.numOfTags = pStream->outTags ? taosArrayGetSize(pStream->outTags) : 1;
62,144✔
231
  createReq.pColumns = taosArrayInit_s(sizeof(SFieldWithOptions), createReq.numOfColumns);
62,144✔
232
  TSDB_CHECK_NULL(createReq.pColumns, code, lino, _OVER, terrno);
62,144✔
233

234
  // build fields
235
  for (int32_t i = 0; i < createReq.numOfColumns; i++) {
330,690✔
236
    SFieldWithOptions *pField = taosArrayGet(createReq.pColumns, i);
268,546✔
237
    TSDB_CHECK_NULL(pField, code, lino, _OVER, terrno);
268,546✔
238
    SFieldWithOptions *pSrc = taosArrayGet(pStream->outCols, i);
268,546✔
239

240
    tstrncpy(pField->name, pSrc->name, TSDB_COL_NAME_LEN);
268,546✔
241
    pField->flags = pSrc->flags;
268,546✔
242
    pField->type = pSrc->type;
268,546✔
243
    pField->bytes = pSrc->bytes;
268,546✔
244
    pField->compress = createDefaultColCmprByType(pField->type);
268,546✔
245
    if (IS_DECIMAL_TYPE(pField->type)) {
268,546✔
246
      pField->typeMod = pSrc->typeMod;
×
247
      pField->flags |= COL_HAS_TYPE_MOD;
×
248
    }
249
  }
250

251
  if (NULL == pStream->outTags) {
62,144✔
252
    createReq.numOfTags = 1;
×
253
    createReq.pTags = taosArrayInit_s(sizeof(SField), 1);
×
254
    TSDB_CHECK_NULL(createReq.pTags, code, lino, _OVER, terrno);
×
255

256
    // build tags
257
    SField *pField = taosArrayGet(createReq.pTags, 0);
×
258
    TSDB_CHECK_NULL(pField, code, lino, _OVER, terrno);
×
259

260
    tstrncpy(pField->name, "group_id", sizeof(pField->name));
×
261
    pField->type = TSDB_DATA_TYPE_UBIGINT;
×
262
    pField->flags = 0;
×
263
    pField->bytes = 8;
×
264
  } else {
265
    createReq.numOfTags = taosArrayGetSize(pStream->outTags);
62,144✔
266
    createReq.pTags = taosArrayInit_s(sizeof(SField), createReq.numOfTags);
62,144✔
267
    TSDB_CHECK_NULL(createReq.pTags, code, lino, _OVER, terrno);
62,144✔
268

269
    for (int32_t i = 0; i < createReq.numOfTags; i++) {
169,164✔
270
      SField *pField = taosArrayGet(createReq.pTags, i);
107,020✔
271
      if (pField == NULL) {
107,020✔
272
        continue;
×
273
      }
274

275
      TAOS_FIELD_E *pSrc = taosArrayGet(pStream->outTags, i);
107,020✔
276
      pField->bytes = pSrc->bytes;
107,020✔
277
      pField->flags = 0;
107,020✔
278
      pField->type = pSrc->type;
107,020✔
279
      tstrncpy(pField->name, pSrc->name, TSDB_COL_NAME_LEN);
107,020✔
280
    }
281
  }
282

283
  if ((code = mndCheckCreateStbReq(&createReq)) != 0) {
62,144✔
284
    goto _OVER;
×
285
  }
286

287
  pStb = mndAcquireStb(pMnode, createReq.name);
62,144✔
288
  if (pStb != NULL) {
62,144✔
289
    code = TSDB_CODE_MND_STB_ALREADY_EXIST;
×
290
    goto _OVER;
×
291
  }
292

293
  pDb = mndAcquireDbByStb(pMnode, createReq.name);
62,144✔
294
  if (pDb == NULL) {
62,144✔
295
    code = TSDB_CODE_MND_DB_NOT_SELECTED;
×
296
    goto _OVER;
×
297
  }
298

299
  int32_t numOfStbs = -1;
62,144✔
300
  if (mndGetNumOfStbs(pMnode, pDb->name, &numOfStbs) != 0) {
62,144✔
301
    goto _OVER;
×
302
  }
303

304
  if (pDb->cfg.numOfStables == 1 && numOfStbs != 0) {
62,144✔
305
    code = TSDB_CODE_MND_SINGLE_STB_MODE_DB;
×
306
    goto _OVER;
×
307
  }
308

309
  SStbObj stbObj = {0};
62,144✔
310

311
  if (mndBuildStbFromReq(pMnode, &stbObj, &createReq, pDb) != 0) {
62,144✔
312
    goto _OVER;
×
313
  }
314

315
  stbObj.uid = pStream->outStbUid;
62,144✔
316

317
  if (mndAddStbToTrans(pMnode, pTrans, pDb, &stbObj) < 0) {
62,144✔
318
    mndFreeStb(&stbObj);
×
319
    goto _OVER;
×
320
  }
321

322
  mDebug("stream:%s create dst stable:%s, cols:%d", pStream->name, pStream->outTblName, createReq.numOfColumns);
62,144✔
323

324
  tFreeSMCreateStbReq(&createReq);
62,144✔
325
  mndFreeStb(&stbObj);
62,144✔
326
  mndReleaseStb(pMnode, pStb);
62,144✔
327
  mndReleaseDb(pMnode, pDb);
62,144✔
328
  return code;
62,144✔
329

330
_OVER:
×
331
  tFreeSMCreateStbReq(&createReq);
×
332
  mndReleaseStb(pMnode, pStb);
×
333
  mndReleaseDb(pMnode, pDb);
×
334

335
  mDebug("stream:%s failed to create dst stable:%s, line:%d code:%s", pStream->name, pStream->outTblName, lino,
×
336
         tstrerror(code));
337
  return code;
×
338
}
339

340
static int32_t mndStreamValidateCreate(SMnode *pMnode, SRpcMsg *pReq, SCMCreateStreamReq* pCreate) {
165,600✔
341
  int32_t code = 0, lino = 0;
165,600✔
342
  int64_t streamId = pCreate->streamId;
165,600✔
343
  char   *pUser = RPC_MSG_USER(pReq);
165,600✔
344

345
  if (pCreate->streamDB) {
165,600✔
346
    // code = mndCheckDbPrivilegeByName(pMnode, pUser, MND_OPER_WRITE_DB, pCreate->streamDB);
347
    code = mndCheckDbPrivilegeByName(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_USE_DB,
165,600✔
348
                                     pCreate->streamDB, false);
165,600✔
349
    if (code) {
165,600✔
350
      if (code == TSDB_CODE_MND_NO_RIGHTS) code = TSDB_CODE_PAR_DB_USE_PERMISSION_DENIED;
×
UNCOV
351
      mstsError("user %s failed to create stream %s in db %s since %s", pUser, pCreate->name, pCreate->streamDB,
×
352
                tstrerror(code));
353
    }
354
    TSDB_CHECK_CODE(code, lino, _OVER);
165,600✔
355
  }
356

357
  if (pCreate->triggerDB) {
165,600✔
358
    // code = mndCheckDbPrivilegeByName(pMnode, pUser, MND_OPER_READ_DB, pCreate->triggerDB);
359
    code = mndCheckDbPrivilegeByName(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_USE_DB,
164,732✔
360
                                     pCreate->triggerDB, false);
164,732✔
361
    if (code) {
164,732✔
UNCOV
362
      if (code == TSDB_CODE_MND_NO_RIGHTS) code = TSDB_CODE_PAR_DB_USE_PERMISSION_DENIED;
×
UNCOV
363
      mstsError("user %s failed to create stream %s using trigger db %s since %s", pUser, pCreate->name,
×
364
                pCreate->triggerDB, tstrerror(code));
365
    }
366
    TSDB_CHECK_CODE(code, lino, _OVER);
164,732✔
367
#if 0  // TODO check the owner of trigger table
368
    if (pCreate->triggerTblName) {
369
      // check trigger table privilege
370
      code = mndCheckObjPrivilegeRecF(pMnode, pUser, PRIV_TBL_SELECT, "", pCreate->triggerDB, pCreate->triggerTblName);
371
      if (code) {
372
        mstsError("user %s failed to create stream %s using trigger table %s.%s since %s", pUser, pCreate->name,
373
                  pCreate->triggerDB, pCreate->triggerTblName, tstrerror(code));
374
      }
375
      TSDB_CHECK_CODE(code, lino, _OVER);
376
    }
377
#endif
378
  }
379

380
  if (pCreate->calcDB) {
165,600✔
381
    int32_t dbNum = taosArrayGetSize(pCreate->calcDB);
162,800✔
382
    for (int32_t i = 0; i < dbNum; ++i) {
325,600✔
383
      char *calcDB = taosArrayGetP(pCreate->calcDB, i);
162,800✔
384
      // code = mndCheckDbPrivilegeByName(pMnode, pUser, MND_OPER_READ_DB, calcDB);
385
      code = mndCheckDbPrivilegeByName(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_USE_DB, calcDB, false);
162,800✔
386
      if (code) {
162,800✔
UNCOV
387
        if (code == TSDB_CODE_MND_NO_RIGHTS) code = TSDB_CODE_PAR_DB_USE_PERMISSION_DENIED;
×
UNCOV
388
        mstsError("user %s failed to create stream %s using calcDB %s since %s", pUser, pCreate->name, calcDB,
×
389
                  tstrerror(code));
390
      }
391
      TSDB_CHECK_CODE(code, lino, _OVER);
162,800✔
392
    }
393
  }
394

395
  if (pCreate->outDB) {
165,600✔
396
    // code = mndCheckDbPrivilegeByName(pMnode, pUser, MND_OPER_WRITE_DB, pCreate->outDB);
397
    code = mndCheckDbPrivilegeByName(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_USE_DB, pCreate->outDB,
162,800✔
398
                                     false);
399
    if (code) {
162,800✔
UNCOV
400
      if (code == TSDB_CODE_MND_NO_RIGHTS) code = TSDB_CODE_PAR_DB_USE_PERMISSION_DENIED;
×
UNCOV
401
      mstsError("user %s failed to create stream %s using out db %s since %s", pUser, pCreate->name, pCreate->outDB,
×
402
                tstrerror(code));
403
    }
404
    TSDB_CHECK_CODE(code, lino, _OVER);
162,800✔
405
  }
406

407
  int32_t streamNum = sdbGetSize(pMnode->pSdb, SDB_STREAM);
165,600✔
408
  if (streamNum > MND_STREAM_MAX_NUM) {
165,600✔
UNCOV
409
    code = TSDB_CODE_MND_TOO_MANY_STREAMS;
×
UNCOV
410
    mstsError("failed to create stream %s since %s, stream number:%d", pCreate->name, tstrerror(code), streamNum);
×
UNCOV
411
    return code;
×
412
  }
413

414
_OVER:
165,600✔
415

416
  return code;
165,600✔
417
}
418

419
int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
631,790✔
420
  SSdb   *pSdb = pMnode->pSdb;
631,790✔
421
  void   *pIter = NULL;
631,790✔
422
  int32_t code = 0;
631,790✔
423

424
  while (1) {
43,466✔
425
    SStreamObj *pStream = NULL;
675,256✔
426
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
675,256✔
427
    if (pIter == NULL) break;
675,256✔
428

429
    if (0 == strcmp(pStream->pCreate->streamDB, pDb->name)) {
43,466✔
430
      mInfo("start to drop stream %s in db %s", pStream->pCreate->name, pDb->name);
17,254✔
431
      
432
      pStream->updateTime = taosGetTimestampMs();
34,508✔
433
      
434
      atomic_store_8(&pStream->userDropped, 1);
17,254✔
435
      
436
      MND_STREAM_SET_LAST_TS(STM_EVENT_DROP_STREAM, pStream->updateTime);
17,254✔
437
      
438
      msmUndeployStream(pMnode, pStream->pCreate->streamId, pStream->pCreate->name);
17,254✔
439
      
440
      // drop stream
441
      code = mndStreamTransAppend(pStream, pTrans, SDB_STATUS_DROPPED);
17,254✔
442
      if (code) {
17,254✔
UNCOV
443
        mError("drop db trans:%d failed to append drop stream trans since %s", pTrans->id, tstrerror(code));
×
UNCOV
444
        sdbRelease(pSdb, pStream);
×
UNCOV
445
        sdbCancelFetch(pSdb, pIter);
×
UNCOV
446
        TAOS_RETURN(code);
×
447
      }
448
    }
449

450
    sdbRelease(pSdb, pStream);
43,466✔
451
  }
452

453
  return 0;
631,790✔
454
}
455

456
static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
235,942✔
457
  SMnode     *pMnode = pReq->info.node;
235,942✔
458
  SSdb       *pSdb = pMnode->pSdb;
235,942✔
459
  int32_t     numOfRows = 0;
235,942✔
460
  SStreamObj *pStream = NULL;
235,942✔
461
  SUserObj   *pOperUser = NULL;
235,942✔
462
  int32_t     code = 0, lino = 0;
235,942✔
463
  bool        showAll = false;
235,942✔
464

465
  TAOS_CHECK_EXIT(mndAcquireUser(pMnode, RPC_MSG_USER(pReq), &pOperUser));
235,942✔
466
  showAll =
235,942✔
467
      (0 == mndCheckObjPrivilegeRec(pMnode, pOperUser, PRIV_CM_SHOW, PRIV_OBJ_STREAM, 0, pOperUser->acctId, "*", "*"));
235,942✔
468

469
  while (numOfRows < rows) {
741,451✔
470
    pShow->pIter = sdbFetch(pSdb, SDB_STREAM, pShow->pIter, (void **)&pStream);
741,451✔
471
    if (pShow->pIter == NULL) break;
741,451✔
472

473
    if (!showAll) {
505,509✔
UNCOV
474
      if ((mndCheckObjPrivilegeRecF(pMnode, pOperUser, PRIV_CM_SHOW, PRIV_OBJ_STREAM, pStream->ownerId,
×
UNCOV
475
                                    pStream->pCreate->streamDB, mndGetStableStr(pStream->pCreate->name)))) {
×
UNCOV
476
        sdbRelease(pSdb, pStream);
×
UNCOV
477
        continue;
×
478
      }
479
    }
480

481
    code = mstSetStreamAttrResBlock(pMnode, pStream, pBlock, numOfRows);
505,509✔
482
    if (code == 0) {
505,509✔
483
      numOfRows++;
505,509✔
484
    }
485
    sdbRelease(pSdb, pStream);
505,509✔
486
  }
487
  code = 0;
235,942✔
488
  pShow->numOfRows += numOfRows;
235,942✔
489
_exit:
235,942✔
490
  mndReleaseUser(pMnode, pOperUser);
235,942✔
491
  if (code != 0) {
235,942✔
UNCOV
492
    mError("failed to retrieve stream list at line %d since %s", lino, tstrerror(code));
×
UNCOV
493
    TAOS_RETURN(code);
×
494
  }
495
  return numOfRows;
235,942✔
496
}
497

UNCOV
498
static void mndCancelGetNextStream(SMnode *pMnode, void *pIter) {
×
UNCOV
499
  SSdb *pSdb = pMnode->pSdb;
×
UNCOV
500
  sdbCancelFetchByType(pSdb, pIter, SDB_STREAM);
×
UNCOV
501
}
×
502

503
static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
284,526✔
504
  SMnode     *pMnode = pReq->info.node;
284,526✔
505
  SSdb       *pSdb = pMnode->pSdb;
284,526✔
506
  int32_t     numOfRows = 0;
284,526✔
507
  SStreamObj *pStream = NULL;
284,526✔
508
  int32_t     code = 0;
284,526✔
509

510
  while (numOfRows < rowsCapacity) {
2,755,132✔
511
    pShow->pIter = sdbFetch(pSdb, SDB_STREAM, pShow->pIter, (void **)&pStream);
2,670,356✔
512
    if (pShow->pIter == NULL) {
2,670,356✔
513
      break;
199,750✔
514
    }
515

516
    code = mstSetStreamTasksResBlock(pStream, pBlock, &numOfRows, rowsCapacity);
2,470,606✔
517

518
    sdbRelease(pSdb, pStream);
2,470,606✔
519
  }
520

521
  pShow->numOfRows += numOfRows;
284,526✔
522
  return numOfRows;
284,526✔
523
}
524

UNCOV
525
static void mndCancelGetNextStreamTask(SMnode *pMnode, void *pIter) {
×
UNCOV
526
  SSdb *pSdb = pMnode->pSdb;
×
UNCOV
527
  sdbCancelFetchByType(pSdb, pIter, SDB_STREAM);
×
UNCOV
528
}
×
529

530
static int32_t mndRetrieveStreamRecalculates(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
256✔
531
  SMnode     *pMnode = pReq->info.node;
256✔
532
  SSdb       *pSdb = pMnode->pSdb;
256✔
533
  int32_t     numOfRows = 0;
256✔
534
  SStreamObj *pStream = NULL;
256✔
535
  int32_t     code = 0;
256✔
536

537
  while (numOfRows < rowsCapacity) {
512✔
538
    pShow->pIter = sdbFetch(pSdb, SDB_STREAM, pShow->pIter, (void **)&pStream);
512✔
539
    if (pShow->pIter == NULL) {
512✔
540
      break;
256✔
541
    }
542

543
    code = mstSetStreamRecalculatesResBlock(pStream, pBlock, &numOfRows, rowsCapacity);
256✔
544

545
    sdbRelease(pSdb, pStream);
256✔
546
  }
547

548
  pShow->numOfRows += numOfRows;
256✔
549
  return numOfRows;
256✔
550
}
551

UNCOV
552
static void mndCancelGetNextStreamRecalculates(SMnode *pMnode, void *pIter) {
×
UNCOV
553
  SSdb *pSdb = pMnode->pSdb;
×
UNCOV
554
  sdbCancelFetchByType(pSdb, pIter, SDB_STREAM);
×
UNCOV
555
}
×
556

557

558
static bool mndStreamUpdateTagsFlag(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
2,024,035✔
559
  SStreamObj *pStream = pObj;
2,024,035✔
560
  if (atomic_load_8(&pStream->userDropped)) {
2,024,035✔
UNCOV
561
    return true;
×
562
  }
563

564
  if (TSDB_SUPER_TABLE != pStream->pCreate->triggerTblType && 
2,024,035✔
565
      TSDB_CHILD_TABLE != pStream->pCreate->triggerTblType && 
1,116,316✔
566
      TSDB_VIRTUAL_CHILD_TABLE != pStream->pCreate->triggerTblType) {
798,462✔
567
    return true;
122,132✔
568
  }
569

570
  if (pStream->pCreate->triggerTblSuid != *(uint64_t*)p1) {
1,901,903✔
571
    return true;
1,861,459✔
572
  }
573

574
  if (NULL == pStream->pCreate->partitionCols) {
40,444✔
575
    return true;
3,728✔
576
  }
577

578
  SNodeList* pList = NULL;
36,716✔
579
  int32_t code = nodesStringToList(pStream->pCreate->partitionCols, &pList);
36,716✔
580
  if (code) {
36,716✔
UNCOV
581
    nodesDestroyList(pList);
×
UNCOV
582
    mstError("partitionCols [%s] nodesStringToList failed with error:%s", (char*)pStream->pCreate->partitionCols, tstrerror(code));
×
UNCOV
583
    return true;
×
584
  }
585

586
  SSchema* pTags = (SSchema*)p2;
36,716✔
587
  int32_t* tagNum = (int32_t*)p3;
36,716✔
588

589
  SNode* pNode = NULL;
36,716✔
590
  FOREACH(pNode, pList) {
119,372✔
591
    SColumnNode* pCol = (SColumnNode*)pNode;
82,656✔
592
    for (int32_t i = 0; i < *tagNum; ++i) {
248,692✔
593
      if (pCol->colId == pTags[i].colId) {
234,503✔
594
        pTags[i].flags |= COL_REF_BY_STM;
68,467✔
595
        break;
68,467✔
596
      }
597
    }
598
  }
599

600
  nodesDestroyList(pList);
36,716✔
601
  
602
  return true;
36,716✔
603
}
604

605

606
void mndStreamUpdateTagsRefFlag(SMnode *pMnode, int64_t suid, SSchema* pTags, int32_t tagNum) {
14,572,969✔
607
  int32_t streamNum = sdbGetSize(pMnode->pSdb, SDB_STREAM);
14,572,969✔
608
  if (streamNum <= 0) {
14,573,636✔
609
    return;
14,445,691✔
610
  }
611

612
  sdbTraverse(pMnode->pSdb, SDB_STREAM, mndStreamUpdateTagsFlag, &suid, pTags, &tagNum);
127,945✔
613
}
614

615
static int32_t mndProcessStopStreamReq(SRpcMsg *pReq) {
2,178✔
616
  SMnode     *pMnode = pReq->info.node;
2,178✔
617
  SStreamObj *pStream = NULL;
2,178✔
618
  SUserObj   *pOperUser = NULL;
2,178✔
619
  int32_t     code = 0;
2,178✔
620

621
  SMPauseStreamReq pauseReq = {0};
2,178✔
622
  if (tDeserializeSMPauseStreamReq(pReq->pCont, pReq->contLen, &pauseReq) < 0) {
2,178✔
UNCOV
623
    TAOS_RETURN(TSDB_CODE_INVALID_MSG);
×
624
  }
625

626
  code = mndAcquireStream(pMnode, pauseReq.name, &pStream);
2,178✔
627
  if (pStream == NULL || code != 0) {
2,178✔
UNCOV
628
    if (pauseReq.igNotExists) {
×
629
      mInfo("stream:%s, not exist, not stop stream", pauseReq.name);
×
630
      taosMemoryFree(pauseReq.name);
×
631
      return 0;
×
632
    } else {
UNCOV
633
      mError("stream:%s not exist, failed to stop stream", pauseReq.name);
×
UNCOV
634
      taosMemoryFree(pauseReq.name);
×
UNCOV
635
      TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
×
636
    }
637
  }
638

639
  taosMemoryFree(pauseReq.name);
2,178✔
640

641
  int64_t streamId = pStream->pCreate->streamId;
2,178✔
642
  
643
  mstsInfo("start to stop stream %s", pStream->name);
2,178✔
644

645
  // code = mndCheckDbPrivilegeByName(pMnode, RPC_MSG_USER(pReq), MND_OPER_WRITE_DB, pStream->pCreate->streamDB);
646
  if((code = mndAcquireUser(pMnode, RPC_MSG_USER(pReq), &pOperUser))) {
2,178✔
UNCOV
647
    mstsError("user %s failed to stop stream %s since %s", RPC_MSG_USER(pReq), pStream->name, tstrerror(code));
×
UNCOV
648
    sdbRelease(pMnode->pSdb, pStream);
×
UNCOV
649
    return code;
×
650
  }
651
  if ((code = mndCheckDbPrivilegeByName(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_USE_DB,
2,178✔
652
                                        pStream->pCreate->streamDB, false))) {
2,178✔
653
    if (code == TSDB_CODE_MND_NO_RIGHTS) code = TSDB_CODE_PAR_DB_USE_PERMISSION_DENIED;
×
654
  }
655
  if ((code != TSDB_CODE_SUCCESS) ||
4,356✔
656
      (code = mndCheckObjPrivilegeRecF(pMnode, pOperUser, PRIV_CM_STOP, PRIV_OBJ_STREAM, pStream->ownerId,
2,178✔
657
                                       pStream->pCreate->streamDB, mndGetStableStr(pStream->pCreate->name)))) {
2,178✔
UNCOV
658
    mstsError("user %s failed to stop stream %s since %s", RPC_MSG_USER(pReq), pStream->name, tstrerror(code));
×
UNCOV
659
    mndReleaseUser(pMnode, pOperUser);
×
UNCOV
660
    sdbRelease(pMnode->pSdb, pStream);
×
661
    return code;
×
662
  }
663

664
  mndReleaseUser(pMnode, pOperUser); // release user after privilege check
2,178✔
665

666
  if (atomic_load_8(&pStream->userDropped)) {
2,178✔
UNCOV
667
    code = TSDB_CODE_MND_STREAM_DROPPING;
×
UNCOV
668
    mstsError("user %s failed to stop stream %s since %s", RPC_MSG_USER(pReq), pStream->name, tstrerror(code));
×
UNCOV
669
    sdbRelease(pMnode->pSdb, pStream);
×
670
    return code;
×
671
  }
672

673
  STrans *pTrans = NULL;
2,178✔
674
  code = mndStreamCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_STOP_NAME, &pTrans);
2,178✔
675
  if (pTrans == NULL || code) {
2,178✔
UNCOV
676
    mstsError("failed to stop stream %s since %s", pStream->name, tstrerror(code));
×
UNCOV
677
    sdbRelease(pMnode->pSdb, pStream);
×
UNCOV
678
    return code;
×
679
  }
680

681
  pStream->updateTime = taosGetTimestampMs();
4,356✔
682

683
  atomic_store_8(&pStream->userStopped, 1);
2,178✔
684

685
  MND_STREAM_SET_LAST_TS(STM_EVENT_STOP_STREAM, pStream->updateTime);
2,178✔
686

687
  msmUndeployStream(pMnode, streamId, pStream->name);
2,178✔
688

689
  // stop stream
690
  code = mndStreamTransAppend(pStream, pTrans, SDB_STATUS_READY);
2,178✔
691
  if (code != TSDB_CODE_SUCCESS) {
2,178✔
UNCOV
692
    sdbRelease(pMnode->pSdb, pStream);
×
693
    mndTransDrop(pTrans);
×
694
    return code;
×
695
  }
696

697
  code = mndTransPrepare(pMnode, pTrans);
2,178✔
698
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
2,178✔
UNCOV
699
    mError("trans:%d, failed to prepare stop stream trans since %s", pTrans->id, tstrerror(code));
×
UNCOV
700
    sdbRelease(pMnode->pSdb, pStream);
×
UNCOV
701
    mndTransDrop(pTrans);
×
UNCOV
702
    return code;
×
703
  }
704

705
  sdbRelease(pMnode->pSdb, pStream);
2,178✔
706
  mndTransDrop(pTrans);
2,178✔
707

708
  return TSDB_CODE_ACTION_IN_PROGRESS;
2,178✔
709
}
710

711

712
static int32_t mndProcessStartStreamReq(SRpcMsg *pReq) {
2,178✔
713
  SMnode     *pMnode = pReq->info.node;
2,178✔
714
  SStreamObj *pStream = NULL;
2,178✔
715
  SUserObj   *pOperUser = NULL;
2,178✔
716
  int32_t     code = 0;
2,178✔
717

718
  if ((code = grantCheckExpire(TSDB_GRANT_STREAMS)) < 0) {
2,178✔
UNCOV
719
    return code;
×
720
  }
721

722
  SMResumeStreamReq resumeReq = {0};
2,178✔
723
  if (tDeserializeSMResumeStreamReq(pReq->pCont, pReq->contLen, &resumeReq) < 0) {
2,178✔
724
    TAOS_RETURN(TSDB_CODE_INVALID_MSG);
×
725
  }
726

727
  code = mndAcquireStream(pMnode, resumeReq.name, &pStream);
2,178✔
728
  if (pStream == NULL || code != 0) {
2,178✔
729
    if (resumeReq.igNotExists) {
×
730
      mInfo("stream:%s not exist, not start stream", resumeReq.name);
×
731
      taosMemoryFree(resumeReq.name);
×
UNCOV
732
      sdbRelease(pMnode->pSdb, pStream);
×
UNCOV
733
      return 0;
×
734
    } else {
UNCOV
735
      mError("stream:%s not exist, failed to start stream", resumeReq.name);
×
UNCOV
736
      taosMemoryFree(resumeReq.name);
×
UNCOV
737
      TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
×
738
    }
739
  }
740

741
  taosMemoryFree(resumeReq.name);
2,178✔
742

743
  int64_t streamId = pStream->pCreate->streamId;
2,178✔
744

745
  mstsInfo("start to start stream %s from stopped", pStream->name);
2,178✔
746

747
  // code = mndCheckDbPrivilegeByName(pMnode, RPC_MSG_USER(pReq), MND_OPER_WRITE_DB, pStream->pCreate->streamDB);
748
  if ((code = mndAcquireUser(pMnode, RPC_MSG_USER(pReq), &pOperUser))) {
2,178✔
UNCOV
749
    mstsError("user %s failed to start stream %s since %s", RPC_MSG_USER(pReq), pStream->name, tstrerror(code));
×
UNCOV
750
    sdbRelease(pMnode->pSdb, pStream);
×
UNCOV
751
    return code;
×
752
  }
753
  if ((code = mndCheckDbPrivilegeByName(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_USE_DB,
2,178✔
754
                                        pStream->pCreate->streamDB, false))) {
2,178✔
755
    if (code == TSDB_CODE_MND_NO_RIGHTS) code = TSDB_CODE_PAR_DB_USE_PERMISSION_DENIED;
×
756
  }
757
  if ((code != TSDB_CODE_SUCCESS) ||
4,356✔
758
      (code = mndCheckObjPrivilegeRecF(pMnode, pOperUser, PRIV_CM_START, PRIV_OBJ_STREAM, pStream->ownerId,
2,178✔
759
                                       pStream->pCreate->streamDB, mndGetStableStr(pStream->pCreate->name)))) {
2,178✔
UNCOV
760
    mstsError("user %s failed to start stream %s since %s", RPC_MSG_USER(pReq), pStream->name, tstrerror(code));
×
761
    mndReleaseUser(pMnode, pOperUser);
×
762
    sdbRelease(pMnode->pSdb, pStream);
×
763
    return code;
×
764
  }
765

766
  mndReleaseUser(pMnode, pOperUser); // release user after privilege check
2,178✔
767

768
  if (atomic_load_8(&pStream->userDropped)) {
2,178✔
769
    code = TSDB_CODE_MND_STREAM_DROPPING;
×
770
    mstsError("user %s failed to start stream %s since %s", RPC_MSG_USER(pReq), pStream->name, tstrerror(code));
×
771
    sdbRelease(pMnode->pSdb, pStream);
×
UNCOV
772
    return code;
×
773
  }
774

775
  if (0 == atomic_load_8(&pStream->userStopped)) {
2,178✔
UNCOV
776
    code = TSDB_CODE_MND_STREAM_NOT_STOPPED;
×
UNCOV
777
    mstsError("user %s failed to start stream %s since %s", RPC_MSG_USER(pReq), pStream->name, tstrerror(code));
×
UNCOV
778
    sdbRelease(pMnode->pSdb, pStream);
×
UNCOV
779
    return code;
×
780
  }
781
  
782
  atomic_store_8(&pStream->userStopped, 0);
2,178✔
783

784
  pStream->updateTime = taosGetTimestampMs();
4,356✔
785

786
  MND_STREAM_SET_LAST_TS(STM_EVENT_START_STREAM, pStream->updateTime);
2,178✔
787

788
  STrans *pTrans = NULL;
2,178✔
789
  code = mndStreamCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_START_NAME, &pTrans);
2,178✔
790
  if (pTrans == NULL || code) {
2,178✔
791
    mstsError("failed to start stream %s since %s", pStream->name, tstrerror(code));
×
792
    sdbRelease(pMnode->pSdb, pStream);
×
793
    return code;
×
794
  }
795

796
  code = mndStreamTransAppend(pStream, pTrans, SDB_STATUS_READY);
2,178✔
797
  if (code != TSDB_CODE_SUCCESS) {
2,178✔
798
    mstsError("failed to start stream %s since %s", pStream->name, tstrerror(code));
×
799
    sdbRelease(pMnode->pSdb, pStream);
×
800
    mndTransDrop(pTrans);
×
801
    return code;
×
802
  }
803

804
  code = mndTransPrepare(pMnode, pTrans);
2,178✔
805
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
2,178✔
UNCOV
806
    mstsError("trans:%d, failed to prepare start stream %s trans since %s", pTrans->id, pStream->name, tstrerror(code));
×
UNCOV
807
    sdbRelease(pMnode->pSdb, pStream);
×
UNCOV
808
    mndTransDrop(pTrans);
×
UNCOV
809
    return code;
×
810
  }
811

812
  mstPostStreamAction(mStreamMgmt.actionQ, streamId, pStream->name, NULL, true, STREAM_ACT_DEPLOY);
2,178✔
813

814
  sdbRelease(pMnode->pSdb, pStream);
2,178✔
815
  mndTransDrop(pTrans);
2,178✔
816

817
  return TSDB_CODE_ACTION_IN_PROGRESS;
2,178✔
818
}
819

820
static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
8,670✔
821
  SMnode     *pMnode = pReq->info.node;
8,670✔
822
  SStreamObj *pStream = NULL;
8,670✔
823
  SUserObj   *pOperUser = NULL;
8,670✔
824
  int32_t     code = 0;
8,670✔
825
  int32_t     notExistNum = 0;
8,670✔
826

827
  SMDropStreamReq dropReq = {0};
8,670✔
828
  int64_t         tss = taosGetTimestampMs();
8,670✔
829
  if (tDeserializeSMDropStreamReq(pReq->pCont, pReq->contLen, &dropReq) < 0) {
8,670✔
UNCOV
830
    mError("invalid drop stream msg recv, discarded");
×
UNCOV
831
    code = TSDB_CODE_INVALID_MSG;
×
832
    TAOS_RETURN(code);
×
833
  }
834

835
  mDebug("recv drop stream msg, count:%d", dropReq.count);
8,670✔
836

837
  // Acquire user object for privilege check
838
  code = mndAcquireUser(pMnode, RPC_MSG_USER(pReq), &pOperUser);
8,670✔
839
  if (code != 0) {
8,670✔
840
    tFreeMDropStreamReq(&dropReq);
×
841
    TAOS_RETURN(code);
×
842
  }
843

844
  // check if all streams exist
845
  if (!dropReq.igNotExists) {
8,670✔
846
    for (int32_t i = 0; i < dropReq.count; i++) {
14,120✔
847
      if (!sdbCheckExists(pMnode->pSdb, SDB_STREAM, dropReq.name[i])) {
7,685✔
848
        mError("stream:%s not exist failed to drop it", dropReq.name[i]);
1,000✔
849
        mndReleaseUser(pMnode, pOperUser);
1,000✔
850
        tFreeMDropStreamReq(&dropReq);
1,000✔
851
        TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
1,000✔
852
      }
853
    }
854
  }
855

856
  // Create a single transaction for all stream drops
857
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq, MND_STREAM_DROP_NAME);
7,670✔
858
  if (pTrans == NULL) {
7,670✔
UNCOV
859
    mError("failed to create drop stream transaction since %s", tstrerror(terrno));
×
UNCOV
860
    code = terrno;
×
UNCOV
861
    mndReleaseUser(pMnode, pOperUser);
×
UNCOV
862
    tFreeMDropStreamReq(&dropReq);
×
UNCOV
863
    TAOS_RETURN(code);
×
864
  }
865
  pTrans->ableToBeKilled = true;
7,670✔
866

867
  // Process all streams and add them to the transaction
868
  for (int32_t i = 0; i < dropReq.count; i++) {
16,840✔
869
    char *streamName = dropReq.name[i];
9,170✔
870
    mDebug("drop stream[%d/%d]: %s", i + 1, dropReq.count, streamName);
9,170✔
871

872
    code = mndAcquireStream(pMnode, streamName, &pStream);
9,170✔
873
    if (pStream == NULL || code != 0) {
9,170✔
874
      mWarn("stream:%s not exist, ignore not exist is set, drop stream exec done with success", streamName);
1,000✔
875
      sdbRelease(pMnode->pSdb, pStream);
1,000✔
876
      pStream = NULL;
1,000✔
877
      notExistNum++;
1,000✔
878
      continue;
1,000✔
879
    }
880

881
    int64_t streamId = pStream->pCreate->streamId;
8,170✔
882

883
    if ((code = mndCheckDbPrivilegeByName(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_USE_DB,
8,170✔
884
                                          pStream->pCreate->streamDB, true))) {
8,170✔
UNCOV
885
      if (code == TSDB_CODE_MND_NO_RIGHTS) code = TSDB_CODE_PAR_DB_USE_PERMISSION_DENIED;
×
886
    }
887
    if ((code != TSDB_CODE_SUCCESS) ||
16,340✔
888
        (code = mndCheckObjPrivilegeRecF(pMnode, pOperUser, PRIV_CM_DROP, PRIV_OBJ_STREAM, pStream->ownerId,
8,170✔
889
                                         pStream->pCreate->streamDB, mndGetStableStr(pStream->pCreate->name)))) {
8,170✔
890
      mstsError("user %s failed to drop stream %s since %s", pReq->info.conn.user, streamName, tstrerror(code));
×
891
      sdbRelease(pMnode->pSdb, pStream);
×
892
      pStream = NULL;
×
893
      mndTransDrop(pTrans);
×
894
      pTrans = NULL;
×
895
      goto _OVER;
×
896
    }
897

898
    if (pStream->pCreate->tsmaId != 0) {
8,170✔
899
      mstsDebug("try to drop tsma related stream, tsmaId:%" PRIx64, pStream->pCreate->tsmaId);
1,962✔
900

901
      void    *pIter = NULL;
1,962✔
902
      SSmaObj *pSma = NULL;
1,962✔
903
      pIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter, (void **)&pSma);
1,962✔
904
      while (pIter) {
3,815✔
905
        if (pSma && pSma->uid == pStream->pCreate->tsmaId) {
1,853✔
UNCOV
906
          sdbRelease(pMnode->pSdb, pSma);
×
UNCOV
907
          sdbRelease(pMnode->pSdb, pStream);
×
908
          pStream = NULL;
×
909

UNCOV
910
          sdbCancelFetch(pMnode->pSdb, pIter);
×
UNCOV
911
          code = TSDB_CODE_TSMA_MUST_BE_DROPPED;
×
912

UNCOV
913
          mstsError("refused to drop tsma-related stream %s since tsma still exists", streamName);
×
UNCOV
914
          mndTransDrop(pTrans);
×
UNCOV
915
          pTrans = NULL;
×
UNCOV
916
          goto _OVER;
×
917
        }
918

919
        if (pSma) {
1,853✔
920
          sdbRelease(pMnode->pSdb, pSma);
1,853✔
921
        }
922

923
        pIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter, (void **)&pSma);
1,853✔
924
      }
925
    }
926

927
    mstsInfo("start to drop stream %s", pStream->pCreate->name);
8,170✔
928

929
    pStream->updateTime = taosGetTimestampMs();
16,340✔
930

931
    atomic_store_8(&pStream->userDropped, 1);
8,170✔
932

933
    MND_STREAM_SET_LAST_TS(STM_EVENT_DROP_STREAM, pStream->updateTime);
8,170✔
934

935
    msmUndeployStream(pMnode, streamId, pStream->pCreate->name);
8,170✔
936

937
    // Append drop stream operation to the transaction
938
    code = mndStreamTransAppend(pStream, pTrans, SDB_STATUS_DROPPED);
8,170✔
939
    if (code) {
8,170✔
UNCOV
940
      mstsError("trans:%d, failed to append drop stream %s trans since %s", pTrans->id, streamName, tstrerror(code));
×
UNCOV
941
      sdbRelease(pMnode->pSdb, pStream);
×
UNCOV
942
      pStream = NULL;
×
943
      // mndStreamTransAppend already called mndTransDrop on failure, set pTrans to NULL to avoid double free
UNCOV
944
      pTrans = NULL;
×
UNCOV
945
      goto _OVER;
×
946
    }
947

948
    sdbRelease(pMnode->pSdb, pStream);
8,170✔
949
    pStream = NULL;
8,170✔
950

951
    mstsDebug("drop stream %s added to transaction", streamName);
8,170✔
952
  }
953

954
  // Prepare and execute the transaction for all streams
955
  if (notExistNum < dropReq.count) {
7,670✔
956
    code = mndTransPrepare(pMnode, pTrans);
7,420✔
957
    if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
7,420✔
UNCOV
958
      mError("trans:%d, failed to prepare drop stream trans since %s", pTrans->id, tstrerror(code));
×
UNCOV
959
      mndTransDrop(pTrans);
×
UNCOV
960
      goto _OVER;
×
961
    }
962
    mInfo("trans:%d, drop stream transaction prepared for %d streams", pTrans->id, dropReq.count - notExistNum);
7,420✔
963
  } else {
964
    // All streams don't exist, no need to prepare transaction
965
    mndTransDrop(pTrans);
250✔
966
    pTrans = NULL;
250✔
967
  }
968

969
  if (tsAuditLevel >= AUDIT_LEVEL_DATABASE && notExistNum < dropReq.count) {
7,670✔
970
    int64_t tse = taosGetTimestampMs();
7,420✔
971
    double  duration = (double)(tse - tss);
7,420✔
972
    duration = duration / 1000;
7,420✔
973
    // Use first stream's database for audit (assuming all streams are from same db in batch)
974
    if (dropReq.count > 0) {
7,420✔
975
      SStreamObj *pFirstStream = NULL;
7,420✔
976
      if (mndAcquireStream(pMnode, dropReq.name[0], &pFirstStream) == 0 && pFirstStream != NULL) {
7,420✔
UNCOV
977
        auditRecord(pReq, pMnode->clusterId, "dropStream", "", pFirstStream->pCreate->streamDB, NULL, 0, duration, 0);
×
978
        sdbRelease(pMnode->pSdb, pFirstStream);
×
979
      }
980
    }
981
  }
982

983
  // If any stream was successfully added to transaction, return ACTION_IN_PROGRESS
984
  // Otherwise, all streams don't exist (and igNotExists is set), return SUCCESS
985
  code = (notExistNum < dropReq.count) ? TSDB_CODE_ACTION_IN_PROGRESS : TSDB_CODE_SUCCESS;
7,670✔
986

987
_OVER:
7,670✔
988
  if (pStream) {
7,670✔
UNCOV
989
    sdbRelease(pMnode->pSdb, pStream);
×
990
  }
991
  if (pTrans) {
7,670✔
992
    mndTransDrop(pTrans);
7,420✔
993
  }
994
  mndReleaseUser(pMnode, pOperUser);
7,670✔
995
  tFreeMDropStreamReq(&dropReq);
7,670✔
996
  TAOS_RETURN(code);
7,670✔
997
}
998

999
static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
173,070✔
1000
  SMnode     *pMnode = pReq->info.node;
173,070✔
1001
  SStreamObj *pStream = NULL;
173,070✔
1002
  SStreamObj  streamObj = {0};
173,070✔
1003
  SUserObj    *pOperUser = NULL;
173,070✔
1004
  int32_t     code = TSDB_CODE_SUCCESS;
173,070✔
1005
  int32_t     lino = 0;
173,070✔
1006
  STrans     *pTrans = NULL;
173,070✔
1007
  uint64_t    streamId = 0;
173,070✔
1008
  SCMCreateStreamReq* pCreate = NULL;
173,070✔
1009
  int64_t             tss = taosGetTimestampMs();
173,070✔
1010

1011
  if ((code = grantCheck(TSDB_GRANT_STREAMS)) < 0) {
173,070✔
UNCOV
1012
    goto _OVER;
×
1013
  }
1014
  
1015
#ifdef WINDOWS
1016
  code = TSDB_CODE_MND_INVALID_PLATFORM;
1017
  goto _OVER;
1018
#endif
1019

1020
  pCreate = taosMemoryCalloc(1, sizeof(SCMCreateStreamReq));
173,070✔
1021
  TSDB_CHECK_NULL(pCreate, code, lino, _OVER, terrno);
173,070✔
1022
  
1023
  code = tDeserializeSCMCreateStreamReq(pReq->pCont, pReq->contLen, pCreate);
173,070✔
1024
  TSDB_CHECK_CODE(code, lino, _OVER);
173,070✔
1025

1026
  streamId = pCreate->streamId;
173,070✔
1027

1028
  mstsInfo("start to create stream %s, sql:%s", pCreate->name, pCreate->sql);
173,070✔
1029

1030
  int32_t snodeId = msmAssignRandomSnodeId(pMnode, streamId);
173,070✔
1031
  if (!GOT_SNODE(snodeId)) {
173,070✔
1032
    code = terrno;
2,530✔
1033
    TSDB_CHECK_CODE(code, lino, _OVER);
2,530✔
1034
  }
1035
  
1036
  code = mndAcquireStream(pMnode, pCreate->name, &pStream);
170,540✔
1037
  if (pStream != NULL && code == 0) {
170,540✔
1038
    if (pCreate->igExists) {
4,940✔
UNCOV
1039
      mstsInfo("stream %s already exist, ignore exist is set", pCreate->name);
×
1040
    } else {
1041
      code = TSDB_CODE_MND_STREAM_ALREADY_EXIST;
4,940✔
1042
    }
1043

1044
    mndReleaseStream(pMnode, pStream);
4,940✔
1045
    goto _OVER;
4,940✔
1046
  } else if (code != TSDB_CODE_MND_STREAM_NOT_EXIST) {
165,600✔
UNCOV
1047
    goto _OVER;
×
1048
  }
1049

1050
  code = mndAcquireUser(pMnode, RPC_MSG_USER(pReq), &pOperUser);
165,600✔
1051
  if (pOperUser == NULL) {
165,600✔
UNCOV
1052
    TSDB_CHECK_CODE(TSDB_CODE_MND_NO_USER_FROM_CONN, lino, _OVER);
×
1053
  }
1054

1055
  code = mndStreamValidateCreate(pMnode, pReq, pCreate);
165,600✔
1056
  TSDB_CHECK_CODE(code, lino, _OVER);
165,600✔
1057

1058
  mndStreamBuildObj(pMnode, &streamObj, pCreate, pOperUser, snodeId);
165,600✔
1059
  pCreate = NULL;
165,600✔
1060

1061
  pStream = &streamObj;
165,600✔
1062

1063
  code = mndStreamCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_DB, MND_STREAM_CREATE_NAME, &pTrans);
165,600✔
1064
  if (pTrans == NULL || code) {
165,600✔
UNCOV
1065
    goto _OVER;
×
1066
  }
1067

1068
  // create stb for stream
1069
  if (TSDB_SUPER_TABLE == pStream->pCreate->outTblType && !pStream->pCreate->outStbExists) {
165,600✔
1070
    pStream->pCreate->outStbUid = mndGenerateUid(pStream->pCreate->outTblName, strlen(pStream->pCreate->outTblName));
62,144✔
1071
    code = mndStreamCreateOutStb(pMnode, pTrans, pStream->pCreate, RPC_MSG_USER(pReq));
62,144✔
1072
    TSDB_CHECK_CODE(code, lino, _OVER);
62,144✔
1073
  }
1074

1075
  // add stream to trans
1076
  code = mndStreamTransAppend(pStream, pTrans, SDB_STATUS_READY);
165,600✔
1077
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
165,600✔
UNCOV
1078
    mstsError("failed to persist stream %s since %s", pStream->pCreate->name, tstrerror(code));
×
UNCOV
1079
    goto _OVER;
×
1080
  }
1081

1082
  // execute creation
1083
  code = mndTransPrepare(pMnode, pTrans);
165,600✔
1084
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
165,600✔
UNCOV
1085
    mstsError("trans:%d, failed to prepare since %s", pTrans->id, tstrerror(code));
×
UNCOV
1086
    goto _OVER;
×
1087
  }
1088
  code = TSDB_CODE_ACTION_IN_PROGRESS;
165,600✔
1089

1090
  if (tsAuditLevel >= AUDIT_LEVEL_DATABASE) {
165,600✔
1091
    int64_t tse = taosGetTimestampMs();
165,600✔
1092
    double  duration = (double)(tse - tss);
165,600✔
1093
    duration = duration / 1000;
165,600✔
1094
    auditRecord(pReq, pMnode->clusterId, "createStream", pStream->pCreate->streamDB, pStream->pCreate->name,
165,600✔
1095
                pStream->pCreate->sql, strlen(pStream->pCreate->sql), duration, 0);
165,600✔
1096
  }
1097

1098
  MND_STREAM_SET_LAST_TS(STM_EVENT_CREATE_STREAM, taosGetTimestampMs());
304,768✔
1099

1100
  mstPostStreamAction(mStreamMgmt.actionQ, streamId, pStream->pCreate->name, NULL, true, STREAM_ACT_DEPLOY);
165,600✔
1101

1102
_OVER:
173,070✔
1103

1104
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
173,070✔
1105
    if (pStream && pStream->pCreate) {
7,470✔
1106
      mstsError("failed to create stream %s at line:%d since %s", pStream->pCreate->name, lino, tstrerror(code));
4,940✔
1107
    } else {
1108
      mstsError("failed to create stream at line:%d since %s", lino, tstrerror(code));
2,530✔
1109
    }
1110
  } else {
1111
    mstsDebug("create stream %s half completed", pStream->pCreate ? pStream->pCreate->name : "unknown");
165,600✔
1112
  }
1113

1114
  tFreeSCMCreateStreamReq(pCreate);
173,070✔
1115
  taosMemoryFreeClear(pCreate);
173,070✔
1116

1117
  mndTransDrop(pTrans);
173,070✔
1118
  tFreeStreamObj(&streamObj);
173,070✔
1119
  mndReleaseUser(pMnode, pOperUser);
173,070✔
1120

1121
  return code;
173,070✔
1122
}
1123

1124
static int32_t mndProcessRecalcStreamReq(SRpcMsg *pReq) {
11,188✔
1125
  SMnode     *pMnode = pReq->info.node;
11,188✔
1126
  SStreamObj *pStream = NULL;
11,188✔
1127
  SUserObj   *pOperUser = NULL;
11,188✔
1128
  int32_t     code = 0;
11,188✔
1129
  int64_t     tss = taosGetTimestampMs();
11,188✔
1130

1131
  if ((code = grantCheckExpire(TSDB_GRANT_STREAMS)) < 0) {
11,188✔
1132
    return code;
×
1133
  }
1134

1135
  SMRecalcStreamReq recalcReq = {0};
11,188✔
1136
  if (tDeserializeSMRecalcStreamReq(pReq->pCont, pReq->contLen, &recalcReq) < 0) {
11,188✔
UNCOV
1137
    tFreeMRecalcStreamReq(&recalcReq);
×
UNCOV
1138
    TAOS_RETURN(TSDB_CODE_INVALID_MSG);
×
1139
  }
1140

1141
  code = mndAcquireStream(pMnode, recalcReq.name, &pStream);
11,188✔
1142
  if (pStream == NULL || code != 0) {
11,188✔
UNCOV
1143
    mError("stream:%s not exist, failed to recalc stream", recalcReq.name);
×
UNCOV
1144
    tFreeMRecalcStreamReq(&recalcReq);
×
UNCOV
1145
    TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
×
1146
  }
1147

1148
  int64_t streamId = pStream->pCreate->streamId;
11,188✔
1149
  
1150
  mstsInfo("start to recalc stream %s", recalcReq.name);
11,188✔
1151

1152
  // code = mndCheckDbPrivilegeByName(pMnode, RPC_MSG_USER(pReq), MND_OPER_WRITE_DB, pStream->pCreate->streamDB);
1153
  // if (code != TSDB_CODE_SUCCESS) {
1154
  //   mstsError("user %s failed to recalc stream %s since %s", RPC_MSG_USER(pReq), recalcReq.name, tstrerror(code));
1155
  //   sdbRelease(pMnode->pSdb, pStream);
1156
  //   tFreeMRecalcStreamReq(&recalcReq);
1157
  //   return code;
1158
  // }
1159

1160
  if ((code = mndAcquireUser(pMnode, RPC_MSG_USER(pReq), &pOperUser))) {
11,188✔
UNCOV
1161
    mstsError("user %s failed to recalc stream %s since %s", RPC_MSG_USER(pReq), pStream->name, tstrerror(code));
×
UNCOV
1162
    sdbRelease(pMnode->pSdb, pStream);
×
UNCOV
1163
    tFreeMRecalcStreamReq(&recalcReq);
×
UNCOV
1164
    TAOS_RETURN(code);
×
1165
  }
1166

1167
  if ((code = mndCheckDbPrivilegeByName(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_USE_DB,
11,188✔
1168
                                        pStream->pCreate->streamDB, false))) {
11,188✔
1169
    if (code == TSDB_CODE_MND_NO_RIGHTS) code = TSDB_CODE_PAR_DB_USE_PERMISSION_DENIED;
2,560✔
1170
  }
1171
  if ((code != TSDB_CODE_SUCCESS) ||
19,816✔
1172
      (code = mndCheckObjPrivilegeRecF(pMnode, pOperUser, PRIV_CM_RECALC, PRIV_OBJ_STREAM, pStream->ownerId,
8,628✔
1173
                                       pStream->pCreate->streamDB, mndGetStableStr(pStream->pCreate->name)))) {
8,628✔
1174
    mstsError("user %s failed to recalc stream %s since %s", RPC_MSG_USER(pReq), pStream->name, tstrerror(code));
7,680✔
1175
    mndReleaseUser(pMnode, pOperUser);
7,680✔
1176
    sdbRelease(pMnode->pSdb, pStream);
7,680✔
1177
    tFreeMRecalcStreamReq(&recalcReq);
7,680✔
1178
    return code;
7,680✔
1179
  }
1180

1181
  mndReleaseUser(pMnode, pOperUser); // release user after privilege check
3,508✔
1182

1183
  if (atomic_load_8(&pStream->userDropped)) {
3,508✔
UNCOV
1184
    code = TSDB_CODE_MND_STREAM_DROPPING;
×
UNCOV
1185
    mstsError("user %s failed to recalc stream %s since %s", RPC_MSG_USER(pReq), recalcReq.name, tstrerror(code));
×
UNCOV
1186
    sdbRelease(pMnode->pSdb, pStream);
×
UNCOV
1187
    tFreeMRecalcStreamReq(&recalcReq);
×
UNCOV
1188
    return code;
×
1189
  }
1190

1191
  if (atomic_load_8(&pStream->userStopped)) {
3,508✔
UNCOV
1192
    code = TSDB_CODE_MND_STREAM_STOPPED;
×
UNCOV
1193
    mstsError("user %s failed to recalc stream %s since %s", RPC_MSG_USER(pReq), recalcReq.name, tstrerror(code));
×
UNCOV
1194
    sdbRelease(pMnode->pSdb, pStream);
×
UNCOV
1195
    tFreeMRecalcStreamReq(&recalcReq);
×
UNCOV
1196
    return code;
×
1197
  }
1198

1199
  if (WINDOW_TYPE_PERIOD == pStream->pCreate->triggerType) {
3,508✔
1200
    code = TSDB_CODE_OPS_NOT_SUPPORT;
192✔
1201
    mstsError("failed to recalc stream %s since %s", recalcReq.name, tstrerror(code));
192✔
1202
    sdbRelease(pMnode->pSdb, pStream);
192✔
1203
    tFreeMRecalcStreamReq(&recalcReq);
192✔
1204
    return code;
192✔
1205
  }
1206

1207
  /*
1208
  pStream->updateTime = taosGetTimestampMs();
1209

1210
  STrans *pTrans = NULL;
1211
  code = mndStreamCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_RECALC_NAME, &pTrans);
1212
  if (pTrans == NULL || code) {
1213
    mstsError("failed to recalc stream %s since %s", recalcReq.name, tstrerror(code));
1214
    sdbRelease(pMnode->pSdb, pStream);
1215
    return code;
1216
  }
1217

1218
  // stop stream
1219
  code = mndStreamTransAppend(pStream, pTrans, SDB_STATUS_READY);
1220
  if (code != TSDB_CODE_SUCCESS) {
1221
    sdbRelease(pMnode->pSdb, pStream);
1222
    mndTransDrop(pTrans);
1223
    return code;
1224
  }
1225

1226
  code = mndTransPrepare(pMnode, pTrans);
1227
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
1228
    mError("trans:%d, failed to prepare stop stream trans since %s", pTrans->id, tstrerror(code));
1229
    sdbRelease(pMnode->pSdb, pStream);
1230
    mndTransDrop(pTrans);
1231
    return code;
1232
  }
1233
*/
1234

1235
  code = msmRecalcStream(pMnode, pStream->pCreate->streamId, &recalcReq.timeRange);
3,316✔
1236
  if (code != TSDB_CODE_SUCCESS) {
3,316✔
UNCOV
1237
    sdbRelease(pMnode->pSdb, pStream);
×
UNCOV
1238
    tFreeMRecalcStreamReq(&recalcReq);
×
UNCOV
1239
    return code;
×
1240
  }
1241

1242
  if (tsAuditLevel >= AUDIT_LEVEL_DATABASE){
3,316✔
1243
    char buf[128];
3,316✔
1244
    snprintf(buf, sizeof(buf), "start:%" PRId64 ", end:%" PRId64, recalcReq.timeRange.skey, recalcReq.timeRange.ekey);
3,316✔
1245
    int64_t tse = taosGetTimestampMs();
3,316✔
1246
    double  duration = (double)(tse - tss);
3,316✔
1247
    duration = duration / 1000;
3,316✔
1248
    auditRecord(pReq, pMnode->clusterId, "recalcStream", pStream->name, recalcReq.name, buf, strlen(buf), duration, 0);
3,316✔
1249
  }  
1250

1251
  sdbRelease(pMnode->pSdb, pStream);
3,316✔
1252
  tFreeMRecalcStreamReq(&recalcReq);
3,316✔
1253
//  mndTransDrop(pTrans);
1254

1255
  return TSDB_CODE_SUCCESS;
3,316✔
1256
}
1257

1258

1259
int32_t mndInitStream(SMnode *pMnode) {
418,540✔
1260
  SSdbTable table = {
418,540✔
1261
      .sdbType = SDB_STREAM,
1262
      .keyType = SDB_KEY_BINARY,
1263
      .encodeFp = (SdbEncodeFp)mndStreamActionEncode,
1264
      .decodeFp = (SdbDecodeFp)mndStreamActionDecode,
1265
      .insertFp = (SdbInsertFp)mndStreamActionInsert,
1266
      .updateFp = (SdbUpdateFp)mndStreamActionUpdate,
1267
      .deleteFp = (SdbDeleteFp)mndStreamActionDelete,
1268
  };
1269

1270
  if (!tsDisableStream) {
418,540✔
1271
    mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STREAM, mndProcessCreateStreamReq);
418,540✔
1272
    mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM, mndProcessDropStreamReq);
418,540✔
1273
    mndSetMsgHandle(pMnode, TDMT_MND_START_STREAM, mndProcessStartStreamReq);
418,540✔
1274
    mndSetMsgHandle(pMnode, TDMT_MND_STOP_STREAM, mndProcessStopStreamReq);
418,540✔
1275
    mndSetMsgHandle(pMnode, TDMT_MND_STREAM_HEARTBEAT, mndProcessStreamHb);  
418,540✔
1276
    mndSetMsgHandle(pMnode, TDMT_MND_RECALC_STREAM, mndProcessRecalcStreamReq);
418,540✔
1277
  }
1278
  
1279
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndRetrieveStream);
418,540✔
1280
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndCancelGetNextStream);
418,540✔
1281
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndRetrieveStreamTask);
418,540✔
1282
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndCancelGetNextStreamTask);
418,540✔
1283
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAM_RECALCULATES, mndRetrieveStreamRecalculates);
418,540✔
1284
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAM_RECALCULATES, mndCancelGetNextStreamRecalculates);
418,540✔
1285

1286
  int32_t code = sdbSetTable(pMnode->pSdb, table);
418,540✔
1287
  if (code) {
418,540✔
UNCOV
1288
    return code;
×
1289
  }
1290

1291
  //code = sdbSetTable(pMnode->pSdb, tableSeq);
1292
  return code;
418,540✔
1293
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc