• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4908

30 Dec 2025 10:52AM UTC coverage: 65.386% (-0.2%) from 65.541%
#4908

push

travis-ci

web-flow
enh: drop multi-stream (#33962)

60 of 106 new or added lines in 4 files covered. (56.6%)

1330 existing lines in 113 files now uncovered.

193461 of 295877 relevant lines covered (65.39%)

115765274.47 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

69.0
/source/dnode/mnode/impl/src/mndStream.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "mndStream.h"
17
#include "audit.h"
18
#include "mndDb.h"
19
#include "mndPrivilege.h"
20
#include "mndShow.h"
21
#include "mndStb.h"
22
#include "mndTrans.h"
23
#include "mndUser.h"
24
#include "osMemory.h"
25
#include "parser.h"
26
#include "taoserror.h"
27
#include "tmisce.h"
28
#include "tname.h"
29

30
#define MND_STREAM_MAX_NUM 100000
31

32
typedef struct {
33
  int8_t placeHolder;  // // to fix windows compile error, define place holder
34
} SMStreamNodeCheckMsg;
35

36
static int32_t  mndNodeCheckSentinel = 0;
37
SStmRuntime  mStreamMgmt = {0};
38

39
static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream);
40
static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream);
41
static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pOldStream, SStreamObj *pNewStream);
42
static int32_t mndProcessDropStreamReq(SRpcMsg *pReq);
43

44
static int32_t mndProcessCreateStreamReqFromMNode(SRpcMsg *pReq);
45
static int32_t mndProcessDropStreamReqFromMNode(SRpcMsg *pReq);
46

47
static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
48
static void    mndCancelGetNextStream(SMnode *pMnode, void *pIter);
49
static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
50
static void    mndCancelGetNextStreamTask(SMnode *pMnode, void *pIter);
51
static int32_t mndProcessStopStreamReq(SRpcMsg *pReq);
52
static int32_t mndProcessStartStreamReq(SRpcMsg *pReq);
53

54
static SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw);
55

56
SSdbRaw       *mndStreamSeqActionEncode(SStreamObj *pStream);
57
SSdbRow       *mndStreamSeqActionDecode(SSdbRaw *pRaw);
58
static int32_t mndStreamSeqActionInsert(SSdb *pSdb, SStreamSeq *pStream);
59
static int32_t mndStreamSeqActionDelete(SSdb *pSdb, SStreamSeq *pStream);
60
static int32_t mndStreamSeqActionUpdate(SSdb *pSdb, SStreamSeq *pOldStream, SStreamSeq *pNewStream);
61
static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq);
62

63
/**
 * Stream cleanup hook for the mnode: delegates to msmHandleBecomeNotLeader()
 * and logs before and after the call.
 */
void mndCleanupStream(SMnode *pMnode) {
  mDebug("try to clean up stream");
  msmHandleBecomeNotLeader(pMnode);
  mDebug("mnd stream runtime info cleanup");
}
397,918✔
70

71
/**
 * Decode a raw SDB record into a newly allocated stream row.
 *
 * The raw payload is a 32-bit length followed by that many bytes of an encoded
 * SStreamObj, which is handed to tDecodeSStreamObj() together with the record's
 * soft version.
 *
 * @param pRaw  raw SDB record to decode.
 * @return the allocated row on success (terrno is set to 0); NULL on failure,
 *         with terrno set to the error code.
 */
SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw) {
  int32_t     code = 0;
  int32_t     lino = 0;
  SSdbRow    *pRow = NULL;
  SStreamObj *pStream = NULL;
  void       *buf = NULL;
  int8_t      sver = 0;
  int32_t     tlen = 0;
  int32_t     dataPos = 0;
  bool        decoded = false;  // true only once tDecodeSStreamObj() has run

  code = sdbGetRawSoftVer(pRaw, &sver);
  TSDB_CHECK_CODE(code, lino, _over);

  if (sver != MND_STREAM_VER_NUMBER && sver != MND_STREAM_COMPATIBLE_VER_NUMBER) {
    mError("stream read invalid ver, data ver: %d, curr ver: %d", sver, MND_STREAM_VER_NUMBER);
    // Record the failure explicitly: leaving code at 0 would send this path into
    // the success branch below with pStream == NULL.
    code = TSDB_CODE_SDB_INVALID_DATA_VER;
    lino = __LINE__;
    goto _over;
  }

  pRow = sdbAllocRow(sizeof(SStreamObj));
  TSDB_CHECK_NULL(pRow, code, lino, _over, terrno);

  pStream = sdbGetRowObj(pRow);
  TSDB_CHECK_NULL(pStream, code, lino, _over, terrno);

  SDB_GET_INT32(pRaw, dataPos, &tlen, _over);

  buf = taosMemoryMalloc(tlen + 1);
  TSDB_CHECK_NULL(buf, code, lino, _over, terrno);

  SDB_GET_BINARY(pRaw, dataPos, buf, tlen, _over);

  SDecoder decoder;
  tDecoderInit(&decoder, buf, tlen + 1);
  code = tDecodeSStreamObj(&decoder, pStream, sver);
  tDecoderClear(&decoder);
  decoded = true;

  if (code < 0) {
    tFreeStreamObj(pStream);
  }

_over:
  taosMemoryFreeClear(buf);

  // NOTE(review): it is not visible here whether the SDB_GET_* macros assign
  // `code` before jumping. If they do not, an early exit would arrive with
  // code == 0; treat any exit taken before decoding finished as a failure.
  if (code == TSDB_CODE_SUCCESS && !decoded) {
    code = (terrno != 0) ? terrno : TSDB_CODE_SDB_INVALID_DATA_LEN;
  }

  if (code != TSDB_CODE_SUCCESS) {
    char *p = (pStream == NULL || NULL == pStream->pCreate) ? "null" : pStream->pCreate->name;
    mError("stream:%s, failed to decode from raw:%p since %s at:%d", p, pRaw, tstrerror(code), lino);
    taosMemoryFreeClear(pRow);

    terrno = code;
    return NULL;
  } else {
    mTrace("stream:%s, decode from raw:%p, row:%p", pStream->pCreate->name, pRaw, pStream);

    terrno = 0;
    return pRow;
  }
}
128

129
/**
 * SDB insert callback for stream rows. Only emits a trace line; always returns 0.
 */
static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream) {
  mTrace("stream:%s, perform insert action", pStream->pCreate->name);

  return 0;
}
133

134
/**
 * SDB delete callback for stream rows: logs the deletion and passes the object
 * to tFreeStreamObj(). Always returns 0.
 */
static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream) {
  mInfo("stream:%s, perform delete action", pStream->pCreate->name);

  tFreeStreamObj(pStream);

  return 0;
}
139

140
/**
 * SDB update callback for stream rows.
 *
 * Copies mainSnodeId, userStopped, ownerId and updateTime from the new object
 * into the existing one; no other field of the old object is touched here.
 * Always returns 0.
 */
static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pOldStream, SStreamObj *pNewStream) {
  mTrace("stream:%s, perform update action", pOldStream->pCreate->name);

  // The two fields below are written with atomic stores.
  atomic_store_32(&pOldStream->mainSnodeId, pNewStream->mainSnodeId);
  atomic_store_8(&pOldStream->userStopped, atomic_load_8(&pNewStream->userStopped));

  // Plain assignments for the remaining fields.
  pOldStream->ownerId = pNewStream->ownerId;
  pOldStream->updateTime = pNewStream->updateTime;

  return 0;
}
150

151
/**
 * Acquire a stream object from the SDB by name.
 *
 * @param pMnode      mnode whose SDB is searched.
 * @param streamName  lookup key passed to sdbAcquire().
 * @param pStream     out: the acquired object, or NULL when sdbAcquire() fails.
 * @return TSDB_CODE_MND_STREAM_NOT_EXIST when the lookup fails with
 *         TSDB_CODE_SDB_OBJ_NOT_THERE; 0 in every other case, including other
 *         lookup failures, so callers must also check *pStream.
 */
int32_t mndAcquireStream(SMnode *pMnode, char *streamName, SStreamObj **pStream) {
  *pStream = sdbAcquire(pMnode->pSdb, SDB_STREAM, streamName);
  if (*pStream != NULL) {
    return 0;
  }

  return (terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) ? TSDB_CODE_MND_STREAM_NOT_EXIST : 0;
}
160

161
static bool mndStreamGetNameFromId(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
×
162
  SStreamObj* pStream = pObj;
×
163

164
  if (pStream->pCreate->streamId == *(int64_t*)p1) {
×
165
    strncpy((char*)p2, pStream->name, TSDB_STREAM_NAME_LEN);
×
166
    return false;
×
167
  }
168

169
  return true;
×
170
}
171

172
int32_t mndAcquireStreamById(SMnode *pMnode, int64_t streamId, SStreamObj **pStream) {
×
173
  int32_t code = 0;
×
174
  SSdb   *pSdb = pMnode->pSdb;
×
175
  char streamName[TSDB_STREAM_NAME_LEN];
×
176
  streamName[0] = 0;
×
177
  
178
  sdbTraverse(pSdb, SDB_STREAM, mndStreamGetNameFromId, &streamId, streamName, NULL);
×
179
  if (streamName[0]) {
×
180
    (*pStream) = sdbAcquire(pSdb, SDB_STREAM, streamName);
×
181
    if ((*pStream) == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
×
182
      code = TSDB_CODE_MND_STREAM_NOT_EXIST;
×
183
    }
184
  }
185
  
186
  return code;
×
187
}
188

189
/**
 * Release a stream object by passing it to sdbRelease() on the mnode's SDB.
 */
void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream) {
  sdbRelease(pMnode->pSdb, pStream);
}
147,189✔
193

194
SSdbRaw *mndStreamSeqActionEncode(SStreamObj *pStream) { return NULL; }
×
195
SSdbRow *mndStreamSeqActionDecode(SSdbRaw *pRaw) { return NULL; }
×
196
int32_t  mndStreamSeqActionInsert(SSdb *pSdb, SStreamSeq *pStream) { return 0; }
×
197
int32_t  mndStreamSeqActionDelete(SSdb *pSdb, SStreamSeq *pStream) { return 0; }
×
198
int32_t  mndStreamSeqActionUpdate(SSdb *pSdb, SStreamSeq *pOldStream, SStreamSeq *pNewStream) { return 0; }
×
199

200
static void mndStreamBuildObj(SMnode *pMnode, SStreamObj *pObj, SCMCreateStreamReq *pCreate, SUserObj *pOperUser,
138,176✔
201
                              int32_t snodeId) {
202
  int32_t code = 0;
138,176✔
203

204
  pObj->pCreate = pCreate;
138,176✔
205
  strncpy(pObj->name, pCreate->name, TSDB_STREAM_FNAME_LEN);
138,176✔
206
  (void)snprintf(pObj->createUser, sizeof(pObj->createUser), "%s", pOperUser->name);
138,176✔
207
  pObj->ownerId = pOperUser->uid;
138,176✔
208
  pObj->mainSnodeId = snodeId;
138,176✔
209

210
  pObj->userDropped = 0;
138,176✔
211
  pObj->userStopped = 0;
138,176✔
212

213
  pObj->createTime = taosGetTimestampMs();
138,176✔
214
  pObj->updateTime = pObj->createTime;
138,176✔
215

216
  mstLogSStreamObj("create stream", pObj);
138,176✔
217
}
138,176✔
218

219
/**
 * Build the stream's output super table and append its creation to a transaction.
 *
 * The table name is "<outDB>.<outTblName>". Columns are copied from
 * pStream->outCols; tags are copied from pStream->outTags, or a single UBIGINT
 * "group_id" tag is used when outTags is NULL.
 *
 * @param pMnode   mnode context.
 * @param pTrans   transaction that receives the create-stable actions.
 * @param pStream  stream create request describing the output table.
 * @param user     unused here.
 * @return 0 on success; a non-zero error code when the request is invalid, the
 *         table already exists, the database cannot be acquired, the database
 *         is in single-stable mode and already has a table, or any helper fails.
 */
static int32_t mndStreamCreateOutStb(SMnode *pMnode, STrans *pTrans, const SCMCreateStreamReq *pStream, const char *user) {
  SStbObj *pStb = NULL;
  SDbObj  *pDb = NULL;
  int32_t  code = 0;
  int32_t  lino = 0;
  int32_t  numOfStbs = -1;
  SStbObj  stbObj = {0};
  bool     stbBuilt = false;  // stbObj is freed only after a successful build

  SMCreateStbReq createReq = {0};
  TAOS_STRNCAT(createReq.name, pStream->outDB, TSDB_DB_FNAME_LEN);
  TAOS_STRNCAT(createReq.name, ".", 2);
  TAOS_STRNCAT(createReq.name,  pStream->outTblName, TSDB_TABLE_NAME_LEN);
  createReq.numOfColumns = taosArrayGetSize(pStream->outCols);
  createReq.pColumns = taosArrayInit_s(sizeof(SFieldWithOptions), createReq.numOfColumns);
  TSDB_CHECK_NULL(createReq.pColumns, code, lino, _OVER, terrno);

  // build fields
  for (int32_t i = 0; i < createReq.numOfColumns; i++) {
    SFieldWithOptions *pField = taosArrayGet(createReq.pColumns, i);
    TSDB_CHECK_NULL(pField, code, lino, _OVER, terrno);
    SFieldWithOptions *pSrc = taosArrayGet(pStream->outCols, i);

    tstrncpy(pField->name, pSrc->name, TSDB_COL_NAME_LEN);
    pField->flags = pSrc->flags;
    pField->type = pSrc->type;
    pField->bytes = pSrc->bytes;
    pField->compress = createDefaultColCmprByType(pField->type);
    if (IS_DECIMAL_TYPE(pField->type)) {
      pField->typeMod = pSrc->typeMod;
      pField->flags |= COL_HAS_TYPE_MOD;
    }
  }

  if (NULL == pStream->outTags) {
    createReq.numOfTags = 1;
    createReq.pTags = taosArrayInit_s(sizeof(SField), 1);
    TSDB_CHECK_NULL(createReq.pTags, code, lino, _OVER, terrno);

    // build tags
    SField *pField = taosArrayGet(createReq.pTags, 0);
    TSDB_CHECK_NULL(pField, code, lino, _OVER, terrno);

    tstrncpy(pField->name, "group_id", sizeof(pField->name));
    pField->type = TSDB_DATA_TYPE_UBIGINT;
    pField->flags = 0;
    pField->bytes = 8;
  } else {
    createReq.numOfTags = taosArrayGetSize(pStream->outTags);
    createReq.pTags = taosArrayInit_s(sizeof(SField), createReq.numOfTags);
    TSDB_CHECK_NULL(createReq.pTags, code, lino, _OVER, terrno);

    for (int32_t i = 0; i < createReq.numOfTags; i++) {
      SField *pField = taosArrayGet(createReq.pTags, i);
      if (pField == NULL) {
        continue;
      }

      TAOS_FIELD_E *pSrc = taosArrayGet(pStream->outTags, i);
      pField->bytes = pSrc->bytes;
      pField->flags = 0;
      pField->type = pSrc->type;
      tstrncpy(pField->name, pSrc->name, TSDB_COL_NAME_LEN);
    }
  }

  if ((code = mndCheckCreateStbReq(&createReq)) != 0) {
    lino = __LINE__;
    goto _OVER;
  }

  pStb = mndAcquireStb(pMnode, createReq.name);
  if (pStb != NULL) {
    code = TSDB_CODE_MND_STB_ALREADY_EXIST;
    lino = __LINE__;
    goto _OVER;
  }

  pDb = mndAcquireDbByStb(pMnode, createReq.name);
  if (pDb == NULL) {
    code = TSDB_CODE_MND_DB_NOT_SELECTED;
    lino = __LINE__;
    goto _OVER;
  }

  // The three helper calls below must record their result in `code`: jumping
  // to _OVER with code == 0 would report success for a failed operation.
  code = mndGetNumOfStbs(pMnode, pDb->name, &numOfStbs);
  if (code != 0) {
    lino = __LINE__;
    goto _OVER;
  }

  if (pDb->cfg.numOfStables == 1 && numOfStbs != 0) {
    code = TSDB_CODE_MND_SINGLE_STB_MODE_DB;
    lino = __LINE__;
    goto _OVER;
  }

  code = mndBuildStbFromReq(pMnode, &stbObj, &createReq, pDb);
  if (code != 0) {
    lino = __LINE__;
    goto _OVER;
  }
  stbBuilt = true;

  stbObj.uid = pStream->outStbUid;

  code = mndAddStbToTrans(pMnode, pTrans, pDb, &stbObj);
  if (code != 0) {
    lino = __LINE__;
    goto _OVER;
  }

_OVER:
  if (code == 0) {
    mDebug("stream:%s create dst stable:%s, cols:%d", pStream->name, pStream->outTblName, createReq.numOfColumns);
  } else {
    mDebug("stream:%s failed to create dst stable:%s, line:%d code:%s", pStream->name, pStream->outTblName, lino,
           tstrerror(code));
  }

  // Single cleanup path shared by success and failure.
  tFreeSMCreateStbReq(&createReq);
  if (stbBuilt) {
    mndFreeStb(&stbObj);
  }
  mndReleaseStb(pMnode, pStb);
  mndReleaseDb(pMnode, pDb);

  return code;
}
339

340
/**
 * Validate a create-stream request before any state is changed.
 *
 * Checks that the operating user holds PRIV_DB_USE on every database the
 * request names (stream DB, trigger DB, each calc DB, output DB), then checks
 * the stream count against MND_STREAM_MAX_NUM.
 *
 * @param pMnode     mnode context.
 * @param pOperUser  user issuing the request.
 * @param pCreate    create request to validate.
 * @return 0 when the request may proceed; the failing privilege-check code, or
 *         TSDB_CODE_MND_TOO_MANY_STREAMS when the stream limit has been reached.
 */
static int32_t mndStreamValidateCreate(SMnode *pMnode, SUserObj* pOperUser, SCMCreateStreamReq* pCreate) {
  int32_t code = 0, lino = 0;
  // Not referenced directly below; presumably read by the msts* log macros — confirm.
  int64_t streamId = pCreate->streamId;
  char   *pUser = pOperUser->name;

  if (pCreate->streamDB) {
    code = mndCheckDbPrivilegeByNameRecF(pMnode, pOperUser, PRIV_DB_USE, PRIV_OBJ_DB, pCreate->streamDB, NULL);
    if (code) {
      mstsError("user %s failed to create stream %s in db %s since %s", pUser, pCreate->name, pCreate->streamDB,
                tstrerror(code));
    }
    TSDB_CHECK_CODE(code, lino, _OVER);
  }

  if (pCreate->triggerDB) {
    code = mndCheckDbPrivilegeByNameRecF(pMnode, pOperUser, PRIV_DB_USE, PRIV_OBJ_DB, pCreate->triggerDB, NULL);
    if (code) {
      mstsError("user %s failed to create stream %s using trigger db %s since %s", pUser, pCreate->name,
                pCreate->triggerDB, tstrerror(code));
    }
    TSDB_CHECK_CODE(code, lino, _OVER);
#if 0  // TODO check the owner of trigger table
    if (pCreate->triggerTblName) {
      // check trigger table privilege
      code = mndCheckObjPrivilegeRecF(pMnode, pUser, PRIV_TBL_SELECT, "", pCreate->triggerDB, pCreate->triggerTblName);
      if (code) {
        mstsError("user %s failed to create stream %s using trigger table %s.%s since %s", pUser, pCreate->name,
                  pCreate->triggerDB, pCreate->triggerTblName, tstrerror(code));
      }
      TSDB_CHECK_CODE(code, lino, _OVER);
    }
#endif
  }

  if (pCreate->calcDB) {
    int32_t dbNum = taosArrayGetSize(pCreate->calcDB);
    for (int32_t i = 0; i < dbNum; ++i) {
      char *calcDB = taosArrayGetP(pCreate->calcDB, i);
      code = mndCheckDbPrivilegeByNameRecF(pMnode, pOperUser, PRIV_DB_USE, PRIV_OBJ_DB, calcDB, NULL);
      if (code) {
        mstsError("user %s failed to create stream %s using calcDB %s since %s", pUser, pCreate->name, calcDB,
                  tstrerror(code));
      }
      TSDB_CHECK_CODE(code, lino, _OVER);
    }
  }

  if (pCreate->outDB) {
    code = mndCheckDbPrivilegeByNameRecF(pMnode, pOperUser, PRIV_DB_USE, PRIV_OBJ_DB, pCreate->outDB, NULL);
    if (code) {
      mstsError("user %s failed to create stream %s using out db %s since %s", pUser, pCreate->name, pCreate->outDB,
                tstrerror(code));
    }
    TSDB_CHECK_CODE(code, lino, _OVER);
  }

  // This runs before the new stream is added, so the request must be rejected
  // once the count has already reached the limit; `>` would allow
  // MND_STREAM_MAX_NUM + 1 streams.
  int32_t streamNum = sdbGetSize(pMnode->pSdb, SDB_STREAM);
  if (streamNum >= MND_STREAM_MAX_NUM) {
    code = TSDB_CODE_MND_TOO_MANY_STREAMS;
    mstsError("failed to create stream %s since %s, stream number:%d", pCreate->name, tstrerror(code), streamNum);
    return code;
  }

_OVER:

  return code;
}
414

415
/**
 * Append a drop action to pTrans for every stream whose streamDB equals the
 * name of the given database.
 *
 * For each matching stream: refresh updateTime, set userDropped, record the
 * drop event timestamp, undeploy the stream, then append the row to the
 * transaction with SDB_STATUS_DROPPED.
 *
 * @return 0 after all streams were visited; otherwise the error code from
 *         mndStreamTransAppend() (iteration stops at the first failure).
 */
int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
  SSdb   *pSdb = pMnode->pSdb;
  void   *pIter = NULL;
  int32_t code = 0;

  for (;;) {
    SStreamObj *pStream = NULL;
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
    if (pIter == NULL) {
      break;
    }

    // Streams that belong to other databases are released untouched.
    if (strcmp(pStream->pCreate->streamDB, pDb->name) != 0) {
      sdbRelease(pSdb, pStream);
      continue;
    }

    mInfo("start to drop stream %s in db %s", pStream->pCreate->name, pDb->name);

    pStream->updateTime = taosGetTimestampMs();
    atomic_store_8(&pStream->userDropped, 1);
    MND_STREAM_SET_LAST_TS(STM_EVENT_DROP_STREAM, pStream->updateTime);
    msmUndeployStream(pMnode, pStream->pCreate->streamId, pStream->pCreate->name);

    // drop stream
    code = mndStreamTransAppend(pStream, pTrans, SDB_STATUS_DROPPED);
    if (code) {
      mError("drop db trans:%d failed to append drop stream trans since %s", pTrans->id, tstrerror(code));
      sdbRelease(pSdb, pStream);
      sdbCancelFetch(pSdb, pIter);
      TAOS_RETURN(code);
    }

    sdbRelease(pSdb, pStream);
  }

  return 0;
}
451

452
/**
 * Show-retrieve callback that fills pBlock with one row per visible stream.
 *
 * A user holding PRIV_CM_SHOW on all streams of the account sees every stream;
 * otherwise each stream is checked individually and skipped when the check
 * fails. A stream whose row cannot be written by mstSetStreamAttrResBlock() is
 * skipped without failing the call.
 *
 * @param rows  maximum number of rows to produce in this call.
 * @return number of rows written, or an error code when the operating user
 *         cannot be acquired.
 */
static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode     *pMnode = pReq->info.node;
  SSdb       *pSdb = pMnode->pSdb;
  SUserObj   *pOperUser = NULL;
  SStreamObj *pStream = NULL;
  int32_t     numOfRows = 0;
  int32_t     code = 0, lino = 0;
  bool        showAll = false;

  TAOS_CHECK_EXIT(mndAcquireUser(pMnode, RPC_MSG_USER(pReq), &pOperUser));
  showAll =
      (0 == mndCheckObjPrivilegeRec(pMnode, pOperUser, PRIV_CM_SHOW, PRIV_OBJ_STREAM, 0, pOperUser->acctId, "*", "*"));

  while (numOfRows < rows) {
    pShow->pIter = sdbFetch(pSdb, SDB_STREAM, pShow->pIter, (void **)&pStream);
    if (pShow->pIter == NULL) {
      break;
    }

    // Without the blanket privilege, each stream needs its own check.
    bool visible = showAll || (0 == mndCheckObjPrivilegeRecF(pMnode, pOperUser, PRIV_CM_SHOW, PRIV_OBJ_STREAM,
                                                             pStream->ownerId, pStream->pCreate->streamDB,
                                                             pStream->pCreate->name));
    if (visible) {
      code = mstSetStreamAttrResBlock(pMnode, pStream, pBlock, numOfRows);
      if (code == 0) {
        ++numOfRows;
      }
    }

    sdbRelease(pSdb, pStream);
  }

  // Per-row failures are not propagated to the caller.
  code = 0;
  pShow->numOfRows += numOfRows;

_exit:
  mndReleaseUser(pMnode, pOperUser);
  if (code != 0) {
    mError("failed to retrieve stream list at line %d since %s", lino, tstrerror(code));
    TAOS_RETURN(code);
  }
  return numOfRows;
}
493

494
/**
 * Cancel an in-progress stream iteration by handing the iterator to
 * sdbCancelFetchByType() for SDB_STREAM.
 */
static void mndCancelGetNextStream(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_STREAM);
}
×
498

499
/**
 * Show-retrieve callback that fills pBlock with stream task rows.
 *
 * Iterates stream rows and lets mstSetStreamTasksResBlock() append rows for
 * each one, advancing numOfRows. That helper's return value is not inspected.
 *
 * @param rowsCapacity  maximum number of rows the block may receive.
 * @return number of rows written in this call.
 */
static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
  SMnode     *pMnode = pReq->info.node;
  SSdb       *pSdb = pMnode->pSdb;
  SStreamObj *pStream = NULL;
  int32_t     numOfRows = 0;

  while (numOfRows < rowsCapacity) {
    pShow->pIter = sdbFetch(pSdb, SDB_STREAM, pShow->pIter, (void **)&pStream);
    if (NULL == pShow->pIter) {
      break;
    }

    (void)mstSetStreamTasksResBlock(pStream, pBlock, &numOfRows, rowsCapacity);
    sdbRelease(pSdb, pStream);
  }

  pShow->numOfRows += numOfRows;
  return numOfRows;
}
520

521
/**
 * Cancel an in-progress stream-task iteration; the iterator walks SDB_STREAM
 * rows, so it is cancelled for that type.
 */
static void mndCancelGetNextStreamTask(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_STREAM);
}
×
525

526
/**
 * Show-retrieve callback that fills pBlock with stream recalculation rows.
 *
 * Iterates stream rows and lets mstSetStreamRecalculatesResBlock() append rows
 * for each one, advancing numOfRows. That helper's return value is not
 * inspected.
 *
 * @param rowsCapacity  maximum number of rows the block may receive.
 * @return number of rows written in this call.
 */
static int32_t mndRetrieveStreamRecalculates(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
  SMnode     *pMnode = pReq->info.node;
  SSdb       *pSdb = pMnode->pSdb;
  SStreamObj *pStream = NULL;
  int32_t     numOfRows = 0;

  while (numOfRows < rowsCapacity) {
    pShow->pIter = sdbFetch(pSdb, SDB_STREAM, pShow->pIter, (void **)&pStream);
    if (NULL == pShow->pIter) {
      break;
    }

    (void)mstSetStreamRecalculatesResBlock(pStream, pBlock, &numOfRows, rowsCapacity);
    sdbRelease(pSdb, pStream);
  }

  pShow->numOfRows += numOfRows;
  return numOfRows;
}
547

548
/**
 * Cancel an in-progress recalculation iteration; the iterator walks SDB_STREAM
 * rows, so it is cancelled for that type.
 */
static void mndCancelGetNextStreamRecalculates(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_STREAM);
}
×
552

553

554
/**
 * sdbTraverse() callback that flags the tags a stream partitions by.
 *
 * For a stream that is not being dropped, whose trigger table is a super,
 * child or virtual child table with the given suid, and that has partition
 * columns: every tag in pTags whose colId matches a partition column gets
 * COL_REF_BY_STM set.
 *
 * @param pObj  visited SStreamObj.
 * @param p1    pointer to the super table uid to match.
 * @param p2    SSchema array of tags to update in place.
 * @param p3    pointer to the number of entries in p2.
 * @return always true, so the traversal visits every stream.
 */
static bool mndStreamUpdateTagsFlag(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
  SStreamObj *pStream = pObj;

  if (atomic_load_8(&pStream->userDropped)) {
    return true;
  }

  // Only streams triggered by these table kinds are relevant.
  switch (pStream->pCreate->triggerTblType) {
    case TSDB_SUPER_TABLE:
    case TSDB_CHILD_TABLE:
    case TSDB_VIRTUAL_CHILD_TABLE:
      break;
    default:
      return true;
  }

  if (pStream->pCreate->triggerTblSuid != *(uint64_t*)p1) {
    return true;
  }

  if (NULL == pStream->pCreate->partitionCols) {
    return true;
  }

  SNodeList* pPartCols = NULL;
  int32_t    code = nodesStringToList(pStream->pCreate->partitionCols, &pPartCols);
  if (code) {
    nodesDestroyList(pPartCols);
    mstError("partitionCols [%s] nodesStringToList failed with error:%s", (char*)pStream->pCreate->partitionCols, tstrerror(code));
    return true;
  }

  SSchema* pTags = (SSchema*)p2;
  int32_t* tagNum = (int32_t*)p3;

  SNode* pNode = NULL;
  FOREACH(pNode, pPartCols) {
    SColumnNode* pCol = (SColumnNode*)pNode;
    // Flag the first tag with a matching column id, then move to the next column.
    for (int32_t idx = 0; idx < *tagNum; ++idx) {
      if (pTags[idx].colId != pCol->colId) {
        continue;
      }
      pTags[idx].flags |= COL_REF_BY_STM;
      break;
    }
  }

  nodesDestroyList(pPartCols);

  return true;
}
600

601

602
/**
 * Set COL_REF_BY_STM on the tags of super table `suid` that some stream
 * partitions by. Returns immediately when no stream rows exist.
 *
 * @param suid    super table uid to match against each stream's trigger table.
 * @param pTags   tag schema array, updated in place.
 * @param tagNum  number of entries in pTags.
 */
void mndStreamUpdateTagsRefFlag(SMnode *pMnode, int64_t suid, SSchema* pTags, int32_t tagNum) {
  if (sdbGetSize(pMnode->pSdb, SDB_STREAM) <= 0) {
    return;
  }

  sdbTraverse(pMnode->pSdb, SDB_STREAM, mndStreamUpdateTagsFlag, &suid, pTags, &tagNum);
}
610

611
/**
 * Handle a stop-stream request.
 *
 * Steps: deserialize the request, acquire the stream, check the caller's
 * privileges, reject streams that are being dropped, create a transaction,
 * mark the stream as user-stopped and undeploy it, then append the row to the
 * transaction and prepare it.
 *
 * @return TSDB_CODE_ACTION_IN_PROGRESS once the transaction has been prepared;
 *         0 when the stream does not exist and igNotExists is set; otherwise an
 *         error code.
 *
 * NOTE(review): the stream object is modified (userStopped, updateTime) and
 * undeployed before the transaction is appended/prepared, and the failure
 * paths below do not revert that — confirm this is intended.
 */
static int32_t mndProcessStopStreamReq(SRpcMsg *pReq) {
  SMnode     *pMnode = pReq->info.node;
  SStreamObj *pStream = NULL;
  SUserObj   *pOperUser = NULL;
  STrans     *pTrans = NULL;
  int32_t     code = 0;

  SMPauseStreamReq pauseReq = {0};
  if (tDeserializeSMPauseStreamReq(pReq->pCont, pReq->contLen, &pauseReq) < 0) {
    TAOS_RETURN(TSDB_CODE_INVALID_MSG);
  }

  code = mndAcquireStream(pMnode, pauseReq.name, &pStream);
  if (code != 0 || pStream == NULL) {
    if (pauseReq.igNotExists) {
      mInfo("stream:%s, not exist, not stop stream", pauseReq.name);
      taosMemoryFree(pauseReq.name);
      return 0;
    }

    mError("stream:%s not exist, failed to stop stream", pauseReq.name);
    taosMemoryFree(pauseReq.name);
    TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
  }

  // The request name is no longer needed once the stream has been acquired.
  taosMemoryFree(pauseReq.name);

  int64_t streamId = pStream->pCreate->streamId;

  mstsInfo("start to stop stream %s", pStream->name);

  code = mndAcquireUser(pMnode, RPC_MSG_USER(pReq), &pOperUser);
  if (code) {
    mstsError("user %s failed to stop stream %s since %s", RPC_MSG_USER(pReq), pStream->name, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  // Database-level privilege first; the stream-level check runs only if it passes.
  code = mndCheckDbPrivilegeByNameRecF(pMnode, pOperUser, PRIV_DB_USE, PRIV_OBJ_DB, pStream->pCreate->streamDB, NULL);
  if (code == 0) {
    code = mndCheckObjPrivilegeRecF(pMnode, pOperUser, PRIV_CM_STOP, PRIV_OBJ_STREAM, pStream->ownerId,
                                    pStream->pCreate->streamDB, pStream->pCreate->name);
  }
  if (code != 0) {
    mstsError("user %s failed to stop stream %s since %s", RPC_MSG_USER(pReq), pStream->name, tstrerror(code));
    mndReleaseUser(pMnode, pOperUser);
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  mndReleaseUser(pMnode, pOperUser);  // release user after privilege check

  if (atomic_load_8(&pStream->userDropped)) {
    code = TSDB_CODE_MND_STREAM_DROPPING;
    mstsError("user %s failed to stop stream %s since %s", RPC_MSG_USER(pReq), pStream->name, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  code = mndStreamCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_STOP_NAME, &pTrans);
  if (code || pTrans == NULL) {
    mstsError("failed to stop stream %s since %s", pStream->name, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  pStream->updateTime = taosGetTimestampMs();
  atomic_store_8(&pStream->userStopped, 1);
  MND_STREAM_SET_LAST_TS(STM_EVENT_STOP_STREAM, pStream->updateTime);
  msmUndeployStream(pMnode, streamId, pStream->name);

  // stop stream
  code = mndStreamTransAppend(pStream, pTrans, SDB_STATUS_READY);
  if (code != TSDB_CODE_SUCCESS) {
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("trans:%d, failed to prepare stop stream trans since %s", pTrans->id, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  sdbRelease(pMnode->pSdb, pStream);
  mndTransDrop(pTrans);

  return TSDB_CODE_ACTION_IN_PROGRESS;
}
704

705

706
/**
 * Handle a start (resume) stream request.
 *
 * Steps: check the streams grant, deserialize the request, acquire the stream,
 * check the caller's privileges, require that the stream is stopped and not
 * being dropped, clear the user-stopped flag, persist the row through a
 * transaction, then post a deploy action for the stream.
 *
 * @return TSDB_CODE_ACTION_IN_PROGRESS once the transaction has been prepared;
 *         0 when the stream does not exist and igNotExists is set; otherwise an
 *         error code.
 *
 * NOTE(review): userStopped is cleared before the transaction is created, and
 * the later failure paths do not restore it — confirm this is intended.
 */
static int32_t mndProcessStartStreamReq(SRpcMsg *pReq) {
  SMnode     *pMnode = pReq->info.node;
  SStreamObj *pStream = NULL;
  SUserObj   *pOperUser = NULL;
  STrans     *pTrans = NULL;
  int32_t     code = 0;

  code = grantCheckExpire(TSDB_GRANT_STREAMS);
  if (code < 0) {
    return code;
  }

  SMResumeStreamReq resumeReq = {0};
  if (tDeserializeSMResumeStreamReq(pReq->pCont, pReq->contLen, &resumeReq) < 0) {
    TAOS_RETURN(TSDB_CODE_INVALID_MSG);
  }

  code = mndAcquireStream(pMnode, resumeReq.name, &pStream);
  if (code != 0 || pStream == NULL) {
    if (resumeReq.igNotExists) {
      mInfo("stream:%s not exist, not start stream", resumeReq.name);
      taosMemoryFree(resumeReq.name);
      sdbRelease(pMnode->pSdb, pStream);
      return 0;
    }

    mError("stream:%s not exist, failed to start stream", resumeReq.name);
    taosMemoryFree(resumeReq.name);
    TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
  }

  // The request name is no longer needed once the stream has been acquired.
  taosMemoryFree(resumeReq.name);

  int64_t streamId = pStream->pCreate->streamId;

  mstsInfo("start to start stream %s from stopped", pStream->name);

  code = mndAcquireUser(pMnode, RPC_MSG_USER(pReq), &pOperUser);
  if (code) {
    mstsError("user %s failed to start stream %s since %s", RPC_MSG_USER(pReq), pStream->name, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  // Database-level privilege first; the stream-level check runs only if it passes.
  code = mndCheckDbPrivilegeByNameRecF(pMnode, pOperUser, PRIV_DB_USE, PRIV_OBJ_DB, pStream->pCreate->streamDB, NULL);
  if (code == 0) {
    code = mndCheckObjPrivilegeRecF(pMnode, pOperUser, PRIV_CM_START, PRIV_OBJ_STREAM, pStream->ownerId,
                                    pStream->pCreate->streamDB, pStream->pCreate->name);
  }
  if (code != 0) {
    mstsError("user %s failed to start stream %s since %s", RPC_MSG_USER(pReq), pStream->name, tstrerror(code));
    mndReleaseUser(pMnode, pOperUser);
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  mndReleaseUser(pMnode, pOperUser);  // release user after privilege check

  if (atomic_load_8(&pStream->userDropped)) {
    code = TSDB_CODE_MND_STREAM_DROPPING;
    mstsError("user %s failed to start stream %s since %s", RPC_MSG_USER(pReq), pStream->name, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  if (0 == atomic_load_8(&pStream->userStopped)) {
    code = TSDB_CODE_MND_STREAM_NOT_STOPPED;
    mstsError("user %s failed to start stream %s since %s", RPC_MSG_USER(pReq), pStream->name, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  atomic_store_8(&pStream->userStopped, 0);
  pStream->updateTime = taosGetTimestampMs();
  MND_STREAM_SET_LAST_TS(STM_EVENT_START_STREAM, pStream->updateTime);

  code = mndStreamCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_START_NAME, &pTrans);
  if (code || pTrans == NULL) {
    mstsError("failed to start stream %s since %s", pStream->name, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  code = mndStreamTransAppend(pStream, pTrans, SDB_STATUS_READY);
  if (code != TSDB_CODE_SUCCESS) {
    mstsError("failed to start stream %s since %s", pStream->name, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mstsError("trans:%d, failed to prepare start stream %s trans since %s", pTrans->id, pStream->name, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  // Queue the deployment once the transaction has been prepared.
  mstPostStreamAction(mStreamMgmt.actionQ, streamId, pStream->name, NULL, true, STREAM_ACT_DEPLOY);

  sdbRelease(pMnode->pSdb, pStream);
  mndTransDrop(pTrans);

  return TSDB_CODE_ACTION_IN_PROGRESS;
}
811

812
static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
6,225✔
813
  SMnode     *pMnode = pReq->info.node;
6,225✔
814
  SStreamObj *pStream = NULL;
6,225✔
815
  SUserObj   *pOperUser = NULL;
6,225✔
816
  int32_t     code = 0;
6,225✔
817
  int32_t     notExistNum = 0;
6,225✔
818

819
  SMDropStreamReq dropReq = {0};
6,225✔
820
  int64_t         tss = taosGetTimestampMs();
6,225✔
821
  if (tDeserializeSMDropStreamReq(pReq->pCont, pReq->contLen, &dropReq) < 0) {
6,225✔
822
    mError("invalid drop stream msg recv, discarded");
×
823
    code = TSDB_CODE_INVALID_MSG;
×
824
    TAOS_RETURN(code);
×
825
  }
826

827
  mDebug("recv drop stream msg, count:%d", dropReq.count);
6,225✔
828

829
  // Acquire user object for privilege check
830
  code = mndAcquireUser(pMnode, RPC_MSG_USER(pReq), &pOperUser);
6,225✔
831
  if (code != 0) {
6,225✔
NEW
832
    tFreeMDropStreamReq(&dropReq);
×
NEW
833
    TAOS_RETURN(code);
×
834
  }
835

836
  // check if all streams exist
837
  if (!dropReq.igNotExists) {
6,225✔
838
    for (int32_t i = 0; i < dropReq.count; i++) {
9,366✔
839
      if (!sdbCheckExists(pMnode->pSdb, SDB_STREAM, dropReq.name[i])) {
5,278✔
840
        mError("stream:%s not exist failed to drop it", dropReq.name[i]);
952✔
841
        mndReleaseUser(pMnode, pOperUser);
952✔
842
        tFreeMDropStreamReq(&dropReq);
952✔
843
        TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
952✔
844
      }
845
    }
846
  }
847

848
  // Create a single transaction for all stream drops
849
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq, MND_STREAM_DROP_NAME);
5,273✔
850
  if (pTrans == NULL) {
5,273✔
NEW
851
    mError("failed to create drop stream transaction since %s", tstrerror(terrno));
×
NEW
852
    code = terrno;
×
NEW
853
    mndReleaseUser(pMnode, pOperUser);
×
NEW
854
    tFreeMDropStreamReq(&dropReq);
×
NEW
855
    TAOS_RETURN(code);
×
856
  }
857
  pTrans->ableToBeKilled = true;
5,273✔
858

859
  // Process all streams and add them to the transaction
860
  for (int32_t i = 0; i < dropReq.count; i++) {
11,974✔
861
    char *streamName = dropReq.name[i];
6,701✔
862
    mDebug("drop stream[%d/%d]: %s", i + 1, dropReq.count, streamName);
6,701✔
863

864
    code = mndAcquireStream(pMnode, streamName, &pStream);
6,701✔
865
    if (pStream == NULL || code != 0) {
6,701✔
866
      mWarn("stream:%s not exist, ignore not exist is set, drop stream exec done with success", streamName);
952✔
867
      sdbRelease(pMnode->pSdb, pStream);
952✔
868
      pStream = NULL;
952✔
869
      notExistNum++;
952✔
870
      continue;
952✔
871
    }
872

873
    int64_t streamId = pStream->pCreate->streamId;
5,749✔
874

875
    if ((code = mndCheckDbPrivilegeByNameRecF(pMnode, pOperUser, PRIV_DB_USE, PRIV_OBJ_DB, pStream->pCreate->streamDB,
5,749✔
876
                                              NULL)) ||
5,749✔
877
        (code = mndCheckObjPrivilegeRecF(pMnode, pOperUser, PRIV_CM_DROP, PRIV_OBJ_STREAM, pStream->ownerId,
5,749✔
878
                                         pStream->pCreate->streamDB, pStream->pCreate->name))) {
5,749✔
NEW
879
      mstsError("user %s failed to drop stream %s since %s", pReq->info.conn.user, streamName, tstrerror(code));
×
NEW
880
      sdbRelease(pMnode->pSdb, pStream);
×
NEW
881
      pStream = NULL;
×
NEW
882
      mndTransDrop(pTrans);
×
NEW
883
      pTrans = NULL;
×
NEW
884
      goto _OVER;
×
885
    }
886

887
    if (pStream->pCreate->tsmaId != 0) {
5,749✔
NEW
888
      mstsDebug("try to drop tsma related stream, tsmaId:%" PRIx64, pStream->pCreate->tsmaId);
×
889

NEW
890
      void    *pIter = NULL;
×
NEW
891
      SSmaObj *pSma = NULL;
×
UNCOV
892
      pIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter, (void **)&pSma);
×
NEW
893
      while (pIter) {
×
NEW
894
        if (pSma && pSma->uid == pStream->pCreate->tsmaId) {
×
NEW
895
          sdbRelease(pMnode->pSdb, pSma);
×
NEW
896
          sdbRelease(pMnode->pSdb, pStream);
×
NEW
897
          pStream = NULL;
×
898

NEW
899
          sdbCancelFetch(pMnode->pSdb, pIter);
×
NEW
900
          code = TSDB_CODE_TSMA_MUST_BE_DROPPED;
×
901

NEW
902
          mstsError("refused to drop tsma-related stream %s since tsma still exists", streamName);
×
NEW
903
          mndTransDrop(pTrans);
×
NEW
904
          pTrans = NULL;
×
NEW
905
          goto _OVER;
×
906
        }
907

NEW
908
        if (pSma) {
×
NEW
909
          sdbRelease(pMnode->pSdb, pSma);
×
910
        }
911

NEW
912
        pIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter, (void **)&pSma);
×
913
      }
914
    }
915

916
    mstsInfo("start to drop stream %s", pStream->pCreate->name);
5,749✔
917

918
    pStream->updateTime = taosGetTimestampMs();
11,498✔
919

920
    atomic_store_8(&pStream->userDropped, 1);
5,749✔
921

922
    MND_STREAM_SET_LAST_TS(STM_EVENT_DROP_STREAM, pStream->updateTime);
5,749✔
923

924
    msmUndeployStream(pMnode, streamId, pStream->pCreate->name);
5,749✔
925

926
    // Append drop stream operation to the transaction
927
    code = mndStreamTransAppend(pStream, pTrans, SDB_STATUS_DROPPED);
5,749✔
928
    if (code) {
5,749✔
NEW
929
      mstsError("trans:%d, failed to append drop stream %s trans since %s", pTrans->id, streamName, tstrerror(code));
×
NEW
930
      sdbRelease(pMnode->pSdb, pStream);
×
NEW
931
      pStream = NULL;
×
932
      // mndStreamTransAppend already called mndTransDrop on failure, set pTrans to NULL to avoid double free
NEW
933
      pTrans = NULL;
×
NEW
934
      goto _OVER;
×
935
    }
936

937
    sdbRelease(pMnode->pSdb, pStream);
5,749✔
938
    pStream = NULL;
5,749✔
939

940
    mstsDebug("drop stream %s added to transaction", streamName);
5,749✔
941
  }
942

943
  // Prepare and execute the transaction for all streams
944
  if (notExistNum < dropReq.count) {
5,273✔
945
    code = mndTransPrepare(pMnode, pTrans);
5,035✔
946
    if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
5,035✔
NEW
947
      mError("trans:%d, failed to prepare drop stream trans since %s", pTrans->id, tstrerror(code));
×
NEW
948
      mndTransDrop(pTrans);
×
NEW
949
      goto _OVER;
×
950
    }
951
    mInfo("trans:%d, drop stream transaction prepared for %d streams", pTrans->id, dropReq.count - notExistNum);
5,035✔
952
  } else {
953
    // All streams don't exist, no need to prepare transaction
954
    mndTransDrop(pTrans);
238✔
955
    pTrans = NULL;
238✔
956
  }
957

958
  if (tsAuditLevel >= AUDIT_LEVEL_DATABASE && notExistNum < dropReq.count) {
5,273✔
959
    int64_t tse = taosGetTimestampMs();
5,035✔
960
    double  duration = (double)(tse - tss);
5,035✔
961
    duration = duration / 1000;
5,035✔
962
    // Use first stream's database for audit (assuming all streams are from same db in batch)
963
    if (dropReq.count > 0) {
5,035✔
964
      SStreamObj *pFirstStream = NULL;
5,035✔
965
      if (mndAcquireStream(pMnode, dropReq.name[0], &pFirstStream) == 0 && pFirstStream != NULL) {
5,035✔
NEW
966
        auditRecord(pReq, pMnode->clusterId, "dropStream", "", pFirstStream->pCreate->streamDB, NULL, 0, duration, 0);
×
NEW
967
        sdbRelease(pMnode->pSdb, pFirstStream);
×
968
      }
969
    }
970
  }
971

972
  // If any stream was successfully added to transaction, return ACTION_IN_PROGRESS
973
  // Otherwise, all streams don't exist (and igNotExists is set), return SUCCESS
974
  code = (notExistNum < dropReq.count) ? TSDB_CODE_ACTION_IN_PROGRESS : TSDB_CODE_SUCCESS;
5,273✔
975

976
_OVER:
5,273✔
977
  if (pStream) {
5,273✔
NEW
978
    sdbRelease(pMnode->pSdb, pStream);
×
979
  }
980
  if (pTrans) {
5,273✔
981
    mndTransDrop(pTrans);
5,035✔
982
  }
983
  mndReleaseUser(pMnode, pOperUser);
5,273✔
984
  tFreeMDropStreamReq(&dropReq);
5,273✔
985
  TAOS_RETURN(code);
5,273✔
986
}
987

988
/**
 * Handles a create-stream request.
 *
 * Steps: check the stream grant, deserialize the request, pick an snode,
 * reject or ignore an already existing stream, validate the request against
 * the requesting user, build the stream object, and run one transaction that
 * optionally creates the output super table and persists the stream. On
 * success a deploy action is posted to the stream action queue.
 *
 * @param pReq incoming RPC message carrying a serialized SCMCreateStreamReq
 * @return TSDB_CODE_ACTION_IN_PROGRESS once the transaction is prepared,
 *         TSDB_CODE_SUCCESS when the stream already exists and igExists is
 *         set, otherwise the error code of the failing step
 */
static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
  SMnode     *pMnode = pReq->info.node;
  SStreamObj *pStream = NULL;
  SStreamObj *pExist = NULL;  // sdb reference to an already existing stream; released in _OVER
  SStreamObj  streamObj = {0};
  SUserObj    *pOperUser = NULL;
  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     lino = 0;
  STrans     *pTrans = NULL;
  uint64_t    streamId = 0;
  SCMCreateStreamReq* pCreate = NULL;
  int64_t             tss = taosGetTimestampMs();

  if ((code = grantCheck(TSDB_GRANT_STREAMS)) < 0) {
    goto _OVER;
  }

#ifdef WINDOWS
  code = TSDB_CODE_MND_INVALID_PLATFORM;
  goto _OVER;
#endif

  pCreate = taosMemoryCalloc(1, sizeof(SCMCreateStreamReq));
  TSDB_CHECK_NULL(pCreate, code, lino, _OVER, terrno);

  code = tDeserializeSCMCreateStreamReq(pReq->pCont, pReq->contLen, pCreate);
  TSDB_CHECK_CODE(code, lino, _OVER);

  streamId = pCreate->streamId;

  mstsInfo("start to create stream %s, sql:%s", pCreate->name, pCreate->sql);

  int32_t snodeId = msmAssignRandomSnodeId(pMnode, streamId);
  if (!GOT_SNODE(snodeId)) {
    code = terrno;
    TSDB_CHECK_CODE(code, lino, _OVER);
  }

  code = mndAcquireStream(pMnode, pCreate->name, &pExist);
  if (pExist != NULL && code == 0) {
    // Keep the reference alive: _OVER still reads pStream->pCreate->name for
    // logging, so the release is deferred until after that.
    pStream = pExist;
    if (pCreate->igExists) {
      mstsInfo("stream %s already exist, ignore exist is set", pCreate->name);
    } else {
      code = TSDB_CODE_MND_STREAM_ALREADY_EXIST;
    }

    goto _OVER;
  } else if (code != TSDB_CODE_MND_STREAM_NOT_EXIST) {
    goto _OVER;
  }

  code = mndAcquireUser(pMnode, RPC_MSG_USER(pReq), &pOperUser);
  if (pOperUser == NULL) {
    // Make sure a missing user can never be reported as success
    if (code == TSDB_CODE_SUCCESS) {
      code = TSDB_CODE_MND_NO_USER_FROM_CONN;
    }
    TSDB_CHECK_CODE(code, lino, _OVER);
  }

  code = mndStreamValidateCreate(pMnode, pOperUser, pCreate);
  TSDB_CHECK_CODE(code, lino, _OVER);

  mndStreamBuildObj(pMnode, &streamObj, pCreate, pOperUser, snodeId);
  // From here on the request is reached through streamObj.pCreate; clearing the
  // local pointer keeps _OVER from freeing it a second time.
  pCreate = NULL;

  pStream = &streamObj;

  code = mndStreamCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_DB, MND_STREAM_CREATE_NAME, &pTrans);
  if (pTrans == NULL || code) {
    goto _OVER;
  }

  // create stb for stream
  if (TSDB_SUPER_TABLE == pStream->pCreate->outTblType && !pStream->pCreate->outStbExists) {
    pStream->pCreate->outStbUid = mndGenerateUid(pStream->pCreate->outTblName, strlen(pStream->pCreate->outTblName));
    code = mndStreamCreateOutStb(pMnode, pTrans, pStream->pCreate, RPC_MSG_USER(pReq));
    TSDB_CHECK_CODE(code, lino, _OVER);
  }

  // add stream to trans
  code = mndStreamTransAppend(pStream, pTrans, SDB_STATUS_READY);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    // NOTE(review): mndProcessDropStreamReq relies on mndStreamTransAppend
    // dropping the transaction itself on failure; follow the same contract so
    // _OVER does not drop it again. Confirm against mndStreamTransAppend.
    pTrans = NULL;
    mstsError("failed to persist stream %s since %s", pStream->pCreate->name, tstrerror(code));
    goto _OVER;
  }

  // execute creation
  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mstsError("trans:%d, failed to prepare since %s", pTrans->id, tstrerror(code));
    goto _OVER;
  }
  code = TSDB_CODE_ACTION_IN_PROGRESS;

  if (tsAuditLevel >= AUDIT_LEVEL_DATABASE) {
    int64_t tse = taosGetTimestampMs();
    double  duration = (double)(tse - tss);
    duration = duration / 1000;
    auditRecord(pReq, pMnode->clusterId, "createStream", pStream->pCreate->streamDB, pStream->pCreate->name,
                pStream->pCreate->sql, strlen(pStream->pCreate->sql), duration, 0);
  }

  MND_STREAM_SET_LAST_TS(STM_EVENT_CREATE_STREAM, taosGetTimestampMs());

  mstPostStreamAction(mStreamMgmt.actionQ, streamId, pStream->pCreate->name, NULL, true, STREAM_ACT_DEPLOY);

_OVER:

  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    if (pStream && pStream->pCreate) {
      mstsError("failed to create stream %s at line:%d since %s", pStream->pCreate->name, lino, tstrerror(code));
    } else {
      mstsError("failed to create stream at line:%d since %s", lino, tstrerror(code));
    }
  } else {
    // pStream may still be NULL here, so guard the dereference
    mstsDebug("create stream %s half completed", (pStream && pStream->pCreate) ? pStream->pCreate->name : "unknown");
  }

  // Logging above was the last reader of the existing stream object
  if (pExist != NULL) {
    mndReleaseStream(pMnode, pExist);
  }

  tFreeSCMCreateStreamReq(pCreate);
  taosMemoryFreeClear(pCreate);

  mndTransDrop(pTrans);
  tFreeStreamObj(&streamObj);
  mndReleaseUser(pMnode, pOperUser);

  return code;
}
1112

1113
/**
 * Handles a recalc-stream request for a single named stream.
 *
 * The request is refused when the grant has expired, the message cannot be
 * decoded, the stream does not exist, the requesting user lacks the needed
 * privileges, the stream is being dropped or is stopped, or the stream uses a
 * period trigger. Otherwise the recalculation of the requested time range is
 * handed to msmRecalcStream() and, if auditing is enabled, recorded.
 *
 * @param pReq incoming RPC message carrying a serialized SMRecalcStreamReq
 * @return TSDB_CODE_SUCCESS on success, otherwise the error code of the
 *         failing step
 */
static int32_t mndProcessRecalcStreamReq(SRpcMsg *pReq) {
  SMnode     *pMnode = pReq->info.node;
  SStreamObj *pStream = NULL;
  SUserObj   *pOperUser = NULL;
  int32_t     code = 0;
  int64_t     beginMs = taosGetTimestampMs();

  code = grantCheckExpire(TSDB_GRANT_STREAMS);
  if (code < 0) {
    return code;
  }

  SMRecalcStreamReq recalcReq = {0};
  if (tDeserializeSMRecalcStreamReq(pReq->pCont, pReq->contLen, &recalcReq) < 0) {
    tFreeMRecalcStreamReq(&recalcReq);
    TAOS_RETURN(TSDB_CODE_INVALID_MSG);
  }

  code = mndAcquireStream(pMnode, recalcReq.name, &pStream);
  if (code != 0 || pStream == NULL) {
    mError("stream:%s not exist, failed to recalc stream", recalcReq.name);
    tFreeMRecalcStreamReq(&recalcReq);
    TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
  }

  int64_t streamId = pStream->pCreate->streamId;

  mstsInfo("start to recalc stream %s", recalcReq.name);

  code = mndAcquireUser(pMnode, RPC_MSG_USER(pReq), &pOperUser);
  if (code != 0) {
    mstsError("user %s failed to recalc stream %s since %s", RPC_MSG_USER(pReq), pStream->name, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    tFreeMRecalcStreamReq(&recalcReq);
    TAOS_RETURN(code);
  }

  // Database-level check first; the object-level check only runs if it passes
  code = mndCheckDbPrivilegeByNameRecF(pMnode, pOperUser, PRIV_DB_USE, PRIV_OBJ_DB, pStream->pCreate->streamDB, NULL);
  if (code == 0) {
    code = mndCheckObjPrivilegeRecF(pMnode, pOperUser, PRIV_CM_START, PRIV_OBJ_STREAM, pStream->ownerId,
                                    pStream->pCreate->streamDB, pStream->pCreate->name);
  }

  // The user object is only needed for the privilege checks
  mndReleaseUser(pMnode, pOperUser);

  if (code != 0) {
    mstsError("user %s failed to recalc stream %s since %s", RPC_MSG_USER(pReq), pStream->name, tstrerror(code));
    goto _OVER;
  }

  if (atomic_load_8(&pStream->userDropped)) {
    code = TSDB_CODE_MND_STREAM_DROPPING;
    mstsError("user %s failed to recalc stream %s since %s", RPC_MSG_USER(pReq), recalcReq.name, tstrerror(code));
    goto _OVER;
  }

  if (atomic_load_8(&pStream->userStopped)) {
    code = TSDB_CODE_MND_STREAM_STOPPED;
    mstsError("user %s failed to recalc stream %s since %s", RPC_MSG_USER(pReq), recalcReq.name, tstrerror(code));
    goto _OVER;
  }

  if (WINDOW_TYPE_PERIOD == pStream->pCreate->triggerType) {
    code = TSDB_CODE_OPS_NOT_SUPPORT;
    mstsError("failed to recalc stream %s since %s", recalcReq.name, tstrerror(code));
    goto _OVER;
  }

  code = msmRecalcStream(pMnode, pStream->pCreate->streamId, &recalcReq.timeRange);
  if (code != TSDB_CODE_SUCCESS) {
    goto _OVER;
  }

  if (tsAuditLevel >= AUDIT_LEVEL_DATABASE) {
    char detail[128];
    snprintf(detail, sizeof(detail), "start:%" PRId64 ", end:%" PRId64, recalcReq.timeRange.skey,
             recalcReq.timeRange.ekey);
    int64_t endMs = taosGetTimestampMs();
    double  duration = (double)(endMs - beginMs) / 1000;
    auditRecord(pReq, pMnode->clusterId, "recalcStream", pStream->name, recalcReq.name, detail, strlen(detail),
                duration, 0);
  }

_OVER:
  // Shared exit for every path that holds the stream reference and the decoded request
  sdbRelease(pMnode->pSdb, pStream);
  tFreeMRecalcStreamReq(&recalcReq);
  return code;
}
1243

1244

1245
/**
 * Registers the stream module with the mnode.
 *
 * Installs the stream request handlers (skipped when tsDisableStream is set),
 * the show/retrieve handlers for the streams, stream-tasks and
 * stream-recalculates tables, and finally the SDB_STREAM table descriptor.
 *
 * @param pMnode mnode instance being initialized
 * @return the result of sdbSetTable()
 */
int32_t mndInitStream(SMnode *pMnode) {
  SSdbTable streamTable = {
      .sdbType = SDB_STREAM,
      .keyType = SDB_KEY_BINARY,
      .encodeFp = (SdbEncodeFp)mndStreamActionEncode,
      .decodeFp = (SdbDecodeFp)mndStreamActionDecode,
      .insertFp = (SdbInsertFp)mndStreamActionInsert,
      .updateFp = (SdbUpdateFp)mndStreamActionUpdate,
      .deleteFp = (SdbDeleteFp)mndStreamActionDelete,
  };

  // Request handlers are only wired up while streams are enabled
  if (!tsDisableStream) {
    mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STREAM, mndProcessCreateStreamReq);
    mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM, mndProcessDropStreamReq);
    mndSetMsgHandle(pMnode, TDMT_MND_START_STREAM, mndProcessStartStreamReq);
    mndSetMsgHandle(pMnode, TDMT_MND_STOP_STREAM, mndProcessStopStreamReq);
    mndSetMsgHandle(pMnode, TDMT_MND_STREAM_HEARTBEAT, mndProcessStreamHb);
    mndSetMsgHandle(pMnode, TDMT_MND_RECALC_STREAM, mndProcessRecalcStreamReq);
  }

  // Show handlers are registered regardless of tsDisableStream
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndRetrieveStream);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndCancelGetNextStream);
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndRetrieveStreamTask);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndCancelGetNextStreamTask);
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAM_RECALCULATES, mndRetrieveStreamRecalculates);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAM_RECALCULATES, mndCancelGetNextStreamRecalculates);

  return sdbSetTable(pMnode->pSdb, streamTable);
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc