• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4788

14 Oct 2025 11:21AM UTC coverage: 60.992% (-2.3%) from 63.264%
#4788

push

travis-ci

web-flow
Merge 7ca9b50f9 into 19574fe21

154868 of 324306 branches covered (47.75%)

Branch coverage included in aggregate %.

207304 of 269498 relevant lines covered (76.92%)

125773493.22 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.01
/source/dnode/mnode/impl/src/mndStreamUtil.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "mndDb.h"
17
#include "mndStb.h"
18
#include "mndStream.h"
19
#include "mndTrans.h"
20
#include "mndVgroup.h"
21
#include "taoserror.h"
22
#include "tmisce.h"
23
#include "mndSnode.h"
24

25
/*
 * Block until `pLock` has been acquired.
 *
 * readLock == true : keep calling taosRTryLockLatch() until it returns 0,
 *                    sleeping 1 ms (taosMsleep(1)) between attempts.
 * readLock == false: take the latch through taosWWaitLockLatch().
 *
 * Always returns true.
 */
bool mstWaitLock(SRWLatch* pLock, bool readLock) {
  if (!readLock) {
    taosWWaitLockLatch(pLock);
    return true;
  }

  for (;;) {
    if (!taosRTryLockLatch(pLock)) {
      break;
    }
    taosMsleep(1);
  }

  return true;
}
38

39
void mstDestroySStmVgStreamStatus(void* p) { 
443,651✔
40
  SStmVgStreamStatus* pStatus = (SStmVgStreamStatus*)p;
443,651✔
41
  taosArrayDestroy(pStatus->trigReaders); 
443,651✔
42
  taosArrayDestroy(pStatus->calcReaders); 
443,651✔
43
}
443,651✔
44

45
void mstDestroySStmSnodeStreamStatus(void* p) { 
220,469✔
46
  SStmSnodeStreamStatus* pStatus = (SStmSnodeStreamStatus*)p;
220,469✔
47
  for (int32_t i = 0; i < MND_STREAM_RUNNER_DEPLOY_NUM; ++i) {
881,876✔
48
    taosArrayDestroy(pStatus->runners[i]);
661,407✔
49
    pStatus->runners[i] = NULL;
661,407✔
50
  }
51
}
220,469✔
52

53

54
/*
 * Release the streamTasks hash owned by `pVgStatus` and clear the field so
 * the pointer cannot be reused. `pVgStatus` itself is not freed.
 */
void mstDestroyVgroupStatus(SStmVgroupStatus* pVgStatus) {
  SHashObj** ppTasks = (SHashObj**)&pVgStatus->streamTasks;  // NOTE(review): assumes streamTasks is an SHashObj* — confirm

  taosHashCleanup(*ppTasks);
  *ppTasks = NULL;
}
×
58

59
void mstDestroySStmTaskToDeployExt(void* param) {
1,428,016✔
60
  SStmTaskToDeployExt* pExt = (SStmTaskToDeployExt*)param;
1,428,016✔
61
  if (pExt->deployed) {
1,428,016✔
62
    return;
1,424,632✔
63
  }
64
  
65
  switch (pExt->deploy.task.type) {
3,384✔
66
    case STREAM_TRIGGER_TASK:
564✔
67
      taosArrayDestroy(pExt->deploy.msg.trigger.readerList);
564✔
68
      pExt->deploy.msg.trigger.readerList = NULL;
564✔
69
      taosArrayDestroy(pExt->deploy.msg.trigger.runnerList);
564✔
70
      pExt->deploy.msg.trigger.runnerList = NULL;
564✔
71
      break;
564✔
72
    case STREAM_RUNNER_TASK:
1,692✔
73
      taosMemoryFreeClear(pExt->deploy.msg.runner.pPlan);
1,692!
74
      break;
1,692✔
75
    default:  
1,128✔
76
      break;;
1,128✔
77
  }
78
}
79

80
void mstDestroyScanAddrList(void* param) {
349,614✔
81
  if (NULL == param) {
349,614!
82
    return;
×
83
  }
84
  SArray* pList = *(SArray**)param;
349,614✔
85
  taosArrayDestroy(pList);
349,614✔
86
}
87

88
void mstDestroySStmSnodeTasksDeploy(void* param) {
57,366✔
89
  SStmSnodeTasksDeploy* pSnode = (SStmSnodeTasksDeploy*)param;
57,366✔
90
  taosArrayDestroyEx(pSnode->triggerList, mstDestroySStmTaskToDeployExt);
57,366✔
91
  taosArrayDestroyEx(pSnode->runnerList, mstDestroySStmTaskToDeployExt);
57,366✔
92
}
57,366✔
93

94
void mstDestroySStmVgTasksToDeploy(void* param) {
153,415✔
95
  SStmVgTasksToDeploy* pVg = (SStmVgTasksToDeploy*)param;
153,415✔
96
  taosArrayDestroyEx(pVg->taskList, mstDestroySStmTaskToDeployExt);
153,415✔
97
}
153,415✔
98

99
void mstDestroySStmSnodeStatus(void* param) {
51,317✔
100
  SStmSnodeStatus* pSnode = (SStmSnodeStatus*)param;
51,317✔
101
  taosHashCleanup(pSnode->streamTasks);
51,317✔
102
}
51,317✔
103

104
void mstDestroySStmVgroupStatus(void* param) {
116,083✔
105
  SStmVgroupStatus* pVg = (SStmVgroupStatus*)param;
116,083✔
106
  taosHashCleanup(pVg->streamTasks);
116,083✔
107
}
116,083✔
108

109
/*
 * Drop every task-tracking structure held by `pStatus` and reset the fields:
 *   - trigReaders / trigOReaders arrays
 *   - calcReaders list (replaced by the return value of tdListFree)
 *   - triggerTask, including its detailStatus buffer
 *   - all MND_STREAM_RUNNER_DEPLOY_NUM runner arrays
 *
 * streamName, pCreate and userRecalcList are not touched here.
 */
void mstResetSStmStatus(SStmStatus* pStatus) {
  taosArrayDestroy(pStatus->trigReaders);
  pStatus->trigReaders = NULL;

  taosArrayDestroy(pStatus->trigOReaders);
  pStatus->trigOReaders = NULL;

  pStatus->calcReaders = tdListFree(pStatus->calcReaders);

  if (NULL != pStatus->triggerTask) {
    // detailStatus is released while holding the write side of its latch
    (void)mstWaitLock(&pStatus->triggerTask->detailStatusLock, false);
    taosMemoryFreeClear(pStatus->triggerTask->detailStatus);
    taosWUnLockLatch(&pStatus->triggerTask->detailStatusLock);
  }
  taosMemoryFreeClear(pStatus->triggerTask);

  int32_t deployIdx = 0;
  while (deployIdx < MND_STREAM_RUNNER_DEPLOY_NUM) {
    taosArrayDestroy(pStatus->runners[deployIdx]);
    pStatus->runners[deployIdx] = NULL;
    ++deployIdx;
  }
}
209,661✔
126

127
void mstDestroySStmStatus(void* param) {
201,835✔
128
  SStmStatus* pStatus = (SStmStatus*)param;
201,835✔
129
  taosMemoryFreeClear(pStatus->streamName);
201,835!
130

131
  mstResetSStmStatus(pStatus);
201,835✔
132

133
  taosWLockLatch(&pStatus->userRecalcLock);
201,835✔
134
  taosArrayDestroy(pStatus->userRecalcList);
201,835✔
135
  taosWUnLockLatch(&pStatus->userRecalcLock);
201,835✔
136

137
  tFreeSCMCreateStreamReq(pStatus->pCreate);
201,835✔
138
  taosMemoryFreeClear(pStatus->pCreate);  
201,835!
139
}
201,835✔
140

141
void mstDestroySStmAction(void* param) {
306,732✔
142
  SStmAction* pAction = (SStmAction*)param;
306,732✔
143

144
  taosArrayDestroy(pAction->undeploy.taskList);
306,732✔
145
  taosArrayDestroy(pAction->recalc.recalcList);
306,732✔
146
}
306,732✔
147

148
/*
 * Forget (without freeing) the reader, trigger and runner task references
 * held by `pDeploy` by setting the three pointers to NULL.
 */
void mstClearSStmStreamDeploy(SStmStreamDeploy* pDeploy) {
  pDeploy->runnerTasks = NULL;
  pDeploy->triggerTask = NULL;
  pDeploy->readerTasks = NULL;
}
280,103✔
153

154
/*
 * Report whether the stream identified by `streamId` is (being) dropped.
 *
 * Walks every SDB_STREAM object:
 *   - if a stream with a matching pCreate->streamId is found, *dropped is set
 *     from its userDropped flag;
 *   - if no stream matches, *dropped is set to true.
 *
 * Streams whose pCreate is NULL cannot match and are skipped.
 *
 * @param pMnode   mnode whose sdb is scanned
 * @param streamId stream to look for (also used by the mstsDebug log macro)
 * @param dropped  out: result flag
 * @return always TSDB_CODE_SUCCESS
 */
int32_t mstIsStreamDropped(SMnode *pMnode, int64_t streamId, bool* dropped) {
  SSdb   *pSdb = pMnode->pSdb;
  void   *pIter = NULL;

  while (1) {
    SStreamObj *pStream = NULL;
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
    if (pIter == NULL) break;

    if (NULL != pStream->pCreate && pStream->pCreate->streamId == streamId) {
      // userDropped is read atomically everywhere else in this file; do the same here.
      *dropped = (0 != atomic_load_8(&pStream->userDropped));
      sdbRelease(pSdb, pStream);
      sdbCancelFetch(pSdb, pIter);  // iteration stops early, so the cursor must be cancelled
      mstsDebug("stream found, dropped:%d", *dropped);
      return TSDB_CODE_SUCCESS;
    }

    sdbRelease(pSdb, pStream);
  }

  // not present in sdb at all: treat as dropped
  *dropped = true;

  return TSDB_CODE_SUCCESS;
}
178

179
/* Traversal context handed to mstChkSetDbInUse() through sdbTraverse(). */
typedef struct SStmCheckDbInUseCtx {
  bool *dbStream;      // out: set to true when a stream's trigger DB or a calc DB equals the checked DB
  bool *vtableStream;  // out: set to true when a visited stream involves virtual tables
  bool  ignoreCurrDb;  // in: skip streams whose own streamDB equals the checked DB
} SStmCheckDbInUseCtx;
184

185
/*
 * sdbTraverse() callback used by mstCheckDbInUse().
 *
 * @param pObj the visited SStreamObj
 * @param p1   full name of the database being checked (C string)
 * @param p2   SStmCheckDbInUseCtx with the output flags
 * @param p3   unused
 * @return false once the stream is found to use the database as trigger DB or
 *         calc DB (*dbStream is set); true in every other case. Presumably
 *         the return value tells sdbTraverse whether to keep iterating —
 *         TODO confirm against sdbTraverse.
 *
 * Streams flagged userDropped are ignored. When ctx->ignoreCurrDb is set,
 * streams that live in the checked database are ignored as well.
 */
static bool mstChkSetDbInUse(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
  SStreamObj *pStream = pObj;
  if (atomic_load_8(&pStream->userDropped)) {
    return true;
  }

  const char          *dbFName = p1;
  SStmCheckDbInUseCtx *pCtx = p2;
  SCMCreateStreamReq  *pReq = pStream->pCreate;

  if (pCtx->ignoreCurrDb && 0 == strcmp(pReq->streamDB, dbFName)) {
    return true;
  }

  if (pReq->triggerDB && 0 == strcmp(pReq->triggerDB, dbFName)) {
    *pCtx->dbStream = true;
    return false;
  }

  int32_t numOfCalcDb = taosArrayGetSize(pReq->calcDB);
  for (int32_t idx = 0; idx < numOfCalcDb; ++idx) {
    char *calcDbName = taosArrayGetP(pReq->calcDB, idx);
    if (0 == strcmp(calcDbName, dbFName)) {
      *pCtx->dbStream = true;
      return false;
    }
  }

  if (pReq->vtableCalc || STREAM_IS_VIRTUAL_TABLE(pReq->triggerTblType, pReq->flags)) {
    *pCtx->vtableStream = true;
  }

  return true;
}
217

218
/*
 * Scan all streams and report how database `dbFName` is used by them.
 *
 * @param dbStream     out: set to true by the traversal when a stream uses the DB as trigger/calc DB
 * @param vtableStream out: set to true by the traversal when a visited stream involves virtual tables
 * @param ignoreCurrDb skip streams that are themselves located in `dbFName`
 *
 * The output flags are only ever set to true here; they are left untouched
 * when there are no streams or nothing matches, so callers must initialise them.
 */
void mstCheckDbInUse(SMnode *pMnode, char *dbFName, bool *dbStream, bool *vtableStream, bool ignoreCurrDb) {
  int32_t numOfStreams = sdbGetSize(pMnode->pSdb, SDB_STREAM);
  if (numOfStreams > 0) {
    SStmCheckDbInUseCtx ctx = {.dbStream = dbStream, .vtableStream = vtableStream, .ignoreCurrDb = ignoreCurrDb};
    sdbTraverse(pMnode->pSdb, SDB_STREAM, mstChkSetDbInUse, dbFName, &ctx, NULL);
  }
}
227

228
static void mstShowStreamStatus(char *dst, int8_t status, int32_t bufLen) {
×
229
  if (status == STREAM_STATUS_INIT) {
×
230
    tstrncpy(dst, "init", bufLen);
×
231
  } else if (status == STREAM_STATUS_RUNNING) {
×
232
    tstrncpy(dst, "running", bufLen);
×
233
  } else if (status == STREAM_STATUS_STOPPED) {
×
234
    tstrncpy(dst, "stopped", bufLen);
×
235
  } else if (status == STREAM_STATUS_FAILED) {
×
236
    tstrncpy(dst, "failed", bufLen);
×
237
  }
238
}
×
239

240
/*
 * Check whether at least one snode object exists in sdb.
 *
 * Only the first SDB_SNODE entry is fetched; if there is one it is released
 * and the fetch cursor is cancelled.
 *
 * @return TSDB_CODE_SUCCESS if an snode exists,
 *         TSDB_CODE_SNODE_NOT_DEPLOYED otherwise.
 */
int32_t mstCheckSnodeExists(SMnode *pMnode) {
  SSdb      *pSdb = pMnode->pSdb;
  SSnodeObj *pSnode = NULL;
  void      *pIter = sdbFetch(pSdb, SDB_SNODE, NULL, (void **)&pSnode);

  if (NULL == pIter) {
    return TSDB_CODE_SNODE_NOT_DEPLOYED;
  }

  sdbRelease(pSdb, pSnode);
  sdbCancelFetch(pSdb, pIter);
  return TSDB_CODE_SUCCESS;
}
258

259
/*
 * Fill `pTask` from a reported task status message.
 *
 * Copies the identity fields (taskId, deployId, seriousId, nodeId, taskIdx)
 * and the type/flags/status from `pMsg`, and stamps lastUpTs with the
 * current timestamp of the processing context (`pCtx->currTs`).
 */
void mstSetTaskStatusFromMsg(SStmGrpCtx* pCtx, SStmTaskStatus* pTask, SStmTaskStatusMsg* pMsg) {
  // runtime state
  pTask->type = pMsg->type;
  pTask->flags = pMsg->flags;
  pTask->status = pMsg->status;
  pTask->lastUpTs = pCtx->currTs;

  // identity
  pTask->id.taskId = pMsg->taskId;
  pTask->id.deployId = pMsg->deployId;
  pTask->id.seriousId = pMsg->seriousId;
  pTask->id.nodeId = pMsg->nodeId;
  pTask->id.taskIdx = pMsg->taskIdx;
}
×
271

272
/*
 * Pop the next action node from `pQueue`.
 *
 * The queue head is a placeholder node: the node handed back through `param`
 * is head->next, which then becomes the new head, and the previous head is
 * freed. The returned node therefore stays linked in the queue and is freed
 * by a later dequeue, not by the caller.
 *
 * No latch is taken here (unlike mndStreamActionEnqueue); presumably there is
 * a single consumer — TODO confirm.
 *
 * @return false when qRemainNum is 0 (nothing is written to *param), true otherwise.
 */
bool mndStreamActionDequeue(SStmActionQ* pQueue, SStmQNode **param) {
  if (0 == atomic_load_64(&pQueue->qRemainNum)) {
    return false;
  }

  SStmQNode *pOldHead = pQueue->head;
  SStmQNode *pNext = pOldHead->next;

  pQueue->head = pNext;
  *param = pNext;

  taosMemoryFreeClear(pOldHead);

  (void)atomic_sub_fetch_64(&pQueue->qRemainNum, 1);

  return true;
}
290

291
/*
 * Append `param` to the tail of `pQueue`.
 *
 * The link update is done under the queue's write latch; qRemainNum is
 * incremented only after the node is linked, so a reader that observes a
 * non-zero count finds the node reachable.
 */
void mndStreamActionEnqueue(SStmActionQ* pQueue, SStmQNode* param) {
  taosWLockLatch(&pQueue->lock);
  {
    SStmQNode* pOldTail = pQueue->tail;
    pOldTail->next = param;
    pQueue->tail = param;
  }
  taosWUnLockLatch(&pQueue->lock);

  (void)atomic_add_fetch_64(&pQueue->qRemainNum, 1);
}
215,401✔
299

300
char* mstGetStreamActionString(int32_t action) {
110,967✔
301
  switch (action) {
110,967!
302
    case STREAM_ACT_DEPLOY:
110,967✔
303
      return "DEPLOY";
110,967✔
304
    case STREAM_ACT_UNDEPLOY:
×
305
      return "UNDEPLOY";
×
306
    case STREAM_ACT_START:
×
307
      return "START";
×
308
    case STREAM_ACT_UPDATE_TRIGGER:
×
309
      return "UPDATE TRIGGER";
×
310
    case STREAM_ACT_RECALC:
×
311
      return "USER RECALC";
×
312
    default:
×
313
      break;
×
314
  }
315

316
  return "UNKNOWN";
×
317
}
318

319
/*
 * Allocate a stream-level action node and append it to `actionQ`.
 *
 * @param actionQ    destination queue
 * @param streamId   stream the action applies to (also used by the msts* log macros)
 * @param streamName stream name; copied into the node's fixed-size buffer
 * @param param      action-specific parameter stored in the node; on allocation
 *                   failure it is freed here instead
 * @param userAction stored as-is in the node
 * @param action     STREAM_ACT_* value stored as the node type
 *
 * Allocation failure is logged and otherwise silent (nothing is enqueued).
 */
void mstPostStreamAction(SStmActionQ*       actionQ, int64_t streamId, char* streamName, void* param, bool userAction, int32_t action) {
  SStmQNode *pNode = taosMemoryMalloc(sizeof(SStmQNode));
  if (NULL == pNode) {
    taosMemoryFreeClear(param);
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, __LINE__, tstrerror(terrno));
    return;
  }

  pNode->type = action;
  pNode->streamAct = true;
  pNode->action.stream.streamId = streamId;
  // Bounded copy: the destination is an in-node buffer, so an over-long (or
  // missing) name must not write past it. tstrncpy is the bounded copy
  // already used elsewhere in this file.
  if (NULL != streamName) {
    tstrncpy(pNode->action.stream.streamName, streamName, sizeof(pNode->action.stream.streamName));
  } else {
    pNode->action.stream.streamName[0] = '\0';
  }
  pNode->action.stream.userAction = userAction;
  pNode->action.stream.actionParam = param;

  pNode->next = NULL;

  mndStreamActionEnqueue(actionQ, pNode);

  // pNode must not be touched past this point: the queue may already have consumed it.
  mstsDebug("stream action %s posted enqueue", mstGetStreamActionString(action));
}
340

341
/*
 * Allocate a task-level action node holding a copy of `*pAction` and append
 * it to `actionQ`. `action` becomes the node type.
 *
 * Allocation failure is logged and otherwise silent (nothing is enqueued).
 */
void mstPostTaskAction(SStmActionQ*        actionQ, SStmTaskAction* pAction, int32_t action) {
  SStmQNode *pNewNode = taosMemoryMalloc(sizeof(SStmQNode));
  if (pNewNode == NULL) {
    int64_t streamId = pAction->streamId;  // referenced implicitly by the mstsError macro
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, __LINE__, tstrerror(terrno));
    return;
  }

  pNewNode->next = NULL;
  pNewNode->type = action;
  pNewNode->streamAct = false;
  pNewNode->action.task = *pAction;

  mndStreamActionEnqueue(actionQ, pNewNode);
}
357

358
/*
 * Destroy a db -> vgroup-list map built by mstBuildDBVgroupsMap():
 * every entry's vgArray is destroyed, then the hash itself is cleaned up.
 */
void mstDestroyDbVgroupsHash(SSHashObj *pDbVgs) {
  int32_t iter = 0;

  for (void* pEntry = tSimpleHashIterate(pDbVgs, NULL, &iter); pEntry != NULL;
       pEntry = tSimpleHashIterate(pDbVgs, pEntry, &iter)) {
    SDBVgHashInfo* pDbInfo = pEntry;
    taosArrayDestroy(pDbInfo->vgArray);
  }

  tSimpleHashCleanup(pDbVgs);
}
63,046✔
369

370

371
/*
 * Build a map from database name to its vgroup hash ranges.
 *
 * Iterates every SDB_VGROUP object. For the first vgroup seen of a database an
 * SDBVgHashInfo entry is created (hash method/prefix/suffix taken from the
 * SDbObj, vgSorted = false); each vgroup contributes one SVGroupHashInfo
 * {vgId, hashBegin, hashEnd} to that entry's vgArray. Keys are the database
 * name including the terminating NUL.
 *
 * @param pMnode mnode whose sdb is scanned
 * @param ppRes  out: the new map on success; not written on failure.
 *               The caller releases it with mstDestroyDbVgroupsHash().
 * @return TSDB_CODE_SUCCESS or an error code. On failure the partially built
 *         map (and every array already stored in it) is released here.
 */
int32_t mstBuildDBVgroupsMap(SMnode* pMnode, SSHashObj** ppRes) {
  void*   pIter = NULL;
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  SArray* pTarget = NULL;
  SArray* pNew = NULL;  // array created for a new db, owned locally until it is stored in the map
  SDbObj* pDb = NULL;
  SDBVgHashInfo dbInfo = {0}, *pDbInfo = NULL;
  SVgObj* pVgroup = NULL;

  SSHashObj* pDbVgroup = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY));
  TSDB_CHECK_NULL(pDbVgroup, code, lino, _exit, terrno);

  while (1) {
    pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
    if (pIter == NULL) {
      break;
    }

    pDbInfo = (SDBVgHashInfo*)tSimpleHashGet(pDbVgroup, pVgroup->dbName, strlen(pVgroup->dbName) + 1);
    if (NULL == pDbInfo) {
      // first vgroup of this db: prepare a fresh entry
      pNew = taosArrayInit(20, sizeof(SVGroupHashInfo));
      if (NULL == pNew) {
        sdbRelease(pMnode->pSdb, pVgroup);
        sdbCancelFetch(pMnode->pSdb, pIter);
        pVgroup = NULL;
        TSDB_CHECK_NULL(pNew, code, lino, _exit, terrno);
      }

      pDb = mndAcquireDb(pMnode, pVgroup->dbName);
      if (NULL == pDb) {
        sdbRelease(pMnode->pSdb, pVgroup);
        sdbCancelFetch(pMnode->pSdb, pIter);
        pVgroup = NULL;
        TSDB_CHECK_NULL(pDb, code, lino, _exit, terrno);
      }
      dbInfo.vgSorted = false;
      dbInfo.hashMethod = pDb->cfg.hashMethod;
      dbInfo.hashPrefix = pDb->cfg.hashPrefix;
      dbInfo.hashSuffix = pDb->cfg.hashSuffix;
      dbInfo.vgArray = pNew;

      mndReleaseDb(pMnode, pDb);

      pTarget = pNew;
    } else {
      pTarget = pDbInfo->vgArray;
    }

    SVGroupHashInfo vgInfo = {.vgId = pVgroup->vgId, .hashBegin = pVgroup->hashBegin, .hashEnd = pVgroup->hashEnd};
    if (NULL == taosArrayPush(pTarget, &vgInfo)) {
      sdbRelease(pMnode->pSdb, pVgroup);
      sdbCancelFetch(pMnode->pSdb, pIter);
      pVgroup = NULL;
      TSDB_CHECK_NULL(NULL, code, lino, _exit, terrno);
    }

    if (NULL == pDbInfo) {
      code = tSimpleHashPut(pDbVgroup, pVgroup->dbName, strlen(pVgroup->dbName) + 1, &dbInfo, sizeof(dbInfo));
      if (code) {
        sdbRelease(pMnode->pSdb, pVgroup);
        sdbCancelFetch(pMnode->pSdb, pIter);
        pVgroup = NULL;
        TAOS_CHECK_EXIT(code);
      }
      pNew = NULL;  // ownership moved into the map
    }

    sdbRelease(pMnode->pSdb, pVgroup);
    pVgroup = NULL;
  }

  *ppRes = pDbVgroup;

_exit:

  // Non-NULL only when a new array was created but never stored in the map.
  taosArrayDestroy(pNew);

  if (code) {
    // The map is not handed to the caller on failure; release it (together
    // with every array already stored in it) so it does not leak.
    if (NULL != pDbVgroup) {
      mstDestroyDbVgroupsHash(pDbVgroup);
      pDbVgroup = NULL;
    }
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
455

456
int mstDbVgInfoComp(const void* lp, const void* rp) {
87,828✔
457
  SVGroupHashInfo* pLeft = (SVGroupHashInfo*)lp;
87,828✔
458
  SVGroupHashInfo* pRight = (SVGroupHashInfo*)rp;
87,828✔
459
  if (pLeft->hashBegin < pRight->hashBegin) {
87,828!
460
    return -1;
87,828✔
461
  } else if (pLeft->hashBegin > pRight->hashBegin) {
×
462
    return 1;
×
463
  }
464

465
  return 0;
×
466
}
467

468
int32_t mstTableHashValueComp(void const* lp, void const* rp) {
219,248✔
469
  uint32_t*    key = (uint32_t*)lp;
219,248✔
470
  SVgroupInfo* pVg = (SVgroupInfo*)rp;
219,248✔
471

472
  if (*key < pVg->hashBegin) {
219,248!
473
    return -1;
×
474
  } else if (*key > pVg->hashEnd) {
219,248✔
475
    return 1;
54,071✔
476
  }
477

478
  return 0;
165,177✔
479
}
480

481

482
/*
 * Resolve the vgroup that owns table `tbName` of database `dbFName`.
 *
 * The full table name "<dbFName>.<tbName>" is hashed with the database's hash
 * method/prefix/suffix and looked up in the database's vgroup range list.
 * The list is sorted by hashBegin on first use and flagged vgSorted in place.
 *
 * @param pDbVgroups map built by mstBuildDBVgroupsMap()
 * @param dbFName    database full name (lookup key)
 * @param tbName     table name
 * @param vgId       out: owning vgroup id; written only on success
 * @return 0 on success, TSDB_CODE_MND_DB_NOT_EXIST if the database is not in
 *         the map, TSDB_CODE_MND_STREAM_INTERNAL_ERROR if no range covers the hash.
 */
int32_t mstGetTableVgId(SSHashObj* pDbVgroups, char* dbFName, char *tbName, int32_t* vgId) {
  int32_t code = 0;
  int32_t lino = 0;
  // Elements of vgArray are SVGroupHashInfo (see mstBuildDBVgroupsMap), so the
  // search result must be read through that type, not SVgroupInfo.
  SVGroupHashInfo* vgInfo = NULL;
  char             tbFullName[TSDB_TABLE_FNAME_LEN];

  SDBVgHashInfo* dbInfo = (SDBVgHashInfo*)tSimpleHashGet(pDbVgroups, dbFName, strlen(dbFName) + 1);
  if (NULL == dbInfo) {
    mstError("db %s does not exist", dbFName);
    TAOS_CHECK_EXIT(TSDB_CODE_MND_DB_NOT_EXIST);
  }

  (void)snprintf(tbFullName, sizeof(tbFullName), "%s.%s", dbFName, tbName);
  uint32_t hashValue = taosGetTbHashVal(tbFullName, (uint32_t)strlen(tbFullName), dbInfo->hashMethod,
                                        dbInfo->hashPrefix, dbInfo->hashSuffix);

  // the search below requires ranges ordered by hashBegin; sort lazily, once per db entry
  if (!dbInfo->vgSorted) {
    taosArraySort(dbInfo->vgArray, mstDbVgInfoComp);
    dbInfo->vgSorted = true;
  }

  vgInfo = taosArraySearch(dbInfo->vgArray, &hashValue, mstTableHashValueComp, TD_EQ);
  if (NULL == vgInfo) {
    mstError("no hash range found for hash value [%u], dbFName:%s, numOfVgId:%d", hashValue, dbFName,
             (int32_t)taosArrayGetSize(dbInfo->vgArray));
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
  }

  *vgId = vgInfo->vgId;

_exit:

  if (code) {
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
520

521

522
/*
 * Dump a stream object and its create request to the debug log.
 *
 * Does nothing unless DEBUG_DEBUG is set in stDebugFlag. A NULL stream or a
 * NULL pCreate is reported and ends the dump early.
 *
 * @param tips free-text prefix identifying the call site
 * @param p    stream object to dump (may be NULL)
 */
void mstLogSStreamObj(char* tips, SStreamObj* p) {
  if (0 == (stDebugFlag & DEBUG_DEBUG)) {
    return;
  }

  if (p == NULL) {
    mstDebug("%s: stream is NULL", tips);
    return;
  }

  mstDebug("%s: stream obj", tips);
  mstDebug("name:%s mainSnodeId:%d userDropped:%d userStopped:%d createTime:%" PRId64 " updateTime:%" PRId64,
           p->name, p->mainSnodeId, p->userDropped, p->userStopped, p->createTime, p->updateTime);

  SCMCreateStreamReq* pReq = p->pCreate;
  if (pReq == NULL) {
    mstDebug("stream pCreate is NULL");
    return;
  }

  // `streamId` is referenced implicitly by the msts* logging macros below.
  int64_t streamId = pReq->streamId;

  int32_t numCalcDb = taosArrayGetSize(pReq->calcDB);
  int32_t numCalcScan = taosArrayGetSize(pReq->calcScanPlanList);
  int32_t numNotifyUrl = taosArrayGetSize(pReq->pNotifyAddrUrls);
  int32_t numOutCol = taosArrayGetSize(pReq->outCols);
  int32_t numOutTag = taosArrayGetSize(pReq->outTags);
  int32_t numForceOutCol = taosArrayGetSize(pReq->forceOutCols);

  mstsDebugL("create_info: name:%s sql:%s streamDB:%s triggerDB:%s outDB:%s calcDBNum:%d triggerTblName:%s outTblName:%s "
      "igExists:%d triggerType:%d igDisorder:%d deleteReCalc:%d deleteOutTbl:%d fillHistory:%d fillHistroyFirst:%d "
      "calcNotifyOnly:%d lowLatencyCalc:%d igNoDataTrigger:%d notifyUrlNum:%d notifyEventTypes:%d addOptions:%d notifyHistory:%d "
      "outColsNum:%d outTagsNum:%d maxDelay:%" PRId64 " fillHistoryStartTs:%" PRId64 " watermark:%" PRId64 " expiredTime:%" PRId64 " "
      "triggerTblType:%d triggerTblUid:%" PRIx64 " triggerTblSuid:%" PRIx64 " vtableCalc:%d outTblType:%d outStbExists:%d outStbUid:%" PRIu64 " outStbSversion:%d "
      "eventTypes:0x%" PRIx64 " flags:0x%" PRIx64 " tsmaId:0x%" PRIx64 " placeHolderBitmap:0x%" PRIx64 " calcTsSlotId:%d triTsSlotId:%d "
      "triggerTblVgId:%d outTblVgId:%d calcScanPlanNum:%d forceOutCols:%d",
      pReq->name, pReq->sql, pReq->streamDB, pReq->triggerDB, pReq->outDB, numCalcDb, pReq->triggerTblName, pReq->outTblName,
      pReq->igExists, pReq->triggerType, pReq->igDisorder, pReq->deleteReCalc, pReq->deleteOutTbl, pReq->fillHistory, pReq->fillHistoryFirst,
      pReq->calcNotifyOnly, pReq->lowLatencyCalc, pReq->igNoDataTrigger, numNotifyUrl, pReq->notifyEventTypes, pReq->addOptions, pReq->notifyHistory,
      numOutCol, numOutTag, pReq->maxDelay, pReq->fillHistoryStartTime, pReq->watermark, pReq->expiredTime,
      pReq->triggerTblType, pReq->triggerTblUid, pReq->triggerTblSuid, pReq->vtableCalc, pReq->outTblType, pReq->outStbExists, pReq->outStbUid, pReq->outStbSversion,
      pReq->eventTypes, pReq->flags, pReq->tsmaId, pReq->placeHolderBitmap, pReq->calcTsSlotId, pReq->triTsSlotId,
      pReq->triggerTblVgId, pReq->outTblVgId, numCalcScan, numForceOutCol);

  // trigger-type specific options
  switch (pReq->triggerType) {
    case WINDOW_TYPE_INTERVAL: {
      SSlidingTrigger* pSliding = &pReq->trigger.sliding;
      mstsDebug("sliding trigger options, intervalUnit:%d, slidingUnit:%d, offsetUnit:%d, soffsetUnit:%d, precision:%d, interval:%" PRId64 ", offset:%" PRId64 ", sliding:%" PRId64 ", soffset:%" PRId64,
                pSliding->intervalUnit, pSliding->slidingUnit, pSliding->offsetUnit, pSliding->soffsetUnit,
                pSliding->precision, pSliding->interval, pSliding->offset, pSliding->sliding, pSliding->soffset);
      break;
    }
    case WINDOW_TYPE_SESSION: {
      SSessionTrigger* pSession = &pReq->trigger.session;
      mstsDebug("session trigger options, slotId:%d, sessionVal:%" PRId64, pSession->slotId, pSession->sessionVal);
      break;
    }
    case WINDOW_TYPE_STATE: {
      SStateWinTrigger* pState = &pReq->trigger.stateWin;
      mstsDebug("state trigger options, slotId:%d, expr:%s, extend:%d, trueForDuration:%" PRId64,
                pState->slotId, (char *)pState->expr, pState->extend, pState->trueForDuration);
      break;
    }
    case WINDOW_TYPE_EVENT: {
      SEventTrigger* pEvent = &pReq->trigger.event;
      mstsDebug("event trigger options, startCond:%s, endCond:%s, trueForDuration:%" PRId64,
                (char*)pEvent->startCond, (char*)pEvent->endCond, pEvent->trueForDuration);
      break;
    }
    case WINDOW_TYPE_COUNT: {
      SCountTrigger* pCount = &pReq->trigger.count;
      mstsDebug("count trigger options, countVal:%" PRId64 ", sliding:%" PRId64 ", condCols:%s",
                pCount->countVal, pCount->sliding, (char*)pCount->condCols);
      break;
    }
    case WINDOW_TYPE_PERIOD: {
      SPeriodTrigger* pPeriod = &pReq->trigger.period;
      mstsDebug("period trigger options, periodUnit:%d, offsetUnit:%d, precision:%d, period:%" PRId64 ", offset:%" PRId64,
                pPeriod->periodUnit, pPeriod->offsetUnit, pPeriod->precision, pPeriod->period, pPeriod->offset);
      break;
    }
    default:
      mstsDebug("unknown triggerType:%d", pReq->triggerType);
      break;
  }

  // serialized plans / expressions
  mstsDebugL("create_info: triggerCols:[%s]", (char*)pReq->triggerCols);
  mstsDebugL("create_info: partitionCols:[%s]", (char*)pReq->partitionCols);
  mstsDebugL("create_info: triggerScanPlan:[%s]", (char*)pReq->triggerScanPlan);
  mstsDebugL("create_info: calcPlan:[%s]", (char*)pReq->calcPlan);
  mstsDebugL("create_info: subTblNameExpr:[%s]", (char*)pReq->subTblNameExpr);
  mstsDebugL("create_info: tagValueExpr:[%s]", (char*)pReq->tagValueExpr);

  for (int32_t dbIdx = 0; dbIdx < numCalcDb; ++dbIdx) {
    char* dbName = taosArrayGetP(pReq->calcDB, dbIdx);
    mstsDebug("create_info: calcDB[%d] - %s", dbIdx, dbName);
  }

  for (int32_t scanIdx = 0; scanIdx < numCalcScan; ++scanIdx) {
    SStreamCalcScan* pScan = taosArrayGet(pReq->calcScanPlanList, scanIdx);
    int32_t          numVg = taosArrayGetSize(pScan->vgList);
    mstsDebugL("create_info: calcScanPlan[%d] - readFromCache:%d vgNum:%d scanPlan:[%s]", scanIdx, pScan->readFromCache, numVg, (char*)pScan->scanPlan);
    for (int32_t vgIdx = 0; vgIdx < numVg; ++vgIdx) {
      mstsDebug("create_info: calcScanPlan[%d] vg[%d] - vgId:%d", scanIdx, vgIdx, *(int32_t*)taosArrayGet(pScan->vgList, vgIdx));
    }
  }

  for (int32_t urlIdx = 0; urlIdx < numNotifyUrl; ++urlIdx) {
    char* url = taosArrayGetP(pReq->pNotifyAddrUrls, urlIdx);
    mstsDebug("create_info: notifyUrl[%d] - %s", urlIdx, url);
  }

  for (int32_t colIdx = 0; colIdx < numOutCol; ++colIdx) {
    SFieldWithOptions* pCol = taosArrayGet(pReq->outCols, colIdx);
    mstsDebug("create_info: outCol[%d] - name:%s type:%d flags:%d bytes:%d compress:%u typeMod:%d",
              colIdx, pCol->name, pCol->type, pCol->flags, pCol->bytes, pCol->compress, pCol->typeMod);
  }
}
642

643
/*
 * Log one task status line at debug level.
 *
 * @param name     label describing the kind of task (printed as a prefix)
 * @param streamId owning stream; referenced implicitly by the mstsDebug macro
 * @param pTask    task status to print (must not be NULL)
 * @param idx      position of the task within its container
 *
 * NOTE(review): pTask->type and pTask->status index gStreamTaskTypeStr /
 * gStreamStatusStr directly, with no range check in this function.
 */
void mstLogSStmTaskStatus(char* name, int64_t streamId, SStmTaskStatus* pTask, int32_t idx) {
  mstsDebug("%s[%d]: task %" PRIx64 " deployId:%d SID:%" PRId64 " nodeId:%d tidx:%d type:%s flags:%" PRIx64 " status:%s lastUpTs:%" PRId64,
            name,
            idx,
            pTask->id.taskId,
            pTask->id.deployId,
            pTask->id.seriousId,
            pTask->id.nodeId,
            pTask->id.taskIdx,
            gStreamTaskTypeStr[pTask->type],
            pTask->flags,
            gStreamStatusStr[pTask->status],
            pTask->lastUpTs);
}
721,830✔
648

649
/*
 * Dump the runtime status of a stream (reader, trigger and runner tasks) to
 * the debug log.
 *
 * Does nothing unless DEBUG_DEBUG is set in stDebugFlag; a NULL status is
 * reported and ends the dump.
 *
 * @param tips     free-text prefix identifying the call site
 * @param streamId owning stream; referenced implicitly by the mstsDebug macro
 * @param p        status to dump (may be NULL)
 */
void mstLogSStmStatus(char* tips, int64_t streamId, SStmStatus* p) {
  if (!(stDebugFlag & DEBUG_DEBUG)) {
    return;
  }

  if (NULL == p) {
    mstsDebug("%s: stream status is NULL", tips);
    return;
  }

  int32_t trigReaderNum = taosArrayGetSize(p->trigReaders);
  int32_t trigOReaderNum = taosArrayGetSize(p->trigOReaders);
  int32_t calcReaderNum = MST_LIST_SIZE(p->calcReaders);
  int32_t triggerNum = p->triggerTask ? 1 : 0;
  int32_t runnerNum = 0;

  for (int32_t i = 0; i < p->runnerDeploys; ++i) {
    runnerNum += taosArrayGetSize(p->runners[i]);
  }

  mstsDebug("%s: stream status", tips);
  mstsDebug("name:%s runnerNum:%d runnerDeploys:%d runnerReplica:%d lastActionTs:%" PRId64
           " trigReaders:%d trigOReaders:%d calcReaders:%d trigger:%d runners:%d",
      p->streamName, p->runnerNum, p->runnerDeploys, p->runnerReplica, p->lastActionTs,
      trigReaderNum, trigOReaderNum, calcReaderNum, triggerNum, runnerNum);

  SStmTaskStatus* pTask = NULL;
  for (int32_t i = 0; i < trigReaderNum; ++i) {
    pTask = taosArrayGet(p->trigReaders, i);
    mstLogSStmTaskStatus("trigReader task", streamId, pTask, i);
  }

  for (int32_t i = 0; i < trigOReaderNum; ++i) {
    pTask = taosArrayGet(p->trigOReaders, i);
    mstLogSStmTaskStatus("trigOReader task", streamId, pTask, i);
  }

  // calcReaders may be NULL (mstResetSStmStatus assigns it the result of
  // tdListFree()), so only touch the list head when it reported elements.
  SListNode* pNode = (calcReaderNum > 0) ? listHead(p->calcReaders) : NULL;
  for (int32_t i = 0; i < calcReaderNum && NULL != pNode; ++i) {
    pTask = (SStmTaskStatus*)pNode->data;
    mstLogSStmTaskStatus("calcReader task", streamId, pTask, i);
    pNode = TD_DLIST_NODE_NEXT(pNode);
  }

  if (triggerNum > 0) {
    mstLogSStmTaskStatus("trigger task", streamId, p->triggerTask, 0);
  }

  for (int32_t i = 0; i < p->runnerDeploys; ++i) {
    int32_t num = taosArrayGetSize(p->runners[i]);
    if (num <= 0) {
      continue;
    }

    mstsDebug("the %dth deploy runners status", i);
    for (int32_t m = 0; m < num; ++m) {
      pTask = taosArrayGet(p->runners[i], m);
      mstLogSStmTaskStatus("runner task", streamId, pTask, m);
    }
  }
}
711

712
bool mstEventPassIsolation(int32_t num, int32_t event) {
4,908,081✔
713
  bool ret = ((mStreamMgmt.lastTs[event].ts + num * MST_SHORT_ISOLATION_DURATION) <= mStreamMgmt.hCtx.currentTs);
4,908,081✔
714
  if (ret) {
4,908,081✔
715
    mstDebug("event %s passed %d isolation, last:%" PRId64 ", curr:%" PRId64, 
3,646,012✔
716
        gMndStreamEvent[event], num, mStreamMgmt.lastTs[event].ts, mStreamMgmt.hCtx.currentTs);
717
  }
718

719
  return ret;
4,908,081✔
720
}
721

722
bool mstEventHandledChkSet(int32_t event) {
3,646,012✔
723
  if (0 == atomic_val_compare_exchange_8((int8_t*)&mStreamMgmt.lastTs[event].handled, 0, 1)) {
3,646,012✔
724
    mstDebug("event %s set handled", gMndStreamEvent[event]);
218,662✔
725
    return true;
218,662✔
726
  }
727
  return false;
3,427,350✔
728
}
729

730
/*
 * Build the status and message var-strings describing the current state of a stream.
 *
 * @param pStream     stream object; userDropped/userStopped and pCreate->streamId are read
 * @param status      output var-string buffer receiving one of gStreamStatusStr[]
 * @param statusSize  total size of `status` in bytes (header included)
 * @param msg         output var-string buffer receiving a human readable detail text
 * @param msgSize     total size of `msg` in bytes (header included)
 * @return always TSDB_CODE_SUCCESS
 *
 * The first three checks return without touching the runtime lock; after that the
 * runtime read lock is held until `_exit`.
 */
int32_t mstGetStreamStatusStr(SStreamObj* pStream, char* status, int32_t statusSize, char* msg, int32_t msgSize) {
  char   tmpBuf[256];
  int8_t active = atomic_load_8(&mStreamMgmt.active), state = atomic_load_8(&mStreamMgmt.state);
  if (0 == active || MND_STM_STATE_NORMAL != state) {
    mstDebug("mnode streamMgmt not in active mode, active:%d, state:%d", active, state);
    STR_WITH_MAXSIZE_TO_VARSTR(status, gStreamStatusStr[STREAM_STATUS_UNDEPLOYED], statusSize);
    STR_WITH_MAXSIZE_TO_VARSTR(msg, "Mnode may be unstable, try again later", msgSize);
    return TSDB_CODE_SUCCESS;
  }

  if (atomic_load_8(&pStream->userDropped)) {
    STR_WITH_MAXSIZE_TO_VARSTR(status, gStreamStatusStr[STREAM_STATUS_DROPPING], statusSize);
    STR_WITH_MAXSIZE_TO_VARSTR(msg, "", msgSize);
    return TSDB_CODE_SUCCESS;
  }

  if (atomic_load_8(&pStream->userStopped)) {
    STR_WITH_MAXSIZE_TO_VARSTR(status, gStreamStatusStr[STREAM_STATUS_STOPPED], statusSize);
    STR_WITH_MAXSIZE_TO_VARSTR(msg, "", msgSize);
    return TSDB_CODE_SUCCESS;
  }

  (void)mstWaitLock(&mStreamMgmt.runtimeLock, true);

  SStmStatus* pStatus = (SStmStatus*)taosHashGet(mStreamMgmt.streamMap, &pStream->pCreate->streamId, sizeof(pStream->pCreate->streamId));
  if (NULL == pStatus) {
    // no runtime status registered for this stream
    STR_WITH_MAXSIZE_TO_VARSTR(status, gStreamStatusStr[STREAM_STATUS_UNDEPLOYED], statusSize);
    STR_WITH_MAXSIZE_TO_VARSTR(msg, "", msgSize);
    goto _exit;
  }

  // NOTE(review): 1 and 4 are raw values of pStatus->stopped; presumably "stopped by
  // fatal error" and "stopped by grant expiry" -- confirm against the stop-reason definitions.
  int8_t stopped = atomic_load_8(&pStatus->stopped);
  switch (stopped) {
    case 1:
      STR_WITH_MAXSIZE_TO_VARSTR(status, gStreamStatusStr[STREAM_STATUS_FAILED], statusSize);
      snprintf(tmpBuf, sizeof(tmpBuf), "Last error: %s, Failed times: %" PRId64, tstrerror(pStatus->fatalError), pStatus->fatalRetryTimes);
      STR_WITH_MAXSIZE_TO_VARSTR(msg, tmpBuf, msgSize);
      goto _exit;
    case 4:
      STR_WITH_MAXSIZE_TO_VARSTR(status, gStreamStatusStr[STREAM_STATUS_FAILED], statusSize);
      snprintf(tmpBuf, sizeof(tmpBuf), "Error: %s", tstrerror(TSDB_CODE_GRANT_STREAM_EXPIRED));
      STR_WITH_MAXSIZE_TO_VARSTR(msg, tmpBuf, msgSize);
      goto _exit;
    default:
      break;
  }

  if (pStatus->triggerTask && STREAM_STATUS_RUNNING == pStatus->triggerTask->status) {
    STR_WITH_MAXSIZE_TO_VARSTR(status, gStreamStatusStr[STREAM_STATUS_RUNNING], statusSize);
    // bounded write of the prefix; the timestamp text is appended right after it
    int32_t prefixLen = snprintf(tmpBuf, sizeof(tmpBuf), "Running start from: ");
    (void)formatTimestampLocal(&tmpBuf[prefixLen], pStatus->triggerTask->runningStartTs, TSDB_TIME_PRECISION_MILLI);
    STR_WITH_MAXSIZE_TO_VARSTR(msg, tmpBuf, msgSize);
    goto _exit;
  }

  // neither failed nor running: report the init state together with the deploy counter
  STR_WITH_MAXSIZE_TO_VARSTR(status, gStreamStatusStr[STREAM_STATUS_INIT], statusSize);
  snprintf(tmpBuf, sizeof(tmpBuf), "Current deploy times: %" PRId64, pStatus->deployTimes);
  STR_WITH_MAXSIZE_TO_VARSTR(msg, tmpBuf, msgSize);

_exit:
  
  taosRUnLockLatch(&mStreamMgmt.runtimeLock);

  return TSDB_CODE_SUCCESS;
}
798

799
/*
 * Fill row `numOfRows` of the stream attribute result block for one stream.
 *
 * Columns are written in this order: stream name, db name, create time, stream id
 * (hex text), sql, status, main snode id, replica snode id, msg.
 *
 * @param pMnode     mnode handle, used to look up the main snode
 * @param pStream    stream whose attributes are reported
 * @param pBlock     result block; pBlock->pDataBlock holds the column arrays
 * @param numOfRows  index of the row to fill
 * @return 0 on success, otherwise the error of the first failing column (logged with
 *         the line number of the failing check)
 */
int32_t mstSetStreamAttrResBlock(SMnode *pMnode, SStreamObj* pStream, SSDataBlock* pBlock, int32_t numOfRows) {
  int32_t code = 0;
  int32_t cols = 0;
  int32_t lino = 0;

  // stream_name
  char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
  STR_WITH_MAXSIZE_TO_VARSTR(streamName, mndGetStableStr(pStream->name), sizeof(streamName));
  SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)streamName, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // db_name
  char streamDB[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
  STR_WITH_MAXSIZE_TO_VARSTR(streamDB, mndGetDbStr(pStream->pCreate->streamDB), sizeof(streamDB));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)streamDB, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // create time
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&pStream->createTime, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // stream id, rendered as hex text
  char streamId2[19] = {0};
  char streamId[19 + VARSTR_HEADER_SIZE] = {0};
  snprintf(streamId2, sizeof(streamId2), "%" PRIx64, pStream->pCreate->streamId);
  STR_WITH_MAXSIZE_TO_VARSTR(streamId, streamId2, sizeof(streamId));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)streamId, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // sql
  char sql[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0};
  STR_WITH_MAXSIZE_TO_VARSTR(sql, pStream->pCreate->sql, sizeof(sql));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)sql, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // status (the msg text is produced here and written as the last column)
  char status[20 + VARSTR_HEADER_SIZE] = {0};
  char msg[TSDB_RESERVE_VALUE_LEN + VARSTR_HEADER_SIZE] = {0};
  code = mstGetStreamStatusStr(pStream, status, sizeof(status), msg, sizeof(msg));
  TSDB_CHECK_CODE(code, lino, _end);

  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)status, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // snodeLeader
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&pStream->mainSnodeId, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // snodeReplica: -1 when the main snode cannot be acquired
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  SSnodeObj* pSnode = mndAcquireSnode(pMnode, pStream->mainSnodeId);
  int32_t replicaSnodeId = pSnode ? pSnode->replicaId : -1;
  mndReleaseSnode(pMnode, pSnode);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&replicaSnodeId, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // msg
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)msg, false);
  // checked like every other column so that a failure records its line number
  TSDB_CHECK_CODE(code, lino, _end);

_end:
  if (code) {
    mError("error happens when build stream attr result block, lino:%d, code:%s", lino, tstrerror(code));
  }
  return code;
}
881

882

883
/*
 * Build the status and message var-strings for a single stream task.
 *
 * @param pTask       task status entry; status, errCode, type and detailStatus are read
 * @param status      output var-string buffer receiving gStreamStatusStr[pTask->status]
 * @param statusSize  total size of `status` in bytes (header included)
 * @param msg         output var-string buffer receiving the detail text
 * @param msgSize     total size of `msg` in bytes (header included)
 * @return always TSDB_CODE_SUCCESS
 *
 * Message selection:
 *   - failed task with a non-zero errCode: "Last error: ..."
 *   - trigger task with a detail status:   session / recalc counters
 *   - otherwise:                           empty string
 */
int32_t mstGetTaskStatusStr(SStmTaskStatus* pTask, char* status, int32_t statusSize, char* msg, int32_t msgSize) {
  char tmpBuf[256];
  
  STR_WITH_MAXSIZE_TO_VARSTR(status, gStreamStatusStr[pTask->status], statusSize);
  if (STREAM_STATUS_FAILED == pTask->status && pTask->errCode) {
    snprintf(tmpBuf, sizeof(tmpBuf), "Last error: %s", tstrerror(pTask->errCode));
    STR_WITH_MAXSIZE_TO_VARSTR(msg, tmpBuf, msgSize);
    return TSDB_CODE_SUCCESS;
  }

  if (STREAM_TRIGGER_TASK == pTask->type && mstWaitLock(&pTask->detailStatusLock, true)) {
    if (pTask->detailStatus) {
      SSTriggerRuntimeStatus* pTrigger = (SSTriggerRuntimeStatus*)pTask->detailStatus;
      snprintf(tmpBuf, sizeof(tmpBuf), "Current RT/HI/RE session num: %d/%d/%d, histroy progress:%d%%, total AUTO/USER recalc num: %d/%d", 
          pTrigger->realtimeSessionNum, pTrigger->historySessionNum, pTrigger->recalcSessionNum, pTrigger->histroyProgress,
          pTrigger->autoRecalcNum, (int32_t)taosArrayGetSize(pTrigger->userRecalcs));
      taosRUnLockLatch(&pTask->detailStatusLock);

      // hand the formatted detail text to the caller; without this copy the text built
      // above never leaves tmpBuf
      STR_WITH_MAXSIZE_TO_VARSTR(msg, tmpBuf, msgSize);
      return TSDB_CODE_SUCCESS;
    }

    taosRUnLockLatch(&pTask->detailStatusLock);    
  }
  
  STR_WITH_MAXSIZE_TO_VARSTR(msg, "", msgSize);
  
  return TSDB_CODE_SUCCESS;
}
910

911
/*
 * Write the "extra info" var-string for a task into `extraStr`.
 *
 *   reader task -> "trigReader" when STREAM_IS_TRIGGER_READER(flags), else "calcReader"
 *   runner task -> "topRunner" when STREAM_IS_TOP_RUNNER(flags), else ""
 *   other types -> ""
 *
 * @param extraSize  total size of `extraStr` in bytes (header included)
 * @return always TSDB_CODE_SUCCESS
 */
int32_t mstGetTaskExtraStr(SStmTaskStatus* pTask, char* extraStr, int32_t extraSize) {
  const char* label = "";

  if (STREAM_READER_TASK == pTask->type) {
    label = STREAM_IS_TRIGGER_READER(pTask->flags) ? "trigReader" : "calcReader";
  } else if (STREAM_RUNNER_TASK == pTask->type) {
    if (STREAM_IS_TOP_RUNNER(pTask->flags)) {
      label = "topRunner";
    }
  }

  STR_WITH_MAXSIZE_TO_VARSTR(extraStr, label, extraSize);
  return TSDB_CODE_SUCCESS;
}
933

934

935
/*
 * Fill row `numOfRows` of the stream task result block for one task.
 *
 * Columns are written in this order: stream name, stream id (hex), task id (hex),
 * type, serious id (hex), deploy id, node type, node id, task idx, status,
 * start time, last update, extra info, msg.
 *
 * @param pStream    stream owning the task
 * @param pTask      task status entry to report
 * @param pBlock     result block; pBlock->pDataBlock holds the column arrays
 * @param numOfRows  index of the row to fill
 * @return 0 on success, otherwise the error of the first failing column (logged with
 *         the line number of the failing check)
 */
int32_t mstSetStreamTaskResBlock(SStreamObj* pStream, SStmTaskStatus* pTask, SSDataBlock* pBlock, int32_t numOfRows) {
  int32_t code = 0;
  int32_t cols = 0;
  int32_t lino = 0;

  // stream_name
  char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
  STR_WITH_MAXSIZE_TO_VARSTR(streamName, mndGetStableStr(pStream->name), sizeof(streamName));
  SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)streamName, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // stream id; idstr is reused for every hex id column below
  char idstr[19 + VARSTR_HEADER_SIZE] = {0};
  snprintf(&idstr[VARSTR_HEADER_SIZE], sizeof(idstr) - VARSTR_HEADER_SIZE, "%" PRIx64, pStream->pCreate->streamId);
  varDataSetLen(idstr, strlen(&idstr[VARSTR_HEADER_SIZE])); 
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)idstr, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // task id
  snprintf(&idstr[VARSTR_HEADER_SIZE], sizeof(idstr) - VARSTR_HEADER_SIZE, "%" PRIx64, pTask->id.taskId);
  varDataSetLen(idstr, strlen(&idstr[VARSTR_HEADER_SIZE]));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)idstr, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // type
  char type[20 + VARSTR_HEADER_SIZE] = {0};
  STR_WITH_MAXSIZE_TO_VARSTR(type, (STREAM_READER_TASK == pTask->type) ? "Reader" : ((STREAM_TRIGGER_TASK == pTask->type) ? "Trigger" : "Runner"), sizeof(type));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)type, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // serious id
  snprintf(&idstr[VARSTR_HEADER_SIZE], sizeof(idstr) - VARSTR_HEADER_SIZE, "%" PRIx64, pTask->id.seriousId);
  varDataSetLen(idstr, strlen(&idstr[VARSTR_HEADER_SIZE]));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)idstr, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // deploy id
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&pTask->id.deployId, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // node_type: reader tasks are reported as "vnode", everything else as "snode"
  char nodeType[10 + VARSTR_HEADER_SIZE] = {0};
  STR_WITH_MAXSIZE_TO_VARSTR(nodeType, (STREAM_READER_TASK == pTask->type) ? "vnode" : "snode", sizeof(nodeType));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)nodeType, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // node id
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&pTask->id.nodeId, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // task idx
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&pTask->id.taskIdx, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // status (the msg text is produced here and written as the last column)
  char status[20 + VARSTR_HEADER_SIZE] = {0};
  char msg[TSDB_RESERVE_VALUE_LEN + VARSTR_HEADER_SIZE] = {0};
  code = mstGetTaskStatusStr(pTask, status, sizeof(status), msg, sizeof(msg));
  TSDB_CHECK_CODE(code, lino, _end);

  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)status, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // start time: a zero timestamp is stored as NULL
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&pTask->runningStartTs, 0 == pTask->runningStartTs);
  TSDB_CHECK_CODE(code, lino, _end);

  // last update: a zero timestamp is stored as NULL
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&pTask->lastUpTs, 0 == pTask->lastUpTs);
  TSDB_CHECK_CODE(code, lino, _end);

  // extra info
  char extra[64 + VARSTR_HEADER_SIZE] = {0};
  code = mstGetTaskExtraStr(pTask, extra, sizeof(extra));
  TSDB_CHECK_CODE(code, lino, _end);
  
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)extra, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // msg
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)msg, false);
  // checked like every other column so that a failure records its line number
  TSDB_CHECK_CODE(code, lino, _end);

_end:
  if (code) {
    mError("error happens when build stream task result block, lino:%d, code:%s", lino, tstrerror(code));
  }
  return code;
}
1060

1061
/*
 * Count every task tracked in a stream status entry: trigger readers, trigOReaders,
 * calc readers, the trigger task (if present) and the runners of
 * all MND_STREAM_RUNNER_DEPLOY_NUM deploy slots.
 */
int32_t mstGetNumOfStreamTasks(SStmStatus* pStatus) {
  int32_t total = taosArrayGetSize(pStatus->trigReaders) + taosArrayGetSize(pStatus->trigOReaders) +
                  MST_LIST_SIZE(pStatus->calcReaders) + (pStatus->triggerTask ? 1 : 0);

  int32_t deployIdx = 0;
  while (deployIdx < MND_STREAM_RUNNER_DEPLOY_NUM) {
    total += taosArrayGetSize(pStatus->runners[deployIdx]);
    ++deployIdx;
  }

  return total;
}
1069

1070
/*
 * Append one result row per task of `pStream` to `pBlock`.
 *
 * Runs under the runtime read lock. Streams that are missing from the stream map or
 * whose status is marked stopped contribute no rows. The block is grown first when
 * the rows would not fit into `rowsCapacity`.
 *
 * Rows are emitted in this order: trigger readers, trigOReaders, calc
 * readers, the trigger task, then the runners of every deploy slot. A row that fails
 * to build is skipped (the row counter is not advanced) and processing continues, so
 * the returned code is that of the last row attempted.
 *
 * @param numOfRows     in/out: next free row index, advanced for every row written
 * @param rowsCapacity  current row capacity of `pBlock`
 */
int32_t mstSetStreamTasksResBlock(SStreamObj* pStream, SSDataBlock* pBlock, int32_t* numOfRows, int32_t rowsCapacity) {
  int32_t code = 0;
  int32_t lino = 0;
  int64_t streamId = pStream->pCreate->streamId;

  (void)mstWaitLock(&mStreamMgmt.runtimeLock, true);

  SStmStatus* pStatus = (SStmStatus*)taosHashGet(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
  if (NULL == pStatus) {
    mstsDebug("stream not in streamMap, ignore it, dropped:%d, stopped:%d", atomic_load_8(&pStream->userDropped), atomic_load_8(&pStream->userStopped));
    goto _exit;
  }

  int8_t stopped = atomic_load_8(&pStatus->stopped);
  if (stopped) {
    mstsDebug("stream stopped %d, ignore it", stopped);
    goto _exit;
  }

  // make sure the block can hold all rows this stream may add
  int32_t rowsNeeded = *numOfRows + mstGetNumOfStreamTasks(pStatus);
  if (rowsNeeded > rowsCapacity) {
    code = blockDataEnsureCapacity(pBlock, rowsNeeded);
    if (code) {
      mstError("failed to prepare the result block buffer, rows:%d", rowsNeeded);
      TAOS_CHECK_EXIT(code);
    }
  }

  SStmTaskStatus* pCurr = NULL;
  int32_t         idx = 0;

  // trigger readers
  int32_t readerNum = taosArrayGetSize(pStatus->trigReaders);
  for (idx = 0; idx < readerNum; ++idx) {
    pCurr = taosArrayGet(pStatus->trigReaders, idx);
    code = mstSetStreamTaskResBlock(pStream, pCurr, pBlock, *numOfRows);
    if (TSDB_CODE_SUCCESS == code) {
      *numOfRows += 1;
    }
  }

  // trigOReaders
  readerNum = taosArrayGetSize(pStatus->trigOReaders);
  for (idx = 0; idx < readerNum; ++idx) {
    pCurr = taosArrayGet(pStatus->trigOReaders, idx);
    code = mstSetStreamTaskResBlock(pStream, pCurr, pBlock, *numOfRows);
    if (TSDB_CODE_SUCCESS == code) {
      *numOfRows += 1;
    }
  }

  // calc readers live in a list; walk it node by node
  int32_t    calcNum = MST_LIST_SIZE(pStatus->calcReaders);
  SListNode* pIter = listHead(pStatus->calcReaders);
  for (idx = 0; idx < calcNum; ++idx, pIter = TD_DLIST_NODE_NEXT(pIter)) {
    pCurr = (SStmTaskStatus*)pIter->data;
    code = mstSetStreamTaskResBlock(pStream, pCurr, pBlock, *numOfRows);
    if (TSDB_CODE_SUCCESS == code) {
      *numOfRows += 1;
    }
  }

  // trigger task
  if (NULL != pStatus->triggerTask) {
    code = mstSetStreamTaskResBlock(pStream, pStatus->triggerTask, pBlock, *numOfRows);
    if (TSDB_CODE_SUCCESS == code) {
      *numOfRows += 1;
    }
  }

  // runners of every deploy slot
  for (int32_t deployIdx = 0; deployIdx < MND_STREAM_RUNNER_DEPLOY_NUM; ++deployIdx) {
    int32_t slotRunners = taosArrayGetSize(pStatus->runners[deployIdx]);
    for (idx = 0; idx < slotRunners; ++idx) {
      pCurr = taosArrayGet(pStatus->runners[deployIdx], idx);
      code = mstSetStreamTaskResBlock(pStream, pCurr, pBlock, *numOfRows);
      if (TSDB_CODE_SUCCESS == code) {
        *numOfRows += 1;
      }
    }
  }

  pBlock->info.rows = *numOfRows;

_exit:

  taosRUnLockLatch(&mStreamMgmt.runtimeLock);

  if (code) {
    mError("error happens when build stream tasks result block, lino:%d, code:%s", lino, tstrerror(code));
  }

  return code;
}
1165

1166

1167
/*
 * Queue a user recalculation request covering `pRange` on a stream status entry.
 *
 * A fresh recalc id is generated first. Then, under the userRecalcLock write lock, the
 * request is pushed onto pStream->userRecalcList; the list is created and published
 * with an atomic pointer store when it does not exist yet.
 *
 * @param streamId  stream id (used by the stream-scoped log macros)
 * @param pStream   stream status entry owning the recalc list and its lock
 * @param pRange    time window to recalculate (skey/ekey)
 * @return 0 on success, otherwise the error of the failing step
 */
int32_t mstAppendNewRecalcRange(int64_t streamId, SStmStatus *pStream, STimeWindow* pRange) {
  int32_t code = 0;
  int32_t lino = 0;
  bool    wLocked = false;
  SArray* pNewList = NULL;  // owned here until it is published on pStream

  SStreamRecalcReq req = {.recalcId = 0, .start = pRange->skey, .end = pRange->ekey};
  TAOS_CHECK_EXIT(taosGetSystemUUIDU64(&req.recalcId));

  taosWLockLatch(&pStream->userRecalcLock);
  wLocked = true;

  if (NULL != pStream->userRecalcList) {
    TSDB_CHECK_NULL(taosArrayPush(pStream->userRecalcList, &req), code, lino, _exit, terrno);
  } else {
    pNewList = taosArrayInit(2, sizeof(SStreamRecalcReq));
    if (NULL == pNewList) {
      TAOS_CHECK_EXIT(terrno);
    }

    TSDB_CHECK_NULL(taosArrayPush(pNewList, &req), code, lino, _exit, terrno);

    atomic_store_ptr(&pStream->userRecalcList, pNewList);
    pNewList = NULL;  // ownership moved to pStream; keep the cleanup below from freeing it
  }

  mstsInfo("stream recalc ID:%" PRIx64 " range:%" PRId64 " - %" PRId64 " added", req.recalcId, pRange->skey, pRange->ekey);

_exit:

  taosArrayDestroy(pNewList);

  if (wLocked) {
    taosWUnLockLatch(&pStream->userRecalcLock);
  }

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
1209

1210

1211

1212
/*
 * Fill row `numOfRows` of the stream recalculation result block for one recalc entry.
 *
 * Columns are written in this order: stream name, stream id (hex), recalc id (hex),
 * start, end, progress ("<n>%").
 *
 * @param pStream    stream owning the recalculation
 * @param pProgress  recalculation progress entry to report
 * @param pBlock     result block; pBlock->pDataBlock holds the column arrays
 * @param numOfRows  index of the row to fill
 * @return 0 on success, otherwise the error of the first failing column
 */
int32_t mstSetStreamRecalculateResBlock(SStreamObj* pStream, SSTriggerRecalcProgress* pProgress, SSDataBlock* pBlock, int32_t numOfRows) {
  int32_t code = 0;
  int32_t cols = 0;
  int32_t lino = 0;

  // stream_name
  char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
  STR_WITH_MAXSIZE_TO_VARSTR(streamName, mndGetStableStr(pStream->name), sizeof(streamName));
  SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char*)streamName, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // stream id
  // The var-string length is the payload length only; adding VARSTR_HEADER_SIZE would
  // make the value cover bytes past the formatted text.
  char idstr[19 + VARSTR_HEADER_SIZE] = {0};
  snprintf(&idstr[VARSTR_HEADER_SIZE], sizeof(idstr) - VARSTR_HEADER_SIZE, "%" PRIx64, pStream->pCreate->streamId);
  varDataSetLen(idstr, strlen(&idstr[VARSTR_HEADER_SIZE]));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)idstr, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // recalc id
  snprintf(&idstr[VARSTR_HEADER_SIZE], sizeof(idstr) - VARSTR_HEADER_SIZE, "%" PRIx64, pProgress->recalcId);
  varDataSetLen(idstr, strlen(&idstr[VARSTR_HEADER_SIZE]));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)idstr, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // start
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&pProgress->start, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // end
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&pProgress->end, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // progress
  char progress[20 + VARSTR_HEADER_SIZE] = {0};
  snprintf(&progress[VARSTR_HEADER_SIZE], sizeof(progress) - VARSTR_HEADER_SIZE, "%d%%", pProgress->progress);
  varDataSetLen(progress, strlen(&progress[VARSTR_HEADER_SIZE]));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)progress, false);
  TSDB_CHECK_CODE(code, lino, _end);

_end:
  if (code) {
    mError("error happens when build stream recalculate result block, lino:%d, code:%s", lino, tstrerror(code));
  }
  return code;
}
1270

1271

1272
/*
 * Append one result row per user recalculation of `pStream` to `pBlock`.
 *
 * Runs under the runtime read lock; the trigger task's detailStatusLock is read-held
 * while its detail status is inspected. Nothing is added when the stream is not in
 * the stream map, is marked stopped, has no trigger task, or the trigger task has no
 * detail status. A row that fails to build is skipped and processing continues.
 *
 * @param numOfRows     in/out: next free row index, advanced for every row written
 * @param rowsCapacity  current row capacity of `pBlock`
 */
int32_t mstSetStreamRecalculatesResBlock(SStreamObj* pStream, SSDataBlock* pBlock, int32_t* numOfRows, int32_t rowsCapacity) {
  int32_t code = 0;
  int32_t lino = 0;
  int64_t streamId = pStream->pCreate->streamId;

  (void)mstWaitLock(&mStreamMgmt.runtimeLock, true);

  SStmStatus* pStatus = (SStmStatus*)taosHashGet(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
  if (NULL == pStatus) {
    mstsDebug("stream not in streamMap, ignore it, dropped:%d, stopped:%d", atomic_load_8(&pStream->userDropped), atomic_load_8(&pStream->userStopped));
    goto _exit;
  }

  int8_t stopped = atomic_load_8(&pStatus->stopped);
  if (stopped) {
    mstsDebug("stream stopped %d, ignore it", stopped);
    goto _exit;
  }

  SStmTaskStatus* pTrigTask = pStatus->triggerTask;
  if (NULL == pTrigTask) {
    mstsDebug("no trigger task now, deployTimes:%" PRId64 ", ignore it", pStatus->deployTimes);
    goto _exit;
  }

  SRWLatch* pDetailLock = &pTrigTask->detailStatusLock;
  (void)mstWaitLock(pDetailLock, true);

  SSTriggerRuntimeStatus* pTrigger = (SSTriggerRuntimeStatus*)pTrigTask->detailStatus;
  if (NULL == pTrigger) {
    mstsDebug("no trigger task now, deployTimes:%" PRId64 ", ignore it", pStatus->deployTimes);
    taosRUnLockLatch(pDetailLock);
    goto _exit;
  }

  int32_t recalcNum = taosArrayGetSize(pTrigger->userRecalcs);
  int32_t rowsNeeded = *numOfRows + recalcNum;

  // make sure the block can hold all rows this stream may add
  if (rowsNeeded > rowsCapacity) {
    code = blockDataEnsureCapacity(pBlock, rowsNeeded);
    if (code) {
      mstError("failed to prepare the result block buffer, rows:%d", rowsNeeded);
      taosRUnLockLatch(pDetailLock);
      TAOS_CHECK_EXIT(code);
    }
  }

  int32_t idx = 0;
  while (idx < recalcNum) {
    SSTriggerRecalcProgress* pEntry = taosArrayGet(pTrigger->userRecalcs, idx);

    code = mstSetStreamRecalculateResBlock(pStream, pEntry, pBlock, *numOfRows);
    if (TSDB_CODE_SUCCESS == code) {
      *numOfRows += 1;
    }
    ++idx;
  }

  taosRUnLockLatch(pDetailLock);

  pBlock->info.rows = *numOfRows;

_exit:

  taosRUnLockLatch(&mStreamMgmt.runtimeLock);

  if (code) {
    mError("error happens when build stream recalculates result block, lino:%d, code:%s", lino, tstrerror(code));
  }

  return code;
}
1338

1339
int32_t mstGetScanUidFromPlan(int64_t streamId, void* scanPlan, int64_t* uid) {
257,929✔
1340
  SSubplan* pSubplan = NULL;
257,929✔
1341
  int32_t code = TSDB_CODE_SUCCESS, lino = 0;
257,929✔
1342
  
1343
  TAOS_CHECK_EXIT(nodesStringToNode(scanPlan, (SNode**)&pSubplan));
257,929!
1344

1345
  if (pSubplan->pNode && nodeType(pSubplan->pNode) == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
257,929!
1346
    SScanPhysiNode* pScanNode = (SScanPhysiNode*)pSubplan->pNode;
110,456✔
1347
    *uid = pScanNode->uid;
110,456✔
1348
  }
1349
  
1350
_exit:
256,687✔
1351

1352
  if (code) {
257,929!
1353
    mstsError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1354
  }
1355

1356
  nodesDestroyNode((SNode *)pSubplan);
257,929✔
1357

1358
  return code;
257,929✔
1359
}
1360

1361

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc