• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5013

03 Apr 2026 03:59PM UTC coverage: 72.317% (+0.01%) from 72.305%
#5013

push

travis-ci

web-flow
merge: from main to 3.0 branch #35067

4053 of 5985 new or added lines in 68 files covered. (67.72%)

13131 existing lines in 160 files now uncovered.

257489 of 356056 relevant lines covered (72.32%)

129893134.08 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.24
/source/dnode/mnode/impl/src/mndStream.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "mndStream.h"
17
#include "audit.h"
18
#include "mndDb.h"
19
#include "mndPrivilege.h"
20
#include "mndShow.h"
21
#include "mndStb.h"
22
#include "mndTrans.h"
23
#include "mndUser.h"
24
#include "mndVgroup.h"
25
#include "osMemory.h"
26
#include "parser.h"
27
#include "taoserror.h"
28
#include "tmisce.h"
29
#include "tname.h"
30

31
#define MND_STREAM_MAX_NUM 100000
32

33
typedef struct {
34
  int8_t placeHolder;  // placeholder member, defined only to fix a Windows compile error
35
} SMStreamNodeCheckMsg;
36

37
static int32_t  mndNodeCheckSentinel = 0;
38
SStmRuntime  mStreamMgmt = {0};
39

40
static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream);
41
static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream);
42
static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pOldStream, SStreamObj *pNewStream);
43
static int32_t mndProcessDropStreamReq(SRpcMsg *pReq);
44

45
static int32_t mndProcessCreateStreamReqFromMNode(SRpcMsg *pReq);
46
static int32_t mndProcessDropStreamReqFromMNode(SRpcMsg *pReq);
47

48
static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
49
static void    mndCancelGetNextStream(SMnode *pMnode, void *pIter);
50
static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
51
static void    mndCancelGetNextStreamTask(SMnode *pMnode, void *pIter);
52
static int32_t mndProcessStopStreamReq(SRpcMsg *pReq);
53
static int32_t mndProcessStartStreamReq(SRpcMsg *pReq);
54

55
static SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw);
56

57
SSdbRaw       *mndStreamSeqActionEncode(SStreamObj *pStream);
58
SSdbRow       *mndStreamSeqActionDecode(SSdbRaw *pRaw);
59
static int32_t mndStreamSeqActionInsert(SSdb *pSdb, SStreamSeq *pStream);
60
static int32_t mndStreamSeqActionDelete(SSdb *pSdb, SStreamSeq *pStream);
61
static int32_t mndStreamSeqActionUpdate(SSdb *pSdb, SStreamSeq *pOldStream, SStreamSeq *pNewStream);
62
static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq);
63

64
/**
 * Clean up the mnode stream module.
 *
 * The only work done here is to run the "no longer leader" handler of the
 * stream manager; both steps are bracketed by debug log lines.
 *
 * @param pMnode mnode instance being cleaned up.
 */
void mndCleanupStream(SMnode *pMnode) {
  mDebug("try to clean up stream");
  msmHandleBecomeNotLeader(pMnode);
  mDebug("mnd stream runtime info cleanup");
}
457,161✔
71

72
/**
 * Decode a raw sdb record into a newly allocated stream row.
 *
 * The record is read as an int32 payload length followed by that many bytes;
 * the payload is passed to tDecodeSStreamObj() together with the record's
 * soft version.
 *
 * @param pRaw raw sdb record to decode.
 * @return the decoded row with terrno set to 0, or NULL with terrno set to the
 *         failure code.
 */
SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw) {
  int32_t     code = 0;
  int32_t     lino = 0;
  SSdbRow    *pRow = NULL;
  SStreamObj *pStream = NULL;
  void       *buf = NULL;
  int8_t      sver = 0;
  int32_t     tlen = 0;
  int32_t     dataPos = 0;
  bool        decoded = false;  // set only once tDecodeSStreamObj() has succeeded
  SDecoder    decoder = {0};

  code = sdbGetRawSoftVer(pRaw, &sver);
  TSDB_CHECK_CODE(code, lino, _over);

  if (sver != MND_STREAM_VER_NUMBER && sver != MND_STREAM_COMPATIBLE_VER_NUMBER) {
    mError("stream read invalid ver, data ver: %d, curr ver: %d", sver, MND_STREAM_VER_NUMBER);
    // Must carry an error code: with code == 0 the success branch below would
    // run with pStream == NULL.
    code = TSDB_CODE_SDB_INVALID_DATA_VER;
    lino = __LINE__;
    goto _over;
  }

  pRow = sdbAllocRow(sizeof(SStreamObj));
  TSDB_CHECK_NULL(pRow, code, lino, _over, terrno);

  pStream = sdbGetRowObj(pRow);
  TSDB_CHECK_NULL(pStream, code, lino, _over, terrno);

  SDB_GET_INT32(pRaw, dataPos, &tlen, _over);
  if (tlen < 0) {
    // A negative length cannot describe a payload; do not size a buffer from it.
    code = TSDB_CODE_SDB_INVALID_DATA_LEN;
    lino = __LINE__;
    goto _over;
  }

  buf = taosMemoryMalloc(tlen + 1);
  TSDB_CHECK_NULL(buf, code, lino, _over, terrno);

  SDB_GET_BINARY(pRaw, dataPos, buf, tlen, _over);

  tDecoderInit(&decoder, buf, tlen + 1);
  code = tDecodeSStreamObj(&decoder, pStream, sver);
  tDecoderClear(&decoder);

  if (code != TSDB_CODE_SUCCESS) {
    tFreeStreamObj(pStream);
  } else {
    decoded = true;
  }

_over:
  taosMemoryFreeClear(buf);

  if (code == TSDB_CODE_SUCCESS && !decoded) {
    // Reached via an SDB_GET_* bail-out, which jumps here without assigning
    // `code`. NOTE(review): presumably the reason is left in terrno - confirm
    // against the macro definitions.
    code = (terrno != TSDB_CODE_SUCCESS) ? terrno : TSDB_CODE_SDB_INVALID_DATA_LEN;
  }

  if (code != TSDB_CODE_SUCCESS) {
    char *p = (pStream == NULL || NULL == pStream->pCreate) ? "null" : pStream->pCreate->name;
    mError("stream:%s, failed to decode from raw:%p since %s at:%d", p, pRaw, tstrerror(code), lino);
    taosMemoryFreeClear(pRow);

    terrno = code;
    return NULL;
  }

  mTrace("stream:%s, decode from raw:%p, row:%p", pStream->pCreate->name, pRaw, pStream);

  terrno = 0;
  return pRow;
}
129

130
/**
 * sdb insert hook for stream objects: only emits a trace line.
 *
 * @param pSdb    sdb instance (not used).
 * @param pStream stream object being inserted; its pCreate->name is logged.
 * @return always 0.
 */
static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream) {
  const int32_t ok = 0;

  mTrace("stream:%s, perform insert action", pStream->pCreate->name);
  return ok;
}
134

135
/**
 * sdb delete hook for stream objects: logs the stream name, then releases the
 * object's contents through tFreeStreamObj().
 *
 * @param pSdb    sdb instance (not used).
 * @param pStream stream object being deleted.
 * @return always 0.
 */
static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream) {
  const int32_t ok = 0;

  // Log first: the name lives inside the object that is freed on the next line.
  mInfo("stream:%s, perform delete action", pStream->pCreate->name);
  tFreeStreamObj(pStream);

  return ok;
}
140

141
/**
 * sdb update hook for stream objects.
 *
 * Copies the mutable fields (mainSnodeId, userStopped, ownerId, updateTime)
 * from the new object into the existing one; nothing else is touched.
 *
 * @param pSdb       sdb instance (not used).
 * @param pOldStream object currently held by sdb; updated in place.
 * @param pNewStream object carrying the new values.
 * @return always 0.
 */
static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pOldStream, SStreamObj *pNewStream) {
  SStreamObj *pDst = pOldStream;
  SStreamObj *pSrc = pNewStream;

  mTrace("stream:%s, perform update action", pDst->pCreate->name);

  // These two fields are written with atomic stores.
  atomic_store_32(&pDst->mainSnodeId, pSrc->mainSnodeId);
  atomic_store_8(&pDst->userStopped, atomic_load_8(&pSrc->userStopped));

  pDst->ownerId = pSrc->ownerId;
  pDst->updateTime = pSrc->updateTime;

  return 0;
}
151

152
/**
 * Acquire a stream object from sdb by name.
 *
 * @param pMnode     mnode instance.
 * @param streamName key used for the SDB_STREAM lookup.
 * @param pStream    out: the acquired object, or NULL when sdbAcquire() fails.
 * @return TSDB_CODE_MND_STREAM_NOT_EXIST when the lookup failed with
 *         TSDB_CODE_SDB_OBJ_NOT_THERE; 0 in every other case. Note that 0 is
 *         also returned when the lookup failed for another reason, so callers
 *         must check *pStream as well.
 */
int32_t mndAcquireStream(SMnode *pMnode, char *streamName, SStreamObj **pStream) {
  *pStream = sdbAcquire(pMnode->pSdb, SDB_STREAM, streamName);

  if (*pStream != NULL) {
    return 0;
  }

  return (terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) ? TSDB_CODE_MND_STREAM_NOT_EXIST : 0;
}
161

UNCOV
162
/**
 * Traversal callback used to find a stream name from a stream id.
 *
 * @param pMnode mnode instance (not used).
 * @param pObj   current object, treated as SStreamObj.
 * @param p1     pointer to the int64_t stream id being searched for.
 * @param p2     output buffer of at least TSDB_STREAM_NAME_LEN bytes; receives
 *               the stream name on a match.
 * @param p3     not used.
 * @return false once a match has been copied out, true otherwise
 *         (presumably false stops the traversal - verify against sdbTraverse).
 */
static bool mndStreamGetNameFromId(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
  const int64_t wantedId = *(int64_t *)p1;
  SStreamObj   *pCandidate = pObj;

  if (pCandidate->pCreate->streamId != wantedId) {
    return true;
  }

  tstrncpy((char *)p2, pCandidate->name, TSDB_STREAM_NAME_LEN);
  return false;
}
172

UNCOV
173
/**
 * Acquire a stream object from sdb by stream id.
 *
 * The id is first resolved to a name by traversing all SDB_STREAM objects,
 * then the object is acquired by that name.
 *
 * @param pMnode   mnode instance.
 * @param streamId id to look for (compared against pCreate->streamId).
 * @param pStream  out: the acquired object, or NULL when no stream has this id
 *                 or the acquire failed. Always written.
 * @return TSDB_CODE_MND_STREAM_NOT_EXIST when the name was resolved but the
 *         acquire failed with TSDB_CODE_SDB_OBJ_NOT_THERE; 0 otherwise
 *         (including "no stream with this id", where *pStream is NULL).
 */
int32_t mndAcquireStreamById(SMnode *pMnode, int64_t streamId, SStreamObj **pStream) {
  int32_t code = 0;
  SSdb   *pSdb = pMnode->pSdb;
  char    streamName[TSDB_STREAM_NAME_LEN];
  streamName[0] = 0;

  // Never leave the out-parameter indeterminate: without this, a miss returned
  // 0 while *pStream still held whatever the caller had in it.
  *pStream = NULL;

  sdbTraverse(pSdb, SDB_STREAM, mndStreamGetNameFromId, &streamId, streamName, NULL);
  if (streamName[0]) {
    (*pStream) = sdbAcquire(pSdb, SDB_STREAM, streamName);
    if ((*pStream) == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
      code = TSDB_CODE_MND_STREAM_NOT_EXIST;
    }
  }

  return code;
}
189

190
/**
 * Release a stream object previously acquired from sdb.
 *
 * @param pMnode  mnode instance owning the sdb.
 * @param pStream object to hand back to sdbRelease().
 */
void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream) {
  sdbRelease(pMnode->pSdb, pStream);
}
237,616✔
194

UNCOV
195
SSdbRaw *mndStreamSeqActionEncode(SStreamObj *pStream) { return NULL; }
×
196
SSdbRow *mndStreamSeqActionDecode(SSdbRaw *pRaw) { return NULL; }
×
197
int32_t  mndStreamSeqActionInsert(SSdb *pSdb, SStreamSeq *pStream) { return 0; }
×
198
int32_t  mndStreamSeqActionDelete(SSdb *pSdb, SStreamSeq *pStream) { return 0; }
×
199
int32_t  mndStreamSeqActionUpdate(SSdb *pSdb, SStreamSeq *pOldStream, SStreamSeq *pNewStream) { return 0; }
×
200

201
/**
 * Fill in a stream object from a create request.
 *
 * @param pMnode    mnode instance (not used).
 * @param pObj      stream object to populate.
 * @param pCreate   create request; the pointer itself is stored in
 *                  pObj->pCreate (no copy is made here).
 * @param pOperUser user performing the operation; supplies createUser and ownerId.
 * @param snodeId   value stored as the stream's mainSnodeId.
 */
static void mndStreamBuildObj(SMnode *pMnode, SStreamObj *pObj, SCMCreateStreamReq *pCreate, SUserObj *pOperUser,
                              int32_t snodeId) {
  pObj->pCreate = pCreate;
  tstrncpy(pObj->name, pCreate->name, sizeof(pObj->name));
  (void)snprintf(pObj->createUser, sizeof(pObj->createUser), "%s", pOperUser->name);
  pObj->ownerId = pOperUser->uid;
  pObj->mainSnodeId = snodeId;

  // A freshly built stream is neither dropped nor stopped by the user.
  pObj->userDropped = 0;
  pObj->userStopped = 0;

  // Creation and last-update timestamps start out identical.
  pObj->createTime = taosGetTimestampMs();
  pObj->updateTime = pObj->createTime;

  mstLogSStreamObj("create stream", pObj);
}
197,380✔
219

220
/**
 * Add the creation of a stream's output super table to a transaction.
 *
 * Builds an SMCreateStbReq named "<outDB>.<outTblName>" from the stream's
 * output columns and tags (a single UBIGINT "group_id" tag when the stream has
 * no output tags), validates it, checks that the super table does not already
 * exist and that the single-stable-mode restriction of the database is
 * respected, then appends the super table to pTrans.
 *
 * @param pMnode  mnode instance.
 * @param pTrans  transaction receiving the create-stb actions.
 * @param pStream stream create request describing the output table.
 * @param user    not used.
 * @return 0 on success; otherwise the error code of the failing step. Failures
 *         of mndGetNumOfStbs(), mndBuildStbFromReq() and mndAddStbToTrans() are
 *         propagated (previously they were reported as success).
 */
static int32_t mndStreamCreateOutStb(SMnode *pMnode, STrans *pTrans, const SCMCreateStreamReq *pStream, const char *user) {
  SStbObj *pStb = NULL;
  SDbObj  *pDb = NULL;
  int32_t  code = 0;
  int32_t  lino = 0;

  SMCreateStbReq createReq = {0};
  TAOS_STRNCAT(createReq.name, pStream->outDB, TSDB_DB_FNAME_LEN);
  TAOS_STRNCAT(createReq.name, ".", 2);
  TAOS_STRNCAT(createReq.name,  pStream->outTblName, TSDB_TABLE_NAME_LEN);
  createReq.numOfColumns = taosArrayGetSize(pStream->outCols);
  createReq.numOfTags = pStream->outTags ? taosArrayGetSize(pStream->outTags) : 1;
  createReq.pColumns = taosArrayInit_s(sizeof(SFieldWithOptions), createReq.numOfColumns);
  TSDB_CHECK_NULL(createReq.pColumns, code, lino, _OVER, terrno);

  // build fields
  for (int32_t i = 0; i < createReq.numOfColumns; i++) {
    SFieldWithOptions *pField = taosArrayGet(createReq.pColumns, i);
    TSDB_CHECK_NULL(pField, code, lino, _OVER, terrno);
    SFieldWithOptions *pSrc = taosArrayGet(pStream->outCols, i);

    tstrncpy(pField->name, pSrc->name, TSDB_COL_NAME_LEN);
    pField->flags = pSrc->flags;
    pField->type = pSrc->type;
    pField->bytes = pSrc->bytes;
    pField->compress = createDefaultColCmprByType(pField->type);
    if (IS_DECIMAL_TYPE(pField->type)) {
      // decimal columns additionally carry their type modifier
      pField->typeMod = pSrc->typeMod;
      pField->flags |= COL_HAS_TYPE_MOD;
    }
  }

  if (NULL == pStream->outTags) {
    createReq.numOfTags = 1;
    createReq.pTags = taosArrayInit_s(sizeof(SField), 1);
    TSDB_CHECK_NULL(createReq.pTags, code, lino, _OVER, terrno);

    // build tags
    SField *pField = taosArrayGet(createReq.pTags, 0);
    TSDB_CHECK_NULL(pField, code, lino, _OVER, terrno);

    tstrncpy(pField->name, "group_id", sizeof(pField->name));
    pField->type = TSDB_DATA_TYPE_UBIGINT;
    pField->flags = 0;
    pField->bytes = 8;
  } else {
    createReq.numOfTags = taosArrayGetSize(pStream->outTags);
    createReq.pTags = taosArrayInit_s(sizeof(SField), createReq.numOfTags);
    TSDB_CHECK_NULL(createReq.pTags, code, lino, _OVER, terrno);

    for (int32_t i = 0; i < createReq.numOfTags; i++) {
      SField *pField = taosArrayGet(createReq.pTags, i);
      if (pField == NULL) {
        continue;
      }

      TAOS_FIELD_E *pSrc = taosArrayGet(pStream->outTags, i);
      pField->bytes = pSrc->bytes;
      pField->flags = 0;
      pField->type = pSrc->type;
      tstrncpy(pField->name, pSrc->name, TSDB_COL_NAME_LEN);
    }
  }

  if ((code = mndCheckCreateStbReq(&createReq)) != 0) {
    goto _OVER;
  }

  pStb = mndAcquireStb(pMnode, createReq.name);
  if (pStb != NULL) {
    code = TSDB_CODE_MND_STB_ALREADY_EXIST;
    goto _OVER;
  }

  pDb = mndAcquireDbByStb(pMnode, createReq.name);
  if (pDb == NULL) {
    code = TSDB_CODE_MND_DB_NOT_SELECTED;
    goto _OVER;
  }

  int32_t numOfStbs = -1;
  // Keep the callee's result: jumping to _OVER with code == 0 would make the
  // caller believe the super table had been scheduled.
  code = mndGetNumOfStbs(pMnode, pDb->name, &numOfStbs);
  if (code != 0) {
    lino = __LINE__;
    goto _OVER;
  }

  if (pDb->cfg.numOfStables == 1 && numOfStbs != 0) {
    code = TSDB_CODE_MND_SINGLE_STB_MODE_DB;
    goto _OVER;
  }

  SStbObj stbObj = {0};

  code = mndBuildStbFromReq(pMnode, &stbObj, &createReq, pDb);
  if (code != 0) {
    lino = __LINE__;
    goto _OVER;
  }

  stbObj.uid = pStream->outStbUid;

  code = mndAddStbToTrans(pMnode, pTrans, pDb, &stbObj);
  if (code < 0) {
    lino = __LINE__;
    mndFreeStb(&stbObj);
    goto _OVER;
  }
  // The original check treated every non-negative result as success.
  code = TSDB_CODE_SUCCESS;

  mDebug("stream:%s create dst stable:%s, cols:%d", pStream->name, pStream->outTblName, createReq.numOfColumns);

  tFreeSMCreateStbReq(&createReq);
  mndFreeStb(&stbObj);
  mndReleaseStb(pMnode, pStb);
  mndReleaseDb(pMnode, pDb);
  return code;

_OVER:
  tFreeSMCreateStbReq(&createReq);
  mndReleaseStb(pMnode, pStb);
  mndReleaseDb(pMnode, pDb);

  mDebug("stream:%s failed to create dst stable:%s, line:%d code:%s", pStream->name, pStream->outTblName, lino,
         tstrerror(code));
  return code;
}
340

341
/**
 * Add the creation of a stream's output normal table to a transaction.
 *
 * Steps: resolve the output database, register the db/table on the transaction
 * and check for conflicts, validate the target vgroup (must exist and belong
 * to that database), build an SVCreateTbReq from the stream's output columns,
 * serialize it as a one-element batch into a TDMT_VND_CREATE_TABLE redo
 * action, and finally append a matching TDMT_VND_DROP_TABLE undo action.
 *
 * Ownership: each serialized message buffer is freed here when appending its
 * action fails; after a successful append it is not freed by this function.
 *
 * @param pMnode  mnode instance.
 * @param pTrans  transaction receiving the redo/undo actions.
 * @param pStream stream create request (outDB, outTblName, outTblVgId, outCols).
 * @return 0 on success, otherwise the error code of the failing step.
 */
static int32_t mndStreamCreateOutTable(SMnode *pMnode, STrans *pTrans, const SCMCreateStreamReq *pStream) {
  int32_t  code = 0;
  int32_t  lino = 0;
  SVgObj  *pVgroup = NULL;
  SDbObj  *pDb = NULL;
  SName    name = {0};
  char     dbFName[TSDB_DB_FNAME_LEN] = {0};
  // Must be zero-initialized before the first `goto _OVER`: the cleanup code
  // inspects and frees its pointer members, and a goto that jumps over the
  // declaration would leave them indeterminate.
  SVCreateTbReq createReq = {0};

  // Parse database and table name
  if ((code = tNameFromString(&name, pStream->outDB, T_NAME_ACCT | T_NAME_DB)) != 0) {
    mError("stream:%s failed to parse outDB:%s, code:%s", pStream->name, pStream->outDB, tstrerror(code));
    return code;
  }
  if ((code = tNameGetFullDbName(&name, dbFName)) != 0) {
    mError("stream:%s failed to get full db name, code:%s", pStream->name, tstrerror(code));
    return code;
  }

  // Get database object
  pDb = mndAcquireDb(pMnode, dbFName);
  if (pDb == NULL) {
    code = TSDB_CODE_MND_DB_NOT_SELECTED;
    mError("stream:%s failed to acquire db:%s, code:%s", pStream->name, dbFName, tstrerror(code));
    return code;
  }

  // Set transaction db name and check conflict (similar to mndAddStbToTrans)
  mndTransSetDbName(pTrans, pDb->name, pStream->outTblName);
  code = mndTransCheckConflict(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS) {
    mError("stream:%s failed to check conflict, code:%s", pStream->name, tstrerror(code));
    goto _OVER;
  }

  // Get vgroup by vgId
  if (pStream->outTblVgId <= 0) {
    mError("stream:%s invalid outTblVgId:%d", pStream->name, pStream->outTblVgId);
    code = TSDB_CODE_MND_VGROUP_NOT_EXIST;
    goto _OVER;
  }

  pVgroup = mndAcquireVgroup(pMnode, pStream->outTblVgId);
  if (pVgroup == NULL) {
    code = TSDB_CODE_MND_VGROUP_NOT_EXIST;
    mError("stream:%s failed to acquire vgroup:%d, code:%s", pStream->name, pStream->outTblVgId, tstrerror(code));
    goto _OVER;
  }

  // Verify vgroup belongs to the database
  if (pVgroup->dbUid != pDb->uid) {
    code = TSDB_CODE_MND_VGROUP_NOT_EXIST;
    mError("stream:%s vgroup:%d does not belong to db:%s", pStream->name, pStream->outTblVgId, dbFName);
    goto _OVER;
  }

  // Build SVCreateTbReq (reusing logic from buildNormalTableCreateReq)
  createReq.type = TSDB_NORMAL_TABLE;
  createReq.flags = TD_CREATE_NORMAL_TB_IN_STREAM | TD_CREATE_IF_NOT_EXISTS;
  createReq.uid = mndGenerateUid(pStream->outTblName, strlen(pStream->outTblName));
  createReq.btime = taosGetTimestampMs();
  createReq.ttl = TSDB_DEFAULT_TABLE_TTL;
  createReq.commentLen = -1;
  createReq.name = taosStrdup(pStream->outTblName);
  if (createReq.name == NULL) {
    code = terrno;
    goto _OVER;
  }

  // Build schema from outCols (same logic as buildNormalTableCreateReq)
  int32_t numOfCols = taosArrayGetSize(pStream->outCols);
  createReq.ntb.schemaRow.nCols = numOfCols;
  createReq.ntb.schemaRow.version = 1;
  createReq.ntb.schemaRow.pSchema = taosMemoryCalloc(numOfCols, sizeof(SSchema));
  if (createReq.ntb.schemaRow.pSchema == NULL) {
    code = terrno;
    goto _OVER;
  }

  for (int32_t i = 0; i < numOfCols; i++) {
    SFieldWithOptions *pField = taosArrayGet(pStream->outCols, i);
    if (pField == NULL) {
      code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      goto _OVER;
    }

    // column ids are assigned sequentially, starting at 1
    createReq.ntb.schemaRow.pSchema[i].colId = i + 1;
    createReq.ntb.schemaRow.pSchema[i].type = pField->type;
    createReq.ntb.schemaRow.pSchema[i].bytes = pField->bytes;
    createReq.ntb.schemaRow.pSchema[i].flags = pField->flags;
    tstrncpy(createReq.ntb.schemaRow.pSchema[i].name, pField->name, TSDB_COL_NAME_LEN);

    if (IS_DECIMAL_TYPE(pField->type)) {
      // the extended schema array is allocated lazily, on the first decimal column
      if (createReq.pExtSchemas == NULL) {
        createReq.pExtSchemas = taosMemoryCalloc(numOfCols, sizeof(SExtSchema));
        if (createReq.pExtSchemas == NULL) {
          code = terrno;
          goto _OVER;
        }
      }
      createReq.pExtSchemas[i].typeMod = pField->typeMod;
    }
  }

  // Initialize colCmpr with default encode/compress/level per column type
  code = tInitDefaultSColCmprWrapperByCols(&createReq.colCmpr, numOfCols);
  if (code != TSDB_CODE_SUCCESS) {
    goto _OVER;
  }
  for (int32_t i = 0; i < numOfCols; i++) {
    SSchema *pSchema = &createReq.ntb.schemaRow.pSchema[i];
    createReq.colCmpr.pColCmpr[i].id = pSchema->colId;
    createReq.colCmpr.pColCmpr[i].alg = createDefaultColCmprByType(pSchema->type);
  }

  // Build SVCreateTbBatchReq (vnode expects batch request)
  SVCreateTbBatchReq batchReq = {0};
  batchReq.nReqs = 1;
  batchReq.pArray = taosArrayInit(1, sizeof(SVCreateTbReq));
  if (batchReq.pArray == NULL) {
    code = terrno;
    goto _OVER;
  }
  if (taosArrayPush(batchReq.pArray, &createReq) == NULL) {
    code = terrno;
    taosArrayDestroy(batchReq.pArray);
    goto _OVER;
  }
  batchReq.source = TD_REQ_FROM_APP;

  // Serialize the batch request
  int32_t contLen = 0;
  int32_t ret = 0;
  tEncodeSize(tEncodeSVCreateTbBatchReq, &batchReq, contLen, ret);
  if (ret < 0) {
    code = terrno;
    taosArrayDestroy(batchReq.pArray);
    goto _OVER;
  }

  contLen += sizeof(SMsgHead);

  SMsgHead *pHead = taosMemoryCalloc(1, contLen);
  if (pHead == NULL) {
    code = terrno;
    taosArrayDestroy(batchReq.pArray);
    goto _OVER;
  }
  pHead->contLen = htonl(contLen);
  pHead->vgId = htonl(pVgroup->vgId);

  SEncoder encoder = {0};
  void *pBuf = POINTER_SHIFT(pHead, sizeof(SMsgHead));
  tEncoderInit(&encoder, pBuf, contLen - sizeof(SMsgHead));
  code = tEncodeSVCreateTbBatchReq(&encoder, &batchReq);
  tEncoderClear(&encoder);
  taosArrayDestroy(batchReq.pArray);
  if (code < 0) {
    taosMemoryFree(pHead);
    goto _OVER;
  }

  // Add to transaction redo actions
  STransAction action = {0};
  action.mTraceId = pTrans->mTraceId;
  action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
  action.pCont = pHead;
  action.contLen = contLen;
  action.msgType = TDMT_VND_CREATE_TABLE;
  action.acceptableCode = TSDB_CODE_TDB_TABLE_ALREADY_EXIST;
  action.retryCode = TSDB_CODE_TDB_TABLE_NOT_EXIST;
  action.groupId = pVgroup->vgId;

  code = mndTransAppendRedoAction(pTrans, &action);
  if (code != TSDB_CODE_SUCCESS) {
    taosMemoryFree(pHead);
    goto _OVER;
  }

  // Build undo action (drop table if transaction fails)
  SVDropTbReq dropReq = {0};
  dropReq.name = createReq.name;  // vnode metaCheckDropTableReq requires name
  dropReq.uid = createReq.uid;
  dropReq.igNotExists = 1;  // Ignore if table doesn't exist
  dropReq.isVirtual = 0;

  SVDropTbBatchReq dropBatchReq = {0};
  dropBatchReq.nReqs = 1;
  dropBatchReq.pArray = taosArrayInit(1, sizeof(SVDropTbReq));
  if (dropBatchReq.pArray == NULL) {
    code = terrno;
    goto _OVER;
  }
  if (taosArrayPush(dropBatchReq.pArray, &dropReq) == NULL) {
    code = terrno;
    taosArrayDestroy(dropBatchReq.pArray);
    goto _OVER;
  }

  // Serialize drop batch request
  int32_t dropContLen = 0;
  int32_t dropRet = 0;
  tEncodeSize(tEncodeSVDropTbBatchReq, &dropBatchReq, dropContLen, dropRet);
  if (dropRet < 0) {
    code = terrno;
    taosArrayDestroy(dropBatchReq.pArray);
    goto _OVER;
  }

  dropContLen += sizeof(SMsgHead);
  SMsgHead *pDropHead = taosMemoryCalloc(1, dropContLen);
  if (pDropHead == NULL) {
    code = terrno;
    taosArrayDestroy(dropBatchReq.pArray);
    goto _OVER;
  }
  pDropHead->contLen = htonl(dropContLen);
  pDropHead->vgId = htonl(pVgroup->vgId);

  SEncoder dropEncoder = {0};
  void *pDropBuf = POINTER_SHIFT(pDropHead, sizeof(SMsgHead));
  tEncoderInit(&dropEncoder, pDropBuf, dropContLen - sizeof(SMsgHead));
  code = tEncodeSVDropTbBatchReq(&dropEncoder, &dropBatchReq);
  tEncoderClear(&dropEncoder);
  taosArrayDestroy(dropBatchReq.pArray);
  if (code < 0) {
    taosMemoryFree(pDropHead);
    goto _OVER;
  }

  // Add undo action
  STransAction undoAction = {0};
  undoAction.epSet = mndGetVgroupEpset(pMnode, pVgroup);
  undoAction.pCont = pDropHead;
  undoAction.contLen = dropContLen;
  undoAction.msgType = TDMT_VND_DROP_TABLE;
  undoAction.acceptableCode = TSDB_CODE_TDB_TABLE_NOT_EXIST;

  code = mndTransAppendUndoAction(pTrans, &undoAction);
  if (code != TSDB_CODE_SUCCESS) {
    taosMemoryFree(pDropHead);
    goto _OVER;
  }

  mInfo("stream:%s created output normal table:%s in vgroup:%d", pStream->name, pStream->outTblName, pVgroup->vgId);

_OVER:
  // Free resources (note: pHead is owned by transaction, don't free it here)
  if (createReq.name) taosMemoryFree(createReq.name);
  if (createReq.ntb.schemaRow.pSchema) taosMemoryFree(createReq.ntb.schemaRow.pSchema);
  if (createReq.pExtSchemas) taosMemoryFree(createReq.pExtSchemas);
  if (createReq.colCmpr.pColCmpr) taosMemoryFreeClear(createReq.colCmpr.pColCmpr);

  mndReleaseVgroup(pMnode, pVgroup);
  mndReleaseDb(pMnode, pDb);

  if (code != TSDB_CODE_SUCCESS) {
    mError("stream:%s failed to create output normal table:%s, line:%d code:%s", pStream->name,
              pStream->outTblName, lino, tstrerror(code));
  }

  return code;
}
604

605
/**
 * Validate a create-stream request before it is executed.
 *
 * For each database named by the request (stream db, trigger db, every calc
 * db, output db) the requesting user's MND_OPER_USE_DB privilege is checked;
 * TSDB_CODE_MND_NO_RIGHTS is reported as
 * TSDB_CODE_PAR_DB_USE_PERMISSION_DENIED. Afterwards the number of existing
 * streams is compared against MND_STREAM_MAX_NUM.
 *
 * @param pMnode  mnode instance.
 * @param pReq    rpc message; supplies the user and token.
 * @param pCreate create request being validated.
 * @return 0 when all checks pass, otherwise the first failing check's code.
 */
static int32_t mndStreamValidateCreate(SMnode *pMnode, SRpcMsg *pReq, SCMCreateStreamReq* pCreate) {
  int32_t code = 0, lino = 0;
  int64_t streamId = pCreate->streamId;
  char   *pUser = RPC_MSG_USER(pReq);

  // stream db
  if (pCreate->streamDB) {
    // code = mndCheckDbPrivilegeByName(pMnode, pUser, MND_OPER_WRITE_DB, pCreate->streamDB);
    code = mndCheckDbPrivilegeByName(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_USE_DB,
                                     pCreate->streamDB, false);
    if (code != 0) {
      code = (code == TSDB_CODE_MND_NO_RIGHTS) ? TSDB_CODE_PAR_DB_USE_PERMISSION_DENIED : code;
      mstsError("user %s failed to create stream %s in db %s since %s", pUser, pCreate->name, pCreate->streamDB,
                tstrerror(code));
      TSDB_CHECK_CODE(code, lino, _OVER);
    }
  }

  // trigger db
  if (pCreate->triggerDB) {
    // code = mndCheckDbPrivilegeByName(pMnode, pUser, MND_OPER_READ_DB, pCreate->triggerDB);
    code = mndCheckDbPrivilegeByName(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_USE_DB,
                                     pCreate->triggerDB, false);
    if (code != 0) {
      code = (code == TSDB_CODE_MND_NO_RIGHTS) ? TSDB_CODE_PAR_DB_USE_PERMISSION_DENIED : code;
      mstsError("user %s failed to create stream %s using trigger db %s since %s", pUser, pCreate->name,
                pCreate->triggerDB, tstrerror(code));
      TSDB_CHECK_CODE(code, lino, _OVER);
    }
#if 0  // TODO check the owner of trigger table
    if (pCreate->triggerTblName) {
      // check trigger table privilege
      code = mndCheckObjPrivilegeRecF(pMnode, pUser, PRIV_TBL_SELECT, "", pCreate->triggerDB, pCreate->triggerTblName);
      if (code) {
        mstsError("user %s failed to create stream %s using trigger table %s.%s since %s", pUser, pCreate->name,
                  pCreate->triggerDB, pCreate->triggerTblName, tstrerror(code));
      }
      TSDB_CHECK_CODE(code, lino, _OVER);
    }
#endif
  }

  // calc dbs: every entry must pass
  if (pCreate->calcDB) {
    int32_t dbNum = taosArrayGetSize(pCreate->calcDB);
    int32_t idx = 0;
    while (idx < dbNum) {
      char *calcDB = taosArrayGetP(pCreate->calcDB, idx);
      // code = mndCheckDbPrivilegeByName(pMnode, pUser, MND_OPER_READ_DB, calcDB);
      code = mndCheckDbPrivilegeByName(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_USE_DB, calcDB, false);
      if (code != 0) {
        code = (code == TSDB_CODE_MND_NO_RIGHTS) ? TSDB_CODE_PAR_DB_USE_PERMISSION_DENIED : code;
        mstsError("user %s failed to create stream %s using calcDB %s since %s", pUser, pCreate->name, calcDB,
                  tstrerror(code));
        TSDB_CHECK_CODE(code, lino, _OVER);
      }
      ++idx;
    }
  }

  // output db
  if (pCreate->outDB) {
    // code = mndCheckDbPrivilegeByName(pMnode, pUser, MND_OPER_WRITE_DB, pCreate->outDB);
    code = mndCheckDbPrivilegeByName(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_USE_DB, pCreate->outDB,
                                     false);
    if (code != 0) {
      code = (code == TSDB_CODE_MND_NO_RIGHTS) ? TSDB_CODE_PAR_DB_USE_PERMISSION_DENIED : code;
      mstsError("user %s failed to create stream %s using out db %s since %s", pUser, pCreate->name, pCreate->outDB,
                tstrerror(code));
      TSDB_CHECK_CODE(code, lino, _OVER);
    }
  }

  // global limit on the number of streams
  int32_t streamNum = sdbGetSize(pMnode->pSdb, SDB_STREAM);
  if (streamNum > MND_STREAM_MAX_NUM) {
    code = TSDB_CODE_MND_TOO_MANY_STREAMS;
    mstsError("failed to create stream %s since %s, stream number:%d", pCreate->name, tstrerror(code), streamNum);
  }

_OVER:

  return code;
}
683

684
/**
 * Drop every stream whose stream database is pDb, as part of pTrans.
 *
 * For each matching stream this updates its updateTime, marks it as dropped by
 * the user, records the drop event timestamp, undeploys it from the stream
 * manager and appends a SDB_STATUS_DROPPED action to the transaction.
 *
 * @param pMnode mnode instance.
 * @param pTrans transaction (typically the drop-db transaction) to append to.
 * @param pDb    database being dropped; matched by name against
 *               pCreate->streamDB.
 * @return 0 on success, or the error from mndStreamTransAppend(); in that case
 *         the iteration is cancelled and the current object released.
 */
int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
  SSdb   *pSdb = pMnode->pSdb;
  void   *pIter = NULL;
  int32_t code = 0;

  for (;;) {
    SStreamObj *pStream = NULL;
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
    if (pIter == NULL) break;

    // streamDB is treated as optional elsewhere in this file and pCreate may
    // be NULL on a partially built object, so guard both before strcmp().
    const char *streamDb = (pStream->pCreate != NULL) ? pStream->pCreate->streamDB : NULL;

    if (streamDb != NULL && 0 == strcmp(streamDb, pDb->name)) {
      mInfo("start to drop stream %s in db %s", pStream->pCreate->name, pDb->name);

      pStream->updateTime = taosGetTimestampMs();

      atomic_store_8(&pStream->userDropped, 1);

      MND_STREAM_SET_LAST_TS(STM_EVENT_DROP_STREAM, pStream->updateTime);

      msmUndeployStream(pMnode, pStream->pCreate->streamId, pStream->pCreate->name);

      // drop stream
      code = mndStreamTransAppend(pStream, pTrans, SDB_STATUS_DROPPED);
      if (code) {
        mError("drop db trans:%d failed to append drop stream trans since %s", pTrans->id, tstrerror(code));
        sdbRelease(pSdb, pStream);
        sdbCancelFetch(pSdb, pIter);
        TAOS_RETURN(code);
      }
    }

    sdbRelease(pSdb, pStream);
  }

  return 0;
}
720

721
/**
 * Show-retrieve callback that lists streams.
 *
 * Iterates SDB_STREAM from pShow->pIter and writes one row per visible stream
 * into pBlock until `rows` rows are produced or the iteration ends. A stream
 * is visible when the user holds the global PRIV_CM_SHOW privilege on streams,
 * or passes the per-stream privilege check. A stream whose row could not be
 * written is skipped without failing the call.
 *
 * @param pReq   rpc message; supplies the mnode and the requesting user.
 * @param pShow  show context; pIter and numOfRows are updated.
 * @param pBlock output data block.
 * @param rows   maximum number of rows to produce.
 * @return number of rows written, or an error code when the user could not be
 *         acquired.
 */
static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode     *pMnode = pReq->info.node;
  SSdb       *pSdb = pMnode->pSdb;
  SUserObj   *pOperUser = NULL;
  SStreamObj *pStream = NULL;
  int32_t     numOfRows = 0;
  int32_t     code = 0, lino = 0;
  bool        showAll = false;

  TAOS_CHECK_EXIT(mndAcquireUser(pMnode, RPC_MSG_USER(pReq), &pOperUser));
  showAll =
      (0 == mndCheckObjPrivilegeRec(pMnode, pOperUser, PRIV_CM_SHOW, PRIV_OBJ_STREAM, 0, pOperUser->acctId, "*", "*"));

  while (numOfRows < rows) {
    pShow->pIter = sdbFetch(pSdb, SDB_STREAM, pShow->pIter, (void **)&pStream);
    if (pShow->pIter == NULL) {
      break;
    }

    // The per-stream check only runs when the user lacks the global privilege.
    bool visible = showAll ||
                   (0 == mndCheckObjPrivilegeRecF(pMnode, pOperUser, PRIV_CM_SHOW, PRIV_OBJ_STREAM, pStream->ownerId,
                                                  pStream->pCreate->streamDB, mndGetStableStr(pStream->pCreate->name)));
    if (visible) {
      code = mstSetStreamAttrResBlock(pMnode, pStream, pBlock, numOfRows);
      if (code == 0) {
        numOfRows++;
      }
    }

    sdbRelease(pSdb, pStream);
  }

  // Per-row failures are not reported to the caller.
  code = 0;
  pShow->numOfRows += numOfRows;

_exit:
  mndReleaseUser(pMnode, pOperUser);
  if (code != 0) {
    mError("failed to retrieve stream list at line %d since %s", lino, tstrerror(code));
    TAOS_RETURN(code);
  }
  return numOfRows;
}
762

UNCOV
763
/* Show free-iter handler for TSDB_MGMT_TABLE_STREAMS: cancels a pending SDB_STREAM fetch. */
static void mndCancelGetNextStream(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_STREAM);
}
×
767

768
/*
 * Show-retrieve handler registered for TSDB_MGMT_TABLE_STREAM_TASKS.
 *
 * Iterates the SDB_STREAM table from pShow->pIter and lets mstSetStreamTasksResBlock append the task
 * rows of each stream to pBlock until rowsCapacity rows are produced or the table is exhausted.
 *
 * Returns the number of rows written. The status of mstSetStreamTasksResBlock is not propagated.
 */
static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
  SMnode     *pMnode = pReq->info.node;
  SSdb       *pSdb = pMnode->pSdb;
  SStreamObj *pStream = NULL;
  int32_t     code = 0;
  int32_t     numOfRows = 0;

  // The capacity test runs first, so no stream is fetched once the block is full.
  while (numOfRows < rowsCapacity &&
         NULL != (pShow->pIter = sdbFetch(pSdb, SDB_STREAM, pShow->pIter, (void **)&pStream))) {
    code = mstSetStreamTasksResBlock(pStream, pBlock, &numOfRows, rowsCapacity);
    sdbRelease(pSdb, pStream);
  }

  pShow->numOfRows += numOfRows;
  return numOfRows;
}
789

UNCOV
790
/* Show free-iter handler for TSDB_MGMT_TABLE_STREAM_TASKS: cancels a pending SDB_STREAM fetch. */
static void mndCancelGetNextStreamTask(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_STREAM);
}
×
794

795
/*
 * Show-retrieve handler registered for TSDB_MGMT_TABLE_STREAM_RECALCULATES.
 *
 * Iterates the SDB_STREAM table from pShow->pIter and lets mstSetStreamRecalculatesResBlock append the
 * rows of each stream to pBlock until rowsCapacity rows are produced or the table is exhausted.
 *
 * Returns the number of rows written. The status of mstSetStreamRecalculatesResBlock is not propagated.
 */
static int32_t mndRetrieveStreamRecalculates(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
  SMnode     *pMnode = pReq->info.node;
  SSdb       *pSdb = pMnode->pSdb;
  SStreamObj *pStream = NULL;
  int32_t     code = 0;
  int32_t     numOfRows = 0;

  // The capacity test runs first, so no stream is fetched once the block is full.
  while (numOfRows < rowsCapacity &&
         NULL != (pShow->pIter = sdbFetch(pSdb, SDB_STREAM, pShow->pIter, (void **)&pStream))) {
    code = mstSetStreamRecalculatesResBlock(pStream, pBlock, &numOfRows, rowsCapacity);
    sdbRelease(pSdb, pStream);
  }

  pShow->numOfRows += numOfRows;
  return numOfRows;
}
816

UNCOV
817
/* Show free-iter handler for TSDB_MGMT_TABLE_STREAM_RECALCULATES: cancels a pending SDB_STREAM fetch. */
static void mndCancelGetNextStreamRecalculates(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_STREAM);
}
×
821

822

823
/*
 * sdbTraverse callback used by mndStreamUpdateTagsRefFlag.
 *
 * pObj is the visited stream, p1 points to the super table uid, p2 is the tag schema array and p3 points
 * to the number of tags. For a live stream triggered on that super table, every tag whose colId appears
 * in the stream's partition columns gets COL_REF_BY_STM set in its flags.
 *
 * Always returns true, including when the partition column string cannot be parsed.
 */
static bool mndStreamUpdateTagsFlag(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
  SStreamObj *pStream = pObj;

  if (atomic_load_8(&pStream->userDropped)) {
    return true;
  }

  // Only super-table, child-table and virtual-child-table triggers are considered.
  bool tableTrigger = (TSDB_SUPER_TABLE == pStream->pCreate->triggerTblType) ||
                      (TSDB_CHILD_TABLE == pStream->pCreate->triggerTblType) ||
                      (TSDB_VIRTUAL_CHILD_TABLE == pStream->pCreate->triggerTblType);
  if (!tableTrigger) {
    return true;
  }

  if (pStream->pCreate->triggerTblSuid != *(uint64_t *)p1 || NULL == pStream->pCreate->partitionCols) {
    return true;
  }

  SNodeList *pPartCols = NULL;
  int32_t    code = nodesStringToList(pStream->pCreate->partitionCols, &pPartCols);
  if (code) {
    nodesDestroyList(pPartCols);
    mstError("partitionCols [%s] nodesStringToList failed with error:%s", (char*)pStream->pCreate->partitionCols, tstrerror(code));
    return true;
  }

  SSchema *pTags = (SSchema *)p2;
  int32_t  numOfTags = *(int32_t *)p3;

  SNode *pNode = NULL;
  FOREACH(pNode, pPartCols) {
    SColumnNode *pCol = (SColumnNode *)pNode;
    // Flag the first tag with a matching column id, then move on to the next partition column.
    for (int32_t i = 0; i < numOfTags; ++i) {
      if (pTags[i].colId != pCol->colId) {
        continue;
      }
      pTags[i].flags |= COL_REF_BY_STM;
      break;
    }
  }

  nodesDestroyList(pPartCols);

  return true;
}
869

870

871
/*
 * Set COL_REF_BY_STM on the entries of pTags (tagNum entries) that are referenced by the partition
 * columns of streams triggered on super table `suid`. Does nothing when no stream exists.
 */
void mndStreamUpdateTagsRefFlag(SMnode *pMnode, int64_t suid, SSchema* pTags, int32_t tagNum) {
  if (sdbGetSize(pMnode->pSdb, SDB_STREAM) > 0) {
    sdbTraverse(pMnode->pSdb, SDB_STREAM, mndStreamUpdateTagsFlag, &suid, pTags, &tagNum);
  }
}
879

880
/*
 * Handler for TDMT_MND_STOP_STREAM.
 *
 * Steps: decode the request, look up the stream, check the USE-DB and STOP privileges of the requesting
 * user, refuse when the stream is being dropped, then mark the stream user-stopped, undeploy it and
 * persist the new state through a transaction.
 *
 * Returns TSDB_CODE_ACTION_IN_PROGRESS once the transaction has been prepared, 0 when the stream does
 * not exist and igNotExists is set, otherwise an error code.
 */
static int32_t mndProcessStopStreamReq(SRpcMsg *pReq) {
  SMnode     *pMnode = pReq->info.node;
  SStreamObj *pStream = NULL;
  SUserObj   *pOperUser = NULL;
  int32_t     code = 0;

  SMPauseStreamReq pauseReq = {0};
  if (tDeserializeSMPauseStreamReq(pReq->pCont, pReq->contLen, &pauseReq) < 0) {
    TAOS_RETURN(TSDB_CODE_INVALID_MSG);
  }

  code = mndAcquireStream(pMnode, pauseReq.name, &pStream);
  if (pStream == NULL || code != 0) {
    if (pauseReq.igNotExists) {
      mInfo("stream:%s, not exist, not stop stream", pauseReq.name);
      taosMemoryFree(pauseReq.name);
      return 0;
    } else {
      mError("stream:%s not exist, failed to stop stream", pauseReq.name);
      taosMemoryFree(pauseReq.name);
      TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
    }
  }

  // The request name is only needed for the lookup; pStream->name is used from here on.
  taosMemoryFree(pauseReq.name);

  int64_t streamId = pStream->pCreate->streamId;

  mstsInfo("start to stop stream %s", pStream->name);

  if ((code = mndAcquireUser(pMnode, RPC_MSG_USER(pReq), &pOperUser))) {
    mstsError("user %s failed to stop stream %s since %s", RPC_MSG_USER(pReq), pStream->name, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  // Database-level check first; a generic "no rights" is reported as a USE-permission failure.
  if ((code = mndCheckDbPrivilegeByName(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_USE_DB,
                                        pStream->pCreate->streamDB, false))) {
    if (code == TSDB_CODE_MND_NO_RIGHTS) code = TSDB_CODE_PAR_DB_USE_PERMISSION_DENIED;
  }
  // The object-level STOP privilege is only evaluated when the database check passed.
  if ((code != TSDB_CODE_SUCCESS) ||
      (code = mndCheckObjPrivilegeRecF(pMnode, pOperUser, PRIV_CM_STOP, PRIV_OBJ_STREAM, pStream->ownerId,
                                       pStream->pCreate->streamDB, mndGetStableStr(pStream->pCreate->name)))) {
    mstsError("user %s failed to stop stream %s since %s", RPC_MSG_USER(pReq), pStream->name, tstrerror(code));
    mndReleaseUser(pMnode, pOperUser);
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  mndReleaseUser(pMnode, pOperUser);  // release user after privilege check

  if (atomic_load_8(&pStream->userDropped)) {
    code = TSDB_CODE_MND_STREAM_DROPPING;
    mstsError("user %s failed to stop stream %s since %s", RPC_MSG_USER(pReq), pStream->name, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  STrans *pTrans = NULL;
  code = mndStreamCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_STOP_NAME, &pTrans);
  if (pTrans == NULL || code) {
    mstsError("failed to stop stream %s since %s", pStream->name, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  // The in-memory state is changed before the append so that the encoded object carries it.
  pStream->updateTime = taosGetTimestampMs();

  atomic_store_8(&pStream->userStopped, 1);

  MND_STREAM_SET_LAST_TS(STM_EVENT_STOP_STREAM, pStream->updateTime);

  msmUndeployStream(pMnode, streamId, pStream->name);

  // stop stream
  code = mndStreamTransAppend(pStream, pTrans, SDB_STATUS_READY);
  if (code != TSDB_CODE_SUCCESS) {
    // mndStreamTransAppend drops pTrans itself on failure (the drop-stream handler in this file relies
    // on that), so calling mndTransDrop here again would release the transaction twice.
    mstsError("failed to stop stream %s since %s", pStream->name, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("trans:%d, failed to prepare stop stream trans since %s", pTrans->id, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  sdbRelease(pMnode->pSdb, pStream);
  mndTransDrop(pTrans);

  return TSDB_CODE_ACTION_IN_PROGRESS;
}
975

976

977
/*
 * Handler for TDMT_MND_START_STREAM.
 *
 * Steps: check the stream grant, decode the request, look up the stream, check the USE-DB and START
 * privileges of the requesting user, refuse when the stream is being dropped or is not stopped, then
 * clear the user-stopped flag, persist the new state through a transaction and post a deploy action.
 *
 * Returns TSDB_CODE_ACTION_IN_PROGRESS once the transaction has been prepared, 0 when the stream does
 * not exist and igNotExists is set, otherwise an error code.
 */
static int32_t mndProcessStartStreamReq(SRpcMsg *pReq) {
  SMnode     *pMnode = pReq->info.node;
  SStreamObj *pStream = NULL;
  SUserObj   *pOperUser = NULL;
  int32_t     code = 0;

  if ((code = grantCheckExpire(TSDB_GRANT_STREAMS)) < 0) {
    return code;
  }

  SMResumeStreamReq resumeReq = {0};
  if (tDeserializeSMResumeStreamReq(pReq->pCont, pReq->contLen, &resumeReq) < 0) {
    TAOS_RETURN(TSDB_CODE_INVALID_MSG);
  }

  code = mndAcquireStream(pMnode, resumeReq.name, &pStream);
  if (pStream == NULL || code != 0) {
    if (resumeReq.igNotExists) {
      mInfo("stream:%s not exist, not start stream", resumeReq.name);
      taosMemoryFree(resumeReq.name);
      sdbRelease(pMnode->pSdb, pStream);
      return 0;
    } else {
      mError("stream:%s not exist, failed to start stream", resumeReq.name);
      taosMemoryFree(resumeReq.name);
      TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
    }
  }

  // The request name is only needed for the lookup; pStream->name is used from here on.
  taosMemoryFree(resumeReq.name);

  int64_t streamId = pStream->pCreate->streamId;

  mstsInfo("start to start stream %s from stopped", pStream->name);

  if ((code = mndAcquireUser(pMnode, RPC_MSG_USER(pReq), &pOperUser))) {
    mstsError("user %s failed to start stream %s since %s", RPC_MSG_USER(pReq), pStream->name, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  // Database-level check first; a generic "no rights" is reported as a USE-permission failure.
  if ((code = mndCheckDbPrivilegeByName(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_USE_DB,
                                        pStream->pCreate->streamDB, false))) {
    if (code == TSDB_CODE_MND_NO_RIGHTS) code = TSDB_CODE_PAR_DB_USE_PERMISSION_DENIED;
  }
  // The object-level START privilege is only evaluated when the database check passed.
  if ((code != TSDB_CODE_SUCCESS) ||
      (code = mndCheckObjPrivilegeRecF(pMnode, pOperUser, PRIV_CM_START, PRIV_OBJ_STREAM, pStream->ownerId,
                                       pStream->pCreate->streamDB, mndGetStableStr(pStream->pCreate->name)))) {
    mstsError("user %s failed to start stream %s since %s", RPC_MSG_USER(pReq), pStream->name, tstrerror(code));
    mndReleaseUser(pMnode, pOperUser);
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  mndReleaseUser(pMnode, pOperUser);  // release user after privilege check

  if (atomic_load_8(&pStream->userDropped)) {
    code = TSDB_CODE_MND_STREAM_DROPPING;
    mstsError("user %s failed to start stream %s since %s", RPC_MSG_USER(pReq), pStream->name, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  if (0 == atomic_load_8(&pStream->userStopped)) {
    code = TSDB_CODE_MND_STREAM_NOT_STOPPED;
    mstsError("user %s failed to start stream %s since %s", RPC_MSG_USER(pReq), pStream->name, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  // NOTE(review): the flag is cleared before the transaction exists and is not restored on the failure
  // paths below - confirm that leaving it cleared after a failed start is intended.
  atomic_store_8(&pStream->userStopped, 0);

  pStream->updateTime = taosGetTimestampMs();

  MND_STREAM_SET_LAST_TS(STM_EVENT_START_STREAM, pStream->updateTime);

  STrans *pTrans = NULL;
  code = mndStreamCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_START_NAME, &pTrans);
  if (pTrans == NULL || code) {
    mstsError("failed to start stream %s since %s", pStream->name, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  code = mndStreamTransAppend(pStream, pTrans, SDB_STATUS_READY);
  if (code != TSDB_CODE_SUCCESS) {
    // mndStreamTransAppend drops pTrans itself on failure (the drop-stream handler in this file relies
    // on that), so calling mndTransDrop here again would release the transaction twice.
    mstsError("failed to start stream %s since %s", pStream->name, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mstsError("trans:%d, failed to prepare start stream %s trans since %s", pTrans->id, pStream->name, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  mstPostStreamAction(mStreamMgmt.actionQ, streamId, pStream->name, NULL, true, STREAM_ACT_DEPLOY);

  sdbRelease(pMnode->pSdb, pStream);
  mndTransDrop(pTrans);

  return TSDB_CODE_ACTION_IN_PROGRESS;
}
1084

1085
/*
 * Handler for TDMT_MND_DROP_STREAM. A request may name several streams; all of them are dropped through
 * one transaction.
 *
 * Steps: decode the request, acquire the requesting user, verify that every stream exists (unless
 * igNotExists is set), then for each stream check the USE-DB and DROP privileges, refuse streams whose
 * tsma still exists, mark the stream user-dropped, undeploy it and append it to the transaction.
 *
 * Returns TSDB_CODE_ACTION_IN_PROGRESS when at least one stream was added to the transaction,
 * TSDB_CODE_SUCCESS when none of the named streams exists, otherwise an error code.
 */
static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
  SMnode     *pMnode = pReq->info.node;
  SStreamObj *pStream = NULL;
  SUserObj   *pOperUser = NULL;
  int32_t     code = 0;
  int32_t     notExistNum = 0;

  SMDropStreamReq dropReq = {0};
  int64_t         tss = taosGetTimestampMs();
  if (tDeserializeSMDropStreamReq(pReq->pCont, pReq->contLen, &dropReq) < 0) {
    mError("invalid drop stream msg recv, discarded");
    code = TSDB_CODE_INVALID_MSG;
    TAOS_RETURN(code);
  }

  mDebug("recv drop stream msg, count:%d", dropReq.count);

  // Acquire user object for privilege check
  code = mndAcquireUser(pMnode, RPC_MSG_USER(pReq), &pOperUser);
  if (code != 0) {
    tFreeMDropStreamReq(&dropReq);
    TAOS_RETURN(code);
  }

  // check if all streams exist
  if (!dropReq.igNotExists) {
    for (int32_t i = 0; i < dropReq.count; i++) {
      if (!sdbCheckExists(pMnode->pSdb, SDB_STREAM, dropReq.name[i])) {
        mError("stream:%s not exist failed to drop it", dropReq.name[i]);
        mndReleaseUser(pMnode, pOperUser);
        tFreeMDropStreamReq(&dropReq);
        TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
      }
    }
  }

  // Create a single transaction for all stream drops
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq, MND_STREAM_DROP_NAME);
  if (pTrans == NULL) {
    mError("failed to create drop stream transaction since %s", tstrerror(terrno));
    code = terrno;
    mndReleaseUser(pMnode, pOperUser);
    tFreeMDropStreamReq(&dropReq);
    TAOS_RETURN(code);
  }
  pTrans->ableToBeKilled = true;

  // Process all streams and add them to the transaction
  for (int32_t i = 0; i < dropReq.count; i++) {
    char *streamName = dropReq.name[i];
    mDebug("drop stream[%d/%d]: %s", i + 1, dropReq.count, streamName);

    code = mndAcquireStream(pMnode, streamName, &pStream);
    if (pStream == NULL || code != 0) {
      mWarn("stream:%s not exist, ignore not exist is set, drop stream exec done with success", streamName);
      sdbRelease(pMnode->pSdb, pStream);
      pStream = NULL;
      notExistNum++;
      continue;
    }

    int64_t streamId = pStream->pCreate->streamId;

    // Database-level check first; a generic "no rights" is reported as a USE-permission failure.
    if ((code = mndCheckDbPrivilegeByName(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_USE_DB,
                                          pStream->pCreate->streamDB, true))) {
      if (code == TSDB_CODE_MND_NO_RIGHTS) code = TSDB_CODE_PAR_DB_USE_PERMISSION_DENIED;
    }
    // The object-level DROP privilege is only evaluated when the database check passed.
    if ((code != TSDB_CODE_SUCCESS) ||
        (code = mndCheckObjPrivilegeRecF(pMnode, pOperUser, PRIV_CM_DROP, PRIV_OBJ_STREAM, pStream->ownerId,
                                         pStream->pCreate->streamDB, mndGetStableStr(pStream->pCreate->name)))) {
      mstsError("user %s failed to drop stream %s since %s", pReq->info.conn.user, streamName, tstrerror(code));
      sdbRelease(pMnode->pSdb, pStream);
      pStream = NULL;
      mndTransDrop(pTrans);
      pTrans = NULL;
      goto _OVER;
    }

    if (pStream->pCreate->tsmaId != 0) {
      mstsDebug("try to drop tsma related stream, tsmaId:%" PRIx64, pStream->pCreate->tsmaId);

      // Scan every SMA object; a match means the tsma still exists and must be dropped first.
      void    *pIter = NULL;
      SSmaObj *pSma = NULL;
      pIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter, (void **)&pSma);
      while (pIter) {
        if (pSma && pSma->uid == pStream->pCreate->tsmaId) {
          sdbRelease(pMnode->pSdb, pSma);
          sdbRelease(pMnode->pSdb, pStream);
          pStream = NULL;

          sdbCancelFetch(pMnode->pSdb, pIter);
          code = TSDB_CODE_TSMA_MUST_BE_DROPPED;

          mstsError("refused to drop tsma-related stream %s since tsma still exists", streamName);
          mndTransDrop(pTrans);
          pTrans = NULL;
          goto _OVER;
        }

        if (pSma) {
          sdbRelease(pMnode->pSdb, pSma);
        }

        pIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter, (void **)&pSma);
      }
    }

    mstsInfo("start to drop stream %s", pStream->pCreate->name);

    pStream->updateTime = taosGetTimestampMs();

    atomic_store_8(&pStream->userDropped, 1);

    MND_STREAM_SET_LAST_TS(STM_EVENT_DROP_STREAM, pStream->updateTime);

    msmUndeployStream(pMnode, streamId, pStream->pCreate->name);

    // Append drop stream operation to the transaction
    code = mndStreamTransAppend(pStream, pTrans, SDB_STATUS_DROPPED);
    if (code) {
      mstsError("trans:%d, failed to append drop stream %s trans since %s", pTrans->id, streamName, tstrerror(code));
      sdbRelease(pMnode->pSdb, pStream);
      pStream = NULL;
      // mndStreamTransAppend already called mndTransDrop on failure, set pTrans to NULL to avoid double free
      pTrans = NULL;
      goto _OVER;
    }

    sdbRelease(pMnode->pSdb, pStream);
    pStream = NULL;

    mstsDebug("drop stream %s added to transaction", streamName);
  }

  // Prepare and execute the transaction for all streams
  if (notExistNum < dropReq.count) {
    code = mndTransPrepare(pMnode, pTrans);
    if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
      mError("trans:%d, failed to prepare drop stream trans since %s", pTrans->id, tstrerror(code));
      // pTrans is still set, so _OVER drops it; dropping it here as well would release it twice.
      goto _OVER;
    }
    mInfo("trans:%d, drop stream transaction prepared for %d streams", pTrans->id, dropReq.count - notExistNum);
  } else {
    // All streams don't exist, no need to prepare transaction
    mndTransDrop(pTrans);
    pTrans = NULL;
  }

  if (tsAuditLevel >= AUDIT_LEVEL_DATABASE && notExistNum < dropReq.count) {
    int64_t tse = taosGetTimestampMs();
    double  duration = (double)(tse - tss);
    duration = duration / 1000;
    // Use first stream's database for audit (assuming all streams are from same db in batch)
    if (dropReq.count > 0) {
      SStreamObj *pFirstStream = NULL;
      if (mndAcquireStream(pMnode, dropReq.name[0], &pFirstStream) == 0 && pFirstStream != NULL) {
        auditRecord(pReq, pMnode->clusterId, "dropStream", "", pFirstStream->pCreate->streamDB, NULL, 0, duration, 0);
        sdbRelease(pMnode->pSdb, pFirstStream);
      }
    }
  }

  // If any stream was successfully added to transaction, return ACTION_IN_PROGRESS
  // Otherwise, all streams don't exist (and igNotExists is set), return SUCCESS
  code = (notExistNum < dropReq.count) ? TSDB_CODE_ACTION_IN_PROGRESS : TSDB_CODE_SUCCESS;

_OVER:
  // Single cleanup point: whatever is still owned here is released exactly once.
  if (pStream) {
    sdbRelease(pMnode->pSdb, pStream);
  }
  if (pTrans) {
    mndTransDrop(pTrans);
  }
  mndReleaseUser(pMnode, pOperUser);
  tFreeMDropStreamReq(&dropReq);
  TAOS_RETURN(code);
}
1263

1264
/*
 * Handler for TDMT_MND_CREATE_STREAM.
 *
 * Steps: check the stream grant, decode the request, pick an snode, make sure the stream does not exist
 * yet (or honour igExists), validate the request, build the stream object, create the output table when
 * required, persist the stream through a transaction and post a deploy action.
 *
 * Returns TSDB_CODE_ACTION_IN_PROGRESS once the transaction has been prepared, TSDB_CODE_SUCCESS when
 * the stream already exists and igExists is set, otherwise an error code.
 */
static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
  SMnode     *pMnode = pReq->info.node;
  SStreamObj *pStream = NULL;
  SStreamObj  streamObj = {0};
  SUserObj    *pOperUser = NULL;
  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     lino = 0;
  STrans     *pTrans = NULL;
  uint64_t    streamId = 0;
  SCMCreateStreamReq* pCreate = NULL;
  int64_t             tss = taosGetTimestampMs();
  // Name used by the logging at _OVER; always refers to memory that is still owned at that point.
  const char         *streamName = NULL;

  if ((code = grantCheck(TSDB_GRANT_STREAMS)) < 0) {
    goto _OVER;
  }

  pCreate = taosMemoryCalloc(1, sizeof(SCMCreateStreamReq));
  TSDB_CHECK_NULL(pCreate, code, lino, _OVER, terrno);

  code = tDeserializeSCMCreateStreamReq(pReq->pCont, pReq->contLen, pCreate);
  TSDB_CHECK_CODE(code, lino, _OVER);

  streamId = pCreate->streamId;
  streamName = pCreate->name;

  mstsInfo("start to create stream %s, sql:%s", pCreate->name, pCreate->sql);

  int32_t snodeId = msmAssignRandomSnodeId(pMnode, streamId);
  if (!GOT_SNODE(snodeId)) {
    code = terrno;
    TSDB_CHECK_CODE(code, lino, _OVER);
  }

  code = mndAcquireStream(pMnode, pCreate->name, &pStream);
  if (pStream != NULL && code == 0) {
    if (pCreate->igExists) {
      mstsInfo("stream %s already exist, ignore exist is set", pCreate->name);
    } else {
      code = TSDB_CODE_MND_STREAM_ALREADY_EXIST;
    }

    mndReleaseStream(pMnode, pStream);
    // The reference is gone; clear the pointer so nothing at _OVER touches the released object.
    pStream = NULL;
    goto _OVER;
  } else if (code != TSDB_CODE_MND_STREAM_NOT_EXIST) {
    goto _OVER;
  }

  code = mndAcquireUser(pMnode, RPC_MSG_USER(pReq), &pOperUser);
  if (pOperUser == NULL) {
    TSDB_CHECK_CODE(TSDB_CODE_MND_NO_USER_FROM_CONN, lino, _OVER);
  }

  code = mndStreamValidateCreate(pMnode, pReq, pCreate);
  TSDB_CHECK_CODE(code, lino, _OVER);

  // From here on the request is reached through streamObj.pCreate; pCreate is cleared so that the
  // cleanup at _OVER does not free it a second time.
  mndStreamBuildObj(pMnode, &streamObj, pCreate, pOperUser, snodeId);
  pCreate = NULL;

  pStream = &streamObj;
  streamName = (pStream->pCreate != NULL) ? pStream->pCreate->name : NULL;

  code = mndStreamCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_DB, MND_STREAM_CREATE_NAME, &pTrans);
  if (pTrans == NULL || code) {
    goto _OVER;
  }

  // create output table for stream if it doesn't exist
  if (!pStream->pCreate->outStbExists) {
    if (TSDB_SUPER_TABLE == pStream->pCreate->outTblType) {
      // Create super table in mnode
      pStream->pCreate->outStbUid = mndGenerateUid(pStream->pCreate->outTblName, strlen(pStream->pCreate->outTblName));
      code = mndStreamCreateOutStb(pMnode, pTrans, pStream->pCreate, RPC_MSG_USER(pReq));
      TSDB_CHECK_CODE(code, lino, _OVER);
      mstsInfo("stream:%s created output super table:%s", pStream->pCreate->name, pStream->pCreate->outTblName);
    } else if (TSDB_NORMAL_TABLE == pStream->pCreate->outTblType && pStream->pCreate->nodelayCreateSubtable) {
      // Create normal table in vnode
      code = mndStreamCreateOutTable(pMnode, pTrans, pStream->pCreate);
      TSDB_CHECK_CODE(code, lino, _OVER);
      mstsInfo("stream:%s created output normal table:%s", pStream->pCreate->name, pStream->pCreate->outTblName);
    }
  } else {
    // Table exists, schema validation should have been done in client side
    mstsInfo("stream:%s output table:%s already exists, using existing table",
             pStream->pCreate->name, pStream->pCreate->outTblName);
  }

  // add stream to trans
  code = mndStreamTransAppend(pStream, pTrans, SDB_STATUS_READY);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mstsError("failed to persist stream %s since %s", pStream->pCreate->name, tstrerror(code));
    // mndStreamTransAppend drops pTrans itself on failure (the drop-stream handler in this file relies
    // on that); clear the pointer so the mndTransDrop at _OVER does not release it twice.
    pTrans = NULL;
    goto _OVER;
  }

  // execute creation
  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mstsError("trans:%d, failed to prepare since %s", pTrans->id, tstrerror(code));
    goto _OVER;
  }
  code = TSDB_CODE_ACTION_IN_PROGRESS;

  if (tsAuditLevel >= AUDIT_LEVEL_DATABASE) {
    int64_t tse = taosGetTimestampMs();
    double  duration = (double)(tse - tss);
    duration = duration / 1000;
    auditRecord(pReq, pMnode->clusterId, "createStream", pStream->pCreate->streamDB, pStream->pCreate->name,
                pStream->pCreate->sql, strlen(pStream->pCreate->sql), duration, 0);
  }

  MND_STREAM_SET_LAST_TS(STM_EVENT_CREATE_STREAM, taosGetTimestampMs());

  mstPostStreamAction(mStreamMgmt.actionQ, streamId, pStream->pCreate->name, NULL, true, STREAM_ACT_DEPLOY);

_OVER:

  // Logging runs before the frees below, so streamName (owned by pCreate or streamObj) is still valid.
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    if (streamName != NULL) {
      mstsError("failed to create stream %s at line:%d since %s", streamName, lino, tstrerror(code));
    } else {
      mstsError("failed to create stream at line:%d since %s", lino, tstrerror(code));
    }
  } else {
    mstsDebug("create stream %s half completed", streamName != NULL ? streamName : "unknown");
  }

  tFreeSCMCreateStreamReq(pCreate);
  taosMemoryFreeClear(pCreate);

  mndTransDrop(pTrans);
  tFreeStreamObj(&streamObj);
  mndReleaseUser(pMnode, pOperUser);

  return code;
}
1396

1397
/*
 * Handler for TDMT_MND_RECALC_STREAM.
 *
 * Steps: check the stream grant, decode the request, look up the stream, check the USE-DB and RECALC
 * privileges of the requesting user, refuse streams that are being dropped, are stopped or use a
 * period trigger, then hand the requested time range to msmRecalcStream and write an audit record.
 *
 * Returns TSDB_CODE_SUCCESS when the recalculation was accepted, otherwise an error code.
 */
static int32_t mndProcessRecalcStreamReq(SRpcMsg *pReq) {
  SMnode     *pMnode = pReq->info.node;
  SStreamObj *pStream = NULL;
  SUserObj   *pOperUser = NULL;
  int32_t     code = 0;
  int64_t     tss = taosGetTimestampMs();

  code = grantCheckExpire(TSDB_GRANT_STREAMS);
  if (code < 0) {
    return code;
  }

  SMRecalcStreamReq recalcReq = {0};
  if (tDeserializeSMRecalcStreamReq(pReq->pCont, pReq->contLen, &recalcReq) < 0) {
    tFreeMRecalcStreamReq(&recalcReq);
    TAOS_RETURN(TSDB_CODE_INVALID_MSG);
  }

  code = mndAcquireStream(pMnode, recalcReq.name, &pStream);
  if (code != 0 || NULL == pStream) {
    mError("stream:%s not exist, failed to recalc stream", recalcReq.name);
    tFreeMRecalcStreamReq(&recalcReq);
    TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
  }

  int64_t streamId = pStream->pCreate->streamId;

  mstsInfo("start to recalc stream %s", recalcReq.name);

  code = mndAcquireUser(pMnode, RPC_MSG_USER(pReq), &pOperUser);
  if (code) {
    mstsError("user %s failed to recalc stream %s since %s", RPC_MSG_USER(pReq), pStream->name, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    tFreeMRecalcStreamReq(&recalcReq);
    TAOS_RETURN(code);
  }

  // Database-level check first; a generic "no rights" is reported as a USE-permission failure.
  code = mndCheckDbPrivilegeByName(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_USE_DB,
                                   pStream->pCreate->streamDB, false);
  if (TSDB_CODE_MND_NO_RIGHTS == code) {
    code = TSDB_CODE_PAR_DB_USE_PERMISSION_DENIED;
  }

  // The object-level RECALC privilege is only evaluated when the database check passed.
  bool denied = (code != TSDB_CODE_SUCCESS);
  if (!denied) {
    code = mndCheckObjPrivilegeRecF(pMnode, pOperUser, PRIV_CM_RECALC, PRIV_OBJ_STREAM, pStream->ownerId,
                                    pStream->pCreate->streamDB, mndGetStableStr(pStream->pCreate->name));
    denied = (code != 0);
  }
  if (denied) {
    mstsError("user %s failed to recalc stream %s since %s", RPC_MSG_USER(pReq), pStream->name, tstrerror(code));
    mndReleaseUser(pMnode, pOperUser);
    goto _OVER;
  }

  mndReleaseUser(pMnode, pOperUser);  // release user after privilege check

  if (atomic_load_8(&pStream->userDropped)) {
    code = TSDB_CODE_MND_STREAM_DROPPING;
    mstsError("user %s failed to recalc stream %s since %s", RPC_MSG_USER(pReq), recalcReq.name, tstrerror(code));
    goto _OVER;
  }

  if (atomic_load_8(&pStream->userStopped)) {
    code = TSDB_CODE_MND_STREAM_STOPPED;
    mstsError("user %s failed to recalc stream %s since %s", RPC_MSG_USER(pReq), recalcReq.name, tstrerror(code));
    goto _OVER;
  }

  if (WINDOW_TYPE_PERIOD == pStream->pCreate->triggerType) {
    code = TSDB_CODE_OPS_NOT_SUPPORT;
    mstsError("failed to recalc stream %s since %s", recalcReq.name, tstrerror(code));
    goto _OVER;
  }

  // No transaction is created here; the request is passed straight to msmRecalcStream.
  code = msmRecalcStream(pMnode, pStream->pCreate->streamId, &recalcReq.timeRange);
  if (code != TSDB_CODE_SUCCESS) {
    goto _OVER;
  }

  if (tsAuditLevel >= AUDIT_LEVEL_DATABASE) {
    char detail[128];
    snprintf(detail, sizeof(detail), "start:%" PRId64 ", end:%" PRId64, recalcReq.timeRange.skey,
             recalcReq.timeRange.ekey);
    double duration = (double)(taosGetTimestampMs() - tss);
    duration = duration / 1000;
    auditRecord(pReq, pMnode->clusterId, "recalcStream", pStream->name, recalcReq.name, detail, strlen(detail),
                duration, 0);
  }

_OVER:
  // Shared tail of the success path and of every failure after the user was acquired.
  sdbRelease(pMnode->pSdb, pStream);
  tFreeMRecalcStreamReq(&recalcReq);
  return code;
}
1530

1531

1532
/*
 * Wire the stream module into the mnode.
 *
 * - Unless tsDisableStream is set, installs the message handlers for the
 *   create / drop / start / stop / heartbeat / recalc stream requests.
 * - Always installs the show-retrieve and free-iterator handlers for the
 *   streams, stream-tasks and stream-recalculates system tables.
 * - Registers the SDB_STREAM table (binary key, with the stream
 *   encode/decode/insert/update/delete callbacks) in pMnode->pSdb.
 *
 * Returns whatever sdbSetTable() returns for the SDB_STREAM table.
 */
int32_t mndInitStream(SMnode *pMnode) {
  // Request handlers are only installed when streams are enabled.
  if (!tsDisableStream) {
    mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STREAM, mndProcessCreateStreamReq);
    mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM, mndProcessDropStreamReq);
    mndSetMsgHandle(pMnode, TDMT_MND_START_STREAM, mndProcessStartStreamReq);
    mndSetMsgHandle(pMnode, TDMT_MND_STOP_STREAM, mndProcessStopStreamReq);
    mndSetMsgHandle(pMnode, TDMT_MND_STREAM_HEARTBEAT, mndProcessStreamHb);
    mndSetMsgHandle(pMnode, TDMT_MND_RECALC_STREAM, mndProcessRecalcStreamReq);
  }

  // SHOW handlers are installed regardless of tsDisableStream.
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndRetrieveStream);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndCancelGetNextStream);
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndRetrieveStreamTask);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndCancelGetNextStreamTask);
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAM_RECALCULATES, mndRetrieveStreamRecalculates);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAM_RECALCULATES, mndCancelGetNextStreamRecalculates);

  // Descriptor of the stream object table handed to sdb.
  SSdbTable streamTable = {
      .sdbType = SDB_STREAM,
      .keyType = SDB_KEY_BINARY,
      .encodeFp = (SdbEncodeFp)mndStreamActionEncode,
      .decodeFp = (SdbDecodeFp)mndStreamActionDecode,
      .insertFp = (SdbInsertFp)mndStreamActionInsert,
      .updateFp = (SdbUpdateFp)mndStreamActionUpdate,
      .deleteFp = (SdbDeleteFp)mndStreamActionDelete,
  };

  // Success and failure both propagate the sdbSetTable() result unchanged.
  return sdbSetTable(pMnode->pSdb, streamTable);
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc