• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4747

21 Sep 2025 11:53PM UTC coverage: 58.002% (-1.1%) from 59.065%
#4747

push

travis-ci

web-flow
fix: refine python taos error log matching in checkAsan.sh (#33029)

* fix: refine python taos error log matching in checkAsan.sh

* fix: improve python taos error log matching in checkAsan.sh

133398 of 293157 branches covered (45.5%)

Branch coverage included in aggregate %.

201778 of 284713 relevant lines covered (70.87%)

5539418.83 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

59.02
/source/dnode/mnode/impl/src/mndStream.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "mndStream.h"
17
#include "audit.h"
18
#include "mndDb.h"
19
#include "mndPrivilege.h"
20
#include "mndShow.h"
21
#include "mndStb.h"
22
#include "mndTrans.h"
23
#include "osMemory.h"
24
#include "parser.h"
25
#include "taoserror.h"
26
#include "tmisce.h"
27
#include "tname.h"
28

29
#define MND_STREAM_MAX_NUM 100000
30

31
typedef struct {
32
  int8_t placeHolder;  // // to fix windows compile error, define place holder
33
} SMStreamNodeCheckMsg;
34

35
static int32_t  mndNodeCheckSentinel = 0;
36
SStmRuntime  mStreamMgmt = {0};
37

38
static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream);
39
static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream);
40
static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pOldStream, SStreamObj *pNewStream);
41
static int32_t mndProcessDropStreamReq(SRpcMsg *pReq);
42

43
static int32_t mndProcessCreateStreamReqFromMNode(SRpcMsg *pReq);
44
static int32_t mndProcessDropStreamReqFromMNode(SRpcMsg *pReq);
45

46
static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
47
static void    mndCancelGetNextStream(SMnode *pMnode, void *pIter);
48
static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
49
static void    mndCancelGetNextStreamTask(SMnode *pMnode, void *pIter);
50
static int32_t mndProcessStopStreamReq(SRpcMsg *pReq);
51
static int32_t mndProcessStartStreamReq(SRpcMsg *pReq);
52

53
static SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw);
54

55
SSdbRaw       *mndStreamSeqActionEncode(SStreamObj *pStream);
56
SSdbRow       *mndStreamSeqActionDecode(SSdbRaw *pRaw);
57
static int32_t mndStreamSeqActionInsert(SSdb *pSdb, SStreamSeq *pStream);
58
static int32_t mndStreamSeqActionDelete(SSdb *pSdb, SStreamSeq *pStream);
59
static int32_t mndStreamSeqActionUpdate(SSdb *pSdb, SStreamSeq *pOldStream, SStreamSeq *pNewStream);
60
static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq);
61

62
// Stream cleanup entry point: delegates to msmHandleBecomeNotLeader() and logs
// a debug line before and after the call.
void mndCleanupStream(SMnode *pMnode) {
  mDebug("try to clean up stream");
  msmHandleBecomeNotLeader(pMnode);
  mDebug("mnd stream runtime info cleanup");
}
1,927✔
69

70
// Decode an SDB raw record into a newly allocated stream row.
//
// Reads the soft version, the encoded length and the encoded bytes from pRaw, then
// runs tDecodeSStreamObj() into the SStreamObj embedded in the row.
//
// Returns the row on success (terrno is set to 0). On any failure the row is freed,
// terrno is set to the error code and NULL is returned.
SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw) {
  int32_t     code = 0;
  int32_t     lino = 0;
  SSdbRow    *pRow = NULL;
  SStreamObj *pStream = NULL;
  void       *buf = NULL;
  int8_t      sver = 0;
  int32_t     tlen = 0;
  int32_t     dataPos = 0;

  code = sdbGetRawSoftVer(pRaw, &sver);
  TSDB_CHECK_CODE(code, lino, _over);

  if (sver != MND_STREAM_VER_NUMBER) {
    mError("stream read invalid ver, data ver: %d, curr ver: %d", sver, MND_STREAM_VER_NUMBER);
    // Must record a failure code here: leaving code at 0 would send control into the
    // success branch below, which dereferences pStream while it is still NULL.
    code = TSDB_CODE_SDB_INVALID_DATA_VER;
    lino = __LINE__;
    goto _over;
  }

  pRow = sdbAllocRow(sizeof(SStreamObj));
  TSDB_CHECK_NULL(pRow, code, lino, _over, terrno);

  pStream = sdbGetRowObj(pRow);
  TSDB_CHECK_NULL(pStream, code, lino, _over, terrno);

  SDB_GET_INT32(pRaw, dataPos, &tlen, _over);

  buf = taosMemoryMalloc(tlen + 1);
  TSDB_CHECK_NULL(buf, code, lino, _over, terrno);

  SDB_GET_BINARY(pRaw, dataPos, buf, tlen, _over);

  SDecoder decoder;
  tDecoderInit(&decoder, buf, tlen + 1);
  code = tDecodeSStreamObj(&decoder, pStream, sver);
  tDecoderClear(&decoder);

  if (code < 0) {
    // release whatever the decoder attached to the object before reporting the error
    tFreeStreamObj(pStream);
  }

_over:
  taosMemoryFreeClear(buf);

  if (code != TSDB_CODE_SUCCESS) {
    char *p = (pStream == NULL || NULL == pStream->pCreate) ? "null" : pStream->pCreate->name;
    mError("stream:%s, failed to decode from raw:%p since %s at:%d", p, pRaw, tstrerror(code), lino);
    taosMemoryFreeClear(pRow);

    terrno = code;
    return NULL;
  } else {
    mTrace("stream:%s, decode from raw:%p, row:%p", pStream->pCreate->name, pRaw, pStream);

    terrno = 0;
    return pRow;
  }
}
127

128
// SDB insert callback for stream rows: only traces the stream name, always returns 0.
static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream) {
  const int32_t result = 0;

  mTrace("stream:%s, perform insert action", pStream->pCreate->name);

  return result;
}
132

133
// SDB delete callback for stream rows: logs the stream name, then releases the
// object's contents through tFreeStreamObj(). Always returns 0.
static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream) {
  const int32_t result = 0;

  mInfo("stream:%s, perform delete action", pStream->pCreate->name);
  tFreeStreamObj(pStream);

  return result;
}
138

139
// SDB update callback for stream rows: copies mainSnodeId, userStopped and updateTime
// from the new object into the existing one. Always returns 0.
static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pOldStream, SStreamObj *pNewStream) {
  mTrace("stream:%s, perform update action", pOldStream->pCreate->name);

  atomic_store_32(&pOldStream->mainSnodeId, pNewStream->mainSnodeId);

  // the stopped flag is read and written atomically on both sides
  atomic_store_8(&pOldStream->userStopped, atomic_load_8(&pNewStream->userStopped));

  pOldStream->updateTime = pNewStream->updateTime;
  return 0;
}
148

149
// Acquire the stream named streamName from the SDB and store it in *pStream.
// Returns TSDB_CODE_MND_STREAM_NOT_EXIST when the lookup yields NULL and terrno is
// TSDB_CODE_SDB_OBJ_NOT_THERE; returns 0 in every other case (including a NULL result
// with a different terrno, so callers also have to check *pStream).
int32_t mndAcquireStream(SMnode *pMnode, char *streamName, SStreamObj **pStream) {
  *pStream = sdbAcquire(pMnode->pSdb, SDB_STREAM, streamName);

  if (NULL != *pStream) {
    return 0;
  }

  return (TSDB_CODE_SDB_OBJ_NOT_THERE == terrno) ? TSDB_CODE_MND_STREAM_NOT_EXIST : 0;
}
158

159
// sdbTraverse callback that looks a stream up by id.
//   p1: pointer to the int64_t stream id being searched for
//   p2: output buffer for the stream name; the caller in this file passes a
//       char[TSDB_STREAM_NAME_LEN]
//   p3: unused
// Returns false once the id matches (the name has been copied into p2), true otherwise.
// NOTE(review): false presumably tells sdbTraverse to stop iterating — confirm.
static bool mndStreamGetNameFromId(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
  SStreamObj* pStream = pObj;

  if (pStream->pCreate->streamId == *(int64_t*)p1) {
    // strncpy leaves the buffer without a terminator when the source fills it; tstrncpy
    // is the bounded copy this file uses elsewhere (presumably always NUL-terminating),
    // and the caller goes on to use the buffer as a C string.
    tstrncpy((char*)p2, pStream->name, TSDB_STREAM_NAME_LEN);
    return false;
  }

  return true;
}
169

170
// Acquire a stream by its id.
//
// Traverses SDB_STREAM to translate streamId into a stream name, then acquires the
// stream by that name.
//
// *pStream is always written: the acquired object on success, NULL otherwise.
// Returns 0 on success and TSDB_CODE_MND_STREAM_NOT_EXIST when no stream carries the
// id or the stream disappeared before it could be acquired. As in mndAcquireStream(),
// a NULL result with another terrno still returns 0, so callers must check *pStream.
int32_t mndAcquireStreamById(SMnode *pMnode, int64_t streamId, SStreamObj **pStream) {
  SSdb *pSdb = pMnode->pSdb;
  char  streamName[TSDB_STREAM_NAME_LEN];
  streamName[0] = 0;

  // never hand an indeterminate pointer back to the caller
  *pStream = NULL;

  sdbTraverse(pSdb, SDB_STREAM, mndStreamGetNameFromId, &streamId, streamName, NULL);
  if (0 == streamName[0]) {
    // no stream matched the id; report it the same way mndAcquireStream() does
    return TSDB_CODE_MND_STREAM_NOT_EXIST;
  }

  *pStream = sdbAcquire(pSdb, SDB_STREAM, streamName);
  if (NULL == *pStream && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
    return TSDB_CODE_MND_STREAM_NOT_EXIST;
  }

  return 0;
}
186

187
// Hand a previously acquired stream object back to the mnode's SDB.
void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream) {
  sdbRelease(pMnode->pSdb, pStream);
}
400✔
191

192
SSdbRaw *mndStreamSeqActionEncode(SStreamObj *pStream) { return NULL; }
×
193
SSdbRow *mndStreamSeqActionDecode(SSdbRaw *pRaw) { return NULL; }
×
194
int32_t  mndStreamSeqActionInsert(SSdb *pSdb, SStreamSeq *pStream) { return 0; }
×
195
int32_t  mndStreamSeqActionDelete(SSdb *pSdb, SStreamSeq *pStream) { return 0; }
×
196
int32_t  mndStreamSeqActionUpdate(SSdb *pSdb, SStreamSeq *pOldStream, SStreamSeq *pNewStream) { return 0; }
×
197

198
// Fill in a stream object for a stream that is being created.
//   pObj:    object to populate
//   pCreate: create request; the pointer itself is stored in pObj->pCreate (not copied)
//   snodeId: id stored as the stream's main snode
// The dropped/stopped flags are cleared and createTime/updateTime are both set to the
// current millisecond timestamp. The resulting object is logged via mstLogSStreamObj().
static void mndStreamBuildObj(SMnode *pMnode, SStreamObj *pObj, SCMCreateStreamReq *pCreate, int32_t snodeId) {
  pObj->pCreate = pCreate;
  // strncpy would leave pObj->name unterminated for a name that fills the buffer;
  // tstrncpy is the bounded copy already used in this file (presumably always
  // NUL-terminating).
  tstrncpy(pObj->name, pCreate->name, TSDB_STREAM_FNAME_LEN);
  pObj->mainSnodeId = snodeId;

  pObj->userDropped = 0;
  pObj->userStopped = 0;

  pObj->createTime = taosGetTimestampMs();
  pObj->updateTime = pObj->createTime;

  mstLogSStreamObj("create stream", pObj);
}
334✔
213

214
// Build the create-super-table request for a stream's output table and append the
// new super table to pTrans.
//
// The table name is "<outDB>.<outTblName>". Columns are copied from pStream->outCols;
// tags are copied from pStream->outTags, or a single UBIGINT "group_id" tag is used
// when outTags is NULL. The super table uid is taken from pStream->outStbUid.
//
// Returns 0 on success. Fails when the request does not validate, the super table
// already exists, the database cannot be acquired, the database is in single-stable
// mode and already holds a table, or any helper call fails. Every failure path now
// returns a non-zero code; a helper failure that leaves no usable code falls back
// to terrno.
static int32_t mndStreamCreateOutStb(SMnode *pMnode, STrans *pTrans, const SCMCreateStreamReq *pStream, const char *user) {
  SStbObj *pStb = NULL;
  SDbObj  *pDb = NULL;
  int32_t  code = 0;
  int32_t  lino = 0;

  SMCreateStbReq createReq = {0};
  TAOS_STRNCAT(createReq.name, pStream->outDB, TSDB_DB_FNAME_LEN);
  TAOS_STRNCAT(createReq.name, ".", 2);
  TAOS_STRNCAT(createReq.name,  pStream->outTblName, TSDB_TABLE_NAME_LEN);
  createReq.numOfColumns = taosArrayGetSize(pStream->outCols);
  createReq.numOfTags = pStream->outTags ? taosArrayGetSize(pStream->outTags) : 1;
  createReq.pColumns = taosArrayInit_s(sizeof(SFieldWithOptions), createReq.numOfColumns);
  TSDB_CHECK_NULL(createReq.pColumns, code, lino, _OVER, terrno);

  // build fields
  for (int32_t i = 0; i < createReq.numOfColumns; i++) {
    SFieldWithOptions *pField = taosArrayGet(createReq.pColumns, i);
    TSDB_CHECK_NULL(pField, code, lino, _OVER, terrno);
    SFieldWithOptions *pSrc = taosArrayGet(pStream->outCols, i);
    TSDB_CHECK_NULL(pSrc, code, lino, _OVER, terrno);

    tstrncpy(pField->name, pSrc->name, TSDB_COL_NAME_LEN);
    pField->flags = pSrc->flags;
    pField->type = pSrc->type;
    pField->bytes = pSrc->bytes;
    pField->compress = createDefaultColCmprByType(pField->type);
    if (IS_DECIMAL_TYPE(pField->type)) {
      pField->typeMod = pSrc->typeMod;
      pField->flags |= COL_HAS_TYPE_MOD;
    }
  }

  if (NULL == pStream->outTags) {
    createReq.numOfTags = 1;
    createReq.pTags = taosArrayInit_s(sizeof(SField), 1);
    TSDB_CHECK_NULL(createReq.pTags, code, lino, _OVER, terrno);

    // build tags
    SField *pField = taosArrayGet(createReq.pTags, 0);
    TSDB_CHECK_NULL(pField, code, lino, _OVER, terrno);

    tstrncpy(pField->name, "group_id", sizeof(pField->name));
    pField->type = TSDB_DATA_TYPE_UBIGINT;
    pField->flags = 0;
    pField->bytes = 8;
  } else {
    createReq.numOfTags = taosArrayGetSize(pStream->outTags);
    createReq.pTags = taosArrayInit_s(sizeof(SField), createReq.numOfTags);
    TSDB_CHECK_NULL(createReq.pTags, code, lino, _OVER, terrno);

    for (int32_t i = 0; i < createReq.numOfTags; i++) {
      SField *pField = taosArrayGet(createReq.pTags, i);
      if (pField == NULL) {
        continue;
      }

      TAOS_FIELD_E *pSrc = taosArrayGet(pStream->outTags, i);
      TSDB_CHECK_NULL(pSrc, code, lino, _OVER, terrno);
      pField->bytes = pSrc->bytes;
      pField->flags = 0;
      pField->type = pSrc->type;
      tstrncpy(pField->name, pSrc->name, TSDB_COL_NAME_LEN);
    }
  }

  if ((code = mndCheckCreateStbReq(&createReq)) != 0) {
    lino = __LINE__;
    goto _OVER;
  }

  pStb = mndAcquireStb(pMnode, createReq.name);
  if (pStb != NULL) {
    code = TSDB_CODE_MND_STB_ALREADY_EXIST;
    lino = __LINE__;
    goto _OVER;
  }

  pDb = mndAcquireDbByStb(pMnode, createReq.name);
  if (pDb == NULL) {
    code = TSDB_CODE_MND_DB_NOT_SELECTED;
    lino = __LINE__;
    goto _OVER;
  }

  int32_t numOfStbs = -1;
  code = mndGetNumOfStbs(pMnode, pDb->name, &numOfStbs);
  if (code != 0) {
    // keep the helper's code; a bare -1 carries no detail, so prefer terrno when set
    if (code == -1 && terrno != 0) code = terrno;
    lino = __LINE__;
    goto _OVER;
  }

  if (pDb->cfg.numOfStables == 1 && numOfStbs != 0) {
    code = TSDB_CODE_MND_SINGLE_STB_MODE_DB;
    lino = __LINE__;
    goto _OVER;
  }

  SStbObj stbObj = {0};

  code = mndBuildStbFromReq(pMnode, &stbObj, &createReq, pDb);
  if (code != 0) {
    if (code == -1 && terrno != 0) code = terrno;
    lino = __LINE__;
    goto _OVER;
  }

  stbObj.uid = pStream->outStbUid;

  code = mndAddStbToTrans(pMnode, pTrans, pDb, &stbObj);
  if (code < 0) {
    if (code == -1 && terrno != 0) code = terrno;
    lino = __LINE__;
    mndFreeStb(&stbObj);
    goto _OVER;
  }
  // the original only treated negative results as failure; anything else is success
  code = 0;

  mDebug("stream:%s create dst stable:%s, cols:%d", pStream->name, pStream->outTblName, createReq.numOfColumns);

  tFreeSMCreateStbReq(&createReq);
  mndFreeStb(&stbObj);
  mndReleaseStb(pMnode, pStb);
  mndReleaseDb(pMnode, pDb);
  return code;

_OVER:
  tFreeSMCreateStbReq(&createReq);
  mndReleaseStb(pMnode, pStb);
  mndReleaseDb(pMnode, pDb);

  mDebug("stream:%s failed to create dst stable:%s, line:%d code:%s", pStream->name, pStream->outTblName, lino,
         tstrerror(code));
  return code;
}
334

335
// Check that pUser may create the stream described by pCreate and that the stream
// count limit has not been exceeded.
//
// Privileges checked (each only when the corresponding field is non-NULL):
//   streamDB  - write, triggerDB - read, every entry of calcDB - read, outDB - write.
// Returns the first failing privilege code, TSDB_CODE_MND_TOO_MANY_STREAMS when
// sdbGetSize(SDB_STREAM) is above MND_STREAM_MAX_NUM, or 0.
static int32_t mndStreamValidateCreate(SMnode *pMnode, char* pUser, SCMCreateStreamReq* pCreate) {
  int32_t code = 0, lino = 0;
  int64_t streamId = pCreate->streamId;  // not used directly; presumably read by the msts* log macros

  if (NULL != pCreate->streamDB) {
    code = mndCheckDbPrivilegeByName(pMnode, pUser, MND_OPER_WRITE_DB, pCreate->streamDB);
    if (0 != code) {
      mstsError("user %s failed to create stream %s in db %s since %s", pUser, pCreate->name, pCreate->streamDB, tstrerror(code));
    }
    TSDB_CHECK_CODE(code, lino, _OVER);
  }

  if (NULL != pCreate->triggerDB) {
    code = mndCheckDbPrivilegeByName(pMnode, pUser, MND_OPER_READ_DB, pCreate->triggerDB);
    if (0 != code) {
      mstsError("user %s failed to create stream %s using trigger db %s since %s", pUser, pCreate->name, pCreate->triggerDB, tstrerror(code));
    }
    TSDB_CHECK_CODE(code, lino, _OVER);
  }

  if (NULL != pCreate->calcDB) {
    int32_t total = taosArrayGetSize(pCreate->calcDB);
    int32_t idx = 0;
    while (idx < total) {
      char* calcDB = taosArrayGetP(pCreate->calcDB, idx);
      code = mndCheckDbPrivilegeByName(pMnode, pUser, MND_OPER_READ_DB, calcDB);
      if (0 != code) {
        mstsError("user %s failed to create stream %s using calcDB %s since %s", pUser, pCreate->name, calcDB, tstrerror(code));
      }
      TSDB_CHECK_CODE(code, lino, _OVER);
      ++idx;
    }
  }

  if (NULL != pCreate->outDB) {
    code = mndCheckDbPrivilegeByName(pMnode, pUser, MND_OPER_WRITE_DB, pCreate->outDB);
    if (0 != code) {
      mstsError("user %s failed to create stream %s using out db %s since %s", pUser, pCreate->name, pCreate->outDB, tstrerror(code));
    }
    TSDB_CHECK_CODE(code, lino, _OVER);
  }

  int32_t streamNum = sdbGetSize(pMnode->pSdb, SDB_STREAM);
  if (streamNum > MND_STREAM_MAX_NUM) {
    code = TSDB_CODE_MND_TOO_MANY_STREAMS;
    mstsError("failed to create stream %s since %s, stream number:%d", pCreate->name, tstrerror(code), streamNum);
    goto _OVER;
  }

_OVER:

  return code;
}
386

387
// Drop every stream whose streamDB equals pDb->name as part of pTrans.
//
// For each matching stream: stamp updateTime, set userDropped, record the drop event
// timestamp, undeploy the stream and append a SDB_STATUS_DROPPED action to pTrans.
// Returns 0 when the whole table was walked; on an append failure the current stream
// is released, the fetch is cancelled and the error is returned through TAOS_RETURN.
int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
  SSdb   *pSdb = pMnode->pSdb;
  void   *pIter = NULL;
  int32_t code = 0;

  for (;;) {
    SStreamObj *pStream = NULL;
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
    if (NULL == pIter) {
      break;
    }

    // streams that belong to other databases are only released
    if (0 != strcmp(pStream->pCreate->streamDB, pDb->name)) {
      sdbRelease(pSdb, pStream);
      continue;
    }

    mInfo("start to drop stream %s in db %s", pStream->pCreate->name, pDb->name);

    pStream->updateTime = taosGetTimestampMs();
    atomic_store_8(&pStream->userDropped, 1);
    MND_STREAM_SET_LAST_TS(STM_EVENT_DROP_STREAM, pStream->updateTime);
    msmUndeployStream(pMnode, pStream->pCreate->streamId, pStream->pCreate->name);

    // drop stream
    code = mndStreamTransAppend(pStream, pTrans, SDB_STATUS_DROPPED);
    if (code) {
      mError("drop db trans:%d failed to append drop stream trans since %s", pTrans->id, tstrerror(code));
      sdbRelease(pSdb, pStream);
      sdbCancelFetch(pSdb, pIter);
      TAOS_RETURN(code);
    }

    sdbRelease(pSdb, pStream);
  }

  return 0;
}
423

424
// Show handler: fills pBlock with up to `rows` stream attribute rows, continuing from
// pShow->pIter. A stream whose row cannot be written (non-zero result from
// mstSetStreamAttrResBlock) is skipped without consuming a row slot.
// Returns the number of rows written and adds it to pShow->numOfRows.
static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode     *pMnode = pReq->info.node;
  SSdb       *pSdb = pMnode->pSdb;
  SStreamObj *pStream = NULL;
  int32_t     filled = 0;

  for (; filled < rows;) {
    pShow->pIter = sdbFetch(pSdb, SDB_STREAM, pShow->pIter, (void **)&pStream);
    if (NULL == pShow->pIter) {
      break;
    }

    if (0 == mstSetStreamAttrResBlock(pMnode, pStream, pBlock, filled)) {
      filled++;
    }
    sdbRelease(pSdb, pStream);
  }

  pShow->numOfRows += filled;
  return filled;
}
445

446
// Cancel an in-progress SDB_STREAM fetch started by the stream show handler.
static void mndCancelGetNextStream(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_STREAM);
}
×
450

451
// Show handler for stream tasks: walks SDB_STREAM from pShow->pIter and lets
// mstSetStreamTasksResBlock() append rows for each stream until rowsCapacity rows
// were produced or the table is exhausted. The helper's return code is not acted on.
// Returns the number of rows written and adds it to pShow->numOfRows.
static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
  SMnode     *pMnode = pReq->info.node;
  SSdb       *pSdb = pMnode->pSdb;
  SStreamObj *pStream = NULL;
  int32_t     produced = 0;
  int32_t     code = 0;

  for (; produced < rowsCapacity;) {
    pShow->pIter = sdbFetch(pSdb, SDB_STREAM, pShow->pIter, (void **)&pStream);
    if (NULL == pShow->pIter) {
      break;
    }

    code = mstSetStreamTasksResBlock(pStream, pBlock, &produced, rowsCapacity);
    sdbRelease(pSdb, pStream);
  }

  pShow->numOfRows += produced;
  return produced;
}
472

473
// Cancel an in-progress SDB_STREAM fetch started by the stream task show handler.
static void mndCancelGetNextStreamTask(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_STREAM);
}
×
477

478
// Show handler for stream recalculations: walks SDB_STREAM from pShow->pIter and lets
// mstSetStreamRecalculatesResBlock() append rows for each stream until rowsCapacity
// rows were produced or the table is exhausted. The helper's return code is not acted on.
// Returns the number of rows written and adds it to pShow->numOfRows.
static int32_t mndRetrieveStreamRecalculates(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
  SMnode     *pMnode = pReq->info.node;
  SSdb       *pSdb = pMnode->pSdb;
  SStreamObj *pStream = NULL;
  int32_t     produced = 0;
  int32_t     code = 0;

  for (; produced < rowsCapacity;) {
    pShow->pIter = sdbFetch(pSdb, SDB_STREAM, pShow->pIter, (void **)&pStream);
    if (NULL == pShow->pIter) {
      break;
    }

    code = mstSetStreamRecalculatesResBlock(pStream, pBlock, &produced, rowsCapacity);
    sdbRelease(pSdb, pStream);
  }

  pShow->numOfRows += produced;
  return produced;
}
499

500
// Cancel an in-progress SDB_STREAM fetch started by the recalculates show handler.
static void mndCancelGetNextStreamRecalculates(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_STREAM);
}
×
504

505

506
// sdbTraverse callback that marks tags referenced by a stream's partition columns.
//   p1: pointer to the super table uid to match against triggerTblSuid
//   p2: SSchema array of tags to update in place
//   p3: pointer to the number of entries in p2
// Streams that are dropped, whose trigger table is not a super/child/virtual-child
// table, that trigger on another suid, or that have no partition columns are skipped.
// For the rest, each partition column whose colId matches a tag sets COL_REF_BY_STM
// on that tag. Always returns true.
static bool mndStreamUpdateTagsFlag(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
  SStreamObj *pStream = pObj;
  if (atomic_load_8(&pStream->userDropped)) {
    return true;
  }

  if (!(TSDB_SUPER_TABLE == pStream->pCreate->triggerTblType ||
        TSDB_CHILD_TABLE == pStream->pCreate->triggerTblType ||
        TSDB_VIRTUAL_CHILD_TABLE == pStream->pCreate->triggerTblType)) {
    return true;
  }

  if (*(uint64_t*)p1 != pStream->pCreate->triggerTblSuid) {
    return true;
  }

  if (NULL == pStream->pCreate->partitionCols) {
    return true;
  }

  SNodeList* pList = NULL;
  int32_t code = nodesStringToList(pStream->pCreate->partitionCols, &pList);
  if (0 != code) {
    nodesDestroyList(pList);
    mstError("partitionCols [%s] nodesStringToList failed with error:%s", (char*)pStream->pCreate->partitionCols, tstrerror(code));
    return true;
  }

  SSchema* pTags = (SSchema*)p2;
  int32_t* tagNum = (int32_t*)p3;

  SNode* pNode = NULL;
  FOREACH(pNode, pList) {
    SColumnNode* pCol = (SColumnNode*)pNode;
    int32_t      t = 0;
    while (t < *tagNum) {
      if (pTags[t].colId == pCol->colId) {
        // only the first tag with a matching column id is flagged
        pTags[t].flags |= COL_REF_BY_STM;
        break;
      }
      ++t;
    }
  }

  nodesDestroyList(pList);

  return true;
}
552

553

554
// Flag the tags of super table `suid` that are referenced by stream partition columns.
// Does nothing when the SDB holds no streams; otherwise runs mndStreamUpdateTagsFlag
// over every stream with the suid, the tag array and the tag count.
void mndStreamUpdateTagsRefFlag(SMnode *pMnode, int64_t suid, SSchema* pTags, int32_t tagNum) {
  if (sdbGetSize(pMnode->pSdb, SDB_STREAM) > 0) {
    sdbTraverse(pMnode->pSdb, SDB_STREAM, mndStreamUpdateTagsFlag, &suid, pTags, &tagNum);
  }
}
562

563
// Handle a "stop stream" request.
//
// Steps: deserialize the request, acquire the stream, check write privilege on the
// stream's database, refuse when the stream is being dropped, create a transaction,
// mark the stream stopped (updateTime, userStopped, last stop timestamp), undeploy it,
// append the stream to the transaction with SDB_STATUS_READY and prepare the transaction.
//
// Returns TSDB_CODE_ACTION_IN_PROGRESS once the transaction was prepared, 0 when the
// stream is missing and igNotExists is set, otherwise an error code.
static int32_t mndProcessStopStreamReq(SRpcMsg *pReq) {
  SMnode          *pMnode = pReq->info.node;
  SStreamObj      *pStream = NULL;
  STrans          *pTrans = NULL;
  SMPauseStreamReq pauseReq = {0};
  bool             transCreated = false;  // true once the transaction must be dropped on exit
  int32_t          code = 0;

  if (tDeserializeSMPauseStreamReq(pReq->pCont, pReq->contLen, &pauseReq) < 0) {
    TAOS_RETURN(TSDB_CODE_INVALID_MSG);
  }

  code = mndAcquireStream(pMnode, pauseReq.name, &pStream);
  if (code != 0 || pStream == NULL) {
    if (!pauseReq.igNotExists) {
      mError("stream:%s not exist, failed to stop stream", pauseReq.name);
      taosMemoryFree(pauseReq.name);
      TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
    }

    mInfo("stream:%s, not exist, not stop stream", pauseReq.name);
    taosMemoryFree(pauseReq.name);
    return 0;
  }

  // the request name is not needed past this point
  taosMemoryFree(pauseReq.name);

  int64_t streamId = pStream->pCreate->streamId;

  mstsInfo("start to stop stream %s", pStream->name);

  code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->pCreate->streamDB);
  if (TSDB_CODE_SUCCESS != code) {
    mstsError("user %s failed to stop stream %s since %s", pReq->info.conn.user, pStream->name, tstrerror(code));
    goto _OVER;
  }

  if (atomic_load_8(&pStream->userDropped)) {
    code = TSDB_CODE_MND_STREAM_DROPPING;
    mstsError("user %s failed to stop stream %s since %s", pReq->info.conn.user, pStream->name, tstrerror(code));
    goto _OVER;
  }

  code = mndStreamCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_STOP_NAME, &pTrans);
  if (pTrans == NULL || code) {
    mstsError("failed to stop stream %s since %s", pStream->name, tstrerror(code));
    goto _OVER;
  }
  transCreated = true;

  pStream->updateTime = taosGetTimestampMs();
  atomic_store_8(&pStream->userStopped, 1);
  MND_STREAM_SET_LAST_TS(STM_EVENT_STOP_STREAM, pStream->updateTime);
  msmUndeployStream(pMnode, streamId, pStream->name);

  // stop stream
  code = mndStreamTransAppend(pStream, pTrans, SDB_STATUS_READY);
  if (TSDB_CODE_SUCCESS != code) {
    goto _OVER;
  }

  code = mndTransPrepare(pMnode, pTrans);
  if (TSDB_CODE_SUCCESS != code && TSDB_CODE_ACTION_IN_PROGRESS != code) {
    mError("trans:%d, failed to prepare stop stream trans since %s", pTrans->id, tstrerror(code));
    goto _OVER;
  }

  code = TSDB_CODE_ACTION_IN_PROGRESS;

_OVER:
  // release order matches every exit path: stream first, then the transaction
  sdbRelease(pMnode->pSdb, pStream);
  if (transCreated) {
    mndTransDrop(pTrans);
  }
  return code;
}
643

644

645
// Handle a "start stream" request for a stream that was stopped by the user.
//
// Steps: check the stream grant, deserialize the request, acquire the stream, check
// write privilege on the stream's database, refuse when the stream is being dropped or
// is not stopped, clear userStopped and stamp updateTime / last start timestamp, create
// a transaction, append the stream with SDB_STATUS_READY, prepare the transaction and
// post a STREAM_ACT_DEPLOY action for the stream.
//
// Returns TSDB_CODE_ACTION_IN_PROGRESS once the transaction was prepared, 0 when the
// stream is missing and igNotExists is set, otherwise an error code.
static int32_t mndProcessStartStreamReq(SRpcMsg *pReq) {
  SMnode     *pMnode = pReq->info.node;
  SStreamObj *pStream = NULL;
  STrans     *pTrans = NULL;
  bool        transCreated = false;  // true once the transaction must be dropped on exit
  int32_t     code = 0;

  if ((code = grantCheckExpire(TSDB_GRANT_STREAMS)) < 0) {
    return code;
  }

  SMResumeStreamReq resumeReq = {0};
  if (tDeserializeSMResumeStreamReq(pReq->pCont, pReq->contLen, &resumeReq) < 0) {
    TAOS_RETURN(TSDB_CODE_INVALID_MSG);
  }

  code = mndAcquireStream(pMnode, resumeReq.name, &pStream);
  if (code != 0 || pStream == NULL) {
    if (!resumeReq.igNotExists) {
      mError("stream:%s not exist, failed to start stream", resumeReq.name);
      taosMemoryFree(resumeReq.name);
      TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
    }

    mInfo("stream:%s not exist, not start stream", resumeReq.name);
    taosMemoryFree(resumeReq.name);
    sdbRelease(pMnode->pSdb, pStream);
    return 0;
  }

  // the request name is not needed past this point
  taosMemoryFree(resumeReq.name);

  int64_t streamId = pStream->pCreate->streamId;

  mstsInfo("start to start stream %s from stopped", pStream->name);

  code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->pCreate->streamDB);
  if (TSDB_CODE_SUCCESS != code) {
    mstsError("user %s failed to start stream %s since %s", pReq->info.conn.user, pStream->name, tstrerror(code));
    goto _OVER;
  }

  if (atomic_load_8(&pStream->userDropped)) {
    code = TSDB_CODE_MND_STREAM_DROPPING;
    mstsError("user %s failed to start stream %s since %s", pReq->info.conn.user, pStream->name, tstrerror(code));
    goto _OVER;
  }

  if (0 == atomic_load_8(&pStream->userStopped)) {
    code = TSDB_CODE_MND_STREAM_NOT_STOPPED;
    mstsError("user %s failed to start stream %s since %s", pReq->info.conn.user, pStream->name, tstrerror(code));
    goto _OVER;
  }

  atomic_store_8(&pStream->userStopped, 0);
  pStream->updateTime = taosGetTimestampMs();
  MND_STREAM_SET_LAST_TS(STM_EVENT_START_STREAM, pStream->updateTime);

  code = mndStreamCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_START_NAME, &pTrans);
  if (pTrans == NULL || code) {
    mstsError("failed to start stream %s since %s", pStream->name, tstrerror(code));
    goto _OVER;
  }
  transCreated = true;

  code = mndStreamTransAppend(pStream, pTrans, SDB_STATUS_READY);
  if (TSDB_CODE_SUCCESS != code) {
    mstsError("failed to start stream %s since %s", pStream->name, tstrerror(code));
    goto _OVER;
  }

  code = mndTransPrepare(pMnode, pTrans);
  if (TSDB_CODE_SUCCESS != code && TSDB_CODE_ACTION_IN_PROGRESS != code) {
    mstsError("trans:%d, failed to prepare start stream %s trans since %s", pTrans->id, pStream->name, tstrerror(code));
    goto _OVER;
  }

  mstPostStreamAction(mStreamMgmt.actionQ, streamId, pStream->name, NULL, true, STREAM_ACT_DEPLOY);

  code = TSDB_CODE_ACTION_IN_PROGRESS;

_OVER:
  // release order matches every exit path: stream first, then the transaction
  sdbRelease(pMnode->pSdb, pStream);
  if (transCreated) {
    mndTransDrop(pTrans);
  }
  return code;
}
737

738

739
// Handle a "drop stream" request.
//
// Steps: deserialize the request, acquire the stream, check write privilege on the
// stream's database, refuse to drop a stream whose tsma still exists, mark the stream
// dropped (updateTime, userDropped, last drop timestamp), undeploy it, create a
// transaction, append the stream with SDB_STATUS_DROPPED, prepare the transaction and
// write an audit record.
//
// Returns TSDB_CODE_ACTION_IN_PROGRESS once the transaction was prepared, 0 when the
// stream is missing and igNotExists is set, otherwise an error code. dropReq is
// released with tFreeMDropStreamReq() on every path after deserialization succeeded.
static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
  SMnode     *pMnode = pReq->info.node;
  SStreamObj *pStream = NULL;
  int32_t     code = 0;

  SMDropStreamReq dropReq = {0};
  if (tDeserializeSMDropStreamReq(pReq->pCont, pReq->contLen, &dropReq) < 0) {
    mError("invalid drop stream msg recv, discarded");
    code = TSDB_CODE_INVALID_MSG;
    TAOS_RETURN(code);
  }

  mDebug("recv drop stream:%s msg", dropReq.name);

  code = mndAcquireStream(pMnode, dropReq.name, &pStream);
  if (pStream == NULL || code != 0) {
    if (dropReq.igNotExists) {
      mInfo("stream:%s not exist, ignore not exist is set, drop stream exec done with success", dropReq.name);
      sdbRelease(pMnode->pSdb, pStream);
      tFreeMDropStreamReq(&dropReq);
      return 0;
    } else {
      mError("stream:%s not exist failed to drop it", dropReq.name);
      tFreeMDropStreamReq(&dropReq);
      TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
    }
  }

  int64_t streamId = pStream->pCreate->streamId;

  code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->pCreate->streamDB);
  if (code != 0) {
    mstsError("user %s failed to drop stream %s since %s", pReq->info.conn.user, dropReq.name, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    tFreeMDropStreamReq(&dropReq);
    return code;
  }

  if (pStream->pCreate->tsmaId != 0) {
    mstsDebug("try to drop tsma related stream, tsmaId:%" PRIx64, pStream->pCreate->tsmaId);

    void    *pIter = NULL;
    SSmaObj *pSma = NULL;
    pIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter, (void **)&pSma);
    while (pIter) {
      if (pSma && pSma->uid == pStream->pCreate->tsmaId) {
        // Log before releasing the request: the message reads dropReq.name, which
        // tFreeMDropStreamReq() presumably frees.
        mstsError("refused to drop tsma-related stream %s since tsma still exists", dropReq.name);

        sdbRelease(pMnode->pSdb, pSma);
        sdbRelease(pMnode->pSdb, pStream);

        sdbCancelFetch(pMnode->pSdb, pIter);
        tFreeMDropStreamReq(&dropReq);
        code = TSDB_CODE_TSMA_MUST_BE_DROPPED;

        TAOS_RETURN(code);
      }

      if (pSma) {
        sdbRelease(pMnode->pSdb, pSma);
      }

      pIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter, (void **)&pSma);
    }
  }

  mstsInfo("start to drop stream %s", pStream->pCreate->name);

  pStream->updateTime = taosGetTimestampMs();

  atomic_store_8(&pStream->userDropped, 1);

  MND_STREAM_SET_LAST_TS(STM_EVENT_DROP_STREAM, pStream->updateTime);

  msmUndeployStream(pMnode, streamId, pStream->pCreate->name);

  STrans *pTrans = NULL;
  code = mndStreamCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_DROP_NAME, &pTrans);
  if (pTrans == NULL || code) {
    mstsError("failed to drop stream %s since %s", dropReq.name, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    tFreeMDropStreamReq(&dropReq);
    TAOS_RETURN(code);
  }

  // drop stream
  code = mndStreamTransAppend(pStream, pTrans, SDB_STATUS_DROPPED);
  if (code) {
    mstsError("trans:%d, failed to append drop stream trans since %s", pTrans->id, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    tFreeMDropStreamReq(&dropReq);
    TAOS_RETURN(code);
  }

  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mstsError("trans:%d, failed to prepare drop stream trans since %s", pTrans->id, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    tFreeMDropStreamReq(&dropReq);
    TAOS_RETURN(code);
  }

  auditRecord(pReq, pMnode->clusterId, "dropStream", "", pStream->pCreate->streamDB, NULL, 0);

  sdbRelease(pMnode->pSdb, pStream);
  mndTransDrop(pTrans);

  mstsDebug("drop stream %s half completed", dropReq.name);
  code = TSDB_CODE_ACTION_IN_PROGRESS;

  // last use of dropReq.name is the debug line above
  tFreeMDropStreamReq(&dropReq);

  TAOS_RETURN(code);
}
854

855
/*
 * Message handler registered for TDMT_MND_CREATE_STREAM.
 *
 * Flow:
 *   1. grantCheck(TSDB_GRANT_STREAMS); always rejected with
 *      TSDB_CODE_MND_INVALID_PLATFORM when built with WINDOWS defined.
 *   2. Deserialize the request into a heap-allocated SCMCreateStreamReq.
 *   3. Assign an snode through msmAssignRandomSnodeId().
 *   4. If a stream with the same name can already be acquired, stop: the
 *      result is success when igExists is set, otherwise
 *      TSDB_CODE_MND_STREAM_ALREADY_EXIST.
 *   5. Validate, build the stream object, create the transaction, optionally
 *      add the output super table, append the stream and prepare the trans.
 *
 * Returns TSDB_CODE_ACTION_IN_PROGRESS once the transaction was prepared,
 * TSDB_CODE_SUCCESS for the "already exists + igExists" case, or the failing
 * step's error code.
 */
static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
  SMnode             *pMnode = pReq->info.node;
  SStreamObj         *pStream = NULL;
  SStreamObj          streamObj = {0};
  int32_t             code = TSDB_CODE_SUCCESS;
  int32_t             lino = 0;
  STrans             *pTrans = NULL;
  uint64_t            streamId = 0;
  SCMCreateStreamReq *pCreate = NULL;
  const char         *streamName = NULL;  // only used for the final log line

  if ((code = grantCheck(TSDB_GRANT_STREAMS)) < 0) {
    goto _OVER;
  }

#ifdef WINDOWS
  code = TSDB_CODE_MND_INVALID_PLATFORM;
  goto _OVER;
#endif

  pCreate = taosMemoryCalloc(1, sizeof(SCMCreateStreamReq));
  TSDB_CHECK_NULL(pCreate, code, lino, _OVER, terrno);

  code = tDeserializeSCMCreateStreamReq(pReq->pCont, pReq->contLen, pCreate);
  TSDB_CHECK_CODE(code, lino, _OVER);

  streamId = pCreate->streamId;

  mstsInfo("start to create stream %s, sql:%s", pCreate->name, pCreate->sql);

  int32_t snodeId = msmAssignRandomSnodeId(pMnode, streamId);
  if (!GOT_SNODE(snodeId)) {
    // NOTE(review): this relies on terrno being non-zero whenever no snode was
    // assigned; with terrno == 0 execution would continue -- confirm upstream.
    code = terrno;
    TSDB_CHECK_CODE(code, lino, _OVER);
  }

  code = mndAcquireStream(pMnode, pCreate->name, &pStream);
  if (pStream != NULL && code == 0) {
    if (pCreate->igExists) {
      mstsInfo("stream %s already exist, ignore exist is set", pCreate->name);
    } else {
      code = TSDB_CODE_MND_STREAM_ALREADY_EXIST;
    }

    mndReleaseStream(pMnode, pStream);
    // The reference has been handed back; never touch the object again. The
    // cleanup section takes the stream name from pCreate instead.
    pStream = NULL;
    goto _OVER;
  } else if (code != TSDB_CODE_MND_STREAM_NOT_EXIST) {
    goto _OVER;
  }

  code = mndStreamValidateCreate(pMnode, pReq->info.conn.user, pCreate);
  TSDB_CHECK_CODE(code, lino, _OVER);

  mndStreamBuildObj(pMnode, &streamObj, pCreate, snodeId);
  // Cleared so the cleanup below does not free it; from here on the request is
  // reached through pStream->pCreate (presumably now owned by streamObj and
  // released by tFreeStreamObj() -- confirm against mndStreamBuildObj()).
  pCreate = NULL;

  pStream = &streamObj;

  code = mndStreamCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_DB, MND_STREAM_CREATE_NAME, &pTrans);
  if (pTrans == NULL || code) {
    goto _OVER;
  }

  // create stb for stream
  if (TSDB_SUPER_TABLE == pStream->pCreate->outTblType && !pStream->pCreate->outStbExists) {
    pStream->pCreate->outStbUid = mndGenerateUid(pStream->pCreate->outTblName, strlen(pStream->pCreate->outTblName));
    code = mndStreamCreateOutStb(pMnode, pTrans, pStream->pCreate, pReq->info.conn.user);
    TSDB_CHECK_CODE(code, lino, _OVER);
  }

  // add stream to trans
  code = mndStreamTransAppend(pStream, pTrans, SDB_STATUS_READY);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mstsError("failed to persist stream %s since %s", pStream->pCreate->name, tstrerror(code));
    goto _OVER;
  }

  // execute creation
  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mstsError("trans:%d, failed to prepare since %s", pTrans->id, tstrerror(code));
    goto _OVER;
  }
  code = TSDB_CODE_ACTION_IN_PROGRESS;

  auditRecord(pReq, pMnode->clusterId, "createStream", pStream->pCreate->streamDB, pStream->pCreate->name, pStream->pCreate->sql, strlen(pStream->pCreate->sql));

  MND_STREAM_SET_LAST_TS(STM_EVENT_CREATE_STREAM, taosGetTimestampMs());

  mstPostStreamAction(mStreamMgmt.actionQ, streamId, pStream->pCreate->name, NULL, true, STREAM_ACT_DEPLOY);

_OVER:

  // Resolve a printable name from whichever request object is still valid:
  // the locally built stream object after mndStreamBuildObj(), otherwise the
  // not-yet-transferred request. Both may be unavailable on early failures.
  if (pStream != NULL && pStream->pCreate != NULL) {
    streamName = pStream->pCreate->name;
  } else if (pCreate != NULL) {
    streamName = pCreate->name;
  }

  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    if (streamName != NULL) {
      mstsError("failed to create stream %s at line:%d since %s", streamName, lino, tstrerror(code));
    } else {
      mstsError("failed to create stream at line:%d since %s", lino, tstrerror(code));
    }
  } else {
    mstsDebug("create stream %s half completed", streamName != NULL ? streamName : "unknown");
  }

  tFreeSCMCreateStreamReq(pCreate);
  taosMemoryFreeClear(pCreate);

  mndTransDrop(pTrans);
  tFreeStreamObj(&streamObj);

  return code;
}
965

966
/*
 * Message handler registered for TDMT_MND_RECALC_STREAM.
 *
 * Rejects the request when the stream grant has expired, the message cannot
 * be deserialized, the stream cannot be acquired, the caller lacks
 * MND_OPER_WRITE_DB on the stream's database, the stream is flagged as
 * userDropped / userStopped, or its trigger type is WINDOW_TYPE_PERIOD.
 * Otherwise the requested time range is passed to msmRecalcStream() and an
 * audit record is written.
 *
 * No transaction is created here: the stream object is only read and then
 * released.
 *
 * Returns TSDB_CODE_SUCCESS on success, otherwise the error code of the
 * failing check.
 */
static int32_t mndProcessRecalcStreamReq(SRpcMsg *pReq) {
  SMnode           *pMnode = pReq->info.node;
  SStreamObj       *pStream = NULL;
  SMRecalcStreamReq recalcReq = {0};

  int32_t code = grantCheckExpire(TSDB_GRANT_STREAMS);
  if (code < 0) {
    return code;
  }

  if (tDeserializeSMRecalcStreamReq(pReq->pCont, pReq->contLen, &recalcReq) < 0) {
    tFreeMRecalcStreamReq(&recalcReq);
    TAOS_RETURN(TSDB_CODE_INVALID_MSG);
  }

  code = mndAcquireStream(pMnode, recalcReq.name, &pStream);
  if (code != 0 || pStream == NULL) {
    mError("stream:%s not exist, failed to recalc stream", recalcReq.name);
    tFreeMRecalcStreamReq(&recalcReq);
    TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
  }

  // Every exit below this point goes through _OVER, which releases pStream
  // and frees recalcReq.
  int64_t streamId = pStream->pCreate->streamId;  // presumably consumed by the msts* log macros

  mstsInfo("start to recalc stream %s", recalcReq.name);

  // Privilege first, then the user-driven lifecycle flags; the three failures
  // share one log format.
  code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->pCreate->streamDB);
  if (code == TSDB_CODE_SUCCESS) {
    if (atomic_load_8(&pStream->userDropped)) {
      code = TSDB_CODE_MND_STREAM_DROPPING;
    } else if (atomic_load_8(&pStream->userStopped)) {
      code = TSDB_CODE_MND_STREAM_STOPPED;
    }
  }
  if (code != TSDB_CODE_SUCCESS) {
    mstsError("user %s failed to recalc stream %s since %s", pReq->info.conn.user, recalcReq.name, tstrerror(code));
    goto _OVER;
  }

  if (WINDOW_TYPE_PERIOD == pStream->pCreate->triggerType) {
    code = TSDB_CODE_OPS_NOT_SUPPORT;
    mstsError("failed to recalc stream %s since %s", recalcReq.name, tstrerror(code));
    goto _OVER;
  }

  code = msmRecalcStream(pMnode, pStream->pCreate->streamId, &recalcReq.timeRange);
  if (code != TSDB_CODE_SUCCESS) {
    goto _OVER;
  }

  {
    // Audit detail: the requested time range rendered as text.
    // NOTE(review): the create handler passes pCreate->streamDB in the slot
    // where pStream->name is passed here -- confirm which one is intended.
    char detail[128];
    snprintf(detail, sizeof(detail), "start:%" PRId64 ", end:%" PRId64, recalcReq.timeRange.skey, recalcReq.timeRange.ekey);
    auditRecord(pReq, pMnode->clusterId, "recalcStream", pStream->name, recalcReq.name, detail, strlen(detail));
  }

_OVER:
  sdbRelease(pMnode->pSdb, pStream);
  tFreeMRecalcStreamReq(&recalcReq);

  return code;
}
1069

1070

1071
/*
 * Wire the stream module into the mnode.
 *
 * - Registers the stream request handlers (create/drop/start/stop/heartbeat/
 *   recalc) unless tsDisableStream is set.
 * - Registers the retrieve and free-iterator callbacks for the streams,
 *   stream-tasks and stream-recalculates show tables (always).
 * - Registers the SDB_STREAM table descriptor with the sdb.
 *
 * Returns the result of sdbSetTable().
 */
int32_t mndInitStream(SMnode *pMnode) {
  SSdbTable streamTable = {
      .sdbType = SDB_STREAM,
      .keyType = SDB_KEY_BINARY,
      .encodeFp = (SdbEncodeFp)mndStreamActionEncode,
      .decodeFp = (SdbDecodeFp)mndStreamActionDecode,
      .insertFp = (SdbInsertFp)mndStreamActionInsert,
      .updateFp = (SdbUpdateFp)mndStreamActionUpdate,
      .deleteFp = (SdbDeleteFp)mndStreamActionDelete,
  };

  // Request handlers are only installed when streams are enabled.
  if (!tsDisableStream) {
    mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STREAM, mndProcessCreateStreamReq);
    mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM, mndProcessDropStreamReq);
    mndSetMsgHandle(pMnode, TDMT_MND_START_STREAM, mndProcessStartStreamReq);
    mndSetMsgHandle(pMnode, TDMT_MND_STOP_STREAM, mndProcessStopStreamReq);
    mndSetMsgHandle(pMnode, TDMT_MND_STREAM_HEARTBEAT, mndProcessStreamHb);
    mndSetMsgHandle(pMnode, TDMT_MND_RECALC_STREAM, mndProcessRecalcStreamReq);
  }

  // Show-table callbacks: streams
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndRetrieveStream);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndCancelGetNextStream);

  // Show-table callbacks: stream tasks
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndRetrieveStreamTask);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndCancelGetNextStreamTask);

  // Show-table callbacks: stream recalculates
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAM_RECALCULATES, mndRetrieveStreamRecalculates);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAM_RECALCULATES, mndCancelGetNextStreamRecalculates);

  return sdbSetTable(pMnode->pSdb, streamTable);
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc