• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5014

03 Apr 2026 03:59PM UTC coverage: 72.256% (-0.06%) from 72.317%
#5014

push

travis-ci

web-flow
merge: from main to 3.0 branch #35067

4054 of 5985 new or added lines in 68 files covered. (67.74%)

13285 existing lines in 168 files now uncovered.

257272 of 356056 relevant lines covered (72.26%)

133154720.42 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.28
/source/dnode/mnode/impl/src/mndStreamUtil.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "mndDb.h"
17
#include "mndStb.h"
18
#include "mndStream.h"
19
#include "mndTrans.h"
20
#include "mndVgroup.h"
21
#include "taoserror.h"
22
#include "tmisce.h"
23
#include "mndSnode.h"
24

25
bool mstWaitLock(SRWLatch* pLock, bool readLock) {
29,763,209✔
26
  if (readLock) {
29,763,209✔
27
    while (taosRTryLockLatch(pLock)) {
27,264,851✔
28
      taosMsleep(1);
177,309✔
29
    }
30

31
    return true;
27,087,542✔
32
  }
33

34
  taosWWaitLockLatch(pLock);
2,675,667✔
35

36
  return true;
2,675,667✔
37
}
38

39
void mstDestroySStmVgStreamStatus(void* p) { 
446,965✔
40
  SStmVgStreamStatus* pStatus = (SStmVgStreamStatus*)p;
446,965✔
41
  taosArrayDestroy(pStatus->trigReaders); 
446,965✔
42
  taosArrayDestroy(pStatus->calcReaders); 
446,965✔
43
}
446,965✔
44

45
void mstDestroySStmSnodeStreamStatus(void* p) { 
204,768✔
46
  SStmSnodeStreamStatus* pStatus = (SStmSnodeStreamStatus*)p;
204,768✔
47
  for (int32_t i = 0; i < MND_STREAM_RUNNER_DEPLOY_NUM; ++i) {
819,072✔
48
    taosArrayDestroy(pStatus->runners[i]);
614,304✔
49
    pStatus->runners[i] = NULL;
614,304✔
50
  }
51
}
204,768✔
52

53

54
void mstDestroyVgroupStatus(SStmVgroupStatus* pVgStatus) {
×
55
  taosHashCleanup(pVgStatus->streamTasks);
×
56
  pVgStatus->streamTasks = NULL;
×
57
}
×
58

59
void mstDestroySStmTaskToDeployExt(void* param) {
1,521,254✔
60
  SStmTaskToDeployExt* pExt = (SStmTaskToDeployExt*)param;
1,521,254✔
61
  if (pExt->deployed) {
1,521,254✔
62
    return;
1,520,662✔
63
  }
64
  
65
  switch (pExt->deploy.task.type) {
592✔
UNCOV
66
    case STREAM_TRIGGER_TASK:
×
UNCOV
67
      taosArrayDestroy(pExt->deploy.msg.trigger.readerList);
×
UNCOV
68
      pExt->deploy.msg.trigger.readerList = NULL;
×
UNCOV
69
      taosArrayDestroy(pExt->deploy.msg.trigger.runnerList);
×
UNCOV
70
      pExt->deploy.msg.trigger.runnerList = NULL;
×
UNCOV
71
      break;
×
UNCOV
72
    case STREAM_RUNNER_TASK:
×
UNCOV
73
      taosMemoryFreeClear(pExt->deploy.msg.runner.pPlan);
×
UNCOV
74
      break;
×
75
    case STREAM_READER_TASK:
592✔
76
      if (!pExt->deploy.msg.reader.triggerReader) {
592✔
UNCOV
77
        SStreamReaderDeployFromCalc* pCalcReaderDeploy = &pExt->deploy.msg.reader.msg.calc;
×
UNCOV
78
        taosMemoryFreeClear(pCalcReaderDeploy->calcScanPlan);
×
79
      }
80
      break;
592✔
81
    default:  
×
82
      break;;
×
83
  }
84
}
85

86
void mstDestroyScanAddrList(void* param) {
393,495✔
87
  if (NULL == param) {
393,495✔
88
    return;
×
89
  }
90
  SArray* pList = *(SArray**)param;
393,495✔
91
  taosArrayDestroy(pList);
393,495✔
92
}
93

94
void mstDestroySStmSnodeTasksDeploy(void* param) {
59,166✔
95
  SStmSnodeTasksDeploy* pSnode = (SStmSnodeTasksDeploy*)param;
59,166✔
96
  taosArrayDestroyEx(pSnode->triggerList, mstDestroySStmTaskToDeployExt);
59,166✔
97
  taosArrayDestroyEx(pSnode->runnerList, mstDestroySStmTaskToDeployExt);
59,166✔
98
}
59,166✔
99

100
void mstDestroySStmVgTasksToDeploy(void* param) {
141,368✔
101
  SStmVgTasksToDeploy* pVg = (SStmVgTasksToDeploy*)param;
141,368✔
102
  taosArrayDestroyEx(pVg->taskList, mstDestroySStmTaskToDeployExt);
141,368✔
103
}
141,368✔
104

105
void mstDestroySStmSnodeStatus(void* param) {
36,567✔
106
  SStmSnodeStatus* pSnode = (SStmSnodeStatus*)param;
36,567✔
107
  taosHashCleanup(pSnode->streamTasks);
36,567✔
108
}
36,567✔
109

110
void mstDestroySStmVgroupStatus(void* param) {
84,149✔
111
  SStmVgroupStatus* pVg = (SStmVgroupStatus*)param;
84,149✔
112
  taosHashCleanup(pVg->streamTasks);
84,149✔
113
}
84,149✔
114

115
void mstFreeTrigOReaderList(void* param) {
50,945✔
116
  SArray** ppList = (SArray**)param;
50,945✔
117
  taosArrayDestroy(*ppList);
50,945✔
118
}
50,945✔
119

120
void mstResetSStmStatus(SStmStatus* pStatus) {
199,183✔
121
  (void)mstWaitLock(&pStatus->resetLock, false);
199,183✔
122

123
  taosArrayDestroy(pStatus->trigReaders);
199,183✔
124
  pStatus->trigReaders = NULL;
199,183✔
125
  taosArrayDestroyEx(pStatus->trigOReaders, mstFreeTrigOReaderList);
199,183✔
126
  pStatus->trigOReaders = NULL;
199,183✔
127
  pStatus->calcReaders = tdListFree(pStatus->calcReaders);
199,183✔
128
  if (pStatus->triggerTask) {
199,183✔
129
    (void)mstWaitLock(&pStatus->triggerTask->detailStatusLock, false);
198,616✔
130
    taosMemoryFreeClear(pStatus->triggerTask->detailStatus);
198,616✔
131
    taosWUnLockLatch(&pStatus->triggerTask->detailStatusLock);
198,616✔
132
  }
133
  taosMemoryFreeClear(pStatus->triggerTask);
199,183✔
134
  for (int32_t i = 0; i < MND_STREAM_RUNNER_DEPLOY_NUM; ++i) {
796,732✔
135
    taosArrayDestroy(pStatus->runners[i]);
597,549✔
136
    pStatus->runners[i] = NULL;
597,549✔
137
  }
138
  pStatus->lastTrigMgmtReqId = 0;
199,183✔
139

140
  taosWUnLockLatch(&pStatus->resetLock);
199,183✔
141
}
199,183✔
142

143
void mstDestroySStmStatus(void* param) {
191,319✔
144
  SStmStatus* pStatus = (SStmStatus*)param;
191,319✔
145
  taosMemoryFreeClear(pStatus->streamName);
191,319✔
146

147
  mstResetSStmStatus(pStatus);
191,319✔
148

149
  taosWLockLatch(&pStatus->userRecalcLock);
191,319✔
150
  taosArrayDestroy(pStatus->userRecalcList);
191,319✔
151
  taosWUnLockLatch(&pStatus->userRecalcLock);
191,319✔
152

153
  tFreeSCMCreateStreamReq(pStatus->pCreate);
191,319✔
154
  taosMemoryFreeClear(pStatus->pCreate);  
191,319✔
155
}
191,319✔
156

157
void mstDestroySStmAction(void* param) {
251,790✔
158
  SStmAction* pAction = (SStmAction*)param;
251,790✔
159

160
  taosArrayDestroy(pAction->undeploy.taskList);
251,790✔
161
  taosArrayDestroy(pAction->recalc.recalcList);
251,790✔
162
}
251,790✔
163

164
void mstClearSStmStreamDeploy(SStmStreamDeploy* pDeploy) {
264,894✔
165
  pDeploy->readerTasks = NULL;
264,894✔
166
  pDeploy->triggerTask = NULL;
264,894✔
167
  pDeploy->runnerTasks = NULL;
264,894✔
168
}
264,894✔
169

170
int32_t mstIsStreamDropped(SMnode *pMnode, int64_t streamId, bool* dropped) {
263,548✔
171
  SSdb   *pSdb = pMnode->pSdb;
263,548✔
172
  void   *pIter = NULL;
263,548✔
173
  
174
  while (1) {
669,017✔
175
    SStreamObj *pStream = NULL;
932,565✔
176
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
932,565✔
177
    if (pIter == NULL) break;
932,565✔
178

179
    if (pStream->pCreate->streamId == streamId) {
737,098✔
180
      *dropped = pStream->userDropped ? true : false;
68,081✔
181
      sdbRelease(pSdb, pStream);
68,081✔
182
      sdbCancelFetch(pSdb, pIter);
68,081✔
183
      mstsDebug("stream found, dropped:%d", *dropped);
68,081✔
184
      return TSDB_CODE_SUCCESS;
68,081✔
185
    }
186
    
187
    sdbRelease(pSdb, pStream);
669,017✔
188
  }
189

190
  *dropped = true;
195,467✔
191

192
  return TSDB_CODE_SUCCESS;
195,467✔
193
}
194

195
typedef struct SStmCheckDbInUseCtx {
196
  bool* dbStream;
197
  bool* vtableStream;
198
  bool  ignoreCurrDb;
199
} SStmCheckDbInUseCtx;
200

201
static bool mstChkSetDbInUse(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
39,422✔
202
  SStreamObj *pStream = pObj;
39,422✔
203
  if (atomic_load_8(&pStream->userDropped)) {
39,422✔
204
    return true;
×
205
  }
206

207
  SStmCheckDbInUseCtx* pCtx = (SStmCheckDbInUseCtx*)p2;
39,422✔
208
  if (pCtx->ignoreCurrDb && 0 == strcmp(pStream->pCreate->streamDB, p1)) {
39,422✔
209
    return true;
19,273✔
210
  }
211
  
212
  if (pStream->pCreate->triggerDB && 0 == strcmp(pStream->pCreate->triggerDB, p1)) {
20,149✔
213
    *pCtx->dbStream = true;
701✔
214
    return false;
701✔
215
  }
216

217
  int32_t calcDBNum = taosArrayGetSize(pStream->pCreate->calcDB);
19,448✔
218
  for (int32_t i = 0; i < calcDBNum; ++i) {
38,406✔
219
    char* calcDB = taosArrayGetP(pStream->pCreate->calcDB, i);
19,448✔
220
    if (0 == strcmp(calcDB, p1)) {
19,448✔
221
      *pCtx->dbStream = true;
490✔
222
      return false;
490✔
223
    }
224
  }
225

226
  if (pStream->pCreate->vtableCalc || STREAM_IS_VIRTUAL_TABLE(pStream->pCreate->triggerTblType, pStream->pCreate->flags)) {
18,958✔
227
    *pCtx->vtableStream = true;
10,651✔
228
    return true;
10,651✔
229
  }
230
  
231
  return true;
8,307✔
232
}
233

234
void mstCheckDbInUse(SMnode *pMnode, char *dbFName, bool *dbStream, bool *vtableStream, bool ignoreCurrDb) {
739,330✔
235
  int32_t streamNum = sdbGetSize(pMnode->pSdb, SDB_STREAM);
739,330✔
236
  if (streamNum <= 0) {
739,330✔
237
    return;
729,726✔
238
  }
239

240
  SStmCheckDbInUseCtx ctx = {dbStream, vtableStream, ignoreCurrDb};
9,604✔
241
  sdbTraverse(pMnode->pSdb, SDB_STREAM, mstChkSetDbInUse, dbFName, &ctx, NULL);
9,604✔
242
}
243

244
static void mstShowStreamStatus(char *dst, int8_t status, int32_t bufLen) {
×
245
  if (status == STREAM_STATUS_INIT) {
×
246
    tstrncpy(dst, "init", bufLen);
×
247
  } else if (status == STREAM_STATUS_RUNNING) {
×
248
    tstrncpy(dst, "running", bufLen);
×
249
  } else if (status == STREAM_STATUS_STOPPED) {
×
250
    tstrncpy(dst, "stopped", bufLen);
×
251
  } else if (status == STREAM_STATUS_FAILED) {
×
252
    tstrncpy(dst, "failed", bufLen);
×
253
  }
254
}
×
255

256
int32_t mstCheckSnodeExists(SMnode *pMnode) {
×
257
  SSdb      *pSdb = pMnode->pSdb;
×
258
  void      *pIter = NULL;
×
259
  SSnodeObj *pObj = NULL;
×
260

261
  while (1) {
262
    pIter = sdbFetch(pSdb, SDB_SNODE, pIter, (void **)&pObj);
×
263
    if (pIter == NULL) {
×
264
      break;
×
265
    }
266

267
    sdbRelease(pSdb, pObj);
×
268
    sdbCancelFetch(pSdb, pIter);
×
269
    return TSDB_CODE_SUCCESS;
×
270
  }
271

272
  return TSDB_CODE_SNODE_NOT_DEPLOYED;
×
273
}
274

275
void mstSetTaskStatusFromMsg(SStmGrpCtx* pCtx, SStmTaskStatus* pTask, SStmTaskStatusMsg* pMsg) {
×
276
  pTask->id.taskId = pMsg->taskId;
×
277
  pTask->id.deployId = pMsg->deployId;
×
278
  pTask->id.seriousId = pMsg->seriousId;
×
279
  pTask->id.nodeId = pMsg->nodeId;
×
280
  pTask->id.taskIdx = pMsg->taskIdx;
×
281

282
  pTask->type = pMsg->type;
×
283
  pTask->flags = pMsg->flags;
×
284
  pTask->status = pMsg->status;
×
285
  pTask->lastUpTs = pCtx->currTs;
×
286
}
×
287

288
bool mndStreamActionDequeue(SStmActionQ* pQueue, SStmQNode **param) {
704,076✔
289
  while (0 == atomic_load_64(&pQueue->qRemainNum)) {
704,076✔
290
    return false;
494,074✔
291
  }
292

293
  SStmQNode *orig = pQueue->head;
210,002✔
294

295
  SStmQNode *node = pQueue->head->next;
210,002✔
296
  pQueue->head = pQueue->head->next;
210,002✔
297

298
  *param = node;
210,002✔
299

300
  taosMemoryFreeClear(orig);
210,002✔
301

302
  (void)atomic_sub_fetch_64(&pQueue->qRemainNum, 1);
210,002✔
303

304
  return true;
210,002✔
305
}
306

307
void mndStreamActionEnqueue(SStmActionQ* pQueue, SStmQNode* param) {
210,002✔
308
  taosWLockLatch(&pQueue->lock);
210,002✔
309
  pQueue->tail->next = param;
210,002✔
310
  pQueue->tail = param;
210,002✔
311
  taosWUnLockLatch(&pQueue->lock);
210,002✔
312

313
  (void)atomic_add_fetch_64(&pQueue->qRemainNum, 1);
210,002✔
314
}
210,002✔
315

316
char* mstGetStreamActionString(int32_t action) {
185,183✔
317
  switch (action) {
185,183✔
318
    case STREAM_ACT_DEPLOY:
185,183✔
319
      return "DEPLOY";
185,183✔
320
    case STREAM_ACT_UNDEPLOY:
×
321
      return "UNDEPLOY";
×
322
    case STREAM_ACT_START:
×
323
      return "START";
×
324
    case STREAM_ACT_UPDATE_TRIGGER:
×
325
      return "UPDATE TRIGGER";
×
326
    case STREAM_ACT_RECALC:
×
327
      return "USER RECALC";
×
328
    default:
×
329
      break;
×
330
  }
331

332
  return "UNKNOWN";
×
333
}
334

335
void mstPostStreamAction(SStmActionQ*       actionQ, int64_t streamId, char* streamName, void* param, bool userAction, int32_t action) {
208,468✔
336
  SStmQNode *pNode = taosMemoryMalloc(sizeof(SStmQNode));
208,468✔
337
  if (NULL == pNode) {
208,468✔
338
    taosMemoryFreeClear(param);
×
339
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, __LINE__, tstrerror(terrno));
×
340
    return;
×
341
  }
342

343
  pNode->type = action;
208,468✔
344
  pNode->streamAct = true;
208,468✔
345
  pNode->action.stream.streamId = streamId;
208,468✔
346
  tstrncpy(pNode->action.stream.streamName, streamName, sizeof(pNode->action.stream.streamName));
208,468✔
347
  pNode->action.stream.userAction = userAction;
208,468✔
348
  pNode->action.stream.actionParam = param;
208,468✔
349
  
350
  pNode->next = NULL;
208,468✔
351

352
  mndStreamActionEnqueue(actionQ, pNode);
208,468✔
353

354
  mstsDebug("stream action %s posted enqueue", mstGetStreamActionString(action));
208,468✔
355
}
356

357
void mstPostTaskAction(SStmActionQ*        actionQ, SStmTaskAction* pAction, int32_t action) {
1,534✔
358
  SStmQNode *pNode = taosMemoryMalloc(sizeof(SStmQNode));
1,534✔
359
  if (NULL == pNode) {
1,534✔
360
    int64_t streamId = pAction->streamId;
×
361
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, __LINE__, tstrerror(terrno));
×
362
    return;
×
363
  }
364

365
  pNode->type = action;
1,534✔
366
  pNode->streamAct = false;
1,534✔
367
  pNode->action.task = *pAction;
1,534✔
368
  
369
  pNode->next = NULL;
1,534✔
370

371
  mndStreamActionEnqueue(actionQ, pNode);
1,534✔
372
}
373

374
void mstDestroyDbVgroupsHash(SSHashObj *pDbVgs) {
61,509✔
375
  int32_t iter = 0;
61,509✔
376
  SDBVgHashInfo* pVg = NULL;
61,509✔
377
  void* p = NULL;
61,509✔
378
  while (NULL != (p = tSimpleHashIterate(pDbVgs, p, &iter))) {
289,646✔
379
    pVg = (SDBVgHashInfo*)p;
228,137✔
380
    taosArrayDestroy(pVg->vgArray);
228,137✔
381
  }
382
  
383
  tSimpleHashCleanup(pDbVgs);
61,509✔
384
}
61,509✔
385

386

387
int32_t mstBuildDBVgroupsMap(SMnode* pMnode, SSHashObj** ppRes) {
61,509✔
388
  void*   pIter = NULL;
61,509✔
389
  int32_t code = TSDB_CODE_SUCCESS;
61,509✔
390
  int32_t lino = 0;
61,509✔
391
  SArray* pTarget = NULL;
61,509✔
392
  SArray* pNew = NULL;
61,509✔
393
  SDbObj* pDb = NULL;
61,509✔
394
  SDBVgHashInfo dbInfo = {0}, *pDbInfo = NULL;
61,509✔
395
  SVgObj* pVgroup = NULL;
61,509✔
396

397
  SSHashObj* pDbVgroup = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY));
61,509✔
398
  TSDB_CHECK_NULL(pDbVgroup, code, lino, _exit, terrno);
61,509✔
399

400
  while (1) {
375,857✔
401
    pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
437,366✔
402
    if (pIter == NULL) {
437,366✔
403
      break;
61,509✔
404
    }
405

406
    pDbInfo = (SDBVgHashInfo*)tSimpleHashGet(pDbVgroup, pVgroup->dbName, strlen(pVgroup->dbName) + 1);
375,857✔
407
    if (NULL == pDbInfo) {
375,857✔
408
      pNew = taosArrayInit(20, sizeof(SVGroupHashInfo));
228,137✔
409
      if (NULL == pNew) {
228,137✔
410
        sdbRelease(pMnode->pSdb, pVgroup);
×
411
        sdbCancelFetch(pMnode->pSdb, pIter);
×
412
        pVgroup = NULL;
×
413
        TSDB_CHECK_NULL(pNew, code, lino, _exit, terrno);
×
414
      }
415
      
416
      pDb = mndAcquireDb(pMnode, pVgroup->dbName);
228,137✔
417
      if (NULL == pDb) {
228,137✔
418
        sdbRelease(pMnode->pSdb, pVgroup);
×
419
        sdbCancelFetch(pMnode->pSdb, pIter);      
×
420
        pVgroup = NULL;
×
421
        TSDB_CHECK_NULL(pDb, code, lino, _exit, terrno);
×
422
      }
423
      dbInfo.vgSorted = false;
228,137✔
424
      dbInfo.hashMethod = pDb->cfg.hashMethod;
228,137✔
425
      dbInfo.hashPrefix = pDb->cfg.hashPrefix;
228,137✔
426
      dbInfo.hashSuffix = pDb->cfg.hashSuffix;
228,137✔
427
      dbInfo.vgArray = pNew;
228,137✔
428
      
429
      mndReleaseDb(pMnode, pDb);
228,137✔
430

431
      pTarget = pNew;
228,137✔
432
    } else {
433
      pTarget = pDbInfo->vgArray;
147,720✔
434
    }
435

436
    SVGroupHashInfo vgInfo = {.vgId = pVgroup->vgId, .hashBegin = pVgroup->hashBegin, .hashEnd = pVgroup->hashEnd};
375,857✔
437
    if (NULL == taosArrayPush(pTarget, &vgInfo)) {
375,857✔
438
      sdbRelease(pMnode->pSdb, pVgroup);
×
439
      sdbCancelFetch(pMnode->pSdb, pIter);      
×
440
      pVgroup = NULL;
×
441
      TSDB_CHECK_NULL(NULL, code, lino, _exit, terrno);
×
442
    }
443

444
    if (NULL == pDbInfo) {
375,857✔
445
      code = tSimpleHashPut(pDbVgroup, pVgroup->dbName, strlen(pVgroup->dbName) + 1, &dbInfo, sizeof(dbInfo));
228,137✔
446
      if (code) {
228,137✔
447
        sdbRelease(pMnode->pSdb, pVgroup);
×
448
        sdbCancelFetch(pMnode->pSdb, pIter);      
×
449
        pVgroup = NULL;
×
450
        TAOS_CHECK_EXIT(code);
×
451
      }
452
      pNew = NULL;
228,137✔
453
    }
454

455
    sdbRelease(pMnode->pSdb, pVgroup);
375,857✔
456
    pVgroup = NULL;
375,857✔
457
  }
458

459
  *ppRes = pDbVgroup;
61,509✔
460
  
461
_exit:
61,509✔
462

463
  taosArrayDestroy(pNew);
61,509✔
464

465
  if (code) {
61,509✔
466
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
467
  }
468

469
  return code;
61,509✔
470
}
471

472
int mstDbVgInfoComp(const void* lp, const void* rp) {
104,750✔
473
  SVGroupHashInfo* pLeft = (SVGroupHashInfo*)lp;
104,750✔
474
  SVGroupHashInfo* pRight = (SVGroupHashInfo*)rp;
104,750✔
475
  if (pLeft->hashBegin < pRight->hashBegin) {
104,750✔
476
    return -1;
104,750✔
477
  } else if (pLeft->hashBegin > pRight->hashBegin) {
×
478
    return 1;
×
479
  }
480

481
  return 0;
×
482
}
483

484
int32_t mstTableHashValueComp(void const* lp, void const* rp) {
244,353✔
485
  uint32_t*    key = (uint32_t*)lp;
244,353✔
486
  SVgroupInfo* pVg = (SVgroupInfo*)rp;
244,353✔
487

488
  if (*key < pVg->hashBegin) {
244,353✔
489
    return -1;
×
490
  } else if (*key > pVg->hashEnd) {
244,353✔
491
    return 1;
65,714✔
492
  }
493

494
  return 0;
178,639✔
495
}
496

497

498
int32_t mstGetTableVgId(SSHashObj* pDbVgroups, char* dbFName, char *tbName, int32_t* vgId) {
178,639✔
499
  int32_t code = 0;
178,639✔
500
  int32_t lino = 0;
178,639✔
501
  SVgroupInfo* vgInfo = NULL;
178,639✔
502
  char         tbFullName[TSDB_TABLE_FNAME_LEN];
178,639✔
503

504
  SDBVgHashInfo* dbInfo = (SDBVgHashInfo*)tSimpleHashGet(pDbVgroups, dbFName, strlen(dbFName) + 1);
178,639✔
505
  if (NULL == dbInfo) {
178,639✔
506
    mstError("db %s does not exist", dbFName);
×
507
    TAOS_CHECK_EXIT(TSDB_CODE_MND_DB_NOT_EXIST);
×
508
  }
509
  
510
  (void)snprintf(tbFullName, sizeof(tbFullName), "%s.%s", dbFName, tbName);
178,639✔
511
  uint32_t hashValue = taosGetTbHashVal(tbFullName, (uint32_t)strlen(tbFullName), dbInfo->hashMethod,
357,278✔
512
                                        dbInfo->hashPrefix, dbInfo->hashSuffix);
178,639✔
513

514
  if (!dbInfo->vgSorted) {
178,639✔
515
    taosArraySort(dbInfo->vgArray, mstDbVgInfoComp);
61,753✔
516
    dbInfo->vgSorted = true;
61,753✔
517
  }
518

519
  vgInfo = taosArraySearch(dbInfo->vgArray, &hashValue, mstTableHashValueComp, TD_EQ);
178,639✔
520
  if (NULL == vgInfo) {
178,639✔
521
    mstError("no hash range found for hash value [%u], dbFName:%s, numOfVgId:%d", hashValue, dbFName,
×
522
             (int32_t)taosArrayGetSize(dbInfo->vgArray));
523
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
×
524
  }
525

526
  *vgId = vgInfo->vgId;
178,639✔
527

528
_exit:
178,639✔
529

530
  if (code) {
178,639✔
531
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
532
  }
533

534
  return code;
178,639✔
535
}
536

537

538
void mstLogSStreamObj(char* tips, SStreamObj* p) {
198,743✔
539
  if (!(stDebugFlag & DEBUG_DEBUG)) {
198,743✔
540
    return;
22,552✔
541
  }
542
  
543
  if (NULL == p) {
176,191✔
544
    mstDebug("%s: stream is NULL", tips);
×
545
    return;
×
546
  }
547

548
  mstDebug("%s: stream obj", tips);
176,191✔
549
  mstDebug("name:%s mainSnodeId:%d userDropped:%d userStopped:%d createTime:%" PRId64 " updateTime:%" PRId64,
176,191✔
550
      p->name, p->mainSnodeId, p->userDropped, p->userStopped, p->createTime, p->updateTime);
551

552
  SCMCreateStreamReq* q = p->pCreate;
176,191✔
553
  if (NULL == q) {
176,191✔
554
    mstDebug("stream pCreate is NULL");
×
555
    return;
×
556
  }
557

558
  int64_t streamId = q->streamId;
176,191✔
559
  int32_t calcDBNum = taosArrayGetSize(q->calcDB);
176,191✔
560
  int32_t calcScanNum = taosArrayGetSize(q->calcScanPlanList);
176,191✔
561
  int32_t notifyUrlNum = taosArrayGetSize(q->pNotifyAddrUrls);
176,191✔
562
  int32_t outColNum = taosArrayGetSize(q->outCols);
176,191✔
563
  int32_t outTagNum = taosArrayGetSize(q->outTags);
176,191✔
564
  int32_t forceOutColNum = taosArrayGetSize(q->forceOutCols);
176,191✔
565

566
  mstsDebugL("create_info: name:%s sql:%s streamDB:%s triggerDB:%s outDB:%s calcDBNum:%d triggerTblName:%s outTblName:%s "
176,191✔
567
      "igExists:%d triggerType:%d igDisorder:%d deleteReCalc:%d deleteOutTbl:%d fillHistory:%d fillHistroyFirst:%d "
568
      "calcNotifyOnly:%d lowLatencyCalc:%d igNoDataTrigger:%d enableMultiGroupCalc:%d notifyUrlNum:%d notifyEventTypes:%d addOptions:%d notifyHistory:%d "
569
      "outColsNum:%d outTagsNum:%d maxDelay:%" PRId64 " fillHistoryStartTs:%" PRId64 " watermark:%" PRId64 " expiredTime:%" PRId64 " "
570
      "triggerTblType:%d triggerTblUid:%" PRIx64 " triggerTblSuid:%" PRIx64 " vtableCalc:%d outTblType:%d outStbExists:%d outStbUid:%" PRIu64 " outStbSversion:%d "
571
      "eventTypes:0x%" PRIx64 " flags:0x%" PRIx64 " tsmaId:0x%" PRIx64 " placeHolderBitmap:0x%" PRIx64 " calcTsSlotId:%d triTsSlotId:%d calcPkSlotId:%d triPkSlotId:%d "
572
      "triggerTblVgId:%d outTblVgId:%d calcScanPlanNum:%d forceOutCols:%d idleTimeoutMs:%" PRId64,
573
      q->name, q->sql, q->streamDB, q->triggerDB, q->outDB, calcDBNum, q->triggerTblName, q->outTblName,
574
      q->igExists, q->triggerType, q->igDisorder, q->deleteReCalc, q->deleteOutTbl, q->fillHistory, q->fillHistoryFirst,
575
      q->calcNotifyOnly, q->lowLatencyCalc, q->igNoDataTrigger, q->enableMultiGroupCalc, notifyUrlNum, q->notifyEventTypes, q->addOptions, q->notifyHistory,
576
      outColNum, outTagNum, q->maxDelay, q->fillHistoryStartTime, q->watermark, q->expiredTime,
577
      q->triggerTblType, q->triggerTblUid, q->triggerTblSuid, q->vtableCalc, q->outTblType, q->outStbExists, q->outStbUid, q->outStbSversion,
578
      q->eventTypes, q->flags, q->tsmaId, q->placeHolderBitmap, q->calcTsSlotId, q->triTsSlotId, q->calcPkSlotId, q->triPkSlotId,
579
      q->triggerTblVgId, q->outTblVgId, calcScanNum, forceOutColNum, q->idleTimeoutMs);
580

581
  switch (q->triggerType) {
176,191✔
582
    case WINDOW_TYPE_INTERVAL: {
67,915✔
583
      SSlidingTrigger* t = &q->trigger.sliding;
67,915✔
584
      mstsDebug("sliding trigger options, intervalUnit:%d, slidingUnit:%d, offsetUnit:%d, soffsetUnit:%d, precision:%d, interval:%" PRId64 ", offset:%" PRId64 ", sliding:%" PRId64 ", soffset:%" PRId64, 
67,915✔
585
          t->intervalUnit, t->slidingUnit, t->offsetUnit, t->soffsetUnit, t->precision, t->interval, t->offset, t->sliding, t->soffset);
586
      break;
67,915✔
587
    }  
588
    case WINDOW_TYPE_SESSION: {
7,813✔
589
      SSessionTrigger* t = &q->trigger.session;
7,813✔
590
      mstsDebug("session trigger options, slotId:%d, sessionVal:%" PRId64, t->slotId, t->sessionVal);
7,813✔
591
      break;
7,813✔
592
    }
593
    case WINDOW_TYPE_STATE: {
49,134✔
594
      SStateWinTrigger* t = &q->trigger.stateWin;
49,134✔
595
      mstsDebug(
49,134✔
596
          "state trigger options, slotId:%d, expr:%s, extend:%d, zeroth:%s, trueForType: %d, trueForCount: %d, "
597
          "trueForDuration:%" PRId64,
598
          t->slotId, (char*)t->expr, t->extend, (char*)t->zeroth, t->trueForType, t->trueForCount, t->trueForDuration);
599
      break;
49,134✔
600
    }
601
    case WINDOW_TYPE_EVENT:{
21,520✔
602
      SEventTrigger* t = &q->trigger.event;
21,520✔
603
      mstsDebug(
21,520✔
604
          "event trigger options, startCond:%s, endCond:%s, trueForType: %d, trueForCount: %d, "
605
          "trueForDuration:%" PRId64,
606
          (char*)t->startCond, (char*)t->endCond, t->trueForType, t->trueForCount, t->trueForDuration);
607
      break;
21,520✔
608
    }
609
    case WINDOW_TYPE_COUNT: {
19,307✔
610
      SCountTrigger* t = &q->trigger.count;
19,307✔
611
      mstsDebug("count trigger options, countVal:%" PRId64 ", sliding:%" PRId64 ", condCols:%s", t->countVal, t->sliding, (char*)t->condCols);
19,307✔
612
      break;
19,307✔
613
    }
614
    case WINDOW_TYPE_PERIOD: {
10,502✔
615
      SPeriodTrigger* t = &q->trigger.period;
10,502✔
616
      mstsDebug("period trigger options, periodUnit:%d, offsetUnit:%d, precision:%d, period:%" PRId64 ", offset:%" PRId64, 
10,502✔
617
          t->periodUnit, t->offsetUnit, t->precision, t->period, t->offset);
618
      break;
10,502✔
619
    }
620
    default:
×
621
      mstsDebug("unknown triggerType:%d", q->triggerType);
×
622
      break;
×
623
  }
624

625
  mstsDebugL("create_info: triggerCols:[%s]", (char*)q->triggerCols);
176,191✔
626

627
  mstsDebugL("create_info: partitionCols:[%s]", (char*)q->partitionCols);
176,191✔
628

629
  mstsDebugL("create_info: triggerScanPlan:[%s]", (char*)q->triggerScanPlan);
176,191✔
630

631
  mstsDebugL("create_info: calcPlan:[%s]", (char*)q->calcPlan);
176,191✔
632

633
  mstsDebugL("create_info: subTblNameExpr:[%s]", (char*)q->subTblNameExpr);
176,191✔
634

635
  mstsDebugL("create_info: tagValueExpr:[%s]", (char*)q->tagValueExpr);
176,191✔
636

637

638
  for (int32_t i = 0; i < calcDBNum; ++i) {
349,254✔
639
    char* dbName = taosArrayGetP(q->calcDB, i);
173,063✔
640
    mstsDebug("create_info: calcDB[%d] - %s", i, dbName);
173,063✔
641
  }
642

643
  for (int32_t i = 0; i < calcScanNum; ++i) {
559,246✔
644
    SStreamCalcScan* pScan = taosArrayGet(q->calcScanPlanList, i);
383,055✔
645
    int32_t vgNum = taosArrayGetSize(pScan->vgList);
383,055✔
646
    mstsDebugL("create_info: calcScanPlan[%d] - readFromCache:%d vgNum:%d scanPlan:[%s]", i, pScan->readFromCache, vgNum, (char*)pScan->scanPlan);
383,055✔
647
    for (int32_t v = 0; v < vgNum; ++v) {
766,110✔
648
      mstsDebug("create_info: calcScanPlan[%d] vg[%d] - vgId:%d", i, v, *(int32_t*)taosArrayGet(pScan->vgList, v));
383,055✔
649
    }
650
  }
651

652
  for (int32_t i = 0; i < notifyUrlNum; ++i) {
227,307✔
653
    char* url = taosArrayGetP(q->pNotifyAddrUrls, i);
51,116✔
654
    mstsDebug("create_info: notifyUrl[%d] - %s", i, url);
51,116✔
655
  }
656

657
  for (int32_t i = 0; i < outColNum; ++i) {
873,903✔
658
    SFieldWithOptions* o = taosArrayGet(q->outCols, i);
697,712✔
659
    mstsDebug("create_info: outCol[%d] - name:%s type:%d flags:%d bytes:%d compress:%u typeMod:%d", 
697,712✔
660
        i, o->name, o->type, o->flags, o->bytes, o->compress, o->typeMod);
661
  }
662
      
663
}
664

665
void mstLogSStmTaskStatus(char* name, int64_t streamId, SStmTaskStatus* pTask, int32_t idx) {
1,340,305✔
666
  mstsDebug("%s[%d]: task %" PRIx64 " deployId:%d SID:%" PRId64 " nodeId:%d tidx:%d type:%s flags:%" PRIx64 " status:%s lastUpTs:%" PRId64, 
1,340,305✔
667
      name, idx, pTask->id.taskId, pTask->id.deployId, pTask->id.seriousId, pTask->id.nodeId, pTask->id.taskIdx,
668
      gStreamTaskTypeStr[pTask->type], pTask->flags, gStreamStatusStr[pTask->status], pTask->lastUpTs);
669
}
1,340,305✔
670

671
void mstLogSStmStatus(char* tips, int64_t streamId, SStmStatus* p) {
198,616✔
672
  if (!(stDebugFlag & DEBUG_DEBUG)) {
198,616✔
673
    return;
22,552✔
674
  }
675
  
676
  if (NULL == p) {
176,064✔
677
    mstsDebug("%s: stream status is NULL", tips);
×
678
    return;
×
679
  }
680

681
  int32_t trigReaderNum = taosArrayGetSize(p->trigReaders);
176,064✔
682
  int32_t trigOReaderNum = msmGetTrigOReaderSize(p->trigOReaders);
176,064✔
683
  int32_t calcReaderNum = MST_LIST_SIZE(p->calcReaders);
176,064✔
684
  int32_t triggerNum = p->triggerTask ? 1 : 0;
176,064✔
685
  int32_t runnerNum = 0;
176,064✔
686

687
  for (int32_t i = 0; i < p->runnerDeploys; ++i) {
694,872✔
688
    runnerNum += taosArrayGetSize(p->runners[i]);
518,808✔
689
  }
690

691
  mstsDebug("%s: stream status", tips);
176,064✔
692
  mstsDebug("name:%s runnerNum:%d runnerDeploys:%d runnerReplica:%d lastActionTs:%" PRId64
176,064✔
693
           " trigReaders:%d trigOReaders:%d calcReaders:%d trigger:%d runners:%d",
694
      p->streamName, p->runnerNum, p->runnerDeploys, p->runnerReplica, p->lastActionTs,
695
      trigReaderNum, trigOReaderNum, calcReaderNum, triggerNum, runnerNum);
696

697
  SStmTaskStatus* pTask = NULL;
176,064✔
698
  for (int32_t i = 0; i < trigReaderNum; ++i) {
399,530✔
699
    pTask = taosArrayGet(p->trigReaders, i);
223,466✔
700
    mstLogSStmTaskStatus("trigReader task", streamId, pTask, i);
223,466✔
701
  }
702

703
  for (int32_t i = 0; i < trigOReaderNum; ++i) {
176,064✔
704
    pTask = msmGetTrigOReader(p->trigOReaders, i);
×
705
    mstLogSStmTaskStatus("trigOReader task", streamId, pTask, i);
×
706
  }
707

708
  SListNode* pNode = listHead(p->calcReaders);
176,064✔
709
  for (int32_t i = 0; i < calcReaderNum; ++i) {
494,825✔
710
    pTask = (SStmTaskStatus*)pNode->data;
318,761✔
711
    mstLogSStmTaskStatus("calcReader task", streamId, pTask, i);
318,761✔
712
    pNode = TD_DLIST_NODE_NEXT(pNode);
318,761✔
713
  }
714

715
  if (triggerNum > 0) {
176,064✔
716
    mstLogSStmTaskStatus("trigger task", streamId, p->triggerTask, 0);
176,064✔
717
  }
718

719
  for (int32_t i = 0; i < p->runnerDeploys; ++i) {
694,872✔
720
    int32_t num = taosArrayGetSize(p->runners[i]);
518,808✔
721
    if (num <= 0) {
518,808✔
722
      continue;
×
723
    }
724
    
725
    mstsDebug("the %dth deploy runners status", i);
518,808✔
726
    for (int32_t m = 0; m < num; ++m) {
1,140,822✔
727
      pTask = taosArrayGet(p->runners[i], m);
622,014✔
728
      mstLogSStmTaskStatus("runner task", streamId, pTask, m);
622,014✔
729
    }
730
  }
731
      
732
}
733

734
bool mstEventPassIsolation(int32_t num, int32_t event) {
4,293,927✔
735
  bool ret = ((mStreamMgmt.lastTs[event].ts + num * MST_SHORT_ISOLATION_DURATION) <= mStreamMgmt.hCtx.currentTs);
4,293,927✔
736
  if (ret) {
4,293,927✔
737
    mstDebug("event %s passed %d isolation, last:%" PRId64 ", curr:%" PRId64, 
3,330,972✔
738
        gMndStreamEvent[event], num, mStreamMgmt.lastTs[event].ts, mStreamMgmt.hCtx.currentTs);
739
  }
740

741
  return ret;
4,293,927✔
742
}
743

744
bool mstEventHandledChkSet(int32_t event) {
3,330,972✔
745
  if (0 == atomic_val_compare_exchange_8((int8_t*)&mStreamMgmt.lastTs[event].handled, 0, 1)) {
3,330,972✔
746
    mstDebug("event %s set handled", gMndStreamEvent[event]);
203,886✔
747
    return true;
203,886✔
748
  }
749
  return false;
3,127,086✔
750
}
751

752
int32_t mstGetStreamStatusStr(SStreamObj* pStream, char* status, int32_t statusSize, char* msg, int32_t msgSize) {
640,408✔
753
  int8_t active = atomic_load_8(&mStreamMgmt.active), state = atomic_load_8(&mStreamMgmt.state);
640,408✔
754
  if (0 == active || MND_STM_STATE_NORMAL != state) {
640,408✔
755
    mstDebug("mnode streamMgmt not in active mode, active:%d, state:%d", active, state);
203✔
756
    STR_WITH_MAXSIZE_TO_VARSTR(status, gStreamStatusStr[STREAM_STATUS_UNDEPLOYED], statusSize);
203✔
757
    STR_WITH_MAXSIZE_TO_VARSTR(msg, "Mnode may be unstable, try again later", msgSize);
203✔
758
    return TSDB_CODE_SUCCESS;
203✔
759
  }
760

761
  if (atomic_load_8(&pStream->userDropped)) {
640,205✔
762
    STR_WITH_MAXSIZE_TO_VARSTR(status, gStreamStatusStr[STREAM_STATUS_DROPPING], statusSize);
×
763
    STR_WITH_MAXSIZE_TO_VARSTR(msg, "", msgSize);
×
764
    return TSDB_CODE_SUCCESS;
×
765
  }
766

767
  if (atomic_load_8(&pStream->userStopped)) {
640,205✔
768
    STR_WITH_MAXSIZE_TO_VARSTR(status, gStreamStatusStr[STREAM_STATUS_STOPPED], statusSize);
4,904✔
769
    STR_WITH_MAXSIZE_TO_VARSTR(msg, "", msgSize);
4,904✔
770
    return TSDB_CODE_SUCCESS;
4,904✔
771
  }
772

773
  (void)mstWaitLock(&mStreamMgmt.runtimeLock, true);
635,301✔
774
  
775
  int64_t streamId = pStream->pCreate->streamId;
635,301✔
776
  SStmStatus* pStatus = (SStmStatus*)taosHashGet(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
635,301✔
777
  if (NULL == pStatus) {
635,301✔
778
    mstDebug("return Undeployed: stream not in streamMap (show streams)");
153,229✔
779
    STR_WITH_MAXSIZE_TO_VARSTR(status, gStreamStatusStr[STREAM_STATUS_UNDEPLOYED], statusSize);
153,229✔
780
    STR_WITH_MAXSIZE_TO_VARSTR(msg, "", msgSize);
153,229✔
781
    goto _exit;
153,229✔
782
  }
783

784
  char tmpBuf[256];
482,072✔
785
  int8_t stopped = atomic_load_8(&pStatus->stopped);
482,072✔
786
  switch (stopped) {
482,072✔
787
    case 1:
12,815✔
788
      STR_WITH_MAXSIZE_TO_VARSTR(status, gStreamStatusStr[STREAM_STATUS_FAILED], statusSize);
12,815✔
789
      snprintf(tmpBuf, sizeof(tmpBuf), "Last error: %s, Failed times: %" PRId64, tstrerror(pStatus->fatalError), pStatus->fatalRetryTimes);
12,815✔
790
      STR_WITH_MAXSIZE_TO_VARSTR(msg, tmpBuf, msgSize);
12,815✔
791
      goto _exit;
12,815✔
792
      break;
UNCOV
793
    case 4:
×
UNCOV
794
      STR_WITH_MAXSIZE_TO_VARSTR(status, gStreamStatusStr[STREAM_STATUS_FAILED], statusSize);
×
795
      snprintf(tmpBuf, sizeof(tmpBuf), "Error: %s", tstrerror(TSDB_CODE_GRANT_STREAM_EXPIRED));
×
796
      STR_WITH_MAXSIZE_TO_VARSTR(msg, tmpBuf, msgSize);
×
797
      goto _exit;
×
798
      break;
799
    default:
469,257✔
800
      break;
469,257✔
801
  }
802

803
  if (pStatus->triggerTask && STREAM_STATUS_RUNNING == pStatus->triggerTask->status) {
469,257✔
804
    STR_WITH_MAXSIZE_TO_VARSTR(status, gStreamStatusStr[STREAM_STATUS_RUNNING], statusSize);
223,775✔
805
    tstrncpy(tmpBuf, "Running start from: ", sizeof(tmpBuf));
223,775✔
806
    int32_t offset = strlen(tmpBuf);
223,775✔
807
    (void)formatTimestampLocal(tmpBuf + offset, sizeof(tmpBuf) - offset, pStatus->triggerTask->runningStartTs,
223,775✔
808
                               TSDB_TIME_PRECISION_MILLI);
809
    STR_WITH_MAXSIZE_TO_VARSTR(msg, tmpBuf, msgSize);
223,775✔
810
    goto _exit;
223,775✔
811
  }
812

813
  mstsDebug("return Idle: pStatus=%p triggerTask=%p triggerStatus=%d (show streams view)",
245,482✔
814
      (void*)pStatus, (void*)(pStatus ? pStatus->triggerTask : NULL),
815
      (pStatus && pStatus->triggerTask) ? (int)pStatus->triggerTask->status : -1);
816
  STR_WITH_MAXSIZE_TO_VARSTR(status, gStreamStatusStr[STREAM_STATUS_INIT], statusSize);
245,482✔
817
  snprintf(tmpBuf, sizeof(tmpBuf), "Current deploy times: %" PRId64, pStatus->deployTimes);
245,482✔
818
  STR_WITH_MAXSIZE_TO_VARSTR(msg, tmpBuf, msgSize);
245,482✔
819
  goto _exit;
245,482✔
820

821
_exit:
635,301✔
822
  
823
  taosRUnLockLatch(&mStreamMgmt.runtimeLock);
635,301✔
824

825
  return TSDB_CODE_SUCCESS;
635,301✔
826
}
827

828
int32_t mstSetStreamAttrResBlock(SMnode *pMnode, SStreamObj* pStream, SSDataBlock* pBlock, int32_t numOfRows) {
640,408✔
829
  int32_t code = 0;
640,408✔
830
  int32_t cols = 0;
640,408✔
831
  int32_t lino = 0;
640,408✔
832

833
  char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
640,408✔
834
  STR_WITH_MAXSIZE_TO_VARSTR(streamName, mndGetStableStr(pStream->name), sizeof(streamName));
640,408✔
835
  SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
640,408✔
836
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
640,408✔
837

838
  code = colDataSetVal(pColInfo, numOfRows, (const char*)streamName, false);
640,408✔
839
  TSDB_CHECK_CODE(code, lino, _end);
640,408✔
840

841
  // db_name
842
  char streamDB[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
640,408✔
843
  STR_WITH_MAXSIZE_TO_VARSTR(streamDB, mndGetDbStr(pStream->pCreate->streamDB), sizeof(streamDB));
640,408✔
844
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
640,408✔
845
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
640,408✔
846
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&streamDB, false);
640,408✔
847
  TSDB_CHECK_CODE(code, lino, _end);
640,408✔
848

849
  // create time
850
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
640,408✔
851
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
640,408✔
852
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&pStream->createTime, false);
640,408✔
853
  TSDB_CHECK_CODE(code, lino, _end);
640,408✔
854

855
  // stream id
856
  char streamId2[19] = {0};
640,408✔
857
  char streamId[19 + VARSTR_HEADER_SIZE] = {0};
640,408✔
858
  snprintf(streamId2, sizeof(streamId2), "%" PRIx64, pStream->pCreate->streamId);
640,408✔
859
  STR_WITH_MAXSIZE_TO_VARSTR(streamId, streamId2, sizeof(streamId));
640,408✔
860
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
640,408✔
861
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
640,408✔
862
  code = colDataSetVal(pColInfo, numOfRows, (const char*)streamId, false);
640,408✔
863
  TSDB_CHECK_CODE(code, lino, _end);
640,408✔
864

865
  // sql
866
  char sql[TSDB_INS_STREAM_SQL_LEN + VARSTR_HEADER_SIZE] = {0};
640,408✔
867
  STR_WITH_MAXSIZE_TO_VARSTR(sql, pStream->pCreate->sql, sizeof(sql));
640,408✔
868
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
640,408✔
869
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
640,408✔
870
  code = colDataSetVal(pColInfo, numOfRows, (const char*)sql, false);
640,408✔
871
  TSDB_CHECK_CODE(code, lino, _end);
640,408✔
872

873
  // status
874
  char   status[20 + VARSTR_HEADER_SIZE] = {0};
640,408✔
875
  char msg[TSDB_RESERVE_VALUE_LEN + VARSTR_HEADER_SIZE] = {0};
640,408✔
876
  code = mstGetStreamStatusStr(pStream, status, sizeof(status), msg, sizeof(msg));
640,408✔
877
  TSDB_CHECK_CODE(code, lino, _end);
640,408✔
878

879
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
640,408✔
880
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
640,408✔
881
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&status, false);
640,408✔
882
  TSDB_CHECK_CODE(code, lino, _end);
640,408✔
883

884
  // snodeLeader
885
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
640,408✔
886
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
640,408✔
887
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&pStream->mainSnodeId, false);
640,408✔
888
  TSDB_CHECK_CODE(code, lino, _end);
640,408✔
889

890
  // snodeReplica
891
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
640,408✔
892
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
640,408✔
893
  SSnodeObj* pSnode = mndAcquireSnode(pMnode, pStream->mainSnodeId);
640,408✔
894
  int32_t replicaSnodeId = pSnode ? pSnode->replicaId : -1;
640,408✔
895
  mndReleaseSnode(pMnode, pSnode);
640,408✔
896
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&replicaSnodeId, false);
640,408✔
897
  TSDB_CHECK_CODE(code, lino, _end);
640,408✔
898

899
  // msg
900
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
640,408✔
901
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
640,408✔
902
  code = colDataSetVal(pColInfo, numOfRows, (const char*)msg, false);
640,408✔
903

904
_end:
640,408✔
905
  if (code) {
640,408✔
UNCOV
906
    mError("error happens when build stream attr result block, lino:%d, code:%s", lino, tstrerror(code));
×
907
  }
908
  return code;
640,408✔
909
}
910

911

912
int32_t mstGetTaskStatusStr(SStmTaskStatus* pTask, char* status, int32_t statusSize, char* msg, int32_t msgSize) {
16,971,556✔
913
  char tmpBuf[256];
16,948,120✔
914
  
915
  STR_WITH_MAXSIZE_TO_VARSTR(status, gStreamStatusStr[pTask->status], statusSize);
16,971,556✔
916
  if (STREAM_STATUS_FAILED == pTask->status && pTask->errCode) {
16,971,556✔
917
    snprintf(tmpBuf, sizeof(tmpBuf), "Last error: %s", tstrerror(pTask->errCode));
211✔
918
    STR_WITH_MAXSIZE_TO_VARSTR(msg, tmpBuf, msgSize);
211✔
919
    return TSDB_CODE_SUCCESS;
211✔
920
  }
921

922
  if (STREAM_TRIGGER_TASK == pTask->type && mstWaitLock(&pTask->detailStatusLock, true)) {
16,971,345✔
923
    if (pTask->detailStatus) {
2,567,498✔
924
      SSTriggerRuntimeStatus* pTrigger = (SSTriggerRuntimeStatus*)pTask->detailStatus;
2,060,188✔
925
      snprintf(tmpBuf, sizeof(tmpBuf), "Current RT/HI/RE session num: %d/%d/%d, histroy progress:%d%%, total AUTO/USER recalc num: %d/%d", 
2,060,188✔
926
          pTrigger->realtimeSessionNum, pTrigger->historySessionNum, pTrigger->recalcSessionNum, pTrigger->histroyProgress,
927
          pTrigger->autoRecalcNum, (int32_t)taosArrayGetSize(pTrigger->userRecalcs));
2,060,188✔
928
      taosRUnLockLatch(&pTask->detailStatusLock);
2,060,188✔
929
      return TSDB_CODE_SUCCESS;
2,060,188✔
930
    }
931

932
    taosRUnLockLatch(&pTask->detailStatusLock);    
507,310✔
933
  }
934
  
935
  STR_WITH_MAXSIZE_TO_VARSTR(msg, "", msgSize);
14,911,157✔
936
  
937
  return TSDB_CODE_SUCCESS;
14,911,157✔
938
}
939

940
int32_t mstGetTaskExtraStr(SStmTaskStatus* pTask, char* extraStr, int32_t extraSize) {
16,971,556✔
941
  switch (pTask->type) {
16,971,556✔
942
    case STREAM_READER_TASK:
6,451,814✔
943
      if (STREAM_IS_TRIGGER_READER(pTask->flags)) {
6,451,814✔
944
        STR_WITH_MAXSIZE_TO_VARSTR(extraStr, "trigReader", extraSize);
2,997,401✔
945
      } else {
946
        STR_WITH_MAXSIZE_TO_VARSTR(extraStr, "calcReader", extraSize);
3,454,413✔
947
      }
948
      return TSDB_CODE_SUCCESS;
6,451,814✔
949
    case STREAM_RUNNER_TASK:
7,952,244✔
950
      if (STREAM_IS_TOP_RUNNER(pTask->flags)) {
7,952,244✔
951
        STR_WITH_MAXSIZE_TO_VARSTR(extraStr, "topRunner", extraSize);
7,699,116✔
952
        return TSDB_CODE_SUCCESS;
7,699,116✔
953
      }
954
      break;
253,128✔
955
    default:
2,567,498✔
956
      break;
2,567,498✔
957
  }
958

959
  STR_WITH_MAXSIZE_TO_VARSTR(extraStr, "", extraSize);
2,820,626✔
960
  return TSDB_CODE_SUCCESS;
2,820,626✔
961
}
962

963

964
int32_t mstSetStreamTaskResBlock(SStreamObj* pStream, SStmTaskStatus* pTask, SSDataBlock* pBlock, int32_t numOfRows) {
16,971,556✔
965
  int32_t code = 0;
16,971,556✔
966
  int32_t cols = 0;
16,971,556✔
967
  int32_t lino = 0;
16,971,556✔
968

969
  // stream_name
970
  char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
16,971,556✔
971
  STR_WITH_MAXSIZE_TO_VARSTR(streamName, mndGetStableStr(pStream->name), sizeof(streamName));
16,971,556✔
972
  SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
16,971,556✔
973
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
16,971,556✔
974

975
  code = colDataSetVal(pColInfo, numOfRows, (const char*)streamName, false);
16,971,556✔
976
  TSDB_CHECK_CODE(code, lino, _end);
16,971,556✔
977

978
  // stream id
979
  char idstr[19 + VARSTR_HEADER_SIZE] = {0};
16,971,556✔
980
  snprintf(&idstr[VARSTR_HEADER_SIZE], sizeof(idstr) - VARSTR_HEADER_SIZE, "%" PRIx64, pStream->pCreate->streamId);
16,971,556✔
981
  varDataSetLen(idstr, strlen(&idstr[VARSTR_HEADER_SIZE])); 
16,971,556✔
982
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
16,971,556✔
983
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
16,971,556✔
984
  code = colDataSetVal(pColInfo, numOfRows, (const char*)idstr, false);
16,971,556✔
985
  TSDB_CHECK_CODE(code, lino, _end);
16,971,556✔
986

987
  // task id
988
  snprintf(&idstr[VARSTR_HEADER_SIZE], sizeof(idstr) - VARSTR_HEADER_SIZE, "%" PRIx64, pTask->id.taskId);
16,971,556✔
989
  varDataSetLen(idstr, strlen(&idstr[VARSTR_HEADER_SIZE]));
16,971,556✔
990
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
16,971,556✔
991
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
16,971,556✔
992
  code = colDataSetVal(pColInfo, numOfRows, (const char*)idstr, false);
16,971,556✔
993
  TSDB_CHECK_CODE(code, lino, _end);
16,971,556✔
994

995
  // type
996
  char type[20 + VARSTR_HEADER_SIZE] = {0};
16,971,556✔
997
  STR_WITH_MAXSIZE_TO_VARSTR(type, (STREAM_READER_TASK == pTask->type) ? "Reader" : ((STREAM_TRIGGER_TASK == pTask->type) ? "Trigger" : "Runner"), sizeof(type));
16,971,556✔
998
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
16,971,556✔
999
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
16,971,556✔
1000
  code = colDataSetVal(pColInfo, numOfRows, (const char*)type, false);
16,971,556✔
1001
  TSDB_CHECK_CODE(code, lino, _end);
16,971,556✔
1002

1003
  // serious id
1004
  snprintf(&idstr[VARSTR_HEADER_SIZE], sizeof(idstr) - VARSTR_HEADER_SIZE, "%" PRIx64, pTask->id.seriousId);
16,971,556✔
1005
  varDataSetLen(idstr, strlen(&idstr[VARSTR_HEADER_SIZE]));
16,971,556✔
1006
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
16,971,556✔
1007
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
16,971,556✔
1008
  code = colDataSetVal(pColInfo, numOfRows, (const char*)idstr, false);
16,971,556✔
1009
  TSDB_CHECK_CODE(code, lino, _end);
16,971,556✔
1010

1011
  // deploy id
1012
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
16,971,556✔
1013
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
16,971,556✔
1014
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&pTask->id.deployId, false);
16,971,556✔
1015
  TSDB_CHECK_CODE(code, lino, _end);
16,971,556✔
1016

1017
  // node_type
1018
  char nodeType[10 + VARSTR_HEADER_SIZE] = {0};
16,971,556✔
1019
  STR_WITH_MAXSIZE_TO_VARSTR(nodeType, (STREAM_READER_TASK == pTask->type) ? "vnode" : "snode", sizeof(nodeType));
16,971,556✔
1020
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
16,971,556✔
1021
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
16,971,556✔
1022
  code = colDataSetVal(pColInfo, numOfRows, (const char*)nodeType, false);
16,971,556✔
1023
  TSDB_CHECK_CODE(code, lino, _end);
16,971,556✔
1024

1025
  // node id
1026
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
16,971,556✔
1027
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
16,971,556✔
1028
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&pTask->id.nodeId, false);
16,971,556✔
1029
  TSDB_CHECK_CODE(code, lino, _end);
16,971,556✔
1030

1031
  // task idx
1032
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
16,971,556✔
1033
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
16,971,556✔
1034
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&pTask->id.taskIdx, false);
16,971,556✔
1035
  TSDB_CHECK_CODE(code, lino, _end);
16,971,556✔
1036

1037
  // status
1038
  char   status[20 + VARSTR_HEADER_SIZE] = {0};
16,971,556✔
1039
  char msg[TSDB_RESERVE_VALUE_LEN + VARSTR_HEADER_SIZE] = {0};
16,971,556✔
1040
  code = mstGetTaskStatusStr(pTask, status, sizeof(status), msg, sizeof(msg));
16,971,556✔
1041
  TSDB_CHECK_CODE(code, lino, _end);
16,971,556✔
1042

1043
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
16,971,556✔
1044
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
16,971,556✔
1045
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&status, false);
16,971,556✔
1046
  TSDB_CHECK_CODE(code, lino, _end);
16,971,556✔
1047

1048
  // start time
1049
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
16,971,556✔
1050
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
16,971,556✔
1051
  if (pTask->runningStartTs) {
16,971,556✔
1052
    code = colDataSetVal(pColInfo, numOfRows, (const char*)&pTask->runningStartTs, false);
4,109,082✔
1053
  } else {
1054
    code = colDataSetVal(pColInfo, numOfRows, (const char*)&pTask->runningStartTs, true);
12,862,474✔
1055
  }
1056
  TSDB_CHECK_CODE(code, lino, _end);
16,971,556✔
1057

1058
  // last update
1059
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
16,971,556✔
1060
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
16,971,556✔
1061
  if (pTask->lastUpTs) {
16,971,556✔
1062
    code = colDataSetVal(pColInfo, numOfRows, (const char*)&pTask->lastUpTs, false);
16,970,838✔
1063
  } else {
1064
    code = colDataSetVal(pColInfo, numOfRows, (const char*)&pTask->lastUpTs, true);
718✔
1065
  }
1066
  TSDB_CHECK_CODE(code, lino, _end);
16,971,556✔
1067

1068
  // extra info
1069
  char extra[64 + VARSTR_HEADER_SIZE] = {0};
16,971,556✔
1070
  code = mstGetTaskExtraStr(pTask, extra, sizeof(extra));
16,971,556✔
1071
  TSDB_CHECK_CODE(code, lino, _end);
16,971,556✔
1072
  
1073
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
16,971,556✔
1074
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
16,971,556✔
1075
  code = colDataSetVal(pColInfo, numOfRows, (const char*)extra, false);
16,971,556✔
1076
  TSDB_CHECK_CODE(code, lino, _end);
16,971,556✔
1077

1078
  // msg
1079
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
16,971,556✔
1080
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
16,971,556✔
1081
  code = colDataSetVal(pColInfo, numOfRows, (const char*)msg, false);
16,971,556✔
1082

1083
_end:
16,971,556✔
1084
  if (code) {
16,971,556✔
UNCOV
1085
    mError("error happens when build stream attr result block, lino:%d, code:%s", lino, tstrerror(code));
×
1086
  }
1087
  return code;
16,971,556✔
1088
}
1089

1090
int32_t mstGetNumOfStreamTasks(SStmStatus* pStatus) {
2,571,932✔
1091
  int32_t num = taosArrayGetSize(pStatus->trigReaders) + msmGetTrigOReaderSize(pStatus->trigOReaders) + MST_LIST_SIZE(pStatus->calcReaders) + (pStatus->triggerTask ? 1 : 0);
2,571,932✔
1092
  for (int32_t i = 0; i < MND_STREAM_RUNNER_DEPLOY_NUM; ++i) {
10,287,728✔
1093
    num += taosArrayGetSize(pStatus->runners[i]);
7,715,796✔
1094
  }
1095

1096
  return num;
2,571,932✔
1097
}
1098

1099
int32_t mstSetStreamTasksResBlock(SStreamObj* pStream, SSDataBlock* pBlock, int32_t* numOfRows, int32_t rowsCapacity) {
2,800,355✔
1100
  int32_t code = 0;
2,800,355✔
1101
  int32_t lino = 0;
2,800,355✔
1102
  int64_t streamId = pStream->pCreate->streamId;
2,800,355✔
1103
  bool    statusLocked = false;
2,800,355✔
1104

1105
  (void)mstWaitLock(&mStreamMgmt.runtimeLock, true);
2,800,355✔
1106

1107
  SStmStatus* pStatus = (SStmStatus*)taosHashGet(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
2,800,355✔
1108
  if (NULL == pStatus) {
2,800,355✔
1109
    mstsDebug("stream not in streamMap, ignore it, dropped:%d, stopped:%d", atomic_load_8(&pStream->userDropped), atomic_load_8(&pStream->userStopped));
169,605✔
1110
    goto _exit;
169,605✔
1111
  }
1112

1113
  int8_t stopped = atomic_load_8(&pStatus->stopped);
2,630,750✔
1114
  if (stopped) {
2,630,750✔
1115
    mstsDebug("stream stopped %d, ignore it", stopped);
58,818✔
1116
    goto _exit;
58,818✔
1117
  }
1118

1119
  (void)mstWaitLock(&pStatus->resetLock, true);
2,571,932✔
1120
  statusLocked = true;
2,571,932✔
1121
  
1122
  int32_t count = mstGetNumOfStreamTasks(pStatus);
2,571,932✔
1123

1124
  if (*numOfRows + count > rowsCapacity) {
2,571,932✔
1125
    code = blockDataEnsureCapacity(pBlock, *numOfRows + count);
68,693✔
1126
    if (code) {
68,693✔
UNCOV
1127
      mstError("failed to prepare the result block buffer, rows:%d", *numOfRows + count);
×
UNCOV
1128
      TAOS_CHECK_EXIT(code);
×
1129
    }
1130
  }
1131

1132
  SStmTaskStatus* pTask = NULL;
2,571,932✔
1133
  int32_t trigReaderNum = taosArrayGetSize(pStatus->trigReaders);
2,571,932✔
1134
  for (int32_t i = 0; i < trigReaderNum; ++i) {
5,382,456✔
1135
    pTask = taosArrayGet(pStatus->trigReaders, i);
2,810,524✔
1136
  
1137
    code = mstSetStreamTaskResBlock(pStream, pTask, pBlock, *numOfRows);
2,810,524✔
1138
    if (code == TSDB_CODE_SUCCESS) {
2,810,524✔
1139
      (*numOfRows)++;
2,810,524✔
1140
    }
1141
  }
1142

1143
  trigReaderNum = msmGetTrigOReaderSize(pStatus->trigOReaders);
2,571,932✔
1144
  for (int32_t i = 0; i < trigReaderNum; ++i) {
2,758,809✔
1145
    pTask = msmGetTrigOReader(pStatus->trigOReaders, i);
186,877✔
1146
  
1147
    code = mstSetStreamTaskResBlock(pStream, pTask, pBlock, *numOfRows);
186,877✔
1148
    if (code == TSDB_CODE_SUCCESS) {
186,877✔
1149
      (*numOfRows)++;
186,877✔
1150
    }
1151
  }
1152

1153
  if (pStatus->calcReaders) {
2,571,932✔
1154
    int32_t calcReaderNum = MST_LIST_SIZE(pStatus->calcReaders);
2,571,932✔
1155
    SListNode* pNode = listHead(pStatus->calcReaders);
2,571,932✔
1156
    for (int32_t i = 0; i < calcReaderNum; ++i) {
6,026,345✔
1157
      pTask = (SStmTaskStatus*)pNode->data;
3,454,413✔
1158
    
1159
      code = mstSetStreamTaskResBlock(pStream, pTask, pBlock, *numOfRows);
3,454,413✔
1160
      if (code == TSDB_CODE_SUCCESS) {
3,454,413✔
1161
        (*numOfRows)++;
3,454,413✔
1162
      }
1163
      pNode = TD_DLIST_NODE_NEXT(pNode);
3,454,413✔
1164
    }
1165
  }
1166

1167
  if (pStatus->triggerTask) {
2,571,932✔
1168
    code = mstSetStreamTaskResBlock(pStream, pStatus->triggerTask, pBlock, *numOfRows);
2,567,498✔
1169
    if (code == TSDB_CODE_SUCCESS) {
2,567,498✔
1170
      (*numOfRows)++;
2,567,498✔
1171
    }
1172
  }
1173

1174
  int32_t runnerNum = 0;
2,571,932✔
1175
  for (int32_t i = 0; i < MND_STREAM_RUNNER_DEPLOY_NUM; ++i) {
10,287,728✔
1176
    runnerNum = taosArrayGetSize(pStatus->runners[i]);
7,715,796✔
1177
    for (int32_t m = 0; m < runnerNum; ++m) {
15,668,040✔
1178
      pTask = taosArrayGet(pStatus->runners[i], m);
7,952,244✔
1179
    
1180
      code = mstSetStreamTaskResBlock(pStream, pTask, pBlock, *numOfRows);
7,952,244✔
1181
      if (code == TSDB_CODE_SUCCESS) {
7,952,244✔
1182
        (*numOfRows)++;
7,952,244✔
1183
      }
1184
    }
1185
  }
1186
  
1187
  pBlock->info.rows = *numOfRows;
2,571,932✔
1188

1189
_exit:
2,800,355✔
1190

1191
  if (statusLocked) {
2,800,355✔
1192
    taosRUnLockLatch(&pStatus->resetLock);
2,571,932✔
1193
  }
1194
  
1195
  taosRUnLockLatch(&mStreamMgmt.runtimeLock);
2,800,355✔
1196

1197
  if (code) {
2,800,355✔
UNCOV
1198
    mError("error happens when build stream tasks result block, lino:%d, code:%s", lino, tstrerror(code));
×
1199
  }
1200
  
1201
  return code;
2,800,355✔
1202
}
1203

1204

1205
int32_t mstAppendNewRecalcRange(int64_t streamId, SStmStatus *pStream, STimeWindow* pRange) {
3,640✔
1206
  int32_t code = 0;
3,640✔
1207
  int32_t lino = 0;
3,640✔
1208
  bool    locked = false;
3,640✔
1209
  SArray* userRecalcList = NULL;
3,640✔
1210

1211
  SStreamRecalcReq req = {.recalcId = 0, .start = pRange->skey, .end = pRange->ekey};
3,640✔
1212
  TAOS_CHECK_EXIT(taosGetSystemUUIDU64(&req.recalcId));
3,640✔
1213
  
1214
  taosWLockLatch(&pStream->userRecalcLock);
3,640✔
1215
  locked = true;
3,640✔
1216
  
1217
  if (NULL == pStream->userRecalcList) {
3,640✔
1218
    userRecalcList = taosArrayInit(2, sizeof(SStreamRecalcReq));
3,640✔
1219
    if (NULL == userRecalcList) {
3,640✔
UNCOV
1220
      TAOS_CHECK_EXIT(terrno);
×
1221
    }
1222

1223
    TSDB_CHECK_NULL(taosArrayPush(userRecalcList, &req), code, lino, _exit, terrno);
3,640✔
1224

1225
    atomic_store_ptr(&pStream->userRecalcList, userRecalcList);
3,640✔
1226
    userRecalcList = NULL;    
3,640✔
1227
  } else {
UNCOV
1228
    TSDB_CHECK_NULL(taosArrayPush(pStream->userRecalcList, &req), code, lino, _exit, terrno);
×
1229
  }
1230
  
1231
  mstsInfo("stream recalc ID:%" PRIx64 " range:%" PRId64 " - %" PRId64 " added", req.recalcId, pRange->skey, pRange->ekey);
3,640✔
1232

1233
_exit:
3,640✔
1234

1235
  taosArrayDestroy(userRecalcList);
3,640✔
1236

1237
  if (locked) {
3,640✔
1238
    taosWUnLockLatch(&pStream->userRecalcLock);
3,640✔
1239
  }
1240
  
1241
  if (code) {
3,640✔
UNCOV
1242
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
1243
  }
1244
  
1245
  return code;
3,640✔
1246
}
1247

1248

1249

UNCOV
1250
int32_t mstSetStreamRecalculateResBlock(SStreamObj* pStream, SSTriggerRecalcProgress* pProgress, SSDataBlock* pBlock, int32_t numOfRows) {
×
UNCOV
1251
  int32_t code = 0;
×
UNCOV
1252
  int32_t cols = 0;
×
UNCOV
1253
  int32_t lino = 0;
×
1254

1255
  // stream_name
1256
  char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
×
1257
  STR_WITH_MAXSIZE_TO_VARSTR(streamName, mndGetStableStr(pStream->name), sizeof(streamName));
×
1258
  SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
UNCOV
1259
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
×
1260

1261
  code = colDataSetVal(pColInfo, numOfRows, (const char*)streamName, false);
×
1262
  TSDB_CHECK_CODE(code, lino, _end);
×
1263

1264
  // stream id
UNCOV
1265
  char idstr[19 + VARSTR_HEADER_SIZE] = {0};
×
1266
  snprintf(&idstr[VARSTR_HEADER_SIZE], sizeof(idstr) - VARSTR_HEADER_SIZE, "%" PRIx64, pStream->pCreate->streamId);
×
1267
  varDataSetLen(idstr, strlen(&idstr[VARSTR_HEADER_SIZE]) + VARSTR_HEADER_SIZE); 
×
UNCOV
1268
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
UNCOV
1269
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
×
1270
  code = colDataSetVal(pColInfo, numOfRows, (const char*)idstr, false);
×
1271
  TSDB_CHECK_CODE(code, lino, _end);
×
1272

1273
  // recalc id
1274
  snprintf(&idstr[VARSTR_HEADER_SIZE], sizeof(idstr) - VARSTR_HEADER_SIZE, "%" PRIx64, pProgress->recalcId);
×
1275
  varDataSetLen(idstr, strlen(&idstr[VARSTR_HEADER_SIZE]) + VARSTR_HEADER_SIZE);
×
1276
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
UNCOV
1277
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
×
UNCOV
1278
  code = colDataSetVal(pColInfo, numOfRows, (const char*)idstr, false);
×
1279
  TSDB_CHECK_CODE(code, lino, _end);
×
1280

1281
  // start
1282
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
1283
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
×
1284
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&pProgress->start, false);
×
UNCOV
1285
  TSDB_CHECK_CODE(code, lino, _end);
×
1286

1287
  // end
1288
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
1289
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
×
1290
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&pProgress->end, false);
×
UNCOV
1291
  TSDB_CHECK_CODE(code, lino, _end);
×
1292

1293
  // progress
1294
  char progress[20 + VARSTR_HEADER_SIZE] = {0};
×
1295
  snprintf(&progress[VARSTR_HEADER_SIZE], sizeof(progress) - VARSTR_HEADER_SIZE, "%d%%", pProgress->progress);
×
1296
  varDataSetLen(progress, strlen(&progress[VARSTR_HEADER_SIZE]) + VARSTR_HEADER_SIZE);
×
UNCOV
1297
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
UNCOV
1298
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
×
1299
  code = colDataSetVal(pColInfo, numOfRows, (const char*)progress, false);
×
1300
  TSDB_CHECK_CODE(code, lino, _end);
×
1301

1302
_end:
×
1303
  if (code) {
×
1304
    mError("error happens when build stream attr result block, lino:%d, code:%s", lino, tstrerror(code));
×
1305
  }
UNCOV
1306
  return code;
×
1307
}
1308

1309

1310
int32_t mstSetStreamRecalculatesResBlock(SStreamObj* pStream, SSDataBlock* pBlock, int32_t* numOfRows, int32_t rowsCapacity) {
282✔
1311
  int32_t code = 0;
282✔
1312
  int32_t lino = 0;
282✔
1313
  int64_t streamId = pStream->pCreate->streamId;
282✔
1314

1315
  (void)mstWaitLock(&mStreamMgmt.runtimeLock, true);
282✔
1316

1317
  SStmStatus* pStatus = (SStmStatus*)taosHashGet(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
282✔
1318
  if (NULL == pStatus) {
282✔
UNCOV
1319
    mstsDebug("stream not in streamMap, ignore it, dropped:%d, stopped:%d", atomic_load_8(&pStream->userDropped), atomic_load_8(&pStream->userStopped));
×
UNCOV
1320
    goto _exit;
×
1321
  }
1322

1323
  int8_t stopped = atomic_load_8(&pStatus->stopped);
282✔
1324
  if (stopped) {
282✔
1325
    mstsDebug("stream stopped %d, ignore it", stopped);
×
UNCOV
1326
    goto _exit;
×
1327
  }
1328

1329
  if (NULL == pStatus->triggerTask) {
282✔
1330
    mstsDebug("no trigger task now, deployTimes:%" PRId64 ", ignore it", pStatus->deployTimes);
×
1331
    goto _exit;
×
1332
  }
1333

1334
  (void)mstWaitLock(&pStatus->triggerTask->detailStatusLock, true);
282✔
1335
  if (NULL == pStatus->triggerTask->detailStatus) {
282✔
1336
    mstsDebug("no trigger task now, deployTimes:%" PRId64 ", ignore it", pStatus->deployTimes);
×
UNCOV
1337
    taosRUnLockLatch(&pStatus->triggerTask->detailStatusLock);
×
UNCOV
1338
    goto _exit;
×
1339
  }
1340

1341
  SSTriggerRuntimeStatus* pTrigger = (SSTriggerRuntimeStatus*)pStatus->triggerTask->detailStatus;
282✔
1342
  int32_t count = taosArrayGetSize(pTrigger->userRecalcs);
282✔
1343

1344
  if (*numOfRows + count > rowsCapacity) {
282✔
UNCOV
1345
    code = blockDataEnsureCapacity(pBlock, *numOfRows + count);
×
UNCOV
1346
    if (code) {
×
UNCOV
1347
      mstError("failed to prepare the result block buffer, rows:%d", *numOfRows + count);
×
UNCOV
1348
      taosRUnLockLatch(&pStatus->triggerTask->detailStatusLock);
×
UNCOV
1349
      TAOS_CHECK_EXIT(code);
×
1350
    }
1351
  }
1352

1353
  for (int32_t i = 0; i < count; ++i) {
282✔
1354
    SSTriggerRecalcProgress* pProgress = taosArrayGet(pTrigger->userRecalcs, i);
×
1355
  
UNCOV
1356
    code = mstSetStreamRecalculateResBlock(pStream, pProgress, pBlock, *numOfRows);
×
UNCOV
1357
    if (code == TSDB_CODE_SUCCESS) {
×
UNCOV
1358
      (*numOfRows)++;
×
1359
    }
1360
  }
1361

1362
  taosRUnLockLatch(&pStatus->triggerTask->detailStatusLock);
282✔
1363
  
1364
  pBlock->info.rows = *numOfRows;
282✔
1365

1366
_exit:
282✔
1367
  
1368
  taosRUnLockLatch(&mStreamMgmt.runtimeLock);
282✔
1369

1370
  if (code) {
282✔
UNCOV
1371
    mError("error happens when build stream recalculates result block, lino:%d, code:%s", lino, tstrerror(code));
×
1372
  }
1373
  
1374
  return code;
282✔
1375
}
1376

1377
int32_t mstGetScanUidFromPlan(int64_t streamId, void* scanPlan, int64_t* uid) {
329,409✔
1378
  SSubplan* pSubplan = NULL;
329,409✔
1379
  int32_t code = TSDB_CODE_SUCCESS, lino = 0;
329,409✔
1380
  
1381
  TAOS_CHECK_EXIT(nodesStringToNode(scanPlan, (SNode**)&pSubplan));
329,409✔
1382

1383
  if (pSubplan->pNode && nodeType(pSubplan->pNode) == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
329,409✔
1384
    SScanPhysiNode* pScanNode = (SScanPhysiNode*)pSubplan->pNode;
149,073✔
1385
    *uid = pScanNode->uid;
149,073✔
1386
  }
1387
  
1388
_exit:
328,794✔
1389

1390
  if (code) {
329,409✔
UNCOV
1391
    mstsError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1392
  }
1393

1394
  nodesDestroyNode((SNode *)pSubplan);
329,409✔
1395

1396
  return code;
329,409✔
1397
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc