• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4797

16 Oct 2025 01:24AM UTC coverage: 61.083% (+0.2%) from 60.915%
#4797

push

travis-ci

web-flow
Merge 9f5f33536 into 19574fe21

155292 of 324369 branches covered (47.88%)

Branch coverage included in aggregate %.

79 of 100 new or added lines in 19 files covered. (79.0%)

2515 existing lines in 105 files now uncovered.

207484 of 269534 relevant lines covered (76.98%)

126629055.41 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

58.48
/source/dnode/mnode/impl/src/mndStream.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "mndStream.h"
17
#include "audit.h"
18
#include "mndDb.h"
19
#include "mndPrivilege.h"
20
#include "mndShow.h"
21
#include "mndStb.h"
22
#include "mndTrans.h"
23
#include "osMemory.h"
24
#include "parser.h"
25
#include "taoserror.h"
26
#include "tmisce.h"
27
#include "tname.h"
28

29
#define MND_STREAM_MAX_NUM 100000
30

31
typedef struct {
32
  int8_t placeHolder;  // // to fix windows compile error, define place holder
33
} SMStreamNodeCheckMsg;
34

35
static int32_t  mndNodeCheckSentinel = 0;
36
SStmRuntime  mStreamMgmt = {0};
37

38
static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream);
39
static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream);
40
static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pOldStream, SStreamObj *pNewStream);
41
static int32_t mndProcessDropStreamReq(SRpcMsg *pReq);
42

43
static int32_t mndProcessCreateStreamReqFromMNode(SRpcMsg *pReq);
44
static int32_t mndProcessDropStreamReqFromMNode(SRpcMsg *pReq);
45

46
static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
47
static void    mndCancelGetNextStream(SMnode *pMnode, void *pIter);
48
static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
49
static void    mndCancelGetNextStreamTask(SMnode *pMnode, void *pIter);
50
static int32_t mndProcessStopStreamReq(SRpcMsg *pReq);
51
static int32_t mndProcessStartStreamReq(SRpcMsg *pReq);
52

53
static SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw);
54

55
SSdbRaw       *mndStreamSeqActionEncode(SStreamObj *pStream);
56
SSdbRow       *mndStreamSeqActionDecode(SSdbRaw *pRaw);
57
static int32_t mndStreamSeqActionInsert(SSdb *pSdb, SStreamSeq *pStream);
58
static int32_t mndStreamSeqActionDelete(SSdb *pSdb, SStreamSeq *pStream);
59
static int32_t mndStreamSeqActionUpdate(SSdb *pSdb, SStreamSeq *pOldStream, SStreamSeq *pNewStream);
60
static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq);
61

62
void mndCleanupStream(SMnode *pMnode) {
553,765✔
63
  mDebug("try to clean up stream");
553,765✔
64
  
65
  msmHandleBecomeNotLeader(pMnode);
553,765✔
66
  
67
  mDebug("mnd stream runtime info cleanup");
553,765✔
68
}
553,765✔
69

70
SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw) {
201,627✔
71
  int32_t     code = 0;
201,627✔
72
  int32_t     lino = 0;
201,627✔
73
  SSdbRow    *pRow = NULL;
201,627✔
74
  SStreamObj *pStream = NULL;
201,627✔
75
  void       *buf = NULL;
201,627✔
76
  int8_t      sver = 0;
201,627✔
77
  int32_t     tlen;
200,415✔
78
  int32_t     dataPos = 0;
201,627✔
79

80
  code = sdbGetRawSoftVer(pRaw, &sver);
201,627✔
81
  TSDB_CHECK_CODE(code, lino, _over);
201,627!
82

83
  if (sver != MND_STREAM_VER_NUMBER) {
201,627!
84
    mError("stream read invalid ver, data ver: %d, curr ver: %d", sver, MND_STREAM_VER_NUMBER);
×
85
    goto _over;
×
86
  }
87

88
  pRow = sdbAllocRow(sizeof(SStreamObj));
201,627✔
89
  TSDB_CHECK_NULL(pRow, code, lino, _over, terrno);
201,627!
90

91
  pStream = sdbGetRowObj(pRow);
201,627✔
92
  TSDB_CHECK_NULL(pStream, code, lino, _over, terrno);
201,627!
93

94
  SDB_GET_INT32(pRaw, dataPos, &tlen, _over);
201,627!
95

96
  buf = taosMemoryMalloc(tlen + 1);
201,627!
97
  TSDB_CHECK_NULL(buf, code, lino, _over, terrno);
201,627!
98

99
  SDB_GET_BINARY(pRaw, dataPos, buf, tlen, _over);
201,627!
100

101
  SDecoder decoder;
200,415✔
102
  tDecoderInit(&decoder, buf, tlen + 1);
201,627✔
103
  code = tDecodeSStreamObj(&decoder, pStream, sver);
201,627✔
104
  tDecoderClear(&decoder);
201,627✔
105

106
  if (code < 0) {
201,627!
107
    tFreeStreamObj(pStream);
×
108
  }
109

110
_over:
201,627✔
111
  taosMemoryFreeClear(buf);
201,627!
112

113
  if (code != TSDB_CODE_SUCCESS) {
201,627!
114
    char *p = (pStream == NULL || NULL == pStream->pCreate) ? "null" : pStream->pCreate->name;
×
115
    mError("stream:%s, failed to decode from raw:%p since %s at:%d", p, pRaw, tstrerror(code), lino);
×
116
    taosMemoryFreeClear(pRow);
×
117

118
    terrno = code;
×
119
    return NULL;
×
120
  } else {
121
    mTrace("stream:%s, decode from raw:%p, row:%p", pStream->pCreate->name, pRaw, pStream);
201,627!
122

123
    terrno = 0;
201,627✔
124
    return pRow;
201,627✔
125
  }
126
}
127

128
static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream) {
170,558✔
129
  mTrace("stream:%s, perform insert action", pStream->pCreate->name);
170,558!
130
  return 0;
170,558✔
131
}
132

133
static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream) {
201,627✔
134
  mInfo("stream:%s, perform delete action", pStream->pCreate->name);
201,627!
135
  tFreeStreamObj(pStream);
201,627✔
136
  return 0;
201,627✔
137
}
138

139
static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pOldStream, SStreamObj *pNewStream) {
15,119✔
140
  mTrace("stream:%s, perform update action", pOldStream->pCreate->name);
15,119!
141

142
  atomic_store_32(&pOldStream->mainSnodeId, pNewStream->mainSnodeId);
15,119✔
143
  atomic_store_8(&pOldStream->userStopped, atomic_load_8(&pNewStream->userStopped));
15,119✔
144
  pOldStream->updateTime = pNewStream->updateTime;
15,119✔
145
  
146
  return 0;
15,119✔
147
}
148

149
int32_t mndAcquireStream(SMnode *pMnode, char *streamName, SStreamObj **pStream) {
441,849✔
150
  int32_t code = 0;
441,849✔
151
  SSdb   *pSdb = pMnode->pSdb;
441,849✔
152
  (*pStream) = sdbAcquire(pSdb, SDB_STREAM, streamName);
441,849✔
153
  if ((*pStream) == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
441,849!
154
    code = TSDB_CODE_MND_STREAM_NOT_EXIST;
191,785✔
155
  }
156
  return code;
441,849✔
157
}
158

159
static bool mndStreamGetNameFromId(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
×
160
  SStreamObj* pStream = pObj;
×
161

162
  if (pStream->pCreate->streamId == *(int64_t*)p1) {
×
163
    strncpy((char*)p2, pStream->name, TSDB_STREAM_NAME_LEN);
×
164
    return false;
×
165
  }
166

167
  return true;
×
168
}
169

170
int32_t mndAcquireStreamById(SMnode *pMnode, int64_t streamId, SStreamObj **pStream) {
×
171
  int32_t code = 0;
×
172
  SSdb   *pSdb = pMnode->pSdb;
×
173
  char streamName[TSDB_STREAM_NAME_LEN];
×
174
  streamName[0] = 0;
×
175
  
176
  sdbTraverse(pSdb, SDB_STREAM, mndStreamGetNameFromId, &streamId, streamName, NULL);
×
177
  if (streamName[0]) {
×
178
    (*pStream) = sdbAcquire(pSdb, SDB_STREAM, streamName);
×
179
    if ((*pStream) == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
×
180
      code = TSDB_CODE_MND_STREAM_NOT_EXIST;
×
181
    }
182
  }
183
  
184
  return code;
×
185
}
186

187
void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream) {
192,620✔
188
  SSdb *pSdb = pMnode->pSdb;
192,620✔
189
  sdbRelease(pSdb, pStream);
192,620✔
190
}
192,620✔
191

192
SSdbRaw *mndStreamSeqActionEncode(SStreamObj *pStream) { return NULL; }
×
193
SSdbRow *mndStreamSeqActionDecode(SSdbRaw *pRaw) { return NULL; }
×
194
int32_t  mndStreamSeqActionInsert(SSdb *pSdb, SStreamSeq *pStream) { return 0; }
×
195
int32_t  mndStreamSeqActionDelete(SSdb *pSdb, SStreamSeq *pStream) { return 0; }
×
196
int32_t  mndStreamSeqActionUpdate(SSdb *pSdb, SStreamSeq *pOldStream, SStreamSeq *pNewStream) { return 0; }
×
197

198
static void mndStreamBuildObj(SMnode *pMnode, SStreamObj *pObj, SCMCreateStreamReq *pCreate, int32_t snodeId) {
169,958✔
199
  int32_t     code = 0;
169,958✔
200

201
  pObj->pCreate = pCreate;
169,958✔
202
  strncpy(pObj->name, pCreate->name, TSDB_STREAM_FNAME_LEN);
169,958!
203
  pObj->mainSnodeId = snodeId;
169,958✔
204
  
205
  pObj->userDropped = 0;
169,958✔
206
  pObj->userStopped = 0;
169,958✔
207
  
208
  pObj->createTime = taosGetTimestampMs();
169,958✔
209
  pObj->updateTime = pObj->createTime;
169,958✔
210

211
  mstLogSStreamObj("create stream", pObj);
169,958✔
212
}
169,958✔
213

214
static int32_t mndStreamCreateOutStb(SMnode *pMnode, STrans *pTrans, const SCMCreateStreamReq *pStream, const char *user) {
98,558✔
215
  SStbObj *pStb = NULL;
98,558✔
216
  SDbObj  *pDb = NULL;
98,558✔
217
  int32_t  code = 0;
98,558✔
218
  int32_t  lino = 0;
98,558✔
219

220
  SMCreateStbReq createReq = {0};
98,558✔
221
  TAOS_STRNCAT(createReq.name, pStream->outDB, TSDB_DB_FNAME_LEN);
98,558!
222
  TAOS_STRNCAT(createReq.name, ".", 2);
98,558!
223
  TAOS_STRNCAT(createReq.name,  pStream->outTblName, TSDB_TABLE_NAME_LEN);
98,558!
224
  createReq.numOfColumns = taosArrayGetSize(pStream->outCols);
98,558✔
225
  createReq.numOfTags = pStream->outTags ? taosArrayGetSize(pStream->outTags) : 1;
98,558!
226
  createReq.pColumns = taosArrayInit_s(sizeof(SFieldWithOptions), createReq.numOfColumns);
98,558✔
227
  TSDB_CHECK_NULL(createReq.pColumns, code, lino, _OVER, terrno);
98,558!
228

229
  // build fields
230
  for (int32_t i = 0; i < createReq.numOfColumns; i++) {
496,727✔
231
    SFieldWithOptions *pField = taosArrayGet(createReq.pColumns, i);
398,169✔
232
    TSDB_CHECK_NULL(pField, code, lino, _OVER, terrno);
398,169!
233
    SFieldWithOptions *pSrc = taosArrayGet(pStream->outCols, i);
398,169✔
234

235
    tstrncpy(pField->name, pSrc->name, TSDB_COL_NAME_LEN);
398,169!
236
    pField->flags = pSrc->flags;
398,169✔
237
    pField->type = pSrc->type;
398,169✔
238
    pField->bytes = pSrc->bytes;
398,169✔
239
    pField->compress = createDefaultColCmprByType(pField->type);
398,169✔
240
    if (IS_DECIMAL_TYPE(pField->type)) {
398,169!
241
      pField->typeMod = pSrc->typeMod;
×
242
      pField->flags |= COL_HAS_TYPE_MOD;
×
243
    }
244
  }
245

246
  if (NULL == pStream->outTags) {
98,558!
247
    createReq.numOfTags = 1;
×
248
    createReq.pTags = taosArrayInit_s(sizeof(SField), 1);
×
249
    TSDB_CHECK_NULL(createReq.pTags, code, lino, _OVER, terrno);
×
250

251
    // build tags
252
    SField *pField = taosArrayGet(createReq.pTags, 0);
×
253
    TSDB_CHECK_NULL(pField, code, lino, _OVER, terrno);
×
254

255
    tstrncpy(pField->name, "group_id", sizeof(pField->name));
×
256
    pField->type = TSDB_DATA_TYPE_UBIGINT;
×
257
    pField->flags = 0;
×
258
    pField->bytes = 8;
×
259
  } else {
260
    createReq.numOfTags = taosArrayGetSize(pStream->outTags);
98,558✔
261
    createReq.pTags = taosArrayInit_s(sizeof(SField), createReq.numOfTags);
98,558✔
262
    TSDB_CHECK_NULL(createReq.pTags, code, lino, _OVER, terrno);
98,558!
263

264
    for (int32_t i = 0; i < createReq.numOfTags; i++) {
261,026✔
265
      SField *pField = taosArrayGet(createReq.pTags, i);
162,468✔
266
      if (pField == NULL) {
162,468!
267
        continue;
×
268
      }
269

270
      TAOS_FIELD_E *pSrc = taosArrayGet(pStream->outTags, i);
162,468✔
271
      pField->bytes = pSrc->bytes;
162,468✔
272
      pField->flags = 0;
162,468✔
273
      pField->type = pSrc->type;
162,468✔
274
      tstrncpy(pField->name, pSrc->name, TSDB_COL_NAME_LEN);
162,468!
275
    }
276
  }
277

278
  if ((code = mndCheckCreateStbReq(&createReq)) != 0) {
98,558!
279
    goto _OVER;
×
280
  }
281

282
  pStb = mndAcquireStb(pMnode, createReq.name);
98,558✔
283
  if (pStb != NULL) {
98,558!
284
    code = TSDB_CODE_MND_STB_ALREADY_EXIST;
×
285
    goto _OVER;
×
286
  }
287

288
  pDb = mndAcquireDbByStb(pMnode, createReq.name);
98,558✔
289
  if (pDb == NULL) {
98,558!
290
    code = TSDB_CODE_MND_DB_NOT_SELECTED;
×
291
    goto _OVER;
×
292
  }
293

294
  int32_t numOfStbs = -1;
98,558✔
295
  if (mndGetNumOfStbs(pMnode, pDb->name, &numOfStbs) != 0) {
98,558!
296
    goto _OVER;
×
297
  }
298

299
  if (pDb->cfg.numOfStables == 1 && numOfStbs != 0) {
98,558!
300
    code = TSDB_CODE_MND_SINGLE_STB_MODE_DB;
×
301
    goto _OVER;
×
302
  }
303

304
  SStbObj stbObj = {0};
98,558✔
305

306
  if (mndBuildStbFromReq(pMnode, &stbObj, &createReq, pDb) != 0) {
98,558!
307
    goto _OVER;
×
308
  }
309

310
  stbObj.uid = pStream->outStbUid;
98,558✔
311

312
  if (mndAddStbToTrans(pMnode, pTrans, pDb, &stbObj) < 0) {
98,558!
313
    mndFreeStb(&stbObj);
×
314
    goto _OVER;
×
315
  }
316

317
  mDebug("stream:%s create dst stable:%s, cols:%d", pStream->name, pStream->outTblName, createReq.numOfColumns);
98,558✔
318

319
  tFreeSMCreateStbReq(&createReq);
98,558✔
320
  mndFreeStb(&stbObj);
98,558✔
321
  mndReleaseStb(pMnode, pStb);
98,558✔
322
  mndReleaseDb(pMnode, pDb);
98,558✔
323
  return code;
98,558✔
324

325
_OVER:
×
326
  tFreeSMCreateStbReq(&createReq);
×
327
  mndReleaseStb(pMnode, pStb);
×
328
  mndReleaseDb(pMnode, pDb);
×
329

330
  mDebug("stream:%s failed to create dst stable:%s, line:%d code:%s", pStream->name, pStream->outTblName, lino,
×
331
         tstrerror(code));
332
  return code;
×
333
}
334

335
static int32_t mndStreamValidateCreate(SMnode *pMnode, char* pUser, SCMCreateStreamReq* pCreate) {
190,696✔
336
  int32_t code = 0, lino = 0;
190,696✔
337
  int64_t streamId = pCreate->streamId;
190,696✔
338

339
  if (pCreate->streamDB) {
190,696!
340
    code = mndCheckDbPrivilegeByName(pMnode, pUser, MND_OPER_WRITE_DB, pCreate->streamDB);
190,696✔
341
    if (code) {
190,696✔
342
      mstsError("user %s failed to create stream %s in db %s since %s", pUser, pCreate->name, pCreate->streamDB, tstrerror(code));
12,912!
343
    }
344
    TSDB_CHECK_CODE(code, lino, _OVER);
190,696✔
345
  }
346

347
  if (pCreate->triggerDB) {
177,784✔
348
    code = mndCheckDbPrivilegeByName(pMnode, pUser, MND_OPER_READ_DB, pCreate->triggerDB);
177,232✔
349
    if (code) {
177,232✔
350
      mstsError("user %s failed to create stream %s using trigger db %s since %s", pUser, pCreate->name, pCreate->triggerDB, tstrerror(code));
6,194!
351
    }
352
    TSDB_CHECK_CODE(code, lino, _OVER);
177,232✔
353
  }
354

355
  if (pCreate->calcDB) {
171,590!
356
    int32_t dbNum = taosArrayGetSize(pCreate->calcDB);
171,590✔
357
    for (int32_t i = 0; i < dbNum; ++i) {
341,552✔
358
      char* calcDB = taosArrayGetP(pCreate->calcDB, i);
171,050✔
359
      code = mndCheckDbPrivilegeByName(pMnode, pUser, MND_OPER_READ_DB, calcDB);
171,050✔
360
      if (code) {
171,050✔
361
        mstsError("user %s failed to create stream %s using calcDB %s since %s", pUser, pCreate->name, calcDB, tstrerror(code));
1,088!
362
      }
363
      TSDB_CHECK_CODE(code, lino, _OVER);
171,050✔
364
    }
365
  }
366

367
  if (pCreate->outDB) {
170,502✔
368
    code = mndCheckDbPrivilegeByName(pMnode, pUser, MND_OPER_WRITE_DB, pCreate->outDB);
169,962✔
369
    if (code) {
169,962✔
370
      mstsError("user %s failed to create stream %s using out db %s since %s", pUser, pCreate->name, pCreate->outDB, tstrerror(code));
544!
371
    }
372
    TSDB_CHECK_CODE(code, lino, _OVER);
169,962✔
373
  }
374

375
  int32_t streamNum = sdbGetSize(pMnode->pSdb, SDB_STREAM);
169,958✔
376
  if (streamNum > MND_STREAM_MAX_NUM) {
169,958!
377
    code = TSDB_CODE_MND_TOO_MANY_STREAMS;
×
378
    mstsError("failed to create stream %s since %s, stream number:%d", pCreate->name, tstrerror(code), streamNum);
×
379
    return code;
×
380
  }
381

382
_OVER:
169,958✔
383

384
  return code;
190,696✔
385
}
386

387
int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
724,997✔
388
  SSdb   *pSdb = pMnode->pSdb;
724,997✔
389
  void   *pIter = NULL;
724,997✔
390
  int32_t code = 0;
724,997✔
391

392
  while (1) {
62,694✔
393
    SStreamObj *pStream = NULL;
787,691✔
394
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
787,691✔
395
    if (pIter == NULL) break;
787,691✔
396

397
    if (0 == strcmp(pStream->pCreate->streamDB, pDb->name)) {
62,694!
398
      mInfo("start to drop stream %s in db %s", pStream->pCreate->name, pDb->name);
5,406!
399
      
400
      pStream->updateTime = taosGetTimestampMs();
10,812✔
401
      
402
      atomic_store_8(&pStream->userDropped, 1);
5,406✔
403
      
404
      MND_STREAM_SET_LAST_TS(STM_EVENT_DROP_STREAM, pStream->updateTime);
5,406✔
405
      
406
      msmUndeployStream(pMnode, pStream->pCreate->streamId, pStream->pCreate->name);
5,406✔
407
      
408
      // drop stream
409
      code = mndStreamTransAppend(pStream, pTrans, SDB_STATUS_DROPPED);
5,406✔
410
      if (code) {
5,406!
411
        mError("drop db trans:%d failed to append drop stream trans since %s", pTrans->id, tstrerror(code));
×
412
        sdbRelease(pSdb, pStream);
×
413
        sdbCancelFetch(pSdb, pIter);
×
414
        TAOS_RETURN(code);
×
415
      }
416
    }
417

418
    sdbRelease(pSdb, pStream);
62,694✔
419
  }
420

421
  return 0;
724,997✔
422
}
423

424
static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
402,958✔
425
  SMnode     *pMnode = pReq->info.node;
402,958✔
426
  SSdb       *pSdb = pMnode->pSdb;
402,958✔
427
  int32_t     numOfRows = 0;
402,958✔
428
  SStreamObj *pStream = NULL;
402,958✔
429
  int32_t     code = 0;
402,958✔
430

431
  while (numOfRows < rows) {
1,194,760!
432
    pShow->pIter = sdbFetch(pSdb, SDB_STREAM, pShow->pIter, (void **)&pStream);
1,194,760✔
433
    if (pShow->pIter == NULL) break;
1,194,760✔
434

435
    code = mstSetStreamAttrResBlock(pMnode, pStream, pBlock, numOfRows);
791,802✔
436
    if (code == 0) {
791,802!
437
      numOfRows++;
791,802✔
438
    }
439
    sdbRelease(pSdb, pStream);
791,802✔
440
  }
441

442
  pShow->numOfRows += numOfRows;
402,958✔
443
  return numOfRows;
402,958✔
444
}
445

446
static void mndCancelGetNextStream(SMnode *pMnode, void *pIter) {
×
447
  SSdb *pSdb = pMnode->pSdb;
×
448
  sdbCancelFetchByType(pSdb, pIter, SDB_STREAM);
×
449
}
×
450

451
static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
281,619✔
452
  SMnode     *pMnode = pReq->info.node;
281,619✔
453
  SSdb       *pSdb = pMnode->pSdb;
281,619✔
454
  int32_t     numOfRows = 0;
281,619✔
455
  SStreamObj *pStream = NULL;
281,619✔
456
  int32_t     code = 0;
281,619✔
457

458
  while (numOfRows < rowsCapacity) {
2,172,702✔
459
    pShow->pIter = sdbFetch(pSdb, SDB_STREAM, pShow->pIter, (void **)&pStream);
2,128,505✔
460
    if (pShow->pIter == NULL) {
2,128,505✔
461
      break;
237,422✔
462
    }
463

464
    code = mstSetStreamTasksResBlock(pStream, pBlock, &numOfRows, rowsCapacity);
1,891,083✔
465

466
    sdbRelease(pSdb, pStream);
1,891,083✔
467
  }
468

469
  pShow->numOfRows += numOfRows;
281,619✔
470
  return numOfRows;
281,619✔
471
}
472

473
static void mndCancelGetNextStreamTask(SMnode *pMnode, void *pIter) {
×
474
  SSdb *pSdb = pMnode->pSdb;
×
475
  sdbCancelFetchByType(pSdb, pIter, SDB_STREAM);
×
476
}
×
477

478
static int32_t mndRetrieveStreamRecalculates(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
580✔
479
  SMnode     *pMnode = pReq->info.node;
580✔
480
  SSdb       *pSdb = pMnode->pSdb;
580✔
481
  int32_t     numOfRows = 0;
580✔
482
  SStreamObj *pStream = NULL;
580✔
483
  int32_t     code = 0;
580✔
484

485
  while (numOfRows < rowsCapacity) {
1,160!
486
    pShow->pIter = sdbFetch(pSdb, SDB_STREAM, pShow->pIter, (void **)&pStream);
1,160✔
487
    if (pShow->pIter == NULL) {
1,160✔
488
      break;
580✔
489
    }
490

491
    code = mstSetStreamRecalculatesResBlock(pStream, pBlock, &numOfRows, rowsCapacity);
580✔
492

493
    sdbRelease(pSdb, pStream);
580✔
494
  }
495

496
  pShow->numOfRows += numOfRows;
580✔
497
  return numOfRows;
580✔
498
}
499

500
static void mndCancelGetNextStreamRecalculates(SMnode *pMnode, void *pIter) {
×
501
  SSdb *pSdb = pMnode->pSdb;
×
502
  sdbCancelFetchByType(pSdb, pIter, SDB_STREAM);
×
503
}
×
504

505

506
static bool mndStreamUpdateTagsFlag(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
1,950,243✔
507
  SStreamObj *pStream = pObj;
1,950,243✔
508
  if (atomic_load_8(&pStream->userDropped)) {
1,950,243!
509
    return true;
×
510
  }
511

512
  if (TSDB_SUPER_TABLE != pStream->pCreate->triggerTblType && 
1,950,243✔
513
      TSDB_CHILD_TABLE != pStream->pCreate->triggerTblType && 
720,299✔
514
      TSDB_VIRTUAL_CHILD_TABLE != pStream->pCreate->triggerTblType) {
274,739✔
515
    return true;
152,847✔
516
  }
517

518
  if (pStream->pCreate->triggerTblSuid != *(uint64_t*)p1) {
1,797,396✔
519
    return true;
1,783,509✔
520
  }
521

522
  if (NULL == pStream->pCreate->partitionCols) {
13,887!
UNCOV
523
    return true;
×
524
  }
525

526
  SNodeList* pList = NULL;
13,887✔
527
  int32_t code = nodesStringToList(pStream->pCreate->partitionCols, &pList);
13,887✔
528
  if (code) {
13,887!
529
    nodesDestroyList(pList);
×
530
    mstError("partitionCols [%s] nodesStringToList failed with error:%s", (char*)pStream->pCreate->partitionCols, tstrerror(code));
×
531
    return true;
×
532
  }
533

534
  SSchema* pTags = (SSchema*)p2;
13,887✔
535
  int32_t* tagNum = (int32_t*)p3;
13,887✔
536

537
  SNode* pNode = NULL;
13,887✔
538
  FOREACH(pNode, pList) {
43,140!
539
    SColumnNode* pCol = (SColumnNode*)pNode;
29,253✔
540
    for (int32_t i = 0; i < *tagNum; ++i) {
84,653✔
541
      if (pCol->colId == pTags[i].colId) {
70,766✔
542
        pTags[i].flags |= COL_REF_BY_STM;
15,366✔
543
        break;
15,366✔
544
      }
545
    }
546
  }
547

548
  nodesDestroyList(pList);
13,887✔
549
  
550
  return true;
13,887✔
551
}
552

553

554
void mndStreamUpdateTagsRefFlag(SMnode *pMnode, int64_t suid, SSchema* pTags, int32_t tagNum) {
24,650,635✔
555
  int32_t streamNum = sdbGetSize(pMnode->pSdb, SDB_STREAM);
24,650,635✔
556
  if (streamNum <= 0) {
24,650,635✔
557
    return;
24,463,339✔
558
  }
559

560
  sdbTraverse(pMnode->pSdb, SDB_STREAM, mndStreamUpdateTagsFlag, &suid, pTags, &tagNum);
187,296✔
561
}
562

563
static int32_t mndProcessStopStreamReq(SRpcMsg *pReq) {
11,294✔
564
  SMnode     *pMnode = pReq->info.node;
11,294✔
565
  SStreamObj *pStream = NULL;
11,294✔
566
  int32_t     code = 0;
11,294✔
567

568
  SMPauseStreamReq pauseReq = {0};
11,294✔
569
  if (tDeserializeSMPauseStreamReq(pReq->pCont, pReq->contLen, &pauseReq) < 0) {
11,294!
570
    TAOS_RETURN(TSDB_CODE_INVALID_MSG);
×
571
  }
572

573
  code = mndAcquireStream(pMnode, pauseReq.name, &pStream);
11,294✔
574
  if (pStream == NULL || code != 0) {
11,294!
575
    if (pauseReq.igNotExists) {
×
576
      mInfo("stream:%s, not exist, not stop stream", pauseReq.name);
×
577
      taosMemoryFree(pauseReq.name);
×
578
      return 0;
×
579
    } else {
580
      mError("stream:%s not exist, failed to stop stream", pauseReq.name);
×
581
      taosMemoryFree(pauseReq.name);
×
582
      TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
×
583
    }
584
  }
585

586
  taosMemoryFree(pauseReq.name);
11,294!
587

588
  int64_t streamId = pStream->pCreate->streamId;
11,294✔
589
  
590
  mstsInfo("start to stop stream %s", pStream->name);
11,294!
591

592
  code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->pCreate->streamDB);
11,294✔
593
  if (code != TSDB_CODE_SUCCESS) {
11,294✔
594
    mstsError("user %s failed to stop stream %s since %s", pReq->info.conn.user, pStream->name, tstrerror(code));
5,640!
595
    sdbRelease(pMnode->pSdb, pStream);
5,640✔
596
    return code;
5,640✔
597
  }
598

599
  if (atomic_load_8(&pStream->userDropped)) {
5,654!
600
    code = TSDB_CODE_MND_STREAM_DROPPING;
×
601
    mstsError("user %s failed to stop stream %s since %s", pReq->info.conn.user, pStream->name, tstrerror(code));
×
602
    sdbRelease(pMnode->pSdb, pStream);
×
603
    return code;
×
604
  }
605

606
  STrans *pTrans = NULL;
5,654✔
607
  code = mndStreamCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_STOP_NAME, &pTrans);
5,654✔
608
  if (pTrans == NULL || code) {
5,654!
609
    mstsError("failed to stop stream %s since %s", pStream->name, tstrerror(code));
×
610
    sdbRelease(pMnode->pSdb, pStream);
×
611
    return code;
×
612
  }
613

614
  pStream->updateTime = taosGetTimestampMs();
11,308✔
615

616
  atomic_store_8(&pStream->userStopped, 1);
5,654✔
617

618
  MND_STREAM_SET_LAST_TS(STM_EVENT_STOP_STREAM, pStream->updateTime);
5,654!
619

620
  msmUndeployStream(pMnode, streamId, pStream->name);
5,654✔
621

622
  // stop stream
623
  code = mndStreamTransAppend(pStream, pTrans, SDB_STATUS_READY);
5,654✔
624
  if (code != TSDB_CODE_SUCCESS) {
5,654!
625
    sdbRelease(pMnode->pSdb, pStream);
×
626
    mndTransDrop(pTrans);
×
627
    return code;
×
628
  }
629

630
  code = mndTransPrepare(pMnode, pTrans);
5,654✔
631
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
5,654!
632
    mError("trans:%d, failed to prepare stop stream trans since %s", pTrans->id, tstrerror(code));
×
633
    sdbRelease(pMnode->pSdb, pStream);
×
634
    mndTransDrop(pTrans);
×
635
    return code;
×
636
  }
637

638
  sdbRelease(pMnode->pSdb, pStream);
5,654✔
639
  mndTransDrop(pTrans);
5,654✔
640

641
  return TSDB_CODE_ACTION_IN_PROGRESS;
5,654✔
642
}
643

644

645
static int32_t mndProcessStartStreamReq(SRpcMsg *pReq) {
11,294✔
646
  SMnode     *pMnode = pReq->info.node;
11,294✔
647
  SStreamObj *pStream = NULL;
11,294✔
648
  int32_t     code = 0;
11,294✔
649

650
  if ((code = grantCheckExpire(TSDB_GRANT_STREAMS)) < 0) {
11,294!
651
    return code;
×
652
  }
653

654
  SMResumeStreamReq resumeReq = {0};
11,294✔
655
  if (tDeserializeSMResumeStreamReq(pReq->pCont, pReq->contLen, &resumeReq) < 0) {
11,294!
656
    TAOS_RETURN(TSDB_CODE_INVALID_MSG);
×
657
  }
658

659
  code = mndAcquireStream(pMnode, resumeReq.name, &pStream);
11,294✔
660
  if (pStream == NULL || code != 0) {
11,294!
661
    if (resumeReq.igNotExists) {
×
662
      mInfo("stream:%s not exist, not start stream", resumeReq.name);
×
663
      taosMemoryFree(resumeReq.name);
×
664
      sdbRelease(pMnode->pSdb, pStream);
×
665
      return 0;
×
666
    } else {
667
      mError("stream:%s not exist, failed to start stream", resumeReq.name);
×
668
      taosMemoryFree(resumeReq.name);
×
669
      TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
×
670
    }
671
  }
672

673
  taosMemoryFree(resumeReq.name);
11,294!
674

675
  int64_t streamId = pStream->pCreate->streamId;
11,294✔
676

677
  mstsInfo("start to start stream %s from stopped", pStream->name);
11,294!
678

679
  code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->pCreate->streamDB);
11,294✔
680
  if (code != TSDB_CODE_SUCCESS) {
11,294✔
681
    mstsError("user %s failed to start stream %s since %s", pReq->info.conn.user, pStream->name, tstrerror(code));
5,640!
682
    sdbRelease(pMnode->pSdb, pStream);
5,640✔
683
    return code;
5,640✔
684
  }
685

686
  if (atomic_load_8(&pStream->userDropped)) {
5,654!
687
    code = TSDB_CODE_MND_STREAM_DROPPING;
×
688
    mstsError("user %s failed to start stream %s since %s", pReq->info.conn.user, pStream->name, tstrerror(code));
×
689
    sdbRelease(pMnode->pSdb, pStream);
×
690
    return code;
×
691
  }
692

693
  if (0 == atomic_load_8(&pStream->userStopped)) {
5,654!
694
    code = TSDB_CODE_MND_STREAM_NOT_STOPPED;
×
695
    mstsError("user %s failed to start stream %s since %s", pReq->info.conn.user, pStream->name, tstrerror(code));
×
696
    sdbRelease(pMnode->pSdb, pStream);
×
697
    return code;
×
698
  }
699
  
700
  atomic_store_8(&pStream->userStopped, 0);
5,654✔
701

702
  pStream->updateTime = taosGetTimestampMs();
11,308✔
703

704
  MND_STREAM_SET_LAST_TS(STM_EVENT_START_STREAM, pStream->updateTime);
5,654!
705

706
  STrans *pTrans = NULL;
5,654✔
707
  code = mndStreamCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_START_NAME, &pTrans);
5,654✔
708
  if (pTrans == NULL || code) {
5,654!
709
    mstsError("failed to start stream %s since %s", pStream->name, tstrerror(code));
×
710
    sdbRelease(pMnode->pSdb, pStream);
×
711
    return code;
×
712
  }
713

714
  code = mndStreamTransAppend(pStream, pTrans, SDB_STATUS_READY);
5,654✔
715
  if (code != TSDB_CODE_SUCCESS) {
5,654!
716
    mstsError("failed to start stream %s since %s", pStream->name, tstrerror(code));
×
717
    sdbRelease(pMnode->pSdb, pStream);
×
718
    mndTransDrop(pTrans);
×
719
    return code;
×
720
  }
721

722
  code = mndTransPrepare(pMnode, pTrans);
5,654✔
723
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
5,654!
724
    mstsError("trans:%d, failed to prepare start stream %s trans since %s", pTrans->id, pStream->name, tstrerror(code));
×
725
    sdbRelease(pMnode->pSdb, pStream);
×
726
    mndTransDrop(pTrans);
×
727
    return code;
×
728
  }
729

730
  mstPostStreamAction(mStreamMgmt.actionQ, streamId, pStream->name, NULL, true, STREAM_ACT_DEPLOY);
5,654✔
731

732
  sdbRelease(pMnode->pSdb, pStream);
5,654✔
733
  mndTransDrop(pTrans);
5,654✔
734

735
  return TSDB_CODE_ACTION_IN_PROGRESS;
5,654✔
736
}
737

738

739
static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
16,184✔
740
  SMnode     *pMnode = pReq->info.node;
16,184✔
741
  SStreamObj *pStream = NULL;
16,184✔
742
  int32_t     code = 0;
16,184✔
743

744
  SMDropStreamReq dropReq = {0};
16,184✔
745
  if (tDeserializeSMDropStreamReq(pReq->pCont, pReq->contLen, &dropReq) < 0) {
16,184!
746
    mError("invalid drop stream msg recv, discarded");
×
747
    code = TSDB_CODE_INVALID_MSG;
×
748
    TAOS_RETURN(code);
×
749
  }
750

751
  mDebug("recv drop stream:%s msg", dropReq.name);
16,184!
752

753
  code = mndAcquireStream(pMnode, dropReq.name, &pStream);
16,184✔
754
  if (pStream == NULL || code != 0) {
16,184!
755
    if (dropReq.igNotExists) {
×
756
      mInfo("stream:%s not exist, ignore not exist is set, drop stream exec done with success", dropReq.name);
×
757
      sdbRelease(pMnode->pSdb, pStream);
×
758
      tFreeMDropStreamReq(&dropReq);
×
759
      return 0;
×
760
    } else {
761
      mError("stream:%s not exist failed to drop it", dropReq.name);
×
762
      tFreeMDropStreamReq(&dropReq);
×
763
      TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
×
764
    }
765
  }
766

767
  int64_t streamId = pStream->pCreate->streamId;
16,184✔
768

769
  code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->pCreate->streamDB);
16,184✔
770
  if (code != 0) {
16,184✔
771
    mstsError("user %s failed to drop stream %s since %s", pReq->info.conn.user, dropReq.name, tstrerror(code));
5,640!
772
    sdbRelease(pMnode->pSdb, pStream);
5,640✔
773
    tFreeMDropStreamReq(&dropReq);
5,640✔
774
    return code;
5,640✔
775
  }
776

777
  if (pStream->pCreate->tsmaId != 0) {
10,544!
778
    mstsDebug("try to drop tsma related stream, tsmaId:%" PRIx64, pStream->pCreate->tsmaId);
×
779

780
    void    *pIter = NULL;
×
781
    SSmaObj *pSma = NULL;
×
782
    pIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter, (void **)&pSma);
×
783
    while (pIter) {
×
784
      if (pSma && pSma->uid == pStream->pCreate->tsmaId) {
×
785
        sdbRelease(pMnode->pSdb, pSma);
×
786
        sdbRelease(pMnode->pSdb, pStream);
×
787

788
        sdbCancelFetch(pMnode->pSdb, pIter);
×
789
        tFreeMDropStreamReq(&dropReq);
×
790
        code = TSDB_CODE_TSMA_MUST_BE_DROPPED;
×
791

792
        mstsError("refused to drop tsma-related stream %s since tsma still exists", dropReq.name);
×
793
        TAOS_RETURN(code);
×
794
      }
795

796
      if (pSma) {
×
797
        sdbRelease(pMnode->pSdb, pSma);
×
798
      }
799

800
      pIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter, (void **)&pSma);
×
801
    }
802
  }
803

804
  mstsInfo("start to drop stream %s", pStream->pCreate->name);
10,544!
805

806
  pStream->updateTime = taosGetTimestampMs();
21,088✔
807

808
  atomic_store_8(&pStream->userDropped, 1);
10,544✔
809

810
  MND_STREAM_SET_LAST_TS(STM_EVENT_DROP_STREAM, pStream->updateTime);
10,544!
811

812
  msmUndeployStream(pMnode, streamId, pStream->pCreate->name);
10,544✔
813

814
  STrans *pTrans = NULL;
10,544✔
815
  code = mndStreamCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_DROP_NAME, &pTrans);
10,544✔
816
  if (pTrans == NULL || code) {
10,544!
817
    mstsError("failed to drop stream %s since %s", dropReq.name, tstrerror(code));
×
818
    sdbRelease(pMnode->pSdb, pStream);
×
819
    tFreeMDropStreamReq(&dropReq);
×
820
    TAOS_RETURN(code);
×
821
  }
822

823
  // drop stream
824
  code = mndStreamTransAppend(pStream, pTrans, SDB_STATUS_DROPPED);
10,544✔
825
  if (code) {
10,544!
826
    mstsError("trans:%d, failed to append drop stream trans since %s", pTrans->id, tstrerror(code));
×
827
    sdbRelease(pMnode->pSdb, pStream);
×
828
    mndTransDrop(pTrans);
×
829
    tFreeMDropStreamReq(&dropReq);
×
830
    TAOS_RETURN(code);
×
831
  }
832

833
  code = mndTransPrepare(pMnode, pTrans);
10,544✔
834
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
10,544!
835
    mstsError("trans:%d, failed to prepare drop stream trans since %s", pTrans->id, tstrerror(code));
×
836
    sdbRelease(pMnode->pSdb, pStream);
×
837
    mndTransDrop(pTrans);
×
838
    tFreeMDropStreamReq(&dropReq);
×
839
    TAOS_RETURN(code);
×
840
  }
841

842
  auditRecord(pReq, pMnode->clusterId, "dropStream", "", pStream->pCreate->streamDB, NULL, 0);
10,544✔
843

844
  sdbRelease(pMnode->pSdb, pStream);
10,544✔
845
  mndTransDrop(pTrans);
10,544✔
846

847
  mstsDebug("drop stream %s half completed", dropReq.name);
10,544!
848
  code = TSDB_CODE_ACTION_IN_PROGRESS;
10,544✔
849

850
  tFreeMDropStreamReq(&dropReq);
10,544✔
851
  
852
  TAOS_RETURN(code);
10,544✔
853
}
854

855
static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
207,786✔
856
  SMnode     *pMnode = pReq->info.node;
207,786✔
857
  SStreamObj *pStream = NULL;
207,786✔
858
  SStreamObj  streamObj = {0};
207,786✔
859
  int32_t     code = TSDB_CODE_SUCCESS;
207,786✔
860
  int32_t     lino = 0;
207,786✔
861
  STrans     *pTrans = NULL;
207,786✔
862
  uint64_t    streamId = 0;
207,786✔
863
  SCMCreateStreamReq* pCreate = NULL;
207,786✔
864

865
  if ((code = grantCheck(TSDB_GRANT_STREAMS)) < 0) {
207,786!
866
    goto _OVER;
×
867
  }
868
  
869
#ifdef WINDOWS
870
  code = TSDB_CODE_MND_INVALID_PLATFORM;
871
  goto _OVER;
872
#endif
873

874
  pCreate = taosMemoryCalloc(1, sizeof(SCMCreateStreamReq));
207,786!
875
  TSDB_CHECK_NULL(pCreate, code, lino, _OVER, terrno);
207,786!
876
  
877
  code = tDeserializeSCMCreateStreamReq(pReq->pCont, pReq->contLen, pCreate);
207,786✔
878
  TSDB_CHECK_CODE(code, lino, _OVER);
207,786!
879

880
  streamId = pCreate->streamId;
207,786✔
881

882
  mstsInfo("start to create stream %s, sql:%s", pCreate->name, pCreate->sql);
207,786!
883

884
  int32_t snodeId = msmAssignRandomSnodeId(pMnode, streamId);
207,786✔
885
  if (!GOT_SNODE(snodeId)) {
207,786✔
886
    code = terrno;
5,710✔
887
    TSDB_CHECK_CODE(code, lino, _OVER);
5,710!
888
  }
889
  
890
  code = mndAcquireStream(pMnode, pCreate->name, &pStream);
202,076✔
891
  if (pStream != NULL && code == 0) {
202,076!
892
    if (pCreate->igExists) {
11,380!
893
      mstsInfo("stream %s already exist, ignore exist is set", pCreate->name);
×
894
    } else {
895
      code = TSDB_CODE_MND_STREAM_ALREADY_EXIST;
11,380✔
896
    }
897

898
    mndReleaseStream(pMnode, pStream);
11,380✔
899
    goto _OVER;
11,380✔
900
  } else if (code != TSDB_CODE_MND_STREAM_NOT_EXIST) {
190,696!
901
    goto _OVER;
×
902
  }
903

904
  code = mndStreamValidateCreate(pMnode, pReq->info.conn.user, pCreate);
190,696✔
905
  TSDB_CHECK_CODE(code, lino, _OVER);
190,696✔
906

907
  mndStreamBuildObj(pMnode, &streamObj, pCreate, snodeId);
169,958✔
908
  pCreate = NULL;
169,958✔
909

910
  pStream = &streamObj;
169,958✔
911

912
  code = mndStreamCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_DB, MND_STREAM_CREATE_NAME, &pTrans);
169,958✔
913
  if (pTrans == NULL || code) {
169,958!
914
    goto _OVER;
×
915
  }
916

917
  // create stb for stream
918
  if (TSDB_SUPER_TABLE == pStream->pCreate->outTblType && !pStream->pCreate->outStbExists) {
169,958✔
919
    pStream->pCreate->outStbUid = mndGenerateUid(pStream->pCreate->outTblName, strlen(pStream->pCreate->outTblName));
98,558!
920
    code = mndStreamCreateOutStb(pMnode, pTrans, pStream->pCreate, pReq->info.conn.user);
98,558✔
921
    TSDB_CHECK_CODE(code, lino, _OVER);
98,558!
922
  }
923

924
  // add stream to trans
925
  code = mndStreamTransAppend(pStream, pTrans, SDB_STATUS_READY);
169,958✔
926
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
169,958!
927
    mstsError("failed to persist stream %s since %s", pStream->pCreate->name, tstrerror(code));
×
928
    goto _OVER;
×
929
  }
930

931
  // execute creation
932
  code = mndTransPrepare(pMnode, pTrans);
169,958✔
933
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
169,958!
934
    mstsError("trans:%d, failed to prepare since %s", pTrans->id, tstrerror(code));
×
935
    goto _OVER;
×
936
  }
937
  code = TSDB_CODE_ACTION_IN_PROGRESS;
169,958✔
938

939
  auditRecord(pReq, pMnode->clusterId, "createStream", pStream->pCreate->streamDB, pStream->pCreate->name, pStream->pCreate->sql, strlen(pStream->pCreate->sql));
169,958!
940

941
  MND_STREAM_SET_LAST_TS(STM_EVENT_CREATE_STREAM, taosGetTimestampMs());
275,052✔
942

943
  mstPostStreamAction(mStreamMgmt.actionQ, streamId, pStream->pCreate->name, NULL, true, STREAM_ACT_DEPLOY);
169,958✔
944

945
_OVER:
207,786✔
946

947
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
207,786!
948
    if (pStream && pStream->pCreate) {
37,828!
949
      mstsError("failed to create stream %s at line:%d since %s", pStream->pCreate->name, lino, tstrerror(code));
11,380!
950
    } else {
951
      mstsError("failed to create stream at line:%d since %s", lino, tstrerror(code));
26,448!
952
    }
953
  } else {
954
    mstsDebug("create stream %s half completed", pStream->pCreate ? pStream->pCreate->name : "unknown");
169,958!
955
  }
956

957
  tFreeSCMCreateStreamReq(pCreate);
207,786✔
958
  taosMemoryFreeClear(pCreate);
207,786!
959

960
  mndTransDrop(pTrans);
207,786✔
961
  tFreeStreamObj(&streamObj);
207,786✔
962

963
  return code;
207,786✔
964
}
965

966
static int32_t mndProcessRecalcStreamReq(SRpcMsg *pReq) {
18,672✔
967
  SMnode     *pMnode = pReq->info.node;
18,672✔
968
  SStreamObj *pStream = NULL;
18,672✔
969
  int32_t     code = 0;
18,672✔
970

971
  if ((code = grantCheckExpire(TSDB_GRANT_STREAMS)) < 0) {
18,672!
972
    return code;
×
973
  }
974

975
  SMRecalcStreamReq recalcReq = {0};
18,672✔
976
  if (tDeserializeSMRecalcStreamReq(pReq->pCont, pReq->contLen, &recalcReq) < 0) {
18,672!
977
    tFreeMRecalcStreamReq(&recalcReq);
×
978
    TAOS_RETURN(TSDB_CODE_INVALID_MSG);
×
979
  }
980

981
  code = mndAcquireStream(pMnode, recalcReq.name, &pStream);
18,672✔
982
  if (pStream == NULL || code != 0) {
18,672!
983
    mError("stream:%s not exist, failed to recalc stream", recalcReq.name);
×
984
    tFreeMRecalcStreamReq(&recalcReq);
×
985
    TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
×
986
  }
987

988
  int64_t streamId = pStream->pCreate->streamId;
18,672✔
989
  
990
  mstsInfo("start to recalc stream %s", recalcReq.name);
18,672!
991

992
  code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->pCreate->streamDB);
18,672✔
993
  if (code != TSDB_CODE_SUCCESS) {
18,672✔
994
    mstsError("user %s failed to recalc stream %s since %s", pReq->info.conn.user, recalcReq.name, tstrerror(code));
11,600!
995
    sdbRelease(pMnode->pSdb, pStream);
11,600✔
996
    tFreeMRecalcStreamReq(&recalcReq);
11,600✔
997
    return code;
11,600✔
998
  }
999

1000
  if (atomic_load_8(&pStream->userDropped)) {
7,072!
1001
    code = TSDB_CODE_MND_STREAM_DROPPING;
×
1002
    mstsError("user %s failed to recalc stream %s since %s", pReq->info.conn.user, recalcReq.name, tstrerror(code));
×
1003
    sdbRelease(pMnode->pSdb, pStream);
×
1004
    tFreeMRecalcStreamReq(&recalcReq);
×
1005
    return code;
×
1006
  }
1007

1008
  if (atomic_load_8(&pStream->userStopped)) {
7,072!
1009
    code = TSDB_CODE_MND_STREAM_STOPPED;
×
1010
    mstsError("user %s failed to recalc stream %s since %s", pReq->info.conn.user, recalcReq.name, tstrerror(code));
×
1011
    sdbRelease(pMnode->pSdb, pStream);
×
1012
    tFreeMRecalcStreamReq(&recalcReq);
×
1013
    return code;
×
1014
  }
1015

1016
  if (WINDOW_TYPE_PERIOD == pStream->pCreate->triggerType) {
7,072!
UNCOV
1017
    code = TSDB_CODE_OPS_NOT_SUPPORT;
×
UNCOV
1018
    mstsError("failed to recalc stream %s since %s", recalcReq.name, tstrerror(code));
×
UNCOV
1019
    sdbRelease(pMnode->pSdb, pStream);
×
UNCOV
1020
    tFreeMRecalcStreamReq(&recalcReq);
×
UNCOV
1021
    return code;
×
1022
  }
1023

1024
  /*
1025
  pStream->updateTime = taosGetTimestampMs();
1026

1027
  STrans *pTrans = NULL;
1028
  code = mndStreamCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_RECALC_NAME, &pTrans);
1029
  if (pTrans == NULL || code) {
1030
    mstsError("failed to recalc stream %s since %s", recalcReq.name, tstrerror(code));
1031
    sdbRelease(pMnode->pSdb, pStream);
1032
    return code;
1033
  }
1034

1035
  // stop stream
1036
  code = mndStreamTransAppend(pStream, pTrans, SDB_STATUS_READY);
1037
  if (code != TSDB_CODE_SUCCESS) {
1038
    sdbRelease(pMnode->pSdb, pStream);
1039
    mndTransDrop(pTrans);
1040
    return code;
1041
  }
1042

1043
  code = mndTransPrepare(pMnode, pTrans);
1044
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
1045
    mError("trans:%d, failed to prepare stop stream trans since %s", pTrans->id, tstrerror(code));
1046
    sdbRelease(pMnode->pSdb, pStream);
1047
    mndTransDrop(pTrans);
1048
    return code;
1049
  }
1050
*/
1051

1052
  code = msmRecalcStream(pMnode, pStream->pCreate->streamId, &recalcReq.timeRange);
7,072✔
1053
  if (code != TSDB_CODE_SUCCESS) {
7,072!
1054
    sdbRelease(pMnode->pSdb, pStream);
×
1055
    tFreeMRecalcStreamReq(&recalcReq);
×
1056
    return code;
×
1057
  }
1058
  
1059
  char buf[128];
7,072✔
1060
  snprintf(buf, sizeof(buf), "start:%" PRId64 ", end:%" PRId64, recalcReq.timeRange.skey, recalcReq.timeRange.ekey);
7,072✔
1061
  auditRecord(pReq, pMnode->clusterId, "recalcStream", pStream->name, recalcReq.name, buf, strlen(buf));
7,072✔
1062

1063
  sdbRelease(pMnode->pSdb, pStream);
7,072✔
1064
  tFreeMRecalcStreamReq(&recalcReq);
7,072✔
1065
//  mndTransDrop(pTrans);
1066

1067
  return TSDB_CODE_SUCCESS;
7,072✔
1068
}
1069

1070

1071
int32_t mndInitStream(SMnode *pMnode) {
553,987✔
1072
  SSdbTable table = {
553,987✔
1073
      .sdbType = SDB_STREAM,
1074
      .keyType = SDB_KEY_BINARY,
1075
      .encodeFp = (SdbEncodeFp)mndStreamActionEncode,
1076
      .decodeFp = (SdbDecodeFp)mndStreamActionDecode,
1077
      .insertFp = (SdbInsertFp)mndStreamActionInsert,
1078
      .updateFp = (SdbUpdateFp)mndStreamActionUpdate,
1079
      .deleteFp = (SdbDeleteFp)mndStreamActionDelete,
1080
  };
1081

1082
  if (!tsDisableStream) {
553,987!
1083
    mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STREAM, mndProcessCreateStreamReq);
553,987✔
1084
    mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM, mndProcessDropStreamReq);
553,987✔
1085
    mndSetMsgHandle(pMnode, TDMT_MND_START_STREAM, mndProcessStartStreamReq);
553,987✔
1086
    mndSetMsgHandle(pMnode, TDMT_MND_STOP_STREAM, mndProcessStopStreamReq);
553,987✔
1087
    mndSetMsgHandle(pMnode, TDMT_MND_STREAM_HEARTBEAT, mndProcessStreamHb);  
553,987✔
1088
    mndSetMsgHandle(pMnode, TDMT_MND_RECALC_STREAM, mndProcessRecalcStreamReq);
553,987✔
1089
  }
1090
  
1091
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndRetrieveStream);
553,987✔
1092
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndCancelGetNextStream);
553,987✔
1093
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndRetrieveStreamTask);
553,987✔
1094
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndCancelGetNextStreamTask);
553,987✔
1095
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAM_RECALCULATES, mndRetrieveStreamRecalculates);
553,987✔
1096
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAM_RECALCULATES, mndCancelGetNextStreamRecalculates);
553,987✔
1097

1098
  int32_t code = sdbSetTable(pMnode->pSdb, table);
553,987✔
1099
  if (code) {
553,987!
1100
    return code;
×
1101
  }
1102

1103
  //code = sdbSetTable(pMnode->pSdb, tableSeq);
1104
  return code;
553,987✔
1105
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc