• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4513

17 Jul 2025 02:02AM UTC coverage: 31.359% (-31.1%) from 62.446%
#4513

push

travis-ci

web-flow
Merge pull request #31914 from taosdata/fix/3.0/compare-ans-failed

fix:Convert line endings from LF to CRLF for ans file

68541 of 301034 branches covered (22.77%)

Branch coverage included in aggregate %.

117356 of 291771 relevant lines covered (40.22%)

602262.98 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

3.36
/source/dnode/mnode/impl/src/mndStream.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "mndStream.h"
17
#include "audit.h"
18
#include "mndDb.h"
19
#include "mndPrivilege.h"
20
#include "mndShow.h"
21
#include "mndStb.h"
22
#include "mndTrans.h"
23
#include "osMemory.h"
24
#include "parser.h"
25
#include "taoserror.h"
26
#include "tmisce.h"
27
#include "tname.h"
28

29
#define MND_STREAM_MAX_NUM 100000
30

31
typedef struct {
32
  int8_t placeHolder;  // // to fix windows compile error, define place holder
33
} SMStreamNodeCheckMsg;
34

35
static int32_t  mndNodeCheckSentinel = 0;
36
SStmRuntime  mStreamMgmt = {0};
37

38
static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream);
39
static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream);
40
static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pOldStream, SStreamObj *pNewStream);
41
static int32_t mndProcessDropStreamReq(SRpcMsg *pReq);
42

43
static int32_t mndProcessCreateStreamReqFromMNode(SRpcMsg *pReq);
44
static int32_t mndProcessDropStreamReqFromMNode(SRpcMsg *pReq);
45

46
static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
47
static void    mndCancelGetNextStream(SMnode *pMnode, void *pIter);
48
static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
49
static void    mndCancelGetNextStreamTask(SMnode *pMnode, void *pIter);
50
static int32_t mndProcessStopStreamReq(SRpcMsg *pReq);
51
static int32_t mndProcessStartStreamReq(SRpcMsg *pReq);
52

53
static SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw);
54

55
SSdbRaw       *mndStreamSeqActionEncode(SStreamObj *pStream);
56
SSdbRow       *mndStreamSeqActionDecode(SSdbRaw *pRaw);
57
static int32_t mndStreamSeqActionInsert(SSdb *pSdb, SStreamSeq *pStream);
58
static int32_t mndStreamSeqActionDelete(SSdb *pSdb, SStreamSeq *pStream);
59
static int32_t mndStreamSeqActionUpdate(SSdb *pSdb, SStreamSeq *pOldStream, SStreamSeq *pNewStream);
60
static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq);
61

62
void mndCleanupStream(SMnode *pMnode) {
15✔
63
  //STREAMTODO
64
  mDebug("mnd stream runtime info cleanup");
15!
65
}
15✔
66

67
SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw) {
×
68
  int32_t     code = 0;
×
69
  int32_t     lino = 0;
×
70
  SSdbRow    *pRow = NULL;
×
71
  SStreamObj *pStream = NULL;
×
72
  void       *buf = NULL;
×
73
  int8_t      sver = 0;
×
74
  int32_t     tlen;
75
  int32_t     dataPos = 0;
×
76

77
  code = sdbGetRawSoftVer(pRaw, &sver);
×
78
  TSDB_CHECK_CODE(code, lino, _over);
×
79

80
  if (sver < 1 || sver > MND_STREAM_VER_NUMBER) {
×
81
    mError("stream read invalid ver, data ver: %d, curr ver: %d", sver, MND_STREAM_VER_NUMBER);
×
82
    goto _over;
×
83
  }
84

85
  pRow = sdbAllocRow(sizeof(SStreamObj));
×
86
  TSDB_CHECK_NULL(pRow, code, lino, _over, terrno);
×
87

88
  pStream = sdbGetRowObj(pRow);
×
89
  TSDB_CHECK_NULL(pStream, code, lino, _over, terrno);
×
90

91
  SDB_GET_INT32(pRaw, dataPos, &tlen, _over);
×
92

93
  buf = taosMemoryMalloc(tlen + 1);
×
94
  TSDB_CHECK_NULL(buf, code, lino, _over, terrno);
×
95

96
  SDB_GET_BINARY(pRaw, dataPos, buf, tlen, _over);
×
97

98
  SDecoder decoder;
99
  tDecoderInit(&decoder, buf, tlen + 1);
×
100
  code = tDecodeSStreamObj(&decoder, pStream, sver);
×
101
  tDecoderClear(&decoder);
×
102

103
  if (code < 0) {
×
104
    tFreeStreamObj(pStream);
×
105
  }
106

107
_over:
×
108
  taosMemoryFreeClear(buf);
×
109

110
  if (code != TSDB_CODE_SUCCESS) {
×
111
    char *p = (pStream == NULL) ? "null" : pStream->pCreate->name;
×
112
    mError("stream:%s, failed to decode from raw:%p since %s at:%d", p, pRaw, tstrerror(code), lino);
×
113
    taosMemoryFreeClear(pRow);
×
114

115
    terrno = code;
×
116
    return NULL;
×
117
  } else {
118
    mTrace("stream:%s, decode from raw:%p, row:%p", pStream->pCreate->name, pRaw, pStream);
×
119

120
    terrno = 0;
×
121
    return pRow;
×
122
  }
123
}
124

125
static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream) {
×
126
  mTrace("stream:%s, perform insert action", pStream->pCreate->name);
×
127
  return 0;
×
128
}
129

130
static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream) {
×
131
  mInfo("stream:%s, perform delete action", pStream->pCreate->name);
×
132
  tFreeStreamObj(pStream);
×
133
  return 0;
×
134
}
135

136
static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pOldStream, SStreamObj *pNewStream) {
×
137
  mTrace("stream:%s, perform update action", pOldStream->pCreate->name);
×
138

139
  atomic_store_32(&pOldStream->mainSnodeId, pNewStream->mainSnodeId);
×
140
  atomic_store_8(&pOldStream->userStopped, atomic_load_8(&pNewStream->userStopped));
×
141
  pOldStream->updateTime = pNewStream->updateTime;
×
142
  
143
  return 0;
×
144
}
145

146
int32_t mndAcquireStream(SMnode *pMnode, char *streamName, SStreamObj **pStream) {
×
147
  int32_t code = 0;
×
148
  SSdb   *pSdb = pMnode->pSdb;
×
149
  (*pStream) = sdbAcquire(pSdb, SDB_STREAM, streamName);
×
150
  if ((*pStream) == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
×
151
    code = TSDB_CODE_MND_STREAM_NOT_EXIST;
×
152
  }
153
  return code;
×
154
}
155

156
static bool mndStreamGetNameFromId(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
×
157
  SStreamObj* pStream = pObj;
×
158

159
  if (pStream->pCreate->streamId == *(int64_t*)p1) {
×
160
    strncpy((char*)p2, pStream->name, TSDB_STREAM_NAME_LEN);
×
161
    return false;
×
162
  }
163

164
  return true;
×
165
}
166

167
int32_t mndAcquireStreamById(SMnode *pMnode, int64_t streamId, SStreamObj **pStream) {
×
168
  int32_t code = 0;
×
169
  SSdb   *pSdb = pMnode->pSdb;
×
170
  char streamName[TSDB_STREAM_NAME_LEN];
171
  streamName[0] = 0;
×
172
  
173
  sdbTraverse(pSdb, SDB_STREAM, mndStreamGetNameFromId, &streamId, streamName, NULL);
×
174
  if (streamName[0]) {
×
175
    (*pStream) = sdbAcquire(pSdb, SDB_STREAM, streamName);
×
176
    if ((*pStream) == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
×
177
      code = TSDB_CODE_MND_STREAM_NOT_EXIST;
×
178
    }
179
  }
180
  
181
  return code;
×
182
}
183

184
void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream) {
×
185
  SSdb *pSdb = pMnode->pSdb;
×
186
  sdbRelease(pSdb, pStream);
×
187
}
×
188

189
SSdbRaw *mndStreamSeqActionEncode(SStreamObj *pStream) { return NULL; }
×
190
SSdbRow *mndStreamSeqActionDecode(SSdbRaw *pRaw) { return NULL; }
×
191
int32_t  mndStreamSeqActionInsert(SSdb *pSdb, SStreamSeq *pStream) { return 0; }
×
192
int32_t  mndStreamSeqActionDelete(SSdb *pSdb, SStreamSeq *pStream) { return 0; }
×
193
int32_t  mndStreamSeqActionUpdate(SSdb *pSdb, SStreamSeq *pOldStream, SStreamSeq *pNewStream) { return 0; }
×
194

195
static int32_t mndStreamBuildObj(SMnode *pMnode, SStreamObj *pObj, SCMCreateStreamReq *pCreate, int32_t snodeId) {
×
196
  int32_t     code = 0;
×
197

198
  pObj->pCreate = pCreate;
×
199
  strncpy(pObj->name, pCreate->name, TSDB_STREAM_FNAME_LEN);
×
200
  pObj->mainSnodeId = snodeId;
×
201
  
202
  pObj->userDropped = 0;
×
203
  pObj->userStopped = 0;
×
204
  
205
  pObj->createTime = taosGetTimestampMs();
×
206
  pObj->updateTime = pObj->createTime;
×
207

208
  mstLogSStreamObj("create stream", pObj);
×
209

210
  return code;
×
211
}
212

213
static int32_t mndStreamCreateOutStb(SMnode *pMnode, STrans *pTrans, const SCMCreateStreamReq *pStream, const char *user) {
×
214
  SStbObj *pStb = NULL;
×
215
  SDbObj  *pDb = NULL;
×
216
  int32_t  code = 0;
×
217
  int32_t  lino = 0;
×
218

219
  SMCreateStbReq createReq = {0};
×
220
  TAOS_STRNCAT(createReq.name, pStream->outDB, TSDB_DB_FNAME_LEN);
×
221
  TAOS_STRNCAT(createReq.name, ".", 2);
×
222
  TAOS_STRNCAT(createReq.name,  pStream->outTblName, TSDB_TABLE_NAME_LEN);
×
223
  createReq.numOfColumns = taosArrayGetSize(pStream->outCols);
×
224
  createReq.numOfTags = pStream->outTags ? taosArrayGetSize(pStream->outTags) : 1;
×
225
  createReq.pColumns = taosArrayInit_s(sizeof(SFieldWithOptions), createReq.numOfColumns);
×
226
  TSDB_CHECK_NULL(createReq.pColumns, code, lino, _OVER, terrno);
×
227

228
  // build fields
229
  for (int32_t i = 0; i < createReq.numOfColumns; i++) {
×
230
    SFieldWithOptions *pField = taosArrayGet(createReq.pColumns, i);
×
231
    TSDB_CHECK_NULL(pField, code, lino, _OVER, terrno);
×
232
    SFieldWithOptions *pSrc = taosArrayGet(pStream->outCols, i);
×
233

234
    tstrncpy(pField->name, pSrc->name, TSDB_COL_NAME_LEN);
×
235
    pField->flags = 0;
×
236
    pField->type = pSrc->type;
×
237
    pField->bytes = pSrc->bytes;
×
238
    pField->compress = createDefaultColCmprByType(pField->type);
×
239
    if (IS_DECIMAL_TYPE(pField->type)) {
×
240
      pField->typeMod = pSrc->typeMod;
×
241
      pField->flags |= COL_HAS_TYPE_MOD;
×
242
    }
243
  }
244

245
  if (NULL == pStream->outTags) {
×
246
    createReq.numOfTags = 1;
×
247
    createReq.pTags = taosArrayInit_s(sizeof(SField), 1);
×
248
    TSDB_CHECK_NULL(createReq.pTags, code, lino, _OVER, terrno);
×
249

250
    // build tags
251
    SField *pField = taosArrayGet(createReq.pTags, 0);
×
252
    TSDB_CHECK_NULL(pField, code, lino, _OVER, terrno);
×
253

254
    tstrncpy(pField->name, "group_id", sizeof(pField->name));
×
255
    pField->type = TSDB_DATA_TYPE_UBIGINT;
×
256
    pField->flags = 0;
×
257
    pField->bytes = 8;
×
258
  } else {
259
    createReq.numOfTags = taosArrayGetSize(pStream->outTags);
×
260
    createReq.pTags = taosArrayInit_s(sizeof(SField), createReq.numOfTags);
×
261
    TSDB_CHECK_NULL(createReq.pTags, code, lino, _OVER, terrno);
×
262

263
    for (int32_t i = 0; i < createReq.numOfTags; i++) {
×
264
      SField *pField = taosArrayGet(createReq.pTags, i);
×
265
      if (pField == NULL) {
×
266
        continue;
×
267
      }
268

269
      TAOS_FIELD_E *pSrc = taosArrayGet(pStream->outTags, i);
×
270
      pField->bytes = pSrc->bytes;
×
271
      pField->flags = 0;
×
272
      pField->type = pSrc->type;
×
273
      tstrncpy(pField->name, pSrc->name, TSDB_COL_NAME_LEN);
×
274
    }
275
  }
276

277
  if ((code = mndCheckCreateStbReq(&createReq)) != 0) {
×
278
    goto _OVER;
×
279
  }
280

281
  pStb = mndAcquireStb(pMnode, createReq.name);
×
282
  if (pStb != NULL) {
×
283
    code = TSDB_CODE_MND_STB_ALREADY_EXIST;
×
284
    goto _OVER;
×
285
  }
286

287
  pDb = mndAcquireDbByStb(pMnode, createReq.name);
×
288
  if (pDb == NULL) {
×
289
    code = TSDB_CODE_MND_DB_NOT_SELECTED;
×
290
    goto _OVER;
×
291
  }
292

293
  int32_t numOfStbs = -1;
×
294
  if (mndGetNumOfStbs(pMnode, pDb->name, &numOfStbs) != 0) {
×
295
    goto _OVER;
×
296
  }
297

298
  if (pDb->cfg.numOfStables == 1 && numOfStbs != 0) {
×
299
    code = TSDB_CODE_MND_SINGLE_STB_MODE_DB;
×
300
    goto _OVER;
×
301
  }
302

303
  SStbObj stbObj = {0};
×
304

305
  if (mndBuildStbFromReq(pMnode, &stbObj, &createReq, pDb) != 0) {
×
306
    goto _OVER;
×
307
  }
308

309
  stbObj.uid = pStream->outStbUid;
×
310

311
  if (mndAddStbToTrans(pMnode, pTrans, pDb, &stbObj) < 0) {
×
312
    mndFreeStb(&stbObj);
×
313
    goto _OVER;
×
314
  }
315

316
  mDebug("stream:%s create dst stable:%s, cols:%d", pStream->name, pStream->outTblName, createReq.numOfColumns);
×
317

318
  tFreeSMCreateStbReq(&createReq);
×
319
  mndFreeStb(&stbObj);
×
320
  mndReleaseStb(pMnode, pStb);
×
321
  mndReleaseDb(pMnode, pDb);
×
322
  return code;
×
323

324
_OVER:
×
325
  tFreeSMCreateStbReq(&createReq);
×
326
  mndReleaseStb(pMnode, pStb);
×
327
  mndReleaseDb(pMnode, pDb);
×
328

329
  mDebug("stream:%s failed to create dst stable:%s, line:%d code:%s", pStream->name, pStream->outTblName, lino,
×
330
         tstrerror(code));
331
  return code;
×
332
}
333

334
static int32_t mndStreamValidateCreate(SMnode *pMnode, char* pUser, SCMCreateStreamReq* pCreate) {
×
335
  int32_t code = 0, lino = 0;
×
336
  int64_t streamId = pCreate->streamId;
×
337

338
  if (pCreate->streamDB) {
×
339
    code = mndCheckDbPrivilegeByName(pMnode, pUser, MND_OPER_WRITE_DB, pCreate->streamDB);
×
340
    if (code) {
×
341
      mstsError("user %s failed to create stream %s in db %s since %s", pUser, pCreate->name, pCreate->streamDB, tstrerror(code));
×
342
    }
343
    TSDB_CHECK_CODE(code, lino, _OVER);
×
344
  }
345

346
  if (pCreate->triggerDB) {
×
347
    code = mndCheckDbPrivilegeByName(pMnode, pUser, MND_OPER_READ_DB, pCreate->triggerDB);
×
348
    if (code) {
×
349
      mstsError("user %s failed to create stream %s using trigger db %s since %s", pUser, pCreate->name, pCreate->triggerDB, tstrerror(code));
×
350
    }
351
    TSDB_CHECK_CODE(code, lino, _OVER);
×
352
  }
353

354
  if (pCreate->calcDB) {
×
355
    int32_t dbNum = taosArrayGetSize(pCreate->calcDB);
×
356
    for (int32_t i = 0; i < dbNum; ++i) {
×
357
      char* calcDB = taosArrayGetP(pCreate->calcDB, i);
×
358
      code = mndCheckDbPrivilegeByName(pMnode, pUser, MND_OPER_READ_DB, calcDB);
×
359
      if (code) {
×
360
        mstsError("user %s failed to create stream %s using calcDB %s since %s", pUser, pCreate->name, calcDB, tstrerror(code));
×
361
      }
362
      TSDB_CHECK_CODE(code, lino, _OVER);
×
363
    }
364
  }
365

366
  if (pCreate->outDB) {
×
367
    code = mndCheckDbPrivilegeByName(pMnode, pUser, MND_OPER_WRITE_DB, pCreate->outDB);
×
368
    if (code) {
×
369
      mstsError("user %s failed to create stream %s using out db %s since %s", pUser, pCreate->name, pCreate->outDB, tstrerror(code));
×
370
    }
371
    TSDB_CHECK_CODE(code, lino, _OVER);
×
372
  }
373

374
  int32_t streamNum = sdbGetSize(pMnode->pSdb, SDB_STREAM);
×
375
  if (streamNum > MND_STREAM_MAX_NUM) {
×
376
    code = TSDB_CODE_MND_TOO_MANY_STREAMS;
×
377
    mstsError("failed to create stream %s since %s, stream number:%d", pCreate->name, tstrerror(code), streamNum);
×
378
    return code;
×
379
  }
380

381
_OVER:
×
382

383
  return code;
×
384
}
385

386
int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
8✔
387
  SSdb   *pSdb = pMnode->pSdb;
8✔
388
  void   *pIter = NULL;
8✔
389
  int32_t code = 0;
8✔
390

391
  while (1) {
×
392
    SStreamObj *pStream = NULL;
8✔
393
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
8✔
394
    if (pIter == NULL) break;
8!
395

396
    if (0 == strcmp(pStream->pCreate->streamDB, pDb->name)) {
×
397
      mInfo("start to drop stream %s in db %s", pStream->pCreate->name, pDb->name);
×
398
      
399
      pStream->updateTime = taosGetTimestampMs();
×
400
      
401
      atomic_store_8(&pStream->userDropped, 1);
×
402
      
403
      MND_STREAM_SET_LAST_TS(STM_EVENT_DROP_STREAM, pStream->updateTime);
×
404
      
405
      msmUndeployStream(pMnode, pStream->pCreate->streamId, pStream->pCreate->name);
×
406
      
407
      // drop stream
408
      code = mndStreamTransAppend(pStream, pTrans, SDB_STATUS_DROPPED);
×
409
      if (code) {
×
410
        mError("drop db trans:%d failed to append drop stream trans since %s", pTrans->id, tstrerror(code));
×
411
        sdbRelease(pSdb, pStream);
×
412
        sdbCancelFetch(pSdb, pIter);
×
413
        TAOS_RETURN(code);
×
414
      }
415
    }
416

417
    sdbRelease(pSdb, pStream);
×
418
  }
419

420
  return 0;
8✔
421
}
422

423
static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
×
424
  SMnode     *pMnode = pReq->info.node;
×
425
  SSdb       *pSdb = pMnode->pSdb;
×
426
  int32_t     numOfRows = 0;
×
427
  SStreamObj *pStream = NULL;
×
428
  int32_t     code = 0;
×
429

430
  while (numOfRows < rows) {
×
431
    pShow->pIter = sdbFetch(pSdb, SDB_STREAM, pShow->pIter, (void **)&pStream);
×
432
    if (pShow->pIter == NULL) break;
×
433

434
    code = mstSetStreamAttrResBlock(pMnode, pStream, pBlock, numOfRows);
×
435
    if (code == 0) {
×
436
      numOfRows++;
×
437
    }
438
    sdbRelease(pSdb, pStream);
×
439
  }
440

441
  pShow->numOfRows += numOfRows;
×
442
  return numOfRows;
×
443
}
444

445
static void mndCancelGetNextStream(SMnode *pMnode, void *pIter) {
×
446
  SSdb *pSdb = pMnode->pSdb;
×
447
  sdbCancelFetchByType(pSdb, pIter, SDB_STREAM);
×
448
}
×
449

450
static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
×
451
  SMnode     *pMnode = pReq->info.node;
×
452
  SSdb       *pSdb = pMnode->pSdb;
×
453
  int32_t     numOfRows = 0;
×
454
  SStreamObj *pStream = NULL;
×
455
  int32_t     code = 0;
×
456

457
  while (numOfRows < rowsCapacity) {
×
458
    pShow->pIter = sdbFetch(pSdb, SDB_STREAM, pShow->pIter, (void **)&pStream);
×
459
    if (pShow->pIter == NULL) {
×
460
      break;
×
461
    }
462

463
    code = mstSetStreamTasksResBlock(pStream, pBlock, &numOfRows, rowsCapacity);
×
464

465
    sdbRelease(pSdb, pStream);
×
466
  }
467

468
  pShow->numOfRows += numOfRows;
×
469
  return numOfRows;
×
470
}
471

472
static void mndCancelGetNextStreamTask(SMnode *pMnode, void *pIter) {
×
473
  SSdb *pSdb = pMnode->pSdb;
×
474
  sdbCancelFetchByType(pSdb, pIter, SDB_STREAM);
×
475
}
×
476

477
static int32_t mndRetrieveStreamRecalculates(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
×
478
  SMnode     *pMnode = pReq->info.node;
×
479
  SSdb       *pSdb = pMnode->pSdb;
×
480
  int32_t     numOfRows = 0;
×
481
  SStreamObj *pStream = NULL;
×
482
  int32_t     code = 0;
×
483

484
  while (numOfRows < rowsCapacity) {
×
485
    pShow->pIter = sdbFetch(pSdb, SDB_STREAM, pShow->pIter, (void **)&pStream);
×
486
    if (pShow->pIter == NULL) {
×
487
      break;
×
488
    }
489

490
    code = mstSetStreamRecalculatesResBlock(pStream, pBlock, &numOfRows, rowsCapacity);
×
491

492
    sdbRelease(pSdb, pStream);
×
493
  }
494

495
  pShow->numOfRows += numOfRows;
×
496
  return numOfRows;
×
497
}
498

499
static void mndCancelGetNextStreamRecalculates(SMnode *pMnode, void *pIter) {
×
500
  SSdb *pSdb = pMnode->pSdb;
×
501
  sdbCancelFetchByType(pSdb, pIter, SDB_STREAM);
×
502
}
×
503

504

505
static bool mndStreamUpdateTagsFlag(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
×
506
  SStreamObj *pStream = pObj;
×
507
  if (atomic_load_8(&pStream->userDropped)) {
×
508
    return true;
×
509
  }
510

511
  if (TSDB_SUPER_TABLE != pStream->pCreate->triggerTblType && 
×
512
      TSDB_CHILD_TABLE != pStream->pCreate->triggerTblType && 
×
513
      TSDB_VIRTUAL_CHILD_TABLE != pStream->pCreate->triggerTblType) {
×
514
    return true;
×
515
  }
516

517
  if (pStream->pCreate->triggerTblSuid != *(uint64_t*)p1) {
×
518
    return true;
×
519
  }
520

521
  if (NULL == pStream->pCreate->partitionCols) {
×
522
    return true;
×
523
  }
524

525
  SNodeList* pList = NULL;
×
526
  int32_t code = nodesStringToList(pStream->pCreate->partitionCols, &pList);
×
527
  if (code) {
×
528
    nodesDestroyList(pList);
×
529
    mstError("partitionCols [%s] nodesStringToList failed with error:%s", (char*)pStream->pCreate->partitionCols, tstrerror(code));
×
530
    return true;
×
531
  }
532

533
  SSchema* pTags = (SSchema*)p2;
×
534
  int32_t* tagNum = (int32_t*)p3;
×
535

536
  SNode* pNode = NULL;
×
537
  FOREACH(pNode, pList) {
×
538
    SColumnNode* pCol = (SColumnNode*)pNode;
×
539
    for (int32_t i = 0; i < *tagNum; ++i) {
×
540
      if (pCol->colId == pTags[i].colId) {
×
541
        pTags[i].flags |= COL_REF_BY_STM;
×
542
        break;
×
543
      }
544
    }
545
  }
546

547
  nodesDestroyList(pList);
×
548
  
549
  return true;
×
550
}
551

552

553
void mndStreamUpdateTagsRefFlag(SMnode *pMnode, int64_t suid, SSchema* pTags, int32_t tagNum) {
5✔
554
  int32_t streamNum = sdbGetSize(pMnode->pSdb, SDB_STREAM);
5✔
555
  if (streamNum <= 0) {
5!
556
    return;
5✔
557
  }
558

559
  sdbTraverse(pMnode->pSdb, SDB_STREAM, mndStreamUpdateTagsFlag, &suid, pTags, &tagNum);
×
560
}
561

562
static int32_t mndProcessStopStreamReq(SRpcMsg *pReq) {
×
563
  SMnode     *pMnode = pReq->info.node;
×
564
  SStreamObj *pStream = NULL;
×
565
  int32_t     code = 0;
×
566

567
  SMPauseStreamReq pauseReq = {0};
×
568
  if (tDeserializeSMPauseStreamReq(pReq->pCont, pReq->contLen, &pauseReq) < 0) {
×
569
    TAOS_RETURN(TSDB_CODE_INVALID_MSG);
×
570
  }
571

572
  code = mndAcquireStream(pMnode, pauseReq.name, &pStream);
×
573
  if (pStream == NULL || code != 0) {
×
574
    if (pauseReq.igNotExists) {
×
575
      mInfo("stream:%s, not exist, not stop stream", pauseReq.name);
×
576
      return 0;
×
577
    } else {
578
      mError("stream:%s not exist, failed to stop stream", pauseReq.name);
×
579
      TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
×
580
    }
581
  }
582

583
  int64_t streamId = pStream->pCreate->streamId;
×
584
  
585
  mstsInfo("start to stop stream %s", pauseReq.name);
×
586

587
  code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->pCreate->streamDB);
×
588
  if (code != TSDB_CODE_SUCCESS) {
×
589
    mstsError("user %s failed to stop stream %s since %s", pReq->info.conn.user, pauseReq.name, tstrerror(code));
×
590
    sdbRelease(pMnode->pSdb, pStream);
×
591
    return code;
×
592
  }
593

594
  if (atomic_load_8(&pStream->userDropped)) {
×
595
    code = TSDB_CODE_MND_STREAM_DROPPING;
×
596
    mstsError("user %s failed to stop stream %s since %s", pReq->info.conn.user, pauseReq.name, tstrerror(code));
×
597
    sdbRelease(pMnode->pSdb, pStream);
×
598
    return code;
×
599
  }
600

601
  STrans *pTrans = NULL;
×
602
  code = mndStreamCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_STOP_NAME, &pTrans);
×
603
  if (pTrans == NULL || code) {
×
604
    mstsError("failed to stop stream %s since %s", pauseReq.name, tstrerror(code));
×
605
    sdbRelease(pMnode->pSdb, pStream);
×
606
    return code;
×
607
  }
608

609
  pStream->updateTime = taosGetTimestampMs();
×
610

611
  atomic_store_8(&pStream->userStopped, 1);
×
612

613
  MND_STREAM_SET_LAST_TS(STM_EVENT_STOP_STREAM, pStream->updateTime);
×
614

615
  msmUndeployStream(pMnode, streamId, pStream->pCreate->name);
×
616

617
  // stop stream
618
  code = mndStreamTransAppend(pStream, pTrans, SDB_STATUS_READY);
×
619
  if (code != TSDB_CODE_SUCCESS) {
×
620
    sdbRelease(pMnode->pSdb, pStream);
×
621
    mndTransDrop(pTrans);
×
622
    return code;
×
623
  }
624

625
  code = mndTransPrepare(pMnode, pTrans);
×
626
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
×
627
    mError("trans:%d, failed to prepare stop stream trans since %s", pTrans->id, tstrerror(code));
×
628
    sdbRelease(pMnode->pSdb, pStream);
×
629
    mndTransDrop(pTrans);
×
630
    return code;
×
631
  }
632

633
  sdbRelease(pMnode->pSdb, pStream);
×
634
  mndTransDrop(pTrans);
×
635

636
  return TSDB_CODE_ACTION_IN_PROGRESS;
×
637
}
638

639

640
static int32_t mndProcessStartStreamReq(SRpcMsg *pReq) {
×
641
  SMnode     *pMnode = pReq->info.node;
×
642
  SStreamObj *pStream = NULL;
×
643
  int32_t     code = 0;
×
644

645
  if ((code = grantCheckExpire(TSDB_GRANT_STREAMS)) < 0) {
×
646
    return code;
×
647
  }
648

649
  SMResumeStreamReq resumeReq = {0};
×
650
  if (tDeserializeSMResumeStreamReq(pReq->pCont, pReq->contLen, &resumeReq) < 0) {
×
651
    TAOS_RETURN(TSDB_CODE_INVALID_MSG);
×
652
  }
653

654
  code = mndAcquireStream(pMnode, resumeReq.name, &pStream);
×
655
  if (pStream == NULL || code != 0) {
×
656
    if (resumeReq.igNotExists) {
×
657
      mInfo("stream:%s not exist, not start stream", resumeReq.name);
×
658
      sdbRelease(pMnode->pSdb, pStream);
×
659
      return 0;
×
660
    } else {
661
      mError("stream:%s not exist, failed to start stream", resumeReq.name);
×
662
      TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
×
663
    }
664
  }
665

666
  int64_t streamId = pStream->pCreate->streamId;
×
667

668
  mstsInfo("start to start stream %s from stopped", resumeReq.name);
×
669

670
  code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->pCreate->streamDB);
×
671
  if (code != TSDB_CODE_SUCCESS) {
×
672
    mstsError("user %s failed to start stream %s since %s", pReq->info.conn.user, resumeReq.name, tstrerror(code));
×
673
    sdbRelease(pMnode->pSdb, pStream);
×
674
    return code;
×
675
  }
676

677
  if (atomic_load_8(&pStream->userDropped)) {
×
678
    code = TSDB_CODE_MND_STREAM_DROPPING;
×
679
    mstsError("user %s failed to start stream %s since %s", pReq->info.conn.user, resumeReq.name, tstrerror(code));
×
680
    sdbRelease(pMnode->pSdb, pStream);
×
681
    return code;
×
682
  }
683

684
  if (0 == atomic_load_8(&pStream->userStopped)) {
×
685
    code = TSDB_CODE_MND_STREAM_NOT_STOPPED;
×
686
    mstsError("user %s failed to start stream %s since %s", pReq->info.conn.user, resumeReq.name, tstrerror(code));
×
687
    sdbRelease(pMnode->pSdb, pStream);
×
688
    return code;
×
689
  }
690
  
691
  atomic_store_8(&pStream->userStopped, 0);
×
692

693
  pStream->updateTime = taosGetTimestampMs();
×
694

695
  MND_STREAM_SET_LAST_TS(STM_EVENT_START_STREAM, pStream->updateTime);
×
696

697
  STrans *pTrans = NULL;
×
698
  code = mndStreamCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_START_NAME, &pTrans);
×
699
  if (pTrans == NULL || code) {
×
700
    mstsError("failed to start stream %s since %s", resumeReq.name, tstrerror(code));
×
701
    sdbRelease(pMnode->pSdb, pStream);
×
702
    return code;
×
703
  }
704

705
  code = mndStreamTransAppend(pStream, pTrans, SDB_STATUS_READY);
×
706
  if (code != TSDB_CODE_SUCCESS) {
×
707
    mstsError("failed to start stream %s since %s", resumeReq.name, tstrerror(code));
×
708
    sdbRelease(pMnode->pSdb, pStream);
×
709
    mndTransDrop(pTrans);
×
710
    return code;
×
711
  }
712

713
  code = mndTransPrepare(pMnode, pTrans);
×
714
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
×
715
    mstsError("trans:%d, failed to prepare start stream %s trans since %s", pTrans->id, resumeReq.name, tstrerror(code));
×
716
    sdbRelease(pMnode->pSdb, pStream);
×
717
    mndTransDrop(pTrans);
×
718
    return code;
×
719
  }
720

721
  mstPostStreamAction(mStreamMgmt.actionQ, streamId, pStream->pCreate->name, NULL, true, STREAM_ACT_DEPLOY);
×
722

723
  sdbRelease(pMnode->pSdb, pStream);
×
724
  mndTransDrop(pTrans);
×
725

726
  return TSDB_CODE_ACTION_IN_PROGRESS;
×
727
}
728

729

730
static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
×
731
  SMnode     *pMnode = pReq->info.node;
×
732
  SStreamObj *pStream = NULL;
×
733
  int32_t     code = 0;
×
734

735
  SMDropStreamReq dropReq = {0};
×
736
  if (tDeserializeSMDropStreamReq(pReq->pCont, pReq->contLen, &dropReq) < 0) {
×
737
    mError("invalid drop stream msg recv, discarded");
×
738
    code = TSDB_CODE_INVALID_MSG;
×
739
    TAOS_RETURN(code);
×
740
  }
741

742
  mDebug("recv drop stream:%s msg", dropReq.name);
×
743

744
  code = mndAcquireStream(pMnode, dropReq.name, &pStream);
×
745
  if (pStream == NULL || code != 0) {
×
746
    if (dropReq.igNotExists) {
×
747
      mInfo("stream:%s not exist, ignore not exist is set, drop stream exec done with success", dropReq.name);
×
748
      sdbRelease(pMnode->pSdb, pStream);
×
749
      tFreeMDropStreamReq(&dropReq);
×
750
      return 0;
×
751
    } else {
752
      mError("stream:%s not exist failed to drop it", dropReq.name);
×
753
      tFreeMDropStreamReq(&dropReq);
×
754
      TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
×
755
    }
756
  }
757

758
  int64_t streamId = pStream->pCreate->streamId;
×
759

760
  code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->pCreate->streamDB);
×
761
  if (code != 0) {
×
762
    mstsError("user %s failed to drop stream %s since %s", pReq->info.conn.user, dropReq.name, tstrerror(code));
×
763
    sdbRelease(pMnode->pSdb, pStream);
×
764
    tFreeMDropStreamReq(&dropReq);
×
765
    return code;
×
766
  }
767

768
  if (pStream->pCreate->tsmaId != 0) {
×
769
    mstsDebug("try to drop tsma related stream, tsmaId:%" PRIx64, pStream->pCreate->tsmaId);
×
770

771
    void    *pIter = NULL;
×
772
    SSmaObj *pSma = NULL;
×
773
    pIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter, (void **)&pSma);
×
774
    while (pIter) {
×
775
      if (pSma && pSma->uid == pStream->pCreate->tsmaId) {
×
776
        sdbRelease(pMnode->pSdb, pSma);
×
777
        sdbRelease(pMnode->pSdb, pStream);
×
778

779
        sdbCancelFetch(pMnode->pSdb, pIter);
×
780
        tFreeMDropStreamReq(&dropReq);
×
781
        code = TSDB_CODE_TSMA_MUST_BE_DROPPED;
×
782

783
        mstsError("refused to drop tsma-related stream %s since tsma still exists", dropReq.name);
×
784
        TAOS_RETURN(code);
×
785
      }
786

787
      if (pSma) {
×
788
        sdbRelease(pMnode->pSdb, pSma);
×
789
      }
790

791
      pIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter, (void **)&pSma);
×
792
    }
793
  }
794

795
  mstsInfo("start to drop stream %s", pStream->pCreate->name);
×
796

797
  pStream->updateTime = taosGetTimestampMs();
×
798

799
  atomic_store_8(&pStream->userDropped, 1);
×
800

801
  MND_STREAM_SET_LAST_TS(STM_EVENT_DROP_STREAM, pStream->updateTime);
×
802

803
  msmUndeployStream(pMnode, streamId, pStream->pCreate->name);
×
804

805
  STrans *pTrans = NULL;
×
806
  code = mndStreamCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_DROP_NAME, &pTrans);
×
807
  if (pTrans == NULL || code) {
×
808
    mstsError("failed to drop stream %s since %s", dropReq.name, tstrerror(code));
×
809
    sdbRelease(pMnode->pSdb, pStream);
×
810
    tFreeMDropStreamReq(&dropReq);
×
811
    TAOS_RETURN(code);
×
812
  }
813

814
  // drop stream
815
  code = mndStreamTransAppend(pStream, pTrans, SDB_STATUS_DROPPED);
×
816
  if (code) {
×
817
    mstsError("trans:%d, failed to append drop stream trans since %s", pTrans->id, tstrerror(code));
×
818
    sdbRelease(pMnode->pSdb, pStream);
×
819
    mndTransDrop(pTrans);
×
820
    tFreeMDropStreamReq(&dropReq);
×
821
    TAOS_RETURN(code);
×
822
  }
823

824
  code = mndTransPrepare(pMnode, pTrans);
×
825
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
×
826
    mstsError("trans:%d, failed to prepare drop stream trans since %s", pTrans->id, tstrerror(code));
×
827
    sdbRelease(pMnode->pSdb, pStream);
×
828
    mndTransDrop(pTrans);
×
829
    tFreeMDropStreamReq(&dropReq);
×
830
    TAOS_RETURN(code);
×
831
  }
832

833
  auditRecord(pReq, pMnode->clusterId, "dropStream", "", pStream->pCreate->streamDB, NULL, 0);
×
834

835
  sdbRelease(pMnode->pSdb, pStream);
×
836
  mndTransDrop(pTrans);
×
837

838
  mstsDebug("drop stream %s half completed", dropReq.name);
×
839
  code = TSDB_CODE_ACTION_IN_PROGRESS;
×
840

841
  tFreeMDropStreamReq(&dropReq);
×
842
  
843
  TAOS_RETURN(code);
×
844
}
845

846
static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
×
847
  SMnode     *pMnode = pReq->info.node;
×
848
  SStreamObj *pStream = NULL;
×
849
  SStreamObj  streamObj = {0};
×
850
  int32_t     code = TSDB_CODE_SUCCESS;
×
851
  int32_t     lino = 0;
×
852
  STrans     *pTrans = NULL;
×
853
  uint64_t    streamId = 0;
×
854
  SCMCreateStreamReq* pCreate = NULL;
×
855

856
#ifdef WINDOWS
857
  code = TSDB_CODE_MND_INVALID_PLATFORM;
858
  goto _OVER;
859
#endif
860

861
  pCreate = taosMemoryCalloc(1, sizeof(SCMCreateStreamReq));
×
862
  TSDB_CHECK_NULL(pCreate, code, lino, _OVER, terrno);
×
863
  
864
  code = tDeserializeSCMCreateStreamReq(pReq->pCont, pReq->contLen, pCreate);
×
865
  TSDB_CHECK_CODE(code, lino, _OVER);
×
866

867
  streamId = pCreate->streamId;
×
868

869
  mstsInfo("start to create stream %s, sql:%s", pCreate->name, pCreate->sql);
×
870

871
  int32_t snodeId = msmAssignRandomSnodeId(pMnode, streamId);
×
872
  if (!GOT_SNODE(snodeId)) {
×
873
    code = terrno;
×
874
    TSDB_CHECK_CODE(code, lino, _OVER);
×
875
  }
876
  
877
  code = mndAcquireStream(pMnode, pCreate->name, &pStream);
×
878
  if (pStream != NULL && code == 0) {
×
879
    if (pCreate->igExists) {
×
880
      mstsInfo("stream %s already exist, ignore exist is set", pCreate->name);
×
881
    } else {
882
      code = TSDB_CODE_MND_STREAM_ALREADY_EXIST;
×
883
    }
884

885
    mndReleaseStream(pMnode, pStream);
×
886
    goto _OVER;
×
887
  } else if (code != TSDB_CODE_MND_STREAM_NOT_EXIST) {
×
888
    goto _OVER;
×
889
  }
890

891
  if ((code = grantCheck(TSDB_GRANT_STREAMS)) < 0) {
×
892
    goto _OVER;
×
893
  }
894

895
  code = mndStreamValidateCreate(pMnode, pReq->info.conn.user, pCreate);
×
896
  TSDB_CHECK_CODE(code, lino, _OVER);
×
897

898
  code = mndStreamBuildObj(pMnode, &streamObj, pCreate, snodeId);
×
899
  TSDB_CHECK_CODE(code, lino, _OVER);
×
900

901
  pStream = &streamObj;
×
902

903
  code = mndStreamCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_DB, MND_STREAM_CREATE_NAME, &pTrans);
×
904
  if (pTrans == NULL || code) {
×
905
    goto _OVER;
×
906
  }
907

908
  // create stb for stream
909
  if (TSDB_SUPER_TABLE == pCreate->outTblType && !pCreate->outStbExists) {
×
910
    pCreate->outStbUid = mndGenerateUid(pCreate->outTblName, strlen(pCreate->outTblName));
×
911
    code = mndStreamCreateOutStb(pMnode, pTrans, pStream->pCreate, pReq->info.conn.user);
×
912
    TSDB_CHECK_CODE(code, lino, _OVER);
×
913
  }
914

915
  // add stream to trans
916
  code = mndStreamTransAppend(pStream, pTrans, SDB_STATUS_READY);
×
917
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
×
918
    mstsError("failed to persist stream %s since %s", pCreate->name, tstrerror(code));
×
919
    goto _OVER;
×
920
  }
921

922
  // execute creation
923
  code = mndTransPrepare(pMnode, pTrans);
×
924
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
×
925
    mstsError("trans:%d, failed to prepare since %s", pTrans->id, tstrerror(code));
×
926
    goto _OVER;
×
927
  }
928

929
  auditRecord(pReq, pMnode->clusterId, "createStream", pCreate->streamDB, pCreate->name, pCreate->sql, strlen(pCreate->sql));
×
930

931
  MND_STREAM_SET_LAST_TS(STM_EVENT_CREATE_STREAM, taosGetTimestampMs());
×
932

933
  mstPostStreamAction(mStreamMgmt.actionQ, streamId, pStream->pCreate->name, NULL, true, STREAM_ACT_DEPLOY);
×
934

935
_OVER:
×
936

937
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
×
938
    mstsError("failed to create stream %s at line:%d since %s", pCreate ? pCreate->name : "unknown", lino, tstrerror(code));
×
939
  } else {
940
    mstsDebug("create stream %s half completed", pCreate ? pCreate->name : "unknown");
×
941
    code = TSDB_CODE_ACTION_IN_PROGRESS;
×
942
  }
943

944
  mndTransDrop(pTrans);
×
945
  tFreeStreamObj(&streamObj);
×
946

947
  return code;
×
948
}
949

950
static int32_t mndProcessRecalcStreamReq(SRpcMsg *pReq) {
×
951
  SMnode     *pMnode = pReq->info.node;
×
952
  SStreamObj *pStream = NULL;
×
953
  int32_t     code = 0;
×
954

955
  SMRecalcStreamReq recalcReq = {0};
×
956
  if (tDeserializeSMRecalcStreamReq(pReq->pCont, pReq->contLen, &recalcReq) < 0) {
×
957
    tFreeMRecalcStreamReq(&recalcReq);
×
958
    TAOS_RETURN(TSDB_CODE_INVALID_MSG);
×
959
  }
960

961
  code = mndAcquireStream(pMnode, recalcReq.name, &pStream);
×
962
  if (pStream == NULL || code != 0) {
×
963
    mError("stream:%s not exist, failed to recalc stream", recalcReq.name);
×
964
    tFreeMRecalcStreamReq(&recalcReq);
×
965
    TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
×
966
  }
967

968
  int64_t streamId = pStream->pCreate->streamId;
×
969
  
970
  mstsInfo("start to recalc stream %s", recalcReq.name);
×
971

972
  code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->pCreate->streamDB);
×
973
  if (code != TSDB_CODE_SUCCESS) {
×
974
    mstsError("user %s failed to recalc stream %s since %s", pReq->info.conn.user, recalcReq.name, tstrerror(code));
×
975
    sdbRelease(pMnode->pSdb, pStream);
×
976
    tFreeMRecalcStreamReq(&recalcReq);
×
977
    return code;
×
978
  }
979

980
  if (atomic_load_8(&pStream->userDropped)) {
×
981
    code = TSDB_CODE_MND_STREAM_DROPPING;
×
982
    mstsError("user %s failed to recalc stream %s since %s", pReq->info.conn.user, recalcReq.name, tstrerror(code));
×
983
    sdbRelease(pMnode->pSdb, pStream);
×
984
    tFreeMRecalcStreamReq(&recalcReq);
×
985
    return code;
×
986
  }
987

988
  if (atomic_load_8(&pStream->userStopped)) {
×
989
    code = TSDB_CODE_MND_STREAM_STOPPED;
×
990
    mstsError("user %s failed to recalc stream %s since %s", pReq->info.conn.user, recalcReq.name, tstrerror(code));
×
991
    sdbRelease(pMnode->pSdb, pStream);
×
992
    tFreeMRecalcStreamReq(&recalcReq);
×
993
    return code;
×
994
  }
995

996
  if (WINDOW_TYPE_PERIOD == pStream->pCreate->triggerType) {
×
997
    code = TSDB_CODE_OPS_NOT_SUPPORT;
×
998
    mstsError("failed to recalc stream %s since %s", recalcReq.name, tstrerror(code));
×
999
    sdbRelease(pMnode->pSdb, pStream);
×
1000
    tFreeMRecalcStreamReq(&recalcReq);
×
1001
    return code;
×
1002
  }
1003

1004
  /*
1005
  pStream->updateTime = taosGetTimestampMs();
1006

1007
  STrans *pTrans = NULL;
1008
  code = mndStreamCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_RECALC_NAME, &pTrans);
1009
  if (pTrans == NULL || code) {
1010
    mstsError("failed to recalc stream %s since %s", recalcReq.name, tstrerror(code));
1011
    sdbRelease(pMnode->pSdb, pStream);
1012
    return code;
1013
  }
1014

1015
  // stop stream
1016
  code = mndStreamTransAppend(pStream, pTrans, SDB_STATUS_READY);
1017
  if (code != TSDB_CODE_SUCCESS) {
1018
    sdbRelease(pMnode->pSdb, pStream);
1019
    mndTransDrop(pTrans);
1020
    return code;
1021
  }
1022

1023
  code = mndTransPrepare(pMnode, pTrans);
1024
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
1025
    mError("trans:%d, failed to prepare stop stream trans since %s", pTrans->id, tstrerror(code));
1026
    sdbRelease(pMnode->pSdb, pStream);
1027
    mndTransDrop(pTrans);
1028
    return code;
1029
  }
1030
*/
1031

1032
  code = msmRecalcStream(pMnode, pStream->pCreate->streamId, &recalcReq.timeRange);
×
1033
  if (code != TSDB_CODE_SUCCESS) {
×
1034
    sdbRelease(pMnode->pSdb, pStream);
×
1035
    tFreeMRecalcStreamReq(&recalcReq);
×
1036
    return code;
×
1037
  }
1038
  
1039
  char buf[128];
1040
  snprintf(buf, sizeof(buf), "start:%" PRId64 ", end:%" PRId64, recalcReq.timeRange.skey, recalcReq.timeRange.ekey);
×
1041
  auditRecord(pReq, pMnode->clusterId, "recalcStream", pStream->name, recalcReq.name, buf, strlen(buf));
×
1042

1043
  sdbRelease(pMnode->pSdb, pStream);
×
1044
  tFreeMRecalcStreamReq(&recalcReq);
×
1045
//  mndTransDrop(pTrans);
1046

1047
  return TSDB_CODE_SUCCESS;
×
1048
}
1049

1050

1051
int32_t mndInitStream(SMnode *pMnode) {
15✔
1052
  SSdbTable table = {
15✔
1053
      .sdbType = SDB_STREAM,
1054
      .keyType = SDB_KEY_BINARY,
1055
      .encodeFp = (SdbEncodeFp)mndStreamActionEncode,
1056
      .decodeFp = (SdbDecodeFp)mndStreamActionDecode,
1057
      .insertFp = (SdbInsertFp)mndStreamActionInsert,
1058
      .updateFp = (SdbUpdateFp)mndStreamActionUpdate,
1059
      .deleteFp = (SdbDeleteFp)mndStreamActionDelete,
1060
  };
1061
/*
1062
  SSdbTable tableSeq = {
1063
      .sdbType = SDB_STREAM_SEQ,
1064
      .keyType = SDB_KEY_BINARY,
1065
      .encodeFp = (SdbEncodeFp)mndStreamSeqActionEncode,
1066
      .decodeFp = (SdbDecodeFp)mndStreamSeqActionDecode,
1067
      .insertFp = (SdbInsertFp)mndStreamSeqActionInsert,
1068
      .updateFp = (SdbUpdateFp)mndStreamSeqActionUpdate,
1069
      .deleteFp = (SdbDeleteFp)mndStreamSeqActionDelete,
1070
  };
1071
*/
1072
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STREAM, mndProcessCreateStreamReq);
15✔
1073
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM, mndProcessDropStreamReq);
15✔
1074
  mndSetMsgHandle(pMnode, TDMT_MND_START_STREAM, mndProcessStartStreamReq);
15✔
1075
  mndSetMsgHandle(pMnode, TDMT_MND_STOP_STREAM, mndProcessStopStreamReq);
15✔
1076
  mndSetMsgHandle(pMnode, TDMT_MND_STREAM_HEARTBEAT, mndProcessStreamHb);  
15✔
1077
  mndSetMsgHandle(pMnode, TDMT_MND_RECALC_STREAM, mndProcessRecalcStreamReq);
15✔
1078

1079
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndRetrieveStream);
15✔
1080
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndCancelGetNextStream);
15✔
1081
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndRetrieveStreamTask);
15✔
1082
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndCancelGetNextStreamTask);
15✔
1083
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAM_RECALCULATES, mndRetrieveStreamRecalculates);
15✔
1084
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAM_RECALCULATES, mndCancelGetNextStreamRecalculates);
15✔
1085

1086
  int32_t code = sdbSetTable(pMnode->pSdb, table);
15✔
1087
  if (code) {
15!
1088
    return code;
×
1089
  }
1090

1091
  //code = sdbSetTable(pMnode->pSdb, tableSeq);
1092
  return code;
15✔
1093
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc