• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4945

30 Jan 2026 06:19AM UTC coverage: 66.87% (+0.02%) from 66.849%
#4945

push

travis-ci

web-flow
merge: from main to 3.0 #34453

1126 of 2018 new or added lines in 72 files covered. (55.8%)

13708 existing lines in 159 files now uncovered.

205277 of 306978 relevant lines covered (66.87%)

126353544.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

68.92
/source/dnode/mnode/impl/src/mndStream.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "mndStream.h"
17
#include "audit.h"
18
#include "mndDb.h"
19
#include "mndPrivilege.h"
20
#include "mndShow.h"
21
#include "mndStb.h"
22
#include "mndTrans.h"
23
#include "mndUser.h"
24
#include "osMemory.h"
25
#include "parser.h"
26
#include "taoserror.h"
27
#include "tmisce.h"
28
#include "tname.h"
29

30
#define MND_STREAM_MAX_NUM 100000
31

32
/*
 * Payload-less message type. The single member is a place holder that exists
 * only to fix a compile error on Windows.
 */
typedef struct {
  int8_t placeHolder;
} SMStreamNodeCheckMsg;
35

36
static int32_t  mndNodeCheckSentinel = 0;
37
SStmRuntime  mStreamMgmt = {0};
38

39
static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream);
40
static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream);
41
static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pOldStream, SStreamObj *pNewStream);
42
static int32_t mndProcessDropStreamReq(SRpcMsg *pReq);
43

44
static int32_t mndProcessCreateStreamReqFromMNode(SRpcMsg *pReq);
45
static int32_t mndProcessDropStreamReqFromMNode(SRpcMsg *pReq);
46

47
static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
48
static void    mndCancelGetNextStream(SMnode *pMnode, void *pIter);
49
static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
50
static void    mndCancelGetNextStreamTask(SMnode *pMnode, void *pIter);
51
static int32_t mndProcessStopStreamReq(SRpcMsg *pReq);
52
static int32_t mndProcessStartStreamReq(SRpcMsg *pReq);
53

54
static SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw);
55

56
SSdbRaw       *mndStreamSeqActionEncode(SStreamObj *pStream);
57
SSdbRow       *mndStreamSeqActionDecode(SSdbRaw *pRaw);
58
static int32_t mndStreamSeqActionInsert(SSdb *pSdb, SStreamSeq *pStream);
59
static int32_t mndStreamSeqActionDelete(SSdb *pSdb, SStreamSeq *pStream);
60
static int32_t mndStreamSeqActionUpdate(SSdb *pSdb, SStreamSeq *pOldStream, SStreamSeq *pNewStream);
61
static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq);
62

63
/*
 * Cleans up the stream module of the mnode: logs the attempt, delegates to
 * msmHandleBecomeNotLeader(), then logs that the runtime info was cleaned up.
 */
void mndCleanupStream(SMnode *pMnode) {
  mDebug("try to clean up stream");
  msmHandleBecomeNotLeader(pMnode);
  mDebug("mnd stream runtime info cleanup");
}
399,576✔
70

71
/*
 * Decodes a raw SDB record into a newly allocated row wrapping an SStreamObj.
 *
 * @param pRaw  raw record to decode
 * @return the decoded row (terrno set to 0) on success; NULL with terrno set
 *         to the failure code otherwise. On failure the row is released and,
 *         if decoding had started, the partially decoded stream is freed.
 */
SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw) {
  int32_t     code = 0;
  int32_t     lino = 0;
  SSdbRow    *pRow = NULL;
  SStreamObj *pStream = NULL;
  void       *buf = NULL;
  int8_t      sver = 0;
  int32_t     tlen = 0;
  int32_t     dataPos = 0;
  bool        decodeRan = false;  // true once tDecodeSStreamObj() has been called

  code = sdbGetRawSoftVer(pRaw, &sver);
  TSDB_CHECK_CODE(code, lino, _over);

  if (sver != MND_STREAM_VER_NUMBER && sver != MND_STREAM_COMPATIBLE_VER_NUMBER) {
    mError("stream read invalid ver, data ver: %d, curr ver: %d", sver, MND_STREAM_VER_NUMBER);
    // Record an explicit error: leaving code at 0 would send the exit path into
    // the success branch, which dereferences the still-NULL stream.
    code = TSDB_CODE_SDB_INVALID_DATA_VER;
    lino = __LINE__;
    goto _over;
  }

  pRow = sdbAllocRow(sizeof(SStreamObj));
  TSDB_CHECK_NULL(pRow, code, lino, _over, terrno);

  pStream = sdbGetRowObj(pRow);
  TSDB_CHECK_NULL(pStream, code, lino, _over, terrno);

  SDB_GET_INT32(pRaw, dataPos, &tlen, _over);

  buf = taosMemoryMalloc(tlen + 1);
  TSDB_CHECK_NULL(buf, code, lino, _over, terrno);

  SDB_GET_BINARY(pRaw, dataPos, buf, tlen, _over);

  SDecoder decoder;
  tDecoderInit(&decoder, buf, tlen + 1);
  code = tDecodeSStreamObj(&decoder, pStream, sver);
  tDecoderClear(&decoder);
  decodeRan = true;

_over:
  taosMemoryFreeClear(buf);

  // The SDB_GET_* macros receive only the label, not `code`, so a jump from
  // them (or a NULL check whose terrno was 0) can arrive here with code == 0
  // although nothing was decoded. Treat that as a failure.
  // NOTE(review): assumes those helpers report their reason via terrno - confirm.
  if (code == TSDB_CODE_SUCCESS && !decodeRan) {
    code = (terrno != 0) ? terrno : TSDB_CODE_FAILED;
  }

  if (code != TSDB_CODE_SUCCESS) {
    // Log first: the message reads pStream->pCreate, which must not be touched
    // after the stream object has been freed.
    char *p = (pStream == NULL || NULL == pStream->pCreate) ? "null" : pStream->pCreate->name;
    mError("stream:%s, failed to decode from raw:%p since %s at:%d", p, pRaw, tstrerror(code), lino);

    if (decodeRan) {
      tFreeStreamObj(pStream);
    }
    taosMemoryFreeClear(pRow);

    terrno = code;
    return NULL;
  }

  mTrace("stream:%s, decode from raw:%p, row:%p", pStream->pCreate->name, pRaw, pStream);

  terrno = 0;
  return pRow;
}
128

129
/* SDB insert hook for stream rows: traces the stream name and returns 0. */
static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream) {
  const int32_t result = 0;

  mTrace("stream:%s, perform insert action", pStream->pCreate->name);

  return result;
}
133

134
/*
 * SDB delete hook for stream rows: logs the stream name, releases the stream
 * object through tFreeStreamObj() and returns 0.
 */
static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream) {
  const int32_t result = 0;

  mInfo("stream:%s, perform delete action", pStream->pCreate->name);
  tFreeStreamObj(pStream);

  return result;
}
139

140
/*
 * SDB update hook for stream rows. Copies the mutable fields (main snode id,
 * user-stopped flag, owner id, update time) from the new object into the
 * existing one and returns 0. No other field of the old object is touched.
 */
static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pOldStream, SStreamObj *pNewStream) {
  mTrace("stream:%s, perform update action", pOldStream->pCreate->name);

  atomic_store_32(&pOldStream->mainSnodeId, pNewStream->mainSnodeId);

  int8_t stoppedFlag = atomic_load_8(&pNewStream->userStopped);
  atomic_store_8(&pOldStream->userStopped, stoppedFlag);

  pOldStream->ownerId = pNewStream->ownerId;
  pOldStream->updateTime = pNewStream->updateTime;

  return 0;
}
150

151
/*
 * Acquires a reference to the stream named `streamName` from the SDB.
 *
 * @param pMnode      mnode whose SDB is searched
 * @param streamName  stream name used as the SDB key
 * @param pStream     out: acquired stream, or NULL when the acquire failed
 * @return 0 on success; TSDB_CODE_MND_STREAM_NOT_EXIST when the SDB reports
 *         that the object is not there; otherwise the terrno left by
 *         sdbAcquire(), so callers are no longer told "success" together
 *         with a NULL stream.
 */
int32_t mndAcquireStream(SMnode *pMnode, char *streamName, SStreamObj **pStream) {
  (*pStream) = sdbAcquire(pMnode->pSdb, SDB_STREAM, streamName);
  if ((*pStream) != NULL) {
    return 0;
  }

  if (terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
    return TSDB_CODE_MND_STREAM_NOT_EXIST;
  }

  // Any other acquire failure: surface the reason instead of swallowing it.
  return terrno;
}
160

161
static bool mndStreamGetNameFromId(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
×
162
  SStreamObj* pStream = pObj;
×
163

164
  if (pStream->pCreate->streamId == *(int64_t*)p1) {
×
165
    strncpy((char*)p2, pStream->name, TSDB_STREAM_NAME_LEN);
×
166
    return false;
×
167
  }
168

169
  return true;
×
170
}
171

172
int32_t mndAcquireStreamById(SMnode *pMnode, int64_t streamId, SStreamObj **pStream) {
×
173
  int32_t code = 0;
×
174
  SSdb   *pSdb = pMnode->pSdb;
×
175
  char streamName[TSDB_STREAM_NAME_LEN];
×
176
  streamName[0] = 0;
×
177
  
178
  sdbTraverse(pSdb, SDB_STREAM, mndStreamGetNameFromId, &streamId, streamName, NULL);
×
179
  if (streamName[0]) {
×
180
    (*pStream) = sdbAcquire(pSdb, SDB_STREAM, streamName);
×
181
    if ((*pStream) == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
×
182
      code = TSDB_CODE_MND_STREAM_NOT_EXIST;
×
183
    }
184
  }
185
  
186
  return code;
×
187
}
188

189
/* Hands a stream reference back to the SDB of the given mnode. */
void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream) {
  sdbRelease(pMnode->pSdb, pStream);
}
165,528✔
193

194
SSdbRaw *mndStreamSeqActionEncode(SStreamObj *pStream) { return NULL; }
×
195
SSdbRow *mndStreamSeqActionDecode(SSdbRaw *pRaw) { return NULL; }
×
196
int32_t  mndStreamSeqActionInsert(SSdb *pSdb, SStreamSeq *pStream) { return 0; }
×
197
int32_t  mndStreamSeqActionDelete(SSdb *pSdb, SStreamSeq *pStream) { return 0; }
×
198
int32_t  mndStreamSeqActionUpdate(SSdb *pSdb, SStreamSeq *pOldStream, SStreamSeq *pNewStream) { return 0; }
×
199

200
static void mndStreamBuildObj(SMnode *pMnode, SStreamObj *pObj, SCMCreateStreamReq *pCreate, SUserObj *pOperUser,
156,882✔
201
                              int32_t snodeId) {
202
  int32_t code = 0;
156,882✔
203

204
  pObj->pCreate = pCreate;
156,882✔
205
  strncpy(pObj->name, pCreate->name, TSDB_STREAM_FNAME_LEN);
156,882✔
206
  (void)snprintf(pObj->createUser, sizeof(pObj->createUser), "%s", pOperUser->name);
156,882✔
207
  pObj->ownerId = pOperUser->uid;
156,882✔
208
  pObj->mainSnodeId = snodeId;
156,882✔
209

210
  pObj->userDropped = 0;
156,882✔
211
  pObj->userStopped = 0;
156,882✔
212

213
  pObj->createTime = taosGetTimestampMs();
156,882✔
214
  pObj->updateTime = pObj->createTime;
156,882✔
215

216
  mstLogSStreamObj("create stream", pObj);
156,882✔
217
}
156,882✔
218

219
/*
 * Adds the creation of the stream's output super table to a transaction.
 *
 * The table name is "<outDB>.<outTblName>", its columns are copied from
 * pStream->outCols and its tags from pStream->outTags; when no tags are given
 * a single UBIGINT tag named "group_id" is used.
 *
 * @param pMnode   mnode context
 * @param pTrans   transaction the super table is appended to
 * @param pStream  create-stream request describing the output table
 * @param user     unused in this function
 * @return 0 on success; an error code otherwise, including
 *         TSDB_CODE_MND_STB_ALREADY_EXIST when the table exists,
 *         TSDB_CODE_MND_DB_NOT_SELECTED when its database cannot be acquired
 *         and TSDB_CODE_MND_SINGLE_STB_MODE_DB when a single-stable database
 *         already holds a super table.
 */
static int32_t mndStreamCreateOutStb(SMnode *pMnode, STrans *pTrans, const SCMCreateStreamReq *pStream, const char *user) {
  SStbObj *pStb = NULL;
  SDbObj  *pDb = NULL;
  int32_t  code = 0;
  int32_t  lino = 0;

  SMCreateStbReq createReq = {0};
  TAOS_STRNCAT(createReq.name, pStream->outDB, TSDB_DB_FNAME_LEN);
  TAOS_STRNCAT(createReq.name, ".", 2);
  TAOS_STRNCAT(createReq.name,  pStream->outTblName, TSDB_TABLE_NAME_LEN);
  createReq.numOfColumns = taosArrayGetSize(pStream->outCols);
  createReq.pColumns = taosArrayInit_s(sizeof(SFieldWithOptions), createReq.numOfColumns);
  TSDB_CHECK_NULL(createReq.pColumns, code, lino, _OVER, terrno);

  // build fields
  for (int32_t i = 0; i < createReq.numOfColumns; i++) {
    SFieldWithOptions *pField = taosArrayGet(createReq.pColumns, i);
    TSDB_CHECK_NULL(pField, code, lino, _OVER, terrno);
    SFieldWithOptions *pSrc = taosArrayGet(pStream->outCols, i);

    tstrncpy(pField->name, pSrc->name, TSDB_COL_NAME_LEN);
    pField->flags = pSrc->flags;
    pField->type = pSrc->type;
    pField->bytes = pSrc->bytes;
    pField->compress = createDefaultColCmprByType(pField->type);
    if (IS_DECIMAL_TYPE(pField->type)) {
      pField->typeMod = pSrc->typeMod;
      pField->flags |= COL_HAS_TYPE_MOD;
    }
  }

  if (NULL == pStream->outTags) {
    createReq.numOfTags = 1;
    createReq.pTags = taosArrayInit_s(sizeof(SField), 1);
    TSDB_CHECK_NULL(createReq.pTags, code, lino, _OVER, terrno);

    // build tags
    SField *pField = taosArrayGet(createReq.pTags, 0);
    TSDB_CHECK_NULL(pField, code, lino, _OVER, terrno);

    tstrncpy(pField->name, "group_id", sizeof(pField->name));
    pField->type = TSDB_DATA_TYPE_UBIGINT;
    pField->flags = 0;
    pField->bytes = 8;
  } else {
    createReq.numOfTags = taosArrayGetSize(pStream->outTags);
    createReq.pTags = taosArrayInit_s(sizeof(SField), createReq.numOfTags);
    TSDB_CHECK_NULL(createReq.pTags, code, lino, _OVER, terrno);

    for (int32_t i = 0; i < createReq.numOfTags; i++) {
      SField *pField = taosArrayGet(createReq.pTags, i);
      if (pField == NULL) {
        continue;
      }

      TAOS_FIELD_E *pSrc = taosArrayGet(pStream->outTags, i);
      pField->bytes = pSrc->bytes;
      pField->flags = 0;
      pField->type = pSrc->type;
      tstrncpy(pField->name, pSrc->name, TSDB_COL_NAME_LEN);
    }
  }

  if ((code = mndCheckCreateStbReq(&createReq)) != 0) {
    goto _OVER;
  }

  pStb = mndAcquireStb(pMnode, createReq.name);
  if (pStb != NULL) {
    code = TSDB_CODE_MND_STB_ALREADY_EXIST;
    goto _OVER;
  }

  pDb = mndAcquireDbByStb(pMnode, createReq.name);
  if (pDb == NULL) {
    code = TSDB_CODE_MND_DB_NOT_SELECTED;
    goto _OVER;
  }

  // The three calls below used to jump to _OVER without storing their result,
  // so a failure was returned to the caller as success (code still 0).
  // NOTE(review): assumes these helpers return a meaningful error code - confirm.
  int32_t numOfStbs = -1;
  code = mndGetNumOfStbs(pMnode, pDb->name, &numOfStbs);
  TSDB_CHECK_CODE(code, lino, _OVER);

  if (pDb->cfg.numOfStables == 1 && numOfStbs != 0) {
    code = TSDB_CODE_MND_SINGLE_STB_MODE_DB;
    goto _OVER;
  }

  SStbObj stbObj = {0};

  code = mndBuildStbFromReq(pMnode, &stbObj, &createReq, pDb);
  TSDB_CHECK_CODE(code, lino, _OVER);

  stbObj.uid = pStream->outStbUid;

  // Only negative results count as failure here, as before; a non-negative
  // result keeps the function's return value at 0.
  int32_t addRes = mndAddStbToTrans(pMnode, pTrans, pDb, &stbObj);
  if (addRes < 0) {
    code = addRes;
    lino = __LINE__;
    mndFreeStb(&stbObj);
    goto _OVER;
  }

  mDebug("stream:%s create dst stable:%s, cols:%d", pStream->name, pStream->outTblName, createReq.numOfColumns);

  tFreeSMCreateStbReq(&createReq);
  mndFreeStb(&stbObj);
  mndReleaseStb(pMnode, pStb);
  mndReleaseDb(pMnode, pDb);
  return code;

_OVER:
  tFreeSMCreateStbReq(&createReq);
  mndReleaseStb(pMnode, pStb);
  mndReleaseDb(pMnode, pDb);

  mDebug("stream:%s failed to create dst stable:%s, line:%d code:%s", pStream->name, pStream->outTblName, lino,
         tstrerror(code));
  return code;
}
339

340
/*
 * Validates a create-stream request before it is executed.
 *
 * Runs the MND_OPER_USE_DB privilege check for every database the request
 * names (stream db, trigger db, each calc db, output db) and then checks the
 * number of existing streams against MND_STREAM_MAX_NUM.
 *
 * @return 0 when all checks pass, the failing privilege-check code, or
 *         TSDB_CODE_MND_TOO_MANY_STREAMS.
 */
static int32_t mndStreamValidateCreate(SMnode *pMnode, SRpcMsg *pReq, SCMCreateStreamReq* pCreate) {
  int32_t code = 0, lino = 0;
  int64_t streamId = pCreate->streamId;  // not used directly; presumably read by the msts* log macros
  char   *pUser = RPC_MSG_USER(pReq);

  if (pCreate->streamDB) {
    code = mndCheckDbPrivilegeByName(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_USE_DB, pCreate->streamDB);
    if (code) {
      mstsError("user %s failed to create stream %s in db %s since %s", pUser, pCreate->name, pCreate->streamDB,
                tstrerror(code));
    }
    TSDB_CHECK_CODE(code, lino, _OVER);
  }

  if (pCreate->triggerDB) {
    code = mndCheckDbPrivilegeByName(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_USE_DB, pCreate->triggerDB);
    if (code) {
      mstsError("user %s failed to create stream %s using trigger db %s since %s", pUser, pCreate->name,
                pCreate->triggerDB, tstrerror(code));
    }
    TSDB_CHECK_CODE(code, lino, _OVER);
    // TODO check the owner of trigger table
  }

  if (pCreate->calcDB) {
    int32_t total = taosArrayGetSize(pCreate->calcDB);
    int32_t idx = 0;
    while (idx < total) {
      char *calcDB = taosArrayGetP(pCreate->calcDB, idx);
      code = mndCheckDbPrivilegeByName(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_USE_DB, calcDB);
      if (code) {
        mstsError("user %s failed to create stream %s using calcDB %s since %s", pUser, pCreate->name, calcDB,
                  tstrerror(code));
      }
      TSDB_CHECK_CODE(code, lino, _OVER);
      ++idx;
    }
  }

  if (pCreate->outDB) {
    code = mndCheckDbPrivilegeByName(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_USE_DB, pCreate->outDB);
    if (code) {
      mstsError("user %s failed to create stream %s using out db %s since %s", pUser, pCreate->name, pCreate->outDB,
                tstrerror(code));
    }
    TSDB_CHECK_CODE(code, lino, _OVER);
  }

  // NOTE(review): the check is '>' so MND_STREAM_MAX_NUM existing streams
  // still pass; confirm whether '>=' was intended.
  int32_t existing = sdbGetSize(pMnode->pSdb, SDB_STREAM);
  if (existing > MND_STREAM_MAX_NUM) {
    code = TSDB_CODE_MND_TOO_MANY_STREAMS;
    mstsError("failed to create stream %s since %s, stream number:%d", pCreate->name, tstrerror(code), existing);
  }

_OVER:
  return code;
}
411

412
/*
 * Appends a drop action for every stream whose stream database is `pDb` to
 * the given (drop-db) transaction.
 *
 * For each matching stream the update time is refreshed, the stream is marked
 * as user-dropped, the drop event timestamp is recorded and the stream is
 * undeployed before the drop is appended to the transaction.
 *
 * @return 0 when all matching streams were appended; otherwise the code from
 *         mndStreamTransAppend(), in which case iteration is cancelled.
 */
int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
  SSdb   *pSdb = pMnode->pSdb;
  void   *pIter = NULL;
  int32_t code = 0;

  for (;;) {
    SStreamObj *pStream = NULL;
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
    if (pIter == NULL) break;

    // streamDB is treated as optional elsewhere in this file, so guard the
    // strcmp: passing NULL to strcmp is undefined behaviour.
    const char *streamDb = pStream->pCreate->streamDB;
    if (streamDb == NULL || 0 != strcmp(streamDb, pDb->name)) {
      sdbRelease(pSdb, pStream);
      continue;
    }

    mInfo("start to drop stream %s in db %s", pStream->pCreate->name, pDb->name);

    pStream->updateTime = taosGetTimestampMs();

    atomic_store_8(&pStream->userDropped, 1);

    MND_STREAM_SET_LAST_TS(STM_EVENT_DROP_STREAM, pStream->updateTime);

    msmUndeployStream(pMnode, pStream->pCreate->streamId, pStream->pCreate->name);

    // drop stream
    code = mndStreamTransAppend(pStream, pTrans, SDB_STATUS_DROPPED);
    if (code) {
      mError("drop db trans:%d failed to append drop stream trans since %s", pTrans->id, tstrerror(code));
      sdbRelease(pSdb, pStream);
      sdbCancelFetch(pSdb, pIter);
      TAOS_RETURN(code);
    }

    sdbRelease(pSdb, pStream);
  }

  return 0;
}
448

449
/*
 * Show-retrieve handler that fills `pBlock` with one row per visible stream.
 *
 * A user holding the global show privilege sees every stream; otherwise each
 * stream is checked individually and skipped when the check fails. Streams
 * whose row cannot be written are skipped silently.
 *
 * @param rows  maximum number of rows to produce in this call
 * @return number of rows written, or an error code when the requesting user
 *         cannot be acquired.
 */
static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode     *pMnode = pReq->info.node;
  SSdb       *pSdb = pMnode->pSdb;
  SUserObj   *pOperUser = NULL;
  SStreamObj *pStream = NULL;
  int32_t     filled = 0;
  int32_t     code = 0, lino = 0;
  bool        showAll = false;

  TAOS_CHECK_EXIT(mndAcquireUser(pMnode, RPC_MSG_USER(pReq), &pOperUser));
  showAll =
      (0 == mndCheckObjPrivilegeRec(pMnode, pOperUser, PRIV_CM_SHOW, PRIV_OBJ_STREAM, 0, pOperUser->acctId, "*", "*"));

  while (filled < rows) {
    pShow->pIter = sdbFetch(pSdb, SDB_STREAM, pShow->pIter, (void **)&pStream);
    if (pShow->pIter == NULL) break;

    // The per-stream check only runs when the user lacks the global privilege.
    bool visible = showAll || (0 == mndCheckObjPrivilegeRecF(pMnode, pOperUser, PRIV_CM_SHOW, PRIV_OBJ_STREAM,
                                                             pStream->ownerId, pStream->pCreate->streamDB,
                                                             pStream->pCreate->name));
    if (visible) {
      code = mstSetStreamAttrResBlock(pMnode, pStream, pBlock, filled);
      if (code == 0) {
        filled++;
      }
    }

    sdbRelease(pSdb, pStream);
  }

  // Per-stream failures are not reported to the caller.
  code = 0;
  pShow->numOfRows += filled;

_exit:
  mndReleaseUser(pMnode, pOperUser);
  if (code != 0) {
    mError("failed to retrieve stream list at line %d since %s", lino, tstrerror(code));
    TAOS_RETURN(code);
  }
  return filled;
}
490

491
/* Cancels an in-progress SDB_STREAM iteration started by the stream show handler. */
static void mndCancelGetNextStream(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_STREAM);
}
×
495

496
/*
 * Show-retrieve handler for stream tasks. Walks the streams in the SDB and
 * lets mstSetStreamTasksResBlock() append task rows for each one until the
 * block holds `rowsCapacity` rows or the streams are exhausted. The result of
 * the per-stream fill is not inspected.
 *
 * @return number of rows written in this call.
 */
static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
  SMnode     *pMnode = pReq->info.node;
  SSdb       *pSdb = pMnode->pSdb;
  SStreamObj *pStream = NULL;
  int32_t     filled = 0;

  while (filled < rowsCapacity &&
         (pShow->pIter = sdbFetch(pSdb, SDB_STREAM, pShow->pIter, (void **)&pStream)) != NULL) {
    (void)mstSetStreamTasksResBlock(pStream, pBlock, &filled, rowsCapacity);
    sdbRelease(pSdb, pStream);
  }

  pShow->numOfRows += filled;
  return filled;
}
517

518
/* Cancels an in-progress SDB_STREAM iteration started by the stream-task show handler. */
static void mndCancelGetNextStreamTask(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_STREAM);
}
×
522

523
/*
 * Show-retrieve handler for stream recalculations. Walks the streams in the
 * SDB and lets mstSetStreamRecalculatesResBlock() append rows for each one
 * until the block holds `rowsCapacity` rows or the streams are exhausted.
 * The result of the per-stream fill is not inspected.
 *
 * @return number of rows written in this call.
 */
static int32_t mndRetrieveStreamRecalculates(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
  SMnode     *pMnode = pReq->info.node;
  SSdb       *pSdb = pMnode->pSdb;
  SStreamObj *pStream = NULL;
  int32_t     filled = 0;

  while (filled < rowsCapacity &&
         (pShow->pIter = sdbFetch(pSdb, SDB_STREAM, pShow->pIter, (void **)&pStream)) != NULL) {
    (void)mstSetStreamRecalculatesResBlock(pStream, pBlock, &filled, rowsCapacity);
    sdbRelease(pSdb, pStream);
  }

  pShow->numOfRows += filled;
  return filled;
}
544

545
/* Cancels an in-progress SDB_STREAM iteration started by the recalculates show handler. */
static void mndCancelGetNextStreamRecalculates(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_STREAM);
}
×
549

550

551
/*
 * sdbTraverse() callback used by mndStreamUpdateTagsRefFlag().
 *
 * For a stream that is not user-dropped, is triggered by a super, child or
 * virtual child table with the given super-table uid, and has partition
 * columns, every tag whose colId appears among the partition columns gets the
 * COL_REF_BY_STM flag.
 *
 * @param pObj  the visited SStreamObj
 * @param p1    pointer to the super-table uid to match
 * @param p2    SSchema array of tags to flag
 * @param p3    pointer to the number of entries in p2
 * @return always true.
 */
static bool mndStreamUpdateTagsFlag(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
  SStreamObj *pStream = pObj;
  if (atomic_load_8(&pStream->userDropped)) {
    return true;
  }

  switch (pStream->pCreate->triggerTblType) {
    case TSDB_SUPER_TABLE:
    case TSDB_CHILD_TABLE:
    case TSDB_VIRTUAL_CHILD_TABLE:
      break;
    default:
      return true;
  }

  if (pStream->pCreate->triggerTblSuid != *(uint64_t*)p1 || NULL == pStream->pCreate->partitionCols) {
    return true;
  }

  SNodeList* pPartCols = NULL;
  int32_t    code = nodesStringToList(pStream->pCreate->partitionCols, &pPartCols);
  if (code) {
    nodesDestroyList(pPartCols);
    mstError("partitionCols [%s] nodesStringToList failed with error:%s", (char*)pStream->pCreate->partitionCols, tstrerror(code));
    return true;
  }

  SSchema* pTags = (SSchema*)p2;
  int32_t* tagNum = (int32_t*)p3;

  SNode* pNode = NULL;
  FOREACH(pNode, pPartCols) {
    SColumnNode* pCol = (SColumnNode*)pNode;
    int32_t      idx = 0;
    // Flag the first tag with a matching column id, if any.
    while (idx < *tagNum && pCol->colId != pTags[idx].colId) {
      ++idx;
    }
    if (idx < *tagNum) {
      pTags[idx].flags |= COL_REF_BY_STM;
    }
  }

  nodesDestroyList(pPartCols);

  return true;
}
597

598

599
/*
 * Marks the tags of super table `suid` that are referenced by stream
 * partition columns. Does nothing when the SDB holds no streams; otherwise
 * every stream is visited with mndStreamUpdateTagsFlag().
 *
 * @param pTags   tag schema array, updated in place
 * @param tagNum  number of entries in pTags
 */
void mndStreamUpdateTagsRefFlag(SMnode *pMnode, int64_t suid, SSchema* pTags, int32_t tagNum) {
  if (sdbGetSize(pMnode->pSdb, SDB_STREAM) > 0) {
    sdbTraverse(pMnode->pSdb, SDB_STREAM, mndStreamUpdateTagsFlag, &suid, pTags, &tagNum);
  }
}
607

608
/*
 * Handles a stop-stream request.
 *
 * Steps: deserialize the request, acquire the stream, check the caller's
 * privileges, refuse streams that are being dropped, create the stop
 * transaction, mark the stream as user-stopped and undeploy it, then append
 * the stream to the transaction and prepare it.
 *
 * @return TSDB_CODE_ACTION_IN_PROGRESS once the transaction is prepared;
 *         0 when the stream does not exist and the request asked to ignore
 *         that; an error code otherwise.
 */
static int32_t mndProcessStopStreamReq(SRpcMsg *pReq) {
  SMnode     *pMnode = pReq->info.node;
  SStreamObj *pStream = NULL;
  SUserObj   *pOperUser = NULL;
  STrans     *pTrans = NULL;
  int32_t     code = 0;

  SMPauseStreamReq pauseReq = {0};
  if (tDeserializeSMPauseStreamReq(pReq->pCont, pReq->contLen, &pauseReq) < 0) {
    TAOS_RETURN(TSDB_CODE_INVALID_MSG);
  }

  code = mndAcquireStream(pMnode, pauseReq.name, &pStream);
  if (pStream == NULL || code != 0) {
    if (pauseReq.igNotExists) {
      mInfo("stream:%s, not exist, not stop stream", pauseReq.name);
      taosMemoryFree(pauseReq.name);
      return 0;
    }

    mError("stream:%s not exist, failed to stop stream", pauseReq.name);
    taosMemoryFree(pauseReq.name);
    TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
  }

  taosMemoryFree(pauseReq.name);

  int64_t streamId = pStream->pCreate->streamId;

  mstsInfo("start to stop stream %s", pStream->name);

  code = mndAcquireUser(pMnode, RPC_MSG_USER(pReq), &pOperUser);
  if (code) {
    mstsError("user %s failed to stop stream %s since %s", RPC_MSG_USER(pReq), pStream->name, tstrerror(code));
    goto _releaseStream;
  }

  // Database-level check first; the stream-level check only runs when it passes.
  code = mndCheckDbPrivilegeByName(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_USE_DB,
                                   pStream->pCreate->streamDB);
  if (code == 0) {
    code = mndCheckObjPrivilegeRecF(pMnode, pOperUser, PRIV_CM_STOP, PRIV_OBJ_STREAM, pStream->ownerId,
                                    pStream->pCreate->streamDB, pStream->pCreate->name);
  }
  if (code) {
    mstsError("user %s failed to stop stream %s since %s", RPC_MSG_USER(pReq), pStream->name, tstrerror(code));
    mndReleaseUser(pMnode, pOperUser);
    goto _releaseStream;
  }

  mndReleaseUser(pMnode, pOperUser);  // the user object is only needed for the privilege check

  if (atomic_load_8(&pStream->userDropped)) {
    code = TSDB_CODE_MND_STREAM_DROPPING;
    mstsError("user %s failed to stop stream %s since %s", RPC_MSG_USER(pReq), pStream->name, tstrerror(code));
    goto _releaseStream;
  }

  code = mndStreamCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_STOP_NAME, &pTrans);
  if (pTrans == NULL || code) {
    mstsError("failed to stop stream %s since %s", pStream->name, tstrerror(code));
    goto _releaseStream;
  }

  pStream->updateTime = taosGetTimestampMs();

  atomic_store_8(&pStream->userStopped, 1);

  MND_STREAM_SET_LAST_TS(STM_EVENT_STOP_STREAM, pStream->updateTime);

  msmUndeployStream(pMnode, streamId, pStream->name);

  // stop stream
  code = mndStreamTransAppend(pStream, pTrans, SDB_STATUS_READY);
  if (code != TSDB_CODE_SUCCESS) {
    goto _dropTrans;
  }

  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("trans:%d, failed to prepare stop stream trans since %s", pTrans->id, tstrerror(code));
    goto _dropTrans;
  }

  code = TSDB_CODE_ACTION_IN_PROGRESS;

_dropTrans:
  sdbRelease(pMnode->pSdb, pStream);
  mndTransDrop(pTrans);
  return code;

_releaseStream:
  sdbRelease(pMnode->pSdb, pStream);
  return code;
}
701

702

703
/*
 * Handles a start-stream (resume from stopped) request.
 *
 * Steps: check the stream grant, deserialize the request, acquire the stream,
 * check the caller's privileges, refuse streams that are being dropped or are
 * not stopped, clear the user-stopped flag, create/append/prepare the start
 * transaction and finally post a deploy action for the stream.
 *
 * The user-stopped flag and update time are changed before the transaction
 * exists; if any later step fails they are restored, so the stream is not
 * left marked as running (which would make a retry fail with
 * TSDB_CODE_MND_STREAM_NOT_STOPPED).
 *
 * @return TSDB_CODE_ACTION_IN_PROGRESS once the transaction is prepared;
 *         0 when the stream does not exist and the request asked to ignore
 *         that; an error code otherwise.
 */
static int32_t mndProcessStartStreamReq(SRpcMsg *pReq) {
  SMnode     *pMnode = pReq->info.node;
  SStreamObj *pStream = NULL;
  SUserObj   *pOperUser = NULL;
  STrans     *pTrans = NULL;
  int64_t     prevUpdateTime = 0;
  bool        transCreated = false;
  int32_t     code = 0;

  if ((code = grantCheckExpire(TSDB_GRANT_STREAMS)) < 0) {
    return code;
  }

  SMResumeStreamReq resumeReq = {0};
  if (tDeserializeSMResumeStreamReq(pReq->pCont, pReq->contLen, &resumeReq) < 0) {
    TAOS_RETURN(TSDB_CODE_INVALID_MSG);
  }

  code = mndAcquireStream(pMnode, resumeReq.name, &pStream);
  if (pStream == NULL || code != 0) {
    if (resumeReq.igNotExists) {
      mInfo("stream:%s not exist, not start stream", resumeReq.name);
      taosMemoryFree(resumeReq.name);
      sdbRelease(pMnode->pSdb, pStream);
      return 0;
    } else {
      mError("stream:%s not exist, failed to start stream", resumeReq.name);
      taosMemoryFree(resumeReq.name);
      TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
    }
  }

  taosMemoryFree(resumeReq.name);

  int64_t streamId = pStream->pCreate->streamId;

  mstsInfo("start to start stream %s from stopped", pStream->name);

  if ((code = mndAcquireUser(pMnode, RPC_MSG_USER(pReq), &pOperUser))) {
    mstsError("user %s failed to start stream %s since %s", RPC_MSG_USER(pReq), pStream->name, tstrerror(code));
    goto _releaseStream;
  }

  if ((code = mndCheckDbPrivilegeByName(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_USE_DB,
                                        pStream->pCreate->streamDB)) ||
      (code = mndCheckObjPrivilegeRecF(pMnode, pOperUser, PRIV_CM_START, PRIV_OBJ_STREAM, pStream->ownerId,
                                       pStream->pCreate->streamDB, pStream->pCreate->name))) {
    mstsError("user %s failed to start stream %s since %s", RPC_MSG_USER(pReq), pStream->name, tstrerror(code));
    mndReleaseUser(pMnode, pOperUser);
    goto _releaseStream;
  }

  mndReleaseUser(pMnode, pOperUser); // release user after privilege check

  if (atomic_load_8(&pStream->userDropped)) {
    code = TSDB_CODE_MND_STREAM_DROPPING;
    mstsError("user %s failed to start stream %s since %s", RPC_MSG_USER(pReq), pStream->name, tstrerror(code));
    goto _releaseStream;
  }

  if (0 == atomic_load_8(&pStream->userStopped)) {
    code = TSDB_CODE_MND_STREAM_NOT_STOPPED;
    mstsError("user %s failed to start stream %s since %s", RPC_MSG_USER(pReq), pStream->name, tstrerror(code));
    goto _releaseStream;
  }

  // From here on the in-memory stream is modified; remember what to restore.
  prevUpdateTime = pStream->updateTime;

  atomic_store_8(&pStream->userStopped, 0);

  pStream->updateTime = taosGetTimestampMs();

  MND_STREAM_SET_LAST_TS(STM_EVENT_START_STREAM, pStream->updateTime);

  code = mndStreamCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_START_NAME, &pTrans);
  if (pTrans == NULL || code) {
    mstsError("failed to start stream %s since %s", pStream->name, tstrerror(code));
    goto _rollback;
  }
  transCreated = true;

  code = mndStreamTransAppend(pStream, pTrans, SDB_STATUS_READY);
  if (code != TSDB_CODE_SUCCESS) {
    mstsError("failed to start stream %s since %s", pStream->name, tstrerror(code));
    goto _rollback;
  }

  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mstsError("trans:%d, failed to prepare start stream %s trans since %s", pTrans->id, pStream->name, tstrerror(code));
    goto _rollback;
  }

  mstPostStreamAction(mStreamMgmt.actionQ, streamId, pStream->name, NULL, true, STREAM_ACT_DEPLOY);

  sdbRelease(pMnode->pSdb, pStream);
  mndTransDrop(pTrans);

  return TSDB_CODE_ACTION_IN_PROGRESS;

_rollback:
  // Nothing was committed: put the stream back into the stopped state.
  atomic_store_8(&pStream->userStopped, 1);
  pStream->updateTime = prevUpdateTime;
  sdbRelease(pMnode->pSdb, pStream);
  // As before, the transaction is only dropped when its creation succeeded.
  if (transCreated) {
    mndTransDrop(pTrans);
  }
  return code;

_releaseStream:
  sdbRelease(pMnode->pSdb, pStream);
  return code;
}
808

809
/*
 * Handle a drop-stream request that may name several streams.
 *
 * All drops are appended to ONE transaction. Unless `igNotExists` is set, every
 * requested stream must exist or the whole request is rejected up front. With
 * `igNotExists` set, missing streams are counted and skipped.
 *
 * Returns TSDB_CODE_ACTION_IN_PROGRESS when at least one stream was added to the
 * prepared transaction, TSDB_CODE_SUCCESS when every requested stream was missing
 * (and ignored), or an error code otherwise.
 *
 * Ownership: `pTrans` is dropped exactly once, at `_OVER`, unless it has been set
 * to NULL because it was already released on the failing path.
 */
static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
  SMnode     *pMnode = pReq->info.node;
  SStreamObj *pStream = NULL;
  SUserObj   *pOperUser = NULL;
  int32_t     code = 0;
  int32_t     notExistNum = 0;

  SMDropStreamReq dropReq = {0};
  int64_t         tss = taosGetTimestampMs();
  if (tDeserializeSMDropStreamReq(pReq->pCont, pReq->contLen, &dropReq) < 0) {
    mError("invalid drop stream msg recv, discarded");
    code = TSDB_CODE_INVALID_MSG;
    TAOS_RETURN(code);
  }

  mDebug("recv drop stream msg, count:%d", dropReq.count);

  // Acquire user object for privilege check
  code = mndAcquireUser(pMnode, RPC_MSG_USER(pReq), &pOperUser);
  if (code != 0) {
    tFreeMDropStreamReq(&dropReq);
    TAOS_RETURN(code);
  }

  // check if all streams exist
  if (!dropReq.igNotExists) {
    for (int32_t i = 0; i < dropReq.count; i++) {
      if (!sdbCheckExists(pMnode->pSdb, SDB_STREAM, dropReq.name[i])) {
        mError("stream:%s not exist failed to drop it", dropReq.name[i]);
        mndReleaseUser(pMnode, pOperUser);
        tFreeMDropStreamReq(&dropReq);
        TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
      }
    }
  }

  // Create a single transaction for all stream drops
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq, MND_STREAM_DROP_NAME);
  if (pTrans == NULL) {
    mError("failed to create drop stream transaction since %s", tstrerror(terrno));
    code = terrno;
    mndReleaseUser(pMnode, pOperUser);
    tFreeMDropStreamReq(&dropReq);
    TAOS_RETURN(code);
  }
  pTrans->ableToBeKilled = true;

  // Process all streams and add them to the transaction
  for (int32_t i = 0; i < dropReq.count; i++) {
    char *streamName = dropReq.name[i];
    mDebug("drop stream[%d/%d]: %s", i + 1, dropReq.count, streamName);

    code = mndAcquireStream(pMnode, streamName, &pStream);
    if (pStream == NULL || code != 0) {
      mWarn("stream:%s not exist, ignore not exist is set, drop stream exec done with success", streamName);
      sdbRelease(pMnode->pSdb, pStream);
      pStream = NULL;
      notExistNum++;
      continue;
    }

    int64_t streamId = pStream->pCreate->streamId;

    if ((code = mndCheckDbPrivilegeByName(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_USE_DB,
                                          pStream->pCreate->streamDB)) ||
        (code = mndCheckObjPrivilegeRecF(pMnode, pOperUser, PRIV_CM_DROP, PRIV_OBJ_STREAM, pStream->ownerId,
                                         pStream->pCreate->streamDB, pStream->pCreate->name))) {
      mstsError("user %s failed to drop stream %s since %s", RPC_MSG_USER(pReq), streamName, tstrerror(code));
      sdbRelease(pMnode->pSdb, pStream);
      pStream = NULL;
      mndTransDrop(pTrans);
      pTrans = NULL;
      goto _OVER;
    }

    // A stream that backs a TSMA may only go away together with that TSMA.
    if (pStream->pCreate->tsmaId != 0) {
      mstsDebug("try to drop tsma related stream, tsmaId:%" PRIx64, pStream->pCreate->tsmaId);

      void    *pIter = NULL;
      SSmaObj *pSma = NULL;
      pIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter, (void **)&pSma);
      while (pIter) {
        if (pSma && pSma->uid == pStream->pCreate->tsmaId) {
          sdbRelease(pMnode->pSdb, pSma);
          sdbRelease(pMnode->pSdb, pStream);
          pStream = NULL;

          sdbCancelFetch(pMnode->pSdb, pIter);
          code = TSDB_CODE_TSMA_MUST_BE_DROPPED;

          mstsError("refused to drop tsma-related stream %s since tsma still exists", streamName);
          mndTransDrop(pTrans);
          pTrans = NULL;
          goto _OVER;
        }

        if (pSma) {
          sdbRelease(pMnode->pSdb, pSma);
        }

        pIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter, (void **)&pSma);
      }
    }

    mstsInfo("start to drop stream %s", pStream->pCreate->name);

    pStream->updateTime = taosGetTimestampMs();

    atomic_store_8(&pStream->userDropped, 1);

    MND_STREAM_SET_LAST_TS(STM_EVENT_DROP_STREAM, pStream->updateTime);

    msmUndeployStream(pMnode, streamId, pStream->pCreate->name);

    // Append drop stream operation to the transaction
    code = mndStreamTransAppend(pStream, pTrans, SDB_STATUS_DROPPED);
    if (code) {
      mstsError("trans:%d, failed to append drop stream %s trans since %s", pTrans->id, streamName, tstrerror(code));
      sdbRelease(pMnode->pSdb, pStream);
      pStream = NULL;
      // mndStreamTransAppend already called mndTransDrop on failure, set pTrans to NULL to avoid double free
      // NOTE(review): the log line above still reads pTrans->id after that drop — confirm against
      // mndStreamTransAppend whether the transaction is really released there.
      pTrans = NULL;
      goto _OVER;
    }

    sdbRelease(pMnode->pSdb, pStream);
    pStream = NULL;

    mstsDebug("drop stream %s added to transaction", streamName);
  }

  // Prepare and execute the transaction for all streams
  if (notExistNum < dropReq.count) {
    code = mndTransPrepare(pMnode, pTrans);
    if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
      mError("trans:%d, failed to prepare drop stream trans since %s", pTrans->id, tstrerror(code));
      // Do not drop pTrans here: _OVER drops every non-NULL pTrans, and dropping it
      // at both places released the same transaction twice.
      goto _OVER;
    }
    mInfo("trans:%d, drop stream transaction prepared for %d streams", pTrans->id, dropReq.count - notExistNum);
  } else {
    // All streams don't exist, no need to prepare transaction
    mndTransDrop(pTrans);
    pTrans = NULL;
  }

  if (tsAuditLevel >= AUDIT_LEVEL_DATABASE && notExistNum < dropReq.count) {
    int64_t tse = taosGetTimestampMs();
    double  duration = (double)(tse - tss);
    duration = duration / 1000;
    // Use first stream's database for audit (assuming all streams are from same db in batch)
    // NOTE(review): this lookup runs after the drop transaction was prepared; if the stream can no
    // longer be acquired at this point the audit record is silently skipped — confirm and consider
    // capturing the database name inside the loop above.
    if (dropReq.count > 0) {
      SStreamObj *pFirstStream = NULL;
      if (mndAcquireStream(pMnode, dropReq.name[0], &pFirstStream) == 0 && pFirstStream != NULL) {
        auditRecord(pReq, pMnode->clusterId, "dropStream", "", pFirstStream->pCreate->streamDB, NULL, 0, duration, 0);
        sdbRelease(pMnode->pSdb, pFirstStream);
      }
    }
  }

  // If any stream was successfully added to transaction, return ACTION_IN_PROGRESS
  // Otherwise, all streams don't exist (and igNotExists is set), return SUCCESS
  code = (notExistNum < dropReq.count) ? TSDB_CODE_ACTION_IN_PROGRESS : TSDB_CODE_SUCCESS;

_OVER:
  if (pStream) {
    sdbRelease(pMnode->pSdb, pStream);
  }
  if (pTrans) {
    mndTransDrop(pTrans);
  }
  mndReleaseUser(pMnode, pOperUser);
  tFreeMDropStreamReq(&dropReq);
  TAOS_RETURN(code);
}
984

985
/*
 * Handle a create-stream request.
 *
 * Steps: grant check, deserialize the request, pick an snode, reject (or ignore,
 * when `igExists` is set) an already existing stream, validate the request, build
 * the stream object, then create/append/prepare the transaction and post a deploy
 * action to the stream action queue.
 *
 * Returns TSDB_CODE_ACTION_IN_PROGRESS once the transaction has been prepared,
 * TSDB_CODE_SUCCESS when the stream already exists and `igExists` is set, or an
 * error code.
 *
 * Lifetime note: from the point where `pStream` is set to `&streamObj` it points to
 * the local object; an sdb-acquired stream is released and forgotten immediately so
 * that nothing at `_OVER` reads an object this function no longer holds.
 */
static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
  SMnode     *pMnode = pReq->info.node;
  SStreamObj *pStream = NULL;
  SStreamObj  streamObj = {0};
  SUserObj   *pOperUser = NULL;
  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     lino = 0;
  STrans     *pTrans = NULL;
  uint64_t    streamId = 0;
  SCMCreateStreamReq *pCreate = NULL;
  const char         *logName = NULL;  // stream name used only for the final log line
  int64_t             tss = taosGetTimestampMs();

  if ((code = grantCheck(TSDB_GRANT_STREAMS)) < 0) {
    goto _OVER;
  }

#ifdef WINDOWS
  code = TSDB_CODE_MND_INVALID_PLATFORM;
  goto _OVER;
#endif

  pCreate = taosMemoryCalloc(1, sizeof(SCMCreateStreamReq));
  TSDB_CHECK_NULL(pCreate, code, lino, _OVER, terrno);

  code = tDeserializeSCMCreateStreamReq(pReq->pCont, pReq->contLen, pCreate);
  TSDB_CHECK_CODE(code, lino, _OVER);

  streamId = pCreate->streamId;

  mstsInfo("start to create stream %s, sql:%s", pCreate->name, pCreate->sql);

  int32_t snodeId = msmAssignRandomSnodeId(pMnode, streamId);
  if (!GOT_SNODE(snodeId)) {
    code = terrno;
    TSDB_CHECK_CODE(code, lino, _OVER);
  }

  code = mndAcquireStream(pMnode, pCreate->name, &pStream);
  if (pStream != NULL && code == 0) {
    if (pCreate->igExists) {
      mstsInfo("stream %s already exist, ignore exist is set", pCreate->name);
    } else {
      code = TSDB_CODE_MND_STREAM_ALREADY_EXIST;
    }

    mndReleaseStream(pMnode, pStream);
    // The reference is gone: clear the pointer so _OVER cannot dereference a
    // stream object this function no longer holds.
    pStream = NULL;
    goto _OVER;
  } else if (code != TSDB_CODE_MND_STREAM_NOT_EXIST) {
    goto _OVER;
  }

  code = mndAcquireUser(pMnode, RPC_MSG_USER(pReq), &pOperUser);
  if (pOperUser == NULL) {
    // Keep the acquire error when there is one; otherwise make sure a missing user
    // is never reported as success.
    if (code == TSDB_CODE_SUCCESS) {
      code = TSDB_CODE_MND_NO_USER_FROM_CONN;
    }
    TSDB_CHECK_CODE(code, lino, _OVER);
  }

  code = mndStreamValidateCreate(pMnode, pReq, pCreate);
  TSDB_CHECK_CODE(code, lino, _OVER);

  mndStreamBuildObj(pMnode, &streamObj, pCreate, pOperUser, snodeId);
  // pCreate is cleared so it is not freed again at _OVER; the request is reached
  // through streamObj.pCreate from here on.
  pCreate = NULL;

  pStream = &streamObj;

  code = mndStreamCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_DB, MND_STREAM_CREATE_NAME, &pTrans);
  if (pTrans == NULL || code) {
    goto _OVER;
  }

  // create stb for stream
  if (TSDB_SUPER_TABLE == pStream->pCreate->outTblType && !pStream->pCreate->outStbExists) {
    pStream->pCreate->outStbUid = mndGenerateUid(pStream->pCreate->outTblName, strlen(pStream->pCreate->outTblName));
    code = mndStreamCreateOutStb(pMnode, pTrans, pStream->pCreate, RPC_MSG_USER(pReq));
    TSDB_CHECK_CODE(code, lino, _OVER);
  }

  // add stream to trans
  code = mndStreamTransAppend(pStream, pTrans, SDB_STATUS_READY);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mstsError("failed to persist stream %s since %s", pStream->pCreate->name, tstrerror(code));
    goto _OVER;
  }

  // execute creation
  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mstsError("trans:%d, failed to prepare since %s", pTrans->id, tstrerror(code));
    goto _OVER;
  }
  code = TSDB_CODE_ACTION_IN_PROGRESS;

  if (tsAuditLevel >= AUDIT_LEVEL_DATABASE) {
    int64_t tse = taosGetTimestampMs();
    double  duration = (double)(tse - tss);
    duration = duration / 1000;
    auditRecord(pReq, pMnode->clusterId, "createStream", pStream->pCreate->streamDB, pStream->pCreate->name,
                pStream->pCreate->sql, strlen(pStream->pCreate->sql), duration, 0);
  }

  MND_STREAM_SET_LAST_TS(STM_EVENT_CREATE_STREAM, taosGetTimestampMs());

  mstPostStreamAction(mStreamMgmt.actionQ, streamId, pStream->pCreate->name, NULL, true, STREAM_ACT_DEPLOY);

_OVER:

  // Pick the name for the final log line from whichever request object is still
  // owned here: the local stream object after it was built, else the raw request.
  if (pStream != NULL && pStream->pCreate != NULL) {
    logName = pStream->pCreate->name;
  } else if (pCreate != NULL) {
    logName = pCreate->name;
  }

  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    if (logName != NULL) {
      mstsError("failed to create stream %s at line:%d since %s", logName, lino, tstrerror(code));
    } else {
      mstsError("failed to create stream at line:%d since %s", lino, tstrerror(code));
    }
  } else {
    mstsDebug("create stream %s half completed", logName != NULL ? logName : "unknown");
  }

  tFreeSCMCreateStreamReq(pCreate);
  taosMemoryFreeClear(pCreate);

  mndTransDrop(pTrans);
  tFreeStreamObj(&streamObj);
  mndReleaseUser(pMnode, pOperUser);

  return code;
}
1109

1110
/*
 * Handle a recalc-stream request.
 *
 * After the grant check the request is deserialized, the stream and the operating
 * user are acquired, and the caller's privileges are verified. Recalculation is
 * refused while the stream is being dropped, while it is stopped, and for
 * period-triggered streams. On success the recalculation is handed to
 * msmRecalcStream and, if auditing is enabled, an audit record is written.
 *
 * Returns TSDB_CODE_SUCCESS or the error code of the first failing step.
 */
static int32_t mndProcessRecalcStreamReq(SRpcMsg *pReq) {
  SMnode           *pMnode = pReq->info.node;
  SStreamObj       *pStream = NULL;
  SUserObj         *pOperUser = NULL;
  int32_t           code = 0;
  int64_t           beginMs = taosGetTimestampMs();
  SMRecalcStreamReq recalcReq = {0};

  code = grantCheckExpire(TSDB_GRANT_STREAMS);
  if (code < 0) {
    return code;
  }

  if (tDeserializeSMRecalcStreamReq(pReq->pCont, pReq->contLen, &recalcReq) < 0) {
    tFreeMRecalcStreamReq(&recalcReq);
    TAOS_RETURN(TSDB_CODE_INVALID_MSG);
  }

  code = mndAcquireStream(pMnode, recalcReq.name, &pStream);
  if (code != 0 || pStream == NULL) {
    mError("stream:%s not exist, failed to recalc stream", recalcReq.name);
    tFreeMRecalcStreamReq(&recalcReq);
    TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
  }

  // Referenced by the msts* logging macros.
  int64_t streamId = pStream->pCreate->streamId;

  mstsInfo("start to recalc stream %s", recalcReq.name);

  code = mndAcquireUser(pMnode, RPC_MSG_USER(pReq), &pOperUser);
  if (code != 0) {
    mstsError("user %s failed to recalc stream %s since %s", RPC_MSG_USER(pReq), pStream->name, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    tFreeMRecalcStreamReq(&recalcReq);
    TAOS_RETURN(code);
  }

  // Database-level privilege first; the object-level check only runs when it passes.
  code = mndCheckDbPrivilegeByName(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_USE_DB,
                                   pStream->pCreate->streamDB);
  if (code == 0) {
    code = mndCheckObjPrivilegeRecF(pMnode, pOperUser, PRIV_CM_RECALC, PRIV_OBJ_STREAM, pStream->ownerId,
                                    pStream->pCreate->streamDB, pStream->pCreate->name);
  }
  if (code != 0) {
    mstsError("user %s failed to recalc stream %s since %s", RPC_MSG_USER(pReq), pStream->name, tstrerror(code));
    mndReleaseUser(pMnode, pOperUser);
    goto _OVER;
  }

  // The user object is only needed for the privilege check.
  mndReleaseUser(pMnode, pOperUser);

  // A stream that is being dropped or has been stopped cannot be recalculated.
  if (atomic_load_8(&pStream->userDropped)) {
    code = TSDB_CODE_MND_STREAM_DROPPING;
  } else if (atomic_load_8(&pStream->userStopped)) {
    code = TSDB_CODE_MND_STREAM_STOPPED;
  }
  if (code != 0) {
    mstsError("user %s failed to recalc stream %s since %s", RPC_MSG_USER(pReq), recalcReq.name, tstrerror(code));
    goto _OVER;
  }

  if (pStream->pCreate->triggerType == WINDOW_TYPE_PERIOD) {
    code = TSDB_CODE_OPS_NOT_SUPPORT;
    mstsError("failed to recalc stream %s since %s", recalcReq.name, tstrerror(code));
    goto _OVER;
  }

  code = msmRecalcStream(pMnode, pStream->pCreate->streamId, &recalcReq.timeRange);
  if (code != TSDB_CODE_SUCCESS) {
    goto _OVER;
  }

  if (tsAuditLevel >= AUDIT_LEVEL_DATABASE) {
    char detail[128];
    snprintf(detail, sizeof(detail), "start:%" PRId64 ", end:%" PRId64, recalcReq.timeRange.skey,
             recalcReq.timeRange.ekey);
    int64_t endMs = taosGetTimestampMs();
    double  duration = (double)(endMs - beginMs) / 1000;
    auditRecord(pReq, pMnode->clusterId, "recalcStream", pStream->name, recalcReq.name, detail, strlen(detail),
                duration, 0);
  }

_OVER:
  // Shared tail for every exit taken after both the stream and the request are held.
  sdbRelease(pMnode->pSdb, pStream);
  tFreeMRecalcStreamReq(&recalcReq);

  return code;
}
1240

1241

1242
/*
 * Register the stream module with the mnode: request handlers (unless streams are
 * disabled via tsDisableStream), the show/retrieve handlers for the stream system
 * tables, and the SDB_STREAM table callbacks.
 *
 * Returns the result of sdbSetTable.
 */
int32_t mndInitStream(SMnode *pMnode) {
  SSdbTable streamTable = {0};
  streamTable.sdbType = SDB_STREAM;
  streamTable.keyType = SDB_KEY_BINARY;
  streamTable.encodeFp = (SdbEncodeFp)mndStreamActionEncode;
  streamTable.decodeFp = (SdbDecodeFp)mndStreamActionDecode;
  streamTable.insertFp = (SdbInsertFp)mndStreamActionInsert;
  streamTable.updateFp = (SdbUpdateFp)mndStreamActionUpdate;
  streamTable.deleteFp = (SdbDeleteFp)mndStreamActionDelete;

  // Request handlers are only installed while the stream feature is enabled.
  if (!tsDisableStream) {
    mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STREAM, mndProcessCreateStreamReq);
    mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM, mndProcessDropStreamReq);
    mndSetMsgHandle(pMnode, TDMT_MND_START_STREAM, mndProcessStartStreamReq);
    mndSetMsgHandle(pMnode, TDMT_MND_STOP_STREAM, mndProcessStopStreamReq);
    mndSetMsgHandle(pMnode, TDMT_MND_STREAM_HEARTBEAT, mndProcessStreamHb);
    mndSetMsgHandle(pMnode, TDMT_MND_RECALC_STREAM, mndProcessRecalcStreamReq);
  }

  // Show handlers are registered regardless of tsDisableStream.
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndRetrieveStream);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndCancelGetNextStream);
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndRetrieveStreamTask);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndCancelGetNextStreamTask);
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAM_RECALCULATES, mndRetrieveStreamRecalculates);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAM_RECALCULATES, mndCancelGetNextStreamRecalculates);

  return sdbSetTable(pMnode->pSdb, streamTable);
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc