• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4658

09 Aug 2025 02:19PM UTC coverage: 59.866% (-1.3%) from 61.2%
#4658

push

travis-ci

GitHub
fix(stream)[TD-37079]: force close the last window in fill_history (#32436)

137251 of 291849 branches covered (47.03%)

Branch coverage included in aggregate %.

207628 of 284239 relevant lines covered (73.05%)

4861547.17 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

56.6
/source/dnode/mnode/impl/src/mndStream.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "mndStream.h"
17
#include "audit.h"
18
#include "mndDb.h"
19
#include "mndPrivilege.h"
20
#include "mndShow.h"
21
#include "mndStb.h"
22
#include "mndTrans.h"
23
#include "osMemory.h"
24
#include "parser.h"
25
#include "taoserror.h"
26
#include "tmisce.h"
27
#include "tname.h"
28

29
#define MND_STREAM_MAX_NUM 100000
30

31
typedef struct {
32
  int8_t placeHolder;  // // to fix windows compile error, define place holder
33
} SMStreamNodeCheckMsg;
34

35
static int32_t  mndNodeCheckSentinel = 0;
36
SStmRuntime  mStreamMgmt = {0};
37

38
static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream);
39
static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream);
40
static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pOldStream, SStreamObj *pNewStream);
41
static int32_t mndProcessDropStreamReq(SRpcMsg *pReq);
42

43
static int32_t mndProcessCreateStreamReqFromMNode(SRpcMsg *pReq);
44
static int32_t mndProcessDropStreamReqFromMNode(SRpcMsg *pReq);
45

46
static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
47
static void    mndCancelGetNextStream(SMnode *pMnode, void *pIter);
48
static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
49
static void    mndCancelGetNextStreamTask(SMnode *pMnode, void *pIter);
50
static int32_t mndProcessStopStreamReq(SRpcMsg *pReq);
51
static int32_t mndProcessStartStreamReq(SRpcMsg *pReq);
52

53
static SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw);
54

55
SSdbRaw       *mndStreamSeqActionEncode(SStreamObj *pStream);
56
SSdbRow       *mndStreamSeqActionDecode(SSdbRaw *pRaw);
57
static int32_t mndStreamSeqActionInsert(SSdb *pSdb, SStreamSeq *pStream);
58
static int32_t mndStreamSeqActionDelete(SSdb *pSdb, SStreamSeq *pStream);
59
static int32_t mndStreamSeqActionUpdate(SSdb *pSdb, SStreamSeq *pOldStream, SStreamSeq *pNewStream);
60
static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq);
61

62
void mndCleanupStream(SMnode *pMnode) {
2,559✔
63
  msmDestroyRuntimeInfo(pMnode);
2,559✔
64
  
65
  mDebug("mnd stream runtime info cleanup");
2,559✔
66
}
2,559✔
67

68
SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw) {
262✔
69
  int32_t     code = 0;
262✔
70
  int32_t     lino = 0;
262✔
71
  SSdbRow    *pRow = NULL;
262✔
72
  SStreamObj *pStream = NULL;
262✔
73
  void       *buf = NULL;
262✔
74
  int8_t      sver = 0;
262✔
75
  int32_t     tlen;
76
  int32_t     dataPos = 0;
262✔
77

78
  code = sdbGetRawSoftVer(pRaw, &sver);
262✔
79
  TSDB_CHECK_CODE(code, lino, _over);
262!
80

81
  if (sver != MND_STREAM_VER_NUMBER) {
262!
82
    mError("stream read invalid ver, data ver: %d, curr ver: %d", sver, MND_STREAM_VER_NUMBER);
×
83
    goto _over;
×
84
  }
85

86
  pRow = sdbAllocRow(sizeof(SStreamObj));
262✔
87
  TSDB_CHECK_NULL(pRow, code, lino, _over, terrno);
262!
88

89
  pStream = sdbGetRowObj(pRow);
262✔
90
  TSDB_CHECK_NULL(pStream, code, lino, _over, terrno);
262!
91

92
  SDB_GET_INT32(pRaw, dataPos, &tlen, _over);
262!
93

94
  buf = taosMemoryMalloc(tlen + 1);
262!
95
  TSDB_CHECK_NULL(buf, code, lino, _over, terrno);
262!
96

97
  SDB_GET_BINARY(pRaw, dataPos, buf, tlen, _over);
262!
98

99
  SDecoder decoder;
100
  tDecoderInit(&decoder, buf, tlen + 1);
262✔
101
  code = tDecodeSStreamObj(&decoder, pStream, sver);
262✔
102
  tDecoderClear(&decoder);
262✔
103

104
  if (code < 0) {
262!
105
    tFreeStreamObj(pStream);
×
106
  }
107

108
_over:
262✔
109
  taosMemoryFreeClear(buf);
262!
110

111
  if (code != TSDB_CODE_SUCCESS) {
262!
112
    char *p = (pStream == NULL || NULL == pStream->pCreate) ? "null" : pStream->pCreate->name;
×
113
    mError("stream:%s, failed to decode from raw:%p since %s at:%d", p, pRaw, tstrerror(code), lino);
×
114
    taosMemoryFreeClear(pRow);
×
115

116
    terrno = code;
×
117
    return NULL;
×
118
  } else {
119
    mTrace("stream:%s, decode from raw:%p, row:%p", pStream->pCreate->name, pRaw, pStream);
262!
120

121
    terrno = 0;
262✔
122
    return pRow;
262✔
123
  }
124
}
125

126
static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream) {
216✔
127
  mTrace("stream:%s, perform insert action", pStream->pCreate->name);
216!
128
  return 0;
216✔
129
}
130

131
static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream) {
262✔
132
  mInfo("stream:%s, perform delete action", pStream->pCreate->name);
262!
133
  tFreeStreamObj(pStream);
262✔
134
  return 0;
262✔
135
}
136

137
static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pOldStream, SStreamObj *pNewStream) {
27✔
138
  mTrace("stream:%s, perform update action", pOldStream->pCreate->name);
27!
139

140
  atomic_store_32(&pOldStream->mainSnodeId, pNewStream->mainSnodeId);
27✔
141
  atomic_store_8(&pOldStream->userStopped, atomic_load_8(&pNewStream->userStopped));
27✔
142
  pOldStream->updateTime = pNewStream->updateTime;
27✔
143
  
144
  return 0;
27✔
145
}
146

147
int32_t mndAcquireStream(SMnode *pMnode, char *streamName, SStreamObj **pStream) {
624✔
148
  int32_t code = 0;
624✔
149
  SSdb   *pSdb = pMnode->pSdb;
624✔
150
  (*pStream) = sdbAcquire(pSdb, SDB_STREAM, streamName);
624✔
151
  if ((*pStream) == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
624!
152
    code = TSDB_CODE_MND_STREAM_NOT_EXIST;
244✔
153
  }
154
  return code;
624✔
155
}
156

157
static bool mndStreamGetNameFromId(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
×
158
  SStreamObj* pStream = pObj;
×
159

160
  if (pStream->pCreate->streamId == *(int64_t*)p1) {
×
161
    strncpy((char*)p2, pStream->name, TSDB_STREAM_NAME_LEN);
×
162
    return false;
×
163
  }
164

165
  return true;
×
166
}
167

168
int32_t mndAcquireStreamById(SMnode *pMnode, int64_t streamId, SStreamObj **pStream) {
×
169
  int32_t code = 0;
×
170
  SSdb   *pSdb = pMnode->pSdb;
×
171
  char streamName[TSDB_STREAM_NAME_LEN];
172
  streamName[0] = 0;
×
173
  
174
  sdbTraverse(pSdb, SDB_STREAM, mndStreamGetNameFromId, &streamId, streamName, NULL);
×
175
  if (streamName[0]) {
×
176
    (*pStream) = sdbAcquire(pSdb, SDB_STREAM, streamName);
×
177
    if ((*pStream) == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
×
178
      code = TSDB_CODE_MND_STREAM_NOT_EXIST;
×
179
    }
180
  }
181
  
182
  return code;
×
183
}
184

185
void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream) {
288✔
186
  SSdb *pSdb = pMnode->pSdb;
288✔
187
  sdbRelease(pSdb, pStream);
288✔
188
}
288✔
189

190
SSdbRaw *mndStreamSeqActionEncode(SStreamObj *pStream) { return NULL; }
×
191
SSdbRow *mndStreamSeqActionDecode(SSdbRaw *pRaw) { return NULL; }
×
192
int32_t  mndStreamSeqActionInsert(SSdb *pSdb, SStreamSeq *pStream) { return 0; }
×
193
int32_t  mndStreamSeqActionDelete(SSdb *pSdb, SStreamSeq *pStream) { return 0; }
×
194
int32_t  mndStreamSeqActionUpdate(SSdb *pSdb, SStreamSeq *pOldStream, SStreamSeq *pNewStream) { return 0; }
×
195

196
static void mndStreamBuildObj(SMnode *pMnode, SStreamObj *pObj, SCMCreateStreamReq *pCreate, int32_t snodeId) {
216✔
197
  int32_t     code = 0;
216✔
198

199
  pObj->pCreate = pCreate;
216✔
200
  strncpy(pObj->name, pCreate->name, TSDB_STREAM_FNAME_LEN);
216✔
201
  pObj->mainSnodeId = snodeId;
216✔
202
  
203
  pObj->userDropped = 0;
216✔
204
  pObj->userStopped = 0;
216✔
205
  
206
  pObj->createTime = taosGetTimestampMs();
216✔
207
  pObj->updateTime = pObj->createTime;
216✔
208

209
  mstLogSStreamObj("create stream", pObj);
216✔
210
}
216✔
211

212
static int32_t mndStreamCreateOutStb(SMnode *pMnode, STrans *pTrans, const SCMCreateStreamReq *pStream, const char *user) {
121✔
213
  SStbObj *pStb = NULL;
121✔
214
  SDbObj  *pDb = NULL;
121✔
215
  int32_t  code = 0;
121✔
216
  int32_t  lino = 0;
121✔
217

218
  SMCreateStbReq createReq = {0};
121✔
219
  TAOS_STRNCAT(createReq.name, pStream->outDB, TSDB_DB_FNAME_LEN);
121✔
220
  TAOS_STRNCAT(createReq.name, ".", 2);
121✔
221
  TAOS_STRNCAT(createReq.name,  pStream->outTblName, TSDB_TABLE_NAME_LEN);
121✔
222
  createReq.numOfColumns = taosArrayGetSize(pStream->outCols);
121✔
223
  createReq.numOfTags = pStream->outTags ? taosArrayGetSize(pStream->outTags) : 1;
121!
224
  createReq.pColumns = taosArrayInit_s(sizeof(SFieldWithOptions), createReq.numOfColumns);
121✔
225
  TSDB_CHECK_NULL(createReq.pColumns, code, lino, _OVER, terrno);
121!
226

227
  // build fields
228
  for (int32_t i = 0; i < createReq.numOfColumns; i++) {
608✔
229
    SFieldWithOptions *pField = taosArrayGet(createReq.pColumns, i);
487✔
230
    TSDB_CHECK_NULL(pField, code, lino, _OVER, terrno);
487!
231
    SFieldWithOptions *pSrc = taosArrayGet(pStream->outCols, i);
487✔
232

233
    tstrncpy(pField->name, pSrc->name, TSDB_COL_NAME_LEN);
487✔
234
    pField->flags = 0;
487✔
235
    pField->type = pSrc->type;
487✔
236
    pField->bytes = pSrc->bytes;
487✔
237
    pField->compress = createDefaultColCmprByType(pField->type);
487✔
238
    if (IS_DECIMAL_TYPE(pField->type)) {
487!
239
      pField->typeMod = pSrc->typeMod;
×
240
      pField->flags |= COL_HAS_TYPE_MOD;
×
241
    }
242
  }
243

244
  if (NULL == pStream->outTags) {
121!
245
    createReq.numOfTags = 1;
×
246
    createReq.pTags = taosArrayInit_s(sizeof(SField), 1);
×
247
    TSDB_CHECK_NULL(createReq.pTags, code, lino, _OVER, terrno);
×
248

249
    // build tags
250
    SField *pField = taosArrayGet(createReq.pTags, 0);
×
251
    TSDB_CHECK_NULL(pField, code, lino, _OVER, terrno);
×
252

253
    tstrncpy(pField->name, "group_id", sizeof(pField->name));
×
254
    pField->type = TSDB_DATA_TYPE_UBIGINT;
×
255
    pField->flags = 0;
×
256
    pField->bytes = 8;
×
257
  } else {
258
    createReq.numOfTags = taosArrayGetSize(pStream->outTags);
121✔
259
    createReq.pTags = taosArrayInit_s(sizeof(SField), createReq.numOfTags);
121✔
260
    TSDB_CHECK_NULL(createReq.pTags, code, lino, _OVER, terrno);
121!
261

262
    for (int32_t i = 0; i < createReq.numOfTags; i++) {
346✔
263
      SField *pField = taosArrayGet(createReq.pTags, i);
225✔
264
      if (pField == NULL) {
225!
265
        continue;
×
266
      }
267

268
      TAOS_FIELD_E *pSrc = taosArrayGet(pStream->outTags, i);
225✔
269
      pField->bytes = pSrc->bytes;
225✔
270
      pField->flags = 0;
225✔
271
      pField->type = pSrc->type;
225✔
272
      tstrncpy(pField->name, pSrc->name, TSDB_COL_NAME_LEN);
225✔
273
    }
274
  }
275

276
  if ((code = mndCheckCreateStbReq(&createReq)) != 0) {
121!
277
    goto _OVER;
×
278
  }
279

280
  pStb = mndAcquireStb(pMnode, createReq.name);
121✔
281
  if (pStb != NULL) {
121!
282
    code = TSDB_CODE_MND_STB_ALREADY_EXIST;
×
283
    goto _OVER;
×
284
  }
285

286
  pDb = mndAcquireDbByStb(pMnode, createReq.name);
121✔
287
  if (pDb == NULL) {
121!
288
    code = TSDB_CODE_MND_DB_NOT_SELECTED;
×
289
    goto _OVER;
×
290
  }
291

292
  int32_t numOfStbs = -1;
121✔
293
  if (mndGetNumOfStbs(pMnode, pDb->name, &numOfStbs) != 0) {
121!
294
    goto _OVER;
×
295
  }
296

297
  if (pDb->cfg.numOfStables == 1 && numOfStbs != 0) {
121!
298
    code = TSDB_CODE_MND_SINGLE_STB_MODE_DB;
×
299
    goto _OVER;
×
300
  }
301

302
  SStbObj stbObj = {0};
121✔
303

304
  if (mndBuildStbFromReq(pMnode, &stbObj, &createReq, pDb) != 0) {
121!
305
    goto _OVER;
×
306
  }
307

308
  stbObj.uid = pStream->outStbUid;
121✔
309

310
  if (mndAddStbToTrans(pMnode, pTrans, pDb, &stbObj) < 0) {
121!
311
    mndFreeStb(&stbObj);
×
312
    goto _OVER;
×
313
  }
314

315
  mDebug("stream:%s create dst stable:%s, cols:%d", pStream->name, pStream->outTblName, createReq.numOfColumns);
121✔
316

317
  tFreeSMCreateStbReq(&createReq);
121✔
318
  mndFreeStb(&stbObj);
121✔
319
  mndReleaseStb(pMnode, pStb);
121✔
320
  mndReleaseDb(pMnode, pDb);
121✔
321
  return code;
121✔
322

323
_OVER:
×
324
  tFreeSMCreateStbReq(&createReq);
×
325
  mndReleaseStb(pMnode, pStb);
×
326
  mndReleaseDb(pMnode, pDb);
×
327

328
  mDebug("stream:%s failed to create dst stable:%s, line:%d code:%s", pStream->name, pStream->outTblName, lino,
×
329
         tstrerror(code));
330
  return code;
×
331
}
332

333
static int32_t mndStreamValidateCreate(SMnode *pMnode, char* pUser, SCMCreateStreamReq* pCreate) {
243✔
334
  int32_t code = 0, lino = 0;
243✔
335
  int64_t streamId = pCreate->streamId;
243✔
336

337
  if (pCreate->streamDB) {
243!
338
    code = mndCheckDbPrivilegeByName(pMnode, pUser, MND_OPER_WRITE_DB, pCreate->streamDB);
243✔
339
    if (code) {
243✔
340
      mstsError("user %s failed to create stream %s in db %s since %s", pUser, pCreate->name, pCreate->streamDB, tstrerror(code));
20!
341
    }
342
    TSDB_CHECK_CODE(code, lino, _OVER);
243✔
343
  }
344

345
  if (pCreate->triggerDB) {
223✔
346
    code = mndCheckDbPrivilegeByName(pMnode, pUser, MND_OPER_READ_DB, pCreate->triggerDB);
222✔
347
    if (code) {
222✔
348
      mstsError("user %s failed to create stream %s using trigger db %s since %s", pUser, pCreate->name, pCreate->triggerDB, tstrerror(code));
7!
349
    }
350
    TSDB_CHECK_CODE(code, lino, _OVER);
222✔
351
  }
352

353
  if (pCreate->calcDB) {
216!
354
    int32_t dbNum = taosArrayGetSize(pCreate->calcDB);
216✔
355
    for (int32_t i = 0; i < dbNum; ++i) {
431✔
356
      char* calcDB = taosArrayGetP(pCreate->calcDB, i);
215✔
357
      code = mndCheckDbPrivilegeByName(pMnode, pUser, MND_OPER_READ_DB, calcDB);
215✔
358
      if (code) {
215!
359
        mstsError("user %s failed to create stream %s using calcDB %s since %s", pUser, pCreate->name, calcDB, tstrerror(code));
×
360
      }
361
      TSDB_CHECK_CODE(code, lino, _OVER);
215!
362
    }
363
  }
364

365
  if (pCreate->outDB) {
216✔
366
    code = mndCheckDbPrivilegeByName(pMnode, pUser, MND_OPER_WRITE_DB, pCreate->outDB);
215✔
367
    if (code) {
215!
368
      mstsError("user %s failed to create stream %s using out db %s since %s", pUser, pCreate->name, pCreate->outDB, tstrerror(code));
×
369
    }
370
    TSDB_CHECK_CODE(code, lino, _OVER);
215!
371
  }
372

373
  int32_t streamNum = sdbGetSize(pMnode->pSdb, SDB_STREAM);
216✔
374
  if (streamNum > MND_STREAM_MAX_NUM) {
216!
375
    code = TSDB_CODE_MND_TOO_MANY_STREAMS;
×
376
    mstsError("failed to create stream %s since %s, stream number:%d", pCreate->name, tstrerror(code), streamNum);
×
377
    return code;
×
378
  }
379

380
_OVER:
216✔
381

382
  return code;
243✔
383
}
384

385
int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
1,780✔
386
  SSdb   *pSdb = pMnode->pSdb;
1,780✔
387
  void   *pIter = NULL;
1,780✔
388
  int32_t code = 0;
1,780✔
389

390
  while (1) {
×
391
    SStreamObj *pStream = NULL;
1,780✔
392
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
1,780✔
393
    if (pIter == NULL) break;
1,780!
394

395
    if (0 == strcmp(pStream->pCreate->streamDB, pDb->name)) {
×
396
      mInfo("start to drop stream %s in db %s", pStream->pCreate->name, pDb->name);
×
397
      
398
      pStream->updateTime = taosGetTimestampMs();
×
399
      
400
      atomic_store_8(&pStream->userDropped, 1);
×
401
      
402
      MND_STREAM_SET_LAST_TS(STM_EVENT_DROP_STREAM, pStream->updateTime);
×
403
      
404
      msmUndeployStream(pMnode, pStream->pCreate->streamId, pStream->pCreate->name);
×
405
      
406
      // drop stream
407
      code = mndStreamTransAppend(pStream, pTrans, SDB_STATUS_DROPPED);
×
408
      if (code) {
×
409
        mError("drop db trans:%d failed to append drop stream trans since %s", pTrans->id, tstrerror(code));
×
410
        sdbRelease(pSdb, pStream);
×
411
        sdbCancelFetch(pSdb, pIter);
×
412
        TAOS_RETURN(code);
×
413
      }
414
    }
415

416
    sdbRelease(pSdb, pStream);
×
417
  }
418

419
  return 0;
1,780✔
420
}
421

422
static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
447✔
423
  SMnode     *pMnode = pReq->info.node;
447✔
424
  SSdb       *pSdb = pMnode->pSdb;
447✔
425
  int32_t     numOfRows = 0;
447✔
426
  SStreamObj *pStream = NULL;
447✔
427
  int32_t     code = 0;
447✔
428

429
  while (numOfRows < rows) {
1,621!
430
    pShow->pIter = sdbFetch(pSdb, SDB_STREAM, pShow->pIter, (void **)&pStream);
1,621✔
431
    if (pShow->pIter == NULL) break;
1,621✔
432

433
    code = mstSetStreamAttrResBlock(pMnode, pStream, pBlock, numOfRows);
1,174✔
434
    if (code == 0) {
1,174!
435
      numOfRows++;
1,174✔
436
    }
437
    sdbRelease(pSdb, pStream);
1,174✔
438
  }
439

440
  pShow->numOfRows += numOfRows;
447✔
441
  return numOfRows;
447✔
442
}
443

444
static void mndCancelGetNextStream(SMnode *pMnode, void *pIter) {
×
445
  SSdb *pSdb = pMnode->pSdb;
×
446
  sdbCancelFetchByType(pSdb, pIter, SDB_STREAM);
×
447
}
×
448

449
static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
298✔
450
  SMnode     *pMnode = pReq->info.node;
298✔
451
  SSdb       *pSdb = pMnode->pSdb;
298✔
452
  int32_t     numOfRows = 0;
298✔
453
  SStreamObj *pStream = NULL;
298✔
454
  int32_t     code = 0;
298✔
455

456
  while (numOfRows < rowsCapacity) {
2,391✔
457
    pShow->pIter = sdbFetch(pSdb, SDB_STREAM, pShow->pIter, (void **)&pStream);
2,340✔
458
    if (pShow->pIter == NULL) {
2,340✔
459
      break;
247✔
460
    }
461

462
    code = mstSetStreamTasksResBlock(pStream, pBlock, &numOfRows, rowsCapacity);
2,093✔
463

464
    sdbRelease(pSdb, pStream);
2,093✔
465
  }
466

467
  pShow->numOfRows += numOfRows;
298✔
468
  return numOfRows;
298✔
469
}
470

471
static void mndCancelGetNextStreamTask(SMnode *pMnode, void *pIter) {
×
472
  SSdb *pSdb = pMnode->pSdb;
×
473
  sdbCancelFetchByType(pSdb, pIter, SDB_STREAM);
×
474
}
×
475

476
static int32_t mndRetrieveStreamRecalculates(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
1✔
477
  SMnode     *pMnode = pReq->info.node;
1✔
478
  SSdb       *pSdb = pMnode->pSdb;
1✔
479
  int32_t     numOfRows = 0;
1✔
480
  SStreamObj *pStream = NULL;
1✔
481
  int32_t     code = 0;
1✔
482

483
  while (numOfRows < rowsCapacity) {
2!
484
    pShow->pIter = sdbFetch(pSdb, SDB_STREAM, pShow->pIter, (void **)&pStream);
2✔
485
    if (pShow->pIter == NULL) {
2✔
486
      break;
1✔
487
    }
488

489
    code = mstSetStreamRecalculatesResBlock(pStream, pBlock, &numOfRows, rowsCapacity);
1✔
490

491
    sdbRelease(pSdb, pStream);
1✔
492
  }
493

494
  pShow->numOfRows += numOfRows;
1✔
495
  return numOfRows;
1✔
496
}
497

498
static void mndCancelGetNextStreamRecalculates(SMnode *pMnode, void *pIter) {
×
499
  SSdb *pSdb = pMnode->pSdb;
×
500
  sdbCancelFetchByType(pSdb, pIter, SDB_STREAM);
×
501
}
×
502

503

504
static bool mndStreamUpdateTagsFlag(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
2,896✔
505
  SStreamObj *pStream = pObj;
2,896✔
506
  if (atomic_load_8(&pStream->userDropped)) {
2,896!
507
    return true;
×
508
  }
509

510
  if (TSDB_SUPER_TABLE != pStream->pCreate->triggerTblType && 
2,896✔
511
      TSDB_CHILD_TABLE != pStream->pCreate->triggerTblType && 
1,162✔
512
      TSDB_VIRTUAL_CHILD_TABLE != pStream->pCreate->triggerTblType) {
348✔
513
    return true;
122✔
514
  }
515

516
  if (pStream->pCreate->triggerTblSuid != *(uint64_t*)p1) {
2,774✔
517
    return true;
2,770✔
518
  }
519

520
  if (NULL == pStream->pCreate->partitionCols) {
4!
521
    return true;
×
522
  }
523

524
  SNodeList* pList = NULL;
4✔
525
  int32_t code = nodesStringToList(pStream->pCreate->partitionCols, &pList);
4✔
526
  if (code) {
4!
527
    nodesDestroyList(pList);
×
528
    mstError("partitionCols [%s] nodesStringToList failed with error:%s", (char*)pStream->pCreate->partitionCols, tstrerror(code));
×
529
    return true;
×
530
  }
531

532
  SSchema* pTags = (SSchema*)p2;
4✔
533
  int32_t* tagNum = (int32_t*)p3;
4✔
534

535
  SNode* pNode = NULL;
4✔
536
  FOREACH(pNode, pList) {
18!
537
    SColumnNode* pCol = (SColumnNode*)pNode;
14✔
538
    for (int32_t i = 0; i < *tagNum; ++i) {
76✔
539
      if (pCol->colId == pTags[i].colId) {
72✔
540
        pTags[i].flags |= COL_REF_BY_STM;
10✔
541
        break;
10✔
542
      }
543
    }
544
  }
545

546
  nodesDestroyList(pList);
4✔
547
  
548
  return true;
4✔
549
}
550

551

552
void mndStreamUpdateTagsRefFlag(SMnode *pMnode, int64_t suid, SSchema* pTags, int32_t tagNum) {
61,265✔
553
  int32_t streamNum = sdbGetSize(pMnode->pSdb, SDB_STREAM);
61,265✔
554
  if (streamNum <= 0) {
61,265✔
555
    return;
61,014✔
556
  }
557

558
  sdbTraverse(pMnode->pSdb, SDB_STREAM, mndStreamUpdateTagsFlag, &suid, pTags, &tagNum);
251✔
559
}
560

561
static int32_t mndProcessStopStreamReq(SRpcMsg *pReq) {
21✔
562
  SMnode     *pMnode = pReq->info.node;
21✔
563
  SStreamObj *pStream = NULL;
21✔
564
  int32_t     code = 0;
21✔
565

566
  SMPauseStreamReq pauseReq = {0};
21✔
567
  if (tDeserializeSMPauseStreamReq(pReq->pCont, pReq->contLen, &pauseReq) < 0) {
21!
568
    TAOS_RETURN(TSDB_CODE_INVALID_MSG);
×
569
  }
570

571
  code = mndAcquireStream(pMnode, pauseReq.name, &pStream);
21✔
572
  if (pStream == NULL || code != 0) {
21!
573
    if (pauseReq.igNotExists) {
×
574
      mInfo("stream:%s, not exist, not stop stream", pauseReq.name);
×
575
      taosMemoryFree(pauseReq.name);
×
576
      return 0;
×
577
    } else {
578
      mError("stream:%s not exist, failed to stop stream", pauseReq.name);
×
579
      taosMemoryFree(pauseReq.name);
×
580
      TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
×
581
    }
582
  }
583

584
  taosMemoryFree(pauseReq.name);
21!
585

586
  int64_t streamId = pStream->pCreate->streamId;
21✔
587
  
588
  mstsInfo("start to stop stream %s", pStream->name);
21!
589

590
  code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->pCreate->streamDB);
21✔
591
  if (code != TSDB_CODE_SUCCESS) {
21✔
592
    mstsError("user %s failed to stop stream %s since %s", pReq->info.conn.user, pStream->name, tstrerror(code));
10!
593
    sdbRelease(pMnode->pSdb, pStream);
10✔
594
    return code;
10✔
595
  }
596

597
  if (atomic_load_8(&pStream->userDropped)) {
11!
598
    code = TSDB_CODE_MND_STREAM_DROPPING;
×
599
    mstsError("user %s failed to stop stream %s since %s", pReq->info.conn.user, pStream->name, tstrerror(code));
×
600
    sdbRelease(pMnode->pSdb, pStream);
×
601
    return code;
×
602
  }
603

604
  STrans *pTrans = NULL;
11✔
605
  code = mndStreamCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_STOP_NAME, &pTrans);
11✔
606
  if (pTrans == NULL || code) {
11!
607
    mstsError("failed to stop stream %s since %s", pStream->name, tstrerror(code));
×
608
    sdbRelease(pMnode->pSdb, pStream);
×
609
    return code;
×
610
  }
611

612
  pStream->updateTime = taosGetTimestampMs();
11✔
613

614
  atomic_store_8(&pStream->userStopped, 1);
11✔
615

616
  MND_STREAM_SET_LAST_TS(STM_EVENT_STOP_STREAM, pStream->updateTime);
11!
617

618
  msmUndeployStream(pMnode, streamId, pStream->name);
11✔
619

620
  // stop stream
621
  code = mndStreamTransAppend(pStream, pTrans, SDB_STATUS_READY);
11✔
622
  if (code != TSDB_CODE_SUCCESS) {
11!
623
    sdbRelease(pMnode->pSdb, pStream);
×
624
    mndTransDrop(pTrans);
×
625
    return code;
×
626
  }
627

628
  code = mndTransPrepare(pMnode, pTrans);
11✔
629
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
11!
630
    mError("trans:%d, failed to prepare stop stream trans since %s", pTrans->id, tstrerror(code));
×
631
    sdbRelease(pMnode->pSdb, pStream);
×
632
    mndTransDrop(pTrans);
×
633
    return code;
×
634
  }
635

636
  sdbRelease(pMnode->pSdb, pStream);
11✔
637
  mndTransDrop(pTrans);
11✔
638

639
  return TSDB_CODE_ACTION_IN_PROGRESS;
11✔
640
}
641

642

643
static int32_t mndProcessStartStreamReq(SRpcMsg *pReq) {
21✔
644
  SMnode     *pMnode = pReq->info.node;
21✔
645
  SStreamObj *pStream = NULL;
21✔
646
  int32_t     code = 0;
21✔
647

648
  if ((code = grantCheckExpire(TSDB_GRANT_STREAMS)) < 0) {
21!
649
    return code;
×
650
  }
651

652
  SMResumeStreamReq resumeReq = {0};
21✔
653
  if (tDeserializeSMResumeStreamReq(pReq->pCont, pReq->contLen, &resumeReq) < 0) {
21!
654
    TAOS_RETURN(TSDB_CODE_INVALID_MSG);
×
655
  }
656

657
  code = mndAcquireStream(pMnode, resumeReq.name, &pStream);
21✔
658
  if (pStream == NULL || code != 0) {
21!
659
    if (resumeReq.igNotExists) {
×
660
      mInfo("stream:%s not exist, not start stream", resumeReq.name);
×
661
      taosMemoryFree(resumeReq.name);
×
662
      sdbRelease(pMnode->pSdb, pStream);
×
663
      return 0;
×
664
    } else {
665
      mError("stream:%s not exist, failed to start stream", resumeReq.name);
×
666
      taosMemoryFree(resumeReq.name);
×
667
      TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
×
668
    }
669
  }
670

671
  taosMemoryFree(resumeReq.name);
21!
672

673
  int64_t streamId = pStream->pCreate->streamId;
21✔
674

675
  mstsInfo("start to start stream %s from stopped", pStream->name);
21!
676

677
  code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->pCreate->streamDB);
21✔
678
  if (code != TSDB_CODE_SUCCESS) {
21✔
679
    mstsError("user %s failed to start stream %s since %s", pReq->info.conn.user, pStream->name, tstrerror(code));
10!
680
    sdbRelease(pMnode->pSdb, pStream);
10✔
681
    return code;
10✔
682
  }
683

684
  if (atomic_load_8(&pStream->userDropped)) {
11!
685
    code = TSDB_CODE_MND_STREAM_DROPPING;
×
686
    mstsError("user %s failed to start stream %s since %s", pReq->info.conn.user, pStream->name, tstrerror(code));
×
687
    sdbRelease(pMnode->pSdb, pStream);
×
688
    return code;
×
689
  }
690

691
  if (0 == atomic_load_8(&pStream->userStopped)) {
11!
692
    code = TSDB_CODE_MND_STREAM_NOT_STOPPED;
×
693
    mstsError("user %s failed to start stream %s since %s", pReq->info.conn.user, pStream->name, tstrerror(code));
×
694
    sdbRelease(pMnode->pSdb, pStream);
×
695
    return code;
×
696
  }
697
  
698
  atomic_store_8(&pStream->userStopped, 0);
11✔
699

700
  pStream->updateTime = taosGetTimestampMs();
11✔
701

702
  MND_STREAM_SET_LAST_TS(STM_EVENT_START_STREAM, pStream->updateTime);
11!
703

704
  STrans *pTrans = NULL;
11✔
705
  code = mndStreamCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_START_NAME, &pTrans);
11✔
706
  if (pTrans == NULL || code) {
11!
707
    mstsError("failed to start stream %s since %s", pStream->name, tstrerror(code));
×
708
    sdbRelease(pMnode->pSdb, pStream);
×
709
    return code;
×
710
  }
711

712
  code = mndStreamTransAppend(pStream, pTrans, SDB_STATUS_READY);
11✔
713
  if (code != TSDB_CODE_SUCCESS) {
11!
714
    mstsError("failed to start stream %s since %s", pStream->name, tstrerror(code));
×
715
    sdbRelease(pMnode->pSdb, pStream);
×
716
    mndTransDrop(pTrans);
×
717
    return code;
×
718
  }
719

720
  code = mndTransPrepare(pMnode, pTrans);
11✔
721
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
11!
722
    mstsError("trans:%d, failed to prepare start stream %s trans since %s", pTrans->id, pStream->name, tstrerror(code));
×
723
    sdbRelease(pMnode->pSdb, pStream);
×
724
    mndTransDrop(pTrans);
×
725
    return code;
×
726
  }
727

728
  mstPostStreamAction(mStreamMgmt.actionQ, streamId, pStream->name, NULL, true, STREAM_ACT_DEPLOY);
11✔
729

730
  sdbRelease(pMnode->pSdb, pStream);
11✔
731
  mndTransDrop(pTrans);
11✔
732

733
  return TSDB_CODE_ACTION_IN_PROGRESS;
11✔
734
}
735

736

737
static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
29✔
738
  SMnode     *pMnode = pReq->info.node;
29✔
739
  SStreamObj *pStream = NULL;
29✔
740
  int32_t     code = 0;
29✔
741

742
  SMDropStreamReq dropReq = {0};
29✔
743
  if (tDeserializeSMDropStreamReq(pReq->pCont, pReq->contLen, &dropReq) < 0) {
29!
744
    mError("invalid drop stream msg recv, discarded");
×
745
    code = TSDB_CODE_INVALID_MSG;
×
746
    TAOS_RETURN(code);
×
747
  }
748

749
  mDebug("recv drop stream:%s msg", dropReq.name);
29!
750

751
  code = mndAcquireStream(pMnode, dropReq.name, &pStream);
29✔
752
  if (pStream == NULL || code != 0) {
29!
753
    if (dropReq.igNotExists) {
×
754
      mInfo("stream:%s not exist, ignore not exist is set, drop stream exec done with success", dropReq.name);
×
755
      sdbRelease(pMnode->pSdb, pStream);
×
756
      tFreeMDropStreamReq(&dropReq);
×
757
      return 0;
×
758
    } else {
759
      mError("stream:%s not exist failed to drop it", dropReq.name);
×
760
      tFreeMDropStreamReq(&dropReq);
×
761
      TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
×
762
    }
763
  }
764

765
  int64_t streamId = pStream->pCreate->streamId;
29✔
766

767
  code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->pCreate->streamDB);
29✔
768
  if (code != 0) {
29✔
769
    mstsError("user %s failed to drop stream %s since %s", pReq->info.conn.user, dropReq.name, tstrerror(code));
10!
770
    sdbRelease(pMnode->pSdb, pStream);
10✔
771
    tFreeMDropStreamReq(&dropReq);
10✔
772
    return code;
10✔
773
  }
774

775
  if (pStream->pCreate->tsmaId != 0) {
19!
776
    mstsDebug("try to drop tsma related stream, tsmaId:%" PRIx64, pStream->pCreate->tsmaId);
×
777

778
    void    *pIter = NULL;
×
779
    SSmaObj *pSma = NULL;
×
780
    pIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter, (void **)&pSma);
×
781
    while (pIter) {
×
782
      if (pSma && pSma->uid == pStream->pCreate->tsmaId) {
×
783
        sdbRelease(pMnode->pSdb, pSma);
×
784
        sdbRelease(pMnode->pSdb, pStream);
×
785

786
        sdbCancelFetch(pMnode->pSdb, pIter);
×
787
        tFreeMDropStreamReq(&dropReq);
×
788
        code = TSDB_CODE_TSMA_MUST_BE_DROPPED;
×
789

790
        mstsError("refused to drop tsma-related stream %s since tsma still exists", dropReq.name);
×
791
        TAOS_RETURN(code);
×
792
      }
793

794
      if (pSma) {
×
795
        sdbRelease(pMnode->pSdb, pSma);
×
796
      }
797

798
      pIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter, (void **)&pSma);
×
799
    }
800
  }
801

802
  mstsInfo("start to drop stream %s", pStream->pCreate->name);
19!
803

804
  pStream->updateTime = taosGetTimestampMs();
19✔
805

806
  atomic_store_8(&pStream->userDropped, 1);
19✔
807

808
  MND_STREAM_SET_LAST_TS(STM_EVENT_DROP_STREAM, pStream->updateTime);
19!
809

810
  msmUndeployStream(pMnode, streamId, pStream->pCreate->name);
19✔
811

812
  STrans *pTrans = NULL;
19✔
813
  code = mndStreamCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_DROP_NAME, &pTrans);
19✔
814
  if (pTrans == NULL || code) {
19!
815
    mstsError("failed to drop stream %s since %s", dropReq.name, tstrerror(code));
×
816
    sdbRelease(pMnode->pSdb, pStream);
×
817
    tFreeMDropStreamReq(&dropReq);
×
818
    TAOS_RETURN(code);
×
819
  }
820

821
  // drop stream
822
  code = mndStreamTransAppend(pStream, pTrans, SDB_STATUS_DROPPED);
19✔
823
  if (code) {
19!
824
    mstsError("trans:%d, failed to append drop stream trans since %s", pTrans->id, tstrerror(code));
×
825
    sdbRelease(pMnode->pSdb, pStream);
×
826
    mndTransDrop(pTrans);
×
827
    tFreeMDropStreamReq(&dropReq);
×
828
    TAOS_RETURN(code);
×
829
  }
830

831
  code = mndTransPrepare(pMnode, pTrans);
19✔
832
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
19!
833
    mstsError("trans:%d, failed to prepare drop stream trans since %s", pTrans->id, tstrerror(code));
×
834
    sdbRelease(pMnode->pSdb, pStream);
×
835
    mndTransDrop(pTrans);
×
836
    tFreeMDropStreamReq(&dropReq);
×
837
    TAOS_RETURN(code);
×
838
  }
839

840
  auditRecord(pReq, pMnode->clusterId, "dropStream", "", pStream->pCreate->streamDB, NULL, 0);
19✔
841

842
  sdbRelease(pMnode->pSdb, pStream);
19✔
843
  mndTransDrop(pTrans);
19✔
844

845
  mstsDebug("drop stream %s half completed", dropReq.name);
19!
846
  code = TSDB_CODE_ACTION_IN_PROGRESS;
19✔
847

848
  tFreeMDropStreamReq(&dropReq);
19✔
849
  
850
  TAOS_RETURN(code);
19✔
851
}
852

853
static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
273✔
854
  SMnode     *pMnode = pReq->info.node;
273✔
855
  SStreamObj *pStream = NULL;
273✔
856
  SStreamObj  streamObj = {0};
273✔
857
  int32_t     code = TSDB_CODE_SUCCESS;
273✔
858
  int32_t     lino = 0;
273✔
859
  STrans     *pTrans = NULL;
273✔
860
  uint64_t    streamId = 0;
273✔
861
  SCMCreateStreamReq* pCreate = NULL;
273✔
862

863
  if ((code = grantCheck(TSDB_GRANT_STREAMS)) < 0) {
273!
864
    goto _OVER;
×
865
  }
866
  
867
#ifdef WINDOWS
868
  code = TSDB_CODE_MND_INVALID_PLATFORM;
869
  goto _OVER;
870
#endif
871

872
  pCreate = taosMemoryCalloc(1, sizeof(SCMCreateStreamReq));
273!
873
  TSDB_CHECK_NULL(pCreate, code, lino, _OVER, terrno);
273!
874
  
875
  code = tDeserializeSCMCreateStreamReq(pReq->pCont, pReq->contLen, pCreate);
273✔
876
  TSDB_CHECK_CODE(code, lino, _OVER);
273!
877

878
  streamId = pCreate->streamId;
273✔
879

880
  mstsInfo("start to create stream %s, sql:%s", pCreate->name, pCreate->sql);
273!
881

882
  int32_t snodeId = msmAssignRandomSnodeId(pMnode, streamId);
273✔
883
  if (!GOT_SNODE(snodeId)) {
273✔
884
    code = terrno;
10✔
885
    TSDB_CHECK_CODE(code, lino, _OVER);
10!
886
  }
887
  
888
  code = mndAcquireStream(pMnode, pCreate->name, &pStream);
263✔
889
  if (pStream != NULL && code == 0) {
263!
890
    if (pCreate->igExists) {
20!
891
      mstsInfo("stream %s already exist, ignore exist is set", pCreate->name);
×
892
    } else {
893
      code = TSDB_CODE_MND_STREAM_ALREADY_EXIST;
20✔
894
    }
895

896
    mndReleaseStream(pMnode, pStream);
20✔
897
    goto _OVER;
20✔
898
  } else if (code != TSDB_CODE_MND_STREAM_NOT_EXIST) {
243!
899
    goto _OVER;
×
900
  }
901

902
  code = mndStreamValidateCreate(pMnode, pReq->info.conn.user, pCreate);
243✔
903
  TSDB_CHECK_CODE(code, lino, _OVER);
243✔
904

905
  mndStreamBuildObj(pMnode, &streamObj, pCreate, snodeId);
216✔
906
  pCreate = NULL;
216✔
907

908
  pStream = &streamObj;
216✔
909

910
  code = mndStreamCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_DB, MND_STREAM_CREATE_NAME, &pTrans);
216✔
911
  if (pTrans == NULL || code) {
216!
912
    goto _OVER;
×
913
  }
914

915
  // create stb for stream
916
  if (TSDB_SUPER_TABLE == pStream->pCreate->outTblType && !pStream->pCreate->outStbExists) {
216✔
917
    pStream->pCreate->outStbUid = mndGenerateUid(pStream->pCreate->outTblName, strlen(pStream->pCreate->outTblName));
121✔
918
    code = mndStreamCreateOutStb(pMnode, pTrans, pStream->pCreate, pReq->info.conn.user);
121✔
919
    TSDB_CHECK_CODE(code, lino, _OVER);
121!
920
  }
921

922
  // add stream to trans
923
  code = mndStreamTransAppend(pStream, pTrans, SDB_STATUS_READY);
216✔
924
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
216!
925
    mstsError("failed to persist stream %s since %s", pStream->pCreate->name, tstrerror(code));
×
926
    goto _OVER;
×
927
  }
928

929
  // execute creation
930
  code = mndTransPrepare(pMnode, pTrans);
216✔
931
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
216!
932
    mstsError("trans:%d, failed to prepare since %s", pTrans->id, tstrerror(code));
×
933
    goto _OVER;
×
934
  }
935
  code = TSDB_CODE_ACTION_IN_PROGRESS;
216✔
936

937
  auditRecord(pReq, pMnode->clusterId, "createStream", pStream->pCreate->streamDB, pStream->pCreate->name, pStream->pCreate->sql, strlen(pStream->pCreate->sql));
216✔
938

939
  MND_STREAM_SET_LAST_TS(STM_EVENT_CREATE_STREAM, taosGetTimestampMs());
338✔
940

941
  mstPostStreamAction(mStreamMgmt.actionQ, streamId, pStream->pCreate->name, NULL, true, STREAM_ACT_DEPLOY);
216✔
942

943
_OVER:
273✔
944

945
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
273!
946
    if (pStream && pStream->pCreate) {
57!
947
      mstsError("failed to create stream %s at line:%d since %s", pStream->pCreate->name, lino, tstrerror(code));
20!
948
    } else {
949
      mstsError("failed to create stream at line:%d since %s", lino, tstrerror(code));
37!
950
    }
951
  } else {
952
    mstsDebug("create stream %s half completed", pStream->pCreate ? pStream->pCreate->name : "unknown");
216!
953
  }
954

955
  tFreeSCMCreateStreamReq(pCreate);
273✔
956
  taosMemoryFreeClear(pCreate);
273!
957

958
  mndTransDrop(pTrans);
273✔
959
  tFreeStreamObj(&streamObj);
273✔
960

961
  return code;
273✔
962
}
963

964
static int32_t mndProcessRecalcStreamReq(SRpcMsg *pReq) {
21✔
965
  SMnode     *pMnode = pReq->info.node;
21✔
966
  SStreamObj *pStream = NULL;
21✔
967
  int32_t     code = 0;
21✔
968

969
  if ((code = grantCheckExpire(TSDB_GRANT_STREAMS)) < 0) {
21!
970
    return code;
×
971
  }
972

973
  SMRecalcStreamReq recalcReq = {0};
21✔
974
  if (tDeserializeSMRecalcStreamReq(pReq->pCont, pReq->contLen, &recalcReq) < 0) {
21!
975
    tFreeMRecalcStreamReq(&recalcReq);
×
976
    TAOS_RETURN(TSDB_CODE_INVALID_MSG);
×
977
  }
978

979
  code = mndAcquireStream(pMnode, recalcReq.name, &pStream);
21✔
980
  if (pStream == NULL || code != 0) {
21!
981
    mError("stream:%s not exist, failed to recalc stream", recalcReq.name);
×
982
    tFreeMRecalcStreamReq(&recalcReq);
×
983
    TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
×
984
  }
985

986
  int64_t streamId = pStream->pCreate->streamId;
21✔
987
  
988
  mstsInfo("start to recalc stream %s", recalcReq.name);
21!
989

990
  code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->pCreate->streamDB);
21✔
991
  if (code != TSDB_CODE_SUCCESS) {
21✔
992
    mstsError("user %s failed to recalc stream %s since %s", pReq->info.conn.user, recalcReq.name, tstrerror(code));
20!
993
    sdbRelease(pMnode->pSdb, pStream);
20✔
994
    tFreeMRecalcStreamReq(&recalcReq);
20✔
995
    return code;
20✔
996
  }
997

998
  if (atomic_load_8(&pStream->userDropped)) {
1!
999
    code = TSDB_CODE_MND_STREAM_DROPPING;
×
1000
    mstsError("user %s failed to recalc stream %s since %s", pReq->info.conn.user, recalcReq.name, tstrerror(code));
×
1001
    sdbRelease(pMnode->pSdb, pStream);
×
1002
    tFreeMRecalcStreamReq(&recalcReq);
×
1003
    return code;
×
1004
  }
1005

1006
  if (atomic_load_8(&pStream->userStopped)) {
1!
1007
    code = TSDB_CODE_MND_STREAM_STOPPED;
×
1008
    mstsError("user %s failed to recalc stream %s since %s", pReq->info.conn.user, recalcReq.name, tstrerror(code));
×
1009
    sdbRelease(pMnode->pSdb, pStream);
×
1010
    tFreeMRecalcStreamReq(&recalcReq);
×
1011
    return code;
×
1012
  }
1013

1014
  if (WINDOW_TYPE_PERIOD == pStream->pCreate->triggerType) {
1!
1015
    code = TSDB_CODE_OPS_NOT_SUPPORT;
×
1016
    mstsError("failed to recalc stream %s since %s", recalcReq.name, tstrerror(code));
×
1017
    sdbRelease(pMnode->pSdb, pStream);
×
1018
    tFreeMRecalcStreamReq(&recalcReq);
×
1019
    return code;
×
1020
  }
1021

1022
  /*
1023
  pStream->updateTime = taosGetTimestampMs();
1024

1025
  STrans *pTrans = NULL;
1026
  code = mndStreamCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_RECALC_NAME, &pTrans);
1027
  if (pTrans == NULL || code) {
1028
    mstsError("failed to recalc stream %s since %s", recalcReq.name, tstrerror(code));
1029
    sdbRelease(pMnode->pSdb, pStream);
1030
    return code;
1031
  }
1032

1033
  // stop stream
1034
  code = mndStreamTransAppend(pStream, pTrans, SDB_STATUS_READY);
1035
  if (code != TSDB_CODE_SUCCESS) {
1036
    sdbRelease(pMnode->pSdb, pStream);
1037
    mndTransDrop(pTrans);
1038
    return code;
1039
  }
1040

1041
  code = mndTransPrepare(pMnode, pTrans);
1042
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
1043
    mError("trans:%d, failed to prepare stop stream trans since %s", pTrans->id, tstrerror(code));
1044
    sdbRelease(pMnode->pSdb, pStream);
1045
    mndTransDrop(pTrans);
1046
    return code;
1047
  }
1048
*/
1049

1050
  code = msmRecalcStream(pMnode, pStream->pCreate->streamId, &recalcReq.timeRange);
1✔
1051
  if (code != TSDB_CODE_SUCCESS) {
1!
1052
    sdbRelease(pMnode->pSdb, pStream);
×
1053
    tFreeMRecalcStreamReq(&recalcReq);
×
1054
    return code;
×
1055
  }
1056
  
1057
  char buf[128];
1058
  snprintf(buf, sizeof(buf), "start:%" PRId64 ", end:%" PRId64, recalcReq.timeRange.skey, recalcReq.timeRange.ekey);
1✔
1059
  auditRecord(pReq, pMnode->clusterId, "recalcStream", pStream->name, recalcReq.name, buf, strlen(buf));
1✔
1060

1061
  sdbRelease(pMnode->pSdb, pStream);
1✔
1062
  tFreeMRecalcStreamReq(&recalcReq);
1✔
1063
//  mndTransDrop(pTrans);
1064

1065
  return TSDB_CODE_SUCCESS;
1✔
1066
}
1067

1068

1069
int32_t mndInitStream(SMnode *pMnode) {
2,559✔
1070
  SSdbTable table = {
2,559✔
1071
      .sdbType = SDB_STREAM,
1072
      .keyType = SDB_KEY_BINARY,
1073
      .encodeFp = (SdbEncodeFp)mndStreamActionEncode,
1074
      .decodeFp = (SdbDecodeFp)mndStreamActionDecode,
1075
      .insertFp = (SdbInsertFp)mndStreamActionInsert,
1076
      .updateFp = (SdbUpdateFp)mndStreamActionUpdate,
1077
      .deleteFp = (SdbDeleteFp)mndStreamActionDelete,
1078
  };
1079

1080
  if (!tsDisableStream) {
2,559!
1081
    mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STREAM, mndProcessCreateStreamReq);
2,559✔
1082
    mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM, mndProcessDropStreamReq);
2,559✔
1083
    mndSetMsgHandle(pMnode, TDMT_MND_START_STREAM, mndProcessStartStreamReq);
2,559✔
1084
    mndSetMsgHandle(pMnode, TDMT_MND_STOP_STREAM, mndProcessStopStreamReq);
2,559✔
1085
    mndSetMsgHandle(pMnode, TDMT_MND_STREAM_HEARTBEAT, mndProcessStreamHb);  
2,559✔
1086
    mndSetMsgHandle(pMnode, TDMT_MND_RECALC_STREAM, mndProcessRecalcStreamReq);
2,559✔
1087
  }
1088
  
1089
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndRetrieveStream);
2,559✔
1090
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndCancelGetNextStream);
2,559✔
1091
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndRetrieveStreamTask);
2,559✔
1092
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndCancelGetNextStreamTask);
2,559✔
1093
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAM_RECALCULATES, mndRetrieveStreamRecalculates);
2,559✔
1094
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAM_RECALCULATES, mndCancelGetNextStreamRecalculates);
2,559✔
1095

1096
  int32_t code = sdbSetTable(pMnode->pSdb, table);
2,559✔
1097
  if (code) {
2,559!
1098
    return code;
×
1099
  }
1100

1101
  //code = sdbSetTable(pMnode->pSdb, tableSeq);
1102
  return code;
2,559✔
1103
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc