• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4720

08 Sep 2025 08:43AM UTC coverage: 58.139% (-0.6%) from 58.762%
#4720

push

travis-ci

web-flow
Merge pull request #32881 from taosdata/enh/add-new-windows-ci

fix(ci): update workflow reference to use new Windows CI YAML

133181 of 292179 branches covered (45.58%)

Branch coverage included in aggregate %.

201691 of 283811 relevant lines covered (71.07%)

5442780.71 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

58.31
/source/dnode/mnode/impl/src/mndStream.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "mndStream.h"
17
#include "audit.h"
18
#include "mndDb.h"
19
#include "mndPrivilege.h"
20
#include "mndShow.h"
21
#include "mndStb.h"
22
#include "mndTrans.h"
23
#include "osMemory.h"
24
#include "parser.h"
25
#include "taoserror.h"
26
#include "tmisce.h"
27
#include "tname.h"
28

29
#define MND_STREAM_MAX_NUM 100000
30

31
// Struct whose only member is a placeholder (see the member comment below).
// NOTE(review): presumably the body of the stream node-check message - confirm against its users.
typedef struct {
32
  int8_t placeHolder;  // placeholder member, defined to fix a Windows compile error
33
} SMStreamNodeCheckMsg;
34

35
// File-local sentinel initialised to 0.
// NOTE(review): not referenced in the portion of the file reviewed here - confirm it is still needed.
static int32_t  mndNodeCheckSentinel = 0;
36
// Global stream runtime state, zero-initialised; its actionQ member is the queue passed to
// mstPostStreamAction() when a stream is started.
SStmRuntime  mStreamMgmt = {0};
37

38
// Forward declarations for this file.

// Insert/delete/update actions for SStreamObj rows, plus the drop-stream request handler.
static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream);
39
static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream);
40
static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pOldStream, SStreamObj *pNewStream);
41
static int32_t mndProcessDropStreamReq(SRpcMsg *pReq);
42

43
// NOTE(review): the two *FromMNode handlers are not defined in the portion of the file reviewed here.
static int32_t mndProcessCreateStreamReqFromMNode(SRpcMsg *pReq);
44
static int32_t mndProcessDropStreamReqFromMNode(SRpcMsg *pReq);
45

46
// Retrieval / cancel-iteration helpers for stream and stream-task listings, and the stop/start handlers.
static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
47
static void    mndCancelGetNextStream(SMnode *pMnode, void *pIter);
48
static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
49
static void    mndCancelGetNextStreamTask(SMnode *pMnode, void *pIter);
50
static int32_t mndProcessStopStreamReq(SRpcMsg *pReq);
51
static int32_t mndProcessStartStreamReq(SRpcMsg *pReq);
52

53
// Decodes a raw SDB record into a row holding an SStreamObj.
static SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw);
54

55
// SStreamSeq actions; the definitions in this file are stubs that return NULL / 0.
SSdbRaw       *mndStreamSeqActionEncode(SStreamObj *pStream);
56
SSdbRow       *mndStreamSeqActionDecode(SSdbRaw *pRaw);
57
static int32_t mndStreamSeqActionInsert(SSdb *pSdb, SStreamSeq *pStream);
58
static int32_t mndStreamSeqActionDelete(SSdb *pSdb, SStreamSeq *pStream);
59
static int32_t mndStreamSeqActionUpdate(SSdb *pSdb, SStreamSeq *pOldStream, SStreamSeq *pNewStream);
60
// Create-stream request handler.
static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq);
61

62
/**
 * Clean up the stream module of an mnode.
 *
 * Invokes msmHandleBecomeNotLeader() for the given mnode, writing one debug log line before the
 * call and one after it.
 *
 * @param pMnode mnode whose stream runtime is being cleaned up
 */
void mndCleanupStream(SMnode *pMnode) {
  mDebug("try to clean up stream");
  msmHandleBecomeNotLeader(pMnode);
  mDebug("mnd stream runtime info cleanup");
}
1,929✔
69

70
/**
 * Decode a raw SDB record into a freshly allocated row that holds an SStreamObj.
 *
 * Steps: check the record's soft version against MND_STREAM_VER_NUMBER, allocate the row, read the
 * length-prefixed payload into a temporary buffer and decode it with tDecodeSStreamObj().
 *
 * @param pRaw raw record to decode
 * @return the allocated row on success (terrno is reset to 0); NULL on any failure, in which case
 *         the row is freed and terrno is set to the failure code.
 */
SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw) {
  int32_t     code = 0;
  int32_t     lino = 0;
  SSdbRow    *pRow = NULL;
  SStreamObj *pStream = NULL;
  void       *buf = NULL;
  int8_t      sver = 0;
  int32_t     tlen = 0;
  int32_t     dataPos = 0;

  code = sdbGetRawSoftVer(pRaw, &sver);
  TSDB_CHECK_CODE(code, lino, _over);

  if (sver != MND_STREAM_VER_NUMBER) {
    mError("stream read invalid ver, data ver: %d, curr ver: %d", sver, MND_STREAM_VER_NUMBER);
    // Record a failure code before leaving. With code still 0 this path would fall into the
    // success branch below, which dereferences the still-NULL pStream and reports terrno = 0.
    code = TSDB_CODE_SDB_INVALID_DATA_VER;
    lino = __LINE__;
    goto _over;
  }

  pRow = sdbAllocRow(sizeof(SStreamObj));
  TSDB_CHECK_NULL(pRow, code, lino, _over, terrno);

  pStream = sdbGetRowObj(pRow);
  TSDB_CHECK_NULL(pStream, code, lino, _over, terrno);

  // The payload is stored as a 32-bit length followed by that many bytes.
  SDB_GET_INT32(pRaw, dataPos, &tlen, _over);

  buf = taosMemoryMalloc(tlen + 1);
  TSDB_CHECK_NULL(buf, code, lino, _over, terrno);

  SDB_GET_BINARY(pRaw, dataPos, buf, tlen, _over);

  SDecoder decoder;
  tDecoderInit(&decoder, buf, tlen + 1);
  code = tDecodeSStreamObj(&decoder, pStream, sver);
  tDecoderClear(&decoder);

  if (code < 0) {
    // Release whatever the decoder attached to the object before the row itself is freed below.
    tFreeStreamObj(pStream);
  }

_over:
  taosMemoryFreeClear(buf);

  if (code != TSDB_CODE_SUCCESS) {
    char *p = (pStream == NULL || NULL == pStream->pCreate) ? "null" : pStream->pCreate->name;
    mError("stream:%s, failed to decode from raw:%p since %s at:%d", p, pRaw, tstrerror(code), lino);
    taosMemoryFreeClear(pRow);

    terrno = code;
    return NULL;
  } else {
    mTrace("stream:%s, decode from raw:%p, row:%p", pStream->pCreate->name, pRaw, pStream);

    terrno = 0;
    return pRow;
  }
}
127

128
/**
 * Insert action for a stream row: only traces the stream name.
 *
 * @param pSdb    unused
 * @param pStream stream being inserted; pStream->pCreate->name is read for the trace line
 * @return always 0
 */
static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream) {
  mTrace("stream:%s, perform insert action", pStream->pCreate->name);

  return 0;
}
132

133
/**
 * Delete action for a stream row: logs the stream name, then hands the object to tFreeStreamObj().
 *
 * @param pSdb    unused
 * @param pStream stream being deleted
 * @return always 0
 */
static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream) {
  // Log first: the name lives in pStream->pCreate, which is passed to tFreeStreamObj() right after.
  mInfo("stream:%s, perform delete action", pStream->pCreate->name);

  tFreeStreamObj(pStream);

  return 0;
}
138

139
/**
 * Update action for a stream row: copies the mutable fields from the new object into the old one.
 *
 * Only mainSnodeId, userStopped and updateTime are copied; nothing else in pOldStream is touched.
 *
 * @param pSdb       unused
 * @param pOldStream object that receives the new values
 * @param pNewStream object carrying the new values
 * @return always 0
 */
static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pOldStream, SStreamObj *pNewStream) {
  mTrace("stream:%s, perform update action", pOldStream->pCreate->name);

  atomic_store_32(&pOldStream->mainSnodeId, pNewStream->mainSnodeId);

  int8_t newStopped = atomic_load_8(&pNewStream->userStopped);
  atomic_store_8(&pOldStream->userStopped, newStopped);

  pOldStream->updateTime = pNewStream->updateTime;

  return 0;
}
148

149
/**
 * Acquire a stream object from the SDB by name.
 *
 * @param pMnode     mnode whose SDB is searched
 * @param streamName key passed to sdbAcquire()
 * @param pStream    out: the acquired object, or NULL when the acquire failed
 * @return 0 when *pStream is non-NULL; TSDB_CODE_MND_STREAM_NOT_EXIST when the SDB reports
 *         TSDB_CODE_SDB_OBJ_NOT_THERE (or gives no reason at all); otherwise the terrno left by
 *         sdbAcquire().
 *
 * Previously any acquire failure other than OBJ_NOT_THERE returned 0 together with a NULL object,
 * which callers that branch on the return code alone treated as success.
 */
int32_t mndAcquireStream(SMnode *pMnode, char *streamName, SStreamObj **pStream) {
  SSdb *pSdb = pMnode->pSdb;

  (*pStream) = sdbAcquire(pSdb, SDB_STREAM, streamName);
  if ((*pStream) != NULL) {
    return 0;
  }

  if (terrno == TSDB_CODE_SDB_OBJ_NOT_THERE || terrno == 0) {
    return TSDB_CODE_MND_STREAM_NOT_EXIST;
  }

  // Some other reason the object could not be handed out: report it instead of 0.
  return terrno;
}
158

159
static bool mndStreamGetNameFromId(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
×
160
  SStreamObj* pStream = pObj;
×
161

162
  if (pStream->pCreate->streamId == *(int64_t*)p1) {
×
163
    strncpy((char*)p2, pStream->name, TSDB_STREAM_NAME_LEN);
×
164
    return false;
×
165
  }
166

167
  return true;
×
168
}
169

170
int32_t mndAcquireStreamById(SMnode *pMnode, int64_t streamId, SStreamObj **pStream) {
×
171
  int32_t code = 0;
×
172
  SSdb   *pSdb = pMnode->pSdb;
×
173
  char streamName[TSDB_STREAM_NAME_LEN];
174
  streamName[0] = 0;
×
175
  
176
  sdbTraverse(pSdb, SDB_STREAM, mndStreamGetNameFromId, &streamId, streamName, NULL);
×
177
  if (streamName[0]) {
×
178
    (*pStream) = sdbAcquire(pSdb, SDB_STREAM, streamName);
×
179
    if ((*pStream) == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
×
180
      code = TSDB_CODE_MND_STREAM_NOT_EXIST;
×
181
    }
182
  }
183
  
184
  return code;
×
185
}
186

187
/**
 * Release a stream object back to the mnode's SDB.
 *
 * @param pMnode  mnode that owns the SDB
 * @param pStream object to release; forwarded unchanged to sdbRelease()
 */
void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream) {
  sdbRelease(pMnode->pSdb, pStream);
}
364✔
191

192
SSdbRaw *mndStreamSeqActionEncode(SStreamObj *pStream) { return NULL; }
×
193
SSdbRow *mndStreamSeqActionDecode(SSdbRaw *pRaw) { return NULL; }
×
194
int32_t  mndStreamSeqActionInsert(SSdb *pSdb, SStreamSeq *pStream) { return 0; }
×
195
int32_t  mndStreamSeqActionDelete(SSdb *pSdb, SStreamSeq *pStream) { return 0; }
×
196
int32_t  mndStreamSeqActionUpdate(SSdb *pSdb, SStreamSeq *pOldStream, SStreamSeq *pNewStream) { return 0; }
×
197

198
/**
 * Fill a stream object from a create request.
 *
 * The object stores the request pointer itself (pObj->pCreate = pCreate), it does not copy it.
 * userDropped and userStopped are cleared, and createTime/updateTime are both set to the current
 * timestamp in milliseconds.
 *
 * @param pMnode  unused
 * @param pObj    object to fill
 * @param pCreate create request; its name becomes the object's name
 * @param snodeId value stored in pObj->mainSnodeId
 */
static void mndStreamBuildObj(SMnode *pMnode, SStreamObj *pObj, SCMCreateStreamReq *pCreate, int32_t snodeId) {
  pObj->pCreate = pCreate;

  // strncpy leaves the destination unterminated when the source fills the whole bound, so the
  // last byte is terminated explicitly. strncpy is kept for its zero padding of the remainder.
  strncpy(pObj->name, pCreate->name, TSDB_STREAM_FNAME_LEN);
  pObj->name[TSDB_STREAM_FNAME_LEN - 1] = '\0';

  pObj->mainSnodeId = snodeId;

  pObj->userDropped = 0;
  pObj->userStopped = 0;

  pObj->createTime = taosGetTimestampMs();
  pObj->updateTime = pObj->createTime;

  mstLogSStreamObj("create stream", pObj);
}
284✔
213

214
/**
 * Add the creation of a stream's output super table to a transaction.
 *
 * Builds an SMCreateStbReq named "<outDB>.<outTblName>" whose columns come from pStream->outCols
 * and whose tags come from pStream->outTags (a single UBIGINT "group_id" tag when outTags is
 * NULL), validates it, and appends the resulting super table to pTrans.
 *
 * @param pMnode  mnode context
 * @param pTrans  transaction that receives the super-table action
 * @param pStream stream create request describing the output table
 * @param user    unused
 * @return 0 on success, otherwise the code of the step that failed. Failures of
 *         mndGetNumOfStbs(), mndBuildStbFromReq() and mndAddStbToTrans() are now propagated;
 *         previously those paths left code at 0 and the function reported success.
 */
static int32_t mndStreamCreateOutStb(SMnode *pMnode, STrans *pTrans, const SCMCreateStreamReq *pStream, const char *user) {
  SStbObj *pStb = NULL;
  SDbObj  *pDb = NULL;
  int32_t  code = 0;
  int32_t  lino = 0;

  SMCreateStbReq createReq = {0};
  TAOS_STRNCAT(createReq.name, pStream->outDB, TSDB_DB_FNAME_LEN);
  TAOS_STRNCAT(createReq.name, ".", 2);
  TAOS_STRNCAT(createReq.name,  pStream->outTblName, TSDB_TABLE_NAME_LEN);
  createReq.numOfColumns = taosArrayGetSize(pStream->outCols);
  createReq.numOfTags = pStream->outTags ? taosArrayGetSize(pStream->outTags) : 1;
  createReq.pColumns = taosArrayInit_s(sizeof(SFieldWithOptions), createReq.numOfColumns);
  TSDB_CHECK_NULL(createReq.pColumns, code, lino, _OVER, terrno);

  // build fields
  for (int32_t i = 0; i < createReq.numOfColumns; i++) {
    SFieldWithOptions *pField = taosArrayGet(createReq.pColumns, i);
    TSDB_CHECK_NULL(pField, code, lino, _OVER, terrno);
    SFieldWithOptions *pSrc = taosArrayGet(pStream->outCols, i);

    tstrncpy(pField->name, pSrc->name, TSDB_COL_NAME_LEN);
    pField->flags = pSrc->flags;
    pField->type = pSrc->type;
    pField->bytes = pSrc->bytes;
    pField->compress = createDefaultColCmprByType(pField->type);
    if (IS_DECIMAL_TYPE(pField->type)) {
      pField->typeMod = pSrc->typeMod;
      pField->flags |= COL_HAS_TYPE_MOD;
    }
  }

  if (NULL == pStream->outTags) {
    createReq.numOfTags = 1;
    createReq.pTags = taosArrayInit_s(sizeof(SField), 1);
    TSDB_CHECK_NULL(createReq.pTags, code, lino, _OVER, terrno);

    // build tags
    SField *pField = taosArrayGet(createReq.pTags, 0);
    TSDB_CHECK_NULL(pField, code, lino, _OVER, terrno);

    tstrncpy(pField->name, "group_id", sizeof(pField->name));
    pField->type = TSDB_DATA_TYPE_UBIGINT;
    pField->flags = 0;
    pField->bytes = 8;
  } else {
    createReq.numOfTags = taosArrayGetSize(pStream->outTags);
    createReq.pTags = taosArrayInit_s(sizeof(SField), createReq.numOfTags);
    TSDB_CHECK_NULL(createReq.pTags, code, lino, _OVER, terrno);

    for (int32_t i = 0; i < createReq.numOfTags; i++) {
      SField *pField = taosArrayGet(createReq.pTags, i);
      if (pField == NULL) {
        continue;
      }

      TAOS_FIELD_E *pSrc = taosArrayGet(pStream->outTags, i);
      pField->bytes = pSrc->bytes;
      pField->flags = 0;
      pField->type = pSrc->type;
      tstrncpy(pField->name, pSrc->name, TSDB_COL_NAME_LEN);
    }
  }

  if ((code = mndCheckCreateStbReq(&createReq)) != 0) {
    goto _OVER;
  }

  // The output table must not exist yet.
  pStb = mndAcquireStb(pMnode, createReq.name);
  if (pStb != NULL) {
    code = TSDB_CODE_MND_STB_ALREADY_EXIST;
    goto _OVER;
  }

  pDb = mndAcquireDbByStb(pMnode, createReq.name);
  if (pDb == NULL) {
    code = TSDB_CODE_MND_DB_NOT_SELECTED;
    goto _OVER;
  }

  int32_t numOfStbs = -1;
  code = mndGetNumOfStbs(pMnode, pDb->name, &numOfStbs);
  if (code != 0) {
    // Propagate the failure; jumping out with code == 0 would report success.
    lino = __LINE__;
    goto _OVER;
  }

  // A database configured with numOfStables == 1 may not receive a further super table.
  if (pDb->cfg.numOfStables == 1 && numOfStbs != 0) {
    code = TSDB_CODE_MND_SINGLE_STB_MODE_DB;
    goto _OVER;
  }

  SStbObj stbObj = {0};

  code = mndBuildStbFromReq(pMnode, &stbObj, &createReq, pDb);
  if (code != 0) {
    lino = __LINE__;
    goto _OVER;
  }

  stbObj.uid = pStream->outStbUid;

  // Only negative results are failures here (as before); a non-negative result keeps code at 0.
  int32_t addCode = mndAddStbToTrans(pMnode, pTrans, pDb, &stbObj);
  if (addCode < 0) {
    code = addCode;
    lino = __LINE__;
    mndFreeStb(&stbObj);
    goto _OVER;
  }

  mDebug("stream:%s create dst stable:%s, cols:%d", pStream->name, pStream->outTblName, createReq.numOfColumns);

  tFreeSMCreateStbReq(&createReq);
  mndFreeStb(&stbObj);
  mndReleaseStb(pMnode, pStb);
  mndReleaseDb(pMnode, pDb);
  return code;

_OVER:
  tFreeSMCreateStbReq(&createReq);
  mndReleaseStb(pMnode, pStb);
  mndReleaseDb(pMnode, pDb);

  mDebug("stream:%s failed to create dst stable:%s, line:%d code:%s", pStream->name, pStream->outTblName, lino,
         tstrerror(code));
  return code;
}
334

335
/**
 * Validate a create-stream request before the stream object is built.
 *
 * Checks, in order and stopping at the first failure:
 *   - write privilege on streamDB (when set)
 *   - read privilege on triggerDB (when set)
 *   - read privilege on every entry of calcDB (when set)
 *   - write privilege on outDB (when set)
 *   - that the number of existing streams is still below MND_STREAM_MAX_NUM
 *
 * @param pMnode  mnode context
 * @param pUser   name of the user issuing the request
 * @param pCreate request to validate
 * @return 0 when every check passes, otherwise the code of the failing check
 *         (TSDB_CODE_MND_TOO_MANY_STREAMS for the count limit).
 */
static int32_t mndStreamValidateCreate(SMnode *pMnode, char* pUser, SCMCreateStreamReq* pCreate) {
  int32_t code = 0, lino = 0;
  // NOTE(review): streamId has no other use in this function; presumably the msts* log macros read it.
  int64_t streamId = pCreate->streamId;

  if (pCreate->streamDB) {
    code = mndCheckDbPrivilegeByName(pMnode, pUser, MND_OPER_WRITE_DB, pCreate->streamDB);
    if (code) {
      mstsError("user %s failed to create stream %s in db %s since %s", pUser, pCreate->name, pCreate->streamDB, tstrerror(code));
    }
    TSDB_CHECK_CODE(code, lino, _OVER);
  }

  if (pCreate->triggerDB) {
    code = mndCheckDbPrivilegeByName(pMnode, pUser, MND_OPER_READ_DB, pCreate->triggerDB);
    if (code) {
      mstsError("user %s failed to create stream %s using trigger db %s since %s", pUser, pCreate->name, pCreate->triggerDB, tstrerror(code));
    }
    TSDB_CHECK_CODE(code, lino, _OVER);
  }

  if (pCreate->calcDB) {
    int32_t dbNum = taosArrayGetSize(pCreate->calcDB);
    for (int32_t i = 0; i < dbNum; ++i) {
      char* calcDB = taosArrayGetP(pCreate->calcDB, i);
      code = mndCheckDbPrivilegeByName(pMnode, pUser, MND_OPER_READ_DB, calcDB);
      if (code) {
        mstsError("user %s failed to create stream %s using calcDB %s since %s", pUser, pCreate->name, calcDB, tstrerror(code));
      }
      TSDB_CHECK_CODE(code, lino, _OVER);
    }
  }

  if (pCreate->outDB) {
    code = mndCheckDbPrivilegeByName(pMnode, pUser, MND_OPER_WRITE_DB, pCreate->outDB);
    if (code) {
      mstsError("user %s failed to create stream %s using out db %s since %s", pUser, pCreate->name, pCreate->outDB, tstrerror(code));
    }
    TSDB_CHECK_CODE(code, lino, _OVER);
  }

  // streamNum counts the streams that already exist, so the request must be refused once the
  // count has reached the maximum; testing with '>' would admit one stream beyond the limit.
  int32_t streamNum = sdbGetSize(pMnode->pSdb, SDB_STREAM);
  if (streamNum >= MND_STREAM_MAX_NUM) {
    code = TSDB_CODE_MND_TOO_MANY_STREAMS;
    mstsError("failed to create stream %s since %s, stream number:%d", pCreate->name, tstrerror(code), streamNum);
    return code;
  }

_OVER:

  return code;
}
386

387
/**
 * Drop every stream whose streamDB equals the given database, as part of a transaction.
 *
 * For each matching stream: stamp updateTime, set userDropped, record the drop event timestamp,
 * undeploy the stream, and append a DROPPED record for it to pTrans.
 *
 * @param pMnode mnode context
 * @param pTrans transaction that receives the drop records
 * @param pDb    database being dropped; streams are matched on pDb->name
 * @return 0 when all matching streams were appended; otherwise the code returned by
 *         mndStreamTransAppend() for the first stream that failed (iteration is cancelled).
 */
int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
  SSdb *pSdb = pMnode->pSdb;
  void *pIter = NULL;

  for (;;) {
    SStreamObj *pStream = NULL;
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
    if (pIter == NULL) {
      break;
    }

    // Streams that belong to another database are released and skipped.
    if (strcmp(pStream->pCreate->streamDB, pDb->name) != 0) {
      sdbRelease(pSdb, pStream);
      continue;
    }

    mInfo("start to drop stream %s in db %s", pStream->pCreate->name, pDb->name);

    pStream->updateTime = taosGetTimestampMs();
    atomic_store_8(&pStream->userDropped, 1);
    MND_STREAM_SET_LAST_TS(STM_EVENT_DROP_STREAM, pStream->updateTime);
    msmUndeployStream(pMnode, pStream->pCreate->streamId, pStream->pCreate->name);

    // drop stream
    int32_t code = mndStreamTransAppend(pStream, pTrans, SDB_STATUS_DROPPED);
    if (code) {
      mError("drop db trans:%d failed to append drop stream trans since %s", pTrans->id, tstrerror(code));
      sdbRelease(pSdb, pStream);
      sdbCancelFetch(pSdb, pIter);
      TAOS_RETURN(code);
    }

    sdbRelease(pSdb, pStream);
  }

  return 0;
}
423

424
/**
 * Fill a result block with one row per stream.
 *
 * Continues from pShow->pIter and stops when `rows` rows were written or the iteration ends.
 * A stream for which mstSetStreamAttrResBlock() fails is skipped without counting a row.
 *
 * @param pReq   request; pReq->info.node is the mnode
 * @param pShow  show state: pIter is advanced and numOfRows is increased by the rows written
 * @param pBlock block to fill
 * @param rows   maximum number of rows to write in this call
 * @return number of rows written in this call
 */
static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode     *pMnode = pReq->info.node;
  SSdb       *pSdb = pMnode->pSdb;
  SStreamObj *pStream = NULL;
  int32_t     written = 0;

  while (written < rows) {
    pShow->pIter = sdbFetch(pSdb, SDB_STREAM, pShow->pIter, (void **)&pStream);
    if (pShow->pIter == NULL) {
      break;
    }

    int32_t rc = mstSetStreamAttrResBlock(pMnode, pStream, pBlock, written);
    if (rc == 0) {
      written++;
    }

    sdbRelease(pSdb, pStream);
  }

  pShow->numOfRows += written;
  return written;
}
445

446
/**
 * Cancel an in-progress iteration over the stream table.
 *
 * @param pMnode mnode that owns the SDB
 * @param pIter  iterator to cancel; forwarded to sdbCancelFetchByType() with SDB_STREAM
 */
static void mndCancelGetNextStream(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_STREAM);
}
×
450

451
/**
 * Fill a result block with stream task rows.
 *
 * Continues from pShow->pIter; for each stream mstSetStreamTasksResBlock() is given the running
 * row counter and the capacity, and updates the counter itself. Its return value is not acted on.
 *
 * @param pReq         request; pReq->info.node is the mnode
 * @param pShow        show state: pIter is advanced and numOfRows is increased by the rows written
 * @param pBlock       block to fill
 * @param rowsCapacity maximum number of rows the block may receive in this call
 * @return number of rows written in this call
 */
static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
  SMnode     *pMnode = pReq->info.node;
  SSdb       *pSdb = pMnode->pSdb;
  SStreamObj *pStream = NULL;
  int32_t     written = 0;

  for (;;) {
    if (written >= rowsCapacity) {
      break;
    }

    pShow->pIter = sdbFetch(pSdb, SDB_STREAM, pShow->pIter, (void **)&pStream);
    if (pShow->pIter == NULL) {
      break;
    }

    (void)mstSetStreamTasksResBlock(pStream, pBlock, &written, rowsCapacity);

    sdbRelease(pSdb, pStream);
  }

  pShow->numOfRows += written;
  return written;
}
472

473
/**
 * Cancel an in-progress iteration started by the stream-task retrieval.
 *
 * @param pMnode mnode that owns the SDB
 * @param pIter  iterator to cancel; forwarded to sdbCancelFetchByType() with SDB_STREAM
 */
static void mndCancelGetNextStreamTask(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_STREAM);
}
×
477

478
/**
 * Fill a result block with stream recalculation rows.
 *
 * Continues from pShow->pIter; for each stream mstSetStreamRecalculatesResBlock() is given the
 * running row counter and the capacity, and updates the counter itself. Its return value is not
 * acted on.
 *
 * @param pReq         request; pReq->info.node is the mnode
 * @param pShow        show state: pIter is advanced and numOfRows is increased by the rows written
 * @param pBlock       block to fill
 * @param rowsCapacity maximum number of rows the block may receive in this call
 * @return number of rows written in this call
 */
static int32_t mndRetrieveStreamRecalculates(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
  SMnode     *pMnode = pReq->info.node;
  SSdb       *pSdb = pMnode->pSdb;
  SStreamObj *pStream = NULL;
  int32_t     written = 0;

  for (;;) {
    if (written >= rowsCapacity) {
      break;
    }

    pShow->pIter = sdbFetch(pSdb, SDB_STREAM, pShow->pIter, (void **)&pStream);
    if (pShow->pIter == NULL) {
      break;
    }

    (void)mstSetStreamRecalculatesResBlock(pStream, pBlock, &written, rowsCapacity);

    sdbRelease(pSdb, pStream);
  }

  pShow->numOfRows += written;
  return written;
}
499

500
/**
 * Cancel an in-progress iteration started by the stream-recalculates retrieval.
 *
 * @param pMnode mnode that owns the SDB
 * @param pIter  iterator to cancel; forwarded to sdbCancelFetchByType() with SDB_STREAM
 */
static void mndCancelGetNextStreamRecalculates(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_STREAM);
}
×
504

505

506
/**
 * sdbTraverse() callback: mark the tags of one super table that a stream partitions by.
 *
 * A stream is considered only when it is not user-dropped, its trigger table type is super, child
 * or virtual-child table, its triggerTblSuid equals *p1, and it has partitionCols. For each column
 * in the parsed partition list, the first tag with the same colId gets COL_REF_BY_STM set.
 *
 * @param pMnode unused
 * @param pObj   stream object being visited
 * @param p1     pointer to the super table uid (read as uint64_t)
 * @param p2     SSchema array of tags to update
 * @param p3     pointer to the int32_t number of tags in p2
 * @return always true
 */
static bool mndStreamUpdateTagsFlag(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
  SStreamObj *pStream = pObj;

  if (atomic_load_8(&pStream->userDropped)) {
    return true;
  }

  // Only super/child/virtual-child trigger tables are of interest.
  if (!(TSDB_SUPER_TABLE == pStream->pCreate->triggerTblType ||
        TSDB_CHILD_TABLE == pStream->pCreate->triggerTblType ||
        TSDB_VIRTUAL_CHILD_TABLE == pStream->pCreate->triggerTblType)) {
    return true;
  }

  if (pStream->pCreate->triggerTblSuid != *(uint64_t*)p1) {
    return true;
  }

  if (NULL == pStream->pCreate->partitionCols) {
    return true;
  }

  SNodeList* pList = NULL;
  int32_t code = nodesStringToList(pStream->pCreate->partitionCols, &pList);
  if (code) {
    nodesDestroyList(pList);
    mstError("partitionCols [%s] nodesStringToList failed with error:%s", (char*)pStream->pCreate->partitionCols, tstrerror(code));
    return true;
  }

  SSchema* pTags = (SSchema*)p2;
  int32_t* tagNum = (int32_t*)p3;

  SNode* pNode = NULL;
  FOREACH(pNode, pList) {
    SColumnNode* pCol = (SColumnNode*)pNode;
    // Flag only the first tag whose column id matches this partition column.
    for (int32_t i = 0; i < *tagNum; ++i) {
      if (pCol->colId == pTags[i].colId) {
        pTags[i].flags |= COL_REF_BY_STM;
        break;
      }
    }
  }

  nodesDestroyList(pList);

  return true;
}
552

553

554
/**
 * Set COL_REF_BY_STM on the tags of a super table that streams reference.
 *
 * Does nothing when the SDB holds no streams; otherwise visits every stream with
 * mndStreamUpdateTagsFlag().
 *
 * @param pMnode mnode context
 * @param suid   uid of the super table the tags belong to
 * @param pTags  tag schema array, updated in place
 * @param tagNum number of entries in pTags
 */
void mndStreamUpdateTagsRefFlag(SMnode *pMnode, int64_t suid, SSchema* pTags, int32_t tagNum) {
  if (sdbGetSize(pMnode->pSdb, SDB_STREAM) > 0) {
    sdbTraverse(pMnode->pSdb, SDB_STREAM, mndStreamUpdateTagsFlag, &suid, pTags, &tagNum);
  }
}
562

563
/**
 * Handle a stop-stream request.
 *
 * Flow: deserialize the request, acquire the stream by name, check write privilege on its
 * streamDB, refuse when the stream is being dropped, create a transaction, mark the stream as
 * user-stopped and undeploy it, then append a READY record and prepare the transaction.
 *
 * @param pReq request message; pReq->info.node is the mnode, pReq->info.conn.user the caller
 * @return TSDB_CODE_ACTION_IN_PROGRESS once the transaction was prepared; 0 when the stream does
 *         not exist and igNotExists is set; otherwise the code of the failing step.
 */
static int32_t mndProcessStopStreamReq(SRpcMsg *pReq) {
  SMnode     *pMnode = pReq->info.node;
  SStreamObj *pStream = NULL;
  int32_t     code = 0;

  SMPauseStreamReq pauseReq = {0};
  if (tDeserializeSMPauseStreamReq(pReq->pCont, pReq->contLen, &pauseReq) < 0) {
    TAOS_RETURN(TSDB_CODE_INVALID_MSG);
  }

  code = mndAcquireStream(pMnode, pauseReq.name, &pStream);
  if (pStream == NULL || code != 0) {
    if (pauseReq.igNotExists) {
      mInfo("stream:%s, not exist, not stop stream", pauseReq.name);
      taosMemoryFree(pauseReq.name);
      return 0;
    }

    mError("stream:%s not exist, failed to stop stream", pauseReq.name);
    taosMemoryFree(pauseReq.name);
    TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
  }

  // The request name is not needed past this point; the stream's own name is used from here on.
  taosMemoryFree(pauseReq.name);

  int64_t streamId = pStream->pCreate->streamId;

  mstsInfo("start to stop stream %s", pStream->name);

  code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->pCreate->streamDB);
  if (code != TSDB_CODE_SUCCESS) {
    mstsError("user %s failed to stop stream %s since %s", pReq->info.conn.user, pStream->name, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  if (atomic_load_8(&pStream->userDropped)) {
    code = TSDB_CODE_MND_STREAM_DROPPING;
    mstsError("user %s failed to stop stream %s since %s", pReq->info.conn.user, pStream->name, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  STrans *pTrans = NULL;
  code = mndStreamCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_STOP_NAME, &pTrans);
  if (pTrans == NULL || code) {
    mstsError("failed to stop stream %s since %s", pStream->name, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  // Mark the in-memory object as stopped and undeploy before the record is appended.
  pStream->updateTime = taosGetTimestampMs();
  atomic_store_8(&pStream->userStopped, 1);
  MND_STREAM_SET_LAST_TS(STM_EVENT_STOP_STREAM, pStream->updateTime);
  msmUndeployStream(pMnode, streamId, pStream->name);

  // stop stream
  code = mndStreamTransAppend(pStream, pTrans, SDB_STATUS_READY);
  if (code != TSDB_CODE_SUCCESS) {
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("trans:%d, failed to prepare stop stream trans since %s", pTrans->id, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    return code;
  }

  sdbRelease(pMnode->pSdb, pStream);
  mndTransDrop(pTrans);

  return TSDB_CODE_ACTION_IN_PROGRESS;
}
643

644

645
/**
 * Handle a start-stream request (resume a stream that the user stopped).
 *
 * Flow: check the grant, deserialize the request, acquire the stream by name, check write
 * privilege on its streamDB, refuse when the stream is being dropped or is not stopped, clear the
 * user-stopped flag, create a transaction, append a READY record, prepare the transaction and
 * post a deploy action for the stream.
 *
 * If creating, appending to, or preparing the transaction fails, the user-stopped flag and
 * updateTime are put back to their previous values. Without that the in-memory object would claim
 * the stream was started although nothing was persisted and no deploy action was posted, and a
 * retry would be rejected with TSDB_CODE_MND_STREAM_NOT_STOPPED.
 *
 * @param pReq request message; pReq->info.node is the mnode, pReq->info.conn.user the caller
 * @return TSDB_CODE_ACTION_IN_PROGRESS once the transaction was prepared; 0 when the stream does
 *         not exist and igNotExists is set; otherwise the code of the failing step.
 */
static int32_t mndProcessStartStreamReq(SRpcMsg *pReq) {
  SMnode     *pMnode = pReq->info.node;
  SStreamObj *pStream = NULL;
  int32_t     code = 0;

  if ((code = grantCheckExpire(TSDB_GRANT_STREAMS)) < 0) {
    return code;
  }

  SMResumeStreamReq resumeReq = {0};
  if (tDeserializeSMResumeStreamReq(pReq->pCont, pReq->contLen, &resumeReq) < 0) {
    TAOS_RETURN(TSDB_CODE_INVALID_MSG);
  }

  code = mndAcquireStream(pMnode, resumeReq.name, &pStream);
  if (pStream == NULL || code != 0) {
    if (resumeReq.igNotExists) {
      mInfo("stream:%s not exist, not start stream", resumeReq.name);
      taosMemoryFree(resumeReq.name);
      sdbRelease(pMnode->pSdb, pStream);
      return 0;
    } else {
      mError("stream:%s not exist, failed to start stream", resumeReq.name);
      taosMemoryFree(resumeReq.name);
      TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
    }
  }

  taosMemoryFree(resumeReq.name);

  int64_t streamId = pStream->pCreate->streamId;

  mstsInfo("start to start stream %s from stopped", pStream->name);

  code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->pCreate->streamDB);
  if (code != TSDB_CODE_SUCCESS) {
    mstsError("user %s failed to start stream %s since %s", pReq->info.conn.user, pStream->name, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  if (atomic_load_8(&pStream->userDropped)) {
    code = TSDB_CODE_MND_STREAM_DROPPING;
    mstsError("user %s failed to start stream %s since %s", pReq->info.conn.user, pStream->name, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  int8_t prevStopped = atomic_load_8(&pStream->userStopped);
  if (0 == prevStopped) {
    code = TSDB_CODE_MND_STREAM_NOT_STOPPED;
    mstsError("user %s failed to start stream %s since %s", pReq->info.conn.user, pStream->name, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  // Remember the state that is about to be overwritten so a failed transaction can restore it.
  int64_t prevUpdateTime = pStream->updateTime;

  atomic_store_8(&pStream->userStopped, 0);

  pStream->updateTime = taosGetTimestampMs();

  MND_STREAM_SET_LAST_TS(STM_EVENT_START_STREAM, pStream->updateTime);

  STrans *pTrans = NULL;
  code = mndStreamCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_START_NAME, &pTrans);
  if (pTrans == NULL || code) {
    mstsError("failed to start stream %s since %s", pStream->name, tstrerror(code));
    // As before, the transaction is not dropped on this path.
    goto _ROLLBACK;
  }

  code = mndStreamTransAppend(pStream, pTrans, SDB_STATUS_READY);
  if (code != TSDB_CODE_SUCCESS) {
    mstsError("failed to start stream %s since %s", pStream->name, tstrerror(code));
    goto _DROP_TRANS;
  }

  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mstsError("trans:%d, failed to prepare start stream %s trans since %s", pTrans->id, pStream->name, tstrerror(code));
    goto _DROP_TRANS;
  }

  mstPostStreamAction(mStreamMgmt.actionQ, streamId, pStream->name, NULL, true, STREAM_ACT_DEPLOY);

  sdbRelease(pMnode->pSdb, pStream);
  mndTransDrop(pTrans);

  return TSDB_CODE_ACTION_IN_PROGRESS;

_DROP_TRANS:
  mndTransDrop(pTrans);

_ROLLBACK:
  // Nothing was persisted: restore the in-memory state while the object is still held.
  atomic_store_8(&pStream->userStopped, prevStopped);
  pStream->updateTime = prevUpdateTime;
  sdbRelease(pMnode->pSdb, pStream);
  return code;
}
737

738

739
/**
 * Handle a drop-stream request.
 *
 * Flow: deserialize the request, acquire the stream by name, check write privilege on its
 * streamDB, refuse when the stream belongs to a TSMA that still exists, mark the stream as
 * user-dropped and undeploy it, then create a transaction, append a DROPPED record, prepare the
 * transaction and write an audit record.
 *
 * @param pReq request message; pReq->info.node is the mnode, pReq->info.conn.user the caller
 * @return TSDB_CODE_ACTION_IN_PROGRESS once the transaction was prepared; 0 when the stream does
 *         not exist and igNotExists is set; otherwise the code of the failing step.
 */
static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
  SMnode     *pMnode = pReq->info.node;
  SStreamObj *pStream = NULL;
  int32_t     code = 0;

  SMDropStreamReq dropReq = {0};
  if (tDeserializeSMDropStreamReq(pReq->pCont, pReq->contLen, &dropReq) < 0) {
    mError("invalid drop stream msg recv, discarded");
    code = TSDB_CODE_INVALID_MSG;
    TAOS_RETURN(code);
  }

  mDebug("recv drop stream:%s msg", dropReq.name);

  code = mndAcquireStream(pMnode, dropReq.name, &pStream);
  if (pStream == NULL || code != 0) {
    if (dropReq.igNotExists) {
      mInfo("stream:%s not exist, ignore not exist is set, drop stream exec done with success", dropReq.name);
      sdbRelease(pMnode->pSdb, pStream);
      tFreeMDropStreamReq(&dropReq);
      return 0;
    } else {
      mError("stream:%s not exist failed to drop it", dropReq.name);
      tFreeMDropStreamReq(&dropReq);
      TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
    }
  }

  int64_t streamId = pStream->pCreate->streamId;

  code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->pCreate->streamDB);
  if (code != 0) {
    mstsError("user %s failed to drop stream %s since %s", pReq->info.conn.user, dropReq.name, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    tFreeMDropStreamReq(&dropReq);
    return code;
  }

  if (pStream->pCreate->tsmaId != 0) {
    mstsDebug("try to drop tsma related stream, tsmaId:%" PRIx64, pStream->pCreate->tsmaId);

    // A stream that backs a TSMA may only go away together with that TSMA.
    void    *pIter = NULL;
    SSmaObj *pSma = NULL;
    pIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter, (void **)&pSma);
    while (pIter) {
      if (pSma && pSma->uid == pStream->pCreate->tsmaId) {
        code = TSDB_CODE_TSMA_MUST_BE_DROPPED;

        // Log while dropReq.name is still valid: the request is freed just below, and every
        // other path in this function also logs before freeing it.
        mstsError("refused to drop tsma-related stream %s since tsma still exists", dropReq.name);

        sdbRelease(pMnode->pSdb, pSma);
        sdbRelease(pMnode->pSdb, pStream);

        sdbCancelFetch(pMnode->pSdb, pIter);
        tFreeMDropStreamReq(&dropReq);
        TAOS_RETURN(code);
      }

      if (pSma) {
        sdbRelease(pMnode->pSdb, pSma);
      }

      pIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter, (void **)&pSma);
    }
  }

  mstsInfo("start to drop stream %s", pStream->pCreate->name);

  pStream->updateTime = taosGetTimestampMs();

  atomic_store_8(&pStream->userDropped, 1);

  MND_STREAM_SET_LAST_TS(STM_EVENT_DROP_STREAM, pStream->updateTime);

  msmUndeployStream(pMnode, streamId, pStream->pCreate->name);

  STrans *pTrans = NULL;
  code = mndStreamCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_DROP_NAME, &pTrans);
  if (pTrans == NULL || code) {
    mstsError("failed to drop stream %s since %s", dropReq.name, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    tFreeMDropStreamReq(&dropReq);
    TAOS_RETURN(code);
  }

  // drop stream
  code = mndStreamTransAppend(pStream, pTrans, SDB_STATUS_DROPPED);
  if (code) {
    mstsError("trans:%d, failed to append drop stream trans since %s", pTrans->id, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    tFreeMDropStreamReq(&dropReq);
    TAOS_RETURN(code);
  }

  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mstsError("trans:%d, failed to prepare drop stream trans since %s", pTrans->id, tstrerror(code));
    sdbRelease(pMnode->pSdb, pStream);
    mndTransDrop(pTrans);
    tFreeMDropStreamReq(&dropReq);
    TAOS_RETURN(code);
  }

  auditRecord(pReq, pMnode->clusterId, "dropStream", "", pStream->pCreate->streamDB, NULL, 0);

  sdbRelease(pMnode->pSdb, pStream);
  mndTransDrop(pTrans);

  mstsDebug("drop stream %s half completed", dropReq.name);
  code = TSDB_CODE_ACTION_IN_PROGRESS;

  tFreeMDropStreamReq(&dropReq);

  TAOS_RETURN(code);
}
854

855
/**
 * Message handler for TDMT_MND_CREATE_STREAM.
 *
 * Flow: check the stream grant, deserialize the request, pick an snode,
 * reject (or silently accept when `igExists` is set) a stream whose name is
 * already registered, validate the request, build the stream object, and
 * submit a transaction that optionally creates the output super table and
 * then persists the stream. Finally a deploy action is queued.
 *
 * @param pReq  incoming RPC message; `pReq->pCont` holds the serialized
 *              SCMCreateStreamReq.
 * @return TSDB_CODE_ACTION_IN_PROGRESS once the transaction has been prepared,
 *         TSDB_CODE_SUCCESS when the stream already exists and `igExists` is
 *         set, otherwise the error code of the step that failed.
 */
static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
  SMnode     *pMnode = pReq->info.node;
  SStreamObj *pStream = NULL;
  SStreamObj  streamObj = {0};
  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     lino = 0;
  STrans     *pTrans = NULL;
  uint64_t    streamId = 0;
  SCMCreateStreamReq* pCreate = NULL;
  // Name used by the final log line. It only ever points into memory this
  // function still owns at _OVER (pCreate or streamObj), never into an sdb
  // object that has already been released.
  const char *streamName = NULL;

  if ((code = grantCheck(TSDB_GRANT_STREAMS)) < 0) {
    goto _OVER;
  }

#ifdef WINDOWS
  code = TSDB_CODE_MND_INVALID_PLATFORM;
  goto _OVER;
#endif

  pCreate = taosMemoryCalloc(1, sizeof(SCMCreateStreamReq));
  TSDB_CHECK_NULL(pCreate, code, lino, _OVER, terrno);

  code = tDeserializeSCMCreateStreamReq(pReq->pCont, pReq->contLen, pCreate);
  TSDB_CHECK_CODE(code, lino, _OVER);

  streamId = pCreate->streamId;

  mstsInfo("start to create stream %s, sql:%s", pCreate->name, pCreate->sql);

  int32_t snodeId = msmAssignRandomSnodeId(pMnode, streamId);
  if (!GOT_SNODE(snodeId)) {
    // NOTE(review): this relies on msmAssignRandomSnodeId having set terrno
    // to a non-zero value; confirm, otherwise execution continues without an snode.
    code = terrno;
    TSDB_CHECK_CODE(code, lino, _OVER);
  }

  code = mndAcquireStream(pMnode, pCreate->name, &pStream);
  if (pStream != NULL && code == 0) {
    if (pCreate->igExists) {
      mstsInfo("stream %s already exist, ignore exist is set", pCreate->name);
    } else {
      code = TSDB_CODE_MND_STREAM_ALREADY_EXIST;
    }

    // Keep the name from our own request: the acquired object must not be
    // touched once the reference is given back.
    streamName = pCreate->name;
    mndReleaseStream(pMnode, pStream);
    pStream = NULL;
    goto _OVER;
  } else if (code != TSDB_CODE_MND_STREAM_NOT_EXIST) {
    goto _OVER;
  }

  code = mndStreamValidateCreate(pMnode, pReq->info.conn.user, pCreate);
  TSDB_CHECK_CODE(code, lino, _OVER);

  mndStreamBuildObj(pMnode, &streamObj, pCreate, snodeId);
  // From here on the request is reached through streamObj.pCreate; clearing the
  // local pointer keeps the cleanup below from freeing it a second time.
  pCreate = NULL;

  pStream = &streamObj;
  streamName = pStream->pCreate ? pStream->pCreate->name : NULL;

  code = mndStreamCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_DB, MND_STREAM_CREATE_NAME, &pTrans);
  if (pTrans == NULL || code) {
    goto _OVER;
  }

  // create stb for stream
  if (TSDB_SUPER_TABLE == pStream->pCreate->outTblType && !pStream->pCreate->outStbExists) {
    pStream->pCreate->outStbUid = mndGenerateUid(pStream->pCreate->outTblName, strlen(pStream->pCreate->outTblName));
    code = mndStreamCreateOutStb(pMnode, pTrans, pStream->pCreate, pReq->info.conn.user);
    TSDB_CHECK_CODE(code, lino, _OVER);
  }

  // add stream to trans
  code = mndStreamTransAppend(pStream, pTrans, SDB_STATUS_READY);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mstsError("failed to persist stream %s since %s", pStream->pCreate->name, tstrerror(code));
    goto _OVER;
  }

  // execute creation
  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mstsError("trans:%d, failed to prepare since %s", pTrans->id, tstrerror(code));
    goto _OVER;
  }
  code = TSDB_CODE_ACTION_IN_PROGRESS;

  auditRecord(pReq, pMnode->clusterId, "createStream", pStream->pCreate->streamDB, pStream->pCreate->name, pStream->pCreate->sql, strlen(pStream->pCreate->sql));

  MND_STREAM_SET_LAST_TS(STM_EVENT_CREATE_STREAM, taosGetTimestampMs());

  mstPostStreamAction(mStreamMgmt.actionQ, streamId, pStream->pCreate->name, NULL, true, STREAM_ACT_DEPLOY);

_OVER:

  // Log before any cleanup: streamName may point into pCreate or streamObj.
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    if (streamName != NULL) {
      mstsError("failed to create stream %s at line:%d since %s", streamName, lino, tstrerror(code));
    } else {
      mstsError("failed to create stream at line:%d since %s", lino, tstrerror(code));
    }
  } else {
    mstsDebug("create stream %s half completed", streamName ? streamName : "unknown");
  }

  tFreeSCMCreateStreamReq(pCreate);
  taosMemoryFreeClear(pCreate);

  mndTransDrop(pTrans);
  tFreeStreamObj(&streamObj);

  return code;
}
965

966
/**
 * Message handler for TDMT_MND_RECALC_STREAM.
 *
 * Deserializes the request, looks the stream up by name, and refuses the
 * operation when the caller lacks write privilege on the stream's database,
 * when the stream is flagged as user-dropped or user-stopped, or when its
 * trigger type is WINDOW_TYPE_PERIOD. Otherwise the recalculation is handed to
 * msmRecalcStream() for the requested time range and an audit record is
 * written. No transaction is created on this path.
 *
 * @param pReq  incoming RPC message carrying a serialized SMRecalcStreamReq.
 * @return TSDB_CODE_SUCCESS on success, otherwise the code of the failing step.
 */
static int32_t mndProcessRecalcStreamReq(SRpcMsg *pReq) {
  SMnode           *pMnode = pReq->info.node;
  SStreamObj       *pStream = NULL;
  SMRecalcStreamReq recalcReq = {0};
  char              auditBuf[128];

  int32_t code = grantCheckExpire(TSDB_GRANT_STREAMS);
  if (code < 0) {
    return code;
  }

  // The two failures below happen before a stream reference is held, so they
  // return directly instead of going through the shared cleanup label.
  if (tDeserializeSMRecalcStreamReq(pReq->pCont, pReq->contLen, &recalcReq) < 0) {
    tFreeMRecalcStreamReq(&recalcReq);
    TAOS_RETURN(TSDB_CODE_INVALID_MSG);
  }

  code = mndAcquireStream(pMnode, recalcReq.name, &pStream);
  if (code != 0 || pStream == NULL) {
    mError("stream:%s not exist, failed to recalc stream", recalcReq.name);
    tFreeMRecalcStreamReq(&recalcReq);
    TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
  }

  int64_t streamId = pStream->pCreate->streamId;

  mstsInfo("start to recalc stream %s", recalcReq.name);

  // Privilege first, then the dropped/stopped flags; the first failing check
  // decides the error code and all three share one log message.
  code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->pCreate->streamDB);
  if (code == TSDB_CODE_SUCCESS && atomic_load_8(&pStream->userDropped)) {
    code = TSDB_CODE_MND_STREAM_DROPPING;
  }
  if (code == TSDB_CODE_SUCCESS && atomic_load_8(&pStream->userStopped)) {
    code = TSDB_CODE_MND_STREAM_STOPPED;
  }
  if (code != TSDB_CODE_SUCCESS) {
    mstsError("user %s failed to recalc stream %s since %s", pReq->info.conn.user, recalcReq.name, tstrerror(code));
    goto _OVER;
  }

  if (WINDOW_TYPE_PERIOD == pStream->pCreate->triggerType) {
    code = TSDB_CODE_OPS_NOT_SUPPORT;
    mstsError("failed to recalc stream %s since %s", recalcReq.name, tstrerror(code));
    goto _OVER;
  }

  code = msmRecalcStream(pMnode, pStream->pCreate->streamId, &recalcReq.timeRange);
  if (code != TSDB_CODE_SUCCESS) {
    goto _OVER;
  }

  snprintf(auditBuf, sizeof(auditBuf), "start:%" PRId64 ", end:%" PRId64, recalcReq.timeRange.skey,
           recalcReq.timeRange.ekey);
  auditRecord(pReq, pMnode->clusterId, "recalcStream", pStream->name, recalcReq.name, auditBuf, strlen(auditBuf));

_OVER:
  // Shared exit for every path that holds the stream reference.
  sdbRelease(pMnode->pSdb, pStream);
  tFreeMRecalcStreamReq(&recalcReq);

  return code;
}
1069

1070

1071
/**
 * Register the stream module with the mnode: request handlers (skipped when
 * tsDisableStream is set), the show/retrieve handlers for streams, stream
 * tasks and stream recalculates, and the SDB_STREAM table callbacks.
 *
 * @param pMnode  mnode instance to register with.
 * @return the result of sdbSetTable() for the stream table.
 */
int32_t mndInitStream(SMnode *pMnode) {
  // sdb callbacks for stream objects
  SSdbTable streamTable = {0};
  streamTable.sdbType = SDB_STREAM;
  streamTable.keyType = SDB_KEY_BINARY;
  streamTable.encodeFp = (SdbEncodeFp)mndStreamActionEncode;
  streamTable.decodeFp = (SdbDecodeFp)mndStreamActionDecode;
  streamTable.insertFp = (SdbInsertFp)mndStreamActionInsert;
  streamTable.updateFp = (SdbUpdateFp)mndStreamActionUpdate;
  streamTable.deleteFp = (SdbDeleteFp)mndStreamActionDelete;

  // request handlers are only installed while streams are enabled
  if (!tsDisableStream) {
    mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STREAM, mndProcessCreateStreamReq);
    mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM, mndProcessDropStreamReq);
    mndSetMsgHandle(pMnode, TDMT_MND_START_STREAM, mndProcessStartStreamReq);
    mndSetMsgHandle(pMnode, TDMT_MND_STOP_STREAM, mndProcessStopStreamReq);
    mndSetMsgHandle(pMnode, TDMT_MND_STREAM_HEARTBEAT, mndProcessStreamHb);
    mndSetMsgHandle(pMnode, TDMT_MND_RECALC_STREAM, mndProcessRecalcStreamReq);
  }

  // show handlers are registered unconditionally
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndRetrieveStream);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndCancelGetNextStream);
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndRetrieveStreamTask);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndCancelGetNextStreamTask);
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAM_RECALCULATES, mndRetrieveStreamRecalculates);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAM_RECALCULATES, mndCancelGetNextStreamRecalculates);

  return sdbSetTable(pMnode->pSdb, streamTable);
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc