• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4661

08 Aug 2025 08:36AM UTC coverage: 59.883% (-0.2%) from 60.053%
#4661

push

travis-ci

web-flow
test: update cases desc (#32498)

137331 of 291923 branches covered (47.04%)

Branch coverage included in aggregate %.

207730 of 284307 relevant lines covered (73.07%)

4552406.61 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

56.24
/source/dnode/mnode/impl/src/mndStream.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "mndStream.h"
17
#include "audit.h"
18
#include "mndDb.h"
19
#include "mndPrivilege.h"
20
#include "mndShow.h"
21
#include "mndStb.h"
22
#include "mndTrans.h"
23
#include "osMemory.h"
24
#include "parser.h"
25
#include "taoserror.h"
26
#include "tmisce.h"
27
#include "tname.h"
28

29
#define MND_STREAM_MAX_NUM 100000
30

31
typedef struct {
32
  int8_t placeHolder;  // // to fix windows compile error, define place holder
33
} SMStreamNodeCheckMsg;
34

35
static int32_t  mndNodeCheckSentinel = 0;
36
SStmRuntime  mStreamMgmt = {0};
37

38
static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream);
39
static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream);
40
static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pOldStream, SStreamObj *pNewStream);
41
static int32_t mndProcessDropStreamReq(SRpcMsg *pReq);
42

43
static int32_t mndProcessCreateStreamReqFromMNode(SRpcMsg *pReq);
44
static int32_t mndProcessDropStreamReqFromMNode(SRpcMsg *pReq);
45

46
static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
47
static void    mndCancelGetNextStream(SMnode *pMnode, void *pIter);
48
static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
49
static void    mndCancelGetNextStreamTask(SMnode *pMnode, void *pIter);
50
static int32_t mndProcessStopStreamReq(SRpcMsg *pReq);
51
static int32_t mndProcessStartStreamReq(SRpcMsg *pReq);
52

53
static SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw);
54

55
SSdbRaw       *mndStreamSeqActionEncode(SStreamObj *pStream);
56
SSdbRow       *mndStreamSeqActionDecode(SSdbRaw *pRaw);
57
static int32_t mndStreamSeqActionInsert(SSdb *pSdb, SStreamSeq *pStream);
58
static int32_t mndStreamSeqActionDelete(SSdb *pSdb, SStreamSeq *pStream);
59
static int32_t mndStreamSeqActionUpdate(SSdb *pSdb, SStreamSeq *pOldStream, SStreamSeq *pNewStream);
60
static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq);
61

62
void mndCleanupStream(SMnode *pMnode) {
2,512✔
63
  msmDestroyRuntimeInfo(pMnode);
2,512✔
64
  
65
  mDebug("mnd stream runtime info cleanup");
2,512✔
66
}
2,512✔
67

68
SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw) {
290✔
69
  int32_t     code = 0;
290✔
70
  int32_t     lino = 0;
290✔
71
  SSdbRow    *pRow = NULL;
290✔
72
  SStreamObj *pStream = NULL;
290✔
73
  void       *buf = NULL;
290✔
74
  int8_t      sver = 0;
290✔
75
  int32_t     tlen;
76
  int32_t     dataPos = 0;
290✔
77

78
  code = sdbGetRawSoftVer(pRaw, &sver);
290✔
79
  TSDB_CHECK_CODE(code, lino, _over);
290!
80

81
  if (sver != MND_STREAM_VER_NUMBER) {
290!
82
    mError("stream read invalid ver, data ver: %d, curr ver: %d", sver, MND_STREAM_VER_NUMBER);
×
83
    goto _over;
×
84
  }
85

86
  pRow = sdbAllocRow(sizeof(SStreamObj));
290✔
87
  TSDB_CHECK_NULL(pRow, code, lino, _over, terrno);
290!
88

89
  pStream = sdbGetRowObj(pRow);
290✔
90
  TSDB_CHECK_NULL(pStream, code, lino, _over, terrno);
290!
91

92
  SDB_GET_INT32(pRaw, dataPos, &tlen, _over);
290!
93

94
  buf = taosMemoryMalloc(tlen + 1);
290!
95
  TSDB_CHECK_NULL(buf, code, lino, _over, terrno);
290!
96

97
  SDB_GET_BINARY(pRaw, dataPos, buf, tlen, _over);
290!
98

99
  SDecoder decoder;
100
  tDecoderInit(&decoder, buf, tlen + 1);
290✔
101
  code = tDecodeSStreamObj(&decoder, pStream, sver);
290✔
102
  tDecoderClear(&decoder);
290✔
103

104
  if (code < 0) {
290!
105
    tFreeStreamObj(pStream);
×
106
  }
107

108
_over:
290✔
109
  taosMemoryFreeClear(buf);
290!
110

111
  if (code != TSDB_CODE_SUCCESS) {
290!
112
    char *p = (pStream == NULL || NULL == pStream->pCreate) ? "null" : pStream->pCreate->name;
×
113
    mError("stream:%s, failed to decode from raw:%p since %s at:%d", p, pRaw, tstrerror(code), lino);
×
114
    taosMemoryFreeClear(pRow);
×
115

116
    terrno = code;
×
117
    return NULL;
×
118
  } else {
119
    mTrace("stream:%s, decode from raw:%p, row:%p", pStream->pCreate->name, pRaw, pStream);
290!
120

121
    terrno = 0;
290✔
122
    return pRow;
290✔
123
  }
124
}
125

126
static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream) {
243✔
127
  mTrace("stream:%s, perform insert action", pStream->pCreate->name);
243!
128
  return 0;
243✔
129
}
130

131
static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream) {
290✔
132
  mInfo("stream:%s, perform delete action", pStream->pCreate->name);
290!
133
  tFreeStreamObj(pStream);
290✔
134
  return 0;
290✔
135
}
136

137
static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pOldStream, SStreamObj *pNewStream) {
28✔
138
  mTrace("stream:%s, perform update action", pOldStream->pCreate->name);
28!
139

140
  atomic_store_32(&pOldStream->mainSnodeId, pNewStream->mainSnodeId);
28✔
141
  atomic_store_8(&pOldStream->userStopped, atomic_load_8(&pNewStream->userStopped));
28✔
142
  pOldStream->updateTime = pNewStream->updateTime;
28✔
143
  
144
  return 0;
28✔
145
}
146

147
int32_t mndAcquireStream(SMnode *pMnode, char *streamName, SStreamObj **pStream) {
672✔
148
  int32_t code = 0;
672✔
149
  SSdb   *pSdb = pMnode->pSdb;
672✔
150
  (*pStream) = sdbAcquire(pSdb, SDB_STREAM, streamName);
672✔
151
  if ((*pStream) == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
672!
152
    code = TSDB_CODE_MND_STREAM_NOT_EXIST;
265✔
153
  }
154
  return code;
672✔
155
}
156

157
static bool mndStreamGetNameFromId(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
×
158
  SStreamObj* pStream = pObj;
×
159

160
  if (pStream->pCreate->streamId == *(int64_t*)p1) {
×
161
    strncpy((char*)p2, pStream->name, TSDB_STREAM_NAME_LEN);
×
162
    return false;
×
163
  }
164

165
  return true;
×
166
}
167

168
int32_t mndAcquireStreamById(SMnode *pMnode, int64_t streamId, SStreamObj **pStream) {
×
169
  int32_t code = 0;
×
170
  SSdb   *pSdb = pMnode->pSdb;
×
171
  char streamName[TSDB_STREAM_NAME_LEN];
172
  streamName[0] = 0;
×
173
  
174
  sdbTraverse(pSdb, SDB_STREAM, mndStreamGetNameFromId, &streamId, streamName, NULL);
×
175
  if (streamName[0]) {
×
176
    (*pStream) = sdbAcquire(pSdb, SDB_STREAM, streamName);
×
177
    if ((*pStream) == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
×
178
      code = TSDB_CODE_MND_STREAM_NOT_EXIST;
×
179
    }
180
  }
181
  
182
  return code;
×
183
}
184

185
void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream) {
315✔
186
  SSdb *pSdb = pMnode->pSdb;
315✔
187
  sdbRelease(pSdb, pStream);
315✔
188
}
315✔
189

190
SSdbRaw *mndStreamSeqActionEncode(SStreamObj *pStream) { return NULL; }
×
191
SSdbRow *mndStreamSeqActionDecode(SSdbRaw *pRaw) { return NULL; }
×
192
int32_t  mndStreamSeqActionInsert(SSdb *pSdb, SStreamSeq *pStream) { return 0; }
×
193
int32_t  mndStreamSeqActionDelete(SSdb *pSdb, SStreamSeq *pStream) { return 0; }
×
194
int32_t  mndStreamSeqActionUpdate(SSdb *pSdb, SStreamSeq *pOldStream, SStreamSeq *pNewStream) { return 0; }
×
195

196
static void mndStreamBuildObj(SMnode *pMnode, SStreamObj *pObj, SCMCreateStreamReq *pCreate, int32_t snodeId) {
243✔
197
  int32_t     code = 0;
243✔
198

199
  pObj->pCreate = pCreate;
243✔
200
  strncpy(pObj->name, pCreate->name, TSDB_STREAM_FNAME_LEN);
243✔
201
  pObj->mainSnodeId = snodeId;
243✔
202
  
203
  pObj->userDropped = 0;
243✔
204
  pObj->userStopped = 0;
243✔
205
  
206
  pObj->createTime = taosGetTimestampMs();
243✔
207
  pObj->updateTime = pObj->createTime;
243✔
208

209
  mstLogSStreamObj("create stream", pObj);
243✔
210
}
243✔
211

212
static int32_t mndStreamCreateOutStb(SMnode *pMnode, STrans *pTrans, const SCMCreateStreamReq *pStream, const char *user) {
134✔
213
  SStbObj *pStb = NULL;
134✔
214
  SDbObj  *pDb = NULL;
134✔
215
  int32_t  code = 0;
134✔
216
  int32_t  lino = 0;
134✔
217

218
  SMCreateStbReq createReq = {0};
134✔
219
  TAOS_STRNCAT(createReq.name, pStream->outDB, TSDB_DB_FNAME_LEN);
134✔
220
  TAOS_STRNCAT(createReq.name, ".", 2);
134✔
221
  TAOS_STRNCAT(createReq.name,  pStream->outTblName, TSDB_TABLE_NAME_LEN);
134✔
222
  createReq.numOfColumns = taosArrayGetSize(pStream->outCols);
134✔
223
  createReq.numOfTags = pStream->outTags ? taosArrayGetSize(pStream->outTags) : 1;
134!
224
  createReq.pColumns = taosArrayInit_s(sizeof(SFieldWithOptions), createReq.numOfColumns);
134✔
225
  TSDB_CHECK_NULL(createReq.pColumns, code, lino, _OVER, terrno);
134!
226

227
  // build fields
228
  for (int32_t i = 0; i < createReq.numOfColumns; i++) {
662✔
229
    SFieldWithOptions *pField = taosArrayGet(createReq.pColumns, i);
528✔
230
    TSDB_CHECK_NULL(pField, code, lino, _OVER, terrno);
528!
231
    SFieldWithOptions *pSrc = taosArrayGet(pStream->outCols, i);
528✔
232

233
    tstrncpy(pField->name, pSrc->name, TSDB_COL_NAME_LEN);
528✔
234
    pField->flags = 0;
528✔
235
    pField->type = pSrc->type;
528✔
236
    pField->bytes = pSrc->bytes;
528✔
237
    pField->compress = createDefaultColCmprByType(pField->type);
528✔
238
    if (IS_DECIMAL_TYPE(pField->type)) {
528!
239
      pField->typeMod = pSrc->typeMod;
×
240
      pField->flags |= COL_HAS_TYPE_MOD;
×
241
    }
242
  }
243

244
  if (NULL == pStream->outTags) {
134!
245
    createReq.numOfTags = 1;
×
246
    createReq.pTags = taosArrayInit_s(sizeof(SField), 1);
×
247
    TSDB_CHECK_NULL(createReq.pTags, code, lino, _OVER, terrno);
×
248

249
    // build tags
250
    SField *pField = taosArrayGet(createReq.pTags, 0);
×
251
    TSDB_CHECK_NULL(pField, code, lino, _OVER, terrno);
×
252

253
    tstrncpy(pField->name, "group_id", sizeof(pField->name));
×
254
    pField->type = TSDB_DATA_TYPE_UBIGINT;
×
255
    pField->flags = 0;
×
256
    pField->bytes = 8;
×
257
  } else {
258
    createReq.numOfTags = taosArrayGetSize(pStream->outTags);
134✔
259
    createReq.pTags = taosArrayInit_s(sizeof(SField), createReq.numOfTags);
134✔
260
    TSDB_CHECK_NULL(createReq.pTags, code, lino, _OVER, terrno);
134!
261

262
    for (int32_t i = 0; i < createReq.numOfTags; i++) {
373✔
263
      SField *pField = taosArrayGet(createReq.pTags, i);
239✔
264
      if (pField == NULL) {
239!
265
        continue;
×
266
      }
267

268
      TAOS_FIELD_E *pSrc = taosArrayGet(pStream->outTags, i);
239✔
269
      pField->bytes = pSrc->bytes;
239✔
270
      pField->flags = 0;
239✔
271
      pField->type = pSrc->type;
239✔
272
      tstrncpy(pField->name, pSrc->name, TSDB_COL_NAME_LEN);
239✔
273
    }
274
  }
275

276
  if ((code = mndCheckCreateStbReq(&createReq)) != 0) {
134!
277
    goto _OVER;
×
278
  }
279

280
  pStb = mndAcquireStb(pMnode, createReq.name);
134✔
281
  if (pStb != NULL) {
134!
282
    code = TSDB_CODE_MND_STB_ALREADY_EXIST;
×
283
    goto _OVER;
×
284
  }
285

286
  pDb = mndAcquireDbByStb(pMnode, createReq.name);
134✔
287
  if (pDb == NULL) {
134!
288
    code = TSDB_CODE_MND_DB_NOT_SELECTED;
×
289
    goto _OVER;
×
290
  }
291

292
  int32_t numOfStbs = -1;
134✔
293
  if (mndGetNumOfStbs(pMnode, pDb->name, &numOfStbs) != 0) {
134!
294
    goto _OVER;
×
295
  }
296

297
  if (pDb->cfg.numOfStables == 1 && numOfStbs != 0) {
134!
298
    code = TSDB_CODE_MND_SINGLE_STB_MODE_DB;
×
299
    goto _OVER;
×
300
  }
301

302
  SStbObj stbObj = {0};
134✔
303

304
  if (mndBuildStbFromReq(pMnode, &stbObj, &createReq, pDb) != 0) {
134!
305
    goto _OVER;
×
306
  }
307

308
  stbObj.uid = pStream->outStbUid;
134✔
309

310
  if (mndAddStbToTrans(pMnode, pTrans, pDb, &stbObj) < 0) {
134!
311
    mndFreeStb(&stbObj);
×
312
    goto _OVER;
×
313
  }
314

315
  mDebug("stream:%s create dst stable:%s, cols:%d", pStream->name, pStream->outTblName, createReq.numOfColumns);
134✔
316

317
  tFreeSMCreateStbReq(&createReq);
134✔
318
  mndFreeStb(&stbObj);
134✔
319
  mndReleaseStb(pMnode, pStb);
134✔
320
  mndReleaseDb(pMnode, pDb);
134✔
321
  return code;
134✔
322

323
_OVER:
×
324
  tFreeSMCreateStbReq(&createReq);
×
325
  mndReleaseStb(pMnode, pStb);
×
326
  mndReleaseDb(pMnode, pDb);
×
327

328
  mDebug("stream:%s failed to create dst stable:%s, line:%d code:%s", pStream->name, pStream->outTblName, lino,
×
329
         tstrerror(code));
330
  return code;
×
331
}
332

333
static int32_t mndStreamValidateCreate(SMnode *pMnode, char* pUser, SCMCreateStreamReq* pCreate) {
264✔
334
  int32_t code = 0, lino = 0;
264✔
335
  int64_t streamId = pCreate->streamId;
264✔
336

337
  if (pCreate->streamDB) {
264!
338
    code = mndCheckDbPrivilegeByName(pMnode, pUser, MND_OPER_WRITE_DB, pCreate->streamDB);
264✔
339
    if (code) {
264✔
340
      mstsError("user %s failed to create stream %s in db %s since %s", pUser, pCreate->name, pCreate->streamDB, tstrerror(code));
20!
341
    }
342
    TSDB_CHECK_CODE(code, lino, _OVER);
264✔
343
  }
344

345
  if (pCreate->triggerDB) {
244✔
346
    code = mndCheckDbPrivilegeByName(pMnode, pUser, MND_OPER_READ_DB, pCreate->triggerDB);
243✔
347
    if (code) {
243✔
348
      mstsError("user %s failed to create stream %s using trigger db %s since %s", pUser, pCreate->name, pCreate->triggerDB, tstrerror(code));
1!
349
    }
350
    TSDB_CHECK_CODE(code, lino, _OVER);
243✔
351
  }
352

353
  if (pCreate->calcDB) {
243!
354
    int32_t dbNum = taosArrayGetSize(pCreate->calcDB);
243✔
355
    for (int32_t i = 0; i < dbNum; ++i) {
485✔
356
      char* calcDB = taosArrayGetP(pCreate->calcDB, i);
242✔
357
      code = mndCheckDbPrivilegeByName(pMnode, pUser, MND_OPER_READ_DB, calcDB);
242✔
358
      if (code) {
242!
359
        mstsError("user %s failed to create stream %s using calcDB %s since %s", pUser, pCreate->name, calcDB, tstrerror(code));
×
360
      }
361
      TSDB_CHECK_CODE(code, lino, _OVER);
242!
362
    }
363
  }
364

365
  if (pCreate->outDB) {
243✔
366
    code = mndCheckDbPrivilegeByName(pMnode, pUser, MND_OPER_WRITE_DB, pCreate->outDB);
242✔
367
    if (code) {
242!
368
      mstsError("user %s failed to create stream %s using out db %s since %s", pUser, pCreate->name, pCreate->outDB, tstrerror(code));
×
369
    }
370
    TSDB_CHECK_CODE(code, lino, _OVER);
242!
371
  }
372

373
  int32_t streamNum = sdbGetSize(pMnode->pSdb, SDB_STREAM);
243✔
374
  if (streamNum > MND_STREAM_MAX_NUM) {
243!
375
    code = TSDB_CODE_MND_TOO_MANY_STREAMS;
×
376
    mstsError("failed to create stream %s since %s, stream number:%d", pCreate->name, tstrerror(code), streamNum);
×
377
    return code;
×
378
  }
379

380
_OVER:
243✔
381

382
  return code;
264✔
383
}
384

385
int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
1,726✔
386
  SSdb   *pSdb = pMnode->pSdb;
1,726✔
387
  void   *pIter = NULL;
1,726✔
388
  int32_t code = 0;
1,726✔
389

390
  while (1) {
×
391
    SStreamObj *pStream = NULL;
1,726✔
392
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
1,726✔
393
    if (pIter == NULL) break;
1,726!
394

395
    if (0 == strcmp(pStream->pCreate->streamDB, pDb->name)) {
×
396
      mInfo("start to drop stream %s in db %s", pStream->pCreate->name, pDb->name);
×
397
      
398
      pStream->updateTime = taosGetTimestampMs();
×
399
      
400
      atomic_store_8(&pStream->userDropped, 1);
×
401
      
402
      MND_STREAM_SET_LAST_TS(STM_EVENT_DROP_STREAM, pStream->updateTime);
×
403
      
404
      msmUndeployStream(pMnode, pStream->pCreate->streamId, pStream->pCreate->name);
×
405
      
406
      // drop stream
407
      code = mndStreamTransAppend(pStream, pTrans, SDB_STATUS_DROPPED);
×
408
      if (code) {
×
409
        mError("drop db trans:%d failed to append drop stream trans since %s", pTrans->id, tstrerror(code));
×
410
        sdbRelease(pSdb, pStream);
×
411
        sdbCancelFetch(pSdb, pIter);
×
412
        TAOS_RETURN(code);
×
413
      }
414
    }
415

416
    sdbRelease(pSdb, pStream);
×
417
  }
418

419
  return 0;
1,726✔
420
}
421

422
static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
502✔
423
  SMnode     *pMnode = pReq->info.node;
502✔
424
  SSdb       *pSdb = pMnode->pSdb;
502✔
425
  int32_t     numOfRows = 0;
502✔
426
  SStreamObj *pStream = NULL;
502✔
427
  int32_t     code = 0;
502✔
428

429
  while (numOfRows < rows) {
1,901!
430
    pShow->pIter = sdbFetch(pSdb, SDB_STREAM, pShow->pIter, (void **)&pStream);
1,901✔
431
    if (pShow->pIter == NULL) break;
1,901✔
432

433
    code = mstSetStreamAttrResBlock(pMnode, pStream, pBlock, numOfRows);
1,399✔
434
    if (code == 0) {
1,399!
435
      numOfRows++;
1,399✔
436
    }
437
    sdbRelease(pSdb, pStream);
1,399✔
438
  }
439

440
  pShow->numOfRows += numOfRows;
502✔
441
  return numOfRows;
502✔
442
}
443

444
static void mndCancelGetNextStream(SMnode *pMnode, void *pIter) {
×
445
  SSdb *pSdb = pMnode->pSdb;
×
446
  sdbCancelFetchByType(pSdb, pIter, SDB_STREAM);
×
447
}
×
448

449
static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
331✔
450
  SMnode     *pMnode = pReq->info.node;
331✔
451
  SSdb       *pSdb = pMnode->pSdb;
331✔
452
  int32_t     numOfRows = 0;
331✔
453
  SStreamObj *pStream = NULL;
331✔
454
  int32_t     code = 0;
331✔
455

456
  while (numOfRows < rowsCapacity) {
2,851✔
457
    pShow->pIter = sdbFetch(pSdb, SDB_STREAM, pShow->pIter, (void **)&pStream);
2,793✔
458
    if (pShow->pIter == NULL) {
2,793✔
459
      break;
273✔
460
    }
461

462
    code = mstSetStreamTasksResBlock(pStream, pBlock, &numOfRows, rowsCapacity);
2,520✔
463

464
    sdbRelease(pSdb, pStream);
2,520✔
465
  }
466

467
  pShow->numOfRows += numOfRows;
331✔
468
  return numOfRows;
331✔
469
}
470

471
static void mndCancelGetNextStreamTask(SMnode *pMnode, void *pIter) {
×
472
  SSdb *pSdb = pMnode->pSdb;
×
473
  sdbCancelFetchByType(pSdb, pIter, SDB_STREAM);
×
474
}
×
475

476
static int32_t mndRetrieveStreamRecalculates(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
1✔
477
  SMnode     *pMnode = pReq->info.node;
1✔
478
  SSdb       *pSdb = pMnode->pSdb;
1✔
479
  int32_t     numOfRows = 0;
1✔
480
  SStreamObj *pStream = NULL;
1✔
481
  int32_t     code = 0;
1✔
482

483
  while (numOfRows < rowsCapacity) {
2!
484
    pShow->pIter = sdbFetch(pSdb, SDB_STREAM, pShow->pIter, (void **)&pStream);
2✔
485
    if (pShow->pIter == NULL) {
2✔
486
      break;
1✔
487
    }
488

489
    code = mstSetStreamRecalculatesResBlock(pStream, pBlock, &numOfRows, rowsCapacity);
1✔
490

491
    sdbRelease(pSdb, pStream);
1✔
492
  }
493

494
  pShow->numOfRows += numOfRows;
1✔
495
  return numOfRows;
1✔
496
}
497

498
static void mndCancelGetNextStreamRecalculates(SMnode *pMnode, void *pIter) {
×
499
  SSdb *pSdb = pMnode->pSdb;
×
500
  sdbCancelFetchByType(pSdb, pIter, SDB_STREAM);
×
501
}
×
502

503

504
static bool mndStreamUpdateTagsFlag(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
3,070✔
505
  SStreamObj *pStream = pObj;
3,070✔
506
  if (atomic_load_8(&pStream->userDropped)) {
3,070!
507
    return true;
×
508
  }
509

510
  if (TSDB_SUPER_TABLE != pStream->pCreate->triggerTblType && 
3,070✔
511
      TSDB_CHILD_TABLE != pStream->pCreate->triggerTblType && 
1,190✔
512
      TSDB_VIRTUAL_CHILD_TABLE != pStream->pCreate->triggerTblType) {
376✔
513
    return true;
122✔
514
  }
515

516
  if (pStream->pCreate->triggerTblSuid != *(uint64_t*)p1) {
2,948✔
517
    return true;
2,944✔
518
  }
519

520
  if (NULL == pStream->pCreate->partitionCols) {
4!
521
    return true;
×
522
  }
523

524
  SNodeList* pList = NULL;
4✔
525
  int32_t code = nodesStringToList(pStream->pCreate->partitionCols, &pList);
4✔
526
  if (code) {
4!
527
    nodesDestroyList(pList);
×
528
    mstError("partitionCols [%s] nodesStringToList failed with error:%s", (char*)pStream->pCreate->partitionCols, tstrerror(code));
×
529
    return true;
×
530
  }
531

532
  SSchema* pTags = (SSchema*)p2;
4✔
533
  int32_t* tagNum = (int32_t*)p3;
4✔
534

535
  SNode* pNode = NULL;
4✔
536
  FOREACH(pNode, pList) {
18!
537
    SColumnNode* pCol = (SColumnNode*)pNode;
14✔
538
    for (int32_t i = 0; i < *tagNum; ++i) {
76✔
539
      if (pCol->colId == pTags[i].colId) {
72✔
540
        pTags[i].flags |= COL_REF_BY_STM;
10✔
541
        break;
10✔
542
      }
543
    }
544
  }
545

546
  nodesDestroyList(pList);
4✔
547
  
548
  return true;
4✔
549
}
550

551

552
void mndStreamUpdateTagsRefFlag(SMnode *pMnode, int64_t suid, SSchema* pTags, int32_t tagNum) {
61,192✔
553
  int32_t streamNum = sdbGetSize(pMnode->pSdb, SDB_STREAM);
61,192✔
554
  if (streamNum <= 0) {
61,192✔
555
    return;
60,917✔
556
  }
557

558
  sdbTraverse(pMnode->pSdb, SDB_STREAM, mndStreamUpdateTagsFlag, &suid, pTags, &tagNum);
275✔
559
}
560

561
static int32_t mndProcessStopStreamReq(SRpcMsg *pReq) {
21✔
562
  SMnode     *pMnode = pReq->info.node;
21✔
563
  SStreamObj *pStream = NULL;
21✔
564
  int32_t     code = 0;
21✔
565

566
  SMPauseStreamReq pauseReq = {0};
21✔
567
  if (tDeserializeSMPauseStreamReq(pReq->pCont, pReq->contLen, &pauseReq) < 0) {
21!
568
    TAOS_RETURN(TSDB_CODE_INVALID_MSG);
×
569
  }
570

571
  code = mndAcquireStream(pMnode, pauseReq.name, &pStream);
21✔
572
  if (pStream == NULL || code != 0) {
21!
573
    if (pauseReq.igNotExists) {
×
574
      mInfo("stream:%s, not exist, not stop stream", pauseReq.name);
×
575
      taosMemoryFree(pauseReq.name);
×
576
      return 0;
×
577
    } else {
578
      mError("stream:%s not exist, failed to stop stream", pauseReq.name);
×
579
      taosMemoryFree(pauseReq.name);
×
580
      TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
×
581
    }
582
  }
583

584
  taosMemoryFree(pauseReq.name);
21!
585

586
  int64_t streamId = pStream->pCreate->streamId;
21✔
587
  
588
  mstsInfo("start to stop stream %s", pStream->name);
21!
589

590
  code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->pCreate->streamDB);
21✔
591
  if (code != TSDB_CODE_SUCCESS) {
21✔
592
    mstsError("user %s failed to stop stream %s since %s", pReq->info.conn.user, pStream->name, tstrerror(code));
10!
593
    sdbRelease(pMnode->pSdb, pStream);
10✔
594
    return code;
10✔
595
  }
596

597
  if (atomic_load_8(&pStream->userDropped)) {
11!
598
    code = TSDB_CODE_MND_STREAM_DROPPING;
×
599
    mstsError("user %s failed to stop stream %s since %s", pReq->info.conn.user, pStream->name, tstrerror(code));
×
600
    sdbRelease(pMnode->pSdb, pStream);
×
601
    return code;
×
602
  }
603

604
  STrans *pTrans = NULL;
11✔
605
  code = mndStreamCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_STOP_NAME, &pTrans);
11✔
606
  if (pTrans == NULL || code) {
11!
607
    mstsError("failed to stop stream %s since %s", pStream->name, tstrerror(code));
×
608
    sdbRelease(pMnode->pSdb, pStream);
×
609
    return code;
×
610
  }
611

612
  pStream->updateTime = taosGetTimestampMs();
11✔
613

614
  atomic_store_8(&pStream->userStopped, 1);
11✔
615

616
  MND_STREAM_SET_LAST_TS(STM_EVENT_STOP_STREAM, pStream->updateTime);
11!
617

618
  msmUndeployStream(pMnode, streamId, pStream->name);
11✔
619

620
  // stop stream
621
  code = mndStreamTransAppend(pStream, pTrans, SDB_STATUS_READY);
11✔
622
  if (code != TSDB_CODE_SUCCESS) {
11!
623
    sdbRelease(pMnode->pSdb, pStream);
×
624
    mndTransDrop(pTrans);
×
625
    return code;
×
626
  }
627

628
  code = mndTransPrepare(pMnode, pTrans);
11✔
629
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
11!
630
    mError("trans:%d, failed to prepare stop stream trans since %s", pTrans->id, tstrerror(code));
×
631
    sdbRelease(pMnode->pSdb, pStream);
×
632
    mndTransDrop(pTrans);
×
633
    return code;
×
634
  }
635

636
  sdbRelease(pMnode->pSdb, pStream);
11✔
637
  mndTransDrop(pTrans);
11✔
638

639
  return TSDB_CODE_ACTION_IN_PROGRESS;
11✔
640
}
641

642

643
static int32_t mndProcessStartStreamReq(SRpcMsg *pReq) {
21✔
644
  SMnode     *pMnode = pReq->info.node;
21✔
645
  SStreamObj *pStream = NULL;
21✔
646
  int32_t     code = 0;
21✔
647

648
  if ((code = grantCheckExpire(TSDB_GRANT_STREAMS)) < 0) {
21!
649
    return code;
×
650
  }
651

652
  SMResumeStreamReq resumeReq = {0};
21✔
653
  if (tDeserializeSMResumeStreamReq(pReq->pCont, pReq->contLen, &resumeReq) < 0) {
21!
654
    TAOS_RETURN(TSDB_CODE_INVALID_MSG);
×
655
  }
656

657
  code = mndAcquireStream(pMnode, resumeReq.name, &pStream);
21✔
658
  if (pStream == NULL || code != 0) {
21!
659
    if (resumeReq.igNotExists) {
×
660
      mInfo("stream:%s not exist, not start stream", resumeReq.name);
×
661
      taosMemoryFree(resumeReq.name);
×
662
      sdbRelease(pMnode->pSdb, pStream);
×
663
      return 0;
×
664
    } else {
665
      mError("stream:%s not exist, failed to start stream", resumeReq.name);
×
666
      taosMemoryFree(resumeReq.name);
×
667
      TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
×
668
    }
669
  }
670

671
  taosMemoryFree(resumeReq.name);
21!
672

673
  int64_t streamId = pStream->pCreate->streamId;
21✔
674

675
  mstsInfo("start to start stream %s from stopped", pStream->name);
21!
676

677
  code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->pCreate->streamDB);
21✔
678
  if (code != TSDB_CODE_SUCCESS) {
21✔
679
    mstsError("user %s failed to start stream %s since %s", pReq->info.conn.user, pStream->name, tstrerror(code));
10!
680
    sdbRelease(pMnode->pSdb, pStream);
10✔
681
    return code;
10✔
682
  }
683

684
  if (atomic_load_8(&pStream->userDropped)) {
11!
685
    code = TSDB_CODE_MND_STREAM_DROPPING;
×
686
    mstsError("user %s failed to start stream %s since %s", pReq->info.conn.user, pStream->name, tstrerror(code));
×
687
    sdbRelease(pMnode->pSdb, pStream);
×
688
    return code;
×
689
  }
690

691
  if (0 == atomic_load_8(&pStream->userStopped)) {
11!
692
    code = TSDB_CODE_MND_STREAM_NOT_STOPPED;
×
693
    mstsError("user %s failed to start stream %s since %s", pReq->info.conn.user, pStream->name, tstrerror(code));
×
694
    sdbRelease(pMnode->pSdb, pStream);
×
695
    return code;
×
696
  }
697
  
698
  atomic_store_8(&pStream->userStopped, 0);
11✔
699

700
  pStream->updateTime = taosGetTimestampMs();
11✔
701

702
  MND_STREAM_SET_LAST_TS(STM_EVENT_START_STREAM, pStream->updateTime);
11!
703

704
  STrans *pTrans = NULL;
11✔
705
  code = mndStreamCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_START_NAME, &pTrans);
11✔
706
  if (pTrans == NULL || code) {
11!
707
    mstsError("failed to start stream %s since %s", pStream->name, tstrerror(code));
×
708
    sdbRelease(pMnode->pSdb, pStream);
×
709
    return code;
×
710
  }
711

712
  code = mndStreamTransAppend(pStream, pTrans, SDB_STATUS_READY);
11✔
713
  if (code != TSDB_CODE_SUCCESS) {
11!
714
    mstsError("failed to start stream %s since %s", pStream->name, tstrerror(code));
×
715
    sdbRelease(pMnode->pSdb, pStream);
×
716
    mndTransDrop(pTrans);
×
717
    return code;
×
718
  }
719

720
  code = mndTransPrepare(pMnode, pTrans);
11✔
721
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
11!
722
    mstsError("trans:%d, failed to prepare start stream %s trans since %s", pTrans->id, pStream->name, tstrerror(code));
×
723
    sdbRelease(pMnode->pSdb, pStream);
×
724
    mndTransDrop(pTrans);
×
725
    return code;
×
726
  }
727

728
  mstPostStreamAction(mStreamMgmt.actionQ, streamId, pStream->name, NULL, true, STREAM_ACT_DEPLOY);
11✔
729

730
  sdbRelease(pMnode->pSdb, pStream);
11✔
731
  mndTransDrop(pTrans);
11✔
732

733
  return TSDB_CODE_ACTION_IN_PROGRESS;
11✔
734
}
735

736

737
static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
29✔
738
  SMnode     *pMnode = pReq->info.node;
29✔
739
  SStreamObj *pStream = NULL;
29✔
740
  int32_t     code = 0;
29✔
741

742
  SMDropStreamReq dropReq = {0};
29✔
743
  if (tDeserializeSMDropStreamReq(pReq->pCont, pReq->contLen, &dropReq) < 0) {
29!
744
    mError("invalid drop stream msg recv, discarded");
×
745
    code = TSDB_CODE_INVALID_MSG;
×
746
    TAOS_RETURN(code);
×
747
  }
748

749
  mDebug("recv drop stream:%s msg", dropReq.name);
29!
750

751
  code = mndAcquireStream(pMnode, dropReq.name, &pStream);
29✔
752
  if (pStream == NULL || code != 0) {
29!
753
    if (dropReq.igNotExists) {
×
754
      mInfo("stream:%s not exist, ignore not exist is set, drop stream exec done with success", dropReq.name);
×
755
      sdbRelease(pMnode->pSdb, pStream);
×
756
      tFreeMDropStreamReq(&dropReq);
×
757
      return 0;
×
758
    } else {
759
      mError("stream:%s not exist failed to drop it", dropReq.name);
×
760
      tFreeMDropStreamReq(&dropReq);
×
761
      TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
×
762
    }
763
  }
764

765
  int64_t streamId = pStream->pCreate->streamId;
29✔
766

767
  code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->pCreate->streamDB);
29✔
768
  if (code != 0) {
29✔
769
    mstsError("user %s failed to drop stream %s since %s", pReq->info.conn.user, dropReq.name, tstrerror(code));
10!
770
    sdbRelease(pMnode->pSdb, pStream);
10✔
771
    tFreeMDropStreamReq(&dropReq);
10✔
772
    return code;
10✔
773
  }
774

775
  if (pStream->pCreate->tsmaId != 0) {
19!
776
    mstsDebug("try to drop tsma related stream, tsmaId:%" PRIx64, pStream->pCreate->tsmaId);
×
777

778
    void    *pIter = NULL;
×
779
    SSmaObj *pSma = NULL;
×
780
    pIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter, (void **)&pSma);
×
781
    while (pIter) {
×
782
      if (pSma && pSma->uid == pStream->pCreate->tsmaId) {
×
783
        sdbRelease(pMnode->pSdb, pSma);
×
784
        sdbRelease(pMnode->pSdb, pStream);
×
785

786
        sdbCancelFetch(pMnode->pSdb, pIter);
×
787
        tFreeMDropStreamReq(&dropReq);
×
788
        code = TSDB_CODE_TSMA_MUST_BE_DROPPED;
×
789

790
        mstsError("refused to drop tsma-related stream %s since tsma still exists", dropReq.name);
×
791
        TAOS_RETURN(code);
×
792
      }
793

794
      if (pSma) {
×
795
        sdbRelease(pMnode->pSdb, pSma);
×
796
      }
797

798
      pIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter, (void **)&pSma);
×
799
    }
800
  }
801

802
  mstsInfo("start to drop stream %s", pStream->pCreate->name);
19!
803

804
  pStream->updateTime = taosGetTimestampMs();
19✔
805

806
  atomic_store_8(&pStream->userDropped, 1);
19✔
807

808
  MND_STREAM_SET_LAST_TS(STM_EVENT_DROP_STREAM, pStream->updateTime);
19!
809

810
  msmUndeployStream(pMnode, streamId, pStream->pCreate->name);
19✔
811

812
  STrans *pTrans = NULL;
19✔
813
  code = mndStreamCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_DROP_NAME, &pTrans);
19✔
814
  if (pTrans == NULL || code) {
19!
815
    mstsError("failed to drop stream %s since %s", dropReq.name, tstrerror(code));
×
816
    sdbRelease(pMnode->pSdb, pStream);
×
817
    tFreeMDropStreamReq(&dropReq);
×
818
    TAOS_RETURN(code);
×
819
  }
820

821
  // drop stream
822
  code = mndStreamTransAppend(pStream, pTrans, SDB_STATUS_DROPPED);
19✔
823
  if (code) {
19!
824
    mstsError("trans:%d, failed to append drop stream trans since %s", pTrans->id, tstrerror(code));
×
825
    sdbRelease(pMnode->pSdb, pStream);
×
826
    mndTransDrop(pTrans);
×
827
    tFreeMDropStreamReq(&dropReq);
×
828
    TAOS_RETURN(code);
×
829
  }
830

831
  code = mndTransPrepare(pMnode, pTrans);
19✔
832
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
19!
833
    mstsError("trans:%d, failed to prepare drop stream trans since %s", pTrans->id, tstrerror(code));
×
834
    sdbRelease(pMnode->pSdb, pStream);
×
835
    mndTransDrop(pTrans);
×
836
    tFreeMDropStreamReq(&dropReq);
×
837
    TAOS_RETURN(code);
×
838
  }
839

840
  auditRecord(pReq, pMnode->clusterId, "dropStream", "", pStream->pCreate->streamDB, NULL, 0);
19✔
841

842
  sdbRelease(pMnode->pSdb, pStream);
19✔
843
  mndTransDrop(pTrans);
19✔
844

845
  mstsDebug("drop stream %s half completed", dropReq.name);
19!
846
  code = TSDB_CODE_ACTION_IN_PROGRESS;
19✔
847

848
  tFreeMDropStreamReq(&dropReq);
19✔
849
  
850
  TAOS_RETURN(code);
19✔
851
}
852

853
static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
284✔
854
  SMnode     *pMnode = pReq->info.node;
284✔
855
  SStreamObj *pStream = NULL;
284✔
856
  SStreamObj  streamObj = {0};
284✔
857
  int32_t     code = TSDB_CODE_SUCCESS;
284✔
858
  int32_t     lino = 0;
284✔
859
  STrans     *pTrans = NULL;
284✔
860
  uint64_t    streamId = 0;
284✔
861
  SCMCreateStreamReq* pCreate = NULL;
284✔
862

863
  if ((code = grantCheck(TSDB_GRANT_STREAMS)) < 0) {
284!
864
    goto _OVER;
×
865
  }
866
  
867
#ifdef WINDOWS
868
  code = TSDB_CODE_MND_INVALID_PLATFORM;
869
  goto _OVER;
870
#endif
871

872
  pCreate = taosMemoryCalloc(1, sizeof(SCMCreateStreamReq));
284!
873
  TSDB_CHECK_NULL(pCreate, code, lino, _OVER, terrno);
284!
874
  
875
  code = tDeserializeSCMCreateStreamReq(pReq->pCont, pReq->contLen, pCreate);
284✔
876
  TSDB_CHECK_CODE(code, lino, _OVER);
284!
877

878
  streamId = pCreate->streamId;
284✔
879

880
  mstsInfo("start to create stream %s, sql:%s", pCreate->name, pCreate->sql);
284!
881

882
  int32_t snodeId = msmAssignRandomSnodeId(pMnode, streamId);
284✔
883
  if (!GOT_SNODE(snodeId)) {
284!
884
    code = terrno;
×
885
    TSDB_CHECK_CODE(code, lino, _OVER);
×
886
  }
887
  
888
  code = mndAcquireStream(pMnode, pCreate->name, &pStream);
284✔
889
  if (pStream != NULL && code == 0) {
284!
890
    if (pCreate->igExists) {
20!
891
      mstsInfo("stream %s already exist, ignore exist is set", pCreate->name);
×
892
    } else {
893
      code = TSDB_CODE_MND_STREAM_ALREADY_EXIST;
20✔
894
    }
895

896
    mndReleaseStream(pMnode, pStream);
20✔
897
    goto _OVER;
20✔
898
  } else if (code != TSDB_CODE_MND_STREAM_NOT_EXIST) {
264!
899
    goto _OVER;
×
900
  }
901

902
  code = mndStreamValidateCreate(pMnode, pReq->info.conn.user, pCreate);
264✔
903
  TSDB_CHECK_CODE(code, lino, _OVER);
264✔
904

905
  mndStreamBuildObj(pMnode, &streamObj, pCreate, snodeId);
243✔
906
  pCreate = NULL;
243✔
907

908
  pStream = &streamObj;
243✔
909

910
  code = mndStreamCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_DB, MND_STREAM_CREATE_NAME, &pTrans);
243✔
911
  if (pTrans == NULL || code) {
243!
912
    goto _OVER;
×
913
  }
914

915
  // create stb for stream
916
  if (TSDB_SUPER_TABLE == pStream->pCreate->outTblType && !pStream->pCreate->outStbExists) {
243✔
917
    pStream->pCreate->outStbUid = mndGenerateUid(pStream->pCreate->outTblName, strlen(pStream->pCreate->outTblName));
134✔
918
    code = mndStreamCreateOutStb(pMnode, pTrans, pStream->pCreate, pReq->info.conn.user);
134✔
919
    TSDB_CHECK_CODE(code, lino, _OVER);
134!
920
  }
921

922
  // add stream to trans
923
  code = mndStreamTransAppend(pStream, pTrans, SDB_STATUS_READY);
243✔
924
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
243!
925
    mstsError("failed to persist stream %s since %s", pStream->pCreate->name, tstrerror(code));
×
926
    goto _OVER;
×
927
  }
928

929
  // execute creation
930
  code = mndTransPrepare(pMnode, pTrans);
243✔
931
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
243!
932
    mstsError("trans:%d, failed to prepare since %s", pTrans->id, tstrerror(code));
×
933
    goto _OVER;
×
934
  }
935
  code = TSDB_CODE_ACTION_IN_PROGRESS;
243✔
936

937
  auditRecord(pReq, pMnode->clusterId, "createStream", pStream->pCreate->streamDB, pStream->pCreate->name, pStream->pCreate->sql, strlen(pStream->pCreate->sql));
243✔
938

939
  MND_STREAM_SET_LAST_TS(STM_EVENT_CREATE_STREAM, taosGetTimestampMs());
392✔
940

941
  mstPostStreamAction(mStreamMgmt.actionQ, streamId, pStream->pCreate->name, NULL, true, STREAM_ACT_DEPLOY);
243✔
942

943
_OVER:
284✔
944

945
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
284!
946
    if (pStream && pStream->pCreate) {
41!
947
      mstsError("failed to create stream %s at line:%d since %s", pStream->pCreate->name, lino, tstrerror(code));
20!
948
    } else {
949
      mstsError("failed to create stream at line:%d since %s", lino, tstrerror(code));
21!
950
    }
951
  } else {
952
    mstsDebug("create stream %s half completed", pStream->pCreate ? pStream->pCreate->name : "unknown");
243!
953
  }
954

955
  tFreeSCMCreateStreamReq(pCreate);
284✔
956
  taosMemoryFreeClear(pCreate);
284!
957

958
  mndTransDrop(pTrans);
284✔
959
  tFreeStreamObj(&streamObj);
284✔
960

961
  return code;
284✔
962
}
963

964
static int32_t mndProcessRecalcStreamReq(SRpcMsg *pReq) {
21✔
965
  SMnode     *pMnode = pReq->info.node;
21✔
966
  SStreamObj *pStream = NULL;
21✔
967
  int32_t     code = 0;
21✔
968

969
  if ((code = grantCheckExpire(TSDB_GRANT_STREAMS)) < 0) {
21!
970
    return code;
×
971
  }
972

973
  SMRecalcStreamReq recalcReq = {0};
21✔
974
  if (tDeserializeSMRecalcStreamReq(pReq->pCont, pReq->contLen, &recalcReq) < 0) {
21!
975
    tFreeMRecalcStreamReq(&recalcReq);
×
976
    TAOS_RETURN(TSDB_CODE_INVALID_MSG);
×
977
  }
978

979
  code = mndAcquireStream(pMnode, recalcReq.name, &pStream);
21✔
980
  if (pStream == NULL || code != 0) {
21!
981
    mError("stream:%s not exist, failed to recalc stream", recalcReq.name);
×
982
    tFreeMRecalcStreamReq(&recalcReq);
×
983
    TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
×
984
  }
985

986
  int64_t streamId = pStream->pCreate->streamId;
21✔
987
  
988
  mstsInfo("start to recalc stream %s", recalcReq.name);
21!
989

990
  code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->pCreate->streamDB);
21✔
991
  if (code != TSDB_CODE_SUCCESS) {
21✔
992
    mstsError("user %s failed to recalc stream %s since %s", pReq->info.conn.user, recalcReq.name, tstrerror(code));
20!
993
    sdbRelease(pMnode->pSdb, pStream);
20✔
994
    tFreeMRecalcStreamReq(&recalcReq);
20✔
995
    return code;
20✔
996
  }
997

998
  if (atomic_load_8(&pStream->userDropped)) {
1!
999
    code = TSDB_CODE_MND_STREAM_DROPPING;
×
1000
    mstsError("user %s failed to recalc stream %s since %s", pReq->info.conn.user, recalcReq.name, tstrerror(code));
×
1001
    sdbRelease(pMnode->pSdb, pStream);
×
1002
    tFreeMRecalcStreamReq(&recalcReq);
×
1003
    return code;
×
1004
  }
1005

1006
  if (atomic_load_8(&pStream->userStopped)) {
1!
1007
    code = TSDB_CODE_MND_STREAM_STOPPED;
×
1008
    mstsError("user %s failed to recalc stream %s since %s", pReq->info.conn.user, recalcReq.name, tstrerror(code));
×
1009
    sdbRelease(pMnode->pSdb, pStream);
×
1010
    tFreeMRecalcStreamReq(&recalcReq);
×
1011
    return code;
×
1012
  }
1013

1014
  if (WINDOW_TYPE_PERIOD == pStream->pCreate->triggerType) {
1!
1015
    code = TSDB_CODE_OPS_NOT_SUPPORT;
×
1016
    mstsError("failed to recalc stream %s since %s", recalcReq.name, tstrerror(code));
×
1017
    sdbRelease(pMnode->pSdb, pStream);
×
1018
    tFreeMRecalcStreamReq(&recalcReq);
×
1019
    return code;
×
1020
  }
1021

1022
  /*
1023
  pStream->updateTime = taosGetTimestampMs();
1024

1025
  STrans *pTrans = NULL;
1026
  code = mndStreamCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_RECALC_NAME, &pTrans);
1027
  if (pTrans == NULL || code) {
1028
    mstsError("failed to recalc stream %s since %s", recalcReq.name, tstrerror(code));
1029
    sdbRelease(pMnode->pSdb, pStream);
1030
    return code;
1031
  }
1032

1033
  // stop stream
1034
  code = mndStreamTransAppend(pStream, pTrans, SDB_STATUS_READY);
1035
  if (code != TSDB_CODE_SUCCESS) {
1036
    sdbRelease(pMnode->pSdb, pStream);
1037
    mndTransDrop(pTrans);
1038
    return code;
1039
  }
1040

1041
  code = mndTransPrepare(pMnode, pTrans);
1042
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
1043
    mError("trans:%d, failed to prepare stop stream trans since %s", pTrans->id, tstrerror(code));
1044
    sdbRelease(pMnode->pSdb, pStream);
1045
    mndTransDrop(pTrans);
1046
    return code;
1047
  }
1048
*/
1049

1050
  code = msmRecalcStream(pMnode, pStream->pCreate->streamId, &recalcReq.timeRange);
1✔
1051
  if (code != TSDB_CODE_SUCCESS) {
1!
1052
    sdbRelease(pMnode->pSdb, pStream);
×
1053
    tFreeMRecalcStreamReq(&recalcReq);
×
1054
    return code;
×
1055
  }
1056
  
1057
  char buf[128];
1058
  snprintf(buf, sizeof(buf), "start:%" PRId64 ", end:%" PRId64, recalcReq.timeRange.skey, recalcReq.timeRange.ekey);
1✔
1059
  auditRecord(pReq, pMnode->clusterId, "recalcStream", pStream->name, recalcReq.name, buf, strlen(buf));
1✔
1060

1061
  sdbRelease(pMnode->pSdb, pStream);
1✔
1062
  tFreeMRecalcStreamReq(&recalcReq);
1✔
1063
//  mndTransDrop(pTrans);
1064

1065
  return TSDB_CODE_SUCCESS;
1✔
1066
}
1067

1068

1069
int32_t mndInitStream(SMnode *pMnode) {
2,512✔
1070
  SSdbTable table = {
2,512✔
1071
      .sdbType = SDB_STREAM,
1072
      .keyType = SDB_KEY_BINARY,
1073
      .encodeFp = (SdbEncodeFp)mndStreamActionEncode,
1074
      .decodeFp = (SdbDecodeFp)mndStreamActionDecode,
1075
      .insertFp = (SdbInsertFp)mndStreamActionInsert,
1076
      .updateFp = (SdbUpdateFp)mndStreamActionUpdate,
1077
      .deleteFp = (SdbDeleteFp)mndStreamActionDelete,
1078
  };
1079

1080
  if (!tsDisableStream) {
2,512!
1081
    mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STREAM, mndProcessCreateStreamReq);
2,512✔
1082
    mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM, mndProcessDropStreamReq);
2,512✔
1083
    mndSetMsgHandle(pMnode, TDMT_MND_START_STREAM, mndProcessStartStreamReq);
2,512✔
1084
    mndSetMsgHandle(pMnode, TDMT_MND_STOP_STREAM, mndProcessStopStreamReq);
2,512✔
1085
    mndSetMsgHandle(pMnode, TDMT_MND_STREAM_HEARTBEAT, mndProcessStreamHb);  
2,512✔
1086
    mndSetMsgHandle(pMnode, TDMT_MND_RECALC_STREAM, mndProcessRecalcStreamReq);
2,512✔
1087
  }
1088
  
1089
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndRetrieveStream);
2,512✔
1090
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndCancelGetNextStream);
2,512✔
1091
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndRetrieveStreamTask);
2,512✔
1092
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndCancelGetNextStreamTask);
2,512✔
1093
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAM_RECALCULATES, mndRetrieveStreamRecalculates);
2,512✔
1094
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAM_RECALCULATES, mndCancelGetNextStreamRecalculates);
2,512✔
1095

1096
  int32_t code = sdbSetTable(pMnode->pSdb, table);
2,512✔
1097
  if (code) {
2,512!
1098
    return code;
×
1099
  }
1100

1101
  //code = sdbSetTable(pMnode->pSdb, tableSeq);
1102
  return code;
2,512✔
1103
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc